blob: b806b8ef3770b7a1538a06582637669c5912f037 [file] [log] [blame]
/**
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "GetTCP.h"
#include <sys/types.h>
#include <sys/stat.h>
#include <time.h>
#include <stdio.h>
#include <limits.h>
#ifndef WIN32
#include <dirent.h>
#include <unistd.h>
#include <regex.h>
#endif
#include <vector>
#include <queue>
#include <map>
#include <memory>
#include <utility>
#include <set>
#include <sstream>
#include <string>
#include <iostream>
#include <cinttypes>
#include "io/ClientSocket.h"
#include "utils/StringUtils.h"
#include "utils/TimeUtil.h"
#include "core/ProcessContext.h"
#include "core/ProcessSession.h"
namespace org {
namespace apache {
namespace nifi {
namespace minifi {
namespace processors {
// Name of the flow file attribute in which DataHandler::handle() stores the
// source string it is given for each chunk of received data.
const char *DataHandler::SOURCE_ENDPOINT_ATTRIBUTE = "source.endpoint";
// Required. Comma-delimited list of "<server_address>:<port>" entries; onSchedule() splits it on ','
// and onTrigger() splits every entry on ':'.
core::Property GetTCP::EndpointList(
core::PropertyBuilder::createProperty("endpoint-list")->withDescription("A comma delimited list of the endpoints to connect to. The format should be <server_address>:<port>.")->isRequired(true)
->build());
// Integer, default 1. onSchedule() passes the value to the client thread pool as its
// maximum number of concurrent tasks.
core::Property GetTCP::ConcurrentHandlers(
core::PropertyBuilder::createProperty("concurrent-handler-count")->withDescription("Number of concurrent handlers for this session")->withDefaultValue<int>(1)->build());
// Time period, default "5 s". The worker routine built in onSchedule() sleeps for this long
// (as std::chrono::milliseconds) before re-queueing a socket or giving up.
// NOTE(review): the description speaks of seconds while the stored value is consumed as
// milliseconds - presumably the time period type converts to milliseconds; confirm.
core::Property GetTCP::ReconnectInterval(
core::PropertyBuilder::createProperty("reconnect-interval")->withDescription("The number of seconds to wait before attempting to reconnect to the endpoint.")
->withDefaultValue<core::TimePeriodValue>("5 s")->build());
// Data size, default "16 MB". onSchedule() reads the value into receive_buffer_size_, which is
// the size the worker routine resizes its read buffer to before every socket read.
// The description previously claimed "Default 16384 (16MB)", which contradicted the declared
// default (16384 bytes is 16 KB, not 16 MB); it now states the actual default.
core::Property GetTCP::ReceiveBufferSize(
    core::PropertyBuilder::createProperty("receive-buffer-size")->withDescription("The size of the buffer to receive data in. Default 16 MB.")->withDefaultValue<core::DataSizeValue>("16 MB")
        ->build());
// Optional name of an SSL context controller service. When onSchedule() can resolve it,
// onTrigger() creates secure sockets instead of plain ones.
core::Property GetTCP::SSLContextService(
core::PropertyBuilder::createProperty("SSL Context Service")->withDescription("SSL Context Service Name")->asType<minifi::controllers::SSLContextService>()->build());
// Boolean, default true. The worker routine built in onSchedule() only re-queues a socket
// after a read result of -2 when this is set; otherwise the socket is closed.
core::Property GetTCP::StayConnected(
core::PropertyBuilder::createProperty("Stay Connected")->withDescription("Determines if we keep the same socket despite having no data")->withDefaultValue<bool>(true)->build());
// Integer, default 3. The worker routine compares it with its count of consecutive reads
// that returned -2 and ends once the count exceeds it.
// NOTE(review): the property name says "timeout" although the value is an attempt count;
// the name is a configuration key, so it is left as is.
core::Property GetTCP::ConnectionAttemptLimit(
core::PropertyBuilder::createProperty("connection-attempt-timeout")->withDescription("Maximum number of connection attempts before attempting backup hosts, if configured")->withDefaultValue<int>(
3)->build());
// String holding an integer, default "13". onSchedule() parses it and keeps only the low
// eight bits as the delimiter the worker routine looks for in received data.
core::Property GetTCP::EndOfMessageByte(
core::PropertyBuilder::createProperty("end-of-message-byte")->withDescription(
"Byte value which denotes end of message. Must be specified as integer within the valid byte range (-128 thru 127). For example, '13' = Carriage return and '10' = New line. Default '13'.")
->withDefaultValue("13")->build());
// Relationships used by DataHandler::handle(): flow files go to Partial when handle() is called
// with partial == true and to Success otherwise.
core::Relationship GetTCP::Success("success", "All files are routed to success");
core::Relationship GetTCP::Partial("partial", "Indicates an incomplete message as a result of encountering the end of message byte trigger");
int16_t DataHandler::handle(std::string source, uint8_t *message, size_t size, bool partial) {
std::shared_ptr<core::ProcessSession> my_session = sessionFactory_->createSession();
std::shared_ptr<core::FlowFile> flowFile = my_session->create();
DataHandlerCallback callback(message, size);
my_session->write(flowFile, &callback);
my_session->putAttribute(flowFile, SOURCE_ENDPOINT_ATTRIBUTE, source);
if (partial) {
my_session->transfer(flowFile, GetTCP::Partial);
} else {
my_session->transfer(flowFile, GetTCP::Success);
}
my_session->commit();
return 0;
}
void GetTCP::initialize() {
// Set the supported properties
std::set<core::Property> properties;
properties.insert(EndpointList);
properties.insert(ConcurrentHandlers);
properties.insert(ConnectionAttemptLimit);
properties.insert(EndOfMessageByte);
properties.insert(ReceiveBufferSize);
properties.insert(StayConnected);
properties.insert(SSLContextService);
setSupportedProperties(properties);
// Set the supported relationships
std::set<core::Relationship> relationships;
relationships.insert(Success);
relationships.insert(Partial);
setSupportedRelationships(relationships);
}
/**
 * Reads the processor configuration, builds the worker routine (f_ex) that services
 * sockets taken from socket_ring_buffer_, and starts the client thread pool.
 *
 * The worker routine is only stored here; it is not submitted to the pool in this function.
 *
 * @param context        source of the configured property values and of the SSL controller service.
 * @param sessionFactory given to the DataHandler that the worker routine calls for each chunk it reads.
 */
