Changes to implement crash recovery. Has been tested with a couple of tests doing service shutdown/startup. Will implement a test specific for this.


git-svn-id: https://svn.apache.org/repos/asf/directory/apacheds/branches/apacheds-txns@1351037 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/core-api/src/main/java/org/apache/directory/server/core/api/log/Log.java b/core-api/src/main/java/org/apache/directory/server/core/api/log/Log.java
index 57586a7..93b06bf 100644
--- a/core-api/src/main/java/org/apache/directory/server/core/api/log/Log.java
+++ b/core-api/src/main/java/org/apache/directory/server/core/api/log/Log.java
@@ -72,16 +72,23 @@
      * @return A scanner to read the logs one by one
      */
     LogScanner beginScan();
-
-
+    
+    
     /**
-     * Advances the min needed position in the logs. Logging subsystem uses this
-     * information to get rid of unneeded
-     *
-     * @param newAnchor The new position
+     * Advances the checkpoint in the logs
+     * 
+     * @param checkPoint min needed position the caller needs in the logs.
      */
-    void advanceMinNeededLogPosition( LogAnchor newAnchor );
+    void advanceCheckPoint( LogAnchor checkPoint );
 
+    
+    /**
+     * Return the current checkpoint anchor
+     * 
+     * @return the current checkpoint anchor
+     */
+    LogAnchor getCheckPoint();
+    
 
     /**
      * Synchronizes the log up to the given LSN. If LSN is equal to unknown 
diff --git a/core-api/src/main/java/org/apache/directory/server/core/api/schema/SchemaPartition.java b/core-api/src/main/java/org/apache/directory/server/core/api/schema/SchemaPartition.java
index 7ec6224..3685ab8 100644
--- a/core-api/src/main/java/org/apache/directory/server/core/api/schema/SchemaPartition.java
+++ b/core-api/src/main/java/org/apache/directory/server/core/api/schema/SchemaPartition.java
@@ -365,7 +365,7 @@
         if ( entry == null )
         {
             LookupOperationContext lookupCtx = new LookupOperationContext( modifyContext.getSession(), modifyContext.getDn() );
-            entry = wrapped.lookup( lookupCtx );
+            entry = modifyContext.getSession().getDirectoryService().getPartitionNexus().lookup( lookupCtx );;
             modifyContext.setEntry( entry );
         }
 
diff --git a/core-api/src/main/java/org/apache/directory/server/core/api/txn/TxnLogManager.java b/core-api/src/main/java/org/apache/directory/server/core/api/txn/TxnLogManager.java
index f55257d..75f1e9a 100644
--- a/core-api/src/main/java/org/apache/directory/server/core/api/txn/TxnLogManager.java
+++ b/core-api/src/main/java/org/apache/directory/server/core/api/txn/TxnLogManager.java
@@ -24,6 +24,7 @@
 import java.util.Comparator;
 import java.util.UUID;
 
+import org.apache.directory.server.core.api.log.LogAnchor;
 import org.apache.directory.server.core.api.log.UserLogRecord;
 import org.apache.directory.server.core.api.partition.index.Index;
 import org.apache.directory.server.core.api.partition.index.IndexComparator;
diff --git a/core-api/src/main/java/org/apache/directory/server/core/api/txn/TxnManager.java b/core-api/src/main/java/org/apache/directory/server/core/api/txn/TxnManager.java
index 285d78b..83f7966 100644
--- a/core-api/src/main/java/org/apache/directory/server/core/api/txn/TxnManager.java
+++ b/core-api/src/main/java/org/apache/directory/server/core/api/txn/TxnManager.java
@@ -19,6 +19,7 @@
  */
 package org.apache.directory.server.core.api.txn;
 
+import org.apache.directory.server.core.api.partition.Partition;
 import org.apache.directory.shared.ldap.model.exception.LdapException;
 
 
@@ -131,5 +132,11 @@
      * @return TRUE if txn needs to do logical data reinit
      */
     boolean prepareForLogicalDataReinit();
+    
+    
+    /** 
+     * Recovers the given partition
+     */
+    void recoverPartition( Partition partition );
 
 }
