rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [1/2] incubator-rocketmq-externals git commit: [ROCKETMQ-198] Go-Client's incomplete implement.
Date Fri, 07 Jul 2017 01:56:41 GMT
Repository: incubator-rocketmq-externals
Updated Branches:
  refs/heads/master c98a770a6 -> 28b98b096


http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/pull_message_controller.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/pull_message_controller.go b/rocketmq-go/pull_message_controller.go
new file mode 100644
index 0000000..320cc31
--- /dev/null
+++ b/rocketmq-go/pull_message_controller.go
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package rocketmq
+
+import (
+	"bytes"
+	"compress/zlib"
+	"encoding/binary"
+	"fmt"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/header"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/remoting"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/service"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"
+	"github.com/golang/glog"
+	"io/ioutil"
+	"strconv"
+	"time"
+)
+
+type PullMessageController struct {
+	mqClient      service.RocketMqClient
+	clientFactory *ClientFactory
+}
+
+func NewPullMessageController(mqClient service.RocketMqClient, clientFactory *ClientFactory) *PullMessageController {
+	return &PullMessageController{
+		mqClient:      mqClient,
+		clientFactory: clientFactory,
+	}
+}
+
+func (self *PullMessageController) Start() {
+	go func() {
+		for {
+			pullRequest := self.mqClient.DequeuePullMessageRequest()
+			self.pullMessage(pullRequest)
+		}
+	}()
+}
+
+func (self *PullMessageController) needDelayPullMessage(mqPushConsumer *DefaultMQPushConsumer, pullRequest *model.PullRequest) (needDelayTime int64) {
+	if pullRequest.ProcessQueue.GetMsgCount() > mqPushConsumer.ConsumerConfig.PullThresholdForQueue {
+		return mqPushConsumer.ConsumerConfig.PullTimeDelayMillsWhenFlowControl
+	}
+	if pullRequest.ProcessQueue.GetMaxSpan() > mqPushConsumer.ConsumerConfig.ConsumeConcurrentlyMaxSpan {
+		return mqPushConsumer.ConsumerConfig.PullTimeDelayMillsWhenFlowControl
+	}
+	return
+}
+
+func (self *PullMessageController) pullMessageLater(pullRequest *model.PullRequest, millisecond int64) {
+	go func() {
+		timeoutTimer := time.NewTimer(time.Duration(millisecond) * time.Millisecond)
+		<-timeoutTimer.C
+		self.pullMessage(pullRequest)
+	}()
+	return
+}
+
+func (self *PullMessageController) pullMessage(pullRequest *model.PullRequest) {
+	defaultMQPullConsumer := self.clientFactory.ConsumerTable[pullRequest.ConsumerGroup]
+	if pullRequest.ProcessQueue.IsDropped() {
+		return
+	}
+
+	//pullRequest.ProcessQueue.SetLastPullTimestamp(System.currentTimeMillis());
+	// state ok
+	// isPause
+
+	delayPullTime := self.needDelayPullMessage(defaultMQPullConsumer, pullRequest)
+	if delayPullTime > 0 {
+		self.pullMessageLater(pullRequest, delayPullTime)
+		return
+	}
+	commitOffsetValue := defaultMQPullConsumer.offsetStore.ReadOffset(pullRequest.MessageQueue, service.READ_FROM_MEMORY)
+
+	subscriptionData, ok := defaultMQPullConsumer.rebalance.SubscriptionInner[pullRequest.MessageQueue.Topic]
+	if !ok {
+		self.pullMessageLater(pullRequest, defaultMQPullConsumer.ConsumerConfig.PullTimeDelayMillsWhenException)
+		return
+	}
+
+	var sysFlag int32 = 0
+	if commitOffsetValue > 0 {
+		sysFlag |= constant.FLAG_COMMIT_OFFSET
+	}
+	sysFlag |= constant.FLAG_SUSPEND
+	sysFlag |= constant.FLAG_SUBSCRIPTION
+	requestHeader := new(header.PullMessageRequestHeader)
+	requestHeader.ConsumerGroup = pullRequest.ConsumerGroup
+	requestHeader.Topic = pullRequest.MessageQueue.Topic
+	requestHeader.QueueId = pullRequest.MessageQueue.QueueId
+	requestHeader.QueueOffset = pullRequest.NextOffset
+
+	requestHeader.CommitOffset = commitOffsetValue
+	requestHeader.SuspendTimeoutMillis = defaultMQPullConsumer.ConsumerConfig.BrokerSuspendMaxTimeMillis
+	requestHeader.MaxMsgNums = int32(defaultMQPullConsumer.ConsumerConfig.PullBatchSize)
+	requestHeader.SubVersion = subscriptionData.SubVersion
+	requestHeader.Subscription = subscriptionData.SubString
+
+	requestHeader.SysFlag = sysFlag
+
+	pullCallback := func(responseFuture *remoting.ResponseFuture) {
+		var nextBeginOffset int64 = pullRequest.NextOffset
+
+		if responseFuture != nil {
+			responseCommand := responseFuture.ResponseCommand
+			if responseCommand.Code == remoting.SUCCESS && len(responseCommand.Body) > 0 {
+				//FOUND
+				var err error
+				pullResult := responseCommand.ExtFields
+				if ok {
+					if nextBeginOffsetInter, ok := pullResult["nextBeginOffset"]; ok {
+						if nextBeginOffsetStr, ok := nextBeginOffsetInter.(string); ok {
+							nextBeginOffset, err = strconv.ParseInt(nextBeginOffsetStr, 10, 64)
+							if err != nil {
+								glog.Error(err)
+								return
+							}
+						}
+					}
+				}
+				msgs := DecodeMessage(responseFuture.ResponseCommand.Body)
+
+				msgs = FilterMessageAgainByTags(msgs, defaultMQPullConsumer.subscriptionTag[pullRequest.MessageQueue.Topic])
+				if len(msgs) == 0 {
+					if pullRequest.ProcessQueue.GetMsgCount() == 0 {
+						defaultMQPullConsumer.offsetStore.UpdateOffset(pullRequest.MessageQueue, nextBeginOffset, true)
+					}
+				}
+				//
+				pullRequest.ProcessQueue.PutMessage(msgs)
+				defaultMQPullConsumer.consumeMessageService.SubmitConsumeRequest(msgs, pullRequest.ProcessQueue, pullRequest.MessageQueue, true)
+			} else {
+				//glog.Error(fmt.Sprintf("pull message error,code=%d,body=%s", responseCommand.Code, string(responseCommand.Body)))
+				var err error // change the offset , use nextBeginOffset
+				pullResult := responseCommand.ExtFields
+				if ok {
+					if nextBeginOffsetInter, ok := pullResult["nextBeginOffset"]; ok {
+						if nextBeginOffsetStr, ok := nextBeginOffsetInter.(string); ok {
+							nextBeginOffset, err = strconv.ParseInt(nextBeginOffsetStr, 10, 64)
+							if err != nil {
+								glog.Error(err)
+							}
+						}
+					}
+				}
+				if responseCommand.Code == remoting.PULL_NOT_FOUND || responseCommand.Code == remoting.PULL_RETRY_IMMEDIATELY {
+					//NO_NEW_MSG //NO_MATCHED_MSG
+					if pullRequest.ProcessQueue.GetMsgCount() == 0 {
+						defaultMQPullConsumer.offsetStore.UpdateOffset(pullRequest.MessageQueue, nextBeginOffset, true)
+					}
+					//update offset increase only
+					//failedPullRequest, _ := json.Marshal(pullRequest)
+					//glog.Error("the pull request offset illegal", string(failedPullRequest))
+				} else if responseCommand.Code == remoting.PULL_OFFSET_MOVED {
+					//OFFSET_ILLEGAL
+					glog.Error(fmt.Sprintf("PULL_OFFSET_MOVED,code=%d,body=%s", responseCommand.Code, string(responseCommand.Body)))
+					pullRequest.ProcessQueue.SetDrop(true)
+					go func() {
+						executeTaskLater := time.NewTimer(10 * time.Second)
+						<-executeTaskLater.C
+						defaultMQPullConsumer.offsetStore.UpdateOffset(pullRequest.MessageQueue, nextBeginOffset, false)
+						defaultMQPullConsumer.rebalance.RemoveProcessQueue(pullRequest.MessageQueue)
+					}()
+				} else {
+					glog.Errorf("illegal response code. pull message error,code=%d,request=%v OFFSET_ILLEGAL", responseCommand.Code, requestHeader)
+					glog.Error(pullRequest.MessageQueue)
+					time.Sleep(1 * time.Second)
+				}
+			}
+		} else {
+			glog.Error("responseFuture is nil")
+		}
+
+		if pullRequest.ProcessQueue.IsDropped() {
+			return
+		}
+		nextPullRequest := &model.PullRequest{
+			ConsumerGroup: pullRequest.ConsumerGroup,
+			NextOffset:    nextBeginOffset,
+			MessageQueue:  pullRequest.MessageQueue,
+			ProcessQueue:  pullRequest.ProcessQueue,
+		}
+		if defaultMQPullConsumer.ConsumerConfig.PullInterval > 0 {
+			go func() {
+				nextPullTime := time.NewTimer(time.Duration(defaultMQPullConsumer.ConsumerConfig.PullInterval) * time.Millisecond)
+				<-nextPullTime.C
+				self.mqClient.EnqueuePullMessageRequest(nextPullRequest)
+			}()
+		} else {
+			self.mqClient.EnqueuePullMessageRequest(nextPullRequest)
+		}
+	}
+	glog.V(2).Infof("requestHeader look offset %s %s %s %s", requestHeader.QueueOffset, requestHeader.Topic, requestHeader.QueueId, requestHeader.CommitOffset)
+	self.consumerPullMessageAsync(pullRequest.MessageQueue.BrokerName, requestHeader, pullCallback)
+}
+func FilterMessageAgainByTags(msgExts []model.MessageExt, subscriptionTagList []string) (result []model.MessageExt) {
+	result = msgExts
+	if len(subscriptionTagList) == 0 {
+		return
+	}
+	result = []model.MessageExt{}
+	for _, msg := range msgExts {
+		for _, tag := range subscriptionTagList {
+			if tag == msg.GetTag() {
+				result = append(result, msg)
+				break
+			}
+		}
+	}
+	return
+}
+
+func (self *PullMessageController) consumerPullMessageAsync(brokerName string, requestHeader remoting.CustomerHeader, invokeCallback remoting.InvokeCallback) {
+	brokerAddr, _, found := self.mqClient.FindBrokerAddressInSubscribe(brokerName, 0, false)
+	if found {
+		remotingCommand := remoting.NewRemotingCommand(remoting.PULL_MESSAGE, requestHeader)
+		self.mqClient.GetRemotingClient().InvokeAsync(brokerAddr, remotingCommand, 1000, invokeCallback)
+	}
+}
+
+func DecodeMessage(data []byte) []model.MessageExt {
+	buf := bytes.NewBuffer(data)
+	var storeSize, magicCode, bodyCRC, queueId, flag, sysFlag, reconsumeTimes, bodyLength, bornPort, storePort int32
+	var queueOffset, physicOffset, preparedTransactionOffset, bornTimeStamp, storeTimestamp int64
+	var topicLen byte
+	var topic, body, properties, bornHost, storeHost []byte
+	var propertiesLength int16
+
+	var propertiesmap = make(map[string]string)
+
+	msgs := []model.MessageExt{}
+	for buf.Len() > 0 {
+		msg := model.MessageExt{Message: &model.Message{}}
+		binary.Read(buf, binary.BigEndian, &storeSize)
+		binary.Read(buf, binary.BigEndian, &magicCode)
+		binary.Read(buf, binary.BigEndian, &bodyCRC)
+		binary.Read(buf, binary.BigEndian, &queueId)
+		binary.Read(buf, binary.BigEndian, &flag)
+		binary.Read(buf, binary.BigEndian, &queueOffset)
+		binary.Read(buf, binary.BigEndian, &physicOffset)
+		binary.Read(buf, binary.BigEndian, &sysFlag)
+		binary.Read(buf, binary.BigEndian, &bornTimeStamp)
+		bornHost = make([]byte, 4)
+		binary.Read(buf, binary.BigEndian, &bornHost)
+		binary.Read(buf, binary.BigEndian, &bornPort)
+		binary.Read(buf, binary.BigEndian, &storeTimestamp)
+		storeHost = make([]byte, 4)
+		binary.Read(buf, binary.BigEndian, &storeHost)
+		binary.Read(buf, binary.BigEndian, &storePort)
+		binary.Read(buf, binary.BigEndian, &reconsumeTimes)
+		binary.Read(buf, binary.BigEndian, &preparedTransactionOffset)
+		binary.Read(buf, binary.BigEndian, &bodyLength)
+		if bodyLength > 0 {
+			body = make([]byte, bodyLength)
+			binary.Read(buf, binary.BigEndian, body)
+			if (sysFlag & constant.CompressedFlag) == constant.CompressedFlag {
+				b := bytes.NewReader(body)
+				z, err := zlib.NewReader(b)
+				if err != nil {
+					glog.Error(err)
+					return nil
+				}
+				body, err = ioutil.ReadAll(z)
+				z.Close()
+				if err != nil {
+					glog.Error(err)
+					return nil
+				}
+			}
+		}
+		binary.Read(buf, binary.BigEndian, &topicLen)
+		topic = make([]byte, int(topicLen))
+		binary.Read(buf, binary.BigEndian, &topic)
+		binary.Read(buf, binary.BigEndian, &propertiesLength)
+		if propertiesLength > 0 {
+			properties = make([]byte, propertiesLength)
+			binary.Read(buf, binary.BigEndian, &properties)
+			propertiesmap = util.String2MessageProperties(string(properties))
+		}
+
+		if magicCode != -626843481 {
+			glog.Errorf("magic code is error %d", magicCode)
+			return nil
+		}
+
+		msg.Topic = string(topic)
+		msg.QueueId = queueId
+		msg.SysFlag = sysFlag
+		msg.QueueOffset = queueOffset
+		msg.BodyCRC = bodyCRC
+		msg.StoreSize = storeSize
+		msg.BornTimestamp = bornTimeStamp
+		msg.ReconsumeTimes = reconsumeTimes
+		msg.Flag = int(flag)
+		msg.CommitLogOffset = physicOffset
+		msg.StoreTimestamp = storeTimestamp
+		msg.PreparedTransactionOffset = preparedTransactionOffset
+		msg.Body = body
+		msg.Properties = propertiesmap
+
+		//  <  3.5.8 use messageOffsetId
+		//  >= 3.5.8 use clientUniqMsgId
+		msg.MsgId = msg.GetMsgUniqueKey()
+		if len(msg.MsgId) == 0 {
+			msg.MsgId = util.GeneratorMessageOffsetId(storeHost, storePort, msg.CommitLogOffset)
+		}
+		msgs = append(msgs, msg)
+	}
+
+	return msgs
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/rebalance_controller.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/rebalance_controller.go b/rocketmq-go/rebalance_controller.go
new file mode 100644
index 0000000..d6d4001
--- /dev/null
+++ b/rocketmq-go/rebalance_controller.go
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package rocketmq
+
+type RebalanceController struct {
+	clientFactory *ClientFactory
+}
+
+func NewRebalanceController(clientFactory *ClientFactory) *RebalanceController {
+	return &RebalanceController{
+		clientFactory: clientFactory,
+	}
+}
+
+func (self *RebalanceController) doRebalance() {
+	for _, consumer := range self.clientFactory.ConsumerTable {
+		consumer.rebalance.DoRebalance()
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/service/allocate_message/allocate_message_averagely.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/allocate_message/allocate_message_averagely.go b/rocketmq-go/service/allocate_message/allocate_message_averagely.go
new file mode 100644
index 0000000..cdfe775
--- /dev/null
+++ b/rocketmq-go/service/allocate_message/allocate_message_averagely.go
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package service_allocate_message
+
+import (
+	"errors"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+)
+
+type AllocateMessageQueueAveragely struct{}
+
+func (self *AllocateMessageQueueAveragely) Allocate(consumerGroup string, currentCID string, mqAll []*model.MessageQueue, cidAll []string) ([]model.MessageQueue, error) {
+
+	if currentCID == "" {
+		return nil, errors.New("currentCID is empty")
+	}
+
+	if mqAll == nil || len(mqAll) == 0 {
+		return nil, errors.New("mqAll is nil or mqAll empty")
+	}
+
+	if cidAll == nil || len(cidAll) == 0 {
+		return nil, errors.New("cidAll is nil or cidAll empty")
+	}
+
+	result := make([]model.MessageQueue, 0)
+	for i, cid := range cidAll {
+		if cid == currentCID {
+			mqLen := len(mqAll)
+			cidLen := len(cidAll)
+			mod := mqLen % cidLen
+			var averageSize int
+			if mqLen < cidLen {
+				averageSize = 1
+			} else {
+				if mod > 0 && i < mod {
+					averageSize = mqLen/cidLen + 1
+				} else {
+					averageSize = mqLen / cidLen
+				}
+			}
+
+			var startIndex int
+			if mod > 0 && i < mod {
+				startIndex = i * averageSize
+			} else {
+				startIndex = i*averageSize + mod
+			}
+
+			var min int
+			if averageSize > mqLen-startIndex {
+				min = mqLen - startIndex
+			} else {
+				min = averageSize
+			}
+
+			for j := 0; j < min; j++ {
+				result = append(result, *mqAll[(startIndex+j)%mqLen])
+			}
+			return result, nil
+
+		}
+	}
+
+	return nil, errors.New("cant't find currentCID")
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/service/allocate_message/allocate_message_averagely_by_circle.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/allocate_message/allocate_message_averagely_by_circle.go b/rocketmq-go/service/allocate_message/allocate_message_averagely_by_circle.go
new file mode 100644
index 0000000..cdfd668
--- /dev/null
+++ b/rocketmq-go/service/allocate_message/allocate_message_averagely_by_circle.go
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package service_allocate_message
+
+import (
+	"errors"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+)
+
+type AllocateMessageQueueAveragelyByCircle struct{}
+
+func (self *AllocateMessageQueueAveragelyByCircle) Allocate(consumerGroup string, currentCID string, mqAll []*model.MessageQueue, cidAll []string) ([]model.MessageQueue, error) {
+	if currentCID == "" {
+		return nil, errors.New("currentCID is empty")
+	}
+
+	if mqAll == nil || len(mqAll) == 0 {
+		return nil, errors.New("mqAll is nil or mqAll empty")
+	}
+
+	if cidAll == nil || len(cidAll) == 0 {
+		return nil, errors.New("cidAll is nil or cidAll empty")
+	}
+
+	result := make([]model.MessageQueue, 0)
+	for i, cid := range cidAll {
+		if cid == currentCID {
+			mqLen := len(mqAll)
+			cidLen := len(cidAll)
+			mod := mqLen % cidLen
+			var averageSize int
+			if mqLen < cidLen {
+				averageSize = 1
+			} else {
+				if mod > 0 && i < mod {
+					averageSize = mqLen/cidLen + 1
+				} else {
+					averageSize = mqLen / cidLen
+				}
+			}
+
+			var startIndex int
+			if mod > 0 && i < mod {
+				startIndex = i * averageSize
+			} else {
+				startIndex = i*averageSize + mod
+			}
+
+			var min int
+			if averageSize > mqLen-startIndex {
+				min = mqLen - startIndex
+			} else {
+				min = averageSize
+			}
+
+			for j := 0; j < min; j++ {
+				result = append(result, *mqAll[(startIndex+j)%mqLen])
+			}
+			return result, nil
+
+		}
+	}
+
+	return nil, errors.New("cant't find currentCID")
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/service/allocate_message/allocate_message_by_config.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/allocate_message/allocate_message_by_config.go b/rocketmq-go/service/allocate_message/allocate_message_by_config.go
new file mode 100644
index 0000000..2046ffd
--- /dev/null
+++ b/rocketmq-go/service/allocate_message/allocate_message_by_config.go
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package service_allocate_message
+
+import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+
+type AllocateMessageQueueByConfig struct {
+	messageQueueList []model.MessageQueue
+}
+
+func (self *AllocateMessageQueueByConfig) Allocate(consumerGroup string, currentCID string, mqAll []*model.MessageQueue, cidAll []string) ([]model.MessageQueue, error) {
+	return self.messageQueueList, nil
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/service/allocate_message/allocate_message_by_machine_room.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/allocate_message/allocate_message_by_machine_room.go b/rocketmq-go/service/allocate_message/allocate_message_by_machine_room.go
new file mode 100644
index 0000000..6fe1cbb
--- /dev/null
+++ b/rocketmq-go/service/allocate_message/allocate_message_by_machine_room.go
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package service_allocate_message
+
+import (
+	"errors"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+)
+
+type AllocateMessageQueueByMachineRoom struct {
+}
+
+func (self *AllocateMessageQueueByMachineRoom) Allocate(consumerGroup string, currentCID string, mqAll []*model.MessageQueue, cidAll []string) ([]model.MessageQueue, error) {
+	if currentCID == "" {
+		return nil, errors.New("currentCID is empty")
+	}
+
+	if mqAll == nil || len(mqAll) == 0 {
+		return nil, errors.New("mqAll is nil or mqAll empty")
+	}
+
+	if cidAll == nil || len(cidAll) == 0 {
+		return nil, errors.New("cidAll is nil or cidAll empty")
+	}
+
+	result := make([]model.MessageQueue, 0)
+	for i, cid := range cidAll {
+		if cid == currentCID {
+			mqLen := len(mqAll)
+			cidLen := len(cidAll)
+			mod := mqLen % cidLen
+			var averageSize int
+			if mqLen < cidLen {
+				averageSize = 1
+			} else {
+				if mod > 0 && i < mod {
+					averageSize = mqLen/cidLen + 1
+				} else {
+					averageSize = mqLen / cidLen
+				}
+			}
+
+			var startIndex int
+			if mod > 0 && i < mod {
+				startIndex = i * averageSize
+			} else {
+				startIndex = i*averageSize + mod
+			}
+
+			var min int
+			if averageSize > mqLen-startIndex {
+				min = mqLen - startIndex
+			} else {
+				min = averageSize
+			}
+
+			for j := 0; j < min; j++ {
+				result = append(result, *mqAll[(startIndex+j)%mqLen])
+			}
+			return result, nil
+
+		}
+	}
+
+	return nil, errors.New("cant't find currentCID")
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/service/allocate_message/allocate_message_queue_strategy.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/allocate_message/allocate_message_queue_strategy.go b/rocketmq-go/service/allocate_message/allocate_message_queue_strategy.go
new file mode 100644
index 0000000..e838c7b
--- /dev/null
+++ b/rocketmq-go/service/allocate_message/allocate_message_queue_strategy.go
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package service_allocate_message
+
+import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+
+type AllocateMessageQueueStrategy interface {
+	Allocate(consumerGroup string, currentCID string, mqAll []*model.MessageQueue, cidAll []string) ([]model.MessageQueue, error)
+}
+
+func GetAllocateMessageQueueStrategyByConfig(allocateMessageQueueStrategy string) AllocateMessageQueueStrategy {
+	return new(AllocateMessageQueueAveragely)
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/service/consume_message_service.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/consume_message_service.go b/rocketmq-go/service/consume_message_service.go
index 09be61c..e4a08df 100644
--- a/rocketmq-go/service/consume_message_service.go
+++ b/rocketmq-go/service/consume_message_service.go
@@ -35,22 +35,22 @@ type ConsumeMessageService interface {
 }
 
 type ConsumeMessageConcurrentlyServiceImpl struct {
-	consumerGroup   string
-	messageListener model.MessageListener
-	//sendMessageBackProducerService SendMessageBackProducerService //for send retry Message
-	offsetStore    OffsetStore
-	consumerConfig *config.RocketMqConsumerConfig
+	consumerGroup                  string
+	messageListener                model.MessageListener
+	sendMessageBackProducerService SendMessageBackProducerService //for send retry Message
+	offsetStore                    OffsetStore
+	consumerConfig                 *config.RocketMqConsumerConfig
 }
 
 func NewConsumeMessageConcurrentlyServiceImpl(messageListener model.MessageListener) (consumeService ConsumeMessageService) {
-	//consumeService = &ConsumeMessageConcurrentlyServiceImpl{messageListener:messageListener, sendMessageBackProducerService:&SendMessageBackProducerServiceImpl{}}
+	consumeService = &ConsumeMessageConcurrentlyServiceImpl{messageListener: messageListener, sendMessageBackProducerService: &SendMessageBackProducerServiceImpl{}}
 	return
 }
 
 func (self *ConsumeMessageConcurrentlyServiceImpl) Init(consumerGroup string, mqClient RocketMqClient, offsetStore OffsetStore, defaultProducerService *DefaultProducerService, consumerConfig *config.RocketMqConsumerConfig) {
 	self.consumerGroup = consumerGroup
 	self.offsetStore = offsetStore
-	//self.sendMessageBackProducerService.InitSendMessageBackProducerService(consumerGroup, mqClient,defaultProducerService,consumerConfig)
+	self.sendMessageBackProducerService.InitSendMessageBackProducerService(consumerGroup, mqClient, defaultProducerService, consumerConfig)
 	self.consumerConfig = consumerConfig
 }
 
@@ -74,7 +74,7 @@ func (self *ConsumeMessageConcurrentlyServiceImpl) SubmitConsumeRequest(msgs []m
 }
 
 func (self *ConsumeMessageConcurrentlyServiceImpl) SendMessageBack(messageExt *model.MessageExt, delayLayLevel int, brokerName string) (err error) {
-	//err = self.sendMessageBackProducerService.SendMessageBack(messageExt, 0, brokerName)
+	err = self.sendMessageBackProducerService.SendMessageBack(messageExt, 0, brokerName)
 	return
 }
 
@@ -128,10 +128,10 @@ func (self *ConsumeMessageConcurrentlyServiceImpl) processConsumeResult(result m
 	if len(failedMessages) > 0 {
 		self.SubmitConsumeRequest(failedMessages, processQueue, messageQueue, true)
 	}
-	//commitOffset := processQueue.RemoveMessage(successMessages)
-	//if (commitOffset > 0 && ! processQueue.IsDropped()) {
-	//	self.offsetStore.UpdateOffset(messageQueue, commitOffset, true)
-	//}
+	commitOffset := processQueue.RemoveMessage(successMessages)
+	if commitOffset > 0 && !processQueue.IsDropped() {
+		self.offsetStore.UpdateOffset(messageQueue, commitOffset, true)
+	}
 
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/service/mq_client.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/mq_client.go b/rocketmq-go/service/mq_client.go
index 8bbfe79..366aa3f 100644
--- a/rocketmq-go/service/mq_client.go
+++ b/rocketmq-go/service/mq_client.go
@@ -59,10 +59,9 @@ type RocketMqClient interface {
 
 var DEFAULT_TIMEOUT int64 = 6000
 
-// common
 type MqClientImpl struct {
 	ClientId                string
-	remotingClient          remoting.RemotingClient
+	remotingClient          *remoting.DefalutRemotingClient
 	TopicRouteTable         util.ConcurrentMap      // map[string]*model.TopicRouteData   //topic | topicRoteData
 	BrokerAddrTable         util.ConcurrentMap      //map[string]map[int]string          //brokerName | map[brokerId]address
 	TopicPublishInfoTable   util.ConcurrentMap      //map[string]*model.TopicPublishInfo //topic | TopicPublishInfo //all use this
@@ -134,7 +133,7 @@ func (self *MqClientImpl) GetPublishTopicList() []string {
 	}
 	return publishTopicList
 }
-func (self *MqClientImpl) GetRemotingClient() remoting.RemotingClient {
+func (self *MqClientImpl) GetRemotingClient() *remoting.DefalutRemotingClient {
 	return self.remotingClient
 }
 
@@ -147,7 +146,7 @@ func (self *MqClientImpl) DequeuePullMessageRequest() (pullRequest *model.PullRe
 }
 
 func (self *MqClientImpl) ClearExpireResponse() {
-	//self.remotingClient.ClearExpireResponse()
+	self.remotingClient.ClearExpireResponse()
 }
 
 func (self *MqClientImpl) FetchMasterBrokerAddress(brokerName string) (masterAddress string) {
@@ -199,10 +198,9 @@ func (self MqClientImpl) GetTopicRouteInfoFromNameServer(topic string, timeoutMi
 		return nil, err
 	}
 	if response.Code == remoting.SUCCESS {
-		//todo  it's dirty
 		topicRouteData := new(model.TopicRouteData)
 		bodyjson := strings.Replace(string(response.Body), ",0:", ",\"0\":", -1)
-		bodyjson = strings.Replace(bodyjson, ",1:", ",\"1\":", -1) // fastJson的key没有引号 需要通用的方法
+		bodyjson = strings.Replace(bodyjson, ",1:", ",\"1\":", -1) // fastJson key is string todo todo
 		bodyjson = strings.Replace(bodyjson, "{0:", "{\"0\":", -1)
 		bodyjson = strings.Replace(bodyjson, "{1:", "{\"1\":", -1)
 		err = json.Unmarshal([]byte(bodyjson), topicRouteData)
@@ -291,7 +289,7 @@ func (self MqClientImpl) updateTopicRouteInfoLocal(topic string, topicRouteData
 
 	//update pubInfo for each
 	topicPublishInfo := model.BuildTopicPublishInfoFromTopicRoteData(topic, topicRouteData)
-	self.TopicPublishInfoTable.Set(topic, topicPublishInfo)
+	self.TopicPublishInfoTable.Set(topic, topicPublishInfo) // todo
 
 	mqList := model.BuildTopicSubscribeInfoFromRoteData(topic, topicRouteData)
 	self.TopicSubscribeInfoTable.Set(topic, mqList)

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/service/mq_fault_strategy.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/mq_fault_strategy.go b/rocketmq-go/service/mq_fault_strategy.go
new file mode 100644
index 0000000..852ab98
--- /dev/null
+++ b/rocketmq-go/service/mq_fault_strategy.go
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package service
+
+import (
+	"errors"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+)
+
+type MQFaultStrategy struct {
+}
+
+//if first select : random one
+//if has error broker before ,skip the err broker
+func selectOneMessageQueue(topicPublishInfo *model.TopicPublishInfo, lastFailedBroker string) (mqQueue model.MessageQueue, err error) {
+	queueIndex := topicPublishInfo.FetchQueueIndex()
+	queues := topicPublishInfo.MessageQueueList
+	if len(lastFailedBroker) == 0 {
+		mqQueue = queues[queueIndex]
+		return
+	}
+	for i := 0; i < len(queues); i++ {
+		nowQueueIndex := queueIndex + i
+		if nowQueueIndex >= len(queues) {
+			nowQueueIndex = nowQueueIndex - len(queues)
+		}
+		if lastFailedBroker == queues[nowQueueIndex].BrokerName {
+			continue
+		}
+		mqQueue = queues[nowQueueIndex]
+		return
+	}
+	err = errors.New("send to [" + lastFailedBroker + "] fail,no other broker")
+	return
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/service/offset_store.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/offset_store.go b/rocketmq-go/service/offset_store.go
new file mode 100644
index 0000000..0bfe640
--- /dev/null
+++ b/rocketmq-go/service/offset_store.go
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package service
+
+import (
+	"errors"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/header"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/remoting"
+	"github.com/golang/glog"
+	"strconv"
+	"sync"
+)
+
+const (
+	MEMORY_FIRST_THEN_STORE = 0
+	READ_FROM_MEMORY        = 1
+	READ_FROM_STORE         = 2
+)
+
+type OffsetStore interface {
+	UpdateOffset(mq *model.MessageQueue, offset int64, increaseOnly bool)
+	ReadOffset(mq *model.MessageQueue, readType int) int64
+	Persist(mq *model.MessageQueue)
+	RemoveOffset(mq *model.MessageQueue)
+}
+type RemoteOffsetStore struct {
+	groupName       string
+	mqClient        RocketMqClient
+	offsetTable     map[model.MessageQueue]int64
+	offsetTableLock sync.RWMutex
+}
+
+func RemoteOffsetStoreInit(groupName string, mqClient RocketMqClient) OffsetStore {
+	offsetStore := new(RemoteOffsetStore)
+	offsetStore.groupName = groupName
+	offsetStore.mqClient = mqClient
+	offsetStore.offsetTable = make(map[model.MessageQueue]int64)
+	return offsetStore
+}
+func (self *RemoteOffsetStore) RemoveOffset(mq *model.MessageQueue) {
+	defer self.offsetTableLock.Unlock()
+	self.offsetTableLock.Lock()
+	delete(self.offsetTable, *mq)
+}
+
+func (self *RemoteOffsetStore) Persist(mq *model.MessageQueue) {
+	brokerAddr := self.mqClient.FetchMasterBrokerAddress(mq.BrokerName)
+	if len(brokerAddr) == 0 {
+		self.mqClient.TryToFindTopicPublishInfo(mq.Topic)
+		brokerAddr = self.mqClient.FetchMasterBrokerAddress(mq.BrokerName)
+	}
+	self.offsetTableLock.RLock()
+	offset := self.offsetTable[*mq]
+	self.offsetTableLock.RUnlock()
+	updateConsumerOffsetRequestHeader := &header.UpdateConsumerOffsetRequestHeader{ConsumerGroup: self.groupName, Topic: mq.Topic, QueueId: mq.QueueId, CommitOffset: offset}
+	requestCommand := remoting.NewRemotingCommand(remoting.UPDATE_CONSUMER_OFFSET, updateConsumerOffsetRequestHeader)
+	self.mqClient.GetRemotingClient().InvokeOneWay(brokerAddr, requestCommand, 1000*5)
+}
+
+func (self *RemoteOffsetStore) ReadOffset(mq *model.MessageQueue, readType int) int64 {
+
+	switch readType {
+	case MEMORY_FIRST_THEN_STORE:
+	case READ_FROM_MEMORY:
+		self.offsetTableLock.RLock()
+		offset, ok := self.offsetTable[*mq]
+		self.offsetTableLock.RUnlock()
+		if ok {
+			return offset
+		} else {
+			return -1
+		}
+	case READ_FROM_STORE:
+		offset, err := self.fetchConsumeOffsetFromBroker(mq)
+		if err != nil {
+			glog.Error(err)
+			return -1
+		}
+		glog.V(2).Info("READ_FROM_STORE", offset)
+		self.UpdateOffset(mq, offset, false)
+		return offset
+	}
+
+	return -1
+
+}
+
+func (self *RemoteOffsetStore) fetchConsumeOffsetFromBroker(mq *model.MessageQueue) (int64, error) {
+	brokerAddr, _, found := self.mqClient.FindBrokerAddressInSubscribe(mq.BrokerName, 0, false)
+
+	if !found {
+		brokerAddr, _, found = self.mqClient.FindBrokerAddressInSubscribe(mq.BrokerName, 0, false)
+	}
+
+	if found {
+		requestHeader := &header.QueryConsumerOffsetRequestHeader{}
+		requestHeader.Topic = mq.Topic
+		requestHeader.QueueId = mq.QueueId
+		requestHeader.ConsumerGroup = self.groupName
+		return self.queryConsumerOffset(brokerAddr, requestHeader, 3000)
+	}
+
+	return -1, errors.New("fetch consumer offset error")
+}
+
+func (self RemoteOffsetStore) queryConsumerOffset(addr string, requestHeader *header.QueryConsumerOffsetRequestHeader, timeoutMillis int64) (int64, error) {
+	remotingCommand := remoting.NewRemotingCommand(remoting.QUERY_CONSUMER_OFFSET, requestHeader)
+	reponse, err := self.mqClient.GetRemotingClient().InvokeSync(addr, remotingCommand, timeoutMillis)
+	if err != nil {
+		glog.Error(err)
+		return -1, err
+	}
+
+	if reponse.Code == remoting.QUERY_NOT_FOUND {
+		return -1, nil
+	}
+
+	if offsetInter, ok := reponse.ExtFields["offset"]; ok {
+		if offsetStr, ok := offsetInter.(string); ok {
+			offset, err := strconv.ParseInt(offsetStr, 10, 64)
+			if err != nil {
+				glog.Error(err)
+				return -1, err
+			}
+			return offset, nil
+
+		}
+	}
+	glog.Error(requestHeader, reponse)
+	return -1, errors.New("query offset error")
+}
+
+func (self *RemoteOffsetStore) UpdateOffset(mq *model.MessageQueue, offset int64, increaseOnly bool) {
+	defer self.offsetTableLock.Unlock()
+	self.offsetTableLock.Lock()
+	if mq != nil {
+		if increaseOnly {
+			offsetOld := self.offsetTable[*mq]
+			if offsetOld >= offset {
+				return
+			}
+			self.offsetTable[*mq] = offset
+		} else {
+			self.offsetTable[*mq] = offset
+		}
+
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/service/offset_store_service.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/offset_store_service.go b/rocketmq-go/service/offset_store_service.go
deleted file mode 100644
index 302b412..0000000
--- a/rocketmq-go/service/offset_store_service.go
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package service
-
-type OffsetStore struct {
-	mqClient RocketMqClient
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/service/producer_service.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/producer_service.go b/rocketmq-go/service/producer_service.go
index a684b27..2f2a7b6 100644
--- a/rocketmq-go/service/producer_service.go
+++ b/rocketmq-go/service/producer_service.go
@@ -16,13 +16,227 @@
  */
 package service
 
-import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
+import (
+	"errors"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/header"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/remoting"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/util"
+	"github.com/golang/glog"
+	"time"
+)
 
 type ProducerService interface {
+	CheckConfig() (err error)
+	SendDefaultImpl(message *model.Message, communicationMode string, sendCallback string, timeout int64) (sendResult *model.SendResult, err error)
 }
 
 type DefaultProducerService struct {
-	producerGroup  string
-	producerConfig *config.RocketMqProducerConfig
-	mqClient       RocketMqClient
+	producerGroup   string
+	producerConfig  *config.RocketMqProducerConfig
+	mqClient        RocketMqClient
+	mqFaultStrategy MQFaultStrategy
+}
+
+func NewDefaultProducerService(producerGroup string, producerConfig *config.RocketMqProducerConfig, mqClient RocketMqClient) (defaultProducerService *DefaultProducerService) {
+	defaultProducerService = &DefaultProducerService{
+		mqClient:       mqClient,
+		producerGroup:  producerGroup,
+		producerConfig: producerConfig,
+	}
+	defaultProducerService.CheckConfig()
+	return
+}
+func (self *DefaultProducerService) CheckConfig() (err error) {
+	// todo check if not pass panic
+	return
+}
+
+func (self *DefaultProducerService) SendDefaultImpl(message *model.Message, communicationMode string, sendCallback string, timeout int64) (sendResult *model.SendResult, err error) {
+	var (
+		topicPublishInfo *model.TopicPublishInfo
+	)
+	err = self.checkMessage(message)
+	if err != nil {
+		return
+	}
+	topicPublishInfo, err = self.mqClient.TryToFindTopicPublishInfo(message.Topic)
+	if err != nil {
+		return
+	}
+	if topicPublishInfo.JudgeTopicPublishInfoOk() == false {
+		err = errors.New("topicPublishInfo is error,topic=" + message.Topic)
+		return
+	}
+	glog.V(2).Info("op=look topicPublishInfo", topicPublishInfo)
+	//if(!ok) return error
+	sendResult, err = self.sendMsgUseTopicPublishInfo(message, communicationMode, sendCallback, topicPublishInfo, timeout)
+	return
+}
+
+func (self *DefaultProducerService) producerSendMessageRequest(brokerAddr string, sendMessageHeader remoting.CustomerHeader, message *model.Message, timeout int64) (sendResult *model.SendResult, err error) {
+	remotingCommand := remoting.NewRemotingCommandWithBody(remoting.SEND_MESSAGE, sendMessageHeader, message.Body)
+	var response *remoting.RemotingCommand
+	response, err = self.mqClient.GetRemotingClient().InvokeSync(brokerAddr, remotingCommand, timeout)
+	if err != nil {
+		glog.Error(err)
+		return
+	}
+	sendResult, err = processSendResponse(brokerAddr, message, response)
+	return
+}
+func processSendResponse(brokerName string, message *model.Message, response *remoting.RemotingCommand) (sendResult *model.SendResult, err error) {
+	sendResult = &model.SendResult{}
+	switch response.Code {
+	case remoting.FLUSH_DISK_TIMEOUT:
+		{
+			sendResult.SetSendStatus(model.FlushDiskTimeout)
+			break
+		}
+	case remoting.FLUSH_SLAVE_TIMEOUT:
+		{
+			sendResult.SetSendStatus(model.FlushSlaveTimeout)
+			break
+		}
+	case remoting.SLAVE_NOT_AVAILABLE:
+		{
+			sendResult.SetSendStatus(model.SlaveNotAvaliable)
+			break
+		}
+	case remoting.SUCCESS:
+		{
+			sendResult.SetSendStatus(model.SendOK)
+			break
+		}
+	default:
+		err = errors.New("response.Code error code")
+		return
+	}
+	var responseHeader = &header.SendMessageResponseHeader{}
+	if response.ExtFields != nil {
+		responseHeader.FromMap(response.ExtFields) //change map[string]interface{} into CustomerHeader struct
+	}
+	sendResult.SetMsgID(message.Properties[constant.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX])
+	sendResult.SetOffsetMsgID(responseHeader.MsgId)
+	sendResult.SetQueueOffset(responseHeader.QueueOffset)
+	sendResult.SetTransactionID(responseHeader.TransactionId)
+	messageQueue := model.MessageQueue{Topic: message.Topic, BrokerName: brokerName,
+		QueueId: responseHeader.QueueId}
+	sendResult.SetMessageQueue(messageQueue)
+	var regionId = responseHeader.MsgRegion
+	if len(regionId) == 0 {
+		regionId = "DefaultRegion"
+	}
+	sendResult.SetRegionID(regionId)
+	return
+}
+
+func (self *DefaultProducerService) checkMessage(message *model.Message) (err error) {
+	if message == nil {
+		err = errors.New("message is nil")
+		return
+	}
+	if len(message.Topic) == 0 {
+		err = errors.New("topic is empty")
+		return
+	}
+	if message.Topic == constant.DEFAULT_TOPIC {
+		err = errors.New("the topic[" + message.Topic + "] is conflict with default topic.")
+		return
+	}
+
+	if len(message.Topic) > constant.MAX_MESSAGE_TOPIC_SIZE {
+		err = errors.New("the specified topic is longer than topic max length 255.")
+		return
+	}
+	//todo todo     public static final String VALID_PATTERN_STR = "";
+
+	if !util.MatchString(message.Topic, `^[%|a-zA-Z0-9_-]+$`) {
+		err = errors.New("the specified topic[" + message.Topic + "] contains illegal characters")
+		return
+	}
+	if len(message.Body) == 0 {
+		err = errors.New("messageBody is empty")
+		return
+	}
+	if len(message.Body) > self.producerConfig.MaxMessageSize {
+		err = errors.New("messageBody is large than " + util.IntToString(self.producerConfig.MaxMessageSize))
+		return
+	}
+	return
+}
+
+func (self *DefaultProducerService) sendMsgUseTopicPublishInfo(message *model.Message, communicationMode string, sendCallback string, topicPublishInfo *model.TopicPublishInfo, timeout int64) (sendResult *model.SendResult, err error) {
+	var (
+		sendTotalTime int
+		messageQueue  model.MessageQueue
+	)
+
+	sendTotalTime = 1
+	var lastFailedBroker = ""
+	//todo transaction
+	// todo retry
+	for i := 0; i < sendTotalTime; i++ {
+		messageQueue, err = selectOneMessageQueue(topicPublishInfo, lastFailedBroker)
+		if err != nil {
+			return
+		}
+		sendResult, err = self.doSendMessage(message, messageQueue, communicationMode, sendCallback, topicPublishInfo, timeout)
+		if err != nil {
+			// todo retry
+			return
+		}
+	}
+	return
+}
+
+func (self *DefaultProducerService) doSendMessage(message *model.Message, messageQueue model.MessageQueue,
+	communicationMode string, sendCallback string,
+	topicPublishInfo *model.TopicPublishInfo,
+	timeout int64) (sendResult *model.SendResult, err error) {
+	var (
+		brokerAddr          string
+		sysFlag             int
+		compressMessageFlag int
+	)
+	compressMessageFlag, err = self.tryToCompressMessage(message)
+	if err != nil {
+		return
+	}
+	sysFlag = sysFlag | compressMessageFlag
+	brokerAddr = self.mqClient.FetchMasterBrokerAddress(messageQueue.BrokerName)
+	if len(brokerAddr) == 0 {
+		err = errors.New("The broker[" + messageQueue.BrokerName + "] not exist")
+		return
+	}
+	message.GeneratorMsgUniqueKey()
+	sendMessageHeader := &header.SendMessageRequestHeader{
+		ProducerGroup:         self.producerGroup,
+		Topic:                 message.Topic,
+		DefaultTopic:          constant.DEFAULT_TOPIC,
+		DefaultTopicQueueNums: 4,
+		QueueId:               messageQueue.QueueId,
+		SysFlag:               sysFlag,
+		BornTimestamp:         time.Now().UnixNano() / 1000000,
+		Flag:                  message.Flag,
+		Properties:            util.MessageProperties2String(message.Properties),
+
+		UnitMode:          false,
+		ReconsumeTimes:    message.GetReconsumeTimes(),
+		MaxReconsumeTimes: message.GetMaxReconsumeTimes(),
+	}
+	sendResult, err = self.producerSendMessageRequest(brokerAddr, sendMessageHeader, message, timeout)
+	return
+}
+
+func (self *DefaultProducerService) tryToCompressMessage(message *model.Message) (compressedFlag int, err error) {
+	if len(message.Body) < self.producerConfig.CompressMsgBodyOverHowMuch {
+		compressedFlag = 0
+		return
+	}
+	compressedFlag = int(constant.CompressedFlag)
+	message.Body, err = util.CompressWithLevel(message.Body, self.producerConfig.ZipCompressLevel)
+	return
 }

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/service/producer_service_for_send_back.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/producer_service_for_send_back.go b/rocketmq-go/service/producer_service_for_send_back.go
new file mode 100644
index 0000000..290da27
--- /dev/null
+++ b/rocketmq-go/service/producer_service_for_send_back.go
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package service
+
+import (
+	"encoding/json"
+	"errors"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/header"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/remoting"
+	"github.com/golang/glog"
+)
+
+type SendMessageBackProducerService interface {
+	SendMessageBack(messageExt *model.MessageExt, delayLayLevel int, brokerName string) (err error)
+	InitSendMessageBackProducerService(consumerGroup string, mqClient RocketMqClient, defaultProducerService *DefaultProducerService, consumerConfig *config.RocketMqConsumerConfig)
+}
+
+type SendMessageBackProducerServiceImpl struct {
+	mqClient               RocketMqClient
+	defaultProducerService *DefaultProducerService // one namesvr only one
+	consumerGroup          string
+	consumerConfig         *config.RocketMqConsumerConfig //one mq group have one
+}
+
+// send to original broker,if fail send a new retry message
+func (self *SendMessageBackProducerServiceImpl) SendMessageBack(messageExt *model.MessageExt, delayLayLevel int, brokerName string) (err error) {
+	glog.V(2).Info("op=look_send_message_back", messageExt.MsgId, messageExt.Properties, string(messageExt.Body))
+	err = self.consumerSendMessageBack(brokerName, messageExt, delayLayLevel) // todo use
+	if err == nil {
+		return
+	}
+	glog.Error(err)
+	err = self.sendRetryMessageBack(messageExt)
+	return
+}
+
+func (self *SendMessageBackProducerServiceImpl) sendRetryMessageBack(messageExt *model.MessageExt) error {
+	// todo build a retry topic todo check todo check
+	retryMessage := &model.Message{}
+	originMessageId := messageExt.GetOriginMessageId()
+	retryMessage.Properties = messageExt.Properties
+	retryMessage.SetOriginMessageId(originMessageId)
+	retryMessage.Flag = messageExt.Flag
+	retryMessage.Topic = constant.RETRY_GROUP_TOPIC_PREFIX + self.consumerGroup
+	retryMessage.Body = messageExt.Body
+	retryMessage.SetRetryTopic(messageExt.Topic)
+	retryMessage.SetReconsumeTime(messageExt.GetReconsumeTimes() + 1)
+	retryMessage.SetMaxReconsumeTimes(self.consumerConfig.MaxReconsumeTimes)
+	retryMessage.SetDelayTimeLevel(3 + messageExt.GetReconsumeTimes())
+	pp, _ := json.Marshal(retryMessage)
+	glog.Info("look retryMessage ", string(pp), string(messageExt.Body))
+	sendResult, err := self.defaultProducerService.SendDefaultImpl(retryMessage, constant.COMMUNICATIONMODE_SYNC, "", self.defaultProducerService.producerConfig.SendMsgTimeout)
+	if err != nil {
+		glog.Error(err)
+		return err
+	}
+	xx, _ := json.Marshal(sendResult)
+	glog.Info("look retryMessage result", string(xx))
+	// todo need check send result
+	return nil
+
+}
+
+func (self *SendMessageBackProducerServiceImpl) InitSendMessageBackProducerService(consumerGroup string, mqClient RocketMqClient, defaultProducerService *DefaultProducerService, consumerConfig *config.RocketMqConsumerConfig) {
+	self.mqClient = mqClient
+	self.consumerGroup = consumerGroup
+	self.defaultProducerService = defaultProducerService
+	self.consumerConfig = consumerConfig
+}
+
+func (self *SendMessageBackProducerServiceImpl) consumerSendMessageBack(brokerName string, messageExt *model.MessageExt, delayLayLevel int) (err error) {
+	if len(brokerName) == 0 {
+		err = errors.New("broker can't be empty")
+		glog.Error(err)
+		return
+	}
+	brokerAddr := self.mqClient.FetchMasterBrokerAddress(brokerName)
+	sendMsgBackHeader := &header.ConsumerSendMsgBackRequestHeader{
+		Offset:            messageExt.CommitLogOffset,
+		Group:             self.consumerGroup,
+		DelayLevel:        0, //Message consume retry strategy<br>-1,no retry,put into DLQ directly<br>0,broker control retry frequency<br>>0,client control retry frequency
+		OriginMsgId:       messageExt.MsgId,
+		OriginTopic:       messageExt.Topic,
+		UnitMode:          false,
+		MaxReconsumeTimes: int32(self.consumerConfig.MaxReconsumeTimes),
+	}
+	remotingCommand := remoting.NewRemotingCommand(remoting.CONSUMER_SEND_MSG_BACK, sendMsgBackHeader)
+	response, invokeErr := self.mqClient.GetRemotingClient().InvokeSync(brokerAddr, remotingCommand, 5000)
+	if invokeErr != nil {
+		err = invokeErr
+		return
+	}
+	if response == nil || response.Code != remoting.SUCCESS {
+		glog.Error("sendMsgBackRemarkError", response.Remark)
+		err = errors.New("send Message back error")
+	}
+	return
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/service/rebalance.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/rebalance.go b/rocketmq-go/service/rebalance.go
new file mode 100644
index 0000000..8f4f4fb
--- /dev/null
+++ b/rocketmq-go/service/rebalance.go
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package service
+
+import (
+	"encoding/json"
+	"errors"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/constant"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/header"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/remoting"
+	"github.com/apache/incubator-rocketmq-externals/rocketmq-go/service/allocate_message"
+	"github.com/golang/glog"
+	"sort"
+	"strings"
+	"sync"
+	"time"
+)
+
+type Rebalance struct {
+	groupName                    string
+	messageModel                 string
+	topicSubscribeInfoTableLock  sync.RWMutex
+	SubscriptionInner            map[string]*model.SubscriptionData
+	subscriptionInnerLock        sync.RWMutex
+	mqClient                     RocketMqClient
+	allocateMessageQueueStrategy service_allocate_message.AllocateMessageQueueStrategy
+	processQueueTable            map[model.MessageQueue]*model.ProcessQueue // both subscribe topic and retry group
+	processQueueTableLock        sync.RWMutex
+	mutex                        sync.Mutex
+	offsetStore                  OffsetStore
+	consumerConfig               *config.RocketMqConsumerConfig
+}
+
+func (self *Rebalance) GetMqTableInfo() map[model.MessageQueue]model.ProcessQueueInfo {
+	defer self.processQueueTableLock.RUnlock()
+	self.processQueueTableLock.RLock()
+	mqTable := map[model.MessageQueue]model.ProcessQueueInfo{}
+	for messageQueue, processQueue := range self.processQueueTable {
+		mqTable[messageQueue] = processQueue.ChangeToProcessQueueInfo()
+	}
+	return mqTable
+}
+
+func (self *Rebalance) GetProcessQueue(messageQueue model.MessageQueue) *model.ProcessQueue {
+	defer self.processQueueTableLock.RUnlock()
+	self.processQueueTableLock.RLock()
+	return self.processQueueTable[messageQueue]
+}
+
+func (self *Rebalance) ClearProcessQueue(offsetTable map[model.MessageQueue]int64) {
+	defer self.processQueueTableLock.Unlock()
+	self.processQueueTableLock.Lock()
+	for mq, _ := range offsetTable {
+		processQueue, ok := self.processQueueTable[mq]
+		if !ok {
+			continue
+		}
+		processQueue.Clear()
+	}
+
+}
+
+func (self *Rebalance) GetProcessQueueList() (messageQueueList []model.MessageQueue, processQueueList []*model.ProcessQueue) {
+	defer self.processQueueTableLock.RUnlock()
+	self.processQueueTableLock.RLock()
+	for messageQueue, processQueue := range self.processQueueTable {
+		processQueueList = append(processQueueList, processQueue)
+		messageQueueList = append(messageQueueList, messageQueue)
+	}
+	return
+}
+
+//removeUnnecessaryMessageQueue you should drop it first
+func (self *Rebalance) RemoveProcessQueue(messageQueue *model.MessageQueue) {
+	self.offsetStore.Persist(messageQueue)
+	self.offsetStore.RemoveOffset(messageQueue)
+	self.removeMessageQueueFromMap(*messageQueue)
+}
+func (self *Rebalance) removeMessageQueueFromMap(messageQueue model.MessageQueue) {
+	defer self.processQueueTableLock.Unlock()
+	self.processQueueTableLock.Lock()
+	delete(self.processQueueTable, messageQueue)
+
+}
+
+func NewRebalance(groupName string, subscription map[string]string, mqClient RocketMqClient, offsetStore OffsetStore, consumerConfig *config.RocketMqConsumerConfig) *Rebalance {
+	subscriptionInner := make(map[string]*model.SubscriptionData)
+	for topic, subExpression := range subscription {
+		subData := &model.SubscriptionData{
+			Topic:      topic,
+			SubString:  subExpression,
+			SubVersion: time.Now().Unix(),
+		}
+		subscriptionInner[topic] = subData
+	}
+	// put retry
+	retryTopic := constant.RETRY_GROUP_TOPIC_PREFIX + groupName
+	subscriptionInner[retryTopic] = &model.SubscriptionData{
+		Topic:      retryTopic,
+		SubString:  "*",
+		SubVersion: time.Now().Unix(),
+	}
+	return &Rebalance{
+		groupName:                    groupName,
+		mqClient:                     mqClient,
+		offsetStore:                  offsetStore,
+		SubscriptionInner:            subscriptionInner,
+		allocateMessageQueueStrategy: service_allocate_message.GetAllocateMessageQueueStrategyByConfig("default"),
+		messageModel:                 "CLUSTERING",
+		processQueueTable:            make(map[model.MessageQueue]*model.ProcessQueue),
+		consumerConfig:               consumerConfig,
+	}
+}
+
+func (self *Rebalance) DoRebalance() {
+	self.mutex.Lock()
+	defer self.mutex.Unlock()
+	for topic, _ := range self.SubscriptionInner {
+		self.rebalanceByTopic(topic)
+	}
+}
+
+type ConsumerIdSorter []string
+
+func (self ConsumerIdSorter) Len() int {
+	return len(self)
+}
+func (self ConsumerIdSorter) Swap(i, j int) {
+	self[i], self[j] = self[j], self[i]
+}
+func (self ConsumerIdSorter) Less(i, j int) bool {
+	if self[i] < self[j] {
+		return true
+	}
+	return false
+}
+
+func (self *Rebalance) rebalanceByTopic(topic string) error {
+	var cidAll []string
+	cidAll, err := self.findConsumerIdList(topic, self.groupName)
+	if err != nil {
+		glog.Error(err)
+		return err
+	}
+	self.topicSubscribeInfoTableLock.RLock()
+	mqs := self.mqClient.GetTopicSubscribeInfo(topic)
+	self.topicSubscribeInfoTableLock.RUnlock()
+	if len(mqs) > 0 && len(cidAll) > 0 {
+		var messageQueues model.MessageQueues = mqs
+		var consumerIdSorter ConsumerIdSorter = cidAll
+
+		sort.Sort(messageQueues)
+		sort.Sort(consumerIdSorter)
+	}
+	allocateResult, err := self.allocateMessageQueueStrategy.Allocate(self.groupName, self.mqClient.GetClientId(), mqs, cidAll)
+
+	if err != nil {
+		glog.Error(err)
+		return err
+	}
+
+	glog.V(2).Infof("rebalance topic[%s]", topic)
+	self.updateProcessQueueTableInRebalance(topic, allocateResult)
+	return nil
+}
+
+func (self *Rebalance) updateProcessQueueTableInRebalance(topic string, mqSet []model.MessageQueue) {
+	defer self.processQueueTableLock.RUnlock()
+	self.processQueueTableLock.RLock()
+	self.removeTheQueueDontBelongHere(topic, mqSet)
+	self.putTheQueueToProcessQueueTable(topic, mqSet)
+
+}
+func (self *Rebalance) removeTheQueueDontBelongHere(topic string, mqSet []model.MessageQueue) {
+	// there is n^2 todo improve
+	for key, value := range self.processQueueTable {
+		if topic != key.Topic {
+			continue
+		}
+		needDelete := true
+		for _, messageQueueItem := range mqSet {
+			if key == messageQueueItem {
+				needDelete = false
+				// todo if expire
+				break
+			}
+		}
+		if needDelete {
+			value.SetDrop(true)
+			delete(self.processQueueTable, key)
+		}
+	}
+}
+
+func (self *Rebalance) putTheQueueToProcessQueueTable(topic string, mqSet []model.MessageQueue) {
+	for index, mq := range mqSet {
+		_, ok := self.processQueueTable[mq]
+		if !ok {
+			pullRequest := new(model.PullRequest)
+			pullRequest.ConsumerGroup = self.groupName
+			pullRequest.MessageQueue = &mqSet[index]
+			pullRequest.NextOffset = self.computePullFromWhere(&mq) // todo use remote offset
+			pullRequest.ProcessQueue = model.NewProcessQueue()
+			self.processQueueTable[mq] = pullRequest.ProcessQueue
+			self.mqClient.EnqueuePullMessageRequest(pullRequest)
+		}
+	}
+
+}
+func (self *Rebalance) computePullFromWhere(mq *model.MessageQueue) int64 {
+	var result int64 = -1
+	lastOffset := self.offsetStore.ReadOffset(mq, READ_FROM_STORE)
+	switch self.consumerConfig.ConsumeFromWhere {
+	case config.CONSUME_FROM_LAST_OFFSET:
+		if lastOffset >= 0 {
+			result = lastOffset
+		} else {
+			if strings.HasPrefix(mq.Topic, constant.RETRY_GROUP_TOPIC_PREFIX) {
+				result = 0
+			} else {
+				result = self.mqClient.GetMaxOffset(mq)
+			}
+		}
+		break
+	case config.CONSUME_FROM_FIRST_OFFSET:
+		if lastOffset >= 0 {
+			result = lastOffset
+		} else {
+			result = 0 // use the begin offset
+		}
+		break
+	case config.CONSUME_FROM_TIMESTAMP:
+		if lastOffset >= 0 {
+			result = lastOffset
+		} else {
+			if strings.HasPrefix(mq.Topic, constant.RETRY_GROUP_TOPIC_PREFIX) {
+				result = 0
+			} else {
+				result = self.mqClient.SearchOffset(mq, self.consumerConfig.ConsumeTimestamp)
+			}
+		}
+		break
+	default:
+
+	}
+
+	return result
+}
+
+func (self *Rebalance) findConsumerIdList(topic string, groupName string) ([]string, error) {
+	brokerAddr, ok := self.mqClient.FindBrokerAddrByTopic(topic)
+	if !ok {
+		err := self.mqClient.UpdateTopicRouteInfoFromNameServer(topic)
+		if err != nil {
+			glog.Error(err)
+		}
+		brokerAddr, ok = self.mqClient.FindBrokerAddrByTopic(topic)
+	}
+
+	if ok {
+		return self.getConsumerIdListByGroup(brokerAddr, groupName, 3000)
+	}
+
+	return nil, errors.New("can't find broker")
+
+}
+
+func (self *Rebalance) getConsumerIdListByGroup(addr string, consumerGroup string, timeoutMillis int64) ([]string, error) {
+	requestHeader := new(header.GetConsumerListByGroupRequestHeader)
+	requestHeader.ConsumerGroup = consumerGroup
+
+	request := remoting.NewRemotingCommand(remoting.GET_CONSUMER_LIST_BY_GROUP, requestHeader)
+
+	response, err := self.mqClient.GetRemotingClient().InvokeSync(addr, request, timeoutMillis)
+	if err != nil {
+		glog.Error(err)
+		return nil, err
+	}
+	if response.Code == remoting.SUCCESS {
+		getConsumerListByGroupResponseBody := new(header.GetConsumerListByGroupResponseBody)
+		bodyjson := strings.Replace(string(response.Body), "0:", "\"0\":", -1)
+		bodyjson = strings.Replace(bodyjson, "1:", "\"1\":", -1)
+		err := json.Unmarshal([]byte(bodyjson), getConsumerListByGroupResponseBody)
+		if err != nil {
+			glog.Error(err)
+			return nil, err
+		}
+		return getConsumerListByGroupResponseBody.ConsumerIdList, nil
+	}
+	return nil, errors.New("getConsumerIdListByGroup error=" + response.Remark)
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/service/rebalance_service.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/service/rebalance_service.go b/rocketmq-go/service/rebalance_service.go
deleted file mode 100644
index acdcdd6..0000000
--- a/rocketmq-go/service/rebalance_service.go
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package service
-
-import "github.com/apache/incubator-rocketmq-externals/rocketmq-go/model/config"
-
-type Rebalance struct {
-	mqClient       RocketMqClient
-	offsetStore    OffsetStore
-	consumerConfig config.RocketMqClientConfig
-}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/tasks.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/tasks.go b/rocketmq-go/tasks.go
new file mode 100644
index 0000000..604222f
--- /dev/null
+++ b/rocketmq-go/tasks.go
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package rocketmq
+
+import (
+	"math/rand"
+	"time"
+)
+
+type TaskManager struct {
+}
+
+func (self MqClientManager) StartAllScheduledTask() {
+	rand.Seed(time.Now().UnixNano())
+	go func() {
+		updateTopicRouteTimer := time.NewTimer(5 * time.Second)
+		for {
+			<-updateTopicRouteTimer.C
+			self.UpdateTopicRouteInfoFromNameServer()
+			updateTopicRouteTimer.Reset(5 * time.Second)
+		}
+	}()
+
+	go func() {
+		heartbeatTimer := time.NewTimer(10 * time.Second)
+		for {
+			<-heartbeatTimer.C
+			self.SendHeartbeatToAllBrokerWithLock()
+			heartbeatTimer.Reset(5 * time.Second)
+		}
+	}()
+
+	go func() {
+		rebalanceTimer := time.NewTimer(15 * time.Second)
+		for {
+			<-rebalanceTimer.C
+			self.rebalanceControllr.doRebalance()
+			rebalanceTimer.Reset(30 * time.Second)
+		}
+	}()
+
+	go func() {
+		timeoutTimer := time.NewTimer(3 * time.Second)
+		for {
+			<-timeoutTimer.C
+			self.mqClient.ClearExpireResponse()
+			timeoutTimer.Reset(time.Second)
+		}
+	}()
+	self.pullMessageController.Start()
+
+	//cleanExpireMsg
+	self.cleanExpireMsgController.Start()
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/util/compress_util.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/util/compress_util.go b/rocketmq-go/util/compress_util.go
new file mode 100644
index 0000000..0617911
--- /dev/null
+++ b/rocketmq-go/util/compress_util.go
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package util
+
+import (
+	"bytes"
+	"compress/zlib"
+	"github.com/golang/glog"
+	"io/ioutil"
+)
+
+func UnCompress(body []byte) (unCompressBody []byte, err error) {
+	b := bytes.NewReader(body)
+	z, err := zlib.NewReader(b)
+	if err != nil {
+		glog.Error(err)
+		return
+	}
+	defer z.Close()
+	unCompressBody, err = ioutil.ReadAll(z)
+	if err != nil {
+		glog.Error(err)
+	}
+	return
+}
+func Compress(body []byte) (compressBody []byte, err error) {
+	var in bytes.Buffer
+	w := zlib.NewWriter(&in)
+	_, err = w.Write(body)
+	w.Close()
+	compressBody = in.Bytes()
+	return
+}
+
+func CompressWithLevel(body []byte, level int) (compressBody []byte, err error) {
+	var (
+		in bytes.Buffer
+		w  *zlib.Writer
+	)
+	//w := zlib.NewWriter(&in)
+	w, err = zlib.NewWriterLevel(&in, level)
+	if err != nil {
+		return
+	}
+	_, err = w.Write(body)
+	w.Close()
+	compressBody = in.Bytes()
+	return
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/util/concurrent_map.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/util/concurrent_map.go b/rocketmq-go/util/concurrent_map.go
index 2fbe9bf..9d3e273 100644
--- a/rocketmq-go/util/concurrent_map.go
+++ b/rocketmq-go/util/concurrent_map.go
@@ -1,3 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
 package util
 
 import (

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/util/message_client_id_generator.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/util/message_client_id_generator.go b/rocketmq-go/util/message_client_id_generator.go
index df4cfb6..23293c0 100644
--- a/rocketmq-go/util/message_client_id_generator.go
+++ b/rocketmq-go/util/message_client_id_generator.go
@@ -88,13 +88,13 @@ func getStartAndNextStartTime() (thisMonthFirstDay12 int64, nextMonthFirstDay12
 	now := time.Now()
 	year := now.Year()
 	month := now.Month()
-	thisMonthFirstDay12 = time.Date(year, month, 1, 12, 0, 0, 0, time.Local).UnixNano()
+	thisMonthFirstDay12 = time.Date(year, month, 1, 0, 0, 0, 0, time.Local).UnixNano()
 	month = month + 1
 	if month > 12 {
 		month = month - 12
 		year = year + 1
 	}
-	nextMonthFirstDay12 = time.Date(year, month, 1, 12, 0, 0, 0, time.Local).UnixNano()
+	nextMonthFirstDay12 = time.Date(year, month, 1, 0, 0, 0, 0, time.Local).UnixNano()
 	return
 }
 func bytes2string(bytes []byte) (ret string) {

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/util/message_properties.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/util/message_properties.go b/rocketmq-go/util/message_properties.go
new file mode 100644
index 0000000..59fd5b8
--- /dev/null
+++ b/rocketmq-go/util/message_properties.go
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package util
+
+import (
+	"strings"
+)
+
+//char 1 and 2 from java code
+var NAME_VALUE_SEPARATOR = string(rune(1))
+
+var PROPERTY_SEPARATOR = string(rune(2))
+
+func MessageProperties2String(propertiesMap map[string]string) (ret string) {
+	for key, value := range propertiesMap {
+		ret = ret + key + NAME_VALUE_SEPARATOR + value + PROPERTY_SEPARATOR
+	}
+	return
+}
+
+func String2MessageProperties(properties string) (ret map[string]string) {
+	ret = make(map[string]string)
+	for _, nameValueStr := range strings.Split(properties, PROPERTY_SEPARATOR) {
+		nameValuePair := strings.Split(nameValueStr, NAME_VALUE_SEPARATOR)
+		nameValueLen := len(nameValuePair)
+		if nameValueLen != 2 {
+			//glog.Error("nameValuePair is error", nameValueStr)
+			continue
+		}
+		ret[nameValuePair[0]] = nameValuePair[1]
+	}
+	return
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/28b98b09/rocketmq-go/util/regex_util.go
----------------------------------------------------------------------
diff --git a/rocketmq-go/util/regex_util.go b/rocketmq-go/util/regex_util.go
new file mode 100644
index 0000000..5357452
--- /dev/null
+++ b/rocketmq-go/util/regex_util.go
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package util
+
+import (
+	"regexp"
+)
+
+//var regexpMap map[string]*regexp.Regexp
+//var rwMutex sync.RWMutex
+
+// todo improve
+func MatchString(value, pattern string) bool {
+	re, err := regexp.Compile(pattern)
+	if err != nil {
+		return false
+	}
+	return re.MatchString(value)
+}


Mime
View raw message