This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new 3f83ddc Add unit-test for transport (#110)
3f83ddc is described below
commit 3f83ddcbe5d0c869282f5c672745ed06c902c9c3
Author: githublaohu <2372554140@qq.com>
AuthorDate: Wed Apr 3 10:20:47 2019 +0800
Add unit-test for transport (#110)
Add unit-test for transport
---
test/src/transport/ClientRemotingProcessorTest.cpp | 235 +++++++++++++++++++++
test/src/transport/ResponseFutureTest.cpp | 180 ++++++++++++++++
.../SocketUtilTest.cpp} | 43 ++--
3 files changed, 438 insertions(+), 20 deletions(-)
diff --git a/test/src/transport/ClientRemotingProcessorTest.cpp b/test/src/transport/ClientRemotingProcessorTest.cpp
new file mode 100644
index 0000000..80348b0
--- /dev/null
+++ b/test/src/transport/ClientRemotingProcessorTest.cpp
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include "map"
+#include "string.h"
+
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+#include "json/value.h"
+#include "json/writer.h"
+
+#include "ClientRPCHook.h"
+#include "ClientRemotingProcessor.h"
+#include "ConsumerRunningInfo.h"
+#include "MQClientFactory.h"
+#include "MQMessageQueue.h"
+#include "MQProtos.h"
+#include "RemotingCommand.h"
+#include "SessionCredentials.h"
+#include "UtilAll.h"
+#include "dataBlock.h"
+
+using std::map;
+using std::string;
+
+using ::testing::_;
+using ::testing::InitGoogleMock;
+using ::testing::InitGoogleTest;
+using testing::Mock;
+using testing::Return;
+using testing::SetArgReferee;
+
+using Json::FastWriter;
+using Json::Value;
+
+using rocketmq::ClientRemotingProcessor;
+using rocketmq::ClientRPCHook;
+using rocketmq::ConsumerRunningInfo;
+using rocketmq::GetConsumerRunningInfoRequestHeader;
+using rocketmq::MemoryBlock;
+using rocketmq::MQClientFactory;
+using rocketmq::MQMessageQueue;
+using rocketmq::MQRequestCode;
+using rocketmq::MQResponseCode;
+using rocketmq::NotifyConsumerIdsChangedRequestHeader;
+using rocketmq::RemotingCommand;
+using rocketmq::ResetOffsetBody;
+using rocketmq::ResetOffsetRequestHeader;
+using rocketmq::SessionCredentials;
+using rocketmq::UtilAll;
+
+// gmock test double derived from ClientRemotingProcessor. The constructor just
+// forwards the factory pointer to the base class; three handler methods are
+// declared as one-argument mocks.
+// NOTE(review): none of the tests visible in this file instantiate this class,
+// and the getConsumerRunningInfo test calls a two-argument
+// getConsumerRunningInfo(addr, request) on the real processor, so the
+// one-argument mock below may not match the base signature - confirm against
+// ClientRemotingProcessor.h.
+class MockClientRemotingProcessor : public ClientRemotingProcessor {
+ public:
+ MockClientRemotingProcessor(MQClientFactory *factrory) : ClientRemotingProcessor(factrory)
{}
+ MOCK_METHOD1(resetOffset, RemotingCommand *(RemotingCommand *request));
+ MOCK_METHOD1(getConsumerRunningInfo, RemotingCommand *(RemotingCommand *request));
+ MOCK_METHOD1(notifyConsumerIdsChanged, RemotingCommand *(RemotingCommand *request));
+};
+
+// gmock test double derived from MQClientFactory. All five constructor
+// arguments are forwarded unchanged to the base-class constructor. The four
+// mocked methods are the ones the tests below set expectations on:
+// resetOffset, consumerRunningInfo, getSessionCredentialFromConsumer and
+// doRebalanceByConsumerGroup.
+// NOTE(review): the mocks only take effect if MQClientFactory declares these
+// methods virtual - confirm against MQClientFactory.h.
+class MockMQClientFactory : public MQClientFactory {
+ public:
+ MockMQClientFactory(const string &clientID,
+ int pullThreadNum,
+ uint64_t tcpConnectTimeout,
+ uint64_t tcpTransportTryLockTimeout,
+ string unitName)
+ : MQClientFactory(clientID, pullThreadNum, tcpConnectTimeout, tcpTransportTryLockTimeout,
unitName) {}
+
+ MOCK_METHOD3(resetOffset,
+ void(const string &group, const string &topic, const map<MQMessageQueue,
int64> &offsetTable));
+ MOCK_METHOD1(consumerRunningInfo, ConsumerRunningInfo *(const string &consumerGroup));
+ MOCK_METHOD2(getSessionCredentialFromConsumer,
+ bool(const string &consumerGroup, SessionCredentials &sessionCredentials));
+ MOCK_METHOD1(doRebalanceByConsumerGroup, void(const string &consumerGroup));
+};
+
+// Drives ClientRemotingProcessor::processRequest() through a series of request
+// codes and expects a null response command for every one that is checked.
+TEST(clientRemotingProcessor, processRequest) {
+  MockMQClientFactory *factory = new MockMQClientFactory("testClientId", 4, 3000, 4000, "a");
+  // The factory is never deleted in these tests; mark it leakable like the
+  // sibling tests do so gmock's leak checker does not flag it.
+  Mock::AllowLeak(factory);
+  ClientRemotingProcessor clientRemotingProcessor(factory);
+
+  string addr = "127.0.0.1:9876";
+  RemotingCommand *command = new RemotingCommand();
+
+  // The same request is submitted twice; both submissions must yield null.
+  command->setCode(MQRequestCode::RESET_CONSUMER_CLIENT_OFFSET);
+  EXPECT_TRUE(clientRemotingProcessor.processRequest(addr, command) == nullptr);
+  EXPECT_EQ(nullptr, clientRemotingProcessor.processRequest(addr, command));
+
+  // NOTE(review): `header` is not deleted separately, matching the other tests
+  // that hand a header to RemotingCommand; presumably the command owns it -
+  // confirm against RemotingCommand.h.
+  NotifyConsumerIdsChangedRequestHeader *header = new NotifyConsumerIdsChangedRequestHeader();
+  header->setGroup("testGroup");
+  RemotingCommand *twoCommand = new RemotingCommand(MQRequestCode::NOTIFY_CONSUMER_IDS_CHANGED, header);
+
+  EXPECT_EQ(nullptr, clientRemotingProcessor.processRequest(addr, twoCommand));
+
+  // The GET_CONSUMER_RUNNING_INFO check was left disabled by the original author.
+  command->setCode(MQRequestCode::GET_CONSUMER_RUNNING_INFO);
+  // EXPECT_EQ(NULL , clientRemotingProcessor.processRequest(addr, command));
+
+  command->setCode(MQRequestCode::CHECK_TRANSACTION_STATE);
+  EXPECT_TRUE(clientRemotingProcessor.processRequest(addr, command) == nullptr);
+
+  command->setCode(MQRequestCode::GET_CONSUMER_STATUS_FROM_CLIENT);
+  EXPECT_TRUE(clientRemotingProcessor.processRequest(addr, command) == nullptr);
+
+  command->setCode(MQRequestCode::CONSUME_MESSAGE_DIRECTLY);
+  EXPECT_TRUE(clientRemotingProcessor.processRequest(addr, command) == nullptr);
+
+  // Raw code 1 is not one of the MQRequestCode names exercised above.
+  command->setCode(1);
+  EXPECT_TRUE(clientRemotingProcessor.processRequest(addr, command) == nullptr);
+
+  // The caller keeps ownership of the requests passed to processRequest().
+  delete twoCommand;
+  delete command;
+}
+
+// Calls ClientRemotingProcessor::resetOffset() three times - with no body, with
+// a body whose last two bytes are cut off, and with the complete JSON body -
+// and expects MQClientFactory::resetOffset() to be reached exactly once overall.
+TEST(clientRemotingProcessor, resetOffset) {
+  MockMQClientFactory *factory = new MockMQClientFactory("testClientId", 4, 3000, 4000, "a");
+  Mock::AllowLeak(factory);
+  ClientRemotingProcessor clientRemotingProcessor(factory);
+
+  // Build {"offsetTable": [ {brokerName, queueId, topic, offset} ]}.
+  Value root;
+  Value messageQueues;
+  Value messageQueue;
+  messageQueue["brokerName"] = "testBroker";
+  messageQueue["queueId"] = 4;
+  messageQueue["topic"] = "testTopic";
+  messageQueue["offset"] = 1024;
+
+  messageQueues.append(messageQueue);
+  root["offsetTable"] = messageQueues;
+
+  FastWriter writer;
+  string strData = writer.write(root);
+
+  ResetOffsetRequestHeader *header = new ResetOffsetRequestHeader();
+  RemotingCommand *request = new RemotingCommand(13, header);
+
+  EXPECT_CALL(*factory, resetOffset(_, _, _)).Times(1);
+
+  // 1) request without any body
+  clientRemotingProcessor.resetOffset(request);
+
+  // 2) body with the trailing two bytes removed
+  request->SetBody(strData.c_str(), strData.size() - 2);
+  clientRemotingProcessor.resetOffset(request);
+
+  // 3) complete body
+  request->SetBody(strData.c_str(), strData.size());
+  clientRemotingProcessor.resetOffset(request);
+
+  // `header` was handed to `request` at construction. The sibling tests never
+  // delete such a header themselves, so deleting it here as well as `request`
+  // risks a double free.
+  // NOTE(review): assumes RemotingCommand owns its header - confirm against
+  // RemotingCommand.h.
+  delete request;
+}
+
+// Calls the real two-argument getConsumerRunningInfo(addr, request) against a
+// mocked factory and expects a SYSTEM_ERROR response with the "not exist"
+// remark.
+// NOTE(review): the suite name is "clientRemotingProcessorS" (trailing S), which
+// does not match the "clientRemotingProcessor.*" filter set in this file's
+// main(), so this test is not selected. Unclear whether that is a typo or a
+// deliberate way of disabling it.
+// NOTE(review): both expectations use Times(2) while only one
+// getConsumerRunningInfo() call is visible, and `factory` / `info` are neither
+// deleted nor marked with Mock::AllowLeak - verify before enabling.
+TEST(clientRemotingProcessorS, getConsumerRunningInfo) {
+ MockMQClientFactory *factory = new MockMQClientFactory("testClientId", 4, 3000, 4000,
"a");
+ ConsumerRunningInfo *info = new ConsumerRunningInfo();
+ EXPECT_CALL(*factory, consumerRunningInfo(_)).Times(2).WillOnce(Return(info)).WillOnce(Return(info));
+ EXPECT_CALL(*factory, getSessionCredentialFromConsumer(_, _))
+ .Times(2); //.WillRepeatedly(SetArgReferee<1>(sessionCredentials));
+ ClientRemotingProcessor clientRemotingProcessor(factory);
+
+ GetConsumerRunningInfoRequestHeader *header = new GetConsumerRunningInfoRequestHeader();
+ header->setConsumerGroup("testGroup");
+
+ RemotingCommand *request = new RemotingCommand(14, header);
+
+ RemotingCommand *command = clientRemotingProcessor.getConsumerRunningInfo("127.0.0.1:9876",
request);
+ EXPECT_EQ(command->getCode(), MQResponseCode::SYSTEM_ERROR);
+ EXPECT_EQ(command->getRemark(), "The Consumer Group not exist in this consumer");
+ // The returned command is released by the test itself.
+ delete command;
+ delete request;
+}
+
+// Expects one notifyConsumerIdsChanged() call on the processor to trigger
+// exactly one doRebalanceByConsumerGroup() call on the mocked factory.
+TEST(clientRemotingProcessor, notifyConsumerIdsChanged) {
+ MockMQClientFactory *factory = new MockMQClientFactory("testClientId", 4, 3000, 4000,
"a");
+ // The factory is never deleted; tell gmock not to report it as leaked.
+ Mock::AllowLeak(factory);
+ ClientRemotingProcessor clientRemotingProcessor(factory);
+ NotifyConsumerIdsChangedRequestHeader *header = new NotifyConsumerIdsChangedRequestHeader();
+ header->setGroup("testGroup");
+ RemotingCommand *request = new RemotingCommand(14, header);
+
+ EXPECT_CALL(*factory, doRebalanceByConsumerGroup(_)).Times(1);
+ clientRemotingProcessor.notifyConsumerIdsChanged(request);
+
+ // Only the request is deleted; presumably it owns `header` - TODO confirm.
+ delete request;
+}
+
+// Serializes a one-entry "offsetTable" JSON document, decodes it with
+// ResetOffsetBody::Decode() and checks that the decoded table maps the queue
+// (testTopic, testBroker, 4) to offset 1024.
+// NOTE(review): `factory` and `clientRemotingProcessor` are constructed but not
+// used by any assertion in this test.
+TEST(clientRemotingProcessor, resetOffsetBody) {
+ MockMQClientFactory *factory = new MockMQClientFactory("testClientId", 4, 3000, 4000,
"a");
+ ClientRemotingProcessor clientRemotingProcessor(factory);
+
+ Value root;
+ Value messageQueues;
+ Value messageQueue;
+ messageQueue["brokerName"] = "testBroker";
+ messageQueue["queueId"] = 4;
+ messageQueue["topic"] = "testTopic";
+ messageQueue["offset"] = 1024;
+
+ messageQueues.append(messageQueue);
+ root["offsetTable"] = messageQueues;
+
+ FastWriter wrtier;
+ string strData = wrtier.write(root);
+
+ MemoryBlock *mem = new MemoryBlock(strData.c_str(), strData.size());
+
+ ResetOffsetBody *resetOffset = ResetOffsetBody::Decode(mem);
+
+ // The local variable is named `map`, the same identifier as the std::map
+ // template brought in by the using-declaration at the top of the file.
+ map<MQMessageQueue, int64> map = resetOffset->getOffsetTable();
+ MQMessageQueue mqmq("testTopic", "testBroker", 4);
+ // operator[] default-inserts a missing key, so a missing entry would compare as 0.
+ EXPECT_EQ(map[mqmq], 1024);
+ Mock::AllowLeak(factory);
+ // The decoded body and the memory block are released by the test itself.
+ delete resetOffset;
+ delete mem;
+}
+
+// Entry point of the ClientRemotingProcessor test binary.
+int main(int argc, char *argv[]) {
+  InitGoogleMock(&argc, argv);
+  // Both flags are assigned after InitGoogleMock() has consumed argv, so these
+  // values replace whatever the flags held before this point.
+  testing::GTEST_FLAG(filter) = "clientRemotingProcessor.*";
+  testing::GTEST_FLAG(throw_on_failure) = true;
+  return RUN_ALL_TESTS();
+}
diff --git a/test/src/transport/ResponseFutureTest.cpp b/test/src/transport/ResponseFutureTest.cpp
new file mode 100644
index 0000000..483fced
--- /dev/null
+++ b/test/src/transport/ResponseFutureTest.cpp
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+#include "AsyncCallback.h"
+#include "AsyncCallbackWrap.h"
+#include "MQClientAPIImpl.h"
+#include "MQMessage.h"
+#include "RemotingCommand.h"
+#include "ResponseFuture.h"
+#include "TcpRemotingClient.h"
+#include "UtilAll.h"
+
+using ::testing::_;
+using ::testing::InitGoogleMock;
+using ::testing::InitGoogleTest;
+using testing::Return;
+
+using rocketmq::AsyncCallback;
+using rocketmq::asyncCallBackStatus;
+using rocketmq::asyncCallBackType;
+using rocketmq::AsyncCallbackWrap;
+using rocketmq::MQClientAPIImpl;
+using rocketmq::MQMessage;
+using rocketmq::RemotingCommand;
+using rocketmq::ResponseFuture;
+using rocketmq::SendCallbackWrap;
+using rocketmq::TcpRemotingClient;
+using rocketmq::UtilAll;
+
+// gmock test double derived from SendCallbackWrap. It constructs the base with
+// an empty string and a default MQMessage, mocks operationComplete() and
+// onException(), and reports its callback type as sendCallbackWrap.
+class MockAsyncCallbackWrap : public SendCallbackWrap {
+ public:
+ MockAsyncCallbackWrap(AsyncCallback *pAsyncCallback, MQClientAPIImpl *pclientAPI)
+ : SendCallbackWrap("", MQMessage(), pAsyncCallback, pclientAPI) {}
+
+ MOCK_METHOD2(operationComplete, void(ResponseFuture *, bool));
+ MOCK_METHOD0(onException, void());
+ asyncCallBackType getCallbackType() { return asyncCallBackType::sendCallbackWrap; }
+};
+
+// Checks the getters of a freshly constructed ResponseFuture: first built with
+// four arguments, then with the async flag set to true and a callback wrapper.
+TEST(responseFuture, init) {
+ ResponseFuture responseFuture(13, 4, NULL, 1000);
+ // The first two constructor arguments are read back as request code and opaque.
+ EXPECT_EQ(responseFuture.getRequestCode(), 13);
+ EXPECT_EQ(responseFuture.getOpaque(), 4);
+
+ EXPECT_EQ(responseFuture.getRequestCommand().getCode(), 0);
+ EXPECT_FALSE(responseFuture.isSendRequestOK());
+ EXPECT_EQ(responseFuture.getMaxRetrySendTimes(), 1);
+ EXPECT_EQ(responseFuture.getRetrySendTimes(), 1);
+ EXPECT_EQ(responseFuture.getBrokerAddr(), "");
+
+ // Flag values expected when the async argument is left at its default.
+ EXPECT_FALSE(responseFuture.getASyncFlag());
+ EXPECT_TRUE(responseFuture.getAsyncResponseFlag());
+ EXPECT_FALSE(responseFuture.getSyncResponseFlag());
+ EXPECT_TRUE(responseFuture.getAsyncCallbackWrap() == nullptr);
+
+ // Original author's note: ~ResponseFuture deletes pcall, which is why the
+ // test does not delete it (not verifiable from this file).
+ // ~ResponseFuture delete pcall
+ SendCallbackWrap *pcall = new SendCallbackWrap("", MQMessage(), nullptr, nullptr);
+ ResponseFuture twoResponseFuture(13, 4, nullptr, 1000, true, pcall);
+ // With the async argument true, the three flags are expected to be inverted.
+ EXPECT_TRUE(twoResponseFuture.getASyncFlag());
+ EXPECT_FALSE(twoResponseFuture.getAsyncResponseFlag());
+ EXPECT_TRUE(twoResponseFuture.getSyncResponseFlag());
+ EXPECT_FALSE(twoResponseFuture.getAsyncCallbackWrap() == nullptr);
+}
+
+// Round-trips each ResponseFuture setter through its matching getter.
+TEST(responseFuture, info) {
+ ResponseFuture responseFuture(13, 4, NULL, 1000);
+
+ responseFuture.setAsyncResponseFlag();
+ EXPECT_TRUE(responseFuture.getAsyncResponseFlag());
+
+ responseFuture.setBrokerAddr("127.0.0.1:9876");
+ EXPECT_EQ(responseFuture.getBrokerAddr(), "127.0.0.1:9876");
+
+ responseFuture.setMaxRetrySendTimes(3000);
+ EXPECT_EQ(responseFuture.getMaxRetrySendTimes(), 3000);
+
+ responseFuture.setRetrySendTimes(3000);
+ EXPECT_EQ(responseFuture.getRetrySendTimes(), 3000);
+
+ responseFuture.setSendRequestOK(true);
+ EXPECT_TRUE(responseFuture.isSendRequestOK());
+}
+
+// Checks how setResponse() affects the sync-response flag for a non-async and
+// an async ResponseFuture, then checks that waitResponse(10) on a future with
+// no response returns NULL in under 30 ms.
+TEST(responseFuture, response) {
+ // m_bAsync = false m_syncResponse
+ ResponseFuture responseFuture(13, 4, NULL, 1000);
+
+ EXPECT_FALSE(responseFuture.getASyncFlag());
+ EXPECT_FALSE(responseFuture.getSyncResponseFlag());
+ EXPECT_TRUE(responseFuture.getAsyncResponseFlag());
+
+ // A NULL response is passed in; the sync-response flag is expected to flip anyway.
+ RemotingCommand *pResponseCommand = NULL;
+ responseFuture.setResponse(pResponseCommand);
+ EXPECT_EQ(responseFuture.getRequestCommand().getCode(), 0);
+
+ EXPECT_TRUE(responseFuture.getSyncResponseFlag());
+
+ // m_bAsync = true m_syncResponse
+ ResponseFuture twoResponseFuture(13, 4, NULL, 1000, true);
+
+ EXPECT_TRUE(twoResponseFuture.getASyncFlag());
+ EXPECT_TRUE(twoResponseFuture.getSyncResponseFlag());
+ EXPECT_FALSE(twoResponseFuture.getAsyncResponseFlag());
+
+ twoResponseFuture.setResponse(pResponseCommand);
+ EXPECT_TRUE(twoResponseFuture.getSyncResponseFlag());
+
+ ResponseFuture threeSesponseFuture(13, 4, NULL, 1000);
+
+ // Wall-clock measurement around waitResponse(10); the 30 ms bound may be
+ // sensitive to machine load.
+ uint64_t millis = UtilAll::currentTimeMillis();
+ RemotingCommand *remotingCommand = threeSesponseFuture.waitResponse(10);
+ uint64_t useTime = UtilAll::currentTimeMillis() - millis;
+ EXPECT_LT(useTime, 30);
+
+ // NOTE(review): this re-checks `responseFuture`, not `threeSesponseFuture`,
+ // which is the object that just waited - confirm which one was intended.
+ EXPECT_TRUE(responseFuture.getSyncResponseFlag());
+ EXPECT_EQ(NULL, remotingCommand);
+}
+
+// Covers ResponseFuture::executeInvokeCallback() in three situations: with a
+// response set but the callback status left at its default, with the status
+// set to asyncCallBackStatus_response (the mocked operationComplete() must fire
+// exactly once), and on a future that has no callback wrapper at all.
+TEST(responseFuture, executeInvokeCallback) {
+  // Original author's note: executeInvokeCallback deletes wrap, so the test
+  // does not delete it itself (not verifiable from this file).
+  MockAsyncCallbackWrap *wrap = new MockAsyncCallbackWrap(nullptr, nullptr);
+  ResponseFuture responseFuture(13, 4, nullptr, 1000, false, wrap);
+
+  RemotingCommand *pResponseCommand = new RemotingCommand();
+  responseFuture.setResponse(pResponseCommand);
+  responseFuture.executeInvokeCallback();
+  EXPECT_EQ(nullptr, responseFuture.getCommand());
+
+  EXPECT_CALL(*wrap, operationComplete(_, _)).Times(1);
+  pResponseCommand = new RemotingCommand();
+  responseFuture.setResponse(pResponseCommand);
+  responseFuture.setAsyncCallBackStatus(asyncCallBackStatus::asyncCallBackStatus_response);
+  responseFuture.executeInvokeCallback();
+  EXPECT_EQ(pResponseCommand->getCode(), 0);
+
+  // No response is ever set on this future, so no RemotingCommand is allocated
+  // for it; getCommand() must still report null after the callback attempt.
+  ResponseFuture twoResponseFuture(13, 4, nullptr, 1000, false, nullptr);
+  twoResponseFuture.executeInvokeCallback();
+  EXPECT_EQ(nullptr, twoResponseFuture.getCommand());
+}
+
+// Calls executeInvokeCallbackException() twice on a future with a mocked
+// wrapper - before and after setting the status to asyncCallBackStatus_timeout -
+// and expects onException() exactly once in total. It then calls it once on a
+// future that has no wrapper.
+TEST(responseFuture, executeInvokeCallbackException) {
+ // executeInvokeCallbackException delete wrap
+ MockAsyncCallbackWrap *wrap = new MockAsyncCallbackWrap(nullptr, nullptr);
+
+ ResponseFuture responseFuture(13, 4, nullptr, 1000, false, wrap);
+
+ EXPECT_CALL(*wrap, onException()).Times(1);
+ // First call is made with the callback status left at its initial value.
+ responseFuture.executeInvokeCallbackException();
+
+ responseFuture.setAsyncCallBackStatus(asyncCallBackStatus::asyncCallBackStatus_timeout);
+ responseFuture.executeInvokeCallbackException();
+
+ // No wrapper is attached here; the call only has to complete.
+ ResponseFuture twoRresponseFuture(13, 4, nullptr, 1000, false, NULL);
+ twoRresponseFuture.executeInvokeCallbackException();
+}
+
+// Entry point of the ResponseFuture test binary.
+int main(int argc, char *argv[]) {
+  InitGoogleMock(&argc, argv);
+  // Both flags are assigned after InitGoogleMock() has consumed argv, so these
+  // values replace whatever the flags held before this point.
+  testing::GTEST_FLAG(filter) = "responseFuture.*";
+  testing::GTEST_FLAG(throw_on_failure) = true;
+  return RUN_ALL_TESTS();
+}
diff --git a/test/src/MQDecoderTest.cpp b/test/src/transport/SocketUtilTest.cpp
similarity index 61%
rename from test/src/MQDecoderTest.cpp
rename to test/src/transport/SocketUtilTest.cpp
index 263fe30..875f2e7 100644
--- a/test/src/MQDecoderTest.cpp
+++ b/test/src/transport/SocketUtilTest.cpp
@@ -14,31 +14,34 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-#include "gtest/gtest.h"
#include "gmock/gmock.h"
-#include <unistd.h>
-#include <stdio.h>
-#include "BatchMessage.h"
-#include "MQMessage.h"
-#include <map>
-#include "MQDecoder.h"
-
-using namespace std;
-using namespace rocketmq;
-using ::testing::InitGoogleTest;
+#include "gtest/gtest.h"
+
+#include "SocketUtil.h"
+
using ::testing::InitGoogleMock;
+using ::testing::InitGoogleTest;
using testing::Return;
-TEST(MQDecoderTest, messageProperties2String) {
- map<string, string> properties;
- string property = MQDecoder::messageProperties2String(properties);
- EXPECT_EQ(property.size(), 0);
- properties["aaa"] = "aaa";
- property = MQDecoder::messageProperties2String(properties);
- EXPECT_EQ(property.size(), 8);
+// Builds a sockaddr from inet_addr("127.0.0.1") and port 10091, then checks the
+// string and numeric conversions back out of it.
+TEST(socketUtil, init) {
+ sockaddr addr = rocketmq::IPPort2socketAddress(inet_addr("127.0.0.1"), 10091);
+
+ // The expected text has the octets in reverse order relative to the input
+ // literal. NOTE(review): presumably a byte-order effect inside
+ // IPPort2socketAddress - confirm this is the intended behavior.
+ EXPECT_EQ(rocketmq::socketAddress2IPPort(addr), "1.0.0.127:10091");
+
+ int host;
+ int port;
+
+ // The out-parameter overload must return the same numeric host and port
+ // that were passed in.
+ rocketmq::socketAddress2IPPort(addr, host, port);
+ EXPECT_EQ(host, inet_addr("127.0.0.1"));
+ EXPECT_EQ(port, 10091);
+
+ EXPECT_EQ(rocketmq::socketAddress2String(addr), "1.0.0.127");
}
-int main(int argc, char* argv[]) {
+int main(int argc, char *argv[]) {
InitGoogleMock(&argc, argv);
- return RUN_ALL_TESTS();
+ testing::GTEST_FLAG(throw_on_failure) = true;
+ testing::GTEST_FLAG(filter) = "socketUtil.init";
+ int itestts = RUN_ALL_TESTS();
+ return itestts;
}
|