From commits-return-3235-apmail-rocketmq-commits-archive=rocketmq.apache.org@rocketmq.apache.org Mon Mar 18 06:43:35 2019 Return-Path: X-Original-To: apmail-rocketmq-commits-archive@minotaur.apache.org Delivered-To: apmail-rocketmq-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 232421951F for ; Mon, 18 Mar 2019 06:43:35 +0000 (UTC) Received: (qmail 20164 invoked by uid 500); 18 Mar 2019 06:43:34 -0000 Delivered-To: apmail-rocketmq-commits-archive@rocketmq.apache.org Received: (qmail 20134 invoked by uid 500); 18 Mar 2019 06:43:34 -0000 Mailing-List: contact commits-help@rocketmq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@rocketmq.apache.org Delivered-To: mailing list commits@rocketmq.apache.org Received: (qmail 20125 invoked by uid 99); 18 Mar 2019 06:43:34 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 18 Mar 2019 06:43:34 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 816D582F23; Mon, 18 Mar 2019 06:43:33 +0000 (UTC) Date: Mon, 18 Mar 2019 06:43:33 +0000 To: "commits@rocketmq.apache.org" Subject: [rocketmq-client-go] branch native updated: Pull Consumer (#44) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <155289141345.30849.6486106733633691648@gitbox.apache.org> From: dinglei@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: rocketmq-client-go X-Git-Refname: refs/heads/native X-Git-Reftype: branch X-Git-Oldrev: 4bbb1569ffa92274fc860c6efc23a8fd000c122b X-Git-Newrev: 93d47535c09e68908eb0123b003f41227a975c3b X-Git-Rev: 93d47535c09e68908eb0123b003f41227a975c3b X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. dinglei pushed a commit to branch native in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git The following commit(s) were added to refs/heads/native by this push: new 93d4753 Pull Consumer (#44) 93d4753 is described below commit 93d47535c09e68908eb0123b003f41227a975c3b Author: wenfeng AuthorDate: Mon Mar 18 14:43:29 2019 +0800 Pull Consumer (#44) * fix bugs of remote & refactor logger * consumer can pull message from broker --- consumer.go | 41 ++++++++- consumer_test.go | 18 ++++ core/producer.go | 10 +-- core/push_consumer.go | 4 +- go.mod | 5 +- go.sum | 6 ++ kernel/client.go | 70 ++++++++++++--- kernel/model.go | 119 +++++++++++++++++++++++-- kernel/request.go | 27 +++++- kernel/response.go | 13 +-- kernel/route.go | 160 ++++++++++++++++++++-------------- kernel/{response.go => route_test.go} | 27 +----- remote/client.go | 7 +- remote/codec.go | 44 +++++----- remote/codec_test.go | 1 - {utils => rlog}/log.go | 71 ++++++++++++--- utils/helper.go | 12 ++- utils/helper_test.go | 9 +- utils/ring_buffer.go | 19 ++-- utils/ring_buffer_test.go | 33 +++---- 20 files changed, 496 insertions(+), 200 deletions(-) diff --git a/consumer.go b/consumer.go index acb8f1d..5119f93 100644 --- a/consumer.go +++ b/consumer.go @@ -22,18 +22,25 @@ import ( "errors" "fmt" "github.com/apache/rocketmq-client-go/kernel" + "github.com/apache/rocketmq-client-go/rlog" "strconv" "sync" + "sync/atomic" "time" ) type Consumer interface { + Start() Pull(topic, expression string, numbers int) (*kernel.PullResult, error) SubscribeWithChan(topic, expression string) (chan *kernel.Message, error) SubscribeWithFunc(topic, expression string, f func(msg *kernel.Message) ConsumeResult) error ACK(msg *kernel.Message, result ConsumeResult) } +var ( + queueCounterTable sync.Map +) + type ConsumeResult int type ConsumerType int @@ -42,6 +49,8 @@ const ( Original ConsumerType = iota Orderly Transaction + + SubAll = "*" ) type ConsumerConfig struct { @@ -65,9 +74,12 @@ type defaultConsumer struct { config ConsumerConfig } +func (c *defaultConsumer) Start() { + c.state = kernel.Running +} + func (c *defaultConsumer) Pull(topic, expression string, numbers int) (*kernel.PullResult, error) { mq := getNextQueueOf(topic) - if mq == nil { return nil, fmt.Errorf("prepard to pull topic: %s, but no queue is founded", topic) } @@ -210,11 +222,33 @@ func processPullResult(mq *kernel.MessageQueue, result *kernel.PullResult, data } func getSubscriptionData(mq *kernel.MessageQueue, exp string) *kernel.SubscriptionData { - return nil + subData := &kernel.SubscriptionData{ + Topic: mq.Topic, + } + if exp == "" || exp == SubAll { + subData.SubString = SubAll + } else { + // TODO + } + return subData } func getNextQueueOf(topic string) *kernel.MessageQueue { - return nil + queues, err := kernel.FetchSubscribeMessageQueues(topic) + if err != nil && len(queues) > 0 { + rlog.Error(err.Error()) + return nil + } + var index int64 + v, exist := queueCounterTable.Load(topic) + if !exist { + index = -1 + queueCounterTable.Store(topic, 0) + } else { + index = v.(int64) + } + + return queues[int(atomic.AddInt64(&index, 1))%len(queues)] } func buildSysFlag(commitOffset, suspend, subscription, classFilter bool) int32 { @@ -248,7 +282,6 @@ func tryFindBroker(mq *kernel.MessageQueue) *kernel.FindBrokerResult { if result == nil { kernel.UpdateTopicRouteInfo(mq.Topic) } - return kernel.FindBrokerAddressInSubscribe(mq.BrokerName, recalculatePullFromWhichNode(mq), false) } diff --git a/consumer_test.go b/consumer_test.go new file mode 100644 index 0000000..aaba3cb --- /dev/null +++ b/consumer_test.go @@ -0,0 +1,18 @@ +package rocketmq + +import ( + "fmt" + "testing" +) + +func TestDefaultConsumer_Pull(t *testing.T) { + consumer := NewConsumer(ConsumerConfig{ + GroupName: "testGroup", + }) + consumer.Start() + result, err := consumer.Pull("test", "*", 32) + if err != nil { + t.Fatal(err.Error()) + } + fmt.Println(len(result.GetMessageExts())) +} diff --git a/core/producer.go b/core/producer.go index 7df57e6..22b49ed 100644 --- a/core/producer.go +++ b/core/producer.go @@ -33,7 +33,7 @@ int queueSelectorCallback_cgo(int size, CMessage *msg, void *selectorKey) { import "C" import ( "errors" - log "github.com/sirupsen/logrus" + "github.com/apache/rocketmq-client-go/rlog" "unsafe" ) @@ -211,7 +211,7 @@ func (p *defaultProducer) SendMessageSync(msg *Message) (*SendResult, error) { err := rmqError(C.SendMessageSync(p.cproduer, cmsg, &sr)) if err != NIL { - log.Warnf("send message error, error is: %s", err.Error()) + rlog.Warnf("send message error, error is: %s", err.Error()) return nil, err } @@ -237,7 +237,7 @@ func (p *defaultProducer) SendMessageOrderly(msg *Message, selector MessageQueue &sr)) if err != NIL { - log.Warnf("send message orderly error, error is: %s", err.Error()) + rlog.Warnf("send message orderly error, error is: %s", err.Error()) return nil, err } @@ -254,10 +254,10 @@ func (p *defaultProducer) SendMessageOneway(msg *Message) error { err := rmqError(C.SendMessageOneway(p.cproduer, cmsg)) if err != NIL { - log.Warnf("send message with oneway error, error is: %s", err.Error()) + rlog.Warnf("send message with oneway error, error is: %s", err.Error()) return err } - log.Debugf("Send Message: %s with oneway success.", msg.String()) + rlog.Debugf("Send Message: %s with oneway success.", msg.String()) return nil } diff --git a/core/push_consumer.go b/core/push_consumer.go index c222587..afb0d06 100644 --- a/core/push_consumer.go +++ b/core/push_consumer.go @@ -34,7 +34,7 @@ import "C" import ( "errors" "fmt" - log "github.com/sirupsen/logrus" + "github.com/apache/rocketmq-client-go/rlog" "sync" "unsafe" ) @@ -226,6 +226,6 @@ func (c *defaultPushConsumer) Subscribe(topic, expression string, consumeFunc fu return err } c.funcsMap.Store(topic, consumeFunc) - log.Infof("subscribe topic[%s] with expression[%s] successfully.", topic, expression) + rlog.Infof("subscribe topic[%s] with expression[%s] successfully.", topic, expression) return nil } diff --git a/go.mod b/go.mod index fa910f6..2701b7c 100644 --- a/go.mod +++ b/go.mod @@ -1,11 +1,14 @@ module github.com/apache/rocketmq-client-go -go 1.12 +go 1.11 require ( github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc // indirect github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // indirect github.com/sirupsen/logrus v1.3.0 github.com/stretchr/testify v1.3.0 + github.com/tidwall/gjson v1.2.1 + github.com/tidwall/match v1.0.1 // indirect + github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51 // indirect gopkg.in/alecthomas/kingpin.v2 v2.2.6 ) diff --git a/go.sum b/go.sum index 0b0e0c1..7a45ece 100644 --- a/go.sum +++ b/go.sum @@ -16,6 +16,12 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/tidwall/gjson v1.2.1 h1:j0efZLrZUvNerEf6xqoi0NjWMK5YlLrR7Guo/dxY174= +github.com/tidwall/gjson v1.2.1/go.mod h1:c/nTNbUr0E0OrXEhq1pwa8iEgc2DOt4ZZqAt1HtCkPA= +github.com/tidwall/match v1.0.1 h1:PnKP62LPNxHKTwvHHZZzdOAOCtsJTjo6dZLCwpKm5xc= +github.com/tidwall/match v1.0.1/go.mod h1:LujAq0jyVjBy028G1WhWfIzbpQfMO8bBZ6Tyb0+pL9E= +github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51 h1:BP2bjP495BBPaBcS5rmqviTfrOkN5rO5ceKAMRZCRFc= +github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793 h1:u+LnwYTOOW7Ukr/fppxEb1Nwz0AtPflrblfvUudpo+I= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8= diff --git a/kernel/client.go b/kernel/client.go index 6e4808a..f9185ac 100644 --- a/kernel/client.go +++ b/kernel/client.go @@ -20,7 +20,9 @@ package kernel import ( "context" "errors" + "fmt" "github.com/apache/rocketmq-client-go/remote" + "github.com/apache/rocketmq-client-go/rlog" "github.com/apache/rocketmq-client-go/utils" "os" "strconv" @@ -34,7 +36,6 @@ const ( ) var ( - log = utils.RLog namesrvAddrs = os.Getenv("rocketmq.namesrv.addr") clientIP = utils.LocalIP() instanceName = os.Getenv("rocketmq.client.name") @@ -71,10 +72,10 @@ type InnerConsumer interface { // SendMessage with batch by sync func SendMessageSync(ctx context.Context, brokerAddrs, brokerName string, request *SendMessageRequest, msgs []*Message) (*SendResult, error) { - cmd := remote.NewRemotingCommand(SendBatchMessage, request, encodeMessages(msgs)) - response, err := remote.InvokeSync(brokerAddrs, cmd, 3 * time.Second) + cmd := remote.NewRemotingCommand(ReqSendBatchMessage, request, encodeMessages(msgs)) + response, err := remote.InvokeSync(brokerAddrs, cmd, 3*time.Second) if err != nil { - log.Warningf("send messages with sync error: %v", err) + rlog.Warnf("send messages with sync error: %v", err) return nil, err } @@ -89,10 +90,10 @@ func SendMessageAsync(ctx context.Context, brokerAddrs, brokerName string, reque func SendMessageOneWay(ctx context.Context, brokerAddrs string, request *SendMessageRequest, msgs []*Message) (*SendResult, error) { - cmd := remote.NewRemotingCommand(SendBatchMessage, request, encodeMessages(msgs)) + cmd := remote.NewRemotingCommand(ReqSendBatchMessage, request, encodeMessages(msgs)) err := remote.InvokeOneWay(brokerAddrs, cmd) if err != nil { - log.Warningf("send messages with oneway error: %v", err) + rlog.Warnf("send messages with oneway error: %v", err) } return nil, err } @@ -100,13 +101,13 @@ func SendMessageOneWay(ctx context.Context, brokerAddrs string, request *SendMes func processSendResponse(brokerName string, msgs []*Message, cmd *remote.RemotingCommand) *SendResult { var status SendStatus switch cmd.Code { - case FlushDiskTimeout: + case ResFlushDiskTimeout: status = SendFlushDiskTimeout - case FlushSlaveTimeout: + case ResFlushSlaveTimeout: status = SendFlushSlaveTimeout - case SlaveNotAvailable: + case ResSlaveNotAvailable: status = SendSlaveNotAvailable - case Success: + case ResSuccess: status = SendOK default: // TODO process unknown code @@ -145,7 +146,54 @@ func processSendResponse(brokerName string, msgs []*Message, cmd *remote.Remotin // PullMessage with sync func PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest) (*PullResult, error) { - return nil, nil + cmd := remote.NewRemotingCommand(ReqPullMessage, request, nil) + + res, err := remote.InvokeSync(brokerAddrs, cmd, 3*time.Second) + if err != nil { + return nil, err + } + + return processPullResponse(res) +} + +func processPullResponse(response *remote.RemotingCommand) (*PullResult, error) { + pullResult := &PullResult{} + switch response.Code { + case ResSuccess: + pullResult.Status = PullFound + case ResPullNotFound: + pullResult.Status = PullNoNewMsg + case ResPullRetryImmediately: + pullResult.Status = PullNoMatchedMsg + case ResPullOffsetMoved: + pullResult.Status = PullOffsetIllegal + default: + return nil, fmt.Errorf("unknown Response Code: %d, remark: %s", response.Code, response.Remark) + } + + v, exist := response.ExtFields["maxOffset"] + if exist { + pullResult.MaxOffset, _ = strconv.ParseInt(v, 10, 64) + } + + v, exist = response.ExtFields["minOffset"] + if exist { + pullResult.MinOffset, _ = strconv.ParseInt(v, 10, 64) + } + + v, exist = response.ExtFields["nextBeginOffset"] + if exist { + pullResult.NextBeginOffset, _ = strconv.ParseInt(v, 10, 64) + } + + v, exist = response.ExtFields["suggestWhichBrokerId"] + if exist { + pullResult.SuggestWhichBrokerId, _ = strconv.ParseInt(v, 10, 64) + } + + pullResult.messageExts = decodeMessage(response.Body) + + return pullResult, nil } // PullMessageAsync pull message async diff --git a/kernel/model.go b/kernel/model.go index 9ee30cb..f9593ab 100644 --- a/kernel/model.go +++ b/kernel/model.go @@ -18,6 +18,8 @@ limitations under the License. package kernel import ( + "bytes" + "encoding/binary" "fmt" "github.com/apache/rocketmq-client-go/utils" ) @@ -30,6 +32,9 @@ const ( SendFlushDiskTimeout SendFlushSlaveTimeout SendSlaveNotAvailable + + FlagCompressed = 0x1 + MsgIdLength = 8 + 8 ) // SendResult RocketMQ send result @@ -69,20 +74,14 @@ type PullResult struct { MaxOffset int64 Status PullStatus SuggestWhichBrokerId int64 - messageBinary []byte messageExts []*MessageExt } func (result *PullResult) GetMessageExts() []*MessageExt { - if result.messageExts != nil && len(result.messageExts) > 0 { - return result.messageExts - } - return result.messageExts } func (result *PullResult) SetMessageExts(msgExts []*MessageExt) { - result.messageBinary = nil result.messageExts = msgExts } @@ -93,6 +92,112 @@ func (result *PullResult) GetMessages() []*Message { return toMessages(result.messageExts) } +func decodeMessage(data []byte) []*MessageExt { + msgs := make([]*MessageExt, 0) + buf := bytes.NewBuffer(data) + count := 0 + for count < len(data) { + msg := &MessageExt{} + + // 1. total size + binary.Read(buf, binary.BigEndian, &msg.StoreSize) + count += 4 + + // 2. magic code + buf.Next(4) + count += 4 + + // 3. body CRC32 + binary.Read(buf, binary.BigEndian, &msg.BodyCRC) + count += 4 + + // 4. queueID + binary.Read(buf, binary.BigEndian, &msg.QueueId) + count += 4 + + // 5. Flag + binary.Read(buf, binary.BigEndian, &msg.Flag) + count += 4 + + // 6. QueueOffset + binary.Read(buf, binary.BigEndian, &msg.QueueOffset) + count += 8 + + // 7. physical offset + binary.Read(buf, binary.BigEndian, &msg.CommitLogOffset) + count += 8 + + // 8. SysFlag + binary.Read(buf, binary.BigEndian, &msg.SysFlag) + count += 4 + + // 9. BornTimestamp + binary.Read(buf, binary.BigEndian, &msg.BornTimestamp) + count += 8 + + // 10. born host + hostBytes := buf.Next(4) + var port int32 + binary.Read(buf, binary.BigEndian, &port) + msg.BornHost = fmt.Sprintf("%s:%d", utils.GetAddressByBytes(hostBytes), port) + count += 8 + + // 11. store timestamp + binary.Read(buf, binary.BigEndian, &msg.StoreTimestamp) + count += 8 + + // 12. store host + hostBytes = buf.Next(4) + binary.Read(buf, binary.BigEndian, &port) + msg.StoreHost = fmt.Sprintf("%s:%d", utils.GetAddressByBytes(hostBytes), port) + count += 8 + + // 13. reconsume times + binary.Read(buf, binary.BigEndian, &msg.ReconsumeTimes) + count += 4 + + // 14. prepared transaction offset + binary.Read(buf, binary.BigEndian, &msg.PreparedTransactionOffset) + count += 8 + + // 15. body + var length int32 + binary.Read(buf, binary.BigEndian, &length) + msg.Body = buf.Next(int(length)) + if (msg.SysFlag & FlagCompressed) == FlagCompressed { + msg.Body = utils.UnCompress(msg.Body) + } + count += 4 + int(length) + + // 16. topic + _byte, _ := buf.ReadByte() + msg.Topic = string(buf.Next(int(_byte))) + count += 1 + int(_byte) + + var propertiesLength int16 + binary.Read(buf, binary.BigEndian, &propertiesLength) + if propertiesLength > 0 { + msg.Properties = parseProperties(buf.Next(int(propertiesLength))) + } + count += 2 + int(propertiesLength) + + msg.MsgId = createMessageId(hostBytes, msg.CommitLogOffset) + //count += 16 + + msgs = append(msgs, msg) + } + + return msgs +} + +func createMessageId(addr []byte, offset int64) string { + return "msgID" // TODO +} + +func parseProperties(data []byte) map[string]string { + return make(map[string]string, 0) +} + func toMessages(messageExts []*MessageExt) []*Message { msgs := make([]*Message, 0) @@ -133,7 +238,7 @@ type ( MessageModel int ConsumeFromWhere int - ServiceState int + ServiceState int ) const ( diff --git a/kernel/request.go b/kernel/request.go index 7967cca..c5e0fef 100644 --- a/kernel/request.go +++ b/kernel/request.go @@ -17,9 +17,12 @@ limitations under the License. package kernel +import "fmt" + const ( - GetRouteInfoByTopic = int16(105) - SendBatchMessage = int16(320) + ReqPullMessage = int16(11) + ReqGetRouteInfoByTopic = int16(105) + ReqSendBatchMessage = int16(320) ) type SendMessageRequest struct { @@ -59,6 +62,22 @@ type PullMessageRequest struct { ExpressionType string `json:"expressionType"` } +func (request *PullMessageRequest) Encode() map[string]string { + maps := make(map[string]string) + maps["consumerGroup"] = request.ConsumerGroup + maps["topic"] = request.Topic + maps["queueId"] = fmt.Sprintf("%d", request.QueueOffset) + maps["queueOffset"] = fmt.Sprintf("%d", request.QueueOffset) + maps["maxMsgNums"] = fmt.Sprintf("%d", request.MaxMsgNums) + maps["sysFlag"] = fmt.Sprintf("%d", request.SysFlag) + maps["commitOffset"] = fmt.Sprintf("%d", request.CommitOffset) + maps["suspendTimeoutMillis"] = fmt.Sprintf("%d", request.SuspendTimeoutMillis) + maps["subscription"] = request.SubExpression + maps["subVersion"] = fmt.Sprintf("%d", request.SubVersion) + maps["expressionType"] = request.ExpressionType + return maps +} + type GetMaxOffsetRequest struct { Topic string `json:"topic"` QueueId int32 `json:"queueId"` @@ -88,7 +107,9 @@ type GetRouteInfoRequest struct { } func (request *GetRouteInfoRequest) Encode() map[string]string { - return nil + maps := make(map[string]string) + maps["topic"] = request.Topic + return maps } func (request *GetRouteInfoRequest) Decode(properties map[string]string) error { diff --git a/kernel/response.go b/kernel/response.go index df1cecc..c4824a0 100644 --- a/kernel/response.go +++ b/kernel/response.go @@ -18,11 +18,14 @@ limitations under the License. package kernel const ( - Success = int16(0) - FlushDiskTimeout = int16(10) - SlaveNotAvailable = int16(11) - FlushSlaveTimeout = int16(12) - TopicNotExist = int16(17) + ResSuccess = int16(0) + ResFlushDiskTimeout = int16(10) + ResSlaveNotAvailable = int16(11) + ResFlushSlaveTimeout = int16(12) + ResTopicNotExist = int16(17) + ResPullNotFound = int16(19) + ResPullRetryImmediately = int16(20) + ResPullOffsetMoved = int16(21) ) type SendMessageResponse struct { diff --git a/kernel/route.go b/kernel/route.go index 38deb93..dc08edc 100644 --- a/kernel/route.go +++ b/kernel/route.go @@ -21,6 +21,8 @@ import ( "encoding/json" "errors" "github.com/apache/rocketmq-client-go/remote" + "github.com/apache/rocketmq-client-go/rlog" + "github.com/tidwall/gjson" "sort" "strconv" "strings" @@ -79,49 +81,49 @@ func UpdateTopicRouteInfo(topic string) { lockNamesrv.Lock() defer lockNamesrv.Unlock() - RouteData, err := queryTopicRouteInfoFromServer(topic, requestTimeout) + routeData, err := queryTopicRouteInfoFromServer(topic, requestTimeout) if err != nil { - log.Warningf("query topic route from server error: %s", err) + rlog.Warnf("query topic route from server error: %s", err) return } - if RouteData == nil { - log.Warningf("queryTopicRouteInfoFromServer return nil, Topic: %s", topic) + if routeData == nil { + rlog.Warnf("queryTopicRouteInfoFromServer return nil, Topic: %s", topic) return } var changed bool oldRouteData, exist := routeDataMap.Load(topic) - if !exist || RouteData == nil { + if !exist || routeData == nil { changed = true } else { - changed = topicRouteDataIsChange(oldRouteData.(*topicRouteData), RouteData) + changed = topicRouteDataIsChange(oldRouteData.(*topicRouteData), routeData) } if !changed { changed = isNeedUpdateTopicRouteInfo(topic) } else { - log.Infof("the topic[%s] route info changed, old[%s] ,new[%s]", topic, oldRouteData, RouteData) + rlog.Infof("the topic[%s] route info changed, old[%v] ,new[%s]", topic, oldRouteData, routeData) } if !changed { return } - newTopicRouteData := RouteData.clone() + newTopicRouteData := routeData.clone() - for _, brokerData := range newTopicRouteData.brokerDataList { - brokerAddressesMap.Store(brokerData.brokerName, brokerData.brokerAddresses) + for _, brokerData := range newTopicRouteData.BrokerDataList { + brokerAddressesMap.Store(brokerData.BrokerName, brokerData.BrokerAddresses) } // update publish info - publishInfo := RouteData2PublishInfo(topic, RouteData) + publishInfo := routeData2PublishInfo(topic, routeData) publishInfo.HaveTopicRouterInfo = true old, _ := publishInfoMap.Load(topic) publishInfoMap.Store(topic, publishInfoMap) if old != nil { - log.Infof("Old TopicPublishInfo [%s] removed.", old) + rlog.Infof("Old TopicPublishInfo [%s] removed.", old) } } @@ -132,7 +134,7 @@ func FindBrokerAddressInPublish(brokerName string) string { return "" } - return bd.(*BrokerData).brokerAddresses[MasterId] + return bd.(*BrokerData).BrokerAddresses[MasterId] } func FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBroker bool) *FindBrokerResult { @@ -142,15 +144,16 @@ func FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBro found = false ) - bd, exist := brokerAddressesMap.Load(brokerName) + addrs, exist := brokerAddressesMap.Load(brokerName) if exist { - for k, v := range bd.(*BrokerData).brokerAddresses { + for k, v := range addrs.(map[int64]string) { if v != "" { found = true if k != MasterId { slave = true } + brokerAddr = v break } } @@ -159,7 +162,7 @@ func FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBro var result *FindBrokerResult if found { result = &FindBrokerResult{ - BrokerAddr: brokerName, + BrokerAddr: brokerAddr, Slave: slave, BrokerVersion: findBrokerVersion(brokerName, brokerAddr), } @@ -177,14 +180,13 @@ func FetchSubscribeMessageQueues(topic string) ([]*MessageQueue, error) { mqs := make([]*MessageQueue, 0) - for _, qd := range routeData.queueDataList { - if queueIsReadable(qd.perm) { - for i := 0; i < qd.readQueueNums; i++ { - mqs = append(mqs, &MessageQueue{Topic: topic, BrokerName: qd.brokerName, QueueId: i}) + for _, qd := range routeData.QueueDataList { + if queueIsReadable(qd.Perm) { + for i := 0; i < qd.ReadQueueNums; i++ { + mqs = append(mqs, &MessageQueue{Topic: topic, BrokerName: qd.BrokerName, QueueId: i}) } } } - return mqs, nil } @@ -207,8 +209,7 @@ func queryTopicRouteInfoFromServer(topic string, timeout time.Duration) (*topicR request := &GetRouteInfoRequest{ Topic: topic, } - rc := remote.NewRemotingCommand(GetRouteInfoByTopic, request, nil) - + rc := remote.NewRemotingCommand(ReqGetRouteInfoByTopic, request, nil) response, err := remote.InvokeSync(getNameServerAddress(), rc, timeout) if err != nil { @@ -216,18 +217,19 @@ func queryTopicRouteInfoFromServer(topic string, timeout time.Duration) (*topicR } switch response.Code { - case Success: + case ResSuccess: if response.Body == nil { return nil, errors.New(response.Remark) } - RouteData := &topicRouteData{} - err = json.Unmarshal(response.Body, RouteData) + routeData := &topicRouteData{} + + err = routeData.decode(string(response.Body)) if err != nil { - log.Warningf("unmarshal topicRouteData error: %s", err) + rlog.Warnf("decode topicRouteData error: %s", err) return nil, err } - return RouteData, nil - case TopicNotExist: + return routeData, nil + case ResTopicNotExist: return nil, ErrTopicNotExist default: return nil, errors.New(response.Remark) @@ -241,17 +243,17 @@ func topicRouteDataIsChange(oldData *topicRouteData, newData *topicRouteData) bo oldDataCloned := oldData.clone() newDataCloned := newData.clone() - sort.Slice(oldDataCloned.queueDataList, func(i, j int) bool { - return strings.Compare(oldDataCloned.queueDataList[i].brokerName, oldDataCloned.queueDataList[j].brokerName) > 0 + sort.Slice(oldDataCloned.QueueDataList, func(i, j int) bool { + return strings.Compare(oldDataCloned.QueueDataList[i].BrokerName, oldDataCloned.QueueDataList[j].BrokerName) > 0 }) - sort.Slice(oldDataCloned.brokerDataList, func(i, j int) bool { - return strings.Compare(oldDataCloned.brokerDataList[i].brokerName, oldDataCloned.brokerDataList[j].brokerName) > 0 + sort.Slice(oldDataCloned.BrokerDataList, func(i, j int) bool { + return strings.Compare(oldDataCloned.BrokerDataList[i].BrokerName, oldDataCloned.BrokerDataList[j].BrokerName) > 0 }) - sort.Slice(newDataCloned.queueDataList, func(i, j int) bool { - return strings.Compare(newDataCloned.queueDataList[i].brokerName, newDataCloned.queueDataList[j].brokerName) > 0 + sort.Slice(newDataCloned.QueueDataList, func(i, j int) bool { + return strings.Compare(newDataCloned.QueueDataList[i].BrokerName, newDataCloned.QueueDataList[j].BrokerName) > 0 }) - sort.Slice(newDataCloned.brokerDataList, func(i, j int) bool { - return strings.Compare(newDataCloned.brokerDataList[i].brokerName, newDataCloned.brokerDataList[j].brokerName) > 0 + sort.Slice(newDataCloned.BrokerDataList, func(i, j int) bool { + return strings.Compare(newDataCloned.BrokerDataList[i].BrokerName, newDataCloned.BrokerDataList[j].BrokerName) > 0 }) return !oldDataCloned.equals(newDataCloned) @@ -263,7 +265,7 @@ func isNeedUpdateTopicRouteInfo(topic string) bool { return !exist || value.(*TopicPublishInfo).isOK() } -func RouteData2PublishInfo(topic string, data *topicRouteData) *TopicPublishInfo { +func routeData2PublishInfo(topic string, data *topicRouteData) *TopicPublishInfo { publishInfo := &TopicPublishInfo{ RouteData: data, OrderTopic: false, @@ -288,32 +290,32 @@ func RouteData2PublishInfo(topic string, data *topicRouteData) *TopicPublishInfo return publishInfo } - qds := data.queueDataList + qds := data.QueueDataList sort.Slice(qds, func(i, j int) bool { return i-j >= 0 }) for _, qd := range qds { - if !queueIsWriteable(qd.perm) { + if !queueIsWriteable(qd.Perm) { continue } var bData *BrokerData - for _, bd := range data.brokerDataList { - if bd.brokerName == qd.brokerName { + for _, bd := range data.BrokerDataList { + if bd.BrokerName == qd.BrokerName { bData = bd break } } - if bData == nil || bData.brokerAddresses[MasterId] == "" { + if bData == nil || bData.BrokerAddresses[MasterId] == "" { continue } - for i := 0; i < qd.writeQueueNums; i++ { + for i := 0; i < qd.WriteQueueNums; i++ { mq := &MessageQueue{ Topic: topic, - BrokerName: qd.brokerName, + BrokerName: qd.BrokerName, QueueId: i, } publishInfo.MqList = append(publishInfo.MqList, mq) @@ -324,50 +326,80 @@ func RouteData2PublishInfo(topic string, data *topicRouteData) *TopicPublishInfo } func getNameServerAddress() string { - return "" + return "127.0.0.1:9876" } // topicRouteData topicRouteData type topicRouteData struct { OrderTopicConf string - queueDataList []*QueueData - brokerDataList []*BrokerData + QueueDataList []*QueueData `json:"queueDatas"` + BrokerDataList []*BrokerData `json:"brokerDatas"` +} + +func (routeData *topicRouteData) decode(data string) error { + res := gjson.Parse(data) + json.Unmarshal([]byte(res.Get("queueDatas").String()), &routeData.QueueDataList) + + bds := res.Get("brokerDatas").Array() + routeData.BrokerDataList = make([]*BrokerData, len(bds)) + for idx, v := range bds { + bd := &BrokerData{ + BrokerName: v.Get("brokerName").String(), + Cluster: v.Get("cluster").String(), + BrokerAddresses: make(map[int64]string, 0), + } + addrs := v.Get("brokerAddrs").String() + strs := strings.Split(addrs[1:len(addrs)-1], ",") + if strs != nil { + for _, str := range strs { + i := strings.Index(str, ":") + if i < 0 { + continue + } + id, _ := strconv.ParseInt(str[0:i], 10, 64) + bd.BrokerAddresses[id] = strings.Replace(str[i+1:], "\"", "", -1) + } + } + routeData.BrokerDataList[idx] = bd + } + return nil } -func (RouteData *topicRouteData) clone() *topicRouteData { +func (routeData *topicRouteData) clone() *topicRouteData { cloned := &topicRouteData{ - OrderTopicConf: RouteData.OrderTopicConf, - queueDataList: make([]*QueueData, len(RouteData.queueDataList)), - brokerDataList: make([]*BrokerData, len(RouteData.brokerDataList)), + OrderTopicConf: routeData.OrderTopicConf, + QueueDataList: make([]*QueueData, len(routeData.QueueDataList)), + BrokerDataList: make([]*BrokerData, len(routeData.BrokerDataList)), } - for index, value := range RouteData.queueDataList { - cloned.queueDataList[index] = value + for index, value := range routeData.QueueDataList { + cloned.QueueDataList[index] = value } - for index, value := range RouteData.brokerDataList { - cloned.brokerDataList[index] = value + for index, value := range routeData.BrokerDataList { + cloned.BrokerDataList[index] = value } return cloned } -func (RouteData *topicRouteData) equals(data *topicRouteData) bool { +func (routeData *topicRouteData) equals(data *topicRouteData) bool { return false } // QueueData QueueData type QueueData struct { - brokerName string - readQueueNums int - writeQueueNums int - perm int - topicSynFlag int + BrokerName string `json:"brokerName"` + ReadQueueNums int `json:"readQueueNums"` + WriteQueueNums int `json:"writeQueueNums"` + Perm int `json:"perm"` + TopicSynFlag int `json:"topicSynFlag"` } // BrokerData BrokerData type BrokerData struct { - brokerName string - brokerAddresses map[int64]string + Cluster string `json:"cluster"` + BrokerName string `json:"brokerName"` + BrokerAddresses map[int64]string `json:"brokerAddrs"` brokerAddressesLock sync.RWMutex } diff --git a/kernel/response.go b/kernel/route_test.go similarity index 59% copy from kernel/response.go copy to kernel/route_test.go index df1cecc..eb9c312 100644 --- a/kernel/response.go +++ b/kernel/route_test.go @@ -17,29 +17,10 @@ limitations under the License. package kernel -const ( - Success = int16(0) - FlushDiskTimeout = int16(10) - SlaveNotAvailable = int16(11) - FlushSlaveTimeout = int16(12) - TopicNotExist = int16(17) +import ( + "testing" ) -type SendMessageResponse struct { - MsgId string - QueueId int32 - QueueOffset int64 - TransactionId string - MsgRegion string -} - -func (response *SendMessageResponse) Decode(properties map[string]string) { - -} - -type PullMessageResponse struct { - SuggestWhichBrokerId int64 - NextBeginOffset int64 - MinOffset int64 - MaxOffset int64 +func TestUpdateTopicRouteInfo(t *testing.T) { + UpdateTopicRouteInfo("test") } diff --git a/remote/client.go b/remote/client.go index 2cdd6de..bfc4c36 100644 --- a/remote/client.go +++ b/remote/client.go @@ -30,7 +30,7 @@ import ( var ( //ErrRequestTimeout for request timeout error ErrRequestTimeout = errors.New("request timeout") - connectionLocker sync.Mutex + connectionLocker sync.Mutex ) //ResponseFuture for @@ -160,7 +160,6 @@ func connect(addr string) (net.Conn, error) { connectionTable.Store(addr, tcpConn) go receiveResponse(tcpConn) return tcpConn, nil - } func receiveResponse(r net.Conn) { @@ -195,8 +194,8 @@ func createScanner(r io.Reader) *bufio.Scanner { if len(data) >= 4 { var length int32 binary.Read(bytes.NewReader(data[0:4]), binary.BigEndian, &length) - if int(length) <= len(data) { - return int(length), data[:length], nil + if int(length)+4 <= len(data) { + return int(length) + 4, data[4 : length+4], nil } } } diff --git a/remote/codec.go b/remote/codec.go index d556cfb..8e499fe 100644 --- a/remote/codec.go +++ b/remote/codec.go @@ -37,14 +37,13 @@ const ( ResponseType = 1 _Flag = 0 - _LanguageFlag = "golang" _LanguageCode = byte(9) _Version = 137 ) type RemotingCommand struct { Code int16 `json:"code"` - Language byte `json:"language"` + Language byte `json:"-"` Version int16 `json:"version"` Opaque int32 `json:"opaque"` Flag int32 `json:"flag"` @@ -59,11 +58,10 @@ type CustomHeader interface { func NewRemotingCommand(code int16, header CustomHeader, body []byte) *RemotingCommand { cmd := &RemotingCommand{ - Code: code, - Language: _LanguageCode, - Version: _Version, - Opaque: atomic.AddInt32(&opaque, 1), - Body: body, + Code: code, + Version: _Version, + Opaque: atomic.AddInt32(&opaque, 1), + Body: body, } if header != nil { @@ -73,6 +71,11 @@ func NewRemotingCommand(code int16, header CustomHeader, body []byte) *RemotingC return cmd } +func (command *RemotingCommand) String() string { + return fmt.Sprintf("Code: %d, Opaque: %d, Remark: %s, ExtFields: %v", + command.Code, command.Opaque, command.Remark, command.ExtFields) +} + func (command *RemotingCommand) isResponseType() bool { return command.Flag&(ResponseType) == ResponseType } @@ -112,7 +115,7 @@ func encode(command *RemotingCommand) ([]byte, error) { return nil, err } - frameSize := 8 + len(header) + len(command.Body) + frameSize := 4 + len(header) + len(command.Body) buf := bytes.NewBuffer(make([]byte, frameSize)) buf.Reset() @@ -141,18 +144,14 @@ func encode(command *RemotingCommand) ([]byte, error) { func decode(data []byte) (*RemotingCommand, error) { buf := bytes.NewBuffer(data) - var length int32 - err := binary.Read(buf, binary.BigEndian, &length) - if err != nil { - return nil, err - } + length := int32(len(data)) var oriHeaderLen int32 - err = binary.Read(buf, binary.BigEndian, &oriHeaderLen) + err := binary.Read(buf, binary.BigEndian, &oriHeaderLen) if err != nil { return nil, err } - headerLength := oriHeaderLen & 0xFFFFFF + headerLength := oriHeaderLen & 0xFFFFFF headerData := make([]byte, headerLength) err = binary.Read(buf, binary.BigEndian, &headerData) if err != nil { @@ -160,7 +159,6 @@ func decode(data []byte) (*RemotingCommand, error) { } var command *RemotingCommand - switch codeType := byte((oriHeaderLen >> 24) & 0xFF); codeType { case JsonCodecs: command, err = jsonSerializer.decodeHeader(headerData) @@ -173,13 +171,15 @@ func decode(data []byte) (*RemotingCommand, error) { return nil, err } - bodyLength := length - 8 - headerLength - bodyData := make([]byte, bodyLength) - err = binary.Read(buf, binary.BigEndian, &bodyData) - if err != nil { - return nil, err + bodyLength := length - 4 - headerLength + if bodyLength > 0 { + bodyData := make([]byte, bodyLength) + err = binary.Read(buf, binary.BigEndian, &bodyData) + if err != nil { + return nil, err + } + command.Body = bodyData } - command.Body = bodyData return command, nil } diff --git a/remote/codec_test.go b/remote/codec_test.go index ab1a61e..e48b0bb 100644 --- a/remote/codec_test.go +++ b/remote/codec_test.go @@ -23,7 +23,6 @@ import ( ) type testHeader struct { - } func (t testHeader) Encode() map[string]string { diff --git a/utils/log.go b/rlog/log.go similarity index 54% rename from utils/log.go rename to rlog/log.go index d684efd..c73c9ba 100644 --- a/utils/log.go +++ b/rlog/log.go @@ -15,28 +15,77 @@ * limitations under the License. */ -package utils +package rlog -import "io" - -var RLog Logger +import ( + "github.com/sirupsen/logrus" +) type Logger interface { - Output() io.Writer - SetOutput(w io.Writer) - Prefix() string - SetPrefix(p string) - SetHeader(h string) Print(i ...interface{}) Printf(format string, args ...interface{}) Debug(i ...interface{}) Debugf(format string, args ...interface{}) Info(i ...interface{}) Infof(format string, args ...interface{}) - Warning(i ...interface{}) - Warningf(format string, args ...interface{}) + Warn(i ...interface{}) + Warnf(format string, args ...interface{}) Error(i ...interface{}) Errorf(format string, args ...interface{}) Fatal(i ...interface{}) Fatalf(format string, args ...interface{}) } + +var rLog Logger = logrus.New() + +func SetLogger(log Logger) { + rLog = log +} + +func Print(i ...interface{}) { + rLog.Print(i...) +} + +func Printf(format string, args ...interface{}) { + rLog.Printf(format, args...) +} + +func Debug(i ...interface{}) { + rLog.Debug(i...) +} + +func Debugf(format string, args ...interface{}) { + rLog.Debugf(format, args...) +} + +func Info(i ...interface{}) { + rLog.Info(i...) +} + +func Infof(format string, args ...interface{}) { + rLog.Infof(format, args...) +} + +func Warn(i ...interface{}) { + rLog.Warn(i...) +} + +func Warnf(format string, args ...interface{}) { + rLog.Warnf(format, args...) +} + +func Error(i ...interface{}) { + rLog.Error(i...) +} + +func Errorf(format string, args ...interface{}) { + rLog.Errorf(format, args...) +} + +func Fatal(i ...interface{}) { + rLog.Fatal(i...) +} + +func Fatalf(format string, args ...interface{}) { + rLog.Fatalf(format, args...) +} diff --git a/utils/helper.go b/utils/helper.go index bc18c26..485ae15 100644 --- a/utils/helper.go +++ b/utils/helper.go @@ -33,7 +33,7 @@ var ( counter int16 = 0 startTimestamp int64 = 0 nextTimestamp int64 = 0 - prefix string + prefix string locker sync.Mutex ) @@ -79,7 +79,7 @@ func clientIP4() ([]byte, error) { } for _, addr := range addrs { if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { - if ip4 := ipnet.IP.To4(); ip4!=nil{ + if ip4 := ipnet.IP.To4(); ip4 != nil { return ip4, nil } } @@ -87,7 +87,9 @@ func clientIP4() ([]byte, error) { return nil, errors.New("unknown IP address") } - +func GetAddressByBytes(data []byte) string { + return "127.0.0.1" +} func Pid() int16 { return int16(os.Getpid()) @@ -104,3 +106,7 @@ func HashString(s string) int { } return int(crc32.ChecksumIEEE([]byte(s))) } + +func UnCompress(data []byte) []byte { + return data +} diff --git a/utils/helper_test.go b/utils/helper_test.go index 113e92c..4e1877c 100644 --- a/utils/helper_test.go +++ b/utils/helper_test.go @@ -28,14 +28,13 @@ func TestLocalIP(t *testing.T) { ip := LocalIP() if ip[0] == 0 && ip[1] == 0 && ip[2] == 0 && ip[3] == 0 { t.Errorf("failed to get host public ip4 address") - }else{ + } else { t.Logf("ip4 address: %v", ip) } } - func BenchmarkMessageClientID(b *testing.B) { - for i:= 0; i< b.N;i++{ - MessageClientID() + for i := 0; i < b.N; i++ { + MessageClientID() } -} \ No newline at end of file +} diff --git a/utils/ring_buffer.go b/utils/ring_buffer.go index 9400f53..cab385a 100644 --- a/utils/ring_buffer.go +++ b/utils/ring_buffer.go @@ -19,8 +19,8 @@ package utils import ( "runtime" - "time" "sync/atomic" + "time" ) // 1.需要能够动态扩容 @@ -30,14 +30,14 @@ import ( type RingNodesBuffer struct { writePos uint64 readPos uint64 - mask uint64 + mask uint64 nodes nodes } type node struct { position uint64 - buf []byte + buf []byte } type nodes []*node @@ -106,10 +106,10 @@ L: // 直接返回数据 func (r *RingNodesBuffer) Read(timeout time.Duration) (data []byte, err error) { var ( - node *node + node *node pos = atomic.LoadUint64(&r.readPos) start time.Time - dif uint64 + dif uint64 ) if timeout > 0 { start = time.Now() @@ -143,12 +143,12 @@ L: } // 知道大小,传进去解析 -func (r *RingNodesBuffer) ReadBySize(data []byte,timeout time.Duration) (n int, err error) { +func (r *RingNodesBuffer) ReadBySize(data []byte, timeout time.Duration) (n int, err error) { var ( - node *node + node *node pos = atomic.LoadUint64(&r.readPos) start time.Time - dif uint64 + dif uint64 ) i := 0 if timeout > 0 { @@ -176,12 +176,11 @@ L: i++ } } - n = copy(data,node.buf) + n = copy(data, node.buf) atomic.StoreUint64(&node.position, pos+r.mask+1) return } - func (r *RingNodesBuffer) Size() uint64 { return atomic.LoadUint64(&r.writePos) - atomic.LoadUint64(&r.readPos) diff --git a/utils/ring_buffer_test.go b/utils/ring_buffer_test.go index a119f21..2d54a85 100644 --- a/utils/ring_buffer_test.go +++ b/utils/ring_buffer_test.go @@ -18,15 +18,15 @@ package utils import ( - "time" - "testing" + "fmt" "github.com/stretchr/testify/assert" - "sync" "strconv" - "fmt" + "sync" + "testing" + "time" ) -func TestRingRead(t *testing.T) { +func TestRingRead(t *testing.T) { rb := NewRingNodesBuffer(5) assert.Equal(t, uint64(8), rb.Cap()) @@ -34,7 +34,7 @@ func TestRingRead(t *testing.T) { if !assert.Nil(t, err) { return } - data, err := rb.Read(1*time.Second) + data, err := rb.Read(1 * time.Second) if !assert.Nil(t, err) { return } @@ -42,8 +42,7 @@ func TestRingRead(t *testing.T) { assert.Equal(t, "hello", string(data)) } - -func TestRingReadBySize(t *testing.T) { +func TestRingReadBySize(t *testing.T) { rb := NewRingNodesBuffer(5) assert.Equal(t, uint64(8), rb.Cap()) @@ -52,7 +51,7 @@ func TestRingReadBySize(t *testing.T) { return } sink := make([]byte, 5) - n, err := rb.ReadBySize(sink,1*time.Second) + n, err := rb.ReadBySize(sink, 1*time.Second) if !assert.Nil(t, err) { return } @@ -68,7 +67,6 @@ func BenchmarkRingReadBufferMPMC(b *testing.B) { b.ResetTimer() b.ReportAllocs() - for i := 0; i < 100; i++ { go func() { for i := 0; i < b.N; i++ { @@ -82,8 +80,8 @@ func BenchmarkRingReadBufferMPMC(b *testing.B) { for i := 0; i < b.N; i++ { _ = len(strconv.Itoa(i)) var p []byte - p,_ = q.Read(1*time.Second) - fmt.Sprintf("%v",p) + p, _ = q.Read(1 * time.Second) + fmt.Sprintf("%v", p) } wg.Done() @@ -93,8 +91,6 @@ func BenchmarkRingReadBufferMPMC(b *testing.B) { wg.Wait() } - - func BenchmarkRingBySizeBufferMPMC(b *testing.B) { q := NewRingNodesBuffer(uint64(b.N * 100)) var wg sync.WaitGroup @@ -102,7 +98,6 @@ func BenchmarkRingBySizeBufferMPMC(b *testing.B) { b.ResetTimer() b.ReportAllocs() - for i := 0; i < 100; i++ { go func() { for i := 0; i < b.N; i++ { @@ -114,13 +109,13 @@ func BenchmarkRingBySizeBufferMPMC(b *testing.B) { for i := 0; i < 100; i++ { go func() { for i := 0; i < b.N; i++ { - p := make([]byte,len(strconv.Itoa(i))) - q.ReadBySize(p,1*time.Second) - fmt.Sprintf("%v",p) + p := make([]byte, len(strconv.Itoa(i))) + q.ReadBySize(p, 1*time.Second) + fmt.Sprintf("%v", p) } wg.Done() }() } wg.Wait() -} \ No newline at end of file +}