PIP-425: Support connecting with next available endpoint for multi-endpoint serviceUrls

Implementation PR: #24387

Abstract

This proposal introduces a mechanism for the Pulsar client to gracefully handle unavailable service endpoints. When configured with a multi-endpoint service URL, the client will temporarily quarantine unresponsive endpoints and automatically attempt to connect to the next available one. This change introduces an intelligent failover strategy with exponential backoff, significantly improving the client's connection reliability and resilience in environments where broker availability can fluctuate.

Background knowledge

In the current Pulsar client versions, consumer or producer creation is the responsibility of PulsarClient, we can build a PulsarClient by pass serviceUrl param. The serviceUrl param supports multiple endpoints with the syntax pulsar://host1:6650,host2:6650,host3:6650,... or pulsar+ssl://host1:6651,host2:6651,host3:6651,.... When a Pulsar client tries to create a producer or consumer, it may pick an endpoint from the list that is temporarily unavailable. The creation will fail, but worse, subsequent attempts may continue to pick the same unavailable endpoint, leading to repeated failures. This occurs even when other endpoints in the list are perfectly healthy and available.

public class Example {
    public static void main(String[] args) throws PulsarClientException {
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl("pulsar://host1:6650,host2:6650,host3:6650")
                .build();
        Consumer<Long> consumer = pulsarClient.newConsumer(Schema.INT64)
                .topic("persistent://public/default/my-topic")
                .subscriptionName("my-subscription")
                .subscribe();

        Producer<Long> producer = pulsarClient.newProducer(Schema.INT64)
                .topic("persistent://public/default/my-topic")
                .create();

        .....

    }
}

Motivation

When most of the endpoints in service url are unavailable (but there is at least one available endpoint), creating consumers and producers through PulsarClient will most likely fail. This is an unexpected behavior for Pulsar users since when multiple endpoints are provided, it would be expected that when an endpoint is unavailable, an available endpoint would be used. This behavior is counter-intuitive for users who expect the client to transparently fail over to a working endpoint. To address this, we propose an intelligent feedback mechanism that allows the client to identify, quarantine, and eventually retry unavailable endpoints, ensuring that new connections are routed to healthy brokers whenever possible.

Goals

Optimize the PulsarClient code logic, remove unavailable serviceUrl endpoints through the feedback mechanism, and improve the success rate of PulsarClient requests.

In Scope

  • Improve Connection Resilience: Modify the PulsarClient to avoid repeatedly trying to connect to unavailable endpoints.
  • Introduce Endpoint Quarantining: When a connection to an endpoint fails, temporarily remove it from the pool of available endpoints.
  • Implement Exponential Backoff: Re-introduce quarantined endpoints for connection attempts after a dynamically increasing delay, allowing them to recover without overwhelming the system.

Detailed Design

We will introduce an endpoint quarantining strategy based on passive feedback from connection attempts. Explanation of nouns:

  • availableAddressPool: a pool containing all available addresses, subset of serviceUrl.
  • allAddressPool: a pool containing all addresses in serviceUrl.

By default, the client will only try to connect to endpoints from the availableAddressPool.

  • On Connection Failure: If the client fails to connect to an endpoint, it will be temporarily removed from the availableAddressPool and placed into “quarantine”.
  • Exponential Backoff: The quarantine duration will increase exponentially upon consecutive failures.
  • On Connection Success: A successful connection to an endpoint proves it is available. If the endpoint was previously failing, its quarantine backoff state will be reset. It will be immediately considered part of the availableAddressPool.
  • Fallback: If the availableAddressPool ever becomes empty, the client will fall back to select endpoint from the allAddressPool by a round-robin approach. This prevents a deadlock where the client cannot connect because all known endpoints are simultaneously in quarantine. As #22934 mentioned, add markHostAvailability method in ServiceNameResolver interface like:
    /**
     * Mark the availability of a host.
     * @param address the host address to mark availability for
     * @param isAvailable true if the host is available, false otherwise
     */
    default void markHostAvailability(InetSocketAddress address, boolean isAvailable){
        // Default implementation does nothing
        // Subclass can override this method to implement host availability tracking
    }

When resolve one address in serviceUrl, the ConnectionPool will invoke markHostAvailability to tell availability about that address, so the ServiceNameResolver can remove it from availableAddressPool if unavailable (or recover back to availableAddressPool if available). If all addressed in availableAddressPool are unavailable, the resolver implementation could use a round-robin approach to select address from the allAddressPool.

