MINOR: Fix SubscriptionResponseWrapperSerializer (#17205)
The existing check is not correct, because `byte` range is from -128...127.
This PR fixes the check to use `< 0`.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
diff --git a/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java
index 939b1ec..02837b9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java
+++ b/streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java
@@ -46,7 +46,6 @@
* @param record The record that failed to produce
* @param exception The exception that occurred during production
*/
- @SuppressWarnings("deprecation")
default ProductionExceptionHandlerResponse handle(final ErrorHandlerContext context,
final ProducerRecord<byte[], byte[]> record,
final Exception exception) {
@@ -76,7 +75,6 @@
* @param exception the exception that occurred during serialization
* @param origin the origin of the serialization exception
*/
- @SuppressWarnings("deprecation")
default ProductionExceptionHandlerResponse handleSerializationException(final ErrorHandlerContext context,
final ProducerRecord record,
final Exception exception,
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
index 9e143d4..72a918e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerde.java
@@ -66,9 +66,8 @@
public byte[] serialize(final String topic, final SubscriptionResponseWrapper<V> data) {
//{1-bit-isHashNull}{7-bits-version}{Optional-16-byte-Hash}{n-bytes serialized data}
- //7-bit (0x7F) maximum for data version.
- if (Byte.compare((byte) 0x7F, data.version()) < 0) {
- throw new UnsupportedVersionException("SubscriptionResponseWrapper version is larger than maximum supported 0x7F");
+ if (data.version() < 0) {
+ throw new UnsupportedVersionException("SubscriptionResponseWrapper version cannot be negative");
}
final byte[] serializedData = data.foreignValue() == null ? null : serializer.serialize(topic, data.foreignValue());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
index 9be5e39..276600f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionResponseWrapperSerdeTest.java
@@ -42,14 +42,10 @@
}
@Override
- public void configure(final Map<String, ?> configs, final boolean isKey) {
-
- }
+ public void configure(final Map<String, ?> configs, final boolean isKey) { }
@Override
- public void close() {
-
- }
+ public void close() { }
@Override
public Serializer<T> serializer() {
@@ -73,68 +69,95 @@
}
@Test
- @SuppressWarnings("unchecked")
public void ShouldSerdeWithNonNullsTest() {
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0x01, (byte) 0x9A, (byte) 0xFF, (byte) 0x00});
final String foreignValue = "foreignValue";
final SubscriptionResponseWrapper<String> srw = new SubscriptionResponseWrapper<>(hashedValue, foreignValue, 1);
- final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String()));
- final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
- final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null, serResponse);
+ try (final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) {
+ final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
+ final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null, serResponse);
- assertArrayEquals(hashedValue, result.originalValueHash());
- assertEquals(foreignValue, result.foreignValue());
- assertNull(result.primaryPartition());
+ assertArrayEquals(hashedValue, result.originalValueHash());
+ assertEquals(foreignValue, result.foreignValue());
+ assertNull(result.primaryPartition());
+ }
}
@Test
- @SuppressWarnings("unchecked")
public void shouldSerdeWithNullForeignValueTest() {
final long[] hashedValue = Murmur3.hash128(new byte[] {(byte) 0x01, (byte) 0x9A, (byte) 0xFF, (byte) 0x00});
final SubscriptionResponseWrapper<String> srw = new SubscriptionResponseWrapper<>(hashedValue, null, 1);
- final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String()));
- final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
- final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null, serResponse);
+ try (final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) {
+ final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
+ final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null, serResponse);
- assertArrayEquals(hashedValue, result.originalValueHash());
- assertNull(result.foreignValue());
- assertNull(result.primaryPartition());
+ assertArrayEquals(hashedValue, result.originalValueHash());
+ assertNull(result.foreignValue());
+ assertNull(result.primaryPartition());
+ }
}
@Test
- @SuppressWarnings("unchecked")
public void shouldSerdeWithNullHashTest() {
final long[] hashedValue = null;
final String foreignValue = "foreignValue";
final SubscriptionResponseWrapper<String> srw = new SubscriptionResponseWrapper<>(hashedValue, foreignValue, 1);
- final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String()));
- final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
- final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null, serResponse);
+ try (final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) {
+ final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
+ final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null, serResponse);
- assertArrayEquals(hashedValue, result.originalValueHash());
- assertEquals(foreignValue, result.foreignValue());
- assertNull(result.primaryPartition());
+ assertArrayEquals(hashedValue, result.originalValueHash());
+ assertEquals(foreignValue, result.foreignValue());
+ assertNull(result.primaryPartition());
+ }
}
@Test
- @SuppressWarnings("unchecked")
public void shouldSerdeWithNullsTest() {
final long[] hashedValue = null;
final String foreignValue = null;
final SubscriptionResponseWrapper<String> srw = new SubscriptionResponseWrapper<>(hashedValue, foreignValue, 1);
- final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde(new NonNullableSerde(Serdes.String()));
- final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
- final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null, serResponse);
+ try (final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde<>(new NonNullableSerde<>(Serdes.String()))) {
+ final byte[] serResponse = srwSerde.serializer().serialize(null, srw);
+ final SubscriptionResponseWrapper<String> result = srwSerde.deserializer().deserialize(null, serResponse);
- assertArrayEquals(hashedValue, result.originalValueHash());
- assertEquals(foreignValue, result.foreignValue());
- assertNull(result.primaryPartition());
+ assertArrayEquals(hashedValue, result.originalValueHash());
+ assertEquals(foreignValue, result.foreignValue());
+ assertNull(result.primaryPartition());
+ }
}
@Test
public void shouldThrowExceptionWithBadVersionTest() {
final long[] hashedValue = null;
- assertThrows(UnsupportedVersionException.class,
- () -> new SubscriptionResponseWrapper<>(hashedValue, "foreignValue", (byte) 0xFF, 1));
+ assertThrows(
+ UnsupportedVersionException.class,
+ () -> new SubscriptionResponseWrapper<>(hashedValue, "foreignValue", (byte) -1, 1)
+ );
+ }
+
+ @Test
+ public void shouldThrowExceptionOnSerializeWhenDataVersionUnknown() {
+ final SubscriptionResponseWrapper<String> srw = new InvalidSubscriptionResponseWrapper(null, null, 1);
+ try (final SubscriptionResponseWrapperSerde<String> srwSerde = new SubscriptionResponseWrapperSerde<>(null)) {
+ assertThrows(
+ UnsupportedVersionException.class,
+ () -> srwSerde.serializer().serialize(null, srw)
+ );
+ }
+ }
+
+ public static class InvalidSubscriptionResponseWrapper extends SubscriptionResponseWrapper<String> {
+
+ public InvalidSubscriptionResponseWrapper(final long[] originalValueHash,
+ final String foreignValue,
+ final Integer primaryPartition) {
+ super(originalValueHash, foreignValue, primaryPartition);
+ }
+
+ @Override
+ public byte version() {
+ return -1;
+ }
}
}
\ No newline at end of file