blob: f793ba26d7ac575fb08b343c01ecb3e8f427c865 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.record.sink.kafka;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.kafka.shared.component.KafkaClientComponent;
import org.apache.nifi.kafka.shared.property.provider.KafkaPropertyProvider;
import org.apache.nifi.kafka.shared.property.provider.StandardKafkaPropertyProvider;
import org.apache.nifi.kafka.shared.validation.DynamicPropertyValidator;
import org.apache.nifi.kafka.shared.validation.KafkaClientCustomValidationFunction;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.record.sink.RecordSinkService;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.RecordSet;
import org.apache.nifi.stream.io.ByteCountingOutputStream;
import org.apache.nifi.stream.io.exception.TokenTooLargeException;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.time.Duration;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
 * Controller service that publishes the records of a {@link RecordSet} to a single Kafka topic.
 *
 * <p>Each record is serialized on its own with the configured {@link RecordSetWriterFactory} and sent as one
 * Kafka message with no key. After all records have been handed to the producer, the service waits for the
 * resulting acknowledgements before reporting the write result.</p>
 */
@Tags({"kafka", "record", "sink"})
@CapabilityDescription("Provides a service to write records to a Kafka 2.6+ topic.")
@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
        description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
        + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
        + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ",
        expressionLanguageScope = ExpressionLanguageScope.ENVIRONMENT)
public class KafkaRecordSink_2_6 extends AbstractControllerService implements KafkaClientComponent, RecordSinkService {

