blob: e46cb4575c3574cccc90bcb80afd218fcf91da54 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <gtest/gtest.h>
#include <pulsar/Client.h>
#include <pulsar/MessageBatch.h>
#include <pulsar/MessageIdBuilder.h>
#include <atomic>
#include <ctime>
#include <functional>
#include <sstream>
#include <thread>
#include "ConsumerTest.h"
#include "CustomRoutingPolicy.h"
#include "HttpHelper.h"
#include "PulsarFriend.h"
#include "lib/Commands.h"
#include "lib/Future.h"
#include "lib/Latch.h"
#include "lib/LogUtils.h"
#include "lib/ProtoApiEnums.h"
#include "lib/Utils.h"
#include "lib/stats/ProducerStatsImpl.h"
DECLARE_LOG_OBJECT();
using namespace pulsar;
static std::string lookupUrl = "pulsar://localhost:6650";
static std::string adminUrl = "http://localhost:8080/";
// ecpoch time in seconds
const long epochTime = time(NULL);
class MessageCountSendCallback {
public:
MessageCountSendCallback(std::atomic_int& numOfMessagesProduced)
: numOfMessagesProduced_(numOfMessagesProduced) {}
void operator()(Result result, const MessageId&) {
ASSERT_EQ(result, ResultOk);
numOfMessagesProduced_++;
}
private:
std::atomic_int& numOfMessagesProduced_;
};
static void sendFailCallBack(Result r, Result expect_result) { EXPECT_EQ(r, expect_result); }
static int globalPublishCountSuccess = 0;
static int globalPublishCountQueueFull = 0;
static void sendCallBackExpectingErrors(Result r, const MessageId& msgId) {
if (r == ResultProducerQueueIsFull) {
globalPublishCountQueueFull++;
} else if (r == ResultOk) {
globalPublishCountSuccess++;
}
}
TEST(BatchMessageTest, testProducerConfig) {
ProducerConfiguration conf;
try {
conf.setBatchingMaxMessages(1);
FAIL();
} catch (const std::exception&) {
// Ok
}
ASSERT_EQ(ProducerConfiguration::DefaultBatching, conf.getBatchingType());
conf.setBatchingType(ProducerConfiguration::KeyBasedBatching);
ASSERT_EQ(ProducerConfiguration::KeyBasedBatching, conf.getBatchingType());
}
TEST(BatchMessageTest, testProducerTimeout) {
std::string testName = std::to_string(epochTime) + "testProducerTimeout";
ClientConfiguration clientConf;
clientConf.setStatsIntervalInSeconds(1);
Client client(lookupUrl, clientConf);
std::string topicName = "persistent://public/default/" + testName;
std::string subName = "subscription-name";
Producer producer;
// Enable batching on producer side
int batchSize = 3;
int numOfMessages = 4;
int timeout = 4000;
ProducerConfiguration conf;
conf.setCompressionType(CompressionLZ4);
conf.setBatchingMaxMessages(batchSize);
conf.setBatchingMaxPublishDelayMs(timeout);
conf.setBatchingEnabled(true);
Promise<Result, Producer> producerPromise;
client.createProducerAsync(topicName, conf, WaitForCallbackValue<Producer>(producerPromise));
Future<Result, Producer> producerFuture = producerPromise.getFuture();
Result result = producerFuture.get(producer);
ASSERT_EQ(ResultOk, result);
Consumer consumer;
Promise<Result, Consumer> consumerPromise;
client.subscribeAsync(topicName, subName, WaitForCallbackValue<Consumer>(consumerPromise));
Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
result = consumerFuture.get(consumer);
ASSERT_EQ(ResultOk, result);
// handling dangling subscriptions
consumer.close();
client.subscribe(topicName, subName, consumer);
std::string temp = producer.getTopic();
ASSERT_EQ(temp, topicName);
temp = consumer.getTopic();
ASSERT_EQ(temp, topicName);
ASSERT_EQ(consumer.getSubscriptionName(), subName);
ProducerStatsImplPtr producerStatsImplPtr = PulsarFriend::getProducerStatsPtr(producer);
// Send Asynchronously
std::string prefix = "msg-batch-test-produce-timeout-";
for (int i = 0; i < numOfMessages; i++) {
std::string messageContent = prefix + std::to_string(i);
Message msg = MessageBuilder()
.setContent(messageContent)
.setProperty("type", "batch")
.setProperty("msgIndex", std::to_string(i))
.build();
LOG_DEBUG("sending message " << messageContent);
clock_t start, end;
/* Start the timer */
start = time(NULL);
LOG_DEBUG("start = " << start);
Promise<Result, MessageId> promise;
producer.sendAsync(msg, WaitForCallbackValue<MessageId>(promise));
MessageId mi;
promise.getFuture().get(mi);
/* End the timer */
end = time(NULL);
LOG_DEBUG("end = " << end);
// Greater than or equal to since there may be delay in sending messaging
ASSERT_GE((double)(end - start), timeout / 1000.0);
ASSERT_EQ(producerStatsImplPtr->getTotalMsgsSent(), i + 1);
ASSERT_EQ(PulsarFriend::sum(producerStatsImplPtr->getTotalSendMap()), i + 1);
}
Message receivedMsg;
int i = 0;
while (consumer.receive(receivedMsg, 5000) == ResultOk) {
std::string expectedMessageContent = prefix + std::to_string(i);
LOG_DEBUG("Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = "
<< receivedMsg.getMessageId() << "]");
ASSERT_EQ(receivedMsg.getProperty("msgIndex"), std::to_string(i++));
ASSERT_EQ(receivedMsg.getProperty("type"), "batch");
ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
}
// Number of messages consumed
ASSERT_EQ(i, numOfMessages);
}
TEST(BatchMessageTest, testBatchSizeInBytes) {
std::string testName = std::to_string(epochTime) + "testBatchSizeInBytes";
Client client(lookupUrl);
std::string topicName = "persistent://public/default/" + testName;
std::string subName = "subscription-name";
Producer producer;
// Enable batching on producer side
int batchSize = 1000;
int numOfMessages = 30;
ProducerConfiguration conf;
conf.setCompressionType(CompressionLZ4);
conf.setBatchingMaxMessages(batchSize);
conf.setBatchingMaxAllowedSizeInBytes(20);
conf.setBatchingEnabled(true);
Promise<Result, Producer> producerPromise;
client.createProducerAsync(topicName, conf, WaitForCallbackValue<Producer>(producerPromise));
Future<Result, Producer> producerFuture = producerPromise.getFuture();
Result result = producerFuture.get(producer);
ASSERT_EQ(ResultOk, result);
Consumer consumer;
Promise<Result, Consumer> consumerPromise;
client.subscribeAsync(topicName, subName, WaitForCallbackValue<Consumer>(consumerPromise));
Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
result = consumerFuture.get(consumer);
ASSERT_EQ(ResultOk, result);
// handling dangling subscriptions
consumer.close();
client.subscribe(topicName, subName, consumer);
std::string temp = producer.getTopic();
ASSERT_EQ(temp, topicName);
temp = consumer.getTopic();
ASSERT_EQ(temp, topicName);
ASSERT_EQ(consumer.getSubscriptionName(), subName);
ProducerStatsImplPtr producerStatsImplPtr = PulsarFriend::getProducerStatsPtr(producer);
// Send Asynchronously
std::atomic_int numOfMessagesProduced{0};
std::string prefix = "12345678";
for (int i = 0; i < numOfMessages; i++) {
std::string messageContent = prefix + std::to_string(i);
Message msg =
MessageBuilder().setContent(messageContent).setProperty("msgIndex", std::to_string(i)).build();
producer.sendAsync(msg, MessageCountSendCallback(numOfMessagesProduced));
ASSERT_EQ(producerStatsImplPtr->getNumMsgsSent(), i + 1);
ASSERT_LT(PulsarFriend::sum(producerStatsImplPtr->getSendMap()), i + 1);
ASSERT_EQ(producerStatsImplPtr->getTotalMsgsSent(), i + 1);
ASSERT_LT(PulsarFriend::sum(producerStatsImplPtr->getTotalSendMap()), i + 1);
LOG_DEBUG("sending message " << messageContent);
}
Message receivedMsg;
int i = 0;
while (consumer.receive(receivedMsg, 5000) == ResultOk) {
std::string expectedMessageContent = prefix + std::to_string(i);
LOG_DEBUG("Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = "
<< receivedMsg.getMessageId() << "]");
ASSERT_LT(receivedMsg.getMessageId().batchIndex(), 2);
ASSERT_EQ(receivedMsg.getMessageId().batchSize(), 2);
ASSERT_EQ(receivedMsg.getProperty("msgIndex"), std::to_string(i++));
ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
}
// Check stats
ASSERT_EQ(PulsarFriend::sum(producerStatsImplPtr->getSendMap()), numOfMessages);
ASSERT_EQ(PulsarFriend::sum(producerStatsImplPtr->getTotalSendMap()), numOfMessages);
// Number of messages produced
ASSERT_EQ(numOfMessagesProduced.load(), numOfMessages);
// Number of messages consumed
ASSERT_EQ(i, numOfMessages);
}
TEST(BatchMessageTest, testSmallReceiverQueueSize) {
std::string testName = std::to_string(epochTime) + "testSmallReceiverQueueSize";
ClientConfiguration clientConf;
clientConf.setStatsIntervalInSeconds(20);
Client client(lookupUrl, clientConf);
std::string topicName = "persistent://public/default/" + testName;
std::string subName = "subscription-name";
Producer producer;
// Enable batching on producer side
int batchSize = 10;
int numOfMessages = 1000;
ProducerConfiguration conf;
conf.setCompressionType(CompressionLZ4);
conf.setBatchingMaxMessages(batchSize);
conf.setBatchingMaxPublishDelayMs(1);
conf.setBatchingEnabled(true);
conf.setMaxPendingMessages(numOfMessages + 1);
Promise<Result, Producer> producerPromise;
client.createProducerAsync(topicName, conf, WaitForCallbackValue<Producer>(producerPromise));
Future<Result, Producer> producerFuture = producerPromise.getFuture();
Result result = producerFuture.get(producer);
ASSERT_EQ(ResultOk, result);
Consumer consumer;
ConsumerConfiguration consumerConfig;
consumerConfig.setReceiverQueueSize(41);
Promise<Result, Consumer> consumerPromise;
client.subscribeAsync(topicName, subName, consumerConfig,
WaitForCallbackValue<Consumer>(consumerPromise));
Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
result = consumerFuture.get(consumer);
ASSERT_EQ(ResultOk, result);
// handling dangling subscriptions
consumer.close();
client.subscribe(topicName, subName, consumerConfig, consumer);
std::string temp = producer.getTopic();
ASSERT_EQ(temp, topicName);
temp = consumer.getTopic();
ASSERT_EQ(temp, topicName);
ASSERT_EQ(consumer.getSubscriptionName(), subName);
ProducerStatsImplPtr producerStatsImplPtr = PulsarFriend::getProducerStatsPtr(producer);
// Send Asynchronously
std::atomic_int numOfMessagesProduced{0};
std::string prefix = testName;
for (int i = 0; i < numOfMessages; i++) {
std::string messageContent = prefix + std::to_string(i);
Message msg =
MessageBuilder().setContent(messageContent).setProperty("msgIndex", std::to_string(i)).build();
producer.sendAsync(msg, MessageCountSendCallback(numOfMessagesProduced));
ASSERT_EQ(producerStatsImplPtr->getTotalMsgsSent(), i + 1);
ASSERT_LE(PulsarFriend::sum(producerStatsImplPtr->getTotalSendMap()), i + 1);
LOG_DEBUG("sending message " << messageContent);
}
Message receivedMsg;
int i = 0;
for (i = 0; i < numOfMessages; i++) {
consumer.receive(receivedMsg);
std::string expectedMessageContent = prefix + std::to_string(i);
LOG_DEBUG("Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = "
<< receivedMsg.getMessageId() << "]");
ASSERT_EQ(receivedMsg.getProperty("msgIndex"), std::to_string(i));
ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
}
ConsumerStatsImplPtr consumerStatsImplPtr = PulsarFriend::getConsumerStatsPtr(consumer);
unsigned long t =
consumerStatsImplPtr->getAckedMsgMap().at(std::make_pair(ResultOk, CommandAck_AckType_Individual));
ASSERT_EQ(t, numOfMessages);
ASSERT_EQ(PulsarFriend::sum(consumerStatsImplPtr->getAckedMsgMap()), numOfMessages);
ASSERT_EQ(PulsarFriend::sum(consumerStatsImplPtr->getTotalAckedMsgMap()), numOfMessages);
ASSERT_EQ(PulsarFriend::sum(consumerStatsImplPtr->getReceivedMsgMap()), numOfMessages);
ASSERT_EQ(PulsarFriend::sum(consumerStatsImplPtr->getTotalReceivedMsgMap()), numOfMessages);
ASSERT_EQ(consumerStatsImplPtr->getTotalNumBytesRecieved(), consumerStatsImplPtr->getNumBytesRecieved());
std::this_thread::sleep_for(std::chrono::seconds(20));
ASSERT_NE(consumerStatsImplPtr->getTotalNumBytesRecieved(), consumerStatsImplPtr->getNumBytesRecieved());
ASSERT_EQ(PulsarFriend::sum(consumerStatsImplPtr->getTotalAckedMsgMap()), numOfMessages);
ASSERT_EQ(PulsarFriend::sum(consumerStatsImplPtr->getTotalReceivedMsgMap()), numOfMessages);
ASSERT_EQ(PulsarFriend::sum(consumerStatsImplPtr->getAckedMsgMap()), 0);
ASSERT_EQ(PulsarFriend::sum(consumerStatsImplPtr->getReceivedMsgMap()), 0);
// Number of messages produced
ASSERT_EQ(numOfMessagesProduced.load(), numOfMessages);
// Number of messages consumed
ASSERT_EQ(i, numOfMessages);
}
TEST(BatchMessageTest, testIndividualAck) {
std::string testName = std::to_string(epochTime) + "testIndividualAck";
ClientConfiguration clientConfig;
clientConfig.setStatsIntervalInSeconds(1);
Client client(lookupUrl, clientConfig);
std::string topicName = "persistent://public/default/" + testName;
std::string subName = "subscription-name";
Producer producer;
// Enable batching on producer side
int batchSize = 5;
int numOfMessages = 10;
ProducerConfiguration conf;
conf.setBatchingMaxMessages(batchSize);
conf.setBatchingEnabled(true);
Promise<Result, Producer> producerPromise;
client.createProducerAsync(topicName, conf, WaitForCallbackValue<Producer>(producerPromise));
Future<Result, Producer> producerFuture = producerPromise.getFuture();
Result result = producerFuture.get(producer);
ASSERT_EQ(ResultOk, result);
Consumer consumer;
ConsumerConfiguration consumerConfig;
consumerConfig.setReceiverQueueSize(1);
Promise<Result, Consumer> consumerPromise;
client.subscribeAsync(topicName, subName, consumerConfig,
WaitForCallbackValue<Consumer>(consumerPromise));
Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
result = consumerFuture.get(consumer);
ASSERT_EQ(ResultOk, result);
// handling dangling subscriptions
consumer.close();
client.subscribe(topicName, subName, consumerConfig, consumer);
std::string temp = producer.getTopic();
ASSERT_EQ(temp, topicName);
temp = consumer.getTopic();
ASSERT_EQ(temp, topicName);
ASSERT_EQ(consumer.getSubscriptionName(), subName);
// Send Asynchronously
std::atomic_int numOfMessagesProduced{0};
std::string prefix = testName;
for (int i = 0; i < numOfMessages; i++) {
std::string messageContent = prefix + std::to_string(i);
Message msg =
MessageBuilder().setContent(messageContent).setProperty("msgIndex", std::to_string(i)).build();
producer.sendAsync(msg, MessageCountSendCallback(numOfMessagesProduced));
LOG_DEBUG("sending message " << messageContent);
}
Message receivedMsg;
int i = 0;
while (consumer.receive(receivedMsg, 5000) == ResultOk) {
std::string expectedMessageContent = prefix + std::to_string(i);
LOG_DEBUG("Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = "
<< receivedMsg.getMessageId() << "]");
ASSERT_EQ(receivedMsg.getProperty("msgIndex"), std::to_string(i++));
ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
// Ack every 2nd message
if (i % 2 == 0) {
ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
}
}
// Number of messages produced
ASSERT_EQ(numOfMessagesProduced.load(), numOfMessages);
// Number of messages consumed
ASSERT_EQ(i, numOfMessages);
// Unsubscribe and resubscribe
// Expecting all messages to be sent again
consumer.close();
client.subscribe(topicName, subName, consumerConfig, consumer);
i = 0;
while (consumer.receive(receivedMsg, 5000) == ResultOk) {
std::string expectedMessageContent = prefix + std::to_string(i);
LOG_DEBUG("Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = "
<< receivedMsg.getMessageId() << "]");
ASSERT_EQ(receivedMsg.getProperty("msgIndex"), std::to_string(i++));
ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
// Ack every first 5 and 10th message
if (i <= 5 || i == 10) {
ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
}
}
// Number of messages consumed
ASSERT_EQ(i, numOfMessages);
// Unsubscribe and resubscribe
// Expecting only one batch message to be resent
consumer.close();
client.subscribe(topicName, subName, consumerConfig, consumer);
i = 0;
while (consumer.receive(receivedMsg, 5000) == ResultOk) {
std::string expectedMessageContent = prefix + std::to_string(i + numOfMessages / 2);
LOG_DEBUG("Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = "
<< receivedMsg.getMessageId() << "]");
ASSERT_EQ(receivedMsg.getProperty("msgIndex"), std::to_string(i++ + numOfMessages / 2));
ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
// Ack first 4 message only
if (i <= 4) {
ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
}
}
// Number of messages consumed
ASSERT_EQ(i, numOfMessages / 2);
// Unsubscribe and resubscribe
// Expecting only one batch message to be resent
consumer.close();
client.subscribe(topicName, subName, consumerConfig, consumer);
i = 0;
while (consumer.receive(receivedMsg, 5000) == ResultOk) {
std::string expectedMessageContent = prefix + std::to_string(i + numOfMessages / 2);
LOG_DEBUG("Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = "
<< receivedMsg.getMessageId() << "]");
ASSERT_EQ(receivedMsg.getProperty("msgIndex"), std::to_string(i++ + numOfMessages / 2));
ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
// Ack all
ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
}
// Number of messages consumed
ASSERT_EQ(i, numOfMessages / 2);
// Unsubscribe and resubscribe
// Expecting no batch message to be resent
consumer.close();
client.subscribe(topicName, subName, consumerConfig, consumer);
// Number of messages consumed
ASSERT_NE(ResultOk, consumer.receive(receivedMsg, 5000));
consumer.close();
client.close();
}
TEST(BatchMessageTest, testCumulativeAck) {
std::string testName = std::to_string(epochTime) + "testCumulativeAck";
ClientConfiguration clientConfig;
clientConfig.setStatsIntervalInSeconds(100);
Client client(lookupUrl, clientConfig);
std::string topicName = "persistent://public/default/" + testName;
std::string subName = "subscription-name";
Producer producer;
// Enable batching on producer side
int batchSize = 5;
int numOfMessages = 15;
ProducerConfiguration conf;
conf.setBatchingMaxMessages(batchSize);
conf.setBatchingEnabled(true);
Promise<Result, Producer> producerPromise;
client.createProducerAsync(topicName, conf, WaitForCallbackValue<Producer>(producerPromise));
Future<Result, Producer> producerFuture = producerPromise.getFuture();
Result result = producerFuture.get(producer);
ASSERT_EQ(ResultOk, result);
Consumer consumer;
ConsumerConfiguration consumerConfig;
consumerConfig.setReceiverQueueSize(1);
Promise<Result, Consumer> consumerPromise;
client.subscribeAsync(topicName, subName, consumerConfig,
WaitForCallbackValue<Consumer>(consumerPromise));
Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
result = consumerFuture.get(consumer);
ASSERT_EQ(ResultOk, result);
// handling dangling subscriptions
consumer.close();
client.subscribe(topicName, subName, consumerConfig, consumer);
std::string temp = producer.getTopic();
ASSERT_EQ(temp, topicName);
temp = consumer.getTopic();
ASSERT_EQ(temp, topicName);
ASSERT_EQ(consumer.getSubscriptionName(), subName);
ProducerStatsImplPtr producerStatsImplPtr = PulsarFriend::getProducerStatsPtr(producer);
// Send Asynchronously
std::atomic_int numOfMessagesProduced{0};
std::string prefix = testName;
for (int i = 0; i < numOfMessages; i++) {
std::string messageContent = prefix + std::to_string(i);
Message msg =
MessageBuilder().setContent(messageContent).setProperty("msgIndex", std::to_string(i)).build();
producer.sendAsync(msg, MessageCountSendCallback(numOfMessagesProduced));
LOG_DEBUG("sending message " << messageContent);
}
Message receivedMsg;
int i = 0;
ConsumerStatsImplPtr consumerStatsImplPtr = PulsarFriend::getConsumerStatsPtr(consumer);
while (consumer.receive(receivedMsg, 5000) == ResultOk) {
std::string expectedMessageContent = prefix + std::to_string(i);
LOG_DEBUG("Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = "
<< receivedMsg.getMessageId() << "]");
ASSERT_EQ(receivedMsg.getProperty("msgIndex"), std::to_string(i++));
ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
// Cumm. Ack 7th message
if (i == 7) {
ASSERT_EQ(ResultOk, consumer.acknowledgeCumulative(receivedMsg));
}
}
ASSERT_EQ(PulsarFriend::sum(consumerStatsImplPtr->getReceivedMsgMap()), i + 1);
// Since last receive call times out
ASSERT_EQ(consumerStatsImplPtr->getReceivedMsgMap().at(ResultOk), i);
ASSERT_EQ(consumerStatsImplPtr->getReceivedMsgMap().at(ResultTimeout), 1);
ASSERT_EQ(PulsarFriend::sum(consumerStatsImplPtr->getAckedMsgMap()), 1);
ASSERT_EQ(producerStatsImplPtr->getNumBytesSent(), consumerStatsImplPtr->getNumBytesRecieved());
unsigned long t =
consumerStatsImplPtr->getAckedMsgMap().at(std::make_pair(ResultOk, CommandAck_AckType_Cumulative));
ASSERT_EQ(t, 1);
// Number of messages produced
ASSERT_EQ(numOfMessagesProduced.load(), numOfMessages);
// Number of messages consumed
ASSERT_EQ(i, numOfMessages);
// Unsubscribe and resubscribe
// Expecting 10 messages to be sent again
consumer.close();
client.subscribe(topicName, subName, consumerConfig, consumer);
consumerStatsImplPtr = PulsarFriend::getConsumerStatsPtr(consumer);
i = 0;
while (consumer.receive(receivedMsg, 5000) == ResultOk) {
std::string expectedMessageContent = prefix + std::to_string(i + 5);
LOG_DEBUG("Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = "
<< receivedMsg.getMessageId() << "]");
ASSERT_EQ(receivedMsg.getProperty("msgIndex"), std::to_string(i++ + 5));
ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
// Ack 10th message
if (i == 10) {
ASSERT_EQ(ResultOk, consumer.acknowledgeCumulative(receivedMsg));
}
}
ASSERT_EQ(PulsarFriend::sum(consumerStatsImplPtr->getAckedMsgMap()), 1);
t = consumerStatsImplPtr->getAckedMsgMap().at(std::make_pair(ResultOk, CommandAck_AckType_Cumulative));
ASSERT_EQ(t, 1);
// Number of messages consumed
ASSERT_EQ(i, 10);
// Unsubscribe and resubscribe
// Expecting no batch message to be resent
consumer.close();
client.subscribe(topicName, subName, consumerConfig, consumer);
// Number of messages consumed
ASSERT_NE(ResultOk, consumer.receive(receivedMsg, 5000));
}
TEST(BatchMessageTest, testMixedAck) {
std::string testName = std::to_string(epochTime) + "testMixedAck";
Client client(lookupUrl);
std::string topicName = "persistent://public/default/" + testName;
std::string subName = "subscription-name";
Producer producer;
// Enable batching on producer side
int batchSize = 5;
int numOfMessages = 15;
ProducerConfiguration conf;
conf.setBatchingMaxMessages(batchSize);
conf.setBatchingEnabled(true);
Promise<Result, Producer> producerPromise;
client.createProducerAsync(topicName, conf, WaitForCallbackValue<Producer>(producerPromise));
Future<Result, Producer> producerFuture = producerPromise.getFuture();
Result result = producerFuture.get(producer);
ASSERT_EQ(ResultOk, result);
Consumer consumer;
ConsumerConfiguration consumerConfig;
Promise<Result, Consumer> consumerPromise;
client.subscribeAsync(topicName, subName, consumerConfig,
WaitForCallbackValue<Consumer>(consumerPromise));
Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
result = consumerFuture.get(consumer);
ASSERT_EQ(ResultOk, result);
// handling dangling subscriptions
consumer.close();
client.subscribe(topicName, subName, consumerConfig, consumer);
std::string temp = producer.getTopic();
ASSERT_EQ(temp, topicName);
temp = consumer.getTopic();
ASSERT_EQ(temp, topicName);
ASSERT_EQ(consumer.getSubscriptionName(), subName);
// Send Asynchronously
std::atomic_int numOfMessagesProduced{0};
std::string prefix = testName;
for (int i = 0; i < numOfMessages; i++) {
std::string messageContent = prefix + std::to_string(i);
Message msg =
MessageBuilder().setContent(messageContent).setProperty("msgIndex", std::to_string(i)).build();
producer.sendAsync(msg, MessageCountSendCallback(numOfMessagesProduced));
LOG_DEBUG("sending message " << messageContent);
}
Message receivedMsg;
int i = 0;
while (consumer.receive(receivedMsg, 5000) == ResultOk) {
std::string expectedMessageContent = prefix + std::to_string(i);
LOG_DEBUG("Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = "
<< receivedMsg.getMessageId() << "]");
ASSERT_EQ(receivedMsg.getProperty("msgIndex"), std::to_string(i++));
ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
// Cumm. Ack 14th message
if (i == 14) {
ASSERT_EQ(ResultOk, consumer.acknowledgeCumulative(receivedMsg));
}
}
// Number of messages produced
ASSERT_EQ(numOfMessagesProduced.load(), numOfMessages);
// Number of messages consumed
ASSERT_EQ(i, numOfMessages);
// Unsubscribe and resubscribe
// Expecting 5 messages to be sent again
consumer.close();
client.subscribe(topicName, subName, consumerConfig, consumer);
i = 0;
while (consumer.receive(receivedMsg, 5000) == ResultOk) {
std::string expectedMessageContent = prefix + std::to_string(i + 10);
LOG_DEBUG("Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = "
<< receivedMsg.getMessageId() << "]");
ASSERT_EQ(receivedMsg.getProperty("msgIndex"), std::to_string(i++ + 10));
ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
// Cumm Ack 9th message
if (i == 4) {
ASSERT_EQ(ResultOk, consumer.acknowledgeCumulative(receivedMsg));
}
}
ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
// Number of messages consumed
ASSERT_EQ(i, 5);
// Unsubscribe and resubscribe
// Expecting no batch message to be resent
consumer.close();
client.subscribe(topicName, subName, consumerConfig, consumer);
// Number of messages consumed
ASSERT_NE(ResultOk, consumer.receive(receivedMsg, 5000));
}
// Also testing Cumulative Ack test case where greatestCumulativeAck returns
// MessageId()
TEST(BatchMessageTest, testPermits) {
std::string testName = std::to_string(epochTime) + "testPermits";
Client client(lookupUrl);
std::string topicName = "persistent://public/default/" + testName;
std::string subName = "subscription-name";
Producer producer;
// Enable batching on producer side
int batchSize = 10;
int numOfMessages = 75;
ProducerConfiguration conf;
conf.setBatchingMaxMessages(batchSize);
conf.setBatchingMaxPublishDelayMs(5);
conf.setBatchingEnabled(true);
Promise<Result, Producer> producerPromise;
client.createProducerAsync(topicName, conf, WaitForCallbackValue<Producer>(producerPromise));
Future<Result, Producer> producerFuture = producerPromise.getFuture();
Result result = producerFuture.get(producer);
ASSERT_EQ(ResultOk, result);
Consumer consumer;
ConsumerConfiguration consumerConfig;
consumerConfig.setReceiverQueueSize(5);
Promise<Result, Consumer> consumerPromise;
client.subscribeAsync(topicName, subName, consumerConfig,
WaitForCallbackValue<Consumer>(consumerPromise));
Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
result = consumerFuture.get(consumer);
ASSERT_EQ(ResultOk, result);
// handling dangling subscriptions
consumer.close();
client.subscribe(topicName, subName, consumerConfig, consumer);
std::string temp = producer.getTopic();
ASSERT_EQ(temp, topicName);
temp = consumer.getTopic();
ASSERT_EQ(temp, topicName);
ASSERT_EQ(consumer.getSubscriptionName(), subName);
// Send Asynchronously
std::atomic_int numOfMessagesProduced{0};
std::string prefix = testName;
for (int i = 0; i < numOfMessages; i++) {
std::string messageContent = prefix + std::to_string(i);
Message msg =
MessageBuilder().setContent(messageContent).setProperty("msgIndex", std::to_string(i)).build();
producer.sendAsync(msg, MessageCountSendCallback(numOfMessagesProduced));
LOG_DEBUG("sending message " << messageContent);
}
std::this_thread::sleep_for(std::chrono::seconds(5));
Message receivedMsg;
int i = 0;
while (consumer.receive(receivedMsg, 5000) == ResultOk) {
std::string expectedMessageContent = prefix + std::to_string(i);
LOG_DEBUG("Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = "
<< receivedMsg.getMessageId() << "]");
ASSERT_EQ(receivedMsg.getProperty("msgIndex"), std::to_string(i++));
ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
ASSERT_EQ(ResultOk, consumer.acknowledgeCumulative(receivedMsg));
ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
}
// Number of messages produced
ASSERT_EQ(numOfMessagesProduced.load(), numOfMessages);
// Number of messages consumed
ASSERT_EQ(i, numOfMessages);
// Since all messages are acked
// Creating 25 new non batched message
conf.setBatchingEnabled(false);
client.createProducer(topicName, conf, producer);
numOfMessagesProduced = 0;
// Send Asynchronously
for (int i = 0; i < numOfMessages; i++) {
std::string messageContent = prefix + std::to_string(i);
Message msg =
MessageBuilder().setContent(messageContent).setProperty("msgIndex", std::to_string(i)).build();
producer.sendAsync(msg, MessageCountSendCallback(numOfMessagesProduced));
LOG_DEBUG("sending message " << messageContent);
}
std::this_thread::sleep_for(std::chrono::seconds(5));
ASSERT_LE(ConsumerTest::getNumOfMessagesInQueue(consumer), consumerConfig.getReceiverQueueSize());
ASSERT_GE(ConsumerTest::getNumOfMessagesInQueue(consumer), consumerConfig.getReceiverQueueSize() / 2);
i = 0;
while (consumer.receive(receivedMsg, 5000) == ResultOk) {
std::string expectedMessageContent = prefix + std::to_string(i);
LOG_DEBUG("Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = "
<< receivedMsg.getMessageId() << "]");
ASSERT_EQ(receivedMsg.getProperty("msgIndex"), std::to_string(i++));
ASSERT_EQ(expectedMessageContent, receivedMsg.getDataAsString());
ASSERT_EQ(ResultOk, consumer.acknowledgeCumulative(receivedMsg));
}
// Number of messages produced
ASSERT_EQ(numOfMessagesProduced.load(), numOfMessages);
// Number of messages consumed
ASSERT_EQ(i, numOfMessages);
}
TEST(BatchMessageTest, testPartitionedTopics) {
Client client(lookupUrl);
std::string topicName =
"persistent://public/default/test-partitioned-batch-messages-" + std::to_string(epochTime);
// call admin api to make it partitioned
std::string url = adminUrl + "admin/v2/persistent/public/default/test-partitioned-batch-messages-" +
std::to_string(epochTime) + "/partitions";
int res = makePutRequest(url, "7");
LOG_DEBUG("res = " << res);
ASSERT_FALSE(res != 204 && res != 409);
std::this_thread::sleep_for(std::chrono::seconds(2));
Producer producer;
// Enable batching on producer side
int batchSize = 100;
int numOfMessages = 10000;
ProducerConfiguration conf;
conf.setCompressionType(CompressionZLib);
conf.setBatchingMaxMessages(batchSize);
conf.setBatchingEnabled(true);
conf.setBatchingMaxPublishDelayMs(5);
conf.setBlockIfQueueFull(false);
conf.setMaxPendingMessages(10);
Promise<Result, Producer> producerPromise;
client.createProducerAsync(topicName, conf, WaitForCallbackValue<Producer>(producerPromise));
Future<Result, Producer> producerFuture = producerPromise.getFuture();
Result result = producerFuture.get(producer);
ASSERT_EQ(ResultOk, result);
std::string subName = "subscription-name";
Consumer consumer;
Promise<Result, Consumer> consumerPromise;
client.subscribeAsync(topicName, subName, WaitForCallbackValue<Consumer>(consumerPromise));
Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
result = consumerFuture.get(consumer);
ASSERT_EQ(ResultOk, result);
std::string temp = producer.getTopic();
ASSERT_EQ(temp, topicName);
temp = consumer.getTopic();
ASSERT_EQ(temp, topicName);
ASSERT_EQ(consumer.getSubscriptionName(), subName);
globalPublishCountSuccess = 0;
globalPublishCountQueueFull = 0;
// Send Asynchronously
std::string prefix = "msg-batch-";
for (int i = 0; i < numOfMessages; i++) {
std::string messageContent = prefix + std::to_string(i);
Message msg =
MessageBuilder().setContent(messageContent).setProperty("msgIndex", std::to_string(i)).build();
producer.sendAsync(msg, &sendCallBackExpectingErrors);
LOG_DEBUG("sending message " << messageContent);
}
Message receivedMsg;
int i = 0;
while (consumer.receive(receivedMsg, 30000) == ResultOk) {
LOG_DEBUG("Received Message with [ content - " << receivedMsg.getDataAsString() << "] [ messageID = "
<< receivedMsg.getMessageId() << "]");
ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
i++;
}
LOG_DEBUG("globalPublishCountQueueFull = " << globalPublishCountQueueFull);
LOG_DEBUG("globalPublishCountSuccess = " << globalPublishCountSuccess);
LOG_DEBUG("numOfMessages = " << numOfMessages);
// Number of messages produced
ASSERT_EQ(globalPublishCountSuccess + globalPublishCountQueueFull, numOfMessages);
// Number of messages consumed
ASSERT_EQ(i, numOfMessages - globalPublishCountQueueFull);
}
TEST(BatchMessageTest, producerFailureResult) {
std::string testName = std::to_string(epochTime) + "testCumulativeAck";
ClientConfiguration clientConfig;
clientConfig.setStatsIntervalInSeconds(100);
Client client(lookupUrl, clientConfig);
std::string topicName = "persistent://public/default/" + testName;
std::string subName = "subscription-name";
Producer producer;
int batchSize = 100;
ProducerConfiguration conf;
conf.setCompressionType(CompressionZLib);
conf.setBatchingMaxMessages(batchSize);
conf.setBatchingEnabled(true);
conf.setBatchingMaxPublishDelayMs(50000);
conf.setBlockIfQueueFull(false);
conf.setMaxPendingMessages(10);
Result res = Result::ResultBrokerMetadataError;
client.createProducer(topicName, conf, producer);
Message msg = MessageBuilder().setContent("test").build();
producer.sendAsync(msg, std::bind(&sendFailCallBack, std::placeholders::_1, res));
PulsarFriend::producerFailMessages(producer, res);
}
TEST(BatchMessageTest, testPraseMessageBatchEntry) {
struct Case {
std::string content;
std::string propKey;
std::string propValue;
};
std::vector<Case> cases;
cases.push_back(Case{"example1", "prop1", "value1"});
cases.push_back(Case{"example2", "prop2", "value2"});
SharedBuffer payload = SharedBuffer::allocate(128);
for (auto it = cases.begin(); it != cases.end(); ++it) {
MessageBuilder msgBuilder;
const Message& message =
msgBuilder.setContent(it->content).setProperty(it->propKey, it->propValue).build();
Commands::serializeSingleMessageInBatchWithPayload(message, payload, 1024);
}
MessageBatch messageBatch;
auto fakeId = MessageIdBuilder().ledgerId(5000L).entryId(10L).partition(0).build();
messageBatch.withMessageId(fakeId).parseFrom(payload, static_cast<uint32_t>(cases.size()));
const std::vector<Message>& messages = messageBatch.messages();
ASSERT_EQ(messages.size(), cases.size());
for (int i = 0; i < cases.size(); ++i) {
const Message& message = messages[i];
const Case& expected = cases[i];
ASSERT_EQ(message.getMessageId().batchIndex(), i);
ASSERT_EQ(message.getMessageId().ledgerId(), 5000);
ASSERT_EQ(message.getDataAsString(), expected.content);
ASSERT_EQ(message.getProperty(expected.propKey), expected.propValue);
}
}
TEST(BatchMessageTest, testSendCallback) {
const std::string topicName = "persistent://public/default/BasicMessageTest-testSendCallback";
Client client(lookupUrl);
constexpr int numMessagesOfBatch = 3;
ProducerConfiguration producerConfig;
producerConfig.setBatchingEnabled(true);
producerConfig.setBatchingMaxMessages(numMessagesOfBatch);
producerConfig.setBatchingMaxPublishDelayMs(1000); // 1 s, it's long enough for 3 messages batched
producerConfig.setMaxPendingMessages(5);
Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConfig, producer));
Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topicName, "SubscriptionName", consumer));
Latch latch(numMessagesOfBatch);
std::set<MessageId> sentIdSet;
for (int i = 0; i < numMessagesOfBatch; i++) {
const auto msg = MessageBuilder().setContent("a").build();
producer.sendAsync(msg, [&sentIdSet, i, &latch](Result result, const MessageId& id) {
ASSERT_EQ(ResultOk, result);
ASSERT_EQ(i, id.batchIndex());
sentIdSet.emplace(id);
LOG_INFO("id of batch " << i << ": " << id);
latch.countdown();
});
}
std::set<MessageId> receivedIdSet;
for (int i = 0; i < numMessagesOfBatch; i++) {
Message msg;
ASSERT_EQ(ResultOk, consumer.receive(msg));
receivedIdSet.emplace(msg.getMessageId());
consumer.acknowledge(msg);
}
latch.wait();
ASSERT_EQ(sentIdSet, receivedIdSet);
consumer.close();
producer.close();
client.close();
}
TEST(BatchMessageTest, testProducerQueueWithBatches) {
std::string testName = std::to_string(epochTime) + "testProducerQueueWithBatches";
ClientConfiguration clientConf;
clientConf.setStatsIntervalInSeconds(0);
Client client(lookupUrl, clientConf);
std::string topicName = "persistent://public/default/" + testName;
// Enable batching on producer side
ProducerConfiguration conf;
conf.setBlockIfQueueFull(false);
conf.setMaxPendingMessages(10);
conf.setBatchingMaxMessages(10000);
conf.setBatchingMaxPublishDelayMs(1000);
conf.setBatchingEnabled(true);
Producer producer;
Result result = client.createProducer(topicName, conf, producer);
ASSERT_EQ(ResultOk, result);
std::string prefix = "msg-batch-test-produce-timeout-";
int rejectedMessges = 0;
for (int i = 0; i < 20; i++) {
std::string messageContent = prefix + std::to_string(i);
Message msg = MessageBuilder().setContent("hello").build();
producer.sendAsync(msg, [&rejectedMessges](Result result, const MessageId& id) {
if (result == ResultProducerQueueIsFull) {
++rejectedMessges;
}
});
}
ASSERT_EQ(rejectedMessges, 10);
}
TEST(BatchMessageTest, testSingleMessageMetadata) {
const auto topic = "BatchMessageTest-SingleMessageMetadata-" + std::to_string(time(nullptr));
constexpr int numMessages = 3;
Client client(lookupUrl);
Consumer consumer;
ASSERT_EQ(ResultOk, client.subscribe(topic, "sub", consumer));
Producer producer;
ASSERT_EQ(ResultOk, client.createProducer(
topic, ProducerConfiguration().setBatchingMaxMessages(numMessages), producer));
producer.sendAsync(MessageBuilder()
.setContent("msg-0")
.setPartitionKey("key-0")
.setOrderingKey("ordering-key-0")
.setEventTimestamp(10UL)
.setProperty("k0", "v0")
.setProperty("k1", "v1")
.build(),
nullptr);
producer.sendAsync(MessageBuilder()
.setContent("msg-1")
.setOrderingKey("ordering-key-1")
.setEventTimestamp(11UL)
.setProperty("k2", "v2")
.build(),
nullptr);
producer.sendAsync(MessageBuilder().setContent("msg-2").build(), nullptr);
ASSERT_EQ(ResultOk, producer.flush());
Message msgs[numMessages];
for (int i = 0; i < numMessages; i++) {
Message msg;
ASSERT_EQ(ResultOk, consumer.receive(msg, 3000));
msgs[i] = msg;
LOG_INFO("message " << i << ": " << msg.getDataAsString()
<< ", key: " << (msg.hasPartitionKey() ? msg.getPartitionKey() : "(null)")
<< ", ordering key: " << (msg.hasOrderingKey() ? msg.getOrderingKey() : "(null)")
<< ", event time: " << (msg.getEventTimestamp())
<< ", properties count: " << msg.getProperties().size()
<< ", has schema version: " << msg.hasSchemaVersion());
}
ASSERT_EQ(msgs[0].getDataAsString(), "msg-0");
ASSERT_TRUE(msgs[0].hasPartitionKey());
ASSERT_EQ(msgs[0].getPartitionKey(), "key-0");
ASSERT_TRUE(msgs[0].hasOrderingKey());
ASSERT_EQ(msgs[0].getOrderingKey(), "ordering-key-0");
ASSERT_EQ(msgs[0].getEventTimestamp(), 10UL);
ASSERT_EQ(msgs[0].getProperties().size(), 2);
ASSERT_TRUE(msgs[0].hasProperty("k0"));
ASSERT_EQ(msgs[0].getProperty("k0"), "v0");
ASSERT_TRUE(msgs[0].hasProperty("k1"));
ASSERT_EQ(msgs[0].getProperty("k1"), "v1");
ASSERT_FALSE(msgs[0].hasSchemaVersion());
ASSERT_EQ(msgs[1].getDataAsString(), "msg-1");
ASSERT_FALSE(msgs[1].hasPartitionKey());
ASSERT_TRUE(msgs[1].hasOrderingKey());
ASSERT_EQ(msgs[1].getOrderingKey(), "ordering-key-1");
ASSERT_EQ(msgs[1].getEventTimestamp(), 11UL);
ASSERT_EQ(msgs[1].getProperties().size(), 1);
ASSERT_TRUE(msgs[1].hasProperty("k2"));
ASSERT_EQ(msgs[1].getProperty("k2"), "v2");
ASSERT_FALSE(msgs[1].hasSchemaVersion());
ASSERT_EQ(msgs[2].getDataAsString(), "msg-2");
ASSERT_FALSE(msgs[2].hasPartitionKey());
ASSERT_FALSE(msgs[2].hasOrderingKey());
ASSERT_EQ(msgs[2].getEventTimestamp(), 0UL);
ASSERT_EQ(msgs[2].getProperties().size(), 0);
ASSERT_FALSE(msgs[2].hasSchemaVersion());
client.close();
}