package org.apache.nifi.kafka.connect;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.stateless.flow.DataflowTrigger;
import org.apache.nifi.stateless.flow.StatelessDataflow;
import org.apache.nifi.stateless.flow.TriggerResult;
import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
public class StatelessNiFiSourceTask extends SourceTask {
public static final String STATE_MAP_KEY = "task.index";
private static final Logger logger = LoggerFactory.getLogger(StatelessNiFiSourceTask.class);
private StatelessDataflow dataflow;
private String outputPortName;
private String topicName;
private String topicNameAttribute;
private TriggerResult triggerResult;
private String keyAttributeName;
private Pattern headerAttributeNamePattern;
private long timeoutMillis;
private String dataflowName;
private long failureYieldExpiration = 0L;
private final Map<String, String> clusterStatePartitionMap = Collections.singletonMap(STATE_MAP_KEY, "CLUSTER");
private Map<String, String> localStatePartitionMap = new HashMap<>();
private final AtomicLong unacknowledgedRecords = new AtomicLong(0L);
public String version() {
return StatelessKafkaConnectorUtil.getVersion();
public void start(final Map<String, String> properties) {"Starting Source Task with properties {}", StatelessKafkaConnectorUtil.getLoggableProperties(properties));
final String timeout = properties.getOrDefault(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT);
timeoutMillis = (long) FormatUtils.getPreciseTimeDuration(timeout, TimeUnit.MILLISECONDS);
topicName = properties.get(StatelessNiFiSourceConnector.TOPIC_NAME);
topicNameAttribute = properties.get(StatelessNiFiSourceConnector.TOPIC_NAME_ATTRIBUTE);
keyAttributeName = properties.get(StatelessNiFiSourceConnector.KEY_ATTRIBUTE);
if (topicName == null && topicNameAttribute == null) {
throw new ConfigException("Either the or configuration must be specified");
final String headerRegex = properties.get(StatelessNiFiSourceConnector.HEADER_REGEX);
headerAttributeNamePattern = headerRegex == null ? null : Pattern.compile(headerRegex);
dataflow = StatelessKafkaConnectorUtil.createDataflow(properties);
// Determine the name of the Output Port to retrieve data from
dataflowName = properties.get(StatelessKafkaConnectorUtil.DATAFLOW_NAME);
outputPortName = properties.get(StatelessNiFiSourceConnector.OUTPUT_PORT_NAME);
if (outputPortName == null) {
final Set<String> outputPorts = dataflow.getOutputPortNames();
if (outputPorts.isEmpty()) {
throw new ConfigException("The dataflow specified for <" + dataflowName + "> does not have an Output Port at the root level. Dataflows used for a Kafka Connect Source Task "
+ "must have at least one Output Port at the root level.");
if (outputPorts.size() > 1) {
throw new ConfigException("The dataflow specified for <" + dataflowName + "> has multiple Output Ports at the root level (" + outputPorts.toString()
+ "). The " + StatelessNiFiSourceConnector.OUTPUT_PORT_NAME + " property must be set to indicate which of these Ports Kafka records should be retrieved from.");
outputPortName = outputPorts.iterator().next();
final String taskIndex = properties.get(STATE_MAP_KEY);
localStatePartitionMap.put(STATE_MAP_KEY, taskIndex);
final Map<String, String> localStateMap = (Map<String, String>) (Map) context.offsetStorageReader().offset(localStatePartitionMap);
final Map<String, String> clusterStateMap = (Map<String, String>) (Map) context.offsetStorageReader().offset(clusterStatePartitionMap);
dataflow.setComponentStates(localStateMap, Scope.LOCAL);
dataflow.setComponentStates(clusterStateMap, Scope.CLUSTER);
public List<SourceRecord> poll() throws InterruptedException {
final long yieldExpiration = Math.max(failureYieldExpiration, dataflow.getSourceYieldExpiration());
final long now = System.currentTimeMillis();
final long yieldMillis = yieldExpiration - now;
if (yieldMillis > 0) {
// If source component has yielded, we don't want to trigger it again until the yield expiration expires, in order to avoid
// overloading the source system.
logger.debug("Source of NiFi flow has opted to yield for {} milliseconds. Will pause dataflow until that time period has elapsed.", yieldMillis);
return null;
if (unacknowledgedRecords.get() > 0) {
// If we have records that haven't yet been acknowledged, we want to return null instead of running.
// We need to wait for the last results to complete before triggering the dataflow again.
return null;
logger.debug("Triggering dataflow");
final long start = System.nanoTime();
final DataflowTrigger trigger = dataflow.trigger();
final Optional<TriggerResult> resultOptional = trigger.getResult(timeoutMillis, TimeUnit.MILLISECONDS);
if (!resultOptional.isPresent()) {
logger.warn("Dataflow timed out after waiting {} milliseconds. Will cancel the execution.", timeoutMillis);
return null;
triggerResult = resultOptional.get();
if (!triggerResult.isSuccessful()) {
logger.error("Dataflow {} failed to execute properly", dataflowName, triggerResult.getFailureCause().orElse(null));
failureYieldExpiration = System.currentTimeMillis() + 1000L; // delay next execution for 1 second to avoid constnatly failing and utilization huge amounts of resources
return null;
// Verify that data was only transferred to the expected Output Port
verifyFlowFilesTransferredToProperPort(triggerResult, outputPortName, trigger);
final long nanos = System.nanoTime() - start;
final List<FlowFile> outputFlowFiles = triggerResult.getOutputFlowFiles(outputPortName);
final List<SourceRecord> sourceRecords = new ArrayList<>(outputFlowFiles.size());
Map<String, ?> componentState = dataflow.getComponentStates(Scope.CLUSTER);
final Map<String, ?> partitionMap;
if (componentState == null || componentState.isEmpty()) {
componentState = dataflow.getComponentStates(Scope.LOCAL);
partitionMap = localStatePartitionMap;
} else {
partitionMap = clusterStatePartitionMap;
for (final FlowFile flowFile : outputFlowFiles) {
final byte[] contents = triggerResult.readContent(flowFile);
final SourceRecord sourceRecord = createSourceRecord(flowFile, contents, componentState, partitionMap);
logger.debug("Returning {} records from poll() method (took {} nanos to run dataflow)", sourceRecords.size(), nanos);
// If there is at least one record, we don't want to acknowledge the trigger result until Kafka has committed the Record.
// This is handled by incrementing the unacknkowledgedRecords count. Then, Kafka Connect will call this.commitRecords().
// The commitRecords() call will then decrement the number of unacknowledgedRecords, and when all unacknowledged Records have been
// acknowledged, it will acknowledge the trigger result.
// However, if there are no records, this.commitRecords() will never be called. As a result, we need toe nsure that we acknowledge the trigger result here.
if (sourceRecords.size() > 0) {
} else {
return sourceRecords;
private void verifyFlowFilesTransferredToProperPort(final TriggerResult triggerResult, final String expectedPortName, final DataflowTrigger trigger) {
final Map<String, List<FlowFile>> flowFileOutputMap = triggerResult.getOutputFlowFiles();
for (final Map.Entry<String, List<FlowFile>> entry : flowFileOutputMap.entrySet()) {
final String portName = entry.getKey();
final List<FlowFile> flowFiles = entry.getValue();
if (!flowFiles.isEmpty() && !expectedPortName.equals(portName)) {
logger.error("Dataflow transferred FlowFiles to Port {} but was expecting data to be transferred to {}. Rolling back session.", portName, expectedPortName);
throw new RetriableException("Data was transferred to unexpected port. Expected: " + expectedPortName + ". Actual: " + portName);
private SourceRecord createSourceRecord(final FlowFile flowFile, final byte[] contents, final Map<String, ?> componentState, final Map<String, ?> partitionMap) {
final Schema valueSchema = (contents == null || contents.length == 0) ? null : Schema.BYTES_SCHEMA;
// Kafka Connect currently gives us no way to determine the number of partitions that a given topic has.
// Therefore, we have no way to partition based on an attribute or anything like that, unless we left it up to
// the dataflow developer to know how many partitions exist a priori and explicitly set an attribute in the range of 0..max,
// but that is not a great solution. Kafka does support using a Simple Message Transform to change the partition of a given
// record, so that may be the best solution.
final Integer topicPartition = null;
final String topic;
if (topicNameAttribute == null) {
topic = topicName;
} else {
final String attributeValue = flowFile.getAttribute(topicNameAttribute);
topic = attributeValue == null ? topicName : attributeValue;
final ConnectHeaders headers = new ConnectHeaders();
if (headerAttributeNamePattern != null) {
for (final Map.Entry<String, String> entry : flowFile.getAttributes().entrySet()) {
if (headerAttributeNamePattern.matcher(entry.getKey()).matches()) {
final String headerName = entry.getKey();
final String headerValue = entry.getValue();
headers.add(headerName, headerValue, Schema.STRING_SCHEMA);
final Object key = keyAttributeName == null ? null : flowFile.getAttribute(keyAttributeName);
final Schema keySchema = key == null ? null : Schema.STRING_SCHEMA;
final Long timestamp = System.currentTimeMillis();
return new SourceRecord(partitionMap, componentState, topic, topicPartition, keySchema, key, valueSchema, contents, timestamp, headers);
public void commitRecord(final SourceRecord record, final RecordMetadata metadata) throws InterruptedException {
super.commitRecord(record, metadata);
final long unacked = unacknowledgedRecords.decrementAndGet();
logger.debug("SourceRecord {} committed; number of unacknowledged FlowFiles is now {}", record, unacked);
if (unacked < 1) {
logger.debug("Acknowledging trigger result");
public void stop() {"Shutting down Source Task for " + dataflowName);
if (dataflow != null) {
// Available for testing
protected StatelessDataflow getDataflow() {
return dataflow;