storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ptgo...@apache.org
Subject [23/50] [abbrv] git commit: Reformat Java code to use 2 instead of 4 spaces (to match Clojure style)
Date Thu, 20 Mar 2014 21:22:46 GMT
Reformat Java code to use 2 instead of 4 spaces (to match Clojure style)


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/a51c8247
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/a51c8247
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/a51c8247

Branch: refs/heads/master
Commit: a51c8247e3f358788efc8fb3ed7104b7b7cb21c4
Parents: 3b2db7c
Author: Michael G. Noll <mnoll@verisign.com>
Authored: Thu Aug 29 12:39:14 2013 +0200
Committer: Michael G. Noll <mnoll@verisign.com>
Committed: Thu Aug 29 12:39:14 2013 +0200

----------------------------------------------------------------------
 src/jvm/storm/starter/BasicDRPCTopology.java    |  84 ++--
 src/jvm/storm/starter/ExclamationTopology.java  |  82 ++--
 src/jvm/storm/starter/ManualDRPC.java           |  64 ++-
 src/jvm/storm/starter/ReachTopology.java        | 297 ++++++------
 src/jvm/storm/starter/RollingTopWords.java      |  92 ++--
 src/jvm/storm/starter/SingleJoinExample.java    |  66 +--
 .../storm/starter/TransactionalGlobalCount.java | 237 +++++-----
 src/jvm/storm/starter/TransactionalWords.java   | 369 +++++++--------
 src/jvm/storm/starter/WordCountTopology.java    | 132 +++---
 .../storm/starter/bolt/AbstractRankerBolt.java  | 126 +++---
 .../starter/bolt/IntermediateRankingsBolt.java  |  46 +-
 src/jvm/storm/starter/bolt/PrinterBolt.java     |  16 +-
 .../storm/starter/bolt/RollingCountBolt.java    | 183 ++++----
 src/jvm/storm/starter/bolt/SingleJoinBolt.java  | 153 ++++---
 .../storm/starter/bolt/TotalRankingsBolt.java   |  46 +-
 .../starter/spout/RandomSentenceSpout.java      |  69 ++-
 .../tools/NthLastModifiedTimeTracker.java       |  64 ++-
 src/jvm/storm/starter/tools/Rankable.java       |   4 +-
 .../starter/tools/RankableObjectWithFields.java | 189 ++++----
 src/jvm/storm/starter/tools/Rankings.java       | 142 +++---
 .../starter/tools/SlidingWindowCounter.java     | 111 +++--
 .../storm/starter/tools/SlotBasedCounter.java   | 138 +++---
 src/jvm/storm/starter/trident/TridentReach.java | 199 ++++----
 .../storm/starter/trident/TridentWordCount.java |  98 ++--
 src/jvm/storm/starter/util/StormRunner.java     |  22 +-
 src/jvm/storm/starter/util/TupleHelpers.java    |  12 +-
 .../bolt/IntermediateRankingsBoltTest.java      | 238 +++++-----
 .../starter/bolt/RollingCountBoltTest.java      | 171 ++++---
 .../starter/bolt/TotalRankingsBoltTest.java     | 241 +++++-----
 .../storm/starter/tools/MockTupleHelpers.java   |  26 +-
 .../tools/NthLastModifiedTimeTrackerTest.java   | 194 ++++----
 .../tools/RankableObjectWithFieldsTest.java     | 419 +++++++++--------
 test/jvm/storm/starter/tools/RankingsTest.java  | 453 +++++++++----------
 .../starter/tools/SlidingWindowCounterTest.java | 160 ++++---
 .../starter/tools/SlotBasedCounterTest.java     | 304 ++++++-------
 35 files changed, 2574 insertions(+), 2673 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/BasicDRPCTopology.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/BasicDRPCTopology.java b/src/jvm/storm/starter/BasicDRPCTopology.java
index 1ea8e77..7e7ef93 100644
--- a/src/jvm/storm/starter/BasicDRPCTopology.java
+++ b/src/jvm/storm/starter/BasicDRPCTopology.java
@@ -5,59 +5,57 @@ import backtype.storm.LocalCluster;
 import backtype.storm.LocalDRPC;
 import backtype.storm.StormSubmitter;
 import backtype.storm.drpc.LinearDRPCTopologyBuilder;
-import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.BasicOutputCollector;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.base.BaseBasicBolt;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
-import java.util.Map;
 
 /**
- * This topology is a basic example of doing distributed RPC on top of Storm. It implements a function
- * that appends a "!" to any string you send the DRPC function.
- * 
- * See https://github.com/nathanmarz/storm/wiki/Distributed-RPC for more information on 
- * doing distributed RPC on top of Storm.
+ * This topology is a basic example of doing distributed RPC on top of Storm. It implements a function that appends a
+ * "!" to any string you send the DRPC function.
+ * <p/>
+ * See https://github.com/nathanmarz/storm/wiki/Distributed-RPC for more information on doing distributed RPC on top of
+ * Storm.
  */
 public class BasicDRPCTopology {
-    public static class ExclaimBolt extends BaseBasicBolt {
-        @Override
-        public void execute(Tuple tuple, BasicOutputCollector collector) {
-            String input = tuple.getString(1);
-            collector.emit(new Values(tuple.getValue(0), input + "!"));
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("id", "result"));
-        }
-    
+  public static class ExclaimBolt extends BaseBasicBolt {
+    @Override
+    public void execute(Tuple tuple, BasicOutputCollector collector) {
+      String input = tuple.getString(1);
+      collector.emit(new Values(tuple.getValue(0), input + "!"));
     }
-    
-    public static void main(String[] args) throws Exception {
-        LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
-        builder.addBolt(new ExclaimBolt(), 3);
-        
-        Config conf = new Config();
-        
-        if(args==null || args.length==0) {
-            LocalDRPC drpc = new LocalDRPC();
-            LocalCluster cluster = new LocalCluster();
-            
-            cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));
-
-            for(String word: new String[] {"hello", "goodbye"}) {
-                System.out.println("Result for \"" + word + "\": "
-                        + drpc.execute("exclamation", word));
-            }
-            
-            cluster.shutdown();
-            drpc.shutdown();
-        } else {
-            conf.setNumWorkers(3);
-            StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology());
-        }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields("id", "result"));
+    }
+
+  }
+
+  public static void main(String[] args) throws Exception {
+    LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
+    builder.addBolt(new ExclaimBolt(), 3);
+
+    Config conf = new Config();
+
+    if (args == null || args.length == 0) {
+      LocalDRPC drpc = new LocalDRPC();
+      LocalCluster cluster = new LocalCluster();
+
+      cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));
+
+      for (String word : new String[]{ "hello", "goodbye" }) {
+        System.out.println("Result for \"" + word + "\": " + drpc.execute("exclamation", word));
+      }
+
+      cluster.shutdown();
+      drpc.shutdown();
+    }
+    else {
+      conf.setNumWorkers(3);
+      StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology());
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/ExclamationTopology.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/ExclamationTopology.java b/src/jvm/storm/starter/ExclamationTopology.java
index b98c8cd..fed8b1e 100644
--- a/src/jvm/storm/starter/ExclamationTopology.java
+++ b/src/jvm/storm/starter/ExclamationTopology.java
@@ -13,58 +13,58 @@ import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 import backtype.storm.utils.Utils;
+
 import java.util.Map;
 
 /**
  * This is a basic example of a Storm topology.
  */
 public class ExclamationTopology {
-    
-    public static class ExclamationBolt extends BaseRichBolt {
-        OutputCollector _collector;
 
-        @Override
-        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
-            _collector = collector;
-        }
+  public static class ExclamationBolt extends BaseRichBolt {
+    OutputCollector _collector;
+
+    @Override
+    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+      _collector = collector;
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
+      _collector.ack(tuple);
+    }
 
-        @Override
-        public void execute(Tuple tuple) {
-            _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
-            _collector.ack(tuple);
-        }
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields("word"));
+    }
 
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("word"));
-        }
 
+  }
 
