Merged asterix_lsm_stabilization r924:r936.

git-svn-id: https://asterixdb.googlecode.com/svn/branches/asterix_lsm_length_filter@937 eaa15691-b419-025a-1212-ee371bd00084
diff --git a/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java b/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
index ef6748e..c34b98a 100644
--- a/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
+++ b/asterix-app/src/test/java/edu/uci/ics/asterix/test/runtime/ExecutionTest.java
@@ -93,6 +93,7 @@
         List<CompilationUnit> cUnits = tcCtx.getTestCase().getCompilationUnit();
         for (CompilationUnit cUnit : cUnits) {
             File testFile = tcCtx.getTestFile(cUnit);
+
             /*************** to avoid run failure cases ****************/
             if (testFile.getAbsolutePath().contains("runtimets/queries/failure/")) {
                 continue;
diff --git a/asterix-app/src/test/resources/metadata-transactions/check-state-queries.txt b/asterix-app/src/test/resources/metadata-transactions/check-state-queries.txt
index 5cee728..6618ddd 100644
--- a/asterix-app/src/test/resources/metadata-transactions/check-state-queries.txt
+++ b/asterix-app/src/test/resources/metadata-transactions/check-state-queries.txt
@@ -1,6 +1,6 @@
-//check_dataset.aql
-//check_datatype.aql
-//check_dataverse.aql
-//check_index.aql
-//check_node.aql
-//check_nodegroup.aql
+check_dataset.aql
+check_datatype.aql
+check_dataverse.aql
+check_index.aql
+check_node.aql
+check_nodegroup.aql
diff --git a/asterix-app/src/test/resources/metadata-transactions/init-state-queries.txt b/asterix-app/src/test/resources/metadata-transactions/init-state-queries.txt
index 5fb1c42..c9ef6ee 100644
--- a/asterix-app/src/test/resources/metadata-transactions/init-state-queries.txt
+++ b/asterix-app/src/test/resources/metadata-transactions/init-state-queries.txt
@@ -1 +1 @@
-//customers_orders.aql
+customers_orders.aql
diff --git a/asterix-app/src/test/resources/metadata-transactions/queries.txt b/asterix-app/src/test/resources/metadata-transactions/queries.txt
index 5f1589e..0762364 100644
--- a/asterix-app/src/test/resources/metadata-transactions/queries.txt
+++ b/asterix-app/src/test/resources/metadata-transactions/queries.txt
@@ -1,20 +1,20 @@
-//create_duplicate_dataset.aql
-//create_duplicate_dataverse.aql
-//create_duplicate_index.aql
-//create_duplicate_nodegroup.aql
-//create_duplicate_type.aql
-//drop_nonexistent_dataset.aql
-//drop_nonexistent_datatype.aql
-//drop_nonexistent_dataverse.aql
-//drop_nonexistent_index.aql
-//drop_nonexistent_nodegroup.aql
-//rollback_drop_dataset.aql
-//rollback_drop_datatype.aql
-//rollback_drop_dataverse.aql
-//rollback_drop_index.aql
-//rollback_drop_nodegroup.aql
-//rollback_new_dataset.aql
-//rollback_new_datatype.aql
-//rollback_new_dataverse.aql
-//rollback_new_index.aql
-//rollback_new_nodegroup.aql
+create_duplicate_dataset.aql
+create_duplicate_dataverse.aql
+create_duplicate_index.aql
+create_duplicate_nodegroup.aql
+create_duplicate_type.aql
+drop_nonexistent_dataset.aql
+drop_nonexistent_datatype.aql
+drop_nonexistent_dataverse.aql
+drop_nonexistent_index.aql
+drop_nonexistent_nodegroup.aql
+rollback_drop_dataset.aql
+rollback_drop_datatype.aql
+rollback_drop_dataverse.aql
+rollback_drop_index.aql
+rollback_drop_nodegroup.aql
+rollback_new_dataset.aql
+rollback_new_datatype.aql
+rollback_new_dataverse.aql
+rollback_new_index.aql
+rollback_new_nodegroup.aql
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
index d3c7eb0..6e9e936 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/opcallbacks/PrimaryIndexModificationOperationCallback.java
@@ -67,6 +67,9 @@
         int pkHash = computePrimaryKeyHashValue(after, primaryKeyFields, primaryKeyHashFunctions);
         LSMBTreeTupleReference lsmBTreeTuple = (LSMBTreeTupleReference) before;
         IndexOperation oldOp = IndexOperation.INSERT;
+        if (before == null) { // no before-image tuple: report NOOP rather than INSERT as the old operation
+            oldOp = IndexOperation.NOOP;
+        }
         if (lsmBTreeTuple != null && lsmBTreeTuple.isAntimatter()) {
             oldOp = IndexOperation.DELETE;
         }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogRecordHelper.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogRecordHelper.java
index 4f28c7d..80f74cb 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogRecordHelper.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/ILogRecordHelper.java
@@ -41,7 +41,7 @@
     
     byte getResourceMgrId(LogicalLogLocator logicalLogLocater);
 
-    int getLogRecordSize(LogicalLogLocator logicalLogLocater);
+    int getLogContentSize(LogicalLogLocator logicalLogLocater); // header's LogRecordSize field, used by callers as the body (content) size
 
     long getLogChecksum(LogicalLogLocator logicalLogLocator);
 
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java
index 5e6ef34..cb6a67a 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/IndexResourceManager.java
@@ -43,16 +43,6 @@
         long resourceId = logRecordHelper.getResourceId(logLocator);
         int offset = logRecordHelper.getLogContentBeginPos(logLocator);
 
-        /*
-        byte[] logBufferContent = logLocator.getBuffer().getArray();
-        // read the length of resource id byte array
-        int resourceIdLength = DataUtil.byteArrayToInt(logBufferContent, logContentBeginPos);
-        byte[] resourceIdBytes = new byte[resourceIdLength];
-
-        // copy the resource if bytes
-        System.arraycopy(logBufferContent, logContentBeginPos + 4, resourceIdBytes, 0, resourceIdLength);
-        */
-
         // look up the repository to obtain the resource object
         IIndex index = (IIndex) provider.getTransactionalResourceRepository().getTransactionalResource(resourceId);
 
@@ -106,10 +96,10 @@
                 }
             } else {
                 //For LSMRtree and LSMInvertedIndex
-                //delete --> physical delete
-                //insert --> logical delete
+                //newOperation DELETE --> re-insert the tuple
+                //any other newOperation (e.g. INSERT) --> delete the tuple
                 if (newOperation == (byte) IndexOperation.DELETE.ordinal()) {
-                    indexAccessor.physicalDelete(newTuple);
+                    indexAccessor.insert(newTuple);
                 } else {
                     indexAccessor.delete(newTuple);
                 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
index 8e89301..f29c847 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogCursor.java
@@ -21,33 +21,42 @@
 
 public class LogCursor implements ILogCursor {
 
-    private final ILogManager logManager;
+    private final LogManager logManager;
     private final ILogFilter logFilter;
     private IFileBasedBuffer readOnlyBuffer;
     private LogicalLogLocator logicalLogLocator = null;
     private int bufferIndex = 0;
+    private boolean firstNext = true; // NOTE(review): no line added by this patch reads this field — confirm it is needed
+    private long readLSN = 0; // next LSN to read; next() reads it from memory while it is beyond the last flushed LSN
 
     /**
      * @param logFilter
      */
-    public LogCursor(final ILogManager logManager, ILogFilter logFilter) throws ACIDException {
+    public LogCursor(final LogManager logManager, ILogFilter logFilter) throws ACIDException {
         this.logFilter = logFilter;
         this.logManager = logManager;
 
     }
 
-    public LogCursor(final ILogManager logManager, PhysicalLogLocator startingPhysicalLogLocator, ILogFilter logFilter)
-            throws IOException {
+    public LogCursor(final LogManager logManager, PhysicalLogLocator startingPhysicalLogLocator, ILogFilter logFilter)
+            throws IOException, ACIDException {
         this.logFilter = logFilter;
         this.logManager = logManager;
         initialize(startingPhysicalLogLocator);
     }
 
-    private void initialize(final PhysicalLogLocator startingPhysicalLogLocator) throws IOException {
-        readOnlyBuffer = getReadOnlyBuffer(startingPhysicalLogLocator.getLsn(), logManager.getLogManagerProperties()
-                .getLogBufferSize());
-        logicalLogLocator = new LogicalLogLocator(startingPhysicalLogLocator.getLsn(), readOnlyBuffer, 0, logManager);
-
+    private void initialize(final PhysicalLogLocator startingPhysicalLogLocator) throws IOException, ACIDException {
+        if (startingPhysicalLogLocator.getLsn() > logManager.getLastFlushedLsn().get()) { // not flushed yet: next() reads it from memory
+            readLSN = startingPhysicalLogLocator.getLsn();
+        } else {
+            //read from disk
+            readOnlyBuffer = getReadOnlyBuffer(startingPhysicalLogLocator.getLsn(), logManager
+                    .getLogManagerProperties().getLogBufferSize());
+            logicalLogLocator = new LogicalLogLocator(startingPhysicalLogLocator.getLsn(), readOnlyBuffer, 0,
+                    logManager);
+            readLSN = logicalLogLocator.getLsn();
+        }
+        return;
     }
 
     private IFileBasedBuffer getReadOnlyBuffer(long lsn, int size) throws IOException {
@@ -77,7 +86,12 @@
         int integerRead = -1;
         boolean logRecordBeginPosFound = false;
         long bytesSkipped = 0;
-        
+
+        // serve the record from the in-memory log pages while it has not been flushed to disk yet
+        if (readNextFromMemory(nextLogicalLogLocator)) {
+            return true;
+        }
+
         //check whether the currentOffset has enough space to have new log record by comparing
         //the smallest log record type(which is commit)'s log header.
         while (logicalLogLocator.getMemoryOffset() <= readOnlyBuffer.getSize()
@@ -112,7 +126,9 @@
             }
         }
 
-        int logLength = logManager.getLogRecordHelper().getLogRecordSize(logicalLogLocator);
+        int logLength = logManager.getLogRecordHelper().getLogRecordSize(
+                logManager.getLogRecordHelper().getLogType(logicalLogLocator),
+                logManager.getLogRecordHelper().getLogContentSize(logicalLogLocator));
         if (logManager.getLogRecordHelper().validateLogRecord(logicalLogLocator)) {
             if (nextLogicalLogLocator == null) {
                 nextLogicalLogLocator = new LogicalLogLocator(0, readOnlyBuffer, -1, logManager);
@@ -138,4 +154,131 @@
         return logFilter;
     }
 
+    /**
+     * True while the cursor's position comes from a record copied out of an in-memory log page, i.e. while
+     * {@code logicalLogLocator} does not point into {@code readOnlyBuffer}.
+     */
+    private boolean readingFromMemory = false;
+
+    /**
+     * Tries to serve the record at {@code readLSN} from the in-memory log pages.
+     * <p>
+     * If {@code readLSN} has already been flushed while the cursor is not positioned on disk (its last record
+     * came from memory, or it started in memory and has no {@code readOnlyBuffer}), the cursor is first
+     * repositioned to read {@code readLSN} from the log file so that the disk path in {@code next} does not
+     * run on a copied record or a {@code null} buffer.
+     *
+     * @param nextLogicalLogLocator
+     *            locator pointed at the record when it is read from memory
+     * @return true if the record was read from memory; false if the caller must use the disk path
+     * @throws ACIDException
+     *             if reading from memory or repositioning on disk fails
+     */
+    private boolean readNextFromMemory(LogicalLogLocator nextLogicalLogLocator) throws ACIDException {
+        if (readLSN > logManager.getLastFlushedLsn().get()) {
+            return readFromMemory(readLSN, nextLogicalLogLocator);
+        }
+        if (readingFromMemory || readOnlyBuffer == null) {
+            switchToDiskRead(readLSN);
+        }
+        return false;
+    }
+
+    /**
+     * Reads the log record at {@code lsn} from its in-memory log page into {@code currentLogLocator} and
+     * advances {@code readLSN} past it.
+     * <p>
+     * The flushed LSN is re-checked under the page lock: if the page was flushed after the caller's check,
+     * nothing is read from memory; the cursor is repositioned to read {@code lsn} from disk and false is
+     * returned, instead of reporting success while leaving {@code currentLogLocator} untouched.
+     *
+     * @param lsn
+     *            LSN of the record to read
+     * @param currentLogLocator
+     *            locator pointed at a private copy of the record when it is read from memory
+     * @return true if the record was read from memory; false if the caller must use the disk path
+     * @throws ACIDException
+     *             if {@code lsn} is beyond the current LSN, the record fails checksum validation, or the
+     *             log file cannot be read for the disk fallback
+     */
+    private boolean readFromMemory(long lsn, LogicalLogLocator currentLogLocator) throws ACIDException {
+        if (lsn > logManager.getCurrentLsn().get()) {
+            throw new ACIDException(" invalid lsn " + lsn);
+        }
+
+        int pageIndex = logManager.getLogPageIndex(lsn);
+        int pageOffset = logManager.getLogPageOffset(lsn);
+        byte[] pageContent = new byte[logManager.getLogManagerProperties().getLogPageSize()];
+
+        // take a lock on the log page so that the page is not flushed to disk while the record is copied
+        IFileBasedBuffer logPage = logManager.getLogPage(pageIndex);
+        int logRecordSize;
+        synchronized (logPage) {
+            // need to check again: the page may have been flushed since the caller's check
+            if (lsn <= logManager.getLastFlushedLsn().get()) {
+                logRecordSize = -1;
+            } else {
+                // record size = header + body (whose size is the last int of the header) + checksum
+                logPage.getBytes(pageContent, 0, pageContent.length);
+                byte logType = pageContent[pageOffset + 4];
+                int logHeaderSize = logManager.getLogRecordHelper().getLogHeaderSize(logType);
+                int logBodySize = DataUtil.byteArrayToInt(pageContent, pageOffset + logHeaderSize - 4);
+                logRecordSize = logHeaderSize + logBodySize + logManager.getLogRecordHelper().getLogChecksumSize();
+
+                // copy the record out so it stays readable after the page is flushed and reused
+                byte[] logRecord = new byte[logRecordSize];
+                System.arraycopy(pageContent, pageOffset, logRecord, 0, logRecordSize);
+                MemBasedBuffer memBuffer = new MemBasedBuffer(logRecord);
+                if (logicalLogLocator == null) {
+                    logicalLogLocator = new LogicalLogLocator(lsn, memBuffer, 0, logManager);
+                } else {
+                    logicalLogLocator.setLsn(lsn);
+                    logicalLogLocator.setBuffer(memBuffer);
+                    logicalLogLocator.setMemoryOffset(0);
+                }
+
+                currentLogLocator.setLsn(lsn);
+                currentLogLocator.setBuffer(memBuffer);
+                currentLogLocator.setMemoryOffset(0);
+
+                // validate the log record by comparing checksums; only the helper call is guarded so that a
+                // checksum mismatch is reported as such rather than re-wrapped
+                boolean valid;
+                try {
+                    valid = logManager.getLogRecordHelper().validateLogRecord(logicalLogLocator);
+                } catch (Exception e) {
+                    throw new ACIDException("exception encoutered in validating log record at lsn " + lsn, e);
+                }
+                if (!valid) {
+                    throw new ACIDException(" invalid log record at lsn " + lsn);
+                }
+            }
+        }
+
+        if (logRecordSize < 0) {
+            switchToDiskRead(lsn);
+            return false;
+        }
+        readingFromMemory = true;
+        readLSN = readLSN + logRecordSize;
+        return true;
+    }
+
+    /**
+     * Positions the cursor to read {@code lsn} from the log file, mirroring the disk branch of
+     * {@code initialize}.
+     *
+     * @throws ACIDException
+     *             if the log file cannot be read
+     */
+    private void switchToDiskRead(long lsn) throws ACIDException {
+        try {
+            readOnlyBuffer = getReadOnlyBuffer(lsn, logManager.getLogManagerProperties().getLogBufferSize());
+        } catch (IOException e) {
+            throw new ACIDException("failed to read log from disk at lsn " + lsn, e);
+        }
+        logicalLogLocator = new LogicalLogLocator(lsn, readOnlyBuffer, 0, logManager);
+        readLSN = logicalLogLocator.getLsn();
+        readingFromMemory = false;
+    }
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
index f295347..149498b 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogManager.java
@@ -36,14 +36,8 @@
 
 public class LogManager implements ILogManager {
 
-    /*
-     * Log Record Structure HEADER
-     * <(log_magic_number,4)(log_length,8)(log_type,1
-     * )(log_action_type,1)(log_timestamp
-     * ,8)(log_transaction_id,8)(resource_manager_id
-     * ,1)(page_id,8)(previous_lsn,8) <CONTENT> TAIL <(checksum,8)>
-     */
-
+    // debugging aid: when true, records written through the logger are echoed to stdout via getLogRecordForDisplay
+    public static final boolean IS_DEBUG_MODE = false;//true
     private static final Logger LOGGER = Logger.getLogger(LogManager.class.getName());
     private TransactionSubsystem provider;
     private LogManagerProperties logManagerProperties;
@@ -522,8 +516,8 @@
         }
 
         // all constraints checked and we are good to go and acquire a lsn.
-        long previousLogLocator = -1;
-        long myLogLocator; // the will be set to the location (a long value)
+        long previousLSN = -1;
+        long currentLSN; // will be set to the location (a long value)
         // where the log record needs to be placed.
 
         /*
@@ -534,10 +528,10 @@
          * the last log record written by (any thread of) the transaction.
          */
         synchronized (context) {
-            previousLogLocator = context.getLastLogLocator().getLsn();
-            myLogLocator = getLsn(totalLogSize, logType);
-            context.getLastLogLocator().setLsn(myLogLocator);
-            logicalLogLocator.setLsn(myLogLocator);
+            previousLSN = context.getLastLogLocator().getLsn();
+            currentLSN = getLsn(totalLogSize, logType);
+            context.setLastLSN(currentLSN);
+            logicalLogLocator.setLsn(currentLSN);
         }
 
         /*
@@ -555,7 +549,7 @@
         // thread has submitted a flush
         // request.
 
-        int pageIndex = (int) getLogPageIndex(myLogLocator);
+        int pageIndex = (int) getLogPageIndex(currentLSN);
 
         /*
          * the lsn has been obtained for the log record. need to set the
@@ -565,13 +559,13 @@
         try {
 
             logicalLogLocator.setBuffer(logPages[pageIndex]);
-            int pageOffset = getLogPageOffset(myLogLocator);
+            int pageOffset = getLogPageOffset(currentLSN);
             logicalLogLocator.setMemoryOffset(pageOffset);
 
             /*
              * write the log header.
              */
-            logRecordHelper.writeLogHeader(logicalLogLocator, logType, context, datasetId, PKHashValue, previousLogLocator,
+            logRecordHelper.writeLogHeader(logicalLogLocator, logType, context, datasetId, PKHashValue, previousLSN,
                     resourceId, resourceMgrId, logContentSize);
             
             // increment the offset so that the transaction can fill up the
@@ -590,6 +584,11 @@
                 // record content at the allocated space.
                 logger.log(context, logicalLogLocator, logContentSize, reusableLogContentObject);
                 logger.postLog(context, reusableLogContentObject);
+                if (IS_DEBUG_MODE) { // rewind to the header start, print the record, then restore the offset
+                    logicalLogLocator.setMemoryOffset(logicalLogLocator.getMemoryOffset() - logRecordHelper.getLogHeaderSize(logType));
+                    System.out.println(logRecordHelper.getLogRecordForDisplay(logicalLogLocator));
+                    logicalLogLocator.increaseMemoryOffset(logRecordHelper.getLogHeaderSize(logType));
+                }
             }
 
             /*
@@ -628,7 +627,7 @@
              * been flushed to disk because the containing log page filled up.
              */
             if (logType == LogType.COMMIT) {
-                if (getLastFlushedLsn().get() < myLogLocator) {
+                if (getLastFlushedLsn().get() < currentLSN) {
                     if (!addedFlushRequest) {
                         addFlushRequest(pageIndex);
                     }
@@ -640,7 +639,7 @@
                      * waiting threads of the flush event.
                      */
                     synchronized (logPages[pageIndex]) {
-                        while (getLastFlushedLsn().get() < myLogLocator) {
+                        while (getLastFlushedLsn().get() < currentLSN) {
                             logPages[pageIndex].wait();
                         }
                     }
@@ -701,7 +700,11 @@
             FileChannel fileChannel = raf.getChannel();
             fileChannel.read(buffer);
             buffer.position(0);
-            buffer.limit(buffer.getInt(4));
+            byte logType = buffer.get(4);
+            int logHeaderSize = logRecordHelper.getLogHeaderSize(logType);
+            int logBodySize = buffer.getInt(logHeaderSize-4); // body size: last 4 bytes of the header
+            int logRecordSize = logHeaderSize + logBodySize + logRecordHelper.getLogChecksumSize();
+            buffer.limit(logRecordSize);
             MemBasedBuffer memBuffer = new MemBasedBuffer(buffer.slice());
             logicalLogLocator = new LogicalLogLocator(physicalLogLocator.getLsn(), memBuffer, 0, this);
             if (!logRecordHelper.validateLogRecord(logicalLogLocator)) {
@@ -737,6 +740,10 @@
         if (lsnValue > getLastFlushedLsn().get()) {
             int pageIndex = getLogPageIndex(lsnValue);
             int pageOffset = getLogPageOffset(lsnValue);
+            
+            //TODO: minimize memory allocation overhead.
+            //current code allocates and copies a whole log page (getLogPageSize() bytes) per log record read.
+            
             byte[] pageContent = new byte[logManagerProperties.getLogPageSize()];
             // take a lock on the log page so that the page is not flushed to
             // disk interim
@@ -750,13 +757,16 @@
 
                     // get the log record length
                     logPages[pageIndex].getBytes(pageContent, 0, pageContent.length);
-                    int logRecordLength = DataUtil.byteArrayToInt(pageContent, pageOffset + 4);
-                    logRecord = new byte[logRecordLength];
+                    byte logType = pageContent[pageOffset+4];
+                    int logHeaderSize = logRecordHelper.getLogHeaderSize(logType);
+                    int logBodySize = DataUtil.byteArrayToInt(pageContent, pageOffset + logHeaderSize - 4); // last 4 header bytes
+                    int logRecordSize = logHeaderSize + logBodySize + logRecordHelper.getLogChecksumSize();
+                    logRecord = new byte[logRecordSize];
 
                     /*
                      * copy the log record content
                      */
-                    System.arraycopy(pageContent, pageOffset, logRecord, 0, logRecordLength);
+                    System.arraycopy(pageContent, pageOffset, logRecord, 0, logRecordSize);
                     MemBasedBuffer memBuffer = new MemBasedBuffer(logRecord);
                     logLocator = new LogicalLogLocator(lsnValue, memBuffer, 0, this);
                     try {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
index ef8fc54..d53d842 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/logging/LogRecordHelper.java
@@ -36,6 +36,7 @@
  * LogRecordSize(4)
  * --------------------------- COMMIT doesn't have Body fields.
  * [Body] The Body size is given through the parameter reusableLogContentObjectLength
+ * TupleFieldCount(4)
  * NewOp(1)
  * NewValueLength(4)
  * NewValue(NewValueLength)
@@ -116,14 +117,15 @@
     }
 
     @Override
-    public int getLogRecordSize(LogicalLogLocator logicalLogLocater) {
+    public int getLogContentSize(LogicalLogLocator logicalLogLocater) { // LogRecordSize header field, used by callers as the body size
         return logicalLogLocater.getBuffer().readInt(logicalLogLocater.getMemoryOffset() + LOG_RECORD_SIZE_POS);
     }
 
     @Override
     public long getLogChecksum(LogicalLogLocator logicalLogLocator) {
         return (logicalLogLocator.getBuffer()).readLong(logicalLogLocator.getMemoryOffset()
-                + getLogRecordSize(logicalLogLocator) - LOG_CHECKSUM_SIZE);
+                + getLogRecordSize(getLogType(logicalLogLocator), getLogContentSize(logicalLogLocator))
+                - LOG_CHECKSUM_SIZE);
     }
 
     @Override
@@ -133,7 +135,9 @@
 
     @Override
     public int getLogContentEndPos(LogicalLogLocator logicalLogLocator) {
-        return logicalLogLocator.getMemoryOffset() + getLogRecordSize(logicalLogLocator) - LOG_CHECKSUM_SIZE;
+        return logicalLogLocator.getMemoryOffset()
+                + getLogRecordSize(getLogType(logicalLogLocator), getLogContentSize(logicalLogLocator))
+                - LOG_CHECKSUM_SIZE;
     }
 
     @Override
@@ -153,10 +157,13 @@
         builder.append(" Job Id : ").append(getJobId(logicalLogLocator));
         builder.append(" Dataset Id : ").append(getDatasetId(logicalLogLocator));
         builder.append(" PK Hash Value : ").append(getPKHashValue(logicalLogLocator));
-        builder.append(" PrevLSN : ").append(getPrevLSN(logicalLogLocator));
-        builder.append(" Resource Id : ").append(getResourceId(logicalLogLocator));
-        builder.append(" ResourceMgr Id : ").append(getResourceMgrId(logicalLogLocator));
-        builder.append(" Log Record Size : ").append(getLogRecordSize(logicalLogLocator));
+        if (logType == LogType.UPDATE) { // PrevLSN, resource ids and record size are only displayed for UPDATE records
+            builder.append(" PrevLSN : ").append(getPrevLSN(logicalLogLocator).getLsn());
+            builder.append(" Resource Id : ").append(getResourceId(logicalLogLocator));
+            builder.append(" ResourceMgr Id : ").append(getResourceMgrId(logicalLogLocator));
+            builder.append(" Log Record Size : ").append(
+                    getLogRecordSize(logType, getLogContentSize(logicalLogLocator)));
+        }
         return builder.toString();
     }
 
@@ -198,12 +205,13 @@
             /* log record size */
             (logicalLogLocator.getBuffer()).writeInt(logicalLogLocator.getMemoryOffset() + LOG_RECORD_SIZE_POS,
                     logRecordSize);
+
         }
     }
 
     @Override
     public boolean validateLogRecord(LogicalLogLocator logicalLogLocator) {
-        int logLength = this.getLogRecordSize(logicalLogLocator);
+        int logLength = this.getLogRecordSize(getLogType(logicalLogLocator), getLogContentSize(logicalLogLocator));
         long expectedChecksum = DataUtil.getChecksum(logicalLogLocator.getBuffer(),
                 logicalLogLocator.getMemoryOffset(), logLength - LOG_CHECKSUM_SIZE);
         long actualChecksum = getLogChecksum(logicalLogLocator);
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
index 9a097d6..780debc 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/recovery/RecoveryManager.java
@@ -20,9 +20,14 @@
 import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
@@ -40,6 +45,7 @@
 import edu.uci.ics.asterix.transaction.management.service.logging.LogicalLogLocator;
 import edu.uci.ics.asterix.transaction.management.service.logging.PhysicalLogLocator;
 import edu.uci.ics.asterix.transaction.management.service.transaction.IResourceManager;
+import edu.uci.ics.asterix.transaction.management.service.transaction.JobId;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionManagementConstants;
 import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionSubsystem;
@@ -158,23 +164,27 @@
     }
 
     /**
-     * Rollback a transaction (non-Javadoc)
+     * Rollback a transaction
      * 
      * @see edu.uci.ics.transaction.management.service.recovery.IRecoveryManager# rollbackTransaction (edu.uci.ics.TransactionContext.management.service.transaction .TransactionContext)
      */
     @Override
     public void rollbackTransaction(TransactionContext txnContext) throws ACIDException {
         ILogManager logManager = txnSubsystem.getLogManager();
-        ILogRecordHelper parser = logManager.getLogRecordHelper();
+        ILogRecordHelper logRecordHelper = logManager.getLogRecordHelper();
+        Map<TxnId, List<Long>> loserTxnTable = new HashMap<TxnId, List<Long>>();
+        TxnId tempKeyTxnId = new TxnId(-1, -1, -1);
 
-        // Obtain the last log record written by the transaction
-        PhysicalLogLocator lsn = txnContext.getLastLogLocator();
+        // Obtain the first log record written by the Job
+        PhysicalLogLocator firstLSNLogLocator = txnContext.getFirstLogLocator();
+        PhysicalLogLocator lastLSNLogLocator = txnContext.getLastLogLocator();
         if (LOGGER.isLoggable(Level.INFO)) {
-            LOGGER.info(" rollbacking transaction log records at lsn " + lsn.getLsn());
+            LOGGER.info(" rollbacking transaction log records from " + firstLSNLogLocator.getLsn() + " to "
+                    + lastLSNLogLocator.getLsn());
         }
 
         // check if the transaction actually wrote some logs.
-        if (lsn.getLsn() == TransactionManagementConstants.LogManagerConstants.TERMINAL_LSN) {
+        if (firstLSNLogLocator.getLsn() == TransactionManagementConstants.LogManagerConstants.TERMINAL_LSN) {
             if (LOGGER.isLoggable(Level.INFO)) {
                 LOGGER.info(" no need to roll back as there were no operations by the transaction "
                         + txnContext.getJobId());
@@ -182,68 +192,142 @@
             return;
         }
 
-        // a dummy logLocator instance that is re-used during rollback
-        LogicalLogLocator logLocator = LogUtil.getDummyLogicalLogLocator(logManager);
+        // While reading log records from firstLSN to lastLSN, collect the LSNs of this job's uncommitted updates.
+        ILogCursor logCursor;
+        try {
+            logCursor = logManager.readLog(firstLSNLogLocator, new ILogFilter() {
+                @Override
+                public boolean accept(IBuffer buffer, long startOffset, int length) {
+                    return true;
+                }
+            });
+        } catch (IOException e) {
+            throw new ACIDException("Failed to create LogCursor with LSN:" + firstLSNLogLocator.getLsn(), e);
+        }
 
-        while (true) {
+        LogicalLogLocator currentLogLocator = LogUtil.getDummyLogicalLogLocator(logManager);
+        boolean valid;
+        byte logType;
+        List<Long> undoLSNSet = null;
+
+        while (currentLogLocator.getLsn() != lastLSNLogLocator.getLsn()) {
             try {
-                // read the log record at the given position
-                logLocator = logManager.readLog(lsn);
-            } catch (Exception e) {
-                e.printStackTrace();
-                state = SystemState.CORRUPTED;
-                throw new ACIDException(" could not read log at lsn :" + lsn, e);
+                valid = logCursor.next(currentLogLocator);
+            } catch (IOException e) {
+                throw new ACIDException("Failed to read log at LSN:" + currentLogLocator.getLsn(), e);
+            }
+            if (!valid) {
+                if (currentLogLocator.getLsn() != lastLSNLogLocator.getLsn()) {
+                    throw new ACIDException("Log File Corruption: lastLSN mismatch");
+                } else {
+                    break;//End of Log File
+                }
             }
 
-            byte logType = parser.getLogType(logLocator);
-            if (LOGGER.isLoggable(Level.FINE)) {
-                LOGGER.fine(" reading LSN value inside rollback transaction method " + txnContext.getLastLogLocator()
-                        + " jodId " + parser.getJobId(logLocator) + " log type  " + logType);
-            }
+            // The cursor's filter accepts every record, including other jobs'; only this job's updates may be undone.
+            if (logRecordHelper.getJobId(currentLogLocator) != txnContext.getJobId().getId()) {
+                continue;
+            }
+            tempKeyTxnId.setTxnId(logRecordHelper.getJobId(currentLogLocator), logRecordHelper.getDatasetId(currentLogLocator),
+                    logRecordHelper.getPKHashValue(currentLogLocator));
+            logType = logRecordHelper.getLogType(currentLogLocator);
 
             switch (logType) {
                 case LogType.UPDATE:
-
-                    // extract the resource manager id from the log record.
-                    byte resourceMgrId = parser.getResourceMgrId(logLocator);
-                    if (LOGGER.isLoggable(Level.FINE)) {
-                        LOGGER.fine(parser.getLogRecordForDisplay(logLocator));
+                    undoLSNSet = loserTxnTable.get(tempKeyTxnId);
+                    if (undoLSNSet == null) {
+                        TxnId txnId = new TxnId(logRecordHelper.getJobId(currentLogLocator),
+                                logRecordHelper.getDatasetId(currentLogLocator), logRecordHelper.getPKHashValue(currentLogLocator));
+                        undoLSNSet = new ArrayList<Long>();
+                        loserTxnTable.put(txnId, undoLSNSet);
                     }
-
-                    // look up the repository to get the resource manager
-                    IResourceManager resourceMgr = txnSubsystem.getTransactionalResourceRepository()
-                            .getTransactionalResourceMgr(resourceMgrId);
-
-                    // register resourceMgr if it is not registered. 
-                    if (resourceMgr == null) {
-                        resourceMgr = new IndexResourceManager(resourceMgrId, txnSubsystem);
-                        txnSubsystem.getTransactionalResourceRepository().registerTransactionalResourceManager(
-                                resourceMgrId, resourceMgr);
-                    }
-                    resourceMgr.undo(parser, logLocator);
+                    undoLSNSet.add(currentLogLocator.getLsn());
                     break;
 
                 case LogType.COMMIT:
-                    throw new ACIDException(txnContext, " cannot rollback commmitted transaction");
+                    // The entity committed, so it is no longer a loser; remove() is a no-op when absent.
+                    loserTxnTable.remove(tempKeyTxnId);
+                    break;
 
                 default:
                     throw new ACIDException("Unsupported LogType: " + logType);
-
-            }
-
-            // follow the previous LSN pointer to get the previous log record
-            // written by the transaction
-            // If the return value is true, the logLocator, it indicates that
-            // the logLocator object has been
-            // appropriately set to the location of the next log record to be
-            // processed as part of the roll back
-            boolean moreLogs = parser.getPrevLSN(lsn, logLocator);
-            if (!moreLogs) {
-                // no more logs to process
-                break;
             }
         }
 
+        // Undo the effects of the loser (uncommitted) entities, newest LSN first.
+        Iterator<Entry<TxnId, List<Long>>> iter = loserTxnTable.entrySet().iterator();
+        byte resourceMgrId;
+        while (iter.hasNext()) {
+            Map.Entry<TxnId, List<Long>> loserTxn = iter.next();
+            undoLSNSet = loserTxn.getValue();
+            Comparator<Long> comparator = Collections.reverseOrder();
+            Collections.sort(undoLSNSet, comparator);
+
+            for (long undoLSN : undoLSNSet) {
+                currentLogLocator.setLsn(undoLSN);
+                // NOTE(review): only the LSN is reset here; confirm the helper reads the record at this LSN, not stale buffer contents.
+                // here, all the log records are UPDATE type. So, we don't need to check the type again.
+
+                // extract the resource manager id from the log record.
+                resourceMgrId = logRecordHelper.getResourceMgrId(currentLogLocator);
+                if (LOGGER.isLoggable(Level.FINE)) {
+                    LOGGER.fine(logRecordHelper.getLogRecordForDisplay(currentLogLocator));
+                }
+
+                // look up the repository to get the resource manager
+                IResourceManager resourceMgr = txnSubsystem.getTransactionalResourceRepository()
+                        .getTransactionalResourceMgr(resourceMgrId);
+
+                // register resourceMgr if it is not registered.
+                if (resourceMgr == null) {
+                    resourceMgr = new IndexResourceManager(resourceMgrId, txnSubsystem);
+                    txnSubsystem.getTransactionalResourceRepository().registerTransactionalResourceManager(
+                            resourceMgrId, resourceMgr);
+                }
+                resourceMgr.undo(logRecordHelper, currentLogLocator);
+            }
+        }
+    }
+}
+
+/** (jobId, datasetId, pkHashVal) map key; mutable via setTxnId, so never modify an instance stored in a hash map. */
+class TxnId {
+    public int jobId;
+    public int datasetId;
+    public int pkHashVal;
+
+    public TxnId(int jobId, int datasetId, int pkHashVal) {
+        this.jobId = jobId;
+        this.datasetId = datasetId;
+        this.pkHashVal = pkHashVal;
     }
 
+    public void setTxnId(int jobId, int datasetId, int pkHashVal) {
+        this.jobId = jobId;
+        this.datasetId = datasetId;
+        this.pkHashVal = pkHashVal;
+    }
+
+    public void setTxnId(TxnId txnId) {
+        this.jobId = txnId.jobId;
+        this.datasetId = txnId.datasetId;
+        this.pkHashVal = txnId.pkHashVal;
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 * (31 * jobId + datasetId) + pkHashVal;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == this) {
+            return true;
+        }
+        if (!(o instanceof TxnId)) {
+            return false;
+        }
+        TxnId txnId = (TxnId) o;
+        return (txnId.pkHashVal == pkHashVal && txnId.datasetId == datasetId && txnId.jobId == jobId);
+    }
 }
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
index bbade35..a88ed5f 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionContext.java
@@ -52,8 +52,9 @@
 
     private static final long serialVersionUID = -6105616785783310111L;
     private TransactionSubsystem transactionSubsystem;
-    private LogicalLogLocator lastLogLocator;
-    private TransactionState txnState;
+    private LogicalLogLocator firstLogLocator; // LSN of the first log record recorded for this job via setLastLSN
+    private LogicalLogLocator lastLogLocator; // LSN most recently recorded for this job via setLastLSN
+    private TransactionState txnState; 
     private long startWaitTime;
     private int status;
     private Set<ICloseable> resources = new HashSet<ICloseable>();
@@ -74,6 +75,7 @@
     }
 
     private void init() throws ACIDException {
+        firstLogLocator = LogUtil.getDummyLogicalLogLocator(transactionSubsystem.getLogManager());
         lastLogLocator = LogUtil.getDummyLogicalLogLocator(transactionSubsystem.getLogManager());
         txnState = TransactionState.ACTIVE;
         startWaitTime = INVALID_TIME;
@@ -117,12 +119,29 @@
         resources.add(resource);
     }
 
+    /**
+     * Returns the locator of the first log record recorded for this job. Its LSN is
+     * set by the first call to {@link #setLastLSN(long)}.
+     */
+    public LogicalLogLocator getFirstLogLocator() {
+        return firstLogLocator;
+    }
+
     public LogicalLogLocator getLastLogLocator() {
         return lastLogLocator;
     }
 
-    public void setLastLSN(LogicalLogLocator lastLogLocator) {
-        this.lastLogLocator = lastLogLocator;
+    /**
+     * Records {@code lsn} as this job's last LSN; the first call also records it as the first LSN.
+     *
+     * @param lsn the LSN to record
+     */
+    public void setLastLSN(long lsn) {
+        // Same "no LSN yet" sentinel RecoveryManager tests; assumes the dummy locator starts there (was -1) - confirm.
+        if (firstLogLocator.getLsn() == TransactionManagementConstants.LogManagerConstants.TERMINAL_LSN) {
+            firstLogLocator.setLsn(lsn);
+        }
+        lastLogLocator.setLsn(lsn);
     }
 
     public JobId getJobId() {
diff --git a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
index a22a7a2..3f71540 100644
--- a/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
+++ b/asterix-transactions/src/main/java/edu/uci/ics/asterix/transaction/management/service/transaction/TransactionManager.java
@@ -51,6 +51,7 @@
                 if (LOGGER.isLoggable(Level.SEVERE)) {
                     LOGGER.severe(msg);
                 }
+                ae.printStackTrace();
                 throw new Error(msg);
             } finally {
                 txnContext.releaseResources();