rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ding...@apache.org
Subject [rocketmq-client-go] branch native updated: add trace feature. resolve #124 (#125)
Date Thu, 25 Jul 2019 08:20:56 GMT
This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 09ccdd4  add trace feature. resolve #124 (#125)
09ccdd4 is described below

commit 09ccdd463d6c8b1ab6497abf95b7e187be83bc46
Author: xujianhai666 <52450794+xujianhai666@users.noreply.github.com>
AuthorDate: Thu Jul 25 16:20:51 2019 +0800

    add trace feature. resolve #124 (#125)
---
 consumer/interceptor.go         | 109 +++++++++
 consumer/push_consumer.go       |  20 +-
 examples/consumer/trace/main.go |  57 +++++
 examples/producer/trace/main.go |  65 ++++++
 go.mod                          |   1 +
 go.sum                          |   9 +
 internal/client.go              |   2 +-
 internal/constants.go           |   1 +
 internal/route.go               |   2 +-
 internal/trace.go               | 482 ++++++++++++++++++++++++++++++++++++++++
 internal/trace_test.go          | 125 +++++++++++
 internal/utils/helper.go        |  52 -----
 internal/utils/helper_test.go   |  33 ---
 internal/utils/net.go           |  24 +-
 internal/utils/net_test.go      |   2 +-
 primitive/ctx.go                |  65 +++++-
 primitive/interceptor.go        |   6 +
 primitive/message.go            |  44 ++++
 primitive/result.go             |  12 +-
 primitive/result_test.go        |  18 +-
 producer/interceptor.go         |  97 ++++++++
 producer/producer.go            |  17 ++
 22 files changed, 1134 insertions(+), 109 deletions(-)

diff --git a/consumer/interceptor.go b/consumer/interceptor.go
new file mode 100644
index 0000000..260ac6f
--- /dev/null
+++ b/consumer/interceptor.go
@@ -0,0 +1,109 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package consumer
+
+import (
+	"context"
+	"time"
+
+	"github.com/apache/rocketmq-client-go/internal"
+	"github.com/apache/rocketmq-client-go/internal/utils"
+	"github.com/apache/rocketmq-client-go/primitive"
+)
+
+// WithTrace returns an Option that enables RocketMQ message tracing
+// (https://github.com/apache/rocketmq/wiki/RIP-6-Message-Trace).
+// When applied, a trace interceptor built from traceCfg is placed in front of
+// the interceptors already configured on the consumer options.
+func WithTrace(traceCfg primitive.TraceConfig) Option {
+	return func(options *consumerOptions) {
+		existing := options.Interceptors
+		chain := []primitive.Interceptor{newTraceInterceptor(traceCfg)}
+		options.Interceptors = append(chain, existing...)
+	}
+}
+
+// newTraceInterceptor creates and starts a trace dispatcher for traceCfg and
+// returns an interceptor that reports consume traces through it.
+//
+// For every invocation carrying a consumer context with messages, the
+// interceptor enqueues a SubBefore trace, calls next, and then enqueues a
+// SubAfter trace sharing the same request ID. Invocations without a consumer
+// context, without messages, or whose messages are all nil or have tracing
+// switched off are passed straight to next without emitting any trace.
+// The error returned by next is always returned unchanged.
+func newTraceInterceptor(traceCfg primitive.TraceConfig) primitive.Interceptor {
+	dispatcher := internal.NewTraceDispatcher(traceCfg.TraceTopic, traceCfg.Access)
+	dispatcher.Start()
+
+	return func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error {
+		consumerCtx, exist := primitive.GetConsumerCtx(ctx)
+		if !exist || len(consumerCtx.Msgs) == 0 {
+			return next(ctx, req, reply)
+		}
+
+		beginT := time.Now()
+		// before traceCtx
+		traceCx := internal.TraceContext{
+			RequestId: internal.CreateUniqID(),
+			TimeStamp: time.Now().UnixNano() / int64(time.Millisecond),
+			TraceType: internal.SubBefore,
+			GroupName: consumerCtx.ConsumerGroup,
+			IsSuccess: true,
+		}
+		beans := make([]internal.TraceBean, 0, len(consumerCtx.Msgs))
+		for _, msg := range consumerCtx.Msgs {
+			if msg == nil {
+				continue
+			}
+			regionID := msg.GetRegionID()
+			traceOn := msg.IsTraceOn()
+			if traceOn == "false" {
+				continue
+			}
+			bean := internal.TraceBean{
+				Topic:      msg.Topic,
+				MsgId:      msg.MsgId,
+				Tags:       msg.GetTags(),
+				Keys:       msg.GetKeys(),
+				StoreTime:  msg.StoreTimestamp,
+				BodyLength: int(msg.StoreSize),
+				RetryTimes: int(msg.ReconsumeTimes),
+				ClientHost: utils.LocalIP,
+				StoreHost:  utils.LocalIP,
+			}
+			beans = append(beans, bean)
+			traceCx.RegionId = regionID
+		}
+		if len(beans) == 0 {
+			// Nothing traceable in this batch: a SubAfter context without beans
+			// would carry no data, so skip tracing altogether.
+			return next(ctx, req, reply)
+		}
+		traceCx.TraceBeans = beans
+		traceCx.TimeStamp = time.Now().UnixNano() / int64(time.Millisecond)
+		dispatcher.Append(traceCx)
+
+		err := next(ctx, req, reply)
+
+		// after traceCtx
+		costTime := time.Since(beginT).Nanoseconds() / int64(time.Millisecond)
+		ctxType := consumerCtx.Properties[primitive.PropCtxType]
+		afterCtx := internal.TraceContext{
+			TimeStamp: time.Now().UnixNano() / int64(time.Millisecond),
+
+			TraceType:   internal.SubAfter,
+			RegionId:    traceCx.RegionId,
+			GroupName:   traceCx.GroupName,
+			RequestId:   traceCx.RequestId,
+			IsSuccess:   consumerCtx.Success,
+			CostTime:    costTime,
+			TraceBeans:  traceCx.TraceBeans,
+			ContextCode: primitive.ConsumeReturnType(ctxType).Ordinal(),
+		}
+		dispatcher.Append(afterCtx)
+		return err
+	}
+}
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 3b011c3..7b4d65e 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -640,6 +640,9 @@ func (pc *pushConsumer) consumeInner(ctx context.Context, subMsgs []*primitive.M
 
 			realReply := reply.(*ConsumeResultHolder)
 			realReply.ConsumeResult = r
+
+			msgCtx, _ := primitive.GetConsumerCtx(ctx)
+			msgCtx.Success = realReply.ConsumeResult == ConsumeSuccess
 			return e
 		})
 		return container.ConsumeResult, err
@@ -694,6 +697,9 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
 			var err error
 			msgCtx := &primitive.ConsumeMessageContext{
 				Properties: make(map[string]string),
+				ConsumerGroup: pc.consumerGroup,
+				MQ: mq,
+				Msgs: msgs,
 			}
 			ctx := context.Background()
 			ctx = primitive.WithConsumerCtx(ctx, msgCtx)
@@ -706,16 +712,15 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
 
 			consumeRT := time.Now().Sub(beginTime)
 			if err != nil {
-				msgCtx.Properties["ConsumeContextType"] = "EXCEPTION"
+				msgCtx.Properties[primitive.PropCtxType] = string(primitive.ExceptionRetrun)
 			} else if consumeRT >= pc.option.ConsumeTimeout {
-				msgCtx.Properties["ConsumeContextType"] = "TIMEOUT"
+				msgCtx.Properties[primitive.PropCtxType] = string(primitive.TimeoutReturn)
 			} else if result == ConsumeSuccess {
-				msgCtx.Properties["ConsumeContextType"] = "SUCCESS"
-			} else {
-				msgCtx.Properties["ConsumeContextType"] = "RECONSUME_LATER"
+				msgCtx.Properties[primitive.PropCtxType] = string(primitive.SuccessReturn)
+			} else if result == ConsumeRetryLater{
+				msgCtx.Properties[primitive.PropCtxType] = string(primitive.FailedReturn)
 			}
 
