storm-commits mailing list archives

From ptgo...@apache.org
Subject [42/50] [abbrv] git commit: merge storm-starter into examples
Date Thu, 20 Mar 2014 21:23:05 GMT
merge storm-starter into examples


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/f1d7fca7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/f1d7fca7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/f1d7fca7

Branch: refs/heads/master
Commit: f1d7fca7ff1c68c184539e39502f579c9ef6dc39
Parents: 3aa6d7b 8364b51
Author: P. Taylor Goetz <ptgoetz@gmail.com>
Authored: Wed Mar 5 15:57:05 2014 -0500
Committer: P. Taylor Goetz <ptgoetz@gmail.com>
Committed: Wed Mar 5 15:57:05 2014 -0500

----------------------------------------------------------------------
 examples/storm-starter/.gitignore               |   8 +
 examples/storm-starter/LICENSE                  |  14 +
 examples/storm-starter/README.markdown          | 136 +++++++
 examples/storm-starter/m2-pom.xml               | 175 +++++++++
 .../multilang/resources/splitsentence.py        |   9 +
 .../multilang/resources/splitsentence.rb        |  11 +
 .../storm-starter/multilang/resources/storm.py  | 206 +++++++++++
 .../storm-starter/multilang/resources/storm.rb  | 185 ++++++++++
 examples/storm-starter/project.clj              |  25 ++
 .../src/clj/storm/starter/clj/word_count.clj    |  80 +++++
 .../jvm/storm/starter/BasicDRPCTopology.java    |  61 ++++
 .../jvm/storm/starter/ExclamationTopology.java  |  70 ++++
 .../src/jvm/storm/starter/ManualDRPC.java       |  51 +++
 .../jvm/storm/starter/PrintSampleStream.java    |  37 ++
 .../src/jvm/storm/starter/ReachTopology.java    | 179 ++++++++++
 .../src/jvm/storm/starter/RollingTopWords.java  |  61 ++++
 .../jvm/storm/starter/SingleJoinExample.java    |  47 +++
 .../storm/starter/TransactionalGlobalCount.java | 156 +++++++++
 .../jvm/storm/starter/TransactionalWords.java   | 229 ++++++++++++
 .../jvm/storm/starter/WordCountTopology.java    |  90 +++++
 .../storm/starter/bolt/AbstractRankerBolt.java  |  93 +++++
 .../starter/bolt/IntermediateRankingsBolt.java  |  41 +++
 .../src/jvm/storm/starter/bolt/PrinterBolt.java |  20 ++
 .../storm/starter/bolt/RollingCountBolt.java    | 125 +++++++
 .../jvm/storm/starter/bolt/SingleJoinBolt.java  |  97 +++++
 .../storm/starter/bolt/TotalRankingsBolt.java   |  42 +++
 .../starter/spout/RandomSentenceSpout.java      |  47 +++
 .../storm/starter/spout/TwitterSampleSpout.java | 105 ++++++
 .../tools/NthLastModifiedTimeTracker.java       |  53 +++
 .../src/jvm/storm/starter/tools/Rankable.java   |  15 +
 .../starter/tools/RankableObjectWithFields.java | 131 +++++++
 .../src/jvm/storm/starter/tools/Rankings.java   | 139 ++++++++
 .../starter/tools/SlidingWindowCounter.java     | 102 ++++++
 .../storm/starter/tools/SlotBasedCounter.java   | 101 ++++++
 .../jvm/storm/starter/trident/TridentReach.java | 139 ++++++++
 .../storm/starter/trident/TridentWordCount.java |  68 ++++
 .../src/jvm/storm/starter/util/StormRunner.java |  22 ++
 .../jvm/storm/starter/util/TupleHelpers.java    |  16 +
 examples/storm-starter/storm-starter.iml        |  82 +++++
 .../bolt/IntermediateRankingsBoltTest.java      | 129 +++++++
 .../starter/bolt/RollingCountBoltTest.java      |  96 +++++
 .../starter/bolt/TotalRankingsBoltTest.java     | 130 +++++++
 .../storm/starter/tools/MockTupleHelpers.java   |  23 ++
 .../tools/NthLastModifiedTimeTrackerTest.java   | 108 ++++++
 .../tools/RankableObjectWithFieldsTest.java     | 235 +++++++++++++
 .../jvm/storm/starter/tools/RankingsTest.java   | 351 +++++++++++++++++++
 .../starter/tools/SlidingWindowCounterTest.java |  89 +++++
 .../starter/tools/SlotBasedCounterTest.java     | 164 +++++++++
 48 files changed, 4593 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/.gitignore
----------------------------------------------------------------------
diff --cc examples/storm-starter/.gitignore
index 0000000,0000000..c4b88f3
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/.gitignore
@@@ -1,0 -1,0 +1,8 @@@
++classes/
++lib/
++*.pyc
++.classpath
++.project
++*.jar
++.lein-deps-sum
++

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/LICENSE
----------------------------------------------------------------------
diff --cc examples/storm-starter/LICENSE
index 0000000,0000000..19560d1
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/LICENSE
@@@ -1,0 -1,0 +1,14 @@@
++Copyright 2011 Nathan Marz
++
++   Licensed under the Apache License, Version 2.0 (the "License");
++   you may not use this file except in compliance with the License.
++   You may obtain a copy of the License at
++
++       http://www.apache.org/licenses/LICENSE-2.0
++
++   Unless required by applicable law or agreed to in writing, software
++   distributed under the License is distributed on an "AS IS" BASIS,
++   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++   See the License for the specific language governing permissions and
++   limitations under the License.
++

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/README.markdown
----------------------------------------------------------------------
diff --cc examples/storm-starter/README.markdown
index 0000000,0000000..e271ac1
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/README.markdown
@@@ -1,0 -1,0 +1,136 @@@
++# Example Storm Topologies
++
++Learn to use Storm!
++
++---
++
++Table of Contents
++
++* <a href="#getting-started">Getting started</a>
++* <a href="#leiningen">Using storm-starter with Leiningen</a>
++* <a href="#maven">Using storm-starter with Maven</a>
++* <a href="#intellij-idea">Using storm-starter with IntelliJ IDEA</a>
++
++---
++
++
++<a name="getting-started"></a>
++
++# Getting started
++
++## Prerequisites
++
++First, you need `java` and `git` installed and in your user's `PATH`.  Also, two of the examples in storm-starter
++require Python and Ruby.
++
++Next, make sure you have the storm-starter code available on your machine.  If you are new to Git/GitHub, the
++following command downloads the latest storm-starter code and changes into the new directory that contains the
++downloaded code.
++
++    $ git clone git://github.com/nathanmarz/storm-starter.git && cd storm-starter
++
++
++## storm-starter overview
++
++storm-starter contains a variety of examples of using Storm.  If this is your first time working with Storm, check out
++these topologies first:
++
++1. [ExclamationTopology](src/jvm/storm/starter/ExclamationTopology.java):  Basic topology written entirely in Java
++2. [WordCountTopology](src/jvm/storm/starter/WordCountTopology.java):  Basic topology that makes use of multilang by
++   implementing one bolt in Python
++3. [ReachTopology](src/jvm/storm/starter/ReachTopology.java): Example of complex DRPC on top of Storm
++
++After you have familiarized yourself with these topologies, take a look at the other topologies in
++[src/jvm/storm/starter/](src/jvm/storm/starter/) such as [RollingTopWords](src/jvm/storm/starter/RollingTopWords.java)
++for more advanced implementations.
++
++If you want to learn more about how Storm works, please head over to the
++[Storm project page](http://github.com/nathanmarz/storm).
++
++
++<a name="leiningen"></a>
++
++# Using storm-starter with Leiningen
++
++## Install Leiningen
++
++The storm-starter build uses [Leiningen](http://leiningen.org/) 2.0.  Install Leiningen by following the
++[leiningen installation instructions](https://github.com/technomancy/leiningen).
++
++
++## Running topologies with Leiningen
++
++### To run a Java topology
++
++    $ lein deps
++    $ lein compile
++    $ java -cp $(lein classpath) storm.starter.ExclamationTopology
++
++
++### To run a Clojure topology:
++
++    $ lein deps
++    $ lein compile
++    $ lein run -m storm.starter.clj.word-count
++
++
++<a name="maven"></a>
++
++# Using storm-starter with Maven
++
++## Install Maven
++
++[Maven](http://maven.apache.org/) is an alternative to Leiningen.  Install Maven (preferably version 3.x) by following
++the [Maven installation instructions](http://maven.apache.org/download.cgi).
++
++
++## Running topologies with Maven
++
++storm-starter contains [m2-pom.xml](m2-pom.xml), which you can pass to Maven via the `-f` option. For example, to
++compile and run `WordCountTopology` in local mode, use the command:
++
++    $ mvn -f m2-pom.xml compile exec:java -Dstorm.topology=storm.starter.WordCountTopology
++
++You can also run Clojure topologies with Maven:
++
++    $ mvn -f m2-pom.xml compile exec:java -Dstorm.topology=storm.starter.clj.word_count
++
++## Packaging storm-starter for use on a Storm cluster
++
++You can package a jar suitable for submitting to a Storm cluster with the command:
++
++    $ mvn -f m2-pom.xml package
++
++This will package your code and all the non-Storm dependencies into a single "uberjar" at the path
++`target/storm-starter-{version}-jar-with-dependencies.jar`.
++
++
++## Running unit tests
++
++Use the following Maven command to run the unit tests that ship with storm-starter.  Unfortunately `lein test` does not
++yet run the included unit tests.
++
++    $ mvn -f m2-pom.xml test
++
++
++<a name="intellij-idea"></a>
++
++# Using storm-starter with IntelliJ IDEA
++
++## Importing storm-starter as a project in IDEA
++
++The following instructions will import storm-starter as a new project in IntelliJ IDEA.
++
++* Copy `m2-pom.xml` to `pom.xml`.  This is required so that IDEA (or Eclipse) can properly detect the Maven
++  configuration.
++* Open _File > Import Project..._ and navigate to the top-level directory of your storm-starter clone (e.g.
++  `~/git/storm-starter`).
++* Select _Import project from external model_, select "Maven", and click _Next_.
++* In the following screen, enable the checkbox _Import Maven projects automatically_.  Leave all other values at their
++  defaults.  Click _Next_.
++* Click _Next_ on the following screen about selecting Maven projects to import.
++* Select the JDK to be used by IDEA for storm-starter, then click _Next_.
++    * At the time of this writing you should use JDK 6.
++    * It is strongly recommended to use Sun/Oracle JDK 6 rather than OpenJDK 6.
++* You may now optionally change the name of the project in IDEA.  The default name suggested by IDEA is "storm-starter".
++  Click _Finish_ once you are done.

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/m2-pom.xml
----------------------------------------------------------------------
diff --cc examples/storm-starter/m2-pom.xml
index 0000000,0000000..77011a7
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/m2-pom.xml
@@@ -1,0 -1,0 +1,175 @@@
++<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
++  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
++  <modelVersion>4.0.0</modelVersion>
++
++  <groupId>storm.starter</groupId>
++  <artifactId>storm-starter</artifactId>
++  <version>0.0.1-SNAPSHOT</version>
++  <packaging>jar</packaging>
++
++  <name>storm-starter</name>
++  <url>https://github.com/nathanmarz/storm-starter</url>
++
++  <properties>
++    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
++  </properties>
++
++  <repositories>
++    <repository>
++      <id>github-releases</id>
++      <url>http://oss.sonatype.org/content/repositories/github-releases/</url>
++    </repository>
++    <repository>
++      <id>clojars.org</id>
++      <url>http://clojars.org/repo</url>
++    </repository>
++  </repositories>
++
++  <dependencies>
++    <dependency>
++      <groupId>junit</groupId>
++      <artifactId>junit</artifactId>
++      <version>3.8.1</version>
++      <scope>test</scope>
++    </dependency>
++    <dependency>
++      <groupId>org.testng</groupId>
++      <artifactId>testng</artifactId>
++      <version>6.8.5</version>
++      <scope>test</scope>
++    </dependency>
++    <dependency>
++      <groupId>org.mockito</groupId>
++      <artifactId>mockito-all</artifactId>
++      <version>1.9.0</version>
++      <scope>test</scope>
++    </dependency>
++    <dependency>
++      <groupId>org.easytesting</groupId>
++      <artifactId>fest-assert-core</artifactId>
++      <version>2.0M8</version>
++      <scope>test</scope>
++    </dependency>
++    <dependency>
++      <groupId>org.jmock</groupId>
++      <artifactId>jmock</artifactId>
++      <version>2.6.0</version>
++      <scope>test</scope>
++    </dependency>
++    <dependency>
++      <groupId>storm</groupId>
++      <artifactId>storm</artifactId>
++      <version>0.9.0.1</version>
++      <!-- keep storm out of the jar-with-dependencies -->
++      <scope>provided</scope>
++    </dependency>
++    <dependency>
++      <groupId>commons-collections</groupId>
++      <artifactId>commons-collections</artifactId>
++      <version>3.2.1</version>
++    </dependency>
++    <dependency>
++      <groupId>com.google.guava</groupId>
++      <artifactId>guava</artifactId>
++      <version>15.0</version>
++    </dependency>
++  </dependencies>
++
++  <build>
++    <sourceDirectory>src/jvm</sourceDirectory>
++    <testSourceDirectory>test/jvm</testSourceDirectory>
++    <resources>
++      <resource>
++        <directory>${basedir}/multilang</directory>
++      </resource>
++    </resources>
++
++    <plugins>
++      <!--
++        Bind the maven-assembly-plugin to the package phase
++        this will create a jar file without the storm dependencies
++        suitable for deployment to a cluster.
++       -->
++      <plugin>
++        <artifactId>maven-assembly-plugin</artifactId>
++        <configuration>
++          <descriptorRefs>
++            <descriptorRef>jar-with-dependencies</descriptorRef>
++          </descriptorRefs>
++          <archive>
++            <manifest>
++              <mainClass></mainClass>
++            </manifest>
++          </archive>
++        </configuration>
++        <executions>
++          <execution>
++            <id>make-assembly</id>
++            <phase>package</phase>
++            <goals>
++              <goal>single</goal>
++            </goals>
++          </execution>
++        </executions>
++      </plugin>
++
++      <plugin>
++        <groupId>com.theoryinpractise</groupId>
++        <artifactId>clojure-maven-plugin</artifactId>
++        <version>1.3.12</version>
++        <extensions>true</extensions>
++        <configuration>
++          <sourceDirectories>
++            <sourceDirectory>src/clj</sourceDirectory>
++          </sourceDirectories>
++        </configuration>
++        <executions>
++          <execution>
++            <id>compile</id>
++            <phase>compile</phase>
++            <goals>
++              <goal>compile</goal>
++            </goals>
++          </execution>
++          <execution>
++            <id>test</id>
++            <phase>test</phase>
++            <goals>
++              <goal>test</goal>
++            </goals>
++          </execution>
++        </executions>
++      </plugin>
++
++      <plugin>
++        <groupId>org.codehaus.mojo</groupId>
++        <artifactId>exec-maven-plugin</artifactId>
++        <version>1.2.1</version>
++        <executions>
++          <execution>
++            <goals>
++              <goal>exec</goal>
++            </goals>
++          </execution>
++        </executions>
++        <configuration>
++          <executable>java</executable>
++          <includeProjectDependencies>true</includeProjectDependencies>
++          <includePluginDependencies>false</includePluginDependencies>
++          <classpathScope>compile</classpathScope>
++          <mainClass>${storm.topology}</mainClass>
++        </configuration>
++      </plugin>
++
++      <plugin>
++        <groupId>org.apache.maven.plugins</groupId>
++        <artifactId>maven-compiler-plugin</artifactId>
++        <configuration>
++          <source>1.6</source>
++          <target>1.6</target>
++        </configuration>
++      </plugin>
++
++    </plugins>
++  </build>
++</project>
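
The assembly plugin configuration above is what produces the "uberjar" described in the README's packaging
section. Once built, such a jar is typically submitted to a cluster with the storm command-line client; the
topology class and name below are placeholders, not part of this commit:

    $ storm jar target/storm-starter-0.0.1-SNAPSHOT-jar-with-dependencies.jar storm.starter.WordCountTopology my-wordcount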

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/multilang/resources/splitsentence.py
----------------------------------------------------------------------
diff --cc examples/storm-starter/multilang/resources/splitsentence.py
index 0000000,0000000..24a9c9b
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/multilang/resources/splitsentence.py
@@@ -1,0 -1,0 +1,9 @@@
++import storm
++
++class SplitSentenceBolt(storm.BasicBolt):
++    def process(self, tup):
++        words = tup.values[0].split(" ")
++        for word in words:
++          storm.emit([word])
++
++SplitSentenceBolt().run()
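
The bolt above runs under Storm's multilang protocol, so on the JVM side it has to be wrapped in a ShellBolt
that launches the Python process. A minimal sketch of that wrapper, following the pattern WordCountTopology.java
in this commit uses (that file is listed in the diffstat but not shown here, so treat this class as illustrative):

    import java.util.Map;

    import backtype.storm.task.ShellBolt;
    import backtype.storm.topology.IRichBolt;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Fields;

    // Launches splitsentence.py as a subprocess and speaks the JSON
    // multilang protocol with it over stdin/stdout.
    public class SplitSentence extends ShellBolt implements IRichBolt {
      public SplitSentence() {
        super("python", "splitsentence.py");
      }

      @Override
      public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Must match the fields the Python bolt emits.
        declarer.declare(new Fields("word"));
      }

      @Override
      public Map<String, Object> getComponentConfiguration() {
        return null;
      }
    }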

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/multilang/resources/splitsentence.rb
----------------------------------------------------------------------
diff --cc examples/storm-starter/multilang/resources/splitsentence.rb
index 0000000,0000000..6b719ed
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/multilang/resources/splitsentence.rb
@@@ -1,0 -1,0 +1,11 @@@
++require "./storm"
++
++class SplitSentenceBolt < Storm::Bolt
++  def process(tup)
++    tup.values[0].split(" ").each do |word|
++      emit([word])
++    end
++  end
++end
++
++SplitSentenceBolt.new.run

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/multilang/resources/storm.py
----------------------------------------------------------------------
diff --cc examples/storm-starter/multilang/resources/storm.py
index 0000000,0000000..c06b255
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/multilang/resources/storm.py
@@@ -1,0 -1,0 +1,206 @@@
++import sys
++import os
++import traceback
++from collections import deque
++
++try:
++    import simplejson as json
++except ImportError:
++    import json
++
++json_encode = lambda x: json.dumps(x)
++json_decode = lambda x: json.loads(x)
++
++#reads lines and reconstructs newlines appropriately
++def readMsg():
++    msg = ""
++    while True:
++        line = sys.stdin.readline()[0:-1]
++        if line == "end":
++            break
++        msg = msg + line + "\n"
++    return json_decode(msg[0:-1])
++
++MODE = None
++ANCHOR_TUPLE = None
++
++#queue up commands we read while trying to read taskids
++pending_commands = deque()
++
++def readTaskIds():
++    if pending_taskids:
++        return pending_taskids.popleft()
++    else:
++        msg = readMsg()
++        while type(msg) is not list:
++            pending_commands.append(msg)
++            msg = readMsg()
++        return msg
++
++#queue up taskids we read while trying to read commands/tuples
++pending_taskids = deque()
++
++def readCommand():
++    if pending_commands:
++        return pending_commands.popleft()
++    else:
++        msg = readMsg()
++        while type(msg) is list:
++            pending_taskids.append(msg)
++            msg = readMsg()
++        return msg
++
++def readTuple():
++    cmd = readCommand()
++    return Tuple(cmd["id"], cmd["comp"], cmd["stream"], cmd["task"], cmd["tuple"])
++
++def sendMsgToParent(msg):
++    print json_encode(msg)
++    print "end"
++    sys.stdout.flush()
++
++def sync():
++    sendMsgToParent({'command':'sync'})
++
++def sendpid(heartbeatdir):
++    pid = os.getpid()
++    sendMsgToParent({'pid':pid})
++    open(heartbeatdir + "/" + str(pid), "w").close()    
++
++def emit(*args, **kwargs):
++    __emit(*args, **kwargs)
++    return readTaskIds()
++
++def emitDirect(task, *args, **kwargs):
++    kwargs["directTask"] = task
++    __emit(*args, **kwargs)
++
++def __emit(*args, **kwargs):
++    global MODE
++    if MODE == Bolt:
++        emitBolt(*args, **kwargs)
++    elif MODE == Spout:
++        emitSpout(*args, **kwargs)
++
++def emitBolt(tup, stream=None, anchors = [], directTask=None):
++    global ANCHOR_TUPLE
++    if ANCHOR_TUPLE is not None:
++        anchors = [ANCHOR_TUPLE]
++    m = {"command": "emit"}
++    if stream is not None:
++        m["stream"] = stream
++    m["anchors"] = map(lambda a: a.id, anchors)
++    if directTask is not None:
++        m["task"] = directTask
++    m["tuple"] = tup
++    sendMsgToParent(m)
++    
++def emitSpout(tup, stream=None, id=None, directTask=None):
++    m = {"command": "emit"}
++    if id is not None:
++        m["id"] = id
++    if stream is not None:
++        m["stream"] = stream
++    if directTask is not None:
++        m["task"] = directTask
++    m["tuple"] = tup
++    sendMsgToParent(m)
++
++def ack(tup):
++    sendMsgToParent({"command": "ack", "id": tup.id})
++
++def fail(tup):
++    sendMsgToParent({"command": "fail", "id": tup.id})
++
++def log(msg):
++    sendMsgToParent({"command": "log", "msg": msg})
++
++def initComponent():
++    setupInfo = readMsg()
++    sendpid(setupInfo['pidDir'])
++    return [setupInfo['conf'], setupInfo['context']]
++
++class Tuple:
++    def __init__(self, id, component, stream, task, values):
++        self.id = id
++        self.component = component
++        self.stream = stream
++        self.task = task
++        self.values = values
++
++    def __repr__(self):
++        return '<%s%s>' % (
++                self.__class__.__name__,
++                ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys())))
++
++class Bolt:
++    def initialize(self, stormconf, context):
++        pass
++
++    def process(self, tuple):
++        pass
++
++    def run(self):
++        global MODE
++        MODE = Bolt
++        conf, context = initComponent()
++        self.initialize(conf, context)
++        try:
++            while True:
++                tup = readTuple()
++                self.process(tup)
++        except Exception, e:
++            log(traceback.format_exc(e))
++
++class BasicBolt:
++    def initialize(self, stormconf, context):
++        pass
++
++    def process(self, tuple):
++        pass
++
++    def run(self):
++        global MODE
++        MODE = Bolt
++        global ANCHOR_TUPLE
++        conf, context = initComponent()
++        self.initialize(conf, context)
++        try:
++            while True:
++                tup = readTuple()
++                ANCHOR_TUPLE = tup
++                self.process(tup)
++                ack(tup)
++        except Exception, e:
++            log(traceback.format_exc(e))
++
++class Spout:
++    def initialize(self, conf, context):
++        pass
++
++    def ack(self, id):
++        pass
++
++    def fail(self, id):
++        pass
++
++    def nextTuple(self):
++        pass
++
++    def run(self):
++        global MODE
++        MODE = Spout
++        conf, context = initComponent()
++        self.initialize(conf, context)
++        try:
++            while True:
++                msg = readCommand()
++                if msg["command"] == "next":
++                    self.nextTuple()
++                if msg["command"] == "ack":
++                    self.ack(msg["id"])
++                if msg["command"] == "fail":
++                    self.fail(msg["id"])
++                sync()
++        except Exception, e:
++            log(traceback.format_exc(e))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/multilang/resources/storm.rb
----------------------------------------------------------------------
diff --cc examples/storm-starter/multilang/resources/storm.rb
index 0000000,0000000..017fc25
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/multilang/resources/storm.rb
@@@ -1,0 -1,0 +1,185 @@@
++require "rubygems"
++require "json"
++
++module Storm
++  module Protocol
++    class << self
++      attr_accessor :mode, :pending_taskids, :pending_commands
++    end
++
++    self.pending_taskids = []
++    self.pending_commands = []
++
++    def read_message
++      msg = ""
++      loop do
++        line = STDIN.readline.chomp
++        break if line == "end"
++        msg << line
++        msg << "\n"
++      end
++      JSON.parse msg.chomp
++    end
++
++    def read_task_ids
++      Storm::Protocol.pending_taskids.shift ||
++        begin
++          msg = read_message
++          until msg.is_a? Array
++            Storm::Protocol.pending_commands.push(msg)
++            msg = read_message
++          end
++          msg
++        end
++    end
++
++    def read_command
++      Storm::Protocol.pending_commands.shift ||
++        begin
++          msg = read_message
++          while msg.is_a? Array
++            Storm::Protocol.pending_taskids.push(msg)
++            msg = read_message
++          end
++          msg
++        end
++    end
++
++    def send_msg_to_parent(msg)
++      puts msg.to_json
++      puts "end"
++      STDOUT.flush
++    end
++
++    def sync
++      send_msg_to_parent({'command' => 'sync'})
++    end
++
++    def send_pid(heartbeat_dir)
++      pid = Process.pid
++      send_msg_to_parent({'pid' => pid})
++      File.open("#{heartbeat_dir}/#{pid}", "w").close
++    end
++
++    def emit_bolt(tup, args = {})
++      stream = args[:stream]
++      anchors = args[:anchors] || args[:anchor] || []
++      anchors = [anchors] unless anchors.is_a? Enumerable
++      direct = args[:direct_task]
++      m = {:command => :emit, :anchors => anchors.map(&:id), :tuple => tup}
++      m[:stream] = stream if stream
++      m[:task] = direct if direct
++      send_msg_to_parent m
++      read_task_ids unless direct
++    end
++
++    def emit_spout(tup, args = {})
++      stream = args[:stream]
++      id = args[:id]
++      direct = args[:direct_task]
++      m = {:command => :emit, :tuple => tup}
++      m[:id] = id if id
++      m[:stream] = stream if stream
++      m[:task] = direct if direct
++      send_msg_to_parent m
++      read_task_ids unless direct
++    end
++
++    def emit(*args)
++      case Storm::Protocol.mode
++      when 'spout'
++        emit_spout(*args)
++      when 'bolt'
++        emit_bolt(*args)
++      end
++    end
++
++    def ack(tup)
++      send_msg_to_parent :command => :ack, :id => tup.id
++    end
++
++    def fail(tup)
++      send_msg_to_parent :command => :fail, :id => tup.id
++    end
++
++    def log(msg)
++      send_msg_to_parent :command => :log, :msg => msg.to_s
++    end
++
++    def handshake
++      setup_info = read_message
++      send_pid setup_info['pidDir']
++      [setup_info['conf'], setup_info['context']]
++    end
++  end
++
++  class Tuple
++    attr_accessor :id, :component, :stream, :task, :values
++
++    def initialize(id, component, stream, task, values)
++      @id = id
++      @component = component
++      @stream = stream
++      @task = task
++      @values = values
++    end
++
++    def self.from_hash(hash)
++      Tuple.new(*hash.values_at("id", "comp", "stream", "task", "tuple"))
++    end
++  end
++
++  class Bolt
++    include Storm::Protocol
++
++    def prepare(conf, context); end
++
++    def process(tuple); end
++
++    def run
++      Storm::Protocol.mode = 'bolt'
++      prepare(*handshake)
++      begin
++        while true
++          process Tuple.from_hash(read_command)
++        end
++      rescue Exception => e
++        log 'Exception in bolt: ' + e.message + ' - ' + e.backtrace.join("\n")
++      end
++    end
++  end
++
++  class Spout
++    include Storm::Protocol
++
++    def open(conf, context); end
++
++    def nextTuple; end
++
++    def ack(id); end
++
++    def fail(id); end
++
++    def run
++      Storm::Protocol.mode = 'spout'
++      open(*handshake)
++
++      begin
++        while true
++          msg = read_command
++          case msg['command']
++          when 'next'
++            nextTuple
++          when 'ack'
++            ack(msg['id'])
++          when 'fail'
++            fail(msg['id'])
++          end
++          sync
++        end
++      rescue Exception => e
++        log 'Exception in spout: ' + e.message + ' - ' + e.backtrace.join("\n")
++      end
++    end
++  end
++end

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/project.clj
----------------------------------------------------------------------
diff --cc examples/storm-starter/project.clj
index 0000000,0000000..ae6c377
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/project.clj
@@@ -1,0 -1,0 +1,25 @@@
++(defproject storm-starter "0.0.1-SNAPSHOT"
++  :source-paths ["src/clj"]
++  :java-source-paths ["src/jvm"]
++  :test-paths ["test/jvm"]
++  :resource-paths ["multilang"]
++  :aot :all
++  :repositories {
++;;                 "twitter4j" "http://twitter4j.org/maven2"
++                 }
++
++  :dependencies [
++;;                 [org.twitter4j/twitter4j-core "2.2.6-SNAPSHOT"]
++;;                 [org.twitter4j/twitter4j-stream "2.2.6-SNAPSHOT"]
++                   [commons-collections/commons-collections "3.2.1"]
++                 ]
++
++  :profiles {:dev
++              {:dependencies [[storm "0.9.0.1"]
++                              [org.clojure/clojure "1.4.0"]
++                              [org.testng/testng "6.8.5"]
++                              [org.easytesting/fest-assert-core "2.0M8"]
++                              [org.mockito/mockito-all "1.9.0"]
++                              [org.jmock/jmock "2.6.0"]]}}
++  :min-lein-version "2.0.0"
++  )

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/clj/storm/starter/clj/word_count.clj
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/clj/storm/starter/clj/word_count.clj
index 0000000,0000000..ce2725d
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/clj/storm/starter/clj/word_count.clj
@@@ -1,0 -1,0 +1,80 @@@
++(ns storm.starter.clj.word-count
++  (:import [backtype.storm StormSubmitter LocalCluster])
++  (:use [backtype.storm clojure config])
++  (:gen-class))
++
++(defspout sentence-spout ["sentence"]
++  [conf context collector]
++  (let [sentences ["a little brown dog"
++                   "the man petted the dog"
++                   "four score and seven years ago"
++                   "an apple a day keeps the doctor away"]]
++    (spout
++     (nextTuple []
++       (Thread/sleep 100)
++       (emit-spout! collector [(rand-nth sentences)])         
++       )
++     (ack [id]
++        ;; You only need to define this method for reliable spouts
++        ;; (such as one that reads off of a queue like Kestrel)
++        ;; This is an unreliable spout, so it does nothing here
++        ))))
++
++(defspout sentence-spout-parameterized ["word"] {:params [sentences] :prepare false}
++  [collector]
++  (Thread/sleep 500)
++  (emit-spout! collector [(rand-nth sentences)]))
++
++(defbolt split-sentence ["word"] [tuple collector]
++  (let [words (.split (.getString tuple 0) " ")]
++    (doseq [w words]
++      (emit-bolt! collector [w] :anchor tuple))
++    (ack! collector tuple)
++    ))
++
++(defbolt word-count ["word" "count"] {:prepare true}
++  [conf context collector]
++  (let [counts (atom {})]
++    (bolt
++     (execute [tuple]
++       (let [word (.getString tuple 0)]
++         (swap! counts (partial merge-with +) {word 1})
++         (emit-bolt! collector [word (@counts word)] :anchor tuple)
++         (ack! collector tuple)
++         )))))
++
++(defn mk-topology []
++
++  (topology
++   {"1" (spout-spec sentence-spout)
++    "2" (spout-spec (sentence-spout-parameterized
++                     ["the cat jumped over the door"
++                      "greetings from a faraway land"])
++                     :p 2)}
++   {"3" (bolt-spec {"1" :shuffle "2" :shuffle}
++                   split-sentence
++                   :p 5)
++    "4" (bolt-spec {"3" ["word"]}
++                   word-count
++                   :p 6)}))
++
++(defn run-local! []
++  (let [cluster (LocalCluster.)]
++    (.submitTopology cluster "word-count" {TOPOLOGY-DEBUG true} (mk-topology))
++    (Thread/sleep 10000)
++    (.shutdown cluster)
++    ))
++
++(defn submit-topology! [name]
++  (StormSubmitter/submitTopology
++   name
++   {TOPOLOGY-DEBUG true
++    TOPOLOGY-WORKERS 3}
++   (mk-topology)))
++
++(defn -main
++  ([]
++   (run-local!))
++  ([name]
++   (submit-topology! name)))
++

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/BasicDRPCTopology.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/BasicDRPCTopology.java
index 0000000,0000000..7e7ef93
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/BasicDRPCTopology.java
@@@ -1,0 -1,0 +1,61 @@@
++package storm.starter;
++
++import backtype.storm.Config;
++import backtype.storm.LocalCluster;
++import backtype.storm.LocalDRPC;
++import backtype.storm.StormSubmitter;
++import backtype.storm.drpc.LinearDRPCTopologyBuilder;
++import backtype.storm.topology.BasicOutputCollector;
++import backtype.storm.topology.OutputFieldsDeclarer;
++import backtype.storm.topology.base.BaseBasicBolt;
++import backtype.storm.tuple.Fields;
++import backtype.storm.tuple.Tuple;
++import backtype.storm.tuple.Values;
++
++/**
++ * This topology is a basic example of doing distributed RPC on top of Storm. It implements a function that appends a
++ * "!" to any string you send the DRPC function.
++ * <p/>
++ * See https://github.com/nathanmarz/storm/wiki/Distributed-RPC for more information on doing distributed RPC on top of
++ * Storm.
++ */
++public class BasicDRPCTopology {
++  public static class ExclaimBolt extends BaseBasicBolt {
++    @Override
++    public void execute(Tuple tuple, BasicOutputCollector collector) {
++      String input = tuple.getString(1);
++      collector.emit(new Values(tuple.getValue(0), input + "!"));
++    }
++
++    @Override
++    public void declareOutputFields(OutputFieldsDeclarer declarer) {
++      declarer.declare(new Fields("id", "result"));
++    }
++
++  }
++
++  public static void main(String[] args) throws Exception {
++    LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
++    builder.addBolt(new ExclaimBolt(), 3);
++
++    Config conf = new Config();
++
++    if (args == null || args.length == 0) {
++      LocalDRPC drpc = new LocalDRPC();
++      LocalCluster cluster = new LocalCluster();
++
++      cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));
++
++      for (String word : new String[]{ "hello", "goodbye" }) {
++        System.out.println("Result for \"" + word + "\": " + drpc.execute("exclamation", word));
++      }
++
++      cluster.shutdown();
++      drpc.shutdown();
++    }
++    else {
++      conf.setNumWorkers(3);
++      StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology());
++    }
++  }
++}
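
When this topology is submitted to a real cluster (the else branch above), clients call the DRPC function
remotely instead of through LocalDRPC. A minimal client sketch, assuming a DRPC server is reachable; the host
name is a placeholder and 3772 is the conventional DRPC port:

    import backtype.storm.utils.DRPCClient;

    public class ExclamationClient {
      public static void main(String[] args) throws Exception {
        // Placeholder host: any node running the Storm DRPC server.
        DRPCClient client = new DRPCClient("drpc.example.com", 3772);
        // Prints "hello!" once the topology is up.
        System.out.println(client.execute("exclamation", "hello"));
      }
    }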

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/ExclamationTopology.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/ExclamationTopology.java
index 0000000,0000000..fed8b1e
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/ExclamationTopology.java
@@@ -1,0 -1,0 +1,70 @@@
++package storm.starter;
++
++import backtype.storm.Config;
++import backtype.storm.LocalCluster;
++import backtype.storm.StormSubmitter;
++import backtype.storm.task.OutputCollector;
++import backtype.storm.task.TopologyContext;
++import backtype.storm.testing.TestWordSpout;
++import backtype.storm.topology.OutputFieldsDeclarer;
++import backtype.storm.topology.TopologyBuilder;
++import backtype.storm.topology.base.BaseRichBolt;
++import backtype.storm.tuple.Fields;
++import backtype.storm.tuple.Tuple;
++import backtype.storm.tuple.Values;
++import backtype.storm.utils.Utils;
++
++import java.util.Map;
++
++/**
++ * This is a basic example of a Storm topology.
++ */
++public class ExclamationTopology {
++
++  public static class ExclamationBolt extends BaseRichBolt {
++    OutputCollector _collector;
++
++    @Override
++    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
++      _collector = collector;
++    }
++
++    @Override
++    public void execute(Tuple tuple) {
++      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
++      _collector.ack(tuple);
++    }
++
++    @Override
++    public void declareOutputFields(OutputFieldsDeclarer declarer) {
++      declarer.declare(new Fields("word"));
++    }
++
++
++  }
++
++  public static void main(String[] args) throws Exception {
++    TopologyBuilder builder = new TopologyBuilder();
++
++    builder.setSpout("word", new TestWordSpout(), 10);
++    builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
++    builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
++
++    Config conf = new Config();
++    conf.setDebug(true);
++
++    if (args != null && args.length > 0) {
++      conf.setNumWorkers(3);
++
++      StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
++    }
++    else {
++
++      LocalCluster cluster = new LocalCluster();
++      cluster.submitTopology("test", conf, builder.createTopology());
++      Utils.sleep(10000);
++      cluster.killTopology("test");
++      cluster.shutdown();
++    }
++  }
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/ManualDRPC.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/ManualDRPC.java
index 0000000,0000000..ade4ab1
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/ManualDRPC.java
@@@ -1,0 -1,0 +1,51 @@@
++package storm.starter;
++
++import backtype.storm.Config;
++import backtype.storm.LocalCluster;
++import backtype.storm.LocalDRPC;
++import backtype.storm.drpc.DRPCSpout;
++import backtype.storm.drpc.ReturnResults;
++import backtype.storm.topology.BasicOutputCollector;
++import backtype.storm.topology.OutputFieldsDeclarer;
++import backtype.storm.topology.TopologyBuilder;
++import backtype.storm.topology.base.BaseBasicBolt;
++import backtype.storm.tuple.Fields;
++import backtype.storm.tuple.Tuple;
++import backtype.storm.tuple.Values;
++
++
++public class ManualDRPC {
++  public static class ExclamationBolt extends BaseBasicBolt {
++
++    @Override
++    public void declareOutputFields(OutputFieldsDeclarer declarer) {
++      declarer.declare(new Fields("result", "return-info"));
++    }
++
++    @Override
++    public void execute(Tuple tuple, BasicOutputCollector collector) {
++      String arg = tuple.getString(0);
++      Object retInfo = tuple.getValue(1);
++      collector.emit(new Values(arg + "!!!", retInfo));
++    }
++
++  }
++
++  public static void main(String[] args) {
++    TopologyBuilder builder = new TopologyBuilder();
++    LocalDRPC drpc = new LocalDRPC();
++
++    DRPCSpout spout = new DRPCSpout("exclamation", drpc);
++    builder.setSpout("drpc", spout);
++    builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");
++    builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");
++
++    LocalCluster cluster = new LocalCluster();
++    Config conf = new Config();
++    cluster.submitTopology("exclaim", conf, builder.createTopology());
++
++    System.out.println(drpc.execute("exclamation", "aaa"));
++    System.out.println(drpc.execute("exclamation", "bbb"));
++
++  }
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/PrintSampleStream.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/PrintSampleStream.java
index 0000000,0000000..e880e16
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/PrintSampleStream.java
@@@ -1,0 -1,0 +1,37 @@@
++/*
++// to use this example, uncomment the twitter4j dependency information in the project.clj,
++// uncomment storm.starter.spout.TwitterSampleSpout, and uncomment this class
++
++package storm.starter;
++
++import storm.starter.spout.TwitterSampleSpout;
++import backtype.storm.Config;
++import backtype.storm.LocalCluster;
++import backtype.storm.topology.TopologyBuilder;
++import backtype.storm.utils.Utils;
++import storm.starter.bolt.PrinterBolt;
++
++
++public class PrintSampleStream {        
++    public static void main(String[] args) {
++        String username = args[0];
++        String pwd = args[1];
++        TopologyBuilder builder = new TopologyBuilder();
++        
++        builder.setSpout("spout", new TwitterSampleSpout(username, pwd));
++        builder.setBolt("print", new PrinterBolt())
++                .shuffleGrouping("spout");
++                
++        
++        Config conf = new Config();
++        
++        
++        LocalCluster cluster = new LocalCluster();
++        
++        cluster.submitTopology("test", conf, builder.createTopology());
++        
++        Utils.sleep(10000);
++        cluster.shutdown();
++    }
++}
++*/

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/ReachTopology.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/ReachTopology.java
index 0000000,0000000..a63db3e
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/ReachTopology.java
@@@ -1,0 -1,0 +1,179 @@@
++package storm.starter;
++
++import backtype.storm.Config;
++import backtype.storm.LocalCluster;
++import backtype.storm.LocalDRPC;
++import backtype.storm.StormSubmitter;
++import backtype.storm.coordination.BatchOutputCollector;
++import backtype.storm.drpc.LinearDRPCTopologyBuilder;
++import backtype.storm.task.TopologyContext;
++import backtype.storm.topology.BasicOutputCollector;
++import backtype.storm.topology.OutputFieldsDeclarer;
++import backtype.storm.topology.base.BaseBasicBolt;
++import backtype.storm.topology.base.BaseBatchBolt;
++import backtype.storm.tuple.Fields;
++import backtype.storm.tuple.Tuple;
++import backtype.storm.tuple.Values;
++
++import java.util.*;
++
++/**
++ * This is a good example of doing complex Distributed RPC on top of Storm. This program creates a topology that can
++ * compute the reach for any URL on Twitter in realtime by parallelizing the whole computation.
++ * <p/>
++ * Reach is the number of unique people exposed to a URL on Twitter. To compute reach, you have to get all the people
++ * who tweeted the URL, get all the followers of all those people, unique that set of followers, and then count the
++ * unique set. It's an intense computation that can involve thousands of database calls and tens of millions of follower
++ * records.
++ * <p/>
++ * This Storm topology does every piece of that computation in parallel, turning what would be a computation that takes
++ * minutes on a single machine into one that takes just a couple seconds.
++ * <p/>
++ * For the purposes of demonstration, this topology replaces the use of actual DBs with in-memory hashmaps.
++ * <p/>
++ * See https://github.com/nathanmarz/storm/wiki/Distributed-RPC for more information on Distributed RPC.
++ */
++public class ReachTopology {
++  public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() {{
++    put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan"));
++    put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan"));
++    put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john"));
++  }};
++
++  public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() {{
++    put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai"));
++    put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian"));
++    put("tim", Arrays.asList("alex"));
++    put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan"));
++    put("adam", Arrays.asList("david", "carissa"));
++    put("mike", Arrays.asList("john", "bob"));
++    put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));
++  }};
++
++  public static class GetTweeters extends BaseBasicBolt {
++    @Override
++    public void execute(Tuple tuple, BasicOutputCollector collector) {
++      Object id = tuple.getValue(0);
++      String url = tuple.getString(1);
++      List<String> tweeters = TWEETERS_DB.get(url);
++      if (tweeters != null) {
++        for (String tweeter : tweeters) {
++          collector.emit(new Values(id, tweeter));
++        }
++      }
++    }
++
++    @Override
++    public void declareOutputFields(OutputFieldsDeclarer declarer) {
++      declarer.declare(new Fields("id", "tweeter"));
++    }
++  }
++
++  public static class GetFollowers extends BaseBasicBolt {
++    @Override
++    public void execute(Tuple tuple, BasicOutputCollector collector) {
++      Object id = tuple.getValue(0);
++      String tweeter = tuple.getString(1);
++      List<String> followers = FOLLOWERS_DB.get(tweeter);
++      if (followers != null) {
++        for (String follower : followers) {
++          collector.emit(new Values(id, follower));
++        }
++      }
++    }
++
++    @Override
++    public void declareOutputFields(OutputFieldsDeclarer declarer) {
++      declarer.declare(new Fields("id", "follower"));
++    }
++  }
++
++  public static class PartialUniquer extends BaseBatchBolt {
++    BatchOutputCollector _collector;
++    Object _id;
++    Set<String> _followers = new HashSet<String>();
++
++    @Override
++    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
++      _collector = collector;
++      _id = id;
++    }
++
++    @Override
++    public void execute(Tuple tuple) {
++      _followers.add(tuple.getString(1));
++    }
++
++    @Override
++    public void finishBatch() {
++      _collector.emit(new Values(_id, _followers.size()));
++    }
++
++    @Override
++    public void declareOutputFields(OutputFieldsDeclarer declarer) {
++      declarer.declare(new Fields("id", "partial-count"));
++    }
++  }
++
++  public static class CountAggregator extends BaseBatchBolt {
++    BatchOutputCollector _collector;
++    Object _id;
++    int _count = 0;
++
++    @Override
++    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
++      _collector = collector;
++      _id = id;
++    }
++
++    @Override
++    public void execute(Tuple tuple) {
++      _count += tuple.getInteger(1);
++    }
++
++    @Override
++    public void finishBatch() {
++      _collector.emit(new Values(_id, _count));
++    }
++
++    @Override
++    public void declareOutputFields(OutputFieldsDeclarer declarer) {
++      declarer.declare(new Fields("id", "reach"));
++    }
++  }
++
++  public static LinearDRPCTopologyBuilder construct() {
++    LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
++    builder.addBolt(new GetTweeters(), 4);
++    builder.addBolt(new GetFollowers(), 12).shuffleGrouping();
++    builder.addBolt(new PartialUniquer(), 6).fieldsGrouping(new Fields("id", "follower"));
++    builder.addBolt(new CountAggregator(), 3).fieldsGrouping(new Fields("id"));
++    return builder;
++  }
++
++  public static void main(String[] args) throws Exception {
++    LinearDRPCTopologyBuilder builder = construct();
++
++
++    Config conf = new Config();
++
++    if (args == null || args.length == 0) {
++      conf.setMaxTaskParallelism(3);
++      LocalDRPC drpc = new LocalDRPC();
++      LocalCluster cluster = new LocalCluster();
++      cluster.submitTopology("reach-drpc", conf, builder.createLocalTopology(drpc));
++
++      String[] urlsToTry = new String[]{ "foo.com/blog/1", "engineering.twitter.com/blog/5", "notaurl.com" };
++      for (String url : urlsToTry) {
++        System.out.println("Reach of " + url + ": " + drpc.execute("reach", url));
++      }
++
++      cluster.shutdown();
++      drpc.shutdown();
++    }
++    else {
++      conf.setNumWorkers(6);
++      StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology());
++    }
++  }
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/RollingTopWords.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/RollingTopWords.java
index 0000000,0000000..9d0b3c8
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/RollingTopWords.java
@@@ -1,0 -1,0 +1,61 @@@
++package storm.starter;
++
++import backtype.storm.Config;
++import backtype.storm.testing.TestWordSpout;
++import backtype.storm.topology.TopologyBuilder;
++import backtype.storm.tuple.Fields;
++import storm.starter.bolt.IntermediateRankingsBolt;
++import storm.starter.bolt.RollingCountBolt;
++import storm.starter.bolt.TotalRankingsBolt;
++import storm.starter.util.StormRunner;
++
++/**
++ * This topology does a continuous computation of the top N words that the topology has seen, ranked by how often
++ * they occur. The top N computation is done in a completely scalable way, and a similar approach could be used to
++ * compute things like trending topics or trending images on Twitter.
++ */
++public class RollingTopWords {
++
++  private static final int DEFAULT_RUNTIME_IN_SECONDS = 60;
++  private static final int TOP_N = 5;
++
++  private final TopologyBuilder builder;
++  private final String topologyName;
++  private final Config topologyConfig;
++  private final int runtimeInSeconds;
++
++  public RollingTopWords() throws InterruptedException {
++    builder = new TopologyBuilder();
++    topologyName = "slidingWindowCounts";
++    topologyConfig = createTopologyConfiguration();
++    runtimeInSeconds = DEFAULT_RUNTIME_IN_SECONDS;
++
++    wireTopology();
++  }
++
++  private static Config createTopologyConfiguration() {
++    Config conf = new Config();
++    conf.setDebug(true);
++    return conf;
++  }
++
++  private void wireTopology() throws InterruptedException {
++    String spoutId = "wordGenerator";
++    String counterId = "counter";
++    String intermediateRankerId = "intermediateRanker";
++    String totalRankerId = "finalRanker";
++    builder.setSpout(spoutId, new TestWordSpout(), 5);
++    builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new Fields("word"));
++    builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId, new Fields(
++        "obj"));
++    builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
++  }
++
++  public void run() throws InterruptedException {
++    StormRunner.runTopologyLocally(builder.createTopology(), topologyName, topologyConfig, runtimeInSeconds);
++  }
++
++  public static void main(String[] args) throws Exception {
++    new RollingTopWords().run();
++  }
++}
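
RollingTopWords delegates the local-cluster lifecycle to StormRunner.runTopologyLocally. That helper
(src/jvm/storm/starter/util/StormRunner.java) is part of this commit but not shown in this excerpt; the sketch
below is an assumption about what such a helper looks like, not a copy of it:

    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.generated.StormTopology;

    public final class StormRunner {
      private static final int MILLIS_IN_SEC = 1000;

      private StormRunner() {
      }

      // Submit to an in-process cluster, let the topology run for the
      // requested number of seconds, then tear everything down.
      public static void runTopologyLocally(StormTopology topology, String topologyName,
          Config conf, int runtimeInSeconds) throws InterruptedException {
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology(topologyName, conf, topology);
        Thread.sleep((long) runtimeInSeconds * MILLIS_IN_SEC);
        cluster.killTopology(topologyName);
        cluster.shutdown();
      }
    }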

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/SingleJoinExample.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/SingleJoinExample.java
index 0000000,0000000..d323809
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/SingleJoinExample.java
@@@ -1,0 -1,0 +1,47 @@@
++package storm.starter;
++
++import backtype.storm.Config;
++import backtype.storm.LocalCluster;
++import backtype.storm.testing.FeederSpout;
++import backtype.storm.topology.TopologyBuilder;
++import backtype.storm.tuple.Fields;
++import backtype.storm.tuple.Values;
++import backtype.storm.utils.Utils;
++import storm.starter.bolt.SingleJoinBolt;
++
++public class SingleJoinExample {
++  public static void main(String[] args) {
++    FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender"));
++    FeederSpout ageSpout = new FeederSpout(new Fields("id", "age"));
++
++    TopologyBuilder builder = new TopologyBuilder();
++    builder.setSpout("gender", genderSpout);
++    builder.setSpout("age", ageSpout);
++    builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age"))).fieldsGrouping("gender", new Fields("id"))
++        .fieldsGrouping("age", new Fields("id"));
++
++    Config conf = new Config();
++    conf.setDebug(true);
++
++    LocalCluster cluster = new LocalCluster();
++    cluster.submitTopology("join-example", conf, builder.createTopology());
++
++    for (int i = 0; i < 10; i++) {
++      String gender;
++      if (i % 2 == 0) {
++        gender = "male";
++      }
++      else {
++        gender = "female";
++      }
++      genderSpout.feed(new Values(i, gender));
++    }
++
++    for (int i = 9; i >= 0; i--) {
++      ageSpout.feed(new Values(i, i + 20));
++    }
++
++    Utils.sleep(2000);
++    cluster.shutdown();
++  }
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/TransactionalGlobalCount.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/TransactionalGlobalCount.java
index 0000000,0000000..91b16aa
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/TransactionalGlobalCount.java
@@@ -1,0 -1,0 +1,156 @@@
++package storm.starter;
++
++import backtype.storm.Config;
++import backtype.storm.LocalCluster;
++import backtype.storm.coordination.BatchOutputCollector;
++import backtype.storm.task.TopologyContext;
++import backtype.storm.testing.MemoryTransactionalSpout;
++import backtype.storm.topology.OutputFieldsDeclarer;
++import backtype.storm.topology.base.BaseBatchBolt;
++import backtype.storm.topology.base.BaseTransactionalBolt;
++import backtype.storm.transactional.ICommitter;
++import backtype.storm.transactional.TransactionAttempt;
++import backtype.storm.transactional.TransactionalTopologyBuilder;
++import backtype.storm.tuple.Fields;
++import backtype.storm.tuple.Tuple;
++import backtype.storm.tuple.Values;
++
++import java.math.BigInteger;
++import java.util.ArrayList;
++import java.util.HashMap;
++import java.util.List;
++import java.util.Map;
++
++/**
++ * This is a basic example of a transactional topology. It keeps a count of the number of tuples seen so far in a
++ * database. The source of data and the databases are mocked out as in-memory maps for demonstration purposes. This
++ * class is defined in depth on the wiki at https://github.com/nathanmarz/storm/wiki/Transactional-topologies
++ */
++public class TransactionalGlobalCount {
++  public static final int PARTITION_TAKE_PER_BATCH = 3;
++  public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{
++    put(0, new ArrayList<List<Object>>() {{
++      add(new Values("cat"));
++      add(new Values("dog"));
++      add(new Values("chicken"));
++      add(new Values("cat"));
++      add(new Values("dog"));
++      add(new Values("apple"));
++    }});
++    put(1, new ArrayList<List<Object>>() {{
++      add(new Values("cat"));
++      add(new Values("dog"));
++      add(new Values("apple"));
++      add(new Values("banana"));
++    }});
++    put(2, new ArrayList<List<Object>>() {{
++      add(new Values("cat"));
++      add(new Values("cat"));
++      add(new Values("cat"));
++      add(new Values("cat"));
++      add(new Values("cat"));
++      add(new Values("dog"));
++      add(new Values("dog"));
++      add(new Values("dog"));
++      add(new Values("dog"));
++    }});
++  }};
++
++  public static class Value {
++    int count = 0;
++    BigInteger txid;
++  }
++
++  public static Map<String, Value> DATABASE = new HashMap<String, Value>();
++  public static final String GLOBAL_COUNT_KEY = "GLOBAL-COUNT";
++
++  public static class BatchCount extends BaseBatchBolt {
++    Object _id;
++    BatchOutputCollector _collector;
++
++    int _count = 0;
++
++    @Override
++    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
++      _collector = collector;
++      _id = id;
++    }
++
++    @Override
++    public void execute(Tuple tuple) {
++      _count++;
++    }
++
++    @Override
++    public void finishBatch() {
++      _collector.emit(new Values(_id, _count));
++    }
++
++    @Override
++    public void declareOutputFields(OutputFieldsDeclarer declarer) {
++      declarer.declare(new Fields("id", "count"));
++    }
++  }
++
++  public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {
++    TransactionAttempt _attempt;
++    BatchOutputCollector _collector;
++
++    int _sum = 0;
++
++    @Override
++    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
++      _collector = collector;
++      _attempt = attempt;
++    }
++
++    @Override
++    public void execute(Tuple tuple) {
++      _sum += tuple.getInteger(1);
++    }
++
++    @Override
++    public void finishBatch() {
++      Value val = DATABASE.get(GLOBAL_COUNT_KEY);
++      Value newval;
++      if (val == null || !val.txid.equals(_attempt.getTransactionId())) {
++        newval = new Value();
++        newval.txid = _attempt.getTransactionId();
++        if (val == null) {
++          newval.count = _sum;
++        }
++        else {
++          newval.count = _sum + val.count;
++        }
++        DATABASE.put(GLOBAL_COUNT_KEY, newval);
++      }
++      else {
++        newval = val;
++      }
++      _collector.emit(new Values(_attempt, newval.count));
++    }
++
++    @Override
++    public void declareOutputFields(OutputFieldsDeclarer declarer) {
++      declarer.declare(new Fields("id", "sum"));
++    }
++  }
++
++  public static void main(String[] args) throws Exception {
++    MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
++    TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3);
++    builder.setBolt("partial-count", new BatchCount(), 5).noneGrouping("spout");
++    builder.setBolt("sum", new UpdateGlobalCount()).globalGrouping("partial-count");
++
++    LocalCluster cluster = new LocalCluster();
++
++    Config config = new Config();
++    config.setDebug(true);
++    config.setMaxSpoutPending(3);
++
++    cluster.submitTopology("global-count-topology", config, builder.buildTopology());
++
++    Thread.sleep(3000);
++    cluster.shutdown();
++  }
++}
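
The exactly-once behavior above hinges on the txid check in UpdateGlobalCount.finishBatch: a batch's sum is applied
only when the stored txid differs from the current transaction id, so a replayed batch is a no-op. A minimal sketch of
that pattern in isolation (the applyBatch helper is illustrative, not part of this commit):

    // Idempotent counter update keyed by transaction id. Storm guarantees a
    // given txid always carries the same batch contents, so skipping a
    // matching txid is safe under replay.
    static void applyBatch(Map<String, Value> db, String key, BigInteger txid, int batchSum) {
      Value val = db.get(key);
      if (val == null || !val.txid.equals(txid)) {
        Value newval = new Value();
        newval.txid = txid;
        newval.count = (val == null ? 0 : val.count) + batchSum;
        db.put(key, newval);   // first time this txid is seen: apply the sum
      }
      // else: this txid was already applied; the replay changes nothing
    }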

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/TransactionalWords.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/TransactionalWords.java
index 0000000,0000000..4ee7b12
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/TransactionalWords.java
@@@ -1,0 -1,0 +1,229 @@@
++package storm.starter;
++
++import backtype.storm.Config;
++import backtype.storm.LocalCluster;
++import backtype.storm.coordination.BatchOutputCollector;
++import backtype.storm.task.TopologyContext;
++import backtype.storm.testing.MemoryTransactionalSpout;
++import backtype.storm.topology.BasicOutputCollector;
++import backtype.storm.topology.OutputFieldsDeclarer;
++import backtype.storm.topology.base.BaseBasicBolt;
++import backtype.storm.topology.base.BaseTransactionalBolt;
++import backtype.storm.transactional.ICommitter;
++import backtype.storm.transactional.TransactionAttempt;
++import backtype.storm.transactional.TransactionalTopologyBuilder;
++import backtype.storm.tuple.Fields;
++import backtype.storm.tuple.Tuple;
++import backtype.storm.tuple.Values;
++
++import java.math.BigInteger;
++import java.util.ArrayList;
++import java.util.HashMap;
++import java.util.List;
++import java.util.Map;
++
++/**
++ * This class defines a more involved transactional topology than TransactionalGlobalCount. This topology processes a
++ * stream of words and produces two outputs:
++ * <p/>
++ * 1. A count for each word (stored in a database). 2. The number of words in every bucket of 10 counts, i.e. it stores
++ * in the database how many words have appeared 0-9 times, how many have appeared 10-19 times, and so on.
++ * <p/>
++ * A batch of words can cause the bucket counts to decrement for some buckets and increment for others as words move
++ * between buckets as their counts accumulate.
++ */
++public class TransactionalWords {
++  public static class CountValue {
++    Integer prev_count = null;
++    int count = 0;
++    BigInteger txid = null;
++  }
++
++  public static class BucketValue {
++    int count = 0;
++    BigInteger txid;
++  }
++
++  public static final int BUCKET_SIZE = 10;
++
++  public static Map<String, CountValue> COUNT_DATABASE = new HashMap<String, CountValue>();
++  public static Map<Integer, BucketValue> BUCKET_DATABASE = new HashMap<Integer, BucketValue>();
++
++  public static final int PARTITION_TAKE_PER_BATCH = 3;
++
++  public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{
++    put(0, new ArrayList<List<Object>>() {{
++      add(new Values("cat"));
++      add(new Values("dog"));
++      add(new Values("chicken"));
++      add(new Values("cat"));
++      add(new Values("dog"));
++      add(new Values("apple"));
++    }});
++    put(1, new ArrayList<List<Object>>() {{
++      add(new Values("cat"));
++      add(new Values("dog"));
++      add(new Values("apple"));
++      add(new Values("banana"));
++    }});
++    put(2, new ArrayList<List<Object>>() {{
++      add(new Values("cat"));
++      add(new Values("cat"));
++      add(new Values("cat"));
++      add(new Values("cat"));
++      add(new Values("cat"));
++      add(new Values("dog"));
++      add(new Values("dog"));
++      add(new Values("dog"));
++      add(new Values("dog"));
++    }});
++  }};
++
++  public static class KeyedCountUpdater extends BaseTransactionalBolt implements ICommitter {
++    Map<String, Integer> _counts = new HashMap<String, Integer>();
++    BatchOutputCollector _collector;
++    TransactionAttempt _id;
++
++    @Override
++    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) {
++      _collector = collector;
++      _id = id;
++    }
++
++    @Override
++    public void execute(Tuple tuple) {
++      String key = tuple.getString(1);
++      Integer curr = _counts.get(key);
++      if (curr == null)
++        curr = 0;
++      _counts.put(key, curr + 1);
++    }
++
++    @Override
++    public void finishBatch() {
++      for (String key : _counts.keySet()) {
++        CountValue val = COUNT_DATABASE.get(key);
++        CountValue newVal;
++        if (val == null || !val.txid.equals(_id.getTransactionId())) {
++          newVal = new CountValue();
++          newVal.txid = _id.getTransactionId();
++          if (val != null) {
++            newVal.prev_count = val.count;
++            newVal.count = val.count;
++          }
++          newVal.count = newVal.count + _counts.get(key);
++          COUNT_DATABASE.put(key, newVal);
++        }
++        else {
++          newVal = val;
++        }
++        _collector.emit(new Values(_id, key, newVal.count, newVal.prev_count));
++      }
++    }
++
++    @Override
++    public void declareOutputFields(OutputFieldsDeclarer declarer) {
++      declarer.declare(new Fields("id", "key", "count", "prev-count"));
++    }
++  }
++
++  public static class Bucketize extends BaseBasicBolt {
++    @Override
++    public void execute(Tuple tuple, BasicOutputCollector collector) {
++      TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);
++      int curr = tuple.getInteger(2);
++      Integer prev = tuple.getInteger(3);
++
++      int currBucket = curr / BUCKET_SIZE;
++      Integer prevBucket = null;
++      if (prev != null) {
++        prevBucket = prev / BUCKET_SIZE;
++      }
++
++      if (prevBucket == null) {
++        collector.emit(new Values(attempt, currBucket, 1));
++      }
++      else if (currBucket != prevBucket) {
++        collector.emit(new Values(attempt, currBucket, 1));
++        collector.emit(new Values(attempt, prevBucket, -1));
++      }
++    }
++
++    @Override
++    public void declareOutputFields(OutputFieldsDeclarer declarer) {
++      declarer.declare(new Fields("attempt", "bucket", "delta"));
++    }
++  }
++
++  public static class BucketCountUpdater extends BaseTransactionalBolt {
++    Map<Integer, Integer> _accum = new HashMap<Integer, Integer>();
++    BatchOutputCollector _collector;
++    TransactionAttempt _attempt;
++
++    @Override
++    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
++      _collector = collector;
++      _attempt = attempt;
++    }
++
++    @Override
++    public void execute(Tuple tuple) {
++      Integer bucket = tuple.getInteger(1);
++      Integer delta = tuple.getInteger(2);
++      Integer curr = _accum.get(bucket);
++      if (curr == null)
++        curr = 0;
++      _accum.put(bucket, curr + delta);
++    }
++
++    @Override
++    public void finishBatch() {
++      for (Integer bucket : _accum.keySet()) {
++        BucketValue currVal = BUCKET_DATABASE.get(bucket);
++        BucketValue newVal;
++        if (currVal == null || !currVal.txid.equals(_attempt.getTransactionId())) {
++          newVal = new BucketValue();
++          newVal.txid = _attempt.getTransactionId();
++          newVal.count = _accum.get(bucket);
++          if (currVal != null)
++            newVal.count += currVal.count;
++          BUCKET_DATABASE.put(bucket, newVal);
++        }
++        else {
++          newVal = currVal;
++        }
++        _collector.emit(new Values(_attempt, bucket, newVal.count));
++      }
++    }
++
++    @Override
++    public void declareOutputFields(OutputFieldsDeclarer declarer) {
++      declarer.declare(new Fields("id", "bucket", "count"));
++    }
++  }
++
++  public static void main(String[] args) throws Exception {
++    MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
++    TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("top-n-words", "spout", spout, 2);
++    builder.setBolt("count", new KeyedCountUpdater(), 5).fieldsGrouping("spout", new Fields("word"));
++    builder.setBolt("bucketize", new Bucketize()).noneGrouping("count");
++    builder.setBolt("buckets", new BucketCountUpdater(), 5).fieldsGrouping("bucketize", new Fields("bucket"));
++
++    LocalCluster cluster = new LocalCluster();
++
++    Config config = new Config();
++    config.setDebug(true);
++    config.setMaxSpoutPending(3);
++
++    cluster.submitTopology("top-n-topology", config, builder.buildTopology());
++
++    Thread.sleep(3000);
++    cluster.shutdown();
++  }
++}
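
The Bucketize bolt above emits at most two deltas per word: +1 for the bucket the word's count moved into and -1 for
the bucket it left, if any. A worked example with BUCKET_SIZE = 10 (the variable names are illustrative):

    int prev = 9, curr = 12;          // word count went from 9 to 12 this batch
    int prevBucket = prev / 10;       // 0, i.e. the 0-9 bucket
    int currBucket = curr / 10;       // 1, i.e. the 10-19 bucket
    // currBucket != prevBucket, so Bucketize emits two tuples:
    //   (attempt, 1, +1)  -- one more word now falls in the 10-19 bucket
    //   (attempt, 0, -1)  -- one fewer word falls in the 0-9 bucket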

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java
index 0000000,0000000..f26278c
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java
@@@ -1,0 -1,0 +1,90 @@@
++package storm.starter;
++
++import backtype.storm.Config;
++import backtype.storm.LocalCluster;
++import backtype.storm.StormSubmitter;
++import backtype.storm.task.ShellBolt;
++import backtype.storm.topology.BasicOutputCollector;
++import backtype.storm.topology.IRichBolt;
++import backtype.storm.topology.OutputFieldsDeclarer;
++import backtype.storm.topology.TopologyBuilder;
++import backtype.storm.topology.base.BaseBasicBolt;
++import backtype.storm.tuple.Fields;
++import backtype.storm.tuple.Tuple;
++import backtype.storm.tuple.Values;
++import storm.starter.spout.RandomSentenceSpout;
++
++import java.util.HashMap;
++import java.util.Map;
++
++/**
++ * This topology demonstrates Storm's stream groupings and multilang capabilities.
++ */
++public class WordCountTopology {
++  public static class SplitSentence extends ShellBolt implements IRichBolt {
++
++    public SplitSentence() {
++      super("python", "splitsentence.py");
++    }
++
++    @Override
++    public void declareOutputFields(OutputFieldsDeclarer declarer) {
++      declarer.declare(new Fields("word"));
++    }
++
++    @Override
++    public Map<String, Object> getComponentConfiguration() {
++      return null;
++    }
++  }
++
++  public static class WordCount extends BaseBasicBolt {
++    Map<String, Integer> counts = new HashMap<String, Integer>();
++
++    @Override
++    public void execute(Tuple tuple, BasicOutputCollector collector) {
++      String word = tuple.getString(0);
++      Integer count = counts.get(word);
++      if (count == null)
++        count = 0;
++      count++;
++      counts.put(word, count);
++      collector.emit(new Values(word, count));
++    }
++
++    @Override
++    public void declareOutputFields(OutputFieldsDeclarer declarer) {
++      declarer.declare(new Fields("word", "count"));
++    }
++  }
++
++  public static void main(String[] args) throws Exception {
++
++    TopologyBuilder builder = new TopologyBuilder();
++
++    builder.setSpout("spout", new RandomSentenceSpout(), 5);
++
++    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
++    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
++
++    Config conf = new Config();
++    conf.setDebug(true);
++
++    if (args != null && args.length > 0) {
++      conf.setNumWorkers(3);
++
++      StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
++    }
++    else {
++      conf.setMaxTaskParallelism(3);
++
++      LocalCluster cluster = new LocalCluster();
++      cluster.submitTopology("word-count", conf, builder.createTopology());
++
++      Thread.sleep(10000);
++
++      cluster.shutdown();
++    }
++  }
++}
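
SplitSentence above delegates to multilang/resources/splitsentence.py over the ShellBolt protocol. For comparison, a
pure-Java bolt with the same contract would look roughly like this (a sketch, not part of this commit):

    public static class SplitSentenceJava extends BaseBasicBolt {
      @Override
      public void execute(Tuple tuple, BasicOutputCollector collector) {
        // Same contract as splitsentence.py: one output tuple per word.
        for (String word : tuple.getString(0).split(" ")) {
          collector.emit(new Values(word));
        }
      }

      @Override
      public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
      }
    }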

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
index 0000000,0000000..07ac843
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
@@@ -1,0 -1,0 +1,93 @@@
++package storm.starter.bolt;
++
++import backtype.storm.Config;
++import backtype.storm.topology.BasicOutputCollector;
++import backtype.storm.topology.OutputFieldsDeclarer;
++import backtype.storm.topology.base.BaseBasicBolt;
++import backtype.storm.tuple.Fields;
++import backtype.storm.tuple.Tuple;
++import backtype.storm.tuple.Values;
++import org.apache.log4j.Logger;
++import storm.starter.tools.Rankings;
++import storm.starter.util.TupleHelpers;
++
++import java.util.HashMap;
++import java.util.Map;
++
++/**
++ * This abstract bolt provides the basic behavior of bolts that rank objects according to their count.
++ * <p/>
++ * It uses the template method design pattern for {@link AbstractRankerBolt#execute(Tuple, BasicOutputCollector)} to let
++ * concrete bolt implementations specify how incoming tuples are processed, i.e. how the objects embedded within those
++ * tuples are retrieved and counted.
++ */
++public abstract class AbstractRankerBolt extends BaseBasicBolt {
++
++  private static final long serialVersionUID = 4931640198501530202L;
++  private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = 2;
++  private static final int DEFAULT_COUNT = 10;
++
++  private final int emitFrequencyInSeconds;
++  private final int count;
++  private final Rankings rankings;
++
++  public AbstractRankerBolt() {
++    this(DEFAULT_COUNT, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
++  }
++
++  public AbstractRankerBolt(int topN) {
++    this(topN, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
++  }
++
++  public AbstractRankerBolt(int topN, int emitFrequencyInSeconds) {
++    if (topN < 1) {
++      throw new IllegalArgumentException("topN must be >= 1 (you requested " + topN + ")");
++    }
++    if (emitFrequencyInSeconds < 1) {
++      throw new IllegalArgumentException(
++          "The emit frequency must be >= 1 seconds (you requested " + emitFrequencyInSeconds + " seconds)");
++    }
++    count = topN;
++    this.emitFrequencyInSeconds = emitFrequencyInSeconds;
++    rankings = new Rankings(count);
++  }
++
++  protected Rankings getRankings() {
++    return rankings;
++  }
++
++  /**
++   * This method functions as a template method (design pattern).
++   */
++  @Override
++  public final void execute(Tuple tuple, BasicOutputCollector collector) {
++    if (TupleHelpers.isTickTuple(tuple)) {
++      getLogger().debug("Received tick tuple, triggering emit of current rankings");
++      emitRankings(collector);
++    }
++    else {
++      updateRankingsWithTuple(tuple);
++    }
++  }
++
++  abstract void updateRankingsWithTuple(Tuple tuple);
++
++  private void emitRankings(BasicOutputCollector collector) {
++    collector.emit(new Values(rankings.copy()));
++    getLogger().debug("Rankings: " + rankings);
++  }
++
++  @Override
++  public void declareOutputFields(OutputFieldsDeclarer declarer) {
++    declarer.declare(new Fields("rankings"));
++  }
++
++  @Override
++  public Map<String, Object> getComponentConfiguration() {
++    Map<String, Object> conf = new HashMap<String, Object>();
++    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
++    return conf;
++  }
++
++  abstract Logger getLogger();
++}
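
The tick tuples consumed in execute above are requested via the TOPOLOGY_TICK_TUPLE_FREQ_SECS entry returned from
getComponentConfiguration. The TupleHelpers.isTickTuple check (shipped in storm.starter.util elsewhere in this merge)
amounts to inspecting the tuple's source, roughly as follows (Constants here is backtype.storm.Constants):

    // Tick tuples are emitted by Storm's internal system component on the
    // system tick stream, so source component and stream id identify them.
    public static boolean isTickTuple(Tuple tuple) {
      return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
          && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
    }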

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/bolt/IntermediateRankingsBolt.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/bolt/IntermediateRankingsBolt.java
index 0000000,0000000..6a57d47
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/bolt/IntermediateRankingsBolt.java
@@@ -1,0 -1,0 +1,41 @@@
++package storm.starter.bolt;
++
++import backtype.storm.tuple.Tuple;
++import org.apache.log4j.Logger;
++import storm.starter.tools.Rankable;
++import storm.starter.tools.RankableObjectWithFields;
++
++/**
++ * This bolt ranks incoming objects by their count.
++ * <p/>
++ * It assumes that input tuples adhere to the following format: (object, object_count, additionalField1,
++ * additionalField2, ..., additionalFieldN).
++ */
++public final class IntermediateRankingsBolt extends AbstractRankerBolt {
++
++  private static final long serialVersionUID = -1369800530256637409L;
++  private static final Logger LOG = Logger.getLogger(IntermediateRankingsBolt.class);
++
++  public IntermediateRankingsBolt() {
++    super();
++  }
++
++  public IntermediateRankingsBolt(int topN) {
++    super(topN);
++  }
++
++  public IntermediateRankingsBolt(int topN, int emitFrequencyInSeconds) {
++    super(topN, emitFrequencyInSeconds);
++  }
++
++  @Override
++  void updateRankingsWithTuple(Tuple tuple) {
++    Rankable rankable = RankableObjectWithFields.from(tuple);
++    super.getRankings().updateWith(rankable);
++  }
++
++  @Override
++  Logger getLogger() {
++    return LOG;
++  }
++}
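
Given the (object, object_count, ...) format it expects, an upstream counting bolt feeding this ranker would emit
tuples along these lines (the values are illustrative; RankableObjectWithFields.from(tuple) reads the first two
fields):

    collector.emit(new Values("cat", 42L, "anyAdditionalField"));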

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/bolt/PrinterBolt.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/bolt/PrinterBolt.java
index 0000000,0000000..8b2e62b
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/bolt/PrinterBolt.java
@@@ -1,0 -1,0 +1,20 @@@
++package storm.starter.bolt;
++
++import backtype.storm.topology.BasicOutputCollector;
++import backtype.storm.topology.OutputFieldsDeclarer;
++import backtype.storm.topology.base.BaseBasicBolt;
++import backtype.storm.tuple.Tuple;
++
++public class PrinterBolt extends BaseBasicBolt {
++
++  @Override
++  public void execute(Tuple tuple, BasicOutputCollector collector) {
++    System.out.println(tuple);
++  }
++
++  @Override
++  public void declareOutputFields(OutputFieldsDeclarer ofd) {
++  }
++
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
index 0000000,0000000..d547749
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
@@@ -1,0 -1,0 +1,125 @@@
++package storm.starter.bolt;
++
++import backtype.storm.Config;
++import backtype.storm.task.OutputCollector;
++import backtype.storm.task.TopologyContext;
++import backtype.storm.topology.OutputFieldsDeclarer;
++import backtype.storm.topology.base.BaseRichBolt;
++import backtype.storm.tuple.Fields;
++import backtype.storm.tuple.Tuple;
++import backtype.storm.tuple.Values;
++import org.apache.log4j.Logger;
++import storm.starter.tools.NthLastModifiedTimeTracker;
++import storm.starter.tools.SlidingWindowCounter;
++import storm.starter.util.TupleHelpers;
++
++import java.util.HashMap;
++import java.util.Map;
++import java.util.Map.Entry;
++
++/**
++ * This bolt performs rolling counts of incoming objects, i.e. sliding-window-based counting.
++ * <p/>
++ * The bolt is configured by two parameters, the length of the sliding window in seconds (which influences the output
++ * data of the bolt, i.e. how it will count objects) and the emit frequency in seconds (which influences how often the
++ * bolt will output the latest window counts). For instance, if the window length is set to an equivalent of five
++ * minutes and the emit frequency to one minute, then the bolt will output the latest five-minute sliding window every
++ * minute.
++ * <p/>
++ * The bolt emits a rolling count tuple per object, consisting of the object itself, its latest rolling count, and the
++ * actual duration of the sliding window. The latter is included in case the expected sliding window length (as
++ * configured by the user) is different from the actual length, e.g. due to high system load. Note that the actual
++ * window length is tracked and calculated for the window, and not individually for each object within a window.
++ * <p/>
++ * Note: During the startup phase you will usually observe that the bolt warns you about the actual sliding window
++ * length being smaller than the expected length. This behavior is expected and is caused by the way the sliding window
++ * counts are initially "loaded up". You can safely ignore this warning during startup (e.g. you will see this warning
++ * during the first ~five minutes of startup time if the window length is set to five minutes).
++ */
++public class RollingCountBolt extends BaseRichBolt {
++
++  private static final long serialVersionUID = 5537727428628598519L;
++  private static final Logger LOG = Logger.getLogger(RollingCountBolt.class);
++  private static final int NUM_WINDOW_CHUNKS = 5;
++  private static final int DEFAULT_SLIDING_WINDOW_IN_SECONDS = NUM_WINDOW_CHUNKS * 60;
++  private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = DEFAULT_SLIDING_WINDOW_IN_SECONDS / NUM_WINDOW_CHUNKS;
++  private static final String WINDOW_LENGTH_WARNING_TEMPLATE =
++      "Actual window length is %d seconds when it should be %d seconds"
++          + " (you can safely ignore this warning during the startup phase)";
++
++  private final SlidingWindowCounter<Object> counter;
++  private final int windowLengthInSeconds;
++  private final int emitFrequencyInSeconds;
++  private OutputCollector collector;
++  private NthLastModifiedTimeTracker lastModifiedTracker;
++
++  public RollingCountBolt() {
++    this(DEFAULT_SLIDING_WINDOW_IN_SECONDS, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
++  }
++
++  public RollingCountBolt(int windowLengthInSeconds, int emitFrequencyInSeconds) {
++    this.windowLengthInSeconds = windowLengthInSeconds;
++    this.emitFrequencyInSeconds = emitFrequencyInSeconds;
++    counter = new SlidingWindowCounter<Object>(deriveNumWindowChunksFrom(this.windowLengthInSeconds,
++        this.emitFrequencyInSeconds));
++  }
++
++  private int deriveNumWindowChunksFrom(int windowLengthInSeconds, int windowUpdateFrequencyInSeconds) {
++    return windowLengthInSeconds / windowUpdateFrequencyInSeconds;
++  }
++
++  @SuppressWarnings("rawtypes")
++  @Override
++  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
++    this.collector = collector;
++    lastModifiedTracker = new NthLastModifiedTimeTracker(deriveNumWindowChunksFrom(this.windowLengthInSeconds,
++        this.emitFrequencyInSeconds));
++  }
++
++  @Override
++  public void execute(Tuple tuple) {
++    if (TupleHelpers.isTickTuple(tuple)) {
++      LOG.debug("Received tick tuple, triggering emit of current window counts");
++      emitCurrentWindowCounts();
++    }
++    else {
++      countObjAndAck(tuple);
++    }
++  }
++
++  private void emitCurrentWindowCounts() {
++    Map<Object, Long> counts = counter.getCountsThenAdvanceWindow();
++    int actualWindowLengthInSeconds = lastModifiedTracker.secondsSinceOldestModification();
++    lastModifiedTracker.markAsModified();
++    if (actualWindowLengthInSeconds != windowLengthInSeconds) {
++      LOG.warn(String.format(WINDOW_LENGTH_WARNING_TEMPLATE, actualWindowLengthInSeconds, windowLengthInSeconds));
++    }
++    emit(counts, actualWindowLengthInSeconds);
++  }
++
++  private void emit(Map<Object, Long> counts, int actualWindowLengthInSeconds) {
++    for (Entry<Object, Long> entry : counts.entrySet()) {
++      Object obj = entry.getKey();
++      Long count = entry.getValue();
++      collector.emit(new Values(obj, count, actualWindowLengthInSeconds));
++    }
++  }
++
++  private void countObjAndAck(Tuple tuple) {
++    Object obj = tuple.getValue(0);
++    counter.incrementCount(obj);
++    collector.ack(tuple);
++  }
++
++  @Override
++  public void declareOutputFields(OutputFieldsDeclarer declarer) {
++    declarer.declare(new Fields("obj", "count", "actualWindowLengthInSeconds"));
++  }
++
++  @Override
++  public Map<String, Object> getComponentConfiguration() {
++    Map<String, Object> conf = new HashMap<String, Object>();
++    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
++    return conf;
++  }
++}
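
With the defaults above, the window is divided into 300 / 60 = 5 chunks and each emit advances the window by one
chunk. A custom configuration follows the same arithmetic (this instantiation is illustrative):

    // A 9-minute sliding window emitted every 3 minutes is tracked in
    // 540 / 180 = 3 window chunks; the window length should be a multiple of
    // the emit frequency so the division comes out even.
    RollingCountBolt bolt = new RollingCountBolt(540, 180);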

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/bolt/SingleJoinBolt.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/bolt/SingleJoinBolt.java
index 0000000,0000000..e12f005
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/bolt/SingleJoinBolt.java
@@@ -1,0 -1,0 +1,97 @@@
++package storm.starter.bolt;
++
++import backtype.storm.Config;
++import backtype.storm.generated.GlobalStreamId;
++import backtype.storm.task.OutputCollector;
++import backtype.storm.task.TopologyContext;
++import backtype.storm.topology.OutputFieldsDeclarer;
++import backtype.storm.topology.base.BaseRichBolt;
++import backtype.storm.tuple.Fields;
++import backtype.storm.tuple.Tuple;
++import backtype.storm.utils.TimeCacheMap;
++
++import java.util.*;
++
++public class SingleJoinBolt extends BaseRichBolt {
++  OutputCollector _collector;
++  Fields _idFields;
++  Fields _outFields;
++  int _numSources;
++  TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>> _pending;
++  Map<String, GlobalStreamId> _fieldLocations;
++
++  public SingleJoinBolt(Fields outFields) {
++    _outFields = outFields;
++  }
++
++  @Override
++  public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
++    _fieldLocations = new HashMap<String, GlobalStreamId>();
++    _collector = collector;
++    int timeout = ((Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
++    _pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback());
++    _numSources = context.getThisSources().size();
++    Set<String> idFields = null;
++    for (GlobalStreamId source : context.getThisSources().keySet()) {
++      Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId());
++      Set<String> setFields = new HashSet<String>(fields.toList());
++      if (idFields == null)
++        idFields = setFields;
++      else
++        idFields.retainAll(setFields);
++
++      for (String outfield : _outFields) {
++        for (String sourcefield : fields) {
++          if (outfield.equals(sourcefield)) {
++            _fieldLocations.put(outfield, source);
++          }
++        }
++      }
++    }
++    _idFields = new Fields(new ArrayList<String>(idFields));
++
++    if (_fieldLocations.size() != _outFields.size()) {
++      throw new RuntimeException("Cannot find all outfields among sources");
++    }
++  }
++
++  @Override
++  public void execute(Tuple tuple) {
++    List<Object> id = tuple.select(_idFields);
++    GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId());
++    if (!_pending.containsKey(id)) {
++      _pending.put(id, new HashMap<GlobalStreamId, Tuple>());
++    }
++    Map<GlobalStreamId, Tuple> parts = _pending.get(id);
++    if (parts.containsKey(streamId))
++      throw new RuntimeException("Received same side of single join twice");
++    parts.put(streamId, tuple);
++    if (parts.size() == _numSources) {
++      _pending.remove(id);
++      List<Object> joinResult = new ArrayList<Object>();
++      for (String outField : _outFields) {
++        GlobalStreamId loc = _fieldLocations.get(outField);
++        joinResult.add(parts.get(loc).getValueByField(outField));
++      }
++      _collector.emit(new ArrayList<Tuple>(parts.values()), joinResult);
++
++      for (Tuple part : parts.values()) {
++        _collector.ack(part);
++      }
++    }
++  }
++
++  @Override
++  public void declareOutputFields(OutputFieldsDeclarer declarer) {
++    declarer.declare(_outFields);
++  }
++
++  private class ExpireCallback implements TimeCacheMap.ExpiredCallback<List<Object>, Map<GlobalStreamId, Tuple>> {
++    @Override
++    public void expire(List<Object> id, Map<GlobalStreamId, Tuple> tuples) {
++      for (Tuple tuple : tuples.values()) {
++        _collector.fail(tuple);
++      }
++    }
++  }
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/f1d7fca7/examples/storm-starter/src/jvm/storm/starter/bolt/TotalRankingsBolt.java
----------------------------------------------------------------------
diff --cc examples/storm-starter/src/jvm/storm/starter/bolt/TotalRankingsBolt.java
index 0000000,0000000..217443e
new file mode 100644
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/bolt/TotalRankingsBolt.java
@@@ -1,0 -1,0 +1,42 @@@
++package storm.starter.bolt;
++
++import backtype.storm.tuple.Tuple;
++import org.apache.log4j.Logger;
++import storm.starter.tools.Rankings;
++
++/**
++ * This bolt merges incoming {@link Rankings}.
++ * <p/>
++ * It can be used to merge intermediate rankings generated by {@link IntermediateRankingsBolt} into a final,
++ * consolidated ranking. To do so, configure this bolt with a globalGrouping on {@link IntermediateRankingsBolt}.
++ */
++public final class TotalRankingsBolt extends AbstractRankerBolt {
++
++  private static final long serialVersionUID = -8447525895532302198L;
++  private static final Logger LOG = Logger.getLogger(TotalRankingsBolt.class);
++
++  public TotalRankingsBolt() {
++    super();
++  }
++
++  public TotalRankingsBolt(int topN) {
++    super(topN);
++  }
++
++  public TotalRankingsBolt(int topN, int emitFrequencyInSeconds) {
++    super(topN, emitFrequencyInSeconds);
++  }
++
++  @Override
++  void updateRankingsWithTuple(Tuple tuple) {
++    Rankings rankingsToBeMerged = (Rankings) tuple.getValue(0);
++    super.getRankings().updateWith(rankingsToBeMerged);
++    super.getRankings().pruneZeroCounts();
++  }
++
++  @Override
++  Logger getLogger() {
++    return LOG;
++  }
++
++}
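
A wiring sketch of the two-stage ranking the javadoc describes (component ids are illustrative; RollingTopWords in
this merge wires the bolts the same way):

    // Each intermediate ranker sees only its partition of the objects
    // (fieldsGrouping on the ranked field); the single total ranker then
    // merges all partial rankings (globalGrouping).
    builder.setBolt("intermediateRanker", new IntermediateRankingsBolt(10), 4)
        .fieldsGrouping("counter", new Fields("obj"));
    builder.setBolt("totalRanker", new TotalRankingsBolt(10))
        .globalGrouping("intermediateRanker");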

