storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [04/12] storm git commit: [STORM-1961] Stream api for storm core use cases
Date Tue, 07 Feb 2017 01:28:17 GMT
[STORM-1961] Stream api for storm core use cases

The initial version of unified stream api for expressing streaming computation pipelines over the storm core spouts and bolts.
Right now this provides at-least once guarantees and addresses only the storm core use cases.

For high level design see - https://issues.apache.org/jira/secure/attachment/12827547/UnifiedStreamapiforStorm.pdf

A few examples have been added which should give a basic idea on how to use the APIs. More examples and detailed documentation will be added as follow-up tasks.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e251573d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e251573d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e251573d

Branch: refs/heads/master
Commit: e251573d524f6a6a11c7821ba93fd155d9beb770
Parents: e3b2f96
Author: Arun Mahadevan <arunm@apache.org>
Authored: Mon Sep 19 23:50:16 2016 +0530
Committer: Arun Mahadevan <arunm@apache.org>
Committed: Fri Jan 13 01:20:43 2017 +0530

----------------------------------------------------------------------
 .../storm/starter/streams/BranchExample.java    |  72 +++
 .../streams/GroupByKeyAndWindowExample.java     | 109 ++++
 .../storm/starter/streams/JoinExample.java      | 110 ++++
 .../starter/streams/StateQueryExample.java      | 112 ++++
 .../starter/streams/StatefulWordCount.java      |  85 +++
 .../starter/streams/WindowedWordCount.java      |  90 +++
 .../storm/starter/streams/WordCountToBolt.java  | 111 ++++
 .../src/jvm/org/apache/storm/streams/Edge.java  |  41 ++
 .../org/apache/storm/streams/GroupingInfo.java  |  84 +++
 .../src/jvm/org/apache/storm/streams/Node.java  | 129 ++++
 .../src/jvm/org/apache/storm/streams/Pair.java  |  78 +++
 .../org/apache/storm/streams/PairStream.java    | 241 ++++++++
 .../org/apache/storm/streams/PartitionNode.java |  42 ++
 .../org/apache/storm/streams/ProcessorBolt.java |  70 +++
 .../storm/streams/ProcessorBoltDelegate.java    | 285 +++++++++
 .../org/apache/storm/streams/ProcessorNode.java |  81 +++
 .../apache/storm/streams/RefCountedTuple.java   |  64 ++
 .../jvm/org/apache/storm/streams/SinkNode.java  |  44 ++
 .../jvm/org/apache/storm/streams/SpoutNode.java |  48 ++
 .../storm/streams/StatefulProcessorBolt.java    | 116 ++++
 .../jvm/org/apache/storm/streams/Stream.java    | 393 ++++++++++++
 .../org/apache/storm/streams/StreamBolt.java    |  38 ++
 .../org/apache/storm/streams/StreamBuilder.java | 591 +++++++++++++++++++
 .../org/apache/storm/streams/StreamState.java   |  43 ++
 .../org/apache/storm/streams/StreamUtil.java    |  62 ++
 .../storm/streams/StreamsEdgeFactory.java       |  29 +
 .../jvm/org/apache/storm/streams/Tuple3.java    |  49 ++
 .../org/apache/storm/streams/UniqueIdGen.java   |  56 ++
 .../org/apache/storm/streams/WindowNode.java    |  38 ++
 .../storm/streams/WindowedProcessorBolt.java    | 137 +++++
 .../storm/streams/operations/Aggregator.java    |  42 ++
 .../storm/streams/operations/Consumer.java      |  32 +
 .../streams/operations/FlatMapFunction.java     |  27 +
 .../storm/streams/operations/Function.java      |  34 ++
 .../streams/operations/IdentityFunction.java    |  31 +
 .../storm/streams/operations/Operation.java     |  26 +
 .../streams/operations/PairFlatMapFunction.java |  30 +
 .../storm/streams/operations/PairFunction.java  |  30 +
 .../streams/operations/PairValueJoiner.java     |  40 ++
 .../storm/streams/operations/Predicate.java     |  33 ++
 .../storm/streams/operations/PrintConsumer.java |  30 +
 .../storm/streams/operations/Reducer.java       |  35 ++
 .../storm/streams/operations/ValueJoiner.java   |  36 ++
 .../streams/operations/aggregators/Count.java   |  37 ++
 .../streams/operations/aggregators/Sum.java     |  35 ++
 .../operations/mappers/PairValueMapper.java     |  51 ++
 .../operations/mappers/TupleValueMapper.java    |  30 +
 .../streams/operations/mappers/ValueMapper.java |  45 ++
 .../operations/mappers/ValuesMapper.java        |  48 ++
 .../processors/AggregateByKeyProcessor.java     |  54 ++
 .../streams/processors/AggregateProcessor.java  |  45 ++
 .../storm/streams/processors/BaseProcessor.java | 107 ++++
 .../streams/processors/BatchProcessor.java      |  25 +
 .../streams/processors/BranchProcessor.java     |  41 ++
 .../processors/ChainedProcessorContext.java     |  66 +++
 .../processors/EmittingProcessorContext.java    | 170 ++++++
 .../streams/processors/FilterProcessor.java     |  35 ++
 .../streams/processors/FlatMapProcessor.java    |  35 ++
 .../processors/FlatMapValuesProcessor.java      |  36 ++
 .../streams/processors/ForEachProcessor.java    |  33 ++
 .../processors/ForwardingProcessorContext.java  | 102 ++++
 .../storm/streams/processors/JoinProcessor.java | 112 ++++
 .../storm/streams/processors/MapProcessor.java  |  33 ++
 .../streams/processors/MapValuesProcessor.java  |  34 ++
 .../storm/streams/processors/PeekProcessor.java |  34 ++
 .../storm/streams/processors/Processor.java     |  51 ++
 .../streams/processors/ProcessorContext.java    |  59 ++
 .../processors/ReduceByKeyProcessor.java        |  52 ++
 .../streams/processors/ReduceProcessor.java     |  41 ++
 .../streams/processors/StateQueryProcessor.java |  48 ++
 .../streams/processors/StatefulProcessor.java   |  36 ++
 .../processors/UpdateStateByKeyProcessor.java   |  49 ++
 .../storm/streams/windowing/BaseWindow.java     |  64 ++
 .../storm/streams/windowing/SlidingWindows.java | 151 +++++
 .../streams/windowing/TumblingWindows.java      | 119 ++++
 .../apache/storm/streams/windowing/Window.java  |  70 +++
 .../topology/StatefulWindowedBoltExecutor.java  |   4 +-
 .../storm/topology/WindowedBoltExecutor.java    |   4 +-
 .../storm/topology/base/BaseWindowedBolt.java   |  37 +-
 .../windowing/AbstractTridentWindowManager.java |   2 +-
 .../storm/windowing/CountEvictionPolicy.java    |   8 +-
 .../apache/storm/windowing/EvictionPolicy.java  |   7 +
 .../storm/windowing/TimeEvictionPolicy.java     |  17 +-
 .../apache/storm/windowing/TupleWindowImpl.java |  11 +
 .../windowing/WatermarkCountEvictionPolicy.java |  10 +-
 .../windowing/WatermarkTimeEvictionPolicy.java  |   4 +-
 .../jvm/org/apache/storm/windowing/Window.java  |   7 +
 .../windowing/WindowLifecycleListener.java      |   3 +-
 .../apache/storm/windowing/WindowManager.java   |   2 +-
 .../apache/storm/streams/ProcessorBoltTest.java | 165 ++++++
 .../streams/StatefulProcessorBoltTest.java      | 100 ++++
 .../apache/storm/streams/StreamBuilderTest.java | 219 +++++++
 .../streams/WindowedProcessorBoltTest.java      | 110 ++++
 .../storm/windowing/WindowManagerTest.java      |   2 +-
 94 files changed, 6505 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/BranchExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/BranchExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/BranchExample.java
