blob: 456660d37577bc34414e4b674957a546acd21984 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUTKey WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.procedure;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType;
import org.apache.hadoop.hbase.procedure2.AbstractProcedureScheduler;
import org.apache.hadoop.hbase.procedure2.LockAndQueue;
import org.apache.hadoop.hbase.procedure2.LockStatus;
import org.apache.hadoop.hbase.procedure2.LockedResource;
import org.apache.hadoop.hbase.procedure2.LockedResourceType;
import org.apache.hadoop.hbase.procedure2.Procedure;
import org.apache.hadoop.hbase.util.AvlUtil.AvlIterableList;
import org.apache.hadoop.hbase.util.AvlUtil.AvlKeyComparator;
import org.apache.hadoop.hbase.util.AvlUtil.AvlTree;
import org.apache.hadoop.hbase.util.AvlUtil.AvlTreeIterator;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* ProcedureScheduler for the Master Procedures.
* This ProcedureScheduler tries to provide to the ProcedureExecutor procedures
* that can be executed without having to wait on a lock.
* Most of the master operations can be executed concurrently, if they
* are operating on different tables (e.g. two create table procedures can be performed
* at the same time) or against two different servers; say two servers that crashed at
* about the same time.
*
* <p>Each procedure should implement an Interface providing information for this queue.
* For example table related procedures should implement TableProcedureInterface.
* Each procedure will be pushed in its own queue, and based on the operation type
* we may make smarter decisions: e.g. we can abort all the operations preceding
* a delete table, or similar.
*
* <h4>Concurrency control</h4>
* Concurrent access to member variables (tableRunQueue, serverRunQueue, locking, tableMap,
* serverBuckets) is controlled by schedLock(). This mainly includes:<br>
* <ul>
* <li>
* {@link #push(Procedure, boolean, boolean)}: A push will add a Queue back to run-queue
* when:
* <ol>
* <li>Queue was empty before push (so must have been out of run-queue)</li>
* <li>Child procedure is added (which means parent procedure holds exclusive lock, and it
* must have moved Queue out of run-queue)</li>
* </ol>
* </li>
* <li>
* {@link #poll(long)}: A poll will remove a Queue from run-queue when:
* <ol>
* <li>Queue becomes empty after poll</li>
* <li>Exclusive lock is requested by polled procedure and lock is available (returns the
* procedure)</li>
* <li>Exclusive lock is requested but lock is not available (returns null)</li>
* <li>Polled procedure is child of parent holding exclusive lock and the next procedure is
* not a child</li>
* </ol>
* </li>
* <li>
* Namespace/table/region locks: Queue is added back to run-queue when lock being released is:
* <ol>
* <li>Exclusive lock</li>
* <li>Last shared lock (in case queue was removed because next procedure in queue required
* exclusive lock)</li>
* </ol>
* </li>
* </ul>
*/
@InterfaceAudience.Private
public class MasterProcedureScheduler extends AbstractProcedureScheduler {
private static final Logger LOG = LoggerFactory.getLogger(MasterProcedureScheduler.class);
private static final AvlKeyComparator<ServerQueue> SERVER_QUEUE_KEY_COMPARATOR =
(n, k) -> n.compareKey((ServerName) k);
private final static AvlKeyComparator<TableQueue> TABLE_QUEUE_KEY_COMPARATOR =
(n, k) -> n.compareKey((TableName) k);
private final static AvlKeyComparator<PeerQueue> PEER_QUEUE_KEY_COMPARATOR =
(n, k) -> n.compareKey((String) k);
private final static AvlKeyComparator<MetaQueue> META_QUEUE_KEY_COMPARATOR =
(n, k) -> n.compareKey((TableName) k);
private final FairQueue<ServerName> serverRunQueue = new FairQueue<>();
private final FairQueue<TableName> tableRunQueue = new FairQueue<>();
private final FairQueue<String> peerRunQueue = new FairQueue<>();
private final FairQueue<TableName> metaRunQueue = new FairQueue<>();
private final ServerQueue[] serverBuckets = new ServerQueue[128];
private TableQueue tableMap = null;
private PeerQueue peerMap = null;
private MetaQueue metaMap = null;
private final SchemaLocking locking;
public MasterProcedureScheduler(Function<Long, Procedure<?>> procedureRetriever) {
locking = new SchemaLocking(procedureRetriever);
}
@Override
public void yield(final Procedure proc) {
push(proc, false, true);
}
@Override
protected void enqueue(final Procedure proc, final boolean addFront) {
if (isMetaProcedure(proc)) {
doAdd(metaRunQueue, getMetaQueue(), proc, addFront);
} else if (isTableProcedure(proc)) {
doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront);
} else if (isServerProcedure(proc)) {
ServerProcedureInterface spi = (ServerProcedureInterface) proc;
doAdd(serverRunQueue, getServerQueue(spi.getServerName(), spi), proc, addFront);
} else if (isPeerProcedure(proc)) {
doAdd(peerRunQueue, getPeerQueue(getPeerId(proc)), proc, addFront);
} else {
// TODO: at the moment we only have Table and Server procedures
// if you are implementing a non-table/non-server procedure, you have two options: create
// a group for all the non-table/non-server procedures or try to find a key for your
// non-table/non-server procedures and implement something similar to the TableRunQueue.
throw new UnsupportedOperationException(
"RQs for non-table/non-server procedures are not implemented yet: " + proc);
}
}
private <T extends Comparable<T>> void doAdd(FairQueue<T> fairq, Queue<T> queue,
Procedure<?> proc, boolean addFront) {
queue.add(proc, addFront);
// For the following conditions, we will put the queue back into execution
// 1. The procedure has already held the lock, or the lock has been restored when restarting,
// which means it can be executed immediately.
// 2. The exclusive lock for this queue has not been held.
// 3. The given procedure has the exclusive lock permission for this queue.
Supplier<String> reason = null;
if (proc.hasLock()) {
reason = () -> proc + " has lock";
} else if (proc.isLockedWhenLoading()) {
reason = () -> proc + " restores lock when restarting";
} else if (!queue.getLockStatus().hasExclusiveLock()) {
reason = () -> "the exclusive lock is not held by anyone when adding " + proc;
} else if (queue.getLockStatus().hasLockAccess(proc)) {
reason = () -> proc + " has the excusive lock access";
}
if (reason != null) {
addToRunQueue(fairq, queue, reason);
}
}
@Override
protected boolean queueHasRunnables() {
return metaRunQueue.hasRunnables() || tableRunQueue.hasRunnables() ||
serverRunQueue.hasRunnables() || peerRunQueue.hasRunnables();
}
@Override
protected Procedure dequeue() {
// meta procedure is always the first priority
Procedure<?> pollResult = doPoll(metaRunQueue);
// For now, let server handling have precedence over table handling; presumption is that it
// is more important handling crashed servers than it is running the
// enabling/disabling tables, etc.
if (pollResult == null) {
pollResult = doPoll(serverRunQueue);
}
if (pollResult == null) {
pollResult = doPoll(peerRunQueue);
}
if (pollResult == null) {
pollResult = doPoll(tableRunQueue);
}
return pollResult;
}
private <T extends Comparable<T>> boolean isLockReady(Procedure<?> proc, Queue<T> rq) {
LockStatus s = rq.getLockStatus();
// if we have the lock access, we are ready
if (s.hasLockAccess(proc)) {
return true;
}
boolean xlockReq = rq.requireExclusiveLock(proc);
// if we need to hold the xlock, then we need to make sure that no one holds any lock, including
// the shared lock, otherwise, we just need to make sure that no one holds the xlock
return xlockReq ? !s.isLocked() : !s.hasExclusiveLock();
}
private <T extends Comparable<T>> Procedure<?> doPoll(final FairQueue<T> fairq) {
Queue<T> rq = fairq.poll();
if (rq == null || !rq.isAvailable()) {
return null;
}
// loop until we find out a procedure which is ready to run, or if we have checked all the
// procedures, then we give up and remove the queue from run queue.
for (int i = 0, n = rq.size(); i < n; i++) {
Procedure<?> proc = rq.poll();
if (isLockReady(proc, rq)) {
// the queue is empty, remove from run queue
if (rq.isEmpty()) {
removeFromRunQueue(fairq, rq, () -> "queue is empty after polling out " + proc);
}
return proc;
}
// we are not ready to run, add back and try the next procedure
rq.add(proc, false);
}
// no procedure is ready for execution, remove from run queue
removeFromRunQueue(fairq, rq, () -> "no procedure can be executed");
return null;
}
@Override
public List<LockedResource> getLocks() {
schedLock();
try {
return locking.getLocks();
} finally {
schedUnlock();
}
}
@Override
public LockedResource getLockResource(LockedResourceType resourceType, String resourceName) {
schedLock();
try {
return locking.getLockResource(resourceType, resourceName);
} finally {
schedUnlock();
}
}
@Override
public void clear() {
schedLock();
try {
clearQueue();
locking.clear();
} finally {
schedUnlock();
}
}
private void clearQueue() {
// Remove Servers
for (int i = 0; i < serverBuckets.length; ++i) {
clear(serverBuckets[i], serverRunQueue, SERVER_QUEUE_KEY_COMPARATOR);
serverBuckets[i] = null;
}
// Remove Tables
clear(tableMap, tableRunQueue, TABLE_QUEUE_KEY_COMPARATOR);
tableMap = null;
// Remove Peers
clear(peerMap, peerRunQueue, PEER_QUEUE_KEY_COMPARATOR);
peerMap = null;
assert size() == 0 : "expected queue size to be 0, got " + size();
}
private <T extends Comparable<T>, TNode extends Queue<T>> void clear(TNode treeMap,
FairQueue<T> fairq, AvlKeyComparator<TNode> comparator) {
while (treeMap != null) {
Queue<T> node = AvlTree.getFirst(treeMap);
treeMap = AvlTree.remove(treeMap, node.getKey(), comparator);
if (fairq != null) {
removeFromRunQueue(fairq, node, () -> "clear all queues");
}
}
}
private int queueSize(Queue<?> head) {
int count = 0;
AvlTreeIterator<Queue<?>> iter = new AvlTreeIterator<Queue<?>>(head);
while (iter.hasNext()) {
count += iter.next().size();
}
return count;
}
@Override
protected int queueSize() {
int count = 0;
for (ServerQueue serverMap : serverBuckets) {
count += queueSize(serverMap);
}
count += queueSize(tableMap);
count += queueSize(peerMap);
count += queueSize(metaMap);
return count;
}
@Override
public void completionCleanup(final Procedure proc) {
if (proc instanceof TableProcedureInterface) {
TableProcedureInterface iProcTable = (TableProcedureInterface) proc;
boolean tableDeleted;
if (proc.hasException()) {
Exception procEx = proc.getException().unwrapRemoteException();
if (iProcTable.getTableOperationType() == TableOperationType.CREATE) {
// create failed because the table already exist
tableDeleted = !(procEx instanceof TableExistsException);
} else {
// the operation failed because the table does not exist
tableDeleted = (procEx instanceof TableNotFoundException);
}
} else {
// the table was deleted
tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE);
}
if (tableDeleted) {
markTableAsDeleted(iProcTable.getTableName(), proc);
return;
}
} else if (proc instanceof PeerProcedureInterface) {
tryCleanupPeerQueue(getPeerId(proc), proc);
} else if (proc instanceof ServerProcedureInterface) {
tryCleanupServerQueue(getServerName(proc), proc);
} else {
// No cleanup for other procedure types, yet.
return;
}
}
private static <T extends Comparable<T>> void addToRunQueue(FairQueue<T> fairq, Queue<T> queue,
Supplier<String> reason) {
if (LOG.isTraceEnabled()) {
LOG.trace("Add {} to run queue because: {}", queue, reason.get());
}
if (!AvlIterableList.isLinked(queue) && !queue.isEmpty()) {
fairq.add(queue);
}
}
private static <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fairq,
Queue<T> queue, Supplier<String> reason) {
if (LOG.isTraceEnabled()) {
LOG.trace("Remove {} from run queue because: {}", queue, reason.get());
}
if (AvlIterableList.isLinked(queue)) {
fairq.remove(queue);
}
}
// ============================================================================
// Table Queue Lookup Helpers
// ============================================================================
private TableQueue getTableQueue(TableName tableName) {
TableQueue node = AvlTree.get(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
if (node != null) return node;
node = new TableQueue(tableName, MasterProcedureUtil.getTablePriority(tableName),
locking.getTableLock(tableName), locking.getNamespaceLock(tableName.getNamespaceAsString()));
tableMap = AvlTree.insert(tableMap, node);
return node;
}
private void removeTableQueue(TableName tableName) {
tableMap = AvlTree.remove(tableMap, tableName, TABLE_QUEUE_KEY_COMPARATOR);
locking.removeTableLock(tableName);
}
private static boolean isTableProcedure(Procedure<?> proc) {
return proc instanceof TableProcedureInterface;
}
private static TableName getTableName(Procedure<?> proc) {
return ((TableProcedureInterface)proc).getTableName();
}
// ============================================================================
// Server Queue Lookup Helpers
// ============================================================================
private ServerQueue getServerQueue(ServerName serverName, ServerProcedureInterface proc) {
final int index = getBucketIndex(serverBuckets, serverName.hashCode());
ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
if (node != null) {
return node;
}
int priority;
if (proc != null) {
priority = MasterProcedureUtil.getServerPriority(proc);
} else {
priority = 1;
}
node = new ServerQueue(serverName, priority, locking.getServerLock(serverName));
serverBuckets[index] = AvlTree.insert(serverBuckets[index], node);
return node;
}
private void removeServerQueue(ServerName serverName) {
int index = getBucketIndex(serverBuckets, serverName.hashCode());
serverBuckets[index] =
AvlTree.remove(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
locking.removeServerLock(serverName);
}
private void tryCleanupServerQueue(ServerName serverName, Procedure<?> proc) {
schedLock();
try {
int index = getBucketIndex(serverBuckets, serverName.hashCode());
ServerQueue node = AvlTree.get(serverBuckets[index], serverName, SERVER_QUEUE_KEY_COMPARATOR);
if (node == null) {
return;
}
LockAndQueue lock = locking.getServerLock(serverName);
if (node.isEmpty() && lock.tryExclusiveLock(proc)) {
removeFromRunQueue(serverRunQueue, node,
() -> "clean up server queue after " + proc + " completed");
removeServerQueue(serverName);
}
} finally {
schedUnlock();
}
}
private static int getBucketIndex(Object[] buckets, int hashCode) {
return Math.abs(hashCode) % buckets.length;
}
private static boolean isServerProcedure(Procedure<?> proc) {
return proc instanceof ServerProcedureInterface;
}
private static ServerName getServerName(Procedure<?> proc) {
return ((ServerProcedureInterface)proc).getServerName();
}
// ============================================================================
// Peer Queue Lookup Helpers
// ============================================================================
private PeerQueue getPeerQueue(String peerId) {
PeerQueue node = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
if (node != null) {
return node;
}
node = new PeerQueue(peerId, locking.getPeerLock(peerId));
peerMap = AvlTree.insert(peerMap, node);
return node;
}
private void removePeerQueue(String peerId) {
peerMap = AvlTree.remove(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
locking.removePeerLock(peerId);
}
private void tryCleanupPeerQueue(String peerId, Procedure<?> procedure) {
schedLock();
try {
PeerQueue queue = AvlTree.get(peerMap, peerId, PEER_QUEUE_KEY_COMPARATOR);
if (queue == null) {
return;
}
final LockAndQueue lock = locking.getPeerLock(peerId);
if (queue.isEmpty() && lock.tryExclusiveLock(procedure)) {
removeFromRunQueue(peerRunQueue, queue,
() -> "clean up peer queue after " + procedure + " completed");
removePeerQueue(peerId);
}
} finally {
schedUnlock();
}
}
private static boolean isPeerProcedure(Procedure<?> proc) {
return proc instanceof PeerProcedureInterface;
}
private static String getPeerId(Procedure<?> proc) {
return ((PeerProcedureInterface) proc).getPeerId();
}
// ============================================================================
// Meta Queue Lookup Helpers
// ============================================================================
private MetaQueue getMetaQueue() {
MetaQueue node = AvlTree.get(metaMap, TableName.META_TABLE_NAME, META_QUEUE_KEY_COMPARATOR);
if (node != null) {
return node;
}
node = new MetaQueue(locking.getMetaLock());
metaMap = AvlTree.insert(metaMap, node);
return node;
}
private static boolean isMetaProcedure(Procedure<?> proc) {
return proc instanceof MetaProcedureInterface;
}
// ============================================================================
// Table Locking Helpers
// ============================================================================
/**
* Get lock info for a resource of specified type and name and log details
*/
private void logLockedResource(LockedResourceType resourceType, String resourceName) {
if (!LOG.isDebugEnabled()) {
return;
}
LockedResource lockedResource = getLockResource(resourceType, resourceName);
if (lockedResource != null) {
String msg = resourceType.toString() + " '" + resourceName + "', shared lock count=" +
lockedResource.getSharedLockCount();
Procedure<?> proc = lockedResource.getExclusiveLockOwnerProcedure();
if (proc != null) {
msg += ", exclusively locked by procId=" + proc.getProcId();
}
LOG.debug(msg);
}
}
/**
* Suspend the procedure if the specified table is already locked.
* Other operations in the table-queue will be executed after the lock is released.
* @param procedure the procedure trying to acquire the lock
* @param table Table to lock
* @return true if the procedure has to wait for the table to be available
*/
public boolean waitTableExclusiveLock(final Procedure<?> procedure, final TableName table) {
schedLock();
try {
final String namespace = table.getNamespaceAsString();
final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
final LockAndQueue tableLock = locking.getTableLock(table);
if (!namespaceLock.trySharedLock(procedure)) {
waitProcedure(namespaceLock, procedure);
logLockedResource(LockedResourceType.NAMESPACE, namespace);
return true;
}
if (!tableLock.tryExclusiveLock(procedure)) {
namespaceLock.releaseSharedLock();
waitProcedure(tableLock, procedure);
logLockedResource(LockedResourceType.TABLE, table.getNameAsString());
return true;
}
removeFromRunQueue(tableRunQueue, getTableQueue(table),
() -> procedure + " held the exclusive lock");
return false;
} finally {
schedUnlock();
}
}
/**
* Wake the procedures waiting for the specified table
* @param procedure the procedure releasing the lock
* @param table the name of the table that has the exclusive lock
*/
public void wakeTableExclusiveLock(final Procedure<?> procedure, final TableName table) {
schedLock();
try {
final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
final LockAndQueue tableLock = locking.getTableLock(table);
int waitingCount = 0;
if (tableLock.releaseExclusiveLock(procedure)) {
waitingCount += wakeWaitingProcedures(tableLock);
}
if (namespaceLock.releaseSharedLock()) {
waitingCount += wakeWaitingProcedures(namespaceLock);
}
addToRunQueue(tableRunQueue, getTableQueue(table),
() -> procedure + " released the exclusive lock");
wakePollIfNeeded(waitingCount);
} finally {
schedUnlock();
}
}
/**
* Suspend the procedure if the specified table is already locked.
* other "read" operations in the table-queue may be executed concurrently,
* @param procedure the procedure trying to acquire the lock
* @param table Table to lock
* @return true if the procedure has to wait for the table to be available
*/
public boolean waitTableSharedLock(final Procedure<?> procedure, final TableName table) {
return waitTableQueueSharedLock(procedure, table) == null;
}
private TableQueue waitTableQueueSharedLock(final Procedure<?> procedure, final TableName table) {
schedLock();
try {
final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
final LockAndQueue tableLock = locking.getTableLock(table);
if (!namespaceLock.trySharedLock(procedure)) {
waitProcedure(namespaceLock, procedure);
return null;
}
if (!tableLock.trySharedLock(procedure)) {
namespaceLock.releaseSharedLock();
waitProcedure(tableLock, procedure);
return null;
}
return getTableQueue(table);
} finally {
schedUnlock();
}
}
/**
* Wake the procedures waiting for the specified table
* @param procedure the procedure releasing the lock
* @param table the name of the table that has the shared lock
*/
public void wakeTableSharedLock(final Procedure<?> procedure, final TableName table) {
schedLock();
try {
final LockAndQueue namespaceLock = locking.getNamespaceLock(table.getNamespaceAsString());
final LockAndQueue tableLock = locking.getTableLock(table);
int waitingCount = 0;
if (tableLock.releaseSharedLock()) {
addToRunQueue(tableRunQueue, getTableQueue(table),
() -> procedure + " released the shared lock");
waitingCount += wakeWaitingProcedures(tableLock);
}
if (namespaceLock.releaseSharedLock()) {
waitingCount += wakeWaitingProcedures(namespaceLock);
}
wakePollIfNeeded(waitingCount);
} finally {
schedUnlock();
}
}
/**
* Tries to remove the queue and the table-lock of the specified table.
* If there are new operations pending (e.g. a new create),
* the remove will not be performed.
* @param table the name of the table that should be marked as deleted
* @param procedure the procedure that is removing the table
* @return true if deletion succeeded, false otherwise meaning that there are
* other new operations pending for that table (e.g. a new create).
*/
boolean markTableAsDeleted(final TableName table, final Procedure<?> procedure) {
schedLock();
try {
final TableQueue queue = getTableQueue(table);
final LockAndQueue tableLock = locking.getTableLock(table);
if (queue == null) return true;
if (queue.isEmpty() && tableLock.tryExclusiveLock(procedure)) {
// remove the table from the run-queue and the map
if (AvlIterableList.isLinked(queue)) {
tableRunQueue.remove(queue);
}
removeTableQueue(table);
} else {
// TODO: If there are no create, we can drop all the other ops
return false;
}
} finally {
schedUnlock();
}
return true;
}
// ============================================================================
// Region Locking Helpers
// ============================================================================
/**
* Suspend the procedure if the specified region is already locked.
* @param procedure the procedure trying to acquire the lock on the region
* @param regionInfo the region we are trying to lock
* @return true if the procedure has to wait for the regions to be available
*/
public boolean waitRegion(final Procedure<?> procedure, final RegionInfo regionInfo) {
return waitRegions(procedure, regionInfo.getTable(), regionInfo);
}
/**
* Suspend the procedure if the specified set of regions are already locked.
* @param procedure the procedure trying to acquire the lock on the regions
* @param table the table name of the regions we are trying to lock
* @param regionInfos the list of regions we are trying to lock
* @return true if the procedure has to wait for the regions to be available
*/
public boolean waitRegions(final Procedure<?> procedure, final TableName table,
final RegionInfo... regionInfos) {
Arrays.sort(regionInfos, RegionInfo.COMPARATOR);
schedLock();
try {
assert table != null;
if (waitTableSharedLock(procedure, table)) {
return true;
}
// acquire region xlocks or wait
boolean hasLock = true;
final LockAndQueue[] regionLocks = new LockAndQueue[regionInfos.length];
for (int i = 0; i < regionInfos.length; ++i) {
assert regionInfos[i] != null;
assert regionInfos[i].getTable() != null;
assert regionInfos[i].getTable().equals(table): regionInfos[i] + " " + procedure;
assert i == 0 || regionInfos[i] != regionInfos[i - 1] : "duplicate region: " + regionInfos[i];
regionLocks[i] = locking.getRegionLock(regionInfos[i].getEncodedName());
if (!regionLocks[i].tryExclusiveLock(procedure)) {
LOG.info("Waiting on xlock for {} held by pid={}", procedure,
regionLocks[i].getExclusiveLockProcIdOwner());
waitProcedure(regionLocks[i], procedure);
hasLock = false;
while (i-- > 0) {
regionLocks[i].releaseExclusiveLock(procedure);
}
break;
} else {
LOG.info("Took xlock for {}", procedure);
}
}
if (!hasLock) {
wakeTableSharedLock(procedure, table);
}
return !hasLock;
} finally {
schedUnlock();
}
}
/**
* Wake the procedures waiting for the specified region
* @param procedure the procedure that was holding the region
* @param regionInfo the region the procedure was holding
*/
public void wakeRegion(final Procedure<?> procedure, final RegionInfo regionInfo) {
wakeRegions(procedure, regionInfo.getTable(), regionInfo);
}
/**
* Wake the procedures waiting for the specified regions
* @param procedure the procedure that was holding the regions
* @param regionInfos the list of regions the procedure was holding
*/
public void wakeRegions(final Procedure<?> procedure,final TableName table,
final RegionInfo... regionInfos) {
Arrays.sort(regionInfos, RegionInfo.COMPARATOR);
schedLock();
try {
int numProcs = 0;
final Procedure<?>[] nextProcs = new Procedure[regionInfos.length];
for (int i = 0; i < regionInfos.length; ++i) {
assert regionInfos[i].getTable().equals(table);
assert i == 0 || regionInfos[i] != regionInfos[i - 1] : "duplicate region: " + regionInfos[i];
LockAndQueue regionLock = locking.getRegionLock(regionInfos[i].getEncodedName());
if (regionLock.releaseExclusiveLock(procedure)) {
if (!regionLock.isWaitingQueueEmpty()) {
// release one procedure at the time since regions has an xlock
nextProcs[numProcs++] = regionLock.removeFirst();
} else {
locking.removeRegionLock(regionInfos[i].getEncodedName());
}
}
}
// awake procedures if any
for (int i = numProcs - 1; i >= 0; --i) {
wakeProcedure(nextProcs[i]);
}
wakePollIfNeeded(numProcs);
// release the table shared-lock.
wakeTableSharedLock(procedure, table);
} finally {
schedUnlock();
}
}
// ============================================================================
// Namespace Locking Helpers
// ============================================================================
/**
* Suspend the procedure if the specified namespace is already locked.
* @see #wakeNamespaceExclusiveLock(Procedure,String)
* @param procedure the procedure trying to acquire the lock
* @param namespace Namespace to lock
* @return true if the procedure has to wait for the namespace to be available
*/
public boolean waitNamespaceExclusiveLock(Procedure<?> procedure, String namespace) {
schedLock();
try {
final LockAndQueue systemNamespaceTableLock =
locking.getTableLock(TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME);
if (!systemNamespaceTableLock.trySharedLock(procedure)) {
waitProcedure(systemNamespaceTableLock, procedure);
logLockedResource(LockedResourceType.TABLE,
TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME.getNameAsString());
return true;
}
final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
if (!namespaceLock.tryExclusiveLock(procedure)) {
systemNamespaceTableLock.releaseSharedLock();
waitProcedure(namespaceLock, procedure);
logLockedResource(LockedResourceType.NAMESPACE, namespace);
return true;
}
return false;
} finally {
schedUnlock();
}
}
/**
* Wake the procedures waiting for the specified namespace
* @see #waitNamespaceExclusiveLock(Procedure,String)
* @param procedure the procedure releasing the lock
* @param namespace the namespace that has the exclusive lock
*/
public void wakeNamespaceExclusiveLock(final Procedure<?> procedure, final String namespace) {
schedLock();
try {
final LockAndQueue namespaceLock = locking.getNamespaceLock(namespace);
final LockAndQueue systemNamespaceTableLock =
locking.getTableLock(TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME);
int waitingCount = 0;
if (namespaceLock.releaseExclusiveLock(procedure)) {
waitingCount += wakeWaitingProcedures(namespaceLock);
}
if (systemNamespaceTableLock.releaseSharedLock()) {
addToRunQueue(tableRunQueue,
getTableQueue(TableProcedureInterface.DUMMY_NAMESPACE_TABLE_NAME),
() -> procedure + " released namespace exclusive lock");
waitingCount += wakeWaitingProcedures(systemNamespaceTableLock);
}
wakePollIfNeeded(waitingCount);
} finally {
schedUnlock();
}
}
// ============================================================================
// Server Locking Helpers
// ============================================================================
/**
* Try to acquire the exclusive lock on the specified server.
* @see #wakeServerExclusiveLock(Procedure,ServerName)
* @param procedure the procedure trying to acquire the lock
* @param serverName Server to lock
* @return true if the procedure has to wait for the server to be available
*/
public boolean waitServerExclusiveLock(final Procedure<?> procedure,
final ServerName serverName) {
schedLock();
try {
final LockAndQueue lock = locking.getServerLock(serverName);
if (lock.tryExclusiveLock(procedure)) {
// In tests we may pass procedures other than ServerProcedureInterface, just pass null if
// so.
removeFromRunQueue(serverRunQueue,
getServerQueue(serverName,
procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure
: null),
() -> procedure + " held exclusive lock");
return false;
}
waitProcedure(lock, procedure);
logLockedResource(LockedResourceType.SERVER, serverName.getServerName());
return true;
} finally {
schedUnlock();
}
}
/**
* Wake the procedures waiting for the specified server
* @see #waitServerExclusiveLock(Procedure,ServerName)
* @param procedure the procedure releasing the lock
* @param serverName the server that has the exclusive lock
*/
public void wakeServerExclusiveLock(final Procedure<?> procedure, final ServerName serverName) {
schedLock();
try {
final LockAndQueue lock = locking.getServerLock(serverName);
// Only SCP will acquire/release server lock so do not need to check the return value here.
lock.releaseExclusiveLock(procedure);
// In tests we may pass procedures other than ServerProcedureInterface, just pass null if
// so.
addToRunQueue(serverRunQueue,
getServerQueue(serverName,
procedure instanceof ServerProcedureInterface ? (ServerProcedureInterface) procedure
: null), () -> procedure + " released exclusive lock");
int waitingCount = wakeWaitingProcedures(lock);
wakePollIfNeeded(waitingCount);
} finally {
schedUnlock();
}
}
// ============================================================================
// Peer Locking Helpers
// ============================================================================
/**
* Try to acquire the exclusive lock on the specified peer.
* @see #wakePeerExclusiveLock(Procedure, String)
* @param procedure the procedure trying to acquire the lock
* @param peerId peer to lock
* @return true if the procedure has to wait for the peer to be available
*/
public boolean waitPeerExclusiveLock(Procedure<?> procedure, String peerId) {
schedLock();
try {
final LockAndQueue lock = locking.getPeerLock(peerId);
if (lock.tryExclusiveLock(procedure)) {
removeFromRunQueue(peerRunQueue, getPeerQueue(peerId),
() -> procedure + " held exclusive lock");
return false;
}
waitProcedure(lock, procedure);
logLockedResource(LockedResourceType.PEER, peerId);
return true;
} finally {
schedUnlock();
}
}
/**
* Wake the procedures waiting for the specified peer
* @see #waitPeerExclusiveLock(Procedure, String)
* @param procedure the procedure releasing the lock
* @param peerId the peer that has the exclusive lock
*/
public void wakePeerExclusiveLock(Procedure<?> procedure, String peerId) {
schedLock();
try {
final LockAndQueue lock = locking.getPeerLock(peerId);
if (lock.releaseExclusiveLock(procedure)) {
addToRunQueue(peerRunQueue, getPeerQueue(peerId),
() -> procedure + " released exclusive lock");
int waitingCount = wakeWaitingProcedures(lock);
wakePollIfNeeded(waitingCount);
}
} finally {
schedUnlock();
}
}
// ============================================================================
// Meta Locking Helpers
// ============================================================================
/**
* Try to acquire the exclusive lock on meta.
* @see #wakeMetaExclusiveLock(Procedure)
* @param procedure the procedure trying to acquire the lock
* @return true if the procedure has to wait for meta to be available
* @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
* {@link RecoverMetaProcedure}.
*/
@Deprecated
public boolean waitMetaExclusiveLock(Procedure<?> procedure) {
schedLock();
try {
final LockAndQueue lock = locking.getMetaLock();
if (lock.tryExclusiveLock(procedure)) {
removeFromRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " held exclusive lock");
return false;
}
waitProcedure(lock, procedure);
logLockedResource(LockedResourceType.META, TableName.META_TABLE_NAME.getNameAsString());
return true;
} finally {
schedUnlock();
}
}
/**
* Wake the procedures waiting for meta.
* @see #waitMetaExclusiveLock(Procedure)
* @param procedure the procedure releasing the lock
* @deprecated only used for {@link RecoverMetaProcedure}. Should be removed along with
* {@link RecoverMetaProcedure}.
*/
@Deprecated
public void wakeMetaExclusiveLock(Procedure<?> procedure) {
schedLock();
try {
final LockAndQueue lock = locking.getMetaLock();
lock.releaseExclusiveLock(procedure);
addToRunQueue(metaRunQueue, getMetaQueue(), () -> procedure + " released exclusive lock");
int waitingCount = wakeWaitingProcedures(lock);
wakePollIfNeeded(waitingCount);
} finally {
schedUnlock();
}
}
/**
* For debugging. Expensive.
*/
public String dumpLocks() throws IOException {
schedLock();
try {
// TODO: Refactor so we stream out locks for case when millions; i.e. take a PrintWriter
return this.locking.toString();
} finally {
schedUnlock();
}
}
}