rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ding...@apache.org
Subject [rocketmq-client-go] branch native updated: [ISSUE #131]Update CI config
Date Mon, 29 Jul 2019 02:04:47 GMT
This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 40111a9  [ISSUE #131]Update CI config
40111a9 is described below

commit 40111a9253ae3c37c028fccf0d45db7c05c45320
Author: xujianhai666 <52450794+xujianhai666@users.noreply.github.com>
AuthorDate: Mon Jul 29 10:04:42 2019 +0800

    [ISSUE #131]Update CI config
    
    #131
---
 .gitignore                            |   4 +-
 .travis.yml                           |  19 ++---
 consumer/consumer_test.go             |   4 +-
 consumer/interceptor.go               |   2 +-
 consumer/lock.go                      |   3 +-
 consumer/push_consumer.go             |  24 +++---
 consumer/statistics.go                |   3 +-
 examples/producer/trace/main.go       |   2 +-
 go.mod                                |   2 -
 internal/client_test.go               |  45 ------------
 internal/mock_client.go               |   5 +-
 internal/remote/remote_client_test.go |  47 +++++++-----
 internal/route.go                     |   1 -
 internal/route_test.go                | 135 ----------------------------------
 primitive/result_test.go              |   3 +-
 producer/producer.go                  |   8 +-
 16 files changed, 66 insertions(+), 241 deletions(-)

diff --git a/.gitignore b/.gitignore
index cedd413..cb35ce2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,5 @@
 .idea
 go.mod
-go.sum
\ No newline at end of file
+go.sum
+vendor/
+coverage.txt
\ No newline at end of file
diff --git a/.travis.yml b/.travis.yml
index 1fbef35..82f760c 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,8 +1,8 @@
 language: go
 
 go:
-  - "1.10.x"
   - "1.11.x"
+  - "1.12.x"
 
 go_import_path: github.com/apache/rocketmq-client-go
 
@@ -12,16 +12,9 @@ env:
     - BROKER_ADDRESS=127.0.0.1:10911
     - TOPIC=test
     - GROUP=testGroup
+    - GO111MODULE=on
   matrix:
-  - OS_TYPE=centos OS_VERSION=7
-
-before_install:
-  - cd ${TRAVIS_HOME}
-  - wget https://opensource-rocketmq-client-us.oss-us-west-1.aliyuncs.com/cpp-client/linux/1.2.0/RHEL7.x/librocketmq.tar.gz
-  - tar -xzf librocketmq.tar.gz
-  - sudo cp librocketmq.so librocketmq.a /usr/local/lib/
-  - sudo cp -r rocketmq /usr/local/include/
-  - cd ${GOPATH}/src/github.com/apache/rocketmq-client-go
+    - OS_TYPE=centos OS_VERSION=7
 
 before_script:
   - cd ${TRAVIS_HOME}
@@ -36,5 +29,9 @@ before_script:
   - ./bin/mqadmin updateSubGroup -n ${NAME_SERVER_ADDRESS} -b ${BROKER_ADDRESS} -g ${GROUP}
 
 script:
-  - export LD_LIBRARY_PATH=/usr/local/lib
   - cd ${GOPATH}/src/github.com/apache/rocketmq-client-go
+  - go fmt ./... && [[ -z `git status -s` ]]
+  - go mod vendor && go test ./... -coverprofile=coverage.txt -covermode=atomic
+
+after_success:
+  - bash <(curl -s https://codecov.io/bash)
\ No newline at end of file
diff --git a/consumer/consumer_test.go b/consumer/consumer_test.go
index e433392..a837b24 100644
--- a/consumer/consumer_test.go
+++ b/consumer/consumer_test.go
@@ -26,7 +26,7 @@ import (
 
 func TestParseTimestamp(t *testing.T) {
 	layout := "20060102150405"
-	timestamp, err := time.ParseInLocation(layout, "20190430193409", time.Local)
+	timestamp, err := time.ParseInLocation(layout, "20190430193409", time.UTC)
 	assert.Nil(t, err)
-	assert.Equal(t, int64(1556624049), timestamp.Unix())
+	assert.Equal(t, int64(1556652849), timestamp.Unix())
 }
diff --git a/consumer/interceptor.go b/consumer/interceptor.go
index 260ac6f..6b050df 100644
--- a/consumer/interceptor.go
+++ b/consumer/interceptor.go
@@ -26,7 +26,7 @@ import (
 	"github.com/apache/rocketmq-client-go/primitive"
 )
 
-// WithTrace support rocketmq trace: https://github.com/apache/rocketmq/wiki/RIP-6-Message-Trace.

+// WithTrace support rocketmq trace: https://github.com/apache/rocketmq/wiki/RIP-6-Message-Trace.
 func WithTrace(traceCfg primitive.TraceConfig) Option {
 	return func(options *consumerOptions) {
 
diff --git a/consumer/lock.go b/consumer/lock.go
index d923f2c..2376400 100644
--- a/consumer/lock.go
+++ b/consumer/lock.go
@@ -28,8 +28,7 @@ type QueueLock struct {
 }
 
 func newQueueLock() *QueueLock {
-	return &QueueLock{
-	}
+	return &QueueLock{}
 }
 
 func (ql QueueLock) fetchLock(queue primitive.MessageQueue) sync.Locker {
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 7b4d65e..9878c56 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -696,10 +696,10 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
 
 			var err error
 			msgCtx := &primitive.ConsumeMessageContext{
-				Properties: make(map[string]string),
+				Properties:    make(map[string]string),
 				ConsumerGroup: pc.consumerGroup,
-				MQ: mq,
-				Msgs: msgs,
+				MQ:            mq,
+				Msgs:          msgs,
 			}
 			ctx := context.Background()
 			ctx = primitive.WithConsumerCtx(ctx, msgCtx)
@@ -717,7 +717,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
 				msgCtx.Properties[primitive.PropCtxType] = string(primitive.TimeoutReturn)
 			} else if result == ConsumeSuccess {
 				msgCtx.Properties[primitive.PropCtxType] = string(primitive.SuccessReturn)
-			} else if result == ConsumeRetryLater{
+			} else if result == ConsumeRetryLater {
 				msgCtx.Properties[primitive.PropCtxType] = string(primitive.FailedReturn)
 			}
 
@@ -812,10 +812,10 @@ func (pc *pushConsumer) consumeMessageOrderly(pq *processQueue, mq *primitive.Me
 
 			ctx := context.Background()
 			msgCtx := &primitive.ConsumeMessageContext{
-				Properties: make(map[string]string),
+				Properties:    make(map[string]string),
 				ConsumerGroup: pc.consumerGroup,
-				MQ: mq,
-				Msgs: msgs,
+				MQ:            mq,
+				Msgs:          msgs,
 			}
 			ctx = primitive.WithConsumerCtx(ctx, msgCtx)
 			ctx = primitive.WithMethod(ctx, primitive.ConsumerPush)
@@ -853,10 +853,10 @@ func (pc *pushConsumer) consumeMessageOrderly(pq *processQueue, mq *primitive.Me
 				case ConsumeSuccess:
 					commitOffset = pq.commit()
 				case SuspendCurrentQueueAMoment:
-					if (pc.checkReconsumeTimes(msgs)) {
+					if pc.checkReconsumeTimes(msgs) {
 						pq.putMessage(msgs...)
 						time.Sleep(time.Duration(orderlyCtx.SuspendCurrentQueueTimeMillis) * time.Millisecond)
-						continueConsume = false;
+						continueConsume = false
 					} else {
 						commitOffset = pq.commit()
 					}
@@ -866,15 +866,15 @@ func (pc *pushConsumer) consumeMessageOrderly(pq *processQueue, mq *primitive.Me
 				switch result {
 				case ConsumeSuccess:
 				case Commit:
-					commitOffset = pq.commit();
+					commitOffset = pq.commit()
 				case Rollback:
 					// pq.rollback
 					time.Sleep(time.Duration(orderlyCtx.SuspendCurrentQueueTimeMillis) * time.Millisecond)
 					continueConsume = false
 				case SuspendCurrentQueueAMoment:
-					if (pc.checkReconsumeTimes(msgs)) {
+					if pc.checkReconsumeTimes(msgs) {
 						time.Sleep(time.Duration(orderlyCtx.SuspendCurrentQueueTimeMillis) * time.Millisecond)
-						continueConsume = false;
+						continueConsume = false
 					}
 				default:
 				}
diff --git a/consumer/statistics.go b/consumer/statistics.go
index f01364a..b85e056 100644
--- a/consumer/statistics.go
+++ b/consumer/statistics.go
@@ -19,10 +19,11 @@ package consumer
 
 import (
 	"container/list"
-	"github.com/apache/rocketmq-client-go/rlog"
 	"sync"
 	"sync/atomic"
 	"time"
+
+	"github.com/apache/rocketmq-client-go/rlog"
 )
 
 var (
diff --git a/examples/producer/trace/main.go b/examples/producer/trace/main.go
index 266c783..ac6983f 100644
--- a/examples/producer/trace/main.go
+++ b/examples/producer/trace/main.go
@@ -31,7 +31,7 @@ import (
 func main() {
 	namesrvs := []string{"127.0.0.1:9876"}
 	traceCfg := primitive.TraceConfig{
-		Access:   primitive.Local,
+		Access: primitive.Local,
 	}
 
 	p, _ := rocketmq.NewProducer(
diff --git a/go.mod b/go.mod
index e1b0d88..f271d91 100644
--- a/go.mod
+++ b/go.mod
@@ -1,7 +1,5 @@
 module github.com/apache/rocketmq-client-go
 
-go 1.12
-
 require (
 	github.com/emirpasic/gods v1.12.0
 	github.com/golang/mock v1.3.1
diff --git a/internal/client_test.go b/internal/client_test.go
deleted file mode 100644
index 7814ddd..0000000
--- a/internal/client_test.go
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package internal
-
-import (
-	"context"
-	"testing"
-)
-
-func TestRMQClient_PullMessage(t *testing.T) {
-	client := GetOrNewRocketMQClient(ClientOptions{})
-	req := &PullMessageRequest{
-		ConsumerGroup:  "testGroup",
-		Topic:          "wenfeng",
-		QueueId:        0,
-		QueueOffset:    0,
-		MaxMsgNums:     32,
-		SysFlag:        0x1 << 2,
-		SubExpression:  "*",
-		ExpressionType: "TAG",
-	}
-	res, err := client.PullMessage(context.Background(), "127.0.0.1:10911", req)
-	if err != nil {
-		t.Fatal(err.Error())
-	}
-
-	for _, a := range res.GetMessageExts() {
-		t.Log(string(a.Body))
-	}
-}
diff --git a/internal/mock_client.go b/internal/mock_client.go
index 825c9e4..d19f0e9 100644
--- a/internal/mock_client.go
+++ b/internal/mock_client.go
@@ -6,11 +6,12 @@ package internal
 
 import (
 	context "context"
+	reflect "reflect"
+	time "time"
+
 	remote "github.com/apache/rocketmq-client-go/internal/remote"
 	primitive "github.com/apache/rocketmq-client-go/primitive"
 	gomock "github.com/golang/mock/gomock"
-	reflect "reflect"
-	time "time"
 )
 
 // MockInnerProducer is a mock of InnerProducer interface
diff --git a/internal/remote/remote_client_test.go b/internal/remote/remote_client_test.go
index 06dcedc..ff6900e 100644
--- a/internal/remote/remote_client_test.go
+++ b/internal/remote/remote_client_test.go
@@ -19,7 +19,6 @@ package remote
 import (
 	"bytes"
 	"errors"
-	"github.com/apache/rocketmq-client-go/internal/utils"
 	"math/rand"
 	"net"
 	"reflect"
@@ -27,6 +26,8 @@ import (
 	"testing"
 	"time"
 
+	"github.com/apache/rocketmq-client-go/internal/utils"
+
 	"github.com/stretchr/testify/assert"
 )
 
@@ -86,11 +87,8 @@ func TestResponseFutureIsTimeout(t *testing.T) {
 	if future.isTimeout() != false {
 		t.Errorf("wrong ResponseFuture's istimeout. want=%t, got=%t", false, future.isTimeout())
 	}
-	time.Sleep(time.Duration(700) * time.Millisecond)
-	if future.isTimeout() != true {
-		t.Errorf("wrong ResponseFuture's istimeout. want=%t, got=%t", true, future.isTimeout())
-	}
-
+	time.Sleep(time.Duration(1000) * time.Millisecond)
+	assert.True(t, future.isTimeout(), "ResponseFuture's istimeout should be true")
 }
 
 func TestResponseFutureWaitResponse(t *testing.T) {
@@ -160,6 +158,8 @@ func TestCreateScanner(t *testing.T) {
 }
 
 func TestInvokeSync(t *testing.T) {
+	addr := ":3004"
+
 	clientSendRemtingCommand := NewRemotingCommand(10, nil, []byte("Hello RocketMQ"))
 	serverSendRemotingCommand := NewRemotingCommand(20, nil, []byte("Welcome native"))
 	serverSendRemotingCommand.Opaque = clientSendRemtingCommand.Opaque
@@ -168,19 +168,23 @@ func TestInvokeSync(t *testing.T) {
 	wg.Add(1)
 	client := NewRemotingClient()
 	go func() {
-		receiveCommand, err := client.InvokeSync(":3000",
+		receiveCommand, err := client.InvokeSync(addr,
 			clientSendRemtingCommand, time.Second)
 		if err != nil {
 			t.Fatalf("failed to invoke synchronous. %s", err)
 		} else {
-			if !reflect.DeepEqual(&receiveCommand, &serverSendRemotingCommand) {
-				t.Errorf("remotingCommand prased in client is different from server. ")
-			}
+			assert.Equal(t, len(receiveCommand.ExtFields), 0)
+			assert.Equal(t, len(serverSendRemotingCommand.ExtFields), 0)
+			// in order to avoid the difference of ExtFields between the receiveCommand and serverSendRemotingCommand
+			// the ExtFields in receiveCommand is map[string]string(nil), but serverSendRemotingCommand is map[string]string{}
+			receiveCommand.ExtFields = nil
+			serverSendRemotingCommand.ExtFields = nil
+			assert.Equal(t, receiveCommand, serverSendRemotingCommand, "remotingCommand prased in client is different from server.")
 		}
 		wg.Done()
 	}()
 
-	l, err := net.Listen("tcp", ":3000")
+	l, err := net.Listen("tcp", addr)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -209,13 +213,15 @@ func TestInvokeSync(t *testing.T) {
 			if err != nil {
 				t.Fatalf("failed to write body to conneciton.")
 			}
-			return
+			goto done
 		}
 	}
-	wg.Done()
+done:
+	wg.Wait()
 }
 
 func TestInvokeAsync(t *testing.T) {
+	addr := ":3006"
 	var wg sync.WaitGroup
 	cnt := 50
 	wg.Add(cnt)
@@ -225,7 +231,7 @@ func TestInvokeAsync(t *testing.T) {
 			time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)
 			t.Logf("[Send: %d] asychronous message", index)
 			sendRemotingCommand := randomNewRemotingCommand()
-			err := client.InvokeAsync(":3000", sendRemotingCommand, time.Second, func(r *ResponseFuture) {
+			err := client.InvokeAsync(addr, sendRemotingCommand, time.Second, func(r *ResponseFuture) {
 				t.Logf("[Receive: %d] asychronous message response", index)
 				if string(sendRemotingCommand.Body) != string(r.ResponseCommand.Body) {
 					t.Errorf("wrong response message. want=%s, got=%s", string(sendRemotingCommand.Body),
@@ -239,7 +245,7 @@ func TestInvokeAsync(t *testing.T) {
 
 		}(i)
 	}
-	l, err := net.Listen("tcp", ":3000")
+	l, err := net.Listen("tcp", addr)
 	if err != nil {
 		t.Fatalf("failed to create tcp network. %s", err)
 	}
@@ -276,6 +282,8 @@ done:
 }
 
 func TestInvokeAsyncTimeout(t *testing.T) {
+	addr := ":3002"
+
 	clientSendRemtingCommand := NewRemotingCommand(10, nil, []byte("Hello RocketMQ"))
 	serverSendRemotingCommand := NewRemotingCommand(20, nil, []byte("Welcome native"))
 	serverSendRemotingCommand.Opaque = clientSendRemtingCommand.Opaque
@@ -289,7 +297,7 @@ func TestInvokeAsyncTimeout(t *testing.T) {
 	clientSend.Add(1)
 	go func() {
 		clientSend.Wait()
-		err := client.InvokeAsync(":3000", clientSendRemtingCommand,
+		err := client.InvokeAsync(addr, clientSendRemtingCommand,
 			time.Duration(1000), func(r *ResponseFuture) {
 				assert.NotNil(t, r.Err)
 				assert.Equal(t, utils.ErrRequestTimeout, r.Err)
@@ -298,7 +306,7 @@ func TestInvokeAsyncTimeout(t *testing.T) {
 		assert.Nil(t, err, "failed to invokeSync.")
 	}()
 
-	l, err := net.Listen("tcp", ":3000")
+	l, err := net.Listen("tcp", addr)
 	assert.Nil(t, err)
 	defer l.Close()
 	clientSend.Done()
@@ -323,20 +331,21 @@ done:
 }
 
 func TestInvokeOneWay(t *testing.T) {
+	addr := ":3008"
 	clientSendRemtingCommand := NewRemotingCommand(10, nil, []byte("Hello RocketMQ"))
 
 	var wg sync.WaitGroup
 	wg.Add(1)
 	client := NewRemotingClient()
 	go func() {
-		err := client.InvokeOneWay(":3000", clientSendRemtingCommand, 3*time.Second)
+		err := client.InvokeOneWay(addr, clientSendRemtingCommand, 3*time.Second)
 		if err != nil {
 			t.Fatalf("failed to invoke synchronous. %s", err)
 		}
 		wg.Done()
 	}()
 
-	l, err := net.Listen("tcp", ":3000")
+	l, err := net.Listen("tcp", addr)
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/internal/route.go b/internal/route.go
index 6053fd8..9c6caa9 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -65,7 +65,6 @@ var (
 	//subscribeInfoMap sync.Map
 	routeDataMap sync.Map
 	lockNamesrv  sync.Mutex
-
 )
 
 func cleanOfflineBroker() {
diff --git a/internal/route_test.go b/internal/route_test.go
deleted file mode 100644
index 9c3241e..0000000
--- a/internal/route_test.go
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package internal
-
-import (
-	. "github.com/smartystreets/goconvey/convey"
-	"testing"
-)
-
-const (
-	topic = "TopicTest"
-)
-
-func init() {
-	srvs := []string{"127.0.0.1:9876"}
-	namesrv, err := NewNamesrv(srvs...)
-	if err != nil {
-		panic("register namesrv fail")
-	}
-	RegisterNamsrv(namesrv)
-}
-
-func TestAddBroker(t *testing.T) {
-	Convey("Given a starting topic", t, func() {
-		remoteRouteData, err := queryTopicRouteInfoFromServer(topic)
-		So(err, ShouldBeNil)
-		AddBroker(remoteRouteData)
-
-		Convey("brokerData from brokerAddressesMap by brokeName should be deep equal remoteBrokeData from server", func() {
-			for _, remoteBrokerData := range remoteRouteData.BrokerDataList {
-				brokerName := remoteBrokerData.BrokerName
-				brokerData, ok := brokerAddressesMap.Load(brokerName)
-				So(ok, ShouldBeTrue)
-				So(brokerData, ShouldResemble, remoteBrokerData)
-			}
-		})
-	})
-}
-
-func TestUpdateTopicRouteInfo(t *testing.T) {
-	Convey("Given a starting topic", t, func() {
-		updatedRouteData := UpdateTopicRouteInfo(topic)
-
-		Convey("updatedRouteData should be deep equal remoteRouteData", func() {
-			remoteRouteData, err := queryTopicRouteInfoFromServer(topic)
-			So(err, ShouldBeNil)
-			So(updatedRouteData, ShouldResemble, remoteRouteData)
-		})
-		Convey("updatedRouteData should be deep equal localRouteData", func() {
-			localRouteData, exist := routeDataMap.Load(topic)
-			So(exist, ShouldBeTrue)
-			So(updatedRouteData, ShouldResemble, localRouteData)
-		})
-	})
-}
-
-func TestFindBrokerAddrByTopic(t *testing.T) {
-	Convey("Given a starting topic", t, func() {
-		addr := FindBrokerAddrByTopic(topic)
-		remoteRouteData, err := queryTopicRouteInfoFromServer(topic)
-		So(err, ShouldBeNil)
-		brokerAddrList := remoteRouteData.BrokerDataList
-
-		Convey("addr from FindBrokerAddrByTopic should be contained in remoteRouteData", func() {
-			flag := false
-			for _, brokerData := range brokerAddrList {
-				for _, ba := range brokerData.BrokerAddresses {
-					if ba == addr {
-						flag = true
-						break
-					}
-				}
-			}
-			So(flag, ShouldBeTrue)
-		})
-	})
-}
-
-func TestFindBrokerAddrByName(t *testing.T) {
-	Convey("Given a starting topic", t, func() {
-		remoteRouteData, err := queryTopicRouteInfoFromServer(topic)
-		So(err, ShouldBeNil)
-		brokerAddrList := remoteRouteData.BrokerDataList
-
-		Convey("addr from FindBrokerAddrByName should be equal remoteBrokerAddr from server", func() {
-			for _, brokerData := range brokerAddrList {
-				brokerName := brokerData.BrokerName
-				addr := FindBrokerAddrByName(brokerName)
-				remoteBrokerAddr := brokerData.BrokerAddresses[MasterId]
-				So(addr, ShouldEqual, remoteBrokerAddr)
-			}
-		})
-	})
-}
-
-func TestFindBrokerAddressInSubscribe(t *testing.T) {
-	Convey("Given a starting topic", t, func() {
-		remoteRouteData, err := queryTopicRouteInfoFromServer(topic)
-		So(err, ShouldBeNil)
-		brokerAddrList := remoteRouteData.BrokerDataList
-
-		Convey("range BrokerAddress and compare them in turn", func() {
-			for _, brokerData := range brokerAddrList {
-				brokerName := brokerData.BrokerName
-				for id, ba := range brokerData.BrokerAddresses {
-					findBrokerRes := FindBrokerAddressInSubscribe(brokerName, id, true)
-					res := &FindBrokerResult{
-						BrokerAddr:    ba,
-						Slave:         false,
-						BrokerVersion: findBrokerVersion(brokerName, ba),
-					}
-					if id != MasterId {
-						res.Slave = true
-					}
-					So(findBrokerRes, ShouldResemble, res)
-				}
-			}
-		})
-	})
-}
diff --git a/primitive/result_test.go b/primitive/result_test.go
index a3fd5ab..9324aff 100644
--- a/primitive/result_test.go
+++ b/primitive/result_test.go
@@ -18,7 +18,6 @@ limitations under the License.
 package primitive
 
 import (
-	"strings"
 	"testing"
 
 	. "github.com/smartystreets/goconvey/convey"
@@ -43,7 +42,7 @@ func TestCreateMessageId(t *testing.T) {
 		id := createMessageId(b, port, offset)
 
 		Convey("generated messageId should be equal to expected", func() {
-			assert.Equal(t, strings.ToLower("0A5DE93A00002A9F0000000000430154"), id)
+			assert.Equal(t, "0A5DE93A00002A9F0000000000430154", id)
 		})
 	})
 
diff --git a/producer/producer.go b/producer/producer.go
index 13f7952..698924c 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -131,11 +131,11 @@ func (p *defaultProducer) SendSync(ctx context.Context, msg *primitive.Message)
 	if p.interceptor != nil {
 		primitive.WithMethod(ctx, primitive.SendSync)
 		producerCtx := &primitive.ProducerCtx{
-			ProducerGroup: p.group,
+			ProducerGroup:     p.group,
 			CommunicationMode: primitive.SendSync,
-			BornHost: utils.LocalIP,
-			Message: *msg,
-			SendResult: resp,
+			BornHost:          utils.LocalIP,
+			Message:           *msg,
+			SendResult:        resp,
 		}
 		ctx = primitive.WithProducerCtx(ctx, producerCtx)
 


Mime
View raw message