rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From duhengfore...@apache.org
Subject [rocketmq-ons] 42/43: feat(ons-client) remove the dependency of ons-auth4client
Date Fri, 06 Dec 2019 04:23:04 GMT
This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch OpenMessaging
in repository https://gitbox.apache.org/repos/asf/rocketmq-ons.git

commit 51e20e3325f79dd0b42bd773b156ac4290e395a7
Author: duhenglucky <duhengforever@apache.org>
AuthorDate: Wed Dec 4 17:26:57 2019 +0800

    feat(ons-client) remove the dependency of ons-auth4client
---
 ons-core/ons-client/pom.xml                        | 21 +++++--
 .../org/apache/rocketmq/ons/api/Constants.java     |  2 +
 .../exception/AuthenticationException.java         | 68 ++++++++++++++++++++
 .../rocketmq/ONSChannel.java}                      | 12 ++--
 .../ons/api/impl/rocketmq/ONSClientAbstract.java   | 33 ++--------
 .../ons/api/impl/rocketmq/ONSConsumerAbstract.java | 11 ++--
 .../ons/api/impl/rocketmq/OnsClientRPCHook.java    | 28 +++++++--
 .../ons/api/impl/rocketmq/OrderProducerImpl.java   | 12 ++--
 .../ons/api/impl/rocketmq/ProducerImpl.java        |  7 ++-
 .../ons/api/impl/rocketmq/PullConsumerImpl.java    |  3 +-
 .../api/impl/rocketmq/TransactionProducerImpl.java |  9 ++-
 ons-core/ons-trace-core/pom.xml                    |  5 --
 .../core/dispatch/impl/AsyncArrayDispatcher.java   | 30 +++------
 .../core/dispatch/impl/TraceProducerFactory.java   | 55 +----------------
 .../ons/open/trace/core/hook/AbstractRPCHook.java  | 72 ----------------------
 ons-core/pom.xml                                   |  5 ++
 16 files changed, 156 insertions(+), 217 deletions(-)

