rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wenf...@apache.org
Subject [rocketmq-client-go] branch native updated: Prevent pc.prch blocking (#327)
Date Wed, 11 Dec 2019 07:45:26 GMT
This is an automated email from the ASF dual-hosted git repository.

wenfeng pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new f9521f9  Prevent pc.prch blocking (#327)
f9521f9 is described below

commit f9521f97be4c50033ef15c5c435e375d2a63dd39
Author: 兰园望月 <1376030681@qq.com>
AuthorDate: Wed Dec 11 15:45:18 2019 +0800

    Prevent pc.prch blocking (#327)
---
 consumer/push_consumer.go | 32 ++++++++++++++++----------------
 1 file changed, 16 insertions(+), 16 deletions(-)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index c915a32..6d70393 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -148,20 +148,16 @@ func (pc *pushConsumer) Start() error {
 			return
 		}
 
-		pc.Rebalance()
-		time.Sleep(1 * time.Second)
-
 		go func() {
-			// initial lock.
-			time.Sleep(1000 * time.Millisecond)
-			pc.lockAll()
-
+			// todo start clean msg expired
 			for {
 				select {
-				case <-pc.lockTicker.C:
-					pc.lockAll()
+				case pr := <-pc.prCh:
+					go func() {
+						pc.pullMessage(&pr)
+					}()
 				case <-pc.done:
-					rlog.Info("push consumer close tick.", map[string]interface{}{
+					rlog.Info("push consumer close pullConsumer listener.", map[string]interface{}{
 						rlog.LogKeyConsumerGroup: pc.consumerGroup,
 					})
 					return
@@ -169,16 +165,20 @@ func (pc *pushConsumer) Start() error {
 			}
 		}()
 
+		pc.Rebalance()
+		time.Sleep(1 * time.Second)
+
 		go func() {
-			// todo start clean msg expired
+			// initial lock.
+			time.Sleep(1000 * time.Millisecond)
+			pc.lockAll()
+
 			for {
 				select {
-				case pr := <-pc.prCh:
-					go func() {
-						pc.pullMessage(&pr)
-					}()
+				case <-pc.lockTicker.C:
+					pc.lockAll()
 				case <-pc.done:
-					rlog.Info("push consumer close pullConsumer listener.", map[string]interface{}{
+					rlog.Info("push consumer close tick.", map[string]interface{}{
 						rlog.LogKeyConsumerGroup: pc.consumerGroup,
 					})
 					return


Mime
View raw message