blob: 98fdc08cd6cd1704e882cde1c3927dda885deb41 [file] [log] [blame]
/**
* @file RemoteProcessGroupPort.cpp
* RemoteProcessGroupPort class implementation
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "RemoteProcessGroupPort.h"
#include <algorithm>
#include <cinttypes>
#include <iostream>
#include <limits>
#include <memory>
#include <sstream>
#include <string>
#include <utility>
#include <vector>
#include "Exception.h"
#include "controllers/SSLContextService.h"
#include "core/ProcessContext.h"
#include "core/Processor.h"
#include "core/logging/Logger.h"
#include "http/BaseHTTPClient.h"
#include "rapidjson/document.h"
#include "sitetosite/Peer.h"
#include "sitetosite/SiteToSiteFactory.h"
#include "utils/net/DNS.h"
#undef GetObject // windows.h #defines GetObject = GetObjectA or GetObjectW, which conflicts with rapidjson
using namespace std::literals::chrono_literals;
namespace org::apache::nifi::minifi {
namespace {
// Builds "<protocol><host>[:<port>]/nifi-api/site-to-site" for the given NiFi instance.
// A port of 0 (or less) is treated as "not specified" and is left out of the URL.
std::string buildFullSiteToSiteUrl(const RPG& nifi) {
  const std::string port_suffix = nifi.port > 0 ? ":" + std::to_string(nifi.port) : std::string{};
  std::ostringstream url;
  url << nifi.protocol << nifi.host << port_suffix << "/nifi-api/site-to-site";
  return url.str();
}
} // namespace
// Fallback controller-service name used by onSchedule() when the SSLContext property is unset or empty.
// If no service with this name exists and Configure::nifi_remote_input_secure is true, onSchedule()
// creates an SSLContextService under this name from the agent configuration.
const char *RemoteProcessGroupPort::RPG_SSL_CONTEXT_SERVICE_NAME = "RemoteProcessGroupPortSSLContextService";
void RemoteProcessGroupPort::setURL(const std::string& val) {
auto urls = utils::string::split(val, ",");
for (const auto& url : urls) {
http::URL parsed_url{utils::string::trim(url)};
if (parsed_url.isValid()) {
logger_->log_debug("Parsed RPG URL '{}' -> '{}'", url, parsed_url.hostPort());
nifi_instances_.push_back({parsed_url.host(), parsed_url.port(), parsed_url.protocol()});
} else {
logger_->log_error("Could not parse RPG URL '{}'", url);
}
}
}
/**
 * Copies this port's connection and batching settings into @p config and builds a client from it.
 * @param config per-peer configuration (port id, host, port, interface, client type) to be completed.
 * @return the site-to-site client produced by sitetosite::createClient.
 */
gsl::not_null<std::unique_ptr<sitetosite::SiteToSiteClient>> RemoteProcessGroupPort::initializeProtocol(sitetosite::SiteToSiteClientConfiguration& config) const {
  // Connection settings: TLS context, proxy and the two timeouts.
  config.setSecurityContext(ssl_service_);
  config.setHTTPProxy(proxy_);
  config.setTimeout(timeout_);
  config.setIdleTimeout(idle_timeout_);
  // Transfer settings: compression and batching limits.
  config.setUseCompression(use_compression_);
  config.setBatchCount(batch_count_);
  config.setBatchSize(batch_size_);
  config.setBatchDuration(batch_duration_);
  return sitetosite::createClient(config);
}
/**
 * Returns a site-to-site client for the next transfer.
 * A pooled client from available_protocols_ is preferred. Otherwise a new client is created for the
 * peer at peer_index_ (round-robin over peers_). If no usable peer is selected, the peer list is
 * refreshed instead and nullptr is returned for this call.
 */
std::unique_ptr<sitetosite::SiteToSiteClient> RemoteProcessGroupPort::getNextProtocol() {
  std::unique_ptr<sitetosite::SiteToSiteClient> next_protocol = nullptr;
  if (available_protocols_.try_dequeue(next_protocol)) {
    logger_->log_debug("Obtained protocol from available_protocols_");
    return next_protocol;
  }
  std::lock_guard<std::mutex> lock(peer_mutex_);
  // Bounds-check against peers_ as well: the peer list can be cleared by a refresh while
  // peer_index_ still holds an index into the previous list.
  if (peer_index_ >= 0 && peer_index_ < static_cast<int>(peers_.size())) {
    logger_->log_debug("Creating client from peer {}", peer_index_);
    auto& peer_status = peers_[peer_index_];
    sitetosite::SiteToSiteClientConfiguration config(peer_status.getPortId(), peer_status.getHost(), peer_status.getPort(), local_network_interface_, client_type_);
    peer_index_++;
    if (peer_index_ >= static_cast<int>(peers_.size())) {
      peer_index_ = 0;
    }
    next_protocol = initializeProtocol(config);
  } else {
    logger_->log_debug("Refreshing the peer list since there are none configured.");
    refreshPeerList();
  }
  return next_protocol;
}
/**
 * Hands a client back to the pool of reusable clients.
 * The pool holds at most max(max concurrent tasks, number of peers) entries; once it is full,
 * the returned client is dropped and destroyed when this function exits.
 */
