// blob: 816d0bda04c88a18868e154a885d18c0ee25a1d8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.curator.framework.imps;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.curator.CuratorConnectionLossException;
import org.apache.curator.CuratorZookeeperClient;
import org.apache.curator.RetryLoop;
import org.apache.curator.drivers.OperationTrace;
import org.apache.curator.framework.AuthInfo;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.WatcherRemoveCuratorFramework;
import org.apache.curator.framework.api.ACLProvider;
import org.apache.curator.framework.api.CompressionProvider;
import org.apache.curator.framework.api.CreateBuilder;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.api.DeleteBuilder;
import org.apache.curator.framework.api.ExistsBuilder;
import org.apache.curator.framework.api.GetACLBuilder;
import org.apache.curator.framework.api.GetChildrenBuilder;
import org.apache.curator.framework.api.GetConfigBuilder;
import org.apache.curator.framework.api.GetDataBuilder;
import org.apache.curator.framework.api.ReconfigBuilder;
import org.apache.curator.framework.api.RemoveWatchesBuilder;
import org.apache.curator.framework.api.SetACLBuilder;
import org.apache.curator.framework.api.SetDataBuilder;
import org.apache.curator.framework.api.SyncBuilder;
import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.api.WatchesBuilder;
import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
import org.apache.curator.framework.api.transaction.CuratorTransaction;
import org.apache.curator.framework.api.transaction.TransactionOp;
import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.listen.StandardListenerManager;
import org.apache.curator.framework.schema.SchemaSet;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.framework.state.ConnectionStateManager;
import org.apache.curator.utils.DebugUtils;
import org.apache.curator.utils.EnsurePath;
import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.curator.utils.ZookeeperCompatibility;
import org.apache.curator.utils.ZookeeperFactory;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.client.ZKClientConfig;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CuratorFrameworkImpl implements CuratorFramework {
// Logger is keyed on the runtime class, so subclasses log under their own class name.
private final Logger log = LoggerFactory.getLogger(getClass());
// Underlying ZooKeeper client wrapper; started in start() and closed last in close().
private final CuratorZookeeperClient client;
// Listeners that receive CuratorEvents (see processEvent() and the CLOSING event sent from close()).
private final StandardListenerManager<CuratorListener> listeners;
// Listeners notified from logError().
private final StandardListenerManager<UnhandledErrorListener> unhandledErrorListeners;
// Thread factory for the background-operations executor created in start().
private final ThreadFactory threadFactory;
// Upper bound, in milliseconds, that close() waits for the background executor to terminate.
private final int maxCloseWaitMs;
// Pending background operations; created as a DelayQueue in the public constructor.
private final BlockingQueue<OperationAndData<?>> backgroundOperations;
// Operations put to sleep by sleepAndQueueOperation(); drained by unSleepBackgroundOperations().
private final BlockingQueue<OperationAndData<?>> forcedSleepOperations;
private final NamespaceImpl namespace;
private final ConnectionStateManager connectionStateManager;
// Auth entries applied to every ZooKeeper handle created through makeZookeeperFactory().
private final List<AuthInfo> authInfos;
// Private copy of the builder's default data (empty array when the builder supplied none).
private final byte[] defaultData;
private final FailedDeleteManager failedDeleteManager;
private final FailedRemoveWatchManager failedRemoveWatcherManager;
private final CompressionProvider compressionProvider;
private final ACLProvider aclProvider;
private final NamespaceFacadeCache namespaceFacadeCache;
private final boolean useContainerParentsIfAvailable;
private final ConnectionStateErrorPolicy connectionStateErrorPolicy;
// Last client instance index seen by checkInstanceIndex(); -1 until the first observation.
private final AtomicLong currentInstanceIndex = new AtomicLong(-1);
private final InternalConnectionHandler internalConnectionHandler;
// Null when the builder disabled ensemble tracking.
private final EnsembleTracker ensembleTracker;
private final SchemaSet schemaSet;
// Executor backing runSafe().
private final Executor runSafeService;
private final ZookeeperCompatibility zookeeperCompatibility;
// Executor running backgroundOperationsLoop(); assigned in start(), null before that.
private volatile ExecutorService executorService;
// Set to true on CONNECTED/RECONNECTED (see start()); logError() flips it back to false when it
// logs a connection loss at error level while LOG_ALL_CONNECTION_ISSUES_AS_ERROR_LEVEL is false.
private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false);
// True unless the "log only first connection issue as error" system property is set; evaluated once.
private static final boolean LOG_ALL_CONNECTION_ISSUES_AS_ERROR_LEVEL =
!Boolean.getBoolean(DebugUtils.PROPERTY_LOG_ONLY_FIRST_CONNECTION_ISSUE_AS_ERROR_LEVEL);
/**
 * Debug hook: when {@code debugListener} is set, the background loop passes each operation it
 * takes from the queue to {@link #listen(OperationAndData)} before performing it.
 */
interface DebugBackgroundListener {
void listen(OperationAndData<?> data);
}
// Optional debug hook consulted by backgroundOperationsLoop(); null means disabled.
volatile DebugBackgroundListener debugListener = null;
// Optional debug hook invoked at the end of logError(); null means disabled.
@VisibleForTesting
public volatile UnhandledErrorListener debugUnhandledErrorListener = null;
// Lifecycle state: LATENT -> STARTED (start()) -> STOPPED (close()). Shared with facade instances.
private final AtomicReference<CuratorFrameworkState> state;
// Guards the STARTED -> STOPPED transition against concurrent queueing of background operations.
private final Object closeLock = new Object();
/**
 * Creates a framework instance from the given builder. The instance is left in the
 * {@link CuratorFrameworkState#LATENT} state; nothing is started here.
 *
 * @param builder source of all configuration
 * @throws NullPointerException if the builder's connection state error policy or schema set is null
 */
