| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| #include <BinaryProtoLookupService.h> |
| #include <Future.h> |
| #include <HTTPLookupService.h> |
| #include <Utils.h> |
| #include <gtest/gtest.h> |
| #include <pulsar/Authentication.h> |
| #include <pulsar/Client.h> |
| |
| #include <algorithm> |
| #include <boost/exception/all.hpp> |
| #include <future> |
| #include <stdexcept> |
| |
| #include "HttpHelper.h" |
| #include "PulsarFriend.h" |
| #include "lib/ClientConnection.h" |
| #include "lib/ConnectionPool.h" |
| #include "lib/LogUtils.h" |
| #include "lib/RetryableLookupService.h" |
| #include "lib/TimeUtils.h" |
| |
| using namespace pulsar; |
| |
| DECLARE_LOG_OBJECT() |
| |
// Service URLs of the local Pulsar endpoints used by these tests: one for the binary
// protocol ("pulsar://") and one for HTTP.
static std::string binaryLookupUrl("pulsar://localhost:6650");
static std::string httpLookupUrl("http://localhost:8080");

// Declared here, defined in another translation unit.
extern std::string unique_str();
| |
| TEST(LookupServiceTest, basicLookup) { |
| ExecutorServiceProviderPtr service = std::make_shared<ExecutorServiceProvider>(1); |
| AuthenticationPtr authData = AuthFactory::Disabled(); |
| std::string url = "pulsar://localhost:6650"; |
| ClientConfiguration conf; |
| ExecutorServiceProviderPtr ioExecutorProvider_(std::make_shared<ExecutorServiceProvider>(1)); |
| ConnectionPool pool_(conf, ioExecutorProvider_, authData, true, ""); |
| ServiceNameResolver serviceNameResolver(url); |
| BinaryProtoLookupService lookupService(serviceNameResolver, pool_, conf); |
| |
| TopicNamePtr topicName = TopicName::get("topic"); |
| |
| Future<Result, LookupDataResultPtr> partitionFuture = lookupService.getPartitionMetadataAsync(topicName); |
| LookupDataResultPtr lookupData; |
| Result result = partitionFuture.get(lookupData); |
| ASSERT_TRUE(lookupData != NULL); |
| ASSERT_EQ(0, lookupData->getPartitions()); |
| |
| const auto topicNamePtr = TopicName::get("topic"); |
| auto future = lookupService.getBroker(*topicNamePtr); |
| LookupService::LookupResult lookupResult; |
| result = future.get(lookupResult); |
| |
| ASSERT_EQ(ResultOk, result); |
| ASSERT_EQ(url, lookupResult.logicalAddress); |
| ASSERT_EQ(url, lookupResult.physicalAddress); |
| } |
| |
| static void testMultiAddresses(LookupService& lookupService) { |
| std::vector<Result> results; |
| constexpr int numRequests = 6; |
| |
| auto verifySuccessCount = [&results] { |
| // Only half of them succeeded |
| ASSERT_EQ(std::count(results.cbegin(), results.cend(), ResultOk), numRequests / 2); |
| ASSERT_EQ(std::count(results.cbegin(), results.cend(), ResultRetryable), numRequests / 2); |
| }; |
| |
| for (int i = 0; i < numRequests; i++) { |
| const auto topicNamePtr = TopicName::get("topic"); |
| LookupService::LookupResult lookupResult; |
| const auto result = lookupService.getBroker(*topicNamePtr).get(lookupResult); |
| LOG_INFO("getBroker [" << i << "] " << result << ", " << lookupResult); |
| results.emplace_back(result); |
| } |
| verifySuccessCount(); |
| |
| results.clear(); |
| for (int i = 0; i < numRequests; i++) { |
| LookupDataResultPtr data; |
| const auto result = lookupService.getPartitionMetadataAsync(TopicName::get("topic")).get(data); |
| LOG_INFO("getPartitionMetadataAsync [" << i << "] " << result); |
| results.emplace_back(result); |
| } |
| verifySuccessCount(); |
| |
| results.clear(); |
| for (int i = 0; i < numRequests; i++) { |
| NamespaceTopicsPtr data; |
| const auto result = lookupService |
| .getTopicsOfNamespaceAsync(TopicName::get("topic")->getNamespaceName(), |
| CommandGetTopicsOfNamespace_Mode_PERSISTENT) |
| .get(data); |
| LOG_INFO("getTopicsOfNamespaceAsync [" << i << "] " << result); |
| results.emplace_back(result); |
| } |
| verifySuccessCount(); |
| } |
| |
| TEST(LookupServiceTest, testMultiAddresses) { |
| ConnectionPool pool({}, std::make_shared<ExecutorServiceProvider>(1), AuthFactory::Disabled(), true, ""); |
| ServiceNameResolver serviceNameResolver("pulsar://localhost,localhost:9999"); |
| ClientConfiguration conf; |
| BinaryProtoLookupService binaryLookupService(serviceNameResolver, pool, conf); |
| testMultiAddresses(binaryLookupService); |
| |
| // HTTPLookupService calls shared_from_this() internally, we must create a shared pointer to test |
| ServiceNameResolver serviceNameResolverForHttp("http://localhost,localhost:9999"); |
| auto httpLookupServicePtr = std::make_shared<HTTPLookupService>( |
| std::ref(serviceNameResolverForHttp), ClientConfiguration{}, AuthFactory::Disabled()); |
| testMultiAddresses(*httpLookupServicePtr); |
| } |
| TEST(LookupServiceTest, testRetry) { |
| auto executorProvider = std::make_shared<ExecutorServiceProvider>(1); |
| ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), true, ""); |
| ServiceNameResolver serviceNameResolver("pulsar://localhost:9999,localhost"); |
| ClientConfiguration conf; |
| |
| auto lookupService = RetryableLookupService::create( |
| std::make_shared<BinaryProtoLookupService>(serviceNameResolver, pool, conf), 30 /* seconds */, |
| executorProvider); |
| |
| PulsarFriend::setServiceUrlIndex(serviceNameResolver, 0); |
| auto topicNamePtr = TopicName::get("lookup-service-test-retry"); |
| auto future1 = lookupService->getBroker(*topicNamePtr); |
| LookupService::LookupResult lookupResult; |
| ASSERT_EQ(ResultOk, future1.get(lookupResult)); |
| LOG_INFO("getBroker returns logicalAddress: " << lookupResult.logicalAddress |
| << ", physicalAddress: " << lookupResult.physicalAddress); |
| |
| PulsarFriend::setServiceUrlIndex(serviceNameResolver, 0); |
| auto future2 = lookupService->getPartitionMetadataAsync(topicNamePtr); |
| LookupDataResultPtr lookupDataResultPtr; |
| ASSERT_EQ(ResultOk, future2.get(lookupDataResultPtr)); |
| LOG_INFO("getPartitionMetadataAsync returns " << lookupDataResultPtr->getPartitions() << " partitions"); |
| |
| PulsarFriend::setServiceUrlIndex(serviceNameResolver, 0); |
| auto future3 = lookupService->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(), |
| CommandGetTopicsOfNamespace_Mode_PERSISTENT); |
| NamespaceTopicsPtr namespaceTopicsPtr; |
| ASSERT_EQ(ResultOk, future3.get(namespaceTopicsPtr)); |
| LOG_INFO("getTopicPartitionName Async returns " << namespaceTopicsPtr->size() << " topics"); |
| |
| std::atomic_int retryCount{0}; |
| constexpr int totalRetryCount = 3; |
| auto future4 = lookupService->executeAsync<int>("key", [&retryCount]() -> Future<Result, int> { |
| Promise<Result, int> promise; |
| if (++retryCount < totalRetryCount) { |
| LOG_INFO("Retry count: " << retryCount); |
| promise.setFailed(ResultRetryable); |
| } else { |
| LOG_INFO("Retry done with " << retryCount << " times"); |
| promise.setValue(100); |
| } |
| return promise.getFuture(); |
| }); |
| int customResult = 0; |
| ASSERT_EQ(ResultOk, future4.get(customResult)); |
| ASSERT_EQ(customResult, 100); |
| ASSERT_EQ(retryCount.load(), totalRetryCount); |
| |
| ASSERT_EQ(PulsarFriend::getNumberOfPendingTasks(*lookupService), 0); |
| } |
| |
| TEST(LookupServiceTest, testTimeout) { |
| auto executorProvider = std::make_shared<ExecutorServiceProvider>(1); |
| ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), true, ""); |
| ServiceNameResolver serviceNameResolver("pulsar://localhost:9990,localhost:9902,localhost:9904"); |
| ClientConfiguration conf; |
| |
| constexpr int timeoutInSeconds = 2; |
| auto lookupService = RetryableLookupService::create( |
| std::make_shared<BinaryProtoLookupService>(serviceNameResolver, pool, conf), timeoutInSeconds, |
| executorProvider); |
| auto topicNamePtr = TopicName::get("lookup-service-test-retry"); |
| |
| decltype(std::chrono::high_resolution_clock::now()) startTime; |
| auto beforeMethod = [&startTime] { startTime = std::chrono::high_resolution_clock::now(); }; |
| auto afterMethod = [&startTime](const std::string& name) { |
| auto timeInterval = std::chrono::duration_cast<std::chrono::milliseconds>( |
| std::chrono::high_resolution_clock::now() - startTime) |
| .count(); |
| LOG_INFO(name << " took " << timeInterval << " seconds"); |
| ASSERT_TRUE(timeInterval >= timeoutInSeconds * 1000L); |
| }; |
| |
| beforeMethod(); |
| auto future1 = lookupService->getBroker(*topicNamePtr); |
| LookupService::LookupResult lookupResult; |
| ASSERT_EQ(ResultTimeout, future1.get(lookupResult)); |
| afterMethod("getBroker"); |
| |
| beforeMethod(); |
| auto future2 = lookupService->getPartitionMetadataAsync(topicNamePtr); |
| LookupDataResultPtr lookupDataResultPtr; |
| ASSERT_EQ(ResultTimeout, future2.get(lookupDataResultPtr)); |
| afterMethod("getPartitionMetadataAsync"); |
| |
| beforeMethod(); |
| auto future3 = lookupService->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName(), |
| CommandGetTopicsOfNamespace_Mode_PERSISTENT); |
| NamespaceTopicsPtr namespaceTopicsPtr; |
| ASSERT_EQ(ResultTimeout, future3.get(namespaceTopicsPtr)); |
| afterMethod("getTopicsOfNamespaceAsync"); |
| |
| ASSERT_EQ(PulsarFriend::getNumberOfPendingTasks(*lookupService), 0); |
| } |
| |
| class LookupServiceTest : public ::testing::TestWithParam<std::string> { |
| public: |
| void SetUp() override { client_ = Client{GetParam()}; } |
| void TearDown() override { client_.close(); } |
| |
| protected: |
| Client client_{httpLookupUrl}; |
| }; |
| |
| TEST_P(LookupServiceTest, basicGetNamespaceTopics) { |
| Result result; |
| |
| auto nsName = NamespaceName::get("public", GetParam().substr(0, 4) + std::to_string(time(nullptr))); |
| std::string topicName1 = "persistent://" + nsName->toString() + "/basicGetNamespaceTopics1"; |
| std::string topicName2 = "persistent://" + nsName->toString() + "/basicGetNamespaceTopics2"; |
| std::string topicName3 = "non-persistent://" + nsName->toString() + "/basicGetNamespaceTopics3"; |
| |
| // 0. create a namespace |
| auto createNsUrl = httpLookupUrl + "/admin/v2/namespaces/" + nsName->toString(); |
| auto res = makePutRequest(createNsUrl, ""); |
| ASSERT_FALSE(res != 204 && res != 409); |
| |
| // 1. trigger auto create topic |
| Producer producer1; |
| result = client_.createProducer(topicName1, producer1); |
| ASSERT_EQ(ResultOk, result); |
| Producer producer2; |
| result = client_.createProducer(topicName2, producer2); |
| ASSERT_EQ(ResultOk, result); |
| Producer producer3; |
| result = client_.createProducer(topicName3, producer3); |
| ASSERT_EQ(ResultOk, result); |
| |
| // 2. verify getTopicsOfNamespace by regex mode. |
| auto lookupServicePtr = PulsarFriend::getClientImplPtr(client_)->getLookup(); |
| auto verifyGetTopics = [&](CommandGetTopicsOfNamespace_Mode mode, |
| const std::set<std::string>& expectedTopics) { |
| Future<Result, NamespaceTopicsPtr> getTopicsFuture = |
| lookupServicePtr->getTopicsOfNamespaceAsync(nsName, mode); |
| NamespaceTopicsPtr topicsData; |
| result = getTopicsFuture.get(topicsData); |
| ASSERT_EQ(ResultOk, result); |
| ASSERT_TRUE(topicsData != NULL); |
| std::set<std::string> actualTopics(topicsData->begin(), topicsData->end()); |
| ASSERT_EQ(expectedTopics, actualTopics); |
| }; |
| verifyGetTopics(CommandGetTopicsOfNamespace_Mode_PERSISTENT, {topicName1, topicName2}); |
| verifyGetTopics(CommandGetTopicsOfNamespace_Mode_NON_PERSISTENT, {topicName3}); |
| verifyGetTopics(CommandGetTopicsOfNamespace_Mode_ALL, {topicName1, topicName2, topicName3}); |
| |
| client_.close(); |
| } |
| |
| TEST_P(LookupServiceTest, testGetSchema) { |
| const std::string topic = "testGetSchema" + std::to_string(time(nullptr)) + GetParam().substr(0, 4); |
| std::string jsonSchema = |
| R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})"; |
| |
| StringMap properties; |
| properties.emplace("key1", "value1"); |
| properties.emplace("key2", "value2"); |
| |
| ProducerConfiguration producerConfiguration; |
| producerConfiguration.setSchema(SchemaInfo(SchemaType::JSON, "json", jsonSchema, properties)); |
| Producer producer; |
| ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConfiguration, producer)); |
| |
| auto clientImplPtr = PulsarFriend::getClientImplPtr(client_); |
| auto lookup = clientImplPtr->getLookup(); |
| |
| SchemaInfo schemaInfo; |
| auto future = lookup->getSchema(TopicName::get(topic)); |
| ASSERT_EQ(ResultOk, future.get(schemaInfo)); |
| ASSERT_EQ(jsonSchema, schemaInfo.getSchema()); |
| ASSERT_EQ(SchemaType::JSON, schemaInfo.getSchemaType()); |
| ASSERT_EQ(properties, schemaInfo.getProperties()); |
| } |
| |
| TEST_P(LookupServiceTest, testGetSchemaNotFound) { |
| const std::string topic = |
| "testGetSchemaNotFund" + std::to_string(time(nullptr)) + GetParam().substr(0, 4); |
| |
| Producer producer; |
| ASSERT_EQ(ResultOk, client_.createProducer(topic, producer)); |
| |
| auto clientImplPtr = PulsarFriend::getClientImplPtr(client_); |
| auto lookup = clientImplPtr->getLookup(); |
| |
| SchemaInfo schemaInfo; |
| auto future = lookup->getSchema(TopicName::get(topic)); |
| ASSERT_EQ(ResultTopicNotFound, future.get(schemaInfo)); |
| } |
| |
| TEST_P(LookupServiceTest, testGetKeyValueSchema) { |
| const std::string topic = |
| "testGetKeyValueSchema" + std::to_string(time(nullptr)) + GetParam().substr(0, 4); |
| StringMap properties; |
| properties.emplace("key1", "value1"); |
| properties.emplace("key2", "value2"); |
| std::string jsonSchema = |
| R"({"type":"record","name":"cpx","fields":[{"name":"re","type":"double"},{"name":"im","type":"double"}]})"; |
| SchemaInfo keySchema(JSON, "key-json", jsonSchema, properties); |
| SchemaInfo valueSchema(JSON, "value-json", jsonSchema, properties); |
| SchemaInfo keyValueSchema(keySchema, valueSchema, KeyValueEncodingType::INLINE); |
| |
| ProducerConfiguration producerConfiguration; |
| producerConfiguration.setSchema(keyValueSchema); |
| Producer producer; |
| ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConfiguration, producer)); |
| |
| auto clientImplPtr = PulsarFriend::getClientImplPtr(client_); |
| auto lookup = clientImplPtr->getLookup(); |
| |
| SchemaInfo schemaInfo; |
| auto future = lookup->getSchema(TopicName::get(topic)); |
| ASSERT_EQ(ResultOk, future.get(schemaInfo)); |
| ASSERT_EQ(keyValueSchema.getSchema(), schemaInfo.getSchema()); |
| ASSERT_EQ(SchemaType::KEY_VALUE, schemaInfo.getSchemaType()); |
| ASSERT_FALSE(schemaInfo.getProperties().empty()); |
| } |
| |
| TEST_P(LookupServiceTest, testGetSchemaByVersion) { |
| const auto topic = "testGetSchemaByVersion" + unique_str() + GetParam().substr(0, 4); |
| const std::string schema1 = R"({ |
| "type": "record", |
| "name": "User", |
| "namespace": "test", |
| "fields": [ |
| {"name": "name", "type": ["null", "string"]}, |
| {"name": "age", "type": "int"} |
| ] |
| })"; |
| const std::string schema2 = R"({ |
| "type": "record", |
| "name": "User", |
| "namespace": "test", |
| "fields": [ |
| {"name": "age", "type": "int"}, |
| {"name": "name", "type": ["null", "string"]} |
| ] |
| })"; |
| ProducerConfiguration producerConf1; |
| producerConf1.setSchema(SchemaInfo{AVRO, "Avro", schema1}); |
| Producer producer1; |
| ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConf1, producer1)); |
| ProducerConfiguration producerConf2; |
| producerConf2.setSchema(SchemaInfo{AVRO, "Avro", schema2}); |
| Producer producer2; |
| ASSERT_EQ(ResultOk, client_.createProducer(topic, producerConf2, producer2)); |
| |
| // Though these messages are invalid, the C++ client can send them successfully |
| producer1.send(MessageBuilder().setContent("msg0").build()); |
| producer2.send(MessageBuilder().setContent("msg1").build()); |
| |
| ConsumerConfiguration consumerConf; |
| consumerConf.setSubscriptionInitialPosition(InitialPositionEarliest); |
| Consumer consumer; |
| ASSERT_EQ(ResultOk, client_.subscribe(topic, "sub", consumerConf, consumer)); |
| |
| Message msg1; |
| ASSERT_EQ(ResultOk, consumer.receive(msg1, 3000)); |
| Message msg2; |
| ASSERT_EQ(ResultOk, consumer.receive(msg2, 3000)); |
| |
| auto getSchemaInfo = [this](const std::string& topic, int64_t version) { |
| std::promise<SchemaInfo> p; |
| client_.getSchemaInfoAsync(topic, version, [&p](Result result, const SchemaInfo& info) { |
| if (result == ResultOk) { |
| p.set_value(info); |
| } else { |
| p.set_exception(std::make_exception_ptr(std::runtime_error(strResult(result)))); |
| } |
| }); |
| return p.get_future().get(); |
| }; |
| { |
| ASSERT_EQ(msg1.getLongSchemaVersion(), 0); |
| const auto info = getSchemaInfo(topic, 0); |
| ASSERT_EQ(info.getSchemaType(), SchemaType::AVRO); |
| ASSERT_EQ(info.getSchema(), schema1); |
| } |
| { |
| ASSERT_EQ(msg2.getLongSchemaVersion(), 1); |
| const auto info = getSchemaInfo(topic, 1); |
| ASSERT_EQ(info.getSchemaType(), SchemaType::AVRO); |
| ASSERT_EQ(info.getSchema(), schema2); |
| } |
| { |
| const auto info = getSchemaInfo(topic, -1); |
| ASSERT_EQ(info.getSchemaType(), SchemaType::AVRO); |
| ASSERT_EQ(info.getSchema(), schema2); |
| } |
| try { |
| getSchemaInfo(topic, 2); |
| FAIL(); |
| } catch (const std::runtime_error& e) { |
| ASSERT_EQ(std::string{e.what()}, strResult(ResultTopicNotFound)); |
| } |
| try { |
| getSchemaInfo(topic + "-not-exist", 0); |
| FAIL(); |
| } catch (const std::runtime_error& e) { |
| ASSERT_EQ(std::string{e.what()}, strResult(ResultTopicNotFound)); |
| } |
| |
| consumer.close(); |
| producer1.close(); |
| producer2.close(); |
| } |
| |
| INSTANTIATE_TEST_SUITE_P(Pulsar, LookupServiceTest, ::testing::Values(binaryLookupUrl, httpLookupUrl)); |
| |
| class BinaryProtoLookupServiceRedirectTestHelper : public BinaryProtoLookupService { |
| public: |
| BinaryProtoLookupServiceRedirectTestHelper(ServiceNameResolver& serviceNameResolver, ConnectionPool& pool, |
| const ClientConfiguration& clientConfiguration) |
| : BinaryProtoLookupService(serviceNameResolver, pool, clientConfiguration) {} |
| |
| LookupResultFuture findBroker(const std::string& address, bool authoritative, const std::string& topic, |
| size_t redirectCount) { |
| return BinaryProtoLookupService::findBroker(address, authoritative, topic, redirectCount); |
| } |
| }; // class BinaryProtoLookupServiceRedirectTestHelper |
| |
| TEST(LookupServiceTest, testRedirectionLimit) { |
| const auto redirect_limit = 5; |
| AuthenticationPtr authData = AuthFactory::Disabled(); |
| ClientConfiguration conf; |
| conf.setMaxLookupRedirects(redirect_limit); |
| ExecutorServiceProviderPtr ioExecutorProvider_(std::make_shared<ExecutorServiceProvider>(1)); |
| ConnectionPool pool_(conf, ioExecutorProvider_, authData, true, ""); |
| std::string url = "pulsar://localhost:6650"; |
| ServiceNameResolver serviceNameResolver(url); |
| BinaryProtoLookupServiceRedirectTestHelper lookupService(serviceNameResolver, pool_, conf); |
| |
| const auto topicNamePtr = TopicName::get("topic"); |
| for (auto idx = 0; idx < redirect_limit + 5; ++idx) { |
| auto future = |
| lookupService.findBroker(serviceNameResolver.resolveHost(), false, topicNamePtr->toString(), idx); |
| LookupService::LookupResult lookupResult; |
| auto result = future.get(lookupResult); |
| |
| if (idx <= redirect_limit) { |
| ASSERT_EQ(ResultOk, result); |
| ASSERT_EQ(url, lookupResult.logicalAddress); |
| ASSERT_EQ(url, lookupResult.physicalAddress); |
| } else { |
| ASSERT_EQ(ResultTooManyLookupRequestException, result); |
| } |
| } |
| } |