| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| #include <gtest/gtest.h> |
| #include <pulsar/Client.h> |
| #include <pulsar/MessageIdBuilder.h> |
| |
| #include <ctime> |
| #include <random> |
| |
| #include "ChunkMessageIdImpl.h" |
| #include "PulsarFriend.h" |
| #include "WaitUtils.h" |
| #include "lib/LogUtils.h" |
| |
| DECLARE_LOG_OBJECT() |
| |
| using namespace pulsar; |
| |
| static const std::string lookupUrl = "pulsar://localhost:6650"; |
| |
| // See the `maxMessageSize` config in test-conf/standalone-ssl.conf |
| static constexpr size_t maxMessageSize = 1024000; |
| |
| static std::string toString(CompressionType compressionType) { |
| switch (compressionType) { |
| case CompressionType::CompressionNone: |
| return "None"; |
| case CompressionType::CompressionLZ4: |
| return "LZ4"; |
| case CompressionType::CompressionZLib: |
| return "ZLib"; |
| case CompressionType::CompressionZSTD: |
| return "ZSTD"; |
| case CompressionType::CompressionSNAPPY: |
| return "SNAPPY"; |
| default: |
| return "Unknown (" + std::to_string(compressionType) + ")"; |
| } |
| } |
| |
| inline std::string createLargeMessage() { |
| std::string largeMessage(maxMessageSize * 3, 'a'); |
| std::default_random_engine e(time(nullptr)); |
| std::uniform_int_distribution<unsigned> u(0, 25); |
| for (size_t i = 0; i < largeMessage.size(); i++) { |
| largeMessage[i] = 'a' + u(e); |
| } |
| return largeMessage; |
| } |
| |
| class MessageChunkingTest : public ::testing::TestWithParam<CompressionType> { |
| public: |
| static std::string largeMessage; |
| |
| void TearDown() override { client_.close(); } |
| |
| void createProducer(const std::string& topic, Producer& producer) { |
| ProducerConfiguration conf; |
| conf.setBatchingEnabled(false); |
| conf.setChunkingEnabled(true); |
| conf.setCompressionType(GetParam()); |
| LOG_INFO("Create producer to topic: " << topic |
| << ", compression: " << toString(conf.getCompressionType())); |
| ASSERT_EQ(ResultOk, client_.createProducer(topic, conf, producer)); |
| } |
| |
| void createConsumer(const std::string& topic, Consumer& consumer) { |
| ASSERT_EQ(ResultOk, client_.subscribe(topic, "my-sub", consumer)); |
| } |
| |
| void createConsumer(const std::string& topic, Consumer& consumer, ConsumerConfiguration& conf) { |
| ASSERT_EQ(ResultOk, client_.subscribe(topic, "my-sub", conf, consumer)); |
| } |
| |
| private: |
| Client client_{lookupUrl}; |
| }; |
| |
| std::string MessageChunkingTest::largeMessage = createLargeMessage(); |
| |
| TEST_F(MessageChunkingTest, testInvalidConfig) { |
| Client client(lookupUrl); |
| ProducerConfiguration conf; |
| conf.setBatchingEnabled(true); |
| conf.setChunkingEnabled(true); |
| Producer producer; |
| ASSERT_THROW(client.createProducer("xxx", conf, producer), std::invalid_argument); |
| client.close(); |
| } |
| |
| TEST_P(MessageChunkingTest, testEndToEnd) { |
| const std::string topic = |
| "MessageChunkingTest-EndToEnd-" + toString(GetParam()) + std::to_string(time(nullptr)); |
| Consumer consumer; |
| createConsumer(topic, consumer); |
| Producer producer; |
| createProducer(topic, producer); |
| |
| constexpr int numMessages = 10; |
| |
| std::vector<MessageId> sendMessageIds; |
| for (int i = 0; i < numMessages; i++) { |
| MessageId messageId; |
| ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(largeMessage).build(), messageId)); |
| auto chunkMsgId = |
| std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(messageId)); |
| ASSERT_TRUE(chunkMsgId); |
| LOG_INFO("Send " << i << " to " << messageId); |
| sendMessageIds.emplace_back(messageId); |
| } |
| |
| Message msg; |
| std::vector<MessageId> receivedMessageIds; |
| for (int i = 0; i < numMessages; i++) { |
| ASSERT_EQ(ResultOk, consumer.receive(msg, 3000)); |
| LOG_INFO("Receive " << msg.getLength() << " bytes from " << msg.getMessageId()); |
| ASSERT_EQ(msg.getDataAsString(), largeMessage); |
| ASSERT_EQ(msg.getMessageId().batchIndex(), -1); |
| ASSERT_EQ(msg.getMessageId().batchSize(), 0); |
| auto messageId = msg.getMessageId(); |
| auto chunkMsgId = |
| std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(messageId)); |
| ASSERT_TRUE(chunkMsgId); |
| receivedMessageIds.emplace_back(messageId); |
| } |
| ASSERT_EQ(receivedMessageIds, sendMessageIds); |
| ASSERT_EQ(receivedMessageIds.front().ledgerId(), receivedMessageIds.front().ledgerId()); |
| ASSERT_GT(receivedMessageIds.back().entryId(), numMessages); |
| |
| // Verify the cache has been cleared |
| auto& chunkedMessageCache = PulsarFriend::getChunkedMessageCache(consumer); |
| ASSERT_EQ(chunkedMessageCache.size(), 0); |
| |
| producer.close(); |
| consumer.close(); |
| } |
| |
| TEST_P(MessageChunkingTest, testExpireIncompleteChunkMessage) { |
| // This test is time-consuming and is not related to the compressionType. So skip other compressionType |
| // here. |
| if (toString(GetParam()) != "None") { |
| return; |
| } |
| const std::string topic = "MessageChunkingTest-testExpireIncompleteChunkMessage-" + toString(GetParam()) + |
| std::to_string(time(nullptr)); |
| Consumer consumer; |
| ConsumerConfiguration consumerConf; |
| consumerConf.setExpireTimeOfIncompleteChunkedMessageMs(5000); |
| consumerConf.setAutoAckOldestChunkedMessageOnQueueFull(true); |
| createConsumer(topic, consumer, consumerConf); |
| Producer producer; |
| createProducer(topic, producer); |
| |
| auto msg = MessageBuilder().setContent("test-data").build(); |
| auto& metadata = PulsarFriend::getMessageMetadata(msg); |
| metadata.set_num_chunks_from_msg(2); |
| metadata.set_chunk_id(0); |
| metadata.set_total_chunk_msg_size(100); |
| |
| producer.send(msg); |
| |
| auto& chunkedMessageCache = PulsarFriend::getChunkedMessageCache(consumer); |
| |
| waitUntil( |
| std::chrono::seconds(2), [&] { return chunkedMessageCache.size() > 0; }, 1000); |
| ASSERT_EQ(chunkedMessageCache.size(), 1); |
| |
| // Wait for triggering the check of the expiration. |
| // Need to wait for 2 * expireTime because there may be a gap in checking the expiration time. |
| waitUntil( |
| std::chrono::seconds(10), [&] { return chunkedMessageCache.size() == 0; }, 1000); |
| ASSERT_EQ(chunkedMessageCache.size(), 0); |
| |
| producer.close(); |
| consumer.close(); |
| } |
| |
| TEST_P(MessageChunkingTest, testMaxPendingChunkMessages) { |
| if (toString(GetParam()) != "None") { |
| return; |
| } |
| const std::string topic = "MessageChunkingTest-testMaxPendingChunkMessages-" + toString(GetParam()) + |
| std::to_string(time(nullptr)); |
| Consumer consumer; |
| ConsumerConfiguration consumerConf; |
| consumerConf.setMaxPendingChunkedMessage(1); |
| consumerConf.setAutoAckOldestChunkedMessageOnQueueFull(true); |
| createConsumer(topic, consumer, consumerConf); |
| Producer producer; |
| createProducer(topic, producer); |
| |
| auto msg = MessageBuilder().setContent("chunk-0-0|").build(); |
| auto& metadata = PulsarFriend::getMessageMetadata(msg); |
| metadata.set_num_chunks_from_msg(2); |
| metadata.set_chunk_id(0); |
| metadata.set_uuid("0"); |
| metadata.set_total_chunk_msg_size(100); |
| |
| producer.send(msg); |
| |
| auto msg2 = MessageBuilder().setContent("chunk-1-0|").build(); |
| auto& metadata2 = PulsarFriend::getMessageMetadata(msg2); |
| metadata2.set_num_chunks_from_msg(2); |
| metadata2.set_uuid("1"); |
| metadata2.set_chunk_id(0); |
| metadata2.set_total_chunk_msg_size(100); |
| |
| producer.send(msg2); |
| |
| auto msg3 = MessageBuilder().setContent("chunk-1-1|").build(); |
| auto& metadata3 = PulsarFriend::getMessageMetadata(msg3); |
| metadata3.set_num_chunks_from_msg(2); |
| metadata3.set_uuid("1"); |
| metadata3.set_chunk_id(1); |
| metadata3.set_total_chunk_msg_size(100); |
| |
| producer.send(msg3); |
| |
| Message receivedMsg; |
| ASSERT_EQ(ResultOk, consumer.receive(receivedMsg, 3000)); |
| ASSERT_EQ(receivedMsg.getDataAsString(), "chunk-1-0|chunk-1-1|"); |
| |
| consumer.redeliverUnacknowledgedMessages(); |
| |
| // The consumer may acknowledge the wrong message(the latest message) in the old version of codes. This |
| // test case ensure that it should not happen again. |
| Message receivedMsg2; |
| ASSERT_EQ(ResultOk, consumer.receive(receivedMsg2, 3000)); |
| ASSERT_EQ(receivedMsg2.getDataAsString(), "chunk-1-0|chunk-1-1|"); |
| |
| consumer.acknowledge(receivedMsg2); |
| |
| consumer.redeliverUnacknowledgedMessages(); |
| auto msg4 = MessageBuilder().setContent("chunk-0-1|").build(); |
| auto& metadata4 = PulsarFriend::getMessageMetadata(msg4); |
| metadata4.set_num_chunks_from_msg(2); |
| metadata4.set_uuid("0"); |
| metadata4.set_chunk_id(1); |
| metadata4.set_total_chunk_msg_size(100); |
| |
| producer.send(msg4); |
| |
| // This ensures that the message chunk-0-0 was acknowledged successfully. So we cannot receive it anymore. |
| Message receivedMsg3; |
| consumer.receive(receivedMsg3, 3000); |
| |
| producer.close(); |
| consumer.close(); |
| } |
| |
| TEST_P(MessageChunkingTest, testSeekChunkMessages) { |
| const std::string topic = |
| "MessageChunkingTest-testSeekChunkMessages-" + toString(GetParam()) + std::to_string(time(nullptr)); |
| |
| constexpr int numMessages = 10; |
| |
| Consumer consumer1; |
| ConsumerConfiguration consumer1Conf; |
| consumer1Conf.setStartMessageIdInclusive(true); |
| createConsumer(topic, consumer1, consumer1Conf); |
| |
| Producer producer; |
| createProducer(topic, producer); |
| |
| for (int i = 0; i < numMessages; i++) { |
| MessageId messageId; |
| ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(largeMessage).build(), messageId)); |
| LOG_INFO("Send " << i << " to " << messageId); |
| } |
| |
| Message msg; |
| std::vector<MessageId> receivedMessageIds; |
| for (int i = 0; i < numMessages; i++) { |
| ASSERT_EQ(ResultOk, consumer1.receive(msg, 3000)); |
| LOG_INFO("Receive " << msg.getLength() << " bytes from " << msg.getMessageId()); |
| receivedMessageIds.emplace_back(msg.getMessageId()); |
| } |
| |
| consumer1.seek(receivedMessageIds[1]); |
| for (int i = 1; i < numMessages; i++) { |
| Message msgAfterSeek; |
| ASSERT_EQ(ResultOk, consumer1.receive(msgAfterSeek, 3000)); |
| ASSERT_EQ(msgAfterSeek.getMessageId(), receivedMessageIds[i]); |
| } |
| |
| consumer1.close(); |
| Consumer consumer2; |
| createConsumer(topic, consumer2); |
| |
| consumer2.seek(receivedMessageIds[1]); |
| for (int i = 2; i < numMessages; i++) { |
| Message msgAfterSeek; |
| ASSERT_EQ(ResultOk, consumer2.receive(msgAfterSeek, 3000)); |
| ASSERT_EQ(msgAfterSeek.getMessageId(), receivedMessageIds[i]); |
| } |
| |
| consumer2.close(); |
| producer.close(); |
| } |
| |
| TEST(ChunkMessageIdTest, testSetChunkMessageId) { |
| MessageId msgId; |
| { |
| ChunkMessageIdImplPtr chunkMsgId = std::make_shared<ChunkMessageIdImpl>(); |
| chunkMsgId->setFirstChunkMessageId(MessageIdBuilder().ledgerId(1).entryId(2).partition(3).build()); |
| chunkMsgId->setLastChunkMessageId(MessageIdBuilder().ledgerId(4).entryId(5).partition(6).build()); |
| msgId = chunkMsgId->build(); |
| // Test the destructor of the underlying message id should also work for the generated messageId. |
| } |
| |
| std::string msgIdData; |
| msgId.serialize(msgIdData); |
| MessageId deserializedMsgId = MessageId::deserialize(msgIdData); |
| |
| ASSERT_EQ(deserializedMsgId.ledgerId(), 4); |
| ASSERT_EQ(deserializedMsgId.entryId(), 5); |
| ASSERT_EQ(deserializedMsgId.partition(), 6); |
| |
| auto chunkMsgId = |
| std::dynamic_pointer_cast<ChunkMessageIdImpl>(PulsarFriend::getMessageIdImpl(deserializedMsgId)); |
| ASSERT_TRUE(chunkMsgId); |
| auto firstChunkMsgId = chunkMsgId->getFirstChunkMessageId(); |
| ASSERT_EQ(firstChunkMsgId->ledgerId_, 1); |
| ASSERT_EQ(firstChunkMsgId->entryId_, 2); |
| ASSERT_EQ(firstChunkMsgId->partition_, 3); |
| } |
| |
| // The CI env is Ubuntu 16.04, the gtest-dev version is 1.8.0 that doesn't have INSTANTIATE_TEST_SUITE_P |
| INSTANTIATE_TEST_CASE_P(Pulsar, MessageChunkingTest, |
| ::testing::Values(CompressionNone, CompressionLZ4, CompressionZLib, CompressionZSTD, |
| CompressionSNAPPY)); |