Merge pull request #114 from githublaohu/unit-test-protocol-3
diff --git a/test/src/protocol/CommandHeaderTest.cpp b/test/src/protocol/CommandHeaderTest.cpp
new file mode 100644
index 0000000..7439c45
--- /dev/null
+++ b/test/src/protocol/CommandHeaderTest.cpp
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <memory>
+#include <string>
+
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+#include "json/value.h"
+#include "json/writer.h"
+
+#include "CommandHeader.h"
+#include "MQClientException.h"
+#include "MessageSysFlag.h"
+#include "UtilAll.h"
+#include "dataBlock.h"
+#include "json/json.h"
+
+using std::shared_ptr;
+
+using ::testing::InitGoogleMock;
+using ::testing::InitGoogleTest;
+using testing::Return;
+
+using Json::FastWriter;
+using Json::Value;
+
+using rocketmq::CommandHeader;
+using rocketmq::ConsumerSendMsgBackRequestHeader;
+using rocketmq::CreateTopicRequestHeader;
+using rocketmq::GetConsumerListByGroupRequestHeader;
+using rocketmq::GetConsumerListByGroupResponseBody;
+using rocketmq::GetConsumerListByGroupResponseHeader;
+using rocketmq::GetConsumerRunningInfoRequestHeader;
+using rocketmq::GetEarliestMsgStoretimeRequestHeader;
+using rocketmq::GetEarliestMsgStoretimeResponseHeader;
+using rocketmq::GetMaxOffsetRequestHeader;
+using rocketmq::GetMaxOffsetResponseHeader;
+using rocketmq::GetMinOffsetRequestHeader;
+using rocketmq::GetMinOffsetResponseHeader;
+using rocketmq::GetRouteInfoRequestHeader;
+using rocketmq::MemoryBlock;
+using rocketmq::NotifyConsumerIdsChangedRequestHeader;
+using rocketmq::PullMessageRequestHeader;
+using rocketmq::PullMessageResponseHeader;
+using rocketmq::QueryConsumerOffsetRequestHeader;
+using rocketmq::QueryConsumerOffsetResponseHeader;
+using rocketmq::ResetOffsetRequestHeader;
+using rocketmq::SearchOffsetRequestHeader;
+using rocketmq::SearchOffsetResponseHeader;
+using rocketmq::SendMessageRequestHeader;
+using rocketmq::SendMessageResponseHeader;
+using rocketmq::UnregisterClientRequestHeader;
+using rocketmq::UpdateConsumerOffsetRequestHeader;
+using rocketmq::ViewMessageRequestHeader;
+
+TEST(commandHeader, ConsumerSendMsgBackRequestHeader) {}
+
+TEST(commandHeader, GetConsumerListByGroupResponseBody) {
+ Value value;
+ value[0] = "body";
+ value[1] = 1;
+
+ Value root;
+ root["consumerIdList"] = value;
+
+ FastWriter writer;
+ string data = writer.write(root);
+
+ MemoryBlock *mem = new MemoryBlock(data.c_str(), data.size());
+ vector<string> cids;
+ GetConsumerListByGroupResponseBody::Decode(mem, cids);
+
+ EXPECT_EQ(cids.size(), 1);
+
+ delete mem;
+}
+
+TEST(commandHeader, ResetOffsetRequestHeader) {
+ ResetOffsetRequestHeader header;
+
+ header.setTopic("testTopic");
+ EXPECT_EQ(header.getTopic(), "testTopic");
+
+ header.setGroup("testGroup");
+ EXPECT_EQ(header.getGroup(), "testGroup");
+
+ header.setTimeStamp(123);
+ EXPECT_EQ(header.getTimeStamp(), 123);
+
+ header.setForceFlag(true);
+ EXPECT_TRUE(header.getForceFlag());
+
+ Value value;
+ value["isForce"] = "false";
+ shared_ptr<ResetOffsetRequestHeader> headersh(
+ static_cast<ResetOffsetRequestHeader *>(ResetOffsetRequestHeader::Decode(value)));
+ EXPECT_EQ(headersh->getTopic(), "");
+ EXPECT_EQ(headersh->getGroup(), "");
+ // EXPECT_EQ(headersh->getTimeStamp(), 0);
+ EXPECT_FALSE(headersh->getForceFlag());
+ value["topic"] = "testTopic";
+ headersh.reset(static_cast<ResetOffsetRequestHeader *>(ResetOffsetRequestHeader::Decode(value)));
+ EXPECT_EQ(headersh->getTopic(), "testTopic");
+ EXPECT_EQ(headersh->getGroup(), "");
+ // EXPECT_EQ(headersh->getTimeStamp(), 0);
+ EXPECT_FALSE(headersh->getForceFlag());
+
+ value["topic"] = "testTopic";
+ value["group"] = "testGroup";
+ headersh.reset(static_cast<ResetOffsetRequestHeader *>(ResetOffsetRequestHeader::Decode(value)));
+ EXPECT_EQ(headersh->getTopic(), "testTopic");
+ EXPECT_EQ(headersh->getGroup(), "testGroup");
+ // EXPECT_EQ(headersh->getTimeStamp(), 0);
+ EXPECT_FALSE(headersh->getForceFlag());
+
+ value["topic"] = "testTopic";
+ value["group"] = "testGroup";
+ value["timestamp"] = "123";
+ headersh.reset(static_cast<ResetOffsetRequestHeader *>(ResetOffsetRequestHeader::Decode(value)));
+ EXPECT_EQ(headersh->getTopic(), "testTopic");
+ EXPECT_EQ(headersh->getGroup(), "testGroup");
+ EXPECT_EQ(headersh->getTimeStamp(), 123);
+ EXPECT_FALSE(headersh->getForceFlag());
+
+ value["topic"] = "testTopic";
+ value["group"] = "testGroup";
+ value["timestamp"] = "123";
+ value["isForce"] = "1";
+ headersh.reset(static_cast<ResetOffsetRequestHeader *>(ResetOffsetRequestHeader::Decode(value)));
+ EXPECT_EQ(headersh->getTopic(), "testTopic");
+ EXPECT_EQ(headersh->getGroup(), "testGroup");
+ EXPECT_EQ(headersh->getTimeStamp(), 123);
+ EXPECT_TRUE(headersh->getForceFlag());
+}
+
+TEST(commandHeader, GetConsumerRunningInfoRequestHeader) {
+ GetConsumerRunningInfoRequestHeader header;
+ header.setClientId("testClientId");
+ header.setConsumerGroup("testConsumer");
+ header.setJstackEnable(true);
+
+ map<string, string> requestMap;
+ header.SetDeclaredFieldOfCommandHeader(requestMap);
+ EXPECT_EQ(requestMap["clientId"], "testClientId");
+ EXPECT_EQ(requestMap["consumerGroup"], "testConsumer");
+ EXPECT_EQ(requestMap["jstackEnable"], "1");
+
+ Value outData;
+ header.Encode(outData);
+ EXPECT_EQ(outData["clientId"], "testClientId");
+ EXPECT_EQ(outData["consumerGroup"], "testConsumer");
+ EXPECT_TRUE(outData["jstackEnable"].asBool());
+
+ shared_ptr<GetConsumerRunningInfoRequestHeader> decodeHeader(
+ static_cast<GetConsumerRunningInfoRequestHeader *>(GetConsumerRunningInfoRequestHeader::Decode(outData)));
+ EXPECT_EQ(decodeHeader->getClientId(), "testClientId");
+ EXPECT_EQ(decodeHeader->getConsumerGroup(), "testConsumer");
+ EXPECT_TRUE(decodeHeader->isJstackEnable());
+}
+
+TEST(commandHeader, NotifyConsumerIdsChangedRequestHeader) {
+ Json::Value ext;
+ shared_ptr<NotifyConsumerIdsChangedRequestHeader> header(
+ static_cast<NotifyConsumerIdsChangedRequestHeader *>(NotifyConsumerIdsChangedRequestHeader::Decode(ext)));
+ EXPECT_EQ(header->getGroup(), "");
+
+ ext["consumerGroup"] = "testGroup";
+ header.reset(
+ static_cast<NotifyConsumerIdsChangedRequestHeader *>(NotifyConsumerIdsChangedRequestHeader::Decode(ext)));
+ EXPECT_EQ(header->getGroup(), "testGroup");
+}
+
+int main(int argc, char *argv[]) {
+ InitGoogleMock(&argc, argv);
+ testing::GTEST_FLAG(throw_on_failure) = true;
+ testing::GTEST_FLAG(filter) = "commandHeader.*";
+ int itestts = RUN_ALL_TESTS();
+ return itestts;
+}
diff --git a/test/src/protocol/ConsumerRunningInfoTest.cpp b/test/src/protocol/ConsumerRunningInfoTest.cpp
new file mode 100644
index 0000000..ef46eb1
--- /dev/null
+++ b/test/src/protocol/ConsumerRunningInfoTest.cpp
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <iostream>
+#include "map"
+#include "string"
+
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+#include "json/reader.h"
+#include "json/value.h"
+
+#include "ConsumerRunningInfo.h"
+#include "MessageQueue.h"
+#include "ProcessQueueInfo.h"
+#include "SubscriptionData.h"
+
+using std::map;
+using std::string;
+
+using ::testing::InitGoogleMock;
+using ::testing::InitGoogleTest;
+using testing::Return;
+
+using Json::Reader;
+using Json::Value;
+
+using rocketmq::ConsumerRunningInfo;
+using rocketmq::MessageQueue;
+using rocketmq::ProcessQueueInfo;
+using rocketmq::SubscriptionData;
+
+TEST(consumerRunningInfo, init) {
+ ConsumerRunningInfo consumerRunningInfo;
+ consumerRunningInfo.setJstack("jstack");
+ EXPECT_EQ(consumerRunningInfo.getJstack(), "jstack");
+
+ EXPECT_TRUE(consumerRunningInfo.getProperties().empty());
+
+ consumerRunningInfo.setProperty("testKey", "testValue");
+ map<string, string> properties = consumerRunningInfo.getProperties();
+ EXPECT_EQ(properties["testKey"], "testValue");
+
+ consumerRunningInfo.setProperties(map<string, string>());
+ EXPECT_TRUE(consumerRunningInfo.getProperties().empty());
+
+ EXPECT_TRUE(consumerRunningInfo.getSubscriptionSet().empty());
+
+ vector<SubscriptionData> subscriptionSet;
+ subscriptionSet.push_back(SubscriptionData());
+
+ consumerRunningInfo.setSubscriptionSet(subscriptionSet);
+ EXPECT_EQ(consumerRunningInfo.getSubscriptionSet().size(), 1);
+
+ EXPECT_TRUE(consumerRunningInfo.getMqTable().empty());
+
+ MessageQueue messageQueue("testTopic", "testBroker", 3);
+ ProcessQueueInfo processQueueInfo;
+ processQueueInfo.commitOffset = 1024;
+ consumerRunningInfo.setMqTable(messageQueue, processQueueInfo);
+ map<MessageQueue, ProcessQueueInfo> mqTable = consumerRunningInfo.getMqTable();
+ EXPECT_EQ(mqTable[messageQueue].commitOffset, processQueueInfo.commitOffset);
+
+ // encode start
+ consumerRunningInfo.setProperty(ConsumerRunningInfo::PROP_NAMESERVER_ADDR, "127.0.0.1:9876");
+ consumerRunningInfo.setProperty(ConsumerRunningInfo::PROP_THREADPOOL_CORE_SIZE, "core_size");
+ consumerRunningInfo.setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, "consume_orderly");
+ consumerRunningInfo.setProperty(ConsumerRunningInfo::PROP_CONSUME_TYPE, "consume_type");
+ consumerRunningInfo.setProperty(ConsumerRunningInfo::PROP_CLIENT_VERSION, "client_version");
+ consumerRunningInfo.setProperty(ConsumerRunningInfo::PROP_CONSUMER_START_TIMESTAMP, "127");
+ // TODO
+ /* string outstr = consumerRunningInfo.encode();
+ std::cout<< outstr;
+ Value root;
+ Reader reader;
+ reader.parse(outstr.c_str(), root);
+
+ EXPECT_EQ(root["jstack"].asString() , "jstack");
+
+ Json::Value outData = root["properties"];
+ EXPECT_EQ(outData[ConsumerRunningInfo::PROP_NAMESERVER_ADDR].asString(),"127.0.0.1:9876");
+ EXPECT_EQ(
+ outData[ConsumerRunningInfo::PROP_THREADPOOL_CORE_SIZE].asString(),
+ "core_size");
+ EXPECT_EQ(outData[ConsumerRunningInfo::PROP_CONSUME_ORDERLY].asString(),
+ "consume_orderly");
+ EXPECT_EQ(outData[ConsumerRunningInfo::PROP_CONSUME_TYPE].asString(),
+ "consume_type");
+ EXPECT_EQ(outData[ConsumerRunningInfo::PROP_CLIENT_VERSION].asString(),
+ "client_version");
+ EXPECT_EQ(
+ outData[ConsumerRunningInfo::PROP_CONSUMER_START_TIMESTAMP].asString(),
+ "127");
+
+ Json::Value subscriptionSetJson = root["subscriptionSet"];
+ EXPECT_EQ(subscriptionSetJson[0], subscriptionSet[0].toJson());
+
+ Json::Value mqTableJson = root["mqTable"];
+ EXPECT_EQ(mqTableJson[messageQueue.toJson().toStyledString()].asString(),
+ processQueueInfo.toJson().toStyledString());
+ */
+}
+
+int main(int argc, char *argv[]) {
+ InitGoogleMock(&argc, argv);
+ testing::GTEST_FLAG(throw_on_failure) = true;
+ testing::GTEST_FLAG(filter) = "consumerRunningInfo.*";
+ int itestts = RUN_ALL_TESTS();
+ return itestts;
+}
diff --git a/test/src/protocol/HeartbeatDataTest.cpp b/test/src/protocol/HeartbeatDataTest.cpp
new file mode 100644
index 0000000..21feae1
--- /dev/null
+++ b/test/src/protocol/HeartbeatDataTest.cpp
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "vector"
+
+#include "gmock/gmock.h"
+#include "gtest/gtest.h"
+
+#include "ConsumeType.h"
+#include "HeartbeatData.h"
+#include "SubscriptionData.h"
+
+using std::vector;
+
+using ::testing::InitGoogleMock;
+using ::testing::InitGoogleTest;
+using testing::Return;
+
+using rocketmq::ConsumeFromWhere;
+using rocketmq::ConsumerData;
+using rocketmq::ConsumeType;
+using rocketmq::HeartbeatData;
+using rocketmq::MessageModel;
+using rocketmq::ProducerData;
+using rocketmq::SubscriptionData;
+
+TEST(heartbeatData, ProducerData) {
+ ProducerData producerData;
+ producerData.groupName = "testGroup";
+
+ Json::Value outJson = producerData.toJson();
+ EXPECT_EQ(outJson["groupName"], "testGroup");
+}
+
+TEST(heartbeatData, ConsumerData) {
+ ConsumerData consumerData;
+ consumerData.groupName = "testGroup";
+ consumerData.consumeType = ConsumeType::CONSUME_ACTIVELY;
+ consumerData.messageModel = MessageModel::BROADCASTING;
+ consumerData.consumeFromWhere = ConsumeFromWhere::CONSUME_FROM_TIMESTAMP;
+
+ vector<SubscriptionData> subs;
+ subs.push_back(SubscriptionData("testTopic", "sub"));
+
+ consumerData.subscriptionDataSet = subs;
+
+ Json::Value outJson = consumerData.toJson();
+
+ EXPECT_EQ(outJson["groupName"], "testGroup");
+
+ EXPECT_EQ(outJson["consumeType"].asInt(), ConsumeType::CONSUME_ACTIVELY);
+ EXPECT_EQ(outJson["messageModel"].asInt(), MessageModel::BROADCASTING);
+ EXPECT_EQ(outJson["consumeFromWhere"].asInt(), ConsumeFromWhere::CONSUME_FROM_TIMESTAMP);
+
+ Json::Value subsValue = outJson["subscriptionDataSet"];
+ EXPECT_EQ(subsValue[0]["topic"], "testTopic");
+ EXPECT_EQ(subsValue[0]["subString"], "sub");
+}
+
+TEST(heartbeatData, HeartbeatData) {
+ HeartbeatData heartbeatData;
+ heartbeatData.setClientID("testClientId");
+
+ ProducerData producerData;
+ producerData.groupName = "testGroup";
+
+ EXPECT_TRUE(heartbeatData.isProducerDataSetEmpty());
+ heartbeatData.insertDataToProducerDataSet(producerData);
+ EXPECT_FALSE(heartbeatData.isProducerDataSetEmpty());
+
+ ConsumerData consumerData;
+ consumerData.groupName = "testGroup";
+ consumerData.consumeType = ConsumeType::CONSUME_ACTIVELY;
+ consumerData.messageModel = MessageModel::BROADCASTING;
+ consumerData.consumeFromWhere = ConsumeFromWhere::CONSUME_FROM_TIMESTAMP;
+
+ vector<SubscriptionData> subs;
+ subs.push_back(SubscriptionData("testTopic", "sub"));
+
+ consumerData.subscriptionDataSet = subs;
+ EXPECT_TRUE(heartbeatData.isConsumerDataSetEmpty());
+ heartbeatData.insertDataToConsumerDataSet(consumerData);
+ EXPECT_FALSE(heartbeatData.isConsumerDataSetEmpty());
+
+ string outData;
+ heartbeatData.Encode(outData);
+
+ Json::Value root;
+ Json::Reader reader;
+ reader.parse(outData, root);
+
+ EXPECT_EQ(root["clientID"], "testClientId");
+}
+
+int main(int argc, char *argv[]) {
+ InitGoogleMock(&argc, argv);
+ testing::GTEST_FLAG(throw_on_failure) = true;
+ testing::GTEST_FLAG(filter) = "heartbeatData.*";
+ int itestts = RUN_ALL_TESTS();
+ return itestts;
+}