blob: 60b213c6a7618a997b2b26954c782ff60be6e1e7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.oss.sync;
import java.io.IOException;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Implementation based on standard Java concurrency primitives. This makes it
* useless for anything but testing, development, and maybe a single-node HBase
* instance. It is however much faster and easier to debug, despite including
* logic very similar to what other implementations would need.
*
* Can be enabled in JUnit tests with -Plocal (although it's the default).
*/
public class LocalTreeLockManager extends TreeLockManager {
private static final Logger LOG =
LoggerFactory.getLogger(LocalTreeLockManager.class);
public void initialize(FileSystem fs) {
this.fs = fs;
}
@Override
protected void writeLock(Path p) throws IOException {
createLocksIfNeeded(p);
index.get(p).lock.writeLock().lock();
}
@Override
protected void writeUnlock(Path p) throws IOException {
try {
LockNode node = index.get(p);
// Node to unlock may already be gone after deletes
if (node != null) {
node.lock.writeLock().unlock();
}
} catch(IllegalMonitorStateException e) {
// Reentrant locks might be acquired multiple times
LOG.error("Tried to release unacquired write lock: {}", p);
throw e;
}
}
@Override
protected void readLock(Path p) throws IOException {
createLocksIfNeeded(p);
index.get(p).lock.readLock().lock();
}
@Override
protected void readUnlock(Path p) throws IOException {
try {
index.get(p).lock.readLock().unlock();
} catch(IllegalMonitorStateException e) {
// Reentrant locks might be acquired multiple times
LOG.error("Tried to release unacquired read lock: {}", p);
throw e;
}
}
@Override
protected boolean writeLockAbove(Path p) throws IOException {
createLocksIfNeeded(p);
while (!p.isRoot()) {
p = p.getParent();
LockNode currentNode = index.get(p);
if (currentNode.lock.isWriteLocked() &&
!currentNode.lock.isWriteLockedByCurrentThread()) {
LOG.warn("Parent write lock currently held: {}", p);
return true;
}
}
return false;
}
@Override
protected boolean writeLockBelow(Path p) throws IOException {
createLocksIfNeeded(p);
return writeLockBelow(p, true);
}
@Override
protected boolean readLockBelow(Path p) throws IOException {
createLocksIfNeeded(p);
return readLockBelow(p, true);
}
@Override
protected void recursiveDelete(Path p) {
if (!p.isRoot()) {
index.get(p.getParent()).children.remove(p);
}
LockNode currentNode = index.get(p);
innerRecursiveDelete(p);
currentNode.lock.writeLock().unlock();
}
private void innerRecursiveDelete(Path p) {
LockNode currentNode = index.remove(p);
Set<Path> childPaths = currentNode.children.keySet();
for (Path child : childPaths) {
innerRecursiveDelete(child);
}
}
private class LockNode {
public Path path;
public LockNode(Path path) {
this.path = path;
}
public Map<Path, LockNode> children = new HashMap<>();
public ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
}
private Map<Path, LockNode> index = new HashMap<>();
private synchronized void createLocksIfNeeded(Path p) {
if (index.containsKey(p)) {
return;
}
Path lastPath = null;
while (p != null) {
LockNode currentNode = index.get(p);
if (currentNode != null) {
if (lastPath != null) {
currentNode.children.put(lastPath, index.get(lastPath));
}
return;
}
currentNode = new LockNode(p);
if (lastPath != null) {
currentNode.children.put(lastPath, index.get(lastPath));
}
index.put(p, currentNode);
lastPath = p;
p = p.getParent();
}
}
private synchronized boolean writeLockBelow(Path p, boolean firstLevel) {
LockNode currentNode = index.get(p);
if (!firstLevel && currentNode.lock.isWriteLocked() &&
!currentNode.lock.isWriteLockedByCurrentThread()) {
LOG.warn("Child write lock current held: {}", p);
return true;
}
Set<Path> childPaths = currentNode.children.keySet();
for (Path child : childPaths) {
if (writeLockBelow(child, false)) {
return true;
}
}
return false;
}
// TODO will return true even if current thread has a read lock below...
private synchronized boolean readLockBelow(Path p, boolean firstLevel) {
LockNode currentNode = index.get(p);
if (!firstLevel && currentNode.lock.getReadLockCount() > 0) {
LOG.warn("Child read lock currently held: {}", p);
return true;
}
Set<Path> childPaths = index.get(p).children.keySet();
for (Path child : childPaths) {
if (readLockBelow(child, false)) {
return true;
}
}
return false;
}
}