rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From duhengfore...@apache.org
Subject [rocketmq] 01/01: fix(transaction) fix the send back message sent into transaction half topic
Date Thu, 12 Dec 2019 18:23:18 GMT
This is an automated email from the ASF dual-hosted git repository.

duhengforever pushed a commit to branch fix_transaction_sendback
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 118bf22a4c1fe8d17495cc71f64c3f1ed74412ce
Author: duhenglucky <duhengforever@apache.org>
AuthorDate: Fri Dec 13 02:22:26 2019 +0800

    fix(transaction) fix the send back message sent into transaction half topic
---
 .../org/apache/rocketmq/broker/processor/EndTransactionProcessor.java  | 1 +
 .../org/apache/rocketmq/broker/processor/SendMessageProcessor.java     | 3 ++-
 .../rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java    | 1 +
 .../rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java       | 1 +
 4 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
index 1d5943d..9844cae 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/EndTransactionProcessor.java
@@ -132,6 +132,7 @@ public class EndTransactionProcessor implements NettyRequestProcessor {
                     msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
                     msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
                     msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
+                    MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED);
                     RemotingCommand sendResult = sendFinalMessage(msgInner);
                     if (sendResult.getCode() == ResponseCode.SUCCESS) {
                         this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
index 2589a75..f753ebb 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/SendMessageProcessor.java
@@ -353,7 +353,8 @@ public class SendMessageProcessor extends AbstractSendMessageProcessor implement
         PutMessageResult putMessageResult = null;
         Map<String, String> oriProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
         String traFlag = oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
-        if (traFlag != null && Boolean.parseBoolean(traFlag)) {
+        if (traFlag != null && Boolean.parseBoolean(traFlag)
+            && !(msgInner.getReconsumeTimes() > 0 && msgInner.getDelayTimeLevel() > 0)) { //For client under version 4.6.1
             if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
                 response.setCode(ResponseCode.NO_PERMISSION);
                 response.setRemark(
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
index edc2647..e440bd9 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/ConsumeMessageOrderlyService.java
@@ -374,6 +374,7 @@ public class ConsumeMessageOrderlyService implements ConsumeMessageService {
             MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
             MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes()));
             MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
+            MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED);
             newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
 
             this.defaultMQPushConsumer.getDefaultMQPushConsumerImpl().getmQClientFactory().getDefaultMQProducer().send(newMsg);
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
index 807e9c6..25a81a0 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultMQPushConsumerImpl.java
@@ -528,6 +528,7 @@ public class DefaultMQPushConsumerImpl implements MQConsumerInner {
             MessageAccessor.putProperty(newMsg, MessageConst.PROPERTY_RETRY_TOPIC, msg.getTopic());
             MessageAccessor.setReconsumeTime(newMsg, String.valueOf(msg.getReconsumeTimes() + 1));
             MessageAccessor.setMaxReconsumeTimes(newMsg, String.valueOf(getMaxReconsumeTimes()));
+            MessageAccessor.clearProperty(newMsg, MessageConst.PROPERTY_TRANSACTION_PREPARED);
             newMsg.setDelayTimeLevel(3 + msg.getReconsumeTimes());
 
             this.mQClientFactory.getDefaultMQProducer().send(newMsg);


Mime
View raw message