[PIP 131] Resolve produce chunk messages failed when topic level maxMessageSize is set (#13599)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 937662e..36fa611 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -990,7 +990,11 @@
return waitingExclusiveProducers.size();
}
- protected boolean isExceedMaximumMessageSize(int size) {
+ protected boolean isExceedMaximumMessageSize(int size, PublishContext publishContext) {
+ if (publishContext.isChunked()) {
+ //skip topic level max message check if it's chunk message.
+ return false;
+ }
int topicMaxMessageSize = topicPolicies.getTopicMaxMessageSize().get();
if (topicMaxMessageSize <= 0) {
//invalid setting means this check is disabled.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index ea9d0ce..1690ed8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -349,6 +349,7 @@
return sequenceId;
}
+ @Override
public boolean isChunked() {
return chunked;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index ba7ad0d..99e21f9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -105,6 +105,10 @@
default Object getProperty(String propertyName) {
return null;
}
+
+ default boolean isChunked() {
+ return false;
+ }
}
CompletableFuture<Void> initialize();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 5a6fc1c..1e5012a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -166,7 +166,7 @@
@Override
public void publishMessage(ByteBuf data, PublishContext callback) {
- if (isExceedMaximumMessageSize(data.readableBytes())) {
+ if (isExceedMaximumMessageSize(data.readableBytes(), callback)) {
callback.completed(new NotAllowedException("Exceed maximum message size")
, -1, -1);
return;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 104575e..a95ab6e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -404,7 +404,7 @@
decrementPendingWriteOpsAndCheck();
return;
}
- if (isExceedMaximumMessageSize(headersAndPayload.readableBytes())) {
+ if (isExceedMaximumMessageSize(headersAndPayload.readableBytes(), publishContext)) {
publishContext.completed(new NotAllowedException("Exceed maximum message size")
, -1, -1);
decrementPendingWriteOpsAndCheck();
@@ -2971,7 +2971,7 @@
decrementPendingWriteOpsAndCheck();
return;
}
- if (isExceedMaximumMessageSize(headersAndPayload.readableBytes())) {
+ if (isExceedMaximumMessageSize(headersAndPayload.readableBytes(), publishContext)) {
publishContext.completed(new NotAllowedException("Exceed maximum message size")
, -1, -1);
decrementPendingWriteOpsAndCheck();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 015330b..60e5914 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -2981,4 +2981,35 @@
}
+ @Test
+ public void testMaxMessageSizeWithChunking() throws Exception {
+ this.conf.setMaxMessageSize(1000);
+
+ @Cleanup
+ PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
+
+ @Cleanup
+ Producer<byte[]> producer = pulsarClient.newProducer()
+ .topic(persistenceTopic)
+ .enableChunking(true)
+ .enableBatching(false)
+ .create();
+
+ PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(persistenceTopic,false).join().get();
+
+ // send success when topic level maxMessage is not set.
+ producer.send(new byte[2000]);
+
+ admin.topicPolicies().setMaxMessageSize(persistenceTopic, 500);
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertEquals((int) topic.getHierarchyTopicPolicies().getTopicMaxMessageSize().get(), 500);
+ });
+
+ // non-chunk message send success
+ producer.send(new byte[400]);
+
+ // chunk message send success
+ producer.send(new byte[2000]);
+ }
+
}