| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.nifi.processors.kafka; |
| |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Properties; |
| import java.util.Set; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import org.apache.nifi.annotation.behavior.DynamicProperty; |
| import org.apache.nifi.annotation.behavior.InputRequirement; |
| import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; |
| import org.apache.nifi.annotation.behavior.SupportsBatching; |
| import org.apache.nifi.annotation.behavior.WritesAttribute; |
| import org.apache.nifi.annotation.behavior.WritesAttributes; |
| import org.apache.nifi.annotation.documentation.CapabilityDescription; |
| import org.apache.nifi.annotation.documentation.Tags; |
| import org.apache.nifi.annotation.lifecycle.OnScheduled; |
| import org.apache.nifi.annotation.lifecycle.OnStopped; |
| import org.apache.nifi.components.PropertyDescriptor; |
| import org.apache.nifi.components.Validator; |
| import org.apache.nifi.flowfile.FlowFile; |
| import org.apache.nifi.processor.AbstractProcessor; |
| import org.apache.nifi.processor.ProcessContext; |
| import org.apache.nifi.processor.ProcessSession; |
| import org.apache.nifi.processor.Relationship; |
| import org.apache.nifi.processor.exception.ProcessException; |
| import org.apache.nifi.processor.io.OutputStreamCallback; |
| import org.apache.nifi.processor.util.StandardValidators; |
| |
| import kafka.consumer.Consumer; |
| import kafka.consumer.ConsumerConfig; |
| import kafka.consumer.ConsumerIterator; |
| import kafka.consumer.ConsumerTimeoutException; |
| import kafka.consumer.KafkaStream; |
| import kafka.javaapi.consumer.ConsumerConnector; |
| import kafka.message.MessageAndMetadata; |
| |
| @SupportsBatching |
| @InputRequirement(Requirement.INPUT_FORBIDDEN) |
| @CapabilityDescription("Fetches messages from Apache Kafka") |
| @Tags({"Kafka", "Apache", "Get", "Ingest", "Ingress", "Topic", "PubSub"}) |
| @WritesAttributes({ |
| @WritesAttribute(attribute = "kafka.topic", description = "The name of the Kafka Topic from which the message was received"), |
| @WritesAttribute(attribute = "kafka.key", description = "The key of the Kafka message, if it exists and batch size is 1. If" |
| + " the message does not have a key, or if the batch size is greater than 1, this attribute will not be added"), |
| @WritesAttribute(attribute = "kafka.partition", description = "The partition of the Kafka Topic from which the message was received. This attribute is added only if the batch size is 1"), |
| @WritesAttribute(attribute = "kafka.offset", description = "The offset of the message within the Kafka partition. This attribute is added only if the batch size is 1")}) |
| @DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.", |
| description = "These properties will be added on the Kafka configuration after loading any provided configuration properties." |
| + " In the event a dynamic property represents a property that was already set as part of the static properties, its value wil be" |
| + " overriden with warning message describing the override." |
| + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.") |
| public class GetKafka extends AbstractProcessor { |
| |
| public static final String SMALLEST = "smallest"; |
| public static final String LARGEST = "largest"; |
| |
| public static final PropertyDescriptor ZOOKEEPER_CONNECTION_STRING = new PropertyDescriptor.Builder() |
| .name("ZooKeeper Connection String") |
| .description("The Connection String to use in order to connect to ZooKeeper. This is often a comma-separated list of <host>:<port>" |
| + " combinations. For example, host1:2181,host2:2181,host3:2188") |
| .required(true) |
| .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) |
| .expressionLanguageSupported(false) |
| .build(); |
| public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder() |
| .name("Topic Name") |
| .description("The Kafka Topic to pull messages from") |
| .required(true) |
| .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) |
| .expressionLanguageSupported(false) |
| .build(); |
| public static final PropertyDescriptor ZOOKEEPER_COMMIT_DELAY = new PropertyDescriptor.Builder() |
| .name("Zookeeper Commit Frequency") |
| .description("Specifies how often to communicate with ZooKeeper to indicate which messages have been pulled. A longer time period will" |
| + " result in better overall performance but can result in more data duplication if a NiFi node is lost") |
| .required(true) |
| .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) |
| .expressionLanguageSupported(false) |
| .defaultValue("60 secs") |
| .build(); |
| public static final PropertyDescriptor ZOOKEEPER_TIMEOUT = new PropertyDescriptor.Builder() |
| .name("ZooKeeper Communications Timeout") |
| .description("The amount of time to wait for a response from ZooKeeper before determining that there is a communications error") |
| .required(true) |
| .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) |
| .expressionLanguageSupported(false) |
| .defaultValue("30 secs") |
| .build(); |
| public static final PropertyDescriptor KAFKA_TIMEOUT = new PropertyDescriptor.Builder() |
| .name("Kafka Communications Timeout") |
| .description("The amount of time to wait for a response from Kafka before determining that there is a communications error") |
| .required(true) |
| .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR) |
| .expressionLanguageSupported(false) |
| .defaultValue("30 secs") |
| .build(); |
| public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder() |
| .name("Batch Size") |
| .description("Specifies the maximum number of messages to combine into a single FlowFile. These messages will be " |
| + "concatenated together with the <Message Demarcator> string placed between the content of each message. " |
| + "If the messages from Kafka should not be concatenated together, leave this value at 1.") |
| .required(true) |
| .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) |
| .expressionLanguageSupported(false) |
| .defaultValue("1") |
| .build(); |
| public static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder() |
| .name("Message Demarcator") |
| .description("Specifies the characters to use in order to demarcate multiple messages from Kafka. If the <Batch Size> " |
| + "property is set to 1, this value is ignored. Otherwise, for each two subsequent messages in the batch, " |
| + "this value will be placed in between them.") |
| .required(true) |
| .addValidator(Validator.VALID) // accept anything as a demarcator, including empty string |
| .expressionLanguageSupported(false) |
| .defaultValue("\\n") |
| .build(); |
| |
| public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder() |
| .name("Client Name") |
| .description("Client Name to use when communicating with Kafka") |
| .required(true) |
| .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) |
| .expressionLanguageSupported(false) |
| .build(); |
| public static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder() |
| .name("Group ID") |
| .description("A Group ID is used to identify consumers that are within the same consumer group") |
| .required(true) |
| .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) |
| .expressionLanguageSupported(false) |
| .build(); |
| |
| public static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder() |
| .name("Auto Offset Reset") |
| .description("Automatically reset the offset to the smallest or largest offset available on the broker") |
| .required(true) |
| .allowableValues(SMALLEST, LARGEST) |
| .defaultValue(LARGEST) |
| .build(); |
| |
| public static final Relationship REL_SUCCESS = new Relationship.Builder() |
| .name("success") |
| .description("All FlowFiles that are created are routed to this relationship") |
| .build(); |
| |
| private final BlockingQueue<ConsumerIterator<byte[], byte[]>> streamIterators = new LinkedBlockingQueue<>(); |
| private volatile ConsumerConnector consumer; |
| |
| private final AtomicBoolean consumerStreamsReady = new AtomicBoolean(); |
| |
| private volatile long deadlockTimeout; |
| |
| private volatile ExecutorService executor; |
| |
| @Override |
| protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { |
| final PropertyDescriptor clientNameWithDefault = new PropertyDescriptor.Builder() |
| .fromPropertyDescriptor(CLIENT_NAME) |
| .defaultValue("NiFi-" + getIdentifier()) |
| .build(); |
| final PropertyDescriptor groupIdWithDefault = new PropertyDescriptor.Builder() |
| .fromPropertyDescriptor(GROUP_ID) |
| .defaultValue(getIdentifier()) |
| .build(); |
| |
| final List<PropertyDescriptor> props = new ArrayList<>(); |
| props.add(ZOOKEEPER_CONNECTION_STRING); |
| props.add(TOPIC); |
| props.add(ZOOKEEPER_COMMIT_DELAY); |
| props.add(BATCH_SIZE); |
| props.add(MESSAGE_DEMARCATOR); |
| props.add(clientNameWithDefault); |
| props.add(groupIdWithDefault); |
| props.add(KAFKA_TIMEOUT); |
| props.add(ZOOKEEPER_TIMEOUT); |
| props.add(AUTO_OFFSET_RESET); |
| return props; |
| } |
| |
| @Override |
| public Set<Relationship> getRelationships() { |
| final Set<Relationship> relationships = new HashSet<>(1); |
| relationships.add(REL_SUCCESS); |
| return relationships; |
| } |
| |
| public void createConsumers(final ProcessContext context) { |
| final String topic = context.getProperty(TOPIC).getValue(); |
| |
| final Properties props = new Properties(); |
| props.setProperty("zookeeper.connect", context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue()); |
| props.setProperty("group.id", context.getProperty(GROUP_ID).getValue()); |
| props.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue()); |
| props.setProperty("auto.commit.interval.ms", String.valueOf(context.getProperty(ZOOKEEPER_COMMIT_DELAY).asTimePeriod(TimeUnit.MILLISECONDS))); |
| props.setProperty("auto.offset.reset", context.getProperty(AUTO_OFFSET_RESET).getValue()); |
| props.setProperty("zookeeper.connection.timeout.ms", context.getProperty(ZOOKEEPER_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString()); |
| props.setProperty("socket.timeout.ms", context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString()); |
| |
| for (final Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) { |
| PropertyDescriptor descriptor = entry.getKey(); |
| if (descriptor.isDynamic()) { |
| if (props.containsKey(descriptor.getName())) { |
| this.getLogger().warn("Overriding existing property '" + descriptor.getName() + "' which had value of '" |
| + props.getProperty(descriptor.getName()) + "' with dynamically set value '" + entry.getValue() + "'."); |
| } |
| props.setProperty(descriptor.getName(), entry.getValue()); |
| } |
| } |
| |
| /* |
| * Unless user sets it to some explicit value we are setting it to the |
| * lowest possible value of 1 millisecond to ensure the |
| * consumerStream.hasNext() doesn't block. See |
| * http://kafka.apache.org/documentation.html#configuration) as well as |
| * comment in 'catch ConsumerTimeoutException' in onTrigger() for more |
| * explanation as to the reasoning behind it. |
| */ |
| if (!props.containsKey("consumer.timeout.ms")) { |
| this.getLogger().info("Setting 'consumer.timeout.ms' to 1 milliseconds to avoid consumer" |
| + " block in the event when no events are present in Kafka topic. If you wish to change this value " |
| + " set it as dynamic property. If you wish to explicitly enable consumer block (at your own risk)" |
| + " set its value to -1."); |
| props.setProperty("consumer.timeout.ms", "1"); |
| } |
| |
| final ConsumerConfig consumerConfig = new ConsumerConfig(props); |
| consumer = Consumer.createJavaConsumerConnector(consumerConfig); |
| |
| final Map<String, Integer> topicCountMap = new HashMap<>(1); |
| |
| int partitionCount = KafkaUtils.retrievePartitionCountForTopic(context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue(), context.getProperty(TOPIC).getValue()); |
| |
| int concurrentTaskToUse = context.getMaxConcurrentTasks(); |
| if (context.getMaxConcurrentTasks() < partitionCount){ |
| this.getLogger().warn("The amount of concurrent tasks '" + context.getMaxConcurrentTasks() + "' configured for " |
| + "this processor is less than the amount of partitions '" + partitionCount + "' for topic '" + context.getProperty(TOPIC).getValue() + "'. " |
| + "Consider making it equal to the amount of partition count for most efficient event consumption."); |
| } else if (context.getMaxConcurrentTasks() > partitionCount){ |
| concurrentTaskToUse = partitionCount; |
| this.getLogger().warn("The amount of concurrent tasks '" + context.getMaxConcurrentTasks() + "' configured for " |
| + "this processor is greater than the amount of partitions '" + partitionCount + "' for topic '" + context.getProperty(TOPIC).getValue() + "'. " |
| + "Therefore those tasks would never see a message. To avoid that the '" + partitionCount + "'(partition count) will be used to consume events"); |
| } |
| |
| topicCountMap.put(topic, concurrentTaskToUse); |
| |
| final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap); |
| final List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); |
| |
| this.streamIterators.clear(); |
| |
| for (final KafkaStream<byte[], byte[]> stream : streams) { |
| streamIterators.add(stream.iterator()); |
| } |
| this.consumerStreamsReady.set(true); |
| } |
| |
| @OnStopped |
| public void shutdownConsumer() { |
| this.consumerStreamsReady.set(false); |
| if (consumer != null) { |
| try { |
| consumer.commitOffsets(); |
| } finally { |
| consumer.shutdown(); |
| } |
| } |
| if (this.executor != null) { |
| this.executor.shutdown(); |
| try { |
| if (!this.executor.awaitTermination(30000, TimeUnit.MILLISECONDS)) { |
| this.executor.shutdownNow(); |
| getLogger().warn("Executor did not stop in 30 sec. Terminated."); |
| } |
| this.executor = null; |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| |
| @Override |
| protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { |
| return new PropertyDescriptor.Builder() |
| .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.") |
| .name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true) |
| .build(); |
| } |
| |
| @OnScheduled |
| public void schedule(ProcessContext context) { |
| this.deadlockTimeout = context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS) * 2; |
| if (this.executor == null || this.executor.isShutdown()) { |
| this.executor = Executors.newCachedThreadPool(); |
| } |
| } |
| |
| @Override |
| public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException { |
| /* |
| * Will ensure that consumer streams are ready upon the first invocation |
| * of onTrigger. Will be reset to 'false' in the event of exception |
| */ |
| synchronized (this.consumerStreamsReady) { |
| if (!this.consumerStreamsReady.get()) { |
| Future<Void> f = this.executor.submit(new Callable<Void>() { |
| @Override |
| public Void call() throws Exception { |
| createConsumers(context); |
| return null; |
| } |
| }); |
| try { |
| f.get(this.deadlockTimeout, TimeUnit.MILLISECONDS); |
| } catch (InterruptedException e) { |
| this.consumerStreamsReady.set(false); |
| f.cancel(true); |
| Thread.currentThread().interrupt(); |
| getLogger().warn("Interrupted while waiting to get connection", e); |
| } catch (ExecutionException e) { |
| throw new IllegalStateException(e); |
| } catch (TimeoutException e) { |
| this.consumerStreamsReady.set(false); |
| f.cancel(true); |
| getLogger().warn("Timed out after " + this.deadlockTimeout + " milliseconds while waiting to get connection", e); |
| } |
| } |
| } |
| //=== |
| if (this.consumerStreamsReady.get()) { |
| Future<Void> consumptionFuture = this.executor.submit(new Callable<Void>() { |
| @Override |
| public Void call() throws Exception { |
| ConsumerIterator<byte[], byte[]> iterator = getStreamIterator(); |
| if (iterator != null) { |
| consumeFromKafka(context, session, iterator); |
| } |
| return null; |
| } |
| }); |
| try { |
| consumptionFuture.get(this.deadlockTimeout, TimeUnit.MILLISECONDS); |
| } catch (InterruptedException e) { |
| this.consumerStreamsReady.set(false); |
| consumptionFuture.cancel(true); |
| Thread.currentThread().interrupt(); |
| getLogger().warn("Interrupted while consuming messages", e); |
| } catch (ExecutionException e) { |
| throw new IllegalStateException(e); |
| } catch (TimeoutException e) { |
| this.consumerStreamsReady.set(false); |
| consumptionFuture.cancel(true); |
| getLogger().warn("Timed out after " + this.deadlockTimeout + " milliseconds while consuming messages", e); |
| } |
| } |
| } |
| |
| protected ConsumerIterator<byte[], byte[]> getStreamIterator() { |
| return this.streamIterators.poll(); |
| } |
| |
| |
| private void consumeFromKafka(final ProcessContext context, final ProcessSession session, |
| ConsumerIterator<byte[], byte[]> iterator) throws ProcessException { |
| |
| final int batchSize = context.getProperty(BATCH_SIZE).asInteger(); |
| final String demarcator = context.getProperty(MESSAGE_DEMARCATOR).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t"); |
| final byte[] demarcatorBytes = demarcator.getBytes(StandardCharsets.UTF_8); |
| final String topic = context.getProperty(TOPIC).getValue(); |
| |
| FlowFile flowFile = session.create(); |
| |
| final Map<String, String> attributes = new HashMap<>(); |
| attributes.put("kafka.topic", topic); |
| final long start = System.nanoTime(); |
| int msgCount = 0; |
| |
| try { |
| for (; msgCount < batchSize && iterator.hasNext(); msgCount++) { |
| final MessageAndMetadata<byte[], byte[]> mam = iterator.next(); |
| |
| if (batchSize == 1) { |
| final byte[] key = mam.key(); |
| // the kafka.key, kafka.offset, and kafka.partition attributes are added only |
| // for a batch size of 1. |
| if (key != null) { |
| attributes.put("kafka.key", new String(key, StandardCharsets.UTF_8)); |
| } |
| |
| attributes.put("kafka.offset", String.valueOf(mam.offset())); |
| attributes.put("kafka.partition", String.valueOf(mam.partition())); |
| } |
| |
| // add the message to the FlowFile's contents |
| final boolean firstMessage = msgCount == 0; |
| flowFile = session.append(flowFile, new OutputStreamCallback() { |
| @Override |
| public void process(final OutputStream out) throws IOException { |
| if (!firstMessage) { |
| out.write(demarcatorBytes); |
| } |
| out.write(mam.message()); |
| } |
| }); |
| } |
| this.releaseFlowFile(flowFile, session, attributes, start, topic, msgCount); |
| } catch (ConsumerTimeoutException e) { |
| /* |
| * By default Kafka blocks indefinitely if topic is empty via |
| * stream.hasNext(). If 'consumer.timeout.ms' property is set (see |
| * http://kafka.apache.org/documentation.html#configuration) the |
| * hasNext() will fail with this exception. To this processor it |
| * simply means there are no messages and current task should exit |
| * in non-failure releasing the flow file if it was able to |
| * accumulate any events. |
| */ |
| this.releaseFlowFile(flowFile, session, attributes, start, topic, msgCount); |
| } catch (final Exception e) { |
| this.shutdownConsumer(); |
| getLogger().error("Failed to receive FlowFile from Kafka due to {}", new Object[]{e}); |
| if (flowFile != null) { |
| session.remove(flowFile); |
| } |
| } finally { |
| // Add the iterator back to the queue |
| if (iterator != null) { |
| streamIterators.offer(iterator); |
| } |
| } |
| } |
| |
| /** |
| * Will release flow file. Releasing of the flow file in the context of this |
| * operation implies the following: |
| * |
| * If Empty then remove from session and return |
| * If has something then transfer to REL_SUCCESS |
| */ |
| private void releaseFlowFile(FlowFile flowFile, ProcessSession session, Map<String, String> attributes, long start, String topic, int msgCount){ |
| if (flowFile.getSize() == 0L) { |
| session.remove(flowFile); |
| } else { |
| flowFile = session.putAllAttributes(flowFile, attributes); |
| final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); |
| session.getProvenanceReporter().receive(flowFile, "kafka://" + topic, "Received " + msgCount + " Kafka messages", millis); |
| getLogger().info("Successfully received {} from Kafka with {} messages in {} millis", new Object[]{flowFile, msgCount, millis}); |
| session.transfer(flowFile, REL_SUCCESS); |
| } |
| } |
| } |