[fix][client] Fix potential NPE in TypedMessageBuilderImpl (#24691)
(cherry picked from commit 5e9e9129d3c082779d05b808b170f0e7823c42ed)
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
index e2bb4b0..35c72e2 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TypedMessageBuilderImpl.java
@@ -80,7 +80,7 @@
return null;
}
}).orElseGet(() -> {
- EncodeData encodeData = schema.encode(producer.topic, value);
+ EncodeData encodeData = schema.encode(getTopic(), value);
content = ByteBuffer.wrap(encodeData.data());
if (encodeData.hasSchemaId()) {
msgMetadata.setSchemaId(encodeData.schemaId());
@@ -275,7 +275,7 @@
public Message<T> getMessage() {
beforeSend();
- return MessageImpl.create(msgMetadata, content, schema, producer != null ? producer.getTopic() : null);
+ return MessageImpl.create(msgMetadata, content, schema, getTopic());
}
public long getPublishTime() {
@@ -314,7 +314,7 @@
EncodeData keyEncoded = null;
// set key as the message key
if (keyValue.getKey() != null) {
- keyEncoded = keyValueSchema.getKeySchema().encode(producer.topic, keyValue.getKey());
+ keyEncoded = keyValueSchema.getKeySchema().encode(getTopic(), keyValue.getKey());
msgMetadata.setPartitionKey(Base64.getEncoder().encodeToString(keyEncoded.data()));
msgMetadata.setPartitionKeyB64Encoded(true);
} else {
@@ -324,7 +324,7 @@
EncodeData valueEncoded = null;
// set value as the payload
if (keyValue.getValue() != null) {
- valueEncoded = keyValueSchema.getValueSchema().encode(producer.topic, keyValue.getValue());
+ valueEncoded = keyValueSchema.getValueSchema().encode(getTopic(), keyValue.getValue());
content = ByteBuffer.wrap(valueEncoded.data());
} else {
msgMetadata.setNullValue(true);
@@ -337,4 +337,9 @@
msgMetadata.setSchemaId(schemaId);
}
}
+
+ private String getTopic() {
+ return producer != null ? producer.getTopic() : null;
+ }
+
}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java
index 6d2af96..257fc68 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TypedMessageBuilderImplTest.java
@@ -272,4 +272,13 @@
}
}
+ @Test
+ public void testGetMessageWithNullProducer() {
+ TypedMessageBuilderImpl<byte[]> builder = new TypedMessageBuilderImpl<>(null, Schema.BYTES);
+ var data = "test".getBytes();
+ builder.value(data);
+ var message = builder.getMessage();
+ assertEquals(message.getValue(), data);
+ }
+
}