rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From duhengfore...@apache.org
Subject [rocketmq-ons] 41/43: feat(client) use the trace rpc hook in trace core replace origin rpc hook
Date Fri, 06 Dec 2019 04:23:03 GMT
This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch OpenMessaging
in repository https://gitbox.apache.org/repos/asf/rocketmq-ons.git

commit e46fac7d8f39874dbd16bc1b61febcac05b56a37
Author: duhenglucky <duhengforever@apache.org>
AuthorDate: Fri Nov 29 12:18:26 2019 +0800

    feat(client) use the trace rpc hook in trace core replace origin rpc hook
---
 .../ons/api/impl/rocketmq/ONSConsumerAbstract.java |   2 +-
 .../ons/api/impl/rocketmq/OrderProducerImpl.java   |   2 +-
 .../ons/api/impl/rocketmq/ProducerImpl.java        |   2 +-
 .../api/impl/rocketmq/TransactionProducerImpl.java |   2 +-
 .../tracehook/OnsClientSendMessageHookImpl.java    |  97 ----------------
 .../impl/tracehook/OnsConsumeMessageHookImpl.java  | 123 ---------------------
 .../core/dispatch/impl/TraceProducerFactory.java   |  16 ++-
 7 files changed, 14 insertions(+), 230 deletions(-)

diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSConsumerAbstract.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSConsumerAbstract.java
index d82feeb..3c6cae2 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSConsumerAbstract.java
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ONSConsumerAbstract.java
@@ -27,11 +27,11 @@ import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.ons.api.PropertyKeyConst;
 import org.apache.rocketmq.ons.api.exception.ONSClientException;
-import org.apache.rocketmq.ons.api.impl.tracehook.OnsConsumeMessageHookImpl;
 import org.apache.rocketmq.ons.api.impl.util.ClientLoggerUtil;
 import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants;
 import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceDispatcherType;
 import org.apache.rocketmq.ons.open.trace.core.dispatch.impl.AsyncArrayDispatcher;
