PIP-428: Change TopicPoliciesService interface to fix consistency issues

Background knowledge

Apache Pulsar's topic policies provide a way to configure topic-specific settings that override namespace-level policies. These policies are stored in system topics (specifically in the __change_events topic) and are managed by the TopicPoliciesService. Third-party implementations of TopicPoliciesService are possible, and because the interface is public, the interface change required to fix the consistency issues needs this PIP.

Key concepts:

  • Topic Policies: Configuration settings specific to a topic (e.g., retention, backlog quotas, dispatch rates)
  • System Topics: Special topics used internally by Pulsar to store metadata and configurations
  • TopicPoliciesService: The service responsible for reading, writing, and caching topic policies
  • PulsarEvent: The event format used to store topic policy changes in system topics

The current implementation has several issues:

  1. Thread Safety: Topic policies are cached, and these cached instances are mutable in a non-thread-safe way
  2. Race Conditions: When multiple policy updates are made in quick succession, they can overwrite each other
  3. Consistency Issues: The lack of proper synchronization can lead to corrupted or lost policy updates

Example test code that demonstrates a lost update:

    @Test
    public void testMultipleUpdates() throws Exception {
        String topic = "persistent://my-tenant/my-namespace/testtopic";
        admin.topics().createNonPartitionedTopic(topic);

        InactiveTopicPolicies inactiveTopicPolicies = new InactiveTopicPolicies();
        inactiveTopicPolicies.setDeleteWhileInactive(true);
        inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
        inactiveTopicPolicies.setMaxInactiveDurationSeconds(3600);
        admin.topicPolicies().setInactiveTopicPolicies(topic, inactiveTopicPolicies);

        DispatchRate dispatchRate = DispatchRate
                .builder()
                .dispatchThrottlingRateInMsg(1000)
                .dispatchThrottlingRateInByte(1000000)
                .build();
        admin.topicPolicies().setReplicatorDispatchRate(topic, dispatchRate);

        String clusterId = "test";
        admin.topics().setReplicationClusters(topic, List.of(clusterId));

        // wait for 1 second
        Thread.sleep(1000);

        // the first assertion fails
        assertEquals(admin.topicPolicies().getInactiveTopicPolicies(topic), inactiveTopicPolicies);
        assertEquals(admin.topicPolicies().getReplicatorDispatchRate(topic), dispatchRate);
        assertEquals(admin.topics().getReplicationClusters(topic, true), Set.of(clusterId));
    }
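
The failing first assertion is consistent with a write pattern in which the whole policies object is read, changed and written back. The following is a minimal sketch of that mechanism, not Pulsar code: LostUpdateSketch, its map-based policy model, and the setting names used as keys are hypothetical stand-ins. Two updates start from the same snapshot, each changes one setting, and each writes the whole object back.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a policy store; this is not Pulsar code.
class LostUpdateSketch {
    // The stored policies for one topic, modeled as a plain map of settings.
    private Map<String, Object> stored = new HashMap<>();

    // Each writer receives a full copy of what is currently stored.
    Map<String, Object> read() {
        return new HashMap<>(stored);
    }

    // Writing back replaces the whole object, not just the changed setting.
    void writeWhole(Map<String, Object> policies) {
        stored = new HashMap<>(policies);
    }

    static Map<String, Object> run() {
        LostUpdateSketch store = new LostUpdateSketch();

        // Two updates start from the same (empty) snapshot.
        Map<String, Object> snapshotA = store.read();
        Map<String, Object> snapshotB = store.read();

        snapshotA.put("inactiveTopicPolicies", "delete_when_subscriptions_caught_up");
        store.writeWhole(snapshotA);

        // B never saw A's change, so writing B's whole object erases it.
        snapshotB.put("replicatorDispatchRate", 1000);
        store.writeWhole(snapshotB);

        return store.read();
    }
}
```

In this sketch the map returned by run() contains only the replicatorDispatchRate entry; the earlier inactiveTopicPolicies update is silently lost.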

Motivation

The current topic policies implementation suffers from critical issues that affect production systems:

  1. Data Corruption: When multiple topic policies are updated within a short time window, some updates may be lost or corrupted. For example, setting replication clusters followed immediately by inactive topic policies can result in the replication clusters being null or incomplete.

  2. Race Conditions: The current implementation retrieves the entire TopicPolicies object, modifies it, and writes it back. Without proper synchronization, concurrent updates can overwrite each other's changes.

  3. Thread Safety: Unlike namespace policies (which were fixed in #9711), topic policies are still mutable and shared across threads without proper synchronization, leading to potential data corruption.

  4. Poor Developer Experience: Developers must add artificial delays between policy updates to work around these issues, which is error-prone and impacts performance.

These issues have been reported multiple times (#24393, #21303) and affect production deployments where automated policy management is common.

Goals

In Scope

  • Implement thread-safe topic policy updates that prevent data corruption
  • Ensure sequential policy updates are processed correctly without requiring artificial delays
  • Provide atomic update operations that modify only the specific policy fields being changed
  • Maintain backward compatibility with existing TopicPoliciesService consumers
  • Implement proper synchronization for concurrent policy updates
  • Add read-your-writes consistency guarantees for policy updates

Out of Scope

  • Changing the underlying storage mechanism for topic policies (system topics)
  • Modifying the wire protocol or client APIs
  • Implementing transactional updates across multiple topics
  • Changing how policies are inherited from namespace to topic level

High Level Design

The solution introduces several key changes to ensure thread-safe and consistent topic policy updates:

  1. Immutable TopicPolicies Objects: Similar to the namespace policies fix, TopicPolicies instances will be made effectively immutable by:

    • Adding a clone() method to TopicPolicies and all nested policy objects
    • Never modifying cached/shared instances directly
    • Always working with cloned copies when updates are needed
  2. Functional Update API: Replace the current “get-modify-set” pattern with a functional update approach:

    CompletableFuture<Void> updateTopicPoliciesAsync(
        TopicName topicName,
        boolean isGlobalPolicy,
        boolean skipUpdateWhenTopicPolicyDoesntExist,
        Consumer<TopicPolicies> policyUpdater
    );
    

    This ensures that updates are applied atomically to a cloned copy of the current policies.

  3. Sequential Update Guarantees: Implement a sequencer mechanism that ensures updates for the same topic and policy type (global/local) are processed in order:

    • Each topic+policyType combination has its own update queue, implemented as a chain of CompletableFutures so that updates run in the order they were submitted
    • The queue is cleaned up when no updates are pending
  4. Read-Your-Writes Consistency: Ensure that policy updates don't complete until they have been read by the system topic reader:

    • Track message IDs of policy updates
    • Wait for the reader to process each update before completing the operation
    • This prevents the race condition where subsequent reads might not see recent updates
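
The functional update approach in item 2 can be illustrated with a small in-memory sketch. It rests on stated assumptions: SketchPolicies and FunctionalUpdateSketch are hypothetical names, and a ConcurrentHashMap stands in for the policy cache, whereas the actual service persists updates to a system topic. The point shown is only that the updater mutates a private copy and that previously published instances are never modified.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical, simplified policy object with two settings.
class SketchPolicies {
    Integer maxConsumers;
    Integer maxProducers;

    SketchPolicies copy() {
        SketchPolicies c = new SketchPolicies();
        c.maxConsumers = maxConsumers;
        c.maxProducers = maxProducers;
        return c;
    }
}

// Hypothetical in-memory store; an assumption made for illustration only.
class FunctionalUpdateSketch {
    private final ConcurrentHashMap<String, SketchPolicies> cache = new ConcurrentHashMap<>();

    // Applies the updater to a copy of the current value, then publishes the copy.
    // ConcurrentHashMap.compute() runs the remapping function atomically per key.
    void update(String topic, Consumer<SketchPolicies> updater) {
        cache.compute(topic, (key, current) -> {
            SketchPolicies copy = (current == null) ? new SketchPolicies() : current.copy();
            updater.accept(copy);
            return copy;
        });
    }

    SketchPolicies get(String topic) {
        return cache.get(topic);
    }
}
```

Because each update only touches the setting it cares about, a later update to maxProducers keeps an earlier maxConsumers value instead of overwriting it with a stale snapshot.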

Detailed Design

Design & Implementation Details

1. TopicPolicies Cloning

All policy objects will implement Cloneable and provide proper deep cloning:

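public class TopicPolicies implements Cloneable {
    @Override
    public TopicPolicies clone() {
        try {
            TopicPolicies cloned = (TopicPolicies) super.clone();
            // Deep clone all mutable fields so the copy shares no state with this instance
            if (this.backLogQuotaMap != null) {
                cloned.backLogQuotaMap = new HashMap<>();
                for (Map.Entry<String, BacklogQuotaImpl> entry : this.backLogQuotaMap.entrySet()) {
                    cloned.backLogQuotaMap.put(entry.getKey(),
                        entry.getValue() != null ? entry.getValue().clone() : null);
                }
            }
            // ... clone other fields
            return cloned;
        } catch (CloneNotSupportedException e) {
            // Object.clone() declares this checked exception; it cannot occur for a Cloneable class
            throw new IllegalStateException("Failed to clone TopicPolicies", e);
        }
    }
}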
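
To show why the nested objects must be cloned as well, here is a self-contained sketch with simplified types. QuotaSketch and ClonePoliciesSketch are hypothetical stand-ins for BacklogQuotaImpl and TopicPolicies, and the field names are assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical nested policy value, standing in for a class such as BacklogQuotaImpl.
class QuotaSketch implements Cloneable {
    long limitBytes;

    QuotaSketch(long limitBytes) {
        this.limitBytes = limitBytes;
    }

    @Override
    public QuotaSketch clone() {
        try {
            // Only a primitive field, so the shallow copy from Object.clone() is enough.
            return (QuotaSketch) super.clone();
        } catch (CloneNotSupportedException ex) {
            throw new IllegalStateException(ex);
        }
    }
}

// Hypothetical policies holder with one mutable map field.
class ClonePoliciesSketch implements Cloneable {
    Map<String, QuotaSketch> quotas = new HashMap<>();

    @Override
    public ClonePoliciesSketch clone() {
        try {
            ClonePoliciesSketch cloned = (ClonePoliciesSketch) super.clone();
            // super.clone() copies the map reference; replace it with an independent map.
            cloned.quotas = new HashMap<>();
            for (Map.Entry<String, QuotaSketch> entry : quotas.entrySet()) {
                QuotaSketch value = entry.getValue();
                cloned.quotas.put(entry.getKey(), value == null ? null : value.clone());
            }
            return cloned;
        } catch (CloneNotSupportedException ex) {
            throw new IllegalStateException(ex);
        }
    }
}
```

With this deep copy, changing a quota or adding an entry on the clone leaves the original instance untouched, which is the property the cached instances rely on.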

2. Update Sequencer

The sequencer ensures updates are processed in order:

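// One chain of pending updates per (topic, isGlobalPolicy) key
private final ConcurrentHashMap<Pair<TopicName, Boolean>, CompletableFuture<Void>>
    topicPolicyUpdateSequencer = new ConcurrentHashMap<>();

private CompletableFuture<Void> updateTopicPoliciesAsync(...) {
    Pair<TopicName, Boolean> sequencerKey = Pair.of(topicName, isGlobalPolicy);

    // compute() atomically replaces the tail of the chain for this key
    return topicPolicyUpdateSequencer.compute(sequencerKey, (key, existingFuture) -> {
        // Start a new chain if nothing is pending; otherwise append to the pending one
        CompletableFuture<Void> chain = (existingFuture == null || existingFuture.isDone())
            ? CompletableFuture.completedFuture(null)
            : existingFuture;

        // The new update runs only after the previous one in the chain has completed
        return chain.thenCompose(v -> performUpdate(...));
    });
}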
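
The chaining and cleanup behavior can be sketched in a runnable form under a few assumptions: UpdateSequencerSketch, submit and pendingKeys are hypothetical names, a String key replaces the Pair of topic name and policy type, and the update is passed in as a Supplier. The cleanup step shown here is one possible way to remove idle entries and is not taken from the proposal.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical per-key sequencer built on chained CompletableFutures.
class UpdateSequencerSketch {
    // Maps each key to the tail of its chain of pending updates.
    private final ConcurrentHashMap<String, CompletableFuture<Void>> tails =
            new ConcurrentHashMap<>();

    CompletableFuture<Void> submit(String key, Supplier<CompletableFuture<Void>> update) {
        CompletableFuture<Void> result = tails.compute(key, (k, existing) -> {
            // Start a fresh chain when nothing is pending, otherwise append.
            CompletableFuture<Void> chain = (existing == null || existing.isDone())
                    ? CompletableFuture.completedFuture(null)
                    : existing;
            // The update starts only after the previous tail has completed.
            return chain.thenCompose(v -> update.get());
        });
        // Assumed cleanup: drop the entry once done, but only if it is still the tail.
        result.whenComplete((v, err) -> tails.remove(key, result));
        return result;
    }

    int pendingKeys() {
        return tails.size();
    }
}
```

One property of plain thenCompose chaining is worth noting: if an update that is still pending later fails, the updates queued behind it complete exceptionally without running. Whether that is desirable, or whether failures should be isolated per update, is a design decision this sketch does not settle.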

3. Message Tracking for Consistency

Track when policy updates have been processed by the reader:

private static class TopicPolicyMessageHandlerTracker {
    private List<MessageIdAdv> lastHandledMessageIds = new ArrayList<>();
    private List<PriorityQueue<PendingMessageFuture>> pendingFutures = new ArrayList<>();

    public synchronized void handleMessageId(MessageIdAdv messageId) {
        // Update last handled message ID
        // Complete any pending futures for messages up to this ID
    }

    public synchronized void addPendingFuture(MessageIdAdv messageId, CompletableFuture<Void> future) {
        // Add future to wait for this message ID to be processed
    }
}
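
The two elided method bodies could work roughly as follows. This is a simplified sketch under stated assumptions: a single long position replaces MessageIdAdv, only one partition is tracked instead of a list, and HandledPositionTrackerSketch, handle and whenHandled are hypothetical names.

```java
import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.concurrent.CompletableFuture;

// Hypothetical single-partition tracker; positions are plain longs for illustration.
class HandledPositionTrackerSketch {
    // A future waiting for the reader to reach a given position.
    private static final class Pending {
        final long position;
        final CompletableFuture<Void> future;

        Pending(long position, CompletableFuture<Void> future) {
            this.position = position;
            this.future = future;
        }
    }

    private long lastHandled = -1;
    // Ordered by position so the earliest waiters are completed first.
    private final PriorityQueue<Pending> pending =
            new PriorityQueue<>(Comparator.comparingLong((Pending p) -> p.position));

    // Called by the reader after it has applied the message at this position.
    synchronized void handle(long position) {
        lastHandled = Math.max(lastHandled, position);
        while (!pending.isEmpty() && pending.peek().position <= lastHandled) {
            pending.poll().future.complete(null);
        }
    }

    // Called by the writer after publishing; completes once the reader has caught up.
    synchronized CompletableFuture<Void> whenHandled(long position) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        if (position <= lastHandled) {
            future.complete(null); // the reader is already past this position
        } else {
            pending.add(new Pending(position, future));
        }
        return future;
    }
}
```

In this sketch the futures are completed while the monitor is held, so any callbacks attached to them run under the lock; a production implementation might prefer to collect the futures and complete them after leaving the synchronized block.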

4. Updated Admin API Usage

The admin API implementations will be updated to use the new functional approach:

// Before:
TopicPolicies policies = getTopicPolicies(topicName);
policies.setMaxConsumerPerTopic(maxConsumers);
updateTopicPoliciesAsync(topicName, policies);

// After:
updateTopicPoliciesAsync(topicName, isGlobal, false, policies -> {
    policies.setMaxConsumerPerTopic(maxConsumers);
});

Public-facing Changes

Public API

The main change is to the TopicPoliciesService interface:

public interface TopicPoliciesService {
    //...
    /**
     * Update policies for a topic asynchronously.
     * The policyUpdater will be called with a TopicPolicies object
     * (either newly created or cloned from existing) which can be safely mutated.
     *
     * @param topicName       topic name
     * @param isGlobalPolicy  true if updating global policy, false for local
     * @param skipUpdateWhenTopicPolicyDoesntExist when true, skips the update if the topic policy does not already
     *                                             exist. This is useful for cases when the policyUpdater is removing
     *                                             a setting in the policy.
     * @param policyUpdater   a function that modifies the TopicPolicies
     * @return a CompletableFuture that completes when the update has been
     *         processed with read-your-writes consistency
     */
    CompletableFuture<Void> updateTopicPoliciesAsync(
        TopicName topicName,
        boolean isGlobalPolicy,
        boolean skipUpdateWhenTopicPolicyDoesntExist,
        Consumer<TopicPolicies> policyUpdater
    );
   //...
}

The previous method signature CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName, TopicPolicies policies) has been removed, because thread safety and consistency cannot be guaranteed when the complete TopicPolicies object is passed in and written as a whole. In the changed method, the policyUpdater function receives a cloned instance of the current policies, which it can safely modify to apply only a specific change.

The skipUpdateWhenTopicPolicyDoesntExist parameter allows skipping the update if the topic policy does not exist. This is useful when removing or nullifying specific policy settings, where creating a new policy is unnecessary if one doesn't already exist.

Many of the remove methods are examples of this case:

    // skipUpdateWhenTopicPolicyDoesntExist is set to true when removing max producers
    protected CompletableFuture<Void> internalRemoveMaxProducers(boolean isGlobal) {
        return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal, true, policies -> {
            policies.setMaxProducerPerTopic(null);
        });
    }

    // skipUpdateWhenTopicPolicyDoesntExist is set to true when offloadPoliciesToSet is null
    protected CompletableFuture<Void> internalSetOffloadPolicies(OffloadPoliciesImpl offloadPoliciesToSet,
                                                                boolean isGlobal) {
        return pulsar().getTopicPoliciesService()
              .updateTopicPoliciesAsync(topicName, isGlobal, offloadPoliciesToSet == null, policies -> {
                 policies.setOffloadPolicies(offloadPoliciesToSet);
              });
    }
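
The effect of the flag can be sketched with an in-memory store. SkipUpdateSketch, its nested Policies class, and the boolean return value are hypothetical and exist only to make the behavior observable; the interface method in this proposal returns a CompletableFuture<Void>.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical in-memory model of the skip-when-missing behavior.
class SkipUpdateSketch {
    static class Policies {
        Integer maxProducers;

        Policies copy() {
            Policies c = new Policies();
            c.maxProducers = maxProducers;
            return c;
        }
    }

    private final ConcurrentHashMap<String, Policies> cache = new ConcurrentHashMap<>();

    // Returns true if a policies object was written, false if the update was skipped.
    boolean update(String topic, boolean skipWhenMissing, Consumer<Policies> updater) {
        boolean[] written = {false};
        cache.compute(topic, (key, current) -> {
            if (current == null && skipWhenMissing) {
                // Nothing is stored, so there is nothing to remove: leave the entry absent.
                return null;
            }
            Policies copy = (current == null) ? new Policies() : current.copy();
            updater.accept(copy);
            written[0] = true;
            return copy;
        });
        return written[0];
    }

    boolean exists(String topic) {
        return cache.containsKey(topic);
    }

    Policies get(String topic) {
        return cache.get(topic);
    }
}
```

Removing a setting from a topic that has no policies therefore writes nothing, while the same removal on a topic with existing policies is applied to a copy and stored.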

Security Considerations

This change does not introduce new security concerns:

  • The update mechanism respects existing authorization checks
  • No new APIs or endpoints are exposed
  • The functional update pattern actually improves security by preventing accidental policy corruption

Backward & Forward Compatibility

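  • The change is backward compatible for Pulsar clients and admin API users, as it's internal to the broker. Third-party TopicPoliciesService implementations must adapt to the changed updateTopicPoliciesAsync signature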
  • No special upgrade steps required

Alternatives

Pessimistic Locking: Use distributed locks for each topic's policies

Rejected: Would add significant latency and complexity

Transactional Updates: Implement full ACID transactions for policy updates

Rejected: Overkill for this use case and would require major architectural changes

Client-side Retries: Document the issue and require clients to implement retry logic

Rejected: Poor developer experience and doesn't fix the root cause

Optimistic Concurrency Control: Use version numbers for policies

Rejected: Would break existing client APIs and require significant changes

General Notes

This fix follows broadly the same pattern that was used successfully for namespace policies in #9711. The major difference is that topic policies are persisted in system topics, so the ways of achieving consistency that are available for namespace policies don't apply directly.

Links