storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [1/3] storm git commit: STORM-2327 ConfigurableTopology
Date Thu, 02 Feb 2017 07:05:53 GMT
Repository: storm
Updated Branches:
  refs/heads/master 9c8d7b916 -> 81aea91f1


STORM-2327 ConfigurableTopology


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8685807b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8685807b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8685807b

Branch: refs/heads/master
Commit: 8685807b9a9c77dad3ecb8507e985906bce974b5
Parents: 1811273
Author: Julien Nioche <julien@digitalpebble.com>
Authored: Thu Jan 26 14:44:01 2017 +0000
Committer: Julien Nioche <julien@digitalpebble.com>
Committed: Tue Jan 31 16:20:30 2017 +0000

----------------------------------------------------------------------
 examples/storm-starter/README.markdown          |  14 +-
 .../storm/starter/ExclamationTopology.java      |  35 ++--
 .../apache/storm/starter/RollingTopWords.java   | 117 +++++-------
 .../storm/starter/SkewedRollingTopWords.java    | 123 +++++--------
 .../apache/storm/starter/WordCountTopology.java |  42 +++--
 .../apache/storm/starter/util/StormRunner.java  |  50 -----
 .../storm/topology/ConfigurableTopology.java    | 184 +++++++++++++++++++
 7 files changed, 318 insertions(+), 247 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8685807b/examples/storm-starter/README.markdown
----------------------------------------------------------------------
diff --git a/examples/storm-starter/README.markdown b/examples/storm-starter/README.markdown
index ca0a253..f8693a1 100644
--- a/examples/storm-starter/README.markdown
+++ b/examples/storm-starter/README.markdown
@@ -88,21 +88,21 @@ Example filename of the uberjar:
 You can submit (run) a topology contained in this uberjar to Storm via the `storm` CLI tool:
 
     # Example 1: Run the ExclamationTopology in local mode (LocalCluster)
-    $ storm jar target/storm-starter-*.jar org.apache.storm.starter.ExclamationTopology
+    $ storm jar target/storm-starter-*.jar org.apache.storm.starter.ExclamationTopology -local
 
     # Example 2: Run the RollingTopWords in remote/cluster mode,
     #            under the name "production-topology"
-    $ storm jar target/storm-starter-*.jar org.apache.storm.starter.RollingTopWords production-topology
remote
+    $ storm jar target/storm-starter-*.jar org.apache.storm.starter.RollingTopWords production-topology
 
 With submitting you can run topologies which use multilang, for example, `WordCountTopology`.
 
 _Submitting a topology in local vs. remote mode:_
 It depends on the actual code of a topology how you can or even must tell Storm whether to
