blob: 74e55ce7798f785688febaec15929859a6c92671 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "MessageDecoder.h"
#include <string>
#include <vector>
#include <arpa/inet.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include "ByteArray.h"
#include "ByteBuffer.hpp"
#include "MQMessage.h"
#include "MQMessageConst.h"
#include "MQMessageExt.h"
#include "MessageId.h"
#include "MessageSysFlag.h"
#include "RemotingCommand.h"
#include "UtilAll.h"
#include "protocol/header/CommandHeader.h"
using testing::InitGoogleMock;
using testing::InitGoogleTest;
using testing::Return;
using rocketmq::ByteArray;
using rocketmq::ByteBuffer;
using rocketmq::MessageDecoder;
using rocketmq::MessageId;
using rocketmq::MessageSysFlag;
using rocketmq::MQMessage;
using rocketmq::MQMessageConst;
using rocketmq::MQMessageExt;
using rocketmq::RemotingCommand;
using rocketmq::SendMessageRequestHeader;
using rocketmq::stoba;
using rocketmq::UtilAll;
// TODO
TEST(MessageDecoderTest, MessageId) {
std::string strMsgId = MessageDecoder::createMessageId(rocketmq::StringToSockaddr("127.0.0.1:10091"), 1024LL);
EXPECT_EQ(strMsgId, "7F0000010000276B0000000000000400");
MessageId msgId = MessageDecoder::decodeMessageId(strMsgId);
EXPECT_EQ(msgId.getOffset(), 1024LL);
std::string strMsgId2 = MessageDecoder::createMessageId(rocketmq::StringToSockaddr("/172.16.2.114:0"), 123456LL);
EXPECT_EQ(strMsgId2, "AC10027200000000000000000001E240");
MessageId msgId2 = MessageDecoder::decodeMessageId(strMsgId2);
EXPECT_EQ(msgId2.getOffset(), 123456LL);
}
TEST(MessageDecoderTest, Decode) {
std::unique_ptr<ByteBuffer> byteBuffer(ByteBuffer::allocate(1024));
MQMessageExt msgExt;
// 1 TOTALSIZE 4=0+4
byteBuffer->putInt(111);
msgExt.set_store_size(111);
// 2 MAGICCODE sizeof(int) 8=4+4
byteBuffer->putInt(14);
// 3 BODYCRC 12=8+4
byteBuffer->putInt(24);
msgExt.set_body_crc(24);
// 4 QUEUEID 16=12+4
byteBuffer->putInt(4);
msgExt.set_queue_id(4);
// 5 FLAG 20=16+4
byteBuffer->putInt(4);
msgExt.set_flag(4);
// 6 QUEUEOFFSET 28=20+8
byteBuffer->putLong(1024LL);
msgExt.set_queue_offset(1024LL);
// 7 PHYSICALOFFSET 36=28+8
byteBuffer->putLong(2048LL);
msgExt.set_commit_log_offset(2048LL);
// 8 SYSFLAG 40=36+4
byteBuffer->putInt(0);
msgExt.set_sys_flag(0);
// 9 BORNTIMESTAMP 48=40+8
byteBuffer->putLong(4096LL);
msgExt.set_born_timestamp(4096LL);
// 10 BORNHOST 56=48+4+4
byteBuffer->putInt(ntohl(inet_addr("127.0.0.1")));
byteBuffer->putInt(10091);
msgExt.set_born_host(rocketmq::StringToSockaddr("127.0.0.1:10091"));
// 11 STORETIMESTAMP 64=56+8
byteBuffer->putLong(4096LL);
msgExt.set_store_timestamp(4096LL);
// 12 STOREHOST 72=64+4+4
byteBuffer->putInt(ntohl(inet_addr("127.0.0.2")));
byteBuffer->putInt(10092);
msgExt.set_store_host(rocketmq::StringToSockaddr("127.0.0.2:10092"));
// 13 RECONSUMETIMES 76=72+4
byteBuffer->putInt(18);
msgExt.set_reconsume_times(18);
// 14 Prepared Transaction Offset 84=76+8
byteBuffer->putLong(12LL);
msgExt.set_prepared_transaction_offset(12LL);
// 15 BODY 98=84+4+10
std::string body("1234567890");
byteBuffer->putInt(body.size());
byteBuffer->put(*stoba(body));
msgExt.set_body(body);
// 16 TOPIC 109=98+1+10
std::string topic = "topic_1234";
byteBuffer->put((int8_t)topic.size());
byteBuffer->put(*stoba(topic));
msgExt.set_topic(topic);
// 17 PROPERTIES 111=109+2
byteBuffer->putShort(0);
msgExt.set_msg_id(MessageDecoder::createMessageId(msgExt.store_host(), (int64_t)msgExt.commit_log_offset()));
byteBuffer->flip();
auto msgs = MessageDecoder::decodes(*byteBuffer);
EXPECT_EQ(msgs.size(), 1);
std::cout << msgs[0]->toString() << std::endl;
std::cout << msgExt.toString() << std::endl;
EXPECT_EQ(msgs[0]->toString(), msgExt.toString());
byteBuffer->rewind();
msgs = MessageDecoder::decodes(*byteBuffer, false);
EXPECT_EQ(msgs[0]->body(), "");
//===============================================================
byteBuffer->clear();
// 8 SYSFLAG 40=36+4
byteBuffer->position(36);
byteBuffer->putInt(0 | MessageSysFlag::COMPRESSED_FLAG);
msgExt.set_sys_flag(0 | MessageSysFlag::COMPRESSED_FLAG);
// 15 Body 84
std::string compressedBody;
UtilAll::deflate(body, compressedBody, 5);
byteBuffer->position(84);
byteBuffer->putInt(compressedBody.size());
byteBuffer->put(*stoba(compressedBody));
msgExt.set_body(compressedBody);
// 16 TOPIC
byteBuffer->put((int8_t)topic.size());
byteBuffer->put(*stoba(topic));
msgExt.set_topic(topic);
// 17 PROPERTIES
std::map<std::string, std::string> properties;
properties["RocketMQ"] = "cpp-client";
properties[MQMessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX] = "123456";
std::string props = MessageDecoder::messageProperties2String(properties);
byteBuffer->putShort(props.size());
byteBuffer->put(*stoba(props));
msgExt.set_properties(properties);
msgExt.set_msg_id("123456");
byteBuffer->flip();
byteBuffer->putInt(byteBuffer->limit());
msgExt.set_store_size(byteBuffer->limit());
byteBuffer->rewind();
msgs = MessageDecoder::decodes(*byteBuffer);
EXPECT_EQ(msgs[0]->toString(), msgExt.toString());
}
TEST(MessageDecoderTest, MessagePropertiesAndToString) {
std::map<std::string, std::string> properties;
properties["RocketMQ"] = "cpp-client";
std::string props = MessageDecoder::messageProperties2String(properties);
EXPECT_EQ(props, "RocketMQ\001cpp-client\002");
auto properties2 = MessageDecoder::string2messageProperties(props);
EXPECT_EQ(properties, properties2);
}
int main(int argc, char* argv[]) {
InitGoogleMock(&argc, argv);
testing::GTEST_FLAG(throw_on_failure) = true;
testing::GTEST_FLAG(filter) = "MessageDecoderTest.*";
return RUN_ALL_TESTS();
}