blob: 93b9db44466619a177798a48adeebcf60e6c930b [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "HTTPLookupService.h"
#include <pulsar/Version.h>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
#include "CurlWrapper.h"
#include "ExecutorService.h"
#include "Int64SerDes.h"
#include "LogUtils.h"
#include "NamespaceName.h"
#include "SchemaUtils.h"
#include "ServiceNameResolver.h"
#include "TopicName.h"
namespace ptree = boost::property_tree;
DECLARE_LOG_OBJECT()
namespace pulsar {
const static std::string V1_PATH = "/lookup/v2/destination/";
const static std::string V2_PATH = "/lookup/v2/topic/";
const static std::string ADMIN_PATH_V1 = "/admin/";
const static std::string ADMIN_PATH_V2 = "/admin/v2/";
const static std::string PARTITION_METHOD_NAME = "partitions";
const static int NUMBER_OF_LOOKUP_THREADS = 1;
HTTPLookupService::HTTPLookupService(const std::string &serviceUrl,
const ClientConfiguration &clientConfiguration,
const AuthenticationPtr &authData)
: executorProvider_(std::make_shared<ExecutorServiceProvider>(NUMBER_OF_LOOKUP_THREADS)),
serviceNameResolver_(serviceUrl),
authenticationPtr_(authData),
lookupTimeoutInSeconds_(clientConfiguration.getOperationTimeoutSeconds()),
maxLookupRedirects_(clientConfiguration.getMaxLookupRedirects()),
tlsPrivateFilePath_(clientConfiguration.getTlsPrivateKeyFilePath()),
tlsCertificateFilePath_(clientConfiguration.getTlsCertificateFilePath()),
tlsTrustCertsFilePath_(clientConfiguration.getTlsTrustCertsFilePath()),
isUseTls_(clientConfiguration.isUseTls()),
tlsAllowInsecure_(clientConfiguration.isTlsAllowInsecureConnection()),
tlsValidateHostname_(clientConfiguration.isValidateHostName()) {}
auto HTTPLookupService::getBroker(const TopicName &topicName) -> LookupResultFuture {
LookupResultPromise promise;
const auto &url = serviceNameResolver_.resolveHost();
std::stringstream completeUrlStream;
if (topicName.isV2Topic()) {
completeUrlStream << url << V2_PATH << topicName.getDomain() << "/" << topicName.getProperty() << '/'
<< topicName.getNamespacePortion() << '/' << topicName.getEncodedLocalName();
} else {
completeUrlStream << url << V1_PATH << topicName.getDomain() << "/" << topicName.getProperty() << '/'
<< topicName.getCluster() << '/' << topicName.getNamespacePortion() << '/'
<< topicName.getEncodedLocalName();
}
const auto completeUrl = completeUrlStream.str();
auto self = shared_from_this();
executorProvider_->get()->postWork([this, self, promise, completeUrl] {
std::string responseData;
Result result = sendHTTPRequest(completeUrl, responseData);
if (result != ResultOk) {
promise.setFailed(result);
} else {
const auto lookupDataResultPtr = parseLookupData(responseData);
const auto brokerAddress = (serviceNameResolver_.useTls() ? lookupDataResultPtr->getBrokerUrlTls()
: lookupDataResultPtr->getBrokerUrl());
promise.setValue({brokerAddress, brokerAddress});
}
});
return promise.getFuture();
}
Future<Result, LookupDataResultPtr> HTTPLookupService::getPartitionMetadataAsync(
const TopicNamePtr &topicName) {
LookupPromise promise;
std::stringstream completeUrlStream;
const auto &url = serviceNameResolver_.resolveHost();
if (topicName->isV2Topic()) {
completeUrlStream << url << ADMIN_PATH_V2 << topicName->getDomain() << '/' << topicName->getProperty()
<< '/' << topicName->getNamespacePortion() << '/'
<< topicName->getEncodedLocalName() << '/' << PARTITION_METHOD_NAME;
} else {
completeUrlStream << url << ADMIN_PATH_V1 << topicName->getDomain() << '/' << topicName->getProperty()
<< '/' << topicName->getCluster() << '/' << topicName->getNamespacePortion() << '/'
<< topicName->getEncodedLocalName() << '/' << PARTITION_METHOD_NAME;
}
completeUrlStream << "?checkAllowAutoCreation=true";
executorProvider_->get()->postWork(std::bind(&HTTPLookupService::handleLookupHTTPRequest,
shared_from_this(), promise, completeUrlStream.str(),
PartitionMetaData));
return promise.getFuture();
}
Future<Result, NamespaceTopicsPtr> HTTPLookupService::getTopicsOfNamespaceAsync(
const NamespaceNamePtr &nsName, CommandGetTopicsOfNamespace_Mode mode) {
NamespaceTopicsPromise promise;
std::stringstream completeUrlStream;
auto convertRegexSubMode = [](CommandGetTopicsOfNamespace_Mode mode) {
switch (mode) {
case CommandGetTopicsOfNamespace_Mode_PERSISTENT:
return "PERSISTENT";
case CommandGetTopicsOfNamespace_Mode_NON_PERSISTENT:
return "NON_PERSISTENT";
case CommandGetTopicsOfNamespace_Mode_ALL:
return "ALL";
default:
return "PERSISTENT";
}
};
const auto &url = serviceNameResolver_.resolveHost();
if (nsName->isV2()) {
completeUrlStream << url << ADMIN_PATH_V2 << "namespaces" << '/' << nsName->toString() << '/'
<< "topics?mode=" << convertRegexSubMode(mode);
} else {
completeUrlStream << url << ADMIN_PATH_V1 << "namespaces" << '/' << nsName->toString() << '/'
<< "destinations?mode=" << convertRegexSubMode(mode);
}
executorProvider_->get()->postWork(std::bind(&HTTPLookupService::handleNamespaceTopicsHTTPRequest,
shared_from_this(), promise, completeUrlStream.str()));
return promise.getFuture();
}
Future<Result, SchemaInfo> HTTPLookupService::getSchema(const TopicNamePtr &topicName,
const std::string &version) {
Promise<Result, SchemaInfo> promise;
std::stringstream completeUrlStream;
const auto &url = serviceNameResolver_.resolveHost();
if (topicName->isV2Topic()) {
completeUrlStream << url << ADMIN_PATH_V2 << "schemas/" << topicName->getProperty() << '/'
<< topicName->getNamespacePortion() << '/' << topicName->getEncodedLocalName()
<< "/schema";
} else {
completeUrlStream << url << ADMIN_PATH_V1 << "schemas/" << topicName->getProperty() << '/'
<< topicName->getCluster() << '/' << topicName->getNamespacePortion() << '/'
<< topicName->getEncodedLocalName() << "/schema";
}
if (!version.empty()) {
completeUrlStream << "/" << fromBigEndianBytes(version);
}
executorProvider_->get()->postWork(std::bind(&HTTPLookupService::handleGetSchemaHTTPRequest,
shared_from_this(), promise, completeUrlStream.str()));
return promise.getFuture();
}
void HTTPLookupService::handleNamespaceTopicsHTTPRequest(NamespaceTopicsPromise promise,
const std::string completeUrl) {
std::string responseData;
Result result = sendHTTPRequest(completeUrl, responseData);
if (result != ResultOk) {
promise.setFailed(result);
} else {
promise.setValue(parseNamespaceTopicsData(responseData));
}
}
Result HTTPLookupService::sendHTTPRequest(std::string completeUrl, std::string &responseData) {
long responseCode = -1;
return sendHTTPRequest(completeUrl, responseData, responseCode);
}
Result HTTPLookupService::sendHTTPRequest(std::string completeUrl, std::string &responseData,
long &responseCode) {
// Authorization data
AuthenticationDataPtr authDataContent;
Result authResult = authenticationPtr_->getAuthData(authDataContent);
if (authResult != ResultOk) {
LOG_ERROR("Failed to getAuthData: " << authResult);
return authResult;
}
CurlWrapper curl;
if (!curl.init()) {
LOG_ERROR("Unable to curl_easy_init for url " << completeUrl);
return ResultLookupError;
}
std::unique_ptr<CurlWrapper::TlsContext> tlsContext;
if (isUseTls_) {
tlsContext.reset(new CurlWrapper::TlsContext);
tlsContext->trustCertsFilePath = tlsTrustCertsFilePath_;
tlsContext->validateHostname = tlsValidateHostname_;
tlsContext->allowInsecure = tlsAllowInsecure_;
if (authDataContent->hasDataForTls()) {
tlsContext->certPath = authDataContent->getTlsCertificates();
tlsContext->keyPath = authDataContent->getTlsPrivateKey();
} else {
tlsContext->certPath = tlsCertificateFilePath_;
tlsContext->keyPath = tlsPrivateFilePath_;
}
}
LOG_INFO("Curl Lookup Request sent for " << completeUrl);
CurlWrapper::Options options;
options.timeoutInSeconds = lookupTimeoutInSeconds_;
options.userAgent = std::string("Pulsar-CPP-v") + PULSAR_VERSION_STR;
options.maxLookupRedirects = maxLookupRedirects_;
auto result = curl.get(completeUrl, authDataContent->getHttpHeaders(), options, tlsContext.get());
const auto &error = result.error;
if (!error.empty()) {
LOG_ERROR(completeUrl << " failed: " << error);
return ResultConnectError;
}
responseData = result.responseData;
responseCode = result.responseCode;
auto res = result.code;
if (res == CURLE_OK) {
LOG_INFO("Response received for url " << completeUrl << " responseCode " << responseCode);
} else if (res == CURLE_TOO_MANY_REDIRECTS) {
LOG_ERROR("Response received for url " << completeUrl << ": " << curl_easy_strerror(res)
<< ", curl error: " << result.serverError
<< ", redirect URL: " << result.redirectUrl);
} else {
LOG_ERROR("Response failed for url " << completeUrl << ": " << curl_easy_strerror(res)
<< ", curl error: " << result.serverError);
}
switch (res) {
case CURLE_OK:
return ResultOk;
case CURLE_COULDNT_CONNECT:
return ResultRetryable;
case CURLE_COULDNT_RESOLVE_PROXY:
case CURLE_COULDNT_RESOLVE_HOST:
case CURLE_HTTP_RETURNED_ERROR:
return ResultConnectError;
case CURLE_READ_ERROR:
return ResultReadError;
case CURLE_OPERATION_TIMEDOUT:
return ResultTimeout;
default:
return ResultLookupError;
}
}
LookupDataResultPtr HTTPLookupService::parsePartitionData(const std::string &json) {
ptree::ptree root;
std::stringstream stream;
stream << json;
try {
ptree::read_json(stream, root);
} catch (ptree::json_parser_error &e) {
LOG_ERROR("Failed to parse json of Partition Metadata: " << e.what() << "\nInput Json = " << json);
return LookupDataResultPtr();
}
LookupDataResultPtr lookupDataResultPtr = std::make_shared<LookupDataResult>();
lookupDataResultPtr->setPartitions(root.get<int>("partitions", 0));
LOG_INFO("parsePartitionData = " << *lookupDataResultPtr);
return lookupDataResultPtr;
}
LookupDataResultPtr HTTPLookupService::parseLookupData(const std::string &json) {
ptree::ptree root;
std::stringstream stream;
stream << json;
try {
ptree::read_json(stream, root);
} catch (ptree::json_parser_error &e) {
LOG_ERROR("Failed to parse json : " << e.what() << "\nInput Json = " << json);
return LookupDataResultPtr();
}
const std::string defaultNotFoundString = "Url Not found";
const std::string brokerUrl = root.get<std::string>("brokerUrl", defaultNotFoundString);
if (brokerUrl == defaultNotFoundString) {
LOG_ERROR("malformed json! - brokerUrl not present" << json);
return LookupDataResultPtr();
}
std::string brokerUrlTls = root.get<std::string>("brokerUrlTls", defaultNotFoundString);
if (brokerUrlTls == defaultNotFoundString) {
brokerUrlTls = root.get<std::string>("brokerUrlSsl", defaultNotFoundString);
if (brokerUrlTls == defaultNotFoundString) {
LOG_ERROR("malformed json! - brokerUrlTls not present" << json);
return LookupDataResultPtr();
}
}
LookupDataResultPtr lookupDataResultPtr = std::make_shared<LookupDataResult>();
lookupDataResultPtr->setBrokerUrl(brokerUrl);
lookupDataResultPtr->setBrokerUrlTls(brokerUrlTls);
LOG_INFO("parseLookupData = " << *lookupDataResultPtr);
return lookupDataResultPtr;
}
NamespaceTopicsPtr HTTPLookupService::parseNamespaceTopicsData(const std::string &json) {
LOG_DEBUG("GetNamespaceTopics json = " << json);
ptree::ptree root;
std::stringstream stream;
stream << json;
try {
ptree::read_json(stream, root);
} catch (ptree::json_parser_error &e) {
LOG_ERROR("Failed to parse json of Topics of Namespace: " << e.what() << "\nInput Json = " << json);
return NamespaceTopicsPtr();
}
// passed in json is like: ["topic1", "topic2"...]
// root will be an array of topics
std::set<std::string> topicSet;
// get all topics
for (const auto &item : root) {
// remove partition part
const std::string topicName = item.second.get_value<std::string>();
int pos = topicName.find("-partition-");
std::string filteredName = topicName.substr(0, pos);
// filter duped topic name
if (topicSet.find(filteredName) == topicSet.end()) {
topicSet.insert(filteredName);
}
}
NamespaceTopicsPtr topicsResultPtr =
std::make_shared<std::vector<std::string>>(topicSet.begin(), topicSet.end());
return topicsResultPtr;
}
void HTTPLookupService::handleLookupHTTPRequest(LookupPromise promise, const std::string completeUrl,
RequestType requestType) {
std::string responseData;
Result result = sendHTTPRequest(completeUrl, responseData);
if (result != ResultOk) {
promise.setFailed(result);
} else {
promise.setValue((requestType == PartitionMetaData) ? parsePartitionData(responseData)
: parseLookupData(responseData));
}
}
void HTTPLookupService::handleGetSchemaHTTPRequest(GetSchemaPromise promise, const std::string completeUrl) {
std::string responseData;
long responseCode = -1;
Result result = sendHTTPRequest(completeUrl, responseData, responseCode);
if (responseCode == 404) {
promise.setFailed(ResultTopicNotFound);
} else if (result != ResultOk) {
promise.setFailed(result);
} else {
ptree::ptree root;
std::stringstream stream(responseData);
try {
ptree::read_json(stream, root);
} catch (ptree::json_parser_error &e) {
LOG_ERROR("Failed to parse json of Partition Metadata: " << e.what()
<< "\nInput Json = " << responseData);
promise.setFailed(ResultInvalidMessage);
return;
}
const std::string defaultNotFoundString = "Not found";
auto schemaTypeStr = root.get<std::string>("type", defaultNotFoundString);
if (schemaTypeStr == defaultNotFoundString) {
LOG_ERROR("malformed json! - type not present" << responseData);
promise.setFailed(ResultInvalidMessage);
return;
}
auto schemaData = root.get<std::string>("data", defaultNotFoundString);
if (schemaData == defaultNotFoundString) {
LOG_ERROR("malformed json! - data not present" << responseData);
promise.setFailed(ResultInvalidMessage);
return;
}
auto schemaType = enumSchemaType(schemaTypeStr);
if (schemaType == KEY_VALUE) {
ptree::ptree kvRoot;
std::stringstream kvStream(schemaData);
try {
ptree::read_json(kvStream, kvRoot);
} catch (ptree::json_parser_error &e) {
LOG_ERROR("Failed to parse json of Partition Metadata: " << e.what()
<< "\nInput Json = " << schemaData);
promise.setFailed(ResultInvalidMessage);
return;
}
std::stringstream keyStream;
ptree::write_json(keyStream, kvRoot.get_child("key"), false);
std::stringstream valueStream;
ptree::write_json(valueStream, kvRoot.get_child("value"), false);
auto keyData = keyStream.str();
auto valueData = valueStream.str();
// Remove the last line break.
keyData.pop_back();
valueData.pop_back();
schemaData = mergeKeyValueSchema(keyData, valueData);
}
StringMap properties;
auto propertiesTree = root.get_child("properties");
for (const auto &item : propertiesTree) {
properties[item.first] = item.second.get_value<std::string>();
}
SchemaInfo schemaInfo = SchemaInfo(schemaType, "", schemaData, properties);
promise.setValue(schemaInfo);
}
}
} // namespace pulsar