blob: e704f028e93cd9c6cdc747bbaf3c1ddb8416cbd3 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.curator.framework.imps;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.curator.CuratorConnectionLossException;
import org.apache.curator.CuratorZookeeperClient;
import org.apache.curator.RetryLoop;
import org.apache.curator.drivers.OperationTrace;
import org.apache.curator.framework.AuthInfo;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.WatcherRemoveCuratorFramework;
import org.apache.curator.framework.api.*;
import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
import org.apache.curator.framework.api.transaction.CuratorTransaction;
import org.apache.curator.framework.api.transaction.TransactionOp;
import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.listen.StandardListenerManager;
import org.apache.curator.framework.schema.SchemaSet;
import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.framework.state.ConnectionStateManager;
import org.apache.curator.utils.Compatibility;
import org.apache.curator.utils.DebugUtils;
import org.apache.curator.utils.EnsurePath;
import org.apache.curator.utils.ThreadUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.curator.utils.ZookeeperFactory;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
public class CuratorFrameworkImpl implements CuratorFramework
{
private final Logger log = LoggerFactory.getLogger(getClass());
private final CuratorZookeeperClient client;
private final StandardListenerManager<CuratorListener> listeners;
private final StandardListenerManager<UnhandledErrorListener> unhandledErrorListeners;
private final ThreadFactory threadFactory;
private final int maxCloseWaitMs;
private final BlockingQueue<OperationAndData<?>> backgroundOperations;
private final BlockingQueue<OperationAndData<?>> forcedSleepOperations;
private final NamespaceImpl namespace;
private final ConnectionStateManager connectionStateManager;
private final List<AuthInfo> authInfos;
private final byte[] defaultData;
private final FailedDeleteManager failedDeleteManager;
private final FailedRemoveWatchManager failedRemoveWatcherManager;
private final CompressionProvider compressionProvider;
private final ACLProvider aclProvider;
private final NamespaceFacadeCache namespaceFacadeCache;
private final boolean useContainerParentsIfAvailable;
private final ConnectionStateErrorPolicy connectionStateErrorPolicy;
private final AtomicLong currentInstanceIndex = new AtomicLong(-1);
private final InternalConnectionHandler internalConnectionHandler;
private final EnsembleTracker ensembleTracker;
private final SchemaSet schemaSet;
private final Executor runSafeService;
private volatile ExecutorService executorService;
private final AtomicBoolean logAsErrorConnectionErrors = new AtomicBoolean(false);
private static final boolean LOG_ALL_CONNECTION_ISSUES_AS_ERROR_LEVEL = !Boolean.getBoolean(DebugUtils.PROPERTY_LOG_ONLY_FIRST_CONNECTION_ISSUE_AS_ERROR_LEVEL);
interface DebugBackgroundListener
{
void listen(OperationAndData<?> data);
}
volatile DebugBackgroundListener debugListener = null;
@VisibleForTesting
public volatile UnhandledErrorListener debugUnhandledErrorListener = null;
private final AtomicReference<CuratorFrameworkState> state;
public CuratorFrameworkImpl(CuratorFrameworkFactory.Builder builder)
{
ZookeeperFactory localZookeeperFactory = makeZookeeperFactory(builder.getZookeeperFactory());
this.client = new CuratorZookeeperClient
(
localZookeeperFactory,
builder.getEnsembleProvider(),
builder.getSessionTimeoutMs(),
builder.getConnectionTimeoutMs(),
builder.getWaitForShutdownTimeoutMs(),
new Watcher()
{
@Override
public void process(WatchedEvent watchedEvent)
{
CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), unfixForNamespace(watchedEvent.getPath()), null, null, null, null, null, watchedEvent, null, null);
processEvent(event);
}
},
builder.getRetryPolicy(),
builder.canBeReadOnly()
);
internalConnectionHandler = new StandardInternalConnectionHandler();
listeners = StandardListenerManager.standard();
unhandledErrorListeners = StandardListenerManager.standard();
backgroundOperations = new DelayQueue<OperationAndData<?>>();
forcedSleepOperations = new LinkedBlockingQueue<>();
namespace = new NamespaceImpl(this, builder.getNamespace());
threadFactory = getThreadFactory(builder);
maxCloseWaitMs = builder.getMaxCloseWaitMs();
connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory(), builder.getSessionTimeoutMs(), builder.getSimulatedSessionExpirationPercent(), builder.getConnectionStateListenerManagerFactory());
compressionProvider = builder.getCompressionProvider();
aclProvider = builder.getAclProvider();
state = new AtomicReference<CuratorFrameworkState>(CuratorFrameworkState.LATENT);
useContainerParentsIfAvailable = builder.useContainerParentsIfAvailable();
connectionStateErrorPolicy = Preconditions.checkNotNull(builder.getConnectionStateErrorPolicy(), "errorPolicy cannot be null");
schemaSet = Preconditions.checkNotNull(builder.getSchemaSet(), "schemaSet cannot be null");
byte[] builderDefaultData = builder.getDefaultData();
defaultData = (builderDefaultData != null) ? Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0];
authInfos = buildAuths(builder);
failedDeleteManager = new FailedDeleteManager(this);
failedRemoveWatcherManager = new FailedRemoveWatchManager(this);
namespaceFacadeCache = new NamespaceFacadeCache(this);
ensembleTracker = new EnsembleTracker(this, builder.getEnsembleProvider());
runSafeService = makeRunSafeService(builder);
}
private Executor makeRunSafeService(CuratorFrameworkFactory.Builder builder)
{
if ( builder.getRunSafeService() != null )
{
return builder.getRunSafeService();
}
ThreadFactory threadFactory = builder.getThreadFactory();
if ( threadFactory == null )
{
threadFactory = ThreadUtils.newThreadFactory("SafeNotifyService");
}
return Executors.newSingleThreadExecutor(threadFactory);
}
private List<AuthInfo> buildAuths(CuratorFrameworkFactory.Builder builder)
{
ImmutableList.Builder<AuthInfo> builder1 = ImmutableList.builder();
if ( builder.getAuthInfos() != null )
{
builder1.addAll(builder.getAuthInfos());
}
return builder1.build();
}
@Override
public CompletableFuture<Void> runSafe(Runnable runnable)
{
return CompletableFuture.runAsync(runnable, runSafeService);
}
@Override
public WatcherRemoveCuratorFramework newWatcherRemoveCuratorFramework()
{
return new WatcherRemovalFacade(this);
}
@Override
public QuorumVerifier getCurrentConfig()
{
return (ensembleTracker != null) ? ensembleTracker.getCurrentConfig() : null;
}
private ZookeeperFactory makeZookeeperFactory(final ZookeeperFactory actualZookeeperFactory)
{
return new ZookeeperFactory()
{
@Override
public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
{
ZooKeeper zooKeeper = actualZookeeperFactory.newZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
for ( AuthInfo auth : authInfos )
{
zooKeeper.addAuthInfo(auth.getScheme(), auth.getAuth());
}
return zooKeeper;
}
};
}
private ThreadFactory getThreadFactory(CuratorFrameworkFactory.Builder builder)
{
ThreadFactory threadFactory = builder.getThreadFactory();
if ( threadFactory == null )
{
threadFactory = ThreadUtils.newThreadFactory("Framework");
}
return threadFactory;
}
protected CuratorFrameworkImpl(CuratorFrameworkImpl parent)
{
client = parent.client;
listeners = parent.listeners;
unhandledErrorListeners = parent.unhandledErrorListeners;
threadFactory = parent.threadFactory;
maxCloseWaitMs = parent.maxCloseWaitMs;
backgroundOperations = parent.backgroundOperations;
forcedSleepOperations = parent.forcedSleepOperations;
connectionStateManager = parent.connectionStateManager;
defaultData = parent.defaultData;
failedDeleteManager = parent.failedDeleteManager;
failedRemoveWatcherManager = parent.failedRemoveWatcherManager;
compressionProvider = parent.compressionProvider;
aclProvider = parent.aclProvider;
namespaceFacadeCache = parent.namespaceFacadeCache;
namespace = new NamespaceImpl(this, null);
state = parent.state;
authInfos = parent.authInfos;
useContainerParentsIfAvailable = parent.useContainerParentsIfAvailable;
connectionStateErrorPolicy = parent.connectionStateErrorPolicy;
internalConnectionHandler = parent.internalConnectionHandler;
schemaSet = parent.schemaSet;
ensembleTracker = null;
runSafeService = parent.runSafeService;
}
@Override
public void createContainers(String path) throws Exception
{
checkExists().creatingParentContainersIfNeeded().forPath(ZKPaths.makePath(path, "foo"));
}
@Override
public void clearWatcherReferences(Watcher watcher)
{
// NOP
}
@Override
public CuratorFrameworkState getState()
{
return state.get();
}
@Override
@Deprecated
public boolean isStarted()
{
return state.get() == CuratorFrameworkState.STARTED;
}
@Override
public boolean blockUntilConnected(int maxWaitTime, TimeUnit units) throws InterruptedException
{
return connectionStateManager.blockUntilConnected(maxWaitTime, units);
}
@Override
public void blockUntilConnected() throws InterruptedException
{
blockUntilConnected(0, null);
}
@Override
public ConnectionStateErrorPolicy getConnectionStateErrorPolicy()
{
return connectionStateErrorPolicy;
}
@Override
public void start()
{
log.info("Starting");
if ( !state.compareAndSet(CuratorFrameworkState.LATENT, CuratorFrameworkState.STARTED) )
{
throw new IllegalStateException("Cannot be started more than once");
}
try
{
connectionStateManager.start(); // ordering dependency - must be called before client.start()
final ConnectionStateListener listener = new ConnectionStateListener()
{
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
if ( ConnectionState.CONNECTED == newState || ConnectionState.RECONNECTED == newState )
{
logAsErrorConnectionErrors.set(true);
}
}
@Override
public boolean doNotProxy()
{
return true;
}
};
this.getConnectionStateListenable().addListener(listener);
client.start();
executorService = Executors.newSingleThreadScheduledExecutor(threadFactory);
executorService.submit(new Callable<Object>()
{
@Override
public Object call() throws Exception
{
backgroundOperationsLoop();
return null;
}
});
if ( ensembleTracker != null )
{
ensembleTracker.start();
}
log.info(schemaSet.toDocumentation());
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
handleBackgroundOperationException(null, e);
}
}
@Override
public void close()
{
log.debug("Closing");
if ( state.compareAndSet(CuratorFrameworkState.STARTED, CuratorFrameworkState.STOPPED) )
{
listeners.forEach(listener ->
{
CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.CLOSING, 0, null, null, null, null, null, null, null, null, null);
try
{
listener.eventReceived(CuratorFrameworkImpl.this, event);
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
log.error("Exception while sending Closing event", e);
}
});
if ( executorService != null )
{
executorService.shutdownNow();
try
{
executorService.awaitTermination(maxCloseWaitMs, TimeUnit.MILLISECONDS);
}
catch ( InterruptedException e )
{
// Interrupted while interrupting; I give up.
Thread.currentThread().interrupt();
}
}
if ( ensembleTracker != null )
{
ensembleTracker.close();
}
listeners.clear();
unhandledErrorListeners.clear();
connectionStateManager.close();
client.close();
}
}
@Override
@Deprecated
public CuratorFramework nonNamespaceView()
{
return usingNamespace(null);
}
@Override
public String getNamespace()
{
String str = namespace.getNamespace();
return (str != null) ? str : "";
}
private void checkState()
{
CuratorFrameworkState state = getState();
Preconditions.checkState(state == CuratorFrameworkState.STARTED, "Expected state [%s] was [%s]", CuratorFrameworkState.STARTED, state);
}
@Override
public CuratorFramework usingNamespace(String newNamespace)
{
checkState();
return namespaceFacadeCache.get(newNamespace);
}
@Override
public CreateBuilder create()
{
checkState();
return new CreateBuilderImpl(this);
}
@Override
public DeleteBuilder delete()
{
checkState();
return new DeleteBuilderImpl(this);
}
@Override
public ExistsBuilder checkExists()
{
checkState();
return new ExistsBuilderImpl(this);
}
@Override
public GetDataBuilder getData()
{
checkState();
return new GetDataBuilderImpl(this);
}
@Override
public SetDataBuilder setData()
{
checkState();
return new SetDataBuilderImpl(this);
}
@Override
public GetChildrenBuilder getChildren()
{
checkState();
return new GetChildrenBuilderImpl(this);
}
@Override
public GetACLBuilder getACL()
{
checkState();
return new GetACLBuilderImpl(this);
}
@Override
public SetACLBuilder setACL()
{
checkState();
return new SetACLBuilderImpl(this);
}
@Override
public ReconfigBuilder reconfig()
{
return new ReconfigBuilderImpl(this);
}
@Override
public GetConfigBuilder getConfig()
{
return new GetConfigBuilderImpl(this);
}
@Override
public CuratorTransaction inTransaction()
{
checkState();
return new CuratorTransactionImpl(this);
}
@Override
public CuratorMultiTransaction transaction()
{
checkState();
return new CuratorMultiTransactionImpl(this);
}
@Override
public TransactionOp transactionOp()
{
checkState();
return new TransactionOpImpl(this);
}
@Override
public Listenable<ConnectionStateListener> getConnectionStateListenable()
{
return connectionStateManager.getListenable();
}
@Override
public Listenable<CuratorListener> getCuratorListenable()
{
return listeners;
}
@Override
public Listenable<UnhandledErrorListener> getUnhandledErrorListenable()
{
return unhandledErrorListeners;
}
@Override
public void sync(String path, Object context)
{
checkState();
path = fixForNamespace(path);
internalSync(this, path, context);
}
@Override
public SyncBuilder sync()
{
return new SyncBuilderImpl(this);
}
@Override
public RemoveWatchesBuilder watches()
{
return new RemoveWatchesBuilderImpl(this);
}
@Override
public WatchesBuilder watchers()
{
Preconditions.checkState(Compatibility.hasPersistentWatchers(), "watchers() is not supported in the ZooKeeper library being used. Use watches() instead.");
return new WatchesBuilderImpl(this);
}
protected void internalSync(CuratorFrameworkImpl impl, String path, Object context)
{
BackgroundOperation<String> operation = new BackgroundSyncImpl(impl, context);
performBackgroundOperation(new OperationAndData<String>(operation, path, null, null, context, null));
}
@Override
public CuratorZookeeperClient getZookeeperClient()
{
return client;
}
@Override
public EnsurePath newNamespaceAwareEnsurePath(String path)
{
return namespace.newNamespaceAwareEnsurePath(path);
}
@Override
public SchemaSet getSchemaSet()
{
return schemaSet;
}
ACLProvider getAclProvider()
{
return aclProvider;
}
FailedDeleteManager getFailedDeleteManager()
{
return failedDeleteManager;
}
FailedRemoveWatchManager getFailedRemoveWatcherManager()
{
return failedRemoveWatcherManager;
}
RetryLoop newRetryLoop()
{
return client.newRetryLoop();
}
ZooKeeper getZooKeeper() throws Exception
{
return client.getZooKeeper();
}
CompressionProvider getCompressionProvider()
{
return compressionProvider;
}
boolean useContainerParentsIfAvailable()
{
return useContainerParentsIfAvailable;
}
<DATA_TYPE> void processBackgroundOperation(OperationAndData<DATA_TYPE> operationAndData, CuratorEvent event)
{
boolean isInitialExecution = (event == null);
if ( isInitialExecution )
{
performBackgroundOperation(operationAndData);
return;
}
boolean doQueueOperation = false;
do
{
final KeeperException ke = KeeperException.create(event.getResultCode());
if ( getZookeeperClient().getRetryPolicy().allowRetry(ke) )
{
doQueueOperation = checkBackgroundRetry(operationAndData, event);
break;
}
if ( operationAndData.getCallback() != null )
{
sendToBackgroundCallback(operationAndData, event);
break;
}
processEvent(event);
}
while ( false );
if ( doQueueOperation )
{
queueOperation(operationAndData);
}
}
/**
* @param operationAndData operation entry
* @return true if the operation was actually queued, false if not
*/
<DATA_TYPE> boolean queueOperation(OperationAndData<DATA_TYPE> operationAndData)
{
if ( getState() == CuratorFrameworkState.STARTED )
{
backgroundOperations.offer(operationAndData);
return true;
}
return false;
}
void logError(String reason, final Throwable e)
{
if ( (reason == null) || (reason.length() == 0) )
{
reason = "n/a";
}
if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) || !(e instanceof KeeperException) )
{
if ( e instanceof KeeperException.ConnectionLossException )
{
if ( LOG_ALL_CONNECTION_ISSUES_AS_ERROR_LEVEL || logAsErrorConnectionErrors.compareAndSet(true, false) )
{
log.error(reason, e);
}
else
{
log.debug(reason, e);
}
}
else
{
log.error(reason, e);
}
}
final String localReason = reason;
unhandledErrorListeners.forEach(l -> l.unhandledError(localReason, e));
if ( debugUnhandledErrorListener != null )
{
debugUnhandledErrorListener.unhandledError(reason, e);
}
}
String unfixForNamespace(String path)
{
return namespace.unfixForNamespace(path);
}
String fixForNamespace(String path)
{
return namespace.fixForNamespace(path, false);
}
String fixForNamespace(String path, boolean isSequential)
{
return namespace.fixForNamespace(path, isSequential);
}
byte[] getDefaultData()
{
return defaultData;
}
NamespaceFacadeCache getNamespaceFacadeCache()
{
return namespaceFacadeCache;
}
void validateConnection(Watcher.Event.KeeperState state)
{
if ( state == Watcher.Event.KeeperState.Disconnected )
{
internalConnectionHandler.suspendConnection(this);
}
else if ( state == Watcher.Event.KeeperState.Expired )
{
connectionStateManager.addStateChange(ConnectionState.LOST);
}
else if ( state == Watcher.Event.KeeperState.SyncConnected )
{
internalConnectionHandler.checkNewConnection(this);
connectionStateManager.addStateChange(ConnectionState.RECONNECTED);
unSleepBackgroundOperations();
}
else if ( state == Watcher.Event.KeeperState.ConnectedReadOnly )
{
internalConnectionHandler.checkNewConnection(this);
connectionStateManager.addStateChange(ConnectionState.READ_ONLY);
}
}
void checkInstanceIndex()
{
long instanceIndex = client.getInstanceIndex();
long newInstanceIndex = currentInstanceIndex.getAndSet(instanceIndex);
if ( (newInstanceIndex >= 0) && (instanceIndex != newInstanceIndex) ) // currentInstanceIndex is initially -1 - ignore this
{
connectionStateManager.addStateChange(ConnectionState.LOST);
}
}
Watcher.Event.KeeperState codeToState(KeeperException.Code code)
{
switch ( code )
{
case AUTHFAILED:
case NOAUTH:
{
return Watcher.Event.KeeperState.AuthFailed;
}
case CONNECTIONLOSS:
case OPERATIONTIMEOUT:
{
return Watcher.Event.KeeperState.Disconnected;
}
case SESSIONEXPIRED:
{
return Watcher.Event.KeeperState.Expired;
}
case OK:
case SESSIONMOVED:
{
return Watcher.Event.KeeperState.SyncConnected;
}
}
return Watcher.Event.KeeperState.fromInt(-1);
}
WatcherRemovalManager getWatcherRemovalManager()
{
return null;
}
boolean setToSuspended()
{
return connectionStateManager.setToSuspended();
}
void addStateChange(ConnectionState newConnectionState)
{
connectionStateManager.addStateChange(newConnectionState);
}
EnsembleTracker getEnsembleTracker()
{
return ensembleTracker;
}
@VisibleForTesting
volatile CountDownLatch debugCheckBackgroundRetryLatch;
@VisibleForTesting
volatile CountDownLatch debugCheckBackgroundRetryReadyLatch;
@VisibleForTesting
volatile KeeperException.Code injectedCode;
@SuppressWarnings({"ThrowableResultOfMethodCallIgnored"})
private <DATA_TYPE> boolean checkBackgroundRetry(OperationAndData<DATA_TYPE> operationAndData, CuratorEvent event)
{
boolean doRetry = false;
if ( client.getRetryPolicy().allowRetry(operationAndData.getThenIncrementRetryCount(), operationAndData.getElapsedTimeMs(), operationAndData) )
{
doRetry = true;
}
else
{
if ( operationAndData.getErrorCallback() != null )
{
operationAndData.getErrorCallback().retriesExhausted(operationAndData);
}
if ( operationAndData.getCallback() != null )
{
sendToBackgroundCallback(operationAndData, event);
}
KeeperException.Code code = KeeperException.Code.get(event.getResultCode());
Exception e = null;
try
{
e = (code != null) ? KeeperException.create(code) : null;
}
catch ( Throwable t )
{
ThreadUtils.checkInterrupted(t);
}
if ( e == null )
{
e = new Exception("Unknown result codegetResultCode()");
}
if ( debugCheckBackgroundRetryLatch != null ) // scaffolding to test CURATOR-525
{
if ( debugCheckBackgroundRetryReadyLatch != null )
{
debugCheckBackgroundRetryReadyLatch.countDown();
}
try
{
debugCheckBackgroundRetryLatch.await();
if (injectedCode != null)
{
code = injectedCode;
}
}
catch ( InterruptedException ex )
{
Thread.currentThread().interrupt();
}
}
validateConnection(codeToState(code));
logError("Background operation retry gave up", e);
}
return doRetry;
}
private <DATA_TYPE> void sendToBackgroundCallback(OperationAndData<DATA_TYPE> operationAndData, CuratorEvent event)
{
try
{
operationAndData.getCallback().processResult(this, event);
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
handleBackgroundOperationException(operationAndData, e);
}
}
private <DATA_TYPE> void handleBackgroundOperationException(OperationAndData<DATA_TYPE> operationAndData, Throwable e)
{
do
{
if ( (operationAndData != null) && getZookeeperClient().getRetryPolicy().allowRetry(e) )
{
if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
{
log.debug("Retry-able exception received", e);
}
if ( client.getRetryPolicy().allowRetry(operationAndData.getThenIncrementRetryCount(), operationAndData.getElapsedTimeMs(), operationAndData) )
{
if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
{
log.debug("Retrying operation");
}
backgroundOperations.offer(operationAndData);
break;
}
else
{
if ( !Boolean.getBoolean(DebugUtils.PROPERTY_DONT_LOG_CONNECTION_ISSUES) )
{
log.debug("Retry policy did not allow retry");
}
if ( operationAndData.getErrorCallback() != null )
{
operationAndData.getErrorCallback().retriesExhausted(operationAndData);
}
}
}
logError("Background exception was not retry-able or retry gave up", e);
}
while ( false );
}
private void backgroundOperationsLoop()
{
try
{
while ( state.get() == CuratorFrameworkState.STARTED )
{
OperationAndData<?> operationAndData;
try
{
operationAndData = backgroundOperations.take();
if ( debugListener != null )
{
debugListener.listen(operationAndData);
}
performBackgroundOperation(operationAndData);
}
catch ( InterruptedException e )
{
// swallow the interrupt as it's only possible from either a background
// operation and, thus, doesn't apply to this loop or the instance
// is being closed in which case the while test will get it
}
}
}
finally
{
log.info("backgroundOperationsLoop exiting");
}
}
void performBackgroundOperation(OperationAndData<?> operationAndData)
{
try
{
if ( !operationAndData.isConnectionRequired() || client.isConnected() )
{
operationAndData.callPerformBackgroundOperation();
}
else
{
client.getZooKeeper(); // important - allow connection resets, timeouts, etc. to occur
if ( operationAndData.getElapsedTimeMs() >= client.getConnectionTimeoutMs() )
{
throw new CuratorConnectionLossException();
}
sleepAndQueueOperation(operationAndData);
}
}
catch ( Throwable e )
{
ThreadUtils.checkInterrupted(e);
/**
* Fix edge case reported as CURATOR-52. ConnectionState.checkTimeouts() throws KeeperException.ConnectionLossException
* when the initial (or previously failed) connection cannot be re-established. This needs to be run through the retry policy
* and callbacks need to get invoked, etc.
*/
if ( e instanceof CuratorConnectionLossException )
{
WatchedEvent watchedEvent = new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.Disconnected, null);
CuratorEvent event = new CuratorEventImpl(this, CuratorEventType.WATCHED, KeeperException.Code.CONNECTIONLOSS.intValue(), null, null, operationAndData.getContext(), null, null, null, watchedEvent, null, null);
if ( checkBackgroundRetry(operationAndData, event) )
{
queueOperation(operationAndData);
}
else
{
logError("Background retry gave up", e);
}
}
else
{
handleBackgroundOperationException(operationAndData, e);
}
}
}
@VisibleForTesting
volatile long sleepAndQueueOperationSeconds = 1;
private void sleepAndQueueOperation(OperationAndData<?> operationAndData) throws InterruptedException
{
operationAndData.sleepFor(sleepAndQueueOperationSeconds, TimeUnit.SECONDS);
if ( queueOperation(operationAndData) )
{
forcedSleepOperations.add(operationAndData);
}
}
private void unSleepBackgroundOperations()
{
Collection<OperationAndData<?>> drain = new ArrayList<>(forcedSleepOperations.size());
forcedSleepOperations.drainTo(drain);
log.debug("Clearing sleep for {} operations", drain.size());
for ( OperationAndData<?> operation : drain )
{
operation.clearSleep();
if ( backgroundOperations.remove(operation) ) // due to the internals of DelayQueue, operation must be removed/re-added so that re-sorting occurs
{
backgroundOperations.offer(operation);
}
}
}
private void processEvent(final CuratorEvent curatorEvent)
{
if ( curatorEvent.getType() == CuratorEventType.WATCHED )
{
validateConnection(curatorEvent.getWatchedEvent().getState());
}
listeners.forEach(listener ->
{
try
{
OperationTrace trace = client.startAdvancedTracer("EventListener");
listener.eventReceived(CuratorFrameworkImpl.this, curatorEvent);
trace.commit();
}
catch ( Exception e )
{
ThreadUtils.checkInterrupted(e);
logError("Event listener threw exception", e);
}
});
}
}