[FLINK-16396] [kafka] Allow null keys for the YAML generic Kafka egress
This closes #49.
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/GenericKafkaEgressSerializer.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/GenericKafkaEgressSerializer.java
index c934496..19af022 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/GenericKafkaEgressSerializer.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/GenericKafkaEgressSerializer.java
@@ -57,9 +57,14 @@
private static ProducerRecord<byte[], byte[]> toProducerRecord(
KafkaProducerRecord protobufProducerRecord) {
- return new ProducerRecord<>(
- protobufProducerRecord.getTopic(),
- protobufProducerRecord.getKey().getBytes(StandardCharsets.UTF_8),
- protobufProducerRecord.getValueBytes().toByteArray());
+ final String key = protobufProducerRecord.getKey();
+ final String topic = protobufProducerRecord.getTopic();
+ final byte[] valueBytes = protobufProducerRecord.getValueBytes().toByteArray();
+
+ if (key == null) {
+ return new ProducerRecord<>(topic, valueBytes);
+ } else {
+ return new ProducerRecord<>(topic, key.getBytes(StandardCharsets.UTF_8), valueBytes);
+ }
}
}