storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ptgo...@apache.org
Subject [2/4] git commit: remove 0MQ and replace with netty
Date Sat, 21 Dec 2013 02:26:21 GMT
remove 0MQ and replace with netty


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/b63ed139
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/b63ed139
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/b63ed139

Branch: refs/heads/master
Commit: b63ed13946eae3acaab4a51bba705124701207eb
Parents: 1bcc169
Author: P. Taylor Goetz <ptgoetz@gmail.com>
Authored: Tue Dec 10 11:23:11 2013 -0500
Committer: P. Taylor Goetz <ptgoetz@gmail.com>
Committed: Tue Dec 10 11:23:11 2013 -0500

----------------------------------------------------------------------
 MODULES                                         |   1 -
 bin/install_zmq.sh                              |  31 ---
 conf/defaults.yaml                              |   2 +-
 storm-core/project.clj                          |   2 +-
 .../src/clj/backtype/storm/messaging/zmq.clj    |  93 ---------
 storm-core/src/clj/backtype/storm/testing.clj   |   2 +-
 storm-core/src/clj/zilch/mq.clj                 | 104 ----------
 .../backtype/storm/messaging/netty/Client.java  | 204 ++++++++++++++++++
 .../backtype/storm/messaging/netty/Context.java |  50 +++++
 .../storm/messaging/netty/ControlMessage.java   |  50 +++++
 .../storm/messaging/netty/MessageBatch.java     | 151 ++++++++++++++
 .../storm/messaging/netty/MessageDecoder.java   |  68 ++++++
 .../storm/messaging/netty/MessageEncoder.java   |  22 ++
 .../backtype/storm/messaging/netty/Server.java  | 119 +++++++++++
 .../messaging/netty/StormClientHandler.java     | 104 ++++++++++
 .../netty/StormClientPipelineFactory.java       |  27 +++
 .../messaging/netty/StormServerHandler.java     |  53 +++++
 .../netty/StormServerPipelineFactory.java       |  28 +++
 .../storm/messaging/netty_integration_test.clj  |  44 ++++
 .../storm/messaging/netty_unit_test.clj         |  97 +++++++++
 .../test/clj/backtype/storm/messaging_test.clj  |   2 +-
 storm-core/test/clj/zilch/test/mq.clj           |  86 --------
 storm-netty/project.clj                         |  13 --
 .../backtype/storm/messaging/netty/Client.java  | 205 -------------------
 .../backtype/storm/messaging/netty/Context.java |  50 -----
 .../storm/messaging/netty/ControlMessage.java   |  51 -----
 .../storm/messaging/netty/MessageBatch.java     | 153 --------------
 .../storm/messaging/netty/MessageDecoder.java   |  68 ------
 .../storm/messaging/netty/MessageEncoder.java   |  22 --
 .../backtype/storm/messaging/netty/Server.java  | 120 -----------
 .../messaging/netty/StormClientHandler.java     | 111 ----------
 .../netty/StormClientPipelineFactory.java       |  27 ---
 .../messaging/netty/StormServerHandler.java     |  59 ------
 .../netty/StormServerPipelineFactory.java       |  28 ---
 .../storm/messaging/netty_integration_test.clj  |  44 ----
 .../storm/messaging/netty_unit_test.clj         |  97 ---------
 36 files changed, 1021 insertions(+), 1367 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/MODULES
----------------------------------------------------------------------
diff --git a/MODULES b/MODULES
index 76c078a..aa29093 100644
--- a/MODULES
+++ b/MODULES
@@ -1,4 +1,3 @@
 storm-console-logging
 storm-core
-storm-netty
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/bin/install_zmq.sh
----------------------------------------------------------------------
diff --git a/bin/install_zmq.sh b/bin/install_zmq.sh
deleted file mode 100755
index dc744f1..0000000
--- a/bin/install_zmq.sh
+++ /dev/null
@@ -1,31 +0,0 @@
-#!/bin/bash
-export JAVA_HOME=${JAVA_HOME:/usr/libexec/java_home}
-
-if [ ! -d "$JAVA_HOME/include" ]; then
-    echo "
-Looks like you're missing your 'include' directory. If you're using Mac OS X, You'll need to install the Java dev package.
-
-- Navigate to http://goo.gl/D8lI
-- Click the Java tab on the right
-- Install the appropriate version and try again.
-"
-    exit -1;
-fi
-
-#install zeromq
-wget http://download.zeromq.org/zeromq-2.1.7.tar.gz
-tar -xzf zeromq-2.1.7.tar.gz
-cd zeromq-2.1.7
-./configure
-make
-sudo make install
-
-cd ../
-
-#install jzmq (both native and into local maven cache)
-git clone https://github.com/nathanmarz/jzmq.git
-cd jzmq
-./autogen.sh
-./configure
-make
-sudo make install

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index a5b31f4..35e7b00 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -18,7 +18,7 @@ storm.zookeeper.retry.intervalceiling.millis: 30000
 storm.cluster.mode: "distributed" # can be distributed or local
 storm.local.mode.zmq: false
 storm.thrift.transport: "backtype.storm.security.auth.SimpleTransportPlugin"
-storm.messaging.transport: "backtype.storm.messaging.zmq"
+storm.messaging.transport: "backtype.storm.messaging.netty.Context"
 
 ### nimbus.* configs are for the master
 nimbus.host: "localhost"

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/project.clj
----------------------------------------------------------------------
diff --git a/storm-core/project.clj b/storm-core/project.clj
index 0eaa6a3..1b7226a 100644
--- a/storm-core/project.clj
+++ b/storm-core/project.clj
@@ -10,7 +10,6 @@
                  [clj-time "0.4.1"]
                  [com.netflix.curator/curator-framework "1.0.1"
                   :exclusions [log4j/log4j]]
-                 [backtype/jzmq "2.1.0"]
                  [com.googlecode.json-simple/json-simple "1.1"]
                  [compojure "1.1.3"]
                  [hiccup "0.3.6"]
@@ -27,6 +26,7 @@
                  [com.google.guava/guava "13.0"]
                  [ch.qos.logback/logback-classic "1.0.6"]
                  [org.slf4j/log4j-over-slf4j "1.6.6"]
