[pulsar-io] [kafka-connect-adapter] KafkaSourceRecord - Initialize key property with Optional.empty() if keyBytes from SourceRecord is null (Optional property should never itself be null) (#5591)

diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
index d9662c4..1bdd6c4 100644
--- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
+++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java
@@ -187,9 +187,7 @@
                 srcRecord.topic(), srcRecord.keySchema(), srcRecord.key());
             byte[] valueBytes = valueConverter.fromConnectData(
                 srcRecord.topic(), srcRecord.valueSchema(), srcRecord.value());
-            if (keyBytes != null) {
-                this.key = Optional.of(Base64.getEncoder().encodeToString(keyBytes));
-            }
+            this.key = keyBytes != null ? Optional.of(Base64.getEncoder().encodeToString(keyBytes)) : Optional.empty();
             this.value = new KeyValue(keyBytes, valueBytes);
 
             this.topicName = Optional.of(srcRecord.topic());