[PIP 131] Resolve produce chunk messages failed when topic level maxMessageSize is set (#13599)


diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 937662e..36fa611 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -990,7 +990,11 @@
         return waitingExclusiveProducers.size();
     }
 
-    protected boolean isExceedMaximumMessageSize(int size) {
+    protected boolean isExceedMaximumMessageSize(int size, PublishContext publishContext) {
+        if (publishContext.isChunked()) {
+            //skip topic level max message check if it's chunk message.
+            return false;
+        }
         int topicMaxMessageSize = topicPolicies.getTopicMaxMessageSize().get();
         if (topicMaxMessageSize <= 0) {
             //invalid setting means this check is disabled.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
index ea9d0ce..1690ed8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Producer.java
@@ -349,6 +349,7 @@
             return sequenceId;
         }
 
+        @Override
         public boolean isChunked() {
             return chunked;
         }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index ba7ad0d..99e21f9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -105,6 +105,10 @@
         default Object getProperty(String propertyName) {
             return null;
         }
+
+        default boolean isChunked() {
+            return false;
+        }
     }
 
     CompletableFuture<Void> initialize();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
index 5a6fc1c..1e5012a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java
@@ -166,7 +166,7 @@
 
     @Override
     public void publishMessage(ByteBuf data, PublishContext callback) {
-        if (isExceedMaximumMessageSize(data.readableBytes())) {
+        if (isExceedMaximumMessageSize(data.readableBytes(), callback)) {
             callback.completed(new NotAllowedException("Exceed maximum message size")
                     , -1, -1);
             return;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 104575e..a95ab6e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -404,7 +404,7 @@
             decrementPendingWriteOpsAndCheck();
             return;
         }
-        if (isExceedMaximumMessageSize(headersAndPayload.readableBytes())) {
+        if (isExceedMaximumMessageSize(headersAndPayload.readableBytes(), publishContext)) {
             publishContext.completed(new NotAllowedException("Exceed maximum message size")
                     , -1, -1);
             decrementPendingWriteOpsAndCheck();
@@ -2971,7 +2971,7 @@
             decrementPendingWriteOpsAndCheck();
             return;
         }
-        if (isExceedMaximumMessageSize(headersAndPayload.readableBytes())) {
+        if (isExceedMaximumMessageSize(headersAndPayload.readableBytes(), publishContext)) {
             publishContext.completed(new NotAllowedException("Exceed maximum message size")
                     , -1, -1);
             decrementPendingWriteOpsAndCheck();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 015330b..60e5914 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -2981,4 +2981,35 @@
 
     }
 
+    @Test
+    public void testMaxMessageSizeWithChunking() throws Exception {
+        this.conf.setMaxMessageSize(1000);
+
+        @Cleanup
+        PulsarClient pulsarClient = newPulsarClient(lookupUrl.toString(), 0);
+
+        @Cleanup
+        Producer<byte[]> producer = pulsarClient.newProducer()
+                .topic(persistenceTopic)
+                .enableChunking(true)
+                .enableBatching(false)
+                .create();
+
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getTopic(persistenceTopic,false).join().get();
+
+        // send success when topic level maxMessage is not set.
+        producer.send(new byte[2000]);
+
+        admin.topicPolicies().setMaxMessageSize(persistenceTopic, 500);
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertEquals((int) topic.getHierarchyTopicPolicies().getTopicMaxMessageSize().get(), 500);
+        });
+
+        // non-chunk message send success
+        producer.send(new byte[400]);
+
+        // chunk message send success
+        producer.send(new byte[2000]);
+    }
+
 }