| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nifi.processors.kafka; |
| |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Comparator; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map.Entry; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.regex.Pattern; |
| |
| import org.apache.kafka.clients.producer.BufferExhaustedException; |
| import org.apache.kafka.clients.producer.Callback; |
| import org.apache.kafka.clients.producer.KafkaProducer; |
| import org.apache.kafka.clients.producer.Producer; |
| import org.apache.kafka.clients.producer.ProducerRecord; |
| import org.apache.kafka.clients.producer.RecordMetadata; |
| import org.apache.kafka.common.Node; |
| import org.apache.kafka.common.PartitionInfo; |
| import org.apache.kafka.common.serialization.ByteArraySerializer; |
| import org.apache.nifi.annotation.behavior.DynamicProperty; |
| import org.apache.nifi.annotation.behavior.InputRequirement; |
| import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; |
| import org.apache.nifi.annotation.behavior.TriggerWhenEmpty; |
| import org.apache.nifi.annotation.documentation.CapabilityDescription; |
| import org.apache.nifi.annotation.documentation.Tags; |
| import org.apache.nifi.annotation.lifecycle.OnScheduled; |
| import org.apache.nifi.annotation.lifecycle.OnStopped; |
| import org.apache.nifi.components.AllowableValue; |
| import org.apache.nifi.components.PropertyDescriptor; |
| import org.apache.nifi.components.ValidationContext; |
| import org.apache.nifi.components.ValidationResult; |
| import org.apache.nifi.flowfile.FlowFile; |
| import org.apache.nifi.processor.AbstractSessionFactoryProcessor; |
| import org.apache.nifi.processor.DataUnit; |
| import org.apache.nifi.processor.ProcessContext; |
| import org.apache.nifi.processor.ProcessSession; |
| import org.apache.nifi.processor.ProcessSessionFactory; |
| import org.apache.nifi.processor.Relationship; |
| import org.apache.nifi.processor.exception.ProcessException; |
| import org.apache.nifi.processor.io.InputStreamCallback; |
| import org.apache.nifi.processor.util.StandardValidators; |
| import org.apache.nifi.stream.io.BufferedInputStream; |
| import org.apache.nifi.stream.io.ByteArrayOutputStream; |
| import org.apache.nifi.stream.io.ByteCountingInputStream; |
| import org.apache.nifi.stream.io.StreamUtils; |
| import org.apache.nifi.stream.io.util.NonThreadSafeCircularBuffer; |
| import org.apache.nifi.util.LongHolder; |
| |
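| /* |
| * Design note: this processor extends AbstractSessionFactoryProcessor rather than AbstractProcessor |
| * because sends are asynchronous. Each onTrigger() hands a FlowFile's messages to the Kafka client |
| * and tracks the outstanding sends in a FlowFileMessageBatch; once every send callback for a batch |
| * has fired, the batch moves to 'completeBatches' and its session is committed on a later |
| * onTrigger() invocation. @TriggerWhenEmpty keeps the processor running so that those completed |
| * sessions are committed even when no new FlowFiles are queued. |
| */ |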
| @InputRequirement(Requirement.INPUT_REQUIRED) |
| @Tags({"Apache", "Kafka", "Put", "Send", "Message", "PubSub"}) |
| @CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka. The messages to send may be individual FlowFiles or may be delimited using a " |
| + "user-specified delimiter, such as a new-line.") |
| @TriggerWhenEmpty // because we have a queue of sessions that are ready to be committed |
| @DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.", |
| description = "These properties will be added to the Kafka configuration after loading any provided configuration properties." |
| + " In the event a dynamic property represents a property that was already set as part of the static properties, its value will be" |
| + " overridden, with a warning message describing the override." |
| + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.") |
| public class PutKafka extends AbstractSessionFactoryProcessor { |
| |
| private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}"; |
| private static final String BROKER_REGEX = SINGLE_BROKER_REGEX + "(?:,\\s*" + SINGLE_BROKER_REGEX + ")*"; |
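| // For example, "kafka01:9092" and "kafka01:9092, kafka02:9092" both match; each port must be 3-5 digits. |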
| |
| public static final AllowableValue DELIVERY_REPLICATED = new AllowableValue("all", "Guarantee Replicated Delivery", "FlowFile will be routed to" |
| + " failure unless the message is replicated to the appropriate number of Kafka Nodes according to the Topic configuration"); |
| public static final AllowableValue DELIVERY_ONE_NODE = new AllowableValue("1", "Guarantee Single Node Delivery", "FlowFile will be routed" |
| + " to success if the message is received by a single Kafka node, whether or not it is replicated. This is faster than" |
| + " <Guarantee Replicated Delivery> but can result in data loss if a Kafka node crashes"); |
| public static final AllowableValue DELIVERY_BEST_EFFORT = new AllowableValue("0", "Best Effort", "FlowFile will be routed to success after" |
| + " successfully writing the content to a Kafka node, without waiting for a response. This provides the best performance but may result" |
| + " in data loss."); |
| |
| |
| /** |
| * AllowableValue for sending messages to Kafka without compression |
| */ |
| public static final AllowableValue COMPRESSION_CODEC_NONE = new AllowableValue("none", "None", "Compression will not be used for any topic."); |
| |
| /** |
| * AllowableValue for sending messages to Kafka with GZIP compression |
| */ |
| public static final AllowableValue COMPRESSION_CODEC_GZIP = new AllowableValue("gzip", "GZIP", "Compress messages using GZIP"); |
| |
| /** |
| * AllowableValue for sending messages to Kafka with Snappy compression |
| */ |
| public static final AllowableValue COMPRESSION_CODEC_SNAPPY = new AllowableValue("snappy", "Snappy", "Compress messages using Snappy"); |
| |
| static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue("Round Robin", "Round Robin", |
| "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, the next message to Partition 2, and so on, wrapping as necessary."); |
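| // Note: the stored value below is "Random Robin" rather than "Random"; the raw value is left untouched |
| // here, presumably so that existing flows that were configured with the original value keep working. |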
| static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("Random Robin", "Random", |
| "Messages will be assigned to random partitions."); |
| static final AllowableValue USER_DEFINED_PARTITIONING = new AllowableValue("User-Defined", "User-Defined", |
| "The <Partition> property will be used to determine the partition. All messages within the same FlowFile will be assigned to the same partition."); |
| |
| |
| public static final PropertyDescriptor SEED_BROKERS = new PropertyDescriptor.Builder() |
| .name("Known Brokers") |
| .description("A comma-separated list of known Kafka Brokers in the format <host>:<port>") |
| .required(true) |
| .addValidator(StandardValidators.createRegexMatchingValidator(Pattern.compile(BROKER_REGEX))) |
| .expressionLanguageSupported(false) |
| .build(); |
| public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() |
| .name("Topic Name") |
| .description("The Kafka Topic of interest") |
| .required(true) |
| .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) |
| .expressionLanguageSupported(true) |
| .build(); |
| static final PropertyDescriptor PARTITION_STRATEGY = new PropertyDescriptor.Builder() |
| .name("Partition Strategy") |
| .description("Specifies how messages should be partitioned when sent to Kafka") |
| .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING, USER_DEFINED_PARTITIONING) |
| .defaultValue(ROUND_ROBIN_PARTITIONING.getValue()) |
| .required(true) |
| .build(); |
| public static final PropertyDescriptor PARTITION = new PropertyDescriptor.Builder() |
| .name("Partition") |
| .description("Specifies which Kafka Partition to add the message to. If using a message delimiter, all messages in the same FlowFile will be sent to the same partition. " |
| + "If a partition is specified but is not valid, then all messages within the same FlowFile will use the same partition but it remains undefined which partition is used.") |
| .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) |
| .expressionLanguageSupported(true) |
| .required(false) |
| .build(); |
| public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder() |
| .name("Kafka Key") |
| .description("The Key to use for the Message") |
| .required(false) |
| .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) |
| .expressionLanguageSupported(true) |
| .build(); |
| public static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder() |
| .name("Delivery Guarantee") |
| .description("Specifies the requirement for guaranteeing that a message is sent to Kafka") |
| .required(true) |
| .expressionLanguageSupported(false) |
| .allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED) |
| .defaultValue(DELIVERY_BEST_EFFORT.getValue()) |
| .build(); |
| public static final PropertyDescriptor MESSAGE_DELIMITER = new PropertyDescriptor.Builder() |
| .name("Message Delimiter") |
| .description("Specifies the delimiter to use for splitting apart multiple messages within a single FlowFile. " |
| + "If not specified, the entire content of the FlowFile will be used as a single message. " |
| + "If specified, the contents of the FlowFile will be split on this delimiter and each section " |
| + "sent as a separate Kafka message. Note that if messages are delimited and some messages for a given FlowFile " |
| + "are transferred successfully while others are not, the messages will be split into individual FlowFiles, such that those " |
| + "messages that were successfully sent are routed to the 'success' relationship while other messages are sent to the 'failure' " |
| + "relationship.") |
| .required(false) |
| .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) |
| .expressionLanguageSupported(true) |
| .build(); |
| public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder() |
| .name("Max Buffer Size") |
| .description("The maximum amount of data to buffer in memory before sending to Kafka") |
| .required(true) |
| .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) |
| .expressionLanguageSupported(false) |
| .defaultValue("5 MB") |
| .build(); |
| static final PropertyDescriptor MAX_RECORD_SIZE = new PropertyDescriptor.Builder() |
| .name("Max Record Size") |
| .description("The maximum size that any individual record can be.") |
| .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) |
| .required(true) |
| .defaultValue("1 MB") |
| .build(); |
| public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder() |
| .name("Communications Timeout") |
| .description("The amount of time to wait for a response from Kafka before determining that there is a communications error") |
| .required(true) |
| .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) |
| .expressionLanguageSupported(false) |
| .defaultValue("30 secs") |
| .build(); |
| public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder() |
| .name("Client Name") |
| .description("Client Name to use when communicating with Kafka") |
| .required(true) |
| .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) |
| .expressionLanguageSupported(false) |
| .build(); |
| public static final PropertyDescriptor BATCH_NUM_MESSAGES = new PropertyDescriptor.Builder() |
| .name("Async Batch Size") |
| .displayName("Batch Size") |
| .description("The number of messages to send in one batch. The producer will wait until either this number of messages is ready" |
| + " to send or \"Queue Buffering Max Time\" is reached.") |
| .required(true) |
| .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) |
| .defaultValue("200") |
| .build(); |
| public static final PropertyDescriptor QUEUE_BUFFERING_MAX = new PropertyDescriptor.Builder() |
| .name("Queue Buffering Max Time") |
| .description("Maximum time to buffer data before sending to Kafka. For example, a setting of 100 ms" |
| + " will try to batch together 100 milliseconds' worth of messages to send at once. This will improve" |
| + " throughput but adds message delivery latency due to the buffering.") |
| .required(true) |
| .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) |
| .defaultValue("5 secs") |
| .build(); |
| public static final PropertyDescriptor COMPRESSION_CODEC = new PropertyDescriptor.Builder() |
| .name("Compression Codec") |
| .description("Specifies the compression codec to use for all data generated by this producer.") |
| .required(true) |
| .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) |
| .allowableValues(COMPRESSION_CODEC_NONE, COMPRESSION_CODEC_GZIP, COMPRESSION_CODEC_SNAPPY) |
| .defaultValue(COMPRESSION_CODEC_NONE.getValue()) |
| .build(); |
| |
| public static final Relationship REL_SUCCESS = new Relationship.Builder() |
| .name("success") |
| .description("Any FlowFile that is successfully sent to Kafka will be routed to this Relationship") |
| .build(); |
| public static final Relationship REL_FAILURE = new Relationship.Builder() |
| .name("failure") |
| .description("Any FlowFile that cannot be sent to Kafka will be routed to this Relationship") |
| .build(); |
| |
| private static final Pattern NUMBER_PATTERN = Pattern.compile("-?\\d+"); |
| private final BlockingQueue<FlowFileMessageBatch> completeBatches = new LinkedBlockingQueue<>(); |
| private final Set<FlowFileMessageBatch> activeBatches = Collections.synchronizedSet(new HashSet<FlowFileMessageBatch>()); |
| |
| private final ConcurrentMap<String, AtomicLong> partitionIndexMap = new ConcurrentHashMap<>(); |
| |
| private volatile Producer<byte[], byte[]> producer; |
| |
| private volatile ExecutorService executor; |
| private volatile long deadlockTimeout; |
| |
| @Override |
| protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { |
| final PropertyDescriptor clientName = new PropertyDescriptor.Builder() |
| .fromPropertyDescriptor(CLIENT_NAME) |
| .defaultValue("NiFi-" + getIdentifier()) |
| .build(); |
| |
| final List<PropertyDescriptor> props = new ArrayList<>(); |
| props.add(SEED_BROKERS); |
| props.add(TOPIC); |
| props.add(PARTITION_STRATEGY); |
| props.add(PARTITION); |
| props.add(KEY); |
| props.add(DELIVERY_GUARANTEE); |
| props.add(MESSAGE_DELIMITER); |
| props.add(MAX_BUFFER_SIZE); |
| props.add(MAX_RECORD_SIZE); |
| props.add(TIMEOUT); |
| props.add(BATCH_NUM_MESSAGES); |
| props.add(QUEUE_BUFFERING_MAX); |
| props.add(COMPRESSION_CODEC); |
| props.add(clientName); |
| return props; |
| } |
| |
| |
| @Override |
| public Set<Relationship> getRelationships() { |
| final Set<Relationship> relationships = new HashSet<>(); |
| relationships.add(REL_SUCCESS); |
| relationships.add(REL_FAILURE); |
| return relationships; |
| } |
| |
| @Override |
| protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) { |
| final List<ValidationResult> results = new ArrayList<>(); |
| |
| final String partitionStrategy = validationContext.getProperty(PARTITION_STRATEGY).getValue(); |
| if (partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue()) && !validationContext.getProperty(PARTITION).isSet()) { |
| results.add(new ValidationResult.Builder().subject("Partition").valid(false).explanation( |
| "The <Partition> property must be set when configured to use the User-Defined Partitioning Strategy").build()); |
| } |
| |
| return results; |
| } |
| |
| protected Producer<byte[], byte[]> getProducer() { |
| return producer; |
| } |
| |
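| /** |
| * Called when the processor is stopped: closes the producer, cancels or completes any in-flight |
| * message batches, and shuts down the internal executor, waiting up to 30 seconds for running |
| * send tasks to finish before forcing termination. |
| */ |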
| @OnStopped |
| public void cleanup() { |
| final Producer<byte[], byte[]> producer = getProducer(); |
| if (producer != null) { |
| producer.close(); |
| } |
| |
| for (final FlowFileMessageBatch batch : activeBatches) { |
| batch.cancelOrComplete(); |
| } |
| if (this.executor != null) { |
| this.executor.shutdown(); |
| try { |
| if (!this.executor.awaitTermination(30000, TimeUnit.MILLISECONDS)) { |
| this.executor.shutdownNow(); |
| getLogger().warn("Executor did not stop in 30 seconds; forced termination."); |
| } |
| this.executor = null; |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| |
| @OnScheduled |
| public void createProducer(final ProcessContext context) { |
| this.deadlockTimeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS) * 2; |
| if (this.executor == null || this.executor.isShutdown()) { |
| this.executor = Executors.newCachedThreadPool(); |
| } |
| producer = new KafkaProducer<byte[], byte[]>(createConfig(context), new ByteArraySerializer(), new ByteArraySerializer()); |
| } |
| |
| protected int getActiveMessageBatchCount() { |
| return activeBatches.size(); |
| } |
| |
| protected int getCompleteMessageBatchCount() { |
| return completeBatches.size(); |
| } |
| |
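| /** |
| * Builds the Kafka producer configuration from the processor's property values. For illustration, |
| * the default property values yield roughly the following configuration (before any dynamic |
| * properties are applied): |
| * <pre> |
| * acks                      = 0          (Best Effort) |
| * timeout.ms                = 30000      (Communications Timeout, 30 secs) |
| * metadata.fetch.timeout.ms = 30000 |
| * batch.size                = 200        (Batch Size) |
| * max.request.size          = 1048576    (Max Record Size, 1 MB) |
| * buffer.memory             = 5242880    (Max Buffer Size, 5 MB) |
| * compression.type          = none |
| * linger.ms                 = 5000       (Queue Buffering Max Time, 5 secs) |
| * retries                   = 0 |
| * block.on.buffer.full      = false |
| * </pre> |
| * Dynamic properties are applied last and override any of the values above, logging a warning |
| * for each override. |
| */ |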
| protected Properties createConfig(final ProcessContext context) { |
| final String brokers = context.getProperty(SEED_BROKERS).getValue(); |
| |
| final Properties properties = new Properties(); |
| properties.setProperty("bootstrap.servers", brokers); |
| properties.setProperty("acks", context.getProperty(DELIVERY_GUARANTEE).getValue()); |
| properties.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue()); |
| |
| final String timeout = String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue()); |
| properties.setProperty("timeout.ms", timeout); |
| properties.setProperty("metadata.fetch.timeout.ms", timeout); |
| |
| properties.setProperty("batch.size", context.getProperty(BATCH_NUM_MESSAGES).getValue()); |
| properties.setProperty("max.request.size", String.valueOf(context.getProperty(MAX_RECORD_SIZE).asDataSize(DataUnit.B).longValue())); |
| |
| final long maxBufferSize = context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue(); |
| properties.setProperty("buffer.memory", String.valueOf(maxBufferSize)); |
| |
| final String compressionCodec = context.getProperty(COMPRESSION_CODEC).getValue(); |
| properties.setProperty("compression.type", compressionCodec); |
| |
| final Long queueBufferingMillis = context.getProperty(QUEUE_BUFFERING_MAX).asTimePeriod(TimeUnit.MILLISECONDS); |
| if (queueBufferingMillis != null) { |
| properties.setProperty("linger.ms", String.valueOf(queueBufferingMillis)); |
| } |
| |
| properties.setProperty("retries", "0"); |
| properties.setProperty("block.on.buffer.full", "false"); |
| |
| for (final Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) { |
| PropertyDescriptor descriptor = entry.getKey(); |
| if (descriptor.isDynamic()) { |
| if (properties.containsKey(descriptor.getName())) { |
| this.getLogger().warn("Overriding existing property '" + descriptor.getName() + "' which had value of '" |
| + properties.getProperty(descriptor.getName()) + "' with dynamically set value '" |
| + entry.getValue() + "'."); |
| } |
| properties.setProperty(descriptor.getName(), entry.getValue()); |
| } |
| } |
| |
| return properties; |
| } |
| |
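| /** |
| * Resolves the partition for the given FlowFile's messages according to the Partition Strategy: |
| * Round Robin increments a per-topic counter, Random returns null so that the Kafka client picks |
| * a partition, and User-Defined evaluates the Partition property. A numeric property value is |
| * treated as 1-based (e.g. "3" selects the third partition in the list returned by the producer), |
| * while a non-numeric value is hashed; either way, the result is normalized modulo the number of |
| * partitions for the topic. |
| */ |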
| private Integer getPartition(final ProcessContext context, final FlowFile flowFile, final String topic) { |
| final long unnormalizedIndex; |
| |
| final String partitionStrategy = context.getProperty(PARTITION_STRATEGY).getValue(); |
| if (partitionStrategy.equalsIgnoreCase(ROUND_ROBIN_PARTITIONING.getValue())) { |
| AtomicLong partitionIndex = partitionIndexMap.get(topic); |
| if (partitionIndex == null) { |
| partitionIndex = new AtomicLong(0L); |
| final AtomicLong existing = partitionIndexMap.putIfAbsent(topic, partitionIndex); |
| if (existing != null) { |
| partitionIndex = existing; |
| } |
| } |
| |
| unnormalizedIndex = partitionIndex.getAndIncrement(); |
| } else if (partitionStrategy.equalsIgnoreCase(RANDOM_PARTITIONING.getValue())) { |
| return null; |
| } else { |
| if (context.getProperty(PARTITION).isSet()) { |
| final String partitionValue = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue(); |
| |
| if (NUMBER_PATTERN.matcher(partitionValue).matches()) { |
| // Subtract 1 because if the partition is "3" then we want to get index 2 into the List of partitions. |
| unnormalizedIndex = Long.parseLong(partitionValue) - 1; |
| } else { |
| unnormalizedIndex = partitionValue.hashCode(); |
| } |
| } else { |
| return null; |
| } |
| } |
| |
| final Producer<byte[], byte[]> producer = getProducer(); |
| final List<PartitionInfo> partitionInfos = producer.partitionsFor(topic); |
| int partitionIdx = (int) (unnormalizedIndex % partitionInfos.size()); |
| if (partitionIdx < 0) { |
| partitionIdx += partitionInfos.size(); // Java's % keeps the dividend's sign, so negative hashCodes must be normalized |
| } |
| return partitionInfos.get(partitionIdx).partition(); |
| } |
| |
| @Override |
| protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { |
| return new PropertyDescriptor.Builder() |
| .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.") |
| .name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true) |
| .build(); |
| } |
| |
| |
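| /* |
| * onTrigger() first commits any batches whose sends have all completed, then dispatches the next |
| * FlowFile to the executor. The Future is bounded by 'deadlockTimeout' (twice the configured |
| * Communications Timeout) as a watchdog, so that a hang inside the Kafka client cannot block the |
| * processor's thread indefinitely. |
| */ |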
| @Override |
| public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { |
| FlowFileMessageBatch batch; |
| while ((batch = completeBatches.poll()) != null) { |
| batch.completeSession(); |
| } |
| |
| final ProcessSession session = sessionFactory.createSession(); |
| final FlowFile flowFile = session.get(); |
| if (flowFile != null){ |
| final Future<Void> sendFuture = this.executor.submit(new Callable<Void>() { |
| @Override |
| public Void call() throws Exception { |
| doOnTrigger(context, session, flowFile); |
| return null; |
| } |
| }); |
| try { |
| sendFuture.get(this.deadlockTimeout, TimeUnit.MILLISECONDS); |
| } catch (InterruptedException e) { |
| sendFuture.cancel(true); |
| Thread.currentThread().interrupt(); |
| getLogger().warn("Interrupted while sending messages", e); |
| } catch (ExecutionException e) { |
| throw new IllegalStateException(e); |
| } catch (TimeoutException e) { |
| sendFuture.cancel(true); |
| getLogger().warn("Timed out after " + this.deadlockTimeout + " milliseconds while sending messages", e); |
| } |
| } else { |
| context.yield(); |
| } |
| } |
| |
| private void doOnTrigger(final ProcessContext context, ProcessSession session, final FlowFile flowFile) throws ProcessException { |
| final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue(); |
| final String key = context.getProperty(KEY).evaluateAttributeExpressions(flowFile).getValue(); |
| final byte[] keyBytes = key == null ? null : key.getBytes(StandardCharsets.UTF_8); |
| String delimiter = context.getProperty(MESSAGE_DELIMITER).evaluateAttributeExpressions(flowFile).getValue(); |
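| // The property value is taken literally, so translate typed escape sequences ("\n", "\r", "\t") |
| // into their actual characters before splitting the content. |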
| if (delimiter != null) { |
| delimiter = delimiter.replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); |
| } |
| |
| final Producer<byte[], byte[]> producer = getProducer(); |
| |
| if (delimiter == null) { |
| // Send the entire FlowFile as a single message. |
| final byte[] value = new byte[(int) flowFile.getSize()]; |
| session.read(flowFile, new InputStreamCallback() { |
| @Override |
| public void process(final InputStream in) throws IOException { |
| StreamUtils.fillBuffer(in, value); |
| } |
| }); |
| |
| final Integer partition; |
| try { |
| partition = getPartition(context, flowFile, topic); |
| } catch (final Exception e) { |
| getLogger().error("Failed to obtain a partition for {} due to {}", new Object[] {flowFile, e}); |
| session.transfer(session.penalize(flowFile), REL_FAILURE); |
| session.commit(); |
| return; |
| } |
| |
| final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(topic, partition, keyBytes, value); |
| |
| final FlowFileMessageBatch messageBatch = new FlowFileMessageBatch(session, flowFile, topic); |
| messageBatch.setNumMessages(1); |
| activeBatches.add(messageBatch); |
| |
| try { |
| producer.send(producerRecord, new Callback() { |
| @Override |
| public void onCompletion(final RecordMetadata metadata, final Exception exception) { |
| if (exception == null) { |
| // record was successfully sent. |
| messageBatch.addSuccessfulRange(0L, flowFile.getSize(), metadata.offset()); |
| } else { |
| messageBatch.addFailedRange(0L, flowFile.getSize(), exception); |
| } |
| } |
| }); |
| } catch (final BufferExhaustedException bee) { |
| messageBatch.addFailedRange(0L, flowFile.getSize(), bee); |
| context.yield(); |
| return; |
| } |
| } else { |
| final byte[] delimiterBytes = delimiter.getBytes(StandardCharsets.UTF_8); |
| |
| // The NonThreadSafeCircularBuffer allows us to add a byte from the stream one at a time and see |
| // if it matches some pattern. We can use this to search for the delimiter as we read through |
| // the stream of bytes in the FlowFile |
| final NonThreadSafeCircularBuffer buffer = new NonThreadSafeCircularBuffer(delimiterBytes); |
| |
| final LongHolder messagesSent = new LongHolder(0L); |
| final FlowFileMessageBatch messageBatch = new FlowFileMessageBatch(session, flowFile, topic); |
| activeBatches.add(messageBatch); |
| |
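| // Worked example: with delimiter "," a FlowFile containing "a,b,c" yields three messages ("a", |
| // "b", "c") with byte ranges [0,2), [2,4) and [4,5); each range's end offset includes the trailing |
| // delimiter, keeping consecutive ranges contiguous so that transferRanges() can recombine them if |
| // only some of the messages fail. |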
| try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) { |
| session.read(flowFile, new InputStreamCallback() { |
| @Override |
| public void process(final InputStream rawIn) throws IOException { |
| byte[] data = null; // contents of a single message |
| |
| boolean streamFinished = false; |
| |
| int nextByte; |
| try (final InputStream bufferedIn = new BufferedInputStream(rawIn); |
| final ByteCountingInputStream in = new ByteCountingInputStream(bufferedIn)) { |
| |
| long messageStartOffset = in.getBytesConsumed(); |
| |
| // read until we're out of data. |
| while (!streamFinished) { |
| nextByte = in.read(); |
| |
| if (nextByte > -1) { |
| baos.write(nextByte); |
| } |
| |
| if (nextByte == -1) { |
| // we ran out of data. This message is complete. |
| data = baos.toByteArray(); |
| streamFinished = true; |
| } else if (buffer.addAndCompare((byte) nextByte)) { |
| // we matched our delimiter. This message is complete. We want all of the bytes from the |
| // underlying BAOS except for the last 'delimiterBytes.length' bytes, because we don't want |
| // the delimiter itself to be sent. |
| data = Arrays.copyOfRange(baos.getUnderlyingBuffer(), 0, baos.size() - delimiterBytes.length); |
| } |
| |
| if (data != null) { |
| final long messageEndOffset = in.getBytesConsumed(); |
| |
| // If the message has no data, ignore it. |
| if (data.length != 0) { |
| final Integer partition; |
| try { |
| partition = getPartition(context, flowFile, topic); |
| } catch (final Exception e) { |
| messageBatch.addFailedRange(messageStartOffset, messageEndOffset, e); |
| getLogger().error("Failed to obtain a partition for {} due to {}", new Object[] {flowFile, e}); |
| continue; |
| } |
| |
| |
| final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(topic, partition, keyBytes, data); |
| final long rangeStart = messageStartOffset; |
| |
| try { |
| producer.send(producerRecord, new Callback() { |
| @Override |
| public void onCompletion(final RecordMetadata metadata, final Exception exception) { |
| if (exception == null) { |
| // record was successfully sent. |
| messageBatch.addSuccessfulRange(rangeStart, messageEndOffset, metadata.offset()); |
| } else { |
| messageBatch.addFailedRange(rangeStart, messageEndOffset, exception); |
| } |
| } |
| }); |
| |
| messagesSent.incrementAndGet(); |
| } catch (final BufferExhaustedException bee) { |
| // Not enough room in the buffer. Add everything from the start of this message to the end of the FlowFile as a failed range. |
| messageBatch.addFailedRange(messageStartOffset, flowFile.getSize(), bee); |
| context.yield(); |
| return; |
| } |
| |
| } |
| |
| // reset BAOS so that we can start a new message. |
| baos.reset(); |
| data = null; |
| messageStartOffset = in.getBytesConsumed(); |
| } |
| } |
| } |
| } |
| }); |
| |
| messageBatch.setNumMessages(messagesSent.get()); |
| } |
| } |
| } |
| |
| |
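| /** |
| * An immutable byte range [start, end) within the source FlowFile, paired with the Kafka offset |
| * assigned to the message sent from that range (null when the offset is unknown, e.g. for failed |
| * or merged ranges). |
| */ |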
| private static class Range { |
| private final long start; |
| private final long end; |
| private final Long kafkaOffset; |
| |
| public Range(final long start, final long end, final Long kafkaOffset) { |
| this.start = start; |
| this.end = end; |
| this.kafkaOffset = kafkaOffset; |
| } |
| |
| public long getStart() { |
| return start; |
| } |
| |
| public long getEnd() { |
| return end; |
| } |
| |
| public Long getKafkaOffset() { |
| return kafkaOffset; |
| } |
| |
| @Override |
| public String toString() { |
| return "Range[" + start + "-" + end + "]"; |
| } |
| } |
| |
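| /** |
| * Tracks the outcome of every message sent for a single FlowFile. Send callbacks record each |
| * message's byte range via addSuccessfulRange() or addFailedRange(), and setNumMessages() arms |
| * completion once the total message count is known. When all ranges have been reported, the batch |
| * moves from 'activeBatches' to 'completeBatches', and completeSession() later routes the FlowFile |
| * (or per-range child FlowFiles) to 'success' and/or 'failure' and commits the session. Methods are |
| * synchronized because callbacks arrive on the Kafka client's I/O thread. |
| */ |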
| private class FlowFileMessageBatch { |
| private final ProcessSession session; |
| private final FlowFile flowFile; |
| private final String topic; |
| private final long startTime = System.nanoTime(); |
| |
| private final List<Range> successfulRanges = new ArrayList<>(); |
| private final List<Range> failedRanges = new ArrayList<>(); |
| |
| private Exception lastFailureReason; |
| private long numMessages = -1L; |
| private long completeTime = 0L; |
| private boolean canceled = false; |
| |
| public FlowFileMessageBatch(final ProcessSession session, final FlowFile flowFile, final String topic) { |
| this.session = session; |
| this.flowFile = flowFile; |
| this.topic = topic; |
| } |
| |
| public synchronized void cancelOrComplete() { |
| if (isComplete()) { |
| completeSession(); |
| return; |
| } |
| |
| this.canceled = true; |
| |
| session.rollback(); |
| successfulRanges.clear(); |
| failedRanges.clear(); |
| } |
| |
| public synchronized void addSuccessfulRange(final long start, final long end, final long kafkaOffset) { |
| if (canceled) { |
| return; |
| } |
| |
| successfulRanges.add(new Range(start, end, kafkaOffset)); |
| |
| if (isComplete()) { |
| activeBatches.remove(this); |
| completeBatches.add(this); |
| completeTime = System.nanoTime(); |
| } |
| } |
| |
| public synchronized void addFailedRange(final long start, final long end, final Exception e) { |
| if (canceled) { |
| return; |
| } |
| |
| failedRanges.add(new Range(start, end, null)); |
| lastFailureReason = e; |
| |
| if (isComplete()) { |
| activeBatches.remove(this); |
| completeBatches.add(this); |
| completeTime = System.nanoTime(); |
| } |
| } |
| |
| private boolean isComplete() { |
| return !canceled && (numMessages > -1) && (successfulRanges.size() + failedRanges.size() >= numMessages); |
| } |
| |
| public synchronized void setNumMessages(final long msgCount) { |
| this.numMessages = msgCount; |
| |
| if (isComplete()) { |
| activeBatches.remove(this); |
| completeBatches.add(this); |
| completeTime = System.nanoTime(); |
| } |
| } |
| |
| private Long getMin(final Long a, final Long b) { |
| if (a == null && b == null) { |
| return null; |
| } |
| |
| if (a == null) { |
| return b; |
| } |
| |
| if (b == null) { |
| return a; |
| } |
| |
| return Math.min(a, b); |
| } |
| |
| private Long getMax(final Long a, final Long b) { |
| if (a == null && b == null) { |
| return null; |
| } |
| |
| if (a == null) { |
| return b; |
| } |
| |
| if (b == null) { |
| return a; |
| } |
| |
| return Math.max(a, b); |
| } |
| |
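| /** |
| * Clones a child FlowFile for each byte range and transfers it to the given relationship, first |
| * coalescing adjacent ranges: for example, successful ranges [0,10) and [10,25) become a single |
| * child covering bytes 0-24, and the provenance event reports the combined message count and the |
| * span of Kafka offsets. |
| */ |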
| private void transferRanges(final List<Range> ranges, final Relationship relationship) { |
| Collections.sort(ranges, new Comparator<Range>() { |
| @Override |
| public int compare(final Range o1, final Range o2) { |
| return Long.compare(o1.getStart(), o2.getStart()); |
| } |
| }); |
| |
| for (int i = 0; i < ranges.size(); i++) { |
| Range range = ranges.get(i); |
| int count = 1; |
| Long smallestKafkaOffset = range.getKafkaOffset(); |
| Long largestKafkaOffset = range.getKafkaOffset(); |
| |
| while (i + 1 < ranges.size()) { |
| // Check if the next range in the List continues where this one left off. |
| final Range nextRange = ranges.get(i + 1); |
| |
| if (nextRange.getStart() == range.getEnd()) { |
| // We have two ranges in a row that are contiguous; combine them into a single Range. |
| range = new Range(range.getStart(), nextRange.getEnd(), null); |
| |
| smallestKafkaOffset = getMin(smallestKafkaOffset, nextRange.getKafkaOffset()); |
| largestKafkaOffset = getMax(largestKafkaOffset, nextRange.getKafkaOffset()); |
| count++; |
| i++; |
| } else { |
| break; |
| } |
| } |
| |
| // Create a FlowFile for this range. |
| FlowFile child = session.clone(flowFile, range.getStart(), range.getEnd() - range.getStart()); |
| if (relationship == REL_SUCCESS) { |
| session.getProvenanceReporter().send(child, getTransitUri(), "Sent " + count + " messages; Kafka offsets range from " + smallestKafkaOffset + " to " + largestKafkaOffset); |
| session.transfer(child, relationship); |
| } else { |
| session.transfer(session.penalize(child), relationship); |
| } |
| } |
| } |
| |
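| /** |
| * Builds the provenance transit URI from the leader of the topic's first partition, e.g. |
| * "kafka://broker-1:9092/topics/myTopic" (hostname shown for illustration); "unknown-host" is used |
| * when no partition info is available. The host is therefore representative, not necessarily the |
| * node that received every message. |
| */ |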
| private String getTransitUri() { |
| final List<PartitionInfo> partitions = getProducer().partitionsFor(topic); |
| if (partitions.isEmpty()) { |
| return "kafka://unknown-host/topics/" + topic; |
| } |
| |
| final PartitionInfo info = partitions.get(0); |
| final Node leader = info.leader(); |
| final String host = leader.host(); |
| final int port = leader.port(); |
| |
| return "kafka://" + host + ":" + port + "/topics/" + topic; |
| } |
| |
| public synchronized void completeSession() { |
| if (canceled) { |
| return; |
| } |
| |
| if (successfulRanges.isEmpty() && failedRanges.isEmpty()) { |
| getLogger().info("Completed processing {} but sent 0 messages to Kafka", new Object[] {flowFile}); |
| session.transfer(flowFile, REL_SUCCESS); |
| session.commit(); |
| return; |
| } |
| |
| if (successfulRanges.isEmpty()) { |
| getLogger().error("Failed to send {} to Kafka; routing to 'failure'; last failure reason reported was {}", new Object[] {flowFile, lastFailureReason}); |
| session.transfer(session.penalize(flowFile), REL_FAILURE); |
| session.commit(); |
| return; |
| } |
| |
| if (failedRanges.isEmpty()) { |
| final long transferMillis = TimeUnit.NANOSECONDS.toMillis(completeTime - startTime); |
| |
| if (successfulRanges.size() == 1) { |
| final Long kafkaOffset = successfulRanges.get(0).getKafkaOffset(); |
| final String msg = "Sent 1 message" + ((kafkaOffset == null) ? "" : ("; Kafka offset = " + kafkaOffset)); |
| session.getProvenanceReporter().send(flowFile, getTransitUri(), msg); |
| } else { |
| long smallestKafkaOffset = successfulRanges.get(0).getKafkaOffset(); |
| long largestKafkaOffset = successfulRanges.get(0).getKafkaOffset(); |
| |
| for (final Range range : successfulRanges) { |
| smallestKafkaOffset = Math.min(smallestKafkaOffset, range.getKafkaOffset()); |
| largestKafkaOffset = Math.max(largestKafkaOffset, range.getKafkaOffset()); |
| } |
| |
| session.getProvenanceReporter().send(flowFile, getTransitUri(), |
| "Sent " + successfulRanges.size() + " messages; Kafka offsets range from " + smallestKafkaOffset + " to " + largestKafkaOffset); |
| } |
| |
| session.transfer(flowFile, REL_SUCCESS); |
| getLogger().info("Successfully sent {} messages to Kafka for {} in {} millis", new Object[] {successfulRanges.size(), flowFile, transferMillis}); |
| session.commit(); |
| return; |
| } |
| |
| // At this point, both the successful ranges and the failed ranges are non-empty, meaning that some messages made their way to Kafka |
| // successfully while others failed. We address this by splitting the source FlowFile into children, routing the successful messages to 'success' |
| // and the failed messages to 'failure'. |
| transferRanges(successfulRanges, REL_SUCCESS); |
| transferRanges(failedRanges, REL_FAILURE); |
| session.remove(flowFile); |
| getLogger().error("Successfully sent {} messages to Kafka but failed to send {} messages; the last error received was {}", |
| new Object[] {successfulRanges.size(), failedRanges.size(), lastFailureReason}); |
| session.commit(); |
| } |
| } |
| } |