This is an automated email from the ASF dual-hosted git repository.
duhengforever pushed a commit to branch enchanced_msg_trace
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/enchanced_msg_trace by this push:
new 0c5dae7 [ISSUE 1188] Fix the problem where only one of multiple producers or consumers in
the same process can be traced (#1275)
0c5dae7 is described below
commit 0c5dae7cbcf6bad680fffb38a5e0eed018555aea
Author: zhengwen zhu <ahuazhu@gmail.com>
AuthorDate: Sun Jun 23 19:51:18 2019 +0800
[ISSUE 1188] Fix the problem where only one of multiple producers or consumers in the same process
can be traced (#1275)
* fix trace problem when there are multiple producers/consumers in the same process
* unify the parameter style
* variable rename
* a consumer group may have the same name as the producer group
---
.../rocketmq/client/consumer/DefaultMQPushConsumer.java | 2 +-
.../apache/rocketmq/client/producer/DefaultMQProducer.java | 4 ++--
.../apache/rocketmq/client/trace/AsyncTraceDispatcher.java | 13 +++++++++++--
.../org/apache/rocketmq/client/trace/TraceConstants.java | 2 +-
.../org/apache/rocketmq/client/trace/TraceDispatcher.java | 5 ++++-
5 files changed, 19 insertions(+), 7 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 339f799..6ad0fc3 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -388,7 +388,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
if (enableMsgTrace) {
try {
- AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic,
rpcHook);
+ AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(consumerGroup,
TraceDispatcher.Type.CONSUME, customizedTraceTopic, rpcHook);
dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
traceDispatcher = dispatcher;
this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index b4acf8f..9b36cf0 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -171,7 +171,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer
{
//if client open the message trace feature
if (enableMsgTrace) {
try {
- AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic,
rpcHook);
+ AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup,
TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
dispatcher.setHostProducer(this.defaultMQProducerImpl);
traceDispatcher = dispatcher;
this.defaultMQProducerImpl.registerSendMessageHook(
@@ -256,7 +256,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer
{
//if client open the message trace feature
if (enableMsgTrace) {
try {
- AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic,
rpcHook);
+ AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup,
TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
traceDispatcher = dispatcher;
this.getDefaultMQProducerImpl().registerSendMessageHook(
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
index ca3bcfa..b987d96 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
@@ -73,14 +73,19 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
private String traceTopicName;
private AtomicBoolean isStarted = new AtomicBoolean(false);
private AccessChannel accessChannel = AccessChannel.LOCAL;
+ private String group;
+ private Type type;
- public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) {
+ public AsyncTraceDispatcher(String group, Type type,String traceTopicName, RPCHook rpcHook)
{
// queueSize is greater than or equal to the n power of 2 of value
this.queueSize = 2048;
this.batchSize = 100;
this.maxMsgSize = 128000;
this.discardCount = new AtomicLong(0L);
this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);
+ this.group = group;
+ this.type = type;
+
this.appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
if (!UtilAll.isBlank(traceTopicName)) {
this.traceTopicName = traceTopicName;
@@ -150,7 +155,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
DefaultMQProducer traceProducerInstance = this.traceProducer;
if (traceProducerInstance == null) {
traceProducerInstance = new DefaultMQProducer(rpcHook);
- traceProducerInstance.setProducerGroup(TraceConstants.GROUP_NAME);
+ traceProducerInstance.setProducerGroup(genGroupNameForTrace());
traceProducerInstance.setSendMsgTimeout(5000);
traceProducerInstance.setVipChannelEnabled(false);
// The max size of message is 128K
@@ -159,6 +164,10 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
return traceProducerInstance;
}
+ private String genGroupNameForTrace() {
+ return TraceConstants.GROUP_NAME_PREFIX + "-" + this.group + "-" + this.type ;
+ }
+
@Override
public boolean append(final Object ctx) {
boolean result = traceContextQueue.offer((TraceContext) ctx);
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java
index e61ea9d..cb4a246 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java
@@ -20,7 +20,7 @@ import org.apache.rocketmq.common.MixAll;
public class TraceConstants {
- public static final String GROUP_NAME = "_INNER_TRACE_PRODUCER";
+ public static final String GROUP_NAME_PREFIX = "_INNER_TRACE_PRODUCER";
public static final char CONTENT_SPLITOR = (char) 1;
public static final char FIELD_SPLITOR = (char) 2;
public static final String TRACE_INSTANCE_NAME = "PID_CLIENT_INNER_TRACE_PRODUCER";
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java
index 51cc0de..33341cf 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java
@@ -24,7 +24,10 @@ import java.io.IOException;
* Interface of asynchronous transfer data
*/
public interface TraceDispatcher {
-
+ enum Type {
+ PRODUCE,
+ CONSUME
+ }
/**
* Initialize asynchronous transfer data module
*/
|