rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wenf...@apache.org
Subject [rocketmq-client-go] branch native updated: [ISSUE #371]feat: clean extra code for internal packagec (#372)
Date Tue, 07 Jan 2020 03:56:24 GMT
This is an automated email from the ASF dual-hosted git repository.

wenfeng pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 33798bc  [ISSUE #371]feat: clean extra code for internal packagec (#372)
33798bc is described below

commit 33798bc30375afecaf1605cfdd8efee9527fb5f4
Author: xujianhai666 <52450794+xujianhai666@users.noreply.github.com>
AuthorDate: Tue Jan 7 11:56:18 2020 +0800

    [ISSUE #371]feat: clean extra code for internal packagec (#372)
    
    * feat: clean extra code for internal packagec
    
    Closes #371
    
    * fix
---
 consumer/push_consumer.go |  1 +
 internal/client.go        | 17 +++--------------
 internal/mock_client.go   | 23 +++++++++++------------
 internal/route.go         | 11 -----------
 internal/validators.go    |  4 ----
 producer/producer.go      |  1 +
 producer/producer_test.go |  1 +
 7 files changed, 17 insertions(+), 41 deletions(-)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 1add8dd..0c7f224 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -212,6 +212,7 @@ func (pc *pushConsumer) Shutdown() error {
 		pc.lockTicker.Stop()
 		close(pc.done)
 
+		pc.client.UnregisterConsumer(pc.consumerGroup)
 		err = pc.defaultConsumer.shutdown()
 	})
 
diff --git a/internal/client.go b/internal/client.go
index acc25ea..ca8cc88 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -131,6 +131,7 @@ type RMQClient interface {
 	ClientID() string
 
 	RegisterProducer(group string, producer InnerProducer)
+	UnregisterProducer(group string)
 	InvokeSync(ctx context.Context, addr string, request *remote.RemotingCommand,
 		timeoutMillis time.Duration) (*remote.RemotingCommand, error)
 	InvokeAsync(ctx context.Context, addr string, request *remote.RemotingCommand,
@@ -146,7 +147,6 @@ type RMQClient interface {
 	RegisterConsumer(group string, consumer InnerConsumer) error
 	UnregisterConsumer(group string)
 	PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader) (*primitive.PullResult, error)
-	PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader, f func(result *primitive.PullResult)) error
 	RebalanceImmediately()
 	UpdatePublishInfo(topic string, data *TopicRouteData, changed bool)
 }
@@ -600,11 +600,6 @@ func (c *rmqClient) decodeCommandCustomHeader(pr *primitive.PullResult, cmd *rem
 	}
 }
 
-// PullMessageAsync pull message async
-func (c *rmqClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader, f func(result *primitive.PullResult)) error {
-	return nil
-}
-
 func (c *rmqClient) RegisterConsumer(group string, consumer InnerConsumer) error {
 	_, exist := c.consumerMap.Load(group)
 	if exist {
@@ -618,6 +613,7 @@ func (c *rmqClient) RegisterConsumer(group string, consumer InnerConsumer) error
 }
 
 func (c *rmqClient) UnregisterConsumer(group string) {
+	c.consumerMap.Delete(group)
 }
 
 func (c *rmqClient) RegisterProducer(group string, producer InnerProducer) {
@@ -625,14 +621,7 @@ func (c *rmqClient) RegisterProducer(group string, producer InnerProducer) {
 }
 
 func (c *rmqClient) UnregisterProducer(group string) {
-}
-
-func (c *rmqClient) SelectProducer(group string) InnerProducer {
-	return nil
-}
-
-func (c *rmqClient) SelectConsumer(group string) InnerConsumer {
-	return nil
+	c.producerMap.Delete(group)
 }
 
 func (c *rmqClient) RebalanceImmediately() {
diff --git a/internal/mock_client.go b/internal/mock_client.go
index d01a51f..d8ed7b2 100644
--- a/internal/mock_client.go
+++ b/internal/mock_client.go
@@ -14,6 +14,7 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.
 */
+
 // Code generated by MockGen. DO NOT EDIT.
 // Source: client.go
 
@@ -268,6 +269,16 @@ func (mr *MockRMQClientMockRecorder) RegisterProducer(group, producer interface{
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterProducer", reflect.TypeOf((*MockRMQClient)(nil).RegisterProducer), group, producer)
 }
 
+// UnregisterProducer mocks base method
+func (m *MockRMQClient) UnregisterProducer(group string) {
+	m.ctrl.Call(m, "UnregisterProducer", group)
+}
+
+// UnregisterProducer indicates an expected call of UnregisterProducer
+func (mr *MockRMQClientMockRecorder) UnregisterProducer(group interface{}) *gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UnregisterProducer", reflect.TypeOf((*MockRMQClient)(nil).UnregisterProducer), group)
+}
+
 // InvokeSync mocks base method
 func (m *MockRMQClient) InvokeSync(ctx context.Context, addr string, request *remote.RemotingCommand, timeoutMillis time.Duration) (*remote.RemotingCommand, error) {
 	ret := m.ctrl.Call(m, "InvokeSync", ctx, addr, request, timeoutMillis)
@@ -387,18 +398,6 @@ func (mr *MockRMQClientMockRecorder) PullMessage(ctx, brokerAddrs, request inter
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PullMessage", reflect.TypeOf((*MockRMQClient)(nil).PullMessage), ctx, brokerAddrs, request)
 }
 
-// PullMessageAsync mocks base method
-func (m *MockRMQClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequestHeader, f func(*primitive.PullResult)) error {
-	ret := m.ctrl.Call(m, "PullMessageAsync", ctx, brokerAddrs, request, f)
-	ret0, _ := ret[0].(error)
-	return ret0
-}
-
-// PullMessageAsync indicates an expected call of PullMessageAsync
-func (mr *MockRMQClientMockRecorder) PullMessageAsync(ctx, brokerAddrs, request, f interface{}) *gomock.Call {
-	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PullMessageAsync", reflect.TypeOf((*MockRMQClient)(nil).PullMessageAsync), ctx, brokerAddrs, request, f)
-}
-
 // RebalanceImmediately mocks base method
 func (m *MockRMQClient) RebalanceImmediately() {
 	m.ctrl.Call(m, "RebalanceImmediately")
diff --git a/internal/route.go b/internal/route.go
index f4d4116..c5e771e 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -113,7 +113,6 @@ func (info *TopicPublishInfo) fetchQueueIndex() int {
 }
 
 func (s *namesrvs) UpdateTopicRouteInfo(topic string) (*TopicRouteData, bool) {
-	// Todo process lock timeout
 	s.lockNamesrv.Lock()
 	defer s.lockNamesrv.Unlock()
 
@@ -258,16 +257,6 @@ func (s *namesrvs) FetchSubscribeMessageQueues(topic string) ([]*primitive.Messa
 	return mqs, nil
 }
 
-func (s *namesrvs) FindMQByTopic(topic string) *primitive.MessageQueue {
-	mqs, err := s.FetchPublishMessageQueues(topic)
-	if err != nil {
-		return nil
-	}
-	r := rand.New(rand.NewSource(time.Now().UnixNano()))
-	i := utils.AbsInt(r.Int())
-	return mqs[i%len(mqs)]
-}
-
 func (s *namesrvs) FetchPublishMessageQueues(topic string) ([]*primitive.MessageQueue, error) {
 	var (
 		err       error
diff --git a/internal/validators.go b/internal/validators.go
index 7753942..e693fde 100644
--- a/internal/validators.go
+++ b/internal/validators.go
@@ -37,10 +37,6 @@ func ValidateGroup(group string) {
 		rlog.Fatal("consumerGroup is empty", nil)
 	}
 
-	//if !_Pattern.Match([]byte(group)) {
-	//	rlog.Fatalf("the specified group[%s] contains illegal characters, allowing only %s", group, _ValidPattern)
-	//}
-
 	if len(group) > _CharacterMaxLength {
 		rlog.Fatal("the specified group is longer than group max length 255.", nil)
 	}
diff --git a/producer/producer.go b/producer/producer.go
index 8e0661f..a47d76f 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -87,6 +87,7 @@ func (p *defaultProducer) Start() error {
 
 func (p *defaultProducer) Shutdown() error {
 	atomic.StoreInt32(&p.state, int32(internal.StateShutdown))
+	p.client.UnregisterProducer(p.group)
 	p.client.Shutdown()
 	return nil
 }
diff --git a/producer/producer_test.go b/producer/producer_test.go
index 387e9b0..e1273bb 100644
--- a/producer/producer_test.go
+++ b/producer/producer_test.go
@@ -51,6 +51,7 @@ func TestShutdown(t *testing.T) {
 	assert.Nil(t, err)
 
 	client.EXPECT().Shutdown().Return()
+	client.EXPECT().UnregisterProducer(gomock.Any()).Return()
 	err = p.Shutdown()
 	assert.Nil(t, err)
 


Mime
View raw message