void RemoteProcessGroupPort::returnProtocol(core::ProcessContext& context, std::unique_ptr<sitetosite::SiteToSiteClient> return_protocol) {
  const auto pool_capacity = std::max<size_t>(context.getProcessor().getMaxConcurrentTasks(), peers_.size());
  const bool pool_has_room = available_protocols_.size_approx() < pool_capacity;
  if (pool_has_room) {
    logger_->log_debug("enqueueing protocol {}, have a total of {}", getUUIDStr(), available_protocols_.size_approx());
    available_protocols_.enqueue(std::move(return_protocol));
  } else {
    logger_->log_debug("not enqueueing protocol {}", getUUIDStr());
  }
}
// Registers the processor's relationships and properties.
void RemoteProcessGroupPort::initialize() {
  setSupportedRelationships(Relationships);
  setSupportedProperties(Properties);
  logger_->log_trace("Finished initialization");
}
/**
 * Prepares the port for transfers.
 * - Reads the port UUID and idle timeout properties.
 * - Resolves the SSL context service (the SSLContext property, else RPG_SSL_CONTEXT_SERVICE_NAME,
 *   else one built from the agent configuration when remote input is marked secure).
 * - Refreshes the peer list and pre-populates the client pool with one client per
 *   max(max concurrent tasks, number of peers), assigned round-robin over the peers.
 * @throws if the idleTimeout property is missing or unparsable.
 */
void RemoteProcessGroupPort::onSchedule(core::ProcessContext& context, core::ProcessSessionFactory&) {
  if (auto protocol_uuid = context.getProperty(portUUID)) {
    protocol_uuid_ = *protocol_uuid;
  }
  auto context_name = context.getProperty(SSLContext);
  if (!context_name || IsNullOrEmpty(*context_name)) {
    context_name = RPG_SSL_CONTEXT_SERVICE_NAME;
  }
  std::shared_ptr<core::controller::ControllerService> service = context.getControllerService(*context_name, getUUID());
  if (nullptr != service) {
    ssl_service_ = std::dynamic_pointer_cast<minifi::controllers::SSLContextServiceInterface>(service);
    if (!ssl_service_) {
      // Previously this silently resulted in unsecured connections.
      logger_->log_warn("Controller service '{}' is not an SSL context service; it will not be used for site-to-site", *context_name);
    }
  } else {
    std::string secureStr;
    if (configure_->get(Configure::nifi_remote_input_secure, secureStr) && utils::string::toBool(secureStr).value_or(false)) {
      ssl_service_ = std::make_shared<minifi::controllers::SSLContextService>(RPG_SSL_CONTEXT_SERVICE_NAME, configure_);
      ssl_service_->onEnable();
    }
  }
  idle_timeout_ = context.getProperty(idleTimeout) | utils::andThen(parsing::parseDuration<std::chrono::milliseconds>) | utils::orThrow("RemoteProcessGroupPort::idleTimeout is a required Property");
  std::lock_guard<std::mutex> lock(peer_mutex_);
  if (!nifi_instances_.empty()) {
    refreshPeerList();
    if (!peers_.empty())
      peer_index_ = 0;
  }
  // populate the site2site protocol for load balancing between them
  if (!peers_.empty()) {
    const auto count = std::max<size_t>(context.getProcessor().getMaxConcurrentTasks(), peers_.size());
    for (size_t i = 0; i < count; ++i) {
      // Reference instead of copy: peers_ is not modified while this entry is in use.
      auto& peer_status = peers_[peer_index_];
      sitetosite::SiteToSiteClientConfiguration config(peer_status.getPortId(), peer_status.getHost(), peer_status.getPort(), local_network_interface_, client_type_);
      peer_index_++;
      if (peer_index_ >= static_cast<int>(peers_.size())) {
        peer_index_ = 0;
      }
      logger_->log_trace("Creating client");
      auto next_protocol = initializeProtocol(config);
      logger_->log_trace("Created client, moving into available protocols");
      returnProtocol(context, std::move(next_protocol));
    }
  } else {
    // we don't have any peers
    logger_->log_error("No peers selected during scheduling");
  }
}
// Stops transmission and discards every pooled site-to-site client.
void RemoteProcessGroupPort::notifyStop() {
  transmitting_ = false;
  // Each successful dequeue replaces (and thereby destroys) the previously drained client.
  for (std::unique_ptr<sitetosite::SiteToSiteClient> drained; available_protocols_.try_dequeue(drained);) {
  }
}
/**
 * Performs one site-to-site transfer in direction_ using a pooled or freshly created client.
 * Yields when no client is available or the transfer reports failure. On a std::exception the
 * error is logged, the processor yields and the session is rolled back. The client involved is
 * then dropped rather than returned to the pool.
 */
