blob: 57af2127656e85eab5c46cc01343deb0d084d389 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.curator.framework.recipes.locks;
import com.google.common.base.Predicate;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import org.apache.curator.framework.CuratorFramework;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
/**
* <p>
* A re-entrant read/write mutex that works across JVMs. Uses Zookeeper to hold the lock. All processes
* in all JVMs that use the same lock path will achieve an inter-process critical section. Further, this mutex is
* "fair" - each user will get the mutex in the order requested (from ZK's point of view).
* </p>
*
* <p>
* A read write lock maintains a pair of associated locks, one for read-only operations and one
* for writing. The read lock may be held simultaneously by multiple reader processes, so long as
* there are no writers. The write lock is exclusive.
* </p>
*
* <p>
* <b>Reentrancy</b><br>
* This lock allows both readers and writers to reacquire read or write locks in the style of a
* re-entrant lock. Non-re-entrant readers are not allowed until all write locks held by the
* writing thread/process have been released. Additionally, a writer can acquire the read lock, but not
* vice-versa. If a reader tries to acquire the write lock it will never succeed.<br><br>
*
* <b>Lock downgrading</b><br>
* Re-entrancy also allows downgrading from the write lock to a read lock, by acquiring the write
* lock, then the read lock and then releasing the write lock. However, upgrading from a read
* lock to the write lock is not possible.
* </p>
*/
public class InterProcessReadWriteLock
{
private final InterProcessMutex readMutex;
private final InterProcessMutex writeMutex;
// must be the same length. LockInternals depends on it
private static final String READ_LOCK_NAME = "__READ__";
private static final String WRITE_LOCK_NAME = "__WRIT__";
private static class SortingLockInternalsDriver extends StandardLockInternalsDriver
{
@Override
public final String fixForSorting(String str, String lockName)
{
str = super.fixForSorting(str, READ_LOCK_NAME);
str = super.fixForSorting(str, WRITE_LOCK_NAME);
return str;
}
}
private static class InternalInterProcessMutex extends InterProcessMutex
{
private final String lockName;
private final byte[] lockData;
InternalInterProcessMutex(CuratorFramework client, String path, String lockName, byte[] lockData, int maxLeases, LockInternalsDriver driver)
{
super(client, path, lockName, maxLeases, driver);
this.lockName = lockName;
this.lockData = lockData;
}
@Override
public Collection<String> getParticipantNodes() throws Exception
{
Collection<String> nodes = super.getParticipantNodes();
Iterable<String> filtered = Iterables.filter
(
nodes,
new Predicate<String>()
{
@Override
public boolean apply(String node)
{
return node.contains(lockName);
}
}
);
return ImmutableList.copyOf(filtered);
}
@Override
protected byte[] getLockNodeBytes()
{
return lockData;
}
}
/**
* @param client the client
* @param basePath path to use for locking
*/
public InterProcessReadWriteLock(CuratorFramework client, String basePath)
{
this(client, basePath, null);
}
/**
* @param client the client
* @param basePath path to use for locking
* @param lockData the data to store in the lock nodes
*/
public InterProcessReadWriteLock(CuratorFramework client, String basePath, byte[] lockData)
{
lockData = (lockData == null) ? null : Arrays.copyOf(lockData, lockData.length);
writeMutex = new InternalInterProcessMutex
(
client,
basePath,
WRITE_LOCK_NAME,
lockData,
1,
new SortingLockInternalsDriver()
{
@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
{
return super.getsTheLock(client, children, sequenceNodeName, maxLeases);
}
}
);
readMutex = new InternalInterProcessMutex
(
client,
basePath,
READ_LOCK_NAME,
lockData,
Integer.MAX_VALUE,
new SortingLockInternalsDriver()
{
@Override
public PredicateResults getsTheLock(CuratorFramework client, List<String> children, String sequenceNodeName, int maxLeases) throws Exception
{
return readLockPredicate(children, sequenceNodeName);
}
}
);
}
/**
* Returns the lock used for reading.
*
* @return read lock
*/
public InterProcessMutex readLock()
{
return readMutex;
}
/**
* Returns the lock used for writing.
*
* @return write lock
*/
public InterProcessMutex writeLock()
{
return writeMutex;
}
private PredicateResults readLockPredicate(List<String> children, String sequenceNodeName) throws Exception
{
if ( writeMutex.isOwnedByCurrentThread() )
{
return new PredicateResults(null, true);
}
int index = 0;
int firstWriteIndex = Integer.MAX_VALUE;
int ourIndex = -1;
for ( String node : children )
{
if ( node.contains(WRITE_LOCK_NAME) )
{
firstWriteIndex = Math.min(index, firstWriteIndex);
}
else if ( node.startsWith(sequenceNodeName) )
{
ourIndex = index;
break;
}
++index;
}
StandardLockInternalsDriver.validateOurIndex(sequenceNodeName, ourIndex);
boolean getsTheLock = (ourIndex < firstWriteIndex);
String pathToWatch = getsTheLock ? null : children.get(firstWriteIndex);
return new PredicateResults(pathToWatch, getsTheLock);
}
}