blob: 7efe7f1a5d61f5ec15a31f711efcab7048cd552d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kahadb.page;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.apache.kahadb.page.PageFile.PageWrite;
import org.apache.kahadb.util.ByteSequence;
import org.apache.kahadb.util.DataByteArrayInputStream;
import org.apache.kahadb.util.DataByteArrayOutputStream;
import org.apache.kahadb.util.Marshaller;
import org.apache.kahadb.util.Sequence;
import org.apache.kahadb.util.SequenceSet;
/**
* The class used to read/update a PageFile object. Using a transaction allows you to
* do multiple update operations in a single unit of work.
*/
public class Transaction implements Iterable<Page> {
/**
* The PageOverflowIOException occurs when a page write is requested
* and it's data is larger than what would fit into a single page.
*/
public class PageOverflowIOException extends IOException {
public PageOverflowIOException(String message) {
super(message);
}
}
/**
* The InvalidPageIOException is thrown if try to load/store a a page
* with an invalid page id.
*/
public class InvalidPageIOException extends IOException {
private final long page;
public InvalidPageIOException(String message, long page) {
super(message);
this.page = page;
}
public long getPage() {
return page;
}
}
/**
* This closure interface is intended for the end user implement callbacks for the Transaction.exectue() method.
*
* @param <T> The type of exceptions that operation will throw.
*/
public interface Closure <T extends Throwable> {
public void execute(Transaction tx) throws T;
}
/**
* This closure interface is intended for the end user implement callbacks for the Transaction.exectue() method.
*
* @param <R> The type of result that the closure produces.
* @param <T> The type of exceptions that operation will throw.
*/
public interface CallableClosure<R, T extends Throwable> {
public R execute(Transaction tx) throws T;
}
// The page file that this Transaction operates against.
private final PageFile pageFile;
// If this transaction is updating stuff.. this is the tx of
private long writeTransactionId=-1;
// List of pages that this transaction has modified.
private HashMap<Long, PageWrite> writes=new HashMap<Long, PageWrite>();
// List of pages allocated in this transaction
private final SequenceSet allocateList = new SequenceSet();
// List of pages freed in this transaction
private final SequenceSet freeList = new SequenceSet();
Transaction(PageFile pageFile) {
this.pageFile = pageFile;
}
/**
* @return the page file that created this Transaction
*/
public PageFile getPageFile() {
return this.pageFile;
}
/**
* Allocates a free page that you can write data to.
*
* @return a newly allocated page.
* @throws IOException
* If an disk error occurred.
* @throws IllegalStateException
* if the PageFile is not loaded
*/
public <T> Page<T> allocate() throws IOException {
return allocate(1);
}
/**
* Allocates a block of free pages that you can write data to.
*
* @param count the number of sequential pages to allocate
* @return the first page of the sequential set.
* @throws IOException
* If an disk error occurred.
* @throws IllegalStateException
* if the PageFile is not loaded
*/
public <T> Page<T> allocate(int count) throws IOException {
// TODO: we need to track allocated pages so that they can be returned if the
// transaction gets rolled back.
Page<T> rc = pageFile.allocate(count);
allocateList.add(new Sequence(rc.getPageId(), rc.getPageId()+count-1));
return rc;
}
/**
* Frees up a previously allocated page so that it can be re-allocated again.
*
* @param page the page to free up
* @throws IOException
* If an disk error occurred.
* @throws IllegalStateException
* if the PageFile is not loaded
*/
public void free(long pageId) throws IOException {
free(load(pageId, null));
}
/**
* Frees up a previously allocated sequence of pages so that it can be re-allocated again.
*
* @param page the initial page of the sequence that will be getting freed
* @param count the number of pages in the sequence
*
* @throws IOException
* If an disk error occurred.
* @throws IllegalStateException
* if the PageFile is not loaded
*/
public void free(long pageId, int count) throws IOException {
free(load(pageId, null), count);
}
/**
* Frees up a previously allocated sequence of pages so that it can be re-allocated again.
*
* @param page the initial page of the sequence that will be getting freed
* @param count the number of pages in the sequence
*
* @throws IOException
* If an disk error occurred.
* @throws IllegalStateException
* if the PageFile is not loaded
*/
public <T> void free(Page<T> page, int count) throws IOException {
pageFile.assertLoaded();
long offsetPage = page.getPageId();
for (int i = 0; i < count; i++) {
if (page == null) {
page = load(offsetPage + i, null);
}
free(page);
page = null;
}
}
/**
* Frees up a previously allocated page so that it can be re-allocated again.
*
* @param page the page to free up
* @throws IOException
* If an disk error occurred.
* @throws IllegalStateException
* if the PageFile is not loaded
*/
public <T> void free(Page<T> page) throws IOException {
pageFile.assertLoaded();
// We may need loop to free up a page chain.
while (page != null) {
// Is it already free??
if (page.getType() == Page.PAGE_FREE_TYPE) {
return;
}
Page<T> next = null;
if (page.getType() == Page.PAGE_PART_TYPE) {
next = load(page.getNext(), null);
}
page.makeFree(getWriteTransactionId());
DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageFile.getPageSize());
page.write(out);
write(page, out.getData());
freeList.add(page.getPageId());
page = next;
}
}
/**
*
* @param page
* the page to write. The Page object must be fully populated with a valid pageId, type, and data.
* @param marshaller
* the marshaler to use to load the data portion of the Page, may be null if you do not wish to write the data.
* @param overflow
* If true, then if the page data marshalls to a bigger size than can fit in one page, then additional
* overflow pages are automatically allocated and chained to this page to store all the data. If false,
* and the overflow condition would occur, then the PageOverflowIOException is thrown.
* @throws IOException
* If an disk error occurred.
* @throws PageOverflowIOException
* If the page data marshalls to size larger than maximum page size and overflow was false.
* @throws IllegalStateException
* if the PageFile is not loaded
*/
public <T> void store(Page<T> page, Marshaller<T> marshaller, final boolean overflow) throws IOException {
DataByteArrayOutputStream out = (DataByteArrayOutputStream)openOutputStream(page, overflow);
if (marshaller != null) {
marshaller.writePayload(page.get(), out);
}
out.close();
}
/**
* @throws IOException
*/
public OutputStream openOutputStream(Page page, final boolean overflow) throws IOException {
pageFile.assertLoaded();
// Copy to protect against the end user changing
// the page instance while we are doing a write.
final Page copy = page.copy();
pageFile.addToCache(copy);
//
// To support writing VERY large data, we override the output stream so
// that we
// we do the page writes incrementally while the data is being
// marshalled.
DataByteArrayOutputStream out = new DataByteArrayOutputStream(pageFile.getPageSize() * 2) {
Page current = copy;
@SuppressWarnings("unchecked")
@Override
protected void onWrite() throws IOException {
// Are we at an overflow condition?
final int pageSize = pageFile.getPageSize();
if (pos >= pageSize) {
// If overflow is allowed
if (overflow) {
Page next;
if (current.getType() == Page.PAGE_PART_TYPE) {
next = load(current.getNext(), null);
} else {
next = allocate();
}
next.txId = current.txId;
// Write the page header
int oldPos = pos;
pos = 0;
current.makePagePart(next.getPageId(), getWriteTransactionId());
current.write(this);
// Do the page write..
byte[] data = new byte[pageSize];
System.arraycopy(buf, 0, data, 0, pageSize);
Transaction.this.write(current, data);
// Reset for the next page chunk
pos = 0;
// The page header marshalled after the data is written.
skip(Page.PAGE_HEADER_SIZE);
// Move the overflow data after the header.
System.arraycopy(buf, pageSize, buf, pos, oldPos - pageSize);
pos += oldPos - pageSize;
current = next;
} else {
throw new PageOverflowIOException("Page overflow.");
}
}
}
@SuppressWarnings("unchecked")
@Override
public void close() throws IOException {
super.close();
// We need to free up the rest of the page chain..
if (current.getType() == Page.PAGE_PART_TYPE) {
free(current.getNext());
}
current.makePageEnd(pos, getWriteTransactionId());
// Write the header..
pos = 0;
current.write(this);
Transaction.this.write(current, buf);
}
};
// The page header marshaled after the data is written.
out.skip(Page.PAGE_HEADER_SIZE);
return out;
}
/**
* Loads a page from disk.
*
* @param pageId
* the id of the page to load
* @param marshaller
* the marshaler to use to load the data portion of the Page, may be null if you do not wish to load the data.
* @return The page with the given id
* @throws IOException
* If an disk error occurred.
* @throws IllegalStateException
* if the PageFile is not loaded
*/
public <T> Page<T> load(long pageId, Marshaller<T> marshaller) throws IOException {
pageFile.assertLoaded();
Page<T> page = new Page<T>(pageId);
load(page, marshaller);
return page;
}
/**
* Loads a page from disk.
*
* @param page - The pageId field must be properly set
* @param marshaller
* the marshaler to use to load the data portion of the Page, may be null if you do not wish to load the data.
* @throws IOException
* If an disk error occurred.
* @throws InvalidPageIOException
* If the page is is not valid.
* @throws IllegalStateException
* if the PageFile is not loaded
*/
@SuppressWarnings("unchecked")
public <T> void load(Page<T> page, Marshaller<T> marshaller) throws IOException {
pageFile.assertLoaded();
// Can't load invalid offsets...
long pageId = page.getPageId();
if (pageId < 0) {
throw new InvalidPageIOException("Page id is not valid", pageId);
}
// It might be a page this transaction has modified...
PageWrite update = writes.get(pageId);
if (update != null) {
page.copy(update.getPage());
return;
}
// We may be able to get it from the cache...
Page<T> t = pageFile.getFromCache(pageId);
if (t != null) {
page.copy(t);
return;
}
if (marshaller != null) {
// Full page read..
InputStream is = openInputStream(page);
DataInputStream dataIn = new DataInputStream(is);
page.set(marshaller.readPayload(dataIn));
is.close();
} else {
// Page header read.
DataByteArrayInputStream in = new DataByteArrayInputStream(new byte[Page.PAGE_HEADER_SIZE]);
pageFile.readPage(pageId, in.getRawData());
page.read(in);
page.set(null);
}
// Cache it.
if (marshaller != null) {
pageFile.addToCache(page);
}
}
/**
* @see org.apache.kahadb.page.Transaction#load(org.apache.kahadb.page.Page,
* org.apache.kahadb.util.Marshaller)
*/
public InputStream openInputStream(final Page p) throws IOException {
return new InputStream() {
private ByteSequence chunk = new ByteSequence(new byte[pageFile.getPageSize()]);
private Page page = readPage(p);
private int pageCount = 1;
private Page markPage;
private ByteSequence markChunk;
private Page readPage(Page page) throws IOException {
// Read the page data
pageFile.readPage(page.getPageId(), chunk.getData());
chunk.setOffset(0);
chunk.setLength(pageFile.getPageSize());
DataByteArrayInputStream in = new DataByteArrayInputStream(chunk);
page.read(in);
chunk.setOffset(Page.PAGE_HEADER_SIZE);
if (page.getType() == Page.PAGE_END_TYPE) {
chunk.setLength((int)(page.getNext()));
}
if (page.getType() == Page.PAGE_FREE_TYPE) {
throw new EOFException("Chunk stream does not exist at page: " + page.getPageId());
}
return page;
}
public int read() throws IOException {
if (!atEOF()) {
return chunk.data[chunk.offset++] & 0xff;
} else {
return -1;
}
}
private boolean atEOF() throws IOException {
if (chunk.offset < chunk.length) {
return false;
}
if (page.getType() == Page.PAGE_END_TYPE) {
return true;
}
fill();
return chunk.offset >= chunk.length;
}
private void fill() throws IOException {
page = readPage(new Page(page.getNext()));
pageCount++;
}
public int read(byte[] b) throws IOException {
return read(b, 0, b.length);
}
public int read(byte b[], int off, int len) throws IOException {
if (!atEOF()) {
int rc = 0;
while (!atEOF() && rc < len) {
len = Math.min(len, chunk.length - chunk.offset);
if (len > 0) {
System.arraycopy(chunk.data, chunk.offset, b, off, len);
chunk.offset += len;
}
rc += len;
}
return rc;
} else {
return -1;
}
}
public long skip(long len) throws IOException {
if (atEOF()) {
int rc = 0;
while (!atEOF() && rc < len) {
len = Math.min(len, chunk.length - chunk.offset);
if (len > 0) {
chunk.offset += len;
}
rc += len;
}
return rc;
} else {
return -1;
}
}
public int available() {
return chunk.length - chunk.offset;
}
public boolean markSupported() {
return true;
}
public void mark(int markpos) {
markPage = page;
byte data[] = new byte[pageFile.getPageSize()];
System.arraycopy(chunk.getData(), 0, data, 0, pageFile.getPageSize());
markChunk = new ByteSequence(data, chunk.getOffset(), chunk.getLength());
}
public void reset() {
page = markPage;
chunk = markChunk;
}
};
}
/**
* Allows you to iterate through all active Pages in this object. Pages with type Page.FREE_TYPE are
* not included in this iteration.
*
* Pages removed with Iterator.remove() will not actually get removed until the transaction commits.
*
* @throws IllegalStateException
* if the PageFile is not loaded
*/
@SuppressWarnings("unchecked")
public Iterator<Page> iterator() {
return (Iterator<Page>)iterator(false);
}
/**
* Allows you to iterate through all active Pages in this object. You can optionally include free pages in the pages
* iterated.
*
* @param includeFreePages - if true, free pages are included in the iteration
* @param tx - if not null, then the remove() opeation on the Iterator will operate in scope of that transaction.
* @throws IllegalStateException
* if the PageFile is not loaded
*/
public Iterator<Page> iterator(final boolean includeFreePages) {
pageFile.assertLoaded();
return new Iterator<Page>() {
long nextId;
Page nextPage;
Page lastPage;
private void findNextPage() {
if (!pageFile.isLoaded()) {
throw new IllegalStateException("Cannot iterate the pages when the page file is not loaded");
}
if (nextPage != null) {
return;
}
try {
while (nextId < pageFile.getPageCount()) {
Page page = load(nextId, null);
if (includeFreePages || page.getType() != Page.PAGE_FREE_TYPE) {
nextPage = page;
return;
} else {
nextId++;
}
}
} catch (IOException e) {
}
}
public boolean hasNext() {
findNextPage();
return nextPage != null;
}
public Page next() {
findNextPage();
if (nextPage != null) {
lastPage = nextPage;
nextPage = null;
nextId++;
return lastPage;
} else {
throw new NoSuchElementException();
}
}
@SuppressWarnings("unchecked")
public void remove() {
if (lastPage == null) {
throw new IllegalStateException();
}
try {
free(lastPage);
lastPage = null;
} catch (IOException e) {
new RuntimeException(e);
}
}
};
}
///////////////////////////////////////////////////////////////////
// Commit / Rollback related methods..
///////////////////////////////////////////////////////////////////
/**
* Commits the transaction to the PageFile as a single 'Unit of Work'. Either all page updates associated
* with the transaction are written to disk or none will.
*/
public void commit() throws IOException {
if( writeTransactionId!=-1 ) {
// Actually do the page writes...
pageFile.write(writes.entrySet());
// Release the pages that were freed up in the transaction..
freePages(freeList);
freeList.clear();
allocateList.clear();
writes.clear();
writeTransactionId = -1;
}
}
/**
* Rolls back the transaction.
*/
public void rollback() throws IOException {
if( writeTransactionId!=-1 ) {
// Release the pages that were allocated in the transaction...
freePages(allocateList);
freeList.clear();
allocateList.clear();
writes.clear();
writeTransactionId = -1;
}
}
private long getWriteTransactionId() {
if( writeTransactionId==-1 ) {
writeTransactionId = pageFile.getNextWriteTransactionId();
}
return writeTransactionId;
}
/**
* Queues up a page write that should get done when commit() gets called.
*/
@SuppressWarnings("unchecked")
private void write(final Page page, byte[] data) throws IOException {
Long key = page.getPageId();
// TODO: if a large update transaction is in progress, we may want to move
// all the current updates to a temp file so that we don't keep using
// up memory.
writes.put(key, new PageWrite(page, data));
}
/**
* @param list
* @throws RuntimeException
*/
private void freePages(SequenceSet list) throws RuntimeException {
Sequence seq = list.getHead();
while( seq!=null ) {
seq.each(new Sequence.Closure<RuntimeException>(){
public void execute(long value) {
pageFile.freePage(value);
}
});
seq = seq.getNext();
}
}
/**
* @return true if there are no uncommitted page file updates associated with this transaction.
*/
public boolean isReadOnly() {
return writeTransactionId==-1;
}
///////////////////////////////////////////////////////////////////
// Transaction closure helpers...
///////////////////////////////////////////////////////////////////
/**
* Executes a closure and if it does not throw any exceptions, then it commits the transaction.
* If the closure throws an Exception, then the transaction is rolled back.
*
* @param <T>
* @param closure - the work to get exectued.
* @throws T if the closure throws it
* @throws IOException If the commit fails.
*/
public <T extends Throwable> void execute(Closure<T> closure) throws T, IOException {
boolean success = false;
try {
closure.execute(this);
success = true;
} finally {
if (success) {
commit();
} else {
rollback();
}
}
}
/**
* Executes a closure and if it does not throw any exceptions, then it commits the transaction.
* If the closure throws an Exception, then the transaction is rolled back.
*
* @param <T>
* @param closure - the work to get exectued.
* @throws T if the closure throws it
* @throws IOException If the commit fails.
*/
public <R, T extends Throwable> R execute(CallableClosure<R, T> closure) throws T, IOException {
boolean success = false;
try {
R rc = closure.execute(this);
success = true;
return rc;
} finally {
if (success) {
commit();
} else {
rollback();
}
}
}
}