[hotfix] [flink-connector-kafka] Simplify ProducerRecord instantiation w.r.t. headers
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
index 1cc9220..34cf6ef 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
@@ -347,22 +347,13 @@
context.getPartitionsForTopic(targetTopic)))
: OptionalInt.empty();
- if (headerProvider != null) {
- return new ProducerRecord<>(
- targetTopic,
- partition.isPresent() ? partition.getAsInt() : null,
- timestamp == null || timestamp < 0L ? null : timestamp,
- key,
- value,
- headerProvider.getHeaders(element));
- } else {
- return new ProducerRecord<>(
- targetTopic,
- partition.isPresent() ? partition.getAsInt() : null,
- timestamp == null || timestamp < 0L ? null : timestamp,
- key,
- value);
- }
+ return new ProducerRecord<>(
+ targetTopic,
+ partition.isPresent() ? partition.getAsInt() : null,
+ timestamp == null || timestamp < 0L ? null : timestamp,
+ key,
+ value,
+ headerProvider != null ? headerProvider.getHeaders(element) : null);
}
}
}