KAFKA-15800: Prevent DataExceptions from corrupting KafkaOffsetBackingStore (#14718)

Signed-off-by: Greg Harris <greg.harris@aiven.io>

Reviewers: Yash Mayya <yash.mayya@gmail.com>
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java
index 3a94ab2..1d6632e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java
@@ -89,7 +89,13 @@
         }
         // The topic parameter is irrelevant for the JsonConverter which is the internal converter used by
         // Connect workers.
-        Object deserializedKey = keyConverter.toConnectData("", partitionKey).value();
+        Object deserializedKey;
+        try {
+            deserializedKey = keyConverter.toConnectData("", partitionKey).value();
+        } catch (DataException e) {
+            log.warn("Ignoring offset partition key with unknown serialization. Expected json.", e);
+            return;
+        }
         if (!(deserializedKey instanceof List)) {
             log.warn("Ignoring offset partition key with an unexpected format. Expected type: {}, actual type: {}",
                     List.class.getName(), className(deserializedKey));
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetUtilsTest.java
index f5e6910..06fb51d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetUtilsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetUtilsTest.java
@@ -22,6 +22,7 @@
 import org.apache.kafka.connect.json.JsonConverterConfig;
 import org.junit.Test;
 
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -76,48 +77,50 @@
     }
 
     @Test
+    public void testProcessPartitionKeyWithUnknownSerialization() {
+        assertInvalidPartitionKey(
+                new byte[]{0},
+                "Ignoring offset partition key with unknown serialization");
+        assertInvalidPartitionKey(
+                "i-am-not-json".getBytes(StandardCharsets.UTF_8),
+                "Ignoring offset partition key with unknown serialization");
+    }
+
+    @Test
     public void testProcessPartitionKeyNotList() {
-        try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(OffsetUtils.class)) {
-            Map<String, Set<Map<String, Object>>> connectorPartitions = new HashMap<>();
-            OffsetUtils.processPartitionKey(serializePartitionKey(new HashMap<>()), new byte[0], CONVERTER, connectorPartitions);
-            // Expect no partition to be added to the map since the partition key is of an invalid format
-            assertEquals(0, connectorPartitions.size());
-            assertEquals(1, logCaptureAppender.getMessages().size());
-            assertThat(logCaptureAppender.getMessages().get(0),
-                    containsString("Ignoring offset partition key with an unexpected format"));
-        }
+        assertInvalidPartitionKey(
+                new byte[]{},
+                "Ignoring offset partition key with an unexpected format");
+        assertInvalidPartitionKey(
+                serializePartitionKey(new HashMap<>()),
+                "Ignoring offset partition key with an unexpected format");
     }
 
     @Test
     public void testProcessPartitionKeyListWithOneElement() {
-        try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(OffsetUtils.class)) {
-            Map<String, Set<Map<String, Object>>> connectorPartitions = new HashMap<>();
-            OffsetUtils.processPartitionKey(serializePartitionKey(Collections.singletonList("")), new byte[0], CONVERTER, connectorPartitions);
-            // Expect no partition to be added to the map since the partition key is of an invalid format
-            assertEquals(0, connectorPartitions.size());
-            assertEquals(1, logCaptureAppender.getMessages().size());
-            assertThat(logCaptureAppender.getMessages().get(0),
-                    containsString("Ignoring offset partition key with an unexpected number of elements"));
-        }
+        assertInvalidPartitionKey(
+                serializePartitionKey(Collections.singletonList("")),
+                "Ignoring offset partition key with an unexpected number of elements");
     }
 
     @Test
     public void testProcessPartitionKeyListWithElementsOfWrongType() {
+        assertInvalidPartitionKey(
+                serializePartitionKey(Arrays.asList(1, new HashMap<>())),
+                "Ignoring offset partition key with an unexpected format for the first element in the partition key list");
+        assertInvalidPartitionKey(
+                serializePartitionKey(Arrays.asList("connector-name", new ArrayList<>())),
+                "Ignoring offset partition key with an unexpected format for the second element in the partition key list");
+    }
+
+    public void assertInvalidPartitionKey(byte[] key, String message) {
         try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(OffsetUtils.class)) {
             Map<String, Set<Map<String, Object>>> connectorPartitions = new HashMap<>();
-            OffsetUtils.processPartitionKey(serializePartitionKey(Arrays.asList(1, new HashMap<>())), new byte[0], CONVERTER, connectorPartitions);
+            OffsetUtils.processPartitionKey(key, new byte[0], CONVERTER, connectorPartitions);
             // Expect no partition to be added to the map since the partition key is of an invalid format
             assertEquals(0, connectorPartitions.size());
             assertEquals(1, logCaptureAppender.getMessages().size());
-            assertThat(logCaptureAppender.getMessages().get(0),
-                    containsString("Ignoring offset partition key with an unexpected format for the first element in the partition key list"));
-
-            OffsetUtils.processPartitionKey(serializePartitionKey(Arrays.asList("connector-name", new ArrayList<>())), new byte[0], CONVERTER, connectorPartitions);
-            // Expect no partition to be added to the map since the partition key is of an invalid format
-            assertEquals(0, connectorPartitions.size());
-            assertEquals(2, logCaptureAppender.getMessages().size());
-            assertThat(logCaptureAppender.getMessages().get(1),
-                    containsString("Ignoring offset partition key with an unexpected format for the second element in the partition key list"));
+            assertThat(logCaptureAppender.getMessages().get(0), containsString(message));
         }
     }