blob: 8f31b19626783825f3818c53f3383777d14c56f8 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package kafka.examples;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.FencedInstanceIdException;
import org.apache.kafka.common.errors.ProducerFencedException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;
* A demo class for how to write a customized EOS app. It takes a consume-process-produce loop.
* Important configurations and APIs are commented.
public class ExactlyOnceMessageProcessor extends Thread {
private static final boolean READ_COMMITTED = true;
private final String inputTopic;
private final String outputTopic;
private final String transactionalId;
private final String groupInstanceId;
private final KafkaProducer<Integer, String> producer;
private final KafkaConsumer<Integer, String> consumer;
private final CountDownLatch latch;
public ExactlyOnceMessageProcessor(final String inputTopic,
final String outputTopic,
final int instanceIdx,
final CountDownLatch latch) {
this.inputTopic = inputTopic;
this.outputTopic = outputTopic;
this.transactionalId = "Processor-" + instanceIdx;
// It is recommended to have a relatively short txn timeout in order to clear pending offsets faster.
final int transactionTimeoutMs = 10000;
// A unique must be provided in order to properly use EOS.
producer = new Producer(outputTopic, true, transactionalId, true, -1, transactionTimeoutMs, null).get();
// Consumer must be in read_committed mode, which means it won't be able to read uncommitted data.
// Consumer could optionally configure groupInstanceId to avoid unnecessary rebalances.
this.groupInstanceId = "Txn-consumer-" + instanceIdx;
consumer = new Consumer(inputTopic, "Eos-consumer",
Optional.of(groupInstanceId), READ_COMMITTED, -1, null).get();
this.latch = latch;
public void run() {
// Init transactions call should always happen first in order to clear zombie transactions from previous generation.
final AtomicLong messageRemaining = new AtomicLong(Long.MAX_VALUE);
consumer.subscribe(Collections.singleton(inputTopic), new ConsumerRebalanceListener() {
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
printWithTxnId("Revoked partition assignment to kick-off rebalancing: " + partitions);
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
printWithTxnId("Received partition assignment after rebalancing: " + partitions);
int messageProcessed = 0;
while (messageRemaining.get() > 0) {
try {
ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofMillis(200));
if (records.count() > 0) {
// Begin a new transaction session.
for (ConsumerRecord<Integer, String> record : records) {
// Process the record and send to downstream.
ProducerRecord<Integer, String> customizedRecord = transform(record);
Map<TopicPartition, OffsetAndMetadata> offsets = consumerOffsets();
// Checkpoint the progress by sending offsets to group coordinator broker.
// Note that this API is only available for broker >= 2.5.
producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
// Finish the transaction. All sent records should be visible for consumption now.
messageProcessed += records.count();
} catch (ProducerFencedException e) {
throw new KafkaException(String.format("The %s has been claimed by another process", transactionalId));
} catch (FencedInstanceIdException e) {
throw new KafkaException(String.format("The %s has been claimed by another process", groupInstanceId));
} catch (KafkaException e) {
// If we have not been fenced, try to abort the transaction and continue. This will raise immediately
// if the producer has hit a fatal error.
// The consumer fetch position needs to be restored to the committed offset
// before the transaction started.
printWithTxnId("Message remaining: " + messageRemaining);
printWithTxnId("Finished processing " + messageProcessed + " records");
private Map<TopicPartition, OffsetAndMetadata> consumerOffsets() {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
for (TopicPartition topicPartition : consumer.assignment()) {
offsets.put(topicPartition, new OffsetAndMetadata(consumer.position(topicPartition), null));
return offsets;
private void printWithTxnId(final String message) {
System.out.println(transactionalId + ": " + message);
private ProducerRecord<Integer, String> transform(final ConsumerRecord<Integer, String> record) {
printWithTxnId("Transformed record (" + record.key() + "," + record.value() + ")");
return new ProducerRecord<>(outputTopic, record.key() / 2, "Transformed_" + record.value());
private long messagesRemaining(final KafkaConsumer<Integer, String> consumer) {
final Map<TopicPartition, Long> fullEndOffsets = consumer.endOffsets(new ArrayList<>(consumer.assignment()));
// If we couldn't detect any end offset, that means we are still not able to fetch offsets.
if (fullEndOffsets.isEmpty()) {
return Long.MAX_VALUE;
return consumer.assignment().stream().mapToLong(partition -> {
long currentPosition = consumer.position(partition);
printWithTxnId("Processing partition " + partition + " with full offsets " + fullEndOffsets);
if (fullEndOffsets.containsKey(partition)) {
return fullEndOffsets.get(partition) - currentPosition;
return 0;
private static void resetToLastCommittedPositions(KafkaConsumer<Integer, String> consumer) {
final Map<TopicPartition, OffsetAndMetadata> committed = consumer.committed(consumer.assignment());
consumer.assignment().forEach(tp -> {
OffsetAndMetadata offsetAndMetadata = committed.get(tp);
if (offsetAndMetadata != null), offsetAndMetadata.offset());