blob: 81e2a7849148d827b3be914b4d9e19c3c52abf25 [file] [log] [blame] [view]
# PIP-428: Change TopicPoliciesService interface to fix consistency issues
# Background knowledge
Apache Pulsar's topic policies provide a way to configure topic-specific settings that override namespace-level policies. These policies are stored in system topics (specifically in the `__change_events` topic) and are managed by the `TopicPoliciesService`.
It's possible to have a third-party implementation of `TopicPoliciesService`, and since the interface is public, this PIP is needed to change the interface, which is required to fix the consistency issues.
**Key concepts:**
- **Topic Policies**: Configuration settings specific to a topic (e.g., retention, backlog quotas, dispatch rates)
- **System Topics**: Special topics used internally by Pulsar to store metadata and configurations
- **TopicPoliciesService**: The service responsible for reading, writing, and caching topic policies
- **PulsarEvent**: The event format used to store topic policy changes in system topics
The current implementation has several issues:
1. **Thread Safety**: Topic policies are cached, and these cached instances are mutable in a non-thread-safe way
2. **Race Conditions**: When multiple policy updates are made in quick succession, they can overwrite each other
3. **Consistency Issues**: The lack of proper synchronization can lead to corrupted or lost policy updates
Example test code that demonstrates a lost update:
```java
@Test
public void testMultipleUpdates() throws Exception {
String topic = "persistent://my-tenant/my-namespace/testtopic";
admin.topics().createNonPartitionedTopic(topic);
InactiveTopicPolicies inactiveTopicPolicies = new InactiveTopicPolicies();
inactiveTopicPolicies.setDeleteWhileInactive(true);
inactiveTopicPolicies.setInactiveTopicDeleteMode(InactiveTopicDeleteMode.delete_when_subscriptions_caught_up);
inactiveTopicPolicies.setMaxInactiveDurationSeconds(3600);
admin.topicPolicies().setInactiveTopicPolicies(topic, inactiveTopicPolicies);
DispatchRate dispatchRate = DispatchRate
.builder()
.dispatchThrottlingRateInMsg(1000)
.dispatchThrottlingRateInByte(1000000)
.build();
admin.topicPolicies().setReplicatorDispatchRate(topic, dispatchRate);
String clusterId = "test";
admin.topics().setReplicationClusters(topic, List.of(clusterId));
// wait for 1 second
Thread.sleep(1000);
// the first assertion fails
assertEquals(admin.topicPolicies().getInactiveTopicPolicies(topic), inactiveTopicPolicies);
assertEquals(admin.topicPolicies().getReplicatorDispatchRate(topic), dispatchRate);
assertEquals(admin.topics().getReplicationClusters(topic, true), Set.of(clusterId));
}
```
# Motivation
The current topic policies implementation suffers from critical issues that affect production systems:
1. **Data Corruption**: When multiple topic policies are updated within a short time window, some updates may be lost or corrupted. For example, setting replication clusters followed immediately by inactive topic policies can result in the replication clusters being null or incomplete.
2. **Race Conditions**: The current implementation retrieves the entire TopicPolicies object, modifies it, and writes it back. Without proper synchronization, concurrent updates can overwrite each other's changes.
3. **Thread Safety**: Unlike namespace policies (which were fixed in #9711), topic policies are still mutable and shared across threads without proper synchronization, leading to potential data corruption.
4. **Poor Developer Experience**: Developers must add artificial delays between policy updates to work around these issues, which is error-prone and impacts performance.
These issues have been reported multiple times (#24393, #21303) and affect production deployments where automated policy management is common.
# Goals
## In Scope
- Implement thread-safe topic policy updates that prevent data corruption
- Ensure sequential policy updates are processed correctly without requiring artificial delays
- Provide atomic update operations that modify only the specific policy fields being changed
- Maintain backward compatibility with existing TopicPoliciesService consumers
- Implement proper synchronization for concurrent policy updates
- Add read-your-writes consistency guarantees for policy updates
## Out of Scope
- Changing the underlying storage mechanism for topic policies (system topics)
- Modifying the wire protocol or client APIs
- Implementing transactional updates across multiple topics
- Changing how policies are inherited from namespace to topic level
# High Level Design
The solution introduces several key changes to ensure thread-safe and consistent topic policy updates:
1. **Immutable TopicPolicies Objects**: Similar to the namespace policies fix, TopicPolicies instances will be made effectively immutable by:
- Adding a `clone()` method to TopicPolicies and all nested policy objects
- Never modifying cached/shared instances directly
- Always working with cloned copies when updates are needed
2. **Functional Update API**: Replace the current "get-modify-set" pattern with a functional update approach:
```java
CompletableFuture<Void> updateTopicPoliciesAsync(
TopicName topicName,
boolean isGlobalPolicy,
boolean skipUpdateWhenTopicPolicyDoesntExist,
Consumer<TopicPolicies> policyUpdater
);
```
This ensures that updates are applied atomically to a cloned copy of the current policies.
3. **Sequential Update Guarantees**: Implement a sequencer mechanism that ensures updates for the same topic and policy type (global/local) are processed in order:
- Each topic+policyType combination has its own update queue in the form of chained CompletableFutures
- Updates are chained using CompletableFutures to maintain order
- The queue is cleaned up when no updates are pending
4. **Read-Your-Writes Consistency**: Ensure that policy updates don't complete until they have been read by the system topic reader:
- Track message IDs of policy updates
- Wait for the reader to process each update before completing the operation
- This prevents the race condition where subsequent reads might not see recent updates
# Detailed Design
## Design & Implementation Details
### 1. TopicPolicies Cloning
All policy objects will implement `Cloneable` and provide proper deep cloning:
```java
public class TopicPolicies implements Cloneable {
@Override
public TopicPolicies clone() {
TopicPolicies cloned = (TopicPolicies) super.clone();
// Deep clone all mutable fields
if (this.backLogQuotaMap != null) {
cloned.backLogQuotaMap = new HashMap<>();
for (Map.Entry<String, BacklogQuotaImpl> entry : this.backLogQuotaMap.entrySet()) {
cloned.backLogQuotaMap.put(entry.getKey(),
entry.getValue() != null ? entry.getValue().clone() : null);
}
}
// ... clone other fields
return cloned;
}
}
```
### 2. Update Sequencer
The sequencer ensures updates are processed in order:
```java
private final ConcurrentHashMap<Pair<TopicName, Boolean>, CompletableFuture<Void>>
topicPolicyUpdateSequencer = new ConcurrentHashMap<>();
private CompletableFuture<Void> updateTopicPoliciesAsync(...) {
Pair<TopicName, Boolean> sequencerKey = Pair.of(topicName, isGlobalPolicy);
return topicPolicyUpdateSequencer.compute(sequencerKey, (key, existingFuture) -> {
CompletableFuture<Void> chain = (existingFuture == null || existingFuture.isDone())
? CompletableFuture.completedFuture(null)
: existingFuture;
return chain.thenCompose(v -> performUpdate(...));
});
}
```
### 3. Message Tracking for Consistency
Track when policy updates have been processed by the reader:
```java
private static class TopicPolicyMessageHandlerTracker {
private List<MessageIdAdv> lastHandledMessageIds = new ArrayList<>();
private List<PriorityQueue<PendingMessageFuture>> pendingFutures = new ArrayList<>();
public synchronized void handleMessageId(MessageIdAdv messageId) {
// Update last handled message ID
// Complete any pending futures for messages up to this ID
}
public synchronized void addPendingFuture(MessageIdAdv messageId, CompletableFuture<Void> future) {
// Add future to wait for this message ID to be processed
}
}
```
### 4. Updated Admin API Usage
The admin API implementations will be updated to use the new functional approach:
```java
// Before:
TopicPolicies policies = getTopicPolicies(topicName);
policies.setMaxConsumerPerTopic(maxConsumers);
updateTopicPoliciesAsync(topicName, policies);
// After:
updateTopicPoliciesAsync(topicName, isGlobal, false, policies -> {
policies.setMaxConsumerPerTopic(maxConsumers);
});
```
## Public-facing Changes
### Public API
The main change is to the `TopicPoliciesService` interface:
```java
public interface TopicPoliciesService {
//...
/**
* Update policies for a topic asynchronously.
* The policyUpdater will be called with a TopicPolicies object
* (either newly created or cloned from existing) which can be safely mutated.
*
* @param topicName topic name
* @param isGlobalPolicy true if updating global policy, false for local
* @param skipUpdateWhenTopicPolicyDoesntExist when true, skips the update if the topic policy does not already
* exist. This is useful for cases when the policyUpdater is removing
* a setting in the policy.
* @param policyUpdater a function that modifies the TopicPolicies
* @return a CompletableFuture that completes when the update has been
* processed with read-your-writes consistency
*/
CompletableFuture<Void> updateTopicPoliciesAsync(
TopicName topicName,
boolean isGlobalPolicy,
boolean skipUpdateWhenTopicPolicyDoesntExist,
Consumer<TopicPolicies> policyUpdater
);
//...
}
```
The previous method signature
`CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName, TopicPolicies policies)` has been removed since
it's not possible to guarantee thread safety and consistency when the complete `TopicPolicies` object is mutated at
once.
In the changed method, the `policyUpdater` function receives a cloned instance of the current policies, which can be
safely modified, applying only a specific change.
The `skipUpdateWhenTopicPolicyDoesntExist` parameter allows skipping the update if the topic policy does not exist. This
is useful when removing or nullifying specific policy settings where creating a new policy is unnecessary if one doesn't
already exist.
An example of this case can be seen in many of the remove methods.
```java
// skipUpdateWhenTopicPolicyDoesntExist is set to true when removing max producers
protected CompletableFuture<Void> internalRemoveMaxProducers(boolean isGlobal) {
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, isGlobal, true, policies -> {
policies.setMaxProducerPerTopic(null);
});
}
// skipUpdateWhenTopicPolicyDoesntExist is set to true when offloadPoliciesToSet is null
protected CompletableFuture<Void> internalSetOffloadPolicies(OffloadPoliciesImpl offloadPoliciesToSet,
boolean isGlobal) {
return pulsar().getTopicPoliciesService()
.updateTopicPoliciesAsync(topicName, isGlobal, offloadPoliciesToSet == null, policies -> {
policies.setOffloadPolicies(offloadPoliciesToSet);
});
}
```
## Security Considerations
This change does not introduce new security concerns:
- The update mechanism respects existing authorization checks
- No new APIs or endpoints are exposed
- The functional update pattern actually improves security by preventing accidental policy corruption
## Backward & Forward Compatibility
- The change is backward compatible as it's internal to the broker
- No special upgrade steps required
## Alternatives
### Pessimistic Locking: Use distributed locks for each topic's policies
Rejected: Would add significant latency and complexity
### Transactional Updates: Implement full ACID transactions for policy updates
Rejected: Overkill for this use case and would require major architectural changes
### Client-side Retries: Document the issue and require clients to implement retry logic
Rejected: Poor developer experience and doesn't fix the root cause
### Optimistic Concurrency Control: Use version numbers for policies
Rejected: Would break existing client APIs and require significant changes
## General Notes
This fix follows somewhat the same pattern successfully used for namespace policies in #9711. However, the major difference is that topic policies use system topics for persistence, and there aren't similar ways to achieve consistency.
## Links
- Mailing List discussion thread: https://lists.apache.org/thread/dk5lks30hkxj2fl8nqrcds9f5j7hryfl
- Mailing List voting thread: https://lists.apache.org/thread/ndh0dhfjqf1htcmodgyyqt6k4bgdfj3x