/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.processors.kafka.pubsub;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.FlowFileFilters;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute.KAFKA_CONSUMER_GROUP_ID;
import static org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute.KAFKA_CONSUMER_OFFSETS_COMMITTED;
import static org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute.KAFKA_LEADER_EPOCH;
import static org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute.KAFKA_MAX_OFFSET;
import static org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute.KAFKA_OFFSET;
import static org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute.KAFKA_PARTITION;
import static org.apache.nifi.kafka.shared.attribute.KafkaFlowFileAttribute.KAFKA_TOPIC;
public class PublishKafkaUtil {

    /**
     * Polls for a batch of FlowFiles that should be published to Kafka within a single transaction
     *
     * @param session the process session to poll from
     * @return the FlowFiles that should be sent as a single transaction
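     * <p>
     * A minimal usage sketch (hypothetical caller, shown for illustration only):
     * <pre>{@code
     * final List<FlowFile> batch = PublishKafkaUtil.pollFlowFiles(session);
     * if (!batch.isEmpty()) {
     *     // publish the entire batch to Kafka within a single transaction
     * }
     * }</pre>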
     */
    public static List<FlowFile> pollFlowFiles(final ProcessSession session) {
        // Poll up to 500 FlowFiles, stopping early once roughly 1 MB of content has been gathered
        final List<FlowFile> initialFlowFiles = session.get(FlowFileFilters.newSizeBasedFilter(1, DataUnit.MB, 500));
        if (initialFlowFiles.isEmpty()) {
            return initialFlowFiles;
        }

        // Check if any of the FlowFiles indicate that the consumer offsets have yet to be committed.
        boolean offsetsCommitted = true;
        for (final FlowFile flowFile : initialFlowFiles) {
            if ("false".equals(flowFile.getAttribute(KAFKA_CONSUMER_OFFSETS_COMMITTED))) {
                offsetsCommitted = false;
                break;
            }
        }

        if (offsetsCommitted) {
            return initialFlowFiles;
        }

        // If we need to commit consumer offsets, it is important that we retrieve all FlowFiles that may be available. Otherwise, we could
        // have a situation in which there are 2 FlowFiles for Topic MyTopic and Partition 1. The first FlowFile may have an offset of 100,000
        // while the second has an offset of 98,000. If we gather only the first, we could commit offset 100,000 before processing offset 98,000.
        // To avoid that, we consume all FlowFiles in the queue. It is also important that all FlowFiles that have been consumed from Kafka are made
        // available in the queue. This can be done by using a Process Group with Batch Output, as described in the additionalDetails of the Kafka Processors.
        return pollAllFlowFiles(session, initialFlowFiles);
    }

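    /**
     * Drains the incoming queue, returning the given FlowFiles along with every other FlowFile currently available to the session
     */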
    private static List<FlowFile> pollAllFlowFiles(final ProcessSession session, final List<FlowFile> initialFlowFiles) {
        final List<FlowFile> polled = new ArrayList<>(initialFlowFiles);

        while (true) {
            // Keep polling in large batches until the session reports that no FlowFiles remain
            final List<FlowFile> flowFiles = session.get(10_000);
            if (flowFiles.isEmpty()) {
                break;
            }

            polled.addAll(flowFiles);
        }

        return polled;
    }

    /**
     * Adds the appropriate Kafka Consumer offsets to the active transaction of the given publisher lease
     *
     * @param lease the lease that has an open transaction
     * @param flowFile the FlowFile whose offsets should be acknowledged
     * @param logger the processor's logger
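     * <p>
     * A sketch of the attributes the FlowFile is expected to carry (values are illustrative; the
     * names are assumed to follow the {@code kafka.*} convention defined by {@code KafkaFlowFileAttribute}):
     * <pre>{@code
     * kafka.topic             = my-topic
     * kafka.partition         = 1
     * kafka.max.offset        = 100000   (kafka.offset is used as a fallback)
     * kafka.leader.epoch      = 5
     * kafka.consumer.group.id = my-group
     * }</pre>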
     */
    public static void addConsumerOffsets(final PublisherLease lease, final FlowFile flowFile, final ComponentLog logger) {
        final String topic = flowFile.getAttribute(KAFKA_TOPIC);
        final Long partition = getNumericAttribute(flowFile, KAFKA_PARTITION, logger);

        // Prefer the max offset of the records in the FlowFile; fall back to the single-record offset attribute
        Long maxOffset = getNumericAttribute(flowFile, KAFKA_MAX_OFFSET, logger);
        if (maxOffset == null) {
            maxOffset = getNumericAttribute(flowFile, KAFKA_OFFSET, logger);
        }

        final Long epoch = getNumericAttribute(flowFile, KAFKA_LEADER_EPOCH, logger);
        final String consumerGroupId = flowFile.getAttribute(KAFKA_CONSUMER_GROUP_ID);

        if (topic == null || partition == null || maxOffset == null || consumerGroupId == null) {
            logger.warn("Cannot commit consumer offsets because at least one of the following FlowFile attributes is missing from {}: {}", flowFile,
                Arrays.asList(KAFKA_TOPIC, KAFKA_PARTITION, KAFKA_MAX_OFFSET + " (or " + KAFKA_OFFSET + ")", KAFKA_LEADER_EPOCH, KAFKA_CONSUMER_GROUP_ID));
            return;
        }

        // The leader epoch is optional; pass null when it is not available
        lease.ackConsumerOffsets(topic, partition.intValue(), maxOffset, epoch == null ? null : epoch.intValue(), consumerGroupId);
    }

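    /**
     * Parses the named FlowFile attribute as a Long, returning null (and logging a warning) if the attribute is absent or non-numeric
     */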
    private static Long getNumericAttribute(final FlowFile flowFile, final String attributeName, final ComponentLog logger) {
        final String attributeValue = flowFile.getAttribute(attributeName);
        if (attributeValue == null) {
            return null;
        }

        try {
            return Long.parseLong(attributeValue);
        } catch (final NumberFormatException nfe) {
            logger.warn("Expected a numeric value for attribute '{}' but found non-numeric value for {}", attributeName, flowFile);
            return null;
        }
    }
}