| /* |
| * Copyright 2009-2012 by The Regents of the University of California |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * you may obtain a copy of the License from |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package edu.uci.ics.asterix.transaction.management.service.locking; |
| |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.concurrent.locks.ReadWriteLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| |
| import edu.uci.ics.asterix.transaction.management.exception.ACIDException; |
| import edu.uci.ics.asterix.transaction.management.service.logging.LogType; |
| import edu.uci.ics.asterix.transaction.management.service.logging.LogUtil; |
| import edu.uci.ics.asterix.transaction.management.service.logging.LogicalLogLocator; |
| import edu.uci.ics.asterix.transaction.management.service.transaction.DatasetId; |
| import edu.uci.ics.asterix.transaction.management.service.transaction.ITransactionManager.TransactionState; |
| import edu.uci.ics.asterix.transaction.management.service.transaction.JobId; |
| import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext; |
| import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants.LockManagerConstants.LockMode; |
| import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem; |
| |
| /** |
| * An implementation of the ILockManager interface for the |
| * specific case of locking protocol with two lock modes: (S) and (X), |
 * where the S lock mode is represented by 0 and the X lock mode by 1.
| * |
| * @author pouria, kisskys |
| */ |
| |
| public class LockManager implements ILockManager { |
| |
    public static final boolean IS_DEBUG_MODE = false;//true

    //When enabled, a job that accumulates ESCALATE_TRHESHOLD_ENTITY_TO_DATASET
    //entity-level IS locks on one dataset is escalated to a single dataset-level S lock.
    public static final boolean ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET = true;
    //Threshold must be greater than 1 and should be reasonably large enough not to escalate too soon.
    public static final int ESCALATE_TRHESHOLD_ENTITY_TO_DATASET = 1000;
    //return codes of needEscalateFromEntityToDataset()
    private static final int DO_ESCALATE = 0;
    private static final int ESCALATED = 1;
    private static final int DONOT_ESCALATE = 2;

    //owning transaction subsystem; provides the log manager used for ENTITY_COMMIT records
    private TransactionSubsystem txnSubsystem;

    //all threads accessing to LockManager's tables such as jobHT and datasetResourceHT
    //are serialized through LockTableLatch. All threads waiting the latch will be fairly served
    //in FIFO manner when the latch is available.
    private final ReadWriteLock lockTableLatch;
    private final ReadWriteLock waiterLatch;
    //per-job lock bookkeeping, keyed by JobId
    private HashMap<JobId, JobInfo> jobHT;
    //per-dataset lock bookkeeping, keyed by DatasetId
    private HashMap<DatasetId, DatasetLockInfo> datasetResourceHT;

    //slot-based managers backing entity lock info, entity info, and waiter objects
    private EntityLockInfoManager entityLockInfoManager;
    private EntityInfoManager entityInfoManager;
    private LockWaiterManager lockWaiterManager;

    private DeadlockDetector deadlockDetector;
    private TimeOutDetector toutDetector;
    private DatasetId tempDatasetIdObj; //temporary object to avoid object creation

    private int tryLockDatasetGranuleRevertOperation;

    private LockRequestTracker lockRequestTracker; //for debugging
    private ConsecutiveWakeupContext consecutiveWakeupContext;

    //TODO
    //This code should be taken care properly when there is a way to avoid doubling memory space for txnIds.
    private LogicalLogLocator logicalLogLocator;
| |
    /**
     * Creates a lock manager backed by the given transaction subsystem.
     * Both latches are created in fair mode so that waiting threads are
     * served in FIFO order. The lock-request tracker is only allocated in
     * debug mode.
     *
     * @param txnSubsystem owning transaction subsystem (supplies the log manager)
     * @throws ACIDException declared for initialization failures — presumably from
     *             LogUtil.getDummyLogicalLogLocator; confirm against that helper
     */
    public LockManager(TransactionSubsystem txnSubsystem) throws ACIDException {
        this.txnSubsystem = txnSubsystem;
        this.lockTableLatch = new ReentrantReadWriteLock(true);
        this.waiterLatch = new ReentrantReadWriteLock(true);
        this.jobHT = new HashMap<JobId, JobInfo>();
        this.datasetResourceHT = new HashMap<DatasetId, DatasetLockInfo>();
        this.entityInfoManager = new EntityInfoManager();
        this.lockWaiterManager = new LockWaiterManager();
        //entityLockInfoManager shares the entityInfo and waiter slot managers
        this.entityLockInfoManager = new EntityLockInfoManager(entityInfoManager, lockWaiterManager);
        this.deadlockDetector = new DeadlockDetector(jobHT, datasetResourceHT, entityLockInfoManager,
                entityInfoManager, lockWaiterManager);
        this.toutDetector = new TimeOutDetector(this);
        this.tempDatasetIdObj = new DatasetId(0);
        this.consecutiveWakeupContext = new ConsecutiveWakeupContext();

        this.logicalLogLocator = LogUtil.getDummyLogicalLogLocator(txnSubsystem.getLogManager());

        if (IS_DEBUG_MODE) {
            this.lockRequestTracker = new LockRequestTracker();
        }
    }
| |
    /**
     * Acquires a lock of the given mode on a dataset (entityHashValue == -1)
     * or on an entity within it. Delegates to internalLock with
     * isInstant == false.
     */
    @Override
    public void lock(DatasetId datasetId, int entityHashValue, byte lockMode, TransactionContext txnContext)
            throws ACIDException {
        internalLock(datasetId, entityHashValue, lockMode, txnContext, false);
    }
| |
    /**
     * Core lock-acquisition routine. Grants a lock of the requested mode on a
     * dataset granule (entityHashValue == -1) or an entity granule, creating
     * the DatasetLockInfo/JobInfo bookkeeping on first use. When escalation is
     * enabled, an entity-level IS request may be converted into a dataset-level
     * S request once the per-job threshold is reached. The whole lock table is
     * latched for the duration of the call.
     *
     * @param datasetId dataset to lock
     * @param entityHashValue primary-key hash of the entity, or -1 for a dataset-granule lock
     * @param lockMode requested mode (LockMode.S or LockMode.X)
     * @param txnContext requesting transaction
     * @param isInstant true for instant-lock calls; skips the IS-lock-count escalation bookkeeping
     * @throws ACIDException if the transaction is aborted or must abort
     */
    private void internalLock(DatasetId datasetId, int entityHashValue, byte lockMode, TransactionContext txnContext,
            boolean isInstant) throws ACIDException {

        JobId jobId = txnContext.getJobId();
        int jId = jobId.getId(); //int-type jobId
        int dId = datasetId.getId(); //int-type datasetId
        int entityInfo;
        int eLockInfo = -1;
        DatasetLockInfo dLockInfo = null;
        JobInfo jobInfo;
        //dataset-level mode: the request itself for a dataset granule, otherwise
        //the corresponding intention mode (S -> IS, X -> IX)
        byte datasetLockMode = entityHashValue == -1 ? lockMode : lockMode == LockMode.S ? LockMode.IS : LockMode.IX;
        boolean doEscalate = false;

        latchLockTable();
        validateJob(txnContext);

        if (IS_DEBUG_MODE) {
            trackLockRequest("Requested", RequestType.LOCK, datasetId, entityHashValue, lockMode, txnContext,
                    dLockInfo, eLockInfo);
        }

        dLockInfo = datasetResourceHT.get(datasetId);
        jobInfo = jobHT.get(jobId);

        if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
            if (!isInstant && datasetLockMode == LockMode.IS && jobInfo != null && dLockInfo != null) {
                int escalateStatus = needEscalateFromEntityToDataset(jobInfo, dId, lockMode);
                switch (escalateStatus) {
                    case DO_ESCALATE:
                        //treat this request as a dataset-granule request from here on
                        entityHashValue = -1;
                        doEscalate = true;
                        break;

                    case ESCALATED:
                        //a dataset-level S lock is already held; nothing more to do
                        unlatchLockTable();
                        return;

                    default:
                        break;
                }
            }
        }

        //#. if the datasetLockInfo doesn't exist in datasetResourceHT
        if (dLockInfo == null || dLockInfo.isNoHolder()) {
            if (dLockInfo == null) {
                dLockInfo = new DatasetLockInfo(entityLockInfoManager, entityInfoManager, lockWaiterManager);
                datasetResourceHT.put(new DatasetId(dId), dLockInfo); //datsetId obj should be created
            }
            entityInfo = entityInfoManager.allocate(jId, dId, entityHashValue, lockMode);

            //if dataset-granule lock
            if (entityHashValue == -1) { //-1 stands for dataset-granule
                entityInfoManager.increaseDatasetLockCount(entityInfo);
                dLockInfo.increaseLockCount(datasetLockMode);
                dLockInfo.addHolder(entityInfo);
            } else {
                entityInfoManager.increaseDatasetLockCount(entityInfo);
                dLockInfo.increaseLockCount(datasetLockMode);
                //add entityLockInfo
                eLockInfo = entityLockInfoManager.allocate();
                dLockInfo.getEntityResourceHT().put(entityHashValue, eLockInfo);
                entityInfoManager.increaseEntityLockCount(entityInfo);
                entityLockInfoManager.increaseLockCount(eLockInfo, lockMode);
                entityLockInfoManager.addHolder(eLockInfo, entityInfo);
            }

            if (jobInfo == null) {
                jobInfo = new JobInfo(entityInfoManager, lockWaiterManager, txnContext);
                jobHT.put(jobId, jobInfo); //jobId obj doesn't have to be created
            }
            jobInfo.addHoldingResource(entityInfo);

            if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
                if (!isInstant && datasetLockMode == LockMode.IS) {
                    jobInfo.increaseDatasetISLockCount(dId);
                    //doEscalate can only be true if jobInfo and dLockInfo existed above,
                    //which contradicts reaching this no-holder branch: threshold misconfiguration
                    if (doEscalate) {
                        throw new IllegalStateException("ESCALATE_TRHESHOLD_ENTITY_TO_DATASET should not be set to "
                                + ESCALATE_TRHESHOLD_ENTITY_TO_DATASET);
                    }
                }
            }

            if (IS_DEBUG_MODE) {
                trackLockRequest("Granted", RequestType.LOCK, datasetId, entityHashValue, lockMode, txnContext,
                        dLockInfo, eLockInfo);
            }

            unlatchLockTable();
            return;
        }

        //#. the datasetLockInfo exists in datasetResourceHT.
        //1. handle dataset-granule lock
        entityInfo = lockDatasetGranule(datasetId, entityHashValue, lockMode, txnContext);

        //2. handle entity-granule lock
        if (entityHashValue != -1) {
            lockEntityGranule(datasetId, entityHashValue, lockMode, entityInfo, txnContext);
        }

        if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
            if (!isInstant) {
                if (doEscalate) {
                    //jobInfo must not be null.
                    assert jobInfo != null;
                    jobInfo.increaseDatasetISLockCount(dId);
                    //release pre-acquired locks
                    releaseDatasetISLocks(jobInfo, jobId, datasetId, txnContext);
                } else if (datasetLockMode == LockMode.IS) {
                    if (jobInfo == null) {
                        jobInfo = jobHT.get(jobId);
                        //jobInfo must not be null;
                        assert jobInfo != null;
                    }
                    jobInfo.increaseDatasetISLockCount(dId);
                }
            }
        }

        if (IS_DEBUG_MODE) {
            trackLockRequest("Granted", RequestType.LOCK, datasetId, entityHashValue, lockMode, txnContext, dLockInfo,
                    eLockInfo);
        }
        unlatchLockTable();
        return;
    }
