// blob: afef384d1232de637dfb406cd5f04117c4fc57f4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <gtest/gtest.h>
#include <pulsar/Client.h>
#include <pulsar/Reader.h>
#include <time.h>
#include <string>
#include "HttpHelper.h"
#include "PulsarFriend.h"
#include "WaitUtils.h"
#include "lib/ClientConnection.h"
#include "lib/Latch.h"
#include "lib/LogUtils.h"
#include "lib/ReaderImpl.h"
DECLARE_LOG_OBJECT()
using namespace pulsar;
// serviceUrl is handed to every Client constructed in this file; adminUrl is the prefix of the admin
// HTTP requests issued by the tests. Neither is modified after initialization.
static const std::string serviceUrl = "pulsar://localhost:6650";
static const std::string adminUrl = "http://localhost:8080/";
/**
 * Fixture parameterized on a bool. When the parameter is true, initTopic() asks the admin endpoint to
 * make the topic partitioned before the test body uses it.
 */
class ReaderTest : public ::testing::TestWithParam<bool> {
   public:
    /**
     * If isMultiTopic_ is set, sends a PUT with body "5" to the topic's "/partitions" admin path.
     * Does nothing otherwise.
     *
     * A response status of 204 or 409 is accepted; any other status records a fatal failure. Because
     * this is a void helper, that failure returns from initTopic() only, not from the calling test.
     *
     * @param topicName name of the topic, appended to the "public/default" admin path
     */
    void initTopic(const std::string& topicName) {
        if (!isMultiTopic_) {
            return;
        }
        // call admin api to make it partitioned
        const std::string url =
            adminUrl + "admin/v2/persistent/public/default/" + topicName + "/partitions";
        const int res = makePutRequest(url, "5");
        LOG_INFO("res = " << res);
        ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
    }

