blob: 4b0da11d9437c3590158e8601eb309c3819481d4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.curator.framework.recipes.locks;
import com.google.common.base.Function;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import org.apache.curator.RetryLoop;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.WatcherRemoveCuratorFramework;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.utils.PathUtils;
import org.apache.curator.utils.ZKPaths;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
public class LockInternals
{
private final WatcherRemoveCuratorFramework client;
private final String path;
private final String basePath;
private final LockInternalsDriver driver;
private final String lockName;
private final AtomicReference<RevocationSpec> revocable = new AtomicReference<RevocationSpec>(null);
private final CuratorWatcher revocableWatcher = new CuratorWatcher()
{
@Override
public void process(WatchedEvent event) throws Exception
{
if ( event.getType() == Watcher.Event.EventType.NodeDataChanged )
{
checkRevocableWatcher(event.getPath());
}
}
};
private final Watcher watcher = new Watcher()
{
@Override
public void process(WatchedEvent event)
{
notifyFromWatcher();
}
};
private volatile int maxLeases;
static final byte[] REVOKE_MESSAGE = "__REVOKE__".getBytes();
/**
* Attempt to delete the lock node so that sequence numbers get reset
*
* @throws Exception errors
*/
public void clean() throws Exception
{
try
{
client.delete().forPath(basePath);
}
catch ( KeeperException.BadVersionException ignore )
{
// ignore - another thread/process got the lock
}
catch ( KeeperException.NotEmptyException ignore )
{
// ignore - other threads/processes are waiting
}
}
LockInternals(CuratorFramework client, LockInternalsDriver driver, String path, String lockName, int maxLeases)
{
this.driver = driver;
this.lockName = lockName;
this.maxLeases = maxLeases;
this.client = client.newWatcherRemoveCuratorFramework();
this.basePath = PathUtils.validatePath(path);
this.path = ZKPaths.makePath(path, lockName);
}
synchronized void setMaxLeases(int maxLeases)
{
this.maxLeases = maxLeases;
notifyAll();
}
void makeRevocable(RevocationSpec entry)
{
revocable.set(entry);
}
final void releaseLock(String lockPath) throws Exception
{
client.removeWatchers();
revocable.set(null);
deleteOurPath(lockPath);
}
CuratorFramework getClient()
{
return client;
}
public static Collection<String> getParticipantNodes(CuratorFramework client, final String basePath, String lockName, LockInternalsSorter sorter) throws Exception
{
List<String> names = getSortedChildren(client, basePath, lockName, sorter);
Iterable<String> transformed = Iterables.transform
(
names,
new Function<String, String>()
{
@Override
public String apply(String name)
{
return ZKPaths.makePath(basePath, name);
}
}
);
return ImmutableList.copyOf(transformed);
}
public static List<String> getSortedChildren(CuratorFramework client, String basePath, final String lockName, final LockInternalsSorter sorter) throws Exception
{
List<String> children = client.getChildren().forPath(basePath);
List<String> sortedList = Lists.newArrayList(children);
Collections.sort
(
sortedList,
new Comparator<String>()
{
@Override
public int compare(String lhs, String rhs)
{
return sorter.fixForSorting(lhs, lockName).compareTo(sorter.fixForSorting(rhs, lockName));
}
}
);
return sortedList;
}
public static List<String> getSortedChildren(final String lockName, final LockInternalsSorter sorter, List<String> children)
{
List<String> sortedList = Lists.newArrayList(children);
Collections.sort
(
sortedList,
new Comparator<String>()
{
@Override
public int compare(String lhs, String rhs)
{
return sorter.fixForSorting(lhs, lockName).compareTo(sorter.fixForSorting(rhs, lockName));
}
}
);
return sortedList;
}
List<String> getSortedChildren() throws Exception
{
return getSortedChildren(client, basePath, lockName, driver);
}
String getLockName()
{
return lockName;
}
LockInternalsDriver getDriver()
{
return driver;
}
String attemptLock(long time, TimeUnit unit, byte[] lockNodeBytes) throws Exception
{
final long startMillis = System.currentTimeMillis();
final Long millisToWait = (unit != null) ? unit.toMillis(time) : null;
final byte[] localLockNodeBytes = (revocable.get() != null) ? new byte[0] : lockNodeBytes;
int retryCount = 0;
String ourPath = null;
boolean hasTheLock = false;
boolean isDone = false;
while ( !isDone )
{
isDone = true;
try
{
ourPath = driver.createsTheLock(client, path, localLockNodeBytes);
hasTheLock = internalLockLoop(startMillis, millisToWait, ourPath);
}
catch ( KeeperException.NoNodeException e )
{
// gets thrown by StandardLockInternalsDriver when it can't find the lock node
// this can happen when the session expires, etc. So, if the retry allows, just try it all again
if ( client.getZookeeperClient().getRetryPolicy().allowRetry(retryCount++, System.currentTimeMillis() - startMillis, RetryLoop.getDefaultRetrySleeper()) )
{
isDone = false;
}
else
{
throw e;
}
}
}
if ( hasTheLock )
{
return ourPath;
}
return null;
}
private void checkRevocableWatcher(String path) throws Exception
{
RevocationSpec entry = revocable.get();
if ( entry != null )
{
try
{
byte[] bytes = client.getData().usingWatcher(revocableWatcher).forPath(path);
if ( Arrays.equals(bytes, REVOKE_MESSAGE) )
{
entry.getExecutor().execute(entry.getRunnable());
}
}
catch ( KeeperException.NoNodeException ignore )
{
// ignore
}
}
}
private boolean internalLockLoop(long startMillis, Long millisToWait, String ourPath) throws Exception
{
boolean haveTheLock = false;
boolean doDelete = false;
try
{
if ( revocable.get() != null )
{
client.getData().usingWatcher(revocableWatcher).forPath(ourPath);
}
while ( (client.getState() == CuratorFrameworkState.STARTED) && !haveTheLock )
{
List<String> children = getSortedChildren();
String sequenceNodeName = ourPath.substring(basePath.length() + 1); // +1 to include the slash
PredicateResults predicateResults = driver.getsTheLock(client, children, sequenceNodeName, maxLeases);
if ( predicateResults.getsTheLock() )
{
haveTheLock = true;
}
else
{
String previousSequencePath = basePath + "/" + predicateResults.getPathToWatch();
synchronized(this)
{
try
{
// use getData() instead of exists() to avoid leaving unneeded watchers which is a type of resource leak
client.getData().usingWatcher(watcher).forPath(previousSequencePath);
if ( millisToWait != null )
{
millisToWait -= (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if ( millisToWait <= 0 )
{
doDelete = true; // timed out - delete our node
break;
}
wait(millisToWait);
}
else
{
wait();
}
}
catch ( KeeperException.NoNodeException e )
{
// it has been deleted (i.e. lock released). Try to acquire again
}
}
}
}
}
catch ( Exception e )
{
doDelete = true;
throw e;
}
finally
{
if ( doDelete )
{
deleteOurPath(ourPath);
}
}
return haveTheLock;
}
private void deleteOurPath(String ourPath) throws Exception
{
try
{
client.delete().guaranteed().forPath(ourPath);
}
catch ( KeeperException.NoNodeException e )
{
// ignore - already deleted (possibly expired session, etc.)
}
}
private synchronized void notifyFromWatcher()
{
notifyAll();
}
}