rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From huzongt...@apache.org
Subject [rocketmq] branch develop updated: Enhance: share netty handler (#635)
Date Thu, 11 Jul 2019 11:26:38 GMT
This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9226292  Enhance: share netty handler (#635)
9226292 is described below

commit 92262928fef9727f1e3ec0066e9dd5edb74d7bb5
Author: Eric <erikfei1288@163.com>
AuthorDate: Thu Jul 11 19:26:30 2019 +0800

    Enhance: share netty handler (#635)
    
    * It's highly recommended to share one stateless handler with all the server-side channels.
    
    * comment for prepareSharableHandlers
    
    * Simplify comment
    
    * remove comment
    
    * supply test case
---
 .../rocketmq/remoting/netty/NettyEncoder.java      |  2 ++
 .../remoting/netty/NettyRemotingServer.java        | 28 ++++++++++++++++++----
 .../remoting/netty/NettyRemotingAbstractTest.java  | 17 +++++++++++++
 3 files changed, 42 insertions(+), 5 deletions(-)

diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java
index 8c3c56a..4506e71 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.remoting.netty;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.MessageToByteEncoder;
 import java.nio.ByteBuffer;
@@ -26,6 +27,7 @@ import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
+@ChannelHandler.Sharable
 public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
     private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
 
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 7f6284e..32d169b 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -22,6 +22,7 @@ import io.netty.buffer.PooledByteBufAllocator;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelDuplexHandler;
 import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInitializer;
 import io.netty.channel.ChannelOption;
@@ -82,6 +83,12 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
     private static final String TLS_HANDLER_NAME = "sslHandler";
     private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";
 
+    // sharable handlers
+    private HandshakeHandler handshakeHandler;
+    private NettyEncoder encoder;
+    private NettyConnectManageHandler connectionManageHandler;
+    private NettyServerHandler serverHandler;
+
     public NettyRemotingServer(final NettyServerConfig nettyServerConfig) {
         this(nettyServerConfig, null);
     }
@@ -186,6 +193,8 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
                 }
             });
 
+        prepareSharableHandlers();
+
         ServerBootstrap childHandler =
             this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
                 .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
@@ -200,14 +209,13 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
                     @Override
                     public void initChannel(SocketChannel ch) throws Exception {
                         ch.pipeline()
-                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
-                                new HandshakeHandler(TlsSystemConfig.tlsMode))
+                            .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
                             .addLast(defaultEventExecutorGroup,
-                                new NettyEncoder(),
+                                encoder,
                                 new NettyDecoder(),
                                 new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
-                                new NettyConnectManageHandler(),
-                                new NettyServerHandler()
+                                connectionManageHandler,
+                                serverHandler
                             );
                     }
                 });
@@ -334,6 +342,14 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
         return this.publicExecutor;
     }
 
+    private void prepareSharableHandlers() {
+        handshakeHandler = new HandshakeHandler(TlsSystemConfig.tlsMode);
+        encoder = new NettyEncoder();
+        connectionManageHandler = new NettyConnectManageHandler();
+        serverHandler = new NettyServerHandler();
+    }
+
+    @ChannelHandler.Sharable
     class HandshakeHandler extends SimpleChannelInboundHandler<ByteBuf> {
 
         private final TlsMode tlsMode;
@@ -396,6 +412,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
         }
     }
 
+    @ChannelHandler.Sharable
     class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
 
         @Override
@@ -404,6 +421,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
         }
     }
 
+    @ChannelHandler.Sharable
     class NettyConnectManageHandler extends ChannelDuplexHandler {
         @Override
         public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java
index c3da3e9..5330c90 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java
@@ -26,6 +26,9 @@ import org.mockito.Spy;
 import org.mockito.junit.MockitoJUnitRunner;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.notNull;
 import static org.mockito.Mockito.when;
 
 @RunWith(MockitoJUnitRunner.class)
@@ -90,4 +93,18 @@ public class NettyRemotingAbstractTest {
         semaphore.acquire(1);
         assertThat(semaphore.availablePermits()).isEqualTo(0);
     }
+
+    @Test
+    public void testScanResponseTable() {
+        int dummyId = 1;
+        // mock timeout
+        ResponseFuture responseFuture = new ResponseFuture(null,dummyId, -1000, new InvokeCallback() {
+            @Override
+            public void operationComplete(final ResponseFuture responseFuture) {
+            }
+        }, null);
+        remotingAbstract.responseTable.putIfAbsent(dummyId, responseFuture);
+        remotingAbstract.scanResponseTable();
+        assertNull(remotingAbstract.responseTable.get(dummyId));
+    }
 }
\ No newline at end of file


Mime
View raw message