| |
| private void releaseDatasetISLocks(JobInfo jobInfo, JobId jobId, DatasetId datasetId, TransactionContext txnContext) |
| throws ACIDException { |
| int entityInfo; |
| int prevEntityInfo; |
| int entityHashValue; |
| int did;//int-type dataset id |
| |
| //while traversing all holding resources, |
| //release IS locks on the escalated dataset and |
| //release S locks on the corresponding enttites |
| //by calling unlock() method. |
| entityInfo = jobInfo.getLastHoldingResource(); |
| while (entityInfo != -1) { |
| prevEntityInfo = entityInfoManager.getPrevJobResource(entityInfo); |
| |
| //release a lock only if the datset is the escalated dataset and |
| //the entityHashValue is not -1("not -1" means a non-dataset-level lock) |
| did = entityInfoManager.getDatasetId(entityInfo); |
| entityHashValue = entityInfoManager.getPKHashVal(entityInfo); |
| if (did == datasetId.getId() && entityHashValue != -1) { |
| this.unlock(datasetId, entityHashValue, txnContext); |
| } |
| |
| entityInfo = prevEntityInfo; |
| } |
| } |
| |
| private int needEscalateFromEntityToDataset(JobInfo jobInfo, int datasetId, byte lockMode) { |
| //we currently allow upgrade only if the lockMode is S. |
| if (lockMode != LockMode.S) { |
| return DONOT_ESCALATE; |
| } |
| |
| int count = jobInfo.getDatasetISLockCount(datasetId); |
| if (count == ESCALATE_TRHESHOLD_ENTITY_TO_DATASET) { |
| return DO_ESCALATE; |
| } else if (count > ESCALATE_TRHESHOLD_ENTITY_TO_DATASET) { |
| return ESCALATED; |
| } else { |
| return DONOT_ESCALATE; |
| } |
| } |
| |
    /**
     * Verifies that the requesting transaction may proceed. Must be called
     * with the lock-table latch held. For an ABORTED transaction the latch is
     * released and an ACIDException is thrown; for a timed-out transaction an
     * abort is requested and the latch is released in the finally block.
     */
    private void validateJob(TransactionContext txnContext) throws ACIDException {
        if (txnContext.getTxnState() == TransactionState.ABORTED) {
            unlatchLockTable();
            throw new ACIDException("" + txnContext.getJobId() + " is in ABORTED state.");
        } else if (txnContext.getStatus() == TransactionContext.TIMED_OUT_STATUS) {
            try {
                requestAbort(txnContext);
            } finally {
                unlatchLockTable();
            }
            //NOTE(review): callers keep using the lock table after this method returns,
            //so requestAbort() is presumably expected to always throw here -- confirm;
            //otherwise the table would be accessed without holding the latch.
        }
    }
| |
    /**
     * Handles the dataset-granule part of a lock request: the S/X lock itself
     * for a dataset-level request (entityHashValue == -1), or the intention
     * lock (IS/IX) for an entity-level request. Detects duplicated calls from
     * threads of the same job (waiting upgraders, plain waiters, and existing
     * holders) so that lock counts are updated exactly once on behalf of all
     * waiting threads. Assumes the lock-table latch is held and that the
     * DatasetLockInfo for datasetId already exists in datasetResourceHT.
     *
     * @return slot id of the entityInfo representing this (job, dataset, entity) request
     * @throws ACIDException propagated from waiting/abort handling
     */
    private int lockDatasetGranule(DatasetId datasetId, int entityHashValue, byte lockMode,
            TransactionContext txnContext) throws ACIDException {
        JobId jobId = txnContext.getJobId();
        int jId = jobId.getId(); //int-type jobId
        int dId = datasetId.getId(); //int-type datasetId
        int waiterObjId;
        int entityInfo = -1;
        DatasetLockInfo dLockInfo;
        JobInfo jobInfo;
        boolean isUpgrade = false;
        int weakerModeLockCount;
        int waiterCount = 0;
        //dataset-level mode: the request itself for a dataset granule, otherwise
        //the corresponding intention mode (S -> IS, X -> IX)
        byte datasetLockMode = entityHashValue == -1 ? lockMode : lockMode == LockMode.S ? LockMode.IS : LockMode.IX;

        dLockInfo = datasetResourceHT.get(datasetId);
        jobInfo = jobHT.get(jobId);

        //check duplicated call

        //1. lock request causing duplicated upgrading requests from different threads in a same job
        waiterObjId = dLockInfo.findUpgraderFromUpgraderList(jId, entityHashValue);
        if (waiterObjId != -1) {
            //make the caller wait on the same LockWaiter object
            entityInfo = lockWaiterManager.getLockWaiter(waiterObjId).getEntityInfoSlot();
            waiterCount = handleLockWaiter(dLockInfo, -1, entityInfo, true, true, txnContext, jobInfo, waiterObjId);

            //Only for the first-get-up thread, the waiterCount will be more than 0 and
            //the thread updates lock count on behalf of the all other waiting threads.
            //Therefore, all the next-get-up threads will not update any lock count.
            if (waiterCount > 0) {
                //add ((the number of waiting upgrader) - 1) to entityInfo's dataset lock count and datasetLockInfo's lock count
                //where -1 is for not counting the first upgrader's request since the lock count for the first upgrader's request
                //is already counted.
                weakerModeLockCount = entityInfoManager.getDatasetLockCount(entityInfo);
                entityInfoManager.setDatasetLockMode(entityInfo, lockMode);
                entityInfoManager.increaseDatasetLockCount(entityInfo, waiterCount - 1);

                if (entityHashValue == -1) { //dataset-granule lock
                    dLockInfo.increaseLockCount(LockMode.X, weakerModeLockCount + waiterCount - 1);//new lock mode
                    dLockInfo.decreaseLockCount(LockMode.S, weakerModeLockCount);//current lock mode
                } else {
                    dLockInfo.increaseLockCount(LockMode.IX, weakerModeLockCount + waiterCount - 1);
                    dLockInfo.decreaseLockCount(LockMode.IS, weakerModeLockCount);
                }
            }

            return entityInfo;
        }

        //2. lock request causing duplicated waiting requests from different threads in a same job
        waiterObjId = dLockInfo.findWaiterFromWaiterList(jId, entityHashValue);
        if (waiterObjId != -1) {
            //make the caller wait on the same LockWaiter object
            entityInfo = lockWaiterManager.getLockWaiter(waiterObjId).getEntityInfoSlot();
            waiterCount = handleLockWaiter(dLockInfo, -1, entityInfo, false, true, txnContext, jobInfo, waiterObjId);

            if (waiterCount > 0) {
                entityInfoManager.increaseDatasetLockCount(entityInfo, waiterCount);
                if (entityHashValue == -1) {
                    dLockInfo.increaseLockCount(datasetLockMode, waiterCount);
                    dLockInfo.addHolder(entityInfo);
                } else {
                    dLockInfo.increaseLockCount(datasetLockMode, waiterCount);
                    //IS and IX holders are implicitly handled.
                }
                //add entityInfo to JobInfo's holding-resource list
                jobInfo.addHoldingResource(entityInfo);
            }

            return entityInfo;
        }

        //3. lock request causing duplicated holding requests from different threads or a single thread in a same job
        entityInfo = dLockInfo.findEntityInfoFromHolderList(jId, entityHashValue);
        if (entityInfo == -1) {

            //no existing holder entry for this (job, entity): allocate a fresh one
            entityInfo = entityInfoManager.allocate(jId, dId, entityHashValue, lockMode);
            if (jobInfo == null) {
                jobInfo = new JobInfo(entityInfoManager, lockWaiterManager, txnContext);
                jobHT.put(jobId, jobInfo);
            }

            //wait if any upgrader exists or upgrading lock mode is not compatible
            if (dLockInfo.getFirstUpgrader() != -1 || dLockInfo.getFirstWaiter() != -1
                    || !dLockInfo.isCompatible(datasetLockMode)) {

                /////////////////////////////////////////////////////////////////////////////////////////////
                //[Notice] Mimicking SIX mode
                //When the lock escalation from IS to S in dataset-level is allowed, the following case occurs
                //DatasetLockInfo's SCount = 1 and the same job who carried out the escalation tries to insert,
                //then the job should be able to insert without being blocked by itself.
                //Our approach is to introduce SIX mode, but we don't have currently,
                //so I simply mimicking SIX by allowing S and IX coexist in the dataset level
                //only if their job id is identical for the requests.
                if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
                    if (datasetLockMode == LockMode.IX && dLockInfo.getSCount() == 1
                            && jobInfo.isDatasetLockGranted(dId, LockMode.S)) {
                        entityInfoManager.increaseDatasetLockCount(entityInfo);
                        //IX holders are implicitly handled without adding holder
                        dLockInfo.increaseLockCount(datasetLockMode);
                        //add entityInfo to JobInfo's holding-resource list
                        jobInfo.addHoldingResource(entityInfo);
                        return entityInfo;
                    }
                }
                ///////////////////////////////////////////////////////////////////////////////////////////////

                /////////////////////////////////////////////////////////////////////////////////////////////
                //[Notice]
                //There has been no same caller as (jId, dId, entityHashValue) triplet.
                //But there could be the same caller as (jId, dId) pair.
                //For example, two requests (J1, D1, E1) and (J1, D1, E2) are considered as duplicated call in dataset-granule perspective.
                //Therefore, the above duplicated call case is covered in the following code.
                //find the same dataset-granule lock request, that is, (J1, D1) pair in the above example.
                //if (jobInfo.isDatasetLockGranted(dId, datasetLockMode)) {
                if (jobInfo.isDatasetLockGranted(dId, LockMode.IS)) {
                    if (dLockInfo.isCompatible(datasetLockMode)) {
                        //this is duplicated call
                        entityInfoManager.increaseDatasetLockCount(entityInfo);
                        if (entityHashValue == -1) {
                            dLockInfo.increaseLockCount(datasetLockMode);
                            dLockInfo.addHolder(entityInfo);
                        } else {
                            dLockInfo.increaseLockCount(datasetLockMode);
                            //IS and IX holders are implicitly handled.
                        }
                        //add entityInfo to JobInfo's holding-resource list
                        jobInfo.addHoldingResource(entityInfo);

                        return entityInfo;
                    } else {
                        //considered as upgrader
                        waiterCount = handleLockWaiter(dLockInfo, -1, entityInfo, true, true, txnContext, jobInfo, -1);
                        if (waiterCount > 0) {
                            entityInfoManager.increaseDatasetLockCount(entityInfo);
                            if (entityHashValue == -1) {
                                dLockInfo.increaseLockCount(datasetLockMode);
                                dLockInfo.addHolder(entityInfo);
                            } else {
                                dLockInfo.increaseLockCount(datasetLockMode);
                                //IS and IX holders are implicitly handled.
                            }
                            //add entityInfo to JobInfo's holding-resource list
                            jobInfo.addHoldingResource(entityInfo);
                        }
                        return entityInfo;
                    }
                }
                /////////////////////////////////////////////////////////////////////////////////////////////

                waiterCount = handleLockWaiter(dLockInfo, -1, entityInfo, false, true, txnContext, jobInfo, -1);
            } else {
                //no conflict: the request is granted immediately on behalf of one caller
                waiterCount = 1;
            }

            if (waiterCount > 0) {
                entityInfoManager.increaseDatasetLockCount(entityInfo);
                if (entityHashValue == -1) {
                    dLockInfo.increaseLockCount(datasetLockMode);
                    dLockInfo.addHolder(entityInfo);
                } else {
                    dLockInfo.increaseLockCount(datasetLockMode);
                    //IS and IX holders are implicitly handled.
                }
                //add entityInfo to JobInfo's holding-resource list
                jobInfo.addHoldingResource(entityInfo);
            }
        } else {
            //this job already holds a dataset-granule lock: either an upgrade or a duplicated call
            isUpgrade = isLockUpgrade(entityInfoManager.getDatasetLockMode(entityInfo), lockMode);
            if (isUpgrade) { //upgrade call
                //wait if any upgrader exists or upgrading lock mode is not compatible
                if (dLockInfo.getFirstUpgrader() != -1 || !dLockInfo.isUpgradeCompatible(datasetLockMode, entityInfo)) {
                    waiterCount = handleLockWaiter(dLockInfo, -1, entityInfo, true, true, txnContext, jobInfo, -1);
                } else {
                    waiterCount = 1;
                }

                if (waiterCount > 0) {
                    //add ((the number of waiting upgrader) - 1) to entityInfo's dataset lock count and datasetLockInfo's lock count
                    //where -1 is for not counting the first upgrader's request since the lock count for the first upgrader's request
                    //is already counted.
                    weakerModeLockCount = entityInfoManager.getDatasetLockCount(entityInfo);
                    entityInfoManager.setDatasetLockMode(entityInfo, lockMode);
                    entityInfoManager.increaseDatasetLockCount(entityInfo, waiterCount - 1);

                    if (entityHashValue == -1) { //dataset-granule lock
                        dLockInfo.increaseLockCount(LockMode.X, weakerModeLockCount + waiterCount - 1);//new lock mode
                        dLockInfo.decreaseLockCount(LockMode.S, weakerModeLockCount);//current lock mode
                    } else {
                        dLockInfo.increaseLockCount(LockMode.IX, weakerModeLockCount + waiterCount - 1);
                        dLockInfo.decreaseLockCount(LockMode.IS, weakerModeLockCount);
                    }
                }
            } else { //duplicated call
                entityInfoManager.increaseDatasetLockCount(entityInfo);
                datasetLockMode = entityInfoManager.getDatasetLockMode(entityInfo);

                if (entityHashValue == -1) { //dataset-granule
                    dLockInfo.increaseLockCount(datasetLockMode);
                } else { //entity-granule
                    datasetLockMode = datasetLockMode == LockMode.S ? LockMode.IS : LockMode.IX;
                    dLockInfo.increaseLockCount(datasetLockMode);
                }
            }
        }

        return entityInfo;
    }
