rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ding...@apache.org
Subject [rocketmq-client-cpp] branch master updated: Update: update network interface. (#60)
Date Wed, 12 Jun 2019 13:13:07 GMT
This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 5689607  Update: update network interface. (#60)
5689607 is described below

commit 5689607f6d4ddea6001cfdc262580004c7dded9a
Author: James <ywhjames@hotmail.com>
AuthorDate: Wed Jun 12 21:13:02 2019 +0800

    Update: update network interface. (#60)
    
    * update network interface.
    
    - feature: use only one event loop for all TcpTransport.
    - update: network components.
    
    * remove boost mutex, timed_mutex and condition_variable in TcpRemotingClient, TcpTransport and ResponseFuture.
---
 src/common/AsyncCallbackWrap.cpp    | 386 ++++++++++-----------
 src/common/noncopyable.h            |  33 ++
 src/transport/EventLoop.cpp         | 241 ++++++++++++++
 src/transport/EventLoop.h           | 117 +++++++
 src/transport/ResponseFuture.cpp    | 159 ++++-----
 src/transport/ResponseFuture.h      |  66 ++--
 src/transport/TcpRemotingClient.cpp | 647 +++++++++++++++++-------------------
 src/transport/TcpRemotingClient.h   | 262 ++++++++-------
 src/transport/TcpTransport.cpp      | 343 +++++++------------
 src/transport/TcpTransport.h        |  88 ++---
 10 files changed, 1288 insertions(+), 1054 deletions(-)

diff --git a/src/common/AsyncCallbackWrap.cpp b/src/common/AsyncCallbackWrap.cpp
old mode 100644
new mode 100755
index 52e0374..cb26fda
--- a/src/common/AsyncCallbackWrap.cpp
+++ b/src/common/AsyncCallbackWrap.cpp
@@ -1,193 +1,193 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-#include "AsyncCallbackWrap.h"
-#include "Logging.h"
-#include "MQClientAPIImpl.h"
-#include "MQDecoder.h"
-#include "MQMessageQueue.h"
-#include "MQProtos.h"
-#include "PullAPIWrapper.h"
-#include "PullResultExt.h"
-#include "ResponseFuture.h"
-
-namespace rocketmq {
-//<!***************************************************************************
-AsyncCallbackWrap::AsyncCallbackWrap(AsyncCallback* pAsyncCallback, MQClientAPIImpl* pclientAPI)
-    : m_pAsyncCallBack(pAsyncCallback), m_pClientAPI(pclientAPI) {}
-
-AsyncCallbackWrap::~AsyncCallbackWrap() {
-  m_pAsyncCallBack = NULL;
-  m_pClientAPI = NULL;
-}
-
-//<!************************************************************************
-SendCallbackWrap::SendCallbackWrap(const string& brokerName,
-                                   const MQMessage& msg,
-                                   AsyncCallback* pAsyncCallback,
-                                   MQClientAPIImpl* pclientAPI)
-    : AsyncCallbackWrap(pAsyncCallback, pclientAPI), m_msg(msg), m_brokerName(brokerName) {}
-
-void SendCallbackWrap::onException() {
-  if (m_pAsyncCallBack == NULL)
-    return;
-
-  SendCallback* pCallback = static_cast<SendCallback*>(m_pAsyncCallBack);
-  if (pCallback) {
-    unique_ptr<MQException> exception(
-        new MQException("send msg failed due to wait response timeout or network error", -1, __FILE__, __LINE__));
-    pCallback->onException(*exception);
-    if (pCallback->getSendCallbackType() == autoDeleteSendCallback) {
-      deleteAndZero(pCallback);
-    }
-  }
-}
-
-void SendCallbackWrap::operationComplete(ResponseFuture* pResponseFuture, bool bProducePullRequest) {
-  unique_ptr<RemotingCommand> pResponse(pResponseFuture->getCommand());
-
-  if (m_pAsyncCallBack == NULL) {
-    return;
-  }
-  int opaque = pResponseFuture->getOpaque();
-  SendCallback* pCallback = static_cast<SendCallback*>(m_pAsyncCallBack);
-
-  if (!pResponse) {
-    string err = "unknow reseaon";
-    if (!pResponseFuture->isSendRequestOK()) {
-      err = "send request failed";
-
-    } else if (pResponseFuture->isTimeOut()) {
-      // pResponseFuture->setAsyncResponseFlag();
-      err = "wait response timeout";
-    }
-    if (pCallback) {
-      MQException exception(err, -1, __FILE__, __LINE__);
-      pCallback->onException(exception);
-    }
-    LOG_ERROR("send failed of:%d", pResponseFuture->getOpaque());
-  } else {
-    try {
-      SendResult ret = m_pClientAPI->processSendResponse(m_brokerName, m_msg, pResponse.get());
-      if (pCallback) {
-        LOG_DEBUG("operationComplete: processSendResponse success, opaque:%d, maxRetryTime:%d, retrySendTimes:%d",
-                  opaque, pResponseFuture->getMaxRetrySendTimes(), pResponseFuture->getRetrySendTimes());
-        pCallback->onSuccess(ret);
-      }
-    } catch (MQException& e) {
-      LOG_ERROR("operationComplete: processSendResponse exception: %s", e.what());
-
-      // broker may return exception, need consider retry send
-      int maxRetryTimes = pResponseFuture->getMaxRetrySendTimes();
-      int retryTimes = pResponseFuture->getRetrySendTimes();
-      if (pResponseFuture->getASyncFlag() && retryTimes < maxRetryTimes && maxRetryTimes > 1) {
-        int64 left_timeout_ms = pResponseFuture->leftTime();
-        string brokerAddr = pResponseFuture->getBrokerAddr();
-        const RemotingCommand& requestCommand = pResponseFuture->getRequestCommand();
-        retryTimes += 1;
-        LOG_WARN("retry send, opaque:%d, sendTimes:%d, maxRetryTimes:%d, left_timeout:%lld, brokerAddr:%s, msg:%s",
-                 opaque, retryTimes, maxRetryTimes, left_timeout_ms, brokerAddr.data(), m_msg.toString().data());
-
-        bool exception_flag = false;
-        try {
-          m_pClientAPI->sendMessageAsync(pResponseFuture->getBrokerAddr(), m_brokerName, m_msg,
-                                         (RemotingCommand&)requestCommand, pCallback, left_timeout_ms, maxRetryTimes,
-                                         retryTimes);
-        } catch (MQClientException& e) {
-          LOG_ERROR("retry send exception:%s, opaque:%d, retryTimes:%d, msg:%s, not retry send again", e.what(), opaque,
-                    retryTimes, m_msg.toString().data());
-          exception_flag = true;
-        }
-
-        if (exception_flag == false) {
-          return;  // send retry again, here need return
-        }
-      }
-
-      if (pCallback) {
-        MQException exception("process send response error", -1, __FILE__, __LINE__);
-        pCallback->onException(exception);
-      }
-    }
-  }
-  if (pCallback && pCallback->getSendCallbackType() == autoDeleteSendCallback) {
-    deleteAndZero(pCallback);
-  }
-}
-
-//<!************************************************************************
-PullCallbackWarp::PullCallbackWarp(AsyncCallback* pAsyncCallback, MQClientAPIImpl* pclientAPI, void* pArg)
-    : AsyncCallbackWrap(pAsyncCallback, pclientAPI) {
-  m_pArg = *static_cast<AsyncArg*>(pArg);
-}
-
-PullCallbackWarp::~PullCallbackWarp() {}
-
-void PullCallbackWarp::onException() {
-  if (m_pAsyncCallBack == NULL)
-    return;
-
-  PullCallback* pCallback = static_cast<PullCallback*>(m_pAsyncCallBack);
-  if (pCallback) {
-    MQException exception("wait response timeout", -1, __FILE__, __LINE__);
-    pCallback->onException(exception);
-  } else {
-    LOG_ERROR("PullCallback is NULL, AsyncPull could not continue");
-  }
-}
-
-void PullCallbackWarp::operationComplete(ResponseFuture* pResponseFuture, bool bProducePullRequest) {
-  unique_ptr<RemotingCommand> pResponse(pResponseFuture->getCommand());
-  if (m_pAsyncCallBack == NULL) {
-    LOG_ERROR("m_pAsyncCallBack is NULL, AsyncPull could not continue");
-    return;
-  }
-  PullCallback* pCallback = static_cast<PullCallback*>(m_pAsyncCallBack);
-  if (!pResponse) {
-    string err = "unknow reseaon";
-    if (!pResponseFuture->isSendRequestOK()) {
-      err = "send request failed";
-
-    } else if (pResponseFuture->isTimeOut()) {
-      // pResponseFuture->setAsyncResponseFlag();
-      err = "wait response timeout";
-    }
-    MQException exception(err, -1, __FILE__, __LINE__);
-    LOG_ERROR("Async pull exception of opaque:%d", pResponseFuture->getOpaque());
-    if (pCallback && bProducePullRequest)
-      pCallback->onException(exception);
-  } else {
-    try {
-      if (m_pArg.pPullWrapper) {
-        unique_ptr<PullResult> pullResult(m_pClientAPI->processPullResponse(pResponse.get()));
-        PullResult result = m_pArg.pPullWrapper->processPullResult(m_pArg.mq, pullResult.get(), &m_pArg.subData);
-        if (pCallback)
-          pCallback->onSuccess(m_pArg.mq, result, bProducePullRequest);
-      } else {
-        LOG_ERROR("pPullWrapper had been destroyed with consumer");
-      }
-    } catch (MQException& e) {
-      LOG_ERROR(e.what());
-      MQException exception("pullResult error", -1, __FILE__, __LINE__);
-      if (pCallback && bProducePullRequest)
-        pCallback->onException(exception);
-    }
-  }
-}
-
-//<!***************************************************************************
-}  //<!end namespace;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "AsyncCallbackWrap.h"
+#include "Logging.h"
+#include "MQClientAPIImpl.h"
+#include "MQDecoder.h"
+#include "MQMessageQueue.h"
+#include "MQProtos.h"
+#include "PullAPIWrapper.h"
+#include "PullResultExt.h"
+#include "ResponseFuture.h"
+
+namespace rocketmq {
+//<!***************************************************************************
+AsyncCallbackWrap::AsyncCallbackWrap(AsyncCallback* pAsyncCallback, MQClientAPIImpl* pclientAPI)
+    : m_pAsyncCallBack(pAsyncCallback), m_pClientAPI(pclientAPI) {}
+
+AsyncCallbackWrap::~AsyncCallbackWrap() {
+  m_pAsyncCallBack = NULL;
+  m_pClientAPI = NULL;
+}
+
+//<!************************************************************************
+SendCallbackWrap::SendCallbackWrap(const string& brokerName,
+                                   const MQMessage& msg,
+                                   AsyncCallback* pAsyncCallback,
+                                   MQClientAPIImpl* pclientAPI)
+    : AsyncCallbackWrap(pAsyncCallback, pclientAPI), m_msg(msg), m_brokerName(brokerName) {}
+
+void SendCallbackWrap::onException() {
+  if (m_pAsyncCallBack == NULL)
+    return;
+
+  SendCallback* pCallback = static_cast<SendCallback*>(m_pAsyncCallBack);
+  if (pCallback) {
+    unique_ptr<MQException> exception(
+        new MQException("send msg failed due to wait response timeout or network error", -1, __FILE__, __LINE__));
+    pCallback->onException(*exception);
+    if (pCallback->getSendCallbackType() == autoDeleteSendCallback) {
+      deleteAndZero(pCallback);
+    }
+  }
+}
+
+void SendCallbackWrap::operationComplete(ResponseFuture* pResponseFuture, bool bProducePullRequest) {
+  unique_ptr<RemotingCommand> pResponse(pResponseFuture->getCommand());
+
+  if (m_pAsyncCallBack == NULL) {
+    return;
+  }
+  int opaque = pResponseFuture->getOpaque();
+  SendCallback* pCallback = static_cast<SendCallback*>(m_pAsyncCallBack);
+
+  if (!pResponse) {
+    string err = "unknow reseaon";
+    if (!pResponseFuture->isSendRequestOK()) {
+      err = "send request failed";
+
+    } else if (pResponseFuture->isTimeOut()) {
+      // pResponseFuture->setAsyncResponseFlag();
+      err = "wait response timeout";
+    }
+    if (pCallback) {
+      MQException exception(err, -1, __FILE__, __LINE__);
+      pCallback->onException(exception);
+    }
+    LOG_ERROR("send failed of:%d", pResponseFuture->getOpaque());
+  } else {
+    try {
+      SendResult ret = m_pClientAPI->processSendResponse(m_brokerName, m_msg, pResponse.get());
+      if (pCallback) {
+        LOG_DEBUG("operationComplete: processSendResponse success, opaque:%d, maxRetryTime:%d, retrySendTimes:%d",
+                  opaque, pResponseFuture->getMaxRetrySendTimes(), pResponseFuture->getRetrySendTimes());
+        pCallback->onSuccess(ret);
+      }
+    } catch (MQException& e) {
+      LOG_ERROR("operationComplete: processSendResponse exception: %s", e.what());
+
+      // broker may return exception, need consider retry send
+      int maxRetryTimes = pResponseFuture->getMaxRetrySendTimes();
+      int retryTimes = pResponseFuture->getRetrySendTimes();
+      if (pResponseFuture->getAsyncFlag() && retryTimes < maxRetryTimes && maxRetryTimes > 1) {
+        int64 left_timeout_ms = pResponseFuture->leftTime();
+        string brokerAddr = pResponseFuture->getBrokerAddr();
+        const RemotingCommand& requestCommand = pResponseFuture->getRequestCommand();
+        retryTimes += 1;
+        LOG_WARN("retry send, opaque:%d, sendTimes:%d, maxRetryTimes:%d, left_timeout:%lld, brokerAddr:%s, msg:%s",
+                 opaque, retryTimes, maxRetryTimes, left_timeout_ms, brokerAddr.data(), m_msg.toString().data());
+
+        bool exception_flag = false;
+        try {
+          m_pClientAPI->sendMessageAsync(pResponseFuture->getBrokerAddr(), m_brokerName, m_msg,
+                                         (RemotingCommand&)requestCommand, pCallback, left_timeout_ms, maxRetryTimes,
+                                         retryTimes);
+        } catch (MQClientException& e) {
+          LOG_ERROR("retry send exception:%s, opaque:%d, retryTimes:%d, msg:%s, not retry send again", e.what(), opaque,
+                    retryTimes, m_msg.toString().data());
+          exception_flag = true;
+        }
+
+        if (exception_flag == false) {
+          return;  // send retry again, here need return
+        }
+      }
+
+      if (pCallback) {
+        MQException exception("process send response error", -1, __FILE__, __LINE__);
+        pCallback->onException(exception);
+      }
+    }
+  }
+  if (pCallback && pCallback->getSendCallbackType() == autoDeleteSendCallback) {
+    deleteAndZero(pCallback);
+  }
+}
+
+//<!************************************************************************
+PullCallbackWarp::PullCallbackWarp(AsyncCallback* pAsyncCallback, MQClientAPIImpl* pclientAPI, void* pArg)
+    : AsyncCallbackWrap(pAsyncCallback, pclientAPI) {
+  m_pArg = *static_cast<AsyncArg*>(pArg);
+}
+
+PullCallbackWarp::~PullCallbackWarp() {}
+
+void PullCallbackWarp::onException() {
+  if (m_pAsyncCallBack == NULL)
+    return;
+
+  PullCallback* pCallback = static_cast<PullCallback*>(m_pAsyncCallBack);
+  if (pCallback) {
+    MQException exception("wait response timeout", -1, __FILE__, __LINE__);
+    pCallback->onException(exception);
+  } else {
+    LOG_ERROR("PullCallback is NULL, AsyncPull could not continue");
+  }
+}
+
+void PullCallbackWarp::operationComplete(ResponseFuture* pResponseFuture, bool bProducePullRequest) {
+  unique_ptr<RemotingCommand> pResponse(pResponseFuture->getCommand());
+  if (m_pAsyncCallBack == NULL) {
+    LOG_ERROR("m_pAsyncCallBack is NULL, AsyncPull could not continue");
+    return;
+  }
+  PullCallback* pCallback = static_cast<PullCallback*>(m_pAsyncCallBack);
+  if (!pResponse) {
+    string err = "unknow reseaon";
+    if (!pResponseFuture->isSendRequestOK()) {
+      err = "send request failed";
+
+    } else if (pResponseFuture->isTimeOut()) {
+      // pResponseFuture->setAsyncResponseFlag();
+      err = "wait response timeout";
+    }
+    MQException exception(err, -1, __FILE__, __LINE__);
+    LOG_ERROR("Async pull exception of opaque:%d", pResponseFuture->getOpaque());
+    if (pCallback && bProducePullRequest)
+      pCallback->onException(exception);
+  } else {
+    try {
+      if (m_pArg.pPullWrapper) {
+        unique_ptr<PullResult> pullResult(m_pClientAPI->processPullResponse(pResponse.get()));
+        PullResult result = m_pArg.pPullWrapper->processPullResult(m_pArg.mq, pullResult.get(), &m_pArg.subData);
+        if (pCallback)
+          pCallback->onSuccess(m_pArg.mq, result, bProducePullRequest);
+      } else {
+        LOG_ERROR("pPullWrapper had been destroyed with consumer");
+      }
+    } catch (MQException& e) {
+      LOG_ERROR(e.what());
+      MQException exception("pullResult error", -1, __FILE__, __LINE__);
+      if (pCallback && bProducePullRequest)
+        pCallback->onException(exception);
+    }
+  }
+}
+
+//<!***************************************************************************
+}  // namespace rocketmq
diff --git a/src/common/noncopyable.h b/src/common/noncopyable.h
new file mode 100644
index 0000000..f52f988
--- /dev/null
+++ b/src/common/noncopyable.h
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __NONCOPYABLE_H__
+#define __NONCOPYABLE_H__
+
+namespace rocketmq {
+
+class noncopyable {
+ protected:
+  noncopyable() = default;
+  ~noncopyable() = default;
+
+  noncopyable(const noncopyable&) = delete;
+  noncopyable& operator=(const noncopyable&) = delete;
+};
+
+}  // namespace rocketmq
+
+#endif  //__NONCOPYABLE_H__
diff --git a/src/transport/EventLoop.cpp b/src/transport/EventLoop.cpp
new file mode 100644
index 0000000..3d67405
--- /dev/null
+++ b/src/transport/EventLoop.cpp
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "EventLoop.h"
+
+#if !defined(WIN32) && !defined(__APPLE__)
+#include <sys/prctl.h>
+#endif
+
+#include <event2/thread.h>
+
+#include "Logging.h"
+#include "UtilAll.h"
+
+namespace rocketmq {
+
+EventLoop* EventLoop::GetDefaultEventLoop() {
+  static EventLoop defaultEventLoop;
+  return &defaultEventLoop;
+}
+
+EventLoop::EventLoop(const struct event_config* config, bool run_immediately)
+    : m_eventBase(nullptr), m_loopThread(nullptr), _is_running(false) {
+  // tell libevent support multi-threads
+#ifdef WIN32
+  evthread_use_windows_threads();
+#else
+  evthread_use_pthreads();
+#endif
+
+  if (config == nullptr) {
+    m_eventBase = event_base_new();
+  } else {
+    m_eventBase = event_base_new_with_config(config);
+  }
+
+  if (m_eventBase == nullptr) {
+    // failure...
+    LOG_ERROR("Failed to create event base!");
+    return;
+  }
+
+  evthread_make_base_notifiable(m_eventBase);
+
+  if (run_immediately) {
+    start();
+  }
+}
+
+EventLoop::~EventLoop() {
+  stop();
+
+  if (m_eventBase != nullptr) {
+    event_base_free(m_eventBase);
+    m_eventBase = nullptr;
+  }
+}
+
+void EventLoop::start() {
+  if (m_loopThread == nullptr) {
+    // start event loop
+#if !defined(WIN32) && !defined(__APPLE__)
+    string taskName = UtilAll::getProcessName();
+    prctl(PR_SET_NAME, "EventLoop", 0, 0, 0);
+#endif
+    m_loopThread = new std::thread(&EventLoop::runLoop, this);
+#if !defined(WIN32) && !defined(__APPLE__)
+    prctl(PR_SET_NAME, taskName.c_str(), 0, 0, 0);
+#endif
+  }
+}
+
+void EventLoop::stop() {
+  if (m_loopThread != nullptr /*&& m_loopThread.joinable()*/) {
+    _is_running = false;
+    m_loopThread->join();
+
+    delete m_loopThread;
+    m_loopThread = nullptr;
+  }
+}
+
+void EventLoop::runLoop() {
+  _is_running = true;
+
+  while (_is_running) {
+    int ret;
+
+    ret = event_base_dispatch(m_eventBase);
+    //    ret = event_base_loop(m_eventBase, EVLOOP_NONBLOCK);
+
+    if (ret == 1) {
+      // no event
+      std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    }
+  }
+}
+
+#define OPT_UNLOCK_CALLBACKS (BEV_OPT_DEFER_CALLBACKS | BEV_OPT_UNLOCK_CALLBACKS)
+
+BufferEvent* EventLoop::createBufferEvent(socket_t fd, int options) {
+  struct bufferevent* event = bufferevent_socket_new(m_eventBase, fd, options);
+  if (event == nullptr) {
+    return nullptr;
+  }
+
+  bool unlock = (options & OPT_UNLOCK_CALLBACKS) == OPT_UNLOCK_CALLBACKS;
+
+  return new BufferEvent(event, unlock);
+}
+
+BufferEvent::BufferEvent(struct bufferevent* event, bool unlockCallbacks)
+    : m_bufferEvent(event),
+      m_unlockCallbacks(unlockCallbacks),
+      m_readCallback(nullptr),
+      m_writeCallback(nullptr),
+      m_eventCallback(nullptr),
+      m_callbackTransport() {
+#ifdef ROCKETMQ_BUFFEREVENT_PROXY_ALL_CALLBACK
+  if (m_bufferEvent != nullptr) {
+    bufferevent_setcb(m_bufferEvent, read_callback, write_callback, event_callback, this);
+  }
+#endif  // ROCKETMQ_BUFFEREVENT_PROXY_ALL_CALLBACK
+}
+
+BufferEvent::~BufferEvent() {
+  if (m_bufferEvent != nullptr) {
+    // free function will set all callbacks to NULL first.
+    bufferevent_free(m_bufferEvent);
+    m_bufferEvent = nullptr;
+  }
+}
+
+void BufferEvent::setCallback(BufferEventDataCallback readCallback,
+                              BufferEventDataCallback writeCallback,
+                              BufferEventEventCallback eventCallback,
+                              std::shared_ptr<TcpTransport> transport) {
+  // use lock in bufferevent
+  bufferevent_lock(m_bufferEvent);
+
+  // wrap callback
+  m_readCallback = readCallback;
+  m_writeCallback = writeCallback;
+  m_eventCallback = eventCallback;
+  m_callbackTransport = transport;
+
+#ifndef ROCKETMQ_BUFFEREVENT_PROXY_ALL_CALLBACK
+  bufferevent_data_cb readcb = readCallback != nullptr ? read_callback : nullptr;
+  bufferevent_data_cb writecb = writeCallback != nullptr ? write_callback : nullptr;
+  bufferevent_event_cb eventcb = eventCallback != nullptr ? event_callback : nullptr;
+
+  bufferevent_setcb(m_bufferEvent, readcb, writecb, eventcb, this);
+#endif  // ROCKETMQ_BUFFEREVENT_PROXY_ALL_CALLBACK
+
+  bufferevent_unlock(m_bufferEvent);
+}
+
+void BufferEvent::read_callback(struct bufferevent* bev, void* ctx) {
+  auto event = static_cast<BufferEvent*>(ctx);
+
+  if (event->m_unlockCallbacks)
+    bufferevent_lock(event->m_bufferEvent);
+
+  BufferEventDataCallback callback = event->m_readCallback;
+  std::shared_ptr<TcpTransport> transport = event->m_callbackTransport.lock();
+
+  if (event->m_unlockCallbacks)
+    bufferevent_unlock(event->m_bufferEvent);
+
+  if (callback) {
+    callback(event, transport.get());
+  }
+}
+
+void BufferEvent::write_callback(struct bufferevent* bev, void* ctx) {
+  auto event = static_cast<BufferEvent*>(ctx);
+
+  if (event->m_unlockCallbacks)
+    bufferevent_lock(event->m_bufferEvent);
+
+  BufferEventDataCallback callback = event->m_writeCallback;
+  std::shared_ptr<TcpTransport> transport = event->m_callbackTransport.lock();
+
+  if (event->m_unlockCallbacks)
+    bufferevent_unlock(event->m_bufferEvent);
+
+  if (callback) {
+    callback(event, transport.get());
+  }
+}
+
+static std::string buildPeerAddrPort(socket_t fd) {
+  sockaddr_in addr;
+  socklen_t len = sizeof(addr);
+
+  getpeername(fd, (struct sockaddr*)&addr, &len);
+
+  LOG_DEBUG("socket: %d, addr: %s, port: %d", fd, inet_ntoa(addr.sin_addr), ntohs(addr.sin_port));
+  std::string addrPort(inet_ntoa(addr.sin_addr));
+  addrPort.append(":");
+  addrPort.append(UtilAll::to_string(ntohs(addr.sin_port)));
+
+  return addrPort;
+}
+
+void BufferEvent::event_callback(struct bufferevent* bev, short what, void* ctx) {
+  auto event = static_cast<BufferEvent*>(ctx);
+
+  if (what & BEV_EVENT_CONNECTED) {
+    socket_t fd = event->getfd();
+    event->m_peerAddrPort = buildPeerAddrPort(fd);
+  }
+
+  if (event->m_unlockCallbacks)
+    bufferevent_lock(event->m_bufferEvent);
+
+  BufferEventEventCallback callback = event->m_eventCallback;
+  std::shared_ptr<TcpTransport> transport = event->m_callbackTransport.lock();
+
+  if (event->m_unlockCallbacks)
+    bufferevent_unlock(event->m_bufferEvent);
+
+  if (callback) {
+    callback(event, what, transport.get());
+  }
+}
+
+}  // namespace rocketmq
diff --git a/src/transport/EventLoop.h b/src/transport/EventLoop.h
new file mode 100644
index 0000000..c974479
--- /dev/null
+++ b/src/transport/EventLoop.h
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __EVENTLOOP_H__
+#define __EVENTLOOP_H__
+
+#include <memory>
+#include <thread>
+
+#include <event2/buffer.h>
+#include <event2/bufferevent.h>
+#include <event2/event.h>
+
+#include "noncopyable.h"
+
+using socket_t = evutil_socket_t;
+
+namespace rocketmq {
+
+class BufferEvent;
+
+class EventLoop : public noncopyable {
+ public:
+  static EventLoop* GetDefaultEventLoop();
+
+ public:
+  explicit EventLoop(const struct event_config* config = nullptr, bool run_immediately = true);
+  virtual ~EventLoop();
+
+  void start();
+  void stop();
+
+  BufferEvent* createBufferEvent(socket_t fd, int options);
+
+ private:
+  void runLoop();
+
+ private:
+  struct event_base* m_eventBase;
+  std::thread* m_loopThread;
+
+  bool _is_running;  // atomic is unnecessary
+};
+
+class TcpTransport;
+
+using BufferEventDataCallback = void (*)(BufferEvent* event, TcpTransport* transport);
+using BufferEventEventCallback = void (*)(BufferEvent* event, short what, TcpTransport* transport);
+
+class BufferEvent : public noncopyable {
+ public:
+  virtual ~BufferEvent();
+
+  void setCallback(BufferEventDataCallback readCallback,
+                   BufferEventDataCallback writeCallback,
+                   BufferEventEventCallback eventCallback,
+                   std::shared_ptr<TcpTransport> transport);
+
+  void setWatermark(short events, size_t lowmark, size_t highmark) {
+    bufferevent_setwatermark(m_bufferEvent, events, lowmark, highmark);
+  }
+
+  int enable(short event) { return bufferevent_enable(m_bufferEvent, event); }
+
+  int connect(const struct sockaddr* addr, int socklen) {
+    return bufferevent_socket_connect(m_bufferEvent, (struct sockaddr*)addr, socklen);
+  }
+
+  int write(const void* data, size_t size) { return bufferevent_write(m_bufferEvent, data, size); }
+
+  size_t read(void* data, size_t size) { return bufferevent_read(m_bufferEvent, data, size); }
+
+  struct evbuffer* getInput() {
+    return bufferevent_get_input(m_bufferEvent);
+  }
+
+  socket_t getfd() const { return bufferevent_getfd(m_bufferEvent); }
+
+  std::string getPeerAddrPort() const { return m_peerAddrPort; }
+
+ private:
+  BufferEvent(struct bufferevent* event, bool unlockCallbacks);
+  friend EventLoop;
+
+  static void read_callback(struct bufferevent* bev, void* ctx);
+  static void write_callback(struct bufferevent* bev, void* ctx);
+  static void event_callback(struct bufferevent* bev, short what, void* ctx);
+
+ private:
+  struct bufferevent* m_bufferEvent;
+  const bool m_unlockCallbacks;
+
+  BufferEventDataCallback m_readCallback;
+  BufferEventDataCallback m_writeCallback;
+  BufferEventEventCallback m_eventCallback;
+  std::weak_ptr<TcpTransport> m_callbackTransport;  // avoid reference cycle
+
+  // cache properties
+  std::string m_peerAddrPort;
+};
+
+}  // namespace rocketmq
+
+#endif  //__EVENTLOOP_H__
diff --git a/src/transport/ResponseFuture.cpp b/src/transport/ResponseFuture.cpp
old mode 100644
new mode 100755
index cc10daa..b0b2613
--- a/src/transport/ResponseFuture.cpp
+++ b/src/transport/ResponseFuture.cpp
@@ -15,46 +15,42 @@
  * limitations under the License.
  */
 #include "ResponseFuture.h"
+
+#include <chrono>
+
 #include "Logging.h"
 #include "TcpRemotingClient.h"
 
 namespace rocketmq {
+
 //<!************************************************************************
 ResponseFuture::ResponseFuture(int requestCode,
                                int opaque,
                                TcpRemotingClient* powner,
                                int64 timeout,
-                               bool bAsync /* = false */,
-                               AsyncCallbackWrap* pcall /* = NULL */) {
-  m_bAsync.store(bAsync);
-  m_requestCode = requestCode;
-  m_opaque = opaque;
-  m_timeout = timeout;
-  m_pCallbackWrap = pcall;
-  m_pResponseCommand = NULL;
-  m_sendRequestOK = false;
-  m_maxRetrySendTimes = 1;
-  m_retrySendTimes = 1;
+                               bool bAsync,
+                               AsyncCallbackWrap* pCallback)
+    : m_requestCode(requestCode),
+      m_opaque(opaque),
+      m_timeout(timeout),
+      m_bAsync(bAsync),
+      m_pCallbackWrap(pCallback),
+      m_asyncCallbackStatus(ASYNC_CALLBACK_STATUS_INIT),
+      m_haveResponse(false),
+      m_sendRequestOK(false),
+      m_pResponseCommand(nullptr),
+      m_maxRetrySendTimes(1),
+      m_retrySendTimes(1) {
   m_brokerAddr = "";
   m_beginTimestamp = UtilAll::currentTimeMillis();
-  m_asyncCallbackStatus = asyncCallBackStatus_init;
-  if (getASyncFlag()) {
-    m_asyncResponse.store(false);
-    m_syncResponse.store(true);
-  } else {
-    m_asyncResponse.store(true);
-    m_syncResponse.store(false);
-  }
 }
 
 ResponseFuture::~ResponseFuture() {
   deleteAndZero(m_pCallbackWrap);
   /*
-    do not set m_pResponseCommand to NULL when destruct, as m_pResponseCommand
-    is used by MQClientAPIImpl concurrently, and will be released by producer or
-    consumer;
-    m_pResponseCommand = NULL;
-  */
+    do not delete m_pResponseCommand when destruct, as m_pResponseCommand
+    is used by MQClientAPIImpl concurrently, and will be released by producer or consumer;
+   */
 }
 
 void ResponseFuture::releaseThreadCondition() {
@@ -62,55 +58,41 @@ void ResponseFuture::releaseThreadCondition() {
 }
 
 RemotingCommand* ResponseFuture::waitResponse(int timeoutMillis) {
-  boost::unique_lock<boost::mutex> lk(m_defaultEventLock);
-  if (!m_defaultEvent.timed_wait(lk, boost::posix_time::milliseconds(timeoutMillis))) {
-    LOG_WARN("waitResponse of code:%d with opaque:%d timeout", m_requestCode, m_opaque);
-    m_syncResponse.store(true);
+  std::unique_lock<std::mutex> eventLock(m_defaultEventLock);
+  if (!m_haveResponse) {
+    if (timeoutMillis <= 0) {
+      timeoutMillis = m_timeout;
+    }
+    if (m_defaultEvent.wait_for(eventLock, std::chrono::milliseconds(timeoutMillis)) == std::cv_status::timeout) {
+      LOG_WARN("waitResponse of code:%d with opaque:%d timeout", m_requestCode, m_opaque);
+      m_haveResponse = true;
+    }
   }
   return m_pResponseCommand;
 }
 
-void ResponseFuture::setResponse(RemotingCommand* pResponseCommand) {
-  // LOG_DEBUG("setResponse of opaque:%d",m_opaque);
-  m_pResponseCommand = pResponseCommand;
+bool ResponseFuture::setResponse(RemotingCommand* pResponseCommand) {
+  std::unique_lock<std::mutex> eventLock(m_defaultEventLock);
 
-  if (!getASyncFlag()) {
-    if (m_syncResponse.load() == false) {
-      m_defaultEvent.notify_all();
-      m_syncResponse.store(true);
-    }
+  if (m_haveResponse) {
+    return false;
   }
-}
 
-const bool ResponseFuture::getSyncResponseFlag() {
-  if (m_syncResponse.load() == true) {
-    return true;
-  }
-  return false;
-}
+  m_pResponseCommand = pResponseCommand;
+  m_haveResponse = true;
 
-const bool ResponseFuture::getAsyncResponseFlag() {
-  if (m_asyncResponse.load() == true) {
-    // LOG_DEBUG("ASYNC flag is TRUE,opaque is:%d",getOpaque() );
-    return true;
+  if (!getAsyncFlag()) {
+    m_defaultEvent.notify_all();
   }
 
-  return false;
-}
-
-void ResponseFuture::setAsyncResponseFlag() {
-  m_asyncResponse.store(true);
+  return true;
 }
 
-const bool ResponseFuture::getASyncFlag() {
-  if (m_bAsync.load() == true) {
-    // LOG_DEBUG("ASYNC flag is TRUE,opaque is:%d",getOpaque() );
-    return true;
-  }
-  return false;
+const bool ResponseFuture::getAsyncFlag() {
+  return m_bAsync;
 }
 
-bool ResponseFuture::isSendRequestOK() {
+bool ResponseFuture::isSendRequestOK() const {
   return m_sendRequestOK;
 }
 
@@ -126,58 +108,32 @@ int ResponseFuture::getRequestCode() const {
   return m_requestCode;
 }
 
-void ResponseFuture::setAsyncCallBackStatus(asyncCallBackStatus asyncCallbackStatus) {
-  boost::lock_guard<boost::mutex> lock(m_asyncCallbackLock);
-  if (m_asyncCallbackStatus == asyncCallBackStatus_init) {
-    m_asyncCallbackStatus = asyncCallbackStatus;
-  }
-}
-
-void ResponseFuture::executeInvokeCallback() {
-  if (m_pCallbackWrap == NULL) {
+void ResponseFuture::invokeCompleteCallback() {
+  if (m_pCallbackWrap == nullptr) {
     deleteAndZero(m_pResponseCommand);
     return;
   } else {
-    if (m_asyncCallbackStatus == asyncCallBackStatus_response) {
-      m_pCallbackWrap->operationComplete(this, true);
-    } else {
-      if (m_pResponseCommand)
-        deleteAndZero(m_pResponseCommand);  // the responseCommand from
-                                            // RemotingCommand::Decode(mem) will
-                                            // only deleted by operationComplete
-                                            // automatically
-      LOG_WARN(
-          "timeout and response incoming concurrently of opaque:%d, and "
-          "executeInvokeCallbackException was called earlier",
-          m_opaque);
-    }
+    m_pCallbackWrap->operationComplete(this, true);
   }
 }
 
-void ResponseFuture::executeInvokeCallbackException() {
-  if (m_pCallbackWrap == NULL) {
+void ResponseFuture::invokeExceptionCallback() {
+  if (m_pCallbackWrap == nullptr) {
     LOG_ERROR("m_pCallbackWrap is NULL, critical error");
     return;
   } else {
-    if (m_asyncCallbackStatus == asyncCallBackStatus_timeout) {
-      // here no need retrySendTimes process because of it have timeout
-      LOG_ERROR("send msg, callback timeout, opaque:%d, sendTimes:%d, maxRetryTimes:%d", getOpaque(),
-                getRetrySendTimes(), getMaxRetrySendTimes());
-
-      m_pCallbackWrap->onException();
-    } else {
-      LOG_WARN(
-          "timeout and response incoming concurrently of opaque:%d, and "
-          "executeInvokeCallback was called earlier",
-          m_opaque);
-    }
+    // no retrySendTimes handling is needed here because the request has already timed out
+    LOG_ERROR("send msg, callback timeout, opaque:%d, sendTimes:%d, maxRetryTimes:%d", getOpaque(), getRetrySendTimes(),
+              getMaxRetrySendTimes());
+
+    m_pCallbackWrap->onException();
   }
 }
 
 bool ResponseFuture::isTimeOut() const {
   int64 diff = UtilAll::currentTimeMillis() - m_beginTimestamp;
   //<!only async;
-  return m_bAsync.load() == 1 && diff > m_timeout;
+  return m_bAsync && diff > m_timeout;
 }
 
 int ResponseFuture::getMaxRetrySendTimes() const {
@@ -197,16 +153,17 @@ void ResponseFuture::setRetrySendTimes(int retryTimes) {
 void ResponseFuture::setBrokerAddr(const std::string& brokerAddr) {
   m_brokerAddr = brokerAddr;
 }
+
+std::string ResponseFuture::getBrokerAddr() const {
+  return m_brokerAddr;
+}
+
 void ResponseFuture::setRequestCommand(const RemotingCommand& requestCommand) {
   m_requestCommand = requestCommand;
 }
-
 const RemotingCommand& ResponseFuture::getRequestCommand() {
   return m_requestCommand;
 }
-std::string ResponseFuture::getBrokerAddr() const {
-  return m_brokerAddr;
-}
 
 int64 ResponseFuture::leftTime() const {
   int64 diff = UtilAll::currentTimeMillis() - m_beginTimestamp;
@@ -222,4 +179,4 @@ AsyncCallbackWrap* ResponseFuture::getAsyncCallbackWrap() {
 }
 
 //<!************************************************************************
-}  //<!end namespace;
+}  // namespace rocketmq
diff --git a/src/transport/ResponseFuture.h b/src/transport/ResponseFuture.h
old mode 100644
new mode 100755
index d6f6b7a..66be663
--- a/src/transport/ResponseFuture.h
+++ b/src/transport/ResponseFuture.h
@@ -16,19 +16,21 @@
  */
 #ifndef __RESPONSEFUTURE_H__
 #define __RESPONSEFUTURE_H__
-#include <boost/atomic.hpp>
-#include <boost/thread/condition_variable.hpp>
+
+#include <atomic>
+#include <condition_variable>
+
 #include "AsyncCallbackWrap.h"
 #include "RemotingCommand.h"
 #include "UtilAll.h"
 
 namespace rocketmq {
 
-typedef enum asyncCallBackStatus {
-  asyncCallBackStatus_init = 0,
-  asyncCallBackStatus_response = 1,
-  asyncCallBackStatus_timeout = 2
-} asyncCallBackStatus;
+typedef enum AsyncCallbackStatus {
+  ASYNC_CALLBACK_STATUS_INIT = 0,
+  ASYNC_CALLBACK_STATUS_RESPONSE = 1,
+  ASYNC_CALLBACK_STATUS_TIMEOUT = 2
+} AsyncCallbAackStatus;
 
 class TcpRemotingClient;
 //<!***************************************************************************
@@ -39,32 +41,30 @@ class ResponseFuture {
                  TcpRemotingClient* powner,
                  int64 timeoutMilliseconds,
                  bool bAsync = false,
-                 AsyncCallbackWrap* pcall = NULL);
+                 AsyncCallbackWrap* pCallback = nullptr);
   virtual ~ResponseFuture();
+
   void releaseThreadCondition();
-  RemotingCommand* waitResponse(int timeoutMillis);
+  RemotingCommand* waitResponse(int timeoutMillis = 0);
   RemotingCommand* getCommand() const;
 
-  void setResponse(RemotingCommand* pResponseCommand);
-  bool isSendRequestOK();
+  bool setResponse(RemotingCommand* pResponseCommand);
+
+  bool isSendRequestOK() const;
   void setSendRequestOK(bool sendRequestOK);
   int getRequestCode() const;
   int getOpaque() const;
 
   //<!callback;
-  void executeInvokeCallback();
-  void executeInvokeCallbackException();
+  void invokeCompleteCallback();
+  void invokeExceptionCallback();
   bool isTimeOut() const;
   int getMaxRetrySendTimes() const;
   int getRetrySendTimes() const;
   int64 leftTime() const;
-  // bool    isTimeOutMoreThan30s() const;
-  const bool getASyncFlag();
-  void setAsyncResponseFlag();
-  const bool getAsyncResponseFlag();
-  const bool getSyncResponseFlag();
+  const bool getAsyncFlag();
   AsyncCallbackWrap* getAsyncCallbackWrap();
-  void setAsyncCallBackStatus(asyncCallBackStatus asyncCallbackStatus);
+
   void setMaxRetrySendTimes(int maxRetryTimes);
   void setRetrySendTimes(int retryTimes);
   void setBrokerAddr(const std::string& brokerAddr);
@@ -75,26 +75,30 @@ class ResponseFuture {
  private:
   int m_requestCode;
   int m_opaque;
-  bool m_sendRequestOK;
-  boost::mutex m_defaultEventLock;
-  boost::condition_variable_any m_defaultEvent;
-  int64 m_beginTimestamp;
   int64 m_timeout;  // ms
-  boost::atomic<bool> m_bAsync;
-  RemotingCommand* m_pResponseCommand;  //<!delete outside;
+
+  const bool m_bAsync;
   AsyncCallbackWrap* m_pCallbackWrap;
-  boost::mutex m_asyncCallbackLock;
-  asyncCallBackStatus m_asyncCallbackStatus;
-  boost::atomic<bool> m_asyncResponse;
-  boost::atomic<bool> m_syncResponse;
+
+  AsyncCallbackStatus m_asyncCallbackStatus;
+  std::mutex m_asyncCallbackLock;
+
+  bool m_haveResponse;
+  std::mutex m_defaultEventLock;
+  std::condition_variable m_defaultEvent;
+
+  int64 m_beginTimestamp;
+  bool m_sendRequestOK;
+  RemotingCommand* m_pResponseCommand;  //<!delete outside;
 
   int m_maxRetrySendTimes;
   int m_retrySendTimes;
+
   std::string m_brokerAddr;
   RemotingCommand m_requestCommand;
-  // TcpRemotingClient*    m_tcpRemoteClient;
 };
+
 //<!************************************************************************
-}  //<!end namespace;
+}  // namespace rocketmq
 
 #endif
diff --git a/src/transport/TcpRemotingClient.cpp b/src/transport/TcpRemotingClient.cpp
old mode 100644
new mode 100755
index 8256391..2f2ab46
--- a/src/transport/TcpRemotingClient.cpp
+++ b/src/transport/TcpRemotingClient.cpp
@@ -19,6 +19,7 @@
 #if !defined(WIN32) && !defined(__APPLE__)
 #include <sys/prctl.h>
 #endif
+
 #include "Logging.h"
 #include "MemoryOutputStream.h"
 #include "TopAddressing.h"
@@ -35,27 +36,31 @@ TcpRemotingClient::TcpRemotingClient(int pullThreadNum, uint64_t tcpConnectTimeo
       m_ioServiceWork(m_ioService) {
 #if !defined(WIN32) && !defined(__APPLE__)
   string taskName = UtilAll::getProcessName();
-  prctl(PR_SET_NAME, "networkTP", 0, 0, 0);
+  prctl(PR_SET_NAME, "NetworkTP", 0, 0, 0);
 #endif
-  for (int i = 0; i != pullThreadNum; ++i) {
+  for (int i = 0; i != m_pullThreadNum; ++i) {
     m_threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &m_ioService));
   }
 #if !defined(WIN32) && !defined(__APPLE__)
   prctl(PR_SET_NAME, taskName.c_str(), 0, 0, 0);
 #endif
-  LOG_INFO(
-      "m_tcpConnectTimeout:%ju, m_tcpTransportTryLockTimeout:%ju, "
-      "m_pullThreadNum:%d",
-      m_tcpConnectTimeout, m_tcpTransportTryLockTimeout, m_pullThreadNum);
+
+  LOG_INFO("m_tcpConnectTimeout:%ju, m_tcpTransportTryLockTimeout:%ju, m_pullThreadNum:%d", m_tcpConnectTimeout,
+           m_tcpTransportTryLockTimeout, m_pullThreadNum);
+
   m_async_service_thread.reset(new boost::thread(boost::bind(&TcpRemotingClient::boost_asio_work, this)));
 }
 
 void TcpRemotingClient::boost_asio_work() {
-  LOG_INFO("TcpRemotingClient::boost asio async service runing");
-  boost::asio::io_service::work work(m_async_ioService);  // avoid async io
-                                                          // service stops after
-                                                          // first timer timeout
-                                                          // callback
+  LOG_INFO("TcpRemotingClient::boost asio async service running");
+
+#if !defined(WIN32) && !defined(__APPLE__)
+  prctl(PR_SET_NAME, "RemotingAsioT", 0, 0, 0);
+#endif
+
+  // avoid async io service stops after first timer timeout callback
+  boost::asio::io_service::work work(m_async_ioService);
+
   m_async_ioService.run();
 }
 
@@ -69,15 +74,15 @@ TcpRemotingClient::~TcpRemotingClient() {
 
 void TcpRemotingClient::stopAllTcpTransportThread() {
   LOG_DEBUG("TcpRemotingClient::stopAllTcpTransportThread Begin");
+
   m_async_ioService.stop();
   m_async_service_thread->interrupt();
   m_async_service_thread->join();
   removeAllTimerCallback();
 
   {
-    TcpMap::iterator it = m_tcpTable.begin();
-    for (; it != m_tcpTable.end(); ++it) {
-      it->second->disconnect(it->first);
+    for (const auto& trans : m_tcpTable) {
+      trans.second->disconnect(trans.first);
     }
     m_tcpTable.clear();
   }
@@ -86,62 +91,66 @@ void TcpRemotingClient::stopAllTcpTransportThread() {
   m_threadpool.join_all();
 
   {
-    boost::lock_guard<boost::mutex> lock(m_futureTableMutex);
-    for (ResMap::iterator it = m_futureTable.begin(); it != m_futureTable.end(); ++it) {
-      if (it->second)
-        it->second->releaseThreadCondition();
+    std::lock_guard<std::mutex> lock(m_futureTableLock);
+    for (const auto& future : m_futureTable) {
+      if (future.second)
+        future.second->releaseThreadCondition();
     }
   }
+
   LOG_DEBUG("TcpRemotingClient::stopAllTcpTransportThread End");
 }
 
 void TcpRemotingClient::updateNameServerAddressList(const string& addrs) {
   LOG_INFO("updateNameServerAddressList: [%s]", addrs.c_str());
-  if (!addrs.empty()) {
-    boost::unique_lock<boost::timed_mutex> lock(m_namesrvlock, boost::try_to_lock);
-    if (!lock.owns_lock()) {
-      if (!lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(10))) {
-        LOG_ERROR("updateNameServerAddressList get timed_mutex timeout");
-        return;
-      }
+
+  if (addrs.empty()) {
+    return;
+  }
+
+  std::unique_lock<std::timed_mutex> lock(m_namesrvLock, std::try_to_lock);
+  if (!lock.owns_lock()) {
+    if (!lock.try_lock_for(std::chrono::seconds(10))) {
+      LOG_ERROR("updateNameServerAddressList get timed_mutex timeout");
+      return;
     }
-    // clear first;
-    m_namesrvAddrList.clear();
-
-    vector<string> out;
-    UtilAll::Split(out, addrs, ";");
-    for (size_t i = 0; i < out.size(); i++) {
-      string addr = out[i];
-      UtilAll::Trim(addr);
-
-      string hostName;
-      short portNumber;
-      if (UtilAll::SplitURL(addr, hostName, portNumber)) {
-        LOG_INFO("update Namesrv:%s", addr.c_str());
-        m_namesrvAddrList.push_back(addr);
-      } else {
-        LOG_INFO("This may be invalid namer server: [%s]", addr.c_str());
-      }
+  }
+
+  // clear first;
+  m_namesrvAddrList.clear();
+
+  vector<string> out;
+  UtilAll::Split(out, addrs, ";");
+  for (auto addr : out) {
+    UtilAll::Trim(addr);
+
+    string hostName;
+    short portNumber;
+    if (UtilAll::SplitURL(addr, hostName, portNumber)) {
+      LOG_INFO("update Namesrv:%s", addr.c_str());
+      m_namesrvAddrList.push_back(addr);
+    } else {
+      LOG_INFO("This may be invalid namer server: [%s]", addr.c_str());
     }
-    out.clear();
   }
+  out.clear();
 }
 
-bool TcpRemotingClient::invokeHeartBeat(const string& addr, RemotingCommand& request) {
-  boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
-  if (pTcp != NULL) {
+bool TcpRemotingClient::invokeHeartBeat(const string& addr, RemotingCommand& request, int timeoutMillis) {
+  std::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
+  if (pTcp != nullptr) {
     int code = request.getCode();
     int opaque = request.getOpaque();
-    boost::shared_ptr<ResponseFuture> responseFuture(new ResponseFuture(code, opaque, this, 3000, false, NULL));
+
+    std::shared_ptr<ResponseFuture> responseFuture(new ResponseFuture(code, opaque, this, timeoutMillis));
     addResponseFuture(opaque, responseFuture);
-    // LOG_INFO("invokeHeartbeat success, addr:%s, code:%d, opaque:%d,
-    // timeoutms:%d", addr.c_str(), code, opaque, 3000);
 
     if (SendCommand(pTcp, request)) {
       responseFuture->setSendRequestOK(true);
-      unique_ptr<RemotingCommand> pRsp(responseFuture->waitResponse(3000));
-      if (pRsp == NULL) {
+      unique_ptr<RemotingCommand> pRsp(responseFuture->waitResponse());
+      if (pRsp == nullptr) {
         LOG_ERROR("wait response timeout of heartbeat, so closeTransport of addr:%s", addr.c_str());
+        // avoid responseFuture leak;
         findAndDeleteResponseFuture(opaque);
         CloseTransport(addr, pTcp);
         return false;
@@ -152,6 +161,7 @@ bool TcpRemotingClient::invokeHeartBeat(const string& addr, RemotingCommand& req
         return false;
       }
     } else {
+      // avoid responseFuture leak;
       findAndDeleteResponseFuture(opaque);
       CloseTransport(addr, pTcp);
     }
@@ -159,34 +169,28 @@ bool TcpRemotingClient::invokeHeartBeat(const string& addr, RemotingCommand& req
   return false;
 }
 
-RemotingCommand* TcpRemotingClient::invokeSync(const string& addr,
-                                               RemotingCommand& request,
-                                               int timeoutMillis /* = 3000 */) {
+RemotingCommand* TcpRemotingClient::invokeSync(const string& addr, RemotingCommand& request, int timeoutMillis) {
   LOG_DEBUG("InvokeSync:", addr.c_str());
-  boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
-  if (pTcp != NULL) {
+  std::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
+  if (pTcp != nullptr) {
     int code = request.getCode();
     int opaque = request.getOpaque();
-    boost::shared_ptr<ResponseFuture> responseFuture(
-        new ResponseFuture(code, opaque, this, timeoutMillis, false, NULL));
+
+    std::shared_ptr<ResponseFuture> responseFuture(new ResponseFuture(code, opaque, this, timeoutMillis));
     addResponseFuture(opaque, responseFuture);
 
     if (SendCommand(pTcp, request)) {
-      // LOG_INFO("invokeSync success, addr:%s, code:%d, opaque:%d,
-      // timeoutms:%d", addr.c_str(), code, opaque, timeoutMillis);
       responseFuture->setSendRequestOK(true);
-      RemotingCommand* pRsp = responseFuture->waitResponse(timeoutMillis);
-      if (pRsp == NULL) {
+      RemotingCommand* pRsp = responseFuture->waitResponse();
+      if (pRsp == nullptr) {
         if (code != GET_CONSUMER_LIST_BY_GROUP) {
-          LOG_WARN(
-              "wait response timeout or get NULL response of code:%d, so "
-              "closeTransport of addr:%s",
-              code, addr.c_str());
+          LOG_WARN("wait response timeout or get NULL response of code:%d, so closeTransport of addr:%s", code,
+                   addr.c_str());
           CloseTransport(addr, pTcp);
         }
         // avoid responseFuture leak;
         findAndDeleteResponseFuture(opaque);
-        return NULL;
+        return nullptr;
       } else {
         return pRsp;
       }
@@ -197,137 +201,130 @@ RemotingCommand* TcpRemotingClient::invokeSync(const string& addr,
     }
   }
   LOG_DEBUG("InvokeSync [%s] Failed: Cannot Get Transport.", addr.c_str());
-  return NULL;
+  return nullptr;
 }
 
 bool TcpRemotingClient::invokeAsync(const string& addr,
                                     RemotingCommand& request,
-                                    AsyncCallbackWrap* cbw,
-                                    int64 timeoutMilliseconds,
+                                    AsyncCallbackWrap* callback,
+                                    int64 timeoutMillis,
                                     int maxRetrySendTimes,
                                     int retrySendTimes) {
-  boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
-  if (pTcp != NULL) {
-    //<!not delete, for callback to delete;
+  std::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
+  if (pTcp != nullptr) {
     int code = request.getCode();
     int opaque = request.getOpaque();
-    boost::shared_ptr<ResponseFuture> responseFuture(
-        new ResponseFuture(code, opaque, this, timeoutMilliseconds, true, cbw));
+
+    // delete in callback
+    std::shared_ptr<ResponseFuture> responseFuture(
+        new ResponseFuture(code, opaque, this, timeoutMillis, true, callback));
     responseFuture->setMaxRetrySendTimes(maxRetrySendTimes);
     responseFuture->setRetrySendTimes(retrySendTimes);
     responseFuture->setBrokerAddr(addr);
     responseFuture->setRequestCommand(request);
     addAsyncResponseFuture(opaque, responseFuture);
-    if (cbw) {
+
+    if (callback) {
       boost::asio::deadline_timer* t =
-          new boost::asio::deadline_timer(m_async_ioService, boost::posix_time::milliseconds(timeoutMilliseconds));
+          new boost::asio::deadline_timer(m_async_ioService, boost::posix_time::milliseconds(timeoutMillis));
       addTimerCallback(t, opaque);
-      boost::system::error_code e;
-      t->async_wait(boost::bind(&TcpRemotingClient::handleAsyncPullForResponseTimeout, this, e, opaque));
+      t->async_wait(
+          boost::bind(&TcpRemotingClient::handleAsyncRequestTimeout, this, boost::asio::placeholders::error, opaque));
     }
 
-    if (SendCommand(pTcp, request))  // Even if send failed, asyncTimerThread
-                                     // will trigger next pull request or report
-                                     // send msg failed
-    {
+    // Even if send failed, asyncTimerThread will trigger next pull request or report send msg failed
+    if (SendCommand(pTcp, request)) {
       LOG_DEBUG("invokeAsync success, addr:%s, code:%d, opaque:%d", addr.c_str(), code, opaque);
       responseFuture->setSendRequestOK(true);
     }
     return true;
   }
+
   LOG_ERROR("invokeAsync failed of addr:%s", addr.c_str());
   return false;
 }
 
 void TcpRemotingClient::invokeOneway(const string& addr, RemotingCommand& request) {
   //<!not need callback;
-  boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
-  if (pTcp != NULL) {
+  std::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
+  if (pTcp != nullptr) {
     request.markOnewayRPC();
-    LOG_DEBUG("invokeOneway success, addr:%s, code:%d", addr.c_str(), request.getCode());
-    SendCommand(pTcp, request);
+    if (SendCommand(pTcp, request)) {
+      LOG_DEBUG("invokeOneway success. addr:%s, code:%d", addr.c_str(), request.getCode());
+    } else {
+      LOG_WARN("invokeOneway failed. addr:%s, code:%d", addr.c_str(), request.getCode());
+    }
+  } else {
+    LOG_WARN("invokeOneway failed: NULL transport. addr:%s, code:%d", addr.c_str(), request.getCode());
   }
 }
 
-boost::shared_ptr<TcpTransport> TcpRemotingClient::GetTransport(const string& addr, bool needRespons) {
+std::shared_ptr<TcpTransport> TcpRemotingClient::GetTransport(const string& addr, bool needResponse) {
   if (addr.empty()) {
     LOG_DEBUG("GetTransport of NameServer");
-    return CreateNameserverTransport(needRespons);
+    return CreateNameServerTransport(needResponse);
   }
-  return CreateTransport(addr, needRespons);
+  return CreateTransport(addr, needResponse);
 }
 
-boost::shared_ptr<TcpTransport> TcpRemotingClient::CreateTransport(const string& addr, bool needRespons) {
-  boost::shared_ptr<TcpTransport> tts;
+std::shared_ptr<TcpTransport> TcpRemotingClient::CreateTransport(const string& addr, bool needResponse) {
+  std::shared_ptr<TcpTransport> tts;
+
   {
     // try get m_tcpLock util m_tcpTransportTryLockTimeout to avoid blocking
-    // long
-    // time, if could not get m_tcpLock, return NULL
-    bool bGetMutex = false;
-    boost::unique_lock<boost::timed_mutex> lock(m_tcpLock, boost::try_to_lock);
+    // long time, if could not get m_tcpLock, return NULL
+    std::unique_lock<std::timed_mutex> lock(m_tcpTableLock, std::try_to_lock);
     if (!lock.owns_lock()) {
-      if (!lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
+      if (!lock.try_lock_for(std::chrono::seconds(m_tcpTransportTryLockTimeout))) {
         LOG_ERROR("GetTransport of:%s get timed_mutex timeout", addr.c_str());
-        boost::shared_ptr<TcpTransport> pTcp;
+        std::shared_ptr<TcpTransport> pTcp;
         return pTcp;
-      } else {
-        bGetMutex = true;
       }
-    } else {
-      bGetMutex = true;
     }
-    if (bGetMutex) {
-      if (m_tcpTable.find(addr) != m_tcpTable.end()) {
-        boost::weak_ptr<TcpTransport> weakPtcp(m_tcpTable[addr]);
-        boost::shared_ptr<TcpTransport> tcp = weakPtcp.lock();
-        if (tcp) {
-          tcpConnectStatus connectStatus = tcp->getTcpConnectStatus();
-          if (connectStatus == e_connectWaitResponse) {
-            boost::shared_ptr<TcpTransport> pTcp;
-            return pTcp;
-          } else if (connectStatus == e_connectFail) {
-            LOG_ERROR("tcpTransport with server disconnected, erase server:%s", addr.c_str());
-            tcp->disconnect(addr);  // avoid coredump when connection with broker was broken
-            m_tcpTable.erase(addr);
-          } else if (connectStatus == e_connectSuccess) {
-            return tcp;
-          } else {
-            LOG_ERROR(
-                "go to fault state, erase:%s from tcpMap, and reconnect "
-                "it",
-                addr.c_str());
-            m_tcpTable.erase(addr);
-          }
+
+    // check for reuse
+    if (m_tcpTable.find(addr) != m_tcpTable.end()) {
+      std::shared_ptr<TcpTransport> tcp = m_tcpTable[addr];
+
+      if (tcp) {
+        TcpConnectStatus connectStatus = tcp->getTcpConnectStatus();
+        if (connectStatus == TCP_CONNECT_STATUS_SUCCESS) {
+          return tcp;
+        } else if (connectStatus == TCP_CONNECT_STATUS_WAIT) {
+          std::shared_ptr<TcpTransport> pTcp;
+          return pTcp;
+        } else if (connectStatus == TCP_CONNECT_STATUS_FAILED) {
+          LOG_ERROR("tcpTransport with server disconnected, erase server:%s", addr.c_str());
+          tcp->disconnect(addr);  // avoid coredump when connection with broker was broken
+          m_tcpTable.erase(addr);
+        } else {
+          LOG_ERROR("go to fault state, erase:%s from tcpMap, and reconnect it", addr.c_str());
+          m_tcpTable.erase(addr);
         }
       }
+    }
 
-      //<!callback;
-      READ_CALLBACK callback = needRespons ? &TcpRemotingClient::static_messageReceived : NULL;
+    //<!callback;
+    TcpTransportReadCallback callback = needResponse ? &TcpRemotingClient::static_messageReceived : nullptr;
 
-      tts.reset(new TcpTransport(this, callback));
-      tcpConnectStatus connectStatus = tts->connect(addr, m_tcpConnectTimeout);
-      if (connectStatus != e_connectWaitResponse) {
-        LOG_WARN("can not connect to :%s", addr.c_str());
-        tts->disconnect(addr);
-        boost::shared_ptr<TcpTransport> pTcp;
-        return pTcp;
-      } else {
-        m_tcpTable[addr] = tts;  // even if connecting failed finally, this
-                                 // server transport will be erased by next
-                                 // CreateTransport
-      }
-    } else {
-      LOG_WARN("get tcpTransport mutex failed :%s", addr.c_str());
-      boost::shared_ptr<TcpTransport> pTcp;
+    tts = TcpTransport::CreateTransport(this, callback);
+    TcpConnectStatus connectStatus = tts->connect(addr, 0);  // use non-block
+    if (connectStatus != TCP_CONNECT_STATUS_WAIT) {
+      LOG_WARN("can not connect to:%s", addr.c_str());
+      tts->disconnect(addr);
+      std::shared_ptr<TcpTransport> pTcp;
       return pTcp;
+    } else {
+      // even if connecting failed finally, this server transport will be erased by next CreateTransport
+      m_tcpTable[addr] = tts;
     }
   }
 
-  tcpConnectStatus connectStatus = tts->waitTcpConnectEvent(m_tcpConnectTimeout);
-  if (connectStatus != e_connectSuccess) {
+  TcpConnectStatus connectStatus = tts->waitTcpConnectEvent(static_cast<int>(m_tcpConnectTimeout));
+  if (connectStatus != TCP_CONNECT_STATUS_SUCCESS) {
     LOG_WARN("can not connect to server:%s", addr.c_str());
     tts->disconnect(addr);
-    boost::shared_ptr<TcpTransport> pTcp;
+    std::shared_ptr<TcpTransport> pTcp;
     return pTcp;
   } else {
     LOG_INFO("connect server with addr:%s success", addr.c_str());
@@ -335,165 +332,122 @@ boost::shared_ptr<TcpTransport> TcpRemotingClient::CreateTransport(const string&
   }
 }
 
-boost::shared_ptr<TcpTransport> TcpRemotingClient::CreateNameserverTransport(bool needRespons) {
+std::shared_ptr<TcpTransport> TcpRemotingClient::CreateNameServerTransport(bool needResponse) {
   // m_namesrvLock was added to avoid operation of nameServer was blocked by
   // m_tcpLock, it was used by single Thread mostly, so no performance impact
-  // try get m_tcpLock util m_tcpTransportTryLockTimeout to avoid blocking long
+  // try get m_namesrvLock until m_tcpTransportTryLockTimeout to avoid blocking long
   // time, if could not get m_namesrvlock, return NULL
   LOG_DEBUG("--CreateNameserverTransport--");
-  bool bGetMutex = false;
-  boost::unique_lock<boost::timed_mutex> lock(m_namesrvlock, boost::try_to_lock);
+  std::unique_lock<std::timed_mutex> lock(m_namesrvLock, std::try_to_lock);
   if (!lock.owns_lock()) {
-    if (!lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
+    if (!lock.try_lock_for(std::chrono::seconds(m_tcpTransportTryLockTimeout))) {
       LOG_ERROR("CreateNameserverTransport get timed_mutex timeout");
-      boost::shared_ptr<TcpTransport> pTcp;
+      std::shared_ptr<TcpTransport> pTcp;
       return pTcp;
-    } else {
-      bGetMutex = true;
     }
-  } else {
-    bGetMutex = true;
   }
 
-  if (bGetMutex) {
-    if (!m_namesrvAddrChoosed.empty()) {
-      boost::shared_ptr<TcpTransport> pTcp = GetTransport(m_namesrvAddrChoosed, true);
-      if (pTcp)
-        return pTcp;
-      else
-        m_namesrvAddrChoosed.clear();
-    }
+  if (!m_namesrvAddrChoosed.empty()) {
+    std::shared_ptr<TcpTransport> pTcp = CreateTransport(m_namesrvAddrChoosed, true);
+    if (pTcp)
+      return pTcp;
+    else
+      m_namesrvAddrChoosed.clear();
+  }
 
-    vector<string>::iterator itp = m_namesrvAddrList.begin();
-    for (; itp != m_namesrvAddrList.end(); ++itp) {
-      unsigned int index = m_namesrvIndex % m_namesrvAddrList.size();
-      if (m_namesrvIndex == numeric_limits<unsigned int>::max())
-        m_namesrvIndex = 0;
-      m_namesrvIndex++;
-      LOG_INFO("namesrvIndex is:%d, index:%d, namesrvaddrlist size:" SIZET_FMT "", m_namesrvIndex, index,
-               m_namesrvAddrList.size());
-      boost::shared_ptr<TcpTransport> pTcp = GetTransport(m_namesrvAddrList[index], true);
-      if (pTcp) {
-        m_namesrvAddrChoosed = m_namesrvAddrList[index];
-        return pTcp;
-      }
+  for (int i = 0; i < m_namesrvAddrList.size(); i++) {
+    unsigned int index = m_namesrvIndex++ % m_namesrvAddrList.size();
+    LOG_INFO("namesrvIndex is:%d, index:%d, namesrvaddrlist size:" SIZET_FMT "", m_namesrvIndex, index,
+             m_namesrvAddrList.size());
+    std::shared_ptr<TcpTransport> pTcp = CreateTransport(m_namesrvAddrList[index], true);
+    if (pTcp) {
+      m_namesrvAddrChoosed = m_namesrvAddrList[index];
+      return pTcp;
     }
-    boost::shared_ptr<TcpTransport> pTcp;
-    return pTcp;
-  } else {
-    LOG_WARN("get nameServer tcpTransport mutex failed");
-    boost::shared_ptr<TcpTransport> pTcp;
-    return pTcp;
   }
+
+  std::shared_ptr<TcpTransport> pTcp;
+  return pTcp;
 }
 
-void TcpRemotingClient::CloseTransport(const string& addr, boost::shared_ptr<TcpTransport> pTcp) {
+bool TcpRemotingClient::CloseTransport(const string& addr, std::shared_ptr<TcpTransport> pTcp) {
   if (addr.empty()) {
     return CloseNameServerTransport(pTcp);
   }
 
-  bool bGetMutex = false;
-  boost::unique_lock<boost::timed_mutex> lock(m_tcpLock, boost::try_to_lock);
+  std::unique_lock<std::timed_mutex> lock(m_tcpTableLock, std::try_to_lock);
   if (!lock.owns_lock()) {
-    if (!lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
+    if (!lock.try_lock_for(std::chrono::seconds(m_tcpTransportTryLockTimeout))) {
       LOG_ERROR("CloseTransport of:%s get timed_mutex timeout", addr.c_str());
-      return;
-    } else {
-      bGetMutex = true;
+      return false;
     }
-  } else {
-    bGetMutex = true;
   }
+
   LOG_ERROR("CloseTransport of:%s", addr.c_str());
-  if (bGetMutex) {
-    bool removeItemFromTable = true;
-    if (m_tcpTable.find(addr) != m_tcpTable.end()) {
-      if (m_tcpTable[addr]->getStartTime() != pTcp->getStartTime()) {
-        LOG_INFO(
-            "tcpTransport with addr:%s has been closed before, and has been "
-            "created again, nothing to do",
-            addr.c_str());
-        removeItemFromTable = false;
-      }
-    } else {
-      LOG_INFO("tcpTransport with addr:%s had been removed from tcpTable before", addr.c_str());
-      removeItemFromTable = false;
-    }
 
-    if (removeItemFromTable == true) {
-      LOG_WARN("closeTransport: disconnect broker:%s with state:%d", addr.c_str(),
-               m_tcpTable[addr]->getTcpConnectStatus());
-      if (m_tcpTable[addr]->getTcpConnectStatus() == e_connectSuccess)
-        m_tcpTable[addr]->disconnect(addr);  // avoid coredump when connection with server was broken
-      LOG_WARN("closeTransport: erase broker: %s", addr.c_str());
-      m_tcpTable.erase(addr);
+  bool removeItemFromTable = true;
+  if (m_tcpTable.find(addr) != m_tcpTable.end()) {
+    if (m_tcpTable[addr]->getStartTime() != pTcp->getStartTime()) {
+      LOG_INFO("tcpTransport with addr:%s has been closed before, and has been created again, nothing to do",
+               addr.c_str());
+      removeItemFromTable = false;
     }
   } else {
-    LOG_WARN("CloseTransport::get tcpTransport mutex failed:%s", addr.c_str());
-    return;
+    LOG_INFO("tcpTransport with addr:%s had been removed from tcpTable before", addr.c_str());
+    removeItemFromTable = false;
   }
+
+  if (removeItemFromTable) {
+    LOG_WARN("closeTransport: disconnect:%s with state:%d", addr.c_str(), m_tcpTable[addr]->getTcpConnectStatus());
+    if (m_tcpTable[addr]->getTcpConnectStatus() == TCP_CONNECT_STATUS_SUCCESS)
+      m_tcpTable[addr]->disconnect(addr);  // avoid coredump when connection with server was broken
+    LOG_WARN("closeTransport: erase broker: %s", addr.c_str());
+    m_tcpTable.erase(addr);
+  }
+
   LOG_ERROR("CloseTransport of:%s end", addr.c_str());
+
+  return removeItemFromTable;
 }
 
-void TcpRemotingClient::CloseNameServerTransport(boost::shared_ptr<TcpTransport> pTcp) {
-  bool bGetMutex = false;
-  boost::unique_lock<boost::timed_mutex> lock(m_namesrvlock, boost::try_to_lock);
+bool TcpRemotingClient::CloseNameServerTransport(std::shared_ptr<TcpTransport> pTcp) {
+  std::unique_lock<std::timed_mutex> lock(m_namesrvLock, std::try_to_lock);
   if (!lock.owns_lock()) {
-    if (!lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
-      LOG_ERROR("CreateNameserverTransport get timed_mutex timeout");
-      return;
-    } else {
-      bGetMutex = true;
+    if (!lock.try_lock_for(std::chrono::seconds(m_tcpTransportTryLockTimeout))) {
+      LOG_ERROR("CreateNameServerTransport get timed_mutex timeout");
+      return false;
     }
-  } else {
-    bGetMutex = true;
   }
-  if (bGetMutex) {
-    string addr = m_namesrvAddrChoosed;
-    bool removeItemFromTable = true;
-    if (m_tcpTable.find(addr) != m_tcpTable.end()) {
-      if (m_tcpTable[addr]->getStartTime() != pTcp->getStartTime()) {
-        LOG_INFO(
-            "tcpTransport with addr:%s has been closed before, and has been "
-            "created again, nothing to do",
-            addr.c_str());
-        removeItemFromTable = false;
-      }
-    } else {
-      LOG_INFO("tcpTransport with addr:%s had been removed from tcpTable before", addr.c_str());
-      removeItemFromTable = false;
-    }
 
-    if (removeItemFromTable == true) {
-      m_tcpTable[addr]->disconnect(addr);  // avoid coredump when connection with server was broken
-      LOG_WARN("closeTransport: erase broker: %s", addr.c_str());
-      m_tcpTable.erase(addr);
-      m_namesrvAddrChoosed.clear();
-    }
-  } else {
-    LOG_WARN("CloseNameServerTransport::get tcpTransport mutex failed:%s", m_namesrvAddrChoosed.c_str());
-    return;
+  string addr = m_namesrvAddrChoosed;
+
+  bool removeItemFromTable = CloseTransport(addr, pTcp);
+  if (removeItemFromTable) {
+    m_namesrvAddrChoosed.clear();
   }
+
+  return removeItemFromTable;
 }
 
-bool TcpRemotingClient::SendCommand(boost::shared_ptr<TcpTransport> pTts, RemotingCommand& msg) {
-  const MemoryBlock* phead = msg.GetHead();
-  const MemoryBlock* pbody = msg.GetBody();
+bool TcpRemotingClient::SendCommand(std::shared_ptr<TcpTransport> pTts, RemotingCommand& msg) {
+  const MemoryBlock* pHead = msg.GetHead();
+  const MemoryBlock* pBody = msg.GetBody();
 
-  unique_ptr<MemoryOutputStream> result(new MemoryOutputStream(1024));
-  if (phead->getData()) {
-    result->write(phead->getData(), phead->getSize());
+  unique_ptr<MemoryOutputStream> buffer(new MemoryOutputStream(1024));
+  if (pHead->getSize() > 0) {
+    buffer->write(pHead->getData(), static_cast<size_t>(pHead->getSize()));
   }
-  if (pbody->getData()) {
-    result->write(pbody->getData(), pbody->getSize());
+  if (pBody->getSize() > 0) {
+    buffer->write(pBody->getData(), static_cast<size_t>(pBody->getSize()));
   }
-  const char* pData = static_cast<const char*>(result->getData());
-  int len = result->getDataSize();
+
+  const char* pData = static_cast<const char*>(buffer->getData());
+  size_t len = buffer->getDataSize();
   return pTts->sendMessage(pData, len);
 }
 
 void TcpRemotingClient::static_messageReceived(void* context, const MemoryBlock& mem, const string& addr) {
-  TcpRemotingClient* pTcpRemotingClient = (TcpRemotingClient*)context;
+  auto* pTcpRemotingClient = reinterpret_cast<TcpRemotingClient*>(context);
   if (pTcpRemotingClient)
     pTcpRemotingClient->messageReceived(mem, addr);
 }
@@ -503,11 +457,11 @@ void TcpRemotingClient::messageReceived(const MemoryBlock& mem, const string& ad
 }
 
 void TcpRemotingClient::ProcessData(const MemoryBlock& mem, const string& addr) {
-  RemotingCommand* pRespondCmd = NULL;
+  RemotingCommand* pRespondCmd = nullptr;
   try {
     pRespondCmd = RemotingCommand::Decode(mem);
   } catch (...) {
-    LOG_ERROR("processData_error");
+    LOG_ERROR("processData error");
     return;
   }
 
@@ -515,43 +469,58 @@ void TcpRemotingClient::ProcessData(const MemoryBlock& mem, const string& addr)
 
   //<!process self;
   if (pRespondCmd->isResponseType()) {
-    boost::shared_ptr<ResponseFuture> pFuture(findAndDeleteAsyncResponseFuture(opaque));
+    std::shared_ptr<ResponseFuture> pFuture = findAndDeleteAsyncResponseFuture(opaque);
     if (!pFuture) {
       pFuture = findAndDeleteResponseFuture(opaque);
-      if (pFuture) {
-        if (pFuture->getSyncResponseFlag()) {
-          LOG_WARN("waitResponse already timeout of opaque:%d", opaque);
-          deleteAndZero(pRespondCmd);
-          return;
-        }
-        LOG_DEBUG("find_response opaque:%d", opaque);
-      } else {
+      if (!pFuture) {
         LOG_DEBUG("responseFuture was deleted by timeout of opaque:%d", opaque);
         deleteAndZero(pRespondCmd);
         return;
       }
     }
+
+    LOG_DEBUG("find_response opaque:%d", opaque);
     processResponseCommand(pRespondCmd, pFuture);
   } else {
     processRequestCommand(pRespondCmd, addr);
   }
 }
 
-void TcpRemotingClient::processResponseCommand(RemotingCommand* pCmd, boost::shared_ptr<ResponseFuture> pfuture) {
-  int code = pfuture->getRequestCode();
+void TcpRemotingClient::processResponseCommand(RemotingCommand* pCmd, std::shared_ptr<ResponseFuture> pFuture) {
+  int code = pFuture->getRequestCode();
+  pCmd->SetExtHeader(code);  // set head, for response use
+
   int opaque = pCmd->getOpaque();
-  LOG_DEBUG("processResponseCommand, code:%d,opaque:%d, maxRetryTimes:%d, retrySendTimes:%d", code, opaque,
-            pfuture->getMaxRetrySendTimes(), pfuture->getRetrySendTimes());
-  pCmd->SetExtHeader(code);  // set head , for response use
-
-  pfuture->setResponse(pCmd);
-
-  if (pfuture->getASyncFlag()) {
-    if (!pfuture->getAsyncResponseFlag()) {
-      pfuture->setAsyncResponseFlag();
-      pfuture->setAsyncCallBackStatus(asyncCallBackStatus_response);
-      cancelTimerCallback(opaque);
-      pfuture->executeInvokeCallback();
+  LOG_DEBUG("processResponseCommand, code:%d, opaque:%d, maxRetryTimes:%d, retrySendTimes:%d", code, opaque,
+            pFuture->getMaxRetrySendTimes(), pFuture->getRetrySendTimes());
+
+  if (!pFuture->setResponse(pCmd)) {
+    // this branch is unreachable normally.
+    LOG_WARN("response already timeout of opaque:%d", opaque);
+    deleteAndZero(pCmd);
+    return;
+  }
+
+  if (pFuture->getAsyncFlag()) {
+    cancelTimerCallback(opaque);
+    pFuture->invokeCompleteCallback();
+  }
+}
+
+void TcpRemotingClient::handleAsyncRequestTimeout(const boost::system::error_code& e, int opaque) {
+  if (e == boost::asio::error::operation_aborted) {
+    LOG_DEBUG("handleAsyncRequestTimeout aborted opaque:%d, e_code:%d, msg:%s", opaque, e.value(), e.message().data());
+    return;
+  }
+
+  LOG_DEBUG("handleAsyncRequestTimeout opaque:%d, e_code:%d, msg:%s", opaque, e.value(), e.message().data());
+
+  std::shared_ptr<ResponseFuture> pFuture(findAndDeleteAsyncResponseFuture(opaque));
+  if (pFuture) {
+    LOG_ERROR("no response got for opaque:%d", opaque);
+    eraseTimerCallback(opaque);
+    if (pFuture->getAsyncCallbackWrap()) {
+      pFuture->invokeExceptionCallback();
     }
   }
 }
@@ -575,16 +544,16 @@ void TcpRemotingClient::processRequestCommand(RemotingCommand* pCmd, const strin
   }
 }
 
-void TcpRemotingClient::addResponseFuture(int opaque, boost::shared_ptr<ResponseFuture> pfuture) {
-  boost::lock_guard<boost::mutex> lock(m_futureTableMutex);
-  m_futureTable[opaque] = pfuture;
+void TcpRemotingClient::addResponseFuture(int opaque, std::shared_ptr<ResponseFuture> pFuture) {
+  std::lock_guard<std::mutex> lock(m_futureTableLock);
+  m_futureTable[opaque] = pFuture;
 }
 
 // Note: after call this function, shared_ptr of m_syncFutureTable[opaque] will
 // be erased, so caller must ensure the life cycle of returned shared_ptr;
-boost::shared_ptr<ResponseFuture> TcpRemotingClient::findAndDeleteResponseFuture(int opaque) {
-  boost::lock_guard<boost::mutex> lock(m_futureTableMutex);
-  boost::shared_ptr<ResponseFuture> pResponseFuture;
+std::shared_ptr<ResponseFuture> TcpRemotingClient::findAndDeleteResponseFuture(int opaque) {
+  std::lock_guard<std::mutex> lock(m_futureTableLock);
+  std::shared_ptr<ResponseFuture> pResponseFuture;
   if (m_futureTable.find(opaque) != m_futureTable.end()) {
     pResponseFuture = m_futureTable[opaque];
     m_futureTable.erase(opaque);
@@ -592,42 +561,20 @@ boost::shared_ptr<ResponseFuture> TcpRemotingClient::findAndDeleteResponseFuture
   return pResponseFuture;
 }
 
-void TcpRemotingClient::handleAsyncPullForResponseTimeout(const boost::system::error_code& e, int opaque) {
-  if (e == boost::asio::error::operation_aborted) {
-    LOG_INFO("handleAsyncPullForResponseTimeout aborted opaque:%d, e_code:%d, msg:%s", opaque, e.value(),
-             e.message().data());
-    return;
-  }
-
-  LOG_DEBUG("handleAsyncPullForResponseTimeout opaque:%d, e_code:%d, msg:%s", opaque, e.value(), e.message().data());
-  boost::shared_ptr<ResponseFuture> pFuture(findAndDeleteAsyncResponseFuture(opaque));
-  if (pFuture && pFuture->getASyncFlag() && (pFuture->getAsyncCallbackWrap())) {
-    if ((pFuture->getAsyncResponseFlag() != true))  // if no response received, then check timeout or not
-    {
-      LOG_ERROR("no response got for opaque:%d", opaque);
-      pFuture->setAsyncCallBackStatus(asyncCallBackStatus_timeout);
-      pFuture->executeInvokeCallbackException();
-    }
-  }
-
-  eraseTimerCallback(opaque);
-}
-
-void TcpRemotingClient::addAsyncResponseFuture(int opaque, boost::shared_ptr<ResponseFuture> pfuture) {
-  boost::lock_guard<boost::mutex> lock(m_asyncFutureLock);
-  m_asyncFutureTable[opaque] = pfuture;
+void TcpRemotingClient::addAsyncResponseFuture(int opaque, std::shared_ptr<ResponseFuture> pFuture) {
+  std::lock_guard<std::mutex> lock(m_asyncFutureTableLock);
+  m_asyncFutureTable[opaque] = pFuture;
 }
 
 // Note: after call this function, shared_ptr of m_asyncFutureTable[opaque] will
 // be erased, so caller must ensure the life cycle of returned shared_ptr;
-boost::shared_ptr<ResponseFuture> TcpRemotingClient::findAndDeleteAsyncResponseFuture(int opaque) {
-  boost::lock_guard<boost::mutex> lock(m_asyncFutureLock);
-  boost::shared_ptr<ResponseFuture> pResponseFuture;
+std::shared_ptr<ResponseFuture> TcpRemotingClient::findAndDeleteAsyncResponseFuture(int opaque) {
+  std::lock_guard<std::mutex> lock(m_asyncFutureTableLock);
+  std::shared_ptr<ResponseFuture> pResponseFuture;
   if (m_asyncFutureTable.find(opaque) != m_asyncFutureTable.end()) {
     pResponseFuture = m_asyncFutureTable[opaque];
     m_asyncFutureTable.erase(opaque);
   }
-
   return pResponseFuture;
 }
 
@@ -638,56 +585,64 @@ void TcpRemotingClient::registerProcessor(MQRequestCode requestCode, ClientRemot
 }
 
 void TcpRemotingClient::addTimerCallback(boost::asio::deadline_timer* t, int opaque) {
-  boost::lock_guard<boost::mutex> lock(m_timerMapMutex);
-  if (m_async_timer_map.find(opaque) != m_async_timer_map.end()) {
+  std::lock_guard<std::mutex> lock(m_asyncTimerTableLock);
+  if (m_asyncTimerTable.find(opaque) != m_asyncTimerTable.end()) {
     LOG_DEBUG("addTimerCallback:erase timerCallback opaque:%lld", opaque);
-    boost::asio::deadline_timer* old_t = m_async_timer_map[opaque];
-    old_t->cancel();
+    boost::asio::deadline_timer* old_t = m_asyncTimerTable[opaque];
+    m_asyncTimerTable.erase(opaque);
+    try {
+      old_t->cancel();
+    } catch (const std::exception& ec) {
+      LOG_WARN("encounter exception when cancel old timer: %s", ec.what());
+    }
     delete old_t;
-    old_t = NULL;
-    m_async_timer_map.erase(opaque);
   }
-  m_async_timer_map[opaque] = t;
+  m_asyncTimerTable[opaque] = t;
 }
 
 void TcpRemotingClient::eraseTimerCallback(int opaque) {
-  boost::lock_guard<boost::mutex> lock(m_timerMapMutex);
-  if (m_async_timer_map.find(opaque) != m_async_timer_map.end()) {
+  std::lock_guard<std::mutex> lock(m_asyncTimerTableLock);
+  if (m_asyncTimerTable.find(opaque) != m_asyncTimerTable.end()) {
     LOG_DEBUG("eraseTimerCallback: opaque:%lld", opaque);
-    boost::asio::deadline_timer* t = m_async_timer_map[opaque];
+    boost::asio::deadline_timer* t = m_asyncTimerTable[opaque];
+    m_asyncTimerTable.erase(opaque);
     delete t;
-    t = NULL;
-    m_async_timer_map.erase(opaque);
   }
 }
 
 void TcpRemotingClient::cancelTimerCallback(int opaque) {
-  boost::lock_guard<boost::mutex> lock(m_timerMapMutex);
-  if (m_async_timer_map.find(opaque) != m_async_timer_map.end()) {
+  std::lock_guard<std::mutex> lock(m_asyncTimerTableLock);
+  if (m_asyncTimerTable.find(opaque) != m_asyncTimerTable.end()) {
     LOG_DEBUG("cancelTimerCallback: opaque:%lld", opaque);
-    boost::asio::deadline_timer* t = m_async_timer_map[opaque];
-    t->cancel();
+    boost::asio::deadline_timer* t = m_asyncTimerTable[opaque];
+    m_asyncTimerTable.erase(opaque);
+    try {
+      t->cancel();
+    } catch (const std::exception& ec) {
+      LOG_WARN("encounter exception when cancel timer: %s", ec.what());
+    }
     delete t;
-    t = NULL;
-    m_async_timer_map.erase(opaque);
   }
 }
 
 void TcpRemotingClient::removeAllTimerCallback() {
-  boost::lock_guard<boost::mutex> lock(m_timerMapMutex);
-  for (asyncTimerMap::iterator it = m_async_timer_map.begin(); it != m_async_timer_map.end(); ++it) {
-    boost::asio::deadline_timer* t = it->second;
-    t->cancel();
+  std::lock_guard<std::mutex> lock(m_asyncTimerTableLock);
+  for (const auto& timer : m_asyncTimerTable) {
+    boost::asio::deadline_timer* t = timer.second;
+    try {
+      t->cancel();
+    } catch (const std::exception& ec) {
+      LOG_WARN("encounter exception when cancel timer: %s", ec.what());
+    }
     delete t;
-    t = NULL;
   }
-  m_async_timer_map.clear();
+  m_asyncTimerTable.clear();
 }
 
 void TcpRemotingClient::deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque) {
-  // delete the map record of opaque<->ResponseFuture, so the answer for the pull request will discard when receive it
-  // later
-  boost::shared_ptr<ResponseFuture> pFuture(findAndDeleteAsyncResponseFuture(opaque));
+  // delete the map record of opaque<->ResponseFuture, so the answer for the pull request will
+  // be discarded when it is received later
+  std::shared_ptr<ResponseFuture> pFuture(findAndDeleteAsyncResponseFuture(opaque));
   if (!pFuture) {
     pFuture = findAndDeleteResponseFuture(opaque);
     if (pFuture) {
@@ -695,9 +650,9 @@ void TcpRemotingClient::deleteOpaqueForDropPullRequest(const MQMessageQueue& mq,
     }
   } else {
     LOG_DEBUG("succ deleted the async pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data());
+    // delete the timeout timer for opaque for pullrequest
+    cancelTimerCallback(opaque);
   }
-  // delete the timeout timer for opaque for pullrequest
-  cancelTimerCallback(opaque);
 }
 
 //<!************************************************************************
diff --git a/src/transport/TcpRemotingClient.h b/src/transport/TcpRemotingClient.h
old mode 100644
new mode 100755
index 6085f7e..82f1155
--- a/src/transport/TcpRemotingClient.h
+++ b/src/transport/TcpRemotingClient.h
@@ -1,127 +1,135 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef __TCPREMOTINGCLIENT_H__
-#define __TCPREMOTINGCLIENT_H__
-
-#include <boost/asio.hpp>
-#include <boost/asio/io_service.hpp>
-#include <boost/bind.hpp>
-#include <boost/date_time/posix_time/posix_time.hpp>
-#include <boost/weak_ptr.hpp>
-#include <map>
-#include "ClientRemotingProcessor.h"
-#include "RemotingCommand.h"
-#include "ResponseFuture.h"
-#include "SocketUtil.h"
-#include "TcpTransport.h"
-
-namespace rocketmq {
-//<!************************************************************************
-
-class TcpRemotingClient {
- public:
-  TcpRemotingClient(int pullThreadNum, uint64_t tcpConnectTimeout, uint64_t tcpTransportTryLockTimeout);
-  virtual ~TcpRemotingClient();
-  void stopAllTcpTransportThread();
-  void updateNameServerAddressList(const string& addrs);
-
-  //<!delete outsite;
-  RemotingCommand* invokeSync(const string& addr, RemotingCommand& request, int timeoutMillis = 3000);
-
-  bool invokeHeartBeat(const string& addr, RemotingCommand& request);
-
-  bool invokeAsync(const string& addr,
-                   RemotingCommand& request,
-                   AsyncCallbackWrap* cbw,
-                   int64 timeoutMilliseconds,
-                   int maxRetrySendTimes = 1,
-                   int retrySendTimes = 1);
-  void invokeOneway(const string& addr, RemotingCommand& request);
-
-  void ProcessData(const MemoryBlock& mem, const string& addr);
-
-  void registerProcessor(MQRequestCode requestCode, ClientRemotingProcessor* clientRemotingProcessor);
-
-  void boost_asio_work();
-  void handleAsyncPullForResponseTimeout(const boost::system::error_code& e, int opaque);
-  void deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque);
-
- private:
-  static void static_messageReceived(void* context, const MemoryBlock& mem, const string& addr);
-  void messageReceived(const MemoryBlock& mem, const string& addr);
-  boost::shared_ptr<TcpTransport> GetTransport(const string& addr, bool needRespons);
-  boost::shared_ptr<TcpTransport> CreateTransport(const string& addr, bool needRespons);
-  boost::shared_ptr<TcpTransport> CreateNameserverTransport(bool needRespons);
-  void CloseTransport(const string& addr, boost::shared_ptr<TcpTransport> pTcp);
-  void CloseNameServerTransport(boost::shared_ptr<TcpTransport> pTcp);
-  bool SendCommand(boost::shared_ptr<TcpTransport> pTts, RemotingCommand& msg);
-  void processRequestCommand(RemotingCommand* pCmd, const string& addr);
-  void processResponseCommand(RemotingCommand* pCmd, boost::shared_ptr<ResponseFuture> pfuture);
-
-  void addResponseFuture(int opaque, boost::shared_ptr<ResponseFuture> pfuture);
-  boost::shared_ptr<ResponseFuture> findAndDeleteResponseFuture(int opaque);
-
-  void addAsyncResponseFuture(int opaque, boost::shared_ptr<ResponseFuture> pfuture);
-  boost::shared_ptr<ResponseFuture> findAndDeleteAsyncResponseFuture(int opaque);
-
-  void addTimerCallback(boost::asio::deadline_timer* t, int opaque);
-  void eraseTimerCallback(int opaque);
-  void cancelTimerCallback(int opaque);
-  void removeAllTimerCallback();
-
- private:
-  typedef map<string, boost::shared_ptr<TcpTransport>> TcpMap;
-  typedef map<int, boost::shared_ptr<ResponseFuture>> ResMap;
-
-  typedef map<int, ClientRemotingProcessor*> RequestMap;
-  RequestMap m_requestTable;
-
-  boost::mutex m_futureTableMutex;
-  ResMap m_futureTable;  //<! id->future;
-
-  ResMap m_asyncFutureTable;
-  boost::mutex m_asyncFutureLock;
-
-  TcpMap m_tcpTable;  //<! ip->tcp;
-  boost::timed_mutex m_tcpLock;
-
-  // ThreadPool        m_threadpool;
-  int m_pullThreadNum;
-  uint64_t m_tcpConnectTimeout;           // ms
-  uint64_t m_tcpTransportTryLockTimeout;  // s
-
-  //<! Nameserver
-  boost::timed_mutex m_namesrvlock;
-  vector<string> m_namesrvAddrList;
-  string m_namesrvAddrChoosed;
-  unsigned int m_namesrvIndex;
-  boost::asio::io_service m_ioService;
-  boost::thread_group m_threadpool;
-  boost::asio::io_service::work m_ioServiceWork;
-
-  boost::asio::io_service m_async_ioService;
-  unique_ptr<boost::thread> m_async_service_thread;
-
-  typedef map<int, boost::asio::deadline_timer*> asyncTimerMap;
-  boost::mutex m_timerMapMutex;
-  asyncTimerMap m_async_timer_map;
-};
-
-//<!************************************************************************
-}  //<!end namespace;
-
-#endif
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __TCPREMOTINGCLIENT_H__
+#define __TCPREMOTINGCLIENT_H__
+
+#include <map>
+#include <mutex>
+
+#include <boost/asio.hpp>
+#include <boost/asio/io_service.hpp>
+#include <boost/bind.hpp>
+#include <boost/date_time/posix_time/posix_time.hpp>
+
+#include "ClientRemotingProcessor.h"
+#include "RemotingCommand.h"
+#include "ResponseFuture.h"
+#include "SocketUtil.h"
+#include "TcpTransport.h"
+
+namespace rocketmq {
+//<!************************************************************************
+
+class TcpRemotingClient {
+ public:
+  TcpRemotingClient(int pullThreadNum, uint64_t tcpConnectTimeout, uint64_t tcpTransportTryLockTimeout);
+  virtual ~TcpRemotingClient();
+
+  void stopAllTcpTransportThread();
+  void updateNameServerAddressList(const string& addrs);
+
+  bool invokeHeartBeat(const string& addr, RemotingCommand& request, int timeoutMillis = 3000);
+
+  // delete outside;
+  RemotingCommand* invokeSync(const string& addr, RemotingCommand& request, int timeoutMillis = 3000);
+
+  bool invokeAsync(const string& addr,
+                   RemotingCommand& request,
+                   AsyncCallbackWrap* cbw,
+                   int64 timeoutMilliseconds,
+                   int maxRetrySendTimes = 1,
+                   int retrySendTimes = 1);
+
+  void invokeOneway(const string& addr, RemotingCommand& request);
+
+  void registerProcessor(MQRequestCode requestCode, ClientRemotingProcessor* clientRemotingProcessor);
+
+  void deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque);
+
+ private:
+  static void static_messageReceived(void* context, const MemoryBlock& mem, const string& addr);
+
+  void messageReceived(const MemoryBlock& mem, const string& addr);
+  void ProcessData(const MemoryBlock& mem, const string& addr);
+  void processRequestCommand(RemotingCommand* pCmd, const string& addr);
+  void processResponseCommand(RemotingCommand* pCmd, std::shared_ptr<ResponseFuture> pFuture);
+  void handleAsyncRequestTimeout(const boost::system::error_code& e, int opaque);
+
+  std::shared_ptr<TcpTransport> GetTransport(const string& addr, bool needResponse);
+  std::shared_ptr<TcpTransport> CreateTransport(const string& addr, bool needResponse);
+  std::shared_ptr<TcpTransport> CreateNameServerTransport(bool needResponse);
+
+  bool CloseTransport(const string& addr, std::shared_ptr<TcpTransport> pTcp);
+  bool CloseNameServerTransport(std::shared_ptr<TcpTransport> pTcp);
+
+  bool SendCommand(std::shared_ptr<TcpTransport> pTts, RemotingCommand& msg);
+
+  void addResponseFuture(int opaque, std::shared_ptr<ResponseFuture> pFuture);
+  std::shared_ptr<ResponseFuture> findAndDeleteResponseFuture(int opaque);
+
+  void addAsyncResponseFuture(int opaque, std::shared_ptr<ResponseFuture> pFuture);
+  std::shared_ptr<ResponseFuture> findAndDeleteAsyncResponseFuture(int opaque);
+
+  void addTimerCallback(boost::asio::deadline_timer* t, int opaque);
+  void eraseTimerCallback(int opaque);
+  void cancelTimerCallback(int opaque);
+  void removeAllTimerCallback();
+
+  void boost_asio_work();
+
+ private:
+  using RequestMap = map<int, ClientRemotingProcessor*>;
+  using TcpMap = map<string, std::shared_ptr<TcpTransport>>;
+  using ResMap = map<int, std::shared_ptr<ResponseFuture>>;
+  using AsyncTimerMap = map<int, boost::asio::deadline_timer*>;
+
+  RequestMap m_requestTable;
+
+  TcpMap m_tcpTable;  //<! addr->tcp;
+  std::timed_mutex m_tcpTableLock;
+
+  ResMap m_futureTable;  //<! id->future;
+  std::mutex m_futureTableLock;
+
+  ResMap m_asyncFutureTable;
+  std::mutex m_asyncFutureTableLock;
+
+  AsyncTimerMap m_asyncTimerTable;
+  std::mutex m_asyncTimerTableLock;
+
+  int m_pullThreadNum;
+  uint64_t m_tcpConnectTimeout;           // ms
+  uint64_t m_tcpTransportTryLockTimeout;  // s
+
+  //<! NameServer
+  std::timed_mutex m_namesrvLock;
+  vector<string> m_namesrvAddrList;
+  string m_namesrvAddrChoosed;
+  unsigned int m_namesrvIndex;
+
+  boost::asio::io_service m_ioService;
+  boost::asio::io_service::work m_ioServiceWork;
+  boost::thread_group m_threadpool;
+
+  boost::asio::io_service m_async_ioService;
+  unique_ptr<boost::thread> m_async_service_thread;
+};
+
+//<!************************************************************************
+}  // namespace rocketmq
+
+#endif
diff --git a/src/transport/TcpTransport.cpp b/src/transport/TcpTransport.cpp
index 701cd8a..c56e5d1 100644
--- a/src/transport/TcpTransport.cpp
+++ b/src/transport/TcpTransport.cpp
@@ -15,11 +15,15 @@
  * limitations under the License.
  */
 #include "TcpTransport.h"
+
+#include <chrono>
+
 #ifndef WIN32
 #include <arpa/inet.h>  // for sockaddr_in and inet_ntoa...
 #include <netinet/tcp.h>
 #include <sys/socket.h>  // for socket(), bind(), and connect()...
 #endif
+
 #include "Logging.h"
 #include "TcpRemotingClient.h"
 #include "UtilAll.h"
@@ -27,99 +31,56 @@
 namespace rocketmq {
 
 //<!************************************************************************
-TcpTransport::TcpTransport(TcpRemotingClient* pTcpRemointClient, READ_CALLBACK handle /* = NULL */)
-    : m_tcpConnectStatus(e_connectInit),
-      m_event_base_status(false),
-      m_event_base_mtx(),
-      m_event_base_cv(),
-      m_ReadDatathread(NULL),
-      m_readcallback(handle),
+TcpTransport::TcpTransport(TcpRemotingClient* pTcpRemointClient, TcpTransportReadCallback handle)
+    : m_event(nullptr),
+      m_tcpConnectStatus(TCP_CONNECT_STATUS_INIT),
+      m_connectEventLock(),
+      m_connectEvent(),
+      m_readCallback(handle),
       m_tcpRemotingClient(pTcpRemointClient) {
   m_startTime = UtilAll::currentTimeMillis();
-#ifdef WIN32
-  evthread_use_windows_threads();
-#else
-  evthread_use_pthreads();
-#endif
-  m_eventBase = NULL;
-  m_bufferEvent = NULL;
 }
+
 TcpTransport::~TcpTransport() {
-  m_readcallback = NULL;
-  m_bufferEvent = NULL;
-  m_eventBase = NULL;
+  freeBufferEvent();
+  m_readCallback = nullptr;
 }
 
-tcpConnectStatus TcpTransport::connect(const string& strServerURL, int timeOutMillisecs /* = 3000 */) {
-  string hostName;
-  short portNumber;
-  LOG_DEBUG("connect to [%s].", strServerURL.c_str());
-  if (!UtilAll::SplitURL(strServerURL, hostName, portNumber)) {
-    LOG_INFO("connect to [%s] failed, Invalid url.", strServerURL.c_str());
-    return e_connectFail;
-  }
-
-  boost::lock_guard<boost::mutex> lock(m_socketLock);
-
-  struct sockaddr_in sin;
-  memset(&sin, 0, sizeof(sin));
-  sin.sin_family = AF_INET;
-  sin.sin_addr.s_addr = getInetAddr(hostName);
-
-  sin.sin_port = htons(portNumber);
-
-  m_eventBase = event_base_new();
-  m_bufferEvent = bufferevent_socket_new(m_eventBase, -1, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE);
-  bufferevent_setcb(m_bufferEvent, readNextMessageIntCallback, NULL, eventcb, this);
-  bufferevent_enable(m_bufferEvent, EV_READ | EV_WRITE);
-  bufferevent_setwatermark(m_bufferEvent, EV_READ, 4, 0);
-
-  setTcpConnectStatus(e_connectWaitResponse);
-  if (bufferevent_socket_connect(m_bufferEvent, (struct sockaddr*)&sin, sizeof(sin)) < 0) {
-    LOG_INFO("connect to fd:%d failed", bufferevent_getfd(m_bufferEvent));
-    setTcpConnectStatus(e_connectFail);
-    freeBufferEvent();
-    return e_connectFail;
-  } else {
-    int fd = bufferevent_getfd(m_bufferEvent);
-    LOG_INFO("try to connect to fd:%d, addr:%s", fd, (hostName.c_str()));
-
-    evthread_make_base_notifiable(m_eventBase);
-
-    m_ReadDatathread = new boost::thread(boost::bind(&TcpTransport::runThread, this));
-
-    while (!m_event_base_status) {
-      LOG_INFO("Wait till event base is looping");
-      boost::system_time const timeout = boost::get_system_time() + boost::posix_time::milliseconds(1000);
-      boost::unique_lock<boost::mutex> lock(m_event_base_mtx);
-      m_event_base_cv.timed_wait(lock, timeout);
-    }
+void TcpTransport::freeBufferEvent() {
+  // freeBufferEvent is idempotent.
 
-    return e_connectWaitResponse;
+  // first, unlink BufferEvent
+  if (m_event != nullptr) {
+    m_event->setCallback(nullptr, nullptr, nullptr, nullptr);
   }
+
+  // then, release BufferEvent
+  m_event.reset();
 }
 
-void TcpTransport::setTcpConnectStatus(tcpConnectStatus connectStatus) {
+void TcpTransport::setTcpConnectStatus(TcpConnectStatus connectStatus) {
   m_tcpConnectStatus = connectStatus;
 }
 
-tcpConnectStatus TcpTransport::getTcpConnectStatus() {
+TcpConnectStatus TcpTransport::getTcpConnectStatus() {
   return m_tcpConnectStatus;
 }
 
-tcpConnectStatus TcpTransport::waitTcpConnectEvent(int timeoutMillisecs) {
-  boost::unique_lock<boost::mutex> lk(m_connectEventLock);
-  if (!m_connectEvent.timed_wait(lk, boost::posix_time::milliseconds(timeoutMillisecs))) {
-    LOG_INFO("connect timeout");
+TcpConnectStatus TcpTransport::waitTcpConnectEvent(int timeoutMillis) {
+  std::unique_lock<std::mutex> eventLock(m_connectEventLock);
+  if (m_tcpConnectStatus == TCP_CONNECT_STATUS_WAIT) {
+    if (m_connectEvent.wait_for(eventLock, std::chrono::milliseconds(timeoutMillis)) == std::cv_status::timeout) {
+      LOG_INFO("connect timeout");
+    }
   }
-  return getTcpConnectStatus();
+  return m_tcpConnectStatus;
 }
 
-void TcpTransport::setTcpConnectEvent(tcpConnectStatus connectStatus) {
-  tcpConnectStatus baseStatus(getTcpConnectStatus());
-  setTcpConnectStatus(connectStatus);
-  if (baseStatus == e_connectWaitResponse) {
-    LOG_INFO("received libevent callback event");
+// internal method
+void TcpTransport::setTcpConnectEvent(TcpConnectStatus connectStatus) {
+  TcpConnectStatus baseStatus = m_tcpConnectStatus.exchange(connectStatus, std::memory_order_relaxed);
+  if (baseStatus == TCP_CONNECT_STATUS_WAIT) {
+    std::unique_lock<std::mutex> eventLock(m_connectEventLock);
     m_connectEvent.notify_all();
   }
 }
@@ -165,129 +126,109 @@ u_long TcpTransport::getInetAddr(string& hostname) {
 }
 
 void TcpTransport::disconnect(const string& addr) {
-  boost::lock_guard<boost::mutex> lock(m_socketLock);
-  if (getTcpConnectStatus() != e_connectInit) {
-    clearBufferEventCallback();
-    LOG_INFO("disconnect:%s start", addr.c_str());
-    m_connectEvent.notify_all();
-    setTcpConnectStatus(e_connectInit);
-    if (m_ReadDatathread) {
-      m_ReadDatathread->interrupt();
-      exitBaseDispatch();
-      while (m_ReadDatathread->timed_join(boost::posix_time::seconds(1)) == false) {
-        LOG_WARN("join readDataThread fail, retry");
-        m_ReadDatathread->interrupt();
-        exitBaseDispatch();
-      }
-      delete m_ReadDatathread;
-      m_ReadDatathread = NULL;
-    }
+  // disconnect is idempotent.
+  std::lock_guard<std::mutex> lock(m_eventLock);
+  if (getTcpConnectStatus() != TCP_CONNECT_STATUS_INIT) {
+    LOG_INFO("disconnect:%s start. event:%p", addr.c_str(), m_event.get());
     freeBufferEvent();
+    setTcpConnectEvent(TCP_CONNECT_STATUS_INIT);
     LOG_INFO("disconnect:%s completely", addr.c_str());
   }
 }
 
-void TcpTransport::clearBufferEventCallback() {
-  if (m_bufferEvent) {
-    // Bufferevents are internally reference-counted, so if the bufferevent has
-    // pending deferred callbacks when you free it, it won't be deleted until
-    // the callbacks are done.
-    // so just empty callback to avoid future callback by libevent
-    bufferevent_setcb(m_bufferEvent, NULL, NULL, NULL, NULL);
+TcpConnectStatus TcpTransport::connect(const string& strServerURL, int timeoutMillis) {
+  string hostname;
+  short port;
+  LOG_DEBUG("connect to [%s].", strServerURL.c_str());
+  if (!UtilAll::SplitURL(strServerURL, hostname, port)) {
+    LOG_INFO("connect to [%s] failed, Invalid url.", strServerURL.c_str());
+    return TCP_CONNECT_STATUS_FAILED;
   }
-}
 
-void TcpTransport::freeBufferEvent() {
-  if (m_bufferEvent) {
-    bufferevent_free(m_bufferEvent);
-    m_bufferEvent = NULL;
-  }
-  if (m_eventBase) {
-    event_base_free(m_eventBase);
-    m_eventBase = NULL;
-  }
-}
-void TcpTransport::exitBaseDispatch() {
-  if (m_eventBase) {
-    event_base_loopbreak(m_eventBase);
-    // event_base_loopexit(m_eventBase, NULL);  //Note: memory leak will be
-    // occured when timer callback was not done;
+  {
+    std::lock_guard<std::mutex> lock(m_eventLock);
+
+    struct sockaddr_in sin;
+    memset(&sin, 0, sizeof(sin));
+    sin.sin_family = AF_INET;
+    sin.sin_addr.s_addr = getInetAddr(hostname);
+    sin.sin_port = htons(port);
+
+    m_event.reset(EventLoop::GetDefaultEventLoop()->createBufferEvent(-1, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE));
+    m_event->setCallback(readNextMessageIntCallback, nullptr, eventCallback, shared_from_this());
+    m_event->setWatermark(EV_READ, 4, 0);
+    m_event->enable(EV_READ | EV_WRITE);
+
+    setTcpConnectStatus(TCP_CONNECT_STATUS_WAIT);
+    if (m_event->connect((struct sockaddr*)&sin, sizeof(sin)) < 0) {
+      LOG_INFO("connect to fd:%d failed", m_event->getfd());
+      freeBufferEvent();
+      setTcpConnectStatus(TCP_CONNECT_STATUS_FAILED);
+      return TCP_CONNECT_STATUS_FAILED;
+    }
   }
-}
 
-void TcpTransport::runThread() {
-  if (m_eventBase != NULL) {
-    if (!m_event_base_status) {
-      boost::mutex::scoped_lock lock(m_event_base_mtx);
-      m_event_base_status.store(true);
-      m_event_base_cv.notify_all();
-      LOG_INFO("Notify on event_base_dispatch");
-    }
-    event_base_dispatch(m_eventBase);
-    // event_base_loop(m_eventBase, EVLOOP_ONCE);//EVLOOP_NONBLOCK should not
-    // be used, as could not callback event immediatly
+  if (timeoutMillis <= 0) {
+    LOG_INFO("try to connect to fd:%d, addr:%s", m_event->getfd(), hostname.c_str());
+    return TCP_CONNECT_STATUS_WAIT;
   }
-  LOG_INFO("event_base_dispatch exit once");
-  boost::this_thread::sleep(boost::posix_time::milliseconds(1));
-  if (getTcpConnectStatus() != e_connectSuccess)
-    return;
-}
 
-void TcpTransport::timeoutcb(evutil_socket_t fd, short what, void* arg) {
-  LOG_INFO("timeoutcb: received  event:%d on fd:%d", what, fd);
-  TcpTransport* tcpTrans = (TcpTransport*)arg;
-  if (tcpTrans->getTcpConnectStatus() != e_connectSuccess) {
-    LOG_INFO("timeoutcb: after connect time, tcp was not established on fd:%d", fd);
-    tcpTrans->setTcpConnectStatus(e_connectFail);
-  } else {
-    LOG_INFO("timeoutcb: after connect time, tcp was established on fd:%d", fd);
+  TcpConnectStatus connectStatus = waitTcpConnectEvent(timeoutMillis);
+  if (connectStatus != TCP_CONNECT_STATUS_SUCCESS) {
+    LOG_WARN("can not connect to server:%s", strServerURL.c_str());
+
+    std::lock_guard<std::mutex> lock(m_eventLock);
+    freeBufferEvent();
+    setTcpConnectStatus(TCP_CONNECT_STATUS_FAILED);
+    return TCP_CONNECT_STATUS_FAILED;
   }
+
+  return TCP_CONNECT_STATUS_SUCCESS;
 }
 
-void TcpTransport::eventcb(struct bufferevent* bev, short what, void* ctx) {
-  evutil_socket_t fd = bufferevent_getfd(bev);
-  TcpTransport* tcpTrans = (TcpTransport*)ctx;
+void TcpTransport::eventCallback(BufferEvent* event, short what, TcpTransport* transport) {
+  socket_t fd = event->getfd();
   LOG_INFO("eventcb: received event:%x on fd:%d", what, fd);
   if (what & BEV_EVENT_CONNECTED) {
+    LOG_INFO("eventcb: connect to fd:%d successfully", fd);
+
+    // disable Nagle
     int val = 1;
-    setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char*)&val, sizeof(val));
-    LOG_INFO("eventcb:connect to fd:%d successfully", fd);
-    tcpTrans->setTcpConnectEvent(e_connectSuccess);
+    setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (void*)&val, sizeof(val));
+    transport->setTcpConnectEvent(TCP_CONNECT_STATUS_SUCCESS);
   } else if (what & (BEV_EVENT_ERROR | BEV_EVENT_EOF | BEV_EVENT_READING | BEV_EVENT_WRITING)) {
-    LOG_INFO("eventcb:rcv error event cb:%x on fd:%d", what, fd);
-    tcpTrans->setTcpConnectEvent(e_connectFail);
-    bufferevent_setcb(bev, NULL, NULL, NULL, NULL);
-    // bufferevent_disable(bev, EV_READ|EV_WRITE);
-    // bufferevent_free(bev);
+    LOG_INFO("eventcb: received error event cb:%x on fd:%d", what, fd);
+    // on error, stop further callbacks.
+    event->setCallback(nullptr, nullptr, nullptr, nullptr);
+    transport->setTcpConnectEvent(TCP_CONNECT_STATUS_FAILED);
   } else {
     LOG_ERROR("eventcb: received error event:%d on fd:%d", what, fd);
   }
 }
 
-void TcpTransport::readNextMessageIntCallback(struct bufferevent* bev, void* ctx) {
+void TcpTransport::readNextMessageIntCallback(BufferEvent* event, TcpTransport* transport) {
   /* This callback is invoked when there is data to read on bev. */
 
   // protocol:  <length> <header length> <header data> <body data>
-  //                    1                   2                       3 4
+  //               1            2               3           4
   // rocketmq protocol contains 4 parts as following:
   //     1, big endian 4 bytes int, its length is sum of 2,3 and 4
   //     2, big endian 4 bytes int, its length is 3
   //     3, use json to serialization data
-  //     4, application could self-defination binary data
+  //     4, binary body data, defined by the application
 
-  struct evbuffer* input = bufferevent_get_input(bev);
+  struct evbuffer* input = event->getInput();
   while (1) {
     struct evbuffer_iovec v[4];
     int n = evbuffer_peek(input, 4, NULL, v, sizeof(v) / sizeof(v[0]));
 
-    int idx = 0;
     char hdr[4];
     char* p = hdr;
-    unsigned int needed = 4;
+    size_t needed = 4;
 
-    for (idx = 0; idx < n; idx++) {
-      if (needed) {
-        unsigned int tmp = needed < v[idx].iov_len ? needed : v[idx].iov_len;
+    for (int idx = 0; idx < n; idx++) {
+      if (needed > 0) {
+        size_t tmp = needed < v[idx].iov_len ? needed : v[idx].iov_len;
         memcpy(p, v[idx].iov_base, tmp);
         p += tmp;
         needed -= tmp;
@@ -296,80 +237,54 @@ void TcpTransport::readNextMessageIntCallback(struct bufferevent* bev, void* ctx
       }
     }
 
-    if (needed) {
-      LOG_DEBUG(" too little data received with sum = %d ", 4 - needed);
+    if (needed > 0) {
+      LOG_DEBUG("too little data received with sum = %d", 4 - needed);
       return;
     }
-    uint32 totalLenOfOneMsg = *(uint32*)hdr;  // first 4 bytes, which indicates 1st part of protocol
-    uint32 bytesInMessage = ntohl(totalLenOfOneMsg);
-    LOG_DEBUG("fd:%d, totalLen:" SIZET_FMT ", bytesInMessage:%d", bufferevent_getfd(bev), v[0].iov_len, bytesInMessage);
 
-    uint32 len = evbuffer_get_length(input);
-    if (len >= bytesInMessage + 4) {
-      LOG_DEBUG("had received all data with len:%d from fd:%d", len, bufferevent_getfd(bev));
+    uint32 totalLenOfOneMsg = *(uint32*)hdr;  // first 4 bytes, which indicates 1st part of protocol
+    uint32 msgLen = ntohl(totalLenOfOneMsg);
+    size_t recvLen = evbuffer_get_length(input);
+    if (recvLen >= msgLen + 4) {
+      LOG_DEBUG("had received all data. msgLen:%d, from:%d, recvLen:%d", msgLen, event->getfd(), recvLen);
     } else {
-      LOG_DEBUG("didn't received whole bytesInMessage:%d, from fd:%d, totalLen:%d", bytesInMessage,
-                bufferevent_getfd(bev), len);
+      LOG_DEBUG("didn't received whole. msgLen:%d, from:%d, recvLen:%d", msgLen, event->getfd(), recvLen);
       return;  // consider large data which was not received completely by now
     }
 
-    if (bytesInMessage > 0) {
-      MemoryBlock messageData(bytesInMessage, true);
-      uint32 bytesRead = 0;
-      char* data = messageData.getData() + bytesRead;
-      bufferevent_read(bev, data, 4);
-      bytesRead = bufferevent_read(bev, data, bytesInMessage);
+    if (msgLen > 0) {
+      MemoryBlock msg(msgLen, true);
 
-      TcpTransport* tcpTrans = (TcpTransport*)ctx;
-      tcpTrans->messageReceived(messageData);
+      event->read(hdr, 4);  // skip length field
+      size_t bytesRead = event->read(msg.getData(), msgLen);
+
+      transport->messageReceived(msg, event->getPeerAddrPort());
     }
   }
 }
 
-bool TcpTransport::sendMessage(const char* pData, int len) {
-  boost::lock_guard<boost::mutex> lock(m_socketLock);
-  if (getTcpConnectStatus() != e_connectSuccess) {
-    return false;
-  }
-
-  int bytes_left = len;
-  int bytes_written = 0;
-  const char* ptr = pData;
-
-  /*NOTE:
-      1. do not need to consider large data which could not send by once, as
-     bufferevent could handle this case;
-  */
-  if (m_bufferEvent) {
-    bytes_written = bufferevent_write(m_bufferEvent, ptr, bytes_left);
-    if (bytes_written == 0)
-      return true;
-    else
-      return false;
+void TcpTransport::messageReceived(const MemoryBlock& mem, const std::string& addr) {
+  if (m_readCallback != nullptr) {
+    m_readCallback(m_tcpRemotingClient, mem, addr);
   }
-  return false;
 }
 
-void TcpTransport::messageReceived(const MemoryBlock& mem) {
-  if (m_readcallback) {
-    m_readcallback(m_tcpRemotingClient, mem, getPeerAddrAndPort());
+bool TcpTransport::sendMessage(const char* pData, size_t len) {
+  std::lock_guard<std::mutex> lock(m_eventLock);
+  if (getTcpConnectStatus() != TCP_CONNECT_STATUS_SUCCESS) {
+    return false;
   }
+
+  /* NOTE:
+      no need to split large data that cannot be sent in one call, as
+      bufferevent handles this case;
+   */
+  return m_event != nullptr && m_event->write(pData, len) == 0;
 }
 
 const string TcpTransport::getPeerAddrAndPort() {
-  struct sockaddr_in broker;
-  socklen_t cLen = sizeof(broker);
-
-  // getsockname(m_socket->getRawSocketHandle(), (struct sockaddr*) &s, &sLen);
-  // // ! use connectSock here.
-  getpeername(bufferevent_getfd(m_bufferEvent), (struct sockaddr*)&broker, &cLen);  // ! use connectSock here.
-  LOG_DEBUG("broker addr: %s, broker port: %d", inet_ntoa(broker.sin_addr), ntohs(broker.sin_port));
-  string brokerAddr(inet_ntoa(broker.sin_addr));
-  brokerAddr.append(":");
-  string brokerPort(UtilAll::to_string(ntohs(broker.sin_port)));
-  brokerAddr.append(brokerPort);
-  LOG_DEBUG("brokerAddr:%s", brokerAddr.c_str());
-  return brokerAddr;
+  std::lock_guard<std::mutex> lock(m_eventLock);
+  return m_event ? m_event->getPeerAddrPort() : "";
 }
 
 const uint64_t TcpTransport::getStartTime() const {
diff --git a/src/transport/TcpTransport.h b/src/transport/TcpTransport.h
old mode 100644
new mode 100755
index cda03ca..bff23dd
--- a/src/transport/TcpTransport.h
+++ b/src/transport/TcpTransport.h
@@ -17,73 +17,77 @@
 #ifndef __TCPTRANSPORT_H__
 #define __TCPTRANSPORT_H__
 
-#include <boost/atomic.hpp>
-#include <boost/thread/condition_variable.hpp>
-#include <boost/thread/mutex.hpp>
-#include <boost/thread/thread.hpp>
-#include "dataBlock.h"
+#include <atomic>
+#include <condition_variable>
+#include <mutex>
 
-extern "C" {
-#include "event2/buffer.h"
-#include "event2/bufferevent.h"
-#include "event2/event.h"
-#include "event2/thread.h"
-}
+#include "EventLoop.h"
+#include "dataBlock.h"
 
 namespace rocketmq {
+
 //<!***************************************************************************
-typedef enum { e_connectInit = 0, e_connectWaitResponse = 1, e_connectSuccess = 2, e_connectFail = 3 } tcpConnectStatus;
+typedef enum TcpConnectStatus {
+  TCP_CONNECT_STATUS_INIT = 0,
+  TCP_CONNECT_STATUS_WAIT = 1,
+  TCP_CONNECT_STATUS_SUCCESS = 2,
+  TCP_CONNECT_STATUS_FAILED = 3
+} TcpConnectStatus;
+
+using TcpTransportReadCallback = void (*)(void* context, const MemoryBlock&, const std::string&);
 
-typedef void (*READ_CALLBACK)(void* context, const MemoryBlock&, const std::string&);
 class TcpRemotingClient;
-class TcpTransport {
+
+class TcpTransport : public std::enable_shared_from_this<TcpTransport> {
  public:
-  TcpTransport(TcpRemotingClient* pTcpRemointClient, READ_CALLBACK handle = NULL);
+  static std::shared_ptr<TcpTransport> CreateTransport(TcpRemotingClient* pTcpRemotingClient,
+                                                       TcpTransportReadCallback handle = nullptr) {
+    // transport must be managed by smart pointer
+    std::shared_ptr<TcpTransport> transport(new TcpTransport(pTcpRemotingClient, handle));
+    return transport;
+  }
+
   virtual ~TcpTransport();
 
-  tcpConnectStatus connect(const std::string& strServerURL, int timeOutMillisecs = 3000);
   void disconnect(const std::string& addr);
-  tcpConnectStatus waitTcpConnectEvent(int timeoutMillisecs = 3000);
-  void setTcpConnectStatus(tcpConnectStatus connectStatus);
-  tcpConnectStatus getTcpConnectStatus();
-  bool sendMessage(const char* pData, int len);
+  TcpConnectStatus connect(const std::string& strServerURL, int timeoutMillis = 3000);
+  TcpConnectStatus waitTcpConnectEvent(int timeoutMillis = 3000);
+  TcpConnectStatus getTcpConnectStatus();
+
+  bool sendMessage(const char* pData, size_t len);
   const std::string getPeerAddrAndPort();
   const uint64_t getStartTime() const;
 
  private:
-  void messageReceived(const MemoryBlock& mem);
-  static void readNextMessageIntCallback(struct bufferevent* bev, void* ctx);
-  static void eventcb(struct bufferevent* bev, short what, void* ctx);
-  static void timeoutcb(evutil_socket_t fd, short what, void* arg);
-  void runThread();
-  void clearBufferEventCallback();
-  void freeBufferEvent();
-  void exitBaseDispatch();
-  void setTcpConnectEvent(tcpConnectStatus connectStatus);
+  TcpTransport(TcpRemotingClient* pTcpRemotingClient, TcpTransportReadCallback handle = nullptr);
+
+  static void readNextMessageIntCallback(BufferEvent* event, TcpTransport* transport);
+  static void eventCallback(BufferEvent* event, short what, TcpTransport* transport);
+
+  void messageReceived(const MemoryBlock& mem, const std::string& addr);
+  void freeBufferEvent();  // not thread-safe
+
+  void setTcpConnectEvent(TcpConnectStatus connectStatus);
+  void setTcpConnectStatus(TcpConnectStatus connectStatus);
+
   u_long getInetAddr(std::string& hostname);
 
  private:
   uint64_t m_startTime;
-  boost::mutex m_socketLock;
-  struct event_base* m_eventBase;
-  struct bufferevent* m_bufferEvent;
-  boost::atomic<tcpConnectStatus> m_tcpConnectStatus;
-  boost::mutex m_connectEventLock;
-  boost::condition_variable_any m_connectEvent;
 
-  boost::atomic<bool> m_event_base_status;
-  boost::mutex m_event_base_mtx;
-  boost::condition_variable_any m_event_base_cv;
+  std::shared_ptr<BufferEvent> m_event;  // NOTE: using m_event in a callback is unsafe.
+  std::mutex m_eventLock;
+  std::atomic<TcpConnectStatus> m_tcpConnectStatus;
 
-  //<!read data thread
-  boost::thread* m_ReadDatathread;
+  std::mutex m_connectEventLock;
+  std::condition_variable m_connectEvent;
 
   //<! read data callback
-  READ_CALLBACK m_readcallback;
+  TcpTransportReadCallback m_readCallback;
   TcpRemotingClient* m_tcpRemotingClient;
 };
 
 //<!************************************************************************
-}  //<!end namespace;
+}  // namespace rocketmq
 
 #endif


Mime
View raw message