void RemoteProcessGroupPort::onTrigger(core::ProcessContext& context, core::ProcessSession& session) {
  logger_->log_trace("On trigger {}", getUUIDStr());
  if (!transmitting_) {
    return;
  }
  try {
    logger_->log_trace("get protocol in on trigger");
    auto protocol = getNextProtocol();
    if (!protocol) {
      logger_->log_info("no protocol, yielding");
      context.yield();
      return;
    }
    if (!protocol->transfer(direction_, context, session)) {
      logger_->log_warn("protocol transmission failed, yielding");
      context.yield();
    }
    returnProtocol(context, std::move(protocol));
  } catch (const std::exception& exception) {
    // Previously swallowed silently; surface the cause so failed transfers are diagnosable.
    logger_->log_error("Site-to-site transfer failed with exception: {}; yielding and rolling back the session", exception.what());
    context.yield();
    session.rollback();
  }
}
std::optional<std::string> RemoteProcessGroupPort::getRestApiToken(const RPG& nifi) const {
std::string rest_user_name;
configure_->get(Configure::nifi_rest_api_user_name, rest_user_name);
if (rest_user_name.empty()) {
return std::nullopt;
}
std::string rest_password;
configure_->get(Configure::nifi_rest_api_password, rest_password);
std::stringstream login_url;
login_url << nifi.protocol << nifi.host;
// don't append port if it is 0 ( undefined )
if (nifi.port > 0) {
login_url << ":" << std::to_string(nifi.port);
}
login_url << "/nifi-api/access/token";
auto client_ptr = core::ClassLoader::getDefaultClassLoader().instantiateRaw("HTTPClient", "HTTPClient");
if (nullptr == client_ptr) {
logger_->log_error("Could not locate HTTPClient. You do not have cURL support!");
return std::nullopt;
}
auto client = std::unique_ptr<http::BaseHTTPClient>(dynamic_cast<http::BaseHTTPClient*>(client_ptr));
client->initialize(http::HttpRequestMethod::GET, login_url.str(), ssl_service_);
// use a connection timeout. if this times out we will simply attempt re-connection
// so no need for configuration parameter that isn't already defined in Processor
client->setConnectionTimeout(10s);
client->setReadTimeout(idle_timeout_);
auto token = http::get_token(client.get(), rest_user_name, rest_password);
logger_->log_debug("Got token from NiFi REST Api endpoint {}", login_url.str());
return token;
}
std::optional<std::pair<std::string, uint16_t>> RemoteProcessGroupPort::parseSiteToSiteDataFromControllerConfig(const RPG& nifi, const std::string& controller) const {
rapidjson::Document doc;
rapidjson::ParseResult ok = doc.Parse(controller.c_str());
if (!ok || !doc.IsObject() || doc.ObjectEmpty()) {
return std::nullopt;
}
rapidjson::Value::MemberIterator itr = doc.FindMember("controller");
if (itr == doc.MemberEnd() || !itr->value.IsObject()) {
return std::nullopt;
}
rapidjson::Value controllerValue = itr->value.GetObject();
rapidjson::Value::ConstMemberIterator end_itr = controllerValue.MemberEnd();
rapidjson::Value::ConstMemberIterator port_itr = controllerValue.FindMember("remoteSiteListeningPort");
rapidjson::Value::ConstMemberIterator secure_itr = controllerValue.FindMember("siteToSiteSecure");
bool site_to_site_secure = secure_itr != end_itr && secure_itr->value.IsBool() ? secure_itr->value.GetBool() : false;
uint16_t site_to_site_port = 0;
if (client_type_ == sitetosite::ClientType::RAW && port_itr != end_itr && port_itr->value.IsNumber()) {
site_to_site_port = port_itr->value.GetInt();
} else {
site_to_site_port = nifi.port;
}
logger_->log_debug("process group remote site2site port {}, is secure {}", site_to_site_port, site_to_site_secure);
return std::make_pair(nifi.host, site_to_site_port);
}
/**
 * Queries one NiFi instance's site-to-site endpoint and parses the result.
 * @param nifi instance to query (taken by value so the WIN32 localhost substitution stays local).
 * @return (host, port) on success; std::nullopt if token acquisition, HTTP client creation,
 *         the request itself, or parsing the response fails.
 */
