diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
index 063120b..7170f0f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/AdminResource.java
@@ -339,9 +339,6 @@
             BundlesData bundleData = NamespaceBundleFactory.getBundlesData(bundles);
             policies.bundles = bundleData != null ? bundleData : policies.bundles;
 
-            // hydrate the namespace polices
-            mergeNamespaceWithDefaults(policies, namespace, policyPath);
-
             return policies;
         } catch (RestException re) {
             throw re;
@@ -371,8 +368,6 @@
                         return FutureUtil.failedFuture(new RestException(e));
                     }
                     policies.get().bundles = bundleData != null ? bundleData : policies.get().bundles;
-                    // hydrate the namespace polices
-                    mergeNamespaceWithDefaults(policies.get(), namespace, policyPath);
                     return CompletableFuture.completedFuture(policies.get());
                 });
             } else {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 7f582eac..0b53838 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -2089,17 +2089,17 @@
         }
     }
 
-    protected int internalGetMaxConsumersPerSubscription() {
+    protected Integer internalGetMaxConsumersPerSubscription() {
         validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_CONSUMERS, PolicyOperation.READ);
         return getNamespacePolicies(namespaceName).max_consumers_per_subscription;
     }
 
-    protected void internalSetMaxConsumersPerSubscription(int maxConsumersPerSubscription) {
+    protected void internalSetMaxConsumersPerSubscription(Integer maxConsumersPerSubscription) {
         validateNamespacePolicyOperation(namespaceName, PolicyName.MAX_CONSUMERS, PolicyOperation.WRITE);
         validatePoliciesReadOnlyAccess();
 
         try {
-            if (maxConsumersPerSubscription < 0) {
+            if (maxConsumersPerSubscription != null && maxConsumersPerSubscription < 0) {
                 throw new RestException(Status.PRECONDITION_FAILED,
                         "maxConsumersPerSubscription must be 0 or more");
             }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
index 72fc840..9afcb24 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java
@@ -1138,7 +1138,7 @@
     @ApiOperation(value = "Get maxConsumersPerSubscription config on a namespace.")
     @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin permission"),
             @ApiResponse(code = 404, message = "Namespace does not exist") })
-    public int getMaxConsumersPerSubscription(@PathParam("tenant") String tenant,
+    public Integer getMaxConsumersPerSubscription(@PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace) {
         validateNamespaceName(tenant, namespace);
         return internalGetMaxConsumersPerSubscription();
@@ -1160,6 +1160,19 @@
         internalSetMaxConsumersPerSubscription(maxConsumersPerSubscription);
     }
 
+    @DELETE
+    @Path("/{tenant}/{namespace}/maxConsumersPerSubscription")
+    @ApiOperation(value = " Set maxConsumersPerSubscription configuration on a namespace.")
+    @ApiResponses(value = {@ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Namespace does not exist"),
+            @ApiResponse(code = 409, message = "Concurrent modification"),
+            @ApiResponse(code = 412, message = "maxConsumersPerSubscription value is not valid")})
+    public void removeMaxConsumersPerSubscription(@PathParam("tenant") String tenant,
+                                               @PathParam("namespace") String namespace) {
+        validateNamespaceName(tenant, namespace);
+        internalSetMaxConsumersPerSubscription(null);
+    }
+
     @GET
     @Path("/{tenant}/{namespace}/maxUnackedMessagesPerConsumer")
     @ApiOperation(value = "Get maxUnackedMessagesPerConsumer config on a namespace.")
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index d57fc7b..7610677 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -179,7 +179,9 @@
         }
 
         if (maxConsumersPerSubscription == null) {
-            maxConsumersPerSubscription = policies != null && policies.max_consumers_per_subscription > 0
+            maxConsumersPerSubscription = policies != null
+                    && policies.max_consumers_per_subscription != null
+                    && policies.max_consumers_per_subscription >= 0
                     ? policies.max_consumers_per_subscription :
                     brokerService.pulsar().getConfiguration().getMaxConsumersPerSubscription();
         }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
index 7087c1a..cca01e0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest2.java
@@ -609,6 +609,21 @@
         producer.close();
     }
 
+
+    @Test(timeOut = 20000)
+    public void testMaxConsumersOnSubApi() throws Exception {
+        final String namespace = "prop-xyz/ns1";
+        assertNull(admin.namespaces().getMaxConsumersPerSubscription(namespace));
+        admin.namespaces().setMaxConsumersPerSubscription(namespace, 10);
+        Awaitility.await().untilAsserted(() -> {
+            assertNotNull(admin.namespaces().getMaxConsumersPerSubscription(namespace));
+            assertEquals(admin.namespaces().getMaxConsumersPerSubscription(namespace).intValue(), 10);
+        });
+        admin.namespaces().removeMaxConsumersPerSubscription(namespace);
+        Awaitility.await().untilAsserted(() ->
+                admin.namespaces().getMaxConsumersPerSubscription(namespace));
+    }
+
     /**
      * It verifies that pulsar with different load-manager generates different load-report and returned by admin-api
      *
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index 35527e1..023e604 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -1381,6 +1381,72 @@
         assertEquals(admin.topics().getSubscribeRate(topic, true), brokerPolicy);
     }
 
+    @Test(timeOut = 30000)
+    public void testPriorityAndDisableMaxConsumersOnSub() throws Exception {
+        final String topic = testTopic + UUID.randomUUID();
+        int maxConsumerInBroker = 1;
+        int maxConsumerInNs = 2;
+        int maxConsumerInTopic = 4;
+        String mySub = "my-sub";
+        conf.setMaxConsumersPerSubscription(maxConsumerInBroker);
+        pulsarClient.newProducer().topic(topic).create().close();
+        Awaitility.await().until(() ->
+                pulsar.getTopicPoliciesService().cacheIsInitialized(TopicName.get(topic)));
+        List<Consumer<String>> consumerList = new ArrayList<>();
+        ConsumerBuilder<String> builder = pulsarClient.newConsumer(Schema.STRING)
+                .subscriptionType(SubscriptionType.Shared)
+                .topic(topic).subscriptionName(mySub);
+        consumerList.add(builder.subscribe());
+        try {
+            builder.subscribe();
+            fail("should fail");
+        } catch (PulsarClientException ignored) {
+        }
+
+        admin.namespaces().setMaxConsumersPerSubscription(myNamespace, maxConsumerInNs);
+        Awaitility.await().untilAsserted(() ->
+                assertNotNull(admin.namespaces().getMaxConsumersPerSubscription(myNamespace)));
+        consumerList.add(builder.subscribe());
+        try {
+            builder.subscribe();
+            fail("should fail");
+        } catch (PulsarClientException ignored) {
+        }
+        //disabled
+        admin.namespaces().setMaxConsumersPerSubscription(myNamespace, 0);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(admin.namespaces().getMaxConsumersPerSubscription(myNamespace).intValue(), 0));
+        consumerList.add(builder.subscribe());
+        //set topic-level
+        admin.topics().setMaxConsumersPerSubscription(topic, maxConsumerInTopic);
+        Awaitility.await().untilAsserted(() ->
+                assertNotNull(admin.topics().getMaxConsumersPerSubscription(topic)));
+        consumerList.add(builder.subscribe());
+        try {
+            builder.subscribe();
+            fail("should fail");
+        } catch (PulsarClientException ignored) {
+        }
+        //remove topic policies
+        admin.topics().removeMaxConsumersPerSubscription(topic);
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin.topics().getMaxConsumersPerSubscription(topic)));
+        consumerList.add(builder.subscribe());
+        //remove namespace policies, then use broker-level
+        admin.namespaces().removeMaxConsumersPerSubscription(myNamespace);
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin.namespaces().getMaxConsumersPerSubscription(myNamespace)));
+        try {
+            builder.subscribe();
+            fail("should fail");
+        } catch (PulsarClientException ignored) {
+        }
+
+        for (Consumer<String> consumer : consumerList) {
+            consumer.close();
+        }
+    }
+
     @Test
     public void testRemoveSubscribeRate() throws Exception {
         admin.topics().createPartitionedTopic(persistenceTopic, 2);
diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
index 3c61ce8..703269a 100644
--- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
+++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Namespaces.java
@@ -2903,7 +2903,7 @@
      * @throws PulsarAdminException
      *             Unexpected error
      */
-    int getMaxConsumersPerSubscription(String namespace) throws PulsarAdminException;
+    Integer getMaxConsumersPerSubscription(String namespace) throws PulsarAdminException;
 
     /**
      * Get the maxConsumersPerSubscription for a namespace asynchronously.
@@ -2959,6 +2959,20 @@
     CompletableFuture<Void> setMaxConsumersPerSubscriptionAsync(String namespace, int maxConsumersPerSubscription);
 
     /**
+     * Remove maxConsumersPerSubscription for a namespace.
+     * @param namespace
+     * @throws PulsarAdminException
+     */
+    void removeMaxConsumersPerSubscription(String namespace) throws PulsarAdminException;
+
+    /**
+     * Remove maxConsumersPerSubscription for a namespace asynchronously.
+     * @param namespace
+     * @return
+     */
+    CompletableFuture<Void> removeMaxConsumersPerSubscriptionAsync(String namespace);
+
+    /**
      * Get the maxUnackedMessagesPerConsumer for a namespace.
      * <p/>
      * Response example:
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
index fd2c422..e110645 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/NamespacesImpl.java
@@ -2543,7 +2543,7 @@
     }
 
     @Override
-    public int getMaxConsumersPerSubscription(String namespace) throws PulsarAdminException {
+    public Integer getMaxConsumersPerSubscription(String namespace) throws PulsarAdminException {
         try {
             return getMaxConsumersPerSubscriptionAsync(namespace).
                     get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
@@ -2602,6 +2602,30 @@
     }
 
     @Override
+    public void removeMaxConsumersPerSubscription(String namespace)
+            throws PulsarAdminException {
+        try {
+            removeMaxConsumersPerSubscriptionAsync(namespace)
+                    .get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> removeMaxConsumersPerSubscriptionAsync(
+            String namespace) {
+        NamespaceName ns = NamespaceName.get(namespace);
+        WebTarget path = namespacePath(ns, "maxConsumersPerSubscription");
+        return asyncDeleteRequest(path);
+    }
+
+    @Override
     public Integer getMaxUnackedMessagesPerConsumer(String namespace) throws PulsarAdminException {
         try {
             return getMaxUnackedMessagesPerConsumerAsync(namespace).
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 7c03f2f..93d547f 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -536,6 +536,9 @@
         namespaces.run(split("get-max-consumers-per-subscription myprop/clust/ns1"));
         verify(mockNamespaces).getMaxConsumersPerSubscription("myprop/clust/ns1");
 
+        namespaces.run(split("remove-max-consumers-per-subscription myprop/clust/ns1"));
+        verify(mockNamespaces).removeMaxConsumersPerSubscription("myprop/clust/ns1");
+
         namespaces.run(split("set-max-consumers-per-subscription myprop/clust/ns1 -c 3"));
         verify(mockNamespaces).setMaxConsumersPerSubscription("myprop/clust/ns1", 3);
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
index a749271..7a2e52d 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdNamespaces.java
@@ -1475,6 +1475,18 @@
         }
     }
 
+    @Parameters(commandDescription = "Remove maxConsumersPerSubscription for a namespace")
+    private class RemoveMaxConsumersPerSubscription extends CliCommand {
+        @Parameter(description = "tenant/namespace\n", required = true)
+        private java.util.List<String> params;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String namespace = validateNamespace(params);
+            getAdmin().namespaces().removeMaxConsumersPerSubscription(namespace);
+        }
+    }
+
     @Parameters(commandDescription = "Set maxConsumersPerSubscription for a namespace")
     private class SetMaxConsumersPerSubscription extends CliCommand {
         @Parameter(description = "tenant/namespace", required = true)
@@ -2190,6 +2202,7 @@
 
         jcommander.addCommand("get-max-consumers-per-subscription", new GetMaxConsumersPerSubscription());
         jcommander.addCommand("set-max-consumers-per-subscription", new SetMaxConsumersPerSubscription());
+        jcommander.addCommand("remove-max-consumers-per-subscription", new RemoveMaxConsumersPerSubscription());
 
         jcommander.addCommand("get-max-unacked-messages-per-subscription", new GetMaxUnackedMessagesPerSubscription());
         jcommander.addCommand("set-max-unacked-messages-per-subscription", new SetMaxUnackedMessagesPerSubscription());
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
index 184c58d..72d6f6f 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/Policies.java
@@ -86,7 +86,7 @@
     @SuppressWarnings("checkstyle:MemberName")
     public Integer max_consumers_per_topic = null;
     @SuppressWarnings("checkstyle:MemberName")
-    public int max_consumers_per_subscription = 0;
+    public Integer max_consumers_per_subscription = null;
     @SuppressWarnings("checkstyle:MemberName")
     public Integer max_unacked_messages_per_consumer = null;
     @SuppressWarnings("checkstyle:MemberName")
@@ -178,8 +178,8 @@
                     && Objects.equals(max_consumers_per_topic, other.max_consumers_per_topic)
                     && Objects.equals(max_unacked_messages_per_consumer, other.max_unacked_messages_per_consumer)
                     && Objects.equals(max_unacked_messages_per_subscription, max_unacked_messages_per_subscription)
-                    && max_consumers_per_subscription == other.max_consumers_per_subscription
-                    && compaction_threshold == other.compaction_threshold
+                    && Objects.equals(max_consumers_per_subscription, max_consumers_per_subscription)
+                    && Objects.equals(compaction_threshold, compaction_threshold)
                     && offload_threshold == other.offload_threshold
                     && Objects.equals(offload_deletion_lag_ms, other.offload_deletion_lag_ms)
                     && schema_auto_update_compatibility_strategy == other.schema_auto_update_compatibility_strategy