    // Allowable values for the 'acks' property.
    static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery",
            "Records are considered 'transmitted unsuccessfully' unless the message is replicated to the appropriate "
            + "number of Kafka Nodes according to the Topic configuration.");
    static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery",
            "Records are considered 'transmitted successfully' if the message is received by a single Kafka node, "
            + "whether or not it is replicated. This is faster than <Guarantee Replicated Delivery> "
            + "but can result in data loss if a Kafka node crashes.");
    static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort",
            "Records are considered 'transmitted successfully' after successfully writing the content to a Kafka node, "
            + "without waiting for a response. This provides the best performance but may result in data loss.");

    static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
            .name("topic")
            .displayName("Topic Name")
            .description("The name of the Kafka Topic to publish to.")
            .required(true)
            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .build();

    static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder()
            .name("acks")
            .displayName("Delivery Guarantee")
            .description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.")
            .required(true)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
            .defaultValue(DELIVERY_BEST_EFFORT)
            .build();

    static final PropertyDescriptor METADATA_WAIT_TIME = new PropertyDescriptor.Builder()
            .name("max.block.ms")
            .displayName("Max Metadata Wait Time")
            .description("The amount of time publisher will wait to obtain metadata or wait for the buffer to flush during the 'send' call before failing the "
                    + "entire 'send' call. Corresponds to Kafka's 'max.block.ms' property")
            .required(true)
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .defaultValue("5 sec")
            .build();

    static final PropertyDescriptor ACK_WAIT_TIME = new PropertyDescriptor.Builder()
            .name("ack.wait.time")
            .displayName("Acknowledgment Wait Time")
            .description("After sending a message to Kafka, this indicates the amount of time that we are willing to wait for a response from Kafka. "
                    + "If Kafka does not acknowledge the message within this time period, the FlowFile will be routed to 'failure'.")
            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
            .required(true)
            .defaultValue("5 secs")
            .build();

    static final PropertyDescriptor MAX_REQUEST_SIZE = new PropertyDescriptor.Builder()
            .name("max.request.size")
            .displayName("Max Request Size")
            .description("The maximum size of a request in bytes. Corresponds to Kafka's 'max.request.size' property and defaults to 1 MB (1048576).")
            .required(true)
            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
            .defaultValue("1 MB")
            .build();

    static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder()
            .name("compression.type")
            .displayName("Compression Type")
            .description("This parameter allows you to specify the compression codec for all data generated by this producer.")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .allowableValues("none", "gzip", "snappy", "lz4")
            .defaultValue("none")
            .build();

    static final PropertyDescriptor MESSAGE_HEADER_ENCODING = new PropertyDescriptor.Builder()
            .name("message-header-encoding")
            .displayName("Message Header Encoding")
            .description("For any attribute that is added as a message header, as configured via the <Attributes to Send as Headers> property, "
                    + "this property indicates the Character Encoding to use for serializing the headers.")
            .addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
            .defaultValue("UTF-8")
            .required(false)
            .build();

    /** Static properties exposed by this service, in the order they are returned to the framework. */
    private static final List<PropertyDescriptor> PROPERTIES = List.of(
            BOOTSTRAP_SERVERS,
            TOPIC,
            RecordSinkService.RECORD_WRITER_FACTORY,
            DELIVERY_GUARANTEE,
            MESSAGE_HEADER_ENCODING,
            SECURITY_PROTOCOL,
            SELF_CONTAINED_KERBEROS_USER_SERVICE,
            KERBEROS_SERVICE_NAME,
            SSL_CONTEXT_SERVICE,
            MAX_REQUEST_SIZE,
            ACK_WAIT_TIME,
            METADATA_WAIT_TIME,
            COMPRESSION_CODEC
    );

    // State captured in onEnabled and read by sendData / stop.
    private volatile RecordSetWriterFactory writerFactory;
    private volatile int maxMessageSize;
    private volatile long maxAckWaitMillis;
    private volatile String topic;
    private volatile Producer<byte[], byte[]> producer;

    @Override
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return PROPERTIES;
    }

    /**
     * Builds the descriptor for a dynamic property; its value is validated by a
     * {@link DynamicPropertyValidator} constructed for {@link ProducerConfig}.
     *
     * @param propertyDescriptorName name of the dynamic property
     * @return descriptor for the dynamic property
     */
    @Override
    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
        return new PropertyDescriptor.Builder()
                .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
                .name(propertyDescriptorName)
                .addValidator(new DynamicPropertyValidator(ProducerConfig.class))
                .dynamic(true)
                .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
                .build();
    }

    /** Delegates cross-property validation to {@link KafkaClientCustomValidationFunction}. */
    @Override
    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
        return new KafkaClientCustomValidationFunction().apply(validationContext);
    }

    /**
     * Reads the configured topic, writer factory, size limit and acknowledgement timeout, then creates the
     * Kafka producer with byte-array key and value serializers.
     *
     * @param context configuration of this service
     * @throws InitializationException if the producer cannot be created
     */
    @OnEnabled
    public void onEnabled(final ConfigurationContext context) throws InitializationException {
        topic = context.getProperty(TOPIC).evaluateAttributeExpressions().getValue();
        writerFactory = context.getProperty(RecordSinkService.RECORD_WRITER_FACTORY).asControllerService(RecordSetWriterFactory.class);
        maxMessageSize = context.getProperty(MAX_REQUEST_SIZE).asDataSize(DataUnit.B).intValue();
        maxAckWaitMillis = context.getProperty(ACK_WAIT_TIME).asTimePeriod(TimeUnit.MILLISECONDS);

        final KafkaPropertyProvider propertyProvider = new StandardKafkaPropertyProvider(ProducerConfig.class);
        final Map<String, Object> kafkaProperties = propertyProvider.getProperties(context);
        kafkaProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        kafkaProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        // Pass the size in plain bytes rather than the "1 MB"-style text of the property.
        kafkaProperties.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, String.valueOf(maxMessageSize));

        try {
            producer = createProducer(kafkaProperties);
        } catch (Exception e) {
            getLogger().error("Could not create Kafka producer due to {}", e.getMessage(), e);
            throw new InitializationException(e);
        }
    }

    /**
     * Serializes every record of the set individually and sends each one as a separate Kafka message, then
     * waits for the acknowledgements.
     *
     * <p>The supplied {@code attributes} map is modified: the writer's MIME type and {@code record.count} are
     * added to it.</p>
     *
     * @param recordSet records to publish
     * @param attributes attributes handed to the record writer and returned in the result; updated in place
     * @param sendZeroResults whether to publish a single empty message when the set contains no records
     * @return {@link WriteResult#EMPTY} when there were no records and {@code sendZeroResults} is false,
     *         otherwise a result carrying the record count and attributes
     * @throws IOException if a serialized record exceeds the maximum message size, if Kafka reports a send
     *         failure, or if any other error occurs while writing
     */
    @Override
    public WriteResult sendData(final RecordSet recordSet, final Map<String, String> attributes, final boolean sendZeroResults) throws IOException {
        try {
            final RecordSchema writeSchema = getWriterFactory().getSchema(null, recordSet.getSchema());
            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            final ByteCountingOutputStream out = new ByteCountingOutputStream(baos);
            final Queue<Future<RecordMetadata>> ackQ = new LinkedList<>();
            int recordCount = 0;

            try (final RecordSetWriter writer = getWriterFactory().createWriter(getLogger(), writeSchema, out, attributes)) {
                Record record;
                while ((record = recordSet.next()) != null) {
                    // Start from an empty buffer and a zeroed counter so each message holds exactly one record.
                    baos.reset();
                    out.reset();
                    writer.write(record);
                    writer.flush();
                    recordCount++;

                    if (out.getBytesWritten() > maxMessageSize) {
                        throw new TokenTooLargeException("A record's size exceeds the maximum allowed message size of " + maxMessageSize + " bytes.");
                    }
                    sendMessage(topic, baos.toByteArray(), ackQ);
                }

                attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
                attributes.put("record.count", Integer.toString(recordCount));
            }

            if (recordCount == 0) {
                if (sendZeroResults) {
                    sendMessage(topic, new byte[0], ackQ);
                } else {
                    return WriteResult.EMPTY;
                }
            }

            acknowledgeTransmission(ackQ);
            return WriteResult.of(recordCount, attributes);
        } catch (IOException ioe) {
            throw ioe;
        } catch (Exception e) {
            throw new IOException("Failed to write metrics using record writer: " + e.getMessage(), e);
        }
    }

    /**
     * Hands one payload to the producer (no partition, no key) and queues the resulting future.
     *
     * @param topic destination topic
     * @param payload message value
     * @param ackQ queue collecting the futures to be awaited by {@link #acknowledgeTransmission(Queue)}
     */
    private void sendMessage(String topic, byte[] payload, final Queue<Future<RecordMetadata>> ackQ) {
        final ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, null, null, payload);
        // The failure itself reaches the caller through the queued Future; the callback only logs it, so the
        // error is still visible if nobody ends up waiting on that Future (e.g. after an acknowledgement timeout).
        ackQ.add(producer.send(record, (metadata, exception) -> {
            if (exception != null) {
                getLogger().error("Failed to send record to Kafka topic {}", topic, exception);
            }
        }));
    }

    /**
     * Waits for each queued send to complete, allowing up to the configured acknowledgement wait time per send.
     *
     * <p>A timeout or an interruption is only logged as a warning; the method then returns normally and the
     * acknowledgements still queued are not awaited.</p>
     *
     * @param ackQ futures returned by the producer; drained by this method
     * @throws IOException if a send completed exceptionally; the cause is rethrown directly when it already is
     *         an {@link IOException}
     */
    private void acknowledgeTransmission(final Queue<Future<RecordMetadata>> ackQ) throws IOException {
        try {
            Future<RecordMetadata> ack;
            while ((ack = ackQ.poll()) != null) {
                ack.get(maxAckWaitMillis, TimeUnit.MILLISECONDS);
            }
        } catch (final ExecutionException e) {
            // Future.get reports a failed send by wrapping the original error in an ExecutionException.
            final Throwable cause = e.getCause() == null ? e : e.getCause();
            if (cause instanceof IOException) {
                throw (IOException) cause;
            }
            throw new IOException(cause);
        } catch (final InterruptedException e) {
            getLogger().warn("Interrupted while waiting for an acknowledgement from Kafka");
            Thread.currentThread().interrupt();
        } catch (final TimeoutException e) {
            // NOTE(review): the caller still reports success after a timeout, although the
            // 'Acknowledgment Wait Time' description speaks of failure - confirm which is intended.
            getLogger().warn("Timed out while waiting for an acknowledgement from Kafka");
        }
    }

    /** Closes the producer, if one was created, waiting at most the acknowledgement wait time. */
    @OnDisabled
    public void stop() {
        final Producer<byte[], byte[]> currentProducer = producer;
        if (currentProducer != null) {
            currentProducer.close(Duration.ofMillis(maxAckWaitMillis));
        }
    }

    // this getter is intended explicitly for testing purposes
    protected RecordSetWriterFactory getWriterFactory() {
        return this.writerFactory;
    }

    /**
     * Creates the Kafka producer; kept overridable so a different {@link Producer} can be supplied.
     *
     * @param kafkaProperties producer configuration
     * @return a new producer
     */
    protected Producer<byte[], byte[]> createProducer(Map<String, Object> kafkaProperties) {
        return new KafkaProducer<>(kafkaProperties);
    }
}