storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [06/12] storm git commit: [STORM-1961] A few fixes and refactoring
Date Tue, 07 Feb 2017 01:28:19 GMT
http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/src/jvm/org/apache/storm/streams/tuple/Tuple9.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/tuple/Tuple9.java b/storm-core/src/jvm/org/apache/storm/streams/tuple/Tuple9.java
new file mode 100644
index 0000000..0cd7139
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/tuple/Tuple9.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.tuple;
+
+/**
+ * A tuple of nine elements along the lines of Scala's Tuple.
+ *
+ * @param <T1> the type of the first element
+ * @param <T2> the type of the second element
+ * @param <T3> the type of the third element
+ * @param <T4> the type of the fourth element
+ * @param <T5> the type of the fifth element
+ * @param <T6> the type of the sixth element
+ * @param <T7> the type of the seventh element
+ * @param <T8> the type of the eighth element
+ * @param <T9> the type of the ninth element
+ */
+public class Tuple9<T1, T2, T3, T4, T5, T6, T7, T8, T9> {
+    public final T1 _1;
+    public final T2 _2;
+    public final T3 _3;
+    public final T4 _4;
+    public final T5 _5;
+    public final T6 _6;
+    public final T7 _7;
+    public final T8 _8;
+    public final T9 _9;
+
+    /**
+     * Constructs a new tuple.
+     *
+     * @param _1 the first element
+     * @param _2 the second element
+     * @param _3 the third element
+     * @param _4 the fourth element
+     * @param _5 the fifth element
+     * @param _6 the sixth element
+     * @param _7 the seventh element
+     * @param _8 the eighth element
+     * @param _9 the ninth element
+     */
+    public Tuple9(T1 _1, T2 _2, T3 _3, T4 _4, T5 _5, T6 _6, T7 _7, T8 _8, T9 _9) {
+        this._1 = _1;
+        this._2 = _2;
+        this._3 = _3;
+        this._4 = _4;
+        this._5 = _5;
+        this._6 = _6;
+        this._7 = _7;
+        this._8 = _8;
+        this._9 = _9;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true; // same reference: trivially equal
+        if (o == null || getClass() != o.getClass()) return false; // exact runtime class must match
+
+        Tuple9<?, ?, ?, ?, ?, ?, ?, ?, ?> tuple9 = (Tuple9<?, ?, ?, ?, ?, ?, ?, ?, ?>) o;
+
+        if (_1 != null ? !_1.equals(tuple9._1) : tuple9._1 != null) return false; // null-safe per-element check; null only equals null
+        if (_2 != null ? !_2.equals(tuple9._2) : tuple9._2 != null) return false;
+        if (_3 != null ? !_3.equals(tuple9._3) : tuple9._3 != null) return false;
+        if (_4 != null ? !_4.equals(tuple9._4) : tuple9._4 != null) return false;
+        if (_5 != null ? !_5.equals(tuple9._5) : tuple9._5 != null) return false;
+        if (_6 != null ? !_6.equals(tuple9._6) : tuple9._6 != null) return false;
+        if (_7 != null ? !_7.equals(tuple9._7) : tuple9._7 != null) return false;
+        if (_8 != null ? !_8.equals(tuple9._8) : tuple9._8 != null) return false;
+        return _9 != null ? _9.equals(tuple9._9) : tuple9._9 == null; // last element decides the result
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = _1 != null ? _1.hashCode() : 0; // a null element contributes 0
+        result = 31 * result + (_2 != null ? _2.hashCode() : 0); // fold each element in order with multiplier 31
+        result = 31 * result + (_3 != null ? _3.hashCode() : 0);
+        result = 31 * result + (_4 != null ? _4.hashCode() : 0);
+        result = 31 * result + (_5 != null ? _5.hashCode() : 0);
+        result = 31 * result + (_6 != null ? _6.hashCode() : 0);
+        result = 31 * result + (_7 != null ? _7.hashCode() : 0);
+        result = 31 * result + (_8 != null ? _8.hashCode() : 0);
+        result = 31 * result + (_9 != null ? _9.hashCode() : 0);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "(" + _1 + "," + _2 + "," + _3 + "," + _4 + "," + _5 + "," + _6 + "," + _7 + "," + _8 + "," + _9 + ")"; // comma-separated elements wrapped in parentheses
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/test/jvm/org/apache/storm/streams/ProcessorBoltTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/streams/ProcessorBoltTest.java b/storm-core/test/jvm/org/apache/storm/streams/ProcessorBoltTest.java
index e9d5127..aa877f9 100644
--- a/storm-core/test/jvm/org/apache/storm/streams/ProcessorBoltTest.java
+++ b/storm-core/test/jvm/org/apache/storm/streams/ProcessorBoltTest.java
@@ -18,6 +18,7 @@
 package org.apache.storm.streams;
 
 import com.google.common.collect.Multimap;
+import org.apache.storm.generated.GlobalStreamId;
 import org.apache.storm.streams.operations.aggregators.Sum;
 import org.apache.storm.streams.processors.AggregateProcessor;
 import org.apache.storm.streams.processors.FilterProcessor;
@@ -34,10 +35,10 @@ import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
 
 import static org.junit.Assert.assertArrayEquals;
@@ -99,7 +100,7 @@ public class ProcessorBoltTest {
         assertArrayEquals(new Object[]{mockTuple2, mockTuple3, punctuation}, anchor.getAllValues().get(0).toArray());
         assertArrayEquals(new Object[]{mockTuple2, mockTuple3, punctuation}, anchor.getAllValues().get(1).toArray());
         assertArrayEquals(new Object[]{new Values(200L), new Values("__punctuation")}, values.getAllValues().toArray());
-        assertArrayEquals(new Object[]{"outputstream", "outputstream"}, os.getAllValues().toArray());
+        assertArrayEquals(new Object[]{"outputstream", "outputstream__punctuation"}, os.getAllValues().toArray());
         Mockito.verify(mockOutputCollector).ack(mockTuple2);
         Mockito.verify(mockOutputCollector).ack(mockTuple3);
         Mockito.verify(mockOutputCollector).ack(punctuation);
@@ -138,6 +139,14 @@ public class ProcessorBoltTest {
         node.setWindowedParentStreams(windowedParentStreams);
         node.setWindowed(isWindowed);
         Mockito.when(mockStreamToProcessors.get(Mockito.anyString())).thenReturn(Collections.singletonList(node));
+        Mockito.when(mockStreamToProcessors.keySet()).thenReturn(Collections.singleton("inputstream"));
+        Map mockSources = Mockito.mock(Map.class);
+        GlobalStreamId mockGlobalStreamId = Mockito.mock(GlobalStreamId.class);
+        Mockito.when(mockTopologyContext.getThisSources()).thenReturn(mockSources);
+        Mockito.when(mockSources.keySet()).thenReturn(Collections.singleton(mockGlobalStreamId));
+        Mockito.when(mockGlobalStreamId.get_streamId()).thenReturn("inputstream");
+        Mockito.when(mockGlobalStreamId.get_componentId()).thenReturn("bolt0");
+        Mockito.when(mockTopologyContext.getComponentTasks(Mockito.anyString())).thenReturn(Collections.singletonList(1));
         graph.addVertex(node);
         bolt = new ProcessorBolt("bolt1", graph, Collections.singletonList(node));
         if (tsFieldName != null && !tsFieldName.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/test/jvm/org/apache/storm/streams/StatefulProcessorBoltTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/streams/StatefulProcessorBoltTest.java b/storm-core/test/jvm/org/apache/storm/streams/StatefulProcessorBoltTest.java
index dbc7e27..2e6e54a 100644
--- a/storm-core/test/jvm/org/apache/storm/streams/StatefulProcessorBoltTest.java
+++ b/storm-core/test/jvm/org/apache/storm/streams/StatefulProcessorBoltTest.java
@@ -19,7 +19,7 @@ package org.apache.storm.streams;
 
 import com.google.common.collect.Multimap;
 import org.apache.storm.state.KeyValueState;
-import org.apache.storm.streams.operations.aggregators.Count;
+import org.apache.storm.streams.operations.StateUpdater;
 import org.apache.storm.streams.processors.Processor;
 import org.apache.storm.streams.processors.UpdateStateByKeyProcessor;
 import org.apache.storm.task.OutputCollector;
@@ -65,7 +65,17 @@ public class StatefulProcessorBoltTest {
 
     @Test
     public void testEmitAndAck() throws Exception {
-        setUpStatefulProcessorBolt(new UpdateStateByKeyProcessor<>(new Count<>()));
+        setUpStatefulProcessorBolt(new UpdateStateByKeyProcessor<>(new StateUpdater<Object, Long>() {
+            @Override
+            public Long init() {
+                return 0L;
+            }
+
+            @Override
+            public Long apply(Long state, Object value) {
+                return state + 1;
+            }
+        }));
         bolt.execute(mockTuple1);
         ArgumentCaptor<Collection> anchor = ArgumentCaptor.forClass(Collection.class);
         ArgumentCaptor<Values> values = ArgumentCaptor.forClass(Values.class);
@@ -80,6 +90,7 @@ public class StatefulProcessorBoltTest {
 
     private void setUpStatefulProcessorBolt(Processor<?> processor) {
         ProcessorNode node = new ProcessorNode(processor, "outputstream", new Fields("value"));
+        node.setEmitsPair(true);
         Mockito.when(mockStreamToProcessors.get(Mockito.anyString())).thenReturn(Collections.singletonList(node));
         graph = new DefaultDirectedGraph(new StreamsEdgeFactory());
         graph.addVertex(node);

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/test/jvm/org/apache/storm/streams/StreamBuilderTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/streams/StreamBuilderTest.java b/storm-core/test/jvm/org/apache/storm/streams/StreamBuilderTest.java
index 1498ae4..c88fe34 100644
--- a/storm-core/test/jvm/org/apache/storm/streams/StreamBuilderTest.java
+++ b/storm-core/test/jvm/org/apache/storm/streams/StreamBuilderTest.java
@@ -28,6 +28,7 @@ import org.apache.storm.streams.operations.aggregators.Count;
 import org.apache.storm.streams.operations.mappers.PairValueMapper;
 import org.apache.storm.streams.operations.mappers.ValueMapper;
 import org.apache.storm.streams.processors.BranchProcessor;
+import org.apache.storm.streams.windowing.TumblingWindows;
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.IRichBolt;
@@ -35,6 +36,7 @@ import org.apache.storm.topology.IRichSpout;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.base.BaseRichBolt;
 import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.topology.base.BaseWindowedBolt;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.utils.Utils;
@@ -115,9 +117,9 @@ public class StreamBuilderTest {
 
     @Test
     public void testGroupBy() throws Exception {
-        PairStream<String, String> stream = streamBuilder.newStream(newSpout(Utils.DEFAULT_STREAM_ID), new PairValueMapper<>(0, 1));
+        PairStream<String, String> stream = streamBuilder.newStream(newSpout(Utils.DEFAULT_STREAM_ID), new PairValueMapper<>(0, 1), 2);
 
-        stream.groupByKey().aggregateByKey(new Count<>());
+        stream.window(TumblingWindows.of(BaseWindowedBolt.Count.of(10))).aggregateByKey(new Count<>());
 
         StormTopology topology = streamBuilder.build();
         assertEquals(2, topology.get_bolts_size());
@@ -129,7 +131,7 @@ public class StreamBuilderTest {
 
     @Test
     public void testGlobalAggregate() throws Exception {
-        Stream<String> stream = streamBuilder.newStream(newSpout(Utils.DEFAULT_STREAM_ID), new ValueMapper<>(0));
+        Stream<String> stream = streamBuilder.newStream(newSpout(Utils.DEFAULT_STREAM_ID), new ValueMapper<>(0), 2);
 
         stream.aggregate(new Count<>());
 
@@ -142,6 +144,7 @@ public class StreamBuilderTest {
         expected1.put(new GlobalStreamId(spoutId, "default"), Grouping.shuffle(new NullStruct()));
         Map<GlobalStreamId, Grouping> expected2 = new HashMap<>();
         expected2.put(new GlobalStreamId("bolt1", "s1"), Grouping.fields(Collections.emptyList()));
+        expected2.put(new GlobalStreamId("bolt1", "s1__punctuation"), Grouping.all(new NullStruct()));
         assertEquals(expected1, bolt1.get_common().get_inputs());
         assertEquals(expected2, bolt2.get_common().get_inputs());
     }
@@ -169,11 +172,57 @@ public class StreamBuilderTest {
     public void testBranchAndJoin() throws Exception {
         TopologyContext mockContext = Mockito.mock(TopologyContext.class);
         OutputCollector mockCollector = Mockito.mock(OutputCollector.class);
-        Stream<Integer> stream = streamBuilder.newStream(newSpout(Utils.DEFAULT_STREAM_ID), new ValueMapper<>(0));
+        Stream<Integer> stream = streamBuilder.newStream(newSpout(Utils.DEFAULT_STREAM_ID), new ValueMapper<>(0), 2);
         Stream<Integer>[] streams = stream.branch(x -> x % 2 == 0, x -> x % 2 == 1);
         PairStream<Integer, Pair<Integer, Integer>> joined = streams[0].mapToPair(x -> Pair.of(x, 1)).join(streams[1].mapToPair(x -> Pair.of(x, 1)));
         assertTrue(joined.getNode() instanceof ProcessorNode);
         StormTopology topology = streamBuilder.build();
+        assertEquals(2, topology.get_bolts_size());
+    }
+
+    @Test
+    public void testMultiPartitionByKey() {
+        TopologyContext mockContext = Mockito.mock(TopologyContext.class);
+        OutputCollector mockCollector = Mockito.mock(OutputCollector.class);
+        Stream<Integer> stream = streamBuilder.newStream(newSpout(Utils.DEFAULT_STREAM_ID), new ValueMapper<>(0));
+        stream.mapToPair(x -> Pair.of(x, x))
+                .window(TumblingWindows.of(BaseWindowedBolt.Count.of(10)))
+                .reduceByKey((x, y) -> x + y)
+                .reduceByKey((x, y) -> 0)
+                .print();
+        StormTopology topology = streamBuilder.build();
+        assertEquals(2, topology.get_bolts_size());
+    }
+
+    @Test
+    public void testMultiPartitionByKeyWithRepartition() {
+        TopologyContext mockContext = Mockito.mock(TopologyContext.class);
+        OutputCollector mockCollector = Mockito.mock(OutputCollector.class);
+        Map<GlobalStreamId, Grouping> expected = new HashMap<>();
+        expected.put(new GlobalStreamId("bolt2", "s3"), Grouping.fields(Collections.singletonList("key")));
+        expected.put(new GlobalStreamId("bolt2", "s3__punctuation"), Grouping.all(new NullStruct()));
+        Stream<Integer> stream = streamBuilder.newStream(newSpout(Utils.DEFAULT_STREAM_ID), new ValueMapper<>(0));
+        stream.mapToPair(x -> Pair.of(x, x))
+                .window(TumblingWindows.of(BaseWindowedBolt.Count.of(10)))
+                .reduceByKey((x, y) -> x + y)
+                .repartition(10)
+                .reduceByKey((x, y) -> 0)
+                .print();
+        StormTopology topology = streamBuilder.build();
+        assertEquals(3, topology.get_bolts_size());
+        assertEquals(expected, topology.get_bolts().get("bolt3").get_common().get_inputs());
+
+    }
+
+    @Test
+    public void testPartitionByKeySinglePartition() {
+        TopologyContext mockContext = Mockito.mock(TopologyContext.class);
+        OutputCollector mockCollector = Mockito.mock(OutputCollector.class);
+        Stream<Integer> stream = streamBuilder.newStream(newSpout(Utils.DEFAULT_STREAM_ID), new ValueMapper<>(0));
+        stream.mapToPair(x -> Pair.of(x, x))
+                .reduceByKey((x, y) -> x + y)
+                .print();
+        StormTopology topology = streamBuilder.build();
         assertEquals(1, topology.get_bolts_size());
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/test/jvm/org/apache/storm/streams/WindowedProcessorBoltTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/streams/WindowedProcessorBoltTest.java b/storm-core/test/jvm/org/apache/storm/streams/WindowedProcessorBoltTest.java
index 7428e3f..1010f6f 100644
--- a/storm-core/test/jvm/org/apache/storm/streams/WindowedProcessorBoltTest.java
+++ b/storm-core/test/jvm/org/apache/storm/streams/WindowedProcessorBoltTest.java
@@ -77,7 +77,7 @@ public class WindowedProcessorBoltTest {
         Mockito.verify(mockOutputCollector, Mockito.times(2)).emit(os.capture(), values.capture());
         assertEquals("outputstream", os.getAllValues().get(0));
         assertEquals(new Values(3L), values.getAllValues().get(0));
-        assertEquals("outputstream", os.getAllValues().get(1));
+        assertEquals("outputstream__punctuation", os.getAllValues().get(1));
         assertEquals(new Values(WindowNode.PUNCTUATION), values.getAllValues().get(1));
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/test/jvm/org/apache/storm/streams/processors/JoinProcessorTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/streams/processors/JoinProcessorTest.java b/storm-core/test/jvm/org/apache/storm/streams/processors/JoinProcessorTest.java
new file mode 100644
index 0000000..a8ace8a
--- /dev/null
+++ b/storm-core/test/jvm/org/apache/storm/streams/processors/JoinProcessorTest.java
@@ -0,0 +1,108 @@
+package org.apache.storm.streams.processors;
+
+import org.apache.storm.streams.Pair;
+import org.apache.storm.streams.operations.PairValueJoiner;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+
+import static org.junit.Assert.*;
+
+public class JoinProcessorTest { // exercises JoinProcessor with inner, left-outer, right-outer and full-outer join settings
+    JoinProcessor<Integer, Pair<Integer, Integer>, Integer, Integer> joinProcessor; // created fresh in each test
+    String leftStream = "left"; // stream name used when feeding leftKeyValeus
+    String rightStream = "right"; // stream name used when feeding rightKeyValues
+    List<Pair<Integer, List<Pair<Integer, Integer>>>> res = new ArrayList<>(); // collects whatever the processor forwards via the context below
+
+    ProcessorContext context = new ProcessorContext() { // minimal stub context that records forwarded results
+        @Override
+        public <T> void forward(T input) {
+            res.add((Pair<Integer, List<Pair<Integer, Integer>>>)input); // unchecked cast; tests only inspect elements via assertEquals
+        }
+
+        @Override
+        public <T> void forward(T input, String stream) { // stream-targeted forwards are deliberately dropped by this stub
+        }
+
+        @Override
+        public boolean isWindowed() {
+            return true; // stub always reports a windowed context
+        }
+
+        @Override
+        public Set<String> getWindowedParentStreams() {
+            return null; // stub provides no parent streams
+        }
+    };
+
+    List<Pair<Integer, Integer>> leftKeyValeus = Arrays.asList( // NOTE(review): "Valeus" is a typo for "Values"; left-side (key, value) input
+            Pair.of(2, 4),
+            Pair.of(5, 25),
+            Pair.of(7, 49)
+    );
+
+    List<Pair<Integer, Integer>> rightKeyValues = Arrays.asList( // right-side (key, value) input; shares keys 2 and 5 with the left side
+            Pair.of(1, 1),
+            Pair.of(2, 8),
+            Pair.of(5, 125),
+            Pair.of(6, 216)
+    );
+
+    @Test
+    public void testInnerJoin() throws Exception {
+        joinProcessor = new JoinProcessor<>(leftStream, rightStream, new PairValueJoiner<>()); // no explicit join types passed
+        processValues();
+        assertEquals(Pair.of(2, Pair.of(4, 8)), res.get(0)); // only keys present on both sides (2 and 5) are checked
+        assertEquals(Pair.of(5, Pair.of(25, 125)), res.get(1));
+    }
+
+    @Test
+    public void testLeftOuterJoin() throws Exception {
+        joinProcessor = new JoinProcessor<>(leftStream, rightStream, new PairValueJoiner<>(),
+                JoinProcessor.JoinType.OUTER, JoinProcessor.JoinType.INNER);
+        processValues();
+        assertEquals(Pair.of(2, Pair.of(4, 8)), res.get(0));
+        assertEquals(Pair.of(5, Pair.of(25, 125)), res.get(1));
+        assertEquals(Pair.of(7, Pair.of(49, null)), res.get(2)); // left-only key 7 is expected with a null right value
+    }
+
+    @Test
+    public void testRightOuterJoin() throws Exception {
+        joinProcessor = new JoinProcessor<>(leftStream, rightStream, new PairValueJoiner<>(),
+                JoinProcessor.JoinType.INNER, JoinProcessor.JoinType.OUTER);
+        processValues();
+        assertEquals(Pair.of(1, Pair.of(null, 1)), res.get(0)); // right-only keys 1 and 6 are expected with a null left value
+        assertEquals(Pair.of(2, Pair.of(4, 8)), res.get(1));
+        assertEquals(Pair.of(5, Pair.of(25, 125)), res.get(2));
+        assertEquals(Pair.of(6, Pair.of(null, 216)), res.get(3));
+    }
+
+    @Test
+    public void testFullOuterJoin() throws Exception {
+        joinProcessor = new JoinProcessor<>(leftStream, rightStream, new PairValueJoiner<>(),
+                JoinProcessor.JoinType.OUTER, JoinProcessor.JoinType.OUTER);
+        processValues();
+        assertEquals(Pair.of(1, Pair.of(null, 1)), res.get(0)); // unmatched keys from either side are expected, padded with null
+        assertEquals(Pair.of(2, Pair.of(4, 8)), res.get(1));
+        assertEquals(Pair.of(5, Pair.of(25, 125)), res.get(2));
+        assertEquals(Pair.of(6, Pair.of(null, 216)), res.get(3));
+        assertEquals(Pair.of(7, Pair.of(49, null)), res.get(4));
+    }
+
+    private void processValues() { // shared driver: reset results, init, feed left then right input, then finish
+        res.clear(); // drop results left over from any earlier run on this instance
+        joinProcessor.init(context);
+        for (Pair<Integer, Integer> kv : leftKeyValeus) {
+            joinProcessor.execute(kv, leftStream);
+        }
+        for (Pair<Integer, Integer> kv : rightKeyValues) {
+            joinProcessor.execute(kv, rightStream);
+        }
+        joinProcessor.finish(); // the tests read res only after this call
+    }
+
+}
\ No newline at end of file


Mime
View raw message