This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch test-release
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 406cb8d4b6b4ae902f89a07dccb68a277f09588c
Author: Hu Zongtang <huzongtang@cmss.chinamobile.com>
AuthorDate: Fri Jan 3 15:27:10 2020 +0800
[ISSSUE 1188]Fix the problem when more than one producer or consumer in the same process
can trace only one (#1275) (#1303)
* fix trace problem when multi produce/consumer in the same process
* uniform parameter manner
* variable rename
* consumer groups may be same with the producer group
Co-authored-by: zhengwen zhu <ahuazhu@gmail.com>
---
.../rocketmq/client/consumer/DefaultMQPushConsumer.java | 2 +-
.../apache/rocketmq/client/producer/DefaultMQProducer.java | 4 ++--
.../apache/rocketmq/client/trace/AsyncTraceDispatcher.java | 13 +++++++++++--
.../org/apache/rocketmq/client/trace/TraceConstants.java | 2 +-
.../org/apache/rocketmq/client/trace/TraceDispatcher.java | 5 ++++-
5 files changed, 19 insertions(+), 7 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
index 339f799..6ad0fc3 100644
--- a/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/consumer/DefaultMQPushConsumer.java
@@ -388,7 +388,7 @@ public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsume
defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
if (enableMsgTrace) {
try {
- AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic,
rpcHook);
+ AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(consumerGroup,
TraceDispatcher.Type.CONSUME, customizedTraceTopic, rpcHook);
dispatcher.setHostConsumer(this.getDefaultMQPushConsumerImpl());
traceDispatcher = dispatcher;
this.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(
diff --git a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
index faa79f5..0457341 100644
--- a/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
+++ b/client/src/main/java/org/apache/rocketmq/client/producer/DefaultMQProducer.java
@@ -161,7 +161,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer
{
//if client open the message trace feature
if (enableMsgTrace) {
try {
- AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic,
rpcHook);
+ AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup,
TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
dispatcher.setHostProducer(this.defaultMQProducerImpl);
traceDispatcher = dispatcher;
this.defaultMQProducerImpl.registerSendMessageHook(
@@ -246,7 +246,7 @@ public class DefaultMQProducer extends ClientConfig implements MQProducer
{
//if client open the message trace feature
if (enableMsgTrace) {
try {
- AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(customizedTraceTopic,
rpcHook);
+ AsyncTraceDispatcher dispatcher = new AsyncTraceDispatcher(producerGroup,
TraceDispatcher.Type.PRODUCE, customizedTraceTopic, rpcHook);
dispatcher.setHostProducer(this.getDefaultMQProducerImpl());
traceDispatcher = dispatcher;
this.getDefaultMQProducerImpl().registerSendMessageHook(
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
index 06a28e4..8c3d886 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java
@@ -73,14 +73,19 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
private String traceTopicName;
private AtomicBoolean isStarted = new AtomicBoolean(false);
private AccessChannel accessChannel = AccessChannel.LOCAL;
+ private String group;
+ private Type type;
- public AsyncTraceDispatcher(String traceTopicName, RPCHook rpcHook) {
+ public AsyncTraceDispatcher(String group, Type type,String traceTopicName, RPCHook rpcHook)
{
// queueSize is greater than or equal to the n power of 2 of value
this.queueSize = 2048;
this.batchSize = 100;
this.maxMsgSize = 128000;
this.discardCount = new AtomicLong(0L);
this.traceContextQueue = new ArrayBlockingQueue<TraceContext>(1024);
+ this.group = group;
+ this.type = type;
+
this.appenderQueue = new ArrayBlockingQueue<Runnable>(queueSize);
if (!UtilAll.isBlank(traceTopicName)) {
this.traceTopicName = traceTopicName;
@@ -150,7 +155,7 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
DefaultMQProducer traceProducerInstance = this.traceProducer;
if (traceProducerInstance == null) {
traceProducerInstance = new DefaultMQProducer(rpcHook);
- traceProducerInstance.setProducerGroup(TraceConstants.GROUP_NAME);
+ traceProducerInstance.setProducerGroup(genGroupNameForTrace());
traceProducerInstance.setSendMsgTimeout(5000);
traceProducerInstance.setVipChannelEnabled(false);
// The max size of message is 128K
@@ -159,6 +164,10 @@ public class AsyncTraceDispatcher implements TraceDispatcher {
return traceProducerInstance;
}
+ private String genGroupNameForTrace() {
+ return TraceConstants.GROUP_NAME_PREFIX + "-" + this.group + "-" + this.type ;
+ }
+
@Override
public boolean append(final Object ctx) {
boolean result = traceContextQueue.offer((TraceContext) ctx);
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java
index e61ea9d..cb4a246 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceConstants.java
@@ -20,7 +20,7 @@ import org.apache.rocketmq.common.MixAll;
public class TraceConstants {
- public static final String GROUP_NAME = "_INNER_TRACE_PRODUCER";
+ public static final String GROUP_NAME_PREFIX = "_INNER_TRACE_PRODUCER";
public static final char CONTENT_SPLITOR = (char) 1;
public static final char FIELD_SPLITOR = (char) 2;
public static final String TRACE_INSTANCE_NAME = "PID_CLIENT_INNER_TRACE_PRODUCER";
diff --git a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java
index 51cc0de..33341cf 100644
--- a/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java
+++ b/client/src/main/java/org/apache/rocketmq/client/trace/TraceDispatcher.java
@@ -24,7 +24,10 @@ import java.io.IOException;
* Interface of asynchronous transfer data
*/
public interface TraceDispatcher {
-
+ enum Type {
+ PRODUCE,
+ CONSUME
+ }
/**
* Initialize asynchronous transfer data module
*/
|