| |
    /**
     * Handles the entity-granule part of a lock request, after the
     * dataset-granule (intention) lock has been acquired by
     * lockDatasetGranule(). Detects duplicated calls from threads of the same
     * job (waiting upgraders, plain waiters, existing holders) so that lock
     * counts are updated exactly once on behalf of all waiting threads. If no
     * EntityLockInfo exists for the entity yet, the request is granted
     * immediately. Assumes the lock-table latch is held.
     *
     * @param entityInfoFromDLockInfo entityInfo slot returned by lockDatasetGranule for this request
     * @throws ACIDException propagated from waiting/abort handling
     */
    private void lockEntityGranule(DatasetId datasetId, int entityHashValue, byte lockMode,
            int entityInfoFromDLockInfo, TransactionContext txnContext) throws ACIDException {
        JobId jobId = txnContext.getJobId();
        int jId = jobId.getId(); //int-type jobId
        int waiterObjId;
        int eLockInfo = -1;
        int entityInfo;
        DatasetLockInfo dLockInfo;
        JobInfo jobInfo;
        boolean isUpgrade = false;
        int waiterCount = 0;
        int weakerModeLockCount;

        dLockInfo = datasetResourceHT.get(datasetId);
        jobInfo = jobHT.get(jobId);
        eLockInfo = dLockInfo.getEntityResourceHT().get(entityHashValue);

        if (eLockInfo != -1) {
            //check duplicated call

            //1. lock request causing duplicated upgrading requests from different threads in a same job
            waiterObjId = entityLockInfoManager.findUpgraderFromUpgraderList(eLockInfo, jId, entityHashValue);
            if (waiterObjId != -1) {
                //make the caller wait on the same LockWaiter object
                entityInfo = lockWaiterManager.getLockWaiter(waiterObjId).getEntityInfoSlot();
                waiterCount = handleLockWaiter(dLockInfo, eLockInfo, -1, true, false, txnContext, jobInfo, waiterObjId);

                //only the first-woken thread (waiterCount > 0) updates the counts
                //on behalf of all waiting upgraders
                if (waiterCount > 0) {
                    weakerModeLockCount = entityInfoManager.getEntityLockCount(entityInfo);
                    entityInfoManager.setEntityLockMode(entityInfo, LockMode.X);
                    entityInfoManager.increaseEntityLockCount(entityInfo, waiterCount - 1);

                    entityLockInfoManager.increaseLockCount(eLockInfo, LockMode.X, (short) (weakerModeLockCount
                            + waiterCount - 1));//new lock mode
                    entityLockInfoManager.decreaseLockCount(eLockInfo, LockMode.S, (short) weakerModeLockCount);//old lock mode
                }
                return;
            }

            //2. lock request causing duplicated waiting requests from different threads in a same job
            waiterObjId = entityLockInfoManager.findWaiterFromWaiterList(eLockInfo, jId, entityHashValue);
            if (waiterObjId != -1) {
                //make the caller wait on the same LockWaiter object
                entityInfo = lockWaiterManager.getLockWaiter(waiterObjId).getEntityInfoSlot();
                waiterCount = handleLockWaiter(dLockInfo, eLockInfo, -1, false, false, txnContext, jobInfo, waiterObjId);

                if (waiterCount > 0) {
                    entityInfoManager.increaseEntityLockCount(entityInfo, waiterCount);
                    entityLockInfoManager.increaseLockCount(eLockInfo, lockMode, (short) waiterCount);
                    entityLockInfoManager.addHolder(eLockInfo, entityInfo);
                }
                return;
            }

            //3. lock request causing duplicated holding requests from different threads or a single thread in a same job
            entityInfo = entityLockInfoManager.findEntityInfoFromHolderList(eLockInfo, jId, entityHashValue);
            if (entityInfo != -1) {//duplicated call or upgrader

                isUpgrade = isLockUpgrade(entityInfoManager.getEntityLockMode(entityInfo), lockMode);
                if (isUpgrade) {//upgrade call
                    //wait if any upgrader exists or upgrading lock mode is not compatible
                    if (entityLockInfoManager.getUpgrader(eLockInfo) != -1
                            || !entityLockInfoManager.isUpgradeCompatible(eLockInfo, lockMode, entityInfo)) {
                        waiterCount = handleLockWaiter(dLockInfo, eLockInfo, entityInfo, true, false, txnContext,
                                jobInfo, -1);
                    } else {
                        waiterCount = 1;
                    }

                    if (waiterCount > 0) {
                        //see comment in case 1 above: counts of the first upgrader
                        //are already included, hence (waiterCount - 1)
                        weakerModeLockCount = entityInfoManager.getEntityLockCount(entityInfo);
                        entityInfoManager.setEntityLockMode(entityInfo, lockMode);
                        entityInfoManager.increaseEntityLockCount(entityInfo, waiterCount - 1);

                        entityLockInfoManager.increaseLockCount(eLockInfo, LockMode.X, (short) (weakerModeLockCount
                                + waiterCount - 1));//new lock mode
                        entityLockInfoManager.decreaseLockCount(eLockInfo, LockMode.S, (short) weakerModeLockCount);//old lock mode
                    }

                } else {//duplicated call
                    entityInfoManager.increaseEntityLockCount(entityInfo);
                    entityLockInfoManager.increaseLockCount(eLockInfo, entityInfoManager.getEntityLockMode(entityInfo));
                }
            } else {//new call from this job, but still eLockInfo exists since other threads hold it or wait on it
                entityInfo = entityInfoFromDLockInfo;
                if (entityLockInfoManager.getUpgrader(eLockInfo) != -1
                        || entityLockInfoManager.getFirstWaiter(eLockInfo) != -1
                        || !entityLockInfoManager.isCompatible(eLockInfo, lockMode)) {
                    waiterCount = handleLockWaiter(dLockInfo, eLockInfo, entityInfo, false, false, txnContext, jobInfo,
                            -1);
                } else {
                    waiterCount = 1;
                }

                if (waiterCount > 0) {
                    entityInfoManager.increaseEntityLockCount(entityInfo, waiterCount);
                    entityLockInfoManager.increaseLockCount(eLockInfo, lockMode, (short) waiterCount);
                    entityLockInfoManager.addHolder(eLockInfo, entityInfo);
                }
            }
        } else {//eLockInfo doesn't exist, so this lock request is the first request and can be granted without waiting.
            eLockInfo = entityLockInfoManager.allocate();
            dLockInfo.getEntityResourceHT().put(entityHashValue, eLockInfo);
            entityInfoManager.increaseEntityLockCount(entityInfoFromDLockInfo);
            entityLockInfoManager.increaseLockCount(eLockInfo, lockMode);
            entityLockInfoManager.addHolder(eLockInfo, entityInfoFromDLockInfo);
        }
    }
| |
    /**
     * Releases an entity-level lock. Delegates to internalUnlock with
     * isInstant == false and commitFlag == false.
     */
    @Override
    public void unlock(DatasetId datasetId, int entityHashValue, TransactionContext txnContext) throws ACIDException {
        internalUnlock(datasetId, entityHashValue, txnContext, false, false);
    }
| |
    /**
     * Releases an entity-level lock, optionally writing an ENTITY_COMMIT log
     * record (commitFlag) when the job no longer holds the entity. Delegates
     * to internalUnlock with isInstant == false.
     */
    @Override
    public void unlock(DatasetId datasetId, int entityHashValue, TransactionContext txnContext, boolean commitFlag)
            throws ACIDException {
        internalUnlock(datasetId, entityHashValue, txnContext, false, commitFlag);
    }
| |
    /**
     * Releases a lock that was acquired as an instant lock; skips the
     * dataset-IS-lock-count bookkeeping used for escalation.
     */
    private void instantUnlock(DatasetId datasetId, int entityHashValue, TransactionContext txnContext) throws ACIDException {
        internalUnlock(datasetId, entityHashValue, txnContext, true, false);
    }
