rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dongefore...@apache.org
Subject [11/17] incubator-rocketmq-externals git commit: [ROCKETMQ-129] Initialized the rocketmq c++ client closes apache/incubator-rocketmq-externals#11
Date Fri, 21 Apr 2017 10:09:51 GMT
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumerImpl.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumerImpl.cpp b/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumerImpl.cpp
new file mode 100755
index 0000000..8d7f8a1
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumerImpl.cpp
@@ -0,0 +1,1018 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "DefaultMQPushConsumerImpl.h"
+
+#include <string>
+#include <set>
+#include "DefaultMQPushConsumer.h"
+#include "ConsumerStatManage.h"
+#include "DefaultMQPullConsumer.h"
+#include "DefaultMQProducer.h"
+#include "MQClientFactory.h"
+#include "MQAdminImpl.h"
+#include "RebalancePushImpl.h"
+#include "MQClientAPIImpl.h"
+#include "OffsetStore.h"
+#include "MixAll.h"
+#include "MQClientManager.h"
+#include "LocalFileOffsetStore.h"
+#include "RemoteBrokerOffsetStore.h"
+#include "PullSysFlag.h"
+#include "FilterAPI.h"
+#include "PullAPIWrapper.h"
+#include "MQClientException.h"
+#include "Validators.h"
+#include "MessageListener.h"
+#include "ConsumeMessageHook.h"
+#include "PullMessageService.h"
+#include "ConsumeMessageOrderlyService.h"
+#include "ConsumeMessageConcurrentlyService.h"
+#include "KPRUtil.h"
+#include "TimerThread.h"
+
+namespace rmq
+{
+
+/**
+ * RemoveProcessQueueLater
+ *
+ * Timer handler used for deferred clean-up of a pull request. When the timer
+ * fires it stores the request's next offset in the consumer's offset store,
+ * persists that queue's offset, removes the queue's process queue through the
+ * rebalance implementation, and finally destroys itself.
+ * The handler must therefore be heap-allocated and must not be touched after
+ * OnTimeOut() has run. Neither pointer passed to the constructor is deleted
+ * by this class.
+ */
+class RemoveProcessQueueLater : public kpr::TimerHandler
+{
+public:
+    RemoveProcessQueueLater(DefaultMQPushConsumerImpl* pConsumerImp, PullRequest* pPullRequest)
+        : m_pConsumerImp(pConsumerImp), m_pPullRequest(pPullRequest)
+    {
+    }
+    /** Timer callback; timerID is not used. Exceptions are logged and swallowed. */
+    void OnTimeOut(unsigned int timerID)
+    {
+    	try
+    	{
+            m_pConsumerImp->getOffsetStore()->updateOffset(m_pPullRequest->getMessageQueue(), m_pPullRequest->getNextOffset(), false);
+            m_pConsumerImp->getOffsetStore()->persist(m_pPullRequest->getMessageQueue());
+            m_pConsumerImp->getRebalanceImpl()->removeProcessQueue(m_pPullRequest->getMessageQueue());
+
+            RMQ_WARN("fix the pull request offset, {%s}", m_pPullRequest->toString().c_str());
+        }
+        catch(...)
+        {
+        	RMQ_ERROR("RemoveProcessQueueLater OnTimeOut Exception");
+        }
+
+        delete this; // one-shot handler: runs on both the success and the exception path
+    }
+
+private:
+	DefaultMQPushConsumerImpl* m_pConsumerImp; // consumer whose offset store / rebalance impl are used
+    PullRequest* m_pPullRequest;               // request whose queue is being retired
+};
+
+
+/**
+ * DefaultMQPushConsumerImplCallback
+ *
+ * PullCallback created by DefaultMQPushConsumerImpl::pullMessage() for one
+ * asynchronous pull. It keeps its own copy of the subscription data, plus
+ * non-owning pointers to the consumer and to the pull request, and records
+ * the creation time so the pull round-trip time can be accumulated.
+ *
+ * onSuccess() post-processes the broker answer and then either hands the
+ * messages to the consume service and re-queues the pull request, or (for an
+ * illegal offset) drops the process queue and schedules its removal.
+ * onException() re-queues the pull request after the exception delay.
+ */
+class DefaultMQPushConsumerImplCallback : public PullCallback
+{
+public:
+    /**
+     * @param subscriptionData            subscription of the pulled topic (copied)
+     * @param pDefaultMQPushConsumerImpl  consumer that issued the pull
+     * @param pPullRequest                pull request this callback answers
+     */
+    DefaultMQPushConsumerImplCallback(SubscriptionData& subscriptionData,
+        DefaultMQPushConsumerImpl* pDefaultMQPushConsumerImpl,
+        PullRequest* pPullRequest)
+        : m_subscriptionData(subscriptionData),
+          m_pDefaultMQPushConsumerImpl(pDefaultMQPushConsumerImpl),
+          m_pPullRequest(pPullRequest)
+    {
+        m_beginTimestamp = KPRUtil::GetCurrentTimeMillis();
+    }
+
+    /**
+     * Handles a successful pull.
+     *
+     * pullResult is a reference and can never be null, so the former
+     * "if (&pullResult != NULL) ... else" wrapper (whose else branch was
+     * unreachable) has been removed.
+     *
+     * NOTE(review): the pointer returned by processPullResult() is used without
+     * a null check and is not deleted here; its ownership is not visible from
+     * this file - confirm against PullAPIWrapper.
+     */
+    void onSuccess(PullResult& pullResult)
+    {
+        RMQ_DEBUG("onSuccess begin: %s", pullResult.toString().c_str());
+
+        PullResult* pPullResult =
+            m_pDefaultMQPushConsumerImpl->m_pPullAPIWrapper->processPullResult(
+                m_pPullRequest->getMessageQueue(), pullResult, m_subscriptionData);
+
+        switch (pPullResult->pullStatus)
+        {
+            case FOUND:
+            {
+                m_pPullRequest->setNextOffset(pPullResult->nextBeginOffset);
+
+                // Accumulate pull statistics: number of pulls and total round-trip time.
+                long long pullRT = KPRUtil::GetCurrentTimeMillis() - m_beginTimestamp;
+                m_pDefaultMQPushConsumerImpl->getConsumerStatManager()->getConsumertat()
+                .pullTimesTotal++;
+                m_pDefaultMQPushConsumerImpl->getConsumerStatManager()->getConsumertat()
+                .pullRTTotal.fetchAndAdd(pullRT);
+
+                // Buffer the messages in the process queue, then submit them for consumption.
+                ProcessQueue* processQueue = m_pPullRequest->getProcessQueue();
+                bool dispatchToConsume = processQueue->putMessage(pPullResult->msgFoundList);
+
+                m_pDefaultMQPushConsumerImpl->m_pConsumeMessageService->submitConsumeRequest(
+                    pPullResult->msgFoundList,
+                    processQueue,
+                    m_pPullRequest->getMessageQueue(),
+                    dispatchToConsume);
+
+                // Re-queue the pull request, honouring the configured pull interval if any.
+                if (m_pDefaultMQPushConsumerImpl->m_pDefaultMQPushConsumer->getPullInterval() > 0)
+                {
+                    m_pDefaultMQPushConsumerImpl->executePullRequestLater(m_pPullRequest,
+                            m_pDefaultMQPushConsumerImpl->m_pDefaultMQPushConsumer->getPullInterval());
+                }
+                else
+                {
+                    m_pDefaultMQPushConsumerImpl->executePullRequestImmediately(m_pPullRequest);
+                }
+            }
+            break;
+            case NO_NEW_MSG:
+            case NO_MATCHED_MSG:
+                // Both statuses are handled identically: advance the offset,
+                // let the consumer correct the stored offset, and pull again.
+                m_pPullRequest->setNextOffset(pPullResult->nextBeginOffset);
+                m_pDefaultMQPushConsumerImpl->correctTagsOffset(*m_pPullRequest);
+                m_pDefaultMQPushConsumerImpl->executePullRequestImmediately(m_pPullRequest);
+                break;
+            case OFFSET_ILLEGAL:
+                RMQ_WARN("the pull request offset illegal, %s, %s",
+                         m_pPullRequest->toString().c_str(), pPullResult->toString().c_str());
+
+                /*
+                 * Disabled alternative kept from the original source: clamp the
+                 * next offset into [minOffset, maxOffset] and keep pulling.
+                 *
+                if (m_pPullRequest->getNextOffset() < pPullResult->minOffset)
+                {
+                    m_pPullRequest->setNextOffset(pPullResult->minOffset);
+                }
+                else if (m_pPullRequest->getNextOffset() > pPullResult->maxOffset)
+                {
+                    m_pPullRequest->setNextOffset(pPullResult->maxOffset);
+                }
+                m_pDefaultMQPushConsumerImpl->m_pOffsetStore->updateOffset(
+                    m_pPullRequest->getMessageQueue(), m_pPullRequest->getNextOffset(), false);
+                m_pDefaultMQPushConsumerImpl->executePullRequestImmediately(m_pPullRequest);
+                */
+
+                // todo
+                // Mark the queue dropped now and schedule the offset fix-up and
+                // process-queue removal with a delay value of 10000. The pull
+                // request is not re-queued on this path.
+                m_pPullRequest->setNextOffset(pPullResult->nextBeginOffset);
+                m_pPullRequest->getProcessQueue()->setDropped(true);
+
+                m_pDefaultMQPushConsumerImpl->executeTaskLater(new RemoveProcessQueueLater(
+                    m_pDefaultMQPushConsumerImpl, m_pPullRequest), 10000);
+                break;
+            default:
+                break;
+        }
+
+        RMQ_DEBUG("onSuccess end");
+    }
+
+    /**
+     * Handles a failed pull: re-queues the pull request after
+     * s_PullTimeDelayMillsWhenException. The warning is only logged when the
+     * topic name contains the retry-group prefix (behaviour kept as in the
+     * original).
+     */
+    void onException(MQException& e)
+    {
+        std::string topic = m_pPullRequest->getMessageQueue().getTopic();
+        if (topic.find(MixAll::RETRY_GROUP_TOPIC_PREFIX) != std::string::npos)
+        {
+            RMQ_WARN("execute the pull request exception:%s", e.what());
+        }
+
+        m_pDefaultMQPushConsumerImpl->executePullRequestLater(m_pPullRequest,
+                DefaultMQPushConsumerImpl::s_PullTimeDelayMillsWhenException);
+    }
+
+private:
+    SubscriptionData m_subscriptionData;                      // private copy taken at construction
+    DefaultMQPushConsumerImpl* m_pDefaultMQPushConsumerImpl;  // not deleted by this class
+    PullRequest* m_pPullRequest;                              // not deleted by this class
+    unsigned long long m_beginTimestamp;                      // KPRUtil::GetCurrentTimeMillis() at construction
+};
+
+/**
+ * Creates the implementation object behind a DefaultMQPushConsumer.
+ * The service state starts as CREATE_JUST. Only the rebalance implementation
+ * and the statistics manager are allocated here; every other collaborator
+ * pointer is left NULL.
+ *
+ * @param pDefaultMQPushConsumer  the public consumer object whose settings are read later
+ */
+DefaultMQPushConsumerImpl::DefaultMQPushConsumerImpl(DefaultMQPushConsumer* pDefaultMQPushConsumer)
+{
+    m_pDefaultMQPushConsumer = pDefaultMQPushConsumer;
+    m_serviceState = CREATE_JUST;
+    flowControlTimes1 = 0; // counter used by pullMessage() for the "buffer is full" flow-control log
+    flowControlTimes2 = 0; // counter used by pullMessage() for the "span too long" flow-control log
+    m_pause = false;
+    m_consumeOrderly = false;
+
+    m_pMQClientFactory = NULL;
+    m_pPullAPIWrapper = NULL;
+    m_pMessageListenerInner = NULL;
+    m_pOffsetStore = NULL;
+    m_pRebalanceImpl = new RebalancePushImpl(this);
+    m_pConsumerStatManager = new ConsumerStatManager();
+    m_pConsumeMessageService = NULL;
+}
+
+/**
+ * Destroys the helper objects held by this consumer, in the order:
+ * pull API wrapper, rebalance implementation, statistics manager,
+ * consume-message service, offset store.
+ *
+ * The message listener and the client factory are not deleted here.
+ * Applying delete to a NULL pointer is a no-op, so members that were never
+ * created need no explicit guard.
+ *
+ * NOTE(review): start() may store the offset store obtained from
+ * DefaultMQPushConsumer::getOffsetStore() in m_pOffsetStore; that object is
+ * deleted here as well - confirm that ownership is meant to transfer.
+ */
+DefaultMQPushConsumerImpl::~DefaultMQPushConsumerImpl()
+{
+    // m_pMessageListenerInner: intentionally left alone.
+    delete m_pPullAPIWrapper;
+    delete m_pRebalanceImpl;
+    delete m_pConsumerStatManager;
+    delete m_pConsumeMessageService;
+    delete m_pOffsetStore;
+    // m_pMQClientFactory: intentionally left alone.
+}
+/**
+ * Starts the consumer.
+ *
+ * From CREATE_JUST: validates the configuration, copies the subscriptions,
+ * obtains the client factory, configures the rebalance implementation,
+ * creates the pull API wrapper, picks and loads an offset store, creates and
+ * starts the consume-message service that matches the listener type,
+ * registers this consumer with the factory, starts the factory and moves to
+ * RUNNING. Afterwards topic routes are refreshed, a heartbeat is sent and a
+ * rebalance is triggered.
+ *
+ * From RUNNING, START_FAILED or SHUTDOWN_ALREADY an MQClientException is
+ * raised through THROW_MQEXCEPTION. The same macro is used when the consumer
+ * group is already registered with the factory.
+ */
+void DefaultMQPushConsumerImpl::start()
+{
+    RMQ_DEBUG("DefaultMQPushConsumerImpl::start()");
+    switch (m_serviceState)
+    {
+        case CREATE_JUST:
+        {
+        	RMQ_INFO("the consumer [{%s}] start beginning. messageModel={%s}",
+                m_pDefaultMQPushConsumer->getConsumerGroup().c_str(),
+                getMessageModelString(m_pDefaultMQPushConsumer->getMessageModel()));
+
+            m_serviceState = START_FAILED; // stays START_FAILED if anything below throws
+            checkConfig();
+            copySubscription();
+
+            if (m_pDefaultMQPushConsumer->getMessageModel() == CLUSTERING)
+            {
+                m_pDefaultMQPushConsumer->changeInstanceNameToPID();
+            }
+
+            m_pMQClientFactory = MQClientManager::getInstance()->getAndCreateMQClientFactory(*m_pDefaultMQPushConsumer);
+
+            m_pRebalanceImpl->setConsumerGroup(m_pDefaultMQPushConsumer->getConsumerGroup());
+            m_pRebalanceImpl->setMessageModel(m_pDefaultMQPushConsumer->getMessageModel());
+            m_pRebalanceImpl->setAllocateMessageQueueStrategy(m_pDefaultMQPushConsumer->getAllocateMessageQueueStrategy());
+            m_pRebalanceImpl->setmQClientFactory(m_pMQClientFactory);
+
+            m_pPullAPIWrapper = new PullAPIWrapper(m_pMQClientFactory, m_pDefaultMQPushConsumer->getConsumerGroup());
+
+            if (m_pDefaultMQPushConsumer->getOffsetStore() != NULL) // a store set on the public consumer takes precedence
+            {
+                m_pOffsetStore = m_pDefaultMQPushConsumer->getOffsetStore();
+            }
+            else
+            {
+                switch (m_pDefaultMQPushConsumer->getMessageModel())
+                {
+                    case BROADCASTING:
+                        m_pOffsetStore = new LocalFileOffsetStore(m_pMQClientFactory, m_pDefaultMQPushConsumer->getConsumerGroup());
+                        break;
+                    case CLUSTERING:
+                        m_pOffsetStore = new RemoteBrokerOffsetStore(m_pMQClientFactory, m_pDefaultMQPushConsumer->getConsumerGroup());
+                        break;
+                    default: // leaves m_pOffsetStore unchanged; checkConfig() rejects any other message model
+                        break;
+                }
+            }
+
+            m_pOffsetStore->load();
+
+            if (dynamic_cast<MessageListenerOrderly*>(m_pMessageListenerInner) != NULL)
+            {
+                m_consumeOrderly = true;
+                m_pConsumeMessageService =
+                    new ConsumeMessageOrderlyService(this, (MessageListenerOrderly*)m_pMessageListenerInner);
+            }
+            else if (dynamic_cast<MessageListenerConcurrently*>(m_pMessageListenerInner) != NULL)
+            {
+                m_consumeOrderly = false;
+                m_pConsumeMessageService =
+                    new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently*)m_pMessageListenerInner);
+            }
+            m_pConsumeMessageService->start(); // NOTE(review): no guard for the case where neither cast matched and the pointer is still NULL
+
+            bool registerOK = m_pMQClientFactory->registerConsumer(m_pDefaultMQPushConsumer->getConsumerGroup(), this);
+            if (!registerOK)
+            {
+                m_serviceState = CREATE_JUST; // allows start() to be retried; objects created above are not released here
+                m_pConsumeMessageService->shutdown();
+                std::string str = "The consumer group[" + m_pDefaultMQPushConsumer->getConsumerGroup();
+                str += "] has been created before, specify another name please.";
+                THROW_MQEXCEPTION(MQClientException, str, -1);
+            }
+            m_pMQClientFactory->start();
+
+			RMQ_INFO("the consumer [%s] start OK.", m_pDefaultMQPushConsumer->getConsumerGroup().c_str());
+            m_serviceState = RUNNING;
+        }
+        break;
+        case RUNNING:
+        case START_FAILED:
+        case SHUTDOWN_ALREADY:
+            THROW_MQEXCEPTION(MQClientException, "The PullConsumer service state not OK, maybe started once, ", -1);
+        default:
+            break;
+    }
+    // Post-start work: refresh routes for all subscribed topics, heartbeat, rebalance.
+    updateTopicSubscribeInfoWhenSubscriptionChanged();
+    m_pMQClientFactory->sendHeartbeatToAllBrokerWithLock();
+    m_pMQClientFactory->rebalanceImmediately();
+}
+
+
+/**
+ * Stops a running consumer.
+ *
+ * Only the RUNNING state triggers any work: the consume-message service is
+ * shut down, offsets are persisted, the consumer is unregistered from the
+ * client factory, the factory is shut down and the state becomes
+ * SHUTDOWN_ALREADY. In every other state the call just logs and returns.
+ */
+void DefaultMQPushConsumerImpl::shutdown()
+{
+    RMQ_DEBUG("DefaultMQPushConsumerImpl::shutdown()");
+
+    if (m_serviceState != RUNNING)
+    {
+        // CREATE_JUST, SHUTDOWN_ALREADY and any other state: nothing to tear down.
+        return;
+    }
+
+    m_pConsumeMessageService->shutdown();
+    persistConsumerOffset();
+    m_pMQClientFactory->unregisterConsumer(m_pDefaultMQPushConsumer->getConsumerGroup());
+    m_pMQClientFactory->shutdown();
+
+    m_serviceState = SHUTDOWN_ALREADY;
+}
+
+
+
+/** Returns true when at least one ConsumeMessageHook has been registered. */
+bool DefaultMQPushConsumerImpl::hasHook()
+{
+    return !m_hookList.empty();
+}
+/** Appends pHook to the end of the hook list; the pointer is stored as given (no NULL check, no copy). */
+void DefaultMQPushConsumerImpl::registerHook(ConsumeMessageHook* pHook)
+{
+    m_hookList.push_back(pHook);
+}
+
+/**
+ * Invokes consumeMessageBefore() on every registered hook, in registration
+ * order. A hook that throws is logged and does not prevent the remaining
+ * hooks from running.
+ *
+ * @param context  consume context passed to each hook
+ */
+void DefaultMQPushConsumerImpl::executeHookBefore(ConsumeMessageContext& context)
+{
+    typedef std::list<ConsumeMessageHook*>::iterator HookIterator;
+
+    for (HookIterator hookPos = m_hookList.begin(); hookPos != m_hookList.end(); ++hookPos)
+    {
+        ConsumeMessageHook* pHook = *hookPos;
+        try
+        {
+            pHook->consumeMessageBefore(context);
+        }
+        catch (...)
+        {
+            RMQ_WARN("consumeMessageBefore exception");
+        }
+    }
+}
+
+/**
+ * Invokes consumeMessageAfter() on every registered hook, in registration
+ * order. A hook that throws is logged and does not prevent the remaining
+ * hooks from running.
+ *
+ * @param context  consume context passed to each hook
+ */
+void DefaultMQPushConsumerImpl::executeHookAfter(ConsumeMessageContext& context)
+{
+    typedef std::list<ConsumeMessageHook*>::iterator HookIterator;
+
+    for (HookIterator hookPos = m_hookList.begin(); hookPos != m_hookList.end(); ++hookPos)
+    {
+        ConsumeMessageHook* pHook = *hookPos;
+        try
+        {
+            pHook->consumeMessageAfter(context);
+        }
+        catch (...)
+        {
+            RMQ_WARN("consumeMessageAfter exception");
+        }
+    }
+}
+/**
+ * Forwards to MQAdminImpl::createTopic() of the client factory with the same arguments.
+ * m_pMQClientFactory is only assigned in start(); it is dereferenced here without a check.
+ */
+void DefaultMQPushConsumerImpl::createTopic(const std::string& key, const std::string& newTopic, int queueNum)
+{
+    m_pMQClientFactory->getMQAdminImpl()->createTopic(key, newTopic, queueNum);
+}
+
+/**
+ * Returns a heap-allocated copy of the message queues known for a topic.
+ *
+ * The topic is looked up in the rebalance implementation's topic-subscribe
+ * table. On a miss the route information is refreshed from the name server
+ * once and the lookup is repeated.
+ *
+ * @param topic  topic name to look up
+ * @return newly allocated set; it is created with new and handed out as a raw
+ *         pointer, so the caller has to delete it
+ * @throws MQClientException (via THROW_MQEXCEPTION) when the topic is still
+ *         unknown after the refresh
+ */
+std::set<MessageQueue>* DefaultMQPushConsumerImpl::fetchSubscribeMessageQueues(const std::string& topic)
+{
+    // 'mqs' is bound to the table itself, so it already reflects any entries
+    // added by the refresh below. A reference cannot be re-seated: the former
+    // "mqs = getTopicSubscribeInfoTable()" merely self-assigned the map.
+    std::map<std::string, std::set<MessageQueue> >& mqs = m_pRebalanceImpl->getTopicSubscribeInfoTable();
+    std::map<std::string, std::set<MessageQueue> >::iterator it = mqs.find(topic);
+
+    if (it == mqs.end())
+    {
+        m_pMQClientFactory->updateTopicRouteInfoFromNameServer(topic);
+        it = mqs.find(topic);
+    }
+
+    if (it == mqs.end())
+    {
+        THROW_MQEXCEPTION(MQClientException, "The topic[" + topic + "] not exist", -1);
+    }
+
+    return new std::set<MessageQueue>(it->second);
+}
+/** Returns the DefaultMQPushConsumer pointer that was passed to the constructor. */
+DefaultMQPushConsumer* DefaultMQPushConsumerImpl::getDefaultMQPushConsumer()
+{
+    return m_pDefaultMQPushConsumer;
+}
+/** Forwards to MQAdminImpl::earliestMsgStoreTime(mq) of the client factory (factory pointer is set in start()). */
+long long DefaultMQPushConsumerImpl::earliestMsgStoreTime(const MessageQueue& mq)
+{
+    return m_pMQClientFactory->getMQAdminImpl()->earliestMsgStoreTime(mq);
+}
+/** Forwards to MQAdminImpl::maxOffset(mq) of the client factory (factory pointer is set in start()). */
+long long DefaultMQPushConsumerImpl::maxOffset(const MessageQueue& mq)
+{
+    return m_pMQClientFactory->getMQAdminImpl()->maxOffset(mq);
+}
+/** Forwards to MQAdminImpl::minOffset(mq) of the client factory (factory pointer is set in start()). */
+long long DefaultMQPushConsumerImpl::minOffset(const MessageQueue& mq)
+{
+    return m_pMQClientFactory->getMQAdminImpl()->minOffset(mq);
+}
+/** Returns the current offset store pointer; it is NULL after construction until setOffsetStore() or start() assigns one. */
+OffsetStore* DefaultMQPushConsumerImpl::getOffsetStore()
+{
+    return m_pOffsetStore;
+}
+/**
+ * Replaces the offset store pointer. The previous pointer is overwritten without being deleted.
+ * Whatever is stored here is deleted by the destructor.
+ */
+void DefaultMQPushConsumerImpl::setOffsetStore(OffsetStore* pOffsetStore)
+{
+    m_pOffsetStore = pOffsetStore;
+}
+
+/**
+ * MQConsumerInner section starts here.
+ * Returns the consumer group configured on the public consumer object.
+ */
+std::string DefaultMQPushConsumerImpl::groupName()
+{
+    return m_pDefaultMQPushConsumer->getConsumerGroup();
+}
+/** Returns the message model configured on the public consumer object. */
+MessageModel DefaultMQPushConsumerImpl::messageModel()
+{
+    return m_pDefaultMQPushConsumer->getMessageModel();
+}
+/** Always reports CONSUME_PASSIVELY for this consumer implementation. */
+ConsumeType DefaultMQPushConsumerImpl::consumeType()
+{
+    return CONSUME_PASSIVELY;
+}
+/** Returns the consume-from-where setting configured on the public consumer object. */
+ConsumeFromWhere DefaultMQPushConsumerImpl::consumeFromWhere()
+{
+    return m_pDefaultMQPushConsumer->getConsumeFromWhere();
+}
+
+/**
+ * Collects the SubscriptionData values of the rebalance implementation's
+ * subscription table into a set and returns that set by value.
+ */
+std::set<SubscriptionData> DefaultMQPushConsumerImpl::subscriptions()
+{
+    typedef std::map<std::string, SubscriptionData> SubscriptionTable;
+
+    std::set<SubscriptionData> collected;
+    SubscriptionTable& table = m_pRebalanceImpl->getSubscriptionInner();
+
+    SubscriptionTable::iterator entry = table.begin();
+    while (entry != table.end())
+    {
+        collected.insert(entry->second);
+        ++entry;
+    }
+
+    return collected;
+}
+
+/** Runs a rebalance through the rebalance implementation; does nothing when that pointer is NULL. */
+void DefaultMQPushConsumerImpl::doRebalance()
+{
+    if (m_pRebalanceImpl == NULL)
+    {
+        return;
+    }
+
+    m_pRebalanceImpl->doRebalance();
+}
+/**
+ * Persists the offsets of every queue currently in the process-queue table.
+ * The queue keys are copied while the table's read lock is held; persistAll()
+ * is called after the lock has been released. Any exception - including the
+ * one raised by makeSureStateOK() when the consumer is not RUNNING - is
+ * logged and swallowed.
+ */
+void DefaultMQPushConsumerImpl::persistConsumerOffset()
+{
+    try
+    {
+        makeSureStateOK();
+
+        std::set<MessageQueue> mqs;
+        { // inner scope bounds the lifetime of the read lock
+	        kpr::ScopedRLock<kpr::RWMutex> lock(m_pRebalanceImpl->getProcessQueueTableLock());
+	        std::map<MessageQueue, ProcessQueue*>& processQueueTable = m_pRebalanceImpl->getProcessQueueTable();
+	        RMQ_FOR_EACH(processQueueTable, it)
+	        {
+	            mqs.insert(it->first);
+	        }
+        }
+
+        m_pOffsetStore->persistAll(mqs);
+    }
+    catch (...)
+    {
+    	RMQ_ERROR("persistConsumerOffset exception, group: %s",
+    		m_pDefaultMQPushConsumer->getConsumerGroup().c_str());
+    }
+}
+
+/**
+ * Stores the message-queue set of a topic in the rebalance implementation's
+ * topic-subscribe table, but only for topics this consumer subscribes to.
+ *
+ * An existing entry is replaced. std::map::insert() leaves an existing key
+ * untouched, so the previous insert-based code kept serving the first queue
+ * set ever recorded for a topic and ignored later updates.
+ *
+ * @param topic  topic whose queue set is being updated
+ * @param info   the queues to record for that topic
+ */
+void DefaultMQPushConsumerImpl::updateTopicSubscribeInfo(const std::string& topic, const std::set<MessageQueue>& info)
+{
+    std::map<std::string, SubscriptionData>& subTable = getSubscriptionInner();
+
+    if (subTable.find(topic) != subTable.end())
+    {
+        // operator[] creates the entry if needed and the assignment overwrites it otherwise.
+        m_pRebalanceImpl->getTopicSubscribeInfoTable()[topic] = info;
+    }
+}
+/** Returns a reference to the subscription table (topic -> SubscriptionData) held by the rebalance implementation. */
+std::map<std::string, SubscriptionData>& DefaultMQPushConsumerImpl::getSubscriptionInner()
+{
+    return m_pRebalanceImpl->getSubscriptionInner();
+}
+
+/**
+ * Tells whether queue information for a topic still has to be fetched.
+ *
+ * @param topic  topic name to check
+ * @return true only when the topic is subscribed and has no entry in the
+ *         topic-subscribe table; false for topics that are not subscribed
+ */
+bool DefaultMQPushConsumerImpl::isSubscribeTopicNeedUpdate(const std::string& topic)
+{
+    std::map<std::string, SubscriptionData>& subscribed = getSubscriptionInner();
+    if (subscribed.find(topic) == subscribed.end())
+    {
+        // Not one of our topics: nothing to update.
+        return false;
+    }
+
+    std::map<std::string, std::set<MessageQueue> >& queueTable =
+        m_pRebalanceImpl->getTopicSubscribeInfoTable();
+    const bool queueInfoMissing = (queueTable.find(topic) == queueTable.end());
+    return queueInfoMissing;
+}
+/** Returns the pause flag; pullMessage() postpones pull requests while it is true. */
+bool DefaultMQPushConsumerImpl::isPause()
+{
+    return m_pause;
+}
+/** Sets the pause flag to the given value. */
+void DefaultMQPushConsumerImpl::setPause(bool pause)
+{
+    m_pause = pause;
+}
+
+/**
+ * When the request's process queue reports a message count of 0, writes the
+ * request's next offset into the offset store (updateOffset with its last
+ * argument set to true). Otherwise does nothing.
+ */
+void DefaultMQPushConsumerImpl::correctTagsOffset(PullRequest& pullRequest)
+{
+    if (pullRequest.getProcessQueue()->getMsgCount().get() == 0)
+    {
+        m_pOffsetStore->updateOffset(pullRequest.getMessageQueue(), pullRequest.getNextOffset(), true);
+    }
+}
+/**
+ * Executes one pull request.
+ *
+ * The request is deleted when its process queue is dropped. It is re-queued
+ * with a delay when the consumer is not RUNNING, is paused, has too many
+ * buffered messages, has too large an offset span (concurrent mode only), or
+ * has no subscription for the topic. Otherwise an asynchronous pull is issued
+ * through PullAPIWrapper::pullKernelImpl() with a freshly allocated
+ * DefaultMQPushConsumerImplCallback, which re-queues the request when the
+ * answer arrives.
+ *
+ * @param pPullRequest  request to execute; dereferenced without a NULL check
+ */
+void DefaultMQPushConsumerImpl::pullMessage(PullRequest* pPullRequest)
+{
+	RMQ_DEBUG("pullMessage begin: %s", pPullRequest->toString().c_str());
+
+    ProcessQueue* processQueue = pPullRequest->getProcessQueue();
+    if (processQueue->isDropped())
+    {
+        RMQ_WARN("the pull request[%s] is dropped.", pPullRequest->toString().c_str());
+        delete pPullRequest; // a dropped request is destroyed here and never re-queued
+        return;
+    }
+
+    pPullRequest->getProcessQueue()->setLastPullTimestamp(KPRUtil::GetCurrentTimeMillis());
+
+    try
+    {
+        makeSureStateOK();
+    }
+    catch (const MQException& e)
+    {
+        RMQ_WARN("pullMessage exception [%s], consumer state not ok", e.what());
+        executePullRequestLater(pPullRequest, s_PullTimeDelayMillsWhenException);
+        return;
+    }
+
+    if (isPause())
+    {
+    	RMQ_WARN("consumer was paused, execute pull request later. instanceName={%s}",
+                m_pDefaultMQPushConsumer->getInstanceName().c_str());
+        executePullRequestLater(pPullRequest, s_PullTimeDelayMillsWhenSuspend);
+        return;
+    }
+    // Flow control 1: too many messages buffered for this queue.
+    long size = processQueue->getMsgCount().get();
+    if (size > m_pDefaultMQPushConsumer->getPullThresholdForQueue())
+    {
+        executePullRequestLater(pPullRequest, s_PullTimeDelayMillsWhenFlowControl);
+        if ((flowControlTimes1++ % 3000) == 0) // log the first hit, then one in every 3000
+        {
+            RMQ_WARN("the consumer message buffer is full, so do flow control, {%ld} {%s} {%lld}", size,
+                    pPullRequest->toString().c_str(), flowControlTimes1);
+        }
+        return;
+    }
+    // Flow control 2 (concurrent consumption only): offset span of buffered messages too large.
+    if (!m_consumeOrderly)
+    {
+        if (processQueue->getMaxSpan() > m_pDefaultMQPushConsumer->getConsumeConcurrentlyMaxSpan())
+        {
+            executePullRequestLater(pPullRequest, s_PullTimeDelayMillsWhenFlowControl);
+            if ((flowControlTimes2++ % 3000) == 0) // log the first hit, then one in every 3000
+            {
+                RMQ_WARN("the queue's messages, span too long, so do flow control, size: {%ld}, pullRequest: {%s}, times: {%lld}, maxspan: {%lld}",
+                	size, pPullRequest->toString().c_str(), flowControlTimes2, processQueue->getMaxSpan());
+            }
+            return;
+        }
+    }
+
+    std::map<std::string, SubscriptionData>& subTable = getSubscriptionInner();
+    std::string topic = pPullRequest->getMessageQueue().getTopic();
+    std::map<std::string, SubscriptionData>::iterator it = subTable.find(topic);
+    if (it == subTable.end())
+    {
+        executePullRequestLater(pPullRequest, s_PullTimeDelayMillsWhenException);
+        RMQ_WARN("find the consumer's subscription failed, {%s}", pPullRequest->toString().c_str());
+        return;
+    }
+
+    SubscriptionData subscriptionData = it->second; // local copy, used below for getSubVersion()
+    PullCallback* pullCallback = new DefaultMQPushConsumerImplCallback(subTable[topic], this, pPullRequest); // the callback copies the subscription data itself
+    // In CLUSTERING mode the in-memory consume offset is sent along with the pull when it is > 0.
+    bool commitOffsetEnable = false;
+    long commitOffsetValue = 0L; // NOTE(review): plain long; would truncate if readOffset() returns a wider type - TODO confirm
+    if (CLUSTERING == m_pDefaultMQPushConsumer->getMessageModel())
+    {
+        commitOffsetValue = m_pOffsetStore->readOffset(pPullRequest->getMessageQueue(),
+                            READ_FROM_MEMORY);
+        if (commitOffsetValue > 0)
+        {
+            commitOffsetEnable = true;
+        }
+    }
+
+    int sysFlag = PullSysFlag::buildSysFlag(
+                      commitOffsetEnable, // commitOffset
+                      true, // suspend
+                      false// subscription
+                  );
+    try
+    {
+        m_pPullAPIWrapper->pullKernelImpl(
+            pPullRequest->getMessageQueue(), // 1
+            "", // 2
+            subscriptionData.getSubVersion(), // 3
+            pPullRequest->getNextOffset(), // 4
+            m_pDefaultMQPushConsumer->getPullBatchSize(), // 5
+            sysFlag, // 6
+            commitOffsetValue,// 7
+            s_BrokerSuspendMaxTimeMillis, // 8
+            s_ConsumerTimeoutMillisWhenSuspend, // 9
+            ASYNC, // 10
+            pullCallback// 11
+        );
+    }
+    catch (...) // NOTE(review): pullCallback is not released on this path; its ownership after a failed call is not visible here
+    {
+        RMQ_ERROR("pullKernelImpl exception");
+        executePullRequestLater(pPullRequest, s_PullTimeDelayMillsWhenException);
+    }
+
+    RMQ_DEBUG("pullMessage end");
+}
+/** Hands the pull request to the client factory's PullMessageService for immediate execution. */
+void DefaultMQPushConsumerImpl::executePullRequestImmediately(PullRequest* pullRequest)
+{
+    m_pMQClientFactory->getPullMessageService()->executePullRequestImmediately(pullRequest);
+}
+/** Hands the pull request to the client factory's PullMessageService together with the delay value timeDelay. */
+void DefaultMQPushConsumerImpl::executePullRequestLater(PullRequest* pullRequest, long timeDelay)
+{
+    m_pMQClientFactory->getPullMessageService()->executePullRequestLater(pullRequest, timeDelay);
+}
+/** Hands a timer handler to the client factory's PullMessageService together with the delay value timeDelay. */
+void DefaultMQPushConsumerImpl::executeTaskLater(kpr::TimerHandler* handler, long timeDelay)
+{
+    m_pMQClientFactory->getPullMessageService()->executeTaskLater(handler, timeDelay);
+}
+
+/** Raises an MQClientException through THROW_MQEXCEPTION unless the service state is RUNNING. */
+void DefaultMQPushConsumerImpl::makeSureStateOK()
+{
+    if (m_serviceState != RUNNING)
+    {
+        THROW_MQEXCEPTION(MQClientException, "The consumer service state not OK, ", -1);
+    }
+}
+/** Returns the statistics manager allocated in the constructor. */
+ConsumerStatManager* DefaultMQPushConsumerImpl::getConsumerStatManager()
+{
+    return m_pConsumerStatManager;
+}
+/** Forwards to MQAdminImpl::queryMessage() of the client factory with the same arguments and returns its result. */
+QueryResult DefaultMQPushConsumerImpl::queryMessage(const std::string& topic,
+        const std::string&  key,
+        int maxNum,
+        long long begin,
+        long long end)
+{
+    return m_pMQClientFactory->getMQAdminImpl()->queryMessage(topic, key, maxNum, begin, end);
+}
+/**
+ * Stores the listener pointer. When it is still NULL at start-up, copySubscription()
+ * falls back to the listener of the public consumer object.
+ */
+void DefaultMQPushConsumerImpl::registerMessageListener(MessageListener* pMessageListener)
+{
+    m_pMessageListenerInner = pMessageListener;
+}
+/** Clears the pause flag. */
+void DefaultMQPushConsumerImpl::resume()
+{
+    m_pause = false;
+}
+/** Forwards to MQAdminImpl::searchOffset(mq, timestamp) of the client factory and returns its result. */
+long long DefaultMQPushConsumerImpl::searchOffset(const MessageQueue& mq, long long timestamp)
+{
+    return m_pMQClientFactory->getMQAdminImpl()->searchOffset(mq, timestamp);
+}
+/**
+ * Sends a message back for later re-consumption.
+ *
+ * First attempt: consumerSendMessageBack() on the client API, addressed to
+ * the message's store host when brokerName is empty, otherwise to the address
+ * looked up for brokerName. If that attempt throws anything, a new message
+ * for the group's retry topic is built from the original and sent with the
+ * factory's default producer. An exception from that fallback send is not
+ * caught here.
+ *
+ * @param msg         message to send back
+ * @param delayLevel  delay level passed to consumerSendMessageBack()
+ * @param brokerName  target broker name; empty selects the message's store host
+ */
+void DefaultMQPushConsumerImpl::sendMessageBack(MessageExt& msg, int delayLevel, const std::string& brokerName)
+{
+    try
+    {
+    	std::string brokerAddr = brokerName.empty() ?
+    		socketAddress2IPPort(msg.getStoreHost()) : m_pMQClientFactory->findBrokerAddressInPublish(brokerName);
+
+        m_pMQClientFactory->getMQClientAPIImpl()->consumerSendMessageBack(brokerAddr, msg,
+                m_pDefaultMQPushConsumer->getConsumerGroup(),
+                delayLevel,
+                5000);
+    }
+    catch (...)
+    {
+    	RMQ_ERROR("sendMessageBack Exception, group: %s", m_pDefaultMQPushConsumer->getConsumerGroup().c_str());
+        Message newMsg(MixAll::getRetryTopic(m_pDefaultMQPushConsumer->getConsumerGroup()),
+                       msg.getBody(), msg.getBodyLen());
+
+		std::string originMsgId = msg.getProperty(Message::PROPERTY_ORIGIN_MESSAGE_ID);
+		newMsg.putProperty(Message::PROPERTY_ORIGIN_MESSAGE_ID, UtilAll::isBlank(originMsgId) ? msg.getMsgId()
+                    : originMsgId);
+
+        newMsg.setFlag(msg.getFlag());
+        newMsg.setProperties(msg.getProperties()); // NOTE(review): if setProperties() replaces the whole map, the origin id stored above is lost - TODO confirm
+        newMsg.putProperty(Message::PROPERTY_RETRY_TOPIC, msg.getTopic());
+
+        int reTimes = msg.getReconsumeTimes() + 1;
+        newMsg.putProperty(Message::PROPERTY_RECONSUME_TIME, UtilAll::toString(reTimes));
+        newMsg.putProperty(Message::PROPERTY_MAX_RECONSUME_TIMES, UtilAll::toString(m_pDefaultMQPushConsumer->getMaxReconsumeTimes()));
+        newMsg.setDelayTimeLevel(3 + reTimes); // fallback delay level grows with every re-consume attempt
+
+        m_pMQClientFactory->getDefaultMQProducer()->send(newMsg);
+    }
+}
+/**
+ * Validates the settings of the public consumer object before start-up.
+ * Every failed check raises an MQClientException through THROW_MQEXCEPTION;
+ * the checks run in the order written, so the first violation wins.
+ * The group name itself is validated by Validators::checkGroup().
+ */
+void DefaultMQPushConsumerImpl::checkConfig()
+{
+    // consumerGroup check
+    Validators::checkGroup(m_pDefaultMQPushConsumer->getConsumerGroup());
+
+    // consumerGroup
+    if (m_pDefaultMQPushConsumer->getConsumerGroup() == MixAll::DEFAULT_CONSUMER_GROUP)
+    {
+        THROW_MQEXCEPTION(MQClientException, "consumerGroup can not equal "
+                          + MixAll::DEFAULT_CONSUMER_GROUP //
+                          + ", please specify another one.", -1);
+    }
+    // messageModel: only BROADCASTING and CLUSTERING are accepted
+    if (m_pDefaultMQPushConsumer->getMessageModel() != BROADCASTING
+        && m_pDefaultMQPushConsumer->getMessageModel() != CLUSTERING)
+    {
+        THROW_MQEXCEPTION(MQClientException, "messageModel is invalid ", -1);
+    }
+
+    // allocateMessageQueueStrategy
+    if (m_pDefaultMQPushConsumer->getAllocateMessageQueueStrategy() == NULL)
+    {
+        THROW_MQEXCEPTION(MQClientException, "allocateMessageQueueStrategy is null", -1);
+    }
+
+    // consumeFromWhereOffset
+    if (m_pDefaultMQPushConsumer->getConsumeFromWhere() < CONSUME_FROM_LAST_OFFSET
+        || m_pDefaultMQPushConsumer->getConsumeFromWhere() > CONSUME_FROM_MAX_OFFSET)
+    {
+        THROW_MQEXCEPTION(MQClientException, "consumeFromWhere is invalid", -1);
+    }
+
+    // subscription
+    /*
+    if (m_pDefaultMQPushConsumer->getSubscription().size() == 0)
+    {
+      THROW_MQEXCEPTION(MQClientException,"subscription is null" ,-1);
+    }
+    */
+
+    // messageListener
+    if (m_pDefaultMQPushConsumer->getMessageListener() == NULL)
+    {
+        THROW_MQEXCEPTION(MQClientException, "messageListener is null", -1);
+    }
+
+    MessageListener* listener = m_pDefaultMQPushConsumer->getMessageListener();
+    MessageListener* orderly = (dynamic_cast<MessageListenerOrderly*>(listener)) ;
+    MessageListener* concurrently = (dynamic_cast<MessageListenerConcurrently*>(listener)) ;
+
+    if (!orderly && !concurrently)
+    {
+        THROW_MQEXCEPTION(MQClientException,
+                          "messageListener must be instanceof MessageListenerOrderly or MessageListenerConcurrently" ,
+                          -1);
+    }
+
+    // consumeThreadMin
+    if (m_pDefaultMQPushConsumer->getConsumeThreadMin() < 1
+        || m_pDefaultMQPushConsumer->getConsumeThreadMin() > 1000
+        || m_pDefaultMQPushConsumer->getConsumeThreadMin() > m_pDefaultMQPushConsumer->getConsumeThreadMax()
+       )
+    {
+        THROW_MQEXCEPTION(MQClientException, "consumeThreadMin Out of range [1, 1000]", -1); // also raised when min > max, although the text only mentions the range
+    }
+
+    // consumeThreadMax
+    if (m_pDefaultMQPushConsumer->getConsumeThreadMax() < 1
+        || m_pDefaultMQPushConsumer->getConsumeThreadMax() > 1000)
+    {
+        THROW_MQEXCEPTION(MQClientException, "consumeThreadMax Out of range [1, 1000]", -1);
+    }
+
+    // consumeConcurrentlyMaxSpan
+    if (m_pDefaultMQPushConsumer->getConsumeConcurrentlyMaxSpan() < 1
+        || m_pDefaultMQPushConsumer->getConsumeConcurrentlyMaxSpan() > 65535)
+    {
+        THROW_MQEXCEPTION(MQClientException, "consumeConcurrentlyMaxSpan Out of range [1, 65535]" , -1);
+    }
+
+    // pullThresholdForQueue
+    if (m_pDefaultMQPushConsumer->getPullThresholdForQueue() < 1
+        || m_pDefaultMQPushConsumer->getPullThresholdForQueue() > 65535)
+    {
+        THROW_MQEXCEPTION(MQClientException, "pullThresholdForQueue Out of range [1, 65535]", -1);
+    }
+
+    // pullInterval
+    if (m_pDefaultMQPushConsumer->getPullInterval() < 0
+        || m_pDefaultMQPushConsumer->getPullInterval() > 65535)
+    {
+        THROW_MQEXCEPTION(MQClientException, "pullInterval Out of range [0, 65535]", -1);
+    }
+
+    // consumeMessageBatchMaxSize
+    if (m_pDefaultMQPushConsumer->getConsumeMessageBatchMaxSize() < 1
+        || m_pDefaultMQPushConsumer->getConsumeMessageBatchMaxSize() > 1024)
+    {
+        THROW_MQEXCEPTION(MQClientException, "consumeMessageBatchMaxSize Out of range [1, 1024]", -1);
+    }
+
+    // pullBatchSize
+    if (m_pDefaultMQPushConsumer->getPullBatchSize() < 1
+        || m_pDefaultMQPushConsumer->getPullBatchSize() > 1024)
+    {
+        THROW_MQEXCEPTION(MQClientException, "pullBatchSize Out of range [1, 1024]", -1);
+    }
+}
+/**
+ * Copies the topic -> expression subscriptions of the public consumer object
+ * into the rebalance implementation's subscription table, adopts the public
+ * consumer's message listener when none was registered on this object, and -
+ * in CLUSTERING mode - additionally subscribes to the group's retry topic
+ * with SubscriptionData::SUB_ALL.
+ *
+ * Any exception is replaced by an MQClientException carrying the text
+ * "subscription exception"; the original error detail is not kept.
+ */
+void DefaultMQPushConsumerImpl::copySubscription()
+{
+    try
+    {
+        std::map<std::string, std::string>& sub = m_pDefaultMQPushConsumer->getSubscription();
+        std::map<std::string, std::string>::iterator it = sub.begin();
+        for (; it != sub.end(); it++)
+        {
+            SubscriptionDataPtr subscriptionData = FilterAPI::buildSubscriptionData(it->first, it->second);
+            m_pRebalanceImpl->getSubscriptionInner()[it->first] = *subscriptionData; // stores a copy, replacing any earlier entry for the topic
+        }
+
+        if (m_pMessageListenerInner == NULL)
+        {
+            m_pMessageListenerInner = m_pDefaultMQPushConsumer->getMessageListener();
+        }
+
+        switch (m_pDefaultMQPushConsumer->getMessageModel())
+        {
+            case BROADCASTING:
+                break;
+            case CLUSTERING:
+            {
+                std::string retryTopic = MixAll::getRetryTopic(m_pDefaultMQPushConsumer->getConsumerGroup());
+                SubscriptionDataPtr subscriptionData =
+                    FilterAPI::buildSubscriptionData(retryTopic, SubscriptionData::SUB_ALL);
+                m_pRebalanceImpl->getSubscriptionInner()[retryTopic] = *subscriptionData;
+            }
+
+            break;
+            default:
+                break;
+        }
+    }
+    catch (...)
+    {
+        THROW_MQEXCEPTION(MQClientException, "subscription exception", -1);
+    }
+}
+
+/**
+ * Asks the client factory to update route info from the name server for
+ * every topic present in the inner subscription table.
+ */
+void DefaultMQPushConsumerImpl::updateTopicSubscribeInfoWhenSubscriptionChanged()
+{
+    typedef std::map<std::string, SubscriptionData> SubscriptionTable;
+
+    // Work on a copy so the loop does not iterate the live table.
+    SubscriptionTable snapshot = getSubscriptionInner();
+    for (SubscriptionTable::iterator cur = snapshot.begin(); cur != snapshot.end(); ++cur)
+    {
+        m_pMQClientFactory->updateTopicRouteInfoFromNameServer(cur->first);
+    }
+}
+
+/** Returns the listener pointer currently stored in m_pMessageListenerInner. */
+MessageListener* DefaultMQPushConsumerImpl::getMessageListenerInner()
+{
+    return this->m_pMessageListenerInner;
+}
+
+/**
+ * Registers (or replaces) the subscription for one topic.
+ *
+ * @param topic          topic to subscribe to
+ * @param subExpression  expression handed to FilterAPI::buildSubscriptionData
+ *
+ * When a client factory is already attached, a heartbeat is sent to all
+ * brokers afterwards. Any exception raised on the way is replaced by an
+ * MQClientException ("subscription exception", -1).
+ */
+void DefaultMQPushConsumerImpl::subscribe(const std::string& topic, const std::string& subExpression)
+{
+    try
+    {
+        SubscriptionDataPtr parsed = FilterAPI::buildSubscriptionData(topic, subExpression);
+        m_pRebalanceImpl->getSubscriptionInner()[topic] = *parsed;
+
+        if (m_pMQClientFactory != NULL)
+        {
+            m_pMQClientFactory->sendHeartbeatToAllBrokerWithLock();
+        }
+    }
+    catch (...)
+    {
+        THROW_MQEXCEPTION(MQClientException, "subscription exception", -1);
+    }
+}
+
+/** Sets the pause flag (m_pause) to true. */
+void DefaultMQPushConsumerImpl::suspend()
+{
+    this->m_pause = true;
+}
+
+/**
+ * Removes the entry for the given topic from the rebalancer's inner
+ * subscription table.
+ *
+ * @param topic  topic whose subscription is erased
+ */
+void DefaultMQPushConsumerImpl::unsubscribe(const std::string& topic)
+{
+    this->m_pRebalanceImpl->getSubscriptionInner().erase(topic);
+}
+
+/**
+ * Forwards the new offset for a queue to the offset store.
+ *
+ * @param mq      queue whose offset is updated
+ * @param offset  value to store; passed with increaseOnly = false
+ */
+void DefaultMQPushConsumerImpl::updateConsumeOffset(MessageQueue& mq, long long offset)
+{
+    const bool increaseOnly = false;
+    m_pOffsetStore->updateOffset(mq, offset, increaseOnly);
+}
+
+/**
+ * Forwards the requested core pool size to the consume-message service.
+ *
+ * @param corePoolSize  value handed through unchanged
+ */
+void DefaultMQPushConsumerImpl::updateCorePoolSize(int corePoolSize)
+{
+    this->m_pConsumeMessageService->updateCorePoolSize(corePoolSize);
+}
+
+/**
+ * Looks a message up by id through the client factory's MQAdminImpl.
+ *
+ * @param msgId  id passed on to MQAdminImpl::viewMessage
+ * @return whatever MQAdminImpl::viewMessage returns for that id
+ */
+MessageExt* DefaultMQPushConsumerImpl::viewMessage(const std::string& msgId)
+{
+    MessageExt* found = m_pMQClientFactory->getMQAdminImpl()->viewMessage(msgId);
+    return found;
+}
+
+/** Returns the rebalancer pointer stored in m_pRebalanceImpl. */
+RebalanceImpl* DefaultMQPushConsumerImpl::getRebalanceImpl()
+{
+    return this->m_pRebalanceImpl;
+}
+
+/** Returns the current value of the m_consumeOrderly flag. */
+bool DefaultMQPushConsumerImpl::isConsumeOrderly()
+{
+    return this->m_consumeOrderly;
+}
+
+/**
+ * Stores the given value in the m_consumeOrderly flag.
+ *
+ * @param consumeOrderly  new flag value
+ */
+void DefaultMQPushConsumerImpl::setConsumeOrderly(bool consumeOrderly)
+{
+    this->m_consumeOrderly = consumeOrderly;
+}
+
+
+/** Returns the client factory pointer stored in m_pMQClientFactory. */
+MQClientFactory* DefaultMQPushConsumerImpl::getmQClientFactory()
+{
+    return this->m_pMQClientFactory;
+}
+
+/**
+ * Replaces the stored client factory pointer.
+ *
+ * @param mQClientFactory  pointer stored as-is in m_pMQClientFactory
+ */
+void DefaultMQPushConsumerImpl::setmQClientFactory(MQClientFactory* mQClientFactory)
+{
+    this->m_pMQClientFactory = mQClientFactory;
+}
+
+
+/** Returns the value currently held in m_serviceState. */
+ServiceState DefaultMQPushConsumerImpl::getServiceState()
+{
+    return this->m_serviceState;
+}
+
+/**
+ * Overwrites m_serviceState with the given value.
+ *
+ * @param serviceState  new state
+ */
+void DefaultMQPushConsumerImpl::setServiceState(ServiceState serviceState)
+{
+    this->m_serviceState = serviceState;
+}
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumerImpl.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumerImpl.h b/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumerImpl.h
new file mode 100755
index 0000000..5370586
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumerImpl.h
@@ -0,0 +1,169 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __DEFAULTMQPUSHCONSUMERIMPL_H__
+#define __DEFAULTMQPUSHCONSUMERIMPL_H__
+
+#include <string>
+#include <set>
+#include <map>
+
+#include "MQConsumerInner.h"
+#include "MessageExt.h"
+#include "QueryResult.h"
+#include "ServiceState.h"
+#include "PullResult.h"
+#include "ConsumeMessageHook.h"
+#include "MixAll.h"
+#include "PullCallback.h"
+#include "TimerThread.h"
+
+namespace rmq
+{
+    class DefaultMQPushConsumer;
+    class ConsumeMessageHook;
+    class OffsetStore;
+    class RebalanceImpl;
+    class ConsumerStatManager;
+    class ConsumeMessageService;
+    class MessageListener;
+    class PullRequest;
+    class MQClientFactory;
+    class PullAPIWrapper;
+    class PullMessageService;
+    class DefaultMQPushConsumerImplCallback;
+	class MQException;
+
+    /**
+    * Push Consumer Impl
+    *
+    * Implementation class behind DefaultMQPushConsumer; it implements the
+    * MQConsumerInner interface. It keeps non-owning-looking raw pointers to
+    * its collaborators (client factory, pull API wrapper, message listener,
+    * offset store, rebalancer, stat manager, consume-message service);
+    * NOTE(review): ownership of these pointers is not visible in this
+    * header - confirm against the constructor/destructor/shutdown code.
+    *
+    * PullMessageService, RebalancePushImpl and
+    * DefaultMQPushConsumerImplCallback are friends and may call the private
+    * pull/scheduling helpers declared below.
+    */
+    class DefaultMQPushConsumerImpl : public  MQConsumerInner
+    {
+    public:
+        DefaultMQPushConsumerImpl(DefaultMQPushConsumer* pDefaultMQPushConsumer);
+		~DefaultMQPushConsumerImpl();
+
+        void start();
+        void suspend(); // sets m_pause to true
+        void resume();
+        void shutdown();
+        bool isPause();
+        void setPause(bool pause);
+
+        bool hasHook();
+        void registerHook(ConsumeMessageHook* pHook);
+        void executeHookBefore(ConsumeMessageContext& context);
+        void executeHookAfter(ConsumeMessageContext& context);
+
+        void createTopic(const std::string& key, const std::string& newTopic, int queueNum);
+        std::set<MessageQueue>* fetchSubscribeMessageQueues(const std::string& topic);
+
+        long long earliestMsgStoreTime(const MessageQueue& mq);
+        long long maxOffset(const MessageQueue& mq);
+        long long minOffset(const MessageQueue& mq);
+        OffsetStore* getOffsetStore() ;
+        void setOffsetStore(OffsetStore* pOffsetStore);
+
+        //MQConsumerInner
+        std::string groupName() ;
+        MessageModel messageModel() ;
+        ConsumeType consumeType();
+        ConsumeFromWhere consumeFromWhere();
+        std::set<SubscriptionData> subscriptions();
+        void doRebalance() ;
+        void persistConsumerOffset() ;
+        void updateTopicSubscribeInfo(const std::string& topic, const std::set<MessageQueue>& info);
+        std::map<std::string, SubscriptionData>& getSubscriptionInner() ;
+        bool isSubscribeTopicNeedUpdate(const std::string& topic);
+
+        MessageExt* viewMessage(const std::string& msgId); // delegates to the client factory's MQAdminImpl
+        QueryResult queryMessage(const std::string& topic,
+                                 const std::string&  key,
+                                 int maxNum,
+                                 long long begin,
+                                 long long end);
+
+        void registerMessageListener(MessageListener* pMessageListener);
+        long long searchOffset(const MessageQueue& mq, long long timestamp);
+        void sendMessageBack(MessageExt& msg, int delayLevel, const std::string& brokerName);
+
+        void subscribe(const std::string& topic, const std::string& subExpression); // throws MQClientException on any failure
+        void unsubscribe(const std::string& topic); // erases the topic from the rebalancer's subscription table
+
+        void updateConsumeOffset(MessageQueue& mq, long long offset); // forwards to the offset store with increaseOnly = false
+        void updateCorePoolSize(int corePoolSize); // forwards to the consume-message service
+        bool isConsumeOrderly();
+        void setConsumeOrderly(bool consumeOrderly);
+
+        RebalanceImpl* getRebalanceImpl() ;
+        MessageListener* getMessageListenerInner();
+        DefaultMQPushConsumer* getDefaultMQPushConsumer() ;
+        ConsumerStatManager* getConsumerStatManager();
+
+		MQClientFactory* getmQClientFactory();
+		void setmQClientFactory(MQClientFactory* mQClientFactory);
+
+		ServiceState getServiceState();
+		void setServiceState(ServiceState serviceState);
+
+    private:
+        void correctTagsOffset(PullRequest& pullRequest) ;
+
+        void pullMessage(PullRequest* pPullRequest);
+
+
+        void executePullRequestImmediately(PullRequest* pullRequest);
+
+
+        void executePullRequestLater(PullRequest* pullRequest, long timeDelay);
+		void executeTaskLater(kpr::TimerHandler* handler, long timeDelay);
+
+        void makeSureStateOK();
+        void checkConfig(); // validates the consumer's configuration ranges; throws MQClientException when out of range
+        void copySubscription() ; // copies configured subscriptions (plus the retry topic in CLUSTERING mode) into the rebalancer
+        void updateTopicSubscribeInfoWhenSubscriptionChanged(); // refreshes route info for every subscribed topic
+
+    private:
+		static const int s_PullTimeDelayMillsWhenException = 3000; // presumably milliseconds, per the name - TODO confirm at use sites
+		static const int s_PullTimeDelayMillsWhenFlowControl = 50;
+		static const int s_PullTimeDelayMillsWhenSuspend = 1000;
+		static const int s_BrokerSuspendMaxTimeMillis = 15000;
+		static const int s_ConsumerTimeoutMillisWhenSuspend = 30000;
+
+        long long flowControlTimes1;
+        long long flowControlTimes2;
+        ServiceState m_serviceState;
+        volatile bool m_pause; // NOTE(review): volatile alone gives no inter-thread synchronization - confirm how this flag is shared
+        bool m_consumeOrderly;
+        DefaultMQPushConsumer* m_pDefaultMQPushConsumer;
+        MQClientFactory* m_pMQClientFactory; // may be NULL: subscribe() checks it before sending a heartbeat
+        PullAPIWrapper* m_pPullAPIWrapper;
+        MessageListener* m_pMessageListenerInner; // filled from the consumer's listener in copySubscription() when still NULL
+        OffsetStore* m_pOffsetStore;
+        RebalanceImpl* m_pRebalanceImpl;
+        ConsumerStatManager* m_pConsumerStatManager;
+        ConsumeMessageService* m_pConsumeMessageService;
+
+        std::list<ConsumeMessageHook*> m_hookList; // NOTE(review): <list> is not included directly by this header; it must arrive via another include
+        friend class PullMessageService;
+        friend class RebalancePushImpl;
+        friend class DefaultMQPushConsumerImplCallback;
+    };
+}
+
+#endif
+

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/LocalFileOffsetStore.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/LocalFileOffsetStore.cpp b/rocketmq-client4cpp/src/consumer/LocalFileOffsetStore.cpp
new file mode 100755
index 0000000..40e9d65
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/LocalFileOffsetStore.cpp
@@ -0,0 +1,257 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "LocalFileOffsetStore.h"
+
+#include "MQClientFactory.h"
+#include "OffsetSerializeWrapper.h"
+#include "ScopedLock.h"
+#include "FileUtil.h"
+#include "MixAll.h"
+#include "Exception.h"
+#include "MQClientException.h"
+
+namespace rmq
+{
+
+/**
+ * Creates a local-file offset store for one consumer group.
+ *
+ * The store path is built as
+ *   <home>/.rocketmq_offsets/<clientId>/<groupName>/offsets.json
+ * where <home> is the value of the HOME environment variable.
+ *
+ * @param pMQClientFactory  client factory; its client id becomes part of the path
+ * @param groupName         consumer group name; also part of the path
+ */
+LocalFileOffsetStore::LocalFileOffsetStore(MQClientFactory* pMQClientFactory,
+        const std::string& groupName)
+{
+    m_pMQClientFactory = pMQClientFactory;
+    m_groupName = groupName;
+
+    // getenv() returns NULL when HOME is not set, and building a std::string
+    // from a null pointer is undefined behaviour. Fall back to the current
+    // working directory in that case instead of crashing.
+    const char* pHome = getenv("HOME");
+    std::string homePath = (pHome != NULL) ? pHome : ".";
+
+    m_storePath = homePath + "/.rocketmq_offsets/" + m_pMQClientFactory->getClientId()
+                  + "/" + m_groupName + "/offsets.json";
+}
+
+/**
+ * Loads the persisted offsets from disk into the in-memory table.
+ *
+ * Nothing happens when no wrapper could be read or its table is empty;
+ * otherwise the in-memory table is replaced under the write lock and each
+ * loaded entry is logged.
+ */
+void  LocalFileOffsetStore::load()
+{
+    OffsetSerializeWrapperPtr stored = this->readLocalOffset();
+    if (stored.ptr() == NULL)
+    {
+        return;
+    }
+    if (!(stored->getOffsetTable().size() > 0))
+    {
+        return;
+    }
+
+    kpr::ScopedWLock<kpr::RWMutex> guard(m_tableMutex);
+    m_offsetTable = stored->getOffsetTable();
+    RMQ_FOR_EACH(m_offsetTable, entry)
+    {
+        const MessageQueue& queue = entry->first;
+        const kpr::AtomicLong& value = entry->second;
+        RMQ_INFO("load consumer's offset, {%s} {%s} {%lld}",
+                 m_groupName.c_str(),
+                 queue.toString().c_str(),
+                 value.get());
+    }
+}
+
+
+/**
+ * Updates the in-memory offset of one queue under the write lock.
+ *
+ * @param mq            queue whose offset is updated; an entry is created
+ *                      with the given offset when none exists yet
+ * @param offset        new offset value
+ * @param increaseOnly  when true the update goes through
+ *                      MixAll::compareAndIncreaseOnly, otherwise the value
+ *                      is overwritten unconditionally
+ */
+void  LocalFileOffsetStore::updateOffset(const MessageQueue& mq, long long offset, bool increaseOnly)
+{
+    RMQ_DEBUG("updateOffset, MQ:%s, offset:%lld", mq.toString().c_str(), offset);
+    kpr::ScopedWLock<kpr::RWMutex> guard(m_tableMutex);
+
+    typedef std::map<MessageQueue, kpr::AtomicLong>::iterator OffsetIter;
+    OffsetIter pos = m_offsetTable.find(mq);
+    if (pos == m_offsetTable.end())
+    {
+        m_offsetTable[mq] = offset;
+        pos = m_offsetTable.find(mq);
+    }
+
+    kpr::AtomicLong& stored = pos->second;
+    if (!increaseOnly)
+    {
+        stored.set(offset);
+        return;
+    }
+
+    MixAll::compareAndIncreaseOnly(stored, offset);
+}
+
+long long  LocalFileOffsetStore::readOffset(const MessageQueue& mq, ReadOffsetType type) // returns the offset for mq according to 'type', or -1 when none is found
+{
+    RMQ_DEBUG("readOffset, MQ:%s, type:%d", mq.toString().c_str(), type);
+    switch (type)
+    {
+        case MEMORY_FIRST_THEN_STORE:
+        case READ_FROM_MEMORY:
+        {
+            kpr::ScopedRLock<kpr::RWMutex> lock(m_tableMutex); // read lock lives only until the end of this case block
+            typeof(m_offsetTable.begin()) it = m_offsetTable.find(mq);
+            if (it != m_offsetTable.end())
+            {
+                return it->second.get();
+            }
+            else if (READ_FROM_MEMORY == type)
+            {
+                RMQ_WARN("No offset in memory, MQ:%s", mq.toString().c_str());
+                return -1;
+            }
+        } // no break here: when MEMORY_FIRST_THEN_STORE finds nothing in memory, control falls through to READ_FROM_STORE
+        case READ_FROM_STORE:
+        {
+            OffsetSerializeWrapperPtr offsetSerializeWrapper;
+            try
+            {
+                offsetSerializeWrapper = this->readLocalOffset();
+            }
+            catch (std::exception& e)
+            {
+                RMQ_WARN("load offset file fail, MQ:%s, exception:%s", mq.toString().c_str(), e.what());
+                return -1;
+            }
+
+            if (offsetSerializeWrapper.ptr() != NULL)
+            {
+                std::map<MessageQueue, kpr::AtomicLong>& offsetTable = offsetSerializeWrapper->getOffsetTable();
+                typeof(offsetTable.begin()) it = offsetTable.find(mq);
+                if (it != offsetTable.end())
+                {
+                    kpr::ScopedWLock<kpr::RWMutex> lock(m_tableMutex); // write lock: the value read from disk is cached in the in-memory table
+                    m_offsetTable[mq] = it->second.get();
+                    return it->second.get();
+                }
+            }
+            return -1; // no file content, or the file has no entry for mq
+        }
+        default:
+            break;
+    }
+
+    return -1; // unknown ReadOffsetType
+}
+
+
+/**
+ * Writes the in-memory offsets of the given queues to the store file as JSON.
+ *
+ * Only entries of m_offsetTable whose queue is contained in mqs are written.
+ * Nothing is written when mqs is empty or the encoded JSON is empty. A
+ * std::exception raised while creating the directory or writing the file is
+ * logged and swallowed.
+ *
+ * @param mqs  queues whose offsets should be persisted
+ */
+void  LocalFileOffsetStore::persistAll(std::set<MessageQueue>& mqs)
+{
+    RMQ_DEBUG("persistAll, mqs.size={%u}, mqs=%s",
+              (unsigned)mqs.size(), UtilAll::toString(mqs).c_str());
+    if (mqs.empty())
+    {
+        return;
+    }
+
+    OffsetSerializeWrapper offsetSerializeWrapper;
+    std::map<MessageQueue, kpr::AtomicLong>& offsetTable = offsetSerializeWrapper.getOffsetTable();
+    {
+        kpr::ScopedRLock<kpr::RWMutex> lock(m_tableMutex);
+
+        // m_offsetTable is guarded by m_tableMutex everywhere else in this
+        // class, so the debug dump of it must also run under the read lock;
+        // reading it unlocked races with concurrent updateOffset() calls.
+        RMQ_DEBUG("persistAll, m_offsetTable.size={%u}, m_offsetTable=%s",
+                  (unsigned)m_offsetTable.size(), UtilAll::toString(m_offsetTable).c_str());
+
+        RMQ_FOR_EACH(m_offsetTable, it)
+        {
+            // Bind by reference: the key is only looked up and copied into
+            // the output table, no private copy is needed.
+            const MessageQueue& mq = it->first;
+            kpr::AtomicLong& offset = it->second;
+            if (mqs.find(mq) != mqs.end())
+            {
+                offsetTable[mq] = offset;
+            }
+        }
+    }
+
+    RMQ_DEBUG("persistAll, offsetTable.size={%u}, offsetTable=%s",
+              (unsigned)offsetTable.size(), UtilAll::toString(offsetTable).c_str());
+
+    std::string jsonString;
+    offsetSerializeWrapper.encode(jsonString);
+    RMQ_DEBUG("persistAll, json=%s", jsonString.c_str());
+
+    if (!jsonString.empty())
+    {
+        try
+        {
+            // Make sure the parent directory exists before writing the file.
+            kpr::FileUtil::makeDirRecursive(kpr::FileUtil::extractFilePath(m_storePath));
+            MixAll::string2File(m_storePath, jsonString);
+        }
+        catch (const std::exception& e)
+        {
+            RMQ_ERROR("persistAll consumer offset Exception, %s, %s", m_storePath.c_str(), e.what());
+        }
+    }
+}
+
+void  LocalFileOffsetStore::persist(const MessageQueue& mq) // no-op in this store: the body is empty, so nothing is written for a single queue
+{
+}
+
+void  LocalFileOffsetStore::removeOffset(const MessageQueue& mq) // no-op in this store: the body is empty, so mq's entry stays in m_offsetTable
+{
+}
+
+
+/**
+ * Returns a plain copy of the in-memory offsets that belong to one topic.
+ *
+ * @param topic  only queues whose getTopic() equals this value are copied
+ * @return map from queue to its current offset value, taken under the read lock
+ */
+std::map<MessageQueue, long long> LocalFileOffsetStore::cloneOffsetTable(const std::string& topic)
+{
+    kpr::ScopedRLock<kpr::RWMutex> guard(m_tableMutex);
+    std::map<MessageQueue, long long> snapshot;
+    RMQ_FOR_EACH(m_offsetTable, entry)
+    {
+        MessageQueue queue = entry->first;
+        kpr::AtomicLong& current = entry->second;
+        if (topic == queue.getTopic())
+        {
+            snapshot[queue] = current.get();
+        }
+    }
+
+    return snapshot;
+}
+
+
+/**
+ * Reads and decodes the main offsets file.
+ *
+ * Falls back to readLocalOffsetBak() when the file content is empty or when
+ * decoding throws an MQException.
+ *
+ * @return the decoded wrapper, or whatever readLocalOffsetBak() returns
+ */
+OffsetSerializeWrapper* LocalFileOffsetStore::readLocalOffset()
+{
+    std::string content = MixAll::file2String(m_storePath);
+    if (content.length() == 0)
+    {
+        return this->readLocalOffsetBak();
+    }
+
+    OffsetSerializeWrapper* decoded = NULL;
+    try
+    {
+        decoded = OffsetSerializeWrapper::decode(content.c_str(), content.size());
+    }
+    catch (const MQException& e)
+    {
+        RMQ_WARN("readLocalOffset Exception, and try to correct, %s", e.what());
+        return this->readLocalOffsetBak();
+    }
+
+    return decoded;
+}
+
+
+/**
+ * Reads and decodes the backup offsets file (store path + ".bak").
+ *
+ * @return the decoded wrapper; NULL when the backup content is empty or
+ *         decoding throws an MQException (which is logged)
+ */
+OffsetSerializeWrapper* LocalFileOffsetStore::readLocalOffsetBak()
+{
+    std::string content = MixAll::file2String(m_storePath + ".bak");
+    if (!(content.length() > 0))
+    {
+        return NULL;
+    }
+
+    OffsetSerializeWrapper* decoded = NULL;
+    try
+    {
+        decoded = OffsetSerializeWrapper::decode(content.c_str(), content.size());
+    }
+    catch (const MQException& e)
+    {
+        RMQ_WARN("readLocalOffset Exception, maybe json content invalid, %s", e.what());
+    }
+
+    return decoded;
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/LocalFileOffsetStore.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/LocalFileOffsetStore.h b/rocketmq-client4cpp/src/consumer/LocalFileOffsetStore.h
new file mode 100755
index 0000000..c4efb76
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/LocalFileOffsetStore.h
@@ -0,0 +1,61 @@
+/**
+* Copyright (C) 2013 suwenkuang ,hooligan_520@qq.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __LOCALFILEOFFSETSTORE_H__
+#define __LOCALFILEOFFSETSTORE_H__
+#include <map>
+#include <string>
+#include <set>
+
+#include "RocketMQClient.h"
+#include "OffsetStore.h"
+#include "MessageQueue.h"
+#include "AtomicValue.h"
+#include "Mutex.h"
+
+namespace rmq
+{
+    class MQClientFactory;
+    class MessageQueue;
+    class OffsetSerializeWrapper;
+
+    class LocalFileOffsetStore : public OffsetStore // OffsetStore implementation that keeps offsets in memory and persists them to a JSON file
+    {
+    public:
+        LocalFileOffsetStore(MQClientFactory* pMQClientFactory, const std::string& groupName); // derives the store path from $HOME, the client id and the group name
+
+        void load(); // replaces the in-memory table with the content read from disk, if any
+        void updateOffset(const MessageQueue& mq, long long offset, bool increaseOnly); // increaseOnly routes the update through MixAll::compareAndIncreaseOnly
+        long long readOffset(const MessageQueue& mq, ReadOffsetType type); // returns -1 when no offset is found
+        void persistAll(std::set<MessageQueue>& mqs); // writes the offsets of the given queues to the store file
+        void persist(const MessageQueue& mq); // empty implementation
+        void removeOffset(const MessageQueue& mq) ; // empty implementation
+        std::map<MessageQueue, long long> cloneOffsetTable(const std::string& topic); // copy of the entries whose queue belongs to topic
+
+    private:
+        OffsetSerializeWrapper* readLocalOffset(); // main file first, backup file as fallback
+        OffsetSerializeWrapper* readLocalOffsetBak(); // reads m_storePath + ".bak"; NULL when empty or undecodable
+
+    private:
+        MQClientFactory* m_pMQClientFactory;
+        std::string m_groupName;
+        std::string m_storePath; // <home>/.rocketmq_offsets/<clientId>/<group>/offsets.json
+        std::map<MessageQueue, kpr::AtomicLong> m_offsetTable; // accessed under m_tableMutex
+        kpr::RWMutex m_tableMutex;
+    };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/MQConsumerInner.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/MQConsumerInner.h b/rocketmq-client4cpp/src/consumer/MQConsumerInner.h
new file mode 100755
index 0000000..ed83621
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/MQConsumerInner.h
@@ -0,0 +1,46 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,kangliq@163.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __MQCONSUMERINNER_H__
+#define __MQCONSUMERINNER_H__
+
+#include <string>
+#include <set>
+
+#include "ConsumeType.h"
+#include "SubscriptionData.h"
+
+namespace rmq
+{
+    class MessageQueue;
+
+    class MQConsumerInner // abstract interface: every member below except the destructor is pure virtual
+    {
+    public:
+        virtual ~MQConsumerInner() {} // virtual so implementations can be destroyed through this interface
+        virtual std::string groupName() = 0;
+        virtual MessageModel messageModel() = 0;
+        virtual ConsumeType consumeType() = 0;
+        virtual ConsumeFromWhere consumeFromWhere() = 0;
+        virtual std::set<SubscriptionData> subscriptions() = 0; // returned by value
+        virtual void doRebalance() = 0;
+        virtual void persistConsumerOffset() = 0;
+        virtual void updateTopicSubscribeInfo(const std::string& topic, const std::set<MessageQueue>& info) = 0;
+        virtual bool isSubscribeTopicNeedUpdate(const std::string& topic) = 0;
+    };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/MessageQueueLock.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/MessageQueueLock.h b/rocketmq-client4cpp/src/consumer/MessageQueueLock.h
new file mode 100755
index 0000000..65af99e
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/MessageQueueLock.h
@@ -0,0 +1,68 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __MESSAGEQUEUELOCK_H__
+#define __MESSAGEQUEUELOCK_H__
+
+#include <map>
+#include "Mutex.h"
+#include "ScopedLock.h"
+#include "MessageQueue.h"
+
+namespace rmq
+{
+    /**
+     * Table of per-queue mutexes.
+     *
+     * fetchLockObject() hands out one kpr::Mutex per MessageQueue, creating it
+     * on first request. The mutexes are owned by this object and deleted in
+     * its destructor, so a pointer returned by fetchLockObject() must not be
+     * used after the MessageQueueLock is destroyed.
+     *
+     * The class is non-copyable: a copy would share the same raw mutex
+     * pointers and both destructors would delete them.
+     */
+    class MessageQueueLock
+    {
+    public:
+        MessageQueueLock()
+        {
+
+        }
+
+        /** Deletes every mutex that was created by fetchLockObject(). */
+        ~MessageQueueLock()
+        {
+            std::map<MessageQueue, kpr::Mutex*>::iterator it = m_mqLockTable.begin();
+
+            for (; it != m_mqLockTable.end(); ++it)
+            {
+                delete it->second;
+            }
+        }
+
+        /**
+         * Returns the mutex associated with mq, creating and registering a
+         * new one when mq has none yet. The table itself is protected by an
+         * internal lock for the duration of the call.
+         *
+         * @param mq  queue whose lock object is requested
+         * @return pointer to the mutex; it stays owned by this object
+         */
+        kpr::Mutex* fetchLockObject(MessageQueue& mq)
+        {
+            kpr::ScopedLock<kpr::Mutex> lock(m_lock);
+            std::map<MessageQueue, kpr::Mutex*>::iterator it = m_mqLockTable.find(mq);
+            kpr::Mutex* objLock;
+            if (it == m_mqLockTable.end())
+            {
+                objLock = new kpr::Mutex();
+                m_mqLockTable[mq] = objLock;
+            }
+            else
+            {
+                objLock = it->second;
+            }
+
+            return objLock;
+        }
+
+    private:
+        // Declared but not defined: copying would lead to a double delete of
+        // the owned mutexes in the destructor.
+        MessageQueueLock(const MessageQueueLock&);
+        MessageQueueLock& operator=(const MessageQueueLock&);
+
+    private:
+        std::map<MessageQueue, kpr::Mutex*> m_mqLockTable;
+        kpr::Mutex m_lock;
+    };
+}
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/ProcessQueue.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/ProcessQueue.cpp b/rocketmq-client4cpp/src/consumer/ProcessQueue.cpp
new file mode 100755
index 0000000..f90e502
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/ProcessQueue.cpp
@@ -0,0 +1,445 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "ProcessQueue.h"
+#include "MessageExt.h"
+#include "KPRUtil.h"
+#include "UtilAll.h"
+#include "ScopedLock.h"
+#include "DefaultMQPushConsumer.h"
+#include "DefaultMQPushConsumerImpl.h"
+
+namespace rmq
+{
+
+/**
+ * Initialises an empty process queue: the pull, consume and lock timestamps
+ * are set to the current time, the counters to zero and all flags to false.
+ */
+ProcessQueue::ProcessQueue()
+{
+    // Timestamps.
+    m_lastPullTimestamp = KPRUtil::GetCurrentTimeMillis();
+    m_lastConsumeTimestamp = KPRUtil::GetCurrentTimeMillis();
+    m_lastLockTimestamp = KPRUtil::GetCurrentTimeMillis();
+
+    // Counters.
+    m_queueOffsetMax = 0L;
+    m_msgCount = 0;
+
+    // State flags.
+    m_dropped = false;
+    m_locked = false;
+    m_consuming = false;
+}
+
+/**
+ * @return true when more than s_RebalanceLockMaxLiveTime has elapsed since
+ *         m_lastLockTimestamp
+ */
+bool ProcessQueue::isLockExpired()
+{
+    return (KPRUtil::GetCurrentTimeMillis() - m_lastLockTimestamp) >
+           s_RebalanceLockMaxLiveTime;
+}
+
+/**
+ * @return true when more than s_PullMaxIdleTime has elapsed since
+ *         m_lastPullTimestamp
+ */
+bool ProcessQueue::isPullExpired()
+{
+    return (KPRUtil::GetCurrentTimeMillis() - m_lastPullTimestamp) >
+           s_PullMaxIdleTime;
+}
+
+
+/**
+ * Sends back messages at the head of the queue whose consumption has been
+ * running longer than the consumer's consume timeout, and drops them from
+ * the tree map.
+ *
+ * Does nothing for orderly consumers. At most 16 messages are examined per
+ * call, always starting from the smallest queue offset; the scan stops at
+ * the first head message that is not expired. A message is removed from the
+ * map only if it is still the head entry after it was sent back.
+ *
+ * @param pPushConsumer  consumer supplying the timeout (getConsumeTimeout(),
+ *                       multiplied by 60 * 1000 here) and sendMessageBack()
+ */
+void ProcessQueue::cleanExpiredMsg(DefaultMQPushConsumer* pPushConsumer)
+{
+    if (pPushConsumer->getDefaultMQPushConsumerImpl()->isConsumeOrderly())
+    {
+        return;
+    }
+
+    long long now = KPRUtil::GetCurrentTimeMillis();
+    // NOTE(review): this size() read happens without m_lockTreeMap; it is only
+    // an upper bound for the loop, each iteration re-checks under the lock.
+    int loop = m_msgTreeMap.size() < 16 ? m_msgTreeMap.size() : 16;
+    // Widen before multiplying so a large timeout cannot overflow int.
+    const long long timeoutMillis =
+        static_cast<long long>(pPushConsumer->getConsumeTimeout()) * 60 * 1000;
+
+    for (int i = 0; i < loop; i++)
+    {
+        MessageExt* msg = NULL;
+        try
+        {
+            kpr::ScopedRLock<kpr::RWMutex> lock(m_lockTreeMap);
+            if (m_msgTreeMap.empty())
+            {
+                return;
+            }
+
+            MessageExt* firstMsg = m_msgTreeMap.begin()->second;
+            long long startTimestamp = UtilAll::str2ll(firstMsg->getProperty(Message::PROPERTY_CONSUME_START_TIMESTAMP).c_str());
+            if (startTimestamp > 0 && (now - startTimestamp) > timeoutMillis)
+            {
+                msg = firstMsg;
+            }
+            else
+            {
+                return;
+            }
+        }
+        catch (...)
+        {
+            RMQ_ERROR("getExpiredMsg exception");
+        }
+
+        // If picking the head message failed with an exception, msg is still
+        // NULL; dereferencing it below would crash, so skip this round.
+        if (msg == NULL)
+        {
+            continue;
+        }
+
+        try
+        {
+            pPushConsumer->sendMessageBack((*msg), 3);
+            RMQ_WARN("send expire msg back. topic={%s}, msgId={%s}, storeHost={%s}, queueId={%d}, queueOffset={%lld}",
+                     msg->getTopic().c_str(), msg->getMsgId().c_str(), msg->getStoreHostString().c_str(),
+                     msg->getQueueId(), msg->getQueueOffset());
+
+            try
+            {
+                kpr::ScopedWLock<kpr::RWMutex> lock(m_lockTreeMap);
+                // Only erase when the message is still the head entry; the
+                // map may have changed between the two locked sections.
+                if (!m_msgTreeMap.empty() && msg->getQueueOffset() == m_msgTreeMap.begin()->first)
+                {
+                    try
+                    {
+                        m_msgTreeMap.erase(m_msgTreeMap.begin());
+                        m_msgCount -= 1;
+                        // if free msg, may be coredump
+                        //delete msg;
+                    }
+                    catch (...)
+                    {
+                        RMQ_ERROR("send expired msg exception");
+                    }
+                }
+
+            }
+            catch (...)
+            {
+                RMQ_ERROR("delExpiredMsg exception");
+            }
+        }
+        catch (...)
+        {
+            RMQ_ERROR("send expired msg exception");
+        }
+    }
+}
+
+
+/**
+ * Stores the given messages in the tree map, keyed by queue offset.
+ *
+ * A message whose offset is not yet present counts as new and moves
+ * m_queueOffsetMax to its offset; an existing entry with the same offset is
+ * overwritten. The message counter grows by the number of new offsets.
+ *
+ * @param msgs  messages to add
+ * @return true when the map is non-empty afterwards and no consumption was
+ *         marked as running; in that case m_consuming is set to true.
+ *         false otherwise, including when an exception was caught.
+ */
+bool ProcessQueue::putMessage(const std::list<MessageExt *> &msgs)
+{
+    bool shouldDispatch = false;
+
+    try
+    {
+        kpr::ScopedWLock<kpr::RWMutex> guard(m_lockTreeMap);
+        int newlyAdded = 0;
+
+        for (std::list<MessageExt *>::const_iterator cur = msgs.begin(); cur != msgs.end(); ++cur)
+        {
+            MessageExt *incoming = *cur;
+
+            const bool alreadyKnown =
+                m_msgTreeMap.find(incoming->getQueueOffset()) != m_msgTreeMap.end();
+            if (!alreadyKnown)
+            {
+                ++newlyAdded;
+                m_queueOffsetMax = incoming->getQueueOffset();
+            }
+
+            m_msgTreeMap[incoming->getQueueOffset()] = incoming;
+        }
+
+        m_msgCount += newlyAdded;
+
+        if (!(m_msgTreeMap.empty() || m_consuming))
+        {
+            shouldDispatch = true;
+            m_consuming = true;
+        }
+    }
+    catch (...)
+    {
+        RMQ_ERROR("putMessage exception");
+    }
+
+    return shouldDispatch;
+}
+
+/**
+ * @return difference between the largest and the smallest queue offset held
+ *         in the tree map; 0 when the map is empty or an exception was caught
+ */
+long long ProcessQueue::getMaxSpan()
+{
+    try
+    {
+        kpr::ScopedRLock<kpr::RWMutex> guard(m_lockTreeMap);
+
+        if (m_msgTreeMap.empty())
+        {
+            return 0;
+        }
+
+        // The map is ordered by offset: rbegin() is the largest key,
+        // begin() the smallest.
+        return m_msgTreeMap.rbegin()->first - m_msgTreeMap.begin()->first;
+    }
+    catch (...)
+    {
+        RMQ_ERROR("getMaxSpan exception");
+    }
+
+    return 0;
+}
+
+/**
+ * Removes the given messages from the tree map and frees them.
+ *
+ * When the map is non-empty, every message in msgs is erased from the map
+ * (if present), removed from msgs and deleted; the message counter shrinks
+ * by the number of entries actually erased. When the map is empty, msgs is
+ * left untouched. The last-consume timestamp is refreshed either way.
+ *
+ * @param msgs  messages to drop; emptied when the map was non-empty
+ * @return -1 when the map was empty on entry (or an exception was caught
+ *         before a result was computed); otherwise the smallest offset still
+ *         in the map, or m_queueOffsetMax + 1 when the map became empty
+ */
+long long ProcessQueue::removeMessage(std::list<MessageExt *> &msgs)
+{
+    long long nextOffset = -1;
+    unsigned long long now = KPRUtil::GetCurrentTimeMillis();
+
+    try
+    {
+        kpr::ScopedWLock<kpr::RWMutex> guard(m_lockTreeMap);
+        m_lastConsumeTimestamp = now;
+
+        if (!m_msgTreeMap.empty())
+        {
+            nextOffset = m_queueOffsetMax + 1;
+            int erasedCount = 0;
+
+            while (!msgs.empty())
+            {
+                MessageExt *done = msgs.front();
+
+                // erase(key) reports how many entries were removed (0 or 1).
+                if (m_msgTreeMap.erase(done->getQueueOffset()) != 0)
+                {
+                    ++erasedCount;
+                }
+
+                //TODO delete message?
+                msgs.pop_front();
+                delete done;
+            }
+
+            m_msgCount -= erasedCount;
+
+            if (!m_msgTreeMap.empty())
+            {
+                nextOffset = m_msgTreeMap.begin()->first;
+            }
+        }
+    }
+    catch (...)
+    {
+        RMQ_ERROR("removeMessage exception");
+    }
+
+    return nextOffset;
+}
+
+
+/**
+* Resets the queue: empties both message maps and zeroes the message
+* counter and the maximum offset, all under the write lock.
+*
+* The MessageExt pointers stored in the maps are dropped, not deleted.
+* NOTE(review): confirm that something else owns those messages.
+*/
+void ProcessQueue::clear()
+{
+    try
+    {
+        kpr::ScopedWLock<kpr::RWMutex> lock(m_lockTreeMap);
+
+        m_msgTreeMap.clear();
+        m_msgTreeMapTemp.clear();
+
+        m_msgCount.set(0);
+        m_queueOffsetMax = 0;
+    }
+    catch (...)
+    {
+        RMQ_ERROR("clear exception");
+    }
+}
+
+
+/**
+* Returns a snapshot (copy) of the pending-message map.
+*
+* The copy is taken under the read lock, like every other reader of
+* m_msgTreeMap in this class, so it cannot observe the map while a writer
+* is modifying it.
+*
+* @return copy of m_msgTreeMap; an empty map if an exception is caught
+*         while locking or copying.
+*/
+std::map<long long, MessageExt *> ProcessQueue::getMsgTreeMap()
+{
+    std::map<long long, MessageExt *> snapshot;
+
+    try
+    {
+        kpr::ScopedRLock<kpr::RWMutex> lock(m_lockTreeMap);
+        snapshot = m_msgTreeMap;
+    }
+    catch (...)
+    {
+        RMQ_ERROR("getMsgTreeMap exception");
+    }
+
+    return snapshot;
+}
+
+/**
+* Returns a copy of the message counter.
+*/
+kpr::AtomicInteger ProcessQueue::getMsgCount()
+{
+    return this->m_msgCount;
+}
+
+/**
+* Reports the current value of the dropped flag.
+*/
+bool ProcessQueue::isDropped()
+{
+    const bool dropped = m_dropped;
+    return dropped;
+}
+
+/**
+* Stores the dropped flag.
+*
+* @param dropped new value of the flag.
+*/
+void ProcessQueue::setDropped(bool dropped)
+{
+    this->m_dropped = dropped;
+}
+
+/**
+* Returns the stored last-pull timestamp.
+*/
+unsigned long long ProcessQueue::getLastPullTimestamp()
+{
+    const unsigned long long timestamp = m_lastPullTimestamp;
+    return timestamp;
+}
+
+
+/**
+* Stores the last-pull timestamp.
+*
+* @param lastPullTimestamp value to record.
+*/
+void ProcessQueue::setLastPullTimestamp(unsigned long long lastPullTimestamp)
+{
+    this->m_lastPullTimestamp = lastPullTimestamp;
+}
+
+
+/**
+* Returns the stored last-consume timestamp (refreshed by removeMessage
+* and takeMessages with the current time in milliseconds).
+*/
+unsigned long long ProcessQueue::getLastConsumeTimestamp()
+{
+    const unsigned long long timestamp = m_lastConsumeTimestamp;
+    return timestamp;
+}
+
+
+/**
+* Stores the last-consume timestamp.
+*
+* @param lastConsumeTimestamp value to record.
+*/
+void ProcessQueue::setLastConsumeTimestamp(unsigned long long lastConsumeTimestamp)
+{
+    this->m_lastConsumeTimestamp = lastConsumeTimestamp;
+}
+
+
+/**
+* ========================================================================
+*/
+/**
+* Gives access to the consume mutex owned by this queue.
+*
+* @return reference to m_lockConsume; valid as long as this object lives.
+*/
+kpr::Mutex &ProcessQueue::getLockConsume()
+{
+    return this->m_lockConsume;
+}
+
+/**
+* Stores the locked flag.
+*
+* @param locked new value of the flag.
+*/
+void ProcessQueue::setLocked(bool locked)
+{
+    this->m_locked = locked;
+}
+
+/**
+* Reports the current value of the locked flag.
+*/
+bool ProcessQueue::isLocked()
+{
+    const bool locked = m_locked;
+    return locked;
+}
+
+/**
+* Returns the current value of the try-unlock counter.
+*/
+long long ProcessQueue::getTryUnlockTimes()
+{
+    const long long times = m_tryUnlockTimes.get();
+    return times;
+}
+
+/**
+* Increments the try-unlock counter by one.
+*/
+void ProcessQueue::incTryUnlockTimes()
+{
+    this->m_tryUnlockTimes++;
+}
+
+
+/**
+* Returns every message handed out by takeMessages (and therefore parked
+* in m_msgTreeMapTemp) to the pending map m_msgTreeMap, then empties the
+* temporary map.
+*
+* The temporary entries are merged into the pending map. Assigning the
+* temporary map over m_msgTreeMap would discard, and leak, every message
+* that was still pending and had not been taken yet.
+*/
+void ProcessQueue::rollback()
+{
+    try
+    {
+        kpr::ScopedWLock<kpr::RWMutex> lock(m_lockTreeMap);
+
+        std::map<long long, MessageExt *>::iterator it = m_msgTreeMapTemp.begin();
+
+        for (; it != m_msgTreeMapTemp.end(); ++it)
+        {
+            m_msgTreeMap[it->first] = it->second;
+        }
+
+        m_msgTreeMapTemp.clear();
+    }
+    catch (...)
+    {
+        RMQ_ERROR("rollback exception");
+    }
+}
+
+/**
+* Confirms the messages parked in m_msgTreeMapTemp: subtracts their number
+* from the message counter and empties the temporary map.
+*
+* The MessageExt pointers are dropped, not deleted.
+* NOTE(review): confirm the caller releases them.
+*
+* @return largest offset in the temporary map plus one; -1 when the
+*         temporary map is empty or an exception is caught.
+*/
+long long ProcessQueue::commit()
+{
+    long long nextOffset = -1;
+
+    try
+    {
+        kpr::ScopedWLock<kpr::RWMutex> lock(m_lockTreeMap);
+
+        if (!m_msgTreeMapTemp.empty())
+        {
+            // rbegin() is the entry with the largest key of the ordered map.
+            const long long lastOffset = m_msgTreeMapTemp.rbegin()->first;
+
+            m_msgCount -= m_msgTreeMapTemp.size();
+            m_msgTreeMapTemp.clear();
+
+            nextOffset = lastOffset + 1;
+        }
+    }
+    catch (...)
+    {
+        RMQ_ERROR("commit exception");
+    }
+
+    return nextOffset;
+}
+
+/**
+* Moves the given messages from the temporary map back into the pending
+* map so that a later takeMessages call hands them out again.
+*
+* @param msgs messages to re-queue; each one is keyed by its queue offset.
+*/
+void ProcessQueue::makeMessageToCosumeAgain(const std::list<MessageExt *> &msgs)
+{
+    try
+    {
+        kpr::ScopedWLock<kpr::RWMutex> lock(m_lockTreeMap);
+
+        for (std::list<MessageExt *>::const_iterator cur = msgs.begin();
+             cur != msgs.end();
+             cur++)
+        {
+            MessageExt *pending = *cur;
+
+            m_msgTreeMapTemp.erase(pending->getQueueOffset());
+            m_msgTreeMap[pending->getQueueOffset()] = pending;
+        }
+    }
+    catch (...)
+    {
+        RMQ_ERROR("makeMessageToCosumeAgain exception");
+    }
+}
+
+/**
+* Hands out up to batchSize messages in ascending offset order.
+*
+* Each message taken is moved from m_msgTreeMap to m_msgTreeMapTemp, where
+* it stays until commit(), rollback() or makeMessageToCosumeAgain() deals
+* with it. The last-consume timestamp is refreshed on every call.
+*
+* When nothing could be taken, including the case where the pending map is
+* already empty, m_consuming is reset to false. putMessage only requests a
+* new dispatch while m_consuming is false, so the reset must also happen
+* for a drained queue; otherwise the flag would stay true forever.
+*
+* @param batchSize maximum number of messages to take.
+* @return the messages taken; empty when none were available or an
+*         exception was caught before any was taken.
+*/
+std::list<MessageExt *> ProcessQueue::takeMessages(int batchSize)
+{
+    std::list<MessageExt *> result;
+    unsigned long long now = KPRUtil::GetCurrentTimeMillis();
+
+    try
+    {
+        kpr::ScopedWLock<kpr::RWMutex> lock(m_lockTreeMap);
+        m_lastConsumeTimestamp = now;
+
+        for (int i = 0; i < batchSize && !m_msgTreeMap.empty(); i++)
+        {
+            std::map<long long, MessageExt *>::iterator it = m_msgTreeMap.begin();
+
+            result.push_back(it->second);
+            m_msgTreeMapTemp[it->first] = it->second;
+            m_msgTreeMap.erase(it);
+        }
+
+        if (result.empty())
+        {
+            m_consuming = false;
+        }
+    }
+    catch (...)
+    {
+        RMQ_ERROR("takeMessags exception");
+    }
+
+    return result;
+}
+
+/**
+* Returns the stored last-lock timestamp, converted to long long.
+*/
+long long ProcessQueue::getLastLockTimestamp()
+{
+    const long long timestamp = m_lastLockTimestamp;
+    return timestamp;
+}
+
+/**
+* Stores the last-lock timestamp.
+*
+* @param lastLockTimestamp value to record.
+*/
+void ProcessQueue::setLastLockTimestamp(long long lastLockTimestamp)
+{
+    this->m_lastLockTimestamp = lastLockTimestamp;
+}
+
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/ProcessQueue.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/ProcessQueue.h b/rocketmq-client4cpp/src/consumer/ProcessQueue.h
new file mode 100755
index 0000000..559dd7f
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/ProcessQueue.h
@@ -0,0 +1,102 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __PROCESSQUEUE_H__
+#define __PROCESSQUEUE_H__
+
+#include <list>
+#include <map>
+#include "Mutex.h"
+#include "AtomicValue.h"
+
+namespace rmq
+{
+    class MessageExt;
+	class DefaultMQPushConsumer;
+
+    /**
+    * Client-side buffer of messages for one queue.
+    *
+    * Messages are kept in a map keyed by queue offset (m_msgTreeMap). For the
+    * take/commit/rollback workflow a second map (m_msgTreeMapTemp) holds the
+    * messages that have been handed out by takeMessages but not yet committed.
+    */
+    class ProcessQueue
+    {
+    public:
+        // NOTE(review): the three limits below are presumably milliseconds - confirm.
+		static const unsigned int s_RebalanceLockMaxLiveTime = 30000;
+		static const unsigned int s_RebalanceLockInterval = 20000;
+		static const unsigned int s_PullMaxIdleTime = 120000;
+
+    public:
+        ProcessQueue();
+
+        bool isLockExpired();
+		bool isPullExpired();
+
+		void cleanExpiredMsg(DefaultMQPushConsumer* pPushConsumer);
+        // Returns true when the map is non-empty afterwards and no consumption
+        // was already flagged as running (m_consuming is then set).
+        bool putMessage(const std::list<MessageExt*>& msgs);
+
+        // Largest minus smallest offset currently buffered; 0 when empty.
+        long long getMaxSpan();
+        // Removes and deletes the given messages; returns the next smallest
+        // buffered offset, or -1 when the buffer was already empty.
+        long long removeMessage(std::list<MessageExt*>& msgs);
+
+		void clear();
+
+        // Returns a copy of the buffered-message map.
+        std::map<long long, MessageExt*> getMsgTreeMap();
+        kpr::AtomicInteger getMsgCount();
+        bool isDropped();
+        void setDropped(bool dropped);
+
+		unsigned long long getLastPullTimestamp();
+		void setLastPullTimestamp(unsigned long long lastPullTimestamp);
+
+		unsigned long long getLastConsumeTimestamp();
+		void setLastConsumeTimestamp(unsigned long long lastConsumeTimestamp);
+
+        /**
+        * ========================================================================
+        */
+		kpr::Mutex& getLockConsume();
+        void setLocked(bool locked);
+        bool isLocked();
+		long long getTryUnlockTimes();
+		void incTryUnlockTimes();
+
+        // Puts the messages handed out by takeMessages back into the buffer.
+        void rollback();
+        // Discards the handed-out messages; returns their largest offset + 1,
+        // or -1 when none were handed out.
+        long long commit();
+        // Moves the given messages from the handed-out map back to the buffer.
+        void makeMessageToCosumeAgain(const std::list<MessageExt*>& msgs);
+
+        // Hands out up to batchSize messages in ascending offset order.
+        std::list<MessageExt*> takeMessages(int batchSize);
+
+        long long getLastLockTimestamp();
+        void setLastLockTimestamp(long long lastLockTimestamp);
+
+
+    private:
+        // Taken by the member functions that touch m_msgTreeMap / m_msgTreeMapTemp.
+        kpr::RWMutex m_lockTreeMap;
+        // Buffered messages keyed by queue offset.
+        std::map<long long, MessageExt*> m_msgTreeMap;
+        volatile long long m_queueOffsetMax ;
+        kpr::AtomicInteger m_msgCount;
+        volatile bool m_dropped;
+        volatile unsigned long long m_lastPullTimestamp;
+		volatile unsigned long long m_lastConsumeTimestamp;
+
+        /**
+        * order message
+        */
+        kpr::Mutex m_lockConsume;
+        volatile bool m_locked;
+        volatile unsigned long long m_lastLockTimestamp;
+        // Set by putMessage when it requests a dispatch; cleared by takeMessages.
+        volatile bool m_consuming;
+        // Messages handed out by takeMessages and not yet committed or rolled back.
+        std::map<long long, MessageExt*> m_msgTreeMapTemp;
+        kpr::AtomicInteger m_tryUnlockTimes;
+    };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/PullAPIWrapper.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/PullAPIWrapper.cpp b/rocketmq-client4cpp/src/consumer/PullAPIWrapper.cpp
new file mode 100755
index 0000000..c520e4c
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/PullAPIWrapper.cpp
@@ -0,0 +1,222 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "PullAPIWrapper.h"
+
+#include <stdlib.h>
+#include <list>
+#include <set>
+#include "ScopedLock.h"
+#include "MQClientFactory.h"
+#include "PullCallback.h"
+#include "MixAll.h"
+#include "PullSysFlag.h"
+#include "CommandCustomHeader.h"
+#include "MQClientAPIImpl.h"
+#include "MQClientException.h"
+#include "SubscriptionData.h"
+#include "UtilAll.h"
+#include "MessageExt.h"
+#include "PullResultExt.h"
+#include "MessageDecoder.h"
+#include "VirtualEnvUtil.h"
+
+namespace rmq
+{
+
+/**
+* Builds a wrapper bound to one client factory and one consumer group.
+*
+* @param pMQClientFactory factory used for broker lookup and remote calls;
+*                         stored as a raw pointer, not owned.
+* @param consumerGroup    consumer group name put into every pull request.
+*/
+PullAPIWrapper::PullAPIWrapper(MQClientFactory* pMQClientFactory, const std::string& consumerGroup)
+    : m_pMQClientFactory(pMQClientFactory),
+      m_consumerGroup(consumerGroup)
+{
+}
+
+/**
+* Records which broker id should be used for the next pull from mq.
+*
+* An existing table entry is updated in place under the read lock (the
+* value itself is an atomic integer); a missing entry is inserted under
+* the write lock.
+*
+* @param mq       queue the suggestion applies to.
+* @param brokerId broker id to remember for that queue.
+*/
+void  PullAPIWrapper::updatePullFromWhichNode(MessageQueue& mq, long brokerId)
+{
+    {
+        kpr::ScopedRLock<kpr::RWMutex> readLock(m_pullFromWhichNodeTableLock);
+
+        std::map<MessageQueue, kpr::AtomicInteger>::iterator entry =
+            m_pullFromWhichNodeTable.find(mq);
+
+        if (entry != m_pullFromWhichNodeTable.end())
+        {
+            entry->second.set(brokerId);
+            return;
+        }
+    }
+
+    // Not present yet: the read lock has been released above, so the write
+    // lock can be taken for the insertion.
+    kpr::ScopedWLock<kpr::RWMutex> writeLock(m_pullFromWhichNodeTableLock);
+    m_pullFromWhichNodeTable[mq] = kpr::AtomicInteger(brokerId);
+}
+
+/**
+* Post-processes a raw pull result in place.
+*
+* Always records the broker suggested by the result for the next pull. When
+* the status is FOUND it additionally:
+*   1. decodes the binary payload into messages;
+*   2. re-filters them against the subscription's tag set (if that set is
+*      non-empty), keeping only messages whose tag is in the set;
+*   3. strips the project-group prefix from the topics when a prefix is
+*      configured;
+*   4. stamps every kept message with the min/max offset of the result and
+*      appends it to msgFoundList;
+*   5. deletes the messages that were filtered out and releases the binary
+*      payload.
+*
+* @param mq               queue the result belongs to; its topic is rewritten
+*                         when a project-group prefix is configured.
+* @param pullResult       result to process; treated as a PullResultExt.
+* @param subscriptionData subscription used for tag filtering; its topic is
+*                         rewritten when a project-group prefix is configured.
+* @return address of pullResult.
+*/
+PullResult* PullAPIWrapper::processPullResult(MessageQueue& mq,
+        PullResult& pullResult,
+        SubscriptionData& subscriptionData)
+{
+    std::string projectGroupPrefix = m_pMQClientFactory->getMQClientAPIImpl()->getProjectGroupPrefix();
+    PullResultExt& pullResultExt = (PullResultExt&) pullResult;
+
+    updatePullFromWhichNode(mq, pullResultExt.suggestWhichBrokerId);
+
+    if (pullResult.pullStatus == FOUND)
+    {
+        std::list<MessageExt*> msgList =
+            MessageDecoder::decodes(pullResultExt.messageBinary, pullResultExt.messageBinaryLen);
+
+        std::list<MessageExt*> msgListFilterAgain;
+
+        if (!subscriptionData.getTagsSet().empty())
+        {
+            std::set<std::string>& tags = subscriptionData.getTagsSet();
+            std::list<MessageExt*>::iterator it = msgList.begin();
+
+            while (it != msgList.end())
+            {
+                MessageExt* msg = *it;
+
+                if (!msg->getTags().empty() && tags.find(msg->getTags()) != tags.end())
+                {
+                    msgListFilterAgain.push_back(msg);
+                    it = msgList.erase(it);
+                }
+                else
+                {
+                    // Untagged or non-matching message: leave it in msgList so
+                    // it is deleted below. The iterator must advance in both
+                    // cases, otherwise an untagged message loops forever.
+                    ++it;
+                }
+            }
+        }
+        else
+        {
+            msgListFilterAgain.assign(msgList.begin(), msgList.end());
+            msgList.clear();
+        }
+
+        const bool hasProjectGroup = !UtilAll::isBlank(projectGroupPrefix);
+
+        if (hasProjectGroup)
+        {
+            subscriptionData.setTopic(VirtualEnvUtil::clearProjectGroup(subscriptionData.getTopic(),
+                                      projectGroupPrefix));
+            mq.setTopic(VirtualEnvUtil::clearProjectGroup(mq.getTopic(), projectGroupPrefix));
+        }
+
+        std::list<MessageExt*>::iterator it = msgListFilterAgain.begin();
+        for (; it != msgListFilterAgain.end(); ++it)
+        {
+            MessageExt* msg = *it;
+
+            if (hasProjectGroup)
+            {
+                msg->setTopic(VirtualEnvUtil::clearProjectGroup(msg->getTopic(), projectGroupPrefix));
+            }
+
+            msg->putProperty(Message::PROPERTY_MIN_OFFSET, UtilAll::toString(pullResult.minOffset));
+            msg->putProperty(Message::PROPERTY_MAX_OFFSET, UtilAll::toString(pullResult.maxOffset));
+
+            pullResultExt.msgFoundList.push_back(msg);
+        }
+
+        // Whatever is still in msgList was rejected by the tag filter.
+        it = msgList.begin();
+        for (; it != msgList.end(); ++it)
+        {
+            delete *it;
+        }
+
+        delete[] pullResultExt.messageBinary;
+        pullResultExt.messageBinary = NULL;
+        pullResultExt.messageBinaryLen = 0;
+    }
+
+    return &pullResult;
+}
+
+/**
+* Looks up which broker id to pull from for the given queue.
+*
+* @param mq queue to look up.
+* @return the id recorded by updatePullFromWhichNode for mq, or
+*         MixAll::MASTER_ID when nothing has been recorded.
+*/
+long PullAPIWrapper::recalculatePullFromWhichNode(MessageQueue& mq)
+{
+    long brokerId = MixAll::MASTER_ID;
+
+    kpr::ScopedRLock<kpr::RWMutex> lock(m_pullFromWhichNodeTableLock);
+    std::map<MessageQueue, kpr::AtomicInteger>::iterator entry = m_pullFromWhichNodeTable.find(mq);
+
+    if (entry != m_pullFromWhichNodeTable.end())
+    {
+        brokerId = entry->second.get();
+    }
+
+    return brokerId;
+}
+
+/**
+* Resolves the broker for mq, builds a pull request header and issues the
+* pull through the client API.
+*
+* If the broker address cannot be resolved, the topic route is refreshed
+* from the name server and the lookup is retried once. When the resolved
+* broker is a slave, the commit-offset flag is cleared from sysFlag before
+* sending.
+*
+* The request header is allocated with new and handed to pullMessage.
+* NOTE(review): presumably pullMessage takes ownership of it - confirm.
+*
+* @return whatever MQClientAPIImpl::pullMessage returns.
+* @throws MQClientException (via THROW_MQEXCEPTION) when no broker address
+*         is found even after the route refresh.
+*/
+PullResult* PullAPIWrapper::pullKernelImpl(MessageQueue& mq,
+        const std::string& subExpression,
+        long long subVersion,
+        long long offset,
+        int maxNums,
+        int sysFlag,
+        long long commitOffset,
+        long long brokerSuspendMaxTimeMillis,
+        int timeoutMillis,
+        CommunicationMode communicationMode,
+        PullCallback* pPullCallback)
+{
+    FindBrokerResult findBrokerResult =
+        m_pMQClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(),
+                recalculatePullFromWhichNode(mq), false);
+
+    if (findBrokerResult.brokerAddr.empty())
+    {
+        // Unknown broker: refresh the route table and look it up once more.
+        m_pMQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic());
+        findBrokerResult = m_pMQClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(),
+                           recalculatePullFromWhichNode(mq), false);
+    }
+
+    if (findBrokerResult.brokerAddr.empty())
+    {
+        THROW_MQEXCEPTION(MQClientException, "The broker[" + mq.getBrokerName() + "] not exist", -1);
+    }
+
+    const int sysFlagInner = findBrokerResult.slave
+                             ? PullSysFlag::clearCommitOffsetFlag(sysFlag)
+                             : sysFlag;
+
+    PullMessageRequestHeader* requestHeader = new PullMessageRequestHeader();
+    requestHeader->consumerGroup = m_consumerGroup;
+    requestHeader->topic = mq.getTopic();
+    requestHeader->queueId = mq.getQueueId();
+    requestHeader->queueOffset = offset;
+    requestHeader->maxMsgNums = maxNums;
+    requestHeader->sysFlag = sysFlagInner;
+    requestHeader->commitOffset = commitOffset;
+    requestHeader->suspendTimeoutMillis = brokerSuspendMaxTimeMillis;
+    requestHeader->subscription = subExpression;
+    requestHeader->subVersion = subVersion;
+
+    return m_pMQClientFactory->getMQClientAPIImpl()->pullMessage(
+               findBrokerResult.brokerAddr,
+               requestHeader,
+               timeoutMillis,
+               communicationMode,
+               pPullCallback);
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/PullAPIWrapper.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/PullAPIWrapper.h b/rocketmq-client4cpp/src/consumer/PullAPIWrapper.h
new file mode 100755
index 0000000..d5ec787
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/PullAPIWrapper.h
@@ -0,0 +1,67 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __PULLAPIWRAPPER_H__
+#define __PULLAPIWRAPPER_H__
+
+#include <string>
+#include <map>
+
+#include "AtomicValue.h"
+#include "PullResult.h"
+#include "MessageQueue.h"
+#include "CommunicationMode.h"
+#include "Mutex.h"
+
+namespace rmq
+{
+  class MQClientFactory;
+  class PullCallback;
+  class SubscriptionData;
+
+  /**
+  * Helper around the client API's pull call for one consumer group: it
+  * resolves the broker to pull from, remembers the broker suggested by the
+  * previous pull result per queue, and post-processes pull results.
+  */
+  class PullAPIWrapper
+  {
+  public:
+      PullAPIWrapper(MQClientFactory* pMQClientFactory, const std::string& consumerGroup);
+      // Remembers brokerId as the broker to pull from for mq.
+      void updatePullFromWhichNode(MessageQueue& mq, long brokerId);
+
+
+      // Decodes, tag-filters and annotates the messages of a FOUND result;
+      // returns the address of pullResult.
+      PullResult* processPullResult(MessageQueue& mq,
+                                    PullResult& pullResult,
+                                    SubscriptionData& subscriptionData);
+      // Returns the remembered broker id for mq, or MixAll::MASTER_ID if none.
+      long recalculatePullFromWhichNode(MessageQueue& mq);
+
+      // Builds the pull request header and sends it to the resolved broker;
+      // throws MQClientException when the broker cannot be found.
+      PullResult* pullKernelImpl(MessageQueue& mq,
+                                 const std::string& subExpression,
+                                 long long subVersion,
+                                 long long offset,
+                                 int maxNums,
+                                 int sysFlag,
+                                 long long commitOffset,
+                                 long long brokerSuspendMaxTimeMillis,
+                                 int timeoutMillis,
+                                 CommunicationMode communicationMode,
+                                 PullCallback* pPullCallback);
+
+  private:
+      // Per-queue broker id to use for the next pull.
+      std::map<MessageQueue, kpr::AtomicInteger> m_pullFromWhichNodeTable;
+      // Guards m_pullFromWhichNodeTable.
+      kpr::RWMutex m_pullFromWhichNodeTableLock;
+      // Not owned by this class (never deleted in the visible implementation).
+      MQClientFactory* m_pMQClientFactory;
+      std::string m_consumerGroup;
+  };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/PullMessageService.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/PullMessageService.cpp b/rocketmq-client4cpp/src/consumer/PullMessageService.cpp
new file mode 100755
index 0000000..6d9972e
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/PullMessageService.cpp
@@ -0,0 +1,171 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "PullMessageService.h"
+#include <list>
+#include "MQClientFactory.h"
+#include "MQConsumerInner.h"
+#include "PullRequest.h"
+#include "DefaultMQPushConsumerImpl.h"
+#include "ScopedLock.h"
+
+namespace rmq
+{
+
+/**
+* One-shot timer handler that hands a PullRequest back to the
+* PullMessageService when its timer fires and then destroys itself.
+*/
+class SubmitPullRequestLater : public kpr::TimerHandler
+{
+public:
+    SubmitPullRequestLater(PullMessageService* pService, PullRequest* pPullRequest)
+        : m_pService(pService),
+          m_pPullRequest(pPullRequest)
+    {
+    }
+
+    /**
+    * Timer callback: queues the stored pull request for immediate
+    * execution. Exceptions are logged and swallowed.
+    */
+    void OnTimeOut(unsigned int timerID)
+    {
+        try
+        {
+            m_pService->executePullRequestImmediately(m_pPullRequest);
+        }
+        catch (...)
+        {
+            RMQ_ERROR("SubmitPullRequestLater OnTimeOut exception");
+        }
+
+        // The handler is self-owning: it frees itself once it has fired, so
+        // no member may be touched after this line.
+        delete this;
+    }
+
+private:
+    PullMessageService* m_pService;
+    PullRequest* m_pPullRequest;
+};
+
+
+/**
+* Creates the service thread object and starts its companion timer thread,
+* which is used to run delayed pull requests and delayed tasks.
+*
+* @param pMQClientFactory factory used to look up consumers; not owned.
+*/
+PullMessageService::PullMessageService(MQClientFactory* pMQClientFactory)
+    : ServiceThread("PullMessageService"), m_pMQClientFactory(pMQClientFactory)
+{
+    // NOTE(review): the meaning of the second TimerThread argument (10) is
+    // not visible here - confirm against TimerThread.
+    m_TimerThread = new kpr::TimerThread("PullMessageService-timer", 10);
+
+    m_TimerThread->Start();
+}
+
+
+/**
+* Nothing to release explicitly; the timer thread is stopped and joined at
+* the end of Run().
+*/
+PullMessageService::~PullMessageService()
+{
+}
+
+
+/**
+* Schedules pPullRequest to be queued after a delay.
+*
+* A self-deleting SubmitPullRequestLater handler is registered on the timer
+* thread; when it fires it calls executePullRequestImmediately.
+*
+* @param pPullRequest request to re-submit.
+* @param timeDelay    delay passed to the timer thread
+*                     (NOTE(review): presumably milliseconds - confirm).
+*/
+void PullMessageService::executePullRequestLater(PullRequest* pPullRequest, long timeDelay)
+{
+    m_TimerThread->RegisterTimer(0,
+                                 timeDelay,
+                                 new SubmitPullRequestLater(this, pPullRequest),
+                                 false);
+}
+
+
+/**
+* Registers an arbitrary timer handler on the service's timer thread.
+*
+* @param pHandler  handler to invoke when the timer fires.
+* @param timeDelay delay passed to the timer thread.
+*/
+void PullMessageService::executeTaskLater(kpr::TimerHandler* pHandler, long timeDelay)
+{
+    m_TimerThread->RegisterTimer(0,
+                                 timeDelay,
+                                 pHandler,
+                                 false);
+}
+
+
+/**
+* Appends pPullRequest to the request queue and wakes the service thread.
+* Exceptions are logged and swallowed.
+*
+* @param pPullRequest request to enqueue.
+*/
+void PullMessageService::executePullRequestImmediately(PullRequest* pPullRequest)
+{
+    try
+    {
+        {
+            // Hold the queue lock only for the push; it is released before
+            // the service thread is woken.
+            kpr::ScopedLock<kpr::Mutex> guard(m_lock);
+            m_pullRequestQueue.push_back(pPullRequest);
+        }
+
+        wakeup();
+    }
+    catch (...)
+    {
+        RMQ_ERROR("executePullRequestImmediately pullRequestQueue.push");
+    }
+}
+
+/**
+* Service loop: until m_stoped is set, takes pull requests from the front
+* of the queue one at a time and executes them.
+*
+* When the queue is empty at the start of an iteration the thread waits via
+* waitForRunning(5000) before checking the queue again. Exceptions raised
+* inside an iteration are logged and the loop continues. On exit the timer
+* thread is stopped and joined.
+*/
+void PullMessageService::Run()
+{
+    RMQ_INFO("%s service started", getServiceName().c_str());
+
+    while (!m_stoped)
+    {
+        try
+        {
+            bool queueEmpty = false;
+            {
+                kpr::ScopedLock<kpr::Mutex> guard(m_lock);
+                queueEmpty = m_pullRequestQueue.empty();
+            }
+
+            if (queueEmpty)
+            {
+                waitForRunning(5000);
+            }
+
+            PullRequest* next = NULL;
+            {
+                kpr::ScopedLock<kpr::Mutex> guard(m_lock);
+                if (!m_pullRequestQueue.empty())
+                {
+                    next = m_pullRequestQueue.front();
+                    m_pullRequestQueue.pop_front();
+                }
+            }
+
+            // The request is executed outside the queue lock.
+            if (next != NULL)
+            {
+                pullMessage(next);
+            }
+        }
+        catch (...)
+        {
+            RMQ_ERROR("Pull Message Service Run Method exception");
+        }
+    }
+
+    m_TimerThread->Stop();
+    m_TimerThread->Join();
+
+    RMQ_INFO("%s service end", getServiceName().c_str());
+}
+
+/**
+* Returns the fixed name of this service.
+*/
+std::string PullMessageService::getServiceName()
+{
+    return std::string("PullMessageService");
+}
+
+
+/**
+* Dispatches one pull request to the consumer registered for its group.
+*
+* The consumer returned by the factory is treated as a
+* DefaultMQPushConsumerImpl. When no consumer is registered for the group
+* the request is only logged and dropped.
+*
+* @param pPullRequest request to execute.
+*/
+void PullMessageService::pullMessage(PullRequest* pPullRequest)
+{
+    MQConsumerInner* consumer = m_pMQClientFactory->selectConsumer(pPullRequest->getConsumerGroup());
+
+    if (consumer == NULL)
+    {
+        RMQ_WARN("No matched consumer for the PullRequest {%s}, drop it", pPullRequest->toString().c_str());
+        return;
+    }
+
+    DefaultMQPushConsumerImpl* impl = (DefaultMQPushConsumerImpl*) consumer;
+    impl->pullMessage(pPullRequest);
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/PullMessageService.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/PullMessageService.h b/rocketmq-client4cpp/src/consumer/PullMessageService.h
new file mode 100755
index 0000000..d6ebcee
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/PullMessageService.h
@@ -0,0 +1,56 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __PULLMESSAGESERVICE_H__
+#define __PULLMESSAGESERVICE_H__
+
+#include <list>
+#include "RocketMQClient.h"
+#include "ServiceThread.h"
+#include "TimerThread.h"
+#include "PullRequest.h"
+
+namespace rmq
+{
+    class MQClientFactory;
+    class MQConsumerInner;
+    class PullRequest;
+
+    /**
+    * Service thread that executes queued pull requests one at a time and
+    * owns a timer thread for delayed requests and tasks.
+    */
+    class PullMessageService : public ServiceThread
+    {
+    public:
+        PullMessageService(MQClientFactory* pMQClientFactory);
+        ~PullMessageService();
+
+        // Queues pPullRequest after timeDelay, via a timer handler.
+        void executePullRequestLater(PullRequest* pPullRequest, long timeDelay);
+        // Registers an arbitrary handler on the timer thread.
+		void executeTaskLater(kpr::TimerHandler* pHandler, long timeDelay);
+
+        // Queues pPullRequest now and wakes the service thread.
+        void executePullRequestImmediately(PullRequest* pPullRequest);
+        std::string getServiceName();
+
+        // Thread body: drains the request queue until the service is stopped.
+        void Run();
+    private:
+        // Forwards one request to the consumer registered for its group.
+        void pullMessage(PullRequest* pPullRequest);
+
+    private:
+        // Pending pull requests, consumed from the front by Run().
+        std::list<PullRequest*> m_pullRequestQueue;
+        // Guards m_pullRequestQueue.
+        kpr::Mutex m_lock;
+        MQClientFactory* m_pMQClientFactory;
+        kpr::TimerThreadPtr m_TimerThread;
+    };
+	typedef kpr::RefHandleT<PullMessageService> PullMessageServicePtr;
+}
+
+#endif



Mime
View raw message