[pulsar-io] [kafka-connect-adapter] KafkaSourceRecord - Initialize key property with Optional.empty() if keyBytes from SourceRecord is null (Optional property should never itself be null) (#5591)
diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
index d9662c4..1bdd6c4 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
@@ -187,9 +187,7 @@
srcRecord.topic(), srcRecord.keySchema(), srcRecord.key());
byte[] valueBytes = valueConverter.fromConnectData(
srcRecord.topic(), srcRecord.valueSchema(), srcRecord.value());
- if (keyBytes != null) {
- this.key = Optional.of(Base64.getEncoder().encodeToString(keyBytes));
- }
+ this.key = keyBytes != null ? Optional.of(Base64.getEncoder().encodeToString(keyBytes)) : Optional.empty();
this.value = new KeyValue(keyBytes, valueBytes);
this.topicName = Optional.of(srcRecord.topic());