* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.nifi.kafka.connect;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.nifi.controller.queue.QueueSize;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.stateless.flow.DataflowTrigger;
import org.apache.nifi.stateless.flow.StatelessDataflow;
import org.apache.nifi.stateless.flow.TriggerResult;
import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
public class StatelessNiFiSinkTask extends SinkTask {
private static final Logger logger = LoggerFactory.getLogger(StatelessNiFiSinkTask.class);
private StatelessDataflow dataflow;
private String inputPortName;
private Set<String> failurePortNames;
private long timeoutMillis;
private Pattern headerNameRegex;
private String headerNamePrefix;
private QueueSize queueSize;
private String dataflowName;
private long backoffMillis = 0L;
public String version() {
return StatelessKafkaConnectorUtil.getVersion();
public void start(final Map<String, String> properties) {"Starting Sink Task with properties {}", StatelessKafkaConnectorUtil.getLoggableProperties(properties));
final String timeout = properties.getOrDefault(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT);
timeoutMillis = (long) FormatUtils.getPreciseTimeDuration(timeout, TimeUnit.MILLISECONDS);
dataflowName = properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME);
final String regex = properties.get(StatelessNiFiSinkConnector.HEADERS_AS_ATTRIBUTES_REGEX);
headerNameRegex = regex == null ? null : Pattern.compile(regex);
headerNamePrefix = properties.getOrDefault(StatelessNiFiSinkConnector.HEADER_ATTRIBUTE_NAME_PREFIX, "");
dataflow = StatelessKafkaConnectorUtil.createDataflow(properties);
// Determine input port name. If input port is explicitly set, use the value given. Otherwise, if only one port exists, use that. Otherwise, throw ConfigException.
final String dataflowName = properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME);
inputPortName = properties.get(StatelessNiFiSinkConnector.INPUT_PORT_NAME);
if (inputPortName == null) {
final Set<String> inputPorts = dataflow.getInputPortNames();
if (inputPorts.isEmpty()) {
throw new ConfigException("The dataflow specified for <" + dataflowName + "> does not have an Input Port at the root level. Dataflows used for a Kafka Connect Sink Task "
+ "must have at least one Input Port at the root level.");
if (inputPorts.size() > 1) {
throw new ConfigException("The dataflow specified for <" + dataflowName + "> has multiple Input Ports at the root level (" + inputPorts.toString()
+ "). The " + StatelessNiFiSinkConnector.INPUT_PORT_NAME + " property must be set to indicate which of these Ports Kafka records should be sent to.");
inputPortName = inputPorts.iterator().next();
// Validate the input port
if (!dataflow.getInputPortNames().contains(inputPortName)) {
throw new ConfigException("The dataflow specified for <" + dataflowName + "> does not have Input Port with name <" + inputPortName + "> at the root level. Existing Input Port names are "
+ dataflow.getInputPortNames());
// Determine the failure Ports, if any are given.
final String failurePortList = properties.get(StatelessNiFiSinkConnector.FAILURE_PORTS);
if (failurePortList == null || failurePortList.trim().isEmpty()) {
failurePortNames = Collections.emptySet();
} else {
failurePortNames = new HashSet<>();
final String[] names = failurePortList.split(",");
for (final String name : names) {
final String trimmed = name.trim();
// Validate the failure ports
final Set<String> outputPortNames = dataflow.getOutputPortNames();
for (final String failurePortName : failurePortNames) {
if (!outputPortNames.contains(failurePortName)) {
throw new ConfigException("Dataflow was configured with a Failure Port of " + failurePortName
+ " but there is no Port with that name in the dataflow. Valid Port names are " + outputPortNames);
public void put(final Collection<SinkRecord> records) {
logger.debug("Enqueuing {} Kafka messages", records.size());
for (final SinkRecord record : records) {
final Map<String, String> attributes = createAttributes(record);
final byte[] contents = getContents(record.value());
queueSize = dataflow.enqueue(contents, attributes, inputPortName);
private void backoff() {
// If no backoff period has been set, set it to 1 second. Otherwise, double the amount of time to backoff, up to 10 seconds.
if (backoffMillis == 0L) {
backoffMillis = 1000L;
backoffMillis = Math.min(backoffMillis * 2, 10_000L);
private void resetBackoff() {
backoffMillis = 0L;
private synchronized void triggerDataflow() {
final long start = System.nanoTime();
while (dataflow.isFlowFileQueued()) {
final DataflowTrigger trigger = dataflow.trigger();
try {
final Optional<TriggerResult> resultOptional = trigger.getResult(timeoutMillis, TimeUnit.MILLISECONDS);
if (resultOptional.isPresent()) {
final TriggerResult result = resultOptional.get();
if (result.isSuccessful()) {
// Verify that data was only transferred to the expected Input Port
verifyOutputPortContents(trigger, result);
// Acknowledge the data so that the session can be committed
} else {
retry(trigger, "Dataflow " + dataflowName + " failed to execute properly", result.getFailureCause().orElse(null));
} else {
retry(trigger, "Timed out waiting for dataflow " + dataflowName + " to complete", null);
} catch (final InterruptedException e) {
throw new RuntimeException("Interrupted while waiting for dataflow to complete", e);
final long nanos = System.nanoTime() - start;
if (queueSize != null) {
logger.debug("Ran dataflow with {} messages ({}) in {} nanos", queueSize.getObjectCount(), FormatUtils.formatDataSize(queueSize.getByteCount()), nanos);
private void retry(final DataflowTrigger trigger, final String explanation, final Throwable cause) {
logger.error(explanation, cause);
// We don't want to keep running as fast as possible, as doing so may overwhelm a destination system that is already struggling.
// This is analogous to ProcessContext.yield() in NiFi parlance.
// We will throw a RetriableException, which will redeliver all messages. So we need to purge anything currently in the dataflow.
// Because a background thread may have triggered the dataflow, we need to note that the last trigger was unsuccessful so the subsequent
// call to either put() or flush() will throw a RetriableException. This will result in the data being redelivered/retried.
throw new RetriableException(explanation, cause);
private void verifyOutputPortContents(final DataflowTrigger trigger, final TriggerResult result) {
for (final String failurePort : failurePortNames) {
final List<FlowFile> flowFiles = result.getOutputFlowFiles(failurePort);
if (flowFiles != null && !flowFiles.isEmpty()) {
logger.error("Dataflow transferred FlowFiles to Port {}, which is configured as a Failure Port. Rolling back session.", failurePort);
throw new RetriableException("Data was transferred to Failure Port " + failurePort);
public void flush(final Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
private byte[] getContents(final Object value) {
if (value == null) {
return new byte[0];
if (value instanceof String) {
return ((String) value).getBytes(StandardCharsets.UTF_8);
if (value instanceof byte[]) {
return (byte[]) value;
throw new IllegalArgumentException("Unsupported message type: the Message value was " + value + " but was expected to be a byte array or a String");
private Map<String, String> createAttributes(final SinkRecord record) {
final Map<String, String> attributes = new HashMap<>();
attributes.put("kafka.topic", record.topic());
attributes.put("kafka.offset", String.valueOf(record.kafkaOffset()));
attributes.put("kafka.partition", String.valueOf(record.kafkaPartition()));
attributes.put("kafka.timestamp", String.valueOf(record.timestamp()));
final Object key = record.key();
if (key instanceof String) {
attributes.put("kafka.key", (String) key);
if (headerNameRegex != null) {
for (final Header header : record.headers()) {
if (headerNameRegex.matcher(header.key()).matches()) {
final String attributeName = headerNamePrefix + header.key();
final String attributeValue = String.valueOf(header.value());
attributes.put(attributeName, attributeValue);
return attributes;
public void stop() {"Shutting down Sink Task");
if (dataflow != null) {