public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder) {
// Wrap the configured factory so that every ZooKeeper handle gets the auth infos applied.
ZookeeperFactory localZookeeperFactory =
makeZookeeperFactory(builder.getZookeeperFactory(), builder.getZkClientConfig());
this.client = new CuratorZookeeperClient(
localZookeeperFactory,
builder.getEnsembleProvider(),
builder.getSessionTimeoutMs(),
builder.getConnectionTimeoutMs(),
builder.getWaitForShutdownTimeoutMs(),
// Default watcher: converts each WatchedEvent into a WATCHED CuratorEvent and dispatches it.
new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
CuratorEvent event = new CuratorEventImpl(
CuratorFrameworkImpl.this,
CuratorEventType.WATCHED,
// the KeeperState int value is passed in the slot other events use for the result code
watchedEvent.getState().getIntValue(),
// strip the namespace so listeners see the path as they would have supplied it
unfixForNamespace(watchedEvent.getPath()),
null,
null,
null,
null,
null,
watchedEvent,
null,
null);
processEvent(event);
}
},
builder.getRetryPolicy(),
builder.canBeReadOnly());
internalConnectionHandler = new StandardInternalConnectionHandler();
listeners = StandardListenerManager.standard();
unhandledErrorListeners = StandardListenerManager.standard();
backgroundOperations = new DelayQueue<OperationAndData<?>>();
forcedSleepOperations = new LinkedBlockingQueue<>();
namespace = new NamespaceImpl(this, builder.getNamespace());
threadFactory = getThreadFactory(builder);
maxCloseWaitMs = builder.getMaxCloseWaitMs();
// NOTE(review): this passes the builder's raw thread factory (which may be null), not the
// defaulted threadFactory field assigned above - confirm ConnectionStateManager handles null.
connectionStateManager = new ConnectionStateManager(
this,
builder.getThreadFactory(),
builder.getSessionTimeoutMs(),
builder.getSimulatedSessionExpirationPercent(),
builder.getConnectionStateListenerManagerFactory());
compressionProvider = builder.getCompressionProvider();
aclProvider = builder.getAclProvider();
state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
useContainerParentsIfAvailable = builder.useContainerParentsIfAvailable();
connectionStateErrorPolicy =
Preconditions.checkNotNull(builder.getConnectionStateErrorPolicy(), "errorPolicy cannot be null");
schemaSet = Preconditions.checkNotNull(builder.getSchemaSet(), "schemaSet cannot be null");
// Defensive copy so later changes to the builder's array do not affect this instance.
byte[] builderDefaultData = builder.getDefaultData();
defaultData = (builderDefaultData != null)
? Arrays.copyOf(builderDefaultData, builderDefaultData.length)
: new byte[0];
authInfos = buildAuths(builder);
failedDeleteManager = new FailedDeleteManager(this);
failedRemoveWatcherManager = new FailedRemoveWatchManager(this);
namespaceFacadeCache = new NamespaceFacadeCache(this);
ensembleTracker =
builder.withEnsembleTracker() ? new EnsembleTracker(this, builder.getEnsembleProvider()) : null;
runSafeService = makeRunSafeService(builder);
zookeeperCompatibility = builder.getZookeeperCompatibility();
}
/**
 * Chooses the executor used by {@link #runSafe(Runnable)}.
 *
 * @param builder framework configuration
 * @return the builder's run-safe executor when one is configured; otherwise a new
 *         single-thread executor using the builder's thread factory, or a
 *         "SafeNotifyService" thread factory when the builder has none
 */
private Executor makeRunSafeService(CuratorFrameworkFactory.Builder builder) {
    Executor configured = builder.getRunSafeService();
    if (configured != null) {
        return configured;
    }
    ThreadFactory configuredFactory = builder.getThreadFactory();
    ThreadFactory effectiveFactory = (configuredFactory != null)
            ? configuredFactory
            : ThreadUtils.newThreadFactory("SafeNotifyService");
    return Executors.newSingleThreadExecutor(effectiveFactory);
}
/**
 * Produces an immutable snapshot of the builder's auth infos.
 *
 * @param builder framework configuration
 * @return immutable list of the configured auth infos; empty when the builder has none
 */
private List<AuthInfo> buildAuths(CuratorFrameworkFactory.Builder builder) {
    List<AuthInfo> configured = builder.getAuthInfos();
    if (configured == null) {
        return ImmutableList.<AuthInfo>of();
    }
    return ImmutableList.copyOf(configured);
}
/**
 * Runs the given task asynchronously on the run-safe executor.
 *
 * @param runnable task to run
 * @return future that completes when the task has run
 */
@Override
public CompletableFuture<Void> runSafe(Runnable runnable) {
return CompletableFuture.runAsync(runnable, runSafeService);
}
/**
 * Creates a new {@link WatcherRemovalFacade} wrapping this instance.
 *
 * @return a new facade on every call
 */
@Override
public WatcherRemoveCuratorFramework newWatcherRemoveCuratorFramework() {
return new WatcherRemovalFacade(this);
}
/**
 * Returns the ensemble configuration known to the ensemble tracker.
 *
 * @return the tracker's current config, or {@code null} when ensemble tracking is disabled
 */
@Override
public QuorumVerifier getCurrentConfig() {
    if (ensembleTracker == null) {
        return null;
    }
    return ensembleTracker.getCurrentConfig();
}
/**
 * Wraps the configured factory so that every ZooKeeper handle it creates is built with the
 * given client config and has this instance's auth infos added before being returned.
 *
 * @param actualZookeeperFactory factory that really creates the handle
 * @param zkClientConfig client config forwarded to the delegate on every call
 * @return the decorating factory
 */
