Apache Kafka is a pull-based, distributed publish-subscribe messaging system; topics are partitioned and replicated across nodes.
The Kafka input operator consumes data from the partitions of a Kafka topic for processing in Apex. The operator has the ability to automatically scale with the Kafka partitioning for high throughput. It is fault-tolerant (consumer offset checkpointing) and guarantees idempotency to allow exactly-once results in the downstream pipeline.
For more information about the operator design, see this presentation; for processing guarantees, see this blog.
There are two separate implementations of the input operator: one built against the Kafka 0.8 client, and a newer one for the Kafka 0.9 consumer API that also works with MapR Streams. These reside in different packages and are described separately below.
Package: com.datatorrent.contrib.kafka
Maven artifact: malhar-contrib
AbstractKafkaInputOperator is the abstract implementation that serves as the base class for consuming messages from the Kafka messaging system. This class doesn't have any ports.
void emitTuple(Message message)
: Abstract method that emits tuples extracted from Kafka message.
KafkaConsumer is an abstract implementation of a Kafka consumer. It sends fetch requests to the leading brokers of the Kafka partitions. For each request, it receives a set of messages and stores them in a buffer, which is an ArrayBlockingQueue. Two subclasses are provided: SimpleKafkaConsumer, which extends KafkaConsumer and provides the functionality of the Simple Consumer API, and HighLevelKafkaConsumer, which extends KafkaConsumer and provides the functionality of the High Level Consumer API.
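The buffering described above, where fetched messages are stored in an ArrayBlockingQueue for the operator to consume, can be sketched as a bounded producer/consumer hand-off. This is a minimal, hypothetical illustration, not the KafkaConsumer source: the class and method names are invented, Strings stand in for Kafka messages, and the blocking `put()` on a full buffer is a design choice of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch: a fetcher thread fills a bounded ArrayBlockingQueue and
// the operator thread drains it. Strings stand in for Kafka messages.
class FetchBufferSketch
{
  // Bounded buffer shared by the fetcher (producer) and the operator (consumer).
  private final BlockingQueue<String> holdingBuffer;

  FetchBufferSketch(int capacity)
  {
    holdingBuffer = new ArrayBlockingQueue<>(capacity);
  }

  // Called by the fetcher thread for every message in a fetch response.
  // put() blocks while the buffer is full, which throttles fetching.
  void onFetched(String message) throws InterruptedException
  {
    holdingBuffer.put(message);
  }

  // Called by the operator thread to take at most maxCount buffered
  // messages without blocking, in arrival order.
  List<String> drain(int maxCount)
  {
    List<String> batch = new ArrayList<>();
    holdingBuffer.drainTo(batch, maxCount);
    return batch;
  }

  public static void main(String[] args) throws InterruptedException
  {
    FetchBufferSketch buffer = new FetchBufferSketch(4);
    Thread fetcher = new Thread(() -> {
      try {
        for (int i = 0; i < 3; i++) {
          buffer.onFetched("msg-" + i);
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    fetcher.start();
    fetcher.join();
    System.out.println(buffer.drain(10)); // prints [msg-0, msg-1, msg-2]
  }
}
```

A bounded queue keeps memory use predictable: if the operator falls behind, the fetcher in this sketch simply waits instead of buffering without limit.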
This operator uses the Kafka 0.8.2.1 client consumer API and will work with the 0.8.x and 0.7.x versions of the Kafka broker.
OffsetManager is an interface for offset management; it is useful when consuming data from specified offsets. It updates the offsets for all the Kafka partitions periodically. Below is the code snippet:
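```java
package com.datatorrent.contrib.kafka;

import java.util.Map;

public interface OffsetManager
{
  // Called at activation to get the starting offset of each Kafka partition
  public Map<KafkaPartition, Long> loadInitialOffsets();

  // Called periodically with the latest offsets of all partitions
  public void updateOffsets(Map<KafkaPartition, Long> offsetsOfPartitions);
}
```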
Map<KafkaPartition, Long> loadInitialOffsets()
: Specifies the initial offset for consuming messages; called at the activation stage.
void updateOffsets(Map<KafkaPartition, Long> offsetsOfPartitions)
: This method is called at every repartitionCheckInterval to update offsets.
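To show how the two OffsetManager methods fit together, here is a minimal in-memory implementation. It is a hypothetical sketch: `KafkaPartition` and `OffsetManager` are local stand-ins that mirror the shapes in the snippet above (the stand-in's fields are assumptions, not the Malhar class), and a real implementation would persist offsets somewhere durable rather than in a HashMap.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical sketch of an OffsetManager implementation using local stand-in types.
class OffsetManagerSketch
{
  // Stand-in for KafkaPartition; only topic and partition id are assumed here.
  static final class KafkaPartition
  {
    final String topic;
    final int partitionId;

    KafkaPartition(String topic, int partitionId)
    {
      this.topic = topic;
      this.partitionId = partitionId;
    }

    @Override
    public boolean equals(Object o)
    {
      if (!(o instanceof KafkaPartition)) {
        return false;
      }
      KafkaPartition other = (KafkaPartition)o;
      return partitionId == other.partitionId && topic.equals(other.topic);
    }

    @Override
    public int hashCode()
    {
      return Objects.hash(topic, partitionId);
    }
  }

  // Same two methods as the OffsetManager interface in the text.
  interface OffsetManager
  {
    Map<KafkaPartition, Long> loadInitialOffsets();

    void updateOffsets(Map<KafkaPartition, Long> offsetsOfPartitions);
  }

  // Keeps the latest offsets in memory only; nothing survives a restart.
  static final class InMemoryOffsetManager implements OffsetManager
  {
    private final Map<KafkaPartition, Long> offsets = new HashMap<>();

    InMemoryOffsetManager(Map<KafkaPartition, Long> startOffsets)
    {
      offsets.putAll(startOffsets);
    }

    @Override
    public Map<KafkaPartition, Long> loadInitialOffsets()
    {
      return new HashMap<>(offsets); // defensive copy for the caller
    }

    @Override
    public void updateOffsets(Map<KafkaPartition, Long> offsetsOfPartitions)
    {
      offsets.putAll(offsetsOfPartitions); // overwrite with the newest offsets
    }
  }

  public static void main(String[] args)
  {
    KafkaPartition p0 = new KafkaPartition("test", 0);
    Map<KafkaPartition, Long> start = new HashMap<>();
    start.put(p0, 100L);
    InMemoryOffsetManager manager = new InMemoryOffsetManager(start);

    Map<KafkaPartition, Long> update = new HashMap<>();
    update.put(p0, 250L);
    manager.updateOffsets(update);
    System.out.println(manager.loadInitialOffsets().get(p0)); // prints 250
  }
}
```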
The logical instance of the KafkaInputOperator acts as the Partitioner as well as a StatsListener. This is because the AbstractKafkaInputOperator implements both the com.datatorrent.api.Partitioner and com.datatorrent.api.StatsListener interfaces and provides an implementation of definePartitions(...) and processStats(...) which makes it auto-scalable.
The application master invokes processStats(...) on the logical instance with the stats (tuplesProcessedPS, bytesPS, etc.) of each partition. Repartitioning happens when new Kafka partitions are added to the topic, or when bytesPS and msgPS cross their respective upper bounds.
Based on the repartitionRequired field of the Response object returned by processStats(...), the application master invokes definePartitions(...) on the logical instance, which is also the partitioner instance. Dynamic partitioning can be disabled by setting repartitionInterval to a negative value.
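The repartitioning decision can be summarized as a single predicate. This is a simplified, hypothetical sketch rather than the operator's processStats(...) code: the method and parameter names are invented, treating repartitionInterval as the minimum time between two repartitions is an assumption, and so is combining the two throughput bounds with "or".

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified sketch of when repartitionRequired would be set.
class RepartitionDecisionSketch
{
  static boolean repartitionRequired(long repartitionInterval, long nowMillis, long lastRepartitionMillis,
      Set<Integer> knownKafkaPartitions, Set<Integer> currentKafkaPartitions,
      long bytesPS, long bytesPSUpperBound, long msgPS, long msgPSUpperBound)
  {
    // A negative interval disables dynamic partitioning.
    if (repartitionInterval < 0) {
      return false;
    }
    // Assumed: do not repartition more often than once per repartitionInterval.
    if (nowMillis - lastRepartitionMillis < repartitionInterval) {
      return false;
    }
    // New Kafka partitions were added to the topic.
    if (!knownKafkaPartitions.containsAll(currentKafkaPartitions)) {
      return true;
    }
    // Throughput crossed an upper bound (assumed: either bound suffices).
    return bytesPS > bytesPSUpperBound || msgPS > msgPSUpperBound;
  }

  public static void main(String[] args)
  {
    Set<Integer> known = new HashSet<>(Arrays.asList(0, 1));
    Set<Integer> current = new HashSet<>(Arrays.asList(0, 1, 2));
    // A third Kafka partition appeared and the interval has elapsed.
    System.out.println(repartitionRequired(30000L, 60000L, 0L, known, current, 0L, 1000L, 0L, 1000L)); // prints true
  }
}
```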
AbstractSinglePortKafkaInputOperator extends AbstractKafkaInputOperator to emit messages through a single output port.
outputPort <T>
: Tuples extracted from Kafka messages are emitted through this port.
T getTuple(Message msg)
: Converts the Kafka message to tuple.
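A getTuple(...) implementation typically turns the message payload into a tuple. In the 0.8 client the payload of a `kafka.message.Message` is exposed as a `java.nio.ByteBuffer`; the sketch below, with hypothetical helper names, shows only that conversion step, so it runs without Kafka on the classpath. Decoding as UTF-8 is an assumption of this sketch.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the payload-to-tuple step of a getTuple-style method.
class PayloadConversionSketch
{
  // Copies the remaining payload bytes into a new array.
  static byte[] toByteArray(ByteBuffer payload)
  {
    // duplicate() so reading does not move the caller's buffer position
    ByteBuffer view = payload.duplicate();
    byte[] bytes = new byte[view.remaining()];
    view.get(bytes);
    return bytes;
  }

  // Decodes the payload as UTF-8 text (charset assumed).
  static String toUtf8String(ByteBuffer payload)
  {
    return new String(toByteArray(payload), StandardCharsets.UTF_8);
  }

  public static void main(String[] args)
  {
    ByteBuffer payload = ByteBuffer.wrap("hello kafka".getBytes(StandardCharsets.UTF_8));
    System.out.println(toUtf8String(payload)); // prints hello kafka
    System.out.println(toByteArray(payload).length); // prints 11
  }
}
```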
KafkaSinglePortStringInputOperator extends AbstractSinglePortKafkaInputOperator and extracts a string from each Kafka message.
KafkaSinglePortByteArrayInputOperator extends AbstractSinglePortKafkaInputOperator and extracts a byte array from each Kafka message.
This section builds an Apex application using the Kafka input operator. Below is the code snippet:
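```java
import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.contrib.kafka.KafkaSinglePortByteArrayInputOperator;
import com.datatorrent.lib.io.ConsoleOutputOperator;

@ApplicationAnnotation(name = "KafkaApp")
public class ExampleKafkaApplication implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration entries)
  {
    // Reads byte[] messages from Kafka; topic and zookeeper come from the configuration
    KafkaSinglePortByteArrayInputOperator input = dag.addOperator("MessageReader", new KafkaSinglePortByteArrayInputOperator());
    // Prints each tuple to stdout
    ConsoleOutputOperator output = dag.addOperator("Output", new ConsoleOutputOperator());
    dag.addStream("MessageData", input.outputPort, output.input);
  }
}
```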
Below is the configuration for the Kafka topic “test”, with “localhost:2181” as the ZooKeeper quorum:
```xml
<property>
  <name>dt.operator.MessageReader.prop.topic</name>
  <value>test</value>
</property>
<property>
  <name>dt.operator.MessageReader.prop.zookeeper</name>
  <value>localhost:2181</value>
</property>
```
Package: org.apache.apex.malhar.kafka
Maven artifact: malhar-kafka
This version uses the newer 0.9 consumer API and works with Kafka broker version 0.9 and later. The operator is fault-tolerant and scalable, and it supports input from multiple clusters and multiple topics in a single operator instance.
This operator requires version 0.9.0 or later of the Kafka Consumer API.
AbstractKafkaInputOperator in this package is also an abstract class and doesn't have any ports.
clusters - String[]
topics - String[]
strategy - PartitionStrategy
The operator supports two types of partitioning strategies, ONE_TO_ONE and ONE_TO_MANY.
ONE_TO_ONE
: If this is enabled, the AppMaster creates one input operator instance per Kafka topic partition, so the number of operator instances equals the number of Kafka topic partitions.
ONE_TO_MANY
: The AppMaster creates K = min(initialPartitionCount, N) Kafka input operator instances, where N is the number of Kafka topic partitions. If K is less than N, the remaining topic partitions are assigned to the K operator instances in round-robin fashion. If K is less than initialPartitionCount (that is, N < initialPartitionCount), the AppMaster creates one input operator instance per Kafka topic partition. For example, if initialPartitionCount = 5 and the number of Kafka partitions (N) = 2, the AppMaster creates 2 Kafka input operator instances.
Default value = PartitionStrategy.ONE_TO_ONE.
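The ONE_TO_MANY arithmetic can be checked with a small sketch. It assumes Kafka partitions are numbered 0..N-1 and uses a plain modulo round-robin; the class and method names are hypothetical, and the operator's actual assignment order may differ.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of ONE_TO_MANY: K = min(initialPartitionCount, N) instances,
// with Kafka partitions 0..N-1 dealt out round-robin.
class OneToManySketch
{
  static List<List<Integer>> assign(int initialPartitionCount, int kafkaPartitionCount)
  {
    if (initialPartitionCount < 1) {
      throw new IllegalArgumentException("initialPartitionCount must be at least 1");
    }
    int k = Math.min(initialPartitionCount, kafkaPartitionCount);
    List<List<Integer>> instances = new ArrayList<>();
    for (int i = 0; i < k; i++) {
      instances.add(new ArrayList<>());
    }
    // Round-robin: Kafka partition p goes to operator instance p % k.
    for (int p = 0; p < kafkaPartitionCount; p++) {
      instances.get(p % k).add(p);
    }
    return instances;
  }

  public static void main(String[] args)
  {
    System.out.println(assign(5, 2)); // prints [[0], [1]]
    System.out.println(assign(2, 5)); // prints [[0, 2, 4], [1, 3]]
  }
}
```

The first call reproduces the example in the text (initialPartitionCount = 5, N = 2 gives 2 instances); the second shows K < N, where each instance reads several partitions.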
initialPartitionCount - Integer
repartitionInterval - Long
repartitionCheckInterval - Long
maxTuplesPerWindow - Integer (default value = MAX_VALUE)
initialOffset - InitialOffset
: EARLIEST, LATEST, APPLICATION_OR_EARLIEST or APPLICATION_OR_LATEST.
LATEST => Consume new messages from the latest offset in the topic.
EARLIEST => Consume all messages available in the topic.
APPLICATION_OR_EARLIEST => Consume messages from the committed position of the last run. If there is no committed offset, start consuming from the beginning.
APPLICATION_OR_LATEST => Consume messages from the committed position of the last run. If there is no committed offset, start consuming from the latest position.
Default value = InitialOffset.APPLICATION_OR_LATEST
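The four initialOffset values reduce to a choice between a committed offset (if any), the earliest offset and the latest offset. The sketch below encodes that choice; `InitialOffset` here is a local stand-in enum with the same constant names, and the method name and parameters are hypothetical.

```java
import java.util.OptionalLong;

// Hypothetical sketch of how each initialOffset value picks a starting offset.
class InitialOffsetSketch
{
  // Local stand-in with the same constant names as the operator's InitialOffset.
  enum InitialOffset { EARLIEST, LATEST, APPLICATION_OR_EARLIEST, APPLICATION_OR_LATEST }

  static long startOffset(InitialOffset mode, OptionalLong committed, long earliest, long latest)
  {
    switch (mode) {
      case EARLIEST:
        return earliest;                   // all messages still in the topic
      case LATEST:
        return latest;                     // only new messages
      case APPLICATION_OR_EARLIEST:
        return committed.orElse(earliest); // resume, else start from the beginning
      case APPLICATION_OR_LATEST:
        return committed.orElse(latest);   // resume, else start from the latest position
      default:
        throw new IllegalArgumentException("unknown mode " + mode);
    }
  }

  public static void main(String[] args)
  {
    OptionalLong none = OptionalLong.empty();
    System.out.println(startOffset(InitialOffset.APPLICATION_OR_LATEST, none, 0L, 500L)); // prints 500
    System.out.println(startOffset(InitialOffset.APPLICATION_OR_LATEST, OptionalLong.of(42L), 0L, 500L)); // prints 42
  }
}
```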
metricsRefreshInterval - Long
consumerTimeout - Long
holdingBufferSize - Long
consumerProps - Properties
windowDataManager - WindowDataManager
: If set to FSWindowDataManager, the operator processes the same set of messages in a window before and after a failure. This provides idempotency, but at a higher cost, because at the end of each window the operator needs to persist some state for that window. Default value = WindowDataManager.NoopWindowDataManager.
void emitTuple(String cluster, ConsumerRecord<byte[], byte[]> message)
: Abstract method that emits tuples extracted from Kafka message.
KafkaSinglePortInputOperator extends AbstractKafkaInputOperator and defines the getTuple() method, which extracts a byte array from each Kafka message.
outputPort <byte[]>
: Tuples extracted from Kafka messages are emitted through this port.
This section builds an Apex application using the Kafka input operator. Below is the code snippet:
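```java
import org.apache.hadoop.conf.Configuration;

import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;

import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.lib.io.ConsoleOutputOperator;

@ApplicationAnnotation(name = "KafkaApp")
public class ExampleKafkaApplication implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration entries)
  {
    // Reads byte[] messages from Kafka; topics and clusters come from the configuration
    KafkaSinglePortInputOperator input = dag.addOperator("MessageReader", new KafkaSinglePortInputOperator());
    // Prints each tuple to stdout
    ConsoleOutputOperator output = dag.addOperator("Output", new ConsoleOutputOperator());
    dag.addStream("MessageData", input.outputPort, output.input);
  }
}
```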
Below is the configuration for topic “test” and broker “localhost:9092”:
```xml
<property>
  <name>apex.operator.MessageReader.prop.topics</name>
  <value>test</value>
</property>
<property>
  <name>apex.operator.MessageReader.prop.clusters</name>
  <value>localhost:9092</value>
</property>
```
Multiple topics can be specified as a comma-separated list; similarly, multiple clusters can be specified as a semicolon-separated list; for example:
```xml
<property>
  <name>apex.operator.MessageReader.prop.topics</name>
  <value>test1,test2</value>
</property>
<property>
  <name>apex.operator.MessageReader.prop.clusters</name>
  <value>localhost:9092;localhost:9093;localhost:9094</value>
</property>
```
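The two separators can be illustrated with a small parsing sketch: a comma between topics and a semicolon between clusters. The helper below is hypothetical and is not the operator's setter; it trims whitespace around entries, whereas the configuration values in this section avoid relying on trimming.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of splitting the topics (',') and clusters (';') values.
class TopicClusterParsingSketch
{
  // Splits on the given regex, trims each entry and drops empty entries.
  static List<String> split(String value, String separatorRegex)
  {
    List<String> parts = new ArrayList<>();
    for (String part : value.split(separatorRegex)) {
      String trimmed = part.trim();
      if (!trimmed.isEmpty()) {
        parts.add(trimmed);
      }
    }
    return parts;
  }

  public static void main(String[] args)
  {
    System.out.println(split("test1,test2", ",")); // prints [test1, test2]
    System.out.println(split("localhost:9092;localhost:9093;localhost:9094", ";").size()); // prints 3
  }
}
```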
A full example application project can be found here.
Starting with version 0.9.x, Kafka supports authentication, encryption and authorization.
See here for more information.
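As an example of the client side of encryption, the sketch below collects standard Kafka (0.9+) consumer SSL settings in a `java.util.Properties` object of the kind accepted by the consumerProps property. The helper name, the truststore path and the password are placeholders, and how the object is handed to the operator (for example through a setter for consumerProps) is an assumption; check your broker's security setup for the settings it actually requires.

```java
import java.util.Properties;

// Hypothetical sketch: consumer security settings intended for the consumerProps property.
class SecureConsumerPropsSketch
{
  static Properties sslConsumerProps(String truststorePath, String truststorePassword)
  {
    Properties props = new Properties();
    props.setProperty("security.protocol", "SSL");                // encrypt traffic to the brokers
    props.setProperty("ssl.truststore.location", truststorePath);  // certificates used to verify brokers
    props.setProperty("ssl.truststore.password", truststorePassword);
    return props;
  }

  public static void main(String[] args)
  {
    // Placeholder path and password, for illustration only.
    Properties props = sslConsumerProps("/path/to/client.truststore.jks", "changeit");
    System.out.println(props.getProperty("security.protocol")); // prints SSL
  }
}
```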