rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wenf...@apache.org
Subject [rocketmq-client-go] branch native updated: fix: pull timeout and suspend time is not consistent with java (#350)
Date Fri, 03 Jan 2020 03:42:48 GMT
This is an automated email from the ASF dual-hosted git repository.

wenfeng pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 598c979  fix: pull timeout and suspend time is not consistent with java (#350)
598c979 is described below

commit 598c979ebd6db2212776e8e38d7f623090360481
Author: xujianhai666 <52450794+xujianhai666@users.noreply.github.com>
AuthorDate: Fri Jan 3 11:42:40 2020 +0800

    fix: pull timeout and suspend time is not consistent with java (#350)
    
    - modify pull timeout
    - add suspend time
    
    Closes #349
---
 consumer/push_consumer.go | 19 ++++++++++---------
 internal/client.go        |  4 ++--
 internal/request.go       |  3 ++-
 3 files changed, 14 insertions(+), 12 deletions(-)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 74251a1..1add8dd 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -599,15 +599,16 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 		sysFlag := buildSysFlag(commitOffsetEnable, true, subExpression != "", classFilter)
 
 		pullRequest := &internal.PullMessageRequestHeader{
-			ConsumerGroup:  pc.consumerGroup,
-			Topic:          request.mq.Topic,
-			QueueId:        int32(request.mq.QueueId),
-			QueueOffset:    request.nextOffset,
-			MaxMsgNums:     pc.option.PullBatchSize,
-			SysFlag:        sysFlag,
-			CommitOffset:   commitOffsetValue,
-			SubExpression:  _SubAll,
-			ExpressionType: string(TAG), // TODO
+			ConsumerGroup:        pc.consumerGroup,
+			Topic:                request.mq.Topic,
+			QueueId:              int32(request.mq.QueueId),
+			QueueOffset:          request.nextOffset,
+			MaxMsgNums:           pc.option.PullBatchSize,
+			SysFlag:              sysFlag,
+			CommitOffset:         commitOffsetValue,
+			SubExpression:        _SubAll,
+			ExpressionType:       string(TAG),
+			SuspendTimeoutMillis: 20 * time.Second,
 		}
 		//
 		//if data.ExpType == string(TAG) {
diff --git a/internal/client.go b/internal/client.go
index c41417d..fec61fd 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -51,7 +51,7 @@ const (
 	_PersistOffset = 5 * time.Second
 
 	// Rebalance interval
-	_RebalanceInterval = 10 * time.Second
+	_RebalanceInterval = 20 * time.Second
 )
 
 var (
@@ -544,7 +544,7 @@ func (c *rmqClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingC
 // PullMessage with sync
 func (c *rmqClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader)
(*primitive.PullResult, error) {
 	cmd := remote.NewRemotingCommand(ReqPullMessage, request, nil)
-	res, err := c.remoteClient.InvokeSync(ctx, brokerAddrs, cmd, 10*time.Second)
+	res, err := c.remoteClient.InvokeSync(ctx, brokerAddrs, cmd, 30*time.Second)
 	if err != nil {
 		return nil, err
 	}
diff --git a/internal/request.go b/internal/request.go
index 3e477ea..5438790 100644
--- a/internal/request.go
+++ b/internal/request.go
@@ -210,10 +210,11 @@ func (request *PullMessageRequestHeader) Encode() map[string]string
{
 	maps["maxMsgNums"] = fmt.Sprintf("%d", request.MaxMsgNums)
 	maps["sysFlag"] = fmt.Sprintf("%d", request.SysFlag)
 	maps["commitOffset"] = fmt.Sprintf("%d", request.CommitOffset)
-	maps["suspendTimeoutMillis"] = fmt.Sprintf("%d", request.SuspendTimeoutMillis)
+	maps["suspendTimeoutMillis"] = fmt.Sprintf("%d", request.SuspendTimeoutMillis/time.Millisecond)
 	maps["subscription"] = request.SubExpression
 	maps["subVersion"] = fmt.Sprintf("%d", request.SubVersion)
 	maps["expressionType"] = request.ExpressionType
+
 	return maps
 }
 


Mime
View raw message