blob: 644f42c409f42a1725e53ece06da48669208391b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <gtest/gtest.h>
#include <pulsar/Client.h>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>
#include "ConsumerTest.h"
#include "lib/Latch.h"
#include "lib/LogUtils.h"
DECLARE_LOG_OBJECT()
using namespace pulsar;
// Number of messages published (and expected back) by testProduceConsume and testMessageListener.
static int totalMessages = 10;
// Number of messages handled so far by messageListenerFunction; testMessageListener resets it to 0.
static int globalCount = 0;
// Service URL passed to every Client constructed in this file.
static std::string lookupUrl = "pulsar://localhost:6650";
// Prefix of the message payloads; the message index is appended to it.
static std::string contentBase = "msg-";
/**
 * Message listener used by testMessageListener.
 *
 * Checks that the consumer's internal queue is empty both before and after the
 * message is handled, that the payload equals `contentBase` followed by the
 * current value of `globalCount`, then increments `globalCount` and counts the
 * latch down once.
 *
 * @param consumer the consumer that delivered the message
 * @param msg the delivered message
 * @param latch latch counted down once per successfully verified message
 */
static void messageListenerFunction(Consumer consumer, const Message& msg, Latch& latch) {
    ASSERT_EQ(0, ConsumerTest::getNumOfMessagesInQueue(consumer));

    // Messages are expected in publish order, so the running count doubles as the expected index.
    const std::string expectedContent = contentBase + std::to_string(globalCount);
    ASSERT_EQ(expectedContent, msg.getDataAsString());

    ++globalCount;
    latch.countdown();
    ASSERT_EQ(0, ConsumerTest::getNumOfMessagesInQueue(consumer));
}
/**
 * Publishes `totalMessages` messages and reads them back synchronously through
 * a consumer configured with a receiver queue size of 0.
 *
 * Before and after each receive the consumer's internal queue must report 0
 * messages, and the payloads must arrive in publish order.
 */
TEST(ZeroQueueSizeTest, testProduceConsume) {
    Client client(lookupUrl);
    std::string topicName = "zero-queue-size";
    std::string subName = "my-sub-name";

    Producer producer;
    Result result = client.createProducer(topicName, producer);
    ASSERT_EQ(ResultOk, result);

    Consumer consumer;
    ConsumerConfiguration consConfig;
    consConfig.setReceiverQueueSize(0);
    result = client.subscribe(topicName, subName, consConfig, consumer);
    ASSERT_EQ(ResultOk, result);

    for (int i = 0; i < totalMessages; i++) {
        std::ostringstream ss;
        ss << contentBase << i;
        Message msg = MessageBuilder().setContent(ss.str()).build();
        result = producer.send(msg);
        ASSERT_EQ(ResultOk, result);
    }

    for (int i = 0; i < totalMessages; i++) {
        ASSERT_EQ(0, ConsumerTest::getNumOfMessagesInQueue(consumer));
        std::ostringstream ss;
        ss << contentBase << i;
        Message receivedMsg;
        // Check the receive result explicitly so a failed receive is reported as such
        // instead of surfacing as a payload mismatch on an empty message.
        ASSERT_EQ(ResultOk, consumer.receive(receivedMsg));
        ASSERT_EQ(ss.str(), receivedMsg.getDataAsString());
        ASSERT_EQ(0, ConsumerTest::getNumOfMessagesInQueue(consumer));
    }

    consumer.unsubscribe();
    consumer.close();
    producer.close();
    client.close();
}
/**
 * Publishes `totalMessages` messages to a consumer with receiver queue size 0
 * that handles them through messageListenerFunction, then waits (up to 30
 * seconds) for the latch to be counted down once per message.
 */
TEST(ZeroQueueSizeTest, testMessageListener) {
    Client client(lookupUrl);
    std::string topicName = "zero-queue-size-listener";
    std::string subName = "my-sub-name";

    Producer producer;
    Result result = client.createProducer(topicName, producer);
    ASSERT_EQ(ResultOk, result);

    // Reset the shared counter BEFORE the listener is registered: once subscribe()
    // returns, the listener may already be running on another thread, and resetting
    // the plain int afterwards would race with its increment.
    globalCount = 0;

    Consumer consumer;
    ConsumerConfiguration consConfig;
    consConfig.setReceiverQueueSize(0);
    Latch latch(totalMessages);
    consConfig.setMessageListener(
        std::bind(messageListenerFunction, std::placeholders::_1, std::placeholders::_2, latch));
    result = client.subscribe(topicName, subName, consConfig, consumer);
    ASSERT_EQ(ResultOk, result);

    for (int i = 0; i < totalMessages; i++) {
        std::ostringstream ss;
        ss << contentBase << i;
        Message msg = MessageBuilder().setContent(ss.str()).build();
        result = producer.send(msg);
        ASSERT_EQ(ResultOk, result);
    }

    ASSERT_TRUE(latch.wait(std::chrono::seconds(30)));
    // Expected value first, matching the other assertions in this file.
    ASSERT_EQ(totalMessages, globalCount);

    consumer.unsubscribe();
    consumer.close();
    producer.close();
    client.close();
}
/**
 * Builds a consumer configuration with consumer type `ConsumerShared`, a
 * receiver queue size of 0 and `InitialPositionEarliest` as the subscription
 * initial position.
 *
 * The installed message listener logs every message (tagged with `name`) and
 * then forwards it to `callback`.
 *
 * @param name label used in the log line for each received message
 * @param callback invoked with the consumer and the message after logging
 * @return the populated configuration
 */
