blob: 1bceea38529210db8b2bd4622e56832114360620 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hyracks.storage.common.buffercache;
import java.nio.ByteBuffer;
import java.util.concurrent.BlockingQueue;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.api.io.IFileHandle;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.api.util.IoUtil;
import org.apache.hyracks.control.nc.io.IOManager;
import org.apache.hyracks.storage.common.compression.file.CompressedFileReference;
import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter;
import org.apache.hyracks.util.annotations.NotThreadSafe;
/**
* Handles all IO operations for a specified file.
*/
@NotThreadSafe
public abstract class AbstractBufferedFileIOManager {
private static final String ERROR_MESSAGE = "%s unexpected number of bytes: [expected: %d, actual: %d, file: %s]";
private static final String READ = "Read";
private static final String WRITE = "Written";
protected final BufferCache bufferCache;
protected final IPageReplacementStrategy pageReplacementStrategy;
protected final IOManager ioManager;
private final BlockingQueue<BufferCacheHeaderHelper> headerPageCache;
private IFileHandle fileHandle;
private volatile boolean hasOpen;
protected AbstractBufferedFileIOManager(BufferCache bufferCache, IIOManager ioManager,
BlockingQueue<BufferCacheHeaderHelper> headerPageCache, IPageReplacementStrategy pageReplacementStrategy) {
this.bufferCache = bufferCache;
this.ioManager = (IOManager) ioManager;
this.headerPageCache = headerPageCache;
this.pageReplacementStrategy = pageReplacementStrategy;
hasOpen = false;
}
/* ********************************
* Read/Write page methods
* ********************************
*/
/**
* Read the CachedPage from disk
*
* @param cPage
* CachedPage in {@link BufferCache}
* @throws HyracksDataException
*/
public abstract void read(CachedPage cPage) throws HyracksDataException;
/**
* Write the CachedPage into disk
*
* @param cPage
* CachedPage in {@link BufferCache}
* @throws HyracksDataException
*/
public void write(CachedPage cPage) throws HyracksDataException {
final int totalPages = cPage.getFrameSizeMultiplier();
final int extraBlockPageId = cPage.getExtraBlockPageId();
final BufferCacheHeaderHelper header = checkoutHeaderHelper();
write(cPage, header, totalPages, extraBlockPageId);
}
/**
* Write the CachedPage into disk called by {@link AbstractBufferedFileIOManager#write(CachedPage)}
* Note: It is the responsibility of the caller to return {@link BufferCacheHeaderHelper}
*
* @param cPage
* CachedPage that will be written
* @param header
* HeaderHelper to add into the written page
* @param totalPages
* Number of pages to be written
* @param extraBlockPageId
* Extra page ID in case it has more than one page
* @throws HyracksDataException
*/
protected abstract void write(CachedPage cPage, BufferCacheHeaderHelper header, int totalPages,
int extraBlockPageId) throws HyracksDataException;
/* ********************************
* File operations' methods
* ********************************
*/
/**
* Open the file
*
* @throws HyracksDataException
*/
public void open(FileReference fileRef) throws HyracksDataException {
fileHandle = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
hasOpen = true;
}
/**
* Close the file
*
* @throws HyracksDataException
*/
public void close() throws HyracksDataException {
if (hasOpen) {
ioManager.close(fileHandle);
}
}
public void purge() throws HyracksDataException {
ioManager.close(fileHandle);
}
/**
* Force the file into disk
*
* @param metadata
* see {@link java.nio.channels.FileChannel#force(boolean)}
* @throws HyracksDataException
*/
public void force(boolean metadata) throws HyracksDataException {
ioManager.sync(fileHandle, metadata);
}
/**
* Get the number of pages in the file
*
* @throws HyracksDataException
*/
public abstract int getNumberOfPages() throws HyracksDataException;
public void markAsDeleted() throws HyracksDataException {
fileHandle = null;
}
/**
* Check whether the file has been deleted
*
* @return
* true if has been deleted, false o.w
*/
public boolean hasBeenDeleted() {
return fileHandle == null;
}
/**
* Check whether the file has ever been opened
*
* @return
* true if has ever been opened, false otherwise
*/
public final boolean hasBeenOpened() {
return hasOpen;
}
public final FileReference getFileReference() {
return fileHandle.getFileReference();
}
public static void createFile(BufferCache bufferCache, FileReference fileRef) throws HyracksDataException {
IoUtil.create(fileRef);
if (fileRef.isCompressed()) {
final CompressedFileReference cFileRef = (CompressedFileReference) fileRef;
try {
bufferCache.createFile(cFileRef.getLAFFileReference());
} catch (HyracksDataException e) {
//In case of creating the LAF file failed, delete index file reference
IoUtil.delete(fileRef);
throw e;
}
}
}
public static void deleteFile(FileReference fileRef, IIOManager ioManager) throws HyracksDataException {
HyracksDataException savedEx = null;
/*
* LAF file has to be deleted before the index file.
* If the index file deleted first and a non-graceful shutdown happened before the deletion of
* the LAF file, the LAF file will not be deleted during the next recovery.
*/
try {
if (fileRef.isCompressed()) {
final CompressedFileReference cFileRef = (CompressedFileReference) fileRef;
final FileReference lafFileRef = cFileRef.getLAFFileReference();
if (lafFileRef.getFile().exists()) {
ioManager.delete(lafFileRef);
}
}
} catch (HyracksDataException e) {
savedEx = e;
}
try {
ioManager.delete(fileRef);
} catch (HyracksDataException e) {
if (savedEx != null) {
savedEx.addSuppressed(e);
} else {
savedEx = e;
}
}
if (savedEx != null) {
throw savedEx;
}
}
/* ********************************
* Compressed file methods
* ********************************
*/
public abstract ICompressedPageWriter getCompressedPageWriter();
/* ********************************
* Common helper methods
* ********************************
*/
/**
* Get the offset for the first page
*
* @param cPage
* CachedPage for which the offset is needed
* @return
* page offset in the file
*/
protected abstract long getFirstPageOffset(CachedPage cPage);
/**
* Get the offset for the extra page
*
* @param cPage
* CachedPage for which the offset is needed
* @return
* page offset in the file
*/
protected abstract long getExtraPageOffset(CachedPage cPage);
protected final BufferCacheHeaderHelper checkoutHeaderHelper() {
BufferCacheHeaderHelper helper = headerPageCache.poll();
if (helper == null) {
helper = new BufferCacheHeaderHelper(bufferCache.getPageSize());
}
return helper;
}
protected final void returnHeaderHelper(BufferCacheHeaderHelper buffer) {
headerPageCache.offer(buffer); //NOSONAR
}
protected final long readToBuffer(ByteBuffer buf, long offset) throws HyracksDataException {
return ioManager.syncRead(fileHandle, offset, buf);
}
protected final long writeToFile(ByteBuffer buf, long offset) throws HyracksDataException {
return ioManager.doSyncWrite(fileHandle, offset, buf);
}
protected final long writeToFile(ByteBuffer[] buf, long offset) throws HyracksDataException {
return ioManager.doSyncWrite(fileHandle, offset, buf);
}
protected final long getFileSize() throws HyracksDataException {
return ioManager.getSize(fileHandle);
}
protected final void verifyBytesWritten(long expected, long actual) {
if (expected != actual) {
throwException(WRITE, expected, actual);
}
}
protected final boolean verifyBytesRead(long expected, long actual) {
if (expected != actual) {
if (actual == -1) {
// disk order scan code seems to rely on this behavior, so silently return
return false;
} else {
throwException(READ, expected, actual);
}
}
return true;
}
protected void throwException(String op, long expected, long actual) {
final String path = fileHandle.getFileReference().getAbsolutePath();
throw new IllegalStateException(String.format(ERROR_MESSAGE, op, expected, actual, path));
}
}