rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dongefore...@apache.org
Subject [04/17] incubator-rocketmq-externals git commit: [ROCKETMQ-129] Initialized the rocketmq c++ client closes apache/incubator-rocketmq-externals#11
Date Fri, 21 Apr 2017 10:09:44 GMT
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/message/MessageQueue.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/message/MessageQueue.cpp b/rocketmq-client4cpp/src/message/MessageQueue.cpp
new file mode 100755
index 0000000..d632550
--- /dev/null
+++ b/rocketmq-client4cpp/src/message/MessageQueue.cpp
@@ -0,0 +1,153 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "MessageQueue.h"
+
+#include <string>
+#include <sstream>
+#include <UtilAll.h>
+
+namespace rmq
+{
+
+MessageQueue::MessageQueue()
+	: m_queueId(0)
+{
+}
+
+MessageQueue::MessageQueue(const std::string& topic, const std::string& brokerName, int queueId)
+    : m_topic(topic), m_brokerName(brokerName), m_queueId(queueId)
+{
+
+}
+
+std::string MessageQueue::getTopic()const
+{
+    return m_topic;
+}
+
+void MessageQueue::setTopic(const std::string& topic)
+{
+    m_topic = topic;
+}
+
+std::string MessageQueue::getBrokerName()const
+{
+    return m_brokerName;
+}
+
+void MessageQueue::setBrokerName(const std::string& brokerName)
+{
+    m_brokerName = brokerName;
+}
+
+int MessageQueue::getQueueId()const
+{
+    return m_queueId;
+}
+
+void MessageQueue::setQueueId(int queueId)
+{
+    m_queueId = queueId;
+}
+
+int MessageQueue::hashCode()
+{
+    /*
+    final int prime = 31;
+    int result = 1;
+    result = prime * result + ((brokerName == null) ? 0 : brokerName.hashCode());
+    result = prime * result + queueId;
+    result = prime * result + ((topic == null) ? 0 : topic.hashCode());
+    return result;
+    */
+    std::stringstream ss;
+    ss << m_topic << m_brokerName << m_queueId;
+    return UtilAll::hashCode(ss.str());
+}
+
+std::string MessageQueue::toString() const
+{
+    std::stringstream ss;
+    ss << "{topic=" << m_topic
+       << ",brokerName=" << m_brokerName
+       << ",queueId=" << m_queueId << "}";
+    return ss.str();
+}
+
+
+std::string MessageQueue::toJsonString() const
+{
+    std::stringstream ss;
+    ss << "{\"topic\":\"" << m_topic
+       << "\",\"brokerName\":\"" << m_brokerName
+       << "\",\"queueId\":" << m_queueId << "}";
+    return ss.str();
+}
+
+
+bool MessageQueue::operator==(const MessageQueue& mq)const
+{
+    if (this == &mq)
+    {
+        return true;
+    }
+
+    if (m_brokerName != mq.m_brokerName)
+    {
+        return false;
+    }
+
+    if (m_queueId != mq.m_queueId)
+    {
+        return false;
+    }
+
+    if (m_topic != mq.m_topic)
+    {
+        return false;
+    }
+
+    return true;
+}
+
+int MessageQueue::compareTo(const MessageQueue& mq)const
+{
+    {
+        int result = strcmp(m_topic.c_str(), mq.m_topic.c_str());
+        if (result != 0)
+        {
+            return result;
+        }
+    }
+
+    {
+        int result = strcmp(m_brokerName.c_str(), mq.m_brokerName.c_str());
+        if (result != 0)
+        {
+            return result;
+        }
+    }
+
+    return m_queueId - mq.m_queueId;
+}
+
+bool MessageQueue::operator<(const MessageQueue& mq)const
+{
+    return compareTo(mq) < 0;
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/producer/DefaultMQProducer.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/producer/DefaultMQProducer.cpp b/rocketmq-client4cpp/src/producer/DefaultMQProducer.cpp
new file mode 100755
index 0000000..dcad654
--- /dev/null
+++ b/rocketmq-client4cpp/src/producer/DefaultMQProducer.cpp
@@ -0,0 +1,277 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License")
+{
+}
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "DefaultMQProducer.h"
+
+#include <assert.h>
+#include "MessageExt.h"
+#include "QueryResult.h"
+#include "DefaultMQProducerImpl.h"
+#include "MixAll.h"
+#include "MQClientException.h"
+
+namespace rmq
+{
+
+DefaultMQProducer::DefaultMQProducer()
+    : m_producerGroup(MixAll::DEFAULT_PRODUCER_GROUP),
+      m_createTopicKey(MixAll::DEFAULT_TOPIC),
+  	  m_defaultTopicQueueNums(4),
+      m_sendMsgTimeout(3000),
+      m_compressMsgBodyOverHowmuch(1024 * 4),
+	  m_retryTimesWhenSendFailed(2),
+      m_retryAnotherBrokerWhenNotStoreOK(false),
+      m_maxMessageSize(1024 * 128),
+      m_compressLevel(5)
+{
+    m_pDefaultMQProducerImpl = new DefaultMQProducerImpl(this);
+}
+
+DefaultMQProducer::DefaultMQProducer(const std::string& producerGroup)
+    : m_producerGroup(producerGroup),
+      m_createTopicKey(MixAll::DEFAULT_TOPIC),
+  	  m_defaultTopicQueueNums(4),
+      m_sendMsgTimeout(3000),
+      m_compressMsgBodyOverHowmuch(1024 * 4),
+      m_retryTimesWhenSendFailed(2),
+      m_retryAnotherBrokerWhenNotStoreOK(false),
+      m_maxMessageSize(1024 * 128),
+      m_compressLevel(5)
+{
+    m_pDefaultMQProducerImpl = new DefaultMQProducerImpl(this);
+}
+
+DefaultMQProducer::~DefaultMQProducer()
+{
+    // memleak: maybe core
+    delete m_pDefaultMQProducerImpl;
+}
+
+
+void DefaultMQProducer::start()
+{
+    m_pDefaultMQProducerImpl->start();
+}
+
+void DefaultMQProducer::shutdown()
+{
+    m_pDefaultMQProducerImpl->shutdown();
+}
+
+std::vector<MessageQueue>* DefaultMQProducer::fetchPublishMessageQueues(const std::string& topic)
+{
+    return m_pDefaultMQProducerImpl->fetchPublishMessageQueues(topic);
+}
+
+SendResult DefaultMQProducer::send(Message& msg)
+{
+    return m_pDefaultMQProducerImpl->send(msg);
+}
+
+void DefaultMQProducer::send(Message& msg, SendCallback* pSendCallback)
+{
+    m_pDefaultMQProducerImpl->send(msg, pSendCallback);
+}
+
+void DefaultMQProducer::sendOneway(Message& msg)
+{
+    m_pDefaultMQProducerImpl->sendOneway(msg);
+}
+
+SendResult DefaultMQProducer::send(Message& msg, MessageQueue& mq)
+{
+    return m_pDefaultMQProducerImpl->send(msg, mq);
+}
+
+void DefaultMQProducer::send(Message& msg, MessageQueue& mq, SendCallback* pSendCallback)
+{
+    m_pDefaultMQProducerImpl->send(msg, mq, pSendCallback);
+}
+
+void DefaultMQProducer::sendOneway(Message& msg, MessageQueue& mq)
+{
+    m_pDefaultMQProducerImpl->sendOneway(msg, mq);
+}
+
+SendResult DefaultMQProducer::send(Message& msg, MessageQueueSelector* pSelector, void* arg)
+{
+    return m_pDefaultMQProducerImpl->send(msg, pSelector, arg);
+}
+
+void DefaultMQProducer::send(Message& msg,
+                             MessageQueueSelector* pSelector,
+                             void* arg,
+                             SendCallback* pSendCallback)
+{
+    m_pDefaultMQProducerImpl->send(msg, pSelector, arg, pSendCallback);
+}
+
+void DefaultMQProducer::sendOneway(Message& msg, MessageQueueSelector* pSelector, void* arg)
+{
+    m_pDefaultMQProducerImpl->sendOneway(msg, pSelector, arg);
+}
+
+TransactionSendResult DefaultMQProducer::sendMessageInTransaction(Message& msg,
+        LocalTransactionExecuter* tranExecuter, void* arg)
+{
+    THROW_MQEXCEPTION(MQClientException,
+                      "sendMessageInTransaction not implement, please use TransactionMQProducer class", -1);
+    TransactionSendResult result;
+
+    return result;
+}
+
+void DefaultMQProducer::createTopic(const std::string& key, const std::string& newTopic, int queueNum)
+{
+    m_pDefaultMQProducerImpl->createTopic(key, newTopic, queueNum);
+}
+
+long long DefaultMQProducer::searchOffset(const MessageQueue& mq, long long timestamp)
+{
+    return m_pDefaultMQProducerImpl->searchOffset(mq, timestamp);
+}
+
+long long DefaultMQProducer::maxOffset(const MessageQueue& mq)
+{
+    return m_pDefaultMQProducerImpl->maxOffset(mq);
+}
+
+long long DefaultMQProducer::minOffset(const MessageQueue& mq)
+{
+    return m_pDefaultMQProducerImpl->minOffset(mq);
+}
+
+long long DefaultMQProducer::earliestMsgStoreTime(const MessageQueue& mq)
+{
+    return m_pDefaultMQProducerImpl->earliestMsgStoreTime(mq);
+}
+
+MessageExt* DefaultMQProducer::viewMessage(const std::string& msgId)
+{
+    return m_pDefaultMQProducerImpl->viewMessage(msgId);
+}
+
+QueryResult DefaultMQProducer::queryMessage(const std::string& topic,
+        const std::string& key,
+        int maxNum,
+        long long begin,
+        long long end)
+{
+
+    return m_pDefaultMQProducerImpl->queryMessage(topic, key, maxNum, begin, end);
+}
+
+std::string DefaultMQProducer::getProducerGroup()
+{
+    return m_producerGroup;
+}
+
+void DefaultMQProducer::setProducerGroup(const std::string& producerGroup)
+{
+    m_producerGroup = producerGroup;
+}
+
+std::string DefaultMQProducer::getCreateTopicKey()
+{
+    return m_createTopicKey;
+}
+
+void DefaultMQProducer::setCreateTopicKey(const std::string& createTopicKey)
+{
+    m_createTopicKey = createTopicKey;
+}
+
+int DefaultMQProducer::getSendMsgTimeout()
+{
+    return m_sendMsgTimeout;
+}
+
+void DefaultMQProducer::setSendMsgTimeout(int sendMsgTimeout)
+{
+    m_sendMsgTimeout = sendMsgTimeout;
+}
+
+int DefaultMQProducer::getCompressMsgBodyOverHowmuch()
+{
+    return m_compressMsgBodyOverHowmuch;
+}
+
+void DefaultMQProducer::setCompressMsgBodyOverHowmuch(int compressMsgBodyOverHowmuch)
+{
+    m_compressMsgBodyOverHowmuch = compressMsgBodyOverHowmuch;
+}
+
+DefaultMQProducerImpl* DefaultMQProducer::getDefaultMQProducerImpl()
+{
+    return m_pDefaultMQProducerImpl;
+}
+
+bool DefaultMQProducer::isRetryAnotherBrokerWhenNotStoreOK()
+{
+    return m_retryAnotherBrokerWhenNotStoreOK;
+}
+
+void DefaultMQProducer::setRetryAnotherBrokerWhenNotStoreOK(bool retryAnotherBrokerWhenNotStoreOK)
+{
+    m_retryAnotherBrokerWhenNotStoreOK = retryAnotherBrokerWhenNotStoreOK;
+}
+
+int DefaultMQProducer::getMaxMessageSize()
+{
+    return m_maxMessageSize;
+}
+
+void DefaultMQProducer::setMaxMessageSize(int maxMessageSize)
+{
+    m_maxMessageSize = maxMessageSize;
+}
+
+int DefaultMQProducer::getDefaultTopicQueueNums()
+{
+    return m_defaultTopicQueueNums;
+}
+
+void DefaultMQProducer::setDefaultTopicQueueNums(int defaultTopicQueueNums)
+{
+    m_defaultTopicQueueNums = defaultTopicQueueNums;
+}
+
+int DefaultMQProducer::getRetryTimesWhenSendFailed()
+{
+    return m_retryTimesWhenSendFailed;
+}
+
+void DefaultMQProducer::setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed)
+{
+    m_retryTimesWhenSendFailed = retryTimesWhenSendFailed;
+}
+
+int DefaultMQProducer::getCompressLevel()
+{
+    return m_compressLevel;
+}
+
+void DefaultMQProducer::setCompressLevel(int compressLevel)
+{
+    assert(compressLevel >= 0 && compressLevel <= 9 || compressLevel == -1);
+
+    m_compressLevel = compressLevel;
+}
+
+
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/producer/DefaultMQProducerImpl.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/producer/DefaultMQProducerImpl.cpp b/rocketmq-client4cpp/src/producer/DefaultMQProducerImpl.cpp
new file mode 100755
index 0000000..26b3f0b
--- /dev/null
+++ b/rocketmq-client4cpp/src/producer/DefaultMQProducerImpl.cpp
@@ -0,0 +1,932 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License")
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "DefaultMQProducerImpl.h"
+#include "DefaultMQProducer.h"
+#include "MessageExt.h"
+#include "QueryResult.h"
+#include "TopicPublishInfo.h"
+#include "MQClientException.h"
+#include "LocalTransactionExecuter.h"
+#include "SendMessageHook.h"
+#include "MQClientManager.h"
+#include "MQClientFactory.h"
+#include "Validators.h"
+#include "MQAdminImpl.h"
+#include "MQClientAPIImpl.h"
+#include "MessageSysFlag.h"
+#include "CommandCustomHeader.h"
+#include "KPRUtil.h"
+#include "MessageDecoder.h"
+#include "MessageQueueSelector.h"
+#include "MQProtos.h"
+#include "RemotingCommand.h"
+#include "UtilAll.h"
+
+
+
+namespace rmq
+{
+
+DefaultMQProducerImpl::DefaultMQProducerImpl(DefaultMQProducer
+        *pDefaultMQProducer)
+    : m_pDefaultMQProducer(pDefaultMQProducer),
+      m_serviceState(CREATE_JUST),
+      m_pMQClientFactory(NULL)
+{
+}
+
+DefaultMQProducerImpl::~DefaultMQProducerImpl()
+{
+	//delete m_pMQClientFactory;
+}
+
+
+void DefaultMQProducerImpl::start()
+{
+    start(true);
+}
+
+void DefaultMQProducerImpl::start(bool startFactory)
+{
+    RMQ_DEBUG("DefaultMQProducerImpl::start()");
+
+    switch (m_serviceState)
+    {
+        case CREATE_JUST:
+        {
+        	RMQ_INFO("the producer [{%s}] start beginning.",
+                	m_pDefaultMQProducer->getProducerGroup().c_str());
+
+            m_serviceState = START_FAILED;
+            checkConfig();
+
+            if (m_pDefaultMQProducer->getProducerGroup() !=
+                MixAll::CLIENT_INNER_PRODUCER_GROUP)
+            {
+                m_pDefaultMQProducer->changeInstanceNameToPID();
+            }
+
+            m_pMQClientFactory =
+                MQClientManager::getInstance()->getAndCreateMQClientFactory(
+                    *m_pDefaultMQProducer);
+            bool registerOK = m_pMQClientFactory->registerProducer(
+                                  m_pDefaultMQProducer->getProducerGroup(), this);
+
+            if (!registerOK)
+            {
+                m_serviceState = CREATE_JUST;
+                THROW_MQEXCEPTION(MQClientException,
+                                  "The producer group[" + m_pDefaultMQProducer->getProducerGroup()
+                                  + "] has been created before, specify another name please.", -1);
+            }
+
+            m_topicPublishInfoTable[m_pDefaultMQProducer->getCreateTopicKey()] =
+                TopicPublishInfo();
+
+            if (startFactory)
+            {
+                m_pMQClientFactory->start();
+            }
+
+			RMQ_INFO("the producer [%s] start OK", m_pDefaultMQProducer->getProducerGroup().c_str());
+            m_serviceState = RUNNING;
+        }
+        break;
+
+        case RUNNING:
+            RMQ_ERROR("This client is already running.");
+
+        case START_FAILED:
+            RMQ_ERROR("This client failed to start previously.");
+
+        case SHUTDOWN_ALREADY:
+            RMQ_ERROR("This client has been shutted down.");
+            THROW_MQEXCEPTION(MQClientException,
+                              "The producer service state not OK, maybe started once, ", -1);
+
+        default:
+            break;
+    }
+
+    m_pMQClientFactory->sendHeartbeatToAllBrokerWithLock();
+}
+
+void DefaultMQProducerImpl::shutdown()
+{
+    shutdown(true);
+}
+
+void DefaultMQProducerImpl::shutdown(bool shutdownFactory)
+{
+    RMQ_DEBUG("DefaultMQProducerImpl::shutdown()");
+
+    switch (m_serviceState)
+    {
+        case CREATE_JUST:
+            break;
+
+        case RUNNING:
+            m_pMQClientFactory->unregisterProducer(
+                m_pDefaultMQProducer->getProducerGroup());
+
+            if (shutdownFactory)
+            {
+                m_pMQClientFactory->shutdown();
+            }
+
+			RMQ_INFO("the producer [%s] shutdown OK", m_pDefaultMQProducer->getProducerGroup().c_str());
+            m_serviceState = SHUTDOWN_ALREADY;
+            break;
+
+        case SHUTDOWN_ALREADY:
+            break;
+
+        default:
+            break;
+    }
+}
+
+
+void DefaultMQProducerImpl::initTransactionEnv()
+{
+    //TODO
+}
+
+void DefaultMQProducerImpl::destroyTransactionEnv()
+{
+    //TODO
+}
+
+bool DefaultMQProducerImpl::hasHook()
+{
+    return !m_hookList.empty();
+}
+
+void DefaultMQProducerImpl::registerHook(SendMessageHook* pHook)
+{
+    m_hookList.push_back(pHook);
+}
+
+void DefaultMQProducerImpl::executeHookBefore(const SendMessageContext& context)
+{
+    std::list<SendMessageHook*>::iterator it = m_hookList.begin();
+
+    for (; it != m_hookList.end(); it++)
+    {
+        try
+        {
+            (*it)->sendMessageBefore(context);
+        }
+        catch (...)
+        {
+        	RMQ_WARN("sendMessageBefore exception");
+        }
+    }
+}
+
+void DefaultMQProducerImpl::executeHookAfter(const SendMessageContext& context)
+{
+    std::list<SendMessageHook*>::iterator it = m_hookList.begin();
+
+    for (; it != m_hookList.end(); it++)
+    {
+        try
+        {
+            (*it)->sendMessageAfter(context);
+        }
+        catch (...)
+        {
+        	RMQ_WARN("sendMessageAfter exception");
+        }
+    }
+}
+
+
+std::set<std::string> DefaultMQProducerImpl::getPublishTopicList()
+{
+	kpr::ScopedRLock<kpr::RWMutex> lock(m_topicPublishInfoTableLock);
+    std::set<std::string> toplist;
+    std::map<std::string, TopicPublishInfo>::iterator it =
+        m_topicPublishInfoTable.begin();
+    for (; it != m_topicPublishInfoTable.end(); it++)
+    {
+        toplist.insert(it->first);
+    }
+
+    return toplist;
+}
+
+bool DefaultMQProducerImpl::isPublishTopicNeedUpdate(const std::string& topic)
+{
+	kpr::ScopedRLock<kpr::RWMutex> lock(m_topicPublishInfoTableLock);
+    std::map<std::string, TopicPublishInfo>::iterator it =
+        m_topicPublishInfoTable.find(topic);
+    if (it != m_topicPublishInfoTable.end())
+    {
+        return !it->second.ok();
+    }
+
+    return true;
+}
+
+void DefaultMQProducerImpl::checkTransactionState(const std::string& addr, //
+        const MessageExt& msg, //
+        const CheckTransactionStateRequestHeader& checkRequestHeader)
+{
+    //TODO
+}
+
+void DefaultMQProducerImpl::updateTopicPublishInfo(const std::string& topic,
+        TopicPublishInfo& info)
+{
+	{
+		kpr::ScopedWLock<kpr::RWMutex> lock(m_topicPublishInfoTableLock);
+	    std::map<std::string, TopicPublishInfo>::iterator it =
+	        m_topicPublishInfoTable.find(topic);
+	    if (it != m_topicPublishInfoTable.end())
+	    {
+	        info.getSendWhichQueue() = it->second.getSendWhichQueue();
+	        RMQ_INFO("updateTopicPublishInfo prev is not null, %s", it->second.toString().c_str());
+	    }
+	    m_topicPublishInfoTable[topic] = info;
+    }
+}
+
+void DefaultMQProducerImpl::createTopic(const std::string& key,
+                                        const std::string& newTopic, int queueNum)
+{
+    makeSureStateOK();
+    Validators::checkTopic(newTopic);
+
+    m_pMQClientFactory->getMQAdminImpl()->createTopic(key, newTopic, queueNum);
+}
+
+std::vector<MessageQueue>* DefaultMQProducerImpl::fetchPublishMessageQueues(
+    const std::string& topic)
+{
+    makeSureStateOK();
+    return m_pMQClientFactory->getMQAdminImpl()->fetchPublishMessageQueues(topic);
+}
+
+long long DefaultMQProducerImpl::searchOffset(const MessageQueue& mq,
+        long long timestamp)
+{
+    makeSureStateOK();
+    return m_pMQClientFactory->getMQAdminImpl()->searchOffset(mq, timestamp);
+}
+
+long long DefaultMQProducerImpl::maxOffset(const MessageQueue& mq)
+{
+    makeSureStateOK();
+    return m_pMQClientFactory->getMQAdminImpl()->maxOffset(mq);
+}
+
+long long DefaultMQProducerImpl::minOffset(const MessageQueue& mq)
+{
+    makeSureStateOK();
+    return m_pMQClientFactory->getMQAdminImpl()->minOffset(mq);
+}
+
+long long DefaultMQProducerImpl::earliestMsgStoreTime(const MessageQueue& mq)
+{
+    makeSureStateOK();
+    return m_pMQClientFactory->getMQAdminImpl()->earliestMsgStoreTime(mq);
+}
+
+MessageExt* DefaultMQProducerImpl::viewMessage(const std::string& msgId)
+{
+    makeSureStateOK();
+    return m_pMQClientFactory->getMQAdminImpl()->viewMessage(msgId);
+}
+
+QueryResult DefaultMQProducerImpl::queryMessage(const std::string& topic,
+        const std::string& key, int maxNum, long long begin, long long end)
+{
+    makeSureStateOK();
+    return m_pMQClientFactory->getMQAdminImpl()->queryMessage(topic, key, maxNum,
+            begin, end);
+}
+
+
+/**
+ * DEFAULT ASYNC -------------------------------------------------------
+ */
+void DefaultMQProducerImpl::send(Message& msg, SendCallback* pSendCallback)
+{
+    send(msg, pSendCallback, m_pDefaultMQProducer->getSendMsgTimeout());
+}
+void DefaultMQProducerImpl::send(Message& msg, SendCallback* pSendCallback, int timeout)
+{
+    try
+    {
+        sendDefaultImpl(msg, ASYNC, pSendCallback, timeout);
+    }
+    catch (MQBrokerException& e)
+    {
+        THROW_MQEXCEPTION(MQClientException, std::string("unknow exception: ") + e.what(), -1);
+    }
+}
+
+
+/**
+ * DEFAULT ONEWAY -------------------------------------------------------
+ */
+void DefaultMQProducerImpl::sendOneway(Message& msg)
+{
+    try
+    {
+        sendDefaultImpl(msg, ONEWAY, NULL, m_pDefaultMQProducer->getSendMsgTimeout());
+    }
+    catch (MQBrokerException& e)
+    {
+        THROW_MQEXCEPTION(MQClientException, std::string("unknow exception: ") + e.what(), -1);
+    }
+}
+
+
+/**
+ * KERNEL SYNC -------------------------------------------------------
+ */
+SendResult DefaultMQProducerImpl::send(Message& msg, MessageQueue& mq)
+{
+    return send(msg, mq, m_pDefaultMQProducer->getSendMsgTimeout());
+}
+SendResult DefaultMQProducerImpl::send(Message& msg, MessageQueue& mq, int timeout)
+{
+    makeSureStateOK();
+    Validators::checkMessage(msg, m_pDefaultMQProducer);
+
+    if (msg.getTopic() != mq.getTopic())
+    {
+        THROW_MQEXCEPTION(MQClientException, "message's topic not equal mq's topic", -1);
+    }
+
+    return sendKernelImpl(msg, mq, SYNC, NULL, timeout);
+}
+
+
+/**
+ * KERNEL ASYNC -------------------------------------------------------
+ */
+void DefaultMQProducerImpl::send(Message& msg, MessageQueue& mq,
+                                 SendCallback* pSendCallback)
+{
+    return send(msg, mq, pSendCallback, m_pDefaultMQProducer->getSendMsgTimeout());
+}
+void DefaultMQProducerImpl::send(Message& msg, MessageQueue& mq,
+	SendCallback* pSendCallback, int timeout)
+{
+    makeSureStateOK();
+    Validators::checkMessage(msg, m_pDefaultMQProducer);
+
+    if (msg.getTopic() != mq.getTopic())
+    {
+        THROW_MQEXCEPTION(MQClientException, "message's topic not equal mq's topic", -1);
+    }
+
+    try
+    {
+        sendKernelImpl(msg, mq, ASYNC, pSendCallback, timeout);
+    }
+    catch (MQBrokerException& e)
+    {
+        THROW_MQEXCEPTION(MQClientException, std::string("unknow exception: ") + e.what(), -1);
+    }
+}
+
+/**
+ * KERNEL ONEWAY -------------------------------------------------------
+ */
+void DefaultMQProducerImpl::sendOneway(Message& msg, MessageQueue& mq)
+{
+    makeSureStateOK();
+    Validators::checkMessage(msg, m_pDefaultMQProducer);
+
+    if (msg.getTopic() != mq.getTopic())
+    {
+        THROW_MQEXCEPTION(MQClientException, "message's topic not equal mq's topic", -1);
+    }
+
+    try
+    {
+        sendKernelImpl(msg, mq, ONEWAY, NULL, m_pDefaultMQProducer->getSendMsgTimeout());
+    }
+    catch (MQBrokerException& e)
+    {
+        THROW_MQEXCEPTION(MQClientException, std::string("unknow exception: ") + e.what(), -1);
+    }
+}
+
+
+/**
+ * SELECT SYNC -------------------------------------------------------
+ */
+SendResult DefaultMQProducerImpl::send(Message& msg,
+                                       MessageQueueSelector* pSelector, void* arg)
+{
+    return send(msg, pSelector, arg, m_pDefaultMQProducer->getSendMsgTimeout());
+}
+SendResult DefaultMQProducerImpl::send(Message& msg,
+                                       MessageQueueSelector* pSelector, void* arg, int timeout)
+{
+    return sendSelectImpl(msg, pSelector, arg, SYNC, NULL, timeout);
+}
+
+
+/**
+ * SELECT ASYNC -------------------------------------------------------
+ */
+void DefaultMQProducerImpl::send(Message& msg,
+                                 MessageQueueSelector* pSelector,
+                                 void* arg,
+                                 SendCallback* pSendCallback)
+{
+    return send(msg, pSelector, arg, pSendCallback, m_pDefaultMQProducer->getSendMsgTimeout());
+}
+void DefaultMQProducerImpl::send(Message& msg,
+                                 MessageQueueSelector* pSelector,
+                                 void* arg,
+                                 SendCallback* pSendCallback,
+                                 int timeout)
+{
+    try
+    {
+        sendSelectImpl(msg, pSelector, arg, ASYNC, pSendCallback, timeout);
+    }
+    catch (MQBrokerException& e)
+    {
+        THROW_MQEXCEPTION(MQClientException, std::string("unknow exception: ") + e.what(), -1);
+    }
+}
+
+
+/**
+ * SELECT ONEWAY -------------------------------------------------------
+ */
+void DefaultMQProducerImpl::sendOneway(Message& msg,
+	MessageQueueSelector* pSelector, void* arg)
+{
+    try
+    {
+        sendSelectImpl(msg, pSelector, arg, ONEWAY, NULL,
+        	m_pDefaultMQProducer->getSendMsgTimeout());
+    }
+    catch (MQBrokerException& e)
+    {
+        THROW_MQEXCEPTION(MQClientException, std::string("unknow exception: ") + e.what(), -1);
+    }
+}
+
+
+/*
+ * Send with Transaction
+ */
+TransactionSendResult DefaultMQProducerImpl::sendMessageInTransaction(
+    Message& msg,
+    LocalTransactionExecuter* tranExecuter, void* arg)
+{
+    //TODO
+    TransactionSendResult result;
+    return result;
+}
+
+void DefaultMQProducerImpl::endTransaction(//
+    SendResult sendResult, //
+    LocalTransactionState localTransactionState, //
+    MQClientException localException)
+{
+    //TODO
+}
+
+/**
+ * DEFAULT SYNC -------------------------------------------------------
+ */
+SendResult DefaultMQProducerImpl::send(Message& msg)
+{
+    return send(msg, m_pDefaultMQProducer->getSendMsgTimeout());
+}
+SendResult DefaultMQProducerImpl::send(Message& msg, int timeout)
+{
+    return sendDefaultImpl(msg, SYNC, NULL, timeout);
+}
+
+
+std::map<std::string, TopicPublishInfo> DefaultMQProducerImpl::getTopicPublishInfoTable()
+{
+    return m_topicPublishInfoTable;
+}
+
+MQClientFactory* DefaultMQProducerImpl::getMQClientFactory()
+{
+    return m_pMQClientFactory;
+}
+
+int DefaultMQProducerImpl::getZipCompressLevel()
+{
+    return m_zipCompressLevel;
+}
+
+void DefaultMQProducerImpl::setZipCompressLevel(int zipCompressLevel)
+{
+    m_zipCompressLevel = zipCompressLevel;
+}
+
+ServiceState DefaultMQProducerImpl::getServiceState() {
+    return m_serviceState;
+}
+
+
+void DefaultMQProducerImpl::setServiceState(ServiceState serviceState) {
+    m_serviceState = serviceState;
+}
+
+
+SendResult DefaultMQProducerImpl::sendDefaultImpl(Message& msg,
+        CommunicationMode communicationMode,
+        SendCallback* pSendCallback,
+        int timeout)
+{
+    makeSureStateOK();
+    Validators::checkMessage(msg, m_pDefaultMQProducer);
+
+	long long maxTimeout = m_pDefaultMQProducer->getSendMsgTimeout() + 1000;
+    long long beginTimestamp = KPRUtil::GetCurrentTimeMillis();
+    long long endTimestamp = beginTimestamp;
+    TopicPublishInfo& topicPublishInfo = tryToFindTopicPublishInfo(msg.getTopic());
+    SendResult sendResult;
+
+    if (topicPublishInfo.ok())
+    {
+        MessageQueue* mq = NULL;
+
+		int times = 0;
+		int timesTotal = 1 + m_pDefaultMQProducer->getRetryTimesWhenSendFailed();
+		std::vector<std::string> brokersSent;
+        for (; times < timesTotal && int(endTimestamp - beginTimestamp) < maxTimeout; times++)
+        {
+            std::string lastBrokerName = (NULL == mq) ? "" : mq->getBrokerName();
+            MessageQueue* tmpmq = topicPublishInfo.selectOneMessageQueue(lastBrokerName);
+
+            if (tmpmq != NULL)
+            {
+                mq = tmpmq;
+                brokersSent.push_back(mq->getBrokerName());
+
+                try
+                {
+                    sendResult = sendKernelImpl(msg, *mq, communicationMode, pSendCallback, timeout);
+                    endTimestamp = KPRUtil::GetCurrentTimeMillis();
+
+                    switch (communicationMode)
+                    {
+                        case ASYNC:
+                            return sendResult;
+
+                        case ONEWAY:
+                            return sendResult;
+
+                        case SYNC:
+                            if (sendResult.getSendStatus() != SEND_OK)
+                            {
+                                if (m_pDefaultMQProducer->isRetryAnotherBrokerWhenNotStoreOK())
+                                {
+                                    continue;
+                                }
+                            }
+
+                            return sendResult;
+
+                        default:
+                            break;
+                    }
+                }
+                catch (RemotingException& e)
+                {
+                    endTimestamp = KPRUtil::GetCurrentTimeMillis();
+                    continue;
+                }
+                catch (MQClientException& e)
+                {
+                    endTimestamp = KPRUtil::GetCurrentTimeMillis();
+                    continue;
+                }
+                catch (MQBrokerException& e)
+                {
+                    endTimestamp = KPRUtil::GetCurrentTimeMillis();
+
+                    switch (e.GetError())
+                    {
+                        case TOPIC_NOT_EXIST_VALUE:
+                        case SERVICE_NOT_AVAILABLE_VALUE:
+                        case SYSTEM_ERROR_VALUE:
+                        case NO_PERMISSION_VALUE:
+                        case NO_BUYER_ID_VALUE:
+                        case NOT_IN_CURRENT_UNIT_VALUE:
+                            continue;
+                        default:
+                        	if (sendResult.hasResult())
+                        	{
+                            	return sendResult;
+                            }
+                            throw;
+                    }
+                }
+                catch (InterruptedException& e)
+                {
+                	endTimestamp = KPRUtil::GetCurrentTimeMillis();
+                    throw;
+                }
+            }
+            else
+            {
+                break;
+            }
+        } // end of for
+
+		std::string info = RocketMQUtil::str2fmt("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
+			times, int(endTimestamp - beginTimestamp), msg.getTopic().c_str(), UtilAll::toString(brokersSent).c_str());
+		RMQ_WARN("%s", info.c_str());
+        THROW_MQEXCEPTION(MQClientException, info, -1);
+        return sendResult;
+    }
+
+    std::vector<std::string> nsList =
+        getMQClientFactory()->getMQClientAPIImpl()->getNameServerAddressList();
+    if (nsList.empty())
+    {
+        THROW_MQEXCEPTION(MQClientException, "No name server address, please set it", -1);
+    }
+
+    THROW_MQEXCEPTION(MQClientException, std::string("No route info of this topic, ") + msg.getTopic(), -1);
+}
+
+SendResult DefaultMQProducerImpl::sendKernelImpl(Message& msg,
+        const MessageQueue& mq,
+        CommunicationMode communicationMode,
+        SendCallback* sendCallback,
+        int timeout)
+{
+    std::string brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
+    if (brokerAddr.empty())
+    {
+        tryToFindTopicPublishInfo(mq.getTopic());
+        brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
+    }
+
+    SendMessageContext context;
+    if (!brokerAddr.empty())
+    {
+        try
+        {
+            int sysFlag = 0;
+
+            if (tryToCompressMessage(msg))
+            {
+                sysFlag |= MessageSysFlag::CompressedFlag;
+            }
+
+            std::string tranMsg = msg.getProperty(Message::PROPERTY_TRANSACTION_PREPARED);
+            if (!tranMsg.empty() && tranMsg == "true")
+            {
+                sysFlag |= MessageSysFlag::TransactionPreparedType;
+            }
+
+            // ִ��hook
+            if (hasHook())
+            {
+                context.producerGroup = (m_pDefaultMQProducer->getProducerGroup());
+                context.communicationMode = (communicationMode);
+                context.brokerAddr = (brokerAddr);
+                context.msg = (msg);
+                context.mq = (mq);
+                executeHookBefore(context);
+            }
+
+            SendMessageRequestHeader* requestHeader = new SendMessageRequestHeader();
+            requestHeader->producerGroup = (m_pDefaultMQProducer->getProducerGroup());
+            requestHeader->topic = (msg.getTopic());
+            requestHeader->defaultTopic = (m_pDefaultMQProducer->getCreateTopicKey());
+            requestHeader->defaultTopicQueueNums = (m_pDefaultMQProducer->getDefaultTopicQueueNums());
+            requestHeader->queueId = (mq.getQueueId());
+            requestHeader->sysFlag = (sysFlag);
+            requestHeader->bornTimestamp = (KPRUtil::GetCurrentTimeMillis());
+            requestHeader->flag = (msg.getFlag());
+            requestHeader->properties = (MessageDecoder::messageProperties2String(msg.getProperties()));
+            requestHeader->reconsumeTimes = 0;
+
+			if (requestHeader->topic.find(MixAll::RETRY_GROUP_TOPIC_PREFIX) == 0)
+			{
+                std::string reconsumeTimes = msg.getProperty(Message::PROPERTY_RECONSUME_TIME);
+                if (!reconsumeTimes.empty())
+                {
+                    requestHeader->reconsumeTimes = int(UtilAll::str2ll(reconsumeTimes.c_str()));
+                    msg.clearProperty(Message::PROPERTY_RECONSUME_TIME);
+                }
+
+				/*
+				3.5.8 new features
+                std::string maxReconsumeTimes = msg.getProperty(Message::PROPERTY_MAX_RECONSUME_TIMES);
+                if (!maxReconsumeTimes.empty())
+                {
+                    requestHeader->maxReconsumeTimes = int(UtilAll::str2ll(maxReconsumeTimes.c_str()));
+                    msg.clearProperty(Message::PROPERTY_MAX_RECONSUME_TIMES);
+                }
+                */
+            }
+
+            SendResult sendResult = m_pMQClientFactory->getMQClientAPIImpl()->sendMessage(
+			    brokerAddr,
+			    mq.getBrokerName(),
+			    msg,
+			    requestHeader,
+			    timeout,
+			    communicationMode,
+			    sendCallback
+			);
+
+            if (hasHook())
+            {
+                context.sendResult = (sendResult);
+                executeHookAfter(context);
+            }
+
+            return sendResult;
+        }
+        catch (RemotingException& e)
+        {
+            if (hasHook())
+            {
+                context.pException = (&e);
+                executeHookAfter(context);
+            }
+			RMQ_WARN("sendKernelImpl exception: %s, msg: %s", e.what(), msg.toString().c_str());
+            throw;
+        }
+        catch (MQBrokerException& e)
+        {
+            if (hasHook())
+            {
+                context.pException = (&e);
+                executeHookAfter(context);
+            }
+			RMQ_WARN("sendKernelImpl exception: %s, msg: %s", e.what(), msg.toString().c_str());
+            throw;
+        }
+        catch (InterruptedException& e)
+        {
+            if (hasHook())
+            {
+                context.pException = (&e);
+                executeHookAfter(context);
+            }
+			RMQ_WARN("sendKernelImpl exception: %s, msg: %s", e.what(), msg.toString().c_str());
+            throw;
+        }
+    }
+
+    THROW_MQEXCEPTION(MQClientException, std::string("The broker[") + mq.getBrokerName() + "] not exist", -1);
+}
+
+SendResult DefaultMQProducerImpl::sendSelectImpl(Message& msg,
+        MessageQueueSelector* selector,
+        void* pArg,
+        CommunicationMode communicationMode,
+        SendCallback* sendCallback,
+        int timeout)
+{
+    makeSureStateOK();
+    Validators::checkMessage(msg, m_pDefaultMQProducer);
+
+    SendResult result;
+    TopicPublishInfo& topicPublishInfo = tryToFindTopicPublishInfo(msg.getTopic());
+    SendResult sendResult;
+
+    if (topicPublishInfo.ok())
+    {
+        MessageQueue* mq = NULL;
+
+		try
+		{
+			mq = selector->select(topicPublishInfo.getMessageQueueList(), msg, pArg);
+		}
+		catch (std::exception& e)
+		{
+            THROW_MQEXCEPTION(MQClientException,
+            	std::string("select message queue throwed exception, ") + e.what(), -1);
+        }
+        catch (...)
+		{
+            THROW_MQEXCEPTION(MQClientException, "select message queue throwed exception, ", -1);
+        }
+
+        if (mq != NULL)
+        {
+            return sendKernelImpl(msg, *mq, communicationMode, sendCallback, timeout);
+        }
+        else
+        {
+            THROW_MQEXCEPTION(MQClientException, "select message queue return null", -1);
+        }
+    }
+
+    THROW_MQEXCEPTION(MQClientException, std::string("No route info of this topic, ") + msg.getTopic(), -1);
+}
+
+void DefaultMQProducerImpl::makeSureStateOK()
+{
+    if (m_serviceState != RUNNING)
+    {
+        THROW_MQEXCEPTION(MQClientException, "The producer service state not OK, ", -1);
+    }
+}
+
+void DefaultMQProducerImpl::checkConfig()
+{
+	Validators::checkGroup(m_pDefaultMQProducer->getProducerGroup());
+
+    if (m_pDefaultMQProducer->getProducerGroup().empty())
+    {
+        THROW_MQEXCEPTION(MQClientException, "producerGroup is null", -1);
+    }
+
+    if (m_pDefaultMQProducer->getProducerGroup() == MixAll::DEFAULT_PRODUCER_GROUP)
+    {
+    	THROW_MQEXCEPTION(MQClientException,
+    		std::string("producerGroup can not equal [") + MixAll::DEFAULT_PRODUCER_GROUP + "], please specify another one",
+    		-1);
+    }
+}
+
+TopicPublishInfo& DefaultMQProducerImpl::tryToFindTopicPublishInfo(
+    const std::string& topic)
+{
+	std::map<std::string, TopicPublishInfo>::iterator it;
+	{
+		kpr::ScopedRLock<kpr::RWMutex> lock(m_topicPublishInfoTableLock);
+	    it = m_topicPublishInfoTable.find(topic);
+    }
+
+    if (it == m_topicPublishInfoTable.end() || !it->second.ok())
+    {
+    	{
+	    	kpr::ScopedWLock<kpr::RWMutex> lock(m_topicPublishInfoTableLock);
+	        m_topicPublishInfoTable[topic] = TopicPublishInfo();
+        }
+
+        m_pMQClientFactory->updateTopicRouteInfoFromNameServer(topic);
+
+        {
+	        kpr::ScopedRLock<kpr::RWMutex> lock(m_topicPublishInfoTableLock);
+	        it = m_topicPublishInfoTable.find(topic);
+        }
+    }
+
+    if (it != m_topicPublishInfoTable.end()
+    	&& (it->second.ok() || it->second.isHaveTopicRouterInfo()))
+    {
+    	return (it->second);
+    }
+    else
+    {
+    	m_pMQClientFactory->updateTopicRouteInfoFromNameServer(topic, true,
+                m_pDefaultMQProducer);
+        {
+	        kpr::ScopedRLock<kpr::RWMutex> lock(m_topicPublishInfoTableLock);
+    	    it = m_topicPublishInfoTable.find(topic);
+    	}
+    	return (it->second);
+    }
+}
+
+bool DefaultMQProducerImpl::tryToCompressMessage(Message& msg)
+{
+    if (msg.getBodyLen() >= m_pDefaultMQProducer->getCompressMsgBodyOverHowmuch())
+    {
+        if (msg.tryToCompress(m_pDefaultMQProducer->getCompressLevel()))
+        {
+            return true;
+        }
+    }
+
+    return false;
+}
+
+TransactionCheckListener* DefaultMQProducerImpl::checkListener()
+{
+    return NULL;
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/producer/DefaultMQProducerImpl.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/producer/DefaultMQProducerImpl.h b/rocketmq-client4cpp/src/producer/DefaultMQProducerImpl.h
new file mode 100755
index 0000000..3df914c
--- /dev/null
+++ b/rocketmq-client4cpp/src/producer/DefaultMQProducerImpl.h
@@ -0,0 +1,205 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __DEFAULTMQPRODUCERIMPL_H__
+#define __DEFAULTMQPRODUCERIMPL_H__
+
+#include <list>
+#include <vector>
+#include "MQProducerInner.h"
+#include "QueryResult.h"
+#include "ServiceState.h"
+#include "CommunicationMode.h"
+#include "SendResult.h"
+#include "MQClientException.h"
+#include "Mutex.h"
+#include "ScopedLock.h"
+
+
+namespace rmq
+{
+    class DefaultMQProducer;
+    class SendMessageHook;
+    class SendMessageContext;
+    class MessageQueue;
+    class MessageExt;
+    class SendCallback;
+    class MessageQueueSelector;
+    class MQClientFactory;
+    class MQClientException;
+    class RemotingException;
+    class MQBrokerException;
+    class InterruptedException;
+    class LocalTransactionExecuter;
+
+
+    class DefaultMQProducerImpl : public MQProducerInner
+    {
+    public:
+        DefaultMQProducerImpl(DefaultMQProducer* pDefaultMQProducer);
+		~DefaultMQProducerImpl();
+        void initTransactionEnv();
+        void destroyTransactionEnv();
+
+        bool hasHook();
+        void registerHook(SendMessageHook* pHook);
+        void executeHookBefore(const SendMessageContext& context);
+        void executeHookAfter(const SendMessageContext& context);
+
+        void start();
+        void start(bool startFactory);
+        void shutdown();
+        void shutdown(bool shutdownFactory);
+
+        std::set<std::string> getPublishTopicList();
+        bool isPublishTopicNeedUpdate(const std::string& topic);
+
+        void checkTransactionState(const std::string& addr,
+                                   const MessageExt& msg,
+                                   const CheckTransactionStateRequestHeader& checkRequestHeader);
+
+        void updateTopicPublishInfo(const std::string& topic, TopicPublishInfo& info);
+        virtual TransactionCheckListener* checkListener();
+
+        void createTopic(const std::string& key, const std::string& newTopic, int queueNum);
+        std::vector<MessageQueue>* fetchPublishMessageQueues(const std::string& topic);
+
+        long long searchOffset(const MessageQueue& mq, long long timestamp);
+        long long maxOffset(const MessageQueue& mq);
+        long long minOffset(const MessageQueue& mq);
+
+        long long earliestMsgStoreTime(const MessageQueue& mq);
+
+        MessageExt* viewMessage(const std::string& msgId);
+        QueryResult queryMessage(const std::string& topic,
+                                 const std::string& key,
+                                 int maxNum,
+                                 long long begin,
+                                 long long end);
+
+        /**
+        * DEFAULT ASYNC -------------------------------------------------------
+        */
+        void send(Message& msg, SendCallback* sendCallback);
+		void send(Message& msg, SendCallback* sendCallback, int timeout);
+
+        /**
+        * DEFAULT ONEWAY -------------------------------------------------------
+        */
+        void sendOneway(Message& msg);
+
+        /**
+        * KERNEL SYNC -------------------------------------------------------
+        */
+        SendResult send(Message& msg, MessageQueue& mq);
+		SendResult send(Message& msg, MessageQueue& mq, int timeout);
+
+        /**
+        * KERNEL ASYNC -------------------------------------------------------
+        */
+        void send(Message& msg, MessageQueue& mq, SendCallback* sendCallback);
+		void send(Message& msg, MessageQueue& mq, SendCallback* sendCallback, int timeout);
+
+        /**
+        * KERNEL ONEWAY -------------------------------------------------------
+        */
+        void sendOneway(Message& msg, MessageQueue& mq);
+
+        /**
+        * SELECT SYNC -------------------------------------------------------
+        */
+        SendResult send(Message& msg, MessageQueueSelector* selector, void* arg);
+		SendResult send(Message& msg, MessageQueueSelector* selector, void* arg, int timeout);
+
+        /**
+        * SELECT ASYNC -------------------------------------------------------
+        */
+        void send(Message& msg, MessageQueueSelector* selector, void* arg, SendCallback* sendCallback);
+		void send(Message& msg, MessageQueueSelector* selector, void* arg, SendCallback* sendCallback, int timeout);
+
+        /**
+        * SELECT ONEWAY -------------------------------------------------------
+        */
+        void sendOneway(Message& msg, MessageQueueSelector* selector, void* arg);
+
+		/**
+		* SEND with Transaction
+		*/
+        TransactionSendResult sendMessageInTransaction(Message& msg, LocalTransactionExecuter* tranExecuter, void* arg);
+
+        /**
+        * DEFAULT SYNC -------------------------------------------------------
+        */
+        SendResult send(Message& msg);
+		SendResult send(Message& msg, int timeout);
+
+        std::map<std::string, TopicPublishInfo> getTopicPublishInfoTable();
+
+        MQClientFactory* getMQClientFactory();
+
+        int getZipCompressLevel();
+        void setZipCompressLevel(int zipCompressLevel);
+
+		ServiceState getServiceState();
+		void setServiceState(ServiceState serviceState);
+
+    private:
+        SendResult sendSelectImpl(Message& msg,
+                                  MessageQueueSelector* selector,
+                                  void* pArg,
+                                  CommunicationMode communicationMode,
+                                  SendCallback* sendCallback,
+                                  int timeout);
+
+        SendResult sendDefaultImpl(Message& msg,
+                                   CommunicationMode communicationMode,
+                                   SendCallback* pSendCallback,
+                                   int timeout);
+
+        SendResult sendKernelImpl(Message& msg,
+                                  const MessageQueue& mq,
+                                  CommunicationMode communicationMode,
+                                  SendCallback* pSendCallback,
+                                  int timeout);
+
+        void endTransaction(SendResult sendResult,
+                            LocalTransactionState localTransactionState,
+                            MQClientException localException);
+
+        void makeSureStateOK();
+        void checkConfig();
+
+        TopicPublishInfo& tryToFindTopicPublishInfo(const std::string& topic) ;
+        bool tryToCompressMessage(Message& msg);
+
+    protected:
+        //TODO transaction imp
+
+    private:
+        int m_zipCompressLevel;// message compress level, default is 5
+
+        DefaultMQProducer* m_pDefaultMQProducer;
+
+        std::map<std::string, TopicPublishInfo> m_topicPublishInfoTable;
+		kpr::RWMutex m_topicPublishInfoTableLock;
+
+        ServiceState m_serviceState;
+        MQClientFactory* m_pMQClientFactory;
+        std::list<SendMessageHook*> m_hookList;
+    };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/producer/LocalTransactionExecuter.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/producer/LocalTransactionExecuter.h b/rocketmq-client4cpp/src/producer/LocalTransactionExecuter.h
new file mode 100755
index 0000000..a124884
--- /dev/null
+++ b/rocketmq-client4cpp/src/producer/LocalTransactionExecuter.h
@@ -0,0 +1,31 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,kangliq@163.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __LOCALTRANSACTIONEXECUTER_H__
+#define __LOCALTRANSACTIONEXECUTER_H__
+
+#include "SendResult.h"
+
+namespace rmq
+{
+	class LocalTransactionExecuter
+	{
+	public:
+	    virtual~LocalTransactionExecuter() {}
+	    virtual LocalTransactionState executeLocalTransactionBranch(Message& msg, void* arg) = 0;
+	};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/producer/MQProducerInner.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/producer/MQProducerInner.h b/rocketmq-client4cpp/src/producer/MQProducerInner.h
new file mode 100755
index 0000000..56194dc
--- /dev/null
+++ b/rocketmq-client4cpp/src/producer/MQProducerInner.h
@@ -0,0 +1,44 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __MQPRODUCERINNER_H__
+#define __MQPRODUCERINNER_H__
+
+#include <string>
+#include <set>
+
+namespace rmq
+{
+	class TransactionCheckListener;
+	class MessageExt;
+	class CheckTransactionStateRequestHeader;
+	class TopicPublishInfo;
+
+	class MQProducerInner
+	{
+	public:
+	    virtual ~MQProducerInner() {}
+	    virtual std::set<std::string> getPublishTopicList() = 0;
+	    virtual bool isPublishTopicNeedUpdate(const std::string& topic) = 0;
+	    virtual TransactionCheckListener* checkListener() = 0;
+	    virtual void checkTransactionState(const std::string& addr, //
+	                                       const MessageExt& msg, //
+	                                       const CheckTransactionStateRequestHeader& checkRequestHeader) = 0;
+	    virtual void updateTopicPublishInfo(const std::string& topic, TopicPublishInfo& info) = 0;
+	};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/producer/MessageQueueSelector.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/producer/MessageQueueSelector.h b/rocketmq-client4cpp/src/producer/MessageQueueSelector.h
new file mode 100755
index 0000000..6d5ac48
--- /dev/null
+++ b/rocketmq-client4cpp/src/producer/MessageQueueSelector.h
@@ -0,0 +1,96 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __MESSAGEQUEUESELECTOR_H__
+#define __MESSAGEQUEUESELECTOR_H__
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <time.h>
+#include <math.h>
+#include <set>
+#include <string>
+#include <vector>
+
+#include "MessageQueue.h"
+#include "UtilAll.h"
+
+namespace rmq
+{
+    class Message;
+
+    class MessageQueueSelector
+    {
+    public:
+        virtual ~MessageQueueSelector() {}
+        virtual MessageQueue* select(std::vector<MessageQueue>& mqs, const Message& msg, void* arg) = 0;
+    };
+
+    class SelectMessageQueueByRandoom : public MessageQueueSelector
+    {
+    public:
+        MessageQueue* select(std::vector<MessageQueue>& mqs, const Message& msg, void* arg)
+        {
+            srand((unsigned)time(NULL));
+            int value = rand();
+            value = value % mqs.size();
+            return &(mqs.at(value));
+        }
+    };
+
+    class SelectMessageQueueByHash : public MessageQueueSelector
+    {
+    public:
+        MessageQueue* select(std::vector<MessageQueue>& mqs, const Message& msg, void* arg)
+        {
+			std::string* sArg = (std::string*)arg;
+            int value = UtilAll::hashCode(sArg->c_str(), sArg->size());
+            if (value < 0)
+            {
+                value = abs(value);
+            }
+
+            value = value % mqs.size();
+            return &(mqs.at(value));
+        }
+    };
+
+
+    class SelectMessageQueueByMachineRoom : public MessageQueueSelector
+    {
+    public:
+        MessageQueue* select(std::vector<MessageQueue>& mqs, const Message& msg, void* arg)
+        {
+            // TODO Auto-generated method stub
+            return NULL;
+        }
+
+        std::set<std::string> getConsumeridcs()
+        {
+            return m_consumeridcs;
+        }
+
+        void setConsumeridcs(const std::set<std::string>& consumeridcs)
+        {
+            m_consumeridcs = consumeridcs;
+        }
+
+    private:
+        std::set<std::string> m_consumeridcs;
+    };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/producer/ProducerInvokeCallback.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/producer/ProducerInvokeCallback.cpp b/rocketmq-client4cpp/src/producer/ProducerInvokeCallback.cpp
new file mode 100755
index 0000000..573db95
--- /dev/null
+++ b/rocketmq-client4cpp/src/producer/ProducerInvokeCallback.cpp
@@ -0,0 +1,101 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "ProducerInvokeCallback.h"
+#include "ResponseFuture.h"
+#include "SendResult.h"
+#include "MQClientAPIImpl.h"
+#include "SendCallback.h"
+#include "MQClientException.h"
+#include "RemotingCommand.h"
+
+namespace rmq
+{
+
+ProducerInvokeCallback::ProducerInvokeCallback(SendCallback* pSendCallBack,
+        MQClientAPIImpl* pMQClientAPIImpl,
+        const std::string& topic,
+        const std::string& brokerName)
+    : m_pSendCallBack(pSendCallBack),
+      m_pMQClientAPIImpl(pMQClientAPIImpl),
+      m_topic(topic),
+      m_brokerName(brokerName)
+{
+}
+
+ProducerInvokeCallback::~ProducerInvokeCallback()
+{
+	if (m_pSendCallBack)
+	{
+		delete m_pSendCallBack;
+		m_pSendCallBack = NULL;
+	}
+}
+
+void ProducerInvokeCallback::operationComplete(ResponseFuturePtr pResponseFuture)
+{
+    if (m_pSendCallBack == NULL)
+    {
+        delete this;
+        return;
+    }
+
+    RemotingCommand* response = pResponseFuture->getResponseCommand();
+    if (response != NULL)
+    {
+        try
+        {
+            SendResult* sendResult =
+                m_pMQClientAPIImpl->processSendResponse(m_brokerName, m_topic, response);
+
+			assert(sendResult != NULL);
+            m_pSendCallBack->onSuccess(*sendResult);
+
+            delete sendResult;
+        }
+        catch (MQException& e)
+        {
+            m_pSendCallBack->onException(e);
+        }
+
+        delete response;
+    }
+    else
+    {
+        if (!pResponseFuture->isSendRequestOK())
+        {
+            std::string msg = "send request failed";
+            MQClientException e(msg, -1, __FILE__, __LINE__);
+            m_pSendCallBack->onException(e);
+        }
+        else if (pResponseFuture->isTimeout())
+        {
+            std::string msg = RocketMQUtil::str2fmt("wait response timeout %lld ms",
+            	pResponseFuture->getTimeoutMillis());
+            MQClientException e(msg, -1, __FILE__, __LINE__);
+            m_pSendCallBack->onException(e);
+        }
+        else
+        {
+            std::string msg = "unknow reseaon";
+            MQClientException e(msg, -1, __FILE__, __LINE__);
+            m_pSendCallBack->onException(e);
+        }
+    }
+
+    delete this;
+}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/producer/ProducerInvokeCallback.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/producer/ProducerInvokeCallback.h b/rocketmq-client4cpp/src/producer/ProducerInvokeCallback.h
new file mode 100755
index 0000000..d2c9825
--- /dev/null
+++ b/rocketmq-client4cpp/src/producer/ProducerInvokeCallback.h
@@ -0,0 +1,46 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __PRODUCERINVOKECALLBACK_H__
+#define __PRODUCERINVOKECALLBACK_H__
+
+#include <string>
+#include "InvokeCallback.h"
+
+namespace rmq
+{
+  class MQClientAPIImpl;
+  class SendCallback;
+
+  class ProducerInvokeCallback : public InvokeCallback
+  {
+  public:
+      ProducerInvokeCallback(SendCallback* pSendCallBack,
+                             MQClientAPIImpl* pMQClientAPIImpl,
+                             const std::string& topic,
+                             const std::string& brokerName);
+      virtual ~ProducerInvokeCallback();
+      virtual void operationComplete(ResponseFuturePtr pResponseFuture);
+
+  private:
+      SendCallback* m_pSendCallBack;
+      MQClientAPIImpl* m_pMQClientAPIImpl;
+      std::string m_topic;
+      std::string m_brokerName;
+  };
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/producer/TopicPublishInfo.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/producer/TopicPublishInfo.h b/rocketmq-client4cpp/src/producer/TopicPublishInfo.h
new file mode 100755
index 0000000..0d85b5f
--- /dev/null
+++ b/rocketmq-client4cpp/src/producer/TopicPublishInfo.h
@@ -0,0 +1,141 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __TOPICPUBLISHINFO_H__
+#define __TOPICPUBLISHINFO_H__
+
+#include <list>
+#include <vector>
+#include <string>
+#include <sstream>
+#include <math.h>
+#include <stdlib.h>
+
+#include "RocketMQClient.h"
+#include "RefHandle.h"
+#include "MessageQueue.h"
+#include "AtomicValue.h"
+#include "UtilAll.h"
+
+
+namespace rmq
+{
+    class TopicPublishInfo : public kpr::RefCount
+    {
+    public:
+        TopicPublishInfo()
+        {
+            m_orderTopic = false;
+			m_haveTopicRouterInfo = false;
+        }
+
+        ~TopicPublishInfo()
+        {
+            m_messageQueueList.clear();
+        }
+
+        bool isOrderTopic()
+        {
+            return m_orderTopic;
+        }
+
+        bool ok()
+        {
+            return !m_messageQueueList.empty();
+        }
+
+        void setOrderTopic(bool orderTopic)
+        {
+            m_orderTopic = orderTopic;
+        }
+
+        std::vector<MessageQueue>& getMessageQueueList()
+        {
+            return m_messageQueueList;
+        }
+
+        void setMessageQueueList(const std::vector<MessageQueue>& messageQueueList)
+        {
+            m_messageQueueList = messageQueueList;
+        }
+
+        kpr::AtomicInteger& getSendWhichQueue()
+        {
+            return m_sendWhichQueue;
+        }
+
+        void setSendWhichQueue(kpr::AtomicInteger& sendWhichQueue)
+        {
+            m_sendWhichQueue = sendWhichQueue;
+        }
+
+		bool isHaveTopicRouterInfo()
+		{
+        	return m_haveTopicRouterInfo;
+	    }
+
+
+	    void setHaveTopicRouterInfo(bool haveTopicRouterInfo)
+		{
+	        m_haveTopicRouterInfo = haveTopicRouterInfo;
+	    }
+
+        MessageQueue* selectOneMessageQueue(const std::string lastBrokerName)
+        {
+            if (!lastBrokerName.empty())
+            {
+                int index = m_sendWhichQueue++;
+                for (size_t i = 0; i < m_messageQueueList.size(); i++)
+                {
+                    int pos = abs(index++) % m_messageQueueList.size();
+                    MessageQueue& mq = m_messageQueueList.at(pos);
+                    if (mq.getBrokerName() != lastBrokerName)
+                    {
+                        return &mq;
+                    }
+                }
+
+                return NULL;
+            }
+            else
+            {
+                int index = m_sendWhichQueue++;
+                int pos = abs(index) % m_messageQueueList.size();
+                return &(m_messageQueueList.at(pos));
+            }
+        }
+
+		std::string toString() const
+		{
+			std::stringstream ss;
+		    ss << "{orderTopic=" << m_orderTopic
+		       << ",messageQueueList=" << UtilAll::toString(m_messageQueueList)
+		       << ",sendWhichQueue=" << m_sendWhichQueue
+		       << ",haveTopicRouterInfo=" << m_haveTopicRouterInfo
+		       << "}";
+		    return ss.str();
+    	}
+
+    private:
+        bool m_orderTopic;
+        std::vector<MessageQueue> m_messageQueueList;
+        kpr::AtomicInteger m_sendWhichQueue;
+		bool m_haveTopicRouterInfo;
+    };
+	typedef kpr::RefHandleT<TopicPublishInfo> TopicPublishInfoPtr;
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/producer/TransactionCheckListener.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/producer/TransactionCheckListener.h b/rocketmq-client4cpp/src/producer/TransactionCheckListener.h
new file mode 100755
index 0000000..8955742
--- /dev/null
+++ b/rocketmq-client4cpp/src/producer/TransactionCheckListener.h
@@ -0,0 +1,31 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,kangliq@163.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __TRANSACTIONCHECKLISTENER_H__
+#define __TRANSACTIONCHECKLISTENER_H__
+
+#include "SendResult.h"
+
+namespace rmq
+{
+	class TransactionCheckListener
+	{
+	public:
+	    virtual ~TransactionCheckListener() {}
+	    virtual LocalTransactionState checkLocalTransactionState(MessageExt* pMsg) = 0;
+	};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/producer/TransactionMQProducer.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/producer/TransactionMQProducer.h b/rocketmq-client4cpp/src/producer/TransactionMQProducer.h
new file mode 100755
index 0000000..bee11a5
--- /dev/null
+++ b/rocketmq-client4cpp/src/producer/TransactionMQProducer.h
@@ -0,0 +1,118 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __TRANSACTIONMQPRODUCER_H__
+#define __TRANSACTIONMQPRODUCER_H__
+
+#include "DefaultMQProducer.h"
+#include "DefaultMQProducerImpl.h"
+#include "MQClientException.h"
+
+namespace rmq
+{
+    class TransactionMQProducer : public DefaultMQProducer
+    {
+    public:
+        TransactionMQProducer()
+            : m_pTransactionCheckListener(NULL),
+              m_checkThreadPoolMinSize(1),
+              m_checkThreadPoolMaxSize(1),
+              m_checkRequestHoldMax(2000)
+        {
+
+        }
+
+        TransactionMQProducer(const std::string& producerGroup)
+            : DefaultMQProducer(producerGroup),
+              m_pTransactionCheckListener(NULL),
+              m_checkThreadPoolMinSize(1),
+              m_checkThreadPoolMaxSize(1),
+              m_checkRequestHoldMax(2000)
+        {
+
+        }
+
+        void start()
+        {
+            m_pDefaultMQProducerImpl->initTransactionEnv();
+            DefaultMQProducer::start();
+        }
+
+        void shutdown()
+        {
+            DefaultMQProducer::shutdown();
+            m_pDefaultMQProducerImpl->destroyTransactionEnv();
+        }
+
+        TransactionSendResult sendMessageInTransaction(const Message& msg,
+                LocalTransactionExecuter* tranExecuter, void* arg)
+        {
+            if (NULL == m_pTransactionCheckListener)
+            {
+                THROW_MQEXCEPTION("localTransactionBranchCheckListener is null", -1);
+            }
+
+            return m_pDefaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg);
+        }
+
+        TransactionCheckListener* getTransactionCheckListener()
+        {
+            return m_pTransactionCheckListener;
+        }
+
+        void setTransactionCheckListener(TransactionCheckListener* pTransactionCheckListener)
+        {
+            m_pTransactionCheckListener = pTransactionCheckListener;
+        }
+
+        int getCheckThreadPoolMinSize()
+        {
+            return m_checkThreadPoolMinSize;
+        }
+
+        void setCheckThreadPoolMinSize(int checkThreadPoolMinSize)
+        {
+            m_checkThreadPoolMinSize = checkThreadPoolMinSize;
+        }
+
+        int getCheckThreadPoolMaxSize()
+        {
+            return m_checkThreadPoolMaxSize;
+        }
+
+        void setCheckThreadPoolMaxSize(int checkThreadPoolMaxSize)
+        {
+            m_checkThreadPoolMaxSize = checkThreadPoolMaxSize;
+        }
+
+        int getCheckRequestHoldMax()
+        {
+            return m_checkRequestHoldMax;
+        }
+
+        void setCheckRequestHoldMax(int checkRequestHoldMax)
+        {
+            m_checkRequestHoldMax = checkRequestHoldMax;
+        }
+
+    private:
+        TransactionCheckListener* m_pTransactionCheckListener;
+        int m_checkThreadPoolMinSize;
+        int m_checkThreadPoolMaxSize;
+        int m_checkRequestHoldMax;
+    };
+}
+
+#endif


Mime
View raw message