rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vongosl...@apache.org
Subject [rocketmq-client-cpp] branch master updated: [ISSUE #275] Add trace message for pub and sub. (#276)
Date Mon, 16 Mar 2020 12:40:16 GMT
This is an automated email from the ASF dual-hosted git repository.

vongosling pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 900b937  [ISSUE #275] Add trace message for pub and sub. (#276)
900b937 is described below

commit 900b937ff97b236c19804f201d24a1e674447b62
Author: dinglei <libya_003@163.com>
AuthorDate: Mon Mar 16 20:40:09 2020 +0800

    [ISSUE #275] Add trace message for pub and sub. (#276)
    
    * feat(trace): add trace message for sync producer.
    
    * feat(trace): add message trace for push consumer
    
    * feat(trace): add test case for trace message of push consumer
    
    * feat(trace): add default key value to trace message to avoid the bug in broker.
---
 include/DefaultMQProducer.h                        |   2 +
 include/DefaultMQPushConsumer.h                    |   2 +
 src/MQClientFactory.cpp                            |   2 +
 src/common/DefaultMQClient.cpp                     |  10 ++
 src/common/NameSpaceUtil.cpp                       |  14 +++
 src/common/NameSpaceUtil.h                         |   2 +
 src/consumer/ConsumeMessageConcurrentlyService.cpp |  66 ++++++++--
 src/consumer/ConsumeMessageHookImpl.cpp            | 114 +++++++++++++++++
 .../ConsumeMessageHookImpl.h}                      |  39 ++----
 src/consumer/ConsumeMessageOrderlyService.cpp      |  25 ++++
 src/consumer/DefaultMQPushConsumer.cpp             |   7 +-
 src/consumer/DefaultMQPushConsumerImpl.cpp         |  87 ++++++++++++-
 src/consumer/DefaultMQPushConsumerImpl.h           |  18 +++
 src/include/DefaultMQClient.h                      |   5 +
 src/producer/DefaultMQProducer.cpp                 |   7 +-
 src/producer/DefaultMQProducerImpl.cpp             | 135 +++++++++++++++++++--
 src/producer/DefaultMQProducerImpl.h               |  19 +++
 src/producer/SendMessageHookImpl.cpp               | 108 +++++++++++++++++
 .../SendMessageHookImpl.h}                         |  39 ++----
 src/trace/ConsumeMessageContext.cpp                | 102 ++++++++++++++++
 src/trace/ConsumeMessageContext.h                  |  86 +++++++++++++
 .../ConsumeMessageHook.h}                          |  38 ++----
 src/trace/SendMessageContext.cpp                   | 115 ++++++++++++++++++
 src/trace/SendMessageContext.h                     |  96 +++++++++++++++
 src/{include => trace}/SendMessageHook.h           |  34 ++----
 src/trace/TraceBean.cpp                            | 121 ++++++++++++++++++
 src/trace/TraceBean.h                              |  89 ++++++++++++++
 .../SendMessageHook.h => trace/TraceContant.cpp}   |  39 ++----
 .../SendMessageHook.h => trace/TraceContant.h}     |  50 ++++----
 src/trace/TraceContext.cpp                         | 113 +++++++++++++++++
 src/trace/TraceContext.h                           |  88 ++++++++++++++
 src/trace/TraceTransferBean.cpp                    |  38 ++++++
 .../TraceTransferBean.h}                           |  43 +++----
 src/trace/TraceUtil.cpp                            | 124 +++++++++++++++++++
 .../SendMessageHook.h => trace/TraceUtil.h}        |  38 ++----
 test/src/common/NameSpaceUtilTest.cpp              |   2 +
 .../src/consumer/DefaultMQPushConsumerImplTest.cpp | 119 ++++++++++++++++++
 test/src/producer/DefaultMQProducerImplTest.cpp    |  45 ++++++-
 test/src/trace/TraceBeanTest.cpp                   |  77 ++++++++++++
 test/src/trace/TraceUtilTest.cpp                   |  85 +++++++++++++
 40 files changed, 1998 insertions(+), 245 deletions(-)

