blob: 81e528b1d2d0276258ca369c12a8bd7cd65296e6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.hyracks.storage.am.common.impls;
import java.util.ArrayList;
import java.util.List;
import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.io.FileReference;
import org.apache.hyracks.storage.am.common.api.IPageManager;
import org.apache.hyracks.storage.am.common.api.ITreeIndex;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrame;
import org.apache.hyracks.storage.am.common.api.ITreeIndexFrameFactory;
import org.apache.hyracks.storage.am.common.api.ITreeIndexMetadataFrame;
import org.apache.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
import org.apache.hyracks.storage.common.IIndexBulkLoader;
import org.apache.hyracks.storage.common.MultiComparator;
import org.apache.hyracks.storage.common.buffercache.HaltOnFailureCallback;
import org.apache.hyracks.storage.common.buffercache.IBufferCache;
import org.apache.hyracks.storage.common.buffercache.ICachedPage;
import org.apache.hyracks.storage.common.buffercache.IFIFOPageWriter;
import org.apache.hyracks.storage.common.buffercache.IPageWriteCallback;
import org.apache.hyracks.storage.common.buffercache.PageWriteFailureCallback;
import org.apache.hyracks.storage.common.compression.file.ICompressedPageWriter;
import org.apache.hyracks.storage.common.file.BufferedFileHandle;
public abstract class AbstractTreeIndex implements ITreeIndex {
public static final int MINIMAL_TREE_PAGE_COUNT = 2;
public static final int MINIMAL_TREE_PAGE_COUNT_WITH_FILTER = 3;
protected int rootPage = 1;
protected final IBufferCache bufferCache;
protected final IPageManager freePageManager;
protected final ITreeIndexFrameFactory interiorFrameFactory;
protected final ITreeIndexFrameFactory leafFrameFactory;
protected final IBinaryComparatorFactory[] cmpFactories;
protected final int fieldCount;
protected FileReference file;
private int fileId = -1;
protected boolean isActive = false;
protected int bulkloadLeafStart = 0;
public AbstractTreeIndex(IBufferCache bufferCache, IPageManager freePageManager,
ITreeIndexFrameFactory interiorFrameFactory, ITreeIndexFrameFactory leafFrameFactory,
IBinaryComparatorFactory[] cmpFactories, int fieldCount, FileReference file) {
this.bufferCache = bufferCache;
this.freePageManager = freePageManager;
this.interiorFrameFactory = interiorFrameFactory;
this.leafFrameFactory = leafFrameFactory;
this.cmpFactories = cmpFactories;
this.fieldCount = fieldCount;
this.file = file;
}
@Override
public synchronized void create() throws HyracksDataException {
if (isActive) {
throw HyracksDataException.create(ErrorCode.CANNOT_CREATE_ACTIVE_INDEX);
}
fileId = bufferCache.createFile(file);
boolean failed = true;
try {
bufferCache.openFile(fileId);
failed = false;
} finally {
if (failed) {
bufferCache.deleteFile(fileId);
}
}
failed = true;
try {
freePageManager.open(fileId);
freePageManager.init(interiorFrameFactory, leafFrameFactory);
setRootPage();
freePageManager.close(HaltOnFailureCallback.INSTANCE);
failed = false;
} finally {
bufferCache.closeFile(fileId);
if (failed) {
bufferCache.deleteFile(fileId);
}
}
}
private void setRootPage() throws HyracksDataException {
rootPage = freePageManager.getRootPageId();
bulkloadLeafStart = freePageManager.getBulkLoadLeaf();
}
@Override
public synchronized void activate() throws HyracksDataException {
if (isActive) {
throw HyracksDataException.create(ErrorCode.CANNOT_ACTIVATE_ACTIVE_INDEX);
}
if (fileId >= 0) {
bufferCache.openFile(fileId);
} else {
fileId = bufferCache.openFile(file);
}
freePageManager.open(fileId);
setRootPage();
// TODO: Should probably have some way to check that the tree is physically consistent
// or that the file we just opened actually is a tree
isActive = true;
}
@Override
public synchronized void deactivate() throws HyracksDataException {
if (!isActive) {
throw HyracksDataException.create(ErrorCode.CANNOT_DEACTIVATE_INACTIVE_INDEX);
}
freePageManager.close(HaltOnFailureCallback.INSTANCE);
bufferCache.closeFile(fileId);
isActive = false;
}
@Override
public void purge() throws HyracksDataException {
if (isActive) {
throw HyracksDataException.create(ErrorCode.CANNOT_PURGE_ACTIVE_INDEX);
}
bufferCache.purgeHandle(fileId);
// after purging, the fileId has no mapping and no meaning
fileId = -1;
}
@Override
public synchronized void destroy() throws HyracksDataException {
if (isActive) {
throw HyracksDataException.create(ErrorCode.CANNOT_DESTROY_ACTIVE_INDEX);
}
bufferCache.deleteFile(file);
}
@Override
public synchronized void clear() throws HyracksDataException {
if (!isActive) {
throw HyracksDataException.create(ErrorCode.CANNOT_CLEAR_INACTIVE_INDEX);
}
freePageManager.init(interiorFrameFactory, leafFrameFactory);
setRootPage();
}
public boolean isEmptyTree(ITreeIndexFrame frame) throws HyracksDataException {
if (rootPage == -1) {
return true;
}
return freePageManager.isEmpty(frame, rootPage);
}
public byte getTreeHeight(ITreeIndexFrame frame) throws HyracksDataException {
ICachedPage rootNode = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, rootPage), false);
rootNode.acquireReadLatch();
try {
frame.setPage(rootNode);
return frame.getLevel();
} finally {
rootNode.releaseReadLatch();
bufferCache.unpin(rootNode);
}
}
@Override
public int getFileId() {
return fileId;
}
public FileReference getFileReference() {
return file;
}
@Override
public IBufferCache getBufferCache() {
return bufferCache;
}
@Override
public ITreeIndexFrameFactory getInteriorFrameFactory() {
return interiorFrameFactory;
}
@Override
public ITreeIndexFrameFactory getLeafFrameFactory() {
return leafFrameFactory;
}
@Override
public IBinaryComparatorFactory[] getComparatorFactories() {
return cmpFactories;
}
@Override
public IPageManager getPageManager() {
return freePageManager;
}
@Override
public int getRootPageId() {
return rootPage;
}
@Override
public int getFieldCount() {
return fieldCount;
}
public abstract class AbstractTreeIndexBulkLoader extends PageWriteFailureCallback implements IIndexBulkLoader {
protected final MultiComparator cmp;
protected final int slotSize;
protected final int leafMaxBytes;
protected final int interiorMaxBytes;
protected final ArrayList<NodeFrontier> nodeFrontiers = new ArrayList<>();
protected final ITreeIndexMetadataFrame metaFrame;
protected final ITreeIndexTupleWriter tupleWriter;
protected ITreeIndexFrame leafFrame;
protected ITreeIndexFrame interiorFrame;
// Immutable bulk loaders write their root page at page -2, as needed e.g. by append-only file systems such as
// HDFS. Since loading this tree relies on the root page actually being at that point, no further inserts into
// that tree are allowed. Currently, this is not enforced.
protected boolean releasedLatches;
private final IFIFOPageWriter pageWriter;
protected List<ICachedPage> pagesToWrite;
private final ICompressedPageWriter compressedPageWriter;
public AbstractTreeIndexBulkLoader(float fillFactor, IPageWriteCallback callback) throws HyracksDataException {
leafFrame = leafFrameFactory.createFrame();
interiorFrame = interiorFrameFactory.createFrame();
metaFrame = freePageManager.createMetadataFrame();
pageWriter = bufferCache.createFIFOWriter(callback, this);
if (!isEmptyTree(leafFrame)) {
throw HyracksDataException.create(ErrorCode.CANNOT_BULK_LOAD_NON_EMPTY_TREE);
}
this.cmp = MultiComparator.create(cmpFactories);
leafFrame.setMultiComparator(cmp);
interiorFrame.setMultiComparator(cmp);
tupleWriter = leafFrame.getTupleWriter();
NodeFrontier leafFrontier = new NodeFrontier(leafFrame.createTupleReference());
leafFrontier.pageId = freePageManager.takePage(metaFrame);
leafFrontier.page =
bufferCache.confiscatePage(BufferedFileHandle.getDiskPageId(fileId, leafFrontier.pageId));
interiorFrame.setPage(leafFrontier.page);
interiorFrame.initBuffer((byte) 0);
interiorMaxBytes = (int) (interiorFrame.getBuffer().capacity() * fillFactor);
leafFrame.setPage(leafFrontier.page);
leafFrame.initBuffer((byte) 0);
leafMaxBytes = (int) (leafFrame.getBuffer().capacity() * fillFactor);
slotSize = leafFrame.getSlotSize();
nodeFrontiers.add(leafFrontier);
pagesToWrite = new ArrayList<>();
compressedPageWriter = bufferCache.getCompressedPageWriter(fileId);
}
protected void handleException() {
// Unlatch and unpin pages that weren't in the queue to avoid leaking memory.
compressedPageWriter.abort();
for (NodeFrontier nodeFrontier : nodeFrontiers) {
if (nodeFrontier != null && nodeFrontier.page != null) {
ICachedPage frontierPage = nodeFrontier.page;
if (frontierPage.confiscated()) {
bufferCache.returnPage(frontierPage, false);
}
}
}
for (ICachedPage pageToDiscard : pagesToWrite) {
if (pageToDiscard != null) {
bufferCache.returnPage(pageToDiscard, false);
}
}
releasedLatches = true;
}
@Override
public void end() throws HyracksDataException {
if (hasFailed()) {
throw HyracksDataException.create(getFailure());
}
freePageManager.setRootPageId(rootPage);
}
protected void addLevel() throws HyracksDataException {
NodeFrontier frontier = new NodeFrontier(tupleWriter.createTupleReference());
frontier.page = bufferCache.confiscatePage(IBufferCache.INVALID_DPID);
frontier.pageId = -1;
frontier.lastTuple.setFieldCount(cmp.getKeyFieldCount());
interiorFrame.setPage(frontier.page);
interiorFrame.initBuffer((byte) nodeFrontiers.size());
nodeFrontiers.add(frontier);
}
public ITreeIndexFrame getLeafFrame() {
return leafFrame;
}
public void setLeafFrame(ITreeIndexFrame leafFrame) {
this.leafFrame = leafFrame;
}
public void write(ICachedPage cPage) throws HyracksDataException {
compressedPageWriter.prepareWrite(cPage);
pageWriter.write(cPage);
}
@Override
public void force() throws HyracksDataException {
bufferCache.force(fileId, false);
}
}
public IBinaryComparatorFactory[] getCmpFactories() {
return cmpFactories;
}
@Override
public String toString() {
return "{\"class\":\"" + getClass().getSimpleName() + "\",\"file\":\"" + file.getRelativePath() + "\"}";
}
}