Apache Pulsar's topic policies provide a way to configure topic-specific settings that override namespace-level policies. These policies are stored in system topics (specifically in the `__change_events` topic) and are managed by the `TopicPoliciesService`. Because the `TopicPoliciesService` interface is public, third-party implementations are possible, so this PIP is needed to change the interface, which is required to fix the consistency issues.
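As a brief illustration of the override behavior (the tenant, namespace, and topic names below are placeholders, not part of this PIP), a topic-level policy takes precedence over the namespace-level value for that one topic:

```java
import org.apache.pulsar.client.admin.PulsarAdmin;

public class TopicPolicyOverrideExample {
    public static void main(String[] args) throws Exception {
        // Illustrative only: names and the broker URL are placeholders.
        PulsarAdmin admin = PulsarAdmin.builder()
                .serviceHttpUrl("http://localhost:8080")
                .build();

        // Namespace-level default applied to every topic in the namespace
        admin.namespaces().setMaxConsumersPerTopic("my-tenant/my-namespace", 10);

        // Topic-level override for a single topic, persisted via the namespace's
        // __change_events system topic
        admin.topicPolicies().setMaxConsumers("persistent://my-tenant/my-namespace/orders", 50);

        admin.close();
    }
}
```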
Key concepts:

- Topic policies apply to a single topic and override the corresponding namespace-level policies.
- They are persisted as events in the `__change_events` system topic.
- They are managed through the public, pluggable `TopicPoliciesService` interface.

The current implementation has several consistency issues. The following example test demonstrates a lost update:
```java
@Test
public void testMultipleUpdates() throws Exception {
    String topic = "persistent://my-tenant/my-namespace/testtopic";
    admin.topics().createNonPartitionedTopic(topic);

    InactiveTopicPolicies inactiveTopicPolicies = new InactiveTopicPolicies();
    inactiveTopicPolicies.setDeleteWhileInactive(true);
    inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
    inactiveTopicPolicies.setMaxInactiveDurationSeconds(3600);
    admin.topicPolicies().setInactiveTopicPolicies(topic, inactiveTopicPolicies);

    DispatchRate dispatchRate = DispatchRate.builder()
            .dispatchThrottlingRateInMsg(1000)
            .dispatchThrottlingRateInByte(1000000)
            .build();
    admin.topicPolicies().setReplicatorDispatchRate(topic, dispatchRate);

    String clusterId = "test";
    admin.topics().setReplicationClusters(topic, List.of(clusterId));

    // wait for 1 second
    Thread.sleep(1000);

    // the first assertion fails
    assertEquals(admin.topicPolicies().getInactiveTopicPolicies(topic), inactiveTopicPolicies);
    assertEquals(admin.topicPolicies().getReplicatorDispatchRate(topic), dispatchRate);
    assertEquals(admin.topics().getReplicationClusters(topic, true), Set.of(clusterId));
}
```
The current topic policies implementation suffers from critical issues that affect production systems:
- **Data Corruption:** When multiple topic policies are updated within a short time window, some updates may be lost or corrupted. For example, setting replication clusters followed immediately by inactive topic policies can result in the replication clusters being null or incomplete.
- **Race Conditions:** The current implementation retrieves the entire `TopicPolicies` object, modifies it, and writes it back. Without proper synchronization, concurrent updates can overwrite each other's changes (a schematic of this race is shown below).
- **Thread Safety:** Unlike namespace policies (which were fixed in #9711), topic policies are still mutable and shared across threads without proper synchronization, leading to potential data corruption.
- **Poor Developer Experience:** Developers must add artificial delays between policy updates to work around these issues, which is error-prone and impacts performance.
These issues have been reported multiple times (#24393, #21303) and affect production deployments where automated policy management is common.
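To make the race concrete, here is a schematic of the current "get-modify-set" pattern, written against the soon-to-be-removed `updateTopicPoliciesAsync(TopicName, TopicPolicies)` signature; the getter call and variable names are illustrative, not the actual broker code. Two callers read the same snapshot, so the last write silently discards the other caller's change:

```java
// Schematic of the lost-update race in the current "get-modify-set" approach.
// Caller A and caller B both start from the same snapshot of the policies.
TopicPolicies snapshotA = topicPoliciesService.getTopicPolicies(topicName);
TopicPolicies snapshotB = topicPoliciesService.getTopicPolicies(topicName);

snapshotA.setMaxConsumerPerTopic(500);   // A's intended change
snapshotB.setMaxProducerPerTopic(10);    // B's intended change

// Both writes ship a full TopicPolicies object. Whichever lands last wins:
// if B's write is applied after A's, the maxConsumerPerTopic change is lost.
topicPoliciesService.updateTopicPoliciesAsync(topicName, snapshotA);
topicPoliciesService.updateTopicPoliciesAsync(topicName, snapshotB);
```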
The solution introduces several key changes to ensure thread-safe and consistent topic policy updates:
**Immutable TopicPolicies Objects:** Similar to the namespace policies fix, `TopicPolicies` instances will be made effectively immutable by adding a `clone()` method to `TopicPolicies` and to all nested policy objects, so that updates are always applied to a copy rather than to a shared instance.

**Functional Update API:** Replace the current "get-modify-set" pattern with a functional update approach:
```java
CompletableFuture<Void> updateTopicPoliciesAsync(
        TopicName topicName,
        boolean isGlobalPolicy,
        boolean skipUpdateWhenTopicPolicyDoesntExist,
        Consumer<TopicPolicies> policyUpdater);
```
This ensures that updates are applied atomically to a cloned copy of the current policies.
**Sequential Update Guarantees:** Implement a sequencer mechanism that ensures updates for the same topic and policy type (global/local) are processed in order (see the sequencer sketch below).

**Read-Your-Writes Consistency:** Ensure that policy updates don't complete until the resulting event has been read back by the system topic reader (see the message handler tracker below).
All policy objects will implement `Cloneable` and provide proper deep cloning:
```java
public class TopicPolicies implements Cloneable {

    @Override
    public TopicPolicies clone() {
        try {
            TopicPolicies cloned = (TopicPolicies) super.clone();
            // Deep clone all mutable fields
            if (this.backLogQuotaMap != null) {
                cloned.backLogQuotaMap = new HashMap<>();
                for (Map.Entry<String, BacklogQuotaImpl> entry : this.backLogQuotaMap.entrySet()) {
                    cloned.backLogQuotaMap.put(entry.getKey(),
                            entry.getValue() != null ? entry.getValue().clone() : null);
                }
            }
            // ... clone other fields
            return cloned;
        } catch (CloneNotSupportedException e) {
            // Cannot happen: the class implements Cloneable
            throw new AssertionError(e);
        }
    }
}
```
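For a nested policy object whose fields are all primitives or enums, the clone can be a plain shallow copy. A minimal sketch using `InactiveTopicPolicies` (its fields are inferred from the setters in the test above; getters and setters are omitted):

```java
// Sketch only: for a nested policy object with only primitive/enum fields,
// Object.clone()'s shallow copy is already a complete copy.
public class InactiveTopicPolicies implements Cloneable {
    private InactiveTopicDeleteMode inactiveTopicDeleteMode;
    private int maxInactiveDurationSeconds;
    private boolean deleteWhileInactive;

    @Override
    public InactiveTopicPolicies clone() {
        try {
            return (InactiveTopicPolicies) super.clone();
        } catch (CloneNotSupportedException e) {
            throw new AssertionError(e); // cannot happen: the class implements Cloneable
        }
    }
}
```

Container-valued fields (maps, lists, nested policy objects), as in `TopicPolicies` above, still need an explicit deep copy.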
The sequencer ensures updates are processed in order:
```java
private final ConcurrentHashMap<Pair<TopicName, Boolean>, CompletableFuture<Void>> topicPolicyUpdateSequencer =
        new ConcurrentHashMap<>();

private CompletableFuture<Void> updateTopicPoliciesAsync(...) {
    Pair<TopicName, Boolean> sequencerKey = Pair.of(topicName, isGlobalPolicy);
    return topicPolicyUpdateSequencer.compute(sequencerKey, (key, existingFuture) -> {
        CompletableFuture<Void> chain = (existingFuture == null || existingFuture.isDone())
                ? CompletableFuture.completedFuture(null)
                : existingFuture;
        return chain.thenCompose(v -> performUpdate(...));
    });
}
```
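For orientation, here is a minimal sketch of what the elided `performUpdate(...)` step could look like. The parameter list simply mirrors `updateTopicPoliciesAsync`, and `readLatestPolicies`, `publishPolicyEvent`, and `tracker` are hypothetical names standing in for the existing cache lookup, the system-topic writer, and the message handler tracker described below:

```java
// Sketch only: helper names are placeholders, not the final implementation.
// publishPolicyEvent is assumed to return CompletableFuture<MessageIdAdv>.
private CompletableFuture<Void> performUpdate(TopicName topicName,
                                              boolean isGlobalPolicy,
                                              boolean skipUpdateWhenTopicPolicyDoesntExist,
                                              Consumer<TopicPolicies> policyUpdater) {
    return readLatestPolicies(topicName, isGlobalPolicy).thenCompose(existing -> {
        if (existing == null && skipUpdateWhenTopicPolicyDoesntExist) {
            // Removing a setting from a policy that doesn't exist is a no-op.
            return CompletableFuture.completedFuture(null);
        }
        // Never mutate the cached instance: work on a new or cloned copy.
        TopicPolicies updated = (existing == null) ? new TopicPolicies() : existing.clone();
        policyUpdater.accept(updated);
        // Publish the updated policies to __change_events, then wait until the
        // system topic reader has processed that message (read-your-writes).
        return publishPolicyEvent(topicName, isGlobalPolicy, updated)
                .thenCompose(messageId -> {
                    CompletableFuture<Void> handled = new CompletableFuture<>();
                    tracker.addPendingFuture(messageId, handled);
                    return handled;
                });
    });
}
```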
Track when policy updates have been processed by the reader:
```java
private static class TopicPolicyMessageHandlerTracker {
    private List<MessageIdAdv> lastHandledMessageIds = new ArrayList<>();
    private List<PriorityQueue<PendingMessageFuture>> pendingFutures = new ArrayList<>();

    public synchronized void handleMessageId(MessageIdAdv messageId) {
        // Update last handled message ID
        // Complete any pending futures for messages up to this ID
    }

    public synchronized void addPendingFuture(MessageIdAdv messageId, CompletableFuture<Void> future) {
        // Add future to wait for this message ID to be processed
    }
}
```
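The two halves meet as follows: the writer registers a pending future for the message id it just published (as in the `performUpdate` sketch above), and the system topic reader reports each processed message id back to the tracker, which completes every pending future up to that id. A reader-side sketch, with `applyEventToCache` as a hypothetical stand-in for the existing cache update:

```java
// Sketch only: invoked by the __change_events reader for each policy event.
private void handlePolicyEvent(Message<PulsarEvent> msg) {
    // Apply the event to the in-memory topic policies cache (hypothetical helper).
    applyEventToCache(msg.getValue());
    // Report progress so that pending writer futures up to this message id complete,
    // giving the writer read-your-writes consistency.
    tracker.handleMessageId((MessageIdAdv) msg.getMessageId());
}
```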
The admin API implementations will be updated to use the new functional approach:
```java
// Before:
TopicPolicies policies = getTopicPolicies(topicName);
policies.setMaxConsumerPerTopic(maxConsumers);
updateTopicPoliciesAsync(topicName, policies);

// After:
updateTopicPoliciesAsync(topicName, isGlobal, false, policies -> {
    policies.setMaxConsumerPerTopic(maxConsumers);
});
```
The main change is to the `TopicPoliciesService` interface:
```java
public interface TopicPoliciesService {
    //...

    /**
     * Update policies for a topic asynchronously.
     * The policyUpdater will be called with a TopicPolicies object
     * (either newly created or cloned from existing) which can be safely mutated.
     *
     * @param topicName topic name
     * @param isGlobalPolicy true if updating global policy, false for local
     * @param skipUpdateWhenTopicPolicyDoesntExist when true, skips the update if the topic policy does not already
     *                                             exist. This is useful for cases when the policyUpdater is removing
     *                                             a setting in the policy.
     * @param policyUpdater a function that modifies the TopicPolicies
     * @return a CompletableFuture that completes when the update has been
     *         processed with read-your-writes consistency
     */
    CompletableFuture<Void> updateTopicPoliciesAsync(
            TopicName topicName,
            boolean isGlobalPolicy,
            boolean skipUpdateWhenTopicPolicyDoesntExist,
            Consumer<TopicPolicies> policyUpdater);

    //...
}
```
The previous method signature `CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName, TopicPolicies policies)` has been removed, since it is not possible to guarantee thread safety and consistency when the caller supplies the complete `TopicPolicies` object at once. In the changed method, the `policyUpdater` function receives a cloned instance of the current policies, which it can safely modify to apply only a specific change. The `skipUpdateWhenTopicPolicyDoesntExist` parameter allows skipping the update if the topic policy does not exist. This is useful when removing or nullifying specific policy settings, where creating a new policy is unnecessary if one doesn't already exist. Examples of this case can be seen in many of the remove methods:
```java
// skipUpdateWhenTopicPolicyDoesntExist is set to true when removing max producers
protected CompletableFuture<Void> internalRemoveMaxProducers(boolean isGlobal) {
    return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal, true, policies -> {
        policies.setMaxProducerPerTopic(null);
    });
}

// skipUpdateWhenTopicPolicyDoesntExist is set to true when offloadPoliciesToSet is null
protected CompletableFuture<Void> internalSetOffloadPolicies(OffloadPoliciesImpl offloadPoliciesToSet, boolean isGlobal) {
    return pulsar().getTopicPoliciesService()
            .updateTopicPoliciesAsync(topicName, isGlobal, offloadPoliciesToSet == null, policies -> {
                policies.setOffloadPolicies(offloadPoliciesToSet);
            });
}
```
This change does not introduce new security concerns.
The following alternative approaches were considered and rejected:

- Rejected: would add significant latency and complexity.
- Rejected: overkill for this use case and would require major architectural changes.
- Rejected: poor developer experience and doesn't fix the root cause.
- Rejected: would break existing client APIs and require significant changes.
This fix broadly follows the pattern successfully used for namespace policies in #9711. The major difference is that topic policies are persisted in system topics, which do not provide an equivalent way to achieve consistency, hence the update sequencer and read-your-writes tracking introduced here.