This is an automated email from the ASF dual-hosted git repository.
huzongtang pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 9226292 Enhance: share netty handler (#635)
9226292 is described below
commit 92262928fef9727f1e3ec0066e9dd5edb74d7bb5
Author: Eric <erikfei1288@163.com>
AuthorDate: Thu Jul 11 19:26:30 2019 +0800
Enhance: share netty handler (#635)
* It's highly recommended to share one stateless handler with all the server-side channels.
* comment for prepareSharableHandlers
* Simplify comment
* remove comment
* supply test case
---
.../rocketmq/remoting/netty/NettyEncoder.java | 2 ++
.../remoting/netty/NettyRemotingServer.java | 28 ++++++++++++++++++----
.../remoting/netty/NettyRemotingAbstractTest.java | 17 +++++++++++++
3 files changed, 42 insertions(+), 5 deletions(-)
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java
index 8c3c56a..4506e71 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyEncoder.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.remoting.netty;
import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
import java.nio.ByteBuffer;
@@ -26,6 +27,7 @@ import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+@ChannelHandler.Sharable
public class NettyEncoder extends MessageToByteEncoder<RemotingCommand> {
private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 7f6284e..32d169b 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -22,6 +22,7 @@ import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
@@ -82,6 +83,12 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
private static final String TLS_HANDLER_NAME = "sslHandler";
private static final String FILE_REGION_ENCODER_NAME = "fileRegionEncoder";
+ // sharable handlers
+ private HandshakeHandler handshakeHandler;
+ private NettyEncoder encoder;
+ private NettyConnectManageHandler connectionManageHandler;
+ private NettyServerHandler serverHandler;
+
public NettyRemotingServer(final NettyServerConfig nettyServerConfig) {
this(nettyServerConfig, null);
}
@@ -186,6 +193,8 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
}
});
+ prepareSharableHandlers();
+
ServerBootstrap childHandler =
this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
.channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
@@ -200,14 +209,13 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline()
- .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
- new HandshakeHandler(TlsSystemConfig.tlsMode))
+ .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)
.addLast(defaultEventExecutorGroup,
- new NettyEncoder(),
+ encoder,
new NettyDecoder(),
new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
- new NettyConnectManageHandler(),
- new NettyServerHandler()
+ connectionManageHandler,
+ serverHandler
);
}
});
@@ -334,6 +342,14 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
return this.publicExecutor;
}
+ private void prepareSharableHandlers() {
+ handshakeHandler = new HandshakeHandler(TlsSystemConfig.tlsMode);
+ encoder = new NettyEncoder();
+ connectionManageHandler = new NettyConnectManageHandler();
+ serverHandler = new NettyServerHandler();
+ }
+
+ @ChannelHandler.Sharable
class HandshakeHandler extends SimpleChannelInboundHandler<ByteBuf> {
private final TlsMode tlsMode;
@@ -396,6 +412,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
}
}
+ @ChannelHandler.Sharable
class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {
@Override
@@ -404,6 +421,7 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
}
}
+ @ChannelHandler.Sharable
class NettyConnectManageHandler extends ChannelDuplexHandler {
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
diff --git a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java
index c3da3e9..5330c90 100644
--- a/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java
+++ b/remoting/src/test/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstractTest.java
@@ -26,6 +26,9 @@ import org.mockito.Spy;
import org.mockito.junit.MockitoJUnitRunner;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.notNull;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
@@ -90,4 +93,18 @@ public class NettyRemotingAbstractTest {
semaphore.acquire(1);
assertThat(semaphore.availablePermits()).isEqualTo(0);
}
+
+ @Test
+ public void testScanResponseTable() {
+ int dummyId = 1;
+ // mock timeout
+ ResponseFuture responseFuture = new ResponseFuture(null,dummyId, -1000, new InvokeCallback() {
+ @Override
+ public void operationComplete(final ResponseFuture responseFuture) {
+ }
+ }, null);
+ remotingAbstract.responseTable.putIfAbsent(dummyId, responseFuture);
+ remotingAbstract.scanResponseTable();
+ assertNull(remotingAbstract.responseTable.get(dummyId));
+ }
}
\ No newline at end of file
|