From commits-return-3690-apmail-rocketmq-commits-archive=rocketmq.apache.org@rocketmq.apache.org Wed Jul 10 03:17:16 2019 Return-Path: X-Original-To: apmail-rocketmq-commits-archive@minotaur.apache.org Delivered-To: apmail-rocketmq-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [207.244.88.153]) by minotaur.apache.org (Postfix) with SMTP id 83F33192E6 for ; Wed, 10 Jul 2019 03:17:15 +0000 (UTC) Received: (qmail 72908 invoked by uid 500); 10 Jul 2019 03:17:15 -0000 Delivered-To: apmail-rocketmq-commits-archive@rocketmq.apache.org Received: (qmail 72858 invoked by uid 500); 10 Jul 2019 03:17:14 -0000 Mailing-List: contact commits-help@rocketmq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@rocketmq.apache.org Delivered-To: mailing list commits@rocketmq.apache.org Received: (qmail 72849 invoked by uid 99); 10 Jul 2019 03:17:14 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 10 Jul 2019 03:17:14 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 4691187AD7; Wed, 10 Jul 2019 03:17:14 +0000 (UTC) Date: Wed, 10 Jul 2019 03:17:14 +0000 To: "commits@rocketmq.apache.org" Subject: [rocketmq-client-go] branch native updated: [ISSUE #108] Add producer unit test (#109) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <156272863421.25764.18377060755605447362@gitbox.apache.org> From: huzongtang@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: rocketmq-client-go X-Git-Refname: refs/heads/native X-Git-Reftype: branch X-Git-Oldrev: e696fd8a71de1b39e206e734b2b366f9db325886 X-Git-Newrev: 95f5b0dd1ce2eaba64e1e1185c5407ef49bc5264 X-Git-Rev: 95f5b0dd1ce2eaba64e1e1185c5407ef49bc5264 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. huzongtang pushed a commit to branch native in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git The following commit(s) were added to refs/heads/native by this push: new 95f5b0d [ISSUE #108] Add producer unit test (#109) 95f5b0d is described below commit 95f5b0dd1ce2eaba64e1e1185c5407ef49bc5264 Author: xujianhai666 <52450794+xujianhai666@users.noreply.github.com> AuthorDate: Wed Jul 10 11:17:09 2019 +0800 [ISSUE #108] Add producer unit test (#109) * add producer unit test. resolve #108 * clean line --- consumer/consumer.go | 2 +- consumer/offset_store.go | 4 +- consumer/pull_consumer.go | 2 +- examples/producer/interceptor/main.go | 1 - go.mod | 2 +- go.sum | 5 +- internal/client.go | 106 ++++++--- internal/client_test.go | 2 +- internal/mock_client.go | 411 ++++++++++++++++++++++++++++++++++ internal/route.go | 7 + producer/producer.go | 10 +- producer/producer_test.go | 234 +++++++++++++++++++ producer/selector_test.go | 5 +- 13 files changed, 743 insertions(+), 48 deletions(-) diff --git a/consumer/consumer.go b/consumer/consumer.go index a02602a..5bd9674 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -225,7 +225,7 @@ type defaultConsumer struct { fromWhere ConsumeFromWhere cType ConsumeType - client *internal.RMQClient + client internal.RMQClient mqChanged func(topic string, mqAll, mqDivided []*primitive.MessageQueue) state internal.ServiceState pause bool diff --git a/consumer/offset_store.go b/consumer/offset_store.go index 5c55e27..45d80f2 100644 --- a/consumer/offset_store.go +++ b/consumer/offset_store.go @@ -180,11 +180,11 @@ func (local *localFileOffsetStore) remove(mq *primitive.MessageQueue) { type remoteBrokerOffsetStore struct { group string OffsetTable map[string]map[int]*queueOffset `json:"OffsetTable"` - client *internal.RMQClient + client internal.RMQClient mutex sync.RWMutex } -func NewRemoteOffsetStore(group string, client *internal.RMQClient) OffsetStore { +func NewRemoteOffsetStore(group string, client internal.RMQClient) OffsetStore { return &remoteBrokerOffsetStore{ group: group, client: client, diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go index 32cbaed..c8bcdc5 100644 --- a/consumer/pull_consumer.go +++ b/consumer/pull_consumer.go @@ -41,7 +41,7 @@ var ( type defaultPullConsumer struct { state internal.ServiceState option consumerOptions - client *internal.RMQClient + client internal.RMQClient GroupName string Model MessageModel UnitMode bool diff --git a/examples/producer/interceptor/main.go b/examples/producer/interceptor/main.go index 4822de9..643d2a8 100644 --- a/examples/producer/interceptor/main.go +++ b/examples/producer/interceptor/main.go @@ -42,7 +42,6 @@ func main() { } for i := 0; i < 10; i++ { res, err := p.SendSync(context.Background(), &primitive.Message{ - //Topic: "test", Topic: "TopicTest", Body: []byte("Hello RocketMQ Go Client!"), Properties: map[string]string{"order": strconv.Itoa(i)}, diff --git a/go.mod b/go.mod index 684a7d2..322c847 100644 --- a/go.mod +++ b/go.mod @@ -4,11 +4,11 @@ go 1.12 require ( github.com/emirpasic/gods v1.12.0 + github.com/golang/mock v1.3.1 github.com/pkg/errors v0.8.1 github.com/sirupsen/logrus v1.4.1 github.com/stretchr/testify v1.3.0 github.com/tidwall/gjson v1.2.1 github.com/tidwall/match v1.0.1 // indirect github.com/tidwall/pretty v0.0.0-20190325153808-1166b9ac2b65 // indirect - golang.org/x/tools v0.0.0-20190606050223-4d9ae51c2468 // indirect ) diff --git a/go.sum b/go.sum index 3b0271b..06d0d34 100644 --- a/go.sum +++ b/go.sum @@ -3,6 +3,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg= github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o= +github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s= +github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= @@ -30,5 +32,4 @@ golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/tools v0.0.0-20190606050223-4d9ae51c2468 h1:fTfk6GjmihJbK0mSUFgPPgYpsdmApQ86Mcd4GuKax9U= -golang.org/x/tools v0.0.0-20190606050223-4d9ae51c2468/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= +golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= diff --git a/internal/client.go b/internal/client.go index a6e1bc5..1db43c7 100644 --- a/internal/client.go +++ b/internal/client.go @@ -118,7 +118,41 @@ func (opt *ClientOptions) String() string { opt.InstanceName, opt.UnitMode, opt.UnitName, opt.VIPChannelEnabled, opt.ACLEnabled) } -type RMQClient struct { +//go:generate mockgen -source client.go -destination mock_client.go --package internal RMQClient +type RMQClient interface { + Start() + Shutdown() + + ClientID() string + + RegisterProducer(group string, producer InnerProducer) + InvokeSync(addr string, request *remote.RemotingCommand, + timeoutMillis time.Duration) (*remote.RemotingCommand, error) + InvokeAsync(addr string, request *remote.RemotingCommand, + timeoutMillis time.Duration, f func(*remote.RemotingCommand, error)) error + InvokeOneWay(addr string, request *remote.RemotingCommand, + timeoutMillis time.Duration) error + CheckClientInBroker() + SendHeartbeatToAllBrokerWithLock() + UpdateTopicRouteInfo() + SendMessageAsync(ctx context.Context, brokerAddrs, brokerName string, request *SendMessageRequest, + msgs []*primitive.Message, f func(result *primitive.SendResult)) error + SendMessageOneWay(ctx context.Context, brokerAddrs string, request *SendMessageRequest, + msgs []*primitive.Message) (*primitive.SendResult, error) + + ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult, msgs ...*primitive.Message) + + RegisterConsumer(group string, consumer InnerConsumer) error + UnregisterConsumer(group string) + PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest) (*primitive.PullResult, error) + PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequest, f func(result *primitive.PullResult)) error + RebalanceImmediately() + UpdatePublishInfo(topic string, data *TopicRouteData) +} + +var _ RMQClient = new(rmqClient) + +type rmqClient struct { option ClientOptions // group -> InnerProducer producerMap sync.Map @@ -134,8 +168,8 @@ type RMQClient struct { var clientMap sync.Map -func GetOrNewRocketMQClient(option ClientOptions) *RMQClient { - client := &RMQClient{ +func GetOrNewRocketMQClient(option ClientOptions) *rmqClient { + client := &rmqClient{ option: option, remoteClient: remote.NewRemotingClient(), } @@ -147,10 +181,10 @@ func GetOrNewRocketMQClient(option ClientOptions) *RMQClient { return nil }) } - return actual.(*RMQClient) + return actual.(*rmqClient) } -func (c *RMQClient) Start() { +func (c *rmqClient) Start() { //ctx, cancel := context.WithCancel(context.Background()) //c.cancel = cancel c.close = false @@ -162,7 +196,7 @@ func (c *RMQClient) Start() { go func() { // delay time.Sleep(50 * time.Millisecond) - for !c.close{ + for !c.close { c.UpdateTopicRouteInfo() time.Sleep(_PullNameServerInterval) } @@ -170,7 +204,7 @@ func (c *RMQClient) Start() { // TODO cleanOfflineBroker & sendHeartbeatToAllBrokerWithLock go func() { - for !c.close{ + for !c.close { cleanOfflineBroker() c.SendHeartbeatToAllBrokerWithLock() time.Sleep(_HeartbeatBrokerInterval) @@ -180,7 +214,7 @@ func (c *RMQClient) Start() { // schedule persist offset go func() { //time.Sleep(10 * time.Second) - for !c.close{ + for !c.close { c.consumerMap.Range(func(key, value interface{}) bool { consumer := value.(InnerConsumer) consumer.PersistConsumerOffset() @@ -191,7 +225,7 @@ func (c *RMQClient) Start() { }() go func() { - for !c.close{ + for !c.close { c.RebalanceImmediately() time.Sleep(_RebalanceInterval) } @@ -199,12 +233,12 @@ func (c *RMQClient) Start() { }) } -func (c *RMQClient) Shutdown() { +func (c *rmqClient) Shutdown() { c.remoteClient.ShutDown() c.close = true } -func (c *RMQClient) ClientID() string { +func (c *rmqClient) ClientID() string { id := c.option.ClientIP + "@" + c.option.InstanceName if c.option.UnitName != "" { id += "@" + c.option.UnitName @@ -212,7 +246,7 @@ func (c *RMQClient) ClientID() string { return id } -func (c *RMQClient) InvokeSync(addr string, request *remote.RemotingCommand, +func (c *rmqClient) InvokeSync(addr string, request *remote.RemotingCommand, timeoutMillis time.Duration) (*remote.RemotingCommand, error) { if c.close { return nil, ErrServiceState @@ -220,7 +254,7 @@ func (c *RMQClient) InvokeSync(addr string, request *remote.RemotingCommand, return c.remoteClient.InvokeSync(addr, request, timeoutMillis) } -func (c *RMQClient) InvokeAsync(addr string, request *remote.RemotingCommand, +func (c *rmqClient) InvokeAsync(addr string, request *remote.RemotingCommand, timeoutMillis time.Duration, f func(*remote.RemotingCommand, error)) error { if c.close { return ErrServiceState @@ -231,7 +265,7 @@ func (c *RMQClient) InvokeAsync(addr string, request *remote.RemotingCommand, } -func (c *RMQClient) InvokeOneWay(addr string, request *remote.RemotingCommand, +func (c *rmqClient) InvokeOneWay(addr string, request *remote.RemotingCommand, timeoutMillis time.Duration) error { if c.close { return ErrServiceState @@ -239,11 +273,11 @@ func (c *RMQClient) InvokeOneWay(addr string, request *remote.RemotingCommand, return c.remoteClient.InvokeOneWay(addr, request, timeoutMillis) } -func (c *RMQClient) CheckClientInBroker() { +func (c *rmqClient) CheckClientInBroker() { } // TODO -func (c *RMQClient) SendHeartbeatToAllBrokerWithLock() { +func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() { c.hbMutex.Lock() defer c.hbMutex.Unlock() hbData := &heartbeatData{ @@ -301,7 +335,7 @@ func (c *RMQClient) SendHeartbeatToAllBrokerWithLock() { }) } -func (c *RMQClient) UpdateTopicRouteInfo() { +func (c *rmqClient) UpdateTopicRouteInfo() { publishTopicSet := make(map[string]bool, 0) c.producerMap.Range(func(key, value interface{}) bool { producer := value.(InnerProducer) @@ -328,17 +362,17 @@ func (c *RMQClient) UpdateTopicRouteInfo() { }) for topic := range subscribedTopicSet { - c.UpdateSubscribeInfo(topic, UpdateTopicRouteInfo(topic)) + c.updateSubscribeInfo(topic, UpdateTopicRouteInfo(topic)) } } // SendMessageAsync send message with batch by async -func (c *RMQClient) SendMessageAsync(ctx context.Context, brokerAddrs, brokerName string, request *SendMessageRequest, +func (c *rmqClient) SendMessageAsync(ctx context.Context, brokerAddrs, brokerName string, request *SendMessageRequest, msgs []*primitive.Message, f func(result *primitive.SendResult)) error { return nil } -func (c *RMQClient) SendMessageOneWay(ctx context.Context, brokerAddrs string, request *SendMessageRequest, +func (c *rmqClient) SendMessageOneWay(ctx context.Context, brokerAddrs string, request *SendMessageRequest, msgs []*primitive.Message) (*primitive.SendResult, error) { cmd := remote.NewRemotingCommand(ReqSendBatchMessage, request, encodeMessages(msgs)) err := c.remoteClient.InvokeOneWay(brokerAddrs, cmd, 3*time.Second) @@ -348,7 +382,7 @@ func (c *RMQClient) SendMessageOneWay(ctx context.Context, brokerAddrs string, r return nil, err } -func (c *RMQClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult, msgs ...*primitive.Message) { +func (c *rmqClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult, msgs ...*primitive.Message) { var status primitive.SendStatus switch cmd.Code { case ResFlushDiskTimeout: @@ -394,7 +428,7 @@ func (c *RMQClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingC } // PullMessage with sync -func (c *RMQClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest) (*primitive.PullResult, error) { +func (c *rmqClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest) (*primitive.PullResult, error) { cmd := remote.NewRemotingCommand(ReqPullMessage, request, nil) res, err := c.remoteClient.InvokeSync(brokerAddrs, cmd, 3*time.Second) if err != nil { @@ -404,7 +438,7 @@ func (c *RMQClient) PullMessage(ctx context.Context, brokerAddrs string, request return c.processPullResponse(res) } -func (c *RMQClient) processPullResponse(response *remote.RemotingCommand) (*primitive.PullResult, error) { +func (c *rmqClient) processPullResponse(response *remote.RemotingCommand) (*primitive.PullResult, error) { pullResult := &primitive.PullResult{} switch response.Code { @@ -426,7 +460,7 @@ func (c *RMQClient) processPullResponse(response *remote.RemotingCommand) (*prim return pullResult, nil } -func (c *RMQClient) decodeCommandCustomHeader(pr *primitive.PullResult, cmd *remote.RemotingCommand) { +func (c *rmqClient) decodeCommandCustomHeader(pr *primitive.PullResult, cmd *remote.RemotingCommand) { v, exist := cmd.ExtFields["maxOffset"] if exist { pr.MaxOffset, _ = strconv.ParseInt(v, 10, 64) @@ -449,34 +483,34 @@ func (c *RMQClient) decodeCommandCustomHeader(pr *primitive.PullResult, cmd *rem } // PullMessageAsync pull message async -func (c *RMQClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequest, f func(result *primitive.PullResult)) error { +func (c *rmqClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequest, f func(result *primitive.PullResult)) error { return nil } -func (c *RMQClient) RegisterConsumer(group string, consumer InnerConsumer) error { +func (c *rmqClient) RegisterConsumer(group string, consumer InnerConsumer) error { c.consumerMap.Store(group, consumer) return nil } -func (c *RMQClient) UnregisterConsumer(group string) { +func (c *rmqClient) UnregisterConsumer(group string) { } -func (c *RMQClient) RegisterProducer(group string, producer InnerProducer) { +func (c *rmqClient) RegisterProducer(group string, producer InnerProducer) { c.producerMap.Store(group, producer) } -func (c *RMQClient) UnregisterProducer(group string) { +func (c *rmqClient) UnregisterProducer(group string) { } -func (c *RMQClient) SelectProducer(group string) InnerProducer { +func (c *rmqClient) SelectProducer(group string) InnerProducer { return nil } -func (c *RMQClient) SelectConsumer(group string) InnerConsumer { +func (c *rmqClient) SelectConsumer(group string) InnerConsumer { return nil } -func (c *RMQClient) RebalanceImmediately() { +func (c *rmqClient) RebalanceImmediately() { c.consumerMap.Range(func(key, value interface{}) bool { consumer := value.(InnerConsumer) consumer.Rebalance() @@ -484,7 +518,7 @@ func (c *RMQClient) RebalanceImmediately() { }) } -func (c *RMQClient) UpdatePublishInfo(topic string, data *TopicRouteData) { +func (c *rmqClient) UpdatePublishInfo(topic string, data *TopicRouteData) { if data == nil { return } @@ -500,7 +534,7 @@ func (c *RMQClient) UpdatePublishInfo(topic string, data *TopicRouteData) { }) } -func (c *RMQClient) isNeedUpdatePublishInfo(topic string) bool { +func (c *rmqClient) isNeedUpdatePublishInfo(topic string) bool { var result bool c.producerMap.Range(func(key, value interface{}) bool { p := value.(InnerProducer) @@ -513,7 +547,7 @@ func (c *RMQClient) isNeedUpdatePublishInfo(topic string) bool { return result } -func (c *RMQClient) UpdateSubscribeInfo(topic string, data *TopicRouteData) { +func (c *rmqClient) updateSubscribeInfo(topic string, data *TopicRouteData) { if data == nil { return } @@ -528,7 +562,7 @@ func (c *RMQClient) UpdateSubscribeInfo(topic string, data *TopicRouteData) { }) } -func (c *RMQClient) isNeedUpdateSubscribeInfo(topic string) bool { +func (c *rmqClient) isNeedUpdateSubscribeInfo(topic string) bool { var result bool c.consumerMap.Range(func(key, value interface{}) bool { consumer := value.(InnerConsumer) diff --git a/internal/client_test.go b/internal/client_test.go index aafbca7..7814ddd 100644 --- a/internal/client_test.go +++ b/internal/client_test.go @@ -23,7 +23,7 @@ import ( ) func TestRMQClient_PullMessage(t *testing.T) { - client := GetOrNewRocketMQClient(ClientOption{}) + client := GetOrNewRocketMQClient(ClientOptions{}) req := &PullMessageRequest{ ConsumerGroup: "testGroup", Topic: "wenfeng", diff --git a/internal/mock_client.go b/internal/mock_client.go new file mode 100644 index 0000000..3ee3f50 --- /dev/null +++ b/internal/mock_client.go @@ -0,0 +1,411 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: client.go + +// Package internal is a generated GoMock package. +package internal + +import ( + context "context" + remote "github.com/apache/rocketmq-client-go/internal/remote" + primitive "github.com/apache/rocketmq-client-go/primitive" + gomock "github.com/golang/mock/gomock" + reflect "reflect" + time "time" +) + +// MockInnerProducer is a mock of InnerProducer interface +type MockInnerProducer struct { + ctrl *gomock.Controller + recorder *MockInnerProducerMockRecorder +} + +// MockInnerProducerMockRecorder is the mock recorder for MockInnerProducer +type MockInnerProducerMockRecorder struct { + mock *MockInnerProducer +} + +// NewMockInnerProducer creates a new mock instance +func NewMockInnerProducer(ctrl *gomock.Controller) *MockInnerProducer { + mock := &MockInnerProducer{ctrl: ctrl} + mock.recorder = &MockInnerProducerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockInnerProducer) EXPECT() *MockInnerProducerMockRecorder { + return m.recorder +} + +// PublishTopicList mocks base method +func (m *MockInnerProducer) PublishTopicList() []string { + ret := m.ctrl.Call(m, "PublishTopicList") + ret0, _ := ret[0].([]string) + return ret0 +} + +// PublishTopicList indicates an expected call of PublishTopicList +func (mr *MockInnerProducerMockRecorder) PublishTopicList() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PublishTopicList", reflect.TypeOf((*MockInnerProducer)(nil).PublishTopicList)) +} + +// UpdateTopicPublishInfo mocks base method +func (m *MockInnerProducer) UpdateTopicPublishInfo(topic string, info *TopicPublishInfo) { + m.ctrl.Call(m, "UpdateTopicPublishInfo", topic, info) +} + +// UpdateTopicPublishInfo indicates an expected call of UpdateTopicPublishInfo +func (mr *MockInnerProducerMockRecorder) UpdateTopicPublishInfo(topic, info interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateTopicPublishInfo", reflect.TypeOf((*MockInnerProducer)(nil).UpdateTopicPublishInfo), topic, info) +} + +// IsPublishTopicNeedUpdate mocks base method +func (m *MockInnerProducer) IsPublishTopicNeedUpdate(topic string) bool { + ret := m.ctrl.Call(m, "IsPublishTopicNeedUpdate", topic) + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsPublishTopicNeedUpdate indicates an expected call of IsPublishTopicNeedUpdate +func (mr *MockInnerProducerMockRecorder) IsPublishTopicNeedUpdate(topic interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsPublishTopicNeedUpdate", reflect.TypeOf((*MockInnerProducer)(nil).IsPublishTopicNeedUpdate), topic) +} + +// IsUnitMode mocks base method +func (m *MockInnerProducer) IsUnitMode() bool { + ret := m.ctrl.Call(m, "IsUnitMode") + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsUnitMode indicates an expected call of IsUnitMode +func (mr *MockInnerProducerMockRecorder) IsUnitMode() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsUnitMode", reflect.TypeOf((*MockInnerProducer)(nil).IsUnitMode)) +} + +// MockInnerConsumer is a mock of InnerConsumer interface +type MockInnerConsumer struct { + ctrl *gomock.Controller + recorder *MockInnerConsumerMockRecorder +} + +// MockInnerConsumerMockRecorder is the mock recorder for MockInnerConsumer +type MockInnerConsumerMockRecorder struct { + mock *MockInnerConsumer +} + +// NewMockInnerConsumer creates a new mock instance +func NewMockInnerConsumer(ctrl *gomock.Controller) *MockInnerConsumer { + mock := &MockInnerConsumer{ctrl: ctrl} + mock.recorder = &MockInnerConsumerMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockInnerConsumer) EXPECT() *MockInnerConsumerMockRecorder { + return m.recorder +} + +// PersistConsumerOffset mocks base method +func (m *MockInnerConsumer) PersistConsumerOffset() { + m.ctrl.Call(m, "PersistConsumerOffset") +} + +// PersistConsumerOffset indicates an expected call of PersistConsumerOffset +func (mr *MockInnerConsumerMockRecorder) PersistConsumerOffset() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PersistConsumerOffset", reflect.TypeOf((*MockInnerConsumer)(nil).PersistConsumerOffset)) +} + +// UpdateTopicSubscribeInfo mocks base method +func (m *MockInnerConsumer) UpdateTopicSubscribeInfo(topic string, mqs []*primitive.MessageQueue) { + m.ctrl.Call(m, "UpdateTopicSubscribeInfo", topic, mqs) +} + +// UpdateTopicSubscribeInfo indicates an expected call of UpdateTopicSubscribeInfo +func (mr *MockInnerConsumerMockRecorder) UpdateTopicSubscribeInfo(topic, mqs interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateTopicSubscribeInfo", reflect.TypeOf((*MockInnerConsumer)(nil).UpdateTopicSubscribeInfo), topic, mqs) +} + +// IsSubscribeTopicNeedUpdate mocks base method +func (m *MockInnerConsumer) IsSubscribeTopicNeedUpdate(topic string) bool { + ret := m.ctrl.Call(m, "IsSubscribeTopicNeedUpdate", topic) + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsSubscribeTopicNeedUpdate indicates an expected call of IsSubscribeTopicNeedUpdate +func (mr *MockInnerConsumerMockRecorder) IsSubscribeTopicNeedUpdate(topic interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsSubscribeTopicNeedUpdate", reflect.TypeOf((*MockInnerConsumer)(nil).IsSubscribeTopicNeedUpdate), topic) +} + +// SubscriptionDataList mocks base method +func (m *MockInnerConsumer) SubscriptionDataList() []*SubscriptionData { + ret := m.ctrl.Call(m, "SubscriptionDataList") + ret0, _ := ret[0].([]*SubscriptionData) + return ret0 +} + +// SubscriptionDataList indicates an expected call of SubscriptionDataList +func (mr *MockInnerConsumerMockRecorder) SubscriptionDataList() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscriptionDataList", reflect.TypeOf((*MockInnerConsumer)(nil).SubscriptionDataList)) +} + +// Rebalance mocks base method +func (m *MockInnerConsumer) Rebalance() { + m.ctrl.Call(m, "Rebalance") +} + +// Rebalance indicates an expected call of Rebalance +func (mr *MockInnerConsumerMockRecorder) Rebalance() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Rebalance", reflect.TypeOf((*MockInnerConsumer)(nil).Rebalance)) +} + +// IsUnitMode mocks base method +func (m *MockInnerConsumer) IsUnitMode() bool { + ret := m.ctrl.Call(m, "IsUnitMode") + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsUnitMode indicates an expected call of IsUnitMode +func (mr *MockInnerConsumerMockRecorder) IsUnitMode() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsUnitMode", reflect.TypeOf((*MockInnerConsumer)(nil).IsUnitMode)) +} + +// MockRMQClient is a mock of RMQClient interface +type MockRMQClient struct { + ctrl *gomock.Controller + recorder *MockRMQClientMockRecorder +} + +// MockRMQClientMockRecorder is the mock recorder for MockRMQClient +type MockRMQClientMockRecorder struct { + mock *MockRMQClient +} + +// NewMockRMQClient creates a new mock instance +func NewMockRMQClient(ctrl *gomock.Controller) *MockRMQClient { + mock := &MockRMQClient{ctrl: ctrl} + mock.recorder = &MockRMQClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use +func (m *MockRMQClient) EXPECT() *MockRMQClientMockRecorder { + return m.recorder +} + +// Start mocks base method +func (m *MockRMQClient) Start() { + m.ctrl.Call(m, "Start") +} + +// Start indicates an expected call of Start +func (mr *MockRMQClientMockRecorder) Start() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockRMQClient)(nil).Start)) +} + +// Shutdown mocks base method +func (m *MockRMQClient) Shutdown() { + m.ctrl.Call(m, "Shutdown") +} + +// Shutdown indicates an expected call of Shutdown +func (mr *MockRMQClientMockRecorder) Shutdown() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Shutdown", reflect.TypeOf((*MockRMQClient)(nil).Shutdown)) +} + +// ClientID mocks base method +func (m *MockRMQClient) ClientID() string { + ret := m.ctrl.Call(m, "ClientID") + ret0, _ := ret[0].(string) + return ret0 +} + +// ClientID indicates an expected call of ClientID +func (mr *MockRMQClientMockRecorder) ClientID() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClientID", reflect.TypeOf((*MockRMQClient)(nil).ClientID)) +} + +// RegisterProducer mocks base method +func (m *MockRMQClient) RegisterProducer(group string, producer InnerProducer) { + m.ctrl.Call(m, "RegisterProducer", group, producer) +} + +// RegisterProducer indicates an expected call of RegisterProducer +func (mr *MockRMQClientMockRecorder) RegisterProducer(group, producer interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterProducer", reflect.TypeOf((*MockRMQClient)(nil).RegisterProducer), group, producer) +} + +// InvokeSync mocks base method +func (m *MockRMQClient) InvokeSync(addr string, request *remote.RemotingCommand, timeoutMillis time.Duration) (*remote.RemotingCommand, error) { + ret := m.ctrl.Call(m, "InvokeSync", addr, request, timeoutMillis) + ret0, _ := ret[0].(*remote.RemotingCommand) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// InvokeSync indicates an expected call of InvokeSync +func (mr *MockRMQClientMockRecorder) InvokeSync(addr, request, timeoutMillis interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeSync", reflect.TypeOf((*MockRMQClient)(nil).InvokeSync), addr, request, timeoutMillis) +} + +// InvokeAsync mocks base method +func (m *MockRMQClient) InvokeAsync(addr string, request *remote.RemotingCommand, timeoutMillis time.Duration, f func(*remote.RemotingCommand, error)) error { + ret := m.ctrl.Call(m, "InvokeAsync", addr, request, timeoutMillis, f) + ret0, _ := ret[0].(error) + return ret0 +} + +// InvokeAsync indicates an expected call of InvokeAsync +func (mr *MockRMQClientMockRecorder) InvokeAsync(addr, request, timeoutMillis, f interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeAsync", reflect.TypeOf((*MockRMQClient)(nil).InvokeAsync), addr, request, timeoutMillis, f) +} + +// InvokeOneWay mocks base method +func (m *MockRMQClient) InvokeOneWay(addr string, request *remote.RemotingCommand, timeoutMillis time.Duration) error { + ret := m.ctrl.Call(m, "InvokeOneWay", addr, request, timeoutMillis) + ret0, _ := ret[0].(error) + return ret0 +} + +// InvokeOneWay indicates an expected call of InvokeOneWay +func (mr *MockRMQClientMockRecorder) InvokeOneWay(addr, request, timeoutMillis interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeOneWay", reflect.TypeOf((*MockRMQClient)(nil).InvokeOneWay), addr, request, timeoutMillis) +} + +// CheckClientInBroker mocks base method +func (m *MockRMQClient) CheckClientInBroker() { + m.ctrl.Call(m, "CheckClientInBroker") +} + +// CheckClientInBroker indicates an expected call of CheckClientInBroker +func (mr *MockRMQClientMockRecorder) CheckClientInBroker() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckClientInBroker", reflect.TypeOf((*MockRMQClient)(nil).CheckClientInBroker)) +} + +// SendHeartbeatToAllBrokerWithLock mocks base method +func (m *MockRMQClient) SendHeartbeatToAllBrokerWithLock() { + m.ctrl.Call(m, "SendHeartbeatToAllBrokerWithLock") +} + +// SendHeartbeatToAllBrokerWithLock indicates an expected call of SendHeartbeatToAllBrokerWithLock +func (mr *MockRMQClientMockRecorder) SendHeartbeatToAllBrokerWithLock() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendHeartbeatToAllBrokerWithLock", reflect.TypeOf((*MockRMQClient)(nil).SendHeartbeatToAllBrokerWithLock)) +} + +// UpdateTopicRouteInfo mocks base method +func (m *MockRMQClient) UpdateTopicRouteInfo() { + m.ctrl.Call(m, "UpdateTopicRouteInfo") +} + +// UpdateTopicRouteInfo indicates an expected call of UpdateTopicRouteInfo +func (mr *MockRMQClientMockRecorder) UpdateTopicRouteInfo() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateTopicRouteInfo", reflect.TypeOf((*MockRMQClient)(nil).UpdateTopicRouteInfo)) +} + +// SendMessageAsync mocks base method +func (m *MockRMQClient) SendMessageAsync(ctx context.Context, brokerAddrs, brokerName string, request *SendMessageRequest, msgs []*primitive.Message, f func(*primitive.SendResult)) error { + ret := m.ctrl.Call(m, "SendMessageAsync", ctx, brokerAddrs, brokerName, request, msgs, f) + ret0, _ := ret[0].(error) + return ret0 +} + +// SendMessageAsync indicates an expected call of SendMessageAsync +func (mr *MockRMQClientMockRecorder) SendMessageAsync(ctx, brokerAddrs, brokerName, request, msgs, f interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMessageAsync", reflect.TypeOf((*MockRMQClient)(nil).SendMessageAsync), ctx, brokerAddrs, brokerName, request, msgs, f) +} + +// SendMessageOneWay mocks base method +func (m *MockRMQClient) SendMessageOneWay(ctx context.Context, brokerAddrs string, request *SendMessageRequest, msgs []*primitive.Message) (*primitive.SendResult, error) { + ret := m.ctrl.Call(m, "SendMessageOneWay", ctx, brokerAddrs, request, msgs) + ret0, _ := ret[0].(*primitive.SendResult) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SendMessageOneWay indicates an expected call of SendMessageOneWay +func (mr *MockRMQClientMockRecorder) SendMessageOneWay(ctx, brokerAddrs, request, msgs interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMessageOneWay", reflect.TypeOf((*MockRMQClient)(nil).SendMessageOneWay), ctx, brokerAddrs, request, msgs) +} + +// ProcessSendResponse mocks base method +func (m *MockRMQClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult, msgs ...*primitive.Message) { + varargs := []interface{}{brokerName, cmd, resp} + for _, a := range msgs { + varargs = append(varargs, a) + } + m.ctrl.Call(m, "ProcessSendResponse", varargs...) +} + +// ProcessSendResponse indicates an expected call of ProcessSendResponse +func (mr *MockRMQClientMockRecorder) ProcessSendResponse(brokerName, cmd, resp interface{}, msgs ...interface{}) *gomock.Call { + varargs := append([]interface{}{brokerName, cmd, resp}, msgs...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessSendResponse", reflect.TypeOf((*MockRMQClient)(nil).ProcessSendResponse), varargs...) +} + +// RegisterConsumer mocks base method +func (m *MockRMQClient) RegisterConsumer(group string, consumer InnerConsumer) error { + ret := m.ctrl.Call(m, "RegisterConsumer", group, consumer) + ret0, _ := ret[0].(error) + return ret0 +} + +// RegisterConsumer indicates an expected call of RegisterConsumer +func (mr *MockRMQClientMockRecorder) RegisterConsumer(group, consumer interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterConsumer", reflect.TypeOf((*MockRMQClient)(nil).RegisterConsumer), group, consumer) +} + +// UnregisterConsumer mocks base method +func (m *MockRMQClient) UnregisterConsumer(group string) { + m.ctrl.Call(m, "UnregisterConsumer", group) +} + +// UnregisterConsumer indicates an expected call of UnregisterConsumer +func (mr *MockRMQClientMockRecorder) UnregisterConsumer(group interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UnregisterConsumer", reflect.TypeOf((*MockRMQClient)(nil).UnregisterConsumer), group) +} + +// PullMessage mocks base method +func (m *MockRMQClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest) (*primitive.PullResult, error) { + ret := m.ctrl.Call(m, "PullMessage", ctx, brokerAddrs, request) + ret0, _ := ret[0].(*primitive.PullResult) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// PullMessage indicates an expected call of PullMessage +func (mr *MockRMQClientMockRecorder) PullMessage(ctx, brokerAddrs, request interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PullMessage", reflect.TypeOf((*MockRMQClient)(nil).PullMessage), ctx, brokerAddrs, request) +} + +// PullMessageAsync mocks base method +func (m *MockRMQClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequest, f func(*primitive.PullResult)) error { + ret := m.ctrl.Call(m, "PullMessageAsync", ctx, brokerAddrs, request, f) + ret0, _ := ret[0].(error) + return ret0 +} + +// PullMessageAsync indicates an expected call of PullMessageAsync +func (mr *MockRMQClientMockRecorder) PullMessageAsync(ctx, brokerAddrs, request, f interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PullMessageAsync", reflect.TypeOf((*MockRMQClient)(nil).PullMessageAsync), ctx, brokerAddrs, request, f) +} + +// RebalanceImmediately mocks base method +func (m *MockRMQClient) RebalanceImmediately() { + m.ctrl.Call(m, "RebalanceImmediately") +} + +// RebalanceImmediately indicates an expected call of RebalanceImmediately +func (mr *MockRMQClientMockRecorder) RebalanceImmediately() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RebalanceImmediately", reflect.TypeOf((*MockRMQClient)(nil).RebalanceImmediately)) +} + +// UpdatePublishInfo mocks base method +func (m *MockRMQClient) UpdatePublishInfo(topic string, data *TopicRouteData) { + m.ctrl.Call(m, "UpdatePublishInfo", topic, data) +} + +// UpdatePublishInfo indicates an expected call of UpdatePublishInfo +func (mr *MockRMQClientMockRecorder) UpdatePublishInfo(topic, data interface{}) *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdatePublishInfo", reflect.TypeOf((*MockRMQClient)(nil).UpdatePublishInfo), topic, data) +} diff --git a/internal/route.go b/internal/route.go index 074cf90..dbb39c1 100644 --- a/internal/route.go +++ b/internal/route.go @@ -157,6 +157,13 @@ func UpdateTopicRouteInfo(topic string) *TopicRouteData { return routeData.clone() } +// just for test +func AddBroker(routeData *TopicRouteData) { + for _, brokerData := range routeData.BrokerDataList { + brokerAddressesMap.Store(brokerData.BrokerName, brokerData) + } +} + func FindBrokerAddrByTopic(topic string) string { v, exist := routeDataMap.Load(topic) if !exist { diff --git a/producer/producer.go b/producer/producer.go index 47c4026..ab501de 100644 --- a/producer/producer.go +++ b/producer/producer.go @@ -33,6 +33,7 @@ import ( var ( ErrTopicEmpty = errors.New("topic is nil") ErrMessageEmpty = errors.New("message is nil") + ErrNotRunning = errors.New("producer not started") ) func NewDefaultProducer(opts ...Option) (*defaultProducer, error) { @@ -84,7 +85,7 @@ func getChainedInterceptor(interceptors []primitive.Interceptor, cur int, finalI type defaultProducer struct { group string - client *internal.RMQClient + client internal.RMQClient state internal.ServiceState options producerOptions publishInfo sync.Map @@ -100,10 +101,16 @@ func (p *defaultProducer) Start() error { } func (p *defaultProducer) Shutdown() error { + p.state = internal.StateShutdown + p.client.Shutdown() return nil } func (p *defaultProducer) checkMsg(msg *primitive.Message) error { + if p.state != internal.StateRunning { + return ErrNotRunning + } + if msg == nil { return errors.New("message is nil") } @@ -242,6 +249,7 @@ func (p *defaultProducer) sendOneWay(ctx context.Context, msg *primitive.Message err = _err continue } + return nil } return err } diff --git a/producer/producer_test.go b/producer/producer_test.go new file mode 100644 index 0000000..0dd564f --- /dev/null +++ b/producer/producer_test.go @@ -0,0 +1,234 @@ +/* +Licensed to the Apache Software Foundation (ASF) under one or more +contributor license agreements. See the NOTICE file distributed with +this work for additional information regarding copyright ownership. +The ASF licenses this file to You under the Apache License, Version 2.0 +(the "License"); you may not use this file except in compliance with +the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package producer + +import ( + "context" + "testing" + "time" + + "github.com/apache/rocketmq-client-go/internal" + "github.com/apache/rocketmq-client-go/internal/remote" + "github.com/apache/rocketmq-client-go/primitive" + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" +) + +const ( + topic = "TopicTest" +) + +func TestShutdown(t *testing.T) { + p, _ := NewDefaultProducer( + WithNameServer([]string{"127.0.0.1:9876"}), + WithRetry(2), + WithQueueSelector(NewManualQueueSelector()), + ) + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + client := internal.NewMockRMQClient(ctrl) + p.client = client + + client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return() + client.EXPECT().Start().Return() + err := p.Start() + assert.Nil(t, err) + + client.EXPECT().Shutdown().Return() + err = p.Shutdown() + assert.Nil(t, err) + + ctx := context.Background() + msg := new(primitive.Message) + + r, err := p.SendSync(ctx, msg) + assert.Equal(t, ErrNotRunning, err) + assert.Nil(t, r) + + err = p.SendOneWay(ctx, msg) + assert.Equal(t, ErrNotRunning, err) + assert.Nil(t, r) + + f := func(context.Context, *primitive.SendResult, error) { + assert.False(t, true, "should not come in") + } + err = p.SendAsync(ctx, f, msg) + assert.Equal(t, ErrNotRunning, err) + assert.Nil(t, r) +} + +func mockB4Send(p *defaultProducer) { + p.publishInfo.Store(topic, &internal.TopicPublishInfo{ + HaveTopicRouterInfo: true, + MqList: []*primitive.MessageQueue{ + { + Topic: topic, + BrokerName: "aa", + QueueId: 0, + }, + }, + }) + internal.AddBroker(&internal.TopicRouteData{ + BrokerDataList: []*internal.BrokerData{ + { + Cluster: "cluster", + BrokerName: "aa", + BrokerAddresses: map[int64]string{ + 0: "1", + }, + }, + }, + }) +} + +func TestSync(t *testing.T) { + p, _ := NewDefaultProducer( + WithNameServer([]string{"127.0.0.1:9876"}), + WithRetry(2), + WithQueueSelector(NewManualQueueSelector()), + ) + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + client := internal.NewMockRMQClient(ctrl) + p.client = client + + client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return() + client.EXPECT().Start().Return() + err := p.Start() + assert.Nil(t, err) + + ctx := context.Background() + msg := &primitive.Message{ + Topic: topic, + Body: []byte("this is a message body"), + Properties: map[string]string{"key": "value"}, + QueueID: 0, + } + + expectedResp := &primitive.SendResult{ + Status: primitive.SendOK, + MsgID: "111", + QueueOffset: 0, + OffsetMsgID: "0", + } + + mockB4Send(p) + + client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil) + client.EXPECT().ProcessSendResponse(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Do( + func(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult, msgs ...*primitive.Message) { + resp.Status = expectedResp.Status + resp.MsgID = expectedResp.MsgID + resp.QueueOffset = expectedResp.QueueOffset + resp.OffsetMsgID = expectedResp.OffsetMsgID + }) + resp, err := p.SendSync(ctx, msg) + assert.Nil(t, err) + assert.Equal(t, expectedResp, resp) +} + +func TestASync(t *testing.T) { + p, _ := NewDefaultProducer( + WithNameServer([]string{"127.0.0.1:9876"}), + WithRetry(2), + WithQueueSelector(NewManualQueueSelector()), + ) + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + client := internal.NewMockRMQClient(ctrl) + p.client = client + + client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return() + client.EXPECT().Start().Return() + err := p.Start() + assert.Nil(t, err) + + ctx := context.Background() + msg := &primitive.Message{ + Topic: topic, + Body: []byte("this is a message body"), + Properties: map[string]string{"key": "value"}, + } + + expectedResp := &primitive.SendResult{ + Status: primitive.SendOK, + MsgID: "111", + QueueOffset: 0, + OffsetMsgID: "0", + } + + f := func(ctx context.Context, resp *primitive.SendResult, err error) { + assert.Nil(t, err) + assert.Equal(t, expectedResp, resp) + } + + mockB4Send(p) + + client.EXPECT().InvokeAsync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(addr string, request *remote.RemotingCommand, + timeoutMillis time.Duration, f func(*remote.RemotingCommand, error)) error { + // mock invoke callback + f(nil, nil) + return nil + }) + client.EXPECT().ProcessSendResponse(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Do( + func(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult, msgs ...*primitive.Message) { + resp.Status = expectedResp.Status + resp.MsgID = expectedResp.MsgID + resp.QueueOffset = expectedResp.QueueOffset + resp.OffsetMsgID = expectedResp.OffsetMsgID + }) + + err = p.SendAsync(ctx, f, msg) + assert.Nil(t, err) +} + +func TestOneway(t *testing.T) { + p, _ := NewDefaultProducer( + WithNameServer([]string{"127.0.0.1:9876"}), + WithRetry(2), + WithQueueSelector(NewManualQueueSelector()), + ) + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + client := internal.NewMockRMQClient(ctrl) + p.client = client + + client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return() + client.EXPECT().Start().Return() + err := p.Start() + assert.Nil(t, err) + + ctx := context.Background() + msg := &primitive.Message{ + Topic: topic, + Body: []byte("this is a message body"), + Properties: map[string]string{"key": "value"}, + } + + mockB4Send(p) + + client.EXPECT().InvokeOneWay(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes() + + err = p.SendOneWay(ctx, msg) + assert.Nil(t, err) +} diff --git a/producer/selector_test.go b/producer/selector_test.go index 016db07..723c55b 100644 --- a/producer/selector_test.go +++ b/producer/selector_test.go @@ -20,6 +20,7 @@ package producer import ( "testing" + "github.com/apache/rocketmq-client-go/primitive" "github.com/stretchr/testify/assert" ) @@ -27,10 +28,10 @@ func TestRoundRobin(t *testing.T) { queues := 10 s := NewRoundRobinQueueSelector() - m := &Message{ + m := &primitive.Message{ Topic: "test", } - mrr := &Message{ + mrr := &primitive.Message{ Topic: "rr", } for i := 0; i < 100; i++ {