/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ConsumeKafka.h"
#include "KafkaProcessorBase.h"
#include "PublishKafka.h"
#include "controllers/SSLContextService.h"
#include "core/PropertyBuilder.h"
#include "core/Resource.h"
// FIXME(fgerlits): we need to put all these static definitions in a single file so that they are executed in this order at runtime
// when https://issues.apache.org/jira/browse/MINIFICPP-1825 is closed, these definitions should be moved back to the cpp file of the class to which they belong
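// A minimal sketch of the ordering hazard the FIXME above refers to (illustrative only; the file and
// variable names below are made up, not from this codebase): statics within one translation unit are
// initialized top to bottom, but the order across translation units is unspecified, so a definition
// that reads another file's static may run before that static has been constructed.
/*
// defaults.cpp
extern const std::string kDefaultBrokers = "localhost:9092";

// some_processor.cpp
extern const std::string kDefaultBrokers;  // defined in defaults.cpp
// If this initializer happens to run before kDefaultBrokers is constructed,
// it copies an empty, not-yet-initialized string.
const std::string kBrokersProperty = kDefaultBrokers;
*/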
namespace org::apache::nifi::minifi::processors {
// KafkaProcessorBase
const core::Property KafkaProcessorBase::SecurityProtocol(
core::PropertyBuilder::createProperty("Security Protocol")
->withDescription("Protocol used to communicate with brokers. Corresponds to Kafka's 'security.protocol' property.")
->withDefaultValue<std::string>(toString(SecurityProtocolOption::PLAINTEXT))
->withAllowableValues<std::string>(SecurityProtocolOption::values())
->isRequired(true)
->build());
const core::Property KafkaProcessorBase::SSLContextService(
core::PropertyBuilder::createProperty("SSL Context Service")
->withDescription("SSL Context Service Name")
->asType<minifi::controllers::SSLContextService>()
->build());
const core::Property KafkaProcessorBase::KerberosServiceName(
core::PropertyBuilder::createProperty("Kerberos Service Name")
->withDescription("Kerberos Service Name")
->build());
const core::Property KafkaProcessorBase::KerberosPrincipal(
core::PropertyBuilder::createProperty("Kerberos Principal")
->withDescription("Keberos Principal")
->build());
const core::Property KafkaProcessorBase::KerberosKeytabPath(
core::PropertyBuilder::createProperty("Kerberos Keytab Path")
->withDescription("The path to the location on the local filesystem where the kerberos keytab is located. Read permission on the file is required.")
->build());
const core::Property KafkaProcessorBase::SASLMechanism(
core::PropertyBuilder::createProperty("SASL Mechanism")
->withDescription("The SASL mechanism to use for authentication. Corresponds to Kafka's 'sasl.mechanism' property.")
->withDefaultValue<std::string>(toString(SASLMechanismOption::GSSAPI))
->withAllowableValues<std::string>(SASLMechanismOption::values())
->isRequired(true)
->build());
const core::Property KafkaProcessorBase::Username(
core::PropertyBuilder::createProperty("Username")
->withDescription("The username when the SASL Mechanism is sasl_plaintext")
->build());
const core::Property KafkaProcessorBase::Password(
core::PropertyBuilder::createProperty("Password")
->withDescription("The password for the given username when the SASL Mechanism is sasl_plaintext")
->build());
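// A minimal usage sketch (not part of this file) of how security settings like the ones above are
// typically forwarded to librdkafka; the helper name and the chosen set of keys are assumptions for
// the example, while rd_kafka_conf_set() and the "security.protocol" / "sasl.*" keys are standard
// librdkafka configuration (declared in <librdkafka/rdkafka.h>).
/*
void applySecuritySettings(rd_kafka_conf_t* conf, const std::string& protocol, const std::string& mechanism,
                           const std::string& username, const std::string& password) {
  char errstr[512];
  rd_kafka_conf_set(conf, "security.protocol", protocol.c_str(), errstr, sizeof(errstr));  // e.g. "sasl_plaintext"
  rd_kafka_conf_set(conf, "sasl.mechanisms", mechanism.c_str(), errstr, sizeof(errstr));   // e.g. "GSSAPI" or "PLAIN"
  rd_kafka_conf_set(conf, "sasl.username", username.c_str(), errstr, sizeof(errstr));
  rd_kafka_conf_set(conf, "sasl.password", password.c_str(), errstr, sizeof(errstr));
}
*/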
// ConsumeKafka
const core::Property ConsumeKafka::KafkaBrokers(core::PropertyBuilder::createProperty("Kafka Brokers")
->withDescription("A comma-separated list of known Kafka Brokers in the format <host>:<port>.")
->withDefaultValue("localhost:9092", core::StandardValidators::get().NON_BLANK_VALIDATOR)
->supportsExpressionLanguage(true)
->isRequired(true)
->build());
const core::Property ConsumeKafka::TopicNames(core::PropertyBuilder::createProperty("Topic Names")
->withDescription("The name of the Kafka Topic(s) to pull from. Multiple topic names are supported as a comma separated list.")
->supportsExpressionLanguage(true)
->isRequired(true)
->build());
const core::Property ConsumeKafka::TopicNameFormat(core::PropertyBuilder::createProperty("Topic Name Format")
->withDescription("Specifies whether the Topic(s) provided are a comma separated list of names or a single regular expression. "
"Using regular expressions does not automatically discover Kafka topics created after the processor started.")
->withAllowableValues<std::string>({TOPIC_FORMAT_NAMES, TOPIC_FORMAT_PATTERNS})
->withDefaultValue(TOPIC_FORMAT_NAMES)
->isRequired(true)
->build());
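// A minimal sketch (the helper is hypothetical, not from this codebase) of the difference between the
// two topic name formats when subscribing with librdkafka: a subscribed topic whose name starts with
// "^" is treated as a regular expression, so a pattern-format value can be handled by prefixing it
// before the subscription call.
/*
void subscribeToTopics(rd_kafka_t* consumer, const std::vector<std::string>& topics, bool pattern_format) {
  rd_kafka_topic_partition_list_t* list = rd_kafka_topic_partition_list_new(static_cast<int>(topics.size()));
  for (const auto& topic : topics) {
    const std::string name = pattern_format ? "^" + topic : topic;  // "^" marks a regex subscription
    rd_kafka_topic_partition_list_add(list, name.c_str(), RD_KAFKA_PARTITION_UA);
  }
  rd_kafka_subscribe(consumer, list);
  rd_kafka_topic_partition_list_destroy(list);
}
*/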
const core::Property ConsumeKafka::HonorTransactions(core::PropertyBuilder::createProperty("Honor Transactions")
->withDescription(
"Specifies whether or not MiNiFi should honor transactional guarantees when communicating with Kafka. If false, the Processor will use an \"isolation level\" of "
"read_uncomitted. This means that messages will be received as soon as they are written to Kafka but will be pulled, even if the producer cancels the transactions. "
"If this value is true, MiNiFi will not receive any messages for which the producer's transaction was canceled, but this can result in some latency since the consumer "
"must wait for the producer to finish its entire transaction instead of pulling as the messages become available.")
->withDefaultValue<bool>(true)
->isRequired(true)
->build());
const core::Property ConsumeKafka::GroupID(core::PropertyBuilder::createProperty("Group ID")
->withDescription("A Group ID is used to identify consumers that are within the same consumer group. Corresponds to Kafka's 'group.id' property.")
->supportsExpressionLanguage(true)
->isRequired(true)
->build());
const core::Property ConsumeKafka::OffsetReset(core::PropertyBuilder::createProperty("Offset Reset")
->withDescription("Allows you to manage the condition when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that "
"data has been deleted). Corresponds to Kafka's 'auto.offset.reset' property.")
->withAllowableValues<std::string>({OFFSET_RESET_EARLIEST, OFFSET_RESET_LATEST, OFFSET_RESET_NONE})
->withDefaultValue(OFFSET_RESET_LATEST)
->isRequired(true)
->build());
const core::Property ConsumeKafka::KeyAttributeEncoding(core::PropertyBuilder::createProperty("Key Attribute Encoding")
->withDescription("FlowFiles that are emitted have an attribute named 'kafka.key'. This property dictates how the value of the attribute should be encoded.")
->withAllowableValues<std::string>({KEY_ATTR_ENCODING_UTF_8, KEY_ATTR_ENCODING_HEX})
->withDefaultValue(KEY_ATTR_ENCODING_UTF_8)
->isRequired(true)
->build());
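// A minimal sketch of the two "Key Attribute Encoding" options described above (the helper is
// hypothetical, not from this codebase): "UTF-8" keeps the key bytes as-is for the 'kafka.key'
// attribute, while "Hex" renders each byte as two hexadecimal digits.
/*
std::string encodeKeyAttribute(const std::string& raw_key, bool hex_encoding) {
  if (!hex_encoding) {
    return raw_key;  // interpret the key bytes as UTF-8 text
  }
  static constexpr char kDigits[] = "0123456789ABCDEF";
  std::string encoded;
  encoded.reserve(raw_key.size() * 2);
  for (unsigned char byte : raw_key) {
    encoded.push_back(kDigits[byte >> 4]);
    encoded.push_back(kDigits[byte & 0x0F]);
  }
  return encoded;
}
*/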
const core::Property ConsumeKafka::MessageDemarcator(core::PropertyBuilder::createProperty("Message Demarcator")
->withDescription("Since KafkaConsumer receives messages in batches, you have an option to output FlowFiles which contains all Kafka messages in a single batch "
"for a given topic and partition and this property allows you to provide a string (interpreted as UTF-8) to use for demarcating apart multiple Kafka messages. "
"This is an optional property and if not provided each Kafka message received will result in a single FlowFile which time it is triggered. ")
->supportsExpressionLanguage(true)
->build());
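// A minimal sketch of what demarcated output means (the helper is hypothetical): with a demarcator
// set, the messages of one batch for a given topic and partition are concatenated into a single
// FlowFile with the demarcator between them; with no demarcator, each message becomes its own FlowFile.
/*
std::string demarcateMessages(const std::vector<std::string>& messages, const std::string& demarcator) {
  std::string content;
  for (size_t i = 0; i < messages.size(); ++i) {
    if (i != 0) {
      content += demarcator;
    }
    content += messages[i];
  }
  return content;
}
*/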
const core::Property ConsumeKafka::MessageHeaderEncoding(core::PropertyBuilder::createProperty("Message Header Encoding")
->withDescription("Any message header that is found on a Kafka message will be added to the outbound FlowFile as an attribute. This property indicates the Character Encoding "
"to use for deserializing the headers.")
->withAllowableValues<std::string>({MSG_HEADER_ENCODING_UTF_8, MSG_HEADER_ENCODING_HEX})
->withDefaultValue(MSG_HEADER_ENCODING_UTF_8)
->build());
const core::Property ConsumeKafka::HeadersToAddAsAttributes(core::PropertyBuilder::createProperty("Headers To Add As Attributes")
->withDescription("A comma separated list to match against all message headers. Any message header whose name matches an item from the list will be added to the FlowFile "
"as an Attribute. If not specified, no Header values will be added as FlowFile attributes. The behaviour on when multiple headers of the same name are present is set using "
"the DuplicateHeaderHandling attribute.")
->build());
const core::Property ConsumeKafka::DuplicateHeaderHandling(core::PropertyBuilder::createProperty("Duplicate Header Handling")
->withDescription("For headers to be added as attributes, this option specifies how to handle cases where multiple headers are present with the same key. "
"For example in case of receiving these two headers: \"Accept: text/html\" and \"Accept: application/xml\" and we want to attach the value of \"Accept\" "
"as a FlowFile attribute:\n"
" - \"Keep First\" attaches: \"Accept -> text/html\"\n"
" - \"Keep Latest\" attaches: \"Accept -> application/xml\"\n"
" - \"Comma-separated Merge\" attaches: \"Accept -> text/html, application/xml\"\n")
->withAllowableValues<std::string>({MSG_HEADER_KEEP_FIRST, MSG_HEADER_KEEP_LATEST, MSG_HEADER_COMMA_SEPARATED_MERGE})
->withDefaultValue(MSG_HEADER_KEEP_LATEST) // Mirroring NiFi behaviour
->build());
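// A minimal sketch of the three duplicate-header strategies described above (the enum and helper are
// made up for the example and assume a non-empty list of values collected for one header name):
/*
enum class DuplicateHeaderPolicy { KeepFirst, KeepLatest, CommaSeparatedMerge };

std::string resolveHeaderValue(const std::vector<std::string>& values, DuplicateHeaderPolicy policy) {
  switch (policy) {
    case DuplicateHeaderPolicy::KeepFirst:  return values.front();  // "Accept -> text/html"
    case DuplicateHeaderPolicy::KeepLatest: return values.back();   // "Accept -> application/xml"
    case DuplicateHeaderPolicy::CommaSeparatedMerge: {              // "Accept -> text/html, application/xml"
      std::string merged;
      for (size_t i = 0; i < values.size(); ++i) {
        if (i != 0) { merged += ", "; }
        merged += values[i];
      }
      return merged;
    }
  }
  return {};
}
*/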
const core::Property ConsumeKafka::MaxPollRecords(core::PropertyBuilder::createProperty("Max Poll Records")
->withDescription("Specifies the maximum number of records Kafka should return when polling each time the processor is triggered.")
->withDefaultValue<unsigned int>(DEFAULT_MAX_POLL_RECORDS)
->build());
const core::Property ConsumeKafka::MaxPollTime(core::PropertyBuilder::createProperty("Max Poll Time")
->withDescription("Specifies the maximum amount of time the consumer can use for polling data from the brokers. "
"Polling is a blocking operation, so the upper limit of this value is specified in 4 seconds.")
->withDefaultValue(DEFAULT_MAX_POLL_TIME, std::make_shared<core::ConsumeKafkaMaxPollTimeValidator>(std::string("ConsumeKafkaMaxPollTimeValidator")))
->isRequired(true)
->build());
const core::Property ConsumeKafka::SessionTimeout(core::PropertyBuilder::createProperty("Session Timeout")
->withDescription("Client group session and failure detection timeout. The consumer sends periodic heartbeats "
"to indicate its liveness to the broker. If no hearts are received by the broker for a group member within "
"the session timeout, the broker will remove the consumer from the group and trigger a rebalance. "
"The allowed range is configured with the broker configuration properties group.min.session.timeout.ms and group.max.session.timeout.ms.")
->withDefaultValue<core::TimePeriodValue>("60 seconds")
->build());
const core::Relationship ConsumeKafka::Success("success", "Incoming Kafka messages as flowfiles. Depending on the demarcation strategy, a flowfile may contain a single Kafka message or a batch of messages from the same topic and partition.");
REGISTER_RESOURCE(ConsumeKafka, Processor);
// PublishKafka
const core::Property PublishKafka::SeedBrokers(
core::PropertyBuilder::createProperty("Known Brokers")->withDescription("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
->isRequired(true)->supportsExpressionLanguage(true)->build());
const core::Property PublishKafka::Topic(
core::PropertyBuilder::createProperty("Topic Name")->withDescription("The Kafka Topic of interest")
->isRequired(true)->supportsExpressionLanguage(true)->build());
const core::Property PublishKafka::DeliveryGuarantee(
core::PropertyBuilder::createProperty("Delivery Guarantee")->withDescription("Specifies the requirement for guaranteeing that a message is sent to Kafka. "
"Valid values are 0 (do not wait for acks), "
"-1 or all (block until message is committed by all in sync replicas) "
"or any concrete number of nodes.")
->isRequired(false)->supportsExpressionLanguage(true)->withDefaultValue(DELIVERY_ONE_NODE)->build());
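// A minimal sketch (the helper is hypothetical) of where a value like the one above usually ends up:
// librdkafka's "request.required.acks" producer setting, where 0 means no acknowledgement, -1 waits
// for all in-sync replicas, and a positive number waits for that many acknowledgements.
/*
void applyDeliveryGuarantee(rd_kafka_conf_t* conf, const std::string& guarantee) {
  char errstr[512];
  const std::string acks = (guarantee == "all") ? "-1" : guarantee;  // normalize "all" to -1
  rd_kafka_conf_set(conf, "request.required.acks", acks.c_str(), errstr, sizeof(errstr));
}
*/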
const core::Property PublishKafka::MaxMessageSize(
core::PropertyBuilder::createProperty("Max Request Size")->withDescription("Maximum Kafka protocol request message size")
->isRequired(false)->build());
const core::Property PublishKafka::RequestTimeOut(
core::PropertyBuilder::createProperty("Request Timeout")->withDescription("The ack timeout of the producer request")
->isRequired(false)->withDefaultValue<core::TimePeriodValue>("10 sec")->build());
const core::Property PublishKafka::MessageTimeOut(
core::PropertyBuilder::createProperty("Message Timeout")->withDescription("The total time sending a message could take")
->isRequired(false)->withDefaultValue<core::TimePeriodValue>("30 sec")->build());
const core::Property PublishKafka::ClientName(
core::PropertyBuilder::createProperty("Client Name")->withDescription("Client Name to use when communicating with Kafka")
->isRequired(true)->supportsExpressionLanguage(true)->build());
const core::Property PublishKafka::BatchSize(
core::PropertyBuilder::createProperty("Batch Size")->withDescription("Maximum number of messages batched in one MessageSet")
->isRequired(false)->withDefaultValue<uint32_t>(10)->build());
const core::Property PublishKafka::TargetBatchPayloadSize(
core::PropertyBuilder::createProperty("Target Batch Payload Size")->withDescription("The target total payload size for a batch. 0 B means unlimited (Batch Size is still applied).")
->isRequired(false)->withDefaultValue<core::DataSizeValue>("512 KB")->build());
const core::Property PublishKafka::AttributeNameRegex("Attributes to Send as Headers", "Any attribute whose name matches the regex will be added to the Kafka messages as a Header", "");
const core::Property PublishKafka::QueueBufferMaxTime(
core::PropertyBuilder::createProperty("Queue Buffering Max Time")
->isRequired(false)
->withDefaultValue<core::TimePeriodValue>("5 millis")
->withDescription("Delay to wait for messages in the producer queue to accumulate before constructing message batches")
->build());
const core::Property PublishKafka::QueueBufferMaxSize(
core::PropertyBuilder::createProperty("Queue Max Buffer Size")
->isRequired(false)
->withDefaultValue<core::DataSizeValue>("1 MB")
->withDescription("Maximum total message size sum allowed on the producer queue")
->build());
const core::Property PublishKafka::QueueBufferMaxMessage(
core::PropertyBuilder::createProperty("Queue Max Message")
->isRequired(false)
->withDefaultValue<uint64_t>(1000)
->withDescription("Maximum number of messages allowed on the producer queue")
->build());
const core::Property PublishKafka::CompressCodec(
core::PropertyBuilder::createProperty("Compress Codec")
->isRequired(false)
->withDefaultValue<std::string>(COMPRESSION_CODEC_NONE)
->withAllowableValues<std::string>({COMPRESSION_CODEC_NONE, COMPRESSION_CODEC_GZIP, COMPRESSION_CODEC_SNAPPY})
->withDescription("compression codec to use for compressing message sets")
->build());
const core::Property PublishKafka::MaxFlowSegSize(
core::PropertyBuilder::createProperty("Max Flow Segment Size")->withDescription("Maximum flow content payload segment size for the kafka record. 0 B means unlimited.")
->isRequired(false)->withDefaultValue<core::DataSizeValue>("0 B")->build());
const core::Property PublishKafka::SecurityCA("Security CA", "DEPRECATED in favor of SSL Context Service. File or directory path to CA certificate(s) for verifying the broker's key", "");
const core::Property PublishKafka::SecurityCert("Security Cert", "DEPRECATED in favor of SSL Context Service. Path to client's public key (PEM) used for authentication", "");
const core::Property PublishKafka::SecurityPrivateKey("Security Private Key", "DEPRECATED in favor of SSL Context Service. Path to client's private key (PEM) used for authentication", "");
const core::Property PublishKafka::SecurityPrivateKeyPassWord("Security Pass Phrase", "DEPRECATED in favor of SSL Context Service. Private key passphrase", "");
const core::Property PublishKafka::KafkaKey(
core::PropertyBuilder::createProperty("Kafka Key")
->withDescription("The key to use for the message. If not specified, the UUID of the flow file is used as the message key.")
->supportsExpressionLanguage(true)
->build());
const core::Property PublishKafka::MessageKeyField("Message Key Field", "DEPRECATED, does not work -- use Kafka Key instead", "");
const core::Property PublishKafka::DebugContexts("Debug contexts", "A comma-separated list of debug contexts to enable. "
"Including: generic, broker, topic, metadata, feature, queue, msg, protocol, cgrp, security, fetch, interceptor, plugin, consumer, admin, eos, all", "");
const core::Property PublishKafka::FailEmptyFlowFiles(
core::PropertyBuilder::createProperty("Fail empty flow files")
->withDescription("Keep backwards compatibility with <=0.7.0 bug which caused flow files with empty content to not be published to Kafka and forwarded to failure. The old behavior is "
"deprecated. Use connections to drop empty flow files!")
->isRequired(false)
->withDefaultValue<bool>(true)
->build());
const core::Relationship PublishKafka::Success("success", "Any FlowFile that is successfully sent to Kafka will be routed to this Relationship");
const core::Relationship PublishKafka::Failure("failure", "Any FlowFile that cannot be sent to Kafka will be routed to this Relationship");
REGISTER_RESOURCE(PublishKafka, Processor);
} // namespace org::apache::nifi::minifi::processors