storm-commits mailing list archives

From bo...@apache.org
Subject [15/23] storm git commit: Abandon the idea of scheduling timeouts as needed as it's still performing worse than the original version
Date Tue, 14 Jul 2015 19:06:46 GMT
Abandon the idea of scheduling timeouts as needed as it's still performing worse than the original version
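
For orientation, the sketch below (a standalone illustration with made-up class and field names, not the committed code) shows the shape the commit moves to: each client's pending batch is drained by a fixed-delay task on a shared ScheduledExecutorService, instead of by per-batch HashedWheelTimer timeouts that had to be scheduled whenever an unfilled batch was left waiting.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative stand-ins for Client and MessageBatch; not Storm code.
class PeriodicFlushSketch {
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    private final Object pendingLock = new Object();
    private StringBuilder pending = new StringBuilder();   // stand-in for MessageBatch

    void start() {
        // One fixed-delay task per client; nothing to schedule, cancel or replace per batch.
        scheduler.scheduleWithFixedDelay(new Runnable() {
            public void run() {
                flush();
            }
        }, 10, 10, TimeUnit.MILLISECONDS);
    }

    void append(String msg) {
        synchronized (pendingLock) {
            pending.append(msg);
        }
    }

    private void flush() {
        String toSend;
        synchronized (pendingLock) {
            if (pending.length() == 0) {
                return;                              // nothing accumulated since the last run
            }
            toSend = pending.toString();
            pending = new StringBuilder();           // swap in a fresh batch under the lock
        }
        System.out.println("flushing: " + toSend);   // the real task writes to a Netty channel
    }

    void stop() {
        scheduler.shutdown();
    }
}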


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/afa638cd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/afa638cd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/afa638cd

Branch: refs/heads/0.9.x-branch
Commit: afa638cdbf2440d88af3775c0f02b4dc792922b0
Parents: 832b5db
Author: Enno Shioji <eshioji@gmail.com>
Authored: Wed Jun 3 00:26:53 2015 +0100
Committer: Enno Shioji <eshioji@gmail.com>
Committed: Wed Jun 3 00:26:53 2015 +0100

----------------------------------------------------------------------
 .../backtype/storm/messaging/netty/Client.java  | 127 ++++++++++---------
 .../backtype/storm/messaging/netty/Context.java |  32 +++--
 2 files changed, 85 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/afa638cd/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
index 0d75448..3652886 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Client.java
@@ -23,6 +23,7 @@ import backtype.storm.messaging.TaskMessage;
 import backtype.storm.metric.api.IStatefulObject;
 import backtype.storm.utils.StormBoundedExponentialBackoffRetry;
 import backtype.storm.utils.Utils;
+import com.google.common.base.Throwables;
 import org.jboss.netty.bootstrap.ClientBootstrap;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFactory;
@@ -41,6 +42,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
@@ -120,14 +122,14 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
      */
     private final int messageBatchSize;
 
-    private final HashedWheelTimer scheduler;
+    private final ScheduledExecutorService scheduler;
 
     private final Object pendingMessageLock = new Object();
     private MessageBatch pendingMessage;
     private Timeout pendingFlush;
 
     @SuppressWarnings("rawtypes")
-    Client(Map stormConf, ChannelFactory factory, HashedWheelTimer scheduler, String host, int port) {
+    Client(Map stormConf, ChannelFactory factory, ScheduledExecutorService scheduler, String host, int port) {
         closing = false;
         this.scheduler = scheduler;
         int bufferSize = Utils.getInt(stormConf.get(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE));
@@ -147,7 +149,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
 
         // Dummy values to avoid null checks
         pendingMessage = new MessageBatch(messageBatchSize);
-        pendingFlush = scheduler.newTimeout(new Flush(pendingMessage), 10, TimeUnit.MILLISECONDS);
+        scheduler.scheduleWithFixedDelay(new Flush(), 10, 10, TimeUnit.MILLISECONDS);
     }
 
     private ClientBootstrap createClientBootstrap(ChannelFactory factory, int bufferSize) {
@@ -489,37 +491,33 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
      * This task runs on a single thread shared among all clients, and thus
      * should not perform operations that block or are expensive.
      */
-    private class Flush implements TimerTask {
-        private final MessageBatch instructor;
-
-        private Flush(MessageBatch instructor) {
-            this.instructor = instructor;
-        }
-
+    private class Flush implements Runnable {
         @Override
-        public void run(Timeout timeout) throws Exception {
-            MessageBatch toSend;
-            MessageBatch replacement = new MessageBatch(messageBatchSize);
-            synchronized (pendingMessageLock){
-                if(instructor == pendingMessage){
-                    // It's still the batch which scheduled this timeout
-                    toSend = pendingMessage;
-                    pendingMessage = replacement;
-                    checkState(!toSend.isFull(), "Only unfilled batches should get timeouts scheduled");
-                } else {
-                    // It's no longer the batch which scheduled this timeout
-                    // No need to work on this one
-                    toSend = null;
-                }
-            }
-
-            if(toSend!=null){
+        public void run() {
+            try {
                 Channel channel = getConnectedChannel();
-                if(channel == null) {
-                    dropMessages(toSend);
+                if (channel == null || !channel.isWritable()) {
+                    // Connection not available or buffer is full, no point in flushing
+                    return;
                 } else {
+                    // Connection is available and there is room in Netty's buffer
+                    MessageBatch toSend;
+                    synchronized (pendingMessageLock) {
+                        if(pendingMessage.isEmpty()){
+                            // Nothing to flush
+                            return;
+                        } else {
+                            toSend = pendingMessage;
+                            pendingMessage = new MessageBatch(messageBatchSize);
+                        }
+                    }
+            checkState(!toSend.isFull(), "Filled batches should never be in pendingMessage field");
+
                     flushMessages(channel, toSend);
                 }
+            }catch (Throwable e){
+                LOG.error("Uncaught throwable", e);
+                throw Throwables.propagate(e);
             }
         }
     }
@@ -529,7 +527,7 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
      * This task runs on a single thread shared among all clients, and thus
      * should not perform operations that block.
      */
-    private class Connect implements TimerTask {
+    private class Connect implements Runnable {
 
         private final InetSocketAddress address;
 
@@ -548,41 +546,46 @@ public class Client extends ConnectionWithStatus implements IStatefulObject {
 
 
         @Override
-        public void run(Timeout timeout) throws Exception {
-            if (reconnectingAllowed()) {
-                final int connectionAttempt = connectionAttempts.getAndIncrement();
-                totalConnectionAttempts.getAndIncrement();
-
-                LOG.debug("connecting to {} [attempt {}]", address.toString(), connectionAttempt);
-                ChannelFuture future = bootstrap.connect(address);
-                future.addListener(new ChannelFutureListener() {
-                    @Override
-                    public void operationComplete(ChannelFuture future) throws Exception {
-                        // This call returns immediately
-                        Channel newChannel = future.getChannel();
-
-                        if (future.isSuccess() && connectionEstablished(newChannel)) {
-                            boolean setChannel = channelRef.compareAndSet(null, newChannel);
-                            checkState(setChannel);
-                            LOG.debug("successfully connected to {}, {} [attempt {}]", address.toString(), newChannel.toString(),
-                                    connectionAttempt);
-                            if (messagesLost.get() > 0) {
-                                LOG.warn("Re-connection to {} was successful but {} messages has been lost so far", address.toString(), messagesLost.get());
-                            }
-                        } else {
-                            Throwable cause = future.getCause();
-                            reschedule(cause);
-                            if (newChannel != null) {
-                                newChannel.close();
+        public void run() {
+            try {
+                if (reconnectingAllowed()) {
+                    final int connectionAttempt = connectionAttempts.getAndIncrement();
+                    totalConnectionAttempts.getAndIncrement();
+
+                    LOG.debug("connecting to {} [attempt {}]", address.toString(), connectionAttempt);
+                    ChannelFuture future = bootstrap.connect(address);
+                    future.addListener(new ChannelFutureListener() {
+                        @Override
+                        public void operationComplete(ChannelFuture future) throws Exception {
+                            // This call returns immediately
+                            Channel newChannel = future.getChannel();
+
+                            if (future.isSuccess() && connectionEstablished(newChannel)) {
+                                boolean setChannel = channelRef.compareAndSet(null, newChannel);
+                                checkState(setChannel);
+                                LOG.debug("successfully connected to {}, {} [attempt {}]", address.toString(), newChannel.toString(),
+                                        connectionAttempt);
+                                if (messagesLost.get() > 0) {
+                                    LOG.warn("Re-connection to {} was successful but {} messages has been lost so far", address.toString(), messagesLost.get());
+                                }
+                            } else {
+                                Throwable cause = future.getCause();
+                                reschedule(cause);
+                                if (newChannel != null) {
+                                    newChannel.close();
+                                }
                             }
                         }
-                    }
-                });
-            } else {
-                close();
-                throw new RuntimeException("Giving up to scheduleConnect to " + dstAddressPrefixedName + " after " +
-                        connectionAttempts + " failed attempts. " + messagesLost.get() + " messages were lost");
+                    });
+                } else {
+                    close();
+                    throw new RuntimeException("Giving up to scheduleConnect to " + dstAddressPrefixedName + " after " +
+                            connectionAttempts + " failed attempts. " + messagesLost.get() + " messages were lost");
 
+                }
+            }catch (Throwable e){
+                LOG.error("Uncaught throwable", e);
+                throw Throwables.propagate(e);
             }
         }
     }
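
Both reworked tasks above wrap their bodies in catch (Throwable), log, and rethrow via Throwables.propagate. A likely reason, inferred from the java.util.concurrent contract rather than stated in the commit, is that when a periodic task on a ScheduledExecutorService throws, the exception is only captured in the task's Future and all later executions are suppressed, so an unlogged failure would silently stop flushing or reconnecting. The self-contained demo below (illustrative names only) shows that behaviour:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class SwallowedExceptionDemo {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleWithFixedDelay(new Runnable() {
            public void run() {
                System.out.println("tick");
                // Without a catch block this exception only ends up in the task's Future,
                // nothing is printed, and every later execution is suppressed.
                throw new RuntimeException("boom");
            }
        }, 0, 100, TimeUnit.MILLISECONDS);

        Thread.sleep(500);        // "tick" is printed exactly once
        scheduler.shutdownNow();
    }
}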

http://git-wip-us.apache.org/repos/asf/storm/blob/afa638cd/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
index 64f67ba..7e0cb0d 100644
--- a/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
+++ b/storm-core/src/jvm/backtype/storm/messaging/netty/Context.java
@@ -18,7 +18,6 @@
 package backtype.storm.messaging.netty;
 
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
-import org.jboss.netty.util.HashedWheelTimer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import java.util.concurrent.Executors;
@@ -35,13 +34,14 @@ import backtype.storm.utils.Utils;
 
 public class Context implements IContext {
     private static final Logger LOG = LoggerFactory.getLogger(Context.class);
-        
+
     @SuppressWarnings("rawtypes")
     private Map storm_conf;
     private volatile Vector<IConnection> connections;
     private NioClientSocketChannelFactory clientChannelFactory;
-    
-    private HashedWheelTimer clientScheduleService;
+
+    private ScheduledExecutorService clientScheduleService;
+    private final int MAX_CLIENT_SCHEDULER_THREAD_POOL_SIZE = 10;
 
     /**
      * initialization per Storm configuration 
@@ -53,7 +53,7 @@ public class Context implements IContext {
 
         //each context will have a single client channel factory
         int maxWorkers = Utils.getInt(storm_conf.get(Config.STORM_MESSAGING_NETTY_CLIENT_WORKER_THREADS));
-		ThreadFactory bossFactory = new NettyRenameThreadFactory("client" + "-boss");
+        ThreadFactory bossFactory = new NettyRenameThreadFactory("client" + "-boss");
         ThreadFactory workerFactory = new NettyRenameThreadFactory("client" + "-worker");
         if (maxWorkers > 0) {
             clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
@@ -62,8 +62,10 @@ public class Context implements IContext {
             clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
                     Executors.newCachedThreadPool(workerFactory));
         }
-        
-        clientScheduleService = new HashedWheelTimer(new NettyRenameThreadFactory("client-schedule-timer"), 10, TimeUnit.MILLISECONDS);
+
+        int otherWorkers = Utils.getInt(storm_conf.get(Config.TOPOLOGY_WORKERS), 1) - 1;
+        int poolSize = Math.min(Math.max(1, otherWorkers), MAX_CLIENT_SCHEDULER_THREAD_POOL_SIZE);
+        clientScheduleService = Executors.newScheduledThreadPool(poolSize, new NettyRenameThreadFactory("client-schedule-service"));
     }
 
     /**
@@ -78,8 +80,8 @@ public class Context implements IContext {
     /**
      * establish a connection to a remote server
      */
-    public IConnection connect(String storm_id, String host, int port) {        
-        IConnection client =  new Client(storm_conf, clientChannelFactory, 
+    public IConnection connect(String storm_id, String host, int port) {
+        IConnection client =  new Client(storm_conf, clientChannelFactory,
                 clientScheduleService, host, port);
         connections.add(client);
         return client;
@@ -89,12 +91,18 @@ public class Context implements IContext {
      * terminate this context
      */
     public void term() {
-        clientScheduleService.stop();
-        
+        clientScheduleService.shutdown();
+
         for (IConnection conn : connections) {
             conn.close();
         }
-        
+
+        try {
+            clientScheduleService.awaitTermination(30, TimeUnit.SECONDS);
+        } catch (InterruptedException e) {
+            LOG.error("Error when shutting down client scheduler", e);
+        }
+
         connections = null;
 
         //we need to release resources associated with client channel factory
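
The Context changes above size the scheduler pool from the topology's configured worker count, clamped to at most MAX_CLIENT_SCHEDULER_THREAD_POOL_SIZE (10) threads, and on termination call shutdown() before waiting up to 30 seconds for in-flight tasks. The sketch below restates that lifecycle with illustrative names; the shutdownNow() fallback and the re-assertion of the interrupt flag are extra safety measures, not part of this commit:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative names; the committed code lives in Context's prepare() and term().
class SchedulerLifecycleSketch {
    private static final int MAX_CLIENT_SCHEDULER_THREAD_POOL_SIZE = 10;
    private ScheduledExecutorService clientScheduleService;

    void prepare(int topologyWorkers) {
        // Roughly one thread per peer worker this worker may connect to, clamped to [1, 10].
        int otherWorkers = topologyWorkers - 1;
        int poolSize = Math.min(Math.max(1, otherWorkers), MAX_CLIENT_SCHEDULER_THREAD_POOL_SIZE);
        clientScheduleService = Executors.newScheduledThreadPool(poolSize);
    }

    void term() {
        clientScheduleService.shutdown();   // stop accepting tasks; queued ones may still run
        try {
            // Bounded wait as in the diff; the shutdownNow() fallback is not part of the commit.
            if (!clientScheduleService.awaitTermination(30, TimeUnit.SECONDS)) {
                clientScheduleService.shutdownNow();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}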

