rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dongefore...@apache.org
Subject [40/51] [abbrv] incubator-rocketmq git commit: [ROCKETMQ-175] Consumer may miss messages because of inconsistent sub… closes apache/incubator-rocketmq#92
Date Tue, 06 Jun 2017 03:39:00 GMT
[ROCKETMQ-175] Consumer may miss messages because of inconsistent sub…  closes apache/incubator-rocketmq#92


Project: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/commit/547004c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/tree/547004c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/diff/547004c0

Branch: refs/heads/master
Commit: 547004c0968fb5280e01c391e91851218a9da2bc
Parents: 36e675d
Author: vsair <liuxuedee@gmail.com>
Authored: Fri May 26 15:13:29 2017 +0800
Committer: dongeforever <zhendongliu92@yeah.net>
Committed: Tue Jun 6 11:37:29 2017 +0800

----------------------------------------------------------------------
 .../rocketmq/client/impl/consumer/RebalancePushImpl.java | 11 +++++++++++
 1 file changed, 11 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/547004c0/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
index 1730c99..509c9a4 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/RebalancePushImpl.java
@@ -30,6 +30,7 @@ import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
 import org.apache.rocketmq.common.message.MessageQueue;
 import org.apache.rocketmq.common.protocol.heartbeat.ConsumeType;
 import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
+import org.apache.rocketmq.common.protocol.heartbeat.SubscriptionData;
 
 public class RebalancePushImpl extends RebalanceImpl {
     private final static long UNLOCK_DELAY_TIME_MILLS = Long.parseLong(System.getProperty("rocketmq.client.unlockDelayTimeMills",
"20000"));
@@ -47,6 +48,16 @@ public class RebalancePushImpl extends RebalanceImpl {
 
     @Override
     public void messageQueueChanged(String topic, Set<MessageQueue> mqAll, Set<MessageQueue>
mqDivided) {
+        /**
+         * When rebalance result changed, should update subscription's version to notify
broker.
+         * Fix: inconsistency subscription may lead to consumer miss messages.
+         */
+        SubscriptionData subscriptionData = this.subscriptionInner.get(topic);
+        long newVersion = System.currentTimeMillis();
+        log.info("{} Rebalance changed, also update version: {}, {}", topic, subscriptionData.getSubVersion(),
newVersion);
+        subscriptionData.setSubVersion(newVersion);
+        // notify broker
+        this.getmQClientFactory().sendHeartbeatToAllBrokerWithLock();
     }
 
     @Override


Mime
View raw message