rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jinrongt...@apache.org
Subject [rocketmq] branch develop updated: [ISSUE #2748] Fix deleteSubscriptionGroup not remove consumer offset
Date Wed, 24 Mar 2021 03:39:21 GMT
This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 4645942  [ISSUE #2748] Fix deleteSubscriptionGroup not remove consumer offset
4645942 is described below

commit 46459426801d582b5ef71c8a3e38267ccd6caad9
Author: panzhi <panzhi33@qq.com>
AuthorDate: Wed Mar 24 11:39:12 2021 +0800

    [ISSUE #2748] Fix deleteSubscriptionGroup not remove consumer offset
---
 .../rocketmq/broker/offset/ConsumerOffsetManager.java    | 16 ++++++++++++++++
 .../rocketmq/broker/processor/AdminBrokerProcessor.java  |  4 ++++
 .../org/apache/rocketmq/client/impl/MQClientAPIImpl.java |  3 ++-
 .../header/DeleteSubscriptionGroupRequestHeader.java     | 10 ++++++++++
 .../apache/rocketmq/tools/admin/DefaultMQAdminExt.java   |  7 +++++++
 .../rocketmq/tools/admin/DefaultMQAdminExtImpl.java      |  9 ++++++++-
 .../java/org/apache/rocketmq/tools/admin/MQAdminExt.java |  3 +++
 .../command/consumer/DeleteSubscriptionGroupCommand.java | 13 +++++++++++--
 8 files changed, 61 insertions(+), 4 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
index ebc9dd8..bd05758 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
@@ -232,4 +232,20 @@ public class ConsumerOffsetManager extends ConfigManager {
         }
     }
 
+    public void removeOffset(final String group) {
+        Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
+        while (it.hasNext()) {
+            Entry<String, ConcurrentMap<Integer, Long>> next = it.next();
+            String topicAtGroup = next.getKey();
+            if (topicAtGroup.contains(group)) {
+                String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
+                if (arrays.length == 2 && group.equals(arrays[1])) {
+                    it.remove();
+                    log.warn("clean group offset {}", topicAtGroup);
+                }
+            }
+        }
+
+    }
+
 }
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index dcdb701..0a1d214 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -714,6 +714,10 @@ public class AdminBrokerProcessor extends AsyncNettyRequestProcessor implements
 
         this.brokerController.getSubscriptionGroupManager().deleteSubscriptionGroupConfig(requestHeader.getGroupName());
 
+        if (requestHeader.isRemoveOffset()) {
+            this.brokerController.getConsumerOffsetManager().removeOffset(requestHeader.getGroupName());
+        }
+
         if (this.brokerController.getBrokerConfig().isAutoDeleteUnusedStats()) {
             this.brokerController.getBrokerStatsManager().onGroupDeleted(requestHeader.getGroupName());
         }
diff --git a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
index 7a4d556..63b2045 100644
--- a/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
+++ b/client/src/main/java/org/apache/rocketmq/client/impl/MQClientAPIImpl.java
@@ -1467,10 +1467,11 @@ public class MQClientAPIImpl {
         throw new MQClientException(response.getCode(), response.getRemark());
     }
 
-    public void deleteSubscriptionGroup(final String addr, final String groupName, final long timeoutMillis)
+    public void deleteSubscriptionGroup(final String addr, final String groupName, final boolean removeOffset, final long timeoutMillis)
         throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
         DeleteSubscriptionGroupRequestHeader requestHeader = new DeleteSubscriptionGroupRequestHeader();
         requestHeader.setGroupName(groupName);
+        requestHeader.setRemoveOffset(removeOffset);
         RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.DELETE_SUBSCRIPTIONGROUP, requestHeader);
 
         RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
diff --git a/common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteSubscriptionGroupRequestHeader.java b/common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteSubscriptionGroupRequestHeader.java
index dff9e2f..6591d77 100644
--- a/common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteSubscriptionGroupRequestHeader.java
+++ b/common/src/main/java/org/apache/rocketmq/common/protocol/header/DeleteSubscriptionGroupRequestHeader.java
@@ -25,6 +25,8 @@ public class DeleteSubscriptionGroupRequestHeader implements CommandCustomHeader
     @CFNotNull
     private String groupName;
 
+    private boolean removeOffset;
+
     @Override
     public void checkFields() throws RemotingCommandException {
     }
@@ -36,4 +38,12 @@ public class DeleteSubscriptionGroupRequestHeader implements CommandCustomHeader
     public void setGroupName(String groupName) {
         this.groupName = groupName;
     }
