rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dongefore...@apache.org
Subject [rocketmq] 01/01: Expose the rpc hook
Date Fri, 26 Oct 2018 10:59:58 GMT
This is an automated email from the ASF dual-hosted git repository.

dongeforever pushed a commit to branch feature_acl
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 4915871c935c99a1e705c43746d27bf27448307d
Author: zander <zhendongliu.lzd@alibaba-inc.com>
AuthorDate: Fri Oct 26 18:58:15 2018 +0800

    Expose the rpc hook
---
 .../remoting/netty/NettyRemotingAbstract.java      | 56 ++++++++++++++++++----
 .../remoting/netty/NettyRemotingClient.java        | 32 +++++--------
 .../remoting/netty/NettyRemotingServer.java        | 11 ++---
 3 files changed, 63 insertions(+), 36 deletions(-)

diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
index 8dccebc..206b96a 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingAbstract.java
@@ -23,6 +23,7 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.ssl.SslContext;
 import io.netty.handler.ssl.SslHandler;
 import java.net.SocketAddress;
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -95,6 +96,13 @@ public abstract class NettyRemotingAbstract {
      */
     protected volatile SslContext sslContext;
 
+    /**
+     * custom rpc hooks
+     */
+    protected List<RPCHook> rpcHooks = new ArrayList<RPCHook>();
+
+
+
     static {
         NettyLogger.initNettyLogger();
     }
@@ -158,6 +166,23 @@ public abstract class NettyRemotingAbstract {
         }
     }
 
+    protected void doBeforeRpcHooks(String addr, RemotingCommand request) {
+        if (rpcHooks.size() > 0) {
+            for (RPCHook rpcHook: rpcHooks) {
+                rpcHook.doBeforeRequest(addr, request);
+            }
+        }
+    }
+
+    protected void doAfterRpcHooks(String addr, RemotingCommand request, RemotingCommand response) {
+        if (rpcHooks.size() > 0) {
+            for (RPCHook rpcHook: rpcHooks) {
+                rpcHook.doAfterResponse(addr, request, response);
+            }
+        }
+    }
+
+
     /**
      * Process incoming request command issued by remote peer.
      *
@@ -174,15 +199,9 @@ public abstract class NettyRemotingAbstract {
                 @Override
                 public void run() {
                     try {
-                        RPCHook rpcHook = NettyRemotingAbstract.this.getRPCHook();
-                        if (rpcHook != null) {
-                            rpcHook.doBeforeRequest(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
-                        }
-
+                        doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                         final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
-                        if (rpcHook != null) {
-                            rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
-                        }
+                        doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);
 
                         if (!cmd.isOnewayRPC()) {
                             if (response != null) {
@@ -314,12 +333,29 @@ public abstract class NettyRemotingAbstract {
         }
     }
 
+
+
     /**
      * Custom RPC hook.
+     * Just be compatible with the previous version, use getRPCHooks instead.
+     */
+    @Deprecated
+    protected RPCHook getRPCHook() {
+        if (rpcHooks.size() > 0) {
+            return rpcHooks.get(0);
+        }
+        return null;
+    }
+
+    /**
+     * Custom RPC hooks.
      *
-     * @return RPC hook if specified; null otherwise.
+     * @return RPC hooks if specified; null otherwise.
      */
