storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [1/3] storm git commit: [STORM-533] Added in client and server IConnection metrics.
Date Mon, 17 Nov 2014 17:19:31 GMT
Repository: storm
Updated Branches:
  refs/heads/master 2dd7a9426 -> b8fceb875


[STORM-533] Added in client and server IConnection metrics.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/29a8e2e7
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/29a8e2e7
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/29a8e2e7

Branch: refs/heads/master
Commit: 29a8e2e75fe9486151c5aa5dd8602e2817eab20a
Parents: c2fefbc
Author: Robert (Bobby) Evans <evans@yahoo-inc.com>
Authored: Thu Oct 16 17:54:35 2014 +0000
Committer: Robert (Bobby) Evans <evans@yahoo-inc.com>
Committed: Fri Nov 14 15:47:37 2014 -0600

----------------------------------------------------------------------
 .../backtype/storm/daemon/builtin_metrics.clj   | 21 ++++-
 .../src/clj/backtype/storm/daemon/executor.clj  | 11 ++-
 .../src/clj/backtype/storm/daemon/worker.clj    | 12 ++-
 .../src/clj/backtype/storm/messaging/loader.clj | 13 ++-
 .../backtype/storm/messaging/netty/Client.java  | 40 ++++++++-
 .../backtype/storm/messaging/netty/Server.java  | 63 ++++++++++++-
 .../messaging/netty/StormServerHandler.java     |  2 +-
 .../testing/ForwardingMetricsConsumer.java      | 95 ++++++++++++++++++++
 8 files changed, 233 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/29a8e2e7/storm-core/src/clj/backtype/storm/daemon/builtin_metrics.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/builtin_metrics.clj b/storm-core/src/clj/backtype/storm/daemon/builtin_metrics.clj
index b911642..23bde46 100644
--- a/storm-core/src/clj/backtype/storm/daemon/builtin_metrics.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/builtin_metrics.clj
@@ -14,7 +14,7 @@
 ;; See the License for the specific language governing permissions and
 ;; limitations under the License.
 (ns backtype.storm.daemon.builtin-metrics
-  (:import [backtype.storm.metric.api MultiCountMetric MultiReducedMetric MeanReducer StateMetric])
+  (:import [backtype.storm.metric.api MultiCountMetric MultiReducedMetric MeanReducer StateMetric
IMetric IStatefulObject])
   (:import [backtype.storm Config])
   (:use [backtype.storm.stats :only [stats-rate]]))
 
@@ -50,7 +50,24 @@
   (doseq [[kw imetric] builtin-metrics]
     (.registerMetric topology-context (str "__" (name kw)) imetric
                      (int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)))))
-          
+
+(defn register-iconnection-server-metric [server storm-conf topology-context]
+  (if (instance? IStatefulObject server)
+    (.registerMetric topology-context "__recv-iconnection" (StateMetric. server)
+                     (int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)))))
+
+(defn register-iconnection-client-metrics [node+port->socket-ref storm-conf topology-context]
+  (.registerMetric topology-context "__send-iconnection"
+    (reify IMetric
+      (^Object getValueAndReset [this]
+        (into {}
+          (map
+            (fn [[node+port ^IStatefulObject connection]] [node+port (.getState connection)])
+            (filter 
+              (fn [[node+port connection]] (instance? IStatefulObject connection))
+              @node+port->socket-ref)))))
+    (int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS))))
+ 
 (defn register-queue-metrics [queues storm-conf topology-context]
   (doseq [[qname q] queues]
     (.registerMetric topology-context (str "__" (name qname)) (StateMetric. q)

http://git-wip-us.apache.org/repos/asf/storm/blob/29a8e2e7/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index 6ef0e75..be364cc 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -721,10 +721,13 @@
           (builtin-metrics/register-all (:builtin-metrics task-data) storm-conf user-context)
           (when (instance? ICredentialsListener bolt-obj) (.setCredentials bolt-obj initial-credentials))

           (if (= component-id Constants/SYSTEM_COMPONENT_ID)
-            (builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data)
-                                                     :receive (:receive-queue executor-data)
-                                                     :transfer (:transfer-queue (:worker
executor-data))}
-                                                    storm-conf user-context)
+            (do
+              (builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue
executor-data)
+                                                       :receive (:receive-queue executor-data)
+                                                       :transfer (:transfer-queue (:worker
executor-data))}
+                                                      storm-conf user-context)
+              (builtin-metrics/register-iconnection-client-metrics (:cached-node+port->socket
(:worker executor-data)) storm-conf user-context)
+              (builtin-metrics/register-iconnection-server-metric (:receiver (:worker executor-data))
storm-conf user-context))
             (builtin-metrics/register-queue-metrics {:sendqueue (:batch-transfer-queue executor-data)
                                                      :receive (:receive-queue executor-data)}
                                                     storm-conf user-context)