run the topology locally (in
-an in-memory LocalCluster instance of Storm) or remotely (in a "real" Storm cluster).  In
the case of
-[RollingTopWords](src/jvm/org/apache/storm/starter/RollingTopWords.java), for instance, this
can be done by passing command line
-arguments.
-Topologies other than `RollingTopWords` -- such as [ExclamationTopology](src/jvm/org/apache/storm/starter/ExclamationTopology.java)
--- may behave differently, e.g. by always submitting to a remote cluster (i.e. hardcoded
in a way that you, as a user,
+an in-memory LocalCluster instance of Storm) or remotely (in a "real" Storm cluster).  In
the case of topologies extending
+[ConfigurableTopology](https://github.com/apache/storm/tree/master/storm-core/src/jvm/org/apache/storm/topology/ConfigurableTopology.java),
+such as [RollingTopWords](src/jvm/org/apache/storm/starter/RollingTopWords.java) or [ExclamationTopology](src/jvm/org/apache/storm/starter/ExclamationTopology.java),

+this is done by specifying `-local` in the command line arguments.
+Other topologies may behave differently, e.g. by always submitting to a remote cluster (i.e.
hardcoded in a way that you, as a user,
 cannot change without modifying the topology code), or by requiring a customized configuration
file that the topology
 code will parse prior submitting the topology to Storm.  Similarly, further options such
as the name of the topology may
 be user-configurable or be hardcoded into the topology code.  So make sure you understand
how the topology of your

http://git-wip-us.apache.org/repos/asf/storm/blob/8685807b/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java
b/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java
index 7de8645..0b04709 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java
@@ -17,27 +17,23 @@
  */
 package org.apache.storm.starter;
 
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
-import org.apache.storm.StormSubmitter;
+import java.util.Map;
+
 import org.apache.storm.task.OutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.testing.TestWordSpout;
+import org.apache.storm.topology.ConfigurableTopology;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.topology.base.BaseRichBolt;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-
-import java.util.Map;
 
 /**
  * This is a basic example of a Storm topology.
  */
-public class ExclamationTopology {
+public class ExclamationTopology extends ConfigurableTopology {
 
   public static class ExclamationBolt extends BaseRichBolt {
     OutputCollector _collector;
@@ -58,30 +54,33 @@ public class ExclamationTopology {
       declarer.declare(new Fields("word"));
     }
 
-
   }
 
   public static void main(String[] args) throws Exception {
+    ConfigurableTopology.start(new ExclamationTopology(), args);
+  }
+
+  protected int run(String[] args) {
     TopologyBuilder builder = new TopologyBuilder();
 
     builder.setSpout("word", new TestWordSpout(), 10);
     builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
     builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
 
-    Config conf = new Config();
     conf.setDebug(true);
 
-    if (args != null && args.length > 0) {
-      conf.setNumWorkers(3);
+    String topologyName = "test";
 
-      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
+    if (isLocal) {
+      ttl = 10;
+    } else {
+      conf.setNumWorkers(3);
     }
-    else {
 
-      try (LocalCluster cluster = new LocalCluster();
-           LocalTopology topo = cluster.submitTopology("test", conf, builder.createTopology());)
{
-        Utils.sleep(10000);
-      }
+    if (args != null && args.length > 0) {
+      topologyName = args[0];
     }
+
+    return submit(topologyName, conf, builder);
   }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/8685807b/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java
b/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java
index b5ee161..7ebbaf8 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/RollingTopWords.java
@@ -17,114 +17,81 @@
  */
 package org.apache.storm.starter;
 
-import org.apache.storm.Config;
-import org.apache.storm.testing.TestWordSpout;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
-import org.apache.log4j.Logger;
 import org.apache.storm.starter.bolt.IntermediateRankingsBolt;
 import org.apache.storm.starter.bolt.RollingCountBolt;
 import org.apache.storm.starter.bolt.TotalRankingsBolt;
-import org.apache.storm.starter.util.StormRunner;
+import org.apache.storm.testing.TestWordSpout;
+import org.apache.storm.topology.ConfigurableTopology;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * This topology does a continuous computation of the top N words that the topology has seen
in terms of cardinality.
- * The top N computation is done in a completely scalable way, and a similar approach could
be used to compute things
- * like trending topics or trending images on Twitter.
+ * This topology does a continuous computation of the top N words that the
+ * topology has seen in terms of cardinality. The top N computation is done in a
+ * completely scalable way, and a similar approach could be used to compute
+ * things like trending topics or trending images on Twitter.
  */
-public class RollingTopWords {
+public class RollingTopWords extends ConfigurableTopology {
 
-  private static final Logger LOG = Logger.getLogger(RollingTopWords.class);
-  private static final int DEFAULT_RUNTIME_IN_SECONDS = 60;
+  private static final Logger LOG = LoggerFactory.getLogger(RollingTopWords.class);
   private static final int TOP_N = 5;
 
-  private final TopologyBuilder builder;
-  private final String topologyName;
-  private final Config topologyConfig;
-  private final int runtimeInSeconds;
-
-  public RollingTopWords(String topologyName) throws InterruptedException {
-    builder = new TopologyBuilder();
-    this.topologyName = topologyName;
-    topologyConfig = createTopologyConfiguration();
-    runtimeInSeconds = DEFAULT_RUNTIME_IN_SECONDS;
-
-    wireTopology();
-  }
-
-  private static Config createTopologyConfiguration() {
-    Config conf = new Config();
-    conf.setDebug(true);
-    return conf;
+  private RollingTopWords() {
   }
 
-  private void wireTopology() throws InterruptedException {
-    String spoutId = "wordGenerator";
-    String counterId = "counter";
-    String intermediateRankerId = "intermediateRanker";
-    String totalRankerId = "finalRanker";
-    builder.setSpout(spoutId, new TestWordSpout(), 5);
-    builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new
Fields("word"));
-    builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId,
new Fields(
-        "obj"));
-    builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
-  }
-
-  public void runLocally() throws InterruptedException {
-    StormRunner.runTopologyLocally(builder.createTopology(), topologyName, topologyConfig,
runtimeInSeconds);
-  }
-
-  public void runRemotely() throws Exception {
-    StormRunner.runTopologyRemotely(builder.createTopology(), topologyName, topologyConfig);
+  public static void main(String[] args) throws Exception {
+    ConfigurableTopology.start(new RollingTopWords(), args);
   }
 
   /**
    * Submits (runs) the topology.
    *
-   * Usage: "RollingTopWords [topology-name] [local|remote]"
+   * Usage: "RollingTopWords [topology-name] [-local]"
    *
-   * By default, the topology is run locally under the name "slidingWindowCounts".
+   * By default, the topology is submitted to a cluster under the name
+   * "slidingWindowCounts"; pass "-local" to run it in a LocalCluster.
    *
    * Examples:
    *
    * ```
    *
    * # Runs in local mode (LocalCluster), with topology name "slidingWindowCounts"
-   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.RollingTopWords
-   *
-   * # Runs in local mode (LocalCluster), with topology name "foobar"
-   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.RollingTopWords
foobar
-   *
+   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.RollingTopWords
-local
+   * 
    * # Runs in local mode (LocalCluster), with topology name "foobar"
-   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.RollingTopWords
foobar local
+   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.RollingTopWords
foobar -local
+   * 
+   * # Runs in local mode (LocalCluster) for 30 seconds, with topology name "foobar" 
+   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.RollingTopWords
foobar -local -ttl 30
    *
    * # Runs in remote/cluster mode, with topology name "production-topology"
-   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.RollingTopWords
production-topology remote
-   * ```
+   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.RollingTopWords
production-topology ```
    *
-   * @param args First positional argument (optional) is topology name, second positional
argument (optional) defines
-   *             whether to run the topology locally ("local") or remotely, i.e. on a real
cluster ("remote").
+   * @param args
+   *          First positional argument (optional) is the topology name. The
+   *          optional "-local" flag runs the topology locally; without it the
+   *          topology is submitted remotely, i.e. on a real cluster.
    * @throws Exception
    */
-  public static void main(String[] args) throws Exception {
+  protected int run(String[] args) {
     String topologyName = "slidingWindowCounts";
     if (args.length >= 1) {
       topologyName = args[0];
     }
-    boolean runLocally = true;
-    if (args.length >= 2 && args[1].equalsIgnoreCase("remote")) {
-      runLocally = false;
-    }
-
+    TopologyBuilder builder = new TopologyBuilder();
+    String spoutId = "wordGenerator";
+    String counterId = "counter";
+    String intermediateRankerId = "intermediateRanker";
+    String totalRankerId = "finalRanker";
+    builder.setSpout(spoutId, new TestWordSpout(), 5);
+    builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).fieldsGrouping(spoutId, new
Fields("word"));
+    builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(counterId,
+        new Fields("obj"));
+    builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
     LOG.info("Topology name: " + topologyName);
-    RollingTopWords rtw = new RollingTopWords(topologyName);
-    if (runLocally) {
-      LOG.info("Running in local mode");
-      rtw.runLocally();
-    }
-    else {
-      LOG.info("Running in remote (cluster) mode");
-      rtw.runRemotely();
-    }
+
+    return submit(topologyName, conf, builder);
   }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/8685807b/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java
b/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java
index 3addc15..83ad4fc 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/SkewedRollingTopWords.java
@@ -17,116 +17,85 @@
  */
 package org.apache.storm.starter;
 
-import org.apache.storm.Config;
-import org.apache.storm.testing.TestWordSpout;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.tuple.Fields;
-import org.apache.log4j.Logger;
 import org.apache.storm.starter.bolt.IntermediateRankingsBolt;
-import org.apache.storm.starter.bolt.RollingCountBolt;
 import org.apache.storm.starter.bolt.RollingCountAggBolt;
+import org.apache.storm.starter.bolt.RollingCountBolt;
 import org.apache.storm.starter.bolt.TotalRankingsBolt;
-import org.apache.storm.starter.util.StormRunner;
+import org.apache.storm.testing.TestWordSpout;
+import org.apache.storm.topology.ConfigurableTopology;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
- * This topology does a continuous computation of the top N words that the topology has seen
in terms of cardinality.
- * The top N computation is done in a completely scalable way, and a similar approach could
be used to compute things
- * like trending topics or trending images on Twitter. It takes an approach that assumes
that some works will be much
- * more common then other words, and uses partialKeyGrouping to better balance the skewed
load.
+ * This topology does a continuous computation of the top N words that the
+ * topology has seen in terms of cardinality. The top N computation is done in a
+ * completely scalable way, and a similar approach could be used to compute
+ * things like trending topics or trending images on Twitter. It takes an
+ * approach that assumes that some words will be much more common than other
+ * words, and uses partialKeyGrouping to better balance the skewed load.
  */
-public class SkewedRollingTopWords {
-  private static final Logger LOG = Logger.getLogger(SkewedRollingTopWords.class);
-  private static final int DEFAULT_RUNTIME_IN_SECONDS = 60;
-  private static final int TOP_N = 5;
-
-  private final TopologyBuilder builder;
-  private final String topologyName;
-  private final Config topologyConfig;
-  private final int runtimeInSeconds;
-
-  public SkewedRollingTopWords(String topologyName) throws InterruptedException {
-    builder = new TopologyBuilder();
-    this.topologyName = topologyName;
-    topologyConfig = createTopologyConfiguration();
-    runtimeInSeconds = DEFAULT_RUNTIME_IN_SECONDS;
-
-    wireTopology();
-  }
-
-  private static Config createTopologyConfiguration() {
-    Config conf = new Config();
-    conf.setDebug(true);
-    return conf;
-  }
+public class SkewedRollingTopWords extends ConfigurableTopology {
 
-  private void wireTopology() throws InterruptedException {
-    String spoutId = "wordGenerator";
-    String counterId = "counter";
-    String aggId = "aggregator";
-    String intermediateRankerId = "intermediateRanker";
-    String totalRankerId = "finalRanker";
-    builder.setSpout(spoutId, new TestWordSpout(), 5);
-    builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).partialKeyGrouping(spoutId,
new Fields("word"));
-    builder.setBolt(aggId, new RollingCountAggBolt(), 4).fieldsGrouping(counterId, new Fields("obj"));
-    builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(aggId,
new Fields("obj"));
-    builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
-  }
+  private static final Logger LOG = LoggerFactory.getLogger(SkewedRollingTopWords.class);
+  private static final int TOP_N = 5;
 
-  public void runLocally() throws InterruptedException {
-    StormRunner.runTopologyLocally(builder.createTopology(), topologyName, topologyConfig,
runtimeInSeconds);
+  private SkewedRollingTopWords() {
   }
 
-  public void runRemotely() throws Exception {
-    StormRunner.runTopologyRemotely(builder.createTopology(), topologyName, topologyConfig);
+  public static void main(String[] args) throws Exception {
+    ConfigurableTopology.start(new SkewedRollingTopWords(), args);
   }
 
   /**
    * Submits (runs) the topology.
    *
-   * Usage: "RollingTopWords [topology-name] [local|remote]"
+   * Usage: "SkewedRollingTopWords [topology-name] [-local]"
    *
-   * By default, the topology is run locally under the name "slidingWindowCounts".
+   * By default, the topology is submitted to a cluster under the name
+   * "slidingWindowCounts"; pass "-local" to run it in a LocalCluster.
    *
    * Examples:
    *
    * ```
    *
    * # Runs in local mode (LocalCluster), with topology name "slidingWindowCounts"
-   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.RollingTopWords
-   *
-   * # Runs in local mode (LocalCluster), with topology name "foobar"
-   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.RollingTopWords
foobar
+   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.SkewedRollingTopWords
-local
    *
    * # Runs in local mode (LocalCluster), with topology name "foobar"
-   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.RollingTopWords
foobar local
+   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.SkewedRollingTopWords
foobar -local
+   * 
+   * # Runs in local mode (LocalCluster) for 30 seconds, with topology name "foobar" 
+   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.SkewedRollingTopWords
foobar -local -ttl 30
    *
    * # Runs in remote/cluster mode, with topology name "production-topology"
-   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.RollingTopWords
production-topology remote
-   * ```
+   * $ storm jar storm-starter-jar-with-dependencies.jar org.apache.storm.starter.SkewedRollingTopWords
production-topology ```
    *
-   * @param args First positional argument (optional) is topology name, second positional
argument (optional) defines
-   *             whether to run the topology locally ("local") or remotely, i.e. on a real
cluster ("remote").
+   * @param args
+   *          First positional argument (optional) is the topology name. The
+   *          optional "-local" flag runs the topology locally; without it the
+   *          topology is submitted remotely, i.e. on a real cluster.
    * @throws Exception
    */
-  public static void main(String[] args) throws Exception {
+  protected int run(String[] args) {
     String topologyName = "slidingWindowCounts";
     if (args.length >= 1) {
       topologyName = args[0];
     }
-    boolean runLocally = true;
-    if (args.length >= 2 && args[1].equalsIgnoreCase("remote")) {
-      runLocally = false;
-    }
-
+    TopologyBuilder builder = new TopologyBuilder();
+    String spoutId = "wordGenerator";
+    String counterId = "counter";
+    String aggId = "aggregator";
+    String intermediateRankerId = "intermediateRanker";
+    String totalRankerId = "finalRanker";
+    builder.setSpout(spoutId, new TestWordSpout(), 5);
+    builder.setBolt(counterId, new RollingCountBolt(9, 3), 4).partialKeyGrouping(spoutId,
new Fields("word"));
+    builder.setBolt(aggId, new RollingCountAggBolt(), 4).fieldsGrouping(counterId, new Fields("obj"));
+    builder.setBolt(intermediateRankerId, new IntermediateRankingsBolt(TOP_N), 4).fieldsGrouping(aggId,
new Fields("obj"));
+    builder.setBolt(totalRankerId, new TotalRankingsBolt(TOP_N)).globalGrouping(intermediateRankerId);
     LOG.info("Topology name: " + topologyName);
-    SkewedRollingTopWords rtw = new SkewedRollingTopWords(topologyName);
-    if (runLocally) {
-      LOG.info("Running in local mode");
-      rtw.runLocally();
-    }
-    else {
-      LOG.info("Running in remote (cluster) mode");
-      rtw.runRemotely();
-    }
+
+    return submit(topologyName, conf, builder);
   }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/8685807b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
index c244fa1..576fcd9 100644
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
+++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java
@@ -17,12 +17,13 @@
  */
 package org.apache.storm.starter;
 
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
-import org.apache.storm.StormSubmitter;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.storm.starter.spout.RandomSentenceSpout;
 import org.apache.storm.task.ShellBolt;
 import org.apache.storm.topology.BasicOutputCollector;
+import org.apache.storm.topology.ConfigurableTopology;
 import org.apache.storm.topology.IRichBolt;
 import org.apache.storm.topology.OutputFieldsDeclarer;
 import org.apache.storm.topology.TopologyBuilder;
@@ -30,15 +31,12 @@ import org.apache.storm.topology.base.BaseBasicBolt;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Tuple;
 import org.apache.storm.tuple.Values;
-import org.apache.storm.starter.spout.RandomSentenceSpout;
-
-import java.util.HashMap;
-import java.util.Map;
 
 /**
- * This topology demonstrates Storm's stream groupings and multilang capabilities.
+ * This topology demonstrates Storm's stream groupings and multilang
+ * capabilities.
  */
-public class WordCountTopology {
+public class WordCountTopology extends ConfigurableTopology {
   public static class SplitSentence extends ShellBolt implements IRichBolt {
 
     public SplitSentence() {
@@ -77,6 +75,10 @@ public class WordCountTopology {
   }
 
   public static void main(String[] args) throws Exception {
+    ConfigurableTopology.start(new WordCountTopology(), args);
+  }
+
+  protected int run(String[] args) {
 
     TopologyBuilder builder = new TopologyBuilder();
 
@@ -85,21 +87,21 @@ public class WordCountTopology {
     builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
     builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
 
-    Config conf = new Config();
     conf.setDebug(true);
 
-    if (args != null && args.length > 0) {
-      conf.setNumWorkers(3);
+    String topologyName = "word-count";
 
-      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
-    }
-    else {
+    if (isLocal) {
       conf.setMaxTaskParallelism(3);
+      ttl = 10;
+    } else {
+      conf.setNumWorkers(3);
+    }
 
-      try (LocalCluster cluster = new LocalCluster();
-           LocalTopology topo = cluster.submitTopology("word-count", conf, builder.createTopology());)
{
-        Thread.sleep(10000);
-      }
+    if (args != null && args.length > 0) {
+      topologyName = args[0];
     }
+
+    return submit(topologyName, conf, builder);
   }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/8685807b/examples/storm-starter/src/jvm/org/apache/storm/starter/util/StormRunner.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/util/StormRunner.java
b/examples/storm-starter/src/jvm/org/apache/storm/starter/util/StormRunner.java
deleted file mode 100644
index 06ec4d7..0000000
--- a/examples/storm-starter/src/jvm/org/apache/storm/starter/util/StormRunner.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.starter.util;
-
-import org.apache.storm.Config;
-import org.apache.storm.LocalCluster;
-import org.apache.storm.LocalCluster.LocalTopology;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.AlreadyAliveException;
-import org.apache.storm.generated.AuthorizationException;
-import org.apache.storm.generated.InvalidTopologyException;
-import org.apache.storm.generated.StormTopology;
-
-public final class StormRunner {
-
-  private static final int MILLIS_IN_SEC = 1000;
-
-  private StormRunner() {
-  }
-
-  public static void runTopologyLocally(StormTopology topology, String topologyName, Config
conf, int runtimeInSeconds)
-      throws InterruptedException {
-    try (LocalCluster cluster = new LocalCluster();
-         LocalTopology topo = cluster.submitTopology(topologyName, conf, topology);) {
-      Thread.sleep((long) runtimeInSeconds * MILLIS_IN_SEC);
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-  }
-
-  public static void runTopologyRemotely(StormTopology topology, String topologyName, Config
conf)
-      throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
-    StormSubmitter.submitTopology(topologyName, conf, topology);
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/8685807b/storm-core/src/jvm/org/apache/storm/topology/ConfigurableTopology.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/topology/ConfigurableTopology.java b/storm-core/src/jvm/org/apache/storm/topology/ConfigurableTopology.java
new file mode 100644
index 0000000..41f0b64
--- /dev/null
+++ b/storm-core/src/jvm/org/apache/storm/topology/ConfigurableTopology.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.topology;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.LocalCluster.LocalTopology;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.utils.Utils;
+import org.yaml.snakeyaml.Yaml;
+
+/**
+ * Extensions of this class use command line arguments to determine whether the
+ * topology should run locally or remote, with a time to live and takes a
+ * reference to one or more configuration files. The main() method should call
+ * ConfigurableTopology.start() and it must instantiate a TopologyBuilder in the
+ * run() method.
+ * 
+ * <pre>
+ * {
+ *    public class MyTopology extends ConfigurableTopology {
+ *
+  *   public static void main(String[] args) throws Exception {
+  *       ConfigurableTopology.start(new MyTopology(), args);
+  *   }
+  *
+  *   &#64;Override
+  *   protected int run(String[] args) {
+  *       TopologyBuilder builder = new TopologyBuilder();
+  *
+  *       // build topology as usual
+  *    
+  *       return submit("crawl", conf, builder);
+  *   }
+ * }
+ * </pre>
+ **/
+public abstract class ConfigurableTopology {
+
+    protected Config conf = new Config();
+    protected boolean isLocal = false;
+    protected int ttl = -1;
+
+    public static void start(ConfigurableTopology topology, String args[]) {
+        String[] remainingArgs = topology.parse(args);
+        topology.run(remainingArgs);
+    }
+
+    protected Config getConf() {
+        return conf;
+    }
+
+    protected abstract int run(String args[]);
+
+    /** Submits the topology with the name taken from the configuration **/
+    protected int submit(Config conf, TopologyBuilder builder) {
+        String name = (String) Utils.get(conf, Config.TOPOLOGY_NAME, null);
+        if (StringUtils.isBlank(name))
+            throw new RuntimeException(
+                    "No value found for " + Config.TOPOLOGY_NAME);
+        return submit(name, conf, builder);
+    }
+
+    /** Submits the topology under a specific name **/
+    protected int submit(String name, Config conf, TopologyBuilder builder) {
+
+        if (isLocal) {
+            try (LocalCluster cluster = new LocalCluster();
+                    LocalTopology topo = cluster.submitTopology(name, conf,
+                            builder.createTopology());) {
+                if (ttl != -1) {
+                    Utils.sleep(ttl * 1000);
+                    cluster.shutdown();
+                }
+            } catch (Exception e) {
+                e.printStackTrace();
+                return -1;
+            }
+        }
+
+        else {
+            try {
+                StormSubmitter.submitTopology(name, conf,
+                        builder.createTopology());
+            } catch (Exception e) {
+                e.printStackTrace();
+                return -1;
+            }
+        }
+        return 0;
+    }
+
+    private String[] parse(String args[]) {
+
+        List<String> newArgs = new ArrayList<>();
+        Collections.addAll(newArgs, args);
+
+        Iterator<String> iter = newArgs.iterator();
+        while (iter.hasNext()) {
+            String param = iter.next();
+            if (param.equals("-conf")) {
+                if (!iter.hasNext()) {
+                    throw new RuntimeException("Conf file not specified");
+                }
+                iter.remove();
+                String resource = iter.next();
+                try {
+                    loadConf(resource, conf);
+                } catch (FileNotFoundException e) {
+                    throw new RuntimeException("File not found : " + resource);
+                }
+                iter.remove();
+            } else if (param.equals("-local")) {
+                isLocal = true;
+                iter.remove();
+            } else if (param.equals("-ttl")) {
+                if (!iter.hasNext()) {
+                    throw new RuntimeException("ttl value not specified");
+                }
+                iter.remove();
+                String ttlValue = iter.next();
+                try {
+                    ttl = Integer.parseInt(ttlValue);
+                } catch (NumberFormatException nfe) {
+                    throw new RuntimeException("ttl value incorrect");
+                }
+                iter.remove();
+            }
+        }
+
+        return newArgs.toArray(new String[newArgs.size()]);
+    }
+
+    public static Config loadConf(String resource, Config conf)
+            throws FileNotFoundException {
+        Yaml yaml = new Yaml();
+        Map ret = (Map) yaml.load(new InputStreamReader(
+                new FileInputStream(resource), Charset.defaultCharset()));
+        if (ret == null) {
+            ret = new HashMap();
+        }
+        // If the config consists of a single key 'config', its values are used
+        // instead. This means that the same config files can be used with Flux
+        // and the ConfigurableTopology.
+        else {
+            if (ret.size() == 1) {
+                Object confNode = ret.get("config");
+                if (confNode != null && confNode instanceof Map) {
+                    ret = (Map) ret;
+                }
+            }
+        }
+        conf.putAll(ret);
+        return conf;
+    }
+}


Mime
View raw message