rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wenf...@apache.org
Subject [rocketmq-client-go] branch native updated: fix(route): update route (#339)
Date Tue, 24 Dec 2019 02:41:26 GMT
This is an automated email from the ASF dual-hosted git repository.

wenfeng pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new be743f7  fix(route): update route (#339)
be743f7 is described below

commit be743f70415addae8f113861000bc1e079a1d4bd
Author: xujianhai666 <52450794+xujianhai666@users.noreply.github.com>
AuthorDate: Tue Dec 24 10:41:20 2019 +0800

    fix(route): update route (#339)
    
    - fix producer publish update logic
    - fix consumer subscription update logic
    
    Closes #338
---
 internal/client.go       | 47 +++++++++++++++++++++++------------------------
 internal/mock_client.go  |  2 +-
 internal/mock_namesrv.go |  4 ++--
 internal/namesrv.go      |  2 +-
 internal/route.go        |  9 +++++----
 producer/producer.go     |  3 ++-
 6 files changed, 34 insertions(+), 33 deletions(-)

diff --git a/internal/client.go b/internal/client.go
index b1ad3a6..c41417d 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -148,7 +148,7 @@ type RMQClient interface {
 	PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader) (*primitive.PullResult, error)
 	PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader, f func(result *primitive.PullResult)) error
 	RebalanceImmediately()
-	UpdatePublishInfo(topic string, data *TopicRouteData)
+	UpdatePublishInfo(topic string, data *TopicRouteData, changed bool)
 }
 
 var _ RMQClient = new(rmqClient)
@@ -475,7 +475,8 @@ func (c *rmqClient) UpdateTopicRouteInfo() {
 		return true
 	})
 	for topic := range publishTopicSet {
-		c.UpdatePublishInfo(topic, c.namesrvs.UpdateTopicRouteInfo(topic))
+		data, changed := c.namesrvs.UpdateTopicRouteInfo(topic)
+		c.UpdatePublishInfo(topic, data, changed)
 	}
 
 	subscribedTopicSet := make(map[string]bool, 0)
@@ -489,7 +490,8 @@ func (c *rmqClient) UpdateTopicRouteInfo() {
 	})
 
 	for topic := range subscribedTopicSet {
-		c.updateSubscribeInfo(topic, c.namesrvs.UpdateTopicRouteInfo(topic))
+		data, changed := c.namesrvs.UpdateTopicRouteInfo(topic)
+		c.updateSubscribeInfo(topic, data, changed)
 	}
 }
 
@@ -639,36 +641,27 @@ func (c *rmqClient) RebalanceImmediately() {
 	})
 }
 
