Fix can not disable and remove max consumer per subscription (#10070)

### Motivation
1)Now, we cannot disable `MaxConsumersPerSubscription` in any level of Policy
2) The Namespace level MaxConsumersPerSubscription cannot be deleted as long as it is set
3)The default value of the namespace level is incorrect, and the broker level data will be returned
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 063120b..7170f0f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -339,9 +339,6 @@
             BundlesData bundleData = NamespaceBundleFactory.getBundlesData(bundles);
             policies.bundles = bundleData != null ? bundleData : policies.bundles;
 
-            // hydrate the namespace polices
-            mergeNamespaceWithDefaults(policies, namespace, policyPath);
-
             return policies;
         } catch (RestException re) {
             throw re;
@@ -371,8 +368,6 @@
                         return FutureUtil.failedFuture(new RestException(e));
                     }
                     policies.get().bundles = bundleData != null ? bundleData : policies.get().bundles;
-                    // hydrate the namespace polices
-                    mergeNamespaceWithDefaults(policies.get(), namespace, policyPath);
                     return CompletableFuture.completedFuture(policies.get());
                 });
             } else {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 7f582eac..0b53838 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2089,17 +2089,17 @@
         }
     }
 
-    protected int internalGetMaxConsumersPerSubscription() {
+    protected Integer internalGetMaxConsumersPerSubscription() {
         validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_CONSUMERS, PolicyOperation.READ);
         return getNamespacePolicies(namespaceName).max_consumers_per_subscription;
     }
 
