/*
* Copyright 2009-2010 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.hyracks.storage.common.buffercache;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.logging.Level;
import java.util.logging.Logger;

import edu.uci.ics.hyracks.api.exceptions.HyracksDataException;
import edu.uci.ics.hyracks.api.io.FileHandle;
import edu.uci.ics.hyracks.api.io.FileReference;
import edu.uci.ics.hyracks.api.io.IIOManager;
import edu.uci.ics.hyracks.storage.common.file.BufferedFileHandle;
import edu.uci.ics.hyracks.storage.common.file.IFileMapManager;
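
/**
 * A buffer cache mapping (fileId, pageId) pairs -- packed into a single long "dpid" -- to in-memory
 * page buffers. Pages are looked up in a hash table of per-lock CacheBuckets; on a miss, a victim
 * frame is obtained from the pluggable IPageReplacementStrategy and re-keyed to the requested dpid,
 * while a background CleanerThread writes dirty pages back to disk.
 */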
public class BufferCache implements IBufferCacheInternal {
private static final Logger LOGGER = Logger.getLogger(BufferCache.class.getName());
private static final int MAP_FACTOR = 2;
private static final int MAX_VICTIMIZATION_TRY_COUNT = 3;
private final IIOManager ioManager;
private final int pageSize;
private final int numPages;
private final CachedPage[] cachedPages;
private final CacheBucket[] pageMap;
private final IPageReplacementStrategy pageReplacementStrategy;
private final IFileMapManager fileMapManager;
private final CleanerThread cleanerThread;
private final Map<Integer, BufferedFileHandle> fileInfoMap;
private boolean closed;
public BufferCache(IIOManager ioManager, ICacheMemoryAllocator allocator,
IPageReplacementStrategy pageReplacementStrategy, IFileMapManager fileMapManager, int pageSize, int numPages) {
this.ioManager = ioManager;
this.pageSize = pageSize;
this.numPages = numPages;
pageReplacementStrategy.setBufferCache(this);
ByteBuffer[] buffers = allocator.allocate(pageSize, numPages);
cachedPages = new CachedPage[buffers.length];
for (int i = 0; i < buffers.length; ++i) {
cachedPages[i] = new CachedPage(i, buffers[i], pageReplacementStrategy);
}
pageMap = new CacheBucket[numPages * MAP_FACTOR];
for (int i = 0; i < pageMap.length; ++i) {
pageMap[i] = new CacheBucket();
}
this.pageReplacementStrategy = pageReplacementStrategy;
this.fileMapManager = fileMapManager;
fileInfoMap = new HashMap<Integer, BufferedFileHandle>();
cleanerThread = new CleanerThread();
cleanerThread.start();
closed = false;
}
@Override
public int getPageSize() {
return pageSize;
}
@Override
public int getNumPages() {
return numPages;
}
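    /**
     * Pins the page identified by dpid, reading it from disk if it is not already cached (unless
     * newPage is true, in which case the buffer is simply marked valid). Callers must latch the page
     * before touching its buffer and must eventually unpin it. A typical read path looks roughly like
     * the sketch below; BufferedFileHandle.getDiskPageId is assumed here as the helper that packs a
     * fileId and pageId into a dpid, matching the getFileId/getPageId decoders used in this class:
     *
     * <pre>
     * ICachedPage page = bufferCache.pin(BufferedFileHandle.getDiskPageId(fileId, pageId), false);
     * try {
     *     page.acquireReadLatch();
     *     try {
     *         ByteBuffer frame = page.getBuffer();
     *         // ... read from frame ...
     *     } finally {
     *         page.releaseReadLatch();
     *     }
     * } finally {
     *     bufferCache.unpin(page);
     * }
     * </pre>
     */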
@Override
public ICachedPage pin(long dpid, boolean newPage) throws HyracksDataException {
if (closed) {
throw new HyracksDataException("pin called on a closed cache");
}
        CachedPage cPage = findPage(dpid, newPage);
        if (cPage == null) {
            throw new HyracksDataException("Failed to pin page " + BufferedFileHandle.getFileId(dpid) + ":"
                    + BufferedFileHandle.getPageId(dpid) + ": victimization failed -- all pages may be pinned");
        }
if (!newPage) {
if (!cPage.valid) {
                /*
                 * We got a buffer and we have pinned it, but it is invalid. If it were a new page, we would just
                 * mark it as valid and return (see the else branch). Otherwise, while we hold the pin, we take a
                 * write latch on the data and read the page in.
                 */
cPage.acquireWriteLatch(false);
try {
if (!cPage.valid) {
read(cPage);
}
cPage.valid = true;
} finally {
cPage.releaseWriteLatch();
}
}
} else {
cPage.valid = true;
}
pageReplacementStrategy.notifyCachePageAccess(cPage);
return cPage;
}
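    /*
     * Returns a pinned CachedPage for dpid, either by finding it in the page map or by victimizing
     * another frame and re-keying it to dpid. Returns null if no victim could be secured after
     * MAX_VICTIMIZATION_TRY_COUNT attempts, i.e., all frames appear to be pinned.
     */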
private CachedPage findPage(long dpid, boolean newPage) {
int victimizationTryCount = 0;
int localCleanCount = -1;
synchronized (cleanerThread.cleanNotification) {
localCleanCount = cleanerThread.cleanCount;
}
while (true) {
CachedPage cPage = null;
/*
* Hash dpid to get a bucket and then check if the page exists in the bucket.
*/
int hash = hash(dpid);
CacheBucket bucket = pageMap[hash];
bucket.bucketLock.lock();
try {
cPage = bucket.cachedPage;
while (cPage != null) {
if (cPage.dpid == dpid) {
cPage.pinCount.incrementAndGet();
return cPage;
}
cPage = cPage.next;
}
} finally {
bucket.bucketLock.unlock();
}
/*
* If we got here, the page was not in the hash table. Now we ask the page replacement strategy to find us a victim.
*/
CachedPage victim = (CachedPage) pageReplacementStrategy.findVictim();
if (victim != null) {
/*
* We have a victim with the following invariants.
* 1. The dpid on the CachedPage may or may not be valid.
* 2. We have a pin on the CachedPage. We have to deal with three cases here.
* Case 1: The dpid on the CachedPage is invalid (-1). This indicates that this buffer has never been used.
* So we are the only ones holding it. Get a lock on the required dpid's hash bucket, check if someone inserted
* the page we want into the table. If so, decrement the pincount on the victim and return the winner page in the
* table. If such a winner does not exist, insert the victim and return it.
* Case 2: The dpid on the CachedPage is valid.
* Case 2a: The current dpid and required dpid hash to the same bucket.
             * Get the bucket lock and check that the victim is still at pinCount == 1. If so, check if there is
             * a winning CachedPage with the required dpid. If there is, decrement the pinCount on the victim and
             * return the winner. If not, update the contents of the CachedPage to hold the required dpid and
             * return it. If the pinCount on the victim was != 1 (someone else pinned it, or it was marked dirty
             * and so holds an extra pin), the victim is in use for its old contents -- decrement the pinCount
             * and retry.
* Case 2b: The current dpid and required dpid hash to different buckets. Get the two bucket locks in the order
* of the bucket indexes (Ordering prevents deadlocks). Check for the existence of a winner in the new bucket
* and for potential use of the victim (pinCount != 1). If everything looks good, remove the CachedPage from
* the old bucket, and add it to the new bucket and update its header with the new dpid.
*/
if (victim.dpid < 0) {
/*
* Case 1.
*/
bucket.bucketLock.lock();
try {
cPage = bucket.cachedPage;
while (cPage != null) {
if (cPage.dpid == dpid) {
cPage.pinCount.incrementAndGet();
victim.pinCount.decrementAndGet();
return cPage;
}
cPage = cPage.next;
}
victim.reset(dpid);
victim.next = bucket.cachedPage;
bucket.cachedPage = victim;
} finally {
bucket.bucketLock.unlock();
}
return victim;
}
int victimHash = hash(victim.dpid);
if (victimHash == hash) {
/*
* Case 2a.
*/
bucket.bucketLock.lock();
try {
if (victim.pinCount.get() != 1) {
victim.pinCount.decrementAndGet();
continue;
}
cPage = bucket.cachedPage;
while (cPage != null) {
if (cPage.dpid == dpid) {
cPage.pinCount.incrementAndGet();
victim.pinCount.decrementAndGet();
return cPage;
}
cPage = cPage.next;
}
victim.reset(dpid);
} finally {
bucket.bucketLock.unlock();
}
return victim;
} else {
/*
* Case 2b.
*/
CacheBucket victimBucket = pageMap[victimHash];
if (victimHash < hash) {
victimBucket.bucketLock.lock();
bucket.bucketLock.lock();
} else {
bucket.bucketLock.lock();
victimBucket.bucketLock.lock();
}
try {
if (victim.pinCount.get() != 1) {
victim.pinCount.decrementAndGet();
continue;
}
cPage = bucket.cachedPage;
while (cPage != null) {
if (cPage.dpid == dpid) {
cPage.pinCount.incrementAndGet();
victim.pinCount.decrementAndGet();
return cPage;
}
cPage = cPage.next;
}
if (victimBucket.cachedPage == victim) {
victimBucket.cachedPage = victim.next;
} else {
CachedPage victimPrev = victimBucket.cachedPage;
while (victimPrev != null && victimPrev.next != victim) {
victimPrev = victimPrev.next;
}
assert victimPrev != null;
victimPrev.next = victim.next;
}
victim.reset(dpid);
victim.next = bucket.cachedPage;
bucket.cachedPage = victim;
} finally {
victimBucket.bucketLock.unlock();
bucket.bucketLock.unlock();
}
return victim;
}
}
/*
             * Victimization failed -- all pages pinned? Wait a bit, increment victimizationTryCount, and loop
             * around. Give up after MAX_VICTIMIZATION_TRY_COUNT tries.
*/
if (++victimizationTryCount >= MAX_VICTIMIZATION_TRY_COUNT) {
return null;
}
synchronized (cleanerThread) {
cleanerThread.notifyAll();
}
synchronized (cleanerThread.cleanNotification) {
if (cleanerThread.cleanCount == localCleanCount) {
try {
cleanerThread.cleanNotification.wait(1000);
} catch (InterruptedException e) {
// Do nothing
}
}
}
}
}
private void read(CachedPage cPage) throws HyracksDataException {
BufferedFileHandle fInfo = getFileInfo(cPage);
cPage.buffer.clear();
ioManager.syncRead(fInfo.getFileHandle(), (long) BufferedFileHandle.getPageId(cPage.dpid) * pageSize,
cPage.buffer);
}
private BufferedFileHandle getFileInfo(CachedPage cPage) throws HyracksDataException {
synchronized (fileInfoMap) {
BufferedFileHandle fInfo = fileInfoMap.get(BufferedFileHandle.getFileId(cPage.dpid));
if (fInfo == null) {
throw new HyracksDataException("No such file mapped");
}
return fInfo;
}
}
private void write(CachedPage cPage) throws HyracksDataException {
BufferedFileHandle fInfo = getFileInfo(cPage);
cPage.buffer.position(0);
cPage.buffer.limit(pageSize);
ioManager.syncWrite(fInfo.getFileHandle(), (long) BufferedFileHandle.getPageId(cPage.dpid) * pageSize,
cPage.buffer);
}
@Override
public void unpin(ICachedPage page) throws HyracksDataException {
if (closed) {
throw new HyracksDataException("unpin called on a closed cache");
}
((CachedPage) page).pinCount.decrementAndGet();
}
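    /*
     * Maps a dpid to its bucket index by simple modulo hashing. Mapped dpids are non-negative (fileId
     * and pageId occupy the upper and lower halves of the long), so the result is a valid index.
     */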
private int hash(long dpid) {
return (int) (dpid % pageMap.length);
}
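    /*
     * A bucket of the page map: a lock protecting a singly-linked chain of CachedPages (linked via
     * CachedPage.next) whose dpids hash to this bucket.
     */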
private static class CacheBucket {
private final Lock bucketLock;
private CachedPage cachedPage;
public CacheBucket() {
bucketLock = new ReentrantLock();
}
}
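    /*
     * A page frame in the cache. Invariants: a frame can only be victimized through a 0 -> 1 pinCount
     * transition in pinIfGoodVictim, so any pinned page is safe from replacement; a page that turns
     * dirty takes one extra pin, released only after a successful write-back by the cleaner or by
     * sweepAndFlush, so dirty pages are never victimized; valid says whether the buffer contents are
     * usable (read from disk, or a freshly allocated new page).
     */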
private class CachedPage implements ICachedPageInternal {
private final int cpid;
private final ByteBuffer buffer;
private final AtomicInteger pinCount;
private final AtomicBoolean dirty;
private final ReadWriteLock latch;
private final Object replacementStrategyObject;
volatile long dpid;
CachedPage next;
volatile boolean valid;
public CachedPage(int cpid, ByteBuffer buffer, IPageReplacementStrategy pageReplacementStrategy) {
this.cpid = cpid;
this.buffer = buffer;
pinCount = new AtomicInteger();
dirty = new AtomicBoolean();
latch = new ReentrantReadWriteLock(true);
replacementStrategyObject = pageReplacementStrategy.createPerPageStrategyObject(cpid);
dpid = -1;
valid = false;
}
public void reset(long dpid) {
this.dpid = dpid;
dirty.set(false);
valid = false;
pageReplacementStrategy.notifyCachePageReset(this);
}
public void invalidate() {
reset(-1);
}
@Override
public ByteBuffer getBuffer() {
return buffer;
}
@Override
public Object getReplacementStrategyObject() {
return replacementStrategyObject;
}
@Override
public boolean pinIfGoodVictim() {
return pinCount.compareAndSet(0, 1);
}
@Override
public int getCachedPageId() {
return cpid;
}
@Override
public void acquireReadLatch() {
latch.readLock().lock();
}
private void acquireWriteLatch(boolean markDirty) {
latch.writeLock().lock();
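            // A clean page transitioning to dirty takes an extra pin so that it cannot be victimized
            // before the cleaner (or sweepAndFlush) writes it back.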
if (markDirty) {
if (dirty.compareAndSet(false, true)) {
pinCount.incrementAndGet();
}
}
}
@Override
public void acquireWriteLatch() {
acquireWriteLatch(true);
}
@Override
public void releaseReadLatch() {
latch.readLock().unlock();
}
@Override
public void releaseWriteLatch() {
latch.writeLock().unlock();
}
}
@Override
public ICachedPageInternal getPage(int cpid) {
return cachedPages[cpid];
}
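    /*
     * Background thread that repeatedly sweeps all page frames and writes dirty pages back to disk.
     * Each page is flushed under a read latch acquired with tryLock, so the cleaner never blocks on a
     * page held by a writer. A successful flush releases the extra "dirty" pin, bumps cleanCount, and
     * notifies waiters in findPage that victimization may now succeed. The loop exits once close()
     * sets shutdownStart.
     */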
private class CleanerThread extends Thread {
private boolean shutdownStart = false;
private boolean shutdownComplete = false;
private final Object cleanNotification = new Object();
private int cleanCount = 0;
public CleanerThread() {
setPriority(MAX_PRIORITY);
}
@Override
public synchronized void run() {
try {
while (true) {
for (int i = 0; i < numPages; ++i) {
CachedPage cPage = cachedPages[i];
if (cPage.dirty.get()) {
if (cPage.latch.readLock().tryLock()) {
try {
boolean cleaned = true;
try {
write(cPage);
} catch (HyracksDataException e) {
cleaned = false;
}
if (cleaned) {
cPage.dirty.set(false);
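                                        // Release the extra pin taken when the page was marked dirty.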
cPage.pinCount.decrementAndGet();
synchronized (cleanNotification) {
++cleanCount;
cleanNotification.notifyAll();
}
}
} finally {
cPage.latch.readLock().unlock();
}
} else if (shutdownStart) {
throw new IllegalStateException(
"Cache closed, but unable to acquire read lock on dirty page: " + cPage.dpid);
}
}
}
if (shutdownStart) {
break;
}
try {
wait(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} finally {
shutdownComplete = true;
notifyAll();
}
}
}
@Override
public void close() {
closed = true;
synchronized (cleanerThread) {
cleanerThread.shutdownStart = true;
cleanerThread.notifyAll();
while (!cleanerThread.shutdownComplete) {
try {
cleanerThread.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
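    /*
     * File lifecycle: createFile registers a FileReference with the file map manager; openFile opens
     * (or reuses) the underlying handle for a mapped fileId and increments its reference count;
     * closeFile decrements the count and, on the last close, flushes and invalidates the file's cached
     * pages before closing the handle; deleteFile unregisters a file that is no longer open.
     */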
@Override
public void createFile(FileReference fileRef) throws HyracksDataException {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Creating file: " + fileRef + " in cache: " + this);
}
synchronized (fileInfoMap) {
fileMapManager.registerFile(fileRef);
}
}
@Override
public void openFile(int fileId) throws HyracksDataException {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Opening file: " + fileId + " in cache: " + this);
}
synchronized (fileInfoMap) {
BufferedFileHandle fInfo;
fInfo = fileInfoMap.get(fileId);
if (fInfo == null) {
FileReference fileRef = fileMapManager.lookupFileName(fileId);
FileHandle fh = ioManager.open(fileRef, IIOManager.FileReadWriteMode.READ_WRITE,
IIOManager.FileSyncMode.METADATA_ASYNC_DATA_ASYNC);
fInfo = new BufferedFileHandle(fileId, fh);
fileInfoMap.put(fileId, fInfo);
}
fInfo.incReferenceCount();
}
}
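    /*
     * Walks every bucket and unlinks all pages belonging to fileId from the page map, writing dirty
     * pages back first. Called on the last close of a file, while holding the fileInfoMap lock.
     */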
private void sweepAndFlush(int fileId) throws HyracksDataException {
for (int i = 0; i < pageMap.length; ++i) {
CacheBucket bucket = pageMap[i];
bucket.bucketLock.lock();
try {
CachedPage prev = bucket.cachedPage;
while (prev != null) {
CachedPage cPage = prev.next;
if (cPage == null) {
break;
}
if (invalidateIfFileIdMatch(fileId, cPage)) {
prev.next = cPage.next;
cPage.next = null;
} else {
prev = cPage;
}
}
// Take care of the head of the chain.
if (bucket.cachedPage != null) {
if (invalidateIfFileIdMatch(fileId, bucket.cachedPage)) {
CachedPage cPage = bucket.cachedPage;
bucket.cachedPage = bucket.cachedPage.next;
cPage.next = null;
}
}
} finally {
bucket.bucketLock.unlock();
}
}
}
private boolean invalidateIfFileIdMatch(int fileId, CachedPage cPage) throws HyracksDataException {
if (BufferedFileHandle.getFileId(cPage.dpid) == fileId) {
if (cPage.dirty.get()) {
write(cPage);
cPage.dirty.set(false);
cPage.pinCount.decrementAndGet();
}
if (cPage.pinCount.get() != 0) {
throw new IllegalStateException("Page is pinned and file is being closed");
}
cPage.invalidate();
return true;
}
return false;
}
@Override
public void closeFile(int fileId) throws HyracksDataException {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Closing file: " + fileId + " in cache: " + this);
}
synchronized (fileInfoMap) {
BufferedFileHandle fInfo = fileInfoMap.get(fileId);
if (fInfo == null) {
throw new HyracksDataException("Closing unopened file");
}
if (fInfo.decReferenceCount() <= 0) {
sweepAndFlush(fileId);
fileInfoMap.remove(fileId);
ioManager.close(fInfo.getFileHandle());
}
}
}
@Override
public synchronized void deleteFile(int fileId) throws HyracksDataException {
if (LOGGER.isLoggable(Level.INFO)) {
LOGGER.info("Deleting file: " + fileId + " in cache: " + this);
}
synchronized (fileInfoMap) {
BufferedFileHandle fInfo = fileInfoMap.get(fileId);
if (fInfo != null) {
throw new HyracksDataException("Deleting open file");
}
fileMapManager.unregisterFile(fileId);
}
}
}