diff --git a/core-api/src/main/java/org/apache/directory/server/core/api/txn/logedit/LogEdit.java b/core-api/src/main/java/org/apache/directory/server/core/api/txn/logedit/LogEdit.java
index 8672d5f..19880d2 100644
--- a/core-api/src/main/java/org/apache/directory/server/core/api/txn/logedit/LogEdit.java
+++ b/core-api/src/main/java/org/apache/directory/server/core/api/txn/logedit/LogEdit.java
@@ -32,6 +32,12 @@
  */
 public interface LogEdit extends Externalizable
 {
+	public enum EditType
+    {
+    	DATA_CHANGE,
+    	TXN_MARKER
+    }
+	
     /**
      * Returns the position the edit is inserted in the wal.
      * Log anchor is initialized is set after the edit is serialized and inserted into
diff --git a/core-integ/src/test/java/org/apache/directory/server/core/authn/SimpleAuthenticationIT.java b/core-integ/src/test/java/org/apache/directory/server/core/authn/SimpleAuthenticationIT.java
index 8074837..89c54c0 100644
--- a/core-integ/src/test/java/org/apache/directory/server/core/authn/SimpleAuthenticationIT.java
+++ b/core-integ/src/test/java/org/apache/directory/server/core/authn/SimpleAuthenticationIT.java
@@ -34,6 +34,7 @@
 import org.apache.directory.server.core.integ.FrameworkRunner;
 import org.apache.directory.server.core.integ.IntegrationUtils;
 import org.apache.directory.shared.ldap.model.entry.Attribute;
+import org.apache.directory.shared.ldap.model.entry.DefaultEntry;
 import org.apache.directory.shared.ldap.model.entry.Entry;
 import org.apache.directory.shared.ldap.model.exception.LdapAuthenticationException;
 import org.apache.directory.shared.ldap.model.message.ModifyRequest;
@@ -93,6 +94,7 @@
         performAdminAccountChecks( entry );
         assertTrue( ArrayUtils.isEquals( entry.get( "userPassword" ).get().getBytes(), Strings
             .getBytesUtf8("secret") ) );
+        
         connection.close();
 
         getService().shutdown();
@@ -103,6 +105,9 @@
         performAdminAccountChecks( entry );
         assertTrue( ArrayUtils.isEquals( entry.get( "userPassword" ).get().getBytes(), Strings
             .getBytesUtf8("secret") ) );
+        
+
+        
         connection.close();
     }
 
diff --git a/core-shared/src/main/java/org/apache/directory/server/core/shared/log/DefaultLog.java b/core-shared/src/main/java/org/apache/directory/server/core/shared/log/DefaultLog.java
index 60040b4..b6e810d 100644
--- a/core-shared/src/main/java/org/apache/directory/server/core/shared/log/DefaultLog.java
+++ b/core-shared/src/main/java/org/apache/directory/server/core/shared/log/DefaultLog.java
@@ -106,16 +106,30 @@
         return logScanner;
     }
 
+    
+    /**
+     * {@inheritDoc}
+     */
+    public void advanceCheckPoint( LogAnchor checkPoint )
+    {
+        if ( checkPoint == null )
+        {
+            return;
+        }
+
+        logManager.advanceCheckPoint( checkPoint );
+    }
+
 
     /**
      * {@inheritDoc}
      */
-    public void advanceMinNeededLogPosition( LogAnchor newAnchor )
+    public LogAnchor getCheckPoint()
     {
-        logManager.advanceMinLogAnchor( newAnchor );
+    	return logManager.getCheckPoint();
     }
-
-
+    
+    
     /**
      * {@inheritDoc}
      */
diff --git a/core-shared/src/main/java/org/apache/directory/server/core/shared/log/LogFlushManager.java b/core-shared/src/main/java/org/apache/directory/server/core/shared/log/LogFlushManager.java
index 2151f77..dc349c3 100644
--- a/core-shared/src/main/java/org/apache/directory/server/core/shared/log/LogFlushManager.java
+++ b/core-shared/src/main/java/org/apache/directory/server/core/shared/log/LogFlushManager.java
@@ -109,6 +109,8 @@
         this.logManager = logManager;
 
         logBuffer = new LogBuffer( logBufferSize, currentLogFile );
+        
+        logLSN = logManager.getInitialLsn();
     }
 
 
diff --git a/core-shared/src/main/java/org/apache/directory/server/core/shared/log/LogManager.java b/core-shared/src/main/java/org/apache/directory/server/core/shared/log/LogManager.java
index 7fad2d8..4c2b878 100644
--- a/core-shared/src/main/java/org/apache/directory/server/core/shared/log/LogManager.java
+++ b/core-shared/src/main/java/org/apache/directory/server/core/shared/log/LogManager.java
@@ -90,6 +90,9 @@
 
     /** The Checksum used */
     private Checksum checksum = new Adler32();
+    
+    /** Max lsn in the log after recovery */
+    private long initialLsn = Long.MIN_VALUE ;
 
 
     /**
@@ -163,6 +166,9 @@
             {
                 scanner.close();
             }
+            
+            initialLsn = logRecord.getLogAnchor().getLogLSN();
+            System.out.println(" Log manager inital lsn " + initialLsn);
 
             long lastGoodLogFileNumber = scanner.getLastGoodFileNumber();
             long lastGoodLogFileOffset = scanner.getLastGoodOffset();
@@ -227,6 +233,8 @@
                     createNextLogFile( true );
                 }
             }
+            
+            return;
         }
 
         /*
@@ -267,6 +275,15 @@
 
 
     /**
+     * 
+     * @return return the max lsn in the log after recovery
+     */
+    public long getInitialLsn()
+    {
+    	return initialLsn;
+    }
+    
+    /**
      * Called by LogFlushManager to switch to the next file.
      *
      * Note:Currently we do a checkpoint and delete unnecessary log files when we switch to a new file. Some
@@ -298,44 +315,55 @@
         return writer;
     }
 
-
-    /**
-     * @return The anchor associated with the last valid checkpoint.
-     */
-    /* Package protected */LogAnchor getMinLogAnchor()
-    {
-        minLogAnchorLock.lock();
-        LogAnchor anchor = new LogAnchor();
-        anchor.resetLogAnchor( minLogAnchor );
-        minLogAnchorLock.unlock();
-
-        return anchor;
-    }
-
-
     /**
      * Called when the logging subsystem is notified about the minimum position 
      * in the log files that is needed. Log manager uses this information to advance
      * its checkpoint and delete unnecessary log files.
      *
-     * @param newLogAnchor min needed log anchor
+     * @param newCheckPoint min needed log anchor
      */
