storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [07/12] storm git commit: [STORM-1961] A few fixes and refactoring
Date Tue, 07 Feb 2017 01:28:20 GMT
http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/src/jvm/org/apache/storm/streams/StreamBuilder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/StreamBuilder.java b/storm-core/src/jvm/org/apache/storm/streams/StreamBuilder.java
index e19a0c6..7dff25d 100644
--- a/storm-core/src/jvm/org/apache/storm/streams/StreamBuilder.java
+++ b/storm-core/src/jvm/org/apache/storm/streams/StreamBuilder.java
@@ -30,8 +30,10 @@ import org.apache.storm.streams.operations.mappers.PairValueMapper;
 import org.apache.storm.streams.operations.mappers.TupleValueMapper;
 import org.apache.storm.streams.processors.JoinProcessor;
 import org.apache.storm.streams.processors.MapProcessor;
+import org.apache.storm.streams.processors.Processor;
 import org.apache.storm.streams.processors.StateQueryProcessor;
 import org.apache.storm.streams.processors.StatefulProcessor;
+import org.apache.storm.streams.processors.UpdateStateByKeyProcessor;
 import org.apache.storm.streams.windowing.Window;
 import org.apache.storm.topology.BoltDeclarer;
 import org.apache.storm.topology.IBasicBolt;
@@ -195,18 +197,30 @@ public class StreamBuilder {
     }
 
     Node addNode(Node parent, Node child) {
-        return addNode(parent, child, parent.getParallelism(), parent.getOutputStreams().iterator().next());
+        return addNode(parent, child, parent.getOutputStreams().iterator().next(), parent.getParallelism());
     }
 
     Node addNode(Node parent, Node child, int parallelism) {
-        return addNode(parent, child, parallelism, parent.getOutputStreams().iterator().next());
+        return addNode(parent, child, parent.getOutputStreams().iterator().next(), parallelism);
+    }
+
+    // insert child in-between parent and its current child nodes
+    Node insert(Node parent, Node child) {
+        Node newChild = addNode(parent, child);
+        for (Edge edge : graph.outgoingEdgesOf(parent)) {
+            Node oldChild = edge.getTarget();
+            graph.removeEdge(parent, oldChild);
+            oldChild.removeParentStreams(parent);
+            addNode(newChild, oldChild);
+        }
+        return newChild;
     }
 
     Node addNode(Node parent, Node child, String parentStreamId) {
-        return addNode(parent, child, parent.getParallelism(), parentStreamId);
+        return addNode(parent, child, parentStreamId, parent.getParallelism());
     }
 
