Optimize TopicPolicies#delayedDelivery Enabled and TickTimeMillis with HierarchyTopicPolicies (#13649)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 211707d..08f0388 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -58,6 +58,7 @@
 import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.Policies;
@@ -170,6 +171,8 @@
                         data.getBackLogQuotaMap() == null ? null : data.getBackLogQuotaMap().get(type.toString())));
         topicPolicies.getTopicMaxMessageSize().updateTopicValue(data.getMaxMessageSize());
         topicPolicies.getMessageTTLInSeconds().updateTopicValue(data.getMessageTTLInSeconds());
+        topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled());
+        topicPolicies.getDelayedDeliveryTickTimeMillis().updateTopicValue(data.getDelayedDeliveryTickTimeMillis());
     }
 
     protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
@@ -189,6 +192,12 @@
                 .updateNamespaceValue(namespacePolicies.max_consumers_per_subscription);
         topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(namespacePolicies.inactive_topic_policies);
         topicPolicies.getDeduplicationEnabled().updateNamespaceValue(namespacePolicies.deduplicationEnabled);
+        topicPolicies.getDelayedDeliveryEnabled().updateNamespaceValue(
+                Optional.ofNullable(namespacePolicies.delayed_delivery_policies)
+                        .map(DelayedDeliveryPolicies::isActive).orElse(null));
+        topicPolicies.getDelayedDeliveryTickTimeMillis().updateNamespaceValue(
+                Optional.ofNullable(namespacePolicies.delayed_delivery_policies)
+                        .map(DelayedDeliveryPolicies::getTickTime).orElse(null));
         topicPolicies.getSubscriptionTypesEnabled().updateNamespaceValue(
                 subTypeStringsToEnumSet(namespacePolicies.subscription_types_enabled));
         Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach(
@@ -219,6 +228,8 @@
 
         topicPolicies.getTopicMaxMessageSize().updateBrokerValue(config.getMaxMessageSize());
         topicPolicies.getMessageTTLInSeconds().updateBrokerValue(config.getTtlDurationDefaultInSeconds());
+        topicPolicies.getDelayedDeliveryEnabled().updateBrokerValue(config.isDelayedDeliveryEnabled());
+        topicPolicies.getDelayedDeliveryTickTimeMillis().updateBrokerValue(config.getDelayedDeliveryTickTimeMillis());
     }
 
     private EnumSet<SubType> subTypeStringsToEnumSet(Set<String> getSubscriptionTypesEnabled) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 990f2fc..46edd82 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -184,9 +184,7 @@
 
     private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
     private Optional<SubscribeRateLimiter> subscribeRateLimiter = Optional.empty();
-    public volatile long delayedDeliveryTickTimeMillis = 1000;
     private final long backloggedCursorThresholdEntries;
-    public volatile boolean delayedDeliveryEnabled = false;
     public static final int MESSAGE_RATE_BACKOFF_MS = 1000;
 
     protected final MessageDeduplication messageDeduplication;
@@ -256,9 +254,6 @@
         this.ledger = ledger;
         this.subscriptions = new ConcurrentOpenHashMap<>(16, 1);
         this.replicators = new ConcurrentOpenHashMap<>(16, 1);
-        this.delayedDeliveryEnabled = brokerService.pulsar().getConfiguration().isDelayedDeliveryEnabled();
-        this.delayedDeliveryTickTimeMillis =
-                brokerService.pulsar().getConfiguration().getDelayedDeliveryTickTimeMillis();
         this.backloggedCursorThresholdEntries =
                 brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
         initializeRateLimiterIfNeeded(Optional.empty());
@@ -2446,10 +2441,6 @@
         updateUnackedMessagesAppliedOnSubscription(data);
         updateUnackedMessagesExceededOnConsumer(data);
 
-        if (data.delayed_delivery_policies != null) {
-            delayedDeliveryTickTimeMillis = data.delayed_delivery_policies.getTickTime();
-            delayedDeliveryEnabled = data.delayed_delivery_policies.isActive();
-        }
         //If the topic-level policy already exists, the namespace-level policy cannot override the topic-level policy.
         Optional<TopicPolicies> topicPolicies = getTopicPolicies();
 
@@ -3042,10 +3033,7 @@
     }
 
     public long getDelayedDeliveryTickTimeMillis() {
-        //Topic level setting has higher priority than namespace level
-        return getTopicPolicies()
-                .map(TopicPolicies::getDelayedDeliveryTickTimeMillis)
-                .orElse(delayedDeliveryTickTimeMillis);
+        return topicPolicies.getDelayedDeliveryTickTimeMillis().get();
     }
 
     public int getMaxUnackedMessagesOnConsumer() {
@@ -3053,10 +3041,7 @@
     }
 
     public boolean isDelayedDeliveryEnabled() {
-        //Topic level setting has higher priority than namespace level
-        return getTopicPolicies()
-                .map(TopicPolicies::getDelayedDeliveryEnabled)
-                .orElse(delayedDeliveryEnabled);
+        return topicPolicies.getDelayedDeliveryEnabled().get();
     }
 
     public int getMaxUnackedMessagesOnSubscription() {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
index 662c191..77901a6 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
@@ -42,8 +42,11 @@
     final PolicyHierarchyValue<Integer> topicMaxMessageSize;
     final PolicyHierarchyValue<Integer> messageTTLInSeconds;
     final PolicyHierarchyValue<Integer> maxConsumerPerTopic;
+    final PolicyHierarchyValue<Boolean> delayedDeliveryEnabled;
+    final PolicyHierarchyValue<Long> delayedDeliveryTickTimeMillis;
     final PolicyHierarchyValue<Integer> maxConsumersPerSubscription;
 
+
     public HierarchyTopicPolicies() {
         replicationClusters = new PolicyHierarchyValue<>();
         deduplicationEnabled = new PolicyHierarchyValue<>();
@@ -59,5 +62,7 @@
                 .build();
         topicMaxMessageSize = new PolicyHierarchyValue<>();
         messageTTLInSeconds = new PolicyHierarchyValue<>();
+        delayedDeliveryEnabled = new PolicyHierarchyValue<>();
+        delayedDeliveryTickTimeMillis = new PolicyHierarchyValue<>();
     }
 }