rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dongefore...@apache.org
Subject [10/17] incubator-rocketmq-externals git commit: [ROCKETMQ-129] Initialized the rocketmq c++ client closes apache/incubator-rocketmq-externals#11
Date Fri, 21 Apr 2017 10:09:50 GMT
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/PullRequest.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/PullRequest.cpp b/rocketmq-client4cpp/src/consumer/PullRequest.cpp
new file mode 100755
index 0000000..b8650c6
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/PullRequest.cpp
@@ -0,0 +1,108 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "PullRequest.h"
+#include "UtilAll.h"
+
+namespace rmq
+{
+
+/**
+ * Destructor. The body is empty: the ProcessQueue pointer carried by the
+ * request is not deleted here.
+ */
+PullRequest::~PullRequest()
+{
+}
+
+/** Returns a copy of the consumer group name. */
+std::string PullRequest::getConsumerGroup()
+{
+    return this->m_consumerGroup;
+}
+
+/** Replaces the consumer group name. */
+void PullRequest::setConsumerGroup(const std::string& consumerGroup)
+{
+    this->m_consumerGroup = consumerGroup;
+}
+
+/** Returns the stored message queue by (mutable) reference. */
+MessageQueue& PullRequest::getMessageQueue()
+{
+    return this->m_messageQueue;
+}
+
+/** Copies messageQueue into the request. */
+void PullRequest::setMessageQueue(const MessageQueue& messageQueue)
+{
+    this->m_messageQueue = messageQueue;
+}
+
+/** Returns the stored next offset. */
+long long PullRequest::getNextOffset()
+{
+    return this->m_nextOffset;
+}
+
+/** Stores the next offset. */
+void PullRequest::setNextOffset(long long nextOffset)
+{
+    this->m_nextOffset = nextOffset;
+}
+
+/**
+ * Hash over the consumer group and the message queue.
+ *
+ * The Java original combined both hash codes as 31 * h(group) + h(queue)
+ * style arithmetic; this port instead writes the group name followed by the
+ * queue's hash code into one string and hashes that string with
+ * UtilAll::hashCode. The next offset and the process queue do not take part.
+ */
+int PullRequest::hashCode()
+{
+    std::stringstream key;
+    key << m_consumerGroup;
+    key << m_messageQueue.hashCode();
+    return UtilAll::hashCode(key.str());
+}
+
+/** Renders the group, queue and next offset as "{consumerGroup=...,...}". */
+std::string PullRequest::toString() const
+{
+    std::stringstream out;
+    out << "{consumerGroup=" << m_consumerGroup;
+    out << ",messageQueue=" << m_messageQueue.toString();
+    out << ",nextOffset=" << m_nextOffset;
+    out << "}";
+    return out.str();
+}
+
+
+/**
+ * Two requests are equal when both the consumer group and the message queue
+ * match; the next offset and the process queue pointer are not compared.
+ */
+bool PullRequest::operator==(const PullRequest& other)
+{
+    const bool sameGroup = (m_consumerGroup == other.m_consumerGroup);
+    return sameGroup && (m_messageQueue == other.m_messageQueue);
+}
+
+/** Returns the stored ProcessQueue pointer as-is. */
+ProcessQueue* PullRequest::getProcessQueue()
+{
+    return this->m_pProcessQueue;
+}
+
+/** Stores the ProcessQueue pointer; no copy of the pointee is made. */
+void PullRequest::setProcessQueue(ProcessQueue* pProcessQueue)
+{
+    this->m_pProcessQueue = pProcessQueue;
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/PullRequest.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/PullRequest.h b/rocketmq-client4cpp/src/consumer/PullRequest.h
new file mode 100755
index 0000000..3fb8367
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/PullRequest.h
@@ -0,0 +1,59 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __PULLREQUEST_H__
+#define __PULLREQUEST_H__
+
+#include <string>
+#include <sstream>
+
+#include "MessageQueue.h"
+#include "ProcessQueue.h"
+
+namespace rmq
+{
+    /**
+     * Bundles a consumer group, a message queue, the next offset and a
+     * ProcessQueue pointer. The destructor defined in PullRequest.cpp is
+     * empty, so the ProcessQueue pointer is not deleted by this class.
+     */
+    class PullRequest
+    {
+    public:
+        /**
+         * Starts with no process queue (NULL) and a next offset of 0, so a
+         * default-initialized request never exposes indeterminate values
+         * through getProcessQueue(), getNextOffset() or toString().
+         */
+        PullRequest()
+            : m_pProcessQueue(NULL),
+              m_nextOffset(0)
+        {
+        }
+
+        virtual ~PullRequest();
+
+        std::string getConsumerGroup();
+        void setConsumerGroup(const std::string& consumerGroup);
+
+        MessageQueue& getMessageQueue();
+        void setMessageQueue(const MessageQueue& messageQueue);
+
+        long long getNextOffset();
+        void setNextOffset(long long nextOffset);
+
+        /// Hash over consumer group and message queue only.
+        int hashCode();
+        std::string toString() const;
+
+        /// Compares consumer group and message queue only.
+        bool operator==(const PullRequest& other);
+
+        ProcessQueue* getProcessQueue();
+        void setProcessQueue(ProcessQueue* pProcessQueue);
+
+    private:
+        std::string m_consumerGroup;
+        MessageQueue m_messageQueue;
+
+        ProcessQueue* m_pProcessQueue; ///< stored as passed; not deleted by the destructor
+        long long m_nextOffset;
+    };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/PullResultExt.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/PullResultExt.h b/rocketmq-client4cpp/src/consumer/PullResultExt.h
new file mode 100755
index 0000000..24235b2
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/PullResultExt.h
@@ -0,0 +1,53 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __PULLRESULTEXT_H__
+#define __PULLRESULTEXT_H__
+
+#include "PullResult.h"
+
+namespace rmq
+{
+
+  /**
+   * PullResult plus three extra fields: a suggested broker id and a pointer
+   * with length for the message binary. The pointer is stored as given; the
+   * bytes are not copied by this struct.
+   */
+  struct PullResultExt : public PullResult
+  {
+      /**
+       * Forwards the first five arguments to PullResult and stores the
+       * remaining three in the members of the same name.
+       */
+      PullResultExt(PullStatus pullStatus, long long nextBeginOffset,
+                    long long minOffset, long long maxOffset,
+                    std::list<MessageExt*>& msgFoundList,
+                    long suggestWhichBrokerId,
+                    const char* messageBinary, int messageBinaryLen)
+          : PullResult(pullStatus, nextBeginOffset, minOffset, maxOffset, msgFoundList)
+          , suggestWhichBrokerId(suggestWhichBrokerId)
+          , messageBinary(messageBinary)
+          , messageBinaryLen(messageBinaryLen)
+      {
+      }
+
+      long suggestWhichBrokerId;
+      const char* messageBinary;
+      int messageBinaryLen;
+  };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/RebalanceImpl.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RebalanceImpl.cpp b/rocketmq-client4cpp/src/consumer/RebalanceImpl.cpp
new file mode 100755
index 0000000..efdc1cc
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/RebalanceImpl.cpp
@@ -0,0 +1,613 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "RebalanceImpl.h"
+#include "AllocateMessageQueueStrategy.h"
+#include "MQClientFactory.h"
+#include "MixAll.h"
+#include "LockBatchBody.h"
+#include "MQClientAPIImpl.h"
+#include "KPRUtil.h"
+#include "ScopedLock.h"
+
+namespace rmq
+{
+
+/**
+ * Stores the consumer group, message model, allocation strategy and client
+ * factory. Both pointers are kept exactly as passed; nothing else is set up
+ * here.
+ */
+RebalanceImpl::RebalanceImpl(const std::string& consumerGroup,
+                             MessageModel messageModel,
+                             AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy,
+                             MQClientFactory* pMQClientFactory)
+    : m_consumerGroup(consumerGroup)
+    , m_messageModel(messageModel)
+    , m_pAllocateMessageQueueStrategy(pAllocateMessageQueueStrategy)
+    , m_pMQClientFactory(pMQClientFactory)
+{
+}
+
+/** Destructor; the body is empty, so no member pointer is deleted here. */
+RebalanceImpl::~RebalanceImpl()
+{
+}
+
+/**
+ * Sends an unlock request for a single queue to the broker address looked up
+ * for mq's broker name with MixAll::MASTER_ID.
+ *
+ * Does nothing when no broker address is found. Any exception thrown by the
+ * request is logged and swallowed.
+ *
+ * @param mq      queue to unlock
+ * @param oneway  forwarded unchanged to unlockBatchMQ
+ */
+void RebalanceImpl::unlock(MessageQueue& mq, bool oneway)
+{
+    FindBrokerResult broker =
+        m_pMQClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll::MASTER_ID, true);
+    if (broker.brokerAddr.empty())
+    {
+        return;
+    }
+
+    // NOTE(review): this body is never deleted in this function; confirm
+    // whether unlockBatchMQ takes ownership, otherwise it leaks.
+    UnlockBatchRequestBody* body = new UnlockBatchRequestBody();
+    body->setConsumerGroup(m_consumerGroup);
+    body->setClientId(m_pMQClientFactory->getClientId());
+    body->getMqSet().insert(mq);
+
+    try
+    {
+        // 1000 is presumably a timeout in milliseconds - TODO confirm.
+        m_pMQClientFactory->getMQClientAPIImpl()->unlockBatchMQ(broker.brokerAddr, body, 1000, oneway);
+    }
+    catch (...)
+    {
+        RMQ_ERROR("unlockBatchMQ exception, MQ: {%s}" , mq.toString().c_str());
+    }
+}
+
+/**
+ * Sends one unlock request per broker covering every queue currently in the
+ * process queue table, then marks the matching ProcessQueue entries as
+ * unlocked.
+ *
+ * Brokers with no queues or no resolvable address are skipped. An exception
+ * for one broker is logged and the loop moves on to the next broker.
+ *
+ * @param oneway  forwarded unchanged to unlockBatchMQ
+ */
+void RebalanceImpl::unlockAll(bool oneway)
+{
+    typedef std::map<std::string, std::set<MessageQueue> > BrokerQueueMap;
+    typedef std::map<MessageQueue, ProcessQueue*> ProcessQueueMap;
+
+    BrokerQueueMap queuesByBroker = buildProcessQueueTableByBrokerName();
+    for (BrokerQueueMap::iterator entry = queuesByBroker.begin(); entry != queuesByBroker.end(); entry++)
+    {
+        std::string brokerName = entry->first;
+        std::set<MessageQueue> queues = entry->second;
+        if (queues.empty())
+        {
+            continue;
+        }
+
+        FindBrokerResult broker =
+            m_pMQClientFactory->findBrokerAddressInSubscribe(brokerName, MixAll::MASTER_ID, true);
+        if (broker.brokerAddr.empty())
+        {
+            continue;
+        }
+
+        // NOTE(review): never deleted here; confirm whether unlockBatchMQ
+        // takes ownership of the body.
+        UnlockBatchRequestBody* body = new UnlockBatchRequestBody();
+        body->setConsumerGroup(m_consumerGroup);
+        body->setClientId(m_pMQClientFactory->getClientId());
+        body->setMqSet(queues);
+
+        try
+        {
+            m_pMQClientFactory->getMQClientAPIImpl()->unlockBatchMQ(broker.brokerAddr, body, 1000, oneway);
+
+            // The table itself is only read; the flag is flipped on the pointee.
+            kpr::ScopedRLock<kpr::RWMutex> guard(m_processQueueTableLock);
+            for (std::set<MessageQueue>::iterator q = queues.begin(); q != queues.end(); q++)
+            {
+                ProcessQueueMap::iterator found = m_processQueueTable.find(*q);
+                if (found == m_processQueueTable.end())
+                {
+                    continue;
+                }
+
+                found->second->setLocked(false);
+                RMQ_INFO("the message queue unlock OK, Group: {%s}, MQ: {%s}",
+                         m_consumerGroup.c_str(), q->toString().c_str());
+            }
+        }
+        catch (...)
+        {
+            RMQ_ERROR("unlockBatchMQ exception, mqs.size: {%u} ", (unsigned)queues.size());
+        }
+    }
+}
+
+/**
+ * Requests a lock on a single queue from the broker address looked up for
+ * mq's broker name with MixAll::MASTER_ID.
+ *
+ * Every queue the broker reports as locked and that is present in the
+ * process queue table is marked locked and gets a fresh lock timestamp.
+ *
+ * @param mq  queue to lock
+ * @return true when mq is in the set returned by lockBatchMQ; false when it
+ *         is not, when no broker address is found, or when the request throws
+ *         (the exception is logged and swallowed).
+ */
+bool RebalanceImpl::lock(MessageQueue& mq)
+{
+    FindBrokerResult broker =
+        m_pMQClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll::MASTER_ID, true);
+    if (broker.brokerAddr.empty())
+    {
+        return false;
+    }
+
+    // NOTE(review): never deleted here; confirm whether lockBatchMQ takes
+    // ownership of the body.
+    LockBatchRequestBody* body = new LockBatchRequestBody();
+    body->setConsumerGroup(m_consumerGroup);
+    body->setClientId(m_pMQClientFactory->getClientId());
+    body->getMqSet().insert(mq);
+
+    try
+    {
+        std::set<MessageQueue> granted =
+            m_pMQClientFactory->getMQClientAPIImpl()->lockBatchMQ(broker.brokerAddr, body, 1000);
+
+        for (std::set<MessageQueue>::iterator g = granted.begin(); g != granted.end(); g++)
+        {
+            // The read lock is taken and released once per granted queue.
+            kpr::ScopedRLock<kpr::RWMutex> guard(m_processQueueTableLock);
+            std::map<MessageQueue, ProcessQueue*>::iterator entry = m_processQueueTable.find(*g);
+            if (entry != m_processQueueTable.end())
+            {
+                ProcessQueue* pq = entry->second;
+                pq->setLocked(true);
+                pq->setLastLockTimestamp(KPRUtil::GetCurrentTimeMillis());
+            }
+        }
+
+        const bool lockOK = (granted.find(mq) != granted.end());
+        RMQ_INFO("the message queue lock {%s}, {%s}, {%s}",
+                 (lockOK ? "OK" : "Failed"),
+                 m_consumerGroup.c_str(),
+                 mq.toString().c_str());
+        return lockOK;
+    }
+    catch (...)
+    {
+        RMQ_ERROR("lockBatchMQ exception, MQ: {%s}", mq.toString().c_str());
+    }
+
+    return false;
+}
+
+/**
+ * Requests locks, broker by broker, for every queue currently in the process
+ * queue table and updates the locked flag of each ProcessQueue to match the
+ * broker's answer.
+ *
+ * Queues returned by lockBatchMQ are marked locked with a fresh timestamp;
+ * requested queues missing from the answer are marked unlocked. Brokers with
+ * no queues or no resolvable address are skipped. Only std::exception is
+ * caught (and logged) here.
+ */
+void RebalanceImpl::lockAll()
+{
+    typedef std::map<std::string, std::set<MessageQueue> > BrokerQueueMap;
+    typedef std::map<MessageQueue, ProcessQueue*> ProcessQueueMap;
+
+    BrokerQueueMap queuesByBroker = buildProcessQueueTableByBrokerName();
+    for (BrokerQueueMap::iterator entry = queuesByBroker.begin(); entry != queuesByBroker.end(); entry++)
+    {
+        std::string brokerName = entry->first;
+        std::set<MessageQueue> wanted = entry->second;
+        if (wanted.empty())
+        {
+            continue;
+        }
+
+        FindBrokerResult broker =
+            m_pMQClientFactory->findBrokerAddressInSubscribe(brokerName, MixAll::MASTER_ID, true);
+        if (broker.brokerAddr.empty())
+        {
+            continue;
+        }
+
+        // NOTE(review): never deleted here; confirm whether lockBatchMQ takes
+        // ownership of the body.
+        LockBatchRequestBody* body = new LockBatchRequestBody();
+        body->setConsumerGroup(m_consumerGroup);
+        body->setClientId(m_pMQClientFactory->getClientId());
+        body->setMqSet(wanted);
+
+        try
+        {
+            std::set<MessageQueue> granted =
+                m_pMQClientFactory->getMQClientAPIImpl()->lockBatchMQ(broker.brokerAddr, body, 1000);
+
+            // Queues the broker granted: mark locked and refresh the timestamp.
+            for (std::set<MessageQueue>::iterator g = granted.begin(); g != granted.end(); g++)
+            {
+                kpr::ScopedRLock<kpr::RWMutex> guard(m_processQueueTableLock);
+                ProcessQueueMap::iterator found = m_processQueueTable.find(*g);
+                if (found == m_processQueueTable.end())
+                {
+                    continue;
+                }
+
+                ProcessQueue* processQueue = found->second;
+                if (!processQueue->isLocked())
+                {
+                    // Logged only on the unlocked -> locked transition.
+                    RMQ_INFO("the message queue locked OK, Group: {%s}, MQ: %s",
+                             m_consumerGroup.c_str(),
+                             g->toString().c_str());
+                }
+
+                processQueue->setLocked(true);
+                processQueue->setLastLockTimestamp(KPRUtil::GetCurrentTimeMillis());
+            }
+
+            // Queues that were requested but not granted: mark unlocked.
+            for (std::set<MessageQueue>::iterator w = wanted.begin(); w != wanted.end(); w++)
+            {
+                if (granted.find(*w) != granted.end())
+                {
+                    continue;
+                }
+
+                kpr::ScopedRLock<kpr::RWMutex> guard(m_processQueueTableLock);
+                ProcessQueueMap::iterator found = m_processQueueTable.find(*w);
+                if (found != m_processQueueTable.end())
+                {
+                    found->second->setLocked(false);
+                    RMQ_WARN("the message queue locked Failed, Group: {%s}, MQ: %s",
+                             m_consumerGroup.c_str(),
+                             w->toString().c_str());
+                }
+            }
+        }
+        catch (std::exception& e)
+        {
+            RMQ_ERROR("lockBatchMQ exception: %s", e.what());
+        }
+    }
+}
+
+/**
+ * Runs rebalanceByTopic for every subscribed topic, then drops process
+ * queues whose topic is no longer subscribed.
+ *
+ * A std::exception from one topic does not stop the loop; it is logged
+ * unless the topic name starts with MixAll::RETRY_GROUP_TOPIC_PREFIX.
+ */
+void RebalanceImpl::doRebalance()
+{
+    typedef std::map<std::string, SubscriptionData> SubscriptionMap;
+
+    // Iterate over a copy of the subscription table.
+    SubscriptionMap subscriptions = getSubscriptionInner();
+    for (SubscriptionMap::iterator sub = subscriptions.begin(); sub != subscriptions.end(); sub++)
+    {
+        std::string topic = sub->first;
+        try
+        {
+            rebalanceByTopic(topic);
+        }
+        catch (std::exception& e)
+        {
+            const bool isRetryTopic = (topic.find(MixAll::RETRY_GROUP_TOPIC_PREFIX) == 0);
+            if (!isRetryTopic)
+            {
+                RMQ_WARN("rebalanceByTopic Exception: %s", e.what());
+            }
+        }
+    }
+
+    truncateMessageQueueNotMyTopic();
+}
+
+/** Returns the subscription table by mutable reference (no copy). */
+std::map<std::string, SubscriptionData>& RebalanceImpl::getSubscriptionInner()
+{
+    return this->m_subscriptionInner;
+}
+
+/**
+ * Returns the process queue table by mutable reference. This accessor takes
+ * no lock itself.
+ */
+std::map<MessageQueue, ProcessQueue*>& RebalanceImpl::getProcessQueueTable()
+{
+    return this->m_processQueueTable;
+}
+
+
+/** Returns the read/write mutex used around the process queue table. */
+kpr::RWMutex& RebalanceImpl::getProcessQueueTableLock()
+{
+    return this->m_processQueueTableLock;
+}
+
+
+/** Returns the topic -> message queue set table by mutable reference. */
+std::map<std::string, std::set<MessageQueue> >& RebalanceImpl::getTopicSubscribeInfoTable()
+{
+    return this->m_topicSubscribeInfoTable;
+}
+
+/** Returns the consumer group name by mutable reference. */
+std::string& RebalanceImpl::getConsumerGroup()
+{
+    return this->m_consumerGroup;
+}
+
+/** Replaces the consumer group name. */
+void RebalanceImpl::setConsumerGroup(const std::string& consumerGroup)
+{
+    this->m_consumerGroup = consumerGroup;
+}
+
+/** Returns the message model that rebalanceByTopic switches on. */
+MessageModel RebalanceImpl::getMessageModel()
+{
+    return this->m_messageModel;
+}
+
+/** Replaces the message model. */
+void RebalanceImpl::setMessageModel(MessageModel messageModel)
+{
+    this->m_messageModel = messageModel;
+}
+
+/** Returns the stored allocation strategy pointer as-is. */
+AllocateMessageQueueStrategy* RebalanceImpl::getAllocateMessageQueueStrategy()
+{
+    return this->m_pAllocateMessageQueueStrategy;
+}
+
+/** Stores the allocation strategy pointer; the previous one is not deleted. */
+void RebalanceImpl::setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy)
+{
+    this->m_pAllocateMessageQueueStrategy = pAllocateMessageQueueStrategy;
+}
+
+/** Returns the stored client factory pointer as-is. */
+MQClientFactory* RebalanceImpl::getmQClientFactory()
+{
+    return this->m_pMQClientFactory;
+}
+
+/** Stores the client factory pointer; the previous one is not deleted. */
+void RebalanceImpl::setmQClientFactory(MQClientFactory* pMQClientFactory)
+{
+    this->m_pMQClientFactory = pMQClientFactory;
+}
+
+/**
+ * Groups the keys of the process queue table by broker name.
+ *
+ * The table is read under the read lock and the result is returned by
+ * value, so callers work on a snapshot.
+ *
+ * @return map from broker name to the set of message queues on that broker
+ */
+std::map<std::string, std::set<MessageQueue> > RebalanceImpl::buildProcessQueueTableByBrokerName()
+{
+    std::map<std::string, std::set<MessageQueue> > result;
+    kpr::ScopedRLock<kpr::RWMutex> lock(m_processQueueTableLock);
+    std::map<MessageQueue, ProcessQueue*>::iterator it = m_processQueueTable.begin();
+    // The iterator must be advanced on every pass; without the increment the
+    // loop never terminates once the table holds at least one entry.
+    for (; it != m_processQueueTable.end(); ++it)
+    {
+        MessageQueue mq = it->first;
+        // operator[] creates the empty set the first time a broker is seen.
+        result[mq.getBrokerName()].insert(mq);
+    }
+
+    return result;
+}
+
+/**
+ * Recomputes which message queues of one topic this consumer should own and
+ * applies the result to the process queue table.
+ *
+ * BROADCASTING: every queue known for the topic is taken.
+ * CLUSTERING:   the queues are divided among the consumer ids of the group
+ *               by the configured AllocateMessageQueueStrategy.
+ *
+ * When the table changed, messageQueueChanged is invoked. If the strategy
+ * throws std::exception the error is logged and the function returns early,
+ * skipping the closing debug log.
+ *
+ * NOTE(review): m_topicSubscribeInfoTable is read here without a lock; the
+ * original code carried a commented-out lock statement at these points.
+ *
+ * @param topic  topic to rebalance
+ */
+void RebalanceImpl::rebalanceByTopic(const std::string& topic)
+{
+    typedef std::map<std::string, std::set<MessageQueue> > TopicQueueMap;
+
+    RMQ_DEBUG("rebalanceByTopic begin, topic={%s}", topic.c_str());
+    switch (m_messageModel)
+    {
+        case BROADCASTING:
+        {
+            TopicQueueMap::iterator known = m_topicSubscribeInfoTable.find(topic);
+            if (known == m_topicSubscribeInfoTable.end())
+            {
+                RMQ_WARN("doRebalance, {%s}, but the topic[%s] not exist.", m_consumerGroup.c_str(), topic.c_str());
+                break;
+            }
+
+            std::set<MessageQueue> mqSet = known->second;
+            if (updateProcessQueueTableInRebalance(topic, mqSet))
+            {
+                // In broadcast mode "all" and "divided" are the same set.
+                messageQueueChanged(topic, mqSet, mqSet);
+                RMQ_INFO("messageQueueChanged {%s} {%s} {%s} {%s}",
+                         m_consumerGroup.c_str(),
+                         topic.c_str(),
+                         UtilAll::toString(mqSet).c_str(),
+                         UtilAll::toString(mqSet).c_str());
+            }
+            break;
+        }
+        case CLUSTERING:
+        {
+            TopicQueueMap::iterator known = m_topicSubscribeInfoTable.find(topic);
+            const bool topicKnown = (known != m_topicSubscribeInfoTable.end());
+            if (!topicKnown && topic.find(MixAll::RETRY_GROUP_TOPIC_PREFIX) != 0)
+            {
+                RMQ_WARN("doRebalance, %s, but the topic[%s] not exist.", m_consumerGroup.c_str(), topic.c_str());
+            }
+
+            std::list<std::string> cidAll = m_pMQClientFactory->findConsumerIdList(topic, m_consumerGroup);
+            if (cidAll.empty())
+            {
+                RMQ_WARN("doRebalance, %s:%s, get consumer id list failed.", m_consumerGroup.c_str(), topic.c_str());
+            }
+
+            // Both inputs are required to allocate.
+            if (!topicKnown || cidAll.empty())
+            {
+                break;
+            }
+
+            std::set<MessageQueue> mqSet = known->second;
+            std::vector<MessageQueue> mqAll(mqSet.begin(), mqSet.end());
+
+            cidAll.sort();
+
+            AllocateMessageQueueStrategy* strategy = m_pAllocateMessageQueueStrategy;
+
+            std::vector<MessageQueue>* allocateResult = NULL;
+            try
+            {
+                allocateResult = strategy->allocate(m_consumerGroup,
+                                                    m_pMQClientFactory->getClientId(), mqAll, cidAll);
+            }
+            catch (std::exception& e)
+            {
+                RMQ_ERROR("AllocateMessageQueueStrategy.allocate Exception, allocateMessageQueueStrategyName={%s}, mqAll={%s}, cidAll={%s}, %s",
+                          strategy->getName().c_str(), UtilAll::toString(mqAll).c_str(), UtilAll::toString(cidAll).c_str(), e.what());
+                return;
+            }
+
+            // Copy the allocation into a set, then free the returned vector.
+            std::set<MessageQueue> allocateResultSet;
+            if (allocateResult != NULL)
+            {
+                allocateResultSet.insert(allocateResult->begin(), allocateResult->end());
+                delete allocateResult;
+            }
+
+            if (updateProcessQueueTableInRebalance(topic, allocateResultSet))
+            {
+                RMQ_INFO("rebalanced result changed. allocateMessageQueueStrategyName={%s}, group={%s}, topic={%s}, ConsumerId={%s}, "
+                         "rebalanceSize={%u}, rebalanceMqSet={%s}, mqAllSize={%u}, cidAllSize={%u}, mqAll={%s}, cidAll={%s}",
+                         strategy->getName().c_str(), m_consumerGroup.c_str(), topic.c_str(), m_pMQClientFactory->getClientId().c_str(),
+                         (unsigned)allocateResultSet.size(), UtilAll::toString(allocateResultSet).c_str(),
+                         (unsigned)mqAll.size(), (unsigned)cidAll.size(), UtilAll::toString(mqAll).c_str(), UtilAll::toString(cidAll).c_str()
+                        );
+
+                messageQueueChanged(topic, mqSet, allocateResultSet);
+            }
+            break;
+        }
+        default:
+            break;
+    }
+    RMQ_DEBUG("rebalanceByTopic end");
+}
+
+
+/**
+ * Looks up mq in the process queue table and, when present, hands the entry
+ * to removeUnnecessaryMessageQueue and logs the outcome.
+ *
+ * The entry is not erased from m_processQueueTable here, and the return
+ * value of removeUnnecessaryMessageQueue is ignored. The table's read lock
+ * is held for the whole call.
+ *
+ * @param mq  queue to look up
+ */
+void RebalanceImpl::removeProcessQueue(const MessageQueue& mq)
+{
+    kpr::ScopedRLock<kpr::RWMutex> guard(m_processQueueTableLock);
+    std::map<MessageQueue, ProcessQueue*>::iterator entry = m_processQueueTable.find(mq);
+    if (entry == m_processQueueTable.end())
+    {
+        return;
+    }
+
+    // A non-const copy is needed: removeUnnecessaryMessageQueue takes MessageQueue&.
+    MessageQueue tableMq = entry->first;
+    ProcessQueue* pq = entry->second;
+    // Sampled before the removal hook runs.
+    bool wasDropped = pq->isDropped();
+
+    this->removeUnnecessaryMessageQueue(tableMq, *pq);
+    RMQ_INFO("Fix Offset, {%s}, remove unnecessary mq, {%s} Droped: {%d}",
+             m_consumerGroup.c_str(), tableMq.toString().c_str(), wasDropped);
+}
+
+
+/**
+ * Brings the process queue table for one topic in line with mqSet.
+ *
+ * Pass 1 (write lock held): for every table entry of this topic
+ *   - not in mqSet: mark dropped and, if removeUnnecessaryMessageQueue
+ *     agrees, erase the entry;
+ *   - in mqSet but pull expired: same treatment for CONSUME_PASSIVELY,
+ *     nothing for CONSUME_ACTIVELY.
+ * Pass 2: for every queue in mqSet that is missing from the table, resolve
+ *   the start offset and, when it is non-negative, create a PullRequest with
+ *   a new ProcessQueue, register it in the table and queue it for dispatch.
+ *
+ * The start offset is resolved before anything is allocated, so a queue
+ * whose offset cannot be determined (negative value) no longer leaks a
+ * PullRequest and a ProcessQueue.
+ *
+ * @param topic  topic being rebalanced
+ * @param mqSet  queues this consumer should own for the topic
+ * @return true when at least one entry was removed or added
+ */
+bool RebalanceImpl::updateProcessQueueTableInRebalance(const std::string& topic, std::set<MessageQueue>& mqSet)
+{
+    RMQ_DEBUG("updateProcessQueueTableInRebalance begin, topic={%s}", topic.c_str());
+    bool changed = false;
+
+    {
+        kpr::ScopedWLock<kpr::RWMutex> lock(m_processQueueTableLock);
+        std::map<MessageQueue, ProcessQueue*>::iterator it = m_processQueueTable.begin();
+        for (; it != m_processQueueTable.end();)
+        {
+            // Advance first so that erasing itCur leaves `it` valid.
+            std::map<MessageQueue, ProcessQueue*>::iterator itCur = it++;
+            MessageQueue mq = itCur->first;
+            ProcessQueue* pq = itCur->second;
+            if (mq.getTopic() == topic)
+            {
+                std::set<MessageQueue>::iterator itMq = mqSet.find(mq);
+                if (itMq == mqSet.end())
+                {
+                    pq->setDropped(true);
+                    if (this->removeUnnecessaryMessageQueue(mq, *pq))
+                    {
+                        changed = true;
+                        // NOTE(review): the ProcessQueue itself is not deleted
+                        // here; confirm who owns it.
+                        m_processQueueTable.erase(itCur);
+
+                        RMQ_WARN("doRebalance, {%s}, remove unnecessary mq, {%s}",
+                                 m_consumerGroup.c_str(), mq.toString().c_str());
+                    }
+                }
+                else if (pq->isPullExpired())
+                {
+                    switch (this->consumeType())
+                    {
+                        case CONSUME_ACTIVELY:
+                            break;
+                        case CONSUME_PASSIVELY:
+                            pq->setDropped(true);
+                            if (this->removeUnnecessaryMessageQueue(mq, *pq))
+                            {
+                                changed = true;
+                                m_processQueueTable.erase(itCur);
+
+                                RMQ_ERROR("[BUG]doRebalance, {%s}, remove unnecessary mq, {%s}, because pull is pause, so try to fixed it",
+                                          m_consumerGroup.c_str(), mq.toString().c_str());
+                            }
+                            break;
+                        default:
+                            break;
+                    }
+                }
+            }
+        }
+    }
+
+    std::list<PullRequest*> pullRequestList;
+    std::set<MessageQueue>::iterator its = mqSet.begin();
+    for (; its != mqSet.end(); its++)
+    {
+        MessageQueue mq = *its;
+        bool find = false;
+        {
+            kpr::ScopedRLock<kpr::RWMutex> lock(m_processQueueTableLock);
+            std::map<MessageQueue, ProcessQueue*>::iterator itm = m_processQueueTable.find(mq);
+            if (itm != m_processQueueTable.end())
+            {
+                find = true;
+            }
+        }
+
+        if (!find)
+        {
+            // Resolve the offset before allocating so the failure path has
+            // nothing to clean up.
+            long long nextOffset = computePullFromWhere(mq);
+            if (nextOffset >= 0)
+            {
+                // NOTE(review): the PullRequest and its ProcessQueue are handed
+                // to dispatchPullRequest / the table and are not freed in this
+                // function; confirm where they are released.
+                PullRequest* pullRequest = new PullRequest();
+                pullRequest->setConsumerGroup(m_consumerGroup);
+                pullRequest->setMessageQueue(mq);
+                pullRequest->setProcessQueue(new ProcessQueue());
+                pullRequest->setNextOffset(nextOffset);
+                pullRequestList.push_back(pullRequest);
+                changed = true;
+
+                {
+                    kpr::ScopedWLock<kpr::RWMutex> lock(m_processQueueTableLock);
+                    m_processQueueTable[mq] = pullRequest->getProcessQueue();
+                    RMQ_INFO("doRebalance, {%s}, add a new mq, {%s}, pullRequst: %s",
+                             m_consumerGroup.c_str(), mq.toString().c_str(), pullRequest->toString().c_str());
+                }
+            }
+            else
+            {
+                RMQ_WARN("doRebalance, {%s}, add new mq failed, {%s}",
+                         m_consumerGroup.c_str(), mq.toString().c_str());
+            }
+        }
+    }
+
+    dispatchPullRequest(pullRequestList);
+    RMQ_DEBUG("updateProcessQueueTableInRebalance end");
+
+    return changed;
+}
+
+/**
+ * Erases every process queue table entry whose topic is not in the
+ * subscription table, marking the ProcessQueue as dropped first.
+ *
+ * The subscription table is copied up front; the process queue table is
+ * modified under its write lock. The ProcessQueue objects themselves are not
+ * deleted here.
+ */
+void RebalanceImpl::truncateMessageQueueNotMyTopic()
+{
+    std::map<std::string, SubscriptionData> subscriptions = getSubscriptionInner();
+
+    kpr::ScopedWLock<kpr::RWMutex> guard(m_processQueueTableLock);
+    std::map<MessageQueue, ProcessQueue*>::iterator cursor = m_processQueueTable.begin();
+    while (cursor != m_processQueueTable.end())
+    {
+        // Step past the current entry before it is possibly erased.
+        std::map<MessageQueue, ProcessQueue*>::iterator current = cursor++;
+        MessageQueue mq = current->first;
+        if (subscriptions.find(mq.getTopic()) != subscriptions.end())
+        {
+            continue;
+        }
+
+        ProcessQueue* pq = current->second;
+        if (pq != NULL)
+        {
+            pq->setDropped(true);
+            RMQ_WARN("doRebalance, {%s}, truncateMessageQueueNotMyTopic remove unnecessary mq, {%s}",
+                     m_consumerGroup.c_str(), mq.toString().c_str());
+        }
+        m_processQueueTable.erase(current);
+    }
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/RebalanceImpl.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RebalanceImpl.h b/rocketmq-client4cpp/src/consumer/RebalanceImpl.h
new file mode 100755
index 0000000..577a031
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/RebalanceImpl.h
@@ -0,0 +1,102 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __REBALANCEIMPL_H__
+#define __REBALANCEIMPL_H__
+
+#include <map>
+#include <string>
+#include <set>
+#include <list>
+
+#include "ConsumeType.h"
+#include "MessageQueue.h"
+#include "ProcessQueue.h"
+#include "PullRequest.h"
+#include "SubscriptionData.h"
+
+namespace rmq
+{
+    class AllocateMessageQueueStrategy;
+    class MQClientFactory;
+
+    class RebalanceImpl // Abstract rebalancing base: owns the process-queue table; subclasses supply the pure virtual hooks below.
+    {
+    public:
+        RebalanceImpl(const std::string& consumerGroup,
+                      MessageModel messageModel,
+                      AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy,
+                      MQClientFactory* pMQClientFactory);
+        virtual ~RebalanceImpl();
+
+        virtual void messageQueueChanged(const std::string& topic, // hook: presumably invoked when a topic's queue allocation changes — confirm in rebalanceByTopic()
+                                         std::set<MessageQueue>& mqAll,
+                                         std::set<MessageQueue>& mqDivided) = 0;
+        virtual bool removeUnnecessaryMessageQueue(MessageQueue& mq, ProcessQueue& pq) = 0; // hook: NOTE(review): presumably true means the entry may be dropped from the table — confirm in caller
+        virtual void dispatchPullRequest(std::list<PullRequest*>& pullRequestList) = 0; // hook: receives the list passed by updateProcessQueueTableInRebalance()
+        virtual long long computePullFromWhere(MessageQueue& mq) = 0; // hook: returns an offset for mq (presumably where pulling starts; negative values look like "unknown" — confirm)
+		virtual ConsumeType consumeType() = 0; // hook: reports the ConsumeType of the concrete consumer
+
+        bool lock(MessageQueue& mq);
+        void lockAll();
+
+        void unlock(MessageQueue& mq, bool oneway);
+        void unlockAll(bool oneway);
+
+        void doRebalance();
+
+        std::map<MessageQueue, ProcessQueue*>& getProcessQueueTable(); // returns a reference to the internal table, not a copy
+		kpr::RWMutex& getProcessQueueTableLock(); // lock callers are presumably expected to hold while touching the table above — confirm
+		std::map<std::string, SubscriptionData>& getSubscriptionInner(); // returns a reference; truncateMessageQueueNotMyTopic() copies it before use
+        std::map<std::string, std::set<MessageQueue> >& getTopicSubscribeInfoTable();
+
+        std::string& getConsumerGroup();
+        void setConsumerGroup(const std::string& consumerGroup);
+
+        MessageModel getMessageModel();
+        void setMessageModel(MessageModel messageModel);
+
+        AllocateMessageQueueStrategy* getAllocateMessageQueueStrategy();
+        void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy);
+
+        MQClientFactory* getmQClientFactory();
+        void setmQClientFactory(MQClientFactory* pMQClientFactory);
+
+		void removeProcessQueue(const MessageQueue& mq);
+
+    private:
+        std::map<std::string, std::set<MessageQueue> > buildProcessQueueTableByBrokerName();
+        void rebalanceByTopic(const std::string& topic);
+        bool updateProcessQueueTableInRebalance(const std::string& topic, std::set<MessageQueue>& mqSet); // returns the "changed" flag computed during the update
+        void truncateMessageQueueNotMyTopic(); // drops table entries whose topic is no longer subscribed
+
+    protected:
+        std::map<MessageQueue, ProcessQueue*> m_processQueueTable; // MessageQueue -> ProcessQueue* (raw pointers; ownership not visible in this header)
+        kpr::RWMutex m_processQueueTableLock; // taken as a write lock while truncateMessageQueueNotMyTopic() edits the table
+
+        std::map<std::string, std::set<MessageQueue> > m_topicSubscribeInfoTable; // topic -> set of MessageQueue
+        kpr::Mutex m_topicSubscribeInfoTableLock; // presumably guards m_topicSubscribeInfoTable — TODO confirm
+
+        std::map<std::string, SubscriptionData> m_subscriptionInner; // topic -> SubscriptionData
+        kpr::Mutex m_subscriptionInnerLock; // presumably guards m_subscriptionInner — TODO confirm
+
+        std::string m_consumerGroup;
+        MessageModel m_messageModel;
+        AllocateMessageQueueStrategy* m_pAllocateMessageQueueStrategy; // raw pointer; may be NULL (subclasses pass NULL in their one-argument constructors)
+        MQClientFactory* m_pMQClientFactory; // raw pointer; may be NULL until setmQClientFactory() is called
+    };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/RebalancePullImpl.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RebalancePullImpl.cpp b/rocketmq-client4cpp/src/consumer/RebalancePullImpl.cpp
new file mode 100755
index 0000000..1aa287b
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/RebalancePullImpl.cpp
@@ -0,0 +1,79 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "RebalancePullImpl.h"
+#include "DefaultMQPullConsumerImpl.h"
+#include "AllocateMessageQueueStrategy.h"
+#include "MQClientFactory.h"
+#include "MessageQueueListener.h"
+#include "OffsetStore.h"
+#include "DefaultMQPullConsumer.h"
+
+namespace rmq
+{
+
+// Minimal constructor: the base starts with an empty group name, the
+// BROADCASTING model and no strategy or client factory; only the owning pull
+// consumer implementation is remembered.
+RebalancePullImpl::RebalancePullImpl(DefaultMQPullConsumerImpl* pDefaultMQPullConsumerImpl) :
+    RebalanceImpl("", BROADCASTING, NULL, NULL),
+    m_pDefaultMQPullConsumerImpl(pDefaultMQPullConsumerImpl)
+{
+}
+
+// Full constructor: forwards the group, model, strategy and factory to the
+// base class unchanged and remembers the owning pull consumer implementation.
+RebalancePullImpl::RebalancePullImpl(
+    const std::string& consumerGroup,
+    MessageModel messageModel,
+    AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy,
+    MQClientFactory* pMQClientFactory,
+    DefaultMQPullConsumerImpl* pDefaultMQPullConsumerImpl) :
+    RebalanceImpl(consumerGroup, messageModel, pAllocateMessageQueueStrategy, pMQClientFactory),
+    m_pDefaultMQPullConsumerImpl(pDefaultMQPullConsumerImpl)
+{
+}
+
+// The pull implementation does not look at the queue: it always reports 0.
+long long RebalancePullImpl::computePullFromWhere(MessageQueue& mq)
+{
+    const long long fixedOffset = 0;
+    return fixedOffset;
+}
+
+// Intentionally a no-op for the pull implementation: the list is ignored.
+// NOTE(review): nothing here takes ownership of the PullRequest pointers;
+// whether they leak depends on how the caller allocates them — confirm
+// against the "todo memleak" marker at the call site.
+void RebalancePullImpl::dispatchPullRequest(std::list<PullRequest*>& pullRequestList)
+{
+}
+
+/**
+* Forwards a queue-allocation change to the MessageQueueListener registered on
+* the pull consumer, if there is one. Anything the listener throws is logged
+* and swallowed.
+*/
+void RebalancePullImpl::messageQueueChanged(const std::string& topic,
+        std::set<MessageQueue>& mqAll,
+        std::set<MessageQueue>& mqDivided)
+{
+    MessageQueueListener* listener =
+        m_pDefaultMQPullConsumerImpl->getDefaultMQPullConsumer()->getMessageQueueListener();
+    if (listener == NULL)
+    {
+        // No listener registered: nothing to notify.
+        return;
+    }
+
+    try
+    {
+        listener->messageQueueChanged(topic, mqAll, mqDivided);
+    }
+    catch (...)
+    {
+        RMQ_ERROR("messageQueueChanged exception, %s", topic.c_str());
+    }
+}
+
+// Persists the offset held for mq, then removes it from the offset store.
+// The process queue argument is not used and the call always reports true.
+bool RebalancePullImpl::removeUnnecessaryMessageQueue(MessageQueue& mq, ProcessQueue& pq)
+{
+    // Order matters: persist the offset before it is removed.
+    m_pDefaultMQPullConsumerImpl->getOffsetStore()->persist(mq);
+    m_pDefaultMQPullConsumerImpl->getOffsetStore()->removeOffset(mq);
+
+    const bool removable = true;
+    return removable;
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/RebalancePullImpl.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RebalancePullImpl.h b/rocketmq-client4cpp/src/consumer/RebalancePullImpl.h
new file mode 100755
index 0000000..46dbcd1
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/RebalancePullImpl.h
@@ -0,0 +1,56 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __REBALANCEPULLIMPL_H__
+#define __REBALANCEPULLIMPL_H__
+
+#include "RebalanceImpl.h"
+
+namespace rmq
+{
+class DefaultMQPullConsumerImpl;
+
+class RebalancePullImpl : public RebalanceImpl // RebalanceImpl specialisation used by DefaultMQPullConsumerImpl
+{
+public:
+    RebalancePullImpl(DefaultMQPullConsumerImpl *pDefaultMQPullConsumerImpl); // base gets "", BROADCASTING, NULL, NULL
+
+    RebalancePullImpl(const std::string &consumerGroup, // all but the last argument are forwarded to RebalanceImpl
+                      MessageModel messageModel,
+                      AllocateMessageQueueStrategy *pAllocateMessageQueueStrategy,
+                      MQClientFactory *pMQClientFactory,
+                      DefaultMQPullConsumerImpl *pDefaultMQPullConsumerImpl);
+
+    long long computePullFromWhere(MessageQueue &mq); // implementation always returns 0
+
+    void dispatchPullRequest(std::list<PullRequest *> &pullRequestList); // implementation is an empty body
+
+    void messageQueueChanged(const std::string &topic, // forwards to the consumer's MessageQueueListener when one is set
+                             std::set<MessageQueue> &mqAll,
+                             std::set<MessageQueue> &mqDivided);
+
+    bool removeUnnecessaryMessageQueue(MessageQueue &mq, ProcessQueue &pq); // persists then removes the offset of mq; always returns true
+
+    ConsumeType consumeType() // pull consumers report CONSUME_ACTIVELY
+    {
+        return CONSUME_ACTIVELY;
+    };
+
+private:
+    DefaultMQPullConsumerImpl *m_pDefaultMQPullConsumerImpl; // raw pointer to the owning consumer impl; never deleted by this class's visible code
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/RebalancePushImpl.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RebalancePushImpl.cpp b/rocketmq-client4cpp/src/consumer/RebalancePushImpl.cpp
new file mode 100755
index 0000000..fde770d
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/RebalancePushImpl.cpp
@@ -0,0 +1,217 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "RebalancePushImpl.h"
+
+#include <string.h>
+#include <limits.h>
+
+#include "DefaultMQPushConsumerImpl.h"
+#include "AllocateMessageQueueStrategy.h"
+#include "MQClientFactory.h"
+#include "MessageQueueListener.h"
+#include "OffsetStore.h"
+#include "DefaultMQPushConsumer.h"
+#include "MQAdminImpl.h"
+
+
+namespace rmq
+{
+
+// Minimal constructor: the base starts with an empty group name, the
+// BROADCASTING model and no strategy or client factory; only the owning push
+// consumer implementation is remembered.
+RebalancePushImpl::RebalancePushImpl(DefaultMQPushConsumerImpl* pDefaultMQPushConsumerImpl) :
+    RebalanceImpl("", BROADCASTING, NULL, NULL),
+    m_pDefaultMQPushConsumerImpl(pDefaultMQPushConsumerImpl)
+{
+}
+
+// Full constructor: forwards the group, model, strategy and factory to the
+// base class unchanged and remembers the owning push consumer implementation.
+RebalancePushImpl::RebalancePushImpl(
+    const std::string& consumerGroup,
+    MessageModel messageModel,
+    AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy,
+    MQClientFactory* pMQClientFactory,
+    DefaultMQPushConsumerImpl* pDefaultMQPushConsumerImpl) :
+    RebalanceImpl(consumerGroup, messageModel, pAllocateMessageQueueStrategy, pMQClientFactory),
+    m_pDefaultMQPushConsumerImpl(pDefaultMQPushConsumerImpl)
+{
+}
+
+/**
+* Hands every newly created pull request to the push consumer so pulling
+* starts immediately.
+*
+* @param pullRequestList requests to dispatch; NULL entries are skipped.
+*/
+void RebalancePushImpl::dispatchPullRequest(std::list<PullRequest*>& pullRequestList)
+{
+    std::list<PullRequest*>::iterator it = pullRequestList.begin();
+    for (; it != pullRequestList.end(); ++it)
+    {
+        PullRequest* pullRequest = *it;
+        if (pullRequest == NULL)
+        {
+            continue;
+        }
+
+        // Log BEFORE the hand-off: once the request has been passed on, the
+        // consumer's pull machinery may already be using it (or have released
+        // it) on another thread, so it must not be dereferenced afterwards.
+        RMQ_INFO("doRebalance, {%s}, add a new pull request {%s}",
+        	m_consumerGroup.c_str(), pullRequest->toString().c_str());
+        m_pDefaultMQPushConsumerImpl->executePullRequestImmediately(pullRequest);
+    }
+}
+
+/**
+* Works out the offset from which a newly assigned queue should be pulled,
+* according to the consumer's ConsumeFromWhere setting.
+*
+* CONSUME_FROM_MAX_OFFSET yields LLONG_MAX and CONSUME_FROM_MIN_OFFSET yields 0
+* without consulting the offset store. The FIRST/LAST/TIMESTAMP policies read
+* the stored offset (READ_FROM_STORE) first:
+*   - stored offset >= 0        : that offset is used;
+*   - stored offset == -1       : the policy decides (see below);
+*   - any other negative value  : -1 is returned.
+* Any other setting, and any exception while querying the broker, yields -1.
+*/
+long long RebalancePushImpl::computePullFromWhere(MessageQueue& mq)
+{
+    ConsumeFromWhere policy =
+        m_pDefaultMQPushConsumerImpl->getDefaultMQPushConsumer()->getConsumeFromWhere();
+    OffsetStore* store = m_pDefaultMQPushConsumerImpl->getOffsetStore();
+
+    // Policies that never look at the stored offset.
+    if (policy == CONSUME_FROM_MAX_OFFSET)
+    {
+        return LLONG_MAX;
+    }
+    if (policy == CONSUME_FROM_MIN_OFFSET)
+    {
+        return 0L;
+    }
+    if (policy != CONSUME_FROM_FIRST_OFFSET
+        && policy != CONSUME_FROM_LAST_OFFSET
+        && policy != CONSUME_FROM_TIMESTAMP)
+    {
+        return -1;
+    }
+
+    // The remaining three policies all start from the stored offset.
+    long long storedOffset = store->readOffset(mq, READ_FROM_STORE);
+    if (storedOffset >= 0)
+    {
+        return storedOffset;
+    }
+    if (storedOffset != -1)
+    {
+        return -1;
+    }
+
+    // From here on the store reported -1 for this queue.
+    if (policy == CONSUME_FROM_FIRST_OFFSET)
+    {
+        return 0L;
+    }
+
+    bool isRetryTopic =
+        strncmp(MixAll::RETRY_GROUP_TOPIC_PREFIX.c_str(), mq.getTopic().c_str(), MixAll::RETRY_GROUP_TOPIC_PREFIX.size()) == 0;
+    if (policy == CONSUME_FROM_LAST_OFFSET && isRetryTopic)
+    {
+        return 0L;
+    }
+
+    long long result = -1;
+    try
+    {
+        if (policy == CONSUME_FROM_LAST_OFFSET || isRetryTopic)
+        {
+            // LAST_OFFSET on a normal topic, or TIMESTAMP on a retry topic:
+            // use the queue's current maximum offset.
+            result = m_pMQClientFactory->getMQAdminImpl()->maxOffset(mq);
+        }
+        else
+        {
+            // TIMESTAMP on a normal topic: look up the offset for the
+            // configured consume timestamp.
+            long timestamp = UtilAll::str2tm(
+                    m_pDefaultMQPushConsumerImpl->getDefaultMQPushConsumer()->getConsumeTimestamp(),
+                    rmq::yyyyMMddHHmmss);
+            result = m_pMQClientFactory->getMQAdminImpl()->searchOffset(mq, timestamp);
+        }
+    }
+    catch(...)
+    {
+        result = -1;
+    }
+
+    return result;
+}
+
+// The push implementation does nothing when the queue allocation changes;
+// all three arguments are ignored.
+void RebalancePushImpl::messageQueueChanged(
+    const std::string& topic,
+    std::set<MessageQueue>& mqAll,
+    std::set<MessageQueue>& mqDivided)
+{
+}
+
+
+/**
+* Called when a queue is no longer assigned to this consumer. Persists and
+* removes its stored offset; for orderly consumption in CLUSTERING mode it
+* also calls unlock(mq, true) while holding the queue's consume lock.
+*
+* @param mq queue being removed.
+* @param pq process queue of mq; its consume lock is tried for up to 1000
+*           (TryLock argument, presumably milliseconds).
+* @return true when the queue was fully released; false when the consume lock
+*         could not be obtained or the unlock call threw.
+*/
+bool RebalancePushImpl::removeUnnecessaryMessageQueue(MessageQueue& mq, ProcessQueue& pq)
+{
+    m_pDefaultMQPushConsumerImpl->getOffsetStore()->persist(mq);
+    m_pDefaultMQPushConsumerImpl->getOffsetStore()->removeOffset(mq);
+
+    if (!(m_pDefaultMQPushConsumerImpl->isConsumeOrderly()
+          && m_pDefaultMQPushConsumerImpl->messageModel() == CLUSTERING))
+    {
+        // Nothing else to release outside orderly + CLUSTERING consumption.
+        return true;
+    }
+
+    if (!pq.getLockConsume().TryLock(1000))
+    {
+        RMQ_WARN("[WRONG]mq is consuming, so can not unlock it, MQ:%s, maybe hanged for a while, times:{%lld}",
+            mq.toString().c_str(),
+            pq.getTryUnlockTimes());
+
+        pq.incTryUnlockTimes();
+        return false;
+    }
+
+    // The consume lock is held from here; every path below must release it.
+    bool unlocked = false;
+    try
+    {
+        this->unlock(mq, true);
+        // Previously this path also fell through to "return false", so a
+        // successfully unlocked queue was never reported as removable.
+        unlocked = true;
+    }
+    catch (std::exception& e)
+    {
+        RMQ_ERROR("removeUnnecessaryMessageQueue Exception: %s", e.what());
+    }
+    catch (...)
+    {
+        // Catch-all so a non-std exception cannot skip the Unlock() below.
+        RMQ_ERROR("removeUnnecessaryMessageQueue unknown exception, MQ:%s", mq.toString().c_str());
+    }
+    pq.getLockConsume().Unlock();
+
+    return unlocked;
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/RebalancePushImpl.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RebalancePushImpl.h b/rocketmq-client4cpp/src/consumer/RebalancePushImpl.h
new file mode 100755
index 0000000..0aa2b0e
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/RebalancePushImpl.h
@@ -0,0 +1,55 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __REBALANCEPUSHIMPL_H__
+#define __REBALANCEPUSHIMPL_H__
+
+#include "RebalanceImpl.h"
+
+namespace rmq
+{
+class DefaultMQPushConsumerImpl;
+
+class RebalancePushImpl : public RebalanceImpl // RebalanceImpl specialisation used by DefaultMQPushConsumerImpl
+{
+public:
+    RebalancePushImpl(DefaultMQPushConsumerImpl *pDefaultMQPushConsumerImpl); // base gets "", BROADCASTING, NULL, NULL
+
+    RebalancePushImpl(const std::string &consumerGroup, // all but the last argument are forwarded to RebalanceImpl
+                      MessageModel messageModel,
+                      AllocateMessageQueueStrategy *pAllocateMessageQueueStrategy,
+                      MQClientFactory *pMQClientFactory,
+                      DefaultMQPushConsumerImpl *pDefaultMQPushConsumerImpl);
+
+    void dispatchPullRequest(std::list<PullRequest *> &pullRequestList); // passes each request to executePullRequestImmediately()
+    long long computePullFromWhere(MessageQueue &mq); // offset chosen from the consumer's ConsumeFromWhere setting; -1 on failure
+    void messageQueueChanged(const std::string &topic, // implementation is an empty body
+                             std::set<MessageQueue> &mqAll,
+                             std::set<MessageQueue> &mqDivided);
+    bool removeUnnecessaryMessageQueue(MessageQueue &mq, ProcessQueue &pq); // persists and removes the offset of mq; orderly CLUSTERING consumers also call unlock(mq, true)
+
+
+    ConsumeType consumeType() // push consumers report CONSUME_PASSIVELY
+    {
+        return CONSUME_PASSIVELY;
+    };
+
+private:
+    DefaultMQPushConsumerImpl *m_pDefaultMQPushConsumerImpl; // raw pointer to the owning consumer impl; never deleted by this class's visible code
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/RebalanceService.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RebalanceService.cpp b/rocketmq-client4cpp/src/consumer/RebalanceService.cpp
new file mode 100644
index 0000000..013fefb
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/RebalanceService.cpp
@@ -0,0 +1,55 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "RebalanceService.h"
+#include "MQClientFactory.h"
+
+namespace rmq
+{
+
+long RebalanceService::s_WaitInterval = 1000 * 10; // passed to waitForRunning() on every Run() iteration; presumably milliseconds (10 s) — TODO confirm
+
+// Names the underlying ServiceThread "RebalanceService" and remembers the
+// client factory whose doRebalance() the thread will call.
+RebalanceService::RebalanceService(MQClientFactory* pMQClientFactory) :
+    ServiceThread("RebalanceService"),
+    m_pMQClientFactory(pMQClientFactory)
+{
+}
+
+
+// Nothing to release here: the client factory pointer is not deleted.
+RebalanceService::~RebalanceService()
+{
+}
+
+/**
+* Thread body: until the service is flagged as stopped, waits for
+* s_WaitInterval (waitForRunning() is presumably interruptible by a wake-up —
+* confirm in ServiceThread) and then asks the client factory to rebalance.
+*
+* A failing rebalance pass is logged and the loop continues, so one bad pass
+* cannot terminate the service thread.
+*/
+void RebalanceService::Run()
+{
+	RMQ_INFO("%s service started", getServiceName().c_str());
+
+    while (!m_stoped)
+    {
+        waitForRunning(s_WaitInterval);
+        try
+        {
+            m_pMQClientFactory->doRebalance();
+        }
+        catch (...)
+        {
+            // Keep the thread alive; the next interval retries the rebalance.
+            RMQ_ERROR("%s doRebalance exception", getServiceName().c_str());
+        }
+    }
+
+    RMQ_INFO("%s service end", getServiceName().c_str());
+}
+
+// Fixed service name, used in the start/end log lines of Run().
+std::string RebalanceService::getServiceName()
+{
+    std::string serviceName("RebalanceService");
+    return serviceName;
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/RebalanceService.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RebalanceService.h b/rocketmq-client4cpp/src/consumer/RebalanceService.h
new file mode 100755
index 0000000..ef4d746
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/RebalanceService.h
@@ -0,0 +1,44 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __REBALANCESERVICE_H__
+#define __REBALANCESERVICE_H__
+
+#include "ServiceThread.h"
+
+namespace rmq
+{
+	class MQClientFactory;
+
+	/**
+	* Rebalance service
+	*
+	*/
+	class RebalanceService : public ServiceThread // service thread that periodically calls MQClientFactory::doRebalance()
+	{
+	public:
+	    RebalanceService(MQClientFactory* pMQClientFactory); // stores the pointer; the factory is not deleted by this class's visible code
+	    ~RebalanceService();
+
+	    void Run(); // loop: waitForRunning(s_WaitInterval) then doRebalance(), until m_stoped is set
+	    std::string getServiceName(); // returns "RebalanceService"
+
+	private:
+	    MQClientFactory* m_pMQClientFactory;
+	    static long s_WaitInterval; // defined as 1000 * 10 in RebalanceService.cpp; presumably milliseconds — TODO confirm
+	};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.cpp b/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.cpp
new file mode 100755
index 0000000..1c4fd23
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.cpp
@@ -0,0 +1,266 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "RemoteBrokerOffsetStore.h"
+#include "MQClientFactory.h"
+#include "ScopedLock.h"
+#include "MQClientException.h"
+#include "CommandCustomHeader.h"
+#include "MQClientAPIImpl.h"
+
+namespace rmq
+{
+
+// Remembers the client factory used to reach the broker and the consumer
+// group whose offsets this store manages. The offset table starts empty.
+RemoteBrokerOffsetStore::RemoteBrokerOffsetStore(MQClientFactory* pMQClientFactory, const std::string& groupName)
+    : m_pMQClientFactory(pMQClientFactory),
+      m_groupName(groupName)
+{
+}
+
+// No-op in this store: the body is empty, so nothing is loaded at start-up.
+void RemoteBrokerOffsetStore::load()
+{
+}
+
+/**
+* Records the in-memory offset of a queue, creating the entry on first use.
+*
+* @param mq           queue whose offset is updated.
+* @param offset       new offset value.
+* @param increaseOnly when true the update goes through
+*                     MixAll::compareAndIncreaseOnly(); otherwise the stored
+*                     value is overwritten unconditionally.
+*/
+void RemoteBrokerOffsetStore::updateOffset(const MessageQueue& mq, long long offset, bool increaseOnly)
+{
+    typedef std::map<MessageQueue, kpr::AtomicLong>::iterator OffsetIter;
+
+    kpr::ScopedWLock<kpr::RWMutex> lock(m_tableMutex);
+    OffsetIter entry = m_offsetTable.find(mq);
+    if (entry == m_offsetTable.end())
+    {
+        // First sighting of this queue: seed the table with the new offset.
+        m_offsetTable[mq] = offset;
+        entry = m_offsetTable.find(mq);
+    }
+
+    if (!increaseOnly)
+    {
+        entry->second.set(offset);
+        return;
+    }
+
+    MixAll::compareAndIncreaseOnly(entry->second, offset);
+}
+
+long long RemoteBrokerOffsetStore::readOffset(const MessageQueue& mq, ReadOffsetType type) // returns the offset, -1 when none is known, -2 when the broker query failed unexpectedly
+{
+    RMQ_DEBUG("readOffset, MQ:%s, type:%d", mq.toString().c_str(), type);
+    switch (type)
+    {
+        case MEMORY_FIRST_THEN_STORE:
+        case READ_FROM_MEMORY:
+        {
+            kpr::ScopedRLock<kpr::RWMutex> lock(m_tableMutex); // read lock lives only for this brace block, so it is gone before the broker query below
+            typeof(m_offsetTable.begin()) it = m_offsetTable.find(mq);
+            if (it != m_offsetTable.end())
+            {
+                return it->second.get();
+            }
+            else if (READ_FROM_MEMORY == type)
+            {
+                RMQ_DEBUG("No offset in memory, MQ:%s", mq.toString().c_str());
+                return -1;
+            }
+        } // no break: a MEMORY_FIRST_THEN_STORE miss intentionally falls through to READ_FROM_STORE
+        case READ_FROM_STORE:
+        {
+            try
+            {
+                long long brokerOffset = this->fetchConsumeOffsetFromBroker(mq);
+                RMQ_DEBUG("fetchConsumeOffsetFromBroker, MQ:%s, brokerOffset:%lld",
+                          mq.toString().c_str(), brokerOffset);
+                if (brokerOffset >= 0)
+                {
+                    this->updateOffset(mq, brokerOffset, false); // cache the broker value in memory, overwriting unconditionally
+                }
+                return brokerOffset;
+            }
+            // No offset in broker
+            catch (const MQBrokerException& e)
+            {
+                RMQ_WARN("No offset in broker, MQ:%s, exception:%s", mq.toString().c_str(), e.what());
+                return -1;
+            }
+            catch (const std::exception& e)
+            {
+                RMQ_ERROR("fetchConsumeOffsetFromBroker exception, MQ:%s, msg:%s",
+                          mq.toString().c_str(), e.what());
+                return -2; // distinct from -1 so callers can tell "query failed" from "no offset"
+            }
+            catch (...)
+            {
+                RMQ_ERROR("fetchConsumeOffsetFromBroker unknow exception, MQ:%s",
+                          mq.toString().c_str());
+                return -2;
+            }
+        }
+        default:
+            break;
+    }
+
+    return -1; // reached only for a ReadOffsetType value not handled above
+}
+
+/**
+* Sends the in-memory offset of every queue in mqs to its broker and drops
+* table entries for queues that are not in mqs.
+*
+* Failures to update a single queue are logged and do not stop the pass.
+*
+* @param mqs queues currently assigned to this consumer; an empty set makes
+*            the call a no-op.
+*/
+void RemoteBrokerOffsetStore::persistAll(std::set<MessageQueue>& mqs)
+{
+    if (mqs.empty())
+    {
+        return;
+    }
+
+    std::set<MessageQueue> unusedMQ;
+    long long times = m_storeTimesTotal.fetchAndAdd(1);
+
+    {
+        // Phase 1: read-only walk of the table under the read lock.
+        kpr::ScopedRLock<kpr::RWMutex> lock(m_tableMutex);
+        for (typeof(m_offsetTable.begin()) it = m_offsetTable.begin();
+             it != m_offsetTable.end(); it++)
+        {
+            MessageQueue mq = it->first;
+            kpr::AtomicLong& offset = it->second;
+            if (mqs.find(mq) != mqs.end())
+            {
+                try
+                {
+                    this->updateConsumeOffsetToBroker(mq, offset.get());
+                    // Only log every 12th pass to keep the log quiet.
+                    if ((times % 12) == 0)
+                    {
+                        RMQ_INFO("updateConsumeOffsetToBroker, Group: {%s} ClientId: {%s}  mq:{%s} offset {%lld}",
+                                 m_groupName.c_str(),
+                                 m_pMQClientFactory->getClientId().c_str(),
+                                 mq.toString().c_str(),
+                                 offset.get());
+                    }
+                }
+                catch (...)
+                {
+                    RMQ_ERROR("updateConsumeOffsetToBroker exception, mq=%s", mq.toString().c_str());
+                }
+            }
+            else
+            {
+                unusedMQ.insert(mq);
+            }
+        }
+    }
+
+    if (!unusedMQ.empty())
+    {
+        // Phase 2: erasing mutates the table, so it needs the write lock.
+        // Doing this under the read lock (as before) raced with every other
+        // reader of m_offsetTable.
+        kpr::ScopedWLock<kpr::RWMutex> lock(m_tableMutex);
+        for (typeof(unusedMQ.begin()) it = unusedMQ.begin(); it != unusedMQ.end(); it++)
+        {
+            m_offsetTable.erase(*it);
+            RMQ_INFO("remove unused mq, %s, %s", it->toString().c_str(), m_groupName.c_str());
+        }
+    }
+}
+
+/**
+* Sends the in-memory offset of a single queue to its broker. Does nothing
+* when the queue has no entry; a failed update is logged and swallowed.
+*/
+void RemoteBrokerOffsetStore::persist(const MessageQueue& mq)
+{
+    kpr::ScopedRLock<kpr::RWMutex> lock(m_tableMutex);
+    std::map<MessageQueue, kpr::AtomicLong>::iterator found = m_offsetTable.find(mq);
+    if (found == m_offsetTable.end())
+    {
+        // No offset recorded for this queue.
+        return;
+    }
+
+    try
+    {
+        this->updateConsumeOffsetToBroker(mq, found->second.get());
+        RMQ_DEBUG("updateConsumeOffsetToBroker ok, mq=%s, offset=%lld", mq.toString().c_str(), found->second.get());
+    }
+    catch (...)
+    {
+        RMQ_ERROR("updateConsumeOffsetToBroker exception, mq=%s", mq.toString().c_str());
+    }
+}
+
+// Forgets the in-memory offset of mq (a no-op for the table if it has no
+// entry) and logs the resulting table size.
+void RemoteBrokerOffsetStore::removeOffset(const MessageQueue& mq)
+{
+    kpr::ScopedWLock<kpr::RWMutex> lock(m_tableMutex);
+    m_offsetTable.erase(mq);
+
+    unsigned remaining = (unsigned)m_offsetTable.size();
+    RMQ_INFO("remove unnecessary messageQueue offset. mq=%s, offsetTableSize=%u",
+             mq.toString().c_str(), remaining);
+}
+
+
+/**
+* Returns a plain copy (queue -> offset) of the in-memory entries that belong
+* to the given topic, taken under the read lock.
+*/
+std::map<MessageQueue, long long> RemoteBrokerOffsetStore::cloneOffsetTable(const std::string& topic)
+{
+    typedef std::map<MessageQueue, kpr::AtomicLong>::iterator OffsetIter;
+
+    kpr::ScopedRLock<kpr::RWMutex> lock(m_tableMutex);
+    std::map<MessageQueue, long long> snapshot;
+    for (OffsetIter entry = m_offsetTable.begin(); entry != m_offsetTable.end(); ++entry)
+    {
+        const MessageQueue& mq = entry->first;
+        if (topic != mq.getTopic())
+        {
+            // Entry belongs to another topic.
+            continue;
+        }
+
+        snapshot[mq] = entry->second.get();
+    }
+
+    return snapshot;
+}
+
+
+/**
+* Sends a one-way "update consumer offset" request for mq to its broker.
+*
+* The broker address is looked up by broker name; on a miss the topic route is
+* refreshed from the name server once and the lookup is retried.
+*
+* @throws MQClientException when the broker address is still unknown.
+*/
+void RemoteBrokerOffsetStore::updateConsumeOffsetToBroker(const MessageQueue& mq, long long offset)
+{
+    FindBrokerResult findBrokerResult = m_pMQClientFactory->findBrokerAddressInAdmin(mq.getBrokerName());
+    if (findBrokerResult.brokerAddr.empty())
+    {
+        // Unknown broker: refresh the route and try once more.
+        m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic());
+        findBrokerResult = m_pMQClientFactory->findBrokerAddressInAdmin(mq.getBrokerName());
+    }
+
+    if (findBrokerResult.brokerAddr.empty())
+    {
+        THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1);
+    }
+
+    // NOTE(review): the header is allocated here and never deleted in this
+    // function; presumably the API call takes ownership — confirm.
+    UpdateConsumerOffsetRequestHeader* requestHeader = new UpdateConsumerOffsetRequestHeader();
+    requestHeader->topic = mq.getTopic();
+    requestHeader->consumerGroup = this->m_groupName;
+    requestHeader->queueId = mq.getQueueId();
+    requestHeader->commitOffset = offset;
+
+    m_pMQClientFactory->getMQClientAPIImpl()->updateConsumerOffsetOneway(
+        findBrokerResult.brokerAddr, requestHeader, 1000 * 5);
+}
+
+/**
+* Asks the broker that hosts mq for this group's consume offset.
+*
+* The broker address is looked up by broker name; on a miss the topic route is
+* refreshed from the name server once and the lookup is retried.
+*
+* @return the value returned by queryConsumerOffset().
+* @throws MQClientException when the broker address is still unknown.
+*/
+long long RemoteBrokerOffsetStore::fetchConsumeOffsetFromBroker(const MessageQueue& mq)
+{
+    FindBrokerResult findBrokerResult = m_pMQClientFactory->findBrokerAddressInAdmin(mq.getBrokerName());
+    if (findBrokerResult.brokerAddr.empty())
+    {
+        // TODO: this refresh may put heavy load on the name server and needs tuning.
+        m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic());
+        findBrokerResult = m_pMQClientFactory->findBrokerAddressInAdmin(mq.getBrokerName());
+    }
+
+    if (findBrokerResult.brokerAddr.empty())
+    {
+        THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1);
+    }
+
+    // NOTE(review): the header is allocated here and never deleted in this
+    // function; presumably the API call takes ownership — confirm.
+    QueryConsumerOffsetRequestHeader* requestHeader = new QueryConsumerOffsetRequestHeader();
+    requestHeader->topic = mq.getTopic();
+    requestHeader->consumerGroup = this->m_groupName;
+    requestHeader->queueId = mq.getQueueId();
+
+    return m_pMQClientFactory->getMQClientAPIImpl()->queryConsumerOffset(
+               findBrokerResult.brokerAddr, requestHeader, 1000 * 5);
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.h b/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.h
new file mode 100755
index 0000000..b613084
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.h
@@ -0,0 +1,61 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __REMOTEBROKEROFFSETSTORE_H__
+#define __REMOTEBROKEROFFSETSTORE_H__
+
+#include "OffsetStore.h"
+#include <map>
+#include <string>
+#include <set>
+#include "MessageQueue.h"
+#include "AtomicValue.h"
+#include "Mutex.h"
+
+namespace rmq
+{
+    class MQClientFactory;
+
+    /**
+     * OffsetStore implementation whose offsets are exchanged with the broker
+     * (see updateConsumeOffsetToBroker / fetchConsumeOffsetFromBroker below).
+     */
+    class RemoteBrokerOffsetStore : public OffsetStore
+    {
+    public:
+        RemoteBrokerOffsetStore(MQClientFactory* pMQClientFactory, const std::string& groupName);
+
+        void load();
+        void updateOffset(const MessageQueue& mq, long long offset, bool increaseOnly);
+        long long readOffset(const MessageQueue& mq, ReadOffsetType type);
+        void persistAll(std::set<MessageQueue>& mqs);
+        void persist(const MessageQueue& mq);
+        void removeOffset(const MessageQueue& mq);
+        std::map<MessageQueue, long long> cloneOffsetTable(const std::string& topic);
+
+    private:
+        // Helpers that talk to the broker.
+        void updateConsumeOffsetToBroker(const MessageQueue& mq, long long offset);
+        long long fetchConsumeOffsetFromBroker(const MessageQueue& mq);
+
+        MQClientFactory* m_pMQClientFactory;
+        std::string m_groupName;
+        kpr::AtomicInteger m_storeTimesTotal;
+        std::map<MessageQueue, kpr::AtomicLong> m_offsetTable;
+        kpr::RWMutex m_tableMutex;
+    };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/SubscriptionData.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/SubscriptionData.cpp b/rocketmq-client4cpp/src/consumer/SubscriptionData.cpp
new file mode 100755
index 0000000..ed5cf12
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/SubscriptionData.cpp
@@ -0,0 +1,201 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "SubscriptionData.h"
+
+#include <sstream>
+#include "KPRUtil.h"
+#include "UtilAll.h"
+
+namespace rmq
+{
+
+std::string SubscriptionData::SUB_ALL("*"); // wildcard expression; presumably "match everything" -- confirm against users
+
+// Empty subscription; the version is taken from KPRUtil::GetCurrentTimeMillis().
+SubscriptionData::SubscriptionData()
+    : m_subVersion(KPRUtil::GetCurrentTimeMillis())
+{}
+
+// Subscription for `topic` with the given subscription string; tag and code sets
+// start empty and the version is taken from KPRUtil::GetCurrentTimeMillis().
+SubscriptionData::SubscriptionData(const std::string& topic, const std::string& subString)
+    : m_topic(topic), m_subString(subString),
+      m_subVersion(KPRUtil::GetCurrentTimeMillis())
+{}
+
+// Topic accessor.
+// Returns a copy of the topic name stored in m_topic.
+std::string SubscriptionData::getTopic()const
+{ return this->m_topic; }
+
+// Topic mutator.
+// Overwrites m_topic with a copy of `topic`; no other field is touched.
+void SubscriptionData::setTopic(const std::string& topic)
+{ this->m_topic = topic; }
+
+// Subscription-string accessor.
+// Returns a copy of the expression stored in m_subString.
+std::string SubscriptionData::getSubString()
+{ return this->m_subString; }
+
+// Subscription-string mutator.
+// Overwrites m_subString with a copy of `subString`; the tag and code sets are not re-derived here.
+void SubscriptionData::setSubString(const std::string& subString)
+{ this->m_subString = subString; }
+
+// Tag-set accessor.
+// Hands out a mutable reference to m_tagsSet, so callers can edit the set in place.
+std::set<std::string>& SubscriptionData::getTagsSet()
+{ return this->m_tagsSet; }
+
+// Tag-set mutator.
+// Replaces the whole content of m_tagsSet with a copy of `tagsSet`.
+void SubscriptionData::setTagsSet(const std::set<std::string>& tagsSet)
+{ this->m_tagsSet = tagsSet; }
+
+// Version accessor.
+// Returns the value currently held in m_subVersion.
+long long SubscriptionData::getSubVersion()
+{ return this->m_subVersion; }
+
+// Version mutator.
+// Stores `subVersion` in m_subVersion, replacing the value set by the constructor.
+void SubscriptionData::setSubVersion(long long subVersion)
+{ this->m_subVersion = subVersion; }
+
+// Code-set accessor.
+// Hands out a mutable reference to m_codeSet, so callers can edit the set in place.
+std::set<int>& SubscriptionData::getCodeSet()
+{ return this->m_codeSet; }
+
+// Code-set mutator.
+// Replaces the whole content of m_codeSet with a copy of `codeSet`.
+void SubscriptionData::setCodeSet(const std::set<int>& codeSet)
+{ this->m_codeSet = codeSet; }
+
+/**
+ * Hash of the subscription's content (m_subVersion does not take part).
+ *
+ * Java reference formula, kept for comparison: start from 1 and fold each of
+ * classFilterMode, codeSet, subString, tagsSet and topic in as
+ * result = 31 * result + fieldHash. This port does not do that: it streams
+ * hashCode(m_codeSet), m_subString, hashCode(m_tagsSet) and m_topic into one
+ * string, in that order, and hashes the concatenation.
+ *
+ * @return UtilAll::hashCode of the joined text
+ */
+int SubscriptionData::hashCode()
+{
+    std::ostringstream joined;
+    joined << UtilAll::hashCode(m_codeSet);
+    joined << m_subString;
+    joined << UtilAll::hashCode(m_tagsSet);
+    joined << m_topic;
+    return UtilAll::hashCode(joined.str());
+}
+
+
+
+
+/**
+ * Field-by-field equality of two subscriptions.
+ *
+ * All five stored fields take part -- code set, subscription string,
+ * version, tag set and topic -- and they are checked in that order,
+ * stopping at the first mismatch. Because m_subVersion is included, two
+ * subscriptions for the same topic and expression still compare unequal
+ * when their versions differ.
+ *
+ * NOTE(review): the operator is not const-qualified, so it cannot be
+ * invoked on a const left-hand operand; changing that also requires
+ * touching the declaration in SubscriptionData.h.
+ *
+ * @param other subscription to compare against
+ * @return true when every field matches, false otherwise
+ */
+bool SubscriptionData::operator==(const SubscriptionData& other)
+{
+    if (m_codeSet != other.m_codeSet || m_subString != other.m_subString)
+    {
+        return false;
+    }
+    if (m_subVersion != other.m_subVersion || m_tagsSet != other.m_tagsSet)
+    {
+        return false;
+    }
+
+    return !(m_topic != other.m_topic);
+}
+
+/**
+ * Orders subscriptions by topic first and by subscription string second.
+ *
+ * This is a strict weak ordering over the (topic, subString) pair.
+ * Tags, codes and version are not consulted, so two objects that differ
+ * only in those fields are equivalent under this ordering even though
+ * operator== reports them as different.
+ *
+ * @param other right-hand operand
+ * @return true when this object sorts strictly before `other`
+ */
+bool SubscriptionData::operator<(const SubscriptionData& other)const
+{
+    // Different topics decide the order on their own.
+    if (!(m_topic == other.m_topic))
+    {
+        return m_topic < other.m_topic;
+    }
+
+    // Same topic: fall back to the subscription string.
+    return m_subString < other.m_subString;
+}
+
+// Writes this subscription into `obj`; "classFilterMode" is always stored as false.
+void SubscriptionData::toJson(Json::Value& obj) const
+{
+    obj["classFilterMode"] = false;
+    obj["topic"] = m_topic;
+    obj["subString"] = m_subString;
+    obj["subVersion"] = static_cast<long long>(m_subVersion);
+    // Both sets go out as JSON arrays (RMQ_FOR_EACH presumably walks the container in order).
+    Json::Value tagArray(Json::arrayValue);
+    RMQ_FOR_EACH(m_tagsSet, tagIt)
+    {
+        tagArray.append(*tagIt);
+    }
+    obj["tagsSet"] = tagArray;
+    Json::Value codeArray(Json::arrayValue);
+    RMQ_FOR_EACH(m_codeSet, codeIt)
+    {
+        codeArray.append(*codeIt);
+    }
+    obj["codeSet"] = codeArray;
+}
+
+// Text form "{classFilterMode=0,topic=..,subString=..,subVersion=..,tagsSet=..,codeSet=..}".
+// classFilterMode is a constant false, which the stream prints as 0 (no boolalpha);
+// the two sets are rendered by UtilAll::toString.
+std::string SubscriptionData::toString() const
+{
+    std::ostringstream text;
+    text << "{classFilterMode=" << false << ",topic=" << m_topic;
+    text << ",subString=" << m_subString << ",subVersion=" << m_subVersion;
+    text << ",tagsSet=" << UtilAll::toString(m_tagsSet);
+    text << ",codeSet=" << UtilAll::toString(m_codeSet) << "}";
+    return text.str();
+}
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/SubscriptionData.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/SubscriptionData.h b/rocketmq-client4cpp/src/consumer/SubscriptionData.h
new file mode 100755
index 0000000..4796fb7
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/SubscriptionData.h
@@ -0,0 +1,76 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __SUBSCRIPTIONDATA_H__
+#define __SUBSCRIPTIONDATA_H__
+
+#include <string>
+#include <set>
+
+#include "RocketMQClient.h"
+#include "RemotingSerializable.h"
+#include "RefHandle.h"
+#include "json/json.h"
+
+namespace rmq
+{
+    // One topic subscription: topic, subscription string, tag set, code set and version.
+    class SubscriptionData : public kpr::RefCount
+    {
+    public:
+        static std::string SUB_ALL;
+
+        SubscriptionData();
+        SubscriptionData(const std::string& topic, const std::string& subString);
+
+        std::string getTopic() const;
+        void setTopic(const std::string& topic);
+
+        std::string getSubString();
+        void setSubString(const std::string& subString);
+
+        std::set<std::string>& getTagsSet();
+        void setTagsSet(const std::set<std::string>& tagsSet);
+
+        long long getSubVersion();
+        void setSubVersion(long long subVersion);
+
+        std::set<int>& getCodeSet();
+        void setCodeSet(const std::set<int>& codeSet);
+
+        int hashCode();
+        void toJson(Json::Value& obj) const;
+        std::string toString() const;
+
+        bool operator==(const SubscriptionData& other);
+        bool operator<(const SubscriptionData& other) const;
+
+    private:
+        std::string m_topic;
+        std::string m_subString;
+        std::set<std::string> m_tagsSet;
+        std::set<int> m_codeSet;
+        long long m_subVersion;
+    };
+    typedef kpr::RefHandleT<SubscriptionData> SubscriptionDataPtr;
+
+	// Streams obj.toString() into `os` and returns `os` so calls can be chained.
+	inline std::ostream& operator<<(std::ostream& os, const SubscriptionData& obj)
+	{
+	    return os << obj.toString();
+	}
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/jsoncpp/AUTHORS
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/jsoncpp/AUTHORS b/rocketmq-client4cpp/src/jsoncpp/AUTHORS
new file mode 100755
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/jsoncpp/LICENSE
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/jsoncpp/LICENSE b/rocketmq-client4cpp/src/jsoncpp/LICENSE
new file mode 100755
index 0000000..403d096
--- /dev/null
+++ b/rocketmq-client4cpp/src/jsoncpp/LICENSE
@@ -0,0 +1 @@
+The json-cpp library and this documentation are in Public Domain.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/jsoncpp/README.txt
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/jsoncpp/README.txt b/rocketmq-client4cpp/src/jsoncpp/README.txt
new file mode 100755
index 0000000..379d376
--- /dev/null
+++ b/rocketmq-client4cpp/src/jsoncpp/README.txt
@@ -0,0 +1,117 @@
+* Introduction:
+  =============
+
+JSON (JavaScript Object Notation) is a lightweight data-interchange format.
+It can represent integers, real numbers, strings, ordered sequences of
+values, and collections of name/value pairs.
+
+JsonCpp is a simple API to manipulate JSON values and to handle serialization
+to, and unserialization from, strings.
+
+It can also preserve existing comments in unserialization/serialization steps,
+making it a convenient format to store user input files.
+
+Unserialization parsing is user friendly and provides precise error reports.
+
+
+* Building/Testing:
+  =================
+
+JsonCpp uses Scons (http://www.scons.org) as a build system. Scons requires
+python to be installed (http://www.python.org).
+
+Download the scons-local distribution from the following URL:
+http://sourceforge.net/project/showfiles.php?group_id=30337&package_id=67375
+
+Unzip it in the directory where you found this README file. scons.py should be
+at the same level as README.
+
+python scons.py platform=PLTFRM [TARGET]
+where PLTFRM may be one of:
+	suncc Sun C++ (Solaris)
+	vacpp Visual Age C++ (AIX)
+	mingw 
+	msvc6 Microsoft Visual Studio 6 service pack 5-6
+	msvc70 Microsoft Visual Studio 2002
+	msvc71 Microsoft Visual Studio 2003
+	msvc80 Microsoft Visual Studio 2005
+	linux-gcc Gnu C++ (linux, also reported to work for Mac OS X)
+	
+Adding a platform is fairly simple. You need to change the SConstruct file
+to do so.
+	
+and TARGET may be:
+	check: build library and run unit tests.
+
+    
+* Running the test manually:
+  ==========================
+
+cd test
+# This will run the Reader/Writer tests
+python runjsontests.py "path to jsontest.exe"
+
+# This will run the Reader/Writer tests, using JSONChecker test suite
+# (http://www.json.org/JSON_checker/).
+# Notes: not all tests pass: JsonCpp is too lenient (for example,
+# it allows an integer to start with '0'). The goal is to improve
+# strict mode parsing to get all tests to pass.
+python runjsontests.py --with-json-checker "path to jsontest.exe"
+
+# This will run the unit tests (mostly Value)
+python rununittests.py "path to test_lib_json.exe"
+
+You can run the tests using valgrind:
+python rununittests.py --valgrind "path to test_lib_json.exe"
+
+
+* Building the documentation:
+  ===========================
+
+Run the python script doxybuild.py from the top directory:
+
+python doxybuild.py --open --with-dot
+
+See doxybuild.py --help for options. 
+
+
+* Adding a reader/writer test:
+  ============================
+
+To add a test, you need to create two files in test/data:
+- a TESTNAME.json file, that contains the input document in JSON format.
+- a TESTNAME.expected file, that contains a flattened representation of
+  the input document.
+  
+TESTNAME.expected file format:
+- each line represents a JSON element of the element tree represented 
+  by the input document.
+- each line has two parts: the path to access the element separated from
+  the element value by '='. Array and object values are always empty 
+  (e.g. represented by either [] or {}).
+- element path: '.' represents the root element, and is used to separate
+  object members. [N] is used to specify the value of an array element
+  at index N.
+See test_complex_01.json and test_complex_01.expected to better understand
+element path.
+
+
+* Understanding reader/writer test output:
+  ========================================
+
+When a test is run, output files are generated alongside the input test files.
+Below is a short description of the content of each file:
+
+- test_complex_01.json: input JSON document
+- test_complex_01.expected: flattened JSON element tree used to check if
+    parsing was correct.
+
+- test_complex_01.actual: flattened JSON element tree produced by 
+    jsontest.exe from reading test_complex_01.json
+- test_complex_01.rewrite: JSON document written by jsontest.exe using the
+    Json::Value parsed from test_complex_01.json and serialized using
+    Json::StyledWriter.
+- test_complex_01.actual-rewrite: flattened JSON element tree produced by 
+    jsontest.exe from reading test_complex_01.rewrite.
+- test_complex_01.process-output: jsontest.exe output, typically useful to
+    understand parsing errors.
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/jsoncpp/json/allocator.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/jsoncpp/json/allocator.h b/rocketmq-client4cpp/src/jsoncpp/json/allocator.h
new file mode 100755
index 0000000..1235a3e
--- /dev/null
+++ b/rocketmq-client4cpp/src/jsoncpp/json/allocator.h
@@ -0,0 +1,96 @@
+// Copyright 2007-2010 Baptiste Lepilleur
+// Distributed under MIT license, or public domain if desired and
+// recognized in your jurisdiction.
+// See file LICENSE for detail or copy at http://jsoncpp.sourceforge.net/LICENSE
+
+#ifndef CPPTL_JSON_ALLOCATOR_H_INCLUDED
+#define CPPTL_JSON_ALLOCATOR_H_INCLUDED
+
+#include <cstring>
+#include <memory>
+
+namespace rmq {
+namespace Json {
+template<typename T>
+class SecureAllocator {
+	public:
+		// Member types expected from an allocator
+		using value_type      = T;
+		using pointer         = T*;
+		using const_pointer   = const T*;
+		using reference       = T&;
+		using const_reference = const T&;
+		using size_type       = std::size_t;
+		using difference_type = std::ptrdiff_t;
+
+		/**
+		 * Allocate raw, uninitialised storage for N items of type T.
+		 */
+		pointer allocate(size_type n) {
+			// allocate using "global operator new"
+			return static_cast<pointer>(::operator new(n * sizeof(T)));
+		}
+
+		/**
+		 * Release memory which was allocated for N items at pointer P.
+		 *
+		 * The block is zeroed first, through a pointer-to-volatile: a plain
+		 * memset() before the release is a dead store the compiler may drop,
+		 * and a volatile-qualified parameter alone does not prevent that.
+		 */
+		void deallocate(volatile pointer p, size_type n) {
+			volatile unsigned char* bytes = reinterpret_cast<volatile unsigned char*>(p);
+			for (size_type i = 0; i < n * sizeof(T); ++i) bytes[i] = 0;
+			::operator delete(p);
+		}
+
+		/**
+		 * Construct an item in-place at pointer P, forwarding ARGS to T's constructor.
+		 */
+		template<typename... Args>
+		void construct(pointer p, Args&&... args) {
+			// construct using "placement new" and "perfect forwarding"
+			::new (static_cast<void*>(p)) T(std::forward<Args>(args)...);
+		}
+
+		size_type max_size() const { // largest N for which N * sizeof(T) fits in size_t
+			return size_t(-1) / sizeof(T);
+		}
+
+		pointer address( reference x ) const {
+			return std::addressof(x);
+		}
+
+		const_pointer address( const_reference x ) const {
+			return std::addressof(x);
+		}
+
+		/**
+		 * Destroy an item in-place at pointer P (storage is not released).
+		 */
+		void destroy(pointer p) {
+			// destroy using "explicit destructor"
+			p->~T();
+		}
+
+		// Boilerplate: default/converting constructors and rebind
+		SecureAllocator() {}
+		template<typename U> SecureAllocator(const SecureAllocator<U>&) {}
+		template<typename U> struct rebind { using other = SecureAllocator<U>; };
+};
+
+
+// SecureAllocator has no data members, so any two instances compare equal.
+template<typename T, typename U>
+bool operator==(const SecureAllocator<T>& /*lhs*/, const SecureAllocator<U>& /*rhs*/)
+{ return true; }
+
+// Counterpart of operator==: two SecureAllocator instances never compare unequal.
+template<typename T, typename U>
+bool operator!=(const SecureAllocator<T>& /*lhs*/, const SecureAllocator<U>& /*rhs*/)
+{ return false; }
+
+} //namespace Json
+} //namespace rmq
+
+#endif // CPPTL_JSON_ALLOCATOR_H_INCLUDED

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/jsoncpp/json/assertions.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/jsoncpp/json/assertions.h b/rocketmq-client4cpp/src/jsoncpp/json/assertions.h
new file mode 100755
index 0000000..dc67b27
--- /dev/null
+++ b/rocketmq-client4cpp/src/jsoncpp/json/assertions.h
@@ -0,0 +1,54 @@
+// Copyright 2007-2010 Baptiste Lepilleur
+// Distributed under MIT license, or public domain if desired and
+// recognized in your jurisdiction.
+// See file LICENSE for detail or copy at http://jsoncpp.sourceforge.net/LICENSE
+
+#ifndef CPPTL_JSON_ASSERTIONS_H_INCLUDED
+#define CPPTL_JSON_ASSERTIONS_H_INCLUDED
+
+#include <stdlib.h>
+#include <sstream>
+
+#if !defined(JSON_IS_AMALGAMATION)
+#include "config.h"
+#endif // if !defined(JSON_IS_AMALGAMATION)
+
+/** It should not be possible for a maliciously designed file to
+ *  cause an abort() or seg-fault, so these macros are used only
+ *  for pre-condition violations and internal logic errors.
+ */
+#if JSON_USE_EXCEPTION
+// NOTE(review): the macros in this branch expand to a bare { ... } block, so `if (x) JSON_ASSERT(c); else ...` does not parse.
+// @todo add detail about the failed condition to the exception message
+# define JSON_ASSERT(condition)                                                \
+  {if (!(condition)) {rmq::Json::throwLogicError( "assert json failed" );}}
+// Streams `message` into a JSONCPP_OSTRINGSTREAM and hands the text to throwLogicError; abort() is reached only if that call returns.
+# define JSON_FAIL_MESSAGE(message)                                            \
+  {                                                                            \
+    JSONCPP_OSTRINGSTREAM oss; oss << message;                                    \
+    rmq::Json::throwLogicError(oss.str());                                          \
+    abort();                                                                   \
+  }
+
+#else // JSON_USE_EXCEPTION
+// NOTE(review): assert() needs <cassert>; only <stdlib.h> and <sstream> are included here, so it presumably arrives via config.h or the includer -- confirm.
+# define JSON_ASSERT(condition) assert(condition)
+
+// The call to assert() will show the failure message in debug builds. In
+// release builds we abort, for a core-dump or debugger.
+# define JSON_FAIL_MESSAGE(message)                                            \
+  {                                                                            \
+    JSONCPP_OSTRINGSTREAM oss; oss << message;                                    \
+    assert(false && oss.str().c_str());                                        \
+    abort();                                                                   \
+  }
+
+
+#endif
+// Runs JSON_FAIL_MESSAGE(message) when `condition` is false. NOTE(review): expands to a bare `if`, so an `else` written after the call would attach to it.
+#define JSON_ASSERT_MESSAGE(condition, message)                                \
+  if (!(condition)) {                                                          \
+    JSON_FAIL_MESSAGE(message);                                                \
+  }
+
+#endif // CPPTL_JSON_ASSERTIONS_H_INCLUDED

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/jsoncpp/json/autolink.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/jsoncpp/json/autolink.h b/rocketmq-client4cpp/src/jsoncpp/json/autolink.h
new file mode 100644
index 0000000..6fcc8af
--- /dev/null
+++ b/rocketmq-client4cpp/src/jsoncpp/json/autolink.h
@@ -0,0 +1,25 @@
+// Copyright 2007-2010 Baptiste Lepilleur
+// Distributed under MIT license, or public domain if desired and
+// recognized in your jurisdiction.
+// See file LICENSE for detail or copy at http://jsoncpp.sourceforge.net/LICENSE
+
+#ifndef JSON_AUTOLINK_H_INCLUDED
+#define JSON_AUTOLINK_H_INCLUDED
+
+#include "config.h"
+
+#ifdef JSON_IN_CPPTL
+#include <cpptl/cpptl_autolink.h>
+#endif
+// NOTE(review): the nested include below appears to name this very file; with JSON_AUTOLINK_H_INCLUDED already defined it would expand to nothing. Presumably cpptl's autolink header was meant -- confirm.
+#if !defined(JSON_NO_AUTOLINK) && !defined(JSON_DLL_BUILD) &&                  \
+    !defined(JSON_IN_CPPTL)
+#define CPPTL_AUTOLINK_NAME "json"
+#undef CPPTL_AUTOLINK_DLL
+#ifdef JSON_DLL
+#define CPPTL_AUTOLINK_DLL
+#endif
+#include "autolink.h"
+#endif
+
+#endif // JSON_AUTOLINK_H_INCLUDED


Mime
View raw message