-    public abstract RPCHook getRPCHook();
+    public List<RPCHook> getRPCHooks() {
+        return rpcHooks;
+    }
+
 
     /**
      * This method specifies thread pool to use while invoking callback methods.
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
index 33c2eed..e891ad7 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java
@@ -34,6 +34,7 @@ import io.netty.handler.timeout.IdleState;
 import io.netty.handler.timeout.IdleStateEvent;
 import io.netty.handler.timeout.IdleStateHandler;
 import io.netty.util.concurrent.DefaultEventExecutorGroup;
+import io.netty.util.concurrent.EventExecutorGroup;
 import java.io.IOException;
 import java.net.SocketAddress;
 import java.security.cert.CertificateException;
@@ -53,6 +54,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.ChannelEventListener;
 import org.apache.rocketmq.remoting.InvokeCallback;
 import org.apache.rocketmq.remoting.RPCHook;
@@ -64,8 +67,6 @@ import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
-import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.logging.InternalLoggerFactory;
 import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
 public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
@@ -94,7 +95,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
     private ExecutorService callbackExecutor;
     private final ChannelEventListener channelEventListener;
     private DefaultEventExecutorGroup defaultEventExecutorGroup;
-    private RPCHook rpcHook;
 
     public NettyRemotingClient(final NettyClientConfig nettyClientConfig) {
         this(nettyClientConfig, null);
@@ -283,7 +283,9 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
 
     @Override
     public void registerRPCHook(RPCHook rpcHook) {
-        this.rpcHook = rpcHook;
+        if (!rpcHooks.contains(rpcHook)) {
+            rpcHooks.add(rpcHook);
+        }
     }
 
     public void closeChannel(final Channel channel) {
@@ -357,6 +359,8 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
         }
     }
 
+
+
     @Override
     public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)
         throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {
@@ -364,17 +368,13 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
         final Channel channel = this.getAndCreateChannel(addr);
         if (channel != null && channel.isActive()) {
             try {
-                if (this.rpcHook != null) {
-                    this.rpcHook.doBeforeRequest(addr, request);
-                }
+                doBeforeRpcHooks(addr, request);
                 long costTime = System.currentTimeMillis() - beginStartTime;
                 if (timeoutMillis < costTime) {
                     throw new RemotingTimeoutException("invokeSync call timeout");
                 }
                 RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);
-                if (this.rpcHook != null) {
-                    this.rpcHook.doAfterResponse(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
-                }
+                doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);
                 return response;
             } catch (RemotingSendRequestException e) {
                 log.warn("invokeSync: send request exception, so close the channel[{}]", addr);
@@ -522,9 +522,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
         final Channel channel = this.getAndCreateChannel(addr);
         if (channel != null && channel.isActive()) {
             try {
-                if (this.rpcHook != null) {
-                    this.rpcHook.doBeforeRequest(addr, request);
-                }
+                doBeforeRpcHooks(addr, request);
                 long costTime = System.currentTimeMillis() - beginStartTime;
                 if (timeoutMillis < costTime) {
                     throw new RemotingTooMuchRequestException("invokeAsync call timeout");
@@ -547,9 +545,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
         final Channel channel = this.getAndCreateChannel(addr);
         if (channel != null && channel.isActive()) {
             try {
-                if (this.rpcHook != null) {
-                    this.rpcHook.doBeforeRequest(addr, request);
-                }
+                doBeforeRpcHooks(addr, request);
                 this.invokeOnewayImpl(channel, request, timeoutMillis);
             } catch (RemotingSendRequestException e) {
                 log.warn("invokeOneway: send request exception, so close the channel[{}]", addr);
@@ -592,10 +588,6 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
         return channelEventListener;
     }
 
-    @Override
-    public RPCHook getRPCHook() {
-        return this.rpcHook;
-    }
 
     @Override
     public ExecutorService getCallbackExecutor() {
diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
index 1984842..90386f3 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingServer.java
@@ -40,6 +40,8 @@ import io.netty.util.concurrent.DefaultEventExecutorGroup;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.cert.CertificateException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -75,7 +77,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
     private final Timer timer = new Timer("ServerHouseKeepingService", true);
     private DefaultEventExecutorGroup defaultEventExecutorGroup;
 
-    private RPCHook rpcHook;
 
     private int port = 0;
 
@@ -266,7 +267,9 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
 
     @Override
     public void registerRPCHook(RPCHook rpcHook) {
-        this.rpcHook = rpcHook;
+        if (!rpcHooks.contains(rpcHook)) {
+            rpcHooks.add(rpcHook);
+        }
     }
 
     @Override
@@ -318,10 +321,6 @@ public class NettyRemotingServer extends NettyRemotingAbstract implements Remoti
         return channelEventListener;
     }
 
-    @Override
-    public RPCHook getRPCHook() {
-        return this.rpcHook;
-    }
 
     @Override
     public ExecutorService getCallbackExecutor() {


Mime
View raw message