// blob: 18c87a17fbdca94efbac37c662439a0f78688d26 [file] [log] [blame]
/*
* Copyright 2009-2013 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.storage.am.lsm.common.impls;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.storage.am.lsm.common.api.IVirtualBufferCache;
import edu.uci.ics.hyracks.storage.common.buffercache.ICacheMemoryAllocator;
import edu.uci.ics.hyracks.storage.common.buffercache.ICachedPage;
import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
import edu.uci.ics.hyracks.storage.common.file.IFileMapManager;
import edu.uci.ics.hyracks.storage.common.file.TransientFileMapManager;
/**
 * A purely in-memory buffer cache: pages are kept in a list of
 * {@link VirtualPage}s and are looked up through a fixed-size hash table of
 * buckets keyed by disk page id (dpid). Nothing is ever written to disk, so
 * {@link #openFile}, {@link #closeFile}, {@link #unpin},
 * {@link #flushDirtyPage} and {@link #force} are no-ops.
 *
 * <p>
 * Locking as implemented here: each hash bucket is guarded by its own
 * {@link ReentrantLock}; the page list and {@code nextFree} are guarded by
 * {@code synchronized (pages)} in {@link #getOrAllocPage} and
 * {@link #defragPageList}; the file map is guarded by
 * {@code synchronized (fileMapManager)}. {@link #open}, {@link #reset},
 * {@link #close}, {@link #isFull} and {@link #dumpState} take no locks.
 */
public class VirtualBufferCache implements IVirtualBufferCache {
    /** Extra list capacity reserved in {@link #open()} beyond {@code numPages}. */
    private static final int OVERFLOW_PADDING = 8;

    /** Value of {@link VirtualPage#dpid} for a page that is not assigned to any disk page. */
    private static final long UNUSED_DPID = -1L;

    private final ICacheMemoryAllocator allocator;
    private final IFileMapManager fileMapManager;
    private final int pageSize;
    private final int numPages;
    /** Hash table heads; entries are created in {@link #open()}. */
    private final CacheBucket[] buckets;
    /** All allocated pages; indexes {@code [0, nextFree)} are in use. */
    private final ArrayList<VirtualPage> pages;
    /** Index in {@link #pages} of the next page to hand out. */
    private int nextFree;
    private boolean open;

    /**
     * Creates a closed cache; {@link #open()} must be called before pages can
     * be pinned.
     *
     * @param allocator
     *            source of the page buffers
     * @param pageSize
     *            size passed to the allocator for every page
     * @param numPages
     *            number of pages preallocated by {@link #open()}; also the
     *            number of hash buckets
     */
    public VirtualBufferCache(ICacheMemoryAllocator allocator, int pageSize, int numPages) {
        this.allocator = allocator;
        this.fileMapManager = new TransientFileMapManager();
        this.pageSize = pageSize;
        this.numPages = numPages;
        buckets = new CacheBucket[numPages];
        pages = new ArrayList<VirtualPage>();
        nextFree = 0;
        open = false;
    }

    /**
     * Registers {@code fileRef} with the file map.
     *
     * @throws HyracksDataException
     *             if the file is already mapped
     */
    @Override
    public void createFile(FileReference fileRef) throws HyracksDataException {
        synchronized (fileMapManager) {
            if (fileMapManager.isMapped(fileRef)) {
                throw new HyracksDataException("File " + fileRef + " is already mapped");
            }
            fileMapManager.registerFile(fileRef);
        }
    }

    /** No-op. */
    @Override
    public void openFile(int fileId) throws HyracksDataException {
    }

    /** No-op. */
    @Override
    public void closeFile(int fileId) throws HyracksDataException {
    }

