blob: e2b6509d41c796d2860ca62d04a6e20c313ffb27 [file] [log] [blame]
/*
* Copyright 2009-2011 by The Regents of the University of California
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* you may obtain a copy of the License from
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package edu.uci.ics.asterix.runtime.transaction;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import edu.uci.ics.asterix.transaction.management.exception.ACIDException;
import edu.uci.ics.asterix.transaction.management.resource.ICloseable;
import edu.uci.ics.asterix.transaction.management.resource.TransactionalResourceRepository;
import edu.uci.ics.asterix.transaction.management.service.logging.DataUtil;
import edu.uci.ics.asterix.transaction.management.service.logging.ILogger;
import edu.uci.ics.asterix.transaction.management.service.logging.LogActionType;
import edu.uci.ics.asterix.transaction.management.service.logging.LogType;
import edu.uci.ics.asterix.transaction.management.service.logging.LogUtil;
import edu.uci.ics.asterix.transaction.management.service.logging.LogicalLogLocator;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionContext;
import edu.uci.ics.asterix.transaction.management.service.transaction.TransactionProvider;
import edu.uci.ics.hyracks.dataflow.common.data.accessors.ITupleReference;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndex;
import edu.uci.ics.hyracks.storage.am.common.api.ITreeIndexTupleWriter;
import edu.uci.ics.hyracks.storage.am.common.ophelpers.IndexOp;
/**
* Represents a utility class for generating log records corresponding to
* operations on a ITreeIndex implementation. A TreeLogger instance is thread
* safe and can be shared across multiple threads that may belong to same or
* different transactions.
*/
class TransactionState {
private final Map<Long, TxnThreadState> transactionThreads = new HashMap<Long, TxnThreadState>();
public synchronized TxnThreadState getTransactionThreadState(long threadId) {
return transactionThreads.get(threadId);
}
public synchronized void putTransactionThreadState(long threadId, TxnThreadState txnThreadState) {
this.transactionThreads.put(threadId, txnThreadState);
}
public synchronized void remove(long threadId) {
transactionThreads.remove(threadId);
}
}
/**
* Represents the state of a transaction thread. The state contains information
* that includes the tuple being operated, the operation and the location of the
* log record corresponding to the operation.
*/
class TxnThreadState {
private ITupleReference tuple;
private IndexOp indexOperation;
private LogicalLogLocator logicalLogLocator;
public TxnThreadState(LogicalLogLocator logicalLogLocator, IndexOp indexOperation, ITupleReference tupleReference) {
this.tuple = tupleReference;
this.indexOperation = indexOperation;
this.logicalLogLocator = logicalLogLocator;
}
public synchronized ITupleReference getTuple() {
return tuple;
}
public synchronized void setTuple(ITupleReference tuple) {
this.tuple = tuple;
}
public synchronized IndexOp getIndexOperation() {
return indexOperation;
}
public synchronized void setIndexOperation(IndexOp indexOperation) {
this.indexOperation = indexOperation;
}
public synchronized LogicalLogLocator getLogicalLogLocator() {
return logicalLogLocator;
}
public synchronized void setLogicalLogLocator(LogicalLogLocator logicalLogLocator) {
this.logicalLogLocator = logicalLogLocator;
}
}
public class TreeLogger implements ILogger, ICloseable {
private static final byte resourceMgrId = TreeResourceManager.ID;
private final Map<Object, Object> arguments = new ConcurrentHashMap<Object, Object>();
public static final String TREE_INDEX = "TREE_INDEX";
public static final String TUPLE_REFERENCE = "TUPLE_REFERENCE";
public static final String TUPLE_WRITER = "TUPLE_WRITER";
public static final String INDEX_OPERATION = "INDEX_OPERATION";
public static final String RESOURCE_ID = "RESOURCE_ID";
private final ITreeIndex treeIndex;
private final ITreeIndexTupleWriter treeIndexTupleWriter;
private final byte[] resourceIdBytes;
private final byte[] resourceIdLengthBytes;
public class BTreeOperationCodes {
public static final byte INSERT = 0;
public static final byte DELETE = 1;
}
public TreeLogger(byte[] resourceIdBytes) {
this.resourceIdBytes = resourceIdBytes;
treeIndex = (ITreeIndex) TransactionalResourceRepository.getTransactionalResource(resourceIdBytes);
treeIndexTupleWriter = treeIndex.getLeafFrameFactory().getTupleWriterFactory().createTupleWriter();
this.resourceIdLengthBytes = DataUtil.intToByteArray(resourceIdBytes.length);
}
public synchronized void close(TransactionContext context) {
TransactionState txnState = (TransactionState) arguments.get(context.getTransactionID());
txnState.remove(Thread.currentThread().getId());
arguments.remove(context.getTransactionID());
}
public void generateLogRecord(TransactionProvider provider, TransactionContext context, IndexOp operation,
ITupleReference tuple) throws ACIDException {
context.addCloseableResource(this); // the close method would be called
// on this TreeLogger instance at
// the time of transaction
// commit/abort.
if (operation != IndexOp.INSERT && operation != IndexOp.DELETE) {
throw new ACIDException("Loging for Operation " + operation + " not supported");
}
TxnThreadState txnThreadState = null;
TransactionState txnState;
txnState = (TransactionState) arguments.get(context.getTransactionID());
if (txnState == null) {
synchronized (context) { // threads belonging to different
// transaction do not need to
// synchronize amongst them.
if (txnState == null) {
txnState = new TransactionState();
arguments.put(context.getTransactionID(), txnState);
}
}
}
txnThreadState = txnState.getTransactionThreadState(Thread.currentThread().getId());
if (txnThreadState == null) {
LogicalLogLocator logicalLogLocator = LogUtil.getDummyLogicalLogLocator(provider.getLogManager());
txnThreadState = new TxnThreadState(logicalLogLocator, operation, tuple);
txnState.putTransactionThreadState(Thread.currentThread().getId(), txnThreadState);
}
txnThreadState.setIndexOperation(operation);
txnThreadState.setTuple(tuple);
int tupleSize = treeIndexTupleWriter.bytesRequired(tuple);
// Below 4 is for the int representing the length of resource id and 1
// is for
// the byte representing the operation
int logContentLength = 4 + resourceIdBytes.length + 1 + tupleSize;
provider.getLogManager().log(txnThreadState.getLogicalLogLocator(), context, resourceMgrId, 0L, LogType.UPDATE,
LogActionType.REDO_UNDO, logContentLength, (ILogger) this, arguments);
}
@Override
public void log(TransactionContext context, LogicalLogLocator logicalLogLocator, int logRecordSize,
Map<Object, Object> loggerArguments) throws ACIDException {
TransactionState txnState = (TransactionState) loggerArguments.get(context.getTransactionID());
TxnThreadState state = (TxnThreadState) txnState.getTransactionThreadState(Thread.currentThread().getId());
int count = 0;
byte[] logBuffer = logicalLogLocator.getBuffer().getArray();
System.arraycopy(resourceIdLengthBytes, 0, logBuffer, logicalLogLocator.getMemoryOffset(), 4);
count += 4; // count is incremented by 4 because we wrote the length
// that is an int and hence 4 bytes
System.arraycopy(resourceIdBytes, 0, logBuffer, logicalLogLocator.getMemoryOffset() + count,
resourceIdBytes.length);
count += resourceIdBytes.length;
logBuffer[logicalLogLocator.getMemoryOffset() + count] = (byte) state.getIndexOperation().ordinal();
count += 1; // count is incremented by 1 to account for the byte
// written.
treeIndexTupleWriter.writeTuple(state.getTuple(), logicalLogLocator.getBuffer().getArray(),
logicalLogLocator.getMemoryOffset() + count);
}
@Override
public void postLog(TransactionContext context, Map<Object, Object> loggerArguments) throws ACIDException {
}
@Override
public void preLog(TransactionContext context, Map<Object, Object> loggerArguments) throws ACIDException {
}
}