static ConsumerConfiguration zeroQueueSharedConsumerConf(
    const std::string& name, std::function<void(Consumer, const Message&)> callback) {
    // Both captures are by value so the listener stays valid after this function returns.
    auto loggingListener = [name, callback](Consumer consumer, const Message& msg) {
        LOG_INFO(name << " received " << msg.getDataAsString() << " from " << msg.getMessageId());
        callback(consumer, msg);
    };

    ConsumerConfiguration config;
    config.setConsumerType(ConsumerShared);
    config.setReceiverQueueSize(0);
    config.setSubscriptionInitialPosition(InitialPositionEarliest);
    config.setMessageListener(loggingListener);
    return config;
}
/**
 * A list of ints whose accessors are serialized by an internal mutex, so it
 * can be appended to from a listener thread while the test thread reads it.
 */
class IntVector {
   public:
    /**
     * Appends `i` to the list.
     *
     * @return the number of stored elements after the insertion
     */
    size_t add(int i) {
        std::lock_guard<std::mutex> guard(lock_);
        values_.push_back(i);
        return values_.size();
    }

    /**
     * @return a snapshot copy of the elements stored so far, in insertion order
     */
    std::vector<int> data() const {
        std::lock_guard<std::mutex> guard(lock_);
        std::vector<int> snapshot(values_);
        return snapshot;
    }

   private:
    // `mutable` so that the const accessor can still take the lock.
    mutable std::mutex lock_;
    std::vector<int> values_;
};
/**
 * Verifies pausing and resuming the message listener of zero-queue consumers
 * on a shared subscription.
 *
 * Ten messages (0..9) are produced up front. consumer-1 takes the first
 * message and pauses; consumer-2 then takes the next five and pauses; finally
 * consumer-1 is resumed and must receive the remaining four.
 *
 * Each stage is signalled to the test thread through a boolean flag guarded by
 * `mtx`. The flag is set and the condition variable notified while holding the
 * mutex, and the test thread waits with a predicate, so a notification that
 * fires before the test thread starts waiting is not lost and a spurious
 * wakeup is not mistaken for completion.
 */