diff --git a/include/DefaultMQProducer.h b/include/DefaultMQProducer.h
index 991d27d..6ca52f9 100644
--- a/include/DefaultMQProducer.h
+++ b/include/DefaultMQProducer.h
@@ -136,6 +136,8 @@ class ROCKETMQCLIENT_API DefaultMQProducer {
 
   void setUnitName(std::string unitName);
   const std::string& getUnitName() const;
+  void setMessageTrace(bool messageTrace);
+  bool getMessageTrace() const;
 
  private:
   DefaultMQProducerImpl* impl;
diff --git a/include/DefaultMQPushConsumer.h b/include/DefaultMQPushConsumer.h
index 5b10b3d..46f76cf 100644
--- a/include/DefaultMQPushConsumer.h
+++ b/include/DefaultMQPushConsumer.h
@@ -132,6 +132,8 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer {
   const std::string& getUnitName() const;
 
   void setAsyncPull(bool asyncFlag);
+  void setMessageTrace(bool messageTrace);
+  bool getMessageTrace() const;
 
  private:
   DefaultMQPushConsumerImpl* impl;
diff --git a/src/MQClientFactory.cpp b/src/MQClientFactory.cpp
index cb1a491..d210965 100644
--- a/src/MQClientFactory.cpp
+++ b/src/MQClientFactory.cpp
@@ -77,6 +77,8 @@ void MQClientFactory::start() {
       m_serviceState = RUNNING;
       break;
     case RUNNING:
+      LOG_INFO("The Factory object:%s start before with now state:%d", m_clientId.c_str(), m_serviceState);
+      break;
     case SHUTDOWN_ALREADY:
     case START_FAILED:
       LOG_INFO("The Factory object:%s start failed with fault state:%d", m_clientId.c_str(), m_serviceState);
diff --git a/src/common/DefaultMQClient.cpp b/src/common/DefaultMQClient.cpp
index 269ce61..c271212 100644
--- a/src/common/DefaultMQClient.cpp
+++ b/src/common/DefaultMQClient.cpp
@@ -46,6 +46,7 @@ DefaultMQClient::DefaultMQClient() {
   m_tcpConnectTimeout = 3000;        // 3s
   m_tcpTransportTryLockTimeout = 3;  // 3s
   m_unitName = "";
+  m_messageTrace = false;
 }
 
 DefaultMQClient::~DefaultMQClient() {}
@@ -216,6 +217,14 @@ const string& DefaultMQClient::getUnitName() const {
   return m_unitName;
 }
 
+bool DefaultMQClient::getMessageTrace() const {
+  return m_messageTrace;
+}
+
+void DefaultMQClient::setMessageTrace(bool mMessageTrace) {
+  m_messageTrace = mMessageTrace;
+}
+
 void DefaultMQClient::setSessionCredentials(const string& input_accessKey,
                                             const string& input_secretKey,
                                             const string& input_onsChannel) {
@@ -239,6 +248,7 @@ void DefaultMQClient::showClientConfigs() {
   LOG_WARN("PullThreadNum:%d", m_pullThreadNum);
   LOG_WARN("TcpConnectTimeout:%lld ms", m_tcpConnectTimeout);
   LOG_WARN("TcpTransportTryLockTimeout:%lld s", m_tcpTransportTryLockTimeout);
+  LOG_WARN("OpenMessageTrace:%s", m_messageTrace ? "true" : "false");
   // LOG_WARN("*****************************************************************************");
 }
 //<!************************************************************************
diff --git a/src/common/NameSpaceUtil.cpp b/src/common/NameSpaceUtil.cpp
index 0a08deb..61866e9 100644
--- a/src/common/NameSpaceUtil.cpp
+++ b/src/common/NameSpaceUtil.cpp
@@ -17,6 +17,7 @@
 
 #include "NameSpaceUtil.h"
 #include "Logging.h"
+#include "TraceContant.h"
 
 namespace rocketmq {
 
@@ -75,6 +76,15 @@ bool NameSpaceUtil::checkNameSpaceExistInNameServer(string nameServerAddr) {
   return false;
 }
 
+string NameSpaceUtil::withoutNameSpace(string source, string nameSpace) {
+  if (!nameSpace.empty()) {
+    auto index = source.find(nameSpace);
+    if (index != string::npos) {
+      return source.substr(index + nameSpace.length() + NAMESPACE_SPLIT_FLAG.length(), source.length());
+    }
+  }
+  return source;
+}
 string NameSpaceUtil::withNameSpace(string source, string ns) {
   if (!ns.empty()) {
     return ns + NAMESPACE_SPLIT_FLAG + source;
@@ -83,6 +93,10 @@ string NameSpaceUtil::withNameSpace(string source, string ns) {
 }
 
 bool NameSpaceUtil::hasNameSpace(string source, string ns) {
+  if (source.find(TraceContant::TRACE_TOPIC) != string::npos) {
+    LOG_DEBUG("Find Trace Topic [%s]", source.c_str());
+    return true;
+  }
   if (!ns.empty() && source.length() >= ns.length() && source.find(ns) != string::npos) {
     return true;
   }
diff --git a/src/common/NameSpaceUtil.h b/src/common/NameSpaceUtil.h
index a63d647..cdaaf6b 100644
--- a/src/common/NameSpaceUtil.h
+++ b/src/common/NameSpaceUtil.h
@@ -43,6 +43,8 @@ class NameSpaceUtil {
 
   static string withNameSpace(string source, string ns);
 
+  static string withoutNameSpace(string source, string ns);
+
   static bool hasNameSpace(string source, string ns);
 };
 
diff --git a/src/consumer/ConsumeMessageConcurrentlyService.cpp b/src/consumer/ConsumeMessageConcurrentlyService.cpp
index deda8ac..ef43bb7 100644
--- a/src/consumer/ConsumeMessageConcurrentlyService.cpp
+++ b/src/consumer/ConsumeMessageConcurrentlyService.cpp
@@ -17,11 +17,13 @@
 #if !defined(WIN32) && !defined(__APPLE__)
 #include <sys/prctl.h>
 #endif
+
 #include "ConsumeMsgService.h"
 #include "DefaultMQPushConsumer.h"
 #include "Logging.h"
 #include "MessageAccessor.h"
 #include "UtilAll.h"
+
 namespace rocketmq {
 
 //<!************************************************************************
@@ -80,6 +82,7 @@ void ConsumeMessageConcurrentlyService::submitConsumeRequest(boost::weak_ptr<Pul
              request->m_messageQueue.toString().c_str());
   }
 }
+
 void ConsumeMessageConcurrentlyService::submitConsumeRequestLater(boost::weak_ptr<PullRequest> pullRequest,
                                                                   vector<MQMessageExt>& msgs,
                                                                   int millis) {
@@ -146,14 +149,26 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullReque
   if (request->isDropped()) {
     LOG_WARN("the pull request for %s Had been dropped before", request->m_messageQueue.toString().c_str());
     request->clearAllMsgs();  // add clear operation to avoid bad state when
-                              // dropped pullRequest returns normal
+    // dropped pullRequest returns normal
     return;
   }
   if (msgs.empty()) {
     LOG_WARN("the msg of pull result is NULL,its mq:%s", (request->m_messageQueue).toString().c_str());
     return;
   }
-
+  ConsumeMessageContext consumeMessageContext;
+  DefaultMQPushConsumerImpl* pConsumer = dynamic_cast<DefaultMQPushConsumerImpl*>(m_pConsumer);
+  if (pConsumer) {
+    if (pConsumer->getMessageTrace() && pConsumer->hasConsumeMessageHook()) {
+      consumeMessageContext.setDefaultMQPushConsumer(pConsumer);
+      consumeMessageContext.setConsumerGroup(pConsumer->getGroupName());
+      consumeMessageContext.setMessageQueue(request->m_messageQueue);
+      consumeMessageContext.setMsgList(msgs);
+      consumeMessageContext.setSuccess(false);
+      consumeMessageContext.setNameSpace(pConsumer->getNameSpace());
+      pConsumer->executeConsumeMessageHookBefore(&consumeMessageContext);
+    }
+  }
   ConsumeStatus status = CONSUME_SUCCESS;
   if (m_pMessageListener != NULL) {
     resetRetryTopic(msgs);
@@ -163,11 +178,48 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullReque
     if (m_pConsumer->isUseNameSpaceMode()) {
       MessageAccessor::withoutNameSpace(msgs, m_pConsumer->getNameSpace());
     }
-    try {
-      status = m_pMessageListener->consumeMessage(msgs);
-    } catch (...) {
-      status = RECONSUME_LATER;
-      LOG_ERROR("Consumer's code is buggy. Un-caught exception raised");
+
+    if (pConsumer->getMessageTrace() && pConsumer->hasConsumeMessageHook()) {
+      // For open trace message, consume message one by one.
+      for (size_t i = 0; i < msgs.size(); ++i) {
+        LOG_DEBUG("=====Trace Receive Messages,Topic[%s], MsgId[%s],Body[%s],RetryTimes[%d]",
+                  msgs[i].getTopic().c_str(), msgs[i].getMsgId().c_str(), msgs[i].getBody().c_str(),
+                  msgs[i].getReconsumeTimes());
+        std::vector<MQMessageExt> msgInner;
+        msgInner.push_back(msgs[i]);
+        if (status != CONSUME_SUCCESS) {
+          // all the Messages behind should be set to failed.
+          status = RECONSUME_LATER;
+          consumeMessageContext.setMsgIndex(i);
+          consumeMessageContext.setStatus("RECONSUME_LATER");
+          consumeMessageContext.setSuccess(false);
+          pConsumer->executeConsumeMessageHookAfter(&consumeMessageContext);
+          continue;
+        }
+        try {
+          status = m_pMessageListener->consumeMessage(msgInner);
+        } catch (...) {
+          status = RECONSUME_LATER;
+          LOG_ERROR("Consumer's code is buggy. Un-caught exception raised");
+        }
+        consumeMessageContext.setMsgIndex(i);  // indicate message position,not support batch consumer
+        if (status == CONSUME_SUCCESS) {
+          consumeMessageContext.setStatus("CONSUME_SUCCESS");
+          consumeMessageContext.setSuccess(true);
+        } else {
+          status = RECONSUME_LATER;
+          consumeMessageContext.setStatus("RECONSUME_LATER");
+          consumeMessageContext.setSuccess(false);
+        }
+        pConsumer->executeConsumeMessageHookAfter(&consumeMessageContext);
+      }
+    } else {
+      try {
+        status = m_pMessageListener->consumeMessage(msgs);
+      } catch (...) {
+        status = RECONSUME_LATER;
+        LOG_ERROR("Consumer's code is buggy. Un-caught exception raised");
+      }
     }
   }
 
diff --git a/src/consumer/ConsumeMessageHookImpl.cpp b/src/consumer/ConsumeMessageHookImpl.cpp
new file mode 100644
index 0000000..a1045c6
--- /dev/null
+++ b/src/consumer/ConsumeMessageHookImpl.cpp
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ConsumeMessageHookImpl.h"
+#include <memory>
+#include <string>
+#include "ConsumeMessageContext.h"
+#include "DefaultMQPushConsumerImpl.h"
+#include "Logging.h"
+#include "MQClientException.h"
+#include "NameSpaceUtil.h"
+#include "TraceContant.h"
+#include "TraceContext.h"
+#include "TraceTransferBean.h"
+#include "TraceUtil.h"
+#include "UtilAll.h"
+namespace rocketmq {
+
+class TraceMessageConsumeCallback : public SendCallback {
+  virtual void onSuccess(SendResult& sendResult) {
+    LOG_DEBUG("TraceMessageConsumeCallback, MsgId:[%s],OffsetMsgId[%s]", sendResult.getMsgId().c_str(),
+              sendResult.getOffsetMsgId().c_str());
+  }
+  virtual void onException(MQException& e) {}
+};
+static TraceMessageConsumeCallback* consumeTraceCallback = new TraceMessageConsumeCallback();
+std::string ConsumeMessageHookImpl::getHookName() {
+  return "RocketMQConsumeMessageHookImpl";
+}
+
+void ConsumeMessageHookImpl::executeHookBefore(ConsumeMessageContext* context) {
+  if (context == NULL || context->getMsgList().empty()) {
+    return;
+  }
+  TraceContext* traceContext = new TraceContext();
+  context->setTraceContext(traceContext);
+  traceContext->setTraceType(SubBefore);
+  traceContext->setGroupName(NameSpaceUtil::withoutNameSpace(context->getConsumerGroup(), context->getNameSpace()));
+  std::vector<TraceBean> beans;
+
+  std::vector<MQMessageExt> msgs = context->getMsgList();
+  std::vector<MQMessageExt>::iterator it = msgs.begin();
+  for (; it != msgs.end(); ++it) {
+    TraceBean bean;
+    bean.setTopic((*it).getTopic());
+    bean.setMsgId((*it).getMsgId());
+    bean.setTags((*it).getTags());
+    bean.setKeys((*it).getKeys());
+    bean.setStoreHost((*it).getStoreHostString());
+    bean.setStoreTime((*it).getStoreTimestamp());
+    bean.setBodyLength((*it).getStoreSize());
+    bean.setRetryTimes((*it).getReconsumeTimes());
+    std::string regionId = (*it).getProperty(MQMessage::PROPERTY_MSG_REGION);
+    if (regionId.empty()) {
+      regionId = TraceContant::DEFAULT_REDION;
+    }
+    traceContext->setRegionId(regionId);
+    traceContext->setTraceBean(bean);
+  }
+  traceContext->setTimeStamp(UtilAll::currentTimeMillis());
+
+  std::string topic = TraceContant::TRACE_TOPIC + traceContext->getRegionId();
+
+  TraceTransferBean ben = TraceUtil::CovertTraceContextToTransferBean(traceContext);
+  MQMessage message(topic, ben.getTransData());
+  message.setKeys(ben.getTransKey());
+
+  // send trace message async.
+  context->getDefaultMQPushConsumer()->submitSendTraceRequest(message, consumeTraceCallback);
+  return;
+}
+
+void ConsumeMessageHookImpl::executeHookAfter(ConsumeMessageContext* context) {
+  if (context == NULL || context->getMsgList().empty()) {
+    return;
+  }
+
+  std::shared_ptr<TraceContext> subBeforeContext = context->getTraceContext();
+  TraceContext subAfterContext;
+  subAfterContext.setTraceType(SubAfter);
+  subAfterContext.setRegionId(subBeforeContext->getRegionId());
+  subAfterContext.setGroupName(subBeforeContext->getGroupName());
+  subAfterContext.setRequestId(subBeforeContext->getRequestId());
+  subAfterContext.setStatus(context->getSuccess());
+  int costTime = static_cast<int>(UtilAll::currentTimeMillis() - subBeforeContext->getTimeStamp());
+  subAfterContext.setCostTime(costTime);
+  subAfterContext.setTraceBeanIndex(context->getMsgIndex());
+  TraceBean bean = subBeforeContext->getTraceBeans()[subAfterContext.getTraceBeanIndex()];
+  subAfterContext.setTraceBean(bean);
+
+  std::string topic = TraceContant::TRACE_TOPIC + subAfterContext.getRegionId();
+  TraceTransferBean ben = TraceUtil::CovertTraceContextToTransferBean(&subAfterContext);
+  MQMessage message(topic, ben.getTransData());
+  message.setKeys(ben.getTransKey());
+
+  // send trace message async.
+  context->getDefaultMQPushConsumer()->submitSendTraceRequest(message, consumeTraceCallback);
+  return;
+}
+}  // namespace rocketmq
diff --git a/src/include/SendMessageHook.h b/src/consumer/ConsumeMessageHookImpl.h
similarity index 51%
copy from src/include/SendMessageHook.h
copy to src/consumer/ConsumeMessageHookImpl.h
index 0d3e1e3..30852ba 100644
--- a/src/include/SendMessageHook.h
+++ b/src/consumer/ConsumeMessageHookImpl.h
@@ -14,36 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef __SENDMESSAGEHOOK_H__
-#define __SENDMESSAGEHOOK_H__
-
-#include "MQClientException.h"
-#include "MQMessage.h"
-#include "MQMessageQueue.h"
-#include "RocketMQClient.h"
-#include "SendResult.h"
+#ifndef __ROCKETMQ_CONSUME_MESSAGE_RPC_HOOK_IMPL_H__
+#define __ROCKETMQ_CONSUME_MESSAGE_RPC_HOOK_IMPL_H__
 
+#include <string>
+#include "ConsumeMessageContext.h"
+#include "ConsumeMessageHook.h"
 namespace rocketmq {
-//<!***************************************************************************
-class SendMessageContext {
- public:
-  std::string producerGroup;
-  MQMessage msg;
-  MQMessageQueue mq;
-  std::string brokerAddr;
-  int communicationMode;
-  SendResult sendResult;
-  MQException* pException;
-  void* pArg;
-};
-
-class SendMessageHook {
+class ConsumeMessageHookImpl : public ConsumeMessageHook {
  public:
-  virtual ~SendMessageHook() {}
-  virtual std::string hookName() = 0;
-  virtual void sendMessageBefore(const SendMessageContext& context) = 0;
-  virtual void sendMessageAfter(const SendMessageContext& context) = 0;
+  virtual ~ConsumeMessageHookImpl() {}
+  virtual std::string getHookName();
+  virtual void executeHookBefore(ConsumeMessageContext* context);
+  virtual void executeHookAfter(ConsumeMessageContext* context);
 };
-//<!***************************************************************************
 }  // namespace rocketmq
-#endif
+#endif
\ No newline at end of file
diff --git a/src/consumer/ConsumeMessageOrderlyService.cpp b/src/consumer/ConsumeMessageOrderlyService.cpp
index fcff4a4..849fa8f 100644
--- a/src/consumer/ConsumeMessageOrderlyService.cpp
+++ b/src/consumer/ConsumeMessageOrderlyService.cpp
@@ -185,8 +185,27 @@ void ConsumeMessageOrderlyService::ConsumeRequest(boost::weak_ptr<PullRequest> p
           if (m_pConsumer->isUseNameSpaceMode()) {
             MessageAccessor::withoutNameSpace(msgs, m_pConsumer->getNameSpace());
           }
+          ConsumeMessageContext consumeMessageContext;
+          DefaultMQPushConsumerImpl* pConsumer = dynamic_cast<DefaultMQPushConsumerImpl*>(m_pConsumer);
+          if (pConsumer) {
+            if (pConsumer->getMessageTrace() && pConsumer->hasConsumeMessageHook()) {
+              consumeMessageContext.setDefaultMQPushConsumer(pConsumer);
+              consumeMessageContext.setConsumerGroup(pConsumer->getGroupName());
+              consumeMessageContext.setMessageQueue(request->m_messageQueue);
+              consumeMessageContext.setMsgList(msgs);
+              consumeMessageContext.setSuccess(false);
+              consumeMessageContext.setNameSpace(pConsumer->getNameSpace());
+              pConsumer->executeConsumeMessageHookBefore(&consumeMessageContext);
+            }
+          }
           ConsumeStatus consumeStatus = m_pMessageListener->consumeMessage(msgs);
           if (consumeStatus == RECONSUME_LATER) {
+            if (pConsumer) {
+              consumeMessageContext.setMsgIndex(0);
+              consumeMessageContext.setStatus("RECONSUME_LATER");
+              consumeMessageContext.setSuccess(false);
+              pConsumer->executeConsumeMessageHookAfter(&consumeMessageContext);
+            }
             if (msgs[0].getReconsumeTimes() <= 15) {
               msgs[0].setReconsumeTimes(msgs[0].getReconsumeTimes() + 1);
               request->makeMessageToCosumeAgain(msgs);
@@ -202,6 +221,12 @@ void ConsumeMessageOrderlyService::ConsumeRequest(boost::weak_ptr<PullRequest> p
               tryLockLaterAndReconsumeDelay(request, false, 5000);
             }
           } else {
+            if (pConsumer) {
+              consumeMessageContext.setMsgIndex(0);
+              consumeMessageContext.setStatus("CONSUME_SUCCESS");
+              consumeMessageContext.setSuccess(true);
+              pConsumer->executeConsumeMessageHookAfter(&consumeMessageContext);
+            }
             m_pConsumer->updateConsumeOffset(request->m_messageQueue, request->commit());
           }
         } else {
diff --git a/src/consumer/DefaultMQPushConsumer.cpp b/src/consumer/DefaultMQPushConsumer.cpp
index 5034051..e0cb5bf 100644
--- a/src/consumer/DefaultMQPushConsumer.cpp
+++ b/src/consumer/DefaultMQPushConsumer.cpp
@@ -203,5 +203,10 @@ uint64_t DefaultMQPushConsumer::getTcpTransportTryLockTimeout() const {
 void DefaultMQPushConsumer::setAsyncPull(bool asyncFlag) {
   impl->setAsyncPull(asyncFlag);
 }
-//<!************************************************************************
+void DefaultMQPushConsumer::setMessageTrace(bool messageTrace) {
+  impl->setMessageTrace(messageTrace);
+}
+bool DefaultMQPushConsumer::getMessageTrace() const {
+  return impl->getMessageTrace();
+}
 }  // namespace rocketmq
diff --git a/src/consumer/DefaultMQPushConsumerImpl.cpp b/src/consumer/DefaultMQPushConsumerImpl.cpp
index 5fe6f9c..c503804 100644
--- a/src/consumer/DefaultMQPushConsumerImpl.cpp
+++ b/src/consumer/DefaultMQPushConsumerImpl.cpp
@@ -17,6 +17,7 @@
 
 #include "DefaultMQPushConsumerImpl.h"
 #include "CommunicationMode.h"
+#include "ConsumeMessageHookImpl.h"
 #include "ConsumeMsgService.h"
 #include "ConsumerRunningInfo.h"
 #include "FilterAPI.h"
@@ -190,7 +191,6 @@ class AsyncPullCallback : public PullCallback {
   bool m_bShutdown;
 };
 
-//<!***************************************************************************
 static boost::mutex m_asyncCallbackLock;
 
 DefaultMQPushConsumerImpl::DefaultMQPushConsumerImpl() {}
@@ -316,6 +316,7 @@ void DefaultMQPushConsumerImpl::start() {
     case CREATE_JUST: {
       m_serviceState = START_FAILED;
       DefaultMQClient::start();
+      dealWithMessageTrace();
       LOG_INFO("DefaultMQPushConsumerImpl:%s start", m_GroupName.c_str());
 
       //<!data;
@@ -400,6 +401,7 @@ void DefaultMQPushConsumerImpl::shutdown() {
   switch (m_serviceState) {
     case RUNNING: {
       LOG_INFO("DefaultMQPushConsumerImpl shutdown");
+      shutdownMessageTraceInnerProducer();
       m_async_ioService.stop();
       m_async_service_thread->interrupt();
       m_async_service_thread->join();
@@ -1080,5 +1082,86 @@ void DefaultMQPushConsumerImpl::logConfigs() {
   LOG_WARN("AsyncPullMode:%s", m_asyncPull ? "true" : "false");
   LOG_WARN("AsyncPullTimeout:%d ms", m_asyncPullTimeout);
 }
-//<!************************************************************************
+// we should create trace message poll before producer send messages.
+bool DefaultMQPushConsumerImpl::dealWithMessageTrace() {
+  if (!getMessageTrace()) {
+    LOG_INFO("Message Trace set to false, Will not send trace messages.");
+    return false;
+  }
+  // Try to create default producer inner.
+  LOG_INFO("DefaultMQPushConsumer Open message trace..");
+
+  createMessageTraceInnerProducer();
+  std::shared_ptr<ConsumeMessageHook> hook(new ConsumeMessageHookImpl());
+  registerConsumeMessageHook(hook);
+  return true;
+}
+
+void DefaultMQPushConsumerImpl::createMessageTraceInnerProducer() {
+  m_DefaultMQProducerImpl = std::make_shared<DefaultMQProducerImpl>(getGroupName());
+  m_DefaultMQProducerImpl->setMessageTrace(false);
+  m_DefaultMQProducerImpl->setInstanceName(getInstanceName());
+  const SessionCredentials& session = getSessionCredentials();
+  m_DefaultMQProducerImpl->setSessionCredentials(session.getAccessKey(), session.getSecretKey(),
+                                                 session.getAuthChannel());
+  if (!getNamesrvAddr().empty()) {
+    m_DefaultMQProducerImpl->setNamesrvAddr(getNamesrvAddr());
+  }
+  m_DefaultMQProducerImpl->setNameSpace(getNameSpace());
+  // m_DefaultMQProducerImpl->setNamesrvDomain(getNamesrvDomain());
+  m_DefaultMQProducerImpl->start(false);
+}
+void DefaultMQPushConsumerImpl::shutdownMessageTraceInnerProducer() {
+  LOG_INFO("Shutdown Message Trace Inner Producer In Consumer.");
+  m_DefaultMQProducerImpl->shutdown(false);
+}
+bool DefaultMQPushConsumerImpl::hasConsumeMessageHook() {
+  return !m_consumeMessageHookList.empty();
+}
+
+void DefaultMQPushConsumerImpl::registerConsumeMessageHook(std::shared_ptr<ConsumeMessageHook>& hook) {
+  m_consumeMessageHookList.push_back(hook);
+  LOG_INFO("Register ConsumeMessageHook success,hookname is %s", hook->getHookName().c_str());
+}
+
+void DefaultMQPushConsumerImpl::executeConsumeMessageHookBefore(ConsumeMessageContext* context) {
+  if (!m_consumeMessageHookList.empty()) {
+    std::vector<std::shared_ptr<ConsumeMessageHook>>::iterator it = m_consumeMessageHookList.begin();
+    for (; it != m_consumeMessageHookList.end(); ++it) {
+      try {
+        (*it)->executeHookBefore(context);
+      } catch (exception e) {
+      }
+    }
+  }
+}
+
+void DefaultMQPushConsumerImpl::executeConsumeMessageHookAfter(ConsumeMessageContext* context) {
+  if (!m_consumeMessageHookList.empty()) {
+    std::vector<std::shared_ptr<ConsumeMessageHook>>::iterator it = m_consumeMessageHookList.begin();
+    for (; it != m_consumeMessageHookList.end(); ++it) {
+      try {
+        (*it)->executeHookAfter(context);
+      } catch (exception e) {
+      }
+    }
+  }
+}
+
+void DefaultMQPushConsumerImpl::submitSendTraceRequest(MQMessage& msg, SendCallback* pSendCallback) {
+  if (getMessageTrace()) {
+    try {
+      LOG_DEBUG("=====Send Trace Messages,Topic[%s],Key[%s],Body[%s]", msg.getTopic().c_str(), msg.getKeys().c_str(),
+                msg.getBody().c_str());
+      // m_DefaultMQProducerImpl->submitSendTraceRequest(msg, pSendCallback);
+      m_DefaultMQProducerImpl->send(msg, pSendCallback, false);
+    } catch (exception e) {
+      LOG_INFO(e.what());
+    }
+  }
+}
+
+void DefaultMQPushConsumerImpl::setDefaultMqProducerImpl(DefaultMQProducerImpl* DefaultMqProducerImpl) {
+  m_DefaultMQProducerImpl.reset(DefaultMqProducerImpl);
+}
 }  // namespace rocketmq
diff --git a/src/consumer/DefaultMQPushConsumerImpl.h b/src/consumer/DefaultMQPushConsumerImpl.h
index f4e4319..34fb307 100644
--- a/src/consumer/DefaultMQPushConsumerImpl.h
+++ b/src/consumer/DefaultMQPushConsumerImpl.h
@@ -26,6 +26,9 @@
 #include <boost/thread/thread.hpp>
 #include <string>
 #include "AsyncCallback.h"
+#include "ConsumeMessageContext.h"
+#include "ConsumeMessageHook.h"
+#include "DefaultMQProducerImpl.h"
 #include "MQConsumer.h"
 #include "MQMessageListener.h"
 #include "MQMessageQueue.h"
@@ -128,6 +131,13 @@ class DefaultMQPushConsumerImpl : public MQConsumer {
   */
   void setMaxCacheMsgSizePerQueue(int maxCacheSize);
   int getMaxCacheMsgSizePerQueue() const;
+  void submitSendTraceRequest(MQMessage& msg, SendCallback* pSendCallback);
+  bool hasConsumeMessageHook();
+
+  void registerConsumeMessageHook(std::shared_ptr<ConsumeMessageHook>& hook);
+  void setDefaultMqProducerImpl(DefaultMQProducerImpl* DefaultMqProducerImpl);
+  void executeConsumeMessageHookBefore(ConsumeMessageContext* context);
+  void executeConsumeMessageHookAfter(ConsumeMessageContext* context);
 
  private:
   void checkConfig();
@@ -136,6 +146,10 @@ class DefaultMQPushConsumerImpl : public MQConsumer {
   bool dealWithNameSpace();
   void logConfigs();
 
+  bool dealWithMessageTrace();
+  void createMessageTraceInnerProducer();
+  void shutdownMessageTraceInnerProducer();
+
  private:
   uint64_t m_startTime;
   ConsumeFromWhere m_consumeFromWhere;
@@ -161,6 +175,10 @@ class DefaultMQPushConsumerImpl : public MQConsumer {
  private:
   TaskQueue* m_pullmsgQueue;
   std::unique_ptr<boost::thread> m_pullmsgThread;
+
+  // used for trace
+  std::vector<std::shared_ptr<ConsumeMessageHook> > m_consumeMessageHookList;
+  std::shared_ptr<DefaultMQProducerImpl> m_DefaultMQProducerImpl;
 };
 //<!***************************************************************************
 }  // namespace rocketmq
diff --git a/src/include/DefaultMQClient.h b/src/include/DefaultMQClient.h
index a2e5ce5..32b6aed 100644
--- a/src/include/DefaultMQClient.h
+++ b/src/include/DefaultMQClient.h
@@ -170,6 +170,10 @@ class DefaultMQClient {
 
   virtual void setFactory(MQClientFactory*);
 
+  bool getMessageTrace() const;
+
+  void setMessageTrace(bool mMessageTrace);
+
  protected:
   virtual void start();
   virtual void shutdown();
@@ -191,6 +195,7 @@ class DefaultMQClient {
 
   std::string m_unitName;
   SessionCredentials m_SessionCredentials;
+  bool m_messageTrace;
 };
 //<!***************************************************************************
 }  // namespace rocketmq
diff --git a/src/producer/DefaultMQProducer.cpp b/src/producer/DefaultMQProducer.cpp
index 1977299..34c8ac1 100644
--- a/src/producer/DefaultMQProducer.cpp
+++ b/src/producer/DefaultMQProducer.cpp
@@ -180,7 +180,12 @@ void DefaultMQProducer::setUnitName(std::string unitName) {
 const std::string& DefaultMQProducer::getUnitName() const {
   return impl->getUnitName();
 }
-
+void DefaultMQProducer::setMessageTrace(bool messageTrace) {
+  impl->setMessageTrace(messageTrace);
+}
+bool DefaultMQProducer::getMessageTrace() const {
+  return impl->getMessageTrace();
+}
 SendResult DefaultMQProducer::send(MQMessage& msg, bool bSelectActiveBroker) {
   return impl->send(msg, bSelectActiveBroker);
 }
diff --git a/src/producer/DefaultMQProducerImpl.cpp b/src/producer/DefaultMQProducerImpl.cpp
index 037f81a..bd2556b 100644
--- a/src/producer/DefaultMQProducerImpl.cpp
+++ b/src/producer/DefaultMQProducerImpl.cpp
@@ -24,21 +24,19 @@
 #include "CommandHeader.h"
 #include "CommunicationMode.h"
 #include "Logging.h"
-#include "MQClientAPIImpl.h"
 #include "MQClientException.h"
 #include "MQClientFactory.h"
-#include "MQClientManager.h"
 #include "MQDecoder.h"
-#include "MQProtos.h"
 #include "MessageAccessor.h"
 #include "NameSpaceUtil.h"
+#include "SendMessageHookImpl.h"
 #include "StringIdMaker.h"
 #include "TopicPublishInfo.h"
+#include "TraceContant.h"
 #include "Validators.h"
 
 namespace rocketmq {
 
-//<!************************************************************************
 DefaultMQProducerImpl::DefaultMQProducerImpl(const string& groupname)
     : m_sendMsgTimeout(3000),
       m_compressMsgBodyOverHowmuch(4 * 1024),
@@ -46,7 +44,8 @@ DefaultMQProducerImpl::DefaultMQProducerImpl(const string& groupname)
       // m_retryAnotherBrokerWhenNotStoreOK(false),
       m_compressLevel(5),
       m_retryTimes(5),
-      m_retryTimes4Async(1) {
+      m_retryTimes4Async(1),
+      m_trace_ioService_work(m_trace_ioService) {
   //<!set default group name;
   string gname = groupname.empty() ? DEFAULT_PRODUCER_GROUP : groupname;
   setGroupName(gname);
@@ -55,6 +54,9 @@ DefaultMQProducerImpl::DefaultMQProducerImpl(const string& groupname)
 DefaultMQProducerImpl::~DefaultMQProducerImpl() {}
 
 void DefaultMQProducerImpl::start() {
+  start(true);
+}
+void DefaultMQProducerImpl::start(bool factoryStart) {
 #ifndef WIN32
   /* Ignore the SIGPIPE */
   struct sigaction sa;
@@ -70,6 +72,7 @@ void DefaultMQProducerImpl::start() {
   switch (m_serviceState) {
     case CREATE_JUST: {
       m_serviceState = START_FAILED;
+      dealWithMessageTrace();
       DefaultMQClient::start();
       LOG_INFO("DefaultMQProducerImpl:%s start", m_GroupName.c_str());
 
@@ -80,9 +83,10 @@ void DefaultMQProducerImpl::start() {
             MQClientException,
             "The producer group[" + getGroupName() + "] has been created before, specify another name please.", -1);
       }
-
-      getFactory()->start();
-      getFactory()->sendHeartbeatToAllBroker();
+      if (factoryStart) {
+        getFactory()->start();
+        getFactory()->sendHeartbeatToAllBroker();
+      }
       m_serviceState = RUNNING;
       break;
     }
@@ -96,11 +100,21 @@ void DefaultMQProducerImpl::start() {
 }
 
 void DefaultMQProducerImpl::shutdown() {
+  shutdown(true);
+}
+void DefaultMQProducerImpl::shutdown(bool factoryStart) {
   switch (m_serviceState) {
     case RUNNING: {
       LOG_INFO("DefaultMQProducerImpl shutdown");
+      if (getMessageTrace()) {
+        LOG_INFO("DefaultMQProducerImpl message trace thread pool shutdown.");
+        m_trace_ioService.stop();
+        m_trace_threadpool.join_all();
+      }
       getFactory()->unregisterProducer(this);
-      getFactory()->shutdown();
+      if (factoryStart) {
+        getFactory()->shutdown();
+      }
       m_serviceState = SHUTDOWN_ALREADY;
       break;
     }
@@ -432,6 +446,7 @@ SendResult DefaultMQProducerImpl::sendKernelImpl(MQMessage& msg,
   }
 
   if (!brokerAddr.empty()) {
+    boost::scoped_ptr<SendMessageContext> pSendMesgContext(new SendMessageContext());
     try {
       bool isBatchMsg = std::type_index(typeid(msg)) == std::type_index(typeid(BatchMessage));
       // msgId is produced by client, offsetMsgId produced by broker. (same with java sdk)
@@ -444,7 +459,28 @@ SendResult DefaultMQProducerImpl::sendKernelImpl(MQMessage& msg,
       }
 
       LOG_DEBUG("produce before:%s to %s", msg.toString().c_str(), mq.toString().c_str());
+      if (!isMessageTraceTopic(msg.getTopic()) && getMessageTrace() && hasSendMessageHook()) {
+        pSendMesgContext.reset(new SendMessageContext);
+        pSendMesgContext->setDefaultMqProducer(this);
+        pSendMesgContext->setProducerGroup(NameSpaceUtil::withoutNameSpace(getGroupName(), getNameSpace()));
+        pSendMesgContext->setCommunicationMode(static_cast<CommunicationMode>(communicationMode));
+        pSendMesgContext->setBornHost(UtilAll::getLocalAddress());
+        pSendMesgContext->setBrokerAddr(brokerAddr);
+        pSendMesgContext->setMessage(msg);
+        pSendMesgContext->setMessageQueue(mq);
+        pSendMesgContext->setMsgType(TRACE_NORMAL_MSG);
+        pSendMesgContext->setNameSpace(getNameSpace());
+        string tranMsg = msg.getProperty(MQMessage::PROPERTY_TRANSACTION_PREPARED);
+        if (!tranMsg.empty() && tranMsg == "true") {
+          pSendMesgContext->setMsgType(TRACE_TRANS_HALF_MSG);
+        }
 
+        if (msg.getProperty("__STARTDELIVERTIME") != "" ||
+            msg.getProperty(MQMessage::PROPERTY_DELAY_TIME_LEVEL) != "") {
+          pSendMesgContext->setMsgType(TRACE_DELAY_MSG);
+        }
+        executeSendMessageHookBefore(pSendMesgContext.get());
+      }
       SendMessageRequestHeader* requestHeader = new SendMessageRequestHeader();
       requestHeader->producerGroup = getGroupName();
       requestHeader->topic = (msg.getTopic());
@@ -458,9 +494,15 @@ SendResult DefaultMQProducerImpl::sendKernelImpl(MQMessage& msg,
       requestHeader->batch = isBatchMsg;
       requestHeader->properties = (MQDecoder::messageProperties2String(msg.getProperties()));
 
-      return getFactory()->getMQClientAPIImpl()->sendMessage(brokerAddr, mq.getBrokerName(), msg, requestHeader,
-                                                             getSendMsgTimeout(), getRetryTimes4Async(),
-                                                             communicationMode, sendCallback, getSessionCredentials());
+      SendResult sendResult = getFactory()->getMQClientAPIImpl()->sendMessage(
+          brokerAddr, mq.getBrokerName(), msg, requestHeader, getSendMsgTimeout(), getRetryTimes4Async(),
+          communicationMode, sendCallback, getSessionCredentials());
+      if (!isMessageTraceTopic(msg.getTopic()) && getMessageTrace() && hasSendMessageHook() && sendCallback == NULL &&
+          communicationMode == ComMode_SYNC) {
+        pSendMesgContext->setSendResult(sendResult);
+        executeSendMessageHookAfter(pSendMesgContext.get());
+      }
+      return sendResult;
     } catch (MQException& e) {
       throw e;
     }
@@ -636,6 +678,7 @@ bool DefaultMQProducerImpl::dealWithNameSpace() {
   }
   return true;
 }
+
 void DefaultMQProducerImpl::logConfigs() {
   showClientConfigs();
 
@@ -646,5 +689,71 @@ void DefaultMQProducerImpl::logConfigs() {
   LOG_WARN("RetryTimes:%d", m_retryTimes);
   LOG_WARN("RetryTimes4Async:%d", m_retryTimes4Async);
 }
-//<!***************************************************************************
+
+// we should create trace message poll before producer send messages.
+bool DefaultMQProducerImpl::dealWithMessageTrace() {
+  if (!getMessageTrace()) {
+    LOG_INFO("Message Trace set to false, Will not send trace messages.");
+    return false;
+  }
+  size_t threadpool_size = boost::thread::hardware_concurrency();
+  LOG_INFO("Create send message trace threadpool: %d", threadpool_size);
+  for (size_t i = 0; i < threadpool_size; ++i) {
+    m_trace_threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &m_trace_ioService));
+  }
+  LOG_INFO("DefaultMQProducer Open meassage trace..");
+  std::shared_ptr<SendMessageHook> hook(new SendMessageHookImpl);
+  registerSendMessageHook(hook);
+  return true;
+}
+bool DefaultMQProducerImpl::isMessageTraceTopic(string source) {
+  return source.find(TraceContant::TRACE_TOPIC) != string::npos;
+}
+bool DefaultMQProducerImpl::hasSendMessageHook() {
+  return !m_sendMessageHookList.empty();
+}
+
+void DefaultMQProducerImpl::registerSendMessageHook(std::shared_ptr<SendMessageHook>& hook) {
+  m_sendMessageHookList.push_back(hook);
+  LOG_INFO("Register sendMessageHook success,hookname is %s", hook->getHookName().c_str());
+}
+
+void DefaultMQProducerImpl::executeSendMessageHookBefore(SendMessageContext* context) {
+  if (!m_sendMessageHookList.empty()) {
+    std::vector<std::shared_ptr<SendMessageHook> >::iterator it = m_sendMessageHookList.begin();
+    for (; it != m_sendMessageHookList.end(); ++it) {
+      try {
+        (*it)->executeHookBefore(context);
+      } catch (exception e) {
+      }
+    }
+  }
+}
+
+void DefaultMQProducerImpl::executeSendMessageHookAfter(SendMessageContext* context) {
+  if (!m_sendMessageHookList.empty()) {
+    std::vector<std::shared_ptr<SendMessageHook> >::iterator it = m_sendMessageHookList.begin();
+    for (; it != m_sendMessageHookList.end(); ++it) {
+      try {
+        (*it)->executeHookAfter(context);
+      } catch (exception e) {
+      }
+    }
+  }
+}
+
+void DefaultMQProducerImpl::submitSendTraceRequest(const MQMessage& msg, SendCallback* pSendCallback) {
+  m_trace_ioService.post(boost::bind(&DefaultMQProducerImpl::sendTraceMessage, this, msg, pSendCallback));
+}
+
+void DefaultMQProducerImpl::sendTraceMessage(MQMessage& msg, SendCallback* pSendCallback) {
+  try {
+    LOG_DEBUG("=====Send Trace Messages,Topic[%s],Key[%s],Body[%s]", msg.getTopic().c_str(), msg.getKeys().c_str(),
+              msg.getBody().c_str());
+    send(msg, pSendCallback, true);
+  } catch (MQException e) {
+    LOG_ERROR(e.what());
+    // throw e;
+  }
+}
 }  // namespace rocketmq
diff --git a/src/producer/DefaultMQProducerImpl.h b/src/producer/DefaultMQProducerImpl.h
index 65fe6af..637c193 100644
--- a/src/producer/DefaultMQProducerImpl.h
+++ b/src/producer/DefaultMQProducerImpl.h
@@ -22,6 +22,8 @@
 #include "MQMessageQueue.h"
 #include "MQProducer.h"
 #include "RocketMQClient.h"
+#include "SendMessageContext.h"
+#include "SendMessageHook.h"
 #include "SendResult.h"
 
 namespace rocketmq {
@@ -34,6 +36,8 @@ class DefaultMQProducerImpl : public MQProducer {
   //<!begin mqadmin;
   virtual void start();
   virtual void shutdown();
+  virtual void start(bool factoryStart);
+  virtual void shutdown(bool factoryStart);
   //<!end mqadmin;
 
   //<! begin MQProducer;
@@ -78,6 +82,7 @@ class DefaultMQProducerImpl : public MQProducer {
 
   int getRetryTimes4Async() const;
   void setRetryTimes4Async(int times);
+  void submitSendTraceRequest(const MQMessage& msg, SendCallback* pSendCallback);
 
  protected:
   SendResult sendAutoRetrySelectImpl(MQMessage& msg,
@@ -104,6 +109,14 @@ class DefaultMQProducerImpl : public MQProducer {
   BatchMessage buildBatchMessage(std::vector<MQMessage>& msgs);
   bool dealWithNameSpace();
   void logConfigs();
+  bool dealWithMessageTrace();
+  bool isMessageTraceTopic(std::string topic);
+  bool hasSendMessageHook();
+  void registerSendMessageHook(std::shared_ptr<SendMessageHook>& hook);
+  void executeSendMessageHookBefore(SendMessageContext* context);
+  void executeSendMessageHookAfter(SendMessageContext* context);
+
+  void sendTraceMessage(MQMessage& msg, SendCallback* pSendCallback);
 
  private:
   int m_sendMsgTimeout;
@@ -113,6 +126,12 @@ class DefaultMQProducerImpl : public MQProducer {
   int m_compressLevel;
   int m_retryTimes;
   int m_retryTimes4Async;
+
+  // used for trace
+  std::vector<std::shared_ptr<SendMessageHook> > m_sendMessageHookList;
+  boost::asio::io_service m_trace_ioService;
+  boost::thread_group m_trace_threadpool;
+  boost::asio::io_service::work m_trace_ioService_work;
 };
 //<!***************************************************************************
 }  // namespace rocketmq
diff --git a/src/producer/SendMessageHookImpl.cpp b/src/producer/SendMessageHookImpl.cpp
new file mode 100644
index 0000000..f1f6a62
--- /dev/null
+++ b/src/producer/SendMessageHookImpl.cpp
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "SendMessageHookImpl.h"
+#include <memory>
+#include <string>
+#include "DefaultMQProducerImpl.h"
+#include "Logging.h"
+#include "MQClientException.h"
+#include "SendMessageContext.h"
+#include "TraceContant.h"
+#include "TraceTransferBean.h"
+#include "TraceUtil.h"
+#include "UtilAll.h"
+
+using namespace std;
+namespace rocketmq {
+
+class TraceMessageSendCallback : public SendCallback {
+  virtual void onSuccess(SendResult& sendResult) {
+    LOG_DEBUG("TraceMessageSendCallback, MsgId:[%s],OffsetMsgId[%s]", sendResult.getMsgId().c_str(),
+              sendResult.getOffsetMsgId().c_str());
+  }
+  virtual void onException(MQException& e) {}
+};
+static TraceMessageSendCallback* callback = new TraceMessageSendCallback();
+std::string SendMessageHookImpl::getHookName() {
+  return "RocketMQSendMessageHookImpl";
+}
+
+void SendMessageHookImpl::executeHookBefore(SendMessageContext* context) {
+  if (context != NULL) {
+    string topic = context->getMessage()->getTopic();
+    // Check if contains TraceConstants::TRACE_TOPIC
+    if (topic.find(TraceContant::TRACE_TOPIC) != string::npos) {
+      // trace message itself
+      return;
+    }
+    TraceContext* traceContext = new TraceContext();
+    context->setTraceContext(traceContext);
+  }
+  return;
+}
+
+void SendMessageHookImpl::executeHookAfter(SendMessageContext* context) {
+  if (context == NULL) {
+    return;
+  }
+  string topic = context->getMessage()->getTopic();
+  // Check if contains TraceConstants::TRACE_TOPIC
+  if (topic.find(TraceContant::TRACE_TOPIC) != string::npos) {
+    // trace message itself
+    return;
+  }
+  std::shared_ptr<TraceContext> traceContext;
+  traceContext.reset(context->getTraceContext());
+
+  // OnsTraceContext* onsContext = context->getMqTraceContext();
+  traceContext->setTraceType(Pub);
+  traceContext->setGroupName(context->getProducerGroup());
+  // boost::scoped_ptr<OnsTraceBean> traceBean(new OnsTraceBean());
+  TraceBean traceBean;
+  traceBean.setTopic(context->getMessage()->getTopic());
+  traceBean.setTags(context->getMessage()->getTags());
+  traceBean.setKeys(context->getMessage()->getKeys());
+  traceBean.setStoreHost(context->getBrokerAddr());
+  traceBean.setBodyLength(context->getMessage()->getBody().size());
+  traceBean.setMsgType(context->getMsgType());
+
+  int costTime = static_cast<int>(UtilAll::currentTimeMillis() - traceContext->getTimeStamp());
+  traceContext->setCostTime(costTime);
+  if (context->getSendResult()->getSendStatus() == SEND_OK) {
+    traceContext->setStatus(true);
+  } else {
+    traceContext->setStatus(false);
+  }
+
+  traceContext->setRegionId(context->getSendResult()->getRegionId());
+  traceBean.setMsgId(context->getMessage()->getProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
+  traceBean.setOffsetMsgId(context->getSendResult()->getOffsetMsgId());
+  traceBean.setStoreTime(traceContext->getTimeStamp() + (costTime / 2));
+
+  traceContext->setTraceBean(traceBean);
+
+  topic = TraceContant::TRACE_TOPIC + traceContext->getRegionId();
+  TraceTransferBean ben = TraceUtil::CovertTraceContextToTransferBean(traceContext.get());
+  // encode data
+  MQMessage message(topic, ben.getTransData());
+  message.setKeys(ben.getTransKey());
+  // send trace message.
+  context->getDefaultMqProducer()->submitSendTraceRequest(message, callback);
+  return;
+}
+}  // namespace rocketmq
\ No newline at end of file
diff --git a/src/include/SendMessageHook.h b/src/producer/SendMessageHookImpl.h
similarity index 51%
copy from src/include/SendMessageHook.h
copy to src/producer/SendMessageHookImpl.h
index 0d3e1e3..54a2ebc 100644
--- a/src/include/SendMessageHook.h
+++ b/src/producer/SendMessageHookImpl.h
@@ -14,36 +14,19 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef __SENDMESSAGEHOOK_H__
-#define __SENDMESSAGEHOOK_H__
-
-#include "MQClientException.h"
-#include "MQMessage.h"
-#include "MQMessageQueue.h"
-#include "RocketMQClient.h"
-#include "SendResult.h"
+#ifndef __ROCKETMQ_SEND_MESSAGE_RPC_HOOK_IMPL_H__
+#define __ROCKETMQ_SEND_MESSAGE_RPC_HOOK_IMPL_H__
 
+#include <string>
+#include "SendMessageContext.h"
+#include "SendMessageHook.h"
 namespace rocketmq {
-//<!***************************************************************************
-class SendMessageContext {
- public:
-  std::string producerGroup;
-  MQMessage msg;
-  MQMessageQueue mq;
-  std::string brokerAddr;
-  int communicationMode;
-  SendResult sendResult;
-  MQException* pException;
-  void* pArg;
-};
-
-class SendMessageHook {
+class SendMessageHookImpl : public SendMessageHook {
  public:
-  virtual ~SendMessageHook() {}
-  virtual std::string hookName() = 0;
-  virtual void sendMessageBefore(const SendMessageContext& context) = 0;
-  virtual void sendMessageAfter(const SendMessageContext& context) = 0;
+  virtual ~SendMessageHookImpl() {}
+  virtual std::string getHookName();
+  virtual void executeHookBefore(SendMessageContext* context);
+  virtual void executeHookAfter(SendMessageContext* context);
 };
-//<!***************************************************************************
 }  // namespace rocketmq
-#endif
+#endif
\ No newline at end of file
diff --git a/src/trace/ConsumeMessageContext.cpp b/src/trace/ConsumeMessageContext.cpp
new file mode 100644
index 0000000..149708b
--- /dev/null
+++ b/src/trace/ConsumeMessageContext.cpp
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ConsumeMessageContext.h"
+#include <string>
+#include <vector>
+#include "DefaultMQPushConsumerImpl.h"
+
+namespace rocketmq {
+ConsumeMessageContext::ConsumeMessageContext() {
+  m_defaultMQPushConsumer = NULL;
+  // m_traceContext = NULL;
+}
+ConsumeMessageContext::~ConsumeMessageContext() {
+  m_traceContext.reset();
+}
+std::string ConsumeMessageContext::getConsumerGroup() {
+  return m_consumerGroup;
+}
+
+void ConsumeMessageContext::setConsumerGroup(const std::string& mConsumerGroup) {
+  m_consumerGroup = mConsumerGroup;
+}
+
+bool ConsumeMessageContext::getSuccess() {
+  return m_success;
+}
+
+void ConsumeMessageContext::setSuccess(bool mSuccess) {
+  m_success = mSuccess;
+}
+
+std::vector<MQMessageExt> ConsumeMessageContext::getMsgList() {
+  return m_msgList;
+}
+
+void ConsumeMessageContext::setMsgList(std::vector<MQMessageExt> mMsgList) {
+  m_msgList = mMsgList;
+}
+
+std::string ConsumeMessageContext::getStatus() {
+  return m_status;
+}
+
+void ConsumeMessageContext::setStatus(const std::string& mStatus) {
+  m_status = mStatus;
+}
+
+int ConsumeMessageContext::getMsgIndex() {
+  return m_msgIndex;
+}
+
+void ConsumeMessageContext::setMsgIndex(int mMsgIndex) {
+  m_msgIndex = mMsgIndex;
+}
+
+MQMessageQueue ConsumeMessageContext::getMessageQueue() {
+  return m_messageQueue;
+}
+
+void ConsumeMessageContext::setMessageQueue(const MQMessageQueue& mMessageQueue) {
+  m_messageQueue = mMessageQueue;
+}
+
+DefaultMQPushConsumerImpl* ConsumeMessageContext::getDefaultMQPushConsumer() {
+  return m_defaultMQPushConsumer;
+}
+
+void ConsumeMessageContext::setDefaultMQPushConsumer(DefaultMQPushConsumerImpl* mDefaultMqPushConsumer) {
+  m_defaultMQPushConsumer = mDefaultMqPushConsumer;
+}
+
+std::shared_ptr<TraceContext> ConsumeMessageContext::getTraceContext() {
+  return m_traceContext;
+}
+
+void ConsumeMessageContext::setTraceContext(TraceContext* mTraceContext) {
+  m_traceContext.reset(mTraceContext);
+}
+
+std::string ConsumeMessageContext::getNameSpace() {
+  return m_nameSpace;
+}
+
+void ConsumeMessageContext::setNameSpace(const std::string& mNameSpace) {
+  m_nameSpace = mNameSpace;
+}
+}  // namespace rocketmq
diff --git a/src/trace/ConsumeMessageContext.h b/src/trace/ConsumeMessageContext.h
new file mode 100644
index 0000000..67024a6
--- /dev/null
+++ b/src/trace/ConsumeMessageContext.h
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __ROCKETMQ_CONSUME_MESSAGE_CONTEXT_H__
+#define __ROCKETMQ_CONSUME_MESSAGE_CONTEXT_H__
+
+#include <memory>
+#include <string>
+#include <vector>
+#include "MQMessageExt.h"
+#include "MQMessageQueue.h"
+#include "TraceBean.h"
+#include "TraceContant.h"
+#include "TraceContext.h"
+
+namespace rocketmq {
+class DefaultMQPushConsumerImpl;
+class ConsumeMessageContext {
+ public:
+  ConsumeMessageContext();
+
+  virtual ~ConsumeMessageContext();
+
+  std::string getConsumerGroup();
+
+  void setConsumerGroup(const std::string& mConsumerGroup);
+
+  bool getSuccess();
+
+  void setSuccess(bool mSuccess);
+
+  std::vector<MQMessageExt> getMsgList();
+
+  void setMsgList(std::vector<MQMessageExt> mMsgList);
+
+  std::string getStatus();
+
+  void setStatus(const std::string& mStatus);
+
+  int getMsgIndex();
+
+  void setMsgIndex(int mMsgIndex);
+
+  MQMessageQueue getMessageQueue();
+
+  void setMessageQueue(const MQMessageQueue& mMessageQueue);
+
+  DefaultMQPushConsumerImpl* getDefaultMQPushConsumer();
+
+  void setDefaultMQPushConsumer(DefaultMQPushConsumerImpl* mDefaultMqPushConsumer);
+
+  std::shared_ptr<TraceContext> getTraceContext();
+
+  void setTraceContext(TraceContext* mTraceContext);
+
+  std::string getNameSpace();
+
+  void setNameSpace(const std::string& mNameSpace);
+
+ private:
+  std::string m_consumerGroup;
+  bool m_success;
+  std::vector<MQMessageExt> m_msgList;
+  std::string m_status;
+  int m_msgIndex;
+  MQMessageQueue m_messageQueue;
+  DefaultMQPushConsumerImpl* m_defaultMQPushConsumer;
+  // TraceContext* m_traceContext;
+  std::shared_ptr<TraceContext> m_traceContext;
+  std::string m_nameSpace;
+};
+}  // namespace rocketmq
+#endif
\ No newline at end of file
diff --git a/src/include/SendMessageHook.h b/src/trace/ConsumeMessageHook.h
similarity index 51%
copy from src/include/SendMessageHook.h
copy to src/trace/ConsumeMessageHook.h
index 0d3e1e3..d6317fb 100644
--- a/src/include/SendMessageHook.h
+++ b/src/trace/ConsumeMessageHook.h
@@ -14,36 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef __SENDMESSAGEHOOK_H__
-#define __SENDMESSAGEHOOK_H__
-
-#include "MQClientException.h"
-#include "MQMessage.h"
-#include "MQMessageQueue.h"
-#include "RocketMQClient.h"
-#include "SendResult.h"
+#ifndef __ROCKETMQ_COUNSUME_MESSAGE_RPC_HOOK_H__
+#define __ROCKETMQ_COUNSUME_MESSAGE_RPC_HOOK_H__
 
+#include <string>
+#include "ConsumeMessageContext.h"
 namespace rocketmq {
-//<!***************************************************************************
-class SendMessageContext {
- public:
-  std::string producerGroup;
-  MQMessage msg;
-  MQMessageQueue mq;
-  std::string brokerAddr;
-  int communicationMode;
-  SendResult sendResult;
-  MQException* pException;
-  void* pArg;
-};
-
-class SendMessageHook {
+class ConsumeMessageHook {
  public:
-  virtual ~SendMessageHook() {}
-  virtual std::string hookName() = 0;
-  virtual void sendMessageBefore(const SendMessageContext& context) = 0;
-  virtual void sendMessageAfter(const SendMessageContext& context) = 0;
+  virtual ~ConsumeMessageHook() {}
+  virtual std::string getHookName() = 0;
+  virtual void executeHookBefore(ConsumeMessageContext* context) = 0;
+  virtual void executeHookAfter(ConsumeMessageContext* context) = 0;
 };
-//<!***************************************************************************
 }  // namespace rocketmq
-#endif
+#endif
\ No newline at end of file
diff --git a/src/trace/SendMessageContext.cpp b/src/trace/SendMessageContext.cpp
new file mode 100644
index 0000000..9cf0766
--- /dev/null
+++ b/src/trace/SendMessageContext.cpp
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "SendMessageContext.h"
+#include <string>
+
+namespace rocketmq {
+SendMessageContext::SendMessageContext() {}
+
+SendMessageContext::~SendMessageContext() {
+  m_defaultMQProducer = NULL;
+}
+
+std::string SendMessageContext::getProducerGroup() {
+  return m_producerGroup;
+}
+
+void SendMessageContext::setProducerGroup(const std::string& mProducerGroup) {
+  m_producerGroup = mProducerGroup;
+}
+
+MQMessage* SendMessageContext::getMessage() {
+  return &m_message;
+}
+
+void SendMessageContext::setMessage(const MQMessage& mMessage) {
+  m_message = mMessage;
+}
+
+TraceMessageType SendMessageContext::getMsgType() {
+  return m_msgType;
+}
+
+void SendMessageContext::setMsgType(TraceMessageType mMsgType) {
+  m_msgType = mMsgType;
+}
+
+MQMessageQueue* SendMessageContext::getMessageQueue() {
+  return &m_messageQueue;
+}
+
+void SendMessageContext::setMessageQueue(const MQMessageQueue& mMq) {
+  m_messageQueue = mMq;
+}
+
+std::string SendMessageContext::getBrokerAddr() {
+  return m_brokerAddr;
+}
+
+void SendMessageContext::setBrokerAddr(const std::string& mBrokerAddr) {
+  m_brokerAddr = mBrokerAddr;
+}
+
+std::string SendMessageContext::getBornHost() {
+  return m_bornHost;
+}
+
+void SendMessageContext::setBornHost(const std::string& mBornHost) {
+  m_bornHost = mBornHost;
+}
+
+CommunicationMode SendMessageContext::getCommunicationMode() {
+  return m_communicationMode;
+}
+
+void SendMessageContext::setCommunicationMode(CommunicationMode mCommunicationMode) {
+  m_communicationMode = mCommunicationMode;
+}
+
+DefaultMQProducerImpl* SendMessageContext::getDefaultMqProducer() {
+  return m_defaultMQProducer;
+}
+
+void SendMessageContext::setDefaultMqProducer(DefaultMQProducerImpl* mDefaultMqProducer) {
+  m_defaultMQProducer = mDefaultMqProducer;
+}
+
+SendResult* SendMessageContext::getSendResult() {
+  return &m_sendResult;
+}
+
+void SendMessageContext::setSendResult(const SendResult& mSendResult) {
+  m_sendResult = mSendResult;
+}
+
+TraceContext* SendMessageContext::getTraceContext() {
+  return m_traceContext;
+}
+
+void SendMessageContext::setTraceContext(TraceContext* mTraceContext) {
+  m_traceContext = mTraceContext;
+}
+
+std::string SendMessageContext::getNameSpace() {
+  return m_nameSpace;
+}
+
+void SendMessageContext::setNameSpace(const std::string& mNameSpace) {
+  m_nameSpace = mNameSpace;
+}
+}  // namespace rocketmq
diff --git a/src/trace/SendMessageContext.h b/src/trace/SendMessageContext.h
new file mode 100644
index 0000000..9cd985a
--- /dev/null
+++ b/src/trace/SendMessageContext.h
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __ROCKETMQ_SEND_MESSAGE_CONTEXT_H__
+#define __ROCKETMQ_SEND_MESSAGE_CONTEXT_H__
+
+#include <string>
+#include "CommunicationMode.h"
+#include "MQMessage.h"
+#include "MQMessageQueue.h"
+#include "SendResult.h"
+#include "TraceBean.h"
+#include "TraceContant.h"
+#include "TraceContext.h"
+
+namespace rocketmq {
+class DefaultMQProducerImpl;
+class SendMessageContext {
+ public:
+  SendMessageContext();
+
+  virtual ~SendMessageContext();
+
+  std::string getProducerGroup();
+
+  void setProducerGroup(const std::string& mProducerGroup);
+
+  MQMessage* getMessage();
+
+  void setMessage(const MQMessage& mMessage);
+
+  TraceMessageType getMsgType();
+
+  void setMsgType(TraceMessageType mMsgType);
+
+  MQMessageQueue* getMessageQueue();
+
+  void setMessageQueue(const MQMessageQueue& mMq);
+
+  std::string getBrokerAddr();
+
+  void setBrokerAddr(const std::string& mBrokerAddr);
+
+  std::string getBornHost();
+
+  void setBornHost(const std::string& mBornHost);
+
+  CommunicationMode getCommunicationMode();
+
+  void setCommunicationMode(CommunicationMode mCommunicationMode);
+
+  DefaultMQProducerImpl* getDefaultMqProducer();
+
+  void setDefaultMqProducer(DefaultMQProducerImpl* mDefaultMqProducer);
+
+  SendResult* getSendResult();
+
+  void setSendResult(const SendResult& mSendResult);
+
+  TraceContext* getTraceContext();
+
+  void setTraceContext(TraceContext* mTraceContext);
+
+  std::string getNameSpace();
+
+  void setNameSpace(const std::string& mNameSpace);
+
+ private:
+  std::string m_producerGroup;
+  MQMessage m_message;
+  TraceMessageType m_msgType;
+  MQMessageQueue m_messageQueue;
+  std::string m_brokerAddr;
+  std::string m_bornHost;
+  CommunicationMode m_communicationMode;
+  DefaultMQProducerImpl* m_defaultMQProducer;
+  SendResult m_sendResult;
+  TraceContext* m_traceContext;
+  std::string m_nameSpace;
+};
+
+}  // namespace rocketmq
+#endif
\ No newline at end of file
diff --git a/src/include/SendMessageHook.h b/src/trace/SendMessageHook.h
similarity index 54%
copy from src/include/SendMessageHook.h
copy to src/trace/SendMessageHook.h
index 0d3e1e3..70c3927 100644
--- a/src/include/SendMessageHook.h
+++ b/src/trace/SendMessageHook.h
@@ -14,36 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef __SENDMESSAGEHOOK_H__
-#define __SENDMESSAGEHOOK_H__
-
-#include "MQClientException.h"
-#include "MQMessage.h"
-#include "MQMessageQueue.h"
-#include "RocketMQClient.h"
-#include "SendResult.h"
+#ifndef __ROCKETMQ_SEND_MESSAGE_RPC_HOOK_H__
+#define __ROCKETMQ_SEND_MESSAGE_RPC_HOOK_H__
 
+#include <string>
+#include "SendMessageContext.h"
 namespace rocketmq {
-//<!***************************************************************************
-class SendMessageContext {
- public:
-  std::string producerGroup;
-  MQMessage msg;
-  MQMessageQueue mq;
-  std::string brokerAddr;
-  int communicationMode;
-  SendResult sendResult;
-  MQException* pException;
-  void* pArg;
-};
-
 class SendMessageHook {
  public:
   virtual ~SendMessageHook() {}
-  virtual std::string hookName() = 0;
-  virtual void sendMessageBefore(const SendMessageContext& context) = 0;
-  virtual void sendMessageAfter(const SendMessageContext& context) = 0;
+  virtual std::string getHookName() = 0;
+  virtual void executeHookBefore(SendMessageContext* context) = 0;
+  virtual void executeHookAfter(SendMessageContext* context) = 0;
 };
-//<!***************************************************************************
 }  // namespace rocketmq
-#endif
+#endif
\ No newline at end of file
diff --git a/src/trace/TraceBean.cpp b/src/trace/TraceBean.cpp
new file mode 100644
index 0000000..cf7275b
--- /dev/null
+++ b/src/trace/TraceBean.cpp
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TraceBean.h"
+#include <string>
+#include <vector>
+
+namespace rocketmq {
+TraceBean::TraceBean()
+    : m_topic("null"),
+      m_msgId("null"),
+      m_offsetMsgId("null"),
+      m_tags("null"),
+      m_keys("null"),
+      m_storeHost("null"),
+      m_clientHost("null") {}
+
+TraceBean::~TraceBean() {}
+
+const std::string& TraceBean::getTopic() const {
+  return m_topic;
+}
+
+void TraceBean::setTopic(const std::string& topic) {
+  m_topic = topic;
+}
+
+const std::string& TraceBean::getMsgId() const {
+  return m_msgId;
+}
+
+void TraceBean::setMsgId(const std::string& msgId) {
+  m_msgId = msgId;
+}
+
+const std::string& TraceBean::getOffsetMsgId() const {
+  return m_offsetMsgId;
+}
+
+void TraceBean::setOffsetMsgId(const std::string& offsetMsgId) {
+  m_offsetMsgId = offsetMsgId;
+}
+
+const std::string& TraceBean::getTags() const {
+  return m_tags;
+}
+
+void TraceBean::setTags(const std::string& tags) {
+  m_tags = tags;
+}
+
+const std::string& TraceBean::getKeys() const {
+  return m_keys;
+}
+
+void TraceBean::setKeys(const std::string& keys) {
+  m_keys = keys;
+}
+
+const std::string& TraceBean::getStoreHost() const {
+  return m_storeHost;
+}
+
+void TraceBean::setStoreHost(const std::string& storeHost) {
+  m_storeHost = storeHost;
+}
+
+const std::string& TraceBean::getClientHost() const {
+  return m_clientHost;
+}
+
+void TraceBean::setClientHost(const std::string& clientHost) {
+  m_clientHost = clientHost;
+}
+
+TraceMessageType TraceBean::getMsgType() const {
+  return m_msgType;
+}
+
+void TraceBean::setMsgType(TraceMessageType msgType) {
+  m_msgType = msgType;
+}
+
+long long int TraceBean::getStoreTime() const {
+  return m_storeTime;
+}
+
+void TraceBean::setStoreTime(long long int storeTime) {
+  m_storeTime = storeTime;
+}
+
+int TraceBean::getRetryTimes() const {
+  return m_retryTimes;
+}
+
+void TraceBean::setRetryTimes(int retryTimes) {
+  m_retryTimes = retryTimes;
+}
+
+int TraceBean::getBodyLength() const {
+  return m_bodyLength;
+}
+
+void TraceBean::setBodyLength(int bodyLength) {
+  m_bodyLength = bodyLength;
+}
+}  // namespace rocketmq
\ No newline at end of file
diff --git a/src/trace/TraceBean.h b/src/trace/TraceBean.h
new file mode 100644
index 0000000..b6edce9
--- /dev/null
+++ b/src/trace/TraceBean.h
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __ROCKETMQ_TRACE_BEAN_H__
+#define __ROCKETMQ_TRACE_BEAN_H__
+
+#include <string>
+#include <vector>
+#include "TraceContant.h"
+
+namespace rocketmq {
+class TraceBean {
+ public:
+  TraceBean();
+  virtual ~TraceBean();
+
+  const std::string& getTopic() const;
+
+  void setTopic(const std::string& topic);
+
+  const std::string& getMsgId() const;
+
+  void setMsgId(const std::string& msgId);
+
+  const std::string& getOffsetMsgId() const;
+
+  void setOffsetMsgId(const std::string& offsetMsgId);
+
+  const std::string& getTags() const;
+
+  void setTags(const std::string& tags);
+
+  const std::string& getKeys() const;
+
+  void setKeys(const std::string& keys);
+
+  const std::string& getStoreHost() const;
+
+  void setStoreHost(const std::string& storeHost);
+
+  const std::string& getClientHost() const;
+
+  void setClientHost(const std::string& clientHost);
+
+  TraceMessageType getMsgType() const;
+
+  void setMsgType(TraceMessageType msgType);
+
+  long long int getStoreTime() const;
+
+  void setStoreTime(long long int storeTime);
+
+  int getRetryTimes() const;
+
+  void setRetryTimes(int retryTimes);
+
+  int getBodyLength() const;
+
+  void setBodyLength(int bodyLength);
+
+ private:
+  std::string m_topic;
+  std::string m_msgId;
+  std::string m_offsetMsgId;
+  std::string m_tags;
+  std::string m_keys;
+  std::string m_storeHost;
+  std::string m_clientHost;
+  TraceMessageType m_msgType;
+  long long m_storeTime;
+  int m_retryTimes;
+  int m_bodyLength;
+};
+}  // namespace rocketmq
+#endif
\ No newline at end of file
diff --git a/src/include/SendMessageHook.h b/src/trace/TraceContant.cpp
similarity index 50%
copy from src/include/SendMessageHook.h
copy to src/trace/TraceContant.cpp
index 0d3e1e3..997a730 100644
--- a/src/include/SendMessageHook.h
+++ b/src/trace/TraceContant.cpp
@@ -14,36 +14,17 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef __SENDMESSAGEHOOK_H__
-#define __SENDMESSAGEHOOK_H__
 
-#include "MQClientException.h"
-#include "MQMessage.h"
-#include "MQMessageQueue.h"
-#include "RocketMQClient.h"
-#include "SendResult.h"
+#include "TraceContant.h"
+#include <string>
 
 namespace rocketmq {
-//<!***************************************************************************
-class SendMessageContext {
- public:
-  std::string producerGroup;
-  MQMessage msg;
-  MQMessageQueue mq;
-  std::string brokerAddr;
-  int communicationMode;
-  SendResult sendResult;
-  MQException* pException;
-  void* pArg;
-};
-
-class SendMessageHook {
- public:
-  virtual ~SendMessageHook() {}
-  virtual std::string hookName() = 0;
-  virtual void sendMessageBefore(const SendMessageContext& context) = 0;
-  virtual void sendMessageAfter(const SendMessageContext& context) = 0;
-};
-//<!***************************************************************************
+std::string TraceContant::GROUP_NAME = "_INNER_TRACE_PRODUCER";
+std::string TraceContant::TRACE_TOPIC = "rmq_sys_TRACE_DATA_";
+std::string TraceContant::DEFAULT_REDION = "DEFAULT_REGION";
+char TraceContant::CONTENT_SPLITOR = 1;
+char TraceContant::FIELD_SPLITOR = 2;
+std::string TraceContant::TRACE_TYPE_PUB = "Pub";
+std::string TraceContant::TRACE_TYPE_BEFORE = "SubBefore";
+std::string TraceContant::TRACE_TYPE_AFTER = "SubAfter";
 }  // namespace rocketmq
-#endif
diff --git a/src/include/SendMessageHook.h b/src/trace/TraceContant.h
similarity index 51%
copy from src/include/SendMessageHook.h
copy to src/trace/TraceContant.h
index 0d3e1e3..3a5f30f 100644
--- a/src/include/SendMessageHook.h
+++ b/src/trace/TraceContant.h
@@ -14,36 +14,34 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef __SENDMESSAGEHOOK_H__
-#define __SENDMESSAGEHOOK_H__
 
-#include "MQClientException.h"
-#include "MQMessage.h"
-#include "MQMessageQueue.h"
-#include "RocketMQClient.h"
-#include "SendResult.h"
+#ifndef __ROCKETMQ_TRACE_CONTANT_H_
+#define __ROCKETMQ_TRACE_CONTANT_H_
+
+#include <string>
 
 namespace rocketmq {
-//<!***************************************************************************
-class SendMessageContext {
+class TraceContant {
  public:
-  std::string producerGroup;
-  MQMessage msg;
-  MQMessageQueue mq;
-  std::string brokerAddr;
-  int communicationMode;
-  SendResult sendResult;
-  MQException* pException;
-  void* pArg;
+  static std::string GROUP_NAME;
+  static std::string TRACE_TOPIC;
+  static std::string DEFAULT_REDION;
+  static char CONTENT_SPLITOR;
+  static char FIELD_SPLITOR;
+  static std::string TRACE_TYPE_PUB;
+  static std::string TRACE_TYPE_BEFORE;
+  static std::string TRACE_TYPE_AFTER;
 };
-
-class SendMessageHook {
- public:
-  virtual ~SendMessageHook() {}
-  virtual std::string hookName() = 0;
-  virtual void sendMessageBefore(const SendMessageContext& context) = 0;
-  virtual void sendMessageAfter(const SendMessageContext& context) = 0;
+enum TraceMessageType {
+  TRACE_NORMAL_MSG = 0,
+  TRACE_TRANS_HALF_MSG,
+  TRACE_TRANS_COMMIT_MSG,
+  TRACE_DELAY_MSG,
+};
+enum TraceType {
+  Pub,        // for send message
+  SubBefore,  // for consume message before
+  SubAfter,   // for consum message after
 };
-//<!***************************************************************************
 }  // namespace rocketmq
-#endif
+#endif  //
diff --git a/src/trace/TraceContext.cpp b/src/trace/TraceContext.cpp
new file mode 100644
index 0000000..6696f06
--- /dev/null
+++ b/src/trace/TraceContext.cpp
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TraceContext.h"
+#include <string>
+#include <vector>
+
+#include "StringIdMaker.h"
+#include "UtilAll.h"
+
+namespace rocketmq {
+TraceContext::TraceContext() : m_timeStamp(UtilAll::currentTimeMillis()) {
+  m_requestId = StringIdMaker::getInstance().createUniqID();
+}
+
+TraceContext::TraceContext(const std::string& mGroupName) : m_groupName(mGroupName) {}
+
+TraceContext::~TraceContext() {}
+
+TraceMessageType TraceContext::getMsgType() const {
+  return m_msgType;
+}
+
+void TraceContext::setMsgType(TraceMessageType msgType) {
+  m_msgType = msgType;
+}
+
+TraceType TraceContext::getTraceType() const {
+  return m_traceType;
+}
+
+void TraceContext::setTraceType(TraceType traceType) {
+  m_traceType = traceType;
+}
+
+long long int TraceContext::getTimeStamp() const {
+  return m_timeStamp;
+}
+
+void TraceContext::setTimeStamp(long long int timeStamp) {
+  m_timeStamp = timeStamp;
+}
+
+const string& TraceContext::getRegionId() const {
+  return m_regionId;
+}
+
+void TraceContext::setRegionId(const string& regionId) {
+  m_regionId = regionId;
+}
+
+const string& TraceContext::getGroupName() const {
+  return m_groupName;
+}
+
+void TraceContext::setGroupName(const string& groupName) {
+  m_groupName = groupName;
+}
+
+int TraceContext::getCostTime() const {
+  return m_costTime;
+}
+
+void TraceContext::setCostTime(int costTime) {
+  m_costTime = costTime;
+}
+
+bool TraceContext::getStatus() const {
+  return m_status;
+}
+
+void TraceContext::setStatus(bool isSuccess) {
+  m_status = isSuccess;
+}
+
+const string& TraceContext::getRequestId() const {
+  return m_requestId;
+}
+
+void TraceContext::setRequestId(const string& requestId) {
+  m_requestId = requestId;
+}
+
+int TraceContext::getTraceBeanIndex() const {
+  return m_traceBeanIndex;
+}
+
+void TraceContext::setTraceBeanIndex(int traceBeanIndex) {
+  m_traceBeanIndex = traceBeanIndex;
+}
+
+const vector<TraceBean>& TraceContext::getTraceBeans() const {
+  return m_traceBeans;
+}
+
+void TraceContext::setTraceBean(const TraceBean& traceBean) {
+  m_traceBeans.push_back(traceBean);
+}
+}  // namespace rocketmq
\ No newline at end of file
diff --git a/src/trace/TraceContext.h b/src/trace/TraceContext.h
new file mode 100644
index 0000000..1f61754
--- /dev/null
+++ b/src/trace/TraceContext.h
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __ROCKETMQ_TRACE_CONTEXT_H__
+#define __ROCKETMQ_TRACE_CONTEXT_H__
+
+#include <string>
+#include <vector>
+#include "TraceBean.h"
+#include "TraceContant.h"
+
+namespace rocketmq {
+class TraceContext {
+ public:
+  TraceContext();
+
+  TraceContext(const std::string& mGroupName);
+
+  virtual ~TraceContext();
+
+  TraceMessageType getMsgType() const;
+
+  void setMsgType(TraceMessageType msgType);
+
+  TraceType getTraceType() const;
+
+  void setTraceType(TraceType traceType);
+
+  long long int getTimeStamp() const;
+
+  void setTimeStamp(long long int timeStamp);
+
+  const std::string& getRegionId() const;
+
+  void setRegionId(const std::string& regionId);
+
+  const std::string& getGroupName() const;
+
+  void setGroupName(const std::string& groupName);
+
+  int getCostTime() const;
+
+  void setCostTime(int costTime);
+
+  bool getStatus() const;
+
+  void setStatus(bool isSuccess);
+
+  const std::string& getRequestId() const;
+
+  void setRequestId(const std::string& requestId);
+
+  int getTraceBeanIndex() const;
+
+  void setTraceBeanIndex(int traceBeanIndex);
+
+  const std::vector<TraceBean>& getTraceBeans() const;
+
+  void setTraceBean(const TraceBean& traceBean);
+
+ private:
+  TraceMessageType m_msgType;
+  TraceType m_traceType;
+  long long m_timeStamp;
+  std::string m_regionId;
+  std::string m_groupName;
+  int m_costTime;
+  bool m_status;
+  std::string m_requestId;
+  int m_traceBeanIndex;
+  std::vector<TraceBean> m_traceBeans;
+};
+}  // namespace rocketmq
+#endif
\ No newline at end of file
diff --git a/src/trace/TraceTransferBean.cpp b/src/trace/TraceTransferBean.cpp
new file mode 100644
index 0000000..bd04c33
--- /dev/null
+++ b/src/trace/TraceTransferBean.cpp
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TraceTransferBean.h"
+#include <string>
+#include <vector>
+
+namespace rocketmq {
+std::string TraceTransferBean::getTransData() {
+  return m_transData;
+}
+
+void TraceTransferBean::setTransData(const std::string& transData) {
+  m_transData = transData;
+}
+
+std::vector<std::string> TraceTransferBean::getTransKey() {
+  return m_transKey;
+}
+
+void TraceTransferBean::setTransKey(const std::string& transkey) {
+  m_transKey.push_back(transkey);
+}
+}  // namespace rocketmq
\ No newline at end of file
diff --git a/src/include/SendMessageHook.h b/src/trace/TraceTransferBean.h
similarity index 51%
copy from src/include/SendMessageHook.h
copy to src/trace/TraceTransferBean.h
index 0d3e1e3..5ca054a 100644
--- a/src/include/SendMessageHook.h
+++ b/src/trace/TraceTransferBean.h
@@ -14,36 +14,27 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef __SENDMESSAGEHOOK_H__
-#define __SENDMESSAGEHOOK_H__
 
-#include "MQClientException.h"
-#include "MQMessage.h"
-#include "MQMessageQueue.h"
-#include "RocketMQClient.h"
-#include "SendResult.h"
+#ifndef __ROCKETMQ_TRACE_TRANSFER_BEAN_H__
+#define __ROCKETMQ_TRACE_TRANSFER_BEAN_H__
+
+#include <string>
+#include <vector>
 
 namespace rocketmq {
-//<!***************************************************************************
-class SendMessageContext {
+class TraceTransferBean {
  public:
-  std::string producerGroup;
-  MQMessage msg;
-  MQMessageQueue mq;
-  std::string brokerAddr;
-  int communicationMode;
-  SendResult sendResult;
-  MQException* pException;
-  void* pArg;
-};
+  std::string getTransData();
 
-class SendMessageHook {
- public:
-  virtual ~SendMessageHook() {}
-  virtual std::string hookName() = 0;
-  virtual void sendMessageBefore(const SendMessageContext& context) = 0;
-  virtual void sendMessageAfter(const SendMessageContext& context) = 0;
+  void setTransData(const std::string& transData);
+
+  std::vector<std::string> getTransKey();
+
+  void setTransKey(const std::string& transkey);
+
+ private:
+  std::string m_transData;
+  std::vector<std::string> m_transKey;
 };
-//<!***************************************************************************
 }  // namespace rocketmq
-#endif
+#endif
\ No newline at end of file
diff --git a/src/trace/TraceUtil.cpp b/src/trace/TraceUtil.cpp
new file mode 100644
index 0000000..a95fcf8
--- /dev/null
+++ b/src/trace/TraceUtil.cpp
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TraceUtil.h"
+#include <sstream>
+#include <string>
+#include "TraceContant.h"
+
+namespace rocketmq {
+std::string TraceUtil::CovertTraceTypeToString(TraceType type) {
+  switch (type) {
+    case Pub:
+      return TraceContant::TRACE_TYPE_PUB;
+    case SubBefore:
+      return TraceContant::TRACE_TYPE_BEFORE;
+    case SubAfter:
+      return TraceContant::TRACE_TYPE_AFTER;
+    default:
+      return TraceContant::TRACE_TYPE_PUB;
+  }
+}
+
+TraceTransferBean TraceUtil::CovertTraceContextToTransferBean(TraceContext* ctx) {
+  std::ostringstream ss;
+  std::vector<TraceBean> beans = ctx->getTraceBeans();
+  switch (ctx->getTraceType()) {
+    case Pub: {
+      std::vector<TraceBean>::iterator it = beans.begin();
+      ss << TraceUtil::CovertTraceTypeToString(ctx->getTraceType()) << TraceContant::CONTENT_SPLITOR;
+      ss << ctx->getTimeStamp() << TraceContant::CONTENT_SPLITOR;
+      ss << ctx->getRegionId() << TraceContant::CONTENT_SPLITOR;
+      ss << ctx->getGroupName() << TraceContant::CONTENT_SPLITOR;
+      ss << it->getTopic() << TraceContant::CONTENT_SPLITOR;
+      ss << it->getMsgId() << TraceContant::CONTENT_SPLITOR;
+      ss << it->getTags() << TraceContant::CONTENT_SPLITOR;
+      ss << it->getKeys() << TraceContant::CONTENT_SPLITOR;
+      ss << it->getStoreHost() << TraceContant::CONTENT_SPLITOR;
+      ss << it->getBodyLength() << TraceContant::CONTENT_SPLITOR;
+      ss << ctx->getCostTime() << TraceContant::CONTENT_SPLITOR;
+      ss << it->getMsgType() << TraceContant::CONTENT_SPLITOR;
+      ss << it->getOffsetMsgId() << TraceContant::CONTENT_SPLITOR;
+      ss << (ctx->getStatus() ? "true" : "false") << TraceContant::FIELD_SPLITOR;
+    } break;
+
+    case SubBefore: {
+      std::vector<TraceBean>::iterator it = beans.begin();
+      for (; it != beans.end(); ++it) {
+        ss << TraceUtil::CovertTraceTypeToString(ctx->getTraceType()) << TraceContant::CONTENT_SPLITOR;
+        ss << ctx->getTimeStamp() << TraceContant::CONTENT_SPLITOR;
+        ss << ctx->getRegionId() << TraceContant::CONTENT_SPLITOR;
+        ss << ctx->getGroupName() << TraceContant::CONTENT_SPLITOR;
+        ss << ctx->getRequestId() << TraceContant::CONTENT_SPLITOR;
+        ss << it->getMsgId() << TraceContant::CONTENT_SPLITOR;
+        ss << it->getRetryTimes() << TraceContant::CONTENT_SPLITOR;
+        // this is a bug caused by broker.
+        std::string defaultKey = "dKey";
+        if (!it->getKeys().empty()) {
+          defaultKey = it->getKeys();
+        }
+        ss << defaultKey << TraceContant::FIELD_SPLITOR;
+      }
+    } break;
+
+    case SubAfter: {
+      std::vector<TraceBean>::iterator it = beans.begin();
+      ss << TraceUtil::CovertTraceTypeToString(ctx->getTraceType()) << TraceContant::CONTENT_SPLITOR;
+      ss << ctx->getRequestId() << TraceContant::CONTENT_SPLITOR;
+      ss << it->getMsgId() << TraceContant::CONTENT_SPLITOR;
+      ss << ctx->getCostTime() << TraceContant::CONTENT_SPLITOR;
+      ss << (ctx->getStatus() ? "true" : "false") << TraceContant::CONTENT_SPLITOR;
+      // this is a bug caused by broker.
+      std::string defaultKey = "dKey";
+      if (!it->getKeys().empty()) {
+        defaultKey = it->getKeys();
+      }
+      ss << defaultKey << TraceContant::FIELD_SPLITOR;
+    } break;
+
+    default:
+      break;
+  }
+
+  TraceTransferBean transferBean;
+  transferBean.setTransData(ss.str());
+
+  switch (ctx->getTraceType()) {
+    case Pub:
+    case SubAfter: {
+      std::vector<TraceBean>::iterator it = beans.begin();
+      transferBean.setTransKey(it->getMsgId());
+      if (it->getKeys() != "") {
+        transferBean.setTransKey(it->getKeys());
+      }
+    } break;
+    case SubBefore: {
+      std::vector<TraceBean>::iterator it = beans.begin();
+      for (; it != beans.end(); ++it) {
+        transferBean.setTransKey((*it).getMsgId());
+        if ((*it).getKeys() != "") {
+          transferBean.setTransKey((*it).getKeys());
+        }
+      }
+    } break;
+    default:
+      break;
+  }
+
+  return transferBean;
+}
+}  // namespace rocketmq
diff --git a/src/include/SendMessageHook.h b/src/trace/TraceUtil.h
similarity index 51%
rename from src/include/SendMessageHook.h
rename to src/trace/TraceUtil.h
index 0d3e1e3..f924844 100644
--- a/src/include/SendMessageHook.h
+++ b/src/trace/TraceUtil.h
@@ -14,36 +14,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#ifndef __SENDMESSAGEHOOK_H__
-#define __SENDMESSAGEHOOK_H__
 
-#include "MQClientException.h"
-#include "MQMessage.h"
-#include "MQMessageQueue.h"
-#include "RocketMQClient.h"
-#include "SendResult.h"
+#ifndef __ROCKETMQ_TRACE_UTIL_H_
+#define __ROCKETMQ_TRACE_UTIL_H_
 
-namespace rocketmq {
-//<!***************************************************************************
-class SendMessageContext {
- public:
-  std::string producerGroup;
-  MQMessage msg;
-  MQMessageQueue mq;
-  std::string brokerAddr;
-  int communicationMode;
-  SendResult sendResult;
-  MQException* pException;
-  void* pArg;
-};
+#include <string>
+#include "TraceContant.h"
+#include "TraceContext.h"
+#include "TraceTransferBean.h"
 
-class SendMessageHook {
+namespace rocketmq {
+class TraceUtil {
  public:
-  virtual ~SendMessageHook() {}
-  virtual std::string hookName() = 0;
-  virtual void sendMessageBefore(const SendMessageContext& context) = 0;
-  virtual void sendMessageAfter(const SendMessageContext& context) = 0;
+  static std::string CovertTraceTypeToString(TraceType type);
+  static TraceTransferBean CovertTraceContextToTransferBean(TraceContext* ctx);
 };
-//<!***************************************************************************
 }  // namespace rocketmq
-#endif
+#endif  //
diff --git a/test/src/common/NameSpaceUtilTest.cpp b/test/src/common/NameSpaceUtilTest.cpp
index 8076c04..08873b5 100644
--- a/test/src/common/NameSpaceUtilTest.cpp
+++ b/test/src/common/NameSpaceUtilTest.cpp
@@ -80,9 +80,11 @@ TEST(NameSpaceUtil, hasNameSpace) {
   string source = "testTopic";
   string ns = "MQ_INST_UNITTEST";
   string nsSource = "MQ_INST_UNITTEST%testTopic";
+  string nsTraceSource = "rmq_sys_TRACE_DATA_Region";
   EXPECT_TRUE(NameSpaceUtil::hasNameSpace(nsSource, ns));
   EXPECT_FALSE(NameSpaceUtil::hasNameSpace(source, ns));
   EXPECT_FALSE(NameSpaceUtil::hasNameSpace(source, ""));
+  EXPECT_TRUE(NameSpaceUtil::hasNameSpace(nsTraceSource, ns));
 }
 int main(int argc, char* argv[]) {
   InitGoogleMock(&argc, argv);
diff --git a/test/src/consumer/DefaultMQPushConsumerImplTest.cpp b/test/src/consumer/DefaultMQPushConsumerImplTest.cpp
new file mode 100644
index 0000000..cfdc95f
--- /dev/null
+++ b/test/src/consumer/DefaultMQPushConsumerImplTest.cpp
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <map>
+#include <vector>
+
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+#include "ConsumeMessageContext.h"
+#include "ConsumeMessageHookImpl.h"
+#include "DefaultMQProducerImpl.h"
+#include "DefaultMQPushConsumerImpl.h"
+#include "MQMessageExt.h"
+#include "MQMessageQueue.h"
+
+using namespace std;
+using namespace rocketmq;
+using rocketmq::DefaultMQProducerImpl;
+using rocketmq::DefaultMQPushConsumerImpl;
+using testing::_;
+using ::testing::InitGoogleMock;
+using ::testing::InitGoogleTest;
+using testing::Return;
+
+class MockDefaultMQProducerImpl : public DefaultMQProducerImpl {
+ public:
+  MockDefaultMQProducerImpl(const string& groupId) : DefaultMQProducerImpl(groupId) {}
+  MOCK_METHOD3(send, void(MQMessage&, SendCallback*, bool));
+};
+TEST(DefaultMQPushConsumerImplTest, init) {
+  DefaultMQPushConsumerImpl* impl = new DefaultMQPushConsumerImpl("testMQConsumerGroup");
+  EXPECT_EQ(impl->getGroupName(), "testMQConsumerGroup");
+  impl->setUnitName("testUnit");
+  EXPECT_EQ(impl->getUnitName(), "testUnit");
+  impl->setTcpTransportPullThreadNum(64);
+  EXPECT_EQ(impl->getTcpTransportPullThreadNum(), 64);
+  impl->setTcpTransportConnectTimeout(2000);
+  EXPECT_EQ(impl->getTcpTransportConnectTimeout(), 2000);
+  impl->setTcpTransportTryLockTimeout(3000);
+  EXPECT_EQ(impl->getTcpTransportTryLockTimeout(), 3);
+  impl->setNamesrvAddr("http://rocketmq.nameserver.com");
+  EXPECT_EQ(impl->getNamesrvAddr(), "rocketmq.nameserver.com");
+  impl->setNameSpace("MQ_INST_NAMESPACE_TEST");
+  EXPECT_EQ(impl->getNameSpace(), "MQ_INST_NAMESPACE_TEST");
+  impl->setMessageTrace(true);
+  EXPECT_TRUE(impl->getMessageTrace());
+  impl->setAsyncPull(true);
+
+  impl->setConsumeMessageBatchMaxSize(3000);
+  EXPECT_EQ(impl->getConsumeMessageBatchMaxSize(), 3000);
+
+  impl->setConsumeThreadCount(3);
+  EXPECT_EQ(impl->getConsumeThreadCount(), 3);
+
+  impl->setMaxReconsumeTimes(30);
+  EXPECT_EQ(impl->getMaxReconsumeTimes(), 30);
+
+  impl->setMaxCacheMsgSizePerQueue(3000);
+  EXPECT_EQ(impl->getMaxCacheMsgSizePerQueue(), 3000);
+
+  impl->setPullMsgThreadPoolCount(10);
+  EXPECT_EQ(impl->getPullMsgThreadPoolCount(), 10);
+}
+
+TEST(DefaultMQPushConsumerImpl, Trace) {
+  DefaultMQPushConsumerImpl* impl = new DefaultMQPushConsumerImpl();
+  MockDefaultMQProducerImpl* implProducer = new MockDefaultMQProducerImpl("testMockProducerTraceGroup");
+  std::shared_ptr<ConsumeMessageHook> hook(new ConsumeMessageHookImpl());
+  impl->setMessageTrace(true);
+  impl->setDefaultMqProducerImpl(implProducer);
+  impl->registerConsumeMessageHook(hook);
+  EXPECT_CALL(*implProducer, send(_, _, _)).WillRepeatedly(Return());
+
+  ConsumeMessageContext consumeMessageContext;
+  MQMessageQueue messageQueue("TestTopic", "BrokerA", 0);
+  MQMessageExt messageExt;
+  messageExt.setMsgId("MessageID");
+  messageExt.setKeys("MessageKey");
+  vector<MQMessageExt> msgs;
+  consumeMessageContext.setDefaultMQPushConsumer(impl);
+  consumeMessageContext.setConsumerGroup("testMockProducerTraceGroup");
+  consumeMessageContext.setMessageQueue(messageQueue);
+  consumeMessageContext.setMsgList(msgs);
+  consumeMessageContext.setSuccess(false);
+  consumeMessageContext.setNameSpace("NameSpace");
+  impl->executeConsumeMessageHookBefore(&consumeMessageContext);
+  impl->executeConsumeMessageHookAfter(&consumeMessageContext);
+
+  msgs.push_back(messageExt);
+  consumeMessageContext.setMsgList(msgs);
+
+  impl->executeConsumeMessageHookBefore(&consumeMessageContext);
+
+  consumeMessageContext.setMsgIndex(0);
+  consumeMessageContext.setStatus("CONSUME_SUCCESS");
+  consumeMessageContext.setSuccess(true);
+  impl->executeConsumeMessageHookAfter(&consumeMessageContext);
+  EXPECT_TRUE(impl->hasConsumeMessageHook());
+  delete implProducer;
+}
+int main(int argc, char* argv[]) {
+  InitGoogleMock(&argc, argv);
+  return RUN_ALL_TESTS();
+}
diff --git a/test/src/producer/DefaultMQProducerImplTest.cpp b/test/src/producer/DefaultMQProducerImplTest.cpp
index b9a042b..3e42e69 100644
--- a/test/src/producer/DefaultMQProducerImplTest.cpp
+++ b/test/src/producer/DefaultMQProducerImplTest.cpp
@@ -103,9 +103,8 @@ TEST(DefaultMQProducerImplTest, init) {
   EXPECT_EQ(impl->getNamesrvAddr(), "rocketmq.nameserver.com");
   impl->setNameSpace("MQ_INST_NAMESPACE_TEST");
   EXPECT_EQ(impl->getNameSpace(), "MQ_INST_NAMESPACE_TEST");
-  // impl->start();
-  // EXPECT_EQ(impl->getGroupName(), "MQ_INST_NAMESPACE_TEST%testMQProducerGroup");
-  // impl->shutdown();
+  impl->setMessageTrace(true);
+  EXPECT_TRUE(impl->getMessageTrace());
 }
 TEST(DefaultMQProducerImplTest, Sends) {
   DefaultMQProducerImpl* impl = new DefaultMQProducerImpl("testMockSendMQProducerGroup");
@@ -187,6 +186,46 @@ TEST(DefaultMQProducerImplTest, Sends) {
   delete mockFactory;
   delete apiImpl;
 }
+TEST(DefaultMQProducerImplTest, Trace) {
+  DefaultMQProducerImpl* impl = new DefaultMQProducerImpl("testMockProducerTraceGroup");
+  MockMQClientFactory* mockFactory = new MockMQClientFactory("testTraceClientId");
+  MockMQClientAPIImpl* apiImpl = new MockMQClientAPIImpl();
+
+  impl->setFactory(mockFactory);
+  impl->setNamesrvAddr("http://rocketmq.nameserver.com");
+  impl->setMessageTrace(true);
+
+  // prepare send
+  boost::shared_ptr<TopicPublishInfo> topicPublishInfo = boost::make_shared<TopicPublishInfo>();
+  MQMessageQueue mqA("TestTraceTopic", "BrokerA", 0);
+  MQMessageQueue mqB("TestTraceTopic", "BrokerB", 0);
+  topicPublishInfo->updateMessageQueueList(mqA);
+  topicPublishInfo->updateMessageQueueList(mqB);
+
+  SendResult okMQAResult(SEND_OK, "MSSAGEID", "OFFSETID", mqA, 1024, "DEFAULT_REGION");
+
+  EXPECT_CALL(*mockFactory, start()).Times(1).WillOnce(Return());
+  EXPECT_CALL(*mockFactory, shutdown()).Times(1).WillOnce(Return());
+  EXPECT_CALL(*mockFactory, registerProducer(_)).Times(1).WillOnce(Return(true));
+  EXPECT_CALL(*mockFactory, unregisterProducer(_)).Times(1).WillOnce(Return());
+  EXPECT_CALL(*mockFactory, sendHeartbeatToAllBroker()).Times(1).WillOnce(Return());
+  EXPECT_CALL(*mockFactory, tryToFindTopicPublishInfo(_, _)).WillRepeatedly(Return(topicPublishInfo));
+  EXPECT_CALL(*mockFactory, findBrokerAddressInPublish(_)).WillRepeatedly(Return("BrokerA"));
+  EXPECT_CALL(*mockFactory, getMQClientAPIImpl()).WillRepeatedly(Return(apiImpl));
+
+  EXPECT_CALL(*apiImpl, sendMessage(_, _, _, _, _, _, _, _, _)).WillRepeatedly(Return(okMQAResult));
+
+  // Start Producer.
+  impl->start();
+
+  MQMessage msg("TestTraceTopic", "testTag", "testKey", "testBodysA");
+  SendResult s1 = impl->send(msg);
+  EXPECT_EQ(s1.getSendStatus(), SEND_OK);
+
+  impl->shutdown();
+  delete mockFactory;
+  delete apiImpl;
+}
 int main(int argc, char* argv[]) {
   InitGoogleMock(&argc, argv);
   return RUN_ALL_TESTS();
diff --git a/test/src/trace/TraceBeanTest.cpp b/test/src/trace/TraceBeanTest.cpp
new file mode 100644
index 0000000..7fcbe32
--- /dev/null
+++ b/test/src/trace/TraceBeanTest.cpp
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <string>
+
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+#include "TraceBean.h"
+
+using std::string;
+
+using ::testing::InitGoogleMock;
+using ::testing::InitGoogleTest;
+using testing::Return;
+
+using rocketmq::TraceBean;
+using rocketmq::TraceMessageType;
+
+TEST(TraceBean, Init) {
+  std::string m_topic("topic");
+  std::string m_msgId("msgid");
+  std::string m_offsetMsgId("offsetmsgid");
+  std::string m_tags("tag");
+  std::string m_keys("ksy");
+  std::string m_storeHost("storehost");
+  std::string m_clientHost("clienthost");
+  TraceMessageType m_msgType = TraceMessageType::TRACE_NORMAL_MSG;
+  long long m_storeTime = 100;
+  int m_retryTimes = 2;
+  int m_bodyLength = 1024;
+  TraceBean bean;
+  bean.setTopic(m_topic);
+  bean.setMsgId(m_msgId);
+  bean.setOffsetMsgId(m_offsetMsgId);
+  bean.setTags(m_tags);
+  bean.setKeys(m_keys);
+  bean.setStoreHost(m_storeHost);
+  bean.setClientHost(m_clientHost);
+  bean.setMsgType(m_msgType);
+  bean.setStoreTime(m_storeTime);
+  bean.setRetryTimes(m_retryTimes);
+  bean.setBodyLength(m_bodyLength);
+  EXPECT_EQ(bean.getTopic(), m_topic);
+  EXPECT_EQ(bean.getMsgId(), m_msgId);
+  EXPECT_EQ(bean.getOffsetMsgId(), m_offsetMsgId);
+  EXPECT_EQ(bean.getTags(), m_tags);
+  EXPECT_EQ(bean.getKeys(), m_keys);
+  EXPECT_EQ(bean.getStoreHost(), m_storeHost);
+  EXPECT_EQ(bean.getClientHost(), m_clientHost);
+  EXPECT_EQ(bean.getMsgType(), m_msgType);
+  EXPECT_EQ(bean.getStoreTime(), m_storeTime);
+  EXPECT_EQ(bean.getRetryTimes(), m_retryTimes);
+  EXPECT_EQ(bean.getBodyLength(), m_bodyLength);
+}
+
+int main(int argc, char* argv[]) {
+  InitGoogleMock(&argc, argv);
+  testing::GTEST_FLAG(throw_on_failure) = true;
+  testing::GTEST_FLAG(filter) = "TraceBean.*";
+  int itestts = RUN_ALL_TESTS();
+  return itestts;
+}
diff --git a/test/src/trace/TraceUtilTest.cpp b/test/src/trace/TraceUtilTest.cpp
new file mode 100644
index 0000000..b5e4202
--- /dev/null
+++ b/test/src/trace/TraceUtilTest.cpp
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <string>
+
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+#include "TraceContant.h"
+#include "TraceUtil.h"
+
+using std::string;
+
+using ::testing::InitGoogleMock;
+using ::testing::InitGoogleTest;
+using testing::Return;
+
+using rocketmq::TraceBean;
+using rocketmq::TraceContant;
+using rocketmq::TraceContext;
+using rocketmq::TraceMessageType;
+using rocketmq::TraceTransferBean;
+using rocketmq::TraceType;
+using rocketmq::TraceUtil;
+
+TEST(TraceUtil, CovertTraceTypeToString) {
+  EXPECT_EQ(TraceUtil::CovertTraceTypeToString(TraceType::Pub), TraceContant::TRACE_TYPE_PUB);
+  EXPECT_EQ(TraceUtil::CovertTraceTypeToString(TraceType::SubBefore), TraceContant::TRACE_TYPE_BEFORE);
+  EXPECT_EQ(TraceUtil::CovertTraceTypeToString(TraceType::SubAfter), TraceContant::TRACE_TYPE_AFTER);
+  EXPECT_EQ(TraceUtil::CovertTraceTypeToString((TraceType)5), TraceContant::TRACE_TYPE_PUB);
+}
+TEST(TraceUtil, CovertTraceContextToTransferBean) {
+  TraceContext context;
+  TraceBean bean;
+  bean.setMsgType(TraceMessageType::TRACE_NORMAL_MSG);
+  bean.setMsgId("MessageID");
+  bean.setKeys("MessageKey");
+  context.setRegionId("region");
+  context.setMsgType(TraceMessageType::TRACE_TRANS_COMMIT_MSG);
+  context.setTraceType(TraceType::Pub);
+  context.setGroupName("PubGroup");
+  context.setCostTime(50);
+  context.setStatus(true);
+  context.setTraceBean(bean);
+  context.setTraceBeanIndex(1);
+  TraceTransferBean beanPub = TraceUtil::CovertTraceContextToTransferBean(&context);
+  EXPECT_GT(beanPub.getTransKey().size(), 0);
+  context.setTraceType(TraceType::SubBefore);
+  TraceTransferBean beanBefore = TraceUtil::CovertTraceContextToTransferBean(&context);
+  EXPECT_GT(beanBefore.getTransKey().size(), 0);
+
+  context.setTraceType(TraceType::SubAfter);
+  TraceTransferBean beanAfter = TraceUtil::CovertTraceContextToTransferBean(&context);
+  EXPECT_GT(beanAfter.getTransKey().size(), 0);
+
+  TraceContext contextFailed("testGroup");
+  contextFailed.setMsgType(context.getMsgType());
+  contextFailed.setTraceType((TraceType)5);
+  contextFailed.setRequestId(context.getRegionId());
+  contextFailed.setTimeStamp(context.getTimeStamp());
+  contextFailed.setTraceBeanIndex(context.getTraceBeanIndex());
+  TraceTransferBean beanWrong = TraceUtil::CovertTraceContextToTransferBean(&contextFailed);
+  EXPECT_EQ(beanWrong.getTransKey().size(), 0);
+}
+int main(int argc, char* argv[]) {
+  InitGoogleMock(&argc, argv);
+  testing::GTEST_FLAG(throw_on_failure) = true;
+  testing::GTEST_FLAG(filter) = "TraceUtil.*";
+  int itestts = RUN_ALL_TESTS();
+  return itestts;
+}


Mime
View raw message