rocketmq-commits mailing list archives

From ding...@apache.org
Subject [rocketmq-client-go] branch native updated: Add Push Consumer (#47)
Date Fri, 26 Apr 2019 06:00:52 GMT
This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new cc1dfb2  Add Push Consumer (#47)
cc1dfb2 is described below

commit cc1dfb23aab9d8f18b21b587b0877ac2542b866b
Author: wenfeng <sxian.wang@gmail.com>
AuthorDate: Fri Apr 26 14:00:48 2019 +0800

    Add Push Consumer (#47)
    
    * add push consumer
    
    * add strategy
    
    * push consumer done
    
    * test push consumer
    
    * fix push_consumer fatal error
    
    * add license
---
 config.go                                        |   1 +
 consumer.go                                      | 302 ---------
 consumer/consumer.go                             | 830 +++++++++++++++++++++++
 consumer/offset_store.go                         |  55 ++
 consumer/process_queue.go                        | 100 +++
 consumer/pull_consumer.go                        | 198 ++++++
 consumer/pull_consumer_test.go                   |  18 +
 consumer/push_consumer.go                        | 606 +++++++++++++++++
 consumer/push_consumer_test.go                   |  18 +
 consumer/statistics.go                           |  60 ++
 consumer/strategy.go                             | 130 ++++
 consumer_test.go                                 |  18 -
 examples/main.go                                 |  73 --
 examples/producer.go                             |  50 --
 examples/producer/main.go                        |  47 ++
 examples/producer_orderly.go                     |  75 --
 examples/pull_consumer.go                        |  74 --
 examples/push_consumer.go                        |  59 --
 go.mod                                           |   1 +
 go.sum                                           |   2 +
 kernel/client.go                                 | 347 ++++++++--
 kernel/client_test.go                            |  45 ++
 kernel/constants.go                              |  27 +
 kernel/message.go                                |  54 +-
 kernel/model.go                                  | 129 ++--
 kernel/perm.go                                   |  30 +-
 kernel/request.go                                |  49 +-
 kernel/route.go                                  | 148 ++--
 kernel/validators.go                             |  46 ++
 remote/codes.go                                  | 147 ----
 remote/{client.go => remote_client.go}           |  33 +-
 remote/{client_test.go => remote_client_test.go} |   0
 rlog/log.go                                      |  10 -
 utils/helper.go                                  |  13 +-
 utils/helper_test.go                             |   4 +-
 utils/math.go                                    |  32 +
 utils/string.go                                  |  40 ++
 37 files changed, 2773 insertions(+), 1098 deletions(-)
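
For orientation before the full diff, here is a minimal sketch of how the new push-consumer API is meant to be used, based on the PushConsumer interface and ConsumerOption added below. The NameServerAddr field on kernel.ClientOption is an assumption for illustration; the callback context type comes from the Subscribe signature in consumer/push_consumer.go.

package main

import (
	"fmt"

	"github.com/apache/rocketmq-client-go/consumer"
	"github.com/apache/rocketmq-client-go/kernel"
)

func main() {
	c := consumer.NewPushConsumer("testGroup", consumer.ConsumerOption{
		// NameServerAddr is assumed here; see kernel.ClientOption in this commit for the actual fields.
		ClientOption:  kernel.ClientOption{NameServerAddr: "127.0.0.1:9876"},
		ConsumerModel: consumer.Clustering,
		FromWhere:     consumer.ConsumeFromLastOffset,
	})

	// Subscribe must be called before Start; the consumer rejects new subscriptions once running.
	err := c.Subscribe("TopicTest", consumer.MessageSelector{
		Type:       consumer.TAG,
		Expression: "*",
	}, func(ctx *consumer.ConsumeMessageContext, msgs []*kernel.MessageExt) (consumer.ConsumeResult, error) {
		fmt.Printf("consumed %d messages\n", len(msgs))
		return consumer.ConsumeSuccess, nil
	})
	if err != nil {
		panic(err)
	}
	if err := c.Start(); err != nil {
		panic(err)
	}
	select {} // keep consuming; Shutdown is still a stub in this commit
}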

diff --git a/config.go b/config.go
new file mode 100644
index 0000000..5519947
--- /dev/null
+++ b/config.go
@@ -0,0 +1 @@
+package rocketmq
diff --git a/consumer.go b/consumer.go
deleted file mode 100644
index 5119f93..0000000
--- a/consumer.go
+++ /dev/null
@@ -1,302 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package rocketmq
-
-import (
-	"context"
-	"errors"
-	"fmt"
-	"github.com/apache/rocketmq-client-go/kernel"
-	"github.com/apache/rocketmq-client-go/rlog"
-	"strconv"
-	"sync"
-	"sync/atomic"
-	"time"
-)
-
-type Consumer interface {
-	Start()
-	Pull(topic, expression string, numbers int) (*kernel.PullResult, error)
-	SubscribeWithChan(topic, expression string) (chan *kernel.Message, error)
-	SubscribeWithFunc(topic, expression string, f func(msg *kernel.Message) ConsumeResult) error
-	ACK(msg *kernel.Message, result ConsumeResult)
-}
-
-var (
-	queueCounterTable sync.Map
-)
-
-type ConsumeResult int
-
-type ConsumerType int
-
-const (
-	Original ConsumerType = iota
-	Orderly
-	Transaction
-
-	SubAll = "*"
-)
-
-type ConsumerConfig struct {
-	GroupName                  string
-	Model                      kernel.MessageModel
-	UnitMode                   bool
-	MaxReconsumeTimes          int
-	PullMessageTimeout         time.Duration
-	FromWhere                  kernel.ConsumeFromWhere
-	brokerSuspendMaxTimeMillis int64
-}
-
-func NewConsumer(config ConsumerConfig) Consumer {
-	return &defaultConsumer{
-		config: config,
-	}
-}
-
-type defaultConsumer struct {
-	state  kernel.ServiceState
-	config ConsumerConfig
-}
-
-func (c *defaultConsumer) Start() {
-	c.state = kernel.Running
-}
-
-func (c *defaultConsumer) Pull(topic, expression string, numbers int) (*kernel.PullResult, error) {
-	mq := getNextQueueOf(topic)
-	if mq == nil {
-		return nil, fmt.Errorf("prepard to pull topic: %s, but no queue is founded", topic)
-	}
-
-	data := getSubscriptionData(mq, expression)
-	result, err := c.pull(context.Background(), mq, data, c.nextOffsetOf(mq), numbers)
-
-	if err != nil {
-		return nil, err
-	}
-
-	processPullResult(mq, result, data)
-	return result, nil
-}
-
-// SubscribeWithChan ack manually
-func (c *defaultConsumer) SubscribeWithChan(topic, expression string) (chan *kernel.Message, error) {
-	return nil, nil
-}
-
-// SubscribeWithFunc ack automatic
-func (c *defaultConsumer) SubscribeWithFunc(topic, expression string,
-	f func(msg *kernel.Message) ConsumeResult) error {
-	return nil
-}
-
-func (c *defaultConsumer) ACK(msg *kernel.Message, result ConsumeResult) {
-
-}
-
-func (c *defaultConsumer) pull(ctx context.Context, mq *kernel.MessageQueue, data *kernel.SubscriptionData,
-	offset int64, numbers int) (*kernel.PullResult, error) {
-	err := c.makeSureStateOK()
-	if err != nil {
-		return nil, err
-	}
-
-	if mq == nil {
-		return nil, errors.New("MessageQueue is nil")
-	}
-
-	if offset < 0 {
-		return nil, errors.New("offset < 0")
-	}
-
-	if numbers <= 0 {
-		numbers = 1
-	}
-	c.subscriptionAutomatically(mq.Topic)
-
-	brokerResult := tryFindBroker(mq)
-	if brokerResult == nil {
-		return nil, fmt.Errorf("the broker %s does not exist", mq.BrokerName)
-	}
-
-	if (data.ExpType == kernel.TAG) && brokerResult.BrokerVersion < kernel.V4_1_0 {
-		return nil, fmt.Errorf("the broker [%s, %v] does not upgrade to support for filter message by %v",
-			mq.BrokerName, brokerResult.BrokerVersion, data.ExpType)
-	}
-
-	sysFlag := buildSysFlag(false, true, true, false)
-
-	if brokerResult.Slave {
-		sysFlag = clearCommitOffsetFlag(sysFlag)
-	}
-	pullRequest := &kernel.PullMessageRequest{
-		ConsumerGroup:        c.config.GroupName,
-		Topic:                mq.Topic,
-		QueueId:              int32(mq.QueueId),
-		QueueOffset:          offset,
-		MaxMsgNums:           int32(numbers),
-		SysFlag:              sysFlag,
-		CommitOffset:         0,
-		SuspendTimeoutMillis: c.config.brokerSuspendMaxTimeMillis,
-		SubExpression:        data.SubString,
-		ExpressionType:       string(data.ExpType),
-	}
-
-	if data.ExpType == kernel.TAG {
-		pullRequest.SubVersion = 0
-	} else {
-		pullRequest.SubVersion = data.SubVersion
-	}
-
-	// TODO computePullFromWhichFilterServer
-	return kernel.PullMessage(ctx, brokerResult.BrokerAddr, pullRequest)
-}
-
-func (c *defaultConsumer) makeSureStateOK() error {
-	if c.state != kernel.Running {
-		return fmt.Errorf("the consumer state is [%d], not running", c.state)
-	}
-	return nil
-}
-
-func (c *defaultConsumer) subscriptionAutomatically(topic string) {
-	// TODO
-}
-
-func (c *defaultConsumer) nextOffsetOf(queue *kernel.MessageQueue) int64 {
-	return 0
-}
-
-func toMessage(messageExts []*kernel.MessageExt) []*kernel.Message {
-	msgs := make([]*kernel.Message, 0)
-
-	return msgs
-}
-
-func processPullResult(mq *kernel.MessageQueue, result *kernel.PullResult, data *kernel.SubscriptionData) {
-	updatePullFromWhichNode(mq, result.SuggestWhichBrokerId)
-	switch result.Status {
-	case kernel.PullFound:
-		msgs := result.GetMessageExts()
-		msgListFilterAgain := msgs
-		if len(data.Tags) > 0 && data.ClassFilterMode {
-			msgListFilterAgain = make([]*kernel.MessageExt, len(msgs))
-			for _, msg := range msgs {
-				_, exist := data.Tags[msg.GetTags()]
-				if exist {
-					msgListFilterAgain = append(msgListFilterAgain, msg)
-				}
-			}
-		}
-
-		// TODO hook
-
-		for _, msg := range msgListFilterAgain {
-			traFlag, _ := strconv.ParseBool(msg.Properties[kernel.TransactionPrepared])
-			if traFlag {
-				msg.TransactionId = msg.Properties[kernel.UniqueClientMessageIdKeyIndex]
-			}
-
-			msg.Properties[kernel.MinOffset] = strconv.FormatInt(result.MinOffset, 10)
-			msg.Properties[kernel.MaxOffset] = strconv.FormatInt(result.MaxOffset, 10)
-		}
-
-		result.SetMessageExts(msgListFilterAgain)
-	}
-}
-
-func getSubscriptionData(mq *kernel.MessageQueue, exp string) *kernel.SubscriptionData {
-	subData := &kernel.SubscriptionData{
-		Topic: mq.Topic,
-	}
-	if exp == "" || exp == SubAll {
-		subData.SubString = SubAll
-	} else {
-		// TODO
-	}
-	return subData
-}
-
-func getNextQueueOf(topic string) *kernel.MessageQueue {
-	queues, err := kernel.FetchSubscribeMessageQueues(topic)
-	if err != nil && len(queues) > 0 {
-		rlog.Error(err.Error())
-		return nil
-	}
-	var index int64
-	v, exist := queueCounterTable.Load(topic)
-	if !exist {
-		index = -1
-		queueCounterTable.Store(topic, 0)
-	} else {
-		index = v.(int64)
-	}
-
-	return queues[int(atomic.AddInt64(&index, 1))%len(queues)]
-}
-
-func buildSysFlag(commitOffset, suspend, subscription, classFilter bool) int32 {
-	var flag int32 = 0
-	if commitOffset {
-		flag |= 0x1 << 0
-	}
-
-	if suspend {
-		flag |= 0x1 << 1
-	}
-
-	if subscription {
-		flag |= 0x1 << 2
-	}
-
-	if classFilter {
-		flag |= 0x1 << 3
-	}
-
-	return flag
-}
-
-func clearCommitOffsetFlag(sysFlag int32) int32 {
-	return sysFlag & (^0x1 << 0)
-}
-
-func tryFindBroker(mq *kernel.MessageQueue) *kernel.FindBrokerResult {
-	result := kernel.FindBrokerAddressInSubscribe(mq.BrokerName, recalculatePullFromWhichNode(mq), false)
-
-	if result == nil {
-		kernel.UpdateTopicRouteInfo(mq.Topic)
-	}
-	return kernel.FindBrokerAddressInSubscribe(mq.BrokerName, recalculatePullFromWhichNode(mq), false)
-}
-
-var (
-	pullFromWhichNodeTable sync.Map
-)
-
-func updatePullFromWhichNode(mq *kernel.MessageQueue, brokerId int64) {
-	pullFromWhichNodeTable.Store(mq.HashCode(), brokerId)
-}
-
-func recalculatePullFromWhichNode(mq *kernel.MessageQueue) int64 {
-	v, exist := pullFromWhichNodeTable.Load(mq.HashCode())
-	if exist {
-		return v.(int64)
-	}
-	return kernel.MasterId
-}
diff --git a/consumer/consumer.go b/consumer/consumer.go
new file mode 100644
index 0000000..60d3dd5
--- /dev/null
+++ b/consumer/consumer.go
@@ -0,0 +1,830 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/apache/rocketmq-client-go/kernel"
+	"github.com/apache/rocketmq-client-go/remote"
+	"github.com/apache/rocketmq-client-go/rlog"
+	"github.com/apache/rocketmq-client-go/utils"
+	"github.com/tidwall/gjson"
+	"sort"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+const (
+	// Delay before the next pull when an error occurs
+	_PullDelayTimeWhenError = 3 * time.Second
+
+	// Flow control interval
+	_PullDelayTimeWhenFlowControl = 50 * time.Millisecond
+
+	// Delay when the pull service is suspended
+	_PullDelayTimeWhenSuspend = 30 * time.Second
+
+	// Long polling mode, the Consumer connection max suspend time
+	_BrokerSuspendMaxTime = 20 * time.Second
+
+	// Long polling mode, the Consumer connection timeout (must be greater than _BrokerSuspendMaxTime)
+	_ConsumerTimeoutWhenSuspend = 30 * time.Second
+
+	// Offset persistence interval for the consumer
+	_PersistConsumerOffsetInterval = 5 * time.Second
+)
+
+// MessageModel defines how messages are delivered to each consumer client.
+//
+// RocketMQ supports two message models: clustering and broadcasting. If clustering is set, consumer clients with
+// the same consumerGroup will only consume shards of the subscribed messages, which achieves load
+// balancing; conversely, if broadcasting is set, each consumer client will consume all subscribed messages
+// separately.
+//
+// This field defaults to clustering.
+type MessageModel int
+
+const (
+	BroadCasting MessageModel = iota
+	Clustering
+)
+
+func (mode MessageModel) String() string {
+	switch mode {
+	case BroadCasting:
+		return "BroadCasting"
+	case Clustering:
+		return "Clustering"
+	default:
+		return "Unknown"
+	}
+}
+
+// ConsumeFromWhere is the consuming point on consumer booting.
+//
+// There are three consuming points:
+//
+//  - ConsumeFromLastOffset: the consumer client picks up where it previously stopped. For a newly
+//    booted consumer client, depending on the age of the consumer group, there are two cases:
+//    1. if the consumer group was created so recently that the earliest subscribed message has not yet
+//       expired (i.e. the group represents a recently launched business), consuming starts from the
+//       very beginning;
+//    2. if the earliest subscribed message has expired, consuming starts from the latest messages,
+//       i.e. messages born prior to the booting timestamp are ignored.
+//
+//  - ConsumeFromFirstOffset: the consumer client starts from the earliest messages available.
+//
+//  - ConsumeFromTimestamp: the consumer client starts from the specified timestamp, i.e. messages
+//    born prior to the consume timestamp are ignored.
+type ConsumeFromWhere int
+
+const (
+	ConsumeFromLastOffset ConsumeFromWhere = iota
+	ConsumeFromFirstOffset
+	ConsumeFromTimestamp
+)
+
+type ConsumeType string
+
+const (
+	_PullConsume = ConsumeType("pull")
+	_PushConsume = ConsumeType("push")
+)
+
+type ExpressionType string
+
+const (
+	// SQL92 expressions.
+	//
+	// Keywords:
+	//   AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS, NULL
+	//
+	// Data types:
+	//   Boolean, e.g. TRUE, FALSE
+	//   String, e.g. 'abc'
+	//   Decimal, e.g. 123
+	//   Float, e.g. 3.1415
+	//
+	// Grammar:
+	//   AND, OR
+	//   >, >=, <, <=, =
+	//   BETWEEN A AND B, equivalent to >=A AND <=B
+	//   NOT BETWEEN A AND B, equivalent to >B OR <A
+	//   IN ('a', 'b'), equivalent to ='a' OR ='b'; this operation only supports the String type
+	//   IS NULL, IS NOT NULL, check whether a parameter is null or not
+	//   =TRUE, =FALSE, check whether a parameter is true or false
+	//
+	// Example:
+	//   (a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE)
+	SQL92 = ExpressionType("SQL92")
+
+	// TAG only supports OR operations such as "tag1 || tag2 || tag3".
+	// A nil or "*" expression means subscribe to all.
+	TAG = ExpressionType("TAG")
+)
+
+func IsTagType(exp string) bool {
+	if exp == "" || exp == "TAG" {
+		return true
+	}
+	return false
+}
+
+const (
+	_SubAll = "*"
+)
+
+type PullRequest struct {
+	consumerGroup string
+	mq            *kernel.MessageQueue
+	pq            *ProcessQueue
+	nextOffset    int64
+	lockedFirst   bool
+}
+
+func (pr *PullRequest) String() string {
+	return fmt.Sprintf("[ConsumerGroup: %s, Topic: %s, MessageQueue: %d]",
+		pr.consumerGroup, pr.mq.Topic, pr.mq.QueueId)
+}
+
+type ConsumerOption struct {
+	kernel.ClientOption
+	// The socket timeout for pulling messages
+	ConsumerPullTimeout time.Duration
+
+	// Maximum offset span for concurrent consumption; it has no effect on sequential consumption
+	ConsumeConcurrentlyMaxSpan int
+
+	// Flow control threshold on queue level: each message queue caches at most 1000 messages by default.
+	// Considering PullBatchSize, the instantaneous value may exceed the limit
+	PullThresholdForQueue int
+
+	// Limit the cached message size on queue level: each message queue caches at most 100 MiB of messages by default.
+	// Considering PullBatchSize, the instantaneous value may exceed the limit
+	//
+	// The size of a message is measured by its body only, so it is not accurate
+	PullThresholdSizeForQueue int
+
+	// Flow control threshold on topic level, default value is -1 (unlimited)
+	//
+	// The value of PullThresholdForQueue will be overwritten and recalculated based on
+	// PullThresholdForTopic if the latter is not unlimited
+	//
+	// For example, if the value of PullThresholdForTopic is 1000 and 10 message queues are assigned to this consumer,
+	// then PullThresholdForQueue will be set to 100
+	PullThresholdForTopic int
+
+	// Limit the cached message size on topic level, default value is -1 MiB (unlimited)
+	//
+	// The value of PullThresholdSizeForQueue will be overwritten and recalculated based on
+	// PullThresholdSizeForTopic if the latter is not unlimited
+	//
+	// For example, if the value of PullThresholdSizeForTopic is 1000 MiB and 10 message queues are
+	// assigned to this consumer, then PullThresholdSizeForQueue will be set to 100 MiB
+	PullThresholdSizeForTopic int
+
+	// Message pull interval
+	PullInterval time.Duration
+
+	// Batch consumption size
+	ConsumeMessageBatchMaxSize int
+
+	// Batch pull size
+	PullBatchSize int32
+
+	// Whether to update the subscription relationship on every pull
+	PostSubscriptionWhenPull bool
+
+	// Max re-consume times. -1 means 16 times.
+	//
+	// If messages are re-consumed more than MaxReconsumeTimes without success, they are directed to a deletion
+	// queue to wait.
+	MaxReconsumeTimes int
+
+	// Time to suspend pulling for cases requiring slow pulling, such as flow control.
+	SuspendCurrentQueueTimeMillis time.Duration
+
+	// Maximum amount of time a message may block the consuming thread.
+	ConsumeTimeout time.Duration
+
+	ConsumerModel  MessageModel
+	Strategy       AllocateStrategy
+	ConsumeOrderly bool
+	FromWhere      ConsumeFromWhere
+	// TODO traceDispatcher
+}
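
To make the threshold interplay described in the option comments above concrete, here is an illustrative configuration (not part of this commit; the NameServerAddr field is an assumption):

package main

import (
	"github.com/apache/rocketmq-client-go/consumer"
	"github.com/apache/rocketmq-client-go/kernel"
)

// newOption shows how the per-queue and per-topic caps relate: with 10 queues
// assigned, a PullThresholdForTopic of 1000 effectively lowers the per-queue
// cap to about 100 cached messages, overriding PullThresholdForQueue.
func newOption() consumer.ConsumerOption {
	return consumer.ConsumerOption{
		ClientOption:               kernel.ClientOption{NameServerAddr: "127.0.0.1:9876"}, // assumed field name
		ConsumerModel:              consumer.Clustering,
		FromWhere:                  consumer.ConsumeFromLastOffset,
		PullThresholdForQueue:      1024, // per-queue message-count cap
		PullThresholdSizeForQueue:  512,  // per-queue cache-size cap, in MiB
		PullThresholdForTopic:      1000, // per-topic cap, redistributed across assigned queues
		PullBatchSize:              32,
		ConsumeMessageBatchMaxSize: 1,
		MaxReconsumeTimes:          16,
	}
}

func main() { _ = newOption() }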
+
+// TODO hook
+type defaultConsumer struct {
+	// Consumers of the same role are required to have exactly the same subscriptions and consumerGroup to
+	// correctly achieve load balancing. The consumer group is required and needs to be globally unique.
+	//
+	// See http://rocketmq.apache.org/docs/core-concept/ for further discussion.
+	consumerGroup  string
+	model          MessageModel
+	allocate       func(string, string, []*kernel.MessageQueue, []string) []*kernel.MessageQueue
+	unitMode       bool
+	consumeOrderly bool
+	fromWhere      ConsumeFromWhere
+
+	cType     ConsumeType
+	client    *kernel.RMQClient
+	mqChanged func(topic string, mqAll, mqDivided []*kernel.MessageQueue)
+	state     kernel.ServiceState
+	pause     bool
+	once      sync.Once
+	option    ConsumerOption
+	// key: int, hash(*kernel.MessageQueue)
+	// value: *processQueue
+	processQueueTable sync.Map
+
+	// key: topic(string)
+	// value: map[int]*kernel.MessageQueue
+	topicSubscribeInfoTable sync.Map
+
+	// key: topic
+	// value: *SubscriptionData
+	subscriptionDataTable sync.Map
+	storage               OffsetStore
+	// chan for push consumer
+	prCh chan PullRequest
+}
+
+func (dc *defaultConsumer) persistConsumerOffset() {
+	err := dc.makeSureStateOK()
+	if err != nil {
+		rlog.Errorf("consumer state error: %s", err.Error())
+		return
+	}
+	mqs := make([]*kernel.MessageQueue, 0)
+	dc.processQueueTable.Range(func(key, value interface{}) bool {
+		mqs = append(mqs, key.(*kernel.MessageQueue))
+		return true
+	})
+	dc.storage.persist(mqs)
+}
+
+func (dc *defaultConsumer) updateTopicSubscribeInfo(topic string, mqs []*kernel.MessageQueue) {
+	_, exist := dc.subscriptionDataTable.Load(topic)
+	// only update the route info when the topic is actually subscribed
+	if exist {
+		mqSet := make(map[int]*kernel.MessageQueue, 0)
+		for idx := range mqs {
+			mq := mqs[idx]
+			mqSet[mq.HashCode()] = mq
+		}
+		dc.topicSubscribeInfoTable.Store(topic, mqs)
+	}
+}
+
+func (dc *defaultConsumer) isSubscribeTopicNeedUpdate(topic string) bool {
+	_, exist := dc.subscriptionDataTable.Load(topic)
+	if !exist {
+		return false
+	}
+	_, exist = dc.topicSubscribeInfoTable.Load(topic)
+	return !exist
+}
+
+func (dc *defaultConsumer) doBalance() {
+	dc.subscriptionDataTable.Range(func(key, value interface{}) bool {
+		topic := key.(string)
+		if strings.HasPrefix(topic, kernel.RetryGroupTopicPrefix) {
+			return true
+		}
+		v, exist := dc.topicSubscribeInfoTable.Load(topic)
+		if !exist {
+			rlog.Warnf("do balance of group: %s, but topic: %s does not exist.", dc.consumerGroup, topic)
+			return true
+		}
+		mqs := v.([]*kernel.MessageQueue)
+		switch dc.model {
+		case BroadCasting:
+			changed := dc.updateProcessQueueTable(topic, mqs)
+			if changed {
+				dc.mqChanged(topic, mqs, mqs)
+				rlog.Infof("messageQueueChanged, Group: %s, Topic: %s, MessageQueues: %v",
+					dc.consumerGroup, topic, mqs)
+			}
+		case Clustering:
+			cidAll := dc.findConsumerList(topic)
+			if cidAll == nil {
+				rlog.Warnf("do balance for Group: %s, Topic: %s get consumer id list failed",
+					dc.consumerGroup, topic)
+				return true
+			}
+			mqAll := make([]*kernel.MessageQueue, len(mqs))
+			copy(mqAll, mqs)
+			sort.Strings(cidAll)
+			sort.SliceStable(mqAll, func(i, j int) bool {
+				v := strings.Compare(mqAll[i].Topic, mqAll[j].Topic)
+				if v != 0 {
+					return v > 0
+				}
+
+				v = strings.Compare(mqAll[i].BrokerName, mqAll[j].BrokerName)
+				if v != 0 {
+					return v > 0
+				}
+				return (mqAll[i].QueueId - mqAll[j].QueueId) > 0
+			})
+			allocateResult := dc.allocate(dc.consumerGroup, dc.client.ClientID(), mqAll, cidAll)
+			changed := dc.updateProcessQueueTable(topic, allocateResult)
+			if changed {
+				dc.mqChanged(topic, mqAll, allocateResult)
+				rlog.Infof("do balance result changed, allocateMessageQueueStrategyName=%s, group=%s, "+
+					"topic=%s, clientId=%s, mqAllSize=%d, cidAllSize=%d, rebalanceResultSize=%d, "+
+					"rebalanceResultSet=%v", string(dc.option.Strategy), dc.consumerGroup, topic, dc.client.ClientID(), len(mqAll),
+					len(cidAll), len(allocateResult), allocateResult)
+
+			}
+		}
+		return true
+	})
+}
+
+func (dc *defaultConsumer) SubscriptionDataList() []*kernel.SubscriptionData {
+	result := make([]*kernel.SubscriptionData, 0)
+	dc.subscriptionDataTable.Range(func(key, value interface{}) bool {
+		result = append(result, value.(*kernel.SubscriptionData))
+		return true
+	})
+	return result
+}
+
+func (dc *defaultConsumer) makeSureStateOK() error {
+	// TODO log
+	return nil //dc.state == StateRunning
+}
+
+type lockBatchRequestBody struct {
+	ConsumerGroup string                 `json:"consumerGroup"`
+	ClientId      string                 `json:"clientId"`
+	MQs           []*kernel.MessageQueue `json:"mqSet"`
+}
+
+func (dc *defaultConsumer) lock(mq *kernel.MessageQueue) bool {
+	brokerResult := kernel.FindBrokerAddressInSubscribe(mq.BrokerName, kernel.MasterId, true)
+
+	if brokerResult == nil {
+		return false
+	}
+
+	body := &lockBatchRequestBody{
+		ConsumerGroup: dc.consumerGroup,
+		ClientId:      dc.client.ClientID(),
+		MQs:           []*kernel.MessageQueue{mq},
+	}
+	lockedMQ := dc.doLock(brokerResult.BrokerAddr, body)
+	var lockOK bool
+	for idx := range lockedMQ {
+		_mq := lockedMQ[idx]
+		v, exist := dc.processQueueTable.Load(_mq)
+		if exist {
+			pq := v.(*ProcessQueue)
+			pq.locked = true
+			pq.lastConsumeTime = time.Now()
+		}
+		if _mq.Equals(mq) {
+			lockOK = true
+		}
+	}
+	rlog.Infof("the message queue lock %v, %s %s", lockOK, dc.consumerGroup, mq.String())
+	return lockOK
+}
+
+func (dc *defaultConsumer) unlock(mq *kernel.MessageQueue, oneway bool) {
+	brokerResult := kernel.FindBrokerAddressInSubscribe(mq.BrokerName, kernel.MasterId, true)
+
+	if brokerResult == nil {
+		return
+	}
+
+	body := &lockBatchRequestBody{
+		ConsumerGroup: dc.consumerGroup,
+		ClientId:      dc.client.ClientID(),
+		MQs:           []*kernel.MessageQueue{mq},
+	}
+	dc.doUnlock(brokerResult.BrokerAddr, body, oneway)
+	rlog.Warnf("unlock messageQueue. group:%s, clientId:%s, mq:%s",
+		dc.consumerGroup, dc.client.ClientID(), mq.String())
+}
+
+func (dc *defaultConsumer) lockAll(mq kernel.MessageQueue) {
+	mqMapSet := dc.buildProcessQueueTableByBrokerName()
+	for broker, mqs := range mqMapSet {
+		if len(mqs) == 0 {
+			continue
+		}
+		brokerResult := kernel.FindBrokerAddressInSubscribe(broker, kernel.MasterId, true)
+		if brokerResult == nil {
+			continue
+		}
+		body := &lockBatchRequestBody{
+			ConsumerGroup: dc.consumerGroup,
+			ClientId:      dc.client.ClientID(),
+			MQs:           mqs,
+		}
+		lockedMQ := dc.doLock(brokerResult.BrokerAddr, body)
+		set := make(map[int]bool, 0)
+		for idx := range lockedMQ {
+			_mq := lockedMQ[idx]
+			v, exist := dc.processQueueTable.Load(_mq)
+			if exist {
+				pq := v.(*ProcessQueue)
+				pq.locked = true
+				pq.lastConsumeTime = time.Now()
+			}
+			set[_mq.HashCode()] = true
+		}
+		for idx := range mqs {
+			_mq := mqs[idx]
+			if !set[_mq.HashCode()] {
+				v, exist := dc.processQueueTable.Load(_mq)
+				if exist {
+					pq := v.(*ProcessQueue)
+					pq.locked = false
+					pq.lastLockTime = time.Now()
+					rlog.Warnf("the message queue: %s lock failed, Group: %s", _mq.String(), dc.consumerGroup)
+				}
+			}
+		}
+	}
+}
+
+func (dc *defaultConsumer) unlockAll(oneway bool) {
+	mqMapSet := dc.buildProcessQueueTableByBrokerName()
+	for broker, mqs := range mqMapSet {
+		if len(mqs) == 0 {
+			continue
+		}
+		brokerResult := kernel.FindBrokerAddressInSubscribe(broker, kernel.MasterId, true)
+		if brokerResult == nil {
+			continue
+		}
+		body := &lockBatchRequestBody{
+			ConsumerGroup: dc.consumerGroup,
+			ClientId:      dc.client.ClientID(),
+			MQs:           mqs,
+		}
+		dc.doUnlock(brokerResult.BrokerAddr, body, oneway)
+		for idx := range mqs {
+			_mq := mqs[idx]
+			v, exist := dc.processQueueTable.Load(_mq)
+			if exist {
+				v.(*ProcessQueue).locked = false
+				rlog.Warnf("the message queue: %s was unlocked, Group: %s", _mq.String(), dc.consumerGroup)
+			}
+		}
+	}
+}
+
+func (dc *defaultConsumer) doLock(addr string, body *lockBatchRequestBody) []kernel.MessageQueue {
+	data, _ := json.Marshal(body)
+	request := remote.NewRemotingCommand(kernel.ReqLockBatchMQ, nil, data)
+	response, err := remote.InvokeSync(addr, request, 1*time.Second)
+	if err != nil {
+		rlog.Errorf("lock mq to broker: %s error %s", addr, err.Error())
+		return nil
+	}
+	lockOKMQSet := struct {
+		MQs []kernel.MessageQueue `json:"lockOKMQSet"`
+	}{}
+	err = json.Unmarshal(response.Body, &lockOKMQSet)
+	if err != nil {
+		rlog.Errorf("Unmarshal lock mq body error %s", err.Error())
+		return nil
+	}
+	return lockOKMQSet.MQs
+}
+
+func (dc *defaultConsumer) doUnlock(addr string, body *lockBatchRequestBody, oneway bool) {
+	data, _ := json.Marshal(body)
+	request := remote.NewRemotingCommand(kernel.ReqUnlockBatchMQ, nil, data)
+	if oneway {
+		err := remote.InvokeOneWay(addr, request)
+		if err != nil {
+			rlog.Errorf("unlock mq to broker with oneway: %s error %s", addr, err.Error())
+		}
+	} else {
+		response, err := remote.InvokeSync(addr, request, 1*time.Second)
+		if err != nil {
+			rlog.Errorf("unlock mq to broker: %s error %s", addr, err.Error())
+			return
+		}
+		if response.Code != kernel.ResSuccess {
+			// TODO error
+		}
+	}
+}
+
+func (dc *defaultConsumer) buildProcessQueueTableByBrokerName() map[string][]*kernel.MessageQueue {
+	result := make(map[string][]*kernel.MessageQueue, 0)
+
+	dc.processQueueTable.Range(func(key, value interface{}) bool {
+		mq := key.(*kernel.MessageQueue)
+		mqs, exist := result[mq.BrokerName]
+		if !exist {
+			mqs = make([]*kernel.MessageQueue, 0)
+		}
+		mqs = append(mqs, mq)
+		result[mq.BrokerName] = mqs
+		return true
+	})
+
+	return result
+}
+
+func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*kernel.MessageQueue) bool {
+	var changed bool
+	mqSet := make(map[*kernel.MessageQueue]bool)
+	for idx := range mqs {
+		mqSet[mqs[idx]] = true
+	}
+	// TODO
+	dc.processQueueTable.Range(func(key, value interface{}) bool {
+		mq := key.(*kernel.MessageQueue)
+		pq := value.(*ProcessQueue)
+		if mq.Topic == topic {
+			if !mqSet[mq] {
+				pq.dropped = true
+				if dc.removeUnnecessaryMessageQueue(mq, pq) {
+					delete(mqSet, mq)
+					changed = true
+					rlog.Infof("do defaultConsumer, Group:%s, remove unnecessary mq: %s", dc.consumerGroup, mq.String())
+				}
+			} else if pq.isPullExpired() && dc.cType == _PushConsume {
+				pq.dropped = true
+				if dc.removeUnnecessaryMessageQueue(mq, pq) {
+					delete(mqSet, mq)
+					changed = true
+					rlog.Infof("do defaultConsumer, Group:%s, remove unnecessary mq: %s, "+
+						"because pull was paused, so try to fix it", dc.consumerGroup, mq)
+				}
+			}
+		}
+		return true
+	})
+
+	if dc.cType == _PushConsume {
+		for mq := range mqSet {
+			if dc.consumeOrderly && !dc.lock(mq) {
+				rlog.Warnf("do defaultConsumer, Group:%s add a new mq failed, %s, because lock failed",
+					dc.consumerGroup, mq.String())
+				continue
+			}
+			dc.storage.remove(mq)
+			nextOffset := dc.computePullFromWhere(mq)
+			if nextOffset >= 0 {
+				_, exist := dc.processQueueTable.Load(mq)
+				if exist {
+					rlog.Debugf("do defaultConsumer, Group: %s, mq already exist, %s", dc.consumerGroup, mq.String())
+				} else {
+					rlog.Infof("do defaultConsumer, Group: %s, add a new mq, %s", dc.consumerGroup, mq.String())
+					pq := &ProcessQueue{}
+					dc.processQueueTable.Store(mq, pq)
+					pr := PullRequest{
+						consumerGroup: dc.consumerGroup,
+						mq:            mq,
+						pq:            pq,
+						nextOffset:    nextOffset,
+					}
+					dc.prCh <- pr
+					changed = true
+				}
+			} else {
+				rlog.Warnf("do defaultConsumer failed, Group:%s, add new mq failed, %s", dc.consumerGroup, mq)
+			}
+		}
+	}
+
+	return changed
+}
+
+func (dc *defaultConsumer) removeUnnecessaryMessageQueue(mq *kernel.MessageQueue, pq *ProcessQueue) bool {
+	dc.storage.persist([]*kernel.MessageQueue{mq})
+	dc.storage.remove(mq)
+	if dc.cType == _PushConsume && dc.consumeOrderly && Clustering == dc.model {
+		// TODO
+	}
+	return true
+}
+
+func (dc *defaultConsumer) computePullFromWhere(mq *kernel.MessageQueue) int64 {
+	if dc.cType == _PullConsume {
+		return 0
+	}
+	var result = int64(-1)
+	lastOffset := dc.storage.read(mq, _ReadFromStore)
+	if lastOffset >= 0 {
+		result = lastOffset
+	} else {
+		switch dc.fromWhere {
+		case ConsumeFromLastOffset:
+			if lastOffset == -1 {
+				if strings.HasPrefix(mq.Topic, kernel.RetryGroupTopicPrefix) {
+					lastOffset, err := kernel.QueryMaxOffset(mq.Topic, mq.QueueId)
+					if err == nil {
+						result = lastOffset
+					} else {
+						rlog.Warnf("query max offset of: [%s:%d] error, %s", mq.Topic, mq.QueueId, err.Error())
+					}
+				}
+			}
+		case ConsumeFromFirstOffset:
+			if lastOffset == -1 {
+				result = 0
+			}
+		case ConsumeFromTimestamp:
+			if lastOffset == -1 {
+				if strings.HasPrefix(mq.Topic, kernel.RetryGroupTopicPrefix) {
+					lastOffset, err := kernel.QueryMaxOffset(mq.Topic, mq.QueueId)
+					if err == nil {
+						result = lastOffset
+					} else {
+						rlog.Warnf("query max offset of: [%s:%d] error, %s", mq.Topic, mq.QueueId, err.Error())
+					}
+				} else {
+					// TODO parse timestamp
+				}
+			}
+		default:
+		}
+	}
+
+	return result
+}
+
+func (dc *defaultConsumer) findConsumerList(topic string) []string {
+	brokerAddr := kernel.FindBrokerAddrByTopic(topic)
+	if brokerAddr == "" {
+		kernel.UpdateTopicRouteInfo(topic)
+		brokerAddr = kernel.FindBrokerAddrByTopic(topic)
+	}
+
+	if brokerAddr != "" {
+		req := &kernel.GetConsumerList{
+			ConsumerGroup: dc.consumerGroup,
+		}
+		cmd := remote.NewRemotingCommand(kernel.ReqGetConsumerListByGroup, req, nil)
+		res, err := remote.InvokeSync(brokerAddr, cmd, 3*time.Second) // TODO: the timeout mechanism is problematic
+		if err != nil {
+			rlog.Errorf("get consumer list of [%s] from %s error: %s", dc.consumerGroup, brokerAddr, err.Error())
+			return nil
+		}
+		result := gjson.ParseBytes(res.Body)
+		list := make([]string, 0)
+		arr := result.Get("consumerIdList").Array()
+		for idx := range arr {
+			list = append(list, arr[idx].String())
+		}
+		return list
+	}
+	return nil
+}
+
+func buildSubscriptionData(topic string, selector MessageSelector) *kernel.SubscriptionData {
+	subData := &kernel.SubscriptionData{
+		Topic:     topic,
+		SubString: selector.Expression,
+		ExpType:   string(selector.Type),
+	}
+
+	if selector.Type != "" && selector.Type != TAG {
+		return subData
+	}
+
+	if selector.Expression == "" || selector.Expression == _SubAll {
+		subData.ExpType = string(TAG)
+		subData.SubString = _SubAll
+	} else {
+		subData.Tags = make(map[string]bool)
+		subData.Codes = make(map[int32]bool)
+		// strings.Split takes a literal separator, not a regular expression
+		tags := strings.Split(selector.Expression, "||")
+		for idx := range tags {
+			trimString := strings.Trim(tags[idx], " ")
+			if trimString != "" {
+				if !subData.Tags[trimString] {
+					subData.Tags[trimString] = true
+				}
+				hCode := utils.HashString(trimString)
+				if !subData.Codes[int32(hCode)] {
+					subData.Codes[int32(hCode)] = true
+				}
+			}
+		}
+	}
+	return subData
+}
+
+func getNextQueueOf(topic string) *kernel.MessageQueue {
+	queues, err := kernel.FetchSubscribeMessageQueues(topic)
+	if err != nil && len(queues) > 0 {
+		rlog.Error(err.Error())
+		return nil
+	}
+	var index int64
+	v, exist := queueCounterTable.Load(topic)
+	if !exist {
+		index = -1
+		queueCounterTable.Store(topic, 0)
+	} else {
+		index = v.(int64)
+	}
+
+	return queues[int(atomic.AddInt64(&index, 1))%len(queues)]
+}
+
+func buildSysFlag(commitOffset, suspend, subscription, classFilter bool) int32 {
+	var flag int32 = 0
+	if commitOffset {
+		flag |= 0x1 << 0
+	}
+
+	if suspend {
+		flag |= 0x1 << 1
+	}
+
+	if subscription {
+		flag |= 0x1 << 2
+	}
+
+	if classFilter {
+		flag |= 0x1 << 3
+	}
+
+	return flag
+}
+
+func clearCommitOffsetFlag(sysFlag int32) int32 {
+	return sysFlag & (^0x1 << 0)
+}
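
For reference, a small in-package sketch of the sysFlag bit layout that buildSysFlag and clearCommitOffsetFlag above operate on (illustrative only):

package consumer

import "fmt"

// bit 0: commit offset, bit 1: suspend, bit 2: subscription, bit 3: class filter
func sysFlagSketch() {
	flag := buildSysFlag(false, true, true, false)
	fmt.Println(flag)                        // 6 (0b0110): suspend + subscription, no commit offset, no class filter
	fmt.Println(clearCommitOffsetFlag(flag)) // still 6; bit 0 was already clear
}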
+
+func tryFindBroker(mq *kernel.MessageQueue) *kernel.FindBrokerResult {
+	result := kernel.FindBrokerAddressInSubscribe(mq.BrokerName, recalculatePullFromWhichNode(mq), false)
+
+	if result == nil {
+		kernel.UpdateTopicRouteInfo(mq.Topic)
+	}
+	return kernel.FindBrokerAddressInSubscribe(mq.BrokerName, recalculatePullFromWhichNode(mq), false)
+}
+
+var (
+	pullFromWhichNodeTable sync.Map
+)
+
+func updatePullFromWhichNode(mq *kernel.MessageQueue, brokerId int64) {
+	pullFromWhichNodeTable.Store(mq.HashCode(), brokerId)
+}
+
+func recalculatePullFromWhichNode(mq *kernel.MessageQueue) int64 {
+	v, exist := pullFromWhichNodeTable.Load(mq.HashCode())
+	if exist {
+		return v.(int64)
+	}
+	return kernel.MasterId
+}
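
Closing out this file, a test-style sketch of how buildSubscriptionData above interprets TAG expressions (in-package because the helper is unexported; the Tags map layout follows kernel.SubscriptionData as used in this file):

package consumer

import "testing"

func TestBuildSubscriptionDataSketch(t *testing.T) {
	// An empty (or "*") TAG expression subscribes to everything.
	all := buildSubscriptionData("TopicTest", MessageSelector{Type: TAG, Expression: ""})
	if all.SubString != "*" {
		t.Fatalf("want *, got %s", all.SubString)
	}

	// "tagA || tagB" is split on "||"; each trimmed tag is recorded together with
	// its hash code so the broker can filter on the consumer's behalf.
	data := buildSubscriptionData("TopicTest", MessageSelector{Type: TAG, Expression: "tagA || tagB"})
	if !data.Tags["tagA"] || !data.Tags["tagB"] {
		t.Fatalf("expected both tags to be recorded, got %v", data.Tags)
	}
}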
diff --git a/consumer/offset_store.go b/consumer/offset_store.go
new file mode 100644
index 0000000..a11280d
--- /dev/null
+++ b/consumer/offset_store.go
@@ -0,0 +1,55 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import "github.com/apache/rocketmq-client-go/kernel"
+
+type readType int
+
+const (
+	_ReadFromMemory readType = iota
+	_ReadFromStore
+	_ReadMemoryThenStore
+)
+
+type OffsetStore interface {
+	load()
+	persist(mqs []*kernel.MessageQueue)
+	remove(mq *kernel.MessageQueue)
+	read(mq *kernel.MessageQueue, t readType) int64
+	update(mq *kernel.MessageQueue, offset int64, increaseOnly bool)
+}
+
+type localFileOffsetStore struct {
+}
+
+func (local *localFileOffsetStore) load()                                                           {}
+func (local *localFileOffsetStore) persist(mqs []*kernel.MessageQueue)                              {}
+func (local *localFileOffsetStore) remove(mq *kernel.MessageQueue)                                  {}
+func (local *localFileOffsetStore) read(mq *kernel.MessageQueue, t readType) int64                  { return 0 }
+func (local *localFileOffsetStore) update(mq *kernel.MessageQueue, offset int64, increaseOnly bool) {}
+
+type remoteBrokerOffsetStore struct {
+}
+
+func (remote *remoteBrokerOffsetStore) load()                                          {}
+func (remote *remoteBrokerOffsetStore) persist(mqs []*kernel.MessageQueue)             {}
+func (remote *remoteBrokerOffsetStore) remove(mq *kernel.MessageQueue)                 {}
+func (remote *remoteBrokerOffsetStore) read(mq *kernel.MessageQueue, t readType) int64 { return 0 }
+func (remote *remoteBrokerOffsetStore) update(mq *kernel.MessageQueue, offset int64, increaseOnly bool) {
+}
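
Both stores above are still stubs in this commit. To make the OffsetStore contract concrete, here is a sketch of a purely in-memory implementation (illustrative, not part of the commit):

package consumer

import (
	"sync"

	"github.com/apache/rocketmq-client-go/kernel"
)

// memoryOffsetStore keeps offsets in a map keyed by the queue's hash code.
type memoryOffsetStore struct {
	mu      sync.RWMutex
	offsets map[int]int64
}

func newMemoryOffsetStore() *memoryOffsetStore {
	return &memoryOffsetStore{offsets: make(map[int]int64)}
}

func (s *memoryOffsetStore) load() {}

// persist is a no-op: there is no durable backend to flush to.
func (s *memoryOffsetStore) persist(mqs []*kernel.MessageQueue) {}

func (s *memoryOffsetStore) remove(mq *kernel.MessageQueue) {
	s.mu.Lock()
	delete(s.offsets, mq.HashCode())
	s.mu.Unlock()
}

func (s *memoryOffsetStore) read(mq *kernel.MessageQueue, t readType) int64 {
	s.mu.RLock()
	defer s.mu.RUnlock()
	if offset, ok := s.offsets[mq.HashCode()]; ok {
		return offset
	}
	return -1 // unknown: the caller falls back to its ConsumeFromWhere policy
}

func (s *memoryOffsetStore) update(mq *kernel.MessageQueue, offset int64, increaseOnly bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if old, ok := s.offsets[mq.HashCode()]; ok && increaseOnly && offset <= old {
		return
	}
	s.offsets[mq.HashCode()] = offset
}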
diff --git a/consumer/process_queue.go b/consumer/process_queue.go
new file mode 100644
index 0000000..0db0766
--- /dev/null
+++ b/consumer/process_queue.go
@@ -0,0 +1,100 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+	"container/list"
+	"github.com/apache/rocketmq-client-go/kernel"
+	"sync"
+	"time"
+)
+
+const (
+	_RebalanceLockMaxTime = 30 * time.Second
+	_RebalanceInterval    = 20 * time.Second
+	_PullMaxIdleTime      = 120 * time.Second
+)
+
+type ProcessQueue struct {
+	mutex                      sync.RWMutex
+	msgCache                   list.List // sorted
+	cachedMsgCount             int
+	cachedMsgSize              int64
+	consumeLock                sync.Mutex
+	consumingMsgOrderlyTreeMap sync.Map
+	tryUnlockTimes             int64
+	queueOffsetMax             int64
+	dropped                    bool
+	lastPullTime               time.Time
+	lastConsumeTime            time.Time
+	locked                     bool
+	lastLockTime               time.Time
+	consuming                  bool
+	msgAccCnt                  int64
+	once                       sync.Once
+}
+
+func (pq *ProcessQueue) isPullExpired() bool {
+	return false
+}
+
+func (pq *ProcessQueue) getMaxSpan() int {
+	return pq.msgCache.Len()
+}
+
+func (pq *ProcessQueue) putMessage(messages []*kernel.MessageExt) {
+	pq.once.Do(func() {
+		pq.msgCache.Init()
+	})
+	localList := list.New()
+	for idx := range messages {
+		localList.PushBack(messages[idx])
+	}
+	pq.mutex.Lock()
+	pq.msgCache.PushBackList(localList)
+	pq.mutex.Unlock()
+}
+
+func (pq *ProcessQueue) removeMessage(number int) int {
+	i := 0
+	pq.mutex.Lock()
+	for ; i < number && pq.msgCache.Len() > 0; i++ {
+		pq.msgCache.Remove(pq.msgCache.Front())
+	}
+	pq.mutex.Unlock()
+	return i
+}
+
+func (pq *ProcessQueue) takeMessages(number int) []*kernel.MessageExt {
+	for pq.msgCache.Len() == 0 {
+		time.Sleep(10 * time.Millisecond)
+	}
+	result := make([]*kernel.MessageExt, number)
+	i := 0
+	pq.mutex.Lock()
+	for ; i < number; i++ {
+		e := pq.msgCache.Front()
+		if e == nil {
+			break
+		}
+		result[i] = e.Value.(*kernel.MessageExt)
+		pq.msgCache.Remove(e)
+	}
+	pq.mutex.Unlock()
+	return result[:i]
+}
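
A short sketch of how the push consumer moves messages through a ProcessQueue: the pull loop caches batches at the back, the consume loop drains them from the front (illustrative, in-package):

package consumer

import (
	"fmt"

	"github.com/apache/rocketmq-client-go/kernel"
)

func processQueueSketch() {
	pq := &ProcessQueue{}

	// pull side: cache a freshly pulled batch
	pq.putMessage([]*kernel.MessageExt{{}, {}, {}})

	// orderly consume side: take up to 32 cached messages (blocks while the cache is empty)
	batch := pq.takeMessages(32)
	fmt.Println("consuming", len(batch), "messages")

	// the concurrent consume path would instead call pq.removeMessage(n) after processing n messages
}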
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
new file mode 100644
index 0000000..1b4f950
--- /dev/null
+++ b/consumer/pull_consumer.go
@@ -0,0 +1,198 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"github.com/apache/rocketmq-client-go/kernel"
+	"strconv"
+	"sync"
+)
+
+type MessageSelector struct {
+	Type       ExpressionType
+	Expression string
+}
+
+type PullConsumer interface {
+	Start()
+	Shutdown()
+	Pull(ctx context.Context, topic string, selector MessageSelector, numbers int) (*kernel.PullResult, error)
+}
+
+var (
+	queueCounterTable sync.Map
+)
+
+func NewConsumer(config ConsumerOption) *defaultPullConsumer {
+	return &defaultPullConsumer{
+		option: config,
+	}
+}
+
+type defaultPullConsumer struct {
+	state     kernel.ServiceState
+	option    ConsumerOption
+	client    *kernel.RMQClient
+	GroupName string
+	Model     MessageModel
+	UnitMode  bool
+}
+
+func (c *defaultPullConsumer) Start() {
+	c.state = kernel.StateRunning
+}
+
+func (c *defaultPullConsumer) Pull(ctx context.Context, topic string, selector MessageSelector, numbers int) (*kernel.PullResult, error) {
+	mq := getNextQueueOf(topic)
+	if mq == nil {
+		return nil, fmt.Errorf("prepared to pull topic: %s, but no queue was found", topic)
+	}
+
+	data := buildSubscriptionData(mq.Topic, selector)
+	result, err := c.pull(context.Background(), mq, data, c.nextOffsetOf(mq), numbers)
+
+	if err != nil {
+		return nil, err
+	}
+
+	processPullResult(mq, result, data)
+	return result, nil
+}
+
+// SubscribeWithChan ack manually
+func (c *defaultPullConsumer) SubscribeWithChan(topic string, selector MessageSelector) (chan *kernel.Message, error) {
+	return nil, nil
+}
+
+// SubscribeWithFunc ack automatic
+func (c *defaultPullConsumer) SubscribeWithFunc(topic string, selector MessageSelector,
+	f func(msg *kernel.Message) ConsumeResult) error {
+	return nil
+}
+
+func (c *defaultPullConsumer) ACK(msg *kernel.Message, result ConsumeResult) {
+
+}
+
+func (c *defaultPullConsumer) pull(ctx context.Context, mq *kernel.MessageQueue, data *kernel.SubscriptionData,
+	offset int64, numbers int) (*kernel.PullResult, error) {
+	err := c.makeSureStateOK()
+	if err != nil {
+		return nil, err
+	}
+
+	if mq == nil {
+		return nil, errors.New("MessageQueue is nil")
+	}
+
+	if offset < 0 {
+		return nil, errors.New("offset < 0")
+	}
+
+	if numbers <= 0 {
+		numbers = 1
+	}
+	c.subscriptionAutomatically(mq.Topic)
+
+	brokerResult := tryFindBroker(mq)
+	if brokerResult == nil {
+		return nil, fmt.Errorf("the broker %s does not exist", mq.BrokerName)
+	}
+
+	if (data.ExpType == string(TAG)) && brokerResult.BrokerVersion < kernel.V4_1_0 {
+		return nil, fmt.Errorf("the broker [%s, %v] has not been upgraded to support filtering messages by %v",
+			mq.BrokerName, brokerResult.BrokerVersion, data.ExpType)
+	}
+
+	sysFlag := buildSysFlag(false, true, true, false)
+
+	if brokerResult.Slave {
+		sysFlag = clearCommitOffsetFlag(sysFlag)
+	}
+	pullRequest := &kernel.PullMessageRequest{
+		ConsumerGroup:        c.GroupName,
+		Topic:                mq.Topic,
+		QueueId:              int32(mq.QueueId),
+		QueueOffset:          offset,
+		MaxMsgNums:           int32(numbers),
+		SysFlag:              sysFlag,
+		CommitOffset:         0,
+		SuspendTimeoutMillis: _BrokerSuspendMaxTime,
+		SubExpression:        data.SubString,
+		ExpressionType:       string(data.ExpType),
+	}
+
+	if data.ExpType == string(TAG) {
+		pullRequest.SubVersion = 0
+	} else {
+		pullRequest.SubVersion = data.SubVersion
+	}
+
+	// TODO computePullFromWhichFilterServer
+	return c.client.PullMessage(ctx, brokerResult.BrokerAddr, pullRequest)
+}
+
+func (c *defaultPullConsumer) makeSureStateOK() error {
+	if c.state != kernel.StateRunning {
+		return fmt.Errorf("the consumer state is [%d], not running", c.state)
+	}
+	return nil
+}
+
+func (c *defaultPullConsumer) subscriptionAutomatically(topic string) {
+	// TODO
+}
+
+func (c *defaultPullConsumer) nextOffsetOf(queue *kernel.MessageQueue) int64 {
+	return 0
+}
+
+func processPullResult(mq *kernel.MessageQueue, result *kernel.PullResult, data *kernel.SubscriptionData) {
+	updatePullFromWhichNode(mq, result.SuggestWhichBrokerId)
+	switch result.Status {
+	case kernel.PullFound:
+		msgs := result.GetMessageExts()
+		msgListFilterAgain := msgs
+		if len(data.Tags) > 0 && data.ClassFilterMode {
+			msgListFilterAgain = make([]*kernel.MessageExt, len(msgs))
+			for _, msg := range msgs {
+				_, exist := data.Tags[msg.GetTags()]
+				if exist {
+					msgListFilterAgain = append(msgListFilterAgain, msg)
+				}
+			}
+		}
+
+		// TODO hook
+
+		for _, msg := range msgListFilterAgain {
+			traFlag, _ := strconv.ParseBool(msg.Properties[kernel.PropertyTransactionPrepared])
+			if traFlag {
+				msg.TransactionId = msg.Properties[kernel.PropertyUniqueClientMessageIdKeyIndex]
+			}
+
+			msg.Properties[kernel.PropertyMinOffset] = strconv.FormatInt(result.MinOffset, 10)
+			msg.Properties[kernel.PropertyMaxOffset] = strconv.FormatInt(result.MaxOffset, 10)
+		}
+
+		result.SetMessageExts(msgListFilterAgain)
+	}
+}
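
And the pull-consumer counterpart: a sketch of the intended call shape for Pull. NewConsumer does not yet wire up the kernel client in this commit, and the NameServerAddr field is an assumption, so treat this as an illustration of the API rather than a runnable example against this revision.

package main

import (
	"context"
	"fmt"

	"github.com/apache/rocketmq-client-go/consumer"
	"github.com/apache/rocketmq-client-go/kernel"
)

func main() {
	c := consumer.NewConsumer(consumer.ConsumerOption{
		ClientOption: kernel.ClientOption{NameServerAddr: "127.0.0.1:9876"}, // assumed field name
	})
	c.GroupName = "testGroup"
	c.Start()

	// pull a batch of up to 32 messages for the given topic and tag expression
	result, err := c.Pull(context.Background(), "TopicTest",
		consumer.MessageSelector{Type: consumer.TAG, Expression: "*"}, 32)
	if err != nil {
		fmt.Println("pull error:", err)
		return
	}
	fmt.Println("status:", result.Status, "messages:", len(result.GetMessageExts()))
}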
diff --git a/consumer/pull_consumer_test.go b/consumer/pull_consumer_test.go
new file mode 100644
index 0000000..f7bc454
--- /dev/null
+++ b/consumer/pull_consumer_test.go
@@ -0,0 +1,18 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
new file mode 100644
index 0000000..58f4c68
--- /dev/null
+++ b/consumer/push_consumer.go
@@ -0,0 +1,606 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"github.com/apache/rocketmq-client-go/kernel"
+	"github.com/apache/rocketmq-client-go/rlog"
+	"math"
+	"strconv"
+	"time"
+)
+
+// In most scenarios, the push consumer is the recommended way to consume messages.
+//
+// Technically speaking, this push client is virtually a wrapper of the underlying pull service. Specifically, on
+// arrival of messages pulled from brokers, it invokes the registered callback handler to feed the messages.
+//
+// See quick start/Consumer in the examples module for typical usage.
+//
+// Thread Safety: after initialization, the instance can be regarded as thread-safe.
+type ConsumeResult int
+
+const (
+	Mb                           = 1024 * 1024
+	ConsumeSuccess ConsumeResult = iota
+	ConsumeRetryLater
+)
+
+type PushConsumer interface {
+	Start() error
+	Shutdown()
+	Subscribe(topic string, selector MessageSelector,
+		f func(*ConsumeMessageContext, []*kernel.MessageExt) (ConsumeResult, error)) error
+}
+
+type pushConsumer struct {
+	*defaultConsumer
+	// Backtracking consumption time with second precision. The time format is
+	// 20131223171201, meaning 17:12:01 on December 23, 2013.
+	// The default backtracking consumption time is half an hour ago.
+	ConsumeTimestamp             time.Duration
+	queueFlowControlTimes        int
+	queueMaxSpanFlowControlTimes int
+	consume                      func(*ConsumeMessageContext, []*kernel.MessageExt) (ConsumeResult, error)
+	submitToConsume              func(*ProcessQueue, *kernel.MessageQueue)
+	subscribedTopic              map[string]string
+}
+
+func NewPushConsumer(consumerGroup string, opt ConsumerOption) PushConsumer {
+	dc := &defaultConsumer{
+		consumerGroup:  consumerGroup,
+		cType:          _PushConsume,
+		state:          kernel.StateCreateJust,
+		prCh:           make(chan PullRequest, 4),
+		model:          opt.ConsumerModel,
+		consumeOrderly: opt.ConsumeOrderly,
+		fromWhere:      opt.FromWhere,
+		option:         opt,
+	}
+
+	switch opt.Strategy {
+	case StrategyAveragely:
+		dc.allocate = allocateByAveragely
+	case StrategyAveragelyCircle:
+		dc.allocate = allocateByAveragelyCircle
+	case StrategyConfig:
+		dc.allocate = allocateByConfig
+	case StrategyConsistentHash:
+		dc.allocate = allocateByConsistentHash
+	case StrategyMachineNearby:
+		dc.allocate = allocateByMachineNearby
+	case StrategyMachineRoom:
+		dc.allocate = allocateByMachineRoom
+	default:
+		dc.allocate = allocateByAveragely
+	}
+
+	p := &pushConsumer{
+		defaultConsumer:  dc,
+		ConsumeTimestamp: 30 * time.Minute,
+		subscribedTopic:  make(map[string]string, 0),
+	}
+	dc.mqChanged = p.messageQueueChanged
+	if p.consumeOrderly {
+		p.submitToConsume = p.consumeMessageOrderly
+	} else {
+		p.submitToConsume = p.consumeMessageCurrently
+	}
+	return p
+}
+
+func (pc *pushConsumer) Start() error {
+	var err error
+	pc.once.Do(func() {
+		rlog.Infof("the consumerGroup=%s start beginning. messageModel=%v, unitMode=%v",
+			pc.consumerGroup, pc.model, pc.unitMode)
+		pc.state = kernel.StateStartFailed
+		pc.validate()
+
+		// set retry topic
+		if pc.model == Clustering {
+			retryTopic := kernel.GetRetryTopic(pc.consumerGroup)
+			pc.subscriptionDataTable.Store(retryTopic, buildSubscriptionData(retryTopic,
+				MessageSelector{TAG, _SubAll}))
+		}
+
+		pc.client = kernel.GetOrNewRocketMQClient(pc.option.ClientOption)
+		if pc.model == Clustering {
+			pc.option.ChangeInstanceNameToPID()
+			pc.storage = &remoteBrokerOffsetStore{}
+		} else {
+			pc.storage = &localFileOffsetStore{}
+		}
+		pc.storage.load()
+		go func() {
+			// todo start clean msg expired
+			// TODO quit
+			for {
+				pr := <-pc.prCh
+				go func() {
+					fmt.Println(pr.String())
+					pc.pullMessage(&pr)
+				}()
+			}
+		}()
+
+		err = pc.client.RegisterConsumer(pc.consumerGroup, pc)
+		if err != nil {
+			pc.state = kernel.StateCreateJust
+			rlog.Errorf("the consumer group: [%s] has been created, specify another name.", pc.consumerGroup)
+			err = errors.New("consumer group has been created")
+			return
+		}
+		pc.client.UpdateTopicRouteInfo()
+		pc.client.RebalanceImmediately()
+		pc.client.Start()
+		pc.state = kernel.StateRunning
+	})
+
+	pc.client.UpdateTopicRouteInfo()
+	pc.client.RebalanceImmediately()
+	pc.client.CheckClientInBroker()
+	pc.client.SendHeartbeatToAllBrokerWithLock()
+	return err
+}
+
+func (pc *pushConsumer) Shutdown() {}
+
+func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
+	f func(*ConsumeMessageContext, []*kernel.MessageExt) (ConsumeResult, error)) error {
+	if pc.state != kernel.StateCreateJust {
+		return errors.New("subscribe must be called before the consumer is started")
+	}
+	data := buildSubscriptionData(topic, selector)
+	pc.subscriptionDataTable.Store(topic, data)
+	pc.subscribedTopic[topic] = ""
+	pc.consume = f
+	return nil
+}
+
+func (pc *pushConsumer) Rebalance() {
+	pc.defaultConsumer.doBalance()
+}
+
+func (pc *pushConsumer) PersistConsumerOffset() {
+	pc.defaultConsumer.persistConsumerOffset()
+}
+
+func (pc *pushConsumer) UpdateTopicSubscribeInfo(topic string, mqs []*kernel.MessageQueue) {
+	pc.defaultConsumer.updateTopicSubscribeInfo(topic, mqs)
+}
+
+func (pc *pushConsumer) IsSubscribeTopicNeedUpdate(topic string) bool {
+	return pc.defaultConsumer.isSubscribeTopicNeedUpdate(topic)
+}
+
+func (pc *pushConsumer) SubscriptionDataList() []*kernel.SubscriptionData {
+	return pc.defaultConsumer.SubscriptionDataList()
+}
+
+func (pc *pushConsumer) IsUnitMode() bool {
+	return pc.unitMode
+}
+
+func (pc *pushConsumer) messageQueueChanged(topic string, mqAll, mqDivided []*kernel.MessageQueue) {
+	// TODO
+}
+
+func (pc *pushConsumer) validate() {
+	kernel.ValidateGroup(pc.consumerGroup)
+
+	if pc.consumerGroup == kernel.DefaultConsumerGroup {
+		// TODO FAQ
+		rlog.Fatalf("consumerGroup can't equal [%s], please specify another one.", kernel.DefaultConsumerGroup)
+	}
+
+	if len(pc.subscribedTopic) == 0 {
+		rlog.Fatal("number of subscribed topics is 0.")
+	}
+
+	if pc.option.ConsumeConcurrentlyMaxSpan < 1 || pc.option.ConsumeConcurrentlyMaxSpan > 65535 {
+		if pc.option.ConsumeConcurrentlyMaxSpan == 0 {
+			pc.option.ConsumeConcurrentlyMaxSpan = 1000
+		} else {
+			rlog.Fatal("option.ConsumeConcurrentlyMaxSpan out of range [1, 65535]")
+		}
+	}
+
+	if pc.option.PullThresholdForQueue < 1 || pc.option.PullThresholdForQueue > 65535 {
+		if pc.option.PullThresholdForQueue == 0 {
+			pc.option.PullThresholdForQueue = 1024
+		} else {
+			rlog.Fatal("option.PullThresholdForQueue out of range [1, 65535]")
+		}
+	}
+
+	if pc.option.PullThresholdForTopic < 1 || pc.option.PullThresholdForTopic > 6553500 {
+		if pc.option.PullThresholdForTopic == 0 {
+			pc.option.PullThresholdForTopic = 102400
+		} else {
+			rlog.Fatal("option.PullThresholdForTopic out of range [1, 6553500]")
+		}
+	}
+
+	if pc.option.PullThresholdSizeForQueue < 1 || pc.option.PullThresholdSizeForQueue > 1024 {
+		if pc.option.PullThresholdSizeForQueue == 0 {
+			pc.option.PullThresholdSizeForQueue = 512
+		} else {
+			rlog.Fatal("option.PullThresholdSizeForQueue out of range [1, 1024]")
+		}
+	}
+
+	if pc.option.PullThresholdSizeForTopic < 1 || pc.option.PullThresholdSizeForTopic > 102400 {
+		if pc.option.PullThresholdSizeForTopic == 0 {
+			pc.option.PullThresholdSizeForTopic = 51200
+		} else {
+			rlog.Fatal("option.PullThresholdSizeForTopic out of range [1, 102400]")
+		}
+	}
+
+	if pc.option.PullInterval < 0 || pc.option.PullInterval > 65535 {
+		rlog.Fatal("option.PullInterval out of range [0, 65535]")
+	}
+
+	if pc.option.ConsumeMessageBatchMaxSize < 1 || pc.option.ConsumeMessageBatchMaxSize > 1024 {
+		if pc.option.ConsumeMessageBatchMaxSize == 0 {
+			pc.option.ConsumeMessageBatchMaxSize = 512
+		} else {
+			rlog.Fatal("option.ConsumeMessageBatchMaxSize out of range [1, 1024]")
+		}
+	}
+
+	if pc.option.PullBatchSize < 1 || pc.option.PullBatchSize > 1024 {
+		if pc.option.PullBatchSize == 0 {
+			pc.option.PullBatchSize = 1
+		} else {
+			rlog.Fatal("option.PullBatchSize out of range [1, 1024]")
+		}
+	}
+}
+
+func (pc *pushConsumer) pullMessage(request *PullRequest) {
+	rlog.Infof("start a new Pull Message task %s for [%s]", request.String(), pc.consumerGroup)
+	var sleepTime time.Duration
+	pq := request.pq
+	go func() {
+		for {
+			pc.submitToConsume(request.pq, request.mq)
+		}
+	}()
+	for {
+	NEXT:
+		if pq.dropped {
+			rlog.Infof("the request: [%s] was dropped, so stop task", request.String())
+			return
+		}
+		if sleepTime > 0 {
+			rlog.Infof("pull MessageQueue: %d sleep %d ms", request.mq.QueueId, sleepTime/time.Millisecond)
+			time.Sleep(sleepTime)
+		}
+		// reset time
+		sleepTime = pc.option.PullInterval
+		pq.lastPullTime = time.Now()
+		err := pc.makeSureStateOK()
+		rlog.Debugf("pull MessageQueue: %d", request.mq.QueueId)
+		if err != nil {
+			rlog.Warnf("consumer state error: %s", err.Error())
+			sleepTime = _PullDelayTimeWhenError
+			goto NEXT
+		}
+
+		if pc.pause {
+			rlog.Infof("consumer [%s] of [%s] was paused, execute pull request [%s] later",
+				pc.option.InstanceName, pc.consumerGroup, request.String())
+			sleepTime = _PullDelayTimeWhenSuspend
+			goto NEXT
+		}
+
+		cachedMessageSizeInMiB := int(pq.cachedMsgSize / Mb)
+		if pq.cachedMsgCount > pc.option.PullThresholdForQueue {
+			if pc.queueFlowControlTimes%1000 == 0 {
+				rlog.Warnf("the cached message count exceeds the threshold %d, so do flow control, "+
+					"minOffset=%d, maxOffset=%d, count=%d, size=%d MiB, pullRequest=%s, flowControlTimes=%d",
+					pc.option.PullThresholdForQueue, pq.msgCache.Front().Value.(int64),
+					pq.msgCache.Back().Value.(int64),
+					pq.cachedMsgCount, cachedMessageSizeInMiB, request.String(), pc.queueFlowControlTimes)
+			}
+			pc.queueFlowControlTimes++
+			sleepTime = _PullDelayTimeWhenFlowControl
+			goto NEXT
+		}
+
+		if cachedMessageSizeInMiB > pc.option.PullThresholdSizeForQueue {
+			if pc.queueFlowControlTimes%1000 == 0 {
+				rlog.Warnf("the cached message size exceeds the threshold %d MiB, so do flow control, "+
+					"minOffset=%d, maxOffset=%d, count=%d, size=%d MiB, pullRequest=%s, flowControlTimes=%d",
+					pc.option.PullThresholdSizeForQueue, 0, //processQueue.getMsgTreeMap().firstKey(),
+					0, // TODO processQueue.getMsgTreeMap().lastKey(),
+					pq.cachedMsgCount, cachedMessageSizeInMiB, request.String(), pc.queueFlowControlTimes)
+			}
+			pc.queueFlowControlTimes++
+			sleepTime = _PullDelayTimeWhenFlowControl
+			goto NEXT
+		}
+
+		if !pc.consumeOrderly {
+			if pq.getMaxSpan() > pc.option.ConsumeConcurrentlyMaxSpan {
+
+				if pc.queueMaxSpanFlowControlTimes%1000 == 0 {
+					rlog.Warnf("the queue's messages, span too long, so do flow control, minOffset=%d, "+
+						"maxOffset=%d, maxSpan=%d, pullRequest=%s, flowControlTimes=%d",
+						0, //processQueue.getMsgTreeMap().firstKey(),
+						0, // processQueue.getMsgTreeMap().lastKey(),
+						pq.getMaxSpan(),
+						request.String(), pc.queueMaxSpanFlowControlTimes)
+				}
+				pc.queueMaxSpanFlowControlTimes++
+				sleepTime = _PullDelayTimeWhenFlowControl
+				goto NEXT
+			}
+		} else {
+			if pq.locked {
+				if !request.lockedFirst {
+					offset := pc.computePullFromWhere(request.mq)
+					brokerBusy := offset < request.nextOffset
+					rlog.Infof("the first time to pull message, so fix offset from broker. "+
+						"pullRequest: [%s] NewOffset: %d brokerBusy: %v",
+						request.String(), offset, brokerBusy)
+					if brokerBusy {
+						rlog.Infof("[NOTIFY_ME]the first time to pull message, but pull request offset"+
+							" larger than broker consume offset. pullRequest: [%s] NewOffset: %d",
+							request.String(), offset)
+					}
+
+					request.lockedFirst = true
+					request.nextOffset = offset
+				}
+			} else {
+				rlog.Infof("pull message later because not locked in broker, [%s]", request.String())
+				sleepTime = _PullDelayTimeWhenError
+				goto NEXT
+			}
+		}
+
+		v, exist := pc.subscriptionDataTable.Load(request.mq.Topic)
+		if !exist {
+			rlog.Warnf("failed to find the consumer's subscription, %s", request.String())
+			sleepTime = _PullDelayTimeWhenError
+			goto NEXT
+		}
+		beginTime := time.Now()
+		var (
+			commitOffsetEnable bool
+			commitOffsetValue  int64
+			subExpression      string
+		)
+
+		if pc.model == Clustering {
+			commitOffsetValue = pc.storage.read(request.mq, _ReadFromMemory)
+			if commitOffsetValue > 0 {
+				commitOffsetEnable = true
+			}
+		}
+
+		sd := v.(*kernel.SubscriptionData)
+		classFilter := sd.ClassFilterMode
+		if pc.option.PostSubscriptionWhenPull && classFilter {
+			subExpression = sd.SubString
+		}
+
+		sysFlag := buildSysFlag(commitOffsetEnable, true, subExpression != "", classFilter)
+
+		pullRequest := &kernel.PullMessageRequest{
+			ConsumerGroup:  pc.consumerGroup,
+			Topic:          request.mq.Topic,
+			QueueId:        int32(request.mq.QueueId),
+			QueueOffset:    request.nextOffset,
+			MaxMsgNums:     pc.option.PullBatchSize,
+			SysFlag:        sysFlag,
+			CommitOffset:   commitOffsetValue,
+			SubExpression:  _SubAll,
+			ExpressionType: string(TAG), // TODO
+		}
+		//
+		//if data.ExpType == string(TAG) {
+		//	pullRequest.SubVersion = 0
+		//} else {
+		//	pullRequest.SubVersion = data.SubVersion
+		//}
+
+		//ch := make(chan *kernel.PullResult)
+		brokerResult := tryFindBroker(request.mq)
+		if brokerResult == nil {
+			rlog.Warnf("no broker found for %s", request.mq.String())
+			sleepTime = _PullDelayTimeWhenError
+			goto NEXT
+		}
+		result, err := pc.client.PullMessage(context.Background(), brokerResult.BrokerAddr, pullRequest)
+		if err != nil {
+			rlog.Warnf("pull message from %s error: %s", brokerResult.BrokerAddr, err.Error())
+			sleepTime = _PullDelayTimeWhenError
+			goto NEXT
+		}
+
+		if result.Status == kernel.PullBrokerTimeout {
+			rlog.Warnf("pull broker: %s timeout", brokerResult.BrokerAddr)
+			sleepTime = _PullDelayTimeWhenError
+			goto NEXT
+		}
+
+		switch result.Status {
+		case kernel.PullFound:
+			rlog.Debugf("Topic: %s, QueueId: %d found messages: %d", request.mq.Topic, request.mq.QueueId,
+				len(result.GetMessageExts()))
+			prevRequestOffset := request.nextOffset
+			request.nextOffset = result.NextBeginOffset
+
+			rt := time.Now().Sub(beginTime)
+			increasePullRT(pc.consumerGroup, request.mq.Topic, rt)
+
+			msgFound := result.GetMessageExts()
+			firstMsgOffset := int64(math.MaxInt64)
+			if len(msgFound) != 0 {
+				firstMsgOffset = msgFound[0].QueueOffset
+				increasePullTPS(pc.consumerGroup, request.mq.Topic, len(msgFound))
+				pq.putMessage(msgFound)
+			}
+			if result.NextBeginOffset < prevRequestOffset || firstMsgOffset < prevRequestOffset {
+				rlog.Warnf("[BUG] pull message result maybe data wrong, [nextBeginOffset=%d, "+
+					"firstMsgOffset=%d, prevRequestOffset=%d]", result.NextBeginOffset, firstMsgOffset, prevRequestOffset)
+			}
+		case kernel.PullNoNewMsg:
+			rlog.Infof("Topic: %s, QueueId: %d, no more msg", request.mq.Topic, request.mq.QueueId)
+		case kernel.PullNoMsgMatched:
+			request.nextOffset = result.NextBeginOffset
+			pc.correctTagsOffset(request)
+		case kernel.PullOffsetIllegal:
+			rlog.Warnf("the pull request offset illegal, %s %s", request.String(), result.String())
+			request.nextOffset = result.NextBeginOffset
+			pq.dropped = true
+			go func() {
+				time.Sleep(10 * time.Second)
+				pc.storage.update(request.mq, request.nextOffset, false)
+				pc.storage.persist([]*kernel.MessageQueue{request.mq})
+				pc.storage.remove(request.mq)
+				rlog.Warnf("fix the pull request offset: %s", request.String())
+			}()
+		default:
+			rlog.Warnf("unknown pull status: %v, pullRequest: %s", result.Status, request.String())
+			sleepTime = _PullDelayTimeWhenError
+		}
+	}
+}
+
+func (pc *pushConsumer) correctTagsOffset(pr *PullRequest) {
+	// TODO
+}
+
+func (pc *pushConsumer) sendMessageBack(ctx *ConsumeMessageContext, msg *kernel.MessageExt) bool {
+	return true
+}
+
+type ConsumeMessageContext struct {
+	consumerGroup string
+	msgs          []*kernel.MessageExt
+	mq            *kernel.MessageQueue
+	success       bool
+	status        string
+	// mqTractContext
+	properties map[string]string
+}
+
+func (pc *pushConsumer) consumeMessageCurrently(pq *ProcessQueue, mq *kernel.MessageQueue) {
+	msgs := pq.takeMessages(32)
+	if msgs == nil {
+		return
+	}
+	for count := 0; count < len(msgs); {
+		var subMsgs []*kernel.MessageExt
+		if count+pc.option.ConsumeMessageBatchMaxSize > len(msgs) {
+			subMsgs = msgs[count:]
+			count = len(msgs)
+		} else {
+			next := count + pc.option.ConsumeMessageBatchMaxSize
+			subMsgs = msgs[count:next]
+			count = next
+		}
+		go func() {
+		RETRY:
+			if pq.dropped {
+				rlog.Infof("the message queue can not be consumed, because it was dropped. group=%s, mq=%s",
+					pc.consumerGroup, mq.String())
+				return
+			}
+
+			ctx := &ConsumeMessageContext{
+				properties: make(map[string]string),
+			}
+			// TODO hook
+			beginTime := time.Now()
+			groupTopic := kernel.RetryGroupTopicPrefix + pc.consumerGroup
+			for idx := range subMsgs {
+				msg := subMsgs[idx]
+				retryTopic := msg.Properties[kernel.PropertyRetryTopic]
+				if retryTopic != "" && groupTopic == msg.Topic {
+					msg.Topic = retryTopic
+				}
+				subMsgs[idx].Properties[kernel.PropertyConsumeStartTime] = strconv.FormatInt(
+					beginTime.UnixNano()/int64(time.Millisecond), 10)
+			}
+			result, err := pc.consume(ctx, subMsgs)
+			consumeRT := time.Now().Sub(beginTime)
+			if err != nil {
+				ctx.properties["ConsumeContextType"] = "EXCEPTION"
+			} else if consumeRT >= pc.option.ConsumeTimeout {
+				ctx.properties["ConsumeContextType"] = "TIMEOUT"
+			} else if result == ConsumeSuccess {
+				ctx.properties["ConsumeContextType"] = "SUCCESS"
+			} else {
+				ctx.properties["ConsumeContextType"] = "RECONSUME_LATER"
+			}
+
+			// TODO hook
+			increaseConsumeRT(pc.consumerGroup, mq.Topic, consumeRT)
+
+			if !pq.dropped {
+				msgBackFailed := make([]*kernel.MessageExt, 0)
+				if result == ConsumeSuccess {
+					increaseConsumeOKTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
+				} else {
+					increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
+					if pc.model == BroadCasting {
+						for i := 0; i < len(subMsgs); i++ {
+							rlog.Warnf("BROADCASTING, the message=%s consume failed, drop it", subMsgs[i])
+						}
+					} else {
+						for i := 0; i < len(subMsgs); i++ {
+							msg := subMsgs[i]
+							if !pc.sendMessageBack(ctx, msg) {
+								msg.ReconsumeTimes += 1
+								msgBackFailed = append(msgBackFailed, msg)
+							}
+						}
+					}
+				}
+
+				offset := pq.removeMessage(len(subMsgs))
+
+				if offset >= 0 && !pq.dropped {
+					pc.storage.update(mq, int64(offset), true)
+				}
+				if len(msgBackFailed) > 0 {
+					subMsgs = msgBackFailed
+					time.Sleep(5 * time.Second)
+					goto RETRY
+				}
+			} else {
+				rlog.Warnf("processQueue is dropped without process consume result. messageQueue=%s, msgs=%+v",
+					mq, msgs)
+			}
+		}()
+	}
+}
+
+func (pc *pushConsumer) consumeMessageOrderly(pq *ProcessQueue, mq *kernel.MessageQueue) {
+}
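A note on the batch splitting in consumeMessageCurrently above: the loop hands the fetched messages to the user callback in slices of at most option.ConsumeMessageBatchMaxSize. The following is a minimal standalone sketch of the same partitioning, for illustration only and not part of this patch (splitIntoBatches is a made-up name):

package main

import "fmt"

// splitIntoBatches mirrors the slicing in consumeMessageCurrently: it walks the
// input once and yields sub-slices of at most batchSize elements each.
func splitIntoBatches(msgs []string, batchSize int) [][]string {
	var batches [][]string
	for count := 0; count < len(msgs); {
		next := count + batchSize
		if next > len(msgs) {
			next = len(msgs)
		}
		batches = append(batches, msgs[count:next])
		count = next
	}
	return batches
}

func main() {
	msgs := []string{"m0", "m1", "m2", "m3", "m4"}
	for _, b := range splitIntoBatches(msgs, 2) {
		fmt.Println(b)
	}
	// Output:
	// [m0 m1]
	// [m2 m3]
	// [m4]
}

With a batch size of 2 and five messages, the callback is invoked three times, which is the grouping the goroutines above would receive.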
diff --git a/consumer/push_consumer_test.go b/consumer/push_consumer_test.go
new file mode 100644
index 0000000..f7bc454
--- /dev/null
+++ b/consumer/push_consumer_test.go
@@ -0,0 +1,18 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
diff --git a/consumer/statistics.go b/consumer/statistics.go
new file mode 100644
index 0000000..29045a0
--- /dev/null
+++ b/consumer/statistics.go
@@ -0,0 +1,60 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import "time"
+
+var (
+	topicAndGroupConsumeOKTPS     = &statsItemSet{statsName: "CONSUME_OK_TPS"}
+	topicAndGroupConsumeRT        = &statsItemSet{statsName: "CONSUME_RT"}
+	topicAndGroupConsumeFailedTPS = &statsItemSet{statsName: "CONSUME_FAILED_TPS"}
+	topicAndGroupPullTPS          = &statsItemSet{statsName: "PULL_TPS"}
+	topicAndGroupPullRT           = &statsItemSet{statsName: "PULL_RT"}
+)
+
+type statsItem struct {
+}
+
+type statsItemSet struct {
+	statsName      string
+	statsItemTable map[string]statsItem
+}
+
+func (set *statsItemSet) addValue(key string, incValue, incTimes int) {
+
+}
+
+func increasePullRT(group, topic string, rt time.Duration) {
+
+}
+
+func increaseConsumeRT(group, topic string, rt time.Duration) {
+
+}
+
+func increasePullTPS(group, topic string, msgNumber int) {
+
+}
+
+func increaseConsumeOKTPS(group, topic string, msgNumber int) {
+
+}
+
+func increaseConsumeFailedTPS(group, topic string, msgNumber int) {
+
+}
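The statistics helpers above are left as empty stubs in this commit. As a rough picture of one direction they could take, here is a minimal thread-safe accumulator; every name below is hypothetical and nothing in it is taken from the patch:

package main

import (
	"fmt"
	"sync"
)

// counter is a hypothetical stand-in for statsItem: it only tracks an
// accumulated value and how many times it was incremented.
type counter struct {
	mu    sync.Mutex
	value int64
	times int64
}

// counterSet is a hypothetical stand-in for statsItemSet, keyed the way the
// helper functions above are parameterized (group and topic).
type counterSet struct {
	name  string
	items sync.Map // key -> *counter
}

func (s *counterSet) addValue(key string, incValue, incTimes int64) {
	v, _ := s.items.LoadOrStore(key, &counter{})
	c := v.(*counter)
	c.mu.Lock()
	c.value += incValue
	c.times += incTimes
	c.mu.Unlock()
}

func main() {
	set := &counterSet{name: "PULL_TPS"}
	set.addValue("testGroup@testTopic", 32, 1)
	set.addValue("testGroup@testTopic", 16, 1)
	v, _ := set.items.Load("testGroup@testTopic")
	c := v.(*counter)
	fmt.Println(c.value, c.times) // 48 2
}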
diff --git a/consumer/strategy.go b/consumer/strategy.go
new file mode 100644
index 0000000..1cb6b2b
--- /dev/null
+++ b/consumer/strategy.go
@@ -0,0 +1,130 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+	"github.com/apache/rocketmq-client-go/kernel"
+	"github.com/apache/rocketmq-client-go/rlog"
+	"github.com/apache/rocketmq-client-go/utils"
+)
+
+// AllocateStrategy is the algorithm used to allocate message queues among consumers.
+type AllocateStrategy string
+
+const (
+	// An allocation strategy proxy based on machine-room nearside priority. An actual allocation strategy can be
+	// specified.
+	//
+	// If any consumer is alive in a machine room, the message queues of the broker deployed in the same machine room
+	// should only be allocated to those consumers. Otherwise, the message queues can be shared among all consumers,
+	// since there is no alive consumer to monopolize them.
+	StrategyMachineNearby = AllocateStrategy("MachineNearby")
+
+	// Average Hashing queue algorithm
+	StrategyAveragely = AllocateStrategy("Averagely")
+
+	// Cycle average Hashing queue algorithm
+	StrategyAveragelyCircle = AllocateStrategy("AveragelyCircle")
+
+	// Use Message Queue specified
+	StrategyConfig = AllocateStrategy("Config")
+
+	// Computer room Hashing queue algorithm, such as Alipay logic room
+	StrategyMachineRoom = AllocateStrategy("MachineRoom")
+
+	// Consistent Hashing queue algorithm
+	StrategyConsistentHash = AllocateStrategy("ConsistentHash")
+)
+
+func allocateByAveragely(consumerGroup, currentCID string, mqAll []*kernel.MessageQueue,
+	cidAll []string) []*kernel.MessageQueue {
+	if currentCID == "" || utils.IsArrayEmpty(mqAll) || utils.IsArrayEmpty(cidAll) {
+		return nil
+	}
+	var (
+		find  bool
+		index int
+	)
+
+	for idx := range cidAll {
+		if cidAll[idx] == currentCID {
+			find = true
+			index = idx
+			break
+		}
+	}
+	if !find {
+		rlog.Infof("[BUG] ConsumerGroup=%s, ConsumerId=%s not in cidAll:%+v", consumerGroup, currentCID, cidAll)
+		return nil
+	}
+
+	mqSize := len(mqAll)
+	cidSize := len(cidAll)
+	mod := mqSize % cidSize
+
+	var averageSize int
+	if mqSize <= cidSize {
+		averageSize = 1
+	} else {
+		if mod > 0 && index < mod {
+			averageSize = mqSize/cidSize + 1
+		} else {
+			averageSize = mqSize / cidSize
+		}
+	}
+
+	var startIndex int
+	if mod > 0 && index < mod {
+		startIndex = index * averageSize
+	} else {
+		startIndex = index*averageSize + mod
+	}
+
+	num := utils.MinInt(averageSize, mqSize-startIndex)
+	result := make([]*kernel.MessageQueue, num)
+	for i := 0; i < num; i++ {
+		result[i] = mqAll[(startIndex+i)%mqSize]
+	}
+	return result
+}
+
+// TODO
+func allocateByMachineNearby(consumerGroup, currentCID string, mqAll []*kernel.MessageQueue,
+	cidAll []string) []*kernel.MessageQueue {
+	return allocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
+}
+
+func allocateByAveragelyCircle(consumerGroup, currentCID string, mqAll []*kernel.MessageQueue,
+	cidAll []string) []*kernel.MessageQueue {
+	return allocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
+}
+
+func allocateByConfig(consumerGroup, currentCID string, mqAll []*kernel.MessageQueue,
+	cidAll []string) []*kernel.MessageQueue {
+	return allocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
+}
+
+func allocateByMachineRoom(consumerGroup, currentCID string, mqAll []*kernel.MessageQueue,
+	cidAll []string) []*kernel.MessageQueue {
+	return allocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
+}
+
+func allocateByConsistentHash(consumerGroup, currentCID string, mqAll []*kernel.MessageQueue,
+	cidAll []string) []*kernel.MessageQueue {
+	return allocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
+}
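To make the arithmetic in allocateByAveragely concrete: with 8 queues and 3 consumers, mod is 2, so the consumers at index 0 and 1 each receive 3 queues and the consumer at index 2 receives the remaining 2. The standalone sketch below reproduces just that arithmetic and is not part of the patch:

package main

import "fmt"

func main() {
	mqSize, cidSize := 8, 3
	mod := mqSize % cidSize // 2
	for index := 0; index < cidSize; index++ {
		// same rules as allocateByAveragely
		averageSize := mqSize / cidSize
		if mqSize <= cidSize {
			averageSize = 1
		} else if mod > 0 && index < mod {
			averageSize++
		}
		startIndex := index*averageSize + mod
		if mod > 0 && index < mod {
			startIndex = index * averageSize
		}
		num := averageSize
		if mqSize-startIndex < num {
			num = mqSize - startIndex
		}
		fmt.Printf("consumer %d -> queues [%d, %d]\n", index, startIndex, startIndex+num-1)
	}
	// Output:
	// consumer 0 -> queues [0, 2]
	// consumer 1 -> queues [3, 5]
	// consumer 2 -> queues [6, 7]
}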
diff --git a/consumer_test.go b/consumer_test.go
deleted file mode 100644
index aaba3cb..0000000
--- a/consumer_test.go
+++ /dev/null
@@ -1,18 +0,0 @@
-package rocketmq
-
-import (
-	"fmt"
-	"testing"
-)
-
-func TestDefaultConsumer_Pull(t *testing.T) {
-	consumer := NewConsumer(ConsumerConfig{
-		GroupName: "testGroup",
-	})
-	consumer.Start()
-	result, err := consumer.Pull("test", "*", 32)
-	if err != nil {
-		t.Fatal(err.Error())
-	}
-	fmt.Println(len(result.GetMessageExts()))
-}
diff --git a/examples/main.go b/examples/main.go
deleted file mode 100644
index 72a2a68..0000000
--- a/examples/main.go
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package main
-
-import (
-	"github.com/apache/rocketmq-client-go/core"
-	"gopkg.in/alecthomas/kingpin.v2"
-	"os"
-)
-
-var (
-	rmq     = kingpin.New("rocketmq", "RocketMQ cmd tools")
-	namesrv = rmq.Flag("namesrv", "NameServer address.").Default("localhost:9876").Short('n').String()
-	topic   = rmq.Flag("topic", "topic name.").Short('t').Required().String()
-	gid     = rmq.Flag("groupId", "group Id").Short('g').Default("testGroup").String()
-	amount  = rmq.Flag("amount", "how many message to produce or consume").Default("64").Short('a').Int()
-
-	produce     = rmq.Command("produce", "send messages to RocketMQ")
-	body        = produce.Flag("body", "message body").Short('b').Required().String()
-	workerCount = produce.Flag("workerCount", "works of send message with orderly").Default("1").Short('w').Int()
-	orderly     = produce.Flag("orderly", "send msg orderly").Short('o').Bool()
-
-	consume = rmq.Command("consume", "consumes message from RocketMQ")
-)
-
-func main() {
-	switch kingpin.MustParse(rmq.Parse(os.Args[1:])) {
-	case produce.FullCommand():
-		pConfig := &rocketmq.ProducerConfig{ClientConfig: rocketmq.ClientConfig{
-			GroupID:    *gid,
-			NameServer: *namesrv,
-			LogC: &rocketmq.LogConfig{
-				Path:     "example",
-				FileSize: 64 * 1 << 10,
-				FileNum:  1,
-				Level:    rocketmq.LogLevelDebug,
-			},
-		}}
-		if *orderly {
-			sendMessageOrderly(pConfig)
-		} else {
-			sendMessage(pConfig)
-		}
-	case consume.FullCommand():
-		cConfig := &rocketmq.PushConsumerConfig{ClientConfig: rocketmq.ClientConfig{
-			GroupID:    *gid,
-			NameServer: *namesrv,
-			LogC: &rocketmq.LogConfig{
-				Path:     "example",
-				FileSize: 64 * 1 << 10,
-				FileNum:  1,
-				Level:    rocketmq.LogLevelInfo,
-			},
-		}}
-
-		ConsumeWithPush(cConfig)
-	}
-}
diff --git a/examples/producer.go b/examples/producer.go
deleted file mode 100644
index e1c4d2f..0000000
--- a/examples/producer.go
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package main
-
-import (
-	"fmt"
-	"github.com/apache/rocketmq-client-go/core"
-)
-
-func sendMessage(config *rocketmq.ProducerConfig) {
-	producer, err := rocketmq.NewProducer(config)
-
-	if err != nil {
-		fmt.Println("create Producer failed, error:", err)
-		return
-	}
-
-	err = producer.Start()
-	if err != nil {
-		fmt.Println("start producer error", err)
-		return
-	}
-	defer producer.Shutdown()
-
-	fmt.Printf("Producer: %s started... \n", producer)
-	for i := 0; i < *amount; i++ {
-		msg := fmt.Sprintf("%s-%d", *body, i)
-		result, err := producer.SendMessageSync(&rocketmq.Message{Topic: *topic, Body: msg})
-		if err != nil {
-			fmt.Println("Error:", err)
-		}
-		fmt.Printf("send message: %s result: %s\n", msg, result)
-	}
-	fmt.Println("shutdown producer.")
-}
diff --git a/examples/producer/main.go b/examples/producer/main.go
new file mode 100644
index 0000000..5c12584
--- /dev/null
+++ b/examples/producer/main.go
@@ -0,0 +1,47 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+	"fmt"
+	"github.com/apache/rocketmq-client-go/consumer"
+	"github.com/apache/rocketmq-client-go/kernel"
+	"os"
+	"time"
+)
+
+func main() {
+	c := consumer.NewPushConsumer("testGroup", consumer.ConsumerOption{
+		ConsumerModel: consumer.Clustering,
+		FromWhere:     consumer.ConsumeFromFirstOffset,
+	})
+	err := c.Subscribe("testTopic", consumer.MessageSelector{}, func(ctx *consumer.ConsumeMessageContext,
+		msgs []*kernel.MessageExt) (consumer.ConsumeResult, error) {
+		fmt.Println(msgs)
+		return consumer.ConsumeSuccess, nil
+	})
+	if err != nil {
+		fmt.Println(err.Error())
+	}
+	err = c.Start()
+	if err != nil {
+		fmt.Println(err.Error())
+		os.Exit(-1)
+	}
+	time.Sleep(time.Hour)
+}
diff --git a/examples/producer_orderly.go b/examples/producer_orderly.go
deleted file mode 100644
index 9943f5b..0000000
--- a/examples/producer_orderly.go
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package main
-
-import (
-	"fmt"
-	"sync"
-	"sync/atomic"
-
-	"github.com/apache/rocketmq-client-go/core"
-)
-
-type queueSelectorByOrderID struct{}
-
-func (s queueSelectorByOrderID) Select(size int, m *rocketmq.Message, arg interface{}) int {
-	return arg.(int) % size
-}
-
-type worker struct {
-	p            rocketmq.Producer
-	leftMsgCount int64
-}
-
-func (w *worker) run() {
-	selector := queueSelectorByOrderID{}
-	for atomic.AddInt64(&w.leftMsgCount, -1) >= 0 {
-		r, err := w.p.SendMessageOrderly(
-			&rocketmq.Message{Topic: *topic, Body: *body}, selector, 7 /*orderID*/, 3,
-		)
-		if err != nil {
-			println("Send Orderly Error:", err)
-		}
-		fmt.Printf("send orderly result:%+v\n", r)
-	}
-}
-
-func sendMessageOrderly(config *rocketmq.ProducerConfig) {
-	producer, err := rocketmq.NewProducer(config)
-	if err != nil {
-		fmt.Println("create Producer failed, error:", err)
-		return
-	}
-
-	producer.Start()
-	defer producer.Shutdown()
-
-	wg := sync.WaitGroup{}
-	wg.Add(*workerCount)
-
-	workers := make([]worker, *workerCount)
-	for i := range workers {
-		workers[i].p = producer
-		workers[i].leftMsgCount = (int64)(*amount)
-	}
-
-	for i := range workers {
-		go func(w *worker) { w.run(); wg.Done() }(&workers[i])
-	}
-
-	wg.Wait()
-}
diff --git a/examples/pull_consumer.go b/examples/pull_consumer.go
deleted file mode 100644
index 1b209c0..0000000
--- a/examples/pull_consumer.go
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package main
-
-import (
-	"fmt"
-	"time"
-
-	"github.com/apache/rocketmq-client-go/core"
-)
-
-func ConsumeWithPull(config *rocketmq.PullConsumerConfig, topic string) {
-
-	consumer, err := rocketmq.NewPullConsumer(config)
-	if err != nil {
-		fmt.Printf("new pull consumer error:%s\n", err)
-		return
-	}
-
-	err = consumer.Start()
-	if err != nil {
-		fmt.Printf("start consumer error:%s\n", err)
-		return
-	}
-	defer consumer.Shutdown()
-
-	mqs := consumer.FetchSubscriptionMessageQueues(topic)
-	fmt.Printf("fetch subscription mqs:%+v\n", mqs)
-
-	total, offsets, now := 0, map[int]int64{}, time.Now()
-
-PULL:
-	for {
-		for _, mq := range mqs {
-			pr := consumer.Pull(mq, "*", offsets[mq.ID], 32)
-			total += len(pr.Messages)
-			fmt.Printf("pull %s, result:%+v\n", mq.String(), pr)
-
-			switch pr.Status {
-			case rocketmq.PullNoNewMsg:
-				break PULL
-			case rocketmq.PullFound:
-				fallthrough
-			case rocketmq.PullNoMatchedMsg:
-				fallthrough
-			case rocketmq.PullOffsetIllegal:
-				offsets[mq.ID] = pr.NextBeginOffset
-			case rocketmq.PullBrokerTimeout:
-				fmt.Println("broker timeout occur")
-			}
-		}
-	}
-
-	var timePerMessage time.Duration
-	if total > 0 {
-		timePerMessage = time.Since(now) / time.Duration(total)
-	}
-	fmt.Printf("total message:%d, per message time:%d\n", total, timePerMessage)
-}
diff --git a/examples/push_consumer.go b/examples/push_consumer.go
deleted file mode 100644
index 38e434c..0000000
--- a/examples/push_consumer.go
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package main
-
-import (
-	"fmt"
-	"github.com/apache/rocketmq-client-go/core"
-	"sync/atomic"
-)
-
-func ConsumeWithPush(config *rocketmq.PushConsumerConfig) {
-
-	consumer, err := rocketmq.NewPushConsumer(config)
-	if err != nil {
-		println("create Consumer failed, error:", err)
-		return
-	}
-
-	ch := make(chan interface{})
-	var count = (int64)(*amount)
-	// MUST subscribe topic before consumer started.
-	consumer.Subscribe(*topic, "*", func(msg *rocketmq.MessageExt) rocketmq.ConsumeStatus {
-		fmt.Printf("A message received: \"%s\" \n", msg.Body)
-		if atomic.AddInt64(&count, -1) <= 0 {
-			ch <- "quit"
-		}
-		return rocketmq.ConsumeSuccess
-	})
-
-	err = consumer.Start()
-	if err != nil {
-		println("consumer start failed,", err)
-		return
-	}
-
-	fmt.Printf("consumer: %s started...\n", consumer)
-	<-ch
-	err = consumer.Shutdown()
-	if err != nil {
-		println("consumer shutdown failed")
-		return
-	}
-	println("consumer has shutdown.")
-}
diff --git a/go.mod b/go.mod
index 2701b7c..58ca7c7 100644
--- a/go.mod
+++ b/go.mod
@@ -5,6 +5,7 @@ go 1.11
 require (
 	github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc // indirect
 	github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // indirect
+	github.com/emirpasic/gods v1.12.0
 	github.com/sirupsen/logrus v1.3.0
 	github.com/stretchr/testify v1.3.0
 	github.com/tidwall/gjson v1.2.1
diff --git a/go.sum b/go.sum
index 7a45ece..85b0d5e 100644
--- a/go.sum
+++ b/go.sum
@@ -5,6 +5,8 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg=
+github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
diff --git a/kernel/client.go b/kernel/client.go
index f9185ac..e83d598 100644
--- a/kernel/client.go
+++ b/kernel/client.go
@@ -23,34 +23,58 @@ import (
 	"fmt"
 	"github.com/apache/rocketmq-client-go/remote"
 	"github.com/apache/rocketmq-client-go/rlog"
-	"github.com/apache/rocketmq-client-go/utils"
 	"os"
 	"strconv"
+	"strings"
 	"sync"
 	"time"
 )
 
 const (
 	defaultTraceRegionID = "DefaultRegion"
-	tranceOff            = "false"
-)
 
-var (
-	namesrvAddrs                  = os.Getenv("rocketmq.namesrv.addr")
-	clientIP                      = utils.LocalIP()
-	instanceName                  = os.Getenv("rocketmq.client.name")
-	pollNameServerInterval        = 30 * time.Second
-	heartbeatBrokerInterval       = 30 * time.Second
-	persistConsumerOffsetInterval = 5 * time.Second
-	unitMode                      = false
-	vipChannelEnabled, _          = strconv.ParseBool(os.Getenv("com.rocketmq.sendMessageWithVIPChannel"))
-	clientID                      = string(clientIP) + "@" + instanceName
+	// value of the trace switch meaning that message tracing is off
+	_TraceOff = "false"
+
+	// Interval for pulling topic route information from the name server
+	_PullNameServerInterval = 30 * time.Second
+
+	// Interval for sending heartbeats to all brokers
+	_HeartbeatBrokerInterval = 30 * time.Second
+
+	// Offset persistent interval for consumer
+	_PersistOffset = 5 * time.Second
+
+	// Rebalance interval
+	_RebalanceInterval = 20 * time.Millisecond
 )
 
 var (
-	ErrServiceState = errors.New("service state is not Running, please check")
+	ErrServiceState = errors.New("service state is not running, please check")
 )
 
+type ClientOption struct {
+	NameServerAddr    string
+	ClientIP          string
+	InstanceName      string
+	UnitMode          bool
+	UnitName          string
+	VIPChannelEnabled bool
+	UseTLS            bool
+}
+
+func (opt *ClientOption) ChangeInstanceNameToPID() {
+	if opt.InstanceName == "DEFAULT" {
+		opt.InstanceName = strconv.Itoa(os.Getpid())
+	}
+}
+
+func (opt *ClientOption) String() string {
+	return fmt.Sprintf("ClientOption [NameServerAddr=%s, ClientIP=%s, InstanceName=%s, "+
+		"UnitMode=%v, UnitName=%s, VIPChannelEnabled=%v, UseTLS=%v]", opt.NameServerAddr, opt.ClientIP,
+		opt.InstanceName, opt.UnitMode, opt.UnitName, opt.VIPChannelEnabled, opt.UseTLS)
+}
+
 type InnerProducer interface {
 	PublishTopicList() []string
 	UpdateTopicPublishInfo(topic string, info *TopicPublishInfo)
@@ -62,15 +86,169 @@ type InnerProducer interface {
 }
 
 type InnerConsumer interface {
-	DoRebalance()
 	PersistConsumerOffset()
 	UpdateTopicSubscribeInfo(topic string, mqs []*MessageQueue)
 	IsSubscribeTopicNeedUpdate(topic string) bool
+	SubscriptionDataList() []*SubscriptionData
+	Rebalance()
 	IsUnitMode() bool
 }
 
+type RMQClient struct {
+	option ClientOption
+	// group -> InnerProducer
+	producerMap sync.Map
+
+	// group -> InnerConsumer
+	consumerMap sync.Map
+}
+
+var clientMap sync.Map
+
+func GetOrNewRocketMQClient(option ClientOption) *RMQClient {
+	// TODO
+	return &RMQClient{option: option}
+}
+
+func (c *RMQClient) Start() {
+	// TODO fetchNameServerAddr
+	go func() {}()
+
+	// schedule update route info
+	go func() {
+		// delay
+		time.Sleep(50 * time.Millisecond)
+		for {
+			c.UpdateTopicRouteInfo()
+			time.Sleep(_PullNameServerInterval)
+		}
+	}()
+
+	// TODO cleanOfflineBroker & sendHeartbeatToAllBrokerWithLock
+	go func() {}()
+
+	// schedule persist offset
+	go func() {
+		time.Sleep(10 * time.Second)
+		for {
+			c.consumerMap.Range(func(key, value interface{}) bool {
+				consumer := value.(InnerConsumer)
+				consumer.PersistConsumerOffset()
+				return true
+			})
+			time.Sleep(_PersistOffset)
+		}
+	}()
+
+	go func() {
+		for {
+			c.RebalanceImmediately()
+			time.Sleep(time.Second)
+		}
+	}()
+}
+
+func (c *RMQClient) ClientID() string {
+	//id := c.option.ClientIP + "@" + c.option.InstanceName
+	//if c.option.UnitName != "" {
+	//	id += "@" + c.option.UnitName
+	//}
+	return "127.0.0.1:10911@DEFAULT"
+}
+
+func (c *RMQClient) CheckClientInBroker() {
+
+}
+
+func (c *RMQClient) SendHeartbeatToAllBrokerWithLock() {
+	hbData := &heartbeatData{
+		ClientId: c.ClientID(),
+	}
+	pData := make([]producerData, 0)
+	c.producerMap.Range(func(key, value interface{}) bool {
+		pData = append(pData, producerData(key.(string)))
+		return true
+	})
+
+	cData := make([]consumerData, 0)
+	c.consumerMap.Range(func(key, value interface{}) bool {
+		consumer := value.(InnerConsumer)
+		cData = append(cData, consumerData{
+			GroupName:         key.(string),
+			CType:             "PUSH",
+			MessageModel:      "CLUSTERING",
+			Where:             "CONSUME_FROM_FIRST_OFFSET",
+			UnitMode:          consumer.IsUnitMode(),
+			SubscriptionDatas: consumer.SubscriptionDataList(),
+		})
+		return true
+	})
+	hbData.ProducerDatas = pData
+	hbData.ConsumerDatas = cData
+	if len(pData) == 0 && len(cData) == 0 {
+		rlog.Warn("sending heartbeat, but no consumer and no producer")
+		return
+	}
+	brokerAddressesMap.Range(func(key, value interface{}) bool {
+		brokerName := key.(string)
+		data := value.(*BrokerData)
+		for id, addr := range data.BrokerAddresses {
+			cmd := remote.NewRemotingCommand(ReqHeartBeat, nil, hbData.encode())
+			response, err := remote.InvokeSync(addr, cmd, 3*time.Second)
+			if err != nil {
+				rlog.Warnf("send heart beat to broker error: %s", err.Error())
+				return true
+			}
+			if response.Code == ResSuccess {
+				v, exist := brokerVersionMap.Load(brokerName)
+				var m map[string]int32
+				if exist {
+					m = v.(map[string]int32)
+				} else {
+					m = make(map[string]int32, 4)
+					brokerVersionMap.Store(brokerName, m)
+				}
+				m[addr] = int32(response.Version)
+				rlog.Infof("send heart beat to broker[%s %v %s] success", brokerName, id, addr)
+			}
+		}
+		return true
+	})
+}
+
+func (c *RMQClient) UpdateTopicRouteInfo() {
+	publishTopicSet := make(map[string]bool, 0)
+	c.producerMap.Range(func(key, value interface{}) bool {
+		producer := value.(InnerProducer)
+		list := producer.PublishTopicList()
+		for idx := range list {
+			publishTopicSet[list[idx]] = true
+		}
+		return true
+	})
+	for topic := range publishTopicSet {
+		c.UpdatePublishInfo(topic, UpdateTopicRouteInfo(topic))
+	}
+
+	subscribedTopicSet := make(map[string]bool, 0)
+	c.consumerMap.Range(func(key, value interface{}) bool {
+		consumer := value.(InnerConsumer)
+		list := consumer.SubscriptionDataList()
+		for idx := range list {
+			if !strings.HasPrefix(list[idx].Topic, RetryGroupTopicPrefix) {
+				subscribedTopicSet[list[idx].Topic] = true
+			}
+		}
+		return true
+	})
+
+	for topic := range subscribedTopicSet {
+		c.UpdateSubscribeInfo(topic, UpdateTopicRouteInfo(topic))
+	}
+}
+
 // SendMessage with batch by sync
-func SendMessageSync(ctx context.Context, brokerAddrs, brokerName string, request *SendMessageRequest,
+func (c *RMQClient) SendMessageSync(ctx context.Context, brokerAddrs, brokerName string, request *SendMessageRequest,
 	msgs []*Message) (*SendResult, error) {
 	cmd := remote.NewRemotingCommand(ReqSendBatchMessage, request, encodeMessages(msgs))
 	response, err := remote.InvokeSync(brokerAddrs, cmd, 3*time.Second)
@@ -79,16 +257,16 @@ func SendMessageSync(ctx context.Context, brokerAddrs, brokerName string, reques
 		return nil, err
 	}
 
-	return processSendResponse(brokerName, msgs, response), nil
+	return c.processSendResponse(brokerName, msgs, response), nil
 }
 
 // SendMessageAsync send message with batch by async
-func SendMessageAsync(ctx context.Context, brokerAddrs, brokerName string, request *SendMessageRequest,
+func (c *RMQClient) SendMessageAsync(ctx context.Context, brokerAddrs, brokerName string, request *SendMessageRequest,
 	msgs []*Message, f func(result *SendResult)) error {
 	return nil
 }
 
-func SendMessageOneWay(ctx context.Context, brokerAddrs string, request *SendMessageRequest,
+func (c *RMQClient) SendMessageOneWay(ctx context.Context, brokerAddrs string, request *SendMessageRequest,
 	msgs []*Message) (*SendResult, error) {
 	cmd := remote.NewRemotingCommand(ReqSendBatchMessage, request, encodeMessages(msgs))
 	err := remote.InvokeOneWay(brokerAddrs, cmd)
@@ -98,7 +276,7 @@ func SendMessageOneWay(ctx context.Context, brokerAddrs string, request *SendMes
 	return nil, err
 }
 
-func processSendResponse(brokerName string, msgs []*Message, cmd *remote.RemotingCommand) *SendResult {
+func (c *RMQClient) processSendResponse(brokerName string, msgs []*Message, cmd *remote.RemotingCommand) *SendResult {
 	var status SendStatus
 	switch cmd.Code {
 	case ResFlushDiskTimeout:
@@ -118,11 +296,11 @@ func processSendResponse(brokerName string, msgs []*Message, cmd *remote.Remotin
 
 	msgIDs := make([]string, 0)
 	for i := 0; i < len(msgs); i++ {
-		msgIDs = append(msgIDs, msgs[i].Properties[UniqueClientMessageIdKeyIndex])
+		msgIDs = append(msgIDs, msgs[i].Properties[PropertyUniqueClientMessageIdKeyIndex])
 	}
 
-	regionId := cmd.ExtFields[MsgRegion]
-	trace := cmd.ExtFields[TraceSwitch]
+	regionId := cmd.ExtFields[PropertyMsgRegion]
+	trace := cmd.ExtFields[PropertyTraceSwitch]
 
 	if regionId == "" {
 		regionId = defaultTraceRegionID
@@ -140,23 +318,22 @@ func processSendResponse(brokerName string, msgs []*Message, cmd *remote.Remotin
 		QueueOffset:   sendResponse.QueueOffset,
 		TransactionID: sendResponse.TransactionId,
 		RegionID:      regionId,
-		TraceOn:       trace != "" && trace != tranceOff,
+		TraceOn:       trace != "" && trace != _TraceOff,
 	}
 }
 
 // PullMessage with sync
-func PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest) (*PullResult, error) {
+func (c *RMQClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest) (*PullResult, error) {
 	cmd := remote.NewRemotingCommand(ReqPullMessage, request, nil)
-
 	res, err := remote.InvokeSync(brokerAddrs, cmd, 3*time.Second)
 	if err != nil {
 		return nil, err
 	}
 
-	return processPullResponse(res)
+	return c.processPullResponse(res)
 }
 
-func processPullResponse(response *remote.RemotingCommand) (*PullResult, error) {
+func (c *RMQClient) processPullResponse(response *remote.RemotingCommand) (*PullResult, error) {
 	pullResult := &PullResult{}
 	switch response.Code {
 	case ResSuccess:
@@ -164,7 +341,7 @@ func processPullResponse(response *remote.RemotingCommand) (*PullResult, error)
 	case ResPullNotFound:
 		pullResult.Status = PullNoNewMsg
 	case ResPullRetryImmediately:
-		pullResult.Status = PullNoMatchedMsg
+		pullResult.Status = PullNoMsgMatched
 	case ResPullOffsetMoved:
 		pullResult.Status = PullOffsetIllegal
 	default:
@@ -197,70 +374,128 @@ func processPullResponse(response *remote.RemotingCommand) (*PullResult, error)
 }
 
 // PullMessageAsync pull message async
-func PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequest, f func(result *PullResult)) error {
+func (c *RMQClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequest, f func(result *PullResult)) error {
 	return nil
 }
 
 // QueryMaxOffset with specific queueId and topic
-func QueryMaxOffset(topic string, queueId int) error {
-	return nil
+func QueryMaxOffset(topic string, queueId int) (int64, error) {
+	return 0, nil
 }
 
 // QueryConsumerOffset with specific queueId and topic of consumerGroup
-func QueryConsumerOffset(consumerGroup, topic string, queue int) (int64, error) {
+func (c *RMQClient) QueryConsumerOffset(consumerGroup, topic string, queue int) (int64, error) {
 	return 0, nil
 }
 
 // SearchOffsetByTimestamp with specific queueId and topic
-func SearchOffsetByTimestamp(topic string, queue int, timestamp int64) (int64, error) {
+func (c *RMQClient) SearchOffsetByTimestamp(topic string, queue int, timestamp int64) (int64, error) {
 	return 0, nil
 }
 
 // UpdateConsumerOffset with specific queueId and topic
-func UpdateConsumerOffset(consumerGroup, topic string, queue int, offset int64) error {
+func (c *RMQClient) UpdateConsumerOffset(consumerGroup, topic string, queue int, offset int64) error {
 	return nil
 }
 
-var (
-	// group -> InnerProducer
-	producerMap sync.Map
-
-	// group -> InnerConsumer
-	consumerMap sync.Map
-)
-
-func CheckClientInBroker() {
+func (c *RMQClient) RegisterConsumer(group string, consumer InnerConsumer) error {
+	c.consumerMap.Store(group, consumer)
+	return nil
+}
 
+func (c *RMQClient) UnregisterConsumer(group string) {
 }
 
-func RegisterConsumer(group string, consumer InnerConsumer) {
+func (c *RMQClient) RegisterProducer(group string, producer InnerProducer) {
+}
 
+func (c *RMQClient) UnregisterProducer(group string) {
 }
 
-func UnregisterConsumer(group string) {
+func (c *RMQClient) SelectProducer(group string) InnerProducer {
+	return nil
+}
 
+func (c *RMQClient) SelectConsumer(group string) InnerConsumer {
+	return nil
 }
 
-func RegisterProducer(group string, producer InnerProducer) {
+func (c *RMQClient) RebalanceImmediately() {
+	c.consumerMap.Range(func(key, value interface{}) bool {
+		consumer := value.(InnerConsumer)
+		consumer.Rebalance()
+		return true
+	})
+}
 
+func (c *RMQClient) UpdatePublishInfo(topic string, data *TopicRouteData) {
+	if !c.isNeedUpdatePublishInfo(topic) {
+		return
+	}
+	c.producerMap.Range(func(key, value interface{}) bool {
+		producer := value.(InnerProducer)
+		publishInfo := routeData2PublishInfo(topic, data)
+		publishInfo.HaveTopicRouterInfo = true
+		producer.UpdateTopicPublishInfo(topic, publishInfo)
+		return true
+	})
 }
 
-func UnregisterProducer(group string) {
+func (c *RMQClient) isNeedUpdatePublishInfo(topic string) bool {
+	var result bool
+	c.producerMap.Range(func(key, value interface{}) bool {
+		producer := value.(InnerProducer)
+		if producer.IsPublishTopicNeedUpdate(topic) {
+			result = true
+			return false
+		}
+		return true
+	})
+	return result
+}
 
+func (c *RMQClient) UpdateSubscribeInfo(topic string, data *TopicRouteData) {
+	if !c.isNeedUpdateSubscribeInfo(topic) {
+		return
+	}
+	c.consumerMap.Range(func(key, value interface{}) bool {
+		consumer := value.(InnerConsumer)
+		// TODO
+		consumer.UpdateTopicSubscribeInfo(topic, routeData2SubscribeInfo(topic, data))
+		return true
+	})
 }
 
-func SelectProducer(group string) InnerProducer {
-	return nil
+func (c *RMQClient) isNeedUpdateSubscribeInfo(topic string) bool {
+	var result bool
+	c.consumerMap.Range(func(key, value interface{}) bool {
+		consumer := value.(InnerConsumer)
+		if consumer.IsSubscribeTopicNeedUpdate(topic) {
+			result = true
+			return false
+		}
+		return true
+	})
+	return result
 }
 
-func SelectConsumer(group string) InnerConsumer {
-	return nil
+func routeData2SubscribeInfo(topic string, data *TopicRouteData) []*MessageQueue {
+	list := make([]*MessageQueue, 0)
+	for idx := range data.QueueDataList {
+		qd := data.QueueDataList[idx]
+		if queueIsReadable(qd.Perm) {
+			for i := 0; i < qd.ReadQueueNums; i++ {
+				list = append(list, &MessageQueue{
+					Topic:      topic,
+					BrokerName: qd.BrokerName,
+					QueueId:    i,
+				})
+			}
+		}
+	}
+	return list
 }
 
 func encodeMessages(message []*Message) []byte {
 	return nil
 }
-
-func sendHeartbeatToAllBroker() {
-
-}
diff --git a/kernel/client_test.go b/kernel/client_test.go
new file mode 100644
index 0000000..7a9723c
--- /dev/null
+++ b/kernel/client_test.go
@@ -0,0 +1,45 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kernel
+
+import (
+	"context"
+	"testing"
+)
+
+func TestRMQClient_PullMessage(t *testing.T) {
+	client := GetOrNewRocketMQClient(ClientOption{})
+	req := &PullMessageRequest{
+		ConsumerGroup:  "testGroup",
+		Topic:          "wenfeng",
+		QueueId:        0,
+		QueueOffset:    0,
+		MaxMsgNums:     32,
+		SysFlag:        0x1 << 2,
+		SubExpression:  "*",
+		ExpressionType: "TAG",
+	}
+	res, err := client.PullMessage(context.Background(), "127.0.0.1:10911", req)
+	if err != nil {
+		t.Fatal(err.Error())
+	}
+
+	for _, a := range res.GetMessageExts() {
+		t.Log(string(a.Body))
+	}
+}
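The SysFlag of 0x1 << 2 in the test request is the subscription bit. The sketch below shows how such a flag can be assembled, assuming the standard RocketMQ pull sysFlag bit order (commit offset, suspend, subscription, class filter); buildPullSysFlag is a made-up name here and is not the buildSysFlag used elsewhere in this patch:

package main

import "fmt"

// buildPullSysFlag assembles a pull sysFlag under the assumed bit layout:
// bit 0 commit offset, bit 1 suspend, bit 2 subscription, bit 3 class filter.
func buildPullSysFlag(commitOffset, suspend, subscription, classFilter bool) int32 {
	var flag int32
	if commitOffset {
		flag |= 0x1
	}
	if suspend {
		flag |= 0x1 << 1
	}
	if subscription {
		flag |= 0x1 << 2
	}
	if classFilter {
		flag |= 0x1 << 3
	}
	return flag
}

func main() {
	// matches the SysFlag used in TestRMQClient_PullMessage above
	fmt.Printf("0x%x\n", buildPullSysFlag(false, false, true, false)) // 0x4
}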
diff --git a/kernel/constants.go b/kernel/constants.go
new file mode 100644
index 0000000..a9eacac
--- /dev/null
+++ b/kernel/constants.go
@@ -0,0 +1,27 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kernel
+
+const (
+	RetryGroupTopicPrefix = "%RETRY%"
+	DefaultConsumerGroup  = "DEFAULT_CONSUMER"
+)
+
+func GetRetryTopic(group string) string {
+	return RetryGroupTopicPrefix + group
+}
diff --git a/kernel/message.go b/kernel/message.go
index 843a743..117d766 100644
--- a/kernel/message.go
+++ b/kernel/message.go
@@ -20,32 +20,32 @@ package kernel
 import "fmt"
 
 const (
-	KeySeparator                   = " "
-	Keys                           = "KEYS"
-	Tags                           = "TAGS"
-	WaitStoreMsgOk                 = "WAIT"
-	DelayTimeLevel                 = "DELAY"
-	RetryTopic                     = "RETRY_TOPIC"
-	RealTopic                      = "REAL_TOPIC"
-	RealQueueId                    = "REAL_QID"
-	TransactionPrepared            = "TRAN_MSG"
-	ProducerGroup                  = "PGROUP"
-	MinOffset                      = "MIN_OFFSET"
-	MaxOffset                      = "MAX_OFFSET"
-	BuyerId                        = "BUYER_ID"
-	OriginMessageId                = "ORIGIN_MESSAGE_ID"
-	TransferFlag                   = "TRANSFER_FLAG"
-	CorrectionFlag                 = "CORRECTION_FLAG"
-	MQ2Flag                        = "MQ2_FLAG"
-	ReconsumeTime                  = "RECONSUME_TIME"
-	MsgRegion                      = "MSG_REGION"
-	TraceSwitch                    = "TRACE_ON"
-	UniqueClientMessageIdKeyIndex  = "UNIQ_KEY"
-	MaxReconsumeTimes              = "MAX_RECONSUME_TIMES"
-	ConsumeStartTime               = "CONSUME_START_TIME"
-	TranscationPreparedQueueOffset = "TRAN_PREPARED_QUEUE_OFFSET"
-	TranscationCheckTimes          = "TRANSACTION_CHECK_TIMES"
-	CheckImmunityTimeInSeconds     = "CHECK_IMMUNITY_TIME_IN_SECONDS"
+	PropertyKeySeparator                   = " "
+	PropertyKeys                           = "KEYS"
+	PropertyTags                           = "TAGS"
+	PropertyWaitStoreMsgOk                 = "WAIT"
+	PropertyDelayTimeLevel                 = "DELAY"
+	PropertyRetryTopic                     = "RETRY_TOPIC"
+	PropertyRealTopic                      = "REAL_TOPIC"
+	PropertyRealQueueId                    = "REAL_QID"
+	PropertyTransactionPrepared            = "TRAN_MSG"
+	PropertyProducerGroup                  = "PGROUP"
+	PropertyMinOffset                      = "MIN_OFFSET"
+	PropertyMaxOffset                      = "MAX_OFFSET"
+	PropertyBuyerId                        = "BUYER_ID"
+	PropertyOriginMessageId                = "ORIGIN_MESSAGE_ID"
+	PropertyTransferFlag                   = "TRANSFER_FLAG"
+	PropertyCorrectionFlag                 = "CORRECTION_FLAG"
+	PropertyMQ2Flag                        = "MQ2_FLAG"
+	PropertyReconsumeTime                  = "RECONSUME_TIME"
+	PropertyMsgRegion                      = "MSG_REGION"
+	PropertyTraceSwitch                    = "TRACE_ON"
+	PropertyUniqueClientMessageIdKeyIndex  = "UNIQ_KEY"
+	PropertyMaxReconsumeTimes              = "MAX_RECONSUME_TIMES"
+	PropertyConsumeStartTime               = "CONSUME_START_TIME"
+	PropertyTranscationPreparedQueueOffset = "TRAN_PREPARED_QUEUE_OFFSET"
+	PropertyTranscationCheckTimes          = "TRANSACTION_CHECK_TIMES"
+	PropertyCheckImmunityTimeInSeconds     = "CHECK_IMMUNITY_TIME_IN_SECONDS"
 )
 
 type Message struct {
@@ -106,7 +106,7 @@ type MessageExt struct {
 }
 
 func (msgExt *MessageExt) GetTags() string {
-	return msgExt.Properties[Tags]
+	return msgExt.Properties[PropertyTags]
 }
 
 func (msgExt *MessageExt) String() string {
diff --git a/kernel/model.go b/kernel/model.go
index f9593ab..d9f4d00 100644
--- a/kernel/model.go
+++ b/kernel/model.go
@@ -20,7 +20,9 @@ package kernel
 import (
 	"bytes"
 	"encoding/binary"
+	"encoding/json"
 	"fmt"
+	"github.com/apache/rocketmq-client-go/rlog"
 	"github.com/apache/rocketmq-client-go/utils"
 )
 
@@ -62,7 +64,7 @@ type PullStatus int
 const (
 	PullFound PullStatus = iota
 	PullNoNewMsg
-	PullNoMatchedMsg
+	PullNoMsgMatched
 	PullOffsetIllegal
 	PullBrokerTimeout
 )
@@ -92,6 +94,10 @@ func (result *PullResult) GetMessages() []*Message {
 	return toMessages(result.messageExts)
 }
 
+func (result *PullResult) String() string {
+	return ""
+}
+
 func decodeMessage(data []byte) []*MessageExt {
 	msgs := make([]*MessageExt, 0)
 	buf := bytes.NewBuffer(data)
@@ -224,10 +230,15 @@ func (mq *MessageQueue) HashCode() int {
 	return result
 }
 
+func (mq *MessageQueue) Equals(queue *MessageQueue) bool {
+	// TODO
+	return true
+}
+
 type FindBrokerResult struct {
 	BrokerAddr    string
 	Slave         bool
-	BrokerVersion int
+	BrokerVersion int32
 }
 
 type (
@@ -236,90 +247,16 @@ type (
 
 	consumeType string
 
-	MessageModel     int
-	ConsumeFromWhere int
-	ServiceState     int
+	ServiceState int
 )
 
 const (
-	ConsumeActively  = consumeType("PULL")
-	ConsumePassively = consumeType("PUSH")
-
-	BroadCasting = MessageModel(1)
-	Clustering   = MessageModel(2)
-
-	ConsumeFromLastOffset ConsumeFromWhere = iota
-	ConsumeFromFirstOffset
-	ConsumeFromTimestamp
-
-	CreateJust ServiceState = iota
-	Running
-	Shutdown
+	StateCreateJust ServiceState = iota
+	StateStartFailed
+	StateRunning
+	StateShutdown
 )
 
-func (mode MessageModel) String() string {
-	switch mode {
-	case BroadCasting:
-		return "BroadCasting"
-	case Clustering:
-		return "Clustering"
-	default:
-		return "Unknown"
-	}
-}
-
-type ExpressionType string
-
-const (
-	/**
-	 * <ul>
-	 * Keywords:
-	 * <li>{@code AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS, NULL}</li>
-	 * </ul>
-	 * <p/>
-	 * <ul>
-	 * Data type:
-	 * <li>Boolean, like: TRUE, FALSE</li>
-	 * <li>String, like: 'abc'</li>
-	 * <li>Decimal, like: 123</li>
-	 * <li>Float number, like: 3.1415</li>
-	 * </ul>
-	 * <p/>
-	 * <ul>
-	 * Grammar:
-	 * <li>{@code AND, OR}</li>
-	 * <li>{@code >, >=, <, <=, =}</li>
-	 * <li>{@code BETWEEN A AND B}, equals to {@code >=A AND <=B}</li>
-	 * <li>{@code NOT BETWEEN A AND B}, equals to {@code >B OR <A}</li>
-	 * <li>{@code IN ('a', 'b')}, equals to {@code ='a' OR ='b'}, this operation only support String type.</li>
-	 * <li>{@code IS NULL}, {@code IS NOT NULL}, check parameter whether is null, or not.</li>
-	 * <li>{@code =TRUE}, {@code =FALSE}, check parameter whether is true, or false.</li>
-	 * </ul>
-	 * <p/>
-	 * <p>
-	 * Example:
-	 * (a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE)
-	 * </p>
-	 */
-	SQL92 = ExpressionType("SQL92")
-
-	/**
-	 * Only support or operation such as
-	 * "tag1 || tag2 || tag3", <br>
-	 * If null or * expression, meaning subscribe all.
-	 */
-	TAG = ExpressionType("TAG")
-)
-
-func IsTagType(exp string) bool {
-	if exp == "" || exp == "TAG" {
-		return true
-	}
-	return false
-}
-
-var SubAll = "*"
-
 type SubscriptionData struct {
 	ClassFilterMode bool
 	Topic           string
@@ -327,20 +264,30 @@ type SubscriptionData struct {
 	Tags            map[string]bool
 	Codes           map[int32]bool
 	SubVersion      int64
-	ExpType         ExpressionType
+	ExpType         string
 }
 
 type consumerData struct {
-	groupName         string
-	cType             consumeType
-	messageModel      MessageModel
-	where             ConsumeFromWhere
-	subscriptionDatas []SubscriptionData
-	unitMode          bool
+	GroupName         string              `json:"groupName"`
+	CType             consumeType         `json:"consumeType"`
+	MessageModel      string              `json:"messageModel"`
+	Where             string              `json:"consumeFromWhere"`
+	SubscriptionDatas []*SubscriptionData `json:"subscriptionDataSet"`
+	UnitMode          bool                `json:"unitMode"`
 }
 
 type heartbeatData struct {
-	clientId      string
-	producerDatas []producerData
-	consumerDatas []consumerData
+	ClientId      string         `json:"clientID"`
+	ProducerDatas []producerData `json:"producerDataSet"`
+	ConsumerDatas []consumerData `json:"consumerDataSet"`
+}
+
+func (data *heartbeatData) encode() []byte {
+	d, err := json.Marshal(data)
+	if err != nil {
+		rlog.Errorf("marshal heartbeatData error: %s", err.Error())
+		return nil
+	}
+	rlog.Info(string(d))
+	return d
 }
diff --git a/kernel/perm.go b/kernel/perm.go
index e78a876..2d568d5 100644
--- a/kernel/perm.go
+++ b/kernel/perm.go
@@ -1,19 +1,19 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file dqueueIstributed with
- * thqueueIs work for additional information regarding copyright ownership.
- * The ASF licenses thqueueIs file to You under the Apache License, Version 2.0
- * (the "License"); you may not use thqueueIs file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  dqueueIstributed under the License queueIs dqueueIstributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permqueueIssions and
- *  limitations under the License.
- */
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
 
 package kernel
 
diff --git a/kernel/request.go b/kernel/request.go
index c5e0fef..2647ad5 100644
--- a/kernel/request.go
+++ b/kernel/request.go
@@ -17,12 +17,19 @@ limitations under the License.
 
 package kernel
 
-import "fmt"
+import (
+	"fmt"
+	"time"
+)
 
 const (
-	ReqPullMessage         = int16(11)
-	ReqGetRouteInfoByTopic = int16(105)
-	ReqSendBatchMessage    = int16(320)
+	ReqPullMessage            = int16(11)
+	ReqHeartBeat              = int16(34)
+	ReqGetConsumerListByGroup = int16(38)
+	ReqLockBatchMQ            = int16(41)
+	ReqUnlockBatchMQ          = int16(42)
+	ReqGetRouteInfoByTopic    = int16(105)
+	ReqSendBatchMessage       = int16(320)
 )
 
 type SendMessageRequest struct {
@@ -49,24 +56,24 @@ func (request *SendMessageRequest) Decode(properties map[string]string) error {
 }
 
 type PullMessageRequest struct {
-	ConsumerGroup        string `json:"consumerGroup"`
-	Topic                string `json:"topic"`
-	QueueId              int32  `json:"queueId"`
-	QueueOffset          int64  `json:"queueOffset"`
-	MaxMsgNums           int32  `json:"maxMsgNums"`
-	SysFlag              int32  `json:"sysFlag"`
-	CommitOffset         int64  `json:"commitOffset"`
-	SuspendTimeoutMillis int64  `json:"suspendTimeoutMillis"`
-	SubExpression        string `json:"subscription"`
-	SubVersion           int64  `json:"subVersion"`
-	ExpressionType       string `json:"expressionType"`
+	ConsumerGroup        string        `json:"consumerGroup"`
+	Topic                string        `json:"topic"`
+	QueueId              int32         `json:"queueId"`
+	QueueOffset          int64         `json:"queueOffset"`
+	MaxMsgNums           int32         `json:"maxMsgNums"`
+	SysFlag              int32         `json:"sysFlag"`
+	CommitOffset         int64         `json:"commitOffset"`
+	SuspendTimeoutMillis time.Duration `json:"suspendTimeoutMillis"`
+	SubExpression        string        `json:"subscription"`
+	SubVersion           int64         `json:"subVersion"`
+	ExpressionType       string        `json:"expressionType"`
 }
 
 func (request *PullMessageRequest) Encode() map[string]string {
 	maps := make(map[string]string)
 	maps["consumerGroup"] = request.ConsumerGroup
 	maps["topic"] = request.Topic
-	maps["queueId"] = fmt.Sprintf("%d", request.QueueOffset)
+	maps["queueId"] = fmt.Sprintf("%d", request.QueueId)
 	maps["queueOffset"] = fmt.Sprintf("%d", request.QueueOffset)
 	maps["maxMsgNums"] = fmt.Sprintf("%d", request.MaxMsgNums)
 	maps["sysFlag"] = fmt.Sprintf("%d", request.SysFlag)
@@ -78,6 +85,16 @@ func (request *PullMessageRequest) Encode() map[string]string {
 	return maps
 }
 
+type GetConsumerList struct {
+	ConsumerGroup string `json:"consumerGroup"`
+}
+
+func (request *GetConsumerList) Encode() map[string]string {
+	maps := make(map[string]string)
+	maps["consumerGroup"] = request.ConsumerGroup
+	return maps
+}
+
 type GetMaxOffsetRequest struct {
 	Topic   string `json:"topic"`
 	QueueId int32  `json:"queueId"`
diff --git a/kernel/route.go b/kernel/route.go
index dc08edc..f8cd884 100644
--- a/kernel/route.go
+++ b/kernel/route.go
@@ -22,7 +22,9 @@ import (
 	"errors"
 	"github.com/apache/rocketmq-client-go/remote"
 	"github.com/apache/rocketmq-client-go/rlog"
+	"github.com/apache/rocketmq-client-go/utils"
 	"github.com/tidwall/gjson"
+	"math/rand"
 	"sort"
 	"strconv"
 	"strings"
@@ -32,7 +34,7 @@ import (
 )
 
 const (
-	requestTimeout   = 3000
+	requestTimeout   = 3 * time.Second
 	defaultTopic     = "TBW102"
 	defaultQueueNums = 4
 	MasterId         = int64(0)
@@ -49,9 +51,10 @@ var (
 	// brokerName -> map[string]int32
 	brokerVersionMap sync.Map
 
-	publishInfoMap sync.Map
-	routeDataMap   sync.Map
-	lockNamesrv    sync.Mutex
+	//publishInfoMap sync.Map
+	//subscribeInfoMap sync.Map
+	routeDataMap sync.Map
+	lockNamesrv  sync.Mutex
 )
 
 // key is topic, value is TopicPublishInfo
@@ -59,7 +62,7 @@ type TopicPublishInfo struct {
 	OrderTopic          bool
 	HaveTopicRouterInfo bool
 	MqList              []*MessageQueue
-	RouteData           *topicRouteData
+	RouteData           *TopicRouteData
 	TopicQueueIndex     int32
 }
 
@@ -76,55 +79,63 @@ func (info *TopicPublishInfo) fetchQueueIndex() int {
 	return int(qIndex) % length
 }
 
-func UpdateTopicRouteInfo(topic string) {
+func UpdateTopicRouteInfo(topic string) *TopicRouteData {
 	// Todo process lock timeout
 	lockNamesrv.Lock()
 	defer lockNamesrv.Unlock()
 
-	routeData, err := queryTopicRouteInfoFromServer(topic, requestTimeout)
+	routeData, err := queryTopicRouteInfoFromServer(topic)
 	if err != nil {
 		rlog.Warnf("query topic route from server error: %s", err)
-		return
+		return nil
 	}
 
 	if routeData == nil {
 		rlog.Warnf("queryTopicRouteInfoFromServer return nil, Topic: %s", topic)
-		return
+		return nil
 	}
 
-	var changed bool
 	oldRouteData, exist := routeDataMap.Load(topic)
-	if !exist || routeData == nil {
-		changed = true
-	} else {
-		changed = topicRouteDataIsChange(oldRouteData.(*topicRouteData), routeData)
-	}
-
-	if !changed {
-		changed = isNeedUpdateTopicRouteInfo(topic)
-	} else {
-		rlog.Infof("the topic[%s] route info changed, old[%v] ,new[%s]", topic, oldRouteData, routeData)
+	changed := true
+	if exist {
+		changed = topicRouteDataIsChange(oldRouteData.(*TopicRouteData), routeData)
 	}
 
-	if !changed {
-		return
+	if changed {
+		routeDataMap.Store(topic, routeData)
+		rlog.Infof("the topic [%s] route info changed, old %v ,new %s", topic,
+			oldRouteData, routeData.String())
+		for _, brokerData := range routeData.BrokerDataList {
+			brokerAddressesMap.Store(brokerData.BrokerName, brokerData)
+		}
 	}
 
-	newTopicRouteData := routeData.clone()
+	return routeData.clone()
+}
 
-	for _, brokerData := range newTopicRouteData.BrokerDataList {
-		brokerAddressesMap.Store(brokerData.BrokerName, brokerData.BrokerAddresses)
+func FindBrokerAddrByTopic(topic string) string {
+	v, exist := routeDataMap.Load(topic)
+	if !exist {
+		return ""
 	}
-
-	// update publish info
-	publishInfo := routeData2PublishInfo(topic, routeData)
-	publishInfo.HaveTopicRouterInfo = true
-
-	old, _ := publishInfoMap.Load(topic)
-	publishInfoMap.Store(topic, publishInfoMap)
-	if old != nil {
-		rlog.Infof("Old TopicPublishInfo [%s] removed.", old)
+	routeData := v.(*TopicRouteData)
+	if len(routeData.BrokerDataList) == 0 {
+		return ""
+	}
+	i := utils.AbsInt(rand.Int())
+	bd := routeData.BrokerDataList[i%len(routeData.BrokerDataList)]
+	addr := bd.BrokerAddresses[MasterId]
+	if addr == "" && len(bd.BrokerAddresses) > 0 {
+		i = i % len(bd.BrokerAddresses)
+		for _, v := range bd.BrokerAddresses {
+			if i <= 0 {
+				addr = v
+				break
+			}
+			i--
+		}
 	}
+	return addr
 }
 
 func FindBrokerAddressInPublish(brokerName string) string {
@@ -144,18 +155,20 @@ func FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBro
 		found      = false
 	)
 
-	addrs, exist := brokerAddressesMap.Load(brokerName)
+	v, exist := brokerAddressesMap.Load(brokerName)
 
-	if exist {
-		for k, v := range addrs.(map[int64]string) {
-			if v != "" {
-				found = true
-				if k != MasterId {
-					slave = true
-				}
-				brokerAddr = v
-				break
+	if !exist {
+		return nil
+	}
+	data := v.(*BrokerData)
+	for k, v := range data.BrokerAddresses {
+		if v != "" {
+			found = true
+			if k != MasterId {
+				slave = true
 			}
+			brokerAddr = v
+			break
 		}
 	}
 
@@ -172,7 +185,7 @@ func FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBro
 }
 
 func FetchSubscribeMessageQueues(topic string) ([]*MessageQueue, error) {
-	routeData, err := queryTopicRouteInfoFromServer(topic, 3*time.Second)
+	routeData, err := queryTopicRouteInfoFromServer(topic)
 
 	if err != nil {
 		return nil, err
@@ -190,14 +203,14 @@ func FetchSubscribeMessageQueues(topic string) ([]*MessageQueue, error) {
 	return mqs, nil
 }
 
-func findBrokerVersion(brokerName, brokerAddr string) int {
+func findBrokerVersion(brokerName, brokerAddr string) int32 {
 	versions, exist := brokerVersionMap.Load(brokerName)
 
 	if !exist {
 		return 0
 	}
 
-	v, exist := versions.(map[string]int)[brokerAddr]
+	v, exist := versions.(map[string]int32)[brokerAddr]
 
 	if exist {
 		return v
@@ -205,12 +218,12 @@ func findBrokerVersion(brokerName, brokerAddr string) int {
 	return 0
 }
 
-func queryTopicRouteInfoFromServer(topic string, timeout time.Duration) (*topicRouteData, error) {
+func queryTopicRouteInfoFromServer(topic string) (*TopicRouteData, error) {
 	request := &GetRouteInfoRequest{
 		Topic: topic,
 	}
 	rc := remote.NewRemotingCommand(ReqGetRouteInfoByTopic, request, nil)
-	response, err := remote.InvokeSync(getNameServerAddress(), rc, timeout)
+	response, err := remote.InvokeSync(getNameServerAddress(), rc, requestTimeout)
 
 	if err != nil {
 		return nil, err
@@ -221,11 +234,11 @@ func queryTopicRouteInfoFromServer(topic string, timeout time.Duration) (*topicR
 		if response.Body == nil {
 			return nil, errors.New(response.Remark)
 		}
-		routeData := &topicRouteData{}
+		routeData := &TopicRouteData{}
 
 		err = routeData.decode(string(response.Body))
 		if err != nil {
-			rlog.Warnf("decode topicRouteData error: %s", err)
+			rlog.Warnf("decode TopicRouteData error: %s", err)
 			return nil, err
 		}
 		return routeData, nil
@@ -236,7 +249,7 @@ func queryTopicRouteInfoFromServer(topic string, timeout time.Duration) (*topicR
 	}
 }
 
-func topicRouteDataIsChange(oldData *topicRouteData, newData *topicRouteData) bool {
+func topicRouteDataIsChange(oldData *TopicRouteData, newData *TopicRouteData) bool {
 	if oldData == nil || newData == nil {
 		return true
 	}
@@ -259,13 +272,7 @@ func topicRouteDataIsChange(oldData *topicRouteData, newData *topicRouteData) bo
 	return !oldDataCloned.equals(newDataCloned)
 }
 
-func isNeedUpdateTopicRouteInfo(topic string) bool {
-	value, exist := publishInfoMap.Load(topic)
-
-	return !exist || value.(*TopicPublishInfo).isOK()
-}
-
-func routeData2PublishInfo(topic string, data *topicRouteData) *TopicPublishInfo {
+func routeData2PublishInfo(topic string, data *TopicRouteData) *TopicPublishInfo {
 	publishInfo := &TopicPublishInfo{
 		RouteData:  data,
 		OrderTopic: false,
@@ -329,16 +336,20 @@ func getNameServerAddress() string {
 	return "127.0.0.1:9876"
 }
 
-// topicRouteData topicRouteData
-type topicRouteData struct {
+// TopicRouteData TopicRouteData
+type TopicRouteData struct {
 	OrderTopicConf string
 	QueueDataList  []*QueueData  `json:"queueDatas"`
 	BrokerDataList []*BrokerData `json:"brokerDatas"`
 }
 
-func (routeData *topicRouteData) decode(data string) error {
+func (routeData *TopicRouteData) decode(data string) error {
 	res := gjson.Parse(data)
-	json.Unmarshal([]byte(res.Get("queueDatas").String()), &routeData.QueueDataList)
+	err := json.Unmarshal([]byte(res.Get("queueDatas").String()), &routeData.QueueDataList)
+
+	if err != nil {
+		return err
+	}
 
 	bds := res.Get("brokerDatas").Array()
 	routeData.BrokerDataList = make([]*BrokerData, len(bds))
@@ -365,8 +376,8 @@ func (routeData *topicRouteData) decode(data string) error {
 	return nil
 }
 
-func (routeData *topicRouteData) clone() *topicRouteData {
-	cloned := &topicRouteData{
+func (routeData *TopicRouteData) clone() *TopicRouteData {
+	cloned := &TopicRouteData{
 		OrderTopicConf: routeData.OrderTopicConf,
 		QueueDataList:  make([]*QueueData, len(routeData.QueueDataList)),
 		BrokerDataList: make([]*BrokerData, len(routeData.BrokerDataList)),
@@ -383,10 +394,15 @@ func (routeData *topicRouteData) clone() *topicRouteData {
 	return cloned
 }
 
-func (routeData *topicRouteData) equals(data *topicRouteData) bool {
+func (routeData *TopicRouteData) equals(data *TopicRouteData) bool {
 	return false
 }
 
+func (routeData *TopicRouteData) String() string {
+	data, _ := json.Marshal(routeData)
+	return string(data)
+}
+
 // QueueData QueueData
 type QueueData struct {
 	BrokerName     string `json:"brokerName"`
diff --git a/kernel/validators.go b/kernel/validators.go
new file mode 100644
index 0000000..f34bd58
--- /dev/null
+++ b/kernel/validators.go
@@ -0,0 +1,46 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kernel
+
+import (
+	"github.com/apache/rocketmq-client-go/rlog"
+	"regexp"
+)
+
+const (
+	_ValidPattern       = "^[%|a-zA-Z0-9_-]+$"
+	_CharacterMaxLength = 255
+)
+
+var (
+	_Pattern, _ = regexp.Compile("_ValidPattern")
+)
+
+func ValidateGroup(group string) {
+	if group == "" {
+		rlog.Fatal("consumerGroup is empty")
+	}
+
+	//if !_Pattern.Match([]byte(group)) {
+	//	rlog.Fatalf("the specified group[%s] contains illegal characters, allowing only %s", group, _ValidPattern)
+	//}
+
+	if len(group) > _CharacterMaxLength {
+		rlog.Fatal("the specified group is longer than group max length 255.")
+	}
+}
diff --git a/remote/codes.go b/remote/codes.go
deleted file mode 100644
index b94d2e2..0000000
--- a/remote/codes.go
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package remote
-
-const (
-	SEND_MESSAGE                        = 10
-	PULL_MESSAGE                        = 11
-	QUERY_MESSAGE                       = 12
-	QUERY_BROKER_OFFSET                 = 13
-	QUERY_CONSUMER_OFFSET               = 14
-	UPDATE_CONSUMER_OFFSET              = 15
-	UPDATE_AND_CREATE_TOPIC             = 17
-	GET_ALL_TOPIC_CONFIG                = 21
-	GET_TOPIC_CONFIG_LIST               = 22
-	GET_TOPIC_NAME_LIST                 = 23
-	UPDATE_BROKER_CONFIG                = 25
-	GET_BROKER_CONFIG                   = 26
-	TRIGGER_DELETE_FILES                = 27
-	GET_BROKER_RUNTIME_INFO             = 28
-	SEARCH_OFFSET_BY_TIMESTAMP          = 29
-	GET_MAX_OFFSET                      = 30
-	GET_MIN_OFFSET                      = 31
-	GET_EARLIEST_MSG_STORETIME          = 32
-	VIEW_MESSAGE_BY_ID                  = 33
-	HEART_BEAT                          = 34
-	UNREGISTER_CLIENT                   = 35
-	CONSUMER_SEND_MSG_BACK              = 36
-	END_TRANSACTION                     = 37
-	GET_CONSUMER_LIST_BY_GROUP          = 38
-	CHECK_TRANSACTION_STATE             = 39
-	NOTIFY_CONSUMER_IDS_CHANGED         = 40
-	LOCK_BATCH_MQ                       = 41
-	UNLOCK_BATCH_MQ                     = 42
-	GET_ALL_CONSUMER_OFFSET             = 43
-	GET_ALL_DELAY_OFFSET                = 45
-	PUT_KV_CONFIG                       = 100
-	GET_KV_CONFIG                       = 101
-	DELETE_KV_CONFIG                    = 102
-	REGISTER_BROKER                     = 103
-	UNREGISTER_BROKER                   = 104
-	GET_ROUTEINTO_BY_TOPIC              = 105
-	GET_BROKER_CLUSTER_INFO             = 106
-	UPDATE_AND_CREATE_SUBSCRIPTIONGROUP = 200
-	GET_ALL_SUBSCRIPTIONGROUP_CONFIG    = 201
-	GET_TOPIC_STATS_INFO                = 202
-	GET_CONSUMER_CONNECTION_LIST        = 203
-	GET_PRODUCER_CONNECTION_LIST        = 204
-	WIPE_WRITE_PERM_OF_BROKER           = 205
-
-	GET_ALL_TOPIC_LIST_FROM_NAMESERVER = 206
-	DELETE_SUBSCRIPTIONGROUP           = 207
-	GET_CONSUME_STATS                  = 208
-	SUSPEND_CONSUMER                   = 209
-	RESUME_CONSUMER                    = 210
-	RESET_CONSUMER_OFFSET_IN_CONSUMER  = 211
-	RESET_CONSUMER_OFFSET_IN_BROKER    = 212
-	ADJUST_CONSUMER_THREAD_POOL        = 213
-	WHO_CONSUME_THE_MESSAGE            = 214
-
-	DELETE_TOPIC_IN_BROKER    = 215
-	DELETE_TOPIC_IN_NAMESRV   = 216
-	GET_KV_CONFIG_BY_VALUE    = 217
-	DELETE_KV_CONFIG_BY_VALUE = 218
-	GET_KVLIST_BY_NAMESPACE   = 219
-
-	RESET_CONSUMER_CLIENT_OFFSET         = 220
-	GET_CONSUMER_STATUS_FROM_CLIENT      = 221
-	INVOKE_BROKER_TO_RESET_OFFSET        = 222
-	INVOKE_BROKER_TO_GET_CONSUMER_STATUS = 223
-
-	QUERY_TOPIC_CONSUME_BY_WHO = 300
-
-	GET_TOPICS_BY_CLUSTER = 224
-
-	REGISTER_FILTER_SERVER            = 301
-	REGISTER_MESSAGE_FILTER_CLASS     = 302
-	QUERY_CONSUME_TIME_SPAN           = 303
-	GET_SYSTEM_TOPIC_LIST_FROM_NS     = 304
-	GET_SYSTEM_TOPIC_LIST_FROM_BROKER = 305
-
-	CLEAN_EXPIRED_CONSUMEQUEUE = 306
-
-	GET_CONSUMER_RUNNING_INFO = 307
-
-	QUERY_CORRECTION_OFFSET = 308
-
-	CONSUME_MESSAGE_DIRECTLY = 309
-
-	SEND_MESSAGE_V2 = 310
-
-	GET_UNIT_TOPIC_LIST                = 311
-	GET_HAS_UNIT_SUB_TOPIC_LIST        = 312
-	GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST = 313
-	CLONE_GROUP_OFFSET                 = 314
-
-	VIEW_BROKER_STATS_DATA = 315
-)
-
-const (
-	SUCCESS                       = 0
-	SYSTEM_ERROR                  = 1
-	SYSTEM_BUSY                   = 2
-	REQUEST_CODE_NOT_SUPPORTED    = 3
-	TRANSACTION_FAILED            = 4
-	FLUSH_DISK_TIMEOUT            = 10
-	SLAVE_NOT_AVAILABLE           = 11
-	FLUSH_SLAVE_TIMEOUT           = 12
-	MESSAGE_ILLEGAL               = 13
-	SERVICE_NOT_AVAILABLE         = 14
-	VERSION_NOT_SUPPORTED         = 15
-	NO_PERMISSION                 = 16
-	TOPIC_NOT_EXIST               = 17
-	TOPIC_EXIST_ALREADY           = 18
-	PULL_NOT_FOUND                = 19
-	PULL_RETRY_IMMEDIATELY        = 20
-	PULL_OFFSET_MOVED             = 21
-	QUERY_NOT_FOUND               = 22
-	SUBSCRIPTION_PARSE_FAILED     = 23
-	SUBSCRIPTION_NOT_EXIST        = 24
-	SUBSCRIPTION_NOT_LATEST       = 25
-	SUBSCRIPTION_GROUP_NOT_EXIST  = 26
-	TRANSACTION_SHOULD_COMMIT     = 200
-	TRANSACTION_SHOULD_ROLLBACK   = 201
-	TRANSACTION_STATE_UNKNOW      = 202
-	TRANSACTION_STATE_GROUP_WRONG = 203
-	NO_BUYER_ID                   = 204
-
-	NOT_IN_CURRENT_UNIT = 205
-
-	CONSUMER_NOT_ONLINE = 206
-
-	CONSUME_MSG_TIMEOUT = 207
-)
diff --git a/remote/client.go b/remote/remote_client.go
similarity index 88%
rename from remote/client.go
rename to remote/remote_client.go
index bfc4c36..9f5839a 100644
--- a/remote/client.go
+++ b/remote/remote_client.go
@@ -21,6 +21,7 @@ import (
 	"bytes"
 	"encoding/binary"
 	"errors"
+	"github.com/apache/rocketmq-client-go/rlog"
 	"io"
 	"net"
 	"sync"
@@ -164,22 +165,26 @@ func connect(addr string) (net.Conn, error) {
 
 func receiveResponse(r net.Conn) {
 	scanner := createScanner(r)
-	for scanner.Scan() {
+	for {
+		scanner.Scan()
 		receivedRemotingCommand, err := decode(scanner.Bytes())
 		if err != nil {
 			closeConnection(r)
+			rlog.Error(err.Error())
 			break
 		}
 		if receivedRemotingCommand.isResponseType() {
-			resp, ok := responseTable.Load(receivedRemotingCommand.Opaque)
-			if ok {
+			resp, exist := responseTable.Load(receivedRemotingCommand.Opaque)
+			if exist {
 				responseTable.Delete(receivedRemotingCommand.Opaque)
 				responseFuture := resp.(*ResponseFuture)
-				responseFuture.ResponseCommand = receivedRemotingCommand
-				responseFuture.executeInvokeCallback()
-				if responseFuture.Done != nil {
-					responseFuture.Done <- true
-				}
+				go func() {
+					responseFuture.ResponseCommand = receivedRemotingCommand
+					responseFuture.executeInvokeCallback()
+					if responseFuture.Done != nil {
+						responseFuture.Done <- true
+					}
+				}()
 			}
 		} else {
 			// todo handler request from peer
@@ -190,10 +195,20 @@ func receiveResponse(r net.Conn) {
 func createScanner(r io.Reader) *bufio.Scanner {
 	scanner := bufio.NewScanner(r)
 	scanner.Split(func(data []byte, atEOF bool) (int, []byte, error) {
+		defer func() {
+			if err := recover(); err != nil {
+				rlog.Errorf("panic: %v", err)
+			}
+		}()
 		if !atEOF {
 			if len(data) >= 4 {
 				var length int32
-				binary.Read(bytes.NewReader(data[0:4]), binary.BigEndian, &length)
+				err := binary.Read(bytes.NewReader(data[0:4]), binary.BigEndian, &length)
+				if err != nil {
+					rlog.Errorf("split data error: %s", err.Error())
+					return 0, nil, err
+				}
+
 				if int(length)+4 <= len(data) {
 					return int(length) + 4, data[4 : length+4], nil
 				}
diff --git a/remote/client_test.go b/remote/remote_client_test.go
similarity index 100%
rename from remote/client_test.go
rename to remote/remote_client_test.go
diff --git a/rlog/log.go b/rlog/log.go
index c73c9ba..4f5adaf 100644
--- a/rlog/log.go
+++ b/rlog/log.go
@@ -22,8 +22,6 @@ import (
 )
 
 type Logger interface {
-	Print(i ...interface{})
-	Printf(format string, args ...interface{})
 	Debug(i ...interface{})
 	Debugf(format string, args ...interface{})
 	Info(i ...interface{})
@@ -42,14 +40,6 @@ func SetLogger(log Logger) {
 	rLog = log
 }
 
-func Print(i ...interface{}) {
-	rLog.Print(i...)
-}
-
-func Printf(format string, args ...interface{}) {
-	rLog.Printf(format, args...)
-}
-
 func Debug(i ...interface{}) {
 	rLog.Debug(i...)
 }
diff --git a/utils/helper.go b/utils/helper.go
index 485ae15..1ad0441 100644
--- a/utils/helper.go
+++ b/utils/helper.go
@@ -22,7 +22,6 @@ import (
 	"encoding/binary"
 	"errors"
 	"fmt"
-	"hash/crc32"
 	"net"
 	"os"
 	"sync"
@@ -99,14 +98,10 @@ func ClassLoaderID() int32 {
 	return 0
 }
 
-// HashString hashes a string to a unique hashcode.
-func HashString(s string) int {
-	if s == "" {
-		return 0
-	}
-	return int(crc32.ChecksumIEEE([]byte(s)))
-}
-
 func UnCompress(data []byte) []byte {
 	return data
 }
+
+func IsArrayEmpty(i ...interface{}) bool {
+	return i == nil || len(i) == 0
+}
diff --git a/utils/helper_test.go b/utils/helper_test.go
index 4e1877c..967b0ae 100644
--- a/utils/helper_test.go
+++ b/utils/helper_test.go
@@ -16,7 +16,9 @@
  */
 package utils
 
-import "testing"
+import (
+	"testing"
+)
 
 func TestClassLoaderID(t *testing.T) {
 	if ClassLoaderID() != 0 {
diff --git a/utils/math.go b/utils/math.go
new file mode 100644
index 0000000..816631e
--- /dev/null
+++ b/utils/math.go
@@ -0,0 +1,32 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package utils
+
+func AbsInt(i int) int {
+	if i >= 0 {
+		return i
+	}
+	return -i
+}
+
+func MinInt(a, b int) int {
+	if a < b {
+		return a
+	}
+	return b
+}
diff --git a/utils/string.go b/utils/string.go
new file mode 100644
index 0000000..6a74808
--- /dev/null
+++ b/utils/string.go
@@ -0,0 +1,40 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package utils
+
+import "fmt"
+
+// HashString hashes a string using the Java String.hashCode algorithm.
+func HashString(s string) int {
+	val := []byte(s)
+	var h int
+
+	for idx := range val {
+		h = 31*h + int(val[idx])
+	}
+
+	return h
+}
+
+func StrJoin(str, key string, value interface{}) string {
+	if key == "" || value == "" {
+		return str
+	}
+
+	return str + key + ": " + fmt.Sprint(value) + ", "
+}

