Repository: incubator-rocketmq-externals
Updated Branches:
refs/heads/go-client-develop c98a770a6 -> 28b98b096
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/pull_message_controller.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/pull_message_controller.go b/rocketmq-go/pull_message_controller.go
new file mode 100644
index 0000000..320cc31
--- /dev/null
+++ b/rocketmq-go/pull_message_controller.go
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rocketmq
+
+import (
+ "bytes"
+ "compress/zlib"
+ "encoding/binary"
+ "fmt"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/header"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/remoting"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/service"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"
+ "github.com/golang/glog"
+ "io/ioutil"
+ "strconv"
+ "time"
+)
+
+type PullMessageController struct {
+ mqClient service.RocketMqClient
+ clientFactory *ClientFactory
+}
+
+func NewPullMessageController(mqClient service.RocketMqClient, clientFactory *ClientFactory) *PullMessageController {
+ return &PullMessageController{
+ mqClient: mqClient,
+ clientFactory: clientFactory,
+ }
+}
+
+func (self *PullMessageController) Start() {
+ go func() {
+ for {
+ pullRequest := self.mqClient.DequeuePullMessageRequest()
+ self.pullMessage(pullRequest)
+ }
+ }()
+}
+
+func (self *PullMessageController) needDelayPullMessage(mqPushConsumer *DefaultMQPushConsumer, pullRequest *model.PullRequest) (needDelayTime int64) {
+ if pullRequest.ProcessQueue.GetMsgCount() > mqPushConsumer.ConsumerConfig.PullThresholdForQueue {
+ return mqPushConsumer.ConsumerConfig.PullTimeDelayMillsWhenFlowControl
+ }
+ if pullRequest.ProcessQueue.GetMaxSpan() > mqPushConsumer.ConsumerConfig.ConsumeConcurrentlyMaxSpan {
+ return mqPushConsumer.ConsumerConfig.PullTimeDelayMillsWhenFlowControl
+ }
+ return
+}
+
+func (self *PullMessageController) pullMessageLater(pullRequest *model.PullRequest, millisecond int64) {
+ go func() {
+ timeoutTimer := time.NewTimer(time.Duration(millisecond) * time.Millisecond)
+ <-timeoutTimer.C
+ self.pullMessage(pullRequest)
+ }()
+ return
+}
+
+func (self *PullMessageController) pullMessage(pullRequest *model.PullRequest) {
+ defaultMQPullConsumer := self.clientFactory.ConsumerTable[pullRequest.ConsumerGroup]
+ if pullRequest.ProcessQueue.IsDropped() {
+ return
+ }
+
+ //pullRequest.ProcessQueue.SetLastPullTimestamp(System.currentTimeMillis());
+ // state ok
+ // isPause
+
+ delayPullTime := self.needDelayPullMessage(defaultMQPullConsumer, pullRequest)
+ if delayPullTime > 0 {
+ self.pullMessageLater(pullRequest, delayPullTime)
+ return
+ }
+ commitOffsetValue := defaultMQPullConsumer.offsetStore.ReadOffset(pullRequest.MessageQueue, service.READ_FROM_MEMORY)
+
+ subscriptionData, ok := defaultMQPullConsumer.rebalance.SubscriptionInner[pullRequest.MessageQueue.Topic]
+ if !ok {
+ self.pullMessageLater(pullRequest, defaultMQPullConsumer.ConsumerConfig.PullTimeDelayMillsWhenException)
+ return
+ }
+
+ var sysFlag int32 = 0
+ if commitOffsetValue > 0 {
+ sysFlag |= constant.FLAG_COMMIT_OFFSET
+ }
+ sysFlag |= constant.FLAG_SUSPEND
+ sysFlag |= constant.FLAG_SUBSCRIPTION
+ requestHeader := new(header.PullMessageRequestHeader)
+ requestHeader.ConsumerGroup = pullRequest.ConsumerGroup
+ requestHeader.Topic = pullRequest.MessageQueue.Topic
+ requestHeader.QueueId = pullRequest.MessageQueue.QueueId
+ requestHeader.QueueOffset = pullRequest.NextOffset
+
+ requestHeader.CommitOffset = commitOffsetValue
+ requestHeader.SuspendTimeoutMillis = defaultMQPullConsumer.ConsumerConfig.BrokerSuspendMaxTimeMillis
+ requestHeader.MaxMsgNums = int32(defaultMQPullConsumer.ConsumerConfig.PullBatchSize)
+ requestHeader.SubVersion = subscriptionData.SubVersion
+ requestHeader.Subscription = subscriptionData.SubString
+
+ requestHeader.SysFlag = sysFlag
+
+ pullCallback := func(responseFuture *remoting.ResponseFuture) {
+ var nextBeginOffset int64 = pullRequest.NextOffset
+
+ if responseFuture != nil {
+ responseCommand := responseFuture.ResponseCommand
+ if responseCommand.Code == remoting.SUCCESS && len(responseCommand.Body) > 0 {
+ //FOUND
+ var err error
+ pullResult := responseCommand.ExtFields
+ if ok {
+ if nextBeginOffsetInter, ok := pullResult["nextBeginOffset"]; ok {
+ if nextBeginOffsetStr, ok := nextBeginOffsetInter.(string); ok {
+ nextBeginOffset, err = strconv.ParseInt(nextBeginOffsetStr, 10, 64)
+ if err != nil {
+ glog.Error(err)
+ return
+ }
+ }
+ }
+ }
+ msgs := DecodeMessage(responseFuture.ResponseCommand.Body)
+
+ msgs = FilterMessageAgainByTags(msgs, defaultMQPullConsumer.subscriptionTag[pullRequest.MessageQueue.Topic])
+ if len(msgs) == 0 {
+ if pullRequest.ProcessQueue.GetMsgCount() == 0 {
+ defaultMQPullConsumer.offsetStore.UpdateOffset(pullRequest.MessageQueue, nextBeginOffset, true)
+ }
+ }
+ //
+ pullRequest.ProcessQueue.PutMessage(msgs)
+ defaultMQPullConsumer.consumeMessageService.SubmitConsumeRequest(msgs, pullRequest.ProcessQueue, pullRequest.MessageQueue, true)
+ } else {
+ //glog.Error(fmt.Sprintf("pull message error,code=%d,body=%s", responseCommand.Code, string(responseCommand.Body)))
+ var err error // change the offset , use nextBeginOffset
+ pullResult := responseCommand.ExtFields
+ if ok {
+ if nextBeginOffsetInter, ok := pullResult["nextBeginOffset"]; ok {
+ if nextBeginOffsetStr, ok := nextBeginOffsetInter.(string); ok {
+ nextBeginOffset, err = strconv.ParseInt(nextBeginOffsetStr, 10, 64)
+ if err != nil {
+ glog.Error(err)
+ }
+ }
+ }
+ }
+ if responseCommand.Code == remoting.PULL_NOT_FOUND || responseCommand.Code == remoting.PULL_RETRY_IMMEDIATELY {
+ //NO_NEW_MSG //NO_MATCHED_MSG
+ if pullRequest.ProcessQueue.GetMsgCount() == 0 {
+ defaultMQPullConsumer.offsetStore.UpdateOffset(pullRequest.MessageQueue, nextBeginOffset, true)
+ }
+ //update offset increase only
+ //failedPullRequest, _ := json.Marshal(pullRequest)
+ //glog.Error("the pull request offset illegal", string(failedPullRequest))
+ } else if responseCommand.Code == remoting.PULL_OFFSET_MOVED {
+ //OFFSET_ILLEGAL
+ glog.Error(fmt.Sprintf("PULL_OFFSET_MOVED,code=%d,body=%s", responseCommand.Code, string(responseCommand.Body)))
+ pullRequest.ProcessQueue.SetDrop(true)
+ go func() {
+ executeTaskLater := time.NewTimer(10 * time.Second)
+ <-executeTaskLater.C
+ defaultMQPullConsumer.offsetStore.UpdateOffset(pullRequest.MessageQueue, nextBeginOffset, false)
+ defaultMQPullConsumer.rebalance.RemoveProcessQueue(pullRequest.MessageQueue)
+ }()
+ } else {
+ glog.Errorf("illegal response code. pull message error,code=%d,request=%v OFFSET_ILLEGAL", responseCommand.Code, requestHeader)
+ glog.Error(pullRequest.MessageQueue)
+ time.Sleep(1 * time.Second)
+ }
+ }
+ } else {
+ glog.Error("responseFuture is nil")
+ }
+
+ if pullRequest.ProcessQueue.IsDropped() {
+ return
+ }
+ nextPullRequest := &model.PullRequest{
+ ConsumerGroup: pullRequest.ConsumerGroup,
+ NextOffset: nextBeginOffset,
+ MessageQueue: pullRequest.MessageQueue,
+ ProcessQueue: pullRequest.ProcessQueue,
+ }
+ if defaultMQPullConsumer.ConsumerConfig.PullInterval > 0 {
+ go func() {
+ nextPullTime := time.NewTimer(time.Duration(defaultMQPullConsumer.ConsumerConfig.PullInterval) * time.Millisecond)
+ <-nextPullTime.C
+ self.mqClient.EnqueuePullMessageRequest(nextPullRequest)
+ }()
+ } else {
+ self.mqClient.EnqueuePullMessageRequest(nextPullRequest)
+ }
+ }
+ glog.V(2).Infof("requestHeader look offset %s %s %s %s", requestHeader.QueueOffset, requestHeader.Topic, requestHeader.QueueId, requestHeader.CommitOffset)
+ self.consumerPullMessageAsync(pullRequest.MessageQueue.BrokerName, requestHeader, pullCallback)
+}
+func FilterMessageAgainByTags(msgExts []model.MessageExt, subscriptionTagList []string) (result []model.MessageExt) {
+ result = msgExts
+ if len(subscriptionTagList) == 0 {
+ return
+ }
+ result = []model.MessageExt{}
+ for _, msg := range msgExts {
+ for _, tag := range subscriptionTagList {
+ if tag == msg.GetTag() {
+ result = append(result, msg)
+ break
+ }
+ }
+ }
+ return
+}
+
+func (self *PullMessageController) consumerPullMessageAsync(brokerName string, requestHeader remoting.CustomerHeader, invokeCallback remoting.InvokeCallback) {
+ brokerAddr, _, found := self.mqClient.FindBrokerAddressInSubscribe(brokerName, 0, false)
+ if found {
+ remotingCommand := remoting.NewRemotingCommand(remoting.PULL_MESSAGE, requestHeader)
+ self.mqClient.GetRemotingClient().InvokeAsync(brokerAddr, remotingCommand, 1000, invokeCallback)
+ }
+}
+
+func DecodeMessage(data []byte) []model.MessageExt {
+ buf := bytes.NewBuffer(data)
+ var storeSize, magicCode, bodyCRC, queueId, flag, sysFlag, reconsumeTimes, bodyLength, bornPort, storePort int32
+ var queueOffset, physicOffset, preparedTransactionOffset, bornTimeStamp, storeTimestamp int64
+ var topicLen byte
+ var topic, body, properties, bornHost, storeHost []byte
+ var propertiesLength int16
+
+ var propertiesmap = make(map[string]string)
+
+ msgs := []model.MessageExt{}
+ for buf.Len() > 0 {
+ msg := model.MessageExt{Message: &model.Message{}}
+ binary.Read(buf, binary.BigEndian, &storeSize)
+ binary.Read(buf, binary.BigEndian, &magicCode)
+ binary.Read(buf, binary.BigEndian, &bodyCRC)
+ binary.Read(buf, binary.BigEndian, &queueId)
+ binary.Read(buf, binary.BigEndian, &flag)
+ binary.Read(buf, binary.BigEndian, &queueOffset)
+ binary.Read(buf, binary.BigEndian, &physicOffset)
+ binary.Read(buf, binary.BigEndian, &sysFlag)
+ binary.Read(buf, binary.BigEndian, &bornTimeStamp)
+ bornHost = make([]byte, 4)
+ binary.Read(buf, binary.BigEndian, &bornHost)
+ binary.Read(buf, binary.BigEndian, &bornPort)
+ binary.Read(buf, binary.BigEndian, &storeTimestamp)
+ storeHost = make([]byte, 4)
+ binary.Read(buf, binary.BigEndian, &storeHost)
+ binary.Read(buf, binary.BigEndian, &storePort)
+ binary.Read(buf, binary.BigEndian, &reconsumeTimes)
+ binary.Read(buf, binary.BigEndian, &preparedTransactionOffset)
+ binary.Read(buf, binary.BigEndian, &bodyLength)
+ if bodyLength > 0 {
+ body = make([]byte, bodyLength)
+ binary.Read(buf, binary.BigEndian, body)
+ if (sysFlag & constant.CompressedFlag) == constant.CompressedFlag {
+ b := bytes.NewReader(body)
+ z, err := zlib.NewReader(b)
+ if err != nil {
+ glog.Error(err)
+ return nil
+ }
+ body, err = ioutil.ReadAll(z)
+ z.Close()
+ if err != nil {
+ glog.Error(err)
+ return nil
+ }
+ }
+ }
+ binary.Read(buf, binary.BigEndian, &topicLen)
+ topic = make([]byte, int(topicLen))
+ binary.Read(buf, binary.BigEndian, &topic)
+ binary.Read(buf, binary.BigEndian, &propertiesLength)
+ if propertiesLength > 0 {
+ properties = make([]byte, propertiesLength)
+ binary.Read(buf, binary.BigEndian, &properties)
+ propertiesmap = util.String2MessageProperties(string(properties))
+ }
+
+ if magicCode != -626843481 {
+ glog.Errorf("magic code is error %d", magicCode)
+ return nil
+ }
+
+ msg.Topic = string(topic)
+ msg.QueueId = queueId
+ msg.SysFlag = sysFlag
+ msg.QueueOffset = queueOffset
+ msg.BodyCRC = bodyCRC
+ msg.StoreSize = storeSize
+ msg.BornTimestamp = bornTimeStamp
+ msg.ReconsumeTimes = reconsumeTimes
+ msg.Flag = int(flag)
+ msg.CommitLogOffset = physicOffset
+ msg.StoreTimestamp = storeTimestamp
+ msg.PreparedTransactionOffset = preparedTransactionOffset
+ msg.Body = body
+ msg.Properties = propertiesmap
+
+ // < 3.5.8 use messageOffsetId
+ // >= 3.5.8 use clientUniqMsgId
+ msg.MsgId = msg.GetMsgUniqueKey()
+ if len(msg.MsgId) == 0 {
+ msg.MsgId = util.GeneratorMessageOffsetId(storeHost, storePort, msg.CommitLogOffset)
+ }
+ msgs = append(msgs, msg)
+ }
+
+ return msgs
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/rebalance_controller.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/rebalance_controller.go b/rocketmq-go/rebalance_controller.go
new file mode 100644
index 0000000..d6d4001
--- /dev/null
+++ b/rocketmq-go/rebalance_controller.go
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rocketmq
+
+type RebalanceController struct {
+ clientFactory *ClientFactory
+}
+
+func NewRebalanceController(clientFactory *ClientFactory) *RebalanceController {
+ return &RebalanceController{
+ clientFactory: clientFactory,
+ }
+}
+
+func (self *RebalanceController) doRebalance() {
+ for _, consumer := range self.clientFactory.ConsumerTable {
+ consumer.rebalance.DoRebalance()
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/service/allocate_message/allocate_message_averagely.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/allocate_message/allocate_message_averagely.go b/rocketmq-go/service/allocate_message/allocate_message_averagely.go
new file mode 100644
index 0000000..cdfe775
--- /dev/null
+++ b/rocketmq-go/service/allocate_message/allocate_message_averagely.go
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package service_allocate_message
+
+import (
+ "errors"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+)
+
+type AllocateMessageQueueAveragely struct{}
+
+func (self *AllocateMessageQueueAveragely) Allocate(consumerGroup string, currentCID string, mqAll []*model.MessageQueue, cidAll []string) ([]model.MessageQueue, error) {
+
+ if currentCID == "" {
+ return nil, errors.New("currentCID is empty")
+ }
+
+ if mqAll == nil || len(mqAll) == 0 {
+ return nil, errors.New("mqAll is nil or mqAll empty")
+ }
+
+ if cidAll == nil || len(cidAll) == 0 {
+ return nil, errors.New("cidAll is nil or cidAll empty")
+ }
+
+ result := make([]model.MessageQueue, 0)
+ for i, cid := range cidAll {
+ if cid == currentCID {
+ mqLen := len(mqAll)
+ cidLen := len(cidAll)
+ mod := mqLen % cidLen
+ var averageSize int
+ if mqLen < cidLen {
+ averageSize = 1
+ } else {
+ if mod > 0 && i < mod {
+ averageSize = mqLen/cidLen + 1
+ } else {
+ averageSize = mqLen / cidLen
+ }
+ }
+
+ var startIndex int
+ if mod > 0 && i < mod {
+ startIndex = i * averageSize
+ } else {
+ startIndex = i*averageSize + mod
+ }
+
+ var min int
+ if averageSize > mqLen-startIndex {
+ min = mqLen - startIndex
+ } else {
+ min = averageSize
+ }
+
+ for j := 0; j < min; j++ {
+ result = append(result, *mqAll[(startIndex+j)%mqLen])
+ }
+ return result, nil
+
+ }
+ }
+
+ return nil, errors.New("cant't find currentCID")
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/service/allocate_message/allocate_message_averagely_by_circle.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/allocate_message/allocate_message_averagely_by_circle.go b/rocketmq-go/service/allocate_message/allocate_message_averagely_by_circle.go
new file mode 100644
index 0000000..cdfd668
--- /dev/null
+++ b/rocketmq-go/service/allocate_message/allocate_message_averagely_by_circle.go
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package service_allocate_message
+
+import (
+ "errors"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+)
+
+type AllocateMessageQueueAveragelyByCircle struct{}
+
+func (self *AllocateMessageQueueAveragelyByCircle) Allocate(consumerGroup string, currentCID string, mqAll []*model.MessageQueue, cidAll []string) ([]model.MessageQueue, error) {
+ if currentCID == "" {
+ return nil, errors.New("currentCID is empty")
+ }
+
+ if mqAll == nil || len(mqAll) == 0 {
+ return nil, errors.New("mqAll is nil or mqAll empty")
+ }
+
+ if cidAll == nil || len(cidAll) == 0 {
+ return nil, errors.New("cidAll is nil or cidAll empty")
+ }
+
+ result := make([]model.MessageQueue, 0)
+ for i, cid := range cidAll {
+ if cid == currentCID {
+ mqLen := len(mqAll)
+ cidLen := len(cidAll)
+ mod := mqLen % cidLen
+ var averageSize int
+ if mqLen < cidLen {
+ averageSize = 1
+ } else {
+ if mod > 0 && i < mod {
+ averageSize = mqLen/cidLen + 1
+ } else {
+ averageSize = mqLen / cidLen
+ }
+ }
+
+ var startIndex int
+ if mod > 0 && i < mod {
+ startIndex = i * averageSize
+ } else {
+ startIndex = i*averageSize + mod
+ }
+
+ var min int
+ if averageSize > mqLen-startIndex {
+ min = mqLen - startIndex
+ } else {
+ min = averageSize
+ }
+
+ for j := 0; j < min; j++ {
+ result = append(result, *mqAll[(startIndex+j)%mqLen])
+ }
+ return result, nil
+
+ }
+ }
+
+ return nil, errors.New("cant't find currentCID")
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/service/allocate_message/allocate_message_by_config.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/allocate_message/allocate_message_by_config.go b/rocketmq-go/service/allocate_message/allocate_message_by_config.go
new file mode 100644
index 0000000..2046ffd
--- /dev/null
+++ b/rocketmq-go/service/allocate_message/allocate_message_by_config.go
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package service_allocate_message
+
+import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+
+type AllocateMessageQueueByConfig struct {
+ messageQueueList []model.MessageQueue
+}
+
+func (self *AllocateMessageQueueByConfig) Allocate(consumerGroup string, currentCID string, mqAll []*model.MessageQueue, cidAll []string) ([]model.MessageQueue, error) {
+ return self.messageQueueList, nil
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/service/allocate_message/allocate_message_by_machine_room.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/allocate_message/allocate_message_by_machine_room.go b/rocketmq-go/service/allocate_message/allocate_message_by_machine_room.go
new file mode 100644
index 0000000..6fe1cbb
--- /dev/null
+++ b/rocketmq-go/service/allocate_message/allocate_message_by_machine_room.go
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package service_allocate_message
+
+import (
+ "errors"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+)
+
+type AllocateMessageQueueByMachineRoom struct {
+}
+
+func (self *AllocateMessageQueueByMachineRoom) Allocate(consumerGroup string, currentCID string, mqAll []*model.MessageQueue, cidAll []string) ([]model.MessageQueue, error) {
+ if currentCID == "" {
+ return nil, errors.New("currentCID is empty")
+ }
+
+ if mqAll == nil || len(mqAll) == 0 {
+ return nil, errors.New("mqAll is nil or mqAll empty")
+ }
+
+ if cidAll == nil || len(cidAll) == 0 {
+ return nil, errors.New("cidAll is nil or cidAll empty")
+ }
+
+ result := make([]model.MessageQueue, 0)
+ for i, cid := range cidAll {
+ if cid == currentCID {
+ mqLen := len(mqAll)
+ cidLen := len(cidAll)
+ mod := mqLen % cidLen
+ var averageSize int
+ if mqLen < cidLen {
+ averageSize = 1
+ } else {
+ if mod > 0 && i < mod {
+ averageSize = mqLen/cidLen + 1
+ } else {
+ averageSize = mqLen / cidLen
+ }
+ }
+
+ var startIndex int
+ if mod > 0 && i < mod {
+ startIndex = i * averageSize
+ } else {
+ startIndex = i*averageSize + mod
+ }
+
+ var min int
+ if averageSize > mqLen-startIndex {
+ min = mqLen - startIndex
+ } else {
+ min = averageSize
+ }
+
+ for j := 0; j < min; j++ {
+ result = append(result, *mqAll[(startIndex+j)%mqLen])
+ }
+ return result, nil
+
+ }
+ }
+
+ return nil, errors.New("cant't find currentCID")
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/service/allocate_message/allocate_message_queue_strategy.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/allocate_message/allocate_message_queue_strategy.go b/rocketmq-go/service/allocate_message/allocate_message_queue_strategy.go
new file mode 100644
index 0000000..e838c7b
--- /dev/null
+++ b/rocketmq-go/service/allocate_message/allocate_message_queue_strategy.go
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package service_allocate_message
+
+import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+
+type AllocateMessageQueueStrategy interface {
+ Allocate(consumerGroup string, currentCID string, mqAll []*model.MessageQueue, cidAll []string) ([]model.MessageQueue, error)
+}
+
+func GetAllocateMessageQueueStrategyByConfig(allocateMessageQueueStrategy string) AllocateMessageQueueStrategy {
+ return new(AllocateMessageQueueAveragely)
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/service/consume_message_service.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/consume_message_service.go b/rocketmq-go/service/consume_message_service.go
index 09be61c..e4a08df 100644
--- a/rocketmq-go/service/consume_message_service.go
+++ b/rocketmq-go/service/consume_message_service.go
@@ -35,22 +35,22 @@ type ConsumeMessageService interface {
}
type ConsumeMessageConcurrentlyServiceImpl struct {
- consumerGroup string
- messageListener model.MessageListener
- //sendMessageBackProducerService SendMessageBackProducerService //for send retry Message
- offsetStore OffsetStore
- consumerConfig *config.RocketMqConsumerConfig
+ consumerGroup string
+ messageListener model.MessageListener
+ sendMessageBackProducerService SendMessageBackProducerService //for send retry Message
+ offsetStore OffsetStore
+ consumerConfig *config.RocketMqConsumerConfig
}
func NewConsumeMessageConcurrentlyServiceImpl(messageListener model.MessageListener) (consumeService ConsumeMessageService) {
- //consumeService = &ConsumeMessageConcurrentlyServiceImpl{messageListener:messageListener, sendMessageBackProducerService:&SendMessageBackProducerServiceImpl{}}
+ consumeService = &ConsumeMessageConcurrentlyServiceImpl{messageListener: messageListener, sendMessageBackProducerService: &SendMessageBackProducerServiceImpl{}}
return
}
func (self *ConsumeMessageConcurrentlyServiceImpl) Init(consumerGroup string, mqClient RocketMqClient, offsetStore OffsetStore, defaultProducerService *DefaultProducerService, consumerConfig *config.RocketMqConsumerConfig) {
self.consumerGroup = consumerGroup
self.offsetStore = offsetStore
- //self.sendMessageBackProducerService.InitSendMessageBackProducerService(consumerGroup, mqClient,defaultProducerService,consumerConfig)
+ self.sendMessageBackProducerService.InitSendMessageBackProducerService(consumerGroup, mqClient, defaultProducerService, consumerConfig)
self.consumerConfig = consumerConfig
}
@@ -74,7 +74,7 @@ func (self *ConsumeMessageConcurrentlyServiceImpl) SubmitConsumeRequest(msgs []m
}
func (self *ConsumeMessageConcurrentlyServiceImpl) SendMessageBack(messageExt *model.MessageExt, delayLayLevel int, brokerName string) (err error) {
- //err = self.sendMessageBackProducerService.SendMessageBack(messageExt, 0, brokerName)
+ err = self.sendMessageBackProducerService.SendMessageBack(messageExt, 0, brokerName)
return
}
@@ -128,10 +128,10 @@ func (self *ConsumeMessageConcurrentlyServiceImpl) processConsumeResult(result m
if len(failedMessages) > 0 {
self.SubmitConsumeRequest(failedMessages, processQueue, messageQueue, true)
}
- //commitOffset := processQueue.RemoveMessage(successMessages)
- //if (commitOffset > 0 && ! processQueue.IsDropped()) {
- // self.offsetStore.UpdateOffset(messageQueue, commitOffset, true)
- //}
+ commitOffset := processQueue.RemoveMessage(successMessages)
+ if commitOffset > 0 && !processQueue.IsDropped() {
+ self.offsetStore.UpdateOffset(messageQueue, commitOffset, true)
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/service/mq_client.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/mq_client.go b/rocketmq-go/service/mq_client.go
index 8bbfe79..366aa3f 100644
--- a/rocketmq-go/service/mq_client.go
+++ b/rocketmq-go/service/mq_client.go
@@ -59,10 +59,9 @@ type RocketMqClient interface {
var DEFAULT_TIMEOUT int64 = 6000
-// common
type MqClientImpl struct {
ClientId string
- remotingClient remoting.RemotingClient
+ remotingClient *remoting.DefalutRemotingClient
TopicRouteTable util.ConcurrentMap // map[string]*model.TopicRouteData //topic | topicRoteData
BrokerAddrTable util.ConcurrentMap //map[string]map[int]string //brokerName | map[brokerId]address
TopicPublishInfoTable util.ConcurrentMap //map[string]*model.TopicPublishInfo //topic | TopicPublishInfo //all use this
@@ -134,7 +133,7 @@ func (self *MqClientImpl) GetPublishTopicList() []string {
}
return publishTopicList
}
-func (self *MqClientImpl) GetRemotingClient() remoting.RemotingClient {
+func (self *MqClientImpl) GetRemotingClient() *remoting.DefalutRemotingClient {
return self.remotingClient
}
@@ -147,7 +146,7 @@ func (self *MqClientImpl) DequeuePullMessageRequest() (pullRequest *model.PullRe
}
func (self *MqClientImpl) ClearExpireResponse() {
- //self.remotingClient.ClearExpireResponse()
+ self.remotingClient.ClearExpireResponse()
}
func (self *MqClientImpl) FetchMasterBrokerAddress(brokerName string) (masterAddress string) {
@@ -199,10 +198,9 @@ func (self MqClientImpl) GetTopicRouteInfoFromNameServer(topic string, timeoutMi
return nil, err
}
if response.Code == remoting.SUCCESS {
- //todo it's dirty
topicRouteData := new(model.TopicRouteData)
bodyjson := strings.Replace(string(response.Body), ",0:", ",\"0\":", -1)
- bodyjson = strings.Replace(bodyjson, ",1:", ",\"1\":", -1) // fastJson的key没有引号 需要通用的方法
+ bodyjson = strings.Replace(bodyjson, ",1:", ",\"1\":", -1) // fastJson key is string todo todo
bodyjson = strings.Replace(bodyjson, "{0:", "{\"0\":", -1)
bodyjson = strings.Replace(bodyjson, "{1:", "{\"1\":", -1)
err = json.Unmarshal([]byte(bodyjson), topicRouteData)
@@ -291,7 +289,7 @@ func (self MqClientImpl) updateTopicRouteInfoLocal(topic string, topicRouteData
//update pubInfo for each
topicPublishInfo := model.BuildTopicPublishInfoFromTopicRoteData(topic, topicRouteData)
- self.TopicPublishInfoTable.Set(topic, topicPublishInfo)
+ self.TopicPublishInfoTable.Set(topic, topicPublishInfo) // todo
mqList := model.BuildTopicSubscribeInfoFromRoteData(topic, topicRouteData)
self.TopicSubscribeInfoTable.Set(topic, mqList)
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/service/mq_fault_strategy.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/mq_fault_strategy.go b/rocketmq-go/service/mq_fault_strategy.go
new file mode 100644
index 0000000..852ab98
--- /dev/null
+++ b/rocketmq-go/service/mq_fault_strategy.go
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package service
+
+import (
+ "errors"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+)
+
+type MQFaultStrategy struct {
+}
+
+//if first select : random one
+//if has error broker before ,skip the err broker
+func selectOneMessageQueue(topicPublishInfo *model.TopicPublishInfo, lastFailedBroker string) (mqQueue model.MessageQueue, err error) {
+ queueIndex := topicPublishInfo.FetchQueueIndex()
+ queues := topicPublishInfo.MessageQueueList
+ if len(lastFailedBroker) == 0 {
+ mqQueue = queues[queueIndex]
+ return
+ }
+ for i := 0; i < len(queues); i++ {
+ nowQueueIndex := queueIndex + i
+ if nowQueueIndex >= len(queues) {
+ nowQueueIndex = nowQueueIndex - len(queues)
+ }
+ if lastFailedBroker == queues[nowQueueIndex].BrokerName {
+ continue
+ }
+ mqQueue = queues[nowQueueIndex]
+ return
+ }
+ err = errors.New("send to [" + lastFailedBroker + "] fail,no other broker")
+ return
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/service/offset_store.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/offset_store.go b/rocketmq-go/service/offset_store.go
new file mode 100644
index 0000000..0bfe640
--- /dev/null
+++ b/rocketmq-go/service/offset_store.go
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package service
+
+import (
+ "errors"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/header"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/remoting"
+ "github.com/golang/glog"
+ "strconv"
+ "sync"
+)
+
+const (
+ MEMORY_FIRST_THEN_STORE = 0
+ READ_FROM_MEMORY = 1
+ READ_FROM_STORE = 2
+)
+
+type OffsetStore interface {
+ UpdateOffset(mq *model.MessageQueue, offset int64, increaseOnly bool)
+ ReadOffset(mq *model.MessageQueue, readType int) int64
+ Persist(mq *model.MessageQueue)
+ RemoveOffset(mq *model.MessageQueue)
+}
+type RemoteOffsetStore struct {
+ groupName string
+ mqClient RocketMqClient
+ offsetTable map[model.MessageQueue]int64
+ offsetTableLock sync.RWMutex
+}
+
+func RemoteOffsetStoreInit(groupName string, mqClient RocketMqClient) OffsetStore {
+ offsetStore := new(RemoteOffsetStore)
+ offsetStore.groupName = groupName
+ offsetStore.mqClient = mqClient
+ offsetStore.offsetTable = make(map[model.MessageQueue]int64)
+ return offsetStore
+}
+func (self *RemoteOffsetStore) RemoveOffset(mq *model.MessageQueue) {
+ defer self.offsetTableLock.Unlock()
+ self.offsetTableLock.Lock()
+ delete(self.offsetTable, *mq)
+}
+
+func (self *RemoteOffsetStore) Persist(mq *model.MessageQueue) {
+ brokerAddr := self.mqClient.FetchMasterBrokerAddress(mq.BrokerName)
+ if len(brokerAddr) == 0 {
+ self.mqClient.TryToFindTopicPublishInfo(mq.Topic)
+ brokerAddr = self.mqClient.FetchMasterBrokerAddress(mq.BrokerName)
+ }
+ self.offsetTableLock.RLock()
+ offset := self.offsetTable[*mq]
+ self.offsetTableLock.RUnlock()
+ updateConsumerOffsetRequestHeader := &header.UpdateConsumerOffsetRequestHeader{ConsumerGroup: self.groupName, Topic: mq.Topic, QueueId: mq.QueueId, CommitOffset: offset}
+ requestCommand := remoting.NewRemotingCommand(remoting.UPDATE_CONSUMER_OFFSET, updateConsumerOffsetRequestHeader)
+ self.mqClient.GetRemotingClient().InvokeOneWay(brokerAddr, requestCommand, 1000*5)
+}
+
+func (self *RemoteOffsetStore) ReadOffset(mq *model.MessageQueue, readType int) int64 {
+
+ switch readType {
+ case MEMORY_FIRST_THEN_STORE:
+ case READ_FROM_MEMORY:
+ self.offsetTableLock.RLock()
+ offset, ok := self.offsetTable[*mq]
+ self.offsetTableLock.RUnlock()
+ if ok {
+ return offset
+ } else {
+ return -1
+ }
+ case READ_FROM_STORE:
+ offset, err := self.fetchConsumeOffsetFromBroker(mq)
+ if err != nil {
+ glog.Error(err)
+ return -1
+ }
+ glog.V(2).Info("READ_FROM_STORE", offset)
+ self.UpdateOffset(mq, offset, false)
+ return offset
+ }
+
+ return -1
+
+}
+
+func (self *RemoteOffsetStore) fetchConsumeOffsetFromBroker(mq *model.MessageQueue) (int64, error) {
+ brokerAddr, _, found := self.mqClient.FindBrokerAddressInSubscribe(mq.BrokerName, 0, false)
+
+ if !found {
+ brokerAddr, _, found = self.mqClient.FindBrokerAddressInSubscribe(mq.BrokerName, 0, false)
+ }
+
+ if found {
+ requestHeader := &header.QueryConsumerOffsetRequestHeader{}
+ requestHeader.Topic = mq.Topic
+ requestHeader.QueueId = mq.QueueId
+ requestHeader.ConsumerGroup = self.groupName
+ return self.queryConsumerOffset(brokerAddr, requestHeader, 3000)
+ }
+
+ return -1, errors.New("fetch consumer offset error")
+}
+
+func (self RemoteOffsetStore) queryConsumerOffset(addr string, requestHeader *header.QueryConsumerOffsetRequestHeader, timeoutMillis int64) (int64, error) {
+ remotingCommand := remoting.NewRemotingCommand(remoting.QUERY_CONSUMER_OFFSET, requestHeader)
+ reponse, err := self.mqClient.GetRemotingClient().InvokeSync(addr, remotingCommand, timeoutMillis)
+ if err != nil {
+ glog.Error(err)
+ return -1, err
+ }
+
+ if reponse.Code == remoting.QUERY_NOT_FOUND {
+ return -1, nil
+ }
+
+ if offsetInter, ok := reponse.ExtFields["offset"]; ok {
+ if offsetStr, ok := offsetInter.(string); ok {
+ offset, err := strconv.ParseInt(offsetStr, 10, 64)
+ if err != nil {
+ glog.Error(err)
+ return -1, err
+ }
+ return offset, nil
+
+ }
+ }
+ glog.Error(requestHeader, reponse)
+ return -1, errors.New("query offset error")
+}
+
+func (self *RemoteOffsetStore) UpdateOffset(mq *model.MessageQueue, offset int64, increaseOnly bool) {
+ defer self.offsetTableLock.Unlock()
+ self.offsetTableLock.Lock()
+ if mq != nil {
+ if increaseOnly {
+ offsetOld := self.offsetTable[*mq]
+ if offsetOld >= offset {
+ return
+ }
+ self.offsetTable[*mq] = offset
+ } else {
+ self.offsetTable[*mq] = offset
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/service/offset_store_service.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/offset_store_service.go b/rocketmq-go/service/offset_store_service.go
deleted file mode 100644
index 302b412..0000000
--- a/rocketmq-go/service/offset_store_service.go
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package service
-
-type OffsetStore struct {
- mqClient RocketMqClient
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/service/producer_service.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/producer_service.go b/rocketmq-go/service/producer_service.go
index a684b27..2f2a7b6 100644
--- a/rocketmq-go/service/producer_service.go
+++ b/rocketmq-go/service/producer_service.go
@@ -16,13 +16,227 @@
*/
package service
-import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
+import (
+ "errors"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/header"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/remoting"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"
+ "github.com/golang/glog"
+ "time"
+)
type ProducerService interface {
+ CheckConfig() (err error)
+ SendDefaultImpl(message *model.Message, communicationMode string, sendCallback string, timeout int64) (sendResult *model.SendResult, err error)
}
type DefaultProducerService struct {
- producerGroup string
- producerConfig *config.RocketMqProducerConfig
- mqClient RocketMqClient
+ producerGroup string
+ producerConfig *config.RocketMqProducerConfig
+ mqClient RocketMqClient
+ mqFaultStrategy MQFaultStrategy
+}
+
+func NewDefaultProducerService(producerGroup string, producerConfig *config.RocketMqProducerConfig, mqClient RocketMqClient) (defaultProducerService *DefaultProducerService) {
+ defaultProducerService = &DefaultProducerService{
+ mqClient: mqClient,
+ producerGroup: producerGroup,
+ producerConfig: producerConfig,
+ }
+ defaultProducerService.CheckConfig()
+ return
+}
+func (self *DefaultProducerService) CheckConfig() (err error) {
+ // todo check if not pass panic
+ return
+}
+
+func (self *DefaultProducerService) SendDefaultImpl(message *model.Message, communicationMode string, sendCallback string, timeout int64) (sendResult *model.SendResult, err error) {
+ var (
+ topicPublishInfo *model.TopicPublishInfo
+ )
+ err = self.checkMessage(message)
+ if err != nil {
+ return
+ }
+ topicPublishInfo, err = self.mqClient.TryToFindTopicPublishInfo(message.Topic)
+ if err != nil {
+ return
+ }
+ if topicPublishInfo.JudgeTopicPublishInfoOk() == false {
+ err = errors.New("topicPublishInfo is error,topic=" + message.Topic)
+ return
+ }
+ glog.V(2).Info("op=look topicPublishInfo", topicPublishInfo)
+ //if(!ok) return error
+ sendResult, err = self.sendMsgUseTopicPublishInfo(message, communicationMode, sendCallback, topicPublishInfo, timeout)
+ return
+}
+
+func (self *DefaultProducerService) producerSendMessageRequest(brokerAddr string, sendMessageHeader remoting.CustomerHeader, message *model.Message, timeout int64) (sendResult *model.SendResult, err error) {
+ remotingCommand := remoting.NewRemotingCommandWithBody(remoting.SEND_MESSAGE, sendMessageHeader, message.Body)
+ var response *remoting.RemotingCommand
+ response, err = self.mqClient.GetRemotingClient().InvokeSync(brokerAddr, remotingCommand, timeout)
+ if err != nil {
+ glog.Error(err)
+ return
+ }
+ sendResult, err = processSendResponse(brokerAddr, message, response)
+ return
+}
+func processSendResponse(brokerName string, message *model.Message, response *remoting.RemotingCommand) (sendResult *model.SendResult, err error) {
+ sendResult = &model.SendResult{}
+ switch response.Code {
+ case remoting.FLUSH_DISK_TIMEOUT:
+ {
+ sendResult.SetSendStatus(model.FlushDiskTimeout)
+ break
+ }
+ case remoting.FLUSH_SLAVE_TIMEOUT:
+ {
+ sendResult.SetSendStatus(model.FlushSlaveTimeout)
+ break
+ }
+ case remoting.SLAVE_NOT_AVAILABLE:
+ {
+ sendResult.SetSendStatus(model.SlaveNotAvaliable)
+ break
+ }
+ case remoting.SUCCESS:
+ {
+ sendResult.SetSendStatus(model.SendOK)
+ break
+ }
+ default:
+ err = errors.New("response.Code error code")
+ return
+ }
+ var responseHeader = &header.SendMessageResponseHeader{}
+ if response.ExtFields != nil {
+ responseHeader.FromMap(response.ExtFields) //change map[string]interface{} into CustomerHeader struct
+ }
+ sendResult.SetMsgID(message.Properties[constant.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX])
+ sendResult.SetOffsetMsgID(responseHeader.MsgId)
+ sendResult.SetQueueOffset(responseHeader.QueueOffset)
+ sendResult.SetTransactionID(responseHeader.TransactionId)
+ messageQueue := model.MessageQueue{Topic: message.Topic, BrokerName: brokerName,
+ QueueId: responseHeader.QueueId}
+ sendResult.SetMessageQueue(messageQueue)
+ var regionId = responseHeader.MsgRegion
+ if len(regionId) == 0 {
+ regionId = "DefaultRegion"
+ }
+ sendResult.SetRegionID(regionId)
+ return
+}
+
+func (self *DefaultProducerService) checkMessage(message *model.Message) (err error) {
+ if message == nil {
+ err = errors.New("message is nil")
+ return
+ }
+ if len(message.Topic) == 0 {
+ err = errors.New("topic is empty")
+ return
+ }
+ if message.Topic == constant.DEFAULT_TOPIC {
+ err = errors.New("the topic[" + message.Topic + "] is conflict with default topic.")
+ return
+ }
+
+ if len(message.Topic) > constant.MAX_MESSAGE_TOPIC_SIZE {
+ err = errors.New("the specified topic is longer than topic max length 255.")
+ return
+ }
+ //todo todo public static final String VALID_PATTERN_STR = "";
+
+ if !util.MatchString(message.Topic, `^[%|a-zA-Z0-9_-]+$`) {
+ err = errors.New("the specified topic[" + message.Topic + "] contains illegal characters")
+ return
+ }
+ if len(message.Body) == 0 {
+ err = errors.New("messageBody is empty")
+ return
+ }
+ if len(message.Body) > self.producerConfig.MaxMessageSize {
+ err = errors.New("messageBody is large than " + util.IntToString(self.producerConfig.MaxMessageSize))
+ return
+ }
+ return
+}
+
+func (self *DefaultProducerService) sendMsgUseTopicPublishInfo(message *model.Message, communicationMode string, sendCallback string, topicPublishInfo *model.TopicPublishInfo, timeout int64) (sendResult *model.SendResult, err error) {
+ var (
+ sendTotalTime int
+ messageQueue model.MessageQueue
+ )
+
+ sendTotalTime = 1
+ var lastFailedBroker = ""
+ //todo transaction
+ // todo retry
+ for i := 0; i < sendTotalTime; i++ {
+ messageQueue, err = selectOneMessageQueue(topicPublishInfo, lastFailedBroker)
+ if err != nil {
+ return
+ }
+ sendResult, err = self.doSendMessage(message, messageQueue, communicationMode, sendCallback, topicPublishInfo, timeout)
+ if err != nil {
+ // todo retry
+ return
+ }
+ }
+ return
+}
+
+func (self *DefaultProducerService) doSendMessage(message *model.Message, messageQueue model.MessageQueue,
+ communicationMode string, sendCallback string,
+ topicPublishInfo *model.TopicPublishInfo,
+ timeout int64) (sendResult *model.SendResult, err error) {
+ var (
+ brokerAddr string
+ sysFlag int
+ compressMessageFlag int
+ )
+ compressMessageFlag, err = self.tryToCompressMessage(message)
+ if err != nil {
+ return
+ }
+ sysFlag = sysFlag | compressMessageFlag
+ brokerAddr = self.mqClient.FetchMasterBrokerAddress(messageQueue.BrokerName)
+ if len(brokerAddr) == 0 {
+ err = errors.New("The broker[" + messageQueue.BrokerName + "] not exist")
+ return
+ }
+ message.GeneratorMsgUniqueKey()
+ sendMessageHeader := &header.SendMessageRequestHeader{
+ ProducerGroup: self.producerGroup,
+ Topic: message.Topic,
+ DefaultTopic: constant.DEFAULT_TOPIC,
+ DefaultTopicQueueNums: 4,
+ QueueId: messageQueue.QueueId,
+ SysFlag: sysFlag,
+ BornTimestamp: time.Now().UnixNano() / 1000000,
+ Flag: message.Flag,
+ Properties: util.MessageProperties2String(message.Properties),
+
+ UnitMode: false,
+ ReconsumeTimes: message.GetReconsumeTimes(),
+ MaxReconsumeTimes: message.GetMaxReconsumeTimes(),
+ }
+ sendResult, err = self.producerSendMessageRequest(brokerAddr, sendMessageHeader, message, timeout)
+ return
+}
+
+func (self *DefaultProducerService) tryToCompressMessage(message *model.Message) (compressedFlag int, err error) {
+ if len(message.Body) < self.producerConfig.CompressMsgBodyOverHowMuch {
+ compressedFlag = 0
+ return
+ }
+ compressedFlag = int(constant.CompressedFlag)
+ message.Body, err = util.CompressWithLevel(message.Body, self.producerConfig.ZipCompressLevel)
+ return
}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/service/producer_service_for_send_back.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/producer_service_for_send_back.go b/rocketmq-go/service/producer_service_for_send_back.go
new file mode 100644
index 0000000..290da27
--- /dev/null
+++ b/rocketmq-go/service/producer_service_for_send_back.go
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package service
+
+import (
+ "encoding/json"
+ "errors"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/header"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/remoting"
+ "github.com/golang/glog"
+)
+
+type SendMessageBackProducerService interface {
+ SendMessageBack(messageExt *model.MessageExt, delayLayLevel int, brokerName string) (err error)
+ InitSendMessageBackProducerService(consumerGroup string, mqClient RocketMqClient, defaultProducerService *DefaultProducerService, consumerConfig *config.RocketMqConsumerConfig)
+}
+
+type SendMessageBackProducerServiceImpl struct {
+ mqClient RocketMqClient
+ defaultProducerService *DefaultProducerService // one namesvr only one
+ consumerGroup string
+ consumerConfig *config.RocketMqConsumerConfig //one mq group have one
+}
+
+// send to original broker,if fail send a new retry message
+func (self *SendMessageBackProducerServiceImpl) SendMessageBack(messageExt *model.MessageExt, delayLayLevel int, brokerName string) (err error) {
+ glog.V(2).Info("op=look_send_message_back", messageExt.MsgId, messageExt.Properties, string(messageExt.Body))
+ err = self.consumerSendMessageBack(brokerName, messageExt, delayLayLevel) // todo use
+ if err == nil {
+ return
+ }
+ glog.Error(err)
+ err = self.sendRetryMessageBack(messageExt)
+ return
+}
+
+func (self *SendMessageBackProducerServiceImpl) sendRetryMessageBack(messageExt *model.MessageExt) error {
+ // todo build a retry topic todo check todo check
+ retryMessage := &model.Message{}
+ originMessageId := messageExt.GetOriginMessageId()
+ retryMessage.Properties = messageExt.Properties
+ retryMessage.SetOriginMessageId(originMessageId)
+ retryMessage.Flag = messageExt.Flag
+ retryMessage.Topic = constant.RETRY_GROUP_TOPIC_PREFIX + self.consumerGroup
+ retryMessage.Body = messageExt.Body
+ retryMessage.SetRetryTopic(messageExt.Topic)
+ retryMessage.SetReconsumeTime(messageExt.GetReconsumeTimes() + 1)
+ retryMessage.SetMaxReconsumeTimes(self.consumerConfig.MaxReconsumeTimes)
+ retryMessage.SetDelayTimeLevel(3 + messageExt.GetReconsumeTimes())
+ pp, _ := json.Marshal(retryMessage)
+ glog.Info("look retryMessage ", string(pp), string(messageExt.Body))
+ sendResult, err := self.defaultProducerService.SendDefaultImpl(retryMessage, constant.COMMUNICATIONMODE_SYNC, "", self.defaultProducerService.producerConfig.SendMsgTimeout)
+ if err != nil {
+ glog.Error(err)
+ return err
+ }
+ xx, _ := json.Marshal(sendResult)
+ glog.Info("look retryMessage result", string(xx))
+ // todo need check send result
+ return nil
+
+}
+
+func (self *SendMessageBackProducerServiceImpl) InitSendMessageBackProducerService(consumerGroup string, mqClient RocketMqClient, defaultProducerService *DefaultProducerService, consumerConfig *config.RocketMqConsumerConfig) {
+ self.mqClient = mqClient
+ self.consumerGroup = consumerGroup
+ self.defaultProducerService = defaultProducerService
+ self.consumerConfig = consumerConfig
+}
+
+func (self *SendMessageBackProducerServiceImpl) consumerSendMessageBack(brokerName string, messageExt *model.MessageExt, delayLayLevel int) (err error) {
+ if len(brokerName) == 0 {
+ err = errors.New("broker can't be empty")
+ glog.Error(err)
+ return
+ }
+ brokerAddr := self.mqClient.FetchMasterBrokerAddress(brokerName)
+ sendMsgBackHeader := &header.ConsumerSendMsgBackRequestHeader{
+ Offset: messageExt.CommitLogOffset,
+ Group: self.consumerGroup,
+ DelayLevel: 0, //Message consume retry strategy<br>-1,no retry,put into DLQ directly<br>0,broker control retry frequency<br>>0,client control retry frequency
+ OriginMsgId: messageExt.MsgId,
+ OriginTopic: messageExt.Topic,
+ UnitMode: false,
+ MaxReconsumeTimes: int32(self.consumerConfig.MaxReconsumeTimes),
+ }
+ remotingCommand := remoting.NewRemotingCommand(remoting.CONSUMER_SEND_MSG_BACK, sendMsgBackHeader)
+ response, invokeErr := self.mqClient.GetRemotingClient().InvokeSync(brokerAddr, remotingCommand, 5000)
+ if invokeErr != nil {
+ err = invokeErr
+ return
+ }
+ if response == nil || response.Code != remoting.SUCCESS {
+ glog.Error("sendMsgBackRemarkError", response.Remark)
+ err = errors.New("send Message back error")
+ }
+ return
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/service/rebalance.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/rebalance.go b/rocketmq-go/service/rebalance.go
new file mode 100644
index 0000000..8f4f4fb
--- /dev/null
+++ b/rocketmq-go/service/rebalance.go
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package service
+
+import (
+ "encoding/json"
+ "errors"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/header"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/remoting"
+ "github.com/apache/incubator-rocketmq-externals/rocketmq-go/service/allocate_message"
+ "github.com/golang/glog"
+ "sort"
+ "strings"
+ "sync"
+ "time"
+)
+
+type Rebalance struct {
+ groupName string
+ messageModel string
+ topicSubscribeInfoTableLock sync.RWMutex
+ SubscriptionInner map[string]*model.SubscriptionData
+ subscriptionInnerLock sync.RWMutex
+ mqClient RocketMqClient
+ allocateMessageQueueStrategy service_allocate_message.AllocateMessageQueueStrategy
+ processQueueTable map[model.MessageQueue]*model.ProcessQueue // both subscribe topic and retry group
+ processQueueTableLock sync.RWMutex
+ mutex sync.Mutex
+ offsetStore OffsetStore
+ consumerConfig *config.RocketMqConsumerConfig
+}
+
+func (self *Rebalance) GetMqTableInfo() map[model.MessageQueue]model.ProcessQueueInfo {
+ defer self.processQueueTableLock.RUnlock()
+ self.processQueueTableLock.RLock()
+ mqTable := map[model.MessageQueue]model.ProcessQueueInfo{}
+ for messageQueue, processQueue := range self.processQueueTable {
+ mqTable[messageQueue] = processQueue.ChangeToProcessQueueInfo()
+ }
+ return mqTable
+}
+
+func (self *Rebalance) GetProcessQueue(messageQueue model.MessageQueue) *model.ProcessQueue {
+ defer self.processQueueTableLock.RUnlock()
+ self.processQueueTableLock.RLock()
+ return self.processQueueTable[messageQueue]
+}
+
+func (self *Rebalance) ClearProcessQueue(offsetTable map[model.MessageQueue]int64) {
+ defer self.processQueueTableLock.Unlock()
+ self.processQueueTableLock.Lock()
+ for mq, _ := range offsetTable {
+ processQueue, ok := self.processQueueTable[mq]
+ if !ok {
+ continue
+ }
+ processQueue.Clear()
+ }
+
+}
+
+func (self *Rebalance) GetProcessQueueList() (messageQueueList []model.MessageQueue, processQueueList []*model.ProcessQueue) {
+ defer self.processQueueTableLock.RUnlock()
+ self.processQueueTableLock.RLock()
+ for messageQueue, processQueue := range self.processQueueTable {
+ processQueueList = append(processQueueList, processQueue)
+ messageQueueList = append(messageQueueList, messageQueue)
+ }
+ return
+}
+
+//removeUnnecessaryMessageQueue you should drop it first
+func (self *Rebalance) RemoveProcessQueue(messageQueue *model.MessageQueue) {
+ self.offsetStore.Persist(messageQueue)
+ self.offsetStore.RemoveOffset(messageQueue)
+ self.removeMessageQueueFromMap(*messageQueue)
+}
+func (self *Rebalance) removeMessageQueueFromMap(messageQueue model.MessageQueue) {
+ defer self.processQueueTableLock.Unlock()
+ self.processQueueTableLock.Lock()
+ delete(self.processQueueTable, messageQueue)
+
+}
+
+func NewRebalance(groupName string, subscription map[string]string, mqClient RocketMqClient, offsetStore OffsetStore, consumerConfig *config.RocketMqConsumerConfig) *Rebalance {
+ subscriptionInner := make(map[string]*model.SubscriptionData)
+ for topic, subExpression := range subscription {
+ subData := &model.SubscriptionData{
+ Topic: topic,
+ SubString: subExpression,
+ SubVersion: time.Now().Unix(),
+ }
+ subscriptionInner[topic] = subData
+ }
+ // put retry
+ retryTopic := constant.RETRY_GROUP_TOPIC_PREFIX + groupName
+ subscriptionInner[retryTopic] = &model.SubscriptionData{
+ Topic: retryTopic,
+ SubString: "*",
+ SubVersion: time.Now().Unix(),
+ }
+ return &Rebalance{
+ groupName: groupName,
+ mqClient: mqClient,
+ offsetStore: offsetStore,
+ SubscriptionInner: subscriptionInner,
+ allocateMessageQueueStrategy: service_allocate_message.GetAllocateMessageQueueStrategyByConfig("default"),
+ messageModel: "CLUSTERING",
+ processQueueTable: make(map[model.MessageQueue]*model.ProcessQueue),
+ consumerConfig: consumerConfig,
+ }
+}
+
+func (self *Rebalance) DoRebalance() {
+ self.mutex.Lock()
+ defer self.mutex.Unlock()
+ for topic, _ := range self.SubscriptionInner {
+ self.rebalanceByTopic(topic)
+ }
+}
+
+type ConsumerIdSorter []string
+
+func (self ConsumerIdSorter) Len() int {
+ return len(self)
+}
+func (self ConsumerIdSorter) Swap(i, j int) {
+ self[i], self[j] = self[j], self[i]
+}
+func (self ConsumerIdSorter) Less(i, j int) bool {
+ if self[i] < self[j] {
+ return true
+ }
+ return false
+}
+
+func (self *Rebalance) rebalanceByTopic(topic string) error {
+ var cidAll []string
+ cidAll, err := self.findConsumerIdList(topic, self.groupName)
+ if err != nil {
+ glog.Error(err)
+ return err
+ }
+ self.topicSubscribeInfoTableLock.RLock()
+ mqs := self.mqClient.GetTopicSubscribeInfo(topic)
+ self.topicSubscribeInfoTableLock.RUnlock()
+ if len(mqs) > 0 && len(cidAll) > 0 {
+ var messageQueues model.MessageQueues = mqs
+ var consumerIdSorter ConsumerIdSorter = cidAll
+
+ sort.Sort(messageQueues)
+ sort.Sort(consumerIdSorter)
+ }
+ allocateResult, err := self.allocateMessageQueueStrategy.Allocate(self.groupName, self.mqClient.GetClientId(), mqs, cidAll)
+
+ if err != nil {
+ glog.Error(err)
+ return err
+ }
+
+ glog.V(2).Infof("rebalance topic[%s]", topic)
+ self.updateProcessQueueTableInRebalance(topic, allocateResult)
+ return nil
+}
+
+func (self *Rebalance) updateProcessQueueTableInRebalance(topic string, mqSet []model.MessageQueue) {
+ defer self.processQueueTableLock.RUnlock()
+ self.processQueueTableLock.RLock()
+ self.removeTheQueueDontBelongHere(topic, mqSet)
+ self.putTheQueueToProcessQueueTable(topic, mqSet)
+
+}
+func (self *Rebalance) removeTheQueueDontBelongHere(topic string, mqSet []model.MessageQueue) {
+ // there is n^2 todo improve
+ for key, value := range self.processQueueTable {
+ if topic != key.Topic {
+ continue
+ }
+ needDelete := true
+ for _, messageQueueItem := range mqSet {
+ if key == messageQueueItem {
+ needDelete = false
+ // todo if expire
+ break
+ }
+ }
+ if needDelete {
+ value.SetDrop(true)
+ delete(self.processQueueTable, key)
+ }
+ }
+}
+
+func (self *Rebalance) putTheQueueToProcessQueueTable(topic string, mqSet []model.MessageQueue) {
+ for index, mq := range mqSet {
+ _, ok := self.processQueueTable[mq]
+ if !ok {
+ pullRequest := new(model.PullRequest)
+ pullRequest.ConsumerGroup = self.groupName
+ pullRequest.MessageQueue = &mqSet[index]
+ pullRequest.NextOffset = self.computePullFromWhere(&mq) // todo use remote offset
+ pullRequest.ProcessQueue = model.NewProcessQueue()
+ self.processQueueTable[mq] = pullRequest.ProcessQueue
+ self.mqClient.EnqueuePullMessageRequest(pullRequest)
+ }
+ }
+
+}
+func (self *Rebalance) computePullFromWhere(mq *model.MessageQueue) int64 {
+ var result int64 = -1
+ lastOffset := self.offsetStore.ReadOffset(mq, READ_FROM_STORE)
+ switch self.consumerConfig.ConsumeFromWhere {
+ case config.CONSUME_FROM_LAST_OFFSET:
+ if lastOffset >= 0 {
+ result = lastOffset
+ } else {
+ if strings.HasPrefix(mq.Topic, constant.RETRY_GROUP_TOPIC_PREFIX) {
+ result = 0
+ } else {
+ result = self.mqClient.GetMaxOffset(mq)
+ }
+ }
+ break
+ case config.CONSUME_FROM_FIRST_OFFSET:
+ if lastOffset >= 0 {
+ result = lastOffset
+ } else {
+ result = 0 // use the begin offset
+ }
+ break
+ case config.CONSUME_FROM_TIMESTAMP:
+ if lastOffset >= 0 {
+ result = lastOffset
+ } else {
+ if strings.HasPrefix(mq.Topic, constant.RETRY_GROUP_TOPIC_PREFIX) {
+ result = 0
+ } else {
+ result = self.mqClient.SearchOffset(mq, self.consumerConfig.ConsumeTimestamp)
+ }
+ }
+ break
+ default:
+
+ }
+
+ return result
+}
+
+func (self *Rebalance) findConsumerIdList(topic string, groupName string) ([]string, error) {
+ brokerAddr, ok := self.mqClient.FindBrokerAddrByTopic(topic)
+ if !ok {
+ err := self.mqClient.UpdateTopicRouteInfoFromNameServer(topic)
+ if err != nil {
+ glog.Error(err)
+ }
+ brokerAddr, ok = self.mqClient.FindBrokerAddrByTopic(topic)
+ }
+
+ if ok {
+ return self.getConsumerIdListByGroup(brokerAddr, groupName, 3000)
+ }
+
+ return nil, errors.New("can't find broker")
+
+}
+
+func (self *Rebalance) getConsumerIdListByGroup(addr string, consumerGroup string, timeoutMillis int64) ([]string, error) {
+ requestHeader := new(header.GetConsumerListByGroupRequestHeader)
+ requestHeader.ConsumerGroup = consumerGroup
+
+ request := remoting.NewRemotingCommand(remoting.GET_CONSUMER_LIST_BY_GROUP, requestHeader)
+
+ response, err := self.mqClient.GetRemotingClient().InvokeSync(addr, request, timeoutMillis)
+ if err != nil {
+ glog.Error(err)
+ return nil, err
+ }
+ if response.Code == remoting.SUCCESS {
+ getConsumerListByGroupResponseBody := new(header.GetConsumerListByGroupResponseBody)
+ bodyjson := strings.Replace(string(response.Body), "0:", "\"0\":", -1)
+ bodyjson = strings.Replace(bodyjson, "1:", "\"1\":", -1)
+ err := json.Unmarshal([]byte(bodyjson), getConsumerListByGroupResponseBody)
+ if err != nil {
+ glog.Error(err)
+ return nil, err
+ }
+ return getConsumerListByGroupResponseBody.ConsumerIdList, nil
+ }
+ return nil, errors.New("getConsumerIdListByGroup error=" + response.Remark)
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/service/rebalance_service.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/rebalance_service.go b/rocketmq-go/service/rebalance_service.go
deleted file mode 100644
index acdcdd6..0000000
--- a/rocketmq-go/service/rebalance_service.go
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package service
-
-import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
-
-type Rebalance struct {
- mqClient RocketMqClient
- offsetStore OffsetStore
- consumerConfig config.RocketMqClientConfig
-}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/tasks.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/tasks.go b/rocketmq-go/tasks.go
new file mode 100644
index 0000000..604222f
--- /dev/null
+++ b/rocketmq-go/tasks.go
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rocketmq
+
+import (
+ "math/rand"
+ "time"
+)
+
+type TaskManager struct {
+}
+
+func (self MqClientManager) StartAllScheduledTask() {
+ rand.Seed(time.Now().UnixNano())
+ go func() {
+ updateTopicRouteTimer := time.NewTimer(5 * time.Second)
+ for {
+ <-updateTopicRouteTimer.C
+ self.UpdateTopicRouteInfoFromNameServer()
+ updateTopicRouteTimer.Reset(5 * time.Second)
+ }
+ }()
+
+ go func() {
+ heartbeatTimer := time.NewTimer(10 * time.Second)
+ for {
+ <-heartbeatTimer.C
+ self.SendHeartbeatToAllBrokerWithLock()
+ heartbeatTimer.Reset(5 * time.Second)
+ }
+ }()
+
+ go func() {
+ rebalanceTimer := time.NewTimer(15 * time.Second)
+ for {
+ <-rebalanceTimer.C
+ self.rebalanceControllr.doRebalance()
+ rebalanceTimer.Reset(30 * time.Second)
+ }
+ }()
+
+ go func() {
+ timeoutTimer := time.NewTimer(3 * time.Second)
+ for {
+ <-timeoutTimer.C
+ self.mqClient.ClearExpireResponse()
+ timeoutTimer.Reset(time.Second)
+ }
+ }()
+ self.pullMessageController.Start()
+
+ //cleanExpireMsg
+ self.cleanExpireMsgController.Start()
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/util/compress_util.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/util/compress_util.go b/rocketmq-go/util/compress_util.go
new file mode 100644
index 0000000..0617911
--- /dev/null
+++ b/rocketmq-go/util/compress_util.go
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package util
+
+import (
+ "bytes"
+ "compress/zlib"
+ "github.com/golang/glog"
+ "io/ioutil"
+)
+
+func UnCompress(body []byte) (unCompressBody []byte, err error) {
+ b := bytes.NewReader(body)
+ z, err := zlib.NewReader(b)
+ if err != nil {
+ glog.Error(err)
+ return
+ }
+ defer z.Close()
+ unCompressBody, err = ioutil.ReadAll(z)
+ if err != nil {
+ glog.Error(err)
+ }
+ return
+}
+func Compress(body []byte) (compressBody []byte, err error) {
+ var in bytes.Buffer
+ w := zlib.NewWriter(&in)
+ _, err = w.Write(body)
+ w.Close()
+ compressBody = in.Bytes()
+ return
+}
+
+func CompressWithLevel(body []byte, level int) (compressBody []byte, err error) {
+ var (
+ in bytes.Buffer
+ w *zlib.Writer
+ )
+ //w := zlib.NewWriter(&in)
+ w, err = zlib.NewWriterLevel(&in, level)
+ if err != nil {
+ return
+ }
+ _, err = w.Write(body)
+ w.Close()
+ compressBody = in.Bytes()
+ return
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/util/concurrent_map.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/util/concurrent_map.go b/rocketmq-go/util/concurrent_map.go
index 2fbe9bf..9d3e273 100644
--- a/rocketmq-go/util/concurrent_map.go
+++ b/rocketmq-go/util/concurrent_map.go
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package util
import (
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/util/message_client_id_generator.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/util/message_client_id_generator.go b/rocketmq-go/util/message_client_id_generator.go
index df4cfb6..23293c0 100644
--- a/rocketmq-go/util/message_client_id_generator.go
+++ b/rocketmq-go/util/message_client_id_generator.go
@@ -88,13 +88,13 @@ func getStartAndNextStartTime() (thisMonthFirstDay12 int64, nextMonthFirstDay12
now := time.Now()
year := now.Year()
month := now.Month()
- thisMonthFirstDay12 = time.Date(year, month, 1, 12, 0, 0, 0, time.Local).UnixNano()
+ thisMonthFirstDay12 = time.Date(year, month, 1, 0, 0, 0, 0, time.Local).UnixNano()
month = month + 1
if month > 12 {
month = month - 12
year = year + 1
}
- nextMonthFirstDay12 = time.Date(year, month, 1, 12, 0, 0, 0, time.Local).UnixNano()
+ nextMonthFirstDay12 = time.Date(year, month, 1, 0, 0, 0, 0, time.Local).UnixNano()
return
}
func bytes2string(bytes []byte) (ret string) {
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/util/message_properties.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/util/message_properties.go b/rocketmq-go/util/message_properties.go
new file mode 100644
index 0000000..59fd5b8
--- /dev/null
+++ b/rocketmq-go/util/message_properties.go
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package util
+
+import (
+ "strings"
+)
+
+//char 1 and 2 from java code
+var NAME_VALUE_SEPARATOR = string(rune(1))
+
+var PROPERTY_SEPARATOR = string(rune(2))
+
+func MessageProperties2String(propertiesMap map[string]string) (ret string) {
+ for key, value := range propertiesMap {
+ ret = ret + key + NAME_VALUE_SEPARATOR + value + PROPERTY_SEPARATOR
+ }
+ return
+}
+
+func String2MessageProperties(properties string) (ret map[string]string) {
+ ret = make(map[string]string)
+ for _, nameValueStr := range strings.Split(properties, PROPERTY_SEPARATOR) {
+ nameValuePair := strings.Split(nameValueStr, NAME_VALUE_SEPARATOR)
+ nameValueLen := len(nameValuePair)
+ if nameValueLen != 2 {
+ //glog.Error("nameValuePair is error", nameValueStr)
+ continue
+ }
+ ret[nameValuePair[0]] = nameValuePair[1]
+ }
+ return
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/util/regex_util.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/util/regex_util.go b/rocketmq-go/util/regex_util.go
new file mode 100644
index 0000000..5357452
--- /dev/null
+++ b/rocketmq-go/util/regex_util.go
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package util
+
+import (
+ "regexp"
+)
+
+//var regexpMap map[string]*regexp.Regexp
+//var rwMutex sync.RWMutex
+
+// todo improve
+func MatchString(value, pattern string) bool {
+ re, err := regexp.Compile(pattern)
+ if err != nil {
+ return false
+ }
+ return re.MatchString(value)
+}
|