| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #include "PutTCP.h" |
| |
#include <exception>
#include <system_error>
#include <tuple>
#include <utility>
| |
| #include "range/v3/range/conversion.hpp" |
| |
| #include "utils/gsl.h" |
| #include "core/ProcessContext.h" |
| #include "core/ProcessSession.h" |
| #include "core/Resource.h" |
| #include "core/logging/Logger.h" |
| #include "utils/net/AsioCoro.h" |
| #include "utils/net/AsioSocketUtils.h" |
| |
| using asio::ip::tcp; |
| |
| using namespace std::literals::chrono_literals; |
| using std::chrono::steady_clock; |
| using org::apache::nifi::minifi::utils::net::use_nothrow_awaitable; |
| using org::apache::nifi::minifi::utils::net::HandshakeType; |
| using org::apache::nifi::minifi::utils::net::TcpSocket; |
| using org::apache::nifi::minifi::utils::net::SslSocket; |
| using org::apache::nifi::minifi::utils::net::asyncOperationWithTimeout; |
| |
| namespace org::apache::nifi::minifi::processors { |
| |
| constexpr size_t chunk_size = 1024; |
| |
| PutTCP::PutTCP(const std::string& name, const utils::Identifier& uuid) |
| : Processor(name, uuid) {} |
| |
| PutTCP::~PutTCP() = default; |
| |
| void PutTCP::initialize() { |
| setSupportedProperties(Properties); |
| setSupportedRelationships(Relationships); |
| } |
| |
| void PutTCP::notifyStop() {} |
| |
| void PutTCP::onSchedule(core::ProcessContext* const context, core::ProcessSessionFactory*) { |
| gsl_Expects(context); |
| |
| // if the required properties are missing or empty even before evaluating the EL expression, then we can throw in onSchedule, before we waste any flow files |
| if (context->getProperty(Hostname).value_or(std::string{}).empty()) { |
| throw Exception{ExceptionType::PROCESSOR_EXCEPTION, "missing hostname"}; |
| } |
| if (context->getProperty(Port).value_or(std::string{}).empty()) { |
| throw Exception{ExceptionType::PROCESSOR_EXCEPTION, "missing port"}; |
| } |
| if (auto idle_connection_expiration = context->getProperty<core::TimePeriodValue>(IdleConnectionExpiration); idle_connection_expiration && idle_connection_expiration->getMilliseconds() > 0ms) |
| idle_connection_expiration_ = idle_connection_expiration->getMilliseconds(); |
| else |
| idle_connection_expiration_.reset(); |
| |
| if (auto timeout = context->getProperty<core::TimePeriodValue>(Timeout); timeout && timeout->getMilliseconds() > 0ms) |
| timeout_duration_ = timeout->getMilliseconds(); |
| else |
| timeout_duration_ = 15s; |
| |
| if (context->getProperty<bool>(ConnectionPerFlowFile).value_or(false)) |
| connections_.reset(); |
| else |
| connections_.emplace(); |
| |
| std::string context_name; |
| ssl_context_.reset(); |
| if (context->getProperty(SSLContextService, context_name) && !IsNullOrEmpty(context_name)) { |
| if (auto controller_service = context->getControllerService(context_name)) { |
| if (auto ssl_context_service = std::dynamic_pointer_cast<minifi::controllers::SSLContextService>(context->getControllerService(context_name))) { |
| ssl_context_ = utils::net::getSslContext(*ssl_context_service); |
| } else { |
| throw Exception(PROCESS_SCHEDULE_EXCEPTION, context_name + " is not an SSL Context Service"); |
| } |
| } else { |
| throw Exception(PROCESS_SCHEDULE_EXCEPTION, "Invalid controller service: " + context_name); |
| } |
| } |
| |
| const auto delimiter_str = context->getProperty(OutgoingMessageDelimiter).value_or(std::string{}); |
| delimiter_ = utils::span_to<std::vector>(as_bytes(std::span(delimiter_str))); |
| |
| if (auto max_size_of_socket_send_buffer = context->getProperty<core::DataSizeValue>(MaxSizeOfSocketSendBuffer)) |
| max_size_of_socket_send_buffer_ = max_size_of_socket_send_buffer->getValue(); |
| else |
| max_size_of_socket_send_buffer_.reset(); |
| } |
| |
| namespace { |
| |
| template<class SocketType> |
| class ConnectionHandler : public ConnectionHandlerBase { |
| public: |
| ConnectionHandler(utils::net::ConnectionId connection_id, |
| std::chrono::milliseconds timeout, |
| std::shared_ptr<core::logging::Logger> logger, |
| std::optional<size_t> max_size_of_socket_send_buffer, |
| asio::ssl::context* ssl_context) |
| : connection_id_(std::move(connection_id)), |
| timeout_duration_(timeout), |
| logger_(std::move(logger)), |
| max_size_of_socket_send_buffer_(max_size_of_socket_send_buffer), |
| ssl_context_(ssl_context) { |
| } |
| |
| ~ConnectionHandler() override { |
| shutdownSocket(); |
| } |
| |
| asio::awaitable<std::error_code> sendStreamWithDelimiter(const std::shared_ptr<io::InputStream>& stream_to_send, |
| const std::vector<std::byte>& delimiter, |
| asio::io_context& io_context_) override; |
| |
| private: |
| [[nodiscard]] bool hasBeenUsedIn(std::chrono::milliseconds dur) const override { |
| return last_used_ && *last_used_ >= (steady_clock::now() - dur); |
| } |
| |
| void reset() override { |
| last_used_.reset(); |
| socket_.reset(); |
| } |
| |
| [[nodiscard]] bool hasBeenUsed() const override { return last_used_.has_value(); } |
| [[nodiscard]] asio::awaitable<std::error_code> setupUsableSocket(asio::io_context& io_context); |
| [[nodiscard]] bool hasUsableSocket() const { return socket_ && socket_->lowest_layer().is_open(); } |
| |
| asio::awaitable<std::error_code> establishNewConnection(const tcp::resolver::results_type& endpoints, asio::io_context& io_context_); |
| asio::awaitable<std::error_code> send(const std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>& delimiter); |
| |
| SocketType createNewSocket(asio::io_context& io_context_); |
| void shutdownSocket(); |
| |
| utils::net::ConnectionId connection_id_; |
| std::optional<SocketType> socket_; |
| |
| std::optional<steady_clock::time_point> last_used_; |
| asio::steady_timer::duration timeout_duration_; |
| |
| std::shared_ptr<core::logging::Logger> logger_; |
| std::optional<size_t> max_size_of_socket_send_buffer_; |
| |
| asio::ssl::context* ssl_context_; |
| }; |
| |
| template<> |
| TcpSocket ConnectionHandler<TcpSocket>::createNewSocket(asio::io_context& io_context_) { |
| gsl_Expects(!ssl_context_); |
| return TcpSocket{io_context_}; |
| } |
| |
| template<> |
| SslSocket ConnectionHandler<SslSocket>::createNewSocket(asio::io_context& io_context_) { |
| gsl_Expects(ssl_context_); |
| return {io_context_, *ssl_context_}; |
| } |
| |
| template<> |
| void ConnectionHandler<TcpSocket>::shutdownSocket() { |
| } |
| |
| template<> |
| void ConnectionHandler<SslSocket>::shutdownSocket() { |
| gsl_Expects(ssl_context_); |
| if (socket_) { |
| asio::error_code ec; |
| socket_->lowest_layer().cancel(ec); |
| if (ec) { |
| logger_->log_error("Cancelling asynchronous operations of SSL socket failed with: %s", ec.message()); |
| } |
| socket_->shutdown(ec); |
| if (ec) { |
| logger_->log_error("Shutdown of SSL socket failed with: %s", ec.message()); |
| } |
| } |
| } |
| |
| template<class SocketType> |
| asio::awaitable<std::error_code> ConnectionHandler<SocketType>::establishNewConnection(const tcp::resolver::results_type& endpoints, asio::io_context& io_context) { |
| auto socket = createNewSocket(io_context); |
| std::error_code last_error; |
| for (const auto& endpoint : endpoints) { |
| auto [connection_error] = co_await asyncOperationWithTimeout(socket.lowest_layer().async_connect(endpoint, use_nothrow_awaitable), timeout_duration_); |
| if (connection_error) { |
| core::logging::LOG_DEBUG(logger_) << "Connecting to " << endpoint.endpoint() << " failed due to " << connection_error.message(); |
| last_error = connection_error; |
| continue; |
| } |
| auto [handshake_error] = co_await utils::net::handshake(socket, timeout_duration_); |
| if (handshake_error) { |
| core::logging::LOG_DEBUG(logger_) << "Handshake with " << endpoint.endpoint() << " failed due to " << handshake_error.message(); |
| last_error = handshake_error; |
| continue; |
| } |
| if (max_size_of_socket_send_buffer_) |
| socket.lowest_layer().set_option(TcpSocket::send_buffer_size(*max_size_of_socket_send_buffer_)); |
| socket_.emplace(std::move(socket)); |
| co_return std::error_code(); |
| } |
| co_return last_error; |
| } |
| |
| template<class SocketType> |
| [[nodiscard]] asio::awaitable<std::error_code> ConnectionHandler<SocketType>::setupUsableSocket(asio::io_context& io_context) { |
| if (hasUsableSocket()) |
| co_return std::error_code(); |
| tcp::resolver resolver(io_context); |
| auto [resolve_error, resolve_result] = co_await asyncOperationWithTimeout( |
| resolver.async_resolve(connection_id_.getHostname(), connection_id_.getService(), use_nothrow_awaitable), timeout_duration_); |
| if (resolve_error) |
| co_return resolve_error; |
| co_return co_await establishNewConnection(resolve_result, io_context); |
| } |
| |
| template<class SocketType> |
| asio::awaitable<std::error_code> ConnectionHandler<SocketType>::sendStreamWithDelimiter(const std::shared_ptr<io::InputStream>& stream_to_send, |
| const std::vector<std::byte>& delimiter, |
| asio::io_context& io_context) { |
| if (auto connection_error = co_await setupUsableSocket(io_context)) // NOLINT |
| co_return connection_error; |
| co_return co_await send(stream_to_send, delimiter); |
| } |
| |
| template<class SocketType> |
| asio::awaitable<std::error_code> ConnectionHandler<SocketType>::send(const std::shared_ptr<io::InputStream>& stream_to_send, const std::vector<std::byte>& delimiter) { |
| gsl_Expects(hasUsableSocket()); |
| |
| std::vector<std::byte> data_chunk; |
| data_chunk.resize(chunk_size); |
| std::span<std::byte> buffer{data_chunk}; |
| while (stream_to_send->tell() < stream_to_send->size()) { |
| size_t num_read = stream_to_send->read(buffer); |
| if (io::isError(num_read)) |
| co_return std::make_error_code(std::errc::io_error); |
| auto [write_error, bytes_written] = co_await asyncOperationWithTimeout(asio::async_write(*socket_, asio::buffer(data_chunk, num_read), use_nothrow_awaitable), timeout_duration_); |
| if (write_error) |
| co_return write_error; |
| logger_->log_trace("Writing flowfile(%zu bytes) to socket succeeded", bytes_written); |
| } |
| auto [delimiter_write_error, delimiter_bytes_written] = co_await asyncOperationWithTimeout(asio::async_write(*socket_, asio::buffer(delimiter), use_nothrow_awaitable), timeout_duration_); |
| if (delimiter_write_error) |
| co_return delimiter_write_error; |
| logger_->log_trace("Writing delimiter(%zu bytes) to socket succeeded", delimiter_bytes_written); |
| |
| last_used_ = steady_clock::now(); |
| co_return std::error_code(); |
| } |
| } // namespace |
| |
| void PutTCP::onTrigger(core::ProcessContext* context, core::ProcessSession* const session) { |
| gsl_Expects(context && session); |
| |
| const auto flow_file = session->get(); |
| if (!flow_file) { |
| yield(); |
| return; |
| } |
| |
| removeExpiredConnections(); |
| |
| auto hostname = context->getProperty(Hostname, flow_file).value_or(std::string{}); |
| auto port = context->getProperty(Port, flow_file).value_or(std::string{}); |
| if (hostname.empty() || port.empty()) { |
| logger_->log_error("[%s] invalid target endpoint: hostname: %s, port: %s", flow_file->getUUIDStr(), |
| hostname.empty() ? "(empty)" : hostname.c_str(), |
| port.empty() ? "(empty)" : port.c_str()); |
| session->transfer(flow_file, Failure); |
| return; |
| } |
| |
| auto connection_id = utils::net::ConnectionId(std::move(hostname), std::move(port)); |
| std::shared_ptr<ConnectionHandlerBase> handler; |
| if (!connections_ || !connections_->contains(connection_id)) { |
| if (ssl_context_) |
| handler = std::make_shared<ConnectionHandler<SslSocket>>(connection_id, timeout_duration_, logger_, max_size_of_socket_send_buffer_, &*ssl_context_); |
| else |
| handler = std::make_shared<ConnectionHandler<TcpSocket>>(connection_id, timeout_duration_, logger_, max_size_of_socket_send_buffer_, nullptr); |
| if (connections_) |
| (*connections_)[connection_id] = handler; |
| } else { |
| handler = (*connections_)[connection_id]; |
| } |
| |
| gsl_Expects(handler); |
| |
| processFlowFile(handler, *session, flow_file); |
| } |
| |
| void PutTCP::removeExpiredConnections() { |
| if (connections_) { |
| std::erase_if(*connections_, [this](auto& item) -> bool { |
| const auto& connection_handler = item.second; |
| return (!connection_handler || (idle_connection_expiration_ && !connection_handler->hasBeenUsedIn(*idle_connection_expiration_))); |
| }); |
| } |
| } |
| |
| std::error_code PutTCP::sendFlowFileContent(std::shared_ptr<ConnectionHandlerBase>& connection_handler, |
| const std::shared_ptr<io::InputStream>& flow_file_content_stream) { |
| std::error_code operation_error; |
| io_context_.restart(); |
| asio::co_spawn(io_context_, |
| connection_handler->sendStreamWithDelimiter(flow_file_content_stream, delimiter_, io_context_), |
| [&operation_error](const std::exception_ptr&, std::error_code error_code) { |
| operation_error = error_code; |
| }); |
| io_context_.run(); |
| return operation_error; |
| } |
| |
| void PutTCP::processFlowFile(std::shared_ptr<ConnectionHandlerBase>& connection_handler, |
| core::ProcessSession& session, |
| const std::shared_ptr<core::FlowFile>& flow_file) { |
| auto flow_file_content_stream = session.getFlowFileContentStream(flow_file); |
| if (!flow_file_content_stream) { |
| session.transfer(flow_file, Failure); |
| return; |
| } |
| |
| std::error_code operation_error = sendFlowFileContent(connection_handler, flow_file_content_stream); |
| |
| if (operation_error && connection_handler->hasBeenUsed()) { |
| logger_->log_warn("%s with reused connection, retrying...", operation_error.message()); |
| connection_handler->reset(); |
| operation_error = sendFlowFileContent(connection_handler, flow_file_content_stream); |
| } |
| |
| if (operation_error) { |
| connection_handler->reset(); |
| logger_->log_error("%s", operation_error.message()); |
| session.transfer(flow_file, Failure); |
| } else { |
| session.transfer(flow_file, Success); |
| } |
| } |
| |
// Registers PutTCP as a Processor resource via the REGISTER_RESOURCE macro (presumably making it discoverable
// by the class loader — confirm against the macro definition).
REGISTER_RESOURCE(PutTCP, Processor);
| |
| } // namespace org::apache::nifi::minifi::processors |