blob: 86d9dc2ab8571f8cd166390ada2cb0da9b54b91a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jena.dboe.transaction.txn;
import static org.apache.jena.dboe.transaction.txn.TxnState.*;
import java.nio.ByteBuffer;
import java.util.Objects;
import org.apache.jena.query.ReadWrite;
import org.apache.jena.atlas.lib.InternalErrorException;
/**
* Base implementation of the component interface for {@link TransactionalComponent}.
*/
public abstract class TransactionalComponentLifecycle<X> implements TransactionalComponent {
// Pass down recovery operations.
// This class has no transaction-recorded state.
@Override public abstract void startRecovery();
@Override public abstract void recover(ByteBuffer ref);
@Override public abstract void finishRecovery();
// ---- Normal operation
private static final boolean CHECKING = false;
private ThreadLocal<TxnState> trackTxn = CHECKING ? ThreadLocal.withInitial(() -> INACTIVE) : null;
// Access to these two must be via the getter/setters below only.
// Allows stuff for thread switching.
private ThreadLocal<Transaction> threadTxn = new ThreadLocal<>();
private ThreadLocal<X> componentState = new ThreadLocal<>();
private final ComponentId componentId;
protected TransactionalComponentLifecycle(ComponentId componentId) {
this.componentId = componentId;
}
@Override
public ComponentId getComponentId() {
return componentId;
}
// // Very dangerous!
// protected void setForThread(Transaction txn, X state) {
// threadTxn.set(txn);
// componentState.set(state);
// }
/** Start a transaction */
@Override
final
public void begin(Transaction transaction) {
Objects.requireNonNull(transaction);
setTransaction(transaction);
checkState(INACTIVE, COMMITTED, ABORTED);
setTrackTxn(ACTIVE);
X x = _begin(transaction.getMode(), transaction.getTxnId());
setDataState(x);
}
/** Promote a component in a transaction */
@Override
final
public boolean promote(Transaction transaction) {
Objects.requireNonNull(transaction);
checkState(ACTIVE);
X newState = _promote(transaction.getTxnId(), getDataState());
if ( newState == null )
return false;
setDataState(newState);
return true;
}
/** Commit a transaction (make durable): prepare - commit - cleanup */
@Override
final
public ByteBuffer commitPrepare(Transaction transaction) {
checkAligned(transaction);
checkState(ACTIVE);
try { return _commitPrepare(transaction.getTxnId(), getDataState()); }
finally { setTrackTxn(PREPARE); }
}
@Override
final
public void commit(Transaction transaction) {
checkAligned(transaction);
checkState(PREPARE);
_commit(transaction.getTxnId(), getDataState());
setTrackTxn(COMMIT);
}
@Override
final
public void commitEnd(Transaction transaction) {
checkAligned(transaction);
checkState(COMMIT);
_commitEnd(transaction.getTxnId(), getDataState());
setTrackTxn(COMMITTED);
internalComplete(transaction);
}
@Override
final
public void abort(Transaction transaction) {
checkAligned(transaction);
checkState(ACTIVE, PREPARE, COMMIT);
_abort(transaction.getTxnId(), getDataState());
setTrackTxn(ABORTED);
internalComplete(transaction);
}
private void internalComplete(Transaction transaction) {
_complete(transaction.getTxnId(), getDataState());
setTrackTxn(INACTIVE);
releaseThreadState();
}
@Override
final
public void complete(Transaction transaction) {
if ( transaction.hasFinished() )
return;
checkAligned(transaction);
ReadWrite m = getReadWriteMode();
switch(m) {
case READ:
checkState(ACTIVE, COMMITTED, ABORTED);
break;
case WRITE:
// If bad, force abort?
checkState(COMMITTED, ABORTED);
break;
}
_complete(transaction.getTxnId(), getDataState());
switch(m) {
case READ:
internalComplete(transaction);
break;
case WRITE:
// complete happened in the commit or abort.
break;
}
}
@Override
final
public void shutdown() {
_shutdown();
clearInternal();
}
@Override
final public SysTransState detach() {
TxnState txnState = getTxnState();
if ( txnState == null )
return null;
checkState(ACTIVE);
setTrackTxn(DETACHED);
SysTransState transState = new SysTransState(this, getTransaction(), getDataState());
//****** Thread locals
releaseThreadState();
return transState;
}
@Override
public void attach(SysTransState state) {
@SuppressWarnings("unchecked")
X x = (X)state.getState();
// // reset to not thread not in
// if ( CHECKING )
// trackTxn : ThreadLocal<TxnState>
//
//
setTransaction(state.getTransaction());
setDataState(x);
setTrackTxn(ACTIVE);
}
// -- Access object members.
public static class ComponentState<X> {
final TxnState state;
final Transaction txn;
final X componentState;
ComponentState(TxnState state, Transaction txn, X componentState) {
super();
this.state = state;
this.txn = txn;
this.componentState = componentState;
}
}
public ComponentState<X> getComponentState() {
return new ComponentState<>(getTrackTxn(), getTransaction(), getDataState());
}
public void setComponentState(ComponentState<X> state) {
setTrackTxn(state.state);
setTransaction(state.txn);
setDataState(state.componentState);
}
protected void releaseThreadState() {
// Remove thread locals
if ( trackTxn != null )
trackTxn.remove();
componentState.remove();
// java version "1.8.0_31"
// Java(TM) SE Runtime Environment (build 1.8.0_31-b13)
// Java HotSpot(TM) 64-Bit Server VM (build 25.31-b07, mixed mode)
//
// openjdk version "1.8.0_40-internal"
// OpenJDK Runtime Environment (build 1.8.0_40-internal-b09)
// OpenJDK 64-Bit Server VM (build 25.40-b13, mixed mode)
// This one is very important else the memory usage grows. Not clear why.
// A Transaction has an internal AtomicReference. Replacing the AtomicReference
// with a plain member variable slows the growth down greatly.
threadTxn.remove();
}
protected void clearInternal() {
trackTxn = null;
threadTxn = null;
componentState = null;
}
//protected ComponentState<X> getState()
protected X getDataState() { return componentState.get(); }
protected void setDataState(X data) { componentState.set(data); }
protected Transaction getTransaction() { return threadTxn.get(); }
protected void setTransaction(Transaction txn) { threadTxn.set(txn); }
private void setTrackTxn(TxnState newState) {
if ( ! CHECKING ) return;
trackTxn.set(newState);
}
// This is our record of the state - it is not necessarily the transactions
// view during changes. This class tracks the expected incomign state and
// the transaction
private TxnState getTrackTxn() {
if ( ! CHECKING ) return null;
return trackTxn.get();
}
// -- Access object members.
// XXX Align to javadoc in TransactionalComponent.
/* There are two lifecycles, one for write transaction, one
* for read transactions. This affects how transaction end so
* when/if promoted read->write transactions happen, a promoted
* transaction will follow the write lifecycle.
*
* In both lifecyles, the implementer can assume that calls
* happen at the right points and called only as needed. Framework
* takes care of checking.
*
* Read lifecycle:
* A read transaction be be just begin(READ)-end() but may also
* have commit or abort before end. The _commitRead and _abortRead
* calls note if an explicit commit or abort occurs but may not be
* called. _endRead is always called exactly once.
*
* _commitRead
* _abortRead
* _endRead
* _complete
*
* Write lifecycle:
* A write transaction must have a commit() or abort() before end().
* The fraemwork will check this.
*
* If the transaction commits:
* _commitPrepareWrite
* _commitWrite -- The transaction is
* _commitEndWrite
*
* If the transaction aborts:
* _abortWrite
*
* After any lifecycle, a final call of
* _complete()
*
* indicates ths transaction has fully finished.
*
* Typically, an implementation does not need to take action in every call.
*/
protected abstract X _begin(ReadWrite readWrite, TxnId txnId);
protected abstract X _promote(TxnId txnId, X oldState);
protected abstract ByteBuffer _commitPrepare(TxnId txnId, X state);
protected abstract void _commit(TxnId txnId, X state);
protected abstract void _commitEnd(TxnId txnId, X state);
protected abstract void _abort(TxnId txnId, X state);
protected abstract void _complete(TxnId txnId, X state);
protected abstract void _shutdown();
protected ReadWrite getReadWriteMode() {
Transaction txn = getTransaction();
return txn.getMode();
}
protected boolean isActiveTxn() {
TxnState txnState = getTxnState();
if ( txnState == null )
return false;
switch(getTxnState()) {
case INACTIVE: case END_ABORTED: case END_COMMITTED:
return false;
case ACTIVE: case DETACHED: case PREPARE: case ABORTED: case COMMIT: case COMMITTED:
return true;
//null: default: return false;
// Get the compiler to check all states covered.
}
// Should not happen.
throw new InternalErrorException("Unclear transaction state");
}
protected boolean isReadTxn() { return ! isWriteTxn(); }
protected boolean isWriteTxn() {
Transaction txn = getTransaction();
return txn.isWriteTxn();
}
protected void checkTxn() {
if ( ! isActiveTxn() )
throw new TransactionException("Not in a transaction");
}
protected void requireTxn() {
Transaction txn = getTransaction();
if ( txn == null )
throw new TransactionException("Not in a transaction");
}
protected void requireWriteTxn() {
Transaction txn = getTransaction();
if ( txn == null )
throw new TransactionException("Not a transaction");
txn.requireWriteTxn();
}
// Better done
// protected void ensureWriteTxn() {
// Transaction txn = getTransaction();
// if ( txn == null )
// throw new TransactionException("Not a transaction");
// txn.ensureWriteTxn();
// }
// -- Access object members.
private TxnState getTxnState() {
Transaction txn = getTransaction();
if ( txn == null )
return null;
return txn.getState();
}
private void checkAligned(Transaction transaction) {
if ( ! CHECKING ) return;
Transaction txn = getTransaction();
if ( txn != transaction )
throw new TransactionException("Transaction is not the transaction of the thread");
}
private void checkState(TxnState expected) {
if ( ! CHECKING ) return;
TxnState s = getTrackTxn();
if ( s != expected )
throw new TransactionException("Transaction is in state "+s+": expected state "+expected);
}
private void checkState(TxnState expected1, TxnState expected2) {
if ( ! CHECKING ) return;
TxnState s = getTrackTxn();
if ( s != expected1 && s != expected2 )
throw new TransactionException("Transaction is in state "+s+": expected state "+expected1+" or "+expected2);
}
// Avoid varargs ... undue worry?
private void checkState(TxnState expected1, TxnState expected2, TxnState expected3) {
if ( ! CHECKING ) return;
TxnState s = getTrackTxn();
if ( s != expected1 && s != expected2 && s != expected3 )
throw new TransactionException("Transaction is in state "+s+": expected state "+expected1+", "+expected2+" or "+expected3);
}
// private void checkStateNot(State unexpected) {
// State s = state.get();
// if ( s == unexpected )
// throw new TransactionException("Transaction in unexpected state "+s);
// }
}