rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From duhengfore...@apache.org
Subject [rocketmq-ons] 40/43: feat(trace)adapted to support RocketMQ native client
Date Fri, 06 Dec 2019 04:23:02 GMT
This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch OpenMessaging
in repository https://gitbox.apache.org/repos/asf/rocketmq-ons.git

commit d7493f0e42bc7da471b37cbb2573d545b2c55f67
Author: duhenglucky <duhengforever@apache.org>
AuthorDate: Thu Nov 28 19:40:28 2019 +0800

    feat(trace)adapted to support RocketMQ native client
---
 .../open/trace/core/common/OnsTraceConstants.java  |   9 +-
 .../open/trace/core/dispatch/AsyncDispatcher.java  |   2 +
 .../core/dispatch/impl/AsyncArrayDispatcher.java   |  88 ++++++++++-----
 .../core/dispatch/impl/TraceProducerFactory.java   |  31 +++++-
 .../core/hook/OnsClientSendMessageHookImpl.java    |  97 ++++++++++++++++
 .../trace/core/hook/OnsConsumeMessageHookImpl.java | 123 +++++++++++++++++++++
 6 files changed, 310 insertions(+), 40 deletions(-)

diff --git a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/common/OnsTraceConstants.java b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/common/OnsTraceConstants.java
index 0877f84..f92ec8d 100644
--- a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/common/OnsTraceConstants.java
+++ b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/common/OnsTraceConstants.java
@@ -18,7 +18,6 @@
 package org.apache.rocketmq.ons.open.trace.core.common;
 
 import javax.annotation.Generated;
-
 import org.apache.rocketmq.common.MixAll;
 
 @Generated("ons-client")
@@ -42,16 +41,16 @@ public class OnsTraceConstants {
 
     public static final String MaxMsgSize = "MaxMsgSize";
 
-
     public static final String groupName = "_INNER_TRACE_PRODUCER";
 
     public static final String traceTopic = MixAll.SYSTEM_TOPIC_PREFIX + "TRACE_DATA_";
 
-
     public static final String default_region = MixAll.DEFAULT_TRACE_REGION_ID;
 
-    public static final char CONTENT_SPLITOR = (char)1;
-    public static final char FIELD_SPLITOR = (char)2;
+    public static final char CONTENT_SPLITOR = (char) 1;
+    public static final char FIELD_SPLITOR = (char) 2;
 
     public static final String TraceDispatcherType = "DispatcherType";
+
+    public static final String CustomizedTraceTopic = "customizedTraceTopic";
 }
diff --git a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/AsyncDispatcher.java b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/AsyncDispatcher.java
index 87ead88..9156680 100644
--- a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/AsyncDispatcher.java
+++ b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/AsyncDispatcher.java
@@ -30,4 +30,6 @@ public interface AsyncDispatcher {
     void flush() throws IOException;
 
     void shutdown();
+
+    void start(String nameServerAddresses) throws MQClientException;
 }
diff --git a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/AsyncArrayDispatcher.java b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/AsyncArrayDispatcher.java
index 4f02bbd..f3a456d 100644
--- a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/AsyncArrayDispatcher.java
+++ b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/AsyncArrayDispatcher.java
@@ -30,6 +30,7 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.client.common.ThreadLocalIndex;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
@@ -52,13 +53,14 @@ import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceDataEncoder;
 import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceDispatcherType;
 import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceTransferBean;
 import org.apache.rocketmq.ons.open.trace.core.dispatch.AsyncDispatcher;
