Implementation PR: #24387
This proposal introduces a mechanism for the Pulsar client to gracefully handle unavailable service endpoints. When configured with a multi-endpoint service URL, the client will temporarily quarantine unresponsive endpoints and automatically attempt to connect to the next available one. This change introduces an intelligent failover strategy with exponential backoff, significantly improving the client's connection reliability and resilience in environments where broker availability can fluctuate.
In current Pulsar client versions, producers and consumers are created through a PulsarClient, which is built from a serviceUrl parameter. The serviceUrl supports multiple endpoints with the syntax pulsar://host1:6650,host2:6650,host3:6650,... or pulsar+ssl://host1:6651,host2:6651,host3:6651,...

When the client creates a producer or consumer, it may pick an endpoint from this list that is temporarily unavailable. The creation then fails. Worse, subsequent attempts may keep picking the same unavailable endpoint, leading to repeated failures even when other endpoints in the list are healthy and available.
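```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;

public class Example {
    public static void main(String[] args) throws PulsarClientException {
        // Multi-endpoint service URL: the client may pick any of these hosts.
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl("pulsar://host1:6650,host2:6650,host3:6650")
                .build();

        Consumer<Long> consumer = pulsarClient.newConsumer(Schema.INT64)
                .topic("persistent://public/default/my-topic")
                .subscriptionName("my-subscription")
                .subscribe();

        Producer<Long> producer = pulsarClient.newProducer(Schema.INT64)
                .topic("persistent://public/default/my-topic")
                .create();

        // ... use the producer and consumer ...

        producer.close();
        consumer.close();
        pulsarClient.close();
    }
}
```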
When most of the endpoints in the service URL are unavailable (but at least one is available), creating consumers and producers through PulsarClient is likely to fail. This is counter-intuitive: users who supply multiple endpoints expect the client to transparently fail over to a working one. To address this, we propose a feedback mechanism that lets the client identify, quarantine, and later retry unavailable endpoints. New connections are then routed to healthy brokers whenever possible.
Optimize the PulsarClient logic to remove unavailable serviceUrl endpoints through a feedback mechanism, improving the success rate of PulsarClient requests.
We will introduce an endpoint quarantine strategy based on passive feedback from connection attempts. Terminology:

- availableAddressPool: a pool containing all currently available addresses; a subset of the addresses in serviceUrl.
- allAddressPool: a pool containing all addresses in serviceUrl.

By default, the client only tries to connect to endpoints in availableAddressPool.
Add a markHostAvailability method to the ServiceNameResolver interface, for example:

```java
import java.net.InetSocketAddress;

public interface ServiceNameResolver {
    // ... existing methods ...

    /**
     * Mark the availability of a host.
     *
     * @param address the host address to mark availability for
     * @param isAvailable true if the host is available, false otherwise
     */
    default void markHostAvailability(InetSocketAddress address, boolean isAvailable) {
        // The default implementation does nothing.
        // Implementations can override this method to track host availability.
    }
}
```
When the client connects to an address resolved from serviceUrl, the ConnectionPool invokes markHostAvailability to report whether that address is available. The ServiceNameResolver can then remove the address from availableAddressPool if it is unavailable, or restore it to availableAddressPool if it is available. If every address has been removed from availableAddressPool, the resolver implementation can fall back to selecting addresses from allAddressPool in a round-robin manner.
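The interplay between the two pools and markHostAvailability can be illustrated with a minimal Java sketch. The class PoolTrackingResolver, its resolveHost method, and the reportOutcome helper are hypothetical illustrations. They are not Pulsar's actual ServiceNameResolver or ConnectionPool code. The sketch assumes that an address returns to the available pool only when a successful connection is reported; time-based release after a quarantine period is not modeled here.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical resolver illustrating availableAddressPool and allAddressPool.
// A sketch only; not Pulsar's ServiceNameResolver implementation.
class PoolTrackingResolver {
    // Every endpoint parsed from the multi-host service URL, in URL order.
    private final List<InetSocketAddress> allAddressPool;
    // Endpoints currently believed reachable; always a subset of allAddressPool.
    private final Set<InetSocketAddress> availableAddressPool = ConcurrentHashMap.newKeySet();
    private final AtomicInteger cursor = new AtomicInteger();

    PoolTrackingResolver(List<InetSocketAddress> addresses) {
        this.allAddressPool = List.copyOf(addresses);
        this.availableAddressPool.addAll(this.allAddressPool);
    }

    // Feedback hook: drop failed endpoints, restore endpoints that connected successfully.
    void markHostAvailability(InetSocketAddress address, boolean isAvailable) {
        if (!allAddressPool.contains(address)) {
            return; // not part of the service URL; ignore
        }
        if (isAvailable) {
            availableAddressPool.add(address);
        } else {
            availableAddressPool.remove(address);
        }
    }

    // Round-robin over available endpoints; if none remain, round-robin over all of them.
    InetSocketAddress resolveHost() {
        List<InetSocketAddress> candidates = new ArrayList<>();
        for (InetSocketAddress address : allAddressPool) {
            if (availableAddressPool.contains(address)) {
                candidates.add(address);
            }
        }
        if (candidates.isEmpty()) {
            candidates = allAddressPool;
        }
        return candidates.get(Math.floorMod(cursor.getAndIncrement(), candidates.size()));
    }

    // How a connection layer could report the outcome of an attempt back to the resolver:
    // a failed future marks the host unavailable, a successful one marks it available.
    <T> CompletableFuture<T> reportOutcome(InetSocketAddress address, CompletableFuture<T> attempt) {
        return attempt.whenComplete((result, error) -> markHostAvailability(address, error == null));
    }
}
```

In this sketch, the round-robin order follows the order of hosts in the service URL. Falling back to allAddressPool guarantees that the client still tries some endpoint even when every host has been marked unavailable.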
Removed addresses are quarantined for an exponentially increasing duration. The quarantine duration can be customized via the new optional ClientBuilder configurations serviceUrlQuarantineInitDuration and serviceUrlQuarantineMaxDuration.
For example, assume serviceUrlQuarantineInitDuration is 1 minute and serviceUrlQuarantineMaxDuration is 30 minutes. The first time host1 is judged unavailable, it is quarantined for 1 minute and then added back to availableAddressPool. If it is judged unavailable again, it is quarantined for 2 minutes. If it keeps failing, the quarantine durations are 4, 8, and 16 minutes. After that they are 30 minutes (capped at serviceUrlQuarantineMaxDuration, no longer increasing), 30 minutes, and so on. Once host1 is determined to be available, its next quarantine duration is reset to 1 minute.
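The schedule above can be sketched as a small per-endpoint tracker. For the k-th consecutive failure, it applies a quarantine of min(init × 2^(k−1), max). EndpointQuarantine and its methods are hypothetical names used only for illustration. The sketch assumes both durations are positive (the "0 disables quarantine" case is not modeled). It also takes the current time as a parameter so the behavior is easy to test.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical per-endpoint quarantine tracker; a sketch of the exponential schedule,
// not the actual Pulsar implementation. Assumes initDuration and maxDuration are > 0.
final class EndpointQuarantine {
    private final Duration initDuration;
    private final Duration maxDuration;
    private Duration nextDuration;                  // applied on the next failure
    private Instant quarantinedUntil = Instant.MIN; // usable again once now >= this

    EndpointQuarantine(Duration initDuration, Duration maxDuration) {
        this.initDuration = initDuration;
        this.maxDuration = maxDuration;
        this.nextDuration = min(initDuration, maxDuration);
    }

    private static Duration min(Duration a, Duration b) {
        return a.compareTo(b) <= 0 ? a : b;
    }

    // Connection failed: quarantine for the current duration, then double the
    // next duration, capped at maxDuration. Returns the duration just applied.
    Duration markUnavailable(Instant now) {
        Duration current = nextDuration;
        quarantinedUntil = now.plus(current);
        nextDuration = min(current.multipliedBy(2), maxDuration);
        return current;
    }

    // Connection succeeded: lift the quarantine and reset the schedule.
    void markAvailable() {
        quarantinedUntil = Instant.MIN;
        nextDuration = min(initDuration, maxDuration);
    }

    // The endpoint rejoins the available pool once its quarantine has elapsed.
    boolean isQuarantined(Instant now) {
        return now.isBefore(quarantinedUntil);
    }
}
```

With a 1-minute initial and 30-minute maximum duration, consecutive calls to markUnavailable return 1, 2, 4, 8, 16, 30, 30 minutes, matching the sequence described above. A call to markAvailable restarts the sequence at 1 minute.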
```java
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;

public class QuarantineConfigExample {
    private void example() throws PulsarClientException {
        // A failed endpoint is quarantined and retried later. The quarantine duration starts
        // at 30 seconds and doubles on each consecutive failure until it reaches the 1-hour
        // maximum, after which the endpoint is retried once an hour. A successful connection
        // resets the duration to the initial 30 seconds.
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl("pulsar://host1:6650,host2:6650,host3:6650")
                .serviceUrlQuarantineInitDuration(30000, TimeUnit.MILLISECONDS)
                .serviceUrlQuarantineMaxDuration(1, TimeUnit.HOURS)
                .build();

        // Disable quarantining of unavailable serviceUrl endpoints by passing 0 to
        // serviceUrlQuarantineInitDuration and serviceUrlQuarantineMaxDuration.
        PulsarClient pulsarClientQuarantineDisabled = PulsarClient.builder()
                .serviceUrl("pulsar://host1:6650,host2:6650,host3:6650")
                .serviceUrlQuarantineInitDuration(0, TimeUnit.MILLISECONDS)
                .serviceUrlQuarantineMaxDuration(0, TimeUnit.MILLISECONDS)
                .build();
    }
}
```
Add serviceUrlQuarantineInitDuration and serviceUrlQuarantineMaxDuration to ClientBuilder:
```java
/**
 * Configure the initial quarantine duration for service URL endpoints.
 * For a single-host serviceUrl, this setting has no effect.
 *
 * <p>When the client is unable to connect to an endpoint from a serviceUrl with multiple hosts,
 * that endpoint is quarantined for a duration that grows exponentially with consecutive failures.
 * The initial value of a single quarantine duration is set by
 * {@code serviceUrlQuarantineInitDuration}. A successful use of the endpoint resets the
 * duration to the initial value and moves the endpoint back to the available address pool.
 *
 * <p>A value of 0 means endpoints are never quarantined, even if they fail.
 *
 * @param serviceUrlQuarantineInitDuration the initial quarantine duration for an unavailable
 *        endpoint. Defaults to 60 seconds.
 * @param unit the time unit for the quarantine duration
 * @return the client builder instance
 */
ClientBuilder serviceUrlQuarantineInitDuration(long serviceUrlQuarantineInitDuration, TimeUnit unit);

/**
 * Configure the maximum quarantine duration for service URL endpoints.
 * For a single-host serviceUrl, this setting has no effect.
 *
 * <p>When the client is unable to connect to an endpoint from a serviceUrl with multiple hosts,
 * that endpoint is quarantined for a duration that grows exponentially with consecutive failures.
 * The maximum value of a single quarantine duration is set by
 * {@code serviceUrlQuarantineMaxDuration}. A successful use of the endpoint resets the
 * duration to the initial value and moves the endpoint back to the available address pool.
 *
 * <p>A value of 0 means endpoints are never quarantined, even if they fail.
 *
 * @param serviceUrlQuarantineMaxDuration the maximum quarantine duration for an unavailable
 *        endpoint. Defaults to 1 day.
 * @param unit the time unit for the quarantine duration
 * @return the client builder instance
 */
ClientBuilder serviceUrlQuarantineMaxDuration(long serviceUrlQuarantineMaxDuration, TimeUnit unit);
```
Upgrading and reverting can be done normally; no special steps are required.
#22935 removes unavailable endpoints through a periodic health check mechanism. However, that approach introduces new problems: connections are created frequently and system load increases. Therefore, this PIP does not adopt the health check solution.