   protected:
    // Captured once per test instance from the test parameter.
    bool isMultiTopic_ = GetParam();
};
// Opens a reader at the earliest position, publishes ten messages and checks that the reader hands
// them back in publish order.
TEST_P(ReaderTest, testSimpleReader) {
    Client client(serviceUrl);

    const std::string topic =
        "test-simple-reader" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_);
    initTopic(topic);

    ReaderConfiguration conf;
    Reader reader;
    ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), conf, reader));

    Producer producer;
    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));

    // Publish "my-message-0" .. "my-message-9" synchronously.
    for (int seq = 0; seq != 10; ++seq) {
        const std::string payload = "my-message-" + std::to_string(seq);
        ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(payload).build()));
    }

    // Each read must return the payload with the matching sequence number.
    for (int seq = 0; seq != 10; ++seq) {
        Message received;
        ASSERT_EQ(ResultOk, reader.readNext(received));
        const std::string want = "my-message-" + std::to_string(seq);
        const std::string got = received.getDataAsString();
        ASSERT_EQ(want, got);
    }

    producer.close();
    reader.close();
    client.close();
}
// Publishes ten messages, queues ten asynchronous reads, then waits until the reader reports that
// nothing is left to read.
TEST_P(ReaderTest, testAsyncRead) {
    Client client(serviceUrl);
    std::string topicName = "testAsyncRead" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_);
    initTopic(topicName);

    ReaderConfiguration readerConf;
    Reader reader;
    ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));

    Producer producer;
    ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
    for (int i = 0; i < 10; i++) {
        std::string content = "my-message-" + std::to_string(i);
        Message msg = MessageBuilder().setContent(content).build();
        ASSERT_EQ(ResultOk, producer.send(msg));
    }

    for (int i = 0; i < 10; i++) {
        // The i-th queued read is expected to complete with the i-th published payload.
        reader.readNextAsync([i](Result result, const Message& msg) {
            ASSERT_EQ(ResultOk, result);
            std::string content = msg.getDataAsString();
            std::string expected = "my-message-" + std::to_string(i);
            ASSERT_EQ(expected, content);
        });
    }

    waitUntil(
        std::chrono::seconds(5),
        [&]() {
            // Start from "true" so that a failed query keeps waiting instead of acting on an
            // indeterminate value.
            bool hasMsg = true;
            return reader.hasMessageAvailable(hasMsg) == ResultOk && !hasMsg;
        },
        1000);

    // Final check: the query itself must succeed and report that no message is pending.
    bool hasMsg = true;
    ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMsg));
    ASSERT_FALSE(hasMsg);

    producer.close();
    reader.close();
    client.close();
}
// Publishes ten messages first, then opens a reader at the earliest position and checks that all ten
// come back in publish order.
TEST_P(ReaderTest, testReaderAfterMessagesWerePublished) {
    Client client(serviceUrl);

    const std::string topic = "testReaderAfterMessagesWerePublished" + std::to_string(time(nullptr)) +
                              std::to_string(isMultiTopic_);
    initTopic(topic);

    Producer producer;
    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));

    int published = 0;
    while (published < 10) {
        const std::string payload = "my-message-" + std::to_string(published);
        ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(payload).build()));
        ++published;
    }

    // The reader is created only after everything has been published.
    ReaderConfiguration conf;
    Reader reader;
    ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), conf, reader));

    int consumed = 0;
    while (consumed < 10) {
        Message received;
        ASSERT_EQ(ResultOk, reader.readNext(received));
        const std::string want = "my-message-" + std::to_string(consumed);
        ASSERT_EQ(want, received.getDataAsString());
        ++consumed;
    }

    producer.close();
    reader.close();
    client.close();
}
// Publishes ten messages, then opens two readers at the earliest position and checks that each of
// them independently returns all ten messages in publish order.
TEST_P(ReaderTest, testMultipleReaders) {
    Client client(serviceUrl);

    const std::string topic =
        "testMultipleReaders" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_);
    initTopic(topic);

    Producer producer;
    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));
    for (int seq = 0; seq != 10; ++seq) {
        const std::string payload = "my-message-" + std::to_string(seq);
        ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(payload).build()));
    }

    ReaderConfiguration conf;
    Reader first;
    ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), conf, first));
    Reader second;
    ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), conf, second));

    // Drain the first reader completely before touching the second one.
    Reader* const inReadOrder[] = {&first, &second};
    for (Reader* current : inReadOrder) {
        for (int seq = 0; seq != 10; ++seq) {
            Message received;
            ASSERT_EQ(ResultOk, current->readNext(received));
            const std::string want = "my-message-" + std::to_string(seq);
            ASSERT_EQ(want, received.getDataAsString());
        }
    }

    producer.close();
    first.close();
    second.close();
    client.close();
}
// Publishes ten messages, opens a reader at MessageId::latest(), publishes ten more and checks that
// the reader returns only the second batch ("my-message-10" .. "my-message-19").
TEST_P(ReaderTest, testReaderOnLastMessage) {
    Client client(serviceUrl);

    const std::string topic =
        "testReaderOnLastMessage" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_);
    initTopic(topic);

    Producer producer;
    ASSERT_EQ(ResultOk, client.createProducer(topic, producer));

    // First batch: published before the reader exists.
    int seq = 0;
    for (; seq < 10; ++seq) {
        const std::string payload = "my-message-" + std::to_string(seq);
        ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(payload).build()));
    }

    ReaderConfiguration conf;
    Reader reader;
    ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::latest(), conf, reader));

    // Second batch: continues the numbering at 10.
    for (; seq < 20; ++seq) {
        const std::string payload = "my-message-" + std::to_string(seq);
        ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent(payload).build()));
    }

    for (int expectedSeq = 10; expectedSeq < 20; ++expectedSeq) {
        Message received;
        ASSERT_EQ(ResultOk, reader.readNext(received));
        const std::string want = "my-message-" + std::to_string(expectedSeq);
        ASSERT_EQ(want, received.getDataAsString());
    }

    producer.close();
    reader.close();
    client.close();
}
// Reads the first five of ten messages, then opens a second reader at the id of the last message
// read and expects it to deliver the remaining five.
TEST_P(ReaderTest, testReaderOnSpecificMessage) {
    Client client(serviceUrl);
    std::string topicName =
        "testReaderOnSpecificMessage" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_);
    initTopic(topicName);

    Producer producer;
    ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));
    for (int i = 0; i < 10; i++) {
        std::string content = "my-message-" + std::to_string(i);
        Message msg = MessageBuilder().setContent(content).build();
        ASSERT_EQ(ResultOk, producer.send(msg));
    }

    ReaderConfiguration readerConf;
    Reader reader;
    ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));

    // Remember the id of the most recently read message; after the loop it is the id of message 4.
    MessageId lastMessageId;
    for (int i = 0; i < 5; i++) {
        Message msg;
        ASSERT_EQ(ResultOk, reader.readNext(msg));
        std::string content = msg.getDataAsString();
        std::string expected = "my-message-" + std::to_string(i);
        ASSERT_EQ(expected, content);
        lastMessageId = msg.getMessageId();
    }

    // Create another reader starting on msgid4. A separate handle is used so that the first reader
    // is not overwritten and can still be closed explicitly below.
    Reader reader2;
    ASSERT_EQ(ResultOk, client.createReader(topicName, lastMessageId, readerConf, reader2));
    for (int i = 5; i < 10; i++) {
        Message msg;
        ASSERT_EQ(ResultOk, reader2.readNext(msg));
        std::string content = msg.getDataAsString();
        std::string expected = "my-message-" + std::to_string(i);
        ASSERT_EQ(expected, content);
    }

    producer.close();
    reader.close();
    reader2.close();
    client.close();
}
/**
 * Test that we can position on a particular message even within a
 * batch
 */
