/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <gtest/gtest.h>
#include <pulsar/Client.h>
#include <atomic>
#include <chrono>
#include <set>
#include <thread>
#include "ConsumerWrapper.h"
#include "HttpHelper.h"
#include "PulsarFriend.h"
#include "lib/Latch.h"
#include "lib/LogUtils.h"
#include "lib/MessageIdUtil.h"
DECLARE_LOG_OBJECT()
using namespace pulsar;
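// These tests assume a local Pulsar instance listening on the default broker and admin ports.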
static std::string lookupUrl = "pulsar://localhost:6650";
static std::string adminUrl = "http://localhost:8080/";
extern std::string unique_str();
class AcknowledgeTest
: public testing::TestWithParam<std::tuple<int /* ack grouping time */, bool /* ack with receipt */>> {};
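
// Acknowledge a list of message IDs on a non-partitioned topic and verify that numMsg individual
// ACKs are recorded and that nothing is redelivered afterwards.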
TEST_P(AcknowledgeTest, testAckMsgList) {
Client client(lookupUrl);
auto clientImplPtr = PulsarFriend::getClientImplPtr(client);
constexpr auto numMsg = 100;
std::string uniqueChunk = unique_str();
std::string topicName = "persistent://public/default/test-ack-msgs" + uniqueChunk;
const std::string subName = "sub-ack-list";
Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
ConsumerConfiguration consumerConfig;
consumerConfig.setAckGroupingMaxSize(numMsg);
consumerConfig.setAckGroupingTimeMs(std::get<0>(GetParam()));
consumerConfig.setAckReceiptEnabled(std::get<1>(GetParam()));
consumerConfig.setUnAckedMessagesTimeoutMs(10000);
Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumerConfig, consumer));
// Sending and receiving messages.
for (auto count = 0; count < numMsg; ++count) {
Message msg = MessageBuilder().setContent(std::string("MSG-") + std::to_string(count)).build();
ASSERT_EQ(ResultOk, producer.send(msg));
}
std::vector<MessageId> recvMsgId;
for (auto count = 0; count < numMsg; ++count) {
Message msg;
ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
recvMsgId.emplace_back(msg.getMessageId());
}
ASSERT_EQ(ResultOk, consumer.acknowledge(recvMsgId));
    // Trigger redelivery of unacknowledged messages; since everything was acked, nothing should be redelivered.
consumer.redeliverUnacknowledgedMessages();
auto consumerStats = PulsarFriend::getConsumerStatsPtr(consumer);
auto ackMap = consumerStats->getAckedMsgMap();
unsigned long totalAck = ackMap[std::make_pair(ResultOk, CommandAck_AckType_Individual)];
ASSERT_EQ(totalAck, numMsg);
Message msg;
auto ret = consumer.receive(msg, 1000);
ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << msg.getMessageId();
producer.close();
consumer.close();
client.close();
}
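
// Same scenario as testAckMsgList, but on a topic with 5 partitions: the individual ACK counts are
// aggregated across the internal per-partition consumers.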
TEST_P(AcknowledgeTest, testAckMsgListWithMultiConsumer) {
Client client(lookupUrl);
auto clientImplPtr = PulsarFriend::getClientImplPtr(client);
std::string uniqueChunk = unique_str();
std::string topicName = "persistent://public/default/test-ack-msgs" + uniqueChunk;
    // Call the admin API to create the topic with 5 partitions
std::string url =
adminUrl + "admin/v2/persistent/public/default/test-ack-msgs" + uniqueChunk + "/partitions";
int res = makePutRequest(url, "5");
LOG_INFO("res = " << res);
    ASSERT_TRUE(res == 204 || res == 409);
constexpr auto numMsg = 100;
const std::string subName = "sub-ack-list";
Producer producer;
ProducerConfiguration producerConfig;
    // Disable batching so that messages are evenly distributed across partitions
producerConfig.setBatchingEnabled(false);
producerConfig.setPartitionsRoutingMode(pulsar::ProducerConfiguration::RoundRobinDistribution);
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfig, producer));
ConsumerConfiguration consumerConfig;
    // Set the ACK grouping max size to 10
consumerConfig.setAckGroupingMaxSize(10);
consumerConfig.setAckGroupingTimeMs(std::get<0>(GetParam()));
consumerConfig.setAckReceiptEnabled(std::get<1>(GetParam()));
consumerConfig.setUnAckedMessagesTimeoutMs(10000);
Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topicName, subName, consumerConfig, consumer));
// Sending and receiving messages.
for (auto count = 0; count < numMsg; ++count) {
Message msg = MessageBuilder().setContent(std::string("MSG-") + std::to_string(count)).build();
ASSERT_EQ(ResultOk, producer.send(msg));
}
std::vector<MessageId> recvMsgId;
for (auto count = 0; count < numMsg; ++count) {
Message msg;
ASSERT_EQ(ResultOk, consumer.receive(msg, 1000));
recvMsgId.emplace_back(msg.getMessageId());
}
ASSERT_EQ(ResultOk, consumer.acknowledge(recvMsgId));
    // Trigger redelivery of unacknowledged messages; since everything was acked, nothing should be redelivered.
consumer.redeliverUnacknowledgedMessages();
    // Aggregate the ACK stats from all internal partition consumers
unsigned long totalAck = 0;
auto consumerStatsList = PulsarFriend::getConsumerStatsPtrList(consumer);
for (auto consumerStats : consumerStatsList) {
auto ackMap = consumerStats->getAckedMsgMap();
totalAck += ackMap[std::make_pair(ResultOk, CommandAck_AckType_Individual)];
}
ASSERT_EQ(totalAck, numMsg);
Message msg;
auto ret = consumer.receive(msg, 1000);
ASSERT_EQ(ResultTimeout, ret) << "Received redundant message ID: " << msg.getMessageId();
producer.close();
consumer.close();
client.close();
}
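
// Exercise individual and cumulative acknowledgments of message IDs that belong to batches, using
// four independent subscriptions over the same batched messages.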
TEST_F(AcknowledgeTest, testBatchedMessageId) {
Client client(lookupUrl);
const std::string topic = "test-batched-message-id-" + unique_str();
constexpr int batchingMaxMessages = 3;
constexpr int numMessages = batchingMaxMessages * 3;
Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topic,
ProducerConfiguration()
.setBatchingMaxMessages(batchingMaxMessages)
.setBatchingMaxPublishDelayMs(3600 * 1000 /* 1h */),
producer));
std::vector<ConsumerWrapper> consumers{4};
for (size_t i = 0; i < consumers.size(); i++) {
consumers[i].initialize(client, topic, "sub-" + std::to_string(i));
}
    for (int i = 0; i < numMessages; i++) {
producer.sendAsync(MessageBuilder().setContent("msg-" + std::to_string(i)).build(), nullptr);
}
for (size_t i = 0; i < consumers.size(); i++) {
consumers[i].receiveAtMost(numMessages);
if (i > 0) {
ASSERT_EQ(consumers[i].messageIdList(), consumers[0].messageIdList());
}
}
Message msg;
    // Ack 2 of the 3 messages in the 1st batch
consumers[0].acknowledgeAndRedeliver({0, 2}, AckType::INDIVIDUAL);
ASSERT_EQ(consumers[0].getConsumer().receive(msg, 1000), ResultOk);
EXPECT_EQ(msg.getMessageId(), consumers[0].messageIdList()[0]);
ASSERT_EQ(consumers[0].getNumAcked(CommandAck_AckType_Individual), 0);
// ack the whole batch
consumers[1].acknowledgeAndRedeliver({0, 1, 2}, AckType::INDIVIDUAL);
ASSERT_EQ(consumers[1].getConsumer().receive(msg, 1000), ResultOk);
EXPECT_EQ(msg.getMessageId(), consumers[1].messageIdList()[batchingMaxMessages]);
ASSERT_EQ(consumers[1].getNumAcked(CommandAck_AckType_Individual), 3);
    // Cumulative acks inside the 2nd batch both resolve to the previous message ID (the end of the 1st batch)
consumers[2].acknowledgeAndRedeliver({batchingMaxMessages, batchingMaxMessages + 1}, AckType::CUMULATIVE);
ASSERT_EQ(consumers[2].getConsumer().receive(msg, 1000), ResultOk);
EXPECT_EQ(msg.getMessageId(), consumers[2].messageIdList()[batchingMaxMessages]);
// the previous message id will only be acknowledged once
ASSERT_EQ(consumers[2].getNumAcked(CommandAck_AckType_Cumulative), 1);
// the whole 2nd batch is acknowledged
consumers[3].acknowledgeAndRedeliver({batchingMaxMessages + 2}, AckType::CUMULATIVE);
ASSERT_EQ(consumers[3].getConsumer().receive(msg, 1000), ResultOk);
EXPECT_EQ(msg.getMessageId(), consumers[3].messageIdList()[batchingMaxMessages * 2]);
ASSERT_EQ(consumers[3].getNumAcked(CommandAck_AckType_Cumulative), 1);
}
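
// With batch index ACK enabled, only the entries of a batch that were not acknowledged should be
// redelivered after the consumer restarts.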
TEST_F(AcknowledgeTest, testBatchIndexAck) {
Client client(lookupUrl);
const std::string topic = "test-batch-index-ack-" + unique_str();
Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(
topic,
ProducerConfiguration().setBatchingMaxMessages(100).setBatchingMaxPublishDelayMs(
3600 * 1000 /* 1h */),
producer));
std::vector<ConsumerWrapper> consumers{3};
for (size_t i = 0; i < consumers.size(); i++) {
consumers[i].initialize(client, topic, "sub-" + std::to_string(i), true /* enable batch index ack */);
}
constexpr int numMessages = 5;
for (int i = 0; i < numMessages; i++) {
producer.sendAsync(MessageBuilder().setContent("msg-" + std::to_string(i)).build(), nullptr);
}
producer.flush();
    for (size_t i = 0; i < consumers.size(); i++) {
        consumers[i].receiveAtMost(numMessages);
        if (i > 0) {
ASSERT_EQ(consumers[0].messageIdList(), consumers[i].messageIdList());
}
}
auto msgIds = consumers[0].messageIdList();
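    // Individually ack batch indexes 0, 2 and 4; after restarting, only indexes 1 and 3 are redelivered.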
consumers[0].acknowledgeAndRestart({0, 2, 4}, AckType::INDIVIDUAL);
consumers[0].receiveAtMost(2);
ASSERT_EQ(subMessageIdList(msgIds, {1, 3}), consumers[0].messageIdList());
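    // Ack batch indexes 0 and 3 via a message ID list; indexes 1, 2 and 4 are redelivered.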
consumers[1].acknowledgeAndRestart({0, 3}, AckType::INDIVIDUAL_LIST);
consumers[1].receiveAtMost(3);
ASSERT_EQ(subMessageIdList(msgIds, {1, 2, 4}), consumers[1].messageIdList());
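    // Cumulatively ack up to batch index 3; only index 4 is redelivered.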
consumers[2].acknowledgeAndRestart({3}, AckType::CUMULATIVE);
consumers[2].receiveAtMost(1);
ASSERT_EQ(subMessageIdList(msgIds, {4}), consumers[2].messageIdList());
}
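
// Verify cumulative acknowledgments on a topic that mixes non-batched messages (MessageIdImpl)
// and batched messages (BatchedMessageIdImpl).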
TEST_F(AcknowledgeTest, testMixedCumulativeAck) {
Client client(lookupUrl);
const std::string topic = "test-mixed-cumulative-ack-" + unique_str();
auto sendBatch = [&](int numMessages) {
Producer producer;
auto conf =
ProducerConfiguration().setBatchingMaxMessages(100).setBatchingMaxPublishDelayMs(3600 * 1000);
ASSERT_EQ(ResultOk, client.createProducer(topic, conf, producer));
Latch latch{numMessages};
for (int i = 0; i < numMessages; i++) {
producer.sendAsync(MessageBuilder().setContent("msg-" + std::to_string(i)).build(),
[&latch](Result, const MessageId&) { latch.countdown(); });
}
producer.flush();
ASSERT_TRUE(latch.wait(std::chrono::seconds(3)));
};
auto sendNonBatch = [&](const std::string& msg) {
Producer producer;
auto conf = ProducerConfiguration().setBatchingEnabled(false);
ASSERT_EQ(ResultOk, client.createProducer(topic, conf, producer));
producer.send(MessageBuilder().setContent(msg).build());
producer.close();
};
ConsumerWrapper consumer;
consumer.initialize(client, topic, "sub", true);
sendNonBatch("msg");
sendBatch(5);
consumer.receiveAtMost(6);
// Acknowledge the 1st MessageId, which is a MessageIdImpl
auto msgIds = consumer.messageIdList();
ASSERT_TRUE(msgIds[0].batchIndex() < 0);
consumer.acknowledgeAndRestart({0}, AckType::CUMULATIVE);
consumer.receiveAtMost(5);
ASSERT_EQ(subMessageIdList(msgIds, {1, 2, 3, 4, 5}), consumer.messageIdList());
// Acknowledge the 3rd MessageId, which is a BatchedMessageIdImpl
ASSERT_EQ(consumer.messageIdList()[2].batchIndex(), 2);
consumer.acknowledgeAndRestart({2}, AckType::CUMULATIVE);
consumer.receiveAtMost(2);
ASSERT_EQ(subMessageIdList(msgIds, {4, 5}), consumer.messageIdList());
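    // Cumulatively ack the first remaining message with its batch index discarded; this covers the
    // whole batch, so the subscription has nothing left after restarting.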
consumer.getConsumer().acknowledgeCumulative(discardBatch(consumer.messageIdList()[0]));
consumer.getConsumer().close();
consumer.initialize(client, topic, "sub", true);
Message msg;
ASSERT_EQ(ResultTimeout, consumer.getConsumer().receive(msg, 1000));
}
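
// Acknowledging messages whose IDs were never assigned by a broker (a default-constructed message
// or one built locally) should fail with ResultOperationNotSupported.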
TEST_F(AcknowledgeTest, testInvalidMessageId) {
Client client(lookupUrl);
std::vector<std::string> topics{"test-invalid-message-id0" + unique_str(),
"test-invalid-message-id1" + unique_str()};
Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topics, "sub", consumer));
Message msg;
ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledge(msg));
msg = MessageBuilder().setContent("msg").build();
ASSERT_EQ(ResultOperationNotSupported, consumer.acknowledge(msg));
}
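
// With ACK receipt enabled, each acknowledge callback should only complete once the grouped ACKs
// are flushed (about ackGroupingTimeMs after subscribing), and it should run on an internal client
// thread rather than the calling thread.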
TEST_F(AcknowledgeTest, testAckReceiptEnabled) {
Client client(lookupUrl);
const std::string topic = "test-ack-receipt-enabled" + unique_str();
Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
std::vector<MessageId> msgIds;
for (int i = 0; i < 5; i++) {
MessageId msgId;
ASSERT_EQ(ResultOk,
producer.send(MessageBuilder().setContent("msg-" + std::to_string(i)).build(), msgId));
msgIds.emplace_back(msgId);
}
constexpr long ackGroupingTimeMs = 200;
Consumer consumer;
ConsumerConfiguration conf;
conf.setAckGroupingTimeMs(ackGroupingTimeMs);
conf.setAckReceiptEnabled(true);
conf.setBatchIndexAckEnabled(true);
conf.setSubscriptionInitialPosition(InitialPositionEarliest);
using namespace std::chrono;
    // The ACK grouping timer only starts after the consumer is subscribed successfully. To ensure the
    // acknowledgments complete after `ackGroupingTimeMs`, record the start timestamp before subscribing.
auto now = high_resolution_clock::now();
ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", conf, consumer));
std::atomic<decltype(std::this_thread::get_id())> threadId[3];
std::atomic_long durationMs[3];
std::atomic<Result> result[3];
Latch latch{3};
auto createCallback = [&](int i) -> ResultCallback {
return [i, now, &threadId, &durationMs, &result, &latch](Result result0) {
threadId[i] = std::this_thread::get_id();
durationMs[i] =
duration_cast<std::chrono::milliseconds>(high_resolution_clock::now() - now).count();
result[i] = result0;
latch.countdown();
};
};
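    // Issue an individual ACK, a cumulative ACK and a list ACK; each callback must report ResultOk
    // and fire only after the grouped ACKs have been flushed.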
consumer.acknowledgeAsync(msgIds[1], createCallback(0));
consumer.acknowledgeCumulativeAsync(msgIds[2], createCallback(1));
consumer.acknowledgeAsync(msgIds, createCallback(2));
ASSERT_TRUE(latch.wait(std::chrono::seconds(3)));
for (int i = 0; i < 3; i++) {
LOG_INFO("Ack time: " << durationMs[i] << "ms");
EXPECT_TRUE(durationMs[i] > ackGroupingTimeMs && durationMs[i] < ackGroupingTimeMs + 100);
EXPECT_NE(threadId[i], std::this_thread::get_id());
EXPECT_EQ(result[i], ResultOk);
}
client.close();
}
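
// Run the parameterized tests with ACK grouping times of 100 ms and 0 ms, each with and without
// ACK receipt enabled.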
INSTANTIATE_TEST_SUITE_P(BasicEndToEndTest, AcknowledgeTest,
testing::Combine(testing::Values(100, 0), testing::Values(true, false)),
[](const testing::TestParamInfo<std::tuple<int, bool>>& info) {
return std::to_string(std::get<0>(info.param)) + "_" +
std::to_string(std::get<1>(info.param));
});