Changes to implement crash recovery. Has been tested with a couple of tests doing service shutdown/startup. Will implement a test specific for this.
git-svn-id: https://svn.apache.org/repos/asf/directory/apacheds/branches/apacheds-txns@1351037 13f79535-47bb-0310-9956-ffa450edef68
diff --git a/core-api/src/main/java/org/apache/directory/server/core/api/log/Log.java b/core-api/src/main/java/org/apache/directory/server/core/api/log/Log.java
index 57586a7..93b06bf 100644
--- a/core-api/src/main/java/org/apache/directory/server/core/api/log/Log.java
+++ b/core-api/src/main/java/org/apache/directory/server/core/api/log/Log.java
@@ -72,16 +72,23 @@
* @return A scanner to read the logs one by one
*/
LogScanner beginScan();
-
-
+
+
/**
- * Advances the min needed position in the logs. Logging subsystem uses this
- * information to get rid of unneeded
- *
- * @param newAnchor The new position
+ * Advances the checkpoint in the logs
+ *
+ * @param checkPoint min needed position the caller needs in the logs.
*/
- void advanceMinNeededLogPosition( LogAnchor newAnchor );
+ void advanceCheckPoint( LogAnchor checkPoint );
+
+ /**
+ * Return the current checkpoint anchor
+ *
+ * @return the current checkpoint anchor
+ */
+ LogAnchor getCheckPoint();
+
/**
* Synchronizes the log up to the given LSN. If LSN is equal to unknown
diff --git a/core-api/src/main/java/org/apache/directory/server/core/api/schema/SchemaPartition.java b/core-api/src/main/java/org/apache/directory/server/core/api/schema/SchemaPartition.java
index 7ec6224..3685ab8 100644
--- a/core-api/src/main/java/org/apache/directory/server/core/api/schema/SchemaPartition.java
+++ b/core-api/src/main/java/org/apache/directory/server/core/api/schema/SchemaPartition.java
@@ -365,7 +365,7 @@
if ( entry == null )
{
LookupOperationContext lookupCtx = new LookupOperationContext( modifyContext.getSession(), modifyContext.getDn() );
- entry = wrapped.lookup( lookupCtx );
+ entry = modifyContext.getSession().getDirectoryService().getPartitionNexus().lookup( lookupCtx );;
modifyContext.setEntry( entry );
}
diff --git a/core-api/src/main/java/org/apache/directory/server/core/api/txn/TxnLogManager.java b/core-api/src/main/java/org/apache/directory/server/core/api/txn/TxnLogManager.java
index f55257d..75f1e9a 100644
--- a/core-api/src/main/java/org/apache/directory/server/core/api/txn/TxnLogManager.java
+++ b/core-api/src/main/java/org/apache/directory/server/core/api/txn/TxnLogManager.java
@@ -24,6 +24,7 @@
import java.util.Comparator;
import java.util.UUID;
+import org.apache.directory.server.core.api.log.LogAnchor;
import org.apache.directory.server.core.api.log.UserLogRecord;
import org.apache.directory.server.core.api.partition.index.Index;
import org.apache.directory.server.core.api.partition.index.IndexComparator;
diff --git a/core-api/src/main/java/org/apache/directory/server/core/api/txn/TxnManager.java b/core-api/src/main/java/org/apache/directory/server/core/api/txn/TxnManager.java
index 285d78b..83f7966 100644
--- a/core-api/src/main/java/org/apache/directory/server/core/api/txn/TxnManager.java
+++ b/core-api/src/main/java/org/apache/directory/server/core/api/txn/TxnManager.java
@@ -19,6 +19,7 @@
*/
package org.apache.directory.server.core.api.txn;
+import org.apache.directory.server.core.api.partition.Partition;
import org.apache.directory.shared.ldap.model.exception.LdapException;
@@ -131,5 +132,11 @@
* @return TRUE if txn needs to do logical data reinit
*/
boolean prepareForLogicalDataReinit();
+
+
+ /**
+ * Recovers the given partition
+ */
+ void recoverPartition( Partition partition );
}
diff --git a/core-api/src/main/java/org/apache/directory/server/core/api/txn/logedit/LogEdit.java b/core-api/src/main/java/org/apache/directory/server/core/api/txn/logedit/LogEdit.java
index 8672d5f..19880d2 100644
--- a/core-api/src/main/java/org/apache/directory/server/core/api/txn/logedit/LogEdit.java
+++ b/core-api/src/main/java/org/apache/directory/server/core/api/txn/logedit/LogEdit.java
@@ -32,6 +32,12 @@
*/
public interface LogEdit extends Externalizable
{
+ public enum EditType
+ {
+ DATA_CHANGE,
+ TXN_MARKER
+ }
+
/**
* Returns the position the edit is inserted in the wal.
* Log anchor is initialized is set after the edit is serialized and inserted into
diff --git a/core-integ/src/test/java/org/apache/directory/server/core/authn/SimpleAuthenticationIT.java b/core-integ/src/test/java/org/apache/directory/server/core/authn/SimpleAuthenticationIT.java
index 8074837..89c54c0 100644
--- a/core-integ/src/test/java/org/apache/directory/server/core/authn/SimpleAuthenticationIT.java
+++ b/core-integ/src/test/java/org/apache/directory/server/core/authn/SimpleAuthenticationIT.java
@@ -34,6 +34,7 @@
import org.apache.directory.server.core.integ.FrameworkRunner;
import org.apache.directory.server.core.integ.IntegrationUtils;
import org.apache.directory.shared.ldap.model.entry.Attribute;
+import org.apache.directory.shared.ldap.model.entry.DefaultEntry;
import org.apache.directory.shared.ldap.model.entry.Entry;
import org.apache.directory.shared.ldap.model.exception.LdapAuthenticationException;
import org.apache.directory.shared.ldap.model.message.ModifyRequest;
@@ -93,6 +94,7 @@
performAdminAccountChecks( entry );
assertTrue( ArrayUtils.isEquals( entry.get( "userPassword" ).get().getBytes(), Strings
.getBytesUtf8("secret") ) );
+
connection.close();
getService().shutdown();
@@ -103,6 +105,9 @@
performAdminAccountChecks( entry );
assertTrue( ArrayUtils.isEquals( entry.get( "userPassword" ).get().getBytes(), Strings
.getBytesUtf8("secret") ) );
+
+
+
connection.close();
}
diff --git a/core-shared/src/main/java/org/apache/directory/server/core/shared/log/DefaultLog.java b/core-shared/src/main/java/org/apache/directory/server/core/shared/log/DefaultLog.java
index 60040b4..b6e810d 100644
--- a/core-shared/src/main/java/org/apache/directory/server/core/shared/log/DefaultLog.java
+++ b/core-shared/src/main/java/org/apache/directory/server/core/shared/log/DefaultLog.java
@@ -106,16 +106,30 @@
return logScanner;
}
+
+ /**
+ * {@inheritDoc}
+ */
+ public void advanceCheckPoint( LogAnchor checkPoint )
+ {
+ if ( checkPoint == null )
+ {
+ return;
+ }
+
+ logManager.advanceCheckPoint( checkPoint );
+ }
+
/**
* {@inheritDoc}
*/
- public void advanceMinNeededLogPosition( LogAnchor newAnchor )
+ public LogAnchor getCheckPoint()
{
- logManager.advanceMinLogAnchor( newAnchor );
+ return logManager.getCheckPoint();
}
-
-
+
+
/**
* {@inheritDoc}
*/
diff --git a/core-shared/src/main/java/org/apache/directory/server/core/shared/log/LogFlushManager.java b/core-shared/src/main/java/org/apache/directory/server/core/shared/log/LogFlushManager.java
index 2151f77..dc349c3 100644
--- a/core-shared/src/main/java/org/apache/directory/server/core/shared/log/LogFlushManager.java
+++ b/core-shared/src/main/java/org/apache/directory/server/core/shared/log/LogFlushManager.java
@@ -109,6 +109,8 @@
this.logManager = logManager;
logBuffer = new LogBuffer( logBufferSize, currentLogFile );
+
+ logLSN = logManager.getInitialLsn();
}
diff --git a/core-shared/src/main/java/org/apache/directory/server/core/shared/log/LogManager.java b/core-shared/src/main/java/org/apache/directory/server/core/shared/log/LogManager.java
index 7fad2d8..4c2b878 100644
--- a/core-shared/src/main/java/org/apache/directory/server/core/shared/log/LogManager.java
+++ b/core-shared/src/main/java/org/apache/directory/server/core/shared/log/LogManager.java
@@ -90,6 +90,9 @@
/** The Checksum used */
private Checksum checksum = new Adler32();
+
+ /** Max lsn in the log after recovery */
+ private long initialLsn = Long.MIN_VALUE ;
/**
@@ -163,6 +166,9 @@
{
scanner.close();
}
+
+ initialLsn = logRecord.getLogAnchor().getLogLSN();
+ System.out.println(" Log manager inital lsn " + initialLsn);
long lastGoodLogFileNumber = scanner.getLastGoodFileNumber();
long lastGoodLogFileOffset = scanner.getLastGoodOffset();
@@ -227,6 +233,8 @@
createNextLogFile( true );
}
}
+
+ return;
}
/*
@@ -267,6 +275,15 @@
/**
+ *
+ * @return return the max lsn in the log after recovery
+ */
+ public long getInitialLsn()
+ {
+ return initialLsn;
+ }
+
+ /**
* Called by LogFlushManager to switch to the next file.
*
* Note:Currently we do a checkpoint and delete unnecessary log files when we switch to a new file. Some
@@ -298,44 +315,55 @@
return writer;
}
-
- /**
- * @return The anchor associated with the last valid checkpoint.
- */
- /* Package protected */LogAnchor getMinLogAnchor()
- {
- minLogAnchorLock.lock();
- LogAnchor anchor = new LogAnchor();
- anchor.resetLogAnchor( minLogAnchor );
- minLogAnchorLock.unlock();
-
- return anchor;
- }
-
-
/**
* Called when the logging subsystem is notified about the minimum position
* in the log files that is needed. Log manager uses this information to advance
* its checkpoint and delete unnecessary log files.
*
- * @param newLogAnchor min needed log anchor
+ * @param newCheckPoint min needed log anchor
*/
- public void advanceMinLogAnchor( LogAnchor newLogAnchor )
+ public void advanceCheckPoint( LogAnchor newCheckPoint )
{
- if ( newLogAnchor == null )
+ if ( newCheckPoint == null )
{
return;
}
minLogAnchorLock.lock();
- if ( anchorComparator.compare( minLogAnchor, newLogAnchor ) < 0 )
+ if ( anchorComparator.compare( minLogAnchor, newCheckPoint ) < 0 )
{
- minLogAnchor.resetLogAnchor( newLogAnchor );
+ minLogAnchor.resetLogAnchor( newCheckPoint );
+ }
+
+ try
+ {
+ writeControlFile();
+ }
+ catch ( IOException e )
+ {
+ // Ignore
}
minLogAnchorLock.unlock();
}
+
+ /**
+ *
+ * @return the current with the checkpoint log achor
+ */
+ public LogAnchor getCheckPoint()
+ {
+ LogAnchor anchor = new LogAnchor();
+
+ minLogAnchorLock.lock();
+
+ anchor.resetLogAnchor( minLogAnchor );
+
+ minLogAnchorLock.unlock();
+
+ return anchor;
+ }
/**
@@ -359,15 +387,10 @@
*/
private void writeControlFile() throws IOException
{
- // Copy the min log file anchor
- minLogAnchorLock.lock();
-
controlFileRecord.minNeededLogFile = minLogAnchor.getLogFileNumber();
controlFileRecord.minNeededLogFileOffset = minLogAnchor.getLogFileOffset();
controlFileRecord.minNeededLSN = minLogAnchor.getLogLSN();
-
- minLogAnchorLock.unlock();
-
+
if ( controlFileRecord.minNeededLogFile > controlFileRecord.minExistingLogFile )
{
deleteUnnecessaryLogFiles( controlFileRecord.minExistingLogFile, controlFileRecord.minNeededLogFile );
diff --git a/core-shared/src/main/java/org/apache/directory/server/core/shared/partition/DefaultOperationExecutionManager.java b/core-shared/src/main/java/org/apache/directory/server/core/shared/partition/DefaultOperationExecutionManager.java
index 9fff0b1..a2941cc 100644
--- a/core-shared/src/main/java/org/apache/directory/server/core/shared/partition/DefaultOperationExecutionManager.java
+++ b/core-shared/src/main/java/org/apache/directory/server/core/shared/partition/DefaultOperationExecutionManager.java
@@ -504,6 +504,7 @@
}
catch ( Exception e )
{
+ e.printStackTrace();
throw new LdapOperationErrorException( e.getMessage(), e );
}
}
diff --git a/core-shared/src/main/java/org/apache/directory/server/core/shared/partition/OperationExecutionManagerFactory.java b/core-shared/src/main/java/org/apache/directory/server/core/shared/partition/OperationExecutionManagerFactory.java
index 41362a6..651fcec 100644
--- a/core-shared/src/main/java/org/apache/directory/server/core/shared/partition/OperationExecutionManagerFactory.java
+++ b/core-shared/src/main/java/org/apache/directory/server/core/shared/partition/OperationExecutionManagerFactory.java
@@ -33,17 +33,31 @@
{
/** Operation Manager instance */
private OperationExecutionManager executionManager;
+
+ private TxnManagerFactory txnManagerFactory;
+
+ private boolean inited = false;
public OperationExecutionManagerFactory( TxnManagerFactory txnManagerFactory )
{
- executionManager = new DefaultOperationExecutionManager( txnManagerFactory );
+ this.txnManagerFactory = txnManagerFactory;
+
+ this.init();
}
+ public void init()
+ {
+ if ( inited )
+ return;
+
+ executionManager = new DefaultOperationExecutionManager( txnManagerFactory );
+ }
+
public void shutdown()
{
- //do nothing;
+ inited = false;
}
diff --git a/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/AbstractTransaction.java b/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/AbstractTransaction.java
index 957edb1..eb42c50 100644
--- a/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/AbstractTransaction.java
+++ b/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/AbstractTransaction.java
@@ -61,6 +61,12 @@
/** version of the logical data vseen by this txn */
private long myLogicalDataVersion;
+
+ public void setTxnId( long id )
+ {
+ this.id = id;
+ }
+
public boolean isOptimisticLockHeld()
{
return isOptimisticLockHeld;
@@ -100,6 +106,7 @@
id = counter.getAndIncrement();
}
+
/**
* {@inheritDoc}
diff --git a/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultTxnLogManager.java b/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultTxnLogManager.java
index 627710a..4ec1789 100644
--- a/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultTxnLogManager.java
+++ b/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultTxnLogManager.java
@@ -44,7 +44,7 @@
*
* @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
*/
-public class DefaultTxnLogManager implements TxnLogManager
+public class DefaultTxnLogManager implements TxnLogManagerInternal
{
/** Write ahead log */
private Log wal;
@@ -103,13 +103,13 @@
ReadWriteTxn txn = ( ReadWriteTxn ) curTxn;
UserLogRecord logRecord = txn.getUserLogRecord();
+ ( ( AbstractLogEdit ) logEdit ).setTxnID( txn.getId() );
logEdit.injectData( logRecord, UserLogRecord.LogEditType.DATA );
+
+ logEdit.getLogAnchor().resetLogAnchor( logRecord.getLogAnchor() );
log( logRecord, sync );
- logEdit.getLogAnchor().resetLogAnchor( logRecord.getLogAnchor() );
- ( ( AbstractLogEdit ) logEdit ).setTxnID( txn.getId() );
-
txn.addLogEdit( logEdit );
}
@@ -265,6 +265,10 @@
addDnSet( baseDn, scope, false );
}
+ public Log getWAL()
+ {
+ return wal;
+ }
private void addDnSet( Dn baseDn, SearchScope scope, boolean read )
{
diff --git a/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultTxnManager.java b/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultTxnManager.java
index 301190c..3a3434e 100644
--- a/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultTxnManager.java
+++ b/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/DefaultTxnManager.java
@@ -20,7 +20,9 @@
package org.apache.directory.server.core.shared.txn;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
+import java.io.ObjectInputStream;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -35,15 +37,22 @@
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.directory.server.core.api.log.Log;
import org.apache.directory.server.core.api.log.LogAnchor;
+import org.apache.directory.server.core.api.log.LogScanner;
import org.apache.directory.server.core.api.log.UserLogRecord;
import org.apache.directory.server.core.api.partition.Partition;
+import org.apache.directory.server.core.api.schema.SchemaPartition;
import org.apache.directory.server.core.api.txn.TxnConflictException;
import org.apache.directory.server.core.api.txn.TxnHandle;
import org.apache.directory.server.core.api.txn.TxnLogManager;
import org.apache.directory.server.core.api.txn.logedit.LogEdit;
+import org.apache.directory.server.core.api.txn.logedit.LogEdit.EditType;
+import org.apache.directory.server.core.shared.txn.logedit.DataChangeContainer;
import org.apache.directory.server.core.shared.txn.logedit.TxnStateChange;
+import org.apache.directory.server.core.shared.txn.logedit.TxnStateChange.ChangeState;
import org.apache.directory.shared.ldap.model.exception.LdapException;
+import org.apache.directory.shared.ldap.model.name.Dn;
/**
@@ -55,6 +64,9 @@
{
/** wal log manager */
private TxnLogManager txnLogManager;
+
+ /** Write ahead log */
+ private Log wal;
/** List of committed txns in commit LSN order */
private ConcurrentLinkedQueue<ReadWriteTxn> committedQueue = new ConcurrentLinkedQueue<ReadWriteTxn>();
@@ -86,8 +98,14 @@
/** Flush lock */
private Lock flushLock = new ReentrantLock();
- /** Number of flushes */
+ /** Number of flushed txns */
+ private int numFlushedTxns;
+
+ /** Number of flushed */
private int numFlushes;
+
+ /** Take a checkpoint every 1000 flushes ~100 secs */
+ private final static int DEFAULT_FLUSH_ROUNDS = 1000;
/** Flush Condition object */
private Condition flushCondition = flushLock.newCondition();
@@ -107,7 +125,16 @@
private AtomicInteger pending = new AtomicInteger();
/** Logical data version number */
- private long logicalDataVersion = 0;
+ private long logicalDataVersion;
+
+ /** Initial scan point into the logs */
+ private LogAnchor initialScanPoint;
+
+ /** Initial set of committed txns */
+ private HashSet<Long> txnsToRecover = new HashSet<Long>();
+
+ /** last flushed log anchor */
+ private LogAnchor lastFlushedLogAnchor;
/** Per thread txn context */
static final ThreadLocal<Transaction> txnVar =
@@ -130,41 +157,66 @@
* @param idComparator
* @param idSerializer
*/
- public void init( TxnLogManager txnLogManager )
+ public void init( TxnLogManagerInternal txnLogManager )
{
this.txnLogManager = txnLogManager;
+ wal = txnLogManager.getWAL();
flushInterval = DEFAULT_FLUSH_INTERVAL;
+
+
+ committedQueue.clear();
+ latestFlushedTxnLSN.set( LogAnchor.UNKNOWN_LSN );
+ txnsToRecover.clear();
+ logicalDataVersion = 0;
+ lastFlushedLogAnchor = new LogAnchor();
- dummyTxn.commitTxn( LogAnchor.UNKNOWN_LSN );
+ initialScanPoint = wal.getCheckPoint();
+ lastFlushedLogAnchor.resetLogAnchor( initialScanPoint );
+
+ dummyTxn.commitTxn( initialScanPoint.getLogLSN() );
latestCommittedTxn.set( dummyTxn );
latestVerifiedTxn.set( dummyTxn );
committedQueue.offer( dummyTxn );
+
+ getTxnsToReover();
- syncer = new LogSyncer();
- syncer.setDaemon( true );
- syncer.start();
+ if ( syncer == null )
+ {
+ syncer = new LogSyncer();
+ syncer.setDaemon( true );
+ syncer.start();
+ }
}
public void shutdown()
{
+ System.out.println("in shutdown");
syncer.interrupt();
try
{
- syncer.join();
+ syncer.join();
}
catch ( InterruptedException e )
{
- // Ignore
+ //Ignore
}
+
// Do a best effort last flush
flushLock.lock();
try
{
- flushTxns();
+ ReadWriteTxn latestCommitted = latestCommittedTxn.get();
+ long latestFlushedLsn = latestFlushedTxnLSN.get();
+
+ System.out.println("latest committed txn " + latestCommitted.getCommitTime() +
+ " latest flushed " + latestFlushedLsn);
+ //flushTxns();
+
+ //advanceCheckPoint( lastFlushedLogAnchor );
}
catch ( Exception e )
{
@@ -174,7 +226,7 @@
{
flushLock.unlock();
}
-
+
syncer = null;
}
@@ -616,6 +668,7 @@
{
txnLogManager.log( logRecord, false );
txn.startTxn( logRecord.getLogAnchor().getLogLSN(), logicalDataVersion );
+ txn.setTxnId( logRecord.getLogAnchor().getLogLSN() );
do
{
@@ -852,6 +905,8 @@
*/
private void flushTxns() throws Exception
{
+ UserLogRecord lastLogRecord = null;
+
// If flushing failed already, dont do anything anymore
if ( flushFailed )
{
@@ -879,6 +934,8 @@
txnToFlush.flushLogEdits( flushedToPartitions );
latestFlushedTxnLSN.set( txnToFlush.getCommitTime() );
+
+ lastLogRecord = txnToFlush.getUserLogRecord();
}
if ( txnToFlush == latestCommitted )
@@ -887,7 +944,7 @@
break;
}
- numFlushes++;
+ numFlushedTxns++;
// if ( numFlushes % 100 == 0 )
// {
@@ -919,11 +976,147 @@
{
partitionIt.next().sync();
}
+
+ numFlushes++;
+
+ if ( lastLogRecord != null )
+ {
+ lastFlushedLogAnchor.resetLogAnchor(lastLogRecord.getLogAnchor());
+ }
+
+ if (numFlushes % DEFAULT_FLUSH_ROUNDS == 0 )
+ {
+ advanceCheckPoint( lastFlushedLogAnchor );
+ }
}
+
+
+ private void advanceCheckPoint( LogAnchor checkPoint )
+ {
+ wal.advanceCheckPoint(checkPoint);
+ }
+
+
+ private void getTxnsToReover()
+ {
+ LogScanner logScanner = wal.beginScan( initialScanPoint );
+ UserLogRecord logRecord = new UserLogRecord();
+ byte userRecord[];
+
+ System.out.println(" Get txns to recover " + initialScanPoint.getLogLSN() );
+
+ try
+ {
+ while ( logScanner.getNextRecord( logRecord ) )
+ {
+ userRecord = logRecord.getDataBuffer();
+ ObjectInputStream in = buildStream( userRecord );
+
+ EditType editType = EditType.values()[in.read()];
+
+
+ if (editType == EditType.TXN_MARKER)
+ {
+ TxnStateChange stateChange = new TxnStateChange();
+ stateChange.readExternal(in);
+
+ if ( stateChange.getTxnState() == ChangeState.TXN_COMMIT )
+ {
+ System.out.println("Adding txn " + stateChange.getTxnID() + " to the tobe recovered txns");
+ txnsToRecover.add( new Long( stateChange.getTxnID() ) );
+ }
+ }
+ }
+ }
+ catch ( Exception e )
+ {
+ e.printStackTrace();
+ // Ignore
+ }
+ }
+
+
+ // Walk over the txn log records from the latest checkpoint and apply the
+ // log records to the partition
+ public void recoverPartition( Partition partition )
+ {
+ Dn partitionSuffix = partition.getSuffixDn();
+
+ System.out.println("Recover partition " + partitionSuffix);
+
+ LogScanner logScanner = wal.beginScan( initialScanPoint );
+ UserLogRecord logRecord = new UserLogRecord();
+ byte userRecord[];
+
+ boolean recoveredChanges = false;
+
+ try
+ {
+ while ( logScanner.getNextRecord( logRecord ) )
+ {
+ userRecord = logRecord.getDataBuffer();
+ ObjectInputStream in = buildStream( userRecord );
+
+ EditType editType = EditType.values()[in.read()];
+
+ if (editType == EditType.DATA_CHANGE)
+ {
+ DataChangeContainer dataChangeContainer = new DataChangeContainer();
+ dataChangeContainer.readExternal(in);
+
+ System.out.println("Data change container for " + dataChangeContainer.getPartitionDn() +
+ " txn id " + dataChangeContainer.getTxnID() );
+
+ // If this change is for the partition we are tyring to recover
+ // and belongs to a txn that committed, then
+ Long txnID = new Long( dataChangeContainer.getTxnID() );
+
+ if ( txnsToRecover.contains( txnID ) )
+ {
+ if( dataChangeContainer.getPartitionDn().equals( partitionSuffix ) )
+ {
+ System.out.println("Apply change to partition " + partitionSuffix);
+ dataChangeContainer.setPartition( partition );
+ dataChangeContainer.apply( true );
+ recoveredChanges = true;
+ }
+ }
+ }
+ }
+
+ if ( recoveredChanges && partition instanceof SchemaPartition )
+ {
+ ( (SchemaPartition) partition ).getSchemaManager().reloadAllEnabled();
+ }
+ }
+ catch ( Exception e )
+ {
+ e.printStackTrace();
+ // Ignore for now
+ }
+ }
+
+
+ private ObjectInputStream buildStream( byte[] buffer ) throws IOException
+ {
+ ObjectInputStream oIn = null;
+ ByteArrayInputStream in = new ByteArrayInputStream( buffer );
+
+ try
+ {
+ oIn = new ObjectInputStream( in );
+
+ return oIn;
+ }
+ catch ( IOException ioe )
+ {
+ throw ioe;
+ }
+ }
class LogSyncer extends Thread
- {
+ {
@Override
public void run()
{
@@ -931,8 +1124,8 @@
try
{
- while ( !this.isInterrupted() )
- {
+ while ( true )
+ {
flushCondition.await( flushInterval, TimeUnit.MILLISECONDS );
flushTxns();
}
diff --git a/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/Transaction.java b/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/Transaction.java
index 26d34ca..7fdfb9e 100644
--- a/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/Transaction.java
+++ b/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/Transaction.java
@@ -37,6 +37,12 @@
/** Package protected */
interface Transaction extends TxnHandle
{
+ /**
+ * Set the txn id. Used to change the txn id to the log lsn
+ * @param id new txn id
+ */
+ void setTxnId( long id );
+
/**
* returns TRUE if optimisticLock held, false otherwise
*
diff --git a/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/TxnLogManagerInternal.java b/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/TxnLogManagerInternal.java
new file mode 100644
index 0000000..49f3e9a
--- /dev/null
+++ b/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/TxnLogManagerInternal.java
@@ -0,0 +1,13 @@
+package org.apache.directory.server.core.shared.txn;
+
+import org.apache.directory.server.core.api.log.Log;
+import org.apache.directory.server.core.api.txn.TxnLogManager;
+
+interface TxnLogManagerInternal extends TxnLogManager
+{
+ /**
+ *
+ * @return return the wal log manager used by the txnlogmanager
+ */
+ Log getWAL();
+}
diff --git a/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/TxnManagerFactory.java b/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/TxnManagerFactory.java
index af47205..4569e62 100644
--- a/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/TxnManagerFactory.java
+++ b/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/TxnManagerFactory.java
@@ -39,12 +39,22 @@
private TxnManagerInternal txnManager;
/** The only txn log manager */
- private TxnLogManager txnLogManager;
+ private TxnLogManagerInternal txnLogManager;
+
+ /** WAL */
+ private Log log;
/** log suffix */
private String LOG_SUFFIX = "log";
private boolean inited;
+
+
+ private String logFolderPath;
+
+ private int logBufferSize;
+
+ private long logFileSize;
/**
@@ -59,25 +69,39 @@
public TxnManagerFactory( String logFolderPath,
int logBufferSize, long logFileSize ) throws IOException
{
- Log log = new DefaultLog();
-
- try
- {
- log.init( logFolderPath, LOG_SUFFIX, logBufferSize, logFileSize );
- }
- catch ( InvalidLogException e )
- {
- throw new IOException( e );
- }
+ this.logFolderPath = logFolderPath;
+ this.logBufferSize = logBufferSize;
+ this.logFileSize = logFileSize;
+
+ log = new DefaultLog();
txnManager = new DefaultTxnManager();
txnLogManager = new DefaultTxnLogManager( log, this );
-
- ( ( DefaultTxnManager ) txnManager ).init( txnLogManager );
-
- inited = true;
-
+
+ this.init();
+ }
+
+
+ public void init() throws IOException
+ {
+ if ( inited )
+ {
+ return;
+ }
+
+ try
+ {
+ log.init( logFolderPath, LOG_SUFFIX, logBufferSize, logFileSize );
+ }
+ catch ( InvalidLogException e )
+ {
+ throw new IOException( e );
+ }
+
+ ( ( DefaultTxnManager ) txnManager ).init(txnLogManager);
+
+ inited = true;
}
diff --git a/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/logedit/DataChangeContainer.java b/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/logedit/DataChangeContainer.java
index f5327fc..829f9f6 100644
--- a/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/logedit/DataChangeContainer.java
+++ b/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/logedit/DataChangeContainer.java
@@ -97,6 +97,11 @@
{
return partition;
}
+
+ public void setPartition( Partition partition )
+ {
+ this.partition = partition;
+ }
public UUID getEntryID()
@@ -154,18 +159,23 @@
{
EntryModification entryModification = ( EntryModification ) change;
- curEntry = entryModification.applyModification( partition, curEntry, entryID, changeLsn, false );
+ curEntry = entryModification.applyModification( partition, curEntry, entryID, changeLsn, recovery );
}
else
{
IndexModification indexModification = ( IndexModification ) change;
- indexModification.applyModification( partition, false );
+ indexModification.applyModification( partition, recovery );
}
}
if ( curEntry != null )
{
MasterTable master = partition.getMasterTable();
+
+ if ( !curEntry.isSchemaAware() )
+ {
+ curEntry.apply( partition.getSchemaManager() );
+ }
master.put( entryID, curEntry );
}
else
@@ -312,6 +322,8 @@
{
DataChange change;
+ out.write(EditType.DATA_CHANGE.ordinal());
+
if ( entryID != null )
{
out.writeBoolean( true );
diff --git a/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/logedit/TxnStateChange.java b/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/logedit/TxnStateChange.java
index 9982f71..f08fa46 100644
--- a/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/logedit/TxnStateChange.java
+++ b/core-shared/src/main/java/org/apache/directory/server/core/shared/txn/logedit/TxnStateChange.java
@@ -99,6 +99,8 @@
@Override
public void writeExternal( ObjectOutput out ) throws IOException
{
+ out.write(EditType.TXN_MARKER.ordinal());
+
out.writeLong( txnID );
out.write( txnState.ordinal() );
}
diff --git a/core/src/main/java/org/apache/directory/server/core/DefaultDirectoryService.java b/core/src/main/java/org/apache/directory/server/core/DefaultDirectoryService.java
index adad548..c21ac6b 100644
--- a/core/src/main/java/org/apache/directory/server/core/DefaultDirectoryService.java
+++ b/core/src/main/java/org/apache/directory/server/core/DefaultDirectoryService.java
@@ -1358,11 +1358,16 @@
txnManagerFactory = new TxnManagerFactory( getInstanceLayout().getTxnLogDirectory().getPath(),
TXN_LOG_BUFFER_SIZE, TXN_LOG_FILE_SIZE );
}
-
+
+ txnManagerFactory.init();
+
+
if ( executionManagerFactory == null )
{
executionManagerFactory = new OperationExecutionManagerFactory( txnManagerFactory );
}
+
+ executionManagerFactory.init();
initialize();
showSecurityWarnings();
@@ -1417,7 +1422,7 @@
{
return;
}
-
+
// --------------------------------------------------------------------
// Shutdown the txnManager
//
diff --git a/jdbm-partition/src/main/java/org/apache/directory/server/core/partition/impl/btree/jdbm/JdbmIndex.java b/jdbm-partition/src/main/java/org/apache/directory/server/core/partition/impl/btree/jdbm/JdbmIndex.java
index 43ee99c..69ec772 100644
--- a/jdbm-partition/src/main/java/org/apache/directory/server/core/partition/impl/btree/jdbm/JdbmIndex.java
+++ b/jdbm-partition/src/main/java/org/apache/directory/server/core/partition/impl/btree/jdbm/JdbmIndex.java
@@ -184,7 +184,7 @@
String path = new File( this.wkDirPath, attributeType.getOid() ).getAbsolutePath();
BaseRecordManager base = new BaseRecordManager( path );
- base.disableTransactions();
+ //base.disableTransactions();
this.recMan = new SnapshotRecordManager( base, DEFAULT_INDEX_CACHE_SIZE );
try
diff --git a/jdbm-partition/src/main/java/org/apache/directory/server/core/partition/impl/btree/jdbm/JdbmPartition.java b/jdbm-partition/src/main/java/org/apache/directory/server/core/partition/impl/btree/jdbm/JdbmPartition.java
index bf63d63..fd3710f 100644
--- a/jdbm-partition/src/main/java/org/apache/directory/server/core/partition/impl/btree/jdbm/JdbmPartition.java
+++ b/jdbm-partition/src/main/java/org/apache/directory/server/core/partition/impl/btree/jdbm/JdbmPartition.java
@@ -135,7 +135,7 @@
// First, check if the file storing the data exists
String path = partitionDir.getPath() + File.separator + "master";
BaseRecordManager baseRecordManager = new BaseRecordManager( path );
- baseRecordManager.disableTransactions();
+ //baseRecordManager.disableTransactions();
if ( cacheSize < 0 )
{
@@ -188,6 +188,9 @@
deleteUnusedIndexFiles( allIndices, allIndexDbFiles );
+ // Apply the txn logs
+ txnManagerFactory.txnManagerInstance().recoverPartition( this );
+
// We are done !
initialized = true;
}
diff --git a/jdbm-partition/src/main/java/org/apache/directory/server/core/partition/impl/btree/jdbm/JdbmRdnIndex.java b/jdbm-partition/src/main/java/org/apache/directory/server/core/partition/impl/btree/jdbm/JdbmRdnIndex.java
index 92d23db..8704a57 100644
--- a/jdbm-partition/src/main/java/org/apache/directory/server/core/partition/impl/btree/jdbm/JdbmRdnIndex.java
+++ b/jdbm-partition/src/main/java/org/apache/directory/server/core/partition/impl/btree/jdbm/JdbmRdnIndex.java
@@ -96,7 +96,7 @@
//System.out.println( "IDX Created index " + path );
BaseRecordManager base = new BaseRecordManager( path );
- base.disableTransactions();
+ //base.disableTransactions();
this.recMan = new SnapshotRecordManager( base, cacheSize );
try
diff --git a/ldif-partition/src/main/java/org/apache/directory/server/core/partition/ldif/LdifPartition.java b/ldif-partition/src/main/java/org/apache/directory/server/core/partition/ldif/LdifPartition.java
index 3f07e76..71d4832 100644
--- a/ldif-partition/src/main/java/org/apache/directory/server/core/partition/ldif/LdifPartition.java
+++ b/ldif-partition/src/main/java/org/apache/directory/server/core/partition/ldif/LdifPartition.java
@@ -156,6 +156,9 @@
if ( suffixDirectory.exists() )
{
loadEntries( partitionDir );
+
+ // Apply the txn logs
+ txnManagerFactory.txnManagerInstance().recoverPartition( this );
}
else
{
@@ -195,7 +198,7 @@
// And add this entry to the underlying partition
AddOperationContext addContext = new AddOperationContext( schemaManager, contextEntry );
- add( addContext );
+ executionManager.add( this, addContext );
}
}
}
diff --git a/ldif-partition/src/main/java/org/apache/directory/server/core/partition/ldif/SingleFileLdifPartition.java b/ldif-partition/src/main/java/org/apache/directory/server/core/partition/ldif/SingleFileLdifPartition.java
index 77a907e..1eb8da8 100644
--- a/ldif-partition/src/main/java/org/apache/directory/server/core/partition/ldif/SingleFileLdifPartition.java
+++ b/ldif-partition/src/main/java/org/apache/directory/server/core/partition/ldif/SingleFileLdifPartition.java
@@ -125,6 +125,9 @@
super.doInit();
loadEntries();
+
+ // Apply the txn logs
+ txnManagerFactory.txnManagerInstance().recoverPartition( this );
}
}
diff --git a/service-osgi/src/main/java/org/apache/directory/server/ApacheDsService.java b/service-osgi/src/main/java/org/apache/directory/server/ApacheDsService.java
index d12def0..d72b618 100644
--- a/service-osgi/src/main/java/org/apache/directory/server/ApacheDsService.java
+++ b/service-osgi/src/main/java/org/apache/directory/server/ApacheDsService.java
@@ -166,7 +166,7 @@
LOG.info( "using partition dir {}", partitionsDir.getAbsolutePath() );
txnManagerFactory = new TxnManagerFactory( instanceLayout.getTxnLogDirectory().getPath(),
- DirectoryService.TXN_LOG_BUFFER_SIZE, DirectoryService.TXN_LOG_FILE_SIZE );
+ DirectoryService.TXN_LOG_BUFFER_SIZE, DirectoryService.TXN_LOG_FILE_SIZE );
executionManagerFactory = new OperationExecutionManagerFactory( txnManagerFactory );
initSchemaManager( instanceLayout );