+import org.apache.rocketmq.remoting.RPCHook;
 
 public class AsyncArrayDispatcher implements AsyncDispatcher {
     private final static InternalLogger CLIENT_LOG = ClientLogger.getLog();
     private final int queueSize;
     private final int batchSize;
-    private final DefaultMQProducer traceProducer;
-    private final ThreadPoolExecutor traceExecuter;
+    private DefaultMQProducer traceProducer;
+    private final ThreadPoolExecutor traceExecutor;
 
     private AtomicLong discardCount;
     private Thread worker;
@@ -71,9 +73,24 @@ public class AsyncArrayDispatcher implements AsyncDispatcher {
     private DefaultMQPushConsumerImpl hostConsumer;
     private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
     private String dispatcherId = UUID.randomUUID().toString();
+    private String customizedTraceTopic;
 
-    public AsyncArrayDispatcher(Properties properties) throws MQClientException {
+    public AsyncArrayDispatcher(Properties properties, RPCHook rpcHook) {
+        this(properties, null, rpcHook);
+        traceProducer = TraceProducerFactory.getTraceDispatcherProducer(properties, rpcHook);
+    }
+
+    public AsyncArrayDispatcher(Properties properties) {
+        this(properties, null, null);
+    }
+
+    public AsyncArrayDispatcher(Properties properties, SessionCredentials sessionCredentials) {
+        this(properties, sessionCredentials, null);
+    }
+
+    public AsyncArrayDispatcher(Properties properties, SessionCredentials sessionCredentials, RPCHook rpcHook) {
         dispatcherType = properties.getProperty(OnsTraceConstants.TraceDispatcherType);
+        this.customizedTraceTopic = properties.getProperty(OnsTraceConstants.CustomizedTraceTopic);
         int queueSize = Integer.parseInt(properties.getProperty(OnsTraceConstants.AsyncBufferSize, "2048"));
         queueSize = 1 << (32 - Integer.numberOfLeadingZeros(queueSize - 1));
         this.queueSize = queueSize;
@@ -82,34 +99,22 @@ public class AsyncArrayDispatcher implements AsyncDispatcher {
         traceContextQueue = new ArrayBlockingQueue<OnsTraceContext>(1024);
         appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
 
-        this.traceExecuter = new ThreadPoolExecutor(//
+        this.traceExecutor = new ThreadPoolExecutor(//
             10, //
             20, //
             1000 * 60, //
             TimeUnit.MILLISECONDS, //
             this.appenderQueue, //
             new ThreadFactoryImpl("MQTraceSendThread_"));
-        traceProducer = TraceProducerFactory.getTraceDispatcherProducer(properties);
-    }
-
-    public AsyncArrayDispatcher(Properties properties, SessionCredentials sessionCredentials) throws MQClientException {
-        dispatcherType = properties.getProperty(OnsTraceConstants.TraceDispatcherType);
-        int queueSize = Integer.parseInt(properties.getProperty(OnsTraceConstants.AsyncBufferSize, "2048"));
-        queueSize = 1 << (32 - Integer.numberOfLeadingZeros(queueSize - 1));
-        this.queueSize = queueSize;
-        batchSize = Integer.parseInt(properties.getProperty(OnsTraceConstants.MaxBatchNum, "1"));
-        this.discardCount = new AtomicLong(0L);
-        traceContextQueue = new ArrayBlockingQueue<OnsTraceContext>(1024);
-        appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
-
-        this.traceExecuter = new ThreadPoolExecutor(
-            10,
-            20,
-            1000 * 60,
-            TimeUnit.MILLISECONDS,
-            this.appenderQueue,
-            new ThreadFactoryImpl("MQTraceSendThread_"));
-        traceProducer = TraceProducerFactory.getTraceDispatcherProducer(properties, sessionCredentials);
+        if (sessionCredentials == null && rpcHook == null) {
+            traceProducer = TraceProducerFactory.getTraceDispatcherProducer(properties);
+        }
+        if (properties != null && rpcHook != null) {
+            traceProducer = TraceProducerFactory.getTraceDispatcherProducer(properties, rpcHook);
+        }
+        if (properties != null && sessionCredentials != null) {
+            traceProducer = TraceProducerFactory.getTraceDispatcherProducer(properties, sessionCredentials);
+        }
     }
 
     public DefaultMQProducerImpl getHostProducer() {
@@ -138,6 +143,12 @@ public class AsyncArrayDispatcher implements AsyncDispatcher {
     }
 
     @Override
+    public void start(String nameServerAddresses) throws MQClientException {
+        this.traceProducer.setNamesrvAddr(nameServerAddresses);
+        this.start();
+    }
+
+    @Override
     public boolean append(final Object ctx) {
         boolean result = traceContextQueue.offer((OnsTraceContext) ctx);
         if (!result) {
@@ -162,7 +173,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher {
     @Override
     public void shutdown() {
         this.stopped = true;
-        this.traceExecuter.shutdown();
+        this.traceExecutor.shutdown();
         TraceProducerFactory.unregisterTraceDispatcher(dispatcherId);
         this.removeShutdownHook();
     }
@@ -171,6 +182,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher {
         if (shutDownHook == null) {
             shutDownHook = new ThreadFactoryImpl("ShutdownHookMQTrace").newThread(new Runnable() {
                 private volatile boolean hasShutdown = false;
+
                 @Override
                 public void run() {
                     synchronized (this) {
@@ -215,7 +227,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher {
                 }
                 if (contexts.size() > 0) {
                     AsyncAppenderRequest request = new AsyncAppenderRequest(contexts);
-                    traceExecuter.submit(request);
+                    traceExecutor.submit(request);
                 } else if (AsyncArrayDispatcher.this.stopped) {
                     this.stopped = true;
                 }
@@ -291,9 +303,13 @@ public class AsyncArrayDispatcher implements AsyncDispatcher {
 
         private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic,
             String currentRegionId) {
-            String topic = OnsTraceConstants.traceTopic + currentRegionId;
+            String topic = customizedTraceTopic;
+            if (StringUtils.isBlank(topic)) {
+                topic = OnsTraceConstants.traceTopic + currentRegionId;
+            }
             final Message message = new Message(topic, data.getBytes());
             message.setKeys(keySet);
+
             try {
                 Set<String> dataBrokerSet = getBrokerSetByTopic(dataTopic);
                 Set<String> traceBrokerSet = tryGetMessageQueueBrokerSet(traceProducer.getDefaultMQProducerImpl(), topic);
@@ -305,7 +321,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher {
 
                     @Override
                     public void onException(Throwable e) {
-                        CLIENT_LOG.info("send trace data ,the traceData is " + data);
+                        CLIENT_LOG.info("send trace data ,the traceData is: {} ", data, e);
                     }
                 };
                 if (dataBrokerSet.isEmpty()) {
@@ -333,7 +349,7 @@ public class AsyncArrayDispatcher implements AsyncDispatcher {
                 }
 
             } catch (Exception e) {
-                CLIENT_LOG.info("send trace data,the traceData is" + data);
+                CLIENT_LOG.info("send trace data,the traceData is: {}", data, e);
             }
         }
 
@@ -379,4 +395,16 @@ public class AsyncArrayDispatcher implements AsyncDispatcher {
             return brokerSet;
         }
     }
+
+    public String getCustomizedTraceTopic() {
+        return customizedTraceTopic;
+    }
+
+    public void setCustomizedTraceTopic(String customizedTraceTopic) {
+        this.customizedTraceTopic = customizedTraceTopic;
+    }
+
+    public DefaultMQProducer getTraceProducer() {
+        return traceProducer;
+    }
 }
diff --git a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/TraceProducerFactory.java b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/TraceProducerFactory.java
index 60e65cb..fb1c3af 100644
--- a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/TraceProducerFactory.java
+++ b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/dispatch/impl/TraceProducerFactory.java
@@ -21,14 +21,13 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants;
 import org.apache.rocketmq.client.exception.MQClientException;
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.common.namesrv.TopAddressing;
-
 import org.apache.rocketmq.ons.api.impl.authority.SessionCredentials;
 import org.apache.rocketmq.ons.api.impl.rocketmq.ClientRPCHook;
+import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants;
+import org.apache.rocketmq.remoting.RPCHook;
 
 public class TraceProducerFactory {
     private static Map<String, Object> dispatcherTable = new ConcurrentHashMap<String, Object>();
@@ -61,7 +60,27 @@ public class TraceProducerFactory {
         return traceProducer;
     }
 
-    public static DefaultMQProducer getTraceDispatcherProducer(Properties properties, SessionCredentials sessionCredentials) {
+    public static DefaultMQProducer getTraceDispatcherProducer(Properties properties, RPCHook rpcHook) {
+        if (traceProducer == null) {
+            traceProducer = new DefaultMQProducer(rpcHook);
+            traceProducer.setProducerGroup(OnsTraceConstants.groupName);
+            traceProducer.setSendMsgTimeout(5000);
+            traceProducer.setInstanceName(properties.getProperty(OnsTraceConstants.InstanceName, String.valueOf(System.currentTimeMillis())));
+            String nameSrv = properties.getProperty(OnsTraceConstants.NAMESRV_ADDR);
+            if (nameSrv == null) {
+                TopAddressing topAddressing = new TopAddressing(properties.getProperty(OnsTraceConstants.ADDRSRV_URL));
+                nameSrv = topAddressing.fetchNSAddr();
+            }
+            traceProducer.setNamesrvAddr(nameSrv);
+            traceProducer.setVipChannelEnabled(false);
+            int maxSize = Integer.parseInt(properties.getProperty(OnsTraceConstants.MaxMsgSize, "128000"));
+            traceProducer.setMaxMessageSize(maxSize - 10 * 1000);
+        }
+        return traceProducer;
+    }
+
+    public static DefaultMQProducer getTraceDispatcherProducer(Properties properties,
+        SessionCredentials sessionCredentials) {
         if (traceProducer == null) {
             String accessKey = properties.getProperty(OnsTraceConstants.AccessKey);
             traceProducer = new DefaultMQProducer(new ClientRPCHook(sessionCredentials));
@@ -86,12 +105,14 @@ public class TraceProducerFactory {
         if (traceProducer != null && isStarted.compareAndSet(false, true)) {
             traceProducer.start();
         }
+
     }
 
     public static void unregisterTraceDispatcher(String dispatcherId) {
         dispatcherTable.remove(dispatcherId);
-        if (dispatcherTable.isEmpty() && traceProducer != null && isStarted.get()) {
+        if (dispatcherTable.isEmpty() && traceProducer != null && isStarted.compareAndSet(true, false)) {
             traceProducer.shutdown();
         }
     }
+
 }
diff --git a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/hook/OnsClientSendMessageHookImpl.java b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/hook/OnsClientSendMessageHookImpl.java
new file mode 100644
index 0000000..c7b7e4f
--- /dev/null
+++ b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/hook/OnsClientSendMessageHookImpl.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.ons.open.trace.core.hook;
+
+import java.util.ArrayList;
+import org.apache.rocketmq.client.hook.SendMessageContext;
+import org.apache.rocketmq.client.hook.SendMessageHook;
+import org.apache.rocketmq.client.producer.SendStatus;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
+import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceBean;
+import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants;
+import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceContext;
+import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceType;
+import org.apache.rocketmq.ons.open.trace.core.dispatch.AsyncDispatcher;
+
+public class OnsClientSendMessageHookImpl implements SendMessageHook {
+
+    private AsyncDispatcher localDispatcher;
+
+    public OnsClientSendMessageHookImpl(AsyncDispatcher localDispatcher) {
+        this.localDispatcher = localDispatcher;
+    }
+
+    @Override
+    public String hookName() {
+        return "OnsClientSendMessageHook";
+    }
+
+    @Override
+    public void sendMessageBefore(SendMessageContext context) {
+
+        if (context == null || context.getMessage().getTopic().startsWith(MixAll.SYSTEM_TOPIC_PREFIX)) {
+            return;
+        }
+        OnsTraceContext onsContext = new OnsTraceContext();
+        onsContext.setTraceBeans(new ArrayList<OnsTraceBean>(1));
+        context.setMqTraceContext(onsContext);
+        onsContext.setTraceType(OnsTraceType.Pub);
+        String userGroup = NamespaceUtil.withoutNamespace(context.getProducerGroup(), context.getNamespace());
+        onsContext.setGroupName(userGroup);
+        OnsTraceBean traceBean = new OnsTraceBean();
+        String userTopic = NamespaceUtil.withoutNamespace(context.getMessage().getTopic(), context.getNamespace());
+        traceBean.setTopic(userTopic);
+        traceBean.setTags(context.getMessage().getTags());
+        traceBean.setKeys(context.getMessage().getKeys());
+        traceBean.setStoreHost(context.getBrokerAddr());
+        traceBean.setBodyLength(context.getMessage().getBody().length);
+        traceBean.setMsgType(context.getMsgType());
+        onsContext.getTraceBeans().add(traceBean);
+    }
+
+    @Override
+    public void sendMessageAfter(SendMessageContext context) {
+
+        if (context == null || context.getMessage().getTopic().startsWith(OnsTraceConstants.traceTopic) || context.getMqTraceContext() == null) {
+            return;
+        }
+        if (context.getSendResult() == null) {
+            return;
+        }
+        if (context.getSendResult().getRegionId() == null
+            || context.getSendResult().getRegionId().equals(OnsTraceConstants.default_region)
+            || !context.getSendResult().isTraceOn()) {
+            // if regionId is default or switch is false,skip it
+            return;
+        }
+        OnsTraceContext onsContext = (OnsTraceContext) context.getMqTraceContext();
+        OnsTraceBean traceBean = onsContext.getTraceBeans().get(0);
+        int costTime = (int) ((System.currentTimeMillis() - onsContext.getTimeStamp()) / onsContext.getTraceBeans().size());
+        onsContext.setCostTime(costTime);
+        if (context.getSendResult().getSendStatus().equals(SendStatus.SEND_OK)) {
+            onsContext.setSuccess(true);
+        } else {
+            onsContext.setSuccess(false);
+        }
+        onsContext.setRegionId(context.getSendResult().getRegionId());
+        traceBean.setMsgId(context.getSendResult().getMsgId());
+        traceBean.setOffsetMsgId(context.getSendResult().getOffsetMsgId());
+        traceBean.setStoreTime(onsContext.getTimeStamp() + costTime / 2);
+        localDispatcher.append(onsContext);
+    }
+}
diff --git a/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/hook/OnsConsumeMessageHookImpl.java b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/hook/OnsConsumeMessageHookImpl.java
new file mode 100644
index 0000000..e08cac1
--- /dev/null
+++ b/ons-core/ons-trace-core/src/main/java/org/apache/rocketmq/ons/open/trace/core/hook/OnsConsumeMessageHookImpl.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.ons.open.trace.core.hook;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.rocketmq.client.consumer.listener.ConsumeReturnType;
+import org.apache.rocketmq.client.hook.ConsumeMessageContext;
+import org.apache.rocketmq.client.hook.ConsumeMessageHook;
+import org.apache.rocketmq.common.MixAll;
+import org.apache.rocketmq.common.message.MessageConst;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.common.protocol.NamespaceUtil;
+import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceBean;
+import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceConstants;
+import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceContext;
+import org.apache.rocketmq.ons.open.trace.core.common.OnsTraceType;
+import org.apache.rocketmq.ons.open.trace.core.dispatch.AsyncDispatcher;
+
+public class OnsConsumeMessageHookImpl implements ConsumeMessageHook {
+
+    private AsyncDispatcher localDispatcher;
+
+    public OnsConsumeMessageHookImpl(AsyncDispatcher localDispatcher) {
+        this.localDispatcher = localDispatcher;
+    }
+
+    @Override
+    public String hookName() {
+        return "OnsConsumeMessageHook";
+    }
+
+    @Override
+    public void consumeMessageBefore(ConsumeMessageContext context) {
+        if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
+            return;
+        }
+        OnsTraceContext onsTraceContext = new OnsTraceContext();
+        context.setMqTraceContext(onsTraceContext);
+        onsTraceContext.setTraceType(OnsTraceType.SubBefore);
+        String userGroup = NamespaceUtil.withoutNamespace(context.getConsumerGroup(), context.getNamespace());
+        onsTraceContext.setGroupName(userGroup);
+        List<OnsTraceBean> beans = new ArrayList<OnsTraceBean>();
+        for (MessageExt msg : context.getMsgList()) {
+            if (msg == null) {
+                continue;
+            }
+            String regionId = msg.getProperty(MessageConst.PROPERTY_MSG_REGION);
+            String traceOn = msg.getProperty(MessageConst.PROPERTY_TRACE_SWITCH);
+            if (regionId == null || regionId.equals(OnsTraceConstants.default_region)) {
+                // if regionId is default ,skip it
+                continue;
+            }
+            if (traceOn != null && "false".equals(traceOn)) {
+                // if trace switch is false ,skip it
+                continue;
+            }
+            OnsTraceBean traceBean = new OnsTraceBean();
+
+            String userTopic = NamespaceUtil.withoutNamespace(msg.getTopic(), context.getNamespace());
+            traceBean.setTopic(userTopic);
+            traceBean.setMsgId(msg.getMsgId());
+            traceBean.setTags(msg.getTags());
+            traceBean.setKeys(msg.getKeys());
+            traceBean.setStoreTime(msg.getStoreTimestamp());
+            traceBean.setBodyLength(msg.getStoreSize());
+            traceBean.setRetryTimes(msg.getReconsumeTimes());
+            onsTraceContext.setRegionId(regionId);
+            beans.add(traceBean);
+        }
+        if (beans.size() > 0) {
+            onsTraceContext.setTraceBeans(beans);
+            onsTraceContext.setTimeStamp(System.currentTimeMillis());
+            localDispatcher.append(onsTraceContext);
+        }
+    }
+
+    @Override
+    public void consumeMessageAfter(ConsumeMessageContext context) {
+        if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
+            return;
+        }
+        OnsTraceContext subBeforeContext = (OnsTraceContext) context.getMqTraceContext();
+        if (subBeforeContext.getRegionId().equals(OnsTraceConstants.default_region)) {
+            // if regionId is default ,skip it
+            return;
+        }
+        if (subBeforeContext.getTraceBeans() == null || subBeforeContext.getTraceBeans().size() < 1) {
+            // if subbefore bean is null ,skip it
+            return;
+        }
+        OnsTraceContext subAfterContext = new OnsTraceContext();
+        subAfterContext.setTraceType(OnsTraceType.SubAfter);
+        subAfterContext.setRegionId(subBeforeContext.getRegionId());
+        subAfterContext.setGroupName(subBeforeContext.getGroupName());
+        subAfterContext.setRequestId(subBeforeContext.getRequestId());
+        subAfterContext.setSuccess(context.isSuccess());
+
+        int costTime = (int) ((System.currentTimeMillis() - subBeforeContext.getTimeStamp()) / context.getMsgList().size());
+        subAfterContext.setCostTime(costTime);//
+        subAfterContext.setTraceBeans(subBeforeContext.getTraceBeans());
+        String contextType = context.getProps().get(MixAll.CONSUME_CONTEXT_TYPE);
+        if (contextType != null) {
+            subAfterContext.setContextCode(ConsumeReturnType.valueOf(contextType).ordinal());
+        }
+        localDispatcher.append(subAfterContext);
+    }
+}


Mime
View raw message