http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/PullRequest.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/PullRequest.cpp b/rocketmq-client4cpp/src/consumer/PullRequest.cpp
new file mode 100755
index 0000000..b8650c6
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/PullRequest.cpp
@@ -0,0 +1,108 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "PullRequest.h"
+#include "UtilAll.h"
+
+namespace rmq
+{
+
+PullRequest::~PullRequest()
+{
+
+}
+
+std::string PullRequest::getConsumerGroup()
+{
+ return m_consumerGroup;
+}
+
+void PullRequest::setConsumerGroup(const std::string& consumerGroup)
+{
+ m_consumerGroup = consumerGroup;
+}
+
+MessageQueue& PullRequest::getMessageQueue()
+{
+ return m_messageQueue;
+}
+
+void PullRequest::setMessageQueue(const MessageQueue& messageQueue)
+{
+ m_messageQueue = messageQueue;
+}
+
+long long PullRequest::getNextOffset()
+{
+ return m_nextOffset;
+}
+
+void PullRequest::setNextOffset(long long nextOffset)
+{
+ m_nextOffset = nextOffset;
+}
+
+int PullRequest::hashCode()
+{
+ /*
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((consumerGroup == null) ? 0 : consumerGroup.hashCode());
+ result = prime * result + ((messageQueue == null) ? 0 : messageQueue.hashCode());
+ return result;
+ */
+ std::stringstream ss;
+ ss << m_consumerGroup
+ << m_messageQueue.hashCode();
+ return UtilAll::hashCode(ss.str());
+}
+
+std::string PullRequest::toString() const
+{
+ std::stringstream ss;
+ ss << "{consumerGroup=" << m_consumerGroup
+ << ",messageQueue=" << m_messageQueue.toString()
+ << ",nextOffset=" << m_nextOffset << "}";
+ return ss.str();
+}
+
+
+bool PullRequest::operator==(const PullRequest& other)
+{
+ if (m_consumerGroup != other.m_consumerGroup)
+ {
+ return false;
+ }
+
+ if (!(m_messageQueue == other.m_messageQueue))
+ {
+ return false;
+ }
+
+ return true;
+}
+
+ProcessQueue* PullRequest::getProcessQueue()
+{
+ return m_pProcessQueue;
+}
+
+void PullRequest::setProcessQueue(ProcessQueue* pProcessQueue)
+{
+ m_pProcessQueue = pProcessQueue;
+}
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/PullRequest.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/PullRequest.h b/rocketmq-client4cpp/src/consumer/PullRequest.h
new file mode 100755
index 0000000..3fb8367
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/PullRequest.h
@@ -0,0 +1,59 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __PULLREQUEST_H__
+#define __PULLREQUEST_H__
+
+#include <string>
+#include <sstream>
+
+#include "MessageQueue.h"
+#include "ProcessQueue.h"
+
+namespace rmq
+{
+ class PullRequest
+ {
+ public:
+ virtual ~PullRequest();
+
+ std::string getConsumerGroup();
+ void setConsumerGroup(const std::string& consumerGroup);
+
+ MessageQueue& getMessageQueue();
+ void setMessageQueue(const MessageQueue& messageQueue);
+
+ long long getNextOffset();
+ void setNextOffset(long long nextOffset);
+
+ int hashCode();
+ std::string toString() const;
+
+ bool operator==(const PullRequest& other);
+
+ ProcessQueue* getProcessQueue();
+ void setProcessQueue(ProcessQueue* pProcessQueue);
+
+ private:
+ std::string m_consumerGroup;
+ MessageQueue m_messageQueue;
+
+ ProcessQueue* m_pProcessQueue;
+ long long m_nextOffset;
+ };
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/PullResultExt.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/PullResultExt.h b/rocketmq-client4cpp/src/consumer/PullResultExt.h
new file mode 100755
index 0000000..24235b2
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/PullResultExt.h
@@ -0,0 +1,53 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __PULLRESULTEXT_H__
+#define __PULLRESULTEXT_H__
+
+#include "PullResult.h"
+
+namespace rmq
+{
+
+ struct PullResultExt : public PullResult
+ {
+ PullResultExt(PullStatus pullStatus,
+ long long nextBeginOffset,
+ long long minOffset,
+ long long maxOffset,
+ std::list<MessageExt*>& msgFoundList,
+ long suggestWhichBrokerId,
+ const char* messageBinary,
+ int messageBinaryLen)
+ : PullResult(pullStatus,
+ nextBeginOffset,
+ minOffset,
+ maxOffset,
+ msgFoundList),
+ suggestWhichBrokerId(suggestWhichBrokerId),
+ messageBinary(messageBinary),
+ messageBinaryLen(messageBinaryLen)
+ {
+
+ }
+
+ long suggestWhichBrokerId;
+ const char* messageBinary;
+ int messageBinaryLen;
+ };
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/RebalanceImpl.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RebalanceImpl.cpp b/rocketmq-client4cpp/src/consumer/RebalanceImpl.cpp
new file mode 100755
index 0000000..efdc1cc
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/RebalanceImpl.cpp
@@ -0,0 +1,613 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "RebalanceImpl.h"
+#include "AllocateMessageQueueStrategy.h"
+#include "MQClientFactory.h"
+#include "MixAll.h"
+#include "LockBatchBody.h"
+#include "MQClientAPIImpl.h"
+#include "KPRUtil.h"
+#include "ScopedLock.h"
+
+namespace rmq
+{
+
+RebalanceImpl::RebalanceImpl(const std::string& consumerGroup,
+ MessageModel messageModel,
+ AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy,
+ MQClientFactory* pMQClientFactory)
+ : m_consumerGroup(consumerGroup),
+ m_messageModel(messageModel),
+ m_pAllocateMessageQueueStrategy(pAllocateMessageQueueStrategy),
+ m_pMQClientFactory(pMQClientFactory)
+{
+
+}
+
+RebalanceImpl::~RebalanceImpl()
+{
+}
+
+void RebalanceImpl::unlock(MessageQueue& mq, bool oneway)
+{
+ FindBrokerResult findBrokerResult =
+ m_pMQClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll::MASTER_ID, true);
+ if (!findBrokerResult.brokerAddr.empty())
+ {
+ UnlockBatchRequestBody* requestBody = new UnlockBatchRequestBody();
+ requestBody->setConsumerGroup(m_consumerGroup);
+ requestBody->setClientId(m_pMQClientFactory->getClientId());
+ requestBody->getMqSet().insert(mq);
+
+ try
+ {
+ m_pMQClientFactory->getMQClientAPIImpl()->unlockBatchMQ(findBrokerResult.brokerAddr,
+ requestBody, 1000, oneway);
+ }
+ catch (...)
+ {
+ RMQ_ERROR("unlockBatchMQ exception, MQ: {%s}" , mq.toString().c_str());
+ }
+ }
+}
+
+void RebalanceImpl::unlockAll(bool oneway)
+{
+ std::map<std::string, std::set<MessageQueue> > brokerMqs = buildProcessQueueTableByBrokerName();
+ std::map<std::string, std::set<MessageQueue> >::iterator it = brokerMqs.begin();
+
+ for (; it != brokerMqs.end(); it++)
+ {
+ std::string brokerName = it->first;
+ std::set<MessageQueue> mqs = it->second;
+
+ if (mqs.empty())
+ {
+ continue;
+ }
+
+ FindBrokerResult findBrokerResult =
+ m_pMQClientFactory->findBrokerAddressInSubscribe(brokerName, MixAll::MASTER_ID, true);
+
+ if (!findBrokerResult.brokerAddr.empty())
+ {
+ UnlockBatchRequestBody* requestBody = new UnlockBatchRequestBody();
+ requestBody->setConsumerGroup(m_consumerGroup);
+ requestBody->setClientId(m_pMQClientFactory->getClientId());
+ requestBody->setMqSet(mqs);
+
+ try
+ {
+ m_pMQClientFactory->getMQClientAPIImpl()->unlockBatchMQ(findBrokerResult.brokerAddr,
+ requestBody, 1000, oneway);
+
+ kpr::ScopedRLock<kpr::RWMutex> lock(m_processQueueTableLock);
+ std::set<MessageQueue>::iterator itm = mqs.begin();
+ for (; itm != mqs.end(); itm++)
+ {
+ std::map<MessageQueue, ProcessQueue*>::iterator itp = m_processQueueTable.find(*itm);
+ if (itp != m_processQueueTable.end())
+ {
+ itp->second->setLocked(false);
+ RMQ_INFO("the message queue unlock OK, Group: {%s}, MQ: {%s}",
+ m_consumerGroup.c_str(), (*itm).toString().c_str());
+ }
+ }
+ }
+ catch (...)
+ {
+ RMQ_ERROR("unlockBatchMQ exception, mqs.size: {%u} ", (unsigned)mqs.size());
+ }
+ }
+ }
+}
+
+bool RebalanceImpl::lock(MessageQueue& mq)
+{
+ FindBrokerResult findBrokerResult =
+ m_pMQClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(), MixAll::MASTER_ID, true);
+ if (!findBrokerResult.brokerAddr.empty())
+ {
+ LockBatchRequestBody* requestBody = new LockBatchRequestBody();
+ requestBody->setConsumerGroup(m_consumerGroup);
+ requestBody->setClientId(m_pMQClientFactory->getClientId());
+ requestBody->getMqSet().insert(mq);
+
+ try
+ {
+ std::set<MessageQueue> lockedMq =
+ m_pMQClientFactory->getMQClientAPIImpl()->lockBatchMQ(
+ findBrokerResult.brokerAddr, requestBody, 1000);
+
+ std::set<MessageQueue>::iterator it = lockedMq.begin();
+ for (; it != lockedMq.end(); it++)
+ {
+ kpr::ScopedRLock<kpr::RWMutex> lock(m_processQueueTableLock);
+ MessageQueue mmqq = *it;
+ std::map<MessageQueue, ProcessQueue*>::iterator itt = m_processQueueTable.find(mmqq);
+ if (itt != m_processQueueTable.end())
+ {
+ itt->second->setLocked(true);
+ itt->second->setLastLockTimestamp(KPRUtil::GetCurrentTimeMillis());
+ }
+ }
+
+ it = lockedMq.find(mq);
+ bool lockOK = (it != lockedMq.end());
+
+ RMQ_INFO("the message queue lock {%s}, {%s}, {%s}",//
+ (lockOK ? "OK" : "Failed"), //
+ m_consumerGroup.c_str(), //
+ mq.toString().c_str());
+ return lockOK;
+ }
+ catch (...)
+ {
+ RMQ_ERROR("lockBatchMQ exception, MQ: {%s}", mq.toString().c_str());
+ }
+ }
+
+ return false;
+}
+
+void RebalanceImpl::lockAll()
+{
+ std::map<std::string, std::set<MessageQueue> > brokerMqs = buildProcessQueueTableByBrokerName();
+
+ std::map<std::string, std::set<MessageQueue> >::iterator it = brokerMqs.begin();
+ for (; it != brokerMqs.end(); it++)
+ {
+ std::string brokerName = it->first;
+ std::set<MessageQueue> mqs = it->second;
+
+ if (mqs.empty())
+ {
+ continue;
+ }
+
+ FindBrokerResult findBrokerResult =
+ m_pMQClientFactory->findBrokerAddressInSubscribe(brokerName, MixAll::MASTER_ID, true);
+ if (!findBrokerResult.brokerAddr.empty())
+ {
+ LockBatchRequestBody* requestBody = new LockBatchRequestBody();
+ requestBody->setConsumerGroup(m_consumerGroup);
+ requestBody->setClientId(m_pMQClientFactory->getClientId());
+ requestBody->setMqSet(mqs);
+
+ try
+ {
+ std::set<MessageQueue> lockOKMQSet =
+ m_pMQClientFactory->getMQClientAPIImpl()->lockBatchMQ(
+ findBrokerResult.brokerAddr, requestBody, 1000);
+
+ std::set<MessageQueue>::iterator its = lockOKMQSet.begin();
+ for (; its != lockOKMQSet.end(); its++)
+ {
+ kpr::ScopedRLock<kpr::RWMutex> lock(m_processQueueTableLock);
+ MessageQueue mq = *its;
+ std::map<MessageQueue, ProcessQueue*>::iterator itt = m_processQueueTable.find(mq);
+ if (itt != m_processQueueTable.end())
+ {
+ ProcessQueue* processQueue = itt->second;
+ if (!processQueue->isLocked())
+ {
+ RMQ_INFO("the message queue locked OK, Group: {%s}, MQ: %s",
+ m_consumerGroup.c_str(),
+ mq.toString().c_str());
+ }
+
+ processQueue->setLocked(true);
+ processQueue->setLastLockTimestamp(KPRUtil::GetCurrentTimeMillis());
+ }
+ }
+
+ its = mqs.begin();
+ for (; its != mqs.end(); its++)
+ {
+ MessageQueue mq = *its;
+ std::set<MessageQueue>::iterator itf = lockOKMQSet.find(mq);
+ if (itf == lockOKMQSet.end())
+ {
+ kpr::ScopedRLock<kpr::RWMutex> lock(m_processQueueTableLock);
+ std::map<MessageQueue, ProcessQueue*>::iterator itt = m_processQueueTable.find(mq);
+ if (itt != m_processQueueTable.end())
+ {
+ itt->second->setLocked(false);
+ RMQ_WARN("the message queue locked Failed, Group: {%s}, MQ: %s",
+ m_consumerGroup.c_str(),
+ mq.toString().c_str());
+ }
+ }
+ }
+ }
+ catch (std::exception& e)
+ {
+ RMQ_ERROR("lockBatchMQ exception: %s", e.what());
+ }
+ }
+ }
+}
+
+void RebalanceImpl::doRebalance()
+{
+ std::map<std::string, SubscriptionData> subTable = getSubscriptionInner();
+ std::map<std::string, SubscriptionData>::iterator it = subTable.begin();
+ for (; it != subTable.end(); it++)
+ {
+ std::string topic = it->first;
+ try
+ {
+ rebalanceByTopic(topic);
+ }
+ catch (std::exception& e)
+ {
+ if (topic.find(MixAll::RETRY_GROUP_TOPIC_PREFIX) != 0)
+ {
+ RMQ_WARN("rebalanceByTopic Exception: %s", e.what());
+ }
+ }
+ }
+
+ truncateMessageQueueNotMyTopic();
+}
+
+std::map<std::string, SubscriptionData>& RebalanceImpl::getSubscriptionInner()
+{
+ return m_subscriptionInner;
+}
+
+std::map<MessageQueue, ProcessQueue*>& RebalanceImpl::getProcessQueueTable()
+{
+ return m_processQueueTable;
+}
+
+
+kpr::RWMutex& RebalanceImpl::getProcessQueueTableLock()
+{
+ return m_processQueueTableLock;
+}
+
+
+std::map<std::string, std::set<MessageQueue> >& RebalanceImpl::getTopicSubscribeInfoTable()
+{
+ return m_topicSubscribeInfoTable;
+}
+
+std::string& RebalanceImpl::getConsumerGroup()
+{
+ return m_consumerGroup;
+}
+
+void RebalanceImpl::setConsumerGroup(const std::string& consumerGroup)
+{
+ m_consumerGroup = consumerGroup;
+}
+
+MessageModel RebalanceImpl::getMessageModel()
+{
+ return m_messageModel;
+}
+
+void RebalanceImpl::setMessageModel(MessageModel messageModel)
+{
+ m_messageModel = messageModel;
+}
+
+AllocateMessageQueueStrategy* RebalanceImpl::getAllocateMessageQueueStrategy()
+{
+ return m_pAllocateMessageQueueStrategy;
+}
+
+void RebalanceImpl::setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy)
+{
+ m_pAllocateMessageQueueStrategy = pAllocateMessageQueueStrategy;
+}
+
+MQClientFactory* RebalanceImpl::getmQClientFactory()
+{
+ return m_pMQClientFactory;
+}
+
+void RebalanceImpl::setmQClientFactory(MQClientFactory* pMQClientFactory)
+{
+ m_pMQClientFactory = pMQClientFactory;
+}
+
+std::map<std::string, std::set<MessageQueue> > RebalanceImpl::buildProcessQueueTableByBrokerName()
+{
+ std::map<std::string, std::set<MessageQueue> > result ;
+ kpr::ScopedRLock<kpr::RWMutex> lock(m_processQueueTableLock);
+ std::map<MessageQueue, ProcessQueue*>::iterator it = m_processQueueTable.begin();
+ for (; it != m_processQueueTable.end();)
+ {
+ MessageQueue mq = it->first;
+ std::map<std::string, std::set<MessageQueue> >::iterator itm = result.find(mq.getBrokerName());
+ if (itm == result.end())
+ {
+ std::set<MessageQueue> mqs ;
+ mqs.insert(mq);
+ result[mq.getBrokerName()] = mqs;
+ }
+ else
+ {
+ itm->second.insert(mq);
+ }
+ }
+
+ return result;
+}
+
+void RebalanceImpl::rebalanceByTopic(const std::string& topic)
+{
+ RMQ_DEBUG("rebalanceByTopic begin, topic={%s}", topic.c_str());
+ switch (m_messageModel)
+ {
+ case BROADCASTING:
+ {
+ //kpr::ScopedLock<kpr::Mutex> lock(m_topicSubscribeInfoTableLock);
+ std::map<std::string, std::set<MessageQueue> >::iterator it = m_topicSubscribeInfoTable.find(topic);
+ if (it != m_topicSubscribeInfoTable.end())
+ {
+ std::set<MessageQueue> mqSet = it->second;
+ bool changed = updateProcessQueueTableInRebalance(topic, mqSet);
+ if (changed)
+ {
+ messageQueueChanged(topic, mqSet, mqSet);
+ RMQ_INFO("messageQueueChanged {%s} {%s} {%s} {%s}",
+ m_consumerGroup.c_str(),
+ topic.c_str(),
+ UtilAll::toString(mqSet).c_str(),
+ UtilAll::toString(mqSet).c_str());
+ }
+ }
+ else
+ {
+ RMQ_WARN("doRebalance, {%s}, but the topic[%s] not exist.", m_consumerGroup.c_str(), topic.c_str());
+ }
+ break;
+ }
+ case CLUSTERING:
+ {
+ //kpr::ScopedLock<kpr::Mutex> lock(m_topicSubscribeInfoTableLock);
+ std::map<std::string, std::set<MessageQueue> >::iterator it = m_topicSubscribeInfoTable.find(topic);
+ if (it == m_topicSubscribeInfoTable.end())
+ {
+ if (topic.find(MixAll::RETRY_GROUP_TOPIC_PREFIX) != 0)
+ {
+ RMQ_WARN("doRebalance, %s, but the topic[%s] not exist.", m_consumerGroup.c_str(), topic.c_str());
+ }
+ }
+
+ std::list<std::string> cidAll = m_pMQClientFactory->findConsumerIdList(topic, m_consumerGroup);
+ if (cidAll.empty())
+ {
+ RMQ_WARN("doRebalance, %s:%s, get consumer id list failed.", m_consumerGroup.c_str(), topic.c_str());
+ }
+
+ if (it != m_topicSubscribeInfoTable.end() && !cidAll.empty())
+ {
+ std::vector<MessageQueue> mqAll;
+ std::set<MessageQueue> mqSet = it->second;
+ std::set<MessageQueue>::iterator its = mqSet.begin();
+
+ for (; its != mqSet.end(); its++)
+ {
+ mqAll.push_back(*its);
+ }
+
+ cidAll.sort();
+
+ AllocateMessageQueueStrategy* strategy = m_pAllocateMessageQueueStrategy;
+
+ std::vector<MessageQueue>* allocateResult = NULL;
+ try
+ {
+ allocateResult = strategy->allocate(m_consumerGroup,
+ m_pMQClientFactory->getClientId(), mqAll, cidAll);
+ }
+ catch (std::exception& e)
+ {
+ RMQ_ERROR("AllocateMessageQueueStrategy.allocate Exception, allocateMessageQueueStrategyName={%s}, mqAll={%s}, cidAll={%s}, %s",
+ strategy->getName().c_str(), UtilAll::toString(mqAll).c_str(), UtilAll::toString(cidAll).c_str(), e.what());
+ return;
+ }
+
+ std::set<MessageQueue> allocateResultSet;
+ if (allocateResult != NULL)
+ {
+ for (size_t i = 0; i < allocateResult->size(); i++)
+ {
+ allocateResultSet.insert(allocateResult->at(i));
+ }
+
+ delete allocateResult;
+ }
+
+ bool changed = updateProcessQueueTableInRebalance(topic, allocateResultSet);
+ if (changed)
+ {
+ RMQ_INFO("rebalanced result changed. allocateMessageQueueStrategyName={%s}, group={%s}, topic={%s}, ConsumerId={%s}, "
+ "rebalanceSize={%u}, rebalanceMqSet={%s}, mqAllSize={%u}, cidAllSize={%u}, mqAll={%s}, cidAll={%s}",
+ strategy->getName().c_str(), m_consumerGroup.c_str(), topic.c_str(), m_pMQClientFactory->getClientId().c_str(),
+ (unsigned)allocateResultSet.size(), UtilAll::toString(allocateResultSet).c_str(),
+ (unsigned)mqAll.size(), (unsigned)cidAll.size(), UtilAll::toString(mqAll).c_str(), UtilAll::toString(cidAll).c_str()
+ );
+
+ messageQueueChanged(topic, mqSet, allocateResultSet);
+ }
+ }
+ }
+ break;
+ default:
+ break;
+ }
+ RMQ_DEBUG("rebalanceByTopic end");
+}
+
+
+void RebalanceImpl::removeProcessQueue(const MessageQueue& mq)
+{
+ kpr::ScopedRLock<kpr::RWMutex> lock(m_processQueueTableLock);
+ std::map<MessageQueue, ProcessQueue*>::iterator it = m_processQueueTable.find(mq);
+ if (it != m_processQueueTable.end())
+ {
+ MessageQueue mq = it->first;
+ ProcessQueue* pq = it->second;
+ bool isDroped = pq->isDropped();
+
+ this->removeUnnecessaryMessageQueue(mq, *pq);
+ RMQ_INFO("Fix Offset, {%s}, remove unnecessary mq, {%s} Droped: {%d}",
+ m_consumerGroup.c_str(), mq.toString().c_str(), isDroped);
+ }
+}
+
+
+bool RebalanceImpl::updateProcessQueueTableInRebalance(const std::string& topic, std::set<MessageQueue>& mqSet)
+{
+ RMQ_DEBUG("updateProcessQueueTableInRebalance begin, topic={%s}", topic.c_str());
+ bool changed = false;
+
+ {
+ kpr::ScopedWLock<kpr::RWMutex> lock(m_processQueueTableLock);
+ std::map<MessageQueue, ProcessQueue*>::iterator it = m_processQueueTable.begin();
+ for (; it != m_processQueueTable.end();)
+ {
+ std::map<MessageQueue, ProcessQueue*>::iterator itCur = it++;
+ MessageQueue mq = itCur->first;
+ ProcessQueue* pq = itCur->second;
+ if (mq.getTopic() == topic)
+ {
+ std::set<MessageQueue>::iterator itMq = mqSet.find(mq);
+ if (itMq == mqSet.end())
+ {
+ pq->setDropped(true);
+ if (this->removeUnnecessaryMessageQueue(mq, *pq))
+ {
+ changed = true;
+ m_processQueueTable.erase(itCur);
+
+ RMQ_WARN("doRebalance, {%s}, remove unnecessary mq, {%s}",
+ m_consumerGroup.c_str(), mq.toString().c_str());
+ }
+ }
+ else if (pq->isPullExpired())
+ {
+ switch(this->consumeType())
+ {
+ case CONSUME_ACTIVELY:
+ break;
+ case CONSUME_PASSIVELY:
+ pq->setDropped(true);
+ if (this->removeUnnecessaryMessageQueue(mq, *pq))
+ {
+ changed = true;
+ m_processQueueTable.erase(itCur);
+
+ RMQ_ERROR("[BUG]doRebalance, {%s}, remove unnecessary mq, {%s}, because pull is pause, so try to fixed it",
+ m_consumerGroup.c_str(), mq.toString().c_str());
+ }
+ break;
+ default:
+ break;
+ }
+ }
+ }
+ }
+ }
+
+ std::list<PullRequest*> pullRequestList;
+ std::set<MessageQueue>::iterator its = mqSet.begin();
+ for (; its != mqSet.end(); its++)
+ {
+ MessageQueue mq = *its;
+ bool find = false;
+ {
+ kpr::ScopedRLock<kpr::RWMutex> lock(m_processQueueTableLock);
+ std::map<MessageQueue, ProcessQueue*>::iterator itm = m_processQueueTable.find(mq);
+ if (itm != m_processQueueTable.end())
+ {
+ find = true;
+ }
+ }
+
+ if (!find)
+ {
+ //todo: memleak
+ PullRequest* pullRequest = new PullRequest();
+ pullRequest->setConsumerGroup(m_consumerGroup);
+ pullRequest->setMessageQueue(mq);
+ pullRequest->setProcessQueue(new ProcessQueue());//todo: memleak
+
+ long long nextOffset = computePullFromWhere(mq);
+ if (nextOffset >= 0)
+ {
+ pullRequest->setNextOffset(nextOffset);
+ pullRequestList.push_back(pullRequest);
+ changed = true;
+
+ {
+ kpr::ScopedWLock<kpr::RWMutex> lock(m_processQueueTableLock);
+ m_processQueueTable[mq] = pullRequest->getProcessQueue();
+ RMQ_INFO("doRebalance, {%s}, add a new mq, {%s}, pullRequst: %s",
+ m_consumerGroup.c_str(), mq.toString().c_str(), pullRequest->toString().c_str());
+ }
+ }
+ else
+ {
+ RMQ_WARN("doRebalance, {%s}, add new mq failed, {%s}",
+ m_consumerGroup.c_str(), mq.toString().c_str());
+ }
+ }
+ }
+
+ //todo memleak
+ dispatchPullRequest(pullRequestList);
+ RMQ_DEBUG("updateProcessQueueTableInRebalance end");
+
+ return changed;
+}
+
+void RebalanceImpl::truncateMessageQueueNotMyTopic()
+{
+ std::map<std::string, SubscriptionData> subTable = getSubscriptionInner();
+
+ kpr::ScopedWLock<kpr::RWMutex> lock(m_processQueueTableLock);
+ std::map<MessageQueue, ProcessQueue*>::iterator it = m_processQueueTable.begin();
+ for (; it != m_processQueueTable.end();)
+ {
+ MessageQueue mq = it->first;
+ std::map<std::string, SubscriptionData>::iterator itt = subTable.find(mq.getTopic());
+
+ if (itt == subTable.end())
+ {
+ ProcessQueue* pq = it->second;
+ if (pq != NULL)
+ {
+ pq->setDropped(true);
+ RMQ_WARN("doRebalance, {%s}, truncateMessageQueueNotMyTopic remove unnecessary mq, {%s}",
+ m_consumerGroup.c_str(), mq.toString().c_str());
+ }
+ m_processQueueTable.erase(it++);
+ }
+ else
+ {
+ it++;
+ }
+ }
+}
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/RebalanceImpl.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RebalanceImpl.h b/rocketmq-client4cpp/src/consumer/RebalanceImpl.h
new file mode 100755
index 0000000..577a031
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/RebalanceImpl.h
@@ -0,0 +1,102 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __REBALANCEIMPL_H__
+#define __REBALANCEIMPL_H__
+
+#include <map>
+#include <string>
+#include <set>
+#include <list>
+
+#include "ConsumeType.h"
+#include "MessageQueue.h"
+#include "ProcessQueue.h"
+#include "PullRequest.h"
+#include "SubscriptionData.h"
+
+namespace rmq
+{
+ class AllocateMessageQueueStrategy;
+ class MQClientFactory;
+
+ class RebalanceImpl
+ {
+ public:
+ RebalanceImpl(const std::string& consumerGroup,
+ MessageModel messageModel,
+ AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy,
+ MQClientFactory* pMQClientFactory);
+ virtual ~RebalanceImpl();
+
+ virtual void messageQueueChanged(const std::string& topic,
+ std::set<MessageQueue>& mqAll,
+ std::set<MessageQueue>& mqDivided) = 0;
+ virtual bool removeUnnecessaryMessageQueue(MessageQueue& mq, ProcessQueue& pq) = 0;
+ virtual void dispatchPullRequest(std::list<PullRequest*>& pullRequestList) = 0;
+ virtual long long computePullFromWhere(MessageQueue& mq) = 0;
+ virtual ConsumeType consumeType() = 0;
+
+ bool lock(MessageQueue& mq);
+ void lockAll();
+
+ void unlock(MessageQueue& mq, bool oneway);
+ void unlockAll(bool oneway);
+
+ void doRebalance();
+
+ std::map<MessageQueue, ProcessQueue*>& getProcessQueueTable();
+ kpr::RWMutex& getProcessQueueTableLock();
+ std::map<std::string, SubscriptionData>& getSubscriptionInner();
+ std::map<std::string, std::set<MessageQueue> >& getTopicSubscribeInfoTable();
+
+ std::string& getConsumerGroup();
+ void setConsumerGroup(const std::string& consumerGroup);
+
+ MessageModel getMessageModel();
+ void setMessageModel(MessageModel messageModel);
+
+ AllocateMessageQueueStrategy* getAllocateMessageQueueStrategy();
+ void setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy);
+
+ MQClientFactory* getmQClientFactory();
+ void setmQClientFactory(MQClientFactory* pMQClientFactory);
+
+ void removeProcessQueue(const MessageQueue& mq);
+
+ private:
+ std::map<std::string, std::set<MessageQueue> > buildProcessQueueTableByBrokerName();
+ void rebalanceByTopic(const std::string& topic);
+ bool updateProcessQueueTableInRebalance(const std::string& topic, std::set<MessageQueue>& mqSet);
+ void truncateMessageQueueNotMyTopic();
+
+ protected:
+ std::map<MessageQueue, ProcessQueue*> m_processQueueTable;
+ kpr::RWMutex m_processQueueTableLock;
+
+ std::map<std::string, std::set<MessageQueue> > m_topicSubscribeInfoTable;
+ kpr::Mutex m_topicSubscribeInfoTableLock;
+
+ std::map<std::string, SubscriptionData> m_subscriptionInner;
+ kpr::Mutex m_subscriptionInnerLock;
+
+ std::string m_consumerGroup;
+ MessageModel m_messageModel;
+ AllocateMessageQueueStrategy* m_pAllocateMessageQueueStrategy;
+ MQClientFactory* m_pMQClientFactory;
+ };
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/RebalancePullImpl.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RebalancePullImpl.cpp b/rocketmq-client4cpp/src/consumer/RebalancePullImpl.cpp
new file mode 100755
index 0000000..1aa287b
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/RebalancePullImpl.cpp
@@ -0,0 +1,79 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "RebalancePullImpl.h"
+#include "DefaultMQPullConsumerImpl.h"
+#include "AllocateMessageQueueStrategy.h"
+#include "MQClientFactory.h"
+#include "MessageQueueListener.h"
+#include "OffsetStore.h"
+#include "DefaultMQPullConsumer.h"
+
+namespace rmq
+{
+
+RebalancePullImpl::RebalancePullImpl(DefaultMQPullConsumerImpl* pDefaultMQPullConsumerImpl)
+ : RebalanceImpl("", BROADCASTING, NULL, NULL),
+ m_pDefaultMQPullConsumerImpl(pDefaultMQPullConsumerImpl)
+{
+}
+
+RebalancePullImpl::RebalancePullImpl(const std::string& consumerGroup,
+ MessageModel messageModel,
+ AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy,
+ MQClientFactory* pMQClientFactory,
+ DefaultMQPullConsumerImpl* pDefaultMQPullConsumerImpl)
+ : RebalanceImpl(consumerGroup, messageModel, pAllocateMessageQueueStrategy, pMQClientFactory),
+ m_pDefaultMQPullConsumerImpl(pDefaultMQPullConsumerImpl)
+{
+}
+
+long long RebalancePullImpl::computePullFromWhere(MessageQueue& mq)
+{
+ return 0;
+}
+
+void RebalancePullImpl::dispatchPullRequest(std::list<PullRequest*>& pullRequestList)
+{
+}
+
+void RebalancePullImpl::messageQueueChanged(const std::string& topic,
+ std::set<MessageQueue>& mqAll,
+ std::set<MessageQueue>& mqDivided)
+{
+ MessageQueueListener* messageQueueListener =
+ m_pDefaultMQPullConsumerImpl->getDefaultMQPullConsumer()->getMessageQueueListener();
+ if (messageQueueListener != NULL)
+ {
+ try
+ {
+ messageQueueListener->messageQueueChanged(topic, mqAll, mqDivided);
+ }
+ catch (...)
+ {
+ RMQ_ERROR("messageQueueChanged exception, %s", topic.c_str());
+ }
+ }
+}
+
+bool RebalancePullImpl::removeUnnecessaryMessageQueue(MessageQueue& mq, ProcessQueue& pq)
+{
+ m_pDefaultMQPullConsumerImpl->getOffsetStore()->persist(mq);
+ m_pDefaultMQPullConsumerImpl->getOffsetStore()->removeOffset(mq);
+ return true;
+}
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/RebalancePullImpl.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RebalancePullImpl.h b/rocketmq-client4cpp/src/consumer/RebalancePullImpl.h
new file mode 100755
index 0000000..46dbcd1
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/RebalancePullImpl.h
@@ -0,0 +1,56 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __REBALANCEPULLIMPL_H__
+#define __REBALANCEPULLIMPL_H__
+
+#include "RebalanceImpl.h"
+
+namespace rmq
+{
+class DefaultMQPullConsumerImpl;
+
+class RebalancePullImpl : public RebalanceImpl
+{
+public:
+ RebalancePullImpl(DefaultMQPullConsumerImpl *pDefaultMQPullConsumerImpl);
+
+ RebalancePullImpl(const std::string &consumerGroup,
+ MessageModel messageModel,
+ AllocateMessageQueueStrategy *pAllocateMessageQueueStrategy,
+ MQClientFactory *pMQClientFactory,
+ DefaultMQPullConsumerImpl *pDefaultMQPullConsumerImpl);
+
+ long long computePullFromWhere(MessageQueue &mq);
+
+ void dispatchPullRequest(std::list<PullRequest *> &pullRequestList);
+
+ void messageQueueChanged(const std::string &topic,
+ std::set<MessageQueue> &mqAll,
+ std::set<MessageQueue> &mqDivided);
+
+ bool removeUnnecessaryMessageQueue(MessageQueue &mq, ProcessQueue &pq);
+
+ ConsumeType consumeType()
+ {
+ return CONSUME_ACTIVELY;
+ };
+
+private:
+ DefaultMQPullConsumerImpl *m_pDefaultMQPullConsumerImpl;
+};
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/RebalancePushImpl.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RebalancePushImpl.cpp b/rocketmq-client4cpp/src/consumer/RebalancePushImpl.cpp
new file mode 100755
index 0000000..fde770d
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/RebalancePushImpl.cpp
@@ -0,0 +1,217 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "RebalancePushImpl.h"
+
+#include <string.h>
+#include <limits.h>
+
+#include "DefaultMQPushConsumerImpl.h"
+#include "AllocateMessageQueueStrategy.h"
+#include "MQClientFactory.h"
+#include "MessageQueueListener.h"
+#include "OffsetStore.h"
+#include "DefaultMQPushConsumer.h"
+#include "MQAdminImpl.h"
+
+
+namespace rmq
+{
+
+RebalancePushImpl::RebalancePushImpl(DefaultMQPushConsumerImpl* pDefaultMQPushConsumerImpl)
+ : RebalanceImpl("", BROADCASTING, NULL, NULL),
+ m_pDefaultMQPushConsumerImpl(pDefaultMQPushConsumerImpl)
+{
+}
+
+RebalancePushImpl::RebalancePushImpl(const std::string& consumerGroup,
+ MessageModel messageModel,
+ AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy,
+ MQClientFactory* pMQClientFactory,
+ DefaultMQPushConsumerImpl* pDefaultMQPushConsumerImpl)
+ : RebalanceImpl(consumerGroup, messageModel, pAllocateMessageQueueStrategy, pMQClientFactory),
+ m_pDefaultMQPushConsumerImpl(pDefaultMQPushConsumerImpl)
+{
+}
+
+void RebalancePushImpl::dispatchPullRequest(std::list<PullRequest*>& pullRequestList)
+{
+ std::list<PullRequest*>::iterator it = pullRequestList.begin();
+ for (; it != pullRequestList.end(); it++)
+ {
+ m_pDefaultMQPushConsumerImpl->executePullRequestImmediately(*it);
+ RMQ_INFO("doRebalance, {%s}, add a new pull request {%s}",
+ m_consumerGroup.c_str(), (*it)->toString().c_str());
+ }
+}
+
+long long RebalancePushImpl::computePullFromWhere(MessageQueue& mq)
+{
+ long long result = -1;
+ ConsumeFromWhere consumeFromWhere =
+ m_pDefaultMQPushConsumerImpl->getDefaultMQPushConsumer()->getConsumeFromWhere();
+ OffsetStore* offsetStore = m_pDefaultMQPushConsumerImpl->getOffsetStore();
+
+ switch (consumeFromWhere)
+ {
+ case CONSUME_FROM_FIRST_OFFSET:
+ {
+ long long lastOffset = offsetStore->readOffset(mq, READ_FROM_STORE);
+ if (lastOffset >= 0)
+ {
+ result = lastOffset;
+ }
+ else if (-1 == lastOffset)
+ {
+ result = 0L;
+ }
+ else
+ {
+ result = -1;
+ }
+ break;
+ }
+ case CONSUME_FROM_LAST_OFFSET:
+ {
+ long long lastOffset = offsetStore->readOffset(mq, READ_FROM_STORE);
+ if (lastOffset >= 0)
+ {
+ result = lastOffset;
+ }
+ else if (-1 == lastOffset)
+ {
+ if (strncmp(MixAll::RETRY_GROUP_TOPIC_PREFIX.c_str(), mq.getTopic().c_str(), MixAll::RETRY_GROUP_TOPIC_PREFIX.size()) == 0)
+ {
+ result = 0L;
+ }
+ else
+ {
+ //result = LLONG_MAX;
+ try
+ {
+ result = m_pMQClientFactory->getMQAdminImpl()->maxOffset(mq);
+ }
+ catch(...)
+ {
+ result = -1;
+ }
+ }
+ }
+ else
+ {
+ result = -1;
+ }
+ break;
+ }
+
+ case CONSUME_FROM_MAX_OFFSET:
+ result = LLONG_MAX;
+ break;
+ case CONSUME_FROM_MIN_OFFSET:
+ result = 0L;
+ break;
+ case CONSUME_FROM_TIMESTAMP:
+ {
+ long long lastOffset = offsetStore->readOffset(mq, READ_FROM_STORE);
+ if (lastOffset >= 0)
+ {
+ result = lastOffset;
+ }
+ else if (-1 == lastOffset)
+ {
+ if (strncmp(MixAll::RETRY_GROUP_TOPIC_PREFIX.c_str(), mq.getTopic().c_str(), MixAll::RETRY_GROUP_TOPIC_PREFIX.size()) == 0)
+ {
+ //result = LLONG_MAX;
+ try
+ {
+ result = m_pMQClientFactory->getMQAdminImpl()->maxOffset(mq);
+ }
+ catch(...)
+ {
+ result = -1;
+ }
+ }
+ else
+ {
+ try
+ {
+ long timestamp = UtilAll::str2tm(
+ m_pDefaultMQPushConsumerImpl->getDefaultMQPushConsumer()->getConsumeTimestamp(),
+ rmq::yyyyMMddHHmmss);
+ result = m_pMQClientFactory->getMQAdminImpl()->searchOffset(mq, timestamp);
+ }
+ catch(...)
+ {
+ result = -1;
+ }
+ }
+ }
+ else
+ {
+ result = -1;
+ }
+ break;
+ }
+ break;
+ default:
+ break;
+ }
+
+ return result;
+}
+
+void RebalancePushImpl::messageQueueChanged(const std::string& topic,
+ std::set<MessageQueue>& mqAll,
+ std::set<MessageQueue>& mqDivided)
+{
+}
+
+
+bool RebalancePushImpl::removeUnnecessaryMessageQueue(MessageQueue& mq, ProcessQueue& pq)
+{
+ m_pDefaultMQPushConsumerImpl->getOffsetStore()->persist(mq);
+ m_pDefaultMQPushConsumerImpl->getOffsetStore()->removeOffset(mq);
+ if (m_pDefaultMQPushConsumerImpl->isConsumeOrderly()
+ && m_pDefaultMQPushConsumerImpl->messageModel() == CLUSTERING)
+ {
+ if (pq.getLockConsume().TryLock(1000))
+ {
+ try
+ {
+ this->unlock(mq, true);
+ }
+ catch (std::exception& e)
+ {
+ RMQ_ERROR("removeUnnecessaryMessageQueue Exception: %s", e.what());
+ }
+ pq.getLockConsume().Unlock();
+ }
+ else
+ {
+ RMQ_WARN("[WRONG]mq is consuming, so can not unlock it, MQ:%s, maybe hanged for a while, times:{%lld}",
+ mq.toString().c_str(),
+ pq.getTryUnlockTimes());
+
+ pq.incTryUnlockTimes();
+ }
+
+ return false;
+ }
+
+ return true;
+}
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/RebalancePushImpl.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RebalancePushImpl.h b/rocketmq-client4cpp/src/consumer/RebalancePushImpl.h
new file mode 100755
index 0000000..0aa2b0e
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/RebalancePushImpl.h
@@ -0,0 +1,55 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __REBALANCEPUSHIMPL_H__
+#define __REBALANCEPUSHIMPL_H__
+
+#include "RebalanceImpl.h"
+
+namespace rmq
+{
+class DefaultMQPushConsumerImpl;
+
+class RebalancePushImpl : public RebalanceImpl
+{
+public:
+ RebalancePushImpl(DefaultMQPushConsumerImpl *pDefaultMQPushConsumerImpl);
+
+ RebalancePushImpl(const std::string &consumerGroup,
+ MessageModel messageModel,
+ AllocateMessageQueueStrategy *pAllocateMessageQueueStrategy,
+ MQClientFactory *pMQClientFactory,
+ DefaultMQPushConsumerImpl *pDefaultMQPushConsumerImpl);
+
+ void dispatchPullRequest(std::list<PullRequest *> &pullRequestList);
+ long long computePullFromWhere(MessageQueue &mq);
+ void messageQueueChanged(const std::string &topic,
+ std::set<MessageQueue> &mqAll,
+ std::set<MessageQueue> &mqDivided);
+ bool removeUnnecessaryMessageQueue(MessageQueue &mq, ProcessQueue &pq);
+
+
+ ConsumeType consumeType()
+ {
+ return CONSUME_PASSIVELY;
+ };
+
+private:
+ DefaultMQPushConsumerImpl *m_pDefaultMQPushConsumerImpl;
+};
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/RebalanceService.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RebalanceService.cpp b/rocketmq-client4cpp/src/consumer/RebalanceService.cpp
new file mode 100644
index 0000000..013fefb
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/RebalanceService.cpp
@@ -0,0 +1,55 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "RebalanceService.h"
+#include "MQClientFactory.h"
+
+namespace rmq
+{
+
+long RebalanceService::s_WaitInterval = 1000 * 10;
+
+RebalanceService::RebalanceService(MQClientFactory* pMQClientFactory)
+ : ServiceThread("RebalanceService"),
+ m_pMQClientFactory(pMQClientFactory)
+{
+}
+
+
+RebalanceService::~RebalanceService()
+{
+
+}
+
+void RebalanceService::Run()
+{
+ RMQ_INFO("%s service started", getServiceName().c_str());
+
+ while (!m_stoped)
+ {
+ waitForRunning(s_WaitInterval);
+ m_pMQClientFactory->doRebalance();
+ }
+
+ RMQ_INFO("%s service end", getServiceName().c_str());
+}
+
+std::string RebalanceService::getServiceName()
+{
+ return "RebalanceService";
+}
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/RebalanceService.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RebalanceService.h b/rocketmq-client4cpp/src/consumer/RebalanceService.h
new file mode 100755
index 0000000..ef4d746
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/RebalanceService.h
@@ -0,0 +1,44 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __REBALANCESERVICE_H__
+#define __REBALANCESERVICE_H__
+
+#include "ServiceThread.h"
+
+namespace rmq
+{
+ class MQClientFactory;
+
+ /**
+ * Rebalance service
+ *
+ */
+ class RebalanceService : public ServiceThread
+ {
+ public:
+ RebalanceService(MQClientFactory* pMQClientFactory);
+ ~RebalanceService();
+
+ void Run();
+ std::string getServiceName();
+
+ private:
+ MQClientFactory* m_pMQClientFactory;
+ static long s_WaitInterval;
+ };
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.cpp b/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.cpp
new file mode 100755
index 0000000..1c4fd23
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.cpp
@@ -0,0 +1,266 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "RemoteBrokerOffsetStore.h"
+#include "MQClientFactory.h"
+#include "ScopedLock.h"
+#include "MQClientException.h"
+#include "CommandCustomHeader.h"
+#include "MQClientAPIImpl.h"
+
+namespace rmq
+{
+
+RemoteBrokerOffsetStore::RemoteBrokerOffsetStore(MQClientFactory* pMQClientFactory, const std::string& groupName)
+{
+ m_pMQClientFactory = pMQClientFactory;
+ m_groupName = groupName;
+}
+
+void RemoteBrokerOffsetStore::load()
+{
+
+}
+
+void RemoteBrokerOffsetStore::updateOffset(const MessageQueue& mq, long long offset, bool increaseOnly)
+{
+ kpr::ScopedWLock<kpr::RWMutex> lock(m_tableMutex);
+ typeof(m_offsetTable.begin()) it = m_offsetTable.find(mq);
+ if (it == m_offsetTable.end())
+ {
+ m_offsetTable[mq] = offset;
+ it = m_offsetTable.find(mq);
+ }
+
+ kpr::AtomicLong& offsetOld = it->second;
+ if (increaseOnly)
+ {
+ MixAll::compareAndIncreaseOnly(offsetOld, offset);
+ }
+ else
+ {
+ offsetOld.set(offset);
+ }
+}
+
+long long RemoteBrokerOffsetStore::readOffset(const MessageQueue& mq, ReadOffsetType type)
+{
+ RMQ_DEBUG("readOffset, MQ:%s, type:%d", mq.toString().c_str(), type);
+ switch (type)
+ {
+ case MEMORY_FIRST_THEN_STORE:
+ case READ_FROM_MEMORY:
+ {
+ kpr::ScopedRLock<kpr::RWMutex> lock(m_tableMutex);
+ typeof(m_offsetTable.begin()) it = m_offsetTable.find(mq);
+ if (it != m_offsetTable.end())
+ {
+ return it->second.get();
+ }
+ else if (READ_FROM_MEMORY == type)
+ {
+ RMQ_DEBUG("No offset in memory, MQ:%s", mq.toString().c_str());
+ return -1;
+ }
+ }
+ case READ_FROM_STORE:
+ {
+ try
+ {
+ long long brokerOffset = this->fetchConsumeOffsetFromBroker(mq);
+ RMQ_DEBUG("fetchConsumeOffsetFromBroker, MQ:%s, brokerOffset:%lld",
+ mq.toString().c_str(), brokerOffset);
+ if (brokerOffset >= 0)
+ {
+ this->updateOffset(mq, brokerOffset, false);
+ }
+ return brokerOffset;
+ }
+ // No offset in broker
+ catch (const MQBrokerException& e)
+ {
+ RMQ_WARN("No offset in broker, MQ:%s, exception:%s", mq.toString().c_str(), e.what());
+ return -1;
+ }
+ catch (const std::exception& e)
+ {
+ RMQ_ERROR("fetchConsumeOffsetFromBroker exception, MQ:%s, msg:%s",
+ mq.toString().c_str(), e.what());
+ return -2;
+ }
+ catch (...)
+ {
+ RMQ_ERROR("fetchConsumeOffsetFromBroker unknow exception, MQ:%s",
+ mq.toString().c_str());
+ return -2;
+ }
+ }
+ default:
+ break;
+ }
+
+ return -1;
+}
+
+void RemoteBrokerOffsetStore::persistAll(std::set<MessageQueue>& mqs)
+{
+ if (mqs.empty())
+ {
+ return;
+ }
+
+ std::set<MessageQueue> unusedMQ;
+ long long times = m_storeTimesTotal.fetchAndAdd(1);
+
+ kpr::ScopedRLock<kpr::RWMutex> lock(m_tableMutex);
+ for (typeof(m_offsetTable.begin()) it = m_offsetTable.begin();
+ it != m_offsetTable.end(); it++)
+ {
+ MessageQueue mq = it->first;
+ kpr::AtomicLong& offset = it->second;
+ if (mqs.find(mq) != mqs.end())
+ {
+ try
+ {
+ this->updateConsumeOffsetToBroker(mq, offset.get());
+ if ((times % 12) == 0)
+ {
+ RMQ_INFO("updateConsumeOffsetToBroker, Group: {%s} ClientId: {%s} mq:{%s} offset {%llu}",
+ m_groupName.c_str(),
+ m_pMQClientFactory->getClientId().c_str(),
+ mq.toString().c_str(),
+ offset.get());
+ }
+ }
+ catch (...)
+ {
+ RMQ_ERROR("updateConsumeOffsetToBroker exception, mq=%s", mq.toString().c_str());
+ }
+ }
+ else
+ {
+ unusedMQ.insert(mq);
+ }
+ }
+
+ if (!unusedMQ.empty())
+ {
+ for (typeof(unusedMQ.begin()) it = unusedMQ.begin(); it != unusedMQ.end(); it++)
+ {
+ m_offsetTable.erase(*it);
+ RMQ_INFO("remove unused mq, %s, %s", it->toString().c_str(), m_groupName.c_str());
+ }
+ }
+}
+
+void RemoteBrokerOffsetStore::persist(const MessageQueue& mq)
+{
+ kpr::ScopedRLock<kpr::RWMutex> lock(m_tableMutex);
+ typeof(m_offsetTable.begin()) it = m_offsetTable.find(mq);
+ if (it != m_offsetTable.end())
+ {
+ try
+ {
+ this->updateConsumeOffsetToBroker(mq, it->second.get());
+ RMQ_DEBUG("updateConsumeOffsetToBroker ok, mq=%s, offset=%lld", mq.toString().c_str(), it->second.get());
+ }
+ catch (...)
+ {
+ RMQ_ERROR("updateConsumeOffsetToBroker exception, mq=%s", mq.toString().c_str());
+ }
+ }
+}
+
+void RemoteBrokerOffsetStore::removeOffset(const MessageQueue& mq)
+{
+ kpr::ScopedWLock<kpr::RWMutex> lock(m_tableMutex);
+ m_offsetTable.erase(mq);
+ RMQ_INFO("remove unnecessary messageQueue offset. mq=%s, offsetTableSize=%u",
+ mq.toString().c_str(), (unsigned)m_offsetTable.size());
+}
+
+
+std::map<MessageQueue, long long> RemoteBrokerOffsetStore::cloneOffsetTable(const std::string& topic)
+{
+ kpr::ScopedRLock<kpr::RWMutex> lock(m_tableMutex);
+ std::map<MessageQueue, long long> cloneOffsetTable;
+ RMQ_FOR_EACH(m_offsetTable, it)
+ {
+ MessageQueue mq = it->first;
+ kpr::AtomicLong& offset = it->second;
+ if (topic == mq.getTopic())
+ {
+ cloneOffsetTable[mq] = offset.get();
+ }
+ }
+
+ return cloneOffsetTable;
+}
+
+
+void RemoteBrokerOffsetStore::updateConsumeOffsetToBroker(const MessageQueue& mq, long long offset)
+{
+ FindBrokerResult findBrokerResult = m_pMQClientFactory->findBrokerAddressInAdmin(mq.getBrokerName());
+ if (findBrokerResult.brokerAddr.empty())
+ {
+ m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic());
+ findBrokerResult = m_pMQClientFactory->findBrokerAddressInAdmin(mq.getBrokerName());
+ }
+
+ if (!findBrokerResult.brokerAddr.empty())
+ {
+ UpdateConsumerOffsetRequestHeader* requestHeader = new UpdateConsumerOffsetRequestHeader();
+ requestHeader->topic = mq.getTopic();
+ requestHeader->consumerGroup = this->m_groupName;
+ requestHeader->queueId = mq.getQueueId();
+ requestHeader->commitOffset = offset;
+
+ m_pMQClientFactory->getMQClientAPIImpl()->updateConsumerOffsetOneway(
+ findBrokerResult.brokerAddr, requestHeader, 1000 * 5);
+ }
+ else
+ {
+ THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1);
+ }
+}
+
+long long RemoteBrokerOffsetStore::fetchConsumeOffsetFromBroker(const MessageQueue& mq)
+{
+ FindBrokerResult findBrokerResult = m_pMQClientFactory->findBrokerAddressInAdmin(mq.getBrokerName());
+ if (findBrokerResult.brokerAddr.empty())
+ {
+ // TODO Here may be heavily overhead for Name Server,need tuning
+ m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic());
+ findBrokerResult = m_pMQClientFactory->findBrokerAddressInAdmin(mq.getBrokerName());
+ }
+
+ if (!findBrokerResult.brokerAddr.empty())
+ {
+ QueryConsumerOffsetRequestHeader* requestHeader = new QueryConsumerOffsetRequestHeader();
+ requestHeader->topic = mq.getTopic();
+ requestHeader->consumerGroup = this->m_groupName;
+ requestHeader->queueId = mq.getQueueId();
+
+ return m_pMQClientFactory->getMQClientAPIImpl()->queryConsumerOffset(
+ findBrokerResult.brokerAddr, requestHeader, 1000 * 5);
+ }
+ else
+ {
+ THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1);
+ }
+}
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.h b/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.h
new file mode 100755
index 0000000..b613084
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/RemoteBrokerOffsetStore.h
@@ -0,0 +1,61 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __REMOTEBROKEROFFSETSTORE_H__
+#define __REMOTEBROKEROFFSETSTORE_H__
+
+#include "OffsetStore.h"
+#include <map>
+#include <string>
+#include <set>
+#include "MessageQueue.h"
+#include "AtomicValue.h"
+#include "Mutex.h"
+
+namespace rmq
+{
+ class MQClientFactory;
+
+ /**
+ * offset remote store
+ *
+ */
+ class RemoteBrokerOffsetStore : public OffsetStore
+ {
+ public:
+ RemoteBrokerOffsetStore(MQClientFactory* pMQClientFactory, const std::string& groupName) ;
+
+ void load();
+ void updateOffset(const MessageQueue& mq, long long offset, bool increaseOnly);
+ long long readOffset(const MessageQueue& mq, ReadOffsetType type);
+ void persistAll(std::set<MessageQueue>& mqs);
+ void persist(const MessageQueue& mq);
+ void removeOffset(const MessageQueue& mq) ;
+ std::map<MessageQueue, long long> cloneOffsetTable(const std::string& topic);
+
+ private:
+ void updateConsumeOffsetToBroker(const MessageQueue& mq, long long offset);
+ long long fetchConsumeOffsetFromBroker(const MessageQueue& mq);
+
+ private:
+ MQClientFactory* m_pMQClientFactory;
+ std::string m_groupName;
+ kpr::AtomicInteger m_storeTimesTotal;
+ std::map<MessageQueue, kpr::AtomicLong> m_offsetTable;
+ kpr::RWMutex m_tableMutex;
+ };
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/SubscriptionData.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/SubscriptionData.cpp b/rocketmq-client4cpp/src/consumer/SubscriptionData.cpp
new file mode 100755
index 0000000..ed5cf12
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/SubscriptionData.cpp
@@ -0,0 +1,201 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "SubscriptionData.h"
+
+#include <sstream>
+#include "KPRUtil.h"
+#include "UtilAll.h"
+
+namespace rmq
+{
+
+std::string SubscriptionData::SUB_ALL = "*";
+
+SubscriptionData::SubscriptionData()
+{
+ m_subVersion = KPRUtil::GetCurrentTimeMillis();
+}
+
+SubscriptionData::SubscriptionData(const std::string& topic, const std::string& subString)
+ : m_topic(topic),
+ m_subString(subString)
+{
+ m_subVersion = KPRUtil::GetCurrentTimeMillis();
+}
+
+std::string SubscriptionData::getTopic()const
+{
+ return m_topic;
+}
+
+void SubscriptionData::setTopic(const std::string& topic)
+{
+ m_topic = topic;
+}
+
+std::string SubscriptionData::getSubString()
+{
+ return m_subString;
+}
+
+void SubscriptionData::setSubString(const std::string& subString)
+{
+ m_subString = subString;
+}
+
+std::set<std::string>& SubscriptionData::getTagsSet()
+{
+ return m_tagsSet;
+}
+
+void SubscriptionData::setTagsSet(const std::set<std::string>& tagsSet)
+{
+ m_tagsSet = tagsSet;
+}
+
+long long SubscriptionData::getSubVersion()
+{
+ return m_subVersion;
+}
+
+void SubscriptionData::setSubVersion(long long subVersion)
+{
+ m_subVersion = subVersion;
+}
+
+std::set<int>& SubscriptionData::getCodeSet()
+{
+ return m_codeSet;
+}
+
+void SubscriptionData::setCodeSet(const std::set<int>& codeSet)
+{
+ m_codeSet = codeSet;
+}
+
+int SubscriptionData::hashCode()
+{
+ /*
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + (classFilterMode ? 1231 : 1237);
+ result = prime * result + ((codeSet == null) ? 0 : codeSet.hashCode());
+ result = prime * result + ((subString == null) ? 0 : subString.hashCode());
+ result = prime * result + ((tagsSet == null) ? 0 : tagsSet.hashCode());
+ result = prime * result + ((topic == null) ? 0 : topic.hashCode());
+ return result;
+ */
+ std::stringstream ss;
+ ss << UtilAll::hashCode(m_codeSet)
+ << m_subString
+ << UtilAll::hashCode(m_tagsSet)
+ << m_topic;
+ return UtilAll::hashCode(ss.str());
+}
+
+
+
+
+bool SubscriptionData::operator==(const SubscriptionData& other)
+{
+ if (m_codeSet != other.m_codeSet)
+ {
+ return false;
+ }
+
+ if (m_subString != other.m_subString)
+ {
+ return false;
+ }
+
+ if (m_subVersion != other.m_subVersion)
+ {
+ return false;
+ }
+
+ if (m_tagsSet != other.m_tagsSet)
+ {
+ return false;
+ }
+
+ if (m_topic != other.m_topic)
+ {
+ return false;
+ }
+
+ return true;
+}
+
+bool SubscriptionData::operator<(const SubscriptionData& other)const
+{
+ if (m_topic < other.m_topic)
+ {
+ return true;
+ }
+ else if (m_topic == other.m_topic)
+ {
+ if (m_subString < other.m_subString)
+ {
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+ else
+ {
+ return false;
+ }
+}
+
+void SubscriptionData::toJson(Json::Value& obj) const
+{
+ obj["classFilterMode"] = false;
+ obj["topic"] = m_topic;
+ obj["subString"] = m_subString;
+ obj["subVersion"] = (long long)m_subVersion;
+
+ Json::Value tagSet(Json::arrayValue);
+ RMQ_FOR_EACH(m_tagsSet, it)
+ {
+ tagSet.append(*it);
+ }
+ obj["tagsSet"] = tagSet;
+
+ Json::Value codeSet(Json::arrayValue);
+ RMQ_FOR_EACH(m_codeSet, it)
+ {
+ codeSet.append(*it);
+ }
+ obj["codeSet"] = codeSet;
+}
+
+std::string SubscriptionData::toString() const
+{
+ std::stringstream ss;
+ ss << "{classFilterMode=" << false
+ << ",topic=" << m_topic
+ << ",subString=" << m_subString
+ << ",subVersion=" << m_subVersion
+ << ",tagsSet=" << UtilAll::toString(m_tagsSet)
+ << ",codeSet=" << UtilAll::toString(m_codeSet)
+ << "}";
+ return ss.str();
+}
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/SubscriptionData.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/SubscriptionData.h b/rocketmq-client4cpp/src/consumer/SubscriptionData.h
new file mode 100755
index 0000000..4796fb7
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/SubscriptionData.h
@@ -0,0 +1,76 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __SUBSCRIPTIONDATA_H__
+#define __SUBSCRIPTIONDATA_H__
+
+#include <string>
+#include <set>
+
+#include "RocketMQClient.h"
+#include "RemotingSerializable.h"
+#include "RefHandle.h"
+#include "json/json.h"
+
+namespace rmq
+{
+ class SubscriptionData : public kpr::RefCount
+ {
+ public:
+ SubscriptionData();
+ SubscriptionData(const std::string& topic, const std::string& subString);
+
+ std::string getTopic()const;
+ void setTopic(const std::string& topic);
+
+ std::string getSubString();
+ void setSubString(const std::string& subString);
+
+ std::set<std::string>& getTagsSet();
+ void setTagsSet(const std::set<std::string>& tagsSet);
+
+ long long getSubVersion();
+ void setSubVersion(long long subVersion);
+
+ std::set<int>& getCodeSet();
+ void setCodeSet(const std::set<int>& codeSet);
+
+ int hashCode();
+ void toJson(Json::Value& obj) const;
+ std::string toString() const;
+
+ bool operator==(const SubscriptionData& other);
+ bool operator<(const SubscriptionData& other)const;
+
+ public:
+ static std::string SUB_ALL;
+
+ private:
+ std::string m_topic;
+ std::string m_subString;
+ std::set<std::string> m_tagsSet;
+ std::set<int> m_codeSet;
+ long long m_subVersion ;
+ };
+ typedef kpr::RefHandleT<SubscriptionData> SubscriptionDataPtr;
+
+ inline std::ostream& operator<<(std::ostream& os, const SubscriptionData& obj)
+ {
+ os << obj.toString();
+ return os;
+ }
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/jsoncpp/AUTHORS
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/jsoncpp/AUTHORS b/rocketmq-client4cpp/src/jsoncpp/AUTHORS
new file mode 100755
index 0000000..e69de29
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/jsoncpp/LICENSE
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/jsoncpp/LICENSE b/rocketmq-client4cpp/src/jsoncpp/LICENSE
new file mode 100755
index 0000000..403d096
--- /dev/null
+++ b/rocketmq-client4cpp/src/jsoncpp/LICENSE
@@ -0,0 +1 @@
+The json-cpp library and this documentation are in Public Domain.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/jsoncpp/README.txt
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/jsoncpp/README.txt b/rocketmq-client4cpp/src/jsoncpp/README.txt
new file mode 100755
index 0000000..379d376
--- /dev/null
+++ b/rocketmq-client4cpp/src/jsoncpp/README.txt
@@ -0,0 +1,117 @@
+* Introduction:
+ =============
+
+JSON (JavaScript Object Notation) is a lightweight data-interchange format.
+It can represent integer, real number, string, an ordered sequence of
+value, and a collection of name/value pairs.
+
+JsonCpp is a simple API to manipulate JSON value, handle serialization
+and unserialization to string.
+
+It can also preserve existing comment in unserialization/serialization steps,
+making it a convenient format to store user input files.
+
+Unserialization parsing is user friendly and provides precise error reports.
+
+
+* Building/Testing:
+ =================
+
+JsonCpp uses Scons (http://www.scons.org) as a build system. Scons requires
+python to be installed (http://www.python.org).
+
+You download scons-local distribution from the following url:
+http://sourceforge.net/project/showfiles.php?group_id=30337&package_id=67375
+
+Unzip it in the directory where you found this README file. scons.py Should be
+at the same level as README.
+
+python scons.py platform=PLTFRM [TARGET]
+where PLTFRM may be one of:
+ suncc Sun C++ (Solaris)
+ vacpp Visual Age C++ (AIX)
+ mingw
+ msvc6 Microsoft Visual Studio 6 service pack 5-6
+ msvc70 Microsoft Visual Studio 2002
+ msvc71 Microsoft Visual Studio 2003
+ msvc80 Microsoft Visual Studio 2005
+ linux-gcc Gnu C++ (linux, also reported to work for Mac OS X)
+
+adding platform is fairly simple. You need to change the Sconstruct file
+to do so.
+
+and TARGET may be:
+ check: build library and run unit tests.
+
+
+* Running the test manually:
+ ==========================
+
+cd test
+# This will run the Reader/Writer tests
+python runjsontests.py "path to jsontest.exe"
+
+# This will run the Reader/Writer tests, using JSONChecker test suite
+# (http://www.json.org/JSON_checker/).
+# Notes: not all tests pass: JsonCpp is too lenient (for example,
+# it allows an integer to start with '0'). The goal is to improve
+# strict mode parsing to get all tests to pass.
+python runjsontests.py --with-json-checker "path to jsontest.exe"
+
+# This will run the unit tests (mostly Value)
+python rununittests.py "path to test_lib_json.exe"
+
+You can run the tests using valgrind:
+python rununittests.py --valgrind "path to test_lib_json.exe"
+
+
+* Building the documentation:
+ ===========================
+
+Run the python script doxybuild.py from the top directory:
+
+python doxybuild.py --open --with-dot
+
+See doxybuild.py --help for options.
+
+
+* Adding a reader/writer test:
+ ============================
+
+To add a test, you need to create two files in test/data:
+- a TESTNAME.json file, that contains the input document in JSON format.
+- a TESTNAME.expected file, that contains a flatened representation of
+ the input document.
+
+TESTNAME.expected file format:
+- each line represents a JSON element of the element tree represented
+ by the input document.
+- each line has two parts: the path to access the element separated from
+ the element value by '='. Array and object values are always empty
+ (e.g. represented by either [] or {}).
+- element path: '.' represented the root element, and is used to separate
+ object members. [N] is used to specify the value of an array element
+ at index N.
+See test_complex_01.json and test_complex_01.expected to better understand
+element path.
+
+
+* Understanding reader/writer test output:
+ ========================================
+
+When a test is run, output files are generated aside the input test files.
+Below is a short description of the content of each file:
+
+- test_complex_01.json: input JSON document
+- test_complex_01.expected: flattened JSON element tree used to check if
+ parsing was corrected.
+
+- test_complex_01.actual: flattened JSON element tree produced by
+ jsontest.exe from reading test_complex_01.json
+- test_complex_01.rewrite: JSON document written by jsontest.exe using the
+ Json::Value parsed from test_complex_01.json and serialized using
+ Json::StyledWritter.
+- test_complex_01.actual-rewrite: flattened JSON element tree produced by
+ jsontest.exe from reading test_complex_01.rewrite.
+test_complex_01.process-output: jsontest.exe output, typically useful to
+ understand parsing error.
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/jsoncpp/json/allocator.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/jsoncpp/json/allocator.h b/rocketmq-client4cpp/src/jsoncpp/json/allocator.h
new file mode 100755
index 0000000..1235a3e
--- /dev/null
+++ b/rocketmq-client4cpp/src/jsoncpp/json/allocator.h
@@ -0,0 +1,96 @@
+// Copyright 2007-2010 Baptiste Lepilleur
+// Distributed under MIT license, or public domain if desired and
+// recognized in your jurisdiction.
+// See file LICENSE for detail or copy at http://jsoncpp.sourceforge.net/LICENSE
+
+#ifndef CPPTL_JSON_ALLOCATOR_H_INCLUDED
+#define CPPTL_JSON_ALLOCATOR_H_INCLUDED
+
+#include <cstring>
+#include <memory>
+
+namespace rmq {
+namespace Json {
+template<typename T>
+class SecureAllocator {
+ public:
+ // Type definitions
+ using value_type = T;
+ using pointer = T*;
+ using const_pointer = const T*;
+ using reference = T&;
+ using const_reference = const T&;
+ using size_type = std::size_t;
+ using difference_type = std::ptrdiff_t;
+
+ /**
+ * Allocate memory for N items using the standard allocator.
+ */
+ pointer allocate(size_type n) {
+ // allocate using "global operator new"
+ return static_cast<pointer>(::operator new(n * sizeof(T)));
+ }
+
+ /**
+ * Release memory which was allocated for N items at pointer P.
+ *
+ * The memory block is filled with zeroes before being released.
+ * The pointer argument is tagged as "volatile" to prevent the
+ * compiler optimizing out this critical step.
+ */
+ void deallocate(volatile pointer p, size_type n) {
+ std::memset(p, 0, n * sizeof(T));
+ // free using "global operator delete"
+ ::operator delete(p);
+ }
+
+ /**
+ * Construct an item in-place at pointer P.
+ */
+ template<typename... Args>
+ void construct(pointer p, Args&&... args) {
+ // construct using "placement new" and "perfect forwarding"
+ ::new (static_cast<void*>(p)) T(std::forward<Args>(args)...);
+ }
+
+ size_type max_size() const {
+ return size_t(-1) / sizeof(T);
+ }
+
+ pointer address( reference x ) const {
+ return std::addressof(x);
+ }
+
+ const_pointer address( const_reference x ) const {
+ return std::addressof(x);
+ }
+
+ /**
+ * Destroy an item in-place at pointer P.
+ */
+ void destroy(pointer p) {
+ // destroy using "explicit destructor"
+ p->~T();
+ }
+
+ // Boilerplate
+ SecureAllocator() {}
+ template<typename U> SecureAllocator(const SecureAllocator<U>&) {}
+ template<typename U> struct rebind { using other = SecureAllocator<U>; };
+};
+
+
+template<typename T, typename U>
+bool operator==(const SecureAllocator<T>&, const SecureAllocator<U>&) {
+ return true;
+}
+
+template<typename T, typename U>
+bool operator!=(const SecureAllocator<T>&, const SecureAllocator<U>&) {
+ return false;
+}
+
+} //namespace Json
+} //namespace rmq
+
+#endif // CPPTL_JSON_ALLOCATOR_H_INCLUDED
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/jsoncpp/json/assertions.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/jsoncpp/json/assertions.h b/rocketmq-client4cpp/src/jsoncpp/json/assertions.h
new file mode 100755
index 0000000..dc67b27
--- /dev/null
+++ b/rocketmq-client4cpp/src/jsoncpp/json/assertions.h
@@ -0,0 +1,54 @@
+// Copyright 2007-2010 Baptiste Lepilleur
+// Distributed under MIT license, or public domain if desired and
+// recognized in your jurisdiction.
+// See file LICENSE for detail or copy at http://jsoncpp.sourceforge.net/LICENSE
+
+#ifndef CPPTL_JSON_ASSERTIONS_H_INCLUDED
+#define CPPTL_JSON_ASSERTIONS_H_INCLUDED
+
+#include <stdlib.h>
+#include <sstream>
+
+#if !defined(JSON_IS_AMALGAMATION)
+#include "config.h"
+#endif // if !defined(JSON_IS_AMALGAMATION)
+
+/** It should not be possible for a maliciously designed file to
+ * cause an abort() or seg-fault, so these macros are used only
+ * for pre-condition violations and internal logic errors.
+ */
+#if JSON_USE_EXCEPTION
+
+// @todo <= add detail about condition in exception
+# define JSON_ASSERT(condition) \
+ {if (!(condition)) {rmq::Json::throwLogicError( "assert json failed" );}}
+
+# define JSON_FAIL_MESSAGE(message) \
+ { \
+ JSONCPP_OSTRINGSTREAM oss; oss << message; \
+ rmq::Json::throwLogicError(oss.str()); \
+ abort(); \
+ }
+
+#else // JSON_USE_EXCEPTION
+
+# define JSON_ASSERT(condition) assert(condition)
+
+// The call to assert() will show the failure message in debug builds. In
+// release builds we abort, for a core-dump or debugger.
+# define JSON_FAIL_MESSAGE(message) \
+ { \
+ JSONCPP_OSTRINGSTREAM oss; oss << message; \
+ assert(false && oss.str().c_str()); \
+ abort(); \
+ }
+
+
+#endif
+
+#define JSON_ASSERT_MESSAGE(condition, message) \
+ if (!(condition)) { \
+ JSON_FAIL_MESSAGE(message); \
+ }
+
+#endif // CPPTL_JSON_ASSERTIONS_H_INCLUDED
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/jsoncpp/json/autolink.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/jsoncpp/json/autolink.h b/rocketmq-client4cpp/src/jsoncpp/json/autolink.h
new file mode 100644
index 0000000..6fcc8af
--- /dev/null
+++ b/rocketmq-client4cpp/src/jsoncpp/json/autolink.h
@@ -0,0 +1,25 @@
+// Copyright 2007-2010 Baptiste Lepilleur
+// Distributed under MIT license, or public domain if desired and
+// recognized in your jurisdiction.
+// See file LICENSE for detail or copy at http://jsoncpp.sourceforge.net/LICENSE
+
+#ifndef JSON_AUTOLINK_H_INCLUDED
+#define JSON_AUTOLINK_H_INCLUDED
+
+#include "config.h"
+
+#ifdef JSON_IN_CPPTL
+#include <cpptl/cpptl_autolink.h>
+#endif
+
+#if !defined(JSON_NO_AUTOLINK) && !defined(JSON_DLL_BUILD) && \
+ !defined(JSON_IN_CPPTL)
+#define CPPTL_AUTOLINK_NAME "json"
+#undef CPPTL_AUTOLINK_DLL
+#ifdef JSON_DLL
+#define CPPTL_AUTOLINK_DLL
+#endif
+#include "autolink.h"
+#endif
+
+#endif // JSON_AUTOLINK_H_INCLUDED
|