blob: f5e4fe0e0e67483d7a975fc6806003357b00fed2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jena.dboe.transaction.txn;
import static org.apache.jena.dboe.transaction.txn.TxnState.ABORTED;
import static org.apache.jena.dboe.transaction.txn.TxnState.ACTIVE;
import static org.apache.jena.dboe.transaction.txn.TxnState.COMMIT;
import static org.apache.jena.dboe.transaction.txn.TxnState.COMMITTED;
import static org.apache.jena.dboe.transaction.txn.TxnState.DETACHED;
import static org.apache.jena.dboe.transaction.txn.TxnState.END_ABORTED;
import static org.apache.jena.dboe.transaction.txn.TxnState.END_COMMITTED;
import static org.apache.jena.dboe.transaction.txn.TxnState.INACTIVE;
import static org.apache.jena.dboe.transaction.txn.TxnState.PREPARE;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.jena.query.ReadWrite;
import org.apache.jena.query.TxnType;
/**
 * A transaction as the composition of actions on components.
 * Works in conjunction with the {@link TransactionCoordinator}
 * to provide the transaction lifecycle.
 * <p>
 * State transitions driven by this class:
 * <ul>
 * <li>construction: {@code INACTIVE}</li>
 * <li>{@link #begin()}: {@code INACTIVE -> ACTIVE}</li>
 * <li>{@link #prepare()}: {@code ACTIVE -> PREPARE}</li>
 * <li>{@link #commit()}: {@code PREPARE -> COMMIT -> COMMITTED -> END_COMMITTED}</li>
 * <li>{@link #abort()}: {@code ACTIVE|ABORTED -> ABORTED -> END_ABORTED}</li>
 * <li>{@code detach()}/{@code attach()}: {@code ACTIVE|PREPARE -> DETACHED -> ACTIVE}</li>
 * </ul>
 *
 * @see TransactionCoordinator
 * @see TransactionalComponent
 */
public final class Transaction implements TransactionInfo {
    // Using an AtomicReference<TxnState> requires that
    // TransactionalComponentLifecycle.internalComplete
    // frees the thread local for the threadTxn, otherwise memory
    // usage grows. If a plain member variable is used, slow growth
    // is still seen.
    // Nulling txnMgr (see end()) and clearing components stops that slow growth.

    /** The owning coordinator; set to {@code null} once {@link #end()} completes normally. */
    private TransactionCoordinator txnMgr;
    private final TxnId txnId;
    private final List<SysTrans> components;

    // Using an AtomicReference makes this observable from the outside.
    // It also allows for multithreaded transactions (later).
    private final AtomicReference<TxnState> state = new AtomicReference<>();

    private final long dataVersion;
    private final TxnType txnType;
    /** Current mode; starts as given to the constructor and becomes WRITE on a successful promotion. */
    private ReadWrite mode;

    /**
     * Create a transaction in state {@code INACTIVE}.
     *
     * @param txnMgr      the coordinator this transaction calls back into; must not be null
     * @param txnType     the transaction type (consulted when deciding how to promote)
     * @param readWrite   the initial mode; must not be null
     * @param txnId       the transaction identifier; must not be null
     * @param dataVersion the data version reported by {@link #getDataVersion()}
     * @param components  the per-component transaction handles; must not be null.
     *                    The list is held by reference, not copied.
     * @throws NullPointerException if a required argument is null
     */
    public Transaction(TransactionCoordinator txnMgr, TxnType txnType, ReadWrite readWrite, TxnId txnId, long dataVersion, List<SysTrans> components) {
        this.txnMgr = Objects.requireNonNull(txnMgr, "txnMgr");
        this.txnId = Objects.requireNonNull(txnId, "txnId");
        this.mode = Objects.requireNonNull(readWrite, "readWrite");
        this.components = Objects.requireNonNull(components, "components");
        this.txnType = txnType;
        this.dataVersion = dataVersion;
        setState(INACTIVE);
    }

    /** Unconditionally set the lifecycle state (no transition checking is done here). */
    /*package*/ void setState(TxnState newState) {
        state.set(newState);
    }

    /** Return the current lifecycle state. */
    @Override
    public TxnState getState() {
        return state.get();
    }

    /**
     * Each transaction is allocated a serialization point by the transaction
     * coordinator. Normally, this is related to this number and it increases
     * over time as the data changes. Two readers can have the same
     * serialization point - they are working with the same view of the data.
     */
    @Override
    public long getDataVersion() {
        return dataVersion;
    }

    /**
     * Start the transaction: call {@code begin()} on every component, then
     * move to {@code ACTIVE}.
     *
     * @throws TransactionException if the transaction is not {@code INACTIVE}
     */
    public void begin() {
        checkState(INACTIVE);
        components.forEach((c) -> c.begin());
        setState(ACTIVE);
    }

    /**
     * Decide the promotion style from the transaction type:
     * {@code true} only for {@code READ_COMMITTED_PROMOTE}; {@code false} for
     * {@code READ_PROMOTE} and every other type.
     */
    private boolean promoteReadCommitted() {
        return txnType == TxnType.READ_COMMITTED_PROMOTE;
    }

