[FLINK-24131][connectors/kafka] Improve handling of committer errors in KafkaCommitter.
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
index 8b27980..b511126 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
@@ -18,16 +18,18 @@
package org.apache.flink.connector.kafka.sink;
import org.apache.flink.api.connector.sink.Committer;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.InvalidTxnStateException;
import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@@ -39,9 +41,12 @@
*
* <p>The committer is responsible to finalize the Kafka transactions by committing them.
*/
-class KafkaCommitter implements Committer<KafkaCommittable> {
+class KafkaCommitter implements Committer<KafkaCommittable>, Closeable {
private static final Logger LOG = LoggerFactory.getLogger(KafkaCommitter.class);
+ public static final String UNKNOWN_PRODUCER_ID_ERROR_MESSAGE =
+ "because of a bug in the Kafka broker (KAFKA-9310). Please upgrade to Kafka 2.5+. If you are running with concurrent checkpoints, you also may want to try without them.\n"
+ + "To avoid data loss, the application will restart.";
private final Properties kafkaProducerConfig;
@@ -66,17 +71,16 @@
.<FlinkKafkaInternalProducer<?, ?>>map(Recyclable::getObject)
.orElseGet(() -> getRecoveryProducer(committable));
producer.commitTransaction();
- recyclable.ifPresent(Recyclable::close);
- } catch (InvalidTxnStateException e) {
+ producer.flush();
+ } catch (RetriableException e) {
LOG.warn(
- "Unable to commit recovered transaction ({}) because it's in an invalid state. "
- + "Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.",
- committable,
- e);
- recyclable.ifPresent(Recyclable::close);
+ "Encountered retriable exception while committing {}.", transactionalId, e);
+ retryableCommittables.add(committable);
+ continue;
} catch (ProducerFencedException e) {
- LOG.warn(
- "Unable to commit recovered transaction ({}) because its producer is already fenced."
+ // initTransactions() has since been called with this transactional id, which fenced this producer
+ LOG.error(
+ "Unable to commit transaction ({}) because its producer is already fenced."
+ " This means that you either have a different producer with the same '{}' (this is"
+ " unlikely with the '{}' as all generated ids are unique and shouldn't be reused)"
+ " or recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss,"
@@ -85,19 +89,34 @@
ProducerConfig.TRANSACTIONAL_ID_CONFIG,
KafkaSink.class.getSimpleName(),
ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
- FlinkKafkaProducer.getTransactionTimeout(kafkaProducerConfig),
+ kafkaProducerConfig.getProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG),
e);
- recyclable.ifPresent(Recyclable::close);
- } catch (Throwable e) {
- LOG.warn("Cannot commit Kafka transaction, retrying.", e);
- retryableCommittables.add(committable);
+ } catch (InvalidTxnStateException e) {
+ // This exception only occurs when aborting after a commit or vice versa.
+ // It does not appear on double commits or double aborts.
+ LOG.error(
+ "Unable to commit transaction ({}) because it's in an invalid state. "
+ + "Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.",
+ committable,
+ e);
+ } catch (UnknownProducerIdException e) {
+ LOG.error(
+ "Unable to commit transaction ({}) " + UNKNOWN_PRODUCER_ID_ERROR_MESSAGE,
+ committable,
+ e);
+ } catch (Exception e) {
+ LOG.error(
+ "Transaction ({}) encountered error and data has been potentially lost.",
+ committable,
+ e);
}
+ recyclable.ifPresent(Recyclable::close);
}
return retryableCommittables;
}
@Override
- public void close() throws Exception {
+ public void close() {
if (recoveryProducer != null) {
recoveryProducer.close();
}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/Recyclable.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/Recyclable.java
index 163592a..012fa99 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/Recyclable.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/Recyclable.java
@@ -33,10 +33,14 @@
}
public T getObject() {
- checkState(object != null, "Already recycled");
+ checkState(!isRecycled(), "Already recycled");
return object;
}
+ boolean isRecycled() {
+ return object == null;
+ }
+
@Override
public void close() {
recycler.accept(object);
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java
index 62f0cec..6ce4f24 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java
@@ -19,6 +19,9 @@
import org.apache.flink.util.TestLogger;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Test;
import java.io.IOException;
@@ -26,17 +29,62 @@
import java.util.List;
import java.util.Properties;
-import static org.junit.Assert.assertEquals;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
/** Tests for {@link KafkaCommitter}. */
public class KafkaCommitterTest extends TestLogger {
+ private static final int PRODUCER_ID = 0;
+ private static final short EPOCH = 0;
+ private static final String TRANSACTIONAL_ID = "transactionalId";
+
+ /** Causes a network error via an inactive broker and tests that the committable is retried. */
@Test
- public void testRetryCommittableOnFailure() throws IOException {
- final KafkaCommitter committer = new KafkaCommitter(new Properties());
- final short epoch = 0;
- final List<KafkaCommittable> committables =
- Collections.singletonList(new KafkaCommittable(0, epoch, "transactionalId", null));
- assertEquals(committables, committer.commit(committables));
+ public void testRetryCommittableOnRetriableError() throws IOException {
+ Properties properties = getProperties();
+ try (final KafkaCommitter committer = new KafkaCommitter(properties);
+ FlinkKafkaInternalProducer<Object, Object> producer =
+ new FlinkKafkaInternalProducer<>(properties, TRANSACTIONAL_ID);
+ Recyclable<FlinkKafkaInternalProducer<Object, Object>> recyclable =
+ new Recyclable<>(producer, p -> {})) {
+ final List<KafkaCommittable> committables =
+ Collections.singletonList(
+ new KafkaCommittable(PRODUCER_ID, EPOCH, TRANSACTIONAL_ID, recyclable));
+ producer.resumeTransaction(PRODUCER_ID, EPOCH);
+ List<KafkaCommittable> recovered = committer.commit(committables);
+ assertThat(recovered, contains(committables.toArray()));
+ assertThat(recyclable.isRecycled(), equalTo(false));
+ }
+ }
+
+ /** Tests that a fatal commit error is not retried and that the producer is recycled. */
+ @Test
+ public void testRetryCommittableOnFatalError() throws IOException {
+ Properties properties = getProperties();
+ try (final KafkaCommitter committer = new KafkaCommitter(properties);
+ FlinkKafkaInternalProducer<Object, Object> producer =
+ new FlinkKafkaInternalProducer<>(properties, TRANSACTIONAL_ID);
+ Recyclable<FlinkKafkaInternalProducer<Object, Object>> recyclable =
+ new Recyclable<>(producer, p -> {})) {
+ final List<KafkaCommittable> committables =
+ Collections.singletonList(
+ new KafkaCommittable(PRODUCER_ID, EPOCH, TRANSACTIONAL_ID, recyclable));
+ // fails fatally because the transaction was never resumed, so no retry is expected
+ List<KafkaCommittable> recovered = committer.commit(committables);
+ assertThat(recovered, empty());
+ assertThat(recyclable.isRecycled(), equalTo(true));
+ }
+ }
+
+ Properties getProperties() {
+ Properties properties = new Properties();
+ properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:1");
+ // Low timeout will fail commitTransaction quicker
+ properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "100");
+ properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ return properties;
}
}