rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ding...@apache.org
Subject [rocketmq-client-cpp] branch master updated: Add unit-test for transport (#110)
Date Wed, 03 Apr 2019 02:20:51 GMT
This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f83ddc  Add unit-test for transport (#110)
3f83ddc is described below

commit 3f83ddcbe5d0c869282f5c672745ed06c902c9c3
Author: githublaohu <2372554140@qq.com>
AuthorDate: Wed Apr 3 10:20:47 2019 +0800

    Add unit-test for transport (#110)
    
    Add unit-test for transport
---
 test/src/transport/ClientRemotingProcessorTest.cpp | 235 +++++++++++++++++++++
 test/src/transport/ResponseFutureTest.cpp          | 180 ++++++++++++++++
 .../SocketUtilTest.cpp}                            |  43 ++--
 3 files changed, 438 insertions(+), 20 deletions(-)

diff --git a/test/src/transport/ClientRemotingProcessorTest.cpp b/test/src/transport/ClientRemotingProcessorTest.cpp
new file mode 100644
index 0000000..80348b0
--- /dev/null
+++ b/test/src/transport/ClientRemotingProcessorTest.cpp
@@ -0,0 +1,235 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include "map"
+#include "string.h"
+
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+#include "json/value.h"
+#include "json/writer.h"
+
+#include "ClientRPCHook.h"
+#include "ClientRemotingProcessor.h"
+#include "ConsumerRunningInfo.h"
+#include "MQClientFactory.h"
+#include "MQMessageQueue.h"
+#include "MQProtos.h"
+#include "RemotingCommand.h"
+#include "SessionCredentials.h"
+#include "UtilAll.h"
+#include "dataBlock.h"
+
+using std::map;
+using std::string;
+
+using ::testing::_;
+using ::testing::InitGoogleMock;
+using ::testing::InitGoogleTest;
+using testing::Mock;
+using testing::Return;
+using testing::SetArgReferee;
+
+using Json::FastWriter;
+using Json::Value;
+
+using rocketmq::ClientRemotingProcessor;
+using rocketmq::ClientRPCHook;
+using rocketmq::ConsumerRunningInfo;
+using rocketmq::GetConsumerRunningInfoRequestHeader;
+using rocketmq::MemoryBlock;
+using rocketmq::MQClientFactory;
+using rocketmq::MQMessageQueue;
+using rocketmq::MQRequestCode;
+using rocketmq::MQResponseCode;
+using rocketmq::NotifyConsumerIdsChangedRequestHeader;
+using rocketmq::RemotingCommand;
+using rocketmq::ResetOffsetBody;
+using rocketmq::ResetOffsetRequestHeader;
+using rocketmq::SessionCredentials;
+using rocketmq::UtilAll;
+
+class MockClientRemotingProcessor : public ClientRemotingProcessor {
+   public:
+    MockClientRemotingProcessor(MQClientFactory *factrory) : ClientRemotingProcessor(factrory)
{}
+    MOCK_METHOD1(resetOffset, RemotingCommand *(RemotingCommand *request));
+    MOCK_METHOD1(getConsumerRunningInfo, RemotingCommand *(RemotingCommand *request));
+    MOCK_METHOD1(notifyConsumerIdsChanged, RemotingCommand *(RemotingCommand *request));
+};
+
+class MockMQClientFactory : public MQClientFactory {
+   public:
+    MockMQClientFactory(const string &clientID,
+                        int pullThreadNum,
+                        uint64_t tcpConnectTimeout,
+                        uint64_t tcpTransportTryLockTimeout,
+                        string unitName)
+        : MQClientFactory(clientID, pullThreadNum, tcpConnectTimeout, tcpTransportTryLockTimeout,
unitName) {}
+
+    MOCK_METHOD3(resetOffset,
+                 void(const string &group, const string &topic, const map<MQMessageQueue,
int64> &offsetTable));
+    MOCK_METHOD1(consumerRunningInfo, ConsumerRunningInfo *(const string &consumerGroup));
+    MOCK_METHOD2(getSessionCredentialFromConsumer,
+                 bool(const string &consumerGroup, SessionCredentials &sessionCredentials));
+    MOCK_METHOD1(doRebalanceByConsumerGroup, void(const string &consumerGroup));
+};
+
+TEST(clientRemotingProcessor, processRequest) {
+    MockMQClientFactory *factory = new MockMQClientFactory("testClientId", 4, 3000, 4000,
"a");
+    ClientRemotingProcessor clientRemotingProcessor(factory);
+
+    string addr = "127.0.0.1:9876";
+    RemotingCommand *command = new RemotingCommand();
+    RemotingCommand *pResponse = new RemotingCommand(13);
+
+    pResponse->setCode(MQRequestCode::RESET_CONSUMER_CLIENT_OFFSET);
+    command->setCode(MQRequestCode::RESET_CONSUMER_CLIENT_OFFSET);
+    EXPECT_TRUE(clientRemotingProcessor.processRequest(addr, command) == nullptr);
+    EXPECT_EQ(nullptr, clientRemotingProcessor.processRequest(addr, command));
+
+    NotifyConsumerIdsChangedRequestHeader *header = new NotifyConsumerIdsChangedRequestHeader();
+    header->setGroup("testGroup");
+    RemotingCommand *twoCommand = new RemotingCommand(MQRequestCode::NOTIFY_CONSUMER_IDS_CHANGED,
header);
+
+    EXPECT_EQ(NULL, clientRemotingProcessor.processRequest(addr, twoCommand));
+
+    command->setCode(MQRequestCode::GET_CONSUMER_RUNNING_INFO);
+    // EXPECT_EQ(NULL , clientRemotingProcessor.processRequest(addr, command));
+
+    command->setCode(MQRequestCode::CHECK_TRANSACTION_STATE);
+    EXPECT_TRUE(clientRemotingProcessor.processRequest(addr, command) == nullptr);
+
+    command->setCode(MQRequestCode::GET_CONSUMER_STATUS_FROM_CLIENT);
+    EXPECT_TRUE(clientRemotingProcessor.processRequest(addr, command) == nullptr);
+
+    command->setCode(MQRequestCode::CONSUME_MESSAGE_DIRECTLY);
+    EXPECT_TRUE(clientRemotingProcessor.processRequest(addr, command) == nullptr);
+
+    command->setCode(1);
+    EXPECT_TRUE(clientRemotingProcessor.processRequest(addr, command) == nullptr);
+
+    delete command;
+    delete pResponse;
+}
+
+TEST(clientRemotingProcessor, resetOffset) {
+    MockMQClientFactory *factory = new MockMQClientFactory("testClientId", 4, 3000, 4000,
"a");
+    Mock::AllowLeak(factory);
+    ClientRemotingProcessor clientRemotingProcessor(factory);
+    Value root;
+    Value messageQueues;
+    Value messageQueue;
+    messageQueue["brokerName"] = "testBroker";
+    messageQueue["queueId"] = 4;
+    messageQueue["topic"] = "testTopic";
+    messageQueue["offset"] = 1024;
+
+    messageQueues.append(messageQueue);
+    root["offsetTable"] = messageQueues;
+
+    FastWriter wrtier;
+    string strData = wrtier.write(root);
+
+    ResetOffsetRequestHeader *header = new ResetOffsetRequestHeader();
+    RemotingCommand *request = new RemotingCommand(13, header);
+
+    EXPECT_CALL(*factory, resetOffset(_, _, _)).Times(1);
+    clientRemotingProcessor.resetOffset(request);
+
+    request->SetBody(strData.c_str(), strData.size() - 2);
+    clientRemotingProcessor.resetOffset(request);
+
+    request->SetBody(strData.c_str(), strData.size());
+    clientRemotingProcessor.resetOffset(request);
+
+    delete header;
+    delete request;
+}
+
+TEST(clientRemotingProcessorS, getConsumerRunningInfo) {
+    MockMQClientFactory *factory = new MockMQClientFactory("testClientId", 4, 3000, 4000,
"a");
+    ConsumerRunningInfo *info = new ConsumerRunningInfo();
+    EXPECT_CALL(*factory, consumerRunningInfo(_)).Times(2).WillOnce(Return(info)).WillOnce(Return(info));
+    EXPECT_CALL(*factory, getSessionCredentialFromConsumer(_, _))
+        .Times(2);  //.WillRepeatedly(SetArgReferee<1>(sessionCredentials));
+    ClientRemotingProcessor clientRemotingProcessor(factory);
+
+    GetConsumerRunningInfoRequestHeader *header = new GetConsumerRunningInfoRequestHeader();
+    header->setConsumerGroup("testGroup");
+
+    RemotingCommand *request = new RemotingCommand(14, header);
+
+    RemotingCommand *command = clientRemotingProcessor.getConsumerRunningInfo("127.0.0.1:9876",
request);
+    EXPECT_EQ(command->getCode(), MQResponseCode::SYSTEM_ERROR);
+    EXPECT_EQ(command->getRemark(), "The Consumer Group not exist in this consumer");
+    delete command;
+    delete request;
+}
+
+TEST(clientRemotingProcessor, notifyConsumerIdsChanged) {
+    MockMQClientFactory *factory = new MockMQClientFactory("testClientId", 4, 3000, 4000,
"a");
+    Mock::AllowLeak(factory);
+    ClientRemotingProcessor clientRemotingProcessor(factory);
+    NotifyConsumerIdsChangedRequestHeader *header = new NotifyConsumerIdsChangedRequestHeader();
+    header->setGroup("testGroup");
+    RemotingCommand *request = new RemotingCommand(14, header);
+
+    EXPECT_CALL(*factory, doRebalanceByConsumerGroup(_)).Times(1);
+    clientRemotingProcessor.notifyConsumerIdsChanged(request);
+
+    delete request;
+}
+
+TEST(clientRemotingProcessor, resetOffsetBody) {
+    MockMQClientFactory *factory = new MockMQClientFactory("testClientId", 4, 3000, 4000,
"a");
+    ClientRemotingProcessor clientRemotingProcessor(factory);
+
+    Value root;
+    Value messageQueues;
+    Value messageQueue;
+    messageQueue["brokerName"] = "testBroker";
+    messageQueue["queueId"] = 4;
+    messageQueue["topic"] = "testTopic";
+    messageQueue["offset"] = 1024;
+
+    messageQueues.append(messageQueue);
+    root["offsetTable"] = messageQueues;
+
+    FastWriter wrtier;
+    string strData = wrtier.write(root);
+
+    MemoryBlock *mem = new MemoryBlock(strData.c_str(), strData.size());
+
+    ResetOffsetBody *resetOffset = ResetOffsetBody::Decode(mem);
+
+    map<MQMessageQueue, int64> map = resetOffset->getOffsetTable();
+    MQMessageQueue mqmq("testTopic", "testBroker", 4);
+    EXPECT_EQ(map[mqmq], 1024);
+    Mock::AllowLeak(factory);
+    delete resetOffset;
+    delete mem;
+}
+
+int main(int argc, char *argv[]) {
+    InitGoogleMock(&argc, argv);
+    testing::GTEST_FLAG(throw_on_failure) = true;
+    testing::GTEST_FLAG(filter) = "clientRemotingProcessor.*";
+    int itestts = RUN_ALL_TESTS();
+    return itestts;
+}
diff --git a/test/src/transport/ResponseFutureTest.cpp b/test/src/transport/ResponseFutureTest.cpp
new file mode 100644
index 0000000..483fced
--- /dev/null
+++ b/test/src/transport/ResponseFutureTest.cpp
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+#include "AsyncCallback.h"
+#include "AsyncCallbackWrap.h"
+#include "MQClientAPIImpl.h"
+#include "MQMessage.h"
+#include "RemotingCommand.h"
+#include "ResponseFuture.h"
+#include "TcpRemotingClient.h"
+#include "UtilAll.h"
+
+using ::testing::_;
+using ::testing::InitGoogleMock;
+using ::testing::InitGoogleTest;
+using testing::Return;
+
+using rocketmq::AsyncCallback;
+using rocketmq::asyncCallBackStatus;
+using rocketmq::asyncCallBackType;
+using rocketmq::AsyncCallbackWrap;
+using rocketmq::MQClientAPIImpl;
+using rocketmq::MQMessage;
+using rocketmq::RemotingCommand;
+using rocketmq::ResponseFuture;
+using rocketmq::SendCallbackWrap;
+using rocketmq::TcpRemotingClient;
+using rocketmq::UtilAll;
+
+class MockAsyncCallbackWrap : public SendCallbackWrap {
+   public:
+    MockAsyncCallbackWrap(AsyncCallback *pAsyncCallback, MQClientAPIImpl *pclientAPI)
+        : SendCallbackWrap("", MQMessage(), pAsyncCallback, pclientAPI) {}
+
+    MOCK_METHOD2(operationComplete, void(ResponseFuture *, bool));
+    MOCK_METHOD0(onException, void());
+    asyncCallBackType getCallbackType() { return asyncCallBackType::sendCallbackWrap; }
+};
+
+TEST(responseFuture, init) {
+    ResponseFuture responseFuture(13, 4, NULL, 1000);
+    EXPECT_EQ(responseFuture.getRequestCode(), 13);
+    EXPECT_EQ(responseFuture.getOpaque(), 4);
+
+    EXPECT_EQ(responseFuture.getRequestCommand().getCode(), 0);
+    EXPECT_FALSE(responseFuture.isSendRequestOK());
+    EXPECT_EQ(responseFuture.getMaxRetrySendTimes(), 1);
+    EXPECT_EQ(responseFuture.getRetrySendTimes(), 1);
+    EXPECT_EQ(responseFuture.getBrokerAddr(), "");
+
+    EXPECT_FALSE(responseFuture.getASyncFlag());
+    EXPECT_TRUE(responseFuture.getAsyncResponseFlag());
+    EXPECT_FALSE(responseFuture.getSyncResponseFlag());
+    EXPECT_TRUE(responseFuture.getAsyncCallbackWrap() == nullptr);
+
+    // ~ResponseFuture  delete pcall
+    SendCallbackWrap *pcall = new SendCallbackWrap("", MQMessage(), nullptr, nullptr);
+    ResponseFuture twoResponseFuture(13, 4, nullptr, 1000, true, pcall);
+    EXPECT_TRUE(twoResponseFuture.getASyncFlag());
+    EXPECT_FALSE(twoResponseFuture.getAsyncResponseFlag());
+    EXPECT_TRUE(twoResponseFuture.getSyncResponseFlag());
+    EXPECT_FALSE(twoResponseFuture.getAsyncCallbackWrap() == nullptr);
+}
+
+TEST(responseFuture, info) {
+    ResponseFuture responseFuture(13, 4, NULL, 1000);
+
+    responseFuture.setAsyncResponseFlag();
+    EXPECT_TRUE(responseFuture.getAsyncResponseFlag());
+
+    responseFuture.setBrokerAddr("127.0.0.1:9876");
+    EXPECT_EQ(responseFuture.getBrokerAddr(), "127.0.0.1:9876");
+
+    responseFuture.setMaxRetrySendTimes(3000);
+    EXPECT_EQ(responseFuture.getMaxRetrySendTimes(), 3000);
+
+    responseFuture.setRetrySendTimes(3000);
+    EXPECT_EQ(responseFuture.getRetrySendTimes(), 3000);
+
+    responseFuture.setSendRequestOK(true);
+    EXPECT_TRUE(responseFuture.isSendRequestOK());
+}
+
+TEST(responseFuture, response) {
+    // m_bAsync = false  m_syncResponse
+    ResponseFuture responseFuture(13, 4, NULL, 1000);
+
+    EXPECT_FALSE(responseFuture.getASyncFlag());
+    EXPECT_FALSE(responseFuture.getSyncResponseFlag());
+    EXPECT_TRUE(responseFuture.getAsyncResponseFlag());
+
+    RemotingCommand *pResponseCommand = NULL;
+    responseFuture.setResponse(pResponseCommand);
+    EXPECT_EQ(responseFuture.getRequestCommand().getCode(), 0);
+
+    EXPECT_TRUE(responseFuture.getSyncResponseFlag());
+
+    // m_bAsync = true  m_syncResponse
+    ResponseFuture twoResponseFuture(13, 4, NULL, 1000, true);
+
+    EXPECT_TRUE(twoResponseFuture.getASyncFlag());
+    EXPECT_TRUE(twoResponseFuture.getSyncResponseFlag());
+    EXPECT_FALSE(twoResponseFuture.getAsyncResponseFlag());
+
+    twoResponseFuture.setResponse(pResponseCommand);
+    EXPECT_TRUE(twoResponseFuture.getSyncResponseFlag());
+
+    ResponseFuture threeSesponseFuture(13, 4, NULL, 1000);
+
+    uint64_t millis = UtilAll::currentTimeMillis();
+    RemotingCommand *remotingCommand = threeSesponseFuture.waitResponse(10);
+    uint64_t useTime = UtilAll::currentTimeMillis() - millis;
+    EXPECT_LT(useTime, 30);
+
+    EXPECT_TRUE(responseFuture.getSyncResponseFlag());
+    EXPECT_EQ(NULL, remotingCommand);
+}
+
+TEST(responseFuture, executeInvokeCallback) {
+    //  executeInvokeCallback delete wrap
+    MockAsyncCallbackWrap *wrap = new MockAsyncCallbackWrap(nullptr, nullptr);
+    ResponseFuture responseFuture(13, 4, nullptr, 1000, false, wrap);
+
+    RemotingCommand *pResponseCommand = new RemotingCommand();
+    responseFuture.setResponse(pResponseCommand);
+    responseFuture.executeInvokeCallback();
+    EXPECT_EQ(NULL, responseFuture.getCommand());
+
+    EXPECT_CALL(*wrap, operationComplete(_, _)).Times(1);
+    pResponseCommand = new RemotingCommand();
+    responseFuture.setResponse(pResponseCommand);
+    responseFuture.setAsyncCallBackStatus(asyncCallBackStatus::asyncCallBackStatus_response);
+    responseFuture.executeInvokeCallback();
+    EXPECT_EQ(pResponseCommand->getCode(), 0);
+
+    ResponseFuture twoResponseFuture(13, 4, nullptr, 1000, false, NULL);
+    pResponseCommand = new RemotingCommand();
+    twoResponseFuture.executeInvokeCallback();
+    EXPECT_EQ(NULL, twoResponseFuture.getCommand());
+}
+
+TEST(responseFuture, executeInvokeCallbackException) {
+    //  executeInvokeCallbackException delete wrap
+    MockAsyncCallbackWrap *wrap = new MockAsyncCallbackWrap(nullptr, nullptr);
+
+    ResponseFuture responseFuture(13, 4, nullptr, 1000, false, wrap);
+
+    EXPECT_CALL(*wrap, onException()).Times(1);
+    responseFuture.executeInvokeCallbackException();
+
+    responseFuture.setAsyncCallBackStatus(asyncCallBackStatus::asyncCallBackStatus_timeout);
+    responseFuture.executeInvokeCallbackException();
+
+    ResponseFuture twoRresponseFuture(13, 4, nullptr, 1000, false, NULL);
+    twoRresponseFuture.executeInvokeCallbackException();
+}
+
+int main(int argc, char *argv[]) {
+    InitGoogleMock(&argc, argv);
+    testing::GTEST_FLAG(throw_on_failure) = true;
+    testing::GTEST_FLAG(filter) = "responseFuture.*";
+    int itestts = RUN_ALL_TESTS();
+    return itestts;
+}
diff --git a/test/src/MQDecoderTest.cpp b/test/src/transport/SocketUtilTest.cpp
similarity index 61%
rename from test/src/MQDecoderTest.cpp
rename to test/src/transport/SocketUtilTest.cpp
index 263fe30..875f2e7 100644
--- a/test/src/MQDecoderTest.cpp
+++ b/test/src/transport/SocketUtilTest.cpp
@@ -14,31 +14,34 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-#include "gtest/gtest.h"
 #include "gmock/gmock.h"
-#include <unistd.h>
-#include <stdio.h>
-#include "BatchMessage.h"
-#include "MQMessage.h"
-#include <map>
-#include "MQDecoder.h"
-
-using namespace std;
-using namespace rocketmq;
-using ::testing::InitGoogleTest;
+#include "gtest/gtest.h"
+
+#include "SocketUtil.h"
+
 using ::testing::InitGoogleMock;
+using ::testing::InitGoogleTest;
 using testing::Return;
 
-TEST(MQDecoderTest, messageProperties2String) {
-    map<string, string> properties;
-    string property = MQDecoder::messageProperties2String(properties);
-    EXPECT_EQ(property.size(), 0);
-    properties["aaa"] = "aaa";
-    property = MQDecoder::messageProperties2String(properties);
-    EXPECT_EQ(property.size(), 8);
+TEST(socketUtil, init) {
+    sockaddr addr = rocketmq::IPPort2socketAddress(inet_addr("127.0.0.1"), 10091);
+
+    EXPECT_EQ(rocketmq::socketAddress2IPPort(addr), "1.0.0.127:10091");
+
+    int host;
+    int port;
+
+    rocketmq::socketAddress2IPPort(addr, host, port);
+    EXPECT_EQ(host, inet_addr("127.0.0.1"));
+    EXPECT_EQ(port, 10091);
+
+    EXPECT_EQ(rocketmq::socketAddress2String(addr), "1.0.0.127");
 }
 
-int main(int argc, char* argv[]) {
+int main(int argc, char *argv[]) {
     InitGoogleMock(&argc, argv);
-    return RUN_ALL_TESTS();
+    testing::GTEST_FLAG(throw_on_failure) = true;
+    testing::GTEST_FLAG(filter) = "socketUtil.init";
+    int itestts = RUN_ALL_TESTS();
+    return itestts;
 }


Mime
View raw message