rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dongefore...@apache.org
Subject [12/17] incubator-rocketmq-externals git commit: [ROCKETMQ-129] Initialized the rocketmq c++ client closes apache/incubator-rocketmq-externals#11
Date Fri, 21 Apr 2017 10:09:52 GMT
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/ConsumeMessageOrderlyService.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/ConsumeMessageOrderlyService.cpp b/rocketmq-client4cpp/src/consumer/ConsumeMessageOrderlyService.cpp
new file mode 100755
index 0000000..c7d9695
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/ConsumeMessageOrderlyService.cpp
@@ -0,0 +1,574 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include <list>
+#include <string>
+
+#include "ConsumeMessageOrderlyService.h"
+#include "DefaultMQPushConsumerImpl.h"
+#include "MQClientFactory.h"
+#include "DefaultMQProducer.h"
+#include "MessageListener.h"
+#include "MessageQueue.h"
+#include "RebalanceImpl.h"
+#include "DefaultMQPushConsumer.h"
+#include "OffsetStore.h"
+#include "ScopedLock.h"
+#include "KPRUtil.h"
+#include "MixAll.h"
+#include "UtilAll.h"
+
+namespace rmq
+{
+
+/**
+ * Timer handler that forwards every expiry to
+ * ConsumeMessageOrderlyService::lockMQPeriodically().
+ */
+class LockMq : public kpr::TimerHandler
+{
+public:
+    LockMq(ConsumeMessageOrderlyService* pService)
+        : m_pService(pService)
+    {
+    }
+
+    void OnTimeOut(unsigned int timerID)
+    {
+        // Unlike the one-shot handlers in this file, this object must not
+        // delete itself after firing (original note: "can not delete").
+        m_pService->lockMQPeriodically();
+    }
+
+private:
+    ConsumeMessageOrderlyService* m_pService;  // service to call back; not owned
+};
+
+/**
+ * Self-deleting timer handler: when it fires it submits a consume request
+ * for the stored queue (with an empty message list) and then destroys itself.
+ */
+class SubmitConsumeRequestLaterOrderly : public kpr::TimerHandler
+{
+public:
+    SubmitConsumeRequestLaterOrderly(ProcessQueue* pProcessQueue,
+                                     const MessageQueue& messageQueue,
+                                     ConsumeMessageOrderlyService* pService)
+        : m_pProcessQueue(pProcessQueue),
+          m_messageQueue(messageQueue),
+          m_pService(pService)
+    {
+    }
+
+    void OnTimeOut(unsigned int timerID)
+    {
+        try
+        {
+            // submitConsumeRequest() in this file ignores the list contents,
+            // so an empty list is enough to trigger a new consume request.
+            std::list<MessageExt*> noMessages;
+            m_pService->submitConsumeRequest(noMessages, m_pProcessQueue, m_messageQueue, true);
+        }
+        catch (...)
+        {
+            RMQ_ERROR("SubmitConsumeRequestLaterOrderly OnTimeOut exception");
+        }
+
+        // The handler was allocated with new by the scheduler of this request
+        // and frees itself once it has fired.
+        delete this;
+    }
+
+private:
+    ProcessQueue* m_pProcessQueue;             // not owned
+    MessageQueue m_messageQueue;               // private copy of the queue identity
+    ConsumeMessageOrderlyService* m_pService;  // not owned
+};
+
+
+/**
+ * Self-deleting timer handler: when it fires it tries to lock the stored
+ * queue through the service and then schedules a delayed consume request,
+ * with a short delay on success and a longer one on failure.
+ */
+class TryLockLaterAndReconsume : public kpr::TimerHandler
+{
+public:
+    TryLockLaterAndReconsume(ProcessQueue* pProcessQueue,
+                             MessageQueue& messageQueue,
+                             ConsumeMessageOrderlyService* pService)
+        : m_pProcessQueue(pProcessQueue),
+          m_messageQueue(messageQueue),
+          m_pService(pService)
+    {
+    }
+
+    void OnTimeOut(unsigned int timerID)
+    {
+        try
+        {
+            // 10 ms when the lock was obtained, 3000 ms otherwise.
+            const bool lockOK = m_pService->lockOneMQ(m_messageQueue);
+            const long long delayMillis = lockOK ? 10 : 3000;
+            m_pService->submitConsumeRequestLater(m_pProcessQueue, m_messageQueue, delayMillis);
+        }
+        catch (...)
+        {
+            RMQ_ERROR("TryLockLaterAndReconsume OnTimeOut exception");
+        }
+
+        // One-shot handler: it frees itself after firing.
+        delete this;
+    }
+
+private:
+    ProcessQueue* m_pProcessQueue;             // not owned
+    MessageQueue m_messageQueue;               // private copy of the queue identity
+    ConsumeMessageOrderlyService* m_pService;  // not owned
+};
+
+
+
+/**
+ * Builds the orderly consume service for a push consumer.
+ *
+ * Caches the public consumer object and its group name, then creates the
+ * worker thread pool (sized from the consumer's min/max consume thread
+ * settings) and the timer thread used for delayed work.
+ *
+ * @param pDefaultMQPushConsumerImpl owning consumer implementation (not owned here)
+ * @param pMessageListener           user listener invoked for each batch (not owned here)
+ */
+ConsumeMessageOrderlyService::ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl* pDefaultMQPushConsumerImpl,
+        MessageListenerOrderly* pMessageListener)
+{
+    m_stoped = false;
+    m_pDefaultMQPushConsumerImpl = pDefaultMQPushConsumerImpl;
+    m_pMessageListener = pMessageListener;
+    m_pDefaultMQPushConsumer = m_pDefaultMQPushConsumerImpl->getDefaultMQPushConsumer();
+    m_consumerGroup = m_pDefaultMQPushConsumer->getConsumerGroup();
+    m_pConsumeExecutor = new kpr::ThreadPool("ConsumeMessageThreadPool", 1,
+            m_pDefaultMQPushConsumer->getConsumeThreadMin(),
+            m_pDefaultMQPushConsumer->getConsumeThreadMax());
+    // Name the timer thread after this service; it previously carried the
+    // concurrent service's name, which made thread diagnostics misleading.
+    m_scheduledExecutorService = new kpr::TimerThread("ConsumeMessageOrderlyService", 10);
+}
+
+/** Destructor; performs no explicit cleanup. */
+ConsumeMessageOrderlyService::~ConsumeMessageOrderlyService()
+{
+    // Nothing to do here: no member is released explicitly.
+}
+
+
+/**
+ * Starts the timer thread and registers a LockMq handler with
+ * ProcessQueue::s_RebalanceLockInterval as its interval.
+ */
+void ConsumeMessageOrderlyService::start()
+{
+    m_scheduledExecutorService->Start();
+
+    // NOTE(review): the trailing 'true' presumably marks the timer as
+    // repeating (the one-shot handlers in this file pass 'false') - confirm
+    // against kpr::TimerThread. The LockMq object is never freed in this file.
+    m_scheduledExecutorService->RegisterTimer(0, ProcessQueue::s_RebalanceLockInterval,
+                                              new LockMq(this), true);
+}
+
+/**
+ * Stops the service: raises the stop flag, destroys the consume thread pool,
+ * stops and joins the timer thread, and finally releases all queue locks.
+ */
+void ConsumeMessageOrderlyService::shutdown()
+{
+    // Raise the flag first so lockMQPeriodically()/lockOneMQ() become no-ops.
+    m_stoped = true;
+    m_pConsumeExecutor->Destroy();
+    m_scheduledExecutorService->Stop();
+    m_scheduledExecutorService->Join();
+    // Unlock only after both the workers and the timer thread are down.
+    unlockAllMQ();
+}
+
+/** Asks the consumer's rebalance component to unlock every queue (unlockAll(false)). */
+void ConsumeMessageOrderlyService::unlockAllMQ()
+{
+    m_pDefaultMQPushConsumerImpl
+        ->getRebalanceImpl()
+        ->unlockAll(false);
+}
+
+/** Timer entry point: re-locks all queues unless the service has been stopped. */
+void ConsumeMessageOrderlyService::lockMQPeriodically()
+{
+    if (m_stoped)
+    {
+        return;
+    }
+
+    m_pDefaultMQPushConsumerImpl->getRebalanceImpl()->lockAll();
+}
+
+/**
+ * Tries to lock a single queue through the rebalance component.
+ *
+ * @param mq queue to lock
+ * @return the result of RebalanceImpl::lock(), or false when the service is stopped
+ */
+bool ConsumeMessageOrderlyService::lockOneMQ(MessageQueue& mq)
+{
+    if (m_stoped)
+    {
+        return false;
+    }
+
+    return m_pDefaultMQPushConsumerImpl->getRebalanceImpl()->lock(mq);
+}
+
+/**
+ * Schedules a one-shot TryLockLaterAndReconsume handler for the given queue.
+ *
+ * @param messageQueue  queue to lock and re-consume
+ * @param pProcessQueue process queue paired with messageQueue
+ * @param delayMills    timer delay; narrowed to int for RegisterTimer
+ */
+void ConsumeMessageOrderlyService::tryLockLaterAndReconsume(MessageQueue& messageQueue,
+        ProcessQueue* pProcessQueue,
+        long long delayMills)
+{
+    // The handler deletes itself after it fires.
+    TryLockLaterAndReconsume* handler =
+        new TryLockLaterAndReconsume(pProcessQueue, messageQueue, this);
+    m_scheduledExecutorService->RegisterTimer(0, static_cast<int>(delayMills), handler, false);
+}
+
+/** Returns the statistics object held by the consumer's stat manager. */
+ConsumerStat& ConsumeMessageOrderlyService::getConsumerStat()
+{
+    return m_pDefaultMQPushConsumerImpl
+           ->getConsumerStatManager()
+           ->getConsumertat();
+}
+
+/**
+ * Schedules a one-shot consume request for the queue after a delay.
+ *
+ * @param pProcessQueue     process queue to consume from
+ * @param messageQueue      queue identity passed to the request
+ * @param suspendTimeMillis requested delay; clamped to the range [10, 30000]
+ */
+void ConsumeMessageOrderlyService::submitConsumeRequestLater(ProcessQueue* pProcessQueue,
+        const MessageQueue& messageQueue,
+        long long suspendTimeMillis)
+{
+    const long requested = long(suspendTimeMillis);
+    const long delayMillis = (requested < 10) ? 10
+                             : ((requested > 30000) ? 30000 : requested);
+
+    // The handler deletes itself after it fires.
+    SubmitConsumeRequestLaterOrderly* handler =
+        new SubmitConsumeRequestLaterOrderly(pProcessQueue, messageQueue, this);
+    m_scheduledExecutorService->RegisterTimer(0, delayMillis, handler, false);
+}
+
+/**
+ * Queues a ConsumeOrderlyRequest for the given queue on the consume thread pool.
+ *
+ * @param msgs             unused by this implementation
+ * @param pProcessQueue    process queue the request will drain
+ * @param messageQueue     queue identity
+ * @param dispathToConsume when false, nothing is submitted
+ */
+void ConsumeMessageOrderlyService::submitConsumeRequest(std::list<MessageExt*>& msgs,
+        ProcessQueue* pProcessQueue,
+        const MessageQueue& messageQueue,
+        bool dispathToConsume)
+{
+    if (!dispathToConsume)
+    {
+        return;
+    }
+
+    kpr::ThreadPoolWorkPtr work = new ConsumeOrderlyRequest(pProcessQueue, messageQueue, this);
+    m_pConsumeExecutor->AddWork(work);
+}
+
+/** Required by ConsumeMessageService; this implementation ignores the request. */
+void ConsumeMessageOrderlyService::updateCorePoolSize(int corePoolSize)
+{
+    // Intentionally empty.
+}
+
+
+/** Returns the consumer group name cached by the constructor. */
+std::string& ConsumeMessageOrderlyService::getConsumerGroup()
+{
+    return this->m_consumerGroup;
+}
+
+/** Returns the orderly listener supplied at construction. */
+MessageListenerOrderly* ConsumeMessageOrderlyService::getMessageListener()
+{
+    return this->m_pMessageListener;
+}
+
+/** Returns the consumer implementation supplied at construction. */
+DefaultMQPushConsumerImpl* ConsumeMessageOrderlyService::getDefaultMQPushConsumerImpl()
+{
+    return this->m_pDefaultMQPushConsumerImpl;
+}
+
+/**
+ * Applies the listener's verdict for one batch: updates statistics, commits
+ * or rolls back the process queue, schedules a retry where required, and
+ * stores the committed offset.
+ *
+ * @param msgs           batch that was handed to the listener
+ * @param status         status returned by the listener
+ * @param context        consume context (autoCommit flag, suspend time)
+ * @param consumeRequest request that owns the process/message queue pair
+ * @return true when the caller should keep consuming from this queue,
+ *         false when a delayed retry has been scheduled instead
+ */
+bool ConsumeMessageOrderlyService::processConsumeResult(std::list<MessageExt*>& msgs,
+        ConsumeOrderlyStatus status,
+        ConsumeOrderlyContext& context,
+        ConsumeOrderlyRequest& consumeRequest)
+{
+    ProcessQueue* pProcessQueue = consumeRequest.getProcessQueue();
+    MessageQueue& messageQueue = consumeRequest.getMessageQueue();
+
+    bool keepConsuming = true;
+    long long offsetToCommit = -1L;
+    int batchSize = msgs.size();
+
+    if (!context.autoCommit)
+    {
+        // Manual-commit mode: COMMIT and ROLLBACK are honoured as requested.
+        switch (status)
+        {
+            case SUCCESS:
+                getConsumerStat().consumeMsgOKTotal.fetchAndAdd(batchSize);
+                break;
+            case COMMIT:
+                offsetToCommit = pProcessQueue->commit();
+                break;
+            case ROLLBACK:
+                pProcessQueue->rollback();
+                submitConsumeRequestLater(pProcessQueue, messageQueue,
+                                          context.suspendCurrentQueueTimeMillis);
+                keepConsuming = false;
+                break;
+            case SUSPEND_CURRENT_QUEUE_A_MOMENT:
+                getConsumerStat().consumeMsgFailedTotal.fetchAndAdd(batchSize);
+                if (checkReconsumeTimes(msgs))
+                {
+                    pProcessQueue->makeMessageToCosumeAgain(msgs);
+                    submitConsumeRequestLater(pProcessQueue, messageQueue,
+                                              context.suspendCurrentQueueTimeMillis);
+                    keepConsuming = false;
+                }
+                break;
+            default:
+                break;
+        }
+    }
+    else
+    {
+        // Auto-commit mode: COMMIT/ROLLBACK are not valid answers here; they
+        // are logged and then treated exactly like SUCCESS.
+        switch (status)
+        {
+            case COMMIT:
+            case ROLLBACK:
+                RMQ_WARN("the message queue consume result is illegal, we think you want to ack these message: %s",
+                         messageQueue.toString().c_str());
+                // fall through
+            case SUCCESS:
+                getConsumerStat().consumeMsgOKTotal.fetchAndAdd(batchSize);
+                offsetToCommit = pProcessQueue->commit();
+                break;
+            case SUSPEND_CURRENT_QUEUE_A_MOMENT:
+                getConsumerStat().consumeMsgFailedTotal.fetchAndAdd(batchSize);
+                if (checkReconsumeTimes(msgs))
+                {
+                    pProcessQueue->makeMessageToCosumeAgain(msgs);
+                    submitConsumeRequestLater(pProcessQueue, messageQueue,
+                                              context.suspendCurrentQueueTimeMillis);
+                    keepConsuming = false;
+                }
+                else
+                {
+                    // No message needs another local attempt, so the batch
+                    // is committed.
+                    offsetToCommit = pProcessQueue->commit();
+                }
+                break;
+            default:
+                break;
+        }
+    }
+
+    // Store the offset only when something was committed and the queue has
+    // not been dropped in the meantime.
+    if (offsetToCommit >= 0 && !pProcessQueue->isDropped())
+    {
+        m_pDefaultMQPushConsumerImpl->getOffsetStore()->updateOffset(messageQueue,
+                offsetToCommit, false);
+    }
+
+    return keepConsuming;
+}
+
+/**
+ * Decides whether a failed batch must be retried locally.
+ *
+ * A message that has reached the consumer's maximum reconsume count is
+ * passed to sendMessageBack(); if that succeeds it needs no local retry.
+ * Every other message gets its reconsume counter incremented and forces a
+ * retry.
+ *
+ * @param msgs failed batch; reconsume counters and properties may be modified
+ * @return true when at least one message still has to be consumed again
+ */
+bool ConsumeMessageOrderlyService::checkReconsumeTimes(std::list<MessageExt*>& msgs)
+{
+    bool needRetry = false;
+
+    for (std::list<MessageExt*>::iterator cur = msgs.begin(); cur != msgs.end(); ++cur)
+    {
+        MessageExt* pMsg = *cur;
+
+        if (pMsg->getReconsumeTimes() >= m_pDefaultMQPushConsumer->getMaxReconsumeTimes())
+        {
+            pMsg->putProperty(Message::PROPERTY_RECONSUME_TIME, UtilAll::toString(pMsg->getReconsumeTimes()));
+
+            if (sendMessageBack(*pMsg))
+            {
+                // Handed off successfully; this message needs no local retry.
+                continue;
+            }
+        }
+
+        // Either the limit is not reached yet or the hand-off failed.
+        needRetry = true;
+        pMsg->setReconsumeTimes(pMsg->getReconsumeTimes() + 1);
+    }
+
+    return needRetry;
+}
+
+/**
+ * Re-publishes a message to this consumer group's retry topic.
+ *
+ * The new message carries the original body, flag and properties, plus the
+ * origin message id, the original topic, the next reconsume count, the
+ * maximum reconsume count and a delay level of 3 + (reconsume count + 1).
+ *
+ * @param msg message to re-publish
+ * @return true when the send completed, false when any exception was thrown
+ *         (the exception is logged and swallowed)
+ */
+bool ConsumeMessageOrderlyService::sendMessageBack(MessageExt& msg)
+{
+    try
+    {
+        Message newMsg(MixAll::getRetryTopic(m_pDefaultMQPushConsumer->getConsumerGroup()),
+                       msg.getBody(), msg.getBodyLen());
+
+        newMsg.setFlag(msg.getFlag());
+
+        // Copy the source properties first. Every property written below must
+        // come after this bulk copy, otherwise the copy could discard it
+        // (previously the origin message id was written before the copy).
+        newMsg.setProperties(msg.getProperties());
+
+        std::string originMsgId = msg.getProperty(Message::PROPERTY_ORIGIN_MESSAGE_ID);
+        newMsg.putProperty(Message::PROPERTY_ORIGIN_MESSAGE_ID, UtilAll::isBlank(originMsgId) ? msg.getMsgId()
+                    : originMsgId);
+
+        newMsg.putProperty(Message::PROPERTY_RETRY_TOPIC, msg.getTopic());
+
+        int reTimes = msg.getReconsumeTimes() + 1;
+        newMsg.putProperty(Message::PROPERTY_RECONSUME_TIME, UtilAll::toString(reTimes));
+        newMsg.putProperty(Message::PROPERTY_MAX_RECONSUME_TIMES, UtilAll::toString(m_pDefaultMQPushConsumer->getMaxReconsumeTimes()));
+        newMsg.setDelayTimeLevel(3 + reTimes);
+
+        m_pDefaultMQPushConsumerImpl->getmQClientFactory()->getDefaultMQProducer()->send(newMsg);
+
+        return true;
+    }
+    catch (...)
+    {
+        RMQ_ERROR("sendMessageBack exception, group: %s, msg: %s",
+                  m_consumerGroup.c_str(), msg.toString().c_str());
+    }
+
+    return false;
+}
+
+
+/** Returns the per-queue lock table owned by this service. */
+MessageQueueLock& ConsumeMessageOrderlyService::getMessageQueueLock()
+{
+    return this->m_messageQueueLock;
+}
+
+/** Returns the public consumer object cached by the constructor. */
+DefaultMQPushConsumer* ConsumeMessageOrderlyService::getDefaultMQPushConsumer()
+{
+    return this->m_pDefaultMQPushConsumer;
+}
+
+/**
+ * Captures the queue pair and the owning service for a later Do() call.
+ *
+ * @param pProcessQueue process queue to drain (not owned)
+ * @param messageQueue  queue identity; copied into the request
+ * @param pService      service that created the request (not owned)
+ */
+ConsumeOrderlyRequest::ConsumeOrderlyRequest(ProcessQueue* pProcessQueue,
+        const MessageQueue& messageQueue,
+        ConsumeMessageOrderlyService* pService)
+    : m_pProcessQueue(pProcessQueue),
+      m_messageQueue(messageQueue),
+      m_pService(pService)
+{
+}
+
+/** Destructor; the request owns none of the objects it points to. */
+ConsumeOrderlyRequest::~ConsumeOrderlyRequest()
+{
+    // Nothing to release.
+}
+
+/**
+ * Thread-pool entry point: drains the process queue in order while holding
+ * the per-queue mutex.
+ *
+ * Consumption only starts when the consumer is in BROADCASTING mode or the
+ * process queue lock is both held and not expired; otherwise a delayed
+ * lock-and-retry is scheduled. Inside the loop the request stops as soon as
+ * the queue is dropped, the lock is lost or expired, the continuous-consume
+ * time budget is exceeded, the queue is empty, or processConsumeResult()
+ * asks to stop. All exceptions are caught and logged.
+ */
+void ConsumeOrderlyRequest::Do()
+{
+    if (m_pProcessQueue->isDropped())
+    {
+        RMQ_WARN("run, the message queue not be able to consume, because it's dropped, MQ: %s",
+                 m_messageQueue.toString().c_str());
+        return;
+    }
+
+    try
+    {
+        kpr::Mutex* objLock = m_pService->getMessageQueueLock().fetchLockObject(m_messageQueue);
+        {
+            // Serialises all consume requests that target the same queue.
+            kpr::ScopedLock<kpr::Mutex> lock(*objLock);
+
+            MessageModel messageModel = m_pService->getDefaultMQPushConsumerImpl()->messageModel();
+
+            // The lock must be held AND still valid. The previous '||' let
+            // half-valid states (locked but expired, or unexpired but not
+            // locked) into the loop, where they retried on the 10 ms path
+            // instead of the 100 ms back-off below.
+            if (BROADCASTING == messageModel
+                || (m_pProcessQueue->isLocked() && !m_pProcessQueue->isLockExpired()))
+            {
+                long long beginTime = KPRUtil::GetCurrentTimeMillis();
+                for (bool continueConsume = true; continueConsume;)
+                {
+                    if (m_pProcessQueue->isDropped())
+                    {
+                        RMQ_INFO("the message queue not be able to consume, because it's droped, MQ: %s",
+                                 m_messageQueue.toString().c_str());
+                        break;
+                    }
+
+                    // The lock state can change between iterations, so it is
+                    // re-checked before every batch.
+                    if (CLUSTERING == messageModel
+                        && !m_pProcessQueue->isLocked())
+                    {
+                        RMQ_WARN("the message queue not locked, so consume later, MQ: %s", m_messageQueue.toString().c_str());
+                        m_pService->tryLockLaterAndReconsume(m_messageQueue, m_pProcessQueue, 10);
+                        break;
+                    }
+
+                    if (CLUSTERING == messageModel
+                        && m_pProcessQueue->isLockExpired())
+                    {
+                        RMQ_WARN("the message queue lock expired, so consume later, MQ: %s", m_messageQueue.toString().c_str());
+                        m_pService->tryLockLaterAndReconsume(m_messageQueue, m_pProcessQueue, 10);
+                        break;
+                    }
+
+                    // Yield the worker after the continuous-consume budget and
+                    // reschedule this queue shortly afterwards.
+                    long interval = long(KPRUtil::GetCurrentTimeMillis() - beginTime);
+                    if (interval > ConsumeMessageOrderlyService::s_MaxTimeConsumeContinuously)
+                    {
+                        m_pService->submitConsumeRequestLater(m_pProcessQueue, m_messageQueue, 10);
+                        break;
+                    }
+
+                    int consumeBatchSize =
+                        m_pService->getDefaultMQPushConsumer()->getConsumeMessageBatchMaxSize();
+
+                    std::list<MessageExt*> msgs = m_pProcessQueue->takeMessages(consumeBatchSize);
+                    if (!msgs.empty())
+                    {
+                        ConsumeOrderlyContext context(m_messageQueue);
+
+                        // Default verdict when the listener throws.
+                        ConsumeOrderlyStatus status = SUSPEND_CURRENT_QUEUE_A_MOMENT;
+
+                        ConsumeMessageContext consumeMessageContext;
+                        if (m_pService->getDefaultMQPushConsumerImpl()->hasHook())
+                        {
+                            consumeMessageContext.consumerGroup = m_pService->getConsumerGroup();
+                            consumeMessageContext.mq = m_messageQueue;
+                            consumeMessageContext.msgList = msgs;
+                            consumeMessageContext.success = false;
+                            m_pService->getDefaultMQPushConsumerImpl()->executeHookBefore(consumeMessageContext);
+                        }
+
+                        long long beginTimestamp = KPRUtil::GetCurrentTimeMillis();
+                        try
+                        {
+                            kpr::ScopedLock<kpr::Mutex> lock(m_pProcessQueue->getLockConsume());
+                            if (m_pProcessQueue->isDropped())
+                            {
+                                RMQ_WARN("consumeMessage, the message queue not be able to consume, because it's dropped, MQ: %s",
+                                         m_messageQueue.toString().c_str());
+                                // Leaves the for loop; the batch just taken is
+                                // not processed any further here.
+                                break;
+                            }
+
+                            status = m_pService->getMessageListener()->consumeMessage(msgs, context);
+                        }
+                        catch (...)
+                        {
+                            RMQ_WARN("consumeMessage exception, Group: {%s}, Msgs: {%u}, MQ: %s",//
+                                     m_pService->getConsumerGroup().c_str(),
+                                     (unsigned)msgs.size(),
+                                     m_messageQueue.toString().c_str());
+                        }
+
+                        long long consumeRT = KPRUtil::GetCurrentTimeMillis() - beginTimestamp;
+
+                        if (SUSPEND_CURRENT_QUEUE_A_MOMENT == status
+                            || ROLLBACK == status)
+                        {
+                            RMQ_WARN("consumeMessage Orderly return not OK, Group: {%s} Msgs: {%u} MQ: %s",//
+                                     m_pService->getConsumerGroup().c_str(),
+                                     (unsigned)msgs.size(),
+                                     m_messageQueue.toString().c_str());
+                        }
+
+                        if (m_pService->getDefaultMQPushConsumerImpl()->hasHook())
+                        {
+                            consumeMessageContext.success = (SUCCESS == status
+                                                             || COMMIT == status);
+                            m_pService->getDefaultMQPushConsumerImpl()->executeHookAfter(consumeMessageContext);
+                        }
+
+                        m_pService->getConsumerStat().consumeMsgRTTotal.fetchAndAdd(consumeRT);
+                        MixAll::compareAndIncreaseOnly(m_pService->getConsumerStat()
+                                                       .consumeMsgRTMax, consumeRT);
+
+                        continueConsume = m_pService->processConsumeResult(msgs, status, context, *this);
+                    }
+                    else
+                    {
+                        // Nothing left to consume in this queue.
+                        continueConsume = false;
+                    }
+                }
+            }
+            else
+            {
+                if (m_pProcessQueue->isDropped())
+                {
+                    RMQ_WARN("consumeMessage, the message queue not be able to consume, because it's dropped, MQ: %s",
+                             m_messageQueue.toString().c_str());
+                    return;
+                }
+
+                // Lock missing or expired: try to re-acquire it and consume later.
+                m_pService->tryLockLaterAndReconsume(m_messageQueue, m_pProcessQueue, 100);
+            }
+        }
+    }
+    catch (...)
+    {
+        RMQ_WARN("ConsumeOrderlyRequest exception");
+    }
+
+    return;
+}
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/ConsumeMessageOrderlyService.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/ConsumeMessageOrderlyService.h b/rocketmq-client4cpp/src/consumer/ConsumeMessageOrderlyService.h
new file mode 100755
index 0000000..0f8628b
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/ConsumeMessageOrderlyService.h
@@ -0,0 +1,122 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __CONSUMEMESSAGEORDERLYSERVICE_H__
+#define __CONSUMEMESSAGEORDERLYSERVICE_H__
+
+#include "ConsumeMessageService.h"
+
+#include <list>
+#include <string>
+#include "RocketMQClient.h"
+#include "ConsumerStatManage.h"
+#include "MessageQueueLock.h"
+#include "MessageListener.h"
+#include "ThreadPool.h"
+#include "TimerThread.h"
+
+namespace rmq
+{
+class DefaultMQPushConsumerImpl;
+class MessageListenerOrderly;
+class DefaultMQPushConsumer;
+class ConsumeMessageOrderlyService;
+
+/**
+ * Unit of work submitted to the consume thread pool: drains one process
+ * queue in order on behalf of a ConsumeMessageOrderlyService.
+ */
+class ConsumeOrderlyRequest: public kpr::ThreadPoolWork
+{
+public:
+    /// Stores the process queue, a copy of the message queue and the owning service.
+    ConsumeOrderlyRequest(ProcessQueue *pProcessQueue,
+                          const MessageQueue &messageQueue,
+                          ConsumeMessageOrderlyService *pService);
+    ~ConsumeOrderlyRequest();
+
+    /// Thread-pool entry point; performs the ordered consumption.
+    virtual void Do();
+
+    /// Process queue this request drains (not owned).
+    ProcessQueue *getProcessQueue()
+    {
+        return m_pProcessQueue;
+    }
+
+    /// Message queue identity held by this request.
+    MessageQueue &getMessageQueue()
+    {
+        return m_messageQueue;
+    }
+
+private:
+    ProcessQueue *m_pProcessQueue;            // not owned
+    MessageQueue m_messageQueue;              // copied at construction
+    ConsumeMessageOrderlyService *m_pService; // not owned
+};
+
+
+/**
+ * ConsumeMessageService implementation that delivers messages of a queue to
+ * a MessageListenerOrderly one batch at a time, using a worker thread pool
+ * for consumption and a timer thread for periodic locking and delayed retries.
+ */
+class ConsumeMessageOrderlyService : public ConsumeMessageService
+{
+public:
+	// Upper bound (compared against a millisecond clock) on how long one
+	// request keeps consuming before it reschedules itself.
+	static const long s_MaxTimeConsumeContinuously = 60000;
+
+public:
+    ConsumeMessageOrderlyService(DefaultMQPushConsumerImpl
+                                 *pDefaultMQPushConsumerImpl,
+                                 MessageListenerOrderly *pMessageListener);
+    ~ConsumeMessageOrderlyService();
+
+    /// Starts the timer thread and the periodic lock timer.
+    void start();
+    /// Stops the workers and the timer thread, then unlocks all queues.
+    void shutdown();
+
+    /// Unlocks every queue through the rebalance component.
+    void unlockAllMQ();
+    /// Re-locks all queues unless the service is stopped.
+    void lockMQPeriodically();
+    /// Locks one queue; returns false when stopped or when locking fails.
+    bool lockOneMQ(MessageQueue &mq);
+    /// Schedules a one-shot lock attempt followed by a consume request.
+    void tryLockLaterAndReconsume(MessageQueue &messageQueue,
+                                  ProcessQueue *pProcessQueue,
+                                  long long delayMills);
+    /// Applies the listener's status; returns whether consumption should continue.
+    bool processConsumeResult(std::list<MessageExt *> &msgs,
+                              ConsumeOrderlyStatus status,
+                              ConsumeOrderlyContext &context,
+                              ConsumeOrderlyRequest &consumeRequest);
+    /// Returns true when at least one message still needs a local retry.
+    bool checkReconsumeTimes(std::list<MessageExt *> &msgs);
+    /// Re-publishes a message to the group's retry topic; false on any exception.
+    bool sendMessageBack(MessageExt &msg);
+    ConsumerStat& getConsumerStat();
+
+    /// Schedules a one-shot consume request; the delay is clamped to [10, 30000].
+    void submitConsumeRequestLater(ProcessQueue *pProcessQueue,
+                                   const MessageQueue &messageQueue,
+                                   long long suspendTimeMillis);
+
+    /// Queues a consume request on the thread pool when dispathToConsume is true.
+    void submitConsumeRequest(std::list<MessageExt *> &msgs,
+                              ProcessQueue *pProcessQueue,
+                              const MessageQueue &messageQueue,
+                              bool dispathToConsume);
+
+    /// No-op in this implementation.
+    void updateCorePoolSize(int corePoolSize);
+    MessageQueueLock &getMessageQueueLock();
+    std::string &getConsumerGroup();
+    MessageListenerOrderly *getMessageListener();
+    DefaultMQPushConsumerImpl *getDefaultMQPushConsumerImpl();
+    DefaultMQPushConsumer *getDefaultMQPushConsumer();
+
+private:
+    // Set by shutdown(), read by the lock helpers.
+    // NOTE(review): 'volatile' gives no inter-thread ordering guarantees;
+    // confirm whether a real synchronization primitive is needed here.
+    volatile bool m_stoped;
+    DefaultMQPushConsumerImpl *m_pDefaultMQPushConsumerImpl; // not owned
+    DefaultMQPushConsumer *m_pDefaultMQPushConsumer;         // cached from the impl
+    MessageListenerOrderly *m_pMessageListener;              // not owned
+    std::string m_consumerGroup;                             // cached group name
+    MessageQueueLock m_messageQueueLock;                     // per-queue mutex table
+
+    kpr::ThreadPoolPtr m_pConsumeExecutor;          // runs ConsumeOrderlyRequest work items
+    kpr::TimerThreadPtr m_scheduledExecutorService; // runs lock and delayed-retry timers
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/ConsumeMessageService.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/ConsumeMessageService.h b/rocketmq-client4cpp/src/consumer/ConsumeMessageService.h
new file mode 100755
index 0000000..57a9bee
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/ConsumeMessageService.h
@@ -0,0 +1,41 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __CONSUMEMESSAGESERVICE_H__
+#define __CONSUMEMESSAGESERVICE_H__
+
+#include <list>
+
+namespace rmq
+{
+	class MessageExt;
+	class ProcessQueue;
+	class MessageQueue;
+
+	/**
+	 * Abstract interface for consume services: lifecycle control, pool
+	 * resizing and submission of batches for consumption.
+	 */
+	class ConsumeMessageService
+	{
+	public:
+	    virtual ~ConsumeMessageService() {}
+	    /// Starts the service.
+	    virtual void start() = 0;
+	    /// Stops the service.
+	    virtual void shutdown() = 0;
+	    /// Requests a new core size for the consume thread pool.
+	    virtual void updateCorePoolSize(int corePoolSize) = 0;
+	    /// Submits a batch from the given queue pair for consumption.
+	    virtual void submitConsumeRequest(std::list<MessageExt*>& msgs,
+	                                      ProcessQueue* pProcessQueue,
+	                                      const MessageQueue& messageQueue,
+	                                      bool dispathToConsume) = 0;
+	};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/ConsumeType.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/ConsumeType.cpp b/rocketmq-client4cpp/src/consumer/ConsumeType.cpp
new file mode 100755
index 0000000..6ef5837
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/ConsumeType.cpp
@@ -0,0 +1,70 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "ConsumeType.h"
+
+namespace rmq
+{
+
+/**
+ * Maps a ConsumeType value to its enumerator name.
+ *
+ * @return a string literal; "UnknowConsumeType" for unrecognised values
+ */
+const char* getConsumeTypeString(ConsumeType type)
+{
+    if (type == CONSUME_ACTIVELY)
+    {
+        return "CONSUME_ACTIVELY";
+    }
+    if (type == CONSUME_PASSIVELY)
+    {
+        return "CONSUME_PASSIVELY";
+    }
+
+    return "UnknowConsumeType";
+}
+
+/**
+ * Maps a ConsumeFromWhere value to its enumerator name.
+ *
+ * @return a string literal; "UnknowConsumeFromWhere" for unrecognised values
+ */
+const char* getConsumeFromWhereString(ConsumeFromWhere type)
+{
+    if (type == CONSUME_FROM_LAST_OFFSET)
+    {
+        return "CONSUME_FROM_LAST_OFFSET";
+    }
+    if (type == CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST)
+    {
+        return "CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST";
+    }
+    if (type == CONSUME_FROM_MAX_OFFSET)
+    {
+        return "CONSUME_FROM_MAX_OFFSET";
+    }
+    if (type == CONSUME_FROM_MIN_OFFSET)
+    {
+        return "CONSUME_FROM_MIN_OFFSET";
+    }
+    if (type == CONSUME_FROM_FIRST_OFFSET)
+    {
+        return "CONSUME_FROM_FIRST_OFFSET";
+    }
+    if (type == CONSUME_FROM_TIMESTAMP)
+    {
+        return "CONSUME_FROM_TIMESTAMP";
+    }
+
+    return "UnknowConsumeFromWhere";
+}
+
+/**
+ * Maps a MessageModel value to its enumerator name.
+ *
+ * @return a string literal; "UnknowMessageModel" for unrecognised values
+ */
+const char* getMessageModelString(MessageModel type)
+{
+    if (type == CLUSTERING)
+    {
+        return "CLUSTERING";
+    }
+    if (type == BROADCASTING)
+    {
+        return "BROADCASTING";
+    }
+
+    return "UnknowMessageModel";
+}
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/ConsumerInvokeCallback.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/ConsumerInvokeCallback.cpp b/rocketmq-client4cpp/src/consumer/ConsumerInvokeCallback.cpp
new file mode 100755
index 0000000..c9dc304
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/ConsumerInvokeCallback.cpp
@@ -0,0 +1,96 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "ConsumerInvokeCallback.h"
+#include "ResponseFuture.h"
+#include "PullResult.h"
+#include "MQClientAPIImpl.h"
+#include "PullCallback.h"
+#include "MQClientException.h"
+#include "RemotingCommand.h"
+
+namespace rmq
+{
+
+/**
+ * @param pPullCallback    callback to notify; deleted by this object's destructor
+ * @param pMQClientAPIImpl API object used to decode pull responses (not owned)
+ */
+ConsumerInvokeCallback::ConsumerInvokeCallback(PullCallback* pPullCallback, MQClientAPIImpl* pMQClientAPIImpl)
+    : m_pPullCallback(pPullCallback), m_pMQClientAPIImpl(pMQClientAPIImpl)
+{
+    // Nothing beyond member initialisation.
+}
+
+/** Deletes the pull callback received at construction. */
+ConsumerInvokeCallback::~ConsumerInvokeCallback()
+{
+    // Deleting a null pointer is a no-op, so no explicit check is required.
+    delete m_pPullCallback;
+    m_pPullCallback = NULL;
+}
+
+/**
+ * Completion handler for an asynchronous pull request.
+ *
+ * With a response, the pull result is decoded and passed to onSuccess(); an
+ * MQException raised on that path is forwarded to onException(). Without a
+ * response, onException() receives an MQClientException describing why
+ * (send failure, timeout or unknown). The response command, if any, and
+ * this object are deleted before returning.
+ *
+ * @param pResponseFuture future describing the finished request
+ */
+void ConsumerInvokeCallback::operationComplete(ResponseFuturePtr pResponseFuture)
+{
+    if (m_pPullCallback == NULL)
+    {
+        delete this;
+        return;
+    }
+
+    RemotingCommand* response = pResponseFuture->getResponseCommand();
+    if (response == NULL)
+    {
+        // No response arrived: report the most specific reason available.
+        if (!pResponseFuture->isSendRequestOK())
+        {
+            std::string reason = "send request failed";
+            MQClientException e(reason, -1, __FILE__, __LINE__);
+            m_pPullCallback->onException(e);
+        }
+        else if (pResponseFuture->isTimeout())
+        {
+            std::string reason = "wait response timeout";
+            MQClientException e(reason, -1, __FILE__, __LINE__);
+            m_pPullCallback->onException(e);
+        }
+        else
+        {
+            std::string reason = "unknow reseaon";
+            MQClientException e(reason, -1, __FILE__, __LINE__);
+            m_pPullCallback->onException(e);
+        }
+    }
+    else
+    {
+        try
+        {
+            PullResult* pullResult = m_pMQClientAPIImpl->processPullResponse(response);
+            response->setBody(NULL, 0, false);
+
+            m_pPullCallback->onSuccess(*pullResult);
+
+            // The list is emptied before the result is destroyed.
+            // NOTE(review): presumably the callback has taken over the
+            // messages by now - confirm. If onSuccess() throws, pullResult
+            // is not freed on this path.
+            pullResult->msgFoundList.clear();
+            delete pullResult;
+        }
+        catch (MQException& e)
+        {
+            m_pPullCallback->onException(e);
+        }
+
+        delete response;
+    }
+
+    // The callback object is single-use and frees itself.
+    delete this;
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/ConsumerInvokeCallback.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/ConsumerInvokeCallback.h b/rocketmq-client4cpp/src/consumer/ConsumerInvokeCallback.h
new file mode 100755
index 0000000..675f2fd
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/ConsumerInvokeCallback.h
@@ -0,0 +1,40 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __CONSUMER_INVOKECALLBACK_H__
+#define __CONSUMER_INVOKECALLBACK_H__
+
+#include "InvokeCallback.h"
+
+namespace rmq
+{
+	class PullCallback;
+	class MQClientAPIImpl;
+
+	/**
+	* InvokeCallback that bridges a finished remoting request to a user
+	* PullCallback.
+	*
+	* operationComplete() decodes the response through MQClientAPIImpl and
+	* reports the outcome through PullCallback::onSuccess()/onException();
+	* the implementation ends with `delete this`, so instances are expected
+	* to be heap-allocated and used for a single request.
+	*/
+	class ConsumerInvokeCallback : public InvokeCallback
+	{
+	public:
+	    // pPullCallback: user callback that receives the pull outcome.
+	    // pMQClientAPIImpl: used to turn the raw response into a PullResult.
+	    ConsumerInvokeCallback(PullCallback* pPullCallback, MQClientAPIImpl* pMQClientAPIImpl);
+	    virtual ~ConsumerInvokeCallback();
+	    // Invoked when the request finishes; see the class comment.
+	    virtual void operationComplete(ResponseFuturePtr pResponseFuture);
+
+	private:
+	    // NOTE(review): the .cpp appears to delete this pointer on destruction - confirm ownership.
+	    PullCallback* m_pPullCallback;
+	    MQClientAPIImpl* m_pMQClientAPIImpl;
+	};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/ConsumerStatManage.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/ConsumerStatManage.h b/rocketmq-client4cpp/src/consumer/ConsumerStatManage.h
new file mode 100755
index 0000000..92cf74c
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/ConsumerStatManage.h
@@ -0,0 +1,132 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __CONSUMERSTAT_H__
+#define __CONSUMERSTAT_H__
+
+#include <list>
+#include <string>
+
+#include "AtomicValue.h"
+#include "KPRUtil.h"
+#include "Mutex.h"
+#include "ScopedLock.h"
+
+namespace rmq
+{
+    /**
+    * Bundle of consumer counters. ConsumerStatManager keeps one live
+    * instance and stores copies of it as periodic snapshots.
+    */
+    struct ConsumerStat
+    {
+        // Set from KPRUtil::GetCurrentTimeMillis() when the object is constructed.
+        long long createTimestamp;
+        // Logged as "ConsumeMaxRT".
+        kpr::AtomicLong consumeMsgRTMax;
+        // Numerator of the logged "ConsumeAvgRT".
+        kpr::AtomicLong consumeMsgRTTotal;
+        // Logged as "TotalOKMsg".
+        kpr::AtomicLong consumeMsgOKTotal;
+        // Logged as "TotalFailedMsg".
+        kpr::AtomicLong consumeMsgFailedTotal;
+        // Numerator of the logged "PullAvgRT".
+        kpr::AtomicLong pullRTTotal;
+        // Logged as "PullTimesTotal".
+        kpr::AtomicLong pullTimesTotal;
+
+		// Stamps the creation time and zeroes every counter.
+		ConsumerStat()
+		{
+			createTimestamp = KPRUtil::GetCurrentTimeMillis();
+			consumeMsgRTMax = 0;
+			consumeMsgRTTotal = 0;
+			consumeMsgOKTotal = 0;
+			consumeMsgFailedTotal = 0;
+			pullRTTotal = 0;
+			pullTimesTotal = 0;
+		}
+    };
+
+
+    /**
+    * Holds the live consumer counters together with a sliding window of at
+    * most 60 snapshots, and logs averages computed over that window.
+    */
+    class ConsumerStatManager
+    {
+    public:
+        /** Returns the live counters that callers update. */
+        ConsumerStat& getConsumertat()
+        {
+            return m_consumertat;
+        }
+
+        /** Returns the snapshot window; no lock is taken by this accessor. */
+        std::list<ConsumerStat>& getSnapshotList()
+        {
+            return m_snapshotList;
+        }
+
+        /**
+        * every 1s
+        *
+        * Appends a copy of the live counters to the window and drops the
+        * oldest entry once more than 60 are stored.
+        */
+        void recordSnapshotPeriodically()
+        {
+            kpr::ScopedWLock<kpr::RWMutex> lock(m_snapshotListLock);
+            m_snapshotList.push_back(m_consumertat);
+            // The copy inherits the creation time of the live object, which
+            // would make every snapshot carry the same timestamp and the TPS
+            // divisor always zero. Stamp the snapshot with the time it was taken.
+            m_snapshotList.back().createTimestamp = KPRUtil::GetCurrentTimeMillis();
+            if (m_snapshotList.size() > 60)
+            {
+                m_snapshotList.pop_front();
+            }
+        }
+
+        /**
+        * every 1m
+        *
+        * Logs consume/pull averages between the oldest and newest snapshot.
+        * Nothing is logged until the window holds 60 snapshots. Ratios whose
+        * divisor is zero (idle window) are reported as 0 instead of NaN/inf.
+        *
+        * @param group    consumer group name written to the log line.
+        * @param clientId client id written to the log line.
+        */
+        void logStatsPeriodically(std::string& group, std::string& clientId)
+        {
+            kpr::ScopedRLock<kpr::RWMutex> lock(m_snapshotListLock);
+            if (m_snapshotList.size() >= 60)
+            {
+                ConsumerStat& first = m_snapshotList.front();
+                ConsumerStat& last = m_snapshotList.back();
+
+                {
+                    // Messages handled (OK + failed) inside the window.
+                    const long long consumedDelta =
+                        (last.consumeMsgOKTotal.get() + last.consumeMsgFailedTotal.get())
+                        - (first.consumeMsgOKTotal.get() + first.consumeMsgFailedTotal.get());
+                    const long long elapsedMillis = last.createTimestamp - first.createTimestamp;
+
+                    double avgRT = 0;
+                    if (consumedDelta != 0)
+                    {
+                        avgRT = (last.consumeMsgRTTotal.get() - first.consumeMsgRTTotal.get())
+                                / (double)consumedDelta;
+                    }
+
+                    double tps = 0;
+                    if (elapsedMillis != 0)
+                    {
+                        tps = consumedDelta / (double)elapsedMillis;
+                    }
+
+                    // per-millisecond rate -> per-second rate
+                    tps *= 1000;
+
+                    RMQ_INFO(
+                        "Consumer, {%s} {%s}, ConsumeAvgRT: {%f} ConsumeMaxRT: {%lld} TotalOKMsg: {%lld} TotalFailedMsg: {%lld} consumeTPS: {%f}",
+                        group.c_str(),
+                        clientId.c_str(),
+                        avgRT,
+                        last.consumeMsgRTMax.get(),
+                        last.consumeMsgOKTotal.get(),
+                        last.consumeMsgFailedTotal.get(),
+                        tps);
+                }
+
+                {
+                    const long long pullTimesDelta = last.pullTimesTotal.get() - first.pullTimesTotal.get();
+
+                    double avgRT = 0;
+                    if (pullTimesDelta != 0)
+                    {
+                        avgRT = (last.pullRTTotal.get() - first.pullRTTotal.get())
+                                / (double)pullTimesDelta;
+                    }
+
+                    RMQ_INFO("Consumer, {%s} {%s}, PullAvgRT: {%f}  PullTimesTotal: {%lld}",
+                             group.c_str(),
+                             clientId.c_str(),
+                             avgRT,
+                             last.pullTimesTotal.get());
+                }
+            }
+        }
+
+    private:
+        ConsumerStat m_consumertat;             // live counters
+        std::list<ConsumerStat> m_snapshotList; // oldest snapshot at the front
+        kpr::RWMutex m_snapshotListLock;        // guards m_snapshotList in the two periodic methods
+    };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumer.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumer.cpp b/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumer.cpp
new file mode 100755
index 0000000..67a8c8c
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumer.cpp
@@ -0,0 +1,309 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "DefaultMQPullConsumer.h"
+
+#include <list>
+#include <string>
+
+#include "MessageQueue.h"
+#include "MessageExt.h"
+#include "ClientConfig.h"
+#include "DefaultMQPullConsumerImpl.h"
+#include "MixAll.h"
+#include "AllocateMessageQueueStrategyInner.h"
+
+namespace rmq
+{
+
+/**
+* Creates a pull consumer in the default consumer group
+* (MixAll::DEFAULT_CONSUMER_GROUP) with the stock timeouts, CLUSTERING
+* message model and an averaging queue-allocation strategy.
+*/
+DefaultMQPullConsumer::DefaultMQPullConsumer()
+    : m_consumerGroup(MixAll::DEFAULT_CONSUMER_GROUP),
+      m_brokerSuspendMaxTimeMillis(20 * 1000),
+      m_consumerTimeoutMillisWhenSuspend(30 * 1000),
+      m_consumerPullTimeoutMillis(10 * 1000),
+      m_messageModel(CLUSTERING),
+      m_pMessageQueueListener(NULL),
+      m_pOffsetStore(NULL),
+      m_pAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely()),
+      m_unitMode(false),
+      m_maxReconsumeTimes(16)
+{
+    // The implementation object receives a back pointer to this facade.
+    this->m_pDefaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this);
+}
+
+/**
+* Creates a pull consumer for the given group; every other setting gets the
+* same defaults as the no-argument constructor.
+*
+* @param consumerGroup name of the consumer group.
+*/
+DefaultMQPullConsumer::DefaultMQPullConsumer(const std::string& consumerGroup)
+    : m_consumerGroup(consumerGroup),
+      m_brokerSuspendMaxTimeMillis(20 * 1000),
+      m_consumerTimeoutMillisWhenSuspend(30 * 1000),
+      m_consumerPullTimeoutMillis(10 * 1000),
+      m_messageModel(CLUSTERING),
+      m_pMessageQueueListener(NULL),
+      m_pOffsetStore(NULL),
+      m_pAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely()),
+      m_unitMode(false),
+      m_maxReconsumeTimes(16)
+{
+    // The implementation object receives a back pointer to this facade.
+    this->m_pDefaultMQPullConsumerImpl = new DefaultMQPullConsumerImpl(this);
+}
+
+/**
+* Frees the allocation strategy first and the implementation object second.
+* Deleting a null pointer is a no-op, so no explicit null checks are needed.
+*/
+DefaultMQPullConsumer::~DefaultMQPullConsumer()
+{
+    // NOTE(review): whatever pointer the strategy setter stored last is
+    // deleted here, so it has to be a heap object (memleak or coredump otherwise).
+    delete m_pAllocateMessageQueueStrategy;
+    delete m_pDefaultMQPullConsumerImpl;
+}
+
+//MQAdmin
+/** MQAdmin: forwards the topic-creation request to the implementation object. */
+void DefaultMQPullConsumer::createTopic(const std::string& key, const std::string& newTopic, int queueNum)
+{
+    DefaultMQPullConsumerImpl* const impl = m_pDefaultMQPullConsumerImpl;
+    impl->createTopic(key, newTopic, queueNum);
+}
+
+/** MQAdmin: returns the implementation's searchOffset() result for the queue and timestamp. */
+long long DefaultMQPullConsumer::searchOffset(const MessageQueue& mq, long long timestamp)
+{
+    const long long found = this->m_pDefaultMQPullConsumerImpl->searchOffset(mq, timestamp);
+    return found;
+}
+
+/** MQAdmin: returns the implementation's maxOffset() result for the queue. */
+long long DefaultMQPullConsumer::maxOffset(const MessageQueue& mq)
+{
+    const long long upper = this->m_pDefaultMQPullConsumerImpl->maxOffset(mq);
+    return upper;
+}
+
+/** MQAdmin: returns the implementation's minOffset() result for the queue. */
+long long DefaultMQPullConsumer::minOffset(const MessageQueue& mq)
+{
+    const long long lower = this->m_pDefaultMQPullConsumerImpl->minOffset(mq);
+    return lower;
+}
+
+/** MQAdmin: returns the implementation's earliestMsgStoreTime() result for the queue. */
+long long DefaultMQPullConsumer::earliestMsgStoreTime(const MessageQueue& mq)
+{
+    const long long storeTime = this->m_pDefaultMQPullConsumerImpl->earliestMsgStoreTime(mq);
+    return storeTime;
+}
+
+/** MQAdmin: looks a message up by id through the implementation object. */
+MessageExt* DefaultMQPullConsumer::viewMessage(const std::string& msgId)
+{
+    MessageExt* const message = this->m_pDefaultMQPullConsumerImpl->viewMessage(msgId);
+    return message;
+}
+
+/**
+* MQAdmin: forwards a message query (topic, key, maxNum and the begin/end
+* bounds) to the implementation object and returns its result unchanged.
+*/
+QueryResult DefaultMQPullConsumer::queryMessage(const std::string& topic,
+        const std::string&  key,
+        int maxNum,
+        long long begin,
+        long long end)
+{
+    DefaultMQPullConsumerImpl* const impl = m_pDefaultMQPullConsumerImpl;
+    return impl->queryMessage(topic, key, maxNum, begin, end);
+}
+// MQadmin end
+
+/** Returns the queue-allocation strategy currently stored on this consumer. */
+AllocateMessageQueueStrategy* DefaultMQPullConsumer::getAllocateMessageQueueStrategy()
+{
+    return this->m_pAllocateMessageQueueStrategy;
+}
+
+/**
+* Stores a new queue-allocation strategy pointer. The previously stored
+* pointer is overwritten without being released.
+*/
+void DefaultMQPullConsumer::setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy)
+{
+    this->m_pAllocateMessageQueueStrategy = pAllocateMessageQueueStrategy;
+}
+
+/** Returns the configured broker suspend limit (milliseconds per the name). */
+int DefaultMQPullConsumer::getBrokerSuspendMaxTimeMillis()
+{
+    return this->m_brokerSuspendMaxTimeMillis;
+}
+
+/** Replaces the configured broker suspend limit. */
+void DefaultMQPullConsumer::setBrokerSuspendMaxTimeMillis(int brokerSuspendMaxTimeMillis)
+{
+    this->m_brokerSuspendMaxTimeMillis = brokerSuspendMaxTimeMillis;
+}
+
+/** Returns a copy of the consumer group name. */
+std::string DefaultMQPullConsumer::getConsumerGroup()
+{
+    return this->m_consumerGroup;
+}
+
+/** Replaces the consumer group name. */
+void DefaultMQPullConsumer::setConsumerGroup(const std::string& consumerGroup)
+{
+    this->m_consumerGroup = consumerGroup;
+}
+
+/** Returns the timeout used for non-blocking pulls. */
+int DefaultMQPullConsumer::getConsumerPullTimeoutMillis()
+{
+    return this->m_consumerPullTimeoutMillis;
+}
+
+/** Replaces the timeout used for non-blocking pulls. */
+void DefaultMQPullConsumer::setConsumerPullTimeoutMillis(int consumerPullTimeoutMillis)
+{
+    this->m_consumerPullTimeoutMillis = consumerPullTimeoutMillis;
+}
+
+/** Returns the timeout used for blocking (suspend) pulls. */
+int DefaultMQPullConsumer::getConsumerTimeoutMillisWhenSuspend()
+{
+    return this->m_consumerTimeoutMillisWhenSuspend;
+}
+
+/** Replaces the timeout used for blocking (suspend) pulls. */
+void DefaultMQPullConsumer::setConsumerTimeoutMillisWhenSuspend(int consumerTimeoutMillisWhenSuspend)
+{
+    this->m_consumerTimeoutMillisWhenSuspend = consumerTimeoutMillisWhenSuspend;
+}
+
+/** Returns the configured message model. */
+MessageModel DefaultMQPullConsumer::getMessageModel()
+{
+    return this->m_messageModel;
+}
+
+/** Replaces the configured message model. */
+void DefaultMQPullConsumer::setMessageModel(MessageModel messageModel)
+{
+    this->m_messageModel = messageModel;
+}
+
+/** Returns the registered message-queue listener (NULL when none is set). */
+MessageQueueListener* DefaultMQPullConsumer::getMessageQueueListener()
+{
+    return this->m_pMessageQueueListener;
+}
+
+/** Stores the message-queue listener pointer as given (NULL included). */
+void DefaultMQPullConsumer::setMessageQueueListener(MessageQueueListener* pMessageQueueListener)
+{
+    this->m_pMessageQueueListener = pMessageQueueListener;
+}
+
+/** Returns a copy of the set of registered topics. */
+std::set<std::string> DefaultMQPullConsumer::getRegisterTopics()
+{
+    return this->m_registerTopics;
+}
+
+/** Replaces the whole set of registered topics with the given one. */
+void DefaultMQPullConsumer::setRegisterTopics(std::set<std::string> registerTopics)
+{
+    this->m_registerTopics = registerTopics;
+}
+
+//MQConsumer
+/** MQConsumer: sends the message back with an empty broker name. */
+void DefaultMQPullConsumer::sendMessageBack(MessageExt& msg, int delayLevel)
+{
+    DefaultMQPullConsumerImpl* const impl = m_pDefaultMQPullConsumerImpl;
+    impl->sendMessageBack(msg, delayLevel, "");
+}
+
+/** MQConsumer: sends the message back, naming the broker to use. */
+void DefaultMQPullConsumer::sendMessageBack(MessageExt& msg, int delayLevel, const std::string& brokerName)
+{
+    DefaultMQPullConsumerImpl* const impl = m_pDefaultMQPullConsumerImpl;
+    impl->sendMessageBack(msg, delayLevel, brokerName);
+}
+
+
+
+/** MQConsumer: returns the implementation's subscribe-queue set for the topic. */
+std::set<MessageQueue>* DefaultMQPullConsumer::fetchSubscribeMessageQueues(const std::string& topic)
+{
+    std::set<MessageQueue>* const queues = this->m_pDefaultMQPullConsumerImpl->fetchSubscribeMessageQueues(topic);
+    return queues;
+}
+
+/** MQConsumer: starts the implementation object. */
+void DefaultMQPullConsumer::start()
+{
+    this->m_pDefaultMQPullConsumerImpl->start();
+}
+
+/** MQConsumer: shuts the implementation object down. */
+void DefaultMQPullConsumer::shutdown()
+{
+    this->m_pDefaultMQPullConsumerImpl->shutdown();
+}
+//MQConsumer end
+
+//MQPullConsumer
+/**
+* MQPullConsumer: records the topic as registered and, when a non-null
+* listener is supplied, makes it the current message-queue listener.
+* A NULL listener leaves the existing one in place.
+*/
+void DefaultMQPullConsumer::registerMessageQueueListener(const std::string& topic, MessageQueueListener* pListener)
+{
+    this->m_registerTopics.insert(topic);
+
+    if (pListener != NULL)
+    {
+        this->m_pMessageQueueListener = pListener;
+    }
+}
+
+/** MQPullConsumer: synchronous pull; returns the implementation's result. */
+PullResult* DefaultMQPullConsumer::pull(MessageQueue& mq, const std::string& subExpression, long long offset, int maxNums)
+{
+    PullResult* const pulled = this->m_pDefaultMQPullConsumerImpl->pull(mq, subExpression, offset, maxNums);
+    return pulled;
+}
+
+/** MQPullConsumer: asynchronous pull; the outcome is delivered to pPullCallback. */
+void DefaultMQPullConsumer::pull(MessageQueue& mq, const std::string& subExpression, long long offset, int maxNums, PullCallback* pPullCallback)
+{
+    DefaultMQPullConsumerImpl* const impl = m_pDefaultMQPullConsumerImpl;
+    impl->pull(mq, subExpression, offset, maxNums, pPullCallback);
+}
+
+/** MQPullConsumer: synchronous pullBlockIfNotFound; returns the implementation's result. */
+PullResult* DefaultMQPullConsumer::pullBlockIfNotFound(MessageQueue& mq, const std::string& subExpression, long long offset, int maxNums)
+{
+    PullResult* const pulled =
+        this->m_pDefaultMQPullConsumerImpl->pullBlockIfNotFound(mq, subExpression, offset, maxNums);
+    return pulled;
+}
+
+/** MQPullConsumer: asynchronous pullBlockIfNotFound; the outcome is delivered to pPullCallback. */
+void DefaultMQPullConsumer::pullBlockIfNotFound(MessageQueue& mq,
+        const std::string& subExpression,
+        long long offset,
+        int maxNums,
+        PullCallback* pPullCallback)
+{
+    DefaultMQPullConsumerImpl* const impl = m_pDefaultMQPullConsumerImpl;
+    impl->pullBlockIfNotFound(mq, subExpression, offset, maxNums, pPullCallback);
+}
+
+/** MQPullConsumer: forwards the new consume offset for the queue to the implementation. */
+void DefaultMQPullConsumer::updateConsumeOffset(MessageQueue& mq, long long offset)
+{
+    DefaultMQPullConsumerImpl* const impl = m_pDefaultMQPullConsumerImpl;
+    impl->updateConsumeOffset(mq, offset);
+}
+
+/** MQPullConsumer: returns the implementation's consume offset for the queue. */
+long long DefaultMQPullConsumer::fetchConsumeOffset(MessageQueue& mq, bool fromStore)
+{
+    const long long consumed = this->m_pDefaultMQPullConsumerImpl->fetchConsumeOffset(mq, fromStore);
+    return consumed;
+}
+
+/** MQPullConsumer: returns the implementation's balanced queue set for the topic. */
+std::set<MessageQueue>* DefaultMQPullConsumer::fetchMessageQueuesInBalance(const std::string& topic)
+{
+    std::set<MessageQueue>* const queues = this->m_pDefaultMQPullConsumerImpl->fetchMessageQueuesInBalance(topic);
+    return queues;
+}
+//MQPullConsumer end
+
+/** Returns the user-supplied offset store (NULL when none was set). */
+OffsetStore* DefaultMQPullConsumer::getOffsetStore()
+{
+    return this->m_pOffsetStore;
+}
+
+/** Stores the offset store pointer as given. */
+void DefaultMQPullConsumer::setOffsetStore(OffsetStore* offsetStore)
+{
+    this->m_pOffsetStore = offsetStore;
+}
+
+/** Returns the implementation object created by the constructor. */
+DefaultMQPullConsumerImpl* DefaultMQPullConsumer::getDefaultMQPullConsumerImpl()
+{
+    return this->m_pDefaultMQPullConsumerImpl;
+}
+
+/** Returns the unit-mode flag. */
+bool DefaultMQPullConsumer::isUnitMode()
+{
+    return this->m_unitMode;
+}
+
+/** Sets the unit-mode flag. */
+void DefaultMQPullConsumer::setUnitMode(bool isUnitMode)
+{
+    this->m_unitMode = isUnitMode;
+}
+
+/** Returns the configured maximum number of re-consume attempts. */
+int DefaultMQPullConsumer::getMaxReconsumeTimes()
+{
+    return this->m_maxReconsumeTimes;
+}
+
+/** Replaces the configured maximum number of re-consume attempts. */
+void DefaultMQPullConsumer::setMaxReconsumeTimes(int maxReconsumeTimes)
+{
+    this->m_maxReconsumeTimes = maxReconsumeTimes;
+}
+
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumerImpl.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumerImpl.cpp b/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumerImpl.cpp
new file mode 100755
index 0000000..d6465e9
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumerImpl.cpp
@@ -0,0 +1,630 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "DefaultMQPullConsumerImpl.h"
+
+#include <iostream>
+#include <string>
+#include <set>
+#include "DefaultMQPullConsumer.h"
+#include "DefaultMQProducer.h"
+#include "MQClientFactory.h"
+#include "MQAdminImpl.h"
+#include "RebalancePullImpl.h"
+#include "MQClientAPIImpl.h"
+#include "OffsetStore.h"
+#include "MixAll.h"
+#include "MQClientManager.h"
+#include "LocalFileOffsetStore.h"
+#include "RemoteBrokerOffsetStore.h"
+#include "PullSysFlag.h"
+#include "FilterAPI.h"
+#include "PullAPIWrapper.h"
+#include "MQClientException.h"
+#include "Validators.h"
+#include "ScopedLock.h"
+
+namespace rmq
+{
+
+/**
+* Binds the implementation to its facade and leaves the service in the
+* CREATE_JUST state. The client factory, pull wrapper and offset store stay
+* NULL until start(); only the rebalance helper is created here.
+*/
+DefaultMQPullConsumerImpl::DefaultMQPullConsumerImpl(DefaultMQPullConsumer* pDefaultMQPullConsumer)
+    : m_pDefaultMQPullConsumer(pDefaultMQPullConsumer),
+      m_serviceState(CREATE_JUST)
+{
+    // Collaborators that start() fills in later.
+    this->m_pMQClientFactory = NULL;
+    this->m_pPullAPIWrapper = NULL;
+    this->m_pOffsetStore = NULL;
+
+    this->m_pRebalanceImpl = new RebalancePullImpl(this);
+}
+
+/**
+* Releases the rebalance helper, the pull wrapper and the offset store, in
+* that order. `delete` on NULL is a no-op, so members that start() never
+* filled in are handled without explicit checks.
+*/
+DefaultMQPullConsumerImpl::~DefaultMQPullConsumerImpl()
+{
+    delete m_pRebalanceImpl;
+    delete m_pPullAPIWrapper;
+    delete m_pOffsetStore;
+    // m_pMQClientFactory is intentionally left alone here.
+    //delete m_pMQClientFactory;
+}
+
+/**
+* Starts the pull consumer.
+*
+* Only valid in the CREATE_JUST state: validates configuration, obtains the
+* client factory, wires the rebalance helper, creates the pull wrapper and
+* the offset store, registers this consumer with the factory and starts the
+* factory. The state is START_FAILED while this runs and RUNNING on success.
+*
+* @throws MQClientException when the consumer group is already registered
+*         (state is reset to CREATE_JUST so the caller may retry), when no
+*         offset store can be chosen for the message model, or when start()
+*         is called in any state other than CREATE_JUST.
+*/
+void  DefaultMQPullConsumerImpl::start()
+{
+    RMQ_INFO("DefaultMQPullConsumerImpl::start()");
+    switch (m_serviceState)
+    {
+        case CREATE_JUST:
+            {
+                RMQ_INFO("the consumer [{%s}] start beginning. messageModel={%s}",
+                    m_pDefaultMQPullConsumer->getConsumerGroup().c_str(),
+                    getMessageModelString(m_pDefaultMQPullConsumer->getMessageModel()));
+
+                // Pessimistic state: anything that throws below leaves START_FAILED.
+                m_serviceState = START_FAILED;
+                checkConfig();
+                copySubscription();
+
+                if (m_pDefaultMQPullConsumer->getMessageModel() == CLUSTERING)
+                {
+                    m_pDefaultMQPullConsumer->changeInstanceNameToPID();
+                }
+
+                m_pMQClientFactory = MQClientManager::getInstance()->getAndCreateMQClientFactory(*m_pDefaultMQPullConsumer);
+
+                m_pRebalanceImpl->setConsumerGroup(m_pDefaultMQPullConsumer->getConsumerGroup());
+                m_pRebalanceImpl->setMessageModel(m_pDefaultMQPullConsumer->getMessageModel());
+                m_pRebalanceImpl->setAllocateMessageQueueStrategy(m_pDefaultMQPullConsumer->getAllocateMessageQueueStrategy());
+                m_pRebalanceImpl->setmQClientFactory(m_pMQClientFactory);
+
+                // A previous attempt that failed at registerConsumer() resets the
+                // state to CREATE_JUST but leaves its wrapper behind; release it
+                // before creating a new one (no-op on the first call).
+                delete m_pPullAPIWrapper;
+                m_pPullAPIWrapper = NULL;
+                m_pPullAPIWrapper = new PullAPIWrapper(m_pMQClientFactory, m_pDefaultMQPullConsumer->getConsumerGroup());
+
+                if (m_pDefaultMQPullConsumer->getOffsetStore() != NULL)
+                {
+                    m_pOffsetStore = m_pDefaultMQPullConsumer->getOffsetStore();
+                }
+                else
+                {
+                    switch (m_pDefaultMQPullConsumer->getMessageModel())
+                    {
+                        case BROADCASTING:
+                            m_pOffsetStore = new LocalFileOffsetStore(m_pMQClientFactory, m_pDefaultMQPullConsumer->getConsumerGroup());
+                            break;
+                        case CLUSTERING:
+                            m_pOffsetStore = new RemoteBrokerOffsetStore(m_pMQClientFactory, m_pDefaultMQPullConsumer->getConsumerGroup());
+                            break;
+                        default:
+                            break;
+                    }
+                }
+
+                // An unrecognised message model selects no store; fail with an
+                // exception instead of dereferencing a null pointer below.
+                if (m_pOffsetStore == NULL)
+                {
+                    THROW_MQEXCEPTION(MQClientException, "no offset store available for the configured message model", -1);
+                }
+
+                m_pOffsetStore->load();
+
+                bool registerOK =
+                    m_pMQClientFactory->registerConsumer(m_pDefaultMQPullConsumer->getConsumerGroup(), this);
+                if (!registerOK)
+                {
+                    m_serviceState = CREATE_JUST;
+                    std::string str = "The consumer group[" + m_pDefaultMQPullConsumer->getConsumerGroup();
+                    str += "] has been created before, specify another name please.";
+                    THROW_MQEXCEPTION(MQClientException, str, -1);
+                }
+
+                m_pMQClientFactory->start();
+
+                m_serviceState = RUNNING;
+            }
+            break;
+        case RUNNING:
+        case START_FAILED:
+        case SHUTDOWN_ALREADY:
+            THROW_MQEXCEPTION(MQClientException, "The PullConsumer service state not OK, maybe started once, ", -1);
+        default:
+            break;
+    }
+}
+
+
+/**
+* Stops a running consumer: persists offsets, unregisters the consumer group
+* from the client factory, shuts the factory down and moves to
+* SHUTDOWN_ALREADY. In every other state the call only logs.
+*/
+void  DefaultMQPullConsumerImpl::shutdown()
+{
+    RMQ_DEBUG("DefaultMQPullConsumerImpl::shutdown()");
+
+    if (m_serviceState != RUNNING)
+    {
+        // Never started, failed to start, or already shut down: nothing to do.
+        return;
+    }
+
+    persistConsumerOffset();
+    m_pMQClientFactory->unregisterConsumer(m_pDefaultMQPullConsumer->getConsumerGroup());
+    m_pMQClientFactory->shutdown();
+
+    m_serviceState = SHUTDOWN_ALREADY;
+}
+
+
+/**
+* Creates a topic through the factory's admin implementation.
+* makeSureStateOK() throws unless the consumer is RUNNING.
+*/
+void DefaultMQPullConsumerImpl::createTopic(const std::string& key, const std::string& newTopic, int queueNum)
+{
+    this->makeSureStateOK();
+    this->m_pMQClientFactory->getMQAdminImpl()->createTopic(key, newTopic, queueNum);
+}
+
+/**
+* Reads the consume offset of a queue from the offset store.
+*
+* @param fromStore true selects READ_FROM_STORE, false selects
+*                  MEMORY_FIRST_THEN_STORE.
+*/
+long long DefaultMQPullConsumerImpl::fetchConsumeOffset(MessageQueue& mq, bool fromStore)
+{
+    this->makeSureStateOK();
+
+    if (fromStore)
+    {
+        return m_pOffsetStore->readOffset(mq, READ_FROM_STORE);
+    }
+    return m_pOffsetStore->readOffset(mq, MEMORY_FIRST_THEN_STORE);
+}
+
+/**
+* Collects the queues of the given topic that are present in the rebalance
+* helper's process-queue table.
+*
+* @return a newly allocated set; the caller is responsible for deleting it.
+*/
+std::set<MessageQueue>* DefaultMQPullConsumerImpl::fetchMessageQueuesInBalance(const std::string& topic)
+{
+    makeSureStateOK();
+    std::set<MessageQueue>* matching = new std::set<MessageQueue>;
+
+    // The table is read under its read lock for the duration of the scan.
+    kpr::ScopedRLock<kpr::RWMutex> lock(m_pRebalanceImpl->getProcessQueueTableLock());
+    std::map<MessageQueue, ProcessQueue*>& table = m_pRebalanceImpl->getProcessQueueTable();
+    for (std::map<MessageQueue, ProcessQueue*>::iterator entry = table.begin();
+         entry != table.end(); ++entry)
+    {
+        if (entry->first.getTopic() != topic)
+        {
+            continue;
+        }
+        matching->insert(entry->first);
+    }
+
+    return matching;
+}
+
+/** Returns the admin implementation's publish queues for the topic; requires RUNNING state. */
+std::vector<MessageQueue>* DefaultMQPullConsumerImpl::fetchPublishMessageQueues(const std::string&  topic)
+{
+    this->makeSureStateOK();
+    std::vector<MessageQueue>* const queues =
+        m_pMQClientFactory->getMQAdminImpl()->fetchPublishMessageQueues(topic);
+    return queues;
+}
+
+/** Returns the admin implementation's subscribe queues for the topic; requires RUNNING state. */
+std::set<MessageQueue>*  DefaultMQPullConsumerImpl::fetchSubscribeMessageQueues(const std::string& topic)
+{
+    this->makeSureStateOK();
+    std::set<MessageQueue>* const queues =
+        m_pMQClientFactory->getMQAdminImpl()->fetchSubscribeMessageQueues(topic);
+    return queues;
+}
+
+/** Returns the admin implementation's earliestMsgStoreTime() for the queue; requires RUNNING state. */
+long long  DefaultMQPullConsumerImpl::earliestMsgStoreTime(const MessageQueue& mq)
+{
+    this->makeSureStateOK();
+    const long long storeTime = m_pMQClientFactory->getMQAdminImpl()->earliestMsgStoreTime(mq);
+    return storeTime;
+}
+
+/** Returns the consumer group name taken from the facade. */
+std::string  DefaultMQPullConsumerImpl::groupName()
+{
+    return this->m_pDefaultMQPullConsumer->getConsumerGroup();
+}
+
+/** Returns the message model taken from the facade. */
+MessageModel  DefaultMQPullConsumerImpl::messageModel()
+{
+    return this->m_pDefaultMQPullConsumer->getMessageModel();
+}
+
+/** A pull consumer always reports CONSUME_ACTIVELY. */
+ConsumeType  DefaultMQPullConsumerImpl::consumeType()
+{
+    const ConsumeType kind = CONSUME_ACTIVELY;
+    return kind;
+}
+
+/** A pull consumer always reports CONSUME_FROM_LAST_OFFSET. */
+ConsumeFromWhere  DefaultMQPullConsumerImpl::consumeFromWhere()
+{
+    const ConsumeFromWhere origin = CONSUME_FROM_LAST_OFFSET;
+    return origin;
+}
+
+/** Not implemented yet (TODO): always returns an empty subscription set. */
+std::set<SubscriptionData>  DefaultMQPullConsumerImpl::subscriptions()
+{
+    //TODO
+    return std::set<SubscriptionData>();
+}
+
+/** Triggers a rebalance on the helper; does nothing when the helper is absent. */
+void DefaultMQPullConsumerImpl::doRebalance()
+{
+    if (m_pRebalanceImpl == NULL)
+    {
+        return;
+    }
+
+    m_pRebalanceImpl->doRebalance();
+}
+
+/**
+* Persists the offsets of every queue currently in the rebalance helper's
+* process-queue table.
+*
+* Best effort: any exception (including the state check failing) is logged
+* and swallowed so that callers such as shutdown() are not interrupted.
+*/
+void  DefaultMQPullConsumerImpl::persistConsumerOffset()
+{
+    try
+    {
+        makeSureStateOK();
+
+        std::set<MessageQueue> mqs;
+        {
+            // Collect the keys under the read lock, then release it before the
+            // store is asked to persist.
+            kpr::ScopedRLock<kpr::RWMutex> lock(m_pRebalanceImpl->getProcessQueueTableLock());
+            // Bound by reference: the table used to be copied in full here
+            // while the lock was held, only to read its keys.
+            std::map<MessageQueue, ProcessQueue*>& processQueueTable = m_pRebalanceImpl->getProcessQueueTable();
+            RMQ_FOR_EACH(processQueueTable, it)
+            {
+                mqs.insert(it->first);
+            }
+        }
+
+        m_pOffsetStore->persistAll(mqs);
+    }
+    catch (...)
+    {
+        RMQ_ERROR("group {%s} persistConsumerOffset exception",
+                  m_pDefaultMQPullConsumer->getConsumerGroup().c_str());
+    }
+}
+
+/**
+* Records the latest queue set of a topic in the rebalance helper's
+* topic-subscribe table, but only for topics this consumer subscribes to.
+*
+* @param topic topic whose queue set changed.
+* @param info  queue set to store for the topic.
+*/
+void  DefaultMQPullConsumerImpl::updateTopicSubscribeInfo(const std::string& topic, const std::set<MessageQueue>& info)
+{
+    std::map<std::string, SubscriptionData>& subTable = m_pRebalanceImpl->getSubscriptionInner();
+
+    if (subTable.find(topic) != subTable.end())
+    {
+        // Assign through operator[] so an existing entry is replaced.
+        // std::map::insert() leaves an existing key untouched, which meant the
+        // queue set was never refreshed after the first update.
+        m_pRebalanceImpl->getTopicSubscribeInfoTable()[topic] = info;
+    }
+}
+
+/**
+* Tells whether queue information for the topic still has to be fetched:
+* true only when the topic is subscribed and has no entry in the
+* topic-subscribe table yet.
+*/
+bool  DefaultMQPullConsumerImpl::isSubscribeTopicNeedUpdate(const std::string& topic)
+{
+    std::map<std::string, SubscriptionData>& subscribed = m_pRebalanceImpl->getSubscriptionInner();
+    if (subscribed.find(topic) == subscribed.end())
+    {
+        // Not one of our topics.
+        return false;
+    }
+
+    std::map<std::string, std::set<MessageQueue> >& known =
+        m_pRebalanceImpl->getTopicSubscribeInfoTable();
+    const bool missing = (known.find(topic) == known.end());
+    return missing;
+}
+
+/** Returns the admin implementation's maxOffset() for the queue; requires RUNNING state. */
+long long  DefaultMQPullConsumerImpl::maxOffset(const MessageQueue& mq)
+{
+    this->makeSureStateOK();
+    const long long upper = m_pMQClientFactory->getMQAdminImpl()->maxOffset(mq);
+    return upper;
+}
+
+/** Returns the admin implementation's minOffset() for the queue; requires RUNNING state. */
+long long  DefaultMQPullConsumerImpl::minOffset(const MessageQueue& mq)
+{
+    this->makeSureStateOK();
+    const long long lower = m_pMQClientFactory->getMQAdminImpl()->minOffset(mq);
+    return lower;
+}
+
+/** Synchronous pull with the block flag off. */
+PullResult* DefaultMQPullConsumerImpl::pull(MessageQueue& mq,
+        const std::string& subExpression,
+        long long offset,
+        int maxNums)
+{
+    const bool block = false;
+    return this->pullSyncImpl(mq, subExpression, offset, maxNums, block);
+}
+
+/** Asynchronous pull with the block flag off; the outcome goes to pPullCallback. */
+void  DefaultMQPullConsumerImpl::pull(MessageQueue& mq,
+                                      const std::string& subExpression,
+                                      long long offset,
+                                      int maxNums,
+                                      PullCallback* pPullCallback)
+{
+    const bool block = false;
+    this->pullAsyncImpl(mq, subExpression, offset, maxNums, pPullCallback, block);
+}
+
+/** Synchronous pull with the block flag on. */
+PullResult* DefaultMQPullConsumerImpl::pullBlockIfNotFound(MessageQueue& mq,
+        const std::string& subExpression,
+        long long offset,
+        int maxNums)
+{
+    const bool block = true;
+    return this->pullSyncImpl(mq, subExpression, offset, maxNums, block);
+}
+
+/** Asynchronous pull with the block flag on; the outcome goes to pPullCallback. */
+void  DefaultMQPullConsumerImpl::pullBlockIfNotFound(MessageQueue& mq,
+        const std::string& subExpression,
+        long long offset,
+        int maxNums,
+        PullCallback* pPullCallback)
+{
+    const bool block = true;
+    this->pullAsyncImpl(mq, subExpression, offset, maxNums, pPullCallback, block);
+}
+
+/**
+* Queries messages through the factory's admin implementation, passing
+* topic, key, maxNum and the begin/end bounds through unchanged.
+*
+* @throws MQClientException (from makeSureStateOK) when the consumer is not
+*         RUNNING.
+*/
+QueryResult  DefaultMQPullConsumerImpl::queryMessage(const std::string& topic,
+        const std::string&  key,
+        int maxNum,
+        long long begin,
+        long long end)
+{
+    makeSureStateOK();
+
+    // The unused placeholder QueryResult that used to be built here was removed.
+    return m_pMQClientFactory->getMQAdminImpl()->queryMessage(topic, key, maxNum, begin, end);
+}
+
+/** Returns the admin implementation's searchOffset() for the queue and timestamp; requires RUNNING state. */
+long long DefaultMQPullConsumerImpl::searchOffset(const MessageQueue& mq, long long timestamp)
+{
+    this->makeSureStateOK();
+    const long long found = m_pMQClientFactory->getMQAdminImpl()->searchOffset(mq, timestamp);
+    return found;
+}
+
+/** Sends the message back using this consumer's own group name. */
+void  DefaultMQPullConsumerImpl::sendMessageBack(MessageExt& msg, int delayLevel, const std::string& brokerName)
+{
+    const std::string ownGroup = m_pDefaultMQPullConsumer->getConsumerGroup();
+    this->sendMessageBack(msg, delayLevel, brokerName, ownGroup);
+}
+
+
+/**
+* Sends a message back so that it can be consumed again.
+*
+* First tries consumerSendMessageBack() on the broker (3000 is passed as the
+* last argument, presumably a timeout in milliseconds). If anything throws,
+* the failure is logged and the message is re-published to this group's
+* retry topic through the factory's default producer.
+*
+* @param msg           message to send back.
+* @param delayLevel    delay level handed to the broker call.
+* @param brokerName    broker to use; when empty the address is derived from
+*                      the message's store host.
+* @param consumerGroup group to report; when empty this consumer's group is used.
+*/
+void DefaultMQPullConsumerImpl::sendMessageBack(MessageExt& msg, int delayLevel, const std::string& brokerName,
+	const std::string& consumerGroup)
+{
+    try
+    {
+    	std::string brokerAddr = brokerName.empty() ?
+    		socketAddress2IPPort(msg.getStoreHost()) : m_pMQClientFactory->findBrokerAddressInPublish(brokerName);
+
+        m_pMQClientFactory->getMQClientAPIImpl()->consumerSendMessageBack(brokerAddr, msg,
+                consumerGroup.empty() ? m_pDefaultMQPullConsumer->getConsumerGroup() : consumerGroup,
+                delayLevel,
+                3000);
+    }
+    catch (...)
+    {
+        // Fallback: rebuild the message on the retry topic and publish it.
+        RMQ_ERROR("sendMessageBack Exception, group: %s", m_pDefaultMQPullConsumer->getConsumerGroup().c_str());
+        Message newMsg(MixAll::getRetryTopic(m_pDefaultMQPullConsumer->getConsumerGroup()),
+                       msg.getBody(), msg.getBodyLen());
+
+		// Keep the original message id, falling back to the current id when none is recorded.
+		std::string originMsgId = msg.getProperty(Message::PROPERTY_ORIGIN_MESSAGE_ID);
+		newMsg.putProperty(Message::PROPERTY_ORIGIN_MESSAGE_ID, UtilAll::isBlank(originMsgId) ? msg.getMsgId()
+                    : originMsgId);
+
+        newMsg.setFlag(msg.getFlag());
+        // NOTE(review): setProperties() runs after the origin-id putProperty()
+        // above; if it replaces the whole property map that value is lost - confirm.
+        newMsg.setProperties(msg.getProperties());
+        newMsg.putProperty(Message::PROPERTY_RETRY_TOPIC, msg.getTopic());
+
+        int reTimes = msg.getReconsumeTimes() + 1;
+        newMsg.putProperty(Message::PROPERTY_RECONSUME_TIME, UtilAll::toString(reTimes));
+        newMsg.putProperty(Message::PROPERTY_MAX_RECONSUME_TIMES, UtilAll::toString(m_pDefaultMQPullConsumer->getMaxReconsumeTimes()));
+        // The delay level grows with every retry.
+        newMsg.setDelayTimeLevel(3 + reTimes);
+
+        m_pMQClientFactory->getDefaultMQProducer()->send(newMsg);
+    }
+}
+
+/**
+* Writes a new consume offset for the queue into the offset store (the
+* store's third argument is passed as false); requires RUNNING state.
+*/
+void  DefaultMQPullConsumerImpl::updateConsumeOffset(MessageQueue& mq, long long offset)
+{
+    this->makeSureStateOK();
+    this->m_pOffsetStore->updateOffset(mq, offset, false);
+}
+
+/** Looks a message up by id through the admin implementation; requires RUNNING state. */
+MessageExt*  DefaultMQPullConsumerImpl::viewMessage(const std::string& msgId)
+{
+    this->makeSureStateOK();
+
+    MessageExt* const message = m_pMQClientFactory->getMQAdminImpl()->viewMessage(msgId);
+    return message;
+}
+
+/** Returns the facade this implementation was created for. */
+DefaultMQPullConsumer*  DefaultMQPullConsumerImpl::getDefaultMQPullConsumer()
+{
+    return this->m_pDefaultMQPullConsumer;
+}
+
+/** Returns the offset store in use (NULL until one is set or start() creates it). */
+OffsetStore*  DefaultMQPullConsumerImpl::getOffsetStore()
+{
+    return this->m_pOffsetStore;
+}
+
+/** Stores the offset store pointer as given; the previous pointer is not released. */
+void  DefaultMQPullConsumerImpl::setOffsetStore(OffsetStore* pOffsetStore)
+{
+    this->m_pOffsetStore = pOffsetStore;
+}
+
+/** Throws MQClientException unless the service state is RUNNING. */
+void  DefaultMQPullConsumerImpl::makeSureStateOK()
+{
+    if (m_serviceState == RUNNING)
+    {
+        return;
+    }
+
+    THROW_MQEXCEPTION(MQClientException, "The consumer service state not OK, ", -1);
+}
+
+/**
+* Shared body of the synchronous pull variants.
+*
+* Validates state and arguments, makes sure the topic has a subscription
+* entry, builds the subscription data for subExpression, issues a SYNC pull
+* through the pull wrapper and returns the wrapper's processed result.
+*
+* @param block when true the suspend timeout is used and the block flag is
+*              set in the system flag; otherwise the plain pull timeout.
+* @throws MQClientException when the consumer is not RUNNING, offset < 0,
+*         maxNums <= 0, or the sub-expression cannot be parsed.
+*/
+PullResult* DefaultMQPullConsumerImpl::pullSyncImpl(MessageQueue& mq,
+        const std::string& subExpression,
+        long long offset,
+        int maxNums,
+        bool block)
+{
+    makeSureStateOK();
+
+    // Argument validation.
+    if (offset < 0)
+    {
+        THROW_MQEXCEPTION(MQClientException, "offset < 0", -1);
+    }
+    if (maxNums <= 0)
+    {
+        THROW_MQEXCEPTION(MQClientException, "maxNums <= 0", -1);
+    }
+
+    subscriptionAutomatically(mq.getTopic());
+
+    int sysFlag = PullSysFlag::buildSysFlag(false, block, true);
+
+    SubscriptionDataPtr subscriptionData = NULL;
+    try
+    {
+        subscriptionData = FilterAPI::buildSubscriptionData(mq.getTopic(), subExpression);
+    }
+    catch (...)
+    {
+        // Any parse failure is reported as a client exception.
+        THROW_MQEXCEPTION(MQClientException, "parse subscription error", -1);
+    }
+
+    // Blocking pulls wait longer than plain pulls.
+    int timeoutMillis;
+    if (block)
+    {
+        timeoutMillis = m_pDefaultMQPullConsumer->getConsumerTimeoutMillisWhenSuspend();
+    }
+    else
+    {
+        timeoutMillis = m_pDefaultMQPullConsumer->getConsumerPullTimeoutMillis();
+    }
+
+    PullResult* rawResult = m_pPullAPIWrapper->pullKernelImpl(
+                                mq,
+                                subscriptionData->getSubString(),
+                                0L,
+                                offset,
+                                maxNums,
+                                sysFlag,
+                                0,
+                                m_pDefaultMQPullConsumer->getBrokerSuspendMaxTimeMillis(),
+                                timeoutMillis,
+                                SYNC,
+                                NULL);
+
+    // NOTE(review): rawResult is not released here; presumably
+    // processPullResult() takes care of it - confirm.
+    return m_pPullAPIWrapper->processPullResult(mq, *rawResult, *subscriptionData);
+}
+
+/**
+* Adds a SUB_ALL subscription for topic to the rebalance subscription table
+* when the table has no entry for it yet. A failure while building the
+* subscription data is only logged through RMQ_WARN.
+*/
+void DefaultMQPullConsumerImpl::subscriptionAutomatically(const std::string& topic)
+{
+    std::map<std::string, SubscriptionData>& subscriptions = m_pRebalanceImpl->getSubscriptionInner();
+
+    // Already subscribed: nothing to do.
+    if (subscriptions.find(topic) != subscriptions.end())
+    {
+        return;
+    }
+
+    try
+    {
+        SubscriptionDataPtr pSubAll =
+            FilterAPI::buildSubscriptionData(topic, SubscriptionData::SUB_ALL);
+        subscriptions[topic] = *pSubAll;
+    }
+    catch (...)
+    {
+        RMQ_WARN("FilterAPI::buildSubscriptionData exception");
+    }
+}
+
+/**
+* Asynchronous pull from one message queue.
+*
+* Checks, in order: service state, offset >= 0, maxNums > 0, non-null
+* callback. The user callback is wrapped in a
+* DefaultMQPullConsumerImplCallback and handed to an ASYNC pullKernelImpl()
+* call. An MQBrokerException escaping that call is rethrown as
+* MQClientException.
+*
+* @param mq             queue to pull from
+* @param subExpression  subscription expression handed to FilterAPI
+* @param offset         start offset, must not be negative
+* @param maxNums        upper bound on messages, must be positive
+* @param pPullCallback  user callback, must not be NULL
+* @param block          true selects getConsumerTimeoutMillisWhenSuspend()
+*                       as timeout, false getConsumerPullTimeoutMillis()
+*/
+void DefaultMQPullConsumerImpl::pullAsyncImpl(MessageQueue& mq,
+        const std::string& subExpression,
+        long long offset,
+        int maxNums,
+        PullCallback* pPullCallback,
+        bool block)
+{
+    makeSureStateOK();
+
+    if (offset < 0)
+    {
+        THROW_MQEXCEPTION(MQClientException, "offset < 0", -1);
+    }
+
+    if (maxNums <= 0)
+    {
+        THROW_MQEXCEPTION(MQClientException, "maxNums <= 0", -1);
+    }
+
+    if (pPullCallback == NULL)
+    {
+        THROW_MQEXCEPTION(MQClientException, "pullCallback is null", -1);
+    }
+
+    subscriptionAutomatically(mq.getTopic());
+
+    try
+    {
+        int sysFlag = PullSysFlag::buildSysFlag(false, block, true);
+
+        // Any failure while parsing the expression is reported as a client error.
+        SubscriptionDataPtr pSubData = NULL;
+        try
+        {
+            pSubData = FilterAPI::buildSubscriptionData(mq.getTopic(), subExpression);
+        }
+        catch (...)
+        {
+            THROW_MQEXCEPTION(MQClientException, "parse subscription error", -1);
+        }
+
+        int timeoutMillis;
+        if (block)
+        {
+            timeoutMillis = m_pDefaultMQPullConsumer->getConsumerTimeoutMillisWhenSuspend();
+        }
+        else
+        {
+            timeoutMillis = m_pDefaultMQPullConsumer->getConsumerPullTimeoutMillis();
+        }
+
+        // NOTE(review): nothing in this function deletes the wrapper;
+        // presumably the async pull path takes ownership of it - confirm.
+        DefaultMQPullConsumerImplCallback* pWrapped =
+            new DefaultMQPullConsumerImplCallback(*pSubData, mq, this, pPullCallback);
+
+        m_pPullAPIWrapper->pullKernelImpl(
+            mq,                         // arg 1: queue
+            pSubData->getSubString(),   // arg 2: parsed sub string
+            0L,                         // arg 3
+            offset,                     // arg 4: start offset
+            maxNums,                    // arg 5: message limit
+            sysFlag,                    // arg 6: flags built above
+            0,                          // arg 7
+            m_pDefaultMQPullConsumer->getBrokerSuspendMaxTimeMillis(), // arg 8
+            timeoutMillis,              // arg 9: chosen timeout
+            ASYNC,                      // arg 10: communication mode
+            pWrapped                    // arg 11: wrapped user callback
+        );
+    }
+    catch (const MQBrokerException&)
+    {
+        THROW_MQEXCEPTION(MQClientException, "pullAsync unknow exception", -1);
+    }
+}
+
+
+/**
+* Registers a SUB_ALL subscription in the rebalance subscription table for
+* every topic returned by getRegisterTopics(). Any exception raised on the
+* way is replaced by an MQClientException.
+*/
+void DefaultMQPullConsumerImpl::copySubscription()
+{
+    try
+    {
+        std::set<std::string> topics = m_pDefaultMQPullConsumer->getRegisterTopics();
+
+        std::set<std::string>::iterator cur = topics.begin();
+        while (cur != topics.end())
+        {
+            SubscriptionDataPtr pSubAll =
+                FilterAPI::buildSubscriptionData(*cur, SubscriptionData::SUB_ALL);
+            m_pRebalanceImpl->getSubscriptionInner()[*cur] = *pSubAll;
+            ++cur;
+        }
+    }
+    catch (...)
+    {
+        THROW_MQEXCEPTION(MQClientException, "subscription exception", -1);
+    }
+}
+
+
+/**
+* Validates the configuration held by the owning DefaultMQPullConsumer.
+*
+* Raises through THROW_MQEXCEPTION(MQClientException, ...) when
+*  - the consumer group equals MixAll::DEFAULT_CONSUMER_GROUP,
+*  - the message model is neither BROADCASTING nor CLUSTERING,
+*  - no allocate-message-queue strategy is set.
+* Validators::checkGroup() is called first on the group name.
+*/
+void  DefaultMQPullConsumerImpl::checkConfig()
+{
+    // check consumerGroup
+    Validators::checkGroup(m_pDefaultMQPullConsumer->getConsumerGroup());
+
+    // the built-in default group name is not accepted
+    if (m_pDefaultMQPullConsumer->getConsumerGroup() == MixAll::DEFAULT_CONSUMER_GROUP)
+    {
+        THROW_MQEXCEPTION(MQClientException, "consumerGroup can not equal "
+                          + MixAll::DEFAULT_CONSUMER_GROUP //
+                          + ", please specify another one.", -1);
+    }
+
+    // messageModel: this branch is taken for an unsupported model, so the
+    // message has to say "invalid" (it used to read "is valid").
+    if (m_pDefaultMQPullConsumer->getMessageModel() != BROADCASTING
+        && m_pDefaultMQPullConsumer->getMessageModel() != CLUSTERING)
+    {
+        THROW_MQEXCEPTION(MQClientException, "messageModel is invalid", -1);
+    }
+
+    // allocateMessageQueueStrategy
+    if (m_pDefaultMQPullConsumer->getAllocateMessageQueueStrategy() == NULL)
+    {
+        THROW_MQEXCEPTION(MQClientException, "allocateMessageQueueStrategy is null", -1);
+    }
+}
+
+// Returns the current value of m_serviceState (the value makeSureStateOK()
+// compares against RUNNING).
+ServiceState DefaultMQPullConsumerImpl::getServiceState()
+{
+    return m_serviceState;
+}
+
+// Overwrites m_serviceState with the given value; no validation is done here.
+void DefaultMQPullConsumerImpl::setServiceState(ServiceState serviceState)
+{
+    m_serviceState = serviceState;
+}
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumerImpl.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumerImpl.h b/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumerImpl.h
new file mode 100755
index 0000000..171565c
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/DefaultMQPullConsumerImpl.h
@@ -0,0 +1,174 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __DEFAULTMQPULLCONSUMERIMPL_H__
+#define __DEFAULTMQPULLCONSUMERIMPL_H__
+
+#include <string>
+#include <set>
+#include <map>
+#include <vector>
+#include "MQConsumerInner.h"
+#include "MessageExt.h"
+#include "QueryResult.h"
+#include "ServiceState.h"
+#include "PullRequest.h"
+#include "MessageQueue.h"
+#include "PullResult.h"
+#include "PullCallback.h"
+#include "PullAPIWrapper.h"
+
+namespace rmq
+{
+class DefaultMQPullConsumer;
+class PullCallback;
+class OffsetStore;
+class RebalanceImpl;
+class MQClientFactory;
+class PullAPIWrapper;
+
+/**
+* Implementation class behind DefaultMQPullConsumer. Derives from
+* MQConsumerInner and keeps a back pointer to the DefaultMQPullConsumer it
+* was constructed with.
+*/
+class DefaultMQPullConsumerImpl : public  MQConsumerInner
+{
+public:
+    DefaultMQPullConsumerImpl(DefaultMQPullConsumer *pDefaultMQPullConsumer);
+    ~DefaultMQPullConsumerImpl();
+
+    // Topic, queue and offset queries.
+    void createTopic(const std::string &key, const std::string &newTopic,
+                     int queueNum);
+    long long fetchConsumeOffset(MessageQueue &mq, bool fromStore);
+    std::set<MessageQueue> *fetchMessageQueuesInBalance(const std::string &topic);
+    std::vector<MessageQueue> *fetchPublishMessageQueues(const std::string  &topic);
+    std::set<MessageQueue> *fetchSubscribeMessageQueues(const std::string &topic);
+    long long earliestMsgStoreTime(const MessageQueue &mq);
+
+    // NOTE(review): the following members look like the MQConsumerInner
+    // interface - confirm against MQConsumerInner.h.
+    std::string groupName();
+    MessageModel messageModel();
+    ConsumeType consumeType();
+    ConsumeFromWhere consumeFromWhere();
+    std::set<SubscriptionData> subscriptions();
+    void doRebalance();
+    void persistConsumerOffset();
+    void updateTopicSubscribeInfo(const std::string &topic,
+                                  const std::set<MessageQueue> &info);
+    bool isSubscribeTopicNeedUpdate(const std::string &topic);
+    long long maxOffset(const MessageQueue &mq);
+    long long minOffset(const MessageQueue &mq);
+
+    // Pull entry points: the overloads returning PullResult* take no
+    // callback, the void overloads deliver through a PullCallback.
+    PullResult *pull(MessageQueue &mq,
+                     const std::string &subExpression,
+                     long long offset,
+                     int maxNums);
+
+    void pull(MessageQueue &mq,
+              const std::string &subExpression,
+              long long offset,
+              int maxNums,
+              PullCallback *pPullCallback);
+
+    PullResult *pullBlockIfNotFound(MessageQueue &mq,
+                                    const std::string &subExpression,
+                                    long long offset, int maxNums);
+
+    void pullBlockIfNotFound(MessageQueue &mq,
+                             const std::string &subExpression,
+                             long long offset, int maxNums,
+                             PullCallback *pPullCallback);
+
+    QueryResult queryMessage(const std::string &topic,
+                             const std::string  &key,
+                             int maxNum,
+                             long long begin,
+                             long long end);
+
+    long long searchOffset(const MessageQueue &mq, long long timestamp);
+    void sendMessageBack(MessageExt &msg, int delayLevel,
+                         const std::string &brokerName);
+    void sendMessageBack(MessageExt &msg, int delayLevel,
+                         const std::string &brokerName, const std::string &consumerGroup);
+    void shutdown();
+    void updateConsumeOffset(MessageQueue &mq, long long offset);
+    MessageExt *viewMessage(const std::string &msgId);
+    DefaultMQPullConsumer *getDefaultMQPullConsumer();
+    OffsetStore *getOffsetStore();
+    void setOffsetStore(OffsetStore *pOffsetStore);
+    void start();
+
+    ServiceState getServiceState();
+    void setServiceState(ServiceState serviceState);
+
+private:
+    // Raises MQClientException unless m_serviceState is RUNNING.
+    void makeSureStateOK();
+    // Adds a SUB_ALL subscription for topic if the rebalance table lacks one.
+    void subscriptionAutomatically(const std::string &topic);
+    // Adds SUB_ALL subscriptions for every registered topic.
+    void copySubscription();
+    // Validates group name, message model and allocation strategy.
+    void checkConfig();
+
+    // Shared sync/async pull implementations; `block` selects the
+    // "when suspend" timeout instead of the plain pull timeout.
+    PullResult *pullSyncImpl(MessageQueue &mq,
+                             const std::string &subExpression,
+                             long long offset,
+                             int maxNums,
+                             bool block) ;
+    void pullAsyncImpl(MessageQueue &mq,
+                       const std::string &subExpression,
+                       long long offset,
+                       int maxNums,
+                       PullCallback *pPullCallback,
+                       bool block);
+
+private:
+    // NOTE(review): ownership of these raw pointers is not visible in this
+    // header - check the constructor/destructor/start()/shutdown() bodies.
+    DefaultMQPullConsumer *m_pDefaultMQPullConsumer;
+    ServiceState m_serviceState;
+    MQClientFactory *m_pMQClientFactory;
+    PullAPIWrapper *m_pPullAPIWrapper;
+    OffsetStore *m_pOffsetStore;
+    RebalanceImpl *m_pRebalanceImpl;
+    // The callback adapter below reads m_pPullAPIWrapper directly.
+    friend class DefaultMQPullConsumerImplCallback;
+};
+
+/**
+* PullCallback adapter wrapped around a user callback. It stores its own
+* copies of the subscription data and the message queue, runs every
+* successful PullResult through PullAPIWrapper::processPullResult() and
+* passes the processed result on; exceptions are forwarded unchanged.
+*/
+class DefaultMQPullConsumerImplCallback : public PullCallback
+{
+public:
+    DefaultMQPullConsumerImplCallback(SubscriptionData &subscriptionData,
+                                      MessageQueue &mq,
+                                      DefaultMQPullConsumerImpl *pDefaultMQPullConsumerImpl,
+                                      PullCallback *pCallback)
+        : m_subscriptionData(subscriptionData)
+        , m_mq(mq)
+        , m_pDefaultMQPullConsumerImpl(pDefaultMQPullConsumerImpl)
+        , m_pCallback(pCallback)
+    {
+    }
+
+    // Post-processes the raw result, then notifies the wrapped callback.
+    void onSuccess(PullResult &pullResult)
+    {
+        PullAPIWrapper *pWrapper = m_pDefaultMQPullConsumerImpl->m_pPullAPIWrapper;
+        PullResult *pProcessed =
+            pWrapper->processPullResult(m_mq, pullResult, m_subscriptionData);
+        m_pCallback->onSuccess(*pProcessed);
+    }
+
+    // Hands the exception straight to the wrapped callback.
+    void onException(MQException &e)
+    {
+        m_pCallback->onException(e);
+    }
+
+private:
+    SubscriptionData m_subscriptionData;                      // copied at construction
+    MessageQueue m_mq;                                        // copied at construction
+    DefaultMQPullConsumerImpl *m_pDefaultMQPullConsumerImpl;  // not deleted by this class
+    PullCallback *m_pCallback;                                // not deleted by this class
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumer.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumer.cpp b/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumer.cpp
new file mode 100755
index 0000000..45ee907
--- /dev/null
+++ b/rocketmq-client4cpp/src/consumer/DefaultMQPushConsumer.cpp
@@ -0,0 +1,399 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "DefaultMQPushConsumer.h"
+#include <list>
+#include <string>
+
+#include "DefaultMQPushConsumerImpl.h"
+#include "MessageQueue.h"
+#include "MessageExt.h"
+#include "ClientConfig.h"
+#include "ConsumerStatManage.h"
+#include "MixAll.h"
+#include "AllocateMessageQueueStrategyInner.h"
+
+namespace rmq
+{
+
+class AllocateMessageQueueStrategy;
+
+/**
+* Creates a push consumer in the MixAll::DEFAULT_CONSUMER_GROUP group with
+* the built-in default settings assigned below.
+*/
+DefaultMQPushConsumer::DefaultMQPushConsumer()
+{
+    // identity and consumption model
+    m_consumerGroup = MixAll::DEFAULT_CONSUMER_GROUP;
+    m_messageModel = CLUSTERING;
+    m_consumeFromWhere = CONSUME_FROM_LAST_OFFSET;
+    m_unitMode = false;
+
+    // consume-side settings
+    m_consumeThreadMin = 5;
+    m_consumeThreadMax = 25;
+    m_consumeConcurrentlyMaxSpan = 2000;
+    m_consumeMessageBatchMaxSize = 1;
+    m_maxReconsumeTimes = 16;
+    m_suspendCurrentQueueTimeMillis = 1000;
+    m_consumeTimeout = 15;
+
+    // pull-side settings
+    m_pullThresholdForQueue = 1000;
+    m_pullInterval = 0;
+    m_pullBatchSize = 32;
+    m_postSubscriptionWhenPull = false;
+
+    // collaborators
+    m_pMessageListener = NULL;
+    m_pOffsetStore = NULL;
+    m_pAllocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
+
+    // Created last so that every field above already holds its default.
+    m_pDefaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this);
+}
+
+/**
+* Creates a push consumer in the given consumer group; all other settings
+* receive the same defaults as in the no-argument constructor.
+*/
+DefaultMQPushConsumer::DefaultMQPushConsumer(const std::string& consumerGroup)
+{
+    // identity and consumption model
+    m_consumerGroup = consumerGroup;
+    m_messageModel = CLUSTERING;
+    m_consumeFromWhere = CONSUME_FROM_LAST_OFFSET;
+    m_unitMode = false;
+
+    // consume-side settings
+    m_consumeThreadMin = 5;
+    m_consumeThreadMax = 25;
+    m_consumeConcurrentlyMaxSpan = 2000;
+    m_consumeMessageBatchMaxSize = 1;
+    m_maxReconsumeTimes = 16;
+    m_suspendCurrentQueueTimeMillis = 1000;
+    m_consumeTimeout = 15;
+
+    // pull-side settings
+    m_pullThresholdForQueue = 1000;
+    m_pullInterval = 0;
+    m_pullBatchSize = 32;
+    m_postSubscriptionWhenPull = false;
+
+    // collaborators
+    m_pMessageListener = NULL;
+    m_pOffsetStore = NULL;
+    m_pAllocateMessageQueueStrategy = new AllocateMessageQueueAveragely();
+
+    // Created last so that every field above already holds its default.
+    m_pDefaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this);
+}
+
+// Deletes the current allocate-message-queue strategy and the impl object.
+// NOTE(review): the strategy pointer can be replaced through
+// setAllocateMessageQueueStrategy(), so a caller-supplied object is deleted
+// here as well - confirm that this ownership transfer is intended.
+DefaultMQPushConsumer::~DefaultMQPushConsumer()
+{
+    delete m_pAllocateMessageQueueStrategy;
+    delete m_pDefaultMQPushConsumerImpl;
+}
+
+//MQAdmin
+// Forwards unchanged to DefaultMQPushConsumerImpl::createTopic().
+void DefaultMQPushConsumer::createTopic(const std::string& key, const std::string& newTopic, int queueNum)
+{
+    m_pDefaultMQPushConsumerImpl->createTopic(key, newTopic, queueNum);
+}
+
+// Forwards to DefaultMQPushConsumerImpl::searchOffset() and returns its result.
+long long DefaultMQPushConsumer::searchOffset(const MessageQueue& mq, long long timestamp)
+{
+    return m_pDefaultMQPushConsumerImpl->searchOffset(mq, timestamp);
+}
+
+// Forwards to DefaultMQPushConsumerImpl::maxOffset() and returns its result.
+long long DefaultMQPushConsumer::maxOffset(const MessageQueue& mq)
+{
+    return m_pDefaultMQPushConsumerImpl->maxOffset(mq);
+}
+
+// Forwards to DefaultMQPushConsumerImpl::minOffset() and returns its result.
+long long DefaultMQPushConsumer::minOffset(const MessageQueue& mq)
+{
+    return m_pDefaultMQPushConsumerImpl->minOffset(mq);
+}
+
+// Forwards to DefaultMQPushConsumerImpl::earliestMsgStoreTime() and returns
+// its result.
+long long DefaultMQPushConsumer::earliestMsgStoreTime(const MessageQueue& mq)
+{
+    return m_pDefaultMQPushConsumerImpl->earliestMsgStoreTime(mq);
+}
+
+// Forwards to DefaultMQPushConsumerImpl::viewMessage() and returns the
+// pointer it yields.
+// NOTE(review): ownership of the returned MessageExt is not visible here.
+MessageExt* DefaultMQPushConsumer::viewMessage(const std::string& msgId)
+{
+    return m_pDefaultMQPushConsumerImpl->viewMessage(msgId);
+}
+
+// Forwards all five arguments to DefaultMQPushConsumerImpl::queryMessage()
+// and returns its QueryResult by value.
+QueryResult DefaultMQPushConsumer::queryMessage(const std::string& topic,
+        const std::string&  key,
+        int maxNum,
+        long long begin,
+        long long end)
+{
+    return m_pDefaultMQPushConsumerImpl->queryMessage(topic, key, maxNum, begin, end);
+}
+// MQadmin end
+
+// Returns the stored strategy pointer; the constructors set it to a new
+// AllocateMessageQueueAveragely.
+AllocateMessageQueueStrategy* DefaultMQPushConsumer::getAllocateMessageQueueStrategy()
+{
+    return m_pAllocateMessageQueueStrategy;
+}
+
+// Replaces the stored strategy pointer. The previous pointer is not deleted
+// here, while the destructor deletes whatever pointer is current.
+// NOTE(review): the constructor-allocated strategy appears to be leaked once
+// it is replaced - confirm and decide who owns the old object.
+void DefaultMQPushConsumer::setAllocateMessageQueueStrategy(AllocateMessageQueueStrategy* pAllocateMessageQueueStrategy)
+{
+    m_pAllocateMessageQueueStrategy = pAllocateMessageQueueStrategy;
+}
+
+// Returns m_consumeConcurrentlyMaxSpan (constructor default: 2000).
+int DefaultMQPushConsumer::getConsumeConcurrentlyMaxSpan()
+{
+    return m_consumeConcurrentlyMaxSpan;
+}
+
+// Stores the value in m_consumeConcurrentlyMaxSpan without validation.
+void DefaultMQPushConsumer::setConsumeConcurrentlyMaxSpan(int consumeConcurrentlyMaxSpan)
+{
+    m_consumeConcurrentlyMaxSpan = consumeConcurrentlyMaxSpan;
+}
+
+// Returns m_consumeFromWhere (constructor default: CONSUME_FROM_LAST_OFFSET).
+ConsumeFromWhere DefaultMQPushConsumer::getConsumeFromWhere()
+{
+    return m_consumeFromWhere;
+}
+
+// Stores the value in m_consumeFromWhere.
+void DefaultMQPushConsumer::setConsumeFromWhere(ConsumeFromWhere consumeFromWhere)
+{
+    m_consumeFromWhere = consumeFromWhere;
+}
+
+// Returns m_consumeMessageBatchMaxSize (constructor default: 1).
+int DefaultMQPushConsumer::getConsumeMessageBatchMaxSize()
+{
+    return m_consumeMessageBatchMaxSize;
+}
+
+// Stores the value in m_consumeMessageBatchMaxSize without validation.
+void DefaultMQPushConsumer::setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize)
+{
+    m_consumeMessageBatchMaxSize = consumeMessageBatchMaxSize;
+}
+
+// Returns a copy of m_consumerGroup.
+std::string DefaultMQPushConsumer::getConsumerGroup()
+{
+    return m_consumerGroup;
+}
+
+// Stores the group name in m_consumerGroup; no validation is done here.
+void DefaultMQPushConsumer::setConsumerGroup(const std::string& consumerGroup)
+{
+    m_consumerGroup = consumerGroup;
+}
+
+// Returns m_consumeThreadMax (constructor default: 25).
+int DefaultMQPushConsumer::getConsumeThreadMax()
+{
+    return m_consumeThreadMax;
+}
+
+// Stores the value in m_consumeThreadMax without validation.
+void DefaultMQPushConsumer::setConsumeThreadMax(int consumeThreadMax)
+{
+    m_consumeThreadMax = consumeThreadMax;
+}
+
+// Returns m_consumeThreadMin (constructor default: 5).
+int DefaultMQPushConsumer::getConsumeThreadMin()
+{
+    return m_consumeThreadMin;
+}
+
+// Stores the value in m_consumeThreadMin without validation.
+void DefaultMQPushConsumer::setConsumeThreadMin(int consumeThreadMin)
+{
+    m_consumeThreadMin = consumeThreadMin;
+}
+
+// Returns the impl object created in the constructor; it is deleted by the
+// destructor, so callers must not delete it.
+DefaultMQPushConsumerImpl* DefaultMQPushConsumer::getDefaultMQPushConsumerImpl()
+{
+    return m_pDefaultMQPushConsumerImpl;
+}
+
+// Returns the stored listener pointer (NULL until one is set or registered).
+MessageListener* DefaultMQPushConsumer::getMessageListener()
+{
+    return m_pMessageListener;
+}
+
+// Stores the listener pointer only; unlike registerMessageListener() the
+// impl object is not told about it.
+void DefaultMQPushConsumer::setMessageListener(MessageListener* pMessageListener)
+{
+    m_pMessageListener = pMessageListener;
+}
+
+// Returns m_messageModel (constructor default: CLUSTERING).
+MessageModel DefaultMQPushConsumer::getMessageModel()
+{
+    return m_messageModel;
+}
+
+// Stores the value in m_messageModel.
+void DefaultMQPushConsumer::setMessageModel(MessageModel messageModel)
+{
+    m_messageModel = messageModel;
+}
+
+// Returns m_pullBatchSize (constructor default: 32).
+int DefaultMQPushConsumer::getPullBatchSize()
+{
+    return m_pullBatchSize;
+}
+
+// Stores the value in m_pullBatchSize without validation.
+void DefaultMQPushConsumer::setPullBatchSize(int pullBatchSize)
+{
+    m_pullBatchSize = pullBatchSize;
+}
+
+// Returns m_pullInterval (constructor default: 0).
+long DefaultMQPushConsumer::getPullInterval()
+{
+    return m_pullInterval;
+}
+
+// Stores the value in m_pullInterval without validation.
+void DefaultMQPushConsumer::setPullInterval(long pullInterval)
+{
+    m_pullInterval = pullInterval;
+}
+
+// Returns m_pullThresholdForQueue (constructor default: 1000).
+int DefaultMQPushConsumer::getPullThresholdForQueue()
+{
+    return m_pullThresholdForQueue;
+}
+
+// Stores the value in m_pullThresholdForQueue without validation.
+void DefaultMQPushConsumer::setPullThresholdForQueue(int pullThresholdForQueue)
+{
+    m_pullThresholdForQueue = pullThresholdForQueue;
+}
+
+// Returns a mutable reference to m_subscription, so callers can modify the
+// stored map in place.
+std::map<std::string, std::string>& DefaultMQPushConsumer::getSubscription()
+{
+    return m_subscription;
+}
+
+// Replaces m_subscription with a copy of the given map.
+void DefaultMQPushConsumer::setSubscription(const std::map<std::string, std::string>& subscription)
+{
+    m_subscription = subscription;
+}
+
+//MQConsumer
+// Forwards to DefaultMQPushConsumerImpl::sendMessageBack() with an empty
+// broker name.
+void DefaultMQPushConsumer::sendMessageBack(MessageExt& msg, int delayLevel)
+{
+    m_pDefaultMQPushConsumerImpl->sendMessageBack(msg, delayLevel, "");
+}
+
+// Forwards to DefaultMQPushConsumerImpl::sendMessageBack() with the given
+// broker name.
+// NOTE(review): brokerName is taken by const value, which copies the string
+// on every call; switching to const reference needs the header changed too.
+void DefaultMQPushConsumer::sendMessageBack(MessageExt& msg, int delayLevel, const std::string brokerName)
+{
+    m_pDefaultMQPushConsumerImpl->sendMessageBack(msg, delayLevel, brokerName);
+}
+
+
+// Forwards to DefaultMQPushConsumerImpl::fetchSubscribeMessageQueues() and
+// returns the pointer it yields.
+// NOTE(review): ownership of the returned set is not visible here.
+std::set<MessageQueue>* DefaultMQPushConsumer::fetchSubscribeMessageQueues(const std::string& topic)
+{
+    return m_pDefaultMQPushConsumerImpl->fetchSubscribeMessageQueues(topic);
+}
+
+// Forwards to DefaultMQPushConsumerImpl::start().
+void DefaultMQPushConsumer::start()
+{
+    m_pDefaultMQPushConsumerImpl->start();
+}
+
+// Forwards to DefaultMQPushConsumerImpl::shutdown().
+void DefaultMQPushConsumer::shutdown()
+{
+    m_pDefaultMQPushConsumerImpl->shutdown();
+}
+//MQConsumer end
+
+//MQPushConsumer
+// Stores the listener pointer locally and also registers it on the impl
+// object.
+void DefaultMQPushConsumer::registerMessageListener(MessageListener* pMessageListener)
+{
+    m_pMessageListener = pMessageListener;
+    m_pDefaultMQPushConsumerImpl->registerMessageListener(pMessageListener);
+}
+
+// Forwards to DefaultMQPushConsumerImpl::subscribe(); m_subscription is not
+// touched here.
+void DefaultMQPushConsumer::subscribe(const std::string& topic, const std::string& subExpression)
+{
+    m_pDefaultMQPushConsumerImpl->subscribe(topic, subExpression);
+}
+
+// Forwards to DefaultMQPushConsumerImpl::unsubscribe().
+void DefaultMQPushConsumer::unsubscribe(const std::string& topic)
+{
+    m_pDefaultMQPushConsumerImpl->unsubscribe(topic);
+}
+
+// Forwards to DefaultMQPushConsumerImpl::updateCorePoolSize().
+void DefaultMQPushConsumer::updateCorePoolSize(int corePoolSize)
+{
+    m_pDefaultMQPushConsumerImpl->updateCorePoolSize(corePoolSize);
+}
+
+// Forwards to DefaultMQPushConsumerImpl::suspend().
+void DefaultMQPushConsumer::suspend()
+{
+    m_pDefaultMQPushConsumerImpl->suspend();
+}
+
+// Forwards to DefaultMQPushConsumerImpl::resume().
+void DefaultMQPushConsumer::resume()
+{
+    m_pDefaultMQPushConsumerImpl->resume();
+}
+//MQPushConsumer end
+
+// Returns the stored offset-store pointer (NULL until setOffsetStore() is
+// called).
+OffsetStore* DefaultMQPushConsumer::getOffsetStore()
+{
+    return m_pOffsetStore;
+}
+
+// Stores the offset-store pointer; the destructor of this class does not
+// delete it.
+void DefaultMQPushConsumer::setOffsetStore(OffsetStore* pOffsetStore)
+{
+    m_pOffsetStore = pOffsetStore;
+}
+
+// Returns a copy of m_consumeTimestamp (not assigned by the constructors, so
+// empty unless it was set).
+std::string DefaultMQPushConsumer::getConsumeTimestamp() {
+    return m_consumeTimestamp;
+}
+
+// Stores the given string in m_consumeTimestamp; the format is not checked
+// here.
+void DefaultMQPushConsumer::setConsumeTimestamp(std::string consumeTimestamp) {
+    m_consumeTimestamp = consumeTimestamp;
+}
+
+// Returns m_postSubscriptionWhenPull (constructor default: false).
+bool DefaultMQPushConsumer::isPostSubscriptionWhenPull()
+{
+    return m_postSubscriptionWhenPull;
+}
+
+
+// Stores the flag in m_postSubscriptionWhenPull.
+void DefaultMQPushConsumer::setPostSubscriptionWhenPull(bool postSubscriptionWhenPull)
+{
+    m_postSubscriptionWhenPull = postSubscriptionWhenPull;
+}
+
+
+// Returns m_unitMode (constructor default: false).
+bool DefaultMQPushConsumer::isUnitMode()
+{
+    return m_unitMode;
+}
+
+
+// Stores the flag in m_unitMode.
+void DefaultMQPushConsumer::setUnitMode(bool isUnitMode)
+{
+    m_unitMode = isUnitMode;
+}
+
+// Returns m_maxReconsumeTimes (constructor default: 16).
+int DefaultMQPushConsumer::getMaxReconsumeTimes()
+{
+    return m_maxReconsumeTimes;
+}
+
+
+// Stores the value in m_maxReconsumeTimes without validation.
+void DefaultMQPushConsumer::setMaxReconsumeTimes(int maxReconsumeTimes)
+{
+    m_maxReconsumeTimes = maxReconsumeTimes;
+}
+
+
+// Returns m_suspendCurrentQueueTimeMillis (constructor default: 1000).
+int DefaultMQPushConsumer::getSuspendCurrentQueueTimeMillis()
+{
+    return m_suspendCurrentQueueTimeMillis;
+}
+
+
+// Stores the value in m_suspendCurrentQueueTimeMillis without validation.
+void DefaultMQPushConsumer::setSuspendCurrentQueueTimeMillis(int suspendCurrentQueueTimeMillis)
+{
+    m_suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
+}
+
+
+// Returns m_consumeTimeout (constructor default: 15).
+// NOTE(review): the unit of this value is not visible in this file.
+int DefaultMQPushConsumer::getConsumeTimeout()
+{
+    return m_consumeTimeout;
+}
+
+// Stores the value in m_consumeTimeout without validation.
+void DefaultMQPushConsumer::setConsumeTimeout(int consumeTimeout)
+{
+    m_consumeTimeout = consumeTimeout;
+}
+
+
+}


Mime
View raw message