+import org.apache.rocketmq.ons.open.trace.core.hook.OnsConsumeMessageHookImpl;
 import org.apache.rocketmq.remoting.protocol.LanguageCode;
 
 public class ONSConsumerAbstract extends ONSClientAbstract {
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OrderProducerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OrderProducerImpl.java
index c840bb2..2808444 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OrderProducerImpl.java
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/OrderProducerImpl.java
@@ -31,11 +31,11 @@ import org.apache.rocketmq.logging.InternalLogger;
 
 import org.apache.rocketmq.ons.api.PropertyKeyConst;
 import org.apache.rocketmq.ons.api.exception.ONSClientException;
-import org.apache.rocketmq.ons.api.impl.tracehook.OnsClientSendMessageHookImpl;
 import org.apache.rocketmq.ons.api.impl.util.ClientLoggerUtil;
 import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants;
 import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceDispatcherType;
 import org.apache.rocketmq.ons.open.trace.core.dispatch.impl.AsyncArrayDispatcher;
+import org.apache.rocketmq.ons.open.trace.core.hook.OnsClientSendMessageHookImpl;
 import org.apache.rocketmq.remoting.protocol.LanguageCode;
 
 public class OrderProducerImpl extends ONSClientAbstract implements OrderProducer {
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java
index cccdf05..80b3fe6 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/ProducerImpl.java
@@ -34,11 +34,11 @@ import org.apache.rocketmq.common.protocol.ResponseCode;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.ons.api.PropertyKeyConst;
 import org.apache.rocketmq.ons.api.exception.ONSClientException;
-import org.apache.rocketmq.ons.api.impl.tracehook.OnsClientSendMessageHookImpl;
 import org.apache.rocketmq.ons.api.impl.util.ClientLoggerUtil;
 import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants;
 import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceDispatcherType;
 import org.apache.rocketmq.ons.open.trace.core.dispatch.impl.AsyncArrayDispatcher;
+import org.apache.rocketmq.ons.open.trace.core.hook.OnsClientSendMessageHookImpl;
 import org.apache.rocketmq.remoting.exception.RemotingConnectException;
 import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
 import org.apache.rocketmq.remoting.protocol.LanguageCode;
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java
index 66b3014..f56a460 100644
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java
+++ b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/rocketmq/TransactionProducerImpl.java
@@ -33,11 +33,11 @@ import org.apache.rocketmq.common.UtilAll;
 import org.apache.rocketmq.logging.InternalLogger;
 import org.apache.rocketmq.ons.api.Constants;
 import org.apache.rocketmq.ons.api.PropertyKeyConst;
-import org.apache.rocketmq.ons.api.impl.tracehook.OnsClientSendMessageHookImpl;
 import org.apache.rocketmq.ons.api.impl.util.ClientLoggerUtil;
 import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants;
 import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceDispatcherType;
 import org.apache.rocketmq.ons.open.trace.core.dispatch.impl.AsyncArrayDispatcher;
+import org.apache.rocketmq.ons.open.trace.core.hook.OnsClientSendMessageHookImpl;
 import org.apache.rocketmq.remoting.protocol.LanguageCode;
 
 public class TransactionProducerImpl extends ONSClientAbstract implements TransactionProducer {
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/tracehook/OnsClientSendMessageHookImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/tracehook/OnsClientSendMessageHookImpl.java
deleted file mode 100644
index 6385ad9..0000000
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/tracehook/OnsClientSendMessageHookImpl.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.rocketmq.ons.api.impl.tracehook;
-
-import java.util.ArrayList;
-import org.apache.rocketmq.client.hook.SendMessageContext;
-import org.apache.rocketmq.client.hook.SendMessageHook;
-import org.apache.rocketmq.client.producer.SendStatus;
-import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.protocol.NamespaceUtil;
-import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceBean;
-import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants;
-import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceContext;
-import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceType;
-import org.apache.rocketmq.ons.open.trace.core.dispatch.AsyncDispatcher;
-
-public class OnsClientSendMessageHookImpl implements SendMessageHook {
-
-    private AsyncDispatcher localDispatcher;
-
-    public OnsClientSendMessageHookImpl(AsyncDispatcher localDispatcher) {
-        this.localDispatcher = localDispatcher;
-    }
-
-    @Override
-    public String hookName() {
-        return "OnsClientSendMessageHook";
-    }
-
-    @Override
-    public void sendMessageBefore(SendMessageContext context) {
-
-        if (context == null || context.getMessage().getTopic().startsWith(MixAll.SYSTEM_TOPIC_PREFIX)) {
-            return;
-        }
-        OnsTraceContext onsContext = new OnsTraceContext();
-        onsContext.setTraceBeans(new ArrayList<OnsTraceBean>(1));
-        context.setMqTraceContext(onsContext);
-        onsContext.setTraceType(OnsTraceType.Pub);
-        String userGroup = NamespaceUtil.withoutNamespace(context.getProducerGroup(), context.getNamespace());
-        onsContext.setGroupName(userGroup);
-        OnsTraceBean traceBean = new OnsTraceBean();
-        String userTopic = NamespaceUtil.withoutNamespace(context.getMessage().getTopic(), context.getNamespace());
-        traceBean.setTopic(userTopic);
-        traceBean.setTags(context.getMessage().getTags());
-        traceBean.setKeys(context.getMessage().getKeys());
-        traceBean.setStoreHost(context.getBrokerAddr());
-        traceBean.setBodyLength(context.getMessage().getBody().length);
-        traceBean.setMsgType(context.getMsgType());
-        onsContext.getTraceBeans().add(traceBean);
-    }
-
-    @Override
-    public void sendMessageAfter(SendMessageContext context) {
-
-        if (context == null || context.getMessage().getTopic().startsWith(OnsTraceConstants.traceTopic) || context.getMqTraceContext() == null) {
-            return;
-        }
-        if (context.getSendResult() == null) {
-            return;
-        }
-        if (context.getSendResult().getRegionId() == null
-            || context.getSendResult().getRegionId().equals(OnsTraceConstants.default_region)
-            || !context.getSendResult().isTraceOn()) {
-            // if regionId is default or switch is false,skip it
-            return;
-        }
-        OnsTraceContext onsContext = (OnsTraceContext) context.getMqTraceContext();
-        OnsTraceBean traceBean = onsContext.getTraceBeans().get(0);
-        int costTime = (int) ((System.currentTimeMillis() - onsContext.getTimeStamp()) / onsContext.getTraceBeans().size());
-        onsContext.setCostTime(costTime);
-        if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) {
-            onsContext.setSuccess(true);
-        } else {
-            onsContext.setSuccess(false);
-        }
-        onsContext.setRegionId(context.getSendResult().getRegionId());
-        traceBean.setMsgId(context.getSendResult().getMsgId());
-        traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId());
-        traceBean.setStoreTime(onsContext.getTimeStamp() + costTime / 2);
-        localDispatcher.append(onsContext);
-    }
-}
diff --git a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/tracehook/OnsConsumeMessageHookImpl.java b/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/tracehook/OnsConsumeMessageHookImpl.java
deleted file mode 100644
index d5a0782..0000000
--- a/ons-core/ons-client/src/main/java/org/apache/rocketmq/ons/api/impl/tracehook/OnsConsumeMessageHookImpl.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.rocketmq.ons.api.impl.tracehook;
-
-import java.util.ArrayList;
-import java.util.List;
-import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType;
-import org.apache.rocketmq.client.hook.ConsumeMessageContext;
-import org.apache.rocketmq.client.hook.ConsumeMessageHook;
-import org.apache.rocketmq.common.MixAll;
-import org.apache.rocketmq.common.message.MessageConst;
-import org.apache.rocketmq.common.message.MessageExt;
-import org.apache.rocketmq.common.protocol.NamespaceUtil;
-import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceBean;
-import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants;
-import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceContext;
-import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceType;
-import org.apache.rocketmq.ons.open.trace.core.dispatch.AsyncDispatcher;
-
-public class OnsConsumeMessageHookImpl implements ConsumeMessageHook {
-
-    private AsyncDispatcher localDispatcher;
-
-    public OnsConsumeMessageHookImpl(AsyncDispatcher localDispatcher) {
-        this.localDispatcher = localDispatcher;
-    }
-
-    @Override
-    public String hookName() {
-        return "OnsConsumeMessageHook";
-    }
-
-    @Override
-    public void consumeMessageBefore(ConsumeMessageContext context) {
-        if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
-            return;
-        }
-        OnsTraceContext onsTraceContext = new OnsTraceContext();
-        context.setMqTraceContext(onsTraceContext);
-        onsTraceContext.setTraceType(OnsTraceType.SubBefore);
-        String userGroup = NamespaceUtil.withoutNamespace(context.getConsumerGroup(), context.getNamespace());
-        onsTraceContext.setGroupName(userGroup);
-        List<OnsTraceBean> beans = new ArrayList<OnsTraceBean>();
-        for (MessageExt msg : context.getMsgList()) {
-            if (msg == null) {
-                continue;
-            }
-            String regionId = msg.getProperty(MessageConst.PROPERTY_MSG_REGION);
-            String traceOn = msg.getProperty(MessageConst.PROPERTY_TRACE_SWITCH);
-            if (regionId == null || regionId.equals(OnsTraceConstants.default_region)) {
-                // if regionId is default ,skip it
-                continue;
-            }
-            if (traceOn != null && "false".equals(traceOn)) {
-                // if trace switch is false ,skip it
-                continue;
-            }
-            OnsTraceBean traceBean = new OnsTraceBean();
-
-            String userTopic = NamespaceUtil.withoutNamespace(msg.getTopic(), context.getNamespace());
-            traceBean.setTopic(userTopic);
-            traceBean.setMsgId(msg.getMsgId());
-            traceBean.setTags(msg.getTags());
-            traceBean.setKeys(msg.getKeys());
-            traceBean.setStoreTime(msg.getStoreTimestamp());
-            traceBean.setBodyLength(msg.getStoreSize());
-            traceBean.setRetryTimes(msg.getReconsumeTimes());
-            onsTraceContext.setRegionId(regionId);
-            beans.add(traceBean);
-        }
-        if (beans.size() > 0) {
-            onsTraceContext.setTraceBeans(beans);
-            onsTraceContext.setTimeStamp(System.currentTimeMillis());
-            localDispatcher.append(onsTraceContext);
-        }
-    }
-
-    @Override
-    public void consumeMessageAfter(ConsumeMessageContext context) {
-        if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
-            return;
-        }
-        OnsTraceContext subBeforeContext = (OnsTraceContext) context.getMqTraceContext();
-        if (subBeforeContext.getRegionId().equals(OnsTraceConstants.default_region)) {
-            // if regionId is default ,skip it
-            return;
-        }
-        if (subBeforeContext.getTraceBeans() == null || subBeforeContext.getTraceBeans().size() < 1) {
-            // if subbefore bean is null ,skip it
-            return;
-        }
-        OnsTraceContext subAfterContext = new OnsTraceContext();
-        subAfterContext.setTraceType(OnsTraceType.SubAfter);
-        subAfterContext.setRegionId(subBeforeContext.getRegionId());
-        subAfterContext.setGroupName(subBeforeContext.getGroupName());
-        subAfterContext.setRequestId(subBeforeContext.getRequestId());
-        subAfterContext.setSuccess(context.isSuccess());
-
-        int costTime = (int) ((System.currentTimeMillis() - subBeforeContext.getTimeStamp()) / context.getMsgList().size());
-        subAfterContext.setCostTime(costTime);//
-        subAfterContext.setTraceBeans(subBeforeContext.getTraceBeans());
-        String contextType = context.getProps().get(MixAll.CONSUME_CONTEXT_TYPE);
-        if (contextType != null) {
-            subAfterContext.setContextCode(ConsumeReturnType.valueOf(contextType).ordinal());
-        }
-        localDispatcher.append(subAfterContext);
-    }
-}
diff --git a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/TraceProducerFactory.java b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/TraceProducerFactory.java
index fb1c3af..187637f 100644
--- a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/TraceProducerFactory.java
+++ b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/TraceProducerFactory.java
@@ -36,14 +36,18 @@ public class TraceProducerFactory {
 
     public static DefaultMQProducer getTraceDispatcherProducer(Properties properties) {
         if (traceProducer == null) {
-            SessionCredentials sessionCredentials = new SessionCredentials();
-            Properties sessionProperties = new Properties();
             String accessKey = properties.getProperty(OnsTraceConstants.AccessKey);
             String secretKey = properties.getProperty(OnsTraceConstants.SecretKey);
-            sessionProperties.put(OnsTraceConstants.AccessKey, accessKey);
-            sessionProperties.put(OnsTraceConstants.SecretKey, secretKey);
-            sessionCredentials.updateContent(sessionProperties);
-            traceProducer = new DefaultMQProducer(new ClientRPCHook(sessionCredentials));
+            if (accessKey != null && secretKey != null) {
+                SessionCredentials sessionCredentials = new SessionCredentials();
+                Properties sessionProperties = new Properties();
+                sessionProperties.put(OnsTraceConstants.AccessKey, accessKey);
+                sessionProperties.put(OnsTraceConstants.SecretKey, secretKey);
+                sessionCredentials.updateContent(sessionProperties);
+                traceProducer = new DefaultMQProducer(new ClientRPCHook(sessionCredentials));
+            } else {
+                traceProducer = new DefaultMQProducer();
+            }
             traceProducer.setProducerGroup(accessKey + OnsTraceConstants.groupName);
             traceProducer.setSendMsgTimeout(5000);
             traceProducer.setInstanceName(properties.getProperty(OnsTraceConstants.InstanceName, String.valueOf(System.currentTimeMillis())));


Mime
View raw message