blob: f29c847107ba036bbd57825be7461987724742b4 [file] [log] [blame]
/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.asterix.transaction.management.service.logging;
import java.io.File;
import java.io.IOException;
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
public class LogCursor implements ILogCursor {
private final LogManager logManager;
private final ILogFilter logFilter;
private IFileBasedBuffer readOnlyBuffer;
private LogicalLogLocator logicalLogLocator = null;
private int bufferIndex = 0;
private boolean firstNext = true;
private long readLSN = 0;
/**
* @param logFilter
*/
public LogCursor(final LogManager logManager, ILogFilter logFilter) throws ACIDException {
this.logFilter = logFilter;
this.logManager = logManager;
}
public LogCursor(final LogManager logManager, PhysicalLogLocator startingPhysicalLogLocator, ILogFilter logFilter)
throws IOException, ACIDException {
this.logFilter = logFilter;
this.logManager = logManager;
initialize(startingPhysicalLogLocator);
}
private void initialize(final PhysicalLogLocator startingPhysicalLogLocator) throws IOException, ACIDException {
if (startingPhysicalLogLocator.getLsn() > logManager.getLastFlushedLsn().get()) {
readLSN = startingPhysicalLogLocator.getLsn();
} else {
//read from disk
readOnlyBuffer = getReadOnlyBuffer(startingPhysicalLogLocator.getLsn(), logManager
.getLogManagerProperties().getLogBufferSize());
logicalLogLocator = new LogicalLogLocator(startingPhysicalLogLocator.getLsn(), readOnlyBuffer, 0,
logManager);
readLSN = logicalLogLocator.getLsn();
}
return;
}
private IFileBasedBuffer getReadOnlyBuffer(long lsn, int size) throws IOException {
int fileId = (int) (lsn / logManager.getLogManagerProperties().getLogPartitionSize());
String filePath = LogUtil.getLogFilePath(logManager.getLogManagerProperties(), fileId);
File file = new File(filePath);
if (file.exists()) {
return FileUtil.getFileBasedBuffer(filePath, lsn, size);
} else {
return null;
}
}
/**
* Moves the cursor to the next log record that satisfies the configured
* filter. The parameter nextLogLocator is set to the point to the next log
* record.
*
* @param nextLogicalLogLocator
* @return true if the cursor was successfully moved to the next log record
* false if there are no more log records that satisfy the
* configured filter.
*/
@Override
public boolean next(LogicalLogLocator nextLogicalLogLocator) throws IOException, ACIDException {
int integerRead = -1;
boolean logRecordBeginPosFound = false;
long bytesSkipped = 0;
if (readLSN > logManager.getLastFlushedLsn().get()) {
readFromMemory(readLSN, nextLogicalLogLocator);
return true;
}
//check whether the currentOffset has enough space to have new log record by comparing
//the smallest log record type(which is commit)'s log header.
while (logicalLogLocator.getMemoryOffset() <= readOnlyBuffer.getSize()
- logManager.getLogRecordHelper().getLogHeaderSize(LogType.COMMIT)) {
integerRead = readOnlyBuffer.readInt(logicalLogLocator.getMemoryOffset());
if (integerRead == logManager.getLogManagerProperties().logMagicNumber) {
logRecordBeginPosFound = true;
break;
}
logicalLogLocator.increaseMemoryOffset(1);
logicalLogLocator.incrementLsn();
bytesSkipped++;
if (bytesSkipped > logManager.getLogManagerProperties().getLogPageSize()) {
return false; // the maximum size of a log record is limited to
// a log page size. If we have skipped as many
// bytes without finding a log record, it
// indicates an absence of logs any further.
}
}
if (!logRecordBeginPosFound) {
// need to reload the buffer
long lsnpos = (++bufferIndex * logManager.getLogManagerProperties().getLogBufferSize());
readOnlyBuffer = getReadOnlyBuffer(lsnpos, logManager.getLogManagerProperties().getLogBufferSize());
if (readOnlyBuffer != null) {
logicalLogLocator.setBuffer(readOnlyBuffer);
logicalLogLocator.setLsn(lsnpos);
logicalLogLocator.setMemoryOffset(0);
return next(nextLogicalLogLocator);
} else {
return false;
}
}
int logLength = logManager.getLogRecordHelper().getLogRecordSize(
logManager.getLogRecordHelper().getLogType(logicalLogLocator),
logManager.getLogRecordHelper().getLogContentSize(logicalLogLocator));
if (logManager.getLogRecordHelper().validateLogRecord(logicalLogLocator)) {
if (nextLogicalLogLocator == null) {
nextLogicalLogLocator = new LogicalLogLocator(0, readOnlyBuffer, -1, logManager);
}
nextLogicalLogLocator.setLsn(logicalLogLocator.getLsn());
nextLogicalLogLocator.setMemoryOffset(logicalLogLocator.getMemoryOffset());
nextLogicalLogLocator.setBuffer(readOnlyBuffer);
logicalLogLocator.incrementLsn(logLength);
logicalLogLocator.setMemoryOffset(logicalLogLocator.getMemoryOffset() + logLength);
} else {
throw new ACIDException("Invalid Log Record found ! checksums do not match :( ");
}
return logFilter.accept(readOnlyBuffer, nextLogicalLogLocator.getMemoryOffset(), logLength);
}
/**
* Returns the filter associated with the cursor.
*
* @return ILogFilter
*/
@Override
public ILogFilter getLogFilter() {
return logFilter;
}
private void readFromMemory(long lsn, LogicalLogLocator currentLogLocator) throws ACIDException {
byte[] logRecord = null;
if (lsn > logManager.getCurrentLsn().get()) {
throw new ACIDException(" invalid lsn " + lsn);
}
/* check if the log record in the log buffer or has reached the disk. */
int pageIndex = logManager.getLogPageIndex(lsn);
int pageOffset = logManager.getLogPageOffset(lsn);
byte[] pageContent = new byte[logManager.getLogManagerProperties().getLogPageSize()];
// take a lock on the log page so that the page is not flushed to
// disk interim
IFileBasedBuffer logPage = logManager.getLogPage(pageIndex);
int logRecordSize = 0;
synchronized (logPage) {
// need to check again
if (lsn > logManager.getLastFlushedLsn().get()) {
// get the log record length
logPage.getBytes(pageContent, 0, pageContent.length);
byte logType = pageContent[pageOffset + 4];
int logHeaderSize = logManager.getLogRecordHelper().getLogHeaderSize(logType);
int logBodySize = DataUtil.byteArrayToInt(pageContent, pageOffset + logHeaderSize - 4);
logRecordSize = logHeaderSize + logBodySize + logManager.getLogRecordHelper().getLogChecksumSize();
logRecord = new byte[logRecordSize];
//copy the log content
System.arraycopy(pageContent, pageOffset, logRecord, 0, logRecordSize);
MemBasedBuffer memBuffer = new MemBasedBuffer(logRecord);
if (logicalLogLocator == null) {
logicalLogLocator = new LogicalLogLocator(lsn, memBuffer, 0, logManager);
} else {
logicalLogLocator.setLsn(lsn);
logicalLogLocator.setBuffer(memBuffer);
logicalLogLocator.setMemoryOffset(0);
}
currentLogLocator.setLsn(lsn);
currentLogLocator.setBuffer(memBuffer);
currentLogLocator.setMemoryOffset(0);
try {
// validate the log record by comparing checksums
if (!logManager.getLogRecordHelper().validateLogRecord(logicalLogLocator)) {
throw new ACIDException(" invalid log record at lsn " + lsn);
}
} catch (Exception e) {
throw new ACIDException("exception encoutered in validating log record at lsn " + lsn, e);
}
}
}
readLSN = readLSN + logRecordSize;
}
}