+                 [io.netty/netty "3.6.3.Final"]
                  ]
 
   :source-paths ["src/clj"]

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/clj/backtype/storm/messaging/zmq.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/messaging/zmq.clj b/storm-core/src/clj/backtype/storm/messaging/zmq.clj
deleted file mode 100644
index 23e263e..0000000
--- a/storm-core/src/clj/backtype/storm/messaging/zmq.clj
+++ /dev/null
@@ -1,93 +0,0 @@
-(ns backtype.storm.messaging.zmq
-  (:refer-clojure :exclude [send])
-  (:use [backtype.storm config log])
-  (:import [backtype.storm.messaging IContext IConnection TaskMessage])
-  (:import [java.nio ByteBuffer])
-  (:import [org.zeromq ZMQ])
-  (:import [java.util Map])
-  (:require [zilch.mq :as mq])
-  (:gen-class
-    :methods [^{:static true} [makeContext [java.util.Map] backtype.storm.messaging.IContext]]))
-
-(defn mk-packet [task ^bytes message]
-  (let [bb (ByteBuffer/allocate (+ 2 (count message)))]
-    (.putShort bb (short task))
-    (.put bb message)
-    (.array bb)
-    ))
-
-(defn parse-packet [^bytes packet]
-  (let [bb (ByteBuffer/wrap packet)
-        port (.getShort bb)
-        msg (byte-array (- (count packet) 2))]
-    (.get bb msg)
-    (TaskMessage. (int port) msg)
-    ))
-
-(defn get-bind-zmq-url [local? port]
-  (if local?
-    (str "ipc://" port ".ipc")
-    (str "tcp://*:" port)))
-
-(defn get-connect-zmq-url [local? host port]
-  (if local?
-    (str "ipc://" port ".ipc")
-    (str "tcp://" host ":" port)))
-
-
-(defprotocol ZMQContextQuery
-  (zmq-context [this]))
-
-(deftype ZMQConnection [socket]
-  IConnection
-  (^TaskMessage recv [this ^int flags]
-    (require 'backtype.storm.messaging.zmq)
-    (if-let [packet (mq/recv socket flags)]
-      (parse-packet packet)))
-  (^void send [this ^int taskId ^bytes payload]
-    (require 'backtype.storm.messaging.zmq)
-    (mq/send socket (mk-packet taskId payload) ZMQ/NOBLOCK)) ;; TODO: how to do backpressure if doing noblock?... need to only unblock if the target disappears
-  (^void close [this]
-    (.close socket)))
-
-(defn mk-connection [socket]
-  (ZMQConnection. socket))
-
-(deftype ZMQContext [^{:unsynchronized-mutable true} context 
-                     ^{:unsynchronized-mutable true} linger-ms 
-                     ^{:unsynchronized-mutable true} hwm 
-                     ^{:unsynchronized-mutable true} local?]
-  IContext
-  (^void prepare [this ^Map storm-conf]
-    (let [num-threads (.get storm-conf ZMQ-THREADS)]
-      (set! context (mq/context num-threads)) 
-      (set! linger-ms (.get storm-conf ZMQ-LINGER-MILLIS))
-      (set! hwm (.get storm-conf ZMQ-HWM))
-      (set! local? (= (.get storm-conf STORM-CLUSTER-MODE) "local"))))
-  (^IConnection bind [this ^String storm-id ^int port]
-    (require 'backtype.storm.messaging.zmq)
-    (-> context
-      (mq/socket mq/pull)
-      (mq/set-hwm hwm)
-      (mq/bind (get-bind-zmq-url local? port))
-      mk-connection
-      ))
-  (^IConnection connect [this ^String storm-id ^String host ^int port]
-    (require 'backtype.storm.messaging.zmq)
-    (-> context
-      (mq/socket mq/push)
-      (mq/set-hwm hwm)
-      (mq/set-linger linger-ms)
-      (mq/connect (get-connect-zmq-url local? host port))
-      mk-connection))
-  (^void term [this]
-    (.term context))
-  
-  ZMQContextQuery
-  (zmq-context [this]
-    context))
-
-(defn -makeContext [^Map storm-conf] 
-  (let [context (ZMQContext. nil 0 0 true)]
-    (.prepare context storm-conf)
-    context))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/clj/backtype/storm/testing.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/testing.clj b/storm-core/src/clj/backtype/storm/testing.clj
index 700dce6..a17743a 100644
--- a/storm-core/src/clj/backtype/storm/testing.clj
+++ b/storm-core/src/clj/backtype/storm/testing.clj
@@ -96,7 +96,7 @@
 ;; local dir is always overridden in maps
 ;; can customize the supervisors (except for ports) by passing in map for :supervisors parameter
 ;; if need to customize amt of ports more, can use add-supervisor calls afterwards
-(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :supervisor-slot-port-min 1]
+(defnk mk-local-storm-cluster [:supervisors 2 :ports-per-supervisor 3 :daemon-conf {} :inimbus nil :supervisor-slot-port-min 1024]
   (let [zk-tmp (local-temp-path)
         [zk-port zk-handle] (zk/mk-inprocess-zookeeper zk-tmp)
         daemon-conf (merge (read-storm-config)

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/clj/zilch/mq.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/zilch/mq.clj b/storm-core/src/clj/zilch/mq.clj
deleted file mode 100644
index 27c2094..0000000
--- a/storm-core/src/clj/zilch/mq.clj
+++ /dev/null
@@ -1,104 +0,0 @@
-;; Copyright 2011 Tim Dysinger
-
-;;    Licensed under the Apache License, Version 2.0 (the "License");
-;;    you may not use this file except in compliance with the License.
-;;    You may obtain a copy of the License at
-
-;;        http://www.apache.org/licenses/LICENSE-2.0
-
-;;    Unless required by applicable law or agreed to in writing, software
-;;    distributed under the License is distributed on an "AS IS" BASIS,
-;;    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;;    See the License for the specific language governing permissions and
-;;    limitations under the License.
-
-(ns zilch.mq
-  (:refer-clojure :exclude [send])
-  )
-
-(defmacro zeromq-imports []
-  '(do
-    (import '[org.zeromq ZMQ ZMQ$Context ZMQ$Socket])
-    ))
-
-(zeromq-imports)
-
-(defn ^ZMQ$Context context [threads]
-  (ZMQ/context threads))
-
-(defmacro with-context
-  [id threads & body]
-  `(let [~id (context ~threads)]
-     (try ~@body
-          (finally (.term ~id)))))
-
-(def sndmore ZMQ/SNDMORE)
-
-(def req ZMQ/REQ)
-(def rep ZMQ/REP)
-(def xreq ZMQ/XREQ)
-(def xrep ZMQ/XREP)
-(def pub ZMQ/PUB)
-(def sub ZMQ/SUB)
-(def pair ZMQ/PAIR)
-(def push ZMQ/PUSH)
-(def pull ZMQ/PULL)
-
-(defn ^bytes barr [& arr]
-  (byte-array (map byte arr)))
-
-(defn ^ZMQ$Socket socket
-  [^ZMQ$Context context type]
-  (.socket context type))
-
-(defn set-linger
-  [^ZMQ$Socket socket linger-ms]
-  (doto socket
-    (.setLinger (long linger-ms))))
-
-(defn set-hwm
-  [^ZMQ$Socket socket hwm]
-  (if hwm
-    (doto socket
-      (.setHWM (long hwm)))
-    socket
-    ))
-
-(defn bind
-  [^ZMQ$Socket socket url]
-  (doto socket
-    (.bind url)))
-
-(defn connect
-  [^ZMQ$Socket socket url]
-  (doto socket
-    (.connect url)))
-
-(defn subscribe
-  ([^ZMQ$Socket socket ^bytes topic]
-     (doto socket
-       (.subscribe topic)))
-  ([^ZMQ$Socket socket]
-     (subscribe socket (byte-array []))))
-
-(defn unsubscribe
-  ([^ZMQ$Socket socket ^bytes topic]
-     (doto socket
-       (.unsubscribe (.getBytes topic))))
-  ([^ZMQ$Socket socket]
-     (unsubscribe socket "")))
-
-(defn send
-  ([^ZMQ$Socket socket ^bytes message flags]
-     (.send socket message flags))
-  ([^ZMQ$Socket socket ^bytes message]
-     (send socket message ZMQ/NOBLOCK)))
-
-(defn recv-more? [^ZMQ$Socket socket]
-  (.hasReceiveMore socket))
-
-(defn recv
-  ([^ZMQ$Socket socket flags]
-     (.recv socket flags))
-  ([^ZMQ$Socket socket]
-     (recv socket 0)))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
new file mode 100644
index 0000000..c2b391a
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -0,0 +1,204 @@
+package backtype.storm.messaging.netty;
+
+import backtype.storm.Config;
+import backtype.storm.messaging.IConnection;
+import backtype.storm.messaging.TaskMessage;
+import backtype.storm.utils.Utils;
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Client end of a Netty connection to a remote worker.
+ *
+ * send() only enqueues messages; takeMessages() groups queued entries into a MessageBatch
+ * limited by the configured buffer size. reconnect() re-attempts the connection with a
+ * randomized exponential back-off and closes the client once max_retries is exceeded.
+ */
+class Client implements IConnection {
+    private static final Logger LOG = LoggerFactory.getLogger(Client.class);
+    // largest shift for which (1 << n) is still a positive int
+    private static final int MAX_BACKOFF_SHIFT = 30;
+    private final int max_retries;
+    private final int base_sleep_ms;
+    private final int max_sleep_ms;
+    private final LinkedBlockingQueue<Object> message_queue; //entry should either be TaskMessage or ControlMessage
+    private final AtomicReference<Channel> channelRef;
+    private final ClientBootstrap bootstrap;
+    private final InetSocketAddress remote_addr;
+    private final AtomicInteger retries;
+    private final Random random = new Random();
+    private final ChannelFactory factory;
+    private final int buffer_size;
+    private final AtomicBoolean being_closed;
+
+    /**
+     * Read the messaging settings from the Storm configuration, build the Netty bootstrap
+     * and start the first (asynchronous) connection attempt.
+     *
+     * @param storm_conf Storm configuration map holding the STORM_MESSAGING_NETTY_* settings
+     * @param host remote host name
+     * @param port remote port
+     */
+    @SuppressWarnings("rawtypes")
+    Client(Map storm_conf, String host, int port) {
+        message_queue = new LinkedBlockingQueue<Object>();
+        retries = new AtomicInteger(0);
+        channelRef = new AtomicReference<Channel>(null);
+        being_closed = new AtomicBoolean(false);
+
+        // Configure
+        buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
+        // the retry count is capped at 30 regardless of the configured value
+        max_retries = Math.min(30, Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES)));
+        base_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
+        max_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
+        int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
+
+        if (maxWorkers > 0) {
+            factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), maxWorkers);
+        } else {
+            factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+        }
+        bootstrap = new ClientBootstrap(factory);
+        bootstrap.setOption("tcpNoDelay", true);
+        bootstrap.setOption("sendBufferSize", buffer_size);
+        bootstrap.setOption("keepAlive", true);
+
+        // Set up the pipeline factory.
+        bootstrap.setPipelineFactory(new StormClientPipelineFactory(this));
+
+        // Start the connection attempt.
+        remote_addr = new InetSocketAddress(host, port);
+        bootstrap.connect(remote_addr);
+    }
+
+    /**
+     * We will retry connection with exponential back-off policy.
+     *
+     * Each call counts as one attempt; once the count exceeds max_retries the client is closed
+     * instead of reconnecting. If the back-off sleep is interrupted no connection attempt is
+     * made and the thread's interrupt status is restored.
+     */
+    void reconnect() {
+        try {
+            int tried_count = retries.incrementAndGet();
+            if (tried_count <= max_retries) {
+                Thread.sleep(getSleepTimeMs());
+                LOG.info("Reconnect ... [{}]", tried_count);
+                bootstrap.connect(remote_addr);
+                LOG.debug("connection started...");
+            } else {
+                LOG.warn("Remote address is not reachable. We will close this client.");
+                close();
+            }
+        } catch (InterruptedException e) {
+            LOG.warn("connection failed", e);
+            // do not hide the interruption from the calling thread
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * # of milliseconds to wait per exponential back-off policy
+     *
+     * The product is computed as a long: with up to 30 retries the random factor can approach
+     * 2^30, and an int multiplication by base_sleep_ms would overflow to a negative value that
+     * escapes the max_sleep_ms clamp and makes Thread.sleep() throw.
+     *
+     * @return base_sleep_ms times a random factor in [1, 2^retries), clamped to max_sleep_ms
+     */
+    private int getSleepTimeMs()
+    {
+        int shift = Math.min(Math.max(retries.get(), 0), MAX_BACKOFF_SHIFT);
+        int backoff = 1 << shift;
+        long sleepMs = (long) base_sleep_ms * Math.max(1, random.nextInt(backoff));
+        if ( sleepMs > max_sleep_ms )
+            sleepMs = max_sleep_ms;
+        // safe narrowing: the value is now bounded by max_sleep_ms, which is an int
+        return (int) sleepMs;
+    }
+
+    /**
+     * Enqueue a task message to be sent to server
+     *
+     * @param task destination task id
+     * @param message payload bytes
+     * @throws RuntimeException if the client is being closed, or if the thread is interrupted
+     *         while enqueuing (the interrupt status is restored before throwing)
+     */
+    public void send(int task, byte[] message) {
+        //throw exception if the client is being closed
+        if (being_closed.get()) {
+            throw new RuntimeException("Client is being closed, and does not take requests any more");
+        }
+
+        try {
+            message_queue.put(new TaskMessage(task, message));
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Take enqueued messages from the queue, blocking until at least one entry is available.
+     *
+     * The first entry is always added; further TaskMessages are appended while they fit into
+     * the batch. A CLOSE_MESSAGE always ends the batch.
+     *
+     * @return a batch holding at least one entry
+     * @throws InterruptedException if interrupted while waiting on the queue
+     */
+    MessageBatch takeMessages()  throws InterruptedException {
+        //1st message
+        MessageBatch batch = new MessageBatch(buffer_size);
+        Object msg = message_queue.take();
+        batch.add(msg);
+
+        //we will discard any message after CLOSE
+        if (msg==ControlMessage.CLOSE_MESSAGE)
+            return batch;
+
+        while (!batch.isFull()) {
+            //peek the next message
+            msg = message_queue.peek();
+            //no more messages
+            if (msg == null) break;
+
+            //we will discard any message after CLOSE
+            if (msg==ControlMessage.CLOSE_MESSAGE) {
+                message_queue.take();
+                batch.add(msg);
+                break;
+            }
+
+            //try to add this msg into batch
+            if (!batch.tryAdd((TaskMessage) msg))
+                break;
+
+            //remove this message
+            message_queue.take();
+        }
+
+        return batch;
+    }
+
+    /**
+     * gracefully close this client.
+     *
+     * A CLOSE message is enqueued behind the pending requests; only the first call has any
+     * effect. The being_closed flag is flipped before the CLOSE marker is enqueued so that
+     * send() calls starting afterwards are rejected rather than queued behind the marker.
+     */
+    public synchronized void close() {
+        if (being_closed.compareAndSet(false, true)) {
+            //enqueue a CLOSE message so that shutdown() will be invoked
+            try {
+                message_queue.put(ControlMessage.CLOSE_MESSAGE);
+            } catch (InterruptedException e) {
+                close_n_release();
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * close_n_release() is invoked after all messages have been sent.
+     *
+     * Closes the current channel (if any) and releases the channel factory's resources on a
+     * separate thread.
+     */
+    void  close_n_release() {
+        // read the reference once so the null check and the close act on the same channel
+        Channel channel = channelRef.get();
+        if (channel != null)
+            channel.close().awaitUninterruptibly();
+
+        //we need to release resources
+        new Thread(new Runnable() {
+            @Override
+            public void run() {
+                factory.releaseExternalResources();
+            }}).start();
+    }
+
+    /**
+     * Not supported on the client side.
+     *
+     * @throws RuntimeException always
+     */
+    public TaskMessage recv(int flags) {
+        throw new RuntimeException("Client connection should not receive any messages");
+    }
+
+    /**
+     * Record the current channel; a non-null channel also resets the retry counter.
+     */
+    void setChannel(Channel channel) {
+        channelRef.set(channel);
+        //reset retries
+        if (channel != null)
+            retries.set(0);
+    }
+
+}
+
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
new file mode 100644
index 0000000..018e0f9
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
@@ -0,0 +1,50 @@
+package backtype.storm.messaging.netty;
+
+import backtype.storm.messaging.IConnection;
+import backtype.storm.messaging.IContext;
+
+import java.util.Map;
+import java.util.Vector;
+
+/**
+ * IContext implementation that creates Netty Server and Client connections and remembers
+ * them so that term() can close them all.
+ */
+public class Context implements IContext {
+    @SuppressWarnings("rawtypes")
+    private Map storm_conf;
+    private volatile Vector<IConnection> connections;
+
+    /**
+     * initialization per Storm configuration
+     *
+     * @param storm_conf configuration handed to every Server/Client created afterwards
+     */
+    @SuppressWarnings("rawtypes")
+    public void prepare(Map storm_conf) {
+       this.storm_conf = storm_conf;
+       connections = new Vector<IConnection>();
+    }
+
+    /**
+     * establish a server with a binding port
+     *
+     * @param storm_id not used by this implementation
+     * @param port port handed to the new Server
+     * @return the newly created server connection
+     */
+    public IConnection bind(String storm_id, int port) {
+        IConnection server = new Server(storm_conf, port);
+        connections.add(server);
+        return server;
+    }
+
+    /**
+     * establish a connection to a remote server
+     *
+     * @param storm_id not used by this implementation
+     * @param host remote host
+     * @param port remote port
+     * @return the newly created client connection
+     */
+    public IConnection connect(String storm_id, String host, int port) {
+        IConnection client =  new Client(storm_conf, host, port);
+        connections.add(client);
+        return client;
+    }
+
+    /**
+     * terminate this context
+     *
+     * Closes every connection created through this context. Calling it again, or before
+     * prepare(), is a no-op.
+     */
+    public void term() {
+        Vector<IConnection> opened = connections;
+        if (opened == null)
+            return;
+        connections = null;
+
+        // iterate over a snapshot: a for-each over the live Vector would fail with
+        // ConcurrentModificationException if another thread were still adding connections
+        for (IConnection conn : opened.toArray(new IConnection[0])) {
+            conn.close();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java b/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
new file mode 100644
index 0000000..4cc2040
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
@@ -0,0 +1,50 @@
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+enum ControlMessage {
+    CLOSE_MESSAGE((short)-100),
+    EOB_MESSAGE((short)-201),
+    OK_RESPONSE((short)-200),
+    FAILURE_RESPONSE((short)-400);
+
+    private short code;
+
+    //private constructor
+    private ControlMessage(short code) {
+        this.code = code;
+    }
+
+    /**
+     * Return a control message per an encoded status code
+     * @param encoded
+     * @return
+     */
+    static ControlMessage mkMessage(short encoded) {
+        for(ControlMessage cm: ControlMessage.values()) {
+          if(encoded == cm.code) return cm;
+        }
+        return null;
+    }
+
+    int encodeLength() {
+        return 2; //short
+    }
+    
+    /**
+     * encode the current Control Message into a channel buffer
+     * @throws Exception
+     */
+    ChannelBuffer buffer() throws Exception {
+        ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encodeLength()));      
+        write(bout);
+        bout.close();
+        return bout.buffer();
+    }
+
+    void write(ChannelBufferOutputStream bout) throws Exception {
+        bout.writeShort(code);        
+    } 
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
new file mode 100644
index 0000000..a9d46a2
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
@@ -0,0 +1,151 @@
+package backtype.storm.messaging.netty;
+
+import backtype.storm.messaging.TaskMessage;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBufferOutputStream;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+import java.util.ArrayList;
+
+class MessageBatch {
+    private int buffer_size;
+    private ArrayList<Object> msgs;
+    private int encoded_length;
+
+    MessageBatch(int buffer_size) {
+        this.buffer_size = buffer_size;
+        msgs = new ArrayList<Object>();
+        encoded_length = ControlMessage.EOB_MESSAGE.encodeLength();
+    }
+
+    void add(Object obj) {
+        if (obj == null)
+            throw new RuntimeException("null object forbidded in message batch");
+
+        if (obj instanceof TaskMessage) {
+            TaskMessage msg = (TaskMessage)obj;
+            msgs.add(msg);
+            encoded_length += msgEncodeLength(msg);
+            return;
+        }
+
+        if (obj instanceof ControlMessage) {
+            ControlMessage msg = (ControlMessage)obj;
+            msgs.add(msg);
+            encoded_length += msg.encodeLength();
+            return;
+        }
+
+        throw new RuntimeException("Unsuppoted object type "+obj.getClass().getName());
+    }
+
+    void remove(Object obj) {
+        if (obj == null) return;
+
+        if (obj instanceof TaskMessage) {
+            TaskMessage msg = (TaskMessage)obj;
+            msgs.remove(msg);
+            encoded_length -= msgEncodeLength(msg);
+            return;
+        }
+
+        if (obj instanceof ControlMessage) {
+            ControlMessage msg = (ControlMessage)obj;
+            msgs.remove(msg);
+            encoded_length -= msg.encodeLength();
+            return;
+        }
+    }
+
+    Object get(int index) {
+        return msgs.get(index);
+    }
+
+    /**
+     * try to add a TaskMessage to a batch
+     * @param taskMsg
+     * @return false if the msg could not be added due to buffer size limit; true otherwise
+     */
+    boolean tryAdd(TaskMessage taskMsg) {
+        if ((encoded_length + msgEncodeLength(taskMsg)) > buffer_size) 
+            return false;
+        add(taskMsg);
+        return true;
+    }
+
+    private int msgEncodeLength(TaskMessage taskMsg) {
+        if (taskMsg == null) return 0;
+
+        int size = 6; //INT + SHORT
+        if (taskMsg.message() != null) 
+            size += taskMsg.message().length;
+        return size;
+    }
+
+    /**
+     * Has this batch used up allowed buffer size
+     * @return
+     */
+    boolean isFull() {
+        return encoded_length >= buffer_size;
+    }
+
+    /**
+     * true if this batch doesn't have any messages 
+     * @return
+     */
+    boolean isEmpty() {
+        return msgs.isEmpty();
+    }
+
+    /**
+     * # of msgs in this batch
+     * @return
+     */
+    int size() {
+        return msgs.size();
+    }
+
+    /**
+     * create a buffer containing the encoding of this batch
+     */
+    ChannelBuffer buffer() throws Exception {
+        ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encoded_length));
+        
+        for (Object msg : msgs) 
+            if (msg instanceof TaskMessage)
+                writeTaskMessage(bout, (TaskMessage)msg);
+            else
+                ((ControlMessage)msg).write(bout);
+        
+        //add a END_OF_BATCH indicator
+        ControlMessage.EOB_MESSAGE.write(bout);
+
+        bout.close();
+
+        return bout.buffer();
+    }
+
+    /**
+     * write a TaskMessage into a stream
+     *
+     * Each TaskMessage is encoded as:
+     *  task ... short(2)
+     *  len ... int(4)
+     *  payload ... byte[]     *  
+     */
+    private void writeTaskMessage(ChannelBufferOutputStream bout, TaskMessage message) throws Exception {
+        int payload_len = 0;
+        if (message.message() != null)
+            payload_len =  message.message().length;
+
+        int task_id = message.task();
+        if (task_id > Short.MAX_VALUE)
+            throw new RuntimeException("Task ID should not exceed "+Short.MAX_VALUE);
+        
+        bout.writeShort((short)task_id);
+        bout.writeInt(payload_len);
+        if (payload_len >0)
+            bout.write(message.message());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
new file mode 100644
index 0000000..76776a9
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
@@ -0,0 +1,68 @@
+package backtype.storm.messaging.netty;
+
+import backtype.storm.messaging.TaskMessage;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.frame.FrameDecoder;
+
+/**
+ * Frame decoder that turns the incoming byte stream into ControlMessage and TaskMessage
+ * objects, one per decode() call.
+ */
+public class MessageDecoder extends FrameDecoder {
+    /*
+     * Each ControlMessage is encoded as:
+     *  code (<0) ... short(2)
+     * Each TaskMessage is encoded as:
+     *  task (>=0) ... short(2)
+     *  len ... int(4)
+     *  payload ... byte[]
+     */
+    /**
+     * Decode a single message from the buffer.
+     *
+     * @return a ControlMessage, a TaskMessage (with a null payload when the encoded length is
+     *         not positive), or null when more bytes are needed; in the latter case the
+     *         reader index is left where it was on entry
+     */
+    @Override
+    protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception {
+        // Make sure that we have received at least a short
+        if (buf.readableBytes() < 2) {
+            //need more data
+            return null;
+        }
+
+        // Mark the current buffer position before reading task/len field
+        // because the whole frame might not be in the buffer yet.
+        // We will reset the buffer position to the marked position if
+        // there's not enough bytes in the buffer.
+        buf.markReaderIndex();
+
+        //read the short field
+        short code = buf.readShort();
+
+        //case 1: Control message
+        ControlMessage ctrl_msg = ControlMessage.mkMessage(code);
+        if (ctrl_msg != null) return ctrl_msg;
+
+        //case 2: task Message
+        short task = code;
+
+        // Make sure that we have received at least an integer (length)
+        if (buf.readableBytes() < 4) {
+            //need more data
+            buf.resetReaderIndex();
+            return null;
+        }
+
+        // Read the length field.
+        int length = buf.readInt();
+        if (length<=0) {
+            return new TaskMessage(task, null);
+        }
+
+        // Make sure if there's enough bytes in the buffer.
+        if (buf.readableBytes() < length) {
+            // The whole bytes were not received yet - return null.
+            buf.resetReaderIndex();
+            return null;
+        }
+
+        // There's enough bytes in the buffer. Copy them into an array of exactly the
+        // payload size; unlike ChannelBuffer.array() this does not depend on the buffer
+        // being heap-backed or on the size/offset of its backing array.
+        byte[] payload = new byte[length];
+        buf.readBytes(payload);
+
+        // Successfully decoded a frame.
+        // Return a TaskMessage object
+        return new TaskMessage(task, payload);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
new file mode 100644
index 0000000..c0ac8f1
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
@@ -0,0 +1,31 @@
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+
+/**
+ * Outbound encoder for the Storm Netty transport: converts a
+ * {@link ControlMessage} or a {@link MessageBatch} into whatever its own
+ * {@code buffer()} method returns, and rejects every other object type.
+ */
+public class MessageEncoder extends OneToOneEncoder {
+    @Override
+    protected Object encode(ChannelHandlerContext ctx, Channel channel, Object obj) throws Exception {
+        // Control messages know how to serialize themselves.
+        if (obj instanceof ControlMessage) {
+            ControlMessage control = (ControlMessage) obj;
+            return control.buffer();
+        }
+
+        // So do batches of task messages.
+        if (obj instanceof MessageBatch) {
+            MessageBatch batch = (MessageBatch) obj;
+            return batch.buffer();
+        }
+
+        // Anything else must never be written to a Storm messaging channel.
+        String className = obj.getClass().getName();
+        throw new RuntimeException("Unsupported encoding of object of class "+className);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
new file mode 100644
index 0000000..bf6825c
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
@@ -0,0 +1,168 @@
+package backtype.storm.messaging.netty;
+
+import backtype.storm.Config;
+import backtype.storm.messaging.IConnection;
+import backtype.storm.messaging.TaskMessage;
+import backtype.storm.utils.Utils;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFactory;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * Receiving side of the Netty transport. Binds a server socket on the given
+ * port, collects every channel it owns in a {@link ChannelGroup}, and hands
+ * decoded {@link TaskMessage}s to {@link #recv(int)} through an unbounded
+ * in-memory queue.
+ */
+class Server implements IConnection {
+    private static final Logger LOG = LoggerFactory.getLogger(Server.class);
+    @SuppressWarnings("rawtypes")
+    Map storm_conf;
+    int port;
+    private LinkedBlockingQueue<TaskMessage> message_queue;
+    // Set to null by close(); every reader must take a local copy and null-check it.
+    volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
+    final ChannelFactory factory;
+    final ServerBootstrap bootstrap;
+
+    /**
+     * Configures the Netty server from {@code storm_conf} and starts
+     * accepting connections on {@code port}.
+     *
+     * @param storm_conf must contain the Netty buffer size and server worker thread count
+     * @param port local port to bind
+     */
+    @SuppressWarnings("rawtypes")
+    Server(Map storm_conf, int port) {
+        this.storm_conf = storm_conf;
+        this.port = port;
+        message_queue = new LinkedBlockingQueue<TaskMessage>();
+
+        // Configure the server.
+        int buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
+        int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS));
+
+        // A non-positive worker count falls back to the factory's default worker count.
+        if (maxWorkers > 0) {
+            factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), maxWorkers);
+        } else {
+            factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+        }
+        bootstrap = new ServerBootstrap(factory);
+        bootstrap.setOption("child.tcpNoDelay", true);
+        bootstrap.setOption("child.receiveBufferSize", buffer_size);
+        bootstrap.setOption("child.keepAlive", true);
+
+        // Set up the pipeline factory.
+        bootstrap.setPipelineFactory(new StormServerPipelineFactory(this));
+
+        // Bind and start to accept incoming connections. The listening channel
+        // joins the group so that close() shuts it down with the others.
+        Channel channel = bootstrap.bind(new InetSocketAddress(port));
+        allChannels.add(channel);
+    }
+
+    /**
+     * Enqueues a received message for a later {@link #recv(int)}.
+     *
+     * @param message the decoded message; its payload may be {@code null}
+     * @throws InterruptedException if interrupted while waiting to enqueue
+     */
+    protected void enqueue(TaskMessage message) throws InterruptedException {
+        message_queue.put(message);
+        if (LOG.isDebugEnabled()) {
+            // MessageDecoder builds a TaskMessage with a null payload when the length field is <= 0.
+            byte[] payload = message.message();
+            LOG.debug("message received with task: {}, payload size: {}", message.task(), payload == null ? 0 : payload.length);
+        }
+    }
+
+    /**
+     * Fetches the next received message.
+     *
+     * @param flags if bit 0x01 is set the call does not block and returns
+     *              {@code null} when the queue is empty; otherwise it blocks
+     *              until a message arrives
+     * @return the next message, or {@code null} if none is available
+     *         (non-blocking mode) or the waiting thread was interrupted
+     */
+    public TaskMessage recv(int flags)  {
+        if ((flags & 0x01) == 0x01) {
+            //non-blocking
+            return message_queue.poll();
+        }
+
+        try {
+            TaskMessage request = message_queue.take();
+            LOG.debug("request to be processed: {}", request);
+            return request;
+        } catch (InterruptedException e) {
+            LOG.info("exception within msg receiving", e);
+            // Restore the interrupt status so the caller can still observe it.
+            Thread.currentThread().interrupt();
+            return null;
+        }
+    }
+
+    /**
+     * Registers a newly connected channel so that {@link #close()} closes it.
+     * A channel that shows up after the server was closed is closed at once.
+     *
+     * @param channel the channel to track
+     */
+    protected void addChannel(Channel channel) {
+        ChannelGroup channels = allChannels;
+        if (channels == null) {
+            // close() already ran; there is no group left to track this channel.
+            channel.close();
+            return;
+        }
+        channels.add(channel);
+    }
+
+    /**
+     * Closes a single channel and stops tracking it.
+     *
+     * @param channel the channel to close
+     */
+    protected void closeChannel(Channel channel) {
+        channel.close().awaitUninterruptibly();
+        ChannelGroup channels = allChannels;
+        if (channels != null) {
+            channels.remove(channel);
+        }
+    }
+
+    /**
+     * Closes all channels and releases the resources held by the channel
+     * factory. Calling it again after the first call has no effect.
+     */
+    public synchronized void close() {
+        ChannelGroup channels = allChannels;
+        if (channels != null) {
+            // Hide the group first so that concurrent handler callbacks stop using it.
+            allChannels = null;
+            channels.close().awaitUninterruptibly();
+            factory.releaseExternalResources();
+        }
+    }
+
+    /**
+     * Not supported: a server connection only receives.
+     *
+     * @throws RuntimeException always
+     */
+    public void send(int task, byte[] message) {
+        throw new RuntimeException("Server connection should not send any messages");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
new file mode 100644
index 0000000..6fbfb1c
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
@@ -0,0 +1,131 @@
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.channel.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.ConnectException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Client-side channel handler. Each time the channel connects or a response
+ * arrives from the server, it takes the next batch of queued messages from
+ * the {@link Client} and writes it to the channel; on errors it asks the
+ * client to reconnect unless the connection is already being closed.
+ */
+public class StormClientHandler extends SimpleChannelUpstreamHandler  {
+    private static final Logger LOG = LoggerFactory.getLogger(StormClientHandler.class);
+    private final Client client;
+    private final AtomicBoolean being_closed;
+    // This class assigns it only in the constructor, so the time logged in
+    // messageReceived is measured from handler creation.
+    long start_time;
+
+    StormClientHandler(Client client) {
+        this.client = client;
+        being_closed = new AtomicBoolean(false);
+        start_time = System.currentTimeMillis();
+    }
+
+    /**
+     * Registers the freshly connected channel with the client and sends the
+     * first batch of pending messages.
+     */
+    @Override
+    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent event) {
+        //register the newly established channel
+        Channel channel = event.getChannel();
+        client.setChannel(channel);
+        LOG.debug("connection established to a remote host");
+
+        sendNextBatch(channel);
+    }
+
+    /**
+     * Handles the server's response to the previous batch and sends the next one.
+     */
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) {
+        LOG.debug("send/recv time (ms): {}", (System.currentTimeMillis() - start_time));
+
+        //examine the response message from server
+        ControlMessage msg = (ControlMessage)event.getMessage();
+        if (msg==ControlMessage.FAILURE_RESPONSE)
+            LOG.info("failure response:{}", msg);
+
+        sendNextBatch(event.getChannel());
+    }
+
+    /**
+     * Takes the next batch from the client's queue and sends it. If the thread
+     * is interrupted while waiting for messages, the channel is closed and the
+     * interrupt status is restored for the caller.
+     * @param channel channel to write to
+     */
+    private void sendNextBatch(Channel channel) {
+        try {
+            sendRequests(channel, client.takeMessages());
+        } catch (InterruptedException e) {
+            channel.close();
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Writes a batch of requests to the channel. A trailing
+     * {@link ControlMessage#CLOSE_MESSAGE} marks the connection as closing:
+     * it is stripped from the batch, and the client is released once the
+     * remaining requests (if any) have been written.
+     * @param channel channel to write to
+     * @param requests batch to send; {@code null} or empty batches are ignored
+     */
+    private void sendRequests(Channel channel, final MessageBatch requests) {
+        if (requests==null || requests.size()==0 || being_closed.get()) return;
+
+        //if task==CLOSE_MESSAGE for our last request, the channel is to be closed
+        Object last_msg = requests.get(requests.size()-1);
+        if (last_msg==ControlMessage.CLOSE_MESSAGE) {
+            being_closed.set(true);
+            requests.remove(last_msg);
+        }
+
+        //nothing left to send once the close marker has been removed
+        if (requests.isEmpty()) {
+            if (being_closed.get())
+                client.close_n_release();
+            return;
+        }
+
+        //write request into socket channel
+        ChannelFuture future = channel.write(requests);
+        future.addListener(new ChannelFutureListener() {
+            public void operationComplete(ChannelFuture future)
+                    throws Exception {
+                if (!future.isSuccess()) {
+                    LOG.info("failed to send requests:", future.getCause());
+                    future.getChannel().close();
+                } else {
+                    LOG.debug("{} request(s) sent", requests.size());
+                }
+                if (being_closed.get())
+                    client.close_n_release();
+            }
+        });
+    }
+
+    /**
+     * Logs the failure (except {@link ConnectException}s) and, unless the
+     * connection is being closed, drops the channel and asks the client to reconnect.
+     */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
+        Throwable cause = event.getCause();
+        if (!(cause instanceof ConnectException)) {
+            LOG.info("Connection failed:", cause);
+        }
+        if (!being_closed.get()) {
+            client.setChannel(null);
+            client.reconnect();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
new file mode 100644
index 0000000..91c513a
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
@@ -0,0 +1,29 @@
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+
+/**
+ * Builds the channel pipeline used by a {@link Client}: a message decoder,
+ * a message encoder and a {@link StormClientHandler}, in that order.
+ */
+class StormClientPipelineFactory implements ChannelPipelineFactory {
+    private final Client client;
+
+    StormClientPipelineFactory(Client client) {
+        this.client = client;
+    }
+
+    /**
+     * @return a new pipeline with its own decoder, encoder and handler instances
+     */
+    public ChannelPipeline getPipeline() throws Exception {
+        ChannelPipeline stages = Channels.pipeline();
+        stages.addLast("decoder", new MessageDecoder());
+        stages.addLast("encoder", new MessageEncoder());
+        // The handler holds the business logic and gets the shared client.
+        stages.addLast("handler", new StormClientHandler(client));
+        return stages;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
new file mode 100644
index 0000000..9a5aaed
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
@@ -0,0 +1,70 @@
+package backtype.storm.messaging.netty;
+
+import backtype.storm.messaging.TaskMessage;
+import org.jboss.netty.channel.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Server-side channel handler: registers new channels with the
+ * {@link Server}, enqueues every received {@link TaskMessage}, and answers
+ * each end-of-batch marker with an OK or FAILURE response.
+ */
+class StormServerHandler extends SimpleChannelUpstreamHandler  {
+    private static final Logger LOG = LoggerFactory.getLogger(StormServerHandler.class);
+    Server server;
+    // Number of messages this handler failed to enqueue.
+    // NOTE(review): never reset, so after one failure every later batch is
+    // answered with FAILURE_RESPONSE -- confirm this is intended.
+    private AtomicInteger failure_count;
+
+    StormServerHandler(Server server) {
+        this.server = server;
+        failure_count = new AtomicInteger(0);
+    }
+
+    @Override
+    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
+        server.addChannel(e.getChannel());
+    }
+
+    /**
+     * Replies to an end-of-batch marker, or enqueues the received task message.
+     */
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
+        Object msg = e.getMessage();
+        if (msg == null) return;
+
+        //end of batch?
+        if (msg==ControlMessage.EOB_MESSAGE) {
+            Channel channel = ctx.getChannel();
+            LOG.debug("Send back response ...");
+            if (failure_count.get()==0)
+                channel.write(ControlMessage.OK_RESPONSE);
+            else channel.write(ControlMessage.FAILURE_RESPONSE);
+            return;
+        }
+
+        //enqueue the received message for processing
+        try {
+            server.enqueue((TaskMessage)msg);
+        } catch (InterruptedException e1) {
+            // Log the exception itself; "e" is the MessageEvent, not a Throwable.
+            LOG.info("failed to enqueue a request message", e1);
+            failure_count.incrementAndGet();
+        }
+    }
+
+    /**
+     * Logs the cause and closes the channel on which the exception occurred.
+     */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+        // Record why the channel is going away before closing it.
+        LOG.info("closing channel after exception", e.getCause());
+        server.closeChannel(e.getChannel());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
new file mode 100644
index 0000000..56b0834
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerPipelineFactory.java
@@ -0,0 +1,30 @@
+package backtype.storm.messaging.netty;
+
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+
+
+/**
+ * Builds the channel pipeline used by a {@link Server}: a message decoder,
+ * a message encoder and a {@link StormServerHandler}, in that order.
+ */
+class StormServerPipelineFactory implements  ChannelPipelineFactory {
+    private final Server server;
+
+    StormServerPipelineFactory(Server server) {
+        this.server = server;
+    }
+
+    /**
+     * @return a new pipeline with its own decoder, encoder and handler instances
+     */
+    public ChannelPipeline getPipeline() throws Exception {
+        ChannelPipeline stages = Channels.pipeline();
+        stages.addLast("decoder", new MessageDecoder());
+        stages.addLast("encoder", new MessageEncoder());
+        // The handler holds the business logic and gets the shared server.
+        stages.addLast("handler", new StormServerHandler(server));
+        return stages;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj b/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
new file mode 100644
index 0000000..eefcb48
--- /dev/null
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_integration_test.clj
@@ -0,0 +1,32 @@
+(ns backtype.storm.messaging.netty-integration-test
+  (:use [clojure test])
+  (:import [backtype.storm.messaging TransportFactory])
+  (:import [backtype.storm.testing TestWordSpout TestGlobalCount])
+  (:use [backtype.storm bootstrap testing util]))
+
+(bootstrap)
+
+;; Runs a spout -> global-count topology across three workers with the Netty
+;; transport configured and checks the counts emitted by the bolts.
+(deftest test-integration
+  (with-simulated-time-local-cluster [cluster :supervisors 4 :supervisor-slot-port-min 6710
+                                      :daemon-conf {STORM-LOCAL-MODE-ZMQ true
+                                                    STORM-MESSAGING-TRANSPORT  "backtype.storm.messaging.netty.Context"
+                                                    STORM-MESSAGING-NETTY-BUFFER-SIZE 1024000
+                                                    STORM-MESSAGING-NETTY-MAX-RETRIES 10
+                                                    STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
+                                                    STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
+                                                    STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1
+                                                    STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
+                                                    }]
+    (let [spouts {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 4)}
+          bolts {"2" (thrift/mk-bolt-spec {"1" :shuffle} (TestGlobalCount.)
+                                          :parallelism-hint 6)}
+          ;; important for test that #tuples = multiple of 4 and 6
+          words (vec (apply concat (repeat 12 [["a"] ["b"]])))
+          results (complete-topology cluster
+                                     (thrift/mk-topology spouts bolts)
+                                     :storm-conf {TOPOLOGY-WORKERS 3}
+                                     :mock-sources {"1" words})]
+      (is (ms= (apply concat (repeat 6 [[1] [2] [3] [4]]))
+               (read-tuples results "2"))))))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
new file mode 100644
index 0000000..12ebe5d
--- /dev/null
+++ b/storm-core/test/clj/backtype/storm/messaging/netty_unit_test.clj
@@ -0,0 +1,84 @@
+(ns backtype.storm.messaging.netty-unit-test
+  (:use [clojure test])
+  (:import [backtype.storm.messaging TransportFactory])
+  (:use [backtype.storm bootstrap testing util]))
+
+(bootstrap)
+
+(def port 6700)
+(def task 1)
+
+;; Storm config selecting the Netty transport. The tests only differ in the
+;; buffer size, so everything else is shared here.
+(defn- mk-storm-conf [buffer-size]
+  {STORM-MESSAGING-TRANSPORT "backtype.storm.messaging.netty.Context"
+   STORM-MESSAGING-NETTY-BUFFER-SIZE buffer-size
+   STORM-MESSAGING-NETTY-MAX-RETRIES 10
+   STORM-MESSAGING-NETTY-MIN-SLEEP-MS 1000
+   STORM-MESSAGING-NETTY-MAX-SLEEP-MS 5000
+   STORM-MESSAGING-NETTY-SERVER-WORKER-THREADS 1
+   STORM-MESSAGING-NETTY-CLIENT-WORKER-THREADS 1})
+
+;; Blocks until the server receives a message, then checks that it carries
+;; the expected task id and payload.
+(defn- check-received [server req_msg]
+  (let [resp (.recv server 0)]
+    (is (= task (.task resp)))
+    (is (= req_msg (String. (.message resp))))))
+
+;; A short message sent to an already listening server arrives intact.
+(deftest test-basic
+  (let [req_msg "0123456789abcdefghijklmnopqrstuvwxyz"
+        context (TransportFactory/makeContext (mk-storm-conf 1024))
+        server (.bind context nil port)
+        client (.connect context nil "localhost" port)]
+    (.send client task (.getBytes req_msg))
+    (check-received server req_msg)
+    (.close client)
+    (.close server)
+    (.term context)))
+
+;; A message much larger than the configured buffer size arrives intact.
+(deftest test-large-msg
+  ;; "c" must be a string: 'c' reads as the quoted symbol c', which would
+  ;; make the message "c'c'c'..." and twice the intended length.
+  (let [req_msg (apply str (repeat 2048000 "c"))
+        context (TransportFactory/makeContext (mk-storm-conf 102400))
+        server (.bind context nil port)
+        client (.connect context nil "localhost" port)]
+    (.send client task (.getBytes req_msg))
+    (check-received server req_msg)
+    (.close client)
+    (.close server)
+    (.term context)))
+
+;; A message sent before the server is listening is delivered once the
+;; server binds.
+(deftest test-server-delayed
+  (let [req_msg "0123456789abcdefghijklmnopqrstuvwxyz"
+        context (TransportFactory/makeContext (mk-storm-conf 1024))
+        client (.connect context nil "localhost" port)
+        _ (.send client task (.getBytes req_msg))
+        _ (Thread/sleep 1000)
+        server (.bind context nil port)]
+    (check-received server req_msg)
+    (.close client)
+    (.close server)
+    (.term context)))
+
+;; Many small messages arrive complete and in the order they were sent.
+(deftest test-batch
+  (let [context (TransportFactory/makeContext (mk-storm-conf 1024000))
+        server (.bind context nil port)
+        client (.connect context nil "localhost" port)]
+    (doseq [num  (range 1 100000)]
+      (let [req_msg (str num)]
+        (.send client task (.getBytes req_msg))))
+    (doseq [num  (range 1 100000)]
+      (let [req_msg (str num)
+            resp (.recv server 0)
+            resp_msg (String. (.message resp))]
+        (is (= req_msg resp_msg))))
+    (.close client)
+    (.close server)
+    (.term context)))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/test/clj/backtype/storm/messaging_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/messaging_test.clj b/storm-core/test/clj/backtype/storm/messaging_test.clj
index 6b44ea1..3c61cec 100644
--- a/storm-core/test/clj/backtype/storm/messaging_test.clj
+++ b/storm-core/test/clj/backtype/storm/messaging_test.clj
@@ -14,7 +14,7 @@
                                                       STORM-LOCAL-MODE-ZMQ 
                                                       (if transport-on? true false) 
                                                       STORM-MESSAGING-TRANSPORT 
-                                                      "backtype.storm.messaging.zmq"}]
+                                                      "backtype.storm.messaging.netty.Context"}]
       (let [topology (thrift/mk-topology
                        {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 2)}
                        {"2" (thrift/mk-bolt-spec {"1" :shuffle} (TestGlobalCount.)

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-core/test/clj/zilch/test/mq.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/zilch/test/mq.clj b/storm-core/test/clj/zilch/test/mq.clj
deleted file mode 100644
index 756d29b..0000000
--- a/storm-core/test/clj/zilch/test/mq.clj
+++ /dev/null
@@ -1,86 +0,0 @@
-(ns zilch.test.mq
-  (:use clojure.test)
-  (:import [java.util Arrays UUID])
-  (:require [zilch.mq :as mq]))
-
-(defn uuid [] (str (UUID/randomUUID)))
-
-(defn random-msg []
-  (byte-array (map byte (for [i (range (Integer. (int (rand 100))))]
-    (Integer. (int (rand 100)))
-    ))))
-
-(def url
-     (str "inproc://" (uuid))
-     ;; (str "ipc://" (uuid))
-     ;; (str "tcp://127.0.0.1:" (+ 4000 (Math/round (rand 1000)))))
-     )
-
-(deftest zilch
-  (testing "zilch"
-    (testing "should be able to"
-
-      (testing "push / pull"
-        (mq/with-context context 2
-          (with-open [s0 (-> context
-                             (mq/socket mq/pull)
-                             (mq/bind url))
-                      s1 (-> context
-                             (mq/socket mq/push)
-                             (mq/connect url))]
-            (let [msg (random-msg)
-                  push (future (mq/send s1 msg))
-                  pull (future (mq/recv s0))]
-              (is (Arrays/equals msg @pull))))))
-
-      (testing "pub / sub"
-        (mq/with-context context 2
-          (with-open [s0 (-> context
-                             (mq/socket mq/pub)
-                             (mq/bind url))
-                      s1 (-> context
-                             (mq/socket mq/sub)
-                             (mq/subscribe)
-                             (mq/connect url))]
-            (let [msg (random-msg)
-                  pub (future (mq/send s0 msg))
-                  sub (future (mq/recv s1))]
-              (is (Arrays/equals msg @sub))))))
-
-      (testing "pair / pair"
-        (mq/with-context context 2
-          (with-open [s0 (-> context
-                             (mq/socket mq/pair)
-                             (mq/bind url))
-                      s1 (-> context
-                             (mq/socket mq/pair)
-                             (mq/connect url))]
-            (let [msg0 (random-msg)
-                  pair0 (future (mq/send s0 msg0)
-                                (mq/recv s0))
-                  msg1 (random-msg)
-                  pair1 (future (mq/send s1 msg1)
-                                (mq/recv s1))]
-              (is (Arrays/equals msg1 @pair0))
-              (is (Arrays/equals msg0 @pair1))))))
-
-      (testing "req / rep"
-        (mq/with-context context 2
-          (with-open [s0 (-> context
-                             (mq/socket mq/rep)
-                             (mq/bind url))
-                      s1 (-> context
-                             (mq/socket mq/req)
-                             (mq/connect url))]
-            (let [msg (random-msg)
-                  req (future (mq/send s1 msg)
-                              (mq/recv s1))
-                  rep (future (mq/recv s0)
-                              (mq/send s0 msg))]
-              (is (Arrays/equals msg @req))))))
-
-      (testing "req / xrep")
-
-      (testing "xreq / rep")
-
-      (testing "xreq / xrep"))))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-netty/project.clj
----------------------------------------------------------------------
diff --git a/storm-netty/project.clj b/storm-netty/project.clj
deleted file mode 100644
index 24905bf..0000000
--- a/storm-netty/project.clj
+++ /dev/null
@@ -1,13 +0,0 @@
-(def ROOT-DIR (subs *file* 0 (- (count *file*) (count "project.clj"))))
-(def VERSION (-> ROOT-DIR (str "/../VERSION") slurp (.trim)))
-
-(eval `(defproject storm/storm-netty ~VERSION
-  :dependencies [[storm/storm-core ~VERSION]
-                 [io.netty/netty "3.6.3.Final"]]
-  :java-source-paths ["src/jvm"]
-  :test-paths ["test/clj"]
-  :profiles {:release {}}
-  :jvm-opts ["-Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib"]
-  :target-path "target"
-  :javac-options ["-target" "1.6" "-source" "1.6"]
-  :aot :all))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-netty/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-netty/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-netty/src/jvm/backtype/storm/messaging/netty/Client.java
deleted file mode 100644
index 91e4bd4..0000000
--- a/storm-netty/src/jvm/backtype/storm/messaging/netty/Client.java
+++ /dev/null
@@ -1,205 +0,0 @@
-package backtype.storm.messaging.netty;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.Random;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-import org.jboss.netty.bootstrap.ClientBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.Config;
-import backtype.storm.messaging.IConnection;
-import backtype.storm.messaging.TaskMessage;
-import backtype.storm.utils.Utils;
-
-class Client implements IConnection {
-    private static final Logger LOG = LoggerFactory.getLogger(Client.class);
-    private final int max_retries;
-    private final int base_sleep_ms;
-    private final int max_sleep_ms;
-    private LinkedBlockingQueue<Object> message_queue; //entry should either be TaskMessage or ControlMessage
-    private AtomicReference<Channel> channelRef;
-    private final ClientBootstrap bootstrap;
-    private InetSocketAddress remote_addr;
-    private AtomicInteger retries;
-    private final Random random = new Random();
-    private final ChannelFactory factory;
-    private final int buffer_size;
-    private final AtomicBoolean being_closed;
-
-    @SuppressWarnings("rawtypes")
-    Client(Map storm_conf, String host, int port) {
-        message_queue = new LinkedBlockingQueue<Object>();
-        retries = new AtomicInteger(0);
-        channelRef = new AtomicReference<Channel>(null);
-        being_closed = new AtomicBoolean(false);
-
-        // Configure
-        buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
-        max_retries = Math.min(30, Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES)));
-        base_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
-        max_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
-        int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
-
-        if (maxWorkers > 0) {
-            factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), maxWorkers);
-        } else {
-            factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
-        }
-        bootstrap = new ClientBootstrap(factory);
-        bootstrap.setOption("tcpNoDelay", true);
-        bootstrap.setOption("sendBufferSize", buffer_size);
-        bootstrap.setOption("keepAlive", true);
-
-        // Set up the pipeline factory.
-        bootstrap.setPipelineFactory(new StormClientPipelineFactory(this));
-
-        // Start the connection attempt.
-        remote_addr = new InetSocketAddress(host, port);
-        bootstrap.connect(remote_addr);
-    }
-
-    /**
-     * We will retry connection with exponential back-off policy
-     */
-    void reconnect() {
-        try {
-            int tried_count = retries.incrementAndGet();
-            if (tried_count <= max_retries) {
-                Thread.sleep(getSleepTimeMs());
-                LOG.info("Reconnect ... [{}]", tried_count);
-                bootstrap.connect(remote_addr);
-                LOG.debug("connection started...");
-            } else {
-                LOG.warn("Remote address is not reachable. We will close this client.");
-                close();
-            }
-        } catch (InterruptedException e) {
-            LOG.warn("connection failed", e);
-        }
-    }
-
-    /**
-     * # of milliseconds to wait per exponential back-off policy
-     */
-    private int getSleepTimeMs()
-    {
-        int backoff = 1 << retries.get();
-        int sleepMs = base_sleep_ms * Math.max(1, random.nextInt(backoff));
-        if ( sleepMs > max_sleep_ms )
-            sleepMs = max_sleep_ms;
-        return sleepMs;
-    }
-
-    /**
-     * Enqueue a task message to be sent to server
-     */
-    public void send(int task, byte[] message) {
-        //throw exception if the client is being closed
-        if (being_closed.get()) {
-            throw new RuntimeException("Client is being closed, and does not take requests any more");
-        }
-
-        try {
-            message_queue.put(new TaskMessage(task, message));
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Take all enqueued messages from queue
-     * @return
-     * @throws InterruptedException
-     */
-    MessageBatch takeMessages()  throws InterruptedException {
-        //1st message
-        MessageBatch batch = new MessageBatch(buffer_size);
-        Object msg = message_queue.take();
-        batch.add(msg);
-
-        //we will discard any message after CLOSE
-        if (msg==ControlMessage.CLOSE_MESSAGE)
-            return batch;
-
-        while (!batch.isFull()) {
-            //peek the next message
-            msg = message_queue.peek();
-            //no more messages
-            if (msg == null) break;
-
-            //we will discard any message after CLOSE
-            if (msg==ControlMessage.CLOSE_MESSAGE) {
-                message_queue.take();
-                batch.add(msg);
-                break;
-            }
-
-            //try to add this msg into batch
-            if (!batch.tryAdd((TaskMessage) msg))
-                break;
-
-            //remove this message
-            message_queue.take();
-        }
-
-        return batch;
-    }
-
-    /**
-     * gracefully close this client.
-     *
-     * We will send all existing requests, and then invoke close_n_release() method
-     */
-    public synchronized void close() {
-        if (!being_closed.get()) {
-            //enqueue a CLOSE message so that shutdown() will be invoked
-            try {
-                message_queue.put(ControlMessage.CLOSE_MESSAGE);
-                being_closed.set(true);
-            } catch (InterruptedException e) {
-                close_n_release();
-            }
-        }
-    }
-
-    /**
-     * close_n_release() is invoked after all messages have been sent.
-     */
-    void  close_n_release() {
-        if (channelRef.get() != null)
-            channelRef.get().close().awaitUninterruptibly();
-
-        //we need to release resources
-        new Thread(new Runnable() {
-            @Override
-            public void run() {
-                factory.releaseExternalResources();
-            }}).start();
-    }
-
-    public TaskMessage recv(int flags) {
-        throw new RuntimeException("Client connection should not receive any messages");
-    }
-
-    void setChannel(Channel channel) {
-        channelRef.set(channel);
-        //reset retries
-        if (channel != null)
-            retries.set(0);
-    }
-
-}
-
-
-
-

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-netty/src/jvm/backtype/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-netty/src/jvm/backtype/storm/messaging/netty/Context.java b/storm-netty/src/jvm/backtype/storm/messaging/netty/Context.java
deleted file mode 100644
index bebd7b6..0000000
--- a/storm-netty/src/jvm/backtype/storm/messaging/netty/Context.java
+++ /dev/null
@@ -1,50 +0,0 @@
-package backtype.storm.messaging.netty;
-
-import java.util.Map;
-import java.util.Vector;
-
-import backtype.storm.messaging.IConnection;
-import backtype.storm.messaging.IContext;
-
-public class Context implements IContext {
-    @SuppressWarnings("rawtypes")
-    private Map storm_conf;
-    private volatile Vector<IConnection> connections;
-    
-    /**
-     * initialization per Storm configuration 
-     */
-    @SuppressWarnings("rawtypes")
-    public void prepare(Map storm_conf) {
-       this.storm_conf = storm_conf;
-       connections = new Vector<IConnection>(); 
-    }
-
-    /**
-     * establish a server with a binding port
-     */
-    public IConnection bind(String storm_id, int port) {
-        IConnection server = new Server(storm_conf, port);
-        connections.add(server);
-        return server;
-    }
-
-    /**
-     * establish a connection to a remote server
-     */
-    public IConnection connect(String storm_id, String host, int port) {        
-        IConnection client =  new Client(storm_conf, host, port);
-        connections.add(client);
-        return client;
-    }
-
-    /**
-     * terminate this context
-     */
-    public void term() {
-        for (IConnection conn : connections) {
-            conn.close();
-        }
-        connections = null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-netty/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
----------------------------------------------------------------------
diff --git a/storm-netty/src/jvm/backtype/storm/messaging/netty/ControlMessage.java b/storm-netty/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
deleted file mode 100644
index 8b90005..0000000
--- a/storm-netty/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
+++ /dev/null
@@ -1,51 +0,0 @@
-package backtype.storm.messaging.netty;
-
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBufferOutputStream;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-
-enum ControlMessage {
-    CLOSE_MESSAGE((short)-100),
-    EOB_MESSAGE((short)-201),
-    OK_RESPONSE((short)-200),
-    FAILURE_RESPONSE((short)-400);
-
-    private short code;
-
-    //private constructor
-    private ControlMessage(short code) {
-        this.code = code;
-    }
-
-    /**
-     * Return a control message per an encoded status code
-     * @param encoded
-     * @return
-     */
-    static ControlMessage mkMessage(short encoded) {
-        for(ControlMessage cm: ControlMessage.values()) {
-          if(encoded == cm.code) return cm;
-        }
-        return null;
-    }
-
-    int encodeLength() {
-        return 2; //short
-    }
-    
-    /**
-     * encode the current Control Message into a channel buffer
-     * @throws Exception
-     */
-    ChannelBuffer buffer() throws Exception {
-        ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encodeLength()));      
-        write(bout);
-        bout.close();
-        return bout.buffer();
-    }
-
-    void write(ChannelBufferOutputStream bout) throws Exception {
-        bout.writeShort(code);        
-    } 
-}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-netty/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
----------------------------------------------------------------------
diff --git a/storm-netty/src/jvm/backtype/storm/messaging/netty/MessageBatch.java b/storm-netty/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
deleted file mode 100644
index a2c52f4..0000000
--- a/storm-netty/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
+++ /dev/null
@@ -1,153 +0,0 @@
-package backtype.storm.messaging.netty;
-
-import java.util.ArrayList;
-
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.buffer.ChannelBufferOutputStream;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-
-import backtype.storm.messaging.TaskMessage;
-
-class MessageBatch {
-    private int buffer_size;
-    private ArrayList<Object> msgs;
-    private int encoded_length;
-
-    MessageBatch(int buffer_size) {
-        this.buffer_size = buffer_size;
-        msgs = new ArrayList<Object>();
-        encoded_length = ControlMessage.EOB_MESSAGE.encodeLength();
-    }
-
-    void add(Object obj) {
-        if (obj == null)
-            throw new RuntimeException("null object forbidded in message batch");
-
-        if (obj instanceof TaskMessage) {
-            TaskMessage msg = (TaskMessage)obj;
-            msgs.add(msg);
-            encoded_length += msgEncodeLength(msg);
-            return;
-        }
-
-        if (obj instanceof ControlMessage) {
-            ControlMessage msg = (ControlMessage)obj;
-            msgs.add(msg);
-            encoded_length += msg.encodeLength();
-            return;
-        }
-
-        throw new RuntimeException("Unsuppoted object type "+obj.getClass().getName());
-    }
-
-    void remove(Object obj) {
-        if (obj == null) return;
-
-        if (obj instanceof TaskMessage) {
-            TaskMessage msg = (TaskMessage)obj;
-            msgs.remove(msg);
-            encoded_length -= msgEncodeLength(msg);
-            return;
-        }
-
-        if (obj instanceof ControlMessage) {
-            ControlMessage msg = (ControlMessage)obj;
-            msgs.remove(msg);
-            encoded_length -= msg.encodeLength();
-            return;
-        }
-    }
-
-    Object get(int index) {
-        return msgs.get(index);
-    }
-
-    /**
-     * try to add a TaskMessage to a batch
-     * @param taskMsg
-     * @return false if the msg could not be added due to buffer size limit; true otherwise
-     */
-    boolean tryAdd(TaskMessage taskMsg) {
-        if ((encoded_length + msgEncodeLength(taskMsg)) > buffer_size) 
-            return false;
-        add(taskMsg);
-        return true;
-    }
-
-    private int msgEncodeLength(TaskMessage taskMsg) {
-        if (taskMsg == null) return 0;
-
-        int size = 6; //INT + SHORT
-        if (taskMsg.message() != null) 
-            size += taskMsg.message().length;
-        return size;
-    }
-
-    /**
-     * Has this batch used up allowed buffer size
-     * @return
-     */
-    boolean isFull() {
-        return encoded_length >= buffer_size;
-    }
-
-    /**
-     * true if this batch doesn't have any messages 
-     * @return
-     */
-    boolean isEmpty() {
-        return msgs.isEmpty();
-    }
-
-    /**
-     * # of msgs in this batch
-     * @return
-     */
-    int size() {
-        return msgs.size();
-    }
-
-    /**
-     * create a buffer containing the encoding of this batch
-     */
-    ChannelBuffer buffer() throws Exception {
-        ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encoded_length));
-        
-        for (Object msg : msgs) 
-            if (msg instanceof TaskMessage)
-                writeTaskMessage(bout, (TaskMessage)msg);
-            else
-                ((ControlMessage)msg).write(bout);
-        
-        //add a END_OF_BATCH indicator
-        ControlMessage.EOB_MESSAGE.write(bout);
-
-        bout.close();
-
-        return bout.buffer();
-    }
-
-    /**
-     * write a TaskMessage into a stream
-     *
-     * Each TaskMessage is encoded as:
-     *  task ... short(2)
-     *  len ... int(4)
-     *  payload ... byte[]     *  
-     */
-    private void writeTaskMessage(ChannelBufferOutputStream bout, TaskMessage message) throws Exception {
-        int payload_len = 0;
-        if (message.message() != null)
-            payload_len =  message.message().length;
-
-        int task_id = message.task();
-        if (task_id > Short.MAX_VALUE)
-            throw new RuntimeException("Task ID should not exceed "+Short.MAX_VALUE);
-        
-        bout.writeShort((short)task_id);
-        bout.writeInt(payload_len);
-        if (payload_len >0)
-            bout.write(message.message());
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-netty/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
----------------------------------------------------------------------
diff --git a/storm-netty/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java b/storm-netty/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
deleted file mode 100644
index 8190e44..0000000
--- a/storm-netty/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package backtype.storm.messaging.netty;
-
-import org.jboss.netty.buffer.ChannelBuffer;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.frame.FrameDecoder;
-import backtype.storm.messaging.TaskMessage;
-
-public class MessageDecoder extends FrameDecoder {    
-    /*
-     * Each ControlMessage is encoded as:
-     *  code (<0) ... short(2)
-     * Each TaskMessage is encoded as:
-     *  task (>=0) ... short(2)
-     *  len ... int(4)
-     *  payload ... byte[]     *  
-     */
-    protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception {
-        // Make sure that we have received at least a short 
-        if (buf.readableBytes() < 2) {
-            //need more data
-            return null;
-        }
-
-        // Mark the current buffer position before reading task/len field
-        // because the whole frame might not be in the buffer yet.
-        // We will reset the buffer position to the marked position if
-        // there's not enough bytes in the buffer.
-        buf.markReaderIndex();
-
-        //read the short field
-        short code = buf.readShort();
-        
-        //case 1: Control message
-        ControlMessage ctrl_msg = ControlMessage.mkMessage(code);
-        if (ctrl_msg != null) return ctrl_msg;
-        
-        //case 2: task Message
-        short task = code;
-        
-        // Make sure that we have received at least an integer (length) 
-        if (buf.readableBytes() < 4) {
-            //need more data
-            buf.resetReaderIndex();
-            return null;
-        }
-
-        // Read the length field.
-        int length = buf.readInt();
-        if (length<=0) {
-            return new TaskMessage(task, null);
-        }
-        
-        // Make sure if there's enough bytes in the buffer.
-        if (buf.readableBytes() < length) {
-            // The whole bytes were not received yet - return null.
-            buf.resetReaderIndex();
-            return null;
-        }
-
-        // There's enough bytes in the buffer. Read it.
-        ChannelBuffer payload = buf.readBytes(length);
-
-        // Successfully decoded a frame.
-        // Return a TaskMessage object
-        return new TaskMessage(task,payload.array());
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-netty/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
----------------------------------------------------------------------
diff --git a/storm-netty/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java b/storm-netty/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
deleted file mode 100644
index c0ac8f1..0000000
--- a/storm-netty/src/jvm/backtype/storm/messaging/netty/MessageEncoder.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package backtype.storm.messaging.netty;
-
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
-
-public class MessageEncoder extends OneToOneEncoder {    
-    @Override
-    protected Object encode(ChannelHandlerContext ctx, Channel channel, Object obj) throws Exception {
-        if (obj instanceof ControlMessage) {
-            return ((ControlMessage)obj).buffer();
-        }
-
-        if (obj instanceof MessageBatch) {
-            return ((MessageBatch)obj).buffer();
-        } 
-        
-        throw new RuntimeException("Unsupported encoding of object of class "+obj.getClass().getName());
-    }
-
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b63ed139/storm-netty/src/jvm/backtype/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --git a/storm-netty/src/jvm/backtype/storm/messaging/netty/Server.java b/storm-netty/src/jvm/backtype/storm/messaging/netty/Server.java
deleted file mode 100644
index 4119bbf..0000000
--- a/storm-netty/src/jvm/backtype/storm/messaging/netty/Server.java
+++ /dev/null
@@ -1,120 +0,0 @@
-package backtype.storm.messaging.netty;
-
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.Executors;
-
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.Config;
-import backtype.storm.messaging.IConnection;
-import backtype.storm.messaging.TaskMessage;
-import backtype.storm.utils.Utils;
-
-class Server implements IConnection {
-    private static final Logger LOG = LoggerFactory.getLogger(Server.class);
-    @SuppressWarnings("rawtypes")
-    Map storm_conf;
-    int port;
-    private LinkedBlockingQueue<TaskMessage> message_queue;
-    volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
-    final ChannelFactory factory;
-    final ServerBootstrap bootstrap;
-
-    @SuppressWarnings("rawtypes")
-    Server(Map storm_conf, int port) {
-        this.storm_conf = storm_conf;
-        this.port = port;
-        message_queue = new LinkedBlockingQueue<TaskMessage>();
-
-        // Configure the server.
-        int buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
-        int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS));
-
-        if (maxWorkers > 0) {
-            factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), maxWorkers);
-        } else {
-            factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
-        }
-        bootstrap = new ServerBootstrap(factory);
-        bootstrap.setOption("child.tcpNoDelay", true);
-        bootstrap.setOption("child.receiveBufferSize", buffer_size);
-        bootstrap.setOption("child.keepAlive", true);
-
-        // Set up the pipeline factory.
-        bootstrap.setPipelineFactory(new StormServerPipelineFactory(this));
-
-        // Bind and start to accept incoming connections.
-        Channel channel = bootstrap.bind(new InetSocketAddress(port));
-        allChannels.add(channel);
-    }
-
-    /**
-     * enqueue a received message 
-     * @param message
-     * @throws InterruptedException
-     */
-    protected void enqueue(TaskMessage message) throws InterruptedException {
-        message_queue.put(message);
-        LOG.debug("message received with task: {}, payload size: {}", message.task(), message.message().length);
-    }
-    
-    /**
-     * fetch a message from message queue synchronously (flags != 1) or asynchronously (flags==1)
-     */
-    public TaskMessage recv(int flags)  {
-        if ((flags & 0x01) == 0x01) { 
-            //non-blocking
-            return message_queue.poll();
-        } else {
-            try {
-                TaskMessage request = message_queue.take();
-                LOG.debug("request to be processed: {}", request);
-                return request;
-            } catch (InterruptedException e) {
-                LOG.info("exception within msg receiving", e);
-                return null;
-            }
-        }
-    }
-
-    /**
-     * register a newly created channel
-     * @param channel
-     */
-    protected void addChannel(Channel channel) {
-        allChannels.add(channel);
-    }
-    
-    /**
-     * close a channel
-     * @param channel
-     */
-    protected void closeChannel(Channel channel) {
-        channel.close().awaitUninterruptibly();
-        allChannels.remove(channel);
-    }
-
-    /**
-     * close all channels, and release resources
-     */
-    public synchronized void close() {
-        if (allChannels != null) {  
-            allChannels.close().awaitUninterruptibly();
-            factory.releaseExternalResources();
-            allChannels = null;
-        }
-    }
-
-    public void send(int task, byte[] message) {
-        throw new RuntimeException("Server connection should not send any messages");
-    }
-}


Mime
View raw message