[fix] [cli] the variable producerName of BatchMsgContainer is null (#20819)
Motivation: If the producer name is generated by the Broker, the producer will update the variable `producerName` after connecting, but not update the same variable of the batch message container.
Modifications: fix bug
(cherry picked from commit aba50f2a276412bc43a4652b9ce303d384f71966)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
index e369e7d..c9d8a2f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java
@@ -1538,11 +1538,12 @@
headersAndPayload.resetReaderIndex();
if (log.isDebugEnabled()) {
log.debug("[{}] Received send message request. producer: {}:{} {}:{} size: {},"
- + " partition key is: {}, ordering key is {}",
+ + " partition key is: {}, ordering key is {}, uncompressedSize is {}",
remoteAddress, send.getProducerId(), send.getSequenceId(), msgMetadata.getProducerName(),
msgMetadata.getSequenceId(), headersAndPayload.readableBytes(),
msgMetadata.hasPartitionKey() ? msgMetadata.getPartitionKey() : null,
- msgMetadata.hasOrderingKey() ? msgMetadata.getOrderingKey() : null);
+ msgMetadata.hasOrderingKey() ? msgMetadata.getOrderingKey() : null,
+ msgMetadata.getUncompressedSize());
}
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
index 9b4d1b7..f762b5a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AbstractBatchMessageContainer.java
@@ -35,7 +35,6 @@
protected CompressionType compressionType;
protected CompressionCodec compressor;
protected String topicName;
- protected String producerName;
protected ProducerImpl producer;
protected int maxNumMessagesInBatch;
@@ -98,7 +97,6 @@
public void setProducer(ProducerImpl<?> producer) {
this.producer = producer;
this.topicName = producer.getTopic();
- this.producerName = producer.getProducerName();
this.compressionType = CompressionCodecProvider
.convertToWireProtocol(producer.getConfiguration().getCompressionType());
this.compressor = CompressionCodecProvider.getCompressionCodec(compressionType);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
index 37b522d..f657816 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageContainerImpl.java
@@ -80,8 +80,8 @@
public boolean add(MessageImpl<?> msg, SendCallback callback) {
if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] add message to batch, num messages in batch so far {}", topicName, producerName,
- numMessagesInBatch);
+ log.debug("[{}] [{}] add message to batch, num messages in batch so far {}", topicName,
+ producer.getProducerName(), numMessagesInBatch);
}
if (++numMessagesInBatch == 1) {
@@ -193,8 +193,8 @@
firstCallback.sendComplete(ex);
}
} catch (Throwable t) {
- log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topicName, producerName,
- lowestSequenceId, t);
+ log.warn("[{}] [{}] Got exception while completing the callback for msg {}:", topicName,
+ producer.getProducerName(), lowestSequenceId, t);
}
clear();
}
@@ -226,6 +226,14 @@
ByteBufPair cmd = producer.sendMessage(producer.producerId, messageMetadata.getSequenceId(),
messageMetadata.getHighestSequenceId(), numMessagesInBatch, messageMetadata, encryptedPayload);
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] [{}] Build batch msg seq:{}, highest-seq:{}, numMessagesInBatch: {}, uncompressedSize: {},"
+ + " payloadSize: {}", topicName, producer.getProducerName(),
+ messageMetadata.getSequenceId(), messageMetadata.getNumMessagesInBatch(),
+ messageMetadata.getHighestSequenceId(),
+ messageMetadata.getUncompressedSize(), encryptedPayload.readableBytes());
+ }
+
OpSendMsg op = OpSendMsg.create(messages, cmd, messageMetadata.getSequenceId(),
messageMetadata.getHighestSequenceId(), firstCallback);
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
index 7614728..272f2dd 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BatchMessageKeyBasedContainer.java
@@ -54,8 +54,8 @@
@Override
public boolean add(MessageImpl<?> msg, SendCallback callback) {
if (log.isDebugEnabled()) {
- log.debug("[{}] [{}] add message to batch, num messages in batch so far is {}", topicName, producerName,
- numMessagesInBatch);
+ log.debug("[{}] [{}] add message to batch, num messages in batch so far is {}", topicName,
+ producer.getProducerName(), numMessagesInBatch);
}
numMessagesInBatch++;
currentBatchSizeBytes += msg.getDataBuffer().readableBytes();