| package org.apache.helix.zookeeper.zkclient; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| import java.lang.reflect.Method; |
| import java.util.Arrays; |
| import java.util.Date; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.OptionalLong; |
| import java.util.Set; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.CopyOnWriteArraySet; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicLong; |
| import javax.management.JMException; |
| import org.apache.helix.zookeeper.api.client.ChildrenSubscribeResult; |
| import org.apache.helix.zookeeper.constant.ZkSystemPropertyKeys; |
| import org.apache.helix.zookeeper.datamodel.SessionAwareZNRecord; |
| import org.apache.helix.zookeeper.datamodel.ZNRecord; |
| import org.apache.helix.zookeeper.exception.ZkClientException; |
| import org.apache.helix.zookeeper.util.GZipCompressionUtil; |
| import org.apache.helix.zookeeper.util.ZNRecordUtil; |
| import org.apache.helix.zookeeper.zkclient.annotation.PreFetchChangedData; |
| import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallMonitorContext; |
| import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncCallbacks; |
| import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncRetryCallContext; |
| import org.apache.helix.zookeeper.zkclient.callback.ZkAsyncRetryThread; |
| import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException; |
| import org.apache.helix.zookeeper.zkclient.exception.ZkException; |
| import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException; |
| import org.apache.helix.zookeeper.zkclient.exception.ZkMarshallingError; |
| import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException; |
| import org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException; |
| import org.apache.helix.zookeeper.zkclient.exception.ZkSessionMismatchedException; |
| import org.apache.helix.zookeeper.zkclient.exception.ZkTimeoutException; |
| import org.apache.helix.zookeeper.zkclient.metric.ZkClientMonitor; |
| import org.apache.helix.zookeeper.zkclient.serialize.BasicZkSerializer; |
| import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer; |
| import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer; |
| import org.apache.helix.zookeeper.zkclient.util.ExponentialBackoffStrategy; |
| import org.apache.zookeeper.CreateMode; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.KeeperException.ConnectionLossException; |
| import org.apache.zookeeper.KeeperException.SessionExpiredException; |
| import org.apache.zookeeper.Op; |
| import org.apache.zookeeper.OpResult; |
| import org.apache.zookeeper.WatchedEvent; |
| import org.apache.zookeeper.Watcher; |
| import org.apache.zookeeper.Watcher.Event.EventType; |
| import org.apache.zookeeper.Watcher.Event.KeeperState; |
| import org.apache.zookeeper.ZooDefs; |
| import org.apache.zookeeper.ZooKeeper; |
| import org.apache.zookeeper.data.ACL; |
| import org.apache.zookeeper.data.Stat; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * "Native ZkClient": not to be used directly. |
| * |
| * Abstracts the interaction with zookeeper and allows permanent (not just one time) watches on |
| * nodes in ZooKeeper. |
| * WARN: Do not use this class directly, use {@link org.apache.helix.zookeeper.impl.client.ZkClient} instead. |
| */ |
| public class ZkClient implements Watcher { |
| private static final Logger LOG = LoggerFactory.getLogger(ZkClient.class); |
| |
| public static final long TTL_NOT_SET = -1L; |
| private static final long MAX_RECONNECT_INTERVAL_MS = 30000; // 30 seconds |
| |
| // If number of children exceeds this limit, getChildren() should not retry on connection loss. |
| // This is a workaround for exiting retry on connection loss because of large number of children. |
| // 100K is specific for helix messages which use UUID, making packet length just below 4 MB. |
| // TODO: remove it once we have a better way to exit retry for this case |
| private static final int NUM_CHILDREN_LIMIT = 100 * 1000; |
| |
| private static final boolean SYNC_ON_SESSION = Boolean.parseBoolean( |
| System.getProperty(ZkSystemPropertyKeys.ZK_AUTOSYNC_ENABLED, "true")); |
| private static final String SYNC_PATH = "/"; |
| |
| private static AtomicLong UID = new AtomicLong(0); |
| public final long _uid; |
| |
| // ZNode write size limit in bytes. |
| // TODO: use ZKConfig#JUTE_MAXBUFFER once bumping up ZK to 3.5.2+ |
| private static final int WRITE_SIZE_LIMIT = |
| Integer.getInteger(ZkSystemPropertyKeys.JUTE_MAXBUFFER, ZNRecord.SIZE_LIMIT); |
| |
| private final IZkConnection _connection; |
| private final long _operationRetryTimeoutInMillis; |
| private final Map<String, Set<IZkChildListener>> _childListener = new ConcurrentHashMap<>(); |
| private final ConcurrentHashMap<String, Set<IZkDataListenerEntry>> _dataListener = |
| new ConcurrentHashMap<>(); |
| private final Set<IZkStateListener> _stateListener = new CopyOnWriteArraySet<>(); |
| private KeeperState _currentState; |
| private final ZkLock _zkEventLock = new ZkLock(); |
| |
| // When a new zookeeper instance is created in reconnect, its session id is not yet valid before |
| // the zookeeper session is established(SyncConnected). To avoid session race condition in |
| // handling new session, the new session event is only fired after SyncConnected. Meanwhile, |
| // SyncConnected state is also received when re-opening the zk connection. So to avoid firing |
| // new session event more than once, this flag is used to check. |
| // It is set to false when once existing expires. And set it to true once the new session event |
| // is fired the first time. |
| private boolean _isNewSessionEventFired; |
| |
| private boolean _shutdownTriggered; |
| private ZkEventThread _eventThread; |
| // TODO PVo remove this later |
| private Thread _zookeeperEventThread; |
| private volatile boolean _closed; |
| private PathBasedZkSerializer _pathBasedZkSerializer; |
| private ZkClientMonitor _monitor; |
| |
| // To automatically retry the async operation, we need a separate thread other than the |
| // ZkEventThread. Otherwise the retry request might block the normal event processing. |
| protected final ZkAsyncRetryThread _asyncCallRetryThread; |
| |
| private class IZkDataListenerEntry { |
| final IZkDataListener _dataListener; |
| final boolean _prefetchData; |
| |
| public IZkDataListenerEntry(IZkDataListener dataListener, boolean prefetchData) { |
| _dataListener = dataListener; |
| _prefetchData = prefetchData; |
| } |
| |
| public IZkDataListenerEntry(IZkDataListener dataListener) { |
| _dataListener = dataListener; |
| _prefetchData = false; |
| } |
| |
| public IZkDataListener getDataListener() { |
| return _dataListener; |
| } |
| |
| public boolean isPrefetchData() { |
| return _prefetchData; |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (this == o) { |
| return true; |
| } |
| if (!(o instanceof IZkDataListenerEntry)) { |
| return false; |
| } |
| |
| IZkDataListenerEntry that = (IZkDataListenerEntry) o; |
| |
| return _dataListener.equals(that._dataListener); |
| } |
| |
| @Override |
| public int hashCode() { |
| return _dataListener.hashCode(); |
| } |
| } |
| |
| private class ZkPathStatRecord { |
| private final String _path; |
| private Stat _stat = null; |
| private boolean _checked = false; |
| |
| public ZkPathStatRecord(String path) { |
| _path = path; |
| } |
| |
| public boolean pathExists() { |
| return _stat != null; |
| } |
| |
| public boolean pathChecked() { |
| return _checked; |
| } |
| |
| /* |
| * Note this method is not thread safe. |
| */ |
| public void recordPathStat(Stat stat, OptionalLong notificationTime) { |
| _checked = true; |
| _stat = stat; |
| |
| if (_monitor != null && stat != null && notificationTime.isPresent()) { |
| long updateTime = Math.max(stat.getCtime(), stat.getMtime()); |
| if (notificationTime.getAsLong() > updateTime) { |
| _monitor.recordDataPropagationLatency(_path, notificationTime.getAsLong() - updateTime); |
| } // else, the node was updated again after the notification. Propagation latency is |
| // unavailable. |
| } |
| } |
| } |
| |
| protected ZkClient(IZkConnection zkConnection, int connectionTimeout, long operationRetryTimeout, |
| PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey, |
| String monitorInstanceName, boolean monitorRootPathOnly) { |
| if (zkConnection == null) { |
| throw new NullPointerException("Zookeeper connection is null!"); |
| } |
| |
| _uid = UID.getAndIncrement(); |
| validateWriteSizeLimitConfig(); |
| |
| _connection = zkConnection; |
| _pathBasedZkSerializer = zkSerializer; |
| _operationRetryTimeoutInMillis = operationRetryTimeout; |
| _isNewSessionEventFired = false; |
| |
| _asyncCallRetryThread = new ZkAsyncRetryThread(zkConnection.getServers()); |
| _asyncCallRetryThread.start(); |
| LOG.debug("ZkClient created with uid {}, _asyncCallRetryThread id {}", _uid, _asyncCallRetryThread.getId()); |
| |
| if (monitorKey != null && !monitorKey.isEmpty() && monitorType != null && !monitorType |
| .isEmpty()) { |
| _monitor = |
| new ZkClientMonitor(monitorType, monitorKey, monitorInstanceName, monitorRootPathOnly, |
| _eventThread); |
| } else { |
| LOG.info("ZkClient monitor key or type is not provided. Skip monitoring."); |
| } |
| |
| connect(connectionTimeout, this); |
| |
| try { |
| if (_monitor != null) { |
| _monitor.register(); |
| } |
| } catch (JMException e){ |
| LOG.error("Error in creating ZkClientMonitor", e); |
| } |
| } |
| |
| public List<String> subscribeChildChanges(String path, IZkChildListener listener) { |
| ChildrenSubscribeResult result = subscribeChildChanges(path, listener, false); |
| return result.getChildren(); |
| } |
| |
| public ChildrenSubscribeResult subscribeChildChanges(String path, IZkChildListener listener, boolean skipWatchingNonExistNode) { |
| synchronized (_childListener) { |
| Set<IZkChildListener> listeners = _childListener.get(path); |
| if (listeners == null) { |
| listeners = new CopyOnWriteArraySet<>(); |
| _childListener.put(path, listeners); |
| } |
| listeners.add(listener); |
| } |
| |
| List<String> children = watchForChilds(path, skipWatchingNonExistNode); |
| if (children == null && skipWatchingNonExistNode) { |
| unsubscribeChildChanges(path, listener); |
| LOG.info("zkclient{}, watchForChilds failed to install no-existing watch and add listener. Path: {}", _uid, path); |
| return new ChildrenSubscribeResult(children, false); |
| } |
| |
| return new ChildrenSubscribeResult(children, true); |
| } |
| |
| public void unsubscribeChildChanges(String path, IZkChildListener childListener) { |
| synchronized (_childListener) { |
| final Set<IZkChildListener> listeners = _childListener.get(path); |
| if (listeners != null) { |
| listeners.remove(childListener); |
| } |
| } |
| } |
| |
| public boolean subscribeDataChanges(String path, IZkDataListener listener, boolean skipWatchingNonExistNode) { |
| Set<IZkDataListenerEntry> listenerEntries; |
| synchronized (_dataListener) { |
| listenerEntries = _dataListener.get(path); |
| if (listenerEntries == null) { |
| listenerEntries = new CopyOnWriteArraySet<>(); |
| _dataListener.put(path, listenerEntries); |
| } |
| |
| boolean prefetchEnabled = isPrefetchEnabled(listener); |
| IZkDataListenerEntry listenerEntry = new IZkDataListenerEntry(listener, prefetchEnabled); |
| listenerEntries.add(listenerEntry); |
| if (prefetchEnabled) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("zkclient {} subscribed data changes for {}, listener {}, prefetch data {}", |
| _uid, path, listener, prefetchEnabled); |
| } |
| } |
| } |
| |
| boolean watchInstalled = watchForData(path, skipWatchingNonExistNode); |
| if (!watchInstalled) { |
| // Now let us remove this handler. |
| unsubscribeDataChanges(path, listener); |
| LOG.info("zkclient {} watchForData failed to install no-existing path and thus add listener. Path: {}", |
| _uid, path); |
| return false; |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("zkclient {}, Subscribed data changes for {}", _uid, path); |
| } |
| return true; |
| } |
| |
| /** |
| * Subscribe the path and the listener will handle data events of the path |
| * WARNING: if the path is created after deletion, users need to re-subscribe the path |
| * @param path The zookeeper path |
| * @param listener Instance of {@link IZkDataListener} |
| */ |
| public void subscribeDataChanges(String path, IZkDataListener listener) { |
| subscribeDataChanges(path, listener, false); |
| } |
| |
| private boolean isPrefetchEnabled(IZkDataListener dataListener) { |
| PreFetchChangedData preFetch = dataListener.getClass().getAnnotation(PreFetchChangedData.class); |
| if (preFetch != null) { |
| return preFetch.enabled(); |
| } |
| |
| Method callbackMethod = IZkDataListener.class.getMethods()[0]; |
| try { |
| Method method = dataListener.getClass() |
| .getMethod(callbackMethod.getName(), callbackMethod.getParameterTypes()); |
| PreFetchChangedData preFetchInMethod = method.getAnnotation(PreFetchChangedData.class); |
| if (preFetchInMethod != null) { |
| return preFetchInMethod.enabled(); |
| } |
| } catch (NoSuchMethodException e) { |
| LOG.warn("Zkclient {}, No method {} defined in listener {}", |
| _uid, callbackMethod.getName(), dataListener.getClass().getCanonicalName()); |
| } |
| |
| return true; |
| } |
| |
| public void unsubscribeDataChanges(String path, IZkDataListener dataListener) { |
| synchronized (_dataListener) { |
| final Set<IZkDataListenerEntry> listeners = _dataListener.get(path); |
| if (listeners != null) { |
| IZkDataListenerEntry listenerEntry = new IZkDataListenerEntry(dataListener); |
| listeners.remove(listenerEntry); |
| } |
| if (listeners == null || listeners.isEmpty()) { |
| _dataListener.remove(path); |
| } |
| } |
| } |
| |
| public void subscribeStateChanges(final IZkStateListener listener) { |
| synchronized (_stateListener) { |
| _stateListener.add(listener); |
| } |
| } |
| |
| /** |
| * Subscribes state changes for a {@link IZkStateListener} listener. |
| * |
| * @deprecated |
| * This is deprecated. It is kept for backwards compatibility. Please use |
| * {@link #subscribeStateChanges(IZkStateListener)}. |
| * |
| * @param listener {@link IZkStateListener} listener |
| */ |
| @Deprecated |
| public void subscribeStateChanges( |
| final org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener listener) { |
| subscribeStateChanges(new IZkStateListenerI0ItecImpl(listener)); |
| } |
| |
| public void unsubscribeStateChanges(IZkStateListener stateListener) { |
| synchronized (_stateListener) { |
| _stateListener.remove(stateListener); |
| } |
| } |
| |
| /** |
| * Unsubscribes state changes for a {@link IZkStateListener} listener. |
| * |
| * @deprecated |
| * This is deprecated. It is kept for backwards compatibility. Please use |
| * {@link #unsubscribeStateChanges(IZkStateListener)}. |
| * |
| * @param stateListener {@link IZkStateListener} listener |
| */ |
| @Deprecated |
| public void unsubscribeStateChanges( |
| org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener stateListener) { |
| unsubscribeStateChanges(new IZkStateListenerI0ItecImpl(stateListener)); |
| } |
| |
| public void unsubscribeAll() { |
| synchronized (_childListener) { |
| _childListener.clear(); |
| } |
| synchronized (_dataListener) { |
| _dataListener.clear(); |
| } |
| synchronized (_stateListener) { |
| _stateListener.clear(); |
| } |
| } |
| |
| // </listeners> |
| |
| /** |
| * Create a persistent node. |
| * @param path |
| * @throws ZkInterruptedException |
| * if operation was interrupted, or a required reconnection got interrupted |
| * @throws IllegalArgumentException |
| * if called from anything except the ZooKeeper event thread |
| * @throws ZkException |
| * if any ZooKeeper exception occurred |
| * @throws RuntimeException |
| * if any other exception occurs |
| */ |
| public void createPersistent(String path) |
| throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { |
| createPersistent(path, false); |
| } |
| |
| /** |
| * Create a persistent node with TTL. |
| * @param path the path where you want the node to be created |
| * @param ttl TTL of the node in milliseconds |
| * @throws ZkInterruptedException |
| * if operation was interrupted, or a required reconnection got interrupted |
| * @throws IllegalArgumentException |
| * if called from anything except the ZooKeeper event thread |
| * @throws ZkException |
| * if any ZooKeeper exception occurred |
| * @throws RuntimeException |
| * if any other exception occurs |
| */ |
| public void createPersistentWithTTL(String path, long ttl) |
| throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { |
| createPersistentWithTTL(path, false, ttl); |
| } |
| |
| /** |
| * Create a container node. |
| * @param path the path where you want the node to be created |
| * @throws ZkInterruptedException |
| * if operation was interrupted, or a required reconnection got interrupted |
| * @throws IllegalArgumentException |
| * if called from anything except the ZooKeeper event thread |
| * @throws ZkException |
| * if any ZooKeeper exception occurred |
| * @throws RuntimeException |
| * if any other exception occurs |
| */ |
| public void createContainer(String path) |
| throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { |
| createContainer(path, false); |
| } |
| |
| /** |
| * Create a persistent node and set its ACLs. |
| * @param path |
| * @param createParents |
| * if true all parent dirs are created as well and no {@link ZkNodeExistsException} is |
| * thrown in case the |
| * path already exists |
| * @throws ZkInterruptedException |
| * if operation was interrupted, or a required reconnection got interrupted |
| * @throws IllegalArgumentException |
| * if called from anything except the ZooKeeper event thread |
| * @throws ZkException |
| * if any ZooKeeper exception occurred |
| * @throws RuntimeException |
| * if any other exception occurs |
| */ |
| public void createPersistent(String path, boolean createParents) |
| throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { |
| createPersistent(path, createParents, ZooDefs.Ids.OPEN_ACL_UNSAFE); |
| } |
| |
| /** |
| * Create a persistent node with TTL and set its ACLs. |
| * @param path the path where you want the node to be created |
| * @param createParents if true all parent dirs are created as well and no |
| * {@link ZkNodeExistsException} is thrown in case the path already exists |
| * @param ttl TTL of the node in milliseconds |
| * @throws ZkInterruptedException |
| * if operation was interrupted, or a required reconnection got interrupted |
| * @throws IllegalArgumentException |
| * if called from anything except the ZooKeeper event thread |
| * @throws ZkException |
| * if any ZooKeeper exception occurred |
| * @throws RuntimeException |
| * if any other exception occurs |
| */ |
| public void createPersistentWithTTL(String path, boolean createParents, long ttl) |
| throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { |
| createPersistentWithTTL(path, createParents, ZooDefs.Ids.OPEN_ACL_UNSAFE, ttl); |
| } |
| |
| /** |
| * Create a container node and set its ACLs. |
| * @param path the path where you want the node to be created |
| * @param createParents if true all parent dirs are created as well and no |
| * {@link ZkNodeExistsException} is thrown in case the path already exists |
| * @throws ZkInterruptedException |
| * if operation was interrupted, or a required reconnection got interrupted |
| * @throws IllegalArgumentException |
| * if called from anything except the ZooKeeper event thread |
| * @throws ZkException |
| * if any ZooKeeper exception occurred |
| * @throws RuntimeException |
| * if any other exception occurs |
| */ |
| public void createContainer(String path, boolean createParents) |
| throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { |
| createContainer(path, createParents, ZooDefs.Ids.OPEN_ACL_UNSAFE); |
| } |
| |
| /** |
| * Create a persistent node and set its ACLs. |
| * @param path |
| * @param acl |
| * List of ACL permissions to assign to the node |
| * @param createParents |
| * if true all parent dirs are created as well and no {@link ZkNodeExistsException} is |
| * thrown in case the |
| * path already exists |
| * @throws ZkInterruptedException |
| * if operation was interrupted, or a required reconnection got interrupted |
| * @throws IllegalArgumentException |
| * if called from anything except the ZooKeeper event thread |
| * @throws ZkException |
| * if any ZooKeeper exception occurred |
| * @throws RuntimeException |
| * if any other exception occurs |
| */ |
| public void createPersistent(String path, boolean createParents, List<ACL> acl) |
| throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { |
| try { |
| create(path, null, acl, CreateMode.PERSISTENT); |
| } catch (ZkNodeExistsException e) { |
| if (!createParents) { |
| throw e; |
| } |
| } catch (ZkNoNodeException e) { |
| if (!createParents) { |
| throw e; |
| } |
| String parentDir = path.substring(0, path.lastIndexOf('/')); |
| createPersistent(parentDir, createParents, acl); |
| createPersistent(path, createParents, acl); |
| } |
| } |
| |
| /** |
| * Create a persistent node with TTL and set its ACLs. |
| * @param path the path where you want the node to be created |
| * @param createParents if true all parent dirs are created as well and no |
| * {@link ZkNodeExistsException} is thrown in case the path already exists |
| * @param acl List of ACL permissions to assign to the node |
| * @param ttl TTL of the node in milliseconds |
| * @throws ZkInterruptedException |
| * if operation was interrupted, or a required reconnection got interrupted |
| * @throws IllegalArgumentException |
| * if called from anything except the ZooKeeper event thread |
| * @throws ZkException |
| * if any ZooKeeper exception occurred |
| * @throws RuntimeException |
| * if any other exception occurs |
| */ |
| public void createPersistentWithTTL(String path, boolean createParents, List<ACL> acl, long ttl) |
| throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { |
| try { |
| create(path, null, acl, CreateMode.PERSISTENT_WITH_TTL, ttl); |
| } catch (ZkNodeExistsException e) { |
| if (!createParents) { |
| throw e; |
| } |
| } catch (ZkNoNodeException e) { |
| if (!createParents) { |
| throw e; |
| } |
| String parentDir = path.substring(0, path.lastIndexOf('/')); |
| createPersistentWithTTL(parentDir, createParents, acl, ttl); |
| createPersistentWithTTL(path, createParents, acl, ttl); |
| } |
| } |
| |
| /** |
| * Create a container node and set its ACLs. |
| * @param path the path where you want the node to be created |
| * @param createParents if true all parent dirs are created as well and no |
| * {@link ZkNodeExistsException} is thrown in case the path already exists |
| * @param acl List of ACL permissions to assign to the node |
| * @throws ZkInterruptedException |
| * if operation was interrupted, or a required reconnection got interrupted |
| * @throws IllegalArgumentException |
| * if called from anything except the ZooKeeper event thread |
| * @throws ZkException |
| * if any ZooKeeper exception occurred |
| * @throws RuntimeException |
| * if any other exception occurs |
| */ |
| public void createContainer(String path, boolean createParents, List<ACL> acl) |
| throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { |
| try { |
| create(path, null, acl, CreateMode.CONTAINER); |
| } catch (ZkNodeExistsException e) { |
| if (!createParents) { |
| throw e; |
| } |
| } catch (ZkNoNodeException e) { |
| if (!createParents) { |
| throw e; |
| } |
| String parentDir = path.substring(0, path.lastIndexOf('/')); |
| createContainer(parentDir, createParents, acl); |
| createContainer(path, createParents, acl); |
| } |
| } |
| |
| /** |
| * Create a persistent node. |
| * @param path |
| * @param data |
| * @throws ZkInterruptedException |
| * if operation was interrupted, or a required reconnection got interrupted |
| * @throws IllegalArgumentException |
| * if called from anything except the ZooKeeper event thread |
| * @throws ZkException |
| * if any ZooKeeper exception occurred |
| * @throws RuntimeException |
| * if any other exception occurs |
| */ |
| public void createPersistent(String path, Object data) |
| throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { |
| create(path, data, CreateMode.PERSISTENT); |
| } |
| |
| /** |
| * Create a persistent node with TTL. |
| * @param path the path where you want the node to be created |
| * @param data data of the node |
| * @param ttl TTL of the node in milliseconds |
| * @throws ZkInterruptedException |
| * if operation was interrupted, or a required reconnection got interrupted |
| * @throws IllegalArgumentException |
| * if called from anything except the ZooKeeper event thread |
| * @throws ZkException |
| * if any ZooKeeper exception occurred |
| * @throws RuntimeException |
| * if any other exception occurs |
| */ |
| public void createPersistentWithTTL(String path, Object data, long ttl) |
| throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { |
| create(path, data, CreateMode.PERSISTENT_WITH_TTL, ttl); |
| } |
| |
| /** |
| * Create a container node. |
| * @param path the path where you want the node to be created |
| * @param data data of the node |
| * @throws ZkInterruptedException |
| * if operation was interrupted, or a required reconnection got interrupted |
| * @throws IllegalArgumentException |
| * if called from anything except the ZooKeeper event thread |
| * @throws ZkException |
| * if any ZooKeeper exception occurred |
| * @throws RuntimeException |
| * if any other exception occurs |
| */ |
| public void createContainer(String path, Object data) |
| throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { |
| create(path, data, CreateMode.CONTAINER); |
| } |
| |
| /** |
| * Create a persistent node. |
| * @param path |
| * @param data |
| * @param acl |
| * @throws ZkInterruptedException |
| * if operation was interrupted, or a required reconnection got interrupted |
| * @throws IllegalArgumentException |
| * if called from anything except the ZooKeeper event thread |
| * @throws ZkException |
| * if any ZooKeeper exception occurred |
| * @throws RuntimeException |
| * if any other exception occurs |
| */ |
| public void createPersistent(String path, Object data, List<ACL> acl) { |
| create(path, data, acl, CreateMode.PERSISTENT); |
| } |
| |
| /** |
| * Create a persistent node with TTL. |
| * @param path the path where you want the node to be created |
| * @param data data of the node |
| * @param acl list of ACL for the node |
| * @param ttl TTL of the node in milliseconds |
| * @throws ZkInterruptedException |
| * if operation was interrupted, or a required reconnection got interrupted |
| * @throws IllegalArgumentException |
| * if called from anything except the ZooKeeper event thread |
| * @throws ZkException |
| * if any ZooKeeper exception occurred |
| * @throws RuntimeException |
| * if any other exception occurs |
| */ |
| public void createPersistentWithTTL(String path, Object data, List<ACL> acl, long ttl) { |
| create(path, data, acl, CreateMode.PERSISTENT_WITH_TTL, ttl); |
| } |
| |
| /** |
| * Create a container node. |
| * @param path the path where you want the node to be created |
| * @param data data of the node |
| * @param acl list of ACL for the node |
| * @throws ZkInterruptedException |
| * if operation was interrupted, or a required reconnection got interrupted |
| * @throws IllegalArgumentException |
| * if called from anything except the ZooKeeper event thread |
| * @throws ZkException |
| * if any ZooKeeper exception occurred |
| * @throws RuntimeException |
| * if any other exception occurs |
| */ |
| public void createContainer(String path, Object data, List<ACL> acl) { |
| create(path, data, acl, CreateMode.CONTAINER); |
| } |
| |
| /** |
| * Create a persistent, sequental node. |
| * @param path |
| * @param data |
| * @return create node's path |
| * @throws ZkInterruptedException |
| * if operation was interrupted, or a required reconnection got interrupted |
| * @throws IllegalArgumentException |
| * if called from anything except the ZooKeeper event thread |
| * @throws ZkException |
| * if any ZooKeeper exception occurred |
| * @throws RuntimeException |
| * if any other exception occurs |
| */ |
| public String createPersistentSequential(String path, Object data) |
| throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { |
| return create(path, data, CreateMode.PERSISTENT_SEQUENTIAL); |
| } |
| |
| /** |
| * Create a persistent, sequential node. |
| * @param path the path where you want the node to be created |
| * @param data data of the node |
| * @param ttl TTL of the node in milliseconds |
| * @return create node's path |
| * @throws ZkInterruptedException |
| * if operation was interrupted, or a required reconnection got interrupted |
| * @throws IllegalArgumentException |
| * if called from anything except the ZooKeeper event thread |
| * @throws ZkException |
| * if any ZooKeeper exception occurred |
| * @throws RuntimeException |
| * if any other exception occurs |
| */ |
| public String createPersistentSequentialWithTTL(String path, Object data, long ttl) |
| throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { |
| return create(path, data, CreateMode.PERSISTENT_SEQUENTIAL_WITH_TTL, ttl); |
| } |
| |
| /** |
| * Create a persistent, sequential node and set its ACL. |
| * @param path |
| * @param acl |
| * @param data |
| * @return create node's path |
| * @throws ZkInterruptedException |
| * if operation was interrupted, or a required reconnection got interrupted |
| * @throws IllegalArgumentException |
| * if called from anything except the ZooKeeper event thread |
| * @throws ZkException |
| * if any ZooKeeper exception occurred |
| * @throws RuntimeException |
| * if any other exception occurs |
| */ |
| public String createPersistentSequential(String path, Object data, List<ACL> acl) |
| throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { |
| return create(path, data, acl, CreateMode.PERSISTENT_SEQUENTIAL); |
| } |
| |
| /** |
| * Create a persistent, sequential node and set its ACL. |
| * @param path the path where you want the node to be created |
| * @param acl list of ACL for the node |
| * @param data data of the node |
| * @param ttl TTL of the node in milliseconds |
| * @return create node's path |
| * @throws ZkInterruptedException |
| * if operation was interrupted, or a required reconnection got interrupted |
| * @throws IllegalArgumentException |
| * if called from anything except the ZooKeeper event thread |
| * @throws ZkException |
| * if any ZooKeeper exception occurred |
| * @throws RuntimeException |
| * if any other exception occurs |
| */ |
| public String createPersistentSequentialWithTTL(String path, Object data, List<ACL> acl, long ttl) |
| throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { |
| return create(path, data, acl, CreateMode.PERSISTENT_SEQUENTIAL_WITH_TTL, ttl); |
| } |
| |
| /** |
| * Create an ephemeral node. |
| * @param path |
| * @throws ZkInterruptedException |
| * if operation was interrupted, or a required reconnection got interrupted |
| * @throws IllegalArgumentException |
| * if called from anything except the ZooKeeper event thread |
| * @throws ZkException |
| * if any ZooKeeper exception occurred |
| * @throws RuntimeException |
| * if any other exception occurs |
| */ |
| public void createEphemeral(final String path) |
| throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { |
| create(path, null, CreateMode.EPHEMERAL); |
| } |
| |
| /** |
| * Creates an ephemeral node. This ephemeral node is created by the expected(passed-in) ZK session. |
| * If the expected session does not match the current ZK session, the node will not be created. |
| * |
| * @param path path of the node |
| * @param sessionId expected session id of the ZK connection. If the session id of current ZK |
| * connection does not match the expected session id, ephemeral creation will |
| * fail |
| * @throws ZkInterruptedException |
| * if operation was interrupted, or a required reconnection got interrupted |
| * @throws IllegalArgumentException |
| * if called from anything except the ZooKeeper event thread |
| * @throws ZkException |
| * if any ZooKeeper exception occurred |
| * @throws RuntimeException |
| * if any other exception occurs |
| */ |
| public void createEphemeral(final String path, final String sessionId) |
| throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { |
| createEphemeral(path, null, sessionId); |
| } |
| |
| /** |
| * Create an ephemeral node and set its ACL. |
| * @param path |
| * @param acl |
| * @throws ZkInterruptedException |
| * if operation was interrupted, or a required reconnection got interrupted |
| * @throws IllegalArgumentException |
| * if called from anything except the ZooKeeper event thread |
| * @throws ZkException |
| * if any ZooKeeper exception occurred |
| * @throws RuntimeException |
| * if any other exception occurs |
| */ |
| public void createEphemeral(final String path, final List<ACL> acl) |
| throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { |
| create(path, null, acl, CreateMode.EPHEMERAL); |
| } |
| |
| /** |
| * Creates an ephemeral node and set its ACL. This ephemeral node is created by the |
| * expected(passed-in) ZK session. If the expected session does not match the current ZK session, |
| * the node will not be created. |
| * |
| * @param path path of the ephemeral node |
| * @param acl a list of ACL for the ephemeral node. |
| * @param sessionId expected session id of the ZK connection. If the session id of current ZK |
| * connection does not match the expected session id, ephemeral creation will |
| * fail. |
| * @throws ZkInterruptedException |
| * if operation was interrupted, or a required reconnection got interrupted |
| * @throws IllegalArgumentException |
| * if called from anything except the ZooKeeper event thread |
| * @throws ZkException |
| * if any ZooKeeper exception occurred |
| * @throws RuntimeException |
| * if any other exception occurs |
| */ |
| public void createEphemeral(final String path, final List<ACL> acl, final String sessionId) |
| throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { |
| create(path, null, acl, CreateMode.EPHEMERAL, TTL_NOT_SET, sessionId); |
| } |
| |
| /** |
| * Create a node. |
| * @param path |
| * @param data |
| * @param mode |
| * @return create node's path |
| * @throws ZkInterruptedException |
| * if operation was interrupted, or a required reconnection got interrupted |
| * @throws IllegalArgumentException |
| * if called from anything except the ZooKeeper event thread |
| * @throws ZkException |
| * if any ZooKeeper exception occurred |
| * @throws RuntimeException |
| * if any other exception occurs |
| */ |
| public String create(final String path, Object data, final CreateMode mode) |
| throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { |
| return create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode); |
| } |
| |
| /** |
| * Create a node. |
| * @param path the path where you want the node to be created |
| * @param data data of the node |
| * @param mode {@link CreateMode} of the node |
| * @param ttl TTL of the node in milliseconds, if mode is {@link CreateMode#PERSISTENT_WITH_TTL} |
| * or {@link CreateMode#PERSISTENT_SEQUENTIAL_WITH_TTL} |
| * @return create node's path |
| * @throws ZkInterruptedException |
| * if operation was interrupted, or a required reconnection got interrupted |
| * @throws IllegalArgumentException |
| * if called from anything except the ZooKeeper event thread |
| * @throws ZkException |
| * if any ZooKeeper exception occurred |
| * @throws RuntimeException |
| * if any other exception occurs |
| */ |
| public String create(final String path, Object data, final CreateMode mode, long ttl) |
| throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { |
| return create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode, ttl); |
| } |
| |
| /** |
| * Create a node with ACL. |
| * @param path |
| * @param datat |
| * @param acl |
| * @param mode |
| * @return create node's path |
| * @throws ZkInterruptedException |
| * if operation was interrupted, or a required reconnection got interrupted |
| * @throws IllegalArgumentException |
| * if called from anything except the ZooKeeper event thread |
| * @throws ZkException |
| * if any ZooKeeper exception occurred |
| * @throws RuntimeException |
| * if any other exception occurs |
| */ |
| public String create(final String path, Object datat, final List<ACL> acl, final CreateMode mode) |
| throws IllegalArgumentException, ZkException { |
| return create(path, datat, acl, mode, TTL_NOT_SET, null); |
| } |
| |
| /** |
| * Create a node with ACL. |
| * @param path the path where you want the node to be created |
| * @param datat data of the node |
| * @param acl list of ACL for the node |
| * @param mode {@link CreateMode} of the node |
| * @param ttl TTL of the node in milliseconds, if mode is {@link CreateMode#PERSISTENT_WITH_TTL} |
| * or {@link CreateMode#PERSISTENT_SEQUENTIAL_WITH_TTL} |
| * @return create node's path |
| * @throws ZkInterruptedException |
| * if operation was interrupted, or a required reconnection got interrupted |
| * @throws IllegalArgumentException |
| * if called from anything except the ZooKeeper event thread |
| * @throws ZkException |
| * if any ZooKeeper exception occurred |
| * @throws RuntimeException |
| * if any other exception occurs |
| */ |
| public String create(final String path, Object datat, final List<ACL> acl, final CreateMode mode, |
| long ttl) throws IllegalArgumentException, ZkException { |
| return create(path, datat, acl, mode, ttl, null); |
| } |
| |
| /** |
| * Creates a node and returns the actual path of the created node. |
| * |
| * Given an expected non-null session id, if the node is successfully created, it is guaranteed to |
| * be created in the expected(passed-in) session. |
| * |
| * If the expected session is expired, which means the expected session does not match the current |
| * session of ZK connection, the node will not be created. |
| * |
| * @param path the path where you want the node to be created |
| * @param dataObject data of the node |
| * @param acl list of ACL for the node |
| * @param mode {@link CreateMode} of the node |
| * @param ttl TTL of the node in milliseconds, if mode is {@link CreateMode#PERSISTENT_WITH_TTL} |
| * or {@link CreateMode#PERSISTENT_SEQUENTIAL_WITH_TTL} |
| * @param expectedSessionId the expected session ID of the ZK connection. It is not necessarily the |
| * session ID of current ZK Connection. If the expected session ID is NOT null, |
| * the node is guaranteed to be created in the expected session, or creation is |
| * failed if the expected session id doesn't match current connected zk session. |
| * If the session id is null, it means the create operation is NOT session aware. |
| * @return path of the node created |
| * @throws IllegalArgumentException if called from anything else except the ZooKeeper event thread |
| * @throws ZkException if any zookeeper exception occurs |
| */ |
| private String create(final String path, final Object dataObject, final List<ACL> acl, |
| final CreateMode mode, long ttl, final String expectedSessionId) |
| throws IllegalArgumentException, ZkException { |
| if (path == null) { |
| throw new NullPointerException("Path must not be null."); |
| } |
| if (acl == null || acl.size() == 0) { |
| throw new NullPointerException("Missing value for ACL"); |
| } |
| long startT = System.currentTimeMillis(); |
| try { |
| final byte[] dataBytes = dataObject == null ? null : serialize(dataObject, path); |
| checkDataSizeLimit(path, dataBytes); |
| |
| final String actualPath; |
| if (mode.isTTL()) { |
| actualPath = retryUntilConnected(() -> getExpectedZookeeper(expectedSessionId) |
| .create(path, dataBytes, acl, mode, null, ttl)); |
| } else { |
| actualPath = retryUntilConnected(() -> getExpectedZookeeper(expectedSessionId) |
| .create(path, dataBytes, acl, mode)); |
| } |
| |
| record(path, dataBytes, startT, ZkClientMonitor.AccessType.WRITE); |
| return actualPath; |
| } catch (Exception e) { |
| recordFailure(path, ZkClientMonitor.AccessType.WRITE); |
| throw e; |
| } finally { |
| long endT = System.currentTimeMillis(); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("zkclient {} create, path {}, time {} ms", _uid, path, (endT - startT)); |
| } |
| } |
| } |
| |
| /** |
| * Create an ephemeral node. |
| * @param path |
| * @param data |
| * @throws ZkInterruptedException |
| * if operation was interrupted, or a required reconnection got interrupted |
| * @throws IllegalArgumentException |
| * if called from anything except the ZooKeeper event thread |
| * @throws ZkException |
| * if any ZooKeeper exception occurred |
| * @throws RuntimeException |
| * if any other exception occurs |
| */ |
| public void createEphemeral(final String path, final Object data) |
| throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { |
| create(path, data, CreateMode.EPHEMERAL); |
| } |
| |
| /** |
| * Creates an ephemeral node. Given an expected non-null session id, if the ephemeral |
| * node is successfully created, it is guaranteed to be in the expected(passed-in) session. |
| * |
| * If the expected session is expired, which means the expected session does not match the session |
| * of current ZK connection, the ephemeral node will not be created. |
| * If connection is timed out or interrupted, exception is thrown. |
| * |
| * @param path path of the ephemeral node being created |
| * @param data data of the ephemeral node being created |
| * @param sessionId the expected session ID of the ZK connection. It is not necessarily the |
| * session ID of current ZK Connection. If the expected session ID is NOT null, |
| * the node is guaranteed to be created in the expected session, or creation is |
| * failed if the expected session id doesn't match current connected zk session. |
| * If the session id is null, it means the operation is NOT session aware |
| * and the node will be created by current ZK session. |
| * @throws ZkInterruptedException if operation is interrupted, or a required reconnection gets |
| * interrupted |
| * @throws IllegalArgumentException if called from anything except the ZooKeeper event thread |
| * @throws ZkException if any ZooKeeper exception occurs |
| * @throws RuntimeException if any other exception occurs |
| */ |
| public void createEphemeral(final String path, final Object data, final String sessionId) |
| throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { |
| create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, TTL_NOT_SET, sessionId); |
| } |
| |
| /** |
| * Create an ephemeral node. |
| * @param path |
| * @param data |
| * @param acl |
| * @throws ZkInterruptedException |
| * if operation was interrupted, or a required reconnection got interrupted |
| * @throws IllegalArgumentException |
| * if called from anything except the ZooKeeper event thread |
| * @throws ZkException |
| * if any ZooKeeper exception occurred |
| * @throws RuntimeException |
| * if any other exception occurs |
| */ |
| public void createEphemeral(final String path, final Object data, final List<ACL> acl) |
| throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { |
| create(path, data, acl, CreateMode.EPHEMERAL); |
| } |
| |
| /** |
| * Creates an ephemeral node in an expected ZK session. Given an expected non-null session id, |
| * if the ephemeral node is successfully created, it is guaranteed to be in the expected session. |
| * If the expected session is expired, which means the expected session does not match the session |
| * of current ZK connection, the ephemeral node will not be created. |
| * If connection is timed out or interrupted, exception is thrown. |
| * |
| * @param path path of the ephemeral node being created |
| * @param data data of the ephemeral node being created |
| * @param acl list of ACL for the ephemeral node |
| * @param sessionId the expected session ID of the ZK connection. It is not necessarily the |
| * session ID of current ZK Connection. If the expected session ID is NOT null, |
| * the node is guaranteed to be created in the expected session, or creation is |
| * failed if the expected session id doesn't match current connected zk session. |
| * If the session id is null, it means the create operation is NOT session aware |
| * and the node will be created by current ZK session. |
| * @throws ZkInterruptedException |
| * if operation was interrupted, or a required reconnection got interrupted |
| * @throws IllegalArgumentException |
| * if called from anything except the ZooKeeper event thread |
| * @throws ZkException |
| * if any ZooKeeper exception occurred |
| * @throws RuntimeException |
| * if any other exception occurs |
| */ |
| public void createEphemeral(final String path, final Object data, final List<ACL> acl, |
| final String sessionId) |
| throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { |
| create(path, data, acl, CreateMode.EPHEMERAL, TTL_NOT_SET, sessionId); |
| } |
| |
| /** |
| * Create an ephemeral, sequential node. |
| * @param path |
| * @param data |
| * @return created path |
| * @throws ZkInterruptedException |
| * if operation was interrupted, or a required reconnection got interrupted |
| * @throws IllegalArgumentException |
| * if called from anything except the ZooKeeper event thread |
| * @throws ZkException |
| * if any ZooKeeper exception occurred |
| * @throws RuntimeException |
| * if any other exception occurs |
| */ |
| public String createEphemeralSequential(final String path, final Object data) |
| throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { |
| return create(path, data, CreateMode.EPHEMERAL_SEQUENTIAL); |
| } |
| |
| /** |
| * Creates an ephemeral, sequential node with ACL in an expected ZK session. |
| * Given an expected non-null session id, if the ephemeral node is successfully created, |
| * it is guaranteed to be in the expected session. |
| * If the expected session is expired, which means the expected session does not match the session |
| * of current ZK connection, the ephemeral node will not be created. |
| * If connection is timed out or interrupted, exception is thrown. |
| * |
| * @param path path of the node |
| * @param data data of the node |
| * @param acl list of ACL for the node |
| * @return created path |
| * @throws ZkInterruptedException |
| * if operation was interrupted, or a required reconnection got interrupted |
| * @throws IllegalArgumentException |
| * if called from anything except the ZooKeeper event thread |
| * @throws ZkException |
| * if any ZooKeeper exception occurred |
| * @throws RuntimeException |
| * if any other exception occurs |
| */ |
| public String createEphemeralSequential(final String path, final Object data, final List<ACL> acl, |
| final String sessionId) |
| throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { |
| return create(path, data, acl, CreateMode.EPHEMERAL_SEQUENTIAL, TTL_NOT_SET, sessionId); |
| } |
| |
| /** |
| * Creates an ephemeral, sequential node. Given an expected non-null session id, |
| * if the ephemeral node is successfully created, it is guaranteed to be in the expected session. |
| * If the expected session is expired, which means the expected session does not match the session |
| * of current ZK connection, the ephemeral node will not be created. |
| * If connection is timed out or interrupted, exception is thrown. |
| * |
| * @param path path of the node |
| * @param data data of the node |
| * @param sessionId the expected session ID of the ZK connection. It is not necessarily the |
| * session ID of current ZK Connection. If the expected session ID is NOT null, |
| * the node is guaranteed to be created in the expected session, or creation is |
| * failed if the expected session id doesn't match current connected zk session. |
| * If the session id is null, it means the create operation is NOT session aware |
| * and the node will be created by current ZK session. |
| * @return created path |
| * @throws ZkInterruptedException |
| * if operation was interrupted, or a required reconnection got interrupted |
| * @throws IllegalArgumentException |
| * if called from anything except the ZooKeeper event thread |
| * @throws ZkException |
| * if any ZooKeeper exception occurred |
| * @throws RuntimeException |
| * if any other exception occurs |
| */ |
| public String createEphemeralSequential(final String path, final Object data, |
| final String sessionId) |
| throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { |
| return create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, |
| TTL_NOT_SET, sessionId); |
| } |
| |
| /** |
| * Create an ephemeral, sequential node with ACL. |
| * @param path |
| * @param data |
| * @param acl |
| * @return created path |
| * @throws ZkInterruptedException |
| * if operation was interrupted, or a required reconnection got interrupted |
| * @throws IllegalArgumentException |
| * if called from anything except the ZooKeeper event thread |
| * @throws ZkException |
| * if any ZooKeeper exception occurred |
| * @throws RuntimeException |
| * if any other exception occurs |
| */ |
| public String createEphemeralSequential(final String path, final Object data, final List<ACL> acl) |
| throws ZkInterruptedException, IllegalArgumentException, ZkException, RuntimeException { |
| return create(path, data, acl, CreateMode.EPHEMERAL_SEQUENTIAL); |
| } |
| |
| @Override |
| public void process(WatchedEvent event) { |
| long notificationTime = System.currentTimeMillis(); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("zkclient {}, Received event: {} ", _uid, event); |
| } |
| _zookeeperEventThread = Thread.currentThread(); |
| |
| boolean stateChanged = event.getPath() == null; |
| boolean sessionExpired = stateChanged && event.getState() == KeeperState.Expired; |
| boolean znodeChanged = event.getPath() != null; |
| boolean dataChanged = |
| event.getType() == EventType.NodeDataChanged || event.getType() == EventType.NodeDeleted |
| || event.getType() == EventType.NodeCreated |
| || event.getType() == EventType.NodeChildrenChanged; |
| if (event.getType() == EventType.NodeDeleted) { |
| LOG.debug("zkclient {}, Path {} is deleted", _uid, event.getPath()); |
| } |
| |
| getEventLock().lock(); |
| try { |
| // We might have to install child change event listener if a new node was created |
| if (getShutdownTrigger()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("zkclient {} ignoring event {}|{} since shutdown triggered", |
| _uid, event.getType(), event.getPath()); |
| } |
| return; |
| } |
| if (stateChanged) { |
| processStateChanged(event); |
| } |
| if (dataChanged) { |
| processDataOrChildChange(event, notificationTime); |
| } |
| } finally { |
| if (stateChanged) { |
| getEventLock().getStateChangedCondition().signalAll(); |
| |
| // If the session expired we have to signal all conditions, because watches might have been |
| // removed and |
| // there is no guarantee that those |
| // conditions will be signaled at all after an Expired event |
| // TODO PVo write a test for this |
| if (event.getState() == KeeperState.Expired) { |
| getEventLock().getZNodeEventCondition().signalAll(); |
| getEventLock().getDataChangedCondition().signalAll(); |
| } |
| } |
| if (znodeChanged) { |
| getEventLock().getZNodeEventCondition().signalAll(); |
| } |
| if (dataChanged) { |
| getEventLock().getDataChangedCondition().signalAll(); |
| } |
| getEventLock().unlock(); |
| |
| // update state change counter. |
| recordStateChange(stateChanged, dataChanged, sessionExpired); |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("zkclient {} Leaving process event", _uid); |
| } |
| } |
| } |
| |
| private void fireAllEvents() { |
| //TODO: During handling new session, if the path is deleted, watcher leakage could still happen |
| for (Entry<String, Set<IZkChildListener>> entry : _childListener.entrySet()) { |
| fireChildChangedEvents(entry.getKey(), entry.getValue(), true); |
| } |
| for (Entry<String, Set<IZkDataListenerEntry>> entry : _dataListener.entrySet()) { |
| fireDataChangedEvents(entry.getKey(), entry.getValue(), OptionalLong.empty(), true); |
| } |
| } |
| |
| /** |
| * Returns a list of children of the given path. |
| * <p> |
| * NOTE: if the given path has too many children which causes the network packet length to exceed |
| * {@code jute.maxbuffer}, there are 2 cases, depending on whether or not the native |
| * zk supports paginated getChildren API and the config |
| * {@link ZkSystemPropertyKeys#ZK_GETCHILDREN_PAGINATION_DISABLED}: |
| * <p>1) pagination is disabled by {@link ZkSystemPropertyKeys#ZK_GETCHILDREN_PAGINATION_DISABLED} |
| * set to true or zk does not support pagination: the operation will fail. |
| * <p>2) config is false and zk supports pagination. A list of all children will be fetched using |
| * pagination and returned. But please note that the final children list is NOT strongly |
| * consistent with server - the list might contain some deleted children if some children |
| * are deleted before the last page is fetched. The upstream caller should be able to handle this. |
| */ |
| public List<String> getChildren(String path) { |
| return getChildren(path, hasListeners(path)); |
| } |
| |
| protected List<String> getChildren(final String path, final boolean watch) { |
| long startT = System.currentTimeMillis(); |
| |
| try { |
| List<String> children = retryUntilConnected(new Callable<List<String>>() { |
| private int connectionLossRetryCount = 0; |
| |
| @Override |
| public List<String> call() throws Exception { |
| try { |
| return getConnection().getChildren(path, watch); |
| } catch (ConnectionLossException e) { |
| // Issue: https://github.com/apache/helix/issues/962 |
| // Connection loss might be caused by an excessive number of children. |
| // Infinitely retrying connecting may cause high GC in ZK server and kill ZK server. |
| // This is a workaround to check numChildren to have a chance to exit retry loop. |
| // Check numChildren stat every other 3 connection loss, because there is a higher |
| // possibility that connection loss is caused by other factors such as network |
| // connectivity, session expired, etc. |
| // TODO: remove this check once we have a better way to exit infinite retry |
| ++connectionLossRetryCount; |
| if (connectionLossRetryCount >= 3) { |
| checkNumChildrenLimit(path); |
| connectionLossRetryCount = 0; |
| } |
| |
| // Re-throw the ConnectionLossException for retryUntilConnected() to catch and retry. |
| throw e; |
| } |
| } |
| }); |
| record(path, null, startT, ZkClientMonitor.AccessType.READ); |
| return children; |
| } catch (ZkNoNodeException e) { |
| record(path, null, startT, ZkClientMonitor.AccessType.READ); |
| throw e; |
| } catch (Exception e) { |
| recordFailure(path, ZkClientMonitor.AccessType.READ); |
| throw e; |
| } finally { |
| long endT = System.currentTimeMillis(); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("zkclient {} getChildren, path {} time: {} ms", _uid, path, (endT - startT) ); |
| } |
| } |
| } |
| |
| /** |
| * Counts number of children for the given path. |
| * @param path |
| * @return number of children or 0 if path does not exist. |
| */ |
| public int countChildren(String path) { |
| try { |
| return getChildren(path).size(); |
| } catch (ZkNoNodeException e) { |
| return 0; |
| } |
| } |
| |
| public boolean exists(final String path) { |
| return exists(path, hasListeners(path)); |
| } |
| |
| protected boolean exists(final String path, final boolean watch) { |
| long startT = System.currentTimeMillis(); |
| try { |
| boolean exists = retryUntilConnected(new Callable<Boolean>() { |
| @Override |
| public Boolean call() throws Exception { |
| return getConnection().exists(path, watch); |
| } |
| }); |
| record(path, null, startT, ZkClientMonitor.AccessType.READ); |
| return exists; |
| } catch (ZkNoNodeException e) { |
| record(path, null, startT, ZkClientMonitor.AccessType.READ); |
| throw e; |
| } catch (Exception e) { |
| recordFailure(path, ZkClientMonitor.AccessType.READ); |
| throw e; |
| } finally { |
| long endT = System.currentTimeMillis(); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("zkclient exists, path: {}, time: {} ms", _uid, path, (endT - startT)); |
| } |
| } |
| } |
| |
| public Stat getStat(final String path) { |
| return getStat(path, false); |
| } |
| |
| private Stat getStat(final String path, final boolean watch) { |
| long startT = System.currentTimeMillis(); |
| final Stat stat; |
| try { |
| stat = retryUntilConnected( |
| () -> ((ZkConnection) getConnection()).getZookeeper().exists(path, watch)); |
| record(path, null, startT, ZkClientMonitor.AccessType.READ); |
| return stat; |
| } catch (Exception e) { |
| recordFailure(path, ZkClientMonitor.AccessType.READ); |
| throw e; |
| } finally { |
| long endT = System.currentTimeMillis(); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("zkclient exists, path: {}, time: {} ms", _uid, path, (endT - startT)); |
| } |
| } |
| } |
| |
| /* |
| * This one installs watch only if path is there. Meant to avoid leaking watch in Zk server. |
| */ |
| private Stat installWatchOnlyPathExist(final String path) { |
| long startT = System.currentTimeMillis(); |
| final Stat stat; |
| try { |
| stat = new Stat(); |
| try { |
| LOG.debug("installWatchOnlyPathExist with path: {} ", path); |
| retryUntilConnected(() -> ((ZkConnection) getConnection()).getZookeeper().getData(path, true, stat)); |
| } catch (ZkNoNodeException e) { |
| LOG.debug("installWatchOnlyPathExist path not existing: {}", path); |
| record(path, null, startT, ZkClientMonitor.AccessType.READ); |
| return null; |
| } |
| record(path, null, startT, ZkClientMonitor.AccessType.READ); |
| return stat; |
| } catch (Exception e) { |
| recordFailure(path, ZkClientMonitor.AccessType.READ); |
| throw e; |
| } finally { |
| long endT = System.currentTimeMillis(); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("zkclient getData (installWatchOnlyPathExist), path: {}, time: {} ms", |
| _uid, path, (endT - startT)); |
| } |
| } |
| } |
| |
| protected void processStateChanged(WatchedEvent event) { |
| LOG.info("zkclient {}, zookeeper state changed ( {} )", _uid, event.getState()); |
| setCurrentState(event.getState()); |
| if (getShutdownTrigger()) { |
| return; |
| } |
| |
| fireStateChangedEvent(event.getState()); |
| |
| /* |
| * Note, the intention is that only the ZkClient managing the session would do auto reconnect |
| * and fireNewSessionEvents and fireAllEvent. |
| * Other ZkClient not managing the session would only fireAllEvent upon a new session. |
| */ |
| if (event.getState() == KeeperState.SyncConnected) { |
| if (!_isNewSessionEventFired && !"0".equals(getHexSessionId())) { |
| /* |
| * Before the new zookeeper instance is connected to the zookeeper service and its session |
| * is established, its session id is 0. |
| * New session event is not fired until the new zookeeper session receives the first |
| * SyncConnected state(the zookeeper session is established). |
| * Now the session id is available and non-zero, and we can fire new session events. |
| */ |
| fireNewSessionEvents(); |
| |
| /* |
| * Set it true to avoid firing events again for the same session next time |
| * when SyncConnected events are received. |
| */ |
| _isNewSessionEventFired = true; |
| |
| /* |
| * With this first SyncConnected state, we just get connected to zookeeper service after |
| * reconnecting when the session expired. Because previous session expired, we also have to |
| * notify all listeners that something might have changed. |
| */ |
| fireAllEvents(); |
| } |
| } else if (event.getState() == KeeperState.Expired) { |
| _isNewSessionEventFired = false; |
| reconnectOnExpiring(); |
| } |
| } |
| |
| private void reconnectOnExpiring() { |
| // only managing zkclient reconnect |
| if (!isManagingZkConnection()) { |
| return; |
| } |
| int retryCount = 0; |
| ExponentialBackoffStrategy retryStrategy = |
| new ExponentialBackoffStrategy(MAX_RECONNECT_INTERVAL_MS, true); |
| |
| Exception reconnectException = new ZkException("Shutdown triggered."); |
| while (!isClosed()) { |
| try { |
| reconnect(); |
| return; |
| } catch (ZkInterruptedException interrupt) { |
| reconnectException = interrupt; |
| break; |
| } catch (Exception e) { |
| reconnectException = e; |
| long waitInterval = retryStrategy.getNextWaitInterval(retryCount++); |
| LOG.warn("ZkClient {}, reconnect on expiring failed. Will retry after {} ms", |
| _uid, waitInterval, e); |
| try { |
| Thread.sleep(waitInterval); |
| } catch (InterruptedException ex) { |
| reconnectException = ex; |
| break; |
| } |
| } |
| } |
| |
| LOG.info("Zkclient {} unable to re-establish connection. Notifying consumer of the following exception:{}", |
| _uid, reconnectException); |
| fireSessionEstablishmentError(reconnectException); |
| } |
| |
| private void reconnect() { |
| getEventLock().lock(); |
| try { |
| ZkConnection connection = ((ZkConnection) getConnection()); |
| connection.reconnect(this); |
| } catch (InterruptedException e) { |
| throw new ZkInterruptedException(e); |
| } finally { |
| getEventLock().unlock(); |
| } |
| } |
| |
| private void doAsyncSync(final ZooKeeper zk, final String path, final long startT, |
| final ZkAsyncCallbacks.SyncCallbackHandler cb) { |
| try { |
| zk.sync(path, cb, |
| new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, 0, true) { |
| @Override |
| protected void doRetry() throws Exception { |
| doAsyncSync(zk, path, System.currentTimeMillis(), cb); |
| } |
| }); |
| } catch (RuntimeException e) { |
| // Process callback to release caller from waiting |
| cb.processResult(KeeperException.Code.APIERROR.intValue(), path, |
| new ZkAsyncCallMonitorContext(_monitor, startT, 0, true)); |
| throw e; |
| } |
| } |
| |
| |
| /* |
| * Note, issueSync takes a ZooKeeper (client) object and pass it to doAsyncSync(). |
| * The reason we do this is that we want to ensure each new session event is preceded with exactly |
| * one sync() to server. The sync() is to make sure the server would not see stale data. |
| * |
| * ZooKeeper client object has an invariant of each object has one session. With this invariant |
| * we can achieve each one sync() to server upon new session establishment. The reasoning is: |
| * issueSync() is called when fireNewSessionEvents() which in under eventLock of ZkClient. Thus |
| * we are guaranteed the ZooKeeper object passed in would have the new incoming sessionId. If by |
| * the time sync() is invoked, the session expires. The sync() would fail with a stale session. |
| * This is exactly what we want. The newer session would ensure another fireNewSessionEvents. |
| */ |
| private boolean issueSync(ZooKeeper zk) { |
| String sessionId = Long.toHexString(zk.getSessionId()); |
| ZkAsyncCallbacks.SyncCallbackHandler callbackHandler = |
| new ZkAsyncCallbacks.SyncCallbackHandler(sessionId); |
| |
| final long startT = System.currentTimeMillis(); |
| doAsyncSync(zk, SYNC_PATH, startT, callbackHandler); |
| |
| callbackHandler.waitForSuccess(); |
| |
| KeeperException.Code code = KeeperException.Code.get(callbackHandler.getRc()); |
| if (code == KeeperException.Code.OK) { |
| LOG.info("zkclient {}, sycnOnNewSession with sessionID {} async return code: {} and proceeds", |
| _uid, sessionId, code); |
| return true; |
| } |
| |
| // Not retryable error, including session expiration; return false. |
| return false; |
| } |
| |
| private void fireNewSessionEvents() { |
| // only managing zkclient fire handleNewSession event |
| if (!isManagingZkConnection()) { |
| return; |
| } |
| final String sessionId = getHexSessionId(); |
| |
| if (SYNC_ON_SESSION) { |
| final ZooKeeper zk = ((ZkConnection) getConnection()).getZookeeper(); |
| _eventThread.send(new ZkEventThread.ZkEvent("Sync call before new session event of session " + sessionId, |
| sessionId) { |
| @Override |
| public void run() throws Exception { |
| if (issueSync(zk) == false) { |
| LOG.warn("zkclient{}, Failed to call sync() on new session {}", _uid, sessionId); |
| } |
| } |
| }); |
| } |
| |
| for (final IZkStateListener stateListener : _stateListener) { |
| _eventThread |
| .send(new ZkEventThread.ZkEvent("New session event sent to " + stateListener, sessionId) { |
| |
| @Override |
| public void run() throws Exception { |
| stateListener.handleNewSession(sessionId); |
| } |
| }); |
| } |
| } |
| |
| protected void fireStateChangedEvent(final KeeperState state) { |
| final String sessionId = getHexSessionId(); |
| for (final IZkStateListener stateListener : _stateListener) { |
| final String description = "State changed to " + state + " sent to " + stateListener; |
| _eventThread.send(new ZkEventThread.ZkEvent(description, sessionId) { |
| |
| @Override |
| public void run() throws Exception { |
| stateListener.handleStateChanged(state); |
| } |
| }); |
| } |
| } |
| |
| private void fireSessionEstablishmentError(final Throwable error) { |
| for (final IZkStateListener stateListener : _stateListener) { |
| _eventThread |
| .send(new ZkEventThread.ZkEvent("Session establishment error(" + error + ") sent to " + stateListener) { |
| |
| @Override |
| public void run() throws Exception { |
| stateListener.handleSessionEstablishmentError(error); |
| } |
| }); |
| } |
| } |
| |
| private boolean hasListeners(String path) { |
| Set<IZkDataListenerEntry> dataListeners = _dataListener.get(path); |
| if (dataListeners != null && dataListeners.size() > 0) { |
| return true; |
| } |
| Set<IZkChildListener> childListeners = _childListener.get(path); |
| if (childListeners != null && childListeners.size() > 0) { |
| return true; |
| } |
| return false; |
| } |
| |
| /** |
| * Delete the path as well as all its children. |
| * This method is deprecated, please use {@link #deleteRecursively(String)}} instead |
| * @param path ZK path |
| * @return true if successfully deleted all children, and the given path, else false |
| */ |
| @Deprecated |
| public boolean deleteRecursive(String path) { |
| try { |
| deleteRecursively(path); |
| return true; |
| } catch (ZkClientException e) { |
| LOG.error("zkcient {}, Failed to recursively delete path {}, exception {}", |
| _uid, path, e); |
| return false; |
| } |
| } |
| |
| /** |
| * Delete the path as well as all its children. |
| * @param path |
| * @throws ZkClientException |
| */ |
| public void deleteRecursively(String path) throws ZkClientException { |
| List<String> children; |
| try { |
| children = getChildren(path, false); |
| } catch (ZkNoNodeException e) { |
| // if the node to be deleted does not exist, treat it as success. |
| return; |
| } |
| |
| for (String subPath : children) { |
| deleteRecursively(path + "/" + subPath); |
| } |
| |
| // delete() function call will return true if successful, false if the path does not |
| // exist (in this context, it should be treated as successful), and throw exception |
| // if there is any other failure case. |
| try { |
| delete(path); |
| } catch (Exception e) { |
| LOG.error("zkclient {}, Failed to delete {}, exception {}", _uid, path, e); |
| throw new ZkClientException("Failed to delete " + path, e); |
| } |
| } |
| |
| private void processDataOrChildChange(WatchedEvent event, long notificationTime) { |
| final String path = event.getPath(); |
| final boolean pathExists = event.getType() != EventType.NodeDeleted; |
| if (EventType.NodeDeleted == event.getType()) { |
| LOG.debug("zkclient{}, Event NodeDeleted: {}", _uid, event.getPath()); |
| } |
| |
| if (event.getType() == EventType.NodeChildrenChanged || event.getType() == EventType.NodeCreated |
| || event.getType() == EventType.NodeDeleted) { |
| Set<IZkChildListener> childListeners = _childListener.get(path); |
| if (childListeners != null && !childListeners.isEmpty()) { |
| // TODO recording child changed event propagation latency as well. Note this change will |
| // introduce additional ZK access. |
| fireChildChangedEvents(path, childListeners, pathExists); |
| } |
| } |
| |
| if (event.getType() == EventType.NodeDataChanged || event.getType() == EventType.NodeDeleted |
| || event.getType() == EventType.NodeCreated) { |
| Set<IZkDataListenerEntry> listeners = _dataListener.get(path); |
| if (listeners != null && !listeners.isEmpty()) { |
| fireDataChangedEvents(event.getPath(), listeners, OptionalLong.of(notificationTime), |
| pathExists); |
| } |
| } |
| } |
| |
| private void fireDataChangedEvents(final String path, Set<IZkDataListenerEntry> listeners, |
| final OptionalLong notificationTime, boolean pathExists) { |
| try { |
| final ZkPathStatRecord pathStatRecord = new ZkPathStatRecord(path); |
| // Trigger listener callbacks |
| for (final IZkDataListenerEntry listener : listeners) { |
| _eventThread.send(new ZkEventThread.ZkEvent( |
| "Data of " + path + " changed sent to " + listener.getDataListener() |
| + " prefetch data: " + listener.isPrefetchData()) { |
| @Override |
| public void run() throws Exception { |
| if (!pathStatRecord.pathChecked()) { |
| // getStat() wrapp two ways to install data watch by using exists() or getData(). |
| // getData() aka useGetData (true) would not install the watch if the node not ] |
| // existing. Exists() aka useGetData (false) would install (leak) the watch if the |
| // node not existing. |
| // Here the goal is to avoid leaking watch. Thus, if we know path not exists, we use |
| // the exists() useGetData (false) route to check stat. Otherwise, we use getData() |
| // to install watch. |
| Stat stat = null; |
| if (!pathExists) { |
| stat = getStat(path, false); |
| } else { |
| stat = installWatchOnlyPathExist(path); |
| } |
| pathStatRecord.recordPathStat(stat, notificationTime); |
| } |
| if (!pathStatRecord.pathExists()) { |
| listener.getDataListener().handleDataDeleted(path); |
| } else { |
| Object data = null; |
| if (listener.isPrefetchData()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("zkclient {} Prefetch data for path: {}", _uid, path); |
| } |
| try { |
| // TODO: the data is redundantly read multiple times when multiple listeners exist |
| data = readData(path, null, true); |
| } catch (ZkNoNodeException e) { |
| LOG.warn("zkclient {} Prefetch data for path: {} failed.", _uid, path, e); |
| listener.getDataListener().handleDataDeleted(path); |
| return; |
| } |
| } |
| listener.getDataListener().handleDataChange(path, data); |
| } |
| } |
| }); |
| } |
| } catch (Exception e) { |
| LOG.error("zkclient {} Failed to fire data changed event for path: {}", _uid, path, e); |
| } |
| } |
| |
| private void fireChildChangedEvents(final String path, Set<IZkChildListener> childListeners, boolean pathExists) { |
| try { |
| final ZkPathStatRecord pathStatRecord = new ZkPathStatRecord(path); |
| for (final IZkChildListener listener : childListeners) { |
| _eventThread.send(new ZkEventThread.ZkEvent("Children of " + path + " changed sent to " + listener) { |
| @Override |
| public void run() throws Exception { |
| if (!pathStatRecord.pathChecked()) { |
| Stat stat = null; |
| if (!pathExists || !hasListeners(path)) { |
| // will not install listener using exists call |
| stat = getStat(path, false); |
| } else { |
| // will install listener using getData() call; if node not there, install nothing |
| stat = installWatchOnlyPathExist(path); |
| } |
| pathStatRecord.recordPathStat(stat, OptionalLong.empty()); |
| } |
| List<String> children = null; |
| if (pathStatRecord.pathExists()) { |
| try { |
| children = getChildren(path); |
| } catch (ZkNoNodeException e) { |
| LOG.warn("zkclient {} Get children under path: {} failed.", _uid, path, e); |
| // Continue trigger the change handler |
| } |
| } |
| listener.handleChildChange(path, children); |
| } |
| }); |
| } |
| } catch (Exception e) { |
| LOG.error("zkclient {} Failed to fire child changed event. Unable to getChildren.", _uid, e); |
| } |
| } |
| |
| public boolean waitUntilExists(String path, TimeUnit timeUnit, long time) |
| throws ZkInterruptedException { |
| Date timeout = new Date(System.currentTimeMillis() + timeUnit.toMillis(time)); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Waiting until znode {} becomes available.", _uid, path); |
| } |
| if (exists(path)) { |
| return true; |
| } |
| acquireEventLock(); |
| try { |
| while (!exists(path, true)) { |
| boolean gotSignal = getEventLock().getZNodeEventCondition().awaitUntil(timeout); |
| if (!gotSignal) { |
| return false; |
| } |
| } |
| return true; |
| } catch (InterruptedException e) { |
| throw new ZkInterruptedException(e); |
| } finally { |
| getEventLock().unlock(); |
| } |
| } |
| |
| public IZkConnection getConnection() { |
| return _connection; |
| } |
| |
| public long waitForEstablishedSession(long timeout, TimeUnit timeUnit) { |
| validateCurrentThread(); |
| |
| acquireEventLock(); |
| try { |
| if (!waitForKeeperState(KeeperState.SyncConnected, timeout, timeUnit)) { |
| throw new ZkTimeoutException("Waiting to be connected to ZK server has timed out."); |
| } |
| // Reading session ID before unlocking event lock is critical to guarantee the established |
| // session's ID won't change. |
| return getSessionId(); |
| } finally { |
| getEventLock().unlock(); |
| } |
| } |
| |
| public boolean waitUntilConnected(long time, TimeUnit timeUnit) throws ZkInterruptedException { |
| return waitForKeeperState(KeeperState.SyncConnected, time, timeUnit); |
| } |
| |
| public boolean waitForKeeperState(KeeperState keeperState, long time, TimeUnit timeUnit) |
| throws ZkInterruptedException { |
| validateCurrentThread(); |
| Date timeout = new Date(System.currentTimeMillis() + timeUnit.toMillis(time)); |
| |
| LOG.debug("zkclient {}, Waiting for keeper state {} ", _uid, keeperState); |
| acquireEventLock(); |
| try { |
| boolean stillWaiting = true; |
| while (_currentState != keeperState) { |
| if (!stillWaiting) { |
| return false; |
| } |
| stillWaiting = getEventLock().getStateChangedCondition().awaitUntil(timeout); |
| } |
| LOG.debug("zkclient {} State is {}", |
| _uid, (_currentState == null ? "CLOSED" : _currentState)); |
| return true; |
| } catch (InterruptedException e) { |
| throw new ZkInterruptedException(e); |
| } finally { |
| getEventLock().unlock(); |
| } |
| } |
| |
| private void acquireEventLock() { |
| try { |
| getEventLock().lockInterruptibly(); |
| } catch (InterruptedException e) { |
| throw new ZkInterruptedException(e); |
| } |
| } |
| |
| /** |
| * @param <T> |
| * @param callable |
| * @return result of Callable |
| * @throws ZkInterruptedException |
| * if operation was interrupted, or a required reconnection got interrupted |
| * @throws IllegalArgumentException |
| * if called from anything except the ZooKeeper event thread |
| * @throws ZkException |
| * if any ZooKeeper exception occurred |
| * @throws RuntimeException |
| * if any other exception occurs from invoking the Callable |
| */ |
| public <T> T retryUntilConnected(final Callable<T> callable) |
| throws IllegalArgumentException, ZkException { |
| if (_zookeeperEventThread != null && Thread.currentThread() == _zookeeperEventThread) { |
| throw new IllegalArgumentException("Must not be done in the zookeeper event thread."); |
| } |
| final long operationStartTime = System.currentTimeMillis(); |
| if (_monitor != null) { |
| _monitor.increaseOutstandingRequestGauge(); |
| } |
| try { |
| while (true) { |
| // Because ConnectionLossException and SessionExpiredException are caught but not thrown, |
| // we don't know what causes retry. This is used to record which one of the two exceptions |
| // causes retry in ZkTimeoutException. |
| // This also helps the test testConnectionLossWhileCreateEphemeral. |
| KeeperException.Code retryCauseCode; |
| |
| if (isClosed()) { |
| throw new IllegalStateException("ZkClient already closed!"); |
| } |
| try { |
| final ZkConnection zkConnection = (ZkConnection) getConnection(); |
| // Validate that the connection is not null before trigger callback |
| if (zkConnection == null || zkConnection.getZookeeper() == null) { |
| throw new IllegalStateException( |
| "ZkConnection is in invalid state! Please close this ZkClient and create new client."); |
| } |
| return callable.call(); |
| } catch (ConnectionLossException e) { |
| retryCauseCode = e.code(); |
| // we give the event thread some time to update the status to 'Disconnected' |
| Thread.yield(); |
| waitForRetry(); |
| } catch (SessionExpiredException e) { |
| retryCauseCode = e.code(); |
| // we give the event thread some time to update the status to 'Expired' |
| Thread.yield(); |
| waitForRetry(); |
| } catch (ZkSessionMismatchedException e) { |
| throw e; |
| } catch (KeeperException e) { |
| throw ZkException.create(e); |
| } catch (InterruptedException e) { |
| throw new ZkInterruptedException(e); |
| } catch (Exception e) { |
| throw ExceptionUtil.convertToRuntimeException(e); |
| } |
| |
| LOG.debug("zkclient {}, Retrying operation, caused by {}", _uid,retryCauseCode); |
| // before attempting a retry, check whether retry timeout has elapsed |
| if (System.currentTimeMillis() - operationStartTime > _operationRetryTimeoutInMillis) { |
| throw new ZkTimeoutException("Operation cannot be retried because of retry timeout (" |
| + _operationRetryTimeoutInMillis + " milli seconds). Retry was caused by " |
| + retryCauseCode); |
| } |
| } |
| } finally { |
| if (_monitor != null) { |
| _monitor.decreaseOutstandingRequestGauge(); |
| } |
| } |
| } |
| |
| private void waitForRetry() { |
| waitUntilConnected(_operationRetryTimeoutInMillis, TimeUnit.MILLISECONDS); |
| } |
| |
| public void setCurrentState(KeeperState currentState) { |
| getEventLock().lock(); |
| try { |
| _currentState = currentState; |
| } finally { |
| getEventLock().unlock(); |
| } |
| } |
| |
| /** |
| * Returns a mutex all zookeeper events are synchronized aginst. So in case you need to do |
| * something without getting |
| * any zookeeper event interruption synchronize against this mutex. Also all threads waiting on |
| * this mutex object |
| * will be notified on an event. |
| * @return the mutex. |
| */ |
| public ZkLock getEventLock() { |
| return _zkEventLock; |
| } |
| |
| /** |
| * Delete the given path. Path should not have any children or the deletion will fail. |
| * This function will throw exception if we fail to delete an existing path |
| * @param path |
| * @return true if path is successfully deleted, false if path does not exist |
| */ |
| public boolean delete(final String path) { |
| long startT = System.currentTimeMillis(); |
| boolean success; |
| try { |
| try { |
| retryUntilConnected(new Callable<Object>() { |
| |
| @Override |
| public Object call() throws Exception { |
| getConnection().delete(path); |
| return null; |
| } |
| }); |
| success = true; |
| } catch (ZkNoNodeException e) { |
| success = false; |
| LOG.debug("zkclient {}, Failed to delete path {}, znode does not exist!", _uid, path, e); |
| } |
| record(path, null, startT, ZkClientMonitor.AccessType.WRITE); |
| } catch (Exception e) { |
| recordFailure(path, ZkClientMonitor.AccessType.WRITE); |
| throw e; |
| } finally { |
| long endT = System.currentTimeMillis(); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("zkclient {} delete, path: {}, time {} ms", _uid, path, (endT - startT)); |
| } |
| } |
| return success; |
| } |
| |
| public void setZkSerializer(ZkSerializer zkSerializer) { |
| _pathBasedZkSerializer = new BasicZkSerializer(zkSerializer); |
| } |
| |
| public void setZkSerializer(PathBasedZkSerializer zkSerializer) { |
| _pathBasedZkSerializer = zkSerializer; |
| } |
| |
| public PathBasedZkSerializer getZkSerializer() { |
| return _pathBasedZkSerializer; |
| } |
| |
| public byte[] serialize(Object data, String path) { |
| return _pathBasedZkSerializer.serialize(data, path); |
| } |
| |
| @SuppressWarnings("unchecked") |
| public <T extends Object> T deserialize(byte[] data, String path) { |
| if (data == null) { |
| return null; |
| } |
| return (T) _pathBasedZkSerializer.deserialize(data, path); |
| } |
| |
| @SuppressWarnings("unchecked") |
| public <T extends Object> T readData(String path) { |
| return (T) readData(path, false); |
| } |
| |
| @SuppressWarnings("unchecked") |
| public <T extends Object> T readData(String path, boolean returnNullIfPathNotExists) { |
| T data = null; |
| try { |
| data = (T) readData(path, null); |
| } catch (ZkNoNodeException e) { |
| if (!returnNullIfPathNotExists) { |
| throw e; |
| } |
| } |
| return data; |
| } |
| |
| @SuppressWarnings("unchecked") |
| public <T extends Object> T readData(String path, Stat stat) { |
| return (T) readData(path, stat, hasListeners(path)); |
| } |
| |
| @SuppressWarnings("unchecked") |
| public <T extends Object> T readData(final String path, final Stat stat, final boolean watch) { |
| long startT = System.currentTimeMillis(); |
| byte[] data = null; |
| try { |
| data = retryUntilConnected(new Callable<byte[]>() { |
| |
| @Override |
| public byte[] call() throws Exception { |
| return getConnection().readData(path, stat, watch); |
| } |
| }); |
| record(path, data, startT, ZkClientMonitor.AccessType.READ); |
| return (T) deserialize(data, path); |
| } catch (ZkNoNodeException e) { |
| record(path, data, startT, ZkClientMonitor.AccessType.READ); |
| throw e; |
| } catch (Exception e) { |
| recordFailure(path, ZkClientMonitor.AccessType.READ); |
| throw e; |
| } finally { |
| long endT = System.currentTimeMillis(); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("zkclient {}, getData, path {}, time {} ms", _uid, path, (endT - startT)); |
| } |
| } |
| } |
| |
| @SuppressWarnings("unchecked") |
| public <T extends Object> T readDataAndStat(String path, Stat stat, |
| boolean returnNullIfPathNotExists) { |
| T data = null; |
| try { |
| data = readData(path, stat); |
| } catch (ZkNoNodeException e) { |
| if (!returnNullIfPathNotExists) { |
| throw e; |
| } |
| } |
| return data; |
| } |
| |
| public void writeData(String path, Object object) { |
| writeData(path, object, -1); |
| } |
| |
| /** |
| * Updates data of an existing znode. The current content of the znode is passed to the |
| * {@link DataUpdater} that is |
| * passed into this method, which returns the new content. The new content is only written back to |
| * ZooKeeper if |
| * nobody has modified the given znode in between. If a concurrent change has been detected the |
| * new data of the |
| * znode is passed to the updater once again until the new contents can be successfully written |
| * back to ZooKeeper. |
| * @param <T> |
| * @param path |
| * The path of the znode. |
| * @param updater |
| * Updater that creates the new contents. |
| */ |
| @SuppressWarnings("unchecked") |
| public <T extends Object> void updateDataSerialized(String path, DataUpdater<T> updater) { |
| Stat stat = new Stat(); |
| boolean retry; |
| do { |
| retry = false; |
| try { |
| T oldData = (T) readData(path, stat); |
| T newData = updater.update(oldData); |
| writeData(path, newData, stat.getVersion()); |
| } catch (ZkBadVersionException e) { |
| retry = true; |
| } |
| } while (retry); |
| } |
| |
| public void writeData(final String path, Object datat, final int expectedVersion) { |
| writeDataReturnStat(path, datat, expectedVersion); |
| } |
| |
| public Stat writeDataReturnStat(final String path, Object datat, final int expectedVersion) { |
| long startT = System.currentTimeMillis(); |
| try { |
| final byte[] data = serialize(datat, path); |
| checkDataSizeLimit(path, data); |
| final Stat stat = (Stat) retryUntilConnected( |
| (Callable<Object>) () -> getConnection().writeDataReturnStat(path, data, expectedVersion)); |
| record(path, data, startT, ZkClientMonitor.AccessType.WRITE); |
| return stat; |
| } catch (Exception e) { |
| recordFailure(path, ZkClientMonitor.AccessType.WRITE); |
| throw e; |
| } finally { |
| long endT = System.currentTimeMillis(); |
| if (LOG.isTraceEnabled()) { |
| LOG.trace("zkclient {}, setData, path {}, time {} ms", _uid, path, (endT - startT)); |
| } |
| } |
| } |
| |
| public Stat writeDataGetStat(final String path, Object datat, final int expectedVersion) { |
| return writeDataReturnStat(path, datat, expectedVersion); |
| } |
| |
| public void asyncCreate(final String path, Object datat, final CreateMode mode, |
| final ZkAsyncCallbacks.CreateCallbackHandler cb) { |
| final long startT = System.currentTimeMillis(); |
| final byte[] data; |
| try { |
| data = (datat == null ? null : serialize(datat, path)); |
| } catch (ZkMarshallingError e) { |
| cb.processResult(KeeperException.Code.MARSHALLINGERROR.intValue(), path, |
| new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null); |
| return; |
| } |
| doAsyncCreate(path, data, mode, TTL_NOT_SET, startT, cb, parseExpectedSessionId(datat)); |
| } |
| |
| private void doAsyncCreate(final String path, final byte[] data, final CreateMode mode, long ttl, |
| final long startT, final ZkAsyncCallbacks.CreateCallbackHandler cb, final String expectedSessionId) { |
| try { |
| retryUntilConnected(() -> { |
| getExpectedZookeeper(expectedSessionId).create(path, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode, cb, |
| new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, 0, false, |
| GZipCompressionUtil.isCompressed(data)) { |
| @Override |
| protected void doRetry() { |
| doAsyncCreate(path, data, mode, ttl, System.currentTimeMillis(), cb, expectedSessionId); |
| } |
| }, ttl); |
| return null; |
| }); |
| } catch (RuntimeException e) { |
| // Process callback to release caller from waiting |
| cb.processResult(KeeperException.Code.APIERROR.intValue(), path, |
| new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null, null); |
| throw e; |
| } |
| } |
| |
| public void asyncCreate(final String path, Object datat, final CreateMode mode, long ttl, |
| final ZkAsyncCallbacks.CreateCallbackHandler cb) { |
| final long startT = System.currentTimeMillis(); |
| final byte[] data; |
| try { |
| data = (datat == null ? null : serialize(datat, path)); |
| } catch (ZkMarshallingError e) { |
| cb.processResult(KeeperException.Code.MARSHALLINGERROR.intValue(), path, |
| new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null, null); |
| return; |
| } |
| doAsyncCreate(path, data, mode, ttl, startT, cb, parseExpectedSessionId(datat)); |
| } |
| |
| // Async Data Accessors |
| public void asyncSetData(final String path, Object datat, final int version, |
| final ZkAsyncCallbacks.SetDataCallbackHandler cb) { |
| final long startT = System.currentTimeMillis(); |
| final byte[] data; |
| try { |
| data = serialize(datat, path); |
| } catch (ZkMarshallingError e) { |
| cb.processResult(KeeperException.Code.MARSHALLINGERROR.intValue(), path, |
| new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null); |
| return; |
| } |
| doAsyncSetData(path, data, version, startT, cb, parseExpectedSessionId(datat)); |
| } |
| |
| private void doAsyncSetData(final String path, byte[] data, final int version, final long startT, |
| final ZkAsyncCallbacks.SetDataCallbackHandler cb, final String expectedSessionId) { |
| try { |
| retryUntilConnected(() -> { |
| getExpectedZookeeper(expectedSessionId).setData(path, data, version, cb, |
| new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, data == null ? 0 : data.length, |
| false, GZipCompressionUtil.isCompressed(data)) { |
| @Override |
| protected void doRetry() { |
| doAsyncSetData(path, data, version, System.currentTimeMillis(), cb, expectedSessionId); |
| } |
| }); |
| return null; |
| }); |
| } catch (RuntimeException e) { |
| // Process callback to release caller from waiting |
| cb.processResult(KeeperException.Code.APIERROR.intValue(), path, |
| new ZkAsyncCallMonitorContext(_monitor, startT, 0, false), null); |
| throw e; |
| } |
| } |
| |
| public void asyncGetData(final String path, final ZkAsyncCallbacks.GetDataCallbackHandler cb) { |
| final long startT = System.currentTimeMillis(); |
| try { |
| retryUntilConnected(() -> { |
| ((ZkConnection) getConnection()).getZookeeper().getData(path, null, cb, |
| new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, 0, true) { |
| @Override |
| protected void doRetry() { |
| asyncGetData(path, cb); |
| } |
| }); |
| return null; |
| }); |
| } catch (RuntimeException e) { |
| // Process callback to release caller from waiting |
| cb.processResult(KeeperException.Code.APIERROR.intValue(), path, |
| new ZkAsyncCallMonitorContext(_monitor, startT, 0, true), null, null); |
| throw e; |
| } |
| } |
| |
| public void asyncExists(final String path, final ZkAsyncCallbacks.ExistsCallbackHandler cb) { |
| final long startT = System.currentTimeMillis(); |
| try { |
| retryUntilConnected(() -> { |
| ((ZkConnection) getConnection()).getZookeeper().exists(path, null, cb, |
| new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, 0, true) { |
| @Override |
| protected void doRetry() { |
| asyncExists(path, cb); |
| } |
| }); |
| return null; |
| }); |
| } catch (RuntimeException e) { |
| // Process callback to release caller from waiting |
| cb.processResult(KeeperException.Code.APIERROR.intValue(), path, |
| new ZkAsyncCallMonitorContext(_monitor, startT, 0, true), null); |
| throw e; |
| } |
| } |
| |
| public void asyncDelete(final String path, final ZkAsyncCallbacks.DeleteCallbackHandler cb) { |
| final long startT = System.currentTimeMillis(); |
| try { |
| retryUntilConnected(() -> { |
| ((ZkConnection) getConnection()).getZookeeper().delete(path, -1, cb, |
| new ZkAsyncRetryCallContext(_asyncCallRetryThread, cb, _monitor, startT, 0, false) { |
| @Override |
| protected void doRetry() { |
| asyncDelete(path, cb); |
| } |
| }); |
| return null; |
| }); |
| } catch (RuntimeException e) { |
| // Process callback to release caller from waiting |
| cb.processResult(KeeperException.Code.APIERROR.intValue(), path, |
| new ZkAsyncCallMonitorContext(_monitor, startT, 0, false)); |
| throw e; |
| } |
| } |
| |
| private void checkDataSizeLimit(String path, byte[] data) { |
| if (data == null) { |
| return; |
| } |
| |
| if (data.length > WRITE_SIZE_LIMIT) { |
| throw new ZkClientException("Data size of path " + path |
| + " is greater than write size limit " |
| + WRITE_SIZE_LIMIT + " bytes"); |
| } |
| } |
| |
| public void watchForData(final String path) { |
| watchForData(path, false); |
| } |
| |
| private boolean watchForData(final String path, boolean skipWatchingNonExistNode) { |
| try { |
| if (skipWatchingNonExistNode) { |
| retryUntilConnected(() -> (((ZkConnection) getConnection()).getZookeeper().getData(path, true, new Stat()))); |
| } else { |
| retryUntilConnected(() -> (((ZkConnection) getConnection()).getZookeeper().exists(path, true))); |
| } |
| } catch (ZkNoNodeException e) { |
| // Do nothing, this is what we want as this is not going to leak watch in ZooKeeepr server. |
| LOG.info("zkclient {}, watchForData path not existing: {} ", _uid, path); |
| return false; |
| } |
| return true; |
| } |
| |
| /** |
| * Installs a child watch for the given path. |
| * @param path |
| * @return the current children of the path or null if the zk node with the given path doesn't |
| * exist. |
| */ |
| public List<String> watchForChilds(final String path) { |
| return watchForChilds(path, false); |
| } |
| |
| /** |
| * The following captures about how we reason Zookeeper watch leakage issue based on various |
| * comments in review |
| * 1. Removal of a parent zk path (such as currentstate/sessionid) is async to all threads in |
| * Helix router or controller which watches the path. Thus, if we install a watch to a path |
| * expected to be created, we always have the risk of leaking if the path changed. |
| * |
| * 2. Current the CallbackHandler life cycle is like this: |
| * CallbackHandler for currentstate and some others can be created before the parent path is |
| * created. Thus, we still needs exists() call. This corresponds to INIT change type of |
| * CallbackHanlder. This is the time eventually watchForChilds() with be called with |
| * skipWatchingNonExistNode as false. |
| * Aside from creation time, CallbackHandler normal cycle would see CALLBACK change type. This |
| * time we should normally expected the parent path is created. Thus, the subscription from |
| * CallbackHandler would use skipWatchingNonExistNode false. Avoid leaking path. |
| * Note, if the path is removed, CallbackHandler would see children of parent path as null. THis |
| * would end the CallbackHanlder' life. |
| * |
| * From the above life cycle of Callbackhandler, we know the only place that can leak is that |
| * INIT change type time, participant expires the session more than twice in a row before the |
| * watchForChild(skipWatchingNonExistNode=false) issue exists() call. |
| * |
| * THe chance of this sequence is slim though. |
| * |
| */ |
| private List<String> watchForChilds(final String path, boolean skipWatchingNonExistNode) { |
| if (_zookeeperEventThread != null && Thread.currentThread() == _zookeeperEventThread) { |
| throw new IllegalArgumentException("Must not be done in the zookeeper event thread."); |
| } |
| return retryUntilConnected(new Callable<List<String>>() { |
| @Override |
| public List<String> call() throws Exception { |
| if (!skipWatchingNonExistNode) { |
| exists(path, true); |
| } |
| try { |
| return getChildren(path, true); |
| } catch (ZkNoNodeException e) { |
| // ignore, the "exists" watch will listen for the parent node to appear |
| LOG.info("zkclient{} watchForChilds path not existing:{} skipWatchingNodeNoteExist: {}", |
| _uid, path, skipWatchingNonExistNode); |
| } |
| return null; |
| } |
| }); |
| } |
| |
| /** |
| * Add authentication information to the connection. This will be used to identify the user and |
| * check access to |
| * nodes protected by ACLs |
| * @param scheme |
| * @param auth |
| */ |
| public void addAuthInfo(final String scheme, final byte[] auth) { |
| retryUntilConnected(new Callable<Object>() { |
| @Override |
| public Object call() throws Exception { |
| getConnection().addAuthInfo(scheme, auth); |
| return null; |
| } |
| }); |
| } |
| |
| /** |
| * Connect to ZooKeeper. |
| * @param maxMsToWaitUntilConnected |
| * @param watcher |
| * @throws ZkInterruptedException |
| * if the connection timed out due to thread interruption |
| * @throws ZkTimeoutException |
| * if the connection timed out |
| * @throws IllegalStateException |
| * if the connection timed out due to thread interruption |
| */ |
| public void connect(final long maxMsToWaitUntilConnected, Watcher watcher) |
| throws ZkInterruptedException, ZkTimeoutException, IllegalStateException { |
| if (isClosed()) { |
| throw new IllegalStateException("ZkClient already closed!"); |
| } |
| boolean started = false; |
| acquireEventLock(); |
| try { |
| setShutdownTrigger(false); |
| |
| IZkConnection zkConnection = getConnection(); |
| _eventThread = new ZkEventThread(zkConnection.getServers()); |
| |
| if (_monitor != null) { |
| boolean result = _monitor.setAndInitZkEventThreadMonitor(_eventThread); |
| if (!result) { |
| LOG.error("register _eventThread monitor failed due to an existing one"); |
| } |
| } |
| |
| _eventThread.start(); |
| |
| LOG.debug("ZkClient {}, _eventThread {}", _uid, _eventThread.getId()); |
| |
| if (isManagingZkConnection()) { |
| zkConnection.connect(watcher); |
| LOG.debug("zkclient{} Awaiting connection to Zookeeper server", _uid); |
| if (!waitUntilConnected(maxMsToWaitUntilConnected, TimeUnit.MILLISECONDS)) { |
| throw new ZkTimeoutException( |
| "Unable to connect to zookeeper server within timeout: " + maxMsToWaitUntilConnected); |
| } |
| } else { |
| // if the client is not managing connection, the input connection is supposed to connect. |
| if (isConnectionClosed()) { |
| throw new ZkClientException( |
| "Unable to connect to zookeeper server with the specified ZkConnection"); |
| } |
| // TODO Refine the init state here. Here we pre-config it to be connected. This may not be |
| // the case, if the connection is connecting or recovering. -- JJ |
| // For shared client, the event notification will not be forwarded before wather add to the |
| // connection manager. |
| setCurrentState(KeeperState.SyncConnected); |
| } |
| |
| started = true; |
| } finally { |
| getEventLock().unlock(); |
| |
| // we should close the zookeeper instance, otherwise it would keep |
| // on trying to connect |
| if (!started) { |
| close(); |
| } |
| } |
| } |
| |
| public long getCreationTime(String path) { |
| acquireEventLock(); |
| try { |
| return getConnection().getCreateTime(path); |
| } catch (KeeperException e) { |
| throw ZkException.create(e); |
| } catch (InterruptedException e) { |
| throw new ZkInterruptedException(e); |
| } finally { |
| getEventLock().unlock(); |
| } |
| } |
| |
| public String getServers() { |
| return getConnection().getServers(); |
| } |
| |
| /** |
| * Close the client. |
| * @throws ZkInterruptedException |
| */ |
| public void close() throws ZkInterruptedException { |
| if (LOG.isTraceEnabled()) { |
| StackTraceElement[] calls = Thread.currentThread().getStackTrace(); |
| LOG.trace("Closing a zkclient uid:{}, callStack: {} ", _uid, Arrays.asList(calls)); |
| } |
| getEventLock().lock(); |
| IZkConnection connection = getConnection(); |
| try { |
| if (connection == null || _closed) { |
| return; |
| } |
| setShutdownTrigger(true); |
| if (_asyncCallRetryThread != null) { |
| _asyncCallRetryThread.interrupt(); |
| _asyncCallRetryThread.join(2000); |
| } |
| _eventThread.interrupt(); |
| _eventThread.join(2000); |
| if (isManagingZkConnection()) { |
| LOG.info("Closing zkclient uid:{}, zk:{}", _uid, ((ZkConnection) connection).getZookeeper()); |
| connection.close(); |
| } |
| _closed = true; |
| |
| // send state change notification to unlock any wait |
| setCurrentState(null); |
| getEventLock().getStateChangedCondition().signalAll(); |
| } catch (InterruptedException e) { |
| /** |
| * Workaround for HELIX-264: calling ZkClient#close() in its own eventThread context will |
| * throw ZkInterruptedException and skip ZkConnection#close() |
| */ |
| if (connection != null) { |
| try { |
| /** |
| * ZkInterruptedException#construct() honors InterruptedException by calling |
| * Thread.currentThread().interrupt(); clear it first, so we can safely close the |
| * zk-connection |
| */ |
| Thread.interrupted(); |
| if (isManagingZkConnection()) { |
| connection.close(); |
| } |
| /** |
| * restore interrupted status of current thread |
| */ |
| Thread.currentThread().interrupt(); |
| } catch (InterruptedException e1) { |
| throw new ZkInterruptedException(e1); |
| } |
| } |
| } finally { |
| getEventLock().unlock(); |
| if (_monitor != null) { |
| _monitor.unregister(); |
| } |
| LOG.info("Closed zkclient with uid:{}", _uid); |
| } |
| } |
| |
| public boolean isClosed() { |
| try { |
| getEventLock().lock(); |
| return _closed; |
| } finally { |
| getEventLock().unlock(); |
| } |
| } |
| |
| public boolean isConnectionClosed() { |
| IZkConnection connection = getConnection(); |
| return (connection == null || connection.getZookeeperState() == null || !connection |
| .getZookeeperState().isAlive()); |
| } |
| |
| public void setShutdownTrigger(boolean triggerState) { |
| _shutdownTriggered = triggerState; |
| } |
| |
| public boolean getShutdownTrigger() { |
| return _shutdownTriggered; |
| } |
| |
| public int numberOfListeners() { |
| int listeners = 0; |
| for (Set<IZkChildListener> childListeners : _childListener.values()) { |
| listeners += childListeners.size(); |
| } |
| for (Set<IZkDataListenerEntry> dataListeners : _dataListener.values()) { |
| listeners += dataListeners.size(); |
| } |
| listeners += _stateListener.size(); |
| |
| return listeners; |
| } |
| |
| public List<OpResult> multi(final Iterable<Op> ops) throws ZkException { |
| if (ops == null) { |
| throw new NullPointerException("ops must not be null."); |
| } |
| |
| return retryUntilConnected(new Callable<List<OpResult>>() { |
| |
| @Override |
| public List<OpResult> call() throws Exception { |
| return getConnection().multi(ops); |
| } |
| }); |
| } |
| |
| /** |
| * @return true if this ZkClient is managing the ZkConnection. |
| */ |
| protected boolean isManagingZkConnection() { |
| return true; |
| } |
| |
| public long getSessionId() { |
| ZkConnection zkConnection = ((ZkConnection) getConnection()); |
| ZooKeeper zk = zkConnection.getZookeeper(); |
| if (zk == null) { |
| throw new ZkClientException( |
| "ZooKeeper connection information is not available now. ZkClient might be disconnected."); |
| } else { |
| return zkConnection.getZookeeper().getSessionId(); |
| } |
| } |
| |
| /* |
| * Gets a session id in hexadecimal notation. |
| * Ex. 1000a5ceb930004 is returned. |
| */ |
| private String getHexSessionId() { |
| return Long.toHexString(getSessionId()); |
| } |
| |
| /* |
| * Gets the zookeeper instance that ensures its session ID matches the expected session ID. |
| * It is used for write operations that suppose the znode to be created by the expected session. |
| */ |
| private ZooKeeper getExpectedZookeeper(final String expectedSessionId) { |
| /* |
| * Cache the zookeeper reference and make sure later zooKeeper.create() is being run |
| * under this zookeeper connection. This is to avoid zk session change after expected |
| * session check. |
| */ |
| ZooKeeper zk = ((ZkConnection) getConnection()).getZookeeper(); |
| |
| /* |
| * The operation is NOT session aware, we will use the actual zookeeper session without |
| * checking expected session. |
| */ |
| if (expectedSessionId == null || expectedSessionId.isEmpty()) { |
| return zk; |
| } |
| |
| /* |
| * If operation is session aware (expectedSession is valid), |
| * we have to check whether or not the passed-in(expected) session id |
| * matches actual session's id. |
| * If not, we should not return a zk object for the zk operation. |
| */ |
| final String actualSessionId = Long.toHexString(zk.getSessionId()); |
| if (!actualSessionId.equals(expectedSessionId)) { |
| throw new ZkSessionMismatchedException( |
| "Failed to get expected zookeeper instance! There is a session id mismatch. Expected: " |
| + expectedSessionId + ". Actual: " + actualSessionId); |
| } |
| |
| return zk; |
| } |
| |
| private String parseExpectedSessionId(Object data) { |
| if (!(data instanceof SessionAwareZNRecord)) { |
| return null; |
| } |
| return ((SessionAwareZNRecord) data).getExpectedSessionId(); |
| } |
| |
| // operations to update monitor's counters |
| private void record(String path, byte[] data, long startTimeMilliSec, |
| ZkClientMonitor.AccessType accessType) { |
| if (_monitor != null) { |
| int dataSize = (data != null) ? data.length : 0; |
| _monitor.record(path, dataSize, startTimeMilliSec, accessType); |
| |
| if (GZipCompressionUtil.isCompressed(data)) { |
| _monitor.increaseZnodeCompressCounter(); |
| } |
| } |
| } |
| |
| private void recordFailure(String path, ZkClientMonitor.AccessType accessType) { |
| if (_monitor != null) { |
| _monitor.recordFailure(path, accessType); |
| } |
| } |
| |
| private void recordStateChange(boolean stateChanged, boolean dataChanged, boolean sessionExpired) { |
| // update state change counter. |
| if (_monitor != null) { |
| if (stateChanged) { |
| _monitor.increaseStateChangeEventCounter(); |
| } |
| if (dataChanged) { |
| _monitor.increaseDataChangeEventCounter(); |
| } |
| if (sessionExpired) { |
| _monitor.increasExpiredSessionCounter(); |
| } |
| } |
| } |
| |
| /** |
| * Creates a {@link IZkStateListener} that wraps a default |
| * implementation of {@link org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener}, which means the returned |
| * listener runs the methods of {@link org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener}. |
| * This is for backward compatibility with {@link org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener}. |
| */ |
| private static class IZkStateListenerI0ItecImpl implements IZkStateListener { |
| private org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener _listener; |
| |
| IZkStateListenerI0ItecImpl( |
| org.apache.helix.zookeeper.zkclient.deprecated.IZkStateListener listener) { |
| _listener = listener; |
| } |
| |
| @Override |
| public void handleStateChanged(KeeperState keeperState) throws Exception { |
| _listener.handleStateChanged(keeperState); |
| } |
| |
| @Override |
| public void handleNewSession(final String sessionId) throws Exception { |
| /* |
| * org.I0Itec.zkclient.IZkStateListener does not have handleNewSession(sessionId), |
| * so just call handleNewSession() by default. |
| */ |
| _listener.handleNewSession(); |
| } |
| |
| @Override |
| public void handleSessionEstablishmentError(Throwable error) throws Exception { |
| _listener.handleSessionEstablishmentError(error); |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| if (obj == this) { |
| return true; |
| } |
| if (!(obj instanceof IZkStateListenerI0ItecImpl)) { |
| return false; |
| } |
| if (_listener == null) { |
| return false; |
| } |
| |
| IZkStateListenerI0ItecImpl defaultListener = (IZkStateListenerI0ItecImpl) obj; |
| |
| return _listener.equals(defaultListener._listener); |
| } |
| |
| @Override |
| public int hashCode() { |
| /* |
| * The original listener's hashcode helps find the wrapped listener with the same original |
| * listener. This is helpful in unsubscribeStateChanges(listener) when finding the listener |
| * to remove. |
| */ |
| return _listener.hashCode(); |
| } |
| } |
| |
| private void validateCurrentThread() { |
| if (_zookeeperEventThread != null && Thread.currentThread() == _zookeeperEventThread) { |
| throw new IllegalArgumentException("Must not be done in the zookeeper event thread."); |
| } |
| } |
| |
| private void checkNumChildrenLimit(String path) throws KeeperException { |
| Stat stat = getStat(path); |
| if (stat == null) { |
| return; |
| } |
| |
| if (stat.getNumChildren() > NUM_CHILDREN_LIMIT) { |
| LOG.error("Failed to get children for path {} because of connection loss. " |
| + "Number of children {} exceeds limit {}, aborting retry.", path, stat.getNumChildren(), |
| NUM_CHILDREN_LIMIT); |
| // MarshallingErrorException could represent transport error: exceeding the |
| // Jute buffer size. So use it to exit retry loop and tell that zk is not able to |
| // transport the data because packet length is too large. |
| throw new KeeperException.MarshallingErrorException(); |
| } else { |
| LOG.debug("Number of children {} is less than limit {}, not exiting retry.", |
| stat.getNumChildren(), NUM_CHILDREN_LIMIT); |
| } |
| } |
| |
| private void validateWriteSizeLimitConfig() { |
| int serializerSize = ZNRecordUtil.getSerializerWriteSizeLimit(); |
| LOG.info("ZNRecord serializer write size limit: {}; ZkClient write size limit: {}", |
| serializerSize, WRITE_SIZE_LIMIT); |
| if (serializerSize > WRITE_SIZE_LIMIT) { |
| throw new IllegalStateException("ZNRecord serializer write size limit " + serializerSize |
| + " is greater than ZkClient size limit " + WRITE_SIZE_LIMIT); |
| } |
| } |
| } |