+  public static void main(String[] args) throws Exception {
+    TopologyBuilder builder = new TopologyBuilder();
+
+    builder.setSpout("word", new TestWordSpout(), 10);
+    builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
+    builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
+
+    Config conf = new Config();
+    conf.setDebug(true);
+
+    if (args != null && args.length > 0) {
+      conf.setNumWorkers(3);
+
+      StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
     }
-    
-    public static void main(String[] args) throws Exception {
-        TopologyBuilder builder = new TopologyBuilder();
-        
-        builder.setSpout("word", new TestWordSpout(), 10);        
-        builder.setBolt("exclaim1", new ExclamationBolt(), 3)
-                .shuffleGrouping("word");
-        builder.setBolt("exclaim2", new ExclamationBolt(), 2)
-                .shuffleGrouping("exclaim1");
-                
-        Config conf = new Config();
-        conf.setDebug(true);
-        
-        if(args!=null && args.length > 0) {
-            conf.setNumWorkers(3);
-            
-            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
-        } else {
-        
-            LocalCluster cluster = new LocalCluster();
-            cluster.submitTopology("test", conf, builder.createTopology());
-            Utils.sleep(10000);
-            cluster.killTopology("test");
-            cluster.shutdown();    
-        }
+    else {
+
+      LocalCluster cluster = new LocalCluster();
+      cluster.submitTopology("test", conf, builder.createTopology());
+      Utils.sleep(10000);
+      cluster.killTopology("test");
+      cluster.shutdown();
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/ManualDRPC.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/ManualDRPC.java b/src/jvm/storm/starter/ManualDRPC.java
index fd45c9c..ade4ab1 100644
--- a/src/jvm/storm/starter/ManualDRPC.java
+++ b/src/jvm/storm/starter/ManualDRPC.java
@@ -15,39 +15,37 @@ import backtype.storm.tuple.Values;
 
 
 public class ManualDRPC {
-    public static class ExclamationBolt extends BaseBasicBolt {
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("result", "return-info"));
-        }
-
-        @Override
-        public void execute(Tuple tuple, BasicOutputCollector collector) {
-            String arg = tuple.getString(0);
-            Object retInfo = tuple.getValue(1);
-            collector.emit(new Values(arg + "!!!", retInfo));
-        }
-        
+  public static class ExclamationBolt extends BaseBasicBolt {
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields("result", "return-info"));
     }
-    
-    public static void main(String[] args) {
-        TopologyBuilder builder = new TopologyBuilder();
-        LocalDRPC drpc = new LocalDRPC();
-        
-        DRPCSpout spout = new DRPCSpout("exclamation", drpc);
-        builder.setSpout("drpc", spout);
-        builder.setBolt("exclaim", new ExclamationBolt(), 3)
-                .shuffleGrouping("drpc");
-        builder.setBolt("return", new ReturnResults(), 3)
-                .shuffleGrouping("exclaim");
-        
-        LocalCluster cluster = new LocalCluster();
-        Config conf = new Config();
-        cluster.submitTopology("exclaim", conf, builder.createTopology());
-        
-        System.out.println(drpc.execute("exclamation", "aaa"));
-        System.out.println(drpc.execute("exclamation", "bbb"));
-        
+
+    @Override
+    public void execute(Tuple tuple, BasicOutputCollector collector) {
+      String arg = tuple.getString(0);
+      Object retInfo = tuple.getValue(1);
+      collector.emit(new Values(arg + "!!!", retInfo));
     }
+
+  }
+
+  public static void main(String[] args) {
+    TopologyBuilder builder = new TopologyBuilder();
+    LocalDRPC drpc = new LocalDRPC();
+
+    DRPCSpout spout = new DRPCSpout("exclamation", drpc);
+    builder.setSpout("drpc", spout);
+    builder.setBolt("exclaim", new ExclamationBolt(), 3).shuffleGrouping("drpc");
+    builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");
+
+    LocalCluster cluster = new LocalCluster();
+    Config conf = new Config();
+    cluster.submitTopology("exclaim", conf, builder.createTopology());
+
+    System.out.println(drpc.execute("exclamation", "aaa"));
+    System.out.println(drpc.execute("exclamation", "bbb"));
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/ReachTopology.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/ReachTopology.java b/src/jvm/storm/starter/ReachTopology.java
index 8a7fdbc..a63db3e 100644
--- a/src/jvm/storm/starter/ReachTopology.java
+++ b/src/jvm/storm/starter/ReachTopology.java
@@ -5,186 +5,175 @@ import backtype.storm.LocalCluster;
 import backtype.storm.LocalDRPC;
 import backtype.storm.StormSubmitter;
 import backtype.storm.coordination.BatchOutputCollector;
-import backtype.storm.coordination.CoordinatedBolt.FinishedCallback;
 import backtype.storm.drpc.LinearDRPCTopologyBuilder;
-import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.BasicOutputCollector;
-import backtype.storm.topology.IRichBolt;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.base.BaseBasicBolt;
 import backtype.storm.topology.base.BaseBatchBolt;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+
+import java.util.*;
 
 /**
- * This is a good example of doing complex Distributed RPC on top of Storm. This 
- * program creates a topology that can compute the reach for any URL on Twitter
- * in realtime by parallelizing the whole computation. 
- * 
- * Reach is the number of unique people exposed to a URL on Twitter. To compute reach,
- * you have to get all the people who tweeted the URL, get all the followers of all those people,
- * unique that set of followers, and then count the unique set. It's an intense computation 
- * that can involve thousands of database calls and tens of millions of follower records.
- * 
- * This Storm topology does every piece of that computation in parallel, turning what would be a 
- * computation that takes minutes on a single machine into one that takes just a couple seconds.
- * 
- * For the purposes of demonstration, this topology replaces the use of actual DBs with
- * in-memory hashmaps. 
- * 
+ * This is a good example of doing complex Distributed RPC on top of Storm. This program creates a topology that can
+ * compute the reach for any URL on Twitter in realtime by parallelizing the whole computation.
+ * <p/>
+ * Reach is the number of unique people exposed to a URL on Twitter. To compute reach, you have to get all the people
+ * who tweeted the URL, get all the followers of all those people, unique that set of followers, and then count the
+ * unique set. It's an intense computation that can involve thousands of database calls and tens of millions of follower
+ * records.
+ * <p/>
+ * This Storm topology does every piece of that computation in parallel, turning what would be a computation that takes
+ * minutes on a single machine into one that takes just a couple seconds.
+ * <p/>
+ * For the purposes of demonstration, this topology replaces the use of actual DBs with in-memory hashmaps.
+ * <p/>
  * See https://github.com/nathanmarz/storm/wiki/Distributed-RPC for more information on Distributed RPC.
  */
 public class ReachTopology {
-    public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() {{
-       put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan")); 
-       put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan")); 
-       put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john")); 
-    }};
-    
-    public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() {{
-        put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai"));
-        put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian"));
-        put("tim", Arrays.asList("alex"));
-        put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan"));
-        put("adam", Arrays.asList("david", "carissa"));
-        put("mike", Arrays.asList("john", "bob"));
-        put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));
-    }};
-    
-    public static class GetTweeters extends BaseBasicBolt {
-        @Override
-        public void execute(Tuple tuple, BasicOutputCollector collector) {
-            Object id = tuple.getValue(0);
-            String url = tuple.getString(1);
-            List<String> tweeters = TWEETERS_DB.get(url);
-            if(tweeters!=null) {
-                for(String tweeter: tweeters) {
-                    collector.emit(new Values(id, tweeter));
-                }
-            }
+  public static Map<String, List<String>> TWEETERS_DB = new HashMap<String, List<String>>() {{
+    put("foo.com/blog/1", Arrays.asList("sally", "bob", "tim", "george", "nathan"));
+    put("engineering.twitter.com/blog/5", Arrays.asList("adam", "david", "sally", "nathan"));
+    put("tech.backtype.com/blog/123", Arrays.asList("tim", "mike", "john"));
+  }};
+
+  public static Map<String, List<String>> FOLLOWERS_DB = new HashMap<String, List<String>>() {{
+    put("sally", Arrays.asList("bob", "tim", "alice", "adam", "jim", "chris", "jai"));
+    put("bob", Arrays.asList("sally", "nathan", "jim", "mary", "david", "vivian"));
+    put("tim", Arrays.asList("alex"));
+    put("nathan", Arrays.asList("sally", "bob", "adam", "harry", "chris", "vivian", "emily", "jordan"));
+    put("adam", Arrays.asList("david", "carissa"));
+    put("mike", Arrays.asList("john", "bob"));
+    put("john", Arrays.asList("alice", "nathan", "jim", "mike", "bob"));
+  }};
+
+  public static class GetTweeters extends BaseBasicBolt {
+    @Override
+    public void execute(Tuple tuple, BasicOutputCollector collector) {
+      Object id = tuple.getValue(0);
+      String url = tuple.getString(1);
+      List<String> tweeters = TWEETERS_DB.get(url);
+      if (tweeters != null) {
+        for (String tweeter : tweeters) {
+          collector.emit(new Values(id, tweeter));
         }
+      }
+    }
 
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("id", "tweeter"));
-        }        
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields("id", "tweeter"));
     }
-    
-    public static class GetFollowers extends BaseBasicBolt {
-        @Override
-        public void execute(Tuple tuple, BasicOutputCollector collector) {
-            Object id = tuple.getValue(0);
-            String tweeter = tuple.getString(1);
-            List<String> followers = FOLLOWERS_DB.get(tweeter);
-            if(followers!=null) {
-                for(String follower: followers) {
-                    collector.emit(new Values(id, follower));
-                }
-            }
-        }
+  }
 
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("id", "follower"));
+  public static class GetFollowers extends BaseBasicBolt {
+    @Override
+    public void execute(Tuple tuple, BasicOutputCollector collector) {
+      Object id = tuple.getValue(0);
+      String tweeter = tuple.getString(1);
+      List<String> followers = FOLLOWERS_DB.get(tweeter);
+      if (followers != null) {
+        for (String follower : followers) {
+          collector.emit(new Values(id, follower));
         }
+      }
     }
-    
-    public static class PartialUniquer extends BaseBatchBolt {
-        BatchOutputCollector _collector;
-        Object _id;
-        Set<String> _followers = new HashSet<String>();
-        
-        @Override
-        public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
-            _collector = collector;
-            _id = id;
-        }
 
-        @Override
-        public void execute(Tuple tuple) {
-            _followers.add(tuple.getString(1));
-        }
-        
-        @Override
-        public void finishBatch() {
-            _collector.emit(new Values(_id, _followers.size()));
-        }
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields("id", "follower"));
+    }
+  }
 
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("id", "partial-count"));
-        }
+  public static class PartialUniquer extends BaseBatchBolt {
+    BatchOutputCollector _collector;
+    Object _id;
+    Set<String> _followers = new HashSet<String>();
+
+    @Override
+    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
+      _collector = collector;
+      _id = id;
     }
