http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/MQClientAPIImpl.cpp
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/MQClientAPIImpl.cpp b/rocketmq-client4cpp/src/MQClientAPIImpl.cpp
new file mode 100755
index 0000000..fa5a2b9
--- /dev/null
+++ b/rocketmq-client4cpp/src/MQClientAPIImpl.cpp
@@ -0,0 +1,1323 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include <assert.h>
+
+#include "MQClientAPIImpl.h"
+#include "MQClientException.h"
+#include "SocketUtil.h"
+#include "UtilAll.h"
+#include "TcpRemotingClient.h"
+#include "MQProtos.h"
+#include "PullResultExt.h"
+#include "ConsumerInvokeCallback.h"
+#include "NamesrvUtil.h"
+#include "VirtualEnvUtil.h"
+#include "ClientRemotingProcessor.h"
+#include "CommandCustomHeader.h"
+#include "TopicList.h"
+#include "ProducerInvokeCallback.h"
+#include "MessageDecoder.h"
+#include "MessageSysFlag.h"
+#include "GetConsumerListByGroupResponseBody.h"
+
+
+namespace rmq
+{
+
+
+// Creates the TCP remoting client from remoteClientConfig and registers
+// pClientRemotingProcessor as the handler for the six request codes listed
+// below. clientConfig is accepted but not read here.
+MQClientAPIImpl::MQClientAPIImpl(ClientConfig& clientConfig, const RemoteClientConfig& remoteClientConfig, ClientRemotingProcessor* pClientRemotingProcessor)
+    : m_pClientRemotingProcessor(pClientRemotingProcessor) {
+    m_pRemotingClient = new TcpRemotingClient(remoteClientConfig);
+    ClientRemotingProcessor* const handler = m_pClientRemotingProcessor;
+    m_pRemotingClient->registerProcessor(CHECK_TRANSACTION_STATE_VALUE, handler);
+    m_pRemotingClient->registerProcessor(NOTIFY_CONSUMER_IDS_CHANGED_VALUE, handler);
+    m_pRemotingClient->registerProcessor(RESET_CONSUMER_CLIENT_OFFSET_VALUE, handler);
+    m_pRemotingClient->registerProcessor(GET_CONSUMER_STATUS_FROM_CLIENT_VALUE, handler);
+    m_pRemotingClient->registerProcessor(GET_CONSUMER_RUNNING_INFO_VALUE, handler);
+    m_pRemotingClient->registerProcessor(CONSUME_MESSAGE_DIRECTLY_VALUE, handler);
+}
+
+// Empty destructor: nothing is released here, including m_pRemotingClient.
+// NOTE(review): confirm who owns the TcpRemotingClient created by the constructor.
+MQClientAPIImpl::~MQClientAPIImpl() {}
+
+// Returns a copy of the project-group prefix cached in m_projectGroupPrefix.
+std::string MQClientAPIImpl::getProjectGroupPrefix() {
+    return m_projectGroupPrefix;
+}
+
+// Forwards to the remoting client and returns its name server address list.
+std::vector<std::string> MQClientAPIImpl::getNameServerAddressList() {
+    return m_pRemotingClient->getNameServerAddressList();
+}
+
+// Exposes the raw pointer to the remoting client held by this object.
+TcpRemotingClient* MQClientAPIImpl::getRemotingClient() {
+    return m_pRemotingClient;
+}
+
+// Asks m_topAddressing for the current name server address string. When it
+// is non-empty and differs from the cached m_nameSrvAddr, the change is
+// logged, the new list is pushed to the remoting client, and the cache is
+// updated. Any exception raised along the way is logged and then ignored.
+// Returns the (possibly refreshed) cached address string.
+std::string MQClientAPIImpl::fetchNameServerAddr() {
+    try {
+        const std::string latest = m_topAddressing.fetchNSAddr();
+        const bool changed = !latest.empty() && latest != m_nameSrvAddr;
+        if (changed) {
+            RMQ_INFO("name server address changed, %s -> %s",
+                     m_nameSrvAddr.c_str(), latest.c_str());
+            // updateNameServerAddressList() already stores the string in
+            // m_nameSrvAddr; the explicit assignment below is redundant but harmless.
+            updateNameServerAddressList(latest);
+            m_nameSrvAddr = latest;
+        }
+    } catch (...) {
+        // Best effort: keep serving the previously cached address.
+        RMQ_ERROR("fetchNameServerAddr Exception");
+    }
+    // Reached after a refresh, on "no change", on an empty result, and on error.
+    return m_nameSrvAddr;
+}
+
+// Caches addrs in m_nameSrvAddr, splits it on ';' and, when at least one
+// entry results, hands the list to the remoting client.
+void MQClientAPIImpl::updateNameServerAddressList(const std::string& addrs) {
+    m_nameSrvAddr = addrs;
+    std::vector<std::string> addressList;
+    UtilAll::Split(addressList, addrs, ";");
+    if (!addressList.empty()) {
+        m_pRemotingClient->updateNameServerAddressList(addressList);
+    }
+}
+
+// Starts the remoting client, then tries to resolve the project-group prefix
+// for the local address (timeout argument 3000). A failure to resolve it is
+// not fatal: it is logged and m_projectGroupPrefix keeps its previous value.
+void MQClientAPIImpl::start() {
+    m_pRemotingClient->start();
+    try {
+        const std::string localAddress = getLocalAddress();
+        m_projectGroupPrefix = getProjectGroupByIp(localAddress, 3000);
+    } catch (const std::exception& e) {
+        // Caught by const reference (no slicing); logged instead of silently dropped.
+        RMQ_WARN("resolve project group prefix failed: %s", e.what());
+    }
+}
+
+// Shuts down the underlying remoting client.
+void MQClientAPIImpl::shutdown() {
+    m_pRemotingClient->shutdown();
+}
+
+// Not implemented yet: addr, config and timeoutMillis are ignored and no
+// request is sent.
+void MQClientAPIImpl::createSubscriptionGroup(const std::string& addr,
+                                              SubscriptionGroupConfig config, int timeoutMillis) {
+    //TODO
+}
+
+
+// Creates or updates a topic on the broker at addr.
+//   addr          - broker address the request is sent to
+//   defaultTopic  - copied into the request header's defaultTopic field
+//   topicConfig   - supplies topic name, queue counts, perm, filter type,
+//                   system flag and order flag
+//   timeoutMillis - timeout handed to the synchronous remoting call
+// The topic name is decorated with the project-group prefix when one is set.
+// Throws MQClientException when no response comes back, or when the response
+// code is not SUCCESS_VALUE (carrying the response's remark and code).
+void MQClientAPIImpl::createTopic(const std::string& addr,
+                                  const std::string& defaultTopic,
+                                  TopicConfig topicConfig,
+                                  int timeoutMillis) {
+    const bool hasPrefix = !UtilAll::isBlank(m_projectGroupPrefix);
+    const std::string effectiveTopic = hasPrefix
+        ? VirtualEnvUtil::buildWithProjectGroup(topicConfig.getTopicName(), m_projectGroupPrefix)
+        : topicConfig.getTopicName();
+
+    CreateTopicRequestHeader* header = new CreateTopicRequestHeader();
+    header->topic = effectiveTopic;
+    header->defaultTopic = defaultTopic;
+    header->readQueueNums = topicConfig.getReadQueueNums();
+    header->writeQueueNums = topicConfig.getWriteQueueNums();
+    header->perm = topicConfig.getPerm();
+    header->topicFilterType = topicConfig.getTopicFilterType();
+    header->topicSysFlag = topicConfig.getTopicSysFlag();
+    header->order = topicConfig.isOrder();
+
+    RemotingCommandPtr request =
+        RemotingCommand::createRequestCommand(UPDATE_AND_CREATE_TOPIC_VALUE, header);
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis);
+
+    // A response is acceptable only when its code is SUCCESS_VALUE.
+    if (response) {
+        if (response->getCode() == SUCCESS_VALUE) {
+            return;
+        }
+        THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode());
+    }
+    // No response object at all.
+    THROW_MQEXCEPTION(MQClientException, "createTopic failed", -1);
+}
+
+// Sends msg to the broker at addr using the given communication mode.
+//   addr              - broker address
+//   brokerName        - forwarded to the sync/async send helpers
+//   msg               - message to send; its topic is rewritten with the
+//                       project-group prefix when one is set
+//   pRequestHeader    - header describing the send; its producerGroup and
+//                       topic are rewritten likewise. It is converted to a V2
+//                       header and then deleted by this function.
+//   timeoutMillis     - timeout forwarded to the remoting layer
+//   communicationMode - ONEWAY, ASYNC or SYNC
+//   pSendCallback     - forwarded to sendMessageAsync() in ASYNC mode
+// Returns the helper's SendResult in SYNC mode, else a default-constructed one.
+SendResult MQClientAPIImpl::sendMessage(const std::string& addr,
+                                        const std::string& brokerName,
+                                        Message& msg,
+                                        SendMessageRequestHeader* pRequestHeader,
+                                        int timeoutMillis,
+                                        CommunicationMode communicationMode,
+                                        SendCallback* pSendCallback) {
+    if (!UtilAll::isBlank(m_projectGroupPrefix)) {
+        const std::string& prefix = m_projectGroupPrefix;
+        msg.setTopic(VirtualEnvUtil::buildWithProjectGroup(msg.getTopic(), prefix));
+        pRequestHeader->producerGroup =
+            VirtualEnvUtil::buildWithProjectGroup(pRequestHeader->producerGroup, prefix);
+        pRequestHeader->topic =
+            VirtualEnvUtil::buildWithProjectGroup(pRequestHeader->topic, prefix);
+    }
+
+    // The V2 header is always used; the V1 branch is kept but is unreachable
+    // while this flag stays true.
+    const bool useV2Header = true;
+    RemotingCommandPtr request = NULL;
+    if (!useV2Header) {
+        request = RemotingCommand::createRequestCommand(SEND_MESSAGE_VALUE, pRequestHeader);
+    } else {
+        SendMessageRequestHeaderV2* v2Header =
+            SendMessageRequestHeaderV2::createSendMessageRequestHeaderV2(pRequestHeader);
+        request = RemotingCommand::createRequestCommand(SEND_MESSAGE_V2_VALUE, v2Header);
+        delete pRequestHeader;
+    }
+
+    // Prefer the compressed payload when the message carries one.
+    const bool compressed = (msg.getCompressBody() != NULL);
+    if (compressed) {
+        request->setBody((char*)msg.getCompressBody(), msg.getCompressBodyLen(), false);
+    } else {
+        request->setBody((char*)msg.getBody(), msg.getBodyLen(), false);
+    }
+
+    SendResult result;
+    if (communicationMode == ONEWAY) {
+        m_pRemotingClient->invokeOneway(addr, request, timeoutMillis);
+    } else if (communicationMode == ASYNC) {
+        sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, pSendCallback);
+    } else if (communicationMode == SYNC) {
+        SendResult* syncResult = sendMessageSync(addr, brokerName, msg, timeoutMillis, request);
+        if (syncResult) {
+            result = *syncResult;
+            delete syncResult;
+        }
+    }
+    return result;
+}
+
+// Issues a pull request against the broker at addr.
+//   pRequestHeader    - pull parameters; consumerGroup and topic are
+//                       rewritten with the project-group prefix when set
+//   timeoutMillis     - timeout forwarded to the remoting layer
+//   communicationMode - SYNC returns the pulled result, ASYNC hands the
+//                       request to pullMessageAsync() with pPullCallback,
+//                       ONEWAY sends nothing
+//   pPullCallback     - used only in ASYNC mode
+// Returns the result of pullMessageSync() in SYNC mode, NULL otherwise.
+// An unrecognised mode hits assert(false).
+PullResult* MQClientAPIImpl::pullMessage(const std::string& addr,
+                                         PullMessageRequestHeader* pRequestHeader,
+                                         int timeoutMillis,
+                                         CommunicationMode communicationMode,
+                                         PullCallback* pPullCallback) {
+    if (!UtilAll::isBlank(m_projectGroupPrefix)) {
+        const std::string& prefix = m_projectGroupPrefix;
+        pRequestHeader->consumerGroup =
+            VirtualEnvUtil::buildWithProjectGroup(pRequestHeader->consumerGroup, prefix);
+        pRequestHeader->topic =
+            VirtualEnvUtil::buildWithProjectGroup(pRequestHeader->topic, prefix);
+    }
+
+    RemotingCommandPtr request =
+        RemotingCommand::createRequestCommand(PULL_MESSAGE_VALUE, pRequestHeader);
+    PullResult* pulled = NULL;
+    if (communicationMode == SYNC) {
+        pulled = pullMessageSync(addr, request, timeoutMillis);
+    } else if (communicationMode == ASYNC) {
+        pullMessageAsync(addr, request, timeoutMillis, pPullCallback);
+    } else if (communicationMode != ONEWAY) {
+        assert(false);
+    }
+    return pulled;
+}
+
+// Fetches a single message from the broker at addr by its physical offset.
+//   phyoffset     - stored into the request header's offset field
+//   timeoutMillis - timeout for the synchronous remoting call
+// On SUCCESS_VALUE with a body, the body is decoded and returned; when a
+// project-group prefix is set it is stripped from the decoded topic.
+// Throws MQClientException when there is no response, when the code is not
+// SUCCESS_VALUE, or when a successful response carries no body.
+// NOTE(review): the decode result is dereferenced without a NULL check.
+MessageExt* MQClientAPIImpl::viewMessage(const std::string& addr,
+                                         long long phyoffset,
+                                         int timeoutMillis) {
+    ViewMessageRequestHeader* header = new ViewMessageRequestHeader();
+    header->offset = phyoffset;
+
+    RemotingCommandPtr request =
+        RemotingCommand::createRequestCommand(VIEW_MESSAGE_BY_ID_VALUE, header);
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis);
+
+    if (response) {
+        const bool ok = (response->getCode() == SUCCESS_VALUE);
+        if (ok && response->getBody() != NULL) {
+            int decodedLen = 0;
+            MessageExt* decoded = MessageDecoder::decode((char*)response->getBody(),
+                                                         response->getBodyLen(), decodedLen);
+            if (!UtilAll::isBlank(m_projectGroupPrefix)) {
+                decoded->setTopic(
+                    VirtualEnvUtil::clearProjectGroup(decoded->getTopic(), m_projectGroupPrefix));
+            }
+            return decoded;
+        }
+        // Non-success code, or success without a body.
+        THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode());
+    }
+    // No response object at all.
+    THROW_MQEXCEPTION(MQClientException, "viewMessage failed", -1);
+}
+
+// Asks the broker at addr for the queue offset that corresponds to timestamp.
+//   topic         - topic name; decorated with the project-group prefix
+//                   when one is set
+//   queueId       - queue within the topic
+//   timestamp     - copied into the request header's timestamp field
+//   timeoutMillis - timeout for the synchronous remoting call
+// Returns the offset from the response header on SUCCESS_VALUE, or -1 when
+// there is no response or the code is anything else (no exception is thrown
+// by this function itself).
+// NOTE(review): the response's custom header is cast and read unchecked.
+long long MQClientAPIImpl::searchOffset(const std::string& addr,
+                                        const std::string& topic,
+                                        int queueId,
+                                        long long timestamp,
+                                        int timeoutMillis) {
+    const std::string effectiveTopic = UtilAll::isBlank(m_projectGroupPrefix)
+        ? topic
+        : VirtualEnvUtil::buildWithProjectGroup(topic, m_projectGroupPrefix);
+
+    SearchOffsetRequestHeader* header = new SearchOffsetRequestHeader();
+    header->topic = effectiveTopic;
+    header->queueId = queueId;
+    header->timestamp = timestamp;
+
+    RemotingCommandPtr request =
+        RemotingCommand::createRequestCommand(SEARCH_OFFSET_BY_TIMESTAMP_VALUE, header);
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis);
+    if (response) {
+        if (response->getCode() == SUCCESS_VALUE) {
+            SearchOffsetResponseHeader* reply =
+                (SearchOffsetResponseHeader*)response->getCommandCustomHeader();
+            return reply->offset;
+        }
+        //THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode());
+    }
+
+    //THROW_MQEXCEPTION(MQClientException, "searchOffset failed", -1);
+    return -1;
+}
+
+// Queries the broker at addr for the maximum offset of one queue.
+//   topic         - topic name; decorated with the project-group prefix
+//                   when one is set
+//   queueId       - queue within the topic
+//   timeoutMillis - timeout for the synchronous remoting call
+// Returns the offset from the response header on SUCCESS_VALUE, or -1 when
+// there is no response or the code is anything else (no exception is thrown
+// by this function itself).
+// NOTE(review): the response's custom header is cast and read unchecked.
+long long MQClientAPIImpl::getMaxOffset(const std::string& addr,
+                                        const std::string& topic,
+                                        int queueId,
+                                        int timeoutMillis) {
+    const std::string effectiveTopic = UtilAll::isBlank(m_projectGroupPrefix)
+        ? topic
+        : VirtualEnvUtil::buildWithProjectGroup(topic, m_projectGroupPrefix);
+
+    GetMaxOffsetRequestHeader* header = new GetMaxOffsetRequestHeader();
+    header->topic = effectiveTopic;
+    header->queueId = queueId;
+
+    RemotingCommandPtr request =
+        RemotingCommand::createRequestCommand(GET_MAX_OFFSET_VALUE, header);
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis);
+    if (response) {
+        if (response->getCode() == SUCCESS_VALUE) {
+            GetMaxOffsetResponseHeader* reply =
+                (GetMaxOffsetResponseHeader*)response->getCommandCustomHeader();
+            return reply->offset;
+        }
+        //THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode());
+    }
+
+    // Failure is reported through the -1 sentinel; the throws are disabled.
+    //THROW_MQEXCEPTION(MQClientException, "getMaxOffset failed", -1);
+    return -1;
+}
+
+
+// Requests the consumer id list for consumerGroup from the broker at addr.
+//   addr          - broker address the request is sent to
+//   consumerGroup - group name; decorated with the project-group prefix
+//                   when one is set
+//   timeoutMillis - timeout for the synchronous remoting call
+// Returns the id list decoded from the response body.
+// Throws MQClientException when there is no response, when the code is not
+// SUCCESS_VALUE, or when a successful response carries no body.
+// NOTE(review): the decoded body pointer is used without a NULL check.
+std::list<std::string> MQClientAPIImpl::getConsumerIdListByGroup(const std::string& addr,
+                                                                 const std::string& consumerGroup,
+                                                                 int timeoutMillis) {
+    const std::string effectiveGroup = UtilAll::isBlank(m_projectGroupPrefix)
+        ? consumerGroup
+        : VirtualEnvUtil::buildWithProjectGroup(consumerGroup, m_projectGroupPrefix);
+
+    GetConsumerListByGroupRequestHeader* header = new GetConsumerListByGroupRequestHeader();
+    header->consumerGroup = effectiveGroup;
+
+    RemotingCommandPtr request =
+        RemotingCommand::createRequestCommand(GET_CONSUMER_LIST_BY_GROUP_VALUE, header);
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis);
+
+    if (response) {
+        const bool ok = (response->getCode() == SUCCESS_VALUE);
+        if (ok && response->getBody() != NULL) {
+            GetConsumerListByGroupResponseBody* decoded =
+                GetConsumerListByGroupResponseBody::decode((char*)response->getBody(),
+                                                           response->getBodyLen());
+            // Copy the list out before releasing the decoded body.
+            std::list<std::string> consumerIds = decoded->getConsumerIdList();
+            delete decoded;
+            return consumerIds;
+        }
+        // Non-success code, or success without a body.
+        THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode());
+    }
+
+    // No response object at all.
+    THROW_MQEXCEPTION(MQClientException, "getConsumerIdListByGroup failed", -1);
+}
+
+// Queries the broker at addr for the minimum offset of one queue.
+//   topic         - topic name; decorated with the project-group prefix
+//                   when one is set
+//   queueId       - queue within the topic
+//   timeoutMillis - timeout for the synchronous remoting call
+// Returns the offset from the response header on SUCCESS_VALUE, or -1 when
+// there is no response or the code is anything else (no exception is thrown
+// by this function itself).
+// NOTE(review): the response's custom header is cast and read unchecked.
+long long MQClientAPIImpl::getMinOffset(const std::string& addr,
+                                        const std::string& topic,
+                                        int queueId,
+                                        int timeoutMillis) {
+    const std::string effectiveTopic = UtilAll::isBlank(m_projectGroupPrefix)
+        ? topic
+        : VirtualEnvUtil::buildWithProjectGroup(topic, m_projectGroupPrefix);
+
+    GetMinOffsetRequestHeader* header = new GetMinOffsetRequestHeader();
+    header->topic = effectiveTopic;
+    header->queueId = queueId;
+
+    RemotingCommandPtr request =
+        RemotingCommand::createRequestCommand(GET_MIN_OFFSET_VALUE, header);
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis);
+    if (response) {
+        if (response->getCode() == SUCCESS_VALUE) {
+            GetMinOffsetResponseHeader* reply =
+                (GetMinOffsetResponseHeader*)response->getCommandCustomHeader();
+            return reply->offset;
+        }
+        //THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode());
+    }
+
+    // Failure is reported through the -1 sentinel; the throws are disabled.
+    //THROW_MQEXCEPTION(MQClientException, "getMinOffset failed", -1);
+    return -1;
+}
+
+// Queries the broker at addr for the earliest message store time of a queue.
+//   topic         - topic name; decorated with the project-group prefix
+//                   when one is set
+//   queueId       - queue within the topic
+//   timeoutMillis - timeout for the synchronous remoting call
+// Returns the timestamp field of the response header on SUCCESS_VALUE.
+// Throws MQClientException when there is no response or the code differs.
+// NOTE(review): the response's custom header is cast and read unchecked.
+long long MQClientAPIImpl::getEarliestMsgStoretime(const std::string& addr,
+                                                   const std::string& topic,
+                                                   int queueId,
+                                                   int timeoutMillis) {
+    const std::string effectiveTopic = UtilAll::isBlank(m_projectGroupPrefix)
+        ? topic
+        : VirtualEnvUtil::buildWithProjectGroup(topic, m_projectGroupPrefix);
+
+    GetEarliestMsgStoretimeRequestHeader* header = new GetEarliestMsgStoretimeRequestHeader();
+    header->topic = effectiveTopic;
+    header->queueId = queueId;
+
+    RemotingCommandPtr request =
+        RemotingCommand::createRequestCommand(GET_EARLIEST_MSG_STORETIME_VALUE, header);
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis);
+
+    if (response) {
+        if (response->getCode() == SUCCESS_VALUE) {
+            GetEarliestMsgStoretimeResponseHeader* reply =
+                (GetEarliestMsgStoretimeResponseHeader*)response->getCommandCustomHeader();
+            return reply->timestamp;
+        }
+        THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode());
+    }
+
+    // No response object at all.
+    THROW_MQEXCEPTION(MQClientException, "getEarliestMsgStoretime failed", -1);
+}
+
+// Queries the broker at addr for a consumer offset.
+//   pRequestHeader - query parameters; consumerGroup and topic are rewritten
+//                    with the project-group prefix when one is set
+//   timeoutMillis  - timeout for the synchronous remoting call
+// Returns the offset field of the response header on SUCCESS_VALUE.
+// Throws MQBrokerException (with the response's remark and code) for any
+// other code, and MQClientException when there is no response.
+// NOTE(review): the response's custom header is cast and read unchecked.
+long long MQClientAPIImpl::queryConsumerOffset(const std::string& addr,
+                                               QueryConsumerOffsetRequestHeader* pRequestHeader,
+                                               int timeoutMillis) {
+    if (!UtilAll::isBlank(m_projectGroupPrefix)) {
+        const std::string& prefix = m_projectGroupPrefix;
+        pRequestHeader->consumerGroup =
+            VirtualEnvUtil::buildWithProjectGroup(pRequestHeader->consumerGroup, prefix);
+        pRequestHeader->topic =
+            VirtualEnvUtil::buildWithProjectGroup(pRequestHeader->topic, prefix);
+    }
+
+    RemotingCommandPtr request =
+        RemotingCommand::createRequestCommand(QUERY_CONSUMER_OFFSET_VALUE, pRequestHeader);
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis);
+    if (response) {
+        if (response->getCode() == SUCCESS_VALUE) {
+            QueryConsumerOffsetResponseHeader* reply =
+                (QueryConsumerOffsetResponseHeader*)response->getCommandCustomHeader();
+            return reply->offset;
+        }
+        THROW_MQEXCEPTION(MQBrokerException, response->getRemark(), response->getCode());
+    }
+
+    THROW_MQEXCEPTION(MQClientException, "queryConsumerOffset failed", -1);
+    // Not reached when the macro throws; kept to satisfy the return type.
+    return -1;
+}
+
+// Reports a consumer offset to the broker at addr and waits for the reply.
+//   addr           - broker address the request is sent to
+//   pRequestHeader - update parameters; consumerGroup and topic are rewritten
+//                    with the project-group prefix when one is set
+//   timeoutMillis  - timeout for the synchronous remoting call
+// Throws MQClientException when there is no response, or when the response
+// code is not SUCCESS_VALUE (carrying the response's remark and code).
+void MQClientAPIImpl::updateConsumerOffset(const std::string& addr,
+                                           UpdateConsumerOffsetRequestHeader* pRequestHeader,
+                                           int timeoutMillis) {
+    if (!UtilAll::isBlank(m_projectGroupPrefix)) {
+        const std::string& prefix = m_projectGroupPrefix;
+        pRequestHeader->consumerGroup =
+            VirtualEnvUtil::buildWithProjectGroup(pRequestHeader->consumerGroup, prefix);
+        pRequestHeader->topic =
+            VirtualEnvUtil::buildWithProjectGroup(pRequestHeader->topic, prefix);
+    }
+
+    RemotingCommandPtr request =
+        RemotingCommand::createRequestCommand(UPDATE_CONSUMER_OFFSET_VALUE, pRequestHeader);
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis);
+    // Anything but SUCCESS_VALUE is surfaced as an exception.
+    if (response) {
+        if (response->getCode() == SUCCESS_VALUE) {
+            return;
+        }
+        THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode());
+    }
+
+    // No response object at all.
+    THROW_MQEXCEPTION(MQClientException, "updateConsumerOffset failed", -1);
+}
+
+// Sends an UPDATE_CONSUMER_OFFSET_VALUE request one-way: no response is
+// awaited or inspected.
+//   pRequestHeader - consumerGroup and topic are rewritten with the
+//                    project-group prefix when one is set
+void MQClientAPIImpl::updateConsumerOffsetOneway(const std::string& addr,
+                                                 UpdateConsumerOffsetRequestHeader* pRequestHeader,
+                                                 int timeoutMillis) {
+    if (!UtilAll::isBlank(m_projectGroupPrefix)) {
+        const std::string& prefix = m_projectGroupPrefix;
+        pRequestHeader->consumerGroup =
+            VirtualEnvUtil::buildWithProjectGroup(pRequestHeader->consumerGroup, prefix);
+        pRequestHeader->topic =
+            VirtualEnvUtil::buildWithProjectGroup(pRequestHeader->topic, prefix);
+    }
+    RemotingCommandPtr request = RemotingCommand::createRequestCommand(UPDATE_CONSUMER_OFFSET_VALUE, pRequestHeader);
+    m_pRemotingClient->invokeOneway(addr, request, timeoutMillis);
+}
+
+// Sends the heartbeat payload (HEART_BEAT_VALUE) to the broker at addr and
+// waits for the response.
+//   pHeartbeatData - heartbeat payload; when a project-group prefix is set,
+//                    every consumer group, subscription topic and producer
+//                    group inside it is rewritten with the prefix
+//   timeoutMillis  - timeout for the synchronous remoting call
+// Throws MQClientException when there is no response, or when the response
+// code is not SUCCESS_VALUE.
+// The std::set elements are rebuilt from modified copies rather than edited
+// in place through a const-dropping cast: std::set elements are const, and
+// changing a field the ordering may depend on would corrupt the container.
+void MQClientAPIImpl::sendHearbeat(const std::string& addr, HeartbeatData* pHeartbeatData, int timeoutMillis) {
+    if (!UtilAll::isBlank(m_projectGroupPrefix)) {
+        const std::string& prefix = m_projectGroupPrefix;
+        std::set<ConsumerData>& consumers = pHeartbeatData->getConsumerDataSet();
+        std::set<ConsumerData> prefixedConsumers;
+        for (std::set<ConsumerData>::const_iterator it = consumers.begin(); it != consumers.end(); ++it) {
+            ConsumerData consumer = *it;
+            consumer.groupName = VirtualEnvUtil::buildWithProjectGroup(consumer.groupName, prefix);
+            std::set<SubscriptionData> prefixedSubs;
+            std::set<SubscriptionData>::const_iterator sub = consumer.subscriptionDataSet.begin();
+            for (; sub != consumer.subscriptionDataSet.end(); ++sub) {
+                SubscriptionData subscription = *sub;
+                subscription.setTopic(
+                    VirtualEnvUtil::buildWithProjectGroup(subscription.getTopic(), prefix));
+                prefixedSubs.insert(subscription);
+            }
+            consumer.subscriptionDataSet.swap(prefixedSubs);
+            prefixedConsumers.insert(consumer);
+        }
+        consumers.swap(prefixedConsumers);
+
+        std::set<ProducerData>& producers = pHeartbeatData->getProducerDataSet();
+        std::set<ProducerData> prefixedProducers;
+        for (std::set<ProducerData>::const_iterator it = producers.begin(); it != producers.end(); ++it) {
+            ProducerData producer = *it;
+            producer.groupName = VirtualEnvUtil::buildWithProjectGroup(producer.groupName, prefix);
+            prefixedProducers.insert(producer);
+        }
+        producers.swap(prefixedProducers);
+    }
+
+    RemotingCommandPtr request = RemotingCommand::createRequestCommand(HEART_BEAT_VALUE, NULL);
+    std::string body;
+    pHeartbeatData->encode(body);
+    request->setBody((char*)body.data(), body.length(), true);
+
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis);
+    if (response) {
+        if (response->getCode() == SUCCESS_VALUE) {
+            return;
+        }
+        THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode());
+    }
+    THROW_MQEXCEPTION(MQClientException, "sendHearbeat failed", -1);
+}
+
+// Sends an UNREGISTER_CLIENT_VALUE request for clientID to the broker at addr.
+//   addr          - broker address the request is sent to
+//   clientID      - copied into the request header
+//   producerGroup - producer group name; decorated with the project-group
+//                   prefix when one is set
+//   consumerGroup - consumer group name; decorated likewise
+//   timeoutMillis - timeout for the synchronous remoting call
+// Throws MQClientException when there is no response or the code is not SUCCESS_VALUE.
+void MQClientAPIImpl::unregisterClient(const std::string& addr,
+                                       const std::string& clientID,
+                                       const std::string& producerGroup,
+                                       const std::string& consumerGroup,
+                                       int timeoutMillis) {
+    const bool hasPrefix = !UtilAll::isBlank(m_projectGroupPrefix);
+    const std::string effectiveProducerGroup = hasPrefix
+        ? VirtualEnvUtil::buildWithProjectGroup(producerGroup, m_projectGroupPrefix)
+        : producerGroup;
+    const std::string effectiveConsumerGroup = hasPrefix
+        ? VirtualEnvUtil::buildWithProjectGroup(consumerGroup, m_projectGroupPrefix)
+        : consumerGroup;
+
+    UnregisterClientRequestHeader* header = new UnregisterClientRequestHeader();
+    header->clientID = clientID;
+    header->producerGroup = effectiveProducerGroup;
+    header->consumerGroup = effectiveConsumerGroup;
+
+    RemotingCommandPtr request =
+        RemotingCommand::createRequestCommand(UNREGISTER_CLIENT_VALUE, header);
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis);
+    if (response) {
+        if (response->getCode() == SUCCESS_VALUE) {
+            return;
+        }
+        // Any other code is surfaced with the response's remark and code.
+        THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode());
+    }
+
+    // No response object at all.
+    THROW_MQEXCEPTION(MQClientException, "unregisterClient failed", -1);
+}
+
+// Not implemented yet: all arguments are ignored and nothing is sent.
+void MQClientAPIImpl::endTransactionOneway(const std::string& addr,
+                                           EndTransactionRequestHeader* pRequestHeader,
+                                           const std::string& remark,
+                                           int timeoutMillis) {
+    //TODO
+}
+
+// Sends an asynchronous QUERY_MESSAGE_VALUE request to the broker at addr.
+//   pRequestHeader  - query parameters; its topic is rewritten with the
+//                     project-group prefix when one is set
+//   timeoutMillis   - timeout forwarded to invokeAsync()
+//   pInvokeCallback - forwarded to invokeAsync()
+void MQClientAPIImpl::queryMessage(const std::string& addr,
+                                   QueryMessageRequestHeader* pRequestHeader,
+                                   int timeoutMillis,
+                                   InvokeCallback* pInvokeCallback) {
+    const bool hasPrefix = !UtilAll::isBlank(m_projectGroupPrefix);
+    if (hasPrefix) {
+        pRequestHeader->topic =
+            VirtualEnvUtil::buildWithProjectGroup(pRequestHeader->topic, m_projectGroupPrefix);
+    }
+    RemotingCommandPtr request = RemotingCommand::createRequestCommand(QUERY_MESSAGE_VALUE, pRequestHeader);
+    m_pRemotingClient->invokeAsync(addr, request, timeoutMillis, pInvokeCallback);
+}
+
+// Sends heartbeat (HEART_BEAT_VALUE) to the broker at addr; returns true only
+// when a response arrives with code SUCCESS_VALUE.
+//   heartbeat     - when a project-group prefix is set, its consumer groups,
+//                   subscription topics and producer groups are rewritten
+//   timeoutMillis - timeout for the synchronous remoting call
+// The sets are rebuilt from modified copies because std::set elements are
+// const; editing a field in place could break the set's ordering.
+bool MQClientAPIImpl::registerClient(const std::string& addr, HeartbeatData& heartbeat, int timeoutMillis) {
+    if (!UtilAll::isBlank(m_projectGroupPrefix)) {
+        const std::string& prefix = m_projectGroupPrefix;
+        std::set<ConsumerData>& consumers = heartbeat.getConsumerDataSet();
+        std::set<ConsumerData> prefixedConsumers;
+        for (std::set<ConsumerData>::const_iterator it = consumers.begin(); it != consumers.end(); ++it) {
+            ConsumerData consumer = *it;
+            consumer.groupName = VirtualEnvUtil::buildWithProjectGroup(consumer.groupName, prefix);
+            std::set<SubscriptionData> prefixedSubs;
+            std::set<SubscriptionData>::const_iterator sub = consumer.subscriptionDataSet.begin();
+            for (; sub != consumer.subscriptionDataSet.end(); ++sub) {
+                SubscriptionData subscription = *sub;
+                subscription.setTopic(
+                    VirtualEnvUtil::buildWithProjectGroup(subscription.getTopic(), prefix));
+                prefixedSubs.insert(subscription);
+            }
+            consumer.subscriptionDataSet.swap(prefixedSubs);
+            prefixedConsumers.insert(consumer);
+        }
+        consumers.swap(prefixedConsumers);
+
+        std::set<ProducerData>& producers = heartbeat.getProducerDataSet();
+        std::set<ProducerData> prefixedProducers;
+        for (std::set<ProducerData>::const_iterator it = producers.begin(); it != producers.end(); ++it) {
+            ProducerData producer = *it;
+            producer.groupName = VirtualEnvUtil::buildWithProjectGroup(producer.groupName, prefix);
+            prefixedProducers.insert(producer);
+        }
+        producers.swap(prefixedProducers);
+    }
+    RemotingCommandPtr request = RemotingCommand::createRequestCommand(HEART_BEAT_VALUE, NULL);
+    std::string body;
+    heartbeat.encode(body);
+    request->setBody((char*)body.data(), body.length(), true);
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis);
+    return (response && response->getCode() == SUCCESS_VALUE);
+}
+
+// Sends a CONSUMER_SEND_MSG_BACK_VALUE request for msg.
+//   addr          - broker address; when empty, the address is derived from
+//                   the message's store host
+//   msg           - message being sent back; its topic is rewritten with the
+//                   project-group prefix when one is set
+//   consumerGroup - group name, decorated with the prefix likewise
+//   delayLevel    - copied into the request header
+//   timeoutMillis - timeout for the synchronous remoting call
+// Throws MQClientException when there is no response, or when the response
+// code is not SUCCESS_VALUE.
+void MQClientAPIImpl::consumerSendMessageBack(const std::string& addr,
+                                              MessageExt& msg,
+                                              const std::string& consumerGroup,
+                                              int delayLevel,
+                                              int timeoutMillis) {
+    std::string effectiveGroup = consumerGroup;
+    if (!UtilAll::isBlank(m_projectGroupPrefix)) {
+        effectiveGroup = VirtualEnvUtil::buildWithProjectGroup(consumerGroup, m_projectGroupPrefix);
+        msg.setTopic(VirtualEnvUtil::buildWithProjectGroup(msg.getTopic(), m_projectGroupPrefix));
+    }
+
+    ConsumerSendMsgBackRequestHeader* header = new ConsumerSendMsgBackRequestHeader();
+    header->group = effectiveGroup;
+    header->offset = msg.getCommitLogOffset();
+    header->delayLevel = delayLevel;
+    RemotingCommandPtr request =
+        RemotingCommand::createRequestCommand(CONSUMER_SEND_MSG_BACK_VALUE, header);
+
+    // Fall back to the message's store host when no address was supplied.
+    const std::string target = addr.empty() ? socketAddress2IPPort(msg.getStoreHost()) : addr;
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync(target, request, timeoutMillis);
+    if (response) {
+        if (response->getCode() == SUCCESS_VALUE) {
+            return;
+        }
+        THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode());
+    }
+
+    THROW_MQEXCEPTION(MQClientException, "consumerSendMessageBack failed", -1);
+}
+
+// Sends a LOCK_BATCH_MQ_VALUE request to the broker at addr and returns the
+// queue set the response reports as "lock OK".
+//   pRequestBody  - lock request; when a project-group prefix is set, its
+//                   consumer group and every queue topic are rewritten
+//   timeoutMillis - timeout for the synchronous remoting call
+// Returns the "lock OK" queue set from the response, with the prefix stripped
+// from each topic when one is set.
+// Throws MQClientException when there is no response, or when the response
+// code is not SUCCESS_VALUE.
+// Sets are rebuilt from modified copies because std::set elements are const;
+// editing the topic in place could break the set's ordering. The decoded
+// response body is deleted once its queue set has been copied out.
+std::set<MessageQueue> MQClientAPIImpl::lockBatchMQ(const std::string& addr,
+                                                    LockBatchRequestBody* pRequestBody,
+                                                    int timeoutMillis) {
+    const bool hasPrefix = !UtilAll::isBlank(m_projectGroupPrefix);
+    if (hasPrefix) {
+        pRequestBody->setConsumerGroup(
+            VirtualEnvUtil::buildWithProjectGroup(pRequestBody->getConsumerGroup(), m_projectGroupPrefix));
+
+        std::set<MessageQueue>& requested = pRequestBody->getMqSet();
+        std::set<MessageQueue> prefixed;
+        for (std::set<MessageQueue>::const_iterator it = requested.begin(); it != requested.end(); ++it) {
+            MessageQueue mq = *it;
+            mq.setTopic(VirtualEnvUtil::buildWithProjectGroup(mq.getTopic(), m_projectGroupPrefix));
+            prefixed.insert(mq);
+        }
+        requested.swap(prefixed);
+    }
+
+    RemotingCommandPtr request = RemotingCommand::createRequestCommand(LOCK_BATCH_MQ_VALUE, NULL);
+    std::string body;
+    pRequestBody->encode(body);
+    request->setBody((char*)body.data(), body.length(), true);
+
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis);
+    if (response) {
+        if (response->getCode() == SUCCESS_VALUE) {
+            LockBatchResponseBody* responseBody =
+                LockBatchResponseBody::decode(response->getBody(), response->getBodyLen());
+            const std::set<MessageQueue> locked = responseBody->getLockOKMQSet();
+            delete responseBody;
+            if (!hasPrefix) {
+                return locked;
+            }
+            std::set<MessageQueue> cleaned;
+            for (std::set<MessageQueue>::const_iterator it = locked.begin(); it != locked.end(); ++it) {
+                MessageQueue mq = *it;
+                mq.setTopic(VirtualEnvUtil::clearProjectGroup(mq.getTopic(), m_projectGroupPrefix));
+                cleaned.insert(mq);
+            }
+            return cleaned;
+        }
+        THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode());
+    }
+
+    THROW_MQEXCEPTION(MQClientException, "lockBatchMQ failed", -1);
+}
+
+// Sends an UNLOCK_BATCH_MQ_VALUE request to the broker at addr.
+//   pRequestBody  - unlock request; when a project-group prefix is set, its
+//                   consumer group and every queue topic are rewritten
+//   timeoutMillis - timeout forwarded to the remoting call
+//   oneway        - true: send without waiting for a response;
+//                   false: wait and validate the response
+// In the non-oneway case, throws MQClientException when there is no
+// response or when the response code is not SUCCESS_VALUE.
+// The queue set is rebuilt from modified copies because std::set elements
+// are const; editing the topic in place could break the set's ordering.
+void MQClientAPIImpl::unlockBatchMQ(const std::string& addr,
+                                    UnlockBatchRequestBody* pRequestBody,
+                                    int timeoutMillis,
+                                    bool oneway) {
+    if (!UtilAll::isBlank(m_projectGroupPrefix)) {
+        pRequestBody->setConsumerGroup(
+            VirtualEnvUtil::buildWithProjectGroup(pRequestBody->getConsumerGroup(), m_projectGroupPrefix));
+
+        std::set<MessageQueue>& requested = pRequestBody->getMqSet();
+        std::set<MessageQueue> prefixed;
+        for (std::set<MessageQueue>::const_iterator it = requested.begin(); it != requested.end(); ++it) {
+            MessageQueue mq = *it;
+            mq.setTopic(VirtualEnvUtil::buildWithProjectGroup(mq.getTopic(), m_projectGroupPrefix));
+            prefixed.insert(mq);
+        }
+        requested.swap(prefixed);
+    }
+
+    RemotingCommandPtr request = RemotingCommand::createRequestCommand(UNLOCK_BATCH_MQ_VALUE, NULL);
+    std::string body;
+    pRequestBody->encode(body);
+    request->setBody((char*)body.data(), body.length(), true);
+
+    if (oneway) {
+        m_pRemotingClient->invokeOneway(addr, request, timeoutMillis);
+        return;
+    }
+
+    // Synchronous path: only SUCCESS_VALUE counts as success.
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, request, timeoutMillis);
+    if (response) {
+        if (response->getCode() == SUCCESS_VALUE) {
+            return;
+        }
+        THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode());
+    }
+
+    THROW_MQEXCEPTION(MQClientException, "unlockBatchMQ failed", -1);
+}
+
+// Not implemented yet: ignores its arguments, returns a default TopicStatsTable.
+TopicStatsTable MQClientAPIImpl::getTopicStatsInfo(const std::string& addr,
+                                                   const std::string& topic,
+                                                   int timeoutMillis) {
+    //TODO
+    TopicStatsTable emptyStats;
+    return emptyStats;
+}
+
+// Not implemented yet: ignores its arguments, returns a default ConsumeStats.
+ConsumeStats MQClientAPIImpl::getConsumeStats(const std::string& addr,
+                                              const std::string& consumerGroup,
+                                              int timeoutMillis) {
+    //TODO
+    ConsumeStats emptyStats;
+    return emptyStats;
+}
+
+// Not implemented yet: ignores its arguments and always returns NULL.
+ProducerConnection* MQClientAPIImpl::getProducerConnectionList(const std::string& addr,
+                                                               const std::string& producerGroup,
+                                                               int timeoutMillis) {
+    //TODO
+    return NULL;
+}
+
+// Not implemented yet: ignores its arguments and always returns NULL.
+ConsumerConnection* MQClientAPIImpl::getConsumerConnectionList(const std::string& addr,
+                                                               const std::string& consumerGroup,
+                                                               int timeoutMillis) {
+    //TODO
+    return NULL;
+}
+
+// Not implemented yet: ignores its arguments and returns a default KVTable.
+KVTable MQClientAPIImpl::getBrokerRuntimeInfo(const std::string& addr, int timeoutMillis) {
+    //TODO
+    KVTable emptyTable;
+    return emptyTable;
+}
+
+// Not implemented yet: all arguments are ignored and nothing is sent.
+void MQClientAPIImpl::updateBrokerConfig(const std::string& addr,
+                                         const std::map<std::string, std::string>& properties,
+                                         int timeoutMillis) {
+    //TODO
+}
+
+// Not implemented yet: ignores timeoutMillis and always returns NULL.
+ClusterInfo* MQClientAPIImpl::getBrokerClusterInfo(int timeoutMillis) {
+    //TODO
+    return NULL;
+}
+
+// Requests route data for topic (GET_ROUTEINTO_BY_TOPIC_VALUE), using the
+// topic name exactly as given: no project-group prefix is applied here.
+//   topic         - topic to look up
+//   timeoutMillis - timeout for the synchronous remoting call
+// The request is sent with an empty address string.
+// Returns the TopicRouteData built from the response body on SUCCESS_VALUE,
+// or NULL when no response is received.
+// Throws MQClientException for every other response, including
+// TOPIC_NOT_EXIST_VALUE and a successful response without a body.
+TopicRouteData* MQClientAPIImpl::getDefaultTopicRouteInfoFromNameServer(const std::string& topic,
+                                                                        int timeoutMillis) {
+    GetRouteInfoRequestHeader* header = new GetRouteInfoRequestHeader();
+    header->topic = topic;
+
+    RemotingCommandPtr request =
+        RemotingCommand::createRequestCommand(GET_ROUTEINTO_BY_TOPIC_VALUE, header);
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync("", request, timeoutMillis);
+
+    if (response) {
+        // Only a successful response with a body yields route data.
+        if (response->getCode() == SUCCESS_VALUE) {
+            const int bodyLen = response->getBodyLen();
+            const char* body = response->getBody();
+            if (body) {
+                TopicRouteData* route = TopicRouteData::encode(body, bodyLen);
+                return route;
+            }
+        }
+        // TODO LOG (TOPIC_NOT_EXIST_VALUE also ends up here)
+        THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode());
+    }
+
+    // No response object: absence is reported with NULL instead of an
+    // exception.
+    return NULL;
+}
+
+// Requests route data for topic (GET_ROUTEINTO_BY_TOPIC_VALUE); the topic is
+// decorated with the project-group prefix first when one is set.
+//   topic         - topic to look up
+//   timeoutMillis - timeout for the synchronous remoting call
+// The request is sent with an empty address string.
+// Returns the TopicRouteData built from the response body on SUCCESS_VALUE,
+// or NULL when no response is received.
+// Throws MQClientException for every other response, including
+// TOPIC_NOT_EXIST_VALUE (after a warning is logged, unless topic equals
+// MixAll::DEFAULT_TOPIC) and a successful response without a body.
+TopicRouteData* MQClientAPIImpl::getTopicRouteInfoFromNameServer(const std::string& topic,
+                                                                 int timeoutMillis) {
+    const std::string effectiveTopic = UtilAll::isBlank(m_projectGroupPrefix)
+        ? topic
+        : VirtualEnvUtil::buildWithProjectGroup(topic, m_projectGroupPrefix);
+
+    GetRouteInfoRequestHeader* header = new GetRouteInfoRequestHeader();
+    header->topic = effectiveTopic;
+
+    RemotingCommandPtr request =
+        RemotingCommand::createRequestCommand(GET_ROUTEINTO_BY_TOPIC_VALUE, header);
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync("", request, timeoutMillis);
+
+    if (response) {
+        const int code = response->getCode();
+        if (code == SUCCESS_VALUE) {
+            const int bodyLen = response->getBodyLen();
+            const char* body = response->getBody();
+            if (body) {
+                return TopicRouteData::encode(body, bodyLen);
+            }
+        } else if (code == TOPIC_NOT_EXIST_VALUE) {
+            // The warning uses the caller's topic name, not the decorated one.
+            if (topic != MixAll::DEFAULT_TOPIC) {
+                RMQ_WARN("get Topic [{%s}] RouteInfoFromNameServer is not exist value", topic.c_str());
+            }
+        }
+        // Every path that did not return above ends in an exception.
+        THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode());
+    }
+
+    // No response object: absence is reported with NULL instead of a throw.
+    return NULL;
+}
+
+// Requests the full topic list (GET_ALL_TOPIC_LIST_FROM_NAMESERVER_VALUE).
+//   timeoutMillis - timeout for the synchronous remoting call
+// The request is sent with an empty address string and no custom header.
+// Returns the TopicList decoded from the response body on SUCCESS_VALUE; when
+// a project-group prefix is set, its topic set is replaced by one with the
+// prefix cleared from every name. Returns NULL when no response is received.
+// Throws MQClientException for any other response, including a successful
+// response without a body.
+// NOTE(review): the TopicList::decode() result is used without a NULL check.
+TopicList* MQClientAPIImpl::getTopicListFromNameServer(int timeoutMillis) {
+    RemotingCommandPtr request =
+        RemotingCommand::createRequestCommand(GET_ALL_TOPIC_LIST_FROM_NAMESERVER_VALUE, NULL);
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync("", request, timeoutMillis);
+
+    if (response) {
+        if (response->getCode() == SUCCESS_VALUE) {
+            char* body = (char*)response->getBody();
+            if (body != NULL) {
+                TopicList* topicList = TopicList::decode(body, response->getBodyLen());
+                if (!UtilAll::isBlank(m_projectGroupPrefix)) {
+                    // Build the cleaned set separately, then install it.
+                    const std::set<std::string>& rawTopics = topicList->getTopicList();
+                    std::set<std::string> cleanedTopics;
+                    std::set<std::string>::const_iterator it = rawTopics.begin();
+                    for (; it != rawTopics.end(); ++it) {
+                        cleanedTopics.insert(
+                            VirtualEnvUtil::clearProjectGroup(*it, m_projectGroupPrefix));
+                    }
+                    topicList->setTopicList(cleanedTopics);
+                }
+
+                return topicList;
+            }
+        }
+
+        // Non-success code, or success without a body.
+        THROW_MQEXCEPTION(MQClientException, response->getRemark(), response->getCode());
+    }
+
+    // No response object: absence is reported with NULL instead of a throw.
+    return NULL;
+}
+
+/**
+ * Not implemented yet: ignores all of its arguments and always returns 0.
+ */
+int MQClientAPIImpl::wipeWritePermOfBroker(const std::string& namesrvAddr, const std::string& brokerName, int timeoutMillis)
+{
+    // TODO: not implemented
+    return 0;
+}
+
+/**
+ * Not implemented yet: the body is empty and all arguments are ignored.
+ */
+void MQClientAPIImpl::deleteTopicInBroker(const std::string& addr, const std::string& topic, int timeoutMillis)
+{
+    // TODO: not implemented
+}
+
+/**
+ * Not implemented yet: the body is empty and all arguments are ignored.
+ */
+void MQClientAPIImpl::deleteTopicInNameServer(const std::string& addr, const std::string& topic, int timeoutMillis)
+{
+    // TODO: not implemented
+}
+
+/**
+ * Not implemented yet: the body is empty and all arguments are ignored.
+ */
+void MQClientAPIImpl::deleteSubscriptionGroup(const std::string& addr, const std::string& groupName, int timeoutMillis)
+{
+    // TODO: not implemented
+}
+
+/**
+ * Reads one key/value configuration entry (request code GET_KV_CONFIG_VALUE).
+ *
+ * @param projectNamespace  copied into the request header's namespace_ field
+ * @param key               copied into the request header's key field
+ * @param timeoutMillis     timeout handed to invokeSync()
+ * @return the `value` field of the response's GetKVConfigResponseHeader when
+ *         the response code is SUCCESS_VALUE
+ * @throws MQClientException (raised through THROW_MQEXCEPTION) with the
+ *         response remark and code for any other response code, or with
+ *         "getKVConfigValue failed" / -1 when invokeSync() yields no response
+ */
+std::string MQClientAPIImpl::getKVConfigValue(const std::string& projectNamespace,
+        const std::string& key,
+        int timeoutMillis)
+{
+    GetKVConfigRequestHeader* pHeader = new GetKVConfigRequestHeader();
+    pHeader->namespace_ = projectNamespace;
+    pHeader->key = key;
+
+    RemotingCommandPtr cmd = RemotingCommand::createRequestCommand(GET_KV_CONFIG_VALUE, pHeader);
+    RemotingCommandPtr reply = m_pRemotingClient->invokeSync("", cmd, timeoutMillis);
+
+    if (reply)
+    {
+        if (reply->getCode() == SUCCESS_VALUE)
+        {
+            GetKVConfigResponseHeader* pResult =
+                (GetKVConfigResponseHeader*)reply->getCommandCustomHeader();
+            return pResult->value;
+        }
+
+        THROW_MQEXCEPTION(MQClientException, reply->getRemark(), reply->getCode());
+    }
+
+    THROW_MQEXCEPTION(MQClientException, "getKVConfigValue failed", -1);
+}
+
+/**
+ * Not implemented yet: the body is empty and all arguments are ignored.
+ */
+void MQClientAPIImpl::putKVConfigValue(const std::string& projectNamespace, const std::string& key,
+        const std::string& value, int timeoutMillis)
+{
+    // TODO: not implemented
+}
+
+/**
+ * Not implemented yet: the body is empty and all arguments are ignored.
+ */
+void MQClientAPIImpl::deleteKVConfigValue(const std::string& projectNamespace, const std::string& key, int timeoutMillis)
+{
+    // TODO: not implemented
+}
+
+/**
+ * Looks up the value stored under key `ip` in the
+ * NamesrvUtil::NAMESPACE_PROJECT_CONFIG namespace by delegating to
+ * getKVConfigValue(); return value and exceptions are those of that call.
+ *
+ * @param ip             used as the key of the lookup
+ * @param timeoutMillis  forwarded unchanged to getKVConfigValue()
+ */
+std::string MQClientAPIImpl::getProjectGroupByIp(const std::string& ip, int timeoutMillis)
+{
+    std::string projectGroup = getKVConfigValue(NamesrvUtil::NAMESPACE_PROJECT_CONFIG, ip, timeoutMillis);
+    return projectGroup;
+}
+
+/**
+ * Not implemented yet: ignores all of its arguments and always returns an
+ * empty string.
+ */
+std::string MQClientAPIImpl::getKVConfigByValue(const std::string& projectNamespace, const std::string& projectGroup, int timeoutMillis)
+{
+    // TODO: not implemented
+    return std::string();
+}
+
+/**
+ * Not implemented yet: ignores all of its arguments and always returns a
+ * freshly constructed KVTable().
+ */
+KVTable MQClientAPIImpl::getKVListByNamespace(const std::string& projectNamespace,
+        int timeoutMillis)
+{
+    // TODO: not implemented
+    return KVTable();
+}
+
+/**
+ * Not implemented yet: the body is empty and all arguments are ignored.
+ */
+void MQClientAPIImpl::deleteKVConfigByValue(const std::string& projectNamespace, const std::string& projectGroup, int timeoutMillis)
+{
+    // TODO: not implemented
+}
+
+/**
+ * Sends an already-built request with invokeSync() and converts the response
+ * with processSendResponse().
+ *
+ * @param addr           address handed to invokeSync()
+ * @param brokerName     forwarded to processSendResponse()
+ * @param msg            only its topic is read, for processSendResponse()
+ * @param timeoutMillis  timeout handed to invokeSync()
+ * @param request        command to send
+ * @return whatever processSendResponse() returns (NULL when there is no response)
+ * @throws whatever processSendResponse() throws for an unexpected response code
+ */
+SendResult* MQClientAPIImpl::sendMessageSync(const std::string& addr, const std::string& brokerName,
+        Message& msg, int timeoutMillis, RemotingCommand* request)
+{
+    RemotingCommandPtr reply = m_pRemotingClient->invokeSync(addr, request, timeoutMillis);
+
+    SendResult* pSendResult = processSendResponse(brokerName, msg.getTopic(), reply);
+    return pSendResult;
+}
+
+/**
+ * Sends an already-built request with invokeAsync(), wrapping the user's
+ * SendCallback in a newly allocated ProducerInvokeCallback.
+ *
+ * NOTE(review): the ProducerInvokeCallback is allocated here and never deleted
+ * in this function; presumably the remoting client releases it - confirm.
+ *
+ * @param addr           address handed to invokeAsync()
+ * @param brokerName     stored in the ProducerInvokeCallback
+ * @param msg            only its topic is read, for the ProducerInvokeCallback
+ * @param timeoutMillis  timeout handed to invokeAsync()
+ * @param request        command to send
+ * @param pSendCallback  user callback wrapped by the ProducerInvokeCallback
+ */
+void MQClientAPIImpl::sendMessageAsync(const std::string& addr, const std::string& brokerName,
+        Message& msg, int timeoutMillis, RemotingCommand* request, SendCallback* pSendCallback)
+{
+    ProducerInvokeCallback* pInvokeCallback =
+        new ProducerInvokeCallback(pSendCallback, this, msg.getTopic(), brokerName);
+
+    m_pRemotingClient->invokeAsync(addr, request, timeoutMillis, pInvokeCallback);
+}
+
+/**
+ * Converts the response of a send request into a SendResult.
+ *
+ * Response codes are mapped as follows:
+ *   SUCCESS_VALUE             -> SEND_OK
+ *   FLUSH_DISK_TIMEOUT_VALUE  -> FLUSH_DISK_TIMEOUT
+ *   FLUSH_SLAVE_TIMEOUT_VALUE -> FLUSH_SLAVE_TIMEOUT
+ *   SLAVE_NOT_AVAILABLE_VALUE -> SLAVE_NOT_AVAILABLE
+ *
+ * @param brokerName  broker name placed in the result's MessageQueue
+ * @param topic       topic placed in the result's MessageQueue
+ * @param pResponse   response to convert; may be NULL
+ * @return NULL when pResponse is NULL; otherwise a newly allocated SendResult
+ *         built from the response's SendMessageResponseHeader
+ * @throws MQClientException (raised through THROW_MQEXCEPTION) carrying the
+ *         response remark and code for any response code not listed above
+ */
+SendResult* MQClientAPIImpl::processSendResponse(const std::string& brokerName,
+        const std::string& topic,
+        RemotingCommand* pResponse)
+{
+    if (pResponse == NULL)
+    {
+        return NULL;
+    }
+
+    bool recognised = true;
+    SendStatus status = SEND_OK;
+    switch (pResponse->getCode())
+    {
+        case SUCCESS_VALUE:
+            status = SEND_OK;
+            break;
+        case FLUSH_DISK_TIMEOUT_VALUE:
+            // TODO LOG
+            status = FLUSH_DISK_TIMEOUT;
+            break;
+        case FLUSH_SLAVE_TIMEOUT_VALUE:
+            // TODO LOG
+            status = FLUSH_SLAVE_TIMEOUT;
+            break;
+        case SLAVE_NOT_AVAILABLE_VALUE:
+            // TODO LOG
+            status = SLAVE_NOT_AVAILABLE;
+            break;
+        default:
+            recognised = false;
+            break;
+    }
+
+    if (recognised)
+    {
+        SendMessageResponseHeader* pHeader =
+            (SendMessageResponseHeader*)pResponse->getCommandCustomHeader();
+        MessageQueue targetQueue(topic, brokerName, pHeader->queueId);
+
+        return new SendResult(status, pHeader->msgId, targetQueue,
+                              pHeader->queueOffset, m_projectGroupPrefix);
+    }
+
+    THROW_MQEXCEPTION(MQClientException, pResponse->getRemark(), pResponse->getCode());
+}
+
+/**
+ * Sends an already-built pull request with invokeAsync(), wrapping the user's
+ * PullCallback in a newly allocated ConsumerInvokeCallback.
+ *
+ * NOTE(review): the ConsumerInvokeCallback is allocated here and never deleted
+ * in this function; presumably the remoting client releases it - confirm.
+ *
+ * @param addr           address handed to invokeAsync()
+ * @param pRequest       command to send
+ * @param timeoutMillis  timeout handed to invokeAsync()
+ * @param pPullCallback  user callback wrapped by the ConsumerInvokeCallback
+ */
+void MQClientAPIImpl::pullMessageAsync(const std::string& addr, RemotingCommand* pRequest,
+        int timeoutMillis, PullCallback* pPullCallback)
+{
+    ConsumerInvokeCallback* pInvokeCallback = new ConsumerInvokeCallback(pPullCallback, this);
+
+    m_pRemotingClient->invokeAsync(addr, pRequest, timeoutMillis, pInvokeCallback);
+}
+
+/**
+ * Converts the response of a pull request into a PullResult.
+ *
+ * Response codes are mapped as follows:
+ *   SUCCESS_VALUE                -> FOUND
+ *   PULL_NOT_FOUND_VALUE         -> NO_NEW_MSG
+ *   PULL_RETRY_IMMEDIATELY_VALUE -> NO_MATCHED_MSG
+ *   PULL_OFFSET_MOVED_VALUE      -> OFFSET_ILLEGAL
+ *
+ * The returned object is a PullResultExt that receives the response's body
+ * pointer and length together with an empty message list; the messages are
+ * not decoded here.
+ *
+ * @param pResponse  response to convert; may be NULL
+ * @return NULL when pResponse is NULL (same convention as
+ *         processSendResponse()); otherwise a newly allocated PullResultExt
+ * @throws MQBrokerException (raised through THROW_MQEXCEPTION) carrying the
+ *         response remark and code for any response code not listed above
+ */
+PullResult* MQClientAPIImpl::processPullResponse(RemotingCommand* pResponse)
+{
+    // A missing response must not be dereferenced; report it the same way
+    // processSendResponse() does.
+    if (pResponse == NULL)
+    {
+        return NULL;
+    }
+
+    PullStatus pullStatus = NO_NEW_MSG;
+    switch (pResponse->getCode())
+    {
+        case SUCCESS_VALUE:
+            pullStatus = FOUND;
+            break;
+        case PULL_NOT_FOUND_VALUE:
+            pullStatus = NO_NEW_MSG;
+            break;
+        case PULL_RETRY_IMMEDIATELY_VALUE:
+            pullStatus = NO_MATCHED_MSG;
+            break;
+        case PULL_OFFSET_MOVED_VALUE:
+            pullStatus = OFFSET_ILLEGAL;
+            break;
+        default:
+            THROW_MQEXCEPTION(MQBrokerException, pResponse->getRemark(), pResponse->getCode());
+            break;
+    }
+
+    PullMessageResponseHeader* responseHeader = (PullMessageResponseHeader*) pResponse->getCommandCustomHeader();
+    // The list is intentionally left empty: only the raw body is handed over.
+    std::list<MessageExt*> msgFoundList;
+    return new PullResultExt(pullStatus, responseHeader->nextBeginOffset,
+                             responseHeader->minOffset, responseHeader->maxOffset, msgFoundList,
+                             responseHeader->suggestWhichBrokerId, pResponse->getBody(), pResponse->getBodyLen());
+}
+
+/**
+ * Sends an already-built pull request with invokeSync() and converts the
+ * response with processPullResponse().
+ *
+ * @param addr           address handed to invokeSync()
+ * @param pRequest       command to send
+ * @param timeoutMillis  timeout handed to invokeSync()
+ * @return NULL when invokeSync() yields no response (the convention the other
+ *         pointer-returning requests in this file follow); otherwise the
+ *         PullResult built by processPullResponse()
+ * @throws whatever processPullResponse() throws for an unexpected response code
+ */
+PullResult* MQClientAPIImpl::pullMessageSync(const std::string& addr,
+        RemotingCommand* pRequest,
+        int timeoutMillis)
+{
+    RemotingCommandPtr response = m_pRemotingClient->invokeSync(addr, pRequest, timeoutMillis);
+    // Every other caller of invokeSync() in this file tolerates a missing
+    // response; without this guard it would be dereferenced below.
+    if (!response)
+    {
+        return NULL;
+    }
+
+    PullResult* result = processPullResponse(response);
+
+    // NOTE(review): presumably detaches the body buffer from the response
+    // because the PullResultExt built above keeps the same pointer - confirm
+    // the meaning of setBody()'s third argument.
+    response->setBody(NULL, 0, false);
+    return result;
+}
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq-externals/blob/6a45c767/rocketmq-client4cpp/src/MQClientAPIImpl.h
----------------------------------------------------------------------
diff --git a/rocketmq-client4cpp/src/MQClientAPIImpl.h b/rocketmq-client4cpp/src/MQClientAPIImpl.h
new file mode 100755
index 0000000..88defb5
--- /dev/null
+++ b/rocketmq-client4cpp/src/MQClientAPIImpl.h
@@ -0,0 +1,280 @@
+/**
+* Copyright (C) 2013 kangliqiang ,kangliq@163.com
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+#ifndef __MQCLIENTAPIIMPL_H__
+#define __MQCLIENTAPIIMPL_H__
+
+#include <list>
+#include <map>
+#include <set>
+#include <string>
+#include <vector>
+
+#include "ClientConfig.h"
+#include "RemoteClientConfig.h"
+#include "SubscriptionGroupConfig.h"
+#include "TopicConfig.h"
+#include "ConsumeStats.h"
+#include "TopicStatsTable.h"
+#include "KVTable.h"
+#include "TopicRouteData.h"
+#include "SendResult.h"
+#include "PullResult.h"
+#include "MessageExt.h"
+#include "CommunicationMode.h"
+#include "TopAddressing.h"
+#include "HeartbeatData.h"
+#include "LockBatchBody.h"
+
+namespace rmq
+{
+// Forward declarations of types that the declarations below only use through
+// pointers or references (alphabetical order).
+class ClientConfig;
+class ClientRemotingProcessor;
+class ClusterInfo;
+class ConsumerConnection;
+class EndTransactionRequestHeader;
+class InvokeCallback;
+class ProducerConnection;
+class PullCallback;
+class PullMessageRequestHeader;
+class QueryConsumerOffsetRequestHeader;
+class QueryMessageRequestHeader;
+class RemotingCommand;
+class SendCallback;
+class SendMessageRequestHeader;
+class TcpRemotingClient;
+class TopicList;
+class UpdateConsumerOffsetRequestHeader;
+
+/**
+ * Request-level API of the client.
+ *
+ * The constructor (see MQClientAPIImpl.cpp) creates the TcpRemotingClient kept
+ * in m_pRemotingClient; the requests implemented in the .cpp are sent through
+ * it with invokeSync() / invokeAsync().
+ *
+ * NOTE(review): the class declares a destructor and holds raw pointers but
+ * declares no copy constructor or assignment operator - confirm instances are
+ * never copied.
+ */
+class MQClientAPIImpl
+{
+public:
+    MQClientAPIImpl(ClientConfig& clientConfig, const RemoteClientConfig& remoteClientConfig,
+                    ClientRemotingProcessor* pClientRemotingProcessor);
+    ~MQClientAPIImpl();
+
+    void start();
+    void shutdown();
+
+    // ---- project group prefix / name server address accessors ----
+    std::string getProjectGroupPrefix();
+    std::vector<std::string> getNameServerAddressList();
+    void updateNameServerAddressList(const std::string& addrs);
+    std::string fetchNameServerAddr();
+
+    // ---- requests that take the target address as `addr` ----
+    void createSubscriptionGroup(const std::string& addr, SubscriptionGroupConfig config, int timeoutMillis);
+
+    void createTopic(const std::string& addr, const std::string& defaultTopic,
+                     TopicConfig topicConfig, int timeoutMillis);
+
+    SendResult sendMessage(const std::string& addr, const std::string& brokerName, Message& msg,
+                           SendMessageRequestHeader* pRequestHeader, int timeoutMillis,
+                           CommunicationMode communicationMode, SendCallback* pSendCallback);
+
+    PullResult* pullMessage(const std::string& addr, PullMessageRequestHeader* pRequestHeader,
+                            int timeoutMillis, CommunicationMode communicationMode,
+                            PullCallback* pPullCallback);
+
+    MessageExt* viewMessage(const std::string& addr, long long phyoffset, int timeoutMillis);
+
+    long long searchOffset(const std::string& addr, const std::string& topic, int queueId,
+                           long long timestamp, int timeoutMillis);
+
+    long long getMaxOffset(const std::string& addr, const std::string& topic, int queueId, int timeoutMillis);
+
+    std::list<std::string> getConsumerIdListByGroup(const std::string& addr,
+                                                    const std::string& consumerGroup,
+                                                    int timeoutMillis);
+
+    long long getMinOffset(const std::string& addr, const std::string& topic, int queueId, int timeoutMillis);
+
+    long long getEarliestMsgStoretime(const std::string& addr, const std::string& topic,
+                                      int queueId, int timeoutMillis);
+
+    long long queryConsumerOffset(const std::string& addr,
+                                  QueryConsumerOffsetRequestHeader* pRequestHeader,
+                                  int timeoutMillis);
+
+    void updateConsumerOffset(const std::string& addr,
+                              UpdateConsumerOffsetRequestHeader* pRequestHeader,
+                              int timeoutMillis);
+
+    void updateConsumerOffsetOneway(const std::string& addr,
+                                    UpdateConsumerOffsetRequestHeader* pRequestHeader,
+                                    int timeoutMillis);
+
+    void sendHearbeat(const std::string& addr, HeartbeatData* pHeartbeatData, int timeoutMillis);
+
+    void unregisterClient(const std::string& addr, const std::string& clientID,
+                          const std::string& producerGroup, const std::string& consumerGroup,
+                          int timeoutMillis);
+
+    void endTransactionOneway(const std::string& addr, EndTransactionRequestHeader* pRequestHeader,
+                              const std::string& remark, int timeoutMillis);
+
+    void queryMessage(const std::string& addr, QueryMessageRequestHeader* pRequestHeader,
+                      int timeoutMillis, InvokeCallback* pInvokeCallback);
+
+    bool registerClient(const std::string& addr, HeartbeatData& heartbeat, int timeoutMillis);
+
+    void consumerSendMessageBack(const std::string& addr, MessageExt& msg,
+                                 const std::string& consumerGroup, int delayLevel,
+                                 int timeoutMillis);
+
+    std::set<MessageQueue> lockBatchMQ(const std::string& addr, LockBatchRequestBody* pRequestBody,
+                                       int timeoutMillis);
+
+    void unlockBatchMQ(const std::string& addr, UnlockBatchRequestBody* pRequestBody,
+                       int timeoutMillis, bool oneway);
+
+    TopicStatsTable getTopicStatsInfo(const std::string& addr, const std::string& topic, int timeoutMillis);
+
+    ConsumeStats getConsumeStats(const std::string& addr, const std::string& consumerGroup, int timeoutMillis);
+
+    ProducerConnection* getProducerConnectionList(const std::string& addr,
+                                                  const std::string& producerGroup,
+                                                  int timeoutMillis);
+
+    ConsumerConnection* getConsumerConnectionList(const std::string& addr,
+                                                  const std::string& consumerGroup,
+                                                  int timeoutMillis);
+
+    KVTable getBrokerRuntimeInfo(const std::string& addr, int timeoutMillis);
+
+    void updateBrokerConfig(const std::string& addr,
+                            const std::map<std::string, std::string>& properties,
+                            int timeoutMillis);
+
+    // ---- requests without an explicit target address ----
+    ClusterInfo* getBrokerClusterInfo(int timeoutMillis);
+
+    TopicRouteData* getDefaultTopicRouteInfoFromNameServer(const std::string& topic, int timeoutMillis);
+
+    TopicRouteData* getTopicRouteInfoFromNameServer(const std::string& topic, int timeoutMillis);
+
+    TopicList* getTopicListFromNameServer(int timeoutMillis);
+
+    // ---- administrative calls; the four below are stubs in MQClientAPIImpl.cpp ----
+    int wipeWritePermOfBroker(const std::string& namesrvAddr, const std::string& brokerName, int timeoutMillis);
+
+    void deleteTopicInBroker(const std::string& addr, const std::string& topic, int timeoutMillis);
+    void deleteTopicInNameServer(const std::string& addr, const std::string& topic, int timeoutMillis);
+    void deleteSubscriptionGroup(const std::string& addr, const std::string& groupName, int timeoutMillis);
+
+    // ---- key/value configuration; only getKVConfigValue() and
+    // ---- getProjectGroupByIp() are implemented, the rest are stubs ----
+    std::string getKVConfigValue(const std::string& projectNamespace, const std::string& key, int timeoutMillis);
+
+    void putKVConfigValue(const std::string& projectNamespace, const std::string& key,
+                          const std::string& value, int timeoutMillis);
+
+    void deleteKVConfigValue(const std::string& projectNamespace, const std::string& key, int timeoutMillis);
+
+    std::string getProjectGroupByIp(const std::string& ip, int timeoutMillis);
+
+    std::string getKVConfigByValue(const std::string& projectNamespace,
+                                   const std::string& projectGroup,
+                                   int timeoutMillis);
+
+    KVTable getKVListByNamespace(const std::string& projectNamespace, int timeoutMillis);
+
+    void deleteKVConfigByValue(const std::string& projectNamespace,
+                               const std::string& projectGroup,
+                               int timeoutMillis);
+
+    TcpRemotingClient* getRemotingClient();
+
+    // ---- response converters; public, and each invoke callback is handed `this` ----
+    SendResult* processSendResponse(const std::string& brokerName, const std::string& topic,
+                                    RemotingCommand* pResponse);
+
+    PullResult* processPullResponse(RemotingCommand* pResponse);
+
+private:
+    // Transport helpers working on an already-built RemotingCommand.
+    SendResult* sendMessageSync(const std::string& addr, const std::string& brokerName,
+                                Message& msg, int timeoutMillis, RemotingCommand* request);
+
+    void sendMessageAsync(const std::string& addr, const std::string& brokerName, Message& msg,
+                          int timeoutMillis, RemotingCommand* request, SendCallback* pSendCallback);
+
+    void pullMessageAsync(const std::string& addr, RemotingCommand* pRequest,
+                          int timeoutMillis, PullCallback* pPullCallback);
+
+    PullResult* pullMessageSync(const std::string& addr, RemotingCommand* pRequest, int timeoutMillis);
+
+private:
+    TcpRemotingClient* m_pRemotingClient;                // created in the constructor
+    TopAddressing m_topAddressing;
+    ClientRemotingProcessor* m_pClientRemotingProcessor; // supplied to the constructor
+    std::string m_nameSrvAddr;
+    std::string m_projectGroupPrefix;                    // blank means no topic rewriting
+};
+}
+
+#endif
|