rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From huzongt...@apache.org
Subject [rocketmq-client-go] branch native updated: [ISSUE #108] Add producer unit test (#109)
Date Wed, 10 Jul 2019 03:17:14 GMT
This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 95f5b0d  [ISSUE #108] Add producer unit test (#109)
95f5b0d is described below

commit 95f5b0dd1ce2eaba64e1e1185c5407ef49bc5264
Author: xujianhai666 <52450794+xujianhai666@users.noreply.github.com>
AuthorDate: Wed Jul 10 11:17:09 2019 +0800

    [ISSUE #108] Add producer unit test (#109)
    
    * add producer unit test. resolve #108
    
    * clean line
---
 consumer/consumer.go                  |   2 +-
 consumer/offset_store.go              |   4 +-
 consumer/pull_consumer.go             |   2 +-
 examples/producer/interceptor/main.go |   1 -
 go.mod                                |   2 +-
 go.sum                                |   5 +-
 internal/client.go                    | 106 ++++++---
 internal/client_test.go               |   2 +-
 internal/mock_client.go               | 411 ++++++++++++++++++++++++++++++++++
 internal/route.go                     |   7 +
 producer/producer.go                  |  10 +-
 producer/producer_test.go             | 234 +++++++++++++++++++
 producer/selector_test.go             |   5 +-
 13 files changed, 743 insertions(+), 48 deletions(-)

diff --git a/consumer/consumer.go b/consumer/consumer.go
index a02602a..5bd9674 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -225,7 +225,7 @@ type defaultConsumer struct {
 	fromWhere      ConsumeFromWhere
 
 	cType     ConsumeType
-	client    *internal.RMQClient
+	client    internal.RMQClient
 	mqChanged func(topic string, mqAll, mqDivided []*primitive.MessageQueue)
 	state     internal.ServiceState
 	pause     bool
diff --git a/consumer/offset_store.go b/consumer/offset_store.go
index 5c55e27..45d80f2 100644
--- a/consumer/offset_store.go
+++ b/consumer/offset_store.go
@@ -180,11 +180,11 @@ func (local *localFileOffsetStore) remove(mq *primitive.MessageQueue)
{
 type remoteBrokerOffsetStore struct {
 	group       string
 	OffsetTable map[string]map[int]*queueOffset `json:"OffsetTable"`
-	client      *internal.RMQClient
+	client      internal.RMQClient
 	mutex       sync.RWMutex
 }
 
-func NewRemoteOffsetStore(group string, client *internal.RMQClient) OffsetStore {
+func NewRemoteOffsetStore(group string, client internal.RMQClient) OffsetStore {
 	return &remoteBrokerOffsetStore{
 		group:       group,
 		client:      client,
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index 32cbaed..c8bcdc5 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -41,7 +41,7 @@ var (
 type defaultPullConsumer struct {
 	state     internal.ServiceState
 	option    consumerOptions
-	client    *internal.RMQClient
+	client    internal.RMQClient
 	GroupName string
 	Model     MessageModel
 	UnitMode  bool
diff --git a/examples/producer/interceptor/main.go b/examples/producer/interceptor/main.go
index 4822de9..643d2a8 100644
--- a/examples/producer/interceptor/main.go
+++ b/examples/producer/interceptor/main.go
@@ -42,7 +42,6 @@ func main() {
 	}
 	for i := 0; i < 10; i++ {
 		res, err := p.SendSync(context.Background(), &primitive.Message{
-			//Topic: "test",
 			Topic:      "TopicTest",
 			Body:       []byte("Hello RocketMQ Go Client!"),
 			Properties: map[string]string{"order": strconv.Itoa(i)},
diff --git a/go.mod b/go.mod
index 684a7d2..322c847 100644
--- a/go.mod
+++ b/go.mod
@@ -4,11 +4,11 @@ go 1.12
 
 require (
 	github.com/emirpasic/gods v1.12.0
+	github.com/golang/mock v1.3.1
 	github.com/pkg/errors v0.8.1
 	github.com/sirupsen/logrus v1.4.1
 	github.com/stretchr/testify v1.3.0
 	github.com/tidwall/gjson v1.2.1
 	github.com/tidwall/match v1.0.1 // indirect
 	github.com/tidwall/pretty v0.0.0-20190325153808-1166b9ac2b65 // indirect
-	golang.org/x/tools v0.0.0-20190606050223-4d9ae51c2468 // indirect
 )
diff --git a/go.sum b/go.sum
index 3b0271b..06d0d34 100644
--- a/go.sum
+++ b/go.sum
@@ -3,6 +3,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg=
 github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
+github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s=
+github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
 github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
@@ -30,5 +32,4 @@ golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5h
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
-golang.org/x/tools v0.0.0-20190606050223-4d9ae51c2468 h1:fTfk6GjmihJbK0mSUFgPPgYpsdmApQ86Mcd4GuKax9U=
-golang.org/x/tools v0.0.0-20190606050223-4d9ae51c2468/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc=
+golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
diff --git a/internal/client.go b/internal/client.go
index a6e1bc5..1db43c7 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -118,7 +118,41 @@ func (opt *ClientOptions) String() string {
 		opt.InstanceName, opt.UnitMode, opt.UnitName, opt.VIPChannelEnabled, opt.ACLEnabled)
 }
 
-type RMQClient struct {
+//go:generate mockgen -source client.go -destination mock_client.go --package internal RMQClient
+type RMQClient interface {
+	Start()
+	Shutdown()
+
+	ClientID() string
+
+	RegisterProducer(group string, producer InnerProducer)
+	InvokeSync(addr string, request *remote.RemotingCommand,
+		timeoutMillis time.Duration) (*remote.RemotingCommand, error)
+	InvokeAsync(addr string, request *remote.RemotingCommand,
+		timeoutMillis time.Duration, f func(*remote.RemotingCommand, error)) error
+	InvokeOneWay(addr string, request *remote.RemotingCommand,
+		timeoutMillis time.Duration) error
+	CheckClientInBroker()
+	SendHeartbeatToAllBrokerWithLock()
+	UpdateTopicRouteInfo()
+	SendMessageAsync(ctx context.Context, brokerAddrs, brokerName string, request *SendMessageRequest,
+		msgs []*primitive.Message, f func(result *primitive.SendResult)) error
+	SendMessageOneWay(ctx context.Context, brokerAddrs string, request *SendMessageRequest,
+		msgs []*primitive.Message) (*primitive.SendResult, error)
+
+	ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult,
msgs ...*primitive.Message)
+
+	RegisterConsumer(group string, consumer InnerConsumer) error
+	UnregisterConsumer(group string)
+	PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest) (*primitive.PullResult,
error)
+	PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequest, f
func(result *primitive.PullResult)) error
+	RebalanceImmediately()
+	UpdatePublishInfo(topic string, data *TopicRouteData)
+}
+
+var _ RMQClient = new(rmqClient)
+
+type rmqClient struct {
 	option ClientOptions
 	// group -> InnerProducer
 	producerMap sync.Map
@@ -134,8 +168,8 @@ type RMQClient struct {
 
 var clientMap sync.Map
 
-func GetOrNewRocketMQClient(option ClientOptions) *RMQClient {
-	client := &RMQClient{
+func GetOrNewRocketMQClient(option ClientOptions) *rmqClient {
+	client := &rmqClient{
 		option:       option,
 		remoteClient: remote.NewRemotingClient(),
 	}
@@ -147,10 +181,10 @@ func GetOrNewRocketMQClient(option ClientOptions) *RMQClient {
 			return nil
 		})
 	}
-	return actual.(*RMQClient)
+	return actual.(*rmqClient)
 }
 
-func (c *RMQClient) Start() {
+func (c *rmqClient) Start() {
 	//ctx, cancel := context.WithCancel(context.Background())
 	//c.cancel = cancel
 	c.close = false
@@ -162,7 +196,7 @@ func (c *RMQClient) Start() {
 		go func() {
 			// delay
 			time.Sleep(50 * time.Millisecond)
-			for !c.close{
+			for !c.close {
 				c.UpdateTopicRouteInfo()
 				time.Sleep(_PullNameServerInterval)
 			}
@@ -170,7 +204,7 @@ func (c *RMQClient) Start() {
 
 		// TODO cleanOfflineBroker & sendHeartbeatToAllBrokerWithLock
 		go func() {
-			for !c.close{
+			for !c.close {
 				cleanOfflineBroker()
 				c.SendHeartbeatToAllBrokerWithLock()
 				time.Sleep(_HeartbeatBrokerInterval)
@@ -180,7 +214,7 @@ func (c *RMQClient) Start() {
 		// schedule persist offset
 		go func() {
 			//time.Sleep(10 * time.Second)
-			for !c.close{
+			for !c.close {
 				c.consumerMap.Range(func(key, value interface{}) bool {
 					consumer := value.(InnerConsumer)
 					consumer.PersistConsumerOffset()
@@ -191,7 +225,7 @@ func (c *RMQClient) Start() {
 		}()
 
 		go func() {
-			for !c.close{
+			for !c.close {
 				c.RebalanceImmediately()
 				time.Sleep(_RebalanceInterval)
 			}
@@ -199,12 +233,12 @@ func (c *RMQClient) Start() {
 	})
 }
 
-func (c *RMQClient) Shutdown() {
+func (c *rmqClient) Shutdown() {
 	c.remoteClient.ShutDown()
 	c.close = true
 }
 
-func (c *RMQClient) ClientID() string {
+func (c *rmqClient) ClientID() string {
 	id := c.option.ClientIP + "@" + c.option.InstanceName
 	if c.option.UnitName != "" {
 		id += "@" + c.option.UnitName
@@ -212,7 +246,7 @@ func (c *RMQClient) ClientID() string {
 	return id
 }
 
-func (c *RMQClient) InvokeSync(addr string, request *remote.RemotingCommand,
+func (c *rmqClient) InvokeSync(addr string, request *remote.RemotingCommand,
 	timeoutMillis time.Duration) (*remote.RemotingCommand, error) {
 	if c.close {
 		return nil, ErrServiceState
@@ -220,7 +254,7 @@ func (c *RMQClient) InvokeSync(addr string, request *remote.RemotingCommand,
 	return c.remoteClient.InvokeSync(addr, request, timeoutMillis)
 }
 
-func (c *RMQClient) InvokeAsync(addr string, request *remote.RemotingCommand,
+func (c *rmqClient) InvokeAsync(addr string, request *remote.RemotingCommand,
 	timeoutMillis time.Duration, f func(*remote.RemotingCommand, error)) error {
 	if c.close {
 		return ErrServiceState
@@ -231,7 +265,7 @@ func (c *RMQClient) InvokeAsync(addr string, request *remote.RemotingCommand,
 
 }
 
-func (c *RMQClient) InvokeOneWay(addr string, request *remote.RemotingCommand,
+func (c *rmqClient) InvokeOneWay(addr string, request *remote.RemotingCommand,
 	timeoutMillis time.Duration) error {
 	if c.close {
 		return ErrServiceState
@@ -239,11 +273,11 @@ func (c *RMQClient) InvokeOneWay(addr string, request *remote.RemotingCommand,
 	return c.remoteClient.InvokeOneWay(addr, request, timeoutMillis)
 }
 
-func (c *RMQClient) CheckClientInBroker() {
+func (c *rmqClient) CheckClientInBroker() {
 }
 
 // TODO
-func (c *RMQClient) SendHeartbeatToAllBrokerWithLock() {
+func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
 	c.hbMutex.Lock()
 	defer c.hbMutex.Unlock()
 	hbData := &heartbeatData{
@@ -301,7 +335,7 @@ func (c *RMQClient) SendHeartbeatToAllBrokerWithLock() {
 	})
 }
 
-func (c *RMQClient) UpdateTopicRouteInfo() {
+func (c *rmqClient) UpdateTopicRouteInfo() {
 	publishTopicSet := make(map[string]bool, 0)
 	c.producerMap.Range(func(key, value interface{}) bool {
 		producer := value.(InnerProducer)
@@ -328,17 +362,17 @@ func (c *RMQClient) UpdateTopicRouteInfo() {
 	})
 
 	for topic := range subscribedTopicSet {
-		c.UpdateSubscribeInfo(topic, UpdateTopicRouteInfo(topic))
+		c.updateSubscribeInfo(topic, UpdateTopicRouteInfo(topic))
 	}
 }
 
 // SendMessageAsync send message with batch by async
-func (c *RMQClient) SendMessageAsync(ctx context.Context, brokerAddrs, brokerName string,
request *SendMessageRequest,
+func (c *rmqClient) SendMessageAsync(ctx context.Context, brokerAddrs, brokerName string,
request *SendMessageRequest,
 	msgs []*primitive.Message, f func(result *primitive.SendResult)) error {
 	return nil
 }
 
-func (c *RMQClient) SendMessageOneWay(ctx context.Context, brokerAddrs string, request *SendMessageRequest,
+func (c *rmqClient) SendMessageOneWay(ctx context.Context, brokerAddrs string, request *SendMessageRequest,
 	msgs []*primitive.Message) (*primitive.SendResult, error) {
 	cmd := remote.NewRemotingCommand(ReqSendBatchMessage, request, encodeMessages(msgs))
 	err := c.remoteClient.InvokeOneWay(brokerAddrs, cmd, 3*time.Second)
@@ -348,7 +382,7 @@ func (c *RMQClient) SendMessageOneWay(ctx context.Context, brokerAddrs
string, r
 	return nil, err
 }
 
-func (c *RMQClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, resp
*primitive.SendResult, msgs ...*primitive.Message) {
+func (c *rmqClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, resp
*primitive.SendResult, msgs ...*primitive.Message) {
 	var status primitive.SendStatus
 	switch cmd.Code {
 	case ResFlushDiskTimeout:
@@ -394,7 +428,7 @@ func (c *RMQClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingC
 }
 
 // PullMessage with sync
-func (c *RMQClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest)
(*primitive.PullResult, error) {
+func (c *rmqClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest)
(*primitive.PullResult, error) {
 	cmd := remote.NewRemotingCommand(ReqPullMessage, request, nil)
 	res, err := c.remoteClient.InvokeSync(brokerAddrs, cmd, 3*time.Second)
 	if err != nil {
@@ -404,7 +438,7 @@ func (c *RMQClient) PullMessage(ctx context.Context, brokerAddrs string,
request
 	return c.processPullResponse(res)
 }
 
-func (c *RMQClient) processPullResponse(response *remote.RemotingCommand) (*primitive.PullResult,
error) {
+func (c *rmqClient) processPullResponse(response *remote.RemotingCommand) (*primitive.PullResult,
error) {
 
 	pullResult := &primitive.PullResult{}
 	switch response.Code {
@@ -426,7 +460,7 @@ func (c *RMQClient) processPullResponse(response *remote.RemotingCommand)
(*prim
 	return pullResult, nil
 }
 
-func (c *RMQClient) decodeCommandCustomHeader(pr *primitive.PullResult, cmd *remote.RemotingCommand)
{
+func (c *rmqClient) decodeCommandCustomHeader(pr *primitive.PullResult, cmd *remote.RemotingCommand)
{
 	v, exist := cmd.ExtFields["maxOffset"]
 	if exist {
 		pr.MaxOffset, _ = strconv.ParseInt(v, 10, 64)
@@ -449,34 +483,34 @@ func (c *RMQClient) decodeCommandCustomHeader(pr *primitive.PullResult,
cmd *rem
 }
 
 // PullMessageAsync pull message async
-func (c *RMQClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequest,
f func(result *primitive.PullResult)) error {
+func (c *rmqClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequest,
f func(result *primitive.PullResult)) error {
 	return nil
 }
 
-func (c *RMQClient) RegisterConsumer(group string, consumer InnerConsumer) error {
+func (c *rmqClient) RegisterConsumer(group string, consumer InnerConsumer) error {
 	c.consumerMap.Store(group, consumer)
 	return nil
 }
 
-func (c *RMQClient) UnregisterConsumer(group string) {
+func (c *rmqClient) UnregisterConsumer(group string) {
 }
 
-func (c *RMQClient) RegisterProducer(group string, producer InnerProducer) {
+func (c *rmqClient) RegisterProducer(group string, producer InnerProducer) {
 	c.producerMap.Store(group, producer)
 }
 
-func (c *RMQClient) UnregisterProducer(group string) {
+func (c *rmqClient) UnregisterProducer(group string) {
 }
 
-func (c *RMQClient) SelectProducer(group string) InnerProducer {
+func (c *rmqClient) SelectProducer(group string) InnerProducer {
 	return nil
 }
 
-func (c *RMQClient) SelectConsumer(group string) InnerConsumer {
+func (c *rmqClient) SelectConsumer(group string) InnerConsumer {
 	return nil
 }
 
-func (c *RMQClient) RebalanceImmediately() {
+func (c *rmqClient) RebalanceImmediately() {
 	c.consumerMap.Range(func(key, value interface{}) bool {
 		consumer := value.(InnerConsumer)
 		consumer.Rebalance()
@@ -484,7 +518,7 @@ func (c *RMQClient) RebalanceImmediately() {
 	})
 }
 
-func (c *RMQClient) UpdatePublishInfo(topic string, data *TopicRouteData) {
+func (c *rmqClient) UpdatePublishInfo(topic string, data *TopicRouteData) {
 	if data == nil {
 		return
 	}
@@ -500,7 +534,7 @@ func (c *RMQClient) UpdatePublishInfo(topic string, data *TopicRouteData)
{
 	})
 }
 
-func (c *RMQClient) isNeedUpdatePublishInfo(topic string) bool {
+func (c *rmqClient) isNeedUpdatePublishInfo(topic string) bool {
 	var result bool
 	c.producerMap.Range(func(key, value interface{}) bool {
 		p := value.(InnerProducer)
@@ -513,7 +547,7 @@ func (c *RMQClient) isNeedUpdatePublishInfo(topic string) bool {
 	return result
 }
 
-func (c *RMQClient) UpdateSubscribeInfo(topic string, data *TopicRouteData) {
+func (c *rmqClient) updateSubscribeInfo(topic string, data *TopicRouteData) {
 	if data == nil {
 		return
 	}
@@ -528,7 +562,7 @@ func (c *RMQClient) UpdateSubscribeInfo(topic string, data *TopicRouteData)
{
 	})
 }
 
-func (c *RMQClient) isNeedUpdateSubscribeInfo(topic string) bool {
+func (c *rmqClient) isNeedUpdateSubscribeInfo(topic string) bool {
 	var result bool
 	c.consumerMap.Range(func(key, value interface{}) bool {
 		consumer := value.(InnerConsumer)
diff --git a/internal/client_test.go b/internal/client_test.go
index aafbca7..7814ddd 100644
--- a/internal/client_test.go
+++ b/internal/client_test.go
@@ -23,7 +23,7 @@ import (
 )
 
 func TestRMQClient_PullMessage(t *testing.T) {
-	client := GetOrNewRocketMQClient(ClientOption{})
+	client := GetOrNewRocketMQClient(ClientOptions{})
 	req := &PullMessageRequest{
 		ConsumerGroup:  "testGroup",
 		Topic:          "wenfeng",
diff --git a/internal/mock_client.go b/internal/mock_client.go
new file mode 100644
index 0000000..3ee3f50
--- /dev/null
+++ b/internal/mock_client.go
@@ -0,0 +1,411 @@
+// Code generated by MockGen. DO NOT EDIT.
+// Source: client.go
+
+// Package internal is a generated GoMock package.
+package internal
+
+import (
+	context "context"
+	remote "github.com/apache/rocketmq-client-go/internal/remote"
+	primitive "github.com/apache/rocketmq-client-go/primitive"
+	gomock "github.com/golang/mock/gomock"
+	reflect "reflect"
+	time "time"
+)
+
+// MockInnerProducer is a mock of InnerProducer interface
+type MockInnerProducer struct {
+	ctrl     *gomock.Controller
+	recorder *MockInnerProducerMockRecorder
+}
+
+// MockInnerProducerMockRecorder is the mock recorder for MockInnerProducer
+type MockInnerProducerMockRecorder struct {
+	mock *MockInnerProducer
+}
+
+// NewMockInnerProducer creates a new mock instance
+func NewMockInnerProducer(ctrl *gomock.Controller) *MockInnerProducer {
+	mock := &MockInnerProducer{ctrl: ctrl}
+	mock.recorder = &MockInnerProducerMockRecorder{mock}
+	return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use
+func (m *MockInnerProducer) EXPECT() *MockInnerProducerMockRecorder {
+	return m.recorder
+}
+
+// PublishTopicList mocks base method
+func (m *MockInnerProducer) PublishTopicList() []string {
+	ret := m.ctrl.Call(m, "PublishTopicList")
+	ret0, _ := ret[0].([]string)
+	return ret0
+}
+
+// PublishTopicList indicates an expected call of PublishTopicList
+func (mr *MockInnerProducerMockRecorder) PublishTopicList() *gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PublishTopicList", reflect.TypeOf((*MockInnerProducer)(nil).PublishTopicList))
+}
+
+// UpdateTopicPublishInfo mocks base method
+func (m *MockInnerProducer) UpdateTopicPublishInfo(topic string, info *TopicPublishInfo)
{
+	m.ctrl.Call(m, "UpdateTopicPublishInfo", topic, info)
+}
+
+// UpdateTopicPublishInfo indicates an expected call of UpdateTopicPublishInfo
+func (mr *MockInnerProducerMockRecorder) UpdateTopicPublishInfo(topic, info interface{})
*gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateTopicPublishInfo", reflect.TypeOf((*MockInnerProducer)(nil).UpdateTopicPublishInfo),
topic, info)
+}
+
+// IsPublishTopicNeedUpdate mocks base method
+func (m *MockInnerProducer) IsPublishTopicNeedUpdate(topic string) bool {
+	ret := m.ctrl.Call(m, "IsPublishTopicNeedUpdate", topic)
+	ret0, _ := ret[0].(bool)
+	return ret0
+}
+
+// IsPublishTopicNeedUpdate indicates an expected call of IsPublishTopicNeedUpdate
+func (mr *MockInnerProducerMockRecorder) IsPublishTopicNeedUpdate(topic interface{}) *gomock.Call
{
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsPublishTopicNeedUpdate", reflect.TypeOf((*MockInnerProducer)(nil).IsPublishTopicNeedUpdate),
topic)
+}
+
+// IsUnitMode mocks base method
+func (m *MockInnerProducer) IsUnitMode() bool {
+	ret := m.ctrl.Call(m, "IsUnitMode")
+	ret0, _ := ret[0].(bool)
+	return ret0
+}
+
+// IsUnitMode indicates an expected call of IsUnitMode
+func (mr *MockInnerProducerMockRecorder) IsUnitMode() *gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsUnitMode", reflect.TypeOf((*MockInnerProducer)(nil).IsUnitMode))
+}
+
+// MockInnerConsumer is a mock of InnerConsumer interface
+type MockInnerConsumer struct {
+	ctrl     *gomock.Controller
+	recorder *MockInnerConsumerMockRecorder
+}
+
+// MockInnerConsumerMockRecorder is the mock recorder for MockInnerConsumer
+type MockInnerConsumerMockRecorder struct {
+	mock *MockInnerConsumer
+}
+
+// NewMockInnerConsumer creates a new mock instance
+func NewMockInnerConsumer(ctrl *gomock.Controller) *MockInnerConsumer {
+	mock := &MockInnerConsumer{ctrl: ctrl}
+	mock.recorder = &MockInnerConsumerMockRecorder{mock}
+	return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use
+func (m *MockInnerConsumer) EXPECT() *MockInnerConsumerMockRecorder {
+	return m.recorder
+}
+
+// PersistConsumerOffset mocks base method
+func (m *MockInnerConsumer) PersistConsumerOffset() {
+	m.ctrl.Call(m, "PersistConsumerOffset")
+}
+
+// PersistConsumerOffset indicates an expected call of PersistConsumerOffset
+func (mr *MockInnerConsumerMockRecorder) PersistConsumerOffset() *gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PersistConsumerOffset", reflect.TypeOf((*MockInnerConsumer)(nil).PersistConsumerOffset))
+}
+
+// UpdateTopicSubscribeInfo mocks base method
+func (m *MockInnerConsumer) UpdateTopicSubscribeInfo(topic string, mqs []*primitive.MessageQueue)
{
+	m.ctrl.Call(m, "UpdateTopicSubscribeInfo", topic, mqs)
+}
+
+// UpdateTopicSubscribeInfo indicates an expected call of UpdateTopicSubscribeInfo
+func (mr *MockInnerConsumerMockRecorder) UpdateTopicSubscribeInfo(topic, mqs interface{})
*gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateTopicSubscribeInfo", reflect.TypeOf((*MockInnerConsumer)(nil).UpdateTopicSubscribeInfo),
topic, mqs)
+}
+
+// IsSubscribeTopicNeedUpdate mocks base method
+func (m *MockInnerConsumer) IsSubscribeTopicNeedUpdate(topic string) bool {
+	ret := m.ctrl.Call(m, "IsSubscribeTopicNeedUpdate", topic)
+	ret0, _ := ret[0].(bool)
+	return ret0
+}
+
+// IsSubscribeTopicNeedUpdate indicates an expected call of IsSubscribeTopicNeedUpdate
+func (mr *MockInnerConsumerMockRecorder) IsSubscribeTopicNeedUpdate(topic interface{}) *gomock.Call
{
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsSubscribeTopicNeedUpdate", reflect.TypeOf((*MockInnerConsumer)(nil).IsSubscribeTopicNeedUpdate),
topic)
+}
+
+// SubscriptionDataList mocks base method
+func (m *MockInnerConsumer) SubscriptionDataList() []*SubscriptionData {
+	ret := m.ctrl.Call(m, "SubscriptionDataList")
+	ret0, _ := ret[0].([]*SubscriptionData)
+	return ret0
+}
+
+// SubscriptionDataList indicates an expected call of SubscriptionDataList
+func (mr *MockInnerConsumerMockRecorder) SubscriptionDataList() *gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscriptionDataList", reflect.TypeOf((*MockInnerConsumer)(nil).SubscriptionDataList))
+}
+
+// Rebalance mocks base method
+func (m *MockInnerConsumer) Rebalance() {
+	m.ctrl.Call(m, "Rebalance")
+}
+
+// Rebalance indicates an expected call of Rebalance
+func (mr *MockInnerConsumerMockRecorder) Rebalance() *gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Rebalance", reflect.TypeOf((*MockInnerConsumer)(nil).Rebalance))
+}
+
+// IsUnitMode mocks base method
+func (m *MockInnerConsumer) IsUnitMode() bool {
+	ret := m.ctrl.Call(m, "IsUnitMode")
+	ret0, _ := ret[0].(bool)
+	return ret0
+}
+
+// IsUnitMode indicates an expected call of IsUnitMode
+func (mr *MockInnerConsumerMockRecorder) IsUnitMode() *gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsUnitMode", reflect.TypeOf((*MockInnerConsumer)(nil).IsUnitMode))
+}
+
+// MockRMQClient is a mock of RMQClient interface
+type MockRMQClient struct {
+	ctrl     *gomock.Controller
+	recorder *MockRMQClientMockRecorder
+}
+
+// MockRMQClientMockRecorder is the mock recorder for MockRMQClient
+type MockRMQClientMockRecorder struct {
+	mock *MockRMQClient
+}
+
+// NewMockRMQClient creates a new mock instance
+func NewMockRMQClient(ctrl *gomock.Controller) *MockRMQClient {
+	mock := &MockRMQClient{ctrl: ctrl}
+	mock.recorder = &MockRMQClientMockRecorder{mock}
+	return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use
+func (m *MockRMQClient) EXPECT() *MockRMQClientMockRecorder {
+	return m.recorder
+}
+
+// Start mocks base method
+func (m *MockRMQClient) Start() {
+	m.ctrl.Call(m, "Start")
+}
+
+// Start indicates an expected call of Start
+func (mr *MockRMQClientMockRecorder) Start() *gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Start", reflect.TypeOf((*MockRMQClient)(nil).Start))
+}
+
+// Shutdown mocks base method
+func (m *MockRMQClient) Shutdown() {
+	m.ctrl.Call(m, "Shutdown")
+}
+
+// Shutdown indicates an expected call of Shutdown
+func (mr *MockRMQClientMockRecorder) Shutdown() *gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Shutdown", reflect.TypeOf((*MockRMQClient)(nil).Shutdown))
+}
+
+// ClientID mocks base method
+func (m *MockRMQClient) ClientID() string {
+	ret := m.ctrl.Call(m, "ClientID")
+	ret0, _ := ret[0].(string)
+	return ret0
+}
+
+// ClientID indicates an expected call of ClientID
+func (mr *MockRMQClientMockRecorder) ClientID() *gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClientID", reflect.TypeOf((*MockRMQClient)(nil).ClientID))
+}
+
+// RegisterProducer mocks base method
+func (m *MockRMQClient) RegisterProducer(group string, producer InnerProducer) {
+	m.ctrl.Call(m, "RegisterProducer", group, producer)
+}
+
+// RegisterProducer indicates an expected call of RegisterProducer
+func (mr *MockRMQClientMockRecorder) RegisterProducer(group, producer interface{}) *gomock.Call
{
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterProducer", reflect.TypeOf((*MockRMQClient)(nil).RegisterProducer),
group, producer)
+}
+
+// InvokeSync mocks base method
+func (m *MockRMQClient) InvokeSync(addr string, request *remote.RemotingCommand, timeoutMillis
time.Duration) (*remote.RemotingCommand, error) {
+	ret := m.ctrl.Call(m, "InvokeSync", addr, request, timeoutMillis)
+	ret0, _ := ret[0].(*remote.RemotingCommand)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// InvokeSync indicates an expected call of InvokeSync
+func (mr *MockRMQClientMockRecorder) InvokeSync(addr, request, timeoutMillis interface{})
*gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeSync", reflect.TypeOf((*MockRMQClient)(nil).InvokeSync),
addr, request, timeoutMillis)
+}
+
+// InvokeAsync mocks base method
+func (m *MockRMQClient) InvokeAsync(addr string, request *remote.RemotingCommand, timeoutMillis
time.Duration, f func(*remote.RemotingCommand, error)) error {
+	ret := m.ctrl.Call(m, "InvokeAsync", addr, request, timeoutMillis, f)
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// InvokeAsync indicates an expected call of InvokeAsync
+func (mr *MockRMQClientMockRecorder) InvokeAsync(addr, request, timeoutMillis, f interface{})
*gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeAsync", reflect.TypeOf((*MockRMQClient)(nil).InvokeAsync),
addr, request, timeoutMillis, f)
+}
+
+// InvokeOneWay mocks base method
+func (m *MockRMQClient) InvokeOneWay(addr string, request *remote.RemotingCommand, timeoutMillis
time.Duration) error {
+	ret := m.ctrl.Call(m, "InvokeOneWay", addr, request, timeoutMillis)
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// InvokeOneWay indicates an expected call of InvokeOneWay
+func (mr *MockRMQClientMockRecorder) InvokeOneWay(addr, request, timeoutMillis interface{})
*gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InvokeOneWay", reflect.TypeOf((*MockRMQClient)(nil).InvokeOneWay),
addr, request, timeoutMillis)
+}
+
+// CheckClientInBroker mocks base method
+func (m *MockRMQClient) CheckClientInBroker() {
+	m.ctrl.Call(m, "CheckClientInBroker")
+}
+
+// CheckClientInBroker indicates an expected call of CheckClientInBroker
+func (mr *MockRMQClientMockRecorder) CheckClientInBroker() *gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckClientInBroker", reflect.TypeOf((*MockRMQClient)(nil).CheckClientInBroker))
+}
+
+// SendHeartbeatToAllBrokerWithLock mocks base method
+func (m *MockRMQClient) SendHeartbeatToAllBrokerWithLock() {
+	m.ctrl.Call(m, "SendHeartbeatToAllBrokerWithLock")
+}
+
+// SendHeartbeatToAllBrokerWithLock indicates an expected call of SendHeartbeatToAllBrokerWithLock
+func (mr *MockRMQClientMockRecorder) SendHeartbeatToAllBrokerWithLock() *gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendHeartbeatToAllBrokerWithLock",
reflect.TypeOf((*MockRMQClient)(nil).SendHeartbeatToAllBrokerWithLock))
+}
+
+// UpdateTopicRouteInfo mocks base method
+func (m *MockRMQClient) UpdateTopicRouteInfo() {
+	m.ctrl.Call(m, "UpdateTopicRouteInfo")
+}
+
+// UpdateTopicRouteInfo indicates an expected call of UpdateTopicRouteInfo
+func (mr *MockRMQClientMockRecorder) UpdateTopicRouteInfo() *gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateTopicRouteInfo", reflect.TypeOf((*MockRMQClient)(nil).UpdateTopicRouteInfo))
+}
+
+// SendMessageAsync mocks base method
+func (m *MockRMQClient) SendMessageAsync(ctx context.Context, brokerAddrs, brokerName string,
request *SendMessageRequest, msgs []*primitive.Message, f func(*primitive.SendResult)) error
{
+	ret := m.ctrl.Call(m, "SendMessageAsync", ctx, brokerAddrs, brokerName, request, msgs, f)
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// SendMessageAsync indicates an expected call of SendMessageAsync
+func (mr *MockRMQClientMockRecorder) SendMessageAsync(ctx, brokerAddrs, brokerName, request,
msgs, f interface{}) *gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMessageAsync", reflect.TypeOf((*MockRMQClient)(nil).SendMessageAsync),
ctx, brokerAddrs, brokerName, request, msgs, f)
+}
+
+// SendMessageOneWay mocks base method
+func (m *MockRMQClient) SendMessageOneWay(ctx context.Context, brokerAddrs string, request
*SendMessageRequest, msgs []*primitive.Message) (*primitive.SendResult, error) {
+	ret := m.ctrl.Call(m, "SendMessageOneWay", ctx, brokerAddrs, request, msgs)
+	ret0, _ := ret[0].(*primitive.SendResult)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// SendMessageOneWay indicates an expected call of SendMessageOneWay
+func (mr *MockRMQClientMockRecorder) SendMessageOneWay(ctx, brokerAddrs, request, msgs interface{})
*gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SendMessageOneWay", reflect.TypeOf((*MockRMQClient)(nil).SendMessageOneWay),
ctx, brokerAddrs, request, msgs)
+}
+
+// ProcessSendResponse mocks base method
+func (m *MockRMQClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand,
resp *primitive.SendResult, msgs ...*primitive.Message) {
+	varargs := []interface{}{brokerName, cmd, resp}
+	for _, a := range msgs {
+		varargs = append(varargs, a)
+	}
+	m.ctrl.Call(m, "ProcessSendResponse", varargs...)
+}
+
+// ProcessSendResponse indicates an expected call of ProcessSendResponse
+func (mr *MockRMQClientMockRecorder) ProcessSendResponse(brokerName, cmd, resp interface{},
msgs ...interface{}) *gomock.Call {
+	varargs := append([]interface{}{brokerName, cmd, resp}, msgs...)
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProcessSendResponse", reflect.TypeOf((*MockRMQClient)(nil).ProcessSendResponse),
varargs...)
+}
+
+// RegisterConsumer mocks base method
+func (m *MockRMQClient) RegisterConsumer(group string, consumer InnerConsumer) error {
+	ret := m.ctrl.Call(m, "RegisterConsumer", group, consumer)
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// RegisterConsumer indicates an expected call of RegisterConsumer
+func (mr *MockRMQClientMockRecorder) RegisterConsumer(group, consumer interface{}) *gomock.Call
{
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterConsumer", reflect.TypeOf((*MockRMQClient)(nil).RegisterConsumer),
group, consumer)
+}
+
+// UnregisterConsumer mocks base method
+func (m *MockRMQClient) UnregisterConsumer(group string) {
+	m.ctrl.Call(m, "UnregisterConsumer", group)
+}
+
+// UnregisterConsumer indicates an expected call of UnregisterConsumer
+func (mr *MockRMQClientMockRecorder) UnregisterConsumer(group interface{}) *gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UnregisterConsumer", reflect.TypeOf((*MockRMQClient)(nil).UnregisterConsumer),
group)
+}
+
+// PullMessage mocks base method
+func (m *MockRMQClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest)
(*primitive.PullResult, error) {
+	ret := m.ctrl.Call(m, "PullMessage", ctx, brokerAddrs, request)
+	ret0, _ := ret[0].(*primitive.PullResult)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// PullMessage indicates an expected call of PullMessage
+func (mr *MockRMQClientMockRecorder) PullMessage(ctx, brokerAddrs, request interface{}) *gomock.Call
{
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PullMessage", reflect.TypeOf((*MockRMQClient)(nil).PullMessage),
ctx, brokerAddrs, request)
+}
+
+// PullMessageAsync mocks base method
+func (m *MockRMQClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request
*PullMessageRequest, f func(*primitive.PullResult)) error {
+	ret := m.ctrl.Call(m, "PullMessageAsync", ctx, brokerAddrs, request, f)
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// PullMessageAsync indicates an expected call of PullMessageAsync
+func (mr *MockRMQClientMockRecorder) PullMessageAsync(ctx, brokerAddrs, request, f interface{})
*gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PullMessageAsync", reflect.TypeOf((*MockRMQClient)(nil).PullMessageAsync),
ctx, brokerAddrs, request, f)
+}
+
+// RebalanceImmediately mocks base method
+func (m *MockRMQClient) RebalanceImmediately() {
+	m.ctrl.Call(m, "RebalanceImmediately")
+}
+
+// RebalanceImmediately indicates an expected call of RebalanceImmediately
+func (mr *MockRMQClientMockRecorder) RebalanceImmediately() *gomock.Call {
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RebalanceImmediately", reflect.TypeOf((*MockRMQClient)(nil).RebalanceImmediately))
+}
+
+// UpdatePublishInfo mocks base method
+func (m *MockRMQClient) UpdatePublishInfo(topic string, data *TopicRouteData) {
+	m.ctrl.Call(m, "UpdatePublishInfo", topic, data)
+}
+
+// UpdatePublishInfo indicates an expected call of UpdatePublishInfo
+func (mr *MockRMQClientMockRecorder) UpdatePublishInfo(topic, data interface{}) *gomock.Call
{
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdatePublishInfo", reflect.TypeOf((*MockRMQClient)(nil).UpdatePublishInfo),
topic, data)
+}
diff --git a/internal/route.go b/internal/route.go
index 074cf90..dbb39c1 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -157,6 +157,13 @@ func UpdateTopicRouteInfo(topic string) *TopicRouteData {
 	return routeData.clone()
 }
 
+// just for test
+func AddBroker(routeData *TopicRouteData) {
+	for _, brokerData := range routeData.BrokerDataList {
+		brokerAddressesMap.Store(brokerData.BrokerName, brokerData)
+	}
+}
+
 func FindBrokerAddrByTopic(topic string) string {
 	v, exist := routeDataMap.Load(topic)
 	if !exist {
diff --git a/producer/producer.go b/producer/producer.go
index 47c4026..ab501de 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -33,6 +33,7 @@ import (
 var (
 	ErrTopicEmpty   = errors.New("topic is nil")
 	ErrMessageEmpty = errors.New("message is nil")
+	ErrNotRunning   = errors.New("producer not started")
 )
 
 func NewDefaultProducer(opts ...Option) (*defaultProducer, error) {
@@ -84,7 +85,7 @@ func getChainedInterceptor(interceptors []primitive.Interceptor, cur int,
finalI
 
 type defaultProducer struct {
 	group       string
-	client      *internal.RMQClient
+	client      internal.RMQClient
 	state       internal.ServiceState
 	options     producerOptions
 	publishInfo sync.Map
@@ -100,10 +101,16 @@ func (p *defaultProducer) Start() error {
 }
 
 func (p *defaultProducer) Shutdown() error {
+	p.state = internal.StateShutdown
+	p.client.Shutdown()
 	return nil
 }
 
 func (p *defaultProducer) checkMsg(msg *primitive.Message) error {
+	if p.state != internal.StateRunning {
+		return ErrNotRunning
+	}
+
 	if msg == nil {
 		return errors.New("message is nil")
 	}
@@ -242,6 +249,7 @@ func (p *defaultProducer) sendOneWay(ctx context.Context, msg *primitive.Message
 			err = _err
 			continue
 		}
+		return nil
 	}
 	return err
 }
diff --git a/producer/producer_test.go b/producer/producer_test.go
new file mode 100644
index 0000000..0dd564f
--- /dev/null
+++ b/producer/producer_test.go
@@ -0,0 +1,234 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package producer
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/apache/rocketmq-client-go/internal"
+	"github.com/apache/rocketmq-client-go/internal/remote"
+	"github.com/apache/rocketmq-client-go/primitive"
+	"github.com/golang/mock/gomock"
+	"github.com/stretchr/testify/assert"
+)
+
+const (
+	topic = "TopicTest"
+)
+
+func TestShutdown(t *testing.T) {
+	p, _ := NewDefaultProducer(
+		WithNameServer([]string{"127.0.0.1:9876"}),
+		WithRetry(2),
+		WithQueueSelector(NewManualQueueSelector()),
+	)
+
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+	client := internal.NewMockRMQClient(ctrl)
+	p.client = client
+
+	client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return()
+	client.EXPECT().Start().Return()
+	err := p.Start()
+	assert.Nil(t, err)
+
+	client.EXPECT().Shutdown().Return()
+	err = p.Shutdown()
+	assert.Nil(t, err)
+
+	ctx := context.Background()
+	msg := new(primitive.Message)
+
+	r, err := p.SendSync(ctx, msg)
+	assert.Equal(t, ErrNotRunning, err)
+	assert.Nil(t, r)
+
+	err = p.SendOneWay(ctx, msg)
+	assert.Equal(t, ErrNotRunning, err)
+	assert.Nil(t, r)
+
+	f := func(context.Context, *primitive.SendResult, error) {
+		assert.False(t, true, "should not  come in")
+	}
+	err = p.SendAsync(ctx, f, msg)
+	assert.Equal(t, ErrNotRunning, err)
+	assert.Nil(t, r)
+}
+
+func mockB4Send(p *defaultProducer) {
+	p.publishInfo.Store(topic, &internal.TopicPublishInfo{
+		HaveTopicRouterInfo: true,
+		MqList: []*primitive.MessageQueue{
+			{
+				Topic:      topic,
+				BrokerName: "aa",
+				QueueId:    0,
+			},
+		},
+	})
+	internal.AddBroker(&internal.TopicRouteData{
+		BrokerDataList: []*internal.BrokerData{
+			{
+				Cluster:    "cluster",
+				BrokerName: "aa",
+				BrokerAddresses: map[int64]string{
+					0: "1",
+				},
+			},
+		},
+	})
+}
+
+func TestSync(t *testing.T) {
+	p, _ := NewDefaultProducer(
+		WithNameServer([]string{"127.0.0.1:9876"}),
+		WithRetry(2),
+		WithQueueSelector(NewManualQueueSelector()),
+	)
+
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+	client := internal.NewMockRMQClient(ctrl)
+	p.client = client
+
+	client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return()
+	client.EXPECT().Start().Return()
+	err := p.Start()
+	assert.Nil(t, err)
+
+	ctx := context.Background()
+	msg := &primitive.Message{
+		Topic:      topic,
+		Body:       []byte("this is a message body"),
+		Properties: map[string]string{"key": "value"},
+		QueueID:    0,
+	}
+
+	expectedResp := &primitive.SendResult{
+		Status:      primitive.SendOK,
+		MsgID:       "111",
+		QueueOffset: 0,
+		OffsetMsgID: "0",
+	}
+
+	mockB4Send(p)
+
+	client.EXPECT().InvokeSync(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil, nil)
+	client.EXPECT().ProcessSendResponse(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Do(
+		func(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult, msgs ...*primitive.Message)
{
+			resp.Status = expectedResp.Status
+			resp.MsgID = expectedResp.MsgID
+			resp.QueueOffset = expectedResp.QueueOffset
+			resp.OffsetMsgID = expectedResp.OffsetMsgID
+		})
+	resp, err := p.SendSync(ctx, msg)
+	assert.Nil(t, err)
+	assert.Equal(t, expectedResp, resp)
+}
+
+func TestASync(t *testing.T) {
+	p, _ := NewDefaultProducer(
+		WithNameServer([]string{"127.0.0.1:9876"}),
+		WithRetry(2),
+		WithQueueSelector(NewManualQueueSelector()),
+	)
+
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+	client := internal.NewMockRMQClient(ctrl)
+	p.client = client
+
+	client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return()
+	client.EXPECT().Start().Return()
+	err := p.Start()
+	assert.Nil(t, err)
+
+	ctx := context.Background()
+	msg := &primitive.Message{
+		Topic:      topic,
+		Body:       []byte("this is a message body"),
+		Properties: map[string]string{"key": "value"},
+	}
+
+	expectedResp := &primitive.SendResult{
+		Status:      primitive.SendOK,
+		MsgID:       "111",
+		QueueOffset: 0,
+		OffsetMsgID: "0",
+	}
+
+	f := func(ctx context.Context, resp *primitive.SendResult, err error) {
+		assert.Nil(t, err)
+		assert.Equal(t, expectedResp, resp)
+	}
+
+	mockB4Send(p)
+
+	client.EXPECT().InvokeAsync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn(
+		func(addr string, request *remote.RemotingCommand,
+			timeoutMillis time.Duration, f func(*remote.RemotingCommand, error)) error {
+			// mock invoke callback
+			f(nil, nil)
+			return nil
+		})
+	client.EXPECT().ProcessSendResponse(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Do(
+		func(brokerName string, cmd *remote.RemotingCommand, resp *primitive.SendResult, msgs ...*primitive.Message)
{
+			resp.Status = expectedResp.Status
+			resp.MsgID = expectedResp.MsgID
+			resp.QueueOffset = expectedResp.QueueOffset
+			resp.OffsetMsgID = expectedResp.OffsetMsgID
+		})
+
+	err = p.SendAsync(ctx, f, msg)
+	assert.Nil(t, err)
+}
+
+func TestOneway(t *testing.T) {
+	p, _ := NewDefaultProducer(
+		WithNameServer([]string{"127.0.0.1:9876"}),
+		WithRetry(2),
+		WithQueueSelector(NewManualQueueSelector()),
+	)
+
+	ctrl := gomock.NewController(t)
+	defer ctrl.Finish()
+	client := internal.NewMockRMQClient(ctrl)
+	p.client = client
+
+	client.EXPECT().RegisterProducer(gomock.Any(), gomock.Any()).Return()
+	client.EXPECT().Start().Return()
+	err := p.Start()
+	assert.Nil(t, err)
+
+	ctx := context.Background()
+	msg := &primitive.Message{
+		Topic:      topic,
+		Body:       []byte("this is a message body"),
+		Properties: map[string]string{"key": "value"},
+	}
+
+	mockB4Send(p)
+
+	client.EXPECT().InvokeOneWay(gomock.Any(), gomock.Any(), gomock.Any()).Return(nil).AnyTimes()
+
+	err = p.SendOneWay(ctx, msg)
+	assert.Nil(t, err)
+}
diff --git a/producer/selector_test.go b/producer/selector_test.go
index 016db07..723c55b 100644
--- a/producer/selector_test.go
+++ b/producer/selector_test.go
@@ -20,6 +20,7 @@ package producer
 import (
 	"testing"
 
+	"github.com/apache/rocketmq-client-go/primitive"
 	"github.com/stretchr/testify/assert"
 )
 
@@ -27,10 +28,10 @@ func TestRoundRobin(t *testing.T) {
 	queues := 10
 	s := NewRoundRobinQueueSelector()
 
-	m := &Message{
+	m := &primitive.Message{
 		Topic: "test",
 	}
-	mrr := &Message{
+	mrr := &primitive.Message{
 		Topic: "rr",
 	}
 	for i := 0; i < 100; i++ {


Mime
View raw message