-    
-    public static class CountAggregator extends BaseBatchBolt {
-        BatchOutputCollector _collector;
-        Object _id;
-        int _count = 0;
-        
-        @Override
-        public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
-            _collector = collector;
-            _id = id;
-        }
 
-        @Override
-        public void execute(Tuple tuple) {
-            _count+=tuple.getInteger(1);
-        }
-        
-        @Override
-        public void finishBatch() {
-            _collector.emit(new Values(_id, _count));
-        }
+    @Override
+    public void execute(Tuple tuple) {
+      _followers.add(tuple.getString(1));
+    }
 
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("id", "reach"));
-        }
+    @Override
+    public void finishBatch() {
+      _collector.emit(new Values(_id, _followers.size()));
     }
-    
-    public static LinearDRPCTopologyBuilder construct() {
-        LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
-        builder.addBolt(new GetTweeters(), 4);
-        builder.addBolt(new GetFollowers(), 12)
-                 .shuffleGrouping();
-        builder.addBolt(new PartialUniquer(), 6)
-                 .fieldsGrouping(new Fields("id", "follower"));
-        builder.addBolt(new CountAggregator(), 3)
-                 .fieldsGrouping(new Fields("id")); 
-        return builder;
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields("id", "partial-count"));
     }
-    
-    public static void main(String[] args) throws Exception {
-        LinearDRPCTopologyBuilder builder = construct();
-        
-        
-        Config conf = new Config();
-        
-        if(args==null || args.length==0) {
-            conf.setMaxTaskParallelism(3);
-            LocalDRPC drpc = new LocalDRPC();
-            LocalCluster cluster = new LocalCluster();
-            cluster.submitTopology("reach-drpc", conf, builder.createLocalTopology(drpc));
-            
-            String[] urlsToTry = new String[] { "foo.com/blog/1", "engineering.twitter.com/blog/5", "notaurl.com"};
-            for(String url: urlsToTry) {
-                System.out.println("Reach of " + url + ": " + drpc.execute("reach", url));
-            }
-            
-            cluster.shutdown();
-            drpc.shutdown();
-        } else {
-            conf.setNumWorkers(6);
-            StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology());
-        }
+  }
+
+  public static class CountAggregator extends BaseBatchBolt {
+    BatchOutputCollector _collector;
+    Object _id;
+    int _count = 0;
+
+    @Override
+    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
+      _collector = collector;
+      _id = id;
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+      _count += tuple.getInteger(1);
+    }
+
+    @Override
+    public void finishBatch() {
+      _collector.emit(new Values(_id, _count));
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields("id", "reach"));
+    }
+  }
+
+  public static LinearDRPCTopologyBuilder construct() {
+    LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
+    builder.addBolt(new GetTweeters(), 4);
+    builder.addBolt(new GetFollowers(), 12).shuffleGrouping();
+    builder.addBolt(new PartialUniquer(), 6).fieldsGrouping(new Fields("id", "follower"));
+    builder.addBolt(new CountAggregator(), 3).fieldsGrouping(new Fields("id"));
+    return builder;
+  }
+
+  public static void main(String[] args) throws Exception {
+    LinearDRPCTopologyBuilder builder = construct();
+
+
+    Config conf = new Config();
+
+    if (args == null || args.length == 0) {
+      conf.setMaxTaskParallelism(3);
+      LocalDRPC drpc = new LocalDRPC();
+      LocalCluster cluster = new LocalCluster();
+      cluster.submitTopology("reach-drpc", conf, builder.createLocalTopology(drpc));
+
+      String[] urlsToTry = new String[]{ "foo.com/blog/1", "engineering.twitter.com/blog/5", "notaurl.com" };
+      for (String url : urlsToTry) {
+        System.out.println("Reach of " + url + ": " + drpc.execute("reach", url));
+      }
+
+      cluster.shutdown();
+      drpc.shutdown();
+    }
+    else {
+      conf.setNumWorkers(6);
+      StormSubmitter.submitTopology(args[0], conf, builder.createRemoteTopology());
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/RollingTopWords.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/RollingTopWords.java b/src/jvm/storm/starter/RollingTopWords.java
index ef016a4..9d0b3c8 100644
--- a/src/jvm/storm/starter/RollingTopWords.java
+++ b/src/jvm/storm/starter/RollingTopWords.java
@@ -1,13 +1,13 @@
 package storm.starter;
 
-import storm.starter.bolt.IntermediateRankingsBolt;
-import storm.starter.bolt.RollingCountBolt;
-import storm.starter.bolt.TotalRankingsBolt;
-import storm.starter.util.StormRunner;
 import backtype.storm.Config;
 import backtype.storm.testing.TestWordSpout;
 import backtype.storm.topology.TopologyBuilder;
 import backtype.storm.tuple.Fields;
+import storm.starter.bolt.IntermediateRankingsBolt;
+import storm.starter.bolt.RollingCountBolt;
+import storm.starter.bolt.TotalRankingsBolt;
+import storm.starter.util.StormRunner;
 
 /**
  * This topology does a continuous computation of the top N words that the topology has seen in terms of cardinality.
@@ -16,46 +16,46 @@ import backtype.storm.tuple.Fields;
  */
 public class RollingTopWords {
 
-    private static final int DEFAULT_RUNTIME_IN_SECONDS = 60;
-    private static final int TOP_N = 5;
-
-    private final TopologyBuilder builder;
-    private final String topologyName;
-    private final Config topologyConfig;
-    private final int runtimeInSeconds;
-
-    public RollingTopWords() throws InterruptedException {
-        builder = new TopologyBuilder();
-        topologyName = "slidingWindowCounts";
-        topologyConfig = createTopologyConfiguration();
-        runtimeInSeconds = DEFAULT_RUNTIME_IN_SECONDS;
-
-        wireTopology();
-    }
-
-    private static Config createTopologyConfiguration() {
-        Config conf = new Config();
-        conf.setDebug(true);
-        return conf;
-    }
-
-    private void wireTopology() throws InterruptedException {
-        String spoutId = "wordGenerator";
-        String counterId = "counter";
-        String intermediateRankerId = "intermediateRanker";
-        String totalRankerId = "finalRanker";
-        builder.setSpout(spoutId, new TestWordSpout(), 5);
-        builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new Fields("word"));
-        builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId,
-            new Fields("obj"));
-        builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
-    }
-
-    public void run() throws InterruptedException {
-        StormRunner.runTopologyLocally(builder.createTopology(), topologyName, topologyConfig, runtimeInSeconds);
-    }
-
-    public static void main(String[] args) throws Exception {
-        new RollingTopWords().run();
-    }
+  private static final int DEFAULT_RUNTIME_IN_SECONDS = 60;
+  private static final int TOP_N = 5;
+
+  private final TopologyBuilder builder;
+  private final String topologyName;
+  private final Config topologyConfig;
+  private final int runtimeInSeconds;
+
+  public RollingTopWords() throws InterruptedException {
+    builder = new TopologyBuilder();
+    topologyName = "slidingWindowCounts";
+    topologyConfig = createTopologyConfiguration();
+    runtimeInSeconds = DEFAULT_RUNTIME_IN_SECONDS;
+
+    wireTopology();
+  }
+
+  private static Config createTopologyConfiguration() {
+    Config conf = new Config();
+    conf.setDebug(true);
+    return conf;
+  }
+
+  private void wireTopology() throws InterruptedException {
+    String spoutId = "wordGenerator";
+    String counterId = "counter";
+    String intermediateRankerId = "intermediateRanker";
+    String totalRankerId = "finalRanker";
+    builder.setSpout(spoutId, new TestWordSpout(), 5);
+    builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new Fields("word"));
+    builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId, new Fields(
+        "obj"));
+    builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
+  }
+
+  public void run() throws InterruptedException {
+    StormRunner.runTopologyLocally(builder.createTopology(), topologyName, topologyConfig, runtimeInSeconds);
+  }
+
+  public static void main(String[] args) throws Exception {
+    new RollingTopWords().run();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/SingleJoinExample.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/SingleJoinExample.java b/src/jvm/storm/starter/SingleJoinExample.java
index 62d332e..d323809 100644
--- a/src/jvm/storm/starter/SingleJoinExample.java
+++ b/src/jvm/storm/starter/SingleJoinExample.java
@@ -10,38 +10,38 @@ import backtype.storm.utils.Utils;
 import storm.starter.bolt.SingleJoinBolt;
 
 public class SingleJoinExample {
-    public static void main(String[] args) {
-        FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender"));
-        FeederSpout ageSpout = new FeederSpout(new Fields("id", "age"));
-        
-        TopologyBuilder builder = new TopologyBuilder();
-        builder.setSpout("gender", genderSpout);
-        builder.setSpout("age", ageSpout);
-        builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age")))
-                .fieldsGrouping("gender", new Fields("id"))
-                .fieldsGrouping("age", new Fields("id"));
-        
-        Config conf = new Config();
-        conf.setDebug(true);
-        
-        LocalCluster cluster = new LocalCluster();
-        cluster.submitTopology("join-example", conf, builder.createTopology());
-        
-        for(int i=0; i<10; i++) {
-            String gender;
-            if(i % 2 == 0) {
-                gender = "male";
-            } else {
-                gender = "female";
-            }
-            genderSpout.feed(new Values(i, gender));
-        }
-        
-        for(int i=9; i>=0; i--) {            
-            ageSpout.feed(new Values(i, i+20));
-        }
-        
-        Utils.sleep(2000);
-        cluster.shutdown();
+  public static void main(String[] args) {
+    FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender"));
+    FeederSpout ageSpout = new FeederSpout(new Fields("id", "age"));
+
+    TopologyBuilder builder = new TopologyBuilder();
+    builder.setSpout("gender", genderSpout);
+    builder.setSpout("age", ageSpout);
+    builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age"))).fieldsGrouping("gender", new Fields("id"))
+        .fieldsGrouping("age", new Fields("id"));
+
+    Config conf = new Config();
+    conf.setDebug(true);
+
+    LocalCluster cluster = new LocalCluster();
+    cluster.submitTopology("join-example", conf, builder.createTopology());
+
+    for (int i = 0; i < 10; i++) {
+      String gender;
+      if (i % 2 == 0) {
+        gender = "male";
+      }
+      else {
+        gender = "female";
+      }
+      genderSpout.feed(new Values(i, gender));
     }
+
+    for (int i = 9; i >= 0; i--) {
+      ageSpout.feed(new Values(i, i + 20));
+    }
+
+    Utils.sleep(2000);
+    cluster.shutdown();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/TransactionalGlobalCount.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/TransactionalGlobalCount.java b/src/jvm/storm/starter/TransactionalGlobalCount.java
index b4579a4..91b16aa 100644
--- a/src/jvm/storm/starter/TransactionalGlobalCount.java
+++ b/src/jvm/storm/starter/TransactionalGlobalCount.java
@@ -14,6 +14,7 @@ import backtype.storm.transactional.TransactionalTopologyBuilder;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
+
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -21,135 +22,135 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * This is a basic example of a transactional topology. It keeps a count of the number of tuples seen so far 
- * in a database. The source of data and the databases are mocked out as in memory maps for demonstration
- * purposes. This class is defined in depth on the wiki at https://github.com/nathanmarz/storm/wiki/Transactional-topologies
+ * This is a basic example of a transactional topology. It keeps a count of the number of tuples seen so far in a
+ * database. The source of data and the databases are mocked out as in memory maps for demonstration purposes. This
+ * class is defined in depth on the wiki at https://github.com/nathanmarz/storm/wiki/Transactional-topologies
  */
 public class TransactionalGlobalCount {
-    public static final int PARTITION_TAKE_PER_BATCH = 3;
-    public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{
-        put(0, new ArrayList<List<Object>>() {{
-            add(new Values("cat"));
-            add(new Values("dog"));
-            add(new Values("chicken"));
-            add(new Values("cat"));
-            add(new Values("dog"));
-            add(new Values("apple"));
-        }});
-        put(1, new ArrayList<List<Object>>() {{
-            add(new Values("cat"));
-            add(new Values("dog"));
-            add(new Values("apple"));
-            add(new Values("banana"));
-        }});
-        put(2, new ArrayList<List<Object>>() {{
-            add(new Values("cat"));
-            add(new Values("cat"));
-            add(new Values("cat"));
-            add(new Values("cat"));
-            add(new Values("cat"));
-            add(new Values("dog"));
-            add(new Values("dog"));
-            add(new Values("dog"));
-            add(new Values("dog"));
-        }});
-    }};
-    
-    public static class Value {
-        int count = 0;
-        BigInteger txid;
+  public static final int PARTITION_TAKE_PER_BATCH = 3;
+  public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{
+    put(0, new ArrayList<List<Object>>() {{
+      add(new Values("cat"));
+      add(new Values("dog"));
+      add(new Values("chicken"));
+      add(new Values("cat"));
+      add(new Values("dog"));
+      add(new Values("apple"));
+    }});
+    put(1, new ArrayList<List<Object>>() {{
+      add(new Values("cat"));
+      add(new Values("dog"));
+      add(new Values("apple"));
+      add(new Values("banana"));
+    }});
+    put(2, new ArrayList<List<Object>>() {{
+      add(new Values("cat"));
+      add(new Values("cat"));
+      add(new Values("cat"));
+      add(new Values("cat"));
+      add(new Values("cat"));
+      add(new Values("dog"));
+      add(new Values("dog"));
+      add(new Values("dog"));
+      add(new Values("dog"));
+    }});
+  }};
+
+  public static class Value {
+    int count = 0;
+    BigInteger txid;
+  }
+
+  public static Map<String, Value> DATABASE = new HashMap<String, Value>();
+  public static final String GLOBAL_COUNT_KEY = "GLOBAL-COUNT";
+
+  public static class BatchCount extends BaseBatchBolt {
+    Object _id;
+    BatchOutputCollector _collector;
+
+    int _count = 0;
+
+    @Override
+    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
+      _collector = collector;
+      _id = id;
     }
 
-    public static Map<String, Value> DATABASE = new HashMap<String, Value>();
-    public static final String GLOBAL_COUNT_KEY = "GLOBAL-COUNT";
-        
-    public static class BatchCount extends BaseBatchBolt {
-        Object _id;
-        BatchOutputCollector _collector;
-        
-        int _count = 0;
-
-        @Override
-        public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
-            _collector = collector;
-            _id = id;
-        }
+    @Override
+    public void execute(Tuple tuple) {
+      _count++;
+    }
 
-        @Override
-        public void execute(Tuple tuple) {
-            _count++;
-        }
+    @Override
+    public void finishBatch() {
+      _collector.emit(new Values(_id, _count));
+    }
 
-        @Override
-        public void finishBatch() {
-            _collector.emit(new Values(_id, _count));
-        }
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields("id", "count"));
+    }
+  }
+
+  public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {
+    TransactionAttempt _attempt;
+    BatchOutputCollector _collector;
 
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("id", "count"));
-        }        
+    int _sum = 0;
+
+    @Override
+    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
+      _collector = collector;
+      _attempt = attempt;
     }