private ZookeeperFactory makeZookeeperFactory(
final ZookeeperFactory actualZookeeperFactory, ZKClientConfig zkClientConfig) {
return new ZookeeperFactory() {
@Override
public ZooKeeper newZooKeeper(
String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception {
// Delegate through the overload that also takes the client config.
ZooKeeper zooKeeper = actualZookeeperFactory.newZooKeeper(
connectString, sessionTimeout, watcher, canBeReadOnly, zkClientConfig);
// authInfos is read here, at handle-creation time, not when this factory is built.
addAuthInfos(zooKeeper);
return zooKeeper;
}
};
}
/**
 * Applies every configured auth info to the given ZooKeeper handle.
 *
 * @param zooKeeper handle to authenticate
 */
private void addAuthInfos(ZooKeeper zooKeeper) {
    authInfos.forEach(info -> zooKeeper.addAuthInfo(info.getScheme(), info.getAuth()));
}
/**
 * Resolves the thread factory for the background-operations executor.
 *
 * @param builder framework configuration
 * @return the builder's thread factory, or a "Framework" thread factory when none is configured
 */
private ThreadFactory getThreadFactory(CuratorFrameworkFactory.Builder builder) {
    ThreadFactory configured = builder.getThreadFactory();
    return (configured != null) ? configured : ThreadUtils.newThreadFactory("Framework");
}
/**
 * Creates an instance that shares all of the parent's collaborators by reference (client,
 * listeners, queues, lifecycle state, ...). Nothing is copied or newly created, except that the
 * fields initialized at their declaration (e.g. {@code closeLock}, {@code currentInstanceIndex})
 * are new per instance.
 *
 * @param parent instance whose components are shared
 */
protected CuratorFrameworkImpl(CuratorFrameworkImpl parent) {
client = parent.client;
listeners = parent.listeners;
unhandledErrorListeners = parent.unhandledErrorListeners;
threadFactory = parent.threadFactory;
maxCloseWaitMs = parent.maxCloseWaitMs;
backgroundOperations = parent.backgroundOperations;
forcedSleepOperations = parent.forcedSleepOperations;
connectionStateManager = parent.connectionStateManager;
defaultData = parent.defaultData;
failedDeleteManager = parent.failedDeleteManager;
failedRemoveWatcherManager = parent.failedRemoveWatcherManager;
compressionProvider = parent.compressionProvider;
aclProvider = parent.aclProvider;
namespaceFacadeCache = parent.namespaceFacadeCache;
namespace = parent.namespace;
// Same AtomicReference as the parent, so both observe the same lifecycle state.
state = parent.state;
authInfos = parent.authInfos;
useContainerParentsIfAvailable = parent.useContainerParentsIfAvailable;
connectionStateErrorPolicy = parent.connectionStateErrorPolicy;
internalConnectionHandler = parent.internalConnectionHandler;
schemaSet = parent.schemaSet;
ensembleTracker = parent.ensembleTracker;
runSafeService = parent.runSafeService;
zookeeperCompatibility = parent.zookeeperCompatibility;
}
/**
 * Runs an exists check, with parent-container creation enabled, on a child named "foo" of the
 * given path.
 *
 * @param path path to create containers for
 * @throws Exception errors from the underlying exists operation
 */
@Override
public void createContainers(String path) throws Exception {
// NOTE(review): "foo" is presumably a throw-away child name so that `path` itself counts as a
// parent to be created - confirm against creatingParentContainersIfNeeded() semantics.
checkExists().creatingParentContainersIfNeeded().forPath(ZKPaths.makePath(path, "foo"));
}
/**
 * Does nothing in this implementation.
 *
 * @param watcher ignored
 */
@Override
public void clearWatcherReferences(Watcher watcher) {
// NOP
}
/**
 * Returns the current lifecycle state.
 *
 * @return current value of the shared state reference
 */
@Override
public CuratorFrameworkState getState() {
return state.get();
}
/**
 * Tells whether the instance is currently in the {@link CuratorFrameworkState#STARTED} state.
 *
 * @return {@code true} only while started (false both before start and after close)
 */
@Override
@Deprecated
public boolean isStarted() {
return state.get() == CuratorFrameworkState.STARTED;
}
/**
 * Delegates to the connection state manager's {@code blockUntilConnected}.
 *
 * @param maxWaitTime maximum time to wait
 * @param units unit of {@code maxWaitTime}
 * @return the connection state manager's result
 * @throws InterruptedException if interrupted while waiting
 */
@Override
public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException {
return connectionStateManager.blockUntilConnected(maxWaitTime, units);
}
/**
 * Calls {@link #blockUntilConnected(int, TimeUnit)} with a wait time of 0 and a null unit.
 *
 * @throws InterruptedException if interrupted while waiting
 */
@Override
public void blockUntilConnected() throws InterruptedException {
// NOTE(review): presumably a null unit means "wait without a time limit" in
// ConnectionStateManager - confirm there.
blockUntilConnected(0, null);
}
/**
 * Returns the connection state error policy supplied by the builder.
 *
 * @return the policy (never null; checked at construction)
 */
@Override
public ConnectionStateErrorPolicy getConnectionStateErrorPolicy() {
return connectionStateErrorPolicy;
}
/**
 * Moves the instance from LATENT to STARTED and starts its components: connection state
 * manager, client, the background-operations thread and (when enabled) the ensemble tracker.
 *
 * <p>An exception thrown while starting the components is not rethrown: it is passed to
 * {@code handleBackgroundOperationException} with a null operation, which only reports it via
 * {@code logError}. The state is not rolled back in that case.</p>
 *
 * @throws IllegalStateException if the instance is not in the LATENT state
 */
@Override
public void start() {
log.info("Starting");
if (!state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED)) {
throw new IllegalStateException("Cannot be started more than once");
}
try {
connectionStateManager.start(); // ordering dependency - must be called before client.start()
// Re-arms error-level logging of connection loss each time a connection is (re)established.
final ConnectionStateListener listener = new ConnectionStateListener() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
if (ConnectionState.CONNECTED == newState || ConnectionState.RECONNECTED == newState) {
logAsErrorConnectionErrors.set(true);
}
}
@Override
public boolean doNotProxy() {
return true;
}
};
this.getConnectionStateListenable().addListener(listener);
client.start();
// Single thread dedicated to draining the background-operations queue.
executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
executorService.submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
backgroundOperationsLoop();
return null;
}
});
if (ensembleTracker != null) {
ensembleTracker.start();
}
log.info(schemaSet.toDocumentation());
} catch (Exception e) {
ThreadUtils.checkInterrupted(e);
handleBackgroundOperationException(null, e);
}
}
/**
 * Atomically switches {@link #state} from {@link CuratorFrameworkState#STARTED} to
 * {@link CuratorFrameworkState#STOPPED} while holding {@link #closeLock}.
 *
 * <p>Taking the lock makes the transition mutually exclusive with the other code that inspects
 * the state under the same lock.</p>
 *
 * @return true if this call performed the transition, false if the state was not STARTED
 */
private boolean closeWithLock() {
    boolean stoppedByThisCall;
    synchronized (closeLock) {
        stoppedByThisCall =
                state.compareAndSet(CuratorFrameworkState.STARTED, CuratorFrameworkState.STOPPED);
    }
    return stoppedByThisCall;
}
/**
 * Stops the instance. Only the call that actually moves the state from STARTED to STOPPED
 * performs the teardown; any other call (not started, or already closed) does nothing beyond
 * the debug log line.
 *
 * <p>Teardown order: send a CLOSING event to every Curator listener, stop the background
 * thread (waiting up to {@code maxCloseWaitMs}), close the ensemble tracker, complete every
 * still-queued background operation via {@code closeOperation}, clear listeners, then close
 * the connection state manager and the client.</p>
 */
