blob: 010e5cb030f5eeb2be9ed26ca2772eeacc17c6b8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <gtest/gtest.h>
#include <pulsar/Client.h>
#include <chrono>
#include <memory>
#include <set>
#include <thread>
#include "CustomRoutingPolicy.h"
#include "HttpHelper.h"
using namespace pulsar;
static const std::string serviceUrl = "pulsar://localhost:6650";
static const std::string adminUrl = "http://localhost:8080/";
static ClientConfiguration newClientConfig(bool enablePartitionsUpdate) {
ClientConfiguration clientConfig;
if (enablePartitionsUpdate) {
clientConfig.setPartititionsUpdateInterval(1); // 1s
} else {
clientConfig.setPartititionsUpdateInterval(0); // disable
}
return clientConfig;
}
// In round robin routing mode, if N messages were sent to a topic with N partitions, each partition must have
// received 1 message. So we check whether producer/consumer have increased along with partitions by checking
// partitions' count of N messages.
// Use std::set because it doesn't allow repeated elements.
class PartitionsSet {
public:
size_t size() const { return names_.size(); }
Result initProducer(std::string topicName, bool enablePartitionsUpdate,
bool lazyStartPartitionedProducers) {
clientForProducer_.reset(new Client(serviceUrl, newClientConfig(enablePartitionsUpdate)));
const auto producerConfig = ProducerConfiguration()
.setMessageRouter(std::make_shared<SimpleRoundRobinRoutingPolicy>())
.setLazyStartPartitionedProducers(lazyStartPartitionedProducers);
return clientForProducer_->createProducer(topicName, producerConfig, producer_);
}
Result initConsumer(std::string topicName, bool enablePartitionsUpdate) {
clientForConsumer_.reset(new Client(serviceUrl, newClientConfig(enablePartitionsUpdate)));
return clientForConsumer_->subscribe(topicName, "SubscriptionName", consumer_);
}
void close() {
producer_.close();
clientForProducer_->close();
consumer_.close();
clientForConsumer_->close();
}
void doSendAndReceive(int numMessagesSend, int numMessagesReceive) {
names_.clear();
for (int i = 0; i < numMessagesSend; i++) {
producer_.send(MessageBuilder().setContent("a").build());
}
while (numMessagesReceive > 0) {
Message msg;
if (consumer_.receive(msg, 100) == ResultOk) {
names_.emplace(msg.getTopicName());
consumer_.acknowledge(msg);
numMessagesReceive--;
}
}
}
private:
std::set<std::string> names_;
std::unique_ptr<Client> clientForProducer_;
Producer producer_;
std::unique_ptr<Client> clientForConsumer_;
Consumer consumer_;
};
static void waitForPartitionsUpdated() {
// Assume producer and consumer have updated partitions in 3 seconds if enabled
std::this_thread::sleep_for(std::chrono::seconds(3));
}
TEST(PartitionsUpdateTest, testConfigPartitionsUpdateInterval) {
ClientConfiguration clientConfig;
ASSERT_EQ(60, clientConfig.getPartitionsUpdateInterval());
clientConfig.setPartititionsUpdateInterval(0);
ASSERT_EQ(0, clientConfig.getPartitionsUpdateInterval());
clientConfig.setPartititionsUpdateInterval(1);
ASSERT_EQ(1, clientConfig.getPartitionsUpdateInterval());
clientConfig.setPartititionsUpdateInterval(-1);
ASSERT_EQ(static_cast<unsigned int>(-1), clientConfig.getPartitionsUpdateInterval());
}
void testPartitionsUpdate(bool lazyStartPartitionedProducers, std::string topicNameSuffix) {
std::string topicName = "persistent://" + topicNameSuffix;
std::string topicOperateUrl = adminUrl + "admin/v2/persistent/" + topicNameSuffix + "/partitions";
// Ensure `topicName` doesn't exist before created
makeDeleteRequest(topicOperateUrl);
// Create a 2 partitions topic
int res = makePutRequest(topicOperateUrl, "2");
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
PartitionsSet partitionsSet;
// 1. Both producer and consumer enable partitions update
ASSERT_EQ(ResultOk, partitionsSet.initProducer(topicName, true, lazyStartPartitionedProducers));
ASSERT_EQ(ResultOk, partitionsSet.initConsumer(topicName, true));
res = makePostRequest(topicOperateUrl, "3"); // update partitions to 3
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
waitForPartitionsUpdated();
partitionsSet.doSendAndReceive(3, 3);
ASSERT_EQ(3, partitionsSet.size());
partitionsSet.close();
// 2. Only producer enables partitions update
ASSERT_EQ(ResultOk, partitionsSet.initProducer(topicName, true, false));
ASSERT_EQ(ResultOk, partitionsSet.initConsumer(topicName, false));
res = makePostRequest(topicOperateUrl, "5"); // update partitions to 5
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
waitForPartitionsUpdated();
partitionsSet.doSendAndReceive(5, 3); // can't consume partition-3,4
ASSERT_EQ(3, partitionsSet.size());
partitionsSet.close();
// 3. Only consumer enables partitions update
ASSERT_EQ(ResultOk, partitionsSet.initProducer(topicName, false, false));
ASSERT_EQ(ResultOk, partitionsSet.initConsumer(topicName, true));
res = makePostRequest(topicOperateUrl, "7"); // update partitions to 7
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
waitForPartitionsUpdated();
partitionsSet.doSendAndReceive(7, 7);
ASSERT_EQ(5, partitionsSet.size());
partitionsSet.close();
// 4. Both producer and consumer disables partitions update
ASSERT_EQ(ResultOk, partitionsSet.initProducer(topicName, false, false));
ASSERT_EQ(ResultOk, partitionsSet.initConsumer(topicName, false));
res = makePostRequest(topicOperateUrl, "10"); // update partitions to 10
ASSERT_TRUE(res == 204 || res == 409) << "res: " << res;
waitForPartitionsUpdated();
partitionsSet.doSendAndReceive(10, 10);
ASSERT_EQ(7, partitionsSet.size());
partitionsSet.close();
}
TEST(PartitionsUpdateTest, testPartitionsUpdate) {
testPartitionsUpdate(false, "public/default/partitions-update-test-topic");
}
TEST(PartitionsUpdateTest, testPartitionsUpdateWithLazyProducers) {
testPartitionsUpdate(true, "public/default/partitions-update-test-topic-lazy");
}