[FLINK-24131][connectors/kafka] Improve handling of committer errors in KafkaCommitter.
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
index 8b27980..b511126 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaCommitter.java
@@ -18,16 +18,18 @@
 package org.apache.flink.connector.kafka.sink;
 
 import org.apache.flink.api.connector.sink.Committer;
-import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
 
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.errors.InvalidTxnStateException;
 import org.apache.kafka.common.errors.ProducerFencedException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.UnknownProducerIdException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -39,9 +41,12 @@
  *
  * <p>The committer is responsible to finalize the Kafka transactions by committing them.
  */
-class KafkaCommitter implements Committer<KafkaCommittable> {
+class KafkaCommitter implements Committer<KafkaCommittable>, Closeable {
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaCommitter.class);
+    public static final String UNKNOWN_PRODUCER_ID_ERROR_MESSAGE =
+            "because of a bug in the Kafka broker (KAFKA-9310). Please upgrade to Kafka 2.5+. If you are running with concurrent checkpoints, you also may want to try without them.\n"
+                    + "To avoid data loss, the application will restart.";
 
     private final Properties kafkaProducerConfig;
 
@@ -66,17 +71,16 @@
                                 .<FlinkKafkaInternalProducer<?, ?>>map(Recyclable::getObject)
                                 .orElseGet(() -> getRecoveryProducer(committable));
                 producer.commitTransaction();
-                recyclable.ifPresent(Recyclable::close);
-            } catch (InvalidTxnStateException e) {
+                producer.flush();
+            } catch (RetriableException e) {
                 LOG.warn(
-                        "Unable to commit recovered transaction ({}) because it's in an invalid state. "
-                                + "Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.",
-                        committable,
-                        e);
-                recyclable.ifPresent(Recyclable::close);
+                        "Encountered retriable exception while committing {}.", transactionalId, e);
+                retryableCommittables.add(committable);
+                continue;
             } catch (ProducerFencedException e) {
-                LOG.warn(
-                        "Unable to commit recovered transaction ({}) because its producer is already fenced."
+                // initTransaction has been called on this transaction before
+                LOG.error(
+                        "Unable to commit transaction ({}) because its producer is already fenced."
                                 + " This means that you either have a different producer with the same '{}' (this is"
                                 + " unlikely with the '{}' as all generated ids are unique and shouldn't be reused)"
                                 + " or recovery took longer than '{}' ({}ms). In both cases this most likely signals data loss,"
@@ -85,19 +89,34 @@
                         ProducerConfig.TRANSACTIONAL_ID_CONFIG,
                         KafkaSink.class.getSimpleName(),
                         ProducerConfig.TRANSACTION_TIMEOUT_CONFIG,
-                        FlinkKafkaProducer.getTransactionTimeout(kafkaProducerConfig),
+                        kafkaProducerConfig.getProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG),
                         e);
-                recyclable.ifPresent(Recyclable::close);
-            } catch (Throwable e) {
-                LOG.warn("Cannot commit Kafka transaction, retrying.", e);
-                retryableCommittables.add(committable);
+            } catch (InvalidTxnStateException e) {
+                // This exception only occurs when aborting after a commit or vice versa.
+                // It does not appear on double commits or double aborts.
+                LOG.error(
+                        "Unable to commit transaction ({}) because it's in an invalid state. "
+                                + "Most likely the transaction has been aborted for some reason. Please check the Kafka logs for more details.",
+                        committable,
+                        e);
+            } catch (UnknownProducerIdException e) {
+                LOG.error(
+                        "Unable to commit transaction ({}) " + UNKNOWN_PRODUCER_ID_ERROR_MESSAGE,
+                        committable,
+                        e);
+            } catch (Exception e) {
+                LOG.error(
+                        "Transaction ({}) encountered error and data has been potentially lost.",
+                        committable,
+                        e);
             }
+            recyclable.ifPresent(Recyclable::close);
         }
         return retryableCommittables;
     }
 
     @Override