-    
-    public static class UpdateGlobalCount extends BaseTransactionalBolt implements ICommitter {
-        TransactionAttempt _attempt;
-        BatchOutputCollector _collector;
-        
-        int _sum = 0;
-
-        @Override
-        public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
-            _collector = collector;
-            _attempt = attempt;
-        }
 
-        @Override
-        public void execute(Tuple tuple) {
-            _sum+=tuple.getInteger(1);
-        }
+    @Override
+    public void execute(Tuple tuple) {
+      _sum += tuple.getInteger(1);
+    }
 
-        @Override
-        public void finishBatch() {
-            Value val = DATABASE.get(GLOBAL_COUNT_KEY);
-            Value newval;
-            if(val == null || !val.txid.equals(_attempt.getTransactionId())) {
-                newval = new Value();
-                newval.txid = _attempt.getTransactionId();
-                if(val==null) {
-                    newval.count = _sum;
-                } else {
-                    newval.count = _sum + val.count;
-                }
-                DATABASE.put(GLOBAL_COUNT_KEY, newval);
-            } else {
-                newval = val;
-            }
-            _collector.emit(new Values(_attempt, newval.count));
+    @Override
+    public void finishBatch() {
+      Value val = DATABASE.get(GLOBAL_COUNT_KEY);
+      Value newval;
+      if (val == null || !val.txid.equals(_attempt.getTransactionId())) {
+        newval = new Value();
+        newval.txid = _attempt.getTransactionId();
+        if (val == null) {
+          newval.count = _sum;
         }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("id", "sum"));
-        }        
+        else {
+          newval.count = _sum + val.count;
+        }
+        DATABASE.put(GLOBAL_COUNT_KEY, newval);
+      }
+      else {
+        newval = val;
+      }
+      _collector.emit(new Values(_attempt, newval.count));
     }
-    
-    public static void main(String[] args) throws Exception {
-        MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
-        TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3);
-        builder.setBolt("partial-count", new BatchCount(), 5)
-                .noneGrouping("spout");
-        builder.setBolt("sum", new UpdateGlobalCount())
-                .globalGrouping("partial-count");
-        
-        LocalCluster cluster = new LocalCluster();
-        
-        Config config = new Config();
-        config.setDebug(true);
-        config.setMaxSpoutPending(3);
-        
-        cluster.submitTopology("global-count-topology", config, builder.buildTopology());
-        
-        Thread.sleep(3000);
-        cluster.shutdown();
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields("id", "sum"));
     }