@Override
public void close() {
log.debug("Closing");
if (closeWithLock()) {
listeners.forEach(listener -> {
CuratorEvent event = new CuratorEventImpl(
CuratorFrameworkImpl.this,
CuratorEventType.CLOSING,
0,
null,
null,
null,
null,
null,
null,
null,
null,
null);
try {
listener.eventReceived(CuratorFrameworkImpl.this, event);
} catch (Exception e) {
// A failing listener must not prevent the remaining teardown.
ThreadUtils.checkInterrupted(e);
log.error("Exception while sending Closing event", e);
}
});
// executorService is null when start() failed before creating it.
if (executorService != null) {
executorService.shutdownNow();
try {
executorService.awaitTermination(maxCloseWaitMs, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
// Interrupted while interrupting; I give up.
Thread.currentThread().interrupt();
}
}
if (ensembleTracker != null) {
ensembleTracker.close();
}
// NOTE(review): runSafeService is not shut down here; when it was created by
// makeRunSafeService() its worker thread may outlive close() - confirm whether intended.
// Operations are forbidden to queue after closing, but there are still other concurrent mutations,
// say, un-sleeping and not fully terminated background thread. So we have to drain the queue atomically
// to avoid duplicated close. But DelayQueue counts Delayed::getDelay, so we have to clear it up front.
backgroundOperations.forEach(OperationAndData::clearSleep);
Collection<OperationAndData<?>> droppedOperations = new ArrayList<>(backgroundOperations.size());
backgroundOperations.drainTo(droppedOperations);
droppedOperations.forEach(this::closeOperation);
listeners.clear();
unhandledErrorListeners.clear();
connectionStateManager.close();
client.close();
}
}
/**
 * Returns the facade for the null namespace; equivalent to {@code usingNamespace(null)}.
 *
 * @return facade obtained from {@link #usingNamespace(String)}
 * @throws IllegalStateException if the instance is not started
 */
@Override
@Deprecated
public CuratorFramework nonNamespaceView() {
return usingNamespace(null);
}
/**
 * Returns the namespace of this instance.
 *
 * @return the namespace, or the empty string when none is set (never null)
 */
@Override
public String getNamespace() {
    String current = namespace.getNamespace();
    if (current == null) {
        return "";
    }
    return current;
}
/**
 * Verifies that the instance is in the {@link CuratorFrameworkState#STARTED} state.
 *
 * @throws IllegalStateException if the current state is anything other than STARTED
 */
private void checkState() {
    CuratorFrameworkState current = getState();
    boolean started = (current == CuratorFrameworkState.STARTED);
    Preconditions.checkState(
            started, "Expected state [%s] was [%s]", CuratorFrameworkState.STARTED, current);
}
/**
 * Returns a facade of this instance for the given namespace, obtained from the facade cache.
 *
 * @param newNamespace namespace for the facade
 * @return the facade supplied by the namespace facade cache
 * @throws IllegalStateException if the instance is not started
 */
@Override
public CuratorFramework usingNamespace(String newNamespace) {
checkState();
return namespaceFacadeCache.get(newNamespace);
}
/**
 * Starts a create operation.
 *
 * @return a new create builder bound to this instance
 * @throws IllegalStateException if the instance is not started
 */
@Override
public CreateBuilder create() {
checkState();
return new CreateBuilderImpl(this);
}
/**
 * Starts a delete operation.
 *
 * @return a new delete builder bound to this instance
 * @throws IllegalStateException if the instance is not started
 */
@Override
public DeleteBuilder delete() {
checkState();
return new DeleteBuilderImpl(this);
}
/**
 * Starts an exists operation.
 *
 * @return a new exists builder bound to this instance
 * @throws IllegalStateException if the instance is not started
 */
@Override
public ExistsBuilder checkExists() {
checkState();
return new ExistsBuilderImpl(this);
}
/**
 * Starts a get-data operation.
 *
 * @return a new get-data builder bound to this instance
 * @throws IllegalStateException if the instance is not started
 */
@Override
public GetDataBuilder getData() {
checkState();
return new GetDataBuilderImpl(this);
}
/**
 * Starts a set-data operation.
 *
 * @return a new set-data builder bound to this instance
 * @throws IllegalStateException if the instance is not started
 */
@Override
public SetDataBuilder setData() {
checkState();
return new SetDataBuilderImpl(this);
}
/**
 * Starts a get-children operation.
 *
 * @return a new get-children builder bound to this instance
 * @throws IllegalStateException if the instance is not started
 */
@Override
public GetChildrenBuilder getChildren() {
checkState();
return new GetChildrenBuilderImpl(this);
}
/**
 * Starts a get-ACL operation.
 *
 * @return a new get-ACL builder bound to this instance
 * @throws IllegalStateException if the instance is not started
 */
@Override
public GetACLBuilder getACL() {
checkState();
return new GetACLBuilderImpl(this);
}
/**
 * Starts a set-ACL operation.
 *
 * @return a new set-ACL builder bound to this instance
 * @throws IllegalStateException if the instance is not started
 */
@Override
public SetACLBuilder setACL() {
checkState();
return new SetACLBuilderImpl(this);
}
/**
 * Starts a reconfig operation. No lifecycle state check is performed by this method.
 *
 * @return a new reconfig builder bound to this instance
 */
@Override
public ReconfigBuilder reconfig() {
return new ReconfigBuilderImpl(this);
}
/**
 * Starts a get-config operation. No lifecycle state check is performed by this method.
 *
 * @return a new get-config builder bound to this instance
 */
@Override
public GetConfigBuilder getConfig() {
return new GetConfigBuilderImpl(this);
}
/**
 * Starts a transaction using the {@link CuratorTransaction} API.
 *
 * @return a new transaction bound to this instance
 * @throws IllegalStateException if the instance is not started
 */
@Override
public CuratorTransaction inTransaction() {
checkState();
return new CuratorTransactionImpl(this);
}
/**
 * Starts a transaction using the {@link CuratorMultiTransaction} API.
 *
 * @return a new multi-transaction bound to this instance
 * @throws IllegalStateException if the instance is not started
 */
@Override
public CuratorMultiTransaction transaction() {
checkState();
return new CuratorMultiTransactionImpl(this);
}
/**
 * Creates a factory for the individual operations of a transaction.
 *
 * @return a new {@link TransactionOp} bound to this instance
 * @throws IllegalStateException if the instance is not started
 */
@Override
public TransactionOp transactionOp() {
checkState();
return new TransactionOpImpl(this);
}
/**
 * Returns the container of connection state listeners, owned by the connection state manager.
 *
 * @return the connection state manager's listenable
 */
@Override
public Listenable<ConnectionStateListener> getConnectionStateListenable() {
return connectionStateManager.getListenable();
}
/**
 * Returns the container of Curator event listeners.
 *
 * @return the listener manager itself (not a copy)
 */
@Override
public Listenable<CuratorListener> getCuratorListenable() {
return listeners;
}
/**
 * Returns the container of unhandled-error listeners.
 *
 * @return the listener manager itself (not a copy)
 */
@Override
public Listenable<UnhandledErrorListener> getUnhandledErrorListenable() {
return unhandledErrorListeners;
}
/**
 * Issues a background sync for the given path.
 *
 * @param path path to sync; the namespace is applied before the operation is issued
 * @param context caller context attached to the operation
 * @throws IllegalStateException if the instance is not started
 */
@Override
public void sync(String path, Object context) {
    checkState();
    String namespacedPath = fixForNamespace(path);
    internalSync(this, namespacedPath, context);
}
/**
 * Starts a sync operation. No lifecycle state check is performed by this method.
 *
 * @return a new sync builder bound to this instance
 */
@Override
public SyncBuilder sync() {
return new SyncBuilderImpl(this);
}
/**
 * Starts a remove-watches operation. No lifecycle state check is performed by this method.
 *
 * @return a new remove-watches builder bound to this instance
 */
@Override
public RemoveWatchesBuilder watches() {
return new RemoveWatchesBuilderImpl(this);
}
/**
 * Starts a watches operation. Only available when the configured ZooKeeper compatibility
 * reports support for persistent watchers.
 *
 * @return a new watches builder bound to this instance
 * @throws IllegalStateException if persistent watchers are not supported
 */
@Override
public WatchesBuilder watchers() {
Preconditions.checkState(
zookeeperCompatibility.hasPersistentWatchers(),
"watchers() is not supported in the ZooKeeper library and/or server being used. Use watches() instead.");
return new WatchesBuilderImpl(this);
}
/**
 * Builds a background sync operation and hands it directly to
 * {@link #performBackgroundOperation(OperationAndData)}.
 *
 * @param impl framework instance the sync operation is created for
 * @param path path to sync, used as given (no namespace handling here)
 * @param context caller context attached to the operation
 */
protected void internalSync(CuratorFrameworkImpl impl, String path, Object context) {
    BackgroundOperation<String> syncOperation = new BackgroundSyncImpl(impl, context);
    OperationAndData<String> request =
            new OperationAndData<String>(syncOperation, path, null, null, context, null);
    performBackgroundOperation(request);
}
/**
 * Returns the underlying client wrapper.
 *
 * @return the {@link CuratorZookeeperClient} used by this instance
 */
@Override
public CuratorZookeeperClient getZookeeperClient() {
return client;
}
/**
 * Returns the ZooKeeper compatibility settings supplied by the builder.
 *
 * @return the configured compatibility descriptor
 */
@Override
public ZookeeperCompatibility getZookeeperCompatibility() {
return zookeeperCompatibility;
}
/**
 * Creates an {@link EnsurePath} for the given path via this instance's namespace.
 *
 * @param path path to ensure
 * @return the namespace-aware ensure-path object
 */
@Override
public EnsurePath newNamespaceAwareEnsurePath(String path) {
return namespace.newNamespaceAwareEnsurePath(path);
}
/**
 * Returns the schema set supplied by the builder.
 *
 * @return the schema set (never null; checked at construction)
 */
@Override
public SchemaSet getSchemaSet() {
return schemaSet;
}
/** @return the ACL provider supplied by the builder */
ACLProvider getAclProvider() {
return aclProvider;
}
/** @return the failed-delete manager owned by this instance */
FailedDeleteManager getFailedDeleteManager() {
return failedDeleteManager;
}
/** @return the failed-remove-watch manager owned by this instance */
FailedRemoveWatchManager getFailedRemoveWatcherManager() {
return failedRemoveWatcherManager;
}
/** @return a new retry loop obtained from the underlying client */
RetryLoop newRetryLoop() {
return client.newRetryLoop();
}
/**
 * @return the ZooKeeper handle obtained from the underlying client
 * @throws Exception errors raised by the client while providing the handle
 */
ZooKeeper getZooKeeper() throws Exception {
return client.getZooKeeper();
}
/** @return the compression provider supplied by the builder */
CompressionProvider getCompressionProvider() {
return compressionProvider;
}
/** @return the builder's "use container parents if available" setting */
boolean useContainerParentsIfAvailable() {
return useContainerParentsIfAvailable;
}
/**
 * Entry point for both the first execution of a background operation and the handling of its
 * result.
 *
 * <ul>
 *   <li>{@code event == null}: first execution - the operation is performed right away.</li>
 *   <li>Non-OK result the retry policy considers retry-able: the retry budget is checked and
 *       the operation is re-queued when a retry is granted.</li>
 *   <li>Otherwise the result goes to the operation's callback when it has one, or to the
 *       general event listeners when it has none.</li>
 * </ul>
 *
 * @param operationAndData the operation
 * @param event result of a previous execution, or {@code null} for the initial execution
 */
<DATA_TYPE> void processBackgroundOperation(OperationAndData<DATA_TYPE> operationAndData, CuratorEvent event) {
    if (event == null) {
        performBackgroundOperation(operationAndData);
        return;
    }

    KeeperException.Code resultCode = KeeperException.Code.get(event.getResultCode());
    boolean retryable = (resultCode != KeeperException.Code.OK)
            && getZookeeperClient().getRetryPolicy().allowRetry(KeeperException.create(resultCode));

    if (retryable) {
        if (checkBackgroundRetry(operationAndData, event)) {
            queueOperation(operationAndData);
        }
    } else if (operationAndData.getCallback() != null) {
        sendToBackgroundCallback(operationAndData, event);
    } else {
        processEvent(event);
    }
}
/**
 * Completes an operation that failed with an exception by sending a synthetic result event to
 * its callback. Operations without a callback are ignored.
 *
 * <p>Result code of the event: the exception's own code (and path) for a
 * {@link KeeperException}; otherwise SYSTEMERROR while the framework is started and
 * SESSIONEXPIRED when it is not.</p>
 *
 * @param operation the failed operation
 * @param e the failure
 */
private void abortOperation(OperationAndData<?> operation, Throwable e) {
    if (operation.getCallback() == null) {
        return;
    }

    final int resultCode;
    final String path;
    if (e instanceof KeeperException) {
        KeeperException keeperException = (KeeperException) e;
        resultCode = keeperException.code().intValue();
        path = keeperException.getPath();
    } else {
        KeeperException.Code fallback = (getState() == CuratorFrameworkState.STARTED)
                ? KeeperException.Code.SYSTEMERROR
                : KeeperException.Code.SESSIONEXPIRED;
        resultCode = fallback.intValue();
        path = null;
    }

    CuratorEvent event = new CuratorEventImpl(
            this,
            operation.getEventType(),
            resultCode,
            path,
            null,
            operation.getContext(),
            null,
            null,
            null,
            null,
            null,
            null);
    sendToBackgroundCallback(operation, event);
}
/**
 * Completes an operation that can no longer run because the framework is closed: its
 * callback, if any, receives an event carrying the SESSIONEXPIRED result code.
 *
 * @param operation the operation being dropped
 */
private void closeOperation(OperationAndData<?> operation) {
    if (operation.getCallback() != null) {
        int expiredCode = KeeperException.Code.SESSIONEXPIRED.intValue();
        CuratorEvent event = new CuratorEventImpl(
                this,
                operation.getEventType(),
                expiredCode,
                null,
                null,
                operation.getContext(),
                null,
                null,
                null,
                null,
                null,
                null);
        sendToBackgroundCallback(operation, event);
    }
}
/**
 * Wakes up a sleeping operation: its sleep is cleared and, when it is still sitting in the
 * background queue, it is removed and re-offered.
 *
 * <p>When the framework is no longer started, the operation is instead removed from the queue
 * and completed via {@code closeOperation}. In both cases nothing is done when the operation
 * is no longer in the queue.</p>
 *
 * @param operationAndData the sleeping operation
 */
private void requeueSleepOperation(OperationAndData<?> operationAndData) {
operationAndData.clearSleep();
// The state check and the re-offer happen under closeLock so they cannot interleave with the
// STARTED -> STOPPED transition.
synchronized (closeLock) {
if (getState() == CuratorFrameworkState.STARTED) {
if (backgroundOperations.remove(operationAndData)) {
// due to the internals of DelayQueue, operation must be removed/re-added so that re-sorting occurs
backgroundOperations.offer(operationAndData);
} // This operation has been taken over by background thread.
return;
}
}
// Sleeping operations are queued with delay, it could have been pulled out for execution or cancellation.
if (backgroundOperations.remove(operationAndData)) {
closeOperation(operationAndData);
}
}
/**
 * Adds an operation to the background queue if the framework is started; otherwise completes
 * it immediately via {@code closeOperation}.
 *
 * <p>The state check and the offer are done while holding {@code closeLock}.</p>
 *
 * @param operationAndData operation entry
 * @return true if the operation was actually queued, false if not
 */
<DATA_TYPE> boolean queueOperation(OperationAndData<DATA_TYPE> operationAndData) {
synchronized (closeLock) {
if (getState() == CuratorFrameworkState.STARTED) {
backgroundOperations.offer(operationAndData);
return true;
}
}
// Not started: notify the callback outside the lock.
closeOperation(operationAndData);
return false;
}
/**
 * Logs an error and reports it to all unhandled-error listeners (and to the debug listener
 * when one is installed).
 *
 * <p>Logging rules: a {@link KeeperException} is not logged at all when the "don't log
 * connection issues" system property is set. A connection-loss exception is logged at error
 * level when all connection issues are to be logged as errors, or when this is the first one
 * since the last (re)connection; otherwise at debug level. Everything else is logged at error
 * level. Listeners are notified regardless of what was logged.</p>
 *
 * @param reason description of the failure; {@code null} or empty is reported as "n/a"
 * @param e the failure
 */
void logError(String reason, final Throwable e) {
    final String effectiveReason = ((reason == null) || reason.isEmpty()) ? "n/a" : reason;

    if (!Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) || !(e instanceof KeeperException)) {
        if (e instanceof KeeperException.ConnectionLossException) {
            // compareAndSet consumes the "first issue since (re)connect" flag so that only one
            // connection loss per connection is raised at error level.
            if (LOG_ALL_CONNECTION_ISSUES_AS_ERROR_LEVEL || logAsErrorConnectionErrors.compareAndSet(true, false)) {
                log.error(effectiveReason, e);
            } else {
                log.debug(effectiveReason, e);
            }
        } else {
            log.error(effectiveReason, e);
        }
    }

    unhandledErrorListeners.forEach(l -> l.unhandledError(effectiveReason, e));

    // Read the volatile hook once: it may be reset to null concurrently, and a second read
    // after the null check could then fail with a NullPointerException.
    final UnhandledErrorListener debugHook = debugUnhandledErrorListener;
    if (debugHook != null) {
        debugHook.unhandledError(effectiveReason, e);
    }
}
/**
 * @param path path as seen by ZooKeeper
 * @return the result of the namespace's {@code unfixForNamespace} for that path
 */