void GetTCP::onSchedule(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSessionFactory> &sessionFactory) {
std::string value;
// Default; replaced below if the "Stay Connected" property can be read.
stay_connected_ = true;
if (context->getProperty(EndpointList.getName(), value)) {
endpoints = utils::StringUtils::split(value, ",");
}
int handlers = 0;
if (context->getProperty(ConcurrentHandlers.getName(), handlers)) {
concurrent_handlers_ = handlers;
}
if (context->getProperty(StayConnected.getName(), value)) {
utils::StringUtils::StringToBool(value, stay_connected_);
} else {
stay_connected_ = true;
}
int connects = 0;
if (context->getProperty(ConnectionAttemptLimit.getName(), connects)) {
connection_attempt_limit_ = connects;
}
context->getProperty(ReceiveBufferSize.getName(), receive_buffer_size_);
if (context->getProperty(EndOfMessageByte.getName(), value)) {
logger_->log_trace("EOM is passed in as %s", value);
// The result of StringToInt is not checked: if parsing fails byteValue keeps its initial 0.
// Only the low eight bits of the parsed value are kept.
int64_t byteValue = 0;
core::Property::StringToInt(value, byteValue);
endOfMessageByte = byteValue & 0xFF;
}
logger_->log_trace("EOM is defined as %i", endOfMessageByte);
context->getProperty(ReconnectInterval.getName(), reconnect_interval_);
handler_ = std::unique_ptr<DataHandler>(new DataHandler(sessionFactory));
// Worker routine. It returns 0 when the loop ends because running_ became false and -1 when it
// gives up on its socket. The capture default is by reference, but the body uses none of this
// function's local variables.
f_ex = [&] {
std::unique_ptr<io::Socket> socket_ptr;
// The buffer lives outside the loop so its storage is reused across reads.
std::vector<uint8_t> buffer;
// Number of consecutive reads that returned -2; reset after a read that delivered data.
int reconnects = 0;
do {
if ( socket_ring_buffer_.try_dequeue(socket_ptr) ) {
buffer.resize(receive_buffer_size_);
int size_read = socket_ptr->read(buffer.data(), gsl::narrow<int>(receive_buffer_size_), false);
if (size_read >= 0) {
if (size_read > 0) {
// Split the data at every end-of-message byte that is not at index 0. Each piece before a
// delimiter is handed over with partial == true, and the delimiter itself becomes the first
// byte of the following piece (startLoc is set to the delimiter's index).
int startLoc = 0, i = 0;
for (; i < size_read; i++) {
if (buffer.at(i) == endOfMessageByte && i > 0) {
if (i-startLoc > 0) {
handler_->handle(socket_ptr->getHostname(), buffer.data()+startLoc, (i-startLoc), true);
}
startLoc = i;
}
}
if (startLoc > 0) {
// At least one delimiter was found: the remainder after the last one is also sent as partial.
logger_->log_trace("Starting at %i, ending at %i", startLoc, size_read);
if (size_read-startLoc > 0) {
handler_->handle(socket_ptr->getHostname(), buffer.data()+startLoc, (size_read-startLoc), true);
}
} else {
// No delimiter past index 0: everything that was read is sent as one non-partial message.
logger_->log_trace("Handling at %i, ending at %i", startLoc, size_read);
if (size_read > 0) {
handler_->handle(socket_ptr->getHostname(), buffer.data(), size_read, false);
}
}
reconnects = 0;
}
// Reads of zero or more bytes put the socket back into the ring buffer for further reads.
socket_ring_buffer_.enqueue(std::move(socket_ptr));
} else if (size_read == -2 && stay_connected_) {
// NOTE(review): the meaning of the -2 return code is defined by io::Socket::read, which is
// not visible here - presumably "no data available"; confirm.
if (++reconnects > connection_attempt_limit_) {
logger_->log_info("Too many reconnects, exiting thread");
socket_ptr->close();
return -1;
}
logger_->log_info("Sleeping for %" PRIu64 " msec before attempting to reconnect", reconnect_interval_);
std::this_thread::sleep_for(std::chrono::milliseconds(reconnect_interval_));
socket_ring_buffer_.enqueue(std::move(socket_ptr));
} else {
// Any other negative result (or -2 while stay_connected_ is false): close the socket and stop.
socket_ptr->close();
std::this_thread::sleep_for(std::chrono::milliseconds(reconnect_interval_));
logger_->log_info("Read response returned a -1 from socket, exiting thread");
return -1;
}
} else {
// Nothing could be dequeued from the ring buffer.
std::this_thread::sleep_for(std::chrono::milliseconds(reconnect_interval_));
logger_->log_info("Could not use socket, exiting thread");
return -1;
}
}while (running_);
logger_->log_debug("Ending private thread");
return 0;
};
if (context->getProperty(SSLContextService.getName(), value)) {
std::shared_ptr<core::controller::ControllerService> service = context->getControllerService(value);
if (nullptr != service) {
// NOTE(review): static_pointer_cast performs no runtime type check - this assumes the named
// service really is an SSLContextService; confirm.
ssl_service_ = std::static_pointer_cast<minifi::controllers::SSLContextService>(service);
}
}
client_thread_pool_.setMaxConcurrentTasks(concurrent_handlers_);
client_thread_pool_.start();
// The worker routine keeps looping only while this flag is true.
running_ = true;
}
/**
 * Stops the processor: clears the running flag checked by the worker routine's loop,
 * shuts the client thread pool down and empties the socket ring buffer.
 */