| |
    /**
     * Core unlock routine. Releases one entity-level lock held by the
     * transaction (dataset-granule unlock, entityHashValue == -1, is not
     * supported), decrements the dataset/entity/entityInfo lock counts, wakes
     * up eligible waiters, and deallocates the entityInfo and eLockInfo slots
     * once no thread of the job holds or waits on them. When commitFlag is
     * set and the job's counts on the entity reach zero, an ENTITY_COMMIT log
     * record is written for READ_WRITE transactions. The whole lock table is
     * latched for the duration of the call.
     *
     * @throws ACIDException if the transaction is invalid or must abort
     */
    private void internalUnlock(DatasetId datasetId, int entityHashValue, TransactionContext txnContext,
            boolean isInstant, boolean commitFlag) throws ACIDException {
        JobId jobId = txnContext.getJobId();
        int eLockInfo = -1;
        DatasetLockInfo dLockInfo = null;
        JobInfo jobInfo;
        int entityInfo = -1;
        byte datasetLockMode;

        if (IS_DEBUG_MODE) {
            if (entityHashValue == -1) {
                throw new UnsupportedOperationException(
                        "Unsupported unlock request: dataset-granule unlock is not supported");
            }
        }

        latchLockTable();
        validateJob(txnContext);

        if (IS_DEBUG_MODE) {
            trackLockRequest("Requested", RequestType.UNLOCK, datasetId, entityHashValue, (byte) 0, txnContext,
                    dLockInfo, eLockInfo);
        }

        //find the resource to be unlocked
        dLockInfo = datasetResourceHT.get(datasetId);
        jobInfo = jobHT.get(jobId);
        if (dLockInfo == null || jobInfo == null) {
            unlatchLockTable();
            throw new IllegalStateException("Invalid unlock request: Corresponding lock info doesn't exist.");
        }

        eLockInfo = dLockInfo.getEntityResourceHT().get(entityHashValue);

        if (eLockInfo == -1) {
            unlatchLockTable();
            throw new IllegalStateException("Invalid unlock request: Corresponding lock info doesn't exist.");
        }

        //find the corresponding entityInfo
        entityInfo = entityLockInfoManager.findEntityInfoFromHolderList(eLockInfo, jobId.getId(), entityHashValue);
        if (entityInfo == -1) {
            unlatchLockTable();
            throw new IllegalStateException("Invalid unlock request[" + jobId.getId() + "," + datasetId.getId() + ","
                    + entityHashValue + "]: Corresponding lock info doesn't exist.");
        }

        //the dataset-level count to decrement is the intention mode matching the held mode
        datasetLockMode = entityInfoManager.getDatasetLockMode(entityInfo) == LockMode.S ? LockMode.IS : LockMode.IX;

        //decrease the corresponding count of dLockInfo/eLockInfo/entityInfo
        dLockInfo.decreaseLockCount(datasetLockMode);
        entityLockInfoManager.decreaseLockCount(eLockInfo, entityInfoManager.getEntityLockMode(entityInfo));
        entityInfoManager.decreaseDatasetLockCount(entityInfo);
        entityInfoManager.decreaseEntityLockCount(entityInfo);

        if (entityInfoManager.getEntityLockCount(entityInfo) == 0
                && entityInfoManager.getDatasetLockCount(entityInfo) == 0) {
            int threadCount = 0; //number of threads(in the same job) waiting on the same resource
            int waiterObjId = jobInfo.getFirstWaitingResource();
            int waitingEntityInfo;
            LockWaiter waiterObj;

            //TODO
            //This code should be taken care properly when there is a way to avoid doubling memory space for txnIds.
            //This commit log is written here in order to avoid increasing the memory space for managing transactionIds
            if (commitFlag) {
                if (txnContext.getTransactionType().equals(TransactionContext.TransactionType.READ_WRITE)) {
                    try {
                        txnSubsystem.getLogManager().log(LogType.ENTITY_COMMIT, txnContext, datasetId.getId(),
                                entityHashValue, -1, (byte) 0, 0, null, null, logicalLogLocator);
                    } catch (ACIDException e) {
                        try {
                            requestAbort(txnContext);
                        } finally {
                            unlatchLockTable();
                        }
                        //NOTE(review): if requestAbort() returns normally, execution falls
                        //through to updateLastLSNForIndexes and a second unlatchLockTable at
                        //the end of this method -- presumably requestAbort always throws; confirm.
                    }
                }

                txnContext.updateLastLSNForIndexes(logicalLogLocator.getLsn());
            }

            //1) wake up waiters and remove holder
            //wake up waiters of dataset-granule lock
            wakeUpDatasetLockWaiters(dLockInfo);
            //wake up waiters of entity-granule lock
            wakeUpEntityLockWaiters(eLockInfo);
            //remove the holder from eLockInfo's holder list and remove the holding resource from jobInfo's holding resource list
            //this can be done in the following single function call.
            entityLockInfoManager.removeHolder(eLockInfo, entityInfo, jobInfo);

            //2) if
            //      there is no waiting thread on the same resource (this can be checked through jobInfo)
            //   then
            //      a) delete the corresponding entityInfo
            //      b) write commit log for the unlocked resource(which is a committed txn).
            while (waiterObjId != -1) {
                waiterObj = lockWaiterManager.getLockWaiter(waiterObjId);
                waitingEntityInfo = waiterObj.getEntityInfoSlot();
                if (entityInfoManager.getDatasetId(waitingEntityInfo) == datasetId.getId()
                        && entityInfoManager.getPKHashVal(waitingEntityInfo) == entityHashValue) {
                    threadCount++;
                    break;
                }
                waiterObjId = waiterObj.getNextWaiterObjId();
            }
            if (threadCount == 0) {
                if (entityInfoManager.getEntityLockMode(entityInfo) == LockMode.X) {
                    //TODO
                    //write a commit log for the unlocked resource
                    //need to figure out that instantLock() also needs to write a commit log.
                }
                entityInfoManager.deallocate(entityInfo);
            }
        }

        //deallocate entityLockInfo's slot if there is no txn referring to the entityLockInfo.
        if (entityLockInfoManager.getFirstWaiter(eLockInfo) == -1
                && entityLockInfoManager.getLastHolder(eLockInfo) == -1
                && entityLockInfoManager.getUpgrader(eLockInfo) == -1) {
            dLockInfo.getEntityResourceHT().remove(entityHashValue);
            entityLockInfoManager.deallocate(eLockInfo);
        }

        //we don't deallocate datasetLockInfo even if there is no txn referring to the datasetLockInfo
        //since the datasetLockInfo is likely to be referred to again.

        if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
            if (!isInstant && datasetLockMode == LockMode.IS) {
                jobInfo.decreaseDatasetISLockCount(datasetId.getId());
            }
        }

        if (IS_DEBUG_MODE) {
            trackLockRequest("Granted", RequestType.UNLOCK, datasetId, entityHashValue, (byte) 0, txnContext,
                    dLockInfo, eLockInfo);
        }
        unlatchLockTable();
    }
| |
    /**
     * Releases every lock held or waited on by the given transaction and removes its
     * JobInfo from the job hash table. Called at commit/abort time.
     * <p>
     * Processing order (under the lock-table latch):
     * 1) remove and wake up all waiter/upgrader objects registered for this job,
     * 2) walk the job's holding-resource list, decreasing dataset- and entity-level
     *    lock counts, waking up blocked requesters and deallocating per-entity state,
     * 3) remove the JobInfo itself.
     * If any waiter object existed, the caller must be an aborting transaction
     * (see the in-line notice below), so the context is marked TIMED_OUT/ABORTED.
     *
     * @param txnContext the transaction whose locks are released
     * @throws ACIDException declared for interface compatibility
     */
    @Override
    public void releaseLocks(TransactionContext txnContext) throws ACIDException {
        LockWaiter waiterObj;
        int entityInfo;
        int prevEntityInfo;
        int entityHashValue;
        DatasetLockInfo dLockInfo = null;
        int eLockInfo = -1;
        int did;//int-type dataset id
        int datasetLockCount;
        int entityLockCount;
        byte lockMode;
        boolean existWaiter = false;

        JobId jobId = txnContext.getJobId();

        latchLockTable();

        if (IS_DEBUG_MODE) {
            trackLockRequest("Requested", RequestType.RELEASE_LOCKS, new DatasetId(0), 0, (byte) 0, txnContext,
                    dLockInfo, eLockInfo);
        }

        JobInfo jobInfo = jobHT.get(jobId);
        if (jobInfo == null) {
            //the job never acquired any lock: nothing to release
            unlatchLockTable();
            return;
        }

        //remove waiterObj of JobInfo
        //[Notice]
        //waiterObjs may exist if aborted thread is the caller of this function.
        //Even if there are the waiterObjs, there is no waiting thread on the objects.
        //If the caller of this function is an aborted thread, it is guaranteed that there is no waiting threads
        //on the waiterObjs since when the aborted caller thread is waken up, all other waiting threads are
        //also waken up at the same time through 'notifyAll()' call.
        //In contrast, if the caller of this function is not an aborted thread, then there is no waiting object.
        int waiterObjId = jobInfo.getFirstWaitingResource();
        int nextWaiterObjId;
        while (waiterObjId != -1) {
            existWaiter = true;
            waiterObj = lockWaiterManager.getLockWaiter(waiterObjId);
            //capture the successor before this waiter object is deallocated below
            nextWaiterObjId = waiterObj.getNextWaitingResourceObjId();
            entityInfo = waiterObj.getEntityInfoSlot();
            if (IS_DEBUG_MODE) {
                if (jobId.getId() != entityInfoManager.getJobId(entityInfo)) {
                    throw new IllegalStateException("JobInfo(" + jobId + ") has diffrent Job(JID:"
                            + entityInfoManager.getJobId(entityInfo) + "'s lock request!!!");
                }
            }

            //1. remove from waiter(or upgrader)'s list of dLockInfo or eLockInfo.
            did = entityInfoManager.getDatasetId(entityInfo);
            tempDatasetIdObj.setId(did);
            dLockInfo = datasetResourceHT.get(tempDatasetIdObj);

            if (waiterObj.isWaitingOnEntityLock()) {
                entityHashValue = entityInfoManager.getPKHashVal(entityInfo);
                eLockInfo = dLockInfo.getEntityResourceHT().get(entityHashValue);
                if (waiterObj.isWaiter()) {
                    entityLockInfoManager.removeWaiter(eLockInfo, waiterObjId);
                } else {
                    entityLockInfoManager.removeUpgrader(eLockInfo, waiterObjId);
                }
            } else {
                if (waiterObj.isWaiter()) {
                    dLockInfo.removeWaiter(waiterObjId);
                } else {
                    dLockInfo.removeUpgrader(waiterObjId);
                }
            }

            //2. wake-up waiters
            //latch order: waiter-notify latch is taken before entering the monitor,
            //and released inside it, to serialize with handleLockWaiter()'s wait entry.
            latchWaitNotify();
            synchronized (waiterObj) {
                unlatchWaitNotify();
                waiterObj.setWait(false);
                if (IS_DEBUG_MODE) {
                    System.out.println("" + Thread.currentThread().getName() + "\twake-up(D): WID(" + waiterObjId
                            + "),EID(" + waiterObj.getEntityInfoSlot() + ")");
                }
                waiterObj.notifyAll();
            }

            //3. deallocate waiterObj
            lockWaiterManager.deallocate(waiterObjId);

            //4. deallocate entityInfo only if this waiter is not an upgrader
            if (entityInfoManager.getDatasetLockCount(entityInfo) == 0
                    && entityInfoManager.getEntityLockCount(entityInfo) == 0) {
                entityInfoManager.deallocate(entityInfo);
            }
            waiterObjId = nextWaiterObjId;
        }

        //release holding resources
        //walk the holding-resource list from the tail; each entityInfo is deallocated
        //after its lock counts have been rolled back and its waiters woken up.
        entityInfo = jobInfo.getLastHoldingResource();
        while (entityInfo != -1) {
            prevEntityInfo = entityInfoManager.getPrevJobResource(entityInfo);

            //decrease lock count of datasetLock and entityLock
            did = entityInfoManager.getDatasetId(entityInfo);
            tempDatasetIdObj.setId(did);
            dLockInfo = datasetResourceHT.get(tempDatasetIdObj);
            entityHashValue = entityInfoManager.getPKHashVal(entityInfo);

            if (entityHashValue == -1) {
                //decrease datasetLockCount
                lockMode = entityInfoManager.getDatasetLockMode(entityInfo);
                datasetLockCount = entityInfoManager.getDatasetLockCount(entityInfo);
                if (datasetLockCount != 0) {
                    dLockInfo.decreaseLockCount(lockMode, datasetLockCount);

                    //wakeup waiters of datasetLock and remove holder from datasetLockInfo
                    wakeUpDatasetLockWaiters(dLockInfo);

                    //remove the holder from datasetLockInfo only if the lock is dataset-granule lock.
                    //--> this also removes the holding resource from jobInfo
                    //(Because the IX and IS lock's holders are handled implicitly,
                    //those are not in the holder list of datasetLockInfo.)
                    dLockInfo.removeHolder(entityInfo, jobInfo);
                }
            } else {
                //decrease datasetLockCount
                //an entity-granule lock implicitly holds an intention (IS/IX) dataset lock
                lockMode = entityInfoManager.getDatasetLockMode(entityInfo);
                lockMode = lockMode == LockMode.S ? LockMode.IS : LockMode.IX;
                datasetLockCount = entityInfoManager.getDatasetLockCount(entityInfo);

                if (datasetLockCount != 0) {
                    dLockInfo.decreaseLockCount(lockMode, datasetLockCount);
                }

                //decrease entityLockCount
                lockMode = entityInfoManager.getEntityLockMode(entityInfo);
                entityLockCount = entityInfoManager.getEntityLockCount(entityInfo);
                eLockInfo = dLockInfo.getEntityResourceHT().get(entityHashValue);
                if (IS_DEBUG_MODE) {
                    if (eLockInfo < 0) {
                        System.out.println("eLockInfo:" + eLockInfo);
                    }
                }

                if (entityLockCount != 0) {
                    entityLockInfoManager.decreaseLockCount(eLockInfo, lockMode, (short) entityLockCount);
                }

                if (datasetLockCount != 0) {
                    //wakeup waiters of datasetLock and don't remove holder from datasetLockInfo
                    wakeUpDatasetLockWaiters(dLockInfo);
                }

                if (entityLockCount != 0) {
                    //wakeup waiters of entityLock
                    wakeUpEntityLockWaiters(eLockInfo);

                    //remove the holder from entityLockInfo
                    //--> this also removes the holding resource from jobInfo
                    entityLockInfoManager.removeHolder(eLockInfo, entityInfo, jobInfo);
                }

                //deallocate entityLockInfo if there is no holder and waiter.
                if (entityLockInfoManager.getLastHolder(eLockInfo) == -1
                        && entityLockInfoManager.getFirstWaiter(eLockInfo) == -1
                        && entityLockInfoManager.getUpgrader(eLockInfo) == -1) {
                    dLockInfo.getEntityResourceHT().remove(entityHashValue);
                    entityLockInfoManager.deallocate(eLockInfo);
                }
            }

            //deallocate entityInfo
            entityInfoManager.deallocate(entityInfo);

            entityInfo = prevEntityInfo;
        }

        //remove JobInfo
        jobHT.remove(jobId);

        if (existWaiter) {
            //a waiter object existed, so (per the notice above) this job was aborting
            txnContext.setStatus(TransactionContext.TIMED_OUT_STATUS);
            txnContext.setTxnState(TransactionState.ABORTED);
        }

        if (IS_DEBUG_MODE) {
            trackLockRequest("Granted", RequestType.RELEASE_LOCKS, new DatasetId(0), 0, (byte) 0, txnContext,
                    dLockInfo, eLockInfo);
        }
        unlatchLockTable();
    }