-    public void advanceMinLogAnchor( LogAnchor newLogAnchor )
+    public void advanceCheckPoint( LogAnchor newCheckPoint )
     {
-        if ( newLogAnchor == null )
+        if ( newCheckPoint == null )
         {
             return;
         }
 
         minLogAnchorLock.lock();
 
-        if ( anchorComparator.compare( minLogAnchor, newLogAnchor ) < 0 )
+        if ( anchorComparator.compare( minLogAnchor, newCheckPoint ) < 0 )
         {
-            minLogAnchor.resetLogAnchor( newLogAnchor );
+            minLogAnchor.resetLogAnchor( newCheckPoint );
+        }
+        
+        try
+        {
+        	writeControlFile();
+        }
+        catch ( IOException e )
+        {
+        	// Ignore
         }
 
         minLogAnchorLock.unlock();
     }
+    
+    /**
+     * 
+     * @return the current with the checkpoint log achor
+     */
+    public LogAnchor getCheckPoint()
+    {
+    	LogAnchor anchor = new LogAnchor();
+    	
+    	minLogAnchorLock.lock();
+    	
+    	anchor.resetLogAnchor( minLogAnchor );
+    	
+    	minLogAnchorLock.unlock();
+    	
+    	return anchor;
+    }    
 
 
     /**
@@ -359,15 +387,10 @@
      */
     private void writeControlFile() throws IOException
     {
-        // Copy the min log file anchor
-        minLogAnchorLock.lock();
-
         controlFileRecord.minNeededLogFile = minLogAnchor.getLogFileNumber();
         controlFileRecord.minNeededLogFileOffset = minLogAnchor.getLogFileOffset();
         controlFileRecord.minNeededLSN = minLogAnchor.getLogLSN();
-
-        minLogAnchorLock.unlock();
-
+        
         if ( controlFileRecord.minNeededLogFile > controlFileRecord.minExistingLogFile )
         {
             deleteUnnecessaryLogFiles( controlFileRecord.minExistingLogFile, controlFileRecord.minNeededLogFile );
diff --git a/core-shared/src/main/java/org/apache/directory/server/core/shared/partition/DefaultOperationExecutionManager.java b/core-shared/src/main/java/org/apache/directory/server/core/shared/partition/DefaultOperationExecutionManager.java
index 9fff0b1..a2941cc 100644
--- a/core-shared/src/main/java/org/apache/directory/server/core/shared/partition/DefaultOperationExecutionManager.java
+++ b/core-shared/src/main/java/org/apache/directory/server/core/shared/partition/DefaultOperationExecutionManager.java
@@ -504,6 +504,7 @@
         }
         catch ( Exception e )
         {
+        	e.printStackTrace();
             throw new LdapOperationErrorException( e.getMessage(), e );
         }
     }
diff --git a/core-shared/src/main/java/org/apache/directory/server/core/shared/partition/OperationExecutionManagerFactory.java b/core-shared/src/main/java/org/apache/directory/server/core/shared/partition/OperationExecutionManagerFactory.java
index 41362a6..651fcec 100644
--- a/core-shared/src/main/java/org/apache/directory/server/core/shared/partition/OperationExecutionManagerFactory.java
+++ b/core-shared/src/main/java/org/apache/directory/server/core/shared/partition/OperationExecutionManagerFactory.java
@@ -33,17 +33,31 @@
 {
     /** Operation Manager instance */
     private OperationExecutionManager executionManager;
+    
+    private TxnManagerFactory txnManagerFactory;
+    
+    private boolean inited = false;
 
 
     public OperationExecutionManagerFactory( TxnManagerFactory txnManagerFactory )
     {
-        executionManager = new DefaultOperationExecutionManager( txnManagerFactory );
+    	this.txnManagerFactory = txnManagerFactory;
+    	
+    	this.init();
     }
 
 
+    public void init()
+    {
+    	if ( inited )
+    		return;
+    	
+    	executionManager = new DefaultOperationExecutionManager( txnManagerFactory );
+    }
+    
     public void shutdown()
     {
-        //do nothing;
+        inited = false;
     }
 
 
diff --git a/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/AbstractTransaction.java b/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/AbstractTransaction.java
index 957edb1..eb42c50 100644
--- a/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/AbstractTransaction.java
+++ b/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/AbstractTransaction.java
@@ -61,6 +61,12 @@
     /** version of the logical data vseen by this txn */
     private long myLogicalDataVersion;
 
+    
+    public void setTxnId( long id )
+    {
+    	this.id = id;
+    }
+    
     public boolean isOptimisticLockHeld()
     {
         return isOptimisticLockHeld;
@@ -100,6 +106,7 @@
         id = counter.getAndIncrement();
     }
 
+    
 
     /**
      * {@inheritDoc}
diff --git a/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultTxnLogManager.java b/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultTxnLogManager.java
index 627710a..4ec1789 100644
--- a/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultTxnLogManager.java
+++ b/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultTxnLogManager.java
@@ -44,7 +44,7 @@
  * 
  * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
  */
-public class DefaultTxnLogManager implements TxnLogManager
+public class DefaultTxnLogManager implements TxnLogManagerInternal
 {
     /** Write ahead log */
     private Log wal;
@@ -103,13 +103,13 @@
         ReadWriteTxn txn = ( ReadWriteTxn ) curTxn;
         UserLogRecord logRecord = txn.getUserLogRecord();
 
+        ( ( AbstractLogEdit ) logEdit ).setTxnID( txn.getId() );   
         logEdit.injectData( logRecord, UserLogRecord.LogEditType.DATA );
+        
+        logEdit.getLogAnchor().resetLogAnchor( logRecord.getLogAnchor() );
 
         log( logRecord, sync );
 
-        logEdit.getLogAnchor().resetLogAnchor( logRecord.getLogAnchor() );
-        ( ( AbstractLogEdit ) logEdit ).setTxnID( txn.getId() );
-
         txn.addLogEdit( logEdit );
     }
 
@@ -265,6 +265,10 @@
         addDnSet( baseDn, scope, false );
     }
 
