KAFKA-15800: Prevent DataExceptions from corrupting KafkaOffsetBackingStore (#14718)
Signed-off-by: Greg Harris <greg.harris@aiven.io>
Reviewers: Yash Mayya <yash.mayya@gmail.com>
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java
index 3a94ab2..1d6632e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/OffsetUtils.java
@@ -89,7 +89,13 @@
}
// The topic parameter is irrelevant for the JsonConverter which is the internal converter used by
// Connect workers.
- Object deserializedKey = keyConverter.toConnectData("", partitionKey).value();
+ Object deserializedKey;
+ try {
+ deserializedKey = keyConverter.toConnectData("", partitionKey).value();
+ } catch (DataException e) {
+ log.warn("Ignoring offset partition key with unknown serialization. Expected json.", e);
+ return;
+ }
if (!(deserializedKey instanceof List)) {
log.warn("Ignoring offset partition key with an unexpected format. Expected type: {}, actual type: {}",
List.class.getName(), className(deserializedKey));
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetUtilsTest.java
index f5e6910..06fb51d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetUtilsTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/OffsetUtilsTest.java
@@ -22,6 +22,7 @@
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.junit.Test;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -76,48 +77,50 @@
}
@Test
+ public void testProcessPartitionKeyWithUnknownSerialization() {
+ assertInvalidPartitionKey(
+ new byte[]{0},
+ "Ignoring offset partition key with unknown serialization");
+ assertInvalidPartitionKey(
+ "i-am-not-json".getBytes(StandardCharsets.UTF_8),
+ "Ignoring offset partition key with unknown serialization");
+ }
+
+ @Test
public void testProcessPartitionKeyNotList() {
- try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(OffsetUtils.class)) {
- Map<String, Set<Map<String, Object>>> connectorPartitions = new HashMap<>();
- OffsetUtils.processPartitionKey(serializePartitionKey(new HashMap<>()), new byte[0], CONVERTER, connectorPartitions);
- // Expect no partition to be added to the map since the partition key is of an invalid format
- assertEquals(0, connectorPartitions.size());
- assertEquals(1, logCaptureAppender.getMessages().size());
- assertThat(logCaptureAppender.getMessages().get(0),
- containsString("Ignoring offset partition key with an unexpected format"));
- }
+ assertInvalidPartitionKey(
+ new byte[]{},
+ "Ignoring offset partition key with an unexpected format");
+ assertInvalidPartitionKey(
+ serializePartitionKey(new HashMap<>()),
+ "Ignoring offset partition key with an unexpected format");
}
@Test
public void testProcessPartitionKeyListWithOneElement() {
- try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(OffsetUtils.class)) {
- Map<String, Set<Map<String, Object>>> connectorPartitions = new HashMap<>();
- OffsetUtils.processPartitionKey(serializePartitionKey(Collections.singletonList("")), new byte[0], CONVERTER, connectorPartitions);
- // Expect no partition to be added to the map since the partition key is of an invalid format
- assertEquals(0, connectorPartitions.size());
- assertEquals(1, logCaptureAppender.getMessages().size());
- assertThat(logCaptureAppender.getMessages().get(0),
- containsString("Ignoring offset partition key with an unexpected number of elements"));
- }
+ assertInvalidPartitionKey(
+ serializePartitionKey(Collections.singletonList("")),
+ "Ignoring offset partition key with an unexpected number of elements");
}
@Test
public void testProcessPartitionKeyListWithElementsOfWrongType() {
+ assertInvalidPartitionKey(
+ serializePartitionKey(Arrays.asList(1, new HashMap<>())),
+ "Ignoring offset partition key with an unexpected format for the first element in the partition key list");
+ assertInvalidPartitionKey(
+ serializePartitionKey(Arrays.asList("connector-name", new ArrayList<>())),
+ "Ignoring offset partition key with an unexpected format for the second element in the partition key list");
+ }
+
+ public void assertInvalidPartitionKey(byte[] key, String message) {
try (LogCaptureAppender logCaptureAppender = LogCaptureAppender.createAndRegister(OffsetUtils.class)) {
Map<String, Set<Map<String, Object>>> connectorPartitions = new HashMap<>();
- OffsetUtils.processPartitionKey(serializePartitionKey(Arrays.asList(1, new HashMap<>())), new byte[0], CONVERTER, connectorPartitions);
+ OffsetUtils.processPartitionKey(key, new byte[0], CONVERTER, connectorPartitions);
// Expect no partition to be added to the map since the partition key is of an invalid format
assertEquals(0, connectorPartitions.size());
assertEquals(1, logCaptureAppender.getMessages().size());
- assertThat(logCaptureAppender.getMessages().get(0),
- containsString("Ignoring offset partition key with an unexpected format for the first element in the partition key list"));
-
- OffsetUtils.processPartitionKey(serializePartitionKey(Arrays.asList("connector-name", new ArrayList<>())), new byte[0], CONVERTER, connectorPartitions);
- // Expect no partition to be added to the map since the partition key is of an invalid format
- assertEquals(0, connectorPartitions.size());
- assertEquals(2, logCaptureAppender.getMessages().size());
- assertThat(logCaptureAppender.getMessages().get(1),
- containsString("Ignoring offset partition key with an unexpected format for the second element in the partition key list"));
+ assertThat(logCaptureAppender.getMessages().get(0), containsString(message));
}
}