PIP-344: Correct the behavior of the public API pulsarClient.getPartitionsForTopic(topicName)

Background knowledge

Topic auto-creation

  • Partitioned topic auto-creation depends on pulsarClient.getPartitionsForTopic:
    • Calling pulsarClient.getPartitionsForTopic triggers creation of the partitioned topic metadata.
    • Registering producers and consumers then triggers creation of the individual topic partitions.
  • When pulsarClient.getPartitionsForTopic(topicName) is called, Pulsar automatically creates the partitioned topic metadata if it does not exist, whether the client uses HttpLookupService or BinaryProtoLookupService.

Current behavior of pulsarClient.getPartitionsForTopic

| case | broker allow auto-create | param allow create if not exists | non-partitioned topic | partitioned topic | current behavior |
| --- | --- | --- | --- | --- | --- |
| 1 | true/false | true/false | exists: true | | REST API: partitions: 0; Binary API: partitions: 0 |
| 2 | true/false | true/false | | exists: true, partitions: 3 | REST API: partitions: 3; Binary API: partitions: 3 |
| 3 | true | true | | | REST API: create new: true, partitions: 3; Binary API: create new: true, partitions: 3 |
| 4 | true | false | | | REST API: create new: false, partitions: 0; Binary API: not supported |
| 5 | false | true | | | REST API: create new: false, partitions: 0; Binary API: create new: false, partitions: 0 |

  • Broker allow auto-create: see the allowAutoTopicCreation setting in broker.conf.
  • Param allow create if not exists:
    • For the HTTP API PersistentTopics.getPartitionedMetadata, this is an optional param named checkAllowAutoCreation, and its default value is false.
    • The pulsar-admin API depends on the HTTP API PersistentTopics.getPartitionedMetadata. It always sets checkAllowAutoCreation to false, and the caller cannot change it.
    • The client API HttpLookupService.getPartitionedTopicMetadata also depends on the HTTP API PersistentTopics.getPartitionedMetadata. It always sets checkAllowAutoCreation to true, and the caller cannot change it.
    • The client API BinaryProtoLookupService.getPartitionedTopicMetadata always tries to create the partitioned metadata.
  • REST API & Binary API: the four ways to get partitioned metadata have only two underlying implementations. The HTTP API PersistentTopics.getPartitionedMetadata, pulsar-admin, and HttpLookupService.getPartitionedTopicMetadata are grouped together as the HTTP API (labeled REST API in the table above), and BinaryProtoLookupService.getPartitionedTopicMetadata is called the Binary API.
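The four access paths listed above can be summarized in a small model. This is only an illustrative sketch: the enum MetadataAccessPath and the method effectiveCreateIfNotExists are hypothetical names invented here and do not exist in Pulsar; the values simply restate the bullets above.

```java
/** Hypothetical model of the four access paths; these names are not Pulsar classes. */
enum MetadataAccessPath {
    // HTTP API: checkAllowAutoCreation is optional and defaults to false.
    HTTP_API(false, true),
    // pulsar-admin: always false, cannot be set by the caller.
    PULSAR_ADMIN(false, false),
    // HttpLookupService: always true, cannot be set by the caller.
    HTTP_LOOKUP_SERVICE(true, false),
    // BinaryProtoLookupService: always tries to create the metadata.
    BINARY_LOOKUP_SERVICE(true, false);

    private final boolean defaultValue;
    private final boolean callerCanOverride;

    MetadataAccessPath(boolean defaultValue, boolean callerCanOverride) {
        this.defaultValue = defaultValue;
        this.callerCanOverride = callerCanOverride;
    }

    /**
     * Returns the "create if not exists" value that actually takes effect.
     *
     * @param requested the value the caller asked for, or null if unspecified
     */
    boolean effectiveCreateIfNotExists(Boolean requested) {
        if (callerCanOverride && requested != null) {
            return requested;
        }
        return defaultValue;
    }
}
```

The model makes the asymmetry visible: only the raw HTTP API lets the caller choose, which is why a client using the binary protocol has no way to ask for a read-only lookup today.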

Motivation

The param "allow create if not exists" of the Binary API is always true.

  • Case 4 of the pulsarClient.getPartitionsForTopic behavior: the Binary API always tries to create the partitioned metadata, even though the API is named like a getter (getXxx).
  • Case 5 of the pulsarClient.getPartitionsForTopic behavior: the API returns partitioned metadata with 0 partitions even though the topic does not exist. The correct behavior for this case has been discussed before.
  • Note that flink-connector-pulsar uses this API to create partitioned topic metadata.

Goals

  • Case 4: add a new API PulsarClient.getPartitionsForTopic(String, boolean) that can get the partitioned topic metadata without trying to create it. See the details below.
  • Case 5: when the topic does not exist, pulsarClient.getPartitionsForTopic(String) responds with a not found error instead of returning partitioned metadata with 0 partitions.

Detailed Design

Public-facing Changes

When you call the public API pulsarClient.getPartitionsForTopic, Pulsar will no longer create the partitioned metadata.

Public API

LookupService.java


- CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName);

/**
 * 1. Get the partitions if the topic exists. Return "{partition: n}" if a partitioned topic exists;
 *    return "{partition: 0}" if a non-partitioned topic exists.
 * 2. When {@code createIfAutoCreationEnabled} is "false" and neither a partitioned topic nor a
 *    non-partitioned topic exists, you will get a {@link PulsarClientException.NotFoundException}.
 *  2-1. You will get a {@link PulsarClientException.NotSupportedException} if the broker's version is an
 *    older one that does not support this feature and the Pulsar client is using a binary protocol "serviceUrl".
 * 3. When {@code createIfAutoCreationEnabled} is "true", it will trigger an auto-creation for this topic
 *    (using the default topic auto-creation strategy you set for the broker), and the corresponding
 *    result is returned. For the result, see case 1.
 * @version 3.3.0
 */
+ CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(TopicName topicName, boolean createIfAutoCreationEnabled);

The behavior of the new API LookupService.getPartitionedTopicMetadata(TopicName, boolean).

| case | client-side param: createIfAutoCreationEnabled | non-partitioned topic | partitioned topic | broker-side: topic auto-creation strategy | current behavior |
| --- | --- | --- | --- | --- | --- |
| 1 | true/false | exists: true | | | REST/Binary API: {partitions: 0} |
| 2 | true/false | | exists: true, partitions: 2 | | REST/Binary API: {partitions: 2} |
| 3 | true | | | allowAutoTopicCreation: true, allowAutoTopicCreationType: non-partitioned | REST/Binary API: create new: true, {partitions: 0} |
| 4 | true | | | allowAutoTopicCreation: true, allowAutoTopicCreationType: partitioned, defaultNumPartitions: 2 | REST/Binary API: create new: true, {partitions: 2} |
| 5 | false | | | allowAutoTopicCreation: true | REST/Binary API: Not found error |
| 6 | true | | | allowAutoTopicCreation: false | REST/Binary API: Not found error |
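The six cases above reduce to three checks, which the following sketch spells out. It is a simplified model rather than the broker implementation: MetadataLookupModel, BrokerPolicy, and lookup are hypothetical names, the existing topic is represented only by its partition count, and an empty result stands in for the not found error.

```java
import java.util.OptionalInt;

/** Hypothetical model of the behavior table; not Pulsar code. */
final class MetadataLookupModel {

    /** Broker-side auto-creation strategy, mirroring the allowAutoTopicCreation* settings. */
    static final class BrokerPolicy {
        final boolean allowAutoTopicCreation;
        final boolean partitionedType;   // allowAutoTopicCreationType == partitioned
        final int defaultNumPartitions;

        BrokerPolicy(boolean allowAutoTopicCreation, boolean partitionedType, int defaultNumPartitions) {
            this.allowAutoTopicCreation = allowAutoTopicCreation;
            this.partitionedType = partitionedType;
            this.defaultNumPartitions = defaultNumPartitions;
        }
    }

    /**
     * @param existingPartitions empty if the topic does not exist, 0 for an existing
     *                           non-partitioned topic, n for an existing partitioned topic
     * @return the reported partition count, or empty to represent the not found error
     */
    static OptionalInt lookup(OptionalInt existingPartitions,
                              boolean createIfAutoCreationEnabled,
                              BrokerPolicy policy) {
        // Cases 1 and 2: the topic exists, so both flags are irrelevant.
        if (existingPartitions.isPresent()) {
            return existingPartitions;
        }
        // Cases 5 and 6: creation is declined by the client or by the broker.
        if (!createIfAutoCreationEnabled || !policy.allowAutoTopicCreation) {
            return OptionalInt.empty();
        }
        // Cases 3 and 4: auto-create using the broker's default strategy.
        return OptionalInt.of(policy.partitionedType ? policy.defaultNumPartitions : 0);
    }
}
```

Note that creation requires both sides to agree: the client must pass true and the broker must allow auto-creation.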

PulsarClient.java

// This API already exists. Its signature is kept unchanged to ensure compatibility.
+ /** @deprecated Use {@link #getPartitionsForTopic(String, boolean)} instead. */
+ @Deprecated
- CompletableFuture<List<String>> getPartitionsForTopic(String topic);
+ default CompletableFuture<List<String>> getPartitionsForTopic(String topic) {
+     return getPartitionsForTopic(topic, true);
+ }

/**
 * 1. Get the partitions if the topic exists. Return "[{partition-0}, {partition-1}, ..., {partition-n}]" if a partitioned topic exists; return "[{topic}]" if a non-partitioned topic exists.
 * 2. When {@code createIfAutoCreationEnabled} is "false" and neither a partitioned topic nor a non-partitioned topic exists, you will get a {@link PulsarClientException.NotFoundException}.
 *  2-1. You will get a {@link PulsarClientException.NotSupportedException} if the broker's version is an older one that does not support this feature and the Pulsar client is using a binary protocol "serviceUrl".
 * 3. When {@code createIfAutoCreationEnabled} is "true", it will trigger an auto-creation for this topic (using the default topic auto-creation strategy you set for the broker), and the corresponding result is returned. For the result, see case 1.
 * @version 3.3.0
 */
CompletableFuture<List<String>> getPartitionsForTopic(String topic, boolean createIfAutoCreationEnabled);

The behavior of the new API PulsarClient.getPartitionsForTopic(String, boolean).

| case | client-side param: createIfAutoCreationEnabled | non-partitioned topic | partitioned topic | broker-side: topic auto-creation strategy | current behavior |
| --- | --- | --- | --- | --- | --- |
| 1 | true/false | exists: true | | | REST/Binary API: ["{tenant}/{ns}/topic"] |
| 2 | true/false | | exists: true, partitions: 2 | | REST/Binary API: ["{tenant}/{ns}/topic-partition-0", "{tenant}/{ns}/topic-partition-1"] |
| 3 | true | | | allowAutoTopicCreation: true, allowAutoTopicCreationType: non-partitioned | REST/Binary API: create new: true, ["{tenant}/{ns}/topic"] |
| 4 | true | | | allowAutoTopicCreation: true, allowAutoTopicCreationType: partitioned, defaultNumPartitions: 2 | REST/Binary API: create new: true, ["{tenant}/{ns}/topic-partition-0", "{tenant}/{ns}/topic-partition-1"] |
| 5 | false | | | allowAutoTopicCreation: true | REST/Binary API: Not found error |
| 6 | true | | | allowAutoTopicCreation: false | REST/Binary API: Not found error |
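The list returned by PulsarClient.getPartitionsForTopic differs from the LookupService result only in shape: a partition count becomes a list of topic names. A minimal sketch of that mapping follows, assuming the "-partition-i" suffix shown in the table; PartitionNames and partitionNames are hypothetical helper names, not part of the Pulsar client.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical helper that turns a partition count into the list shown in the table. */
final class PartitionNames {

    /**
     * @param topic      the topic name, for example "{tenant}/{ns}/topic"
     * @param partitions 0 for a non-partitioned topic, otherwise the number of partitions
     */
    static List<String> partitionNames(String topic, int partitions) {
        // A non-partitioned topic is reported as a single-element list holding the topic itself.
        if (partitions == 0) {
            return Collections.singletonList(topic);
        }
        List<String> names = new ArrayList<>(partitions);
        for (int i = 0; i < partitions; i++) {
            names.add(topic + "-partition-" + i);
        }
        return names;
    }
}
```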

Binary protocol

CommandPartitionedTopicMetadata

message CommandPartitionedTopicMetadata {
  + optional bool metadata_auto_creation_enabled = 6 [default = true];
}

FeatureFlags

message FeatureFlags {
  + optional bool supports_binary_api_get_partitioned_meta_with_param_created_false = 5 [default = false];
}

Backward & Forward Compatibility

  • Old-version client and new-version broker: the client calls the old API.

  • New-version client and old-version broker: the feature flag supports_binary_api_get_partitioned_meta_with_param_created_false will be false. If the param createIfAutoCreationEnabled is false, the client gets a PulsarClientException.NotSupportedException.
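The client-side check for the second compatibility case could look roughly like the sketch below. It is an assumption about how such a gate might be written, not the actual client code: FeatureFlagGate, checkRequest, and the nested NotSupportedException are stand-ins defined here, and the boolean brokerSupportsNoCreate represents the value of the feature flag reported by the broker.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical client-side gate for the new binary protocol field; not Pulsar code. */
final class FeatureFlagGate {

    /** Stand-in for PulsarClientException.NotSupportedException. */
    static final class NotSupportedException extends RuntimeException {
        NotSupportedException(String message) {
            super(message);
        }
    }

    /**
     * Decides whether the request can be sent to the connected broker.
     *
     * @param createIfAutoCreationEnabled the value requested by the caller
     * @param brokerSupportsNoCreate      the broker's feature flag value
     * @return a future holding the value to send as metadata_auto_creation_enabled,
     *         or a failed future if the broker cannot honor a "false" request
     */
    static CompletableFuture<Boolean> checkRequest(boolean createIfAutoCreationEnabled,
                                                   boolean brokerSupportsNoCreate) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        if (!createIfAutoCreationEnabled && !brokerSupportsNoCreate) {
            // An old broker would ignore the field and create the metadata anyway,
            // so fail fast instead of silently creating it.
            result.completeExceptionally(new NotSupportedException(
                    "The broker does not support getting partitioned metadata without auto-creation"));
        } else {
            result.complete(createIfAutoCreationEnabled);
        }
        return result;
    }
}
```

Failing on the client keeps the "false" contract honest: the caller either gets a lookup with no side effects or an explicit error, never an unexpected topic creation.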