-func (c *rmqClient) UpdatePublishInfo(topic string, data *TopicRouteData) {
+func (c *rmqClient) UpdatePublishInfo(topic string, data *TopicRouteData, changed bool) {
 	if data == nil {
 		return
 	}
-	if !c.isNeedUpdatePublishInfo(topic) {
-		return
-	}
-	c.producerMap.Range(func(key, value interface{}) bool {
-		p := value.(InnerProducer)
-		publishInfo := c.namesrvs.routeData2PublishInfo(topic, data)
-		publishInfo.HaveTopicRouterInfo = true
-		p.UpdateTopicPublishInfo(topic, publishInfo)
-		return true
-	})
-}
 
-func (c *rmqClient) isNeedUpdatePublishInfo(topic string) bool {
-	var result bool
 	c.producerMap.Range(func(key, value interface{}) bool {
 		p := value.(InnerProducer)
-		if p.IsPublishTopicNeedUpdate(topic) {
-			result = true
-			return false
+		updated := changed
+		if !updated {
+			updated = p.IsPublishTopicNeedUpdate(topic)
+		}
+		if updated {
+			publishInfo := c.namesrvs.routeData2PublishInfo(topic, data)
+			publishInfo.HaveTopicRouterInfo = true
+			p.UpdateTopicPublishInfo(topic, publishInfo)
 		}
 		return true
 	})
-	return result
 }
 
-func (c *rmqClient) updateSubscribeInfo(topic string, data *TopicRouteData) {
+func (c *rmqClient) updateSubscribeInfo(topic string, data *TopicRouteData, changed bool) {
 	if data == nil {
 		return
 	}
@@ -677,8 +670,14 @@ func (c *rmqClient) updateSubscribeInfo(topic string, data *TopicRouteData) {
 	}
 	c.consumerMap.Range(func(key, value interface{}) bool {
 		consumer := value.(InnerConsumer)
-		// TODO
-		consumer.UpdateTopicSubscribeInfo(topic, routeData2SubscribeInfo(topic, data))
+		updated := changed
+		if !updated {
+			updated = consumer.IsSubscribeTopicNeedUpdate(topic)
+		}
+		if updated {
+			consumer.UpdateTopicSubscribeInfo(topic, routeData2SubscribeInfo(topic, data))
+		}
+
 		return true
 	})
 }
diff --git a/internal/mock_client.go b/internal/mock_client.go
index 730c073..244cd0c 100644
--- a/internal/mock_client.go
+++ b/internal/mock_client.go
@@ -398,7 +398,7 @@ func (mr *MockRMQClientMockRecorder) RebalanceImmediately() *gomock.Call {
 }
 
 // UpdatePublishInfo mocks base method
-func (m *MockRMQClient) UpdatePublishInfo(topic string, data *TopicRouteData) {
+func (m *MockRMQClient) UpdatePublishInfo(topic string, data *TopicRouteData, changed bool) {
 	m.ctrl.Call(m, "UpdatePublishInfo", topic, data)
 }
 
diff --git a/internal/mock_namesrv.go b/internal/mock_namesrv.go
index 365e784..0a983bd 100644
--- a/internal/mock_namesrv.go
+++ b/internal/mock_namesrv.go
@@ -87,11 +87,11 @@ func (mr *MockNamesrvsMockRecorder) UpdateNameServerAddress(nameServer, instance
 }
 
 // UpdateTopicRouteInfo mocks base method
-func (m *MockNamesrvs) UpdateTopicRouteInfo(topic string) *TopicRouteData {
+func (m *MockNamesrvs) UpdateTopicRouteInfo(topic string) (routeData *TopicRouteData, changed bool) {
 	m.ctrl.T.Helper()
 	ret := m.ctrl.Call(m, "UpdateTopicRouteInfo", topic)
 	ret0, _ := ret[0].(*TopicRouteData)
-	return ret0
+	return ret0, changed
 }
 
 // UpdateTopicRouteInfo indicates an expected call of UpdateTopicRouteInfo
diff --git a/internal/namesrv.go b/internal/namesrv.go
index 3c9cf12..43c5d5d 100644
--- a/internal/namesrv.go
+++ b/internal/namesrv.go
@@ -54,7 +54,7 @@ type Namesrvs interface {
 
 	cleanOfflineBroker()
 
-	UpdateTopicRouteInfo(topic string) *TopicRouteData
+	UpdateTopicRouteInfo(topic string) (routeData *TopicRouteData, changed bool)
 
 	FetchPublishMessageQueues(topic string) ([]*primitive.MessageQueue, error)
 
diff --git a/internal/route.go b/internal/route.go
index 5c7b114..327706f 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -112,7 +112,7 @@ func (info *TopicPublishInfo) fetchQueueIndex() int {
 	return int(qIndex) % length
 }
 
-func (s *namesrvs) UpdateTopicRouteInfo(topic string) *TopicRouteData {
+func (s *namesrvs) UpdateTopicRouteInfo(topic string) (*TopicRouteData, bool) {
 	// Todo process lock timeout
 	s.lockNamesrv.Lock()
 	defer s.lockNamesrv.Unlock()
@@ -124,7 +124,7 @@ func (s *namesrvs) UpdateTopicRouteInfo(topic string) *TopicRouteData {
 			rlog.Warning("query topic route from server error", map[string]interface{}{
 				rlog.LogKeyUnderlayError: err,
 			})
-			return nil
+			return nil, false
 		}
 	}
 
@@ -132,10 +132,11 @@ func (s *namesrvs) UpdateTopicRouteInfo(topic string) *TopicRouteData {
 		rlog.Warning("queryTopicRouteInfoFromServer return nil", map[string]interface{}{
 			rlog.LogKeyTopic: topic,
 		})
-		return nil
+		return nil, false
 	}
 
 	oldRouteData, exist := s.routeDataMap.Load(topic)
+
 	changed := true
 	if exist {
 		changed = s.topicRouteDataIsChange(oldRouteData.(*TopicRouteData), routeData)
@@ -153,7 +154,7 @@ func (s *namesrvs) UpdateTopicRouteInfo(topic string) *TopicRouteData {
 		}
 	}
 
-	return routeData.clone()
+	return routeData.clone(), changed
 }
 
 func (s *namesrvs) AddBroker(routeData *TopicRouteData) {
diff --git a/producer/producer.go b/producer/producer.go
index 013d3aa..05bcb15 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -340,7 +340,8 @@ func (p *defaultProducer) selectMessageQueue(msg *primitive.Message) *primitive.
 
 	v, exist := p.publishInfo.Load(topic)
 	if !exist {
-		p.client.UpdatePublishInfo(topic, p.options.Namesrv.UpdateTopicRouteInfo(topic))
+		data, changed := p.options.Namesrv.UpdateTopicRouteInfo(topic)
+		p.client.UpdatePublishInfo(topic, data, changed)
 		v, exist = p.publishInfo.Load(topic)
 	}
 


Mime
View raw message