rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ding...@apache.org
Subject [rocketmq-client-go] branch master updated: [ISSUE #487] Implement Unsubscribe method for push consumer (#626)
Date Tue, 16 Mar 2021 11:40:18 GMT
This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 8019e59  [ISSUE #487] Implement Unsubscribe method for push consumer (#626)
8019e59 is described below

commit 8019e59bf0cf7edb70da035d2529faf8573e908b
Author: 张旭 <maixiaohai00@gmail.com>
AuthorDate: Tue Mar 16 19:39:52 2021 +0800

    [ISSUE #487] Implement Unsubscribe method for push consumer (#626)
    
    * Implement Unsubscribe method for push consumer
---
 consumer/push_consumer.go      | 13 ++++++++++++-
 consumer/push_consumer_test.go | 17 +++++++++++++++++
 2 files changed, 29 insertions(+), 1 deletion(-)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 8fb4637..393a0e4 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -227,6 +227,14 @@ func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
 		return errors.New("cannot subscribe topic since client either failed to start or has been shutdown.")
 	}
 
+	// add retry topic subscription for resubscribe
+	retryTopic := internal.GetRetryTopic(pc.consumerGroup)
+	_, exists := pc.subscriptionDataTable.Load(retryTopic)
+	if !exists {
+		sub := buildSubscriptionData(retryTopic, MessageSelector{TAG, _SubAll})
+		pc.subscriptionDataTable.Store(retryTopic, sub)
+	}
+
 	if pc.option.Namespace != "" {
 		topic = pc.option.Namespace + "%" + topic
 	}
@@ -241,7 +249,10 @@ func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
 	return nil
 }
 
-func (pc *pushConsumer) Unsubscribe(string) error {
+func (pc *pushConsumer) Unsubscribe(topic string) error {
+	pc.subscriptionDataTable.Delete(topic)
+	retryTopic := internal.GetRetryTopic(pc.consumerGroup)
+	pc.subscriptionDataTable.Delete(retryTopic)
 	return nil
 }
 
diff --git a/consumer/push_consumer_test.go b/consumer/push_consumer_test.go
index 4789e9b..e67b2db 100644
--- a/consumer/push_consumer_test.go
+++ b/consumer/push_consumer_test.go
@@ -52,6 +52,23 @@ func TestStart(t *testing.T) {
 			return ConsumeSuccess, nil
 		})
 
+		_, exists := c.subscriptionDataTable.Load("TopicTest")
+		So(exists, ShouldBeTrue)
+
+		err = c.Unsubscribe("TopicTest")
+		So(err, ShouldBeNil)
+		_, exists = c.subscriptionDataTable.Load("TopicTest")
+		So(exists, ShouldBeFalse)
+
+		err = c.Subscribe("TopicTest", MessageSelector{}, func(ctx context.Context,
+			msgs ...*primitive.MessageExt) (ConsumeResult, error) {
+			fmt.Printf("subscribe callback: %v \n", msgs)
+			return ConsumeSuccess, nil
+		})
+
+		_, exists = c.subscriptionDataTable.Load("TopicTest")
+		So(exists, ShouldBeTrue)
+
 		client.EXPECT().ClientID().Return("127.0.0.1@DEFAULT")
 		client.EXPECT().Start().Return()
 		client.EXPECT().RegisterConsumer(gomock.Any(), gomock.Any()).Return(nil)


Mime
View raw message