| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUTKey WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.hbase.master.procedure; |
| |
| import java.io.IOException; |
| import java.util.ArrayDeque; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.concurrent.locks.Condition; |
| import java.util.concurrent.locks.ReentrantLock; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.hbase.HRegionInfo; |
| import org.apache.hadoop.hbase.ServerName; |
| import org.apache.hadoop.hbase.TableExistsException; |
| import org.apache.hadoop.hbase.TableName; |
| import org.apache.hadoop.hbase.TableNotFoundException; |
| import org.apache.hadoop.hbase.classification.InterfaceAudience; |
| import org.apache.hadoop.hbase.classification.InterfaceStability; |
| import org.apache.hadoop.hbase.master.TableLockManager; |
| import org.apache.hadoop.hbase.master.TableLockManager.TableLock; |
| import org.apache.hadoop.hbase.master.procedure.TableProcedureInterface.TableOperationType; |
| import org.apache.hadoop.hbase.procedure2.Procedure; |
| import org.apache.hadoop.hbase.procedure2.ProcedureRunnableSet; |
| |
| /** |
| * ProcedureRunnableSet for the Master Procedures. |
| * This RunnableSet tries to provide to the ProcedureExecutor procedures |
| * that can be executed without having to wait on a lock. |
| * Most of the master operations can be executed concurrently, if they |
| * are operating on different tables (e.g. two create table can be performed |
| * at the same, time assuming table A and table B) or against two different servers; say |
| * two servers that crashed at about the same time. |
| * |
| * <p>Each procedure should implement an interface providing information for this queue. |
| * for example table related procedures should implement TableProcedureInterface. |
| * each procedure will be pushed in its own queue, and based on the operation type |
| * we may take smarter decision. e.g. we can abort all the operations preceding |
| * a delete table, or similar. |
| */ |
| @InterfaceAudience.Private |
| @InterfaceStability.Evolving |
| public class MasterProcedureScheduler implements ProcedureRunnableSet { |
| private static final Log LOG = LogFactory.getLog(MasterProcedureScheduler.class); |
| |
| private final TableLockManager lockManager; |
| private final ReentrantLock schedLock = new ReentrantLock(); |
| private final Condition schedWaitCond = schedLock.newCondition(); |
| |
| private final FairQueue<ServerName> serverRunQueue = new FairQueue<ServerName>(); |
| private final FairQueue<TableName> tableRunQueue = new FairQueue<TableName>(); |
| private int queueSize = 0; |
| |
| private final Object[] serverBuckets = new Object[128]; |
| private Queue<String> namespaceMap = null; |
| private Queue<TableName> tableMap = null; |
| |
| private final int metaTablePriority; |
| private final int userTablePriority; |
| private final int sysTablePriority; |
| |
| // TODO: metrics |
| private long pollCalls = 0; |
| private long nullPollCalls = 0; |
| |
| public MasterProcedureScheduler(final Configuration conf, final TableLockManager lockManager) { |
| this.lockManager = lockManager; |
| |
| // TODO: should this be part of the HTD? |
| metaTablePriority = conf.getInt("hbase.master.procedure.queue.meta.table.priority", 3); |
| sysTablePriority = conf.getInt("hbase.master.procedure.queue.system.table.priority", 2); |
| userTablePriority = conf.getInt("hbase.master.procedure.queue.user.table.priority", 1); |
| } |
| |
| @Override |
| public void addFront(Procedure proc) { |
| doAdd(proc, true); |
| } |
| |
| @Override |
| public void addBack(Procedure proc) { |
| doAdd(proc, false); |
| } |
| |
| @Override |
| public void yield(final Procedure proc) { |
| doAdd(proc, isTableProcedure(proc)); |
| } |
| |
| private void doAdd(final Procedure proc, final boolean addFront) { |
| doAdd(proc, addFront, true); |
| } |
| |
| private void doAdd(final Procedure proc, final boolean addFront, final boolean notify) { |
| schedLock.lock(); |
| try { |
| if (isTableProcedure(proc)) { |
| doAdd(tableRunQueue, getTableQueue(getTableName(proc)), proc, addFront); |
| } else if (isServerProcedure(proc)) { |
| doAdd(serverRunQueue, getServerQueue(getServerName(proc)), proc, addFront); |
| } else { |
| // TODO: at the moment we only have Table and Server procedures |
| // if you are implementing a non-table/non-server procedure, you have two options: create |
| // a group for all the non-table/non-server procedures or try to find a key for your |
| // non-table/non-server procedures and implement something similar to the TableRunQueue. |
| throw new UnsupportedOperationException( |
| "RQs for non-table/non-server procedures are not implemented yet"); |
| } |
| if (notify) { |
| schedWaitCond.signal(); |
| } |
| } finally { |
| schedLock.unlock(); |
| } |
| } |
| |
| private <T extends Comparable<T>> void doAdd(final FairQueue<T> fairq, |
| final Queue<T> queue, final Procedure proc, final boolean addFront) { |
| if (proc.isSuspended()) return; |
| |
| queue.add(proc, addFront); |
| |
| if (!(queue.isSuspended() || queue.hasExclusiveLock())) { |
| // the queue is not suspended or removed from the fairq (run-queue) |
| // because someone has an xlock on it. |
| // so, if the queue is not-linked we should add it |
| if (queue.size() == 1 && !IterableList.isLinked(queue)) { |
| fairq.add(queue); |
| } |
| queueSize++; |
| } else if (proc.hasParent() && queue.isLockOwner(proc.getParentProcId())) { |
| assert addFront : "expected to add a child in the front"; |
| assert !queue.isSuspended() : "unexpected suspended state for the queue"; |
| // our (proc) parent has the xlock, |
| // so the queue is not in the fairq (run-queue) |
| // add it back to let the child run (inherit the lock) |
| if (!IterableList.isLinked(queue)) { |
| fairq.add(queue); |
| } |
| queueSize++; |
| } |
| } |
| |
| @Override |
| public Procedure poll() { |
| return poll(-1); |
| } |
| |
| @edu.umd.cs.findbugs.annotations.SuppressWarnings("WA_AWAIT_NOT_IN_LOOP") |
| protected Procedure poll(long waitNsec) { |
| Procedure pollResult = null; |
| schedLock.lock(); |
| try { |
| if (queueSize == 0) { |
| if (waitNsec < 0) { |
| schedWaitCond.await(); |
| } else { |
| schedWaitCond.awaitNanos(waitNsec); |
| } |
| if (queueSize == 0) { |
| return null; |
| } |
| } |
| |
| // For now, let server handling have precedence over table handling; presumption is that it |
| // is more important handling crashed servers than it is running the |
| // enabling/disabling tables, etc. |
| pollResult = doPoll(serverRunQueue); |
| if (pollResult == null) { |
| pollResult = doPoll(tableRunQueue); |
| } |
| |
| // update metrics |
| pollCalls++; |
| nullPollCalls += (pollResult == null) ? 1 : 0; |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| } finally { |
| schedLock.unlock(); |
| } |
| return pollResult; |
| } |
| |
| private <T extends Comparable<T>> Procedure doPoll(final FairQueue<T> fairq) { |
| Queue<T> rq = fairq.poll(); |
| if (rq == null || !rq.isAvailable()) { |
| return null; |
| } |
| |
| assert !rq.isSuspended() : "rq=" + rq + " is suspended"; |
| Procedure pollResult = rq.poll(); |
| this.queueSize--; |
| if (rq.isEmpty() || rq.requireExclusiveLock(pollResult)) { |
| removeFromRunQueue(fairq, rq); |
| } else if (pollResult.hasParent() && rq.isLockOwner(pollResult.getParentProcId())) { |
| // if the rq is in the fairq because of runnable child |
| // check if the next procedure is still a child. |
| // if not, remove the rq from the fairq and go back to the xlock state |
| Procedure nextProc = rq.peek(); |
| if (nextProc != null && nextProc.getParentProcId() != pollResult.getParentProcId()) { |
| removeFromRunQueue(fairq, rq); |
| } |
| } |
| |
| return pollResult; |
| } |
| |
| @Override |
| public void clear() { |
| // NOTE: USED ONLY FOR TESTING |
| schedLock.lock(); |
| try { |
| // Remove Servers |
| for (int i = 0; i < serverBuckets.length; ++i) { |
| clear((ServerQueue)serverBuckets[i], serverRunQueue); |
| serverBuckets[i] = null; |
| } |
| |
| // Remove Tables |
| clear(tableMap, tableRunQueue); |
| tableMap = null; |
| |
| assert queueSize == 0 : "expected queue size to be 0, got " + queueSize; |
| } finally { |
| schedLock.unlock(); |
| } |
| } |
| |
| private <T extends Comparable<T>> void clear(Queue<T> treeMap, FairQueue<T> fairq) { |
| while (treeMap != null) { |
| Queue<T> node = AvlTree.getFirst(treeMap); |
| assert !node.isSuspended() : "can't clear suspended " + node.getKey(); |
| treeMap = AvlTree.remove(treeMap, node.getKey()); |
| removeFromRunQueue(fairq, node); |
| } |
| } |
| |
| @Override |
| public void signalAll() { |
| schedLock.lock(); |
| try { |
| schedWaitCond.signalAll(); |
| } finally { |
| schedLock.unlock(); |
| } |
| } |
| |
| @Override |
| public int size() { |
| schedLock.lock(); |
| try { |
| return queueSize; |
| } finally { |
| schedLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void completionCleanup(Procedure proc) { |
| if (proc instanceof TableProcedureInterface) { |
| TableProcedureInterface iProcTable = (TableProcedureInterface)proc; |
| boolean tableDeleted; |
| if (proc.hasException()) { |
| IOException procEx = proc.getException().unwrapRemoteException(); |
| if (iProcTable.getTableOperationType() == TableOperationType.CREATE) { |
| // create failed because the table already exist |
| tableDeleted = !(procEx instanceof TableExistsException); |
| } else { |
| // the operation failed because the table does not exist |
| tableDeleted = (procEx instanceof TableNotFoundException); |
| } |
| } else { |
| // the table was deleted |
| tableDeleted = (iProcTable.getTableOperationType() == TableOperationType.DELETE); |
| } |
| if (tableDeleted) { |
| markTableAsDeleted(iProcTable.getTableName()); |
| return; |
| } |
| } else { |
| // No cleanup for ServerProcedureInterface types, yet. |
| return; |
| } |
| } |
| |
| private <T extends Comparable<T>> void addToRunQueue(FairQueue<T> fairq, Queue<T> queue) { |
| if (IterableList.isLinked(queue)) return; |
| if (!queue.isEmpty()) { |
| fairq.add(queue); |
| queueSize += queue.size(); |
| } |
| } |
| |
| private <T extends Comparable<T>> void removeFromRunQueue(FairQueue<T> fairq, Queue<T> queue) { |
| if (!IterableList.isLinked(queue)) return; |
| fairq.remove(queue); |
| queueSize -= queue.size(); |
| } |
| |
| // ============================================================================ |
| // TODO: Metrics |
| // ============================================================================ |
| public long getPollCalls() { |
| return pollCalls; |
| } |
| |
| public long getNullPollCalls() { |
| return nullPollCalls; |
| } |
| |
| // ============================================================================ |
| // Event Helpers |
| // ============================================================================ |
| public boolean waitEvent(ProcedureEvent event, Procedure procedure) { |
| return waitEvent(event, procedure, false); |
| } |
| |
| public boolean waitEvent(ProcedureEvent event, Procedure procedure, boolean suspendQueue) { |
| return waitEvent(event, /* lockEvent= */false, procedure, suspendQueue); |
| } |
| |
| private boolean waitEvent(ProcedureEvent event, boolean lockEvent, |
| Procedure procedure, boolean suspendQueue) { |
| synchronized (event) { |
| if (event.isReady()) { |
| if (lockEvent) { |
| event.setReady(false); |
| } |
| return false; |
| } |
| |
| if (!suspendQueue) { |
| suspendProcedure(event, procedure); |
| } else if (isTableProcedure(procedure)) { |
| waitTableEvent(event, procedure); |
| } else if (isServerProcedure(procedure)) { |
| waitServerEvent(event, procedure); |
| } else { |
| // TODO: at the moment we only have Table and Server procedures |
| // if you are implementing a non-table/non-server procedure, you have two options: create |
| // a group for all the non-table/non-server procedures or try to find a key for your |
| // non-table/non-server procedures and implement something similar to the TableRunQueue. |
| throw new UnsupportedOperationException( |
| "RQs for non-table/non-server procedures are not implemented yet"); |
| } |
| } |
| return true; |
| } |
| |
| private void waitTableEvent(ProcedureEvent event, Procedure procedure) { |
| final TableName tableName = getTableName(procedure); |
| final boolean isDebugEnabled = LOG.isDebugEnabled(); |
| |
| schedLock.lock(); |
| try { |
| TableQueue queue = getTableQueue(tableName); |
| queue.addFront(procedure); |
| if (queue.isSuspended()) return; |
| |
| if (isDebugEnabled) { |
| LOG.debug("Suspend table queue " + tableName); |
| } |
| queue.setSuspended(true); |
| removeFromRunQueue(tableRunQueue, queue); |
| event.suspendTableQueue(queue); |
| } finally { |
| schedLock.unlock(); |
| } |
| } |
| |
| private void waitServerEvent(ProcedureEvent event, Procedure procedure) { |
| final ServerName serverName = getServerName(procedure); |
| final boolean isDebugEnabled = LOG.isDebugEnabled(); |
| |
| schedLock.lock(); |
| try { |
| // TODO: This will change once we have the new AM |
| ServerQueue queue = getServerQueue(serverName); |
| queue.addFront(procedure); |
| if (queue.isSuspended()) return; |
| |
| if (isDebugEnabled) { |
| LOG.debug("Suspend server queue " + serverName); |
| } |
| queue.setSuspended(true); |
| removeFromRunQueue(serverRunQueue, queue); |
| event.suspendServerQueue(queue); |
| } finally { |
| schedLock.unlock(); |
| } |
| } |
| |
| public void suspend(ProcedureEvent event) { |
| final boolean isDebugEnabled = LOG.isDebugEnabled(); |
| synchronized (event) { |
| event.setReady(false); |
| if (isDebugEnabled) { |
| LOG.debug("Suspend event " + event); |
| } |
| } |
| } |
| |
| public void wake(ProcedureEvent event) { |
| final boolean isDebugEnabled = LOG.isDebugEnabled(); |
| synchronized (event) { |
| event.setReady(true); |
| if (isDebugEnabled) { |
| LOG.debug("Wake event " + event); |
| } |
| |
| schedLock.lock(); |
| try { |
| while (event.hasWaitingTables()) { |
| Queue<TableName> queue = event.popWaitingTable(); |
| addToRunQueue(tableRunQueue, queue); |
| } |
| // TODO: This will change once we have the new AM |
| while (event.hasWaitingServers()) { |
| Queue<ServerName> queue = event.popWaitingServer(); |
| addToRunQueue(serverRunQueue, queue); |
| } |
| |
| while (event.hasWaitingProcedures()) { |
| wakeProcedure(event.popWaitingProcedure(false)); |
| } |
| |
| if (queueSize > 1) { |
| schedWaitCond.signalAll(); |
| } else if (queueSize > 0) { |
| schedWaitCond.signal(); |
| } |
| } finally { |
| schedLock.unlock(); |
| } |
| } |
| } |
| |
| private void suspendProcedure(BaseProcedureEvent event, Procedure procedure) { |
| procedure.suspend(); |
| event.suspendProcedure(procedure); |
| } |
| |
| private void wakeProcedure(Procedure procedure) { |
| procedure.resume(); |
| doAdd(procedure, /* addFront= */ true, /* notify= */false); |
| } |
| |
| private static abstract class BaseProcedureEvent { |
| private ArrayDeque<Procedure> waitingProcedures = null; |
| |
| protected void suspendProcedure(Procedure proc) { |
| if (waitingProcedures == null) { |
| waitingProcedures = new ArrayDeque<Procedure>(); |
| } |
| waitingProcedures.addLast(proc); |
| } |
| |
| protected boolean hasWaitingProcedures() { |
| return waitingProcedures != null; |
| } |
| |
| protected Procedure popWaitingProcedure(boolean popFront) { |
| // it will be nice to use IterableList on a procedure and avoid allocations... |
| Procedure proc = popFront ? waitingProcedures.removeFirst() : waitingProcedures.removeLast(); |
| if (waitingProcedures.isEmpty()) { |
| waitingProcedures = null; |
| } |
| return proc; |
| } |
| } |
| |
| public static class ProcedureEvent extends BaseProcedureEvent { |
| private final String description; |
| |
| private Queue<ServerName> waitingServers = null; |
| private Queue<TableName> waitingTables = null; |
| private boolean ready = false; |
| |
| public ProcedureEvent(String description) { |
| this.description = description; |
| } |
| |
| public synchronized boolean isReady() { |
| return ready; |
| } |
| |
| private synchronized void setReady(boolean isReady) { |
| this.ready = isReady; |
| } |
| |
| private void suspendTableQueue(Queue<TableName> queue) { |
| waitingTables = IterableList.append(waitingTables, queue); |
| } |
| |
| private void suspendServerQueue(Queue<ServerName> queue) { |
| waitingServers = IterableList.append(waitingServers, queue); |
| } |
| |
| private boolean hasWaitingTables() { |
| return waitingTables != null; |
| } |
| |
| private Queue<TableName> popWaitingTable() { |
| Queue<TableName> node = waitingTables; |
| waitingTables = IterableList.remove(waitingTables, node); |
| node.setSuspended(false); |
| return node; |
| } |
| |
| private boolean hasWaitingServers() { |
| return waitingServers != null; |
| } |
| |
| private Queue<ServerName> popWaitingServer() { |
| Queue<ServerName> node = waitingServers; |
| waitingServers = IterableList.remove(waitingServers, node); |
| node.setSuspended(false); |
| return node; |
| } |
| |
| @Override |
| public String toString() { |
| return String.format("ProcedureEvent(%s)", description); |
| } |
| } |
| |
| // ============================================================================ |
| // Table Queue Lookup Helpers |
| // ============================================================================ |
| private TableQueue getTableQueueWithLock(TableName tableName) { |
| schedLock.lock(); |
| try { |
| return getTableQueue(tableName); |
| } finally { |
| schedLock.unlock(); |
| } |
| } |
| |
| private TableQueue getTableQueue(TableName tableName) { |
| Queue<TableName> node = AvlTree.get(tableMap, tableName); |
| if (node != null) return (TableQueue)node; |
| |
| NamespaceQueue nsQueue = getNamespaceQueue(tableName.getNamespaceAsString()); |
| node = new TableQueue(tableName, nsQueue, getTablePriority(tableName)); |
| tableMap = AvlTree.insert(tableMap, node); |
| return (TableQueue)node; |
| } |
| |
| private void removeTableQueue(TableName tableName) { |
| tableMap = AvlTree.remove(tableMap, tableName); |
| } |
| |
| private int getTablePriority(TableName tableName) { |
| if (tableName.equals(TableName.META_TABLE_NAME)) { |
| return metaTablePriority; |
| } else if (tableName.isSystemTable()) { |
| return sysTablePriority; |
| } |
| return userTablePriority; |
| } |
| |
| private static boolean isTableProcedure(Procedure proc) { |
| return proc instanceof TableProcedureInterface; |
| } |
| |
| private static TableName getTableName(Procedure proc) { |
| return ((TableProcedureInterface)proc).getTableName(); |
| } |
| |
| // ============================================================================ |
| // Namespace Queue Lookup Helpers |
| // ============================================================================ |
| private NamespaceQueue getNamespaceQueue(String namespace) { |
| Queue<String> node = AvlTree.get(namespaceMap, namespace); |
| if (node != null) return (NamespaceQueue)node; |
| |
| node = new NamespaceQueue(namespace); |
| namespaceMap = AvlTree.insert(namespaceMap, node); |
| return (NamespaceQueue)node; |
| } |
| |
| // ============================================================================ |
| // Server Queue Lookup Helpers |
| // ============================================================================ |
| private ServerQueue getServerQueueWithLock(ServerName serverName) { |
| schedLock.lock(); |
| try { |
| return getServerQueue(serverName); |
| } finally { |
| schedLock.unlock(); |
| } |
| } |
| |
| private ServerQueue getServerQueue(ServerName serverName) { |
| int index = getBucketIndex(serverBuckets, serverName.hashCode()); |
| Queue<ServerName> root = getTreeRoot(serverBuckets, index); |
| Queue<ServerName> node = AvlTree.get(root, serverName); |
| if (node != null) return (ServerQueue)node; |
| |
| node = new ServerQueue(serverName); |
| serverBuckets[index] = AvlTree.insert(root, node); |
| return (ServerQueue)node; |
| } |
| |
| private void removeServerQueue(ServerName serverName) { |
| int index = getBucketIndex(serverBuckets, serverName.hashCode()); |
| serverBuckets[index] = AvlTree.remove((ServerQueue)serverBuckets[index], serverName); |
| } |
| |
| @SuppressWarnings("unchecked") |
| private static <T extends Comparable<T>> Queue<T> getTreeRoot(Object[] buckets, int index) { |
| return (Queue<T>) buckets[index]; |
| } |
| |
| private static int getBucketIndex(Object[] buckets, int hashCode) { |
| return Math.abs(hashCode) % buckets.length; |
| } |
| |
| private static boolean isServerProcedure(Procedure proc) { |
| return proc instanceof ServerProcedureInterface; |
| } |
| |
| private static ServerName getServerName(Procedure proc) { |
| return ((ServerProcedureInterface)proc).getServerName(); |
| } |
| |
| // ============================================================================ |
| // Table and Server Queue Implementation |
| // ============================================================================ |
| public static class ServerQueue extends QueueImpl<ServerName> { |
| public ServerQueue(ServerName serverName) { |
| super(serverName); |
| } |
| |
| public boolean requireExclusiveLock(Procedure proc) { |
| ServerProcedureInterface spi = (ServerProcedureInterface)proc; |
| switch (spi.getServerOperationType()) { |
| case CRASH_HANDLER: |
| return true; |
| default: |
| break; |
| } |
| throw new UnsupportedOperationException("unexpected type " + spi.getServerOperationType()); |
| } |
| } |
| |
| private static class RegionEvent extends BaseProcedureEvent { |
| private final HRegionInfo regionInfo; |
| private long exclusiveLockProcIdOwner = Long.MIN_VALUE; |
| |
| public RegionEvent(HRegionInfo regionInfo) { |
| this.regionInfo = regionInfo; |
| } |
| |
| public boolean hasExclusiveLock() { |
| return exclusiveLockProcIdOwner != Long.MIN_VALUE; |
| } |
| |
| public boolean isLockOwner(long procId) { |
| return exclusiveLockProcIdOwner == procId; |
| } |
| |
| public boolean tryExclusiveLock(long procIdOwner) { |
| assert procIdOwner != Long.MIN_VALUE; |
| if (hasExclusiveLock()) return false; |
| exclusiveLockProcIdOwner = procIdOwner; |
| return true; |
| } |
| |
| private void releaseExclusiveLock() { |
| exclusiveLockProcIdOwner = Long.MIN_VALUE; |
| } |
| |
| public HRegionInfo getRegionInfo() { |
| return regionInfo; |
| } |
| |
| @Override |
| public String toString() { |
| return String.format("region %s event", regionInfo.getRegionNameAsString()); |
| } |
| } |
| |
| public static class TableQueue extends QueueImpl<TableName> { |
| private final NamespaceQueue namespaceQueue; |
| |
| private HashMap<HRegionInfo, RegionEvent> regionEventMap; |
| private TableLock tableLock = null; |
| |
| public TableQueue(TableName tableName, NamespaceQueue namespaceQueue, int priority) { |
| super(tableName, priority); |
| this.namespaceQueue = namespaceQueue; |
| } |
| |
| public NamespaceQueue getNamespaceQueue() { |
| return namespaceQueue; |
| } |
| |
| @Override |
| public synchronized boolean isAvailable() { |
| // if there are no items in the queue, or the namespace is locked. |
| // we can't execute operation on this table |
| if (isEmpty() || namespaceQueue.hasExclusiveLock()) { |
| return false; |
| } |
| |
| if (hasExclusiveLock()) { |
| // if we have an exclusive lock already taken |
| // only child of the lock owner can be executed |
| Procedure availProc = peek(); |
| return availProc != null && availProc.hasParent() && |
| isLockOwner(availProc.getParentProcId()); |
| } |
| |
| // no xlock |
| return true; |
| } |
| |
| public synchronized RegionEvent getRegionEvent(final HRegionInfo regionInfo) { |
| if (regionEventMap == null) { |
| regionEventMap = new HashMap<HRegionInfo, RegionEvent>(); |
| } |
| RegionEvent event = regionEventMap.get(regionInfo); |
| if (event == null) { |
| event = new RegionEvent(regionInfo); |
| regionEventMap.put(regionInfo, event); |
| } |
| return event; |
| } |
| |
| public synchronized void removeRegionEvent(final RegionEvent event) { |
| regionEventMap.remove(event.getRegionInfo()); |
| if (regionEventMap.isEmpty()) { |
| regionEventMap = null; |
| } |
| } |
| |
| // TODO: We can abort pending/in-progress operation if the new call is |
| // something like drop table. We can Override addBack(), |
| // check the type and abort all the in-flight procedurs. |
| private boolean canAbortPendingOperations(Procedure proc) { |
| TableProcedureInterface tpi = (TableProcedureInterface)proc; |
| switch (tpi.getTableOperationType()) { |
| case DELETE: |
| return true; |
| default: |
| return false; |
| } |
| } |
| |
| public boolean requireExclusiveLock(Procedure proc) { |
| TableProcedureInterface tpi = (TableProcedureInterface)proc; |
| switch (tpi.getTableOperationType()) { |
| case CREATE: |
| case DELETE: |
| case DISABLE: |
| case ENABLE: |
| return true; |
| case EDIT: |
| // we allow concurrent edit on the NS table |
| return !tpi.getTableName().equals(TableName.NAMESPACE_TABLE_NAME); |
| case READ: |
| return false; |
| // region operations are using the shared-lock on the table |
| // and then they will grab an xlock on the region. |
| case SPLIT: |
| case MERGE: |
| case ASSIGN: |
| case UNASSIGN: |
| return false; |
| default: |
| break; |
| } |
| throw new UnsupportedOperationException("unexpected type " + tpi.getTableOperationType()); |
| } |
| |
| private synchronized boolean tryZkSharedLock(final TableLockManager lockManager, |
| final String purpose) { |
| // Since we only have one lock resource. We should only acquire zk lock if the znode |
| // does not exist. |
| // |
| if (isSingleSharedLock()) { |
| // Take zk-read-lock |
| TableName tableName = getKey(); |
| tableLock = lockManager.readLock(tableName, purpose); |
| try { |
| tableLock.acquire(); |
| } catch (IOException e) { |
| LOG.error("failed acquire read lock on " + tableName, e); |
| tableLock = null; |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| private synchronized void releaseZkSharedLock(final TableLockManager lockManager) { |
| if (isSingleSharedLock()) { |
| releaseTableLock(lockManager, true); |
| } |
| } |
| |
| private synchronized boolean tryZkExclusiveLock(final TableLockManager lockManager, |
| final String purpose) { |
| // Take zk-write-lock |
| TableName tableName = getKey(); |
| tableLock = lockManager.writeLock(tableName, purpose); |
| try { |
| tableLock.acquire(); |
| } catch (IOException e) { |
| LOG.error("failed acquire write lock on " + tableName, e); |
| tableLock = null; |
| return false; |
| } |
| return true; |
| } |
| |
| private synchronized void releaseZkExclusiveLock(final TableLockManager lockManager) { |
| releaseTableLock(lockManager, true); |
| } |
| |
| private void releaseTableLock(final TableLockManager lockManager, boolean reset) { |
| for (int i = 0; i < 3; ++i) { |
| try { |
| tableLock.release(); |
| if (reset) { |
| tableLock = null; |
| } |
| break; |
| } catch (IOException e) { |
| LOG.warn("Could not release the table write-lock", e); |
| } |
| } |
| } |
| } |
| |
| /** |
| * the namespace is currently used just as a rwlock, not as a queue. |
| * because ns operation are not frequent enough. so we want to avoid |
| * having to move table queues around for suspend/resume. |
| */ |
| private static class NamespaceQueue extends Queue<String> { |
| public NamespaceQueue(String namespace) { |
| super(namespace); |
| } |
| |
| @Override |
| public boolean requireExclusiveLock(Procedure proc) { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public void add(final Procedure proc, final boolean addToFront) { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public Procedure peek() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public Procedure poll() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public boolean isEmpty() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| @Override |
| public int size() { |
| throw new UnsupportedOperationException(); |
| } |
| } |
| |
| // ============================================================================ |
| // Table Locking Helpers |
| // ============================================================================ |
| /** |
| * Try to acquire the exclusive lock on the specified table. |
| * other operations in the table-queue will be executed after the lock is released. |
| * @param procedure the procedure trying to acquire the lock |
| * @param table Table to lock |
| * @return true if we were able to acquire the lock on the table, otherwise false. |
| */ |
| public boolean tryAcquireTableExclusiveLock(final Procedure procedure, final TableName table) { |
| schedLock.lock(); |
| TableQueue queue = getTableQueue(table); |
| if (!queue.getNamespaceQueue().trySharedLock()) { |
| return false; |
| } |
| |
| if (!queue.tryExclusiveLock(procedure.getProcId())) { |
| queue.getNamespaceQueue().releaseSharedLock(); |
| schedLock.unlock(); |
| return false; |
| } |
| |
| removeFromRunQueue(tableRunQueue, queue); |
| schedLock.unlock(); |
| |
| // Zk lock is expensive... |
| boolean hasXLock = queue.tryZkExclusiveLock(lockManager, procedure.toString()); |
| if (!hasXLock) { |
| schedLock.lock(); |
| queue.releaseExclusiveLock(); |
| queue.getNamespaceQueue().releaseSharedLock(); |
| addToRunQueue(tableRunQueue, queue); |
| schedLock.unlock(); |
| } |
| return hasXLock; |
| } |
| |
| /** |
| * Release the exclusive lock taken with tryAcquireTableWrite() |
| * @param procedure the procedure releasing the lock |
| * @param table the name of the table that has the exclusive lock |
| */ |
| public void releaseTableExclusiveLock(final Procedure procedure, final TableName table) { |
| schedLock.lock(); |
| TableQueue queue = getTableQueue(table); |
| schedLock.unlock(); |
| |
| // Zk lock is expensive... |
| queue.releaseZkExclusiveLock(lockManager); |
| |
| schedLock.lock(); |
| queue.releaseExclusiveLock(); |
| queue.getNamespaceQueue().releaseSharedLock(); |
| addToRunQueue(tableRunQueue, queue); |
| schedLock.unlock(); |
| } |
| |
| /** |
| * Try to acquire the shared lock on the specified table. |
| * other "read" operations in the table-queue may be executed concurrently, |
| * @param procedure the procedure trying to acquire the lock |
| * @param table Table to lock |
| * @return true if we were able to acquire the lock on the table, otherwise false. |
| */ |
| public boolean tryAcquireTableSharedLock(final Procedure procedure, final TableName table) { |
| return tryAcquireTableQueueSharedLock(procedure, table) != null; |
| } |
| |
| private TableQueue tryAcquireTableQueueSharedLock(final Procedure procedure, |
| final TableName table) { |
| schedLock.lock(); |
| TableQueue queue = getTableQueue(table); |
| if (!queue.getNamespaceQueue().trySharedLock()) { |
| return null; |
| } |
| |
| if (!queue.trySharedLock()) { |
| queue.getNamespaceQueue().releaseSharedLock(); |
| schedLock.unlock(); |
| return null; |
| } |
| |
| // TODO: Zk lock is expensive and it would be perf bottleneck. Long term solution is |
| // to remove it. |
| if (!queue.tryZkSharedLock(lockManager, procedure.toString())) { |
| queue.releaseSharedLock(); |
| queue.getNamespaceQueue().releaseSharedLock(); |
| schedLock.unlock(); |
| return null; |
| } |
| |
| schedLock.unlock(); |
| |
| return queue; |
| } |
| |
| /** |
| * Release the shared lock taken with tryAcquireTableRead() |
| * @param procedure the procedure releasing the lock |
| * @param table the name of the table that has the shared lock |
| */ |
| public void releaseTableSharedLock(final Procedure procedure, final TableName table) { |
| final TableQueue queue = getTableQueueWithLock(table); |
| |
| schedLock.lock(); |
| // Zk lock is expensive... |
| queue.releaseZkSharedLock(lockManager); |
| |
| queue.releaseSharedLock(); |
| queue.getNamespaceQueue().releaseSharedLock(); |
| schedLock.unlock(); |
| } |
| |
| /** |
| * Tries to remove the queue and the table-lock of the specified table. |
| * If there are new operations pending (e.g. a new create), |
| * the remove will not be performed. |
| * @param table the name of the table that should be marked as deleted |
| * @return true if deletion succeeded, false otherwise meaning that there are |
| * other new operations pending for that table (e.g. a new create). |
| */ |
| protected boolean markTableAsDeleted(final TableName table) { |
| final ReentrantLock l = schedLock; |
| l.lock(); |
| try { |
| TableQueue queue = getTableQueue(table); |
| if (queue == null) return true; |
| |
| if (queue.isEmpty() && queue.tryExclusiveLock(0)) { |
| // remove the table from the run-queue and the map |
| if (IterableList.isLinked(queue)) { |
| tableRunQueue.remove(queue); |
| } |
| |
| // Remove the table lock |
| try { |
| lockManager.tableDeleted(table); |
| } catch (IOException e) { |
| LOG.warn("Received exception from TableLockManager.tableDeleted:", e); //not critical |
| } |
| |
| removeTableQueue(table); |
| } else { |
| // TODO: If there are no create, we can drop all the other ops |
| return false; |
| } |
| } finally { |
| l.unlock(); |
| } |
| return true; |
| } |
| |
| // ============================================================================ |
| // Region Locking Helpers |
| // ============================================================================ |
| public boolean waitRegion(final Procedure procedure, final HRegionInfo regionInfo) { |
| return waitRegions(procedure, regionInfo.getTable(), regionInfo); |
| } |
| |
| public boolean waitRegions(final Procedure procedure, final TableName table, |
| final HRegionInfo... regionInfo) { |
| Arrays.sort(regionInfo); |
| |
| final TableQueue queue; |
| if (procedure.hasParent()) { |
| // the assumption is that the parent procedure have already the table xlock |
| queue = getTableQueueWithLock(table); |
| } else { |
| // acquire the table shared-lock |
| queue = tryAcquireTableQueueSharedLock(procedure, table); |
| if (queue == null) return false; |
| } |
| |
| // acquire region xlocks or wait |
| boolean hasLock = true; |
| final RegionEvent[] event = new RegionEvent[regionInfo.length]; |
| synchronized (queue) { |
| for (int i = 0; i < regionInfo.length; ++i) { |
| assert regionInfo[i].getTable().equals(table); |
| event[i] = queue.getRegionEvent(regionInfo[i]); |
| if (!event[i].tryExclusiveLock(procedure.getProcId())) { |
| suspendProcedure(event[i], procedure); |
| hasLock = false; |
| while (i-- > 0) { |
| event[i].releaseExclusiveLock(); |
| } |
| break; |
| } |
| } |
| } |
| |
| if (!hasLock && !procedure.hasParent()) { |
| releaseTableSharedLock(procedure, table); |
| } |
| return hasLock; |
| } |
| |
| public void wakeRegion(final Procedure procedure, final HRegionInfo regionInfo) { |
| wakeRegions(procedure, regionInfo.getTable(), regionInfo); |
| } |
| |
| public void wakeRegions(final Procedure procedure,final TableName table, |
| final HRegionInfo... regionInfo) { |
| Arrays.sort(regionInfo); |
| |
| final TableQueue queue = getTableQueueWithLock(table); |
| |
| int numProcs = 0; |
| final Procedure[] nextProcs = new Procedure[regionInfo.length]; |
| synchronized (queue) { |
| for (int i = 0; i < regionInfo.length; ++i) { |
| assert regionInfo[i].getTable().equals(table); |
| RegionEvent event = queue.getRegionEvent(regionInfo[i]); |
| event.releaseExclusiveLock(); |
| if (event.hasWaitingProcedures()) { |
| // release one procedure at the time since regions has an xlock |
| nextProcs[numProcs++] = event.popWaitingProcedure(true); |
| } else { |
| queue.removeRegionEvent(event); |
| } |
| } |
| } |
| |
| // awake procedures if any |
| schedLock.lock(); |
| try { |
| for (int i = numProcs - 1; i >= 0; --i) { |
| wakeProcedure(nextProcs[i]); |
| } |
| |
| if (numProcs > 1) { |
| schedWaitCond.signalAll(); |
| } else if (numProcs > 0) { |
| schedWaitCond.signal(); |
| } |
| |
| if (!procedure.hasParent()) { |
| // release the table shared-lock. |
| // (if we have a parent, it is holding an xlock so we didn't take the shared-lock) |
| releaseTableSharedLock(procedure, table); |
| } |
| } finally { |
| schedLock.unlock(); |
| } |
| } |
| |
| // ============================================================================ |
| // Namespace Locking Helpers |
| // ============================================================================ |
| /** |
| * Try to acquire the exclusive lock on the specified namespace. |
| * @see #releaseNamespaceExclusiveLock(Procedure,String) |
| * @param procedure the procedure trying to acquire the lock |
| * @param nsName Namespace to lock |
| * @return true if we were able to acquire the lock on the namespace, otherwise false. |
| */ |
| public boolean tryAcquireNamespaceExclusiveLock(final Procedure procedure, final String nsName) { |
| schedLock.lock(); |
| try { |
| TableQueue tableQueue = getTableQueue(TableName.NAMESPACE_TABLE_NAME); |
| if (!tableQueue.trySharedLock()) return false; |
| |
| NamespaceQueue nsQueue = getNamespaceQueue(nsName); |
| boolean hasLock = nsQueue.tryExclusiveLock(procedure.getProcId()); |
| if (!hasLock) { |
| tableQueue.releaseSharedLock(); |
| } |
| return hasLock; |
| } finally { |
| schedLock.unlock(); |
| } |
| } |
| |
| /** |
| * Release the exclusive lock |
| * @see #tryAcquireNamespaceExclusiveLock(Procedure,String) |
| * @param procedure the procedure releasing the lock |
| * @param nsName the namespace that has the exclusive lock |
| */ |
| public void releaseNamespaceExclusiveLock(final Procedure procedure, final String nsName) { |
| schedLock.lock(); |
| try { |
| TableQueue tableQueue = getTableQueue(TableName.NAMESPACE_TABLE_NAME); |
| tableQueue.releaseSharedLock(); |
| |
| NamespaceQueue queue = getNamespaceQueue(nsName); |
| queue.releaseExclusiveLock(); |
| } finally { |
| schedLock.unlock(); |
| } |
| } |
| |
| // ============================================================================ |
| // Server Locking Helpers |
| // ============================================================================ |
| /** |
| * Try to acquire the exclusive lock on the specified server. |
| * @see #releaseServerExclusiveLock(Procedure,ServerName) |
| * @param procedure the procedure trying to acquire the lock |
| * @param serverName Server to lock |
| * @return true if we were able to acquire the lock on the server, otherwise false. |
| */ |
| public boolean tryAcquireServerExclusiveLock(final Procedure procedure, |
| final ServerName serverName) { |
| schedLock.lock(); |
| try { |
| ServerQueue queue = getServerQueue(serverName); |
| if (queue.tryExclusiveLock(procedure.getProcId())) { |
| removeFromRunQueue(serverRunQueue, queue); |
| return true; |
| } |
| } finally { |
| schedLock.unlock(); |
| } |
| return false; |
| } |
| |
| /** |
| * Release the exclusive lock |
| * @see #tryAcquireServerExclusiveLock(Procedure,ServerName) |
| * @param procedure the procedure releasing the lock |
| * @param serverName the server that has the exclusive lock |
| */ |
| public void releaseServerExclusiveLock(final Procedure procedure, |
| final ServerName serverName) { |
| schedLock.lock(); |
| try { |
| ServerQueue queue = getServerQueue(serverName); |
| queue.releaseExclusiveLock(); |
| addToRunQueue(serverRunQueue, queue); |
| } finally { |
| schedLock.unlock(); |
| } |
| } |
| |
| /** |
| * Try to acquire the shared lock on the specified server. |
| * @see #releaseServerSharedLock(Procedure,ServerName) |
| * @param procedure the procedure releasing the lock |
| * @param serverName Server to lock |
| * @return true if we were able to acquire the lock on the server, otherwise false. |
| */ |
| public boolean tryAcquireServerSharedLock(final Procedure procedure, |
| final ServerName serverName) { |
| return getServerQueueWithLock(serverName).trySharedLock(); |
| } |
| |
| /** |
| * Release the shared lock taken |
| * @see #tryAcquireServerSharedLock(Procedure,ServerName) |
| * @param procedure the procedure releasing the lock |
| * @param serverName the server that has the shared lock |
| */ |
| public void releaseServerSharedLock(final Procedure procedure, |
| final ServerName serverName) { |
| getServerQueueWithLock(serverName).releaseSharedLock(); |
| } |
| |
| // ============================================================================ |
| // Generic Helpers |
| // ============================================================================ |
| private static interface QueueInterface { |
| boolean isAvailable(); |
| boolean isEmpty(); |
| int size(); |
| |
| void add(Procedure proc, boolean addFront); |
| boolean requireExclusiveLock(Procedure proc); |
| Procedure peek(); |
| Procedure poll(); |
| |
| boolean isSuspended(); |
| } |
| |
| private static abstract class Queue<TKey extends Comparable<TKey>> implements QueueInterface { |
| private Queue<TKey> avlRight = null; |
| private Queue<TKey> avlLeft = null; |
| private int avlHeight = 1; |
| |
| private Queue<TKey> iterNext = null; |
| private Queue<TKey> iterPrev = null; |
| private boolean suspended = false; |
| |
| private long exclusiveLockProcIdOwner = Long.MIN_VALUE; |
| private int sharedLock = 0; |
| |
| private final TKey key; |
| private final int priority; |
| |
| public Queue(TKey key) { |
| this(key, 1); |
| } |
| |
| public Queue(TKey key, int priority) { |
| this.key = key; |
| this.priority = priority; |
| } |
| |
| protected TKey getKey() { |
| return key; |
| } |
| |
| protected int getPriority() { |
| return priority; |
| } |
| |
| /** |
| * True if the queue is not in the run-queue and it is owned by an event. |
| */ |
| public boolean isSuspended() { |
| return suspended; |
| } |
| |
| protected boolean setSuspended(boolean isSuspended) { |
| if (this.suspended == isSuspended) return false; |
| this.suspended = isSuspended; |
| return true; |
| } |
| |
| // ====================================================================== |
| // Read/Write Locking helpers |
| // ====================================================================== |
| public synchronized boolean isLocked() { |
| return hasExclusiveLock() || sharedLock > 0; |
| } |
| |
| public synchronized boolean hasExclusiveLock() { |
| return this.exclusiveLockProcIdOwner != Long.MIN_VALUE; |
| } |
| |
| public synchronized boolean trySharedLock() { |
| if (hasExclusiveLock()) return false; |
| sharedLock++; |
| return true; |
| } |
| |
| public synchronized void releaseSharedLock() { |
| sharedLock--; |
| } |
| |
| protected synchronized boolean isSingleSharedLock() { |
| return sharedLock == 1; |
| } |
| |
| public synchronized boolean isLockOwner(long procId) { |
| return exclusiveLockProcIdOwner == procId; |
| } |
| |
| public synchronized boolean tryExclusiveLock(long procIdOwner) { |
| assert procIdOwner != Long.MIN_VALUE; |
| if (isLocked()) return false; |
| exclusiveLockProcIdOwner = procIdOwner; |
| return true; |
| } |
| |
| public synchronized void releaseExclusiveLock() { |
| exclusiveLockProcIdOwner = Long.MIN_VALUE; |
| } |
| |
| // This should go away when we have the new AM and its events |
| // and we move xlock to the lock-event-queue. |
| public synchronized boolean isAvailable() { |
| return !hasExclusiveLock() && !isEmpty(); |
| } |
| |
| // ====================================================================== |
| // Generic Helpers |
| // ====================================================================== |
| public int compareKey(TKey cmpKey) { |
| return key.compareTo(cmpKey); |
| } |
| |
| public int compareTo(Queue<TKey> other) { |
| return compareKey(other.key); |
| } |
| |
| @Override |
| public String toString() { |
| return String.format("%s(%s)", getClass().getSimpleName(), key); |
| } |
| } |
| |
| // ====================================================================== |
| // Helper Data Structures |
| // ====================================================================== |
| private static abstract class QueueImpl<TKey extends Comparable<TKey>> extends Queue<TKey> { |
| private final ArrayDeque<Procedure> runnables = new ArrayDeque<Procedure>(); |
| |
| public QueueImpl(TKey key) { |
| super(key); |
| } |
| |
| public QueueImpl(TKey key, int priority) { |
| super(key, priority); |
| } |
| |
| public void add(final Procedure proc, final boolean addToFront) { |
| if (addToFront) { |
| addFront(proc); |
| } else { |
| addBack(proc); |
| } |
| } |
| |
| protected void addFront(final Procedure proc) { |
| runnables.addFirst(proc); |
| } |
| |
| protected void addBack(final Procedure proc) { |
| runnables.addLast(proc); |
| } |
| |
| public Procedure peek() { |
| return runnables.peek(); |
| } |
| |
| @Override |
| public Procedure poll() { |
| return runnables.poll(); |
| } |
| |
| @Override |
| public boolean isEmpty() { |
| return runnables.isEmpty(); |
| } |
| |
| public int size() { |
| return runnables.size(); |
| } |
| } |
| |
| private static class FairQueue<T extends Comparable<T>> { |
| private final int quantum; |
| |
| private Queue<T> currentQueue = null; |
| private Queue<T> queueHead = null; |
| private int currentQuantum = 0; |
| |
| public FairQueue() { |
| this(1); |
| } |
| |
| public FairQueue(int quantum) { |
| this.quantum = quantum; |
| } |
| |
| public void add(Queue<T> queue) { |
| queueHead = IterableList.append(queueHead, queue); |
| if (currentQueue == null) setNextQueue(queueHead); |
| } |
| |
| public void remove(Queue<T> queue) { |
| Queue<T> nextQueue = queue.iterNext; |
| queueHead = IterableList.remove(queueHead, queue); |
| if (currentQueue == queue) { |
| setNextQueue(queueHead != null ? nextQueue : null); |
| } |
| } |
| |
| public Queue<T> poll() { |
| if (currentQuantum == 0) { |
| if (!nextQueue()) { |
| return null; // nothing here |
| } |
| currentQuantum = calculateQuantum(currentQueue) - 1; |
| } else { |
| currentQuantum--; |
| } |
| |
| // This should go away when we have the new AM and its events |
| if (!currentQueue.isAvailable()) { |
| Queue<T> lastQueue = currentQueue; |
| do { |
| if (!nextQueue()) |
| return null; |
| } while (currentQueue != lastQueue && !currentQueue.isAvailable()); |
| |
| currentQuantum = calculateQuantum(currentQueue) - 1; |
| } |
| return currentQueue; |
| } |
| |
| private boolean nextQueue() { |
| if (currentQueue == null) return false; |
| currentQueue = currentQueue.iterNext; |
| return currentQueue != null; |
| } |
| |
| private void setNextQueue(Queue<T> queue) { |
| currentQueue = queue; |
| if (queue != null) { |
| currentQuantum = calculateQuantum(currentQueue); |
| } else { |
| currentQuantum = 0; |
| } |
| } |
| |
| private int calculateQuantum(final Queue queue) { |
| return Math.max(1, queue.getPriority() * quantum); // TODO |
| } |
| } |
| |
| private static class AvlTree { |
| public static <T extends Comparable<T>> Queue<T> get(Queue<T> root, T key) { |
| while (root != null) { |
| int cmp = root.compareKey(key); |
| if (cmp > 0) { |
| root = root.avlLeft; |
| } else if (cmp < 0) { |
| root = root.avlRight; |
| } else { |
| return root; |
| } |
| } |
| return null; |
| } |
| |
| public static <T extends Comparable<T>> Queue<T> getFirst(Queue<T> root) { |
| if (root != null) { |
| while (root.avlLeft != null) { |
| root = root.avlLeft; |
| } |
| } |
| return root; |
| } |
| |
| public static <T extends Comparable<T>> Queue<T> getLast(Queue<T> root) { |
| if (root != null) { |
| while (root.avlRight != null) { |
| root = root.avlRight; |
| } |
| } |
| return root; |
| } |
| |
| public static <T extends Comparable<T>> Queue<T> insert(Queue<T> root, Queue<T> node) { |
| if (root == null) return node; |
| if (node.compareTo(root) < 0) { |
| root.avlLeft = insert(root.avlLeft, node); |
| } else { |
| root.avlRight = insert(root.avlRight, node); |
| } |
| return balance(root); |
| } |
| |
| private static <T extends Comparable<T>> Queue<T> removeMin(Queue<T> p) { |
| if (p.avlLeft == null) |
| return p.avlRight; |
| p.avlLeft = removeMin(p.avlLeft); |
| return balance(p); |
| } |
| |
| public static <T extends Comparable<T>> Queue<T> remove(Queue<T> root, T key) { |
| if (root == null) return null; |
| |
| int cmp = root.compareKey(key); |
| if (cmp == 0) { |
| Queue<T> q = root.avlLeft; |
| Queue<T> r = root.avlRight; |
| if (r == null) return q; |
| Queue<T> min = getFirst(r); |
| min.avlRight = removeMin(r); |
| min.avlLeft = q; |
| return balance(min); |
| } else if (cmp > 0) { |
| root.avlLeft = remove(root.avlLeft, key); |
| } else /* if (cmp < 0) */ { |
| root.avlRight = remove(root.avlRight, key); |
| } |
| return balance(root); |
| } |
| |
| private static <T extends Comparable<T>> Queue<T> balance(Queue<T> p) { |
| fixHeight(p); |
| int balance = balanceFactor(p); |
| if (balance == 2) { |
| if (balanceFactor(p.avlRight) < 0) { |
| p.avlRight = rotateRight(p.avlRight); |
| } |
| return rotateLeft(p); |
| } else if (balance == -2) { |
| if (balanceFactor(p.avlLeft) > 0) { |
| p.avlLeft = rotateLeft(p.avlLeft); |
| } |
| return rotateRight(p); |
| } |
| return p; |
| } |
| |
| private static <T extends Comparable<T>> Queue<T> rotateRight(Queue<T> p) { |
| Queue<T> q = p.avlLeft; |
| p.avlLeft = q.avlRight; |
| q.avlRight = p; |
| fixHeight(p); |
| fixHeight(q); |
| return q; |
| } |
| |
| private static <T extends Comparable<T>> Queue<T> rotateLeft(Queue<T> q) { |
| Queue<T> p = q.avlRight; |
| q.avlRight = p.avlLeft; |
| p.avlLeft = q; |
| fixHeight(q); |
| fixHeight(p); |
| return p; |
| } |
| |
| private static <T extends Comparable<T>> void fixHeight(Queue<T> node) { |
| int heightLeft = height(node.avlLeft); |
| int heightRight = height(node.avlRight); |
| node.avlHeight = 1 + Math.max(heightLeft, heightRight); |
| } |
| |
| private static <T extends Comparable<T>> int height(Queue<T> node) { |
| return node != null ? node.avlHeight : 0; |
| } |
| |
| private static <T extends Comparable<T>> int balanceFactor(Queue<T> node) { |
| return height(node.avlRight) - height(node.avlLeft); |
| } |
| } |
| |
| private static class IterableList { |
| public static <T extends Comparable<T>> Queue<T> prepend(Queue<T> head, Queue<T> node) { |
| assert !isLinked(node) : node + " is already linked"; |
| if (head != null) { |
| Queue<T> tail = head.iterPrev; |
| tail.iterNext = node; |
| head.iterPrev = node; |
| node.iterNext = head; |
| node.iterPrev = tail; |
| } else { |
| node.iterNext = node; |
| node.iterPrev = node; |
| } |
| return node; |
| } |
| |
| public static <T extends Comparable<T>> Queue<T> append(Queue<T> head, Queue<T> node) { |
| assert !isLinked(node) : node + " is already linked"; |
| if (head != null) { |
| Queue<T> tail = head.iterPrev; |
| tail.iterNext = node; |
| node.iterNext = head; |
| node.iterPrev = tail; |
| head.iterPrev = node; |
| return head; |
| } |
| node.iterNext = node; |
| node.iterPrev = node; |
| return node; |
| } |
| |
| public static <T extends Comparable<T>> Queue<T> appendList(Queue<T> head, Queue<T> otherHead) { |
| if (head == null) return otherHead; |
| if (otherHead == null) return head; |
| |
| Queue<T> tail = head.iterPrev; |
| Queue<T> otherTail = otherHead.iterPrev; |
| tail.iterNext = otherHead; |
| otherHead.iterPrev = tail; |
| otherTail.iterNext = head; |
| head.iterPrev = otherTail; |
| return head; |
| } |
| |
| private static <T extends Comparable<T>> Queue<T> remove(Queue<T> head, Queue<T> node) { |
| assert isLinked(node) : node + " is not linked"; |
| if (node != node.iterNext) { |
| node.iterPrev.iterNext = node.iterNext; |
| node.iterNext.iterPrev = node.iterPrev; |
| head = (head == node) ? node.iterNext : head; |
| } else { |
| head = null; |
| } |
| node.iterNext = null; |
| node.iterPrev = null; |
| return head; |
| } |
| |
| private static <T extends Comparable<T>> boolean isLinked(Queue<T> node) { |
| return node.iterPrev != null && node.iterNext != null; |
| } |
| } |
| } |