From commits-return-3700-apmail-rocketmq-commits-archive=rocketmq.apache.org@rocketmq.apache.org Thu Jul 11 11:26:40 2019 Return-Path: X-Original-To: apmail-rocketmq-commits-archive@minotaur.apache.org Delivered-To: apmail-rocketmq-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by minotaur.apache.org (Postfix) with SMTP id B6F2119A9E for ; Thu, 11 Jul 2019 11:26:39 +0000 (UTC) Received: (qmail 82992 invoked by uid 500); 11 Jul 2019 11:26:39 -0000 Delivered-To: apmail-rocketmq-commits-archive@rocketmq.apache.org Received: (qmail 82963 invoked by uid 500); 11 Jul 2019 11:26:39 -0000 Mailing-List: contact commits-help@rocketmq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@rocketmq.apache.org Delivered-To: mailing list commits@rocketmq.apache.org Received: (qmail 82954 invoked by uid 99); 11 Jul 2019 11:26:39 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 11 Jul 2019 11:26:39 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id EC01885E04; Thu, 11 Jul 2019 11:26:38 +0000 (UTC) Date: Thu, 11 Jul 2019 11:26:38 +0000 To: "commits@rocketmq.apache.org" Subject: [rocketmq] branch develop updated: Enhance: share netty handler (#635) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <156284439885.3765.15088240133222653834@gitbox.apache.org> From: huzongtang@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: rocketmq X-Git-Refname: refs/heads/develop X-Git-Reftype: branch X-Git-Oldrev: 5b29b73cc8a10b24caf7bfb9317e0399555a6e58 X-Git-Newrev: 92262928fef9727f1e3ec0066e9dd5edb74d7bb5 X-Git-Rev: 92262928fef9727f1e3ec0066e9dd5edb74d7bb5 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. huzongtang pushed a commit to branch develop in repository https://gitbox.apache.org/repos/asf/rocketmq.git The following commit(s) were added to refs/heads/develop by this push: new 9226292 Enhance: share netty handler (#635) 9226292 is described below commit 92262928fef9727f1e3ec0066e9dd5edb74d7bb5 Author: Eric AuthorDate: Thu Jul 11 19:26:30 2019 +0800 Enhance: share netty handler (#635) * It's highly recommended to share one stateless handler with all the server-side channels. * comment for prepareSharableHandlers * Simplify comment * remove comment * supply test case --- .../rocketmq/remoting/netty/NettyEncoder.java | 2 ++ .../remoting/netty/NettyRemotingServer.java | 28 ++++++++++++++++++---- .../remoting/netty/NettyRemotingAbstractTest.java | 17 +++++++++++++ 3 files changed, 42 insertions(+), 5 deletions(-) diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java index 8c3c56a..4506e71 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java @@ -17,6 +17,7 @@ package org.apache.rocketmq.remoting.netty; import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; import java.nio.ByteBuffer; @@ -26,6 +27,7 @@ import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; import org.apache.rocketmq.remoting.protocol.RemotingCommand; +@ChannelHandler.Sharable public class NettyEncoder extends MessageToByteEncoder { private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING); diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java index 7f6284e..32d169b 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java @@ -22,6 +22,7 @@ import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; @@ -82,6 +83,12 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti private static final String TLS_HANDLER_NAME = "sslHandler"; private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder"; + // sharable handlers + private HandshakeHandler handshakeHandler; + private NettyEncoder encoder; + private NettyConnectManageHandler connectionManageHandler; + private NettyServerHandler serverHandler; + public NettyRemotingServer(final NettyServerConfig nettyServerConfig) { this(nettyServerConfig, null); } @@ -186,6 +193,8 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti } }); + prepareSharableHandlers(); + ServerBootstrap childHandler = this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector) .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class) @@ -200,14 +209,13 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline() - .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, - new HandshakeHandler(TlsSystemConfig.tlsMode)) + .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler) .addLast(defaultEventExecutorGroup, - new NettyEncoder(), + encoder, new NettyDecoder(), new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()), - new NettyConnectManageHandler(), - new NettyServerHandler() + connectionManageHandler, + serverHandler ); } }); @@ -334,6 +342,14 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti return this.publicExecutor; } + private void prepareSharableHandlers() { + handshakeHandler = new HandshakeHandler(TlsSystemConfig.tlsMode); + encoder = new NettyEncoder(); + connectionManageHandler = new NettyConnectManageHandler(); + serverHandler = new NettyServerHandler(); + } + + @ChannelHandler.Sharable class HandshakeHandler extends SimpleChannelInboundHandler { private final TlsMode tlsMode; @@ -396,6 +412,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti } } + @ChannelHandler.Sharable class NettyServerHandler extends SimpleChannelInboundHandler { @Override @@ -404,6 +421,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti } } + @ChannelHandler.Sharable class NettyConnectManageHandler extends ChannelDuplexHandler { @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java index c3da3e9..5330c90 100644 --- a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java +++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java @@ -26,6 +26,9 @@ import org.mockito.Spy; import org.mockito.junit.MockitoJUnitRunner; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.notNull; import static org.mockito.Mockito.when; @RunWith(MockitoJUnitRunner.class) @@ -90,4 +93,18 @@ public class NettyRemotingAbstractTest { semaphore.acquire(1); assertThat(semaphore.availablePermits()).isEqualTo(0); } + + @Test + public void testScanResponseTable() { + int dummyId = 1; + // mock timeout + ResponseFuture responseFuture = new ResponseFuture(null,dummyId, -1000, new InvokeCallback() { + @Override + public void operationComplete(final ResponseFuture responseFuture) { + } + }, null); + remotingAbstract.responseTable.putIfAbsent(dummyId, responseFuture); + remotingAbstract.scanResponseTable(); + assertNull(remotingAbstract.responseTable.get(dummyId)); + } } \ No newline at end of file