http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/message/MessageQueue.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/message/MessageQueue.cpp b/rocketmq-client4cpp/src/message/MessageQueue.cpp
new file mode 100755
index 0000000..d632550
--- /dev/null
+++ b/rocketmq-client4cpp/src/message/MessageQueue.cpp
@@ -0,0 +1,153 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#include "MessageQueue.h"
+
+#include <string>
+#include <sstream>
+#include <UtilAll.h>
+
+namespace rmq
+{
+
+MessageQueue::MessageQueue()
+ : m_queueId(0)
+{
+}
+
+MessageQueue::MessageQueue(const std::string& topic, const std::string& brokerName, int queueId)
+ : m_topic(topic), m_brokerName(brokerName), m_queueId(queueId)
+{
+
+}
+
+std::string MessageQueue::getTopic()const
+{
+ return m_topic;
+}
+
+void MessageQueue::setTopic(const std::string& topic)
+{
+ m_topic = topic;
+}
+
+std::string MessageQueue::getBrokerName()const
+{
+ return m_brokerName;
+}
+
+void MessageQueue::setBrokerName(const std::string& brokerName)
+{
+ m_brokerName = brokerName;
+}
+
+int MessageQueue::getQueueId()const
+{
+ return m_queueId;
+}
+
+void MessageQueue::setQueueId(int queueId)
+{
+ m_queueId = queueId;
+}
+
+int MessageQueue::hashCode()
+{
+ /*
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((brokerName == null) ? 0 : brokerName.hashCode());
+ result = prime * result + queueId;
+ result = prime * result + ((topic == null) ? 0 : topic.hashCode());
+ return result;
+ */
+ std::stringstream ss;
+ ss << m_topic << m_brokerName << m_queueId;
+ return UtilAll::hashCode(ss.str());
+}
+
+std::string MessageQueue::toString() const
+{
+ std::stringstream ss;
+ ss << "{topic=" << m_topic
+ << ",brokerName=" << m_brokerName
+ << ",queueId=" << m_queueId << "}";
+ return ss.str();
+}
+
+
+std::string MessageQueue::toJsonString() const
+{
+ std::stringstream ss;
+ ss << "{\"topic\":\"" << m_topic
+ << "\",\"brokerName\":\"" << m_brokerName
+ << "\",\"queueId\":" << m_queueId << "}";
+ return ss.str();
+}
+
+
+bool MessageQueue::operator==(const MessageQueue& mq)const
+{
+ if (this == &mq)
+ {
+ return true;
+ }
+
+ if (m_brokerName != mq.m_brokerName)
+ {
+ return false;
+ }
+
+ if (m_queueId != mq.m_queueId)
+ {
+ return false;
+ }
+
+ if (m_topic != mq.m_topic)
+ {
+ return false;
+ }
+
+ return true;
+}
+
+int MessageQueue::compareTo(const MessageQueue& mq)const
+{
+ {
+ int result = strcmp(m_topic.c_str(), mq.m_topic.c_str());
+ if (result != 0)
+ {
+ return result;
+ }
+ }
+
+ {
+ int result = strcmp(m_brokerName.c_str(), mq.m_brokerName.c_str());
+ if (result != 0)
+ {
+ return result;
+ }
+ }
+
+ return m_queueId - mq.m_queueId;
+}
+
+bool MessageQueue::operator<(const MessageQueue& mq)const
+{
+ return compareTo(mq) < 0;
+}
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/producer/DefaultMQProducer.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/producer/DefaultMQProducer.cpp b/rocketmq-client4cpp/src/producer/DefaultMQProducer.cpp
new file mode 100755
index 0000000..dcad654
--- /dev/null
+++ b/rocketmq-client4cpp/src/producer/DefaultMQProducer.cpp
@@ -0,0 +1,277 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License")
+{
+}
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "DefaultMQProducer.h"
+
+#include <assert.h>
+#include "MessageExt.h"
+#include "QueryResult.h"
+#include "DefaultMQProducerImpl.h"
+#include "MixAll.h"
+#include "MQClientException.h"
+
+namespace rmq
+{
+
+DefaultMQProducer::DefaultMQProducer()
+ : m_producerGroup(MixAll::DEFAULT_PRODUCER_GROUP),
+ m_createTopicKey(MixAll::DEFAULT_TOPIC),
+ m_defaultTopicQueueNums(4),
+ m_sendMsgTimeout(3000),
+ m_compressMsgBodyOverHowmuch(1024 * 4),
+ m_retryTimesWhenSendFailed(2),
+ m_retryAnotherBrokerWhenNotStoreOK(false),
+ m_maxMessageSize(1024 * 128),
+ m_compressLevel(5)
+{
+ m_pDefaultMQProducerImpl = new DefaultMQProducerImpl(this);
+}
+
+DefaultMQProducer::DefaultMQProducer(const std::string& producerGroup)
+ : m_producerGroup(producerGroup),
+ m_createTopicKey(MixAll::DEFAULT_TOPIC),
+ m_defaultTopicQueueNums(4),
+ m_sendMsgTimeout(3000),
+ m_compressMsgBodyOverHowmuch(1024 * 4),
+ m_retryTimesWhenSendFailed(2),
+ m_retryAnotherBrokerWhenNotStoreOK(false),
+ m_maxMessageSize(1024 * 128),
+ m_compressLevel(5)
+{
+ m_pDefaultMQProducerImpl = new DefaultMQProducerImpl(this);
+}
+
+DefaultMQProducer::~DefaultMQProducer()
+{
+ // memleak: maybe core
+ delete m_pDefaultMQProducerImpl;
+}
+
+
+void DefaultMQProducer::start()
+{
+ m_pDefaultMQProducerImpl->start();
+}
+
+void DefaultMQProducer::shutdown()
+{
+ m_pDefaultMQProducerImpl->shutdown();
+}
+
+std::vector<MessageQueue>* DefaultMQProducer::fetchPublishMessageQueues(const std::string& topic)
+{
+ return m_pDefaultMQProducerImpl->fetchPublishMessageQueues(topic);
+}
+
+SendResult DefaultMQProducer::send(Message& msg)
+{
+ return m_pDefaultMQProducerImpl->send(msg);
+}
+
+void DefaultMQProducer::send(Message& msg, SendCallback* pSendCallback)
+{
+ m_pDefaultMQProducerImpl->send(msg, pSendCallback);
+}
+
+void DefaultMQProducer::sendOneway(Message& msg)
+{
+ m_pDefaultMQProducerImpl->sendOneway(msg);
+}
+
+SendResult DefaultMQProducer::send(Message& msg, MessageQueue& mq)
+{
+ return m_pDefaultMQProducerImpl->send(msg, mq);
+}
+
+void DefaultMQProducer::send(Message& msg, MessageQueue& mq, SendCallback* pSendCallback)
+{
+ m_pDefaultMQProducerImpl->send(msg, mq, pSendCallback);
+}
+
+void DefaultMQProducer::sendOneway(Message& msg, MessageQueue& mq)
+{
+ m_pDefaultMQProducerImpl->sendOneway(msg, mq);
+}
+
+SendResult DefaultMQProducer::send(Message& msg, MessageQueueSelector* pSelector, void* arg)
+{
+ return m_pDefaultMQProducerImpl->send(msg, pSelector, arg);
+}
+
+void DefaultMQProducer::send(Message& msg,
+ MessageQueueSelector* pSelector,
+ void* arg,
+ SendCallback* pSendCallback)
+{
+ m_pDefaultMQProducerImpl->send(msg, pSelector, arg, pSendCallback);
+}
+
+void DefaultMQProducer::sendOneway(Message& msg, MessageQueueSelector* pSelector, void* arg)
+{
+ m_pDefaultMQProducerImpl->sendOneway(msg, pSelector, arg);
+}
+
+TransactionSendResult DefaultMQProducer::sendMessageInTransaction(Message& msg,
+ LocalTransactionExecuter* tranExecuter, void* arg)
+{
+ THROW_MQEXCEPTION(MQClientException,
+ "sendMessageInTransaction not implement, please use TransactionMQProducer class", -1);
+ TransactionSendResult result;
+
+ return result;
+}
+
+void DefaultMQProducer::createTopic(const std::string& key, const std::string& newTopic, int queueNum)
+{
+ m_pDefaultMQProducerImpl->createTopic(key, newTopic, queueNum);
+}
+
+long long DefaultMQProducer::searchOffset(const MessageQueue& mq, long long timestamp)
+{
+ return m_pDefaultMQProducerImpl->searchOffset(mq, timestamp);
+}
+
+long long DefaultMQProducer::maxOffset(const MessageQueue& mq)
+{
+ return m_pDefaultMQProducerImpl->maxOffset(mq);
+}
+
+long long DefaultMQProducer::minOffset(const MessageQueue& mq)
+{
+ return m_pDefaultMQProducerImpl->minOffset(mq);
+}
+
+long long DefaultMQProducer::earliestMsgStoreTime(const MessageQueue& mq)
+{
+ return m_pDefaultMQProducerImpl->earliestMsgStoreTime(mq);
+}
+
+MessageExt* DefaultMQProducer::viewMessage(const std::string& msgId)
+{
+ return m_pDefaultMQProducerImpl->viewMessage(msgId);
+}
+
+QueryResult DefaultMQProducer::queryMessage(const std::string& topic,
+ const std::string& key,
+ int maxNum,
+ long long begin,
+ long long end)
+{
+
+ return m_pDefaultMQProducerImpl->queryMessage(topic, key, maxNum, begin, end);
+}
+
+std::string DefaultMQProducer::getProducerGroup()
+{
+ return m_producerGroup;
+}
+
+void DefaultMQProducer::setProducerGroup(const std::string& producerGroup)
+{
+ m_producerGroup = producerGroup;
+}
+
+std::string DefaultMQProducer::getCreateTopicKey()
+{
+ return m_createTopicKey;
+}
+
+void DefaultMQProducer::setCreateTopicKey(const std::string& createTopicKey)
+{
+ m_createTopicKey = createTopicKey;
+}
+
+int DefaultMQProducer::getSendMsgTimeout()
+{
+ return m_sendMsgTimeout;
+}
+
+void DefaultMQProducer::setSendMsgTimeout(int sendMsgTimeout)
+{
+ m_sendMsgTimeout = sendMsgTimeout;
+}
+
+int DefaultMQProducer::getCompressMsgBodyOverHowmuch()
+{
+ return m_compressMsgBodyOverHowmuch;
+}
+
+void DefaultMQProducer::setCompressMsgBodyOverHowmuch(int compressMsgBodyOverHowmuch)
+{
+ m_compressMsgBodyOverHowmuch = compressMsgBodyOverHowmuch;
+}
+
+DefaultMQProducerImpl* DefaultMQProducer::getDefaultMQProducerImpl()
+{
+ return m_pDefaultMQProducerImpl;
+}
+
+bool DefaultMQProducer::isRetryAnotherBrokerWhenNotStoreOK()
+{
+ return m_retryAnotherBrokerWhenNotStoreOK;
+}
+
+void DefaultMQProducer::setRetryAnotherBrokerWhenNotStoreOK(bool retryAnotherBrokerWhenNotStoreOK)
+{
+ m_retryAnotherBrokerWhenNotStoreOK = retryAnotherBrokerWhenNotStoreOK;
+}
+
+int DefaultMQProducer::getMaxMessageSize()
+{
+ return m_maxMessageSize;
+}
+
+void DefaultMQProducer::setMaxMessageSize(int maxMessageSize)
+{
+ m_maxMessageSize = maxMessageSize;
+}
+
+int DefaultMQProducer::getDefaultTopicQueueNums()
+{
+ return m_defaultTopicQueueNums;
+}
+
+void DefaultMQProducer::setDefaultTopicQueueNums(int defaultTopicQueueNums)
+{
+ m_defaultTopicQueueNums = defaultTopicQueueNums;
+}
+
+int DefaultMQProducer::getRetryTimesWhenSendFailed()
+{
+ return m_retryTimesWhenSendFailed;
+}
+
+void DefaultMQProducer::setRetryTimesWhenSendFailed(int retryTimesWhenSendFailed)
+{
+ m_retryTimesWhenSendFailed = retryTimesWhenSendFailed;
+}
+
+int DefaultMQProducer::getCompressLevel()
+{
+ return m_compressLevel;
+}
+
+void DefaultMQProducer::setCompressLevel(int compressLevel)
+{
+ assert(compressLevel >= 0 && compressLevel <= 9 || compressLevel == -1);
+
+ m_compressLevel = compressLevel;
+}
+
+
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/producer/DefaultMQProducerImpl.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/producer/DefaultMQProducerImpl.cpp b/rocketmq-client4cpp/src/producer/DefaultMQProducerImpl.cpp
new file mode 100755
index 0000000..26b3f0b
--- /dev/null
+++ b/rocketmq-client4cpp/src/producer/DefaultMQProducerImpl.cpp
@@ -0,0 +1,932 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License")
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "DefaultMQProducerImpl.h"
+#include "DefaultMQProducer.h"
+#include "MessageExt.h"
+#include "QueryResult.h"
+#include "TopicPublishInfo.h"
+#include "MQClientException.h"
+#include "LocalTransactionExecuter.h"
+#include "SendMessageHook.h"
+#include "MQClientManager.h"
+#include "MQClientFactory.h"
+#include "Validators.h"
+#include "MQAdminImpl.h"
+#include "MQClientAPIImpl.h"
+#include "MessageSysFlag.h"
+#include "CommandCustomHeader.h"
+#include "KPRUtil.h"
+#include "MessageDecoder.h"
+#include "MessageQueueSelector.h"
+#include "MQProtos.h"
+#include "RemotingCommand.h"
+#include "UtilAll.h"
+
+
+
+namespace rmq
+{
+
+DefaultMQProducerImpl::DefaultMQProducerImpl(DefaultMQProducer
+ *pDefaultMQProducer)
+ : m_pDefaultMQProducer(pDefaultMQProducer),
+ m_serviceState(CREATE_JUST),
+ m_pMQClientFactory(NULL)
+{
+}
+
+DefaultMQProducerImpl::~DefaultMQProducerImpl()
+{
+ //delete m_pMQClientFactory;
+}
+
+
+void DefaultMQProducerImpl::start()
+{
+ start(true);
+}
+
+void DefaultMQProducerImpl::start(bool startFactory)
+{
+ RMQ_DEBUG("DefaultMQProducerImpl::start()");
+
+ switch (m_serviceState)
+ {
+ case CREATE_JUST:
+ {
+ RMQ_INFO("the producer [{%s}] start beginning.",
+ m_pDefaultMQProducer->getProducerGroup().c_str());
+
+ m_serviceState = START_FAILED;
+ checkConfig();
+
+ if (m_pDefaultMQProducer->getProducerGroup() !=
+ MixAll::CLIENT_INNER_PRODUCER_GROUP)
+ {
+ m_pDefaultMQProducer->changeInstanceNameToPID();
+ }
+
+ m_pMQClientFactory =
+ MQClientManager::getInstance()->getAndCreateMQClientFactory(
+ *m_pDefaultMQProducer);
+ bool registerOK = m_pMQClientFactory->registerProducer(
+ m_pDefaultMQProducer->getProducerGroup(), this);
+
+ if (!registerOK)
+ {
+ m_serviceState = CREATE_JUST;
+ THROW_MQEXCEPTION(MQClientException,
+ "The producer group[" + m_pDefaultMQProducer->getProducerGroup()
+ + "] has been created before, specify another name please.", -1);
+ }
+
+ m_topicPublishInfoTable[m_pDefaultMQProducer->getCreateTopicKey()] =
+ TopicPublishInfo();
+
+ if (startFactory)
+ {
+ m_pMQClientFactory->start();
+ }
+
+ RMQ_INFO("the producer [%s] start OK", m_pDefaultMQProducer->getProducerGroup().c_str());
+ m_serviceState = RUNNING;
+ }
+ break;
+
+ case RUNNING:
+ RMQ_ERROR("This client is already running.");
+
+ case START_FAILED:
+ RMQ_ERROR("This client failed to start previously.");
+
+ case SHUTDOWN_ALREADY:
+ RMQ_ERROR("This client has been shutted down.");
+ THROW_MQEXCEPTION(MQClientException,
+ "The producer service state not OK, maybe started once, ", -1);
+
+ default:
+ break;
+ }
+
+ m_pMQClientFactory->sendHeartbeatToAllBrokerWithLock();
+}
+
+void DefaultMQProducerImpl::shutdown()
+{
+ shutdown(true);
+}
+
+void DefaultMQProducerImpl::shutdown(bool shutdownFactory)
+{
+ RMQ_DEBUG("DefaultMQProducerImpl::shutdown()");
+
+ switch (m_serviceState)
+ {
+ case CREATE_JUST:
+ break;
+
+ case RUNNING:
+ m_pMQClientFactory->unregisterProducer(
+ m_pDefaultMQProducer->getProducerGroup());
+
+ if (shutdownFactory)
+ {
+ m_pMQClientFactory->shutdown();
+ }
+
+ RMQ_INFO("the producer [%s] shutdown OK", m_pDefaultMQProducer->getProducerGroup().c_str());
+ m_serviceState = SHUTDOWN_ALREADY;
+ break;
+
+ case SHUTDOWN_ALREADY:
+ break;
+
+ default:
+ break;
+ }
+}
+
+
+void DefaultMQProducerImpl::initTransactionEnv()
+{
+ //TODO
+}
+
+void DefaultMQProducerImpl::destroyTransactionEnv()
+{
+ //TODO
+}
+
+bool DefaultMQProducerImpl::hasHook()
+{
+ return !m_hookList.empty();
+}
+
+void DefaultMQProducerImpl::registerHook(SendMessageHook* pHook)
+{
+ m_hookList.push_back(pHook);
+}
+
+void DefaultMQProducerImpl::executeHookBefore(const SendMessageContext& context)
+{
+ std::list<SendMessageHook*>::iterator it = m_hookList.begin();
+
+ for (; it != m_hookList.end(); it++)
+ {
+ try
+ {
+ (*it)->sendMessageBefore(context);
+ }
+ catch (...)
+ {
+ RMQ_WARN("sendMessageBefore exception");
+ }
+ }
+}
+
+void DefaultMQProducerImpl::executeHookAfter(const SendMessageContext& context)
+{
+ std::list<SendMessageHook*>::iterator it = m_hookList.begin();
+
+ for (; it != m_hookList.end(); it++)
+ {
+ try
+ {
+ (*it)->sendMessageAfter(context);
+ }
+ catch (...)
+ {
+ RMQ_WARN("sendMessageAfter exception");
+ }
+ }
+}
+
+
+std::set<std::string> DefaultMQProducerImpl::getPublishTopicList()
+{
+ kpr::ScopedRLock<kpr::RWMutex> lock(m_topicPublishInfoTableLock);
+ std::set<std::string> toplist;
+ std::map<std::string, TopicPublishInfo>::iterator it =
+ m_topicPublishInfoTable.begin();
+ for (; it != m_topicPublishInfoTable.end(); it++)
+ {
+ toplist.insert(it->first);
+ }
+
+ return toplist;
+}
+
+bool DefaultMQProducerImpl::isPublishTopicNeedUpdate(const std::string& topic)
+{
+ kpr::ScopedRLock<kpr::RWMutex> lock(m_topicPublishInfoTableLock);
+ std::map<std::string, TopicPublishInfo>::iterator it =
+ m_topicPublishInfoTable.find(topic);
+ if (it != m_topicPublishInfoTable.end())
+ {
+ return !it->second.ok();
+ }
+
+ return true;
+}
+
+void DefaultMQProducerImpl::checkTransactionState(const std::string& addr, //
+ const MessageExt& msg, //
+ const CheckTransactionStateRequestHeader& checkRequestHeader)
+{
+ //TODO
+}
+
+void DefaultMQProducerImpl::updateTopicPublishInfo(const std::string& topic,
+ TopicPublishInfo& info)
+{
+ {
+ kpr::ScopedWLock<kpr::RWMutex> lock(m_topicPublishInfoTableLock);
+ std::map<std::string, TopicPublishInfo>::iterator it =
+ m_topicPublishInfoTable.find(topic);
+ if (it != m_topicPublishInfoTable.end())
+ {
+ info.getSendWhichQueue() = it->second.getSendWhichQueue();
+ RMQ_INFO("updateTopicPublishInfo prev is not null, %s", it->second.toString().c_str());
+ }
+ m_topicPublishInfoTable[topic] = info;
+ }
+}
+
+void DefaultMQProducerImpl::createTopic(const std::string& key,
+ const std::string& newTopic, int queueNum)
+{
+ makeSureStateOK();
+ Validators::checkTopic(newTopic);
+
+ m_pMQClientFactory->getMQAdminImpl()->createTopic(key, newTopic, queueNum);
+}
+
+std::vector<MessageQueue>* DefaultMQProducerImpl::fetchPublishMessageQueues(
+ const std::string& topic)
+{
+ makeSureStateOK();
+ return m_pMQClientFactory->getMQAdminImpl()->fetchPublishMessageQueues(topic);
+}
+
+long long DefaultMQProducerImpl::searchOffset(const MessageQueue& mq,
+ long long timestamp)
+{
+ makeSureStateOK();
+ return m_pMQClientFactory->getMQAdminImpl()->searchOffset(mq, timestamp);
+}
+
+long long DefaultMQProducerImpl::maxOffset(const MessageQueue& mq)
+{
+ makeSureStateOK();
+ return m_pMQClientFactory->getMQAdminImpl()->maxOffset(mq);
+}
+
+long long DefaultMQProducerImpl::minOffset(const MessageQueue& mq)
+{
+ makeSureStateOK();
+ return m_pMQClientFactory->getMQAdminImpl()->minOffset(mq);
+}
+
+long long DefaultMQProducerImpl::earliestMsgStoreTime(const MessageQueue& mq)
+{
+ makeSureStateOK();
+ return m_pMQClientFactory->getMQAdminImpl()->earliestMsgStoreTime(mq);
+}
+
+MessageExt* DefaultMQProducerImpl::viewMessage(const std::string& msgId)
+{
+ makeSureStateOK();
+ return m_pMQClientFactory->getMQAdminImpl()->viewMessage(msgId);
+}
+
+QueryResult DefaultMQProducerImpl::queryMessage(const std::string& topic,
+ const std::string& key, int maxNum, long long begin, long long end)
+{
+ makeSureStateOK();
+ return m_pMQClientFactory->getMQAdminImpl()->queryMessage(topic, key, maxNum,
+ begin, end);
+}
+
+
+/**
+ * DEFAULT ASYNC -------------------------------------------------------
+ */
+void DefaultMQProducerImpl::send(Message& msg, SendCallback* pSendCallback)
+{
+ send(msg, pSendCallback, m_pDefaultMQProducer->getSendMsgTimeout());
+}
+void DefaultMQProducerImpl::send(Message& msg, SendCallback* pSendCallback, int timeout)
+{
+ try
+ {
+ sendDefaultImpl(msg, ASYNC, pSendCallback, timeout);
+ }
+ catch (MQBrokerException& e)
+ {
+ THROW_MQEXCEPTION(MQClientException, std::string("unknow exception: ") + e.what(), -1);
+ }
+}
+
+
+/**
+ * DEFAULT ONEWAY -------------------------------------------------------
+ */
+void DefaultMQProducerImpl::sendOneway(Message& msg)
+{
+ try
+ {
+ sendDefaultImpl(msg, ONEWAY, NULL, m_pDefaultMQProducer->getSendMsgTimeout());
+ }
+ catch (MQBrokerException& e)
+ {
+ THROW_MQEXCEPTION(MQClientException, std::string("unknow exception: ") + e.what(), -1);
+ }
+}
+
+
+/**
+ * KERNEL SYNC -------------------------------------------------------
+ */
+SendResult DefaultMQProducerImpl::send(Message& msg, MessageQueue& mq)
+{
+ return send(msg, mq, m_pDefaultMQProducer->getSendMsgTimeout());
+}
+SendResult DefaultMQProducerImpl::send(Message& msg, MessageQueue& mq, int timeout)
+{
+ makeSureStateOK();
+ Validators::checkMessage(msg, m_pDefaultMQProducer);
+
+ if (msg.getTopic() != mq.getTopic())
+ {
+ THROW_MQEXCEPTION(MQClientException, "message's topic not equal mq's topic", -1);
+ }
+
+ return sendKernelImpl(msg, mq, SYNC, NULL, timeout);
+}
+
+
+/**
+ * KERNEL ASYNC -------------------------------------------------------
+ */
+void DefaultMQProducerImpl::send(Message& msg, MessageQueue& mq,
+ SendCallback* pSendCallback)
+{
+ return send(msg, mq, pSendCallback, m_pDefaultMQProducer->getSendMsgTimeout());
+}
+void DefaultMQProducerImpl::send(Message& msg, MessageQueue& mq,
+ SendCallback* pSendCallback, int timeout)
+{
+ makeSureStateOK();
+ Validators::checkMessage(msg, m_pDefaultMQProducer);
+
+ if (msg.getTopic() != mq.getTopic())
+ {
+ THROW_MQEXCEPTION(MQClientException, "message's topic not equal mq's topic", -1);
+ }
+
+ try
+ {
+ sendKernelImpl(msg, mq, ASYNC, pSendCallback, timeout);
+ }
+ catch (MQBrokerException& e)
+ {
+ THROW_MQEXCEPTION(MQClientException, std::string("unknow exception: ") + e.what(), -1);
+ }
+}
+
+/**
+ * KERNEL ONEWAY -------------------------------------------------------
+ */
+void DefaultMQProducerImpl::sendOneway(Message& msg, MessageQueue& mq)
+{
+ makeSureStateOK();
+ Validators::checkMessage(msg, m_pDefaultMQProducer);
+
+ if (msg.getTopic() != mq.getTopic())
+ {
+ THROW_MQEXCEPTION(MQClientException, "message's topic not equal mq's topic", -1);
+ }
+
+ try
+ {
+ sendKernelImpl(msg, mq, ONEWAY, NULL, m_pDefaultMQProducer->getSendMsgTimeout());
+ }
+ catch (MQBrokerException& e)
+ {
+ THROW_MQEXCEPTION(MQClientException, std::string("unknow exception: ") + e.what(), -1);
+ }
+}
+
+
+/**
+ * SELECT SYNC -------------------------------------------------------
+ */
+SendResult DefaultMQProducerImpl::send(Message& msg,
+ MessageQueueSelector* pSelector, void* arg)
+{
+ return send(msg, pSelector, arg, m_pDefaultMQProducer->getSendMsgTimeout());
+}
+SendResult DefaultMQProducerImpl::send(Message& msg,
+ MessageQueueSelector* pSelector, void* arg, int timeout)
+{
+ return sendSelectImpl(msg, pSelector, arg, SYNC, NULL, timeout);
+}
+
+
+/**
+ * SELECT ASYNC -------------------------------------------------------
+ */
+void DefaultMQProducerImpl::send(Message& msg,
+ MessageQueueSelector* pSelector,
+ void* arg,
+ SendCallback* pSendCallback)
+{
+ return send(msg, pSelector, arg, pSendCallback, m_pDefaultMQProducer->getSendMsgTimeout());
+}
+void DefaultMQProducerImpl::send(Message& msg,
+ MessageQueueSelector* pSelector,
+ void* arg,
+ SendCallback* pSendCallback,
+ int timeout)
+{
+ try
+ {
+ sendSelectImpl(msg, pSelector, arg, ASYNC, pSendCallback, timeout);
+ }
+ catch (MQBrokerException& e)
+ {
+ THROW_MQEXCEPTION(MQClientException, std::string("unknow exception: ") + e.what(), -1);
+ }
+}
+
+
+/**
+ * SELECT ONEWAY -------------------------------------------------------
+ */
+void DefaultMQProducerImpl::sendOneway(Message& msg,
+ MessageQueueSelector* pSelector, void* arg)
+{
+ try
+ {
+ sendSelectImpl(msg, pSelector, arg, ONEWAY, NULL,
+ m_pDefaultMQProducer->getSendMsgTimeout());
+ }
+ catch (MQBrokerException& e)
+ {
+ THROW_MQEXCEPTION(MQClientException, std::string("unknow exception: ") + e.what(), -1);
+ }
+}
+
+
+/*
+ * Send with Transaction
+ */
+TransactionSendResult DefaultMQProducerImpl::sendMessageInTransaction(
+ Message& msg,
+ LocalTransactionExecuter* tranExecuter, void* arg)
+{
+ //TODO
+ TransactionSendResult result;
+ return result;
+}
+
+void DefaultMQProducerImpl::endTransaction(//
+ SendResult sendResult, //
+ LocalTransactionState localTransactionState, //
+ MQClientException localException)
+{
+ //TODO
+}
+
+/**
+ * DEFAULT SYNC -------------------------------------------------------
+ */
+SendResult DefaultMQProducerImpl::send(Message& msg)
+{
+ return send(msg, m_pDefaultMQProducer->getSendMsgTimeout());
+}
+SendResult DefaultMQProducerImpl::send(Message& msg, int timeout)
+{
+ return sendDefaultImpl(msg, SYNC, NULL, timeout);
+}
+
+
+std::map<std::string, TopicPublishInfo> DefaultMQProducerImpl::getTopicPublishInfoTable()
+{
+ return m_topicPublishInfoTable;
+}
+
+MQClientFactory* DefaultMQProducerImpl::getMQClientFactory()
+{
+ return m_pMQClientFactory;
+}
+
+int DefaultMQProducerImpl::getZipCompressLevel()
+{
+ return m_zipCompressLevel;
+}
+
+void DefaultMQProducerImpl::setZipCompressLevel(int zipCompressLevel)
+{
+ m_zipCompressLevel = zipCompressLevel;
+}
+
+ServiceState DefaultMQProducerImpl::getServiceState() {
+ return m_serviceState;
+}
+
+
+void DefaultMQProducerImpl::setServiceState(ServiceState serviceState) {
+ m_serviceState = serviceState;
+}
+
+
+SendResult DefaultMQProducerImpl::sendDefaultImpl(Message& msg,
+ CommunicationMode communicationMode,
+ SendCallback* pSendCallback,
+ int timeout)
+{
+ makeSureStateOK();
+ Validators::checkMessage(msg, m_pDefaultMQProducer);
+
+ long long maxTimeout = m_pDefaultMQProducer->getSendMsgTimeout() + 1000;
+ long long beginTimestamp = KPRUtil::GetCurrentTimeMillis();
+ long long endTimestamp = beginTimestamp;
+ TopicPublishInfo& topicPublishInfo = tryToFindTopicPublishInfo(msg.getTopic());
+ SendResult sendResult;
+
+ if (topicPublishInfo.ok())
+ {
+ MessageQueue* mq = NULL;
+
+ int times = 0;
+ int timesTotal = 1 + m_pDefaultMQProducer->getRetryTimesWhenSendFailed();
+ std::vector<std::string> brokersSent;
+ for (; times < timesTotal && int(endTimestamp - beginTimestamp) < maxTimeout; times++)
+ {
+ std::string lastBrokerName = (NULL == mq) ? "" : mq->getBrokerName();
+ MessageQueue* tmpmq = topicPublishInfo.selectOneMessageQueue(lastBrokerName);
+
+ if (tmpmq != NULL)
+ {
+ mq = tmpmq;
+ brokersSent.push_back(mq->getBrokerName());
+
+ try
+ {
+ sendResult = sendKernelImpl(msg, *mq, communicationMode, pSendCallback, timeout);
+ endTimestamp = KPRUtil::GetCurrentTimeMillis();
+
+ switch (communicationMode)
+ {
+ case ASYNC:
+ return sendResult;
+
+ case ONEWAY:
+ return sendResult;
+
+ case SYNC:
+ if (sendResult.getSendStatus() != SEND_OK)
+ {
+ if (m_pDefaultMQProducer->isRetryAnotherBrokerWhenNotStoreOK())
+ {
+ continue;
+ }
+ }
+
+ return sendResult;
+
+ default:
+ break;
+ }
+ }
+ catch (RemotingException& e)
+ {
+ endTimestamp = KPRUtil::GetCurrentTimeMillis();
+ continue;
+ }
+ catch (MQClientException& e)
+ {
+ endTimestamp = KPRUtil::GetCurrentTimeMillis();
+ continue;
+ }
+ catch (MQBrokerException& e)
+ {
+ endTimestamp = KPRUtil::GetCurrentTimeMillis();
+
+ switch (e.GetError())
+ {
+ case TOPIC_NOT_EXIST_VALUE:
+ case SERVICE_NOT_AVAILABLE_VALUE:
+ case SYSTEM_ERROR_VALUE:
+ case NO_PERMISSION_VALUE:
+ case NO_BUYER_ID_VALUE:
+ case NOT_IN_CURRENT_UNIT_VALUE:
+ continue;
+ default:
+ if (sendResult.hasResult())
+ {
+ return sendResult;
+ }
+ throw;
+ }
+ }
+ catch (InterruptedException& e)
+ {
+ endTimestamp = KPRUtil::GetCurrentTimeMillis();
+ throw;
+ }
+ }
+ else
+ {
+ break;
+ }
+ } // end of for
+
+ std::string info = RocketMQUtil::str2fmt("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",
+ times, int(endTimestamp - beginTimestamp), msg.getTopic().c_str(), UtilAll::toString(brokersSent).c_str());
+ RMQ_WARN("%s", info.c_str());
+ THROW_MQEXCEPTION(MQClientException, info, -1);
+ return sendResult;
+ }
+
+ std::vector<std::string> nsList =
+ getMQClientFactory()->getMQClientAPIImpl()->getNameServerAddressList();
+ if (nsList.empty())
+ {
+ THROW_MQEXCEPTION(MQClientException, "No name server address, please set it", -1);
+ }
+
+ THROW_MQEXCEPTION(MQClientException, std::string("No route info of this topic, ") + msg.getTopic(), -1);
+}
+
+SendResult DefaultMQProducerImpl::sendKernelImpl(Message& msg,
+ const MessageQueue& mq,
+ CommunicationMode communicationMode,
+ SendCallback* sendCallback,
+ int timeout)
+{
+ std::string brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
+ if (brokerAddr.empty())
+ {
+ tryToFindTopicPublishInfo(mq.getTopic());
+ brokerAddr = m_pMQClientFactory->findBrokerAddressInPublish(mq.getBrokerName());
+ }
+
+ SendMessageContext context;
+ if (!brokerAddr.empty())
+ {
+ try
+ {
+ int sysFlag = 0;
+
+ if (tryToCompressMessage(msg))
+ {
+ sysFlag |= MessageSysFlag::CompressedFlag;
+ }
+
+ std::string tranMsg = msg.getProperty(Message::PROPERTY_TRANSACTION_PREPARED);
+ if (!tranMsg.empty() && tranMsg == "true")
+ {
+ sysFlag |= MessageSysFlag::TransactionPreparedType;
+ }
+
+ // ִ��hook
+ if (hasHook())
+ {
+ context.producerGroup = (m_pDefaultMQProducer->getProducerGroup());
+ context.communicationMode = (communicationMode);
+ context.brokerAddr = (brokerAddr);
+ context.msg = (msg);
+ context.mq = (mq);
+ executeHookBefore(context);
+ }
+
+ SendMessageRequestHeader* requestHeader = new SendMessageRequestHeader();
+ requestHeader->producerGroup = (m_pDefaultMQProducer->getProducerGroup());
+ requestHeader->topic = (msg.getTopic());
+ requestHeader->defaultTopic = (m_pDefaultMQProducer->getCreateTopicKey());
+ requestHeader->defaultTopicQueueNums = (m_pDefaultMQProducer->getDefaultTopicQueueNums());
+ requestHeader->queueId = (mq.getQueueId());
+ requestHeader->sysFlag = (sysFlag);
+ requestHeader->bornTimestamp = (KPRUtil::GetCurrentTimeMillis());
+ requestHeader->flag = (msg.getFlag());
+ requestHeader->properties = (MessageDecoder::messageProperties2String(msg.getProperties()));
+ requestHeader->reconsumeTimes = 0;
+
+ if (requestHeader->topic.find(MixAll::RETRY_GROUP_TOPIC_PREFIX) == 0)
+ {
+ std::string reconsumeTimes = msg.getProperty(Message::PROPERTY_RECONSUME_TIME);
+ if (!reconsumeTimes.empty())
+ {
+ requestHeader->reconsumeTimes = int(UtilAll::str2ll(reconsumeTimes.c_str()));
+ msg.clearProperty(Message::PROPERTY_RECONSUME_TIME);
+ }
+
+ /*
+ 3.5.8 new features
+ std::string maxReconsumeTimes = msg.getProperty(Message::PROPERTY_MAX_RECONSUME_TIMES);
+ if (!maxReconsumeTimes.empty())
+ {
+ requestHeader->maxReconsumeTimes = int(UtilAll::str2ll(maxReconsumeTimes.c_str()));
+ msg.clearProperty(Message::PROPERTY_MAX_RECONSUME_TIMES);
+ }
+ */
+ }
+
+ SendResult sendResult = m_pMQClientFactory->getMQClientAPIImpl()->sendMessage(
+ brokerAddr,
+ mq.getBrokerName(),
+ msg,
+ requestHeader,
+ timeout,
+ communicationMode,
+ sendCallback
+ );
+
+ if (hasHook())
+ {
+ context.sendResult = (sendResult);
+ executeHookAfter(context);
+ }
+
+ return sendResult;
+ }
+ catch (RemotingException& e)
+ {
+ if (hasHook())
+ {
+ context.pException = (&e);
+ executeHookAfter(context);
+ }
+ RMQ_WARN("sendKernelImpl exception: %s, msg: %s", e.what(), msg.toString().c_str());
+ throw;
+ }
+ catch (MQBrokerException& e)
+ {
+ if (hasHook())
+ {
+ context.pException = (&e);
+ executeHookAfter(context);
+ }
+ RMQ_WARN("sendKernelImpl exception: %s, msg: %s", e.what(), msg.toString().c_str());
+ throw;
+ }
+ catch (InterruptedException& e)
+ {
+ if (hasHook())
+ {
+ context.pException = (&e);
+ executeHookAfter(context);
+ }
+ RMQ_WARN("sendKernelImpl exception: %s, msg: %s", e.what(), msg.toString().c_str());
+ throw;
+ }
+ }
+
+ THROW_MQEXCEPTION(MQClientException, std::string("The broker[") + mq.getBrokerName() + "] not exist", -1);
+}
+
+SendResult DefaultMQProducerImpl::sendSelectImpl(Message& msg,
+ MessageQueueSelector* selector,
+ void* pArg,
+ CommunicationMode communicationMode,
+ SendCallback* sendCallback,
+ int timeout)
+{
+ makeSureStateOK();
+ Validators::checkMessage(msg, m_pDefaultMQProducer);
+
+ SendResult result;
+ TopicPublishInfo& topicPublishInfo = tryToFindTopicPublishInfo(msg.getTopic());
+ SendResult sendResult;
+
+ if (topicPublishInfo.ok())
+ {
+ MessageQueue* mq = NULL;
+
+ try
+ {
+ mq = selector->select(topicPublishInfo.getMessageQueueList(), msg, pArg);
+ }
+ catch (std::exception& e)
+ {
+ THROW_MQEXCEPTION(MQClientException,
+ std::string("select message queue throwed exception, ") + e.what(), -1);
+ }
+ catch (...)
+ {
+ THROW_MQEXCEPTION(MQClientException, "select message queue throwed exception, ", -1);
+ }
+
+ if (mq != NULL)
+ {
+ return sendKernelImpl(msg, *mq, communicationMode, sendCallback, timeout);
+ }
+ else
+ {
+ THROW_MQEXCEPTION(MQClientException, "select message queue return null", -1);
+ }
+ }
+
+ THROW_MQEXCEPTION(MQClientException, std::string("No route info of this topic, ") + msg.getTopic(), -1);
+}
+
+void DefaultMQProducerImpl::makeSureStateOK()
+{
+ if (m_serviceState != RUNNING)
+ {
+ THROW_MQEXCEPTION(MQClientException, "The producer service state not OK, ", -1);
+ }
+}
+
+void DefaultMQProducerImpl::checkConfig()
+{
+ Validators::checkGroup(m_pDefaultMQProducer->getProducerGroup());
+
+ if (m_pDefaultMQProducer->getProducerGroup().empty())
+ {
+ THROW_MQEXCEPTION(MQClientException, "producerGroup is null", -1);
+ }
+
+ if (m_pDefaultMQProducer->getProducerGroup() == MixAll::DEFAULT_PRODUCER_GROUP)
+ {
+ THROW_MQEXCEPTION(MQClientException,
+ std::string("producerGroup can not equal [") + MixAll::DEFAULT_PRODUCER_GROUP + "], please specify another one",
+ -1);
+ }
+}
+
+TopicPublishInfo& DefaultMQProducerImpl::tryToFindTopicPublishInfo(
+ const std::string& topic)
+{
+ std::map<std::string, TopicPublishInfo>::iterator it;
+ {
+ kpr::ScopedRLock<kpr::RWMutex> lock(m_topicPublishInfoTableLock);
+ it = m_topicPublishInfoTable.find(topic);
+ }
+
+ if (it == m_topicPublishInfoTable.end() || !it->second.ok())
+ {
+ {
+ kpr::ScopedWLock<kpr::RWMutex> lock(m_topicPublishInfoTableLock);
+ m_topicPublishInfoTable[topic] = TopicPublishInfo();
+ }
+
+ m_pMQClientFactory->updateTopicRouteInfoFromNameServer(topic);
+
+ {
+ kpr::ScopedRLock<kpr::RWMutex> lock(m_topicPublishInfoTableLock);
+ it = m_topicPublishInfoTable.find(topic);
+ }
+ }
+
+ if (it != m_topicPublishInfoTable.end()
+ && (it->second.ok() || it->second.isHaveTopicRouterInfo()))
+ {
+ return (it->second);
+ }
+ else
+ {
+ m_pMQClientFactory->updateTopicRouteInfoFromNameServer(topic, true,
+ m_pDefaultMQProducer);
+ {
+ kpr::ScopedRLock<kpr::RWMutex> lock(m_topicPublishInfoTableLock);
+ it = m_topicPublishInfoTable.find(topic);
+ }
+ return (it->second);
+ }
+}
+
+bool DefaultMQProducerImpl::tryToCompressMessage(Message& msg)
+{
+ if (msg.getBodyLen() >= m_pDefaultMQProducer->getCompressMsgBodyOverHowmuch())
+ {
+ if (msg.tryToCompress(m_pDefaultMQProducer->getCompressLevel()))
+ {
+ return true;
+ }
+ }
+
+ return false;
+}
+
+TransactionCheckListener* DefaultMQProducerImpl::checkListener()
+{
+ return NULL;
+}
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/producer/DefaultMQProducerImpl.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/producer/DefaultMQProducerImpl.h b/rocketmq-client4cpp/src/producer/DefaultMQProducerImpl.h
new file mode 100755
index 0000000..3df914c
--- /dev/null
+++ b/rocketmq-client4cpp/src/producer/DefaultMQProducerImpl.h
@@ -0,0 +1,205 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __DEFAULTMQPRODUCERIMPL_H__
+#define __DEFAULTMQPRODUCERIMPL_H__
+
+#include <list>
+#include <vector>
+#include "MQProducerInner.h"
+#include "QueryResult.h"
+#include "ServiceState.h"
+#include "CommunicationMode.h"
+#include "SendResult.h"
+#include "MQClientException.h"
+#include "Mutex.h"
+#include "ScopedLock.h"
+
+
+namespace rmq
+{
+ class DefaultMQProducer;
+ class SendMessageHook;
+ class SendMessageContext;
+ class MessageQueue;
+ class MessageExt;
+ class SendCallback;
+ class MessageQueueSelector;
+ class MQClientFactory;
+ class MQClientException;
+ class RemotingException;
+ class MQBrokerException;
+ class InterruptedException;
+ class LocalTransactionExecuter;
+
+
+ class DefaultMQProducerImpl : public MQProducerInner
+ {
+ public:
+ DefaultMQProducerImpl(DefaultMQProducer* pDefaultMQProducer);
+ ~DefaultMQProducerImpl();
+ void initTransactionEnv();
+ void destroyTransactionEnv();
+
+ bool hasHook();
+ void registerHook(SendMessageHook* pHook);
+ void executeHookBefore(const SendMessageContext& context);
+ void executeHookAfter(const SendMessageContext& context);
+
+ void start();
+ void start(bool startFactory);
+ void shutdown();
+ void shutdown(bool shutdownFactory);
+
+ std::set<std::string> getPublishTopicList();
+ bool isPublishTopicNeedUpdate(const std::string& topic);
+
+ void checkTransactionState(const std::string& addr,
+ const MessageExt& msg,
+ const CheckTransactionStateRequestHeader& checkRequestHeader);
+
+ void updateTopicPublishInfo(const std::string& topic, TopicPublishInfo& info);
+ virtual TransactionCheckListener* checkListener();
+
+ void createTopic(const std::string& key, const std::string& newTopic, int queueNum);
+ std::vector<MessageQueue>* fetchPublishMessageQueues(const std::string& topic);
+
+ long long searchOffset(const MessageQueue& mq, long long timestamp);
+ long long maxOffset(const MessageQueue& mq);
+ long long minOffset(const MessageQueue& mq);
+
+ long long earliestMsgStoreTime(const MessageQueue& mq);
+
+ MessageExt* viewMessage(const std::string& msgId);
+ QueryResult queryMessage(const std::string& topic,
+ const std::string& key,
+ int maxNum,
+ long long begin,
+ long long end);
+
+ /**
+ * DEFAULT ASYNC -------------------------------------------------------
+ */
+ void send(Message& msg, SendCallback* sendCallback);
+ void send(Message& msg, SendCallback* sendCallback, int timeout);
+
+ /**
+ * DEFAULT ONEWAY -------------------------------------------------------
+ */
+ void sendOneway(Message& msg);
+
+ /**
+ * KERNEL SYNC -------------------------------------------------------
+ */
+ SendResult send(Message& msg, MessageQueue& mq);
+ SendResult send(Message& msg, MessageQueue& mq, int timeout);
+
+ /**
+ * KERNEL ASYNC -------------------------------------------------------
+ */
+ void send(Message& msg, MessageQueue& mq, SendCallback* sendCallback);
+ void send(Message& msg, MessageQueue& mq, SendCallback* sendCallback, int timeout);
+
+ /**
+ * KERNEL ONEWAY -------------------------------------------------------
+ */
+ void sendOneway(Message& msg, MessageQueue& mq);
+
+ /**
+ * SELECT SYNC -------------------------------------------------------
+ */
+ SendResult send(Message& msg, MessageQueueSelector* selector, void* arg);
+ SendResult send(Message& msg, MessageQueueSelector* selector, void* arg, int timeout);
+
+ /**
+ * SELECT ASYNC -------------------------------------------------------
+ */
+ void send(Message& msg, MessageQueueSelector* selector, void* arg, SendCallback* sendCallback);
+ void send(Message& msg, MessageQueueSelector* selector, void* arg, SendCallback* sendCallback, int timeout);
+
+ /**
+ * SELECT ONEWAY -------------------------------------------------------
+ */
+ void sendOneway(Message& msg, MessageQueueSelector* selector, void* arg);
+
+ /**
+ * SEND with Transaction
+ */
+ TransactionSendResult sendMessageInTransaction(Message& msg, LocalTransactionExecuter* tranExecuter, void* arg);
+
+ /**
+ * DEFAULT SYNC -------------------------------------------------------
+ */
+ SendResult send(Message& msg);
+ SendResult send(Message& msg, int timeout);
+
+ std::map<std::string, TopicPublishInfo> getTopicPublishInfoTable();
+
+ MQClientFactory* getMQClientFactory();
+
+ int getZipCompressLevel();
+ void setZipCompressLevel(int zipCompressLevel);
+
+ ServiceState getServiceState();
+ void setServiceState(ServiceState serviceState);
+
+ private:
+ SendResult sendSelectImpl(Message& msg,
+ MessageQueueSelector* selector,
+ void* pArg,
+ CommunicationMode communicationMode,
+ SendCallback* sendCallback,
+ int timeout);
+
+ SendResult sendDefaultImpl(Message& msg,
+ CommunicationMode communicationMode,
+ SendCallback* pSendCallback,
+ int timeout);
+
+ SendResult sendKernelImpl(Message& msg,
+ const MessageQueue& mq,
+ CommunicationMode communicationMode,
+ SendCallback* pSendCallback,
+ int timeout);
+
+ void endTransaction(SendResult sendResult,
+ LocalTransactionState localTransactionState,
+ MQClientException localException);
+
+ void makeSureStateOK();
+ void checkConfig();
+
+ TopicPublishInfo& tryToFindTopicPublishInfo(const std::string& topic) ;
+ bool tryToCompressMessage(Message& msg);
+
+ protected:
+ //TODO transaction imp
+
+ private:
+ int m_zipCompressLevel;// message compress level, default is 5
+
+ DefaultMQProducer* m_pDefaultMQProducer;
+
+ std::map<std::string, TopicPublishInfo> m_topicPublishInfoTable;
+ kpr::RWMutex m_topicPublishInfoTableLock;
+
+ ServiceState m_serviceState;
+ MQClientFactory* m_pMQClientFactory;
+ std::list<SendMessageHook*> m_hookList;
+ };
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/producer/LocalTransactionExecuter.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/producer/LocalTransactionExecuter.h b/rocketmq-client4cpp/src/producer/LocalTransactionExecuter.h
new file mode 100755
index 0000000..a124884
--- /dev/null
+++ b/rocketmq-client4cpp/src/producer/LocalTransactionExecuter.h
@@ -0,0 +1,31 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,kangliq@163.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __LOCALTRANSACTIONEXECUTER_H__
+#define __LOCALTRANSACTIONEXECUTER_H__
+
+#include "SendResult.h"
+
+namespace rmq
+{
+ class LocalTransactionExecuter
+ {
+ public:
+ virtual~LocalTransactionExecuter() {}
+ virtual LocalTransactionState executeLocalTransactionBranch(Message& msg, void* arg) = 0;
+ };
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/producer/MQProducerInner.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/producer/MQProducerInner.h b/rocketmq-client4cpp/src/producer/MQProducerInner.h
new file mode 100755
index 0000000..56194dc
--- /dev/null
+++ b/rocketmq-client4cpp/src/producer/MQProducerInner.h
@@ -0,0 +1,44 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __MQPRODUCERINNER_H__
+#define __MQPRODUCERINNER_H__
+
+#include <string>
+#include <set>
+
+namespace rmq
+{
+ class TransactionCheckListener;
+ class MessageExt;
+ class CheckTransactionStateRequestHeader;
+ class TopicPublishInfo;
+
+ class MQProducerInner
+ {
+ public:
+ virtual ~MQProducerInner() {}
+ virtual std::set<std::string> getPublishTopicList() = 0;
+ virtual bool isPublishTopicNeedUpdate(const std::string& topic) = 0;
+ virtual TransactionCheckListener* checkListener() = 0;
+ virtual void checkTransactionState(const std::string& addr, //
+ const MessageExt& msg, //
+ const CheckTransactionStateRequestHeader& checkRequestHeader) = 0;
+ virtual void updateTopicPublishInfo(const std::string& topic, TopicPublishInfo& info) = 0;
+ };
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/producer/MessageQueueSelector.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/producer/MessageQueueSelector.h b/rocketmq-client4cpp/src/producer/MessageQueueSelector.h
new file mode 100755
index 0000000..6d5ac48
--- /dev/null
+++ b/rocketmq-client4cpp/src/producer/MessageQueueSelector.h
@@ -0,0 +1,96 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __MESSAGEQUEUESELECTOR_H__
+#define __MESSAGEQUEUESELECTOR_H__
+
+#include <stdlib.h>
+#include <stdio.h>
+#include <time.h>
+#include <math.h>
+#include <set>
+#include <string>
+#include <vector>
+
+#include "MessageQueue.h"
+#include "UtilAll.h"
+
+namespace rmq
+{
+ class Message;
+
+ class MessageQueueSelector
+ {
+ public:
+ virtual ~MessageQueueSelector() {}
+ virtual MessageQueue* select(std::vector<MessageQueue>& mqs, const Message& msg, void* arg) = 0;
+ };
+
+ class SelectMessageQueueByRandoom : public MessageQueueSelector
+ {
+ public:
+ MessageQueue* select(std::vector<MessageQueue>& mqs, const Message& msg, void* arg)
+ {
+ srand((unsigned)time(NULL));
+ int value = rand();
+ value = value % mqs.size();
+ return &(mqs.at(value));
+ }
+ };
+
+ class SelectMessageQueueByHash : public MessageQueueSelector
+ {
+ public:
+ MessageQueue* select(std::vector<MessageQueue>& mqs, const Message& msg, void* arg)
+ {
+ std::string* sArg = (std::string*)arg;
+ int value = UtilAll::hashCode(sArg->c_str(), sArg->size());
+ if (value < 0)
+ {
+ value = abs(value);
+ }
+
+ value = value % mqs.size();
+ return &(mqs.at(value));
+ }
+ };
+
+
+ class SelectMessageQueueByMachineRoom : public MessageQueueSelector
+ {
+ public:
+ MessageQueue* select(std::vector<MessageQueue>& mqs, const Message& msg, void* arg)
+ {
+ // TODO Auto-generated method stub
+ return NULL;
+ }
+
+ std::set<std::string> getConsumeridcs()
+ {
+ return m_consumeridcs;
+ }
+
+ void setConsumeridcs(const std::set<std::string>& consumeridcs)
+ {
+ m_consumeridcs = consumeridcs;
+ }
+
+ private:
+ std::set<std::string> m_consumeridcs;
+ };
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/producer/ProducerInvokeCallback.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/producer/ProducerInvokeCallback.cpp b/rocketmq-client4cpp/src/producer/ProducerInvokeCallback.cpp
new file mode 100755
index 0000000..573db95
--- /dev/null
+++ b/rocketmq-client4cpp/src/producer/ProducerInvokeCallback.cpp
@@ -0,0 +1,101 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "ProducerInvokeCallback.h"
+#include "ResponseFuture.h"
+#include "SendResult.h"
+#include "MQClientAPIImpl.h"
+#include "SendCallback.h"
+#include "MQClientException.h"
+#include "RemotingCommand.h"
+
+namespace rmq
+{
+
+ProducerInvokeCallback::ProducerInvokeCallback(SendCallback* pSendCallBack,
+ MQClientAPIImpl* pMQClientAPIImpl,
+ const std::string& topic,
+ const std::string& brokerName)
+ : m_pSendCallBack(pSendCallBack),
+ m_pMQClientAPIImpl(pMQClientAPIImpl),
+ m_topic(topic),
+ m_brokerName(brokerName)
+{
+}
+
+ProducerInvokeCallback::~ProducerInvokeCallback()
+{
+ if (m_pSendCallBack)
+ {
+ delete m_pSendCallBack;
+ m_pSendCallBack = NULL;
+ }
+}
+
+void ProducerInvokeCallback::operationComplete(ResponseFuturePtr pResponseFuture)
+{
+ if (m_pSendCallBack == NULL)
+ {
+ delete this;
+ return;
+ }
+
+ RemotingCommand* response = pResponseFuture->getResponseCommand();
+ if (response != NULL)
+ {
+ try
+ {
+ SendResult* sendResult =
+ m_pMQClientAPIImpl->processSendResponse(m_brokerName, m_topic, response);
+
+ assert(sendResult != NULL);
+ m_pSendCallBack->onSuccess(*sendResult);
+
+ delete sendResult;
+ }
+ catch (MQException& e)
+ {
+ m_pSendCallBack->onException(e);
+ }
+
+ delete response;
+ }
+ else
+ {
+ if (!pResponseFuture->isSendRequestOK())
+ {
+ std::string msg = "send request failed";
+ MQClientException e(msg, -1, __FILE__, __LINE__);
+ m_pSendCallBack->onException(e);
+ }
+ else if (pResponseFuture->isTimeout())
+ {
+ std::string msg = RocketMQUtil::str2fmt("wait response timeout %lld ms",
+ pResponseFuture->getTimeoutMillis());
+ MQClientException e(msg, -1, __FILE__, __LINE__);
+ m_pSendCallBack->onException(e);
+ }
+ else
+ {
+ std::string msg = "unknow reseaon";
+ MQClientException e(msg, -1, __FILE__, __LINE__);
+ m_pSendCallBack->onException(e);
+ }
+ }
+
+ delete this;
+}
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/producer/ProducerInvokeCallback.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/producer/ProducerInvokeCallback.h b/rocketmq-client4cpp/src/producer/ProducerInvokeCallback.h
new file mode 100755
index 0000000..d2c9825
--- /dev/null
+++ b/rocketmq-client4cpp/src/producer/ProducerInvokeCallback.h
@@ -0,0 +1,46 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __PRODUCERINVOKECALLBACK_H__
+#define __PRODUCERINVOKECALLBACK_H__
+
+#include <string>
+#include "InvokeCallback.h"
+
+namespace rmq
+{
+ class MQClientAPIImpl;
+ class SendCallback;
+
+ class ProducerInvokeCallback : public InvokeCallback
+ {
+ public:
+ ProducerInvokeCallback(SendCallback* pSendCallBack,
+ MQClientAPIImpl* pMQClientAPIImpl,
+ const std::string& topic,
+ const std::string& brokerName);
+ virtual ~ProducerInvokeCallback();
+ virtual void operationComplete(ResponseFuturePtr pResponseFuture);
+
+ private:
+ SendCallback* m_pSendCallBack;
+ MQClientAPIImpl* m_pMQClientAPIImpl;
+ std::string m_topic;
+ std::string m_brokerName;
+ };
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/producer/TopicPublishInfo.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/producer/TopicPublishInfo.h b/rocketmq-client4cpp/src/producer/TopicPublishInfo.h
new file mode 100755
index 0000000..0d85b5f
--- /dev/null
+++ b/rocketmq-client4cpp/src/producer/TopicPublishInfo.h
@@ -0,0 +1,141 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __TOPICPUBLISHINFO_H__
+#define __TOPICPUBLISHINFO_H__
+
+#include <list>
+#include <vector>
+#include <string>
+#include <sstream>
+#include <math.h>
+#include <stdlib.h>
+
+#include "RocketMQClient.h"
+#include "RefHandle.h"
+#include "MessageQueue.h"
+#include "AtomicValue.h"
+#include "UtilAll.h"
+
+
+namespace rmq
+{
+ class TopicPublishInfo : public kpr::RefCount
+ {
+ public:
+ TopicPublishInfo()
+ {
+ m_orderTopic = false;
+ m_haveTopicRouterInfo = false;
+ }
+
+ ~TopicPublishInfo()
+ {
+ m_messageQueueList.clear();
+ }
+
+ bool isOrderTopic()
+ {
+ return m_orderTopic;
+ }
+
+ bool ok()
+ {
+ return !m_messageQueueList.empty();
+ }
+
+ void setOrderTopic(bool orderTopic)
+ {
+ m_orderTopic = orderTopic;
+ }
+
+ std::vector<MessageQueue>& getMessageQueueList()
+ {
+ return m_messageQueueList;
+ }
+
+ void setMessageQueueList(const std::vector<MessageQueue>& messageQueueList)
+ {
+ m_messageQueueList = messageQueueList;
+ }
+
+ kpr::AtomicInteger& getSendWhichQueue()
+ {
+ return m_sendWhichQueue;
+ }
+
+ void setSendWhichQueue(kpr::AtomicInteger& sendWhichQueue)
+ {
+ m_sendWhichQueue = sendWhichQueue;
+ }
+
+ bool isHaveTopicRouterInfo()
+ {
+ return m_haveTopicRouterInfo;
+ }
+
+
+ void setHaveTopicRouterInfo(bool haveTopicRouterInfo)
+ {
+ m_haveTopicRouterInfo = haveTopicRouterInfo;
+ }
+
+ MessageQueue* selectOneMessageQueue(const std::string lastBrokerName)
+ {
+ if (!lastBrokerName.empty())
+ {
+ int index = m_sendWhichQueue++;
+ for (size_t i = 0; i < m_messageQueueList.size(); i++)
+ {
+ int pos = abs(index++) % m_messageQueueList.size();
+ MessageQueue& mq = m_messageQueueList.at(pos);
+ if (mq.getBrokerName() != lastBrokerName)
+ {
+ return &mq;
+ }
+ }
+
+ return NULL;
+ }
+ else
+ {
+ int index = m_sendWhichQueue++;
+ int pos = abs(index) % m_messageQueueList.size();
+ return &(m_messageQueueList.at(pos));
+ }
+ }
+
+ std::string toString() const
+ {
+ std::stringstream ss;
+ ss << "{orderTopic=" << m_orderTopic
+ << ",messageQueueList=" << UtilAll::toString(m_messageQueueList)
+ << ",sendWhichQueue=" << m_sendWhichQueue
+ << ",haveTopicRouterInfo=" << m_haveTopicRouterInfo
+ << "}";
+ return ss.str();
+ }
+
+ private:
+ bool m_orderTopic;
+ std::vector<MessageQueue> m_messageQueueList;
+ kpr::AtomicInteger m_sendWhichQueue;
+ bool m_haveTopicRouterInfo;
+ };
+ typedef kpr::RefHandleT<TopicPublishInfo> TopicPublishInfoPtr;
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/producer/TransactionCheckListener.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/producer/TransactionCheckListener.h b/rocketmq-client4cpp/src/producer/TransactionCheckListener.h
new file mode 100755
index 0000000..8955742
--- /dev/null
+++ b/rocketmq-client4cpp/src/producer/TransactionCheckListener.h
@@ -0,0 +1,31 @@
+/**
+ * Copyright (C) 2013 kangliqiang ,kangliq@163.com
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef __TRANSACTIONCHECKLISTENER_H__
+#define __TRANSACTIONCHECKLISTENER_H__
+
+#include "SendResult.h"
+
+namespace rmq
+{
+ class TransactionCheckListener
+ {
+ public:
+ virtual ~TransactionCheckListener() {}
+ virtual LocalTransactionState checkLocalTransactionState(MessageExt* pMsg) = 0;
+ };
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/producer/TransactionMQProducer.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/producer/TransactionMQProducer.h b/rocketmq-client4cpp/src/producer/TransactionMQProducer.h
new file mode 100755
index 0000000..bee11a5
--- /dev/null
+++ b/rocketmq-client4cpp/src/producer/TransactionMQProducer.h
@@ -0,0 +1,118 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#ifndef __TRANSACTIONMQPRODUCER_H__
+#define __TRANSACTIONMQPRODUCER_H__
+
+#include "DefaultMQProducer.h"
+#include "DefaultMQProducerImpl.h"
+#include "MQClientException.h"
+
+namespace rmq
+{
+ class TransactionMQProducer : public DefaultMQProducer
+ {
+ public:
+ TransactionMQProducer()
+ : m_pTransactionCheckListener(NULL),
+ m_checkThreadPoolMinSize(1),
+ m_checkThreadPoolMaxSize(1),
+ m_checkRequestHoldMax(2000)
+ {
+
+ }
+
+ TransactionMQProducer(const std::string& producerGroup)
+ : DefaultMQProducer(producerGroup),
+ m_pTransactionCheckListener(NULL),
+ m_checkThreadPoolMinSize(1),
+ m_checkThreadPoolMaxSize(1),
+ m_checkRequestHoldMax(2000)
+ {
+
+ }
+
+ void start()
+ {
+ m_pDefaultMQProducerImpl->initTransactionEnv();
+ DefaultMQProducer::start();
+ }
+
+ void shutdown()
+ {
+ DefaultMQProducer::shutdown();
+ m_pDefaultMQProducerImpl->destroyTransactionEnv();
+ }
+
+ TransactionSendResult sendMessageInTransaction(const Message& msg,
+ LocalTransactionExecuter* tranExecuter, void* arg)
+ {
+ if (NULL == m_pTransactionCheckListener)
+ {
+ THROW_MQEXCEPTION("localTransactionBranchCheckListener is null", -1);
+ }
+
+ return m_pDefaultMQProducerImpl.sendMessageInTransaction(msg, tranExecuter, arg);
+ }
+
+ TransactionCheckListener* getTransactionCheckListener()
+ {
+ return m_pTransactionCheckListener;
+ }
+
+ void setTransactionCheckListener(TransactionCheckListener* pTransactionCheckListener)
+ {
+ m_pTransactionCheckListener = pTransactionCheckListener;
+ }
+
+ int getCheckThreadPoolMinSize()
+ {
+ return m_checkThreadPoolMinSize;
+ }
+
+ void setCheckThreadPoolMinSize(int checkThreadPoolMinSize)
+ {
+ m_checkThreadPoolMinSize = checkThreadPoolMinSize;
+ }
+
+ int getCheckThreadPoolMaxSize()
+ {
+ return m_checkThreadPoolMaxSize;
+ }
+
+ void setCheckThreadPoolMaxSize(int checkThreadPoolMaxSize)
+ {
+ m_checkThreadPoolMaxSize = checkThreadPoolMaxSize;
+ }
+
+ int getCheckRequestHoldMax()
+ {
+ return m_checkRequestHoldMax;
+ }
+
+ void setCheckRequestHoldMax(int checkRequestHoldMax)
+ {
+ m_checkRequestHoldMax = checkRequestHoldMax;
+ }
+
+ private:
+ TransactionCheckListener* m_pTransactionCheckListener;
+ int m_checkThreadPoolMinSize;
+ int m_checkThreadPoolMaxSize;
+ int m_checkRequestHoldMax;
+ };
+}
+
+#endif
|