TEST(ZeroQueueSizeTest, testPauseResume) {
    Client client(lookupUrl);
    const auto topic = "ZeroQueueSizeTestPauseListener-" + std::to_string(time(nullptr));
    const auto subscription = "my-sub";

    auto intToMessage = [](int i) { return MessageBuilder().setContent(std::to_string(i)).build(); };
    auto messageToInt = [](const Message& msg) { return std::stoi(msg.getDataAsString()); };

    // 1. Produce 10 messages
    Producer producer;
    const auto producerConf = ProducerConfiguration().setBatchingEnabled(false);
    ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf, producer));
    for (int i = 0; i < 10; i++) {
        MessageId id;
        ASSERT_EQ(ResultOk, producer.send(intToMessage(i), id));
        LOG_INFO("Send " << i << " to " << id);
    }

    // Protects the stage flags below.
    std::mutex mtx;
    // Marks a stage as reached and wakes the test thread; called from listener threads.
    auto signal = [&mtx](bool& flag, std::condition_variable& cond) {
        std::lock_guard<std::mutex> lock(mtx);
        flag = true;
        cond.notify_all();
    };
    // Waits up to 3 seconds for a stage flag; returns false on timeout.
    auto waitFor = [&mtx](const bool& flag, std::condition_variable& cond) {
        std::unique_lock<std::mutex> lock(mtx);
        return cond.wait_for(lock, std::chrono::seconds(3), [&flag] { return flag; });
    };

    // 2. consumer-1 receives 1 message and pause
    std::condition_variable condConsumer1FirstMessage;
    std::condition_variable condConsumer1Completed;
    bool consumer1FirstMessage = false;
    bool consumer1Completed = false;
    IntVector messages1;
    const auto conf1 = zeroQueueSharedConsumerConf("consumer-1", [&](Consumer consumer, const Message& msg) {
        const auto numReceived = messages1.add(messageToInt(msg));
        if (numReceived == 1) {
            ASSERT_EQ(ResultOk, consumer.pauseMessageListener());
            signal(consumer1FirstMessage, condConsumer1FirstMessage);
        } else if (numReceived == 5) {
            ASSERT_EQ(ResultOk, consumer.pauseMessageListener());
            signal(consumer1Completed, condConsumer1Completed);
        }
    });
    Consumer consumer1;
    ASSERT_EQ(ResultOk, client.subscribe(topic, subscription, conf1, consumer1));
    ASSERT_TRUE(waitFor(consumer1FirstMessage, condConsumer1FirstMessage));
    ASSERT_EQ(messages1.data(), (std::vector<int>{0}));

    // 3. consumer-2 receives 5 messages and pause
    std::condition_variable condConsumer2Completed;
    bool consumer2Completed = false;
    IntVector messages2;
    const auto conf2 = zeroQueueSharedConsumerConf("consumer-2", [&](Consumer consumer, const Message& msg) {
        const auto numReceived = messages2.add(messageToInt(msg));
        if (numReceived == 5) {
            ASSERT_EQ(ResultOk, consumer.pauseMessageListener());
            signal(consumer2Completed, condConsumer2Completed);
        }
    });
    Consumer consumer2;
    ASSERT_EQ(ResultOk, client.subscribe(topic, subscription, conf2, consumer2));
    ASSERT_TRUE(waitFor(consumer2Completed, condConsumer2Completed));
    ASSERT_EQ(messages2.data(), (std::vector<int>{1, 2, 3, 4, 5}));

    // 4. consumer-1 resumes listening, and receives last 4 messages
    ASSERT_EQ(ResultOk, consumer1.resumeMessageListener());
    ASSERT_TRUE(waitFor(consumer1Completed, condConsumer1Completed));
    ASSERT_EQ(messages1.data(), (std::vector<int>{0, 6, 7, 8, 9}));

    client.close();
}
/**
 * Repeatedly resumes the listener of a zero-queue consumer, sends one message,
 * waits for the listener to receive it and pauses the listener again, 300
 * times in a row. At the end every message must have been received exactly
 * once and in order.
 *
 * The "message pending" flag is armed BEFORE the message is sent. Arming it
 * inside the wait (after the send) would let a fast listener clear the flag
 * first and have it set again afterwards, leaving the test thread blocked
 * forever.
 */
TEST(ZeroQueueSizeTest, testPauseResumeNoReconnection) {
    Client client(lookupUrl);
    const auto topic = "ZeroQueueSizeTestPauseResumeNoReconnection-" + std::to_string(time(nullptr));

    std::mutex mtx;
    std::condition_variable cond;
    // True while a sent message has not yet been seen by the listener; guarded by mtx.
    bool running = false;
    // Called by the test thread before sending a message.
    auto arm = [&mtx, &running] {
        std::lock_guard<std::mutex> lock(mtx);
        running = true;
    };
    // Called by the listener once the message has been handled.
    auto notify = [&mtx, &cond, &running] {
        std::lock_guard<std::mutex> lock(mtx);
        running = false;
        cond.notify_all();
    };
    // Blocks the test thread until the listener has cleared the flag.
    auto wait = [&mtx, &cond, &running] {
        std::unique_lock<std::mutex> lock(mtx);
        cond.wait(lock, [&running] { return !running; });
    };

    std::mutex mtxForMessages;
    std::vector<std::string> receivedMessages;

    ConsumerConfiguration consumerConf;
    consumerConf.setReceiverQueueSize(0);
    consumerConf.setMessageListener(
        [&mtxForMessages, &receivedMessages, &notify](Consumer consumer, const Message& msg) {
            std::unique_lock<std::mutex> lock(mtxForMessages);
            receivedMessages.emplace_back(msg.getDataAsString());
            lock.unlock();
            consumer.acknowledge(msg);
            notify();  // notify the test thread that a new message arrived
        });
    Consumer consumer;
    ASSERT_EQ(ResultOk, client.subscribe(topic, "my-sub", consumerConf, consumer));

    Producer producer;
    ASSERT_EQ(ResultOk,
              client.createProducer(topic, ProducerConfiguration().setBatchingEnabled(false), producer));

    constexpr int numMessages = 300;
    for (int i = 0; i < numMessages; i++) {
        const auto message = MessageBuilder().setContent(std::to_string(i)).build();
        arm();  // must happen before the send so the listener's notify() cannot be overwritten
        consumer.resumeMessageListener();
        producer.sendAsync(message, {});
        wait();  // wait until the new message is received
        consumer.pauseMessageListener();
    }

    std::unique_lock<std::mutex> lock(mtxForMessages);
    ASSERT_EQ(receivedMessages.size(), static_cast<size_t>(numMessages));
    for (int i = 0; i < numMessages; i++) {
        ASSERT_EQ(i, std::stoi(receivedMessages[i]));
    }
    lock.unlock();

    client.close();
}