Kafka examples

Kafka to HDFS example:

This sample application shows how to read lines from a Kafka topic using the new (0.9) Kafka input operator and write them out to HDFS using rolling files with a bounded size.

The output files start out with a .tmp extension and get renamed when they reach the size bound. Additional operators to perform parsing, aggregation or filtering can be inserted into this pipeline as needed.
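For orientation, below is a minimal sketch of how such a DAG might be wired; KafkaSinglePortInputOperator comes from the Malhar kafka (0.9) module, while LineOutputOperator stands in for a simple AbstractFileOutputOperator subclass (as in this example's src/) that writes one line per tuple. The names and values here are illustrative, not the example's exact code:

import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;

@ApplicationAnnotation(name = "Kafka2HDFS")
public class KafkaApp implements StreamingApplication
{
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    // reads byte[] messages from the configured Kafka topic (0.9 consumer API)
    KafkaSinglePortInputOperator in = dag.addOperator("kafkaIn", new KafkaSinglePortInputOperator());
    // writes lines to rolling .tmp files that are renamed when the size bound is reached
    LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator());
    out.setMaxLength(1_000_000);  // size bound in bytes (value illustrative)
    dag.addStream("lines", in.outputPort, out.input);
  }
}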

HDFS to Kafka example:

This sample application shows how to read lines from files in HDFS and write them out to a Kafka topic. Each line of the input file is treated as a separate message. The topic name, the directory monitored for input files, and other parameters are configurable in META-INF/properties-hdfs2kafka.xml.
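Such a configuration follows Apex's dt.operator.&lt;name&gt;.prop.&lt;property&gt; convention; the operator and property names in this excerpt are illustrative assumptions, not copied from the example:

<configuration>
  <!-- directory monitored for input files (operator/property names assumed) -->
  <property>
    <name>dt.operator.fileInput.prop.directory</name>
    <value>/tmp/hdfs2kafka/input</value>
  </property>
  <!-- Kafka topic the lines are written to -->
  <property>
    <name>dt.operator.kafkaOutput.prop.topic</name>
    <value>hdfs2kafka</value>
  </property>
</configuration>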

Kafka exactly-once output example (Kafka 0.9 API)

This application verifies exactly-once semantics by writing a defined sequence of input data to two Kafka output operators -- one that guarantees those semantics and one that does not, each writing to a different topic. It deliberately causes the intermediate pass-through operator to fail, so that the operator is restarted and some tuples are reprocessed. A KafkaInputOperator then reads tuples from both topics to verify that the former topic has no duplicates while the latter does, and writes a single line with the verification results to an HDFS file, in the following form:

Duplicates: exactly-once: 0, at-least-once: 5

NOTE: KafkaInputOperator guarantees at-least-once semantics; in most scenarios it also yields exactly-once results, though in rare corner cases duplicate processing may occur. When this happens, the validation in this example will output wrong results.

DAG of this application:

(logical DAG of the application; see logicalDAGKafkaExactlyOnce.png)

Plain text representation of DAG:

sequenceGenerator --> passthrough ==> {kafkaExactlyOnceOutputOperator, kafkaOutputOperator(at-least-once)}

{kafkaTopicExactly, kafkaTopicAtLeast} --> validationToFile
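As a rough sketch (not the example's actual code), the fan-out from the pass-through operator to the two Kafka sinks might be wired as follows inside populateDAG; the two output operator classes come from the Malhar kafka (0.9) module, while sequenceGenerator and passthrough are the example's own operators from src/:

import org.apache.apex.malhar.kafka.KafkaSinglePortExactlyOnceOutputOperator;
import org.apache.apex.malhar.kafka.KafkaSinglePortOutputOperator;

// inside populateDAG(DAG dag, Configuration conf), after adding
// sequenceGenerator and passthrough (the example's own operators):
KafkaSinglePortExactlyOnceOutputOperator<String> exactly =
    dag.addOperator("kafkaExactlyOnceOutputOperator", new KafkaSinglePortExactlyOnceOutputOperator<String>());
exactly.setTopic("exactly-once");

KafkaSinglePortOutputOperator<String, String> atLeast =
    dag.addOperator("kafkaOutputOperator", new KafkaSinglePortOutputOperator<String, String>());
atLeast.setTopic("at-least-once");

// the "==>" fan-out above: a single stream feeds both Kafka sinks
dag.addStream("toKafka", passthrough.output, exactly.inputPort, atLeast.inputPort);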

Running the Application

Run Test

The application can be run in local mode, which writes the validation file to target/validation.txt.
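A minimal sketch of such a local-mode run from a JUnit test, using Apex's LocalMode; the application class name and run duration are assumptions:

import org.apache.hadoop.conf.Configuration;
import org.junit.Test;
import com.datatorrent.api.LocalMode;

public class ApplicationTest
{
  @Test
  public void testApplication() throws Exception
  {
    LocalMode lma = LocalMode.newInstance();
    Configuration conf = new Configuration(false);
    conf.addResource("META-INF/properties.xml");  // pick up the example's settings from the classpath
    lma.prepareDAG(new KafkaApp(), conf);         // KafkaApp: the application class (name assumed)
    LocalMode.Controller lc = lma.getController();
    lc.run(30 * 1000);                            // run for ~30s, then inspect target/validation.txt
  }
}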

Run on Cluster

To run the application on a cluster, a running Kafka service is needed. A local single-node Kafka instance can easily be deployed (see kafka.apache.org/quickstart).
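For example, with the quickstart distribution, ZooKeeper and a single broker can be started with (paths relative to the Kafka installation directory):

bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &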

By default, Kafka creates topics automatically when a message for a non-existent topic arrives. If auto-creation is disabled, the two topics must be created manually:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic exactly-once
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic at-least-once

The Kafka topics should be cleared/deleted after every run in order for the validation to work correctly.

Enable topic deletion in Kafka's server.properties file:

delete.topic.enable=true

Delete topics:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic exactly-once
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic at-least-once

Check if deletion was successful:

bin/kafka-topics.sh --list --zookeeper localhost:2181

Properties

By default, the Kafka broker address is set to 'localhost:9092'. To use a different broker, change the value in properties.xml as well as in Application.java. The directory for the validation file and the number of tuples to be generated can also be changed in properties.xml.
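An illustrative excerpt of such settings in properties.xml; the property names below are assumptions patterned on Apex's dt.operator.&lt;name&gt;.prop.&lt;property&gt; convention, with operator names taken from the DAG description above:

<property>
  <name>dt.operator.validationToFile.prop.filePath</name>  <!-- directory for the validation file (name assumed) -->
  <value>/tmp/kafka-exactly-once</value>
</property>
<property>
  <name>dt.operator.sequenceGenerator.prop.maxTuples</name>  <!-- number of tuples to generate (name assumed) -->
  <value>500</value>
</property>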