KAFKA INPUT OPERATOR

Introduction: About Kafka Input Operator

This is an input operator that consumes data from the Kafka messaging system for further processing in Apex. The Kafka input operator is a fault-tolerant and scalable Malhar operator.

Why is it needed ?

Kafka is a pull-based, distributed publish-subscribe messaging system in which topics are partitioned and replicated across nodes. The Kafka input operator is needed when you want to read data from multiple partitions of a Kafka topic in parallel in an Apex application.

AbstractKafkaInputOperator

This is the abstract implementation that serves as the base class for consuming messages from the Kafka messaging system. This class doesn’t have any ports.

[Figure: AbstractKafkaInput class diagram (AbstractKafkaInput.png)]

Configuration Parameters

Abstract Methods

void emitTuple(Message message): Abstract method that emits a tuple extracted from a Kafka message.

KafkaConsumer

This is an abstract implementation of a Kafka consumer. It sends fetch requests to the leader brokers of the Kafka partitions. For each request, it receives a set of messages and stores them in a buffer, which is an ArrayBlockingQueue. SimpleKafkaConsumer extends KafkaConsumer and provides the functionality of the Simple Consumer API, while HighLevelKafkaConsumer extends KafkaConsumer and provides the functionality of the High Level Consumer API.

Pre-requisites

This operator uses the Kafka Consumer API of version 0.8.1.1, so it works with the 0.8.x and 0.7.x versions of Apache Kafka.

Configuration Parameters

Abstract Methods

  1. void commitOffset(): Commit the offsets at checkpoint.
  2. Map<KafkaPartition, Long> getCurrentOffsets(): Return the current offset status.
  3. void resetPartitionsAndOffset(Set<KafkaPartition> partitionIds, Map<KafkaPartition, Long> startOffset): Reset the partitions with partitionIds and the offsets with startOffset.

Configuration Parameters for SimpleKafkaConsumer

OffsetManager

This is an interface for offset management and is useful when consuming data from specified offsets. It updates the offsets for all the Kafka partitions periodically. Below is the code snippet:

public interface OffsetManager
{
  public Map<KafkaPartition, Long> loadInitialOffsets();
  public void updateOffsets(Map<KafkaPartition, Long> offsetsOfPartitions);
}

Abstract Methods                 

Map<KafkaPartition, Long> loadInitialOffsets(): Specifies the initial offsets for consuming messages; called at the activation stage.

void updateOffsets(Map<KafkaPartition, Long> offsetsOfPartitions): This method is called at every repartitionCheckInterval to update the offsets.
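As an illustration, a minimal in-memory implementation of this interface might look like the sketch below. KafkaPartition is stubbed here as a simple value class so the example is self-contained; in a real application you would use the KafkaPartition class shipped with Malhar, and a production implementation would persist the offsets (for example to HDFS) so a restarted application can resume from where it left off.

```java
import java.util.HashMap;
import java.util.Map;

// Stub standing in for Malhar's KafkaPartition class (illustration only).
class KafkaPartition {
  final String topic;
  final int partitionId;
  KafkaPartition(String topic, int partitionId) { this.topic = topic; this.partitionId = partitionId; }
  @Override public boolean equals(Object o) {
    if (!(o instanceof KafkaPartition)) return false;
    KafkaPartition k = (KafkaPartition) o;
    return partitionId == k.partitionId && topic.equals(k.topic);
  }
  @Override public int hashCode() { return topic.hashCode() * 31 + partitionId; }
}

// The interface as shown in the snippet above.
interface OffsetManager {
  Map<KafkaPartition, Long> loadInitialOffsets();
  void updateOffsets(Map<KafkaPartition, Long> offsetsOfPartitions);
}

// Keeps offsets in memory only; real implementations should persist them.
class InMemoryOffsetManager implements OffsetManager {
  private final Map<KafkaPartition, Long> offsets = new HashMap<>();

  @Override public Map<KafkaPartition, Long> loadInitialOffsets() {
    return new HashMap<>(offsets);  // called once at the activation stage
  }

  @Override public void updateOffsets(Map<KafkaPartition, Long> offsetsOfPartitions) {
    offsets.putAll(offsetsOfPartitions);  // called every repartitionCheckInterval
  }
}
```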

Partitioning

The logical instance of the KafkaInputOperator acts as the Partitioner as well as a StatsListener. This is because the AbstractKafkaInputOperator implements both the com.datatorrent.api.Partitioner and com.datatorrent.api.StatsListener interfaces and provides an implementation of definePartitions(...) and processStats(...) which makes it auto-scalable.

Response processStats(BatchedOperatorStats stats)

The application master invokes this method on the logical instance with the stats (tuplesProcessedPS, bytesPS, etc.) of each partition. Re-partitioning happens when new Kafka partitions are added for the topic, or when bytesPS or msgPS crosses its respective upper bound.

DefinePartitions

Based on the repartitionRequired field of the Response object returned by the processStats(...) method, the application master invokes definePartitions(...) on the logical instance, which is also the partitioner instance. Dynamic partitioning can be disabled by setting the repartitionInterval parameter to a negative value.
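For example, dynamic partitioning could be disabled with a property like the following, assuming the operator is named MessageReader as in the application example below and that repartitionInterval is exposed as an operator property following the dt.operator.<name>.prop.<property> convention used in this document:

```xml
<property>
  <name>dt.operator.MessageReader.prop.repartitionInterval</name>
  <value>-1</value>
</property>
```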

AbstractSinglePortKafkaInputOperator

This class extends AbstractKafkaInputOperator and has a single output port through which it emits messages.

Ports

outputPort <T>: Tuples extracted from Kafka messages are emitted through this port.

Abstract Methods

T getTuple(Message msg): Converts a Kafka message to a tuple.

Concrete Classes

  1. KafkaSinglePortStringInputOperator: This class extends AbstractSinglePortKafkaInputOperator, and its getTuple() method extracts a string from the Kafka message.

  2. KafkaSinglePortByteArrayInputOperator: This class extends AbstractSinglePortKafkaInputOperator, and its getTuple() method extracts a byte array from the Kafka message.
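The conversions these two concrete classes perform can be sketched as below. Message is stubbed here as a simple payload holder so the example is self-contained (the real kafka.message.Message exposes its payload as a ByteBuffer), and UTF-8 decoding is an assumption of this sketch, not necessarily the charset the shipped operator uses:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Minimal stand-in for kafka.message.Message (illustration only).
class Message {
  private final byte[] bytes;
  Message(byte[] bytes) { this.bytes = bytes; }
  ByteBuffer payload() { return ByteBuffer.wrap(bytes); }
}

class TupleConverters {
  // Sketch of a byte-array getTuple(): copy the payload bytes out of the buffer.
  static byte[] getByteArrayTuple(Message msg) {
    ByteBuffer buf = msg.payload();
    byte[] out = new byte[buf.remaining()];
    buf.get(out);
    return out;
  }

  // Sketch of a string getTuple(): decode the payload bytes as UTF-8.
  static String getStringTuple(Message msg) {
    return new String(getByteArrayTuple(msg), StandardCharsets.UTF_8);
  }
}
```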

Application Example

This section builds an Apex application using Kafka input operator. Below is the code snippet:

@ApplicationAnnotation(name = "KafkaApp")
public class ExampleKafkaApplication implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration entries)
  {
    KafkaSinglePortByteArrayInputOperator input = dag.addOperator("MessageReader", new KafkaSinglePortByteArrayInputOperator());

    ConsoleOutputOperator output = dag.addOperator("Output", new ConsoleOutputOperator());

    dag.addStream("MessageData", input.outputPort, output.input);
  }
}

Below is the configuration for the Kafka topic name “test”, with “localhost:2181” as the ZooKeeper quorum:

<property>
  <name>dt.operator.MessageReader.prop.topic</name>
  <value>test</value>
</property>

<property>
  <name>dt.operator.MessageReader.prop.zookeeper</name>
  <value>localhost:2181</value>
</property>