rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dongefore...@apache.org
Subject [14/17] incubator-rocketmq-externals git commit: [ROCKETMQ-129] Initialized the rocketmq c++ client closes apache/incubator-rocketmq-externals#11
Date Fri, 21 Apr 2017 10:09:54 GMT
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/MQClientFactory.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/MQClientFactory.cpp b/rocketmq-client4cpp/src/MQClientFactory.cpp
new file mode 100755
index 0000000..2b8208b
--- /dev/null
+++ b/rocketmq-client4cpp/src/MQClientFactory.cpp
@@ -0,0 +1,1258 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include <math.h>
+#include <set>
+#include <string>
+#include <iostream>
+#include <vector>
+
+#include "MQClientFactory.h"
+#include "RemoteClientConfig.h"
+#include "ClientRemotingProcessor.h"
+#include "MQClientAPIImpl.h"
+#include "MQAdminImpl.h"
+#include "DefaultMQProducer.h"
+#include "PullMessageService.h"
+#include "RebalanceService.h"
+#include "ScopedLock.h"
+#include "KPRUtil.h"
+#include "DefaultMQProducerImpl.h"
+#include "DefaultMQPushConsumerImpl.h"
+#include "MQClientException.h"
+#include "MQConsumerInner.h"
+#include "MQProducerInner.h"
+#include "UtilAll.h"
+#include "PermName.h"
+#include "MQClientManager.h"
+#include "ConsumerStatManage.h"
+#include "TopicPublishInfo.h"
+#include "MQVersion.h"
+
+namespace rmq
+{
+
+
+// Timeout, in milliseconds, passed to m_lockNamesrv.TryLock() by the routines in this file.
+long MQClientFactory::LockTimeoutMillis = 3000;
+
+/**
+* Builds a client factory and all helper objects it drives.
+*
+* @param clientConfig  configuration copied into m_clientConfig and forwarded to the inner producer
+* @param factoryIndex  index stored in m_factoryIndex (only logged in this file)
+* @param clientId      identifier stored in m_clientId
+*
+* Nothing is started here; the state is left at CREATE_JUST until start() is called.
+*/
+MQClientFactory::MQClientFactory(ClientConfig& clientConfig, int factoryIndex, const std::string& clientId)
+{
+    m_clientConfig = clientConfig;
+    m_factoryIndex = factoryIndex;
+    m_pRemoteClientConfig = new RemoteClientConfig();
+    m_pRemoteClientConfig->clientCallbackExecutorThreads = clientConfig.getClientCallbackExecutorThreads();
+    // The processor must exist before the API impl, which receives a pointer to it.
+    m_pClientRemotingProcessor = new ClientRemotingProcessor(this);
+    m_pMQClientAPIImpl = new MQClientAPIImpl(m_clientConfig, *m_pRemoteClientConfig, m_pClientRemotingProcessor);
+
+    // A name server address given in the config is pushed to the API impl right away.
+    if (!m_clientConfig.getNamesrvAddr().empty())
+    {
+        m_pMQClientAPIImpl->updateNameServerAddressList(m_clientConfig.getNamesrvAddr());
+        RMQ_INFO("user specified name server address: {%s}", m_clientConfig.getNamesrvAddr().c_str());
+    }
+
+    m_clientId = clientId;
+
+    m_pMQAdminImpl = new MQAdminImpl(this);
+    m_pPullMessageService = new PullMessageService(this);
+    m_pRebalanceService = new RebalanceService(this);
+    // Inner producer created under the reserved CLIENT_INNER_PRODUCER_GROUP group name.
+    m_pDefaultMQProducer = new DefaultMQProducer(MixAll::CLIENT_INNER_PRODUCER_GROUP);
+    m_pDefaultMQProducer->resetClientConfig(clientConfig);
+    m_bootTimestamp = KPRUtil::GetCurrentTimeMillis();
+
+    // One ScheduledTask per periodic job; they are registered with the timer in startScheduledTask().
+    m_pFetchNameServerAddrTask = new ScheduledTask(this, &MQClientFactory::fetchNameServerAddr);
+    m_pUpdateTopicRouteInfoFromNameServerTask = new ScheduledTask(this, &MQClientFactory::updateTopicRouteInfoFromNameServerTask);
+    m_pCleanBrokerTask = new ScheduledTask(this, &MQClientFactory::cleanBroker);
+    m_pPersistAllConsumerOffsetTask = new ScheduledTask(this, &MQClientFactory::persistAllConsumerOffsetTask);
+    m_pRecordSnapshotPeriodicallyTask = new ScheduledTask(this, &MQClientFactory::recordSnapshotPeriodicallyTask);
+    m_pLogStatsPeriodicallyTask = new ScheduledTask(this, &MQClientFactory::logStatsPeriodicallyTask);
+
+    m_serviceState = CREATE_JUST;
+
+    RMQ_INFO("created a new client Instance, FactoryIndex: {%d} ClinetID: {%s} Config: {%s} Version: {%s}",
+                m_factoryIndex,
+                m_clientId.c_str(),
+                m_clientConfig.toString().c_str(),
+                MQVersion::getVersionDesc(MQVersion::s_CurrentVersion));
+}
+
+/**
+* Releases the helper objects allocated by the constructor.
+*
+* NOTE(review): the six ScheduledTask objects created in the constructor are not
+* deleted here. Confirm whether they are reference-counted or owned by the timer
+* manager; otherwise they leak.
+*/
+MQClientFactory::~MQClientFactory()
+{
+    delete m_pRemoteClientConfig;
+    delete m_pClientRemotingProcessor;
+    delete m_pMQClientAPIImpl;
+    delete m_pMQAdminImpl;
+    delete m_pPullMessageService;
+    delete m_pRebalanceService;
+    delete m_pDefaultMQProducer;
+}
+
+/**
+* Starts the factory: remoting API, timer tasks, pull service, rebalance service
+* and the inner producer. Runs under m_mutex.
+*
+* Only the CREATE_JUST state performs work. RUNNING and SHUTDOWN_ALREADY just log;
+* START_FAILED logs and raises MQClientException through THROW_MQEXCEPTION.
+*/
+void MQClientFactory::start()
+{
+    RMQ_DEBUG("MQClientFactory::start()");
+    kpr::ScopedLock<kpr::Mutex> lock(m_mutex);
+    switch (m_serviceState)
+    {
+        case CREATE_JUST:
+            makesureInstanceNameIsOnly(m_clientConfig.getInstanceName());
+
+            // Marked as failed up front; only overwritten with RUNNING once every step below has returned.
+            m_serviceState = START_FAILED;
+            // No address configured: ask the API impl to fetch one.
+            if (m_clientConfig.getNamesrvAddr().empty())
+            {
+                m_clientConfig.setNamesrvAddr(m_pMQClientAPIImpl->fetchNameServerAddr());
+            }
+
+            m_pMQClientAPIImpl->start();
+            m_timerTaskManager.Init(5, 1000);
+            startScheduledTask();
+            m_pPullMessageService->Start();
+            m_pRebalanceService->Start();
+            m_pDefaultMQProducer->getDefaultMQProducerImpl()->start(false);
+
+			RMQ_INFO("the client factory [%s] start OK", m_clientId.c_str());
+            m_serviceState = RUNNING;
+            break;
+        case RUNNING:
+            RMQ_WARN("MQClientFactory is already running.");
+            break;
+        case SHUTDOWN_ALREADY:
+            RMQ_ERROR("MQClientFactory should have already been shutted down");
+            break;
+        case START_FAILED:
+            RMQ_ERROR("MQClientFactory started failed.");
+            THROW_MQEXCEPTION(MQClientException, "The Factory object start failed", -1);
+        default:
+            break;
+    }
+}
+
+
+/**
+* Shuts the factory down, but only when nothing else is still registered on it.
+*
+* Returns without doing anything while any consumer or admin-ext is registered, or
+* while more than one producer is registered (presumably the single remaining one is
+* the inner default producer - confirm). Otherwise, when RUNNING, stops the inner
+* producer, timers, pull service, remoting API and rebalance service, removes this
+* factory from MQClientManager and moves to SHUTDOWN_ALREADY.
+*
+* NOTE(review): the three tables are inspected here without taking their RW locks.
+*/
+void MQClientFactory::shutdown()
+{
+    RMQ_DEBUG("MQClientFactory::shutdown()");
+    // Consumer
+    if (!m_consumerTable.empty())
+    {
+        return;
+    }
+
+    // AdminExt
+    if (!m_adminExtTable.empty())
+    {
+        return;
+    }
+
+    // Producer
+    if (m_producerTable.size() > 1)
+    {
+        return;
+    }
+
+    RMQ_DEBUG("MQClientFactory::shutdown_begin");
+    {
+        kpr::ScopedLock<kpr::Mutex> lock(m_mutex);
+        switch (m_serviceState)
+        {
+            case CREATE_JUST:
+                break;
+            case RUNNING:
+                m_pDefaultMQProducer->getDefaultMQProducerImpl()->shutdown(false);
+
+                // 6 == number of timer ids stored by startScheduledTask().
+                for (int i = 0; i < 6; i++)
+                {
+                    m_timerTaskManager.UnRegisterTimer(m_scheduledTaskIds[i]);
+                }
+
+                m_timerTaskManager.Stop();
+
+                m_pPullMessageService->stop();
+                m_pPullMessageService->Join();
+
+                m_pMQClientAPIImpl->shutdown();
+                m_pRebalanceService->stop();
+                m_pRebalanceService->Join();
+
+                //closesocket(m_datagramSocket);
+
+                MQClientManager::getInstance()->removeClientFactory(m_clientId);
+                m_serviceState = SHUTDOWN_ALREADY;
+                break;
+            case SHUTDOWN_ALREADY:
+                break;
+            default:
+                break;
+        }
+    }
+}
+
+
+/**
+* Sends a heartbeat to all brokers unless another heartbeat round is already running.
+*
+* m_lockHeartbeat is acquired with a non-blocking TryLock(); when that fails the call
+* only logs a warning. Exceptions from sendHeartbeatToAllBroker() are logged and
+* swallowed so the lock is always released.
+*/
+void MQClientFactory::sendHeartbeatToAllBrokerWithLock()
+{
+    RMQ_DEBUG("TryLock m_lockHeartbeat: %p", &m_lockHeartbeat);
+    if (!m_lockHeartbeat.TryLock())
+    {
+        RMQ_WARN("TryLock heartBeat fail");
+        return;
+    }
+
+    try
+    {
+        RMQ_DEBUG("TryLock m_lockHeartbeat ok");
+        sendHeartbeatToAllBroker();
+    }
+    catch (...)
+    {
+        RMQ_ERROR("sendHeartbeatToAllBroker exception");
+    }
+
+    m_lockHeartbeat.Unlock();
+}
+
+/**
+* Refreshes the route info of every topic currently used through this factory.
+*
+* The topic names are gathered from the subscriptions of all registered consumers and
+* the publish-topic lists of all registered producers (each table read under its read
+* lock), then each distinct topic is refreshed individually.
+*/
+void MQClientFactory::updateTopicRouteInfoFromNameServer()
+{
+    std::set<std::string> topics;
+
+    // Topics subscribed by consumers.
+    {
+        kpr::ScopedRLock<kpr::RWMutex> guard(m_consumerTableLock);
+        std::map<std::string, MQConsumerInner*>::iterator consumerIt = m_consumerTable.begin();
+        while (consumerIt != m_consumerTable.end())
+        {
+            std::set<SubscriptionData> subscriptions = consumerIt->second->subscriptions();
+            std::set<SubscriptionData>::iterator subIt = subscriptions.begin();
+            while (subIt != subscriptions.end())
+            {
+                topics.insert(subIt->getTopic());
+                ++subIt;
+            }
+            ++consumerIt;
+        }
+    }
+
+    // Topics published by producers.
+    {
+        kpr::ScopedRLock<kpr::RWMutex> guard(m_producerTableLock);
+        std::map<std::string, MQProducerInner*>::iterator producerIt = m_producerTable.begin();
+        while (producerIt != m_producerTable.end())
+        {
+            std::set<std::string> published = producerIt->second->getPublishTopicList();
+            topics.insert(published.begin(), published.end());
+            ++producerIt;
+        }
+    }
+
+    // Refresh each distinct topic; the per-topic result is intentionally ignored.
+    for (std::set<std::string>::iterator topicIt = topics.begin(); topicIt != topics.end(); ++topicIt)
+    {
+        updateTopicRouteInfoFromNameServer(*topicIt);
+    }
+}
+
+/**
+* Refreshes the route info of a single topic using the topic's own route
+* (isDefault = false, no default producer).
+*
+* @param topic  topic name to refresh
+* @return the result of the three-argument overload
+*/
+bool MQClientFactory::updateTopicRouteInfoFromNameServer(const std::string& topic)
+{
+    return updateTopicRouteInfoFromNameServer(topic, false, NULL);
+}
+
+/**
+* Fetches route data for a topic from the name server and, when it differs from the
+* cached copy, propagates it to the broker address table, every registered producer,
+* every registered consumer and m_topicRouteTable.
+*
+* @param topic               topic whose route is refreshed
+* @param isDefault           when true (and pDefaultMQProducer is non-NULL) the route of the
+*                            producer's create-topic key is fetched instead of the topic's own
+* @param pDefaultMQProducer  producer supplying the create-topic key and default queue count
+* @return true only when the cached route was replaced; false when the lock timed out,
+*         no route was returned, nothing changed, or an exception was caught
+*
+* The whole operation runs under m_lockNamesrv, acquired with a timed TryLock and
+* released manually on both exit paths.
+*/
+bool MQClientFactory::updateTopicRouteInfoFromNameServer(const std::string& topic,
+        bool isDefault,
+        DefaultMQProducer* pDefaultMQProducer)
+{
+    RMQ_DEBUG("TryLock m_lockNamesrv: 0x%p, topic: [%s]", &m_lockNamesrv, topic.c_str());
+    if (m_lockNamesrv.TryLock(MQClientFactory::LockTimeoutMillis))
+    {
+        RMQ_DEBUG("TryLock m_lockNamesrv ok");
+        TopicRouteDataPtr topicRouteData = NULL;
+        try
+        {
+            if (isDefault && pDefaultMQProducer != NULL)
+            {
+                topicRouteData =
+                    m_pMQClientAPIImpl->getDefaultTopicRouteInfoFromNameServer(
+                        pDefaultMQProducer->getCreateTopicKey(), 1000 * 3);
+                if (topicRouteData.ptr() != NULL)
+                {
+                    // NOTE(review): dataList is declared by value and `data` is a copy of each
+                    // element, so the clamped queue numbers below are written to temporaries
+                    // and never reach topicRouteData. Confirm the intent and fix separately.
+                    std::list<QueueData> dataList = topicRouteData->getQueueDatas();
+
+                    std::list<QueueData>::iterator it = dataList.begin();
+                    for (; it != dataList.end(); it++)
+                    {
+                        QueueData data = *it;
+
+                        int queueNums =
+                            std::min<int>(pDefaultMQProducer->getDefaultTopicQueueNums(),
+                                          data.readQueueNums);
+                        data.readQueueNums = (queueNums);
+                        data.writeQueueNums = (queueNums);
+                    }
+                }
+            }
+            else
+            {
+                topicRouteData =
+                    m_pMQClientAPIImpl->getTopicRouteInfoFromNameServer(topic, 1000 * 3);
+            }
+
+            if (topicRouteData.ptr() != NULL)
+            {
+                kpr::ScopedWLock<kpr::RWMutex> lock(m_topicRouteTableLock);
+                std::map<std::string, TopicRouteData>::iterator it = m_topicRouteTable.find(topic);
+                bool changed = false;
+
+                // A topic that is not cached yet always counts as changed.
+                if (it != m_topicRouteTable.end())
+                {
+                    changed = topicRouteDataIsChange(it->second, *topicRouteData);
+                    if (!changed)
+                    {
+                        // Same data, but a producer/consumer may still need it pushed again.
+                        changed = isNeedUpdateTopicRouteInfo(topic);
+						if (changed)
+						{
+                        	RMQ_INFO("the topic[{%s}] route info changed, old[{%s}] ,new[{%s}]",
+                                 topic.c_str(), it->second.toString().c_str(),
+                                 topicRouteData->toString().c_str());
+                        }
+                    }
+                }
+                else
+                {
+                    changed = true;
+                }
+
+                if (changed)
+                {
+                    TopicRouteData cloneTopicRouteData = *topicRouteData;
+
+                    std::list<BrokerData> dataList = topicRouteData->getBrokerDatas();
+
+                    // Step 1: refresh broker name -> addresses (write lock taken per entry).
+                    std::list<BrokerData>::iterator it = dataList.begin();
+                    for (; it != dataList.end(); it++)
+                    {
+                        kpr::ScopedWLock<kpr::RWMutex> lock(m_brokerAddrTableLock);
+                        m_brokerAddrTable[(*it).brokerName] = (*it).brokerAddrs;
+                    }
+
+                    // Step 2: hand the new publish info to every registered producer.
+                    {
+                        TopicPublishInfoPtr publishInfo =
+                            topicRouteData2TopicPublishInfo(topic, *topicRouteData);
+                        publishInfo->setHaveTopicRouterInfo(true);
+
+                        kpr::ScopedRLock<kpr::RWMutex> lock(m_producerTableLock);
+                        std::map<std::string, MQProducerInner*>::iterator it = m_producerTable.begin();
+                        for (; it != m_producerTable.end(); it++)
+                        {
+                            MQProducerInner* impl = it->second;
+                            if (impl)
+                            {
+                                impl->updateTopicPublishInfo(topic, *publishInfo);
+                            }
+                        }
+                    }
+
+                    // Step 3: hand the new subscribe info to every registered consumer.
+                    {
+                        std::set<MessageQueue>* subscribeInfo =
+                            topicRouteData2TopicSubscribeInfo(topic, *topicRouteData);
+
+                        kpr::ScopedRLock<kpr::RWMutex> lock(m_consumerTableLock);
+                        std::map<std::string, MQConsumerInner*>::iterator it = m_consumerTable.begin();
+                        for (; it != m_consumerTable.end(); it++)
+                        {
+                            MQConsumerInner* impl = it->second;
+                            if (impl)
+                            {
+                                impl->updateTopicSubscribeInfo(topic, *subscribeInfo);
+                            }
+                        }
+                        // The helper returns a heap-allocated set owned by this function.
+                        delete subscribeInfo;
+                    }
+
+                    // Step 4: cache the new route; m_lockNamesrv is released by hand before returning.
+                    m_topicRouteTable[topic] = cloneTopicRouteData;
+                    m_lockNamesrv.Unlock();
+                    RMQ_DEBUG("UnLock m_lockNamesrv ok");
+
+                    RMQ_INFO("topicRouteTable.put[%s] = TopicRouteData[%s]",
+                    	topic.c_str(), cloneTopicRouteData.toString().c_str());
+                    return true;
+                }
+            }
+            else
+            {
+                //TODO log?
+                RMQ_WARN("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {%s}",
+                         topic.c_str());
+            }
+        }
+        catch (const std::exception& e)
+        {
+            // Failures for retry-group topics and the default topic are not logged.
+        	if (!(topic.find(MixAll::RETRY_GROUP_TOPIC_PREFIX) == 0) && topic != MixAll::DEFAULT_TOPIC)
+        	{
+            	RMQ_WARN("updateTopicRouteInfoFromNameServer Exception: %s", e.what());
+            }
+        }
+        catch (...)
+        {
+            RMQ_WARN("updateTopicRouteInfoFromNameServer unknow Exception");
+        }
+
+        // Reached on "no data", "unchanged" and exception paths.
+        m_lockNamesrv.Unlock();
+        RMQ_DEBUG("UnLock m_lockNamesrv ok");
+    }
+    else
+    {
+        RMQ_WARN("TryLock m_lockNamesrv timeout %ldms", MQClientFactory::LockTimeoutMillis);
+    }
+
+    return false;
+}
+
+/**
+* Builds the publish-side view (list of writable message queues) of a topic route.
+*
+* @param topic  topic name placed into every generated MessageQueue
+* @param route  route data to convert
+* @return a heap-allocated TopicPublishInfo; the caller takes ownership
+*
+* When the route carries an order-topic configuration ("broker:nums;broker:nums;...")
+* the queues are generated from it and the info is flagged as an order topic.
+* Otherwise queues are generated from each writable QueueData whose broker is present
+* in the route's broker list and has a MASTER_ID address.
+*/
+TopicPublishInfo*  MQClientFactory::topicRouteData2TopicPublishInfo(const std::string& topic,
+        TopicRouteData& route)
+{
+    TopicPublishInfo* info = new TopicPublishInfo();
+    if (!route.getOrderTopicConf().empty())
+    {
+        std::vector<std::string> brokers;
+        UtilAll::Split(brokers, route.getOrderTopicConf(), ";");
+        for (size_t i = 0; i < brokers.size(); i++)
+        {
+            std::vector<std::string> item;
+            UtilAll::Split(item, brokers[i], ":");
+            // A malformed entry without "name:nums" would make item[1] read out of
+            // bounds (undefined behaviour); skip it instead.
+            if (item.size() < 2)
+            {
+                continue;
+            }
+
+            int nums = atoi(item[1].c_str());
+            for (int queueId = 0; queueId < nums; queueId++)
+            {
+                MessageQueue mq(topic, item[0], queueId);
+                info->getMessageQueueList().push_back(mq);
+            }
+        }
+
+        info->setOrderTopic(true);
+    }
+    else
+    {
+        std::list<QueueData> qds = route.getQueueDatas();
+        qds.sort();
+
+        // Taken once up front; the broker list does not change while iterating.
+        std::list<BrokerData> bds = route.getBrokerDatas();
+
+        std::list<QueueData>::iterator it = qds.begin();
+        for (; it != qds.end(); it++)
+        {
+            QueueData& qd = (*it);
+            if (!PermName::isWriteable(qd.perm))
+            {
+                continue;
+            }
+
+            // Locate the broker entry this queue data belongs to.
+            const BrokerData* brokerData = NULL;
+            std::list<BrokerData>::iterator it1 = bds.begin();
+            for (; it1 != bds.end(); it1++)
+            {
+                if (it1->brokerName == qd.brokerName)
+                {
+                    brokerData = &(*it1);
+                    break;
+                }
+            }
+
+            if (brokerData == NULL)
+            {
+                continue;
+            }
+
+            // Only brokers that expose a master address are publish targets.
+            if (brokerData->brokerAddrs.find(MixAll::MASTER_ID) == brokerData->brokerAddrs.end())
+            {
+                continue;
+            }
+
+            for (int queueId = 0; queueId < qd.writeQueueNums; queueId++)
+            {
+                MessageQueue mq(topic, qd.brokerName, queueId);
+                info->getMessageQueueList().push_back(mq);
+            }
+        }
+
+        info->setOrderTopic(false);
+    }
+
+    return info;
+}
+
+/**
+* Builds the subscribe-side view (set of readable message queues) of a topic route.
+*
+* @param topic  topic name placed into every generated MessageQueue
+* @param route  route data to convert
+* @return a heap-allocated set holding one MessageQueue per read queue of every
+*         readable QueueData; the caller takes ownership
+*/
+std::set<MessageQueue>* MQClientFactory::topicRouteData2TopicSubscribeInfo(const std::string& topic,
+        TopicRouteData& route)
+{
+    std::set<MessageQueue>* queues = new std::set<MessageQueue>();
+    std::list<QueueData> queueDatas = route.getQueueDatas();
+
+    for (std::list<QueueData>::iterator qdIt = queueDatas.begin(); qdIt != queueDatas.end(); ++qdIt)
+    {
+        // Queues without read permission are not offered to consumers.
+        if (!PermName::isReadable(qdIt->perm))
+        {
+            continue;
+        }
+
+        int queueId = 0;
+        while (queueId < qdIt->readQueueNums)
+        {
+            queues->insert(MessageQueue(topic, qdIt->brokerName, queueId));
+            ++queueId;
+        }
+    }
+
+    return queues;
+}
+
+/**
+* Registers a consumer under its group name.
+*
+* @param group      consumer group; must not be empty
+* @param pConsumer  consumer to register; must not be NULL
+* @return true when the consumer was stored; false for invalid arguments or when the
+*         group is already registered
+*/
+bool MQClientFactory::registerConsumer(const std::string& group, MQConsumerInner* pConsumer)
+{
+    const bool validArgs = !group.empty() && (pConsumer != NULL);
+    if (!validArgs)
+    {
+        return false;
+    }
+
+    kpr::ScopedWLock<kpr::RWMutex> guard(m_consumerTableLock);
+    if (m_consumerTable.count(group) != 0)
+    {
+        // An existing registration is never overwritten.
+        return false;
+    }
+
+    m_consumerTable[group] = pConsumer;
+    return true;
+}
+
+/**
+* Removes a consumer group from the local table, then unregisters it through
+* unregisterClientWithLock() (called after the table lock has been released).
+*
+* @param group  consumer group to remove
+*/
+void MQClientFactory::unregisterConsumer(const std::string& group)
+{
+    {
+        kpr::ScopedWLock<kpr::RWMutex> guard(m_consumerTableLock);
+        std::map<std::string, MQConsumerInner*>::iterator pos = m_consumerTable.find(group);
+        if (pos != m_consumerTable.end())
+        {
+            m_consumerTable.erase(pos);
+        }
+    }
+
+    unregisterClientWithLock("", group);
+}
+
+/**
+* Registers a producer under its group name.
+*
+* @param group      producer group; must not be empty
+* @param pProducer  producer to register; must not be NULL
+* @return true when the producer was stored; false for invalid arguments or when the
+*         group is already registered
+*/
+bool MQClientFactory::registerProducer(const std::string& group, DefaultMQProducerImpl* pProducer)
+{
+    const bool validArgs = !group.empty() && (pProducer != NULL);
+    if (!validArgs)
+    {
+        return false;
+    }
+
+    kpr::ScopedWLock<kpr::RWMutex> guard(m_producerTableLock);
+    if (m_producerTable.count(group) != 0)
+    {
+        // An existing registration is never overwritten.
+        return false;
+    }
+
+    m_producerTable[group] = pProducer;
+    return true;
+}
+
+/**
+* Removes a producer group from the local table, then unregisters it through
+* unregisterClientWithLock() (called after the table lock has been released).
+*
+* @param group  producer group to remove
+*/
+void MQClientFactory::unregisterProducer(const std::string& group)
+{
+    {
+        kpr::ScopedWLock<kpr::RWMutex> guard(m_producerTableLock);
+        std::map<std::string, MQProducerInner*>::iterator pos = m_producerTable.find(group);
+        if (pos != m_producerTable.end())
+        {
+            m_producerTable.erase(pos);
+        }
+    }
+
+    unregisterClientWithLock(group, "");
+}
+
+/**
+* Registers an admin extension under its group name.
+*
+* @param group   admin group; must not be empty
+* @param pAdmin  admin object to register; must not be NULL
+* @return true when the object was stored; false for invalid arguments or when the
+*         group is already registered
+*/
+bool MQClientFactory::registerAdminExt(const std::string& group, MQAdminExtInner* pAdmin)
+{
+    const bool validArgs = !group.empty() && (pAdmin != NULL);
+    if (!validArgs)
+    {
+        return false;
+    }
+
+    kpr::ScopedWLock<kpr::RWMutex> guard(m_adminExtTableLock);
+    if (m_adminExtTable.count(group) != 0)
+    {
+        // An existing registration is never overwritten.
+        return false;
+    }
+
+    m_adminExtTable[group] = pAdmin;
+    return true;
+}
+
+/**
+* Removes an admin group from m_adminExtTable under the table's write lock.
+*
+* @param group  admin group to remove; removing an unknown group is a no-op
+*/
+void MQClientFactory::unregisterAdminExt(const std::string& group)
+{
+	kpr::ScopedWLock<kpr::RWMutex> lock(m_adminExtTableLock);
+    m_adminExtTable.erase(group);
+}
+
+/**
+* Wakes up the rebalance service (presumably so it rebalances without waiting for
+* its next scheduled round - confirm against RebalanceService).
+*/
+void MQClientFactory::rebalanceImmediately()
+{
+    m_pRebalanceService->wakeup();
+}
+
+/**
+* Runs doRebalance() on every registered consumer while holding the consumer table
+* read lock. A failure in one consumer is logged and does not stop the others.
+*/
+void MQClientFactory::doRebalance()
+{
+    kpr::ScopedRLock<kpr::RWMutex> guard(m_consumerTableLock);
+
+    std::map<std::string, MQConsumerInner*>::iterator consumerIt = m_consumerTable.begin();
+    while (consumerIt != m_consumerTable.end())
+    {
+        MQConsumerInner* consumer = consumerIt->second;
+        ++consumerIt;
+
+        if (consumer == NULL)
+        {
+            continue;
+        }
+
+        try
+        {
+            consumer->doRebalance();
+        }
+        catch (std::exception& e)
+        {
+            RMQ_ERROR("doRebalance exception, %s", e.what());
+        }
+        catch (...)
+        {
+            RMQ_ERROR("doRebalance unknow exception");
+        }
+    }
+}
+
+/**
+* Looks up the producer registered for a group.
+*
+* @param group  producer group name
+* @return the registered producer, or NULL when the group is unknown
+*/
+MQProducerInner* MQClientFactory::selectProducer(const std::string& group)
+{
+    kpr::ScopedRLock<kpr::RWMutex> guard(m_producerTableLock);
+
+    MQProducerInner* producer = NULL;
+    std::map<std::string, MQProducerInner*>::iterator pos = m_producerTable.find(group);
+    if (pos != m_producerTable.end())
+    {
+        producer = pos->second;
+    }
+
+    return producer;
+}
+
+/**
+* Looks up the consumer registered for a group.
+*
+* @param group  consumer group name
+* @return the registered consumer, or NULL when the group is unknown
+*/
+MQConsumerInner* MQClientFactory::selectConsumer(const std::string& group)
+{
+    kpr::ScopedRLock<kpr::RWMutex> guard(m_consumerTableLock);
+
+    MQConsumerInner* consumer = NULL;
+    std::map<std::string, MQConsumerInner*>::iterator pos = m_consumerTable.find(group);
+    if (pos != m_consumerTable.end())
+    {
+        consumer = pos->second;
+    }
+
+    return consumer;
+}
+
+/**
+* Finds an address for a broker for admin operations.
+*
+* @param brokerName  broker to look up in m_brokerAddrTable
+* @return a result carrying the first non-empty address of the broker (in broker-id
+*         order) and whether that id differs from MASTER_ID; a default-constructed
+*         result when the broker is unknown or has no non-empty address
+*/
+FindBrokerResult MQClientFactory::findBrokerAddressInAdmin(const std::string& brokerName)
+{
+    //TODO
+    FindBrokerResult result;
+
+    kpr::ScopedRLock<kpr::RWMutex> guard(m_brokerAddrTableLock);
+    typeof(m_brokerAddrTable.begin()) brokerIt = m_brokerAddrTable.find(brokerName);
+    if (brokerIt == m_brokerAddrTable.end())
+    {
+        return result;
+    }
+
+    // TODO more slave
+    for (typeof(brokerIt->second.begin()) addrIt = brokerIt->second.begin();
+         addrIt != brokerIt->second.end();
+         ++addrIt)
+    {
+        if (addrIt->second.empty())
+        {
+            continue;
+        }
+
+        // First usable address wins.
+        const int brokerId = addrIt->first;
+        result.brokerAddr = addrIt->second;
+        result.slave = !(MixAll::MASTER_ID == brokerId);
+        break;
+    }
+
+    return result;
+}
+
+/**
+* Finds the master address of a broker for publishing.
+*
+* @param brokerName  broker to look up in m_brokerAddrTable
+* @return the address stored under MASTER_ID, or an empty string when the broker or
+*         its master entry is missing
+*/
+std::string MQClientFactory::findBrokerAddressInPublish(const std::string& brokerName)
+{
+    kpr::ScopedRLock<kpr::RWMutex> guard(m_brokerAddrTableLock);
+
+    std::map<std::string, std::map<int, std::string> >::iterator brokerIt = m_brokerAddrTable.find(brokerName);
+    if (brokerIt == m_brokerAddrTable.end())
+    {
+        return "";
+    }
+
+    std::map<int, std::string>::iterator masterIt = brokerIt->second.find(MixAll::MASTER_ID);
+    if (masterIt == brokerIt->second.end())
+    {
+        return "";
+    }
+
+    return masterIt->second;
+}
+
+/**
+* Finds the address of a broker instance for subscribing (pulling).
+*
+* @param brokerName      broker to look up in m_brokerAddrTable
+* @param brokerId        preferred broker id (MASTER_ID for the master)
+* @param onlyThisBroker  when true, only the entry for brokerId is acceptable; when
+*                        false, the first known address of the broker is used as a
+*                        fallback if brokerId has no entry
+* @return a result whose brokerAddr is empty when nothing suitable was found, and
+*         whose slave flag reflects the id of the address actually selected
+*
+* Fixes over the previous version: onlyThisBroker is now honoured, an empty address
+* map is no longer dereferenced through begin(), and the slave flag of a fallback
+* address is derived from the fallback's own id rather than the requested one.
+*/
+FindBrokerResult MQClientFactory::findBrokerAddressInSubscribe(const std::string& brokerName,
+        long brokerId,
+        bool onlyThisBroker)
+{
+    std::string brokerAddr = "";
+    bool slave = false;
+
+    kpr::ScopedRLock<kpr::RWMutex> lock(m_brokerAddrTableLock);
+    std::map<std::string, std::map<int, std::string> >::iterator it = m_brokerAddrTable.find(brokerName);
+    if (it != m_brokerAddrTable.end() && !it->second.empty())
+    {
+        std::map<int, std::string>::iterator it1 = it->second.find(brokerId);
+        if (it1 != it->second.end())
+        {
+            brokerAddr = it1->second;
+            slave = (brokerId != MixAll::MASTER_ID);
+        }
+        else if (!onlyThisBroker)
+        {
+            // Fall back to the first known instance of this broker.
+            it1 = it->second.begin();
+            brokerAddr = it1->second;
+            slave = (it1->first != MixAll::MASTER_ID);
+        }
+    }
+
+    FindBrokerResult result;
+    result.brokerAddr = brokerAddr;
+    result.slave = slave;
+
+    return result;
+}
+
+/**
+* Asks a broker serving the topic for the client ids of a consumer group.
+*
+* @param topic  topic used to pick the broker
+* @param group  consumer group whose members are requested
+* @return the id list returned by the broker; an empty list when no broker address
+*         is known (even after one route refresh) or the request throws
+*/
+std::list<std::string> MQClientFactory::findConsumerIdList(const std::string& topic, const std::string& group)
+{
+    std::string addr = findBrokerAddrByTopic(topic);
+    if (addr.empty())
+    {
+        // Route may simply not be cached yet: refresh once and retry.
+        updateTopicRouteInfoFromNameServer(topic);
+        addr = findBrokerAddrByTopic(topic);
+    }
+
+    if (addr.empty())
+    {
+        return std::list<std::string>();
+    }
+
+    try
+    {
+        return m_pMQClientAPIImpl->getConsumerIdListByGroup(addr, group, 3000);
+    }
+    catch (...)
+    {
+        RMQ_WARN("getConsumerIdListByGroup exception, %s, %s", addr.c_str(), group.c_str());
+    }
+
+    return std::list<std::string>();
+}
+
+/**
+* Picks a broker address for a topic from the cached route table.
+*
+* @param topic  topic to look up in m_topicRouteTable
+* @return the address chosen by TopicRouteData::selectBrokerAddr() for the first
+*         broker of the topic's route, or an empty string when the topic is not
+*         cached or its route lists no brokers
+*/
+std::string MQClientFactory::findBrokerAddrByTopic(const std::string& topic)
+{
+    kpr::ScopedRLock<kpr::RWMutex> guard(m_topicRouteTableLock);
+
+    std::map<std::string, TopicRouteData>::iterator routeIt = m_topicRouteTable.find(topic);
+    if (routeIt == m_topicRouteTable.end())
+    {
+        return "";
+    }
+
+    const std::list<BrokerData>& brokerDatas = routeIt->second.getBrokerDatas();
+    if (brokerDatas.empty())
+    {
+        return "";
+    }
+
+    BrokerData firstBroker = brokerDatas.front();
+    return TopicRouteData::selectBrokerAddr(firstBroker);
+}
+
+/**
+* Returns a copy of the cached route data of a topic.
+*
+* @param topic  topic to look up in m_topicRouteTable
+* @return the cached route, or a default-constructed TopicRouteData when the topic
+*         is not cached
+*/
+TopicRouteData MQClientFactory::getAnExistTopicRouteData(const std::string& topic)
+{
+    kpr::ScopedRLock<kpr::RWMutex> guard(m_topicRouteTableLock);
+
+    std::map<std::string, TopicRouteData>::iterator routeIt = m_topicRouteTable.find(topic);
+    if (routeIt == m_topicRouteTable.end())
+    {
+        TopicRouteData emptyRoute;
+        return emptyRoute;
+    }
+
+    return routeIt->second;
+}
+
+/** Returns the remoting API object created in the constructor (owned by this factory). */
+MQClientAPIImpl* MQClientFactory::getMQClientAPIImpl()
+{
+    return m_pMQClientAPIImpl;
+}
+
+/** Returns the admin helper created in the constructor (owned by this factory). */
+MQAdminImpl* MQClientFactory::getMQAdminImpl()
+{
+    return m_pMQAdminImpl;
+}
+
+/** Returns a copy of the client id given to the constructor. */
+std::string MQClientFactory::getClientId()
+{
+    return m_clientId;
+}
+
+/** Returns the KPRUtil::GetCurrentTimeMillis() value recorded when this factory was constructed. */
+long long MQClientFactory::getBootTimestamp()
+{
+    return m_bootTimestamp;
+}
+
+/** Returns the pull message service created in the constructor (owned by this factory). */
+PullMessageService* MQClientFactory::getPullMessageService()
+{
+    return m_pPullMessageService;
+}
+
+
+/** Returns the inner producer created in the constructor under CLIENT_INNER_PRODUCER_GROUP. */
+DefaultMQProducer* MQClientFactory::getDefaultMQProducer()
+{
+    return m_pDefaultMQProducer;
+}
+
+/**
+* Sends one heartbeat, describing all registered producers and consumers, to every
+* known broker address.
+*
+* Nothing is sent when neither producers nor consumers are registered. When there are
+* no consumers, only MASTER_ID addresses are contacted. A failure for one address is
+* logged and does not stop the remaining sends.
+*
+* The broker address table read lock is taken before the table is touched at all,
+* including the size() used in the debug log, which used to be read unlocked.
+*/
+void MQClientFactory::sendHeartbeatToAllBroker()
+{
+    RMQ_DEBUG("sendHeartbeatToAllBroker begin");
+
+    HeartbeatData heartbeatData;
+    this->prepareHeartbeatData(heartbeatData);
+
+    bool producerEmpty = heartbeatData.getProducerDataSet().empty();
+    bool consumerEmpty = heartbeatData.getConsumerDataSet().empty();
+    if (producerEmpty && consumerEmpty)
+    {
+        RMQ_ERROR("sending hearbeat, but no consumer and no producer");
+        return;
+    }
+
+    kpr::ScopedRLock<kpr::RWMutex> lock(m_brokerAddrTableLock);
+
+    RMQ_DEBUG("clientId=%s, m_brokerAddrTable=%u", heartbeatData.getClientID().c_str(), (unsigned)m_brokerAddrTable.size());
+
+    std::map<std::string, std::map<int, std::string> >::iterator it = m_brokerAddrTable.begin();
+    for (; it != m_brokerAddrTable.end(); it++)
+    {
+        std::map<int, std::string>::iterator it1 = it->second.begin();
+        for (; it1 != it->second.end(); it1++)
+        {
+            std::string& addr = it1->second;
+            if (addr.empty())
+            {
+                continue;
+            }
+
+            // Producer-only clients do not need to heartbeat slaves.
+            if (consumerEmpty && it1->first != MixAll::MASTER_ID)
+            {
+                continue;
+            }
+
+            try
+            {
+                m_pMQClientAPIImpl->sendHearbeat(addr, &heartbeatData, 3000);
+                RMQ_INFO("send heartbeat to broker[{%s} {%d} {%s}] success",
+                    it->first.c_str(), it1->first, addr.c_str());
+                RMQ_INFO("HeartbeatData %s", heartbeatData.toString().c_str());
+            }
+            catch (...)
+            {
+                RMQ_ERROR("send heart beat to broker exception");
+            }
+        }
+    }
+
+    RMQ_DEBUG("sendHeartbeatToAllBroker end");
+}
+
+/**
+* Fills a heartbeat with this client's id and one entry per registered consumer and
+* producer (NULL table entries are skipped).
+*
+* @param heartbeatData  heartbeat object to populate; entries are added to its
+*                       consumer and producer data sets
+*/
+void MQClientFactory::prepareHeartbeatData(HeartbeatData& heartbeatData)
+{
+    heartbeatData.setClientID(m_clientId);
+
+    // One ConsumerData per registered consumer.
+    {
+        kpr::ScopedRLock<kpr::RWMutex> guard(m_consumerTableLock);
+        for (std::map<std::string, MQConsumerInner*>::iterator consumerIt = m_consumerTable.begin();
+             consumerIt != m_consumerTable.end();
+             ++consumerIt)
+        {
+            MQConsumerInner* consumer = consumerIt->second;
+            if (!consumer)
+            {
+                continue;
+            }
+
+            ConsumerData entry;
+            entry.groupName = consumer->groupName();
+            entry.consumeType = consumer->consumeType();
+            entry.messageModel = consumer->messageModel();
+            entry.consumeFromWhere = consumer->consumeFromWhere();
+            entry.subscriptionDataSet = consumer->subscriptions();
+
+            heartbeatData.getConsumerDataSet().insert(entry);
+        }
+    }
+
+    // One ProducerData per registered producer; only the group name is reported.
+    {
+        kpr::ScopedRLock<kpr::RWMutex> guard(m_producerTableLock);
+        for (std::map<std::string, MQProducerInner*>::iterator producerIt = m_producerTable.begin();
+             producerIt != m_producerTable.end();
+             ++producerIt)
+        {
+            if (!producerIt->second)
+            {
+                continue;
+            }
+
+            ProducerData entry;
+            entry.groupName = producerIt->first;
+
+            heartbeatData.getProducerDataSet().insert(entry);
+        }
+    }
+}
+
+/**
+* Placeholder called by start(); currently does nothing.
+*
+* @param instanceName  instance name from the client config (unused for now)
+*/
+void MQClientFactory::makesureInstanceNameIsOnly(const std::string& instanceName)
+{
+    //TODO: not implemented - presumably meant to verify the instance name is unique.
+}
+
+
+/**
+* Timer task: asks the API impl to fetch the name server address.
+* Any exception is logged and swallowed so it never reaches the timer.
+*/
+void MQClientFactory::fetchNameServerAddr()
+{
+    // Registered in startScheduledTask() with RegisterTimer(1000 * 10, 1000 * 60 * 2, ...).
+    try
+    {
+        RMQ_DEBUG("Task: fetchNameServerAddr");
+        m_pMQClientAPIImpl->fetchNameServerAddr();
+    }
+    catch (...)
+    {
+    	RMQ_ERROR("Task: fetchNameServerAddr exception");
+    }
+}
+
+/**
+* Timer task: refreshes the route info of every topic used through this factory.
+* Any exception is logged and swallowed so it never reaches the timer.
+*/
+void MQClientFactory::updateTopicRouteInfoFromNameServerTask()
+{
+    // Registered in startScheduledTask() with
+    // RegisterTimer(10, m_clientConfig.getPollNameServerInterval(), ...).
+    try
+    {
+        RMQ_DEBUG("Task: updateTopicRouteInfoFromNameServerTask");
+        updateTopicRouteInfoFromNameServer();
+    }
+    catch (...)
+    {
+        // The message used to name fetchNameServerAddr (copy-paste slip), which made
+        // failures of this task look like failures of another one.
+        RMQ_ERROR("Task: updateTopicRouteInfoFromNameServerTask exception");
+    }
+}
+
+/**
+* Timer task: drops offline brokers, then sends a heartbeat round.
+* Any exception is logged and swallowed so it never reaches the timer.
+*/
+void MQClientFactory::cleanBroker()
+{
+    // Registered in startScheduledTask() with
+    // RegisterTimer(1000, m_clientConfig.getHeartbeatBrokerInterval(), ...).
+    try
+    {
+        RMQ_DEBUG("Task: cleanBroker");
+        cleanOfflineBroker();
+        sendHeartbeatToAllBrokerWithLock();
+    }
+    catch (...)
+    {
+		RMQ_ERROR("Task: cleanBroker exception");
+    }
+}
+
+/**
+* Timer task: calls persistAllConsumerOffset().
+* Any exception is logged and swallowed so it never reaches the timer.
+*/
+void MQClientFactory::persistAllConsumerOffsetTask()
+{
+    // Registered in startScheduledTask() with
+    // RegisterTimer(1000 * 10, m_clientConfig.getPersistConsumerOffsetInterval(), ...).
+    try
+    {
+        RMQ_DEBUG("Task: persistAllConsumerOffsetTask");
+        persistAllConsumerOffset();
+    }
+    catch (...)
+    {
+		RMQ_ERROR("Task: persistAllConsumerOffsetTask exception");
+    }
+}
+
+/**
+* Timer task: calls recordSnapshotPeriodically().
+* Any exception is logged and swallowed so it never reaches the timer.
+*/
+void MQClientFactory::recordSnapshotPeriodicallyTask()
+{
+    // Registered in startScheduledTask() with RegisterTimer(1000 * 10, 1000, ...).
+    try
+    {
+        // Debug log left disabled in the original code.
+        //RMQ_DEBUG("Task: recordSnapshotPeriodicallyTask");
+        recordSnapshotPeriodically();
+    }
+    catch (...)
+    {
+		RMQ_ERROR("Task: recordSnapshotPeriodically exception");
+    }
+}
+
+/**
+* Timer task: calls logStatsPeriodically().
+* Any exception is logged and swallowed so it never reaches the timer.
+*/
+void MQClientFactory::logStatsPeriodicallyTask()
+{
+    // Registered in startScheduledTask() with RegisterTimer(1000 * 10, 1000 * 60, ...).
+    try
+    {
+        RMQ_DEBUG("Task: logStatsPeriodicallyTask");
+        logStatsPeriodically();
+    }
+    catch (...)
+    {
+		RMQ_ERROR("Task: logStatsPeriodicallyTask exception");
+    }
+}
+
+// Registers the six periodic tasks with m_timerTaskManager. The value returned
+// by each RegisterTimer call is stored in m_scheduledTaskIds, one slot per
+// task, in registration order.
+void MQClientFactory::startScheduledTask()
+{
+    kpr::TimerTaskManager& timers = m_timerTaskManager;
+    int* const taskIds = m_scheduledTaskIds;
+
+    // Slot 0: name server address fetch.
+    taskIds[0] = timers.RegisterTimer(1000 * 10, 1000 * 60 * 2,
+                                      m_pFetchNameServerAddrTask);
+
+    // Slot 1: topic route refresh.
+    taskIds[1] = timers.RegisterTimer(10, m_clientConfig.getPollNameServerInterval(),
+                                      m_pUpdateTopicRouteInfoFromNameServerTask);
+
+    // Slot 2: offline broker cleanup and heartbeat.
+    taskIds[2] = timers.RegisterTimer(1000, m_clientConfig.getHeartbeatBrokerInterval(),
+                                      m_pCleanBrokerTask);
+
+    // Slot 3: consumer offset persistence.
+    taskIds[3] = timers.RegisterTimer(1000 * 10, m_clientConfig.getPersistConsumerOffsetInterval(),
+                                      m_pPersistAllConsumerOffsetTask);
+
+    // Slots 4 and 5: statistics snapshot and statistics logging.
+    taskIds[4] = timers.RegisterTimer(1000 * 10, 1000, m_pRecordSnapshotPeriodicallyTask);
+    taskIds[5] = timers.RegisterTimer(1000 * 10, 1000 * 60, m_pLogStatsPeriodicallyTask);
+}
+
+/**
+ * Rebuilds m_brokerAddrTable so that it only keeps broker addresses for which
+ * isBrokerAddrExistInTopicRouteTable() returns true. Broker names that end up
+ * with no address are dropped from the table.
+ *
+ * The whole operation runs under m_lockNamesrv, acquired with
+ * TryLock(LockTimeoutMillis). If the lock cannot be obtained, the call only
+ * logs and returns. The mutex is released on every path: if an exception
+ * escapes the rebuild, the mutex is unlocked first and the exception is then
+ * rethrown to the caller.
+ */
+void MQClientFactory::cleanOfflineBroker()
+{
+    typedef std::map<int, std::string> AddrMap;
+    typedef std::map<std::string, AddrMap> BrokerTable;
+
+    RMQ_DEBUG("TryLock m_lockNamesrv: 0x%p", &m_lockNamesrv);
+    if (!m_lockNamesrv.TryLock(MQClientFactory::LockTimeoutMillis))
+    {
+        RMQ_DEBUG("TryLock m_lockNamesrv fail");
+        return;
+    }
+
+    RMQ_DEBUG("TryLock m_lockNamesrv ok");
+    try
+    {
+        BrokerTable updatedTable;
+        {
+            // Build the filtered copy while holding only the read lock.
+            kpr::ScopedRLock<kpr::RWMutex> lock(m_brokerAddrTableLock);
+            for (BrokerTable::iterator broker = m_brokerAddrTable.begin();
+                 broker != m_brokerAddrTable.end(); ++broker)
+            {
+                AddrMap keptAddrs;
+                for (AddrMap::iterator entry = broker->second.begin();
+                     entry != broker->second.end(); ++entry)
+                {
+                    if (isBrokerAddrExistInTopicRouteTable(entry->second))
+                    {
+                        keptAddrs.insert(*entry);
+                    }
+                }
+
+                if (!keptAddrs.empty())
+                {
+                    updatedTable[broker->first] = keptAddrs;
+                }
+            }
+        }
+
+        {
+            // Publish the result; swapping keeps the write-locked section short.
+            kpr::ScopedWLock<kpr::RWMutex> lock(m_brokerAddrTableLock);
+            m_brokerAddrTable.swap(updatedTable);
+        }
+    }
+    catch (...)
+    {
+        // Never leave m_lockNamesrv held: later TryLock calls would keep failing.
+        m_lockNamesrv.Unlock();
+        throw;
+    }
+
+    m_lockNamesrv.Unlock();
+    RMQ_DEBUG("UnLock m_lockNamesrv ok");
+}
+
+// Reports whether addr occurs in any broker address stored in
+// m_topicRouteTable. The scan holds the read lock on m_topicRouteTableLock and
+// stops at the first hit.
+// NOTE(review): the comparison is a substring search (std::string::find), not
+// an equality test -- confirm that partial matches are intended.
+bool MQClientFactory::isBrokerAddrExistInTopicRouteTable(const std::string& addr)
+{
+    typedef std::map<std::string, TopicRouteData> RouteTable;
+    typedef std::list<BrokerData> BrokerList;
+    typedef std::map<int, std::string> AddrMap;
+
+    kpr::ScopedRLock<kpr::RWMutex> guard(m_topicRouteTableLock);
+
+    for (RouteTable::iterator route = m_topicRouteTable.begin();
+         route != m_topicRouteTable.end(); ++route)
+    {
+        const BrokerList& brokerList = route->second.getBrokerDatas();
+
+        for (BrokerList::const_iterator broker = brokerList.begin();
+             broker != brokerList.end(); ++broker)
+        {
+            AddrMap::const_iterator entry = broker->brokerAddrs.begin();
+            while (entry != broker->brokerAddrs.end())
+            {
+                const bool found = (entry->second.find(addr) != std::string::npos);
+                if (found)
+                {
+                    return true;
+                }
+                ++entry;
+            }
+        }
+    }
+
+    return false;
+}
+
+// Asks the stat manager of every registered DefaultMQPushConsumerImpl to record
+// a snapshot. m_consumerTable is read under its read lock; entries of any
+// other consumer type are skipped.
+void MQClientFactory::recordSnapshotPeriodically()
+{
+    typedef std::map<std::string, MQConsumerInner*> ConsumerTable;
+
+    kpr::ScopedRLock<kpr::RWMutex> guard(m_consumerTableLock);
+    for (ConsumerTable::iterator entry = m_consumerTable.begin();
+         entry != m_consumerTable.end(); ++entry)
+    {
+        // dynamic_cast yields a null pointer both for a null entry and for a
+        // consumer of another type, so one test covers both cases.
+        DefaultMQPushConsumerImpl* pushConsumer =
+            dynamic_cast<DefaultMQPushConsumerImpl*>(entry->second);
+        if (!pushConsumer)
+        {
+            continue;
+        }
+
+        pushConsumer->getConsumerStatManager()->recordSnapshotPeriodically();
+    }
+}
+
+// Asks the stat manager of every registered DefaultMQPushConsumerImpl to log
+// its statistics, passing the consumer group (the table key) and m_clientId.
+// m_consumerTable is read under its read lock; entries of any other consumer
+// type are skipped.
+void MQClientFactory::logStatsPeriodically()
+{
+    typedef std::map<std::string, MQConsumerInner*> ConsumerTable;
+
+    kpr::ScopedRLock<kpr::RWMutex> guard(m_consumerTableLock);
+    for (ConsumerTable::iterator entry = m_consumerTable.begin();
+         entry != m_consumerTable.end(); ++entry)
+    {
+        // A null entry and a consumer of another type both cast to null.
+        DefaultMQPushConsumerImpl* pushConsumer =
+            dynamic_cast<DefaultMQPushConsumerImpl*>(entry->second);
+        if (!pushConsumer)
+        {
+            continue;
+        }
+
+        std::string group = entry->first;
+        pushConsumer->getConsumerStatManager()->logStatsPeriodically(group, m_clientId);
+    }
+}
+
+// Calls persistConsumerOffset() on every non-null consumer in m_consumerTable
+// while holding the table's read lock.
+void MQClientFactory::persistAllConsumerOffset()
+{
+    kpr::ScopedRLock<kpr::RWMutex> guard(m_consumerTableLock);
+    RMQ_DEBUG("persistAllConsumerOffset, m_consumerTable.size=%u", (unsigned)m_consumerTable.size());
+
+    std::map<std::string, MQConsumerInner*>::iterator cursor = m_consumerTable.begin();
+    while (cursor != m_consumerTable.end())
+    {
+        MQConsumerInner* consumer = cursor->second;
+        if (consumer)
+        {
+            consumer->persistConsumerOffset();
+        }
+        ++cursor;
+    }
+}
+
+// Returns true when the two route descriptions differ once their queue lists
+// and broker lists have been sorted. The comparison works on copies, so
+// neither argument is modified.
+bool MQClientFactory::topicRouteDataIsChange(TopicRouteData& olddata, TopicRouteData& nowdata)
+{
+    TopicRouteData previous = olddata;
+    TopicRouteData current = nowdata;
+
+    // Sort both copies so that element order cannot cause a false difference.
+    previous.getQueueDatas().sort();
+    previous.getBrokerDatas().sort();
+
+    current.getQueueDatas().sort();
+    current.getBrokerDatas().sort();
+
+    if (previous == current)
+    {
+        return false;
+    }
+    return true;
+}
+
+/**
+ * Tells whether any registered producer or consumer reports that the route
+ * information of a topic needs to be refreshed.
+ *
+ * Producers are asked first (isPublishTopicNeedUpdate), then consumers
+ * (isSubscribeTopicNeedUpdate). Each table is read under its own read lock.
+ * The search stops at the first client that answers true, so a later client
+ * answering false can no longer overwrite an earlier positive answer.
+ *
+ * @param topic  topic name handed to every client.
+ * @return true if at least one non-null producer or consumer needs an update,
+ *         false otherwise (including when both tables are empty).
+ */
+bool MQClientFactory::isNeedUpdateTopicRouteInfo(const std::string& topic)
+{
+    {
+        kpr::ScopedRLock<kpr::RWMutex> lock(m_producerTableLock);
+        std::map<std::string, MQProducerInner*>::iterator it = m_producerTable.begin();
+        for (; it != m_producerTable.end(); ++it)
+        {
+            MQProducerInner* inner = it->second;
+            if (inner && inner->isPublishTopicNeedUpdate(topic))
+            {
+                return true;
+            }
+        }
+    }
+
+    {
+        kpr::ScopedRLock<kpr::RWMutex> lock(m_consumerTableLock);
+        std::map<std::string, MQConsumerInner*>::iterator it = m_consumerTable.begin();
+        for (; it != m_consumerTable.end(); ++it)
+        {
+            MQConsumerInner* inner = it->second;
+            if (inner && inner->isSubscribeTopicNeedUpdate(topic))
+            {
+                return true;
+            }
+        }
+    }
+
+    return false;
+}
+
+// Runs unregisterClient() while holding m_lockHeartbeat. The lock is taken with
+// TryLock(LockTimeoutMillis); if that fails, only a warning is logged. Any
+// exception from unregisterClient() is logged and swallowed, and the lock is
+// released afterwards in either case.
+void MQClientFactory::unregisterClientWithLock(const std::string& producerGroup, const std::string& consumerGroup)
+{
+    RMQ_DEBUG("TryLock m_lockHeartbeat: 0x%p", &m_lockHeartbeat);
+
+    if (!m_lockHeartbeat.TryLock(MQClientFactory::LockTimeoutMillis))
+    {
+        RMQ_WARN("TryLock m_lockHeartbeat fail");
+        return;
+    }
+
+    try
+    {
+        RMQ_DEBUG("TryLock m_lockHeartbeat ok");
+        unregisterClient(producerGroup, consumerGroup);
+    }
+    catch (...)
+    {
+        RMQ_ERROR("unregisterClientWithLock exception, %s %s",
+                  producerGroup.c_str(), consumerGroup.c_str());
+    }
+
+    m_lockHeartbeat.Unlock();
+    RMQ_DEBUG("Unlock m_lockHeartbeat ok");
+}
+
+// Sends an unregister request for this client (m_clientId) and the given
+// groups to every non-empty address in m_brokerAddrTable, holding the table's
+// read lock for the whole walk. A failure on one broker is logged and does not
+// stop the remaining requests. The literal 3000 is passed through as the last
+// argument of MQClientAPIImpl::unregisterClient.
+void MQClientFactory::unregisterClient(const std::string& producerGroup, const std::string& consumerGroup)
+{
+    typedef std::map<int, std::string> AddrMap;
+    typedef std::map<std::string, AddrMap> BrokerTable;
+
+    kpr::ScopedRLock<kpr::RWMutex> guard(m_brokerAddrTableLock);
+
+    for (BrokerTable::iterator broker = m_brokerAddrTable.begin();
+         broker != m_brokerAddrTable.end(); ++broker)
+    {
+        AddrMap& addrs = broker->second;
+        for (AddrMap::iterator entry = addrs.begin(); entry != addrs.end(); ++entry)
+        {
+            std::string& brokerAddr = entry->second;
+            if (brokerAddr.empty())
+            {
+                continue;
+            }
+
+            try
+            {
+                m_pMQClientAPIImpl->unregisterClient(brokerAddr, m_clientId, producerGroup,
+                                                     consumerGroup, 3000);
+            }
+            catch (...)
+            {
+                RMQ_ERROR("unregister client exception from broker: %s", brokerAddr.c_str());
+            }
+        }
+    }
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/MQClientFactory.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/MQClientFactory.h b/rocketmq-client4cpp/src/MQClientFactory.h
new file mode 100755
index 0000000..8f56a27
--- /dev/null
+++ b/rocketmq-client4cpp/src/MQClientFactory.h
@@ -0,0 +1,214 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __MQCLIENTFACTORY_H__
+#define __MQCLIENTFACTORY_H__
+
+#include <set>
+#include <string>
+#include <list>
+
+#include "SocketUtil.h"
+#include "TopicRouteData.h"
+#include "FindBrokerResult.h"
+#include "ClientConfig.h"
+#include "Mutex.h"
+#include "ServiceState.h"
+#include "TimerTaskManager.h"
+
+namespace rmq
+{
+    class ClientConfig;
+    class MessageQueue;
+    class MQAdminExtInner;
+    class MQClientAPIImpl;
+    class MQAdminImpl;
+    class PullMessageService;
+    class HeartbeatData;
+    class RemoteClientConfig;
+    class ClientRemotingProcessor;
+    class RebalanceService;
+    class DefaultMQProducer;
+    class TopicPublishInfo;
+    class MQProducerInner;
+    class MQConsumerInner;
+    class DefaultMQProducerImpl;
+
+    // Per-client state shared by producers, consumers and admin tools: it owns
+    // the producer/consumer/admin tables, the topic-route and broker-address
+    // tables (each guarded by its own RWMutex) and the periodic tasks that are
+    // registered with m_timerTaskManager.
+    class MQClientFactory
+    {
+    public:
+        MQClientFactory(ClientConfig& clientConfig, int factoryIndex, const std::string& clientId);
+        ~MQClientFactory();
+
+        void start();
+        void shutdown();
+        void sendHeartbeatToAllBrokerWithLock();
+        void updateTopicRouteInfoFromNameServer();
+        bool updateTopicRouteInfoFromNameServer(const std::string& topic);
+
+        bool updateTopicRouteInfoFromNameServer(const std::string& topic, bool isDefault,
+                                                DefaultMQProducer* pDefaultMQProducer);
+
+        // Conversions from route data to publish / subscribe views.
+        // NOTE(review): both return raw pointers; ownership is not visible from
+        // this header -- confirm who deletes the result.
+        static TopicPublishInfo* topicRouteData2TopicPublishInfo(const std::string& topic,
+                TopicRouteData& route);
+
+        static std::set<MessageQueue>* topicRouteData2TopicSubscribeInfo(const std::string& topic,
+                TopicRouteData& route);
+
+        // Registration of clients, keyed by group name.
+        bool registerConsumer(const std::string& group, MQConsumerInner* pConsumer);
+        void unregisterConsumer(const std::string& group);
+
+        bool registerProducer(const std::string& group, DefaultMQProducerImpl* pProducer);
+        void unregisterProducer(const std::string& group);
+
+        bool registerAdminExt(const std::string& group, MQAdminExtInner* pAdmin);
+        void unregisterAdminExt(const std::string& group);
+
+        void rebalanceImmediately();
+        void doRebalance();
+
+        MQProducerInner* selectProducer(const std::string& group);
+        MQConsumerInner* selectConsumer(const std::string& group);
+
+        // Broker address lookups.
+        FindBrokerResult findBrokerAddressInAdmin(const std::string& brokerName);
+        std::string findBrokerAddressInPublish(const std::string& brokerName);
+        FindBrokerResult findBrokerAddressInSubscribe(//
+            const std::string& brokerName,//
+            long brokerId,//
+            bool onlyThisBroker);
+
+        std::list<std::string> findConsumerIdList(const std::string& topic, const std::string& group);
+        std::string findBrokerAddrByTopic(const std::string& topic);
+        TopicRouteData getAnExistTopicRouteData(const std::string& topic);
+        MQClientAPIImpl* getMQClientAPIImpl();
+        MQAdminImpl* getMQAdminImpl();
+        std::string getClientId();
+        long long getBootTimestamp();
+        PullMessageService* getPullMessageService();
+        DefaultMQProducer* getDefaultMQProducer();
+
+    private:
+        void sendHeartbeatToAllBroker();
+        //HeartbeatData* prepareHeartbeatData();
+        void prepareHeartbeatData(HeartbeatData& heartbeatData);
+
+        void makesureInstanceNameIsOnly(const std::string& instanceName);
+        void startScheduledTask();
+
+
+        void cleanOfflineBroker();
+        bool isBrokerAddrExistInTopicRouteTable(const std::string& addr);
+        void recordSnapshotPeriodically();
+        void logStatsPeriodically();
+        void persistAllConsumerOffset();
+        bool topicRouteDataIsChange(TopicRouteData& olddata, TopicRouteData& nowdata);
+        bool isNeedUpdateTopicRouteInfo(const std::string& topic);
+        void unregisterClientWithLock(const std::string& producerGroup, const std::string& consumerGroup);
+        void unregisterClient(const std::string& producerGroup, const std::string& consumerGroup);
+
+        // Pointer to a parameterless member function used as a task body.
+        typedef void (MQClientFactory::*pScheduledFunc)();
+
+        // Adapts a MQClientFactory member function to kpr::TimerTask: DoTask()
+        // invokes the stored member-function pointer on the stored factory.
+        // The factory pointer is not owned by the task.
+        class ScheduledTask : public kpr::TimerTask
+        {
+        public:
+            ScheduledTask(MQClientFactory* pMQClientFactory, pScheduledFunc pScheduled)
+                : m_pMQClientFactory(pMQClientFactory), m_pScheduled(pScheduled)
+            {
+            }
+
+            virtual void DoTask()
+            {
+                (m_pMQClientFactory->*m_pScheduled)();
+            }
+
+        private:
+            MQClientFactory* m_pMQClientFactory;
+            pScheduledFunc m_pScheduled;
+        };
+		typedef kpr::RefHandleT<ScheduledTask> ScheduledTaskPtr;
+
+        // schedule task
+        void fetchNameServerAddr();
+        void updateTopicRouteInfoFromNameServerTask();
+        void cleanBroker();
+        void persistAllConsumerOffsetTask();
+        void recordSnapshotPeriodicallyTask();
+        void logStatsPeriodicallyTask();
+
+    private:
+        // Timeout passed to Mutex::TryLock for m_lockNamesrv / m_lockHeartbeat.
+        static long LockTimeoutMillis;
+        // Stored by value, i.e. a copy of the configuration.
+        ClientConfig m_clientConfig;
+        int m_factoryIndex;
+        std::string m_clientId;
+        long long m_bootTimestamp;
+
+        // Producer
+        //group --> MQProducerInner
+        std::map<std::string, MQProducerInner*> m_producerTable;
+        kpr::RWMutex m_producerTableLock;
+
+        // Consumer
+        //group --> MQConsumerInner
+        std::map<std::string, MQConsumerInner*> m_consumerTable;
+        kpr::RWMutex m_consumerTableLock;
+
+        // AdminExt
+        // group --> MQAdminExtInner
+        std::map<std::string, MQAdminExtInner*> m_adminExtTable;
+        kpr::RWMutex m_adminExtTableLock;
+
+        RemoteClientConfig* m_pRemoteClientConfig;
+
+        MQClientAPIImpl* m_pMQClientAPIImpl;
+        MQAdminImpl* m_pMQAdminImpl;
+
+        /// Topic---> TopicRouteData
+        std::map<std::string, TopicRouteData> m_topicRouteTable;
+        kpr::RWMutex m_topicRouteTableLock;
+
+        kpr::Mutex m_mutex;
+        // Held (via TryLock) while the broker address table is cleaned.
+        kpr::Mutex m_lockNamesrv;
+
+        // Held (via TryLock) while clients are unregistered from brokers.
+        kpr::Mutex m_lockHeartbeat;
+
+        //-----brokerName
+        //     ------brokerid  addr
+        //     ------brokerid  addr
+        std::map<std::string, std::map<int, std::string> > m_brokerAddrTable;
+        kpr::RWMutex m_brokerAddrTableLock;
+
+        // Timer thread: runs the scheduled tasks below.
+        kpr::TimerTaskManager m_timerTaskManager;
+        ScheduledTaskPtr m_pFetchNameServerAddrTask;
+        ScheduledTaskPtr m_pUpdateTopicRouteInfoFromNameServerTask;
+        ScheduledTaskPtr m_pCleanBrokerTask;
+        ScheduledTaskPtr m_pPersistAllConsumerOffsetTask;
+        ScheduledTaskPtr m_pRecordSnapshotPeriodicallyTask;
+        ScheduledTaskPtr m_pLogStatsPeriodicallyTask;
+
+        // Values returned by RegisterTimer, one per task above.
+        int m_scheduledTaskIds[6];
+
+        ClientRemotingProcessor* m_pClientRemotingProcessor;
+        PullMessageService* m_pPullMessageService;
+        RebalanceService* m_pRebalanceService;
+        DefaultMQProducer* m_pDefaultMQProducer;
+        ServiceState m_serviceState;
+
+        //SOCKET m_datagramSocket;
+    };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/MQClientManager.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/MQClientManager.cpp b/rocketmq-client4cpp/src/MQClientManager.cpp
new file mode 100755
index 0000000..b3041fc
--- /dev/null
+++ b/rocketmq-client4cpp/src/MQClientManager.cpp
@@ -0,0 +1,75 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "MQClientManager.h"
+#include "ScopedLock.h"
+#include "MQClientFactory.h"
+#include "ClientConfig.h"
+
+namespace rmq
+{
+
+
+// The single MQClientManager object, created during static initialisation and
+// handed out by getInstance(). Nothing in this file deletes it.
+MQClientManager* MQClientManager::s_instance = new MQClientManager();
+
+// Constructor with an empty body: all members are default-constructed.
+MQClientManager::MQClientManager()
+{
+
+}
+
+// Destructor with an empty body: the MQClientFactory pointers stored in
+// m_factoryTable are not deleted here.
+MQClientManager::~MQClientManager()
+{
+
+}
+
+// Returns the statically created manager (s_instance).
+MQClientManager* MQClientManager::getInstance()
+{
+    return s_instance;
+}
+
+// Returns the factory registered under the client id built from clientConfig,
+// creating and registering a new one when none exists yet. Lookup and creation
+// both happen under m_mutex. A new factory receives the next value of
+// m_factoryIndexGenerator as its index.
+MQClientFactory* MQClientManager::getAndCreateMQClientFactory(ClientConfig& clientConfig)
+{
+    const std::string clientId = clientConfig.buildMQClientId();
+
+    kpr::ScopedLock<kpr::Mutex> guard(m_mutex);
+
+    std::map<std::string, MQClientFactory*>::iterator existing = m_factoryTable.find(clientId);
+    if (existing != m_factoryTable.end())
+    {
+        return existing->second;
+    }
+
+    MQClientFactory* created = new MQClientFactory(clientConfig, m_factoryIndexGenerator++, clientId);
+    m_factoryTable[clientId] = created;
+    return created;
+}
+
+// Removes the table entry for clientId, if there is one, under m_mutex.
+// Only the map entry is dropped; the MQClientFactory object itself is not
+// deleted here.
+void MQClientManager::removeClientFactory(const std::string&  clientId)
+{
+    kpr::ScopedLock<kpr::Mutex> guard(m_mutex);
+
+    // Erasing by key is a no-op when the id is not registered.
+    m_factoryTable.erase(clientId);
+}
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/MQClientManager.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/MQClientManager.h b/rocketmq-client4cpp/src/MQClientManager.h
new file mode 100755
index 0000000..742f8bb
--- /dev/null
+++ b/rocketmq-client4cpp/src/MQClientManager.h
@@ -0,0 +1,49 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __MQCLIENTMANAGER_H__
+#define __MQCLIENTMANAGER_H__
+
+#include <string>
+#include <map>
+#include "Mutex.h"
+#include "AtomicValue.h"
+
+namespace rmq
+{
+	class MQClientFactory;
+	class ClientConfig;
+
+	// Registry of MQClientFactory objects keyed by client id. The constructor
+	// is private; the one instance is reached through getInstance().
+	class MQClientManager
+	{
+	public:
+	    ~MQClientManager();
+	    // Returns the single manager instance.
+	    static MQClientManager* getInstance();
+	    // Returns the factory for the client id derived from clientConfig,
+	    // creating it on first use.
+	    MQClientFactory* getAndCreateMQClientFactory(ClientConfig& clientConfig);
+	    // Drops the table entry for clientId; the factory is not deleted.
+	    void removeClientFactory(const std::string&  clientId);
+
+	private:
+	    MQClientManager();
+
+	private:
+	    static MQClientManager* s_instance;
+	    // Source of the index given to each newly created factory.
+	    kpr::AtomicInteger m_factoryIndexGenerator;
+	    // client id --> MQClientFactory
+	    std::map<std::string, MQClientFactory*> m_factoryTable;
+	    // Guards m_factoryTable.
+	    kpr::Mutex m_mutex;
+	};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/Makefile
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/Makefile b/rocketmq-client4cpp/src/Makefile
new file mode 100644
index 0000000..5d5f43b
--- /dev/null
+++ b/rocketmq-client4cpp/src/Makefile
@@ -0,0 +1,26 @@
+#-----------------------------------------------------------------------
+# Build settings for the static library librocketmq.a.
+# The generic compile/link rules come from Makefile.std, included at the
+# bottom; this file only provides the project-specific variables.
+TARGET          := librocketmq.a
+CONFIG          :=
+STRIP_FLAG      := N
+# Requested word size (32 or 64), taken from BIT; Makefile.std picks a
+# default from `uname -m` when it is neither.
+MFLAGS          := ${BIT}
+CFLAGS          += -g -fPIC -Wno-deprecated -fno-strict-aliasing -fno-omit-frame-pointer
+CFLAGS_32       += -march=i686
+INCLUDE         += -I../include -I./common -I./kpr -I./protocol -I./message -I./transport -I./producer -I./consumer \
+				   -I./jsoncpp
+LIB             += -lz -lrt -lpthread
+# Extra libraries per word size. The names must be LIB_32 / LIB_64 because
+# those are the variables Makefile.std appends to LIB.
+LIB_32          +=
+LIB_64          +=
+#-----------------------------------------------------------------------
+
+# Sources are collected from each sub-directory; Makefile.std adds the
+# files of the current directory itself.
+LOCAL_SRC += $(wildcard jsoncpp/*.cpp)
+LOCAL_SRC += $(wildcard kpr/*.cpp)
+LOCAL_SRC += $(wildcard common/*.cpp)
+LOCAL_SRC += $(wildcard protocol/*.cpp)
+LOCAL_SRC += $(wildcard message/*.cpp)
+LOCAL_SRC += $(wildcard transport/*.cpp)
+LOCAL_SRC += $(wildcard producer/*.cpp)
+LOCAL_SRC += $(wildcard consumer/*.cpp)
+
+include ./Makefile.std
+#-----------------------------------------------------------------------
+
+

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/Makefile.std
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/Makefile.std b/rocketmq-client4cpp/src/Makefile.std
new file mode 100755
index 0000000..b23f260
--- /dev/null
+++ b/rocketmq-client4cpp/src/Makefile.std
@@ -0,0 +1,127 @@
+#-------------------------------------------------------------------------------
+# Toolchain and common flags. -MMD makes the compiler emit a .d dependency
+# file next to every object; those files are included further down.
+CC          = gcc
+CXX         = g++
+CFLAGS      += -g -fPIC -Wno-deprecated -Wall -pipe -fno-ident -fno-strict-aliasing -MMD -D_GNU_SOURCE -D_REENTRANT
+
+#-------------------------------------------------------------------------------
+INCLUDE     += -I./
+LIB_32      += -L./
+LIB_64      += -L./
+
+# Sources of the current directory are added to whatever the including
+# Makefile already put into LOCAL_SRC; objects and .d files are derived
+# from that list.
+LOCAL_SRC   += $(sort $(wildcard *.cpp *.cc *.c))
+LOCAL_OBJ += $(patsubst %.cpp,%.o, $(patsubst %.cc,%.o, $(patsubst %.c,%.o, $(LOCAL_SRC))))
+DEP_FILE    := $(foreach obj, $(LOCAL_OBJ), $(dir $(obj))$(basename $(notdir $(obj))).d)
+
+#-----------------------------------------------------------------------------
+# Word-size selection: when MFLAGS is neither 64 nor 32, default to 64 on
+# x86_64 and to 32 everywhere else.
+PLATFORM := $(strip $(shell echo `uname -m`))
+ifneq ($(MFLAGS),64)
+	  ifneq ($(MFLAGS),32)
+			ifeq ($(PLATFORM),x86_64)
+		MFLAGS := 64
+			else
+				MFLAGS := 32
+			endif
+	  endif
+endif
+
+# A 64-bit request on a platform that is not x86_64 falls back to 32.
+ifeq ($(MFLAGS),64)
+	 ifneq ($(PLATFORM),x86_64)
+		   MFLAGS := 32
+	 endif
+endif
+
+ifeq ($(MFLAGS),32)
+	CFLAGS += -D_SYS_EPOLL_ $(CFLAGS_32)
+else
+	CFLAGS += $(CFLAGS_64)
+endif
+
+#-----------------------------------------------------------------------------
+# On x86_64 the object and dependency files get a .32 / .64 infix so both
+# word sizes can be built in the same tree; other platforms keep plain .o.
+ifneq ($(PLATFORM),x86_64)
+	MFLAGS  := 32
+	LIB     := $(LIB) $(LIB_32)
+else
+	DEP_FILE_32     := $(foreach obj, $(DEP_FILE),$(patsubst %.d,%.32.d, $(obj)))
+	DEP_FILE_64     := $(foreach obj, $(DEP_FILE),$(patsubst %.d,%.64.d, $(obj)))
+
+	LOCAL_OBJ_32    := $(foreach obj, $(LOCAL_OBJ),$(patsubst %.o,%.32.o,$(obj)))
+	LOCAL_OBJ_64    := $(foreach obj, $(LOCAL_OBJ),$(patsubst %.o,%.64.o,$(obj)))
+
+	# Objects for the unit-test binary: everything except *Server objects,
+	# plus UnitTest.
+	LOCAL_MOCK_OBJ_32  += $(filter-out %Server.32.o, $(LOCAL_OBJ_32))  UnitTest.32.o
+	LOCAL_MOCK_OBJ_64  += $(filter-out %Server.64.o, $(LOCAL_OBJ_64))  UnitTest.64.o
+
+	CLEANFILE       := $(LOCAL_OBJ_32) $(LOCAL_OBJ_64)
+
+	ifeq ($(MFLAGS),64)
+		DEP_FILE    := $(DEP_FILE_64)
+		LOCAL_OBJ   := $(LOCAL_OBJ_64)
+		LIB         := $(LIB) $(LIB_64)
+		LOCAL_MOCK_OBJ   := $(LOCAL_MOCK_OBJ_64)
+	else
+		DEP_FILE    := $(DEP_FILE_32)
+		LOCAL_OBJ   := $(LOCAL_OBJ_32)
+		LIB         := $(LIB) $(LIB_32)
+		LOCAL_MOCK_OBJ   := $(LOCAL_MOCK_OBJ_32)
+	endif
+endif
+
+#-------------------------------------------------------------------------------
+# None of these targets produces a file of the same name, so declare them
+# phony: a stray file called e.g. "clean" must not stop the recipe from
+# running.
+.PHONY: all utest uclean clean cleanall
+
+all : $(LOCAL_OBJ) $(TARGET) $(TARGETS)
+
+# Static library.
+$(filter %.a,$(TARGET)) : $(LOCAL_OBJ)
+	ar r $@ $(LOCAL_OBJ)
+
+# Shared library.
+$(filter %.so,$(TARGET)) : $(LOCAL_OBJ)
+	$(CXX) -m$(MFLAGS) $(CFLAGS) -shared -o $@ $^ $(INCLUDE) $(LIB)
+
+# Executable built from all objects.
+$(filter-out %.so %.a,$(TARGET)) : $(LOCAL_OBJ)
+	$(CXX) -m$(MFLAGS) $(CFLAGS) -o $@ $^ $(INCLUDE) $(LIB)
+
+# One executable per entry of TARGETS, each linked from its own object.
+$(filter-out %.so %.a %.y,$(TARGETS)) : % : %.$(MFLAGS).o
+	$(CXX) -m$(MFLAGS) $(CFLAGS) -o $@ $^ $(INCLUDE) $(LIB)
+
+# Unit-test binary (output file is UnitTest).
+utest : $(LOCAL_MOCK_OBJ)
+	$(CXX) -m$(MFLAGS) $(CFLAGS) -o UnitTest $^ $(INCLUDE) $(LIB)
+
+#----------------------------------------------------------------------------------
+uclean:
+	rm -vf $(LOCAL_MOCK_OBJ) $(TARGET) $(TARGETS) ${CLEANFILE} *.d.tmp gmon.out UnitTest
+
+clean:
+	rm -vf $(LOCAL_OBJ) $(TARGET) $(TARGETS) ${CLEANFILE} *.d.tmp gmon.out UnitTest
+
+# Like clean, but also removes the generated dependency files.
+cleanall:
+	rm -vf $(LOCAL_OBJ) $(TARGET) $(TARGETS) $(DEP_FILE) ${CLEANFILE} *.o *.d.tmp *.d gmon.out UnitTest
+
+# Pull in the compiler-generated dependency files; the leading '-' keeps
+# make quiet when they do not exist yet.
+ifneq ($(DEP_FILE),)
+-include $(DEP_FILE)
+endif
+
+#-------------------------------------------------------------------------------
+# Pattern rules for 32-bit objects (*.32.o): always compiled with -m32.
+%.32.o: %.cpp
+	$(CXX) -m32 $(CFLAGS) $(INCLUDE) -o $@ -c $<
+
+%.32.o: %.cc
+	$(CXX) -m32 $(CFLAGS) $(INCLUDE) -o $@ -c $<
+
+%.32.o: %.c
+	$(CC) -m32 $(CFLAGS) $(INCLUDE) -o $@ -c $<
+#-------------------------------------------------------------------------------
+# Pattern rules for 64-bit objects (*.64.o): always compiled with -m64.
+%.64.o: %.cpp
+	$(CXX) -m64 $(CFLAGS) $(INCLUDE) -o $@ -c $<
+
+%.64.o: %.cc
+	$(CXX) -m64 $(CFLAGS) $(INCLUDE) -o $@ -c $<
+
+%.64.o: %.c
+	$(CC) -m64 $(CFLAGS) $(INCLUDE) -o $@ -c $<
+#----------------------------------------------------------------------------------
+# Pattern rules for plain objects (*.o): word size follows MFLAGS.
+%.o: %.cpp
+	$(CXX) -m$(MFLAGS) $(CFLAGS) $(INCLUDE) -o $@ -c $<
+
+%.o: %.cc
+	$(CXX) -m$(MFLAGS) $(CFLAGS) $(INCLUDE) -o $@ -c $<
+
+%.o: %.c
+	$(CC) -m$(MFLAGS) $(CFLAGS) $(INCLUDE) -o $@ -c $<
+#----------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/RocketMQClient.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/RocketMQClient.cpp b/rocketmq-client4cpp/src/RocketMQClient.cpp
new file mode 100755
index 0000000..ec377b3
--- /dev/null
+++ b/rocketmq-client4cpp/src/RocketMQClient.cpp
@@ -0,0 +1,186 @@
+#include "RocketMQClient.h"
+#include "AtomicValue.h"
+#include "FileUtil.h"
+
+// Definitions of RocketMQUtil's static logging state.
+// _logFd starts at -1 and is assigned by writeLog() when it opens a log file.
+volatile int RocketMQUtil::_logFd = -1;
+// Assigned by initLog() and setLogLevel().
+int RocketMQUtil::_logLevel = 0;
+// Assigned by initLog(); writeLog() returns immediately while it is empty.
+std::string RocketMQUtil::_logPath = "";
+
+// Returns the id of the calling process.
+// The value is read from getpid() on every call instead of being cached in
+// thread-local storage: a cached copy is inherited by a forked child, which
+// would then keep reporting its parent's pid.
+pid_t RocketMQUtil::getPid()
+{
+    return getpid();
+}
+
+// Returns the kernel thread id of the caller (gettid system call), cached per
+// thread. The cache is refreshed when it is still empty or when the process id
+// no longer matches the one it was filled under.
+pid_t RocketMQUtil::getTid()
+{
+    static __thread pid_t cachedPid = 0;
+    static __thread pid_t cachedTid = 0;
+
+    const bool cacheUsable = cachedPid && cachedTid && cachedPid == getpid();
+    if (!cacheUsable)
+    {
+        cachedPid = getpid();
+        cachedTid = syscall(__NR_gettid);
+    }
+
+    return cachedTid;
+}
+
+// Returns the number of day boundaries between two timestamps: each timestamp
+// is shifted by the timezone offset reported by ftime() (fetched once and then
+// reused), divided by 86400, and the two day numbers are subtracted
+// (second minus first).
+int RocketMQUtil::getDiffDays(time_t tmFirst, time_t tmSecond)
+{
+    static struct timeb s_timeInfo;
+    static bool s_timeInfoReady = false;
+
+    if (!s_timeInfoReady)
+    {
+        ftime(&s_timeInfo);
+        s_timeInfoReady = true;
+    }
+
+    // timeb::timezone is expressed in minutes; convert it to seconds.
+    const int offsetSeconds = s_timeInfo.timezone * 60;
+    const time_t firstDay = (tmFirst - offsetSeconds) / 86400;
+    const time_t secondDay = (tmSecond - offsetSeconds) / 86400;
+
+    return secondDay - firstDay;
+}
+
+
+// Formats a timestamp in local time using a strftime() pattern.
+// The output buffer holds 255 bytes and starts zero-filled; the result of
+// strftime() is not checked.
+std::string RocketMQUtil::tm2str(const time_t& t, const std::string& sFormat)
+{
+    struct tm localTime;
+    localtime_r(&t, &localTime);
+
+    char formatted[255] = "\0";
+    strftime(formatted, sizeof(formatted), sFormat.c_str(), &localTime);
+
+    std::string result(formatted);
+    return result;
+}
+
+// Formats the current time (time(NULL)) with the given strftime() pattern by
+// delegating to tm2str().
+std::string RocketMQUtil::now2str(const std::string& sFormat)
+{
+    time_t t = time(NULL);
+    // Passing c_str() makes tm2str receive a temporary std::string built from
+    // the C string.
+    return tm2str(t, sFormat.c_str());
+}
+
+// Formats the current time with the fixed pattern "%Y-%m-%d %H:%M:%S".
+std::string RocketMQUtil::now2str()
+{
+    return now2str("%Y-%m-%d %H:%M:%S");
+}
+
+// Returns the current gettimeofday() time in milliseconds: whole seconds
+// scaled by 1000 plus the microsecond part divided by 1000.
+int64_t RocketMQUtil::getNowMs()
+{
+    struct timeval now;
+    gettimeofday(&now, NULL);
+
+    const int64_t secondsPartMs = now.tv_sec * (int64_t)1000;
+    const int64_t microsPartMs = now.tv_usec / 1000;
+    return secondsPartMs + microsPartMs;
+}
+
+
+/**
+ * Builds a std::string from a printf-style format and its arguments.
+ *
+ * The text is produced in a fixed 8092-byte buffer, so longer results are
+ * truncated to what fits (vsnprintf always terminates the buffer).
+ *
+ * @param format  printf-style format string.
+ * @param ...     arguments consumed by the format.
+ * @return the formatted text, or an empty string when vsnprintf reports an
+ *         error (negative return value).
+ */
+std::string RocketMQUtil::str2fmt(const char* format, ...)
+{
+    char buffer[8092];
+    buffer[0] = buffer[sizeof(buffer) - 1] = '\0';
+
+    va_list args;
+    va_start(args, format);
+    const int written = ::vsnprintf(buffer, sizeof(buffer), format, args);
+    va_end(args);
+
+    if (written < 0)
+    {
+        // Formatting failed: the buffer content cannot be relied on.
+        return std::string();
+    }
+
+    return std::string(buffer);
+}
+
+
+// Configures logging for the given log path. An empty path leaves everything
+// untouched. Otherwise the log level comes from the ROCKETMQ_LOGLEVEL
+// environment variable (parsed with atoi) or defaults to WARN_LOG, the path is
+// stored, and the directory part of the path is created if it does not exist.
+// Always returns 0.
+int RocketMQUtil::initLog(const std::string& sLogPath)
+{
+    if (sLogPath.empty())
+    {
+        return 0;
+    }
+
+    int level = WARN_LOG;
+    const char* levelFromEnv = getenv("ROCKETMQ_LOGLEVEL");
+    if (levelFromEnv != NULL)
+    {
+        level = atoi(levelFromEnv);
+    }
+    _logLevel = level;
+    _logPath = sLogPath;
+
+    std::string logDir = kpr::FileUtil::extractFilePath(_logPath);
+    const bool dirExists = kpr::FileUtil::isFileExist(logDir, S_IFDIR);
+    if (!dirExists)
+    {
+        kpr::FileUtil::makeDirRecursive(logDir);
+    }
+
+    return 0;
+}
+
+// Stores logLevel in _logLevel without any validation.
+void RocketMQUtil::setLogLevel(int logLevel)
+{
+	_logLevel = logLevel;
+}
+
+
+// Formats one printf-style message and appends it to the current log file.
+// Does nothing while _logPath is empty. The log file is named
+// "<_logPath>.<YYYYMMDD>"; at most once every 5 seconds one caller checks
+// whether the file has to be (re)opened, either because none is open yet or
+// because the date string changed.
+void RocketMQUtil::writeLog(const char* fmt, ...)
+{
+    if (_logPath.empty())
+    {
+        return;
+    }
+
+    // Time of the last rotation check and the date string of the open file,
+    // shared by all callers.
+    static volatile time_t last_time = 0;
+	static std::string last_time_str = "";
+	time_t old = last_time;
+	time_t now = time(NULL);
+
+	if (now - last_time >= 5)
+	{
+		// Only the caller whose compare-and-swap succeeds performs the check.
+		if (__sync_bool_compare_and_swap(&last_time, old, now))
+		{
+			std::string time_str = tm2str(now, "%Y%m%d");
+	    	if (_logFd < 0 || time_str != last_time_str)
+	    	{
+	    		int oldFd = _logFd;
+				std::string logFullPath = _logPath + "." + time_str;
+	    		_logFd = open(logFullPath.c_str(), O_CREAT | O_RDWR | O_APPEND, 0666);
+	    		// NOTE(review): descriptor 0 is treated as "not open" here and
+	    		// below, although open() may legitimately return 0 -- confirm.
+	    		if (_logFd > 0)
+	    		{
+					last_time_str = time_str;
+				}
+
+				// Close the previous file only after the new descriptor has
+				// been published in _logFd.
+				if (oldFd > 0)
+	    		{
+	    			close(oldFd);
+	    		}
+	    	}
+		}
+	}
+
+    // 128 KiB formatting buffer on the stack.
+    char buf[1024*128];
+    buf[0] = buf[sizeof(buf) - 1] = '\0';
+
+    va_list ap;
+    va_start(ap, fmt);
+    int size = vsnprintf(buf, sizeof(buf), fmt, ap);
+    va_end(ap);
+
+	// Messages that are empty, failed to format or did not fit are dropped.
+	int logFd = _logFd;
+    if (logFd > 0 && (size > 0 && size < (int)sizeof(buf)))
+    {
+        int ret = write(logFd, buf, size);
+        if (ret < 0)
+        {
+        	// EBADF: the descriptor read above was closed by a concurrent
+        	// rotation; retry once with the current _logFd.
+        	if (errno == EBADF)
+        	{
+        		write(_logFd, buf, size);
+        	}
+        }
+    }
+
+    return;
+}
+
+
+

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/ConsumeStats.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/ConsumeStats.h b/rocketmq-client4cpp/src/common/ConsumeStats.h
new file mode 100755
index 0000000..34ea817
--- /dev/null
+++ b/rocketmq-client4cpp/src/common/ConsumeStats.h
@@ -0,0 +1,95 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __CONSUMESTATS_H__
+#define __CONSUMESTATS_H__
+
+#include <map>
+#include "MessageQueue.h"
+
+namespace rmq
+{
+    struct OffsetWrapper
+    {
+        long long brokerOffset;   // read by ConsumeStats as the minuend of the offset difference
+        long long consumerOffset; // read by ConsumeStats as the subtrahend of the offset difference
+        long long lastTimestamp;  // NOTE(review): unit/epoch not visible here - confirm with the producer of this value
+    };
+
+    /**
+    * Consumer progress
+    *
+    * @author kangliqiang<kangliq@163.com>
+    */
+    class ConsumeStats
+    {
+    public:
+        ConsumeStats()
+            : m_consumeTps(0)
+        {
+        }
+
+        ~ConsumeStats()
+        {
+        }
+
+        /**
+        * Sum of (brokerOffset - consumerOffset) over every entry of the
+        * offset table; returns 0 when the table is empty.
+        */
+        long long computeTotalDiff() const
+        {
+            long long diffTotal = 0LL;
+            std::map<MessageQueue*, OffsetWrapper>::const_iterator it = m_offsetTable.begin();
+            for (; it != m_offsetTable.end(); ++it)
+            {
+                diffTotal += it->second.brokerOffset - it->second.consumerOffset;
+            }
+
+            return diffTotal;
+        }
+
+        /** Returns a copy of the offset table (keyed by MessageQueue pointer). */
+        std::map<MessageQueue*, OffsetWrapper> getOffsetTable() const
+        {
+            return m_offsetTable;
+        }
+
+        /** Replaces the offset table with a copy of offsetTable. */
+        void setOffsetTable(const std::map<MessageQueue*, OffsetWrapper>& offsetTable)
+        {
+            m_offsetTable = offsetTable;
+        }
+
+        /** Returns the stored consume TPS value (0 until set). */
+        long long getConsumeTps() const
+        {
+            return m_consumeTps;
+        }
+
+        /** Stores consumeTps as the consume TPS value. */
+        void setConsumeTps(long long consumeTps)
+        {
+            m_consumeTps = consumeTps;
+        }
+
+    private:
+        std::map<MessageQueue*, OffsetWrapper> m_offsetTable;
+        long long m_consumeTps;
+    };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/FilterAPI.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/FilterAPI.h b/rocketmq-client4cpp/src/common/FilterAPI.h
new file mode 100755
index 0000000..3d87306
--- /dev/null
+++ b/rocketmq-client4cpp/src/common/FilterAPI.h
@@ -0,0 +1,72 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __FILTERAPI_H__
+#define __FILTERAPI_H__
+
+#include <string>
+#include "SubscriptionData.h"
+#include "UtilAll.h"
+#include "MQClientException.h"
+
+namespace rmq
+{
+    class FilterAPI
+    {
+    public:
+        /**
+        * Build a new'ed SubscriptionData for topic (the raw pointer is returned to the caller).
+        * Empty subString or SUB_ALL selects SUB_ALL; otherwise tags are split on "||".
+        */
+        static SubscriptionData* buildSubscriptionData(const std::string& topic, const std::string& subString)
+        {
+            const bool subAll = subString.empty() || subString == SubscriptionData::SUB_ALL;
+            // Split and validate before allocating, so the throw below cannot leak.
+            std::vector<std::string> out;
+            if (!subAll)
+            {
+                UtilAll::Split(out, subString, "||");
+                if (out.empty())
+                {
+                    THROW_MQEXCEPTION(MQClientException, "FilterAPI subString split error", -1);
+                }
+            }
+
+            SubscriptionData* subscriptionData = new SubscriptionData();
+            subscriptionData->setTopic(topic);
+            subscriptionData->setSubString(subString);
+            if (subAll)
+            {
+                subscriptionData->setSubString(SubscriptionData::SUB_ALL);
+            }
+
+            for (size_t i = 0; i < out.size(); i++)
+            {
+                std::string tag = out[i];
+                std::string trimString = tag.empty() ? tag : UtilAll::Trim(tag);
+                if (!trimString.empty())
+                {
+                    subscriptionData->getTagsSet().insert(trimString);
+                    subscriptionData->getCodeSet().insert(UtilAll::hashCode(trimString));
+                }
+            }
+
+            return subscriptionData;
+        }
+    };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/MQVersion.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/MQVersion.cpp b/rocketmq-client4cpp/src/common/MQVersion.cpp
new file mode 100755
index 0000000..1a03fa6
--- /dev/null
+++ b/rocketmq-client4cpp/src/common/MQVersion.cpp
@@ -0,0 +1,88 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "MQVersion.h"
+
+namespace rmq
+{
+
+int MQVersion::s_CurrentVersion = MQVersion::V3_2_6;
+
+/**
+ * Return the enumerator name for value, using the declaration order of
+ * MQVersion::Version (no explicit initializers, so values count up from 0).
+ * Returns "" when value does not correspond to an enumerator.
+ */
+const char* MQVersion::getVersionDesc(int value)
+{
+    static const char* const s_names[V3_5_9 + 1] =
+    {
+        "V3_0_0_SNAPSHOT", "V3_0_0_ALPHA1", "V3_0_0_BETA1", "V3_0_0_BETA2", "V3_0_0_BETA3",
+        "V3_0_0_BETA4", "V3_0_0_BETA5", "V3_0_0_BETA6_SNAPSHOT", "V3_0_0_BETA6",
+        "V3_0_0_BETA7_SNAPSHOT", "V3_0_0_BETA7", "V3_0_0_BETA8_SNAPSHOT", "V3_0_0_BETA8",
+        "V3_0_0_BETA9_SNAPSHOT", "V3_0_0_BETA9", "V3_0_0_FINAL",
+        "V3_0_1_SNAPSHOT", "V3_0_1", "V3_0_2_SNAPSHOT", "V3_0_2",
+        "V3_0_3_SNAPSHOT", "V3_0_3", "V3_0_4_SNAPSHOT", "V3_0_4",
+        "V3_0_5_SNAPSHOT", "V3_0_5", "V3_0_6_SNAPSHOT", "V3_0_6",
+        "V3_0_7_SNAPSHOT", "V3_0_7", "V3_0_8_SNAPSHOT", "V3_0_8",
+        "V3_0_9_SNAPSHOT", "V3_0_9", "V3_0_10_SNAPSHOT", "V3_0_10",
+        "V3_0_11_SNAPSHOT", "V3_0_11", "V3_0_12_SNAPSHOT", "V3_0_12",
+        "V3_0_13_SNAPSHOT", "V3_0_13", "V3_0_14_SNAPSHOT", "V3_0_14",
+        "V3_0_15_SNAPSHOT", "V3_0_15",
+        "V3_1_0_SNAPSHOT", "V3_1_0", "V3_1_1_SNAPSHOT", "V3_1_1",
+        "V3_1_2_SNAPSHOT", "V3_1_2", "V3_1_3_SNAPSHOT", "V3_1_3",
+        "V3_1_4_SNAPSHOT", "V3_1_4", "V3_1_5_SNAPSHOT", "V3_1_5",
+        "V3_1_6_SNAPSHOT", "V3_1_6", "V3_1_7_SNAPSHOT", "V3_1_7",
+        "V3_1_8_SNAPSHOT", "V3_1_8", "V3_1_9_SNAPSHOT", "V3_1_9",
+        "V3_2_0_SNAPSHOT", "V3_2_0", "V3_2_1_SNAPSHOT", "V3_2_1",
+        "V3_2_2_SNAPSHOT", "V3_2_2", "V3_2_3_SNAPSHOT", "V3_2_3",
+        "V3_2_4_SNAPSHOT", "V3_2_4", "V3_2_5_SNAPSHOT", "V3_2_5",
+        "V3_2_6_SNAPSHOT", "V3_2_6", "V3_2_7_SNAPSHOT", "V3_2_7",
+        "V3_2_8_SNAPSHOT", "V3_2_8", "V3_2_9_SNAPSHOT", "V3_2_9",
+        "V3_3_1_SNAPSHOT", "V3_3_1", "V3_3_2_SNAPSHOT", "V3_3_2",
+        "V3_3_3_SNAPSHOT", "V3_3_3", "V3_3_4_SNAPSHOT", "V3_3_4",
+        "V3_3_5_SNAPSHOT", "V3_3_5", "V3_3_6_SNAPSHOT", "V3_3_6",
+        "V3_3_7_SNAPSHOT", "V3_3_7", "V3_3_8_SNAPSHOT", "V3_3_8",
+        "V3_3_9_SNAPSHOT", "V3_3_9",
+        "V3_4_1_SNAPSHOT", "V3_4_1", "V3_4_2_SNAPSHOT", "V3_4_2",
+        "V3_4_3_SNAPSHOT", "V3_4_3", "V3_4_4_SNAPSHOT", "V3_4_4",
+        "V3_4_5_SNAPSHOT", "V3_4_5", "V3_4_6_SNAPSHOT", "V3_4_6",
+        "V3_4_7_SNAPSHOT", "V3_4_7", "V3_4_8_SNAPSHOT", "V3_4_8",
+        "V3_4_9_SNAPSHOT", "V3_4_9",
+        "V3_5_1_SNAPSHOT", "V3_5_1", "V3_5_2_SNAPSHOT", "V3_5_2",
+        "V3_5_3_SNAPSHOT", "V3_5_3", "V3_5_4_SNAPSHOT", "V3_5_4",
+        "V3_5_5_SNAPSHOT", "V3_5_5", "V3_5_6_SNAPSHOT", "V3_5_6",
+        "V3_5_7_SNAPSHOT", "V3_5_7", "V3_5_8_SNAPSHOT", "V3_5_8",
+        "V3_5_9_SNAPSHOT", "V3_5_9"
+    };
+
+    if (value < 0 || value > V3_5_9)
+    {
+        return "";
+    }
+
+    // Sized by the last enumerator: surplus names fail to compile, missing ones stay null.
+    const char* name = s_names[value];
+    return name ? name : "";
+}
+
+
+MQVersion::Version MQVersion::value2Version(int value)
+{
+    return static_cast<MQVersion::Version>(value); // plain conversion, value is not range-checked
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/MQVersion.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/MQVersion.h b/rocketmq-client4cpp/src/common/MQVersion.h
new file mode 100755
index 0000000..d92957c
--- /dev/null
+++ b/rocketmq-client4cpp/src/common/MQVersion.h
@@ -0,0 +1,184 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __MQVERSION_H__
+#define __MQVERSION_H__
+
+#include <string>
+
+namespace rmq
+{
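+    /**
+    * Version info. The enumerators have no explicit values, so each one's
+    * integer value is its position in the list, counting from 0.
+    */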
+    class MQVersion
+    {
+    public:
+        enum Version
+        {
+            V3_0_0_SNAPSHOT,
+            V3_0_0_ALPHA1,
+            V3_0_0_BETA1,
+            V3_0_0_BETA2,
+            V3_0_0_BETA3,
+            V3_0_0_BETA4,
+            V3_0_0_BETA5,
+            V3_0_0_BETA6_SNAPSHOT,
+            V3_0_0_BETA6,
+            V3_0_0_BETA7_SNAPSHOT,
+            V3_0_0_BETA7,
+            V3_0_0_BETA8_SNAPSHOT,
+            V3_0_0_BETA8,
+            V3_0_0_BETA9_SNAPSHOT,
+            V3_0_0_BETA9,
+            V3_0_0_FINAL,
+            V3_0_1_SNAPSHOT,
+            V3_0_1,
+            V3_0_2_SNAPSHOT,
+            V3_0_2,
+
+            V3_0_3_SNAPSHOT,
+            V3_0_3,
+            V3_0_4_SNAPSHOT,
+            V3_0_4,
+            V3_0_5_SNAPSHOT,
+            V3_0_5,
+            V3_0_6_SNAPSHOT,
+            V3_0_6,
+            V3_0_7_SNAPSHOT,
+            V3_0_7,
+            V3_0_8_SNAPSHOT,
+            V3_0_8,
+            V3_0_9_SNAPSHOT,
+            V3_0_9,
+            V3_0_10_SNAPSHOT,
+            V3_0_10,
+            V3_0_11_SNAPSHOT,
+            V3_0_11,
+            V3_0_12_SNAPSHOT,
+            V3_0_12,
+            V3_0_13_SNAPSHOT,
+            V3_0_13,
+            V3_0_14_SNAPSHOT,
+            V3_0_14,
+            V3_0_15_SNAPSHOT,
+            V3_0_15,
+            V3_1_0_SNAPSHOT,
+            V3_1_0,
+            V3_1_1_SNAPSHOT,
+            V3_1_1,
+            V3_1_2_SNAPSHOT,
+            V3_1_2,
+            V3_1_3_SNAPSHOT,
+            V3_1_3,
+            V3_1_4_SNAPSHOT,
+            V3_1_4,
+            V3_1_5_SNAPSHOT,
+            V3_1_5,
+            V3_1_6_SNAPSHOT,
+            V3_1_6,
+            V3_1_7_SNAPSHOT,
+            V3_1_7,
+            V3_1_8_SNAPSHOT,
+            V3_1_8,
+            V3_1_9_SNAPSHOT,
+            V3_1_9,
+            V3_2_0_SNAPSHOT,
+            V3_2_0,
+            V3_2_1_SNAPSHOT,
+            V3_2_1,
+            V3_2_2_SNAPSHOT,
+            V3_2_2,
+            V3_2_3_SNAPSHOT,
+            V3_2_3,
+            V3_2_4_SNAPSHOT,
+            V3_2_4,
+            V3_2_5_SNAPSHOT,
+            V3_2_5,
+            V3_2_6_SNAPSHOT,
+            V3_2_6,
+            V3_2_7_SNAPSHOT,
+            V3_2_7,
+            V3_2_8_SNAPSHOT,
+            V3_2_8,
+            V3_2_9_SNAPSHOT,
+            V3_2_9,
+            V3_3_1_SNAPSHOT,
+            V3_3_1,
+            V3_3_2_SNAPSHOT,
+            V3_3_2,
+            V3_3_3_SNAPSHOT,
+            V3_3_3,
+            V3_3_4_SNAPSHOT,
+            V3_3_4,
+            V3_3_5_SNAPSHOT,
+            V3_3_5,
+            V3_3_6_SNAPSHOT,
+            V3_3_6,
+            V3_3_7_SNAPSHOT,
+            V3_3_7,
+            V3_3_8_SNAPSHOT,
+            V3_3_8,
+            V3_3_9_SNAPSHOT,
+            V3_3_9,
+            V3_4_1_SNAPSHOT,
+            V3_4_1,
+            V3_4_2_SNAPSHOT,
+            V3_4_2,
+            V3_4_3_SNAPSHOT,
+            V3_4_3,
+            V3_4_4_SNAPSHOT,
+            V3_4_4,
+            V3_4_5_SNAPSHOT,
+            V3_4_5,
+            V3_4_6_SNAPSHOT,
+            V3_4_6,
+            V3_4_7_SNAPSHOT,
+            V3_4_7,
+            V3_4_8_SNAPSHOT,
+            V3_4_8,
+            V3_4_9_SNAPSHOT,
+            V3_4_9,
+            V3_5_1_SNAPSHOT,
+            V3_5_1,
+            V3_5_2_SNAPSHOT,
+            V3_5_2,
+            V3_5_3_SNAPSHOT,
+            V3_5_3,
+            V3_5_4_SNAPSHOT,
+            V3_5_4,
+            V3_5_5_SNAPSHOT,
+            V3_5_5,
+            V3_5_6_SNAPSHOT,
+            V3_5_6,
+            V3_5_7_SNAPSHOT,
+            V3_5_7,
+            V3_5_8_SNAPSHOT,
+            V3_5_8,
+            V3_5_9_SNAPSHOT,
+            V3_5_9,
+        };
+
+        static const char* getVersionDesc(int value);
+        static Version value2Version(int value);
+
+    public:
+        static int s_CurrentVersion;
+    };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/MessageSysFlag.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/MessageSysFlag.cpp b/rocketmq-client4cpp/src/common/MessageSysFlag.cpp
new file mode 100755
index 0000000..329b71b
--- /dev/null
+++ b/rocketmq-client4cpp/src/common/MessageSysFlag.cpp
@@ -0,0 +1,47 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "MessageSysFlag.h"
+
+namespace rmq
+{
+
+/**
+* SysFlag: single-bit flags occupying bit 0 and bit 1
+*/
+int MessageSysFlag::CompressedFlag = (0x1 << 0);
+int MessageSysFlag::MultiTagsFlag = (0x1 << 1);
+
+/**
+* Bit positions: 7 6 5 4 3 2 1 0
+* SysFlag for transaction: a 2-bit type code shifted into bits 2 and 3
+*/
+int MessageSysFlag::TransactionNotType = (0x0 << 2);
+int MessageSysFlag::TransactionPreparedType = (0x1 << 2);
+int MessageSysFlag::TransactionCommitType = (0x2 << 2);
+int MessageSysFlag::TransactionRollbackType = (0x3 << 2);
+
+int MessageSysFlag::getTransactionValue(int flag)
+{
+    return flag & TransactionRollbackType; // 0x3 << 2 has both type bits set, so it doubles as the mask
+}
+
+int MessageSysFlag::resetTransactionValue(int flag, int type)
+{
+    return (flag & (~TransactionRollbackType)) | type; // clear bits 2-3, then OR in type (type itself is not masked)
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/MessageSysFlag.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/MessageSysFlag.h b/rocketmq-client4cpp/src/common/MessageSysFlag.h
new file mode 100755
index 0000000..3950564
--- /dev/null
+++ b/rocketmq-client4cpp/src/common/MessageSysFlag.h
@@ -0,0 +1,46 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __MESSAGESYSFLAG_H__
+#define __MESSAGESYSFLAG_H__
+
+namespace rmq
+{
+    class MessageSysFlag
+    {
+    public:
+        static int getTransactionValue(int flag);             // flag & TransactionRollbackType
+        static int resetTransactionValue(int flag, int type); // (flag & ~TransactionRollbackType) | type
+
+    public:
+        /**
+        * SysFlag: single-bit flags (values are assigned in MessageSysFlag.cpp)
+        */
+        static int CompressedFlag;
+        static int MultiTagsFlag;
+
+        /**
+        * Bit positions: 7 6 5 4 3 2 1 0
+        * SysFlag for transaction: type codes (values are assigned in MessageSysFlag.cpp)
+        */
+        static int TransactionNotType;
+        static int TransactionPreparedType;
+        static int TransactionCommitType;
+        static int TransactionRollbackType;
+    };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/MixAll.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/MixAll.cpp b/rocketmq-client4cpp/src/common/MixAll.cpp
new file mode 100755
index 0000000..417a5e5
--- /dev/null
+++ b/rocketmq-client4cpp/src/common/MixAll.cpp
@@ -0,0 +1,88 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,kangliq@163.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "MixAll.h"
+#include "FileUtil.h"
+
+namespace rmq
+{
+
+const std::string MixAll::DEFAULT_TOPIC = "TBW102";
+const std::string MixAll::BENCHMARK_TOPIC = "BenchmarkTest";
+const std::string MixAll::DEFAULT_PRODUCER_GROUP = "DEFAULT_PRODUCER";
+const std::string MixAll::DEFAULT_CONSUMER_GROUP = "DEFAULT_CONSUMER";
+const std::string MixAll::TOOLS_CONSUMER_GROUP = "TOOLS_CONSUMER";
+const std::string MixAll::CLIENT_INNER_PRODUCER_GROUP = "CLIENT_INNER_PRODUCER";
+const std::string MixAll::SELF_TEST_TOPIC = "SELF_TEST_TOPIC";
+const std::string MixAll::RETRY_GROUP_TOPIC_PREFIX = "%RETRY%";
+const std::string MixAll::DLQ_GROUP_TOPIC_PREFIX = "%DLQ%";
+const std::string MixAll::NAMESRV_ADDR_ENV = "NAMESRV_ADDR";
+const std::string MixAll::ROCKETMQ_HOME_ENV = "ROCKETMQ_HOME";
+const std::string MixAll::ROCKETMQ_HOME_PROPERTY = "rocketmq.home.dir";
+const std::string MixAll::MESSAGE_COMPRESS_LEVEL = "rocketmq.message.compressLevel";
+const std::string MixAll::ROCKETMQ_NAMESRV_DOMAIN = "172.30.30.125"; // NOTE(review): hard-coded private IPv4 address in a constant named DOMAIN - confirm this is intended
+
+std::string MixAll::getRetryTopic(const std::string& consumerGroup)
+{
+    return RETRY_GROUP_TOPIC_PREFIX + consumerGroup; // prefix is "%RETRY%"
+}
+
+bool MixAll::compareAndIncreaseOnly(kpr::AtomicLong& target, long long value)
+{
+    long long current = target.get();
+    while (value > current)
+    {
+        long long tmp = target.getAndSet(current, value);
+        // presumably getAndSet(expected, desired) is a compare-and-swap returning the prior value - TODO confirm
+        if (tmp == current)
+        {
+            return true;
+        }
+        // the returned value differed from `current`: re-read and retry while value is still larger
+        current = target.get();
+    }
+    // reached when value is not greater than the last value read from target
+    return false;
+}
+
+
+std::string MixAll::file2String(const std::string& fileName)
+{
+    return kpr::FileUtil::load2str(fileName); // thin wrapper: the result is whatever load2str returns
+}
+
+// Store fileData in fileName via "<fileName>.tmp"; non-empty old content is copied to "<fileName>.bak".
+void MixAll::string2File(const std::string& fileName, const std::string& fileData)
+{
+    // write tmp file
+    std::string tmpFile = fileName + ".tmp";
+    kpr::FileUtil::save2file(tmpFile, fileData);
+
+    // backup old file
+    std::string bakFile = fileName + ".bak";
+    std::string oldFileData = kpr::FileUtil::load2str(fileName);
+    if (!oldFileData.empty())
+    {
+        kpr::FileUtil::save2file(bakFile, oldFileData);
+    }
+
+    // Replace the old file in one step. POSIX rename() atomically replaces an
+    // existing destination, so the old file is not removed first: if the
+    // rename fails (e.g. the tmp file was never written), the old file survives.
+    std::rename(tmpFile.c_str(), fileName.c_str());
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/MixAll.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/MixAll.h b/rocketmq-client4cpp/src/common/MixAll.h
new file mode 100755
index 0000000..797a0ae
--- /dev/null
+++ b/rocketmq-client4cpp/src/common/MixAll.h
@@ -0,0 +1,62 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,kangliq@163.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __MIXALL_H__
+#define __MIXALL_H__
+
+#include <string>
+#include <vector>
+#include <iostream>
+#include <fstream>
+#include <unistd.h>
+#include <sys/stat.h>
+#include <sys/types.h>
+#include <dirent.h>
+#include <fnmatch.h>
+#include <arpa/inet.h>
+#include <ifaddrs.h>
+
+#include "AtomicValue.h"
+
+namespace rmq
+{
+    class MixAll
+    {
+    public:
+        static const long MASTER_ID = 0L;
+        static const std::string DEFAULT_TOPIC;
+        static const std::string BENCHMARK_TOPIC;
+        static const std::string DEFAULT_PRODUCER_GROUP;
+        static const std::string DEFAULT_CONSUMER_GROUP;
+        static const std::string TOOLS_CONSUMER_GROUP;
+        static const std::string CLIENT_INNER_PRODUCER_GROUP;
+        static const std::string SELF_TEST_TOPIC;
+        static const std::string RETRY_GROUP_TOPIC_PREFIX;
+        static const std::string DLQ_GROUP_TOPIC_PREFIX;
+        static const std::string NAMESRV_ADDR_ENV;
+        static const std::string ROCKETMQ_HOME_ENV;
+        static const std::string  ROCKETMQ_HOME_PROPERTY;
+        static const std::string MESSAGE_COMPRESS_LEVEL;
+        static const std::string ROCKETMQ_NAMESRV_DOMAIN;
+
+        static std::string getRetryTopic(const std::string& consumerGroup); // RETRY_GROUP_TOPIC_PREFIX + consumerGroup
+        static bool compareAndIncreaseOnly(kpr::AtomicLong& target, long long value); // false when value is not larger than target
+        static std::string file2String(const std::string& fileName); // returns kpr::FileUtil::load2str(fileName)
+        static void string2File(const std::string& fileName, const std::string& fileData); // writes via "<fileName>.tmp", backs up to "<fileName>.bak"
+    };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/NamesrvConfig.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/NamesrvConfig.h b/rocketmq-client4cpp/src/common/NamesrvConfig.h
new file mode 100755
index 0000000..608a4d9
--- /dev/null
+++ b/rocketmq-client4cpp/src/common/NamesrvConfig.h
@@ -0,0 +1,72 @@
+/**
+* Copyright (C) 2013 kangliqiang, kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __NAMESRVCONFIG_H__
+#define __NAMESRVCONFIG_H__
+
+#include <stdlib.h>
+#include <string>
+#include "MixAll.h"
+namespace rmq
+{
+/**
+ * Name server Config
+ *
+ */
+class NamesrvConfig
+{
+public:
+    /**
+     * Starts with an empty kv-config path; the home directory is read from
+     * the environment variable named by MixAll::ROCKETMQ_HOME_ENV and stays
+     * empty when that variable is not set.
+     */
+    NamesrvConfig()
+        : m_rocketmqHome(""), m_kvConfigPath("")
+    {
+        const char* envHome = getenv(MixAll::ROCKETMQ_HOME_ENV.c_str());
+        if (envHome != NULL)
+        {
+            m_rocketmqHome = envHome;
+        }
+    }
+
+    const std::string& getRocketmqHome()
+    {
+        return m_rocketmqHome;
+    }
+
+    void setRocketmqHome(const std::string& rocketmqHome)
+    {
+        m_rocketmqHome = rocketmqHome;
+    }
+
+    const std::string& getKvConfigPath()
+    {
+        return m_kvConfigPath;
+    }
+
+    void setKvConfigPath(const std::string& kvConfigPath)
+    {
+        m_kvConfigPath = kvConfigPath;
+    }
+
+private:
+    std::string m_rocketmqHome;
+    std::string m_kvConfigPath;
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/common/NamesrvUtil.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/common/NamesrvUtil.h b/rocketmq-client4cpp/src/common/NamesrvUtil.h
new file mode 100755
index 0000000..4f3639c
--- /dev/null
+++ b/rocketmq-client4cpp/src/common/NamesrvUtil.h
@@ -0,0 +1,29 @@
+/**
+* Copyright (C) 2013 kangliqiang, kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __NAMESRVUTIL_H__
+#define __NAMESRVUTIL_H__
+
+namespace rmq
+{
+	namespace NamesrvUtil
+	{
+		const char* const NAMESPACE_ORDER_TOPIC_CONFIG = "ORDER_TOPIC_CONFIG"; // const pointer => internal linkage,
+		const char* const NAMESPACE_PROJECT_CONFIG = "PROJECT_CONFIG";         // so this header may be included by several .cpp files
+	}
+}
+
+#endif



Mime
View raw message