+  }
+
+  public static void main(String[] args) throws Exception {
+    MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
+    TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("global-count", "spout", spout, 3);
+    builder.setBolt("partial-count", new BatchCount(), 5).noneGrouping("spout");
+    builder.setBolt("sum", new UpdateGlobalCount()).globalGrouping("partial-count");
+
+    LocalCluster cluster = new LocalCluster();
+
+    Config config = new Config();
+    config.setDebug(true);
+    config.setMaxSpoutPending(3);
+
+    cluster.submitTopology("global-count-topology", config, builder.buildTopology());
+
+    Thread.sleep(3000);
+    cluster.shutdown();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/TransactionalWords.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/TransactionalWords.java b/src/jvm/storm/starter/TransactionalWords.java
index 0252b66..4ee7b12 100644
--- a/src/jvm/storm/starter/TransactionalWords.java
+++ b/src/jvm/storm/starter/TransactionalWords.java
@@ -15,6 +15,7 @@ import backtype.storm.transactional.TransactionalTopologyBuilder;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
+
 import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -22,205 +23,207 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * This class defines a more involved transactional topology then TransactionalGlobalCount. This topology 
- * processes a stream of words and produces two outputs:
- * 
- * 1. A count for each word (stored in a database)
- * 2. The number of words for every bucket of 10 counts. So it stores in the database how many words have appeared
- * 0-9 times, how many have appeared 10-19 times, and so on. 
- * 
+ * This class defines a more involved transactional topology than TransactionalGlobalCount. This topology processes a
+ * stream of words and produces two outputs:
+ * <p/>
+ * 1. A count for each word (stored in a database) 2. The number of words for every bucket of 10 counts. So it stores in
+ * the database how many words have appeared 0-9 times, how many have appeared 10-19 times, and so on.
+ * <p/>
  * A batch of words can cause the bucket counts to decrement for some buckets and increment for others as words move
  * between buckets as their counts accumulate.
  */
 public class TransactionalWords {
-    public static class CountValue {
-        Integer prev_count = null;
-        int count = 0;
-        BigInteger txid = null;
+  public static class CountValue {
+    Integer prev_count = null;
+    int count = 0;
+    BigInteger txid = null;
+  }
+
+  public static class BucketValue {
+    int count = 0;
+    BigInteger txid;
+  }
+
+  public static final int BUCKET_SIZE = 10;
+
+  public static Map<String, CountValue> COUNT_DATABASE = new HashMap<String, CountValue>();
+  public static Map<Integer, BucketValue> BUCKET_DATABASE = new HashMap<Integer, BucketValue>();
+
+
+  public static final int PARTITION_TAKE_PER_BATCH = 3;
+
+  public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{
+    put(0, new ArrayList<List<Object>>() {{
+      add(new Values("cat"));
+      add(new Values("dog"));
+      add(new Values("chicken"));
+      add(new Values("cat"));
+      add(new Values("dog"));
+      add(new Values("apple"));
+    }});
+    put(1, new ArrayList<List<Object>>() {{
+      add(new Values("cat"));
+      add(new Values("dog"));
+      add(new Values("apple"));
+      add(new Values("banana"));
+    }});
+    put(2, new ArrayList<List<Object>>() {{
+      add(new Values("cat"));
+      add(new Values("cat"));
+      add(new Values("cat"));
+      add(new Values("cat"));
+      add(new Values("cat"));
+      add(new Values("dog"));
+      add(new Values("dog"));
+      add(new Values("dog"));
+      add(new Values("dog"));
+    }});
+  }};
+
+  public static class KeyedCountUpdater extends BaseTransactionalBolt implements ICommitter {
+    Map<String, Integer> _counts = new HashMap<String, Integer>();
+    BatchOutputCollector _collector;
+    TransactionAttempt _id;
+
+    int _count = 0;
+
+    @Override
+    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) {
+      _collector = collector;
+      _id = id;
     }
 
-    public static class BucketValue {
-        int count = 0;
-        BigInteger txid;
+    @Override
+    public void execute(Tuple tuple) {
+      String key = tuple.getString(1);
+      Integer curr = _counts.get(key);
+      if (curr == null)
+        curr = 0;
+      _counts.put(key, curr + 1);
     }
-    
-    public static final int BUCKET_SIZE = 10;
-        
-    public static Map<String, CountValue> COUNT_DATABASE = new HashMap<String, CountValue>();
-    public static Map<Integer, BucketValue> BUCKET_DATABASE = new HashMap<Integer, BucketValue>();
-    
-    
-    public static final int PARTITION_TAKE_PER_BATCH = 3;
-    
-    public static final Map<Integer, List<List<Object>>> DATA = new HashMap<Integer, List<List<Object>>>() {{
-        put(0, new ArrayList<List<Object>>() {{
-            add(new Values("cat"));
-            add(new Values("dog"));
-            add(new Values("chicken"));
-            add(new Values("cat"));
-            add(new Values("dog"));
-            add(new Values("apple"));
-        }});
-        put(1, new ArrayList<List<Object>>() {{
-            add(new Values("cat"));
-            add(new Values("dog"));
-            add(new Values("apple"));
-            add(new Values("banana"));
-        }});
-        put(2, new ArrayList<List<Object>>() {{
-            add(new Values("cat"));
-            add(new Values("cat"));
-            add(new Values("cat"));
-            add(new Values("cat"));
-            add(new Values("cat"));
-            add(new Values("dog"));
-            add(new Values("dog"));
-            add(new Values("dog"));
-            add(new Values("dog"));
-        }});
-    }};
-            
-    public static class KeyedCountUpdater extends BaseTransactionalBolt implements ICommitter {
-        Map<String, Integer> _counts = new HashMap<String, Integer>();
-        BatchOutputCollector _collector;
-        TransactionAttempt _id;
-        
-        int _count = 0;
-
-        @Override
-        public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) {
-            _collector = collector;
-            _id = id;
-        }
 
-        @Override
-        public void execute(Tuple tuple) {
-            String key = tuple.getString(1);
-            Integer curr = _counts.get(key);
-            if(curr==null) curr = 0;
-            _counts.put(key, curr + 1);
+    @Override
+    public void finishBatch() {
+      for (String key : _counts.keySet()) {
+        CountValue val = COUNT_DATABASE.get(key);
+        CountValue newVal;
+        if (val == null || !val.txid.equals(_id)) {
+          newVal = new CountValue();
+          newVal.txid = _id.getTransactionId();
+          if (val != null) {
+            newVal.prev_count = val.count;
+            newVal.count = val.count;
+          }
+          newVal.count = newVal.count + _counts.get(key);
+          COUNT_DATABASE.put(key, newVal);
         }
-
-        @Override
-        public void finishBatch() {
-            for(String key: _counts.keySet()) {
-                CountValue val = COUNT_DATABASE.get(key);
-                CountValue newVal;
-                if(val==null || !val.txid.equals(_id)) {
-                    newVal = new CountValue();
-                    newVal.txid = _id.getTransactionId();
-                    if(val!=null) {
-                        newVal.prev_count = val.count;
-                        newVal.count = val.count;
-                    }
-                    newVal.count = newVal.count + _counts.get(key);
-                    COUNT_DATABASE.put(key, newVal);
-                } else {
-                    newVal = val;
-                }
-                _collector.emit(new Values(_id, key, newVal.count, newVal.prev_count));
-            }
+        else {
+          newVal = val;
         }
+        _collector.emit(new Values(_id, key, newVal.count, newVal.prev_count));
+      }
+    }
 
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("id", "key", "count", "prev-count"));
-        }        
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields("id", "key", "count", "prev-count"));
     }
-    
-    public static class Bucketize extends BaseBasicBolt {
-        @Override
-        public void execute(Tuple tuple, BasicOutputCollector collector) {
-            TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);
-            int curr = tuple.getInteger(2);
-            Integer prev = tuple.getInteger(3);
-
-            int currBucket = curr / BUCKET_SIZE;
-            Integer prevBucket = null;
-            if(prev!=null) {
-                prevBucket = prev / BUCKET_SIZE;
-            }
-            
-            if(prevBucket==null) {
-                collector.emit(new Values(attempt, currBucket, 1));                
-            } else if(currBucket != prevBucket) {
-                collector.emit(new Values(attempt, currBucket, 1));
-                collector.emit(new Values(attempt, prevBucket, -1));
-            }
-        }
-        
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("attempt", "bucket", "delta"));
-        }
+  }
+
+  public static class Bucketize extends BaseBasicBolt {
+    @Override
+    public void execute(Tuple tuple, BasicOutputCollector collector) {
+      TransactionAttempt attempt = (TransactionAttempt) tuple.getValue(0);
+      int curr = tuple.getInteger(2);
+      Integer prev = tuple.getInteger(3);
+
+      int currBucket = curr / BUCKET_SIZE;
+      Integer prevBucket = null;
+      if (prev != null) {
+        prevBucket = prev / BUCKET_SIZE;
+      }
+
+      if (prevBucket == null) {
+        collector.emit(new Values(attempt, currBucket, 1));
+      }
+      else if (currBucket != prevBucket) {
+        collector.emit(new Values(attempt, currBucket, 1));
+        collector.emit(new Values(attempt, prevBucket, -1));
+      }
     }
-    
-    public static class BucketCountUpdater extends BaseTransactionalBolt {
-        Map<Integer, Integer> _accum = new HashMap<Integer, Integer>();
-        BatchOutputCollector _collector;
-        TransactionAttempt _attempt;
-        
-        int _count = 0;
-
-        @Override
-        public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
-            _collector = collector;
-            _attempt = attempt;
-        }
 
-        @Override
-        public void execute(Tuple tuple) {
-            Integer bucket = tuple.getInteger(1);
-            Integer delta = tuple.getInteger(2);
-            Integer curr = _accum.get(bucket);
-            if(curr==null) curr = 0;
-            _accum.put(bucket, curr + delta);
-        }
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields("attempt", "bucket", "delta"));
+    }
+  }
 
