blob: 86e9093a57ca366d25a04177a0b2fe0d53d538ec [file] [log] [blame]
/*
* Copyright The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.InterProcessLock;
import org.apache.hadoop.hbase.InterProcessLock.MetadataHandler;
import org.apache.hadoop.hbase.InterProcessReadWriteLock;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.LockTimeoutException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hbase.zookeeper.lock.ZKInterProcessReadWriteLock;
import org.apache.zookeeper.KeeperException;
/**
* A manager for distributed table level locks.
*/
@InterfaceAudience.Private
public abstract class TableLockManager {
private static final Log LOG = LogFactory.getLog(TableLockManager.class);
/** Configuration key for enabling table-level locks for schema changes */
public static final String TABLE_LOCK_ENABLE =
"hbase.table.lock.enable";
/** by default we should enable table-level locks for schema changes */
private static final boolean DEFAULT_TABLE_LOCK_ENABLE = true;
/** Configuration key for time out for trying to acquire table locks */
protected static final String TABLE_WRITE_LOCK_TIMEOUT_MS =
"hbase.table.write.lock.timeout.ms";
/** Configuration key for time out for trying to acquire table locks */
protected static final String TABLE_READ_LOCK_TIMEOUT_MS =
"hbase.table.read.lock.timeout.ms";
protected static final long DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS =
600 * 1000; //10 min default
protected static final long DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS =
600 * 1000; //10 min default
public static final String TABLE_LOCK_EXPIRE_TIMEOUT = "hbase.table.lock.expire.ms";
public static final long DEFAULT_TABLE_LOCK_EXPIRE_TIMEOUT_MS =
600 * 1000; //10 min default
/**
* A distributed lock for a table.
*/
@InterfaceAudience.Private
public interface TableLock {
/**
* Acquire the lock, with the configured lock timeout.
* @throws LockTimeoutException If unable to acquire a lock within a specified
* time period (if any)
* @throws IOException If unrecoverable error occurs
*/
void acquire() throws IOException;
/**
* Release the lock already held.
* @throws IOException If there is an unrecoverable error releasing the lock
*/
void release() throws IOException;
}
/**
* Returns a TableLock for locking the table for exclusive access
* @param tableName Table to lock
* @param purpose Human readable reason for locking the table
* @return A new TableLock object for acquiring a write lock
*/
public abstract TableLock writeLock(TableName tableName, String purpose);
/**
* Returns a TableLock for locking the table for shared access among read-lock holders
* @param tableName Table to lock
* @param purpose Human readable reason for locking the table
* @return A new TableLock object for acquiring a read lock
*/
public abstract TableLock readLock(TableName tableName, String purpose);
/**
* Visits all table locks(read and write), and lock attempts with the given callback
* MetadataHandler.
* @param handler the metadata handler to call
* @throws IOException If there is an unrecoverable error
*/
public abstract void visitAllLocks(MetadataHandler handler) throws IOException;
/**
* Force releases all table locks(read and write) that have been held longer than
* "hbase.table.lock.expire.ms". Assumption is that the clock skew between zookeeper
* and this servers is negligible.
* The behavior of the lock holders still thinking that they have the lock is undefined.
* @throws IOException If there is an unrecoverable error
*/
public abstract void reapAllExpiredLocks() throws IOException;
/**
* Force releases table write locks and lock attempts even if this thread does
* not own the lock. The behavior of the lock holders still thinking that they
* have the lock is undefined. This should be used carefully and only when
* we can ensure that all write-lock holders have died. For example if only
* the master can hold write locks, then we can reap it's locks when the backup
* master starts.
* @throws IOException If there is an unrecoverable error
*/
public abstract void reapWriteLocks() throws IOException;
/**
* Called after a table has been deleted, and after the table lock is released.
* TableLockManager should do cleanup for the table state.
* @param tableName name of the table
* @throws IOException If there is an unrecoverable error releasing the lock
*/
public abstract void tableDeleted(TableName tableName)
throws IOException;
/**
* Creates and returns a TableLockManager according to the configuration
*/
public static TableLockManager createTableLockManager(Configuration conf,
ZooKeeperWatcher zkWatcher, ServerName serverName) {
// Initialize table level lock manager for schema changes, if enabled.
if (conf.getBoolean(TABLE_LOCK_ENABLE,
DEFAULT_TABLE_LOCK_ENABLE)) {
long writeLockTimeoutMs = conf.getLong(TABLE_WRITE_LOCK_TIMEOUT_MS,
DEFAULT_TABLE_WRITE_LOCK_TIMEOUT_MS);
long readLockTimeoutMs = conf.getLong(TABLE_READ_LOCK_TIMEOUT_MS,
DEFAULT_TABLE_READ_LOCK_TIMEOUT_MS);
long lockExpireTimeoutMs = conf.getLong(TABLE_LOCK_EXPIRE_TIMEOUT,
DEFAULT_TABLE_LOCK_EXPIRE_TIMEOUT_MS);
return new ZKTableLockManager(zkWatcher, serverName, writeLockTimeoutMs, readLockTimeoutMs, lockExpireTimeoutMs);
}
return new NullTableLockManager();
}
/**
* A null implementation
*/
@InterfaceAudience.Private
public static class NullTableLockManager extends TableLockManager {
static class NullTableLock implements TableLock {
@Override
public void acquire() throws IOException {
}
@Override
public void release() throws IOException {
}
}
@Override
public TableLock writeLock(TableName tableName, String purpose) {
return new NullTableLock();
}
@Override
public TableLock readLock(TableName tableName, String purpose) {
return new NullTableLock();
}
@Override
public void reapAllExpiredLocks() throws IOException {
}
@Override
public void reapWriteLocks() throws IOException {
}
@Override
public void tableDeleted(TableName tableName) throws IOException {
}
@Override
public void visitAllLocks(MetadataHandler handler) throws IOException {
}
}
/** Public for hbck */
public static ZooKeeperProtos.TableLock fromBytes(byte[] bytes) {
int pblen = ProtobufUtil.lengthOfPBMagic();
if (bytes == null || bytes.length < pblen) {
return null;
}
try {
ZooKeeperProtos.TableLock.Builder builder = ZooKeeperProtos.TableLock.newBuilder();
ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
return builder.build();
} catch (IOException ex) {
LOG.warn("Exception in deserialization", ex);
}
return null;
}
/**
* ZooKeeper based TableLockManager
*/
@InterfaceAudience.Private
private static class ZKTableLockManager extends TableLockManager {
private static final MetadataHandler METADATA_HANDLER = new MetadataHandler() {
@Override
public void handleMetadata(byte[] ownerMetadata) {
if (!LOG.isDebugEnabled()) {
return;
}
ZooKeeperProtos.TableLock data = fromBytes(ownerMetadata);
if (data == null) {
return;
}
LOG.debug("Table is locked by " +
String.format("[tableName=%s:%s, lockOwner=%s, threadId=%s, " +
"purpose=%s, isShared=%s, createTime=%s]",
data.getTableName().getNamespace().toStringUtf8(),
data.getTableName().getQualifier().toStringUtf8(),
ProtobufUtil.toServerName(data.getLockOwner()), data.getThreadId(),
data.getPurpose(), data.getIsShared(), data.getCreateTime()));
}
};
private static class TableLockImpl implements TableLock {
long lockTimeoutMs;
TableName tableName;
InterProcessLock lock;
boolean isShared;
ZooKeeperWatcher zkWatcher;
ServerName serverName;
String purpose;
public TableLockImpl(TableName tableName, ZooKeeperWatcher zkWatcher,
ServerName serverName, long lockTimeoutMs, boolean isShared, String purpose) {
this.tableName = tableName;
this.zkWatcher = zkWatcher;
this.serverName = serverName;
this.lockTimeoutMs = lockTimeoutMs;
this.isShared = isShared;
this.purpose = purpose;
}
@Override
public void acquire() throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Attempt to acquire table " + (isShared ? "read" : "write") +
" lock on: " + tableName + " for:" + purpose);
}
lock = createTableLock();
try {
if (lockTimeoutMs == -1) {
// Wait indefinitely
lock.acquire();
} else {
if (!lock.tryAcquire(lockTimeoutMs)) {
throw new LockTimeoutException("Timed out acquiring " +
(isShared ? "read" : "write") + "lock for table:" + tableName +
"for:" + purpose + " after " + lockTimeoutMs + " ms.");
}
}
} catch (InterruptedException e) {
LOG.warn("Interrupted acquiring a lock for " + tableName, e);
Thread.currentThread().interrupt();
throw new InterruptedIOException("Interrupted acquiring a lock");
}
if (LOG.isTraceEnabled()) LOG.trace("Acquired table " + (isShared ? "read" : "write")
+ " lock on " + tableName + " for " + purpose);
}
@Override
public void release() throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Attempt to release table " + (isShared ? "read" : "write")
+ " lock on " + tableName);
}
if (lock == null) {
throw new IllegalStateException("Table " + tableName +
" is not locked!");
}
try {
lock.release();
} catch (InterruptedException e) {
LOG.warn("Interrupted while releasing a lock for " + tableName);
throw new InterruptedIOException();
}
if (LOG.isTraceEnabled()) {
LOG.trace("Released table lock on " + tableName);
}
}
private InterProcessLock createTableLock() {
String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode,
tableName.getNameAsString());
ZooKeeperProtos.TableLock data = ZooKeeperProtos.TableLock.newBuilder()
.setTableName(ProtobufUtil.toProtoTableName(tableName))
.setLockOwner(ProtobufUtil.toServerName(serverName))
.setThreadId(Thread.currentThread().getId())
.setPurpose(purpose)
.setIsShared(isShared)
.setCreateTime(EnvironmentEdgeManager.currentTime()).build();
byte[] lockMetadata = toBytes(data);
InterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(zkWatcher, tableLockZNode,
METADATA_HANDLER);
return isShared ? lock.readLock(lockMetadata) : lock.writeLock(lockMetadata);
}
}
private static byte[] toBytes(ZooKeeperProtos.TableLock data) {
return ProtobufUtil.prependPBMagic(data.toByteArray());
}
private final ServerName serverName;
private final ZooKeeperWatcher zkWatcher;
private final long writeLockTimeoutMs;
private final long readLockTimeoutMs;
private final long lockExpireTimeoutMs;
/**
* Initialize a new manager for table-level locks.
* @param zkWatcher
* @param serverName Address of the server responsible for acquiring and
* releasing the table-level locks
* @param writeLockTimeoutMs Timeout (in milliseconds) for acquiring a write lock for a
* given table, or -1 for no timeout
* @param readLockTimeoutMs Timeout (in milliseconds) for acquiring a read lock for a
* given table, or -1 for no timeout
*/
public ZKTableLockManager(ZooKeeperWatcher zkWatcher,
ServerName serverName, long writeLockTimeoutMs, long readLockTimeoutMs, long lockExpireTimeoutMs) {
this.zkWatcher = zkWatcher;
this.serverName = serverName;
this.writeLockTimeoutMs = writeLockTimeoutMs;
this.readLockTimeoutMs = readLockTimeoutMs;
this.lockExpireTimeoutMs = lockExpireTimeoutMs;
}
@Override
public TableLock writeLock(TableName tableName, String purpose) {
return new TableLockImpl(tableName, zkWatcher,
serverName, writeLockTimeoutMs, false, purpose);
}
public TableLock readLock(TableName tableName, String purpose) {
return new TableLockImpl(tableName, zkWatcher,
serverName, readLockTimeoutMs, true, purpose);
}
public void visitAllLocks(MetadataHandler handler) throws IOException {
for (String tableName : getTableNames()) {
String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName);
ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(
zkWatcher, tableLockZNode, null);
lock.readLock(null).visitLocks(handler);
lock.writeLock(null).visitLocks(handler);
}
}
private List<String> getTableNames() throws IOException {
List<String> tableNames;
try {
tableNames = ZKUtil.listChildrenNoWatch(zkWatcher, zkWatcher.tableLockZNode);
} catch (KeeperException e) {
LOG.error("Unexpected ZooKeeper error when listing children", e);
throw new IOException("Unexpected ZooKeeper exception", e);
}
return tableNames;
}
@Override
public void reapWriteLocks() throws IOException {
//get the table names
try {
for (String tableName : getTableNames()) {
String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName);
ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(
zkWatcher, tableLockZNode, null);
lock.writeLock(null).reapAllLocks();
}
} catch (IOException ex) {
throw ex;
} catch (Exception ex) {
LOG.warn("Caught exception while reaping table write locks", ex);
}
}
@Override
public void reapAllExpiredLocks() throws IOException {
//get the table names
try {
for (String tableName : getTableNames()) {
String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableName);
ZKInterProcessReadWriteLock lock = new ZKInterProcessReadWriteLock(
zkWatcher, tableLockZNode, null);
lock.readLock(null).reapExpiredLocks(lockExpireTimeoutMs);
lock.writeLock(null).reapExpiredLocks(lockExpireTimeoutMs);
}
} catch (IOException ex) {
throw ex;
} catch (Exception ex) {
throw new IOException(ex);
}
}
@Override
public void tableDeleted(TableName tableName) throws IOException {
//table write lock from DeleteHandler is already released, just delete the parent znode
String tableNameStr = tableName.getNameAsString();
String tableLockZNode = ZKUtil.joinZNode(zkWatcher.tableLockZNode, tableNameStr);
try {
ZKUtil.deleteNode(zkWatcher, tableLockZNode);
} catch (KeeperException ex) {
if (ex.code() == KeeperException.Code.NOTEMPTY) {
//we might get this in rare occasions where a CREATE table or some other table operation
//is waiting to acquire the lock. In this case, parent znode won't be deleted.
LOG.warn("Could not delete the znode for table locks because NOTEMPTY: "
+ tableLockZNode);
return;
}
throw new IOException(ex);
}
}
}
}