| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| package org.apache.directory.server.core.shared.log; |
| |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.locks.Condition; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReentrantLock; |
| import java.util.zip.Adler32; |
| import java.util.zip.Checksum; |
| |
| import org.apache.directory.server.core.api.log.InvalidLogException; |
| import org.apache.directory.server.core.api.log.LogAnchor; |
| import org.apache.directory.server.core.api.log.UserLogRecord; |
| import org.apache.directory.server.core.shared.log.LogFileManager.LogFileWriter; |
| import org.apache.directory.server.i18n.I18n; |
| |
| |
| /** |
| * Manages the flushing of log to media and scanning of logs. All appends to the log file go |
| * through this class. |
| * |
| * Internally it manages a circular buffer where appends initially go. Appends are first |
| * appended to this in memory circular log. As the in memory circular log fills up or as the |
| * user requests, memory buffer is flushed to the underlying media. |
| * |
| * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a> |
| */ |
| /* Package protected */class LogFlushManager |
| { |
| /** Ever increasing logical log sequence number assigned to user log records. Bumped up under append lock */ |
| private long logLSN = Long.MIN_VALUE + 1; |
| |
| /** Memory buffer size in bytes */ |
| private final int logBufferSize; |
| |
| /** Synchronizes appends */ |
| private final Lock appendLock = new ReentrantLock(); |
| |
| /** Synchronizes flushes to media */ |
| private final Lock flushLock = new ReentrantLock(); |
| |
| /** Used to wait on ongoing flush */ |
| private final Condition flushCondition = flushLock.newCondition(); |
| |
| /** In memory LogBuffer */ |
| private LogBuffer logBuffer; |
| |
| /** Flush status */ |
| private FlushStatus flushStatus = new FlushStatus(); |
| |
| /** Current LogFile appends go to */ |
| private LogFileManager.LogFileWriter currentLogFile; |
| |
| /** Log manager */ |
| private LogManager logManager; |
| |
| /** Size of data appended to the currentLogFile so far */ |
| private long appendedSize; |
| |
| /** Sof limit on the log file size */ |
| private long targetLogFileSize; |
| |
| /** If logging cannot succeed, then loggingFailed is set to true and further logging is prevented */ |
| private boolean logFailed; |
| |
| /** The Checksum used */ |
| private Checksum checksum = new Adler32(); |
| |
| |
| /** |
| * Creates a LogFlushManager instance. We define the memory buffer size, and the default maximum |
| * size for each Log file (this maximum size may be exceeded, if one user record is bigger than |
| * this maximum size. Log file may be smaller too. |
| * |
| * @param logManager The associated LogManager |
| * @param logMemoryBufferSize The buffer size |
| * @param logFileSize The default max size for each Log file. |
| */ |
| public LogFlushManager( LogManager logManager, int logMemoryBufferSize, long logFileSize ) |
| { |
| if ( ( logMemoryBufferSize < 0 ) || ( logFileSize < 0 ) ) |
| { |
| throw new IllegalArgumentException( I18n.err( I18n.ERR_748, logMemoryBufferSize, logFileSize ) ); |
| } |
| |
| logBufferSize = logMemoryBufferSize; |
| targetLogFileSize = logFileSize; |
| this.logManager = logManager; |
| |
| logBuffer = new LogBuffer( logBufferSize, currentLogFile ); |
| } |
| |
| |
| /** |
| * Appends the given user record to the log. Position where the record is appended is returned as part of |
| * userRecord. |
| * |
| * @param userLogRecord provides the user data to be appended to the log |
| * @param sync if true, this calls returns after making sure that the appended data is reflected to the underlying file |
| * @throws IOException If we had an issue while appending some record in the file |
| * @throws InvalidLogException If the log system is is declared as invalid, due to a previous error |
| */ |
| public void append( UserLogRecord userRecord, boolean sync ) throws IOException, InvalidLogException |
| { |
| boolean appendedRecord = false; |
| byte[] userBuffer = userRecord.getDataBuffer(); |
| int length = userRecord.getDataLength(); |
| LogAnchor userLogAnchor = userRecord.getLogAnchor(); |
| |
| int recordSize = LogFileRecords.RECORD_HEADER_SIZE + length + LogFileRecords.RECORD_FOOTER_SIZE; |
| |
| // The addition of a record is done in a protected section |
| appendLock.lock(); |
| |
| // Get out immediately if the log system is invalid |
| if ( logFailed ) |
| { |
| appendLock.unlock(); |
| throw new InvalidLogException( I18n.err( I18n.ERR_750 ) ); |
| } |
| |
| // Get a new sequence number for the logged data |
| long lsn = logLSN++; |
| |
| try |
| { |
| // Compute the checksum for the user record |
| checksum.reset(); |
| checksum.update( userBuffer, 0, length ); |
| |
| if ( currentLogFile == null ) |
| { |
| // We are just starting, get the current log file |
| currentLogFile = logManager.switchToNextLogFile( null ); |
| appendedSize = currentLogFile.getLength(); |
| } |
| |
| // If we try to store more data that what can be hold by the current file, |
| // we have to switch to the next file |
| if ( appendedSize > targetLogFileSize ) |
| { |
| // Make sure everything outstanding goes to the current log file |
| flush( lsn, null, 0, 0, true ); |
| |
| currentLogFile = logManager.switchToNextLogFile( currentLogFile ); |
| appendedSize = currentLogFile.getLength(); |
| } |
| |
| if ( recordSize <= logBufferSize ) |
| { |
| ByteBuffer writeHead = logBuffer.writeHead; |
| |
| while ( !appendedRecord ) |
| { |
| // First get the rewind count then the position to which the readhead advanced |
| int readHeadRewindCount = logBuffer.readHeadRewindCount.get(); |
| int readHeadPosition = logBuffer.readHeadPosition; |
| |
| if ( ( logBuffer.writeHeadRewindCount == readHeadRewindCount ) || |
| ( ( logBuffer.writeHeadRewindCount == readHeadRewindCount + 1 ) && |
| ( readHeadPosition < writeHead.position() ) ) ) |
| { |
| if ( writeHead.remaining() >= recordSize ) |
| { |
| // Write the header |
| writeHeader( writeHead, recordSize, lsn ); |
| |
| // Write the data |
| writeHead.put( userBuffer, 0, length ); |
| |
| // Write the footer |
| writeFooter( writeHead, ( int ) checksum.getValue() ); |
| |
| appendedRecord = true; |
| } |
| else |
| // ( writeHead.remaining() < recordSize ) |
| { |
| if ( writeHead.remaining() >= LogFileRecords.RECORD_HEADER_SIZE ) |
| { |
| // Write a skip record |
| writeHeader( writeHead, -1, -1 ); |
| } |
| |
| // rewind buffer now |
| writeHead.rewind(); |
| logBuffer.writeHeadRewindCount++; |
| } |
| } |
| else |
| { |
| if ( logBuffer.writeHeadRewindCount != ( readHeadRewindCount + 1 ) ) |
| { |
| throw new IllegalStateException( "Unexpected sequence number for read/write heads:" |
| + logBuffer.writeHeadRewindCount + |
| " " + readHeadRewindCount ); |
| } |
| |
| if ( ( readHeadPosition - writeHead.position() ) > recordSize ) |
| { |
| // Write the header |
| writeHeader( writeHead, recordSize, lsn ); |
| |
| // Write the data |
| writeHead.put( userBuffer, 0, length ); |
| |
| // Write the footer |
| writeFooter( writeHead, ( int ) checksum.getValue() ); |
| |
| appendedRecord = true; |
| } |
| else |
| { |
| flush( lsn, null, 0, 0, true ); |
| } |
| } |
| } |
| } |
| else |
| { |
| flush( lsn, userBuffer, 0, length, true ); |
| } |
| |
| userLogAnchor.resetLogAnchor( currentLogFile.logFileNumber(), appendedSize, lsn ); |
| appendedSize += recordSize; |
| } |
| catch ( IOException e ) |
| { |
| e.printStackTrace(); |
| logFailed = true; // Mark log subsytem failed |
| } |
| catch ( InvalidLogException e ) |
| { |
| e.printStackTrace(); |
| logFailed = true; // Mark log subsystem failed |
| } |
| finally |
| { |
| appendLock.unlock(); |
| } |
| |
| if ( sync ) |
| { |
| flush( lsn, null, 0, 0, false ); |
| } |
| } |
| |
| |
| /** |
| * Syncs the log upto the given lsn. If lsn is equal to unknow lsn, then the log is |
| * flushed upto the latest logged lsn. |
| * |
| * @param uptoLSN lsn to flush upto. Unkown lsn if caller just wants to sync the log upto the latest logged lsn. |
| * @throws IOException If we had an issue while flushing some record in the file |
| * @throws InvalidLogException If the log system is is declared as invalid, due to a previous error |
| */ |
| void sync( long uptoLSN ) throws IOException, InvalidLogException |
| { |
| if ( uptoLSN == LogAnchor.UNKNOWN_LSN ) |
| { |
| appendLock.lock(); |
| uptoLSN = logLSN - 1; |
| appendLock.unlock(); |
| } |
| |
| // If nothing to flush, then just return |
| if ( uptoLSN == LogAnchor.UNKNOWN_LSN ) |
| { |
| |
| return; |
| } |
| |
| flush( uptoLSN, null, 0, 0, false ); |
| } |
| |
| |
| /** |
| * Flushes the changes in the log buffer upto the given point. The given point is determined as follows: |
| * appendLock is held: flushLSN is the highest lsn generated by the logging system and no more appends can |
| * proceed. In this case log is flushed until where the write head is.Log record with the flushLSN might not |
| * have been appended yet. |
| * |
| * Otherwise: Given flushLSN is appended to the log already. Log is flushed upto max(flushLSN, current flashSatus.uptoLSN) |
| * |
| * Also userBuffer != null => appendLockHeld == true |
| * |
| * Only one thread can do flush. Once a thread find out that a flush is already going on, it waits for the ongoing flush |
| * and is woken up to do its flush. |
| * |
| * flushStatus.uptoLSN represents the highest lsn that any thread wanted to sync. If a couple of threads wait on sync to |
| * complete, the thread that wakes up and does the sync will take it for the team and sync upto flushStatus.uptoLSN so |
| * that logging is more efficient. |
| * |
| * @param flushLSN max LSN the calling thread wants to sync upto |
| * @param userBuffer if not null, user buffer is appended to the log without any buffering |
| * @param offset offset of data in user buffer |
| * @param length length of user data |
| * @param appendLockHeld true if append lock is held |
| * @throws IOException If we had an issue while flushing some record in the file |
| * @throws InvalidLogException If the log system is is declared as invalid, due to a previous error |
| */ |
| private void flush( long flushLSN, byte[] userBuffer, int offset, int length, |
| boolean appendLockHeld ) throws IOException, InvalidLogException |
| { |
| long uptoLSN = flushLSN; |
| |
| if ( appendLockHeld == true ) |
| { |
| uptoLSN--; |
| } |
| |
| flushLock.lock(); |
| |
| // Update max requested lsn if necessary |
| if ( uptoLSN > flushStatus.uptoLSN ) |
| { |
| flushStatus.uptoLSN = uptoLSN; |
| } |
| |
| /* |
| * Check if we need to do flush and wait for ongoing flush if |
| * necessary |
| */ |
| while ( true ) |
| { |
| if ( logFailed ) |
| { |
| flushLock.unlock(); |
| throw new InvalidLogException( I18n.err( I18n.ERR_750 ) ); |
| } |
| |
| if ( ( flushStatus.flushedLSN >= uptoLSN ) && ( appendLockHeld == false ) ) |
| { |
| flushLock.unlock(); |
| return; |
| } |
| |
| if ( flushStatus.flushInProgress == false ) |
| { |
| break; |
| } |
| |
| flushStatus.numWaiters++; |
| flushCondition.awaitUninterruptibly(); |
| flushStatus.numWaiters--; |
| } |
| |
| // Mark flush in progress and do the flush |
| flushStatus.flushInProgress = true; |
| |
| // If not appendlock held, adjust uptoLSN with the max one requested by any thread |
| if ( appendLockHeld == false ) |
| { |
| uptoLSN = flushStatus.uptoLSN; |
| } |
| else |
| { |
| uptoLSN = flushLSN; |
| } |
| |
| flushLock.unlock(); |
| |
| long flushedLSN = LogAnchor.UNKNOWN_LSN; |
| |
| try |
| { |
| flushedLSN = doFlush( uptoLSN, appendLockHeld ); |
| |
| // Now if there is a user buffer, flush from that |
| if ( userBuffer != null ) |
| { |
| ByteBuffer headerFooterHead = logBuffer.headerFooterHead; |
| int recordSize = LogFileRecords.RECORD_HEADER_SIZE + LogFileRecords.RECORD_FOOTER_SIZE + length; |
| |
| headerFooterHead.rewind(); |
| writeHeader( headerFooterHead, recordSize, flushLSN ); |
| currentLogFile.append( logBuffer.headerFooterBuffer, 0, LogFileRecords.RECORD_HEADER_SIZE ); |
| |
| currentLogFile.append( userBuffer, offset, length ); |
| |
| headerFooterHead.rewind(); |
| writeFooter( headerFooterHead, ( int ) checksum.getValue() ); |
| currentLogFile.append( logBuffer.headerFooterBuffer, 0, LogFileRecords.RECORD_FOOTER_SIZE ); |
| |
| flushedLSN = flushLSN; |
| } |
| |
| currentLogFile.sync(); |
| } |
| catch ( IOException e ) |
| { |
| // Mark the logger invalid, wakeup any waiters and return |
| flushLock.lock(); |
| logFailed = true; |
| flushStatus.flushInProgress = false; |
| |
| if ( flushStatus.numWaiters != 0 ) |
| { |
| flushCondition.signalAll(); |
| } |
| |
| flushLock.unlock(); |
| |
| throw e; |
| } |
| |
| flushLock.lock(); |
| |
| if ( flushedLSN != LogAnchor.UNKNOWN_LSN ) |
| { |
| flushStatus.flushedLSN = flushedLSN; |
| |
| if ( flushStatus.flushedLSN > flushStatus.uptoLSN ) |
| { |
| // This should only happen with append lock held |
| if ( appendLockHeld == false ) |
| { |
| throw new IllegalStateException( "FlushedLSN went ahead of uptoLSN while appendlock is not held: " + |
| flushStatus.flushedLSN + " " + flushStatus.uptoLSN ); |
| } |
| |
| flushStatus.uptoLSN = flushStatus.flushedLSN; |
| } |
| } |
| |
| flushStatus.flushInProgress = false; |
| |
| if ( flushStatus.numWaiters != 0 ) |
| { |
| flushCondition.signalAll(); |
| } |
| |
| flushLock.unlock(); |
| } |
| |
| |
| /** |
| * Walks the log buffer and writes it to the underlying log file until the uptoLSN or current write head. |
| * |
| * @param uptoLSN max LSN until where log is flushed |
| * @param appendLockHeld true if appendlock held. |
| * @return lsn upto which flush is done. UNKNOWN_LSN if no flushing is done. |
| * @throws IOException |
| */ |
| private long doFlush( long uptoLSN, boolean appendLockHeld ) throws IOException |
| { |
| ByteBuffer readHead = logBuffer.readHead; |
| ByteBuffer writeHead = logBuffer.writeHead; |
| boolean done = false; |
| |
| int magicNumber; |
| int length; |
| long lsn = LogAnchor.UNKNOWN_LSN; |
| |
| while ( !done ) |
| { |
| int totalLength = 0; |
| |
| while ( true ) |
| { |
| /* |
| * If append lock is held, we might hit write head. We can read |
| * the write head here when append lock is held |
| */ |
| if ( appendLockHeld ) |
| { |
| if ( ( writeHead.position() == readHead.position() ) && |
| ( logBuffer.writeHeadRewindCount == logBuffer.readHeadRewindCount.get() ) ) |
| { |
| done = true; |
| break; |
| } |
| } |
| |
| // If less than header length left to process, then break and flush whatever we got so far |
| if ( readHead.remaining() < LogFileRecords.RECORD_HEADER_SIZE ) |
| { |
| break; |
| } |
| |
| magicNumber = readHead.getInt(); |
| |
| if ( magicNumber != LogFileRecords.RECORD_HEADER_MAGIC_NUMBER ) |
| { |
| throw new IllegalStateException( " Record header magic " + |
| "number does not match " + magicNumber + " expected " + |
| LogFileRecords.RECORD_HEADER_MAGIC_NUMBER ); |
| } |
| |
| length = readHead.getInt(); |
| |
| // Did we hit a skip record at the end of the buffer? |
| if ( length == LogBuffer.SKIP_RECORD_LENGTH ) |
| { |
| break; |
| } |
| |
| // Sanitize length, it includes header and footer overhead |
| if ( length <= ( LogFileRecords.RECORD_HEADER_SIZE + LogFileRecords.RECORD_FOOTER_SIZE ) ) |
| { |
| throw new IllegalStateException( "Record length doesnt make sense:" + length + " expected:" + |
| ( LogFileRecords.RECORD_HEADER_MAGIC_NUMBER + LogFileRecords.RECORD_FOOTER_MAGIC_NUMBER ) ); |
| } |
| |
| // Add to the total length |
| totalLength += length; |
| |
| lsn = readHead.getLong(); |
| |
| // Move to the next record, we processed 16 bytes already |
| readHead.position( readHead.position() + length - 16 ); |
| |
| if ( lsn >= uptoLSN ) |
| { |
| done = true; |
| break; |
| } |
| } |
| |
| // If there is something to flush, then do it now |
| if ( totalLength > 0 ) |
| { |
| int offset; |
| offset = logBuffer.readHeadPosition; |
| |
| currentLogFile.append( logBuffer.buffer, offset, totalLength ); |
| |
| //move the position to the next record |
| logBuffer.readHeadPosition = readHead.position(); |
| } |
| |
| if ( !done ) |
| { |
| // this means we need to rewind and keep flushing |
| logBuffer.readHeadPosition = 0; |
| readHead.rewind(); |
| logBuffer.readHeadRewindCount.incrementAndGet(); |
| } |
| } |
| |
| return lsn; |
| } |
| |
| |
| /** |
| * Write the log file header |
| */ |
| private void writeHeader( ByteBuffer buffer, int length, long lsn ) |
| { |
| buffer.putInt( LogFileRecords.RECORD_HEADER_MAGIC_NUMBER ); |
| buffer.putInt( length ); |
| buffer.putLong( lsn ); |
| buffer.putLong( length ^ lsn ); |
| } |
| |
| |
| /** |
| * Write the log file footer |
| */ |
| private void writeFooter( ByteBuffer buffer, int checksum ) |
| { |
| buffer.putInt( checksum ); |
| buffer.putInt( LogFileRecords.RECORD_FOOTER_MAGIC_NUMBER ); |
| } |
| |
| /** |
| * Used to group the memory buffer data together |
| */ |
| private static class LogBuffer |
| { |
| /** In memory buffer */ |
| private byte buffer[]; |
| |
| /** Used to scan the buffer while reading it to flush */ |
| private ByteBuffer readHead; |
| |
| /** Advanced as readHead flushes data */ |
| private int readHeadPosition; |
| |
| /** Rewind count of readHead. Used to avoid overwriting non flushed data */ |
| private AtomicInteger readHeadRewindCount; |
| |
| /** Used to scan the buffer while appending records into it */ |
| private ByteBuffer writeHead; |
| |
| /** Rewind count of writeHead. used to avoid overwriting non flushed data */ |
| private int writeHeadRewindCount; |
| |
| /** Used to mark records that should be skipped at the end of the log buffer */ |
| private final static int SKIP_RECORD_LENGTH = -1; |
| |
| /** Header footer buffer used when writing user buffers directly */ |
| private byte headerFooterBuffer[]; |
| |
| /** Used to format header footer buffer */ |
| private ByteBuffer headerFooterHead; |
| |
| |
| /** |
| * Create a new instance of a LogBuffer |
| */ |
| private LogBuffer( int bufferSize, LogFileWriter currentLogFile ) |
| { |
| buffer = new byte[bufferSize]; |
| readHead = ByteBuffer.wrap( buffer ); |
| |
| readHeadRewindCount = new AtomicInteger( 0 ); |
| |
| writeHead = ByteBuffer.wrap( buffer ); |
| |
| headerFooterBuffer = new byte[LogFileRecords.MAX_MARKER_SIZE]; |
| headerFooterHead = ByteBuffer.wrap( headerFooterBuffer ); |
| } |
| } |
| |
| /** |
| * Used to group the flush related data together |
| */ |
| private static class FlushStatus |
| { |
| /** whether flush is going on */ |
| boolean flushInProgress; |
| |
| /** Current flush request */ |
| long uptoLSN = LogAnchor.UNKNOWN_LSN; |
| |
| /** Current flushed lsn */ |
| long flushedLSN = LogAnchor.UNKNOWN_LSN; |
| |
| /** Keeps track of the number of waiters */ |
| int numWaiters; |
| } |
| } |