blob: 0a9ddc10f7c1bbf81507133d55426735180d009c [file] [log] [blame]
package org.apache.helix.manager.zk.client;
import java.util.HashSet;
import java.util.Set;
import org.I0Itec.zkclient.IZkConnection;
import org.I0Itec.zkclient.serialize.SerializableSerializer;
import org.apache.helix.HelixException;
import org.apache.helix.manager.zk.BasicZkSerializer;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A ZkConnection manager that maintain connection status and allows additional watchers to be registered.
* It will forward events to those watchers.
*
* TODO Separate connection management logic from the raw ZkClient class.
* So this manager is a peer to the ZkClient. Connection Manager for maintaining the connection and
* ZkClient to handle user request.
* After this is done, Dedicated ZkClient hires one manager for it's connection.
* While multiple Shared ZkClients can use single connection manager if possible.
*/
class ZkConnectionManager extends org.apache.helix.manager.zk.ZkClient {
private static Logger LOG = LoggerFactory.getLogger(ZkConnectionManager.class);
// Client type that is used in monitor, and metrics.
private final static String MONITOR_TYPE = "ZkConnectionManager";
private final String _monitorKey;
// Set of all registered watchers
private final Set<Watcher> _sharedWatchers = new HashSet<>();
/**
* Construct and init a ZkConnection Manager.
*
* @param zkConnection
* @param connectionTimeout
*/
protected ZkConnectionManager(IZkConnection zkConnection, long connectionTimeout,
String monitorKey) {
super(zkConnection, (int) connectionTimeout, HelixZkClient.DEFAULT_OPERATION_TIMEOUT,
new BasicZkSerializer(new SerializableSerializer()), MONITOR_TYPE, monitorKey, null, true);
_monitorKey = monitorKey;
LOG.info("ZkConnection {} was created for sharing.", _monitorKey);
}
/**
* Register event watcher.
*
* @param watcher
* @return true if the watcher is newly added. false if it is already in record.
*/
protected synchronized boolean registerWatcher(Watcher watcher) {
if (isClosed()) {
throw new HelixException("Cannot add watcher to a closed client.");
}
return _sharedWatchers.add(watcher);
}
/**
* Unregister the event watcher.
*
* @param watcher
* @return number of the remaining event watchers
*/
protected synchronized int unregisterWatcher(Watcher watcher) {
_sharedWatchers.remove(watcher);
return _sharedWatchers.size();
}
@Override
public void process(final WatchedEvent event) {
super.process(event);
forwardingEvent(event);
}
private synchronized void forwardingEvent(final WatchedEvent event) {
// note that process (then forwardingEvent) could be triggered during construction, when sharedWatchers is still null.
if (_sharedWatchers == null || _sharedWatchers.isEmpty()) {
return;
}
// forward event to all the watchers' event queue
for (final Watcher watcher : _sharedWatchers) {
watcher.process(event);
}
}
@Override
public void close() {
// Enforce closing, if any watcher exists, throw Exception.
close(false);
}
protected synchronized void close(boolean skipIfWatched) {
cleanupInactiveWatchers();
if (_sharedWatchers.size() > 0) {
if (skipIfWatched) {
LOG.debug("Skip closing ZkConnection due to existing watchers. Watcher count {}.",
_sharedWatchers.size());
return;
} else {
throw new HelixException(
"Cannot close the connection when there are still shared watchers listen on the event.");
}
}
super.close();
LOG.info("ZkConnection {} was closed.", _monitorKey);
}
private void cleanupInactiveWatchers() {
Set<Watcher> closedWatchers = new HashSet<>();
for (Watcher watcher : _sharedWatchers) {
// TODO ideally, we shall have a ClosableWatcher interface so as to check accordingly. -- JJ
if (watcher instanceof SharedZkClient && ((SharedZkClient) watcher).isClosed()) {
closedWatchers.add(watcher);
}
}
_sharedWatchers.removeAll(closedWatchers);
}
}