-			// TODO hook
 			increaseConsumeRT(pc.consumerGroup, mq.Topic, int64(consumeRT/time.Millisecond))
 
 			if !pq.dropped {
@@ -808,6 +813,9 @@ func (pc *pushConsumer) consumeMessageOrderly(pq *processQueue, mq *primitive.Me
 			ctx := context.Background()
 			msgCtx := &primitive.ConsumeMessageContext{
 				Properties: make(map[string]string),
+				ConsumerGroup: pc.consumerGroup,
+				MQ: mq,
+				Msgs: msgs,
 			}
 			ctx = primitive.WithConsumerCtx(ctx, msgCtx)
 			ctx = primitive.WithMethod(ctx, primitive.ConsumerPush)
diff --git a/examples/consumer/trace/main.go b/examples/consumer/trace/main.go
new file mode 100644
index 0000000..9102175
--- /dev/null
+++ b/examples/consumer/trace/main.go
@@ -0,0 +1,57 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/apache/rocketmq-client-go"
+	"github.com/apache/rocketmq-client-go/consumer"
+	"github.com/apache/rocketmq-client-go/primitive"
+)
+
+// main starts a push consumer with message tracing enabled, subscribes to
+// "TopicTest", prints every received batch and then keeps the process alive
+// for an hour so messages can be consumed.
+func main() {
+	namesrvs := []string{"127.0.0.1:9876"}
+	traceCfg := primitive.TraceConfig{
+		Access: primitive.Local,
+	}
+
+	c, err := rocketmq.NewPushConsumer(
+		consumer.WithGroupName("testGroup"),
+		consumer.WithNameServer(namesrvs),
+		consumer.WithTrace(traceCfg),
+	)
+	if err != nil {
+		// Without a consumer there is nothing to subscribe or start.
+		fmt.Println(err.Error())
+		os.Exit(-1)
+	}
+	err = c.Subscribe("TopicTest", consumer.MessageSelector{}, func(ctx context.Context,
+		msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {
+		fmt.Printf("subscribe callback: %v \n", msgs)
+		return consumer.ConsumeSuccess, nil
+	})
+	if err != nil {
+		fmt.Println(err.Error())
+	}
+	// Note: start after subscribe
+	err = c.Start()
+	if err != nil {
+		fmt.Println(err.Error())
+		os.Exit(-1)
+	}
+	time.Sleep(time.Hour)
+}
diff --git a/examples/producer/trace/main.go b/examples/producer/trace/main.go
new file mode 100644
index 0000000..266c783
--- /dev/null
+++ b/examples/producer/trace/main.go
@@ -0,0 +1,65 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package main
+
+import (
+	"context"
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/apache/rocketmq-client-go"
+	"github.com/apache/rocketmq-client-go/primitive"
+	"github.com/apache/rocketmq-client-go/producer"
+)
+
+// main starts a producer with message tracing enabled, sends one message to
+// "TopicTest" synchronously, waits ten seconds so the asynchronous trace data
+// can be delivered, and then shuts the producer down.
+func main() {
+	namesrvs := []string{"127.0.0.1:9876"}
+	traceCfg := primitive.TraceConfig{
+		Access: primitive.Local,
+	}
+
+	p, err := rocketmq.NewProducer(
+		producer.WithNameServer(namesrvs),
+		producer.WithRetry(2),
+		producer.WithTrace(traceCfg))
+	if err != nil {
+		// Without a producer there is nothing to start or send with.
+		fmt.Printf("create producer error: %s", err.Error())
+		os.Exit(1)
+	}
+	err = p.Start()
+	if err != nil {
+		fmt.Printf("start producer error: %s", err.Error())
+		os.Exit(1)
+	}
+	for i := 0; i < 1; i++ {
+		res, err := p.SendSync(context.Background(), &primitive.Message{
+			Topic: "TopicTest",
+			Body:  []byte("Hello RocketMQ Go Client!"),
+		})
+
+		if err != nil {
+			fmt.Printf("send message error: %s\n", err)
+		} else {
+			fmt.Printf("send message success: result=%s\n", res.String())
+		}
+	}
+
+	time.Sleep(10 * time.Second)
+
+	err = p.Shutdown()
+	if err != nil {
+		fmt.Printf("shutdown producer error: %s", err.Error())
+	}
+}
diff --git a/go.mod b/go.mod
index 322c847..e1b0d88 100644
--- a/go.mod
+++ b/go.mod
@@ -7,6 +7,7 @@ require (
 	github.com/golang/mock v1.3.1
 	github.com/pkg/errors v0.8.1
 	github.com/sirupsen/logrus v1.4.1
+	github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945
 	github.com/stretchr/testify v1.3.0
 	github.com/tidwall/gjson v1.2.1
 	github.com/tidwall/match v1.0.1 // indirect
diff --git a/go.sum b/go.sum
index 06d0d34..cc4c4b6 100644
--- a/go.sum
+++ b/go.sum
@@ -5,6 +5,10 @@ github.com/emirpasic/gods v1.12.0 h1:QAUIPSaCu4G+POclxeqb3F+WPpdKqFGlw36+yOzGlrg
 github.com/emirpasic/gods v1.12.0/go.mod h1:YfzfFFoVP/catgzJb4IKIqXjX78Ha8FMSDh3ymbK86o=
 github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s=
 github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
+github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8=
+github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
+github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo=
+github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
 github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
@@ -13,6 +17,10 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k=
 github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
+github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d h1:zE9ykElWQ6/NYmHa3jpm/yHnI4xSofP+UP6SpjHcSeM=
+github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc=
+github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945 h1:N8Bg45zpk/UcpNGnfJt2y/3lRWASHNTUET8owPYCgYI=
+github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
@@ -32,4 +40,5 @@ golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5h
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/tools v0.0.0-20190328211700-ab21143f2384/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
 golang.org/x/tools v0.0.0-20190425150028-36563e24a262/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
diff --git a/internal/client.go b/internal/client.go
index 558c28d..d58efc4 100644
--- a/internal/client.go
+++ b/internal/client.go
@@ -88,7 +88,7 @@ func DefaultClientOptions() ClientOptions {
 	opts := ClientOptions{
 		InstanceName: "DEFAULT",
 		RetryTimes:   3,
-		ClientIP:     utils.LocalIP(),
+		ClientIP:     utils.LocalIP,
 	}
 	return opts
 }
diff --git a/internal/constants.go b/internal/constants.go
index a234c18..e2e911f 100644
--- a/internal/constants.go
+++ b/internal/constants.go
@@ -21,6 +21,7 @@ const (
 	RetryGroupTopicPrefix    = "%RETRY%"
 	DefaultConsumerGroup     = "DEFAULT_CONSUMER"
 	ClientInnerProducerGroup = "CLIENT_INNER_PRODUCER"
+	SystemTopicPrefix        = "rmq_sys_"
 )
 
 func GetRetryTopic(group string) string {
diff --git a/internal/route.go b/internal/route.go
index 8b1e713..6053fd8 100644
--- a/internal/route.go
+++ b/internal/route.go
@@ -275,7 +275,7 @@ func FetchPublishMessageQueues(topic string) ([]*primitive.MessageQueue, error)
 	if !exist {
 		routeData, err = queryTopicRouteInfoFromServer(topic)
 		if err != nil {
-			rlog.Error("queryTopicRouteInfoFromServer failed. topic: %v", topic)
+			rlog.Error("queryTopicRouteInfoFromServer failed. topic: ", topic)
 			return nil, err
 		}
 		routeDataMap.Store(topic, routeData)
diff --git a/internal/trace.go b/internal/trace.go
new file mode 100644
index 0000000..dc3b246
--- /dev/null
+++ b/internal/trace.go
@@ -0,0 +1,482 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package internal
+
+import (
+	"bytes"
+	"context"
+	"encoding/binary"
+	"encoding/hex"
+	"fmt"
+	"os"
+	"runtime"
+	"strconv"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/apache/rocketmq-client-go/internal/remote"
+	"github.com/apache/rocketmq-client-go/internal/utils"
+	"github.com/apache/rocketmq-client-go/primitive"
+	"github.com/apache/rocketmq-client-go/rlog"
+)
+
+// Package-level state used to build unique IDs in CreateUniqID.
+var (
+	counter        int16 = 0 // incremented by CreateUniqID while holding locker
+	startTimestamp int64 = 0 // Unix seconds of the first day of the current month; set by updateTimestamp
+	nextTimestamp  int64 = 0 // Unix seconds of the first day of the next month; set by updateTimestamp
+	prefix         string // upper-case hex of IP, pid and classLoadId; computed in init
+	locker         sync.Mutex // held by CreateUniqID for its whole body
+	classLoadId    int32 = 0 // written into prefix by init; not modified in this file
+)
+
+// init computes the ID prefix once: the client IPv4 address (or a fake IP when
+// it cannot be determined), followed by the pid and classLoadId in big-endian
+// form, rendered as upper-case hex.
+func init() {
+	addr, err := utils.ClientIP4()
+	if err != nil {
+		addr = utils.FakeIP()
+	}
+
+	var raw bytes.Buffer
+	_, _ = raw.Write(addr)
+	_ = binary.Write(&raw, binary.BigEndian, Pid())
+	_ = binary.Write(&raw, binary.BigEndian, classLoadId)
+
+	encoded := hex.EncodeToString(raw.Bytes())
+	prefix = strings.ToUpper(encoded)
+}
+
+// CreateUniqID returns prefix followed by the hex encoding of two big-endian
+// values: the seconds elapsed since startTimestamp multiplied by 1000 (as
+// int32) and the package counter (int16). The whole body runs under locker,
+// and the month window is refreshed first when the current time is past
+// nextTimestamp.
+func CreateUniqID() string {
+	locker.Lock()
+	defer locker.Unlock()
+
+	if nextTimestamp < time.Now().Unix() {
+		updateTimestamp()
+	}
+	counter++
+
+	elapsed := int32((time.Now().Unix() - startTimestamp) * 1000)
+	var suffix bytes.Buffer
+	_ = binary.Write(&suffix, binary.BigEndian, elapsed)
+	_ = binary.Write(&suffix, binary.BigEndian, counter)
+
+	return prefix + hex.EncodeToString(suffix.Bytes())
+}
+
+// updateTimestamp sets startTimestamp to the first instant of the current
+// month and nextTimestamp to the first instant of the following month, both
+// as Unix seconds in the local time zone.
+func updateTimestamp() {
+	// Read the clock once so year and month always belong to the same instant;
+	// two separate reads could straddle a year boundary.
+	now := time.Now()
+	monthStart := time.Date(now.Year(), now.Month(), 1, 0, 0, 0, 0, time.Local)
+	startTimestamp = monthStart.Unix()
+	nextTimestamp = monthStart.AddDate(0, 1, 0).Unix()
+}
+
+// Pid returns the current process ID converted to int16; larger values are
+// truncated by the conversion.
+func Pid() int16 {
+	pid := os.Getpid()
+	return int16(pid)
+}
+
+// TraceBean holds the per-message fields of a trace record. Which fields end
+// up in the serialized record depends on the trace type (see marshal2Bean).
+type TraceBean struct {
+	Topic       string
+	MsgId       string
+	OffsetMsgId string
+	Tags        string
+	Keys        string
+	StoreHost   string
+	ClientHost  string
+	StoreTime   int64
+	RetryTimes  int
+	BodyLength  int
+	MsgType     primitive.MessageType
+}
+
+// TraceTransferBean is the serialized form of a TraceContext as produced by
+// marshal2Bean.
+type TraceTransferBean struct {
+	// transData is the encoded trace record text.
+	transData string
+	// not duplicate
+	// transKey lists the message IDs and non-empty keys of the traced messages.
+	transKey []string
+}
+
+// TraceType names the kind of trace record; its string value is written as
+// the first field of every serialized record.
+type TraceType string
+
+const (
+	Pub       TraceType = "Pub"
+	SubBefore TraceType = "SubBefore"
+	SubAfter  TraceType = "SubAfter"
+
+	// contentSplitter separates the fields inside one record.
+	contentSplitter = '\001'
+	// fieldSplitter terminates each record.
+	fieldSplitter   = '\002'
+)
+
+// TraceContext describes one trace event together with the messages
+// (TraceBeans) it covers. It is queued via TraceDispatcher.Append and
+// serialized by marshal2Bean.
+type TraceContext struct {
+	TraceType   TraceType
+	TimeStamp   int64
+	RegionId    string
+	RegionName  string
+	GroupName   string
+	CostTime    int64
+	IsSuccess   bool
+	RequestId   string
+	ContextCode int
+	TraceBeans  []TraceBean
+}
+
+// marshal2Bean serializes the context into a TraceTransferBean.
+//
+// Each record is a list of fields joined by contentSplitter and terminated by
+// fieldSplitter. A Pub context produces a single record built from its first
+// bean; SubBefore and SubAfter contexts produce one record per bean. Any other
+// trace type produces empty data. The returned transKey holds, for every bean,
+// its message ID followed by its keys when they are non-empty.
+func (ctx *TraceContext) marshal2Bean() *TraceTransferBean {
+	var payload bytes.Buffer
+	// emit appends one complete record to payload.
+	emit := func(fields ...string) {
+		payload.WriteString(strings.Join(fields, string(contentSplitter)))
+		payload.WriteRune(fieldSplitter)
+	}
+
+	kind := string(ctx.TraceType)
+	switch ctx.TraceType {
+	case Pub:
+		first := ctx.TraceBeans[0]
+		emit(
+			kind,
+			strconv.FormatInt(ctx.TimeStamp, 10),
+			ctx.RegionId,
+			ctx.GroupName,
+			first.Topic,
+			first.MsgId,
+			first.Tags,
+			first.Keys,
+			first.StoreHost,
+			strconv.Itoa(first.BodyLength),
+			strconv.FormatInt(ctx.CostTime, 10),
+			strconv.Itoa(int(first.MsgType)),
+			first.OffsetMsgId,
+			strconv.FormatBool(ctx.IsSuccess),
+		)
+	case SubBefore:
+		for i := range ctx.TraceBeans {
+			item := &ctx.TraceBeans[i]
+			emit(
+				kind,
+				strconv.FormatInt(ctx.TimeStamp, 10),
+				ctx.RegionId,
+				ctx.GroupName,
+				ctx.RequestId,
+				item.MsgId,
+				strconv.Itoa(item.RetryTimes),
+				nullWrap(item.Keys),
+			)
+		}
+	case SubAfter:
+		for i := range ctx.TraceBeans {
+			item := &ctx.TraceBeans[i]
+			emit(
+				kind,
+				ctx.RequestId,
+				item.MsgId,
+				strconv.FormatInt(ctx.CostTime, 10),
+				strconv.FormatBool(ctx.IsSuccess),
+				nullWrap(item.Keys),
+				strconv.Itoa(ctx.ContextCode),
+			)
+		}
+	}
+
+	result := &TraceTransferBean{transData: payload.String()}
+	for i := range ctx.TraceBeans {
+		item := &ctx.TraceBeans[i]
+		result.transKey = append(result.transKey, item.MsgId)
+		if item.Keys != "" {
+			result.transKey = append(result.transKey, item.Keys)
+		}
+	}
+	return result
+}
+
+// nullWrap substitutes the literal "null" for an empty string and returns any
+// other string unchanged (kept compatible with the java console).
+func nullWrap(s string) string {
+	if s != "" {
+		return s
+	}
+	return "null"
+}
+
+// traceDispatcherType distinguishes producer-side and consumer-side dispatchers.
+type traceDispatcherType int
+
+const (
+	// RmqSysTraceTopic is the topic used when no trace topic is configured.
+	RmqSysTraceTopic = "RMQ_SYS_TRACE_TOPIC"
+
+	// NOTE(review): iota is already 1 here because RmqSysTraceTopic occupies
+	// the first spec of this const block, so ProducerType == 1 and
+	// ConsumerType == 2 — confirm that this is intended.
+	ProducerType traceDispatcherType = iota
+	ConsumerType
+
+	// maxMsgSize is the payload length above which flush sends the
+	// accumulated trace data.
+	maxMsgSize = 128000 - 10*1000
+	// batchSize is the number of queued contexts that triggers a commit in process.
+	batchSize  = 100
+
+	TraceTopicPrefix = SystemTopicPrefix + "TRACE_DATA_"
+	TraceGroupName   = "_INNER_TRACE_PRODUCER"
+)
+
+// TraceDispatcher queues trace contexts and delivers them to a trace topic.
+type TraceDispatcher interface {
+	// GetTraceTopicName returns the topic trace data is sent to.
+	GetTraceTopicName() string
+
+	// Start begins processing of queued contexts.
+	Start()
+	// Append queues ctx and reports whether it was accepted.
+	Append(ctx TraceContext) bool
+	// Close stops the dispatcher.
+	Close()
+}
+
+// traceDispatcher is the channel-based TraceDispatcher implementation: Append
+// pushes contexts into input and the process goroutine batches and sends them.
+type traceDispatcher struct {
+	ctx     context.Context
+	cancel  context.CancelFunc
+	// running is set by Start and cleared by Close; Append rejects contexts
+	// while it is false. It is accessed without synchronization.
+	running bool
+
+	traceTopic string
+	access     primitive.AccessChannel
+
+	// ticker is created by process, not by the constructor.
+	ticker  *time.Ticker
+	input   chan TraceContext
+	// batchCh is allocated by the constructor; in this file it is only
+	// referenced by a log statement in process.
+	batchCh chan []*TraceContext
+
+	// discardCount counts contexts dropped by Append because input was full.
+	discardCount int64
+
+	// support deliver trace message to other cluster.
+	namesrvs []string
+	// round robin index
+	rrindex int32
+	cli     RMQClient
+}
+
+// NewTraceDispatcher builds a dispatcher that is not yet started.
+//
+// The target topic is TraceTopicPrefix+traceTopic for the Cloud access
+// channel; otherwise it is traceTopic, or RmqSysTraceTopic when traceTopic is
+// empty. The dispatcher uses the shared RocketMQ client obtained with default
+// options and RetryTimes set to 0.
+func NewTraceDispatcher(traceTopic string, access primitive.AccessChannel) *traceDispatcher {
+	topic := traceTopic
+	switch {
+	case access == primitive.Cloud:
+		topic = TraceTopicPrefix + traceTopic
+	case len(traceTopic) == 0:
+		topic = RmqSysTraceTopic
+	}
+
+	dispatchCtx, stop := context.WithCancel(context.Background())
+
+	opts := DefaultClientOptions()
+	opts.RetryTimes = 0
+
+	dispatcher := &traceDispatcher{
+		ctx:        dispatchCtx,
+		cancel:     stop,
+		traceTopic: topic,
+		access:     access,
+		input:      make(chan TraceContext, 1024),
+		batchCh:    make(chan []*TraceContext, 2048),
+		cli:        GetOrNewRocketMQClient(opts),
+	}
+	return dispatcher
+}
+
+// GetTraceTopicName reports the topic this dispatcher sends trace data to.
+func (td *traceDispatcher) GetTraceTopicName() string {
+	topic := td.traceTopic
+	return topic
+}
+
+// Start marks the dispatcher as running, starts the underlying client and
+// launches the background processing goroutine.
+func (td *traceDispatcher) Start() {
+	// Accept contexts from Append from now on.
+	td.running = true
+
+	client := td.cli
+	client.Start()
+
+	// Batching and sending happen asynchronously.
+	go td.process()
+}
+
+// Close stops accepting new contexts, stops the flush ticker and cancels the
+// dispatcher context so the processing goroutine can finish.
+func (td *traceDispatcher) Close() {
+	td.running = false
+	// The ticker is created by the processing goroutine, so it is still nil
+	// when Close is called on a dispatcher that was never started.
+	if td.ticker != nil {
+		td.ticker.Stop()
+	}
+	td.cancel()
+}
+
+// Append queues ctx for delivery without blocking. It returns false when the
+// dispatcher is not running or when the input buffer is full; in the latter
+// case the discard counter is incremented and the dropped context is logged.
+func (td *traceDispatcher) Append(ctx TraceContext) bool {
+	if active := td.running; !active {
+		rlog.Error("traceDispatcher is closed.")
+		return false
+	}
+
+	select {
+	case td.input <- ctx:
+		return true
+	default:
+		dropped := atomic.AddInt64(&td.discardCount, 1)
+		rlog.Warnf("buffer full: %d, ctx is %v", dropped, ctx)
+		return false
+	}
+}
+
+// process is the dispatcher's background loop. It collects contexts from
+// td.input into a batch and hands the batch to batchCommit (in a new
+// goroutine) when either
+//   - the batch holds batchSize contexts, or
+//   - the ticker fires and no context has arrived for longer than the wait
+//     duration while the batch is non-empty.
+//
+// When the dispatcher context is cancelled, the pending batch is committed,
+// the loop waits about 500ms to give that commit time to run, and returns.
+func (td *traceDispatcher) process() {
+	var batch []TraceContext
+	maxWaitDuration := 5 * time.Millisecond
+	td.ticker = time.NewTicker(maxWaitDuration)
+	lastput := time.Now()
+	for {
+		select {
+		case ctx := <-td.input:
+			lastput = time.Now()
+			batch = append(batch, ctx)
+			// Use the real batch length: a separate counter drifts from the
+			// batch content whenever the ticker branch flushes.
+			if len(batch) >= batchSize {
+				go td.batchCommit(batch)
+				batch = make([]TraceContext, 0, batchSize)
+			}
+		case <-td.ticker.C:
+			if time.Since(lastput) > maxWaitDuration {
+				lastput = time.Now()
+				if len(batch) > 0 {
+					go td.batchCommit(batch)
+					batch = make([]TraceContext, 0, batchSize)
+				}
+			}
+		case <-td.ctx.Done():
+			go td.batchCommit(batch)
+
+			now := time.Now().UnixNano() / int64(time.Millisecond)
+			end := now + 500
+			for now < end {
+				now = time.Now().UnixNano() / int64(time.Millisecond)
+				runtime.Gosched()
+			}
+			rlog.Infof("------end trace send %v %v", td.input, td.batchCh)
+			// A cancelled context stays done forever; returning here prevents
+			// the loop from re-entering this branch endlessly.
+			return
+		}
+	}
+}
+
+// batchCommit serializes ctxs, groups the results by topic and region ID, and
+// flushes every group.
+//
+// The grouping key is the topic of a context's first bean, followed by
+// contentSplitter and the region ID when the region ID is non-empty. Contexts
+// without beans are skipped.
+func (td *traceDispatcher) batchCommit(ctxs []TraceContext) {
+	keyedCtxs := make(map[string][]TraceTransferBean)
+	for _, ctx := range ctxs {
+		if len(ctx.TraceBeans) == 0 {
+			// Skip only this context; the remaining ones must still be sent.
+			continue
+		}
+		topic := ctx.TraceBeans[0].Topic
+		regionID := ctx.RegionId
+		key := topic
+		if len(regionID) > 0 {
+			key = fmt.Sprintf("%s%c%s", topic, contentSplitter, regionID)
+		}
+		keyedCtxs[key] = append(keyedCtxs[key], *ctx.marshal2Bean())
+	}
+
+	for k, v := range keyedCtxs {
+		// Recover topic and region ID from the composite key.
+		arr := strings.Split(k, string([]byte{contentSplitter}))
+		topic := k
+		regionID := ""
+		if len(arr) > 1 {
+			topic = arr[0]
+			regionID = arr[1]
+		}
+		td.flush(topic, regionID, v)
+	}
+}
+
+// Keyset is a set of strings used to de-duplicate trace keys.
+type Keyset map[string]struct{}
+
+// slice returns the members of the set in unspecified order. The result has
+// exactly len(ks) elements.
+func (ks Keyset) slice() []string {
+	// Length 0 with capacity len(ks): a non-zero length would leave that many
+	// empty strings in front of the appended keys.
+	slice := make([]string, 0, len(ks))
+	for k := range ks {
+		slice = append(slice, k)
+	}
+	return slice
+}
+
+// flush concatenates the serialized records in data and sends them as trace
+// messages. Whenever the accumulated payload grows beyond maxMsgSize it is
+// sent immediately together with the keys collected so far, and accumulation
+// starts over; whatever is still pending after the last bean is sent at the end.
+func (td *traceDispatcher) flush(topic, regionID string, data []TraceTransferBean) {
+	if len(data) == 0 {
+		return
+	}
+
+	var (
+		payload strings.Builder
+		keys    = make(Keyset)
+		pending bool
+	)
+	for i := range data {
+		for _, key := range data[i].transKey {
+			keys[key] = struct{}{}
+		}
+		payload.WriteString(data[i].transData)
+		pending = true
+
+		if payload.Len() <= maxMsgSize {
+			continue
+		}
+		td.sendTraceDataByMQ(keys, regionID, payload.String())
+		payload.Reset()
+		keys = make(Keyset)
+		pending = false
+	}
+	if pending {
+		td.sendTraceDataByMQ(keys, regionID, payload.String())
+	}
+}
+
+// sendTraceDataByMQ wraps data in a message for the trace topic, attaches the
+// keys in keyset, picks a queue with findMq and sends the message
+// asynchronously with a 5 second timeout. Nothing is sent when no queue is
+// available. A failed send is only logged.
+func (td *traceDispatcher) sendTraceDataByMQ(keyset Keyset, regionID string, data string) {
+	msg := primitive.NewMessage(td.traceTopic, []byte(data))
+	msg.SetKeys(keyset.slice())
+
+	mq, addr := td.findMq()
+	if mq == nil {
+		return
+	}
+
+	var req = td.buildSendRequest(mq, msg)
+	td.cli.InvokeAsync(addr, req, 5000*time.Millisecond, func(command *remote.RemotingCommand, e error) {
+		if e != nil {
+			// rlog.Error is called without format verbs, and the error itself
+			// is included so the failure can be diagnosed.
+			rlog.Error("send trace data failed, err: ", e, ", the traceData is ", data)
+		}
+	})
+}
+
+// findMq selects the next publish queue of the trace topic in round-robin
+// order and returns it together with the address looked up for its broker
+// name. It returns (nil, "") when the queues cannot be fetched or the topic
+// has no publish queue.
+func (td *traceDispatcher) findMq() (*primitive.MessageQueue, string) {
+	mqs, err := FetchPublishMessageQueues(td.traceTopic)
+	if err != nil {
+		rlog.Error("fetch publish message queues failed. err: ", err)
+		return nil, ""
+	}
+	if len(mqs) == 0 {
+		// Guard the modulo below against a division by zero.
+		rlog.Error("no publish message queue for trace topic: ", td.traceTopic)
+		return nil, ""
+	}
+	i := atomic.AddInt32(&td.rrindex, 1)
+	if i < 0 {
+		// The counter wrapped around; restart from the first queue.
+		i = 0
+		atomic.StoreInt32(&td.rrindex, 0)
+	}
+	i %= int32(len(mqs))
+	mq := mqs[i]
+
+	brokerName := mq.BrokerName
+	addr := FindBrokerAddrByName(brokerName)
+
+	return mq, addr
+}
+
+// buildSendRequest creates the send-message remoting command for msg, using
+// the topic and queue ID of mq, the inner trace producer group, the current
+// time in milliseconds as born timestamp, and the message body as payload.
+func (td *traceDispatcher) buildSendRequest(mq *primitive.MessageQueue,
+	msg *primitive.Message) *remote.RemotingCommand {
+	bornAt := time.Now().UnixNano() / int64(time.Millisecond)
+	header := &SendMessageRequest{
+		ProducerGroup: TraceGroupName,
+		Topic:         mq.Topic,
+		QueueId:       mq.QueueId,
+		BornTimestamp: bornAt,
+		Flag:          msg.Flag,
+		Properties:    primitive.MarshalPropeties(msg.Properties),
+	}
+
+	cmd := remote.NewRemotingCommand(ReqSendMessage, header, msg.Body)
+	return cmd
+}
diff --git a/internal/trace_test.go b/internal/trace_test.go
new file mode 100644
index 0000000..23808a3
--- /dev/null
+++ b/internal/trace_test.go
@@ -0,0 +1,125 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package internal
+
+import (
+	"testing"
+
+	"github.com/apache/rocketmq-client-go/primitive"
+	. "github.com/smartystreets/goconvey/convey"
+	"github.com/stretchr/testify/assert"
+)
+
+// TestMarshal2Bean checks the serialized data and the collected keys that
+// marshal2Bean produces for Pub, SubBefore and SubAfter trace contexts.
+func TestMarshal2Bean(t *testing.T) {
+
+	Convey("marshal of TraceContext", t, func() {
+
+		Convey("When marshal producer trace data", func() {
+			traceCtx := TraceContext{
+				TraceType: Pub,
+				TimeStamp: 1563780533299,
+				RegionId:  "DefaultRegion",
+				GroupName: "ProducerGroupName",
+				CostTime:  3572,
+				IsSuccess: true,
+				RequestId: "0A5DE93A815518B4AAC26F77F8330001",
+				TraceBeans: []TraceBean{
+					{
+						Topic:       "TopicTest",
+						MsgId:       "0A5DE93A833B18B4AAC26F842A2F0000",
+						OffsetMsgId: "0A5DE93A00002A9F000000000042E322",
+						Tags:        "TagA",
+						Keys:        "OrderID1882",
+						StoreHost:   "10.93.233.58:10911",
+						ClientHost:  "10.93.233.58",
+						StoreTime:   1563780535085,
+						BodyLength:  11,
+						MsgType:     primitive.NormalMsg,
+					},
+				},
+			}
+			bean := traceCtx.marshal2Bean()
+			assert.Equal(t, "Pub1563780533299DefaultRegionProducerGroupNameTopicTest0A5DE93A833B18B4AAC26F842A2F0000TagAOrderID188210.93.233.58:1091111357200A5DE93A00002A9F000000000042E322true\x02",
+				bean.transData)
+			assert.Equal(t, []string{"0A5DE93A833B18B4AAC26F842A2F0000", "OrderID1882"}, bean.transKey)
+
+			// consumer before test
+			traceCtx = TraceContext{
+				TraceType: SubBefore,
+				TimeStamp: 1563789119096,
+				GroupName: "CID_JODIE_1",
+				IsSuccess: true,
+				RequestId: "0A5DE93A96A818B4AAC26FFAFA780007",
+				TraceBeans: []TraceBean{
+					{
+						Topic:      "TopicTest",
+						MsgId:      "0A5DE93A973418B4AAC26FFAFA5A0000",
+						Tags:       "TagA",
+						Keys:       "OrderID1882",
+						StoreHost:  "10.93.233.58",
+						ClientHost: "10.93.233.58",
+						StoreTime:  1563789119092,
+						BodyLength: 190,
+					},
+				},
+			}
+			bean = traceCtx.marshal2Bean()
+
+			Convey("transData should equal to expected", func() {
+				So(bean.transData, ShouldEqual, "SubBefore1563789119096CID_JODIE_10A5DE93A96A818B4AAC26FFAFA7800070A5DE93A973418B4AAC26FFAFA5A00000OrderID1882")
+			})
+
+			Convey("transkey should equal to expected", func() {
+				expectedKey := []string{"0A5DE93A973418B4AAC26FFAFA5A0000", "OrderID1882"}
+				So(bean.transKey[0], ShouldEqual, expectedKey[0])
+				So(bean.transKey[1], ShouldEqual, expectedKey[1])
+			})
+		})
+
+		Convey("When marshal consumer trace data", func() {
+			traceCtx := TraceContext{
+				TraceType: SubAfter,
+				TimeStamp: 1563789119096,
+				GroupName: "CID_JODIE_1",
+				IsSuccess: true,
+				RequestId: "0A5DE93A96A818B4AAC26FFAFA780007",
+				TraceBeans: []TraceBean{
+					{
+						Topic:      "TopicTest",
+						MsgId:      "0A5DE93A973418B4AAC26FFAFA5A0000",
+						Tags:       "TagA",
+						Keys:       "OrderID1882",
+						StoreHost:  "10.93.233.58",
+						ClientHost: "10.93.233.58",
+						StoreTime:  1563789119092,
+						BodyLength: 190,
+					},
+				},
+			}
+			bean := traceCtx.marshal2Bean()
+			Convey("transData should equal to expected", func() {
+				So(bean.transData, ShouldEqual, "SubAfter0A5DE93A96A818B4AAC26FFAFA7800070A5DE93A973418B4AAC26FFAFA5A00000trueOrderID18820")
+			})
+			Convey("transkey should equal to expected", func() {
+				expectedKey := []string{"0A5DE93A973418B4AAC26FFAFA5A0000", "OrderID1882"}
+				So(bean.transKey[0], ShouldEqual, expectedKey[0])
+				So(bean.transKey[1], ShouldEqual, expectedKey[1])
+			})
+		})
+	})
+}
diff --git a/internal/utils/helper.go b/internal/utils/helper.go
index d8797dd..2b9eb2a 100644
--- a/internal/utils/helper.go
+++ b/internal/utils/helper.go
@@ -17,62 +17,10 @@ limitations under the License.
 
 package utils
 
-import (
-	"bytes"
-	"encoding/binary"
-	"fmt"
-	"os"
-	"sync"
-	"time"
-)
-
-var (
-	counter        int16 = 0
-	startTimestamp int64 = 0
-	nextTimestamp  int64 = 0
-	prefix         string
-	locker         sync.Mutex
-)
-
-func MessageClientID() string {
-	locker.Lock()
-	defer locker.Unlock()
-	if prefix == "" {
-		buf := new(bytes.Buffer)
-		binary.Write(buf, binary.BigEndian, LocalIP())
-		binary.Write(buf, binary.BigEndian, Pid())
-		binary.Write(buf, binary.BigEndian, ClassLoaderID())
-		prefix = fmt.Sprintf("%x", buf.Bytes())
-	}
-	if time.Now().Unix() > nextTimestamp {
-		updateTimestamp()
-	}
-	counter++
-	buf := new(bytes.Buffer)
-	binary.Write(buf, binary.BigEndian, int32((time.Now().Unix()-startTimestamp)*1000))
-	binary.Write(buf, binary.BigEndian, counter)
-	return prefix + fmt.Sprintf("%x", buf.Bytes())
-
-}
-
-func updateTimestamp() {
-	year, month := time.Now().Year(), time.Now().Month()
-	startTimestamp = time.Date(year, month, 1, 0, 0, 0, 0, time.Local).Unix()
-	nextTimestamp = time.Date(year, month, 1, 0, 0, 0, 0, time.Local).AddDate(0, 1, 0).Unix()
-}
-
 func GetAddressByBytes(data []byte) string {
 	return "127.0.0.1"
 }
 
-func Pid() int16 {
-	return int16(os.Getpid())
-}
-
-func ClassLoaderID() int32 {
-	return 0
-}
-
 func UnCompress(data []byte) []byte {
 	return data
 }
diff --git a/internal/utils/helper_test.go b/internal/utils/helper_test.go
deleted file mode 100644
index ee55cd2..0000000
--- a/internal/utils/helper_test.go
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package utils
-
-import (
-	"testing"
-)
-
-func TestClassLoaderID(t *testing.T) {
-	if ClassLoaderID() != 0 {
-		t.Errorf("wrong ClassLoaderID, want=%d, got=%d", 0, ClassLoaderID())
-	}
-}
-
-func BenchmarkMessageClientID(b *testing.B) {
-	for i := 0; i < b.N; i++ {
-		MessageClientID()
-	}
-}
diff --git a/internal/utils/net.go b/internal/utils/net.go
index 5da1edc..65f2731 100644
--- a/internal/utils/net.go
+++ b/internal/utils/net.go
@@ -1,20 +1,28 @@
 package utils
 
 import (
+	"bytes"
 	"errors"
 	"fmt"
 	"net"
+	"strconv"
+	"time"
 )
 
-func LocalIP() string {
-	ip, err := clientIP4()
+var (
+	LocalIP string
+)
+
+func init() {
+	ip, err := ClientIP4()
 	if err != nil {
-		return ""
+		LocalIP = ""
+	} else {
+		LocalIP = fmt.Sprintf("%d.%d.%d.%d", ip[0], ip[1], ip[2], ip[3])
 	}
-	return fmt.Sprintf("%d.%d.%d.%d", ip[0], ip[1], ip[2], ip[3])
 }
 
-func clientIP4() ([]byte, error) {
+func ClientIP4() ([]byte, error) {
 	addrs, err := net.InterfaceAddrs()
 	if err != nil {
 		return nil, errors.New("unexpected IP address")
@@ -28,3 +36,9 @@ func clientIP4() ([]byte, error) {
 	}
 	return nil, errors.New("unknown IP address")
 }
+
+// FakeIP returns bytes 4 through 7 of the decimal text of the current Unix
+// time in milliseconds, i.e. four ASCII digits rather than raw address octets.
+func FakeIP() []byte {
+	return bytes.NewBufferString(strconv.FormatInt(time.Now().UnixNano()/int64(time.Millisecond), 10)).Bytes()[4:8]
+}
diff --git a/internal/utils/net_test.go b/internal/utils/net_test.go
index 9f76062..44454ad 100644
--- a/internal/utils/net_test.go
+++ b/internal/utils/net_test.go
@@ -3,5 +3,5 @@ package utils
 import "testing"
 
 func TestLocalIP2(t *testing.T) {
-	t.Log(LocalIP())
+	t.Log(LocalIP)
 }
diff --git a/primitive/ctx.go b/primitive/ctx.go
index 6e9b6d3..186521a 100644
--- a/primitive/ctx.go
+++ b/primitive/ctx.go
@@ -23,23 +23,55 @@ package primitive
 import (
 	"context"
 	"math"
+
+	"github.com/apache/rocketmq-client-go/rlog"
 )
 
 type CtxKey int
 
+type CommunicationMode string
+
+type ConsumeReturnType string
+
+// Ordinal maps c to its integer position; an unknown value is logged and reported as 0.
+func (c ConsumeReturnType) Ordinal() int {
+	switch c {
+	case SuccessReturn:
+		return 0
+	case TimeoutReturn:
+		return 1
+	case ExceptionRetrun:
+		return 2
+	case NullReturn:
+		return 3
+	case FailedReturn:
+		return 4
+	}
+	rlog.Error("illegal ConsumeReturnType: %v", c)
+	return 0
+}
+
 const (
 	method CtxKey = iota
 	msgCtx
 	orderlyCtx
 	concurrentlyCtx
+	producerCtx
 
 	// method name in  producer
-	SendSync   = "SendSync"
-	SendOneway = "SendOneway"
-	SendAsync  = "SendAsync"
+	SendSync   CommunicationMode = "SendSync"
+	SendOneway CommunicationMode = "SendOneway"
+	SendAsync  CommunicationMode = "SendAsync"
 	// method name in consumer
 	ConsumerPush = "ConsumerPush"
 	ConsumerPull = "ConsumerPull"
+
+	PropCtxType                       = "ConsumeContextType"
+	SuccessReturn   ConsumeReturnType = "SUCCESS"
+	TimeoutReturn   ConsumeReturnType = "TIMEOUT"
+	ExceptionRetrun ConsumeReturnType = "EXCEPTION"
+	NullReturn      ConsumeReturnType = "RETURNNULL"
+	FailedReturn    ConsumeReturnType = "FAILED"
 )
 
 type ConsumeMessageContext struct {
@@ -53,13 +85,13 @@ type ConsumeMessageContext struct {
 }
 
 // WithMethod set call method name
-func WithMethod(ctx context.Context, m string) context.Context {
+func WithMethod(ctx context.Context, m CommunicationMode) context.Context {
 	return context.WithValue(ctx, method, m)
 }
 
 // GetMethod get call method name
-func GetMethod(ctx context.Context) string {
-	return ctx.Value(method).(string)
+func GetMethod(ctx context.Context) CommunicationMode {
+	return ctx.Value(method).(CommunicationMode)
 }
 
 // WithConsumerCtx set ConsumeMessageContext in PushConsumer
@@ -116,3 +148,24 @@ func GetConcurrentlyCtx(ctx context.Context) (*ConsumeConcurrentlyContext, bool)
 	c, exist := ctx.Value(concurrentlyCtx).(*ConsumeConcurrentlyContext)
 	return c, exist
 }
+
+type ProducerCtx struct {
+	ProducerGroup     string
+	Message           Message
+	MQ                MessageQueue
+	BrokerAddr        string
+	BornHost          string
+	CommunicationMode CommunicationMode
+	SendResult        *SendResult
+	Props             map[string]string
+	MsgType           MessageType
+	Namespace         string
+}
+
+func WithProducerCtx(ctx context.Context, c *ProducerCtx) context.Context {
+	return context.WithValue(ctx, producerCtx, c)
+}
+
+func GetProducerCtx(ctx context.Context) *ProducerCtx {
+	return ctx.Value(producerCtx).(*ProducerCtx)
+}
diff --git a/primitive/interceptor.go b/primitive/interceptor.go
index 3c4c659..7fd3f83 100644
--- a/primitive/interceptor.go
+++ b/primitive/interceptor.go
@@ -28,3 +28,9 @@ type Invoker func(ctx context.Context, req, reply interface{}) error
 // In PushConsumer call, the req is []*MessageExt type and the reply is ConsumeResultHolder,
 // use type assert to get real type.
 type Interceptor func(ctx context.Context, req, reply interface{}, next Invoker) error
+
+// config for message trace.
+type TraceConfig struct {
+	TraceTopic string
+	Access     AccessChannel
+}
diff --git a/primitive/message.go b/primitive/message.go
index 1da6d5a..28c9eab 100644
--- a/primitive/message.go
+++ b/primitive/message.go
@@ -20,6 +20,7 @@ package primitive
 import (
 	"fmt"
 	"strconv"
+	"strings"
 
 	"github.com/apache/rocketmq-client-go/internal/utils"
 )
@@ -108,6 +109,23 @@ func (msg *Message) RemoveProperty(key string) string {
 	return value
 }
 
+// SetKeys stores keys in the PropertyKeys property as a single string joined
+// with PropertyKeySeparator.
+//
+// The separator is written only between keys, so the stored value carries no
+// trailing separator; an empty or nil slice stores the empty string.
+func (msg *Message) SetKeys(keys []string) {
+	msg.PutProperty(PropertyKeys, strings.Join(keys, PropertyKeySeparator))
+}
+
+func (msg *Message) GetTags() string {
+	return msg.Properties[PropertyTags]
+}
+
+func (msg *Message) GetKeys() string {
+	return msg.Properties[PropertyKeys]
+}
+
 type MessageExt struct {
 	Message
 	MsgId                     string
@@ -129,6 +147,14 @@ func (msgExt *MessageExt) GetTags() string {
 	return msgExt.Properties[PropertyTags]
 }
 
+func (msgExt *MessageExt) GetRegionID() string {
+	return msgExt.Properties[PropertyMsgRegion]
+}
+
+func (msgExt *MessageExt) IsTraceOn() string {
+	return msgExt.Properties[PropertyTraceSwitch]
+}
+
 func (msgExt *MessageExt) String() string {
 	return fmt.Sprintf("[Message=%s, MsgId=%s, QueueId=%d, StoreSize=%d, QueueOffset=%d, SysFlag=%d, "+
 		"BornTimestamp=%d, BornHost=%s, StoreTimestamp=%d, StoreHost=%s, CommitLogOffset=%d, BodyCRC=%d, "+
@@ -162,3 +188,21 @@ func (mq MessageQueue) Equals(queue *MessageQueue) bool {
 	// TODO
 	return mq.BrokerName == queue.BrokerName && mq.Topic == queue.Topic && mq.QueueId == mq.QueueId
 }
+
+type AccessChannel int
+
+const (
+	// connect to private IDC cluster.
+	Local AccessChannel = iota
+	// connect to Cloud service.
+	Cloud
+)
+
+type MessageType int
+
+const (
+	NormalMsg MessageType = iota
+	TransMsgHalf
+	TransMsgCommit
+	DelayMsg
+)
diff --git a/primitive/result.go b/primitive/result.go
index f21cc2e..4664cf0 100644
--- a/primitive/result.go
+++ b/primitive/result.go
@@ -20,7 +20,9 @@ package primitive
 import (
 	"bytes"
 	"encoding/binary"
+	"encoding/hex"
 	"fmt"
+	"strings"
 
 	"github.com/apache/rocketmq-client-go/internal/utils"
 )
@@ -203,7 +205,7 @@ func DecodeMessage(data []byte) []*MessageExt {
 		}
 		count += 2 + int(propertiesLength)
 
-		msg.MsgId = createMessageId(hostBytes, msg.CommitLogOffset)
+		msg.MsgId = createMessageId(hostBytes, port, msg.CommitLogOffset)
 		//count += 16
 
 		msgs = append(msgs, msg)
@@ -212,8 +214,12 @@ func DecodeMessage(data []byte) []*MessageExt {
 	return msgs
 }
 
-func createMessageId(addr []byte, offset int64) string {
-	return "msgID" // TODO
+func createMessageId(addr []byte, port int32, offset int64) string {
+	buffer := new(bytes.Buffer)
+	buffer.Write(addr)
+	binary.Write(buffer, binary.BigEndian, port)
+	binary.Write(buffer, binary.BigEndian, offset)
+	return strings.ToUpper(hex.EncodeToString(buffer.Bytes()))
 }
 
 // unmarshalProperties parse data into property kv pairs.
diff --git a/primitive/result_test.go b/primitive/result_test.go
index 131db8c..a3fd5ab 100644
--- a/primitive/result_test.go
+++ b/primitive/result_test.go
@@ -18,12 +18,14 @@ limitations under the License.
 package primitive
 
 import (
+	"strings"
 	"testing"
 
+	. "github.com/smartystreets/goconvey/convey"
 	"github.com/stretchr/testify/assert"
 )
 
-func Test(t *testing.T) {
+func TestProperties(t *testing.T) {
 	kv := map[string]string{
 		"k1": "v1",
 		"k2": "v2",
@@ -32,3 +34,17 @@ func Test(t *testing.T) {
 	kv2 := unmarshalProperties([]byte(str))
 	assert.Equal(t, kv, kv2)
 }
+
+// TestCreateMessageId checks the hex encoding of broker address, port and offset.
+func TestCreateMessageId(t *testing.T) {
+	Convey("MessageId gen", t, func() {
+		b := []byte{10, 93, 233, 58}
+		port := int32(10911)
+		offset := int64(4391252)
+		id := createMessageId(b, port, offset)
+		Convey("generated messageId should be equal to expected", func() {
+			// createMessageId upper-cases its hex output, so expect upper case.
+			assert.Equal(t, strings.ToUpper("0A5DE93A00002A9F0000000000430154"), id)
+		})
+	})
+}
diff --git a/producer/interceptor.go b/producer/interceptor.go
new file mode 100644
index 0000000..46f8042
--- /dev/null
+++ b/producer/interceptor.go
@@ -0,0 +1,97 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+/**
+ * builtin interceptor
+ */
+package producer
+
+import (
+	"context"
+	"time"
+
+	"github.com/apache/rocketmq-client-go/internal"
+	"github.com/apache/rocketmq-client-go/internal/utils"
+	"github.com/apache/rocketmq-client-go/primitive"
+)
+
+// WithTrace enables message trace (https://github.com/apache/rocketmq/wiki/RIP-6-Message-Trace);
+// the trace interceptor is placed ahead of any interceptors already configured.
+func WithTrace(traceCfg primitive.TraceConfig) Option {
+	return func(options *producerOptions) {
+		chain := make([]primitive.Interceptor, 0, len(options.Interceptors)+1)
+		chain = append(chain, newTraceInterceptor(traceCfg))
+		chain = append(chain, options.Interceptors...)
+		options.Interceptors = chain
+	}
+}
+
+// newTraceInterceptor starts a trace dispatcher for traceCfg and returns an
+// interceptor that appends one Pub trace record for each traced send result.
+func newTraceInterceptor(traceCfg primitive.TraceConfig) primitive.Interceptor {
+	dispatcher := internal.NewTraceDispatcher(traceCfg.TraceTopic, traceCfg.Access)
+	dispatcher.Start()
+	return func(ctx context.Context, req, reply interface{}, next primitive.Invoker) error {
+		beginT := time.Now()
+		err := next(ctx, req, reply)
+		producerCtx := primitive.GetProducerCtx(ctx)
+		// Messages sent to the trace topic are not traced. The send already ran
+		// above, so return its error; invoking next again would send it twice.
+		if producerCtx.Message.Topic == dispatcher.GetTraceTopicName() {
+			return err
+		}
+
+		// SendOneway and SendAsync have no reply.
+		if reply == nil {
+			return err
+		}
+
+		result := reply.(*primitive.SendResult)
+		if result.RegionID == "" || !result.TraceOn {
+			return err
+		}
+
+		sendSuccess := result.Status == primitive.SendOK
+		costT := time.Since(beginT).Nanoseconds() / int64(time.Millisecond)
+		storeT := beginT.UnixNano()/int64(time.Millisecond) + costT/2
+
+		traceBean := internal.TraceBean{
+			Topic:       producerCtx.Message.Topic,
+			Tags:        producerCtx.Message.GetTags(),
+			Keys:        producerCtx.Message.GetKeys(),
+			StoreHost:   producerCtx.BrokerAddr,
+			ClientHost:  utils.LocalIP,
+			BodyLength:  len(producerCtx.Message.Body),
+			MsgType:     producerCtx.MsgType,
+			MsgId:       result.MsgID,
+			OffsetMsgId: result.OffsetMsgID,
+			StoreTime:   storeT,
+		}
+		traceCtx := internal.TraceContext{
+			RequestId:  internal.CreateUniqID(),
+			TimeStamp:  time.Now().UnixNano() / int64(time.Millisecond),
+			TraceType:  internal.Pub,
+			GroupName:  producerCtx.ProducerGroup,
+			RegionId:   result.RegionID,
+			TraceBeans: []internal.TraceBean{traceBean},
+			CostTime:   costT,
+			IsSuccess:  sendSuccess,
+		}
+		dispatcher.Append(traceCtx)
+		return err
+	}
+}
diff --git a/producer/producer.go b/producer/producer.go
index 12a72b6..13f7952 100644
--- a/producer/producer.go
+++ b/producer/producer.go
@@ -25,6 +25,7 @@ import (
 
 	"github.com/apache/rocketmq-client-go/internal"
 	"github.com/apache/rocketmq-client-go/internal/remote"
+	"github.com/apache/rocketmq-client-go/internal/utils"
 	"github.com/apache/rocketmq-client-go/primitive"
 	"github.com/apache/rocketmq-client-go/rlog"
 	"github.com/pkg/errors"
@@ -129,6 +130,15 @@ func (p *defaultProducer) SendSync(ctx context.Context, msg *primitive.Message)
 	resp := new(primitive.SendResult)
 	if p.interceptor != nil {
 		primitive.WithMethod(ctx, primitive.SendSync)
+		producerCtx := &primitive.ProducerCtx{
+			ProducerGroup: p.group,
+			CommunicationMode: primitive.SendSync,
+			BornHost: utils.LocalIP,
+			Message: *msg,
+			SendResult: resp,
+		}
+		ctx = primitive.WithProducerCtx(ctx, producerCtx)
+
 		err := p.interceptor(ctx, msg, resp, func(ctx context.Context, req, reply interface{}) error {
 			var err error
 			realReq := req.(*primitive.Message)
@@ -151,6 +161,7 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message,
 		err error
 	)
 
+	var producerCtx *primitive.ProducerCtx
 	for retryCount := 0; retryCount < retryTime; retryCount++ {
 		mq := p.selectMessageQueue(msg)
 		if mq == nil {
@@ -163,6 +174,12 @@ func (p *defaultProducer) sendSync(ctx context.Context, msg *primitive.Message,
 			return fmt.Errorf("topic=%s route info not found", mq.Topic)
 		}
 
+		if p.interceptor != nil {
+			producerCtx = primitive.GetProducerCtx(ctx)
+			producerCtx.BrokerAddr = addr
+			producerCtx.MQ = *mq
+		}
+
 		res, _err := p.client.InvokeSync(addr, p.buildSendRequest(mq, msg), 3*time.Second)
 		if _err != nil {
 			err = _err


Mime
View raw message