blob: d032dd4235fb7cd28690f4376814e7abd7dda61a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <gtest/gtest.h>
#include <pulsar/Client.h>
#include <pulsar/KeySharedPolicy.h>
#include <cmath>
#include <ctime>
#include <map>
#include <vector>
#include "HttpHelper.h"
#include "LogHelper.h"
#include "lib/LogUtils.h"
DECLARE_LOG_OBJECT()
using namespace pulsar;
static std::string lookupUrl = "pulsar://localhost:6650";
class KeySharedPolicyTest : public ::testing::Test {
protected:
KeySharedPolicyTest() : client(lookupUrl, ClientConfiguration().setPartititionsUpdateInterval(1)) {}
void TearDown() override { client.close(); }
void addProducer(const std::string& topicName) {
producers.emplace_back();
auto conf = ProducerConfiguration().setBatchingEnabled(false).setPartitionsRoutingMode(
ProducerConfiguration::RoundRobinDistribution);
ASSERT_EQ(ResultOk, client.createProducer(topicName, conf, producers.back()));
}
ConsumerConfiguration getConsumerConfiguration() {
ConsumerConfiguration conf;
conf.setConsumerType(ConsumerKeyShared);
return conf;
}
static constexpr int NUMBER_OF_KEYS = 300;
static Message newIntMessage(int i, const std::string& key, const char* orderingKey = nullptr) {
MessageBuilder builder;
if (orderingKey) {
builder.setOrderingKey(orderingKey);
}
return builder.setPartitionKey(key).setContent(std::to_string(i)).build();
}
static void sendCallback(Result result, const MessageId&) { ASSERT_EQ(result, ResultOk); }
void receiveAndCheckDistribution(int expectedNumTotalMessages) {
keyToConsumer.clear();
messagesPerConsumer.clear();
int totalMessages = 0;
for (size_t i = 0; i < consumers.size(); i++) {
auto& consumer = consumers[i];
while (true) {
Message msg;
Result result = consumer.receive(msg, 3000);
if (result == ResultTimeout) {
break;
}
ASSERT_EQ(result, ResultOk);
totalMessages++;
messagesPerConsumer[i]++;
ASSERT_EQ(ResultOk, consumer.acknowledge(msg));
if (msg.hasPartitionKey() || msg.hasOrderingKey()) {
std::string key = msg.hasOrderingKey() ? msg.getOrderingKey() : msg.getPartitionKey();
auto iter = keyToConsumer.find(key);
if (iter == keyToConsumer.end()) {
keyToConsumer[key] = i;
} else {
// check messages with the same key will be consumed by the same consumer
ASSERT_EQ(iter->second, i);
}
}
}
}
LOG_INFO("messagesPerConsumer: " << messagesPerConsumer);
int numTotalMessages = 0;
for (const auto& kv : messagesPerConsumer) {
numTotalMessages += kv.second;
}
ASSERT_EQ(numTotalMessages, expectedNumTotalMessages);
const double expectedMessagesPerConsumer = static_cast<double>(totalMessages) / consumers.size();
constexpr double PERCENT_ERROR = 0.50;
for (const auto& kv : messagesPerConsumer) {
int count = kv.second;
ASSERT_LT(fabs(count - expectedMessagesPerConsumer), expectedMessagesPerConsumer * PERCENT_ERROR);
}
}
Client client;
std::vector<Producer> producers;
std::vector<Consumer> consumers;
const std::string subName = "SubscriptionName";
// key is message's ordering key or partitioned key, value is consumer index
std::map<std::string, size_t> keyToConsumer;
// key is consumer index, value is the number of message received by
std::map<size_t, int> messagesPerConsumer;
};
TEST_F(KeySharedPolicyTest, testStickyConsumer) {
const std::string topicName = "KeySharedPolicyTest-sticky-consumer" + std::to_string(time(nullptr));
consumers.emplace_back();
KeySharedPolicy ksp1;
ksp1.setKeySharedMode(STICKY);
ksp1.setStickyRanges({StickyRange(0, 20000)});
ConsumerConfiguration consumerConfig1 = getConsumerConfiguration().setKeySharedPolicy(ksp1);
Result result = client.subscribe(topicName, subName, consumerConfig1, consumers.back());
ASSERT_EQ(ResultOk, result);
consumers.emplace_back();
KeySharedPolicy ksp2;
ksp2.setKeySharedMode(STICKY);
ksp2.setStickyRanges({StickyRange(20001, 40000)});
ConsumerConfiguration consumerConfig2 = getConsumerConfiguration().setKeySharedPolicy(ksp2);
result = client.subscribe(topicName, subName, consumerConfig2, consumers.back());
ASSERT_EQ(ResultOk, result);
consumers.emplace_back();
KeySharedPolicy ksp3;
ksp3.setKeySharedMode(STICKY);
ksp3.setStickyRanges({StickyRange(40001, 65535)});
ConsumerConfiguration consumerConfig3 = getConsumerConfiguration().setKeySharedPolicy(ksp3);
result = client.subscribe(topicName, subName, consumerConfig3, consumers.back());
ASSERT_EQ(ResultOk, result);
addProducer(topicName);
srand(time(nullptr));
constexpr int numMessagesPerProducer = 1000;
for (int i = 0; i < numMessagesPerProducer; i++) {
std::string key = std::to_string(rand() % NUMBER_OF_KEYS);
producers[0].sendAsync(newIntMessage(i, key), sendCallback);
}
ASSERT_EQ(ResultOk, producers[0].flush());
receiveAndCheckDistribution(numMessagesPerProducer);
}
TEST_F(KeySharedPolicyTest, ResultConsumerAssignError) {
const std::string topicName =
"KeySharedPolicyTest-result-consumer-assign-error" + std::to_string(time(nullptr));
// empty range
KeySharedPolicy ksp;
ksp.setKeySharedMode(STICKY);
ConsumerConfiguration consumerConfig = getConsumerConfiguration().setKeySharedPolicy(ksp);
Consumer consumer;
ASSERT_EQ(ResultConsumerAssignError, client.subscribe(topicName, subName, consumerConfig, consumer));
// intersect range
KeySharedPolicy ksp1;
ksp1.setKeySharedMode(STICKY);
ksp1.setStickyRanges({StickyRange(0, 65535)});
ConsumerConfiguration consumerConfig1 = getConsumerConfiguration().setKeySharedPolicy(ksp1);
Consumer consumer1;
Result result = client.subscribe(topicName, subName, consumerConfig1, consumer1);
ASSERT_EQ(ResultOk, result);
KeySharedPolicy ksp2;
ksp2.setKeySharedMode(STICKY);
ksp2.setStickyRanges({StickyRange(0, 65535)});
ConsumerConfiguration consumerConfig2 = getConsumerConfiguration().setKeySharedPolicy(ksp2);
Consumer consumer2;
ASSERT_EQ(ResultConsumerAssignError, client.subscribe(topicName, subName, consumerConfig2, consumer2));
ASSERT_EQ(ResultOk, consumer1.close());
}
TEST_F(KeySharedPolicyTest, InvalidStickyRanges) {
KeySharedPolicy ksp;
ASSERT_THROW(ksp.setStickyRanges({}), std::invalid_argument);
ASSERT_THROW(ksp.setStickyRanges({StickyRange(-1, 10)}), std::invalid_argument);
ASSERT_THROW(ksp.setStickyRanges({StickyRange(0, 65536)}), std::invalid_argument);
ASSERT_THROW(ksp.setStickyRanges({StickyRange(0, 10), StickyRange(9, 20)}), std::invalid_argument);
}
TEST_F(KeySharedPolicyTest, testStickyConsumerExpected) {
// Test whether the saved range vector is as expected
KeySharedPolicy ksp;
ksp.setStickyRanges({StickyRange(0, 300), StickyRange(400, 500)});
std::vector<std::pair<int, int>> expectedStickyRange{{0, 300}, {400, 500}};
ASSERT_EQ(ksp.getStickyRanges(), expectedStickyRange);
}
TEST_F(KeySharedPolicyTest, testStickyConsumerVectors) {
// test whether the saved range vector is the same when using initializer_list or vector
KeySharedPolicy ksp;
ksp.setStickyRanges({StickyRange(0, 300), StickyRange(400, 500)});
KeySharedPolicy ksp2;
std::vector<std::pair<int, int>> stickyRangeVec{{0, 300}, {400, 500}};
ksp2.setStickyRanges(stickyRangeVec);
ASSERT_EQ(ksp.getStickyRanges(), ksp2.getStickyRanges());
}