String unfixForNamespace(String path) {
return namespace.unfixForNamespace(path);
}
/**
 * Applies the namespace to a path, treating it as non-sequential.
 *
 * @param path caller-supplied path
 * @return the result of the namespace's {@code fixForNamespace(path, false)}
 */
String fixForNamespace(String path) {
return namespace.fixForNamespace(path, false);
}
/**
 * Applies the namespace to a path.
 *
 * @param path caller-supplied path
 * @param isSequential forwarded unchanged to the namespace
 * @return the result of the namespace's {@code fixForNamespace(path, isSequential)}
 */
String fixForNamespace(String path, boolean isSequential) {
return namespace.fixForNamespace(path, isSequential);
}
/**
 * @return the internal default-data array itself (no copy is made here), never null
 */
byte[] getDefaultData() {
return defaultData;
}
/** @return the cache that supplies namespace facades of this instance */
NamespaceFacadeCache getNamespaceFacadeCache() {
return namespaceFacadeCache;
}
/**
 * Translates a ZooKeeper keeper state into connection handling:
 *
 * <ul>
 *   <li>Disconnected - the connection is suspended.</li>
 *   <li>Expired - a LOST state change is added.</li>
 *   <li>SyncConnected - the new connection is checked, RECONNECTED is added and sleeping
 *       background operations are woken up.</li>
 *   <li>ConnectedReadOnly - the new connection is checked and READ_ONLY is added.</li>
 * </ul>
 *
 * Any other state (including {@code null}) is ignored.
 *
 * @param state keeper state to act on
 */
