[FLINK-16396] [kafka] Allow null keys for the YAML generic Kafka egress

This closes #49.
diff --git a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/GenericKafkaEgressSerializer.java b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/GenericKafkaEgressSerializer.java
index c934496..19af022 100644
--- a/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/GenericKafkaEgressSerializer.java
+++ b/statefun-flink/statefun-flink-io-bundle/src/main/java/org/apache/flink/statefun/flink/io/kafka/GenericKafkaEgressSerializer.java
@@ -57,9 +57,14 @@
 
   private static ProducerRecord<byte[], byte[]> toProducerRecord(
       KafkaProducerRecord protobufProducerRecord) {
-    return new ProducerRecord<>(
-        protobufProducerRecord.getTopic(),
-        protobufProducerRecord.getKey().getBytes(StandardCharsets.UTF_8),
-        protobufProducerRecord.getValueBytes().toByteArray());
+    final String key = protobufProducerRecord.getKey();
+    final String topic = protobufProducerRecord.getTopic();
+    final byte[] valueBytes = protobufProducerRecord.getValueBytes().toByteArray();
+
+    if (key == null) {
+      return new ProducerRecord<>(topic, valueBytes);
+    } else {
+      return new ProducerRecord<>(topic, key.getBytes(StandardCharsets.UTF_8), valueBytes);
+    }
   }
 }