rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ding...@apache.org
Subject [rocketmq-client-go] branch native updated: fix(consumer): use sync offset fix instead if async (#414)
Date Wed, 19 Feb 2020 13:30:14 GMT
This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 089cc03  fix(consumer): use sync offset fix instead if async (#414)
089cc03 is described below

commit 089cc03d644e54468b6688a74d26e5d23594fabe
Author: xujianhai666 <52450794+xujianhai666@users.noreply.github.com>
AuthorDate: Wed Feb 19 21:30:02 2020 +0800

    fix(consumer): use sync offset fix instead if async (#414)
    
    fix(consumer): use sync offset fix instead if async (#414)
---
 consumer/push_consumer.go | 12 +++++-------
 1 file changed, 5 insertions(+), 7 deletions(-)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 3bef44c..cb14688 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -679,13 +679,11 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 			})
 			request.nextOffset = result.NextBeginOffset
 			pq.WithDropped(true)
-			go primitive.WithRecover(func() {
-				time.Sleep(10 * time.Second)
-				pc.storage.update(request.mq, request.nextOffset, false)
-				pc.storage.persist([]*primitive.MessageQueue{request.mq})
-				pc.storage.remove(request.mq)
-				rlog.Warning(fmt.Sprintf("fix the pull request offset: %s", request.String()), nil)
-			})
+			time.Sleep(10 * time.Second)
+			pc.storage.update(request.mq, request.nextOffset, false)
+			pc.storage.persist([]*primitive.MessageQueue{request.mq})
+			pc.processQueueTable.Delete(request.mq)
+			rlog.Warning(fmt.Sprintf("fix the pull request offset: %s", request.String()), nil)
 		default:
 			rlog.Warning(fmt.Sprintf("unknown pull status: %v", result.Status), nil)
 			sleepTime = _PullDelayTimeWhenError


Mime
View raw message