TEST_P(ReaderTest, testReaderOnSpecificMessageWithBatches) {
    Client client(serviceUrl);
    std::string topicName = "testReaderOnSpecificMessageWithBatches" + std::to_string(time(nullptr)) +
                            std::to_string(isMultiTopic_);
    initTopic(topicName);

    Producer producer;
    // Enable batching
    ProducerConfiguration producerConf;
    producerConf.setBatchingEnabled(true);
    producerConf.setBatchingMaxPublishDelayMs(1000);
    ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer));

    // Messages 0..9 are sent asynchronously without a completion callback.
    for (int i = 0; i < 10; i++) {
        std::string content = "my-message-" + std::to_string(i);
        Message msg = MessageBuilder().setContent(content).build();
        producer.sendAsync(msg, nullptr);
    }

    // Send one sync message, to wait for everything before to be persisted as well
    const std::string finalContent = "my-message-10";
    Message finalMsg = MessageBuilder().setContent(finalContent).build();
    ASSERT_EQ(ResultOk, producer.send(finalMsg));

    ReaderConfiguration readerConf;
    Reader reader;
    ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader));

    // Holds the serialized id of the most recently read message; after the loop that is message 4.
    std::string lastMessageId;
    for (int i = 0; i < 5; i++) {
        Message msg;
        ASSERT_EQ(ResultOk, reader.readNext(msg));
        std::string content = msg.getDataAsString();
        std::string expected = "my-message-" + std::to_string(i);
        ASSERT_EQ(expected, content);
        msg.getMessageId().serialize(lastMessageId);
    }

    // Create another reader starting on msgid4, going through a serialize/deserialize round trip.
    auto msgId4 = MessageId::deserialize(lastMessageId);
    Reader reader2;
    ASSERT_EQ(ResultOk, client.createReader(topicName, msgId4, readerConf, reader2));

    // The second reader must deliver messages 5..10 (the last one being the synchronous send).
    for (int i = 5; i < 11; i++) {
        Message msg;
        ASSERT_EQ(ResultOk, reader2.readNext(msg));
        std::string content = msg.getDataAsString();
        std::string expected = "my-message-" + std::to_string(i);
        ASSERT_EQ(expected, content);
    }

    producer.close();
    reader.close();
    reader2.close();
    client.close();
}
// Checks that hasMessageAvailable() is false on an empty topic, becomes true after publishing, and
// returns to false once everything published so far has been read - twice in a row.
TEST_P(ReaderTest, testReaderReachEndOfTopic) {
    Client client(serviceUrl);
    std::string topicName =
        "testReaderReachEndOfTopic" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_);
    initTopic(topicName);

    // 1. create producer
    Producer producer;
    // Enable batching
    ProducerConfiguration producerConf;
    producerConf.setBatchingEnabled(true);
    producerConf.setBatchingMaxPublishDelayMs(1000);
    ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer));

    // 2. create reader, and expect hasMessageAvailable return false since no message produced.
    ReaderConfiguration readerConf;
    Reader reader;
    ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::latest(), readerConf, reader));
    // Initialized to the opposite of the expected answer so the check below cannot pass by accident.
    bool hasMessageAvailable = true;
    ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
    ASSERT_FALSE(hasMessageAvailable);

    // 3. produce 10 messages.
    for (int i = 0; i < 10; i++) {
        std::string content = "my-message-" + std::to_string(i);
        Message msg = MessageBuilder().setContent(content).build();
        ASSERT_EQ(ResultOk, producer.send(msg));
    }

    // 4. expect hasMessageAvailable return true, and after read 10 messages out, it return false.
    ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
    ASSERT_TRUE(hasMessageAvailable);
    int readMessageCount = 0;
    for (; hasMessageAvailable; readMessageCount++) {
        Message msg;
        ASSERT_EQ(ResultOk, reader.readNext(msg));
        std::string content = msg.getDataAsString();
        std::string expected = "my-message-" + std::to_string(readMessageCount);
        ASSERT_EQ(expected, content);
        // The flag drives the loop, so a failed query must stop the test rather than leave a
        // stale value in place.
        ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
    }
    ASSERT_EQ(readMessageCount, 10);
    ASSERT_FALSE(hasMessageAvailable);

    // 5. produce another 10 messages, expect hasMessageAvailable return true,
    // and after read these 10 messages out, it return false.
    for (int i = 10; i < 20; i++) {
        std::string content = "my-message-" + std::to_string(i);
        Message msg = MessageBuilder().setContent(content).build();
        ASSERT_EQ(ResultOk, producer.send(msg));
    }
    ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
    ASSERT_TRUE(hasMessageAvailable);
    for (; hasMessageAvailable; readMessageCount++) {
        Message msg;
        ASSERT_EQ(ResultOk, reader.readNext(msg));
        std::string content = msg.getDataAsString();
        std::string expected = "my-message-" + std::to_string(readMessageCount);
        ASSERT_EQ(expected, content);
        ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
    }
    ASSERT_EQ(readMessageCount, 20);
    ASSERT_FALSE(hasMessageAvailable);

    producer.close();
    reader.close();
    client.close();
}
// Same end-of-topic check as testReaderReachEndOfTopic, but with batching disabled on the producer
// and with the first ten messages sent asynchronously.
TEST_P(ReaderTest, testReaderReachEndOfTopicMessageWithoutBatches) {
    Client client(serviceUrl);
    std::string topicName = "testReaderReachEndOfTopicMessageWithoutBatches" + std::to_string(time(nullptr)) +
                            std::to_string(isMultiTopic_);
    initTopic(topicName);

    // 1. create producer
    Producer producer;
    ProducerConfiguration producerConf;
    producerConf.setBatchingEnabled(false);
    ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer));

    // 2. create reader, and expect hasMessageAvailable return false since no message produced.
    ReaderConfiguration readerConf;
    Reader reader;
    ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::latest(), readerConf, reader));
    // Initialized to the opposite of the expected answer so the check below cannot pass by accident.
    bool hasMessageAvailable = true;
    ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
    ASSERT_FALSE(hasMessageAvailable);

    // 3. produce 10 messages asynchronously (batching is disabled on this producer).
    for (int i = 0; i < 10; i++) {
        std::string content = "my-message-" + std::to_string(i);
        Message msg = MessageBuilder().setContent(content).build();
        producer.sendAsync(msg, nullptr);
    }

    // Send one sync message, to wait for everything before to be persisted as well
    const std::string finalContent = "my-message-10";
    Message finalMsg = MessageBuilder().setContent(finalContent).build();
    ASSERT_EQ(ResultOk, producer.send(finalMsg));

    // 4. expect hasMessageAvailable return true, and after read 11 messages out, it return false.
    ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
    ASSERT_TRUE(hasMessageAvailable);
    int readMessageCount = 0;
    for (; hasMessageAvailable; readMessageCount++) {
        Message msg;
        ASSERT_EQ(ResultOk, reader.readNext(msg));
        std::string content = msg.getDataAsString();
        std::string expected = "my-message-" + std::to_string(readMessageCount);
        ASSERT_EQ(expected, content);
        // The flag drives the loop, so a failed query must stop the test rather than leave a
        // stale value in place.
        ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
    }
    ASSERT_FALSE(hasMessageAvailable);
    ASSERT_EQ(readMessageCount, 11);

    producer.close();
    reader.close();
    client.close();
}
// Checks the partition index carried by message ids: -1 when reading a non-partitioned topic, and
// 0 / 1 when reading "-partition-0" / "-partition-1" of a two-partition topic directly.
TEST(ReaderTest, testPartitionIndex) {
    Client client(serviceUrl);

    const std::string nonPartitionedTopic = "ReaderTestPartitionIndex-topic-" + std::to_string(time(nullptr));
    const std::string partitionedTopic =
        "ReaderTestPartitionIndex-par-topic-" + std::to_string(time(nullptr));

    // Request two partitions for the partitioned topic; 204 and 409 are both accepted.
    int res = makePutRequest(
        adminUrl + "admin/v2/persistent/public/default/" + partitionedTopic + "/partitions", "2");
    ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;

    const std::string partition0 = partitionedTopic + "-partition-0";
    const std::string partition1 = partitionedTopic + "-partition-1";

    ReaderConfiguration readerConf;
    Reader readers[3];
    ASSERT_EQ(ResultOk,
              client.createReader(nonPartitionedTopic, MessageId::earliest(), readerConf, readers[0]));
    ASSERT_EQ(ResultOk, client.createReader(partition0, MessageId::earliest(), readerConf, readers[1]));
    ASSERT_EQ(ResultOk, client.createReader(partition1, MessageId::earliest(), readerConf, readers[2]));

    Producer producers[3];
    ASSERT_EQ(ResultOk, client.createProducer(nonPartitionedTopic, producers[0]));
    ASSERT_EQ(ResultOk, client.createProducer(partition0, producers[1]));
    ASSERT_EQ(ResultOk, client.createProducer(partition1, producers[2]));

    for (auto& producer : producers) {
        ASSERT_EQ(ResultOk, producer.send(MessageBuilder().setContent("hello").build()));
    }

    // Every read must succeed before its message id is inspected; otherwise the partition check
    // would run against a message that was never filled in.
    Message msg;
    ASSERT_EQ(ResultOk, readers[0].readNext(msg));
    ASSERT_EQ(msg.getMessageId().partition(), -1);
    ASSERT_EQ(ResultOk, readers[1].readNext(msg));
    ASSERT_EQ(msg.getMessageId().partition(), 0);
    ASSERT_EQ(ResultOk, readers[2].readNext(msg));
    ASSERT_EQ(msg.getMessageId().partition(), 1);

    client.close();
}
// Checks that a subscription name set through ReaderConfiguration::setInternalSubscriptionName() is
// the name reported by the reader's underlying consumer.
TEST_P(ReaderTest, testSubscriptionNameSetting) {
    Client client(serviceUrl);

    const std::string topic =
        "testSubscriptionNameSetting" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_);
    initTopic(topic);

    const std::string wantedSubscription = "test-sub";
    ReaderConfiguration conf;
    conf.setInternalSubscriptionName(wantedSubscription);

    Reader reader;
    ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), conf, reader));
    ASSERT_EQ(wantedSubscription, PulsarFriend::getConsumer(reader)->getSubscriptionName());

    reader.close();
    client.close();
}
// Sets both an internal subscription name and a subscription role prefix, and checks that the
// reader's underlying consumer reports the explicit name.
TEST_P(ReaderTest, testSetSubscriptionNameAndPrefix) {
    Client client(serviceUrl);

    const std::string topic =
        "testSetSubscriptionNameAndPrefix" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_);
    initTopic(topic);

    const std::string wantedSubscription = "test-sub";
    ReaderConfiguration conf;
    conf.setInternalSubscriptionName(wantedSubscription);
    conf.setSubscriptionRolePrefix("my-prefix");

    Reader reader;
    ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), conf, reader));
    ASSERT_EQ(wantedSubscription, PulsarFriend::getConsumer(reader)->getSubscriptionName());

    reader.close();
    client.close();
}
// Creates one reader with an explicit subscription name, then expects creating a second reader with
// the same name on the same topic to fail with ResultConsumerBusy.
TEST_P(ReaderTest, testMultiSameSubscriptionNameReaderShouldFail) {
    Client client(serviceUrl);

    const std::string topic = "testMultiSameSubscriptionNameReaderShouldFail" +
                              std::to_string(time(nullptr)) + std::to_string(isMultiTopic_);
    initTopic(topic);

    const std::string sharedName = "test-sub";

    ReaderConfiguration firstConf;
    firstConf.setInternalSubscriptionName(sharedName);
    Reader firstReader;
    ASSERT_EQ(ResultOk, client.createReader(topic, MessageId::earliest(), firstConf, firstReader));

    ReaderConfiguration secondConf;
    secondConf.setInternalSubscriptionName(sharedName);
    Reader secondReader;
    ASSERT_EQ(ResultConsumerBusy,
              client.createReader(topic, MessageId::earliest(), secondConf, secondReader));

    // close() is still invoked on the handle whose creation was rejected; its result is ignored.
    firstReader.close();
    secondReader.close();
    client.close();
}
// Checks Reader::isConnected() across the reader's lifetime: false on a default-constructed handle,
// true after a successful createReader(), and false again after close().
TEST_P(ReaderTest, testIsConnected) {
    Client client(serviceUrl);
    std::string topicName = "testIsConnected" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_);
    initTopic(topicName);

    Reader reader;
    ASSERT_FALSE(reader.isConnected());

    ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), {}, reader));
    ASSERT_TRUE(reader.isConnected());

    ASSERT_EQ(ResultOk, reader.close());
    ASSERT_FALSE(reader.isConnected());

    // Close the client explicitly, as every other test in this file does.
    client.close();
}
// Publishes seven messages asynchronously and then, for each returned message id, creates a reader
// starting at that id and checks what hasMessageAvailable() reports right after creation.
TEST_P(ReaderTest, testHasMessageAvailableWhenCreated) {
    Client client(serviceUrl);
    std::string topicName =
        "testHasMessageAvailableWhenCreated" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_);
    initTopic(topicName);

    ProducerConfiguration producerConf;
    producerConf.setBatchingMaxMessages(3);
    Producer producer;
    ASSERT_EQ(ResultOk, client.createProducer(topicName, producerConf, producer));

    std::vector<MessageId> messageIds;
    constexpr int numMessages = 7;
    Latch latch(numMessages);
    for (int i = 0; i < numMessages; i++) {
        // Each callback records the id of a successful send and counts the latch down either way.
        producer.sendAsync(MessageBuilder().setContent("msg-" + std::to_string(i)).build(),
                           [i, &messageIds, &latch](Result result, const MessageId& messageId) {
                               if (result == ResultOk) {
                                   LOG_INFO("Send " << i << " to " << messageId);
                                   messageIds.emplace_back(messageId);
                               } else {
                                   LOG_ERROR("Failed to send " << i << ": " << messageId);
                               }
                               latch.countdown();
                           });
    }
    // The vector may only be inspected once every callback has run; stop here on a timeout instead
    // of reading it while callbacks could still be appending.
    ASSERT_TRUE(latch.wait(std::chrono::seconds(3)));
    ASSERT_EQ(messageIds.size(), static_cast<size_t>(numMessages));

    Reader reader;
    bool hasMessageAvailable = false;

    // Starting at any id other than the last one, at least one later message must be available.
    for (size_t i = 0; i < messageIds.size() - 1; i++) {
        ASSERT_EQ(ResultOk, client.createReader(topicName, messageIds[i], {}, reader));
        ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
        EXPECT_TRUE(hasMessageAvailable);
    }

    // The start message ID is exclusive by default, so when we start at the last message, there should be no
    // message available.
    ASSERT_EQ(ResultOk, client.createReader(topicName, messageIds.back(), {}, reader));
    ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));
    EXPECT_FALSE(hasMessageAvailable);

    client.close();
}
// Publishes five messages, opens a reader at MessageId::latest(), seeks it to the id of the fourth
// message and checks that hasMessageAvailable() can still be queried successfully afterwards.
TEST_P(ReaderTest, testReceiveAfterSeek) {
    Client client(serviceUrl);
    std::string topicName =
        "testReceiveAfterSeek" + std::to_string(time(nullptr)) + std::to_string(isMultiTopic_);
    initTopic(topicName);

    Producer producer;
    ASSERT_EQ(ResultOk, client.createProducer(topicName, producer));

    MessageId seekMessageId;
    for (int i = 0; i < 5; i++) {
        MessageId messageId;
        // The send must succeed, otherwise messageId (and therefore seekMessageId) is not a real id.
        ASSERT_EQ(ResultOk,
                  producer.send(MessageBuilder().setContent("msg-" + std::to_string(i)).build(), messageId));
        if (i == 3) {
            seekMessageId = messageId;
        }
    }

    Reader reader;
    ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::latest(), {}, reader));

    // NOTE(review): the seek result is left unchecked, as in the original; confirm whether seeking
    // by message id is expected to succeed for the partitioned-topic parameterization.
    reader.seek(seekMessageId);

    bool hasMessageAvailable = false;
    ASSERT_EQ(ResultOk, reader.hasMessageAvailable(hasMessageAvailable));

    client.close();
}
// Instantiate every TEST_P of ReaderTest under the "Pulsar" prefix, once with the bool parameter set
// to true and once with it set to false (the fixture stores the parameter in isMultiTopic_).
INSTANTIATE_TEST_SUITE_P(Pulsar, ReaderTest, ::testing::Values(true, false));