new file mode 100644
index 0000000..f5400a5
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/BranchExample.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.streams;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.starter.spout.RandomIntegerSpout;
+import org.apache.storm.streams.Stream;
+import org.apache.storm.streams.StreamBuilder;
+import org.apache.storm.streams.operations.Predicate;
+import org.apache.storm.streams.operations.mappers.ValueMapper;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An example that demonstrates the usage of {@link Stream#branch(Predicate[])} to split a stream
+ * into multiple branches based on predicates.
+ */
+public class BranchExample {
+    private static final Logger LOG = LoggerFactory.getLogger(BranchExample.class);
+
+    // branch() returns a generic Stream<Integer>[] array, which cannot be created
+    // without an unchecked conversion; the suppression is scoped to this method only.
+    @SuppressWarnings("unchecked")
+    public static void main(String[] args) throws Exception {
+        StreamBuilder builder = new StreamBuilder();
+        Stream<Integer>[] evenAndOdd = builder
+                /*
+                 * Create a stream of random numbers from a spout that
+                 * emits random integers by extracting the tuple value at index 0.
+                 */
+                .newStream(new RandomIntegerSpout(), new ValueMapper<Integer>(0))
+                /*
+                 * Split the stream of numbers into streams of
+                 * even and odd numbers. The first stream contains even
+                 * and the second contains odd numbers.
+                 */
+                .branch(x -> (x % 2) == 0,
+                        x -> (x % 2) == 1);
+
+        // Terminal operations: log every element of each branch.
+        evenAndOdd[0].forEach(x -> LOG.info("EVEN> " + x));
+        evenAndOdd[1].forEach(x -> LOG.info("ODD > " + x));
+
+        Config config = new Config();
+        if (args.length > 0) {
+            // args[0] is the topology name: submit to a remote cluster.
+            config.setNumWorkers(1);
+            StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
+        } else {
+            // No args: run in an in-process local cluster for about a minute, then tear down.
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test", config, builder.build());
+            Utils.sleep(60000);
+            cluster.killTopology("test");
+            cluster.shutdown();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/GroupByKeyAndWindowExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/GroupByKeyAndWindowExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/GroupByKeyAndWindowExample.java
new file mode 100644
index 0000000..6b505bd
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/GroupByKeyAndWindowExample.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.streams;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.streams.PairStream;
+import org.apache.storm.streams.StreamBuilder;
+import org.apache.storm.streams.operations.Reducer;
+import org.apache.storm.streams.operations.mappers.PairValueMapper;
+import org.apache.storm.streams.windowing.SlidingWindows;
+import org.apache.storm.streams.windowing.Window;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.topology.base.BaseWindowedBolt.Count;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An example that shows the usage of {@link PairStream#groupByKeyAndWindow(Window)}
+ * and {@link PairStream#reduceByKeyAndWindow(Reducer, Window)}
+ */
+public class GroupByKeyAndWindowExample {
+    public static void main(String[] args) throws Exception {
+        StreamBuilder builder = new StreamBuilder();
+
+        // a stream of stock quotes
+        builder.newStream(new StockQuotes(), new PairValueMapper<String, Double>(0, 1))
+                /*
+                 * The elements having the same key within the window will be grouped
+                 * together and the corresponding values will be merged.
+                 */
+                .groupByKeyAndWindow(SlidingWindows.of(Count.of(6), Count.of(3)))
+                .print();
+
+        // a stream of stock quotes
+        builder.newStream(new StockQuotes(), new PairValueMapper<String, Double>(0, 1))
+                /*
+                 * The elements having the same key within the window will be grouped
+                 * together and their values will be reduced using the given reduce function
+                 * (here, the running average of the two quotes).
+                 */
+                .reduceByKeyAndWindow((x, y) -> (x + y) / 2.0, SlidingWindows.of(Count.of(6), Count.of(3)))
+                .print();
+
+        Config config = new Config();
+        if (args.length > 0) {
+            // args[0] is the topology name: submit to a remote cluster.
+            config.setNumWorkers(1);
+            StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
+        } else {
+            /*
+             * This example uses only windowing operations and no key-value state,
+             * so no topology.state.provider needs to be configured here; requiring
+             * the Redis state provider for a pure windowing example would force
+             * users to run a Redis instance for no reason.
+             */
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test", config, builder.build());
+            Utils.sleep(60000);
+            cluster.killTopology("test");
+            cluster.shutdown();
+        }
+    }
+
+    /**
+     * A test spout that alternates between two fixed batches of (ticker, price)
+     * quotes, emitting one batch every five seconds.
+     */
+    private static class StockQuotes extends BaseRichSpout {
+        private final List<List<Values>> values = Arrays.asList(
+                Arrays.asList(new Values("AAPL", 100.0), new Values("GOOG", 780.0), new Values("FB", 125.0)),
+                Arrays.asList(new Values("AAPL", 105.0), new Values("GOOG", 790.0), new Values("FB", 130.0))
+        );
+        private SpoutOutputCollector collector;
+        private int index = 0;
+
+        @Override
+        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+            this.collector = collector;
+        }
+
+        @Override
+        public void nextTuple() {
+            Utils.sleep(5000);
+            for (Values v : values.get(index)) {
+                collector.emit(v);
+            }
+            index = (index + 1) % values.size();
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("key", "val"));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/JoinExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/JoinExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/JoinExample.java
new file mode 100644
index 0000000..0b15615
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/JoinExample.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.streams;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.streams.PairStream;
+import org.apache.storm.streams.StreamBuilder;
+import org.apache.storm.streams.operations.Function;
+import org.apache.storm.streams.operations.mappers.PairValueMapper;
+import org.apache.storm.streams.windowing.TumblingWindows;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+
+import java.util.Map;
+
+import static org.apache.storm.topology.base.BaseWindowedBolt.Duration;
+
+/**
+ * An example that demonstrates the usage of {@link PairStream#join(PairStream)} to join
+ * multiple streams.
+ */
+public class JoinExample {
+    public static void main(String[] args) throws Exception {
+        StreamBuilder builder = new StreamBuilder();
+        // a stream of (number, square) pairs
+        PairStream<Integer, Integer> squares = builder
+                .newStream(new NumberSpout(x -> x * x),
+                        new PairValueMapper<>(0, 1));
+        // a stream of (number, cube) pairs
+        PairStream<Integer, Integer> cubes = builder
+                .newStream(new NumberSpout(x -> x * x * x),
+                        new PairValueMapper<>(0, 1));
+
+        // create a windowed stream of five seconds duration
+        squares.window(TumblingWindows.of(Duration.seconds(5)))
+                /*
+                 * Join the squares and the cubes stream within the window.
+                 * The values in the squares stream having the same key as that
+                 * of the cubes stream within the window will be joined together.
+                 */
+                .join(cubes)
+                /*
+                 * The results should be of the form (number, (square, cube)).
+                 * Note: a plain block comment is used here; Javadoc comments
+                 * are only meaningful on declarations, not inside method bodies.
+                 */
+                .print();
+
+        Config config = new Config();
+        if (args.length > 0) {
+            // args[0] is the topology name: submit to a remote cluster.
+            config.setNumWorkers(1);
+            StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
+        } else {
+            // No args: run in an in-process local cluster for about a minute.
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test", config, builder.build());
+            Utils.sleep(60000);
+            cluster.killTopology("test");
+            cluster.shutdown();
+        }
+
+    }
+
+    /**
+     * A test spout that emits an endless stream of (i, f(i)) pairs, roughly
+     * one per second, where f is the function supplied at construction time.
+     */
+    private static class NumberSpout extends BaseRichSpout {
+        private final Function<Integer, Integer> function;
+        private SpoutOutputCollector collector;
+        private int i = 1;
+
+        NumberSpout(Function<Integer, Integer> function) {
+            this.function = function;
+        }
+
+        @Override
+        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+            this.collector = collector;
+        }
+
+        @Override
+        public void nextTuple() {
+            Utils.sleep(990);
+            collector.emit(new Values(i, function.apply(i)));
+            i++;
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("key", "val"));
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java
new file mode 100644
index 0000000..e76dd3c
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StateQueryExample.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.streams;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.streams.Pair;
+import org.apache.storm.streams.Stream;
+import org.apache.storm.streams.StreamBuilder;
+import org.apache.storm.streams.StreamState;
+import org.apache.storm.streams.operations.aggregators.Count;
+import org.apache.storm.streams.operations.mappers.ValueMapper;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.testing.TestWordSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.apache.storm.utils.Utils;
+
+import java.util.Map;
+
+/**
+ * An example that uses {@link Stream#stateQuery(StreamState)} to query the state
+ * <p>
+ * You should start a local redis instance before running the 'storm jar' command. By default
+ * the connection will be attempted at localhost:6379. The default
+ * RedisKeyValueStateProvider parameters can be overridden in conf/storm.yaml, for e.g.
+ * <p/>
+ * <pre>
+ * topology.state.provider.config: '{"keyClass":"...", "valueClass":"...",
+ *  "keySerializerClass":"...", "valueSerializerClass":"...",
+ *  "jedisPoolConfig":{"host":"localhost", "port":6379,
+ *  "timeout":2000, "database":0, "password":"xyz"}}'
+ * </pre>
+ */
+public class StateQueryExample {
+    public static void main(String[] args) throws Exception {
+        StreamBuilder builder = new StreamBuilder();
+        // Build a word-count pipeline whose running counts are kept in key-value state;
+        // the returned StreamState handle is what stateQuery() consults below.
+        StreamState<String, Long> ss = builder.newStream(new TestWordSpout(), new ValueMapper<String>(0))
+                .mapToPair(w -> Pair.of(w, 1))
+                .groupByKey()
+                .updateStateByKey(new Count<>())
+
+        /*
+         * A stream of words emitted by the QuerySpout is used as
+         * the keys to query the state.
+         */
+        builder.newStream(new QuerySpout(), new ValueMapper<String>(0))
+                /*
+                 * Queries the state and emits the
+                 * matching (key, value) as results. The stream state returned
+                 * by the updateStateByKey is passed as the argument to stateQuery.
+                 */
+                .stateQuery(ss).print();
+
+        Config config = new Config();
+        // use redis based state store for persistence
+        config.put(Config.TOPOLOGY_STATE_PROVIDER, "org.apache.storm.redis.state.RedisKeyValueStateProvider");
+
+        if (args.length > 0) {
+            // args[0] is the topology name: submit to a remote cluster.
+            config.setNumWorkers(1);
+            StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
+        } else {
+            // No args: run in an in-process local cluster for about a minute.
+            // NOTE(review): local mode still uses the Redis state provider configured
+            // above, so a local Redis instance must be running — see class Javadoc.
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test", config, builder.build());
+            Utils.sleep(60000);
+            cluster.killTopology("test");
+            cluster.shutdown();
+        }
+    }
+
+    // A test spout that emits the query keys ("nathan", "mike") every two seconds.
+    private static class QuerySpout extends BaseRichSpout {
+        private SpoutOutputCollector collector;
+        private final String[] words = {"nathan", "mike"};
+
+        @Override
+        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+            this.collector = collector;
+        }
+
+        @Override
+        public void nextTuple() {
+            Utils.sleep(2000);
+            for (String word : words) {
+                collector.emit(new Values(word));
+            }
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("word"));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StatefulWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StatefulWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StatefulWordCount.java
new file mode 100644
index 0000000..f6ae6b0
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/StatefulWordCount.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.streams;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.streams.Pair;
+import org.apache.storm.streams.PairStream;
+import org.apache.storm.streams.StreamBuilder;
+import org.apache.storm.streams.operations.Aggregator;
+import org.apache.storm.streams.operations.aggregators.Count;
+import org.apache.storm.streams.operations.mappers.ValueMapper;
+import org.apache.storm.testing.TestWordSpout;
+import org.apache.storm.utils.Utils;
+
+/**
+ * A stateful word count that uses {@link PairStream#updateStateByKey(Aggregator)} to
+ * save the counts in a key value state. This example uses Redis state store.
+ * <p>
+ * You should start a local redis instance before running the 'storm jar' command. By default
+ * the connection will be attempted at localhost:6379. The default
+ * RedisKeyValueStateProvider parameters can be overridden in conf/storm.yaml, for e.g.
+ * <p/>
+ * <pre>
+ * topology.state.provider.config: '{"keyClass":"...", "valueClass":"...",
+ *  "keySerializerClass":"...", "valueSerializerClass":"...",
+ *  "jedisPoolConfig":{"host":"localhost", "port":6379,
+ *  "timeout":2000, "database":0, "password":"xyz"}}'
+ * </pre>
+ */
+public class StatefulWordCount {
+    public static void main(String[] args) throws Exception {
+        StreamBuilder builder = new StreamBuilder();
+        // a stream of words
+        builder.newStream(new TestWordSpout(), new ValueMapper<String>(0))
+                /*
+                 * create a stream of (word, 1) pairs
+                 */
+                .mapToPair(w -> Pair.of(w, 1))
+                /*
+                 * group by the word
+                 */
+                .groupByKey()
+                /*
+                 * update the word counts in the state
+                 */
+                .updateStateByKey(new Count<>())
+                 /*
+                  * convert the state back to a stream and print the results
+                  */
+                .toPairStream()
+                .print();
+
+        Config config = new Config();
+        // use redis based state store for persistence
+        config.put(Config.TOPOLOGY_STATE_PROVIDER, "org.apache.storm.redis.state.RedisKeyValueStateProvider");
+
+        if (args.length > 0) {
+            // args[0] is the topology name: submit to a remote cluster.
+            config.setNumWorkers(1);
+            StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
+        } else {
+            // No args: run in an in-process local cluster for about a minute.
+            // NOTE(review): local mode still uses the Redis state provider configured
+            // above, so a local Redis instance must be running — see class Javadoc.
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test", config, builder.build());
+            Utils.sleep(60000);
+            cluster.killTopology("test");
+            cluster.shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WindowedWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WindowedWordCount.java
new file mode 100644
index 0000000..c6e2f4a
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WindowedWordCount.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.streams;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.starter.spout.RandomSentenceSpout;
+import org.apache.storm.streams.Pair;
+import org.apache.storm.streams.StreamBuilder;
+import org.apache.storm.streams.operations.aggregators.Count;
+import org.apache.storm.streams.operations.mappers.ValueMapper;
+import org.apache.storm.streams.windowing.TumblingWindows;
+import org.apache.storm.utils.Utils;
+
+import java.util.Arrays;
+
+import static org.apache.storm.topology.base.BaseWindowedBolt.Duration;
+
+/**
+ * A windowed word count example. Splits random sentences into words,
+ * counts the words over two second tumbling windows and prints the words
+ * that occurred at-least five times within a window.
+ */
+public class WindowedWordCount {
+    public static void main(String[] args) throws Exception {
+        StreamBuilder builder = new StreamBuilder();
+        // A stream of random sentences
+        builder.newStream(new RandomSentenceSpout(), new ValueMapper<String>(0))
+                /*
+                 * Increase the parallelism of this stream. Further operations
+                 * on this stream will execute at this level of parallelism.
+                 */
+                .repartition(2)
+                /*
+                 * split the sentences to words
+                 */
+                .flatMap(s -> Arrays.asList(s.split(" ")))
+                /*
+                 * create a stream of (word, 1) pairs
+                 */
+                .mapToPair(w -> Pair.of(w, 1))
+                /*
+                 * group by word so that the same words end up in the same partition
+                 */
+                .groupByKey()
+                /*
+                 * a two seconds tumbling window
+                 */
+                .window(TumblingWindows.of(Duration.seconds(2)))
+                /*
+                 * compute the word counts in the last two second window
+                 */
+                .aggregateByKey(new Count<>())
+                /*
+                 * emit the count for the words that occurred
+                 * at-least five times in the last two seconds
+                 */
+                .filter(x -> x.getSecond() >= 5)
+                /*
+                 * print the results to stdout
+                 */
+                .print();
+
+        Config config = new Config();
+        // with an argument the topology is submitted to the cluster using
+        // args[0] as the topology name; otherwise it runs for a minute in a
+        // local cluster and is then killed
+        if (args.length > 0) {
+            config.setNumWorkers(1);
+            StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
+        } else {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test", config, builder.build());
+            Utils.sleep(60000);
+            cluster.killTopology("test");
+            cluster.shutdown();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WordCountToBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WordCountToBolt.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WordCountToBolt.java
new file mode 100644
index 0000000..a711696
--- /dev/null
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/streams/WordCountToBolt.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.starter.streams;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.redis.bolt.RedisStoreBolt;
+import org.apache.storm.redis.common.config.JedisPoolConfig;
+import org.apache.storm.redis.common.mapper.RedisDataTypeDescription;
+import org.apache.storm.redis.common.mapper.RedisStoreMapper;
+import org.apache.storm.streams.Pair;
+import org.apache.storm.streams.StreamBuilder;
+import org.apache.storm.streams.operations.aggregators.Count;
+import org.apache.storm.streams.operations.mappers.ValueMapper;
+import org.apache.storm.testing.TestWordSpout;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.tuple.ITuple;
+import org.apache.storm.utils.Utils;
+
+/**
+ * An example that computes word counts and finally emits the results to an
+ * external bolt (sink). Here the sink is a {@link RedisStoreBolt} which
+ * stores the (word, count) pairs in a redis hash.
+ */
+public class WordCountToBolt {
+    public static void main(String[] args) throws Exception {
+        StreamBuilder builder = new StreamBuilder();
+
+        // Redis config parameters for the RedisStoreBolt
+        JedisPoolConfig poolConfig = new JedisPoolConfig.Builder()
+                .setHost("127.0.0.1").setPort(6379).build();
+        // Storm tuple to redis key-value mapper
+        RedisStoreMapper storeMapper = new WordCountStoreMapper();
+        // The redis bolt (sink)
+        IRichBolt redisStoreBolt = new RedisStoreBolt(poolConfig, storeMapper);
+
+        // A stream of words
+        builder.newStream(new TestWordSpout(), new ValueMapper<String>(0))
+                /*
+                 * create a stream of (word, 1) pairs
+                 */
+                .mapToPair(w -> Pair.of(w, 1))
+                /*
+                 * group by key (word)
+                 */
+                .groupByKey()
+                /*
+                 * aggregate the count
+                 */
+                .aggregateByKey(new Count<>())
+                /*
+                 * The result of aggregation is forwarded to
+                 * the RedisStoreBolt. The forwarded tuple is a
+                 * key-value pair of (word, count) with ("key", "value")
+                 * being the field names.
+                 */
+                .to(redisStoreBolt);
+
+        Config config = new Config();
+        // with an argument the topology is submitted to the cluster using
+        // args[0] as the topology name; otherwise it runs for a minute in a
+        // local cluster and is then killed
+        if (args.length > 0) {
+            config.setNumWorkers(1);
+            StormSubmitter.submitTopologyWithProgressBar(args[0], config, builder.build());
+        } else {
+            LocalCluster cluster = new LocalCluster();
+            cluster.submitTopology("test", config, builder.build());
+            Utils.sleep(60000);
+            cluster.killTopology("test");
+            cluster.shutdown();
+        }
+    }
+
+    // Maps a storm tuple to redis key and value
+    private static class WordCountStoreMapper implements RedisStoreMapper {
+        private final RedisDataTypeDescription description;
+        // all (word, count) entries are stored under this single redis hash
+        private final String hashKey = "wordCount";
+
+        WordCountStoreMapper() {
+            description = new RedisDataTypeDescription(RedisDataTypeDescription.RedisDataType.HASH, hashKey);
+        }
+
+        @Override
+        public RedisDataTypeDescription getDataTypeDescription() {
+            return description;
+        }
+
+        @Override
+        public String getKeyFromTuple(ITuple tuple) {
+            // the "key" field holds the word (see the pipeline comment above)
+            return tuple.getStringByField("key");
+        }
+
+        @Override
+        public String getValueFromTuple(ITuple tuple) {
+            // the "value" field holds the aggregated count; stored as a string
+            return String.valueOf(tuple.getLongByField("value"));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/Edge.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/Edge.java b/storm-core/src/jvm/org/apache/storm/streams/Edge.java
new file mode 100644
index 0000000..9b13562
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/Edge.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+import java.io.Serializable;
+
+/**
+ * A directed edge of the topology DAG; it runs from a source {@link Node}
+ * to a target {@link Node}.
+ */
+class Edge implements Serializable {
+    private final Node from;
+    private final Node to;
+
+    Edge(Node source, Node target) {
+        this.from = source;
+        this.to = target;
+    }
+
+    public Node getSource() {
+        return from;
+    }
+
+    public Node getTarget() {
+        return to;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/GroupingInfo.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/GroupingInfo.java b/storm-core/src/jvm/org/apache/storm/streams/GroupingInfo.java
new file mode 100644
index 0000000..81def4b
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/GroupingInfo.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+import org.apache.storm.topology.BoltDeclarer;
+import org.apache.storm.tuple.Fields;
+
+import java.io.Serializable;
+
+/**
+ * Captures how a stream is partitioned among the tasks of the downstream
+ * component. Instances are obtained via the static factory methods, each of
+ * which declares the corresponding grouping on a {@link BoltDeclarer}.
+ */
+abstract class GroupingInfo implements Serializable {
+    // the grouping fields; null for non-fields groupings
+    private final Fields fields;
+
+    private GroupingInfo() {
+        this(null);
+    }
+
+    private GroupingInfo(Fields fields) {
+        this.fields = fields;
+    }
+
+    /**
+     * Declares this grouping on the given declarer for the specified
+     * component and stream.
+     *
+     * @param declarer    the declarer of the subscribing bolt
+     * @param componentId the id of the component to subscribe to
+     * @param streamId    the id of the stream to subscribe to
+     * @param fields      the grouping fields; note: this parameter shadows the
+     *                    instance field and is only used by the fields grouping
+     */
+    public abstract void declareGrouping(BoltDeclarer declarer, String componentId, String streamId, Fields fields);
+
+    // declares a shuffle grouping on the target component/stream
+    public static GroupingInfo shuffle() {
+        return new GroupingInfo() {
+            @Override
+            public void declareGrouping(BoltDeclarer declarer, String componentId, String streamId, Fields fields) {
+                declarer.shuffleGrouping(componentId, streamId);
+            }
+        };
+    }
+
+    // declares a fields grouping using the given fields
+    public static GroupingInfo fields(Fields fields) {
+        return new GroupingInfo(fields) {
+            @Override
+            public void declareGrouping(BoltDeclarer declarer, String componentId, String streamId, Fields fields) {
+                declarer.fieldsGrouping(componentId, streamId, fields);
+            }
+        };
+    }
+
+    // declares a global grouping on the target component/stream
+    public static GroupingInfo global() {
+        return new GroupingInfo() {
+            @Override
+            public void declareGrouping(BoltDeclarer declarer, String componentId, String streamId, Fields fields) {
+                declarer.globalGrouping(componentId, streamId);
+            }
+        };
+    }
+
+    // declares an all grouping on the target component/stream
+    public static GroupingInfo all() {
+        return new GroupingInfo() {
+            @Override
+            public void declareGrouping(BoltDeclarer declarer, String componentId, String streamId, Fields fields) {
+                declarer.allGrouping(componentId, streamId);
+            }
+        };
+    }
+
+    /**
+     * Returns the grouping fields, or null for non-fields groupings.
+     */
+    public Fields getFields() {
+        return fields;
+    }
+
+    @Override
+    public String toString() {
+        return "GroupingInfo{" +
+                "fields=" + fields +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/Node.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/Node.java b/storm-core/src/jvm/org/apache/storm/streams/Node.java
new file mode 100644
index 0000000..f9de390
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/Node.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multimaps;
+import org.apache.storm.generated.StreamInfo;
+import org.apache.storm.topology.IComponent;
+import org.apache.storm.topology.OutputFieldsGetter;
+import org.apache.storm.tuple.Fields;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Base class for a Node which form the vertices of the topology DAG.
+ */
+abstract class Node implements Serializable {
+    // ids of the streams this node emits on
+    private final Set<String> outputStreams;
+    // the fields of the tuples this node emits
+    protected final Fields outputFields;
+    // the component id; may be assigned later via setComponentId
+    protected String componentId;
+    protected int parallelism;
+    // the parent streams that this node subscribes to
+    private final Multimap<Node, String> parentStreams = ArrayListMultimap.create();
+
+    Node(Set<String> outputStreams, Fields outputFields, String componentId, int parallelism) {
+        this.outputStreams = new HashSet<>(outputStreams);
+        this.outputFields = outputFields;
+        this.componentId = componentId;
+        this.parallelism = parallelism;
+    }
+
+    Node(String outputStream, Fields outputFields, String componentId, int parallelism) {
+        this(Collections.singleton(outputStream), outputFields, componentId, parallelism);
+    }
+
+    // defaults to a parallelism of 1
+    Node(String outputStream, Fields outputFields, String componentId) {
+        this(outputStream, outputFields, componentId, 1);
+    }
+
+    // defaults to a null component id (assigned later) and a parallelism of 1
+    Node(String outputStream, Fields outputFields) {
+        this(outputStream, outputFields, null);
+    }
+
+    public Fields getOutputFields() {
+        return outputFields;
+    }
+
+    String getComponentId() {
+        return componentId;
+    }
+
+    void setComponentId(String componentId) {
+        this.componentId = componentId;
+    }
+
+    Integer getParallelism() {
+        return parallelism;
+    }
+
+    void setParallelism(int parallelism) {
+        this.parallelism = parallelism;
+    }
+
+    // registers a subscription of this node to the given stream of the parent
+    void addParentStream(Node parent, String streamId) {
+        parentStreams.put(parent, streamId);
+    }
+
+    // drops all subscriptions of this node to streams of the given parent
+    void removeParentStreams(Node parent) {
+        parentStreams.removeAll(parent);
+    }
+
+    Set<String> getOutputStreams() {
+        return Collections.unmodifiableSet(outputStreams);
+    }
+
+    // the streams of the given parent that this node subscribes to
+    Collection<String> getParentStreams(Node parent) {
+        return parentStreams.get(parent);
+    }
+
+    // returns the parent nodes subscribed to on the given stream;
+    // note: inverts the parentStreams multimap on every call
+    Set<Node> getParents(String stream) {
+        Multimap<String, Node> rev = Multimaps.invertFrom(parentStreams, ArrayListMultimap.<String, Node>create());
+        return new HashSet<>(rev.get(stream));
+    }
+
+    void addOutputStream(String streamId) {
+        outputStreams.add(streamId);
+    }
+
+    /**
+     * Returns the fields that the given component declares on the given
+     * stream, or empty {@link Fields} if the stream is not declared.
+     */
+    static Fields getOutputFields(IComponent component, String streamId) {
+        OutputFieldsGetter getter = new OutputFieldsGetter();
+        component.declareOutputFields(getter);
+        Map<String, StreamInfo> fieldsDeclaration = getter.getFieldsDeclaration();
+        if ((fieldsDeclaration != null) && fieldsDeclaration.containsKey(streamId)) {
+            return new Fields(fieldsDeclaration.get(streamId).get_output_fields());
+        }
+        return new Fields();
+    }
+
+    @Override
+    public String toString() {
+        return "Node{" +
+                "outputStreams='" + outputStreams + '\'' +
+                ", outputFields=" + outputFields +
+                ", componentId='" + componentId + '\'' +
+                ", parallelism=" + parallelism +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/Pair.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/Pair.java b/storm-core/src/jvm/org/apache/storm/streams/Pair.java
new file mode 100644
index 0000000..0044359
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/Pair.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * A pair of values. Pairs are immutable value objects; they compare equal
+ * when both the first and the second values are respectively equal.
+ *
+ * @param <T1> the type of the first value
+ * @param <T2> the type of the second value
+ */
+public final class Pair<T1, T2> implements Serializable {
+    private final T1 first;
+    private final T2 second;
+
+    /**
+     * Constructs a new pair of values
+     *
+     * @param first  the first value
+     * @param second the second value
+     */
+    private Pair(T1 first, T2 second) {
+        this.first = first;
+        this.second = second;
+    }
+
+    /**
+     * Returns the first value in a pair.
+     *
+     * @return the first value
+     */
+    public T1 getFirst() {
+        return first;
+    }
+
+    /**
+     * Returns the second value in a pair.
+     *
+     * @return the second value
+     */
+    public T2 getSecond() {
+        return second;
+    }
+
+    /**
+     * Constructs a new pair of values.
+     *
+     * @param first  the first value
+     * @param second the second value
+     * @param <T1>   the type of the first value
+     * @param <T2>   the type of the second value
+     * @return a new pair of values
+     */
+    public static <T1, T2> Pair<T1, T2> of(T1 first, T2 second) {
+        return new Pair<>(first, second);
+    }
+
+    /**
+     * Two pairs are equal if their first and second values are respectively
+     * equal (null-safe). Required so that pairs behave correctly when
+     * compared or used as keys, e.g. in group-by and join operations.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (!(o instanceof Pair)) {
+            return false;
+        }
+        Pair<?, ?> other = (Pair<?, ?>) o;
+        return Objects.equals(first, other.first) && Objects.equals(second, other.second);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(first, second);
+    }
+
+    @Override
+    public String toString() {
+        return "(" + first + ", " + second + ')';
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/PairStream.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/PairStream.java b/storm-core/src/jvm/org/apache/storm/streams/PairStream.java
new file mode 100644
index 0000000..2d18b30
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/PairStream.java
@@ -0,0 +1,241 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+import org.apache.storm.Config;
+import org.apache.storm.streams.operations.Aggregator;
+import org.apache.storm.streams.operations.Consumer;
+import org.apache.storm.streams.operations.FlatMapFunction;
+import org.apache.storm.streams.operations.Function;
+import org.apache.storm.streams.operations.PairValueJoiner;
+import org.apache.storm.streams.operations.Predicate;
+import org.apache.storm.streams.operations.Reducer;
+import org.apache.storm.streams.operations.ValueJoiner;
+import org.apache.storm.streams.processors.AggregateByKeyProcessor;
+import org.apache.storm.streams.processors.FlatMapValuesProcessor;
+import org.apache.storm.streams.processors.JoinProcessor;
+import org.apache.storm.streams.processors.MapValuesProcessor;
+import org.apache.storm.streams.processors.ReduceByKeyProcessor;
+import org.apache.storm.streams.processors.UpdateStateByKeyProcessor;
+import org.apache.storm.streams.windowing.Window;
+import org.apache.storm.tuple.Fields;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Represents a stream of key-value pairs.
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public class PairStream<K, V> extends Stream<Pair<K, V>> {
+
+    PairStream(StreamBuilder topology, Node node) {
+        super(topology, node);
+    }
+
+    /**
+     * Returns a new stream by applying a {@link Function} to the value of each key-value pairs in
+     * this stream.
+     *
+     * @param function the mapping function
+     * @param <R>      the result type
+     * @return the new stream
+     */
+    public <R> PairStream<K, R> mapValues(Function<? super V, ? extends R> function) {
+        return new PairStream<>(streamBuilder, addProcessorNode(new MapValuesProcessor<>(function), KEY_VALUE));
+    }
+
+    /**
+     * Return a new stream by applying a {@link FlatMapFunction} function to the value of each key-value pairs in
+     * this stream.
+     *
+     * @param function the flatmap function
+     * @param <R>      the result type
+     * @return the new stream
+     */
+    public <R> PairStream<K, R> flatMapValues(FlatMapFunction<V, R> function) {
+        return new PairStream<>(streamBuilder, addProcessorNode(new FlatMapValuesProcessor<>(function), KEY_VALUE));
+    }
+
+    /**
+     * Aggregates the values for each key of this stream using the given {@link Aggregator}.
+     *
+     * @param aggregator the aggregator
+     * @param <R>        the result type
+     * @return the new stream
+     */
+    public <R> PairStream<K, R> aggregateByKey(Aggregator<? super V, ? extends R> aggregator) {
+        return new PairStream<>(streamBuilder, addProcessorNode(new AggregateByKeyProcessor<>(aggregator), KEY_VALUE));
+    }
+
+    /**
+     * Performs a reduction on the values for each key of this stream by repeatedly applying the reducer.
+     *
+     * @param reducer the reducer
+     * @return the new stream
+     */
+    public PairStream<K, V> reduceByKey(Reducer<V> reducer) {
+        return new PairStream<>(streamBuilder, addProcessorNode(new ReduceByKeyProcessor<>(reducer), KEY_VALUE));
+    }
+
+    /**
+     * Returns a new stream where the values are grouped by the keys.
+     *
+     * @return the new stream
+     */
+    public PairStream<K, V> groupByKey() {
+        // fields-grouping on the key field so that the same keys end up in the same partition
+        return partitionBy(KEY);
+    }
+
+    /**
+     * Returns a new stream where the values are grouped by keys and the given window.
+     * The values that arrive within a window having the same key will be merged together and returned
+     * as an Iterable of values mapped to the key.
+     *
+     * @param window the window configuration
+     * @return the new stream
+     */
+    public PairStream<K, Iterable<V>> groupByKeyAndWindow(Window<?, ?> window) {
+        return groupByKey().window(window).aggregateByKey(new MergeValues<>());
+    }
+
+    /**
+     * Returns a new stream where the values that arrive within a window
+     * having the same key will be reduced by repeatedly applying the reducer.
+     *
+     * @param reducer the reducer
+     * @param window  the window configuration
+     * @return the new stream
+     */
+    public PairStream<K, V> reduceByKeyAndWindow(Reducer<V> reducer, Window<?, ?> window) {
+        return groupByKey().window(window).reduceByKey(reducer);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public PairStream<K, V> peek(Consumer<? super Pair<K, V>> action) {
+        return toPairStream(super.peek(action));
+    }
+
+    /**
+     * Join the values of this stream with the values having the same key from the other stream.
+     * <p>
+     * Note: The parallelism and windowing parameters (if windowed) of this stream is carried forward to the joined stream.
+     * </p>
+     *
+     * @param otherStream the other stream
+     * @param <V1>        the type of the values in the other stream
+     * @return the new stream
+     */
+    public <V1> PairStream<K, Pair<V, V1>> join(PairStream<K, V1> otherStream) {
+        return join(otherStream, new PairValueJoiner<>());
+    }
+
+    /**
+     * Join the values of this stream with the values having the same key from the other stream.
+     * <p>
+     * Note: The parallelism and windowing parameters (if windowed) of this stream is carried forward to the joined stream.
+     * </p>
+     *
+     * @param otherStream the other stream
+     * @param valueJoiner the {@link ValueJoiner}
+     * @param <R>         the type of the values resulting from the join
+     * @param <V1>        the type of the values in the other stream
+     * @return the new stream
+     */
+    public <R, V1> PairStream<K, R> join(PairStream<K, V1> otherStream,
+                                         ValueJoiner<? super V, ? super V1, ? extends R> valueJoiner) {
+        String leftStream = stream;
+        String rightStream = otherStream.stream;
+        // the join node subscribes to both this stream and the other stream
+        Node joinNode = addProcessorNode(new JoinProcessor<>(leftStream, rightStream, valueJoiner), KEY_VALUE);
+        addNode(otherStream.getNode(), joinNode, joinNode.getParallelism());
+        return new PairStream<>(streamBuilder, joinNode);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public PairStream<K, V> window(Window<?, ?> window) {
+        return toPairStream(super.window(window));
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public PairStream<K, V> repartition(int parallelism) {
+        return toPairStream(super.repartition(parallelism));
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    @SuppressWarnings("unchecked")
+    public PairStream<K, V>[] branch(Predicate<Pair<K, V>>... predicates) {
+        List<PairStream<K, V>> pairStreams = new ArrayList<>();
+        for (Stream<Pair<K, V>> stream : super.branch(predicates)) {
+            pairStreams.add(toPairStream(stream));
+        }
+        return pairStreams.toArray(new PairStream[pairStreams.size()]);
+    }
+
+    /**
+     * Update the state by applying the given aggregator to the previous state of the
+     * key and the new value for the key. This internally uses {@link org.apache.storm.topology.IStatefulBolt}
+     * to save the state. Use {@link Config#TOPOLOGY_STATE_PROVIDER} to choose the state implementation.
+     *
+     * @param aggregator the aggregator
+     * @param <R>        the result type
+     * @return the {@link StreamState} which can be used to query the state
+     */
+    public <R> StreamState<K, R> updateStateByKey(Aggregator<? super V, ? extends R> aggregator) {
+        return new StreamState<>(
+                new PairStream<>(streamBuilder, addProcessorNode(new UpdateStateByKeyProcessor<>(aggregator), KEY_VALUE)));
+    }
+
+    // returns a new stream partitioned (fields grouped) on the given fields
+    private PairStream<K, V> partitionBy(Fields fields) {
+        return new PairStream<>(
+                streamBuilder,
+                addNode(new PartitionNode(stream, node.getOutputFields(), GroupingInfo.fields(fields))));
+    }
+
+    // re-wraps a Stream of pairs as a PairStream without adding any new node
+    private PairStream<K, V> toPairStream(Stream<Pair<K, V>> stream) {
+        return new PairStream<>(stream.streamBuilder, stream.node);
+    }
+
+    // used internally to merge values in groupByKeyAndWindow
+    private static class MergeValues<V> implements Aggregator<V, ArrayList<V>> {
+        @Override
+        public ArrayList<V> init() {
+            return new ArrayList<>();
+        }
+
+        // appends the new value to the running list of values for the key
+        @Override
+        public ArrayList<V> apply(V value, ArrayList<V> aggregate) {
+            aggregate.add(value);
+            return aggregate;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/PartitionNode.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/PartitionNode.java b/storm-core/src/jvm/org/apache/storm/streams/PartitionNode.java
new file mode 100644
index 0000000..ca92def
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/PartitionNode.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+import org.apache.storm.tuple.Fields;
+
+/**
+ * Node that holds partitioning/grouping information.
+ * This is used for operations like groupBy (fields grouping), global
+ * aggregate/reduce (global grouping), state query (all grouping).
+ */
+class PartitionNode extends Node {
+    private final GroupingInfo groupingInfo;
+
+    PartitionNode(String outputStream, Fields outputFields, GroupingInfo groupingInfo) {
+        super(outputStream, outputFields);
+        this.groupingInfo = groupingInfo;
+    }
+
+    PartitionNode(String outputStream, Fields outputFields) {
+        this(outputStream, outputFields, GroupingInfo.shuffle());
+    }
+
+    GroupingInfo getGroupingInfo() {
+        return groupingInfo;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/ProcessorBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/ProcessorBolt.java b/storm-core/src/jvm/org/apache/storm/streams/ProcessorBolt.java
new file mode 100644
index 0000000..f1163ca
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/ProcessorBolt.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+import com.google.common.collect.Multimap;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.jgrapht.DirectedGraph;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Stream bolt that executes the different processors (except windowed and stateful operations)
+ */
+class ProcessorBolt extends BaseRichBolt implements StreamBolt {
+    private final ProcessorBoltDelegate delegate;
+
+    ProcessorBolt(String id, DirectedGraph<Node, Edge> graph, List<ProcessorNode> nodes) {
+        delegate = new ProcessorBoltDelegate(id, graph, nodes);
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        delegate.prepare(stormConf, context, collector);
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        delegate.processAndAck(input);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        delegate.declareOutputFields(declarer);
+    }
+
+
+    @Override
+    public void setTimestampField(String fieldName) {
+        delegate.setTimestampField(fieldName);
+    }
+
+    @Override
+    public String getId() {
+        return delegate.getId();
+    }
+
+    void setStreamToInitialProcessors(Multimap<String, ProcessorNode> streamToInitialProcessors) {
+        delegate.setStreamToInitialProcessors(streamToInitialProcessors);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/ProcessorBoltDelegate.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/ProcessorBoltDelegate.java b/storm-core/src/jvm/org/apache/storm/streams/ProcessorBoltDelegate.java
new file mode 100644
index 0000000..5bc6fff
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/ProcessorBoltDelegate.java
@@ -0,0 +1,285 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.Multimap;
+import org.apache.storm.streams.processors.ChainedProcessorContext;
+import org.apache.storm.streams.processors.EmittingProcessorContext;
+import org.apache.storm.streams.processors.ForwardingProcessorContext;
+import org.apache.storm.streams.processors.Processor;
+import org.apache.storm.streams.processors.ProcessorContext;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.jgrapht.DirectedGraph;
+import org.jgrapht.graph.DirectedSubgraph;
+import org.jgrapht.traverse.TopologicalOrderIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
/**
 * Common implementation shared (via delegation) by the stream bolts that
 * execute a sub-graph of processor nodes: wires up the processor contexts,
 * forwards incoming tuple values through the processors and acks the input
 * tuples once processing completes.
 */
class ProcessorBoltDelegate implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(ProcessorBoltDelegate.class);
    // id of the bolt this delegate backs
    private final String id;
    // the full stream graph; used to look up children of the nodes handled here
    private final DirectedGraph<Node, Edge> graph;
    // the processor nodes executed inside this bolt
    private final List<ProcessorNode> nodes;
    private Map stormConf;
    private TopologyContext topologyContext;
    private OutputCollector outputCollector;
    // processors whose results are emitted out of this bolt
    private final List<ProcessorNode> outgoingProcessors = new ArrayList<>();
    // contexts that emit via the output collector; kept so anchors and event
    // timestamps can be pushed to all of them
    private final Set<EmittingProcessorContext> emittingProcessorContexts = new HashSet<>();
    // per-processor set of windowed parent streams whose punctuation has been received
    private final Map<ProcessorNode, Set<String>> punctuationState = new HashMap<>();
    // input stream id -> processors that receive tuples of that stream first
    private Multimap<String, ProcessorNode> streamToInitialProcessors;
    // when set, tuples carry an event timestamp in this field
    private String timestampField;

    ProcessorBoltDelegate(String id, DirectedGraph<Node, Edge> graph, List<ProcessorNode> nodes) {
        this.id = id;
        this.graph = graph;
        this.nodes = new ArrayList<>(nodes);
    }

    String getId() {
        return id;
    }

    void addNodes(Collection<ProcessorNode> nodes) {
        this.nodes.addAll(nodes);
    }

    List<ProcessorNode> getNodes() {
        return Collections.unmodifiableList(nodes);
    }

    /**
     * Builds and initializes a processor context for each node of this bolt's
     * sub-graph, in topological order. Nodes with no children in this bolt get
     * an emitting context; nodes with in-bolt children get a forwarding
     * context, chained with an emitting context when some child lives outside
     * this bolt.
     */
    void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        this.stormConf = stormConf;
        topologyContext = context;
        outputCollector = collector;
        DirectedSubgraph<Node, Edge> subgraph = new DirectedSubgraph<>(graph, new HashSet<>(nodes), null);
        TopologicalOrderIterator<Node, Edge> it = new TopologicalOrderIterator<>(subgraph);
        while (it.hasNext()) {
            Node node = it.next();
            if (!(node instanceof ProcessorNode)) {
                throw new IllegalStateException("Not a processor node " + node);
            }
            ProcessorNode processorNode = (ProcessorNode) node;
            List<ProcessorNode> children = StreamUtil.getChildren(subgraph, processorNode);
            ProcessorContext processorContext;
            if (children.isEmpty()) {
                // leaf within this bolt: results go straight to the collector
                processorContext = createEmittingContext(processorNode);
            } else {
                // map each child to the parent streams it consumes
                Multimap<String, ProcessorNode> streamToChildren = ArrayListMultimap.create();
                for (ProcessorNode child : children) {
                    for (String stream : child.getParentStreams(processorNode)) {
                        streamToChildren.put(stream, child);
                    }
                }
                ForwardingProcessorContext forwardingContext = new ForwardingProcessorContext(processorNode, streamToChildren);
                if (hasOutgoingChild(processorNode, new HashSet<>(children))) {
                    // some children live outside this bolt, so also emit downstream
                    processorContext = new ChainedProcessorContext(processorNode, forwardingContext, createEmittingContext(processorNode));
                } else {
                    processorContext = forwardingContext;
                }
            }
            processorNode.initProcessorContext(processorContext);
        }
        if (timestampField != null) {
            for (EmittingProcessorContext ctx : emittingProcessorContexts) {
                ctx.setTimestampField(timestampField);
            }
        }
    }

    /**
     * Declares each output stream of every node; when event timestamps are in
     * use, the timestamp field is appended to the declared fields.
     */
    void declareOutputFields(OutputFieldsDeclarer declarer) {
        for (ProcessorNode node : nodes) {
            for (String stream : node.getOutputStreams()) {
                if (timestampField == null) {
                    declarer.declareStream(stream, node.getOutputFields());
                } else {
                    List<String> fields = new ArrayList<>();
                    fields.addAll(node.getOutputFields().toList());
                    fields.add(timestampField);
                    declarer.declareStream(stream, new Fields(fields));
                }
            }
        }
    }

    // makes the given tuple the anchor for everything emitted by this bolt
    void setAnchor(RefCountedTuple tuple) {
        for (EmittingProcessorContext ctx : emittingProcessorContexts) {
            ctx.setAnchor(tuple);
        }
    }

    /**
     * Extracts the value to be processed and the logical source stream from
     * the input tuple.
     */
    Pair<Object, String> getValueAndStream(Tuple input) {
        Object value;
        String stream;
        // if tuple arrives from a spout, it can be passed as is
        // otherwise the value is in the first field of the tuple
        // NOTE(review): relies on spout component ids starting with "spout" --
        // confirm this naming convention is enforced by the topology builder
        if (input.getSourceComponent().startsWith("spout")) {
            value = input;
            stream = input.getSourceGlobalStreamId().get_componentId() + input.getSourceGlobalStreamId().get_streamId();
        } else if (isPair(input)) {
            // key-value pair emitted by an upstream stream bolt
            value = Pair.of(input.getValue(0), input.getValue(1));
            stream = input.getSourceStreamId();
        } else {
            value = input.getValue(0);
            stream = input.getSourceStreamId();
        }
        return Pair.of(value, stream);
    }

    /**
     * Processes the input tuple through the processor chain and acks it
     * (immediately, unless a batching/windowed processor holds a reference).
     */
    void processAndAck(Tuple input) {
        RefCountedTuple refCountedTuple = new RefCountedTuple(input);
        setAnchor(refCountedTuple);
        if (isEventTimestamp()) {
            setEventTimestamp(input.getLongByField(getTimestampField()));
        }
        Pair<Object, String> valueAndStream = getValueAndStream(input);
        process(valueAndStream.getFirst(), valueAndStream.getSecond());
        ack(refCountedTuple);
    }

    /**
     * Routes the value to the initial processors registered for the source
     * stream; punctuation is forwarded only once it has arrived from all
     * windowed parent streams of a processor.
     */
    void process(Object value, String sourceStreamId) {
        LOG.debug("Process value {}, sourceStreamId {}", value, sourceStreamId);
        Collection<ProcessorNode> initialProcessors = streamToInitialProcessors.get(sourceStreamId);
        for (ProcessorNode processorNode : initialProcessors) {
            Processor processor = processorNode.getProcessor();
            if (StreamUtil.isPunctuation(value)) {
                if (shouldPunctuate(processorNode, sourceStreamId)) {
                    processor.punctuate(null);
                    clearPunctuationState(processorNode);
                }
            } else {
                processor.execute(value, sourceStreamId);
            }
        }
    }

    void setStreamToInitialProcessors(Multimap<String, ProcessorNode> streamToInitialProcessors) {
        this.streamToInitialProcessors = streamToInitialProcessors;
    }

    void addStreamToInitialProcessors(Multimap<String, ProcessorNode> streamToInitialProcessors) {
        this.streamToInitialProcessors.putAll(streamToInitialProcessors);
    }

    Set<String> getInitialStreams() {
        return streamToInitialProcessors.keySet();
    }

    void setTimestampField(String fieldName) {
        timestampField = fieldName;
    }

    // true when tuples carry an event timestamp field
    boolean isEventTimestamp() {
        return timestampField != null;
    }

    void setEventTimestamp(long timestamp) {
        for (EmittingProcessorContext ctx : emittingProcessorContexts) {
            ctx.setEventTimestamp(timestamp);
        }
    }

    private String getTimestampField() {
        return timestampField;
    }

    // if there are no windowed/batched processors, we would ack immediately
    private void ack(RefCountedTuple tuple) {
        if (tuple.shouldAck()) {
            LOG.debug("ACKing tuple {}", tuple);
            outputCollector.ack(tuple.tuple());
            tuple.setAcked();
        }
    }

    /**
     * Creates a context that emits the node's output streams via the
     * collector. Punctuation is suppressed on sink streams.
     */
    private ProcessorContext createEmittingContext(ProcessorNode processorNode) {
        List<EmittingProcessorContext> emittingContexts = new ArrayList<>();
        for (String stream : processorNode.getOutputStreams()) {
            EmittingProcessorContext emittingContext = new EmittingProcessorContext(processorNode, outputCollector, stream);
            if (StreamUtil.isSinkStream(stream)) {
                emittingContext.setEmitPunctuation(false);
            }
            emittingContexts.add(emittingContext);
        }
        emittingProcessorContexts.addAll(emittingContexts);
        outgoingProcessors.add(processorNode);
        return new ChainedProcessorContext(processorNode, emittingContexts);
    }

    // true when the node has a child that is executed outside this bolt
    private boolean hasOutgoingChild(ProcessorNode processorNode, Set<ProcessorNode> boltChildren) {
        for (Node child : getChildNodes(processorNode)) {
            if ((child instanceof ProcessorNode && !boltChildren.contains(child))
                    || child instanceof SinkNode) {
                return true;
            }
        }
        return false;
    }

    // collects the node's children, looking through window/partition nodes
    private Set<Node> getChildNodes(Node node) {
        Set<Node> children = new HashSet<>();
        for (Node child : StreamUtil.<Node>getChildren(graph, node)) {
            if (child instanceof WindowNode || child instanceof PartitionNode) {
                children.addAll(getChildNodes(child));
            } else {
                children.add(child);
            }
        }
        return children;
    }

    // if we received punctuation from all parent windowed streams
    private boolean shouldPunctuate(ProcessorNode processorNode, String sourceStreamId) {
        if (processorNode.getWindowedParentStreams().size() <= 1) {
            // single (or no) windowed parent: punctuate right away
            return true;
        }
        Set<String> receivedStreams = punctuationState.get(processorNode);
        if (receivedStreams == null) {
            receivedStreams = new HashSet<>();
            punctuationState.put(processorNode, receivedStreams);
        }
        receivedStreams.add(sourceStreamId);
        return receivedStreams.equals(processorNode.getWindowedParentStreams());
    }

    // resets the punctuation bookkeeping after a processor has been punctuated
    private void clearPunctuationState(ProcessorNode processorNode) {
        Set<String> state = punctuationState.get(processorNode);
        if (state != null) {
            state.clear();
        }
    }

    // a key-value pair tuple has two fields, plus one when event timestamps are enabled
    private boolean isPair(Tuple input) {
        return input.size() == (timestampField == null ? 2 : 3);
    }

}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/ProcessorNode.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/ProcessorNode.java b/storm-core/src/jvm/org/apache/storm/streams/ProcessorNode.java
new file mode 100644
index 0000000..4771f4f
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/ProcessorNode.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+import org.apache.storm.streams.processors.BatchProcessor;
+import org.apache.storm.streams.processors.Processor;
+import org.apache.storm.streams.processors.ProcessorContext;
+import org.apache.storm.tuple.Fields;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Node that wraps a processor in the stream.
+ */
+public class ProcessorNode extends Node {
+    private final Processor<?> processor;
+    private final boolean isBatch;
+    private boolean windowed;
+    // Windowed parent streams
+    private Set<String> windowedParentStreams = Collections.emptySet();
+
+    public ProcessorNode(Processor<?> processor, String outputStream, Fields outputFields) {
+        super(outputStream, outputFields);
+        this.isBatch = processor instanceof BatchProcessor;
+        this.processor = processor;
+    }
+
+    public Processor<?> getProcessor() {
+        return processor;
+    }
+
+    public boolean isWindowed() {
+        return windowed;
+    }
+
+    public boolean isBatch() {
+        return isBatch;
+    }
+
+    public void setWindowed(boolean windowed) {
+        this.windowed = windowed;
+    }
+
+    public Set<String> getWindowedParentStreams() {
+        return Collections.unmodifiableSet(windowedParentStreams);
+    }
+
+    void initProcessorContext(ProcessorContext context) {
+        processor.init(context);
+    }
+
+    void setWindowedParentStreams(Set<String> windowedParentStreams) {
+        this.windowedParentStreams = new HashSet<>(windowedParentStreams);
+    }
+
+    @Override
+    public String toString() {
+        return "ProcessorNode{" +
+                "processor=" + processor +
+                ", windowed=" + windowed +
+                ", windowedParentStreams=" + windowedParentStreams +
+                "} " + super.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/RefCountedTuple.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/RefCountedTuple.java b/storm-core/src/jvm/org/apache/storm/streams/RefCountedTuple.java
new file mode 100644
index 0000000..e58bcc5
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/RefCountedTuple.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * Provides reference counting of tuples. Used when operations that operate
+ * on a batch of tuples are involved (e.g. aggregation, join etc).
+ * The input tuples are acked once the result is emitted downstream.
+ */
+public class RefCountedTuple {
+    private int count = 0;
+    private final Tuple tuple;
+    private boolean acked;
+
+    RefCountedTuple(Tuple tuple) {
+        this.tuple = tuple;
+        this.acked = false;
+    }
+
+    public boolean shouldAck() {
+        return count == 0 && !acked;
+    }
+
+    public void increment() {
+        ++count;
+    }
+
+    public void decrement() {
+        --count;
+    }
+
+    public Tuple tuple() {
+        return tuple;
+    }
+
+    public void setAcked() {
+        acked = true;
+    }
+
+    @Override
+    public String toString() {
+        return "RefCountedTuple{" +
+                "count=" + count +
+                ", tuple=" + tuple +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/SinkNode.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/SinkNode.java b/storm-core/src/jvm/org/apache/storm/streams/SinkNode.java
new file mode 100644
index 0000000..d95ab6b
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/SinkNode.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+import org.apache.storm.topology.IBasicBolt;
+import org.apache.storm.topology.IComponent;
+import org.apache.storm.topology.IRichBolt;
+import org.apache.storm.utils.Utils;
+
+/**
+ * Sink node holds IRich or IBasic bolts that are passed
+ * via the {@code Stream#to()} api.
+ */
+class SinkNode extends Node {
+    private final IComponent bolt;
+
+    SinkNode(IComponent bolt) {
+        super(Utils.DEFAULT_STREAM_ID, getOutputFields(bolt, Utils.DEFAULT_STREAM_ID));
+        if (bolt instanceof IRichBolt || bolt instanceof IBasicBolt) {
+            this.bolt = bolt;
+        } else {
+            throw new IllegalArgumentException("Should be an IRichBolt or IBasicBolt");
+        }
+    }
+
+    IComponent getBolt() {
+        return bolt;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e251573d/storm-core/src/jvm/org/apache/storm/streams/SpoutNode.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/streams/SpoutNode.java b/storm-core/src/jvm/org/apache/storm/streams/SpoutNode.java
new file mode 100644
index 0000000..4784514
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/streams/SpoutNode.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.streams;
+
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
+/**
+ * A spout node wraps an {@link IRichSpout}.
+ */
+class SpoutNode extends Node {
+    private final IRichSpout spout;
+
+    SpoutNode(IRichSpout spout) {
+        super(Utils.DEFAULT_STREAM_ID, getOutputFields(spout, Utils.DEFAULT_STREAM_ID));
+        if (outputFields.size() == 0) {
+            throw new IllegalArgumentException("Spout " + spout + " does not declare any fields" +
+                    "for the stream '" + Utils.DEFAULT_STREAM_ID + "'");
+        }
+        this.spout = spout;
+    }
+
+    IRichSpout getSpout() {
+        return spout;
+    }
+
+    @Override
+    void addOutputStream(String streamId) {
+        throw new UnsupportedOperationException("Cannot add output streams to a spout node");
+    }
+
+}


Mime
View raw message