rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From duhengfore...@apache.org
Subject [rocketmq] branch enchanced_msg_trace updated: [ISSSUE 1188]Fix the problem when more than one producer or consumer in the same process can trace only one (#1275)
Date Sun, 23 Jun 2019 11:51:30 GMT
This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch enchanced_msg_trace
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/enchanced_msg_trace by this push:
     new 0c5dae7  [ISSSUE 1188]Fix the problem when more than one producer or consumer in
the same process can trace only one (#1275)
0c5dae7 is described below

commit 0c5dae7cbcf6bad680fffb38a5e0eed018555aea
Author: zhengwen zhu <ahuazhu@gmail.com>
AuthorDate: Sun Jun 23 19:51:18 2019 +0800

    [ISSSUE 1188]Fix the problem when more than one producer or consumer in the same process
can trace only one (#1275)
    
    * fix trace problem when multi produce/consumer in the same process
    
    * uniform parameter manner
    
    * variable rename
    
    * consumer groups may be same with the producer group
---
 .../rocketmq/client/consumer/DefaultMQPushConsumer.java     |  2 +-
 .../apache/rocketmq/client/producer/DefaultMQProducer.java  |  4 ++--
 .../apache/rocketmq/client/trace/AsyncTraceDispatcher.java  | 13 +++++++++++--
 .../org/apache/rocketmq/client/trace/TraceConstants.java    |  2 +-
 .../org/apache/rocketmq/client/trace/TraceDispatcher.java   |  5 ++++-
 5 files changed, 19 insertions(+), 7 deletions(-)

diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 339f799..6ad0fc3 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -388,7 +388,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
         defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
         if (enableMsgTrace) {
             try {
-                AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic,
rpcHook);
+                AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(consumerGroup,
TraceDispatcher.Type.CONSUME, customizedTraceTopic, rpcHook);
                 dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
                 traceDispatcher = dispatcher;
                 this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index b4acf8f..9b36cf0 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -171,7 +171,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer
{
         //if client open the message trace feature
         if (enableMsgTrace) {
             try {
-                AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic,
rpcHook);
+                AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup,
TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
                 dispatcher.setHostProducer(this.defaultMQProducerImpl);
                 traceDispatcher = dispatcher;
                 this.defaultMQProducerImpl.registerSendMessageHook(
@@ -256,7 +256,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer
{
         //if client open the message trace feature
         if (enableMsgTrace) {
             try {
-                AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic,
rpcHook);
+                AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup,
TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
                 dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
                 traceDispatcher = dispatcher;
                 this.getDefaultMQProducerImpl().registerSendMessageHook(
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
index ca3bcfa..b987d96 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
@@ -73,14 +73,19 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
     private String traceTopicName;
     private AtomicBoolean isStarted = new AtomicBoolean(false);
     private AccessChannel accessChannel = AccessChannel.LOCAL;
+    private String group;
+    private Type type;
 
-    public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) {
+    public AsyncTraceDispatcher(String group, Type type,String traceTopicName, RPCHook rpcHook)
{
         // queueSize is greater than or equal to the n power of 2 of value
         this.queueSize = 2048;
         this.batchSize = 100;
         this.maxMsgSize = 128000;
         this.discardCount = new AtomicLong(0L);
         this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);
+        this.group = group;
+        this.type = type;
+
         this.appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
         if (!UtilAll.isBlank(traceTopicName)) {
             this.traceTopicName = traceTopicName;
@@ -150,7 +155,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
         DefaultMQProducer traceProducerInstance = this.traceProducer;
         if (traceProducerInstance == null) {
             traceProducerInstance = new DefaultMQProducer(rpcHook);
-            traceProducerInstance.setProducerGroup(TraceConstants.GROUP_NAME);
+            traceProducerInstance.setProducerGroup(genGroupNameForTrace());
             traceProducerInstance.setSendMsgTimeout(5000);
             traceProducerInstance.setVipChannelEnabled(false);
             // The max size of message is 128K
@@ -159,6 +164,10 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
         return traceProducerInstance;
     }
 
+    private String genGroupNameForTrace() {
+        return TraceConstants.GROUP_NAME_PREFIX + "-" + this.group + "-" + this.type ;
+    }
+
     @Override
     public boolean append(final Object ctx) {
         boolean result = traceContextQueue.offer((TraceContext) ctx);
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java
index e61ea9d..cb4a246 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java
@@ -20,7 +20,7 @@ import org.apache.rocketmq.common.MixAll;
 
 public class TraceConstants {
 
-    public static final String GROUP_NAME = "_INNER_TRACE_PRODUCER";
+    public static final String GROUP_NAME_PREFIX = "_INNER_TRACE_PRODUCER";
     public static final char CONTENT_SPLITOR = (char) 1;
     public static final char FIELD_SPLITOR = (char) 2;
     public static final String TRACE_INSTANCE_NAME = "PID_CLIENT_INNER_TRACE_PRODUCER";
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java
index 51cc0de..33341cf 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java
@@ -24,7 +24,10 @@ import java.io.IOException;
  * Interface of asynchronous transfer data
  */
 public interface TraceDispatcher {
-
+    enum Type {
+        PRODUCE,
+        CONSUME
+    }
     /**
      * Initialize asynchronous transfer data module
      */


Mime
View raw message