merge -r1655:1701 from asterix_lsm_stabilization to asterix_lsm_stabilization_kisskys

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_stabilization_kisskys@1702 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
index aeb54b5..b1a28ca 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/locking/LockManager.java
@@ -46,6 +46,7 @@
     public static final boolean IS_DEBUG_MODE = false;//true
 
     public static final boolean ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET = true;
+    //Threshold must be greater than 1 and should be reasonably large enough not to escalate too soon.
     public static final int ESCALATE_TRHESHOLD_ENTITY_TO_DATASET = 1000;
     private static final int DO_ESCALATE = 0;
     private static final int ESCALATED = 1;
@@ -103,11 +104,11 @@
     @Override
     public void lock(DatasetId datasetId, int entityHashValue, byte lockMode, TransactionContext txnContext)
             throws ACIDException {
-        internalLock(datasetId, entityHashValue, lockMode, txnContext);
+        internalLock(datasetId, entityHashValue, lockMode, txnContext, false);
     }
 
-    private void internalLock(DatasetId datasetId, int entityHashValue, byte lockMode, TransactionContext txnContext)
-            throws ACIDException {
+    private void internalLock(DatasetId datasetId, int entityHashValue, byte lockMode, TransactionContext txnContext,
+            boolean isInstant) throws ACIDException {
 
         JobId jobId = txnContext.getJobId();
         int jId = jobId.getId(); //int-type jobId
@@ -117,7 +118,7 @@
         DatasetLockInfo dLockInfo = null;
         JobInfo jobInfo;
         byte datasetLockMode = entityHashValue == -1 ? lockMode : lockMode == LockMode.S ? LockMode.IS : LockMode.IX;
-        boolean isEscalated = false;
+        boolean doEscalate = false;
 
         latchLockTable();
         validateJob(txnContext);
@@ -131,12 +132,12 @@
         jobInfo = jobHT.get(jobId);
 
         if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
-            if (datasetLockMode == LockMode.IS && jobInfo != null && dLockInfo != null) {
-                int upgradeStatus = needUpgradeFromEntityToDataset(jobInfo, dId, lockMode);
-                switch (upgradeStatus) {
+            if (!isInstant && datasetLockMode == LockMode.IS && jobInfo != null && dLockInfo != null) {
+                int escalateStatus = needEscalateFromEntityToDataset(jobInfo, dId, lockMode);
+                switch (escalateStatus) {
                     case DO_ESCALATE:
                         entityHashValue = -1;
-                        isEscalated = true;
+                        doEscalate = true;
                         break;
 
                     case ESCALATED:
@@ -180,8 +181,12 @@
             jobInfo.addHoldingResource(entityInfo);
 
             if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
-                if (datasetLockMode == LockMode.IS) {
+                if (!isInstant && datasetLockMode == LockMode.IS) {
                     jobInfo.increaseDatasetISLockCount(dId);
+                    if (doEscalate) {
+                        throw new IllegalStateException("ESCALATE_TRHESHOLD_ENTITY_TO_DATASET should not be set to "
+                                + ESCALATE_TRHESHOLD_ENTITY_TO_DATASET);
+                    }
                 }
             }
 
@@ -204,11 +209,21 @@
         }
 
         if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
-            if (isEscalated) {
-                releaseDatasetISLocks(jobInfo, jobId, datasetId, txnContext);
-            }
-            if (jobInfo != null && datasetLockMode == LockMode.IS) {
-                jobInfo.increaseDatasetISLockCount(dId);
+            if (!isInstant) {
+                if (doEscalate) {
+                    //jobInfo must not be null.
+                    assert jobInfo != null;
+                    jobInfo.increaseDatasetISLockCount(dId);
+                    //release pre-acquired locks
+                    releaseDatasetISLocks(jobInfo, jobId, datasetId, txnContext);
+                } else if (datasetLockMode == LockMode.IS) {
+                    if (jobInfo == null) {
+                        jobInfo = jobHT.get(jobId);
+                        //jobInfo must not be null;
+                        assert jobInfo != null;
+                    }
+                    jobInfo.increaseDatasetISLockCount(dId);
+                }
             }
         }
 
@@ -247,7 +262,7 @@
         }
     }
 
-    private int needUpgradeFromEntityToDataset(JobInfo jobInfo, int datasetId, byte lockMode) {
+    private int needEscalateFromEntityToDataset(JobInfo jobInfo, int datasetId, byte lockMode) {
         //we currently allow upgrade only if the lockMode is S. 
         if (lockMode != LockMode.S) {
             return DONOT_ESCALATE;
@@ -594,17 +609,21 @@
 
     @Override
     public void unlock(DatasetId datasetId, int entityHashValue, TransactionContext txnContext) throws ACIDException {
-        internalUnlock(datasetId, entityHashValue, txnContext, false);
+        internalUnlock(datasetId, entityHashValue, txnContext, false, false);
     }
 
     @Override
     public void unlock(DatasetId datasetId, int entityHashValue, TransactionContext txnContext, boolean commitFlag)
             throws ACIDException {
-        internalUnlock(datasetId, entityHashValue, txnContext, commitFlag);
+        internalUnlock(datasetId, entityHashValue, txnContext, false, commitFlag);
+    }
+    
+    private void instantUnlock(DatasetId datasetId, int entityHashValue, TransactionContext txnContext) throws ACIDException {
+        internalUnlock(datasetId, entityHashValue, txnContext, true, false);
     }
 
     private void internalUnlock(DatasetId datasetId, int entityHashValue, TransactionContext txnContext,
-            boolean commitFlag) throws ACIDException {
+            boolean isInstant, boolean commitFlag) throws ACIDException {
         JobId jobId = txnContext.getJobId();
         int eLockInfo = -1;
         DatasetLockInfo dLockInfo = null;
@@ -731,7 +750,7 @@
         //since the datasetLockInfo is likely to be referred to again.
 
         if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
-            if (datasetLockMode == LockMode.IS) {
+            if (!isInstant && datasetLockMode == LockMode.IS) {
                 jobInfo.decreaseDatasetISLockCount(datasetId.getId());
             }
         }
@@ -949,14 +968,14 @@
         //        } finally {
         //            unlock(datasetId, entityHashValue, txnContext);
         //        }
-        internalLock(datasetId, entityHashValue, lockMode, txnContext);
-        unlock(datasetId, entityHashValue, txnContext);
+        internalLock(datasetId, entityHashValue, lockMode, txnContext, true);
+        instantUnlock(datasetId, entityHashValue, txnContext);
     }
 
     @Override
     public boolean tryLock(DatasetId datasetId, int entityHashValue, byte lockMode, TransactionContext txnContext)
             throws ACIDException {
-        return internalTryLock(datasetId, entityHashValue, lockMode, txnContext);
+        return internalTryLock(datasetId, entityHashValue, lockMode, txnContext, false);
     }
 
     @Override
@@ -971,15 +990,15 @@
         //                unlock(datasetId, entityHashValue, txnContext);
         //            }
         //        }
-        isGranted = internalTryLock(datasetId, entityHashValue, lockMode, txnContext);
+        isGranted = internalTryLock(datasetId, entityHashValue, lockMode, txnContext, true);
         if (isGranted) {
-            unlock(datasetId, entityHashValue, txnContext);
+            instantUnlock(datasetId, entityHashValue, txnContext);
         }
         return isGranted;
     }
 
     private boolean internalTryLock(DatasetId datasetId, int entityHashValue, byte lockMode,
-            TransactionContext txnContext) throws ACIDException {
+            TransactionContext txnContext, boolean isInstant) throws ACIDException {
         JobId jobId = txnContext.getJobId();
         int jId = jobId.getId(); //int-type jobId
         int dId = datasetId.getId(); //int-type datasetId
@@ -989,7 +1008,7 @@
         JobInfo jobInfo;
         byte datasetLockMode = entityHashValue == -1 ? lockMode : lockMode == LockMode.S ? LockMode.IS : LockMode.IX;
         boolean isSuccess = false;
-        boolean isEscalated = false;
+        boolean doEscalate = false;
 
         latchLockTable();
         validateJob(txnContext);
@@ -1003,12 +1022,12 @@
         jobInfo = jobHT.get(jobId);
 
         if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
-            if (datasetLockMode == LockMode.IS && jobInfo != null && dLockInfo != null) {
-                int upgradeStatus = needUpgradeFromEntityToDataset(jobInfo, dId, lockMode);
+            if (!isInstant && datasetLockMode == LockMode.IS && jobInfo != null && dLockInfo != null) {
+                int upgradeStatus = needEscalateFromEntityToDataset(jobInfo, dId, lockMode);
                 switch (upgradeStatus) {
                     case DO_ESCALATE:
                         entityHashValue = -1;
-                        isEscalated = true;
+                        doEscalate = true;
                         break;
 
                     case ESCALATED:
@@ -1052,8 +1071,14 @@
             jobInfo.addHoldingResource(entityInfo);
 
             if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
-                if (datasetLockMode == LockMode.IS) {
+                if (!isInstant && datasetLockMode == LockMode.IS) {
                     jobInfo.increaseDatasetISLockCount(dId);
+                    if (doEscalate) {
+                        //This exception is thrown when the threshold value is set to 1.
+                        //We don't want to allow the lock escalation when there is a first lock request on a dataset. 
+                        throw new IllegalStateException("ESCALATE_TRHESHOLD_ENTITY_TO_DATASET should not be set to "
+                                + ESCALATE_TRHESHOLD_ENTITY_TO_DATASET);
+                    }
                 }
             }
 
@@ -1083,11 +1108,21 @@
         }
 
         if (ALLOW_ESCALATE_FROM_ENTITY_TO_DATASET) {
-            if (isEscalated) {
-                releaseDatasetISLocks(jobInfo, jobId, datasetId, txnContext);
-            }
-            if (jobInfo != null && datasetLockMode == LockMode.IS) {
-                jobInfo.increaseDatasetISLockCount(dId);
+            if (!isInstant) {
+                if (doEscalate) {
+                    //jobInfo must not be null.
+                    assert jobInfo != null;
+                    jobInfo.increaseDatasetISLockCount(dId);
+                    //release pre-acquired locks
+                    releaseDatasetISLocks(jobInfo, jobId, datasetId, txnContext);
+                } else if (datasetLockMode == LockMode.IS) {
+                    if (jobInfo == null) {
+                        jobInfo = jobHT.get(jobId);
+                        //jobInfo must not be null;
+                        assert jobInfo != null;
+                    }
+                    jobInfo.increaseDatasetISLockCount(dId);
+                }
             }
         }