rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ding...@apache.org
Subject [rocketmq-client-cpp] branch master updated: Consume message from slave if master is down. (#105)
Date Wed, 03 Apr 2019 02:25:00 GMT
This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 46ee33b  Consume  message from slave if master is down. (#105)
46ee33b is described below

commit 46ee33b19e7c0ca84ed76fdc0c5feb932ac4928a
Author: James <ywhjames@hotmail.com>
AuthorDate: Wed Apr 3 10:24:56 2019 +0800

    Consume  message from slave if master is down. (#105)
    
    merged from MQClientInstance#findBrokerAddrByTopic, BrokerData#selectBrokerAddr and MQClientInstance#findBrokerAddressInSubscribe
in Java client.
---
 src/MQClientFactory.cpp       |  55 +++++-----
 src/protocol/TopicRouteData.h | 230 ++++++++++++++++++++----------------------
 2 files changed, 137 insertions(+), 148 deletions(-)

diff --git a/src/MQClientFactory.cpp b/src/MQClientFactory.cpp
index 4eccf34..2dd7bdb 100644
--- a/src/MQClientFactory.cpp
+++ b/src/MQClientFactory.cpp
@@ -631,35 +631,38 @@ string MQClientFactory::findBrokerAddressInPublish(const string&
brokerName) {
   return "";
 }
 
-FindBrokerResult* MQClientFactory::findBrokerAddressInSubscribe(
-    const string& brokerName, int brokerId, bool onlyThisBroker) {
-  string brokerAddr;
-  bool slave = false;
-  bool found = false;
-  BrokerAddrMAP brokerTable(getBrokerAddrMap());
-
-  if (brokerTable.find(brokerName) != brokerTable.end()) {
-    map<int, string> brokerMap(brokerTable[brokerName]);
-    map<int, string>::iterator it1 = brokerMap.find(brokerId);
-    if (it1 != brokerMap.end()) {
-      brokerAddr = it1->second;
-      slave = (brokerId != MASTER_ID);
-      found = true;
-    } else  // from master
-    {
-      it1 = brokerMap.find(MASTER_ID);
-      if (it1 != brokerMap.end()) {
-        brokerAddr = it1->second;
-        slave = false;
-        found = true;
-      }
+FindBrokerResult *MQClientFactory::findBrokerAddressInSubscribe(const string &brokerName,
+                                                                int brokerId,
+                                                                bool onlyThisBroker) {
+    string brokerAddr;
+    bool slave = false;
+    bool found = false;
+    BrokerAddrMAP brokerTable(getBrokerAddrMap());
+
+    if (brokerTable.find(brokerName) != brokerTable.end()) {
+        map<int, string> brokerMap(brokerTable[brokerName]);
+        if (!brokerMap.empty()) {
+            auto iter = brokerMap.find(brokerId);
+            if (iter != brokerMap.end()) {
+                brokerAddr = iter->second;
+                slave = (brokerId != MASTER_ID);
+                found = true;
+            } else if (!onlyThisBroker) {  // not only from master
+                iter = brokerMap.begin();
+                brokerAddr = iter->second;
+                slave = iter->first != MASTER_ID;
+                found = true;
+            }
+        }
     }
-  }
 
-  brokerTable.clear();
-  if (found) return new FindBrokerResult(brokerAddr, slave);
+    brokerTable.clear();
 
-  return NULL;
+    if (found) {
+        return new FindBrokerResult(brokerAddr, slave);
+    }
+
+    return nullptr;
 }
 
 FindBrokerResult* MQClientFactory::findBrokerAddressInAdmin(
diff --git a/src/protocol/TopicRouteData.h b/src/protocol/TopicRouteData.h
index ec8f842..9f70698 100755
--- a/src/protocol/TopicRouteData.h
+++ b/src/protocol/TopicRouteData.h
@@ -17,161 +17,147 @@
 #ifndef __TOPICROUTEDATA_H__
 #define __TOPICROUTEDATA_H__
 #include <algorithm>
+#include <cstdlib>
 #include "Logging.h"
 #include "UtilAll.h"
 #include "dataBlock.h"
 #include "json/json.h"
 
 namespace rocketmq {
+
 //<!***************************************************************************
 struct QueueData {
-  string brokerName;
-  int readQueueNums;
-  int writeQueueNums;
-  int perm;
-
-  bool operator<(const QueueData& other) const {
-    return brokerName < other.brokerName;
-  }
-
-  bool operator==(const QueueData& other) const {
-    if (brokerName == other.brokerName &&
-        readQueueNums == other.readQueueNums &&
-        writeQueueNums == other.writeQueueNums && perm == other.perm) {
-      return true;
+    std::string brokerName;
+    int readQueueNums;
+    int writeQueueNums;
+    int perm;
+
+    bool operator<(const QueueData &other) const { return brokerName < other.brokerName;
}
+
+    bool operator==(const QueueData &other) const {
+        return brokerName == other.brokerName && readQueueNums == other.readQueueNums
&&
+               writeQueueNums == other.writeQueueNums && perm == other.perm;
     }
-    return false;
-  }
 };
 
 //<!***************************************************************************
 struct BrokerData {
-  string brokerName;
-  map<int, string> brokerAddrs;  //<!0:master,1,2.. slave
+    std::string brokerName;
+    std::map<int, string> brokerAddrs;  //<!0:master,1,2.. slave
 
-  bool operator<(const BrokerData& other) const {
-    return brokerName < other.brokerName;
-  }
+    bool operator<(const BrokerData &other) const { return brokerName < other.brokerName;
}
 
-  bool operator==(const BrokerData& other) const {
-    if (brokerName == other.brokerName && brokerAddrs == other.brokerAddrs) {
-      return true;
+    bool operator==(const BrokerData &other) const {
+        return brokerName == other.brokerName && brokerAddrs == other.brokerAddrs;
     }
-    return false;
-  }
 };
 
 //<!************************************************************************/
 class TopicRouteData {
- public:
-  virtual ~TopicRouteData() {
-    m_brokerDatas.clear();
-    m_queueDatas.clear();
-  }
-
-  static TopicRouteData* Decode(const MemoryBlock* mem) {
-    //<!see doc/TopicRouteData.json;
-    const char* const pData = static_cast<const char*>(mem->getData());
-    string data(pData, mem->getSize());
-    
-    Json::Value root;
-    Json::CharReaderBuilder charReaderBuilder;
-    charReaderBuilder.settings_["allowNumericKeys"] = true;
-    unique_ptr<Json::CharReader> pCharReaderPtr(charReaderBuilder.newCharReader());
-    const char* begin = pData;
-    const char* end = pData + mem->getSize(); 
-    string errs;
-    if (!pCharReaderPtr->parse(begin, end, &root, &errs)) {
-      LOG_ERROR("parse json error:%s, value isArray:%d, isObject:%d", errs.c_str(), root.isArray(),
root.isObject());
-      return NULL;
+   public:
+    virtual ~TopicRouteData() {
+        m_brokerDatas.clear();
+        m_queueDatas.clear();
     }
 
-    TopicRouteData* trd = new TopicRouteData();
-    trd->setOrderTopicConf(root["orderTopicConf"].asString());
-
-    Json::Value qds = root["queueDatas"];
-    for (unsigned int i = 0; i < qds.size(); i++) {
-      QueueData d;
-      Json::Value qd = qds[i];
-      d.brokerName = qd["brokerName"].asString();
-      d.readQueueNums = qd["readQueueNums"].asInt();
-      d.writeQueueNums = qd["writeQueueNums"].asInt();
-      d.perm = qd["perm"].asInt();
-
-      trd->getQueueDatas().push_back(d);
+    static TopicRouteData *Decode(const MemoryBlock *mem) {
+        //<!see doc/TopicRouteData.json;
+        const char *const pData = static_cast<const char *>(mem->getData());
+        string data(pData, mem->getSize());
+
+        Json::CharReaderBuilder charReaderBuilder;
+        charReaderBuilder.settings_["allowNumericKeys"] = true;
+        unique_ptr<Json::CharReader> pCharReaderPtr(charReaderBuilder.newCharReader());
+
+        const char *begin = pData;
+        const char *end = pData + mem->getSize();
+        Json::Value root;
+        string errs;
+
+        if (!pCharReaderPtr->parse(begin, end, &root, &errs)) {
+            LOG_ERROR("parse json error:%s, value isArray:%d, isObject:%d", errs.c_str(),
root.isArray(),
+                      root.isObject());
+            return nullptr;
+        }
+
+        auto *trd = new TopicRouteData();
+        trd->setOrderTopicConf(root["orderTopicConf"].asString());
+
+        Json::Value qds = root["queueDatas"];
+        for (auto qd : qds) {
+            QueueData d;
+            d.brokerName = qd["brokerName"].asString();
+            d.readQueueNums = qd["readQueueNums"].asInt();
+            d.writeQueueNums = qd["writeQueueNums"].asInt();
+            d.perm = qd["perm"].asInt();
+            trd->getQueueDatas().push_back(d);
+        }
+        sort(trd->getQueueDatas().begin(), trd->getQueueDatas().end());
+
+        Json::Value bds = root["brokerDatas"];
+        for (auto bd : bds) {
+            BrokerData d;
+            d.brokerName = bd["brokerName"].asString();
+            LOG_DEBUG("brokerName:%s", d.brokerName.c_str());
+            Json::Value bas = bd["brokerAddrs"];
+            Json::Value::Members mbs = bas.getMemberNames();
+            for (const auto &key : mbs) {
+                int id = atoi(key.c_str());
+                string addr = bas[key].asString();
+                d.brokerAddrs[id] = addr;
+                LOG_DEBUG("brokerId:%d, brokerAddr:%s", id, addr.c_str());
+            }
+            trd->getBrokerDatas().push_back(d);
+        }
+        sort(trd->getBrokerDatas().begin(), trd->getBrokerDatas().end());
+
+        return trd;
     }
 
-    sort(trd->getQueueDatas().begin(), trd->getQueueDatas().end());
-
-    Json::Value bds = root["brokerDatas"];
-    for (unsigned int i = 0; i < bds.size(); i++) {
-      BrokerData d;
-      Json::Value bd = bds[i];
-      d.brokerName = bd["brokerName"].asString();
-
-      LOG_DEBUG("brokerName:%s", d.brokerName.c_str());
-
-      Json::Value bas = bd["brokerAddrs"];
-      Json::Value::Members mbs = bas.getMemberNames();
-      for (size_t i = 0; i < mbs.size(); i++) {
-        string key = mbs.at(i);
-        LOG_DEBUG("brokerid:%s,brokerAddr:%s", key.c_str(),
-                  bas[key].asString().c_str());
-        d.brokerAddrs[atoi(key.c_str())] = bas[key].asString();
-      }
-
-      trd->getBrokerDatas().push_back(d);
+    /**
+     * Selects a (preferably master) broker address from the registered list.
+     * If the master's address cannot be found, a slave broker address is selected in a random
manner.
+     *
+     * @return Broker address.
+     */
+    std::string selectBrokerAddr() {
+        int bdSize = m_brokerDatas.size();
+        if (bdSize > 0) {
+            int bdIndex = std::rand() % bdSize;
+            auto bd = m_brokerDatas[bdIndex];
+            auto iter = bd.brokerAddrs.find(MASTER_ID);
+            if (iter == bd.brokerAddrs.end()) {
+                int baSize = bd.brokerAddrs.size();
+                int baIndex = std::rand() % baSize;
+                iter = bd.brokerAddrs.begin();
+                for (; baIndex > 0; baIndex--) {
+                    iter++;
+                }
+            }
+            return iter->second;
+        }
+        return "";
     }
 
-    sort(trd->getBrokerDatas().begin(), trd->getBrokerDatas().end());
+    std::vector<QueueData> &getQueueDatas() { return m_queueDatas; }
 
-    return trd;
-  }
+    std::vector<BrokerData> &getBrokerDatas() { return m_brokerDatas; }
 
-  string selectBrokerAddr() {
-    vector<BrokerData>::iterator it = m_brokerDatas.begin();
-    for (; it != m_brokerDatas.end(); ++it) {
-      map<int, string>::iterator it1 = (*it).brokerAddrs.find(MASTER_ID);
-      if (it1 != (*it).brokerAddrs.end()) {
-        return it1->second;
-      }
-    }
-    return "";
-  }
-
-
-  vector<QueueData>& getQueueDatas() { return m_queueDatas; }
-
-  vector<BrokerData>& getBrokerDatas() { return m_brokerDatas; }
+    const std::string &getOrderTopicConf() const { return m_orderTopicConf; }
 
-  const string& getOrderTopicConf() const { return m_orderTopicConf; }
+    void setOrderTopicConf(const string &orderTopicConf) { m_orderTopicConf = orderTopicConf;
}
 
-  void setOrderTopicConf(const string& orderTopicConf) {
-    m_orderTopicConf = orderTopicConf;
-  }
-
-  bool operator==(const TopicRouteData& other) const {
-    if (m_brokerDatas != other.m_brokerDatas) {
-      return false;
-    }
-
-    if (m_orderTopicConf != other.m_orderTopicConf) {
-      return false;
+    bool operator==(const TopicRouteData &other) const {
+        return m_brokerDatas == other.m_brokerDatas && m_orderTopicConf == other.m_orderTopicConf
&&
+               m_queueDatas == other.m_queueDatas;
     }
 
-    if (m_queueDatas != other.m_queueDatas) {
-      return false;
-    }
-    return true;
-  }
-
- public:
- private:
-  string m_orderTopicConf;
-  vector<QueueData> m_queueDatas;
-  vector<BrokerData> m_brokerDatas;
+   private:
+    std::string m_orderTopicConf;
+    std::vector<QueueData> m_queueDatas;
+    std::vector<BrokerData> m_brokerDatas;
 };
 
-}  //<!end namespace;
+}  // namespace rocketmq
 
 #endif


Mime
View raw message