-    Node addNode(Node parent, Node child, int parallelism, String parentStreamId) {
+    Node addNode(Node parent, Node child, String parentStreamId, int parallelism) {
         graph.addVertex(child);
         graph.addEdge(parent, child);
         child.setParallelism(parallelism);
@@ -215,34 +229,62 @@ public class StreamBuilder {
         } else {
             child.addParentStream(parent, parentStreamId);
         }
+        if (!(child instanceof PartitionNode)) {
+            if (child.getGroupingInfo() != null) {
+                if (!child.getGroupingInfo().equals(parent.getGroupingInfo())) {
+                    throw new IllegalStateException("Trying to assign grouping info for node" +
+                            " with current grouping info: " + child.getGroupingInfo() +
+                            " to: " + parent.getGroupingInfo() + " Node: " + child);
+                }
+            } else {
+                child.setGroupingInfo(parent.getGroupingInfo());
+            }
+        }
+        if (!(child instanceof WindowNode) && !child.isWindowed()) {
+            child.setWindowed(parent.isWindowed());
+        }
         return child;
     }
 
     private PriorityQueue<Node> queue() {
         // min-heap
         return new PriorityQueue<>(new Comparator<Node>() {
+            /*
+             * Nodes in the descending order of priority.
+             * ProcessorNode has higher priority than partition and window nodes
+             * so that the topological order iterator will group as many processor nodes together as possible.
+             * UpdateStateByKeyProcessor has a higher priority than StateQueryProcessor so that StateQueryProcessor
+             * can be mapped to the same StatefulBolt that UpdateStateByKeyProcessor is part of.
+             */
+            Map<Class<?>, Integer> p = new HashMap<>();
+            {
+                p.put(SpoutNode.class, 0);
+                p.put(UpdateStateByKeyProcessor.class, 1);
+                p.put(ProcessorNode.class, 2);
+                p.put(StateQueryProcessor.class, 3);
+                p.put(PartitionNode.class, 4);
+                p.put(WindowNode.class, 5);
+                p.put(SinkNode.class, 6);
+            }
             @Override
             public int compare(Node n1, Node n2) {
-                return getPriority(n1.getClass()) - getPriority(n2.getClass());
+                return getPriority(n1) - getPriority(n2);
             }
 
-            private int getPriority(Class<? extends Node> clazz) {
-                /*
-                 * Nodes in the descending order of priority.
-                 * ProcessorNode has the highest priority so that the topological order iterator
-                 * will group as many processor nodes together as possible.
-                 */
-                Class<?>[] p = new Class<?>[]{
-                        ProcessorNode.class,
-                        SpoutNode.class,
-                        SinkNode.class,
-                        PartitionNode.class,
-                        WindowNode.class};
-                for (int i = 0; i < p.length; i++) {
-                    if (clazz.equals(p[i])) {
-                        return i;
+            private int getPriority(Node node) {
+                Integer priority;
+                // check if processor has specific priority first
+                if (node instanceof ProcessorNode) {
+                    Processor processor = ((ProcessorNode) node).getProcessor();
+                    priority = p.get(processor.getClass());
+                    if (priority != null) {
+                        return priority;
                     }
                 }
+                priority = p.get(node.getClass());
+                if (priority != null) {
+                    return priority;
+                }
                 return Integer.MAX_VALUE;
             }
         });
@@ -319,7 +361,7 @@ public class StreamBuilder {
         }
     }
 
-    private Node parentNode(Node curNode) {
+    Node parentNode(Node curNode) {
         Set<Node> parentNode = parentNodes(curNode);
         if (parentNode.size() > 1) {
             throw new IllegalArgumentException("Node " + curNode + " has more than one parent node.");
@@ -350,7 +392,6 @@ public class StreamBuilder {
         String boltId = UniqueIdGen.getInstance().getUniqueBoltId();
         for (ProcessorNode processorNode : curGroup) {
             processorNode.setComponentId(boltId);
-            processorNode.setWindowed(isWindowed(processorNode));
             processorNode.setWindowedParentStreams(getWindowedParentStreams(processorNode));
         }
         final Set<ProcessorNode> initialProcessors = initialProcessors(curGroup);
@@ -397,12 +438,7 @@ public class StreamBuilder {
         Set<WindowNode> windowNodes = new HashSet<>();
         Set<Node> parents;
         for (ProcessorNode processorNode : initialProcessors) {
-            if (processorNode.getProcessor() instanceof JoinProcessor) {
-                String leftStream = ((JoinProcessor) processorNode.getProcessor()).getLeftStream();
-                parents = processorNode.getParents(leftStream);
-            } else {
-                parents = parentNodes(processorNode);
-            }
+            parents = parentNodes(processorNode);
             for (Node node : parents) {
                 if (windowInfo.containsKey(node)) {
                     windowNodes.add(windowInfo.get(node));
@@ -438,7 +474,7 @@ public class StreamBuilder {
         }
         for (Node parent : parentNodes(sinkNode)) {
             for (String stream : sinkNode.getParentStreams(parent)) {
-                declareStream(boltDeclarer, parent, stream, nodeGroupingInfo.get(parent, stream));
+                declareGrouping(boltDeclarer, parent, stream, nodeGroupingInfo.get(parent, stream));
             }
         }
     }
@@ -511,16 +547,8 @@ public class StreamBuilder {
     private Set<String> getWindowedParentStreams(ProcessorNode processorNode) {
         Set<String> res = new HashSet<>();
         for (Node parent : parentNodes(processorNode)) {
-            if (parent instanceof ProcessorNode) {
-                ProcessorNode pn = (ProcessorNode) parent;
-                if (pn.isWindowed()) {
-                    res.addAll(Collections2.filter(pn.getOutputStreams(), new Predicate<String>() {
-                        @Override
-                        public boolean apply(String input) {
-                            return !StreamUtil.isSinkStream(input);
-                        }
-                    }));
-                }
+            if (parent instanceof ProcessorNode && parent.isWindowed()) {
+                res.addAll(parent.getOutputStreams());
             }
         }
         return res;
@@ -539,10 +567,14 @@ public class StreamBuilder {
                     LOG.debug("Parent {} of curNode {} is in curGroup {}", parent, curNode, curGroup);
                 } else {
                     for (String stream : curNode.getParentStreams(parent)) {
-                        declareStream(boltDeclarer, parent, stream, nodeGroupingInfo.get(parent, stream));
+                        declareGrouping(boltDeclarer, parent, stream, nodeGroupingInfo.get(parent, stream));
                         // put global stream id for spouts
                         if (parent.getComponentId().startsWith("spout")) {
                             stream = parent.getComponentId() + stream;
+                        } else {
+                            // subscribe to parent's punctuation stream
+                            String punctuationStream = StreamUtil.getPunctuationStream(stream);
+                            declareGrouping(boltDeclarer, parent, punctuationStream, GroupingInfo.all());
                         }
                         streamToInitialProcessor.put(stream, curNode);
                     }
@@ -552,7 +584,7 @@ public class StreamBuilder {
         return streamToInitialProcessor;
     }
 
-    private void declareStream(BoltDeclarer boltDeclarer, Node parent, String streamId, GroupingInfo grouping) {
+    private void declareGrouping(BoltDeclarer boltDeclarer, Node parent, String streamId, GroupingInfo grouping) {
         if (grouping == null) {
             boltDeclarer.shuffleGrouping(parent.getComponentId(), streamId);
         } else {
@@ -572,20 +604,4 @@ public class StreamBuilder {
         }
         return nodes;
     }
-
-    private boolean isWindowed(Node curNode) {
-        for (Node parent : StreamUtil.<Node>getParents(graph, curNode)) {
-            if (parent instanceof WindowNode) {
-                return true;
-            } else if (parent instanceof ProcessorNode) {
-                ProcessorNode p = (ProcessorNode) parent;
-                if (p.isWindowed()) {
-                    return true;
-                }
-            } else {
-                return (parent instanceof PartitionNode) && isWindowed(parent);
-            }
-        }
-        return false;
-    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/src/jvm/org/apache/storm/streams/StreamUtil.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/StreamUtil.java b/storm-core/src/jvm/org/apache/storm/streams/StreamUtil.java
index 0531ff6..e0bf7d3 100644
--- a/storm-core/src/jvm/org/apache/storm/streams/StreamUtil.java
+++ b/storm-core/src/jvm/org/apache/storm/streams/StreamUtil.java
@@ -17,11 +17,14 @@
  */
 package org.apache.storm.streams;
 
+import org.apache.storm.tuple.Fields;
 import org.jgrapht.DirectedGraph;
 
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.storm.streams.WindowNode.PUNCTUATION;
+
 public class StreamUtil {
     @SuppressWarnings("unchecked")
     public static <T> List<T> getParents(DirectedGraph<Node, Edge> graph, Node node) {
@@ -43,20 +46,24 @@ public class StreamUtil {
         return ret;
     }
 
-
-    public static boolean isSinkStream(String streamId) {
-        return streamId.endsWith("__sink");
+    public static boolean isPunctuation(Object value) {
+        return PUNCTUATION.equals(value);
     }
 
-    public static String getSinkStream(String streamId) {
-        return streamId + "__sink";
+    public static String getPunctuationStream(String stream) {
+        return stream + PUNCTUATION;
     }
 
-    public static boolean isPunctuation(Object value) {
-        if (value instanceof Pair) {
-            value = ((Pair) value).getFirst();
+    public static String getSourceStream(String stream) {
+        int idx = stream.lastIndexOf(PUNCTUATION);
+        if (idx > 0) {
+            return stream.substring(0, idx);
         }
-        return WindowNode.PUNCTUATION.equals(value);
+        return stream;
+    }
+
+    public static Fields getPunctuationFields() {
+        return new Fields(PUNCTUATION);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/src/jvm/org/apache/storm/streams/Tuple3.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/Tuple3.java b/storm-core/src/jvm/org/apache/storm/streams/Tuple3.java
deleted file mode 100644
index 77973f2..0000000
--- a/storm-core/src/jvm/org/apache/storm/streams/Tuple3.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.streams;
-
-/**
- * A tuple of three elements along the lines of Scala's Tuple.
- *
- * @param <T1> the type of the first element
- * @param <T2> the type of the second element
- * @param <T3> the type of the third element
- */
-public class Tuple3<T1, T2, T3> {
-    public final T1 _1;
-    public final T2 _2;
-    public final T3 _3;
-
-    /**
-     * Constructs a new tuple of three elements.
-     *
-     * @param _1 the first element
-     * @param _2 the second element
-     * @param _3 the third element
-     */
-    public Tuple3(T1 _1, T2 _2, T3 _3) {
-        this._1 = _1;
-        this._2 = _2;
-        this._3 = _3;
-    }
-
-    @Override
-    public String toString() {
-        return "(" + _1 + "," + _2 + "," + _3 + ")";
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/src/jvm/org/apache/storm/streams/WindowNode.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/WindowNode.java b/storm-core/src/jvm/org/apache/storm/streams/WindowNode.java
index a0e831a..c1783b5 100644
--- a/storm-core/src/jvm/org/apache/storm/streams/WindowNode.java
+++ b/storm-core/src/jvm/org/apache/storm/streams/WindowNode.java
@@ -29,6 +29,7 @@ public class WindowNode extends Node {
 
     WindowNode(Window<?, ?> windowParams, String outputStream, Fields outputFields) {
         super(outputStream, outputFields);
+        setWindowed(true);
         this.windowParams = windowParams;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/src/jvm/org/apache/storm/streams/operations/Aggregator.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/operations/Aggregator.java b/storm-core/src/jvm/org/apache/storm/streams/operations/Aggregator.java
deleted file mode 100644
index e3feaf4..0000000
--- a/storm-core/src/jvm/org/apache/storm/streams/operations/Aggregator.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.streams.operations;
-
-/**
- * Interface for aggregating values.
- *
- * @param <T> the original value type
- * @param <R> the aggregated value type
- */
-public interface Aggregator<T, R> extends Operation {
-    /**
-     * The initial value of the aggregate to start with.
-     *
-     * @return the initial value
-     */
-    R init();
-
-    /**
-     * Returns a new aggregate by applying the value with the current aggregate.
-     *
-     * @param value     the value to aggregate
-     * @param aggregate the current aggregate
-     * @return the new aggregate
-     */
-    R apply(T value, R aggregate);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/src/jvm/org/apache/storm/streams/operations/BiFunction.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/operations/BiFunction.java b/storm-core/src/jvm/org/apache/storm/streams/operations/BiFunction.java
new file mode 100644
index 0000000..7732e47
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/operations/BiFunction.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.operations;
+
+/**
+ * A function that accepts two arguments and produces a result.
+ *
+ * @param <T> the type of the first argument to the function
+ * @param <U> the type of the second argument to the function
+ * @param <R> the type of the result of the function
+ */
+public interface BiFunction<T, U, R> extends Operation {
+    /**
+     * Applies this function to the given arguments.
+     *
+     * @param input1 the first function argument
+     * @param input2 the second function argument
+     * @return the function result
+     */
+
+    R apply(T input1, U input2);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/src/jvm/org/apache/storm/streams/operations/CombinerAggregator.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/operations/CombinerAggregator.java b/storm-core/src/jvm/org/apache/storm/streams/operations/CombinerAggregator.java
new file mode 100644
index 0000000..a74b2c8
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/operations/CombinerAggregator.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.operations;
+
+/**
+ * Interface for aggregating values.
+ *
+ * @param <T> the original value type
+ * @param <A> the accumulator type
+ * @param <R> the result type
+ */
+public interface CombinerAggregator<T, A, R> extends Operation {
+    /**
+     * The initial value of the accumulator to start with.
+     *
+     * @return the initial value of the accumulator
+     */
+    A init();
+
+    /**
+     * Updates the accumulator by applying the value to the current accumulator.
+     *
+     * @param accumulator the current accumulator
+     * @param value       the value
+     * @return the updated accumulator
+     */
+    A apply(A accumulator, T value);
+
+    /**
+     * Merges two accumulators and returns the merged accumulator.
+     *
+     * @param accum1 the first accumulator
+     * @param accum2 the second accumulator
+     * @return the merged accumulator
+     */
+    A merge(A accum1, A accum2);
+
+    /**
+     * Produces a result value out of the accumulator.
+     *
+     * @param accum the accumulator
+     * @return the result
+     */
+    R result(A accum);
+
+    /**
+     * A static factory to create a {@link CombinerAggregator} based on initial value, accumulator and combiner.
+     *
+     * @param initialValue the initial value of the result to start with
+     * @param accumulator  a function that accumulates values into a partial result
+     * @param combiner     a function that combines partially accumulated results
+     * @param <T>          the value type
+     * @param <R>          the result type
+     * @return the {@link CombinerAggregator}
+     */
+    static <T, R> CombinerAggregator<T, R, R> of(R initialValue,
+                                                 BiFunction<? super R, ? super T, ? extends R> accumulator,
+                                                 BiFunction<? super R, ? super R, ? extends R> combiner) {
+        return new CombinerAggregator<T, R, R>() {
+            @Override
+            public R init() {
+                return initialValue;
+            }
+
+            @Override
+            public R apply(R aggregate, T value) {
+                return accumulator.apply(aggregate, value);
+            }
+
+            @Override
+            public R merge(R accum1, R accum2) {
+                return combiner.apply(accum1, accum2);
+            }
+
+            @Override
+            public R result(R accum) {
+                return accum;
+            }
+        };
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/src/jvm/org/apache/storm/streams/operations/Reducer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/operations/Reducer.java b/storm-core/src/jvm/org/apache/storm/streams/operations/Reducer.java
index 04ee70d..48dc6fe 100644
--- a/storm-core/src/jvm/org/apache/storm/streams/operations/Reducer.java
+++ b/storm-core/src/jvm/org/apache/storm/streams/operations/Reducer.java
@@ -23,7 +23,7 @@ package org.apache.storm.streams.operations;
  *
  * @param <T> the type of the arguments and the result
  */
-public interface Reducer<T> extends Operation {
+public interface Reducer<T> extends BiFunction<T, T, T> {
     /**
      * Applies this function to the given arguments.
      *

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/src/jvm/org/apache/storm/streams/operations/StateUpdater.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/operations/StateUpdater.java b/storm-core/src/jvm/org/apache/storm/streams/operations/StateUpdater.java
new file mode 100644
index 0000000..a1b1383
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/operations/StateUpdater.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.operations;
+
+/**
+ * Interface for updating state.
+ *
+ * @param <T> the value type
+ * @param <S> the state type
+ */
+public interface StateUpdater<T, S> extends Operation {
+    /**
+     * The initial value of the state to start with.
+     *
+     * @return the initial value of the state
+     */
+    S init();
+
+    /**
+     * Returns a new state by applying the value on the current state.
+     *
+     * @param state the current state
+     * @param value the value
+     * @return the new state
+     */
+    S apply(S state, T value);
+
+    /**
+     * A static factory to create a {@link StateUpdater} based on an initial value of the state and a
+     * state update function.
+     *
+     * @param initialValue  the initial value of the state
+     * @param stateUpdateFn the state update function
+     * @param <T>           the value type
+     * @param <S>           the state type
+     * @return the {@link StateUpdater}
+     */
+    static <T, S> StateUpdater<T, S> of(S initialValue,
+                                        BiFunction<? super S, ? super T, ? extends S> stateUpdateFn) {
+        return new StateUpdater<T, S>() {
+            @Override
+            public S init() {
+                return initialValue;
+            }
+
+            @Override
+            public S apply(S state, T value) {
+                return stateUpdateFn.apply(state, value);
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/src/jvm/org/apache/storm/streams/operations/aggregators/Count.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/operations/aggregators/Count.java b/storm-core/src/jvm/org/apache/storm/streams/operations/aggregators/Count.java
index fd02d2a..6589ed6 100644
--- a/storm-core/src/jvm/org/apache/storm/streams/operations/aggregators/Count.java
+++ b/storm-core/src/jvm/org/apache/storm/streams/operations/aggregators/Count.java
@@ -17,21 +17,31 @@
  */
 package org.apache.storm.streams.operations.aggregators;
 
-import org.apache.storm.streams.operations.Aggregator;
+import org.apache.storm.streams.operations.CombinerAggregator;
 
 /**
  * Computes the count of values.
  *
  * @param <T> the value type
  */
-public class Count<T> implements Aggregator<T, Long> {
+public class Count<T> implements CombinerAggregator<T, Long, Long> {
     @Override
     public Long init() {
         return 0L;
     }
 
     @Override
-    public Long apply(T value, Long aggregate) {
-        return aggregate + 1;
+    public Long apply(Long accum, T value) {
+        return accum + 1;
+    }
+
+    @Override
+    public Long merge(Long accum1, Long accum2) {
+        return accum1 + accum2;
+    }
+
+    @Override
+    public Long result(Long accum) {
+        return accum;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/src/jvm/org/apache/storm/streams/operations/aggregators/Sum.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/operations/aggregators/Sum.java b/storm-core/src/jvm/org/apache/storm/streams/operations/aggregators/Sum.java
index e232075..df11d99 100644
--- a/storm-core/src/jvm/org/apache/storm/streams/operations/aggregators/Sum.java
+++ b/storm-core/src/jvm/org/apache/storm/streams/operations/aggregators/Sum.java
@@ -17,19 +17,29 @@
  */
 package org.apache.storm.streams.operations.aggregators;
 
-import org.apache.storm.streams.operations.Aggregator;
+import org.apache.storm.streams.operations.CombinerAggregator;
 
 /**
  * Computes the long sum of the input values
  */
-public class Sum implements Aggregator<Number, Long> {
+public class Sum implements CombinerAggregator<Number, Long, Long> {
     @Override
     public Long init() {
         return 0L;
     }
 
     @Override
-    public Long apply(Number value, Long aggregate) {
+    public Long apply(Long aggregate, Number value) {
         return value.longValue() + aggregate;
     }
+
+    @Override
+    public Long merge(Long accum1, Long accum2) {
+        return accum1 + accum2;
+    }
+
+    @Override
+    public Long result(Long accum) {
+        return accum;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/src/jvm/org/apache/storm/streams/operations/mappers/TupleValueMappers.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/operations/mappers/TupleValueMappers.java b/storm-core/src/jvm/org/apache/storm/streams/operations/mappers/TupleValueMappers.java
new file mode 100644
index 0000000..f900f84
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/operations/mappers/TupleValueMappers.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.operations.mappers;
+
+import org.apache.storm.streams.tuple.Tuple3;
+import org.apache.storm.streams.tuple.Tuple4;
+import org.apache.storm.streams.tuple.Tuple5;
+import org.apache.storm.streams.tuple.Tuple6;
+import org.apache.storm.streams.tuple.Tuple7;
+import org.apache.storm.streams.tuple.Tuple8;
+import org.apache.storm.streams.tuple.Tuple9;
+import org.apache.storm.streams.tuple.Tuple10;
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * Factory for constructing typed tuples from a {@link Tuple}
+ * based on indices.
+ */
+@SuppressWarnings("unchecked")
+public final class TupleValueMappers {
+    private TupleValueMappers() {
+    }
+
+    public static <T1, T2, T3>
+    TupleValueMapper<Tuple3<T1, T2, T3>> of(int index1,
+                                            int index2,
+                                            int index3) {
+        return input -> new Tuple3<>(
+                (T1) input.getValue(index1),
+                (T2) input.getValue(index2),
+                (T3) input.getValue(index3));
+    }
+
+    public static <T1, T2, T3, T4>
+    TupleValueMapper<Tuple4<T1, T2, T3, T4>> of(int index1,
+                                                int index2,
+                                                int index3,
+                                                int index4) {
+        return input -> new Tuple4<>(
+                (T1) input.getValue(index1),
+                (T2) input.getValue(index2),
+                (T3) input.getValue(index3),
+                (T4) input.getValue(index4));
+    }
+
+    public static <T1, T2, T3, T4, T5>
+    TupleValueMapper<Tuple5<T1, T2, T3, T4, T5>> of(int index1,
+                                                    int index2,
+                                                    int index3,
+                                                    int index4,
+                                                    int index5) {
+        return input -> new Tuple5<>(
+                (T1) input.getValue(index1),
+                (T2) input.getValue(index2),
+                (T3) input.getValue(index3),
+                (T4) input.getValue(index4),
+                (T5) input.getValue(index5));
+    }
+
+    public static <T1, T2, T3, T4, T5, T6>
+    TupleValueMapper<Tuple6<T1, T2, T3, T4, T5, T6>> of(int index1,
+                                                        int index2,
+                                                        int index3,
+                                                        int index4,
+                                                        int index5,
+                                                        int index6) {
+        return input -> new Tuple6<>(
+                (T1) input.getValue(index1),
+                (T2) input.getValue(index2),
+                (T3) input.getValue(index3),
+                (T4) input.getValue(index4),
+                (T5) input.getValue(index5),
+                (T6) input.getValue(index6));
+    }
+
+    public static <T1, T2, T3, T4, T5, T6, T7>
+    TupleValueMapper<Tuple7<T1, T2, T3, T4, T5, T6, T7>> of(int index1,
+                                                            int index2,
+                                                            int index3,
+                                                            int index4,
+                                                            int index5,
+                                                            int index6,
+                                                            int index7) {
+        return input -> new Tuple7<>(
+                (T1) input.getValue(index1),
+                (T2) input.getValue(index2),
+                (T3) input.getValue(index3),
+                (T4) input.getValue(index4),
+                (T5) input.getValue(index5),
+                (T6) input.getValue(index6),
+                (T7) input.getValue(index7));
+    }
+
+    public static <T1, T2, T3, T4, T5, T6, T7, T8>
+    TupleValueMapper<Tuple8<T1, T2, T3, T4, T5, T6, T7, T8>> of(int index1,
+                                                                int index2,
+                                                                int index3,
+                                                                int index4,
+                                                                int index5,
+                                                                int index6,
+                                                                int index7,
+                                                                int index8) {
+        return input -> new Tuple8<>(
+                (T1) input.getValue(index1),
+                (T2) input.getValue(index2),
+                (T3) input.getValue(index3),
+                (T4) input.getValue(index4),
+                (T5) input.getValue(index5),
+                (T6) input.getValue(index6),
+                (T7) input.getValue(index7),
+                (T8) input.getValue(index8));
+    }
+
+    public static <T1, T2, T3, T4, T5, T6, T7, T8, T9>
+    TupleValueMapper<Tuple9<T1, T2, T3, T4, T5, T6, T7, T8, T9>> of(int index1,
+                                                                    int index2,
+                                                                    int index3,
+                                                                    int index4,
+                                                                    int index5,
+                                                                    int index6,
+                                                                    int index7,
+                                                                    int index8,
+                                                                    int index9) {
+        return input -> new Tuple9<>(
+                (T1) input.getValue(index1),
+                (T2) input.getValue(index2),
+                (T3) input.getValue(index3),
+                (T4) input.getValue(index4),
+                (T5) input.getValue(index5),
+                (T6) input.getValue(index6),
+                (T7) input.getValue(index7),
+                (T8) input.getValue(index8),
+                (T9) input.getValue(index9));
+    }
+
+    public static <T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>
+    TupleValueMapper<Tuple10<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10>> of(int index1,
+                                                                          int index2,
+                                                                          int index3,
+                                                                          int index4,
+                                                                          int index5,
+                                                                          int index6,
+                                                                          int index7,
+                                                                          int index8,
+                                                                          int index9,
+                                                                          int index10) {
+        return input -> new Tuple10<>(
+                (T1) input.getValue(index1),
+                (T2) input.getValue(index2),
+                (T3) input.getValue(index3),
+                (T4) input.getValue(index4),
+                (T5) input.getValue(index5),
+                (T6) input.getValue(index6),
+                (T7) input.getValue(index7),
+                (T8) input.getValue(index8),
+                (T9) input.getValue(index9),
+                (T10) input.getValue(index10));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/src/jvm/org/apache/storm/streams/processors/AggregateByKeyProcessor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/processors/AggregateByKeyProcessor.java b/storm-core/src/jvm/org/apache/storm/streams/processors/AggregateByKeyProcessor.java
index d53a52d..c10113b 100644
--- a/storm-core/src/jvm/org/apache/storm/streams/processors/AggregateByKeyProcessor.java
+++ b/storm-core/src/jvm/org/apache/storm/streams/processors/AggregateByKeyProcessor.java
@@ -18,37 +18,60 @@
 package org.apache.storm.streams.processors;
 
 import org.apache.storm.streams.Pair;
-import org.apache.storm.streams.operations.Aggregator;
+import org.apache.storm.streams.operations.CombinerAggregator;
 
 import java.util.HashMap;
 import java.util.Map;
 
-public class AggregateByKeyProcessor<K, V, R> extends BaseProcessor<Pair<K, V>> implements BatchProcessor {
-    private final Aggregator<V, R> aggregator;
-    private final Map<K, R> state = new HashMap<>();
+public class AggregateByKeyProcessor<K, V, A, R> extends BaseProcessor<Pair<K, V>> implements BatchProcessor {
+    private final CombinerAggregator<V, A, R> aggregator;
+    private final boolean emitAggregate;
+    private final Map<K, A> state = new HashMap<>();
 
-    public AggregateByKeyProcessor(Aggregator<V, R> aggregator) {
+    public AggregateByKeyProcessor(CombinerAggregator<V, A, R> aggregator) {
+        this(aggregator, false);
+    }
+
+    public AggregateByKeyProcessor(CombinerAggregator<V, A, R> aggregator, boolean emitAggregate) {
         this.aggregator = aggregator;
+        this.emitAggregate = emitAggregate;
     }
 
     @Override
     public void execute(Pair<K, V> input) {
         K key = input.getFirst();
         V val = input.getSecond();
-        R agg = state.get(key);
-        if (agg == null) {
-            agg = aggregator.init();
+        A accumulator = state.get(key);
+        if (accumulator == null) {
+            accumulator = aggregator.init();
+        }
+        state.put(key, aggregator.apply(accumulator, val));
+        if (emitAggregate) {
+            mayBeForwardAggUpdate(Pair.of(key, state.get(key)));
+        } else {
+            mayBeForwardAggUpdate(Pair.of(key, aggregator.result(state.get(key))));
         }
-        state.put(key, aggregator.apply(val, agg));
-        mayBeForwardAggUpdate(Pair.of(key, state.get(key)));
     }
 
     @Override
     public void finish() {
-        for (Map.Entry<K, R> entry : state.entrySet()) {
-            context.forward(Pair.of(entry.getKey(), entry.getValue()));
+        for (Map.Entry<K, A> entry : state.entrySet()) {
+            if (emitAggregate) {
+                context.forward(Pair.of(entry.getKey(), entry.getValue()));
+            } else {
+                context.forward(Pair.of(entry.getKey(), aggregator.result(entry.getValue())));
+            }
+
         }
         state.clear();
     }
 
+    @Override
+    public String toString() {
+        return "AggregateByKeyProcessor{" +
+                "aggregator=" + aggregator +
+                ", emitAggregate=" + emitAggregate +
+                ", state=" + state +
+                "}";
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/src/jvm/org/apache/storm/streams/processors/AggregateProcessor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/processors/AggregateProcessor.java b/storm-core/src/jvm/org/apache/storm/streams/processors/AggregateProcessor.java
index c5a1906..d169345 100644
--- a/storm-core/src/jvm/org/apache/storm/streams/processors/AggregateProcessor.java
+++ b/storm-core/src/jvm/org/apache/storm/streams/processors/AggregateProcessor.java
@@ -17,14 +17,20 @@
  */
 package org.apache.storm.streams.processors;
 
-import org.apache.storm.streams.operations.Aggregator;
+import org.apache.storm.streams.operations.CombinerAggregator;
 
-public class AggregateProcessor<T, R> extends BaseProcessor<T> implements BatchProcessor {
-    private final Aggregator<T, R> aggregator;
-    private R state;
+public class AggregateProcessor<T, A, R> extends BaseProcessor<T> implements BatchProcessor {
+    private final CombinerAggregator<T, A, R> aggregator;
+    private final boolean emitAggregate;
+    private A state;
 
-    public AggregateProcessor(Aggregator<T, R> aggregator) {
+    public AggregateProcessor(CombinerAggregator<T, A, R> aggregator) {
+        this(aggregator, false);
+    }
+
+    public AggregateProcessor(CombinerAggregator<T, A, R> aggregator, boolean emitAggregate) {
         this.aggregator = aggregator;
+        this.emitAggregate = emitAggregate;
     }
 
     @Override
@@ -32,14 +38,32 @@ public class AggregateProcessor<T, R> extends BaseProcessor<T> implements BatchP
         if (state == null) {
             state = aggregator.init();
         }
-        R curAggregate = (state != null) ? state : aggregator.init();
-        state = aggregator.apply(input, curAggregate);
-        mayBeForwardAggUpdate(state);
+        state = aggregator.apply(state, input);
+        if (emitAggregate) {
+            mayBeForwardAggUpdate(state);
+        } else {
+            mayBeForwardAggUpdate(aggregator.result(state));
+        }
     }
 
     @Override
     public void finish() {
-        context.forward(state);
-        state = null;
+        if (state != null) {
+            if (emitAggregate) {
+                context.forward(state);
+            } else {
+                context.forward(aggregator.result(state));
+            }
+            state = null;
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "AggregateProcessor{" +
+                "aggregator=" + aggregator +
+                ", emitAggregate=" + emitAggregate +
+                ", state=" + state +
+                "}";
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/src/jvm/org/apache/storm/streams/processors/BranchProcessor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/processors/BranchProcessor.java b/storm-core/src/jvm/org/apache/storm/streams/processors/BranchProcessor.java
index f8bc739..421a5a7 100644
--- a/storm-core/src/jvm/org/apache/storm/streams/processors/BranchProcessor.java
+++ b/storm-core/src/jvm/org/apache/storm/streams/processors/BranchProcessor.java
@@ -23,15 +23,15 @@ import java.util.HashMap;
 import java.util.Map;
 
 public class BranchProcessor<T> extends BaseProcessor<T> {
-    private final Map<Predicate<T>, String> predicateToStream = new HashMap<>();
+    private final Map<Predicate<? super T>, String> predicateToStream = new HashMap<>();
 
-    public void addPredicate(Predicate<T> predicate, String stream) {
+    public void addPredicate(Predicate<? super T> predicate, String stream) {
         predicateToStream.put(predicate, stream);
     }
 
     @Override
     public void execute(T input) {
-        for (Map.Entry<Predicate<T>, String> entry : predicateToStream.entrySet()) {
+        for (Map.Entry<Predicate<? super T>, String> entry : predicateToStream.entrySet()) {
             if (entry.getKey().test(input)) {
                 context.forward(input, entry.getValue());
                 break;

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/src/jvm/org/apache/storm/streams/processors/EmittingProcessorContext.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/processors/EmittingProcessorContext.java b/storm-core/src/jvm/org/apache/storm/streams/processors/EmittingProcessorContext.java
index d841080..a85eaf7 100644
--- a/storm-core/src/jvm/org/apache/storm/streams/processors/EmittingProcessorContext.java
+++ b/storm-core/src/jvm/org/apache/storm/streams/processors/EmittingProcessorContext.java
@@ -46,11 +46,11 @@ public class EmittingProcessorContext implements ProcessorContext {
     private static final Logger LOG = LoggerFactory.getLogger(EmittingProcessorContext.class);
     private final ProcessorNode processorNode;
     private final String outputStreamId;
+    private final String punctuationStreamId;
     private final OutputCollector collector;
     private final Fields outputFields;
     private final Values punctuation;
     private final List<RefCountedTuple> anchors = new ArrayList<>();
-    private boolean emitPunctuation = true;
     private long eventTimestamp;
     private String timestampField;
 
@@ -59,23 +59,20 @@ public class EmittingProcessorContext implements ProcessorContext {
         this.outputStreamId = outputStreamId;
         this.collector = collector;
         outputFields = processorNode.getOutputFields();
-        punctuation = createPunctuation();
+        punctuation = new Values(PUNCTUATION);
+        punctuationStreamId = StreamUtil.getPunctuationStream(outputStreamId);
     }
 
     @Override
     public <T> void forward(T input) {
-        if (input instanceof Pair) {
-            Pair<?, ?> value = (Pair<?, ?>) input;
-            emit(new Values(value.getFirst(), value.getSecond()));
-        } else if (PUNCTUATION.equals(input)) {
-            if (emitPunctuation) {
-                emit(punctuation);
-            } else {
-                LOG.debug("Not emitting punctuation since emitPunctuation is false");
-            }
+        if (PUNCTUATION.equals(input)) {
+            emit(punctuation, punctuationStreamId);
             maybeAck();
+        } else if (processorNode.emitsPair()) {
+            Pair<?, ?> value = (Pair<?, ?>) input;
+            emit(new Values(value.getFirst(), value.getSecond()), outputStreamId);
         } else {
-            emit(new Values(input));
+            emit(new Values(input), outputStreamId);
         }
     }
 
@@ -96,10 +93,6 @@ public class EmittingProcessorContext implements ProcessorContext {
         return processorNode.getWindowedParentStreams();
     }
 
-    public void setEmitPunctuation(boolean emitPunctuation) {
-        this.emitPunctuation = emitPunctuation;
-    }
-
     public void setTimestampField(String fieldName) {
         timestampField = fieldName;
     }
@@ -128,14 +121,6 @@ public class EmittingProcessorContext implements ProcessorContext {
         this.eventTimestamp = timestamp;
     }
 
-    private Values createPunctuation() {
-        Values values = new Values();
-        for (int i = 0; i < outputFields.size(); i++) {
-            values.add(PUNCTUATION);
-        }
-        return values;
-    }
-
     private void maybeAck() {
         if (!anchors.isEmpty()) {
             for (RefCountedTuple anchor : anchors) {
@@ -154,7 +139,7 @@ public class EmittingProcessorContext implements ProcessorContext {
         return anchors.stream().map(RefCountedTuple::tuple).collect(Collectors.toList());
     }
 
-    private void emit(Values values) {
+    private void emit(Values values, String outputStreamId) {
         if (timestampField != null) {
             values.add(eventTimestamp);
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/src/jvm/org/apache/storm/streams/processors/JoinProcessor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/processors/JoinProcessor.java b/storm-core/src/jvm/org/apache/storm/streams/processors/JoinProcessor.java
index d56cfea..05cad8d 100644
--- a/storm-core/src/jvm/org/apache/storm/streams/processors/JoinProcessor.java
+++ b/storm-core/src/jvm/org/apache/storm/streams/processors/JoinProcessor.java
@@ -20,12 +20,14 @@ package org.apache.storm.streams.processors;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.Multimap;
 import org.apache.storm.streams.Pair;
-import org.apache.storm.streams.Tuple3;
 import org.apache.storm.streams.operations.ValueJoiner;
+import org.apache.storm.streams.tuple.Tuple3;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 /**
  * Provides equi-join implementation based on simple hash-join.
@@ -36,11 +38,25 @@ public class JoinProcessor<K, R, V1, V2> extends BaseProcessor<Pair<K, ?>> imple
     private final String rightStream;
     private final List<Pair<K, V1>> leftRows = new ArrayList<>();
     private final List<Pair<K, V2>> rightRows = new ArrayList<>();
+    private final JoinType leftType;
+    private final JoinType rightType;
+
+    public enum JoinType {
+        INNER,
+        OUTER
+    }
 
     public JoinProcessor(String leftStream, String rightStream, ValueJoiner<V1, V2, R> valueJoiner) {
+        this(leftStream, rightStream, valueJoiner, JoinType.INNER, JoinType.INNER);
+    }
+
+    public JoinProcessor(String leftStream, String rightStream, ValueJoiner<V1, V2, R> valueJoiner,
+                         JoinType leftType, JoinType rightType) {
         this.valueJoiner = valueJoiner;
         this.leftStream = leftStream;
         this.rightStream = rightStream;
+        this.leftType = leftType;
+        this.rightType = rightType;
     }
 
     @Override
@@ -78,30 +94,54 @@ public class JoinProcessor<K, R, V1, V2> extends BaseProcessor<Pair<K, ?>> imple
         return rightStream;
     }
 
+    /*
+     * performs a hash-join by constructing a hash map of the smaller set, iterating over the
+     * larger set and finding matching rows in the hash map.
+     */
     private void joinAndForward(List<Pair<K, V1>> leftRows, List<Pair<K, V2>> rightRows) {
-        if (leftRows.size() <= rightRows.size()) {
-            for (Tuple3<K, V1, V2> res : join(getJoinTable(leftRows), rightRows)) {
+        if (leftRows.size() < rightRows.size()) {
+            for (Tuple3<K, V1, V2> res : join(getJoinTable(leftRows), rightRows, leftType, rightType)) {
                 context.forward(Pair.of(res._1, valueJoiner.apply(res._2, res._3)));
             }
         } else {
-            for (Tuple3<K, V2, V1> res : join(getJoinTable(rightRows), leftRows)) {
+            for (Tuple3<K, V2, V1> res : join(getJoinTable(rightRows), leftRows, rightType, leftType)) {
                 context.forward(Pair.of(res._1, valueJoiner.apply(res._3, res._2)));
             }
         }
     }
 
-    private <T1, T2> List<Tuple3<K, T1, T2>> join(Multimap<K, T1> tab, List<Pair<K, T2>> rows) {
+    /*
+     * returns list of Tuple3 (key, val from table, val from row)
+     */
+    private <T1, T2> List<Tuple3<K, T1, T2>> join(Multimap<K, T1> tab, List<Pair<K, T2>> rows,
+                                                  JoinType leftType, JoinType rightType) {
         List<Tuple3<K, T1, T2>> res = new ArrayList<>();
         for (Pair<K, T2> row : rows) {
-            for (T1 mapValue : tab.get(row.getFirst())) {
-                if (mapValue != null) {
+            K key = row.getFirst();
+            Collection<T1> values = tab.removeAll(key);
+            if (values.isEmpty()) {
+                if (rightType == JoinType.OUTER) {
+                    res.add(new Tuple3<>(row.getFirst(), null, row.getSecond()));
+                }
+            } else {
+                for (T1 mapValue : values) {
                     res.add(new Tuple3<>(row.getFirst(), mapValue, row.getSecond()));
                 }
             }
         }
+        // whatever remains in tab are the non-matching left rows.
+        if (leftType == JoinType.OUTER) {
+            for (Map.Entry<K, T1> row : tab.entries()) {
+                res.add(new Tuple3<>(row.getKey(), row.getValue(), null));
+            }
+        }
         return res;
     }
 
+    /*
+     * key1 -> (val1, val2 ..)
+     * key2 -> (val3, val4 ..)
+     */
     private <T> Multimap<K, T> getJoinTable(List<Pair<K, T>> rows) {
         Multimap<K, T> m = ArrayListMultimap.create();
         for (Pair<K, T> v : rows) {

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/src/jvm/org/apache/storm/streams/processors/MergeAggregateByKeyProcessor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/processors/MergeAggregateByKeyProcessor.java b/storm-core/src/jvm/org/apache/storm/streams/processors/MergeAggregateByKeyProcessor.java
new file mode 100644
index 0000000..57ad845
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/processors/MergeAggregateByKeyProcessor.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.processors;
+
+import org.apache.storm.streams.Pair;
+import org.apache.storm.streams.operations.CombinerAggregator;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class MergeAggregateByKeyProcessor<K, V, A, R> extends BaseProcessor<Pair<K, A>> implements BatchProcessor {
+    protected final CombinerAggregator<V, A, R> aggregator;
+    protected final Map<K, A> state = new HashMap<>();
+
+    public MergeAggregateByKeyProcessor(CombinerAggregator<V, A, R> aggregator) {
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    public void execute(Pair<K, A> input) {
+        K key = input.getFirst();
+        A val = input.getSecond();
+        A accumulator = state.get(key);
+        if (accumulator == null) {
+            accumulator = aggregator.init();
+        }
+        state.put(key, aggregator.merge(accumulator, val));
+        mayBeForwardAggUpdate(Pair.of(key, aggregator.result(state.get(key))));
+    }
+
+    @Override
+    public void finish() {
+        for (Map.Entry<K, A> entry : state.entrySet()) {
+            context.forward(Pair.of(entry.getKey(), aggregator.result(entry.getValue())));
+        }
+        state.clear();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/src/jvm/org/apache/storm/streams/processors/MergeAggregateProcessor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/processors/MergeAggregateProcessor.java b/storm-core/src/jvm/org/apache/storm/streams/processors/MergeAggregateProcessor.java
new file mode 100644
index 0000000..61b555b
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/processors/MergeAggregateProcessor.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.processors;
+
+import org.apache.storm.streams.operations.CombinerAggregator;
+
+public class MergeAggregateProcessor<T, A, R> extends BaseProcessor<A> implements BatchProcessor {
+    private final CombinerAggregator<T, A, R> aggregator;
+    private A state;
+
+    public MergeAggregateProcessor(CombinerAggregator<T, A, R> aggregator) {
+        this.aggregator = aggregator;
+    }
+
+    @Override
+    protected void execute(A input) {
+        if (state == null) {
+            state = aggregator.init();
+        }
+        state = aggregator.merge(state, input);
+        mayBeForwardAggUpdate(aggregator.result(state));
+    }
+
+    @Override
+    public void finish() {
+        if (state != null) {
+            context.forward(aggregator.result(state));
+            state = null;
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/src/jvm/org/apache/storm/streams/processors/ReduceProcessor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/processors/ReduceProcessor.java b/storm-core/src/jvm/org/apache/storm/streams/processors/ReduceProcessor.java
index d64e114..0b90fb9 100644
--- a/storm-core/src/jvm/org/apache/storm/streams/processors/ReduceProcessor.java
+++ b/storm-core/src/jvm/org/apache/storm/streams/processors/ReduceProcessor.java
@@ -35,7 +35,9 @@ public class ReduceProcessor<T> extends BaseProcessor<T> implements BatchProcess
 
     @Override
     public void finish() {
-        context.forward(agg);
-        agg = null;
+        if (agg != null) {
+            context.forward(agg);
+            agg = null;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/src/jvm/org/apache/storm/streams/processors/UpdateStateByKeyProcessor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/processors/UpdateStateByKeyProcessor.java b/storm-core/src/jvm/org/apache/storm/streams/processors/UpdateStateByKeyProcessor.java
index 9e068a0..5fb2730 100644
--- a/storm-core/src/jvm/org/apache/storm/streams/processors/UpdateStateByKeyProcessor.java
+++ b/storm-core/src/jvm/org/apache/storm/streams/processors/UpdateStateByKeyProcessor.java
@@ -19,14 +19,14 @@ package org.apache.storm.streams.processors;
 
 import org.apache.storm.state.KeyValueState;
 import org.apache.storm.streams.Pair;
-import org.apache.storm.streams.operations.Aggregator;
+import org.apache.storm.streams.operations.StateUpdater;
 
 public class UpdateStateByKeyProcessor<K, V, R> extends BaseProcessor<Pair<K, V>> implements StatefulProcessor<K, R> {
-    private final Aggregator<V, R> aggregator;
+    private final StateUpdater<V, R> stateUpdater;
     private KeyValueState<K, R> keyValueState;
 
-    public UpdateStateByKeyProcessor(Aggregator<V, R> aggregator) {
-        this.aggregator = aggregator;
+    public UpdateStateByKeyProcessor(StateUpdater<V, R> stateUpdater) {
+        this.stateUpdater = stateUpdater;
     }
 
     @Override
@@ -40,9 +40,9 @@ public class UpdateStateByKeyProcessor<K, V, R> extends BaseProcessor<Pair<K, V>
         V val = input.getSecond();
         R agg = keyValueState.get(key);
         if (agg == null) {
-            agg = aggregator.init();
+            agg = stateUpdater.init();
         }
-        R newAgg = aggregator.apply(val, agg);
+        R newAgg = stateUpdater.apply(agg, val);
         keyValueState.put(key, newAgg);
         context.forward(Pair.of(key, newAgg));
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/src/jvm/org/apache/storm/streams/tuple/Tuple10.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/tuple/Tuple10.java b/storm-core/src/jvm/org/apache/storm/streams/tuple/Tuple10.java
new file mode 100644
index 0000000..879d71c
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/tuple/Tuple10.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.tuple;
+
+/**
+ * A tuple of ten elements, exposed as the public final fields {@code _1} to {@code _10}, along the lines of Scala's Tuple.
+ *
+ * @param <T1>  the type of the first element
+ * @param <T2>  the type of the second element
+ * @param <T3>  the type of the third element
+ * @param <T4>  the type of the fourth element
+ * @param <T5>  the type of the fifth element
+ * @param <T6>  the type of the sixth element
+ * @param <T7>  the type of the seventh element
+ * @param <T8>  the type of the eighth element
+ * @param <T9>  the type of the ninth element
+ * @param <T10> the type of the tenth element
+ */
+public class Tuple10<T1, T2, T3, T4, T5, T6, T7, T8, T9, T10> { // NOTE(review): not final, unlike Tuple3 and Tuple4 in this patch; confirm this is intentional
+    public final T1 _1;
+    public final T2 _2;
+    public final T3 _3;
+    public final T4 _4;
+    public final T5 _5;
+    public final T6 _6;
+    public final T7 _7;
+    public final T8 _8;
+    public final T9 _9;
+    public final T10 _10;
+
+    /**
+     * Constructs a new tuple, storing each argument in the field of the same name; no null checks are performed.
+     *
+     * @param _1  the first element
+     * @param _2  the second element
+     * @param _3  the third element
+     * @param _4  the fourth element
+     * @param _5  the fifth element
+     * @param _6  the sixth element
+     * @param _7  the seventh element
+     * @param _8  the eighth element
+     * @param _9  the ninth element
+     * @param _10 the tenth element
+     */
+    public Tuple10(T1 _1, T2 _2, T3 _3, T4 _4, T5 _5, T6 _6, T7 _7, T8 _8, T9 _9, T10 _10) {
+        this._1 = _1;
+        this._2 = _2;
+        this._3 = _3;
+        this._4 = _4;
+        this._5 = _5;
+        this._6 = _6;
+        this._7 = _7;
+        this._8 = _8;
+        this._9 = _9;
+        this._10 = _10;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true; // same reference
+        if (o == null || getClass() != o.getClass()) return false; // null or a different runtime class is never equal
+
+        Tuple10<?, ?, ?, ?, ?, ?, ?, ?, ?, ?> tuple10 = (Tuple10<?, ?, ?, ?, ?, ?, ?, ?, ?, ?>) o;
+
+        if (_1 != null ? !_1.equals(tuple10._1) : tuple10._1 != null) return false; // null-safe: two null elements count as equal
+        if (_2 != null ? !_2.equals(tuple10._2) : tuple10._2 != null) return false;
+        if (_3 != null ? !_3.equals(tuple10._3) : tuple10._3 != null) return false;
+        if (_4 != null ? !_4.equals(tuple10._4) : tuple10._4 != null) return false;
+        if (_5 != null ? !_5.equals(tuple10._5) : tuple10._5 != null) return false;
+        if (_6 != null ? !_6.equals(tuple10._6) : tuple10._6 != null) return false;
+        if (_7 != null ? !_7.equals(tuple10._7) : tuple10._7 != null) return false;
+        if (_8 != null ? !_8.equals(tuple10._8) : tuple10._8 != null) return false;
+        if (_9 != null ? !_9.equals(tuple10._9) : tuple10._9 != null) return false;
+        return _10 != null ? _10.equals(tuple10._10) : tuple10._10 == null; // the last element decides once all others matched
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = _1 != null ? _1.hashCode() : 0; // a null element contributes 0
+        result = 31 * result + (_2 != null ? _2.hashCode() : 0); // fold in each further element in order, multiplying by 31
+        result = 31 * result + (_3 != null ? _3.hashCode() : 0);
+        result = 31 * result + (_4 != null ? _4.hashCode() : 0);
+        result = 31 * result + (_5 != null ? _5.hashCode() : 0);
+        result = 31 * result + (_6 != null ? _6.hashCode() : 0);
+        result = 31 * result + (_7 != null ? _7.hashCode() : 0);
+        result = 31 * result + (_8 != null ? _8.hashCode() : 0);
+        result = 31 * result + (_9 != null ? _9.hashCode() : 0);
+        result = 31 * result + (_10 != null ? _10.hashCode() : 0);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "(" + _1 + "," + _2 + "," + _3 + "," + _4 + "," + _5 + "," + _6 + "," + _7 + "," + _8 + "," + _9 + "," + _10 + ")"; // elements in order, comma-separated without spaces, in parentheses
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/src/jvm/org/apache/storm/streams/tuple/Tuple3.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/tuple/Tuple3.java b/storm-core/src/jvm/org/apache/storm/streams/tuple/Tuple3.java
new file mode 100644
index 0000000..514e169
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/tuple/Tuple3.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.tuple;
+
+/**
+ * A tuple of three elements, exposed as the public final fields {@code _1} to {@code _3}, along the lines of Scala's Tuple.
+ *
+ * @param <T1> the type of the first element
+ * @param <T2> the type of the second element
+ * @param <T3> the type of the third element
+ */
+public final class Tuple3<T1, T2, T3> {
+    public final T1 _1;
+    public final T2 _2;
+    public final T3 _3;
+
+    /**
+     * Constructs a new tuple, storing each argument in the field of the same name; no null checks are performed.
+     *
+     * @param _1 the first element
+     * @param _2 the second element
+     * @param _3 the third element
+     */
+    public Tuple3(T1 _1, T2 _2, T3 _3) {
+        this._1 = _1;
+        this._2 = _2;
+        this._3 = _3;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true; // same reference
+        if (o == null || getClass() != o.getClass()) return false; // null or a different runtime class is never equal
+
+        Tuple3<?, ?, ?> tuple3 = (Tuple3<?, ?, ?>) o;
+
+        if (_1 != null ? !_1.equals(tuple3._1) : tuple3._1 != null) return false; // null-safe: two null elements count as equal
+        if (_2 != null ? !_2.equals(tuple3._2) : tuple3._2 != null) return false;
+        return _3 != null ? _3.equals(tuple3._3) : tuple3._3 == null; // the last element decides once all others matched
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = _1 != null ? _1.hashCode() : 0; // a null element contributes 0
+        result = 31 * result + (_2 != null ? _2.hashCode() : 0); // fold in each further element in order, multiplying by 31
+        result = 31 * result + (_3 != null ? _3.hashCode() : 0);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "(" + _1 + "," + _2 + "," + _3 + ")"; // elements in order, comma-separated without spaces, in parentheses
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/src/jvm/org/apache/storm/streams/tuple/Tuple4.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/tuple/Tuple4.java b/storm-core/src/jvm/org/apache/storm/streams/tuple/Tuple4.java
new file mode 100644
index 0000000..5ae3fb1
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/tuple/Tuple4.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.tuple;
+
+/**
+ * A tuple of four elements, exposed as the public final fields {@code _1} to {@code _4}, along the lines of Scala's Tuple.
+ *
+ * @param <T1> the type of the first element
+ * @param <T2> the type of the second element
+ * @param <T3> the type of the third element
+ * @param <T4> the type of the fourth element
+ */
+public final class Tuple4<T1, T2, T3, T4> {
+    public final T1 _1;
+    public final T2 _2;
+    public final T3 _3;
+    public final T4 _4;
+
+    /**
+     * Constructs a new tuple, storing each argument in the field of the same name; no null checks are performed.
+     *
+     * @param _1 the first element
+     * @param _2 the second element
+     * @param _3 the third element
+     * @param _4 the fourth element
+     */
+    public Tuple4(T1 _1, T2 _2, T3 _3, T4 _4) {
+        this._1 = _1;
+        this._2 = _2;
+        this._3 = _3;
+        this._4 = _4;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true; // same reference
+        if (o == null || getClass() != o.getClass()) return false; // null or a different runtime class is never equal
+
+        Tuple4<?, ?, ?, ?> tuple4 = (Tuple4<?, ?, ?, ?>) o;
+
+        if (_1 != null ? !_1.equals(tuple4._1) : tuple4._1 != null) return false; // null-safe: two null elements count as equal
+        if (_2 != null ? !_2.equals(tuple4._2) : tuple4._2 != null) return false;
+        if (_3 != null ? !_3.equals(tuple4._3) : tuple4._3 != null) return false;
+        return _4 != null ? _4.equals(tuple4._4) : tuple4._4 == null; // the last element decides once all others matched
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = _1 != null ? _1.hashCode() : 0; // a null element contributes 0
+        result = 31 * result + (_2 != null ? _2.hashCode() : 0); // fold in each further element in order, multiplying by 31
+        result = 31 * result + (_3 != null ? _3.hashCode() : 0);
+        result = 31 * result + (_4 != null ? _4.hashCode() : 0);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "(" + _1 + "," + _2 + "," + _3 + "," + _4 + ")"; // elements in order, comma-separated without spaces, in parentheses
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/src/jvm/org/apache/storm/streams/tuple/Tuple5.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/tuple/Tuple5.java b/storm-core/src/jvm/org/apache/storm/streams/tuple/Tuple5.java
new file mode 100644
index 0000000..6b0f81e
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/tuple/Tuple5.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.tuple;
+
+/**
+ * A tuple of five elements, exposed as the public final fields {@code _1} to {@code _5}, along the lines of Scala's Tuple.
+ *
+ * @param <T1> the type of the first element
+ * @param <T2> the type of the second element
+ * @param <T3> the type of the third element
+ * @param <T4> the type of the fourth element
+ * @param <T5> the type of the fifth element
+ */
+public class Tuple5<T1, T2, T3, T4, T5> { // NOTE(review): not final, unlike Tuple3 and Tuple4 in this patch; confirm this is intentional
+    public final T1 _1;
+    public final T2 _2;
+    public final T3 _3;
+    public final T4 _4;
+    public final T5 _5;
+
+    /**
+     * Constructs a new tuple, storing each argument in the field of the same name; no null checks are performed.
+     *
+     * @param _1 the first element
+     * @param _2 the second element
+     * @param _3 the third element
+     * @param _4 the fourth element
+     * @param _5 the fifth element
+     */
+    public Tuple5(T1 _1, T2 _2, T3 _3, T4 _4, T5 _5) {
+        this._1 = _1;
+        this._2 = _2;
+        this._3 = _3;
+        this._4 = _4;
+        this._5 = _5;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true; // same reference
+        if (o == null || getClass() != o.getClass()) return false; // null or a different runtime class is never equal
+
+        Tuple5<?, ?, ?, ?, ?> tuple5 = (Tuple5<?, ?, ?, ?, ?>) o;
+
+        if (_1 != null ? !_1.equals(tuple5._1) : tuple5._1 != null) return false; // null-safe: two null elements count as equal
+        if (_2 != null ? !_2.equals(tuple5._2) : tuple5._2 != null) return false;
+        if (_3 != null ? !_3.equals(tuple5._3) : tuple5._3 != null) return false;
+        if (_4 != null ? !_4.equals(tuple5._4) : tuple5._4 != null) return false;
+        return _5 != null ? _5.equals(tuple5._5) : tuple5._5 == null; // the last element decides once all others matched
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = _1 != null ? _1.hashCode() : 0; // a null element contributes 0
+        result = 31 * result + (_2 != null ? _2.hashCode() : 0); // fold in each further element in order, multiplying by 31
+        result = 31 * result + (_3 != null ? _3.hashCode() : 0);
+        result = 31 * result + (_4 != null ? _4.hashCode() : 0);
+        result = 31 * result + (_5 != null ? _5.hashCode() : 0);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "(" + _1 + "," + _2 + "," + _3 + "," + _4 + "," + _5 + ")"; // elements in order, comma-separated without spaces, in parentheses
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/src/jvm/org/apache/storm/streams/tuple/Tuple6.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/tuple/Tuple6.java b/storm-core/src/jvm/org/apache/storm/streams/tuple/Tuple6.java
new file mode 100644
index 0000000..4c35e27
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/tuple/Tuple6.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.tuple;
+
+
+/**
+ * A tuple of six elements, exposed as the public final fields {@code _1} to {@code _6}, along the lines of Scala's Tuple.
+ *
+ * @param <T1> the type of the first element
+ * @param <T2> the type of the second element
+ * @param <T3> the type of the third element
+ * @param <T4> the type of the fourth element
+ * @param <T5> the type of the fifth element
+ * @param <T6> the type of the sixth element
+ */
+public class Tuple6<T1, T2, T3, T4, T5, T6> { // NOTE(review): not final, unlike Tuple3 and Tuple4 in this patch; confirm this is intentional
+    public final T1 _1;
+    public final T2 _2;
+    public final T3 _3;
+    public final T4 _4;
+    public final T5 _5;
+    public final T6 _6;
+
+    /**
+     * Constructs a new tuple, storing each argument in the field of the same name; no null checks are performed.
+     *
+     * @param _1 the first element
+     * @param _2 the second element
+     * @param _3 the third element
+     * @param _4 the fourth element
+     * @param _5 the fifth element
+     * @param _6 the sixth element
+     */
+    public Tuple6(T1 _1, T2 _2, T3 _3, T4 _4, T5 _5, T6 _6) {
+        this._1 = _1;
+        this._2 = _2;
+        this._3 = _3;
+        this._4 = _4;
+        this._5 = _5;
+        this._6 = _6;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true; // same reference
+        if (o == null || getClass() != o.getClass()) return false; // null or a different runtime class is never equal
+
+        Tuple6<?, ?, ?, ?, ?, ?> tuple6 = (Tuple6<?, ?, ?, ?, ?, ?>) o;
+
+        if (_1 != null ? !_1.equals(tuple6._1) : tuple6._1 != null) return false; // null-safe: two null elements count as equal
+        if (_2 != null ? !_2.equals(tuple6._2) : tuple6._2 != null) return false;
+        if (_3 != null ? !_3.equals(tuple6._3) : tuple6._3 != null) return false;
+        if (_4 != null ? !_4.equals(tuple6._4) : tuple6._4 != null) return false;
+        if (_5 != null ? !_5.equals(tuple6._5) : tuple6._5 != null) return false;
+        return _6 != null ? _6.equals(tuple6._6) : tuple6._6 == null; // the last element decides once all others matched
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = _1 != null ? _1.hashCode() : 0; // a null element contributes 0
+        result = 31 * result + (_2 != null ? _2.hashCode() : 0); // fold in each further element in order, multiplying by 31
+        result = 31 * result + (_3 != null ? _3.hashCode() : 0);
+        result = 31 * result + (_4 != null ? _4.hashCode() : 0);
+        result = 31 * result + (_5 != null ? _5.hashCode() : 0);
+        result = 31 * result + (_6 != null ? _6.hashCode() : 0);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "(" + _1 + "," + _2 + "," + _3 + "," + _4 + "," + _5 + "," + _6 + ")"; // elements in order, comma-separated without spaces, in parentheses
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/src/jvm/org/apache/storm/streams/tuple/Tuple7.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/tuple/Tuple7.java b/storm-core/src/jvm/org/apache/storm/streams/tuple/Tuple7.java
new file mode 100644
index 0000000..366e8e9
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/tuple/Tuple7.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.tuple;
+
+/**
+ * A tuple of seven elements, exposed as the public final fields {@code _1} to {@code _7}, along the lines of Scala's Tuple.
+ *
+ * @param <T1> the type of the first element
+ * @param <T2> the type of the second element
+ * @param <T3> the type of the third element
+ * @param <T4> the type of the fourth element
+ * @param <T5> the type of the fifth element
+ * @param <T6> the type of the sixth element
+ * @param <T7> the type of the seventh element
+ */
+public class Tuple7<T1, T2, T3, T4, T5, T6, T7> { // NOTE(review): not final, unlike Tuple3 and Tuple4 in this patch; confirm this is intentional
+    public final T1 _1;
+    public final T2 _2;
+    public final T3 _3;
+    public final T4 _4;
+    public final T5 _5;
+    public final T6 _6;
+    public final T7 _7;
+
+    /**
+     * Constructs a new tuple, storing each argument in the field of the same name; no null checks are performed.
+     *
+     * @param _1 the first element
+     * @param _2 the second element
+     * @param _3 the third element
+     * @param _4 the fourth element
+     * @param _5 the fifth element
+     * @param _6 the sixth element
+     * @param _7 the seventh element
+     */
+    public Tuple7(T1 _1, T2 _2, T3 _3, T4 _4, T5 _5, T6 _6, T7 _7) {
+        this._1 = _1;
+        this._2 = _2;
+        this._3 = _3;
+        this._4 = _4;
+        this._5 = _5;
+        this._6 = _6;
+        this._7 = _7;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true; // same reference
+        if (o == null || getClass() != o.getClass()) return false; // null or a different runtime class is never equal
+
+        Tuple7<?, ?, ?, ?, ?, ?, ?> tuple7 = (Tuple7<?, ?, ?, ?, ?, ?, ?>) o;
+
+        if (_1 != null ? !_1.equals(tuple7._1) : tuple7._1 != null) return false; // null-safe: two null elements count as equal
+        if (_2 != null ? !_2.equals(tuple7._2) : tuple7._2 != null) return false;
+        if (_3 != null ? !_3.equals(tuple7._3) : tuple7._3 != null) return false;
+        if (_4 != null ? !_4.equals(tuple7._4) : tuple7._4 != null) return false;
+        if (_5 != null ? !_5.equals(tuple7._5) : tuple7._5 != null) return false;
+        if (_6 != null ? !_6.equals(tuple7._6) : tuple7._6 != null) return false;
+        return _7 != null ? _7.equals(tuple7._7) : tuple7._7 == null; // the last element decides once all others matched
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = _1 != null ? _1.hashCode() : 0; // a null element contributes 0
+        result = 31 * result + (_2 != null ? _2.hashCode() : 0); // fold in each further element in order, multiplying by 31
+        result = 31 * result + (_3 != null ? _3.hashCode() : 0);
+        result = 31 * result + (_4 != null ? _4.hashCode() : 0);
+        result = 31 * result + (_5 != null ? _5.hashCode() : 0);
+        result = 31 * result + (_6 != null ? _6.hashCode() : 0);
+        result = 31 * result + (_7 != null ? _7.hashCode() : 0);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "(" + _1 + "," + _2 + "," + _3 + "," + _4 + "," + _5 + "," + _6 + "," + _7 + ")"; // elements in order, comma-separated without spaces, in parentheses
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/3a10865c/storm-core/src/jvm/org/apache/storm/streams/tuple/Tuple8.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/tuple/Tuple8.java b/storm-core/src/jvm/org/apache/storm/streams/tuple/Tuple8.java
new file mode 100644
index 0000000..bf088df
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/tuple/Tuple8.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams.tuple;
+
+/**
+ * A tuple of eight elements, exposed as the public final fields {@code _1} to {@code _8}, along the lines of Scala's Tuple.
+ *
+ * @param <T1> the type of the first element
+ * @param <T2> the type of the second element
+ * @param <T3> the type of the third element
+ * @param <T4> the type of the fourth element
+ * @param <T5> the type of the fifth element
+ * @param <T6> the type of the sixth element
+ * @param <T7> the type of the seventh element
+ * @param <T8> the type of the eighth element
+ */
+public class Tuple8<T1, T2, T3, T4, T5, T6, T7, T8> { // NOTE(review): not final, unlike Tuple3 and Tuple4 in this patch; confirm this is intentional
+    public final T1 _1;
+    public final T2 _2;
+    public final T3 _3;
+    public final T4 _4;
+    public final T5 _5;
+    public final T6 _6;
+    public final T7 _7;
+    public final T8 _8;
+
+    /**
+     * Constructs a new tuple, storing each argument in the field of the same name; no null checks are performed.
+     *
+     * @param _1 the first element
+     * @param _2 the second element
+     * @param _3 the third element
+     * @param _4 the fourth element
+     * @param _5 the fifth element
+     * @param _6 the sixth element
+     * @param _7 the seventh element
+     * @param _8 the eighth element
+     */
+    public Tuple8(T1 _1, T2 _2, T3 _3, T4 _4, T5 _5, T6 _6, T7 _7, T8 _8) {
+        this._1 = _1;
+        this._2 = _2;
+        this._3 = _3;
+        this._4 = _4;
+        this._5 = _5;
+        this._6 = _6;
+        this._7 = _7;
+        this._8 = _8;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true; // same reference
+        if (o == null || getClass() != o.getClass()) return false; // null or a different runtime class is never equal
+
+        Tuple8<?, ?, ?, ?, ?, ?, ?, ?> tuple8 = (Tuple8<?, ?, ?, ?, ?, ?, ?, ?>) o;
+
+        if (_1 != null ? !_1.equals(tuple8._1) : tuple8._1 != null) return false; // null-safe: two null elements count as equal
+        if (_2 != null ? !_2.equals(tuple8._2) : tuple8._2 != null) return false;
+        if (_3 != null ? !_3.equals(tuple8._3) : tuple8._3 != null) return false;
+        if (_4 != null ? !_4.equals(tuple8._4) : tuple8._4 != null) return false;
+        if (_5 != null ? !_5.equals(tuple8._5) : tuple8._5 != null) return false;
+        if (_6 != null ? !_6.equals(tuple8._6) : tuple8._6 != null) return false;
+        if (_7 != null ? !_7.equals(tuple8._7) : tuple8._7 != null) return false;
+        return _8 != null ? _8.equals(tuple8._8) : tuple8._8 == null; // the last element decides once all others matched
+
+    }
+
+    @Override
+    public int hashCode() {
+        int result = _1 != null ? _1.hashCode() : 0; // a null element contributes 0
+        result = 31 * result + (_2 != null ? _2.hashCode() : 0); // fold in each further element in order, multiplying by 31
+        result = 31 * result + (_3 != null ? _3.hashCode() : 0);
+        result = 31 * result + (_4 != null ? _4.hashCode() : 0);
+        result = 31 * result + (_5 != null ? _5.hashCode() : 0);
+        result = 31 * result + (_6 != null ? _6.hashCode() : 0);
+        result = 31 * result + (_7 != null ? _7.hashCode() : 0);
+        result = 31 * result + (_8 != null ? _8.hashCode() : 0);
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "(" + _1 + "," + _2 + "," + _3 + "," + _4 + "," + _5 + "," + _6 + "," + _7 + "," + _8 + ")"; // elements in order, comma-separated without spaces, in parentheses
+    }
+}


Mime
View raw message