blob: c8536bd3401f6d6cec82bf4940f8622ccacc8611 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <lib/HTTPLookupService.h>
DECLARE_LOG_OBJECT()
namespace pulsar {
const static std::string V2_PATH = "/lookup/v2/destination/";
const static std::string PARTITION_PATH = "/admin/persistent/";
const static int MAX_HTTP_REDIRECTS = 20;
const static std::string PARTITION_METHOD_NAME = "partitions";
const static int NUMBER_OF_LOOKUP_THREADS = 1;
HTTPLookupService::CurlInitializer HTTPLookupService::curlInitializer;
HTTPLookupService::HTTPLookupService(const std::string &lookupUrl,
const ClientConfiguration &clientConfiguration,
const AuthenticationPtr &authData)
: executorProvider_(boost::make_shared<ExecutorServiceProvider>(NUMBER_OF_LOOKUP_THREADS)),
authenticationPtr_(authData),
lookupTimeoutInSeconds_(clientConfiguration.getOperationTimeoutSeconds()) {
if (lookupUrl[lookupUrl.length() - 1] == '/') {
// Remove trailing '/'
adminUrl_ = lookupUrl.substr(0, lookupUrl.length() - 1);
} else {
adminUrl_ = lookupUrl;
}
}
Future<Result, LookupDataResultPtr> HTTPLookupService::lookupAsync(const std::string &destinationName) {
LookupPromise promise;
boost::shared_ptr<DestinationName> dn = DestinationName::get(destinationName);
if (!dn) {
LOG_ERROR("Unable to parse destination - " << destinationName);
promise.setFailed(ResultInvalidTopicName);
return promise.getFuture();
}
std::stringstream completeUrlStream;
completeUrlStream << adminUrl_ << V2_PATH << "persistent/" << dn->getProperty() << '/' << dn->getCluster()
<< '/' << dn->getNamespacePortion() << '/' << dn->getEncodedLocalName();
executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::sendHTTPRequest, shared_from_this(), promise, completeUrlStream.str(), Lookup));
return promise.getFuture();
}
Future<Result, LookupDataResultPtr> HTTPLookupService::getPartitionMetadataAsync(const DestinationNamePtr &dn) {
LookupPromise promise;
std::stringstream completeUrlStream;
completeUrlStream << adminUrl_ << PARTITION_PATH << dn->getProperty() << '/' << dn->getCluster()
<< '/' << dn->getNamespacePortion() << '/' << dn->getEncodedLocalName() << '/'
<< PARTITION_METHOD_NAME;
executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::sendHTTPRequest, shared_from_this(), promise, completeUrlStream.str(), PartitionMetaData));
return promise.getFuture();
}
static size_t curlWriteCallback(void *contents, size_t size, size_t nmemb, void *responseDataPtr) {
((std::string*)responseDataPtr)->append((char*)contents, size * nmemb);
return size * nmemb;
}
void HTTPLookupService::sendHTTPRequest(LookupPromise promise, const std::string completeUrl,
RequestType requestType) {
CURL *handle;
CURLcode res;
std::string responseData;
std::string version = std::string("Pulsar-CPP-v") + _PULSAR_VERSION_;
handle = curl_easy_init();
if(!handle) {
LOG_ERROR("Unable to curl_easy_init for url " << completeUrl);
promise.setFailed(ResultLookupError);
// No curl_easy_cleanup required since handle not initialized
return;
}
// set URL
curl_easy_setopt(handle, CURLOPT_URL, completeUrl.c_str());
// Write callback
curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlWriteCallback);
curl_easy_setopt(handle, CURLOPT_WRITEDATA, &responseData);
// New connection is made for each call
curl_easy_setopt(handle, CURLOPT_FRESH_CONNECT, 1L);
curl_easy_setopt(handle, CURLOPT_FORBID_REUSE, 1L);
// Skipping signal handling - results in timeouts not honored during the DNS lookup
curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1L);
// Timer
curl_easy_setopt(handle, CURLOPT_TIMEOUT, lookupTimeoutInSeconds_);
// Set User Agent
curl_easy_setopt(handle, CURLOPT_USERAGENT, version.c_str());
// Redirects
curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1L);
curl_easy_setopt(handle, CURLOPT_MAXREDIRS, MAX_HTTP_REDIRECTS);
// Fail if HTTP return code >=400
curl_easy_setopt(handle, CURLOPT_FAILONERROR, 1L);
// Authorization data
AuthenticationDataPtr authDataContent;
Result authResult = authenticationPtr_->getAuthData(authDataContent);
if (authResult != ResultOk) {
LOG_ERROR("All Authentication methods should have AuthenticationData and return true on getAuthData for url " << completeUrl);
promise.setFailed(authResult);
curl_easy_cleanup(handle);
return;
}
struct curl_slist *list = NULL;
if (authDataContent->hasDataForHttp()) {
list = curl_slist_append(list, authDataContent->getHttpHeaders().c_str());
}
curl_easy_setopt(handle, CURLOPT_HTTPHEADER, list);
LOG_INFO("Curl Lookup Request sent for" << completeUrl);
// Make get call to server
res = curl_easy_perform(handle);
// Free header list
curl_slist_free_all(list);
switch(res) {
case CURLE_OK:
long response_code;
curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &response_code);
LOG_INFO("Response received for url " << completeUrl << " code " << response_code);
if (response_code == 200) {
promise.setValue((requestType == PartitionMetaData) ? parsePartitionData(responseData) : parseLookupData(responseData));
} else {
promise.setFailed(ResultLookupError);
}
break;
case CURLE_COULDNT_CONNECT:
case CURLE_COULDNT_RESOLVE_PROXY:
case CURLE_COULDNT_RESOLVE_HOST:
case CURLE_HTTP_RETURNED_ERROR:
LOG_ERROR("Response failed for url "<<completeUrl << ". Error Code "<<res);
promise.setFailed(ResultConnectError);
break;
case CURLE_READ_ERROR:
LOG_ERROR("Response failed for url "<<completeUrl << ". Error Code "<<res);
promise.setFailed(ResultReadError);
break;
case CURLE_OPERATION_TIMEDOUT:
LOG_ERROR("Response failed for url "<<completeUrl << ". Error Code "<<res);
promise.setFailed(ResultTimeout);
break;
default:
LOG_ERROR("Response failed for url "<<completeUrl << ". Error Code "<<res);
promise.setFailed(ResultLookupError);
break;
}
curl_easy_cleanup(handle);
}
LookupDataResultPtr HTTPLookupService::parsePartitionData(const std::string &json) {
Json::Value root;
Json::Reader reader;
if (!reader.parse(json, root, false)) {
LOG_ERROR("Failed to parse json of Partition Metadata: " << reader.getFormatedErrorMessages()
<< "\nInput Json = " << json);
return LookupDataResultPtr();
}
LookupDataResultPtr lookupDataResultPtr = boost::make_shared<LookupDataResult>();
lookupDataResultPtr->setPartitions(root.get("partitions", 0).asInt());
LOG_INFO("parsePartitionData = "<<*lookupDataResultPtr);
return lookupDataResultPtr;
}
LookupDataResultPtr HTTPLookupService::parseLookupData(const std::string &json) {
Json::Value root;
Json::Reader reader;
if (!reader.parse(json, root, false)) {
LOG_ERROR("Failed to parse json : " << reader.getFormatedErrorMessages()
<< "\nInput Json = " << json);
return LookupDataResultPtr();
}
const std::string defaultNotFoundString = "Url Not found";
const std::string brokerUrl = root.get("brokerUrl", defaultNotFoundString).asString();
if (brokerUrl == defaultNotFoundString) {
LOG_ERROR("malformed json! - brokerUrl not present" << json);
return LookupDataResultPtr();
}
const std::string brokerUrlSsl = root.get("brokerUrlSsl", defaultNotFoundString).asString();
if (brokerUrlSsl == defaultNotFoundString) {
LOG_ERROR("malformed json! - brokerUrlSsl not present" << json);
return LookupDataResultPtr();
}
LookupDataResultPtr lookupDataResultPtr = boost::make_shared<LookupDataResult>();
lookupDataResultPtr->setBrokerUrl(brokerUrl);
lookupDataResultPtr->setBrokerUrlSsl(brokerUrlSsl);
LOG_INFO("parseLookupData = "<<*lookupDataResultPtr);
return lookupDataResultPtr;
}
}