-    protected void internalSetMaxConsumersPerSubscription(int maxConsumersPerSubscription) {
+    protected void internalSetMaxConsumersPerSubscription(Integer maxConsumersPerSubscription) {
         validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
 
         try {
-            if (maxConsumersPerSubscription < 0) {
+            if (maxConsumersPerSubscription != null && maxConsumersPerSubscription < 0) {
                 throw new RestException(Status.PRECONDITION_FAILED,
                         "maxConsumersPerSubscription must be 0 or more");
             }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 72fc840..9afcb24 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -1138,7 +1138,7 @@
     @ApiOperation(value = "Get maxConsumersPerSubscription config on a namespace.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist") })
-    public int getMaxConsumersPerSubscription(@PathParam("tenant") String tenant,
+    public Integer getMaxConsumersPerSubscription(@PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
         return internalGetMaxConsumersPerSubscription();
@@ -1160,6 +1160,19 @@
         internalSetMaxConsumersPerSubscription(maxConsumersPerSubscription);
     }
 
+    @DELETE
+    @Path("/{tenant}/{namespace}/maxConsumersPerSubscription")
+    @ApiOperation(value = " Set maxConsumersPerSubscription configuration on a namespace.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 412, message = "maxConsumersPerSubscription value is not valid")})
+    public void removeMaxConsumersPerSubscription(@PathParam("tenant") String tenant,
+                                               @PathParam("namespace") String namespace) {
+        validateNamespaceName(tenant, namespace);
+        internalSetMaxConsumersPerSubscription(null);
+    }
+
     @GET
     @Path("/{tenant}/{namespace}/maxUnackedMessagesPerConsumer")
     @ApiOperation(value = "Get maxUnackedMessagesPerConsumer config on a namespace.")
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index d57fc7b..7610677 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -179,7 +179,9 @@
         }
 
         if (maxConsumersPerSubscription == null) {
-            maxConsumersPerSubscription = policies != null && policies.max_consumers_per_subscription > 0
+            maxConsumersPerSubscription = policies != null
+                    && policies.max_consumers_per_subscription != null
+                    && policies.max_consumers_per_subscription >= 0
                     ? policies.max_consumers_per_subscription :
                     brokerService.pulsar().getConfiguration().getMaxConsumersPerSubscription();
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index 7087c1a..cca01e0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -609,6 +609,21 @@
         producer.close();
     }
 
+
+    @Test(timeOut = 20000)
+    public void testMaxConsumersOnSubApi() throws Exception {
+        final String namespace = "prop-xyz/ns1";
+        assertNull(admin.namespaces().getMaxConsumersPerSubscription(namespace));
+        admin.namespaces().setMaxConsumersPerSubscription(namespace, 10);
+        Awaitility.await().untilAsserted(() -> {
+            assertNotNull(admin.namespaces().getMaxConsumersPerSubscription(namespace));
+            assertEquals(admin.namespaces().getMaxConsumersPerSubscription(namespace).intValue(), 10);
+        });
+        admin.namespaces().removeMaxConsumersPerSubscription(namespace);
+        Awaitility.await().untilAsserted(() ->
+                admin.namespaces().getMaxConsumersPerSubscription(namespace));
+    }
+
     /**
      * It verifies that pulsar with different load-manager generates different load-report and returned by admin-api
      *
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 35527e1..023e604 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -1381,6 +1381,72 @@
         assertEquals(admin.topics().getSubscribeRate(topic, true), brokerPolicy);
     }
 
+    @Test(timeOut = 30000)
+    public void testPriorityAndDisableMaxConsumersOnSub() throws Exception {
+        final String topic = testTopic + UUID.randomUUID();
+        int maxConsumerInBroker = 1;
+        int maxConsumerInNs = 2;
+        int maxConsumerInTopic = 4;
+        String mySub = "my-sub";
+        conf.setMaxConsumersPerSubscription(maxConsumerInBroker);
+        pulsarClient.newProducer().topic(topic).create().close();
+        Awaitility.await().until(() ->
+                pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
+        List<Consumer<String>> consumerList = new ArrayList<>();
+        ConsumerBuilder<String> builder = pulsarClient.newConsumer(Schema.STRING)
+                .subscriptionType(SubscriptionType.Shared)
+                .topic(topic).subscriptionName(mySub);
+        consumerList.add(builder.subscribe());
+        try {
+            builder.subscribe();
+            fail("should fail");
+        } catch (PulsarClientException ignored) {
+        }
+
+        admin.namespaces().setMaxConsumersPerSubscription(myNamespace, maxConsumerInNs);
+        Awaitility.await().untilAsserted(() ->
+                assertNotNull(admin.namespaces().getMaxConsumersPerSubscription(myNamespace)));
+        consumerList.add(builder.subscribe());
+        try {
+            builder.subscribe();
+            fail("should fail");
+        } catch (PulsarClientException ignored) {
+        }
+        //disabled
+        admin.namespaces().setMaxConsumersPerSubscription(myNamespace, 0);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(admin.namespaces().getMaxConsumersPerSubscription(myNamespace).intValue(), 0));
+        consumerList.add(builder.subscribe());
+        //set topic-level
+        admin.topics().setMaxConsumersPerSubscription(topic, maxConsumerInTopic);
+        Awaitility.await().untilAsserted(() ->
+                assertNotNull(admin.topics().getMaxConsumersPerSubscription(topic)));
+        consumerList.add(builder.subscribe());
+        try {
+            builder.subscribe();
+            fail("should fail");
+        } catch (PulsarClientException ignored) {
+        }
+        //remove topic policies
+        admin.topics().removeMaxConsumersPerSubscription(topic);
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin.topics().getMaxConsumersPerSubscription(topic)));
+        consumerList.add(builder.subscribe());
+        //remove namespace policies, then use broker-level
+        admin.namespaces().removeMaxConsumersPerSubscription(myNamespace);
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin.namespaces().getMaxConsumersPerSubscription(myNamespace)));
+        try {
+            builder.subscribe();
+            fail("should fail");
+        } catch (PulsarClientException ignored) {
+        }
+
+        for (Consumer<String> consumer : consumerList) {
+            consumer.close();
+        }
+    }
+
     @Test
     public void testRemoveSubscribeRate() throws Exception {
         admin.topics().createPartitionedTopic(persistenceTopic, 2);
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 3c61ce8..703269a 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -2903,7 +2903,7 @@
      * @throws PulsarAdminException
      *             Unexpected error
      */
-    int getMaxConsumersPerSubscription(String namespace) throws PulsarAdminException;
+    Integer getMaxConsumersPerSubscription(String namespace) throws PulsarAdminException;
 
     /**
      * Get the maxConsumersPerSubscription for a namespace asynchronously.
@@ -2959,6 +2959,20 @@
     CompletableFuture<Void> setMaxConsumersPerSubscriptionAsync(String namespace, int maxConsumersPerSubscription);
 
     /**
+     * Remove maxConsumersPerSubscription for a namespace.
+     * @param namespace
+     * @throws PulsarAdminException
+     */
+    void removeMaxConsumersPerSubscription(String namespace) throws PulsarAdminException;
+
+    /**
+     * Remove maxConsumersPerSubscription for a namespace asynchronously.
+     * @param namespace
+     * @return
+     */
+    CompletableFuture<Void> removeMaxConsumersPerSubscriptionAsync(String namespace);
+
+    /**
      * Get the maxUnackedMessagesPerConsumer for a namespace.
      * <p/>
      * Response example:
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index fd2c422..e110645 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -2543,7 +2543,7 @@
     }
 
     @Override
-    public int getMaxConsumersPerSubscription(String namespace) throws PulsarAdminException {
+    public Integer getMaxConsumersPerSubscription(String namespace) throws PulsarAdminException {
         try {
             return getMaxConsumersPerSubscriptionAsync(namespace).
                     get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
@@ -2602,6 +2602,30 @@
     }
 
     @Override
+    public void removeMaxConsumersPerSubscription(String namespace)
+            throws PulsarAdminException {
+        try {
+            removeMaxConsumersPerSubscriptionAsync(namespace)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removeMaxConsumersPerSubscriptionAsync(
+            String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "maxConsumersPerSubscription");
+        return asyncDeleteRequest(path);
+    }
+
+    @Override
     public Integer getMaxUnackedMessagesPerConsumer(String namespace) throws PulsarAdminException {
         try {
             return getMaxUnackedMessagesPerConsumerAsync(namespace).
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 7c03f2f..93d547f 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -536,6 +536,9 @@
         namespaces.run(split("get-max-consumers-per-subscription myprop/clust/ns1"));
         verify(mockNamespaces).getMaxConsumersPerSubscription("myprop/clust/ns1");
 
+        namespaces.run(split("remove-max-consumers-per-subscription myprop/clust/ns1"));
+        verify(mockNamespaces).removeMaxConsumersPerSubscription("myprop/clust/ns1");
+
         namespaces.run(split("set-max-consumers-per-subscription myprop/clust/ns1 -c 3"));
         verify(mockNamespaces).setMaxConsumersPerSubscription("myprop/clust/ns1", 3);
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index a749271..7a2e52d 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -1475,6 +1475,18 @@
         }
     }
 
+    @Parameters(commandDescription = "Remove maxConsumersPerSubscription for a namespace")
+    private class RemoveMaxConsumersPerSubscription extends CliCommand {
+        @Parameter(description = "tenant/namespace\n", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            getAdmin().namespaces().removeMaxConsumersPerSubscription(namespace);
+        }
+    }
+
     @Parameters(commandDescription = "Set maxConsumersPerSubscription for a namespace")
     private class SetMaxConsumersPerSubscription extends CliCommand {
         @Parameter(description = "tenant/namespace", required = true)
@@ -2190,6 +2202,7 @@
 
         jcommander.addCommand("get-max-consumers-per-subscription", new GetMaxConsumersPerSubscription());
         jcommander.addCommand("set-max-consumers-per-subscription", new SetMaxConsumersPerSubscription());
+        jcommander.addCommand("remove-max-consumers-per-subscription", new RemoveMaxConsumersPerSubscription());
 
         jcommander.addCommand("get-max-unacked-messages-per-subscription", new GetMaxUnackedMessagesPerSubscription());
         jcommander.addCommand("set-max-unacked-messages-per-subscription", new SetMaxUnackedMessagesPerSubscription());
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index 184c58d..72d6f6f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -86,7 +86,7 @@
     @SuppressWarnings("checkstyle:MemberName")
     public Integer max_consumers_per_topic = null;
     @SuppressWarnings("checkstyle:MemberName")
-    public int max_consumers_per_subscription = 0;
+    public Integer max_consumers_per_subscription = null;
     @SuppressWarnings("checkstyle:MemberName")
     public Integer max_unacked_messages_per_consumer = null;
     @SuppressWarnings("checkstyle:MemberName")
@@ -178,8 +178,8 @@
                     && Objects.equals(max_consumers_per_topic, other.max_consumers_per_topic)
                     && Objects.equals(max_unacked_messages_per_consumer, other.max_unacked_messages_per_consumer)
                     && Objects.equals(max_unacked_messages_per_subscription, max_unacked_messages_per_subscription)
-                    && max_consumers_per_subscription == other.max_consumers_per_subscription
-                    && compaction_threshold == other.compaction_threshold
+                    && Objects.equals(max_consumers_per_subscription, max_consumers_per_subscription)
+                    && Objects.equals(compaction_threshold, compaction_threshold)
                     && offload_threshold == other.offload_threshold
                     && Objects.equals(offload_deletion_lag_ms, other.offload_deletion_lag_ms)
                     && schema_auto_update_compatibility_strategy == other.schema_auto_update_compatibility_strategy