    /**
     * Unregisters the file and releases every page that belongs to it so the
     * pages can be reused.
     *
     * @param fileId
     *            id of the file to delete
     * @param flushDirtyPages
     *            ignored by this implementation
     * @throws HyracksDataException
     *             if {@code fileId} is not mapped
     */
    @Override
    public void deleteFile(int fileId, boolean flushDirtyPages) throws HyracksDataException {
        synchronized (fileMapManager) {
            if (!fileMapManager.isMapped(fileId)) {
                throw new HyracksDataException("File with id " + fileId + " is not mapped");
            }
            fileMapManager.unregisterFile(fileId);
        }

        // Unlink the file's pages from every bucket chain, one bucket at a time.
        for (int i = 0; i < buckets.length; i++) {
            CacheBucket bucket = buckets[i];
            bucket.bucketLock.lock();
            try {
                VirtualPage prev = null;
                VirtualPage curr = bucket.cachedPage;
                while (curr != null) {
                    if (BufferedFileHandle.getFileId(curr.dpid) == fileId) {
                        // Capture the successor first: reset() clears curr.next.
                        VirtualPage following = curr.next;
                        if (prev == null) {
                            bucket.cachedPage = following;
                        } else {
                            prev.next = following;
                        }
                        curr.reset();
                        curr = following;
                    } else {
                        prev = curr;
                        curr = curr.next;
                    }
                }
            } finally {
                bucket.bucketLock.unlock();
            }
        }
        defragPageList();
    }

    /**
     * Compacts the in-use pages to the front of {@link #pages} and sets
     * {@link #nextFree} to the number of pages still in use.
     *
     * <p>
     * Two cursors partition the range {@code [0, nextFree)}: {@code start}
     * advances over used pages, {@code end} retreats over free pages, and
     * whenever a free page is found before a used page the two are swapped.
     * On exit every index below {@code start} holds a used page and every
     * index from {@code start} up to the old {@code nextFree} holds a free one,
     * so {@code start} is the correct new value of {@code nextFree} in all
     * cases (including "nothing to swap" and "everything freed").
     */
    private void defragPageList() {
        synchronized (pages) {
            int start = 0;
            int end = nextFree - 1;
            while (true) {
                while (start <= end && pages.get(start).dpid != UNUSED_DPID) {
                    ++start;
                }
                while (end >= start && pages.get(end).dpid == UNUSED_DPID) {
                    --end;
                }
                if (start >= end) {
                    break;
                }
                // pages[start] is free and pages[end] is used: move the used one forward.
                Collections.swap(pages, start, end);
                ++start;
                --end;
            }
            nextFree = start;
        }
    }

    /**
     * Equivalent to {@code pin(dpid, false)}.
     *
     * @throws HyracksDataException
     *             if the page is not present
     */
    @Override
    public ICachedPage tryPin(long dpid) throws HyracksDataException {
        return pin(dpid, false);
    }

    /**
     * Returns the page with the given disk page id, creating it when
     * {@code newPage} is {@code true} and it is not yet present.
     *
     * @param dpid
     *            disk page id to look up
     * @param newPage
     *            whether a missing page may be created
     * @return the cached page
     * @throws HyracksDataException
     *             if the page is missing and {@code newPage} is {@code false}
     */
    @Override
    public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException {
        VirtualPage page = null;
        int hash = hash(dpid);
        CacheBucket bucket = buckets[hash];

        bucket.bucketLock.lock();
        try {
            page = bucket.cachedPage;
            while (page != null) {
                if (page.dpid == dpid) {
                    return page;
                }
                page = page.next;
            }

            if (!newPage) {
                throw new HyracksDataException("Page " + BufferedFileHandle.getPageId(dpid)
                        + " does not exist in file "
                        + fileMapManager.lookupFileName(BufferedFileHandle.getFileId(dpid)));
            }

            // Not found: take a free page and push it on the front of the chain.
            page = getOrAllocPage(dpid);
            page.next = bucket.cachedPage;
            bucket.cachedPage = page;
        } finally {
            bucket.bucketLock.unlock();
        }

        return page;
    }

    /**
     * Maps a disk page id to a bucket index in {@code [0, buckets.length)}.
     * Java's {@code %} keeps the sign of the dividend, so a negative
     * remainder is shifted back into range instead of being used as an
     * (invalid) array index. Non-negative ids map exactly as before.
     */
    private int hash(long dpid) {
        int index = (int) (dpid % buckets.length);
        return index < 0 ? index + buckets.length : index;
    }

    /**
     * Hands out the page at {@link #nextFree}, allocating one extra page from
     * the allocator when all existing pages are in use, and tags it with
     * {@code dpid}.
     */
    private VirtualPage getOrAllocPage(long dpid) {
        VirtualPage page = null;
        synchronized (pages) {
            if (nextFree >= pages.size()) {
                page = new VirtualPage(allocator.allocate(pageSize, 1)[0]);
                pages.add(page);
            } else {
                page = pages.get(nextFree);
            }
            ++nextFree;
            page.dpid = dpid;
        }
        return page;
    }