void validateConnection(Watcher.Event.KeeperState state) {
    if (state == null) {
        return; // a null state triggers no transition
    }
    switch (state) {
        case Disconnected:
            internalConnectionHandler.suspendConnection(this);
            break;
        case Expired:
            connectionStateManager.addStateChange(ConnectionState.LOST);
            break;
        case SyncConnected:
            internalConnectionHandler.checkNewConnection(this);
            connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
            unSleepBackgroundOperations();
            break;
        case ConnectedReadOnly:
            internalConnectionHandler.checkNewConnection(this);
            connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
            break;
        default:
            break;
    }
}
/**
 * Records the client's current instance index and adds a LOST state change when it differs
 * from the previously recorded one. The very first observation (previous value -1) never
 * triggers a state change.
 */
void checkInstanceIndex() {
    long latestIndex = client.getInstanceIndex();
    long previousIndex = currentInstanceIndex.getAndSet(latestIndex);
    // currentInstanceIndex is initially -1 - ignore this
    boolean hadPreviousObservation = previousIndex >= 0;
    if (hadPreviousObservation && (previousIndex != latestIndex)) {
        connectionStateManager.addStateChange(ConnectionState.LOST);
    }
}
/**
 * Maps a ZooKeeper result code to the keeper state it implies for the connection.
 *
 * <ul>
 *   <li>AUTHFAILED, NOAUTH - AuthFailed</li>
 *   <li>CONNECTIONLOSS, OPERATIONTIMEOUT - Disconnected</li>
 *   <li>SESSIONEXPIRED - Expired</li>
 *   <li>OK, SESSIONMOVED - SyncConnected</li>
 * </ul>
 *
 * @param code result code; may be {@code null} (an integer result code that does not map to
 *     any known {@link KeeperException.Code})
 * @return the matching keeper state, or {@code KeeperState.fromInt(-1)} for every other code,
 *     including {@code null}
 */