diff --git a/ons-core/ons-client/pom.xml b/ons-core/ons-client/pom.xml
index b9558d8..fde3dc5 100644
--- a/ons-core/ons-client/pom.xml
+++ b/ons-core/ons-client/pom.xml
@@ -33,6 +33,11 @@
             <version>${rocketmq.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-acl</artifactId>
+            <version>${rocketmq.version}</version>
+        </dependency>
+        <dependency>
             <groupId>commons-codec</groupId>
             <artifactId>commons-codec</artifactId>
         </dependency>
@@ -45,12 +50,18 @@
             <groupId>${project.groupId}</groupId>
             <artifactId>ons-trace-core</artifactId>
             <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>${project.groupId}</groupId>
+                    <artifactId>ons-auth4client</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
-        <dependency>
-            <groupId>${project.groupId}</groupId>
-            <artifactId>ons-auth4client</artifactId>
-            <version>${project.version}</version>
-        </dependency>
+        <!--<dependency>-->
+            <!--<groupId>${project.groupId}</groupId>-->
+            <!--<artifactId>ons-auth4client</artifactId>-->
+            <!--<version>${project.version}</version>-->
+        <!--</dependency>-->
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/Constants.java
b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/Constants.java
index a82c405..2a82a0b 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/Constants.java
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/Constants.java
@@ -21,4 +21,6 @@ public class Constants {
     public static final String TRANSACTION_ID = "__transactionId__";
 
     public static final String TOPIC_PARTITION_SEPARATOR = "#";
+
+    public static final String ONS_CHANNEL_KEY = "OnsChannel";
 }
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/authority/exception/AuthenticationException.java
b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/authority/exception/AuthenticationException.java
new file mode 100644
index 0000000..b4c57d6
--- /dev/null
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/authority/exception/AuthenticationException.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.ons.api.impl.authority.exception;
+
+public class AuthenticationException extends RuntimeException {
+    private static final long serialVersionUID = 1L;
+
+    private String status;
+    private int code;
+
+
+    public AuthenticationException(String status, int code) {
+        super();
+        this.status = status;
+        this.code = code;
+    }
+
+
+    public AuthenticationException(String status, int code, String message) {
+        super(message);
+        this.status = status;
+        this.code = code;
+    }
+
+
+    public AuthenticationException(String status, int code, Throwable throwable) {
+        super(throwable);
+        this.status = status;
+        this.code = code;
+    }
+
+
+    public AuthenticationException(String status, int code, String message, Throwable throwable)
{
+        super(message, throwable);
+        this.status = status;
+        this.code = code;
+    }
+
+    public String getStatus() {
+        return status;
+    }
+
+    public void setStatus(String status) {
+        this.status = status;
+    }
+
+    public int getCode() {
+        return code;
+    }
+
+    public void setCode(int code) {
+        this.code = code;
+    }
+}
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/Constants.java
b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSChannel.java
similarity index 80%
copy from ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/Constants.java
copy to ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSChannel.java
index a82c405..e4acc99 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/Constants.java
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSChannel.java
@@ -15,10 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.rocketmq.ons.api;
+package org.apache.rocketmq.ons.api.impl.rocketmq;
 
-public class Constants {
-    public static final String TRANSACTION_ID = "__transactionId__";
 
-    public static final String TOPIC_PARTITION_SEPARATOR = "#";
+public enum ONSChannel {
+
+    CLOUD,
+
+    ALIYUN,
+
+    ALL
 }
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSClientAbstract.java
b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSClientAbstract.java
index 6b78740..984ba6f 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSClientAbstract.java
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSClientAbstract.java
@@ -17,7 +17,6 @@
 
 package org.apache.rocketmq.ons.api.impl.rocketmq;
 
-
 import io.openmessaging.api.Credentials;
 import io.openmessaging.api.LifeCycle;
 import java.util.Properties;
@@ -28,14 +27,15 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Generated;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.SessionCredentials;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.namesrv.TopAddressing;
 import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.ons.api.Constants;
 import org.apache.rocketmq.ons.api.PropertyKeyConst;
 import org.apache.rocketmq.ons.api.exception.ONSClientException;
-import org.apache.rocketmq.ons.api.impl.authority.SessionCredentials;
 import org.apache.rocketmq.ons.api.impl.util.ClientLoggerUtil;
 import org.apache.rocketmq.ons.api.impl.util.NameAddrUtils;
 import org.apache.rocketmq.ons.open.trace.core.dispatch.AsyncDispatcher;
@@ -74,27 +74,17 @@ public abstract class ONSClientAbstract implements LifeCycle, Credentials
{
     public ONSClientAbstract(Properties properties) {
         this.properties = properties;
         this.sessionCredentials.updateContent(properties);
-        if (this.sessionCredentials.getOnsChannel().equals(ONSChannel.ALIYUN) &&
-            (null == this.sessionCredentials.getAccessKey() || "".equals(this.sessionCredentials.getAccessKey())))
{
-            throw new ONSClientException("please set access key");
-        }
-
-        if (this.sessionCredentials.getOnsChannel().equals(ONSChannel.ALIYUN) &&
-            (null == this.sessionCredentials.getSecretKey() || "".equals(this.sessionCredentials.getSecretKey())))
{
-            throw new ONSClientException("please set secret key");
-        }
-
-        if (null == this.sessionCredentials.getOnsChannel()) {
-            throw new ONSClientException("please set ons channel");
-        }
+        ONSChannel onsChannle = ONSChannel.valueOf(this.properties.getProperty(Constants.ONS_CHANNEL_KEY,
"ALIYUN"));
 
         this.nameServerAddr = getNameSrvAddrFromProperties();
         if (nameServerAddr != null) {
             return;
         }
-        if (nameServerAddr == null && !this.sessionCredentials.getOnsChannel().equals(ONSChannel.ALIYUN))
{
+
+        if (nameServerAddr == null && !onsChannle.equals(ONSChannel.ALIYUN)) {
             return;
         }
+
         this.nameServerAddr = fetchNameServerAddr();
         if (null == nameServerAddr) {
             throw new ONSClientException(FAQ.errorMessage("Can not find name server, May
be your network problem.", FAQ.FIND_NS_FAILED));
@@ -239,17 +229,6 @@ public abstract class ONSClientAbstract implements LifeCycle, Credentials
{
 
     @Override
     public void updateCredential(Properties credentialProperties) {
-        if (this.sessionCredentials.getOnsChannel().equals(ONSChannel.ALIYUN) &&
-            (null == credentialProperties.getProperty(SessionCredentials.AccessKey)
-                || "".equals(credentialProperties.getProperty(SessionCredentials.AccessKey))))
{
-            throw new ONSClientException("update credential failed. please set access key.");
-        }
-
-        if (this.sessionCredentials.getOnsChannel().equals(ONSChannel.ALIYUN) &&
-            (null == credentialProperties.getProperty(SessionCredentials.SecretKey)
-                || "".equals(credentialProperties.getProperty(SessionCredentials.SecretKey))))
{
-            throw new ONSClientException("update credential failed. please set secret key");
-        }
         this.sessionCredentials.updateContent(credentialProperties);
     }
 
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSConsumerAbstract.java
b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSConsumerAbstract.java
index 3c6cae2..6cc4220 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSConsumerAbstract.java
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSConsumerAbstract.java
@@ -17,14 +17,15 @@
 
 package org.apache.rocketmq.ons.api.impl.rocketmq;
 
-
 import io.openmessaging.api.MessageSelector;
 import java.util.Properties;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.ons.api.Constants;
 import org.apache.rocketmq.ons.api.PropertyKeyConst;
 import org.apache.rocketmq.ons.api.exception.ONSClientException;
 import org.apache.rocketmq.ons.api.impl.util.ClientLoggerUtil;
@@ -55,8 +56,8 @@ public class ONSConsumerAbstract extends ONSClientAbstract {
         }
 
         this.defaultMQPushConsumer =
-            new DefaultMQPushConsumer(this.getNamespace(), consumerGroup, new OnsClientRPCHook(sessionCredentials));
-
+            new DefaultMQPushConsumer(this.getNamespace(), consumerGroup, new OnsClientRPCHook(sessionCredentials,
+                properties.getProperty(Constants.ONS_CHANNEL_KEY)));
 
         String maxReconsumeTimes = properties.getProperty(PropertyKeyConst.MaxReconsumeTimes);
         if (!UtilAll.isBlank(maxReconsumeTimes)) {
@@ -117,15 +118,13 @@ public class ONSConsumerAbstract extends ONSClientAbstract {
         } else {
             try {
                 Properties tempProperties = new Properties();
-                tempProperties.put(OnsTraceConstants.AccessKey, sessionCredentials.getAccessKey());
-                tempProperties.put(OnsTraceConstants.SecretKey, sessionCredentials.getSecretKey());
                 tempProperties.put(OnsTraceConstants.MaxMsgSize, "128000");
                 tempProperties.put(OnsTraceConstants.AsyncBufferSize, "2048");
                 tempProperties.put(OnsTraceConstants.MaxBatchNum, "100");
                 tempProperties.put(OnsTraceConstants.NAMESRV_ADDR, this.getNameServerAddr());
                 tempProperties.put(OnsTraceConstants.InstanceName, "PID_CLIENT_INNER_TRACE_PRODUCER");
                 tempProperties.put(OnsTraceConstants.TraceDispatcherType, OnsTraceDispatcherType.CONSUMER.name());
-                AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties,
sessionCredentials);
+                AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties,
new AclClientRPCHook(sessionCredentials));
                 dispatcher.setHostConsumer(defaultMQPushConsumer.getDefaultMQPushConsumerImpl());
                 traceDispatcher = dispatcher;
                 this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OnsClientRPCHook.java
b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OnsClientRPCHook.java
index a145c25..b117a00 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OnsClientRPCHook.java
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OnsClientRPCHook.java
@@ -17,23 +17,39 @@
 
 package org.apache.rocketmq.ons.api.impl.rocketmq;
 
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
+import org.apache.rocketmq.acl.common.AclException;
+import org.apache.rocketmq.acl.common.SessionCredentials;
+import org.apache.rocketmq.ons.api.Constants;
 import org.apache.rocketmq.ons.api.impl.MQClientInfo;
-import org.apache.rocketmq.ons.api.impl.authority.SessionCredentials;
+import org.apache.rocketmq.ons.api.impl.authority.exception.AuthenticationException;
+import org.apache.rocketmq.remoting.protocol.RemotingCommand;
 
-public class OnsClientRPCHook extends ClientRPCHook {
+public class OnsClientRPCHook extends AclClientRPCHook {
+    private static final int CAL_SIGNATURE_FAILED = 10015;
+    private final ONSChannel onsChannel;
 
-    public OnsClientRPCHook(SessionCredentials sessionCredentials) {
+//    public OnsClientRPCHook(SessionCredentials sessionCredentials) {
+//        super(sessionCredentials);
+//        this.onsChannel = ONSChannel.ALIYUN;
+//    }
+
+    public OnsClientRPCHook(SessionCredentials sessionCredentials, String channel) {
         super(sessionCredentials);
+        this.onsChannel = ONSChannel.valueOf(channel);
     }
 
     @Override
     public void doBeforeRequest(String remoteAddr, RemotingCommand request) {
-        super.doBeforeRequest(remoteAddr, request);
+        try {
+            super.doBeforeRequest(remoteAddr, request);
+        } catch (AclException aclException) {
+            throw new AuthenticationException("CAL_SIGNATURE_FAILED", CAL_SIGNATURE_FAILED,
aclException.getMessage(), aclException);
+        }
+        request.addExtField(Constants.ONS_CHANNEL_KEY, onsChannel.name());
         request.setVersion(MQClientInfo.versionCode);
     }
 
-
     @Override
     public void doAfterResponse(String remoteAddr, RemotingCommand request, RemotingCommand
response) {
         super.doAfterResponse(remoteAddr, request, response);
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OrderProducerImpl.java
b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OrderProducerImpl.java
index 2808444..1164ad1 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OrderProducerImpl.java
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OrderProducerImpl.java
@@ -17,18 +17,18 @@
 
 package org.apache.rocketmq.ons.api.impl.rocketmq;
 
-
 import io.openmessaging.api.Message;
 import io.openmessaging.api.SendResult;
 import io.openmessaging.api.order.OrderProducer;
 import java.util.List;
 import java.util.Properties;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.logging.InternalLogger;
-
+import org.apache.rocketmq.ons.api.Constants;
 import org.apache.rocketmq.ons.api.PropertyKeyConst;
 import org.apache.rocketmq.ons.api.exception.ONSClientException;
 import org.apache.rocketmq.ons.api.impl.util.ClientLoggerUtil;
@@ -50,8 +50,8 @@ public class OrderProducerImpl extends ONSClientAbstract implements OrderProduce
         }
 
         this.defaultMQProducer =
-            new DefaultMQProducer(this.getNamespace(), producerGroup, new OnsClientRPCHook(sessionCredentials));
-
+            new DefaultMQProducer(this.getNamespace(), producerGroup, new OnsClientRPCHook(sessionCredentials,
+                properties.getProperty(Constants.ONS_CHANNEL_KEY)));
 
         this.defaultMQProducer.setProducerGroup(producerGroup);
 
@@ -78,15 +78,13 @@ public class OrderProducerImpl extends ONSClientAbstract implements OrderProduce
         } else {
             try {
                 Properties tempProperties = new Properties();
-                tempProperties.put(OnsTraceConstants.AccessKey, sessionCredentials.getAccessKey());
-                tempProperties.put(OnsTraceConstants.SecretKey, sessionCredentials.getSecretKey());
                 tempProperties.put(OnsTraceConstants.MaxMsgSize, "128000");
                 tempProperties.put(OnsTraceConstants.AsyncBufferSize, "2048");
                 tempProperties.put(OnsTraceConstants.MaxBatchNum, "100");
                 tempProperties.put(OnsTraceConstants.NAMESRV_ADDR, this.getNameServerAddr());
                 tempProperties.put(OnsTraceConstants.InstanceName, "PID_CLIENT_INNER_TRACE_PRODUCER");
                 tempProperties.put(OnsTraceConstants.TraceDispatcherType, OnsTraceDispatcherType.PRODUCER.name());
-                AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties,
sessionCredentials);
+                AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties,
new AclClientRPCHook(sessionCredentials));
                 dispatcher.setHostProducer(defaultMQProducer.getDefaultMQProducerImpl());
                 traceDispatcher = dispatcher;
                 this.defaultMQProducer.getDefaultMQProducerImpl().registerSendMessageHook(
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java
b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java
index 80b3fe6..97107ed 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java
@@ -32,6 +32,7 @@ import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.common.message.MessageClientIDSetter;
 import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.logging.InternalLogger;
+import org.apache.rocketmq.ons.api.Constants;
 import org.apache.rocketmq.ons.api.PropertyKeyConst;
 import org.apache.rocketmq.ons.api.exception.ONSClientException;
 import org.apache.rocketmq.ons.api.impl.util.ClientLoggerUtil;
@@ -56,7 +57,8 @@ public class ProducerImpl extends ONSClientAbstract implements Producer
{
         }
 
         this.defaultMQProducer =
-            new DefaultMQProducer(this.getNamespace(), producerGroup, new OnsClientRPCHook(sessionCredentials));
+            new DefaultMQProducer(this.getNamespace(), producerGroup, new OnsClientRPCHook(sessionCredentials,
+                properties.getProperty(Constants.ONS_CHANNEL_KEY)));
 
         this.defaultMQProducer.setProducerGroup(producerGroup);
 
@@ -96,7 +98,8 @@ public class ProducerImpl extends ONSClientAbstract implements Producer
{
                 tempProperties.put(OnsTraceConstants.NAMESRV_ADDR, this.getNameServerAddr());
                 tempProperties.put(OnsTraceConstants.InstanceName, "PID_CLIENT_INNER_TRACE_PRODUCER");
                 tempProperties.put(OnsTraceConstants.TraceDispatcherType, OnsTraceDispatcherType.PRODUCER.name());
-                AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties,
sessionCredentials);
+                AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties,
+                    new OnsClientRPCHook(sessionCredentials, properties.getProperty(Constants.ONS_CHANNEL_KEY)));
                 dispatcher.setHostProducer(defaultMQProducer.getDefaultMQProducerImpl());
                 traceDispatcher = dispatcher;
                 this.defaultMQProducer.getDefaultMQProducerImpl().registerSendMessageHook(
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java
b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java
index d364b88..233d637 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/PullConsumerImpl.java
@@ -63,7 +63,8 @@ public class PullConsumerImpl extends ONSClientAbstract implements PullConsumer
         }
 
         this.litePullConsumer =
-            new DefaultLitePullConsumer(this.getNamespace(), consumerGroup, new OnsClientRPCHook(sessionCredentials));
+            new DefaultLitePullConsumer(this.getNamespace(), consumerGroup, new OnsClientRPCHook(sessionCredentials,
+                properties.getProperty(Constants.ONS_CHANNEL_KEY)));
 
         String messageModel = properties.getProperty(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
         this.litePullConsumer.setMessageModel(MessageModel.valueOf(messageModel));
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java
b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java
index f56a460..45a6839 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java
@@ -17,7 +17,6 @@
 
 package org.apache.rocketmq.ons.api.impl.rocketmq;
 
-
 import io.openmessaging.api.Message;
 import io.openmessaging.api.SendResult;
 import io.openmessaging.api.transaction.LocalTransactionExecuter;
@@ -25,6 +24,7 @@ import io.openmessaging.api.transaction.TransactionProducer;
 import io.openmessaging.api.transaction.TransactionStatus;
 import java.util.Properties;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.acl.common.AclClientRPCHook;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.LocalTransactionState;
 import org.apache.rocketmq.client.producer.TransactionCheckListener;
@@ -53,7 +53,8 @@ public class TransactionProducerImpl extends ONSClientAbstract implements
Transa
             producerGroup = "__ONS_PRODUCER_DEFAULT_GROUP";
         }
         transactionMQProducer =
-            new TransactionMQProducer(this.getNamespace(), producerGroup, new OnsClientRPCHook(sessionCredentials));
+            new TransactionMQProducer(this.getNamespace(), producerGroup, new OnsClientRPCHook(sessionCredentials,
+                properties.getProperty(Constants.ONS_CHANNEL_KEY)));
 
         boolean isVipChannelEnabled = Boolean.parseBoolean(properties.getProperty(PropertyKeyConst.isVipChannelEnabled,
"false"));
         transactionMQProducer.setVipChannelEnabled(isVipChannelEnabled);
@@ -75,15 +76,13 @@ public class TransactionProducerImpl extends ONSClientAbstract implements
Transa
         } else {
             try {
                 Properties tempProperties = new Properties();
-                tempProperties.put(OnsTraceConstants.AccessKey, sessionCredentials.getAccessKey());
-                tempProperties.put(OnsTraceConstants.SecretKey, sessionCredentials.getSecretKey());
                 tempProperties.put(OnsTraceConstants.MaxMsgSize, "128000");
                 tempProperties.put(OnsTraceConstants.AsyncBufferSize, "2048");
                 tempProperties.put(OnsTraceConstants.MaxBatchNum, "100");
                 tempProperties.put(OnsTraceConstants.NAMESRV_ADDR, this.getNameServerAddr());
                 tempProperties.put(OnsTraceConstants.InstanceName, "PID_CLIENT_INNER_TRACE_PRODUCER");
                 tempProperties.put(OnsTraceConstants.TraceDispatcherType, OnsTraceDispatcherType.PRODUCER.name());
-                AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties,
sessionCredentials);
+                AsyncArrayDispatcher dispatcher = new AsyncArrayDispatcher(tempProperties,
new AclClientRPCHook(sessionCredentials));
                 dispatcher.setHostProducer(transactionMQProducer.getDefaultMQProducerImpl());
                 traceDispatcher = dispatcher;
                 this.transactionMQProducer.getDefaultMQProducerImpl().registerSendMessageHook(
diff --git a/ons-core/ons-trace-core/pom.xml b/ons-core/ons-trace-core/pom.xml
index 422b192..32317fc 100644
--- a/ons-core/ons-trace-core/pom.xml
+++ b/ons-core/ons-trace-core/pom.xml
@@ -29,11 +29,6 @@
     <name>ons-trace-core ${project.version}</name>
     <dependencies>
         <dependency>
-            <groupId>${project.groupId}</groupId>
-            <artifactId>ons-auth4client</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
             <groupId>org.apache.rocketmq</groupId>
             <artifactId>rocketmq-client</artifactId>
         </dependency>
diff --git a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/AsyncArrayDispatcher.java
b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/AsyncArrayDispatcher.java
index f3a456d..601f6df 100644
--- a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/AsyncArrayDispatcher.java
+++ b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/AsyncArrayDispatcher.java
@@ -46,7 +46,6 @@ import org.apache.rocketmq.common.message.Message;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.NamespaceUtil;
 import org.apache.rocketmq.logging.InternalLogger;
-import org.apache.rocketmq.ons.api.impl.authority.SessionCredentials;
 import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants;
 import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceContext;
 import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceDataEncoder;
@@ -75,20 +74,13 @@ public class AsyncArrayDispatcher implements AsyncDispatcher {
     private String dispatcherId = UUID.randomUUID().toString();
     private String customizedTraceTopic;
 
+    /**
+     * Create AsyncArrayDispatcher with acl RPC hook.
+     *
+     * @param properties
+     * @param rpcHook RPC hook only can be set with AclRPCHook
+     */
     public AsyncArrayDispatcher(Properties properties, RPCHook rpcHook) {
-        this(properties, null, rpcHook);
-        traceProducer = TraceProducerFactory.getTraceDispatcherProducer(properties, rpcHook);
-    }
-
-    public AsyncArrayDispatcher(Properties properties) {
-        this(properties, null, null);
-    }
-
-    public AsyncArrayDispatcher(Properties properties, SessionCredentials sessionCredentials)
{
-        this(properties, sessionCredentials, null);
-    }
-
-    public AsyncArrayDispatcher(Properties properties, SessionCredentials sessionCredentials,
RPCHook rpcHook) {
         dispatcherType = properties.getProperty(OnsTraceConstants.TraceDispatcherType);
         this.customizedTraceTopic = properties.getProperty(OnsTraceConstants.CustomizedTraceTopic);
         int queueSize = Integer.parseInt(properties.getProperty(OnsTraceConstants.AsyncBufferSize,
"2048"));
@@ -106,15 +98,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher {
             TimeUnit.MILLISECONDS, //
             this.appenderQueue, //
             new ThreadFactoryImpl("MQTraceSendThread_"));
-        if (sessionCredentials == null && rpcHook == null) {
-            traceProducer = TraceProducerFactory.getTraceDispatcherProducer(properties);
-        }
-        if (properties != null && rpcHook != null) {
-            traceProducer = TraceProducerFactory.getTraceDispatcherProducer(properties, rpcHook);
-        }
-        if (properties != null && sessionCredentials != null) {
-            traceProducer = TraceProducerFactory.getTraceDispatcherProducer(properties, sessionCredentials);
-        }
+        traceProducer = TraceProducerFactory.getTraceDispatcherProducer(properties, rpcHook);
     }
 
     public DefaultMQProducerImpl getHostProducer() {
diff --git a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/TraceProducerFactory.java
b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/TraceProducerFactory.java
index 187637f..4a75f7b 100644
--- a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/TraceProducerFactory.java
+++ b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/TraceProducerFactory.java
@@ -24,8 +24,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.common.namesrv.TopAddressing;
-import org.apache.rocketmq.ons.api.impl.authority.SessionCredentials;
-import org.apache.rocketmq.ons.api.impl.rocketmq.ClientRPCHook;
 import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants;
 import org.apache.rocketmq.remoting.RPCHook;
 
@@ -34,39 +32,9 @@ public class TraceProducerFactory {
     private static AtomicBoolean isStarted = new AtomicBoolean(false);
     private static DefaultMQProducer traceProducer;
 
-    public static DefaultMQProducer getTraceDispatcherProducer(Properties properties) {
-        if (traceProducer == null) {
-            String accessKey = properties.getProperty(OnsTraceConstants.AccessKey);
-            String secretKey = properties.getProperty(OnsTraceConstants.SecretKey);
-            if (accessKey != null && secretKey != null) {
-                SessionCredentials sessionCredentials = new SessionCredentials();
-                Properties sessionProperties = new Properties();
-                sessionProperties.put(OnsTraceConstants.AccessKey, accessKey);
-                sessionProperties.put(OnsTraceConstants.SecretKey, secretKey);
-                sessionCredentials.updateContent(sessionProperties);
-                traceProducer = new DefaultMQProducer(new ClientRPCHook(sessionCredentials));
-            } else {
-                traceProducer = new DefaultMQProducer();
-            }
-            traceProducer.setProducerGroup(accessKey + OnsTraceConstants.groupName);
-            traceProducer.setSendMsgTimeout(5000);
-            traceProducer.setInstanceName(properties.getProperty(OnsTraceConstants.InstanceName,
String.valueOf(System.currentTimeMillis())));
-            String nameSrv = properties.getProperty(OnsTraceConstants.NAMESRV_ADDR);
-            if (nameSrv == null) {
-                TopAddressing topAddressing = new TopAddressing(properties.getProperty(OnsTraceConstants.ADDRSRV_URL));
-                nameSrv = topAddressing.fetchNSAddr();
-            }
-            traceProducer.setNamesrvAddr(nameSrv);
-            traceProducer.setVipChannelEnabled(false);
-            int maxSize = Integer.parseInt(properties.getProperty(OnsTraceConstants.MaxMsgSize,
"128000"));
-            traceProducer.setMaxMessageSize(maxSize - 10 * 1000);
-        }
-        return traceProducer;
-    }
-
     public static DefaultMQProducer getTraceDispatcherProducer(Properties properties, RPCHook
rpcHook) {
         if (traceProducer == null) {
-            traceProducer = new DefaultMQProducer(rpcHook);
+            traceProducer = new DefaultMQProducer(rpcHook); //RPC hook only can be set with
AclRPCHook
             traceProducer.setProducerGroup(OnsTraceConstants.groupName);
             traceProducer.setSendMsgTimeout(5000);
             traceProducer.setInstanceName(properties.getProperty(OnsTraceConstants.InstanceName,
String.valueOf(System.currentTimeMillis())));
@@ -83,27 +51,6 @@ public class TraceProducerFactory {
         return traceProducer;
     }
 
-    public static DefaultMQProducer getTraceDispatcherProducer(Properties properties,
-        SessionCredentials sessionCredentials) {
-        if (traceProducer == null) {
-            String accessKey = properties.getProperty(OnsTraceConstants.AccessKey);
-            traceProducer = new DefaultMQProducer(new ClientRPCHook(sessionCredentials));
-            traceProducer.setProducerGroup(accessKey.replace('.', '-') + OnsTraceConstants.groupName);
-            traceProducer.setSendMsgTimeout(5000);
-            traceProducer.setInstanceName(properties.getProperty(OnsTraceConstants.InstanceName,
String.valueOf(System.currentTimeMillis())));
-            String nameSrv = properties.getProperty(OnsTraceConstants.NAMESRV_ADDR);
-            if (nameSrv == null) {
-                TopAddressing topAddressing = new TopAddressing(properties.getProperty(OnsTraceConstants.ADDRSRV_URL));
-                nameSrv = topAddressing.fetchNSAddr();
-            }
-            traceProducer.setNamesrvAddr(nameSrv);
-            traceProducer.setVipChannelEnabled(false);
-            int maxSize = Integer.parseInt(properties.getProperty(OnsTraceConstants.MaxMsgSize,
"128000"));
-            traceProducer.setMaxMessageSize(maxSize - 10 * 1000);
-        }
-        return traceProducer;
-    }
-
     public static void registerTraceDispatcher(String dispatcherId) throws MQClientException
{
         dispatcherTable.put(dispatcherId, new Object());
         if (traceProducer != null && isStarted.compareAndSet(false, true)) {
diff --git a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/hook/AbstractRPCHook.java
b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/hook/AbstractRPCHook.java
deleted file mode 100644
index 9e78880..0000000
--- a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/hook/AbstractRPCHook.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.ons.open.trace.core.hook;
-
-import java.lang.reflect.Field;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.apache.rocketmq.remoting.CommandCustomHeader;
-import org.apache.rocketmq.remoting.RPCHook;
-import org.apache.rocketmq.remoting.protocol.RemotingCommand;
-
-import static org.apache.rocketmq.ons.api.impl.authority.SessionCredentials.AccessKey;
-import static org.apache.rocketmq.ons.api.impl.authority.SessionCredentials.ONSChannelKey;
-
-public abstract class AbstractRPCHook implements RPCHook {
-    protected ConcurrentHashMap<Class<? extends CommandCustomHeader>, Field[]>
fieldCache =
-            new ConcurrentHashMap<Class<? extends CommandCustomHeader>, Field[]>();
-
-
-    protected SortedMap<String, String> parseRequestContent(RemotingCommand request,
String ak, String onsChannel) {
-        CommandCustomHeader header = request.readCustomHeader();
-        // sort property
-        SortedMap<String, String> map = new TreeMap<String, String>();
-        map.put(AccessKey, ak);
-        map.put(ONSChannelKey, onsChannel);
-        try {
-            // add header properties
-            if (null != header) {
-                Field[] fields = fieldCache.get(header.getClass());
-                if (null == fields) {
-                    fields = header.getClass().getDeclaredFields();
-                    for (Field field : fields) {
-                        field.setAccessible(true);
-                    }
-                    Field[] tmp = fieldCache.putIfAbsent(header.getClass(), fields);
-                    if (null != tmp) {
-                        fields = tmp;
-                    }
-                }
-
-                for (Field field : fields) {
-                    Object value = field.get(header);
-                    if (null != value && !field.isSynthetic()) {
-                        map.put(field.getName(), value.toString());
-                    }
-                }
-            }
-            return map;
-        }
-        catch (Exception e) {
-            throw new RuntimeException("incompatible exception.", e);
-        }
-    }
-
-}
diff --git a/ons-core/pom.xml b/ons-core/pom.xml
index cd82803..dcd2f46 100644
--- a/ons-core/pom.xml
+++ b/ons-core/pom.xml
@@ -246,6 +246,11 @@
                 <version>${rocketmq.version}</version>
             </dependency>
             <dependency>
+                <groupId>org.apache.rocketmq</groupId>
+                <artifactId>rocketmq-acl</artifactId>
+                <version>${rocketmq.version}</version>
+            </dependency>
+            <dependency>
                 <groupId>${project.groupId}</groupId>
                 <artifactId>ons-auth4client</artifactId>
                 <version>${project.version}</version>


Mime
View raw message