storm-commits mailing list archives

From kabh...@apache.org
Subject [1/3] storm git commit: STORM-2391 Moving HdfsSpoutTopology into storm-hdfs-examples module
Date Sat, 04 Mar 2017 02:57:37 GMT
Repository: storm
Updated Branches:
  refs/heads/master eb0e0edfe -> b7fe86077


STORM-2391 Moving HdfsSpoutTopology into storm-hdfs-examples module


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/62f14b89
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/62f14b89
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/62f14b89

Branch: refs/heads/master
Commit: 62f14b89630bf498cc77808524b52daa101a14bb
Parents: eb0e0ed
Author: Roshan Naik <roshan@HW13642.local>
Authored: Wed Mar 1 17:56:23 2017 -0800
Committer: Jungtaek Lim <kabhwan@gmail.com>
Committed: Sat Mar 4 11:45:13 2017 +0900

----------------------------------------------------------------------
 .../storm/hdfs/spout/HdfsSpoutTopology.java     | 192 +++++++++++++++++++
 .../jvm/storm/starter/HdfsSpoutTopology.java    | 143 --------------
 2 files changed, 192 insertions(+), 143 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/62f14b89/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/spout/HdfsSpoutTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/spout/HdfsSpoutTopology.java
b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/spout/HdfsSpoutTopology.java
new file mode 100644
index 0000000..b787c54
--- /dev/null
+++ b/examples/storm-hdfs-examples/src/main/java/org/apache/storm/hdfs/spout/HdfsSpoutTopology.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.hdfs.spout;
+
+import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.KillOptions;
+import org.apache.storm.generated.Nimbus;
+import org.apache.storm.generated.TopologyInfo;
+import org.apache.storm.generated.TopologySummary;
+import org.apache.storm.generated.ExecutorSummary;
+import org.apache.storm.generated.SpoutStats;
+import org.apache.storm.generated.ClusterSummary;
+import org.apache.storm.metric.LoggingMetricsConsumer;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.utils.NimbusClient;
+import org.apache.storm.utils.Utils;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+
+public class HdfsSpoutTopology {
+
+  public static final String SPOUT_ID = "hdfsspout";
+  public static final String BOLT_ID = "constbolt";
+
+
+  public static class ConstBolt extends BaseRichBolt {
+    private static final long serialVersionUID = -5313598399155365865L;
+    public static final String FIELDS = "message";
+    private OutputCollector collector;
+    private static final Logger log = LoggerFactory.getLogger(ConstBolt.class);
+    int count =0;
+
+    public ConstBolt() {
+    }
+
+    @Override
+    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+      this.collector = collector;
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+      log.info("Received tuple : {}", tuple.getValue(0));
+      count++;
+      if(count==3) {
+        collector.fail(tuple);
+      }
+      else {
+        collector.ack(tuple);
+      }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields(FIELDS));
+    }
+  } // class
+
+  /** Reads text files from sourceDir using HdfsSpout and logs each record. Moves source files into archiveDir after it is done consuming them. */
+  public static void main(String[] args) throws Exception {
+    // 0 - validate args
+    if (args.length < 7) {
+      System.err.println("Please check command line arguments.");
+      System.err.println("Usage :");
+      System.err.println(HdfsSpoutTopology.class.toString() + " topologyName hdfsUri fileFormat sourceDir archiveDir badDir spoutCount");
+      System.err.println(" topologyName - topology name.");
+      System.err.println(" hdfsUri - hdfs name node URI");
+      System.err.println(" fileFormat -  Set to 'TEXT' for reading text files or 'SEQ' for
sequence files.");
+      System.err.println(" sourceDir  - read files from this HDFS dir using HdfsSpout.");
+      System.err.println(" archiveDir - after a file in sourceDir is read completely, it
is moved to this HDFS location.");
+      System.err.println(" badDir - files that cannot be read properly will be moved to this
HDFS location.");
+      System.err.println(" spoutCount - Num of spout instances.");
+      System.err.println();
+      System.exit(-1);
+    }
+
+    // 1 - parse cmd line args
+    String topologyName = args[0];
+    String hdfsUri = args[1];
+    String fileFormat = args[2];
+    String sourceDir = args[3];
+    String archiveDir = args[4];
+    String badDir = args[5];
+    int spoutNum = Integer.parseInt(args[6]);
+
+    // 2 - create and configure spout and bolt
+    ConstBolt bolt = new ConstBolt();
+
+    HdfsSpout spout =  new HdfsSpout().withOutputFields(TextFileReader.defaultFields)
+                                      .setReaderType(fileFormat)
+                                      .setHdfsUri(hdfsUri)
+                                      .setSourceDir(sourceDir)
+                                      .setArchiveDir(archiveDir)
+                                      .setBadFilesDir(badDir);
+
+    // 3 - Create and configure topology
+    Config conf = new Config();
+    conf.setNumWorkers(1);
+    conf.setNumAckers(1);
+    conf.setMaxTaskParallelism(1);
+    conf.setDebug(true);
+    conf.registerMetricsConsumer(LoggingMetricsConsumer.class);
+
+    TopologyBuilder builder = new TopologyBuilder();
+    builder.setSpout(SPOUT_ID, spout, spoutNum);
+    builder.setBolt(BOLT_ID, bolt, 1).shuffleGrouping(SPOUT_ID);
+
+    // 4 - submit topology, wait for a few min and terminate it
+    Map clusterConf = Utils.readStormConfig();
+    StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());
+    Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient();
+
+    // 5 - Print metrics every 30 sec, kill topology after 20 min
+    for (int i = 0; i < 40; i++) {
+      Thread.sleep(30 * 1000);
+      printMetrics(client, topologyName);
+    }
+    kill(client, topologyName);
+  } // main
+
+  private static void kill(Nimbus.Client client, String topologyName) throws Exception {
+    KillOptions opts = new KillOptions();
+    opts.set_wait_secs(0);
+    client.killTopologyWithOpts(topologyName, opts);
+  }
+
+  static void printMetrics(Nimbus.Client client, String name) throws Exception {
+    ClusterSummary summary = client.getClusterInfo();
+    String id = null;
+    for (TopologySummary ts: summary.get_topologies()) {
+      if (name.equals(ts.get_name())) {
+        id = ts.get_id();
+      }
+    }
+    if (id == null) {
+      throw new Exception("Could not find a topology named "+name);
+    }
+    TopologyInfo info = client.getTopologyInfo(id);
+    int uptime = info.get_uptime_secs();
+    long acked = 0;
+    long failed = 0;
+    double weightedAvgTotal = 0.0;
+    for (ExecutorSummary exec: info.get_executors()) {
+      if ("spout".equals(exec.get_component_id())) {
+        SpoutStats stats = exec.get_stats().get_specific().get_spout();
+        Map<String, Long> failedMap = stats.get_failed().get(":all-time");
+        Map<String, Long> ackedMap = stats.get_acked().get(":all-time");
+        Map<String, Double> avgLatMap = stats.get_complete_ms_avg().get(":all-time");
+        for (String key: ackedMap.keySet()) {
+          if (failedMap != null) {
+            Long tmp = failedMap.get(key);
+            if (tmp != null) {
+              failed += tmp;
+            }
+          }
+          long ackVal = ackedMap.get(key);
+          double latVal = avgLatMap.get(key) * ackVal;
+          acked += ackVal;
+          weightedAvgTotal += latVal;
+        }
+      }
+    }
+    double avgLatency = weightedAvgTotal/acked;
+    System.out.println("uptime: "+uptime+" acked: "+acked+" avgLatency: "+avgLatency+" acked/sec:
"+(((double)acked)/uptime+" failed: "+failed));
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/62f14b89/examples/storm-starter/src/jvm/storm/starter/HdfsSpoutTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/HdfsSpoutTopology.java b/examples/storm-starter/src/jvm/storm/starter/HdfsSpoutTopology.java
deleted file mode 100644
index 6c2fa68..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/HdfsSpoutTopology.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package storm.starter;
-
-import org.apache.storm.Config;
-import org.apache.storm.StormSubmitter;
-import org.apache.storm.generated.Nimbus;
-import org.apache.storm.hdfs.spout.TextFileReader;
-import org.apache.storm.metric.LoggingMetricsConsumer;
-import org.apache.storm.starter.FastWordCountTopology;
-import org.apache.storm.topology.TopologyBuilder;
-import org.apache.storm.utils.NimbusClient;
-import org.apache.storm.utils.Utils;
-import org.apache.storm.hdfs.spout.Configs;
-import org.apache.storm.hdfs.spout.HdfsSpout;
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.topology.*;
-import org.apache.storm.tuple.*;
-import org.apache.storm.task.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-
-public class HdfsSpoutTopology {
-
-  public static final String SPOUT_ID = "hdfsspout";
-  public static final String BOLT_ID = "constbolt";
-
-
-  public static class ConstBolt extends BaseRichBolt {
-    private static final long serialVersionUID = -5313598399155365865L;
-    public static final String FIELDS = "message";
-    private OutputCollector collector;
-    private static final Logger log = LoggerFactory.getLogger(ConstBolt.class);
-    int count =0;
-
-    public ConstBolt() {
-    }
-
-    @Override
-    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
-      this.collector = collector;
-    }
-
-    @Override
-    public void execute(Tuple tuple) {
-      log.info("Received tuple : {}", tuple.getValue(0));
-      count++;
-      if(count==3) {
-        collector.fail(tuple);
-      }
-      else {
-        collector.ack(tuple);
-      }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-      declarer.declare(new Fields(FIELDS));
-    }
-  } // class
-
-  /** Copies text file content from sourceDir to destinationDir. Moves source files into sourceDir after its done consuming */
-  public static void main(String[] args) throws Exception {
-    // 0 - validate args
-    if (args.length < 7) {
-      System.err.println("Please check command line arguments.");
-      System.err.println("Usage :");
-      System.err.println(HdfsSpoutTopology.class.toString() + " topologyName hdfsUri fileFormat sourceDir sourceArchiveDir badDir destinationDir.");
-      System.err.println(" topologyName - topology name.");
-      System.err.println(" hdfsUri - hdfs name node URI");
-      System.err.println(" fileFormat -  Set to 'TEXT' for reading text files or 'SEQ' for
sequence files.");
-      System.err.println(" sourceDir  - read files from this HDFS dir using HdfsSpout.");
-      System.err.println(" archiveDir - after a file in sourceDir is read completely, it
is moved to this HDFS location.");
-      System.err.println(" badDir - files that cannot be read properly will be moved to this
HDFS location.");
-      System.err.println(" spoutCount - Num of spout instances.");
-      System.err.println();
-      System.exit(-1);
-    }
-
-    // 1 - parse cmd line args
-    String topologyName = args[0];
-    String hdfsUri = args[1];
-    String fileFormat = args[2];
-    String sourceDir = args[3];
-    String archiveDir = args[4];
-    String badDir = args[5];
-    int spoutNum = Integer.parseInt(args[6]);
-
-    // 2 - create and configure spout and bolt
-    ConstBolt bolt = new ConstBolt();
-
-    HdfsSpout spout =  new HdfsSpout().withOutputFields(TextFileReader.defaultFields)
-                                      .setReaderType(fileFormat)
-                                      .setHdfsUri(hdfsUri)
-                                      .setSourceDir(sourceDir)
-                                      .setArchiveDir(archiveDir)
-                                      .setBadFilesDir(badDir);
-
-    // 3 - Create and configure topology
-    Config conf = new Config();
-    conf.setNumWorkers(1);
-    conf.setNumAckers(1);
-    conf.setMaxTaskParallelism(1);
-    conf.setDebug(true);
-    conf.registerMetricsConsumer(LoggingMetricsConsumer.class);
-
-    TopologyBuilder builder = new TopologyBuilder();
-    builder.setSpout(SPOUT_ID, spout, spoutNum);
-    builder.setBolt(BOLT_ID, bolt, 1).shuffleGrouping(SPOUT_ID);
-
-    // 4 - submit topology, wait for a few min and terminate it
-    Map clusterConf = Utils.readStormConfig();
-    StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());
-    Nimbus.Client client = NimbusClient.getConfiguredClient(clusterConf).getClient();
-
-    // 5 - Print metrics every 30 sec, kill topology after 20 min
-    for (int i = 0; i < 40; i++) {
-      Thread.sleep(30 * 1000);
-      FastWordCountTopology.printMetrics(client, topologyName);
-    }
-    FastWordCountTopology.kill(client, topologyName);
-  } // main
-
-}
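
For reference, a minimal local-mode sketch of running the relocated example without a real Storm cluster. This harness is not part of the commit: the class name HdfsSpoutLocalRunner, the HDFS URI, and the directory paths are illustrative placeholders; only HdfsSpout, TextFileReader, ConstBolt, and LocalCluster come from Storm itself.

package org.apache.storm.hdfs.spout;

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.topology.TopologyBuilder;

// Hypothetical local harness. On a real cluster the example would instead be submitted with
// something like: storm jar storm-hdfs-examples-<version>.jar \
//     org.apache.storm.hdfs.spout.HdfsSpoutTopology <topologyName> <hdfsUri> TEXT \
//     <sourceDir> <archiveDir> <badDir> <spoutCount>
public class HdfsSpoutLocalRunner {
  public static void main(String[] args) throws Exception {
    // Same spout wiring as HdfsSpoutTopology.main(); the URI and directories are placeholders.
    HdfsSpout spout = new HdfsSpout().withOutputFields(TextFileReader.defaultFields)
                                     .setReaderType("TEXT")                // 'TEXT' or 'SEQ'
                                     .setHdfsUri("hdfs://localhost:8020")  // placeholder NameNode URI
                                     .setSourceDir("/tmp/hdfsspout/source")
                                     .setArchiveDir("/tmp/hdfsspout/archive")
                                     .setBadFilesDir("/tmp/hdfsspout/bad");

    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout(HdfsSpoutTopology.SPOUT_ID, spout, 1);
    builder.setBolt(HdfsSpoutTopology.BOLT_ID, new HdfsSpoutTopology.ConstBolt(), 1)
           .shuffleGrouping(HdfsSpoutTopology.SPOUT_ID);

    Config conf = new Config();
    conf.setDebug(true);

    // In-process cluster: run for a minute, then tear down.
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("hdfs-spout-demo", conf, builder.createTopology());
    Thread.sleep(60 * 1000);
    cluster.killTopology("hdfs-spout-demo");
    cluster.shutdown();
  }
}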

