storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ptgo...@apache.org
Subject [1/3] git commit: STORM-12 Reduce thread usage of Netty transport.
Date Fri, 11 Apr 2014 16:12:19 GMT
Repository: incubator-storm
Updated Branches:
  refs/heads/master 1be3d0f65 -> 1a0b46e95


STORM-12 Reduce thread usage of Netty transport.


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/94c4d4d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/94c4d4d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/94c4d4d9

Branch: refs/heads/master
Commit: 94c4d4d9e6c4ce736141668c585818214a9d26cf
Parents: c621a6c
Author: Robert (Bobby) Evans <evans@yahoo-inc.com>
Authored: Tue Apr 1 15:32:31 2014 +0000
Committer: Robert (Bobby) Evans <evans@yahoo-inc.com>
Committed: Tue Apr 1 15:32:31 2014 +0000

----------------------------------------------------------------------
 .../backtype/storm/messaging/netty/Client.java  | 184 +++++++++++++------
 .../backtype/storm/messaging/netty/Context.java |  30 ++-
 .../storm/messaging/netty/MessageBatch.java     |  55 +-----
 .../backtype/storm/messaging/netty/Server.java  |   2 +-
 .../messaging/netty/StormClientHandler.java     |  80 +++-----
 5 files changed, 183 insertions(+), 168 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/94c4d4d9/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index f15cd1d..6996b49 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -21,17 +21,21 @@ import backtype.storm.Config;
 import backtype.storm.messaging.IConnection;
 import backtype.storm.messaging.TaskMessage;
 import backtype.storm.utils.Utils;
+
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
 import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
 import java.util.Random;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -39,38 +43,37 @@ import java.util.concurrent.atomic.AtomicReference;
 
 class Client implements IConnection {
     private static final Logger LOG = LoggerFactory.getLogger(Client.class);
+    private static final Timer TIMER = new Timer("netty-client-timer", true);
+
     private final int max_retries;
     private final long base_sleep_ms;
     private final long max_sleep_ms;
     private LinkedBlockingQueue<Object> message_queue; //entry should either be TaskMessage or ControlMessage
     private AtomicReference<Channel> channelRef;
     private final ClientBootstrap bootstrap;
-    private InetSocketAddress remote_addr;
+    InetSocketAddress remote_addr;
     private AtomicInteger retries;
     private final Random random = new Random();
     private final ChannelFactory factory;
     private final int buffer_size;
     private final AtomicBoolean being_closed;
+    private boolean wait_for_requests;
 
     @SuppressWarnings("rawtypes")
-    Client(Map storm_conf, String host, int port) {
+    Client(Map storm_conf, ChannelFactory factory, String host, int port) {
+        this.factory = factory;
         message_queue = new LinkedBlockingQueue<Object>();
         retries = new AtomicInteger(0);
         channelRef = new AtomicReference<Channel>(null);
         being_closed = new AtomicBoolean(false);
+        wait_for_requests = false;
 
         // Configure
         buffer_size = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
         max_retries = Math.min(30, Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_RETRIES)));
         base_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MIN_SLEEP_MS));
         max_sleep_ms = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_MAX_SLEEP_MS));
-        int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
 
-        if (maxWorkers > 0) {
-            factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool(), maxWorkers);
-        } else {
-            factory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
-        }
         bootstrap = new ClientBootstrap(factory);
         bootstrap.setOption("tcpNoDelay", true);
         bootstrap.setOption("sendBufferSize", buffer_size);