void GetTCP::notifyStop() {
  running_ = false;
  // await threads to shutdown.
  client_thread_pool_.shutdown();

  // Pull the remaining sockets out of the ring buffer; each dequeued socket is released when
  // the holder is overwritten by the next one or goes out of scope.
  for (std::unique_ptr<io::Socket> drained; socket_ring_buffer_.size_approx() != 0;) {
    socket_ring_buffer_.try_dequeue(drained);
  }
}
/**
 * Makes sure every configured endpoint has a worker task registered in live_clients_.
 *
 * For an endpoint without an entry a socket is created, initialized and queued in
 * socket_ring_buffer_, then a worker running f_ex is submitted to the client thread pool.
 * For an endpoint whose stored future is no longer valid only a new worker is submitted.
 * Entries that are not of the form "<host>:<port>" are skipped. The whole pass runs under
 * mutex_, and the context is yielded at the end.
 *
 * Changes compared with the previous implementation:
 *  - the "<host>:<port>" shape is verified before any element of the split result is accessed;
 *  - a future whose task could not be submitted is freed instead of leaked;
 *  - when restarting a worker, the old future is only deleted once its replacement has been
 *    submitted, so live_clients_ never keeps a pointer to a deleted future;
 *  - an unreachable branch (a second check of the element count) was removed.
 *
 * NOTE(review): std::stoi throws if the port is not numeric; as before, that exception
 * propagates to the caller.
 *
 * @param context used to yield once all endpoints have been visited.
 * @param session unused.
 */
