PIP-313: Support force unsubscribe using consumer api

Motivation

As discussed in Issue: https://github.com/apache/pulsar/issues/21451

Apache Pulsar provides a message queue through the Shared subscription type, which lets multiple connected consumers process unordered messages in parallel. Shared subscriptions are also common in data processing pipelines that need to forcefully unsubscribe once they have processed the messages on a topic. One example is the Pulsar-Storm adapter, where a Pulsar spout creates Pulsar consumers on a shared subscription for distributed processing and then unsubscribes from the topic.

However, PulsarSpout always fails to unsubscribe from a shared subscription when more than one consumer is connected to it, and it then does not close its Pulsar consumers either. The result is a leaked subscription and leaked consumers for that application. The failed unsubscribe also leaves a backlog on the topic, and the application team has to build an external service just to handle such failures.

In these use cases, a client application cannot unsubscribe from a shared subscription while multiple consumers are connected. The Pulsar client library first tries to unsubscribe, which fails because other consumers are still connected to the subscription, and as a result it never goes on to close the consumer. Consequently, none of the consumers can disconnect or unsubscribe from the subscription. This makes it impossible for applications to unsubscribe from a shared subscription, so they need a way to forcefully unsubscribe through the consumer API. The admin API already supports forceful unsubscribe, but adding the same support to the consumer API will allow applications such as Pulsar-Storm to unsubscribe successfully and let consumers close gracefully.

Goals

Support an unsubscribe API with a force option in the consumer API, alongside the existing admin API, so that applications can unsubscribe from subscriptions of various types such as Failover, Shared, and Key-Shared.

High Level Design

The consumer API will gain an additional unsubscribe method with a flag that enables forceful unsubscribe from a subscription. The Pulsar client library will pass the flag to the broker when unsubscribing, and the broker will use it with its existing implementation of forceful unsubscribe.

Design & Implementation Details

(1) Pulsar client library changes

Add an unsubscribe API with a force option to the Consumer API.

Consumer.java

void unsubscribe(boolean force) throws PulsarClientException;
CompletableFuture<Void> unsubscribeAsync(boolean force);

Calling unsubscribe with the force flag makes the broker fence the subscription and forcefully disconnect all of its consumers, so that it can then unsubscribe and delete the subscription. However, a consumer that reconnects can recreate the subscription. To make sure the subscription is eventually deleted, the client application should either call force-unsubscribe from all of its consumers or disable automatic subscription creation, depending on its use case.
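The reconnection caveat can be illustrated with a small in-memory model. This is only a sketch under stated assumptions: ToyTopic, ForceUnsubscribeCaveatDemo, and all of their methods are hypothetical names invented for illustration and are not part of the Pulsar client or broker. The model assumes that a reconnecting consumer recreates a missing subscription only when automatic subscription creation is enabled.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy in-memory model (hypothetical, not Pulsar code) of a subscription being
// recreated by a reconnecting consumer after a forced unsubscribe.
class ToyTopic {
    // subscription name -> names of connected consumers
    private final Map<String, Set<String>> subscriptions = new HashMap<>();
    private final boolean autoCreateSubscription;

    ToyTopic(boolean autoCreateSubscription) {
        this.autoCreateSubscription = autoCreateSubscription;
    }

    // Explicitly create a subscription up front.
    void createSubscription(String subscription) {
        subscriptions.putIfAbsent(subscription, new HashSet<>());
    }

    // A consumer (re)connects. Returns false if the subscription is missing
    // and automatic creation is disabled.
    boolean connect(String subscription, String consumer) {
        Set<String> consumers = subscriptions.get(subscription);
        if (consumers == null) {
            if (!autoCreateSubscription) {
                return false;
            }
            consumers = new HashSet<>();
            subscriptions.put(subscription, consumers);
        }
        consumers.add(consumer);
        return true;
    }

    // Forced unsubscribe: drop every connected consumer and delete the subscription.
    void forceUnsubscribe(String subscription) {
        subscriptions.remove(subscription);
    }

    boolean hasSubscription(String subscription) {
        return subscriptions.containsKey(subscription);
    }
}

class ForceUnsubscribeCaveatDemo {
    // Two consumers connect, only consumer-a forces an unsubscribe, and
    // consumer-b then reconnects. Returns whether the subscription exists afterwards.
    static boolean existsAfterReconnect(boolean autoCreate) {
        ToyTopic topic = new ToyTopic(autoCreate);
        topic.createSubscription("sub");
        topic.connect("sub", "consumer-a");
        topic.connect("sub", "consumer-b");
        topic.forceUnsubscribe("sub");
        topic.connect("sub", "consumer-b");
        return topic.hasSubscription("sub");
    }

    public static void main(String[] args) {
        System.out.println("auto-create enabled:  " + existsAfterReconnect(true));
        System.out.println("auto-create disabled: " + existsAfterReconnect(false));
    }
}
```

In this model the subscription comes back when automatic creation is enabled and stays deleted when it is disabled, which is why the application is asked to force-unsubscribe from every consumer or to turn automatic creation off.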

(2) Protobuf changes

The Pulsar client library will pass an additional force flag (default value false) to the broker through a wire protocol change.

PulsarApi.proto

message CommandUnsubscribe {
    required uint64 consumer_id   = 1;
    required uint64 request_id    = 2;
    optional bool force           = 3 [default = false];
}
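Because force is an optional field with a default of false, a command sent by a client that predates this change should still decode as a non-forced unsubscribe. The sketch below hand-rolls protobuf varint encoding for the three fields of this message body to make that visible. It is an illustration only: real clients and brokers use generated protobuf code, the class and method names here are hypothetical, the outer framing of the Pulsar binary protocol is not modeled, and the decoder assumes every field uses the varint wire type (which holds for this message).

```java
import java.io.ByteArrayOutputStream;

// Hand-rolled sketch of the protobuf wire encoding of CommandUnsubscribe.
// Illustrative only; not the generated code used by Pulsar.
class UnsubscribeWireSketch {
    // A protobuf tag is (field_number << 3) | wire_type; wire type 0 is varint.
    static final int TAG_CONSUMER_ID = 1 << 3;
    static final int TAG_REQUEST_ID = 2 << 3;
    static final int TAG_FORCE = 3 << 3;

    static void writeVarint(ByteArrayOutputStream out, long value) {
        while ((value & ~0x7FL) != 0) {
            out.write((int) ((value & 0x7F) | 0x80)); // low 7 bits, continuation bit set
            value >>>= 7;
        }
        out.write((int) value);
    }

    // includeForce=false mimics an older client that does not know field 3.
    static byte[] encode(long consumerId, long requestId, boolean includeForce, boolean force) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeVarint(out, TAG_CONSUMER_ID);
        writeVarint(out, consumerId);
        writeVarint(out, TAG_REQUEST_ID);
        writeVarint(out, requestId);
        if (includeForce) {
            writeVarint(out, TAG_FORCE);
            writeVarint(out, force ? 1 : 0);
        }
        return out.toByteArray();
    }

    // Reads one varint starting at pos; returns {value, nextPosition}.
    static long[] readVarint(byte[] data, int pos) {
        long result = 0;
        int shift = 0;
        while (true) {
            byte b = data[pos++];
            result |= (long) (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                break;
            }
            shift += 7;
        }
        return new long[] {result, pos};
    }

    // Extracts the force flag, falling back to the declared default (false)
    // when field 3 is absent from the payload.
    static boolean decodeForce(byte[] data) {
        boolean force = false;
        int pos = 0;
        while (pos < data.length) {
            long[] tag = readVarint(data, pos);
            long[] value = readVarint(data, (int) tag[1]);
            pos = (int) value[1];
            if (tag[0] == TAG_FORCE) {
                force = value[0] != 0;
            }
        }
        return force;
    }
}
```

For example, UnsubscribeWireSketch.decodeForce(UnsubscribeWireSketch.encode(7, 42, false, false)) yields false because field 3 is simply missing, while encoding with includeForce and force both set to true yields true.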

(3) Broker changes

The broker already supports force-deleting a subscription through the admin API, so it already has an implementation of forceful unsubscribe; what it lacks is a way to trigger it through the binary API. Therefore, once the client sends the additional force flag when unsubscribing, the broker reads the flag and passes it to the subscription API to forcefully unsubscribe the subscription.
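The dispatch described here can be sketched as a handler that routes an unsubscribe request to either the regular or the forced delete path. This is a simplified model, not the broker's code: ToySubscription, ToyUnsubscribeHandler, and their methods are hypothetical stand-ins, and the rule that a regular unsubscribe succeeds only for the sole connected consumer is an assumption drawn from the failure described in the Motivation section.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified stand-in for a broker-side subscription (hypothetical, not Pulsar code).
class ToySubscription {
    private final Set<Long> connectedConsumers = new HashSet<>();
    private boolean fenced = false;
    private boolean deleted = false;

    void addConsumer(long consumerId) {
        if (fenced) {
            throw new IllegalStateException("subscription is fenced");
        }
        connectedConsumers.add(consumerId);
    }

    // Regular path: rejected unless the caller is the only connected consumer.
    boolean delete(long callerId) {
        if (connectedConsumers.size() != 1 || !connectedConsumers.contains(callerId)) {
            return false;
        }
        connectedConsumers.clear();
        deleted = true;
        return true;
    }

    // Forced path: fence the subscription, disconnect everyone, then delete.
    boolean deleteForcefully() {
        fenced = true;
        connectedConsumers.clear();
        deleted = true;
        return true;
    }

    boolean isDeleted() {
        return deleted;
    }

    int connectedCount() {
        return connectedConsumers.size();
    }
}

class ToyUnsubscribeHandler {
    // Reads the force flag carried by the unsubscribe command and picks the path.
    static boolean handleUnsubscribe(ToySubscription subscription, long consumerId, boolean force) {
        return force ? subscription.deleteForcefully() : subscription.delete(consumerId);
    }
}
```

With two consumers connected, handleUnsubscribe(subscription, 1L, false) is rejected and leaves both consumers attached, whereas the same call with force set to true disconnects both and marks the subscription deleted.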

Security Considerations

General Notes

Links

Issue: https://github.com/apache/pulsar/issues/21451
Sample PR: https://github.com/apache/pulsar/compare/master...rdhabalia:shared_unsub?expand=1
Discuss thread: https://lists.apache.org/thread/hptx8z9mktn94gvqtt4547wzcfcgdsrv
Vote thread: https://lists.apache.org/thread/3kp9hfs5opw17fgmkn251sc6cd408yty