blob: 340f67c050e53b74eb224122b5a2072e999020ea [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <lib/HTTPLookupService.h>
#include <curl/curl.h>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
namespace ptree = boost::property_tree;
DECLARE_LOG_OBJECT()
namespace pulsar {
const static std::string V1_PATH = "/lookup/v2/destination/";
const static std::string V2_PATH = "/lookup/v2/topic/";
const static std::string ADMIN_PATH_V1 = "/admin/";
const static std::string ADMIN_PATH_V2 = "/admin/v2/";
const static int MAX_HTTP_REDIRECTS = 20;
const static std::string PARTITION_METHOD_NAME = "partitions";
const static int NUMBER_OF_LOOKUP_THREADS = 1;
static inline bool needRedirection(long code) { return (code == 307 || code == 302 || code == 301); }
HTTPLookupService::CurlInitializer::CurlInitializer() {
// Once per application - https://curl.haxx.se/mail/lib-2015-11/0052.html
curl_global_init(CURL_GLOBAL_ALL);
}
HTTPLookupService::CurlInitializer::~CurlInitializer() { curl_global_cleanup(); }
HTTPLookupService::CurlInitializer HTTPLookupService::curlInitializer;
HTTPLookupService::HTTPLookupService(const std::string &lookupUrl,
const ClientConfiguration &clientConfiguration,
const AuthenticationPtr &authData)
: executorProvider_(std::make_shared<ExecutorServiceProvider>(NUMBER_OF_LOOKUP_THREADS)),
authenticationPtr_(authData),
lookupTimeoutInSeconds_(clientConfiguration.getOperationTimeoutSeconds()),
tlsTrustCertsFilePath_(clientConfiguration.getTlsTrustCertsFilePath()),
isUseTls_(clientConfiguration.isUseTls()),
tlsAllowInsecure_(clientConfiguration.isTlsAllowInsecureConnection()),
tlsValidateHostname_(clientConfiguration.isValidateHostName()) {
if (lookupUrl[lookupUrl.length() - 1] == '/') {
// Remove trailing '/'
adminUrl_ = lookupUrl.substr(0, lookupUrl.length() - 1);
} else {
adminUrl_ = lookupUrl;
}
}
Future<Result, LookupDataResultPtr> HTTPLookupService::lookupAsync(const std::string &topic) {
LookupPromise promise;
std::shared_ptr<TopicName> topicName = TopicName::get(topic);
if (!topicName) {
LOG_ERROR("Unable to parse topic - " << topic);
promise.setFailed(ResultInvalidTopicName);
return promise.getFuture();
}
std::stringstream completeUrlStream;
if (topicName->isV2Topic()) {
completeUrlStream << adminUrl_ << V2_PATH << topicName->getDomain() << "/" << topicName->getProperty()
<< '/' << topicName->getNamespacePortion() << '/'
<< topicName->getEncodedLocalName();
} else {
completeUrlStream << adminUrl_ << V1_PATH << topicName->getDomain() << "/" << topicName->getProperty()
<< '/' << topicName->getCluster() << '/' << topicName->getNamespacePortion() << '/'
<< topicName->getEncodedLocalName();
}
executorProvider_->get()->postWork(std::bind(&HTTPLookupService::handleLookupHTTPRequest,
shared_from_this(), promise, completeUrlStream.str(),
Lookup));
return promise.getFuture();
}
Future<Result, LookupDataResultPtr> HTTPLookupService::getPartitionMetadataAsync(
const TopicNamePtr &topicName) {
LookupPromise promise;
std::stringstream completeUrlStream;
if (topicName->isV2Topic()) {
completeUrlStream << adminUrl_ << ADMIN_PATH_V2 << topicName->getDomain() << '/'
<< topicName->getProperty() << '/' << topicName->getNamespacePortion() << '/'
<< topicName->getEncodedLocalName() << '/' << PARTITION_METHOD_NAME;
} else {
completeUrlStream << adminUrl_ << ADMIN_PATH_V1 << topicName->getDomain() << '/'
<< topicName->getProperty() << '/' << topicName->getCluster() << '/'
<< topicName->getNamespacePortion() << '/' << topicName->getEncodedLocalName()
<< '/' << PARTITION_METHOD_NAME;
}
completeUrlStream << "?checkAllowAutoCreation=true";
executorProvider_->get()->postWork(std::bind(&HTTPLookupService::handleLookupHTTPRequest,
shared_from_this(), promise, completeUrlStream.str(),
PartitionMetaData));
return promise.getFuture();
}
Future<Result, NamespaceTopicsPtr> HTTPLookupService::getTopicsOfNamespaceAsync(
const NamespaceNamePtr &nsName) {
NamespaceTopicsPromise promise;
std::stringstream completeUrlStream;
if (nsName->isV2()) {
completeUrlStream << adminUrl_ << ADMIN_PATH_V2 << "namespaces" << '/' << nsName->toString() << '/'
<< "topics";
} else {
completeUrlStream << adminUrl_ << ADMIN_PATH_V1 << "namespaces" << '/' << nsName->toString() << '/'
<< "destinations";
}
executorProvider_->get()->postWork(std::bind(&HTTPLookupService::handleNamespaceTopicsHTTPRequest,
shared_from_this(), promise, completeUrlStream.str()));
return promise.getFuture();
}
static size_t curlWriteCallback(void *contents, size_t size, size_t nmemb, void *responseDataPtr) {
((std::string *)responseDataPtr)->append((char *)contents, size * nmemb);
return size * nmemb;
}
void HTTPLookupService::handleNamespaceTopicsHTTPRequest(NamespaceTopicsPromise promise,
const std::string completeUrl) {
std::string responseData;
Result result = sendHTTPRequest(completeUrl, responseData);
if (result != ResultOk) {
promise.setFailed(result);
} else {
promise.setValue(parseNamespaceTopicsData(responseData));
}
}
Result HTTPLookupService::sendHTTPRequest(std::string completeUrl, std::string &responseData) {
uint16_t reqCount = 0;
Result retResult = ResultOk;
while (++reqCount <= MAX_HTTP_REDIRECTS) {
CURL *handle;
CURLcode res;
std::string version = std::string("Pulsar-CPP-v") + _PULSAR_VERSION_INTERNAL_;
handle = curl_easy_init();
if (!handle) {
LOG_ERROR("Unable to curl_easy_init for url " << completeUrl);
// No curl_easy_cleanup required since handle not initialized
return ResultLookupError;
}
// set URL
curl_easy_setopt(handle, CURLOPT_URL, completeUrl.c_str());
// Write callback
curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlWriteCallback);
curl_easy_setopt(handle, CURLOPT_WRITEDATA, &responseData);
// New connection is made for each call
curl_easy_setopt(handle, CURLOPT_FRESH_CONNECT, 1L);
curl_easy_setopt(handle, CURLOPT_FORBID_REUSE, 1L);
// Skipping signal handling - results in timeouts not honored during the DNS lookup
curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1L);
// Timer
curl_easy_setopt(handle, CURLOPT_TIMEOUT, lookupTimeoutInSeconds_);
// Set User Agent
curl_easy_setopt(handle, CURLOPT_USERAGENT, version.c_str());
// Fail if HTTP return code >=400
curl_easy_setopt(handle, CURLOPT_FAILONERROR, 1L);
// Authorization data
AuthenticationDataPtr authDataContent;
Result authResult = authenticationPtr_->getAuthData(authDataContent);
if (authResult != ResultOk) {
LOG_ERROR("Failed to getAuthData: " << authResult);
curl_easy_cleanup(handle);
return authResult;
}
struct curl_slist *list = NULL;
if (authDataContent->hasDataForHttp()) {
list = curl_slist_append(list, authDataContent->getHttpHeaders().c_str());
}
curl_easy_setopt(handle, CURLOPT_HTTPHEADER, list);
// TLS
if (isUseTls_) {
if (curl_easy_setopt(handle, CURLOPT_SSLENGINE, NULL) != CURLE_OK) {
LOG_ERROR("Unable to load SSL engine for url " << completeUrl);
curl_easy_cleanup(handle);
return ResultConnectError;
}
if (curl_easy_setopt(handle, CURLOPT_SSLENGINE_DEFAULT, 1L) != CURLE_OK) {
LOG_ERROR("Unable to load SSL engine as default, for url " << completeUrl);
curl_easy_cleanup(handle);
return ResultConnectError;
}
curl_easy_setopt(handle, CURLOPT_SSLCERTTYPE, "PEM");
if (tlsAllowInsecure_) {
curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 0L);
} else {
curl_easy_setopt(handle, CURLOPT_SSL_VERIFYPEER, 1L);
}
if (!tlsTrustCertsFilePath_.empty()) {
curl_easy_setopt(handle, CURLOPT_CAINFO, tlsTrustCertsFilePath_.c_str());
}
curl_easy_setopt(handle, CURLOPT_SSL_VERIFYHOST, tlsValidateHostname_ ? 1L : 0L);
if (authDataContent->hasDataForTls()) {
curl_easy_setopt(handle, CURLOPT_SSLCERT, authDataContent->getTlsCertificates().c_str());
curl_easy_setopt(handle, CURLOPT_SSLKEY, authDataContent->getTlsPrivateKey().c_str());
}
}
LOG_INFO("Curl [" << reqCount << "] Lookup Request sent for " << completeUrl);
// Make get call to server
res = curl_easy_perform(handle);
long response_code = -1;
curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &response_code);
LOG_INFO("Response received for url " << completeUrl << " response_code " << response_code
<< " curl res " << res);
// Free header list
curl_slist_free_all(list);
switch (res) {
case CURLE_OK:
long response_code;
curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &response_code);
LOG_INFO("Response received for url " << completeUrl << " code " << response_code);
if (response_code == 200) {
retResult = ResultOk;
} else if (needRedirection(response_code)) {
char *url = NULL;
curl_easy_getinfo(handle, CURLINFO_REDIRECT_URL, &url);
LOG_INFO("Response from url " << completeUrl << " to new url " << url);
completeUrl = url;
retResult = ResultLookupError;
} else {
retResult = ResultLookupError;
}
break;
case CURLE_COULDNT_CONNECT:
case CURLE_COULDNT_RESOLVE_PROXY:
case CURLE_COULDNT_RESOLVE_HOST:
case CURLE_HTTP_RETURNED_ERROR:
LOG_ERROR("Response failed for url " << completeUrl << ". Error Code " << res);
retResult = ResultConnectError;
break;
case CURLE_READ_ERROR:
LOG_ERROR("Response failed for url " << completeUrl << ". Error Code " << res);
retResult = ResultReadError;
break;
case CURLE_OPERATION_TIMEDOUT:
LOG_ERROR("Response failed for url " << completeUrl << ". Error Code " << res);
retResult = ResultTimeout;
break;
default:
LOG_ERROR("Response failed for url " << completeUrl << ". Error Code " << res);
retResult = ResultLookupError;
break;
}
curl_easy_cleanup(handle);
if (!needRedirection(response_code)) {
break;
}
}
return retResult;
}
LookupDataResultPtr HTTPLookupService::parsePartitionData(const std::string &json) {
ptree::ptree root;
std::stringstream stream;
stream << json;
try {
ptree::read_json(stream, root);
} catch (ptree::json_parser_error &e) {
LOG_ERROR("Failed to parse json of Partition Metadata: " << e.what() << "\nInput Json = " << json);
return LookupDataResultPtr();
}
LookupDataResultPtr lookupDataResultPtr = std::make_shared<LookupDataResult>();
lookupDataResultPtr->setPartitions(root.get<int>("partitions", 0));
LOG_INFO("parsePartitionData = " << *lookupDataResultPtr);
return lookupDataResultPtr;
}
LookupDataResultPtr HTTPLookupService::parseLookupData(const std::string &json) {
ptree::ptree root;
std::stringstream stream;
stream << json;
try {
ptree::read_json(stream, root);
} catch (ptree::json_parser_error &e) {
LOG_ERROR("Failed to parse json : " << e.what() << "\nInput Json = " << json);
return LookupDataResultPtr();
}
const std::string defaultNotFoundString = "Url Not found";
const std::string brokerUrl = root.get<std::string>("brokerUrl", defaultNotFoundString);
if (brokerUrl == defaultNotFoundString) {
LOG_ERROR("malformed json! - brokerUrl not present" << json);
return LookupDataResultPtr();
}
std::string brokerUrlTls = root.get<std::string>("brokerUrlTls", defaultNotFoundString);
if (brokerUrlTls == defaultNotFoundString) {
brokerUrlTls = root.get<std::string>("brokerUrlSsl", defaultNotFoundString);
if (brokerUrlTls == defaultNotFoundString) {
LOG_ERROR("malformed json! - brokerUrlTls not present" << json);
return LookupDataResultPtr();
}
}
LookupDataResultPtr lookupDataResultPtr = std::make_shared<LookupDataResult>();
lookupDataResultPtr->setBrokerUrl(brokerUrl);
lookupDataResultPtr->setBrokerUrlTls(brokerUrlTls);
LOG_INFO("parseLookupData = " << *lookupDataResultPtr);
return lookupDataResultPtr;
}
NamespaceTopicsPtr HTTPLookupService::parseNamespaceTopicsData(const std::string &json) {
LOG_DEBUG("GetNamespaceTopics json = " << json);
ptree::ptree root;
std::stringstream stream;
stream << json;
try {
ptree::read_json(stream, root);
} catch (ptree::json_parser_error &e) {
LOG_ERROR("Failed to parse json of Topics of Namespace: " << e.what() << "\nInput Json = " << json);
return NamespaceTopicsPtr();
}
// passed in json is like: ["topic1", "topic2"...]
// root will be an array of topics
std::set<std::string> topicSet;
// get all topics
for (const auto &item : root) {
// remove partition part
const std::string topicName = item.second.get_value<std::string>();
int pos = topicName.find("-partition-");
std::string filteredName = topicName.substr(0, pos);
// filter duped topic name
if (topicSet.find(filteredName) == topicSet.end()) {
topicSet.insert(filteredName);
}
}
NamespaceTopicsPtr topicsResultPtr =
std::make_shared<std::vector<std::string>>(topicSet.begin(), topicSet.end());
return topicsResultPtr;
}
void HTTPLookupService::handleLookupHTTPRequest(LookupPromise promise, const std::string completeUrl,
RequestType requestType) {
std::string responseData;
Result result = sendHTTPRequest(completeUrl, responseData);
if (result != ResultOk) {
promise.setFailed(result);
} else {
promise.setValue((requestType == PartitionMetaData) ? parsePartitionData(responseData)
: parseLookupData(responseData));
}
}
} // namespace pulsar