-    public void close() throws Exception {
+    public void close() {
         if (recoveryProducer != null) {
             recoveryProducer.close();
         }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/Recyclable.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/Recyclable.java
index 163592a..012fa99 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/Recyclable.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/Recyclable.java
@@ -33,10 +33,19 @@
     }
 
+    /**
+     * Returns the wrapped object; the {@code checkState} precondition rejects the call with
+     * "Already recycled" once {@link #isRecycled()} is {@code true}.
+     */
     public T getObject() {
-        checkState(object != null, "Already recycled");
+        checkState(!isRecycled(), "Already recycled");
         return object;
     }
 
+    /** Returns {@code true} if the wrapped {@code object} is {@code null}. */
+    boolean isRecycled() {
+        return object == null;
+    }
+
     @Override
     public void close() {
         recycler.accept(object);
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java
index 62f0cec..6ce4f24 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaCommitterTest.java
@@ -19,6 +19,9 @@
 
 import org.apache.flink.util.TestLogger;
 
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -26,17 +29,71 @@
 import java.util.List;
 import java.util.Properties;
 
-import static org.junit.Assert.assertEquals;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
 
 /** Tests for {@link KafkaCommitter}. */
 public class KafkaCommitterTest extends TestLogger {
 
+    private static final int PRODUCER_ID = 0;
+    private static final short EPOCH = 0;
+    private static final String TRANSACTIONAL_ID = "transactionalId";
+
+    /**
+     * Causes a network error by pointing the producer at an inactive broker and tests that the
+     * committable is returned for a retry while its producer is not recycled.
+     */
     @Test
-    public void testRetryCommittableOnFailure() throws IOException {
-        final KafkaCommitter committer = new KafkaCommitter(new Properties());
-        final short epoch = 0;
-        final List<KafkaCommittable> committables =
-                Collections.singletonList(new KafkaCommittable(0, epoch, "transactionalId", null));
-        assertEquals(committables, committer.commit(committables));
+    public void testRetryCommittableOnRetriableError() throws IOException {
+        Properties properties = getProperties();
+        try (final KafkaCommitter committer = new KafkaCommitter(properties);
+                FlinkKafkaInternalProducer<Object, Object> producer =
+                        new FlinkKafkaInternalProducer<>(properties, TRANSACTIONAL_ID);
+                Recyclable<FlinkKafkaInternalProducer<Object, Object>> recyclable =
+                        new Recyclable<>(producer, p -> {})) {
+            final List<KafkaCommittable> committables =
+                    Collections.singletonList(
+                            new KafkaCommittable(PRODUCER_ID, EPOCH, TRANSACTIONAL_ID, recyclable));
+            producer.resumeTransaction(PRODUCER_ID, EPOCH);
+            List<KafkaCommittable> recovered = committer.commit(committables);
+            assertThat(recovered, contains(committables.toArray()));
+            assertThat(recyclable.isRecycled(), equalTo(false));
+        }
+    }
+
+    /**
+     * Commits a transaction that was never started, which makes the commit fail, and tests
+     * that the committable is not returned for a retry and that its producer is recycled.
+     * Despite the method name, no retry is expected here.
+     */
+    @Test
+    public void testRetryCommittableOnFatalError() throws IOException {
+        Properties properties = getProperties();
+        try (final KafkaCommitter committer = new KafkaCommitter(properties);
+                FlinkKafkaInternalProducer<Object, Object> producer =
+                        new FlinkKafkaInternalProducer<>(properties, TRANSACTIONAL_ID);
+                Recyclable<FlinkKafkaInternalProducer<Object, Object>> recyclable =
+                        new Recyclable<>(producer, p -> {})) {
+            final List<KafkaCommittable> committables =
+                    Collections.singletonList(
+                            new KafkaCommittable(PRODUCER_ID, EPOCH, TRANSACTIONAL_ID, recyclable));
+            // will fail because transaction not started
+            List<KafkaCommittable> recovered = committer.commit(committables);
+            assertThat(recovered, empty());
+            assertThat(recyclable.isRecycled(), equalTo(true));
+        }
+    }
+
+    /** Creates producer properties that target an inactive broker with a short max block time. */
+    Properties getProperties() {
+        Properties properties = new Properties();
+        properties.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "http://localhost:1");
+        // Low timeout will fail commitTransaction quicker
+        properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "100");
+        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        return properties;
     }
 }