    /**
     * Attempt to promote this transaction to a write transaction, choosing
     * the promotion style from the transaction type.
     *
     * @return {@code true} if the transaction was promoted
     * @throws TransactionException if the transaction is not {@code ACTIVE}
     */
    public boolean promote() {
        return promote(promoteReadCommitted());
    }

    /**
     * Attempt to promote this transaction to a write transaction.
     *
     * @param readCommitted passed through to the coordinator's {@code executePromote}
     * @return {@code false} if the transaction type is {@code READ} or the
     *         coordinator refuses the promotion; {@code true} otherwise, in
     *         which case the mode is now {@code WRITE}
     * @throws TransactionException if the transaction is not {@code ACTIVE}
     */
    public boolean promote(boolean readCommitted) {
        checkState(ACTIVE);
        if ( txnType == TxnType.READ )
            return false;
        if ( ! txnMgr.executePromote(this, readCommitted) )
            return false;
        mode = ReadWrite.WRITE;
        return true;
    }

    /**
     * Promote every component and switch the mode to {@code WRITE}.
     *
     * @throws TransactionException if any component refuses to promote
     */
    /*package*/ void promoteComponents() {
        // Call back from the Transaction coordinator during promote.
        components.forEach((c) -> {
            if ( ! c.promote() )
                throw new TransactionException("Failed to promote");
        });
        mode = ReadWrite.WRITE;
    }

    /**
     * Signal that an update is about to happen. A transaction currently in
     * {@code READ} mode is promoted; nothing happens if it is already in
     * {@code WRITE} mode.
     *
     * @throws TransactionException if the transaction is not {@code ACTIVE},
     *         or if it is in {@code READ} mode and cannot be promoted
     */
    public void notifyUpdate() {
        checkState(ACTIVE);
        if ( mode == ReadWrite.READ ) {
            // promote(...) sets mode to WRITE itself on success. A failed
            // promotion must not leave the transaction marked as WRITE.
            if ( ! promote(promoteReadCommitted()) )
                throw new TransactionException("Can't become a write transaction");
        }
    }

    /**
     * Move from {@code ACTIVE} to {@code PREPARE}. The coordinator's
     * {@code executePrepare} is only invoked for a transaction in {@code WRITE} mode.
     *
     * @throws TransactionException if the transaction is not {@code ACTIVE}
     */
    public void prepare() {
        checkState(ACTIVE);
        if ( mode == ReadWrite.WRITE )
            txnMgr.executePrepare(this);
        setState(PREPARE);
    }

    /**
     * Commit the transaction. If it is still {@code ACTIVE}, {@link #prepare()}
     * is run first. The coordinator is handed three actions over the
     * components (commit, commitEnd, abort); afterwards the transaction is
     * marked {@code COMMITTED} and completed.
     *
     * @throws TransactionException if the transaction is in neither
     *         {@code ACTIVE} nor {@code PREPARE} state
     */
    public void commit() {
        // The commit point is in TransactionCoordinator.executeCommit().
        if ( getState() == ACTIVE )
            // Auto exec prepare().
            prepare();
        checkState(PREPARE);
        setState(COMMIT);
        // Sys abort -> state?
        // READ and WRITE transactions currently go through exactly the same
        // coordinator call; a different lifecycle for READ may be wanted later.
        txnMgr.executeCommit(this,
            ()->{ components.forEach((c) -> c.commit()); },
            ()->{ components.forEach((c) -> c.commitEnd()); },
            ()->{ components.forEach((c) -> c.abort()); }
        );
        setState(COMMITTED);
        endInternal();
    }

    /**
     * Abort the transaction and complete it.
     *
     * @throws TransactionException if the transaction is in neither
     *         {@code ACTIVE} nor {@code ABORTED} state
     */
    public void abort() {
        abort$();
        endInternal();
    }

    /** Abort the components via the coordinator and move to {@code ABORTED}; does not complete the transaction. */
    private void abort$() {
        // Split into READ and WRITE forms.
        checkState(ACTIVE, ABORTED);
        // Includes notification start/finish
        txnMgr.executeAbort(this, ()-> { components.forEach((c) -> c.abort()); }) ;
        setState(ABORTED);
    }

    /**
     * End the transaction.
     * <p>
     * A transaction in {@code WRITE} mode that is still {@code ACTIVE} (no
     * commit or abort was called) is aborted, completed, and then an
     * exception is thrown. On that path the coordinator's
     * {@code notifyEndFinish} is not called and the coordinator reference is
     * retained.
     * <p>
     * Otherwise the transaction is completed (if not already), the
     * coordinator is notified, and the coordinator reference is dropped.
     *
     * @throws TransactionException if a write transaction had to be force-aborted
     */
    public void end() {
        txnMgr.notifyEndStart(this);
        if ( isWriteTxn() && getState() == ACTIVE ) {
            //Log.warn(this, "Write transaction with no commit() or abort() before end()");
            // Just abort process.
            abort$();
            endInternal();
            throw new TransactionException("Write transaction with no commit() or abort() before end() - forced abort");
        }
        endInternal();
        txnMgr.notifyEndFinish(this);
        // Drop the coordinator reference (see the memory note at the top of the class).
        txnMgr = null;
        // The component list is deliberately left as is: components.clear() is disabled.
        //components.clear();
    }

