/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ListenTCP.h"
#include "core/Resource.h"
#include "core/PropertyBuilder.h"
#include "controllers/SSLContextService.h"
#include "utils/ProcessorConfigUtils.h"
namespace org::apache::nifi::minifi::processors {
// "Listening Port" property: the port this processor listens on.
// Marked as required and typed with the standard LISTEN_PORT_VALIDATOR; no default value is set here.
const core::Property ListenTCP::Port(
core::PropertyBuilder::createProperty("Listening Port")
->withDescription("The port to listen on for communication.")
->withType(core::StandardValidators::get().LISTEN_PORT_VALIDATOR)
->isRequired(true)
->build());
// "Max Size of Message Queue" property: required uint64_t with a default of 10000.
// Per its description, it caps how many received messages may be buffered between triggers;
// messages arriving while the buffer is full are ignored, and zero means no limit.
const core::Property ListenTCP::MaxQueueSize(
core::PropertyBuilder::createProperty("Max Size of Message Queue")
->withDescription("Maximum number of messages allowed to be buffered before processing them when the processor is triggered. "
"If the buffer is full, the message is ignored. If set to zero the buffer is unlimited.")
->withDefaultValue<uint64_t>(10000)
->isRequired(true)
->build());
// "Max Batch Size" property: required uint64_t with a default of 500.
// Per its description, it is the maximum number of messages processed at a time.
const core::Property ListenTCP::MaxBatchSize(
core::PropertyBuilder::createProperty("Max Batch Size")
->withDescription("The maximum number of messages to process at a time.")
->withDefaultValue<uint64_t>(500)
->isRequired(true)
->build());
// "SSL Context Service" property: refers to a controller service of type
// minifi::controllers::SSLContextService. Per its description, setting it makes the
// processor receive messages over a secure connection.
// NOTE(review): isRequired() is not called here, so this is presumably optional — confirm the builder default.
const core::Property ListenTCP::SSLContextService(
core::PropertyBuilder::createProperty("SSL Context Service")
->withDescription("The Controller Service to use in order to obtain an SSL Context. If this property is set, messages will be received over a secure connection.")
->asType<minifi::controllers::SSLContextService>()
->build());
// "Client Auth" property: string-valued client authentication policy.
// Defaults to the string form of ClientAuthOption::NONE and only accepts the values
// listed by ClientAuthOption::values(). Per its description, it is only used when an
// SSL Context Service is provided.
const core::Property ListenTCP::ClientAuth(
core::PropertyBuilder::createProperty("Client Auth")
->withDescription("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.")
->withDefaultValue<std::string>(toString(utils::net::SslServer::ClientAuthOption::NONE))
->withAllowableValues<std::string>(utils::net::SslServer::ClientAuthOption::values())
->build());
// "success" relationship: the destination used by transferAsFlowFile() for every flow file it creates.
const core::Relationship ListenTCP::Success("success", "Messages received successfully will be sent out this relationship.");
void ListenTCP::initialize() {
setSupportedProperties(properties());
setSupportedRelationships(relationships());
}
/**
 * Scheduling hook: starts the TCP server for this processor.
 *
 * @param context process context; must not be null (enforced with gsl_Expects)
 *
 * The second parameter (the session factory) is not used. The SSL Context Service and
 * Client Auth property definitions are handed to startTcpServer together with the context.
 */
void ListenTCP::onSchedule(const std::shared_ptr<core::ProcessContext>& context, const std::shared_ptr<core::ProcessSessionFactory>& /*session_factory*/) {
  gsl_Expects(context);
  core::ProcessContext& process_context = *context;
  startTcpServer(process_context, SSLContextService, ClientAuth);
}
/**
 * Turns one received network message into a flow file and routes it to Success.
 *
 * The message payload is written as the flow file content, and two attributes are attached:
 * "tcp.port" (the server port as decimal text) and "tcp.sender" (the sender address as text).
 *
 * @param message the received message; its payload, server port and sender address are read
 * @param session the process session used to create, fill and transfer the flow file
 */
void ListenTCP::transferAsFlowFile(const utils::net::Message& message, core::ProcessSession& session) {
  auto created_flow_file = session.create();
  session.writeBuffer(created_flow_file, message.message_data);

  const std::string port_attribute = std::to_string(message.server_port);
  created_flow_file->setAttribute("tcp.port", port_attribute);

  const auto sender_attribute = message.sender_address.to_string();
  created_flow_file->setAttribute("tcp.sender", sender_attribute);

  session.transfer(created_flow_file, Success);
}
/**
 * Accessor for the "Max Batch Size" property definition.
 *
 * @return a reference to the static ListenTCP::MaxBatchSize property
 */
const core::Property& ListenTCP::getMaxBatchSizeProperty() {
  return ListenTCP::MaxBatchSize;
}
/**
 * Accessor for the "Max Size of Message Queue" property definition.
 *
 * @return a reference to the static ListenTCP::MaxQueueSize property
 */
const core::Property& ListenTCP::getMaxQueueSizeProperty() {
  return ListenTCP::MaxQueueSize;
}
/**
 * Accessor for the "Listening Port" property definition.
 *
 * @return a reference to the static ListenTCP::Port property
 */
const core::Property& ListenTCP::getPortProperty() {
  return ListenTCP::Port;
}
// Registers ListenTCP under the Processor resource category via the REGISTER_RESOURCE macro
// (presumably the one provided by core/Resource.h — confirm).
REGISTER_RESOURCE(ListenTCP, Processor);
} // namespace org::apache::nifi::minifi::processors