rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ding...@apache.org
Subject [rocketmq-client-go] branch native updated: [ISSUE #105] Add PullConsumer. resolve #105 (#106)
Date Wed, 10 Jul 2019 06:58:17 GMT
This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new e673b0e  [ISSUE #105] Add PullConsumer. resolve #105 (#106)
e673b0e is described below

commit e673b0ef82247ddc9105b6c2d9889163b300a226
Author: xujianhai666 <52450794+xujianhai666@users.noreply.github.com>
AuthorDate: Wed Jul 10 14:58:13 2019 +0800

    [ISSUE #105] Add PullConsumer. resolve #105 (#106)
    
    * add PullConsumer. resolve #105
    
    * fix code review.
---
 api.go                           |   8 +-
 consumer/consumer.go             | 166 ++++++++++++++++++++++++++-----
 consumer/option.go               |  10 +-
 consumer/pull_consumer.go        | 206 +++++++++++++++++++++++++--------------
 consumer/push_consumer.go        |  36 +++----
 examples/consumer/pull/main.go   |  65 ++++++++++++
 examples/consumer/simple/main.go |   2 +-
 internal/client.go               |   9 +-
 internal/remote/future.go        |   4 +-
 internal/remote/remote_client.go |   6 --
 utils/errors.go                  |  10 ++
 11 files changed, 388 insertions(+), 134 deletions(-)

diff --git a/api.go b/api.go
index ecd6ab8..eec98bf 100644
--- a/api.go
+++ b/api.go
@@ -54,7 +54,11 @@ type PullConsumer interface {
 	Shutdown() error
 	Pull(context.Context, string, consumer.MessageSelector, int) (*primitive.PullResult, error)
 	PullFrom(context.Context, *primitive.MessageQueue, int64, int) (*primitive.PullResult, error)
-	// only update in memory
-	UpdateOffset(primitive.MessageQueue, int64) error
+	CurrentOffset(*primitive.MessageQueue) (int64, error)
+	UpdateOffset(*primitive.MessageQueue, int64) error
 	PersistOffset(context.Context) error
 }
+
+func NewPullConsumer(opts ...consumer.Option) (PullConsumer, error) {
+	return consumer.NewPullConsumer(opts...)
+}
diff --git a/consumer/consumer.go b/consumer/consumer.go
index 5bd9674..cff84ba 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -18,13 +18,13 @@ limitations under the License.
 package consumer
 
 import (
+	"context"
 	"encoding/json"
 	"fmt"
 	"sort"
 	"strconv"
 	"strings"
 	"sync"
-	"sync/atomic"
 	"time"
 
 	"github.com/apache/rocketmq-client-go/internal"
@@ -32,6 +32,7 @@ import (
 	"github.com/apache/rocketmq-client-go/primitive"
 	"github.com/apache/rocketmq-client-go/rlog"
 	"github.com/apache/rocketmq-client-go/utils"
+	"github.com/pkg/errors"
 	"github.com/tidwall/gjson"
 )
 
@@ -64,6 +65,11 @@ const (
 	_SubAll = "*"
 )
 
+var(
+	ErrCreated = errors.New("consumer group has been created")
+	ErrBrokerNotFound = errors.New("broker can not found")
+)
+
 // Message model defines the way how messages are delivered to each consumer clients.
 // </p>
 //
@@ -247,11 +253,42 @@ type defaultConsumer struct {
 	prCh chan PullRequest
 }
 
-func (dc *defaultConsumer) persistConsumerOffset() {
+func (dc *defaultConsumer) start() error {
+
+	if dc.model == Clustering {
+		// set retry topic
+		retryTopic := internal.GetRetryTopic(dc.consumerGroup)
+		sub := buildSubscriptionData(retryTopic, MessageSelector{TAG, _SubAll})
+		dc.subscriptionDataTable.Store(retryTopic, sub)
+	}
+
+	dc.client = internal.GetOrNewRocketMQClient(dc.option.ClientOptions)
+	if dc.model == Clustering {
+		dc.option.ChangeInstanceNameToPID()
+		dc.storage = NewRemoteOffsetStore(dc.consumerGroup, dc.client)
+	} else {
+		dc.storage = NewLocalFileOffsetStore(dc.consumerGroup, dc.client.ClientID())
+	}
+
+	dc.client.UpdateTopicRouteInfo()
+	dc.client.Start()
+	dc.state = internal.StateRunning
+
+	return nil
+}
+
+func (dc *defaultConsumer) shutdown() error {
+	dc.state = internal.StateRunning
+	dc.client.Shutdown()
+
+	return nil
+}
+
+
+func (dc *defaultConsumer) persistConsumerOffset() error {
 	err := dc.makeSureStateOK()
 	if err != nil {
-		rlog.Errorf("consumer state error: %s", err.Error())
-		return
+		return err
 	}
 	mqs := make([]*primitive.MessageQueue, 0)
 	dc.processQueueTable.Range(func(key, value interface{}) bool {
@@ -259,6 +296,22 @@ func (dc *defaultConsumer) persistConsumerOffset() {
 		return true
 	})
 	dc.storage.persist(mqs)
+	return nil
+}
+
+func (c *defaultConsumer) updateOffset(queue *primitive.MessageQueue, offset int64) error
{
+	c.storage.update(queue, offset, false)
+	return nil
+}
+
+func (dc *defaultConsumer) subscriptionAutomatically(topic string) {
+	_, exist := dc.subscriptionDataTable.Load(topic)
+	if !exist {
+		s := MessageSelector{
+			Expression: _SubAll,
+		}
+		dc.subscriptionDataTable.Store(topic, buildSubscriptionData(topic, s))
+	}
 }
 
 func (dc *defaultConsumer) updateTopicSubscribeInfo(topic string, mqs []*primitive.MessageQueue)
{
@@ -671,6 +724,89 @@ func (dc *defaultConsumer) computePullFromWhere(mq *primitive.MessageQueue)
int6
 	return result
 }
 
+func (dc *defaultConsumer) pullInner(ctx context.Context, queue *primitive.MessageQueue,
data *internal.SubscriptionData,
+	offset int64, numbers int, sysFlag int32, commitOffsetValue int64) (*primitive.PullResult,
error) {
+
+	brokerResult := tryFindBroker(queue)
+	if brokerResult == nil {
+		rlog.Warnf("no broker found for %s", queue.String())
+		return nil, ErrBrokerNotFound
+	}
+
+	if brokerResult.Slave {
+		sysFlag = clearCommitOffsetFlag(sysFlag)
+	}
+
+	if (data.ExpType == string(TAG)) && brokerResult.BrokerVersion < internal.V4_1_0
{
+		return nil, fmt.Errorf("the broker [%s, %v] does not upgrade to support for filter message
by %v",
+			queue.BrokerName, brokerResult.BrokerVersion, data.ExpType)
+	}
+
+	pullRequest := &internal.PullMessageRequest{
+		ConsumerGroup: dc.consumerGroup,
+		Topic:         queue.Topic,
+		QueueId:       int32(queue.QueueId),
+		QueueOffset:   offset,
+		MaxMsgNums:    int32(numbers),
+		SysFlag:       sysFlag,
+		CommitOffset:  commitOffsetValue,
+		// TODO: 和java对齐
+		SuspendTimeoutMillis: _BrokerSuspendMaxTime,
+		SubExpression:        data.SubString,
+		// TODO: add subversion
+		ExpressionType: string(data.ExpType),
+	}
+
+	if data.ExpType == string(TAG) {
+		pullRequest.SubVersion = 0
+	} else {
+		pullRequest.SubVersion = data.SubVersion
+	}
+
+	// TODO: add computPullFromWhichFilterServer
+
+	return dc.client.PullMessage(context.Background(), brokerResult.BrokerAddr, pullRequest)
+}
+
+func (dc *defaultConsumer) processPullResult(mq *primitive.MessageQueue, result *primitive.PullResult,
data *internal.SubscriptionData) {
+
+	updatePullFromWhichNode(mq, result.SuggestWhichBrokerId)
+
+	switch result.Status {
+	case primitive.PullFound:
+		result.SetMessageExts(primitive.DecodeMessage(result.GetBody()))
+		msgs := result.GetMessageExts()
+
+		// filter message according to tags
+		msgListFilterAgain := msgs
+		if len(data.Tags) > 0 && data.ClassFilterMode {
+			msgListFilterAgain = make([]*primitive.MessageExt, len(msgs))
+			for _, msg := range msgs {
+				_, exist := data.Tags[msg.GetTags()]
+				if exist {
+					msgListFilterAgain = append(msgListFilterAgain, msg)
+				}
+			}
+		}
+
+		// TODO: add filter message hook
+		for _, msg := range msgListFilterAgain {
+			traFlag, _ := strconv.ParseBool(msg.Properties[primitive.PropertyTransactionPrepared])
+			if traFlag {
+				msg.TransactionId = msg.Properties[primitive.PropertyUniqueClientMessageIdKeyIndex]
+			}
+
+			if msg.Properties == nil {
+				msg.Properties = make(map[string]string)
+			}
+			msg.Properties[primitive.PropertyMinOffset] = strconv.FormatInt(result.MinOffset, 10)
+			msg.Properties[primitive.PropertyMaxOffset] = strconv.FormatInt(result.MaxOffset, 10)
+		}
+
+		result.SetMessageExts(msgListFilterAgain)
+	}
+}
+
 func (dc *defaultConsumer) findConsumerList(topic string) []string {
 	brokerAddr := internal.FindBrokerAddrByTopic(topic)
 	if brokerAddr == "" {
@@ -728,6 +864,10 @@ func (dc *defaultConsumer) queryMaxOffset(mq *primitive.MessageQueue)
(int64, er
 	return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
 }
 
+func (dc *defaultConsumer) queryOffset(mq *primitive.MessageQueue) (int64) {
+	return dc.storage.read(mq, _ReadMemoryThenStore)
+}
+
 // SearchOffsetByTimestamp with specific queueId and topic
 func (dc *defaultConsumer) searchOffsetByTimestamp(mq *primitive.MessageQueue, timestamp
int64) (int64, error) {
 	brokerAddr := internal.FindBrokerAddrByName(mq.BrokerName)
@@ -786,24 +926,6 @@ func buildSubscriptionData(topic string, selector MessageSelector) *internal.Sub
 	return subData
 }
 
-func getNextQueueOf(topic string) *primitive.MessageQueue {
-	queues, err := internal.FetchSubscribeMessageQueues(topic)
-	if err != nil && len(queues) > 0 {
-		rlog.Error(err.Error())
-		return nil
-	}
-	var index int64
-	v, exist := queueCounterTable.Load(topic)
-	if !exist {
-		index = -1
-		queueCounterTable.Store(topic, 0)
-	} else {
-		index = v.(int64)
-	}
-
-	return queues[int(atomic.AddInt64(&index, 1))%len(queues)]
-}
-
 func buildSysFlag(commitOffset, suspend, subscription, classFilter bool) int32 {
 	var flag int32 = 0
 	if commitOffset {
diff --git a/consumer/option.go b/consumer/option.go
index 2a826e8..67bcaec 100644
--- a/consumer/option.go
+++ b/consumer/option.go
@@ -113,6 +113,14 @@ func defaultPushConsumerOptions() consumerOptions {
 
 type Option func(*consumerOptions)
 
+func defaultPullConsumerOptions() consumerOptions {
+	opts := consumerOptions{
+		ClientOptions: internal.DefaultClientOptions(),
+	}
+	opts.ClientOptions.GroupName = "DEFAULT_CONSUMER"
+	return opts
+}
+
 func WithConsumerModel(m MessageModel) Option {
 	return func(options *consumerOptions) {
 		options.ConsumerModel = m
@@ -173,4 +181,4 @@ func WithRetry(retries int) Option {
 	return func(opts *consumerOptions) {
 		opts.RetryTimes = retries
 	}
-}
+}
\ No newline at end of file
diff --git a/consumer/pull_consumer.go b/consumer/pull_consumer.go
index c8bcdc5..19e2369 100644
--- a/consumer/pull_consumer.go
+++ b/consumer/pull_consumer.go
@@ -19,19 +19,38 @@ package consumer
 
 import (
 	"context"
-	"errors"
 	"fmt"
-	"strconv"
 	"sync"
+	"sync/atomic"
 
 	"github.com/apache/rocketmq-client-go/internal"
 	"github.com/apache/rocketmq-client-go/primitive"
+	"github.com/apache/rocketmq-client-go/rlog"
+	"github.com/apache/rocketmq-client-go/utils"
+	"github.com/pkg/errors"
 )
 
 type PullConsumer interface {
+	// Start
 	Start()
+
+	// Shutdown refuse all new pull operation, finish all submitted.
 	Shutdown()
+
+	// Pull pull message of topic,  selector indicate which queue to pull.
 	Pull(ctx context.Context, topic string, selector MessageSelector, numbers int) (*primitive.PullResult,
error)
+
+	// PullFrom pull messages of queue from the offset to offset + numbers
+	PullFrom(ctx context.Context, queue *primitive.MessageQueue, offset int64, numbers int)
(*primitive.PullResult, error)
+
+	// updateOffset update offset of queue in mem
+	UpdateOffset(queue *primitive.MessageQueue, offset int64) error
+
+	// PersistOffset persist all offset in mem.
+	PersistOffset(ctx context.Context) error
+
+	// CurrentOffset return the current offset of queue in mem.
+	CurrentOffset(queue *primitive.MessageQueue) (int64, error)
 }
 
 var (
@@ -39,20 +58,60 @@ var (
 )
 
 type defaultPullConsumer struct {
-	state     internal.ServiceState
+	*defaultConsumer
+
 	option    consumerOptions
 	client    internal.RMQClient
 	GroupName string
 	Model     MessageModel
 	UnitMode  bool
+
+	interceptor primitive.Interceptor
 }
 
-func (c *defaultPullConsumer) Start() {
+func NewPullConsumer(options ...Option) (*defaultPullConsumer, error) {
+	defaultOpts := defaultPullConsumerOptions()
+	for _, apply := range options {
+		apply(&defaultOpts)
+	}
+
+	srvs, err := internal.NewNamesrv(defaultOpts.NameServerAddrs...)
+	if err != nil {
+		return nil, errors.Wrap(err, "new Namesrv failed.")
+	}
+	internal.RegisterNamsrv(srvs)
+
+	dc := &defaultConsumer{
+		consumerGroup: defaultOpts.GroupName,
+		cType:         _PullConsume,
+		state:         internal.StateCreateJust,
+		prCh:          make(chan PullRequest, 4),
+		model:         defaultOpts.ConsumerModel,
+		option:        defaultOpts,
+	}
+
+	c := &defaultPullConsumer{
+		defaultConsumer: dc,
+	}
+	return c, nil
+}
+
+func (c *defaultPullConsumer) Start() error {
 	c.state = internal.StateRunning
+
+	var err error
+	c.once.Do(func() {
+		err = c.start()
+		if err != nil {
+			return
+		}
+	})
+
+	return err
 }
 
 func (c *defaultPullConsumer) Pull(ctx context.Context, topic string, selector MessageSelector,
numbers int) (*primitive.PullResult, error) {
-	mq := getNextQueueOf(topic)
+	mq := c.getNextQueueOf(topic)
 	if mq == nil {
 		return nil, fmt.Errorf("prepard to pull topic: %s, but no queue is founded", topic)
 	}
@@ -64,10 +123,28 @@ func (c *defaultPullConsumer) Pull(ctx context.Context, topic string,
selector M
 		return nil, err
 	}
 
-	processPullResult(mq, result, data)
+	c.processPullResult(mq, result, data)
 	return result, nil
 }
 
+func (c *defaultPullConsumer) getNextQueueOf(topic string) *primitive.MessageQueue {
+	queues, err := internal.FetchSubscribeMessageQueues(topic)
+	if err != nil && len(queues) > 0 {
+		rlog.Error(err.Error())
+		return nil
+	}
+	var index int64
+	v, exist := queueCounterTable.Load(topic)
+	if !exist {
+		index = -1
+		queueCounterTable.Store(topic, 0)
+	} else {
+		index = v.(int64)
+	}
+
+	return queues[int(atomic.AddInt64(&index, 1))%len(queues)]
+}
+
 // SubscribeWithChan ack manually
 func (c *defaultPullConsumer) SubscribeWithChan(topic, selector MessageSelector) (chan *primitive.Message,
error) {
 	return nil, nil
@@ -83,62 +160,46 @@ func (c *defaultPullConsumer) ACK(msg *primitive.Message, result ConsumeResult)
 
 }
 
-func (c *defaultPullConsumer) pull(ctx context.Context, mq *primitive.MessageQueue, data
*internal.SubscriptionData,
-	offset int64, numbers int) (*primitive.PullResult, error) {
+func (c *defaultConsumer) checkPull(ctx context.Context, mq *primitive.MessageQueue, offset
int64, numbers int) error {
 	err := c.makeSureStateOK()
 	if err != nil {
-		return nil, err
+		return err
 	}
 
 	if mq == nil {
-		return nil, errors.New("MessageQueue is nil")
+		return utils.ErrMQEmpty
 	}
 
 	if offset < 0 {
-		return nil, errors.New("offset < 0")
+		return utils.ErrOffset
 	}
 
 	if numbers <= 0 {
-		numbers = 1
+		return utils.ErrNumbers
 	}
-	c.subscriptionAutomatically(mq.Topic)
+	return nil
+}
 
-	brokerResult := tryFindBroker(mq)
-	if brokerResult == nil {
-		return nil, fmt.Errorf("the broker %s does not exist", mq.BrokerName)
-	}
+// TODO: add timeout limit
+// TODO: add hook
+func (c *defaultPullConsumer) pull(ctx context.Context, mq *primitive.MessageQueue, data
*internal.SubscriptionData,
+	offset int64, numbers int) (*primitive.PullResult, error) {
 
-	if (data.ExpType == string(TAG)) && brokerResult.BrokerVersion < internal.V4_1_0
{
-		return nil, fmt.Errorf("the broker [%s, %v] does not upgrade to support for filter message
by %v",
-			mq.BrokerName, brokerResult.BrokerVersion, data.ExpType)
+	if err := c.checkPull(ctx, mq, offset, numbers); err != nil {
+		return nil, err
 	}
 
-	sysFlag := buildSysFlag(false, true, true, false)
+	c.subscriptionAutomatically(mq.Topic)
 
-	if brokerResult.Slave {
-		sysFlag = clearCommitOffsetFlag(sysFlag)
-	}
-	pullRequest := &internal.PullMessageRequest{
-		ConsumerGroup:        c.GroupName,
-		Topic:                mq.Topic,
-		QueueId:              int32(mq.QueueId),
-		QueueOffset:          offset,
-		MaxMsgNums:           int32(numbers),
-		SysFlag:              sysFlag,
-		CommitOffset:         0,
-		SuspendTimeoutMillis: _BrokerSuspendMaxTime,
-		SubExpression:        data.SubString,
-		ExpressionType:       string(data.ExpType),
-	}
+	sysFlag := buildSysFlag(false, true, true, false)
 
-	if data.ExpType == string(TAG) {
-		pullRequest.SubVersion = 0
-	} else {
-		pullRequest.SubVersion = data.SubVersion
+	pullResp, err := c.pullInner(ctx, mq, data, offset, numbers, sysFlag, 0)
+	if err != nil {
+		return nil, err
 	}
+	c.processPullResult(mq, pullResp, data)
 
-	// TODO computePullFromWhichFilterServer
-	return c.client.PullMessage(ctx, brokerResult.BrokerAddr, pullRequest)
+	return pullResp, err
 }
 
 func (c *defaultPullConsumer) makeSureStateOK() error {
@@ -148,42 +209,39 @@ func (c *defaultPullConsumer) makeSureStateOK() error {
 	return nil
 }
 
-func (c *defaultPullConsumer) subscriptionAutomatically(topic string) {
-	// TODO
+func (c *defaultPullConsumer) nextOffsetOf(queue *primitive.MessageQueue) int64 {
+	return c.computePullFromWhere(queue)
 }
 
-func (c *defaultPullConsumer) nextOffsetOf(queue *primitive.MessageQueue) int64 {
-	return 0
-}
-
-func processPullResult(mq *primitive.MessageQueue, result *primitive.PullResult, data *internal.SubscriptionData)
{
-	updatePullFromWhichNode(mq, result.SuggestWhichBrokerId)
-	switch result.Status {
-	case primitive.PullFound:
-		msgs := result.GetMessageExts()
-		msgListFilterAgain := msgs
-		if len(data.Tags) > 0 && data.ClassFilterMode {
-			msgListFilterAgain = make([]*primitive.MessageExt, len(msgs))
-			for _, msg := range msgs {
-				_, exist := data.Tags[msg.GetTags()]
-				if exist {
-					msgListFilterAgain = append(msgListFilterAgain, msg)
-				}
-			}
-		}
+// PullFrom pull messages of queue from the offset to offset + numbers
+func (c *defaultPullConsumer) PullFrom(ctx context.Context, queue *primitive.MessageQueue,
offset int64, numbers int) (*primitive.PullResult, error) {
+	if err := c.checkPull(ctx, queue, offset, numbers); err != nil {
+		return nil, err
+	}
+
+	selector := MessageSelector{}
+	data := buildSubscriptionData(queue.Topic, selector)
 
-		// TODO hook
+	return c.pull(ctx, queue, data, offset, numbers)
+}
 
-		for _, msg := range msgListFilterAgain {
-			traFlag, _ := strconv.ParseBool(msg.Properties[primitive.PropertyTransactionPrepared])
-			if traFlag {
-				msg.TransactionId = msg.Properties[primitive.PropertyUniqueClientMessageIdKeyIndex]
-			}
+// updateOffset update offset of queue in mem
+func (c *defaultPullConsumer) UpdateOffset(queue *primitive.MessageQueue, offset int64) error
{
+	return c.updateOffset(queue, offset)
+}
 
-			msg.Properties[primitive.PropertyMinOffset] = strconv.FormatInt(result.MinOffset, 10)
-			msg.Properties[primitive.PropertyMaxOffset] = strconv.FormatInt(result.MaxOffset, 10)
-		}
+// PersistOffset persist all offset in mem.
+func (c *defaultPullConsumer) PersistOffset(ctx context.Context) error {
+	return c.persistConsumerOffset()
+}
 
-		result.SetMessageExts(msgListFilterAgain)
-	}
+// CurrentOffset return the current offset of queue in mem.
+func (c *defaultPullConsumer) CurrentOffset(queue *primitive.MessageQueue) (int64, error)
{
+	v := c.queryOffset(queue)
+	return v, nil
+}
+
+// Shutdown close defaultConsumer, refuse new request.
+func (c *defaultPullConsumer) Shutdown() error {
+	return c.defaultConsumer.shutdown()
 }
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index c5641a5..9000651 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -126,20 +126,18 @@ func (pc *pushConsumer) Start() error {
 		pc.state = internal.StateStartFailed
 		pc.validate()
 
-		if pc.model == Clustering {
-			// set retry topic
-			retryTopic := internal.GetRetryTopic(pc.consumerGroup)
-			pc.subscriptionDataTable.Store(retryTopic, buildSubscriptionData(retryTopic,
-				MessageSelector{TAG, _SubAll}))
+		err = pc.defaultConsumer.start()
+		if err != nil {
+			return
 		}
 
-		pc.client = internal.GetOrNewRocketMQClient(pc.option.ClientOptions)
-		if pc.model == Clustering {
-			pc.option.ChangeInstanceNameToPID()
-			pc.storage = NewRemoteOffsetStore(pc.consumerGroup, pc.client)
-		} else {
-			pc.storage = NewLocalFileOffsetStore(pc.consumerGroup, pc.client.ClientID())
+		err := pc.client.RegisterConsumer(pc.consumerGroup, pc)
+		if err != nil {
+			pc.state = internal.StateStartFailed
+			rlog.Errorf("the consumer group: [%s] has been created, specify another name.", pc.consumerGroup)
+			err = ErrCreated
 		}
+
 		go func() {
 			// todo start clean msg expired
 			// TODO quit
@@ -151,16 +149,6 @@ func (pc *pushConsumer) Start() error {
 			}
 		}()
 
-		err = pc.client.RegisterConsumer(pc.consumerGroup, pc)
-		if err != nil {
-			pc.state = internal.StateCreateJust
-			rlog.Errorf("the consumer group: [%s] has been created, specify another name.", pc.consumerGroup)
-			err = errors.New("consumer group has been created")
-			return
-		}
-		pc.client.UpdateTopicRouteInfo()
-		pc.client.Start()
-		pc.state = internal.StateRunning
 	})
 
 	pc.client.UpdateTopicRouteInfo()
@@ -201,8 +189,8 @@ func (pc *pushConsumer) Rebalance() {
 	pc.defaultConsumer.doBalance()
 }
 
-func (pc *pushConsumer) PersistConsumerOffset() {
-	pc.defaultConsumer.persistConsumerOffset()
+func (pc *pushConsumer) PersistConsumerOffset() error {
+	return pc.defaultConsumer.persistConsumerOffset()
 }
 
 func (pc *pushConsumer) UpdateTopicSubscribeInfo(topic string, mqs []*primitive.MessageQueue)
{
@@ -651,7 +639,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
 					msgs := req.([]*primitive.MessageExt)
 					r, e := pc.consume(ctx, msgs...)
 
-					realReply := reply.(ConsumeResultHolder)
+					realReply := reply.(*ConsumeResultHolder)
 					realReply.ConsumeResult = r
 					return e
 				})
diff --git a/examples/consumer/pull/main.go b/examples/consumer/pull/main.go
new file mode 100644
index 0000000..4076450
--- /dev/null
+++ b/examples/consumer/pull/main.go
@@ -0,0 +1,65 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/apache/rocketmq-client-go/consumer"
+	"github.com/apache/rocketmq-client-go/primitive"
+	"github.com/apache/rocketmq-client-go/rlog"
+	"github.com/apache/rocketmq-client-go/utils"
+)
+
+func main() {
+	c, err := consumer.NewPullConsumer(consumer.WithGroupName("testGroup"), consumer.WithNameServer([]string{"127.0.0.1:9876"}))
+	if err != nil{
+		rlog.Fatal("fail to new pullConsumer: ", err)
+	}
+	c.Start()
+
+	ctx := context.Background()
+	queue := &primitive.MessageQueue{
+		Topic:      "TopicTest",
+		BrokerName: "", // replace with your broker name. otherwise, pull will failed.
+		QueueId:    0,
+	}
+
+	offset := int64(0)
+	for {
+		resp, err := c.PullFrom(ctx, queue, offset, 10)
+		if err != nil {
+			if err == utils.ErrRequestTimeout {
+				fmt.Printf("timeout \n")
+				time.Sleep(1 *time.Second)
+				continue
+			}
+			fmt.Printf("unexpectable err: %v \n", err)
+			return
+		}
+		if resp.Status == primitive.PullFound {
+			fmt.Printf("pull message success. nextOffset: %d \n", resp.NextBeginOffset)
+			for _, msg := range resp.GetMessageExts() {
+				fmt.Printf("pull msg: %v \n", msg)
+			}
+		}
+		offset = resp.NextBeginOffset
+	}
+}
diff --git a/examples/consumer/simple/main.go b/examples/consumer/simple/main.go
index 43947f9..2ddc465 100644
--- a/examples/consumer/simple/main.go
+++ b/examples/consumer/simple/main.go
@@ -33,7 +33,7 @@ func main() {
 		consumer.WithGroupName("testGroup"),
 		consumer.WithNameServer([]string{"127.0.0.1:9876"}),
 	)
-	err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx context.Context,
+	err := c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx context.Context,
 		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
 		fmt.Printf("subscribe callback: %v \n", msgs)
 		return consumer.ConsumeSuccess, nil
diff --git a/internal/client.go b/internal/client.go
index 1db43c7..09b7ca3 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -76,7 +76,7 @@ type InnerProducer interface {
 }
 
 type InnerConsumer interface {
-	PersistConsumerOffset()
+	PersistConsumerOffset() error
 	UpdateTopicSubscribeInfo(topic string, mqs []*primitive.MessageQueue)
 	IsSubscribeTopicNeedUpdate(topic string) bool
 	SubscriptionDataList() []*SubscriptionData
@@ -217,7 +217,10 @@ func (c *rmqClient) Start() {
 			for !c.close {
 				c.consumerMap.Range(func(key, value interface{}) bool {
 					consumer := value.(InnerConsumer)
-					consumer.PersistConsumerOffset()
+					err := consumer.PersistConsumerOffset()
+					if err != nil {
+						rlog.Errorf("persist offset failed. err: %v", err)
+					}
 					return true
 				})
 				time.Sleep(_PersistOffset)
@@ -305,7 +308,7 @@ func (c *rmqClient) SendHeartbeatToAllBrokerWithLock() {
 	hbData.ProducerDatas = pData
 	hbData.ConsumerDatas = cData
 	if len(pData) == 0 && len(cData) == 0 {
-		rlog.Info("sending heartbeat, but no consumer and no consumer")
+		rlog.Info("sending heartbeat, but no producer and no consumer")
 		return
 	}
 	brokerAddressesMap.Range(func(key, value interface{}) bool {
diff --git a/internal/remote/future.go b/internal/remote/future.go
index 93990e5..5a1c724 100644
--- a/internal/remote/future.go
+++ b/internal/remote/future.go
@@ -20,6 +20,8 @@ package remote
 import (
 	"sync"
 	"time"
+
+	"github.com/apache/rocketmq-client-go/utils"
 )
 
 // ResponseFuture
@@ -71,7 +73,7 @@ func (r *ResponseFuture) waitResponse() (*RemotingCommand, error) {
 			cmd, err = r.ResponseCommand, r.Err
 			goto done
 		case <-timer.C:
-			err = ErrRequestTimeout
+			err = utils.ErrRequestTimeout
 			r.Err = err
 			goto done
 		}
diff --git a/internal/remote/remote_client.go b/internal/remote/remote_client.go
index 1d0330a..8e32216 100644
--- a/internal/remote/remote_client.go
+++ b/internal/remote/remote_client.go
@@ -20,7 +20,6 @@ import (
 	"bufio"
 	"bytes"
 	"encoding/binary"
-	"errors"
 	"io"
 	"net"
 	"sync"
@@ -29,11 +28,6 @@ import (
 	"github.com/apache/rocketmq-client-go/rlog"
 )
 
-var (
-	//ErrRequestTimeout for request timeout error
-	ErrRequestTimeout = errors.New("request timeout")
-)
-
 type ClientRequestFunc func(*RemotingCommand) *RemotingCommand
 
 type TcpOption struct {
diff --git a/utils/errors.go b/utils/errors.go
index a3c7ead..507d7bb 100644
--- a/utils/errors.go
+++ b/utils/errors.go
@@ -19,6 +19,16 @@ package utils
 
 import (
 	"github.com/apache/rocketmq-client-go/rlog"
+	"github.com/pkg/errors"
+)
+
+var(
+	// ErrRequestTimeout for request timeout error
+	ErrRequestTimeout = errors.New("request timeout")
+
+	ErrMQEmpty = errors.New("MessageQueue is nil")
+	ErrOffset  = errors.New("offset < 0")
+	ErrNumbers = errors.New("numbers < 0")
 )
 
 func CheckError(action string, err error) {


Mime
View raw message