http://git-wip-us.apache.org/repos/asf/storm/blob/29a8e2e7/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj
index 06b5967..e717ce4 100644
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -202,12 +202,15 @@
                                (mapcat (fn [[e queue]] (for [t (executor-id->tasks e)]
[t queue])))
                                (into {}))
 
-        topology (read-supervisor-topology conf storm-id)]
+        topology (read-supervisor-topology conf storm-id)
+        mq-context  (if mq-context
+                      mq-context
+                      (TransportFactory/makeContext storm-conf))]
+
     (recursive-map
       :conf conf
-      :mq-context (if mq-context
-                      mq-context
-                      (TransportFactory/makeContext storm-conf))
+      :mq-context mq-context
+      :receiver (.bind ^IContext mq-context storm-id port)
       :storm-id storm-id
       :assignment-id assignment-id
       :port port
@@ -348,6 +351,7 @@
   (log-message "Launching receive-thread for " (:assignment-id worker) ":" (:port worker))
   (msg-loader/launch-receive-thread!
     (:mq-context worker)
+    (:receiver worker)
     (:storm-id worker)
     (:receiver-thread-count worker)
     (:port worker)

http://git-wip-us.apache.org/repos/asf/storm/blob/29a8e2e7/storm-core/src/clj/backtype/storm/messaging/loader.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/messaging/loader.clj b/storm-core/src/clj/backtype/storm/messaging/loader.clj
index e13b5a8..e0ffeea 100644
--- a/storm-core/src/clj/backtype/storm/messaging/loader.clj
+++ b/storm-core/src/clj/backtype/storm/messaging/loader.clj
@@ -24,7 +24,7 @@
 (defn mk-local-context []
   (local/mk-context))
 
-(defn- mk-receive-thread [context storm-id port transfer-local-fn  daemon kill-fn priority
socket max-buffer-size thread-id]
+(defn- mk-receive-thread [storm-id port transfer-local-fn  daemon kill-fn priority socket
max-buffer-size thread-id]
     (async-loop
        (fn []
          (log-message "Starting receive-thread: [stormId: " storm-id ", port: " port ", thread-id:
" thread-id  " ]")
@@ -54,20 +54,19 @@
          :priority priority
          :thread-name (str "worker-receiver-thread-" thread-id)))
 
-(defn- mk-receive-threads [context storm-id port transfer-local-fn  daemon kill-fn priority
socket max-buffer-size thread-count]
+(defn- mk-receive-threads [storm-id port transfer-local-fn  daemon kill-fn priority socket
max-buffer-size thread-count]
   (into [] (for [thread-id (range thread-count)] 
-             (mk-receive-thread context storm-id port transfer-local-fn  daemon kill-fn priority
socket max-buffer-size thread-id))))
+             (mk-receive-thread storm-id port transfer-local-fn  daemon kill-fn priority
socket max-buffer-size thread-id))))
 
 
 (defnk launch-receive-thread!
-  [context storm-id receiver-thread-count port transfer-local-fn max-buffer-size
+  [context socket storm-id receiver-thread-count port transfer-local-fn max-buffer-size
    :daemon true
    :kill-fn (fn [t] (System/exit 1))
    :priority Thread/NORM_PRIORITY]
   (let [max-buffer-size (int max-buffer-size)
-        socket (.bind ^IContext context storm-id port)
         thread-count (if receiver-thread-count receiver-thread-count 1)
-        vthreads (mk-receive-threads context storm-id port transfer-local-fn daemon kill-fn
priority socket max-buffer-size thread-count)]
+        vthreads (mk-receive-threads storm-id port transfer-local-fn daemon kill-fn priority
socket max-buffer-size thread-count)]
     (fn []
       (let [kill-socket (.connect ^IContext context storm-id "localhost" port)]
         (log-message "Shutting down receiving-thread: [" storm-id ", " port "]")
@@ -82,4 +81,4 @@
              (.join (vthreads thread-id)))
         
         (log-message "Shutdown receiving-thread: [" storm-id ", " port "]")
-        ))))
\ No newline at end of file
+        ))))

