| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.jena.dboe.transaction.txn; |
| |
| import static org.apache.jena.dboe.transaction.txn.TxnState.ABORTED; |
| import static org.apache.jena.dboe.transaction.txn.TxnState.ACTIVE; |
| import static org.apache.jena.dboe.transaction.txn.TxnState.COMMIT; |
| import static org.apache.jena.dboe.transaction.txn.TxnState.COMMITTED; |
| import static org.apache.jena.dboe.transaction.txn.TxnState.DETACHED; |
| import static org.apache.jena.dboe.transaction.txn.TxnState.END_ABORTED; |
| import static org.apache.jena.dboe.transaction.txn.TxnState.END_COMMITTED; |
| import static org.apache.jena.dboe.transaction.txn.TxnState.INACTIVE; |
| import static org.apache.jena.dboe.transaction.txn.TxnState.PREPARE; |
| |
| import java.util.List ; |
| import java.util.Objects ; |
| import java.util.concurrent.atomic.AtomicReference ; |
| |
| import org.apache.jena.query.ReadWrite ; |
| import org.apache.jena.query.TxnType; |
| |
| /** |
| * A transaction as the composition of actions on components. |
| * Works in conjunction with the TransactionCoordinator |
| * to provide the transaction lifecycle. |
| * @see TransactionCoordinator |
| * @see TransactionalComponent |
| */ |
| final |
| public class Transaction implements TransactionInfo { |
| // Using an AtomicReference<TxnState> requires that |
| // TransactionalComponentLifecycle.internalComplete |
| // frees the thread local for the threadTxn, otherwise memory |
| // usage grows. If a plain member variable is used slow growth |
| // is still seen. |
| // Nulling txnMgr and clearing components stops that slow growth. |
| |
| private TransactionCoordinator txnMgr ; |
| private final TxnId txnId ; |
| private final List<SysTrans> components ; |
| |
| // Using an AtomicReference makes this observable from the outside. |
| // It also allow for multithreaded transactions (later). |
| private final AtomicReference<TxnState> state = new AtomicReference<>() ; |
| //private TxnState state ; |
| private final long dataVersion ; |
| private final TxnType txnType ; |
| private ReadWrite mode ; |
| |
| public Transaction(TransactionCoordinator txnMgr, TxnType txnType, ReadWrite readWrite, TxnId txnId, long dataVersion, List<SysTrans> components) { |
| Objects.requireNonNull(txnMgr) ; |
| Objects.requireNonNull(txnId) ; |
| Objects.requireNonNull(readWrite) ; |
| Objects.requireNonNull(components) ; |
| this.txnMgr = txnMgr ; |
| this.txnId = txnId ; |
| this.txnType = txnType ; |
| this.mode = readWrite ; |
| this.dataVersion = dataVersion ; |
| this.components = components ; |
| setState(INACTIVE) ; |
| } |
| |
| // /*package*/ void resetDataVersion(long dataVersion) { |
| // this.dataVersion = dataVersion; |
| // } |
| |
| /*package*/ void setState(TxnState newState) { |
| state.set(newState) ; |
| } |
| |
| @Override |
| public TxnState getState() { |
| return state.get() ; |
| } |
| |
| /** |
| * Each transaction is allocated a serialization point by the transaction |
| * coordinator. Normally, this is related to this number and it increases |
| * over time as the data changes. Two readers can have the same |
| * serialization point - they are working with the same view of the data. |
| */ |
| @Override |
| public long getDataVersion() { |
| return dataVersion ; |
| } |
| |
| public void begin() { |
| checkState(INACTIVE); |
| components.forEach((c) -> c.begin()) ; |
| setState(ACTIVE) ; |
| } |
| |
| private boolean promoteReadCommitted() { |
| if ( txnType == TxnType.READ_COMMITTED_PROMOTE ) return true; |
| if ( txnType == TxnType.READ_PROMOTE ) return false; |
| return false; |
| } |
| |
| public boolean promote() { |
| return promote(promoteReadCommitted()); |
| } |
| |
| public boolean promote(boolean readCommitted) { |
| checkState(ACTIVE); |
| if ( txnType == TxnType.READ ) |
| return false; |
| boolean b = txnMgr.promoteTxn(this, readCommitted) ; |
| if ( !b ) |
| return false ; |
| mode = ReadWrite.WRITE; |
| return true ; |
| } |
| |
| /*package*/ void promoteComponents() { |
| // Call back from the Transaction coordinator during promote. |
| components.forEach((c) -> { |
| if ( ! c.promote() ) |
| throw new TransactionException("Failed to promote") ; |
| }) ; |
| mode = ReadWrite.WRITE ; |
| } |
| |
| public void notifyUpdate() { |
| checkState(ACTIVE) ; |
| if ( mode == ReadWrite.READ ) { |
| promote(promoteReadCommitted()) ; |
| mode = ReadWrite.WRITE ; |
| } |
| } |
| |
| public void prepare() { |
| checkState(ACTIVE) ; |
| if ( mode == ReadWrite.WRITE ) |
| txnMgr.executePrepare(this) ; |
| setState(PREPARE); |
| } |
| |
| public void commit() { |
| // XXX Split into READ and WRITE forms. |
| TxnState s = getState(); |
| if ( s == ACTIVE ) |
| // Auto exec prepare(). |
| prepare() ; |
| checkState(PREPARE) ; |
| setState(COMMIT) ; |
| switch(mode) { |
| case WRITE: |
| txnMgr.executeCommit(this, |
| ()->{components.forEach((c) -> c.commit()) ; } , |
| ()->{components.forEach((c) -> c.commitEnd()) ; } ) ; |
| break ; |
| case READ: |
| // Different lifecycle? |
| txnMgr.executeCommit(this, |
| ()->{components.forEach((c) -> c.commit()) ; } , |
| ()->{components.forEach((c) -> c.commitEnd()) ; } ) ; |
| break ; |
| } |
| setState(COMMITTED) ; |
| } |
| |
| public void abort() { |
| abort$(); |
| endInternal() ; |
| } |
| |
| private void abort$() { |
| // Split into READ and WRITE forms. |
| checkState(ACTIVE, ABORTED) ; |
| // Includes notification start/finish |
| txnMgr.executeAbort(this, ()-> { components.forEach((c) -> c.abort()) ; }) ; |
| setState(ABORTED) ; |
| } |
| |
| public void end() { |
| txnMgr.notifyEndStart(this) ; |
| if ( isWriteTxn() && getState() == ACTIVE ) { |
| //Log.warn(this, "Write transaction with no commit() or abort() before end()"); |
| // Just the abort process. |
| abort$() ; |
| endInternal() ; |
| throw new TransactionException("Write transaction with no commit() or abort() before end() - forced abort") ; |
| } |
| endInternal() ; |
| txnMgr.notifyEndFinish(this) ; |
| txnMgr = null ; |
| //components.clear() ; |
| } |
| |
| private void endInternal() { |
| if ( hasFinalised() ) |
| return ; |
| // Called once, at the first abort/commit/end. |
| txnMgr.notifyCompleteStart(this); |
| components.forEach((c) -> c.complete()) ; |
| txnMgr.completed(this) ; |
| if ( getState() == COMMITTED ) |
| setState(END_COMMITTED); |
| else |
| setState(END_ABORTED); |
| txnMgr.notifyCompleteFinish(this); |
| } |
| |
| /*package*/ List<SysTrans> getComponents() { |
| return components ; |
| } |
| |
| /*package*/ void detach() { |
| checkState(ACTIVE,PREPARE) ; |
| setState(DETACHED) ; |
| } |
| |
| /*package*/ void attach() { |
| checkState(DETACHED) ; |
| setState(ACTIVE) ; |
| } |
| |
| public void requireWriteTxn() { |
| checkState(ACTIVE) ; |
| if ( mode != ReadWrite.WRITE ) |
| throw new TransactionException("Not a write transaction") ; |
| } |
| |
| @Override |
| public boolean hasStarted() { |
| TxnState x = getState() ; |
| return x == INACTIVE ; |
| } |
| |
| @Override |
| public boolean hasFinished() { |
| TxnState x = getState() ; |
| return x == COMMITTED || x == ABORTED || x == END_COMMITTED || x == END_ABORTED ; |
| } |
| |
| @Override |
| public boolean hasFinalised() { |
| TxnState x = getState() ; |
| return x == END_COMMITTED || x == END_ABORTED ; |
| } |
| |
| @Override |
| public TxnId getTxnId() { return txnId ; } |
| |
| @Override |
| public TxnType getTxnType() { return txnType ; } |
| |
| @Override |
| public ReadWrite getMode() { return mode ; } |
| |
| /** Is this a READ transaction? |
| * Convenience operation equivalent to {@code (getMode() == READ)} |
| */ |
| @Override |
| public boolean isReadTxn() { return mode == ReadWrite.READ ; } |
| |
| /** Is this a WRITE transaction? |
| * Convenience operation equivalent to {@code (getMode() == WRITE)} |
| */ |
| @Override |
| public boolean isWriteTxn() { return mode == ReadWrite.WRITE ; } |
| |
| // hashCode/equality |
| // These must be object equality. No two transactions objects are .equals unless they are == |
| |
| private void checkWriteTxn() { |
| if ( ! isActiveTxn() || ! isWriteTxn() ) |
| throw new TransactionException("Not in a write transaction") ; |
| } |
| |
| // XXX Duplicate -- TransactionalComponentLifecycle |
| private void checkState(TxnState expected) { |
| TxnState s = getState(); |
| if ( s != expected ) |
| throw new TransactionException("Transaction is in state "+s+": expected state "+expected) ; |
| } |
| |
| private void checkState(TxnState expected1, TxnState expected2) { |
| TxnState s = getState(); |
| if ( s != expected1 && s != expected2 ) |
| throw new TransactionException("Transaction is in state "+s+": expected state "+expected1+" or "+expected2) ; |
| } |
| |
| // Avoid varargs ... undue worry? |
| private void checkState(TxnState expected1, TxnState expected2, TxnState expected3) { |
| TxnState s = getState(); |
| if ( s != expected1 && s != expected2 && s != expected3 ) |
| throw new TransactionException("Transaction is in state "+s+": expected state "+expected1+", "+expected2+" or "+expected3) ; |
| } |
| |
| @Override |
| public boolean isActiveTxn() { |
| return getState() != INACTIVE ; |
| } |
| } |
| |