| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| #include <lib/HTTPLookupService.h> |
| |
| DECLARE_LOG_OBJECT() |
| |
| namespace pulsar { |
// URL path segments appended to the service base URL when building requests.
static const std::string V2_PATH("/lookup/v2/destination/");
static const std::string PARTITION_PATH("/admin/persistent/");
static const std::string PARTITION_METHOD_NAME("partitions");

// Upper bound handed to libcurl for the number of HTTP redirects to follow.
static const int MAX_HTTP_REDIRECTS = 20;

// Number of threads requested from the executor provider that runs lookups.
static const int NUMBER_OF_LOOKUP_THREADS = 1;
| |
// Out-of-class definition of the static CurlInitializer member of HTTPLookupService.
// NOTE(review): presumably its constructor/destructor perform process-wide libcurl
// setup/teardown - confirm against the CurlInitializer declaration.
| HTTPLookupService::CurlInitializer HTTPLookupService::curlInitializer; |
| |
| HTTPLookupService::HTTPLookupService(const std::string &lookupUrl, |
| const ClientConfiguration &clientConfiguration, |
| const AuthenticationPtr &authData) |
| : executorProvider_(boost::make_shared<ExecutorServiceProvider>(NUMBER_OF_LOOKUP_THREADS)), |
| authenticationPtr_(authData), |
| lookupTimeoutInSeconds_(clientConfiguration.getOperationTimeoutSeconds()) { |
| if (lookupUrl[lookupUrl.length() - 1] == '/') { |
| // Remove trailing '/' |
| adminUrl_ = lookupUrl.substr(0, lookupUrl.length() - 1); |
| } else { |
| adminUrl_ = lookupUrl; |
| } |
| } |
| |
| Future<Result, LookupDataResultPtr> HTTPLookupService::lookupAsync(const std::string &destinationName) { |
| LookupPromise promise; |
| boost::shared_ptr<DestinationName> dn = DestinationName::get(destinationName); |
| if (!dn) { |
| LOG_ERROR("Unable to parse destination - " << destinationName); |
| promise.setFailed(ResultInvalidTopicName); |
| return promise.getFuture(); |
| } |
| |
| std::stringstream completeUrlStream; |
| completeUrlStream << adminUrl_ << V2_PATH << "persistent/" << dn->getProperty() << '/' << dn->getCluster() |
| << '/' << dn->getNamespacePortion() << '/' << dn->getEncodedLocalName(); |
| executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::sendHTTPRequest, shared_from_this(), promise, completeUrlStream.str(), Lookup)); |
| return promise.getFuture(); |
| } |
| |
| Future<Result, LookupDataResultPtr> HTTPLookupService::getPartitionMetadataAsync(const DestinationNamePtr &dn) { |
| LookupPromise promise; |
| std::stringstream completeUrlStream; |
| completeUrlStream << adminUrl_ << PARTITION_PATH << dn->getProperty() << '/' << dn->getCluster() |
| << '/' << dn->getNamespacePortion() << '/' << dn->getEncodedLocalName() << '/' |
| << PARTITION_METHOD_NAME; |
| executorProvider_->get()->postWork(boost::bind(&HTTPLookupService::sendHTTPRequest, shared_from_this(), promise, completeUrlStream.str(), PartitionMetaData)); |
| return promise.getFuture(); |
| } |
| |
/**
 * Write callback registered with libcurl: appends the received chunk to the std::string
 * that responseDataPtr points to.
 *
 * @param contents start of the received data
 * @param size size of one data element in bytes
 * @param nmemb number of data elements
 * @param responseDataPtr pointer to the std::string collecting the response body
 * @return number of bytes handled (size * nmemb)
 */