| |
| @Override |
| public void instantLock(DatasetId datasetId, int entityHashValue, byte lockMode, TransactionContext txnContext) |
| throws ACIDException { |
| |
| // try { |
| // internalLock(datasetId, entityHashValue, lockMode, txnContext); |
| // return; |
| // } finally { |
| // unlock(datasetId, entityHashValue, txnContext); |
| // } |
| internalLock(datasetId, entityHashValue, lockMode, txnContext, true); |
| instantUnlock(datasetId, entityHashValue, txnContext); |
| } |
| |
    /**
     * Attempts to acquire the requested lock without blocking.
     *
     * @param datasetId       dataset to lock
     * @param entityHashValue entity hash, or -1 for a dataset-granule lock
     * @param lockMode        requested mode (S or X)
     * @param txnContext      requesting transaction
     * @return true if the lock was granted immediately, false otherwise
     * @throws ACIDException if the transaction is invalid
     */
    @Override
    public boolean tryLock(DatasetId datasetId, int entityHashValue, byte lockMode, TransactionContext txnContext)
            throws ACIDException {
        return internalTryLock(datasetId, entityHashValue, lockMode, txnContext, false);
    }
| |
| @Override |
| public boolean instantTryLock(DatasetId datasetId, int entityHashValue, byte lockMode, TransactionContext txnContext) |
| throws ACIDException { |
| boolean isGranted = false; |
| // try { |
| // isGranted = internalTryLock(datasetId, entityHashValue, lockMode, txnContext); |
| // return isGranted; |
| // } finally { |
| // if (isGranted) { |
| // unlock(datasetId, entityHashValue, txnContext); |
| // } |
| // } |
| isGranted = internalTryLock(datasetId, entityHashValue, lockMode, txnContext, true); |
| if (isGranted) { |
| instantUnlock(datasetId, entityHashValue, txnContext); |
| } |
| return isGranted; |
| } |
| |
    /**
     * Core non-blocking lock acquisition. Grants the lock immediately when possible
     * and returns false (never waits) when it cannot.
     * <p>
     * Steps, all under the lock-table latch:
     * 1) optionally escalate entity-level IS locking to a dataset-granule lock,
     * 2) fast path: if no DatasetLockInfo/holder exists yet, create all bookkeeping
     *    and grant,
     * 3) otherwise try the dataset granule first (tryLockDatasetGranule) and then,
     *    for entity requests, the entity granule (tryLockEntityGranule), reverting
     *    the dataset-granule bookkeeping if the entity-granule step fails.
     *
     * @param datasetId       dataset to lock
     * @param entityHashValue entity hash, or -1 for a dataset-granule lock
     * @param lockMode        requested mode (S or X)
     * @param txnContext      requesting transaction
     * @param isInstant       true when called for an instant-duration lock (skips
     *                        dataset-IS lock-count bookkeeping used for escalation)
     * @return true if the lock was granted, false otherwise
     * @throws ACIDException if the transaction is invalid
     */
    private boolean internalTryLock(DatasetId datasetId, int entityHashValue, byte lockMode,
            TransactionContext txnContext, boolean isInstant) throws ACIDException {
        JobId jobId = txnContext.getJobId();
        int jId = jobId.getId(); //int-type jobId
        int dId = datasetId.getId(); //int-type datasetId
        int entityInfo;
        int eLockInfo = -1;
        DatasetLockInfo dLockInfo = null;
        JobInfo jobInfo;
        //an entity-granule S/X request implies an IS/IX dataset-granule lock
        byte datasetLockMode = entityHashValue == -1 ? lockMode : lockMode == LockMode.S ? LockMode.IS : LockMode.IX;
        boolean isSuccess = false;
        boolean doEscalate = false;

        latchLockTable();
        validateJob(txnContext);

        if (IS_DEBUG_MODE) {
            trackLockRequest("Requested", RequestType.TRY_LOCK, datasetId, entityHashValue, lockMode, txnContext,
                    dLockInfo, eLockInfo);
        }

        dLockInfo = datasetResourceHT.get(datasetId);
        jobInfo = jobHT.get(jobId);

        if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
            if (!isInstant && datasetLockMode == LockMode.IS && jobInfo != null && dLockInfo != null) {
                int upgradeStatus = needEscalateFromEntityToDataset(jobInfo, dId, lockMode);
                switch (upgradeStatus) {
                    case DO_ESCALATE:
                        //escalate: convert this request into a dataset-granule request
                        entityHashValue = -1;
                        doEscalate = true;
                        break;

                    case ESCALATED:
                        //a dataset-granule lock is already held; nothing more to do
                        unlatchLockTable();
                        return true;

                    default:
                        break;
                }
            }
        }

        //#. if the datasetLockInfo doesn't exist in datasetResourceHT
        if (dLockInfo == null || dLockInfo.isNoHolder()) {
            if (dLockInfo == null) {
                dLockInfo = new DatasetLockInfo(entityLockInfoManager, entityInfoManager, lockWaiterManager);
                datasetResourceHT.put(new DatasetId(dId), dLockInfo); //datsetId obj should be created
            }
            entityInfo = entityInfoManager.allocate(jId, dId, entityHashValue, lockMode);

            //if dataset-granule lock
            if (entityHashValue == -1) { //-1 stands for dataset-granule
                entityInfoManager.increaseDatasetLockCount(entityInfo);
                dLockInfo.increaseLockCount(datasetLockMode);
                dLockInfo.addHolder(entityInfo);
            } else {
                entityInfoManager.increaseDatasetLockCount(entityInfo);
                dLockInfo.increaseLockCount(datasetLockMode);
                //add entityLockInfo
                eLockInfo = entityLockInfoManager.allocate();
                dLockInfo.getEntityResourceHT().put(entityHashValue, eLockInfo);
                entityInfoManager.increaseEntityLockCount(entityInfo);
                entityLockInfoManager.increaseLockCount(eLockInfo, lockMode);
                entityLockInfoManager.addHolder(eLockInfo, entityInfo);
            }

            if (jobInfo == null) {
                jobInfo = new JobInfo(entityInfoManager, lockWaiterManager, txnContext);
                jobHT.put(jobId, jobInfo); //jobId obj doesn't have to be created
            }
            jobInfo.addHoldingResource(entityInfo);

            if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
                if (!isInstant && datasetLockMode == LockMode.IS) {
                    jobInfo.increaseDatasetISLockCount(dId);
                    if (doEscalate) {
                        //This exception is thrown when the threshold value is set to 1.
                        //We don't want to allow the lock escalation when there is a first lock request on a dataset.
                        throw new IllegalStateException("ESCALATE_TRHESHOLD_ENTITY_TO_DATASET should not be set to "
                                + ESCALATE_TRHESHOLD_ENTITY_TO_DATASET);
                    }
                }
            }

            if (IS_DEBUG_MODE) {
                trackLockRequest("Granted", RequestType.TRY_LOCK, datasetId, entityHashValue, lockMode, txnContext,
                        dLockInfo, eLockInfo);
            }

            unlatchLockTable();
            return true;
        }

        //#. the datasetLockInfo exists in datasetResourceHT.
        //1. handle dataset-granule lock
        tryLockDatasetGranuleRevertOperation = 0;
        entityInfo = tryLockDatasetGranule(datasetId, entityHashValue, lockMode, txnContext);
        if (entityInfo == -2) {//-2 represents fail
            isSuccess = false;
        } else {
            //2. handle entity-granule lock
            if (entityHashValue != -1) {
                isSuccess = tryLockEntityGranule(datasetId, entityHashValue, lockMode, entityInfo, txnContext);
                if (!isSuccess) {
                    //undo the dataset-granule bookkeeping done by tryLockDatasetGranule
                    revertTryLockDatasetGranuleOperation(datasetId, entityHashValue, lockMode, entityInfo, txnContext);
                }
            }
        }

        if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
            if (!isInstant) {
                if (doEscalate) {
                    //jobInfo must not be null.
                    assert jobInfo != null;
                    jobInfo.increaseDatasetISLockCount(dId);
                    //release pre-acquired locks
                    releaseDatasetISLocks(jobInfo, jobId, datasetId, txnContext);
                } else if (datasetLockMode == LockMode.IS) {
                    if (jobInfo == null) {
                        jobInfo = jobHT.get(jobId);
                        //jobInfo must not be null;
                        assert jobInfo != null;
                    }
                    jobInfo.increaseDatasetISLockCount(dId);
                }
            }
        }

        if (IS_DEBUG_MODE) {
            if (isSuccess) {
                trackLockRequest("Granted", RequestType.TRY_LOCK, datasetId, entityHashValue, lockMode, txnContext,
                        dLockInfo, eLockInfo);
            } else {
                trackLockRequest("Failed", RequestType.TRY_LOCK, datasetId, entityHashValue, lockMode, txnContext,
                        dLockInfo, eLockInfo);
            }
        }

        unlatchLockTable();

        return isSuccess;
    }
