storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [1/3] storm git commit: STORM-2320: DRPC client printer class reusable for local and remote DRPC
Date Wed, 08 Feb 2017 05:03:22 GMT
Repository: storm
Updated Branches:
  refs/heads/1.x-branch 090d396ba -> 9e06883b9


STORM-2320: DRPC client printer class reusable for local and remote DRPC

  - Client necessary to check for DRPC results while running in distributed mode


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7f63a73b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7f63a73b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7f63a73b

Branch: refs/heads/1.x-branch
Commit: 7f63a73b7718fc27165bd7b80ed625235a07cec6
Parents: 090d396
Author: Hugo Louro <hmclouro@gmail.com>
Authored: Fri Dec 23 17:09:06 2016 -0800
Committer: Hugo Louro <hmclouro@gmail.com>
Committed: Tue Feb 7 18:38:00 2017 -0800

----------------------------------------------------------------------
 .../TridentKafkaClientWordCountNamedTopics.java | 15 +++-
 .../storm/kafka/trident/DrpcResultsPrinter.java | 85 ++++++++++++++++++++
 .../trident/TridentKafkaConsumerTopology.java   | 13 ++-
 .../kafka/trident/TridentKafkaWordCount.java    |  6 +-
 4 files changed, 114 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/7f63a73b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
index 5861363..83d6884 100644
--- a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
+++ b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientWordCountNamedTopics.java
@@ -33,6 +33,12 @@ import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+
 import java.io.Serializable;
 import java.util.Arrays;
 import java.util.List;
