blob: 5ac41224af9671ef65b96ed6879bd2be3fb1e705 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <BinaryProtoLookupService.h>
#include <Future.h>
#include <HTTPLookupService.h>
#include <Utils.h>
#include <gtest/gtest.h>
#include <pulsar/Authentication.h>
#include <pulsar/Client.h>
#include <algorithm>
#include <boost/exception/all.hpp>
#include "HttpHelper.h"
#include "PulsarFriend.h"
#include "lib/ClientConnection.h"
#include "lib/ConnectionPool.h"
#include "lib/LogUtils.h"
#include "lib/RetryableLookupService.h"
#include "lib/TimeUtils.h"
using namespace pulsar;
DECLARE_LOG_OBJECT()
TEST(LookupServiceTest, basicLookup) {
ExecutorServiceProviderPtr service = std::make_shared<ExecutorServiceProvider>(1);
AuthenticationPtr authData = AuthFactory::Disabled();
std::string url = "pulsar://localhost:6650";
ClientConfiguration conf;
ExecutorServiceProviderPtr ioExecutorProvider_(std::make_shared<ExecutorServiceProvider>(1));
ConnectionPool pool_(conf, ioExecutorProvider_, authData, true);
ServiceNameResolver serviceNameResolver(url);
BinaryProtoLookupService lookupService(serviceNameResolver, pool_, "");
TopicNamePtr topicName = TopicName::get("topic");
Future<Result, LookupDataResultPtr> partitionFuture = lookupService.getPartitionMetadataAsync(topicName);
LookupDataResultPtr lookupData;
Result result = partitionFuture.get(lookupData);
ASSERT_TRUE(lookupData != NULL);
ASSERT_EQ(0, lookupData->getPartitions());
const auto topicNamePtr = TopicName::get("topic");
auto future = lookupService.getBroker(*topicNamePtr);
LookupService::LookupResult lookupResult;
result = future.get(lookupResult);
ASSERT_EQ(ResultOk, result);
ASSERT_EQ(url, lookupResult.logicalAddress);
ASSERT_EQ(url, lookupResult.physicalAddress);
}
TEST(LookupServiceTest, basicGetNamespaceTopics) {
std::string url = "pulsar://localhost:6650";
std::string adminUrl = "http://localhost:8080/";
Result result;
// 1. create some topics under same namespace
Client client(url);
std::string topicName1 = "persistent://public/default/basicGetNamespaceTopics1";
std::string topicName2 = "persistent://public/default/basicGetNamespaceTopics2";
std::string topicName3 = "persistent://public/default/basicGetNamespaceTopics3";
// This is not in same namespace.
std::string topicName4 = "persistent://public/default-2/basicGetNamespaceTopics4";
// call admin api to make topics partitioned
std::string url1 = adminUrl + "admin/v2/persistent/public/default/basicGetNamespaceTopics1/partitions";
std::string url2 = adminUrl + "admin/v2/persistent/public/default/basicGetNamespaceTopics2/partitions";
std::string url3 = adminUrl + "admin/v2/persistent/public/default/basicGetNamespaceTopics3/partitions";
int res = makePutRequest(url1, "2");
ASSERT_FALSE(res != 204 && res != 409);
res = makePutRequest(url2, "3");
ASSERT_FALSE(res != 204 && res != 409);
res = makePutRequest(url3, "4");
ASSERT_FALSE(res != 204 && res != 409);
Producer producer1;
result = client.createProducer(topicName1, producer1);
ASSERT_EQ(ResultOk, result);
Producer producer2;
result = client.createProducer(topicName2, producer2);
ASSERT_EQ(ResultOk, result);
Producer producer3;
result = client.createProducer(topicName3, producer3);
ASSERT_EQ(ResultOk, result);
Producer producer4;
result = client.createProducer(topicName4, producer4);
ASSERT_EQ(ResultOk, result);
// 2. call getTopicsOfNamespaceAsync
ExecutorServiceProviderPtr service = std::make_shared<ExecutorServiceProvider>(1);
AuthenticationPtr authData = AuthFactory::Disabled();
ClientConfiguration conf;
ExecutorServiceProviderPtr ioExecutorProvider_(std::make_shared<ExecutorServiceProvider>(1));
ConnectionPool pool_(conf, ioExecutorProvider_, authData, true);
ServiceNameResolver serviceNameResolver(url);
BinaryProtoLookupService lookupService(serviceNameResolver, pool_, "");
TopicNamePtr topicName = TopicName::get(topicName1);
NamespaceNamePtr nsName = topicName->getNamespaceName();
Future<Result, NamespaceTopicsPtr> getTopicsFuture = lookupService.getTopicsOfNamespaceAsync(nsName);
NamespaceTopicsPtr topicsData;
result = getTopicsFuture.get(topicsData);
ASSERT_EQ(ResultOk, result);
ASSERT_TRUE(topicsData != NULL);
// 3. verify result contains first 3 topic
ASSERT_TRUE(std::find(topicsData->begin(), topicsData->end(), topicName1) != topicsData->end());
ASSERT_TRUE(std::find(topicsData->begin(), topicsData->end(), topicName2) != topicsData->end());
ASSERT_TRUE(std::find(topicsData->begin(), topicsData->end(), topicName3) != topicsData->end());
ASSERT_FALSE(std::find(topicsData->begin(), topicsData->end(), topicName4) != topicsData->end());
client.shutdown();
}
static void testMultiAddresses(LookupService& lookupService) {
std::vector<Result> results;
constexpr int numRequests = 6;
auto verifySuccessCount = [&results] {
// Only half of them succeeded
ASSERT_EQ(std::count(results.cbegin(), results.cend(), ResultOk), numRequests / 2);
ASSERT_EQ(std::count(results.cbegin(), results.cend(), ResultRetryable), numRequests / 2);
};
for (int i = 0; i < numRequests; i++) {
const auto topicNamePtr = TopicName::get("topic");
LookupService::LookupResult lookupResult;
const auto result = lookupService.getBroker(*topicNamePtr).get(lookupResult);
LOG_INFO("getBroker [" << i << "] " << result << ", " << lookupResult);
results.emplace_back(result);
}
verifySuccessCount();
results.clear();
for (int i = 0; i < numRequests; i++) {
LookupDataResultPtr data;
const auto result = lookupService.getPartitionMetadataAsync(TopicName::get("topic")).get(data);
LOG_INFO("getPartitionMetadataAsync [" << i << "] " << result);
results.emplace_back(result);
}
verifySuccessCount();
results.clear();
for (int i = 0; i < numRequests; i++) {
NamespaceTopicsPtr data;
const auto result =
lookupService.getTopicsOfNamespaceAsync(TopicName::get("topic")->getNamespaceName()).get(data);
LOG_INFO("getTopicsOfNamespaceAsync [" << i << "] " << result);
results.emplace_back(result);
}
verifySuccessCount();
}
TEST(LookupServiceTest, testMultiAddresses) {
ConnectionPool pool({}, std::make_shared<ExecutorServiceProvider>(1), AuthFactory::Disabled(), true);
ServiceNameResolver serviceNameResolver("pulsar://localhost,localhost:9999");
BinaryProtoLookupService binaryLookupService(serviceNameResolver, pool, "");
testMultiAddresses(binaryLookupService);
// HTTPLookupService calls shared_from_this() internally, we must create a shared pointer to test
ServiceNameResolver serviceNameResolverForHttp("http://localhost,localhost:9999");
auto httpLookupServicePtr = std::make_shared<HTTPLookupService>(
std::ref(serviceNameResolverForHttp), ClientConfiguration{}, AuthFactory::Disabled());
testMultiAddresses(*httpLookupServicePtr);
}
TEST(LookupServiceTest, testRetry) {
auto executorProvider = std::make_shared<ExecutorServiceProvider>(1);
ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), true);
ServiceNameResolver serviceNameResolver("pulsar://localhost:9999,localhost");
auto lookupService = RetryableLookupService::create(
std::make_shared<BinaryProtoLookupService>(serviceNameResolver, pool, ""), 30 /* seconds */,
executorProvider);
PulsarFriend::setServiceUrlIndex(serviceNameResolver, 0);
auto topicNamePtr = TopicName::get("lookup-service-test-retry");
auto future1 = lookupService->getBroker(*topicNamePtr);
LookupService::LookupResult lookupResult;
ASSERT_EQ(ResultOk, future1.get(lookupResult));
LOG_INFO("getBroker returns logicalAddress: " << lookupResult.logicalAddress
<< ", physicalAddress: " << lookupResult.physicalAddress);
PulsarFriend::setServiceUrlIndex(serviceNameResolver, 0);
auto future2 = lookupService->getPartitionMetadataAsync(topicNamePtr);
LookupDataResultPtr lookupDataResultPtr;
ASSERT_EQ(ResultOk, future2.get(lookupDataResultPtr));
LOG_INFO("getPartitionMetadataAsync returns " << lookupDataResultPtr->getPartitions() << " partitions");
PulsarFriend::setServiceUrlIndex(serviceNameResolver, 0);
auto future3 = lookupService->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName());
NamespaceTopicsPtr namespaceTopicsPtr;
ASSERT_EQ(ResultOk, future3.get(namespaceTopicsPtr));
LOG_INFO("getTopicPartitionName Async returns " << namespaceTopicsPtr->size() << " topics");
std::atomic_int retryCount{0};
constexpr int totalRetryCount = 3;
auto future4 = lookupService->executeAsync<int>("key", [&retryCount]() -> Future<Result, int> {
Promise<Result, int> promise;
if (++retryCount < totalRetryCount) {
LOG_INFO("Retry count: " << retryCount);
promise.setFailed(ResultRetryable);
} else {
LOG_INFO("Retry done with " << retryCount << " times");
promise.setValue(100);
}
return promise.getFuture();
});
int customResult = 0;
ASSERT_EQ(ResultOk, future4.get(customResult));
ASSERT_EQ(customResult, 100);
ASSERT_EQ(retryCount.load(), totalRetryCount);
ASSERT_EQ(PulsarFriend::getNumberOfPendingTasks(*lookupService), 0);
}
TEST(LookupServiceTest, testTimeout) {
auto executorProvider = std::make_shared<ExecutorServiceProvider>(1);
ConnectionPool pool({}, executorProvider, AuthFactory::Disabled(), true);
ServiceNameResolver serviceNameResolver("pulsar://localhost:9990,localhost:9902,localhost:9904");
constexpr int timeoutInSeconds = 2;
auto lookupService = RetryableLookupService::create(
std::make_shared<BinaryProtoLookupService>(serviceNameResolver, pool, ""), timeoutInSeconds,
executorProvider);
auto topicNamePtr = TopicName::get("lookup-service-test-retry");
decltype(std::chrono::high_resolution_clock::now()) startTime;
auto beforeMethod = [&startTime] { startTime = std::chrono::high_resolution_clock::now(); };
auto afterMethod = [&startTime](const std::string& name) {
auto timeInterval = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::high_resolution_clock::now() - startTime)
.count();
LOG_INFO(name << " took " << timeInterval << " seconds");
ASSERT_TRUE(timeInterval >= timeoutInSeconds * 1000L);
};
beforeMethod();
auto future1 = lookupService->getBroker(*topicNamePtr);
LookupService::LookupResult lookupResult;
ASSERT_EQ(ResultTimeout, future1.get(lookupResult));
afterMethod("getBroker");
beforeMethod();
auto future2 = lookupService->getPartitionMetadataAsync(topicNamePtr);
LookupDataResultPtr lookupDataResultPtr;
ASSERT_EQ(ResultTimeout, future2.get(lookupDataResultPtr));
afterMethod("getPartitionMetadataAsync");
beforeMethod();
auto future3 = lookupService->getTopicsOfNamespaceAsync(topicNamePtr->getNamespaceName());
NamespaceTopicsPtr namespaceTopicsPtr;
ASSERT_EQ(ResultTimeout, future3.get(namespaceTopicsPtr));
afterMethod("getTopicsOfNamespaceAsync");
ASSERT_EQ(PulsarFriend::getNumberOfPendingTasks(*lookupService), 0);
}