|  | /** | 
|  | * Licensed to the Apache Software Foundation (ASF) under one | 
|  | * or more contributor license agreements.  See the NOTICE file | 
|  | * distributed with this work for additional information | 
|  | * regarding copyright ownership.  The ASF licenses this file | 
|  | * to you under the Apache License, Version 2.0 (the | 
|  | * "License"); you may not use this file except in compliance | 
|  | * with the License.  You may obtain a copy of the License at | 
|  | * | 
|  | *   http://www.apache.org/licenses/LICENSE-2.0 | 
|  | * | 
|  | * Unless required by applicable law or agreed to in writing, | 
|  | * software distributed under the License is distributed on an | 
|  | * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | 
|  | * KIND, either express or implied.  See the License for the | 
|  | * specific language governing permissions and limitations | 
|  | * under the License. | 
|  | */ | 
|  | #include <gtest/gtest.h> | 
|  | #include <pulsar/Client.h> | 
|  | #include <pulsar/KeySharedPolicy.h> | 
|  |  | 
|  | #include <cmath> | 
|  | #include <ctime> | 
|  | #include <map> | 
|  | #include <vector> | 
|  |  | 
|  | #include "HttpHelper.h" | 
|  | #include "LogHelper.h" | 
|  | #include "lib/LogUtils.h" | 
|  |  | 
|  | DECLARE_LOG_OBJECT() | 
|  |  | 
|  | using namespace pulsar; | 
|  |  | 
|  | static std::string lookupUrl = "pulsar://localhost:6650"; | 
|  |  | 
|  | class KeySharedPolicyTest : public ::testing::Test { | 
|  | protected: | 
|  | KeySharedPolicyTest() : client(lookupUrl, ClientConfiguration().setPartititionsUpdateInterval(1)) {} | 
|  |  | 
|  | void TearDown() override { client.close(); } | 
|  |  | 
|  | void addProducer(const std::string& topicName) { | 
|  | producers.emplace_back(); | 
|  | auto conf = ProducerConfiguration().setBatchingEnabled(false).setPartitionsRoutingMode( | 
|  | ProducerConfiguration::RoundRobinDistribution); | 
|  | ASSERT_EQ(ResultOk, client.createProducer(topicName, conf, producers.back())); | 
|  | } | 
|  |  | 
|  | ConsumerConfiguration getConsumerConfiguration() { | 
|  | ConsumerConfiguration conf; | 
|  | conf.setConsumerType(ConsumerKeyShared); | 
|  | return conf; | 
|  | } | 
|  |  | 
|  | static constexpr int NUMBER_OF_KEYS = 300; | 
|  |  | 
|  | static Message newIntMessage(int i, const std::string& key, const char* orderingKey = nullptr) { | 
|  | MessageBuilder builder; | 
|  | if (orderingKey) { | 
|  | builder.setOrderingKey(orderingKey); | 
|  | } | 
|  | return builder.setPartitionKey(key).setContent(std::to_string(i)).build(); | 
|  | } | 
|  |  | 
|  | static void sendCallback(Result result, const MessageId&) { ASSERT_EQ(result, ResultOk); } | 
|  |  | 
|  | void receiveAndCheckDistribution(int expectedNumTotalMessages) { | 
|  | keyToConsumer.clear(); | 
|  | messagesPerConsumer.clear(); | 
|  | int totalMessages = 0; | 
|  |  | 
|  | for (size_t i = 0; i < consumers.size(); i++) { | 
|  | auto& consumer = consumers[i]; | 
|  | while (true) { | 
|  | Message msg; | 
|  | Result result = consumer.receive(msg, 3000); | 
|  | if (result == ResultTimeout) { | 
|  | break; | 
|  | } | 
|  |  | 
|  | ASSERT_EQ(result, ResultOk); | 
|  | totalMessages++; | 
|  | messagesPerConsumer[i]++; | 
|  | ASSERT_EQ(ResultOk, consumer.acknowledge(msg)); | 
|  |  | 
|  | if (msg.hasPartitionKey() || msg.hasOrderingKey()) { | 
|  | std::string key = msg.hasOrderingKey() ? msg.getOrderingKey() : msg.getPartitionKey(); | 
|  | auto iter = keyToConsumer.find(key); | 
|  | if (iter == keyToConsumer.end()) { | 
|  | keyToConsumer[key] = i; | 
|  | } else { | 
|  | // check messages with the same key will be consumed by the same consumer | 
|  | ASSERT_EQ(iter->second, i); | 
|  | } | 
|  | } | 
|  | } | 
|  | } | 
|  |  | 
|  | LOG_INFO("messagesPerConsumer: " << messagesPerConsumer); | 
|  | int numTotalMessages = 0; | 
|  | for (const auto& kv : messagesPerConsumer) { | 
|  | numTotalMessages += kv.second; | 
|  | } | 
|  | ASSERT_EQ(numTotalMessages, expectedNumTotalMessages); | 
|  |  | 
|  | const double expectedMessagesPerConsumer = static_cast<double>(totalMessages) / consumers.size(); | 
|  | constexpr double PERCENT_ERROR = 0.50; | 
|  | for (const auto& kv : messagesPerConsumer) { | 
|  | int count = kv.second; | 
|  | ASSERT_LT(fabs(count - expectedMessagesPerConsumer), expectedMessagesPerConsumer * PERCENT_ERROR); | 
|  | } | 
|  | } | 
|  |  | 
|  | Client client; | 
|  | std::vector<Producer> producers; | 
|  | std::vector<Consumer> consumers; | 
|  | const std::string subName = "SubscriptionName"; | 
|  |  | 
|  | // key is message's ordering key or partitioned key, value is consumer index | 
|  | std::map<std::string, size_t> keyToConsumer; | 
|  | // key is consumer index, value is the number of message received by | 
|  | std::map<size_t, int> messagesPerConsumer; | 
|  | }; | 
|  |  | 
|  | TEST_F(KeySharedPolicyTest, testStickyConsumer) { | 
|  | const std::string topicName = "KeySharedPolicyTest-sticky-consumer" + std::to_string(time(nullptr)); | 
|  |  | 
|  | consumers.emplace_back(); | 
|  | KeySharedPolicy ksp1; | 
|  | ksp1.setKeySharedMode(STICKY); | 
|  | ksp1.setStickyRanges({StickyRange(0, 20000)}); | 
|  | ConsumerConfiguration consumerConfig1 = getConsumerConfiguration().setKeySharedPolicy(ksp1); | 
|  | Result result = client.subscribe(topicName, subName, consumerConfig1, consumers.back()); | 
|  | ASSERT_EQ(ResultOk, result); | 
|  |  | 
|  | consumers.emplace_back(); | 
|  | KeySharedPolicy ksp2; | 
|  | ksp2.setKeySharedMode(STICKY); | 
|  | ksp2.setStickyRanges({StickyRange(20001, 40000)}); | 
|  | ConsumerConfiguration consumerConfig2 = getConsumerConfiguration().setKeySharedPolicy(ksp2); | 
|  | result = client.subscribe(topicName, subName, consumerConfig2, consumers.back()); | 
|  | ASSERT_EQ(ResultOk, result); | 
|  |  | 
|  | consumers.emplace_back(); | 
|  | KeySharedPolicy ksp3; | 
|  | ksp3.setKeySharedMode(STICKY); | 
|  | ksp3.setStickyRanges({StickyRange(40001, 65535)}); | 
|  | ConsumerConfiguration consumerConfig3 = getConsumerConfiguration().setKeySharedPolicy(ksp3); | 
|  | result = client.subscribe(topicName, subName, consumerConfig3, consumers.back()); | 
|  | ASSERT_EQ(ResultOk, result); | 
|  |  | 
|  | addProducer(topicName); | 
|  |  | 
|  | srand(time(nullptr)); | 
|  | constexpr int numMessagesPerProducer = 1000; | 
|  | for (int i = 0; i < numMessagesPerProducer; i++) { | 
|  | std::string key = std::to_string(rand() % NUMBER_OF_KEYS); | 
|  | producers[0].sendAsync(newIntMessage(i, key), sendCallback); | 
|  | } | 
|  | ASSERT_EQ(ResultOk, producers[0].flush()); | 
|  |  | 
|  | receiveAndCheckDistribution(numMessagesPerProducer); | 
|  | } | 
|  |  | 
|  | TEST_F(KeySharedPolicyTest, ResultConsumerAssignError) { | 
|  | const std::string topicName = | 
|  | "KeySharedPolicyTest-result-consumer-assign-error" + std::to_string(time(nullptr)); | 
|  |  | 
|  | // empty range | 
|  | KeySharedPolicy ksp; | 
|  | ksp.setKeySharedMode(STICKY); | 
|  | ConsumerConfiguration consumerConfig = getConsumerConfiguration().setKeySharedPolicy(ksp); | 
|  | Consumer consumer; | 
|  | ASSERT_EQ(ResultConsumerAssignError, client.subscribe(topicName, subName, consumerConfig, consumer)); | 
|  |  | 
|  | // intersect range | 
|  | KeySharedPolicy ksp1; | 
|  | ksp1.setKeySharedMode(STICKY); | 
|  | ksp1.setStickyRanges({StickyRange(0, 65535)}); | 
|  | ConsumerConfiguration consumerConfig1 = getConsumerConfiguration().setKeySharedPolicy(ksp1); | 
|  | Consumer consumer1; | 
|  | Result result = client.subscribe(topicName, subName, consumerConfig1, consumer1); | 
|  | ASSERT_EQ(ResultOk, result); | 
|  |  | 
|  | KeySharedPolicy ksp2; | 
|  | ksp2.setKeySharedMode(STICKY); | 
|  | ksp2.setStickyRanges({StickyRange(0, 65535)}); | 
|  | ConsumerConfiguration consumerConfig2 = getConsumerConfiguration().setKeySharedPolicy(ksp2); | 
|  | Consumer consumer2; | 
|  | ASSERT_EQ(ResultConsumerAssignError, client.subscribe(topicName, subName, consumerConfig2, consumer2)); | 
|  |  | 
|  | ASSERT_EQ(ResultOk, consumer1.close()); | 
|  | } | 
|  |  | 
|  | TEST_F(KeySharedPolicyTest, InvalidStickyRanges) { | 
|  | KeySharedPolicy ksp; | 
|  | ASSERT_THROW(ksp.setStickyRanges({}), std::invalid_argument); | 
|  | ASSERT_THROW(ksp.setStickyRanges({StickyRange(-1, 10)}), std::invalid_argument); | 
|  | ASSERT_THROW(ksp.setStickyRanges({StickyRange(0, 65536)}), std::invalid_argument); | 
|  | ASSERT_THROW(ksp.setStickyRanges({StickyRange(0, 10), StickyRange(9, 20)}), std::invalid_argument); | 
|  | } | 
|  |  | 
|  | TEST_F(KeySharedPolicyTest, testStickyConsumerExpected) { | 
|  | // Test whether the saved range vector is as expected | 
|  | KeySharedPolicy ksp; | 
|  | ksp.setStickyRanges({StickyRange(0, 300), StickyRange(400, 500)}); | 
|  |  | 
|  | std::vector<std::pair<int, int>> expectedStickyRange{{0, 300}, {400, 500}}; | 
|  | ASSERT_EQ(ksp.getStickyRanges(), expectedStickyRange); | 
|  | } | 
|  |  | 
|  | TEST_F(KeySharedPolicyTest, testStickyConsumerVectors) { | 
|  | // test whether the saved range vector is the same when using initializer_list or vector | 
|  | KeySharedPolicy ksp; | 
|  | ksp.setStickyRanges({StickyRange(0, 300), StickyRange(400, 500)}); | 
|  |  | 
|  | KeySharedPolicy ksp2; | 
|  | std::vector<std::pair<int, int>> stickyRangeVec{{0, 300}, {400, 500}}; | 
|  | ksp2.setStickyRanges(stickyRangeVec); | 
|  |  | 
|  | ASSERT_EQ(ksp.getStickyRanges(), ksp2.getStickyRanges()); | 
|  | } |