@@ -59,7 +65,7 @@ public class TridentKafkaClientWordCountNamedTopics {
         public List<Object> apply(ConsumerRecord<String, String> record) {
             return new Values(record.value());
         }
-    }
+    };
 
     protected KafkaSpoutConfig<String,String> newKafkaSpoutConfig() {
         return KafkaSpoutConfig.builder(KAFKA_LOCAL_BROKER, TOPIC_1, TOPIC_2)
@@ -82,7 +88,7 @@ public class TridentKafkaClientWordCountNamedTopics {
         new TridentKafkaClientWordCountNamedTopics().run(args);
     }
 
-    protected void run(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+    protected void run(String[] args) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, InterruptedException {
         if (args.length > 0 && Arrays.binarySearch(args, "-h") >= 0) {
             System.out.printf("Usage: java %s [%s] [%s] [%s] [%s]\n", getClass().getName(),
                     "broker_host:broker_port", "topic1", "topic2", "topology_name");
@@ -101,6 +107,11 @@ public class TridentKafkaClientWordCountNamedTopics {
                 StormSubmitter.submitTopology(topic2 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, topic2));
                 // Consumer
                 StormSubmitter.submitTopology("topics-consumer", tpConf, TridentKafkaConsumerTopology.newTopology(newKafkaTridentSpoutOpaque()));
+
+                // Print results to console, which also causes the print filter in the consumer topology to print the results in the worker log
+                Thread.sleep(2000);
+                DrpcResultsPrinter.remoteClient().printResults(60, 1, TimeUnit.SECONDS);
+
             } else { //Submit Local
 
                 final LocalSubmitter localSubmitter = LocalSubmitter.newInstance();

http://git-wip-us.apache.org/repos/asf/storm/blob/7f63a73b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/DrpcResultsPrinter.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/DrpcResultsPrinter.java b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/DrpcResultsPrinter.java
new file mode 100644
index 0000000..f71e2df
--- /dev/null
+++ b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/DrpcResultsPrinter.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.trident;
+
+import org.apache.storm.LocalDRPC;
+import org.apache.storm.generated.DistributedRPC;
+import org.apache.storm.thrift.transport.TTransportException;
+import org.apache.storm.utils.DRPCClient;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class DrpcResultsPrinter {
+    private static final Logger LOG = LoggerFactory.getLogger(DrpcResultsPrinter.class);
+
+    private final DistributedRPC.Iface drpcClient;
+
+    public DrpcResultsPrinter(DistributedRPC.Iface drpcClient) {
+        this.drpcClient = drpcClient;
+    }
+
+    /**
+     * @return local DRPC client running on the same JVM
+     */
+    public static DrpcResultsPrinter localClient() {
+        return new DrpcResultsPrinter(new LocalDRPC());
+    }
+
+    /**
+     * @return remote DRPC client running on local host, on port 3772, with defaults.yaml config
+     */
+    public static DrpcResultsPrinter remoteClient() {
+        return remoteClient(Utils.readDefaultConfig(), "localhost", 3772);
+    }
+
+    /**
+     * @return remote DRPC client running on the specified host, port, with the provided config
+     */
+    public static DrpcResultsPrinter remoteClient(Map<String, Object> config, String host, int port) {
+        try {
+            return new DrpcResultsPrinter(new DRPCClient(config, host,port));
+        } catch (TTransportException e) {
+            throw new RuntimeException(String.format("DRPC Client failed to connect to DRPC server. " +
+                    "[host = %s], [port = %s], [config = %s]", host, port, config));
+        }
+    }
+
+    /**
+     * Prints the DRPC results for the number of times specified, sleeping the specified time in between prints
+     */
+    public void printResults(int num, int sleepTime, TimeUnit sleepUnit) {
+        for (int i = 0; i < num; i++) {
+            try {
+                LOG.info("--- DRPC RESULT: " + drpcClient.execute("words", "the and apple snow jumped"));
+                System.out.println();
+                Thread.sleep(sleepUnit.toMillis(sleepTime));
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    public static void main(String[] args) {
+        DrpcResultsPrinter.remoteClient().printResults(60, 1, TimeUnit.SECONDS);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/7f63a73b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java
index 1e7914e..a39eba1 100644
--- a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java
+++ b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaConsumerTopology.java
@@ -24,6 +24,7 @@ import org.apache.storm.starter.trident.DebugMemoryMapState;
 import org.apache.storm.trident.Stream;
 import org.apache.storm.trident.TridentState;
 import org.apache.storm.trident.TridentTopology;
+import org.apache.storm.trident.operation.BaseFilter;
 import org.apache.storm.trident.operation.builtin.Count;
 import org.apache.storm.trident.operation.builtin.Debug;
 import org.apache.storm.trident.operation.builtin.FilterNull;
@@ -31,6 +32,7 @@ import org.apache.storm.trident.operation.builtin.MapGet;
 import org.apache.storm.trident.spout.ITridentDataSource;
 import org.apache.storm.trident.testing.MemoryMapState;
 import org.apache.storm.trident.testing.Split;
+import org.apache.storm.trident.tuple.TridentTuple;
 import org.apache.storm.tuple.Fields;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -56,13 +58,20 @@ public class TridentKafkaConsumerTopology {
         return tridentTopology.build();
     }
 
-    private static Stream addDRPCStream(TridentTopology tridentTopology, TridentState state, LocalDRPC drpc) {
+    private static Stream addDRPCStream(TridentTopology tridentTopology, final TridentState state, LocalDRPC drpc) {
         return tridentTopology.newDRPCStream("words", drpc)
                 .each(new Fields("args"), new Split(), new Fields("word"))
                 .groupBy(new Fields("word"))
                 .stateQuery(state, new Fields("word"), new MapGet(), new Fields("count"))
                 .each(new Fields("count"), new FilterNull())
-                .project(new Fields("word", "count"));
+                .project(new Fields("word", "count"))
+                .filter(new BaseFilter() {
+                    @Override
+                    public boolean isKeep(TridentTuple tuple) {
+                        LOG.debug("DRPC RESULT: " + tuple);  // Used to show the DRPC results in the worker log. Useful for debugging
+                        return true;
+                    }
+                });
     }
 
     private static TridentState addTridentState(TridentTopology tridentTopology, ITridentDataSource tridentSpout) {

http://git-wip-us.apache.org/repos/asf/storm/blob/7f63a73b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaWordCount.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaWordCount.java b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaWordCount.java
index d1de367..84dc380 100644
--- a/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaWordCount.java
+++ b/examples/storm-kafka-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaWordCount.java
@@ -85,6 +85,10 @@ public class TridentKafkaWordCount implements Serializable {
             // Consumer
             StormSubmitter.submitTopology(args[2] + "-consumer", tpConf, TridentKafkaConsumerTopology.newTopology(
                     new TransactionalTridentKafkaSpout(newTridentKafkaConfig(zkBrokerUrl[0]))));
+
+            // Print results to console, which also causes the print filter in the consumer topology to print the results in the worker log
+            Thread.sleep(2000);
+            DrpcResultsPrinter.remoteClient().printResults(60, 1, TimeUnit.SECONDS);
         } else { //Submit Local
             final LocalSubmitter localSubmitter = LocalSubmitter.newInstance();
             final String prodTpName = "kafkaBolt";
@@ -98,7 +102,7 @@ public class TridentKafkaWordCount implements Serializable {
                         new TransactionalTridentKafkaSpout(newTridentKafkaConfig(zkBrokerUrl[0]))));
 
                 // print
-                localSubmitter.printResults(60, 1, TimeUnit.SECONDS);
+                new DrpcResultsPrinter(localSubmitter.getDrpc()).printResults(60, 1, TimeUnit.SECONDS);
             } finally {
                 // kill
                 localSubmitter.kill(prodTpName);


Mime
View raw message