blob: d78ac244f9b2f0aa92aefbd6594ef89a682195cb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hyracks.storage.common.compression.file;
import java.nio.ByteBuffer;
import java.util.EnumSet;
import org.apache.hyracks.api.compression.ICompressorDecompressor;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.IIOManager;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.buffercache.ICachedPageInternal;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
/**
* CompressedFileManager is responsible to manage the Look Aside File (LAF file), which contains
* the compression information. LAF file format is as follow:
*
* [<offset0, size0>, <offset1, size1> .... <offsetN, sizeN>]
* Each entry <offsetM, sizeM> is an entry of 16-bytes for page M (8 bytes for offset and 8 for size).
*
* The file is responsible to store the beginning and the size of each page after compression.
*/
public class CompressedFileManager {
protected static final int SIZE_ENTRY_OFFSET = 8; // 0 is for the compressed page offset
protected static final int ENTRY_LENGTH = 16; //<offset(8-bytes),size(8-bytes)>
protected static final int EOF = -1;
private static final EnumSet<State> CLOSED = EnumSet.of(State.CLOSED);
private static final EnumSet<State> CLOSED_OR_INVALID = EnumSet.of(State.CLOSED, State.INVALID);
private static final EnumSet<State> OPEN = EnumSet.of(State.READABLE, State.WRITABLE);
private static final EnumSet<State> OPEN_OR_INVALID = EnumSet.of(State.READABLE, State.WRITABLE, State.INVALID);
private static final EnumSet<State> READABLE = EnumSet.of(State.READABLE);
private static final EnumSet<State> WRITABLE = EnumSet.of(State.WRITABLE);
private enum State {
READABLE,
WRITABLE,
/*
* INVALID state means that the LAF does not exists.
* This could happened after deleting the LAF file but not the index file (e.g., non-graceful shutdown).
* To account for this case, we should indicate the index is not valid and the index should be deleted by the
* recovery manager.
*/
INVALID,
CLOSED
}
private final IBufferCache bufferCache;
private final ICompressorDecompressor compressorDecompressor;
private final CompressedFileReference fileRef;
private final IIOManager ioManager;
private int fileId;
private State state;
private int totalNumOfPages;
private LAFWriter lafWriter;
public CompressedFileManager(IBufferCache bufferCache, CompressedFileReference fileRef, IIOManager ioManager) {
state = State.CLOSED;
totalNumOfPages = 0;
this.bufferCache = bufferCache;
this.fileRef = fileRef;
this.compressorDecompressor = fileRef.getCompressorDecompressor();
this.ioManager = ioManager;
}
/* ************************
* File handling methods
* ************************
*/
/**
* Open LAF file
* If the file is empty (i.e. the number of pages is zero)
* Then the state will be WRITABLE.
*
* @return
* true if the file exists and was closed
* false otherwise
* @throws IllegalStateException
* if the the file is not in CLOSED state
*/
public boolean open() throws HyracksDataException {
ensureState(CLOSED);
boolean open = false;
if (ioManager.exists(fileRef.getLAFFileReference())) {
fileId = bufferCache.openFile(fileRef.getLAFFileReference());
open = true;
} else {
fileId = -1;
}
changeToFunctionalState();
return open;
}
/**
* Close LAF file
*
* @return
* true if the LAF file exists and was OPEN
* false otherwise
* @throws IllegalStateException
* if the the file is not in OPEN state
*/
public boolean close() throws HyracksDataException {
ensureState(OPEN_OR_INVALID);
boolean closed = false;
if (state != State.INVALID) {
bufferCache.closeFile(fileId);
state = State.CLOSED;
closed = true;
}
return closed;
}
/**
* Close and purge LAF file
*
* @throws IllegalStateException
* if the the file is not in OPEN state
*/
public void purge() throws HyracksDataException {
if (close()) {
bufferCache.purgeHandle(fileId);
}
}
/**
* Delete LAF file
*
* @throws IllegalStateException
* if the the file is not in CLOSED state
*/
public void delete() throws HyracksDataException {
ensureState(CLOSED_OR_INVALID);
if (state != State.INVALID) {
bufferCache.deleteFile(fileId);
}
}
/**
* Force LAF file content to disk
*
* @throws IllegalStateException
* if the the file is not in CLOSED state
*/
public void force(boolean metadata) throws HyracksDataException {
ensureState(OPEN);
bufferCache.force(fileId, metadata);
}
/* ************************
* LAF writing methods
* ************************
*/
/*
* Should be only visible to LAFWriter
*/
int getFileId() {
return fileId;
}
public ICompressedPageWriter getCompressedPageWriter() {
ensureState(WRITABLE);
return lafWriter;
}
/**
* Add page information (offset, size) after compression.
*
* @param dpid
* @param size
* @return offset for the compressed page.
* @throws HyracksDataException
*/
public long writePageInfo(long dpid, long size) throws HyracksDataException {
final int pageId = BufferedFileHandle.getPageId(dpid);
//Write the page (extraPageIndex = 0)
return writeExtraPageInfo(pageId, size, 0);
}
/**
* Add extra page information (offset, size) after compression.
*
* @param extraPageId
* extra page ID
* @param size
* size of the extra page
* @param extraPageIndex
* the index of the extra page (starting from 0)
* @return offset for the compressed page.
* @throws IllegalStateException
* If the file is not in WRITABLE state
*/
public long writeExtraPageInfo(int extraPageId, long size, int extraPageIndex) throws HyracksDataException {
ensureState(WRITABLE);
final long compressedPageOffset;
try {
compressedPageOffset = lafWriter.writePageInfo(extraPageId + extraPageIndex, size);
} catch (HyracksDataException e) {
lafWriter.abort();
throw e;
}
return compressedPageOffset;
}
/**
* This methods is used by {@link LAFWriter#endWriting()} to signal the end of writing.
* After calling this methods, LAF file will be READ-ONLY.
*
* @param totalNumOfPages
* The total number of pages of the index
* @throws IllegalStateException
* If the file is not in WRITABLE state
*/
void endWriting(int totalNumOfPages) {
ensureState(WRITABLE);
this.totalNumOfPages = totalNumOfPages;
lafWriter = null;
state = State.READABLE;
}
/* ************************
* LAF reading methods
* ************************
*/
/**
* Set the compressed page offset and size
*
* @param compressedPage
*/
public void setCompressedPageInfo(ICachedPageInternal compressedPage) throws HyracksDataException {
setCompressedPageInfo(BufferedFileHandle.getPageId(compressedPage.getDiskPageId()), compressedPage);
}
/**
* Set the extra compressed page offset and size
*
* @param compressedPage
* @param extraPageIndex
*/
public void setExtraCompressedPageInfo(ICachedPageInternal compressedPage, int extraPageIndex)
throws HyracksDataException {
setCompressedPageInfo(compressedPage.getExtraBlockPageId() + extraPageIndex, compressedPage);
}
/* ************************
* LAF general methods
* ************************
*/
/**
* Get the number of compressed pages
*/
public int getNumberOfPages() {
return totalNumOfPages;
}
public ICompressorDecompressor getCompressorDecompressor() {
return compressorDecompressor;
}
/* ************************
* Private methods
* ************************
*/
private void ensureState(EnumSet<State> expectedStates) {
if (!expectedStates.contains(state)) {
throw new IllegalStateException(
"Expecting the state to be " + expectedStates + ". Currently it is " + state);
}
}
private void changeToFunctionalState() throws HyracksDataException {
if (fileId == -1) {
state = State.INVALID;
} else if (bufferCache.getNumPagesOfFile(fileId) == 0) {
state = State.WRITABLE;
lafWriter = new LAFWriter(this, bufferCache);
} else {
state = State.READABLE;
init();
}
}
private void init() throws HyracksDataException {
final int numOfPages = bufferCache.getNumPagesOfFile(fileId);
//Maximum number of entries in a page
final int numOfEntriesPerPage = bufferCache.getPageSize() / ENTRY_LENGTH;
//get the last page which may contain less entries than maxNumOfEntries
final long dpid = getDiskPageId(numOfPages - 1);
final ICachedPage page = bufferCache.pin(dpid, false);
try {
final ByteBuffer buf = page.getBuffer();
//Start at 1 since it is impossible to have EOF at the first entry of a page
int i = 1;
//Seek EOF and count number of entries
while (i < numOfEntriesPerPage && buf.getLong(i * ENTRY_LENGTH) != EOF) {
i++;
}
totalNumOfPages = (numOfPages - 1) * numOfEntriesPerPage + i;
} finally {
bufferCache.unpin(page);
}
}
private ICachedPage pinAndGetPage(int compressedPageId) throws HyracksDataException {
final int pageId = compressedPageId * ENTRY_LENGTH / bufferCache.getPageSize();
return bufferCache.pin(getDiskPageId(pageId), false);
}
private long getDiskPageId(int pageId) {
return BufferedFileHandle.getDiskPageId(fileId, pageId);
}
private void setCompressedPageInfo(int compressedPageId, ICachedPageInternal compressedPage)
throws HyracksDataException {
ensureState(READABLE);
if (totalNumOfPages == 0) {
/*
* It seems it is legal to pin empty file.
* Return the page information as it is not compressed.
*/
compressedPage.setCompressedPageOffset(0);
compressedPage.setCompressedPageSize(bufferCache.getPageSize());
return;
}
final ICachedPage page = pinAndGetPage(compressedPageId);
try {
// No need for read latches as pages are immutable.
final ByteBuffer buf = page.getBuffer();
final int entryOffset = compressedPageId * ENTRY_LENGTH % bufferCache.getPageSize();
compressedPage.setCompressedPageOffset(buf.getLong(entryOffset));
compressedPage.setCompressedPageSize((int) buf.getLong(entryOffset + SIZE_ENTRY_OFFSET));
} finally {
bufferCache.unpin(page);
}
}
}