blob: d1f81cab7c959508338308e00597ca9d8bdd0fb6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.oss.sync;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.framework.recipes.locks.InterProcessReadWriteLock;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.utils.ZKPaths;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.oss.Constants;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Objects.requireNonNull;
/**
* Implementation based on Apache Curator and Apache ZooKeeper. This allows
* HBOSS to re-use an Apache HBase cluster's ZooKeeper ensemble for file
* system locking.
*
* Can be enabled in JUnit tests with -Pzk. If {@link Constants.ZK_CONN_STRING}
* isn't specified, an embedded ZooKeeper process will be spun up for tests.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG)
@InterfaceStability.Unstable
public class ZKTreeLockManager extends TreeLockManager {
private static final Logger LOG =
LoggerFactory.getLogger(ZKTreeLockManager.class);
private CuratorFramework curator;
private String root;
private void setRoot() {
root = "/hboss";
}
private static final String lockSubZnode = ".hboss-lock-znode";
private Map<Path,InterProcessReadWriteLock> lockCache = new HashMap<>();
public void initialize(FileSystem fs) throws IOException {
this.fs = fs;
Configuration conf = fs.getConf();
int baseSleepTimeMs = conf.getInt(Constants.ZK_BASE_SLEEP_MS, 1000);
int maxRetries = conf.getInt(Constants.ZK_MAX_RETRIES, 3);
RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
// Create a temporary connection to ensure the root is created, then create
// a new connection 'jailed' inside that root to eliminate the need for
// paths to constantly be resolved inside the root.
String zookeeperConnectionString = conf.get(Constants.ZK_CONN_STRING);
curator = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
curator.start();
waitForCuratorToConnect();
setRoot();
try {
ZKPaths.mkdirs(curator.getZookeeperClient().getZooKeeper(), root, true);
} catch (Exception e) {
throw new IOException("Unable to initialize root znodes", e);
}
curator.close();
zookeeperConnectionString += root;
curator = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
curator.start();
waitForCuratorToConnect();
}
private void waitForCuratorToConnect() {
try {
if (!requireNonNull(curator).blockUntilConnected(30, TimeUnit.SECONDS)) {
throw new RuntimeException("Failed to connect to ZooKeeper");
}
} catch (InterruptedException e) {
LOG.warn("Interrupted waiting to connect to ZooKeeper", e);
Thread.currentThread().interrupt();
return;
}
}
@Override
public void close() throws IOException {
curator.close();
}
@Override
protected void writeLock(Path p) throws IOException {
try {
LOG.debug("writeLock {} acquire", p);
get(p).writeLock().acquire();
} catch (Exception e) {
throw new IOException("Exception during write locking of path " + p, e);
}
}
@Override
protected void writeUnlock(Path p) throws IOException {
try {
LOG.debug("writeLock {} release", p);
get(p).writeLock().release();
} catch(IllegalMonitorStateException e) {
// Reentrant locks might be acquired multiple times
LOG.error("Tried to release unacquired write lock: {}", p);
throw e;
} catch (Exception e) {
throw new IOException("Exception during write unlocking of path " + p, e);
}
}
@Override
protected void readLock(Path p) throws IOException {
LOG.debug("readLock {} acquire", p);
try {
get(p).readLock().acquire();
} catch (Exception e) {
throw new IOException("Exception during read locking of path " + p, e);
}
}
@Override
protected void readUnlock(Path p) throws IOException {
LOG.debug("readLock {} release", p);
try {
get(p).readLock().release();
} catch(IllegalMonitorStateException e) {
// Reentrant locks might be acquired multiple times
LOG.error("Tried to release unacquired write lock: {}", p);
throw e;
} catch (Exception e) {
throw new IOException("Exception during read unlocking of path " + p, e);
}
}
@Override
protected boolean writeLockAbove(Path p) throws IOException {
LOG.debug("Checking for write lock above {}", p);
while (!p.isRoot()) {
p = p.getParent();
if (isLocked(get(p).writeLock())) {
LOG.debug("Parent write lock currently held: {}", p);
return true;
}
}
return false;
}
@Override
@VisibleForTesting
public boolean writeLockBelow(Path p, Depth depth) throws IOException {
int maxLevel = (depth == Depth.DIRECTORY) ? 1 : Integer.MAX_VALUE;
boolean b = writeLockBelow(p, 0, maxLevel);
return b;
}
@Override
@VisibleForTesting
public boolean readLockBelow(Path p, Depth depth) throws IOException {
int maxLevel = (depth == Depth.DIRECTORY) ? 1 : Integer.MAX_VALUE;
boolean b = readLockBelow(p, 0, maxLevel);
return b;
}
@Override
protected void recursiveDelete(Path p) throws IOException {
try {
ZKPaths.deleteChildren(curator.getZookeeperClient().getZooKeeper(),
p.toString(), !p.isRoot());
// Before this method is called, we have a guarantee that
// 1. There are no write locks above or below us
// 2. There are no read locks below us
// As such, we can just remove locks beneath us as we find them.
removeInMemoryLocks(p);
} catch (KeeperException.NoNodeException e) {
LOG.warn("Lock not found during recursive delete: {}", p);
} catch (Exception e) {
throw new IOException("Exception while deleting lock " + p, e);
}
}
private synchronized void removeInMemoryLocks(Path p) {
Iterator<Entry<Path,InterProcessReadWriteLock>> iter = lockCache.entrySet().iterator();
while (iter.hasNext()) {
Entry<Path,InterProcessReadWriteLock> entry = iter.next();
if (isBeneath(p, entry.getKey())) {
LOG.trace("Removing lock for {}", entry.getKey());
iter.remove();
}
}
}
private boolean isBeneath(Path parent, Path other) {
if (parent.equals(other)) {
return false;
}
// Is `other` fully contained in some path beneath the parent.
return 0 == other.toString().indexOf(parent.toString());
}
private boolean writeLockBelow(Path p, int level, int maxLevel) throws IOException {
try {
if (level > 0 && isLocked(get(p).writeLock())) {
return true;
}
if (level < maxLevel) {
List<String> children = curator.getChildren().forPath(p.toString());
for (String child : children) {
if (child.equals(lockSubZnode)) {
continue;
}
if (writeLockBelow(new Path(p, child), level+1, maxLevel)) {
LOG.debug("Parent write lock currently held: {}", p);
return true;
}
}
}
} catch (KeeperException.NoNodeException e) {
// Ignore, means we hit the bottom of the tree
} catch (Exception e) {
throw new IOException("Error checking parents for write lock: " + p, e);
}
return false;
}
private boolean readLockBelow(Path p, int level, int maxLevel) throws IOException {
try {
if (level > 0 && isLocked(get(p).readLock())) {
return true;
}
if (level < maxLevel) {
List<String> children = curator.getChildren().forPath(p.toString());
for (String child : children) {
if (child.equals(lockSubZnode)) {
continue;
}
if (readLockBelow(new Path(p, child), level+1, maxLevel)) {
LOG.debug("Child read lock currently held: {}", p);
return true;
}
}
}
} catch (KeeperException.NoNodeException e) {
// Ignore, means we hit the bottom of the tree
} catch (Exception e) {
throw new IOException("Error checking children for read lock: " + p, e);
}
return false;
}
/**
* Specifically, if this is lock by another thread.
*/
private boolean isLocked(InterProcessMutex lock) throws IOException {
try {
if (lock.isOwnedByCurrentThread()) {
// First check the current thread, because we allow you to get locks
// when parent or child paths are only locked by you.
return false;
}
if (lock.isAcquiredInThisProcess()) {
// We know it's not this thread, but this is less expensive
// than checking other processes.
return true;
}
// Finally, see if another process holds the lock. This is a terrible way
// to check but Curator doesn't expose another way.
if (lock.acquire(0, TimeUnit.NANOSECONDS)) {
lock.release();
return false;
}
} catch (Exception e) {
throw new IOException("Exception while testing a lock", e);
}
return true;
}
public String summarizeLocks() {
StringBuilder sb = new StringBuilder();
Map<Path,InterProcessReadWriteLock> cache = getUnmodifiableCache();
for (Entry<Path,InterProcessReadWriteLock> entry : cache.entrySet()) {
sb.append(entry.getKey()).append("=").append(describeLock(entry.getValue()));
}
return sb.toString();
}
String describeLock(InterProcessReadWriteLock lock) {
if (lock == null) {
return "null";
}
InterProcessMutex rlock = lock.readLock();
InterProcessMutex wlock = lock.writeLock();
StringBuilder sb = new StringBuilder();
sb.append("ReadLock[heldByThisThread=").append(rlock.isOwnedByCurrentThread());
sb.append(", heldInThisProcess=").append(rlock.isAcquiredInThisProcess()).append("]");
sb.append(" WriteLock[heldByThisThread=").append(wlock.isOwnedByCurrentThread());
sb.append(", heldInThisProcess=").append(wlock.isAcquiredInThisProcess()).append("]");
return sb.toString();
}
public synchronized Map<Path,InterProcessReadWriteLock> getUnmodifiableCache() {
return Collections.unmodifiableMap(lockCache);
}
private synchronized InterProcessReadWriteLock get(Path path) throws IOException {
if (!lockCache.containsKey(path)) {
String zkPath = new Path(path, lockSubZnode).toString();
try {
ZKPaths.mkdirs(curator.getZookeeperClient().getZooKeeper(), zkPath, true);
} catch (KeeperException.NodeExistsException e) {
// Ignore
} catch (Exception e) {
throw new IOException("Exception while ensuring lock parents exist: " +
path, e);
}
lockCache.put(path, new InterProcessReadWriteLock(curator, zkPath));
}
return lockCache.get(path);
}
}