http://git-wip-us.apache.org/repos/asf/storm/blob/29a8e2e7/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 9a25bf7..d770481 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -18,14 +18,19 @@
 package backtype.storm.messaging.netty;
 
 import java.net.InetSocketAddress;
+import java.net.SocketAddress;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.HashMap;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.Random;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.jboss.netty.bootstrap.ClientBootstrap;
@@ -37,12 +42,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import backtype.storm.Config;
+import backtype.storm.metric.api.IStatefulObject;
 import backtype.storm.messaging.IConnection;
 import backtype.storm.messaging.TaskMessage;
 import backtype.storm.utils.StormBoundedExponentialBackoffRetry;
 import backtype.storm.utils.Utils;
 
-public class Client implements IConnection {
+public class Client implements IConnection, IStatefulObject{
     private static final Logger LOG = LoggerFactory.getLogger(Client.class);
     private static final String PREFIX = "Netty-Client-";
     private final int max_retries;
@@ -53,6 +59,9 @@ public class Client implements IConnection {
     private final ClientBootstrap bootstrap;
     private InetSocketAddress remote_addr;
     
+    private AtomicInteger totalReconnects;
+    private AtomicInteger messagesSent;
+    private AtomicInteger messagesLostReconnect;
     private final Random random = new Random();
     private final ChannelFactory factory;
     private final int buffer_size;
@@ -79,6 +88,9 @@ public class Client implements IConnection {
         closing = false;
         pendings = new AtomicLong(0);
         flushCheckTimer = new AtomicLong(Long.MAX_VALUE);
+        totalReconnects = new AtomicInteger(0);
+        messagesSent = new AtomicInteger(0);
+        messagesLostReconnect = new AtomicInteger(0);
 
         // Configure
         buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
@@ -154,6 +166,7 @@ public class Client implements IConnection {
                 LOG.info("Reconnect started for {}... [{}]", name(), tried);
                 LOG.debug("connection started...");
 
+                totalReconnects.getAndIncrement();
                 ChannelFuture future = bootstrap.connect(remote_addr);
                 future.awaitUninterruptibly();
                 Channel current = future.getChannel();
@@ -324,13 +337,13 @@ public class Client implements IConnection {
         if (requests == null)
             return;
 
-        pendings.incrementAndGet();
+        pendings.getAndAdd(requests.size());
         ChannelFuture future = channel.write(requests);
         future.addListener(new ChannelFutureListener() {
             public void operationComplete(ChannelFuture future)
                     throws Exception {
 
-                pendings.decrementAndGet();
+                pendings.getAndAdd(0-requests.size());
                 if (!future.isSuccess()) {
                     LOG.info(
                             "failed to send requests to " + remote_addr.toString() + ": ",
future.getCause());
@@ -341,11 +354,32 @@ public class Client implements IConnection {
                         channel.close();
                         channelRef.compareAndSet(channel, null);
                     }
+                    messagesLostReconnect.getAndAdd(requests.size());
                 } else {
+                    messagesSent.getAndAdd(requests.size());
                     LOG.debug("{} request(s) sent", requests.size());
                 }
             }
         });
     }
+
+    @Override
+    public Object getState() {
+        LOG.info("Getting metrics for connection to "+remote_addr);
+        HashMap<String, Object> ret = new HashMap<String, Object>();
+        ret.put("reconnects", totalReconnects.getAndSet(0));
+        ret.put("sent", messagesSent.getAndSet(0));
+        ret.put("pending", pendings.get());
+        ret.put("lostOnSend", messagesLostReconnect.getAndSet(0));
+        ret.put("dest", remote_addr.toString());
+        Channel c = channelRef.get();
+        if (c != null) {
+            SocketAddress address = c.getLocalAddress();
+            if (address != null) {
+              ret.put("src", address.toString());
+            }
+        }
+        return ret;
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/29a8e2e7/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
index 4e106f0..4563042 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
@@ -24,6 +24,8 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
@@ -40,13 +42,18 @@ import org.slf4j.LoggerFactory;
 import backtype.storm.Config;
 import backtype.storm.messaging.IConnection;
 import backtype.storm.messaging.TaskMessage;
+import backtype.storm.metric.api.IStatefulObject;
 import backtype.storm.utils.Utils;
 
-class Server implements IConnection {
+class Server implements IConnection, IStatefulObject {
     private static final Logger LOG = LoggerFactory.getLogger(Server.class);
     @SuppressWarnings("rawtypes")
     Map storm_conf;
     int port;
+    private final ConcurrentHashMap<String, AtomicInteger> messagesEnqueued = new ConcurrentHashMap<String,
AtomicInteger>();
+    private final AtomicInteger messagesDequeued = new AtomicInteger(0);
+    private final AtomicInteger[] pendingMessages;
+    
     
     // Create multiple queues for incoming messages. The size equals the number of receiver
threads.
     // For message which is sent to same task, it will be stored in the same queue to preserve
the message order.
@@ -74,8 +81,10 @@ class Server implements IConnection {
         taskToQueueId = new HashMap<Integer, Integer>();
     
         message_queue = new LinkedBlockingQueue[queueCount];
+        pendingMessages = new AtomicInteger[queueCount];
         for (int i = 0; i < queueCount; i++) {
             message_queue[i] = new LinkedBlockingQueue<ArrayList<TaskMessage>>();
+            pendingMessages[i] = new AtomicInteger(0);
         }
         
         // Configure the server.
@@ -150,17 +159,36 @@ class Server implements IConnection {
       return queueId;
     }
 
+    private void addReceiveCount(String from, int amount) {
+        //This is possibly lossy in the case where a value is deleted
+        // because it has received no messages over the metrics collection
+        // period and new messages are starting to come in.  This is
+        // because I don't want the overhead of a synchronize just to have
+        // the metric be absolutely perfect.
+        AtomicInteger i = messagesEnqueued.get(from);
+        if (i == null) {
+            i = new AtomicInteger(amount);
+            AtomicInteger prev = messagesEnqueued.putIfAbsent(from, i);
+            if (prev != null) {
+                prev.addAndGet(amount);
+            }
+        } else {
+            i.addAndGet(amount);
+        }
+    }
+
+
     /**
      * enqueue a received message 
      * @param message
      * @throws InterruptedException
      */
-    protected void enqueue(List<TaskMessage> msgs) throws InterruptedException {
+    protected void enqueue(List<TaskMessage> msgs, String from) throws InterruptedException
{
       
       if (null == msgs || msgs.size() == 0 || closing) {
         return;
       }
-      
+      addReceiveCount(from, msgs.size());
       ArrayList<TaskMessage> messageGroups[] = groupMessages(msgs);
       
       if (null == messageGroups || closing) {
@@ -171,6 +199,7 @@ class Server implements IConnection {
         ArrayList<TaskMessage> msgGroup = messageGroups[receiverId];
         if (null != msgGroup) {
           message_queue[receiverId].put(msgGroup);
+          pendingMessages[receiverId].addAndGet(msgGroup.size());
         }
       }
     }
@@ -197,6 +226,8 @@ class Server implements IConnection {
         }
       
       if (null != ret) {
+        messagesDequeued.addAndGet(ret.size());
+        pendingMessages[queueId].addAndGet(0 - ret.size());
         return ret.iterator();
       }
       return null;
@@ -241,4 +272,30 @@ class Server implements IConnection {
     public String name() {
       return "Netty-server-localhost-" + port;
     }
+
+    @Override
+    public Object getState() {
+        LOG.info("Getting metrics for server on " + port);
+        HashMap<String, Object> ret = new HashMap<String, Object>();
+        ret.put("dequeuedMessages", messagesDequeued.getAndSet(0));
+        ArrayList<Integer> pending = new ArrayList<Integer>(pendingMessages.length);
+        for (AtomicInteger p: pendingMessages) {
+            pending.add(p.get());
+        }
+        ret.put("pending", pending);
+        HashMap<String, Integer> enqueued = new HashMap<String, Integer>();
+        Iterator<Map.Entry<String, AtomicInteger>> it = messagesEnqueued.entrySet().iterator();
+        while (it.hasNext()) {
+            Map.Entry<String, AtomicInteger> ent = it.next();
+            //Yes we can delete something that is not 0 because of races, but that is OK
for metrics
+            AtomicInteger i = ent.getValue();
+            if (i.get() == 0) {
+                it.remove();
+            } else {
+                enqueued.put(ent.getKey(), i.getAndSet(0));
+            }
+        }
+        ret.put("enqueued", enqueued);
+        return ret;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/29a8e2e7/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
index bf9b79e..6b71171 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
@@ -51,7 +51,7 @@ class StormServerHandler extends SimpleChannelUpstreamHandler  {
       }
       
       try {
-        server.enqueue(msgs);
+        server.enqueue(msgs, e.getRemoteAddress().toString());
       } catch (InterruptedException e1) {
         LOG.info("failed to enqueue a request message", e);
         failure_count.incrementAndGet();

http://git-wip-us.apache.org/repos/asf/storm/blob/29a8e2e7/storm-core/src/jvm/backtype/storm/testing/ForwardingMetricsConsumer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/testing/ForwardingMetricsConsumer.java b/storm-core/src/jvm/backtype/storm/testing/ForwardingMetricsConsumer.java
new file mode 100644
index 0000000..010336e
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/testing/ForwardingMetricsConsumer.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.testing;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.Socket;
+import java.util.Collection;
+import java.util.Map;
+import java.io.OutputStream;
+
+import backtype.storm.metric.api.IMetricsConsumer;
+import backtype.storm.task.IErrorReporter;
+import backtype.storm.task.TopologyContext;
+
+/*
+ * Listens for all metrics, dumps them as text to a configured host:port
+ *
+ * To use, add this to your topology's configuration:
+ *   conf.registerMetricsConsumer(backtype.storm.testing.ForwardingMetricsConsumer.class,
"<HOST>:<PORT>", 1);
+ *
+ * Or edit the storm.yaml config file:
+ *
+ *   topology.metrics.consumer.register:
+ *     - class: "backtype.storm.testing.ForwardingMetricsConsumer"
+ *     - argument: "example.com:9999"
+ *       parallelism.hint: 1
+ *
+ */
+public class ForwardingMetricsConsumer implements IMetricsConsumer {
+    String host;
+    int port;
+    Socket socket;
+    OutputStream out;
+
+    @Override
+    public void prepare(Map stormConf, Object registrationArgument, TopologyContext context,
IErrorReporter errorReporter) {
+        String [] parts = ((String)registrationArgument).split(":",2);
+        host = parts[0];
+        port = Integer.valueOf(parts[1]);
+        try {
+          socket = new Socket(host, port);
+          out = socket.getOutputStream();
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints)
{
+        StringBuilder sb = new StringBuilder();
+        String header = taskInfo.timestamp + "\t" +
+            taskInfo.srcWorkerHost + ":"+ taskInfo.srcWorkerPort + "\t"+
+            taskInfo.srcTaskId + "\t" + taskInfo.srcComponentId + "\t";
+        sb.append(header);
+        for (DataPoint p : dataPoints) {
+            sb.delete(header.length(), sb.length());
+            sb.append(p.name)
+                .append("\t")
+                .append(p.value)
+                .append("\n");
+            try {
+              out.write(sb.toString().getBytes());
+              out.flush();
+            } catch (Exception e) {
+              throw new RuntimeException(e);
+            }
+        }
+    }
+
+    @Override
+    public void cleanup() { 
+      try {
+        socket.close();
+      } catch (Exception e) {
+        throw new RuntimeException(e);
+      }
+    }
+}


Mime
View raw message