rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ding...@apache.org
Subject [rocketmq-client-go] branch native updated: Add OffsetStore for Consumer (#49)
Date Tue, 30 Apr 2019 14:08:18 GMT
This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new e6de4b4  Add OffsetStore for Consumer (#49)
e6de4b4 is described below

commit e6de4b44f90d52053316c59359b32b3b679babf1
Author: wenfeng <sxian.wang@gmail.com>
AuthorDate: Tue Apr 30 22:08:13 2019 +0800

    Add OffsetStore for Consumer (#49)
    
    * add impl of LocalOffsetStorage
    
    * add impl of RemoteBrokerStore
    
    * fix offset bugs
---
 consumer/consumer.go                               |  36 ++-
 .../producer/main.go => consumer/consumer_test.go  |  32 +--
 consumer/offset_store.go                           | 302 ++++++++++++++++++++-
 consumer/process_queue.go                          |  16 +-
 consumer/push_consumer.go                          |  29 +-
 examples/producer/main.go                          |   2 +-
 kernel/client.go                                   | 140 ++++++----
 kernel/request.go                                  |  59 +++-
 kernel/route.go                                    |   2 +-
 remote/remote_client.go                            |   2 +-
 remote/remote_client_test.go                       |   2 +-
 examples/producer/main.go => utils/errors.go       |  29 +-
 utils/files.go                                     |  65 +++++
 13 files changed, 565 insertions(+), 151 deletions(-)

diff --git a/consumer/consumer.go b/consumer/consumer.go
index 60d3dd5..f128f66 100644
--- a/consumer/consumer.go
+++ b/consumer/consumer.go
@@ -193,6 +193,14 @@ func (pr *PullRequest) String() string {
 
 type ConsumerOption struct {
 	kernel.ClientOption
+	/**
+	 * Backtracking consumption time with second precision. Time format is
+	 * 20131223171201<br>
+	 * Implying Seventeen twelve and 01 seconds on December 23, 2013 year<br>
+	 * Default backtracking consumption time Half an hour ago.
+	 */
+	ConsumeTimestamp string
+
 	// The socket timeout in milliseconds
 	ConsumerPullTimeout time.Duration
 
@@ -549,7 +557,7 @@ func (dc *defaultConsumer) doUnlock(addr string, body *lockBatchRequestBody, one
 	data, _ := json.Marshal(body)
 	request := remote.NewRemotingCommand(kernel.ReqUnlockBatchMQ, nil, data)
 	if oneway {
-		err := remote.InvokeOneWay(addr, request)
+		err := remote.InvokeOneWay(addr, request, 3*time.Second)
 		if err != nil {
 			rlog.Errorf("lock mq to broker with oneway: %s error %s", addr, err.Error())
 		}
@@ -581,6 +589,7 @@ func (dc *defaultConsumer) buildProcessQueueTableByBrokerName() map[string][]*ke
 	return result
 }
 
+// TODO: there are quite a few problems here; this needs another careful cross-check
 func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*kernel.MessageQueue) bool {
 	var changed bool
 	mqSet := make(map[*kernel.MessageQueue]bool)
@@ -614,6 +623,10 @@ func (dc *defaultConsumer) updateProcessQueueTable(topic string, mqs []*kernel.M
 
 	if dc.cType == _PushConsume {
 		for mq := range mqSet {
+			_, exist := dc.processQueueTable.Load(mq)
+			if exist {
+				continue
+			}
 			if dc.consumeOrderly && !dc.lock(mq) {
 				rlog.Warnf("do defaultConsumer, Group:%s add a new mq failed, %s, because lock failed",
 					dc.consumerGroup, mq.String())
@@ -669,13 +682,17 @@ func (dc *defaultConsumer) computePullFromWhere(mq *kernel.MessageQueue) int64 {
 		case ConsumeFromLastOffset:
 			if lastOffset == -1 {
 				if strings.HasPrefix(mq.Topic, kernel.RetryGroupTopicPrefix) {
-					lastOffset, err := kernel.QueryMaxOffset(mq.Topic, mq.QueueId)
+					lastOffset = 0
+				} else {
+					lastOffset, err := kernel.QueryMaxOffset(mq)
 					if err == nil {
 						result = lastOffset
 					} else {
 						rlog.Warnf("query max offset of: [%s:%d] error, %s", mq.Topic, mq.QueueId, err.Error())
 					}
 				}
+			} else {
+				result = -1
 			}
 		case ConsumeFromFirstOffset:
 			if lastOffset == -1 {
@@ -684,14 +701,25 @@ func (dc *defaultConsumer) computePullFromWhere(mq *kernel.MessageQueue) int64 {
 		case ConsumeFromTimestamp:
 			if lastOffset == -1 {
 				if strings.HasPrefix(mq.Topic, kernel.RetryGroupTopicPrefix) {
-					lastOffset, err := kernel.QueryMaxOffset(mq.Topic, mq.QueueId)
+					lastOffset, err := kernel.QueryMaxOffset(mq)
 					if err == nil {
 						result = lastOffset
 					} else {
+						result = -1
 						rlog.Warnf("query max offset of: [%s:%d] error, %s", mq.Topic, mq.QueueId, err.Error())
 					}
 				} else {
-					// TODO parse timestamp
+					t, err := time.Parse("20060102150405", dc.option.ConsumeTimestamp)
+					if err != nil {
+						result = -1
+					} else {
+						lastOffset, err := kernel.SearchOffsetByTimestamp(mq, t.Unix())
+						if err != nil {
+							result = -1
+						} else {
+							result = lastOffset
+						}
+					}
 				}
 			}
 		default:
diff --git a/examples/producer/main.go b/consumer/consumer_test.go
similarity index 53%
copy from examples/producer/main.go
copy to consumer/consumer_test.go
index 5c12584..2eb3f24 100644
--- a/examples/producer/main.go
+++ b/consumer/consumer_test.go
@@ -15,33 +15,17 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package main
+package consumer
 
 import (
-	"fmt"
-	"github.com/apache/rocketmq-client-go/consumer"
-	"github.com/apache/rocketmq-client-go/kernel"
-	"os"
+	"github.com/stretchr/testify/assert"
+	"testing"
 	"time"
 )
 
-func main() {
-	c := consumer.NewPushConsumer("testGroup", consumer.ConsumerOption{
-		ConsumerModel: consumer.Clustering,
-		FromWhere:     consumer.ConsumeFromFirstOffset,
-	})
-	err := c.Subscribe("testTopic", consumer.MessageSelector{}, func(ctx *consumer.ConsumeMessageContext,
-		msgs []*kernel.MessageExt) (consumer.ConsumeResult, error) {
-		fmt.Println(msgs)
-		return consumer.ConsumeSuccess, nil
-	})
-	if err != nil {
-		fmt.Println(err.Error())
-	}
-	err = c.Start()
-	if err != nil {
-		fmt.Println(err.Error())
-		os.Exit(-1)
-	}
-	time.Sleep(time.Hour)
+func TestParseTimestamp(t *testing.T) {
+	layout := "20060102150405"
+	timestamp, err := time.ParseInLocation(layout, "20190430193409", time.Local)
+	assert.Nil(t, err)
+	assert.Equal(t, int64(1556624049), timestamp.Unix())
 }
diff --git a/consumer/offset_store.go b/consumer/offset_store.go
index a11280d..6b6d719 100644
--- a/consumer/offset_store.go
+++ b/consumer/offset_store.go
@@ -17,7 +17,19 @@ limitations under the License.
 
 package consumer
 
-import "github.com/apache/rocketmq-client-go/kernel"
+import (
+	"encoding/json"
+	"fmt"
+	"github.com/apache/rocketmq-client-go/kernel"
+	"github.com/apache/rocketmq-client-go/remote"
+	"github.com/apache/rocketmq-client-go/rlog"
+	"github.com/apache/rocketmq-client-go/utils"
+	"os"
+	"path/filepath"
+	"strconv"
+	"sync"
+	"time"
+)
 
 type readType int
 
@@ -27,6 +39,16 @@ const (
 	_ReadMemoryThenStore
 )
 
+var (
+	_LocalOffsetStorePath = os.Getenv("rocketmq.client.localOffsetStoreDir")
+)
+
+func init() {
+	if _LocalOffsetStorePath == "" {
+		_LocalOffsetStorePath = filepath.Join(os.Getenv("user.home"), ".rocketmq_client_go")
+	}
+}
+
 type OffsetStore interface {
 	load()
 	persist(mqs []*kernel.MessageQueue)
@@ -36,20 +58,280 @@ type OffsetStore interface {
 }
 
 type localFileOffsetStore struct {
+	group       string
+	path        string
+	OffsetTable map[string]map[int]*queueOffset `json:"OffsetTable"`
+	// mutex for offset file
+	mutex sync.Mutex
 }
 
-func (local *localFileOffsetStore) load()                                               
           {}
-func (local *localFileOffsetStore) persist(mqs []*kernel.MessageQueue)                  
           {}
-func (local *localFileOffsetStore) remove(mq *kernel.MessageQueue)                      
           {}
-func (local *localFileOffsetStore) read(mq *kernel.MessageQueue, t readType) int64      
           { return 0 }
-func (local *localFileOffsetStore) update(mq *kernel.MessageQueue, offset int64, increaseOnly bool) {}
+type queueOffset struct {
+	QueueID int    `json:"queueId"`
+	Broker  string `json:"brokerName"`
+	Offset  int64  `json:"offset"`
+}
+
+func NewLocalFileOffsetStore(clientID, group string) OffsetStore {
+	store := &localFileOffsetStore{
+		group: group,
+		path:  filepath.Join(_LocalOffsetStorePath, clientID, group, "offset.json"),
+	}
+	store.load()
+	return store
+}
+
+func (local *localFileOffsetStore) load() {
+	local.mutex.Lock()
+	defer local.mutex.Unlock()
+	data, err := utils.FileReadAll(local.path)
+	if err != nil {
+		data, err = utils.FileReadAll(filepath.Join(local.path, ".bak"))
+	}
+	if err != nil {
+		rlog.Debugf("load local offset: %s error: %s", local.path, err.Error())
+		return
+	}
+	err = json.Unmarshal(data, local)
+	if err != nil {
+		rlog.Debugf("unmarshal local offset: %s error: %s", local.path, err.Error())
+		return
+	}
+}
+
+func (local *localFileOffsetStore) read(mq *kernel.MessageQueue, t readType) int64 {
+	if t == _ReadFromMemory || t == _ReadMemoryThenStore {
+		off := readFromMemory(local.OffsetTable, mq)
+		if off >= 0 || (off == -1 && t == _ReadFromMemory) {
+			return off
+		}
+	}
+	local.load()
+	return readFromMemory(local.OffsetTable, mq)
+}
+
+func (local *localFileOffsetStore) update(mq *kernel.MessageQueue, offset int64, increaseOnly bool) {
+	rlog.Infof("update offset: %s to %d", mq, offset)
+	localOffset, exist := local.OffsetTable[mq.Topic]
+	if !exist {
+		localOffset = make(map[int]*queueOffset)
+		local.OffsetTable[mq.Topic] = localOffset
+	}
+	q, exist := localOffset[mq.QueueId]
+	if !exist {
+		q = &queueOffset{
+			QueueID: mq.QueueId,
+			Broker:  mq.BrokerName,
+		}
+		localOffset[mq.QueueId] = q
+	}
+	if increaseOnly {
+		if q.Offset < offset {
+			q.Offset = offset
+		}
+	} else {
+		q.Offset = offset
+	}
+}
+
+func (local *localFileOffsetStore) persist(mqs []*kernel.MessageQueue) {
+	if len(mqs) == 0 {
+		return
+	}
+	s := new(struct {
+		OffsetTable map[string]map[int]*queueOffset `json:"offsetTable"`
+	})
+	table := make(map[string]map[int]*queueOffset)
+	for idx := range mqs {
+		mq := mqs[idx]
+		offsets, exist := local.OffsetTable[mq.Topic]
+		if !exist {
+			continue
+		}
+		off, exist := offsets[mq.QueueId]
+		if !exist {
+			continue
+		}
+
+		offsets, exist = table[mq.Topic]
+		if !exist {
+			offsets = make(map[int]*queueOffset)
+		}
+		offsets[off.QueueID] = off
+	}
+	data, _ := json.Marshal(s)
+	utils.CheckError(fmt.Sprintf("persist offset to %s", local.path), utils.WriteToFile(local.path, data))
+}
+
+func (local *localFileOffsetStore) remove(mq *kernel.MessageQueue) {
+	// unsupported
+}
 
 type remoteBrokerOffsetStore struct {
+	group       string
+	OffsetTable map[string]map[int]*queueOffset `json:"OffsetTable"`
+	mutex       sync.RWMutex
+}
+
+func NewRemoteOffsetStore(group string) OffsetStore {
+	return &remoteBrokerOffsetStore{
+		group:       group,
+		OffsetTable: make(map[string]map[int]*queueOffset),
+	}
+}
+
+func (remote *remoteBrokerOffsetStore) load() {
+	// unsupported
+}
+
+func (remote *remoteBrokerOffsetStore) persist(mqs []*kernel.MessageQueue) {
+	remote.mutex.Lock()
+	defer remote.mutex.Unlock()
+	if len(mqs) == 0 {
+		return
+	}
+	for idx := range mqs {
+		mq := mqs[idx]
+		offsets, exist := remote.OffsetTable[mq.Topic]
+		if !exist {
+			continue
+		}
+		off, exist := offsets[mq.QueueId]
+		if !exist {
+			continue
+		}
+
+		err := updateConsumeOffsetToBroker(remote.group, mq.Topic, off)
+		if err != nil {
+			rlog.Warnf("update offset to broker error: %s, group: %s, queue: %s, offset: %d",
+				err.Error(), remote.group, mq.String(), off.Offset)
+		} else {
+			rlog.Infof("update offset to broker success, group: %s, topic: %s, queue: %v", remote.group, mq.Topic, off)
+		}
+	}
+}
+
+func (remote *remoteBrokerOffsetStore) remove(mq *kernel.MessageQueue) {
+	remote.mutex.Lock()
+	defer remote.mutex.Unlock()
+	if mq == nil {
+		return
+	}
+	offset, exist := remote.OffsetTable[mq.Topic]
+	if !exist {
+		return
+	}
+	rlog.Infof("delete: %s", mq.String())
+	delete(offset, mq.QueueId)
+}
+
+func (remote *remoteBrokerOffsetStore) read(mq *kernel.MessageQueue, t readType) int64 {
+	remote.mutex.RLock()
+	if t == _ReadFromMemory || t == _ReadMemoryThenStore {
+		off := readFromMemory(remote.OffsetTable, mq)
+		if off >= 0 || (off == -1 && t == _ReadFromMemory) {
+			remote.mutex.RUnlock()
+			return off
+		}
+	}
+	off, err := fetchConsumeOffsetFromBroker(remote.group, mq)
+	if err != nil {
+		rlog.Errorf("fetch offset of %s error: %s", mq.String(), err.Error())
+		remote.mutex.RUnlock()
+		return -1
+	}
+	remote.mutex.RUnlock()
+	remote.update(mq, off, true)
+	return off
 }
 
-func (remote *remoteBrokerOffsetStore) load()                                          {}
-func (remote *remoteBrokerOffsetStore) persist(mqs []*kernel.MessageQueue)             {}
-func (remote *remoteBrokerOffsetStore) remove(mq *kernel.MessageQueue)                 {}
-func (remote *remoteBrokerOffsetStore) read(mq *kernel.MessageQueue, t readType) int64 { return 0 }
 func (remote *remoteBrokerOffsetStore) update(mq *kernel.MessageQueue, offset int64, increaseOnly bool) {
+	rlog.Infof("update offset: %s to %d", mq, offset)
+	remote.mutex.Lock()
+	defer remote.mutex.Unlock()
+	localOffset, exist := remote.OffsetTable[mq.Topic]
+	if !exist {
+		localOffset = make(map[int]*queueOffset)
+		remote.OffsetTable[mq.Topic] = localOffset
+	}
+	q, exist := localOffset[mq.QueueId]
+	if !exist {
+		rlog.Infof("new queueOffset: %d, off: %d", mq.QueueId, offset)
+		q = &queueOffset{
+			QueueID: mq.QueueId,
+			Broker:  mq.BrokerName,
+		}
+		localOffset[mq.QueueId] = q
+	}
+	if increaseOnly {
+		if q.Offset < offset {
+			q.Offset = offset
+		}
+	} else {
+		q.Offset = offset
+	}
+}
+
+func readFromMemory(table map[string]map[int]*queueOffset, mq *kernel.MessageQueue) int64 {
+	localOffset, exist := table[mq.Topic]
+	if !exist {
+		return -1
+	}
+	off, exist := localOffset[mq.QueueId]
+	if !exist {
+		return -1
+	}
+
+	return off.Offset
+}
+
+func fetchConsumeOffsetFromBroker(group string, mq *kernel.MessageQueue) (int64, error) {
+	broker := kernel.FindBrokerAddrByName(mq.BrokerName)
+	if broker == "" {
+		kernel.UpdateTopicRouteInfo(mq.Topic)
+		broker = kernel.FindBrokerAddrByName(mq.BrokerName)
+	}
+	if broker == "" {
+		return int64(-1), fmt.Errorf("broker: %s address not found", mq.BrokerName)
+	}
+	queryOffsetRequest := &kernel.QueryConsumerOffsetRequest{
+		ConsumerGroup: group,
+		Topic:         mq.Topic,
+		QueueId:       mq.QueueId,
+	}
+	cmd := remote.NewRemotingCommand(kernel.ReqQueryConsumerOffset, queryOffsetRequest, nil)
+	res, err := remote.InvokeSync(broker, cmd, 3*time.Second)
+	if err != nil {
+		return -1, err
+	}
+	if res.Code != kernel.ResSuccess {
+		return -2, fmt.Errorf("broker response code: %d, remarks: %s", res.Code, res.Remark)
+	}
+
+	off, err := strconv.ParseInt(res.ExtFields["offset"], 10, 64)
+
+	if err != nil {
+		return -1, err
+	}
+
+	return off, nil
+}
+
+func updateConsumeOffsetToBroker(group, topic string, queue *queueOffset) error {
+	broker := kernel.FindBrokerAddrByName(queue.Broker)
+	if broker == "" {
+		kernel.UpdateTopicRouteInfo(topic)
+		broker = kernel.FindBrokerAddrByName(queue.Broker)
+	}
+	if broker == "" {
+		return fmt.Errorf("broker: %s address not found", queue.Broker)
+	}
+
+	updateOffsetRequest := &kernel.UpdateConsumerOffsetRequest{
+		ConsumerGroup: group,
+		Topic:         topic,
+		QueueId:       queue.QueueID,
+		CommitOffset:  queue.Offset,
+	}
+	cmd := remote.NewRemotingCommand(kernel.ReqUpdateConsumerOffset, updateOffsetRequest, nil)
+	return remote.InvokeOneWay(broker, cmd, 5*time.Second)
 }
diff --git a/consumer/process_queue.go b/consumer/process_queue.go
index 0db0766..f4367f9 100644
--- a/consumer/process_queue.go
+++ b/consumer/process_queue.go
@@ -64,20 +64,26 @@ func (pq *ProcessQueue) putMessage(messages []*kernel.MessageExt) {
 	localList := list.New()
 	for idx := range messages {
 		localList.PushBack(messages[idx])
+		pq.queueOffsetMax = messages[idx].QueueOffset
 	}
 	pq.mutex.Lock()
 	pq.msgCache.PushBackList(localList)
 	pq.mutex.Unlock()
 }
 
-func (pq *ProcessQueue) removeMessage(number int) int {
-	i := 0
+func (pq *ProcessQueue) removeMessage(number int) int64 {
+	result := pq.queueOffsetMax + 1
 	pq.mutex.Lock()
-	for ; i < number && pq.msgCache.Len() > 0; i++ {
-		pq.msgCache.Remove(pq.msgCache.Front())
+	for i := 0; i < number && pq.msgCache.Len() > 0; i++ {
+		head := pq.msgCache.Front()
+		pq.msgCache.Remove(head)
+		result = head.Value.(*kernel.MessageExt).QueueOffset
 	}
 	pq.mutex.Unlock()
-	return i
+	if pq.msgCache.Len() > 0 {
+		result = pq.msgCache.Front().Value.(*kernel.MessageExt).QueueOffset
+	}
+	return result
 }
 
 func (pq *ProcessQueue) takeMessages(number int) []*kernel.MessageExt {
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 58f4c68..fc78e3e 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -20,7 +20,6 @@ package consumer
 import (
 	"context"
 	"errors"
-	"fmt"
 	"github.com/apache/rocketmq-client-go/kernel"
 	"github.com/apache/rocketmq-client-go/rlog"
 	"math"
@@ -53,13 +52,6 @@ type PushConsumer interface {
 
 type pushConsumer struct {
 	*defaultConsumer
-	/**
-	 * Backtracking consumption time with second precision. Time format is
-	 * 20131223171201<br>
-	 * Implying Seventeen twelve and 01 seconds on December 23, 2013 year<br>
-	 * Default backtracking consumption time Half an hour ago.
-	 */
-	ConsumeTimestamp             time.Duration
 	queueFlowControlTimes        int
 	queueMaxSpanFlowControlTimes int
 	consume                      func(*ConsumeMessageContext, []*kernel.MessageExt) (ConsumeResult, error)
@@ -97,9 +89,8 @@ func NewPushConsumer(consumerGroup string, opt ConsumerOption) PushConsumer {
 	}
 
 	p := &pushConsumer{
-		defaultConsumer:  dc,
-		ConsumeTimestamp: 30 * time.Minute,
-		subscribedTopic:  make(map[string]string, 0),
+		defaultConsumer: dc,
+		subscribedTopic: make(map[string]string, 0),
 	}
 	dc.mqChanged = p.messageQueueChanged
 	if p.consumeOrderly {
@@ -118,8 +109,8 @@ func (pc *pushConsumer) Start() error {
 		pc.state = kernel.StateStartFailed
 		pc.validate()
 
-		// set retry topic
 		if pc.model == Clustering {
+			// set retry topic
 			retryTopic := kernel.GetRetryTopic(pc.consumerGroup)
 			pc.subscriptionDataTable.Store(retryTopic, buildSubscriptionData(retryTopic,
 				MessageSelector{TAG, _SubAll}))
@@ -128,9 +119,9 @@ func (pc *pushConsumer) Start() error {
 		pc.client = kernel.GetOrNewRocketMQClient(pc.option.ClientOption)
 		if pc.model == Clustering {
 			pc.option.ChangeInstanceNameToPID()
-			pc.storage = &remoteBrokerOffsetStore{}
+			pc.storage = NewRemoteOffsetStore(pc.consumerGroup)
 		} else {
-			pc.storage = &localFileOffsetStore{}
+			pc.storage = NewLocalFileOffsetStore(pc.consumerGroup, pc.client.ClientID())
 		}
 		pc.storage.load()
 		go func() {
@@ -139,7 +130,6 @@ func (pc *pushConsumer) Start() error {
 			for {
 				pr := <-pc.prCh
 				go func() {
-					fmt.Println(pr.String())
 					pc.pullMessage(&pr)
 				}()
 			}
@@ -417,7 +407,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 			QueueOffset:    request.nextOffset,
 			MaxMsgNums:     pc.option.PullBatchSize,
 			SysFlag:        sysFlag,
-			CommitOffset:   0,
+			CommitOffset:   commitOffsetValue,
 			SubExpression:  _SubAll,
 			ExpressionType: string(TAG), // TODO
 		}
@@ -428,7 +418,6 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 		//	pullRequest.SubVersion = data.SubVersion
 		//}
 
-		//ch := make(chan *kernel.PullResult)
 		brokerResult := tryFindBroker(request.mq)
 		if brokerResult == nil {
 			rlog.Warnf("no broker found for %s", request.mq.String())
@@ -437,13 +426,13 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 		}
 		result, err := pc.client.PullMessage(context.Background(), brokerResult.BrokerAddr, pullRequest)
 		if err != nil {
-			rlog.Warnf("pull message from %s error: %s", "127.0.0.1:10911", err.Error())
+			rlog.Warnf("pull message from %s error: %s", brokerResult.BrokerAddr, err.Error())
 			sleepTime = _PullDelayTimeWhenError
 			goto NEXT
 		}
 
 		if result.Status == kernel.PullBrokerTimeout {
-			rlog.Warnf("pull broker: %s timeout", "127.0.0.1:10911")
+			rlog.Warnf("pull broker: %s timeout", brokerResult.BrokerAddr)
 			sleepTime = _PullDelayTimeWhenError
 			goto NEXT
 		}
@@ -486,7 +475,7 @@ func (pc *pushConsumer) pullMessage(request *PullRequest) {
 				rlog.Warnf("fix the pull request offset: %s", request.String())
 			}()
 		default:
-			rlog.Warnf("")
+			rlog.Warnf("unknown pull status: %v", result.Status)
 			sleepTime = _PullDelayTimeWhenError
 		}
 	}
diff --git a/examples/producer/main.go b/examples/producer/main.go
index 5c12584..e2d0126 100644
--- a/examples/producer/main.go
+++ b/examples/producer/main.go
@@ -30,7 +30,7 @@ func main() {
 		ConsumerModel: consumer.Clustering,
 		FromWhere:     consumer.ConsumeFromFirstOffset,
 	})
-	err := c.Subscribe("testTopic", consumer.MessageSelector{}, func(ctx *consumer.ConsumeMessageContext,
+	err := c.Subscribe("test", consumer.MessageSelector{}, func(ctx *consumer.ConsumeMessageContext,
 		msgs []*kernel.MessageExt) (consumer.ConsumeResult, error) {
 		fmt.Println(msgs)
 		return consumer.ConsumeSuccess, nil
diff --git a/kernel/client.go b/kernel/client.go
index e83d598..8403ccb 100644
--- a/kernel/client.go
+++ b/kernel/client.go
@@ -101,63 +101,66 @@ type RMQClient struct {
 
 	// group -> InnerConsumer
 	consumerMap sync.Map
+	once        sync.Once
 }
 
 var clientMap sync.Map
 
 func GetOrNewRocketMQClient(option ClientOption) *RMQClient {
-	// TODO
-	return &RMQClient{option: option}
+	client := &RMQClient{option: option}
+	actual, _ := clientMap.LoadOrStore(client.ClientID(), client)
+	return actual.(*RMQClient)
 }
 
 func (c *RMQClient) Start() {
-	// TODO fetchNameServerAddr
-	go func() {}()
-
-	// schedule update route info
-	go func() {
-		// delay
-		time.Sleep(50 * time.Millisecond)
-		for {
-			c.UpdateTopicRouteInfo()
-			time.Sleep(_PullNameServerInterval)
-		}
-	}()
-
-	// TODO cleanOfflineBroker & sendHeartbeatToAllBrokerWithLock
-	go func() {}()
-
-	// schedule persist offset
-	go func() {
-		time.Sleep(10 * time.Second)
-		for {
-			c.consumerMap.Range(func(key, value interface{}) bool {
-				consumer := value.(InnerConsumer)
-				consumer.PersistConsumerOffset()
-				return true
-			})
-			time.Sleep(_PersistOffset)
-		}
-	}()
+	c.once.Do(func() {
+		// TODO fetchNameServerAddr
+		go func() {}()
+
+		// schedule update route info
+		go func() {
+			// delay
+			time.Sleep(50 * time.Millisecond)
+			for {
+				c.UpdateTopicRouteInfo()
+				time.Sleep(_PullNameServerInterval)
+			}
+		}()
+
+		// TODO cleanOfflineBroker & sendHeartbeatToAllBrokerWithLock
+		go func() {}()
+
+		// schedule persist offset
+		go func() {
+			time.Sleep(10 * time.Second)
+			for {
+				c.consumerMap.Range(func(key, value interface{}) bool {
+					consumer := value.(InnerConsumer)
+					consumer.PersistConsumerOffset()
+					return true
+				})
+				time.Sleep(_PersistOffset)
+			}
+		}()
 
-	go func() {
-		for {
-			c.RebalanceImmediately()
-			time.Sleep(time.Second)
-		}
-	}()
+		go func() {
+			for {
+				c.RebalanceImmediately()
+				time.Sleep(time.Second)
+			}
+		}()
+	})
 }
 
 func (c *RMQClient) ClientID() string {
-	//id := c.option.ClientIP + "@" + c.option.InstanceName
-	//if c.option.UnitName != "" {
-	//	id += "@" + c.option.UnitName
-	//}
-	return "127.0.0.1:10911@DEFAULT"
+	id := c.option.ClientIP + "@" + c.option.InstanceName
+	if c.option.UnitName != "" {
+		id += "@" + c.option.UnitName
+	}
+	return id
 }
 
 func (c *RMQClient) CheckClientInBroker() {
-
 }
 
 func (c *RMQClient) SendHeartbeatToAllBrokerWithLock() {
@@ -269,7 +272,7 @@ func (c *RMQClient) SendMessageAsync(ctx context.Context, brokerAddrs, brokerNam
 func (c *RMQClient) SendMessageOneWay(ctx context.Context, brokerAddrs string, request *SendMessageRequest,
 	msgs []*Message) (*SendResult, error) {
 	cmd := remote.NewRemotingCommand(ReqSendBatchMessage, request, encodeMessages(msgs))
-	err := remote.InvokeOneWay(brokerAddrs, cmd)
+	err := remote.InvokeOneWay(brokerAddrs, cmd, 3*time.Second)
 	if err != nil {
 		rlog.Warnf("send messages with oneway error: %v", err)
 	}
@@ -379,18 +382,59 @@ func (c *RMQClient) PullMessageAsync(ctx context.Context, brokerAddrs string, re
 }
 
 // QueryMaxOffset with specific queueId and topic
-func QueryMaxOffset(topic string, queueId int) (int64, error) {
-	return 0, nil
+func QueryMaxOffset(mq *MessageQueue) (int64, error) {
+	brokerAddr := FindBrokerAddrByName(mq.BrokerName)
+	if brokerAddr == "" {
+		UpdateTopicRouteInfo(mq.Topic)
+		brokerAddr = FindBrokerAddrByName(mq.Topic)
+	}
+	if brokerAddr == "" {
+		return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
+	}
+
+	request := &GetMaxOffsetRequest{
+		Topic:   mq.Topic,
+		QueueId: mq.QueueId,
+	}
+
+	cmd := remote.NewRemotingCommand(ReqGetMaxOffset, request, nil)
+	response, err := remote.InvokeSync(brokerAddr, cmd, 3*time.Second)
+	if err != nil {
+		return -1, err
+	}
+
+	return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
 }
 
 // QueryConsumerOffset with specific queueId and topic of consumerGroup
-func (c *RMQClient) QueryConsumerOffset(consumerGroup, topic string, queue int) (int64, error) {
+func (c *RMQClient) QueryConsumerOffset(consumerGroup, mq *MessageQueue) (int64, error) {
 	return 0, nil
 }
 
 // SearchOffsetByTimestamp with specific queueId and topic
-func (c *RMQClient) SearchOffsetByTimestamp(topic string, queue int, timestamp int64) (int64, error) {
-	return 0, nil
+func SearchOffsetByTimestamp(mq *MessageQueue, timestamp int64) (int64, error) {
+	brokerAddr := FindBrokerAddrByName(mq.BrokerName)
+	if brokerAddr == "" {
+		UpdateTopicRouteInfo(mq.Topic)
+		brokerAddr = FindBrokerAddrByName(mq.Topic)
+	}
+	if brokerAddr == "" {
+		return -1, fmt.Errorf("the broker [%s] does not exist", mq.BrokerName)
+	}
+
+	request := &SearchOffsetRequest{
+		Topic:     mq.Topic,
+		QueueId:   mq.QueueId,
+		Timestamp: timestamp,
+	}
+
+	cmd := remote.NewRemotingCommand(ReqSearchOffsetByTimestamp, request, nil)
+	response, err := remote.InvokeSync(brokerAddr, cmd, 3*time.Second)
+	if err != nil {
+		return -1, err
+	}
+
+	return strconv.ParseInt(response.ExtFields["offset"], 10, 64)
 }
 
 // UpdateConsumerOffset with specific queueId and topic
diff --git a/kernel/request.go b/kernel/request.go
index 2647ad5..36770ec 100644
--- a/kernel/request.go
+++ b/kernel/request.go
@@ -19,17 +19,22 @@ package kernel
 
 import (
 	"fmt"
+	"strconv"
 	"time"
 )
 
 const (
-	ReqPullMessage            = int16(11)
-	ReqHeartBeat              = int16(34)
-	ReqGetConsumerListByGroup = int16(38)
-	ReqLockBatchMQ            = int16(41)
-	ReqUnlockBatchMQ          = int16(42)
-	ReqGetRouteInfoByTopic    = int16(105)
-	ReqSendBatchMessage       = int16(320)
+	ReqPullMessage             = int16(11)
+	ReqQueryConsumerOffset     = int16(14)
+	ReqUpdateConsumerOffset    = int16(15)
+	ReqSearchOffsetByTimestamp = int16(30)
+	ReqGetMaxOffset            = int16(30)
+	ReqHeartBeat               = int16(34)
+	ReqGetConsumerListByGroup  = int16(38)
+	ReqLockBatchMQ             = int16(41)
+	ReqUnlockBatchMQ           = int16(42)
+	ReqGetRouteInfoByTopic     = int16(105)
+	ReqSendBatchMessage        = int16(320)
 )
 
 type SendMessageRequest struct {
@@ -97,28 +102,60 @@ func (request *GetConsumerList) Encode() map[string]string {
 
 type GetMaxOffsetRequest struct {
 	Topic   string `json:"topic"`
-	QueueId int32  `json:"queueId"`
+	QueueId int    `json:"queueId"`
+}
+
+func (request *GetMaxOffsetRequest) Encode() map[string]string {
+	maps := make(map[string]string)
+	maps["topic"] = request.Topic
+	maps["queueId"] = strconv.Itoa(request.QueueId)
+	return maps
 }
 
 type QueryConsumerOffsetRequest struct {
 	ConsumerGroup string `json:"consumerGroup"`
 	Topic         string `json:"topic"`
-	QueueId       int32  `json:"queueId"`
+	QueueId       int    `json:"queueId"`
+}
+
+func (request *QueryConsumerOffsetRequest) Encode() map[string]string {
+	maps := make(map[string]string)
+	maps["consumerGroup"] = request.ConsumerGroup
+	maps["topic"] = request.Topic
+	maps["queueId"] = strconv.Itoa(request.QueueId)
+	return maps
 }
 
 type SearchOffsetRequest struct {
 	Topic     string `json:"topic"`
-	QueueId   int32  `json:"queueId"`
+	QueueId   int    `json:"queueId"`
 	Timestamp int64  `json:"timestamp"`
 }
 
+func (request *SearchOffsetRequest) Encode() map[string]string {
+	maps := make(map[string]string)
+	maps["Topic"] = request.Topic
+	maps["QueueId"] = strconv.Itoa(request.QueueId)
+	maps["timestamp"] = strconv.FormatInt(request.Timestamp, 10)
+	return maps
+}
+
 type UpdateConsumerOffsetRequest struct {
 	ConsumerGroup string `json:"consumerGroup"`
 	Topic         string `json:"topic"`
-	QueueId       int32  `json:"queueId"`
+	QueueId       int    `json:"queueId"`
 	CommitOffset  int64  `json:"commitOffset"`
 }
 
+func (request *UpdateConsumerOffsetRequest) Encode() map[string]string {
+	maps := make(map[string]string)
+	maps["consumerGroup"] = request.ConsumerGroup
+	maps["topic"] = request.Topic
+	maps["queueId"] = strconv.Itoa(request.QueueId)
+	maps["commitOffset"] = strconv.FormatInt(request.CommitOffset, 10)
+	return maps
+}
+
 type GetRouteInfoRequest struct {
 	Topic string `json:"topic"`
 }
diff --git a/kernel/route.go b/kernel/route.go
index f8cd884..4ff4f99 100644
--- a/kernel/route.go
+++ b/kernel/route.go
@@ -138,7 +138,7 @@ func FindBrokerAddrByTopic(topic string) string {
 	return addr
 }
 
-func FindBrokerAddressInPublish(brokerName string) string {
+func FindBrokerAddrByName(brokerName string) string {
 	bd, exist := brokerAddressesMap.Load(brokerName)
 
 	if !exist {
diff --git a/remote/remote_client.go b/remote/remote_client.go
index 9f5839a..bfe483c 100644
--- a/remote/remote_client.go
+++ b/remote/remote_client.go
@@ -116,7 +116,7 @@ func InvokeAsync(addr string, request *RemotingCommand, timeoutMillis time.Durat
 
 }
 
-func InvokeOneWay(addr string, request *RemotingCommand) error {
+func InvokeOneWay(addr string, request *RemotingCommand, timeout time.Duration) error {
 	conn, err := connect(addr)
 	if err != nil {
 		return err
diff --git a/remote/remote_client_test.go b/remote/remote_client_test.go
index 0a05953..acd0d26 100644
--- a/remote/remote_client_test.go
+++ b/remote/remote_client_test.go
@@ -261,7 +261,7 @@ func TestInvokeOneWay(t *testing.T) {
 	var wg sync.WaitGroup
 	wg.Add(1)
 	go func() {
-		err := InvokeOneWay(":3000", clientSendRemtingCommand)
+		err := InvokeOneWay(":3000", clientSendRemtingCommand, 3*time.Second)
 		if err != nil {
 			t.Fatalf("failed to invoke synchronous. %s", err)
 		}
diff --git a/examples/producer/main.go b/utils/errors.go
similarity index 53%
copy from examples/producer/main.go
copy to utils/errors.go
index 5c12584..0d96d97 100644
--- a/examples/producer/main.go
+++ b/utils/errors.go
@@ -15,33 +15,12 @@ See the License for the specific language governing permissions and
 limitations under the License.
 */
 
-package main
+package utils
 
-import (
-	"fmt"
-	"github.com/apache/rocketmq-client-go/consumer"
-	"github.com/apache/rocketmq-client-go/kernel"
-	"os"
-	"time"
-)
+import "github.com/apache/rocketmq-client-go/rlog"
 
-func main() {
-	c := consumer.NewPushConsumer("testGroup", consumer.ConsumerOption{
-		ConsumerModel: consumer.Clustering,
-		FromWhere:     consumer.ConsumeFromFirstOffset,
-	})
-	err := c.Subscribe("testTopic", consumer.MessageSelector{}, func(ctx *consumer.ConsumeMessageContext,
-		msgs []*kernel.MessageExt) (consumer.ConsumeResult, error) {
-		fmt.Println(msgs)
-		return consumer.ConsumeSuccess, nil
-	})
+func CheckError(action string, err error) {
 	if err != nil {
-		fmt.Println(err.Error())
+		rlog.Errorf("%s error: %s", action, err.Error())
 	}
-	err = c.Start()
-	if err != nil {
-		fmt.Println(err.Error())
-		os.Exit(-1)
-	}
-	time.Sleep(time.Hour)
 }
diff --git a/utils/files.go b/utils/files.go
new file mode 100644
index 0000000..b1c2c36
--- /dev/null
+++ b/utils/files.go
@@ -0,0 +1,65 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package utils
+
+import (
+	"fmt"
+	"os"
+	"path/filepath"
+)
+
+func FileReadAll(path string) ([]byte, error) {
+	stat, err := os.Stat(path)
+	if err != nil {
+		return nil, err
+	}
+	file, err := os.Open(path)
+	if err != nil {
+		return nil, err
+	}
+	data := make([]byte, stat.Size())
+	_, err = file.Read(data)
+	if err != nil {
+		return nil, err
+	}
+	return data, nil
+}
+
+func WriteToFile(path string, data []byte) error {
+	tmpFile, err := os.Create(filepath.Join(path, ".tmp"))
+	if err != nil {
+		return err
+	}
+	_, err = tmpFile.Write(data)
+	if err != nil {
+		return err
+	}
+	CheckError(fmt.Sprintf("close %s", tmpFile.Name()), tmpFile.Close())
+
+	prevContent, err := FileReadAll(path)
+	if err == nil {
+		bakFile, err := os.Create(filepath.Join(path, ".bak"))
+		_, err = bakFile.Write(prevContent)
+		if err != nil {
+			return err
+		}
+		CheckError(fmt.Sprintf("close %s", bakFile.Name()), bakFile.Close())
+	}
+	CheckError(fmt.Sprintf("remove %s", path), os.Remove(path))
+	return os.Rename(filepath.Join(path, ".tmp"), path)
+}


Mime
View raw message