@@ -88,19 +91,24 @@ class Client implements IConnection {
      * We will retry connection with exponential back-off policy
      */
     void reconnect() {
-        try {
-            int tried_count = retries.incrementAndGet();
-            if (tried_count <= max_retries) {
-                Thread.sleep(getSleepTimeMs());
-                LOG.info("Reconnect ... [{}]", tried_count);
-                bootstrap.connect(remote_addr);
-                LOG.debug("connection started...");
-            } else {
-                LOG.warn("Remote address is not reachable. We will close this client.");
-                close();
-            }
-        } catch (InterruptedException e) {
-            LOG.warn("connection failed", e);
+        close_n_release();
+
+        //reconnect only if it's not being closed
+        if (being_closed.get()) return;
+
+        final int tried_count = retries.incrementAndGet();
+        if (tried_count <= max_retries) {
+            long sleep = getSleepTimeMs();
+            LOG.info("Waiting {} ms before trying connection to {}", sleep, remote_addr);
+            TIMER.schedule(new TimerTask() {
+                @Override
+                public void run() { 
+                    LOG.info("Reconnect ... [{}] to {}", tried_count, remote_addr);
+                    bootstrap.connect(remote_addr);
+                }}, sleep);
+        } else {
+            LOG.warn(remote_addr+" is not reachable. We will close this client.");
+            close();
         }
     }
 
@@ -130,36 +138,94 @@ class Client implements IConnection {
 
         try {
             message_queue.put(new TaskMessage(task, message));
+
+            //resume delivery if it is waiting for requests
+            tryDeliverMessages(true);
         } catch (InterruptedException e) {
             throw new RuntimeException(e);
         }
     }
 
     /**
+     * Retrieve messages from queue, and delivery to server if any
+     */
+    synchronized void tryDeliverMessages(boolean only_if_waiting) throws InterruptedException {
+        //just skip if delivery only if waiting, and we are not waiting currently
+        if (only_if_waiting && !wait_for_requests)  return;
+
+        //make sure that channel was not closed
+        Channel channel = channelRef.get();
+        if (channel == null)  return;
+        if (!channel.isOpen()) {
+            LOG.info("Channel to {} is no longer open.",remote_addr);
+            //The channel is not open yet. Reconnect?
+            reconnect();
+            return;
+        }
+
+        final MessageBatch requests = tryTakeMessages();
+        if (requests==null) {
+            wait_for_requests = true;
+            return;
+        }
+
+        //if channel is being closed and we have no outstanding messages,  let's close the channel
+        if (requests.isEmpty() && being_closed.get()) {
+            close_n_release();
+            return;
+        }
+
+        //we are busily delivering messages, and will check queue upon response.
+        //When send() is called by senders, we should not thus call tryDeliverMessages().
+        wait_for_requests = false;
+
+        //write request into socket channel
+        ChannelFuture future = channel.write(requests);
+        future.addListener(new ChannelFutureListener() {
+            public void operationComplete(ChannelFuture future)
+                    throws Exception {
+                if (!future.isSuccess()) {
+                    LOG.info("failed to send "+requests.size()+" requests to "+remote_addr, future.getCause());
+                    reconnect();
+                } else {
+                    LOG.debug("{} request(s) sent", requests.size());
+
+                    //Now that our requests have been sent, channel could be closed if needed
+                    if (being_closed.get())
+                        close_n_release();
+                }
+            }
+        });
+    }
+
+    /**
      * Take all enqueued messages from queue
-     * @return
+     * @return  batch of messages
      * @throws InterruptedException
+     *
+     * synchronized ... ensure that messages are delivered in the same order
+     * as they are added into queue
      */
-    MessageBatch takeMessages()  throws InterruptedException {
+    private MessageBatch tryTakeMessages() throws InterruptedException {
         //1st message
-        MessageBatch batch = new MessageBatch(buffer_size);
-        Object msg = message_queue.take();
-        batch.add(msg);
+        Object msg = message_queue.poll();
+        if (msg == null) return null;
 
+        MessageBatch batch = new MessageBatch(buffer_size);
         //we will discard any message after CLOSE
-        if (msg==ControlMessage.CLOSE_MESSAGE)
+        if (msg == ControlMessage.CLOSE_MESSAGE) {
+            LOG.info("Connection to {} is being closed", remote_addr);
+            being_closed.set(true);
             return batch;
+        }
 
-        while (!batch.isFull()) {
-            //peek the next message
-            msg = message_queue.peek();
-            //no more messages
-            if (msg == null) break;
-
-            //we will discard any message after CLOSE
-            if (msg==ControlMessage.CLOSE_MESSAGE) {
+        batch.add((TaskMessage)msg);
+        while (!batch.isFull() && ((msg = message_queue.peek())!=null)) {
+            //Is it a CLOSE message?
+            if (msg == ControlMessage.CLOSE_MESSAGE) {
                 message_queue.take();
-                batch.add(msg);
+                LOG.info("Connection to {} is being closed", remote_addr);
+                being_closed.set(true);
                 break;
             }
 
@@ -179,31 +245,29 @@ class Client implements IConnection {
      *
      * We will send all existing requests, and then invoke close_n_release() method
      */
-    public synchronized void close() {
-        if (!being_closed.get()) {
-            //enqueue a CLOSE message so that shutdown() will be invoked
-            try {
-                message_queue.put(ControlMessage.CLOSE_MESSAGE);
-                being_closed.set(true);
-            } catch (InterruptedException e) {
-                close_n_release();
-            }
+    public void close() {
+        //enqueue a CLOSE message so that shutdown() will be invoked
+        try {
+            message_queue.put(ControlMessage.CLOSE_MESSAGE);
+
+            //resume delivery if it is waiting for requests
+            tryDeliverMessages(true);
+        } catch (InterruptedException e) {
+            LOG.info("Interrupted Connection to {} is being closed", remote_addr);
+            being_closed.set(true);
+            close_n_release();
         }
     }
 
     /**
      * close_n_release() is invoked after all messages have been sent.
      */
-    void  close_n_release() {
-        if (channelRef.get() != null)
-            channelRef.get().close().awaitUninterruptibly();
-
-        //we need to release resources
-        new Thread(new Runnable() {
-            @Override
-            public void run() {
-                factory.releaseExternalResources();
-            }}).start();
+    synchronized void close_n_release() {
+        if (channelRef.get() != null) {
+            channelRef.get().close();
+            LOG.debug("channel {} closed",remote_addr);
+            setChannel(null);
+        }
     }
 
     public TaskMessage recv(int flags) {
@@ -211,6 +275,10 @@ class Client implements IConnection {
     }
 
     void setChannel(Channel channel) {
+        if (channel != null && channel.isOpen()) {
+            //Assume the most recent connection attempt was successful.
+            retries.set(0);
+        }
         channelRef.set(channel);
         //reset retries
         if (channel != null)
@@ -218,7 +286,3 @@ class Client implements IConnection {
     }
 
 }
-
-
-
-

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/94c4d4d9/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
index 3e09dd1..80b4443 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
@@ -17,8 +17,16 @@
  */
 package backtype.storm.messaging.netty;
 
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+
+import java.util.concurrent.Executors;
+import java.util.Map;
+import java.util.Vector;
+
+import backtype.storm.Config;
 import backtype.storm.messaging.IConnection;
 import backtype.storm.messaging.IContext;
+import backtype.storm.utils.Utils;
 
 import java.util.Map;
 import java.util.Vector;
@@ -27,14 +35,25 @@ public class Context implements IContext {
     @SuppressWarnings("rawtypes")
     private Map storm_conf;
     private volatile Vector<IConnection> connections;
-    
+    private NioClientSocketChannelFactory clientChannelFactory;
+
     /**
      * initialization per Storm configuration 
      */
     @SuppressWarnings("rawtypes")
     public void prepare(Map storm_conf) {
-       this.storm_conf = storm_conf;
-       connections = new Vector<IConnection>(); 
+        this.storm_conf = storm_conf;
+        connections = new Vector<IConnection>();
+
+        //each context will have a single client channel factory
+        int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
+        if (maxWorkers > 0) {
+            clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
+                    Executors.newCachedThreadPool(), maxWorkers);
+        } else {
+            clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
+                    Executors.newCachedThreadPool());
+        }
     }
 
     /**
@@ -50,7 +69,7 @@ public class Context implements IContext {
      * establish a connection to a remote server
      */
     public IConnection connect(String storm_id, String host, int port) {        
-        IConnection client =  new Client(storm_conf, host, port);
+        IConnection client =  new Client(storm_conf, clientChannelFactory, host, port);
         connections.add(client);
         return client;
     }
@@ -63,5 +82,8 @@ public class Context implements IContext {
             conn.close();
         }
         connections = null;
+
+        //we need to release resources associated with client channel factory
+        clientChannelFactory.releaseExternalResources();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/94c4d4d9/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
index 9d287e4..cd8d4e3 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/MessageBatch.java
@@ -26,56 +26,22 @@ import java.util.ArrayList;
 
 class MessageBatch {
     private int buffer_size;
-    private ArrayList<Object> msgs;
+    private ArrayList<TaskMessage> msgs;
     private int encoded_length;
 
     MessageBatch(int buffer_size) {
         this.buffer_size = buffer_size;
-        msgs = new ArrayList<Object>();
+        msgs = new ArrayList<TaskMessage>();
         encoded_length = ControlMessage.EOB_MESSAGE.encodeLength();
     }
 
-    void add(Object obj) {
+    void add(TaskMessage obj) {
         if (obj == null)
             throw new RuntimeException("null object forbidded in message batch");
 
-        if (obj instanceof TaskMessage) {
-            TaskMessage msg = (TaskMessage)obj;
-            msgs.add(msg);
-            encoded_length += msgEncodeLength(msg);
-            return;
-        }
-
-        if (obj instanceof ControlMessage) {
-            ControlMessage msg = (ControlMessage)obj;
-            msgs.add(msg);
-            encoded_length += msg.encodeLength();
-            return;
-        }
-
-        throw new RuntimeException("Unsuppoted object type "+obj.getClass().getName());
-    }
-
-    void remove(Object obj) {
-        if (obj == null) return;
-
-        if (obj instanceof TaskMessage) {
-            TaskMessage msg = (TaskMessage)obj;
-            msgs.remove(msg);
-            encoded_length -= msgEncodeLength(msg);
-            return;
-        }
-
-        if (obj instanceof ControlMessage) {
-            ControlMessage msg = (ControlMessage)obj;
-            msgs.remove(msg);
-            encoded_length -= msg.encodeLength();
-            return;
-        }
-    }
-
-    Object get(int index) {
-        return msgs.get(index);
+        TaskMessage msg = (TaskMessage)obj;
+        msgs.add(msg);
+        encoded_length += msgEncodeLength(msg);
     }
 
     /**
@@ -129,12 +95,9 @@ class MessageBatch {
     ChannelBuffer buffer() throws Exception {
         ChannelBufferOutputStream bout = new ChannelBufferOutputStream(ChannelBuffers.directBuffer(encoded_length));
         
-        for (Object msg : msgs) 
-            if (msg instanceof TaskMessage)
-                writeTaskMessage(bout, (TaskMessage)msg);
-            else
-                ((ControlMessage)msg).write(bout);
-        
+        for (TaskMessage msg : msgs)
+            writeTaskMessage(bout, msg);
+
         //add a END_OF_BATCH indicator
         ControlMessage.EOB_MESSAGE.write(bout);
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/94c4d4d9/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
index ad811b0..83e4187 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Server.java
@@ -123,7 +123,7 @@ class Server implements IConnection {
      * close all channels, and release resources
      */
     public synchronized void close() {
-        if (allChannels != null) {  
+        if (allChannels != null) {
             allChannels.close().awaitUninterruptibly();
             factory.releaseExternalResources();
             allChannels = null;

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/94c4d4d9/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
index 65c36a7..43a8c39 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/StormClientHandler.java
@@ -17,7 +17,15 @@
  */
 package backtype.storm.messaging.netty;
 
-import org.jboss.netty.channel.*;
+import java.net.ConnectException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -27,12 +35,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
 public class StormClientHandler extends SimpleChannelUpstreamHandler  {
     private static final Logger LOG = LoggerFactory.getLogger(StormClientHandler.class);
     private Client client;
-    private AtomicBoolean being_closed;
-    long start_time; 
+    long start_time;
     
     StormClientHandler(Client client) {
         this.client = client;
-        being_closed = new AtomicBoolean(false);
         start_time = System.currentTimeMillis();
     }
 
@@ -41,13 +47,14 @@ public class StormClientHandler extends SimpleChannelUpstreamHandler  {
         //register the newly established channel
         Channel channel = event.getChannel();
         client.setChannel(channel);
-        LOG.debug("connection established to a remote host");
+        LOG.info("connection established from "+channel.getLocalAddress()+" to "+channel.getRemoteAddress());
         
-        //send next request
+        //send next batch of requests if any
         try {
-            sendRequests(channel, client.takeMessages());
-        } catch (InterruptedException e) {
-            channel.close();
+            client.tryDeliverMessages(false);
+        } catch (Exception ex) {
+            LOG.info("exception when sending messages:", ex.getMessage());
+            client.reconnect();
         }
     }
 
@@ -60,62 +67,21 @@ public class StormClientHandler extends SimpleChannelUpstreamHandler  {
         if (msg==ControlMessage.FAILURE_RESPONSE)
             LOG.info("failure response:{}", msg);
 
-        //send next request
-        Channel channel = event.getChannel();
+        //send next batch of requests if any
         try {
-            sendRequests(channel, client.takeMessages());
-        } catch (InterruptedException e) {
-            channel.close();
-        }
-    }
-
-    /**
-     * Retrieve a request from message queue, and send to server
-     * @param channel
-     */
-    private void sendRequests(Channel channel, final MessageBatch requests) {
-        if (requests==null || requests.size()==0 || being_closed.get()) return;
-
-        //if task==CLOSE_MESSAGE for our last request, the channel is to be closed
-        Object last_msg = requests.get(requests.size()-1);
-        if (last_msg==ControlMessage.CLOSE_MESSAGE) {
-            being_closed.set(true);
-            requests.remove(last_msg);
-        }
-
-        //we may don't need do anything if no requests found
-        if (requests.isEmpty()) {
-            if (being_closed.get())
-                client.close_n_release();
-            return;
+            client.tryDeliverMessages(false);
+        } catch (Exception ex) {
+            LOG.info("exception when sending messages:", ex.getMessage());
+            client.reconnect();
         }
-
-        //write request into socket channel
-        ChannelFuture future = channel.write(requests);
-        future.addListener(new ChannelFutureListener() {
-            public void operationComplete(ChannelFuture future)
-                    throws Exception {
-                if (!future.isSuccess()) {
-                    LOG.info("failed to send requests:", future.getCause());
-                    future.getChannel().close();
-                } else {
-                    LOG.debug("{} request(s) sent", requests.size());
-                }
-                if (being_closed.get())
-                    client.close_n_release();
-            }
-        });
     }
 
     @Override
     public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
         Throwable cause = event.getCause();
         if (!(cause instanceof ConnectException)) {
-            LOG.info("Connection failed:", cause);
-        } 
-        if (!being_closed.get()) {
-            client.setChannel(null);
-            client.reconnect();
+            LOG.info("Connection to "+client.remote_addr+" failed:", cause);
         }
+        client.reconnect();
     }
 }


Mime
View raw message