+    public Log getWAL()
+    {
+    	return wal;
+    }
 
     private void addDnSet( Dn baseDn, SearchScope scope, boolean read )
     {
diff --git a/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultTxnManager.java b/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultTxnManager.java
index 301190c..3a3434e 100644
--- a/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultTxnManager.java
+++ b/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultTxnManager.java
@@ -20,7 +20,9 @@
 package org.apache.directory.server.core.shared.txn;
 
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -35,15 +37,22 @@
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import org.apache.directory.server.core.api.log.Log;
 import org.apache.directory.server.core.api.log.LogAnchor;
+import org.apache.directory.server.core.api.log.LogScanner;
 import org.apache.directory.server.core.api.log.UserLogRecord;
 import org.apache.directory.server.core.api.partition.Partition;
+import org.apache.directory.server.core.api.schema.SchemaPartition;
 import org.apache.directory.server.core.api.txn.TxnConflictException;
 import org.apache.directory.server.core.api.txn.TxnHandle;
 import org.apache.directory.server.core.api.txn.TxnLogManager;
 import org.apache.directory.server.core.api.txn.logedit.LogEdit;
+import org.apache.directory.server.core.api.txn.logedit.LogEdit.EditType;
+import org.apache.directory.server.core.shared.txn.logedit.DataChangeContainer;
 import org.apache.directory.server.core.shared.txn.logedit.TxnStateChange;
+import org.apache.directory.server.core.shared.txn.logedit.TxnStateChange.ChangeState;
 import org.apache.directory.shared.ldap.model.exception.LdapException;
+import org.apache.directory.shared.ldap.model.name.Dn;
 
 
 /**
@@ -55,6 +64,9 @@
 {
     /** wal log manager */
     private TxnLogManager txnLogManager;
+    
+    /** Write ahead log */
+    private Log wal;
 
     /** List of committed txns in commit LSN order */
     private ConcurrentLinkedQueue<ReadWriteTxn> committedQueue = new ConcurrentLinkedQueue<ReadWriteTxn>();
@@ -86,8 +98,14 @@
     /** Flush lock */
     private Lock flushLock = new ReentrantLock();
 
-    /** Number of flushes */
+    /** Number of flushed txns */
+    private int numFlushedTxns;
+    
+    /** Number of flushed */
     private int numFlushes;
+    
+    /** Take a checkpoint every 1000 flushes ~100 secs */
+    private final static int DEFAULT_FLUSH_ROUNDS = 1000;
 
     /** Flush Condition object */
     private Condition flushCondition = flushLock.newCondition();
@@ -107,7 +125,16 @@
     private AtomicInteger pending = new AtomicInteger();
 
     /** Logical data version number */
-    private long logicalDataVersion = 0;
+    private long logicalDataVersion;
+    
+    /** Initial scan point into the logs */
+    private LogAnchor initialScanPoint;
+    
+    /** Initial set of committed txns */
+    private HashSet<Long> txnsToRecover = new HashSet<Long>(); 
+    
+    /** last flushed log anchor */
+    private LogAnchor lastFlushedLogAnchor;
 
     /** Per thread txn context */
     static final ThreadLocal<Transaction> txnVar =
@@ -130,41 +157,66 @@
      * @param idComparator
      * @param idSerializer
      */
-    public void init( TxnLogManager txnLogManager )
+    public void init( TxnLogManagerInternal txnLogManager )
     {
         this.txnLogManager = txnLogManager;
+        wal = txnLogManager.getWAL();
         flushInterval = DEFAULT_FLUSH_INTERVAL;
+        
+        
+        committedQueue.clear();
+        latestFlushedTxnLSN.set( LogAnchor.UNKNOWN_LSN );
+        txnsToRecover.clear();
+        logicalDataVersion = 0;
+        lastFlushedLogAnchor = new LogAnchor();
 
-        dummyTxn.commitTxn( LogAnchor.UNKNOWN_LSN );
+        initialScanPoint = wal.getCheckPoint();
+        lastFlushedLogAnchor.resetLogAnchor( initialScanPoint );
+        
+        dummyTxn.commitTxn( initialScanPoint.getLogLSN() );
         latestCommittedTxn.set( dummyTxn );
         latestVerifiedTxn.set( dummyTxn );
         committedQueue.offer( dummyTxn );
+        
+        getTxnsToReover();
 
-        syncer = new LogSyncer();
-        syncer.setDaemon( true );
-        syncer.start();
+        if ( syncer == null )
+        {
+        	syncer = new LogSyncer();
+        	syncer.setDaemon( true );
+        	syncer.start();
+        }
     }
 
 
     public void shutdown()
     {
+    	System.out.println("in shutdown");
         syncer.interrupt();
 
         try
         {
-            syncer.join();
+        	syncer.join();
         }
         catch ( InterruptedException e )
         {
-            // Ignore
+        	//Ignore
         }
+    	
 
         // Do a best effort last flush
         flushLock.lock();
 
         try
         {
-            flushTxns();
+        	ReadWriteTxn latestCommitted = latestCommittedTxn.get();
+            long latestFlushedLsn = latestFlushedTxnLSN.get();
+            
+            System.out.println("latest committed txn " + latestCommitted.getCommitTime() + 
+            		" latest flushed " + latestFlushedLsn);
+            //flushTxns();
+            
+            //advanceCheckPoint( lastFlushedLogAnchor );
         }
         catch ( Exception e )
         {
@@ -174,7 +226,7 @@
         {
             flushLock.unlock();
         }
-
+        
         syncer = null;
     }
 
@@ -616,6 +668,7 @@
         {
             txnLogManager.log( logRecord, false );
             txn.startTxn( logRecord.getLogAnchor().getLogLSN(), logicalDataVersion );
+            txn.setTxnId( logRecord.getLogAnchor().getLogLSN() );
 
             do
             {
@@ -852,6 +905,8 @@
      */
     private void flushTxns() throws Exception
     {
+    	UserLogRecord lastLogRecord = null;
+    	
         // If flushing failed already, dont do anything anymore
         if ( flushFailed )
         {
@@ -879,6 +934,8 @@
                 txnToFlush.flushLogEdits( flushedToPartitions );
 
                 latestFlushedTxnLSN.set( txnToFlush.getCommitTime() );
+                
+                lastLogRecord = txnToFlush.getUserLogRecord();
             }
 
             if ( txnToFlush == latestCommitted )
@@ -887,7 +944,7 @@
                 break;
             }
 
-            numFlushes++;
+            numFlushedTxns++;
 
             //           if (  numFlushes % 100 == 0 )
             //           {
@@ -919,11 +976,147 @@
         {
             partitionIt.next().sync();
         }
+        
+        numFlushes++;
+        
+        if ( lastLogRecord != null )
+        {
+        	lastFlushedLogAnchor.resetLogAnchor(lastLogRecord.getLogAnchor());
+        }
+        
+        if (numFlushes % DEFAULT_FLUSH_ROUNDS == 0 )
+        {
+        	advanceCheckPoint( lastFlushedLogAnchor );
+        }
 
     }
+    
+    
+    private void advanceCheckPoint( LogAnchor checkPoint )
+    {
+    	wal.advanceCheckPoint(checkPoint);
+    }
+    
+    
+    private void getTxnsToReover()
+    {
+    	LogScanner logScanner = wal.beginScan( initialScanPoint );
+    	UserLogRecord logRecord = new UserLogRecord();
+    	byte userRecord[]; 
+    	
+    	System.out.println(" Get txns to recover " + initialScanPoint.getLogLSN() );
+    	
+    	try
+    	{
+	    	while ( logScanner.getNextRecord( logRecord ) )
+	        {
+	            userRecord = logRecord.getDataBuffer();
+	            ObjectInputStream in = buildStream( userRecord );
+	            
+	            EditType editType = EditType.values()[in.read()];
+	           
+	            
+	            if (editType == EditType.TXN_MARKER)
+	            {
+	            	TxnStateChange stateChange = new TxnStateChange();
+	            	stateChange.readExternal(in);
+	            	
+	            	if ( stateChange.getTxnState() == ChangeState.TXN_COMMIT )
+	            	{
+	            		System.out.println("Adding txn " + stateChange.getTxnID() + " to the tobe recovered txns");
+	            		txnsToRecover.add( new Long( stateChange.getTxnID() ) );
+	            	}
+	            }
+	        }
+    	}
+    	catch ( Exception e )
+    	{
+    		e.printStackTrace();
+    		// Ignore
+    	}
+    }
+    
+    
+    // Walk over the txn log records from the latest checkpoint and apply the
+    // log records to the partition
+    public void recoverPartition( Partition partition )
+    {
+    	Dn partitionSuffix = partition.getSuffixDn();
+    	
+    	System.out.println("Recover partition " + partitionSuffix);
+    	
+    	LogScanner logScanner = wal.beginScan( initialScanPoint );
+    	UserLogRecord logRecord = new UserLogRecord();
+    	byte userRecord[]; 
+    	
+    	boolean recoveredChanges = false;
+    	
+    	try
+    	{
+	    	while ( logScanner.getNextRecord( logRecord ) )
+	        {
+	            userRecord = logRecord.getDataBuffer();
+	            ObjectInputStream in = buildStream( userRecord );
+	            
+	            EditType editType = EditType.values()[in.read()];
+	            
+	            if (editType == EditType.DATA_CHANGE)
+	            {
+	            	DataChangeContainer dataChangeContainer = new DataChangeContainer();
+	            	dataChangeContainer.readExternal(in);
+	            	
+	            	System.out.println("Data change container for " + dataChangeContainer.getPartitionDn() + 
+	            			" txn id " + dataChangeContainer.getTxnID() );
+	            	
+	            	// If this change is for the partition we are tyring to recover 
+	                // and belongs to a txn that committed, then 
+	            	Long txnID = new Long( dataChangeContainer.getTxnID() );
+	         
+	            	if ( txnsToRecover.contains( txnID ) )
+	                {
+	            		if(	dataChangeContainer.getPartitionDn().equals( partitionSuffix ) )
+	            		{
+	            			System.out.println("Apply change to partition " + partitionSuffix);
+	            			dataChangeContainer.setPartition( partition );
+	            			dataChangeContainer.apply( true );
+	            			recoveredChanges = true;
+	            		}
+	                }
+	            }
+	        }
+	    	
+	    	if ( recoveredChanges && partition instanceof SchemaPartition )
+	    	{
+	    		( (SchemaPartition) partition ).getSchemaManager().reloadAllEnabled();
+	    	}
+    	}
+    	catch ( Exception e )
+    	{
+    		e.printStackTrace();
+    		// Ignore for now
+    	}
+    }
+    
+    
+    private ObjectInputStream buildStream( byte[] buffer ) throws IOException
+    {
+        ObjectInputStream oIn = null;
+        ByteArrayInputStream in = new ByteArrayInputStream( buffer );
+
+        try
+        {
+            oIn = new ObjectInputStream( in );
+
+            return oIn;
+        }
+        catch ( IOException ioe )
+        {
+            throw ioe;
+        }
+    }
 
     class LogSyncer extends Thread
-    {
+    {	
         @Override
         public void run()
         {
@@ -931,8 +1124,8 @@
 
             try
             {
-                while ( !this.isInterrupted() )
-                {
+                while ( true )
+                {	
                     flushCondition.await( flushInterval, TimeUnit.MILLISECONDS );
                     flushTxns();
                 }
diff --git a/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/Transaction.java b/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/Transaction.java
index 26d34ca..7fdfb9e 100644
--- a/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/Transaction.java
+++ b/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/Transaction.java
@@ -37,6 +37,12 @@
 /** Package protected */
 interface Transaction extends TxnHandle
 {
+	/**
+	 * Set the txn id. Used to change the txn id to the log lsn
+	 * @param id new txn id
+	 */
+	void setTxnId( long id );
+	
     /**
      * returns TRUE if optimisticLock held, false otherwise
      *
diff --git a/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/TxnLogManagerInternal.java b/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/TxnLogManagerInternal.java
new file mode 100644
index 0000000..49f3e9a
--- /dev/null
+++ b/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/TxnLogManagerInternal.java
@@ -0,0 +1,13 @@
+package org.apache.directory.server.core.shared.txn;
+
+import org.apache.directory.server.core.api.log.Log;
+import org.apache.directory.server.core.api.txn.TxnLogManager;
+
+interface TxnLogManagerInternal extends TxnLogManager
+{
+	/**
+	 * 
+	 * @return return the wal log manager used by the txnlogmanager
+	 */
+	Log getWAL();
+}
diff --git a/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/TxnManagerFactory.java b/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/TxnManagerFactory.java
index af47205..4569e62 100644
--- a/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/TxnManagerFactory.java
+++ b/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/TxnManagerFactory.java
@@ -39,12 +39,22 @@
     private TxnManagerInternal txnManager;
 
     /** The only txn log manager */
-    private TxnLogManager txnLogManager;
+    private TxnLogManagerInternal txnLogManager;
+    
+    /** WAL */
+    private Log log;
 
     /** log suffix */
     private String LOG_SUFFIX = "log";
 
     private boolean inited;
+    
+    
+    private String logFolderPath;
+    
+    private int logBufferSize;
+    
+    private long logFileSize;
 
 
     /**
@@ -59,25 +69,39 @@
     public TxnManagerFactory( String logFolderPath,
         int logBufferSize, long logFileSize ) throws IOException
     {
-        Log log = new DefaultLog();
-
-        try
-        {
-            log.init( logFolderPath, LOG_SUFFIX, logBufferSize, logFileSize );
-        }
-        catch ( InvalidLogException e )
-        {
-            throw new IOException( e );
-        }
+    	this.logFolderPath = logFolderPath;
+    	this.logBufferSize = logBufferSize;
+    	this.logFileSize = logFileSize;
+    	
+        log = new DefaultLog();
 
         txnManager = new DefaultTxnManager();
 
         txnLogManager = new DefaultTxnLogManager( log, this );
-
-        ( ( DefaultTxnManager ) txnManager ).init( txnLogManager );
-
-        inited = true;
-
+        
+        this.init();
+    }
+    
+    
+    public void init() throws IOException
+    {
+    	if ( inited )
+    	{
+    		return;
+    	}
+    	
+    	try
+    	{
+    		log.init( logFolderPath, LOG_SUFFIX, logBufferSize, logFileSize );
+        }	
+        catch ( InvalidLogException e )
+        {
+        	throw new IOException( e );
+        }	
+    	
+    	( ( DefaultTxnManager ) txnManager ).init(txnLogManager);
+    	 
+    	inited = true;
     }
 
 
diff --git a/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/logedit/DataChangeContainer.java b/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/logedit/DataChangeContainer.java
index f5327fc..829f9f6 100644
--- a/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/logedit/DataChangeContainer.java
+++ b/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/logedit/DataChangeContainer.java
@@ -97,6 +97,11 @@
     {
         return partition;
     }
+    
+    public void setPartition( Partition partition )
+    {
+    	this.partition = partition;
+    }
 
 
     public UUID getEntryID()
@@ -154,18 +159,23 @@
             {
                 EntryModification entryModification = ( EntryModification ) change;
 
-                curEntry = entryModification.applyModification( partition, curEntry, entryID, changeLsn, false );
+                curEntry = entryModification.applyModification( partition, curEntry, entryID, changeLsn, recovery );
             }
             else
             {
                 IndexModification indexModification = ( IndexModification ) change;
-                indexModification.applyModification( partition, false );
+                indexModification.applyModification( partition, recovery );
             }
         }
 
         if ( curEntry != null )
         {
             MasterTable master = partition.getMasterTable();
+            
+            if ( !curEntry.isSchemaAware() )
+            {
+            	curEntry.apply( partition.getSchemaManager() );
+            }
             master.put( entryID, curEntry );
         }
         else
@@ -312,6 +322,8 @@
     {
         DataChange change;
 
+        out.write(EditType.DATA_CHANGE.ordinal());
+        
         if ( entryID != null )
         {
             out.writeBoolean( true );
diff --git a/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/logedit/TxnStateChange.java b/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/logedit/TxnStateChange.java
index 9982f71..f08fa46 100644
--- a/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/logedit/TxnStateChange.java
+++ b/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/logedit/TxnStateChange.java
@@ -99,6 +99,8 @@
     @Override
     public void writeExternal( ObjectOutput out ) throws IOException
     {
+    	out.write(EditType.TXN_MARKER.ordinal());
+    	
         out.writeLong( txnID );
         out.write( txnState.ordinal() );
     }
diff --git a/core/src/main/java/org/apache/directory/server/core/DefaultDirectoryService.java b/core/src/main/java/org/apache/directory/server/core/DefaultDirectoryService.java
index adad548..c21ac6b 100644
--- a/core/src/main/java/org/apache/directory/server/core/DefaultDirectoryService.java
+++ b/core/src/main/java/org/apache/directory/server/core/DefaultDirectoryService.java
@@ -1358,11 +1358,16 @@
             txnManagerFactory = new TxnManagerFactory( getInstanceLayout().getTxnLogDirectory().getPath(),
                 TXN_LOG_BUFFER_SIZE, TXN_LOG_FILE_SIZE );
         }
-
+        
+        txnManagerFactory.init();
+        
+        
         if ( executionManagerFactory == null )
         {
             executionManagerFactory = new OperationExecutionManagerFactory( txnManagerFactory );
         }
+        
+        executionManagerFactory.init();
 
         initialize();
         showSecurityWarnings();
@@ -1417,7 +1422,7 @@
         {
             return;
         }
-
+        
         // --------------------------------------------------------------------
         // Shutdown the txnManager
         //
diff --git a/jdbm-partition/src/main/java/org/apache/directory/server/core/partition/impl/btree/jdbm/JdbmIndex.java b/jdbm-partition/src/main/java/org/apache/directory/server/core/partition/impl/btree/jdbm/JdbmIndex.java
index 43ee99c..69ec772 100644
--- a/jdbm-partition/src/main/java/org/apache/directory/server/core/partition/impl/btree/jdbm/JdbmIndex.java
+++ b/jdbm-partition/src/main/java/org/apache/directory/server/core/partition/impl/btree/jdbm/JdbmIndex.java
@@ -184,7 +184,7 @@
         String path = new File( this.wkDirPath, attributeType.getOid() ).getAbsolutePath();
 
         BaseRecordManager base = new BaseRecordManager( path );
-        base.disableTransactions();
+        //base.disableTransactions();
         this.recMan = new SnapshotRecordManager( base, DEFAULT_INDEX_CACHE_SIZE );
 
         try
diff --git a/jdbm-partition/src/main/java/org/apache/directory/server/core/partition/impl/btree/jdbm/JdbmPartition.java b/jdbm-partition/src/main/java/org/apache/directory/server/core/partition/impl/btree/jdbm/JdbmPartition.java
index bf63d63..fd3710f 100644
--- a/jdbm-partition/src/main/java/org/apache/directory/server/core/partition/impl/btree/jdbm/JdbmPartition.java
+++ b/jdbm-partition/src/main/java/org/apache/directory/server/core/partition/impl/btree/jdbm/JdbmPartition.java
@@ -135,7 +135,7 @@
             // First, check if the file storing the data exists
             String path = partitionDir.getPath() + File.separator + "master";
             BaseRecordManager baseRecordManager = new BaseRecordManager( path );
-            baseRecordManager.disableTransactions();
+            //baseRecordManager.disableTransactions();
     
             if ( cacheSize < 0 )
             {
@@ -188,6 +188,9 @@
     
             deleteUnusedIndexFiles( allIndices, allIndexDbFiles );
             
+            // Apply the txn logs
+            txnManagerFactory.txnManagerInstance().recoverPartition( this );
+            
             // We are done !
             initialized = true;
         }
diff --git a/jdbm-partition/src/main/java/org/apache/directory/server/core/partition/impl/btree/jdbm/JdbmRdnIndex.java b/jdbm-partition/src/main/java/org/apache/directory/server/core/partition/impl/btree/jdbm/JdbmRdnIndex.java
index 92d23db..8704a57 100644
--- a/jdbm-partition/src/main/java/org/apache/directory/server/core/partition/impl/btree/jdbm/JdbmRdnIndex.java
+++ b/jdbm-partition/src/main/java/org/apache/directory/server/core/partition/impl/btree/jdbm/JdbmRdnIndex.java
@@ -96,7 +96,7 @@
         
         //System.out.println( "IDX Created index " + path );
         BaseRecordManager base = new BaseRecordManager( path );
-        base.disableTransactions();
+        //base.disableTransactions();
         this.recMan = new SnapshotRecordManager( base, cacheSize );
 
         try
diff --git a/ldif-partition/src/main/java/org/apache/directory/server/core/partition/ldif/LdifPartition.java b/ldif-partition/src/main/java/org/apache/directory/server/core/partition/ldif/LdifPartition.java
index 3f07e76..71d4832 100644
--- a/ldif-partition/src/main/java/org/apache/directory/server/core/partition/ldif/LdifPartition.java
+++ b/ldif-partition/src/main/java/org/apache/directory/server/core/partition/ldif/LdifPartition.java
@@ -156,6 +156,9 @@
             if ( suffixDirectory.exists() )
             {
                 loadEntries( partitionDir );
+                
+                // Apply the txn logs
+                txnManagerFactory.txnManagerInstance().recoverPartition( this );
             }
             else
             {
@@ -195,7 +198,7 @@
 
                     // And add this entry to the underlying partition
                     AddOperationContext addContext = new AddOperationContext( schemaManager, contextEntry );
-                    add( addContext );
+                    executionManager.add( this, addContext );
                 }
             }
         }
diff --git a/ldif-partition/src/main/java/org/apache/directory/server/core/partition/ldif/SingleFileLdifPartition.java b/ldif-partition/src/main/java/org/apache/directory/server/core/partition/ldif/SingleFileLdifPartition.java
index 77a907e..1eb8da8 100644
--- a/ldif-partition/src/main/java/org/apache/directory/server/core/partition/ldif/SingleFileLdifPartition.java
+++ b/ldif-partition/src/main/java/org/apache/directory/server/core/partition/ldif/SingleFileLdifPartition.java
@@ -125,6 +125,9 @@
             super.doInit();
 
             loadEntries();
+            
+            // Apply the txn logs
+            txnManagerFactory.txnManagerInstance().recoverPartition( this );
         }
     }
 
diff --git a/service-osgi/src/main/java/org/apache/directory/server/ApacheDsService.java b/service-osgi/src/main/java/org/apache/directory/server/ApacheDsService.java
index d12def0..d72b618 100644
--- a/service-osgi/src/main/java/org/apache/directory/server/ApacheDsService.java
+++ b/service-osgi/src/main/java/org/apache/directory/server/ApacheDsService.java
@@ -166,7 +166,7 @@
         LOG.info( "using partition dir {}", partitionsDir.getAbsolutePath() );
 
         txnManagerFactory = new TxnManagerFactory( instanceLayout.getTxnLogDirectory().getPath(), 
-            DirectoryService.TXN_LOG_BUFFER_SIZE, DirectoryService.TXN_LOG_FILE_SIZE );
+            DirectoryService.TXN_LOG_BUFFER_SIZE, DirectoryService.TXN_LOG_FILE_SIZE );      
         executionManagerFactory = new OperationExecutionManagerFactory( txnManagerFactory );
         
         initSchemaManager( instanceLayout );