rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ding...@apache.org
Subject [rocketmq-client-go] branch native updated: [ISSUE #471] The message mode, ctype and where was wrong in the heartbeat data
Date Fri, 24 Apr 2020 02:00:14 GMT
This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 0897f19  [ISSUE #471] The message mode, ctype and where was wrong in the heartbeat data
0897f19 is described below

commit 0897f19d470248ac1af0e4985e93f8c4b2ce81b5
Author: DandelionJR <luyongjie1115@163.com>
AuthorDate: Fri Apr 24 10:00:07 2020 +0800

    [ISSUE #471] The message mode, ctype and where was wrong in the heartbeat data
    
    close #471
---
 consumer/push_consumer.go | 22 ++++++++++++++++++++++
 internal/client.go        |  9 ++++++---
 2 files changed, 28 insertions(+), 3 deletions(-)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 9118ae2..d14f0a5 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -267,6 +267,28 @@ func (pc *pushConsumer) IsUnitMode() bool {
 	return pc.unitMode
 }
 
+func (pc *pushConsumer) GetcType() string {
+	return string(pc.cType)
+}
+
+func (pc *pushConsumer) GetModel() string {
+	return pc.model.String()
+}
+
+func (pc *pushConsumer) GetWhere() string {
+	switch pc.fromWhere {
+	case ConsumeFromLastOffset:
+		return "CONSUME_FROM_LAST_OFFSET"
+	case ConsumeFromFirstOffset:
+		return "CONSUME_FROM_FIRST_OFFSET"
+	case ConsumeFromTimestamp:
+		return "CONSUME_FROM_TIMESTAMP"
+	default:
+		return "UNKOWN"
+	}
+
+}
+
 func (pc *pushConsumer) GetConsumerRunningInfo() *internal.ConsumerRunningInfo {
 	info := internal.NewConsumerRunningInfo()
 
diff --git a/internal/client.go b/internal/client.go
index 253b96e..954cc57 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -84,6 +84,9 @@ type InnerConsumer interface {
 	Rebalance()
 	IsUnitMode() bool
 	GetConsumerRunningInfo() *ConsumerRunningInfo
+	GetcType() string
+	GetModel() string
+	GetWhere() string
 }
 
 func DefaultClientOptions() ClientOptions {
@@ -454,9 +457,9 @@ func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
 		consumer := value.(InnerConsumer)
 		cData := consumerData{
 			GroupName:         key.(string),
-			CType:             "CONSUME_PASSIVELY",
-			MessageModel:      "CLUSTERING",
-			Where:             "CONSUME_FROM_FIRST_OFFSET",
+			CType:             consumeType(consumer.GetcType()),
+			MessageModel:      strings.ToUpper(consumer.GetModel()),
+			Where:             consumer.GetWhere(),
 			UnitMode:          consumer.IsUnitMode(),
 			SubscriptionDatas: consumer.SubscriptionDataList(),
 		}


Mime
View raw message