rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ding...@apache.org
Subject [rocketmq-client-go] branch native updated: Pull Consumer (#44)
Date Mon, 18 Mar 2019 06:43:33 GMT
This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 93d4753  Pull Consumer (#44)
93d4753 is described below

commit 93d47535c09e68908eb0123b003f41227a975c3b
Author: wenfeng <sxian.wang@gmail.com>
AuthorDate: Mon Mar 18 14:43:29 2019 +0800

    Pull Consumer (#44)
    
    * fix bugs of remote & refactor logger
    
    * consumer can pull message from broker
---
 consumer.go                           |  41 ++++++++-
 consumer_test.go                      |  18 ++++
 core/producer.go                      |  10 +--
 core/push_consumer.go                 |   4 +-
 go.mod                                |   5 +-
 go.sum                                |   6 ++
 kernel/client.go                      |  70 ++++++++++++---
 kernel/model.go                       | 119 +++++++++++++++++++++++--
 kernel/request.go                     |  27 +++++-
 kernel/response.go                    |  13 +--
 kernel/route.go                       | 160 ++++++++++++++++++++--------------
 kernel/{response.go => route_test.go} |  27 +-----
 remote/client.go                      |   7 +-
 remote/codec.go                       |  44 +++++-----
 remote/codec_test.go                  |   1 -
 {utils => rlog}/log.go                |  71 ++++++++++++---
 utils/helper.go                       |  12 ++-
 utils/helper_test.go                  |   9 +-
 utils/ring_buffer.go                  |  19 ++--
 utils/ring_buffer_test.go             |  33 +++----
 20 files changed, 496 insertions(+), 200 deletions(-)

diff --git a/consumer.go b/consumer.go
index acb8f1d..5119f93 100644
--- a/consumer.go
+++ b/consumer.go
@@ -22,18 +22,25 @@ import (
 	"errors"
 	"fmt"
 	"github.com/apache/rocketmq-client-go/kernel"
+	"github.com/apache/rocketmq-client-go/rlog"
 	"strconv"
 	"sync"
+	"sync/atomic"
 	"time"
 )
 
 type Consumer interface {
+	Start()
 	Pull(topic, expression string, numbers int) (*kernel.PullResult, error)
 	SubscribeWithChan(topic, expression string) (chan *kernel.Message, error)
 	SubscribeWithFunc(topic, expression string, f func(msg *kernel.Message) ConsumeResult) error
 	ACK(msg *kernel.Message, result ConsumeResult)
 }
 
+var (
+	queueCounterTable sync.Map
+)
+
 type ConsumeResult int
 
 type ConsumerType int
@@ -42,6 +49,8 @@ const (
 	Original ConsumerType = iota
 	Orderly
 	Transaction
+
+	SubAll = "*"
 )
 
 type ConsumerConfig struct {
@@ -65,9 +74,12 @@ type defaultConsumer struct {
 	config ConsumerConfig
 }
 
+func (c *defaultConsumer) Start() {
+	c.state = kernel.Running
+}
+
 func (c *defaultConsumer) Pull(topic, expression string, numbers int) (*kernel.PullResult, error) {
 	mq := getNextQueueOf(topic)
-
 	if mq == nil {
 		return nil, fmt.Errorf("prepard to pull topic: %s, but no queue is founded", topic)
 	}
@@ -210,11 +222,33 @@ func processPullResult(mq *kernel.MessageQueue, result *kernel.PullResult, data
 }
 
 func getSubscriptionData(mq *kernel.MessageQueue, exp string) *kernel.SubscriptionData {
-	return nil
+	subData := &kernel.SubscriptionData{
+		Topic: mq.Topic,
+	}
+	if exp == "" || exp == SubAll {
+		subData.SubString = SubAll
+	} else {
+		// TODO
+	}
+	return subData
 }
 
 func getNextQueueOf(topic string) *kernel.MessageQueue {
-	return nil
+	queues, err := kernel.FetchSubscribeMessageQueues(topic)
+	if err != nil && len(queues) > 0 {
+		rlog.Error(err.Error())
+		return nil
+	}
+	var index int64
+	v, exist := queueCounterTable.Load(topic)
+	if !exist {
+		index = -1
+		queueCounterTable.Store(topic, 0)
+	} else {
+		index = v.(int64)
+	}
+
+	return queues[int(atomic.AddInt64(&index, 1))%len(queues)]
 }
 
 func buildSysFlag(commitOffset, suspend, subscription, classFilter bool) int32 {
@@ -248,7 +282,6 @@ func tryFindBroker(mq *kernel.MessageQueue) *kernel.FindBrokerResult {
 	if result == nil {
 		kernel.UpdateTopicRouteInfo(mq.Topic)
 	}
-
 	return kernel.FindBrokerAddressInSubscribe(mq.BrokerName, recalculatePullFromWhichNode(mq), false)
 }
 
diff --git a/consumer_test.go b/consumer_test.go
new file mode 100644
index 0000000..aaba3cb
--- /dev/null
+++ b/consumer_test.go
@@ -0,0 +1,18 @@
+package rocketmq
+
+import (
+	"fmt"
+	"testing"
+)
+
+func TestDefaultConsumer_Pull(t *testing.T) {
+	consumer := NewConsumer(ConsumerConfig{
+		GroupName: "testGroup",
+	})
+	consumer.Start()
+	result, err := consumer.Pull("test", "*", 32)
+	if err != nil {
+		t.Fatal(err.Error())
+	}
+	fmt.Println(len(result.GetMessageExts()))
+}
diff --git a/core/producer.go b/core/producer.go
index 7df57e6..22b49ed 100644
--- a/core/producer.go
+++ b/core/producer.go
@@ -33,7 +33,7 @@ int queueSelectorCallback_cgo(int size, CMessage *msg, void *selectorKey) {
 import "C"
 import (
 	"errors"
-	log "github.com/sirupsen/logrus"
+	"github.com/apache/rocketmq-client-go/rlog"
 	"unsafe"
 )
 
@@ -211,7 +211,7 @@ func (p *defaultProducer) SendMessageSync(msg *Message) (*SendResult, error) {
 	err := rmqError(C.SendMessageSync(p.cproduer, cmsg, &sr))
 
 	if err != NIL {
-		log.Warnf("send message error, error is: %s", err.Error())
+		rlog.Warnf("send message error, error is: %s", err.Error())
 		return nil, err
 	}
 
@@ -237,7 +237,7 @@ func (p *defaultProducer) SendMessageOrderly(msg *Message, selector MessageQueue
 		&sr))
 
 	if err != NIL {
-		log.Warnf("send message orderly error, error is: %s", err.Error())
+		rlog.Warnf("send message orderly error, error is: %s", err.Error())
 		return nil, err
 	}
 
@@ -254,10 +254,10 @@ func (p *defaultProducer) SendMessageOneway(msg *Message) error {
 
 	err := rmqError(C.SendMessageOneway(p.cproduer, cmsg))
 	if err != NIL {
-		log.Warnf("send message with oneway error, error is: %s", err.Error())
+		rlog.Warnf("send message with oneway error, error is: %s", err.Error())
 		return err
 	}
 
-	log.Debugf("Send Message: %s with oneway success.", msg.String())
+	rlog.Debugf("Send Message: %s with oneway success.", msg.String())
 	return nil
 }
diff --git a/core/push_consumer.go b/core/push_consumer.go
index c222587..afb0d06 100644
--- a/core/push_consumer.go
+++ b/core/push_consumer.go
@@ -34,7 +34,7 @@ import "C"
 import (
 	"errors"
 	"fmt"
-	log "github.com/sirupsen/logrus"
+	"github.com/apache/rocketmq-client-go/rlog"
 	"sync"
 	"unsafe"
 )
@@ -226,6 +226,6 @@ func (c *defaultPushConsumer) Subscribe(topic, expression string, consumeFunc fu
 		return err
 	}
 	c.funcsMap.Store(topic, consumeFunc)
-	log.Infof("subscribe topic[%s] with expression[%s] successfully.", topic, expression)
+	rlog.Infof("subscribe topic[%s] with expression[%s] successfully.", topic, expression)
 	return nil
 }
diff --git a/go.mod b/go.mod
index fa910f6..2701b7c 100644
--- a/go.mod
+++ b/go.mod
@@ -1,11 +1,14 @@
 module github.com/apache/rocketmq-client-go
 
-go 1.12
+go 1.11
 
 require (
 	github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc // indirect
 	github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // indirect
 	github.com/sirupsen/logrus v1.3.0
 	github.com/stretchr/testify v1.3.0
+	github.com/tidwall/gjson v1.2.1
+	github.com/tidwall/match v1.0.1 // indirect
+	github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51 // indirect
 	gopkg.in/alecthomas/kingpin.v2 v2.2.6
 )
diff --git a/go.sum b/go.sum
index 0b0e0c1..7a45ece 100644
--- a/go.sum
+++ b/go.sum
@@ -16,6 +16,12 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
 github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/tidwall/gjson v1.2.1 h1:j0efZLrZUvNerEf6xqoi0NjWMK5YlLrR7Guo/dxY174=
+github.com/tidwall/gjson v1.2.1/go.mod h1:c/nTNbUr0E0OrXEhq1pwa8iEgc2DOt4ZZqAt1HtCkPA=
+github.com/tidwall/match v1.0.1 h1:PnKP62LPNxHKTwvHHZZzdOAOCtsJTjo6dZLCwpKm5xc=
+github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E=
+github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51 h1:BP2bjP495BBPaBcS5rmqviTfrOkN5rO5ceKAMRZCRFc=
+github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk=
 golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I=
 golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
diff --git a/kernel/client.go b/kernel/client.go
index 6e4808a..f9185ac 100644
--- a/kernel/client.go
+++ b/kernel/client.go
@@ -20,7 +20,9 @@ package kernel
 import (
 	"context"
 	"errors"
+	"fmt"
 	"github.com/apache/rocketmq-client-go/remote"
+	"github.com/apache/rocketmq-client-go/rlog"
 	"github.com/apache/rocketmq-client-go/utils"
 	"os"
 	"strconv"
@@ -34,7 +36,6 @@ const (
 )
 
 var (
-	log                           = utils.RLog
 	namesrvAddrs                  = os.Getenv("rocketmq.namesrv.addr")
 	clientIP                      = utils.LocalIP()
 	instanceName                  = os.Getenv("rocketmq.client.name")
@@ -71,10 +72,10 @@ type InnerConsumer interface {
 // SendMessage with batch by sync
 func SendMessageSync(ctx context.Context, brokerAddrs, brokerName string, request *SendMessageRequest,
 	msgs []*Message) (*SendResult, error) {
-	cmd := remote.NewRemotingCommand(SendBatchMessage, request, encodeMessages(msgs))
-	response, err := remote.InvokeSync(brokerAddrs, cmd, 3 * time.Second)
+	cmd := remote.NewRemotingCommand(ReqSendBatchMessage, request, encodeMessages(msgs))
+	response, err := remote.InvokeSync(brokerAddrs, cmd, 3*time.Second)
 	if err != nil {
-		log.Warningf("send messages with sync error: %v", err)
+		rlog.Warnf("send messages with sync error: %v", err)
 		return nil, err
 	}
 
@@ -89,10 +90,10 @@ func SendMessageAsync(ctx context.Context, brokerAddrs, brokerName string, reque
 
 func SendMessageOneWay(ctx context.Context, brokerAddrs string, request *SendMessageRequest,
 	msgs []*Message) (*SendResult, error) {
-	cmd := remote.NewRemotingCommand(SendBatchMessage, request, encodeMessages(msgs))
+	cmd := remote.NewRemotingCommand(ReqSendBatchMessage, request, encodeMessages(msgs))
 	err := remote.InvokeOneWay(brokerAddrs, cmd)
 	if err != nil {
-		log.Warningf("send messages with oneway error: %v", err)
+		rlog.Warnf("send messages with oneway error: %v", err)
 	}
 	return nil, err
 }
@@ -100,13 +101,13 @@ func SendMessageOneWay(ctx context.Context, brokerAddrs string, request *SendMes
 func processSendResponse(brokerName string, msgs []*Message, cmd *remote.RemotingCommand) *SendResult {
 	var status SendStatus
 	switch cmd.Code {
-	case FlushDiskTimeout:
+	case ResFlushDiskTimeout:
 		status = SendFlushDiskTimeout
-	case FlushSlaveTimeout:
+	case ResFlushSlaveTimeout:
 		status = SendFlushSlaveTimeout
-	case SlaveNotAvailable:
+	case ResSlaveNotAvailable:
 		status = SendSlaveNotAvailable
-	case Success:
+	case ResSuccess:
 		status = SendOK
 	default:
 		// TODO process unknown code
@@ -145,7 +146,54 @@ func processSendResponse(brokerName string, msgs []*Message, cmd *remote.Remotin
 
 // PullMessage with sync
 func PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest) (*PullResult, error) {
-	return nil, nil
+	cmd := remote.NewRemotingCommand(ReqPullMessage, request, nil)
+
+	res, err := remote.InvokeSync(brokerAddrs, cmd, 3*time.Second)
+	if err != nil {
+		return nil, err
+	}
+
+	return processPullResponse(res)
+}
+
+func processPullResponse(response *remote.RemotingCommand) (*PullResult, error) {
+	pullResult := &PullResult{}
+	switch response.Code {
+	case ResSuccess:
+		pullResult.Status = PullFound
+	case ResPullNotFound:
+		pullResult.Status = PullNoNewMsg
+	case ResPullRetryImmediately:
+		pullResult.Status = PullNoMatchedMsg
+	case ResPullOffsetMoved:
+		pullResult.Status = PullOffsetIllegal
+	default:
+		return nil, fmt.Errorf("unknown Response Code: %d, remark: %s", response.Code, response.Remark)
+	}
+
+	v, exist := response.ExtFields["maxOffset"]
+	if exist {
+		pullResult.MaxOffset, _ = strconv.ParseInt(v, 10, 64)
+	}
+
+	v, exist = response.ExtFields["minOffset"]
+	if exist {
+		pullResult.MinOffset, _ = strconv.ParseInt(v, 10, 64)
+	}
+
+	v, exist = response.ExtFields["nextBeginOffset"]
+	if exist {
+		pullResult.NextBeginOffset, _ = strconv.ParseInt(v, 10, 64)
+	}
+
+	v, exist = response.ExtFields["suggestWhichBrokerId"]
+	if exist {
+		pullResult.SuggestWhichBrokerId, _ = strconv.ParseInt(v, 10, 64)
+	}
+
+	pullResult.messageExts = decodeMessage(response.Body)
+
+	return pullResult, nil
 }
 
 // PullMessageAsync pull message async
diff --git a/kernel/model.go b/kernel/model.go
index 9ee30cb..f9593ab 100644
--- a/kernel/model.go
+++ b/kernel/model.go
@@ -18,6 +18,8 @@ limitations under the License.
 package kernel
 
 import (
+	"bytes"
+	"encoding/binary"
 	"fmt"
 	"github.com/apache/rocketmq-client-go/utils"
 )
@@ -30,6 +32,9 @@ const (
 	SendFlushDiskTimeout
 	SendFlushSlaveTimeout
 	SendSlaveNotAvailable
+
+	FlagCompressed = 0x1
+	MsgIdLength    = 8 + 8
 )
 
 // SendResult RocketMQ send result
@@ -69,20 +74,14 @@ type PullResult struct {
 	MaxOffset            int64
 	Status               PullStatus
 	SuggestWhichBrokerId int64
-	messageBinary        []byte
 	messageExts          []*MessageExt
 }
 
 func (result *PullResult) GetMessageExts() []*MessageExt {
-	if result.messageExts != nil && len(result.messageExts) > 0 {
-		return result.messageExts
-	}
-
 	return result.messageExts
 }
 
 func (result *PullResult) SetMessageExts(msgExts []*MessageExt) {
-	result.messageBinary = nil
 	result.messageExts = msgExts
 }
 
@@ -93,6 +92,112 @@ func (result *PullResult) GetMessages() []*Message {
 	return toMessages(result.messageExts)
 }
 
+func decodeMessage(data []byte) []*MessageExt {
+	msgs := make([]*MessageExt, 0)
+	buf := bytes.NewBuffer(data)
+	count := 0
+	for count < len(data) {
+		msg := &MessageExt{}
+
+		// 1. total size
+		binary.Read(buf, binary.BigEndian, &msg.StoreSize)
+		count += 4
+
+		// 2. magic code
+		buf.Next(4)
+		count += 4
+
+		// 3. body CRC32
+		binary.Read(buf, binary.BigEndian, &msg.BodyCRC)
+		count += 4
+
+		// 4. queueID
+		binary.Read(buf, binary.BigEndian, &msg.QueueId)
+		count += 4
+
+		// 5. Flag
+		binary.Read(buf, binary.BigEndian, &msg.Flag)
+		count += 4
+
+		// 6. QueueOffset
+		binary.Read(buf, binary.BigEndian, &msg.QueueOffset)
+		count += 8
+
+		// 7. physical offset
+		binary.Read(buf, binary.BigEndian, &msg.CommitLogOffset)
+		count += 8
+
+		// 8. SysFlag
+		binary.Read(buf, binary.BigEndian, &msg.SysFlag)
+		count += 4
+
+		// 9. BornTimestamp
+		binary.Read(buf, binary.BigEndian, &msg.BornTimestamp)
+		count += 8
+
+		// 10. born host
+		hostBytes := buf.Next(4)
+		var port int32
+		binary.Read(buf, binary.BigEndian, &port)
+		msg.BornHost = fmt.Sprintf("%s:%d", utils.GetAddressByBytes(hostBytes), port)
+		count += 8
+
+		// 11. store timestamp
+		binary.Read(buf, binary.BigEndian, &msg.StoreTimestamp)
+		count += 8
+
+		// 12. store host
+		hostBytes = buf.Next(4)
+		binary.Read(buf, binary.BigEndian, &port)
+		msg.StoreHost = fmt.Sprintf("%s:%d", utils.GetAddressByBytes(hostBytes), port)
+		count += 8
+
+		// 13. reconsume times
+		binary.Read(buf, binary.BigEndian, &msg.ReconsumeTimes)
+		count += 4
+
+		// 14. prepared transaction offset
+		binary.Read(buf, binary.BigEndian, &msg.PreparedTransactionOffset)
+		count += 8
+
+		// 15. body
+		var length int32
+		binary.Read(buf, binary.BigEndian, &length)
+		msg.Body = buf.Next(int(length))
+		if (msg.SysFlag & FlagCompressed) == FlagCompressed {
+			msg.Body = utils.UnCompress(msg.Body)
+		}
+		count += 4 + int(length)
+
+		// 16. topic
+		_byte, _ := buf.ReadByte()
+		msg.Topic = string(buf.Next(int(_byte)))
+		count += 1 + int(_byte)
+
+		var propertiesLength int16
+		binary.Read(buf, binary.BigEndian, &propertiesLength)
+		if propertiesLength > 0 {
+			msg.Properties = parseProperties(buf.Next(int(propertiesLength)))
+		}
+		count += 2 + int(propertiesLength)
+
+		msg.MsgId = createMessageId(hostBytes, msg.CommitLogOffset)
+		//count += 16
+
+		msgs = append(msgs, msg)
+	}
+
+	return msgs
+}
+
+func createMessageId(addr []byte, offset int64) string {
+	return "msgID" // TODO
+}
+
+func parseProperties(data []byte) map[string]string {
+	return make(map[string]string, 0)
+}
+
 func toMessages(messageExts []*MessageExt) []*Message {
 	msgs := make([]*Message, 0)
 
@@ -133,7 +238,7 @@ type (
 
 	MessageModel     int
 	ConsumeFromWhere int
-	ServiceState int
+	ServiceState     int
 )
 
 const (
diff --git a/kernel/request.go b/kernel/request.go
index 7967cca..c5e0fef 100644
--- a/kernel/request.go
+++ b/kernel/request.go
@@ -17,9 +17,12 @@ limitations under the License.
 
 package kernel
 
+import "fmt"
+
 const (
-	GetRouteInfoByTopic = int16(105)
-	SendBatchMessage    = int16(320)
+	ReqPullMessage         = int16(11)
+	ReqGetRouteInfoByTopic = int16(105)
+	ReqSendBatchMessage    = int16(320)
 )
 
 type SendMessageRequest struct {
@@ -59,6 +62,22 @@ type PullMessageRequest struct {
 	ExpressionType       string `json:"expressionType"`
 }
 
+func (request *PullMessageRequest) Encode() map[string]string {
+	maps := make(map[string]string)
+	maps["consumerGroup"] = request.ConsumerGroup
+	maps["topic"] = request.Topic
+	maps["queueId"] = fmt.Sprintf("%d", request.QueueOffset)
+	maps["queueOffset"] = fmt.Sprintf("%d", request.QueueOffset)
+	maps["maxMsgNums"] = fmt.Sprintf("%d", request.MaxMsgNums)
+	maps["sysFlag"] = fmt.Sprintf("%d", request.SysFlag)
+	maps["commitOffset"] = fmt.Sprintf("%d", request.CommitOffset)
+	maps["suspendTimeoutMillis"] = fmt.Sprintf("%d", request.SuspendTimeoutMillis)
+	maps["subscription"] = request.SubExpression
+	maps["subVersion"] = fmt.Sprintf("%d", request.SubVersion)
+	maps["expressionType"] = request.ExpressionType
+	return maps
+}
+
 type GetMaxOffsetRequest struct {
 	Topic   string `json:"topic"`
 	QueueId int32  `json:"queueId"`
@@ -88,7 +107,9 @@ type GetRouteInfoRequest struct {
 }
 
 func (request *GetRouteInfoRequest) Encode() map[string]string {
-	return nil
+	maps := make(map[string]string)
+	maps["topic"] = request.Topic
+	return maps
 }
 
 func (request *GetRouteInfoRequest) Decode(properties map[string]string) error {
diff --git a/kernel/response.go b/kernel/response.go
index df1cecc..c4824a0 100644
--- a/kernel/response.go
+++ b/kernel/response.go
@@ -18,11 +18,14 @@ limitations under the License.
 package kernel
 
 const (
-	Success           = int16(0)
-	FlushDiskTimeout  = int16(10)
-	SlaveNotAvailable = int16(11)
-	FlushSlaveTimeout = int16(12)
-	TopicNotExist     = int16(17)
+	ResSuccess              = int16(0)
+	ResFlushDiskTimeout     = int16(10)
+	ResSlaveNotAvailable    = int16(11)
+	ResFlushSlaveTimeout    = int16(12)
+	ResTopicNotExist        = int16(17)
+	ResPullNotFound         = int16(19)
+	ResPullRetryImmediately = int16(20)
+	ResPullOffsetMoved      = int16(21)
 )
 
 type SendMessageResponse struct {
diff --git a/kernel/route.go b/kernel/route.go
index 38deb93..dc08edc 100644
--- a/kernel/route.go
+++ b/kernel/route.go
@@ -21,6 +21,8 @@ import (
 	"encoding/json"
 	"errors"
 	"github.com/apache/rocketmq-client-go/remote"
+	"github.com/apache/rocketmq-client-go/rlog"
+	"github.com/tidwall/gjson"
 	"sort"
 	"strconv"
 	"strings"
@@ -79,49 +81,49 @@ func UpdateTopicRouteInfo(topic string) {
 	lockNamesrv.Lock()
 	defer lockNamesrv.Unlock()
 
-	RouteData, err := queryTopicRouteInfoFromServer(topic, requestTimeout)
+	routeData, err := queryTopicRouteInfoFromServer(topic, requestTimeout)
 	if err != nil {
-		log.Warningf("query topic route from server error: %s", err)
+		rlog.Warnf("query topic route from server error: %s", err)
 		return
 	}
 
-	if RouteData == nil {
-		log.Warningf("queryTopicRouteInfoFromServer return nil, Topic: %s", topic)
+	if routeData == nil {
+		rlog.Warnf("queryTopicRouteInfoFromServer return nil, Topic: %s", topic)
 		return
 	}
 
 	var changed bool
 	oldRouteData, exist := routeDataMap.Load(topic)
-	if !exist || RouteData == nil {
+	if !exist || routeData == nil {
 		changed = true
 	} else {
-		changed = topicRouteDataIsChange(oldRouteData.(*topicRouteData), RouteData)
+		changed = topicRouteDataIsChange(oldRouteData.(*topicRouteData), routeData)
 	}
 
 	if !changed {
 		changed = isNeedUpdateTopicRouteInfo(topic)
 	} else {
-		log.Infof("the topic[%s] route info changed, old[%s] ,new[%s]", topic, oldRouteData, RouteData)
+		rlog.Infof("the topic[%s] route info changed, old[%v] ,new[%s]", topic, oldRouteData, routeData)
 	}
 
 	if !changed {
 		return
 	}
 
-	newTopicRouteData := RouteData.clone()
+	newTopicRouteData := routeData.clone()
 
-	for _, brokerData := range newTopicRouteData.brokerDataList {
-		brokerAddressesMap.Store(brokerData.brokerName, brokerData.brokerAddresses)
+	for _, brokerData := range newTopicRouteData.BrokerDataList {
+		brokerAddressesMap.Store(brokerData.BrokerName, brokerData.BrokerAddresses)
 	}
 
 	// update publish info
-	publishInfo := RouteData2PublishInfo(topic, RouteData)
+	publishInfo := routeData2PublishInfo(topic, routeData)
 	publishInfo.HaveTopicRouterInfo = true
 
 	old, _ := publishInfoMap.Load(topic)
 	publishInfoMap.Store(topic, publishInfoMap)
 	if old != nil {
-		log.Infof("Old TopicPublishInfo [%s] removed.", old)
+		rlog.Infof("Old TopicPublishInfo [%s] removed.", old)
 	}
 }
 
@@ -132,7 +134,7 @@ func FindBrokerAddressInPublish(brokerName string) string {
 		return ""
 	}
 
-	return bd.(*BrokerData).brokerAddresses[MasterId]
+	return bd.(*BrokerData).BrokerAddresses[MasterId]
 }
 
 func FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBroker bool) *FindBrokerResult {
@@ -142,15 +144,16 @@ func FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBro
 		found      = false
 	)
 
-	bd, exist := brokerAddressesMap.Load(brokerName)
+	addrs, exist := brokerAddressesMap.Load(brokerName)
 
 	if exist {
-		for k, v := range bd.(*BrokerData).brokerAddresses {
+		for k, v := range addrs.(map[int64]string) {
 			if v != "" {
 				found = true
 				if k != MasterId {
 					slave = true
 				}
+				brokerAddr = v
 				break
 			}
 		}
@@ -159,7 +162,7 @@ func FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBro
 	var result *FindBrokerResult
 	if found {
 		result = &FindBrokerResult{
-			BrokerAddr:    brokerName,
+			BrokerAddr:    brokerAddr,
 			Slave:         slave,
 			BrokerVersion: findBrokerVersion(brokerName, brokerAddr),
 		}
@@ -177,14 +180,13 @@ func FetchSubscribeMessageQueues(topic string) ([]*MessageQueue, error) {
 
 	mqs := make([]*MessageQueue, 0)
 
-	for _, qd := range routeData.queueDataList {
-		if queueIsReadable(qd.perm) {
-			for i := 0; i < qd.readQueueNums; i++ {
-				mqs = append(mqs, &MessageQueue{Topic: topic, BrokerName: qd.brokerName, QueueId: i})
+	for _, qd := range routeData.QueueDataList {
+		if queueIsReadable(qd.Perm) {
+			for i := 0; i < qd.ReadQueueNums; i++ {
+				mqs = append(mqs, &MessageQueue{Topic: topic, BrokerName: qd.BrokerName, QueueId: i})
 			}
 		}
 	}
-
 	return mqs, nil
 }
 
@@ -207,8 +209,7 @@ func queryTopicRouteInfoFromServer(topic string, timeout time.Duration) (*topicR
 	request := &GetRouteInfoRequest{
 		Topic: topic,
 	}
-	rc := remote.NewRemotingCommand(GetRouteInfoByTopic, request, nil)
-
+	rc := remote.NewRemotingCommand(ReqGetRouteInfoByTopic, request, nil)
 	response, err := remote.InvokeSync(getNameServerAddress(), rc, timeout)
 
 	if err != nil {
@@ -216,18 +217,19 @@ func queryTopicRouteInfoFromServer(topic string, timeout time.Duration) (*topicR
 	}
 
 	switch response.Code {
-	case Success:
+	case ResSuccess:
 		if response.Body == nil {
 			return nil, errors.New(response.Remark)
 		}
-		RouteData := &topicRouteData{}
-		err = json.Unmarshal(response.Body, RouteData)
+		routeData := &topicRouteData{}
+
+		err = routeData.decode(string(response.Body))
 		if err != nil {
-			log.Warningf("unmarshal topicRouteData error: %s", err)
+			rlog.Warnf("decode topicRouteData error: %s", err)
 			return nil, err
 		}
-		return RouteData, nil
-	case TopicNotExist:
+		return routeData, nil
+	case ResTopicNotExist:
 		return nil, ErrTopicNotExist
 	default:
 		return nil, errors.New(response.Remark)
@@ -241,17 +243,17 @@ func topicRouteDataIsChange(oldData *topicRouteData, newData *topicRouteData) bo
 	oldDataCloned := oldData.clone()
 	newDataCloned := newData.clone()
 
-	sort.Slice(oldDataCloned.queueDataList, func(i, j int) bool {
-		return strings.Compare(oldDataCloned.queueDataList[i].brokerName, oldDataCloned.queueDataList[j].brokerName) > 0
+	sort.Slice(oldDataCloned.QueueDataList, func(i, j int) bool {
+		return strings.Compare(oldDataCloned.QueueDataList[i].BrokerName, oldDataCloned.QueueDataList[j].BrokerName) > 0
 	})
-	sort.Slice(oldDataCloned.brokerDataList, func(i, j int) bool {
-		return strings.Compare(oldDataCloned.brokerDataList[i].brokerName, oldDataCloned.brokerDataList[j].brokerName) > 0
+	sort.Slice(oldDataCloned.BrokerDataList, func(i, j int) bool {
+		return strings.Compare(oldDataCloned.BrokerDataList[i].BrokerName, oldDataCloned.BrokerDataList[j].BrokerName) > 0
 	})
-	sort.Slice(newDataCloned.queueDataList, func(i, j int) bool {
-		return strings.Compare(newDataCloned.queueDataList[i].brokerName, newDataCloned.queueDataList[j].brokerName) > 0
+	sort.Slice(newDataCloned.QueueDataList, func(i, j int) bool {
+		return strings.Compare(newDataCloned.QueueDataList[i].BrokerName, newDataCloned.QueueDataList[j].BrokerName) > 0
 	})
-	sort.Slice(newDataCloned.brokerDataList, func(i, j int) bool {
-		return strings.Compare(newDataCloned.brokerDataList[i].brokerName, newDataCloned.brokerDataList[j].brokerName) > 0
+	sort.Slice(newDataCloned.BrokerDataList, func(i, j int) bool {
+		return strings.Compare(newDataCloned.BrokerDataList[i].BrokerName, newDataCloned.BrokerDataList[j].BrokerName) > 0
 	})
 
 	return !oldDataCloned.equals(newDataCloned)
@@ -263,7 +265,7 @@ func isNeedUpdateTopicRouteInfo(topic string) bool {
 	return !exist || value.(*TopicPublishInfo).isOK()
 }
 
-func RouteData2PublishInfo(topic string, data *topicRouteData) *TopicPublishInfo {
+func routeData2PublishInfo(topic string, data *topicRouteData) *TopicPublishInfo {
 	publishInfo := &TopicPublishInfo{
 		RouteData:  data,
 		OrderTopic: false,
@@ -288,32 +290,32 @@ func RouteData2PublishInfo(topic string, data *topicRouteData) *TopicPublishInfo
 		return publishInfo
 	}
 
-	qds := data.queueDataList
+	qds := data.QueueDataList
 	sort.Slice(qds, func(i, j int) bool {
 		return i-j >= 0
 	})
 
 	for _, qd := range qds {
-		if !queueIsWriteable(qd.perm) {
+		if !queueIsWriteable(qd.Perm) {
 			continue
 		}
 
 		var bData *BrokerData
-		for _, bd := range data.brokerDataList {
-			if bd.brokerName == qd.brokerName {
+		for _, bd := range data.BrokerDataList {
+			if bd.BrokerName == qd.BrokerName {
 				bData = bd
 				break
 			}
 		}
 
-		if bData == nil || bData.brokerAddresses[MasterId] == "" {
+		if bData == nil || bData.BrokerAddresses[MasterId] == "" {
 			continue
 		}
 
-		for i := 0; i < qd.writeQueueNums; i++ {
+		for i := 0; i < qd.WriteQueueNums; i++ {
 			mq := &MessageQueue{
 				Topic:      topic,
-				BrokerName: qd.brokerName,
+				BrokerName: qd.BrokerName,
 				QueueId:    i,
 			}
 			publishInfo.MqList = append(publishInfo.MqList, mq)
@@ -324,50 +326,80 @@ func RouteData2PublishInfo(topic string, data *topicRouteData) *TopicPublishInfo
 }
 
 func getNameServerAddress() string {
-	return ""
+	return "127.0.0.1:9876"
 }
 
 // topicRouteData topicRouteData
 type topicRouteData struct {
 	OrderTopicConf string
-	queueDataList  []*QueueData
-	brokerDataList []*BrokerData
+	QueueDataList  []*QueueData  `json:"queueDatas"`
+	BrokerDataList []*BrokerData `json:"brokerDatas"`
+}
+
+func (routeData *topicRouteData) decode(data string) error {
+	res := gjson.Parse(data)
+	json.Unmarshal([]byte(res.Get("queueDatas").String()), &routeData.QueueDataList)
+
+	bds := res.Get("brokerDatas").Array()
+	routeData.BrokerDataList = make([]*BrokerData, len(bds))
+	for idx, v := range bds {
+		bd := &BrokerData{
+			BrokerName:      v.Get("brokerName").String(),
+			Cluster:         v.Get("cluster").String(),
+			BrokerAddresses: make(map[int64]string, 0),
+		}
+		addrs := v.Get("brokerAddrs").String()
+		strs := strings.Split(addrs[1:len(addrs)-1], ",")
+		if strs != nil {
+			for _, str := range strs {
+				i := strings.Index(str, ":")
+				if i < 0 {
+					continue
+				}
+				id, _ := strconv.ParseInt(str[0:i], 10, 64)
+				bd.BrokerAddresses[id] = strings.Replace(str[i+1:], "\"", "", -1)
+			}
+		}
+		routeData.BrokerDataList[idx] = bd
+	}
+	return nil
 }
 
-func (RouteData *topicRouteData) clone() *topicRouteData {
+func (routeData *topicRouteData) clone() *topicRouteData {
 	cloned := &topicRouteData{
-		OrderTopicConf: RouteData.OrderTopicConf,
-		queueDataList:  make([]*QueueData, len(RouteData.queueDataList)),
-		brokerDataList: make([]*BrokerData, len(RouteData.brokerDataList)),
+		OrderTopicConf: routeData.OrderTopicConf,
+		QueueDataList:  make([]*QueueData, len(routeData.QueueDataList)),
+		BrokerDataList: make([]*BrokerData, len(routeData.BrokerDataList)),
 	}
 
-	for index, value := range RouteData.queueDataList {
-		cloned.queueDataList[index] = value
+	for index, value := range routeData.QueueDataList {
+		cloned.QueueDataList[index] = value
 	}
 
-	for index, value := range RouteData.brokerDataList {
-		cloned.brokerDataList[index] = value
+	for index, value := range routeData.BrokerDataList {
+		cloned.BrokerDataList[index] = value
 	}
 
 	return cloned
 }
 
-func (RouteData *topicRouteData) equals(data *topicRouteData) bool {
+func (routeData *topicRouteData) equals(data *topicRouteData) bool {
 	return false
 }
 
 // QueueData QueueData
 type QueueData struct {
-	brokerName     string
-	readQueueNums  int
-	writeQueueNums int
-	perm           int
-	topicSynFlag   int
+	BrokerName     string `json:"brokerName"`
+	ReadQueueNums  int    `json:"readQueueNums"`
+	WriteQueueNums int    `json:"writeQueueNums"`
+	Perm           int    `json:"perm"`
+	TopicSynFlag   int    `json:"topicSynFlag"`
 }
 
 // BrokerData BrokerData
 type BrokerData struct {
-	brokerName          string
-	brokerAddresses     map[int64]string
+	Cluster             string           `json:"cluster"`
+	BrokerName          string           `json:"brokerName"`
+	BrokerAddresses     map[int64]string `json:"brokerAddrs"`
 	brokerAddressesLock sync.RWMutex
 }
diff --git a/kernel/response.go b/kernel/route_test.go
similarity index 59%
copy from kernel/response.go
copy to kernel/route_test.go
index df1cecc..eb9c312 100644
--- a/kernel/response.go
+++ b/kernel/route_test.go
@@ -17,29 +17,10 @@ limitations under the License.
 
 package kernel
 
-const (
-	Success           = int16(0)
-	FlushDiskTimeout  = int16(10)
-	SlaveNotAvailable = int16(11)
-	FlushSlaveTimeout = int16(12)
-	TopicNotExist     = int16(17)
+import (
+	"testing"
 )
 
-type SendMessageResponse struct {
-	MsgId         string
-	QueueId       int32
-	QueueOffset   int64
-	TransactionId string
-	MsgRegion     string
-}
-
-func (response *SendMessageResponse) Decode(properties map[string]string) {
-
-}
-
-type PullMessageResponse struct {
-	SuggestWhichBrokerId int64
-	NextBeginOffset      int64
-	MinOffset            int64
-	MaxOffset            int64
+func TestUpdateTopicRouteInfo(t *testing.T) {
+	UpdateTopicRouteInfo("test")
 }
diff --git a/remote/client.go b/remote/client.go
index 2cdd6de..bfc4c36 100644
--- a/remote/client.go
+++ b/remote/client.go
@@ -30,7 +30,7 @@ import (
 var (
 	//ErrRequestTimeout for request timeout error
 	ErrRequestTimeout = errors.New("request timeout")
-	connectionLocker sync.Mutex
+	connectionLocker  sync.Mutex
 )
 
 //ResponseFuture for
@@ -160,7 +160,6 @@ func connect(addr string) (net.Conn, error) {
 	connectionTable.Store(addr, tcpConn)
 	go receiveResponse(tcpConn)
 	return tcpConn, nil
-
 }
 
 func receiveResponse(r net.Conn) {
@@ -195,8 +194,8 @@ func createScanner(r io.Reader) *bufio.Scanner {
 			if len(data) >= 4 {
 				var length int32
 				binary.Read(bytes.NewReader(data[0:4]), binary.BigEndian, &length)
-				if int(length) <= len(data) {
-					return int(length), data[:length], nil
+				if int(length)+4 <= len(data) {
+					return int(length) + 4, data[4 : length+4], nil
 				}
 			}
 		}
diff --git a/remote/codec.go b/remote/codec.go
index d556cfb..8e499fe 100644
--- a/remote/codec.go
+++ b/remote/codec.go
@@ -37,14 +37,13 @@ const (
 	ResponseType = 1
 
 	_Flag         = 0
-	_LanguageFlag = "golang"
 	_LanguageCode = byte(9)
 	_Version      = 137
 )
 
 type RemotingCommand struct {
 	Code      int16             `json:"code"`
-	Language  byte              `json:"language"`
+	Language  byte              `json:"-"`
 	Version   int16             `json:"version"`
 	Opaque    int32             `json:"opaque"`
 	Flag      int32             `json:"flag"`
@@ -59,11 +58,10 @@ type CustomHeader interface {
 
 func NewRemotingCommand(code int16, header CustomHeader, body []byte) *RemotingCommand {
 	cmd := &RemotingCommand{
-		Code:     code,
-		Language: _LanguageCode,
-		Version:  _Version,
-		Opaque:   atomic.AddInt32(&opaque, 1),
-		Body:     body,
+		Code:    code,
+		Version: _Version,
+		Opaque:  atomic.AddInt32(&opaque, 1),
+		Body:    body,
 	}
 
 	if header != nil {
@@ -73,6 +71,11 @@ func NewRemotingCommand(code int16, header CustomHeader, body []byte) *RemotingC
 	return cmd
 }
 
+func (command *RemotingCommand) String() string {
+	return fmt.Sprintf("Code: %d, Opaque: %d, Remark: %s, ExtFields: %v",
+		command.Code, command.Opaque, command.Remark, command.ExtFields)
+}
+
 func (command *RemotingCommand) isResponseType() bool {
 	return command.Flag&(ResponseType) == ResponseType
 }
@@ -112,7 +115,7 @@ func encode(command *RemotingCommand) ([]byte, error) {
 		return nil, err
 	}
 
-	frameSize := 8 + len(header) + len(command.Body)
+	frameSize := 4 + len(header) + len(command.Body)
 	buf := bytes.NewBuffer(make([]byte, frameSize))
 	buf.Reset()
 
@@ -141,18 +144,14 @@ func encode(command *RemotingCommand) ([]byte, error) {
 
 func decode(data []byte) (*RemotingCommand, error) {
 	buf := bytes.NewBuffer(data)
-	var length int32
-	err := binary.Read(buf, binary.BigEndian, &length)
-	if err != nil {
-		return nil, err
-	}
+	length := int32(len(data))
 	var oriHeaderLen int32
-	err = binary.Read(buf, binary.BigEndian, &oriHeaderLen)
+	err := binary.Read(buf, binary.BigEndian, &oriHeaderLen)
 	if err != nil {
 		return nil, err
 	}
-	headerLength := oriHeaderLen & 0xFFFFFF
 
+	headerLength := oriHeaderLen & 0xFFFFFF
 	headerData := make([]byte, headerLength)
 	err = binary.Read(buf, binary.BigEndian, &headerData)
 	if err != nil {
@@ -160,7 +159,6 @@ func decode(data []byte) (*RemotingCommand, error) {
 	}
 
 	var command *RemotingCommand
-
 	switch codeType := byte((oriHeaderLen >> 24) & 0xFF); codeType {
 	case JsonCodecs:
 		command, err = jsonSerializer.decodeHeader(headerData)
@@ -173,13 +171,15 @@ func decode(data []byte) (*RemotingCommand, error) {
 		return nil, err
 	}
 
-	bodyLength := length - 8 - headerLength
-	bodyData := make([]byte, bodyLength)
-	err = binary.Read(buf, binary.BigEndian, &bodyData)
-	if err != nil {
-		return nil, err
+	bodyLength := length - 4 - headerLength
+	if bodyLength > 0 {
+		bodyData := make([]byte, bodyLength)
+		err = binary.Read(buf, binary.BigEndian, &bodyData)
+		if err != nil {
+			return nil, err
+		}
+		command.Body = bodyData
 	}
-	command.Body = bodyData
 	return command, nil
 }
 
diff --git a/remote/codec_test.go b/remote/codec_test.go
index ab1a61e..e48b0bb 100644
--- a/remote/codec_test.go
+++ b/remote/codec_test.go
@@ -23,7 +23,6 @@ import (
 )
 
 type testHeader struct {
-
 }
 
 func (t testHeader) Encode() map[string]string {
diff --git a/utils/log.go b/rlog/log.go
similarity index 54%
rename from utils/log.go
rename to rlog/log.go
index d684efd..c73c9ba 100644
--- a/utils/log.go
+++ b/rlog/log.go
@@ -15,28 +15,77 @@
  *  limitations under the License.
  */
 
-package utils
+package rlog
 
-import "io"
-
-var RLog Logger
+import (
+	"github.com/sirupsen/logrus"
+)
 
 type Logger interface {
-	Output() io.Writer
-	SetOutput(w io.Writer)
-	Prefix() string
-	SetPrefix(p string)
-	SetHeader(h string)
 	Print(i ...interface{})
 	Printf(format string, args ...interface{})
 	Debug(i ...interface{})
 	Debugf(format string, args ...interface{})
 	Info(i ...interface{})
 	Infof(format string, args ...interface{})
-	Warning(i ...interface{})
-	Warningf(format string, args ...interface{})
+	Warn(i ...interface{})
+	Warnf(format string, args ...interface{})
 	Error(i ...interface{})
 	Errorf(format string, args ...interface{})
 	Fatal(i ...interface{})
 	Fatalf(format string, args ...interface{})
 }
+
+var rLog Logger = logrus.New()
+
+func SetLogger(log Logger) {
+	rLog = log
+}
+
+func Print(i ...interface{}) {
+	rLog.Print(i...)
+}
+
+func Printf(format string, args ...interface{}) {
+	rLog.Printf(format, args...)
+}
+
+func Debug(i ...interface{}) {
+	rLog.Debug(i...)
+}
+
+func Debugf(format string, args ...interface{}) {
+	rLog.Debugf(format, args...)
+}
+
+func Info(i ...interface{}) {
+	rLog.Info(i...)
+}
+
+func Infof(format string, args ...interface{}) {
+	rLog.Infof(format, args...)
+}
+
+func Warn(i ...interface{}) {
+	rLog.Warn(i...)
+}
+
+func Warnf(format string, args ...interface{}) {
+	rLog.Warnf(format, args...)
+}
+
+func Error(i ...interface{}) {
+	rLog.Error(i...)
+}
+
+func Errorf(format string, args ...interface{}) {
+	rLog.Errorf(format, args...)
+}
+
+func Fatal(i ...interface{}) {
+	rLog.Fatal(i...)
+}
+
+func Fatalf(format string, args ...interface{}) {
+	rLog.Fatalf(format, args...)
+}
diff --git a/utils/helper.go b/utils/helper.go
index bc18c26..485ae15 100644
--- a/utils/helper.go
+++ b/utils/helper.go
@@ -33,7 +33,7 @@ var (
 	counter        int16 = 0
 	startTimestamp int64 = 0
 	nextTimestamp  int64 = 0
-	prefix  string
+	prefix         string
 	locker         sync.Mutex
 )
 
@@ -79,7 +79,7 @@ func clientIP4() ([]byte, error) {
 	}
 	for _, addr := range addrs {
 		if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
-			if ip4 := ipnet.IP.To4(); ip4!=nil{
+			if ip4 := ipnet.IP.To4(); ip4 != nil {
 				return ip4, nil
 			}
 		}
@@ -87,7 +87,9 @@ func clientIP4() ([]byte, error) {
 	return nil, errors.New("unknown IP address")
 }
 
-
+func GetAddressByBytes(data []byte) string {
+	return "127.0.0.1"
+}
 
 func Pid() int16 {
 	return int16(os.Getpid())
@@ -104,3 +106,7 @@ func HashString(s string) int {
 	}
 	return int(crc32.ChecksumIEEE([]byte(s)))
 }
+
+func UnCompress(data []byte) []byte {
+	return data
+}
diff --git a/utils/helper_test.go b/utils/helper_test.go
index 113e92c..4e1877c 100644
--- a/utils/helper_test.go
+++ b/utils/helper_test.go
@@ -28,14 +28,13 @@ func TestLocalIP(t *testing.T) {
 	ip := LocalIP()
 	if ip[0] == 0 && ip[1] == 0 && ip[2] == 0 && ip[3] == 0 {
 		t.Errorf("failed to get host public ip4 address")
-	}else{
+	} else {
 		t.Logf("ip4 address: %v", ip)
 	}
 }
 
-
 func BenchmarkMessageClientID(b *testing.B) {
-	for i:= 0; i< b.N;i++{
-		 MessageClientID()
+	for i := 0; i < b.N; i++ {
+		MessageClientID()
 	}
-}
\ No newline at end of file
+}
diff --git a/utils/ring_buffer.go b/utils/ring_buffer.go
index 9400f53..cab385a 100644
--- a/utils/ring_buffer.go
+++ b/utils/ring_buffer.go
@@ -19,8 +19,8 @@ package utils
 
 import (
 	"runtime"
-	"time"
 	"sync/atomic"
+	"time"
 )
 
 // 1.需要能够动态扩容
@@ -30,14 +30,14 @@ import (
 type RingNodesBuffer struct {
 	writePos uint64
 	readPos  uint64
-	mask  uint64
+	mask     uint64
 
 	nodes nodes
 }
 
 type node struct {
 	position uint64
-	buf     []byte
+	buf      []byte
 }
 
 type nodes []*node
@@ -106,10 +106,10 @@ L:
 // 直接返回数据
 func (r *RingNodesBuffer) Read(timeout time.Duration) (data []byte, err error) {
 	var (
-		node     *node
+		node  *node
 		pos   = atomic.LoadUint64(&r.readPos)
 		start time.Time
-		dif uint64
+		dif   uint64
 	)
 	if timeout > 0 {
 		start = time.Now()
@@ -143,12 +143,12 @@ L:
 }
 
 // 知道大小,传进去解析
-func (r *RingNodesBuffer) ReadBySize(data []byte,timeout time.Duration) (n int, err error) {
+func (r *RingNodesBuffer) ReadBySize(data []byte, timeout time.Duration) (n int, err error) {
 	var (
-		node     *node
+		node  *node
 		pos   = atomic.LoadUint64(&r.readPos)
 		start time.Time
-		dif uint64
+		dif   uint64
 	)
 	i := 0
 	if timeout > 0 {
@@ -176,12 +176,11 @@ L:
 			i++
 		}
 	}
-	n = copy(data,node.buf)
+	n = copy(data, node.buf)
 	atomic.StoreUint64(&node.position, pos+r.mask+1)
 	return
 }
 
-
 func (r *RingNodesBuffer) Size() uint64 {
 	return atomic.LoadUint64(&r.writePos) - atomic.LoadUint64(&r.readPos)
 
diff --git a/utils/ring_buffer_test.go b/utils/ring_buffer_test.go
index a119f21..2d54a85 100644
--- a/utils/ring_buffer_test.go
+++ b/utils/ring_buffer_test.go
@@ -18,15 +18,15 @@
 package utils
 
 import (
-	"time"
-	"testing"
+	"fmt"
 	"github.com/stretchr/testify/assert"
-	"sync"
 	"strconv"
-	"fmt"
+	"sync"
+	"testing"
+	"time"
 )
 
-func TestRingRead(t *testing.T)  {
+func TestRingRead(t *testing.T) {
 	rb := NewRingNodesBuffer(5)
 	assert.Equal(t, uint64(8), rb.Cap())
 
@@ -34,7 +34,7 @@ func TestRingRead(t *testing.T)  {
 	if !assert.Nil(t, err) {
 		return
 	}
-	data, err := rb.Read(1*time.Second)
+	data, err := rb.Read(1 * time.Second)
 	if !assert.Nil(t, err) {
 		return
 	}
@@ -42,8 +42,7 @@ func TestRingRead(t *testing.T)  {
 	assert.Equal(t, "hello", string(data))
 }
 
-
-func TestRingReadBySize(t *testing.T)  {
+func TestRingReadBySize(t *testing.T) {
 	rb := NewRingNodesBuffer(5)
 	assert.Equal(t, uint64(8), rb.Cap())
 
@@ -52,7 +51,7 @@ func TestRingReadBySize(t *testing.T)  {
 		return
 	}
 	sink := make([]byte, 5)
-	n, err := rb.ReadBySize(sink,1*time.Second)
+	n, err := rb.ReadBySize(sink, 1*time.Second)
 	if !assert.Nil(t, err) {
 		return
 	}
@@ -68,7 +67,6 @@ func BenchmarkRingReadBufferMPMC(b *testing.B) {
 	b.ResetTimer()
 	b.ReportAllocs()
 
-
 	for i := 0; i < 100; i++ {
 		go func() {
 			for i := 0; i < b.N; i++ {
@@ -82,8 +80,8 @@ func BenchmarkRingReadBufferMPMC(b *testing.B) {
 			for i := 0; i < b.N; i++ {
 				_ = len(strconv.Itoa(i))
 				var p []byte
-				p,_ = q.Read(1*time.Second)
-				fmt.Sprintf("%v",p)
+				p, _ = q.Read(1 * time.Second)
+				fmt.Sprintf("%v", p)
 
 			}
 			wg.Done()
@@ -93,8 +91,6 @@ func BenchmarkRingReadBufferMPMC(b *testing.B) {
 	wg.Wait()
 }
 
-
-
 func BenchmarkRingBySizeBufferMPMC(b *testing.B) {
 	q := NewRingNodesBuffer(uint64(b.N * 100))
 	var wg sync.WaitGroup
@@ -102,7 +98,6 @@ func BenchmarkRingBySizeBufferMPMC(b *testing.B) {
 	b.ResetTimer()
 	b.ReportAllocs()
 
-
 	for i := 0; i < 100; i++ {
 		go func() {
 			for i := 0; i < b.N; i++ {
@@ -114,13 +109,13 @@ func BenchmarkRingBySizeBufferMPMC(b *testing.B) {
 	for i := 0; i < 100; i++ {
 		go func() {
 			for i := 0; i < b.N; i++ {
-				p := make([]byte,len(strconv.Itoa(i)))
-				q.ReadBySize(p,1*time.Second)
-				fmt.Sprintf("%v",p)
+				p := make([]byte, len(strconv.Itoa(i)))
+				q.ReadBySize(p, 1*time.Second)
+				fmt.Sprintf("%v", p)
 			}
 			wg.Done()
 		}()
 	}
 
 	wg.Wait()
-}
\ No newline at end of file
+}


Mime
View raw message