PIP-321: Split the responsibilities of namespace replication-clusters

Background knowledge

Pulsar's geo-replication mechanism is typically used for disaster recovery, enabling the replication of persistently stored message data across multiple data centers. For instance, your application publishes data in one region, and you would like to consume and process it in other regions. With Pulsar's geo-replication mechanism, messages can be produced and consumed in different geo-replicated regions. See the introduction to geo-replication for more information. [1]

A client can set allowed clusters for a tenant. An allowed cluster is a cluster that the tenant can access.

A client can set replication clusters for a namespace, and the Pulsar broker internally manages replication to all of those clusters. The replication clusters for a namespace must be a subset of the tenant's allowed clusters.

A client cannot set allowed clusters at the namespace level, but the functionality of replication-clusters essentially accomplishes something similar. A topic cannot be created or loaded at clusters that are not specified in the replication-clusters of the namespace policy. Subsequently, PIP-8 introduced the concept of peer-clusters for global namespace redirection and fails the PartitionedMetadata-Lookup request if the global namespace's replication-clusters do not include the current/peer-clusters. More information about peer-clusters can be found in PIP-8. [2]

A namespace has multiple topics. Once a namespace is configured with replication clusters, all the topics under this namespace will enable replication in these clusters.

A namespace policy is a namespace-level configuration stored in the configuration store (e.g. ZooKeeper). It cannot be shared across clusters that use different configuration stores.

Replication clusters can also be configured at the message level: Pulsar supports setting replication clusters when sending a message.

producer.newMessage().replicationClusters(List.of("cluster1", "cluster2")).send();

[1] https://pulsar.apache.org/docs/3.1.x/concepts-replication
[2] https://github.com/apache/pulsar/pull/903
[3] https://github.com/apache/pulsar/wiki/PIP-92%3A-Topic-policy-across-multiple-clusters

Motivation

Geo-replication at the topic level and at the message level does not work as expected when geo-replication is disabled at the namespace level and the clusters use a shared configuration store. Consider the following examples:

Example For Topic Level:

  • Environment:

    • cluster1 and cluster2 in different regions sharing the same configuration store.
  • Replication clusters configuration:

    • Set namespace ns replication clusters : cluster1 (local cluster)
    • Set topic ns/topic1 replication clusters : cluster1, cluster2.
  • Expected:

    • Topic ns/topic1 can replicate between cluster1 and cluster2.
  • Actual:

    • Topic cannot be created at cluster2.
PRECONDITION_FAILED: Namespace missing local cluster name in clusters list: local_cluster=cluster2 ns=ns clusters=[cluster1]

Example For Message Level

  • Environment:

    • cluster1 and cluster2 in different regions sharing the same Zookeeper cluster.
  • Replication clusters configuration:

    • Set namespace ns replication clusters : cluster1 (local cluster)
    • Set replication clusters when send message1: cluster1, cluster2.
  • Expected:

    • Message1 can replicate between cluster1 and cluster2.
  • Actual:

    • Topic cannot be created at cluster2, so message1 cannot be replicated to cluster2.
PRECONDITION_FAILED: Namespace missing local cluster name in clusters list: local_cluster=cluster2 ns=ns clusters=[cluster1]

The root cause of these issues is that topics cannot access clusters that are not included in the replication-clusters of the namespace policy. If you add both clusters to the namespace's replication clusters, all the topics under this namespace will start to replicate data between the clusters unless every topic overrides its replication clusters at the topic level with a single cluster. That is very hard for Pulsar maintainers to manage, and it is impossible to control newly created topics, because a topic must be created before its topic policies can be set. Replication clusters and allowed clusters are different concepts, and using one configuration for both purposes does not work. In the current implementation, however, both are configured by specifying the replication clusters, which prevents a topic from having its own replication policy.

To support geo-replication policies at the topic level and the message level, the cluster configuration at the namespace level must be made clearer: introduce allowed-clusters at the namespace level, and make replication-clusters only the default replication clusters for the topics under the namespace.
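The failure in both examples can be modelled with a few lines of Java. This is a simplified sketch, not the broker's actual code: the class and method names are hypothetical, and only the error text is taken from the examples above. It shows that a single list is consulted both for "where to replicate" and for "where the topic may be loaded".

```java
import java.util.List;

final class LocalClusterCheck {
    // Hypothetical model of the current behaviour: a topic may only be created
    // or loaded on a cluster listed in the namespace's replication clusters.
    static void check(String localCluster, String namespace, List<String> nsReplicationClusters) {
        if (!nsReplicationClusters.contains(localCluster)) {
            throw new IllegalStateException(
                    "Namespace missing local cluster name in clusters list: local_cluster="
                            + localCluster + " ns=" + namespace
                            + " clusters=" + nsReplicationClusters);
        }
    }

    public static void main(String[] args) {
        List<String> nsClusters = List.of("cluster1");
        check("cluster1", "ns", nsClusters); // passes: cluster1 is listed
        try {
            check("cluster2", "ns", nsClusters); // fails, as in both examples
        } catch (IllegalStateException e) {
            System.out.println("PRECONDITION_FAILED: " + e.getMessage());
        }
    }
}
```

Under this model, a topic-level or message-level setting that names cluster2 cannot take effect, because the namespace-level list rejects cluster2 before any topic policy is consulted.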

Goals

In Scope

The namespace will have a clearer configuration for clusters. Users can use replication-clusters to specify the clusters to which the namespace's data will be replicated, and allowed-clusters to specify the clusters that can load the topics under the namespace. The latter is similar to the tenant's allowed-clusters.

Out of Scope

This proposal can be used to solve the problem that topic-level and message-level geo-replication does not work as expected. That problem is the initial motivation for this proposal, but the proposal itself does not modify geo-replication.

Beyond this proposal, other actions are still needed:

  1. Limit the replication configuration at the namespace level, topic level and message level.
    • The replication_clusters at the namespace level, topic level and message level should be a subset of the allowed_clusters at the namespace level.
      • Otherwise, 400 Bad Request will be returned when specifying the replication_clusters at the namespace level or topic level.
      • The send request fails with a NotAllowedException when the replication_clusters of the message is not a subset of the allowed_clusters at the namespace level.
  2. Implement allowed_clusters at the topic level. This would need another proposal.
    • If allowed_clusters is implemented in the topic policy, the replication_clusters at the topic level and message level should be a subset of the allowed_clusters at the topic level.
      • Otherwise, 400 Bad Request will be returned when specifying the replication_clusters at the namespace level.
      • The send request fails with a NotAllowedException when the replication_clusters of the message is not a subset of the allowed_clusters at the topic level.

Fail the send request when the message sets replication_clusters at the message level.

High Level Design

A new namespace policy option allowed_clusters will be added. The allowed_clusters policy will specify the clusters where topics under this namespace can be created or loaded. The replication_clusters indicates the clusters that are used to create a full mesh replication for all topics under this namespace.

When a namespace has the policy with allowed_clusters and replication_clusters, the topics under this namespace will replicate data to all replication_clusters by default. Additionally, the topic can have a flexible replication clusters configuration, which should be a subset of the allowed_clusters of the namespace.

If allowed_clusters is not set, replication_clusters will be used as the default value for allowed_clusters.

If neither allowed_clusters nor replication_clusters is set, topics under this namespace will only be able to publish/subscribe at the local cluster. The local cluster will be added to the allowed_clusters automatically when the namespace is created.

Message-level replication is similar to topic-level replication. The replication clusters of a message should be a subset of the allowed_clusters, and default to the replication_clusters configured at the topic level or namespace level.
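The fallback and subset rules in this section can be sketched as plain set operations. The sketch below is illustrative only: the class and method names are hypothetical, and it assumes an unset policy is represented by an empty set, which the proposal does not specify.

```java
import java.util.Set;

final class EffectiveClusters {
    // allowed_clusters falls back to replication_clusters when it is not set.
    // Assumption: "not set" is modelled as an empty set.
    static Set<String> effectiveAllowed(Set<String> allowedClusters,
                                        Set<String> replicationClusters) {
        return allowedClusters.isEmpty() ? replicationClusters : allowedClusters;
    }

    // A topic-level or message-level replication set must be a subset of the
    // namespace's effective allowed clusters.
    static boolean isPermitted(Set<String> requested,
                               Set<String> allowedClusters,
                               Set<String> replicationClusters) {
        return effectiveAllowed(allowedClusters, replicationClusters).containsAll(requested);
    }

    // Clusters a message replicates to: the message-level setting if present,
    // otherwise the topic-level setting, otherwise the namespace default.
    static Set<String> effectiveReplication(Set<String> messageLevel,
                                            Set<String> topicLevel,
                                            Set<String> namespaceLevel) {
        if (!messageLevel.isEmpty()) {
            return messageLevel;
        }
        return topicLevel.isEmpty() ? namespaceLevel : topicLevel;
    }
}
```

With allowed_clusters = {cluster1, cluster2} and replication_clusters = {cluster1}, a topic may replicate to both clusters, while a message that names cluster3 would be rejected.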

Detailed Design

Public-facing Changes

Public API

setNamespaceAllowedClusters Endpoint

This new endpoint allows setting the list of allowed clusters for a specific namespace.

Method:

POST

Path:

/namespaces/{tenant}/{namespace}/allowedClusters

HTTP Body Parameters:

  • clusterIds: A list of cluster IDs.

Response Codes:

  • 400 Bad Request: The list of allowed clusters should include all replication clusters.
  • 403 Forbidden: The requester does not have admin permissions.
  • 404 Not Found: The specified tenant, cluster, or namespace does not exist.
  • 409 Conflict: A peer-cluster cannot be part of an allowed-cluster.
  • 412 Precondition Failed: The namespace is not global or the provided cluster IDs are invalid.

Explanation for 409 Conflict: This follows the behavior of namespace replication clusters. As per PIP-8, a peer-cluster cannot be part of a replication-cluster. Similarly, for allowed-clusters, users could enable replication at the topic level, hence a peer-cluster cannot be part of allowed-clusters as well.
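The 400 and 409 conditions above amount to two set checks. The following is a minimal sketch under stated assumptions: the class and method names are hypothetical, the order of the checks is a guess, and the peer clusters are passed in as a plain set rather than looked up from cluster metadata.

```java
import java.util.OptionalInt;
import java.util.Set;

final class AllowedClustersValidation {
    // Returns the HTTP status for the first failed check, or empty if the
    // requested allowed clusters pass both checks.
    static OptionalInt rejectionStatus(Set<String> requestedAllowed,
                                       Set<String> replicationClusters,
                                       Set<String> peerClusters) {
        // 400: the allowed clusters must include every replication cluster.
        if (!requestedAllowed.containsAll(replicationClusters)) {
            return OptionalInt.of(400);
        }
        // 409: a peer cluster cannot be part of the allowed clusters.
        for (String cluster : requestedAllowed) {
            if (peerClusters.contains(cluster)) {
                return OptionalInt.of(409);
            }
        }
        return OptionalInt.empty();
    }
}
```

The 403, 404 and 412 cases depend on authorization and metadata lookups and are left out of this sketch.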

getNamespaceAllowedClusters Endpoint

This new endpoint allows retrieving the list of allowed clusters for a specific namespace.

Method:

GET

Path:

/namespaces/{tenant}/{namespace}/allowedClusters

Response Codes:

  • 403 Forbidden: The requester does not have admin permissions.
  • 404 Not Found: The specified tenant, cluster, or namespace does not exist.
  • 412 Precondition Failed: The namespace is not global.

Example Response:

[
    "cluster1",
    "cluster2",
    "cluster3"
]
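For illustration, requests to the two endpoints could be built with the JDK HTTP client as follows. Several details are assumptions rather than part of this proposal: the broker web service address http://localhost:8080, the /admin/v2 prefix in front of the path, and a request body that is a bare JSON array of cluster IDs. The sketch only builds the requests and does not send them.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.List;
import java.util.stream.Collectors;

final class AllowedClustersRequests {
    // Assumed base URL: local broker web service plus the admin REST prefix.
    static final String BASE = "http://localhost:8080/admin/v2";

    static URI endpoint(String tenant, String namespace) {
        return URI.create(BASE + "/namespaces/" + tenant + "/" + namespace + "/allowedClusters");
    }

    // Assumed body shape: a JSON array of cluster IDs. No escaping is done,
    // so IDs are expected to contain no quotes or backslashes.
    static String toJsonArray(List<String> clusterIds) {
        return clusterIds.stream()
                .map(id -> "\"" + id + "\"")
                .collect(Collectors.joining(",", "[", "]"));
    }

    static HttpRequest setRequest(String tenant, String namespace, List<String> clusterIds) {
        return HttpRequest.newBuilder(endpoint(tenant, namespace))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(toJsonArray(clusterIds)))
                .build();
    }

    static HttpRequest getRequest(String tenant, String namespace) {
        return HttpRequest.newBuilder(endpoint(tenant, namespace)).GET().build();
    }
}
```

Either request could then be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`, adding whatever authentication headers the broker requires.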

Binary protocol

Configuration

CLI

setNamespaceAllowedClusters Command

This new command allows you to set the list of allowed clusters for a specific namespace.

Usage:

$ pulsar-admin namespaces set-allowed-clusters --clusters <cluster-ids> <tenant>/<namespace>

Options:

  • --clusters, -c
    • A comma-separated list of cluster IDs.

Response Codes:

  • 400 Bad Request: The allowed clusters should contain all replication clusters.
  • 403 Forbidden: You do not have admin permission.
  • 404 Not Found: The tenant, cluster, or namespace does not exist.
  • 409 Conflict: A peer-cluster cannot be part of an allowed-cluster.
  • 412 Precondition Failed: The namespace is not global or the cluster IDs are invalid.

Explanation for 409 Conflict: This follows the behavior of namespace replication clusters. PIP-8 introduced the concept of a peer-cluster, which cannot be part of a replication-cluster. For allowed-clusters, users could enable replication at the topic level, so a peer-cluster cannot be part of the allowed-clusters either.

getNamespaceAllowedClusters Command

This new command allows you to retrieve the list of allowed clusters for a specific namespace.

Usage:

$ pulsar-admin namespaces get-allowed-clusters <tenant>/<namespace>

Response Codes:

  • 403 Forbidden: You do not have admin permission.
  • 404 Not Found: The tenant, cluster, or namespace does not exist.
  • 412 Precondition Failed: The namespace is not global.

Example Response:

"cluster1"
"cluster2"
"cluster3"

CreateNamespace Command

Add a new option to this command to set the allowed clusters when creating a new namespace.

Usage:

$ pulsar-admin namespaces create [options] tenant/namespace

Options:

  • --bundles, -b
    • number of bundles to activate. Default: 0
  • --clusters, -c
    • List of replication clusters this namespace will be assigned. (Modified*)
  • --allowed-clusters, -a (New*)
    • List of allowed clusters this namespace will be assigned. When the --allowed-clusters option is not specified, the --clusters option will be used as --allowed-clusters.

Response Codes:

  • 400 Bad Request: The specified policies are invalid. The allowed clusters should contain all replication clusters, and a peer-cluster cannot be part of the allowed clusters or the replication clusters. (New*)
  • 403 Forbidden: You do not have admin permission.
  • 404 Not Found: The tenant or cluster does not exist.
  • 409 Conflict: Namespace already exists.
  • 412 Precondition Failed: Namespace name is not valid.

Modified* - This option is modified in this proposal.
New* - This option is newly added in this proposal.

Metrics

None.

Monitoring

None.

Security Considerations

If the broker enables authentication, this configuration can only be set by an authenticated client. If the user does not implement their own AuthorizationProvider, only the superuser and the tenant admin are allowed to access the newly added API.

Backward & Forward Compatibility

The new namespace policy will not impact the behavior of existing systems. If users do not use the new feature, no additional operation is needed during an upgrade or revert.

Revert

To revert, simply switch back to the old version of Pulsar. However, note that topics will be removed from clusters that are not included in the replication clusters configured at the namespace level. For example, suppose the replication clusters at the topic level for topic1 are cluster1, cluster2 and cluster3; the replication clusters at the namespace level are cluster1 and cluster2; and the allowed clusters at the namespace level are cluster1, cluster2 and cluster3. After reverting Pulsar to the old version, topic1 will be deleted from cluster3.
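Before a revert, the clusters that would lose a topic can be estimated as the set difference between the topic-level and the namespace-level replication clusters. The helper below is a hypothetical sketch of that calculation and does not query a real cluster.

```java
import java.util.Set;
import java.util.TreeSet;

final class RevertImpact {
    // Clusters listed for the topic but absent from the namespace-level
    // replication clusters: after a revert the topic is removed from these.
    static Set<String> clustersLosingTopic(Set<String> topicReplicationClusters,
                                           Set<String> namespaceReplicationClusters) {
        Set<String> lost = new TreeSet<>(topicReplicationClusters);
        lost.removeAll(namespaceReplicationClusters);
        return lost;
    }

    public static void main(String[] args) {
        // The topic1 example from the text above.
        System.out.println(clustersLosingTopic(
                Set.of("cluster1", "cluster2", "cluster3"),
                Set.of("cluster1", "cluster2")));
    }
}
```

For the topic1 example, the result contains only cluster3.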

Upgrade

No additional operations need to be performed. The replication-clusters will be the default value of allowed-clusters.

Alternatives

Approach 1

Changes

  • Remove the limit for the system topic and then for the change_event topic, which is used to store the topic policy of all the topics under a namespace.
  • Check the replication_clusters in the topic policy when performing operations such as lookup, fetchPartitionMetadata, and loadingTopic.

Work Flow

  1. Specify the replication_clusters of topic1 for cluster1 and cluster2.
  2. The broker receives this policy message and sets the replication_clusters in the metadata of the message.
  3. The policy message will be replicated to cluster2, assuming cluster1 is the local cluster.
  4. Retrieve the topic policy from the TopicPoliciesService when performing operations such as lookup, fetchPartitionMetadata, and loadingTopic.

Notes: Steps 3 and 4 cannot be replaced with manual operations by the users, as topic policies cannot be specified when the topic has not been created yet.

Deprecation Rationale

The replication_clusters specified at the message level may not be a subset of the replication_clusters at the topic or namespace level. For instance, the replication_clusters specified for topic1 could be cluster1 and cluster2, while some messages set their replication_clusters to cluster1 and cluster3. This means the topic has to be loaded in cluster3, which is not specified in the replication_clusters of the topic policies or namespace policies. Moreover, messages are sent to the broker after the topic is created, so any topic could be loaded in any cluster in the allowed_clusters of the tenant policy. This renders Approach 1 meaningless.

Approach 2

Changes

  • Following the discussion of Approach 1, checking the replication_clusters of the topic policies when a topic is created or loaded is meaningless, because any topic could be loaded in any cluster in the allowed_clusters of the tenant policy. So, could we remove the check for the replication_clusters when performing operations such as lookup, fetchPartitionMetadata, or creating a topic?

Work Flow

  1. Specify the replication_clusters of topic1 for cluster1 and cluster2.
  2. Topic1 could be created at cluster1 and cluster2. Replication at the topic level works as expected.
  3. Specify the replication_clusters of message1 sent to topic1 for cluster1 and cluster3.
  4. Topic1 could be created at cluster1 and cluster3. Replication at the message level works as expected.

Deprecation Rationale

In fact, Approach 2 has the same issue as the approach adopted in this proposal. Whether we remove the limit of the replication_clusters or add the allowed_clusters in the namespace, both are providing a feature for the topics. They allow a topic to be loaded in different clusters and the data of the topic is not replicated from these clusters. To minimize the impact, it is reasonable to introduce a more granular control of allowed_clusters at the namespace level.

General Notes

Links