-        @Override
-        public void finishBatch() {
-            for(Integer bucket: _accum.keySet()) {
-                BucketValue currVal = BUCKET_DATABASE.get(bucket);
-                BucketValue newVal;
-                if(currVal==null || !currVal.txid.equals(_attempt.getTransactionId())) {
-                    newVal = new BucketValue();
-                    newVal.txid = _attempt.getTransactionId();
-                    newVal.count = _accum.get(bucket);
-                    if(currVal!=null) newVal.count += currVal.count;
-                    BUCKET_DATABASE.put(bucket, newVal);
-                } else {
-                    newVal = currVal;
-                }
-                _collector.emit(new Values(_attempt, bucket, newVal.count));
-            }
-        }
+  public static class BucketCountUpdater extends BaseTransactionalBolt {
+    Map<Integer, Integer> _accum = new HashMap<Integer, Integer>();
+    BatchOutputCollector _collector;
+    TransactionAttempt _attempt;
+
+    int _count = 0;
+
+    @Override
+    public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt attempt) {
+      _collector = collector;
+      _attempt = attempt;
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+      Integer bucket = tuple.getInteger(1);
+      Integer delta = tuple.getInteger(2);
+      Integer curr = _accum.get(bucket);
+      if (curr == null)
+        curr = 0;
+      _accum.put(bucket, curr + delta);
+    }
 
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("id", "bucket", "count"));
-        }        
+    @Override
+    public void finishBatch() {
+      for (Integer bucket : _accum.keySet()) {
+        BucketValue currVal = BUCKET_DATABASE.get(bucket);
+        BucketValue newVal;
+        if (currVal == null || !currVal.txid.equals(_attempt.getTransactionId())) {
+          newVal = new BucketValue();
+          newVal.txid = _attempt.getTransactionId();
+          newVal.count = _accum.get(bucket);
+          if (currVal != null)
+            newVal.count += currVal.count;
+          BUCKET_DATABASE.put(bucket, newVal);
+        }
+        else {
+          newVal = currVal;
+        }
+        _collector.emit(new Values(_attempt, bucket, newVal.count));
+      }
     }
-    
-    public static void main(String[] args) throws Exception {
-        MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
-        TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("top-n-words", "spout", spout, 2);
-        builder.setBolt("count", new KeyedCountUpdater(), 5)
-                .fieldsGrouping("spout", new Fields("word"));
-        builder.setBolt("bucketize", new Bucketize())
-                .noneGrouping("count");
-        builder.setBolt("buckets", new BucketCountUpdater(), 5)
-                .fieldsGrouping("bucketize", new Fields("bucket"));
-        
-        
-        LocalCluster cluster = new LocalCluster();
-        
-        Config config = new Config();
-        config.setDebug(true);
-        config.setMaxSpoutPending(3);
-        
-        cluster.submitTopology("top-n-topology", config, builder.buildTopology());
-        
-        Thread.sleep(3000);
-        cluster.shutdown();
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields("id", "bucket", "count"));
     }
