PIP-39 introduces system topics and topic-level policies. Currently, the topic policies service (TopicPoliciesService
) has only one implementation (SystemTopicBasedTopicPoliciesService
) that depends on system topics. Therefore, the following configurations are required (though they are enabled by default):
systemTopicEnabled=true topicLevelPoliciesEnabled=true
However, using system topics to manage topic policies may not always be the best choice. Users might need an alternative approach to manage topic policies.
TopicPoliciesService
InterfaceThe TopicPoliciesService
interface is poorly designed for third-party implementations due to the following reasons:
Methods that Should Not Be Exposed:
addOwnedNamespaceBundleAsync
and removeOwnedNamespaceBundleAsync
are used internally in SystemTopicBasedTopicPoliciesService
.getTopicPoliciesBypassCacheAsync
is used only in tests to replay the __change_events
topic and construct the topic policies map.Confusing and Inconsistent getTopicPolicies
Methods:
getTopicPolicies
:TopicPolicies getTopicPolicies(TopicName topicName, boolean isGlobal) throws TopicPoliciesCacheNotInitException; TopicPolicies getTopicPolicies(TopicName topicName) throws TopicPoliciesCacheNotInitException;
getTopicPolicies(topicName, false)
.TopicPoliciesCacheNotInitException
.These methods are hard to use and are primarily used in tests. The getTopicPoliciesAsyncWithRetry
method uses a user-provided executor and backoff policy to call getTopicPolicies
until TopicPoliciesCacheNotInitException
is not thrown:
default CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsyncWithRetry(TopicName topicName, final Backoff backoff, ScheduledExecutorService scheduledExecutorService, boolean isGlobal) {
The getTopicPolicies
methods are confusing for users who want to implement their own topic policies service. They need to look deeply into Pulsar's source code to understand these details.
PR #21231 adds two asynchronous overrides that are more user-friendly:
CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull TopicName topicName, boolean isGlobal); CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(@Nonnull TopicName topicName);
Now there are five asynchronous get
methods. Unlike getTopicPolicies
, getTopicPoliciesAsync(topic)
is not equivalent to getTopicPoliciesAsync(topic, false)
. Instead:
getTopicPoliciesAsync(topic)
tries getting local policies first, then global policies if absent.getTopicPoliciesAsync(topic, true)
tries getting global policies.getTopicPoliciesAsync(topic, false)
tries getting local policies.Since PR #12517, topic policies support global policies across clusters. Therefore, there are local and global policies.
Currently:
getTopicPoliciesAsync(TopicName)
is used in BrokerService#getTopicPoliciesBypassSystemTopic
for initializing topic policies of PersistentTopic
objects.getTopicPoliciesAsyncWithRetry
is used in AdminResource#getTopicPoliciesAsyncWithRetry
for all topic policies admin APIs.There is also a sixth method, getTopicPoliciesIfExists
, which tries to get local topic policies from the cache:
TopicPolicies getTopicPoliciesIfExists(TopicName topicName);
However, this method is called just because there was no getTopicPoliciesAsync
methods before and getTopicPolicies
is hard to use. For example, here is an example code snippet in PersistentTopicsBase#internalUpdatePartitionedTopicAsync
:
TopicPolicies topicPolicies = pulsarService.getTopicPoliciesService().getTopicPoliciesIfExists(topicName); if (topicPolicies != null && topicPolicies.getReplicationClusters() != null) { replicationClusters = topicPolicies.getReplicationClustersSet(); }
With the new getTopicPoliciesAsync
methods, this code can be replaced with:
pulsarService.getTopicPoliciesService().getTopicPoliciesAsync(topicName, GetType.LOCAL_ONLY) .thenAccept(topicPolicies -> { if (topicPolicies.isPresent() && topicPolicies.get().getReplicationClusters() != null) { replicationClusters = topicPolicies.get().getReplicationClustersSet(); } });
Make TopicPoliciesService
pluggable so users can customize the topic policies service via another backend metadata store.
Redesign a clear and simple TopicPoliciesService
interface for users to customize.
Add a topicPoliciesServiceClassName
configuration to specify the topic policies service class name. If the class name is not the default SystemTopicBasedTopicPoliciesService
, systemTopicEnabled
will not be required unless the implementation requires it.
Add a unified method to get topic policies:
enum GetType { DEFAULT, // try getting the local topic policies, if not present, then get the global policies GLOBAL_ONLY, // only get the global policies LOCAL_ONLY, // only get the local policies } CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(TopicName topicName, GetType type);
getTopicPoliciesAsyncWithRetry
will be replaced by getTopicPoliciesAsync(topicName, LOCAL_ONLY)
or getTopicPoliciesAsync(topicName, GLOBAL_ONLY)
. The other two original getTopicPoliciesAsync
methods and getTopicPoliciesIfExists
will be replaced by getTopicPoliciesAsync(topicName, DEFAULT)
.
Move addOwnedNamespaceBundleAsync
and removeOwnedNamespaceBundleAsync
to private methods of SystemTopicBasedTopicPoliciesService
.
Add a TestUtils
class in tests to include getTopicPolicies
and getTopicPoliciesBypassCacheAsync
methods.
Remove the generic parameter from TopicPolicyListener
as the value type should always be TopicPolicies
. Mark this listener interface as Stable
.
Add a PulsarService
parameter to the start
method so that the implementation can have a constructor with an empty parameter list and get the PulsarService
instance from the start
method.
Add a boolean
return value to registerListener
since PersistentTopic#initTopicPolicy
checks if the topic policies are enabled. The return value will indicate if the TopicPoliciesService
instance is topicPoliciesServiceClassName.DISABLED
.
Since the topic policies service is now decoupled from system topics, remove all isSystemTopicAndTopicLevelPoliciesEnabled()
calls.
Here is the refactored TopicPoliciesService
interface:
/** * Delete policies for a topic asynchronously. * * @param topicName topic name */ CompletableFuture<Void> deleteTopicPoliciesAsync(TopicName topicName); /** * Update policies for a topic asynchronously. * * @param topicName topic name * @param policies policies for the topic name */ CompletableFuture<Void> updateTopicPoliciesAsync(TopicName topicName, TopicPolicies policies); /** * It controls the behavior of {@link TopicPoliciesService#getTopicPoliciesAsync}. */ enum GetType { DEFAULT, // try getting the local topic policies, if not present, then get the global policies GLOBAL_ONLY, // only get the global policies LOCAL_ONLY, // only get the local policies } /** * Retrieve the topic policies. */ CompletableFuture<Optional<TopicPolicies>> getTopicPoliciesAsync(TopicName topicName, GetType type); /** * Start the topic policy service. */ default void start(PulsarService pulsar) { } /** * Close the resources if necessary. */ default void close() throws Exception { } /** * Registers a listener for topic policies updates. * * <p> * The listener will receive the latest topic policies when they are updated. If the policies are removed, the * listener will receive a null value. Note that not every update is guaranteed to trigger the listener. For * instance, if the policies change from A -> B -> null -> C in quick succession, only the final state (C) is * guaranteed to be received by the listener. * In summary, the listener is guaranteed to receive only the latest value. * </p> * * @return true if the listener is registered successfully */ boolean registerListener(TopicName topicName, TopicPolicyListener listener); /** * Unregister the topic policies listener. */ void unregisterListener(TopicName topicName, TopicPolicyListener listener);
@InterfaceStability.Stable public interface TopicPolicyListener { void onUpdate(TopicPolicies data); }
Add a new configuration topicPoliciesServiceClassName
.
If downstream applications need to call APIs from TopicPoliciesService
, they should modify the code to use the new API.
TopicPoliciesService
Interface CompatibleThe current interface is poorly designed because it has only one implementation. Keeping these methods will burden developers who want to develop a customized interface. They need to understand where these confusing methods are called and handle them carefully.