rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ding...@apache.org
Subject [rocketmq-client-cpp] branch master updated: feat(namespace): add name space support (#207)
Date Tue, 31 Dec 2019 03:02:32 GMT
This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new d321677  feat(namespace): add name space support (#207)
d321677 is described below

commit d3216777d010b984f2f3e4b8093b7a022a4db20c
Author: dinglei <libya_003@163.com>
AuthorDate: Tue Dec 31 11:02:23 2019 +0800

    feat(namespace): add name space support (#207)
    
    * Add name space util support
    
    * feat(namespace): add name space support
    
    * feat(namespace): add name space support
    
    * fix(travisci): fix format error in supporting name space
---
 include/DefaultMQProducer.h                        |  1 +
 include/DefaultMQPullConsumer.h                    |  1 +
 include/DefaultMQPushConsumer.h                    |  1 +
 include/MQClient.h                                 |  7 +--
 include/MQConsumer.h                               |  2 +
 src/common/MQClient.cpp                            | 11 +++-
 src/common/MessageAccessor.cpp                     | 56 ++++++++++++++++++++
 src/common/{NameSpaceUtil.h => MessageAccessor.h}  | 25 +++++----
 src/common/NameSpaceUtil.cpp                       | 52 +++++++++++++++++++
 src/common/NameSpaceUtil.h                         | 14 +++++
 src/consumer/ConsumeMessageConcurrentlyService.cpp |  6 +++
 src/consumer/ConsumeMessageOrderlyService.cpp      |  5 ++
 src/consumer/DefaultMQPullConsumer.cpp             | 48 ++++++++++++++---
 src/consumer/DefaultMQPushConsumer.cpp             | 37 +++++++++++++
 src/producer/DefaultMQProducer.cpp                 | 60 +++++++++++++++++++++-
 15 files changed, 301 insertions(+), 25 deletions(-)

diff --git a/include/DefaultMQProducer.h b/include/DefaultMQProducer.h
index fd657ad..630e765 100644
--- a/include/DefaultMQProducer.h
+++ b/include/DefaultMQProducer.h
@@ -102,6 +102,7 @@ class ROCKETMQCLIENT_API DefaultMQProducer : public MQProducer {
                             SendCallback* pSendCallback);
   bool tryToCompressMessage(MQMessage& msg);
   BatchMessage buildBatchMessage(std::vector<MQMessage>& msgs);
+  bool dealWithNameSpace();
 
  private:
   int m_sendMsgTimeout;
diff --git a/include/DefaultMQPullConsumer.h b/include/DefaultMQPullConsumer.h
index af01941..939ce0e 100644
--- a/include/DefaultMQPullConsumer.h
+++ b/include/DefaultMQPullConsumer.h
@@ -123,6 +123,7 @@ class ROCKETMQCLIENT_API DefaultMQPullConsumer : public MQConsumer {
  private:
   void checkConfig();
   void copySubscription();
+  bool dealWithNameSpace();
 
   PullResult pullSyncImpl(const MQMessageQueue& mq,
                           const std::string& subExpression,
diff --git a/include/DefaultMQPushConsumer.h b/include/DefaultMQPushConsumer.h
index b6de085..894c6b5 100644
--- a/include/DefaultMQPushConsumer.h
+++ b/include/DefaultMQPushConsumer.h
@@ -130,6 +130,7 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer : public MQConsumer {
   void checkConfig();
   void copySubscription();
   void updateTopicSubscribeInfoWhenSubscriptionChanged();
+  bool dealWithNameSpace();
 
  private:
   uint64_t m_startTime;
diff --git a/include/MQClient.h b/include/MQClient.h
index 5632c95..0e23339 100644
--- a/include/MQClient.h
+++ b/include/MQClient.h
@@ -57,7 +57,9 @@ class ROCKETMQCLIENT_API MQClient {
   void setNamesrvDomain(const std::string& namesrvDomain);
   const std::string& getInstanceName() const;
   void setInstanceName(const std::string& instanceName);
-  //<!groupName;
+  // nameSpace
+  const std::string& getNameSpace() const;
+  void setNameSpace(const std::string& nameSpace);
   const std::string& getGroupName() const;
   void setGroupName(const std::string& groupname);
 
@@ -184,9 +186,8 @@ class ROCKETMQCLIENT_API MQClient {
   std::string m_namesrvAddr;
   std::string m_namesrvDomain;
   std::string m_instanceName;
-  //<!  the name is globle only
+  std::string m_nameSpace;
   std::string m_GroupName;
-  //<!factory;
   MQClientFactory* m_clientFactory;
   int m_serviceState;
   int m_pullThreadNum;
diff --git a/include/MQConsumer.h b/include/MQConsumer.h
index b6cd613..48d78f2 100644
--- a/include/MQConsumer.h
+++ b/include/MQConsumer.h
@@ -56,9 +56,11 @@ class ROCKETMQCLIENT_API MQConsumer : public MQClient {
  public:
   MessageModel getMessageModel() const { return m_messageModel; }
   void setMessageModel(MessageModel messageModel) { m_messageModel = messageModel; }
+  bool isUseNameSpaceMode() const { return m_useNameSpaceMode; }
 
  protected:
   MessageModel m_messageModel;
+  bool m_useNameSpaceMode = false;
 };
 
 //<!***************************************************************************
diff --git a/src/common/MQClient.cpp b/src/common/MQClient.cpp
index 8eb3b13..afdc5fa 100644
--- a/src/common/MQClient.cpp
+++ b/src/common/MQClient.cpp
@@ -19,9 +19,9 @@
 #include "Logging.h"
 #include "MQClientFactory.h"
 #include "MQClientManager.h"
+#include "NameSpaceUtil.h"
 #include "TopicPublishInfo.h"
 #include "UtilAll.h"
-#include "NameSpaceUtil.h"
 
 namespace rocketmq {
 
@@ -39,6 +39,7 @@ MQClient::MQClient() {
     m_namesrvAddr = "";
 
   m_instanceName = "DEFAULT";
+  m_nameSpace = "";
   m_clientFactory = NULL;
   m_serviceState = CREATE_JUST;
   m_pullThreadNum = std::thread::hardware_concurrency();
@@ -87,7 +88,13 @@ const string& MQClient::getInstanceName() const {
 void MQClient::setInstanceName(const string& instanceName) {
   m_instanceName = instanceName;
 }
+// Returns a const reference to the name space string stored on this client (m_nameSpace).
+const string& MQClient::getNameSpace() const {
+  return m_nameSpace;
+}
 
+// Replaces the name space stored on this client (m_nameSpace) with a copy of nameSpace.
+void MQClient::setNameSpace(const string& nameSpace) {
+  m_nameSpace = nameSpace;
+}
 void MQClient::createTopic(const string& key, const string& newTopic, int queueNum)
{
   try {
     getFactory()->createTopic(key, newTopic, queueNum, m_SessionCredentials);
@@ -212,4 +219,4 @@ const SessionCredentials& MQClient::getSessionCredentials() const
{
 }
 
 //<!************************************************************************
-}  //<!end namespace;
+}  // namespace rocketmq
diff --git a/src/common/MessageAccessor.cpp b/src/common/MessageAccessor.cpp
new file mode 100644
index 0000000..6fcfac1
--- /dev/null
+++ b/src/common/MessageAccessor.cpp
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MessageAccessor.h"
+#include <functional>
+#include <vector>
+#include "Logging.h"
+#include "NameSpaceUtil.h"
+
+using namespace std;
+namespace rocketmq {
+
+// Prefixes the topic of msg with "<nameSpace><NAMESPACE_SPLIT_FLAG>".
+// The message is left untouched when nameSpace is empty.
+void MessageAccessor::withNameSpace(MQMessage& msg, const string nameSpace) {
+  if (nameSpace.empty()) {
+    return;
+  }
+  msg.setTopic(nameSpace + NAMESPACE_SPLIT_FLAG + msg.getTopic());
+}
+
+// Removes the name space qualification from the topic of a single message.
+//
+// The topic is searched for "<nameSpace><NAMESPACE_SPLIT_FLAG>"; when found, the topic is replaced by
+// the text that follows that token. Nothing happens when nameSpace is empty or the token is absent.
+//
+// msg       : message whose topic is rewritten in place.
+// nameSpace : name space to strip.
+void MessageAccessor::withoutNameSpaceSingle(MQMessageExt& msg, const string nameSpace) {
+  if (nameSpace.empty()) {
+    return;
+  }
+  const string originTopic = msg.getTopic();
+  // Match the name space together with its separator. Matching the bare name space and then skipping
+  // the separator length would drop unrelated characters when the separator does not actually follow,
+  // and could make substr() throw std::out_of_range when the match sits at the very end of the topic.
+  const string qualifiedPrefix = nameSpace + NAMESPACE_SPLIT_FLAG;
+  const auto index = originTopic.find(qualifiedPrefix);
+  if (index == string::npos) {
+    return;
+  }
+  // index + qualifiedPrefix.length() is at most originTopic.length(), so this cannot throw.
+  const string newTopic = originTopic.substr(index + qualifiedPrefix.length());
+  msg.setTopic(newTopic);
+  LOG_DEBUG("Find Name Space Prefix in MessageID[%s], OriginTopic[%s], NewTopic[%s]", msg.getMsgId().c_str(),
+            originTopic.c_str(), newTopic.c_str());
+}
+// Runs withoutNameSpaceSingle() on every message in msgs, in order.
+// The list is left untouched when nameSpace is empty.
+void MessageAccessor::withoutNameSpace(vector<MQMessageExt>& msgs, const string nameSpace) {
+  if (nameSpace.empty()) {
+    return;
+  }
+  for (auto& msg : msgs) {
+    withoutNameSpaceSingle(msg, nameSpace);
+  }
+}
+//<!***************************************************************************
+}  // namespace rocketmq
diff --git a/src/common/NameSpaceUtil.h b/src/common/MessageAccessor.h
similarity index 60%
copy from src/common/NameSpaceUtil.h
copy to src/common/MessageAccessor.h
index 1d4afcf..af42021 100644
--- a/src/common/NameSpaceUtil.h
+++ b/src/common/MessageAccessor.h
@@ -14,23 +14,22 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-#ifndef __NAMESPACEUTIL_H__
-#define __NAMESPACEUTIL_H__
+#ifndef __MESSAGE_ACCESSOR_H__
+#define __MESSAGE_ACCESSOR_H__
 
 #include <string>
-
-using namespace std;
-
-static const string ENDPOINT_PREFIX = "http://";
-static const unsigned int ENDPOINT_PREFIX_LENGTH = ENDPOINT_PREFIX.length();
+#include <vector>
+#include "MQMessage.h"
+#include "MQMessageExt.h"
 namespace rocketmq {
-class NameSpaceUtil {
+//<!***************************************************************************
+class MessageAccessor {
  public:
-  static bool isEndPointURL(string nameServerAddr);
-
-  static string formatNameServerURL(string nameServerAddr);
+  static void withNameSpace(MQMessage& msg, const std::string nameSpace);
+  static void withoutNameSpaceSingle(MQMessageExt& msg, const std::string nameSpace);
+  static void withoutNameSpace(std::vector<MQMessageExt>& msgs, const std::string
nameSpace);
 };
 
+//<!***************************************************************************
 }  // namespace rocketmq
-#endif  //__NAMESPACEUTIL_H__
+#endif
diff --git a/src/common/NameSpaceUtil.cpp b/src/common/NameSpaceUtil.cpp
index ff44a00..118bd4f 100644
--- a/src/common/NameSpaceUtil.cpp
+++ b/src/common/NameSpaceUtil.cpp
@@ -36,4 +36,56 @@ string NameSpaceUtil::formatNameServerURL(string nameServerAddr) {
   }
   return nameServerAddr;
 }
+
+// Extracts the name space token embedded in a name server address.
+//
+// The token starts at the first occurrence of NAMESPACE_PREFIX and ends just before the next '.'.
+// Returns an empty string when the prefix is absent or when no '.' follows it.
+string NameSpaceUtil::getNameSpaceFromNsURL(string nameServerAddr) {
+  LOG_DEBUG("Try to get Name Space from nameServerAddr [%s]", nameServerAddr.c_str());
+  const auto index = nameServerAddr.find(NAMESPACE_PREFIX);
+  if (index == string::npos) {
+    return "";
+  }
+  // Only a dot located after the prefix can terminate the token.
+  const auto indexDot = nameServerAddr.find('.', index);
+  if (indexDot == string::npos) {
+    return "";
+  }
+  // substr() takes (position, length), so the end position must be converted to a length.
+  const string nameSpace = nameServerAddr.substr(index, indexDot - index);
+  LOG_INFO("Get Name Space [%s] from nameServerAddr [%s]", nameSpace.c_str(), nameServerAddr.c_str());
+  return nameSpace;
+}
+
+// Reports whether nameServerAddr is an endpoint URL (as judged by isEndPointURL) that also contains
+// NAMESPACE_PREFIX. Addresses that are not endpoint URLs always yield false.
+bool NameSpaceUtil::checkNameSpaceExistInNsURL(string nameServerAddr) {
+  if (!isEndPointURL(nameServerAddr)) {
+    LOG_DEBUG("This nameServerAddr [%s] is not a endpoint. should not get Name Space.", nameServerAddr.c_str());
+    return false;
+  }
+  const bool prefixFound = nameServerAddr.find(NAMESPACE_PREFIX) != string::npos;
+  if (prefixFound) {
+    LOG_INFO("Find Name Space Prefix in nameServerAddr [%s]", nameServerAddr.c_str());
+  }
+  return prefixFound;
+}
+
+// Reports whether NAMESPACE_PREFIX occurs anywhere in nameServerAddr.
+// Unlike checkNameSpaceExistInNsURL(), no endpoint-URL check is made first.
+bool NameSpaceUtil::checkNameSpaceExistInNameServer(string nameServerAddr) {
+  if (nameServerAddr.find(NAMESPACE_PREFIX) == string::npos) {
+    return false;
+  }
+  LOG_INFO("Find Name Space Prefix in nameServerAddr [%s]", nameServerAddr.c_str());
+  return true;
+}
+
+// Returns "<ns><NAMESPACE_SPLIT_FLAG><source>", or source unchanged when ns is empty.
+string NameSpaceUtil::withNameSpace(string source, string ns) {
+  return ns.empty() ? source : ns + NAMESPACE_SPLIT_FLAG + source;
+}
+
+// Reports whether ns occurs anywhere inside source.
+// An empty ns is found in every string, so it always yields true.
+bool NameSpaceUtil::hasNameSpace(string source, string ns) {
+  return source.length() >= ns.length() && source.find(ns) != string::npos;
+}
 }  // namespace rocketmq
diff --git a/src/common/NameSpaceUtil.h b/src/common/NameSpaceUtil.h
index 1d4afcf..a63d647 100644
--- a/src/common/NameSpaceUtil.h
+++ b/src/common/NameSpaceUtil.h
@@ -24,12 +24,26 @@ using namespace std;
 
 static const string ENDPOINT_PREFIX = "http://";
 static const unsigned int ENDPOINT_PREFIX_LENGTH = ENDPOINT_PREFIX.length();
+static const string NAMESPACE_PREFIX = "MQ_INST_";
+static const int NAMESPACE_PREFIX_LENGTH = NAMESPACE_PREFIX.length();
+static const string NAMESPACE_SPLIT_FLAG = "%";
+
 namespace rocketmq {
 class NameSpaceUtil {
  public:
   static bool isEndPointURL(string nameServerAddr);
 
   static string formatNameServerURL(string nameServerAddr);
+
+  static string getNameSpaceFromNsURL(string nameServerAddr);
+
+  static bool checkNameSpaceExistInNsURL(string nameServerAddr);
+
+  static bool checkNameSpaceExistInNameServer(string nameServerAddr);
+
+  static string withNameSpace(string source, string ns);
+
+  static bool hasNameSpace(string source, string ns);
 };
 
 }  // namespace rocketmq
diff --git a/src/consumer/ConsumeMessageConcurrentlyService.cpp b/src/consumer/ConsumeMessageConcurrentlyService.cpp
index e5df16e..3b0204a 100644
--- a/src/consumer/ConsumeMessageConcurrentlyService.cpp
+++ b/src/consumer/ConsumeMessageConcurrentlyService.cpp
@@ -20,6 +20,7 @@
 #include "ConsumeMsgService.h"
 #include "DefaultMQPushConsumer.h"
 #include "Logging.h"
+#include "MessageAccessor.h"
 #include "UtilAll.h"
 namespace rocketmq {
 
@@ -98,6 +99,11 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullReque
   if (m_pMessageListener != NULL) {
     resetRetryTopic(msgs);
     request->setLastConsumeTimestamp(UtilAll::currentTimeMillis());
+    LOG_DEBUG("=====Receive Messages:[%s][%s][%s]", msgs[0].getTopic().c_str(), msgs[0].getMsgId().c_str(),
+              msgs[0].getBody().c_str());
+    if (m_pConsumer->isUseNameSpaceMode()) {
+      MessageAccessor::withoutNameSpace(msgs, m_pConsumer->getNameSpace());
+    }
     status = m_pMessageListener->consumeMessage(msgs);
   }
 
diff --git a/src/consumer/ConsumeMessageOrderlyService.cpp b/src/consumer/ConsumeMessageOrderlyService.cpp
index f68fe44..4d06e5f 100644
--- a/src/consumer/ConsumeMessageOrderlyService.cpp
+++ b/src/consumer/ConsumeMessageOrderlyService.cpp
@@ -17,6 +17,8 @@
 #if !defined(WIN32) && !defined(__APPLE__)
 #include <sys/prctl.h>
 #endif
+
+#include <MessageAccessor.h>
 #include "ConsumeMsgService.h"
 #include "DefaultMQPushConsumer.h"
 #include "Logging.h"
@@ -181,6 +183,9 @@ void ConsumeMessageOrderlyService::ConsumeRequest(boost::weak_ptr<PullRequest>
p
         request->takeMessages(msgs, pConsumer->getConsumeMessageBatchMaxSize());
         if (!msgs.empty()) {
           request->setLastConsumeTimestamp(UtilAll::currentTimeMillis());
+          if (m_pConsumer->isUseNameSpaceMode()) {
+            MessageAccessor::withoutNameSpace(msgs, m_pConsumer->getNameSpace());
+          }
           ConsumeStatus consumeStatus = m_pMessageListener->consumeMessage(msgs);
           if (consumeStatus == RECONSUME_LATER) {
             request->makeMessageToCosumeAgain(msgs);
diff --git a/src/consumer/DefaultMQPullConsumer.cpp b/src/consumer/DefaultMQPullConsumer.cpp
index d3ce978..f549883 100644
--- a/src/consumer/DefaultMQPullConsumer.cpp
+++ b/src/consumer/DefaultMQPullConsumer.cpp
@@ -20,10 +20,9 @@
 #include "CommunicationMode.h"
 #include "FilterAPI.h"
 #include "Logging.h"
-#include "MQClientAPIImpl.h"
 #include "MQClientFactory.h"
-#include "MQClientManager.h"
-#include "MQProtos.h"
+#include "MessageAccessor.h"
+#include "NameSpaceUtil.h"
 #include "OffsetStore.h"
 #include "PullAPIWrapper.h"
 #include "PullSysFlag.h"
@@ -64,6 +63,7 @@ void DefaultMQPullConsumer::start() {
   sa.sa_flags = 0;
   sigaction(SIGPIPE, &sa, 0);
 #endif
+  dealWithNameSpace();
   switch (m_serviceState) {
     case CREATE_JUST: {
       m_serviceState = START_FAILED;
@@ -147,7 +147,8 @@ void DefaultMQPullConsumer::sendMessageBack(MQMessageExt& msg, int
delayLevel) {
 void DefaultMQPullConsumer::fetchSubscribeMessageQueues(const string& topic, vector<MQMessageQueue>&
mqs) {
   mqs.clear();
   try {
-    getFactory()->fetchSubscribeMessageQueues(topic, mqs, getSessionCredentials());
+    const string localTopic = NameSpaceUtil::withNameSpace(topic, getNameSpace());
+    getFactory()->fetchSubscribeMessageQueues(localTopic, mqs, getSessionCredentials());
   } catch (MQException& e) {
     LOG_ERROR(e.what());
   }
@@ -226,7 +227,11 @@ PullResult DefaultMQPullConsumer::pullSyncImpl(const MQMessageQueue&
mq,
                                                                         ComMode_SYNC,   
        // 10
                                                                         NULL,           
        //<!callback;
                                                                         getSessionCredentials(),
NULL));
-    return m_pPullAPIWrapper->processPullResult(mq, pullResult.get(), pSData.get());
+    PullResult pr = m_pPullAPIWrapper->processPullResult(mq, pullResult.get(), pSData.get());
+    if (m_useNameSpaceMode) {
+      MessageAccessor::withoutNameSpace(pr.msgFoundList, m_nameSpace);
+    }
+    return pr;
   } catch (MQException& e) {
     LOG_ERROR(e.what());
   }
@@ -265,6 +270,7 @@ void DefaultMQPullConsumer::pullAsyncImpl(const MQMessageQueue& mq,
   arg.pPullWrapper = m_pPullAPIWrapper;
 
   try {
+    // not support name space
     unique_ptr<PullResult> pullResult(m_pPullAPIWrapper->pullKernelImpl(mq,    
                 // 1
                                                                         pSData->getSubString(),
 // 2
                                                                         0L,             
        // 3
@@ -376,6 +382,36 @@ bool DefaultMQPullConsumer::producePullMsgTask(boost::weak_ptr<PullRequest>
pull
 Rebalance* DefaultMQPullConsumer::getRebalance() const {
   return NULL;
 }
-
+// Resolves the name space for this pull consumer and applies it to the group name and to every
+// registered topic. Called from start() before the service state is examined.
+//
+// When no name space was set explicitly, it is taken from the name server address; if that address
+// carries no name space prefix either, nothing is changed. Always returns true.
+bool DefaultMQPullConsumer::dealWithNameSpace() {
+  string ns = getNameSpace();
+  if (ns.empty()) {
+    const string nsAddr = getNamesrvAddr();
+    if (!NameSpaceUtil::checkNameSpaceExistInNameServer(nsAddr)) {
+      return true;
+    }
+    // Remember the name space discovered in the address.
+    ns = NameSpaceUtil::getNameSpaceFromNsURL(nsAddr);
+    setNameSpace(ns);
+  }
+  // Qualify the group name unless it already contains the name space.
+  if (!NameSpaceUtil::hasNameSpace(getGroupName(), ns)) {
+    setGroupName(NameSpaceUtil::withNameSpace(getGroupName(), ns));
+  }
+  // Rebuild the registered topic set with every topic qualified.
+  set<string> qualifiedTopics;
+  for (const string& registered : m_registerTopics) {
+    if (NameSpaceUtil::hasNameSpace(registered, ns)) {
+      qualifiedTopics.insert(registered);
+      continue;
+    }
+    LOG_INFO("Update Subscribe Topic[%s] with NameSpace:%s", registered.c_str(), ns.c_str());
+    // Flag that topics were rewritten, so pulled messages get the name space stripped again.
+    m_useNameSpaceMode = true;
+    qualifiedTopics.insert(NameSpaceUtil::withNameSpace(registered, ns));
+  }
+  m_registerTopics.swap(qualifiedTopics);
+  return true;
+}
 //<!************************************************************************
 }  // namespace rocketmq
diff --git a/src/consumer/DefaultMQPushConsumer.cpp b/src/consumer/DefaultMQPushConsumer.cpp
index 0cd5427..c6ab2b2 100644
--- a/src/consumer/DefaultMQPushConsumer.cpp
+++ b/src/consumer/DefaultMQPushConsumer.cpp
@@ -23,6 +23,7 @@
 #include "Logging.h"
 #include "MQClientAPIImpl.h"
 #include "MQClientFactory.h"
+#include "NameSpaceUtil.h"
 #include "OffsetStore.h"
 #include "PullAPIWrapper.h"
 #include "PullSysFlag.h"
@@ -206,6 +207,7 @@ DefaultMQPushConsumer::DefaultMQPushConsumer(const string& groupname)
   string gname = groupname.empty() ? DEFAULT_CONSUMER_GROUP : groupname;
   setGroupName(gname);
   m_asyncPull = true;
+  m_useNameSpaceMode = false;
   m_asyncPullTimeout = 30 * 1000;
   setMessageModel(CLUSTERING);
 
@@ -298,6 +300,8 @@ void DefaultMQPushConsumer::start() {
   sa.sa_flags = 0;
   sigaction(SIGPIPE, &sa, 0);
 #endif
+  // deal with name space before start
+  dealWithNameSpace();
   switch (m_serviceState) {
     case CREATE_JUST: {
       m_serviceState = START_FAILED;
@@ -972,6 +976,39 @@ ConsumerRunningInfo* DefaultMQPushConsumer::getConsumerRunningInfo()
{
 
   return info;
 }
+// Resolves the name space for this push consumer and applies it to the group name and to every
+// subscribed topic. Called from start() before the service state is examined.
+//
+// When no name space was set explicitly, it is taken from the name server address; if that address
+// carries no name space prefix either, nothing is changed. Always returns true.
+bool DefaultMQPushConsumer::dealWithNameSpace() {
+  string ns = getNameSpace();
+  if (ns.empty()) {
+    const string nsAddr = getNamesrvAddr();
+    if (!NameSpaceUtil::checkNameSpaceExistInNameServer(nsAddr)) {
+      return true;
+    }
+    // Remember the name space discovered in the address.
+    ns = NameSpaceUtil::getNameSpaceFromNsURL(nsAddr);
+    setNameSpace(ns);
+  }
+  // Qualify the group name unless it already contains the name space.
+  if (!NameSpaceUtil::hasNameSpace(getGroupName(), ns)) {
+    setGroupName(NameSpaceUtil::withNameSpace(getGroupName(), ns));
+  }
+  // Rebuild the subscription table keyed by qualified topic; sub-expressions are carried over as-is.
+  map<string, string> qualifiedSubs;
+  for (const auto& entry : m_subTopics) {
+    string topic = entry.first;
+    if (!NameSpaceUtil::hasNameSpace(topic, ns)) {
+      LOG_INFO("Update Subscribe[%s:%s] with NameSpace:%s", entry.first.c_str(), entry.second.c_str(), ns.c_str());
+      topic = NameSpaceUtil::withNameSpace(topic, ns);
+      // Flag that topics were rewritten, so consumed messages get the name space stripped again.
+      m_useNameSpaceMode = true;
+    }
+    qualifiedSubs[topic] = entry.second;
+  }
+  m_subTopics.swap(qualifiedSubs);
 
+  return true;
+}
 //<!************************************************************************
 }  // namespace rocketmq
diff --git a/src/producer/DefaultMQProducer.cpp b/src/producer/DefaultMQProducer.cpp
index 980a373..bd9cbc0 100644
--- a/src/producer/DefaultMQProducer.cpp
+++ b/src/producer/DefaultMQProducer.cpp
@@ -30,6 +30,8 @@
 #include "MQClientManager.h"
 #include "MQDecoder.h"
 #include "MQProtos.h"
+#include "MessageAccessor.h"
+#include "NameSpaceUtil.h"
 #include "StringIdMaker.h"
 #include "TopicPublishInfo.h"
 #include "Validators.h"
@@ -61,7 +63,8 @@ void DefaultMQProducer::start() {
   sa.sa_flags = 0;
   sigaction(SIGPIPE, &sa, 0);
 #endif
-
+  // we should deal with the name space before start.
+  dealWithNameSpace();
   switch (m_serviceState) {
     case CREATE_JUST: {
       m_serviceState = START_FAILED;
@@ -109,6 +112,9 @@ void DefaultMQProducer::shutdown() {
 
 SendResult DefaultMQProducer::send(MQMessage& msg, bool bSelectActiveBroker) {
   Validators::checkMessage(msg, getMaxMessageSize());
+  if (!NameSpaceUtil::hasNameSpace(msg.getTopic(), getNameSpace())) {
+    MessageAccessor::withNameSpace(msg, getNameSpace());
+  }
   try {
     return sendDefaultImpl(msg, ComMode_SYNC, NULL, bSelectActiveBroker);
   } catch (MQException& e) {
@@ -120,6 +126,9 @@ SendResult DefaultMQProducer::send(MQMessage& msg, bool bSelectActiveBroker)
{
 
 void DefaultMQProducer::send(MQMessage& msg, SendCallback* pSendCallback, bool bSelectActiveBroker)
{
   Validators::checkMessage(msg, getMaxMessageSize());
+  if (!NameSpaceUtil::hasNameSpace(msg.getTopic(), getNameSpace())) {
+    MessageAccessor::withNameSpace(msg, getNameSpace());
+  }
   try {
     sendDefaultImpl(msg, ComMode_ASYNC, pSendCallback, bSelectActiveBroker);
   } catch (MQException& e) {
@@ -162,6 +171,9 @@ BatchMessage DefaultMQProducer::buildBatchMessage(std::vector<MQMessage>&
msgs)
   bool waitStoreMsgOK = false;
   for (auto& msg : msgs) {
     Validators::checkMessage(msg, getMaxMessageSize());
+    if (!NameSpaceUtil::hasNameSpace(msg.getTopic(), getNameSpace())) {
+      MessageAccessor::withNameSpace(msg, getNameSpace());
+    }
     if (firstFlag) {
       topic = msg.getTopic();
       waitStoreMsgOK = msg.isWaitStoreMsgOK();
@@ -190,6 +202,9 @@ BatchMessage DefaultMQProducer::buildBatchMessage(std::vector<MQMessage>&
msgs)
 
 SendResult DefaultMQProducer::send(MQMessage& msg, const MQMessageQueue& mq) {
   Validators::checkMessage(msg, getMaxMessageSize());
+  if (!NameSpaceUtil::hasNameSpace(msg.getTopic(), getNameSpace())) {
+    MessageAccessor::withNameSpace(msg, getNameSpace());
+  }
   if (msg.getTopic() != mq.getTopic()) {
     LOG_WARN("message's topic not equal mq's topic");
   }
@@ -204,6 +219,9 @@ SendResult DefaultMQProducer::send(MQMessage& msg, const MQMessageQueue&
mq) {
 
 void DefaultMQProducer::send(MQMessage& msg, const MQMessageQueue& mq, SendCallback*
pSendCallback) {
   Validators::checkMessage(msg, getMaxMessageSize());
+  if (!NameSpaceUtil::hasNameSpace(msg.getTopic(), getNameSpace())) {
+    MessageAccessor::withNameSpace(msg, getNameSpace());
+  }
   if (msg.getTopic() != mq.getTopic()) {
     LOG_WARN("message's topic not equal mq's topic");
   }
@@ -217,6 +235,9 @@ void DefaultMQProducer::send(MQMessage& msg, const MQMessageQueue&
mq, SendCallb
 
 void DefaultMQProducer::sendOneway(MQMessage& msg, bool bSelectActiveBroker) {
   Validators::checkMessage(msg, getMaxMessageSize());
+  if (!NameSpaceUtil::hasNameSpace(msg.getTopic(), getNameSpace())) {
+    MessageAccessor::withNameSpace(msg, getNameSpace());
+  }
   try {
     sendDefaultImpl(msg, ComMode_ONEWAY, NULL, bSelectActiveBroker);
   } catch (MQException& e) {
@@ -227,6 +248,9 @@ void DefaultMQProducer::sendOneway(MQMessage& msg, bool bSelectActiveBroker)
{
 
 void DefaultMQProducer::sendOneway(MQMessage& msg, const MQMessageQueue& mq) {
   Validators::checkMessage(msg, getMaxMessageSize());
+  if (!NameSpaceUtil::hasNameSpace(msg.getTopic(), getNameSpace())) {
+    MessageAccessor::withNameSpace(msg, getNameSpace());
+  }
   if (msg.getTopic() != mq.getTopic()) {
     LOG_WARN("message's topic not equal mq's topic");
   }
@@ -240,6 +264,9 @@ void DefaultMQProducer::sendOneway(MQMessage& msg, const MQMessageQueue&
mq) {
 
 SendResult DefaultMQProducer::send(MQMessage& msg, MessageQueueSelector* pSelector, void*
arg) {
   try {
+    if (!NameSpaceUtil::hasNameSpace(msg.getTopic(), getNameSpace())) {
+      MessageAccessor::withNameSpace(msg, getNameSpace());
+    }
     return sendSelectImpl(msg, pSelector, arg, ComMode_SYNC, NULL);
   } catch (MQException& e) {
     LOG_ERROR(e.what());
@@ -254,6 +281,9 @@ SendResult DefaultMQProducer::send(MQMessage& msg,
                                    int autoRetryTimes,
                                    bool bActiveBroker) {
   try {
+    if (!NameSpaceUtil::hasNameSpace(msg.getTopic(), getNameSpace())) {
+      MessageAccessor::withNameSpace(msg, getNameSpace());
+    }
     return sendAutoRetrySelectImpl(msg, pSelector, arg, ComMode_SYNC, NULL, autoRetryTimes,
bActiveBroker);
   } catch (MQException& e) {
     LOG_ERROR(e.what());
@@ -264,6 +294,9 @@ SendResult DefaultMQProducer::send(MQMessage& msg,
 
 void DefaultMQProducer::send(MQMessage& msg, MessageQueueSelector* pSelector, void* arg,
SendCallback* pSendCallback) {
   try {
+    if (!NameSpaceUtil::hasNameSpace(msg.getTopic(), getNameSpace())) {
+      MessageAccessor::withNameSpace(msg, getNameSpace());
+    }
     sendSelectImpl(msg, pSelector, arg, ComMode_ASYNC, pSendCallback);
   } catch (MQException& e) {
     LOG_ERROR(e.what());
@@ -273,6 +306,9 @@ void DefaultMQProducer::send(MQMessage& msg, MessageQueueSelector*
pSelector, vo
 
 void DefaultMQProducer::sendOneway(MQMessage& msg, MessageQueueSelector* pSelector, void*
arg) {
   try {
+    if (!NameSpaceUtil::hasNameSpace(msg.getTopic(), getNameSpace())) {
+      MessageAccessor::withNameSpace(msg, getNameSpace());
+    }
     sendSelectImpl(msg, pSelector, arg, ComMode_ONEWAY, NULL);
   } catch (MQException& e) {
     LOG_ERROR(e.what());
@@ -535,9 +571,11 @@ bool DefaultMQProducer::tryToCompressMessage(MQMessage& msg) {
 
   return false;
 }
+
 int DefaultMQProducer::getRetryTimes() const {
   return m_retryTimes;
 }
+
 void DefaultMQProducer::setRetryTimes(int times) {
   if (times <= 0) {
     LOG_WARN("set retry times illegal, use default value:5");
@@ -556,6 +594,7 @@ void DefaultMQProducer::setRetryTimes(int times) {
 int DefaultMQProducer::getRetryTimes4Async() const {
   return m_retryTimes4Async;
 }
+
 void DefaultMQProducer::setRetryTimes4Async(int times) {
   if (times <= 0) {
     LOG_WARN("set retry times illegal, use default value:1");
@@ -572,5 +611,24 @@ void DefaultMQProducer::setRetryTimes4Async(int times) {
   m_retryTimes4Async = times;
 }
 
+// Resolves the name space for this producer and applies it to the group name.
+// Called from start() before the service state is examined.
+//
+// When no name space was set explicitly, it is taken from the name server address; if that address
+// carries no name space prefix either, nothing is changed. Always returns true.
+bool DefaultMQProducer::dealWithNameSpace() {
+  string ns = getNameSpace();
+  if (ns.empty()) {
+    const string nsAddr = getNamesrvAddr();
+    const bool addrHasNameSpace = NameSpaceUtil::checkNameSpaceExistInNameServer(nsAddr);
+    if (!addrHasNameSpace) {
+      return true;
+    }
+    // Remember the name space discovered in the address.
+    ns = NameSpaceUtil::getNameSpaceFromNsURL(nsAddr);
+    setNameSpace(ns);
+  }
+  // Qualify the group name unless it already contains the name space.
+  const bool groupQualified = NameSpaceUtil::hasNameSpace(getGroupName(), ns);
+  if (!groupQualified) {
+    setGroupName(NameSpaceUtil::withNameSpace(getGroupName(), ns));
+  }
+  return true;
+}
 //<!***************************************************************************
 }  // namespace rocketmq


Mime
View raw message