std::optional<std::pair<std::string, uint16_t>> RemoteProcessGroupPort::tryRefreshSiteToSiteInstance(RPG nifi) const {  // NOLINT(performance-unnecessary-value-param)
#ifdef WIN32
  if ("localhost" == nifi.host) {
    nifi.host = org::apache::nifi::minifi::utils::net::getMyHostName();
  }
#endif
  auto token = getRestApiToken(nifi);
  // A present-but-empty token is treated as a failure for this instance.
  if (token && token->empty()) {
    return std::nullopt;
  }
  auto client_ptr = core::ClassLoader::getDefaultClassLoader().instantiateRaw("HTTPClient", "HTTPClient");
  if (nullptr == client_ptr) {
    logger_->log_error("Could not locate HTTPClient. You do not have cURL support, defaulting to base configuration!");
    return std::nullopt;
  }
  auto* http_client = dynamic_cast<http::BaseHTTPClient*>(client_ptr);
  if (nullptr == http_client) {
    // Guard against dereferencing a failed cast; release the instance we were handed.
    logger_->log_error("The registered HTTPClient class is not an HTTP client implementation, defaulting to base configuration!");
    delete client_ptr;
    return std::nullopt;
  }
  auto client = std::unique_ptr<http::BaseHTTPClient>(http_client);
  const auto full_url = buildFullSiteToSiteUrl(nifi);
  client->initialize(http::HttpRequestMethod::GET, full_url, ssl_service_);
  // use a connection timeout. if this times out we will simply attempt re-connection
  // so no need for configuration parameter that isn't already defined in Processor
  client->setConnectionTimeout(10s);
  client->setReadTimeout(idle_timeout_);
  if (!proxy_.host.empty()) {
    client->setHTTPProxy(proxy_);
  }
  if (token) {
    client->setRequestHeader("Authorization", token);
  }
  client->setVerbose(false);
  if (!client->submit() || client->getResponseCode() != 200) {
    logger_->log_error("ProcessGroup::refreshRemoteSiteToSiteInfo -- curl_easy_perform() failed, response code {} from {}", client->getResponseCode(), full_url);
    return std::nullopt;
  }
  const std::vector<char>& response_body = client->getResponseBody();
  if (response_body.empty()) {
    logger_->log_error("Cannot output body to content for ProcessGroup::refreshRemoteSiteToSiteInfo: received HTTP code {} from {}", client->getResponseCode(), full_url);
    return std::nullopt;
  }
  const std::string controller(response_body.begin(), response_body.end());
  logger_->log_trace("controller config {}", controller);
  return parseSiteToSiteDataFromControllerConfig(nifi, controller);
}
/**
 * Asks each configured NiFi instance, in order, for its site-to-site endpoint.
 * @return the first successful (host, port) result, or std::nullopt if there are no instances
 *         or none of them answered successfully.
 */
std::optional<std::pair<std::string, uint16_t>> RemoteProcessGroupPort::refreshRemoteSiteToSiteInfo() {
  for (const auto& instance : nifi_instances_) {
    if (auto site_to_site_data = tryRefreshSiteToSiteInstance(instance)) {
      return site_to_site_data;
    }
  }
  return std::nullopt;
}
/**
 * Rebuilds peers_ from the first reachable NiFi instance.
 * If no instance answers, peers_ and peer_index_ are left untouched. Otherwise peers_ is replaced
 * by the peer list the site-to-site client reports (empty if it reports none), and peer_index_ is
 * reset to 0 when peers exist or to -1 when the list is empty.
 * Callers in this file (onSchedule, getNextProtocol) hold peer_mutex_ while calling this.
 */
void RemoteProcessGroupPort::refreshPeerList() {
  auto connection = refreshRemoteSiteToSiteInfo();
  if (!connection) {
    logger_->log_warn("No port configured");
    return;
  }
  peers_.clear();
  std::unique_ptr<sitetosite::SiteToSiteClient> protocol;
  sitetosite::SiteToSiteClientConfiguration config(protocol_uuid_, connection->first, connection->second, local_network_interface_, client_type_);
  protocol = initializeProtocol(config);
  if (protocol) {
    if (auto peers = protocol->getPeerList()) {
      peers_ = *peers;
    }
  }
  logger_->log_info("Have {} peers", peers_.size());
  // Never leave peer_index_ pointing into a list that was just emptied: -1 is the "no peer" value
  // getNextProtocol() checks, which makes it refresh again instead of indexing out of bounds.
  peer_index_ = peers_.empty() ? -1 : 0;
}
} // namespace org::apache::nifi::minifi