| |
| private void trackLockRequest(String msg, int requestType, DatasetId datasetIdObj, int entityHashValue, |
| byte lockMode, TransactionContext txnContext, DatasetLockInfo dLockInfo, int eLockInfo) { |
| StringBuilder s = new StringBuilder(); |
| LockRequest request = new LockRequest(Thread.currentThread().getName(), requestType, datasetIdObj, |
| entityHashValue, lockMode, txnContext); |
| s.append(Thread.currentThread().getId() + ":"); |
| s.append(msg); |
| if (msg.equals("Granted")) { |
| if (dLockInfo != null) { |
| s.append("\t|D| "); |
| s.append(dLockInfo.getIXCount()).append(","); |
| s.append(dLockInfo.getISCount()).append(","); |
| s.append(dLockInfo.getXCount()).append(","); |
| s.append(dLockInfo.getSCount()).append(","); |
| if (dLockInfo.getFirstUpgrader() != -1) { |
| s.append("+"); |
| } else { |
| s.append("-"); |
| } |
| s.append(","); |
| if (dLockInfo.getFirstWaiter() != -1) { |
| s.append("+"); |
| } else { |
| s.append("-"); |
| } |
| } |
| |
| if (eLockInfo != -1) { |
| s.append("\t|E| "); |
| s.append(entityLockInfoManager.getXCount(eLockInfo)).append(","); |
| s.append(entityLockInfoManager.getSCount(eLockInfo)).append(","); |
| if (entityLockInfoManager.getUpgrader(eLockInfo) != -1) { |
| s.append("+"); |
| } else { |
| s.append("-"); |
| } |
| s.append(","); |
| if (entityLockInfoManager.getFirstWaiter(eLockInfo) != -1) { |
| s.append("+"); |
| } else { |
| s.append("-"); |
| } |
| } |
| } |
| |
| lockRequestTracker.addEvent(s.toString(), request); |
| if (msg.equals("Requested")) { |
| lockRequestTracker.addRequest(request); |
| } |
| System.out.println(request.prettyPrint() + "--> " + s.toString()); |
| } |
| |
| public String getHistoryForAllJobs() { |
| if (IS_DEBUG_MODE) { |
| return lockRequestTracker.getHistoryForAllJobs(); |
| } |
| return null; |
| } |
| |
| public String getHistoryPerJob() { |
| if (IS_DEBUG_MODE) { |
| return lockRequestTracker.getHistoryPerJob(); |
| } |
| return null; |
| } |
| |
| public String getRequestHistoryForAllJobs() { |
| if (IS_DEBUG_MODE) { |
| return lockRequestTracker.getRequestHistoryForAllJobs(); |
| } |
| return null; |
| } |
| |
    /**
     * Undoes the dataset-granule bookkeeping performed by a preceding
     * {@link #tryLockDatasetGranule} call whose entity-granule step subsequently
     * failed. The field {@code tryLockDatasetGranuleRevertOperation} selects which
     * of the three forward operations must be reverted:
     * 1 - 'adding a holder', 2 - 'adding an upgrader', 3 - 'a duplicated call'.
     * Must be called under the lock-table latch.
     */
    private void revertTryLockDatasetGranuleOperation(DatasetId datasetId, int entityHashValue, byte lockMode,
            int entityInfo, TransactionContext txnContext) {
        JobId jobId = txnContext.getJobId();
        DatasetLockInfo dLockInfo;
        JobInfo jobInfo;
        int lockCount;
        //an entity-granule S/X request maps to an IS/IX dataset-granule lock
        byte datasetLockMode = entityHashValue == -1 ? lockMode : lockMode == LockMode.S ? LockMode.IS : LockMode.IX;

        dLockInfo = datasetResourceHT.get(datasetId);
        jobInfo = jobHT.get(jobId);

        //see tryLockDatasetGranule() function to know the revert operation
        switch (tryLockDatasetGranuleRevertOperation) {

            case 1://[revertOperation1]: reverting 'adding a holder'

                if (entityHashValue == -1) {
                    dLockInfo.decreaseLockCount(datasetLockMode);
                    dLockInfo.removeHolder(entityInfo, jobInfo); //--> this call removes entityInfo from JobInfo's holding-resource-list as well.
                } else {
                    //IS/IX holders are handled implicitly, so only the count and job list are reverted
                    dLockInfo.decreaseLockCount(datasetLockMode);
                    jobInfo.removeHoldingResource(entityInfo);
                }
                entityInfoManager.decreaseDatasetLockCount(entityInfo);
                //drop the JobInfo if this was its only resource
                if (jobInfo.getLastHoldingResource() == -1 && jobInfo.getFirstWaitingResource() == -1) {
                    jobHT.remove(jobId);
                }
                entityInfoManager.deallocate(entityInfo);
                break;

            case 2://[revertOperation2]: reverting 'adding an upgrader'
                //move the lock counts back from the upgraded mode to the original weaker mode
                lockCount = entityInfoManager.getDatasetLockCount(entityInfo);
                if (entityHashValue == -1) { //dataset-granule lock
                    dLockInfo.decreaseLockCount(LockMode.X, lockCount);
                    dLockInfo.increaseLockCount(LockMode.S, lockCount);
                } else {
                    dLockInfo.decreaseLockCount(LockMode.IX, lockCount);
                    dLockInfo.increaseLockCount(LockMode.IS, lockCount);
                }
                entityInfoManager.setDatasetLockMode(entityInfo, LockMode.S);
                break;

            case 3://[revertOperation3]: reverting 'adding a duplicated call'
                entityInfoManager.decreaseDatasetLockCount(entityInfo);
                datasetLockMode = entityInfoManager.getDatasetLockMode(entityInfo);
                if (entityHashValue == -1) { //dataset-granule
                    dLockInfo.decreaseLockCount(datasetLockMode);
                } else { //entity-granule
                    datasetLockMode = datasetLockMode == LockMode.S ? LockMode.IS : LockMode.IX;
                    dLockInfo.decreaseLockCount(datasetLockMode);
                }

                break;
            default:
                //do nothing;
        }
    }
| |
    /**
     * Non-blocking acquisition of the dataset-granule part of a lock request.
     * Must be called under the lock-table latch with an existing DatasetLockInfo.
     * <p>
     * Records which forward operation it performed in
     * {@code tryLockDatasetGranuleRevertOperation} (1 = added holder, 2 = added
     * upgrader, 3 = duplicated call) so that
     * {@link #revertTryLockDatasetGranuleOperation} can undo it if the subsequent
     * entity-granule step fails.
     *
     * @return the entityInfo slot representing this request on success, or -2 when
     *         the request cannot be granted without waiting
     */
    private int tryLockDatasetGranule(DatasetId datasetId, int entityHashValue, byte lockMode,
            TransactionContext txnContext) throws ACIDException {
        JobId jobId = txnContext.getJobId();
        int jId = jobId.getId(); //int-type jobId
        int dId = datasetId.getId(); //int-type datasetId
        int waiterObjId;
        int entityInfo = -1;
        DatasetLockInfo dLockInfo;
        JobInfo jobInfo;
        boolean isUpgrade = false;
        int weakerModeLockCount;
        //an entity-granule S/X request maps to an IS/IX dataset-granule lock
        byte datasetLockMode = entityHashValue == -1 ? lockMode : lockMode == LockMode.S ? LockMode.IS : LockMode.IX;

        dLockInfo = datasetResourceHT.get(datasetId);
        jobInfo = jobHT.get(jobId);

        //check duplicated call

        //1. lock request causing duplicated upgrading requests from different threads in a same job
        waiterObjId = dLockInfo.findUpgraderFromUpgraderList(jId, entityHashValue);
        if (waiterObjId != -1) {
            return -2;
        }

        //2. lock request causing duplicated waiting requests from different threads in a same job
        waiterObjId = dLockInfo.findWaiterFromWaiterList(jId, entityHashValue);
        if (waiterObjId != -1) {
            return -2;
        }

        //3. lock request causing duplicated holding requests from different threads or a single thread in a same job
        entityInfo = dLockInfo.findEntityInfoFromHolderList(jId, entityHashValue);
        if (entityInfo == -1) { //new call from this job -> doesn't mean that eLockInfo doesn't exist since another thread might have create the eLockInfo already.

            //////////////////////////////////////////////////////////////////////////////////////
            //[part of revertOperation1]
            entityInfo = entityInfoManager.allocate(jId, dId, entityHashValue, lockMode);
            if (jobInfo == null) {
                jobInfo = new JobInfo(entityInfoManager, lockWaiterManager, txnContext);
                jobHT.put(jobId, jobInfo);
            }
            //////////////////////////////////////////////////////////////////////////////////////

            //return fail if any upgrader exists or upgrading lock mode is not compatible
            if (dLockInfo.getFirstUpgrader() != -1 || dLockInfo.getFirstWaiter() != -1
                    || !dLockInfo.isCompatible(datasetLockMode)) {

                //[Notice]
                //There has been no same caller as (jId, dId, entityHashValue) triplet.
                //But there could be the same caller as (jId, dId) pair.
                //For example, two requests (J1, D1, E1) and (J1, D1, E2) are considered as duplicated call in dataset-granule perspective.
                //Therefore, the above duplicated call case is covered in the following code.
                //find the same dataset-granule lock request, that is, (J1, D1) pair in the above example.
                if (jobInfo.isDatasetLockGranted(dId, LockMode.IS)) {
                    if (dLockInfo.isCompatible(datasetLockMode)) {
                        //this is duplicated call
                        entityInfoManager.increaseDatasetLockCount(entityInfo);
                        if (entityHashValue == -1) {
                            dLockInfo.increaseLockCount(datasetLockMode);
                            dLockInfo.addHolder(entityInfo);
                        } else {
                            dLockInfo.increaseLockCount(datasetLockMode);
                            //IS and IX holders are implicitly handled.
                        }
                        //add entityInfo to JobInfo's holding-resource list
                        jobInfo.addHoldingResource(entityInfo);

                        tryLockDatasetGranuleRevertOperation = 1;

                        return entityInfo;
                    }
                }

                //revert [part of revertOperation1] before return
                if (jobInfo.getLastHoldingResource() == -1 && jobInfo.getFirstWaitingResource() == -1) {
                    jobHT.remove(jobId);
                }
                entityInfoManager.deallocate(entityInfo);

                return -2;
            }

            //////////////////////////////////////////////////////////////////////////////////////
            //revert the following operations if the caller thread has to wait during this call.
            //[revertOperation1]
            entityInfoManager.increaseDatasetLockCount(entityInfo);
            if (entityHashValue == -1) {
                dLockInfo.increaseLockCount(datasetLockMode);
                dLockInfo.addHolder(entityInfo);
            } else {
                dLockInfo.increaseLockCount(datasetLockMode);
                //IS and IX holders are implicitly handled.
            }
            //add entityInfo to JobInfo's holding-resource list
            jobInfo.addHoldingResource(entityInfo);

            //set revert operation to be reverted when tryLock() fails
            tryLockDatasetGranuleRevertOperation = 1;
            //////////////////////////////////////////////////////////////////////////////////////

        } else {
            isUpgrade = isLockUpgrade(entityInfoManager.getDatasetLockMode(entityInfo), lockMode);
            if (isUpgrade) { //upgrade call
                //return fail if any upgrader exists or upgrading lock mode is not compatible
                if (dLockInfo.getFirstUpgrader() != -1 || !dLockInfo.isUpgradeCompatible(datasetLockMode, entityInfo)) {
                    return -2;
                }

                //update entityInfo's dataset lock count and datasetLockInfo's lock count
                weakerModeLockCount = entityInfoManager.getDatasetLockCount(entityInfo);

                //////////////////////////////////////////////////////////////////////////////////////
                //revert the following operations if the caller thread has to wait during this call.
                //[revertOperation2]
                entityInfoManager.setDatasetLockMode(entityInfo, lockMode);

                if (entityHashValue == -1) { //dataset-granule lock
                    dLockInfo.increaseLockCount(LockMode.X, weakerModeLockCount);//new lock mode
                    dLockInfo.decreaseLockCount(LockMode.S, weakerModeLockCount);//current lock mode
                } else {
                    dLockInfo.increaseLockCount(LockMode.IX, weakerModeLockCount);
                    dLockInfo.decreaseLockCount(LockMode.IS, weakerModeLockCount);
                }
                tryLockDatasetGranuleRevertOperation = 2;
                //////////////////////////////////////////////////////////////////////////////////////

            } else { //duplicated call

                //////////////////////////////////////////////////////////////////////////////////////
                //revert the following operations if the caller thread has to wait during this call.
                //[revertOperation3]
                entityInfoManager.increaseDatasetLockCount(entityInfo);
                datasetLockMode = entityInfoManager.getDatasetLockMode(entityInfo);

                if (entityHashValue == -1) { //dataset-granule
                    dLockInfo.increaseLockCount(datasetLockMode);
                } else { //entity-granule
                    datasetLockMode = datasetLockMode == LockMode.S ? LockMode.IS : LockMode.IX;
                    dLockInfo.increaseLockCount(datasetLockMode);
                }

                tryLockDatasetGranuleRevertOperation = 3;
                //////////////////////////////////////////////////////////////////////////////////////

            }
        }

        return entityInfo;
    }