void GetTCP::onTrigger(const std::shared_ptr<core::ProcessContext> &context, const std::shared_ptr<core::ProcessSession> &session) {
  metrics_->iterations_++;
  std::lock_guard<std::mutex> lock(mutex_);

  // Submits a worker running f_ex for the given endpoint. Returns the future tied to the
  // submitted task, or an empty pointer if the thread pool did not accept the task.
  auto startWorker = [this](const std::string &endpoint) -> std::unique_ptr<std::future<int>> {
    std::unique_ptr<std::future<int>> future(new std::future<int>());
    std::unique_ptr<utils::AfterExecute<int>> after_execute = std::unique_ptr<utils::AfterExecute<int>>(new SocketAfterExecute(running_, endpoint, &live_clients_, &mutex_));
    utils::Worker<int> functor(f_ex, "workers", std::move(after_execute));
    if (!client_thread_pool_.execute(std::move(functor), *future)) {
      logger_->log_error("Could not schedule worker for %s", endpoint);
      future.reset();
    }
    return future;
  };

  for (const auto &initEndpoint : endpoints) {
    const std::vector<std::string> hostAndPort = utils::StringUtils::split(initEndpoint, ":");
    // Validate first: at() would throw on a result with fewer elements.
    if (hostAndPort.size() != 2) {
      continue;
    }
    auto realizedHost = hostAndPort.at(0);
#ifdef WIN32
    if ("localhost" == realizedHost) {
      realizedHost = org::apache::nifi::minifi::io::Socket::getMyHostName();
    }
#endif
    const auto &portStr = hostAndPort.at(1);
    const auto endpoint = realizedHost + ":" + portStr;
    auto endPointFuture = live_clients_.find(endpoint);

    if (endPointFuture == live_clients_.end()) {
      // No worker is registered for this endpoint yet: open a socket and start one.
      logger_->log_info("creating endpoint for %s", endpoint);
      logger_->log_debug("Opening another socket to %s:%s is secure %d", realizedHost, portStr, (ssl_service_ != nullptr));
      std::unique_ptr<io::Socket> socket =
          ssl_service_ != nullptr ? stream_factory_->createSecureSocket(realizedHost, std::stoi(portStr), ssl_service_) : stream_factory_->createSocket(realizedHost, std::stoi(portStr));
      if (!socket) {
        logger_->log_error("Could not create socket during initialization for %s", endpoint);
        continue;
      }
      socket->setNonBlocking();
      if (socket->initialize() == -1) {
        logger_->log_error("Could not create socket during initialization for %s", endpoint);
        continue;
      }
      logger_->log_debug("Enqueueing socket into ring buffer %s:%s", realizedHost, portStr);
      socket_ring_buffer_.enqueue(std::move(socket));

      std::unique_ptr<std::future<int>> future = startWorker(endpoint);
      if (future) {
        // live_clients_ stores raw pointers and takes over ownership here.
        live_clients_[endpoint] = future.release();
      }
    } else if (!endPointFuture->second->valid()) {
      // The stored future is no longer valid: submit a new worker. The old future is kept
      // until the replacement exists so that a failed submission leaves a usable entry
      // behind and the next trigger can retry.
      std::unique_ptr<std::future<int>> replacement = startWorker(endpoint);
      if (replacement) {
        delete endPointFuture->second;
        endPointFuture->second = replacement.release();
      }
    } else {
      // we have a thread corresponding to this.
      logger_->log_debug("Thread still running for %s", endPointFuture->first);
    }
  }
  logger_->log_debug("Updating endpoint");
  context->yield();
}
/**
 * Appends this processor's metrics node to the supplied collection.
 *
 * @param metric_vector collection that receives the metrics node.
 * @return always 0.
 */
int16_t GetTCP::getMetricNodes(std::vector<std::shared_ptr<state::response::ResponseNode>> &metric_vector) {
  metric_vector.emplace_back(metrics_);
  return 0;
}
} /* namespace processors */
} /* namespace minifi */
} /* namespace nifi */
} /* namespace apache */
} /* namespace org */