For removed addresses, the quarantine duration will increase exponentially. The quarantine duration can be customized via new optional configurations serviceUrlQuarantineInitDuration and serviceUrlQuarantineMaxDuration in ClientBuilder That is to say, assuming that serviceUrlQuarantineInitDuration is 1 minute and serviceUrlQuarantineMaxDuration is 30 minutes, after host1 is judged as unavailable for the first time, it will be quarantined for 1 minute and then added to availableAddressPool again. If it is judged as unavailable again the next time, it will be quarantined for 2 minutes. Similarly, if it is judged as unavailable continuously, the isolation time will be 4 minutes, 8 minutes, 16 minutes, 30 minutes (it reaches serviceUrlQuarantineMaxDuration and no longer increases), 30 minutes... If host1 is determined to be available, the next quarantine duration will be reset to 1 minute.

Usage Example

    private void example() throws PulsarClientException {
        // The failed endpoint will be removed and retried after a period of time, with an initial quarantine duration of 30 seconds increasing exponentially. After reaching the maximum quarantine duration of 1 hour, will be maintained once an hour until a certain succeeds, and the quarantine duration is reset to 0.
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl("pulsar://host1:6650,host2:6650,host3:6650")
                .serviceUrlQuarantineInitDuration(30000, TimeUnit.MILLISECONDS)
                .serviceUrlQuarantineMaxDuration(1, TimeUnit.HOURS)
                .build();

        // disable removing unavailable serviceUrl endpoints by pass 0 to serviceUrlQuarantineInitDuration and serviceUrlQuarantineMaxDuration
        PulsarClient pulsarClientDisableReovery = PulsarClient.builder()
                .serviceUrl("pulsar://host1:6650,host2:6650,host3:6650")
                .serviceUrlQuarantineInitDuration(0, TimeUnit.MILLISECONDS)
                .serviceUrlQuarantineMaxDuration(0, TimeUnit.MILLISECONDS)
                .build();
}

Public-facing Changes

Public API

  1. Add two optional configs serviceUrlQuarantineInitDuration and serviceUrlQuarantineMaxDuration in ClientBuilder
    /**
     * Configure the service URL init quarantine duration.
     * For single host serviceUrl, this setting has no effect.
     *
     * <p>When the client is unable to connect to the a endpint from serviceUrl with multiple hosts, that endpoint
     *  will be quarantined for a specific duration that is determined in a certain emponential way.
     * The init value of a single quarantine duration is set by
     * @param serviceUrlQuarantineInitDuration. A successful usage of the endpoint will reset the
     * duration to the initial value and move it back to the available addresses pool.
     *
     * <p>
     * A value of 0 means don't quarantine any endpoints even if they fail.
     * @param serviceUrlQuarantineInitDuration the initial quarantine duration
     * for unavailable endpoint. Defaults to 60 seconds.
     * @param unit the time unit for the quarantine duration
     * @return the client builder instance
     */
    ClientBuilder serviceUrlQuarantineInitDuration(long serviceUrlQuarantineInitDuration, TimeUnit unit);

    /**
     * Configure the service URL max quarantine duration.
     * For single host serviceUrl, this setting has no effect.
     *
     * <p>When the client is unable to connect to the a endpint from serviceUrl with multiple hosts, that endpoint
     * will be quarantined for a specific duration that is determined in a certain emponential way.
     * The max value of a single quarantine duration is set by
     * @param serviceUrlQuarantineMaxDuration. A successful usage of the endpoint will reset the
     * duration to the initial value and move it back to the available addresses pool.
     *
     * <p>
     * A value of 0 means don't quarantine any endpoints even if they fail.
     * @param serviceUrlQuarantineMaxDuration the maximum quarantine duration for
     * unavailable endpoint. Defaults to 1 day.
     * @param unit the time unit for the quarantine duration
     * @return the client builder instance
     */
    ClientBuilder serviceUrlQuarantineMaxDuration(long serviceUrlQuarantineMaxDuration, TimeUnit unit);

Backward & Forward Compatibility

You can do upgrading or reverting normally, no specified steps are needed to do.

Alternatives

#22935 removes unavailable endpoints through a regular health check mechanism, but this brings new problems (frequent creation of connections and increased system load). So this pip will not use the health check solution.

Links