Watcher.Event.KeeperState codeToState(KeeperException.Code code) {
    // Switching on a null enum value throws NullPointerException, so guard it explicitly and
    // let null fall through to the same "unknown" result as any unmapped code.
    if (code != null) {
        switch (code) {
            case AUTHFAILED:
            case NOAUTH:
                return Watcher.Event.KeeperState.AuthFailed;
            case CONNECTIONLOSS:
            case OPERATIONTIMEOUT:
                return Watcher.Event.KeeperState.Disconnected;
            case SESSIONEXPIRED:
                return Watcher.Event.KeeperState.Expired;
            case OK:
            case SESSIONMOVED:
                return Watcher.Event.KeeperState.SyncConnected;
            default:
                break;
        }
    }
    return Watcher.Event.KeeperState.fromInt(-1);
}
/**
 * @return always {@code null} in this class
 */
// NOTE(review): presumably overridden by the watcher-removal facade - confirm.
WatcherRemovalManager getWatcherRemovalManager() {
return null;
}
/** @return the result of the connection state manager's {@code setToSuspended()} */
boolean setToSuspended() {
return connectionStateManager.setToSuspended();
}
/**
 * Forwards a connection state change to the connection state manager.
 *
 * @param newConnectionState the state to add
 */
void addStateChange(ConnectionState newConnectionState) {
connectionStateManager.addStateChange(newConnectionState);
}
/** @return the ensemble tracker, or {@code null} when ensemble tracking is disabled */
EnsembleTracker getEnsembleTracker() {
return ensembleTracker;
}
// Test scaffolding used by checkBackgroundRetry(): when non-null, the give-up path blocks on
// this latch before continuing.
@VisibleForTesting
volatile CountDownLatch debugCheckBackgroundRetryLatch;
// Test scaffolding: counted down just before checkBackgroundRetry() blocks on the latch above.
@VisibleForTesting
volatile CountDownLatch debugCheckBackgroundRetryReadyLatch;
// Test scaffolding: when non-null, replaces the result code once the latch wait has finished.
@VisibleForTesting
volatile KeeperException.Code injectedCode;
/**
 * Asks the retry policy whether a failed background operation may be retried, consuming one
 * retry attempt of the operation.
 *
 * <p>When the policy refuses, the operation is finished here: its error callback (if any) is
 * told that retries are exhausted, its callback (if any) receives the event, the connection
 * state is updated from the result code, and the failure is reported through
 * {@code logError}.</p>
 *
 * @param operationAndData the failed operation
 * @param event the result event carrying the failure code
 * @return {@code true} if the caller should re-queue the operation, {@code false} if the
 *     operation has been given up on
 */
@SuppressWarnings({"ThrowableResultOfMethodCallIgnored"})
private <DATA_TYPE> boolean checkBackgroundRetry(OperationAndData<DATA_TYPE> operationAndData, CuratorEvent event) {
    if (client.getRetryPolicy()
            .allowRetry(
                    operationAndData.getThenIncrementRetryCount(),
                    operationAndData.getElapsedTimeMs(),
                    operationAndData)) {
        return true;
    }

    if (operationAndData.getErrorCallback() != null) {
        operationAndData.getErrorCallback().retriesExhausted(operationAndData);
    }
    if (operationAndData.getCallback() != null) {
        sendToBackgroundCallback(operationAndData, event);
    }

    // code is null when the integer result code does not map to a known KeeperException.Code
    KeeperException.Code code = KeeperException.Code.get(event.getResultCode());
    Exception e = null;
    try {
        e = (code != null) ? KeeperException.create(code) : null;
    } catch (Throwable t) {
        ThreadUtils.checkInterrupted(t);
    }
    if (e == null) {
        e = new Exception("Unknown result code: " + event.getResultCode());
    }

    // scaffolding to test CURATOR-525; each volatile hook is read once so that a concurrent
    // reset to null cannot cause a NullPointerException between the check and the use
    final CountDownLatch retryLatch = debugCheckBackgroundRetryLatch;
    if (retryLatch != null) {
        final CountDownLatch readyLatch = debugCheckBackgroundRetryReadyLatch;
        if (readyLatch != null) {
            readyLatch.countDown();
        }
        try {
            retryLatch.await();
            final KeeperException.Code injected = injectedCode;
            if (injected != null) {
                code = injected;
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    // An unknown code implies nothing about the connection; skipping it also keeps a null
    // code from reaching codeToState().
    if (code != null) {
        validateConnection(codeToState(code));
    }
    logError("Background operation retry gave up", e);
    return false;
}
/**
 * Delivers a result event to the operation's callback. Exceptions thrown by the callback are
 * not propagated; they are reported through {@code handleBackgroundOperationException} with a
 * null operation, i.e. without any retry.
 *
 * @param operationAndData operation whose callback is invoked; the callback must be non-null
 *     (callers check this before calling)
 * @param event the event to deliver
 */
private <DATA_TYPE> void sendToBackgroundCallback(
OperationAndData<DATA_TYPE> operationAndData, CuratorEvent event) {
try {
operationAndData.getCallback().processResult(this, event);
} catch (Exception e) {
ThreadUtils.checkInterrupted(e);
// This operation is already completed, and we don't retry a completed operation.
handleBackgroundOperationException(null, e);
}
}
/**
 * Handles an exception raised while working on a background operation.
 *
 * <p>When an operation is given and the retry policy regards the exception as retry-able, one
 * retry attempt is consumed; if the policy grants it, the operation is re-queued and nothing
 * else happens. In every other case the operation (if any) is aborted, so its callback is
 * notified, and the exception is reported through {@code logError}.</p>
 *
 * @param operationAndData the affected operation, or {@code null} when the exception is not
 *     tied to an operation that may be retried
 * @param e the exception
 */
private <DATA_TYPE> void handleBackgroundOperationException(
        OperationAndData<DATA_TYPE> operationAndData, Throwable e) {
    boolean retryableFailure =
            (operationAndData != null) && getZookeeperClient().getRetryPolicy().allowRetry(e);

    if (retryableFailure) {
        if (!Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES)) {
            log.debug("Retry-able exception received", e);
        }

        boolean retryGranted = client.getRetryPolicy()
                .allowRetry(
                        operationAndData.getThenIncrementRetryCount(),
                        operationAndData.getElapsedTimeMs(),
                        operationAndData);
        if (retryGranted) {
            if (!Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES)) {
                log.debug("Retrying operation");
            }
            queueOperation(operationAndData);
            return;
        }

        if (!Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES)) {
            log.debug("Retry policy did not allow retry");
        }
        if (operationAndData.getErrorCallback() != null) {
            operationAndData.getErrorCallback().retriesExhausted(operationAndData);
        }
    }

    if (operationAndData != null) {
        abortOperation(operationAndData, e);
    }
    logError("Background exception was not retry-able or retry gave up", e);
}
/**
 * Body of the background thread: repeatedly takes the next operation from the background
 * queue and performs it, for as long as the framework is in the STARTED state. Logs a message
 * when the loop ends, however it ends.
 */
