PIP-165: Auto release client useless connections

Currently, the Pulsar client keeps the connection even if no producers or consumers use this connection. If a client produces messages to topic A and we have 3 brokers 1, 2, 3. Due to the bundle unloading(load manager) topic ownership will change from A to B and finally to C. For now, the client-side will keep 3 connections to all 3 brokers. We can optimize this part to reduce the broker side connections, the client should close the unused connections.

So a mechanism needs to be added to release unwanted connections.

Why are there idle connections?

1.When configuration maxConnectionsPerHosts is not set to 0, the connection is not closed at all. The design is to hold a fixed number of connections per Host, avoiding frequent closing and creation.

https://github.com/apache/pulsar/blob/72349117c4fd9825adaaf16d3588a695e8a9dd27/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java#L325-L335

2-1. When clients receive command-close, will reconnect immediately. It's designed to make it possible to reconnect, rebalance, and unload.

https://github.com/apache/pulsar/blob/72349117c4fd9825adaaf16d3588a695e8a9dd27/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionHandler.java#L122-L141

2-2. The broker will close client connections before writing ownership info to the ZK. Then clients will get deprecated broker address when it tries lookup.

https://github.com/apache/pulsar/blob/72349117c4fd9825adaaf16d3588a695e8a9dd27/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L1282-L1293

Goal

Automatically release connections that are no longer used.

  • Scope
    • Pulsar client Contains connections used by consumers, Producers, and Transactions.

    • Pulsar proxy The connection between proxy and broker has two parts: For lookup commands; For consumers, producers commands and other commands. The connection “For consumers, producers commands and other commands” is managed by DirectProxyHandler, which holds the connection until the client is closed, so it does not affect of producers or consumers, These connections do not require additional closing. The connection “For lookup commands”: When the proxy is configured metadataStoreUrl, the Lookup Command will select the registered broker by rotation training and create a connection. If we do not optimize the broker load balancing algorithm, all connections are considered useful connections. When the cluster is large, holds so many connections becomes redundant. Later, I will try to put forward other proposals to improve this phenomenon, so this proposal does not involve proxy connection release.

Approach

Periodically check for idle connections and close them.

Changes

API changes

ClientCnx added an idle check method to mark idle time.

/** Create time. **/
private final long createTime;
/** The time when marks the connection is idle. **/
private long IdleMarkTime;
/** The time when the last valid data was transmitted. **/
private long lastWorkTime;
/** Idle stat **/
private IdleState stat;
/**
  * Check client connection is now free. This method may change the state to idle.
  * This method will not change the state to idle.
  */
public boolean doIdleCheck();
/** Get stat **/
public IdleState getIdleStat();
/** Change stat **/
public boolean setStat(IdleState originalIdleStat, IdleState newIdleStat);

public enum IdleState {
  /** The connection is in use. **/
  using,
  /** The connection is not in use. **/
  idle_marked,
  /** The connection will be released soon. **/
  before_release,
  /** The connection has already been released. **/
  released;
}

Configuration changes

We can set the check frequency and release rule for idle connections at ClientConfigurationData.


@ApiModelProperty( name = "connectionMaxIdleSeconds", value = "Release the connection if it is not used for more than [connectionMaxIdleSeconds] seconds. " + "If [connectionMaxIdleSeconds] < 0, disabled the feature that auto release the idle connections" ) private int connectionMaxIdleSeconds = 180; @ApiModelProperty( name = "connectionIdleDetectionIntervalSeconds", value = "How often check idle connections" ) private int connectionIdleDetectionIntervalSeconds = 60;

Implementation

  • Pulsar client If no consumer, producer, or transaction uses the current connection, release it.
    • A fixed rate task in ConnectionPool, will do two things. 1: mark the connection idle 2: when reach max idle time release it
    • ConnectionCnx holds the Consumer ,Producer and TransactionClient, so we can rely on this information to determine if the connection is still in use