rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From huzongt...@apache.org
Subject [rocketmq-client-go] branch native updated: Refactor directories & API (#82)
Date Tue, 02 Jul 2019 15:40:19 GMT
This is an automated email from the ASF dual-hosted git repository.

huzongtang pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 3011efa  Refactor directories & API (#82)
3011efa is described below

commit 3011efa4508e453f14e0ef83647aa0e429e1cb35
Author: wenfeng <sxian.wang@gmail.com>
AuthorDate: Tue Jul 2 23:40:15 2019 +0800

    Refactor directories & API (#82)
    
    * refactor directorie & apiss
    
    * fix ci failed
---
 api.go                                             |  54 ++++
 consumer/strategy.go                               | 130 ---------
 docs/Introduction.md                               |  12 +-
 examples/consumer/main.go                          |  16 +-
 examples/producer/main.go                          |   8 +-
 {consumer => internal/consumer}/consumer.go        | 301 ++++-----------------
 {consumer => internal/consumer}/consumer_test.go   |   0
 {consumer => internal/consumer}/offset_store.go    |  33 +--
 {consumer => internal/consumer}/process_queue.go   |  24 +-
 {consumer => internal/consumer}/pull_consumer.go   |  50 ++--
 .../consumer}/pull_consumer_test.go                |   0
 {consumer => internal/consumer}/push_consumer.go   | 103 +++----
 .../consumer}/push_consumer_test.go                |   0
 {consumer => internal/consumer}/statistics.go      |   0
 {kernel => internal/kernel}/client.go              |  83 +++---
 {kernel => internal/kernel}/client_test.go         |   0
 {kernel => internal/kernel}/constants.go           |   0
 internal/kernel/model.go                           |  80 ++++++
 {kernel => internal/kernel}/mq_version.go          |   0
 {kernel => internal/kernel}/perm.go                |   0
 {kernel => internal/kernel}/request.go             |   0
 {kernel => internal/kernel}/response.go            |   0
 {kernel => internal/kernel}/route.go               |  15 +-
 {kernel => internal/kernel}/route_test.go          |   0
 {kernel => internal/kernel}/transaction.go         |   0
 {kernel => internal/kernel}/validators.go          |   0
 {producer => internal/producer}/producer.go        |  29 +-
 {remote => internal/remote}/codec.go               |   0
 {remote => internal/remote}/codec_test.go          |   0
 {remote => internal/remote}/remote_client.go       |   0
 {remote => internal/remote}/remote_client_test.go  |   0
 {remote => internal/remote}/rpchook.go             |   0
 primitive/consume.go                               | 128 +++++++++
 {kernel => primitive}/message.go                   |  32 ++-
 primitive/options.go                               | 132 +++++++++
 kernel/model.go => primitive/result.go             |  86 +-----
 primitive/strategy.go                              | 117 ++++++++
 37 files changed, 760 insertions(+), 673 deletions(-)

diff --git a/api.go b/api.go
new file mode 100644
index 0000000..5043f95
--- /dev/null
+++ b/api.go
@@ -0,0 +1,54 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package rocketmq
+
+import (
+	"context"
+	"github.com/apache/rocketmq-client-go/primitive"
+)
+
+type Producer interface {
+	Start() error
+	Shutdown() error
+	SendSync(context.Context, ...*primitive.Message) (primitive.SendResult, error)
+	SendAsync(context.Context, func(primitive.SendResult), ...*primitive.Message) error
+	SendOneWay(context.Context, ...*primitive.Message) error
+}
+
+func NewProducer(opt primitive.ProducerOptions) (Producer, error) {
+	return nil, nil
+}
+
+type PushConsumer interface {
+	Start() error
+	Shutdown() error
+	Subscribe(topic string, selector primitive.MessageSelector,
+		f func(context.Context, ...*primitive.MessageExt) (primitive.ConsumeResult, error)) error
+	Unsubscribe(string) error
+}
+
+type PullConsumer interface {
+	Start() error
+	Shutdown() error
+	Pull(context.Context, string, primitive.MessageSelector, int) (primitive.PullResult, error)
+	PullFrom(context.Context, primitive.MessageQueue, int64, int) (primitive.PullResult, error)
+	// only update in memory
+	UpdateOffset(primitive.MessageQueue, int64) error
+	PersistOffset(context.Context) error
+	CurrentOffset(primitive.MessageQueue) int64
+}
diff --git a/consumer/strategy.go b/consumer/strategy.go
deleted file mode 100644
index 1cb6b2b..0000000
--- a/consumer/strategy.go
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one or more
-contributor license agreements.  See the NOTICE file distributed with
-this work for additional information regarding copyright ownership.
-The ASF licenses this file to You under the Apache License, Version 2.0
-(the "License"); you may not use this file except in compliance with
-the License.  You may obtain a copy of the License at
-
-    http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package consumer
-
-import (
-	"github.com/apache/rocketmq-client-go/kernel"
-	"github.com/apache/rocketmq-client-go/rlog"
-	"github.com/apache/rocketmq-client-go/utils"
-)
-
-// Strategy Algorithm for message allocating between consumers
-type AllocateStrategy string
-
-const (
-	// An allocate strategy proxy for based on machine room nearside priority. An actual allocate strategy can be
-	// specified.
-	//
-	// If any consumer is alive in a machine room, the message queue of the broker which is deployed in the same machine
-	// should only be allocated to those. Otherwise, those message queues can be shared along all consumers since there are
-	// no alive consumer to monopolize them.
-	StrategyMachineNearby = AllocateStrategy("MachineNearby")
-
-	// Average Hashing queue algorithm
-	StrategyAveragely = AllocateStrategy("Averagely")
-
-	// Cycle average Hashing queue algorithm
-	StrategyAveragelyCircle = AllocateStrategy("AveragelyCircle")
-
-	// Use Message Queue specified
-	StrategyConfig = AllocateStrategy("Config")
-
-	// Computer room Hashing queue algorithm, such as Alipay logic room
-	StrategyMachineRoom = AllocateStrategy("MachineRoom")
-
-	// Consistent Hashing queue algorithm
-	StrategyConsistentHash = AllocateStrategy("ConsistentHash")
-)
-
-func allocateByAveragely(consumerGroup, currentCID string, mqAll []*kernel.MessageQueue,
-	cidAll []string) []*kernel.MessageQueue {
-	if currentCID == "" || utils.IsArrayEmpty(mqAll) || utils.IsArrayEmpty(cidAll) {
-		return nil
-	}
-	var (
-		find  bool
-		index int
-	)
-
-	for idx := range cidAll {
-		if cidAll[idx] == currentCID {
-			find = true
-			index = idx
-			break
-		}
-	}
-	if !find {
-		rlog.Infof("[BUG] ConsumerGroup=%s, ConsumerId=%s not in cidAll:%+v", consumerGroup, currentCID, cidAll)
-		return nil
-	}
-
-	mqSize := len(mqAll)
-	cidSize := len(cidAll)
-	mod := mqSize % cidSize
-
-	var averageSize int
-	if mqSize <= cidSize {
-		averageSize = 1
-	} else {
-		if mod > 0 && index < mod {
-			averageSize = mqSize/cidSize + 1
-		} else {
-			averageSize = mqSize / cidSize
-		}
-	}
-
-	var startIndex int
-	if mod > 0 && index < mod {
-		startIndex = index * averageSize
-	} else {
-		startIndex = index*averageSize + mod
-	}
-
-	num := utils.MinInt(averageSize, mqSize-startIndex)
-	result := make([]*kernel.MessageQueue, num)
-	for i := 0; i < num; i++ {
-		result[i] = mqAll[(startIndex+i)%mqSize]
-	}
-	return result
-}
-
-// TODO
-func allocateByMachineNearby(consumerGroup, currentCID string, mqAll []*kernel.MessageQueue,
-	cidAll []string) []*kernel.MessageQueue {
-	return allocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
-}
-
-func allocateByAveragelyCircle(consumerGroup, currentCID string, mqAll []*kernel.MessageQueue,
-	cidAll []string) []*kernel.MessageQueue {
-	return allocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
-}
-
-func allocateByConfig(consumerGroup, currentCID string, mqAll []*kernel.MessageQueue,
-	cidAll []string) []*kernel.MessageQueue {
-	return allocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
-}
-
-func allocateByMachineRoom(consumerGroup, currentCID string, mqAll []*kernel.MessageQueue,
-	cidAll []string) []*kernel.MessageQueue {
-	return allocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
-}
-
-func allocateByConsistentHash(consumerGroup, currentCID string, mqAll []*kernel.MessageQueue,
-	cidAll []string) []*kernel.MessageQueue {
-	return allocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
-}
diff --git a/docs/Introduction.md b/docs/Introduction.md
index 9bc22c7..0db6e7b 100644
--- a/docs/Introduction.md
+++ b/docs/Introduction.md
@@ -20,8 +20,8 @@ rlog.SetLogger(Logger)
 Producer interface {
 	Start() error
 	Shutdown() error
-	SendSync(context.Context, *kernel.Message) (*kernel.SendResult, error)
-	SendOneWay(context.Context, *kernel.Message) error
+	SendSync(context.Context, *primitive.Message) (*kernel.SendResult, error)
+	SendOneWay(context.Context, *primitive.Message) error
 }
 ```
 
@@ -42,7 +42,7 @@ err := p.Start()
 
 - send message with sync
 ```go
-result, err := p.SendSync(context.Background(), &kernel.Message{
+result, err := p.SendSync(context.Background(), &primitive.Message{
     Topic: "test",
     Body:  []byte("Hello RocketMQ Go Client!"),
 })
@@ -52,7 +52,7 @@ result, err := p.SendSync(context.Background(), &kernel.Message{
 
 - or send message with oneway
 ```go 
-err := p.SendOneWay(context.Background(), &kernel.Message{
+err := p.SendOneWay(context.Background(), &primitive.Message{
     Topic: "test",
     Body:  []byte("Hello RocketMQ Go Client!"),
 })
@@ -68,7 +68,7 @@ PushConsumer interface {
 	Start() error
 	Shutdown()
 	Subscribe(topic string, selector MessageSelector,
-		f func(*ConsumeMessageContext, []*kernel.MessageExt) (ConsumeResult, error)) error
+		f func(*ConsumeMessageContext, []*primitive.MessageExt) (ConsumeResult, error)) error
 }
 ```
 
@@ -85,7 +85,7 @@ c := consumer.NewPushConsumer("testGroup", consumer.ConsumerOption{
 - Subscribe a topic(only support one topic now), and define your consuming function
 ```go
 err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx *consumer.ConsumeMessageContext,
-    msgs []*kernel.MessageExt) (consumer.ConsumeResult, error) {
+    msgs []*primitive.MessageExt) (consumer.ConsumeResult, error) {
     fmt.Println(msgs)
     return consumer.ConsumeSuccess, nil
 })
diff --git a/examples/consumer/main.go b/examples/consumer/main.go
index f660d43..65b0ba3 100644
--- a/examples/consumer/main.go
+++ b/examples/consumer/main.go
@@ -19,22 +19,22 @@ package main
 
 import (
 	"fmt"
-	"github.com/apache/rocketmq-client-go/consumer"
-	"github.com/apache/rocketmq-client-go/kernel"
+	"github.com/apache/rocketmq-client-go/internal/consumer"
+	"github.com/apache/rocketmq-client-go/primitive"
 	"os"
 	"time"
 )
 
 func main() {
-	c, _ := consumer.NewPushConsumer("testGroup", consumer.ConsumerOption{
+	c, _ := consumer.NewPushConsumer("testGroup", primitive.ConsumerOption{
 		NameServerAddr: "127.0.0.1:9876",
-		ConsumerModel:  consumer.Clustering,
-		FromWhere:      consumer.ConsumeFromFirstOffset,
+		ConsumerModel:  primitive.Clustering,
+		FromWhere:      primitive.ConsumeFromFirstOffset,
 	})
-	err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx *consumer.ConsumeMessageContext,
-		msgs []*kernel.MessageExt) (consumer.ConsumeResult, error) {
+	err := c.Subscribe("test", primitive.MessageSelector{}, func(ctx *consumer.ConsumeMessageContext,
+		msgs []*primitive.MessageExt) (primitive.ConsumeResult, error) {
 		fmt.Println(msgs)
-		return consumer.ConsumeSuccess, nil
+		return primitive.ConsumeSuccess, nil
 	})
 	if err != nil {
 		fmt.Println(err.Error())
diff --git a/examples/producer/main.go b/examples/producer/main.go
index 9cc626b..c3e338d 100644
--- a/examples/producer/main.go
+++ b/examples/producer/main.go
@@ -20,13 +20,13 @@ package main
 import (
 	"context"
 	"fmt"
-	"github.com/apache/rocketmq-client-go/kernel"
-	"github.com/apache/rocketmq-client-go/producer"
+	"github.com/apache/rocketmq-client-go/internal/producer"
+	"github.com/apache/rocketmq-client-go/primitive"
 	"os"
 )
 
 func main() {
-	opt := producer.ProducerOptions{
+	opt := primitive.ProducerOptions{
 		NameServerAddr:           "127.0.0.1:9876",
 		RetryTimesWhenSendFailed: 2,
 	}
@@ -37,7 +37,7 @@ func main() {
 		os.Exit(1)
 	}
 	for i := 0; i < 1000; i++ {
-		res, err := p.SendSync(context.Background(), &kernel.Message{
+		res, err := p.SendSync(context.Background(), &primitive.Message{
 			Topic: "test",
 			Body:  []byte("Hello RocketMQ Go Client!"),
 		})
diff --git a/consumer/consumer.go b/internal/consumer/consumer.go
similarity index 66%
rename from consumer/consumer.go
rename to internal/consumer/consumer.go
index 80e4fb5..08ae664 100644
--- a/consumer/consumer.go
+++ b/internal/consumer/consumer.go
@@ -20,8 +20,9 @@ package consumer
 import (
 	"encoding/json"
 	"fmt"
-	"github.com/apache/rocketmq-client-go/kernel"
-	"github.com/apache/rocketmq-client-go/remote"
+	"github.com/apache/rocketmq-client-go/internal/kernel"
+	"github.com/apache/rocketmq-client-go/internal/remote"
+	"github.com/apache/rocketmq-client-go/primitive"
 	"github.com/apache/rocketmq-client-go/rlog"
 	"github.com/apache/rocketmq-client-go/utils"
 	"github.com/tidwall/gjson"
@@ -53,135 +54,18 @@ const (
 	_PersistConsumerOffsetInterval = 5 * time.Second
 )
 
-// Message model defines the way how messages are delivered to each consumer clients.
-// </p>
-//
-// RocketMQ supports two message models: clustering and broadcasting. If clustering is set, consumer clients with
-// the same {@link #consumerGroup} would only consume shards of the messages subscribed, which achieves load
-// balances; Conversely, if the broadcasting is set, each consumer client will consume all subscribed messages
-// separately.
-// </p>
-//
-// This field defaults to clustering.
-type MessageModel int
-
-const (
-	BroadCasting MessageModel = iota
-	Clustering
-)
-
-func (mode MessageModel) String() string {
-	switch mode {
-	case BroadCasting:
-		return "BroadCasting"
-	case Clustering:
-		return "Clustering"
-	default:
-		return "Unknown"
-	}
-}
-
-// Consuming point on consumer booting.
-// </p>
-//
-// There are three consuming points:
-// <ul>
-// <li>
-// <code>CONSUME_FROM_LAST_OFFSET</code>: consumer clients pick up where it stopped previously.
-// If it were a newly booting up consumer client, according aging of the consumer group, there are two
-// cases:
-// <ol>
-// <li>
-// if the consumer group is created so recently that the earliest message being subscribed has yet
-// expired, which means the consumer group represents a lately launched business, consuming will
-// start from the very beginning;
-// </li>
-// <li>
-// if the earliest message being subscribed has expired, consuming will start from the latest
-// messages, meaning messages born prior to the booting timestamp would be ignored.
-// </li>
-// </ol>
-// </li>
-// <li>
-// <code>CONSUME_FROM_FIRST_OFFSET</code>: Consumer client will start from earliest messages available.
-// </li>
-// <li>
-// <code>CONSUME_FROM_TIMESTAMP</code>: Consumer client will start from specified timestamp, which means
-// messages born prior to {@link #consumeTimestamp} will be ignored
-// </li>
-// </ul>
-type ConsumeFromWhere int
-
-const (
-	ConsumeFromLastOffset ConsumeFromWhere = iota
-	ConsumeFromFirstOffset
-	ConsumeFromTimestamp
-)
-
 type ConsumeType string
 
 const (
 	_PullConsume = ConsumeType("pull")
 	_PushConsume = ConsumeType("push")
-)
-
-type ExpressionType string
-
-const (
-	/**
-	 * <ul>
-	 * Keywords:
-	 * <li>{@code AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS, NULL}</li>
-	 * </ul>
-	 * <p/>
-	 * <ul>
-	 * Data type:
-	 * <li>Boolean, like: TRUE, FALSE</li>
-	 * <li>String, like: 'abc'</li>
-	 * <li>Decimal, like: 123</li>
-	 * <li>Float number, like: 3.1415</li>
-	 * </ul>
-	 * <p/>
-	 * <ul>
-	 * Grammar:
-	 * <li>{@code AND, OR}</li>
-	 * <li>{@code >, >=, <, <=, =}</li>
-	 * <li>{@code BETWEEN A AND B}, equals to {@code >=A AND <=B}</li>
-	 * <li>{@code NOT BETWEEN A AND B}, equals to {@code >B OR <A}</li>
-	 * <li>{@code IN ('a', 'b')}, equals to {@code ='a' OR ='b'}, this operation only support String type.</li>
-	 * <li>{@code IS NULL}, {@code IS NOT NULL}, check parameter whether is null, or not.</li>
-	 * <li>{@code =TRUE}, {@code =FALSE}, check parameter whether is true, or false.</li>
-	 * </ul>
-	 * <p/>
-	 * <p>
-	 * Example:
-	 * (a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE)
-	 * </p>
-	 */
-	SQL92 = ExpressionType("SQL92")
-
-	/**
-	 * Only support or operation such as
-	 * "tag1 || tag2 || tag3", <br>
-	 * If null or * expression, meaning subscribe all.
-	 */
-	TAG = ExpressionType("TAG")
-)
-
-func IsTagType(exp string) bool {
-	if exp == "" || exp == "TAG" {
-		return true
-	}
-	return false
-}
 
-const (
 	_SubAll = "*"
 )
 
 type PullRequest struct {
 	consumerGroup string
-	mq            *kernel.MessageQueue
+	mq            *primitive.MessageQueue
 	pq            *processQueue
 	nextOffset    int64
 	lockedFirst   bool
@@ -192,83 +76,6 @@ func (pr *PullRequest) String() string {
 		pr.consumerGroup, pr.mq.Topic, pr.mq.QueueId)
 }
 
-type ConsumerOption struct {
-	kernel.ClientOption
-	NameServerAddr string
-
-	/**
-	 * Backtracking consumption time with second precision. Time format is
-	 * 20131223171201<br>
-	 * Implying Seventeen twelve and 01 seconds on December 23, 2013 year<br>
-	 * Default backtracking consumption time Half an hour ago.
-	 */
-	ConsumeTimestamp string
-
-	// The socket timeout in milliseconds
-	ConsumerPullTimeout time.Duration
-
-	// Concurrently max span offset.it has no effect on sequential consumption
-	ConsumeConcurrentlyMaxSpan int
-
-	// Flow control threshold on queue level, each message queue will cache at most 1000 messages by default,
-	// Consider the {PullBatchSize}, the instantaneous value may exceed the limit
-	PullThresholdForQueue int64
-
-	// Limit the cached message size on queue level, each message queue will cache at most 100 MiB messages by default,
-	// Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
-	//
-	// The size of a message only measured by message body, so it's not accurate
-	PullThresholdSizeForQueue int
-
-	// Flow control threshold on topic level, default value is -1(Unlimited)
-	//
-	// The value of {@code pullThresholdForQueue} will be overwrote and calculated based on
-	// {@code pullThresholdForTopic} if it is't unlimited
-	//
-	// For example, if the value of pullThresholdForTopic is 1000 and 10 message queues are assigned to this consumer,
-	// then pullThresholdForQueue will be set to 100
-	PullThresholdForTopic int
-
-	// Limit the cached message size on topic level, default value is -1 MiB(Unlimited)
-	//
-	// The value of {@code pullThresholdSizeForQueue} will be overwrote and calculated based on
-	// {@code pullThresholdSizeForTopic} if it is't unlimited
-	//
-	// For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10 message queues are
-	// assigned to this consumer, then pullThresholdSizeForQueue will be set to 100 MiB
-	PullThresholdSizeForTopic int
-
-	// Message pull Interval
-	PullInterval time.Duration
-
-	// Batch consumption size
-	ConsumeMessageBatchMaxSize int
-
-	// Batch pull size
-	PullBatchSize int32
-
-	// Whether update subscription relationship when every pull
-	PostSubscriptionWhenPull bool
-
-	// Max re-consume times. -1 means 16 times.
-	//
-	// If messages are re-consumed more than {@link #maxReconsumeTimes} before success, it's be directed to a deletion
-	// queue waiting.
-	MaxReconsumeTimes int
-
-	// Suspending pulling time for cases requiring slow pulling like flow-control scenario.
-	SuspendCurrentQueueTimeMillis time.Duration
-
-	// Maximum amount of time a message may block the consuming thread.
-	ConsumeTimeout time.Duration
-
-	ConsumerModel  MessageModel
-	Strategy       AllocateStrategy
-	ConsumeOrderly bool
-	FromWhere      ConsumeFromWhere
-	// TODO traceDispatcher
-}
-
 // TODO hook
 type defaultConsumer struct {
 	/**
@@ -279,25 +86,25 @@ type defaultConsumer struct {
 	 * See <a href="http://rocketmq.apache.org/docs/core-concept/">here</a> for further discussion.
 	 */
 	consumerGroup  string
-	model          MessageModel
-	allocate       func(string, string, []*kernel.MessageQueue, []string) []*kernel.MessageQueue
+	model          primitive.MessageModel
+	allocate       func(string, string, []*primitive.MessageQueue, []string) []*primitive.MessageQueue
 	unitMode       bool
 	consumeOrderly bool
-	fromWhere      ConsumeFromWhere
+	fromWhere      primitive.ConsumeFromWhere
 
 	cType     ConsumeType
 	client    *kernel.RMQClient
-	mqChanged func(topic string, mqAll, mqDivided []*kernel.MessageQueue)
+	mqChanged func(topic string, mqAll, mqDivided []*primitive.MessageQueue)
 	state     kernel.ServiceState
 	pause     bool
 	once      sync.Once
-	option    ConsumerOption
-	// key: int, hash(*kernel.MessageQueue)
+	option    primitive.ConsumerOption
+	// key: int, hash(*primitive.MessageQueue)
 	// value: *processQueue
 	processQueueTable sync.Map
 
 	// key: topic(string)
-	// value: map[int]*kernel.MessageQueue
+	// value: map[int]*primitive.MessageQueue
 	topicSubscribeInfoTable sync.Map
 
 	// key: topic
@@ -314,19 +121,19 @@ func (dc *defaultConsumer) persistConsumerOffset() {
 		rlog.Errorf("consumer state error: %s", err.Error())
 		return
 	}
-	mqs := make([]*kernel.MessageQueue, 0)
+	mqs := make([]*primitive.MessageQueue, 0)
 	dc.processQueueTable.Range(func(key, value interface{}) bool {
-		mqs = append(mqs, key.(*kernel.MessageQueue))
+		mqs = append(mqs, key.(*primitive.MessageQueue))
 		return true
 	})
 	dc.storage.persist(mqs)
 }
 
-func (dc *defaultConsumer) updateTopicSubscribeInfo(topic string, mqs []*kernel.MessageQueue) {
+func (dc *defaultConsumer) updateTopicSubscribeInfo(topic string, mqs []*primitive.MessageQueue) {
 	_, exist := dc.subscriptionDataTable.Load(topic)
 	// does subscribe, if true, replace it
 	if exist {
-		mqSet := make(map[int]*kernel.MessageQueue, 0)
+		mqSet := make(map[int]*primitive.MessageQueue, 0)
 		for idx := range mqs {
 			mq := mqs[idx]
 			mqSet[mq.HashCode()] = mq
@@ -355,23 +162,23 @@ func (dc *defaultConsumer) doBalance() {
 			rlog.Warnf("do balance of group: %s, but topic: %s does not exist.", dc.consumerGroup, topic)
 			return true
 		}
-		mqs := v.([]*kernel.MessageQueue)
+		mqs := v.([]*primitive.MessageQueue)
 		switch dc.model {
-		case BroadCasting:
+		case primitive.BroadCasting:
 			changed := dc.updateProcessQueueTable(topic, mqs)
 			if changed {
 				dc.mqChanged(topic, mqs, mqs)
 				rlog.Infof("messageQueueChanged, Group: %s, Topic: %s, MessageQueues: %v",
 					dc.consumerGroup, topic, mqs)
 			}
-		case Clustering:
+		case primitive.Clustering:
 			cidAll := dc.findConsumerList(topic)
 			if cidAll == nil {
 				rlog.Warnf("do balance for Group: %s, Topic: %s get consumer id list failed",
 					dc.consumerGroup, topic)
 				return true
 			}
-			mqAll := make([]*kernel.MessageQueue, len(mqs))
+			mqAll := make([]*primitive.MessageQueue, len(mqs))
 			copy(mqAll, mqs)
 			sort.Strings(cidAll)
 			sort.SliceStable(mqAll, func(i, j int) bool {
@@ -390,9 +197,9 @@ func (dc *defaultConsumer) doBalance() {
 			changed := dc.updateProcessQueueTable(topic, allocateResult)
 			if changed {
 				dc.mqChanged(topic, mqAll, allocateResult)
-				rlog.Infof("do balance result changed, allocateMessageQueueStrategyName=%s, group=%s, "+
+				rlog.Infof("do balance result changed, group=%s, "+
 					"topic=%s, clientId=%s, mqAllSize=%d, cidAllSize=%d, rebalanceResultSize=%d, "+
-					"rebalanceResultSet=%v", string(dc.option.Strategy), dc.consumerGroup, topic, dc.client.ClientID(), len(mqAll),
+					"rebalanceResultSet=%v", dc.consumerGroup, topic, dc.client.ClientID(), len(mqAll),
 					len(cidAll), len(allocateResult), allocateResult)
 
 			}
@@ -418,12 +225,12 @@ func (dc *defaultConsumer) makeSureStateOK() error {
 }
 
 type lockBatchRequestBody struct {
-	ConsumerGroup string                 `json:"consumerGroup"`
-	ClientId      string                 `json:"clientId"`
-	MQs           []*kernel.MessageQueue `json:"mqSet"`
+	ConsumerGroup string                    `json:"consumerGroup"`
+	ClientId      string                    `json:"clientId"`
+	MQs           []*primitive.MessageQueue `json:"mqSet"`
 }
 
-func (dc *defaultConsumer) lock(mq *kernel.MessageQueue) bool {
+func (dc *defaultConsumer) lock(mq *primitive.MessageQueue) bool {
 	brokerResult := kernel.FindBrokerAddressInSubscribe(mq.BrokerName, kernel.MasterId, true)
 
 	if brokerResult == nil {
@@ -433,7 +240,7 @@ func (dc *defaultConsumer) lock(mq *kernel.MessageQueue) bool {
 	body := &lockBatchRequestBody{
 		ConsumerGroup: dc.consumerGroup,
 		ClientId:      dc.client.ClientID(),
-		MQs:           []*kernel.MessageQueue{mq},
+		MQs:           []*primitive.MessageQueue{mq},
 	}
 	lockedMQ := dc.doLock(brokerResult.BrokerAddr, body)
 	var lockOK bool
@@ -453,7 +260,7 @@ func (dc *defaultConsumer) lock(mq *kernel.MessageQueue) bool {
 	return lockOK
 }
 
-func (dc *defaultConsumer) unlock(mq *kernel.MessageQueue, oneway bool) {
+func (dc *defaultConsumer) unlock(mq *primitive.MessageQueue, oneway bool) {
 	brokerResult := kernel.FindBrokerAddressInSubscribe(mq.BrokerName, kernel.MasterId, true)
 
 	if brokerResult == nil {
@@ -463,14 +270,14 @@ func (dc *defaultConsumer) unlock(mq *kernel.MessageQueue, oneway bool) {
 	body := &lockBatchRequestBody{
 		ConsumerGroup: dc.consumerGroup,
 		ClientId:      dc.client.ClientID(),
-		MQs:           []*kernel.MessageQueue{mq},
+		MQs:           []*primitive.MessageQueue{mq},
 	}
 	dc.doUnlock(brokerResult.BrokerAddr, body, oneway)
 	rlog.Warnf("unlock messageQueue. group:%s, clientId:%s, mq:%s",
 		dc.consumerGroup, dc.client.ClientID(), mq.String())
 }
 
-func (dc *defaultConsumer) lockAll(mq kernel.MessageQueue) {
+func (dc *defaultConsumer) lockAll(mq primitive.MessageQueue) {
 	mqMapSet := dc.buildProcessQueueTableByBrokerName()
 	for broker, mqs := range mqMapSet {
 		if len(mqs) == 0 {
@@ -539,7 +346,7 @@ func (dc *defaultConsumer) unlockAll(oneway bool) {
 	}
 }
 
-func (dc *defaultConsumer) doLock(addr string, body *lockBatchRequestBody) []kernel.MessageQueue {
+func (dc *defaultConsumer) doLock(addr string, body *lockBatchRequestBody) []primitive.MessageQueue {
 	data, _ := json.Marshal(body)
 	request := remote.NewRemotingCommand(kernel.ReqLockBatchMQ, nil, data)
 	response, err := dc.client.InvokeSync(addr, request, 1*time.Second)
@@ -548,7 +355,7 @@ func (dc *defaultConsumer) doLock(addr string, body *lockBatchRequestBody) []ker
 		return nil
 	}
 	lockOKMQSet := struct {
-		MQs []kernel.MessageQueue `json:"lockOKMQSet"`
+		MQs []primitive.MessageQueue `json:"lockOKMQSet"`
 	}{}
 	err = json.Unmarshal(response.Body, &lockOKMQSet)
 	if err != nil {
@@ -577,14 +384,14 @@ func (dc *defaultConsumer) doUnlock(addr string, body *lockBatchRequestBody, one
 	}
 }
 
-func (dc *defaultConsumer) buildProcessQueueTableByBrokerName() map[string][]*kernel.MessageQueue {
-	result := make(map[string][]*kernel.MessageQueue, 0)
+func (dc *defaultConsumer) buildProcessQueueTableByBrokerName() map[string][]*primitive.MessageQueue {
+	result := make(map[string][]*primitive.MessageQueue, 0)
 
 	dc.processQueueTable.Range(func(key, value interface{}) bool {
-		mq := key.(*kernel.MessageQueue)
+		mq := key.(*primitive.MessageQueue)
 		mqs, exist := result[mq.BrokerName]
 		if !exist {
-			mqs = make([]*kernel.MessageQueue, 0)
+			mqs = make([]*primitive.MessageQueue, 0)
 		}
 		mqs = append(mqs, mq)
 		result[mq.BrokerName] = mqs
@@ -595,15 +402,15 @@ func (dc *defaultConsumer) buildProcessQueueTableByBrokerName() map[string][]*ke
 }
 
 // TODO 问题不少 需要再好好对一下
-func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*kernel.MessageQueue) bool {
+func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*primitive.MessageQueue) bool {
 	var changed bool
-	mqSet := make(map[*kernel.MessageQueue]bool)
+	mqSet := make(map[*primitive.MessageQueue]bool)
 	for idx := range mqs {
 		mqSet[mqs[idx]] = true
 	}
 	// TODO
 	dc.processQueueTable.Range(func(key, value interface{}) bool {
-		mq := key.(*kernel.MessageQueue)
+		mq := key.(*primitive.MessageQueue)
 		pq := value.(*processQueue)
 		if mq.Topic == topic {
 			if !mqSet[mq] {
@@ -666,13 +473,13 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*kernel.M
 	return changed
 }
 
-func (dc *defaultConsumer) removeUnnecessaryMessageQueue(mq *kernel.MessageQueue, pq *processQueue) bool {
-	dc.storage.persist([]*kernel.MessageQueue{mq})
+func (dc *defaultConsumer) removeUnnecessaryMessageQueue(mq *primitive.MessageQueue, pq *processQueue) bool {
+	dc.storage.persist([]*primitive.MessageQueue{mq})
 	dc.storage.remove(mq)
 	return true
 }
 
-func (dc *defaultConsumer) computePullFromWhere(mq *kernel.MessageQueue) int64 {
+func (dc *defaultConsumer) computePullFromWhere(mq *primitive.MessageQueue) int64 {
 	if dc.cType == _PullConsume {
 		return 0
 	}
@@ -682,7 +489,7 @@ func (dc *defaultConsumer) computePullFromWhere(mq *kernel.MessageQueue) int64 {
 		result = lastOffset
 	} else {
 		switch dc.fromWhere {
-		case ConsumeFromLastOffset:
+		case primitive.ConsumeFromLastOffset:
 			if lastOffset == -1 {
 				if strings.HasPrefix(mq.Topic, kernel.RetryGroupTopicPrefix) {
 					lastOffset = 0
@@ -697,11 +504,11 @@ func (dc *defaultConsumer) computePullFromWhere(mq *kernel.MessageQueue) int64 {
 			} else {
 				result = -1
 			}
-		case ConsumeFromFirstOffset:
+		case primitive.ConsumeFromFirstOffset:
 			if lastOffset == -1 {
 				result = 0
 			}
-		case ConsumeFromTimestamp:
+		case primitive.ConsumeFromTimestamp:
 			if lastOffset == -1 {
 				if strings.HasPrefix(mq.Topic, kernel.RetryGroupTopicPrefix) {
 					lastOffset, err := dc.queryMaxOffset(mq)
@@ -760,12 +567,12 @@ func (dc *defaultConsumer) findConsumerList(topic string) []string {
 	return nil
 }
 
-func (dc *defaultConsumer) sendBack(msg *kernel.MessageExt, level int) error {
+func (dc *defaultConsumer) sendBack(msg *primitive.MessageExt, level int) error {
 	return nil
 }
 
 // QueryMaxOffset with specific queueId and topic
-func (dc *defaultConsumer) queryMaxOffset(mq *kernel.MessageQueue) (int64, error) {
+func (dc *defaultConsumer) queryMaxOffset(mq *primitive.MessageQueue) (int64, error) {
 	brokerAddr := kernel.FindBrokerAddrByName(mq.BrokerName)
 	if brokerAddr == "" {
 		kernel.UpdateTopicRouteInfo(mq.Topic)
@@ -790,7 +597,7 @@ func (dc *defaultConsumer) queryMaxOffset(mq *kernel.MessageQueue) (int64, error
 }
 
 // SearchOffsetByTimestamp with specific queueId and topic
-func (dc *defaultConsumer) searchOffsetByTimestamp(mq *kernel.MessageQueue, timestamp int64) (int64, error) {
+func (dc *defaultConsumer) searchOffsetByTimestamp(mq *primitive.MessageQueue, timestamp int64) (int64, error) {
 	brokerAddr := kernel.FindBrokerAddrByName(mq.BrokerName)
 	if brokerAddr == "" {
 		kernel.UpdateTopicRouteInfo(mq.Topic)
@@ -815,19 +622,19 @@ func (dc *defaultConsumer) searchOffsetByTimestamp(mq *kernel.MessageQueue, time
 	return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
 }
 
-func buildSubscriptionData(topic string, selector MessageSelector) *kernel.SubscriptionData {
+func buildSubscriptionData(topic string, selector primitive.MessageSelector) *kernel.SubscriptionData {
 	subData := &kernel.SubscriptionData{
 		Topic:     topic,
 		SubString: selector.Expression,
 		ExpType:   string(selector.Type),
 	}
 
-	if selector.Type != "" && selector.Type != TAG {
+	if selector.Type != "" && selector.Type != primitive.TAG {
 		return subData
 	}
 
 	if selector.Expression == "" || selector.Expression == _SubAll {
-		subData.ExpType = string(TAG)
+		subData.ExpType = string(primitive.TAG)
 		subData.SubString = _SubAll
 	} else {
 		tags := strings.Split(selector.Expression, "\\|\\|")
@@ -847,7 +654,7 @@ func buildSubscriptionData(topic string, selector MessageSelector) *kernel.Subsc
 	return subData
 }
 
-func getNextQueueOf(topic string) *kernel.MessageQueue {
+func getNextQueueOf(topic string) *primitive.MessageQueue {
 	queues, err := kernel.FetchSubscribeMessageQueues(topic)
 	if err != nil && len(queues) > 0 {
 		rlog.Error(err.Error())
@@ -890,7 +697,7 @@ func clearCommitOffsetFlag(sysFlag int32) int32 {
 	return sysFlag & (^0x1 << 0)
 }
 
-func tryFindBroker(mq *kernel.MessageQueue) *kernel.FindBrokerResult {
+func tryFindBroker(mq *primitive.MessageQueue) *kernel.FindBrokerResult {
 	result := kernel.FindBrokerAddressInSubscribe(mq.BrokerName, recalculatePullFromWhichNode(mq), false)
 
 	if result == nil {
@@ -903,11 +710,11 @@ var (
 	pullFromWhichNodeTable sync.Map
 )
 
-func updatePullFromWhichNode(mq *kernel.MessageQueue, brokerId int64) {
+func updatePullFromWhichNode(mq *primitive.MessageQueue, brokerId int64) {
 	pullFromWhichNodeTable.Store(mq.HashCode(), brokerId)
 }
 
-func recalculatePullFromWhichNode(mq *kernel.MessageQueue) int64 {
+func recalculatePullFromWhichNode(mq *primitive.MessageQueue) int64 {
 	v, exist := pullFromWhichNodeTable.Load(mq.HashCode())
 	if exist {
 		return v.(int64)
diff --git a/consumer/consumer_test.go b/internal/consumer/consumer_test.go
similarity index 100%
rename from consumer/consumer_test.go
rename to internal/consumer/consumer_test.go
diff --git a/consumer/offset_store.go b/internal/consumer/offset_store.go
similarity index 86%
rename from consumer/offset_store.go
rename to internal/consumer/offset_store.go
index 0edf828..702ba0d 100644
--- a/consumer/offset_store.go
+++ b/internal/consumer/offset_store.go
@@ -20,8 +20,9 @@ package consumer
 import (
 	"encoding/json"
 	"fmt"
-	"github.com/apache/rocketmq-client-go/kernel"
-	"github.com/apache/rocketmq-client-go/remote"
+	"github.com/apache/rocketmq-client-go/internal/kernel"
+	"github.com/apache/rocketmq-client-go/internal/remote"
+	"github.com/apache/rocketmq-client-go/primitive"
 	"github.com/apache/rocketmq-client-go/rlog"
 	"github.com/apache/rocketmq-client-go/utils"
 	"os"
@@ -50,10 +51,10 @@ func init() {
 }
 
 type OffsetStore interface {
-	persist(mqs []*kernel.MessageQueue)
-	remove(mq *kernel.MessageQueue)
-	read(mq *kernel.MessageQueue, t readType) int64
-	update(mq *kernel.MessageQueue, offset int64, increaseOnly bool)
+	persist(mqs []*primitive.MessageQueue)
+	remove(mq *primitive.MessageQueue)
+	read(mq *primitive.MessageQueue, t readType) int64
+	update(mq *primitive.MessageQueue, offset int64, increaseOnly bool)
 }
 
 type localFileOffsetStore struct {
@@ -108,7 +109,7 @@ func (local *localFileOffsetStore) load() {
 	}
 }
 
-func (local *localFileOffsetStore) read(mq *kernel.MessageQueue, t readType) int64 {
+func (local *localFileOffsetStore) read(mq *primitive.MessageQueue, t readType) int64 {
 	if t == _ReadFromMemory || t == _ReadMemoryThenStore {
 		off := readFromMemory(local.OffsetTable, mq)
 		if off >= 0 || (off == -1 && t == _ReadFromMemory) {
@@ -119,7 +120,7 @@ func (local *localFileOffsetStore) read(mq *kernel.MessageQueue, t readType) int
 	return readFromMemory(local.OffsetTable, mq)
 }
 
-func (local *localFileOffsetStore) update(mq *kernel.MessageQueue, offset int64, increaseOnly bool) {
+func (local *localFileOffsetStore) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) {
 	rlog.Debugf("update offset: %s to %d", mq, offset)
 	localOffset, exist := local.OffsetTable[mq.Topic]
 	if !exist {
@@ -143,7 +144,7 @@ func (local *localFileOffsetStore) update(mq *kernel.MessageQueue, offset int64,
 	}
 }
 
-func (local *localFileOffsetStore) persist(mqs []*kernel.MessageQueue) {
+func (local *localFileOffsetStore) persist(mqs []*primitive.MessageQueue) {
 	if len(mqs) == 0 {
 		return
 	}
@@ -171,7 +172,7 @@ func (local *localFileOffsetStore) persist(mqs []*kernel.MessageQueue) {
 	utils.CheckError(fmt.Sprintf("persist offset to %s", local.path), utils.WriteToFile(local.path, data))
 }
 
-func (local *localFileOffsetStore) remove(mq *kernel.MessageQueue) {
+func (local *localFileOffsetStore) remove(mq *primitive.MessageQueue) {
 	// nothing to do
 }
 
@@ -190,7 +191,7 @@ func NewRemoteOffsetStore(group string, client *kernel.RMQClient) OffsetStore {
 	}
 }
 
-func (r *remoteBrokerOffsetStore) persist(mqs []*kernel.MessageQueue) {
+func (r *remoteBrokerOffsetStore) persist(mqs []*primitive.MessageQueue) {
 	r.mutex.Lock()
 	defer r.mutex.Unlock()
 	if len(mqs) == 0 {
@@ -217,7 +218,7 @@ func (r *remoteBrokerOffsetStore) persist(mqs []*kernel.MessageQueue) {
 	}
 }
 
-func (r *remoteBrokerOffsetStore) remove(mq *kernel.MessageQueue) {
+func (r *remoteBrokerOffsetStore) remove(mq *primitive.MessageQueue) {
 	r.mutex.Lock()
 	defer r.mutex.Unlock()
 	if mq == nil {
@@ -231,7 +232,7 @@ func (r *remoteBrokerOffsetStore) remove(mq *kernel.MessageQueue) {
 	delete(offset, mq.QueueId)
 }
 
-func (r *remoteBrokerOffsetStore) read(mq *kernel.MessageQueue, t readType) int64 {
+func (r *remoteBrokerOffsetStore) read(mq *primitive.MessageQueue, t readType) int64 {
 	r.mutex.RLock()
 	if t == _ReadFromMemory || t == _ReadMemoryThenStore {
 		off := readFromMemory(r.OffsetTable, mq)
@@ -251,7 +252,7 @@ func (r *remoteBrokerOffsetStore) read(mq *kernel.MessageQueue, t readType) int6
 	return off
 }
 
-func (r *remoteBrokerOffsetStore) update(mq *kernel.MessageQueue, offset int64, increaseOnly bool) {
+func (r *remoteBrokerOffsetStore) update(mq *primitive.MessageQueue, offset int64, increaseOnly bool) {
 	rlog.Debugf("update offset: %s to %d", mq, offset)
 	r.mutex.Lock()
 	defer r.mutex.Unlock()
@@ -278,7 +279,7 @@ func (r *remoteBrokerOffsetStore) update(mq *kernel.MessageQueue, offset int64,
 	}
 }
 
-func (r *remoteBrokerOffsetStore) fetchConsumeOffsetFromBroker(group string, mq *kernel.MessageQueue) (int64, error) {
+func (r *remoteBrokerOffsetStore) fetchConsumeOffsetFromBroker(group string, mq *primitive.MessageQueue) (int64, error) {
 	broker := kernel.FindBrokerAddrByName(mq.BrokerName)
 	if broker == "" {
 		kernel.UpdateTopicRouteInfo(mq.Topic)
@@ -330,7 +331,7 @@ func (r *remoteBrokerOffsetStore) updateConsumeOffsetToBroker(group, topic strin
 	return r.client.InvokeOneWay(broker, cmd, 5*time.Second)
 }
 
-func readFromMemory(table map[string]map[int]*queueOffset, mq *kernel.MessageQueue) int64 {
+func readFromMemory(table map[string]map[int]*queueOffset, mq *primitive.MessageQueue) int64 {
 	localOffset, exist := table[mq.Topic]
 	if !exist {
 		return -1
diff --git a/consumer/process_queue.go b/internal/consumer/process_queue.go
similarity index 87%
rename from consumer/process_queue.go
rename to internal/consumer/process_queue.go
index 77e84fa..2ec865c 100644
--- a/consumer/process_queue.go
+++ b/internal/consumer/process_queue.go
@@ -18,7 +18,7 @@ limitations under the License.
 package consumer
 
 import (
-	"github.com/apache/rocketmq-client-go/kernel"
+	"github.com/apache/rocketmq-client-go/primitive"
 	"github.com/apache/rocketmq-client-go/rlog"
 	"github.com/emirpasic/gods/maps/treemap"
 	"github.com/emirpasic/gods/utils"
@@ -51,7 +51,7 @@ type processQueue struct {
 	consuming                  bool
 	msgAccCnt                  int64
 	lockConsume                sync.Mutex
-	msgCh                      chan []*kernel.MessageExt
+	msgCh                      chan []*primitive.MessageExt
 }
 
 func newProcessQueue() *processQueue {
@@ -60,12 +60,12 @@ func newProcessQueue() *processQueue {
 		lastPullTime:    time.Now(),
 		lastConsumeTime: time.Now(),
 		lastLockTime:    time.Now(),
-		msgCh:           make(chan []*kernel.MessageExt, 32),
+		msgCh:           make(chan []*primitive.MessageExt, 32),
 	}
 	return pq
 }
 
-func (pq *processQueue) putMessage(messages ...*kernel.MessageExt) {
+func (pq *processQueue) putMessage(messages ...*primitive.MessageExt) {
 	if messages == nil || len(messages) == 0 {
 		return
 	}
@@ -92,7 +92,7 @@ func (pq *processQueue) putMessage(messages ...*kernel.MessageExt) {
 	}
 
 	msg := messages[len(messages)-1]
-	maxOffset, err := strconv.ParseInt(msg.Properties[kernel.PropertyMaxOffset], 10, 64)
+	maxOffset, err := strconv.ParseInt(msg.Properties[primitive.PropertyMaxOffset], 10, 64)
 	if err != nil {
 		acc := maxOffset - msg.QueueOffset
 		if acc > 0 {
@@ -101,7 +101,7 @@ func (pq *processQueue) putMessage(messages ...*kernel.MessageExt) {
 	}
 }
 
-func (pq *processQueue) removeMessage(messages ...*kernel.MessageExt) int64 {
+func (pq *processQueue) removeMessage(messages ...*primitive.MessageExt) int64 {
 	result := int64(-1)
 	pq.mutex.Lock()
 	pq.lastConsumeTime = time.Now()
@@ -152,8 +152,8 @@ func (pq *processQueue) cleanExpiredMsg(consumer defaultConsumer) {
 			return
 		}
 		_, firstValue := pq.msgCache.Min()
-		msg := firstValue.(*kernel.MessageExt)
-		startTime := msg.Properties[kernel.PropertyConsumeStartTime]
+		msg := firstValue.(*primitive.MessageExt)
+		startTime := msg.Properties[primitive.PropertyConsumeStartTime]
 		if startTime != "" {
 			st, err := strconv.ParseInt(startTime, 10, 64)
 			if err != nil {
@@ -187,15 +187,15 @@ func (pq *processQueue) getMaxSpan() int {
 	return int(lastKey.(int64) - firstKey.(int64))
 }
 
-func (pq *processQueue) getMessages() []*kernel.MessageExt {
+func (pq *processQueue) getMessages() []*primitive.MessageExt {
 	return <-pq.msgCh
 }
 
-func (pq *processQueue) takeMessages(number int) []*kernel.MessageExt {
+func (pq *processQueue) takeMessages(number int) []*primitive.MessageExt {
 	for pq.msgCache.Empty() {
 		time.Sleep(10 * time.Millisecond)
 	}
-	result := make([]*kernel.MessageExt, number)
+	result := make([]*primitive.MessageExt, number)
 	i := 0
 	pq.mutex.Lock()
 	for ; i < number; i++ {
@@ -203,7 +203,7 @@ func (pq *processQueue) takeMessages(number int) []*kernel.MessageExt {
 		if v == nil {
 			break
 		}
-		result[i] = v.(*kernel.MessageExt)
+		result[i] = v.(*primitive.MessageExt)
 		pq.msgCache.Remove(k)
 	}
 	pq.mutex.Unlock()
diff --git a/consumer/pull_consumer.go b/internal/consumer/pull_consumer.go
similarity index 70%
rename from consumer/pull_consumer.go
rename to internal/consumer/pull_consumer.go
index 1b4f950..9250771 100644
--- a/consumer/pull_consumer.go
+++ b/internal/consumer/pull_consumer.go
@@ -21,27 +21,23 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"github.com/apache/rocketmq-client-go/kernel"
+	"github.com/apache/rocketmq-client-go/internal/kernel"
+	"github.com/apache/rocketmq-client-go/primitive"
 	"strconv"
 	"sync"
 )
 
-type MessageSelector struct {
-	Type       ExpressionType
-	Expression string
-}
-
 type PullConsumer interface {
 	Start()
 	Shutdown()
-	Pull(ctx context.Context, topic string, selector MessageSelector, numbers int) (*kernel.PullResult, error)
+	Pull(ctx context.Context, topic string, selector primitive.MessageSelector, numbers int) (*primitive.PullResult, error)
 }
 
 var (
 	queueCounterTable sync.Map
 )
 
-func NewConsumer(config ConsumerOption) *defaultPullConsumer {
+func NewConsumer(config primitive.ConsumerOption) *defaultPullConsumer {
 	return &defaultPullConsumer{
 		option: config,
 	}
@@ -49,10 +45,10 @@ func NewConsumer(config ConsumerOption) *defaultPullConsumer {
 
 type defaultPullConsumer struct {
 	state     kernel.ServiceState
-	option    ConsumerOption
+	option    primitive.ConsumerOption
 	client    *kernel.RMQClient
 	GroupName string
-	Model     MessageModel
+	Model     primitive.MessageModel
 	UnitMode  bool
 }
 
@@ -60,7 +56,7 @@ func (c *defaultPullConsumer) Start() {
 	c.state = kernel.StateRunning
 }
 
-func (c *defaultPullConsumer) Pull(ctx context.Context, topic string, selector MessageSelector, numbers int) (*kernel.PullResult, error) {
+func (c *defaultPullConsumer) Pull(ctx context.Context, topic string, selector primitive.MessageSelector, numbers int) (*primitive.PullResult, error) {
 	mq := getNextQueueOf(topic)
 	if mq == nil {
 		return nil, fmt.Errorf("prepard to pull topic: %s, but no queue is founded", topic)
@@ -78,22 +74,22 @@ func (c *defaultPullConsumer) Pull(ctx context.Context, topic string, selector M
 }
 
 // SubscribeWithChan ack manually
-func (c *defaultPullConsumer) SubscribeWithChan(topic, selector MessageSelector) (chan *kernel.Message, error) {
+func (c *defaultPullConsumer) SubscribeWithChan(topic, selector primitive.MessageSelector) (chan *primitive.Message, error) {
 	return nil, nil
 }
 
 // SubscribeWithFunc ack automatic
-func (c *defaultPullConsumer) SubscribeWithFunc(topic, selector MessageSelector,
-	f func(msg *kernel.Message) ConsumeResult) error {
+func (c *defaultPullConsumer) SubscribeWithFunc(topic, selector primitive.MessageSelector,
+	f func(msg *primitive.Message) primitive.ConsumeResult) error {
 	return nil
 }
 
-func (c *defaultPullConsumer) ACK(msg *kernel.Message, result ConsumeResult) {
+func (c *defaultPullConsumer) ACK(msg *primitive.Message, result primitive.ConsumeResult) {
 
 }
 
-func (c *defaultPullConsumer) pull(ctx context.Context, mq *kernel.MessageQueue, data *kernel.SubscriptionData,
-	offset int64, numbers int) (*kernel.PullResult, error) {
+func (c *defaultPullConsumer) pull(ctx context.Context, mq *primitive.MessageQueue, data *kernel.SubscriptionData,
+	offset int64, numbers int) (*primitive.PullResult, error) {
 	err := c.makeSureStateOK()
 	if err != nil {
 		return nil, err
@@ -117,7 +113,7 @@ func (c *defaultPullConsumer) pull(ctx context.Context, mq *kernel.MessageQueue,
 		return nil, fmt.Errorf("the broker %s does not exist", mq.BrokerName)
 	}
 
-	if (data.ExpType == string(TAG)) && brokerResult.BrokerVersion < kernel.V4_1_0 {
+	if (data.ExpType == string(primitive.TAG)) && brokerResult.BrokerVersion < kernel.V4_1_0 {
 		return nil, fmt.Errorf("the broker [%s, %v] does not upgrade to support for filter message by %v",
 			mq.BrokerName, brokerResult.BrokerVersion, data.ExpType)
 	}
@@ -140,7 +136,7 @@ func (c *defaultPullConsumer) pull(ctx context.Context, mq *kernel.MessageQueue,
 		ExpressionType:       string(data.ExpType),
 	}
 
-	if data.ExpType == string(TAG) {
+	if data.ExpType == string(primitive.TAG) {
 		pullRequest.SubVersion = 0
 	} else {
 		pullRequest.SubVersion = data.SubVersion
@@ -161,18 +157,18 @@ func (c *defaultPullConsumer) subscriptionAutomatically(topic string) {
 	// TODO
 }
 
-func (c *defaultPullConsumer) nextOffsetOf(queue *kernel.MessageQueue) int64 {
+func (c *defaultPullConsumer) nextOffsetOf(queue *primitive.MessageQueue) int64 {
 	return 0
 }
 
-func processPullResult(mq *kernel.MessageQueue, result *kernel.PullResult, data *kernel.SubscriptionData) {
+func processPullResult(mq *primitive.MessageQueue, result *primitive.PullResult, data *kernel.SubscriptionData) {
 	updatePullFromWhichNode(mq, result.SuggestWhichBrokerId)
 	switch result.Status {
-	case kernel.PullFound:
+	case primitive.PullFound:
 		msgs := result.GetMessageExts()
 		msgListFilterAgain := msgs
 		if len(data.Tags) > 0 && data.ClassFilterMode {
-			msgListFilterAgain = make([]*kernel.MessageExt, len(msgs))
+			msgListFilterAgain = make([]*primitive.MessageExt, len(msgs))
 			for _, msg := range msgs {
 				_, exist := data.Tags[msg.GetTags()]
 				if exist {
@@ -184,13 +180,13 @@ func processPullResult(mq *kernel.MessageQueue, result *kernel.PullResult, data
 		// TODO hook
 
 		for _, msg := range msgListFilterAgain {
-			traFlag, _ := strconv.ParseBool(msg.Properties[kernel.PropertyTransactionPrepared])
+			traFlag, _ := strconv.ParseBool(msg.Properties[primitive.PropertyTransactionPrepared])
 			if traFlag {
-				msg.TransactionId = msg.Properties[kernel.PropertyUniqueClientMessageIdKeyIndex]
+				msg.TransactionId = msg.Properties[primitive.PropertyUniqueClientMessageIdKeyIndex]
 			}
 
-			msg.Properties[kernel.PropertyMinOffset] = strconv.FormatInt(result.MinOffset, 10)
-			msg.Properties[kernel.PropertyMaxOffset] = strconv.FormatInt(result.MaxOffset, 10)
+			msg.Properties[primitive.PropertyMinOffset] = strconv.FormatInt(result.MinOffset, 10)
+			msg.Properties[primitive.PropertyMaxOffset] = strconv.FormatInt(result.MaxOffset, 10)
 		}
 
 		result.SetMessageExts(msgListFilterAgain)
diff --git a/consumer/pull_consumer_test.go b/internal/consumer/pull_consumer_test.go
similarity index 100%
rename from consumer/pull_consumer_test.go
rename to internal/consumer/pull_consumer_test.go
diff --git a/consumer/push_consumer.go b/internal/consumer/push_consumer.go
similarity index 88%
rename from consumer/push_consumer.go
rename to internal/consumer/push_consumer.go
index 1cb5efd..b3d1e5d 100644
--- a/consumer/push_consumer.go
+++ b/internal/consumer/push_consumer.go
@@ -21,7 +21,8 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"github.com/apache/rocketmq-client-go/kernel"
+	"github.com/apache/rocketmq-client-go/internal/kernel"
+	"github.com/apache/rocketmq-client-go/primitive"
 	"github.com/apache/rocketmq-client-go/rlog"
 	"github.com/apache/rocketmq-client-go/utils"
 	"math"
@@ -38,31 +39,28 @@ import (
 // See quick start/Consumer in the example module for a typical usage.
 //
 // <strong>Thread Safety:</strong> After initialization, the instance can be regarded as thread-safe.
-type ConsumeResult int
 
 const (
-	Mb                           = 1024 * 1024
-	ConsumeSuccess ConsumeResult = iota
-	ConsumeRetryLater
+	Mb = 1024 * 1024
 )
 
 type PushConsumer interface {
 	Start() error
 	Shutdown()
-	Subscribe(topic string, selector MessageSelector,
-		f func(*ConsumeMessageContext, []*kernel.MessageExt) (ConsumeResult, error)) error
+	Subscribe(topic string, selector primitive.MessageSelector,
+		f func(*ConsumeMessageContext, []*primitive.MessageExt) (primitive.ConsumeResult, error)) error
 }
 
 type pushConsumer struct {
 	*defaultConsumer
 	queueFlowControlTimes        int
 	queueMaxSpanFlowControlTimes int
-	consume                      func(*ConsumeMessageContext, []*kernel.MessageExt) (ConsumeResult, error)
-	submitToConsume              func(*processQueue, *kernel.MessageQueue)
+	consume                      func(*ConsumeMessageContext, []*primitive.MessageExt) (primitive.ConsumeResult, error)
+	submitToConsume              func(*processQueue, *primitive.MessageQueue)
 	subscribedTopic              map[string]string
 }
 
-func NewPushConsumer(consumerGroup string, opt ConsumerOption) (PushConsumer, error) {
+func NewPushConsumer(consumerGroup string, opt primitive.ConsumerOption) (PushConsumer, error) {
 	if err := utils.VerifyIP(opt.NameServerAddr); err != nil {
 		return nil, err
 	}
@@ -86,23 +84,10 @@ func NewPushConsumer(consumerGroup string, opt ConsumerOption) (PushConsumer, er
 		option:         opt,
 	}
 
-	switch opt.Strategy {
-	case StrategyAveragely:
-		dc.allocate = allocateByAveragely
-	case StrategyAveragelyCircle:
-		dc.allocate = allocateByAveragelyCircle
-	case StrategyConfig:
-		dc.allocate = allocateByConfig
-	case StrategyConsistentHash:
-		dc.allocate = allocateByConsistentHash
-	case StrategyMachineNearby:
-		dc.allocate = allocateByMachineNearby
-	case StrategyMachineRoom:
-		dc.allocate = allocateByMachineRoom
-	default:
-		dc.allocate = allocateByAveragely
+	if opt.Strategy == nil {
+		opt.Strategy = primitive.AllocateByAveragely
 	}
-
+	dc.allocate = opt.Strategy
 	p := &pushConsumer{
 		defaultConsumer: dc,
 		subscribedTopic: make(map[string]string, 0),
@@ -124,15 +109,15 @@ func (pc *pushConsumer) Start() error {
 		pc.state = kernel.StateStartFailed
 		pc.validate()
 
-		if pc.model == Clustering {
+		if pc.model == primitive.Clustering {
 			// set retry topic
 			retryTopic := kernel.GetRetryTopic(pc.consumerGroup)
 			pc.subscriptionDataTable.Store(retryTopic, buildSubscriptionData(retryTopic,
-				MessageSelector{TAG, _SubAll}))
+				primitive.MessageSelector{primitive.TAG, _SubAll}))
 		}
 
 		pc.client = kernel.GetOrNewRocketMQClient(pc.option.ClientOption)
-		if pc.model == Clustering {
+		if pc.model == primitive.Clustering {
 			pc.option.ChangeInstanceNameToPID()
 			pc.storage = NewRemoteOffsetStore(pc.consumerGroup, pc.client)
 		} else {
@@ -177,8 +162,8 @@ func (pc *pushConsumer) Start() error {
 
 func (pc *pushConsumer) Shutdown() {}
 
-func (pc *pushConsumer) Subscribe(topic string, selector MessageSelector,
-	f func(*ConsumeMessageContext, []*kernel.MessageExt) (ConsumeResult, error)) error {
+func (pc *pushConsumer) Subscribe(topic string, selector primitive.MessageSelector,
+	f func(*ConsumeMessageContext, []*primitive.MessageExt) (primitive.ConsumeResult, error)) error {
 	if pc.state != kernel.StateCreateJust {
 		return errors.New("subscribe topic only started before")
 	}
@@ -197,7 +182,7 @@ func (pc *pushConsumer) PersistConsumerOffset() {
 	pc.defaultConsumer.persistConsumerOffset()
 }
 
-func (pc *pushConsumer) UpdateTopicSubscribeInfo(topic string, mqs []*kernel.MessageQueue) {
+func (pc *pushConsumer) UpdateTopicSubscribeInfo(topic string, mqs []*primitive.MessageQueue) {
 	pc.defaultConsumer.updateTopicSubscribeInfo(topic, mqs)
 }
 
@@ -213,7 +198,7 @@ func (pc *pushConsumer) IsUnitMode() bool {
 	return pc.unitMode
 }
 
-func (pc *pushConsumer) messageQueueChanged(topic string, mqAll, mqDivided []*kernel.MessageQueue) {
+func (pc *pushConsumer) messageQueueChanged(topic string, mqAll, mqDivided []*primitive.MessageQueue) {
 	// TODO
 }
 
@@ -399,7 +384,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 			subExpression      string
 		)
 
-		if pc.model == Clustering {
+		if pc.model == primitive.Clustering {
 			commitOffsetValue = pc.storage.read(request.mq, _ReadFromMemory)
 			if commitOffsetValue > 0 {
 				commitOffsetEnable = true
@@ -423,7 +408,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 			SysFlag:        sysFlag,
 			CommitOffset:   commitOffsetValue,
 			SubExpression:  _SubAll,
-			ExpressionType: string(TAG), // TODO
+			ExpressionType: string(primitive.TAG), // TODO
 		}
 		//
 		//if data.ExpType == string(TAG) {
@@ -445,14 +430,14 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 			goto NEXT
 		}
 
-		if result.Status == kernel.PullBrokerTimeout {
+		if result.Status == primitive.PullBrokerTimeout {
 			rlog.Warnf("pull broker: %s timeout", brokerResult.BrokerAddr)
 			sleepTime = _PullDelayTimeWhenError
 			goto NEXT
 		}
 
 		switch result.Status {
-		case kernel.PullFound:
+		case primitive.PullFound:
 			rlog.Debugf("Topic: %s, QueueId: %d found messages: %d", request.mq.Topic, request.mq.QueueId,
 				len(result.GetMessageExts()))
 			prevRequestOffset := request.nextOffset
@@ -472,19 +457,19 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 				rlog.Warnf("[BUG] pull message result maybe data wrong, [nextBeginOffset=%d, "+
 					"firstMsgOffset=%d, prevRequestOffset=%d]", result.NextBeginOffset, firstMsgOffset, prevRequestOffset)
 			}
-		case kernel.PullNoNewMsg:
+		case primitive.PullNoNewMsg:
 			rlog.Debugf("Topic: %s, QueueId: %d no more msg, next offset: %d", request.mq.Topic, request.mq.QueueId, result.NextBeginOffset)
-		case kernel.PullNoMsgMatched:
+		case primitive.PullNoMsgMatched:
 			request.nextOffset = result.NextBeginOffset
 			pc.correctTagsOffset(request)
-		case kernel.PullOffsetIllegal:
+		case primitive.PullOffsetIllegal:
 			rlog.Warnf("the pull request offset illegal, {} {}", request.String(), result.String())
 			request.nextOffset = result.NextBeginOffset
 			pq.dropped = true
 			go func() {
 				time.Sleep(10 * time.Second)
 				pc.storage.update(request.mq, request.nextOffset, false)
-				pc.storage.persist([]*kernel.MessageQueue{request.mq})
+				pc.storage.persist([]*primitive.MessageQueue{request.mq})
 				pc.storage.remove(request.mq)
 				rlog.Warnf("fix the pull request offset: %s", request.String())
 			}()
@@ -499,7 +484,7 @@ func (pc *pushConsumer) correctTagsOffset(pr *PullRequest) {
 	// TODO
 }
 
-func (pc *pushConsumer) sendMessageBack(ctx *ConsumeMessageContext, msg *kernel.MessageExt) bool {
+func (pc *pushConsumer) sendMessageBack(ctx *ConsumeMessageContext, msg *primitive.MessageExt) bool {
 	return true
 }
 
@@ -514,7 +499,7 @@ func (pc *pushConsumer) resume() {
 	rlog.Infof("resume consumer: %s", pc.consumerGroup)
 }
 
-func (pc *pushConsumer) resetOffset(topic string, table map[kernel.MessageQueue]int64) {
+func (pc *pushConsumer) resetOffset(topic string, table map[primitive.MessageQueue]int64) {
 	//topic := cmd.ExtFields["topic"]
 	//group := cmd.ExtFields["group"]
 	//if topic == "" || group == "" {
@@ -529,7 +514,7 @@ func (pc *pushConsumer) resetOffset(topic string, table map[kernel.MessageQueue]
 	//rlog.Infof("invoke reset offset operation from broker. brokerAddr=%s, topic=%s, group=%s, timestamp=%v",
 	//	from, topic, group, t)
 	//
-	//offsetTable := make(map[kernel.MessageQueue]int64, 0)
+	//offsetTable := make(map[primitive.MessageQueue]int64, 0)
 	//err = json.Unmarshal(cmd.Body, &offsetTable)
 	//if err != nil {
 	//	rlog.Warnf("received reset offset command from: %s, but parse offset table: %s", err.Error())
@@ -541,7 +526,7 @@ func (pc *pushConsumer) resetOffset(topic string, table map[kernel.MessageQueue]
 	//	return
 	//}
 
-	set := make(map[int]*kernel.MessageQueue, 0)
+	set := make(map[int]*primitive.MessageQueue, 0)
 	for k := range table {
 		set[k.HashCode()] = &k
 	}
@@ -559,7 +544,7 @@ func (pc *pushConsumer) resetOffset(topic string, table map[kernel.MessageQueue]
 	if !exist {
 		return
 	}
-	queuesOfTopic := v.(map[int]*kernel.MessageQueue)
+	queuesOfTopic := v.(map[int]*primitive.MessageQueue)
 	for k := range queuesOfTopic {
 		q := set[k]
 		if q != nil {
@@ -575,9 +560,9 @@ func (pc *pushConsumer) resetOffset(topic string, table map[kernel.MessageQueue]
 	}
 }
 
-func (pc *pushConsumer) removeUnnecessaryMessageQueue(mq *kernel.MessageQueue, pq *processQueue) bool {
+func (pc *pushConsumer) removeUnnecessaryMessageQueue(mq *primitive.MessageQueue, pq *processQueue) bool {
 	pc.defaultConsumer.removeUnnecessaryMessageQueue(mq, pq)
-	if !pc.consumeOrderly || Clustering != pc.model {
+	if !pc.consumeOrderly || primitive.Clustering != pc.model {
 		return true
 	}
 	// TODO orderly
@@ -586,21 +571,21 @@ func (pc *pushConsumer) removeUnnecessaryMessageQueue(mq *kernel.MessageQueue, p
 
 type ConsumeMessageContext struct {
 	consumerGroup string
-	msgs          []*kernel.MessageExt
-	mq            *kernel.MessageQueue
+	msgs          []*primitive.MessageExt
+	mq            *primitive.MessageQueue
 	success       bool
 	status        string
 	// mqTractContext
 	properties map[string]string
 }
 
-func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *kernel.MessageQueue) {
+func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.MessageQueue) {
 	msgs := pq.getMessages()
 	if msgs == nil {
 		return
 	}
 	for count := 0; count < len(msgs); count++ {
-		var subMsgs []*kernel.MessageExt
+		var subMsgs []*primitive.MessageExt
 		if count+pc.option.ConsumeMessageBatchMaxSize > len(msgs) {
 			subMsgs = msgs[count:]
 			count = len(msgs)
@@ -626,11 +611,11 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *kernel.Mes
 			for idx := range subMsgs {
 				msg := subMsgs[idx]
 				if msg.Properties != nil {
-					retryTopic := msg.Properties[kernel.PropertyRetryTopic]
+					retryTopic := msg.Properties[primitive.PropertyRetryTopic]
 					if retryTopic == "" && groupTopic == msg.Topic {
 						msg.Topic = retryTopic
 					}
-					subMsgs[idx].Properties[kernel.PropertyConsumeStartTime] = strconv.FormatInt(
+					subMsgs[idx].Properties[primitive.PropertyConsumeStartTime] = strconv.FormatInt(
 						beginTime.UnixNano()/int64(time.Millisecond), 10)
 				}
 			}
@@ -640,7 +625,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *kernel.Mes
 				ctx.properties["ConsumeContextType"] = "EXCEPTION"
 			} else if consumeRT >= pc.option.ConsumeTimeout {
 				ctx.properties["ConsumeContextType"] = "TIMEOUT"
-			} else if result == ConsumeSuccess {
+			} else if result == primitive.ConsumeSuccess {
 				ctx.properties["ConsumeContextType"] = "SUCCESS"
 			} else {
 				ctx.properties["ConsumeContextType"] = "RECONSUME_LATER"
@@ -650,12 +635,12 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *kernel.Mes
 			increaseConsumeRT(pc.consumerGroup, mq.Topic, consumeRT)
 
 			if !pq.dropped {
-				msgBackFailed := make([]*kernel.MessageExt, 0)
-				if result == ConsumeSuccess {
+				msgBackFailed := make([]*primitive.MessageExt, 0)
+				if result == primitive.ConsumeSuccess {
 					increaseConsumeOKTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
 				} else {
 					increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
-					if pc.model == BroadCasting {
+					if pc.model == primitive.BroadCasting {
 						for i := 0; i < len(msgs); i++ {
 							rlog.Warnf("BROADCASTING, the message=%s consume failed, drop it, {}", subMsgs[i])
 						}
@@ -688,5 +673,5 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *kernel.Mes
 	}
 }
 
-func (pc *pushConsumer) consumeMessageOrderly(pq *processQueue, mq *kernel.MessageQueue) {
+func (pc *pushConsumer) consumeMessageOrderly(pq *processQueue, mq *primitive.MessageQueue) {
 }
diff --git a/consumer/push_consumer_test.go b/internal/consumer/push_consumer_test.go
similarity index 100%
rename from consumer/push_consumer_test.go
rename to internal/consumer/push_consumer_test.go
diff --git a/consumer/statistics.go b/internal/consumer/statistics.go
similarity index 100%
rename from consumer/statistics.go
rename to internal/consumer/statistics.go
diff --git a/kernel/client.go b/internal/kernel/client.go
similarity index 86%
rename from kernel/client.go
rename to internal/kernel/client.go
index 9dbecc6..78de681 100644
--- a/kernel/client.go
+++ b/internal/kernel/client.go
@@ -22,7 +22,8 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"github.com/apache/rocketmq-client-go/remote"
+	"github.com/apache/rocketmq-client-go/internal/remote"
+	"github.com/apache/rocketmq-client-go/primitive"
 	"github.com/apache/rocketmq-client-go/rlog"
 	"os"
 	"strconv"
@@ -65,28 +66,6 @@ func init() {
 	}
 }
 
-type ClientOption struct {
-	NameServerAddr    string
-	ClientIP          string
-	InstanceName      string
-	UnitMode          bool
-	UnitName          string
-	VIPChannelEnabled bool
-	UseTLS            bool
-}
-
-func (opt *ClientOption) ChangeInstanceNameToPID() {
-	if opt.InstanceName == "DEFAULT" {
-		opt.InstanceName = strconv.Itoa(os.Getegid())
-	}
-}
-
-func (opt *ClientOption) String() string {
-	return fmt.Sprintf("ClientOption [ClientIP=%s, InstanceName=%s, "+
-		"UnitMode=%v, UnitName=%s, VIPChannelEnabled=%v, UseTLS=%v]", opt.ClientIP,
-		opt.InstanceName, opt.UnitMode, opt.UnitName, opt.VIPChannelEnabled, opt.UseTLS)
-}
-
 type InnerProducer interface {
 	PublishTopicList() []string
 	UpdateTopicPublishInfo(topic string, info *TopicPublishInfo)
@@ -97,7 +76,7 @@ type InnerProducer interface {
 
 type InnerConsumer interface {
 	PersistConsumerOffset()
-	UpdateTopicSubscribeInfo(topic string, mqs []*MessageQueue)
+	UpdateTopicSubscribeInfo(topic string, mqs []*primitive.MessageQueue)
 	IsSubscribeTopicNeedUpdate(topic string) bool
 	SubscriptionDataList() []*SubscriptionData
 	Rebalance()
@@ -105,7 +84,7 @@ type InnerConsumer interface {
 }
 
 type RMQClient struct {
-	option ClientOption
+	option primitive.ClientOption
 	// group -> InnerProducer
 	producerMap sync.Map
 
@@ -119,7 +98,7 @@ type RMQClient struct {
 
 var clientMap sync.Map
 
-func GetOrNewRocketMQClient(option ClientOption) *RMQClient {
+func GetOrNewRocketMQClient(option primitive.ClientOption) *RMQClient {
 	client := &RMQClient{
 		option:       option,
 		remoteClient: remote.NewRemotingClient(),
@@ -298,12 +277,12 @@ func (c *RMQClient) UpdateTopicRouteInfo() {
 
 // SendMessageAsync send message with batch by async
 func (c *RMQClient) SendMessageAsync(ctx context.Context, brokerAddrs, brokerName string, request *SendMessageRequest,
-	msgs []*Message, f func(result *SendResult)) error {
+	msgs []*primitive.Message, f func(result *primitive.SendResult)) error {
 	return nil
 }
 
 func (c *RMQClient) SendMessageOneWay(ctx context.Context, brokerAddrs string, request *SendMessageRequest,
-	msgs []*Message) (*SendResult, error) {
+	msgs []*primitive.Message) (*primitive.SendResult, error) {
 	cmd := remote.NewRemotingCommand(ReqSendBatchMessage, request, encodeMessages(msgs))
 	err := c.remoteClient.InvokeOneWay(brokerAddrs, cmd, 3*time.Second)
 	if err != nil {
@@ -312,28 +291,28 @@ func (c *RMQClient) SendMessageOneWay(ctx context.Context, brokerAddrs string, r
 	return nil, err
 }
 
-func (c *RMQClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, msgs ...*Message) *SendResult {
-	var status SendStatus
+func (c *RMQClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingCommand, msgs ...*primitive.Message) *primitive.SendResult {
+	var status primitive.SendStatus
 	switch cmd.Code {
 	case ResFlushDiskTimeout:
-		status = SendFlushDiskTimeout
+		status = primitive.SendFlushDiskTimeout
 	case ResFlushSlaveTimeout:
-		status = SendFlushSlaveTimeout
+		status = primitive.SendFlushSlaveTimeout
 	case ResSlaveNotAvailable:
-		status = SendSlaveNotAvailable
+		status = primitive.SendSlaveNotAvailable
 	case ResSuccess:
-		status = SendOK
+		status = primitive.SendOK
 	default:
 		// TODO process unknown code
 	}
 
 	msgIDs := make([]string, 0)
 	for i := 0; i < len(msgs); i++ {
-		msgIDs = append(msgIDs, msgs[i].Properties[PropertyUniqueClientMessageIdKeyIndex])
+		msgIDs = append(msgIDs, msgs[i].Properties[primitive.PropertyUniqueClientMessageIdKeyIndex])
 	}
 
-	regionId := cmd.ExtFields[PropertyMsgRegion]
-	trace := cmd.ExtFields[PropertyTraceSwitch]
+	regionId := cmd.ExtFields[primitive.PropertyMsgRegion]
+	trace := cmd.ExtFields[primitive.PropertyTraceSwitch]
 
 	if regionId == "" {
 		regionId = defaultTraceRegionID
@@ -341,11 +320,11 @@ func (c *RMQClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingC
 
 	qId, _ := strconv.Atoi(cmd.ExtFields["queueId"])
 	off, _ := strconv.ParseInt(cmd.ExtFields["queueOffset"], 10, 64)
-	return &SendResult{
+	return &primitive.SendResult{
 		Status:      status,
 		MsgID:       cmd.ExtFields["msgId"],
 		OffsetMsgID: cmd.ExtFields["msgId"],
-		MessageQueue: &MessageQueue{
+		MessageQueue: &primitive.MessageQueue{
 			Topic:      msgs[0].Topic,
 			BrokerName: brokerName,
 			QueueId:    qId,
@@ -358,7 +337,7 @@ func (c *RMQClient) ProcessSendResponse(brokerName string, cmd *remote.RemotingC
 }
 
 // PullMessage with sync
-func (c *RMQClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest) (*PullResult, error) {
+func (c *RMQClient) PullMessage(ctx context.Context, brokerAddrs string, request *PullMessageRequest) (*primitive.PullResult, error) {
 	cmd := remote.NewRemotingCommand(ReqPullMessage, request, nil)
 	res, err := c.remoteClient.InvokeSync(brokerAddrs, cmd, 3*time.Second)
 	if err != nil {
@@ -368,17 +347,17 @@ func (c *RMQClient) PullMessage(ctx context.Context, brokerAddrs string, request
 	return c.processPullResponse(res)
 }
 
-func (c *RMQClient) processPullResponse(response *remote.RemotingCommand) (*PullResult, error) {
-	pullResult := &PullResult{}
+func (c *RMQClient) processPullResponse(response *remote.RemotingCommand) (*primitive.PullResult, error) {
+	pullResult := &primitive.PullResult{}
 	switch response.Code {
 	case ResSuccess:
-		pullResult.Status = PullFound
+		pullResult.Status = primitive.PullFound
 	case ResPullNotFound:
-		pullResult.Status = PullNoNewMsg
+		pullResult.Status = primitive.PullNoNewMsg
 	case ResPullRetryImmediately:
-		pullResult.Status = PullNoMsgMatched
+		pullResult.Status = primitive.PullNoMsgMatched
 	case ResPullOffsetMoved:
-		pullResult.Status = PullOffsetIllegal
+		pullResult.Status = primitive.PullOffsetIllegal
 	default:
 		return nil, fmt.Errorf("unknown Response Code: %d, remark: %s", response.Code, response.Remark)
 	}
@@ -403,13 +382,13 @@ func (c *RMQClient) processPullResponse(response *remote.RemotingCommand) (*Pull
 		pullResult.SuggestWhichBrokerId, _ = strconv.ParseInt(v, 10, 64)
 	}
 
-	pullResult.messageExts = decodeMessage(response.Body)
+	//pullResult.messageExts = decodeMessage(response.Body) TODO parse in top
 
 	return pullResult, nil
 }
 
 // PullMessageAsync pull message async
-func (c *RMQClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequest, f func(result *PullResult)) error {
+func (c *RMQClient) PullMessageAsync(ctx context.Context, brokerAddrs string, request *PullMessageRequest, f func(result *primitive.PullResult)) error {
 	return nil
 }
 
@@ -501,13 +480,13 @@ func (c *RMQClient) isNeedUpdateSubscribeInfo(topic string) bool {
 	return result
 }
 
-func routeData2SubscribeInfo(topic string, data *TopicRouteData) []*MessageQueue {
-	list := make([]*MessageQueue, 0)
+func routeData2SubscribeInfo(topic string, data *TopicRouteData) []*primitive.MessageQueue {
+	list := make([]*primitive.MessageQueue, 0)
 	for idx := range data.QueueDataList {
 		qd := data.QueueDataList[idx]
 		if queueIsReadable(qd.Perm) {
 			for i := 0; i < qd.ReadQueueNums; i++ {
-				list = append(list, &MessageQueue{
+				list = append(list, &primitive.MessageQueue{
 					Topic:      topic,
 					BrokerName: qd.BrokerName,
 					QueueId:    i,
@@ -518,7 +497,7 @@ func routeData2SubscribeInfo(topic string, data *TopicRouteData) []*MessageQueue
 	return list
 }
 
-func encodeMessages(message []*Message) []byte {
+func encodeMessages(message []*primitive.Message) []byte {
 	var buffer bytes.Buffer
 	index := 0
 	for index < len(message) {
diff --git a/kernel/client_test.go b/internal/kernel/client_test.go
similarity index 100%
rename from kernel/client_test.go
rename to internal/kernel/client_test.go
diff --git a/kernel/constants.go b/internal/kernel/constants.go
similarity index 100%
rename from kernel/constants.go
rename to internal/kernel/constants.go
diff --git a/internal/kernel/model.go b/internal/kernel/model.go
new file mode 100644
index 0000000..b3ef62e
--- /dev/null
+++ b/internal/kernel/model.go
@@ -0,0 +1,80 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kernel
+
+import (
+	"encoding/json"
+	"github.com/apache/rocketmq-client-go/rlog"
+)
+
+type FindBrokerResult struct {
+	BrokerAddr    string
+	Slave         bool
+	BrokerVersion int32
+}
+
+type (
+	// groupName of consumer
+	producerData string
+
+	consumeType string
+
+	ServiceState int
+)
+
+const (
+	StateCreateJust ServiceState = iota
+	StateStartFailed
+	StateRunning
+	StateShutdown
+)
+
+type SubscriptionData struct {
+	ClassFilterMode bool
+	Topic           string
+	SubString       string
+	Tags            map[string]bool
+	Codes           map[int32]bool
+	SubVersion      int64
+	ExpType         string
+}
+
+type consumerData struct {
+	GroupName         string              `json:"groupName"`
+	CType             consumeType         `json:"consumeType"`
+	MessageModel      string              `json:"messageModel"`
+	Where             string              `json:"consumeFromWhere"`
+	SubscriptionDatas []*SubscriptionData `json:"subscriptionDataSet"`
+	UnitMode          bool                `json:"unitMode"`
+}
+
+type heartbeatData struct {
+	ClientId      string         `json:"clientID"`
+	ProducerDatas []producerData `json:"producerDataSet"`
+	ConsumerDatas []consumerData `json:"consumerDataSet"`
+}
+
+func (data *heartbeatData) encode() []byte {
+	d, err := json.Marshal(data)
+	if err != nil {
+		rlog.Errorf("marshal heartbeatData error: %s", err.Error())
+		return nil
+	}
+	rlog.Info(string(d))
+	return d
+}
diff --git a/kernel/mq_version.go b/internal/kernel/mq_version.go
similarity index 100%
rename from kernel/mq_version.go
rename to internal/kernel/mq_version.go
diff --git a/kernel/perm.go b/internal/kernel/perm.go
similarity index 100%
rename from kernel/perm.go
rename to internal/kernel/perm.go
diff --git a/kernel/request.go b/internal/kernel/request.go
similarity index 100%
rename from kernel/request.go
rename to internal/kernel/request.go
diff --git a/kernel/response.go b/internal/kernel/response.go
similarity index 100%
rename from kernel/response.go
rename to internal/kernel/response.go
diff --git a/kernel/route.go b/internal/kernel/route.go
similarity index 96%
rename from kernel/route.go
rename to internal/kernel/route.go
index d0678e6..f216c0e 100644
--- a/kernel/route.go
+++ b/internal/kernel/route.go
@@ -20,7 +20,8 @@ package kernel
 import (
 	"encoding/json"
 	"errors"
-	"github.com/apache/rocketmq-client-go/remote"
+	"github.com/apache/rocketmq-client-go/internal/remote"
+	"github.com/apache/rocketmq-client-go/primitive"
 	"github.com/apache/rocketmq-client-go/rlog"
 	"github.com/apache/rocketmq-client-go/utils"
 	"github.com/tidwall/gjson"
@@ -98,7 +99,7 @@ func cleanOfflineBroker() {
 type TopicPublishInfo struct {
 	OrderTopic          bool
 	HaveTopicRouterInfo bool
-	MqList              []*MessageQueue
+	MqList              []*primitive.MessageQueue
 	RouteData           *TopicRouteData
 	TopicQueueIndex     int32
 }
@@ -221,19 +222,19 @@ func FindBrokerAddressInSubscribe(brokerName string, brokerId int64, onlyThisBro
 	return result
 }
 
-func FetchSubscribeMessageQueues(topic string) ([]*MessageQueue, error) {
+func FetchSubscribeMessageQueues(topic string) ([]*primitive.MessageQueue, error) {
 	routeData, err := queryTopicRouteInfoFromServer(topic)
 
 	if err != nil {
 		return nil, err
 	}
 
-	mqs := make([]*MessageQueue, 0)
+	mqs := make([]*primitive.MessageQueue, 0)
 
 	for _, qd := range routeData.QueueDataList {
 		if queueIsReadable(qd.Perm) {
 			for i := 0; i < qd.ReadQueueNums; i++ {
-				mqs = append(mqs, &MessageQueue{Topic: topic, BrokerName: qd.BrokerName, QueueId: i})
+				mqs = append(mqs, &primitive.MessageQueue{Topic: topic, BrokerName: qd.BrokerName, QueueId: i})
 			}
 		}
 	}
@@ -321,7 +322,7 @@ func routeData2PublishInfo(topic string, data *TopicRouteData) *TopicPublishInfo
 			item := strings.Split(broker, ":")
 			nums, _ := strconv.Atoi(item[1])
 			for i := 0; i < nums; i++ {
-				mq := &MessageQueue{
+				mq := &primitive.MessageQueue{
 					Topic:      topic,
 					BrokerName: item[0],
 					QueueId:    i,
@@ -357,7 +358,7 @@ func routeData2PublishInfo(topic string, data *TopicRouteData) *TopicPublishInfo
 		}
 
 		for i := 0; i < qd.WriteQueueNums; i++ {
-			mq := &MessageQueue{
+			mq := &primitive.MessageQueue{
 				Topic:      topic,
 				BrokerName: qd.BrokerName,
 				QueueId:    i,
diff --git a/kernel/route_test.go b/internal/kernel/route_test.go
similarity index 100%
rename from kernel/route_test.go
rename to internal/kernel/route_test.go
diff --git a/kernel/transaction.go b/internal/kernel/transaction.go
similarity index 100%
rename from kernel/transaction.go
rename to internal/kernel/transaction.go
diff --git a/kernel/validators.go b/internal/kernel/validators.go
similarity index 100%
rename from kernel/validators.go
rename to internal/kernel/validators.go
diff --git a/producer/producer.go b/internal/producer/producer.go
similarity index 85%
rename from producer/producer.go
rename to internal/producer/producer.go
index 8bb0bf8..52657ee 100644
--- a/producer/producer.go
+++ b/internal/producer/producer.go
@@ -21,8 +21,9 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"github.com/apache/rocketmq-client-go/kernel"
-	"github.com/apache/rocketmq-client-go/remote"
+	"github.com/apache/rocketmq-client-go/internal/kernel"
+	"github.com/apache/rocketmq-client-go/internal/remote"
+	"github.com/apache/rocketmq-client-go/primitive"
 	"github.com/apache/rocketmq-client-go/rlog"
 	"github.com/apache/rocketmq-client-go/utils"
 	"os"
@@ -34,11 +35,11 @@ import (
 type Producer interface {
 	Start() error
 	Shutdown() error
-	SendSync(context.Context, *kernel.Message) (*kernel.SendResult, error)
-	SendOneWay(context.Context, *kernel.Message) error
+	SendSync(context.Context, *primitive.Message) (*primitive.SendResult, error)
+	SendOneWay(context.Context, *primitive.Message) error
 }
 
-func NewProducer(opt ProducerOptions) (Producer, error) {
+func NewProducer(opt primitive.ProducerOptions) (Producer, error) {
 	if err := utils.VerifyIP(opt.NameServerAddr); err != nil {
 		return nil, err
 	}
@@ -63,18 +64,10 @@ type defaultProducer struct {
 	group       string
 	client      *kernel.RMQClient
 	state       kernel.ServiceState
-	options     ProducerOptions
+	options     primitive.ProducerOptions
 	publishInfo sync.Map
 }
 
-type ProducerOptions struct {
-	kernel.ClientOption
-	NameServerAddr           string
-	GroupName                string
-	RetryTimesWhenSendFailed int
-	UnitMode                 bool
-}
-
 func (p *defaultProducer) Start() error {
 	p.state = kernel.StateRunning
 	p.client.RegisterProducer(p.group, p)
@@ -86,7 +79,7 @@ func (p *defaultProducer) Shutdown() error {
 	return nil
 }
 
-func (p *defaultProducer) SendSync(ctx context.Context, msg *kernel.Message) (*kernel.SendResult, error) {
+func (p *defaultProducer) SendSync(ctx context.Context, msg *primitive.Message) (*primitive.SendResult, error) {
 	if msg == nil {
 		return nil, errors.New("message is nil")
 	}
@@ -122,7 +115,7 @@ func (p *defaultProducer) SendSync(ctx context.Context, msg *kernel.Message) (*k
 	return nil, err
 }
 
-func (p *defaultProducer) SendOneWay(ctx context.Context, msg *kernel.Message) error {
+func (p *defaultProducer) SendOneWay(ctx context.Context, msg *primitive.Message) error {
 	if msg == nil {
 		return errors.New("message is nil")
 	}
@@ -157,7 +150,7 @@ func (p *defaultProducer) SendOneWay(ctx context.Context, msg *kernel.Message) e
 	return err
 }
 
-func (p *defaultProducer) buildSendRequest(mq *kernel.MessageQueue, msg *kernel.Message) *remote.RemotingCommand {
+func (p *defaultProducer) buildSendRequest(mq *primitive.MessageQueue, msg *primitive.Message) *remote.RemotingCommand {
 	req := &kernel.SendMessageRequest{
 		ProducerGroup:  p.group,
 		Topic:          mq.Topic,
@@ -173,7 +166,7 @@ func (p *defaultProducer) buildSendRequest(mq *kernel.MessageQueue, msg *kernel.
 	return remote.NewRemotingCommand(kernel.ReqSendMessage, req, msg.Body)
 }
 
-func (p *defaultProducer) selectMessageQueue(topic string) *kernel.MessageQueue {
+func (p *defaultProducer) selectMessageQueue(topic string) *primitive.MessageQueue {
 	v, exist := p.publishInfo.Load(topic)
 
 	if !exist {
diff --git a/remote/codec.go b/internal/remote/codec.go
similarity index 100%
rename from remote/codec.go
rename to internal/remote/codec.go
diff --git a/remote/codec_test.go b/internal/remote/codec_test.go
similarity index 100%
rename from remote/codec_test.go
rename to internal/remote/codec_test.go
diff --git a/remote/remote_client.go b/internal/remote/remote_client.go
similarity index 100%
rename from remote/remote_client.go
rename to internal/remote/remote_client.go
diff --git a/remote/remote_client_test.go b/internal/remote/remote_client_test.go
similarity index 100%
rename from remote/remote_client_test.go
rename to internal/remote/remote_client_test.go
diff --git a/remote/rpchook.go b/internal/remote/rpchook.go
similarity index 100%
rename from remote/rpchook.go
rename to internal/remote/rpchook.go
diff --git a/primitive/consume.go b/primitive/consume.go
new file mode 100644
index 0000000..40e89b9
--- /dev/null
+++ b/primitive/consume.go
@@ -0,0 +1,128 @@
+package primitive
+
+// Message model defines the way how messages are delivered to each consumer clients.
+// </p>
+//
+// RocketMQ supports two message models: clustering and broadcasting. If clustering is set, consumer clients with
+// the same {@link #consumerGroup} would only consume shards of the messages subscribed, which achieves load
+// balances; Conversely, if the broadcasting is set, each consumer client will consume all subscribed messages
+// separately.
+// </p>
+//
+// This field defaults to clustering.
+type MessageModel int
+
+const (
+	BroadCasting MessageModel = iota
+	Clustering
+)
+
+func (mode MessageModel) String() string {
+	switch mode {
+	case BroadCasting:
+		return "BroadCasting"
+	case Clustering:
+		return "Clustering"
+	default:
+		return "Unknown"
+	}
+}
+
+// Consuming point on consumer booting.
+// </p>
+//
+// There are three consuming points:
+// <ul>
+// <li>
+// <code>CONSUME_FROM_LAST_OFFSET</code>: consumer clients pick up where it stopped previously.
+// If it were a newly booting up consumer client, according aging of the consumer group, there are two
+// cases:
+// <ol>
+// <li>
+// if the consumer group is created so recently that the earliest message being subscribed has yet
+// expired, which means the consumer group represents a lately launched business, consuming will
+// start from the very beginning;
+// </li>
+// <li>
+// if the earliest message being subscribed has expired, consuming will start from the latest
+// messages, meaning messages born prior to the booting timestamp would be ignored.
+// </li>
+// </ol>
+// </li>
+// <li>
+// <code>CONSUME_FROM_FIRST_OFFSET</code>: Consumer client will start from earliest messages available.
+// </li>
+// <li>
+// <code>CONSUME_FROM_TIMESTAMP</code>: Consumer client will start from specified timestamp, which means
+// messages born prior to {@link #consumeTimestamp} will be ignored
+// </li>
+// </ul>
+type ConsumeFromWhere int
+
+const (
+	ConsumeFromLastOffset ConsumeFromWhere = iota
+	ConsumeFromFirstOffset
+	ConsumeFromTimestamp
+)
+
+type ExpressionType string
+
+const (
+	/**
+	 * <ul>
+	 * Keywords:
+	 * <li>{@code AND, OR, NOT, BETWEEN, IN, TRUE, FALSE, IS, NULL}</li>
+	 * </ul>
+	 * <p/>
+	 * <ul>
+	 * Data type:
+	 * <li>Boolean, like: TRUE, FALSE</li>
+	 * <li>String, like: 'abc'</li>
+	 * <li>Decimal, like: 123</li>
+	 * <li>Float number, like: 3.1415</li>
+	 * </ul>
+	 * <p/>
+	 * <ul>
+	 * Grammar:
+	 * <li>{@code AND, OR}</li>
+	 * <li>{@code >, >=, <, <=, =}</li>
+	 * <li>{@code BETWEEN A AND B}, equals to {@code >=A AND <=B}</li>
+	 * <li>{@code NOT BETWEEN A AND B}, equals to {@code >B OR <A}</li>
+	 * <li>{@code IN ('a', 'b')}, equals to {@code ='a' OR ='b'}, this operation only support String type.</li>
+	 * <li>{@code IS NULL}, {@code IS NOT NULL}, check parameter whether is null, or not.</li>
+	 * <li>{@code =TRUE}, {@code =FALSE}, check parameter whether is true, or false.</li>
+	 * </ul>
+	 * <p/>
+	 * <p>
+	 * Example:
+	 * (a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE)
+	 * </p>
+	 */
+	SQL92 = ExpressionType("SQL92")
+
+	/**
+	 * Only support or operation such as
+	 * "tag1 || tag2 || tag3", <br>
+	 * If null or * expression, meaning subscribe all.
+	 */
+	TAG = ExpressionType("TAG")
+)
+
+func IsTagType(exp string) bool {
+	if exp == "" || exp == "TAG" {
+		return true
+	}
+	return false
+}
+
+type MessageSelector struct {
+	Type       ExpressionType
+	Expression string
+}
+
+type ConsumeResult int
+
+const (
+	ConsumeSuccess ConsumeResult = iota
+	ConsumeRetryLater
+)
diff --git a/kernel/message.go b/primitive/message.go
similarity index 85%
rename from kernel/message.go
rename to primitive/message.go
index 0f65853..d15c8a3 100644
--- a/kernel/message.go
+++ b/primitive/message.go
@@ -15,9 +15,12 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package kernel
+package primitive
 
-import "fmt"
+import (
+	"fmt"
+	"github.com/apache/rocketmq-client-go/utils"
+)
 
 const (
 	PropertyKeySeparator                   = " "
@@ -118,3 +121,28 @@ func (msgExt *MessageExt) String() string {
 		msgExt.StoreTimestamp, msgExt.StoreHost, msgExt.CommitLogOffset, msgExt.BodyCRC, msgExt.ReconsumeTimes,
 		msgExt.PreparedTransactionOffset)
 }
+
+// MessageQueue message queue
+type MessageQueue struct {
+	Topic      string `json:"topic"`
+	BrokerName string `json:"brokerName"`
+	QueueId    int    `json:"queueId"`
+}
+
+func (mq *MessageQueue) String() string {
+	return fmt.Sprintf("MessageQueue [topic=%s, brokerName=%s, queueId=%d]", mq.Topic, mq.BrokerName, mq.QueueId)
+}
+
+func (mq *MessageQueue) HashCode() int {
+	result := 1
+	result = 31*result + utils.HashString(mq.BrokerName)
+	result = 31*result + mq.QueueId
+	result = 31*result + utils.HashString(mq.Topic)
+
+	return result
+}
+
+func (mq *MessageQueue) Equals(queue *MessageQueue) bool {
+	// TODO
+	return true
+}
diff --git a/primitive/options.go b/primitive/options.go
new file mode 100644
index 0000000..b657ac0
--- /dev/null
+++ b/primitive/options.go
@@ -0,0 +1,132 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package primitive
+
+import (
+	"fmt"
+	"os"
+	"strconv"
+	"time"
+)
+
+type ProducerOptions struct {
+	ClientOption
+	NameServerAddr           string
+	GroupName                string
+	RetryTimesWhenSendFailed int
+	UnitMode                 bool
+}
+
+type ConsumerOption struct {
+	ClientOption
+	NameServerAddr string
+
+	/**
+	 * Backtracking consumption time with second precision. Time format is
+	 * 20131223171201<br>
+	 * Implying Seventeen twelve and 01 seconds on December 23, 2013 year<br>
+	 * Default backtracking consumption time Half an hour ago.
+	 */
+	ConsumeTimestamp string
+
+	// The socket timeout in milliseconds
+	ConsumerPullTimeout time.Duration
+
+	// Concurrently max span offset.it has no effect on sequential consumption
+	ConsumeConcurrentlyMaxSpan int
+
+	// Flow control threshold on queue level, each message queue will cache at most 1000 messages by default,
+	// Consider the {PullBatchSize}, the instantaneous value may exceed the limit
+	PullThresholdForQueue int64
+
+	// Limit the cached message size on queue level, each message queue will cache at most 100 MiB messages by default,
+	// Consider the {@code pullBatchSize}, the instantaneous value may exceed the limit
+	//
+	// The size of a message only measured by message body, so it's not accurate
+	PullThresholdSizeForQueue int
+
+	// Flow control threshold on topic level, default value is -1(Unlimited)
+	//
+	// The value of {@code pullThresholdForQueue} will be overwrote and calculated based on
+	// {@code pullThresholdForTopic} if it is't unlimited
+	//
+	// For example, if the value of pullThresholdForTopic is 1000 and 10 message queues are assigned to this consumer,
+	// then pullThresholdForQueue will be set to 100
+	PullThresholdForTopic int
+
+	// Limit the cached message size on topic level, default value is -1 MiB(Unlimited)
+	//
+	// The value of {@code pullThresholdSizeForQueue} will be overwrote and calculated based on
+	// {@code pullThresholdSizeForTopic} if it is't unlimited
+	//
+	// For example, if the value of pullThresholdSizeForTopic is 1000 MiB and 10 message queues are
+	// assigned to this consumer, then pullThresholdSizeForQueue will be set to 100 MiB
+	PullThresholdSizeForTopic int
+
+	// Message pull Interval
+	PullInterval time.Duration
+
+	// Batch consumption size
+	ConsumeMessageBatchMaxSize int
+
+	// Batch pull size
+	PullBatchSize int32
+
+	// Whether update subscription relationship when every pull
+	PostSubscriptionWhenPull bool
+
+	// Max re-consume times. -1 means 16 times.
+	//
+	// If messages are re-consumed more than {@link #maxReconsumeTimes} before success, it's be directed to a deletion
+	// queue waiting.
+	MaxReconsumeTimes int
+
+	// Suspending pulling time for cases requiring slow pulling like flow-control scenario.
+	SuspendCurrentQueueTimeMillis time.Duration
+
+	// Maximum amount of time a message may block the consuming thread.
+	ConsumeTimeout time.Duration
+
+	ConsumerModel  MessageModel
+	Strategy       AllocateStrategy
+	ConsumeOrderly bool
+	FromWhere      ConsumeFromWhere
+	// TODO traceDispatcher
+}
+
+func (opt *ClientOption) ChangeInstanceNameToPID() {
+	if opt.InstanceName == "DEFAULT" {
+		opt.InstanceName = strconv.Itoa(os.Getegid())
+	}
+}
+
+func (opt *ClientOption) String() string {
+	return fmt.Sprintf("ClientOption [ClientIP=%s, InstanceName=%s, "+
+		"UnitMode=%v, UnitName=%s, VIPChannelEnabled=%v, UseTLS=%v]", opt.ClientIP,
+		opt.InstanceName, opt.UnitMode, opt.UnitName, opt.VIPChannelEnabled, opt.UseTLS)
+}
+
+type ClientOption struct {
+	NameServerAddr    string
+	ClientIP          string
+	InstanceName      string
+	UnitMode          bool
+	UnitName          string
+	VIPChannelEnabled bool
+	UseTLS            bool
+}
diff --git a/kernel/model.go b/primitive/result.go
similarity index 71%
rename from kernel/model.go
rename to primitive/result.go
index 8ed0f97..0c1e738 100644
--- a/kernel/model.go
+++ b/primitive/result.go
@@ -15,14 +15,12 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package kernel
+package primitive
 
 import (
 	"bytes"
 	"encoding/binary"
-	"encoding/json"
 	"fmt"
-	"github.com/apache/rocketmq-client-go/rlog"
 	"github.com/apache/rocketmq-client-go/utils"
 )
 
@@ -209,85 +207,3 @@ func toMessages(messageExts []*MessageExt) []*Message {
 
 	return msgs
 }
-
-// MessageQueue message queue
-type MessageQueue struct {
-	Topic      string `json:"topic"`
-	BrokerName string `json:"brokerName"`
-	QueueId    int    `json:"queueId"`
-}
-
-func (mq *MessageQueue) String() string {
-	return fmt.Sprintf("MessageQueue [topic=%s, brokerName=%s, queueId=%d]", mq.Topic, mq.BrokerName, mq.QueueId)
-}
-
-func (mq *MessageQueue) HashCode() int {
-	result := 1
-	result = 31*result + utils.HashString(mq.BrokerName)
-	result = 31*result + mq.QueueId
-	result = 31*result + utils.HashString(mq.Topic)
-
-	return result
-}
-
-func (mq *MessageQueue) Equals(queue *MessageQueue) bool {
-	// TODO
-	return true
-}
-
-type FindBrokerResult struct {
-	BrokerAddr    string
-	Slave         bool
-	BrokerVersion int32
-}
-
-type (
-	// groupName of consumer
-	producerData string
-
-	consumeType string
-
-	ServiceState int
-)
-
-const (
-	StateCreateJust ServiceState = iota
-	StateStartFailed
-	StateRunning
-	StateShutdown
-)
-
-type SubscriptionData struct {
-	ClassFilterMode bool
-	Topic           string
-	SubString       string
-	Tags            map[string]bool
-	Codes           map[int32]bool
-	SubVersion      int64
-	ExpType         string
-}
-
-type consumerData struct {
-	GroupName         string              `json:"groupName"`
-	CType             consumeType         `json:"consumeType"`
-	MessageModel      string              `json:"messageModel"`
-	Where             string              `json:"consumeFromWhere"`
-	SubscriptionDatas []*SubscriptionData `json:"subscriptionDataSet"`
-	UnitMode          bool                `json:"unitMode"`
-}
-
-type heartbeatData struct {
-	ClientId      string         `json:"clientID"`
-	ProducerDatas []producerData `json:"producerDataSet"`
-	ConsumerDatas []consumerData `json:"consumerDataSet"`
-}
-
-func (data *heartbeatData) encode() []byte {
-	d, err := json.Marshal(data)
-	if err != nil {
-		rlog.Errorf("marshal heartbeatData error: %s", err.Error())
-		return nil
-	}
-	rlog.Info(string(d))
-	return d
-}
diff --git a/primitive/strategy.go b/primitive/strategy.go
new file mode 100644
index 0000000..56dfef1
--- /dev/null
+++ b/primitive/strategy.go
@@ -0,0 +1,117 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package primitive
+
+import (
+	"github.com/apache/rocketmq-client-go/rlog"
+	"github.com/apache/rocketmq-client-go/utils"
+)
+
+// Strategy Algorithm for message allocating between consumers
+// An allocate strategy proxy for based on machine room nearside priority. An actual allocate strategy can be
+// specified.
+//
+// If any consumer is alive in a machine room, the message queue of the broker which is deployed in the same machine
+// should only be allocated to those. Otherwise, those message queues can be shared along all consumers since there are
+// no alive consumer to monopolize them.
+//
+// Average Hashing queue algorithm
+// Cycle average Hashing queue algorithm
+// Use Message Queue specified
+// Computer room Hashing queue algorithm, such as Alipay logic room
+// Consistent Hashing queue algorithm
+
+type AllocateStrategy func(string, string, []*MessageQueue, []string) []*MessageQueue
+
+func AllocateByAveragely(consumerGroup, currentCID string, mqAll []*MessageQueue,
+	cidAll []string) []*MessageQueue {
+	if currentCID == "" || utils.IsArrayEmpty(mqAll) || utils.IsArrayEmpty(cidAll) {
+		return nil
+	}
+	var (
+		find  bool
+		index int
+	)
+
+	for idx := range cidAll {
+		if cidAll[idx] == currentCID {
+			find = true
+			index = idx
+			break
+		}
+	}
+	if !find {
+		rlog.Infof("[BUG] ConsumerGroup=%s, ConsumerId=%s not in cidAll:%+v", consumerGroup, currentCID, cidAll)
+		return nil
+	}
+
+	mqSize := len(mqAll)
+	cidSize := len(cidAll)
+	mod := mqSize % cidSize
+
+	var averageSize int
+	if mqSize <= cidSize {
+		averageSize = 1
+	} else {
+		if mod > 0 && index < mod {
+			averageSize = mqSize/cidSize + 1
+		} else {
+			averageSize = mqSize / cidSize
+		}
+	}
+
+	var startIndex int
+	if mod > 0 && index < mod {
+		startIndex = index * averageSize
+	} else {
+		startIndex = index*averageSize + mod
+	}
+
+	num := utils.MinInt(averageSize, mqSize-startIndex)
+	result := make([]*MessageQueue, num)
+	for i := 0; i < num; i++ {
+		result[i] = mqAll[(startIndex+i)%mqSize]
+	}
+	return result
+}
+
+// TODO
+func AllocateByMachineNearby(consumerGroup, currentCID string, mqAll []*MessageQueue,
+	cidAll []string) []*MessageQueue {
+	return AllocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
+}
+
+func AllocateByAveragelyCircle(consumerGroup, currentCID string, mqAll []*MessageQueue,
+	cidAll []string) []*MessageQueue {
+	return AllocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
+}
+
+func AllocateByConfig(consumerGroup, currentCID string, mqAll []*MessageQueue,
+	cidAll []string) []*MessageQueue {
+	return AllocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
+}
+
+func AllocateByMachineRoom(consumerGroup, currentCID string, mqAll []*MessageQueue,
+	cidAll []string) []*MessageQueue {
+	return AllocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
+}
+
+func AllocateByConsistentHash(consumerGroup, currentCID string, mqAll []*MessageQueue,
+	cidAll []string) []*MessageQueue {
+	return AllocateByAveragely(consumerGroup, currentCID, mqAll, cidAll)
+}


Mime
View raw message