// Source blob: ed4b5de8576a55030465a7e3289708a0042b5047
package org.apache.helix.manager.zk.client;
import java.util.HashMap;
import org.apache.helix.HelixException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Singleton factory that builds shared ZkClients, i.e. clients that reuse a pooled ZkConnection.
 *
 * <p>One {@link ZkConnectionManager} is tracked per connection config. Every read or write of the
 * pool, as well as closing a pooled manager, happens while holding the monitor of the pool map.
 */
public class SharedZkClientFactory extends HelixZkClientFactory {
  private static final Logger LOG = LoggerFactory.getLogger(SharedZkClientFactory.class);

  // The connection pool to track all created connections. Guarded by its own monitor.
  private final HashMap<HelixZkClient.ZkConnectionConfig, ZkConnectionManager>
      _connectionManagerPool = new HashMap<>();

  protected SharedZkClientFactory() {}

  // The instance is created when this nested class is initialized, which happens on the first
  // call to getInstance().
  private static class SingletonHelper {
    private static final SharedZkClientFactory INSTANCE = new SharedZkClientFactory();
  }

  /**
   * @return the process-wide factory instance
   */
  public static SharedZkClientFactory getInstance() {
    return SingletonHelper.INSTANCE;
  }

  /**
   * Build a shared ZkClient on top of a ZkConnection that is shared between all clients created
   * with the same connection config.
   *
   * @param connectionConfig The connection configuration that is used to search for a shared
   *                         connection. A new connection is created if there is none in the pool
   *                         or the pooled one has been closed.
   * @param clientConfig     The client configuration. Its connect-init timeout is used when a new
   *                         connection manager has to be created, and it is handed to the new
   *                         shared client.
   * @return Shared ZkClient
   * @throws HelixException if no connection manager could be obtained from the pool
   */
  @Override
  public HelixZkClient buildZkClient(final HelixZkClient.ZkConnectionConfig connectionConfig,
      HelixZkClient.ZkClientConfig clientConfig) {
    synchronized (_connectionManagerPool) {
      final ZkConnectionManager zkConnectionManager =
          getOrCreateZkConnectionManager(connectionConfig, clientConfig.getConnectInitTimeout());
      if (zkConnectionManager == null) {
        throw new HelixException("Failed to create a connection manager in the pool to share.");
      }
      LOG.info("Sharing ZkConnection {} to a new SharedZkClient.", connectionConfig);
      return new SharedZkClient(zkConnectionManager, clientConfig,
          new SharedZkClient.OnCloseCallback() {
            @Override
            public void onClose() {
              cleanupConnectionManager(connectionConfig, zkConnectionManager);
            }
          });
    }
  }

  // Look up the pooled connection manager for the given config; create and register a new one
  // if there is no entry yet or the pooled manager has already been closed.
  // Callers must hold the monitor of _connectionManagerPool.
  private ZkConnectionManager getOrCreateZkConnectionManager(
      HelixZkClient.ZkConnectionConfig connectionConfig, long connectInitTimeout) {
    ZkConnectionManager connectionManager = _connectionManagerPool.get(connectionConfig);
    if (connectionManager == null || connectionManager.isClosed()) {
      connectionManager = new ZkConnectionManager(createZkConnection(connectionConfig),
          connectInitTimeout, connectionConfig.toString());
      _connectionManagerPool.put(connectionConfig, connectionManager);
    }
    return connectionManager;
  }

  // Close the ZkConnectionManager if no other shared client is referring to it.
  // Note the close operation of connection manager needs to be synchronized with the pool
  // operation to avoid race condition.
  private void cleanupConnectionManager(HelixZkClient.ZkConnectionConfig connectionConfig,
      ZkConnectionManager zkConnectionManager) {
    synchronized (_connectionManagerPool) {
      zkConnectionManager.close(true);
      // Once the manager is really closed, drop its pool entry so that closed managers are not
      // retained for the lifetime of the factory. The identity check makes sure a newer manager
      // that has already replaced this one under the same config is left untouched.
      if (zkConnectionManager.isClosed()
          && _connectionManagerPool.get(connectionConfig) == zkConnectionManager) {
        _connectionManagerPool.remove(connectionConfig);
      }
    }
  }

  // For test only
  // Counts the pooled connection managers that are not closed.
  protected int getActiveConnectionCount() {
    int count = 0;
    synchronized (_connectionManagerPool) {
      for (ZkConnectionManager manager : _connectionManagerPool.values()) {
        if (!manager.isClosed()) {
          count++;
        }
      }
    }
    return count;
  }
}