storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bo...@apache.org
Subject [08/14] Merge branch 'master' into idiomatic-clojure-01
Date Thu, 12 Jun 2014 21:11:06 GMT
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 6996b49..8d2d221 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -21,52 +21,57 @@ import backtype.storm.Config;
 import backtype.storm.messaging.IConnection;
 import backtype.storm.messaging.TaskMessage;
 import backtype.storm.utils.Utils;
-
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelFutureListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
-import java.util.Timer;
-import java.util.TimerTask;
 import java.util.Random;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
-class Client implements IConnection {
+public class Client implements IConnection {
     private static final Logger LOG = LoggerFactory.getLogger(Client.class);
-    private static final Timer TIMER = new Timer("netty-client-timer", true);
-
+    private static final String PREFIX = "Netty-Client-"; // prefix for the label returned by name()
     private final int max_retries;
     private final long base_sleep_ms;
     private final long max_sleep_ms;
-    private LinkedBlockingQueue<Object> message_queue; //entry should either be TaskMessage or ControlMessage
     private AtomicReference<Channel> channelRef;
     private final ClientBootstrap bootstrap;
-    InetSocketAddress remote_addr;
-    private AtomicInteger retries;
+    private InetSocketAddress remote_addr;
+    // jitter source for the reconnect back-off in getSleepTimeMs()
     private final Random random = new Random();
     private final ChannelFactory factory;
     private final int buffer_size;
-    private final AtomicBoolean being_closed;
-    private boolean wait_for_requests;
+    private boolean closing; // NOTE(review): read by the flusher task on a scheduler thread without the monitor and not volatile, so that thread may not see the update
+    // size passed to each new MessageBatch; send() writes a batch out once isFull() reports true
+    private int messageBatchSize;
+    // number of batch writes handed to Netty whose completion listener has not run yet
+    private AtomicLong pendings;
+    // batch being filled by send(); accessed from the synchronized send/flush/close methods
+    MessageBatch messageBatch = null;
+    private AtomicLong flushCheckTimer; // ms timestamp after which the flusher may flush; Long.MAX_VALUE = nothing deferred
+    private int flushCheckInterval; // flusher period and flush deferral, in ms
+    private ScheduledExecutorService scheduler; // runs connect() and the periodic flusher; supplied by the constructor's caller
 
     @SuppressWarnings("rawtypes")
-    Client(Map storm_conf, ChannelFactory factory, String host, int port) {
+    Client(Map storm_conf, ChannelFactory factory, 
+            ScheduledExecutorService scheduler, String host, int port) {
         this.factory = factory;
-        message_queue = new LinkedBlockingQueue<Object>();
-        retries = new AtomicInteger(0);
+        this.scheduler = scheduler;
         channelRef = new AtomicReference<Channel>(null);
-        being_closed = new AtomicBoolean(false);
-        wait_for_requests = false;
+        closing = false;
+        pendings = new AtomicLong(0);
+        flushCheckTimer = new AtomicLong(Long.MAX_VALUE);
 
         // Configure
         buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
@@ -74,6 +79,13 @@ class Client implements IConnection {
         base_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
         max_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
 
+        this.messageBatchSize = Utils.getInt(storm_conf.get(Config.STORM_NETTY_MESSAGE_BATCH_SIZE), 262144);
+        
+        flushCheckInterval = Utils.getInt(storm_conf.get(Config.STORM_NETTY_FLUSH_CHECK_INTERVAL_MS), 10); // default 10 ms
+
+        LOG.info("New Netty Client, connect to " + host + ", " + port
+                + ", config: " + ", buffer_size: " + buffer_size);
+
         bootstrap = new ClientBootstrap(factory);
         bootstrap.setOption("tcpNoDelay", true);
         bootstrap.setOption("sendBufferSize", buffer_size);
@@ -84,43 +96,88 @@ class Client implements IConnection {
 
         // Start the connection attempt.
         remote_addr = new InetSocketAddress(host, port);
-        bootstrap.connect(remote_addr);
+        
+        // setup the connection asyncly now
+        scheduler.execute(new Runnable() {
+            @Override
+            public void run() {   
+                connect();
+            }
+        });
+        
+        Runnable flusher = new Runnable() {
+            @Override
+            public void run() {
+
+                if(!closing) {
+                    long flushCheckTime = flushCheckTimer.get();
+                    long now = System.currentTimeMillis();
+                    if (now > flushCheckTime) {
+                        Channel channel = channelRef.get();
+                        if (null != channel && channel.isWritable()) {
+                            flush(channel);
+                        }
+                    }
+                }
+                
+            }
+        };
+        
+        long initialDelay = Math.min(30L * 1000, max_sleep_ms * max_retries); //max wait for 30s
+        scheduler.scheduleWithFixedDelay(flusher, initialDelay, flushCheckInterval, TimeUnit.MILLISECONDS);
     }
 
     /**
      * We will retry connection with exponential back-off policy
      */
-    void reconnect() {
-        close_n_release();
-
-        //reconnect only if it's not being closed
-        if (being_closed.get()) return;
-
-        final int tried_count = retries.incrementAndGet();
-        if (tried_count <= max_retries) {
-            long sleep = getSleepTimeMs();
-            LOG.info("Waiting {} ms before trying connection to {}", sleep, remote_addr);
-            TIMER.schedule(new TimerTask() {
-                @Override
-                public void run() { 
-                    LOG.info("Reconnect ... [{}] to {}", tried_count, remote_addr);
-                    bootstrap.connect(remote_addr);
-                }}, sleep);
-        } else {
-            LOG.warn(remote_addr+" is not reachable. We will close this client.");
-            close();
+    private synchronized void connect() {
+        try {
+            if (channelRef.get() != null) {
+                return;
+            }
+            // blocks the caller (while holding this client's monitor) until connected or retries run out
+            Channel channel = null;
+            // up to max_retries + 1 attempts (tried = 0..max_retries), sleeping between them
+            int tried = 0;
+            while (tried <= max_retries) {
+
+                LOG.info("Reconnect started for {}... [{}]", name(), tried);
+                LOG.debug("connection started...");
+
+                ChannelFuture future = bootstrap.connect(remote_addr);
+                future.awaitUninterruptibly();
+                Channel current = future.getChannel();
+                if (!future.isSuccess()) {
+                    if (null != current) {
+                        current.close();
+                    }
+                } else {
+                    channel = current;
+                    break;
+                }
+                Thread.sleep(getSleepTimeMs(tried)); // NOTE(review): also sleeps after the final failed attempt
+                tried++;  
+            }
+            if (null != channel) {
+                LOG.info("connection established to a remote host " + name() + ", " + channel.toString());
+                channelRef.set(channel);
+            } else {
+                close();
+                throw new RuntimeException("Remote address is not reachable. We will close this client " + name());
+            }
+        } catch (InterruptedException e) {
+            throw new RuntimeException("connection failed " + name(), e); // NOTE(review): interrupt status is not restored before rethrowing
         }
     }
 
     /**
      * # of milliseconds to wait per exponential back-off policy
      */
-    private long getSleepTimeMs()
-    {
-        if (retries.get() > 30) {
+    private long getSleepTimeMs(int retries) {
+        if (retries > 30) {
            return max_sleep_ms;
         }
-        int backoff = 1 << retries.get();
+        int backoff = 1 << retries;
         long sleepMs = base_sleep_ms * Math.max(1, random.nextInt(backoff));
         if ( sleepMs > max_sleep_ms )
             sleepMs = max_sleep_ms;
@@ -128,133 +185,114 @@ class Client implements IConnection {
     }
 
     /**
-     * Enqueue a task message to be sent to server
+     * Batch task messages and write them to the server; a partial batch is written at once if the channel is writable, otherwise deferred to the periodic flusher
      */
-    public void send(int task, byte[] message) {
-        //throw exception if the client is being closed
-        if (being_closed.get()) {
+    synchronized public void send(Iterator<TaskMessage> msgs) {
+        // NOTE(review): if no channel exists yet, connect() below blocks this caller through every retry
+        // throw exception if the client is being closed
+        if (closing) {
             throw new RuntimeException("Client is being closed, and does not take requests any more");
         }
-
-        try {
-            message_queue.put(new TaskMessage(task, message));
-
-            //resume delivery if it is waiting for requests
-            tryDeliverMessages(true);
-        } catch (InterruptedException e) {
-            throw new RuntimeException(e);
+        // nothing to send
+        if (null == msgs || !msgs.hasNext()) {
+            return;
         }
-    }
-
-    /**
-     * Retrieve messages from queue, and delivery to server if any
-     */
-    synchronized void tryDeliverMessages(boolean only_if_waiting) throws InterruptedException {
-        //just skip if delivery only if waiting, and we are not waiting currently
-        if (only_if_waiting && !wait_for_requests)  return;
 
-        //make sure that channel was not closed
         Channel channel = channelRef.get();
-        if (channel == null)  return;
-        if (!channel.isOpen()) {
-            LOG.info("Channel to {} is no longer open.",remote_addr);
-            //The channel is not open yet. Reconnect?
-            reconnect();
-            return;
+        if (null == channel) {
+            connect();
+            channel = channelRef.get(); // NOTE(review): may still be null if a write-failure listener cleared channelRef concurrently — confirm
         }
 
-        final MessageBatch requests = tryTakeMessages();
-        if (requests==null) {
-            wait_for_requests = true;
-            return;
-        }
+        while (msgs.hasNext()) {
+            TaskMessage message = msgs.next();
+            if (null == messageBatch) {
+                messageBatch = new MessageBatch(messageBatchSize);
+            }
 
-        //if channel is being closed and we have no outstanding messages,  let's close the channel
-        if (requests.isEmpty() && being_closed.get()) {
-            close_n_release();
-            return;
+            messageBatch.add(message);
+            if (messageBatch.isFull()) {
+                MessageBatch toBeFlushed = messageBatch;
+                flushRequest(channel, toBeFlushed);
+                messageBatch = null;
+            }
         }
 
-        //we are busily delivering messages, and will check queue upon response.
-        //When send() is called by senders, we should not thus call tryDeliverMessages().
-        wait_for_requests = false;
-
-        //write request into socket channel
-        ChannelFuture future = channel.write(requests);
-        future.addListener(new ChannelFutureListener() {
-            public void operationComplete(ChannelFuture future)
-                    throws Exception {
-                if (!future.isSuccess()) {
-                    LOG.info("failed to send "+requests.size()+" requests to "+remote_addr, future.getCause());
-                    reconnect();
-                } else {
-                    LOG.debug("{} request(s) sent", requests.size());
-
-                    //Now that our requests have been sent, channel could be closed if needed
-                    if (being_closed.get())
-                        close_n_release();
-                }
+        if (null != messageBatch && !messageBatch.isEmpty()) {
+            if (channel.isWritable()) {
+                flushCheckTimer.set(Long.MAX_VALUE); // cancel any deferred flush
+                
+                // Flush as fast as we can to reduce the latency
+                MessageBatch toBeFlushed = messageBatch;
+                messageBatch = null;
+                flushRequest(channel, toBeFlushed);
+                
+            } else {
+                // when channel is NOT writable, it means the internal netty buffer is full. 
+                // In this case, we can try to buffer up more incoming messages.
+                flushCheckTimer.set(System.currentTimeMillis() + flushCheckInterval); // the periodic flusher writes the batch once this deadline passes
             }
-        });
+        }
+
     }
 
-    /**
-     * Take all enqueued messages from queue
-     * @return  batch of messages
-     * @throws InterruptedException
-     *
-     * synchronized ... ensure that messages are delivered in the same order
-     * as they are added into queue
-     */
-    private MessageBatch tryTakeMessages() throws InterruptedException {
-        //1st message
-        Object msg = message_queue.poll();
-        if (msg == null) return null;
-
-        MessageBatch batch = new MessageBatch(buffer_size);
-        //we will discard any message after CLOSE
-        if (msg == ControlMessage.CLOSE_MESSAGE) {
-            LOG.info("Connection to {} is being closed", remote_addr);
-            being_closed.set(true);
-            return batch;
+    public String name() { // label used in log messages; empty string when remote_addr is unset
+        if (null != remote_addr) {
+            return PREFIX + remote_addr.toString();
         }
+        return "";
+    }
 
-        batch.add((TaskMessage)msg);
-        while (!batch.isFull() && ((msg = message_queue.peek())!=null)) {
-            //Is it a CLOSE message?
-            if (msg == ControlMessage.CLOSE_MESSAGE) {
-                message_queue.take();
-                LOG.info("Connection to {} is being closed", remote_addr);
-                being_closed.set(true);
-                break;
+    private synchronized void flush(Channel channel) { // called by the periodic flusher; writes the pending batch unless closing
+        if (!closing) {
+            if (null != messageBatch && !messageBatch.isEmpty()) {
+                MessageBatch toBeFlushed = messageBatch;
+                flushCheckTimer.set(Long.MAX_VALUE);
+                flushRequest(channel, toBeFlushed);
+                messageBatch = null;
             }
-
-            //try to add this msg into batch
-            if (!batch.tryAdd((TaskMessage) msg))
-                break;
-
-            //remove this message
-            message_queue.take();
         }
-
-        return batch;
     }
-
+    
     /**
      * gracefully close this client.
-     *
-     * We will send all existing requests, and then invoke close_n_release() method
+     * 
+     * We will send all existing requests, and then invoke close_n_release()
+     * method
      */
-    public void close() {
-        //enqueue a CLOSE message so that shutdown() will be invoked
-        try {
-            message_queue.put(ControlMessage.CLOSE_MESSAGE);
-
-            //resume delivery if it is waiting for requests
-            tryDeliverMessages(true);
-        } catch (InterruptedException e) {
-            LOG.info("Interrupted Connection to {} is being closed", remote_addr);
-            being_closed.set(true);
+    public synchronized void close() {
+        if (!closing) {
+            closing = true;
+            LOG.info("Closing Netty Client " + name());
+            // write out the partially filled batch; it is dropped if there is no channel
+            if (null != messageBatch && !messageBatch.isEmpty()) {
+                MessageBatch toBeFlushed = messageBatch;
+                Channel channel = channelRef.get();
+                if (channel != null) {
+                    flushRequest(channel, toBeFlushed);
+                }
+                messageBatch = null;
+            }
+        
+            // wait, up to the timeout, for in-flight batch writes (pendings) to complete
+            final long timeoutMilliSeconds = 600 * 1000; //600 seconds
+            final long start = System.currentTimeMillis();
+            // NOTE(review): this polls while holding the client's monitor, so send() and flush() block until it ends
+            LOG.info("Waiting for pending batchs to be sent with "+ name() + "..., timeout: {}ms, pendings: {}", timeoutMilliSeconds, pendings.get());
+            
+            while(pendings.get() != 0) {
+                try {
+                    long delta = System.currentTimeMillis() - start;
+                    if (delta > timeoutMilliSeconds) {
+                        LOG.error("Timeout when sending pending batchs with {}..., there are still {} pending batchs not sent", name(), pendings.get());
+                        break;
+                    }
+                    Thread.sleep(1000); //sleep 1s
+                } catch (InterruptedException e) {
+                    break; // NOTE(review): interrupt status is not restored
+                } 
+            }
+            
             close_n_release();
         }
     }
@@ -262,27 +300,51 @@ class Client implements IConnection {
     /**
      * close_n_release() is invoked after all messages have been sent.
      */
-    synchronized void close_n_release() {
+    private void close_n_release() { // closes the channel but leaves channelRef set; NOTE(review): channelRef is read twice, a concurrent write-failure listener could null it in between
         if (channelRef.get() != null) {
             channelRef.get().close();
             LOG.debug("channel {} closed",remote_addr);
-            setChannel(null);
         }
     }
 
-    public TaskMessage recv(int flags) {
+    @Override
+    public Iterator<TaskMessage> recv(int flags, int clientId) { // a client only sends; this always throws
         throw new RuntimeException("Client connection should not receive any messages");
     }
 
-    void setChannel(Channel channel) {
-        if (channel != null && channel.isOpen()) {
-            //Assume the most recent connection attempt was successful.
-            retries.set(0);
-        }
-        channelRef.set(channel);
-        //reset retries
-        if (channel != null)
-            retries.set(0);
+    @Override
+    public void send(int taskId, byte[] payload) { // single-message wrapper around send(Iterator)
+        TaskMessage msg = new TaskMessage(taskId, payload);
+        List<TaskMessage> wrapper = new ArrayList<TaskMessage>(1);
+        wrapper.add(msg);
+        send(wrapper.iterator());
     }
 
-}
+    private void flushRequest(Channel channel, final MessageBatch requests) {
+        if (requests == null)
+            return;
+        // pendings counts writes in flight; close() waits for it to reach zero
+        pendings.incrementAndGet();
+        ChannelFuture future = channel.write(requests);
+        future.addListener(new ChannelFutureListener() {
+            public void operationComplete(ChannelFuture future)
+                    throws Exception {
+                // decrement on success and failure alike so close() is not left waiting
+                pendings.decrementAndGet();
+                if (!future.isSuccess()) {
+                    LOG.info(
+                            "failed to send requests to " + remote_addr.toString() + ": ", future.getCause());
+                    // NOTE(review): the failed batch is dropped, not re-sent
+                    Channel channel = future.getChannel();
+                    // clear channelRef only if it still holds this channel, so a later send() reconnects
+                    if (null != channel) {
+                        channel.close();
+                        channelRef.compareAndSet(channel, null);
+                    }
+                } else {
+                    LOG.debug("{} request(s) sent", requests.size());
+                }
+            }
+        });
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
index 80b4443..f592aff 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
@@ -18,8 +18,12 @@
 package backtype.storm.messaging.netty;
 
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
 import java.util.Map;
 import java.util.Vector;
 
@@ -28,14 +32,16 @@ import backtype.storm.messaging.IConnection;
 import backtype.storm.messaging.IContext;
 import backtype.storm.utils.Utils;
 
-import java.util.Map;
-import java.util.Vector;
-
 public class Context implements IContext {
+    private static final Logger LOG = LoggerFactory.getLogger(Context.class);
+        
     @SuppressWarnings("rawtypes")
     private Map storm_conf;
     private volatile Vector<IConnection> connections;
     private NioClientSocketChannelFactory clientChannelFactory;
+    // shared by every Client created in connect(); shut down in term()
+    private ScheduledExecutorService clientScheduleService;
+    private final int MAX_CLIENT_SCHEDULER_THREAD_POOL_SIZE = 10; // NOTE(review): constant-style name but not static; upper bound on the scheduler pool size
 
     /**
      * initialization per Storm configuration 
@@ -47,13 +53,19 @@ public class Context implements IContext {
 
         //each context will have a single client channel factory
         int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
+		ThreadFactory bossFactory = new NettyRenameThreadFactory("client" + "-boss");
+        ThreadFactory workerFactory = new NettyRenameThreadFactory("client" + "-worker");
         if (maxWorkers > 0) {
-            clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
-                    Executors.newCachedThreadPool(), maxWorkers);
+            clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
+                    Executors.newCachedThreadPool(workerFactory), maxWorkers);
         } else {
-            clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
-                    Executors.newCachedThreadPool());
+            clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
+                    Executors.newCachedThreadPool(workerFactory));
         }
+        
+        int otherWorkers = Utils.getInt(storm_conf.get(Config.TOPOLOGY_WORKERS), 1) - 1;
+        int poolSize = Math.min(Math.max(1, otherWorkers), MAX_CLIENT_SCHEDULER_THREAD_POOL_SIZE);
+        clientScheduleService = Executors.newScheduledThreadPool(poolSize, new NettyRenameThreadFactory("client-schedule-service"));
     }
 
     /**
@@ -69,7 +81,8 @@ public class Context implements IContext {
      * establish a connection to a remote server
      */
     public IConnection connect(String storm_id, String host, int port) {        
-        IConnection client =  new Client(storm_conf, clientChannelFactory, host, port);
+        IConnection client =  new Client(storm_conf, clientChannelFactory, 
+                clientScheduleService, host, port); // every client shares this context's scheduler
         connections.add(client);
         return client;
     }
@@ -78,12 +91,22 @@ public class Context implements IContext {
      * terminate this context
      */
     public void term() {
+        clientScheduleService.shutdown();        
+        // with default ScheduledThreadPoolExecutor policy, periodic tasks (the client flushers) are cancelled on shutdown; Client.close() below writes any remaining batch
         for (IConnection conn : connections) {
             conn.close();
         }
+        // NOTE(review): each Client.close() may block while its pending writes drain before the next one runs
+        try {
+            clientScheduleService.awaitTermination(30, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            LOG.error("Error when shutting down client scheduler", e); // NOTE(review): interrupt status is not restored
+        }
+        
         connections = null;
 
         //we need to release resources associated with client channel factory
         clientChannelFactory.releaseExternalResources();
+
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java b/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
index a552cf7..b7335b3 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/ControlMessage.java
@@ -17,6 +17,8 @@
  */
 package backtype.storm.messaging.netty;
 
+import java.io.IOException;
+
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.buffer.ChannelBufferOutputStream;
 import org.jboss.netty.buffer.ChannelBuffers;
@@ -54,14 +56,14 @@ enum ControlMessage {
      * encode the current Control Message into a channel buffer
      * @throws Exception
      */
-    ChannelBuffer buffer() throws Exception {
+    ChannelBuffer buffer() throws IOException { // NOTE(review): the javadoc above still says @throws Exception
         ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encodeLength()));      
         write(bout);
         bout.close();
         return bout.buffer();
     }
 
-    void write(ChannelBufferOutputStream bout) throws Exception {
+    void write(ChannelBufferOutputStream bout) throws IOException { // writes only the 2-byte code
         bout.writeShort(code);        
     } 
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
index cd8d4e3..63c861a 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
@@ -44,6 +44,11 @@ class MessageBatch {
         encoded_length += msgEncodeLength(msg);
     }
 
+    // returns the message at the given position; delegates to msgs.get(index)
+    TaskMessage get(int index) {
+        return msgs.get(index);
+    }
+
     /**
      * try to add a TaskMessage to a batch
      * @param taskMsg

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
index 3365e58..72c3cf7 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageDecoder.java
@@ -17,6 +17,9 @@
  */
 package backtype.storm.messaging.netty;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import backtype.storm.messaging.TaskMessage;
 import org.jboss.netty.buffer.ChannelBuffer;
 import org.jboss.netty.channel.Channel;
@@ -34,52 +37,79 @@ public class MessageDecoder extends FrameDecoder {
      */
     protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception {
         // Make sure that we have received at least a short 
-        if (buf.readableBytes() < 2) {
+        long available = buf.readableBytes();
+        if (available < 2) {
             //need more data
             return null;
         }
 
-        // Mark the current buffer position before reading task/len field
-        // because the whole frame might not be in the buffer yet.
-        // We will reset the buffer position to the marked position if
-        // there's not enough bytes in the buffer.
-        buf.markReaderIndex();
-
-        //read the short field
-        short code = buf.readShort();
-        
-        //case 1: Control message
-        ControlMessage ctrl_msg = ControlMessage.mkMessage(code);
-        if (ctrl_msg != null) return ctrl_msg;
-        
-        //case 2: task Message
-        short task = code;
-        
-        // Make sure that we have received at least an integer (length) 
-        if (buf.readableBytes() < 4) {
-            //need more data
-            buf.resetReaderIndex();
-            return null;
-        }
+        List<Object> ret = new ArrayList<Object>();
+
+        // Decode as many complete messages as the buffer holds in a single call
+        while (available >= 2) {
+
+            // Mark the current buffer position before reading task/len field
+            // because the whole frame might not be in the buffer yet.
+            // We will reset the buffer position to the marked position if
+            // there's not enough bytes in the buffer.
+            buf.markReaderIndex();
+
+            // read the short field
+            short code = buf.readShort();
+            available -= 2;
+
+            // case 1: Control message
+            ControlMessage ctrl_msg = ControlMessage.mkMessage(code);
+            if (ctrl_msg != null) {
+                // EOB markers are skipped; any other control message is returned immediately
+                if (ctrl_msg == ControlMessage.EOB_MESSAGE) {
+                    continue;
+                } else {
+                    return ctrl_msg; // NOTE(review): TaskMessages already collected in ret are lost here, although their bytes were consumed
+                }
+            }
+
+            // case 2: task Message
+            short task = code;
 
-        // Read the length field.
-        int length = buf.readInt();
-        if (length<=0) {
-            return new TaskMessage(task, null);
+            // Make sure that we have received at least an integer (length)
+            if (available < 4) {
+                // need more data
+                buf.resetReaderIndex();
+                break;
+            }
+
+            // Read the length field.
+            int length = buf.readInt();
+
+            available -= 4;
+
+            if (length <= 0) {
+                ret.add(new TaskMessage(task, null));
+                break; // NOTE(review): ends this call early; remaining bytes wait for the next decode() call — confirm intended
+            }
+
+            // Make sure if there's enough bytes in the buffer.
+            if (available < length) {
+                // Payload incomplete: rewind to this frame's start and stop; frames already in ret are still returned.
+                buf.resetReaderIndex();
+                break;
+            }
+            available -= length;
+
+            // There's enough bytes in the buffer. Read it.
+            ChannelBuffer payload = buf.readBytes(length);
+
+            // NOTE(review): array() assumes readBytes() returned a heap buffer backed by exactly `length` bytes
+            // Successfully decoded a frame.
+            // Add the decoded TaskMessage to the list returned below
+            ret.add(new TaskMessage(task, payload.array()));
         }
-        
-        // Make sure if there's enough bytes in the buffer.
-        if (buf.readableBytes() < length) {
-            // The whole bytes were not received yet - return null.
-            buf.resetReaderIndex();
+
+        if (ret.size() == 0) {
             return null;
+        } else {
+            return ret;
         }
-
-        // There's enough bytes in the buffer. Read it.
-        ChannelBuffer payload = buf.readBytes(length);
-
-        // Successfully decoded a frame.
-        // Return a TaskMessage object
-        return new TaskMessage(task,payload.array());
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/src/jvm/backtype/storm/messaging/netty/NettyRenameThreadFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/NettyRenameThreadFactory.java b/storm-core/src/jvm/backtype/storm/messaging/netty/NettyRenameThreadFactory.java
new file mode 100644
index 0000000..ea3f249
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/NettyRenameThreadFactory.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.netty.util.ThreadNameDeterminer;
+import org.jboss.netty.util.ThreadRenamingRunnable;
+
+/**
+ * {@link ThreadFactory} that names threads {@code <name>-<n>} (n counting from 1) and
+ * always creates non-daemon, normal-priority threads. {@code Server} uses it for its
+ * netty boss and worker pools.
+ */
+public class NettyRenameThreadFactory implements ThreadFactory {
+
+    static {
+        // Switch netty's global thread-name policy to ThreadNameDeterminer.CURRENT,
+        // presumably so netty keeps the names assigned here instead of its own.
+        ThreadRenamingRunnable.setThreadNameDeterminer(ThreadNameDeterminer.CURRENT);
+    }
+
+    final ThreadGroup group;
+    final AtomicInteger index = new AtomicInteger(1);
+    final String name;
+
+    NettyRenameThreadFactory(String name) {
+        // Prefer the security manager's thread group when one is installed.
+        SecurityManager s = System.getSecurityManager();
+        group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
+        this.name = name;
+    }
+
+    @Override
+    public Thread newThread(Runnable r) {
+        Thread t = new Thread(group, r, name + "-" + index.getAndIncrement(), 0);
+        // New threads inherit daemon status and priority from the creating thread; reset both.
+        if (t.isDaemon()) {
+            t.setDaemon(false);
+        }
+        if (t.getPriority() != Thread.NORM_PRIORITY) {
+            t.setPriority(Thread.NORM_PRIORITY);
+        }
+        return t;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
index 83e4187..20a147d 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
@@ -31,35 +31,69 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
 
 class Server implements IConnection {
     private static final Logger LOG = LoggerFactory.getLogger(Server.class);
     @SuppressWarnings("rawtypes")
     Map storm_conf;
     int port;
-    private LinkedBlockingQueue<TaskMessage> message_queue;
+    
+    // One queue of incoming message batches per receiver thread (WORKER_RECEIVER_THREAD_COUNT).
+    // All messages for a given task go to the same queue, which preserves their order.
+    private LinkedBlockingQueue<ArrayList<TaskMessage>>[] message_queue;
+    
     volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
     final ChannelFactory factory;
     final ServerBootstrap bootstrap;
-
+    
+    private int queueCount; // number of entries in message_queue
+    HashMap<Integer, Integer> taskToQueueId = null; // taskId -> index into message_queue
+    int roundRobinQueueId; // next queue index to hand to an unseen task
+    // Set when a task -1 close marker is seen; volatile because recv() reads it on other threads.
+    volatile boolean closing = false;
+    final List<TaskMessage> closeMessage = Arrays.asList(new TaskMessage(-1, null));
+    
+    
     @SuppressWarnings("rawtypes")
     Server(Map storm_conf, int port) {
         this.storm_conf = storm_conf;
         this.port = port;
-        message_queue = new LinkedBlockingQueue<TaskMessage>();
-
+        
+        queueCount = Utils.getInt(storm_conf.get(Config.WORKER_RECEIVER_THREAD_COUNT), 1);
+        roundRobinQueueId = 0;
+        taskToQueueId = new HashMap<Integer, Integer>();
+    
+        message_queue = new LinkedBlockingQueue[queueCount];
+        for (int i = 0; i < queueCount; i++) {
+            message_queue[i] = new LinkedBlockingQueue<ArrayList<TaskMessage>>();
+        }
+        
         // Configure the server.
         int buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
         int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_SERVER_WORKER_THREADS));
 
+        ThreadFactory bossFactory = new NettyRenameThreadFactory(name() + "-boss");
+        ThreadFactory workerFactory = new NettyRenameThreadFactory(name() + "-worker");
+        
         if (maxWorkers > 0) {
-            factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), maxWorkers);
+            factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), 
+                Executors.newCachedThreadPool(workerFactory), maxWorkers);
         } else {
-            factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
+            factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), 
+                Executors.newCachedThreadPool(workerFactory));
         }
+        
+        LOG.info("Create Netty Server " + name() + ", buffer_size: " + buffer_size + ", maxWorkers: " + maxWorkers);
+        
         bootstrap = new ServerBootstrap(factory);
         bootstrap.setOption("child.tcpNoDelay", true);
         bootstrap.setOption("child.receiveBufferSize", buffer_size);
@@ -72,36 +106,101 @@ class Server implements IConnection {
         Channel channel = bootstrap.bind(new InetSocketAddress(port));
         allChannels.add(channel);
     }
+    
+    // Buckets msgs by receiver queue; returns null (and flags closing) on a task -1 message.
+    private ArrayList<TaskMessage>[] groupMessages(List<TaskMessage> msgs) {
+      @SuppressWarnings("unchecked")
+      ArrayList<TaskMessage>[] groups = new ArrayList[queueCount];
+      for (TaskMessage msg : msgs) {
+        final int taskId = msg.task();
+        if (taskId == -1) {
+          // A task id of -1 is the close marker: stop accepting messages.
+          closing = true;
+          return null;
+        }
+        final int slot = getMessageQueueId(taskId);
+        ArrayList<TaskMessage> group = groups[slot];
+        if (group == null) {
+          group = new ArrayList<TaskMessage>();
+          groups[slot] = group;
+        }
+        group.add(msg);
+      }
+      return groups;
+    }
+    
+    private Integer getMessageQueueId(int task) {
+      // Map taskId -> queueId, assigning unseen tasks to queues in round-robin order so
+      // that all messages for one task land in the same queue (preserving their order).
+      //
+      // Both the lookup and the insert happen under the lock: HashMap is not safe for an
+      // unsynchronized read racing a put, and a caller that finds the task already
+      // assigned must still get its id back rather than null (callers index an array with it).
+      synchronized (taskToQueueId) {
+        Integer queueId = taskToQueueId.get(task);
+        if (null == queueId) {
+          queueId = roundRobinQueueId++;
+          taskToQueueId.put(task, queueId);
+          if (roundRobinQueueId == queueCount) {
+            roundRobinQueueId = 0;
+          }
+        }
+        return queueId;
+      }
+    }
 
     /**
      * enqueue a received message 
      * @param message
      * @throws InterruptedException
      */
-    protected void enqueue(TaskMessage message) throws InterruptedException {
-        message_queue.put(message);
-        LOG.debug("message received with task: {}, payload size: {}", message.task(), message.message().length);
+    protected void enqueue(List<TaskMessage> msgs) throws InterruptedException {
+      // Nothing to do for an empty batch, or once a close marker has been seen.
+      if (closing || msgs == null || msgs.isEmpty()) {
+        return;
+      }
+      ArrayList<TaskMessage>[] perQueue = groupMessages(msgs);
+      // groupMessages returns null (and sets closing) if the batch carried the close marker.
+      if (perQueue == null || closing) {
+        return;
+      }
+      // Hand each non-empty group to the queue with the same index, in queue order.
+      int q = 0;
+      for (ArrayList<TaskMessage> group : perQueue) {
+        if (group != null) {
+          message_queue[q].put(group);
+        }
+        q++;
+      }
     }
     
-    /**
-     * fetch a message from message queue synchronously (flags != 1) or asynchronously (flags==1)
-     */
-    public TaskMessage recv(int flags)  {
-        if ((flags & 0x01) == 0x01) { 
+    public Iterator<TaskMessage> recv(int flags, int receiverId)  {
+      if (closing) { // a task -1 close marker was received: hand back the close message
+        return closeMessage.iterator();
+      }
+      // Receiver ids beyond queueCount wrap onto existing queues via receiverId % queueCount.
+      ArrayList<TaskMessage> ret = null; 
+      int queueId = receiverId % queueCount;
+      if ((flags & 0x01) == 0x01) { 
             //non-blocking
-            return message_queue.poll();
+            ret = message_queue[queueId].poll();
         } else {
             try {
-                TaskMessage request = message_queue.take();
+                ArrayList<TaskMessage> request = message_queue[queueId].take();
                 LOG.debug("request to be processed: {}", request);
-                return request;
+                ret = request;
             } catch (InterruptedException e) {
                 LOG.info("exception within msg receiving", e);
-                return null;
+                ret = null;
             }
         }
+      // Iterator over the received batch; null if nothing was polled or take() was interrupted.
+      if (null != ret) {
+        return ret.iterator();
+      }
+      return null;
     }
-
+   
     /**
      * register a newly created channel
      * @param channel
@@ -133,4 +232,12 @@ class Server implements IConnection {
     public void send(int task, byte[] message) {
         throw new RuntimeException("Server connection should not send any messages");
     }
+    
+    public void send(Iterator<TaskMessage> msgs) {
+      throw new RuntimeException("Server connection should not send any messages"); // receive-only
+    }
+    /** @return label for this server, used to name its netty threads and in log output. */
+    public String name() {
+      return "Netty-server-localhost-".concat(String.valueOf(port));
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientErrorHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientErrorHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientErrorHandler.java
new file mode 100644
index 0000000..ae317aa
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientErrorHandler.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.messaging.netty;
+
+import java.net.ConnectException;
+
+import org.jboss.netty.channel.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Logs exceptions on a client channel under the client's name; ConnectException is not logged. */
+public class StormClientErrorHandler extends SimpleChannelUpstreamHandler {
+    private static final Logger LOG = LoggerFactory.getLogger(StormClientErrorHandler.class);
+    private final String name; // client name, included in log messages
+
+    StormClientErrorHandler(String name) { this.name = name; }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
+        final Throwable failure = event.getCause();
+        if (failure instanceof ConnectException) {
+            return; // not logged; presumably expected while the peer is still starting up
+        }
+        LOG.info("Connection failed " + name, failure);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
deleted file mode 100644
index 43a8c39..0000000
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.messaging.netty;
-
-import java.net.ConnectException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.ConnectException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class StormClientHandler extends SimpleChannelUpstreamHandler  {
-    private static final Logger LOG = LoggerFactory.getLogger(StormClientHandler.class);
-    private Client client;
-    long start_time;
-    
-    StormClientHandler(Client client) {
-        this.client = client;
-        start_time = System.currentTimeMillis();
-    }
-
-    @Override
-    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent event) {
-        //register the newly established channel
-        Channel channel = event.getChannel();
-        client.setChannel(channel);
-        LOG.info("connection established from "+channel.getLocalAddress()+" to "+channel.getRemoteAddress());
-        
-        //send next batch of requests if any
-        try {
-            client.tryDeliverMessages(false);
-        } catch (Exception ex) {
-            LOG.info("exception when sending messages:", ex.getMessage());
-            client.reconnect();
-        }
-    }
-
-    @Override
-    public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) {
-        LOG.debug("send/recv time (ms): {}", (System.currentTimeMillis() - start_time));
-        
-        //examine the response message from server
-        ControlMessage msg = (ControlMessage)event.getMessage();
-        if (msg==ControlMessage.FAILURE_RESPONSE)
-            LOG.info("failure response:{}", msg);
-
-        //send next batch of requests if any
-        try {
-            client.tryDeliverMessages(false);
-        } catch (Exception ex) {
-            LOG.info("exception when sending messages:", ex.getMessage());
-            client.reconnect();
-        }
-    }
-
-    @Override
-    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
-        Throwable cause = event.getCause();
-        if (!(cause instanceof ConnectException)) {
-            LOG.info("Connection to "+client.remote_addr+" failed:", cause);
-        }
-        client.reconnect();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
index 6bad8e3..e6e8b3d 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientPipelineFactory.java
@@ -37,7 +37,7 @@ class StormClientPipelineFactory implements ChannelPipelineFactory {
         // Encoder
         pipeline.addLast("encoder", new MessageEncoder());
         // business logic.
-        pipeline.addLast("handler", new StormClientHandler(client));
+        pipeline.addLast("handler", new StormClientErrorHandler(client.name()));
 
         return pipeline;
     }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
index 093fb61..bf9b79e 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormServerHandler.java
@@ -18,10 +18,14 @@
 package backtype.storm.messaging.netty;
 
 import backtype.storm.messaging.TaskMessage;
-import org.jboss.netty.channel.*;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
+import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
 class StormServerHandler extends SimpleChannelUpstreamHandler  {
@@ -41,30 +45,22 @@ class StormServerHandler extends SimpleChannelUpstreamHandler  {
     
     @Override
     public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
-        Object msg = e.getMessage();  
-        if (msg == null) return;
-
-        //end of batch?
-        if (msg==ControlMessage.EOB_MESSAGE) {
-            Channel channel = ctx.getChannel();
-            LOG.debug("Send back response ...");
-            if (failure_count.get()==0)
-                channel.write(ControlMessage.OK_RESPONSE);
-            else channel.write(ControlMessage.FAILURE_RESPONSE);
-            return;
-        }
-        
-        //enqueue the received message for processing
-        try {
-            server.enqueue((TaskMessage)msg);
-        } catch (InterruptedException e1) {
-            LOG.info("failed to enqueue a request message", e);
-            failure_count.incrementAndGet();
-        }
+      @SuppressWarnings("unchecked") // assumes the upstream decoder emits List<TaskMessage> batches
+      List<TaskMessage> msgs = (List<TaskMessage>) e.getMessage();
+      if (msgs == null) {
+        return;
+      }
+      try {
+        server.enqueue(msgs);
+      } catch (InterruptedException e1) {
+        LOG.info("failed to enqueue a request message", e1); // log the cause, not the event
+        failure_count.incrementAndGet();
+      }
     }

     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+        LOG.error("server errors in handling the request", e.getCause());
         server.closeChannel(e.getChannel());
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/src/jvm/backtype/storm/testing/TestEventLogSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/testing/TestEventLogSpout.java b/storm-core/src/jvm/backtype/storm/testing/TestEventLogSpout.java
new file mode 100644
index 0000000..1570aeb
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/testing/TestEventLogSpout.java
@@ -0,0 +1,139 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.testing;
+
+import static backtype.storm.utils.Utils.get;
+import backtype.storm.topology.OutputFieldsDeclarer;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+
+/** Test spout emitting (source task id, event id) tuples and counting acks/fails per instance. */
+public class TestEventLogSpout extends BaseRichSpout {
+    public static Logger LOG = LoggerFactory.getLogger(TestEventLogSpout.class);
+    // Keyed by each spout's uid; every access synchronizes on the map being touched.
+    private static final Map<String, Integer> acked = new HashMap<String, Integer>();
+    private static final Map<String, Integer> failed = new HashMap<String, Integer>();
+    
+    private String uid;
+    private long totalCount;
+    
+    SpoutOutputCollector _collector;
+    private long eventId = 0;
+    private long myCount;
+    private int source;
+    
+    public static int getNumAcked(String stormId) {
+        synchronized(acked) {
+            return get(acked, stormId, 0);
+        }
+    }
+
+    public static int getNumFailed(String stormId) {
+        synchronized(failed) {
+            return get(failed, stormId, 0);
+        }
+    }
+    
+    public TestEventLogSpout(long totalCount) {
+        this.uid = UUID.randomUUID().toString();
+        
+        synchronized(acked) {
+            acked.put(uid, 0);
+        }
+        synchronized(failed) {
+            failed.put(uid, 0);
+        }
+        
+        this.totalCount = totalCount;
+    }
+        
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        _collector = collector;
+        this.source = context.getThisTaskId();
+        long taskCount = context.getComponentTasks(context.getThisComponentId()).size();
+        int taskIndex = context.getComponentTasks(context.getThisComponentId()).indexOf(source);
+        // The first (totalCount % taskCount) tasks emit one extra event so all tasks together
+        // emit exactly totalCount; plain division could leave completed() false forever.
+        myCount = totalCount / taskCount + (taskIndex < totalCount % taskCount ? 1 : 0);
+    }
+    
+    public void close() {
+        // nothing to release
+    }
+    
+    public void cleanup() {
+        synchronized(acked) {            
+            acked.remove(uid);
+        } 
+        synchronized(failed) {            
+            failed.remove(uid);
+        }
+    }
+    
+    /** @return true once acked + failed for this spout reach totalCount; counts read 0 after cleanup(). */
+    public boolean completed() {
+        // Default to 0 so a call after cleanup() removed the entries does not NPE on unboxing.
+        int ackedAmt;
+        int failedAmt;
+        synchronized(acked) {
+            ackedAmt = get(acked, uid, 0);
+        }
+        synchronized(failed) {
+            failedAmt = get(failed, uid, 0);
+        }
+        int totalEmitted = ackedAmt + failedAmt;
+        return totalEmitted >= totalCount;
+    }
+        
+    public void nextTuple() {
+        if (eventId < myCount) { 
+            eventId++;
+            _collector.emit(new Values(source, eventId), eventId);
+        }        
+    }
+    
+    public void ack(Object msgId) {
+        synchronized(acked) {
+            int curr = get(acked, uid, 0);
+            acked.put(uid, curr+1);
+        }
+    }
+
+    public void fail(Object msgId) {
+        synchronized(failed) {
+            int curr = get(failed, uid, 0);
+            failed.put(uid, curr+1);
+        }
+    }
+    
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("source", "eventId"));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/src/jvm/backtype/storm/testing/TestEventOrderCheckBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/testing/TestEventOrderCheckBolt.java b/storm-core/src/jvm/backtype/storm/testing/TestEventOrderCheckBolt.java
new file mode 100644
index 0000000..1f80362
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/testing/TestEventOrderCheckBolt.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.testing;
+
+import backtype.storm.topology.OutputFieldsDeclarer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+
+/** Test bolt that emits an "error" tuple whenever a source's event ids are not strictly increasing. */
+public class TestEventOrderCheckBolt extends BaseRichBolt {
+    public static Logger LOG = LoggerFactory.getLogger(TestEventOrderCheckBolt.class);
+
+    private int _count;
+    OutputCollector _collector;
+    // Most recent event id seen, keyed by source task id (field 0 of the input tuple).
+    Map<Integer, Long> recentEventId = new HashMap<Integer, Long>();
+
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        _count = 0;
+        _collector = collector;
+    }
+
+    public void execute(Tuple input) {
+        final Integer source = input.getInteger(0);
+        final Long current = input.getLong(1);
+        final Long previous = recentEventId.get(source);
+        if (previous != null && current <= previous) {
+            // Out of order (or duplicate): report it, anchored to the offending tuple.
+            _collector.emit(input, new Values("Error: event id is not in strict order! event source Id: "
+                    + source + ", last event Id: " + previous + ", current event Id: " + current));
+        }
+        recentEventId.put(source, current);
+        _collector.ack(input);
+    }
+
+    public void cleanup() {
+        // nothing to release
+    }
+
+    /** @return the bolt's single output field, "error" (same as declareOutputFields). */
+    public Fields getOutputFields() {
+        return new Fields("error");
+    }
+
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("error"));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
index 56751c6..8c5b466 100644
--- a/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -51,8 +51,11 @@ public class DisruptorQueue implements IStatefulObject {
     // TODO: consider having a threadlocal cache of this variable to speed up reads?
     volatile boolean consumerStartedFlag = false;
     ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue();
+    private static final String PREFIX = "disruptor-"; // prepended to every queue name
+    private String _queueName = "";
     
-    public DisruptorQueue(ClaimStrategy claim, WaitStrategy wait) {
+    public DisruptorQueue(String queueName, ClaimStrategy claim, WaitStrategy wait) {
+         this._queueName = PREFIX + queueName;
         _buffer = new RingBuffer<MutableObject>(new ObjectEventFactory(), claim, wait);
         _consumer = new Sequence();
         _barrier = _buffer.newBarrier();
@@ -62,6 +65,10 @@ public class DisruptorQueue implements IStatefulObject {
         }
     }
     
+    public String getName() {
+        return this._queueName; // "disruptor-" followed by the queueName passed to the constructor
+    }
+    
     public void consumeBatch(EventHandler<Object> handler) {
         consumeBatchToCursor(_barrier.getCursor(), handler);
     }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java b/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
new file mode 100644
index 0000000..0e53632
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.utils;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import backtype.storm.messaging.IConnection;
+import backtype.storm.messaging.TaskMessage;
+
+/**
+ * Accumulates outgoing message batches per destination (the {@code hostPort} key used by
+ * {@link #send}) and flushes each destination's batches as one iterator.
+ */
+public class TransferDrainer {
+
+  private HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundles =
+      new HashMap<String, ArrayList<ArrayList<TaskMessage>>>();
+
+  /** Appends each non-empty message list in the map to the bundle for its key. */
+  public void add(HashMap<String, ArrayList<TaskMessage>> workerTupleSetMap) {
+    for (String key : workerTupleSetMap.keySet()) {
+      ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(key);
+      if (null == bundle) {
+        bundle = new ArrayList<ArrayList<TaskMessage>>();
+        bundles.put(key, bundle);
+      }
+      ArrayList<TaskMessage> tupleSet = workerTupleSetMap.get(key);
+      if (null != tupleSet && tupleSet.size() > 0) {
+        bundle.add(tupleSet);
+      }
+    }
+  }
+
+  /** Sends each non-empty bundle over its connection; keys with no connection are skipped. */
+  public void send(HashMap<String, IConnection> connections) {
+    for (String hostPort : bundles.keySet()) {
+      IConnection connection = connections.get(hostPort);
+      if (null != connection) {
+        ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(hostPort);
+        Iterator<TaskMessage> iter = getBundleIterator(bundle);
+        if (null != iter && iter.hasNext()) {
+          connection.send(iter);
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns one iterator over every message of every list in {@code bundle}, in order;
+   * null if {@code bundle} is null, and an empty iterator if it contains no lists.
+   */
+  private Iterator<TaskMessage> getBundleIterator(final ArrayList<ArrayList<TaskMessage>> bundle) {
+    if (null == bundle) {
+      return null;
+    }
+    return new Iterator<TaskMessage>() {
+      // Index of the next inner list to open once the current one is exhausted.
+      private int bundleOffset = 0;
+      private Iterator<TaskMessage> iter = null;
+
+      // Positions iter on an unread message; returns false once the whole bundle is drained.
+      private boolean advance() {
+        while (null == iter || !iter.hasNext()) {
+          if (bundleOffset >= bundle.size()) {
+            return false;
+          }
+          iter = bundle.get(bundleOffset++).iterator();
+        }
+        return true;
+      }
+
+      @Override
+      public boolean hasNext() {
+        return advance();
+      }
+
+      @Override
+      public TaskMessage next() {
+        if (!advance()) {
+          throw new java.util.NoSuchElementException();
+        }
+        return iter.next();
+      }
+
+      @Override
+      public void remove() {
+        throw new RuntimeException("not supported");
+      }
+    };
+  }
+
+  /** Discards all accumulated bundles. */
+  public void clear() {
+    bundles.clear();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/src/jvm/backtype/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/Utils.java b/storm-core/src/jvm/backtype/storm/utils/Utils.java
index a1fed96..6a0a447 100644
--- a/storm-core/src/jvm/backtype/storm/utils/Utils.java
+++ b/storm-core/src/jvm/backtype/storm/utils/Utils.java
@@ -25,6 +25,7 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.io.PrintStream;
 import java.net.URL;
 import java.net.URLDecoder;
 import java.nio.ByteBuffer;
@@ -37,6 +38,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Scanner;
 import java.util.TreeMap;
 import java.util.UUID;
 
@@ -301,15 +303,39 @@ public class Utils {
     }
     
     public static Integer getInt(Object o) {
-        if(o instanceof Long) {
-            return ((Long) o ).intValue();
-        } else if (o instanceof Integer) {
-            return (Integer) o;
-        } else if (o instanceof Short) {
-            return ((Short) o).intValue();
-        } else {
-            throw new IllegalArgumentException("Don't know how to convert " + o + " + to int");
-        }
+      // Delegate to the two-argument overload with a null default. A null result can only
+      // come from a null input, and this overload has no default to fall back on, so reject it.
+      Integer result = getInt(o, null);
+      if (null == result) {
+        throw new IllegalArgumentException("Don't know how to convert null to int");
+      }
+      return result;
+    }
+    
+    public static Integer getInt(Object o, Integer defaultValue) {
+      if (null == o) {
+        return defaultValue;
+      }
+      
+      if(o instanceof Long) {
+          return ((Long) o ).intValue();
+      } else if (o instanceof Integer) {
+          return (Integer) o;
+      } else if (o instanceof Short) {
+          return ((Short) o).intValue();
+      } else {
+          throw new IllegalArgumentException("Don't know how to convert " + o + " + to int");
+      }
+    }
+
+    /**
+     * Converts a Boolean value to a primitive boolean, or returns a default when it is null.
+     *
+     * @param o the value to convert; may be null
+     * @param defaultValue returned when {@code o} is null
+     * @return {@code o} unboxed, or {@code defaultValue} when {@code o} is null
+     * @throws IllegalArgumentException if {@code o} is non-null and not a Boolean
+     */
+    public static boolean getBoolean(Object o, boolean defaultValue) {
+      if (null == o) {
+        return defaultValue;
+      }
+
+      if (o instanceof Boolean) {
+          return (Boolean) o;
+      } else {
+          throw new IllegalArgumentException("Don't know how to convert " + o + " to boolean");
+      }
     }
     
     public static long secureRandomLong() {
@@ -373,7 +399,7 @@ public class Utils {
         ret.start();
         return ret;
     }
-
+    
     public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port) {
         CuratorFramework ret = newCurator(conf, servers, port);
         ret.start();

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/src/ui/public/component.html
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/component.html b/storm-core/src/ui/public/component.html
new file mode 100644
index 0000000..90ca630
--- /dev/null
+++ b/storm-core/src/ui/public/component.html
@@ -0,0 +1,88 @@
+<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01//EN" "http://www.w3.org/TR/html4/strict.dtd">
+<html><head>
+<title>Storm UI</title>
+<link href="/css/bootstrap-1.4.0.css" rel="stylesheet" type="text/css">
+<link href="/css/style.css" rel="stylesheet" type="text/css">
+<script src="/js/jquery-1.6.2.min.js" type="text/javascript"></script>
+<script src="/js/jquery.tablesorter.min.js" type="text/javascript"></script>
+<script src="/js/jquery.cookies.2.2.0.min.js" type="text/javascript"></script>
+<script src="/js/jquery.mustache.js" type="text/javascript"></script>
+<script src="/js/purl.js" type="text/javascript"></script>
+<script src="/js/bootstrap-twipsy.js" type="text/javascript"></script>
+<script src="/js/script.js" type="text/javascript"></script>
+</head>
+<body>
+<h1><a href="/">Storm UI</a></h1>
+<div id="component-summary">
+</div>
+<div id="component-stats-detail">
+</div>
+<div id="component-input-stats">
+</div>
+<div id="component-output-stats">
+</div>
+<div id="component-executor-stats">
+</div>
+<div id="component-errors">
+</div>
+<div id="json-response-error">
+</div>
+<p id="toggle-switch" style="display: block;" class="js-only"></p>
+<script>
+$(document).ready(function() {
+    var componentId = $.url().param("id");
+    var topologyId = $.url().param("topology_id");
+    var window = $.url().param("window");
+    var sys = $.cookies.get("sys") || "false";
+    var url = "/api/v1/topology/"+topologyId+"/component/"+componentId+"?sys="+sys;
+    if(window) url += "&window="+window;
+    renderToggleSys($("#toggle-switch"));
+    $.ajaxSetup({
+        "error":function(jqXHR,textStatus,response) {
+            var errorJson = jQuery.parseJSON(jqXHR.responseText);
+            $.get("/templates/json-error-template.html", function(template) {
+                $("#json-response-error").append(Mustache.render($(template).filter("#json-error-template").html(),errorJson));
+            });
+        }
+    });
+
+    $.getJSON(url,function(response,status,jqXHR) {
+        var componentSummary = $("#component-summary");
+        var componentStatsDetail = $("#component-stats-detail")
+        var inputStats = $("#component-input-stats");
+        var outputStats = $("#component-output-stats");
+        var executorStats = $("#component-executor-stats");
+        var componentErrors = $("#component-errors");
+        $.get("/templates/component-page-template.html", function(template) {
+            componentSummary.append(Mustache.render($(template).filter("#component-summary-template").html(),response));
+            if(response["componentType"] == "spout") {
+                componentStatsDetail.append(Mustache.render($(template).filter("#spout-stats-detail-template").html(),response));
+                $("#spout-stats-table").tablesorter({ sortList: [[0,0]], headers: {0: { sorter: "stormtimestr"}}});
+                outputStats.append(Mustache.render($(template).filter("#output-stats-template").html(),response));
+                $("#output-stats-table").tablesorter({ sortList: [[0,0]], headers: {0: { sorter: "stormtimestr"}}});
+                executorStats.append(Mustache.render($(template).filter("#executor-stats-template").html(),response));
+                $("#executor-stats-table").tablesorter({ sortList: [[0,0]], headers: {1: { sorter: "stormtimestr"}}});
+            } else {
+                componentStatsDetail.append(Mustache.render($(template).filter("#bolt-stats-template").html(),response));
+                $("#bolt-stats-table").tablesorter({ sortList: [[0,0]], headers: {0: { sorter: "stormtimestr"}}});
+                inputStats.append(Mustache.render($(template).filter("#bolt-input-stats-template").html(),response));
+                if (response["inputStats"].length > 0) {
+                    $("#bolt-input-stats-table").tablesorter({ sortList: [[0,0]], headers: {}});
+                }
+                outputStats.append(Mustache.render($(template).filter("#bolt-output-stats-template").html(),response));
+                $("#bolt-output-stats-table").tablesorter({ sortList: [[0,0]], headers: {}});
+                executorStats.append(Mustache.render($(template).filter("#bolt-executor-template").html(),response));
+                if(response["outputStats"].length > 0) {
+                    $("#bolt-executor-table").tablesorter({ sortList: [[0,0]], headers: {}});
+                }
+            }
+            componentErrors.append(Mustache.render($(template).filter("#component-errors-template").html(),response));
+            if(response["componentErrors"].length > 0) {
+                $("#component-errors-table").tablesorter({ sortList: [[0,0]], headers: {1: { sorter: "stormtimestr"}}});
+            }
+        });
+    });
+});
+</script>
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/55e1664d/storm-core/src/ui/public/index.html
----------------------------------------------------------------------
diff --git a/storm-core/src/ui/public/index.html b/storm-core/src/ui/public/index.html
new file mode 100644
index 0000000..77af159
--- /dev/null
+++ b/storm-core/src/ui/public/index.html
@@ -0,0 +1,73 @@
+<html><head>
+<title>Storm UI</title>
+<link href="/css/bootstrap-1.4.0.css" rel="stylesheet" type="text/css">
+<link href="/css/style.css" rel="stylesheet" type="text/css">
+<script src="/js/jquery-1.6.2.min.js" type="text/javascript"></script>
+<script src="/js/jquery.tablesorter.min.js" type="text/javascript"></script>
+<script src="/js/jquery.cookies.2.2.0.min.js" type="text/javascript"></script>
+<script src="/js/jquery.mustache.js" type="text/javascript"></script>
+<script src="/js/bootstrap-twipsy.js" type="text/javascript"></script>
+<script src="/js/script.js" type="text/javascript"></script>
+</head>
+<body>
+<h1><a href="/">Storm UI</a></h1>
+<h2>Cluster Summary</h2>
+<div id="cluster-summary">
+</div>
+<h2>Topology Summary</h2>
+<div id="topology-summary">
+</div>
+<h2>Supervisor Summary</h2>
+<div id="supervisor-summary">
+</div>
+<h2>Nimbus Configuration</h2>
+<div id="nimbus-configuration"></div>
+<div id="json-response-error"></div>
+</body>
+<script>
+// Loads the cluster, topology, supervisor and configuration summaries and renders each section.
+$(document).ready(function() {
+    $.ajaxSetup({
+        "error":function(jqXHR,textStatus,response) {
+            var errorJson;
+            try {
+                errorJson = jQuery.parseJSON(jqXHR.responseText);
+            } catch (e) {
+                // The body is not JSON (e.g. an HTML error page), so the error template cannot render it.
+                return;
+            }
+            $.get("/templates/json-error-template.html", function(template) {
+                $("#json-response-error").append(Mustache.render($(template).filter("#json-error-template").html(),errorJson));
+            });
+        }
+    });
+    // Request the page template once; every section below renders from this shared response
+    // (done() callbacks run immediately if the request has already completed).
+    var templateRequest = $.get("/templates/index-page-template.html");
+    var clusterSummary = $("#cluster-summary");
+    var topologySummary = $("#topology-summary");
+    var supervisorSummary = $("#supervisor-summary");
+    var config = $("#nimbus-configuration");
+
+    $.getJSON("/api/v1/cluster/summary",function(response,status,jqXHR) {
+        templateRequest.done(function(template) {
+            clusterSummary.append(Mustache.render($(template).filter("#cluster-summary-template").html(),response));
+        });
+    });
+    $.getJSON("/api/v1/topology/summary",function(response,status,jqXHR) {
+        templateRequest.done(function(template) {
+            topologySummary.append(Mustache.render($(template).filter("#topology-summary-template").html(),response));
+            if(response["topologies"].length > 0) {
+                $("#topology-summary-table").tablesorter({ sortList: [[0,0]], headers: {3: { sorter: "stormtimestr"}}});
+            }
+        });
+    });
+    $.getJSON("/api/v1/supervisor/summary",function(response,status,jqXHR) {
+        templateRequest.done(function(template) {
+            supervisorSummary.append(Mustache.render($(template).filter("#supervisor-summary-template").html(),response));
+            if(response["supervisors"].length > 0) {
+                $("#supervisor-summary-table").tablesorter({ sortList: [[0,0]], headers: {3: { sorter: "stormtimestr"}}});
+            }
+        });
+    });
+    $.getJSON("/api/v1/cluster/configuration",function(response,status,jqXHR) {
+        var formattedResponse = formatConfigData(response);
+        templateRequest.done(function(template) {
+            config.append(Mustache.render($(template).filter("#configuration-template").html(),formattedResponse));
+            $("#nimbus-configuration-table").tablesorter({ sortList: [[0,0]], headers: {}});
+        });
+    });
+});
+</script>
+</html>


Mime
View raw message