| |
    /**
     * Non-blocking acquisition of the entity-granule part of a lock request, after
     * the dataset-granule part has already been granted. Must be called under the
     * lock-table latch.
     *
     * @param entityInfoFromDLockInfo the entityInfo slot returned by
     *                                {@link #tryLockDatasetGranule} for this request
     * @return true if the entity lock was granted, false if the caller would have
     *         to wait (in which case the dataset-granule bookkeeping must be
     *         reverted by the caller)
     */
    private boolean tryLockEntityGranule(DatasetId datasetId, int entityHashValue, byte lockMode,
            int entityInfoFromDLockInfo, TransactionContext txnContext) throws ACIDException {
        JobId jobId = txnContext.getJobId();
        int jId = jobId.getId(); //int-type jobId
        int waiterObjId;
        int eLockInfo = -1;
        int entityInfo;
        DatasetLockInfo dLockInfo;
        boolean isUpgrade = false;
        int weakerModeLockCount;

        dLockInfo = datasetResourceHT.get(datasetId);
        eLockInfo = dLockInfo.getEntityResourceHT().get(entityHashValue);

        if (eLockInfo != -1) {
            //check duplicated call

            //1. lock request causing duplicated upgrading requests from different threads in a same job
            waiterObjId = entityLockInfoManager.findUpgraderFromUpgraderList(eLockInfo, jId, entityHashValue);
            if (waiterObjId != -1) {
                return false;
            }

            //2. lock request causing duplicated waiting requests from different threads in a same job
            waiterObjId = entityLockInfoManager.findWaiterFromWaiterList(eLockInfo, jId, entityHashValue);
            if (waiterObjId != -1) {
                return false;
            }

            //3. lock request causing duplicated holding requests from different threads or a single thread in a same job
            entityInfo = entityLockInfoManager.findEntityInfoFromHolderList(eLockInfo, jId, entityHashValue);
            if (entityInfo != -1) {//duplicated call or upgrader

                isUpgrade = isLockUpgrade(entityInfoManager.getEntityLockMode(entityInfo), lockMode);
                if (isUpgrade) {//upgrade call
                    //wait if any upgrader exists or upgrading lock mode is not compatible
                    if (entityLockInfoManager.getUpgrader(eLockInfo) != -1
                            || !entityLockInfoManager.isUpgradeCompatible(eLockInfo, lockMode, entityInfo)) {
                        return false;
                    }

                    //move the entity lock counts from the weaker (S) to the stronger (X) mode
                    weakerModeLockCount = entityInfoManager.getEntityLockCount(entityInfo);
                    entityInfoManager.setEntityLockMode(entityInfo, lockMode);

                    entityLockInfoManager.increaseLockCount(eLockInfo, LockMode.X, (short) weakerModeLockCount);//new lock mode
                    entityLockInfoManager.decreaseLockCount(eLockInfo, LockMode.S, (short) weakerModeLockCount);//old lock mode

                } else {//duplicated call
                    entityInfoManager.increaseEntityLockCount(entityInfo);
                    entityLockInfoManager.increaseLockCount(eLockInfo, entityInfoManager.getEntityLockMode(entityInfo));
                }
            } else {//new call from this job, but still eLockInfo exists since other threads hold it or wait on it
                entityInfo = entityInfoFromDLockInfo;
                //fail if any upgrader/waiter is ahead of us, or the mode is incompatible
                if (entityLockInfoManager.getUpgrader(eLockInfo) != -1
                        || entityLockInfoManager.getFirstWaiter(eLockInfo) != -1
                        || !entityLockInfoManager.isCompatible(eLockInfo, lockMode)) {
                    return false;
                }

                entityInfoManager.increaseEntityLockCount(entityInfo);
                entityLockInfoManager.increaseLockCount(eLockInfo, lockMode);
                entityLockInfoManager.addHolder(eLockInfo, entityInfo);
            }
        } else {//eLockInfo doesn't exist, so this lock request is the first request and can be granted without waiting.
            eLockInfo = entityLockInfoManager.allocate();
            dLockInfo.getEntityResourceHT().put(entityHashValue, eLockInfo);
            entityInfoManager.increaseEntityLockCount(entityInfoFromDLockInfo);
            entityLockInfoManager.increaseLockCount(eLockInfo, lockMode);
            entityLockInfoManager.addHolder(eLockInfo, entityInfoFromDLockInfo);
        }

        return true;
    }
| |
    /** Acquires the global lock-table latch (exclusive/write mode). */
    private void latchLockTable() {
        lockTableLatch.writeLock().lock();
    }
| |
    /** Releases the global lock-table latch (exclusive/write mode). */
    private void unlatchLockTable() {
        lockTableLatch.writeLock().unlock();
    }
| |
    /** Acquires the waiter-notification latch, serializing wait entry with wake-ups. */
    private void latchWaitNotify() {
        waiterLatch.writeLock().lock();
    }
| |
    /** Releases the waiter-notification latch. */
    private void unlatchWaitNotify() {
        waiterLatch.writeLock().unlock();
    }
