storm-commits mailing list archives

From bo...@apache.org
Subject [01/32] git commit: 1. Async Netty message transfer 2. Batch send and batch receive API and implementation 3. Allow configuring the number of receiver threads 4. Name the threads
Date Mon, 09 Jun 2014 13:48:06 GMT
Repository: incubator-storm
Updated Branches:
  refs/heads/master dd1d21360 -> 1a57fcf6b


1. Async Netty message transfer 2. Batch send and batch receive API and implementation 3. Allow configuring the number of receiver threads 4. Name the threads


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/861a92ea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/861a92ea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/861a92ea

Branch: refs/heads/master
Commit: 861a92eab8740cfc0f83ac4d7ade9a2ab04a8b9b
Parents: 22215b5
Author: Sean Zhong <clockfly@gmail.com>
Authored: Wed May 7 11:10:07 2014 +0800
Committer: Sean Zhong <clockfly@gmail.com>
Committed: Wed May 7 11:10:07 2014 +0800

----------------------------------------------------------------------
 .../src/clj/backtype/storm/daemon/executor.clj  |   1 +
 .../src/clj/backtype/storm/daemon/worker.clj    |  50 +--
 storm-core/src/clj/backtype/storm/disruptor.clj |   9 +-
 .../src/clj/backtype/storm/messaging/loader.clj |  81 ++--
 .../src/clj/backtype/storm/messaging/local.clj  |  30 +-
 storm-core/src/jvm/backtype/storm/Config.java   |  25 ++
 .../backtype/storm/messaging/IConnection.java   |  21 +-
 .../backtype/storm/messaging/netty/Client.java  | 379 +++++++++++--------
 .../backtype/storm/messaging/netty/Context.java |  11 +-
 .../storm/messaging/netty/ControlMessage.java   |   6 +-
 .../storm/messaging/netty/MessageBatch.java     |   5 +
 .../storm/messaging/netty/MessageDecoder.java   | 107 ++++--
 .../backtype/storm/messaging/netty/Server.java  | 142 ++++++-
 .../messaging/netty/StormClientHandler.java     |  52 +--
 .../messaging/netty/StormServerHandler.java     |  40 +-
 .../backtype/storm/utils/DisruptorQueue.java    |   9 +-
 .../src/jvm/backtype/storm/utils/Utils.java     |  63 ++-
 .../storm/messaging/netty_unit_test.clj         |  46 ++-
 18 files changed, 692 insertions(+), 385 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/861a92ea/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index f133a1b..390bba8 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -204,6 +204,7 @@
         storm-conf (normalized-component-conf (:storm-conf worker) worker-context component-id)
         executor-type (executor-type worker-context component-id)
         batch-transfer->worker (disruptor/disruptor-queue
+                                  (str "executor"  executor-id "-send-queue")
                                   (storm-conf TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE)
                                   :claim-strategy :single-threaded
                                   :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/861a92ea/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj
index 0d1f6c6..2648237 100644
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -18,8 +18,10 @@
   (:use [backtype.storm bootstrap])
   (:require [backtype.storm.daemon [executor :as executor]])
   (:import [java.util.concurrent Executors])
+  (:import [java.util ArrayList HashMap])
+  (:import [backtype.storm.utils TransferDrainer])
   (:import [backtype.storm.messaging TransportFactory])
-  (:import [backtype.storm.messaging IContext IConnection])
+  (:import [backtype.storm.messaging TaskMessage IContext IConnection])
   (:gen-class))
 
 (bootstrap)
@@ -109,25 +111,30 @@
 (defn mk-transfer-fn [worker]
   (let [local-tasks (-> worker :task-ids set)
         local-transfer (:transfer-local-fn worker)
-        ^DisruptorQueue transfer-queue (:transfer-queue worker)]
+        ^DisruptorQueue transfer-queue (:transfer-queue worker)
+        task->node+port (:cached-task->node+port worker)]
     (fn [^KryoTupleSerializer serializer tuple-batch]
       (let [local (ArrayList.)
-            remote (ArrayList.)]
+            remoteMap (HashMap.)]
         (fast-list-iter [[task tuple :as pair] tuple-batch]
           (if (local-tasks task)
             (.add local pair)
-            (.add remote pair)
-            ))
+            (let [node+port (get @task->node+port task)]
+              (when (not (.get remoteMap node+port))
+                (.put remoteMap node+port (ArrayList.)))
+              (let [remote (.get remoteMap node+port)]
+                (.add remote (TaskMessage. task (.serialize serializer tuple)))
+                 ))))
+        
         (local-transfer local)
-        ;; not using map because the lazy seq shows up in perf profiles
-        (let [serialized-pairs (fast-list-for [[task ^TupleImpl tuple] remote] [task (.serialize serializer tuple)])]
-          (disruptor/publish transfer-queue serialized-pairs)
-          )))))
+        (disruptor/publish transfer-queue remoteMap)
+          ))))
 
 (defn- mk-receive-queue-map [storm-conf executors]
   (->> executors
        ;; TODO: this depends on the type of executor
-       (map (fn [e] [e (disruptor/disruptor-queue (storm-conf TOPOLOGY-EXECUTOR-RECEIVE-BUFFER-SIZE)
+       (map (fn [e] [e (disruptor/disruptor-queue (str "receive-queue" e)
+                                                  (storm-conf TOPOLOGY-EXECUTOR-RECEIVE-BUFFER-SIZE)
                                                   :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))]))
        (into {})
        ))
@@ -169,7 +176,7 @@
         storm-cluster-state (cluster/mk-storm-cluster-state cluster-state)
         storm-conf (read-supervisor-storm-conf conf storm-id)
         executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port))
-        transfer-queue (disruptor/disruptor-queue (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE)
+        transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE)
                                                   :wait-strategy (storm-conf TOPOLOGY-DISRUPTOR-WAIT-STRATEGY))
         executor-receive-queue-map (mk-receive-queue-map storm-conf executors)
         
@@ -218,6 +225,7 @@
       :default-shared-resources (mk-default-resources <>)
       :user-shared-resources (mk-user-resources <>)
       :transfer-local-fn (mk-transfer-local-fn <>)
+      :receiver-thread-count (get storm-conf WORKER-RECEIVER-THREAD-COUNT)
       :transfer-fn (mk-transfer-fn <>)
       )))
 
@@ -296,28 +304,19 @@
 ;; TODO: consider having a max batch size besides what disruptor does automagically to prevent latency issues
 (defn mk-transfer-tuples-handler [worker]
   (let [^DisruptorQueue transfer-queue (:transfer-queue worker)
-        drainer (ArrayList.)
+        drainer (TransferDrainer.)
         node+port->socket (:cached-node+port->socket worker)
         task->node+port (:cached-task->node+port worker)
         endpoint-socket-lock (:endpoint-socket-lock worker)
         ]
     (disruptor/clojure-handler
       (fn [packets _ batch-end?]
-        (.addAll drainer packets)
+        (.add drainer packets)
+        
         (when batch-end?
           (read-locked endpoint-socket-lock
-            (let [node+port->socket @node+port->socket
-                  task->node+port @task->node+port]
-              ;; consider doing some automatic batching here (would need to not be serialized at this point to remove per-tuple overhead)
-              ;; try using multipart messages ... first sort the tuples by the target node (without changing the local ordering)
-            
-              (fast-list-iter [[task ser-tuple] drainer]
-                ;; TODO: consider write a batch of tuples here to every target worker  
-                ;; group by node+port, do multipart send              
-                (let [node-port (get task->node+port task)]
-                  (when node-port
-                    (.send ^IConnection (get node+port->socket node-port) task ser-tuple))
-                    ))))
+            (let [node+port->socket @node+port->socket]
+              (.send drainer node+port->socket)))
           (.clear drainer))))))
 
 (defn launch-receive-thread [worker]
@@ -325,6 +324,7 @@
   (msg-loader/launch-receive-thread!
     (:mq-context worker)
     (:storm-id worker)
+    (:receiver-thread-count worker)
     (:port worker)
     (:transfer-local-fn worker)
     (-> worker :storm-conf (get TOPOLOGY-RECEIVER-BUFFER-SIZE))
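
The reworked transfer function serializes each tuple once, then groups the
resulting TaskMessages by destination worker (node+port), so the drainer can
later hand each remote worker one batch instead of issuing per-tuple sends.
The sketch below is a hypothetical Java rendering of that grouping step, not
code from this commit; TaskMessage mirrors the Storm class and the String
node+port key stands in for the real endpoint.

    import java.util.ArrayList;
    import java.util.HashMap;

    class TaskMessage {
        final int task;
        final byte[] payload;
        TaskMessage(int task, byte[] payload) { this.task = task; this.payload = payload; }
    }

    class TransferGrouping {
        // Bucket serialized messages by destination worker, as mk-transfer-fn does.
        static HashMap<String, ArrayList<TaskMessage>> groupByDestination(
                ArrayList<TaskMessage> msgs, HashMap<Integer, String> taskToNodePort) {
            HashMap<String, ArrayList<TaskMessage>> remoteMap =
                    new HashMap<String, ArrayList<TaskMessage>>();
            for (TaskMessage msg : msgs) {
                String nodePort = taskToNodePort.get(msg.task);
                ArrayList<TaskMessage> batch = remoteMap.get(nodePort);
                if (batch == null) {               // first message for this worker
                    batch = new ArrayList<TaskMessage>();
                    remoteMap.put(nodePort, batch);
                }
                batch.add(msg);
            }
            return remoteMap;                      // published to the transfer queue as one unit
        }
    }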

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/861a92ea/storm-core/src/clj/backtype/storm/disruptor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/disruptor.clj b/storm-core/src/clj/backtype/storm/disruptor.clj
index 9456d1a..28393eb 100644
--- a/storm-core/src/clj/backtype/storm/disruptor.clj
+++ b/storm-core/src/clj/backtype/storm/disruptor.clj
@@ -47,8 +47,9 @@
 ;; This would manifest itself in Trident when doing 1 batch at a time processing, and the ack_init message
 ;; wouldn't make it to the acker until the batch timed out and another tuple was played into the queue, 
 ;; unblocking the consumer
-(defnk disruptor-queue [buffer-size :claim-strategy :multi-threaded :wait-strategy :block]
-  (DisruptorQueue. ((CLAIM-STRATEGY claim-strategy) buffer-size)
+(defnk disruptor-queue [^String queue-name buffer-size :claim-strategy :multi-threaded :wait-strategy :block]
+  (DisruptorQueue. queue-name
+                   ((CLAIM-STRATEGY claim-strategy) buffer-size)
                    (mk-wait-strategy wait-strategy)
                    ))
 
@@ -89,7 +90,7 @@
                 (consume-batch-when-available queue handler)
                 0 )
               :kill-fn kill-fn
-              :thread-name thread-name
+              :thread-name (.getName queue)
               )]
      (consumer-started! queue)
      ret
@@ -97,5 +98,5 @@
 
 (defmacro consume-loop [queue & handler-args]
   `(let [handler# (handler ~@handler-args)]
-     (consume-loop* ~queue handler#)
+     (consume-loop* ~queue handler# :thread-name (.getName ~queue))
      ))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/861a92ea/storm-core/src/clj/backtype/storm/messaging/loader.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/messaging/loader.clj b/storm-core/src/clj/backtype/storm/messaging/loader.clj
index 9e43c26..e13b5a8 100644
--- a/storm-core/src/clj/backtype/storm/messaging/loader.clj
+++ b/storm-core/src/clj/backtype/storm/messaging/loader.clj
@@ -15,7 +15,7 @@
 ;; limitations under the License.
 (ns backtype.storm.messaging.loader
   (:use [backtype.storm util log])
-  (:import [java.util ArrayList])
+  (:import [java.util ArrayList Iterator])
   (:import [backtype.storm.messaging IContext IConnection TaskMessage])
   (:import [backtype.storm.utils DisruptorQueue MutableObject])
   (:require [backtype.storm.messaging [local :as local]])
@@ -24,45 +24,62 @@
 (defn mk-local-context []
   (local/mk-context))
 
+(defn- mk-receive-thread [context storm-id port transfer-local-fn  daemon kill-fn priority socket max-buffer-size thread-id]
+    (async-loop
+       (fn []
+         (log-message "Starting receive-thread: [stormId: " storm-id ", port: " port ", thread-id: " thread-id  " ]")
+         (fn []
+           (let [batched (ArrayList.)
+                 ^Iterator iter (.recv ^IConnection socket 0 thread-id)
+                 closed (atom false)]
+             (when iter
+               (while (and (not @closed) (.hasNext iter)) 
+                  (let [packet (.next iter)
+                        task (if packet (.task ^TaskMessage packet))
+                        message (if packet (.message ^TaskMessage packet))]
+                      (if (= task -1)
+                         (do (log-message "Receiving-thread:[" storm-id ", " port "] received shutdown notice")
+                           (.close socket)
+                           (reset! closed  true))
+                         (when packet (.add batched [task message]))))))
+             
+             (when (not @closed)
+               (do
+                 (if (> (.size batched) 0)
+                   (transfer-local-fn batched))
+                 0)))))
+         :factory? true
+         :daemon daemon
+         :kill-fn kill-fn
+         :priority priority
+         :thread-name (str "worker-receiver-thread-" thread-id)))
+
+(defn- mk-receive-threads [context storm-id port transfer-local-fn  daemon kill-fn priority socket max-buffer-size thread-count]
+  (into [] (for [thread-id (range thread-count)] 
+             (mk-receive-thread context storm-id port transfer-local-fn  daemon kill-fn priority socket max-buffer-size thread-id))))
+
+
 (defnk launch-receive-thread!
-  [context storm-id port transfer-local-fn max-buffer-size
+  [context storm-id receiver-thread-count port transfer-local-fn max-buffer-size
    :daemon true
    :kill-fn (fn [t] (System/exit 1))
    :priority Thread/NORM_PRIORITY]
   (let [max-buffer-size (int max-buffer-size)
-        vthread (async-loop
-                 (fn []
-                   (let [socket (.bind ^IContext context storm-id port)]
-                     (fn []
-                       (let [batched (ArrayList.)
-                             init (.recv ^IConnection socket 0)]
-                         (loop [packet init]
-                           (let [task (if packet (.task ^TaskMessage packet))
-                                 message (if packet (.message ^TaskMessage packet))]
-                             (if (= task -1)
-                               (do (log-message "Receiving-thread:[" storm-id ", " port "] received shutdown notice")
-                                 (.close socket)
-                                 nil )
-                               (do
-                                 (when packet (.add batched [task message]))
-                                 (if (and packet (< (.size batched) max-buffer-size))
-                                   (recur (.recv ^IConnection socket 1))
-                                   (do (transfer-local-fn batched)
-                                     0 ))))))))))
-                 :factory? true
-                 :daemon daemon
-                 :kill-fn kill-fn
-                 :priority priority)]
+        socket (.bind ^IContext context storm-id port)
+        thread-count (if receiver-thread-count receiver-thread-count 1)
+        vthreads (mk-receive-threads context storm-id port transfer-local-fn daemon kill-fn priority socket max-buffer-size thread-count)]
     (fn []
       (let [kill-socket (.connect ^IContext context storm-id "localhost" port)]
         (log-message "Shutting down receiving-thread: [" storm-id ", " port "]")
         (.send ^IConnection kill-socket
-                  -1
-                  (byte-array []))
-        (log-message "Waiting for receiving-thread:[" storm-id ", " port "] to die")
-        (.join vthread)
+                  -1 (byte-array []))
+        
         (.close ^IConnection kill-socket)
+        
+        (log-message "Waiting for receiving-thread:[" storm-id ", " port "] to die")
+        
+        (doseq [thread-id (range thread-count)]
+             (.join (vthreads thread-id)))
+        
         (log-message "Shutdown receiving-thread: [" storm-id ", " port "]")
-        ))))
-
-
+        ))))
\ No newline at end of file
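
The receive side now runs worker.receiver.thread.count threads against one
bound socket, each draining its own queue id through the new two-argument
recv, with task id -1 acting as the shutdown notice. A minimal Java sketch
of one receiver thread's loop, assuming the IConnection batch API introduced
below; transfer(...) is a hypothetical stand-in for transfer-local-fn.

    import backtype.storm.messaging.IConnection;
    import backtype.storm.messaging.TaskMessage;
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    class ReceiverLoop implements Runnable {
        private final IConnection socket;
        private final int threadId;

        ReceiverLoop(IConnection socket, int threadId) {
            this.socket = socket;
            this.threadId = threadId;
        }

        public void run() {
            while (true) {
                // flags=0 blocks until a batch is available on this thread's queue
                Iterator<TaskMessage> iter = socket.recv(0, threadId);
                if (iter == null) continue;
                List<TaskMessage> batched = new ArrayList<TaskMessage>();
                while (iter.hasNext()) {
                    TaskMessage msg = iter.next();
                    if (msg.task() == -1) {        // shutdown notice
                        socket.close();
                        return;
                    }
                    batched.add(msg);
                }
                if (!batched.isEmpty()) transfer(batched);
            }
        }

        private void transfer(List<TaskMessage> batch) { /* hand off to local executors */ }
    }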

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/861a92ea/storm-core/src/clj/backtype/storm/messaging/local.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/messaging/local.clj b/storm-core/src/clj/backtype/storm/messaging/local.clj
index bf4d5b2..de14806 100644
--- a/storm-core/src/clj/backtype/storm/messaging/local.clj
+++ b/storm-core/src/clj/backtype/storm/messaging/local.clj
@@ -18,7 +18,8 @@
   (:use [backtype.storm log])
   (:import [backtype.storm.messaging IContext IConnection TaskMessage])
   (:import [java.util.concurrent LinkedBlockingQueue])
-  (:import [java.util Map])
+  (:import [java.util Map Iterator])
+  (:import [java.util ArrayList])
   (:gen-class))
 
 (defn add-queue! [queues-map lock storm-id port]
@@ -30,16 +31,35 @@
 
 (deftype LocalConnection [storm-id port queues-map lock queue]
   IConnection
-  (^TaskMessage recv [this ^int flags]
+  (^Iterator recv [this ^int flags]
     (when-not queue
       (throw (IllegalArgumentException. "Cannot receive on this socket")))
-    (if (= flags 1)
-      (.poll queue)
-      (.take queue)))
+    (let [ret (ArrayList.)
+          msg (if (= flags 1) (.poll queue) (.take queue))]
+      (if msg
+        (do 
+          (.add ret msg)
+          (.iterator ret))
+        nil)))
+  (^Iterator recv [this ^int flags ^int clientId]
+    (when-not queue
+      (throw (IllegalArgumentException. "Cannot receive on this socket")))
+    (let [ret (ArrayList.)
+          msg (if (= flags 1) (.poll queue) (.take queue))]
+      (if msg
+        (do 
+          (.add ret msg)
+          (.iterator ret))
+        nil)))
   (^void send [this ^int taskId ^bytes payload]
     (let [send-queue (add-queue! queues-map lock storm-id port)]
       (.put send-queue (TaskMessage. taskId payload))
       ))
+  (^void send [this ^Iterator iter]
+    (let [send-queue (add-queue! queues-map lock storm-id port)]
+      (while (.hasNext iter) 
+         (.put send-queue (.next iter)))
+      ))
   (^void close [this]
     ))
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/861a92ea/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 1fc0d78..98718a3 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -84,8 +84,27 @@ public class Config extends HashMap<String, Object> {
      */
     public static final String STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS = "storm.messaging.netty.client_worker_threads"; 
     public static final Object STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS_SCHEMA = Number.class;
+    
+    /**
+     * If the Netty messaging layer is busy, the Netty client batches as many messages as possible, up to STORM_NETTY_MESSAGE_BATCH_SIZE bytes
+     */
+    public static final String STORM_NETTY_MESSAGE_BATCH_SIZE = "netty.transfer.batch.size";
+    public static final Object STORM_NETTY_MESSAGE_BATCH_SIZE_SCHEMA = Number.class;
 
     /**
+     * This controls whether Netty message transfer is done synchronously or asynchronously.
+     */
+    public static final String STORM_NETTY_BLOCKING = "netty.blocking";
+    public static final Object STORM_NETTY_BLOCKING_SCHEMA = Boolean.class;
+    
+    /**
+     * At this interval, we check whether the Netty channel is writable and try to write pending messages
+     */
+    public static final String STORM_NETTY_FLUSH_CHECK_INTERVAL_MS = "netty.flush.check.interval.ms";
+    public static final Object STORM_NETTY_FLUSH_CHECK_INTERVAL_MS_SCHEMA = Number.class;
+    
+    
+    /**
      * A list of hosts of ZooKeeper servers used to manage the cluster.
      */
     public static final String STORM_ZOOKEEPER_SERVERS = "storm.zookeeper.servers";
@@ -462,6 +481,12 @@ public class Config extends HashMap<String, Object> {
     public static final Object WORKER_CHILDOPTS_SCHEMA = ConfigValidation.StringOrStringListValidator;
 
     /**
+     * Controls how many receiver threads each worker runs
+     */
+    public static final String WORKER_RECEIVER_THREAD_COUNT = "worker.receiver.thread.count";
+    public static final Object WORKER_RECEIVER_THREAD_COUNT_SCHEMA = Number.class;
+    
+    /**
      * How often this worker should heartbeat to the supervisor.
      */
     public static final String WORKER_HEARTBEAT_FREQUENCY_SECS = "worker.heartbeat.frequency.secs";
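
The new options carry code-level defaults visible further down in this diff
(262144 bytes, 10 ms, non-blocking, one receiver thread). A hedged example of
setting them explicitly; Config.put works because Config extends HashMap.

    import backtype.storm.Config;

    public class NettyBatchConfigExample {
        public static Config conf() {
            Config conf = new Config();
            conf.put(Config.STORM_NETTY_MESSAGE_BATCH_SIZE, 262144);    // max batch size, bytes
            conf.put(Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS, 10);   // flush-checker period
            conf.put(Config.STORM_NETTY_BLOCKING, false);               // async transfer
            conf.put(Config.WORKER_RECEIVER_THREAD_COUNT, 1);           // receiver threads per worker
            return conf;
        }
    }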

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/861a92ea/storm-core/src/jvm/backtype/storm/messaging/IConnection.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/IConnection.java b/storm-core/src/jvm/backtype/storm/messaging/IConnection.java
index 41ae3f5..fe9caa7 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/IConnection.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/IConnection.java
@@ -17,13 +17,23 @@
  */
 package backtype.storm.messaging;
 
+import java.util.Iterator;
+
 public interface IConnection {   
     /**
-     * receive a message (consists taskId and payload)
+     * receive an iterator over a batch of messages (each consisting of a taskId and payload)
      * @param flags 0: block, 1: non-block
      * @return
      */
-    public TaskMessage recv(int flags);
+    public Iterator<TaskMessage> recv(int flags);
+    
+    /**
+     * receive an iterator over a batch of messages (each consisting of a taskId and payload)
+     * @param flags 0: block, 1: non-block
+     * @return
+     */
+    public Iterator<TaskMessage> recv(int flags, int clientId);
+    
     /**
      * send a message with taskId and payload
      * @param taskId task ID
@@ -32,6 +42,13 @@ public interface IConnection {
     public void send(int taskId,  byte[] payload);
     
     /**
+     * send batch messages
+     * @param msgs
+     */
+
+    public void send(Iterator<TaskMessage> msgs);
+    
+    /**
      * close this connection
      */
     public void close();
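
With both overloads in place, the interface moves whole batches in each
direction: recv hands back an iterator over a batch, and send consumes one.
A minimal send-side usage sketch (the class and its parameters are
illustrative, not part of this commit):

    import backtype.storm.messaging.IConnection;
    import backtype.storm.messaging.TaskMessage;
    import java.util.ArrayList;
    import java.util.List;

    public class BatchSendExample {
        // Wrap payloads as TaskMessages and push them through one send call.
        public static void sendBatch(IConnection conn, byte[][] payloads, int taskId) {
            List<TaskMessage> batch = new ArrayList<TaskMessage>(payloads.length);
            for (byte[] payload : payloads) {
                batch.add(new TaskMessage(taskId, payload));
            }
            conn.send(batch.iterator());
        }
    }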

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/861a92ea/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 6996b49..09b045b 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -21,52 +21,53 @@ import backtype.storm.Config;
 import backtype.storm.messaging.IConnection;
 import backtype.storm.messaging.TaskMessage;
 import backtype.storm.utils.Utils;
-
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelFutureListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.Random;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
-class Client implements IConnection {
+public class Client implements IConnection {
     private static final Logger LOG = LoggerFactory.getLogger(Client.class);
-    private static final Timer TIMER = new Timer("netty-client-timer", true);
-
+    private static final String PREFIX = "Netty-Client-";
     private final int max_retries;
     private final long base_sleep_ms;
     private final long max_sleep_ms;
-    private LinkedBlockingQueue<Object> message_queue; //entry should either be TaskMessage or ControlMessage
     private AtomicReference<Channel> channelRef;
     private final ClientBootstrap bootstrap;
-    InetSocketAddress remote_addr;
-    private AtomicInteger retries;
+    private InetSocketAddress remote_addr;
+    
     private final Random random = new Random();
     private final ChannelFactory factory;
     private final int buffer_size;
-    private final AtomicBoolean being_closed;
-    private boolean wait_for_requests;
+    private boolean closing;
+
+    private Integer messageBatchSize;
+    private Boolean blocking = false;
+    
+    private AtomicLong pendings;
+
+    MessageBatch messageBatch = null;
+    private AtomicLong flushCheckTimer;
+    private int flushCheckInterval;
 
     @SuppressWarnings("rawtypes")
     Client(Map storm_conf, ChannelFactory factory, String host, int port) {
         this.factory = factory;
-        message_queue = new LinkedBlockingQueue<Object>();
-        retries = new AtomicInteger(0);
         channelRef = new AtomicReference<Channel>(null);
-        being_closed = new AtomicBoolean(false);
-        wait_for_requests = false;
+        closing = false;
+        pendings = new AtomicLong(0);
+        flushCheckTimer = new AtomicLong(Long.MAX_VALUE);
 
         // Configure
         buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
@@ -74,6 +75,14 @@ class Client implements IConnection {
         base_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
         max_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
 
+        this.messageBatchSize = Utils.getInt(storm_conf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
+        blocking = Utils.getBoolean(storm_conf.get(Config.STORM_NETTY_BLOCKING), false);
+        
+        flushCheckInterval = Utils.getInt(storm_conf.get(Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS), 10); // default 10 ms
+
+        LOG.info("New Netty Client, connect to " + host + ", " + port
+                + ", buffer_size: " + buffer_size);
+
         bootstrap = new ClientBootstrap(factory);
         bootstrap.setOption("tcpNoDelay", true);
         bootstrap.setOption("sendBufferSize", buffer_size);
@@ -84,43 +93,87 @@ class Client implements IConnection {
 
         // Start the connection attempt.
         remote_addr = new InetSocketAddress(host, port);
-        bootstrap.connect(remote_addr);
+        
+        Thread flushChecker = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                //make sure we have a connection
+                connect();
+                
+                while(!closing) {
+                    long flushCheckTime = flushCheckTimer.get();
+                    long now = System.currentTimeMillis();
+                    if (now > flushCheckTime) {
+                        Channel channel = channelRef.get();
+                        if (null != channel && channel.isWritable()) {
+                            flush();
+                        }
+                    }
+                    try {
+                        Thread.sleep(flushCheckInterval);
+                    } catch (InterruptedException e) {
+                        break;
+                    }
+                }
+                
+            }
+        });
+        
+        flushChecker.setDaemon(true);
+        flushChecker.start();
     }
 
     /**
      * We will retry connection with exponential back-off policy
      */
-    void reconnect() {
-        close_n_release();
-
-        //reconnect only if it's not being closed
-        if (being_closed.get()) return;
-
-        final int tried_count = retries.incrementAndGet();
-        if (tried_count <= max_retries) {
-            long sleep = getSleepTimeMs();
-            LOG.info("Waiting {} ms before trying connection to {}", sleep, remote_addr);
-            TIMER.schedule(new TimerTask() {
-                @Override
-                public void run() { 
-                    LOG.info("Reconnect ... [{}] to {}", tried_count, remote_addr);
-                    bootstrap.connect(remote_addr);
-                }}, sleep);
-        } else {
-            LOG.warn(remote_addr+" is not reachable. We will close this client.");
-            close();
+    private synchronized void connect() {
+        try {
+            if (channelRef.get() != null) {
+                return;
+            }
+            
+            Channel channel = null;
+
+            int tried = 0;
+            while (tried <= max_retries) {
+
+                LOG.info("Reconnect started for {}... [{}]", name(), tried);
+                LOG.debug("connection started...");
+
+                ChannelFuture future = bootstrap.connect(remote_addr);
+                future.awaitUninterruptibly();
+                Channel current = future.getChannel();
+                if (!future.isSuccess()) {
+                    if (null != current) {
+                        current.close();
+                    }
+                } else {
+                    channel = current;
+                    break;
+                }
+                Thread.sleep(getSleepTimeMs(tried));
+                tried++;  
+            }
+            if (null != channel) {
+                LOG.info("connection established to a remote host " + name() + ", " + channel.toString());
+                channelRef.set(channel);
+            } else {
+                close();
+                throw new RuntimeException("Remote address is not reachable. We will close this client " + name());
+            }
+        } catch (InterruptedException e) {
+            throw new RuntimeException("connection failed " + name(), e);
         }
     }
 
     /**
      * # of milliseconds to wait per exponential back-off policy
      */
-    private long getSleepTimeMs()
-    {
-        if (retries.get() > 30) {
+    private long getSleepTimeMs(int retries) {
+        if (retries > 30) {
            return max_sleep_ms;
         }
-        int backoff = 1 << retries.get();
+        int backoff = 1 << retries;
         long sleepMs = base_sleep_ms * Math.max(1, random.nextInt(backoff));
         if ( sleepMs > max_sleep_ms )
             sleepMs = max_sleep_ms;
@@ -128,133 +181,103 @@ class Client implements IConnection {
     }
 
     /**
-     * Enqueue a task message to be sent to server
+     * Enqueue task messages to be sent to server
      */
-    public void send(int task, byte[] message) {
-        //throw exception if the client is being closed
-        if (being_closed.get()) {
+    synchronized public void send(Iterator<TaskMessage> msgs) {
+
+        // throw exception if the client is being closed
+        if (closing) {
             throw new RuntimeException("Client is being closed, and does not take requests any more");
         }
-
-        try {
-            message_queue.put(new TaskMessage(task, message));
-
-            //resume delivery if it is waiting for requests
-            tryDeliverMessages(true);
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
+        
+        if (null == msgs || !msgs.hasNext()) {
+            return;
         }
-    }
 
-    /**
-     * Retrieve messages from queue, and delivery to server if any
-     */
-    synchronized void tryDeliverMessages(boolean only_if_waiting) throws InterruptedException {
-        //just skip if delivery only if waiting, and we are not waiting currently
-        if (only_if_waiting && !wait_for_requests)  return;
-
-        //make sure that channel was not closed
         Channel channel = channelRef.get();
-        if (channel == null)  return;
-        if (!channel.isOpen()) {
-            LOG.info("Channel to {} is no longer open.",remote_addr);
-            //The channel is not open yet. Reconnect?
-            reconnect();
-            return;
+        if (null == channel) {
+            connect();
+            channel = channelRef.get();
         }
 
-        final MessageBatch requests = tryTakeMessages();
-        if (requests==null) {
-            wait_for_requests = true;
-            return;
-        }
+        while (msgs.hasNext()) {
+            TaskMessage message = msgs.next();
+            if (null == messageBatch) {
+                messageBatch = new MessageBatch(messageBatchSize);
+            }
 
-        //if channel is being closed and we have no outstanding messages,  let's close the channel
-        if (requests.isEmpty() && being_closed.get()) {
-            close_n_release();
-            return;
+            messageBatch.add(message);
+            if (messageBatch.isFull()) {
+                MessageBatch toBeFlushed = messageBatch;
+                flushRequest(channel, toBeFlushed, blocking);
+                messageBatch = null;
+            }
         }
 
-        //we are busily delivering messages, and will check queue upon response.
-        //When send() is called by senders, we should not thus call tryDeliverMessages().
-        wait_for_requests = false;
-
-        //write request into socket channel
-        ChannelFuture future = channel.write(requests);
-        future.addListener(new ChannelFutureListener() {
-            public void operationComplete(ChannelFuture future)
-                    throws Exception {
-                if (!future.isSuccess()) {
-                    LOG.info("failed to send "+requests.size()+" requests to "+remote_addr, future.getCause());
-                    reconnect();
-                } else {
-                    LOG.debug("{} request(s) sent", requests.size());
-
-                    //Now that our requests have been sent, channel could be closed if needed
-                    if (being_closed.get())
-                        close_n_release();
-                }
+        if (null != messageBatch && !messageBatch.isEmpty()) {
+            if (channel.isWritable()) {
+                flushCheckTimer.set(Long.MAX_VALUE);
+                
+                // Flush as fast as we can to reduce the latency
+                MessageBatch toBeFlushed = messageBatch;
+                messageBatch = null;
+                flushRequest(channel, toBeFlushed, blocking);
+                
+            } else {
+                flushCheckTimer.set(System.currentTimeMillis() + flushCheckInterval);
             }
-        });
+        }
+
     }
 
-    /**
-     * Take all enqueued messages from queue
-     * @return  batch of messages
-     * @throws InterruptedException
-     *
-     * synchronized ... ensure that messages are delivered in the same order
-     * as they are added into queue
-     */
-    private MessageBatch tryTakeMessages() throws InterruptedException {
-        //1st message
-        Object msg = message_queue.poll();
-        if (msg == null) return null;
-
-        MessageBatch batch = new MessageBatch(buffer_size);
-        //we will discard any message after CLOSE
-        if (msg == ControlMessage.CLOSE_MESSAGE) {
-            LOG.info("Connection to {} is being closed", remote_addr);
-            being_closed.set(true);
-            return batch;
+    public String name() {
+        if (null != remote_addr) {
+            return PREFIX + remote_addr.toString();
         }
+        return "";
+    }
 
-        batch.add((TaskMessage)msg);
-        while (!batch.isFull() && ((msg = message_queue.peek())!=null)) {
-            //Is it a CLOSE message?
-            if (msg == ControlMessage.CLOSE_MESSAGE) {
-                message_queue.take();
-                LOG.info("Connection to {} is being closed", remote_addr);
-                being_closed.set(true);
-                break;
+    private synchronized void flush() {
+        if (!closing) {
+            if (null != messageBatch && !messageBatch.isEmpty()) {
+                MessageBatch toBeFlushed = messageBatch;
+                Channel channel = channelRef.get();
+                if (channel != null) {
+                    flushCheckTimer.set(Long.MAX_VALUE);
+                    flushRequest(channel, toBeFlushed, true);
+                }
+                messageBatch = null;
             }
-
-            //try to add this msg into batch
-            if (!batch.tryAdd((TaskMessage) msg))
-                break;
-
-            //remove this message
-            message_queue.take();
         }
-
-        return batch;
     }
-
+    
     /**
      * gracefully close this client.
-     *
-     * We will send all existing requests, and then invoke close_n_release() method
+     * 
+     * We will send all existing requests, and then invoke close_n_release()
+     * method
      */
-    public void close() {
-        //enqueue a CLOSE message so that shutdown() will be invoked
-        try {
-            message_queue.put(ControlMessage.CLOSE_MESSAGE);
-
-            //resume delivery if it is waiting for requests
-            tryDeliverMessages(true);
-        } catch (InterruptedException e) {
-            LOG.info("Interrupted Connection to {} is being closed", remote_addr);
-            being_closed.set(true);
+    public synchronized void close() {
+        if (!closing) {
+            closing = true;
+            if (null != messageBatch && !messageBatch.isEmpty()) {
+                MessageBatch toBeFlushed = messageBatch;
+                Channel channel = channelRef.get();
+                if (channel != null) {
+                    flushRequest(channel, toBeFlushed, true);
+                }
+                messageBatch = null;
+            }
+        
+            //wait for pendings to exit
+            while(pendings.get() != 0) {
+                try {
+                    Thread.sleep(1000); //sleep 1s
+                } catch (InterruptedException e) {
+                    break;
+                } 
+            }
+            
             close_n_release();
         }
     }
@@ -262,27 +285,59 @@ class Client implements IConnection {
     /**
      * close_n_release() is invoked after all messages have been sent.
      */
-    synchronized void close_n_release() {
+    private void close_n_release() {
         if (channelRef.get() != null) {
             channelRef.get().close();
             LOG.debug("channel {} closed",remote_addr);
-            setChannel(null);
         }
     }
 
-    public TaskMessage recv(int flags) {
+    public Iterator<TaskMessage> recv(int flags) {
         throw new RuntimeException("Client connection should not receive any messages");
     }
 
-    void setChannel(Channel channel) {
-        if (channel != null && channel.isOpen()) {
-            //Assume the most recent connection attempt was successful.
-            retries.set(0);
-        }
-        channelRef.set(channel);
-        //reset retries
-        if (channel != null)
-            retries.set(0);
+    public Iterator<TaskMessage> recv(int flags, int clientId) {
+        throw new RuntimeException("Client connection should not receive any messages");
+    }
+
+    @Override
+    public void send(int taskId, byte[] payload) {
+        TaskMessage msg = new TaskMessage(taskId, payload);
+        List<TaskMessage> wrapper = new ArrayList<TaskMessage>(1);
+        wrapper.add(msg);
+        send(wrapper.iterator());
     }
 
-}
+    private void flushRequest(Channel channel, final MessageBatch requests,
+            boolean blocking) {
+        if (requests == null)
+            return;
+
+        pendings.incrementAndGet();
+        ChannelFuture future = channel.write(requests);
+        future.addListener(new ChannelFutureListener() {
+            public void operationComplete(ChannelFuture future)
+                    throws Exception {
+
+                pendings.decrementAndGet();
+                if (!future.isSuccess()) {
+                    LOG.info(
+                            "failed to send requests to " + remote_addr.toString() + ": ", future.getCause());
+
+                    Channel channel = future.getChannel();
+
+                    if (null != channel) {
+                        channel.close();
+                        channelRef.compareAndSet(channel, null);
+                    }
+                } else {
+                    LOG.debug("{} request(s) sent", requests.size());
+                }
+            }
+        });
+
+        if (blocking) {
+            future.awaitUninterruptibly();
+        }
+    }
+}
\ No newline at end of file
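
The rewritten client drops the message queue and delivery listener in favor
of synchronized batching: messages accumulate in a MessageBatch, full batches
are flushed at once, and a trailing partial batch is flushed immediately if
the channel is writable or left for the background flush checker otherwise.
A condensed, hypothetical sketch of the timer handshake between send() and
that checker thread; batch and channel plumbing are omitted.

    import java.util.concurrent.atomic.AtomicLong;

    class FlushTimerSketch {
        private final AtomicLong flushCheckTimer = new AtomicLong(Long.MAX_VALUE);
        private final int flushCheckIntervalMs = 10;
        private volatile boolean closing = false;

        // Called by send() when a partial batch remains.
        synchronized void onPartialBatch(boolean channelWritable) {
            if (channelWritable) {
                flushCheckTimer.set(Long.MAX_VALUE);   // disarm: flushing right now
                flush();
            } else {
                // arm: let the checker retry once the interval has passed
                flushCheckTimer.set(System.currentTimeMillis() + flushCheckIntervalMs);
            }
        }

        // Body of the daemon flush-checker thread started in the constructor.
        void flushCheckerLoop() throws InterruptedException {
            while (!closing) {
                if (System.currentTimeMillis() > flushCheckTimer.get()) {
                    flush();                           // channel is writable again
                }
                Thread.sleep(flushCheckIntervalMs);
            }
        }

        synchronized void flush() { /* write the pending MessageBatch to the channel */ }
    }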

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/861a92ea/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
index 80b4443..2e762ce 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
@@ -20,6 +20,7 @@ package backtype.storm.messaging.netty;
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.Map;
 import java.util.Vector;
 
@@ -47,12 +48,14 @@ public class Context implements IContext {
 
         //each context will have a single client channel factory
         int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
+        ThreadFactory bossFactory = new NettyRenameThreadFactory("client" + "-boss");
+        ThreadFactory workerFactory = new NettyRenameThreadFactory("client" + "-worker");
         if (maxWorkers > 0) {
-            clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
-                    Executors.newCachedThreadPool(), maxWorkers);
+            clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
+                    Executors.newCachedThreadPool(workerFactory), maxWorkers);
         } else {
-            clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
-                    Executors.newCachedThreadPool());
+            clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
+                    Executors.newCachedThreadPool(workerFactory));
         }
     }
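
NettyRenameThreadFactory is referenced here but not included in this message.
A hypothetical equivalent, assuming it does nothing more than give the Netty
boss/worker pool threads recognizable names:

    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.atomic.AtomicInteger;

    class RenameThreadFactory implements ThreadFactory {
        private final String prefix;
        private final AtomicInteger count = new AtomicInteger(1);

        RenameThreadFactory(String prefix) { this.prefix = prefix; }

        public Thread newThread(Runnable r) {
            // name each pool thread "<prefix>-<n>" so thread dumps are readable
            return new Thread(r, prefix + "-" + count.getAndIncrement());
        }
    }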
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/861a92ea/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java b/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
index a552cf7..b7335b3 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
@@ -17,6 +17,8 @@
  */
 package backtype.storm.messaging.netty;
 
+import java.io.IOException;
+
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBufferOutputStream;
 import org.jboss.netty.buffer.ChannelBuffers;
@@ -54,14 +56,14 @@ enum ControlMessage {
      * encode the current Control Message into a channel buffer
      * @throws Exception
      */
-    ChannelBuffer buffer() throws Exception {
+    ChannelBuffer buffer() throws IOException {
         ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encodeLength()));      
         write(bout);
         bout.close();
         return bout.buffer();
     }
 
-    void write(ChannelBufferOutputStream bout) throws Exception {
+    void write(ChannelBufferOutputStream bout) throws IOException {
         bout.writeShort(code);        
     } 
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/861a92ea/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
index cd8d4e3..63c861a 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
@@ -44,6 +44,11 @@ class MessageBatch {
         encoded_length += msgEncodeLength(msg);
     }
 
+
+    TaskMessage get(int index) {
+        return msgs.get(index);
+    }
+
     /**
      * try to add a TaskMessage to a batch
      * @param taskMsg

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/861a92ea/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
index 3365e58..8291d78 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
@@ -17,6 +17,9 @@
  */
 package backtype.storm.messaging.netty;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import backtype.storm.messaging.TaskMessage;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.Channel;
@@ -34,52 +37,78 @@ public class MessageDecoder extends FrameDecoder {
      */
     protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception {
         // Make sure that we have received at least a short 
-        if (buf.readableBytes() < 2) {
+        long available = buf.readableBytes();
+        if (available < 2) {
             //need more data
             return null;
         }
 
-        // Mark the current buffer position before reading task/len field
-        // because the whole frame might not be in the buffer yet.
-        // We will reset the buffer position to the marked position if
-        // there's not enough bytes in the buffer.
-        buf.markReaderIndex();
-
-        //read the short field
-        short code = buf.readShort();
-        
-        //case 1: Control message
-        ControlMessage ctrl_msg = ControlMessage.mkMessage(code);
-        if (ctrl_msg != null) return ctrl_msg;
-        
-        //case 2: task Message
-        short task = code;
-        
-        // Make sure that we have received at least an integer (length) 
-        if (buf.readableBytes() < 4) {
-            //need more data
-            buf.resetReaderIndex();
-            return null;
-        }
+        List<Object> ret = new ArrayList<Object>();
+
+        while (available >= 2) {
+
+            // Mark the current buffer position before reading task/len field
+            // because the whole frame might not be in the buffer yet.
+            // We will reset the buffer position to the marked position if
+            // there's not enough bytes in the buffer.
+            buf.markReaderIndex();
+
+            // read the short field
+            short code = buf.readShort();
+            available -= 2;
+
+            // case 1: Control message
+            ControlMessage ctrl_msg = ControlMessage.mkMessage(code);
+            if (ctrl_msg != null) {
+
+                if (ctrl_msg == ControlMessage.EOB_MESSAGE) {
+                    continue;
+                } else {
+                    return ctrl_msg;
+                }
+            }
+
+            // case 2: task Message
+            short task = code;
 
-        // Read the length field.
-        int length = buf.readInt();
-        if (length<=0) {
-            return new TaskMessage(task, null);
+            // Make sure that we have received at least an integer (length)
+            if (available < 4) {
+                // need more data
+                buf.resetReaderIndex();
+                break;
+            }
+
+            // Read the length field.
+            int length = buf.readInt();
+
+            available -= 4;
+
+            if (length <= 0) {
+                ret.add(new TaskMessage(task, null));
+                break;
+            }
+
+            // Make sure if there's enough bytes in the buffer.
+            if (available < length) {
+                // The whole bytes were not received yet - return null.
+                buf.resetReaderIndex();
+                break;
+            }
+            available -= length;
+
+            // There's enough bytes in the buffer. Read it.
+            ChannelBuffer payload = buf.readBytes(length);
+
+
+            // Successfully decoded a frame.
+            // Return a TaskMessage object
+            ret.add(new TaskMessage(task, payload.array()));
         }
-        
-        // Make sure if there's enough bytes in the buffer.
-        if (buf.readableBytes() < length) {
-            // The whole bytes were not received yet - return null.
-            buf.resetReaderIndex();
+
+        if (ret.size() == 0) {
             return null;
+        } else {
+            return ret;
         }
-
-        // There's enough bytes in the buffer. Read it.
-        ChannelBuffer payload = buf.readBytes(length);
-
-        // Successfully decoded a frame.
-        // Return a TaskMessage object
-        return new TaskMessage(task,payload.array());
     }
 }
\ No newline at end of file
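
The decoder now drains every complete frame already in the buffer per call
and returns them as a list, swallowing EOB control messages as batch
delimiters; an incomplete frame resets the reader index so the next
invocation retries. A self-contained sketch of the same framing loop over a
plain java.nio.ByteBuffer (control-message handling omitted):

    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.List;

    class FrameDecoderSketch {
        // Frame layout: short taskId, int length, then `length` payload bytes.
        // Returns null when no complete frame is available yet.
        static List<Object[]> decode(ByteBuffer buf) {
            List<Object[]> ret = new ArrayList<Object[]>();
            while (buf.remaining() >= 2) {
                buf.mark();                        // so a partial frame can be rewound
                short task = buf.getShort();
                if (buf.remaining() < 4) {         // length field not received yet
                    buf.reset();
                    break;
                }
                int length = buf.getInt();
                if (buf.remaining() < length) {    // payload not fully received yet
                    buf.reset();
                    break;
                }
                byte[] payload = new byte[length];
                buf.get(payload);
                ret.add(new Object[] { task, payload });  // stand-in for TaskMessage
            }
            return ret.isEmpty() ? null : ret;
        }
    }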

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/861a92ea/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
index 83e4187..71f01e0 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
@@ -31,35 +31,65 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
 
 class Server implements IConnection {
     private static final Logger LOG = LoggerFactory.getLogger(Server.class);
     @SuppressWarnings("rawtypes")
     Map storm_conf;
     int port;
-    private LinkedBlockingQueue<TaskMessage> message_queue;
+    private LinkedBlockingQueue<ArrayList<TaskMessage>>[] message_queue;
     volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
     final ChannelFactory factory;
     final ServerBootstrap bootstrap;
-
+    
+    private int queueCount;
+    HashMap<Integer, Integer> taskToQueueId = null;
+    int roundRobinQueueId;
+	
+    boolean closing = false;
+    List<TaskMessage> closeMessage = Arrays.asList(new TaskMessage(-1, null));
+    
+    
     @SuppressWarnings("rawtypes")
     Server(Map storm_conf, int port) {
         this.storm_conf = storm_conf;
         this.port = port;
-        message_queue = new LinkedBlockingQueue<TaskMessage>();
-
+        
+        queueCount = Utils.getInt(storm_conf.get("worker.receiver.thread.count"), 1);
+        roundRobinQueueId = 0;
+        taskToQueueId = new HashMap<Integer, Integer>();
+    
+        message_queue = new LinkedBlockingQueue[queueCount];
+        for (int i = 0; i < queueCount; i++) {
+            message_queue[i] = new LinkedBlockingQueue<ArrayList<TaskMessage>>();
+        }
+        
         // Configure the server.
         int buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
         int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS));
 
+        ThreadFactory bossFactory = new NettyRenameThreadFactory(name() + "-boss");
+        ThreadFactory workerFactory = new NettyRenameThreadFactory(name() + "-worker");
+        
         if (maxWorkers > 0) {
-            factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), maxWorkers);
+            factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), 
+                Executors.newCachedThreadPool(workerFactory), maxWorkers);
         } else {
-            factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+            factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), 
+                Executors.newCachedThreadPool(workerFactory));
         }
+        
+        LOG.info("Create Netty Server " + name() + ", buffer_size: " + buffer_size + ", maxWorkers: " + maxWorkers);
+        
         bootstrap = new ServerBootstrap(factory);
         bootstrap.setOption("child.tcpNoDelay", true);
         bootstrap.setOption("child.receiveBufferSize", buffer_size);
@@ -72,34 +102,106 @@ class Server implements IConnection {
         Channel channel = bootstrap.bind(new InetSocketAddress(port));
         allChannels.add(channel);
     }
+    
+    private ArrayList<TaskMessage>[] groupMessages(List<TaskMessage> msgs) {
+      ArrayList<TaskMessage> messageGroups[] = new ArrayList[queueCount];
+      
+      for (int i = 0; i < msgs.size(); i++) {
+        TaskMessage message = msgs.get(i);
+        int task = message.task();
+        
+        if (task == -1) {
+          closing = true;
+          return null;
+        }
+        
+        Integer queueId = getMessageQueueId(task);
+        
+        if (null == messageGroups[queueId]) {
+          messageGroups[queueId] = new ArrayList<TaskMessage>();
+        }
+        messageGroups[queueId].add(message);
+      }
+      return messageGroups;
+    }
+    
+    private Integer getMessageQueueId(int task) {
+      Integer queueId = taskToQueueId.get(task);
+      if (null == queueId) {
+        synchronized(taskToQueueId) {
+          //assign task to a queue in round-robin manner
+          if (null == (queueId = taskToQueueId.get(task))) {
+            queueId = roundRobinQueueId++;
+            taskToQueueId.put(task, queueId);
+            if (roundRobinQueueId == queueCount) {
+              roundRobinQueueId = 0;
+            }
+          }
+        }
+      }
+      return queueId;
+    }
 
     /**
      * enqueue a received message 
      * @param message
      * @throws InterruptedException
      */
-    protected void enqueue(TaskMessage message) throws InterruptedException {
-        message_queue.put(message);
-        LOG.debug("message received with task: {}, payload size: {}", message.task(), message.message().length);
-    }
+    protected void enqueue(List<TaskMessage> msgs) throws InterruptedException {
+      
+      if (null == msgs || msgs.size() == 0 || closing) {
+        return;
+      }
+      
+      ArrayList<TaskMessage> messageGroups[] = groupMessages(msgs);
+      
+      if (null == messageGroups || closing) {
+        return;
+      }
+      
+      for (int receiverId = 0; receiverId < messageGroups.length; receiverId++) {
+        ArrayList<TaskMessage> msgGroup = messageGroups[receiverId];
+        if (null != msgGroup) {
+          message_queue[receiverId].put(msgGroup);
+        }
+      }
+   }
     
     /**
      * fetch a message from message queue synchronously (flags != 1) or asynchronously (flags==1)
      */
-    public TaskMessage recv(int flags)  {
-        if ((flags & 0x01) == 0x01) { 
+    public Iterator<TaskMessage> recv(int flags)  {
+      if (queueCount > 1) {
+        throw new RuntimeException("Use recv(int flags, int clientId) instead, as we have worker.receiver.thread.count=" + queueCount + " receive threads, clientId should be 0 <= clientId < " + queueCount);
+      }
+      return recv(flags, 0);
+    }
+    
+    public Iterator<TaskMessage> recv(int flags, int receiverId)  {
+      if (closing) {
+        return closeMessage.iterator();
+      }
+      
+      ArrayList<TaskMessage> ret = null; 
+      int queueId = receiverId % queueCount;
+      if ((flags & 0x01) == 0x01) { 
             //non-blocking
-            return message_queue.poll();
+            ret = message_queue[queueId].poll();
         } else {
             try {
-                TaskMessage request = message_queue.take();
+                ArrayList<TaskMessage> request = message_queue[queueId].take();
                 LOG.debug("request to be processed: {}", request);
-                return request;
+                ret = request;
             } catch (InterruptedException e) {
                 LOG.info("exception within msg receiving", e);
-                return null;
+                ret = null;
             }
         }
+      
+      if (null != ret) {
+        return ret.iterator();
+      }
+      return null;
     }
 
     /**
@@ -133,4 +235,12 @@ class Server implements IConnection {
     public void send(int task, byte[] message) {
         throw new RuntimeException("Server connection should not send any messages");
     }
+    
+    public void send(Iterator<TaskMessage> msgs) {
+      throw new RuntimeException("Server connection should not send any messages");
+    }
+    
+    public String name() {
+      return "Netty-server-localhost-" + port;
+    }
 }
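
Editor's note: the Server changes above replace the single message_queue with an array of queues, one per receiver thread. groupMessages buckets incoming TaskMessages by task id, getMessageQueueId pins each task to a queue round-robin, and recv(flags, receiverId) drains one queue as an Iterator. A minimal sketch of that task-to-queue assignment, with illustrative names and the whole method synchronized for simplicity:

    import java.util.HashMap;
    import java.util.Map;

    class TaskQueueAssigner {
        private final Map<Integer, Integer> taskToQueueId = new HashMap<Integer, Integer>();
        private final int queueCount;
        private int roundRobinQueueId = 0;

        TaskQueueAssigner(int queueCount) {
            this.queueCount = queueCount;
        }

        // First sighting of a task id assigns the next queue in rotation;
        // later calls return the same sticky assignment.
        synchronized int queueIdFor(int task) {
            Integer queueId = taskToQueueId.get(task);
            if (queueId == null) {
                queueId = roundRobinQueueId;
                roundRobinQueueId = (roundRobinQueueId + 1) % queueCount;
                taskToQueueId.put(task, queueId);
            }
            return queueId;
        }
    }

Stickiness matters here: once a task lands on a queue, all of its messages are consumed by the same receiver thread, so per-task ordering is preserved across batches.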

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/861a92ea/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
index 43a8c39..4c70518 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
@@ -18,70 +18,24 @@
 package backtype.storm.messaging.netty;
 
 import java.net.ConnectException;
-import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.net.ConnectException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 public class StormClientHandler extends SimpleChannelUpstreamHandler  {
     private static final Logger LOG = LoggerFactory.getLogger(StormClientHandler.class);
     private Client client;
-    long start_time;
     
     StormClientHandler(Client client) {
         this.client = client;
-        start_time = System.currentTimeMillis();
-    }
-
-    @Override
-    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent event) {
-        //register the newly established channel
-        Channel channel = event.getChannel();
-        client.setChannel(channel);
-        LOG.info("connection established from "+channel.getLocalAddress()+" to "+channel.getRemoteAddress());
-        
-        //send next batch of requests if any
-        try {
-            client.tryDeliverMessages(false);
-        } catch (Exception ex) {
-            LOG.info("exception when sending messages:", ex.getMessage());
-            client.reconnect();
-        }
-    }
-
-    @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) {
-        LOG.debug("send/recv time (ms): {}", (System.currentTimeMillis() - start_time));
-        
-        //examine the response message from server
-        ControlMessage msg = (ControlMessage)event.getMessage();
-        if (msg==ControlMessage.FAILURE_RESPONSE)
-            LOG.info("failure response:{}", msg);
-
-        //send next batch of requests if any
-        try {
-            client.tryDeliverMessages(false);
-        } catch (Exception ex) {
-            LOG.info("exception when sending messages:", ex.getMessage());
-            client.reconnect();
-        }
     }
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
         Throwable cause = event.getCause();
         if (!(cause instanceof ConnectException)) {
-            LOG.info("Connection to "+client.remote_addr+" failed:", cause);
-        }
-        client.reconnect();
+            LOG.info("Connection failed " + client.name(), cause);
+        } 
     }
 }
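
Editor's note: StormClientHandler shrinks because delivery is no longer response-driven. channelConnected and messageReceived used to trigger the next tryDeliverMessages round trip, whereas the async client now pushes batches from its own sender path, and reconnects are handled inside Client rather than in exceptionCaught. A rough sketch of push-style batching on the client side; the names and structure are illustrative, not the committed Client.java:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    class AsyncBatchSender implements Runnable {
        private final BlockingQueue<byte[]> pending = new LinkedBlockingQueue<byte[]>();
        private final int batchSize;
        private volatile boolean running = true;

        AsyncBatchSender(int batchSize) {
            this.batchSize = batchSize;
        }

        void send(byte[] payload) {
            pending.offer(payload);
        }

        @Override
        public void run() {
            List<byte[]> batch = new ArrayList<byte[]>(batchSize);
            while (running) {
                try {
                    batch.add(pending.take());             // block for the first message
                    pending.drainTo(batch, batchSize - 1); // then take whatever else is ready
                    flush(batch);                          // one channel write per batch
                    batch.clear();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        private void flush(List<byte[]> batch) {
            // channel.write(...) with the whole batch in the real client; elided here
        }
    }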

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/861a92ea/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
index 093fb61..8b93e31 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
@@ -18,10 +18,14 @@
 package backtype.storm.messaging.netty;
 
 import backtype.storm.messaging.TaskMessage;
-import org.jboss.netty.channel.*;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
+import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 class StormServerHandler extends SimpleChannelUpstreamHandler  {
@@ -41,30 +45,22 @@ class StormServerHandler extends SimpleChannelUpstreamHandler  {
     
     @Override
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
-        Object msg = e.getMessage();  
-        if (msg == null) return;
-
-        //end of batch?
-        if (msg==ControlMessage.EOB_MESSAGE) {
-            Channel channel = ctx.getChannel();
-            LOG.debug("Send back response ...");
-            if (failure_count.get()==0)
-                channel.write(ControlMessage.OK_RESPONSE);
-            else channel.write(ControlMessage.FAILURE_RESPONSE);
-            return;
-        }
-        
-        //enqueue the received message for processing
-        try {
-            server.enqueue((TaskMessage)msg);
-        } catch (InterruptedException e1) {
-            LOG.info("failed to enqueue a request message", e);
-            failure_count.incrementAndGet();
-        }
+      List<TaskMessage> msgs = (List<TaskMessage>) e.getMessage();
+      if (msgs == null) {
+        return;
+      }
+      
+      try {
+        server.enqueue(msgs);
+      } catch (InterruptedException e1) {
+        LOG.info("failed to enqueue a request message", e);
+        failure_count.incrementAndGet();
+      }
     }
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+        LOG.error("server errors in handling the request", e.getCause());
         server.closeChannel(e.getChannel());
     }
 }
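
Editor's note: on the server side the handler now receives whole batches. MessageDecoder hands up a List<TaskMessage> instead of one message (or an EOB control frame) at a time, and the OK/FAILURE response round trip disappears along with it. A method sketch of what batch framing of this kind typically looks like; the frame layout below ([short taskId][int length][payload]) is an assumption, not necessarily what this commit's MessageDecoder does:

    import java.util.ArrayList;
    import java.util.List;
    import org.jboss.netty.buffer.ChannelBuffer;
    import backtype.storm.messaging.TaskMessage;

    // Decode as many complete frames as the buffer holds and return them
    // as one batch; return null when no complete frame is available yet.
    static List<TaskMessage> decodeBatch(ChannelBuffer buf) {
        List<TaskMessage> batch = new ArrayList<TaskMessage>();
        while (buf.readableBytes() >= 6) {   // short task id + int length
            buf.markReaderIndex();
            short task = buf.readShort();
            int len = buf.readInt();
            if (buf.readableBytes() < len) {
                buf.resetReaderIndex();      // partial frame: wait for more bytes
                break;
            }
            byte[] payload = new byte[len];
            buf.readBytes(payload);
            batch.add(new TaskMessage(task, payload));
        }
        return batch.isEmpty() ? null : batch;
    }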

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/861a92ea/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index 56751c6..8c5b466 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -51,8 +51,11 @@ public class DisruptorQueue implements IStatefulObject {
     // TODO: consider having a threadlocal cache of this variable to speed up reads?
     volatile boolean consumerStartedFlag = false;
     ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue();
+    private static String PREFIX = "disruptor-";
+    private String _queueName = "";
     
-    public DisruptorQueue(ClaimStrategy claim, WaitStrategy wait) {
+    public DisruptorQueue(String queueName, ClaimStrategy claim, WaitStrategy wait) {
+         this._queueName = PREFIX + queueName;
         _buffer = new RingBuffer<MutableObject>(new ObjectEventFactory(), claim, wait);
         _consumer = new Sequence();
         _barrier = _buffer.newBarrier();
@@ -62,6 +65,10 @@ public class DisruptorQueue implements IStatefulObject {
         }
     }
     
+    public String getName() {
+      return _queueName;
+    }
+    
     public void consumeBatch(EventHandler<Object> handler) {
         consumeBatchToCursor(_barrier.getCursor(), handler);
     }
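
Editor's note: naming DisruptorQueues is what carries the "name the threads" part of this commit; every queue now gets a "disruptor-" prefixed label that can surface in thread names and debugging output. A usage sketch, assuming the LMAX Disruptor 2.x strategy classes Storm depends on at this point; the queue name is illustrative:

    import backtype.storm.utils.DisruptorQueue;
    import com.lmax.disruptor.BlockingWaitStrategy;
    import com.lmax.disruptor.SingleThreadedClaimStrategy;

    DisruptorQueue sendQueue = new DisruptorQueue(
            "worker-transfer-queue",               // getName() -> "disruptor-worker-transfer-queue"
            new SingleThreadedClaimStrategy(1024), // ring size, must be a power of two
            new BlockingWaitStrategy());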

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/861a92ea/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java
index a1fed96..b1892f1 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Utils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java
@@ -25,6 +25,7 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.io.PrintStream;
 import java.net.URL;
 import java.net.URLDecoder;
 import java.nio.ByteBuffer;
@@ -37,6 +38,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Scanner;
 import java.util.TreeMap;
 import java.util.UUID;
 
@@ -301,15 +303,39 @@ public class Utils {
     }
     
     public static Integer getInt(Object o) {
-        if(o instanceof Long) {
-            return ((Long) o ).intValue();
-        } else if (o instanceof Integer) {
-            return (Integer) o;
-        } else if (o instanceof Short) {
-            return ((Short) o).intValue();
-        } else {
-            throw new IllegalArgumentException("Don't know how to convert " + o + " + to int");
-        }
+      Integer result = getInt(o, null);
+      if (null == result) {
+        throw new IllegalArgumentException("Don't know how to convert null + to int");
+      }
+      return result;
+    }
+    
+    public static Integer getInt(Object o, Integer defaultValue) {
+      if (null == o) {
+        return defaultValue;
+      }
+      
+      if(o instanceof Long) {
+          return ((Long) o ).intValue();
+      } else if (o instanceof Integer) {
+          return (Integer) o;
+      } else if (o instanceof Short) {
+          return ((Short) o).intValue();
+      } else {
+          throw new IllegalArgumentException("Don't know how to convert " + o + " to int");
+      }
+    }
+
+    public static boolean getBoolean(Object o, boolean defaultValue) {
+      if (null == o) {
+        return defaultValue;
+      }
+      
+      if(o instanceof Boolean) {
+          return (Boolean) o;
+      } else {
+          throw new IllegalArgumentException("Don't know how to convert " + o + " to boolean");
+      }
     }
     
     public static long secureRandomLong() {
@@ -373,6 +399,25 @@ public class Utils {
         ret.start();
         return ret;
     }
+    
+    public static void redirectStreamAsync(Process process) {
+      redirectStreamAsync(process.getInputStream(), System.out);
+      redirectStreamAsync(process.getErrorStream(), System.err);
+    }
+    
+    static void redirectStreamAsync(final InputStream input,
+        final PrintStream output) {
+      new Thread(new Runnable() {
+        @Override
+        public void run() {
+          Scanner scanner = new Scanner(input);
+          while (scanner.hasNextLine()) {
+            output.println(scanner.nextLine());
+          }
+        }
+      }).start();
+    }
+ 
 
     public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port) {
         CuratorFramework ret = newCurator(conf, servers, port);
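
Editor's note: the Utils additions are small conveniences the rest of the commit leans on. getInt(Object, Integer) and getBoolean(Object, boolean) make config reads null-tolerant with defaults, and redirectStreamAsync mirrors a child process's stdout/stderr from background threads without blocking the caller. A usage fragment ("worker.receiver.thread.count" is the key referenced by Server.recv above; the other key and the command are illustrative, and the Process line throws IOException):

    Map conf = Utils.readStormConfig();
    int receiverThreads = Utils.getInt(conf.get("worker.receiver.thread.count"), 1);
    boolean debug = Utils.getBoolean(conf.get("topology.debug"), false);

    Process p = new ProcessBuilder("some-child-command").start();
    Utils.redirectStreamAsync(p);  // pump child stdout/stderr onto ours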

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/861a92ea/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
index f271607..d76e245 100644
--- a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
@@ -37,7 +37,8 @@
         server (.bind context nil port)
         client (.connect context nil "localhost" port)
         _ (.send client task (.getBytes req_msg))
-        resp (.recv server 0)]
+        iter (.recv server 0)
+        resp (.next iter)]
     (is (= task (.task resp)))
     (is (= req_msg (String. (.message resp))))
     (.close client)
@@ -58,7 +59,8 @@
         server (.bind context nil port)
         client (.connect context nil "localhost" port)
         _ (.send client task (.getBytes req_msg))
-        resp (.recv server 0)]
+        iter (.recv server 0)
+        resp (.next iter)]
     (is (= task (.task resp)))
     (is (= req_msg (String. (.message resp))))
     (.close client)
@@ -77,15 +79,23 @@
                     }
         context (TransportFactory/makeContext storm-conf)
         client (.connect context nil "localhost" port)
+        
+        server (Thread.
+                (fn []
+                  (Thread/sleep 1000)
+                  (let [server (.bind context nil port)
+                        iter (.recv server 0)
+                        resp (.next iter)]
+                    (is (= task (.task resp)))
+                    (is (= req_msg (String. (.message resp))))
+                    (.close server) 
+                  )))
+        _ (.start server)
         _ (.send client task (.getBytes req_msg))
-        _ (Thread/sleep 1000)
-        server (.bind context nil port)
-        resp (.recv server 0)]
-    (is (= task (.task resp)))
-    (is (= req_msg (String. (.message resp))))
+        ]
     (.close client)
-    (.close server)
-    (.term context)))    
+    (.join server)
+    (.term context)))
 
 (deftest test-batch
   (let [storm-conf {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
@@ -102,11 +112,21 @@
     (doseq [num  (range 1 100000)]
       (let [req_msg (str num)]
         (.send client task (.getBytes req_msg))))
-    (doseq [num  (range 1 100000)]
+    
+    (let [resp (ArrayList.)
+          received (atom 0)]
+      (while (< @received (- 100000 1))
+        (let [iter (.recv server 0)]
+          (while (.hasNext iter)
+            (let [msg (.next iter)]
+              (.add resp msg)
+              (swap! received inc)
+              ))))
+      (doseq [num  (range 1 100000)]
       (let [req_msg (str num)
-            resp (.recv server 0)
-            resp_msg (String. (.message resp))]
-        (is (= req_msg resp_msg))))
+            resp_msg (String. (.message (.get resp (- num 1))))]
+        (is (= req_msg resp_msg)))))
+   
     (.close client)
     (.close server)
     (.term context)))
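
Editor's note: the reworked test-batch drains through the new iterator API: recv now returns a whole batch, so the test loops until all 99999 messages (range 1 to 100000, exclusive) have been collected before checking order. Roughly the same drain loop in Java, with server, List, Iterator, and TaskMessage assumed in scope:

    List<TaskMessage> collected = new ArrayList<TaskMessage>();
    while (collected.size() < 99999) {
        Iterator<TaskMessage> it = server.recv(0);   // blocking fetch of one batch
        while (it != null && it.hasNext()) {
            collected.add(it.next());
        }
    }
    // messages arrive in send order for a given task, so collected.get(i) equals str(i + 1)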

