This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch test-release
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 171744886f8f1530027b32b0c07c960ed5fd2258
Author: 翊名 <duheng.dh@alibaba-inc.com>
AuthorDate: Tue Jan 7 20:50:13 2020 +0800
feat(pull_consumer) refactor the consumer offset update logic
---
.../org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java | 3 ++-
.../rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java | 2 +-
2 files changed, 3 insertions(+), 2 deletions(-)
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
index fad0b4f..4ab776a 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/AssignedMessageQueue.java
@@ -177,7 +177,8 @@ public class AssignedMessageQueue {
}
}
- public Set<MessageQueue> getAssignedMessageQueues() {
+
+ public Set<MessageQueue> getAssignedMessageQueue() {
return this.assignedMessageQueueState.keySet();
}
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
index 8483da6..4b732a7 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/consumer/DefaultLitePullConsumerImpl.java
@@ -908,7 +908,7 @@ public class DefaultLitePullConsumerImpl implements MQConsumerInner {
Set<MessageQueue> allocateMq = this.rebalanceImpl.getProcessQueueTable().keySet();
mqs.addAll(allocateMq);
} else if (this.subscriptionType == SubscriptionType.ASSIGN) {
- Set<MessageQueue> assignedMessageQueue = this.assignedMessageQueue.getAssignedMessageQueues();
+ Set<MessageQueue> assignedMessageQueue = this.assignedMessageQueue.getAssignedMessageQueue();
mqs.addAll(assignedMessageQueue);
}
this.offsetStore.persistAll(mqs);
|