Optimize TopicPolicies#delayedDelivery Enabled and TickTimeMillis with HierarchyTopicPolicies (#13649)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
index 211707d..08f0388 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractTopic.java
@@ -58,6 +58,7 @@
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.Policies;
@@ -170,6 +171,8 @@
data.getBackLogQuotaMap() == null ? null : data.getBackLogQuotaMap().get(type.toString())));
topicPolicies.getTopicMaxMessageSize().updateTopicValue(data.getMaxMessageSize());
topicPolicies.getMessageTTLInSeconds().updateTopicValue(data.getMessageTTLInSeconds());
+ topicPolicies.getDelayedDeliveryEnabled().updateTopicValue(data.getDelayedDeliveryEnabled());
+ topicPolicies.getDelayedDeliveryTickTimeMillis().updateTopicValue(data.getDelayedDeliveryTickTimeMillis());
}
protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) {
@@ -189,6 +192,12 @@
.updateNamespaceValue(namespacePolicies.max_consumers_per_subscription);
topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(namespacePolicies.inactive_topic_policies);
topicPolicies.getDeduplicationEnabled().updateNamespaceValue(namespacePolicies.deduplicationEnabled);
+ topicPolicies.getDelayedDeliveryEnabled().updateNamespaceValue(
+ Optional.ofNullable(namespacePolicies.delayed_delivery_policies)
+ .map(DelayedDeliveryPolicies::isActive).orElse(null));
+ topicPolicies.getDelayedDeliveryTickTimeMillis().updateNamespaceValue(
+ Optional.ofNullable(namespacePolicies.delayed_delivery_policies)
+ .map(DelayedDeliveryPolicies::getTickTime).orElse(null));
topicPolicies.getSubscriptionTypesEnabled().updateNamespaceValue(
subTypeStringsToEnumSet(namespacePolicies.subscription_types_enabled));
Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach(
@@ -219,6 +228,8 @@
topicPolicies.getTopicMaxMessageSize().updateBrokerValue(config.getMaxMessageSize());
topicPolicies.getMessageTTLInSeconds().updateBrokerValue(config.getTtlDurationDefaultInSeconds());
+ topicPolicies.getDelayedDeliveryEnabled().updateBrokerValue(config.isDelayedDeliveryEnabled());
+ topicPolicies.getDelayedDeliveryTickTimeMillis().updateBrokerValue(config.getDelayedDeliveryTickTimeMillis());
}
private EnumSet<SubType> subTypeStringsToEnumSet(Set<String> getSubscriptionTypesEnabled) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index 990f2fc..46edd82 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -184,9 +184,7 @@
private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
private Optional<SubscribeRateLimiter> subscribeRateLimiter = Optional.empty();
- public volatile long delayedDeliveryTickTimeMillis = 1000;
private final long backloggedCursorThresholdEntries;
- public volatile boolean delayedDeliveryEnabled = false;
public static final int MESSAGE_RATE_BACKOFF_MS = 1000;
protected final MessageDeduplication messageDeduplication;
@@ -256,9 +254,6 @@
this.ledger = ledger;
this.subscriptions = new ConcurrentOpenHashMap<>(16, 1);
this.replicators = new ConcurrentOpenHashMap<>(16, 1);
- this.delayedDeliveryEnabled = brokerService.pulsar().getConfiguration().isDelayedDeliveryEnabled();
- this.delayedDeliveryTickTimeMillis =
- brokerService.pulsar().getConfiguration().getDelayedDeliveryTickTimeMillis();
this.backloggedCursorThresholdEntries =
brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
initializeRateLimiterIfNeeded(Optional.empty());
@@ -2446,10 +2441,6 @@
updateUnackedMessagesAppliedOnSubscription(data);
updateUnackedMessagesExceededOnConsumer(data);
- if (data.delayed_delivery_policies != null) {
- delayedDeliveryTickTimeMillis = data.delayed_delivery_policies.getTickTime();
- delayedDeliveryEnabled = data.delayed_delivery_policies.isActive();
- }
//If the topic-level policy already exists, the namespace-level policy cannot override the topic-level policy.
Optional<TopicPolicies> topicPolicies = getTopicPolicies();
@@ -3042,10 +3033,7 @@
}
public long getDelayedDeliveryTickTimeMillis() {
- //Topic level setting has higher priority than namespace level
- return getTopicPolicies()
- .map(TopicPolicies::getDelayedDeliveryTickTimeMillis)
- .orElse(delayedDeliveryTickTimeMillis);
+ return topicPolicies.getDelayedDeliveryTickTimeMillis().get();
}
public int getMaxUnackedMessagesOnConsumer() {
@@ -3053,10 +3041,7 @@
}
public boolean isDelayedDeliveryEnabled() {
- //Topic level setting has higher priority than namespace level
- return getTopicPolicies()
- .map(TopicPolicies::getDelayedDeliveryEnabled)
- .orElse(delayedDeliveryEnabled);
+ return topicPolicies.getDelayedDeliveryEnabled().get();
}
public int getMaxUnackedMessagesOnSubscription() {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
index 662c191..77901a6 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/policies/data/HierarchyTopicPolicies.java
@@ -42,8 +42,11 @@
final PolicyHierarchyValue<Integer> topicMaxMessageSize;
final PolicyHierarchyValue<Integer> messageTTLInSeconds;
final PolicyHierarchyValue<Integer> maxConsumerPerTopic;
+ final PolicyHierarchyValue<Boolean> delayedDeliveryEnabled;
+ final PolicyHierarchyValue<Long> delayedDeliveryTickTimeMillis;
final PolicyHierarchyValue<Integer> maxConsumersPerSubscription;
+
public HierarchyTopicPolicies() {
replicationClusters = new PolicyHierarchyValue<>();
deduplicationEnabled = new PolicyHierarchyValue<>();
@@ -59,5 +62,7 @@
.build();
topicMaxMessageSize = new PolicyHierarchyValue<>();
messageTTLInSeconds = new PolicyHierarchyValue<>();
+ delayedDeliveryEnabled = new PolicyHierarchyValue<>();
+ delayedDeliveryTickTimeMillis = new PolicyHierarchyValue<>();
}
}