rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ding...@apache.org
Subject [rocketmq-client-cpp] branch master updated: feat(client): add timer to clean off line broker and test case. (#222)
Date Fri, 10 Jan 2020 04:52:06 GMT
This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new b5b3b35  feat(client): add timer to clean off line broker and test case. (#222)
b5b3b35 is described below

commit b5b3b35f0a047799c4be1e11b8956cf00959f984
Author: dinglei <libya_003@163.com>
AuthorDate: Fri Jan 10 12:51:58 2020 +0800

    feat(client): add timer to clean off line broker and test case. (#222)
    
    * fix(transport): using recursive mutex to avoid deadlock
    
    * fix(transport): add test case for client factory.
    
    * feat(unittest): open test case for client api impl
    
    * feat(unittest): open test case for client api impl
---
 .gitignore                        |   1 +
 build.sh                          |   6 +-
 src/MQClientAPIImpl.cpp           |  12 +-
 src/MQClientAPIImpl.h             | 280 +++++++++++++++++++-------------------
 src/MQClientFactory.cpp           | 104 +++++++++++---
 src/MQClientFactory.h             |  20 ++-
 src/MQClientManager.cpp           |   2 +-
 src/MQClientManager.h             |  12 +-
 src/common/TopAddressing.h        |   4 +-
 src/transport/TcpRemotingClient.h |  24 ++--
 test/src/MQClientAPIImpTest.cpp   | 177 ++++++++++++++++++++++++
 test/src/MQClientFactoryTest.cpp  | 106 +++++++++++++++
 12 files changed, 557 insertions(+), 191 deletions(-)

diff --git a/.gitignore b/.gitignore
index 5693999..22c7350 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,3 +4,4 @@ bin
 build
 libs/signature/lib
 tmp_*
+Testing
diff --git a/build.sh b/build.sh
index ca1bc25..77ab32b 100755
--- a/build.sh
+++ b/build.sh
@@ -368,7 +368,11 @@ ExecutionTesting() {
   fi
   echo "############# unit test  start  ###########"
   cd ${build_dir}
-  make test
+  if [ $verbose -eq 0 ]; then
+    ctest
+  else
+    ctest -V
+  fi
   if [ $? -ne 0 ]; then
     echo "############# unit test failed  ###########"
     exit 1
diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp
index 0877a03..7520eb5 100644
--- a/src/MQClientAPIImpl.cpp
+++ b/src/MQClientAPIImpl.cpp
@@ -252,9 +252,9 @@ SendResult MQClientAPIImpl::sendMessage(const string& addr,
   return SendResult();
 }
 
-void MQClientAPIImpl::sendHearbeat(const string& addr,
-                                   HeartbeatData* pHeartbeatData,
-                                   const SessionCredentials& sessionCredentials) {
+void MQClientAPIImpl::sendHeartbeat(const string& addr,
+                                    HeartbeatData* pHeartbeatData,
+                                    const SessionCredentials& sessionCredentials) {
   RemotingCommand request(HEART_BEAT, NULL);
 
   string body;
@@ -265,7 +265,9 @@ void MQClientAPIImpl::sendHearbeat(const string& addr,
   request.Encode();
 
   if (m_pRemotingClient->invokeHeartBeat(addr, request)) {
-    LOG_INFO("sendheartbeat to broker:%s success", addr.c_str());
+    LOG_DEBUG("sendHeartbeat to broker:%s success", addr.c_str());
+  } else {
+    LOG_WARN("sendHeartbeat to broker:%s failed", addr.c_str());
   }
 }
 
@@ -314,6 +316,7 @@ TopicRouteData* MQClientAPIImpl::getTopicRouteInfoFromNameServer(const string& t
           }
         }
         case TOPIC_NOT_EXIST: {
+          LOG_WARN("Get topic[%s] route failed [TOPIC_NOT_EXIST].", topic.c_str());
           return NULL;
         }
         default:
@@ -323,6 +326,7 @@ TopicRouteData* MQClientAPIImpl::getTopicRouteInfoFromNameServer(const string& t
       return NULL;
     }
   }
+  LOG_WARN("Get topic[%s] route failed [Null Response].", topic.c_str());
   return NULL;
 }
 
diff --git a/src/MQClientAPIImpl.h b/src/MQClientAPIImpl.h
index 9555d72..9f08dac 100644
--- a/src/MQClientAPIImpl.h
+++ b/src/MQClientAPIImpl.h
@@ -49,150 +49,152 @@ class MQClientAPIImpl {
                   uint64_t tcpTransportTryLockTimeout,
                   string unitName);
   virtual ~MQClientAPIImpl();
-  void stopAllTcpTransportThread();
-  bool writeDataToFile(string filename, string data, bool isSync);
-  string fetchNameServerAddr(const string& NSDomain);
-  void updateNameServerAddr(const string& addrs);
-
-  void callSignatureBeforeRequest(const string& addr,
-                                  RemotingCommand& request,
-                                  const SessionCredentials& session_credentials);
-  void createTopic(const string& addr,
-                   const string& defaultTopic,
-                   TopicConfig topicConfig,
-                   const SessionCredentials& sessionCredentials);
-  void endTransactionOneway(std::string addr,
-                            EndTransactionRequestHeader* requestHeader,
-                            std::string remark,
-                            const SessionCredentials& sessionCredentials);
-
-  SendResult sendMessage(const string& addr,
-                         const string& brokerName,
-                         const MQMessage& msg,
-                         SendMessageRequestHeader* pRequestHeader,
-                         int timeoutMillis,
-                         int maxRetrySendTimes,
-                         int communicationMode,
-                         SendCallback* pSendCallback,
-                         const SessionCredentials& sessionCredentials);
-
-  PullResult* pullMessage(const string& addr,
-                          PullMessageRequestHeader* pRequestHeader,
-                          int timeoutMillis,
-                          int communicationMode,
-                          PullCallback* pullCallback,
-                          void* pArg,
-                          const SessionCredentials& sessionCredentials);
-
-  void sendHearbeat(const string& addr, HeartbeatData* pHeartbeatData, const SessionCredentials& sessionCredentials);
-
-  void unregisterClient(const string& addr,
-                        const string& clientID,
-                        const string& producerGroup,
-                        const string& consumerGroup,
-                        const SessionCredentials& sessionCredentials);
-
-  TopicRouteData* getTopicRouteInfoFromNameServer(const string& topic,
-                                                  int timeoutMillis,
-                                                  const SessionCredentials& sessionCredentials);
-
-  TopicList* getTopicListFromNameServer(const SessionCredentials& sessionCredentials);
-
-  int wipeWritePermOfBroker(const string& namesrvAddr, const string& brokerName, int timeoutMillis);
-
-  void deleteTopicInBroker(const string& addr, const string& topic, int timeoutMillis);
-
-  void deleteTopicInNameServer(const string& addr, const string& topic, int timeoutMillis);
-
-  void deleteSubscriptionGroup(const string& addr, const string& groupName, int timeoutMillis);
-
-  string getKVConfigByValue(const string& projectNamespace, const string& projectGroup, int timeoutMillis);
-
-  KVTable getKVListByNamespace(const string& projectNamespace, int timeoutMillis);
-
-  void deleteKVConfigByValue(const string& projectNamespace, const string& projectGroup, int timeoutMillis);
-
-  SendResult processSendResponse(const string& brokerName, const MQMessage& msg, RemotingCommand* pResponse);
-
-  PullResult* processPullResponse(RemotingCommand* pResponse);
-
-  int64 getMinOffset(const string& addr,
-                     const string& topic,
-                     int queueId,
-                     int timeoutMillis,
-                     const SessionCredentials& sessionCredentials);
-
-  int64 getMaxOffset(const string& addr,
-                     const string& topic,
-                     int queueId,
-                     int timeoutMillis,
-                     const SessionCredentials& sessionCredentials);
-
-  int64 searchOffset(const string& addr,
-                     const string& topic,
-                     int queueId,
-                     uint64_t timestamp,
-                     int timeoutMillis,
-                     const SessionCredentials& sessionCredentials);
-
-  MQMessageExt* viewMessage(const string& addr,
-                            int64 phyoffset,
-                            int timeoutMillis,
-                            const SessionCredentials& sessionCredentials);
-
-  int64 getEarliestMsgStoretime(const string& addr,
-                                const string& topic,
-                                int queueId,
-                                int timeoutMillis,
-                                const SessionCredentials& sessionCredentials);
+  virtual void stopAllTcpTransportThread();
+  virtual bool writeDataToFile(string filename, string data, bool isSync);
+  virtual string fetchNameServerAddr(const string& NSDomain);
+  virtual void updateNameServerAddr(const string& addrs);
+
+  virtual void callSignatureBeforeRequest(const string& addr,
+                                          RemotingCommand& request,
+                                          const SessionCredentials& session_credentials);
+  virtual void createTopic(const string& addr,
+                           const string& defaultTopic,
+                           TopicConfig topicConfig,
+                           const SessionCredentials& sessionCredentials);
+  virtual void endTransactionOneway(std::string addr,
+                                    EndTransactionRequestHeader* requestHeader,
+                                    std::string remark,
+                                    const SessionCredentials& sessionCredentials);
+
+  virtual SendResult sendMessage(const string& addr,
+                                 const string& brokerName,
+                                 const MQMessage& msg,
+                                 SendMessageRequestHeader* pRequestHeader,
+                                 int timeoutMillis,
+                                 int maxRetrySendTimes,
+                                 int communicationMode,
+                                 SendCallback* pSendCallback,
+                                 const SessionCredentials& sessionCredentials);
+
+  virtual PullResult* pullMessage(const string& addr,
+                                  PullMessageRequestHeader* pRequestHeader,
+                                  int timeoutMillis,
+                                  int communicationMode,
+                                  PullCallback* pullCallback,
+                                  void* pArg,
+                                  const SessionCredentials& sessionCredentials);
+
+  virtual void sendHeartbeat(const string& addr,
+                             HeartbeatData* pHeartbeatData,
+                             const SessionCredentials& sessionCredentials);
 
-  void getConsumerIdListByGroup(const string& addr,
+  virtual void unregisterClient(const string& addr,
+                                const string& clientID,
+                                const string& producerGroup,
                                 const string& consumerGroup,
-                                vector<string>& cids,
-                                int timeoutMillis,
                                 const SessionCredentials& sessionCredentials);
 
-  int64 queryConsumerOffset(const string& addr,
-                            QueryConsumerOffsetRequestHeader* pRequestHeader,
-                            int timeoutMillis,
-                            const SessionCredentials& sessionCredentials);
+  virtual TopicRouteData* getTopicRouteInfoFromNameServer(const string& topic,
+                                                          int timeoutMillis,
+                                                          const SessionCredentials& sessionCredentials);
 
-  void updateConsumerOffset(const string& addr,
-                            UpdateConsumerOffsetRequestHeader* pRequestHeader,
-                            int timeoutMillis,
-                            const SessionCredentials& sessionCredentials);
+  virtual TopicList* getTopicListFromNameServer(const SessionCredentials& sessionCredentials);
 
-  void updateConsumerOffsetOneway(const string& addr,
-                                  UpdateConsumerOffsetRequestHeader* pRequestHeader,
-                                  int timeoutMillis,
-                                  const SessionCredentials& sessionCredentials);
+  virtual int wipeWritePermOfBroker(const string& namesrvAddr, const string& brokerName, int timeoutMillis);
 
-  void consumerSendMessageBack(const string addr,
-                               MQMessageExt& msg,
-                               const string& consumerGroup,
-                               int delayLevel,
-                               int timeoutMillis,
-                               const SessionCredentials& sessionCredentials);
-
-  void lockBatchMQ(const string& addr,
-                   LockBatchRequestBody* requestBody,
-                   vector<MQMessageQueue>& mqs,
-                   int timeoutMillis,
-                   const SessionCredentials& sessionCredentials);
-
-  void unlockBatchMQ(const string& addr,
-                     UnlockBatchRequestBody* requestBody,
-                     int timeoutMillis,
-                     const SessionCredentials& sessionCredentials);
-
-  void sendMessageAsync(const string& addr,
-                        const string& brokerName,
-                        const MQMessage& msg,
-                        RemotingCommand& request,
-                        SendCallback* pSendCallback,
-                        int64 timeoutMilliseconds,
-                        int maxRetryTimes = 1,
-                        int retrySendTimes = 1);
+  virtual void deleteTopicInBroker(const string& addr, const string& topic, int timeoutMillis);
+
+  virtual void deleteTopicInNameServer(const string& addr, const string& topic, int timeoutMillis);
+
+  virtual void deleteSubscriptionGroup(const string& addr, const string& groupName, int timeoutMillis);
+
+  virtual string getKVConfigByValue(const string& projectNamespace, const string& projectGroup, int timeoutMillis);
+
+  virtual KVTable getKVListByNamespace(const string& projectNamespace, int timeoutMillis);
+
+  virtual void deleteKVConfigByValue(const string& projectNamespace, const string& projectGroup, int timeoutMillis);
+
+  virtual SendResult processSendResponse(const string& brokerName, const MQMessage& msg, RemotingCommand* pResponse);
+
+  virtual PullResult* processPullResponse(RemotingCommand* pResponse);
+
+  virtual int64 getMinOffset(const string& addr,
+                             const string& topic,
+                             int queueId,
+                             int timeoutMillis,
+                             const SessionCredentials& sessionCredentials);
+
+  virtual int64 getMaxOffset(const string& addr,
+                             const string& topic,
+                             int queueId,
+                             int timeoutMillis,
+                             const SessionCredentials& sessionCredentials);
+
+  virtual int64 searchOffset(const string& addr,
+                             const string& topic,
+                             int queueId,
+                             uint64_t timestamp,
+                             int timeoutMillis,
+                             const SessionCredentials& sessionCredentials);
+
+  virtual MQMessageExt* viewMessage(const string& addr,
+                                    int64 phyoffset,
+                                    int timeoutMillis,
+                                    const SessionCredentials& sessionCredentials);
+
+  virtual int64 getEarliestMsgStoretime(const string& addr,
+                                        const string& topic,
+                                        int queueId,
+                                        int timeoutMillis,
+                                        const SessionCredentials& sessionCredentials);
+
+  virtual void getConsumerIdListByGroup(const string& addr,
+                                        const string& consumerGroup,
+                                        vector<string>& cids,
+                                        int timeoutMillis,
+                                        const SessionCredentials& sessionCredentials);
+
+  virtual int64 queryConsumerOffset(const string& addr,
+                                    QueryConsumerOffsetRequestHeader* pRequestHeader,
+                                    int timeoutMillis,
+                                    const SessionCredentials& sessionCredentials);
+
+  virtual void updateConsumerOffset(const string& addr,
+                                    UpdateConsumerOffsetRequestHeader* pRequestHeader,
+                                    int timeoutMillis,
+                                    const SessionCredentials& sessionCredentials);
+
+  virtual void updateConsumerOffsetOneway(const string& addr,
+                                          UpdateConsumerOffsetRequestHeader* pRequestHeader,
+                                          int timeoutMillis,
+                                          const SessionCredentials& sessionCredentials);
+
+  virtual void consumerSendMessageBack(const string addr,
+                                       MQMessageExt& msg,
+                                       const string& consumerGroup,
+                                       int delayLevel,
+                                       int timeoutMillis,
+                                       const SessionCredentials& sessionCredentials);
+
+  virtual void lockBatchMQ(const string& addr,
+                           LockBatchRequestBody* requestBody,
+                           vector<MQMessageQueue>& mqs,
+                           int timeoutMillis,
+                           const SessionCredentials& sessionCredentials);
+
+  virtual void unlockBatchMQ(const string& addr,
+                             UnlockBatchRequestBody* requestBody,
+                             int timeoutMillis,
+                             const SessionCredentials& sessionCredentials);
+
+  virtual void sendMessageAsync(const string& addr,
+                                const string& brokerName,
+                                const MQMessage& msg,
+                                RemotingCommand& request,
+                                SendCallback* pSendCallback,
+                                int64 timeoutMilliseconds,
+                                int maxRetryTimes = 1,
+                                int retrySendTimes = 1);
 
  private:
   SendResult sendMessageSync(const string& addr,
@@ -213,8 +215,10 @@ class MQClientAPIImpl {
                         PullCallback* pullCallback,
                         void* pArg);
 
- private:
+ protected:
   unique_ptr<TcpRemotingClient> m_pRemotingClient;
+
+ private:
   unique_ptr<TopAddressing> m_topAddressing;
   string m_nameSrvAddr;
   bool m_firstFetchNameSrv;
diff --git a/src/MQClientFactory.cpp b/src/MQClientFactory.cpp
index 03d4640..6315cb1 100644
--- a/src/MQClientFactory.cpp
+++ b/src/MQClientFactory.cpp
@@ -136,7 +136,7 @@ boost::shared_ptr<TopicPublishInfo> MQClientFactory::tryToFindTopicPublishInfo(
   if (!isTopicInfoValidInTable(topic)) {
     updateTopicRouteInfoFromNameServer(topic, session_credentials);
   }
-  //<!if not exsit ,update dafult topic;
+  //<!if not exist ,update default topic;
   if (!isTopicInfoValidInTable(topic)) {
     LOG_INFO("updateTopicRouteInfoFromNameServer with default");
     updateTopicRouteInfoFromNameServer(topic, session_credentials, true);
@@ -156,7 +156,7 @@ bool MQClientFactory::updateTopicRouteInfoFromNameServer(const string& topic,
                                                          bool isDefault /* = false */) {
   boost::lock_guard<boost::mutex> lock(m_factoryLock);
   unique_ptr<TopicRouteData> pTopicRouteData;
-  LOG_INFO("updateTopicRouteInfoFromNameServer start:%s", topic.c_str());
+  LOG_DEBUG("updateTopicRouteInfoFromNameServer start. Topic:%s", topic.c_str());
 
   if (isDefault) {
     pTopicRouteData.reset(
@@ -176,7 +176,7 @@ bool MQClientFactory::updateTopicRouteInfoFromNameServer(const string& topic,
   }
 
   if (pTopicRouteData != NULL) {
-    LOG_INFO("updateTopicRouteInfoFromNameServer has data");
+    LOG_DEBUG("updateTopicRouteInfoFromNameServer has data");
     TopicRouteData* pTemp = getTopicRouteData(topic);
     bool changed = true;
     if (pTemp != NULL) {
@@ -211,7 +211,7 @@ bool MQClientFactory::updateTopicRouteInfoFromNameServer(const string& topic,
     LOG_DEBUG("updateTopicRouteInfoFromNameServer end:%s", topic.c_str());
     return true;
   }
-  LOG_DEBUG("updateTopicRouteInfoFromNameServer end null:%s", topic.c_str());
+  LOG_DEBUG("updateTopicRouteInfoFromNameServer end:%s", topic.c_str());
   return false;
 }
 
@@ -424,7 +424,7 @@ void MQClientFactory::insertProducerInfoToHeartBeatData(HeartbeatData* pHeartbea
 }
 
 MQConsumer* MQClientFactory::selectConsumer(const string& group) {
-  boost::lock_guard<boost::mutex> lock(m_consumerTableMutex);
+  boost::lock_guard<boost::recursive_mutex> lock(m_consumerTableMutex);
   if (m_consumerTable.find(group) != m_consumerTable.end()) {
     return m_consumerTable[group];
   }
@@ -432,7 +432,7 @@ MQConsumer* MQClientFactory::selectConsumer(const string& group) {
 }
 
 bool MQClientFactory::getSessionCredentialFromConsumerTable(SessionCredentials& sessionCredentials) {
-  boost::lock_guard<boost::mutex> lock(m_consumerTableMutex);
+  boost::lock_guard<boost::recursive_mutex> lock(m_consumerTableMutex);
   for (MQCMAP::iterator it = m_consumerTable.begin(); it != m_consumerTable.end(); ++it) {
     if (it->second)
       sessionCredentials = it->second->getSessionCredentials();
@@ -446,7 +446,7 @@ bool MQClientFactory::getSessionCredentialFromConsumerTable(SessionCredentials&
 
 bool MQClientFactory::getSessionCredentialFromConsumer(const string& consumerGroup,
                                                        SessionCredentials& sessionCredentials) {
-  boost::lock_guard<boost::mutex> lock(m_consumerTableMutex);
+  boost::lock_guard<boost::recursive_mutex> lock(m_consumerTableMutex);
   if (m_consumerTable.find(consumerGroup) != m_consumerTable.end()) {
     sessionCredentials = m_consumerTable[consumerGroup]->getSessionCredentials();
   }
@@ -458,7 +458,7 @@ bool MQClientFactory::getSessionCredentialFromConsumer(const string& consumerGro
 }
 
 bool MQClientFactory::addConsumerToTable(const string& consumerName, MQConsumer* pMQConsumer) {
-  boost::lock_guard<boost::mutex> lock(m_consumerTableMutex);
+  boost::lock_guard<boost::recursive_mutex> lock(m_consumerTableMutex);
   if (m_consumerTable.find(consumerName) != m_consumerTable.end())
     return false;
   m_consumerTable[consumerName] = pMQConsumer;
@@ -466,7 +466,7 @@ bool MQClientFactory::addConsumerToTable(const string& consumerName, MQConsumer*
 }
 
 void MQClientFactory::eraseConsumerFromTable(const string& consumerName) {
-  boost::lock_guard<boost::mutex> lock(m_consumerTableMutex);
+  boost::lock_guard<boost::recursive_mutex> lock(m_consumerTableMutex);
   if (m_consumerTable.find(consumerName) != m_consumerTable.end())
     m_consumerTable.erase(consumerName);  // do not need freee pConsumer, as it
                                           // was allocated by user
@@ -475,12 +475,12 @@ void MQClientFactory::eraseConsumerFromTable(const string& consumerName) {
 }
 
 int MQClientFactory::getConsumerTableSize() {
-  boost::lock_guard<boost::mutex> lock(m_consumerTableMutex);
+  boost::lock_guard<boost::recursive_mutex> lock(m_consumerTableMutex);
   return m_consumerTable.size();
 }
 
 void MQClientFactory::getTopicListFromConsumerSubscription(set<string>& topicList) {
-  boost::lock_guard<boost::mutex> lock(m_consumerTableMutex);
+  boost::lock_guard<boost::recursive_mutex> lock(m_consumerTableMutex);
   for (MQCMAP::iterator it = m_consumerTable.begin(); it != m_consumerTable.end(); ++it) {
     vector<SubscriptionData> result;
     it->second->getSubscriptions(result);
@@ -493,14 +493,14 @@ void MQClientFactory::getTopicListFromConsumerSubscription(set<string>& topicLis
 }
 
 void MQClientFactory::updateConsumerSubscribeTopicInfo(const string& topic, vector<MQMessageQueue> mqs) {
-  boost::lock_guard<boost::mutex> lock(m_consumerTableMutex);
+  boost::lock_guard<boost::recursive_mutex> lock(m_consumerTableMutex);
   for (MQCMAP::iterator it = m_consumerTable.begin(); it != m_consumerTable.end(); ++it) {
     it->second->updateTopicSubscribeInfo(topic, mqs);
   }
 }
 
 void MQClientFactory::insertConsumerInfoToHeartBeatData(HeartbeatData* pHeartbeatData) {
-  boost::lock_guard<boost::mutex> lock(m_consumerTableMutex);
+  boost::lock_guard<boost::recursive_mutex> lock(m_consumerTableMutex);
   for (MQCMAP::iterator it = m_consumerTable.begin(); it != m_consumerTable.end(); ++it) {
     MQConsumer* pConsumer = it->second;
     ConsumerData consumerData;
@@ -563,6 +563,27 @@ void MQClientFactory::clearBrokerAddrMap() {
   m_brokerAddrTable.clear();
 }
 
+bool MQClientFactory::isBrokerAddressInUse(const std::string& address) {
+  if (m_topicRouteTableMutex.try_lock()) {
+    boost::lock_guard<boost::mutex> lk(m_topicRouteTableMutex, boost::adopt_lock_t());
+    for (TRDMAP::iterator it = m_topicRouteTable.begin(); it != m_topicRouteTable.end(); it++) {
+      TopicRouteData* topicRouteData = it->second;
+      vector<BrokerData>& brokerData = topicRouteData->getBrokerDatas();
+      for (vector<BrokerData>::iterator next = brokerData.begin(); next != brokerData.end(); next++) {
+        map<int, string>& brokerAddresses = next->brokerAddrs;
+        for (map<int, string>::iterator entry = brokerAddresses.begin(); entry != brokerAddresses.end(); entry++) {
+          if (address == entry->second) {
+            return true;
+          }
+        }
+      }
+    }
+    return false;
+  } else {
+    LOG_WARN("Cannot lock m_topicRouteTableMutex. Assume %s is still in use", address.c_str());
+    return true;
+  }
+}
 void MQClientFactory::addBrokerToAddrMap(const string& brokerName, map<int, string>& brokerAddrs) {
   boost::lock_guard<boost::mutex> lock(m_brokerAddrlock);
   if (m_brokerAddrTable.find(brokerName) != m_brokerAddrTable.end()) {
@@ -697,6 +718,35 @@ MQClientAPIImpl* MQClientFactory::getMQClientAPIImpl() const {
   return m_pClientAPIImpl.get();
 }
 
+void MQClientFactory::cleanOfflineBrokers() {
+  LOG_DEBUG("Begin to clean offline brokers");
+  boost::lock_guard<boost::mutex> lock(m_brokerAddrlock);
+
+  for (BrokerAddrMAP::iterator it = m_brokerAddrTable.begin(); it != m_brokerAddrTable.end();) {
+    std::string brokerName = it->first;
+    map<int, std::string> brokerIdAddressMap = it->second;
+
+    for (map<int, std::string>::iterator next = brokerIdAddressMap.begin(); next != brokerIdAddressMap.end();) {
+      if (!isBrokerAddressInUse(next->second)) {
+        LOG_INFO("Remove broker address: %s", (next->second).c_str());
+        brokerIdAddressMap.erase(next++);
+      } else {
+        next++;
+      }
+    }
+
+    if (brokerIdAddressMap.empty()) {
+      m_brokerAddrTable.erase(it++);
+      LOG_INFO("Broker name: %s is purged from client", brokerName.c_str());
+    } else {
+      LOG_DEBUG("Broker: %s is alive", brokerName.c_str());
+      it++;
+    }
+  }
+
+  LOG_DEBUG("Exit of cleaning offline brokers");
+}
+
 void MQClientFactory::sendHeartbeatToAllBroker() {
   BrokerAddrMAP brokerTable(getBrokerAddrMap());
   if (brokerTable.size() == 0) {
@@ -724,7 +774,7 @@ void MQClientFactory::sendHeartbeatToAllBroker() {
         continue;
 
       try {
-        m_pClientAPIImpl->sendHearbeat(addr, heartbeatData.get(), session_credentials);
+        m_pClientAPIImpl->sendHeartbeat(addr, heartbeatData.get(), session_credentials);
       } catch (MQException& e) {
         LOG_ERROR(e.what());
       }
@@ -735,7 +785,7 @@ void MQClientFactory::sendHeartbeatToAllBroker() {
 
 void MQClientFactory::persistAllConsumerOffset(boost::system::error_code& ec, boost::asio::deadline_timer* t) {
   {
-    boost::lock_guard<boost::mutex> lock(m_consumerTableMutex);
+    boost::lock_guard<boost::recursive_mutex> lock(m_consumerTableMutex);
     if (m_consumerTable.size() > 0) {
       for (MQCMAP::iterator it = m_consumerTable.begin(); it != m_consumerTable.end(); ++it) {
         LOG_DEBUG("Client factory start persistAllConsumerOffset");
@@ -771,6 +821,14 @@ void MQClientFactory::timerCB_sendHeartbeatToAllBroker(boost::system::error_code
   t->async_wait(boost::bind(&MQClientFactory::timerCB_sendHeartbeatToAllBroker, this, ec, t));
 }
 
+void MQClientFactory::timerCB_cleanOfflineBrokers(boost::system::error_code& ec, boost::asio::deadline_timer* t) {
+  cleanOfflineBrokers();
+
+  boost::system::error_code e;
+  t->expires_from_now(t->expires_from_now() + boost::posix_time::seconds(30), e);
+  t->async_wait(boost::bind(&MQClientFactory::timerCB_cleanOfflineBrokers, this, ec, t));
+}
+
 void MQClientFactory::fetchNameServerAddr(boost::system::error_code& ec, boost::asio::deadline_timer* t) {
   m_pClientAPIImpl->fetchNameServerAddr(m_nameSrvDomain);
 
@@ -793,6 +851,10 @@ void MQClientFactory::startScheduledTask(bool startFetchNSService) {
   boost::asio::deadline_timer t2(m_async_ioService, boost::posix_time::milliseconds(10));
   t2.async_wait(boost::bind(&MQClientFactory::timerCB_sendHeartbeatToAllBroker, this, ec2, &t2));
 
+  boost::system::error_code ec3;
+  boost::asio::deadline_timer t3(m_async_ioService, boost::posix_time::seconds(3));
+  t3.async_wait(boost::bind(&MQClientFactory::timerCB_cleanOfflineBrokers, this, ec3, &t3));
+
   if (startFetchNSService) {
     boost::system::error_code ec5;
     boost::asio::deadline_timer* t5 =
@@ -843,18 +905,18 @@ void MQClientFactory::timerCB_doRebalance(boost::system::error_code& ec, boost::
 }
 
 void MQClientFactory::doRebalance() {
-  LOG_INFO("Client factory:%s start dorebalance", m_clientId.c_str());
+  LOG_DEBUG("Client factory:%s start doRebalance", m_clientId.c_str());
   if (getConsumerTableSize() > 0) {
-    boost::lock_guard<boost::mutex> lock(m_consumerTableMutex);
+    boost::lock_guard<boost::recursive_mutex> lock(m_consumerTableMutex);
     for (MQCMAP::iterator it = m_consumerTable.begin(); it != m_consumerTable.end(); ++it) {
       it->second->doRebalance();
     }
   }
-  LOG_INFO("Client factory:%s finish dorebalance", m_clientId.c_str());
+  LOG_DEBUG("Client factory:%s finish doRebalance", m_clientId.c_str());
 }
 
 void MQClientFactory::doRebalanceByConsumerGroup(const string& consumerGroup) {
-  boost::lock_guard<boost::mutex> lock(m_consumerTableMutex);
+  boost::lock_guard<boost::recursive_mutex> lock(m_consumerTableMutex);
   if (m_consumerTable.find(consumerGroup) != m_consumerTable.end()) {
     LOG_INFO("Client factory:%s start dorebalance for consumer:%s", m_clientId.c_str(), consumerGroup.c_str());
     MQConsumer* pMQConsumer = m_consumerTable[consumerGroup];
@@ -1125,9 +1187,9 @@ void MQClientFactory::getSessionCredentialsFromOneOfProducerOrConsumer(SessionCr
     getSessionCredentialFromConsumerTable(session_credentials);
 
   if (!session_credentials.isValid()) {
-    LOG_ERROR(
+    LOG_INFO(
         "updateTopicRouteInfo: didn't get the session_credentials from any "
-        "producers and consumers, please re-intialize it");
+        "producers and consumers, please re-intialize it if application needs authentication");
   }
 }
 
diff --git a/src/MQClientFactory.h b/src/MQClientFactory.h
index e0d0efd..b5e5441 100644
--- a/src/MQClientFactory.h
+++ b/src/MQClientFactory.h
@@ -20,6 +20,7 @@
 #include <boost/asio/io_service.hpp>
 #include <boost/bind.hpp>
 #include <boost/date_time/posix_time/posix_time.hpp>
+#include <boost/thread/recursive_mutex.hpp>
 #include <boost/thread/thread.hpp>
 #include "FindBrokerResult.h"
 #include "MQClientAPIImpl.h"
@@ -103,6 +104,8 @@ class MQClientFactory {
   void doRebalanceByConsumerGroup(const string& consumerGroup);
   void sendHeartbeatToAllBroker();
 
+  void cleanOfflineBrokers();
+
   void findConsumerIds(const string& topic,
                        const string& group,
                        vector<string>& cids,
@@ -114,6 +117,8 @@ class MQClientFactory {
   map<string, map<int, string>> getBrokerAddrMap();
   void clearBrokerAddrMap();
 
+  bool isBrokerAddressInUse(const std::string& address);
+
  private:
   void unregisterClient(const string& producerGroup,
                         const string& consumerGroup,
@@ -128,6 +133,8 @@ class MQClientFactory {
   void updateTopicRouteInfo(boost::system::error_code& ec, boost::asio::deadline_timer* t);
   void timerCB_sendHeartbeatToAllBroker(boost::system::error_code& ec, boost::asio::deadline_timer* t);
 
+  void timerCB_cleanOfflineBrokers(boost::system::error_code& ec, boost::asio::deadline_timer* t);
+
   // consumer related operation
   void consumer_timerOperation();
   void persistAllConsumerOffset(boost::system::error_code& ec, boost::asio::deadline_timer* t);
@@ -157,8 +164,12 @@ class MQClientFactory {
 
   void getSessionCredentialsFromOneOfProducerOrConsumer(SessionCredentials& session_credentials);
 
- private:
+ protected:
   string m_clientId;
+  unique_ptr<MQClientAPIImpl> m_pClientAPIImpl;
+  unique_ptr<ClientRemotingProcessor> m_pClientRemotingProcessor;
+
+ private:
   string m_nameSrvDomain;  // per clientId
   ServiceState m_serviceState;
   bool m_bFetchNSService;
@@ -170,7 +181,8 @@ class MQClientFactory {
 
   //<! group --> MQConsumer;
   typedef map<string, MQConsumer*> MQCMAP;
-  boost::mutex m_consumerTableMutex;
+  // Changed to a recursive mutex to avoid a deadlock issue.
+  boost::recursive_mutex m_consumerTableMutex;
   MQCMAP m_consumerTable;
 
   //<! Topic---> TopicRouteData
@@ -192,10 +204,6 @@ class MQClientFactory {
   boost::mutex m_factoryLock;
   boost::mutex m_topicPublishInfoLock;
 
-  //<!clientapi;
-  unique_ptr<MQClientAPIImpl> m_pClientAPIImpl;
-  unique_ptr<ClientRemotingProcessor> m_pClientRemotingProcessor;
-
   boost::asio::io_service m_async_ioService;
   unique_ptr<boost::thread> m_async_service_thread;
 
diff --git a/src/MQClientManager.cpp b/src/MQClientManager.cpp
index 2a658c8..de35ddd 100644
--- a/src/MQClientManager.cpp
+++ b/src/MQClientManager.cpp
@@ -54,4 +54,4 @@ void MQClientManager::removeClientFactory(const string& clientId) {
   }
 }
 //<!************************************************************************
-}  //<!end namespace;
+}  // namespace rocketmq
diff --git a/src/MQClientManager.h b/src/MQClientManager.h
index fc253d7..d846f96 100644
--- a/src/MQClientManager.h
+++ b/src/MQClientManager.h
@@ -27,11 +27,11 @@ namespace rocketmq {
 class MQClientManager {
  public:
   virtual ~MQClientManager();
-  MQClientFactory* getMQClientFactory(const string& clientId,
-                                      int pullThreadNum,
-                                      uint64_t tcpConnectTimeout,
-                                      uint64_t tcpTransportTryLockTimeout,
-                                      string unitName);
+  virtual MQClientFactory* getMQClientFactory(const string& clientId,
+                                              int pullThreadNum,
+                                              uint64_t tcpConnectTimeout,
+                                              uint64_t tcpTransportTryLockTimeout,
+                                              string unitName);
   void removeClientFactory(const string& clientId);
 
   static MQClientManager* getInstance();
@@ -45,6 +45,6 @@ class MQClientManager {
 };
 
 //<!***************************************************************************
-}  //<!end namespace;
+}  // namespace rocketmq
 
 #endif
diff --git a/src/common/TopAddressing.h b/src/common/TopAddressing.h
index 80ad170..a698862 100644
--- a/src/common/TopAddressing.h
+++ b/src/common/TopAddressing.h
@@ -31,7 +31,7 @@ class TopAddressing {
   virtual ~TopAddressing();
 
  public:
-  string fetchNSAddr(const string& NSDomain);
+  virtual string fetchNSAddr(const string& NSDomain);
 
  private:
   string clearNewLine(const string& str);
@@ -43,5 +43,5 @@ class TopAddressing {
   list<string> m_addrs;
   string m_unitName;
 };
-}
+}  // namespace rocketmq
 #endif
diff --git a/src/transport/TcpRemotingClient.h b/src/transport/TcpRemotingClient.h
old mode 100755
new mode 100644
index c612e9d..265177f
--- a/src/transport/TcpRemotingClient.h
+++ b/src/transport/TcpRemotingClient.h
@@ -39,24 +39,24 @@ class TcpRemotingClient {
   TcpRemotingClient(int pullThreadNum, uint64_t tcpConnectTimeout, uint64_t tcpTransportTryLockTimeout);
   virtual ~TcpRemotingClient();
 
-  void stopAllTcpTransportThread();
-  void updateNameServerAddressList(const string& addrs);
+  virtual void stopAllTcpTransportThread();
+  virtual void updateNameServerAddressList(const string& addrs);
 
-  bool invokeHeartBeat(const string& addr, RemotingCommand& request, int timeoutMillis = 3000);
+  virtual bool invokeHeartBeat(const string& addr, RemotingCommand& request, int timeoutMillis = 3000);
 
   // delete outsite;
-  RemotingCommand* invokeSync(const string& addr, RemotingCommand& request, int timeoutMillis = 3000);
+  virtual RemotingCommand* invokeSync(const string& addr, RemotingCommand& request, int timeoutMillis = 3000);
 
-  bool invokeAsync(const string& addr,
-                   RemotingCommand& request,
-                   AsyncCallbackWrap* cbw,
-                   int64 timeoutMilliseconds,
-                   int maxRetrySendTimes = 1,
-                   int retrySendTimes = 1);
+  virtual bool invokeAsync(const string& addr,
+                           RemotingCommand& request,
+                           AsyncCallbackWrap* cbw,
+                           int64 timeoutMilliseconds,
+                           int maxRetrySendTimes = 1,
+                           int retrySendTimes = 1);
 
-  void invokeOneway(const string& addr, RemotingCommand& request);
+  virtual void invokeOneway(const string& addr, RemotingCommand& request);
 
-  void registerProcessor(MQRequestCode requestCode, ClientRemotingProcessor* clientRemotingProcessor);
+  virtual void registerProcessor(MQRequestCode requestCode, ClientRemotingProcessor* clientRemotingProcessor);
 
  private:
   static void static_messageReceived(void* context, const MemoryBlock& mem, const string& addr);
diff --git a/test/src/MQClientAPIImpTest.cpp b/test/src/MQClientAPIImpTest.cpp
new file mode 100644
index 0000000..cf63f67
--- /dev/null
+++ b/test/src/MQClientAPIImpTest.cpp
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+#include "CommunicationMode.h"
+#include "MQClientAPIImpl.h"
+#include "MQClientException.h"
+
+using namespace std;
+using namespace rocketmq;
+using rocketmq::CommunicationMode;
+using rocketmq::RemotingCommand;
+using rocketmq::TcpRemotingClient;
+using testing::_;
+using ::testing::InitGoogleMock;
+using ::testing::InitGoogleTest;
+using testing::Mock;
+using testing::Return;
+
+class MockTcpRemotingClient : public TcpRemotingClient {
+ public:
+  MockTcpRemotingClient(int pullThreadNum, uint64_t tcpConnectTimeout, uint64_t tcpTransportTryLockTimeout)
+      : TcpRemotingClient(pullThreadNum, tcpConnectTimeout, tcpTransportTryLockTimeout) {}
+
+  MOCK_METHOD3(invokeSync, RemotingCommand*(const string&, RemotingCommand&, int));
+};
+class MockMQClientAPIImpl : public MQClientAPIImpl {
+ public:
+  MockMQClientAPIImpl(const string& mqClientId,
+                      ClientRemotingProcessor* clientRemotingProcessor,
+                      int pullThreadNum,
+                      uint64_t tcpConnectTimeout,
+                      uint64_t tcpTransportTryLockTimeout,
+                      string unitName)
+      : MQClientAPIImpl(mqClientId,
+                        clientRemotingProcessor,
+                        pullThreadNum,
+                        tcpConnectTimeout,
+                        tcpTransportTryLockTimeout,
+                        unitName) {
+    m_processor = clientRemotingProcessor;
+  }
+  ClientRemotingProcessor* m_processor;
+  void reInitRemoteClient(TcpRemotingClient* client) {
+    m_pRemotingClient.reset(client);
+    m_pRemotingClient->registerProcessor(CHECK_TRANSACTION_STATE, m_processor);
+    m_pRemotingClient->registerProcessor(RESET_CONSUMER_CLIENT_OFFSET, m_processor);
+    m_pRemotingClient->registerProcessor(GET_CONSUMER_STATUS_FROM_CLIENT, m_processor);
+    m_pRemotingClient->registerProcessor(GET_CONSUMER_RUNNING_INFO, m_processor);
+    m_pRemotingClient->registerProcessor(NOTIFY_CONSUMER_IDS_CHANGED, m_processor);
+    m_pRemotingClient->registerProcessor(CONSUME_MESSAGE_DIRECTLY, m_processor);
+  }
+};
+class MockMQClientAPIImplUtil {
+ public:
+  static MockMQClientAPIImplUtil* GetInstance() {
+    static MockMQClientAPIImplUtil instance;
+    return &instance;
+  }
+  MockMQClientAPIImpl* GetGtestMockClientAPIImpl() {
+    if (m_impl != nullptr) {
+      return m_impl;
+    }
+    string cid = "testClientId";
+    int ptN = 1;
+    uint64_t tct = 3000;
+    uint64_t ttt = 3000;
+    string un = "central";
+    SessionCredentials sc;
+    ClientRemotingProcessor* pp = new ClientRemotingProcessor(nullptr);
+    MockMQClientAPIImpl* impl = new MockMQClientAPIImpl(cid, pp, ptN, tct, ttt, un);
+    MockTcpRemotingClient* pClient = new MockTcpRemotingClient(ptN, tct, ttt);
+    impl->reInitRemoteClient(pClient);
+    m_impl = impl;
+    m_pClient = pClient;
+    return impl;
+  }
+  MockTcpRemotingClient* GetGtestMockRemotingClient() { return m_pClient; }
+  MockMQClientAPIImpl* m_impl = nullptr;
+  MockTcpRemotingClient* m_pClient = nullptr;
+};
+
+TEST(MQClientAPIImplTest, getMaxOffset) {
+  SessionCredentials sc;
+  MockMQClientAPIImpl* impl = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockClientAPIImpl();
+  Mock::AllowLeak(impl);
+  MockTcpRemotingClient* pClient = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockRemotingClient();
+  Mock::AllowLeak(pClient);
+  GetMaxOffsetResponseHeader* pHead = new GetMaxOffsetResponseHeader();
+  pHead->offset = 4096;
+  RemotingCommand* pCommandFailed = new RemotingCommand(SYSTEM_ERROR, nullptr);
+  RemotingCommand* pCommandSuccuss = new RemotingCommand(SUCCESS_VALUE, pHead);
+  EXPECT_CALL(*pClient, invokeSync(_, _, _))
+      .Times(3)
+      .WillOnce(Return(nullptr))
+      .WillOnce(Return(pCommandFailed))
+      .WillOnce(Return(pCommandSuccuss));
+  EXPECT_ANY_THROW(impl->getMaxOffset("127.0.0.0:10911", "testTopic", 0, 1000, sc));
+  EXPECT_ANY_THROW(impl->getMaxOffset("127.0.0.0:10911", "testTopic", 0, 1000, sc));
+  int64 offset = impl->getMaxOffset("127.0.0.0:10911", "testTopic", 0, 1000, sc);
+  EXPECT_EQ(4096, offset);
+}
+
+TEST(MQClientAPIImplTest, getMinOffset) {
+  SessionCredentials sc;
+  MockMQClientAPIImpl* impl = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockClientAPIImpl();
+  Mock::AllowLeak(impl);
+  MockTcpRemotingClient* pClient = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockRemotingClient();
+  Mock::AllowLeak(pClient);
+  GetMinOffsetResponseHeader* pHead = new GetMinOffsetResponseHeader();
+  pHead->offset = 2048;
+  RemotingCommand* pCommandFailed = new RemotingCommand(SYSTEM_ERROR, nullptr);
+  RemotingCommand* pCommandSuccuss = new RemotingCommand(SUCCESS_VALUE, pHead);
+  EXPECT_CALL(*pClient, invokeSync(_, _, _))
+      .Times(3)
+      .WillOnce(Return(nullptr))
+      .WillOnce(Return(pCommandFailed))
+      .WillOnce(Return(pCommandSuccuss));
+  EXPECT_ANY_THROW(impl->getMinOffset("127.0.0.0:10911", "testTopic", 0, 1000, sc));
+  EXPECT_ANY_THROW(impl->getMinOffset("127.0.0.0:10911", "testTopic", 0, 1000, sc));
+  int64 offset = impl->getMinOffset("127.0.0.0:10911", "testTopic", 0, 1000, sc);
+  EXPECT_EQ(2048, offset);
+}
+
+TEST(MQClientAPIImplTest, sendMessage) {
+  string cid = "testClientId";
+  SessionCredentials sc;
+  MockMQClientAPIImpl* impl = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockClientAPIImpl();
+  Mock::AllowLeak(impl);
+  MockTcpRemotingClient* pClient = MockMQClientAPIImplUtil::GetInstance()->GetGtestMockRemotingClient();
+  Mock::AllowLeak(pClient);
+
+  SendMessageResponseHeader* pHead = new SendMessageResponseHeader();
+  pHead->msgId = "MessageID";
+  pHead->queueId = 1;
+  pHead->queueOffset = 409600;
+  RemotingCommand* pCommandSync = new RemotingCommand(SUCCESS_VALUE, pHead);
+  EXPECT_CALL(*pClient, invokeSync(_, _, _)).Times(1).WillOnce(Return(pCommandSync));
+  MQMessage message("testTopic", "Hello, RocketMQ");
+  string unique_msgId = "UniqMessageID";
+  message.setProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, unique_msgId);
+  SendMessageRequestHeader* requestHeader = new SendMessageRequestHeader();
+  requestHeader->producerGroup = cid;
+  requestHeader->topic = (message.getTopic());
+  requestHeader->defaultTopic = DEFAULT_TOPIC;
+  requestHeader->defaultTopicQueueNums = 4;
+  requestHeader->bornTimestamp = UtilAll::currentTimeMillis();
+  SendResult result =
+      impl->sendMessage("127.0.0.0:10911", "testBroker", message, requestHeader, 100, 1, ComMode_SYNC, nullptr, sc);
+  EXPECT_EQ(result.getSendStatus(), SEND_OK);
+  EXPECT_EQ(result.getMsgId(), unique_msgId);
+  EXPECT_EQ(result.getQueueOffset(), 409600);
+  EXPECT_EQ(result.getOffsetMsgId(), "MessageID");
+  EXPECT_EQ(result.getMessageQueue().getBrokerName(), "testBroker");
+  EXPECT_EQ(result.getMessageQueue().getTopic(), "testTopic");
+}
+int main(int argc, char* argv[]) {
+  InitGoogleMock(&argc, argv);
+  testing::GTEST_FLAG(filter) = "MQClientAPIImplTest.*";
+  return RUN_ALL_TESTS();
+}
diff --git a/test/src/MQClientFactoryTest.cpp b/test/src/MQClientFactoryTest.cpp
new file mode 100644
index 0000000..11490e5
--- /dev/null
+++ b/test/src/MQClientFactoryTest.cpp
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <map>
+
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+#include "MQClientFactory.h"
+
+using namespace std;
+using namespace rocketmq;
+using rocketmq::MQClientFactory;
+using rocketmq::TopicRouteData;
+using testing::_;
+using ::testing::InitGoogleMock;
+using ::testing::InitGoogleTest;
+using testing::Return;
+
+class MockMQClientAPIImpl : public MQClientAPIImpl {
+ public:
+  MockMQClientAPIImpl(const string& mqClientId,
+                      ClientRemotingProcessor* clientRemotingProcessor,
+                      int pullThreadNum,
+                      uint64_t tcpConnectTimeout,
+                      uint64_t tcpTransportTryLockTimeout,
+                      string unitName)
+      : MQClientAPIImpl(mqClientId,
+                        clientRemotingProcessor,
+                        pullThreadNum,
+                        tcpConnectTimeout,
+                        tcpTransportTryLockTimeout,
+                        unitName) {}
+
+  MOCK_METHOD5(getMinOffset, int64(const string&, const string&, int, int, const SessionCredentials&));
+  MOCK_METHOD3(getTopicRouteInfoFromNameServer, TopicRouteData*(const string&, int, const SessionCredentials&));
+};
+class MockMQClientFactory : public MQClientFactory {
+ public:
+  MockMQClientFactory(const string& mqClientId,
+                      int pullThreadNum,
+                      uint64_t tcpConnectTimeout,
+                      uint64_t tcpTransportTryLockTimeout,
+                      string unitName)
+      : MQClientFactory(mqClientId, pullThreadNum, tcpConnectTimeout, tcpTransportTryLockTimeout, unitName) {}
+  void reInitClientImpl(MQClientAPIImpl* pImpl) { m_pClientAPIImpl.reset(pImpl); }
+  void reInitRemotingProcessor(ClientRemotingProcessor* pImpl) { m_pClientRemotingProcessor.reset(pImpl); }
+  ClientRemotingProcessor* getRemotingProcessor() { return m_pClientRemotingProcessor.release(); }
+};
+
+TEST(MQClientFactoryTest, minOffset) {
+  string clientId = "testClientId";
+  int pullThreadNum = 1;
+  uint64_t tcpConnectTimeout = 3000;
+  uint64_t tcpTransportTryLockTimeout = 3000;
+  string unitName = "central";
+  MockMQClientFactory* factory =
+      new MockMQClientFactory(clientId, pullThreadNum, tcpConnectTimeout, tcpTransportTryLockTimeout, unitName);
+  MockMQClientAPIImpl* pImpl = new MockMQClientAPIImpl(clientId, factory->getRemotingProcessor(), pullThreadNum,
+                                                       tcpConnectTimeout, tcpTransportTryLockTimeout, unitName);
+  factory->reInitClientImpl(pImpl);
+  MQMessageQueue mq;
+  mq.setTopic("testTopic");
+  mq.setBrokerName("testBroker");
+  mq.setQueueId(1);
+  SessionCredentials session_credentials;
+
+  TopicRouteData* pData = new TopicRouteData();
+  pData->setOrderTopicConf("OrderTopicConf");
+  QueueData qd;
+  qd.brokerName = "testBroker";
+  qd.readQueueNums = 8;
+  qd.writeQueueNums = 8;
+  qd.perm = 1;
+  pData->getQueueDatas().push_back(qd);
+  BrokerData bd;
+  bd.brokerName = "testBroker";
+  bd.brokerAddrs[0] = "127.0.0.1:10091";
+  bd.brokerAddrs[1] = "127.0.0.2:10092";
+  pData->getBrokerDatas().push_back(bd);
+
+  EXPECT_CALL(*pImpl, getMinOffset(_, _, _, _, _)).Times(1).WillOnce(Return(1024));
+  EXPECT_CALL(*pImpl, getTopicRouteInfoFromNameServer(_, _, _)).Times(1).WillOnce(Return(pData));
+  int64 offset = factory->minOffset(mq, session_credentials);
+  EXPECT_EQ(1024, offset);
+  delete factory;
+}
+
+int main(int argc, char* argv[]) {
+  InitGoogleMock(&argc, argv);
+  return RUN_ALL_TESTS();
+}


Mime
View raw message