private void backgroundOperationsLoop() {
    try {
        while (state.get() == CuratorFrameworkState.STARTED) {
            try {
                OperationAndData<?> next = backgroundOperations.take();
                if (debugListener != null) {
                    debugListener.listen(next);
                }
                performBackgroundOperation(next);
            } catch (InterruptedException e) {
                // swallow the interrupt as it's only possible from either a background
                // operation and, thus, doesn't apply to this loop or the instance
                // is being closed in which case the while test will get it
            }
        }
    } finally {
        log.info("backgroundOperationsLoop exiting");
    }
}
/**
 * Executes a background operation, or defers it while the client is not connected.
 *
 * <ul>
 *   <li>Connected, or the operation does not require a connection: it is executed now.</li>
 *   <li>Not connected and the operation is younger than the connection timeout: it is put to
 *       sleep and re-queued.</li>
 *   <li>Not connected and older than the connection timeout: it is treated as a
 *       CONNECTIONLOSS result and run through the retry check.</li>
 * </ul>
 *
 * Anything thrown along the way is passed to {@code handleBackgroundOperationException}.
 *
 * @param operationAndData the operation to perform
 */
void performBackgroundOperation(OperationAndData<?> operationAndData) {
try {
if (!operationAndData.isConnectionRequired() || client.isConnected()) {
operationAndData.callPerformBackgroundOperation();
return;
}
client.getZooKeeper(); // important - allow connection resets, timeouts, etc. to occur
if (operationAndData.getElapsedTimeMs() < client.getConnectionTimeoutMs()) {
sleepAndQueueOperation(operationAndData);
return;
}
/*
* Fix edge case reported as CURATOR-52. Connection timeout is detected when the initial (or previously failed) connection
* cannot be re-established. This needs to be run through the retry policy and callbacks need to get invoked, etc.
*/
CuratorEvent event = new CuratorEventImpl(
this,
operationAndData.getEventType(),
KeeperException.Code.CONNECTIONLOSS.intValue(),
null,
null,
operationAndData.getContext(),
null,
null,
null,
null,
null,
null);
if (checkBackgroundRetry(operationAndData, event)) {
queueOperation(operationAndData);
} else {
// checkBackgroundRetry() has already notified the callbacks; this adds a log entry and
// listener notification specific to the connection loss.
logError("Background retry gave up", new CuratorConnectionLossException());
}
} catch (Throwable e) {
ThreadUtils.checkInterrupted(e);
handleBackgroundOperationException(operationAndData, e);
}
}
// Sleep, in seconds, applied by sleepAndQueueOperation(); non-final so tests can change it.
@VisibleForTesting
volatile long sleepAndQueueOperationSeconds = 1;
/**
 * Puts the operation to sleep for {@code sleepAndQueueOperationSeconds} seconds and re-queues
 * it. When it was actually queued, it is also recorded in {@code forcedSleepOperations}.
 *
 * @param operationAndData the operation to defer
 * @throws InterruptedException declared by the signature; no statement visible in this method
 *     throws it
 */
private void sleepAndQueueOperation(OperationAndData<?> operationAndData) throws InterruptedException {
operationAndData.sleepFor(sleepAndQueueOperationSeconds, TimeUnit.SECONDS);
if (queueOperation(operationAndData)) {
forcedSleepOperations.add(operationAndData);
}
}
/**
 * Wakes up every operation currently recorded as force-slept: the record is drained and each
 * drained operation is handed to {@link #requeueSleepOperation(OperationAndData)}.
 */
private void unSleepBackgroundOperations() {
    Collection<OperationAndData<?>> awakened = new ArrayList<>(forcedSleepOperations.size());
    forcedSleepOperations.drainTo(awakened);
    log.debug("Clearing sleep for {} operations", awakened.size());
    for (OperationAndData<?> operation : awakened) {
        requeueSleepOperation(operation);
    }
}
/**
 * Dispatches an event to all Curator listeners. For WATCHED events the connection state is
 * first updated from the watched event's keeper state.
 *
 * <p>An exception thrown by a listener is reported through {@code logError} and does not
 * propagate out of this method.</p>
 *
 * @param curatorEvent the event to dispatch
 */
private void processEvent(final CuratorEvent curatorEvent) {
if (curatorEvent.getType() == CuratorEventType.WATCHED) {
validateConnection(curatorEvent.getWatchedEvent().getState());
}
listeners.forEach(listener -> {
try {
OperationTrace trace = client.startAdvancedTracer("EventListener");
listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
// Only reached when the listener returns normally; a throwing listener leaves the trace
// uncommitted.
trace.commit();
} catch (Exception e) {
ThreadUtils.checkInterrupted(e);
logError("Event listener threw exception", e);
}
});
}
}