blob: 10b55fb509c3de4013f67ad9f750e9f6f379ba0e [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "http/BaseHTTPClient.h"
#include <algorithm>
#include <string>
#include "utils/StringUtils.h"
namespace minifi = org::apache::nifi::minifi;
namespace {
// URL scheme prefixes (including the "://" separator) recognised by parseProtocol().
// URL::port() also compares the stored protocol against these to choose a default port.
constexpr const char* HTTP = "http://";
constexpr const char* HTTPS = "https://";
std::optional<std::string> parseProtocol(const std::string& url_input) {
if (minifi::utils::string::startsWith(url_input, HTTP)) {
return HTTP;
} else if (minifi::utils::string::startsWith(url_input, HTTPS)) {
return HTTPS;
} else {
return std::nullopt;
}
}
/**
 * Parses the port component of a URL.
 *
 * Only a non-empty sequence of decimal digits whose value lies in the range 0..65535 is
 * accepted. Signs, whitespace, any other non-digit character, and out-of-range values are
 * rejected, so inputs like "-1", "+80", " 80", "80a" or "70000" do not yield a port.
 *
 * @param port_string the text between the ':' after the host and the next URL delimiter
 * @return the port number, or std::nullopt if port_string is not a valid port
 */
std::optional<int> parsePortNumber(const std::string& port_string) {
  constexpr int MAX_PORT = 65535;
  const auto is_decimal_digit = [](char ch) { return ch >= '0' && ch <= '9'; };

  if (port_string.empty() || !std::all_of(port_string.begin(), port_string.end(), is_decimal_digit)) {
    return std::nullopt;
  }

  int port = 0;
  for (const char digit : port_string) {
    port = port * 10 + (digit - '0');
    // Bail out as soon as the limit is exceeded; this also keeps `port` far from integer overflow.
    if (port > MAX_PORT) {
      return std::nullopt;
    }
  }
  return port;
}
} // namespace
namespace org::apache::nifi::minifi::http {
/**
 * Requests an access token by POSTing the given credentials through the client.
 *
 * The request is sent as "application/x-www-form-urlencoded" and is submitted exactly once.
 *
 * @param client   HTTP client used to perform the request; may be nullptr
 * @param username value sent in the "username" form field
 * @param password value sent in the "password" form field
 * @return "Bearer " followed by the response body when the submit succeeds with response code 200
 *         and a non-empty body; an empty string in every other case (including a null client)
 */
std::string get_token(BaseHTTPClient* client, const std::string& username, const std::string& password) {
  if (client == nullptr) {
    return "";
  }

  client->setContentType("application/x-www-form-urlencoded");
  client->set_request_method(HttpRequestMethod::POST);

  // NOTE(review): the whole payload is passed through escape(), which presumably also encodes the
  // '=' and '&' separators - confirm whether each value should be escaped individually instead.
  const std::string payload = "username=" + username + "&" + "password=" + password;
  client->setPostFields(client->escape(payload));

  // Submit only once: a second submit() would send the credentials to the server a second time.
  if (!client->submit() || client->getResponseCode() != 200) {
    return "";
  }

  const auto& body = client->getResponseBody();
  const std::string response_body(body.data(), body.size());
  if (response_body.empty()) {
    return "";
  }
  return "Bearer " + response_body;
}
/**
 * Splits url_input into protocol, host, optional port and optional path.
 *
 * The host ends at the first of ':', '/', '?' or '#'; a port, if present, follows a ':' and ends
 * at the first of '/', '?' or '#'; everything after that is stored as the path.
 * On any parse failure an error is logged and the object is left without is_valid_ being set.
 *
 * @param url_input the URL text to parse
 */
URL::URL(const std::string& url_input) {
  const auto scheme = parseProtocol(url_input);
  if (!scheme) {
    logger_->log_error("Unknown protocol in URL '{}'", url_input);
    return;
  }
  protocol_ = *scheme;

  const auto input_length = url_input.size();
  // Maps the "not found" result of find_first_of to the end of the input.
  const auto offset_or_end = [input_length](std::string::size_type offset) {
    return offset == std::string::npos ? input_length : offset;
  };

  const auto host_start = protocol_.size();
  const auto host_stop = offset_or_end(url_input.find_first_of(":/?#", host_start));
  host_ = url_input.substr(host_start, host_stop - host_start);
  if (host_.empty()) {
    logger_->log_error("No host found in URL '{}'", url_input);
    return;
  }

  auto cursor = host_stop;
  if (cursor != input_length && url_input[cursor] == ':') {
    const auto port_start = cursor + 1;
    const auto port_stop = offset_or_end(url_input.find_first_of("/?#", port_start));
    const auto parsed_port = parsePortNumber(url_input.substr(port_start, port_stop - port_start));
    if (!parsed_port) {
      logger_->log_error("Could not parse the port number in URL '{}'", url_input);
      return;
    }
    port_ = *parsed_port;
    cursor = port_stop;
  }

  // Whatever remains (path, query, fragment) is kept verbatim.
  if (cursor != input_length) {
    path_ = url_input.substr(cursor);
  }
  is_valid_ = true;
}
/**
 * Returns the port of this URL.
 *
 * @return the explicitly parsed port if there is one; otherwise 80 for the HTTP protocol and
 *         443 for the HTTPS protocol
 * @throws std::logic_error if no port was parsed and the protocol is neither HTTP nor HTTPS
 */
int URL::port() const {
  if (port_) {
    return *port_;
  }
  if (protocol_ == HTTP) {
    return 80;
  }
  if (protocol_ == HTTPS) {
    return 443;
  }
  throw std::logic_error{"Undefined port in URL: " + toString()};
}
/**
 * Builds the "protocol + host[:port]" part of the URL.
 *
 * @return "INVALID" if the URL is not valid; otherwise the protocol followed by the host, with
 *         ":<port>" appended only when a port was explicitly parsed
 */
std::string URL::hostPort() const {
  if (!isValid()) {
    return "INVALID";
  }

  std::string result = protocol_ + host_;
  if (port_) {
    result += ':';
    result += std::to_string(*port_);
  }
  return result;
}
/**
 * Renders the URL as text.
 *
 * @return "INVALID" if the URL is not valid; otherwise hostPort() followed by the stored path,
 *         if any
 */
std::string URL::toString() const {
  if (!isValid()) {
    return "INVALID";
  }
  return path_ ? hostPort() + *path_ : hostPort();
}
/**
 * Receive HTTP Response: hands a received chunk of data over to the read callback.
 *
 * @param data  buffer holding the received bytes
 * @param size  multiplied with nmemb to obtain the number of bytes in data
 * @param nmemb multiplied with size to obtain the number of bytes in data
 * @param p     pointer to the HTTPReadCallback that should receive the data
 * @return size * nmemb after the data was passed to the callback; CALLBACK_ABORT if p is null,
 *         the callback's stop flag is set, or an exception was thrown
 */
size_t HTTPRequestResponse::receiveWrite(char *data, size_t size, size_t nmemb, void *p) {
  try {
    auto *read_callback = static_cast<HTTPReadCallback *>(p);
    if (read_callback == nullptr || read_callback->stop) {
      return CALLBACK_ABORT;
    }

    const size_t byte_count = size * nmemb;
    read_callback->write(data, byte_count);
    return byte_count;
  } catch (...) {
    // Exceptions are not allowed to escape from this callback; report an abort instead.
    return CALLBACK_ABORT;
  }
}
/**
 * Callback for post, put, and patch operations: fills the output buffer with the next chunk
 * of data supplied by the upload callback.
 *
 * @param data  output buffer to write to
 * @param size  multiplied with nmemb to obtain the capacity of data in bytes
 * @param nmemb multiplied with size to obtain the capacity of data in bytes
 * @param p     pointer to the HTTPUploadCallback to read from
 * @return the value returned by the upload callback's getDataChunk(); CALLBACK_ABORT if p is
 *         null or an exception was thrown
 */
size_t HTTPRequestResponse::send_write(char *data, size_t size, size_t nmemb, void *p) {
  try {
    if (p != nullptr) {
      auto *upload_callback = static_cast<HTTPUploadCallback *>(p);
      const size_t capacity = size * nmemb;
      return upload_callback->getDataChunk(data, capacity);
    }
    return CALLBACK_ABORT;
  } catch (...) {
    // Exceptions are not allowed to escape from this callback; report an abort instead.
    return CALLBACK_ABORT;
  }
}
/**
 * Asks the upload callback to move its read position.
 *
 * The unnamed third parameter is ignored.
 *
 * @param p      pointer to the HTTPUploadCallback whose position should change
 * @param offset the position passed on to the callback's setPosition()
 * @return the result of setPosition() narrowed to int; SEEKFUNC_FAIL if p is null or an
 *         exception was thrown (including a failed narrowing)
 */
int HTTPRequestResponse::seek_callback(void *p, int64_t offset, int) {
  try {
    if (p != nullptr) {
      auto *upload_callback = static_cast<HTTPUploadCallback *>(p);
      const auto seek_result = upload_callback->setPosition(offset);
      return gsl::narrow<int>(seek_result);
    }
    return SEEKFUNC_FAIL;
  } catch (...) {
    return SEEKFUNC_FAIL;
  }
}
size_t HTTPUploadByteArrayInputCallback::getDataChunk(char *data, size_t size) {
if (stop) {
return HTTPRequestResponse::CALLBACK_ABORT;
}
size_t buffer_size = getBufferSize();
if (pos <= buffer_size) {
size_t len = buffer_size - pos;
if (len <= 0) {
return 0;
}
auto *ptr = getBuffer(pos);
if (ptr == nullptr) {
return 0;
}
if (len > size)
len = size;
memcpy(data, ptr, len);
pos += len;
seek(pos);
return len;
}
return 0;
}
/**
 * Moves the read position to the given offset.
 *
 * @param offset the new read position; it must be smaller than the buffer size
 *               (a negative value becomes a very large size_t and is therefore rejected)
 * @return HTTPRequestResponse::SEEKFUNC_OK on success; HTTPRequestResponse::SEEKFUNC_FAIL if
 *         stop is set or the offset is not inside the buffer
 */
size_t HTTPUploadByteArrayInputCallback::setPosition(int64_t offset) {
  const bool rejected = stop || getBufferSize() <= static_cast<size_t>(offset);
  if (rejected) {
    return HTTPRequestResponse::SEEKFUNC_FAIL;
  }

  pos = offset;
  seek(pos);
  return HTTPRequestResponse::SEEKFUNC_OK;
}
size_t HTTPUploadStreamContentsCallback::getDataChunk(char *data, size_t size) {
logger_->log_trace("HTTPUploadStreamContentsCallback is asked for up to {} bytes", size);
std::span<char> buffer{data, size};
size_t num_read = input_stream_->read(as_writable_bytes(buffer));
if (io::isError(num_read)) {
logger_->log_error("Error reading the input stream in HTTPUploadStreamContentsCallback");
return 0;
}
logger_->log_debug("HTTPUploadStreamContentsCallback is returning {} bytes", num_read);
return num_read;
}
/**
 * Handles a seek request without changing any position: the request is only logged.
 *
 * @param offset the requested position; 0 is logged at debug level, anything else as a warning
 * @return always HTTPRequestResponse::SEEKFUNC_OK
 */
size_t HTTPUploadStreamContentsCallback::setPosition(int64_t offset) {
  if (offset != 0) {
    logger_->log_warn("HTTPUploadStreamContentsCallback is ignoring request to seek to position {}", offset);
  } else {
    logger_->log_debug("HTTPUploadStreamContentsCallback is ignoring request to rewind to the beginning");
  }
  return HTTPRequestResponse::SEEKFUNC_OK;
}
} // namespace org::apache::nifi::minifi::http