static size_t curlWriteCallback(void *contents, size_t size, size_t nmemb, void *responseDataPtr) {
    const size_t chunkBytes = size * nmemb;
    std::string *sink = static_cast<std::string *>(responseDataPtr);
    const char *chunk = static_cast<const char *>(contents);
    sink->append(chunk, chunkBytes);
    return chunkBytes;
}
| |
| void HTTPLookupService::sendHTTPRequest(LookupPromise promise, const std::string completeUrl, |
| RequestType requestType) { |
| CURL *handle; |
| CURLcode res; |
| std::string responseData; |
| std::string version = std::string("Pulsar-CPP-v") + _PULSAR_VERSION_; |
| handle = curl_easy_init(); |
| |
| if(!handle) { |
| LOG_ERROR("Unable to curl_easy_init for url " << completeUrl); |
| promise.setFailed(ResultLookupError); |
| // No curl_easy_cleanup required since handle not initialized |
| return; |
| } |
| // set URL |
| curl_easy_setopt(handle, CURLOPT_URL, completeUrl.c_str()); |
| |
| // Write callback |
| curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, curlWriteCallback); |
| curl_easy_setopt(handle, CURLOPT_WRITEDATA, &responseData); |
| |
| // New connection is made for each call |
| curl_easy_setopt(handle, CURLOPT_FRESH_CONNECT, 1L); |
| curl_easy_setopt(handle, CURLOPT_FORBID_REUSE, 1L); |
| |
| // Skipping signal handling - results in timeouts not honored during the DNS lookup |
| curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1L); |
| |
| // Timer |
| curl_easy_setopt(handle, CURLOPT_TIMEOUT, lookupTimeoutInSeconds_); |
| |
| // Set User Agent |
| curl_easy_setopt(handle, CURLOPT_USERAGENT, version.c_str()); |
| |
| // Redirects |
| curl_easy_setopt(handle, CURLOPT_FOLLOWLOCATION, 1L); |
| curl_easy_setopt(handle, CURLOPT_MAXREDIRS, MAX_HTTP_REDIRECTS); |
| |
| // Fail if HTTP return code >=400 |
| curl_easy_setopt(handle, CURLOPT_FAILONERROR, 1L); |
| |
| // Authorization data |
| AuthenticationDataPtr authDataContent; |
| Result authResult = authenticationPtr_->getAuthData(authDataContent); |
| if (authResult != ResultOk) { |
| LOG_ERROR("All Authentication methods should have AuthenticationData and return true on getAuthData for url " << completeUrl); |
| promise.setFailed(authResult); |
| curl_easy_cleanup(handle); |
| return; |
| } |
| struct curl_slist *list = NULL; |
| if (authDataContent->hasDataForHttp()) { |
| list = curl_slist_append(list, authDataContent->getHttpHeaders().c_str()); |
| } |
| curl_easy_setopt(handle, CURLOPT_HTTPHEADER, list); |
| |
| LOG_INFO("Curl Lookup Request sent for" << completeUrl); |
| |
| // Make get call to server |
| res = curl_easy_perform(handle); |
| |
| // Free header list |
| curl_slist_free_all(list); |
| |
| switch(res) { |
| case CURLE_OK: |
| long response_code; |
| curl_easy_getinfo(handle, CURLINFO_RESPONSE_CODE, &response_code); |
| LOG_INFO("Response received for url " << completeUrl << " code " << response_code); |
| if (response_code == 200) { |
| promise.setValue((requestType == PartitionMetaData) ? parsePartitionData(responseData) : parseLookupData(responseData)); |
| } else { |
| promise.setFailed(ResultLookupError); |
| } |
| break; |
| case CURLE_COULDNT_CONNECT: |
| case CURLE_COULDNT_RESOLVE_PROXY: |
| case CURLE_COULDNT_RESOLVE_HOST: |
| case CURLE_HTTP_RETURNED_ERROR: |
| LOG_ERROR("Response failed for url "<<completeUrl << ". Error Code "<<res); |
| promise.setFailed(ResultConnectError); |
| break; |
| case CURLE_READ_ERROR: |
| LOG_ERROR("Response failed for url "<<completeUrl << ". Error Code "<<res); |
| promise.setFailed(ResultReadError); |
| break; |
| case CURLE_OPERATION_TIMEDOUT: |
| LOG_ERROR("Response failed for url "<<completeUrl << ". Error Code "<<res); |
| promise.setFailed(ResultTimeout); |
| break; |
| default: |
| LOG_ERROR("Response failed for url "<<completeUrl << ". Error Code "<<res); |
| promise.setFailed(ResultLookupError); |
| break; |
| } |
| curl_easy_cleanup(handle); |
| } |
| |
| LookupDataResultPtr HTTPLookupService::parsePartitionData(const std::string &json) { |
| Json::Value root; |
| Json::Reader reader; |
| if (!reader.parse(json, root, false)) { |
| LOG_ERROR("Failed to parse json of Partition Metadata: " << reader.getFormatedErrorMessages() |
| << "\nInput Json = " << json); |
| return LookupDataResultPtr(); |
| } |
| LookupDataResultPtr lookupDataResultPtr = boost::make_shared<LookupDataResult>(); |
| lookupDataResultPtr->setPartitions(root.get("partitions", 0).asInt()); |
| LOG_INFO("parsePartitionData = "<<*lookupDataResultPtr); |
| return lookupDataResultPtr; |
| } |
| |
| LookupDataResultPtr HTTPLookupService::parseLookupData(const std::string &json) { |
| Json::Value root; |
| Json::Reader reader; |
| if (!reader.parse(json, root, false)) { |
| LOG_ERROR("Failed to parse json : " << reader.getFormatedErrorMessages() |
| << "\nInput Json = " << json); |
| return LookupDataResultPtr(); |
| } |
| const std::string defaultNotFoundString = "Url Not found"; |
| const std::string brokerUrl = root.get("brokerUrl", defaultNotFoundString).asString(); |
| if (brokerUrl == defaultNotFoundString) { |
| LOG_ERROR("malformed json! - brokerUrl not present" << json); |
| return LookupDataResultPtr(); |
| } |
| |
| const std::string brokerUrlSsl = root.get("brokerUrlSsl", defaultNotFoundString).asString(); |
| if (brokerUrlSsl == defaultNotFoundString) { |
| LOG_ERROR("malformed json! - brokerUrlSsl not present" << json); |
| return LookupDataResultPtr(); |
| } |
| |
| LookupDataResultPtr lookupDataResultPtr = boost::make_shared<LookupDataResult>(); |
| lookupDataResultPtr->setBrokerUrl(brokerUrl); |
| lookupDataResultPtr->setBrokerUrlSsl(brokerUrlSsl); |
| LOG_INFO("parseLookupData = "<<*lookupDataResultPtr); |
| return lookupDataResultPtr; |
| } |
| } |