blob: 7fa127519dcbe221420e9e30dee00d96557e7eda [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.curator;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.curator.utils.ThreadUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import java.io.Closeable;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* <p>
* See {@link RetryLoop} for the main details on retry loops. <b>All Curator/ZooKeeper operations
* should be done in a retry loop.</b>
* </p>
*
* <p>
* The standard retry loop treats session failure as a type of connection failure. i.e. the fact
* that it is a session failure isn't considered. This can be problematic if you are performing
* a series of operations that rely on ephemeral nodes. If the session fails after the ephemeral
* node has been created, future Curator/ZooKeeper operations may succeed even though the
* ephemeral node has been removed by ZooKeeper.
* </p>
*
* <p>
* Here's an example:
* </p>
* <ul>
* <li>You create an ephemeral/sequential node as a kind of lock/marker</li>
* <li>You perform some other operations</li>
* <li>The session fails for some reason</li>
* <li>You attempt to create a node assuming that the lock/marker still exists
* <ul>
* <li>Curator will notice the session failure and try to reconnect</li>
* <li>In most cases, the reconnect will succeed and, thus, the node creation will succeed
* even though the ephemeral node will have been deleted by ZooKeeper.</li>
* </ul>
* </li>
* </ul>
*
* <p>
* The SessionFailRetryLoop prevents this type of scenario. When a session failure is detected,
* the thread is marked as failed, which causes all subsequent Curator operations on that thread
* to fail. The SessionFailRetryLoop will then either retry the entire set of operations or fail,
* depending on the {@link SessionFailRetryLoop.Mode}.
* </p>
*
* Canonical usage:<br>
* <pre>
* SessionFailRetryLoop retryLoop = client.newSessionFailRetryLoop(mode);
* retryLoop.start();
* try
* {
*     while ( retryLoop.shouldContinue() )
*     {
*         try
*         {
*             // do work
*         }
*         catch ( Exception e )
*         {
*             retryLoop.takeException(e);
*         }
*     }
* }
* finally
* {
*     retryLoop.close();
* }
* </pre>
*/
public class SessionFailRetryLoop implements Closeable
{
    /**
     * Threads that own a loop whose watcher has observed a session expiration and that have not
     * yet been cleared by {@link #takeException(Exception)} (RETRY mode) or {@link #close()}.
     * Shared by all loops, hence static and backed by a concurrent map.
     */
    private static final Set<Thread> failedSessionThreads = Sets.newSetFromMap(Maps.<Thread, Boolean>newConcurrentMap());

    private final CuratorZookeeperClient client;
    private final Mode mode;

    // the thread that constructed this loop; start/close/takeException must be called from it
    private final Thread ourThread = Thread.currentThread();

    // set by the watcher when the session expires; cleared in RETRY mode by takeException()
    private final AtomicBoolean sessionHasFailed = new AtomicBoolean(false);

    // true once an attempt has been handed out by shouldContinue() and no retry was requested
    private final AtomicBoolean isDone = new AtomicBoolean(false);

    // standard retry loop that receives every exception this loop does not handle itself
    private final RetryLoop retryLoop;

    /**
     * Marks this loop, and the thread that owns it, as failed when the session expires.
     * NOTE(review): presumably invoked on a thread other than {@link #ourThread} (the ZooKeeper
     * event thread), which is why the shared state is atomic/concurrent - confirm.
     */
    private final Watcher watcher = new Watcher()
    {
        @Override
        public void process(WatchedEvent event)
        {
            if ( event.getState() == Event.KeeperState.Expired )
            {
                sessionHasFailed.set(true);
                failedSessionThreads.add(ourThread);
            }
        }
    };

    /**
     * Exception type that {@link #takeException(Exception)} recognizes as "the session for this
     * thread has failed". In {@link Mode#RETRY} it triggers another pass of the loop.
     */
    public static class SessionFailedException extends Exception
    {
        private static final long serialVersionUID = 1L;
    }

    public enum Mode
    {
        /**
         * If the session fails, retry the entire set of operations: once the resulting
         * {@link SessionFailedException} is passed to {@link SessionFailRetryLoop#takeException(Exception)},
         * the next call to {@link SessionFailRetryLoop#shouldContinue()} returns true again
         */
        RETRY,

        /**
         * If the session fails, do not retry because of it: the exception passed to
         * {@link SessionFailRetryLoop#takeException(Exception)} is forwarded unchanged to the
         * underlying {@link RetryLoop}. Note that {@link SessionFailRetryLoop#shouldContinue()}
         * itself never throws; in particular it does not throw
         * {@link KeeperException.SessionExpiredException}
         */
        FAIL
    }

    /**
     * Convenience utility: creates a "session fail" retry loop calling the given proc
     *
     * @param client Zookeeper
     * @param mode how to handle session failures
     * @param proc procedure to call with retry
     * @param <T> return type
     * @return the value returned by the last successful call of proc, or null if no call
     *         completed normally
     * @throws Exception any non-retriable errors
     */
    public static<T> T callWithRetry(CuratorZookeeperClient client, Mode mode, Callable<T> proc) throws Exception
    {
        T result = null;
        SessionFailRetryLoop retryLoop = client.newSessionFailRetryLoop(mode);
        retryLoop.start();
        try
        {
            while ( retryLoop.shouldContinue() )
            {
                try
                {
                    result = proc.call();
                }
                catch ( Exception e )
                {
                    ThreadUtils.checkInterrupted(e);
                    retryLoop.takeException(e);
                }
            }
        }
        finally
        {
            // always unregister the watcher and clear this thread's failed mark
            retryLoop.close();
        }
        return result;
    }

    /**
     * Binds the loop to the constructing thread and obtains a standard retry loop from the client
     *
     * @param client client to watch and to obtain the retry loop from
     * @param mode how to handle session failures
     */
    SessionFailRetryLoop(CuratorZookeeperClient client, Mode mode)
    {
        this.client = client;
        this.mode = mode;
        retryLoop = client.newRetryLoop();
    }

    /**
     * @return true if the calling thread is currently marked as having a failed session
     */
    static boolean sessionForThreadHasFailed()
    {
        // cheap emptiness check first: in the common case no thread is marked at all
        return !failedSessionThreads.isEmpty() && failedSessionThreads.contains(Thread.currentThread());
    }

    /**
     * SessionFailRetryLoop must be started. Registers the session watcher with the client.
     *
     * @throws IllegalStateException if called from a thread other than the one that created the loop
     */
    public void start()
    {
        Preconditions.checkState(Thread.currentThread().equals(ourThread), "Not in the correct thread");

        client.addParentWatcher(watcher);
    }

    /**
     * If true is returned, make an attempt at the set of operations. Returns true on the first
     * call and again after {@link #takeException(Exception)} has requested a retry; otherwise false.
     *
     * @return true/false
     */
    public boolean shouldContinue()
    {
        // atomically claim the attempt: whoever flips false -> true gets to run
        boolean localIsDone = isDone.getAndSet(true);
        return !localIsDone;
    }

    /**
     * Must be called in a finally handler when done with the loop
     *
     * @throws IllegalStateException if called from a thread other than the one that created the loop
     */
    @Override
    public void close()
    {
        Preconditions.checkState(Thread.currentThread().equals(ourThread), "Not in the correct thread");

        try
        {
            // Unregister the watcher BEFORE clearing the thread's failed mark. Doing it the other
            // way round lets an Expired event that arrives in between re-mark the thread after it
            // was cleared, leaving it flagged as failed once the loop is gone.
            client.removeParentWatcher(watcher);
        }
        finally
        {
            // clear the mark even if unregistering the watcher throws
            failedSessionThreads.remove(ourThread);
        }
    }

    /**
     * Pass any caught exceptions here. If the session has failed and the mode is
     * {@link Mode#RETRY}, the failure state is reset; when the exception is additionally a
     * {@link SessionFailedException} it is consumed and the loop is re-armed. Every other
     * exception is forwarded to the underlying {@link RetryLoop}.
     *
     * @param exception the exception
     * @throws Exception if not retry-able or the retry policy returned negative
     * @throws IllegalStateException if called from a thread other than the one that created the loop
     */
    public void takeException(Exception exception) throws Exception
    {
        Preconditions.checkState(Thread.currentThread().equals(ourThread), "Not in the correct thread");

        boolean passUp = true;
        if ( sessionHasFailed.get() )
        {
            switch ( mode )
            {
                case RETRY:
                {
                    // reset the failure state so operations on this thread can proceed again
                    sessionHasFailed.set(false);
                    failedSessionThreads.remove(ourThread);
                    if ( exception instanceof SessionFailedException )
                    {
                        // re-arm the loop: the next shouldContinue() returns true
                        isDone.set(false);
                        passUp = false;
                    }
                    break;
                }

                case FAIL:
                {
                    // nothing to reset; the exception is forwarded below
                    break;
                }
            }
        }

        if ( passUp )
        {
            retryLoop.takeException(exception);
        }
    }
}