+  }
+
+  public static void main(String[] args) throws Exception {
+    MemoryTransactionalSpout spout = new MemoryTransactionalSpout(DATA, new Fields("word"), PARTITION_TAKE_PER_BATCH);
+    TransactionalTopologyBuilder builder = new TransactionalTopologyBuilder("top-n-words", "spout", spout, 2);
+    builder.setBolt("count", new KeyedCountUpdater(), 5).fieldsGrouping("spout", new Fields("word"));
+    builder.setBolt("bucketize", new Bucketize()).noneGrouping("count");
+    builder.setBolt("buckets", new BucketCountUpdater(), 5).fieldsGrouping("bucketize", new Fields("bucket"));
+
+
+    LocalCluster cluster = new LocalCluster();
+
+    Config config = new Config();
+    config.setDebug(true);
+    config.setMaxSpoutPending(3);
+
+    cluster.submitTopology("top-n-topology", config, builder.buildTopology());
+
+    Thread.sleep(3000);
+    cluster.shutdown();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/WordCountTopology.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/WordCountTopology.java b/src/jvm/storm/starter/WordCountTopology.java
index bed968d..f26278c 100644
--- a/src/jvm/storm/starter/WordCountTopology.java
+++ b/src/jvm/storm/starter/WordCountTopology.java
@@ -1,11 +1,9 @@
 package storm.starter;
 
-import storm.starter.spout.RandomSentenceSpout;
 import backtype.storm.Config;
 import backtype.storm.LocalCluster;
 import backtype.storm.StormSubmitter;
 import backtype.storm.task.ShellBolt;
-import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.BasicOutputCollector;
 import backtype.storm.topology.IRichBolt;
 import backtype.storm.topology.OutputFieldsDeclarer;
@@ -14,6 +12,8 @@ import backtype.storm.topology.base.BaseBasicBolt;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
+import storm.starter.spout.RandomSentenceSpout;
+
 import java.util.HashMap;
 import java.util.Map;
 
@@ -21,70 +21,70 @@ import java.util.Map;
  * This topology demonstrates Storm's stream groupings and multilang capabilities.
  */
 public class WordCountTopology {
-    public static class SplitSentence extends ShellBolt implements IRichBolt {
-        
-        public SplitSentence() {
-            super("python", "splitsentence.py");
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("word"));
-        }
-
-        @Override
-        public Map<String, Object> getComponentConfiguration() {
-            return null;
-        }
-    }  
-    
-    public static class WordCount extends BaseBasicBolt {
-        Map<String, Integer> counts = new HashMap<String, Integer>();
-
-        @Override
-        public void execute(Tuple tuple, BasicOutputCollector collector) {
-            String word = tuple.getString(0);
-            Integer count = counts.get(word);
-            if(count==null) count = 0;
-            count++;
-            counts.put(word, count);
-            collector.emit(new Values(word, count));
-        }
-
-        @Override
-        public void declareOutputFields(OutputFieldsDeclarer declarer) {
-            declarer.declare(new Fields("word", "count"));
-        }
+  public static class SplitSentence extends ShellBolt implements IRichBolt {
+
+    public SplitSentence() {
+      super("python", "splitsentence.py");
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields("word"));
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+      return null;
+    }
+  }
+
+  public static class WordCount extends BaseBasicBolt {
+    Map<String, Integer> counts = new HashMap<String, Integer>();
+
+    @Override
+    public void execute(Tuple tuple, BasicOutputCollector collector) {
+      String word = tuple.getString(0);
+      Integer count = counts.get(word);
+      if (count == null)
+        count = 0;
+      count++;
+      counts.put(word, count);
+      collector.emit(new Values(word, count));
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields("word", "count"));
     }
-    
-    public static void main(String[] args) throws Exception {
-        
-        TopologyBuilder builder = new TopologyBuilder();
-        
-        builder.setSpout("spout", new RandomSentenceSpout(), 5);
-        
-        builder.setBolt("split", new SplitSentence(), 8)
-                 .shuffleGrouping("spout");
-        builder.setBolt("count", new WordCount(), 12)
-                 .fieldsGrouping("split", new Fields("word"));
-
-        Config conf = new Config();
-        conf.setDebug(true);
-
-        
-        if(args!=null && args.length > 0) {
-            conf.setNumWorkers(3);
-            
-            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
-        } else {        
-            conf.setMaxTaskParallelism(3);
-
-            LocalCluster cluster = new LocalCluster();
-            cluster.submitTopology("word-count", conf, builder.createTopology());
-        
-            Thread.sleep(10000);
-
-            cluster.shutdown();
-        }
+  }
+
+  public static void main(String[] args) throws Exception {
+
+    TopologyBuilder builder = new TopologyBuilder();
+
+    builder.setSpout("spout", new RandomSentenceSpout(), 5);
+
+    builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
+    builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
+
+    Config conf = new Config();
+    conf.setDebug(true);
+
+
+    if (args != null && args.length > 0) {
+      conf.setNumWorkers(3);
+
+      StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
+    }
+    else {
+      conf.setMaxTaskParallelism(3);
+
+      LocalCluster cluster = new LocalCluster();
+      cluster.submitTopology("word-count", conf, builder.createTopology());
+
+      Thread.sleep(10000);
+
+      cluster.shutdown();
     }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/bolt/AbstractRankerBolt.java b/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
index 27d65a0..6e2551c 100644
--- a/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
+++ b/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
@@ -1,12 +1,5 @@
 package storm.starter.bolt;
 
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-import storm.starter.tools.Rankings;
-import storm.starter.util.TupleHelpers;
 import backtype.storm.Config;
 import backtype.storm.topology.BasicOutputCollector;
 import backtype.storm.topology.OutputFieldsDeclarer;
@@ -14,82 +7,87 @@ import backtype.storm.topology.base.BaseBasicBolt;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
+import org.apache.log4j.Logger;
+import storm.starter.tools.Rankings;
+import storm.starter.util.TupleHelpers;
+
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * This abstract bolt provides the basic behavior of bolts that rank objects according to their count.
- * 
+ * <p/>
  * It uses a template method design pattern for {@link AbstractRankerBolt#execute(Tuple, BasicOutputCollector)} to allow
  * actual bolt implementations to specify how incoming tuples are processed, i.e. how the objects embedded within those
  * tuples are retrieved and counted.
- * 
  */
 public abstract class AbstractRankerBolt extends BaseBasicBolt {
 
-    private static final long serialVersionUID = 4931640198501530202L;
-    private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = 2;
-    private static final int DEFAULT_COUNT = 10;
+  private static final long serialVersionUID = 4931640198501530202L;
+  private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = 2;
+  private static final int DEFAULT_COUNT = 10;
 
-    private final int emitFrequencyInSeconds;
-    private final int count;
-    private final Rankings rankings;
+  private final int emitFrequencyInSeconds;
+  private final int count;
+  private final Rankings rankings;
 
-    public AbstractRankerBolt() {
-        this(DEFAULT_COUNT, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
-    }
+  public AbstractRankerBolt() {
+    this(DEFAULT_COUNT, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
+  }
 
-    public AbstractRankerBolt(int topN) {
-        this(topN, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
-    }
+  public AbstractRankerBolt(int topN) {
+    this(topN, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
+  }
 
-    public AbstractRankerBolt(int topN, int emitFrequencyInSeconds) {
-        if (topN < 1) {
-            throw new IllegalArgumentException("topN must be >= 1 (you requested " + topN + ")");
-        }
-        if (emitFrequencyInSeconds < 1) {
-            throw new IllegalArgumentException("The emit frequency must be >= 1 seconds (you requested "
-                + emitFrequencyInSeconds + " seconds)");
-        }
-        count = topN;
-        this.emitFrequencyInSeconds = emitFrequencyInSeconds;
-        rankings = new Rankings(count);
+  public AbstractRankerBolt(int topN, int emitFrequencyInSeconds) {
+    if (topN < 1) {
+      throw new IllegalArgumentException("topN must be >= 1 (you requested " + topN + ")");
     }
-
-    protected Rankings getRankings() {
-        return rankings;
+    if (emitFrequencyInSeconds < 1) {
+      throw new IllegalArgumentException(
+          "The emit frequency must be >= 1 seconds (you requested " + emitFrequencyInSeconds + " seconds)");
     }
-
-    /**
-     * This method functions as a template method (design pattern).
-     */
-    @Override
-    public final void execute(Tuple tuple, BasicOutputCollector collector) {
-        if (TupleHelpers.isTickTuple(tuple)) {
-            getLogger().debug("Received tick tuple, triggering emit of current rankings");
-            emitRankings(collector);
-        }
-        else {
-            updateRankingsWithTuple(tuple);
-        }
+    count = topN;
+    this.emitFrequencyInSeconds = emitFrequencyInSeconds;
+    rankings = new Rankings(count);
+  }
+
+  protected Rankings getRankings() {
+    return rankings;
+  }
+
+  /**
+   * This method functions as a template method (design pattern).
+   */
+  @Override
+  public final void execute(Tuple tuple, BasicOutputCollector collector) {
+    if (TupleHelpers.isTickTuple(tuple)) {
+      getLogger().debug("Received tick tuple, triggering emit of current rankings");
+      emitRankings(collector);
+    }
+    else {
+      updateRankingsWithTuple(tuple);
     }
+  }
 
-    abstract void updateRankingsWithTuple(Tuple tuple);
+  abstract void updateRankingsWithTuple(Tuple tuple);
 
-    private void emitRankings(BasicOutputCollector collector) {
-        collector.emit(new Values(rankings));
-        getLogger().debug("Rankings: " + rankings);
-    }
+  private void emitRankings(BasicOutputCollector collector) {
+    collector.emit(new Values(rankings));
+    getLogger().debug("Rankings: " + rankings);
+  }
 
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("rankings"));
-    }
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declare(new Fields("rankings"));
+  }
 
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        Map<String, Object> conf = new HashMap<String, Object>();
-        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
-        return conf;
-    }
+  @Override
+  public Map<String, Object> getComponentConfiguration() {
+    Map<String, Object> conf = new HashMap<String, Object>();
+    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
+    return conf;
+  }
 
-    abstract Logger getLogger();
+  abstract Logger getLogger();
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/bolt/IntermediateRankingsBolt.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/bolt/IntermediateRankingsBolt.java b/src/jvm/storm/starter/bolt/IntermediateRankingsBolt.java
index 161e67d..6a57d47 100644
--- a/src/jvm/storm/starter/bolt/IntermediateRankingsBolt.java
+++ b/src/jvm/storm/starter/bolt/IntermediateRankingsBolt.java
@@ -1,43 +1,41 @@
 package storm.starter.bolt;
 
+import backtype.storm.tuple.Tuple;
 import org.apache.log4j.Logger;
-
 import storm.starter.tools.Rankable;
 import storm.starter.tools.RankableObjectWithFields;
-import backtype.storm.tuple.Tuple;
 
 /**
  * This bolt ranks incoming objects by their count.
- * 
+ * <p/>
  * It assumes the input tuples to adhere to the following format: (object, object_count, additionalField1,
  * additionalField2, ..., additionalFieldN).
- * 
  */
 public final class IntermediateRankingsBolt extends AbstractRankerBolt {
 
-    private static final long serialVersionUID = -1369800530256637409L;
-    private static final Logger LOG = Logger.getLogger(IntermediateRankingsBolt.class);
+  private static final long serialVersionUID = -1369800530256637409L;
+  private static final Logger LOG = Logger.getLogger(IntermediateRankingsBolt.class);
 
-    public IntermediateRankingsBolt() {
-        super();
-    }
+  public IntermediateRankingsBolt() {
+    super();
+  }
 
-    public IntermediateRankingsBolt(int topN) {
-        super(topN);
-    }
+  public IntermediateRankingsBolt(int topN) {
+    super(topN);
+  }
 
-    public IntermediateRankingsBolt(int topN, int emitFrequencyInSeconds) {
-        super(topN, emitFrequencyInSeconds);
-    }
+  public IntermediateRankingsBolt(int topN, int emitFrequencyInSeconds) {
+    super(topN, emitFrequencyInSeconds);
+  }
 
-    @Override
-    void updateRankingsWithTuple(Tuple tuple) {
-        Rankable rankable = RankableObjectWithFields.from(tuple);
-        super.getRankings().updateWith(rankable);
-    }
+  @Override
+  void updateRankingsWithTuple(Tuple tuple) {
+    Rankable rankable = RankableObjectWithFields.from(tuple);
+    super.getRankings().updateWith(rankable);
+  }
 
-    @Override
-    Logger getLogger() {
-        return LOG;
-    }
+  @Override
+  Logger getLogger() {
+    return LOG;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/bolt/PrinterBolt.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/bolt/PrinterBolt.java b/src/jvm/storm/starter/bolt/PrinterBolt.java
index ac380ed..8b2e62b 100644
--- a/src/jvm/storm/starter/bolt/PrinterBolt.java
+++ b/src/jvm/storm/starter/bolt/PrinterBolt.java
@@ -8,13 +8,13 @@ import backtype.storm.tuple.Tuple;
 
 public class PrinterBolt extends BaseBasicBolt {
 
-    @Override
-    public void execute(Tuple tuple, BasicOutputCollector collector) {
-        System.out.println(tuple);
-    }
+  @Override
+  public void execute(Tuple tuple, BasicOutputCollector collector) {
+    System.out.println(tuple);
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer ofd) {
+  }
 
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer ofd) {
-    }
-    
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a51c8247/src/jvm/storm/starter/bolt/RollingCountBolt.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/starter/bolt/RollingCountBolt.java b/src/jvm/storm/starter/bolt/RollingCountBolt.java
index 0066b61..d547749 100644
--- a/src/jvm/storm/starter/bolt/RollingCountBolt.java
+++ b/src/jvm/storm/starter/bolt/RollingCountBolt.java
@@ -1,14 +1,5 @@
 package storm.starter.bolt;
 
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.log4j.Logger;
-
-import storm.starter.tools.NthLastModifiedTimeTracker;
-import storm.starter.tools.SlidingWindowCounter;
-import storm.starter.util.TupleHelpers;
 import backtype.storm.Config;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
@@ -17,110 +8,118 @@ import backtype.storm.topology.base.BaseRichBolt;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
+import org.apache.log4j.Logger;
+import storm.starter.tools.NthLastModifiedTimeTracker;
+import storm.starter.tools.SlidingWindowCounter;
+import storm.starter.util.TupleHelpers;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
 
 /**
  * This bolt performs rolling counts of incoming objects, i.e. sliding window based counting.
- * 
+ * <p/>
  * The bolt is configured by two parameters, the length of the sliding window in seconds (which influences the output
  * data of the bolt, i.e. how it will count objects) and the emit frequency in seconds (which influences how often the
  * bolt will output the latest window counts). For instance, if the window length is set to an equivalent of five
  * minutes and the emit frequency to one minute, then the bolt will output the latest five-minute sliding window every
  * minute.
- * 
+ * <p/>
  * The bolt emits a rolling count tuple per object, consisting of the object itself, its latest rolling count, and the
  * actual duration of the sliding window. The latter is included in case the expected sliding window length (as
  * configured by the user) is different from the actual length, e.g. due to high system load. Note that the actual
  * window length is tracked and calculated for the window, and not individually for each object within a window.
- * 
+ * <p/>
  * Note: During the startup phase you will usually observe that the bolt warns you about the actual sliding window
  * length being smaller than the expected length. This behavior is expected and is caused by the way the sliding window
  * counts are initially "loaded up". You can safely ignore this warning during startup (e.g. you will see this warning
  * during the first ~ five minutes of startup time if the window length is set to five minutes).
- * 
  */
 public class RollingCountBolt extends BaseRichBolt {
 
-    private static final long serialVersionUID = 5537727428628598519L;
-    private static final Logger LOG = Logger.getLogger(RollingCountBolt.class);
-    private static final int NUM_WINDOW_CHUNKS = 5;
-    private static final int DEFAULT_SLIDING_WINDOW_IN_SECONDS = NUM_WINDOW_CHUNKS * 60;
-    private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = DEFAULT_SLIDING_WINDOW_IN_SECONDS / NUM_WINDOW_CHUNKS;
-    private static final String WINDOW_LENGTH_WARNING_TEMPLATE = "Actual window length is %d seconds when it should be %d seconds"
-        + " (you can safely ignore this warning during the startup phase)";
-
-    private final SlidingWindowCounter<Object> counter;
-    private final int windowLengthInSeconds;
-    private final int emitFrequencyInSeconds;
-    private OutputCollector collector;
-    private NthLastModifiedTimeTracker lastModifiedTracker;
-
-    public RollingCountBolt() {
-        this(DEFAULT_SLIDING_WINDOW_IN_SECONDS, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
+  private static final long serialVersionUID = 5537727428628598519L;
+  private static final Logger LOG = Logger.getLogger(RollingCountBolt.class);
+  private static final int NUM_WINDOW_CHUNKS = 5;
+  private static final int DEFAULT_SLIDING_WINDOW_IN_SECONDS = NUM_WINDOW_CHUNKS * 60;
+  private static final int DEFAULT_EMIT_FREQUENCY_IN_SECONDS = DEFAULT_SLIDING_WINDOW_IN_SECONDS / NUM_WINDOW_CHUNKS;
+  private static final String WINDOW_LENGTH_WARNING_TEMPLATE =
+      "Actual window length is %d seconds when it should be %d seconds"
+          + " (you can safely ignore this warning during the startup phase)";
+
+  private final SlidingWindowCounter<Object> counter;
+  private final int windowLengthInSeconds;
+  private final int emitFrequencyInSeconds;
+  private OutputCollector collector;
+  private NthLastModifiedTimeTracker lastModifiedTracker;
+
+  public RollingCountBolt() {
+    this(DEFAULT_SLIDING_WINDOW_IN_SECONDS, DEFAULT_EMIT_FREQUENCY_IN_SECONDS);
+  }
+
+  public RollingCountBolt(int windowLengthInSeconds, int emitFrequencyInSeconds) {
+    this.windowLengthInSeconds = windowLengthInSeconds;
+    this.emitFrequencyInSeconds = emitFrequencyInSeconds;
+    counter = new SlidingWindowCounter<Object>(deriveNumWindowChunksFrom(this.windowLengthInSeconds,
+        this.emitFrequencyInSeconds));
+  }
+
+  private int deriveNumWindowChunksFrom(int windowLengthInSeconds, int windowUpdateFrequencyInSeconds) {
+    return windowLengthInSeconds / windowUpdateFrequencyInSeconds;
+  }
+
+  @SuppressWarnings("rawtypes")
+  @Override
+  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+    this.collector = collector;
+    lastModifiedTracker = new NthLastModifiedTimeTracker(deriveNumWindowChunksFrom(this.windowLengthInSeconds,
+        this.emitFrequencyInSeconds));
+  }
+
+  @Override
+  public void execute(Tuple tuple) {
+    if (TupleHelpers.isTickTuple(tuple)) {
+      LOG.debug("Received tick tuple, triggering emit of current window counts");
+      emitCurrentWindowCounts();
     }
-
-    public RollingCountBolt(int windowLengthInSeconds, int emitFrequencyInSeconds) {
-        this.windowLengthInSeconds = windowLengthInSeconds;
-        this.emitFrequencyInSeconds = emitFrequencyInSeconds;
-        counter = new SlidingWindowCounter<Object>(deriveNumWindowChunksFrom(this.windowLengthInSeconds,
-            this.emitFrequencyInSeconds));
+    else {
+      countObjAndAck(tuple);
     }
-
-    private int deriveNumWindowChunksFrom(int windowLengthInSeconds, int windowUpdateFrequencyInSeconds) {
-        return windowLengthInSeconds / windowUpdateFrequencyInSeconds;
+  }
+
+  private void emitCurrentWindowCounts() {
+    Map<Object, Long> counts = counter.getCountsThenAdvanceWindow();
+    int actualWindowLengthInSeconds = lastModifiedTracker.secondsSinceOldestModification();
+    lastModifiedTracker.markAsModified();
+    if (actualWindowLengthInSeconds != windowLengthInSeconds) {
+      LOG.warn(String.format(WINDOW_LENGTH_WARNING_TEMPLATE, actualWindowLengthInSeconds, windowLengthInSeconds));
     }
-
-    @SuppressWarnings("rawtypes")
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        this.collector = collector;
-        lastModifiedTracker = new NthLastModifiedTimeTracker(deriveNumWindowChunksFrom(this.windowLengthInSeconds,
-            this.emitFrequencyInSeconds));
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-        if (TupleHelpers.isTickTuple(tuple)) {
-            LOG.debug("Received tick tuple, triggering emit of current window counts");
-            emitCurrentWindowCounts();
-        }
-        else {
-            countObjAndAck(tuple);
-        }
-    }
-
-    private void emitCurrentWindowCounts() {
-        Map<Object, Long> counts = counter.getCountsThenAdvanceWindow();
-        int actualWindowLengthInSeconds = lastModifiedTracker.secondsSinceOldestModification();
-        lastModifiedTracker.markAsModified();
-        if (actualWindowLengthInSeconds != windowLengthInSeconds) {
-            LOG.warn(String.format(WINDOW_LENGTH_WARNING_TEMPLATE, actualWindowLengthInSeconds, windowLengthInSeconds));
-        }
-        emit(counts, actualWindowLengthInSeconds);
-    }
-
-    private void emit(Map<Object, Long> counts, int actualWindowLengthInSeconds) {
-        for (Entry<Object, Long> entry : counts.entrySet()) {
-            Object obj = entry.getKey();
-            Long count = entry.getValue();
-            collector.emit(new Values(obj, count, actualWindowLengthInSeconds));
-        }
-    }
-
-    private void countObjAndAck(Tuple tuple) {
-        Object obj = tuple.getValue(0);
-        counter.incrementCount(obj);
-        collector.ack(tuple);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("obj", "count", "actualWindowLengthInSeconds"));
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        Map<String, Object> conf = new HashMap<String, Object>();
-        conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
-        return conf;
+    emit(counts, actualWindowLengthInSeconds);
+  }
+
+  private void emit(Map<Object, Long> counts, int actualWindowLengthInSeconds) {
+    for (Entry<Object, Long> entry : counts.entrySet()) {
+      Object obj = entry.getKey();
+      Long count = entry.getValue();
+      collector.emit(new Values(obj, count, actualWindowLengthInSeconds));
     }
+  }
+
+  private void countObjAndAck(Tuple tuple) {
+    Object obj = tuple.getValue(0);
+    counter.incrementCount(obj);
+    collector.ack(tuple);
+  }
+
+  @Override
+  public void declareOutputFields(OutputFieldsDeclarer declarer) {
+    declarer.declare(new Fields("obj", "count", "actualWindowLengthInSeconds"));
+  }
+
+  @Override
+  public Map<String, Object> getComponentConfiguration() {
+    Map<String, Object> conf = new HashMap<String, Object>();
+    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, emitFrequencyInSeconds);
+    return conf;
+  }
 }


Mime
View raw message