The Kafka Spout enables a Heron topology to consume data from a Kafka cluster as input into the stream processing pipeline. It is written primarily against two APIs: the Heron API and the Kafka Client API.
## Configuring the underlying Kafka Consumer
Each Kafka Spout instance creates its underlying consumer through a factory interface, `KafkaConsumerFactory`, which is passed in as one of the constructor arguments. The simplest approach is to use the provided `DefaultKafkaConsumerFactory`. It takes a `Map<String, Object>` as its only input, which should contain all the user-configured properties as defined by `ConsumerConfig`.

Note: `enable.auto.commit` is always set to `false` in `DefaultKafkaConsumerFactory`, because the Kafka Spout needs to manage offset commits manually. Any custom implementation of `KafkaConsumerFactory` should follow the same rule.
```java
Map<String, Object> kafkaConsumerConfig = new HashMap<>();
// connect to the Kafka broker at localhost:9092
kafkaConsumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// group ID of the consumer group
kafkaConsumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-kafka-spout");
// key and value deserializers
kafkaConsumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
kafkaConsumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumerFactory<String, String> kafkaConsumerFactory = new DefaultKafkaConsumerFactory<>(kafkaConsumerConfig);
```
## Subscribe to topic(s)
A Kafka Spout instance can be configured to subscribe to topics in one of two ways: pass a `Collection<String>` containing the names of specific topics, or pass an implementation of `TopicPatternProvider` that supplies a regular expression matching all the topics to subscribe to. A `DefaultTopicPatternProvider` is provided, which compiles a regex string into a pattern.
```java
// subscribe to a specific named topic
new KafkaSpout<>(kafkaConsumerFactory, Collections.singletonList("test-topic"));

// subscribe to all topics matching a pattern
new KafkaSpout<>(kafkaConsumerFactory, new DefaultTopicPatternProvider("test-.*"));
```
## Convert ConsumerRecord to Tuple
The spout delegates the conversion of each Kafka `ConsumerRecord` into an output tuple to a `ConsumerRecordTransformer`. The provided `DefaultConsumerRecordTransformer` simply converts the incoming record into a tuple with two fields, "key" (the key of the record) and "value" (the value of the record), and declares the output stream to be the "default" stream.

Users can create their own implementation of the `ConsumerRecordTransformer` interface and set it on the `KafkaSpout` via the `setConsumerRecordTransformer` method.
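As an illustration, a custom transformer might upper-case every record value before it enters the topology. The following is a minimal sketch: the exact methods are dictated by the `ConsumerRecordTransformer` interface, and the method names used here (`getOutputStreams`, `getFieldNames`, `transform`) are assumptions about its shape inferred from the responsibilities described above, not confirmed signatures.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;

// Hypothetical sketch: the method names below are assumptions about the
// ConsumerRecordTransformer interface, inferred from what the default
// transformer does (declare streams, name fields, build the tuple values).
public class UpperCaseRecordTransformer implements ConsumerRecordTransformer<String, String> {

  @Override
  public List<String> getOutputStreams() {
    // emit on the "default" stream, like the default transformer
    return Collections.singletonList("default");
  }

  @Override
  public List<String> getFieldNames(String streamName) {
    return Arrays.asList("key", "value");
  }

  @Override
  public List<Object> transform(ConsumerRecord<String, String> record) {
    // upper-case the record value before it enters the topology
    return Arrays.asList(record.key(), record.value().toUpperCase());
  }
}
```

It would then be attached with `kafkaSpout.setConsumerRecordTransformer(new UpperCaseRecordTransformer());` before the topology is built.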
## Behavior in Different Topology Reliability Modes

### ATMOST_ONCE mode
The topology as a whole does not turn the acking mechanism on, so the Kafka Spout can afford to emit tuples without any message ID, and it immediately commits the currently-read offset back to the Kafka broker; neither the `ack()` nor the `fail()` callback will ever be invoked. Consequently, any "in-flight" tuples are simply lost if the Kafka Spout instance crashes or the topology is restarted. That is exactly what ATMOST_ONCE offers.
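The reliability mode is selected at the topology level, not on the spout itself. A minimal sketch of opting into a mode, assuming the Heron API's `Config.setTopologyReliabilityMode` setter and `TopologyReliabilityMode` enum:

```java
import org.apache.heron.api.Config;

Config config = new Config();
// no acking anywhere in the topology; in-flight tuples may be lost on failure
config.setTopologyReliabilityMode(Config.TopologyReliabilityMode.ATMOST_ONCE);
// ...then pass this config along when submitting the topology
```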
### ATLEAST_ONCE mode
The acking mechanism is turned on topology-wide, so the Kafka Spout uses an ack registry to keep track of the continuous acknowledgement ranges for each partition, while a failure registry keeps track of the lowest failed offset for each partition. When the Kafka consumer needs to poll the Kafka cluster for more records (because everything from the previous poll has been emitted), the Kafka Spout reconciles the two registries for each partition it is consuming:

1. If the failure registry records a failed tuple for the partition, the consumer is rewound to the lowest failed offset, so that the failed tuple and everything after it are replayed.
2. Otherwise, the highest continuously acknowledged offset from the ack registry is committed back to the Kafka broker.
3. It then polls the Kafka cluster for the next batch of records (i.e. starting from the lowest failed tuple, if any).

This guarantees that every tuple emitted by the Kafka Spout is successfully processed across the whole topology at least once.
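For intuition, the following is a minimal sketch of that per-partition bookkeeping. It is not the spout's actual implementation; the class and every name in it are hypothetical, and it only models the reconciliation steps described above.

```java
import java.util.TreeSet;

// Hypothetical model of one partition's offset bookkeeping under ATLEAST_ONCE.
class PartitionAckTracker {
  private final TreeSet<Long> ackedOffsets = new TreeSet<>(); // acks, possibly out of order
  private long committedOffset;    // every offset below this is fully acknowledged
  private Long lowestFailedOffset; // null when there is no outstanding failure

  PartitionAckTracker(long startingOffset) {
    this.committedOffset = startingOffset;
  }

  void ack(long offset) {
    ackedOffsets.add(offset);
  }

  void fail(long offset) {
    if (lowestFailedOffset == null || offset < lowestFailedOffset) {
      lowestFailedOffset = offset; // remember only the lowest failure
    }
  }

  // Called before each poll: returns the offset the consumer should resume from.
  long reconcile() {
    // extend the continuous acknowledgement range as far as possible;
    // committedOffset can then be committed back to the Kafka broker
    while (ackedOffsets.remove(committedOffset)) {
      committedOffset++;
    }
    if (lowestFailedOffset != null) {
      long resumeFrom = lowestFailedOffset;
      lowestFailedOffset = null;
      // acks beyond the failure point are discarded; those records get replayed,
      // which is why a tuple can be processed more than once (at-least-once)
      ackedOffsets.clear();
      return resumeFrom;
    }
    return committedOffset; // no failures: continue from the committed position
  }
}
```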
### EFFECTIVELY_ONCE mode
Not implemented yet.