+
+    public boolean isRemoveOffset() {
+        return removeOffset;
+    }
+
+    public void setRemoveOffset(boolean removeOffset) {
+        this.removeOffset = removeOffset;
+    }
 }
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
index e80a813..8b1c228 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExt.java
@@ -321,6 +321,13 @@ public class DefaultMQAdminExt extends ClientConfig implements MQAdminExt {
     }
 
     @Override
+    public void deleteSubscriptionGroup(String addr,
+        String groupName, boolean removeOffset) throws RemotingException, MQBrokerException, InterruptedException,
+        MQClientException {
+        defaultMQAdminExtImpl.deleteSubscriptionGroup(addr, groupName, removeOffset);
+    }
+
+    @Override
     public void createAndUpdateKvConfig(String namespace, String key,
         String value) throws RemotingException, MQBrokerException,
         InterruptedException, MQClientException {
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
index 22d4005..5c34370 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/DefaultMQAdminExtImpl.java
@@ -424,7 +424,14 @@ public class DefaultMQAdminExtImpl implements MQAdminExt, MQAdminExtInner {
     public void deleteSubscriptionGroup(String addr,
         String groupName) throws RemotingException, MQBrokerException, InterruptedException,
         MQClientException {
-        this.mqClientInstance.getMQClientAPIImpl().deleteSubscriptionGroup(addr, groupName, timeoutMillis);
+        this.mqClientInstance.getMQClientAPIImpl().deleteSubscriptionGroup(addr, groupName, false, timeoutMillis);
+    }
+
+    @Override
+    public void deleteSubscriptionGroup(String addr,
+        String groupName, boolean removeOffset) throws RemotingException, MQBrokerException, InterruptedException,
+        MQClientException {
+        this.mqClientInstance.getMQClientAPIImpl().deleteSubscriptionGroup(addr, groupName, removeOffset, timeoutMillis);
     }
 
     @Override
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
index 17b6225..d5462cb 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/admin/MQAdminExt.java
@@ -152,6 +152,9 @@ public interface MQAdminExt extends MQAdmin {
     void deleteSubscriptionGroup(final String addr, String groupName) throws RemotingException, MQBrokerException,
         InterruptedException, MQClientException;
 
+    void deleteSubscriptionGroup(final String addr, String groupName, boolean removeOffset) throws RemotingException, MQBrokerException,
+        InterruptedException, MQClientException;
+
     void createAndUpdateKvConfig(String namespace, String key,
         String value) throws RemotingException, MQBrokerException,
         InterruptedException, MQClientException;
diff --git a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java
index 96d8195..fb0efeb 100644
--- a/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java
+++ b/tools/src/main/java/org/apache/rocketmq/tools/command/consumer/DeleteSubscriptionGroupCommand.java
@@ -54,6 +54,10 @@ public class DeleteSubscriptionGroupCommand implements SubCommand {
         opt.setRequired(true);
         options.addOption(opt);
 
+        opt = new Option("r", "removeOffset", true, "remove offset");
+        opt.setRequired(false);
+        options.addOption(opt);
+
         return options;
     }
 
@@ -65,11 +69,16 @@ public class DeleteSubscriptionGroupCommand implements SubCommand {
             // groupName
             String groupName = commandLine.getOptionValue('g').trim();
 
+            boolean removeOffset = false;
+            if (commandLine.hasOption('r')) {
+                removeOffset = Boolean.valueOf(commandLine.getOptionValue("r").trim());
+            }
+
             if (commandLine.hasOption('b')) {
                 String addr = commandLine.getOptionValue('b').trim();
                 adminExt.start();
 
-                adminExt.deleteSubscriptionGroup(addr, groupName);
+                adminExt.deleteSubscriptionGroup(addr, groupName, removeOffset);
                 System.out.printf("delete subscription group [%s] from broker [%s] success.%n", groupName,
                     addr);
 
@@ -80,7 +89,7 @@ public class DeleteSubscriptionGroupCommand implements SubCommand {
 
                 Set<String> masterSet = CommandUtil.fetchMasterAddrByClusterName(adminExt, clusterName);
                 for (String master : masterSet) {
-                    adminExt.deleteSubscriptionGroup(master, groupName);
+                    adminExt.deleteSubscriptionGroup(master, groupName, removeOffset);
                     System.out.printf(
                         "delete subscription group [%s] from broker [%s] in cluster [%s] success.%n",
                         groupName, master, clusterName);

Mime
View raw message