    /** No-op. */
    @Override
    public void unpin(ICachedPage page) throws HyracksDataException {
    }

    /** No-op. */
    @Override
    public void flushDirtyPage(ICachedPage page) throws HyracksDataException {
    }

    /** No-op. */
    @Override
    public void force(int fileId, boolean metadata) throws HyracksDataException {
    }

    @Override
    public int getPageSize() {
        return pageSize;
    }

    @Override
    public int getNumPages() {
        return numPages;
    }

    /**
     * Preallocates {@code numPages} pages and creates the hash buckets.
     *
     * @throws HyracksDataException
     *             if the cache is already open
     */
    @Override
    public void open() throws HyracksDataException {
        if (open) {
            throw new HyracksDataException("Failed to open virtual buffercache since it is already open.");
        }
        pages.trimToSize();
        pages.ensureCapacity(numPages + OVERFLOW_PADDING);
        ByteBuffer[] buffers = allocator.allocate(pageSize, numPages);
        for (int i = 0; i < numPages; i++) {
            pages.add(new VirtualPage(buffers[i]));
            buckets[i] = new CacheBucket();
        }
        nextFree = 0;
        open = true;
    }

    /**
     * Marks the first {@code numPages} pages as free, empties every bucket
     * and drops any pages that were allocated beyond {@code numPages}.
     */
    @Override
    public void reset() {
        for (int i = 0; i < numPages; i++) {
            pages.get(i).reset();
            buckets[i].cachedPage = null;
        }
        if (pages.size() > numPages) {
            // Drop the overflow tail in one step.
            pages.subList(numPages, pages.size()).clear();
        }
        nextFree = 0;
    }

    /**
     * Drops all pages and empties every bucket.
     *
     * @throws HyracksDataException
     *             if the cache is not open
     */
    @Override
    public void close() throws HyracksDataException {
        if (!open) {
            throw new HyracksDataException("Failed to close virtual buffercache since it is already closed.");
        }
        pages.clear();
        for (int i = 0; i < numPages; i++) {
            buckets[i].cachedPage = null;
        }
        // Keep nextFree consistent with the now-empty page list.
        nextFree = 0;
        open = false;
    }

    /** Returns a short, human-readable summary of the cache's counters. */
    public String dumpState() {
        StringBuilder sb = new StringBuilder();
        sb.append(String.format("Page size = %d\n", pageSize));
        sb.append(String.format("Capacity = %d\n", numPages));
        sb.append(String.format("Allocated pages = %d\n", pages.size()));
        sb.append(String.format("Next free page = %d\n", nextFree));
        return sb.toString();
    }

    @Override
    public IFileMapManager getFileMapProvider() {
        return fileMapManager;
    }

    /** Returns {@code true} once at least {@code numPages} pages are in use. */
    @Override
    public boolean isFull() {
        return nextFree >= numPages;
    }

    /** Head of one hash chain together with the lock that guards it. */
    private static class CacheBucket {
        private final ReentrantLock bucketLock;
        private VirtualPage cachedPage;

        public CacheBucket() {
            this.bucketLock = new ReentrantLock();
        }
    }

    /**
     * A single in-memory page: a buffer, a fair read/write latch, the disk
     * page id it currently represents and the link to the next page in its
     * hash chain. Static because it needs nothing from the enclosing cache.
     */
    private static class VirtualPage implements ICachedPage {
        final ByteBuffer buffer;
        final ReadWriteLock latch;
        volatile long dpid;
        VirtualPage next;

        public VirtualPage(ByteBuffer buffer) {
            this.buffer = buffer;
            latch = new ReentrantReadWriteLock(true);
            dpid = UNUSED_DPID;
            next = null;
        }

        /** Marks the page as unused and detaches it from its hash chain. */
        public void reset() {
            dpid = UNUSED_DPID;
            next = null;
        }

        @Override
        public ByteBuffer getBuffer() {
            return buffer;
        }

        @Override
        public void acquireReadLatch() {
            latch.readLock().lock();
        }

        @Override
        public void releaseReadLatch() {
            latch.readLock().unlock();
        }

        @Override
        public void acquireWriteLatch() {
            latch.writeLock().lock();
        }

        @Override
        public void releaseWriteLatch() {
            latch.writeLock().unlock();
        }
    }
}