    /**
     * Complete the transaction: notify the coordinator, call {@code complete()}
     * on every component and move to the final state ({@code END_COMMITTED} if
     * the state was {@code COMMITTED}, otherwise {@code END_ABORTED}).
     * Does nothing if the transaction is already in a final state.
     */
    private void endInternal() {
        if ( hasFinalised() )
            return;
        // Called once, at the first abort/commit/end.
        txnMgr.notifyCompleteStart(this);
        components.forEach((c) -> c.complete());
        txnMgr.completed(this);
        TxnState finalState = ( getState() == COMMITTED ) ? END_COMMITTED : END_ABORTED;
        setState(finalState);
        txnMgr.notifyCompleteFinish(this);
    }

    /** Return the component list held by this transaction (the same list instance, not a copy). */
    /*package*/ List<SysTrans> getComponents() {
        return components;
    }

    /**
     * Move from {@code ACTIVE} or {@code PREPARE} to {@code DETACHED}.
     *
     * @throws TransactionException if in any other state
     */
    /*package*/ void detach() {
        checkState(ACTIVE,PREPARE);
        setState(DETACHED);
    }

    /**
     * Move from {@code DETACHED} back to {@code ACTIVE}.
     *
     * @throws TransactionException if not {@code DETACHED}
     */
    /*package*/ void attach() {
        checkState(DETACHED);
        setState(ACTIVE);
    }

    /** Require a WRITE transaction - do not try to promote. */
    public void requireWriteTxn() {
        checkState(ACTIVE);
        if ( mode != ReadWrite.WRITE )
            throw new TransactionException("Not a write transaction");
    }

    /** Require a WRITE transaction - includes trying to promote. */
    public void ensureWriteTxn() {
        checkState(ACTIVE);
        if ( mode != ReadWrite.WRITE ) {
            if ( ! this.promote() )
                throw new TransactionException("Can't become a write transaction");
        }
    }

    /**
     * Has the transaction started, i.e. has it left the initial
     * {@code INACTIVE} state?
     */
    @Override
    public boolean hasStarted() {
        return getState() != INACTIVE;
    }

    /** Is the state one of {@code COMMITTED}, {@code ABORTED}, {@code END_COMMITTED} or {@code END_ABORTED}? */
    @Override
    public boolean hasFinished() {
        TxnState x = getState();
        return x == COMMITTED || x == ABORTED || x == END_COMMITTED || x == END_ABORTED;
    }

    /** Is the state one of the final states {@code END_COMMITTED} or {@code END_ABORTED}? */
    @Override
    public boolean hasFinalised() {
        TxnState x = getState();
        return x == END_COMMITTED || x == END_ABORTED;
    }

    @Override
    public TxnId getTxnId() { return txnId; }

    @Override
    public TxnType getTxnType() { return txnType; }

    @Override
    public ReadWrite getMode() { return mode; }

    /** Is this a READ transaction?
     * Convenience operation equivalent to {@code (getMode() == READ)}
     */
    @Override
    public boolean isReadTxn() { return mode == ReadWrite.READ; }

    /** Is this a WRITE transaction?
     * Convenience operation equivalent to {@code (getMode() == WRITE)}
     */
    @Override
    public boolean isWriteTxn() { return mode == ReadWrite.WRITE; }

    // hashCode/equality
    // These must be object equality. No two transactions objects are .equals unless they are ==

    /** Throw unless {@link #isActiveTxn()} and {@link #isWriteTxn()} both hold. */
    private void checkWriteTxn() {
        if ( ! isActiveTxn() || ! isWriteTxn() )
            throw new TransactionException("Not in a write transaction");
    }

    // XXX Duplicate -- TransactionalComponentLifecycle
    /** Throw {@link TransactionException} unless the current state is {@code expected}. */
    private void checkState(TxnState expected) {
        TxnState s = getState();
        if ( s != expected )
            throw new TransactionException("Transaction is in state "+s+": expected state "+expected);
    }

    /** Throw {@link TransactionException} unless the current state is one of the two given. */
    private void checkState(TxnState expected1, TxnState expected2) {
        TxnState s = getState();
        if ( s != expected1 && s != expected2 )
            throw new TransactionException("Transaction is in state "+s+": expected state "+expected1+" or "+expected2);
    }

    // Avoid varargs ... undue worry?
    /** Throw {@link TransactionException} unless the current state is one of the three given. */
    private void checkState(TxnState expected1, TxnState expected2, TxnState expected3) {
        TxnState s = getState();
        if ( s != expected1 && s != expected2 && s != expected3 )
            throw new TransactionException("Transaction is in state "+s+": expected state "+expected1+", "+expected2+" or "+expected3);
    }

    /**
     * Is the state anything other than {@code INACTIVE}? Note that this is
     * also true for the finished and final states.
     */
    @Override
    public boolean isActiveTxn() {
        return getState() != INACTIVE;
    }
}