| |
| private int handleLockWaiter(DatasetLockInfo dLockInfo, int eLockInfo, int entityInfo, boolean isUpgrade, |
| boolean isDatasetLockInfo, TransactionContext txnContext, JobInfo jobInfo, int duplicatedWaiterObjId) |
| throws ACIDException { |
| int waiterId = -1; |
| LockWaiter waiter; |
| int waiterCount = 0; |
| boolean isInterruptedExceptionOccurred = false; |
| |
| if (duplicatedWaiterObjId != -1 |
| || isDeadlockFree(dLockInfo, eLockInfo, entityInfo, isDatasetLockInfo, isUpgrade)) {//deadlock free -> wait |
| if (duplicatedWaiterObjId == -1) { |
| waiterId = lockWaiterManager.allocate(); //initial value of waiterObj: wait = true, victim = false |
| waiter = lockWaiterManager.getLockWaiter(waiterId); |
| waiter.setEntityInfoSlot(entityInfo); |
| jobInfo.addWaitingResource(waiterId); |
| waiter.setBeginWaitTime(System.currentTimeMillis()); |
| } else { |
| waiterId = duplicatedWaiterObjId; |
| waiter = lockWaiterManager.getLockWaiter(waiterId); |
| } |
| |
| if (duplicatedWaiterObjId == -1) { |
| //add actor properly |
| if (isDatasetLockInfo) { |
| waiter.setWaitingOnEntityLock(false); |
| if (isUpgrade) { |
| dLockInfo.addUpgrader(waiterId); |
| waiter.setWaiter(false); |
| } else { |
| dLockInfo.addWaiter(waiterId); |
| waiter.setWaiter(true); |
| } |
| } else { |
| waiter.setWaitingOnEntityLock(true); |
| if (isUpgrade) { |
| waiter.setWaiter(false); |
| entityLockInfoManager.addUpgrader(eLockInfo, waiterId); |
| } else { |
| waiter.setWaiter(true); |
| entityLockInfoManager.addWaiter(eLockInfo, waiterId); |
| } |
| } |
| } |
| waiter.increaseWaiterCount(); |
| waiter.setFirstGetUp(true); |
| |
| latchWaitNotify(); |
| unlatchLockTable(); |
| synchronized (waiter) { |
| unlatchWaitNotify(); |
| while (waiter.needWait()) { |
| try { |
| if (IS_DEBUG_MODE) { |
| System.out.println("" + Thread.currentThread().getName() + "\twaits(" |
| + waiter.getWaiterCount() + "): WID(" + waiterId + "),EID(" |
| + waiter.getEntityInfoSlot() + ")"); |
| } |
| waiter.wait(); |
| } catch (InterruptedException e) { |
| //TODO figure-out what is the appropriate way to handle this exception |
| e.printStackTrace(); |
| isInterruptedExceptionOccurred = true; |
| waiter.setWait(false); |
| } |
| } |
| } |
| |
| if (isInterruptedExceptionOccurred) { |
| throw new ACIDException("InterruptedException is caught"); |
| } |
| |
| //waiter woke up -> remove/deallocate waiter object and abort if timeout |
| latchLockTable(); |
| |
| if (txnContext.getStatus() == TransactionContext.TIMED_OUT_STATUS || waiter.isVictim()) { |
| try { |
| requestAbort(txnContext); |
| } finally { |
| unlatchLockTable(); |
| } |
| } |
| |
| if (waiter.isFirstGetUp()) { |
| waiter.setFirstGetUp(false); |
| waiterCount = waiter.getWaiterCount(); |
| } else { |
| waiterCount = 0; |
| } |
| |
| waiter.decreaseWaiterCount(); |
| if (IS_DEBUG_MODE) { |
| System.out.println("" + Thread.currentThread().getName() + "\tgot-up!(" + waiter.getWaiterCount() |
| + "): WID(" + waiterId + "),EID(" + waiter.getEntityInfoSlot() + ")"); |
| } |
| if (waiter.getWaiterCount() == 0) { |
| //remove actor properly |
| if (isDatasetLockInfo) { |
| if (isUpgrade) { |
| dLockInfo.removeUpgrader(waiterId); |
| } else { |
| dLockInfo.removeWaiter(waiterId); |
| } |
| } else { |
| if (isUpgrade) { |
| entityLockInfoManager.removeUpgrader(eLockInfo, waiterId); |
| } else { |
| entityLockInfoManager.removeWaiter(eLockInfo, waiterId); |
| } |
| } |
| |
| //if (!isUpgrade && isDatasetLockInfo) { |
| jobInfo.removeWaitingResource(waiterId); |
| //} |
| lockWaiterManager.deallocate(waiterId); |
| } |
| |
| } else { //deadlock -> abort |
| //[Notice] |
| //Before requesting abort, the entityInfo for waiting datasetLock request is deallocated. |
| if (!isUpgrade && isDatasetLockInfo) { |
| //deallocate the entityInfo |
| entityInfoManager.deallocate(entityInfo); |
| } |
| try { |
| requestAbort(txnContext); |
| } finally { |
| unlatchLockTable(); |
| } |
| } |
| |
| return waiterCount; |
| } |
| |
| private boolean isDeadlockFree(DatasetLockInfo dLockInfo, int eLockInfo, int entityInfo, boolean isDatasetLockInfo, |
| boolean isUpgrade) { |
| return deadlockDetector.isSafeToAdd(dLockInfo, eLockInfo, entityInfo, isDatasetLockInfo, isUpgrade); |
| } |
| |
| private void requestAbort(TransactionContext txnContext) throws ACIDException { |
| txnContext.setStatus(TransactionContext.TIMED_OUT_STATUS); |
| txnContext.setStartWaitTime(TransactionContext.INVALID_TIME); |
| throw new ACIDException("Transaction " + txnContext.getJobId() |
| + " should abort (requested by the Lock Manager)"); |
| } |
| |
| /** |
| * For now, upgrading lock granule from entity-granule to dataset-granule is not supported!! |
| * |
| * @param fromLockMode |
| * @param toLockMode |
| * @return |
| */ |
| private boolean isLockUpgrade(byte fromLockMode, byte toLockMode) { |
| return fromLockMode == LockMode.S && toLockMode == LockMode.X; |
| } |
| |
| /** |
| * wake up upgraders first, then waiters. |
| * Criteria to wake up upgraders: if the upgrading lock mode is compatible, then wake up the upgrader. |
| */ |
| private void wakeUpDatasetLockWaiters(DatasetLockInfo dLockInfo) { |
| int waiterObjId = dLockInfo.getFirstUpgrader(); |
| int entityInfo; |
| LockWaiter waiterObj; |
| byte datasetLockMode; |
| byte lockMode; |
| boolean areAllUpgradersAwaken = true; |
| |
| consecutiveWakeupContext.reset(); |
| while (waiterObjId != -1) { |
| //wake up upgraders |
| waiterObj = lockWaiterManager.getLockWaiter(waiterObjId); |
| entityInfo = waiterObj.getEntityInfoSlot(); |
| datasetLockMode = entityInfoManager.getPKHashVal(entityInfo) == -1 ? LockMode.X : LockMode.IX; |
| if (dLockInfo.isUpgradeCompatible(datasetLockMode, entityInfo) |
| && consecutiveWakeupContext.isCompatible(datasetLockMode)) { |
| consecutiveWakeupContext.setLockMode(datasetLockMode); |
| //compatible upgrader is waken up |
| latchWaitNotify(); |
| synchronized (waiterObj) { |
| unlatchWaitNotify(); |
| waiterObj.setWait(false); |
| if (IS_DEBUG_MODE) { |
| System.out.println("" + Thread.currentThread().getName() + "\twake-up(D): WID(" + waiterObjId |
| + "),EID(" + waiterObj.getEntityInfoSlot() + ")"); |
| } |
| waiterObj.notifyAll(); |
| } |
| waiterObjId = waiterObj.getNextWaiterObjId(); |
| } else { |
| areAllUpgradersAwaken = false; |
| break; |
| } |
| } |
| |
| if (areAllUpgradersAwaken) { |
| //wake up waiters |
| waiterObjId = dLockInfo.getFirstWaiter(); |
| while (waiterObjId != -1) { |
| waiterObj = lockWaiterManager.getLockWaiter(waiterObjId); |
| entityInfo = waiterObj.getEntityInfoSlot(); |
| lockMode = entityInfoManager.getDatasetLockMode(entityInfo); |
| datasetLockMode = entityInfoManager.getPKHashVal(entityInfo) == -1 ? lockMode |
| : lockMode == LockMode.S ? LockMode.IS : LockMode.IX; |
| if (dLockInfo.isCompatible(datasetLockMode) && consecutiveWakeupContext.isCompatible(datasetLockMode)) { |
| consecutiveWakeupContext.setLockMode(datasetLockMode); |
| //compatible waiter is waken up |
| latchWaitNotify(); |
| synchronized (waiterObj) { |
| unlatchWaitNotify(); |
| waiterObj.setWait(false); |
| if (IS_DEBUG_MODE) { |
| System.out.println("" + Thread.currentThread().getName() + "\twake-up(D): WID(" |
| + waiterObjId + "),EID(" + waiterObj.getEntityInfoSlot() + ")"); |
| } |
| waiterObj.notifyAll(); |
| } |
| waiterObjId = waiterObj.getNextWaiterObjId(); |
| } else { |
| break; |
| } |
| } |
| } |
| } |
| |
    /**
     * Wakes up lock requests blocked on the given entity-level lock:
     * upgraders first (an entity upgrade is always to X), then ordinary
     * waiters. Ordinary waiters are only considered once ALL upgraders have
     * been woken; within each group the scan stops at the first incompatible
     * request, so requests behind an incompatible one stay asleep.
     * NOTE(review): callers appear to invoke this while holding the lock-table
     * latch — confirm before calling from a new site.
     */
    private void wakeUpEntityLockWaiters(int eLockInfo) {
        boolean areAllUpgradersAwaken = true;
        int waiterObjId = entityLockInfoManager.getUpgrader(eLockInfo);
        int entityInfo;
        LockWaiter waiterObj;
        byte entityLockMode;

        //tracks the modes already granted in this pass so that only mutually
        //compatible requests are woken together
        consecutiveWakeupContext.reset();
        while (waiterObjId != -1) {
            //wake up upgraders; entity-level upgrades always target LockMode.X
            waiterObj = lockWaiterManager.getLockWaiter(waiterObjId);
            entityInfo = waiterObj.getEntityInfoSlot();
            if (entityLockInfoManager.isUpgradeCompatible(eLockInfo, LockMode.X, entityInfo)
                    && consecutiveWakeupContext.isCompatible(LockMode.X)) {
                consecutiveWakeupContext.setLockMode(LockMode.X);
                //NOTE(review): the wait-notify latch is taken before entering
                //the waiter's monitor and released inside it — presumably to
                //prevent a lost notify against the wait path; confirm before
                //restructuring this ordering
                latchWaitNotify();
                synchronized (waiterObj) {
                    unlatchWaitNotify();
                    waiterObj.setWait(false);
                    if (IS_DEBUG_MODE) {
                        System.out.println("" + Thread.currentThread().getName() + "\twake-up(E): WID(" + waiterObjId
                                + "),EID(" + waiterObj.getEntityInfoSlot() + ")");
                    }
                    waiterObj.notifyAll();
                }
                waiterObjId = waiterObj.getNextWaiterObjId();
            } else {
                //first incompatible upgrader blocks everyone behind it,
                //including all ordinary waiters
                areAllUpgradersAwaken = false;
                break;
            }
        }

        if (areAllUpgradersAwaken) {
            //wake up waiters
            waiterObjId = entityLockInfoManager.getFirstWaiter(eLockInfo);
            while (waiterObjId != -1) {
                waiterObj = lockWaiterManager.getLockWaiter(waiterObjId);
                entityInfo = waiterObj.getEntityInfoSlot();
                entityLockMode = entityInfoManager.getEntityLockMode(entityInfo);
                if (entityLockInfoManager.isCompatible(eLockInfo, entityLockMode)
                        && consecutiveWakeupContext.isCompatible(entityLockMode)) {
                    consecutiveWakeupContext.setLockMode(entityLockMode);
                    //compatible waiter is waken up
                    latchWaitNotify();
                    synchronized (waiterObj) {
                        unlatchWaitNotify();
                        waiterObj.setWait(false);
                        if (IS_DEBUG_MODE) {
                            System.out.println("" + Thread.currentThread().getName() + "\twake-up(E): WID("
                                    + waiterObjId + "),EID(" + waiterObj.getEntityInfoSlot() + ")");
                        }
                        waiterObj.notifyAll();
                    }
                } else {
                    //stop at the first incompatible waiter to preserve queue order
                    break;
                }
                //advance after the if/else: equivalent to the dataset variant,
                //since the else branch breaks out of the loop
                waiterObjId = waiterObj.getNextWaiterObjId();
            }
        }
    }
| |
| @Override |
| public String prettyPrint() throws ACIDException { |
| StringBuilder s = new StringBuilder("\n########### LockManager Status #############\n"); |
| return s + "\n"; |
| } |
| |
| public void sweepForTimeout() throws ACIDException { |
| JobInfo jobInfo; |
| int waiterObjId; |
| LockWaiter waiterObj; |
| |
| latchLockTable(); |
| |
| Iterator<Entry<JobId, JobInfo>> iter = jobHT.entrySet().iterator(); |
| while (iter.hasNext()) { |
| Map.Entry<JobId, JobInfo> pair = (Map.Entry<JobId, JobInfo>) iter.next(); |
| jobInfo = pair.getValue(); |
| waiterObjId = jobInfo.getFirstWaitingResource(); |
| while (waiterObjId != -1) { |
| waiterObj = lockWaiterManager.getLockWaiter(waiterObjId); |
| toutDetector.checkAndSetVictim(waiterObj); |
| waiterObjId = waiterObj.getNextWaiterObjId(); |
| } |
| } |
| |
| unlatchLockTable(); |
| } |
| } |
| |
| class ConsecutiveWakeupContext { |
| private boolean IS; |
| private boolean IX; |
| private boolean S; |
| private boolean X; |
| |
| public void reset() { |
| IS = false; |
| IX = false; |
| S = false; |
| X = false; |
| } |
| |
| public boolean isCompatible(byte lockMode) { |
| switch (lockMode) { |
| case LockMode.IX: |
| return !S && !X; |
| |
| case LockMode.IS: |
| return !X; |
| |
| case LockMode.X: |
| return !IS && !IX && !S && !X; |
| |
| case LockMode.S: |
| return !IX && !X; |
| |
| default: |
| throw new IllegalStateException("Invalid upgrade lock mode"); |
| } |
| } |
| |
| public void setLockMode(byte lockMode) { |
| switch (lockMode) { |
| case LockMode.IX: |
| IX = true; |
| return; |
| |
| case LockMode.IS: |
| IS = true; |
| return; |
| |
| case LockMode.X: |
| X = true; |
| return; |
| |
| case LockMode.S: |
| S = true; |
| return; |
| |
| default: |
| throw new IllegalStateException("Invalid lock mode"); |
| } |
| |
| } |
| |
| } |