| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.jena.dboe.transaction.txn; |
| |
| import static org.apache.jena.dboe.transaction.txn.journal.JournalEntryType.UNDO; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.nio.channels.ClosedByInterruptException; |
| import java.util.*; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.Semaphore; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.locks.ReadWriteLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| import java.util.function.Consumer; |
| |
| import org.apache.jena.atlas.logging.FmtLog; |
| import org.apache.jena.atlas.logging.Log; |
| import org.apache.jena.dboe.base.file.FileException; |
| import org.apache.jena.dboe.base.file.Location; |
| import org.apache.jena.dboe.sys.Sys; |
| import org.apache.jena.dboe.transaction.txn.journal.Journal; |
| import org.apache.jena.dboe.transaction.txn.journal.JournalEntry; |
| import org.apache.jena.query.ReadWrite; |
| import org.apache.jena.query.TxnType; |
| import org.slf4j.Logger; |
| |
| /** |
| * One {@code TransactionCoordinator} per group of {@link TransactionalComponent}s. |
| * {@link TransactionalComponent}s can not be shared across TransactionCoordinators. |
| * <p> |
| * This is a general engine although tested and most used for multiple-reader |
| * and single-writer (MR+SW). {@link TransactionalComponentLifecycle} provides the |
| * per-thread style. |
| * <p> |
| * Contrast to MRSW: multiple-reader or single-writer. |
| * <h3>Block writers</h3> |
| * Block until no writers are active. |
| * When this returns, this guarantees that the database is not changing |
| * and the journal is flushed to disk. |
| * <p> |
| * See {@link #blockWriters()}, {@link #enableWriters()}, {@link #execAsWriter(Runnable)} |
| * <h3>Exclusive mode</h3> |
| * Exclusive mode is when the current thread is the only active code : no readers, no writers. |
| * <p> |
| * See {@link #startExclusiveMode()}/{@link #tryExclusiveMode()} {@link #finishExclusiveMode()}, {@link #execExclusive(Runnable)} |
| * |
| * @see Transaction |
| * @see TransactionalComponent |
| * @see TransactionalSystem |
| */ |
| final |
| public class TransactionCoordinator { |
| private static Logger SysLog = Sys.syslog; |
| private static Logger SysErr = Sys.errlog; |
| |
| private final Journal journal; |
| // Lock on configuration changes. |
| private boolean configurable = true; |
| |
| private final ComponentGroup components = new ComponentGroup(); |
| private final List<TransactionListener> listeners = new ArrayList<>(); |
| |
| // Components |
| private List<ShutdownHook> shutdownHooks; |
| private TxnIdGenerator txnIdGenerator = TxnIdFactory.txnIdGenSimple; |
| |
| private QuorumGenerator quorumGenerator = null; |
| //private QuorumGenerator quorumGenerator = (m) -> components; |
| |
| // Semaphore to implement "Single Active Writer" - independent of readers |
| // This is not reentrant. |
| private Semaphore writersWaiting = new Semaphore(1, true); |
| |
| // All transaction need a "read" lock through out their lifetime. |
| // Do not confuse with read/write transactions. We need a |
| // "one exclusive, or many other" lock which happens to be called ReadWriteLock |
| // See also {@code lock} which protects the datastructures during transaction management. |
| private ReadWriteLock exclusivitylock = new ReentrantReadWriteLock(); |
| |
| // The version is the serialization point for a transaction. |
| // All transactions on the same view of the data get the same serialization point. |
| |
| // A read transaction can be promoted if writer does not start |
| // This TransactionCoordinator provides Serializable, Read-lock-free |
| // execution. With no item locking, a read can only be promoted |
| // if no writer started since the reader started or if it is "read committed", |
| // seeing changes made since it started and comitted at the poiont of promotion. |
| |
| /* The version of the data - incremented when transaction commits. |
| * This is the version with repest to the last commited transaction. |
| * Aborts do not cause the data version to advance. |
| * This counter never goes backwards. |
| */ |
| private final AtomicLong dataVersion = new AtomicLong(0); |
| |
| // Coordinator wide lock object. |
| private Object coordinatorLock = new Object(); |
| |
| @FunctionalInterface |
| public interface ShutdownHook { void shutdown(); } |
| |
| /** Create a TransactionCoordinator, initially with no associated {@link TransactionalComponent}s */ |
| public TransactionCoordinator(Location location) { |
| this(Journal.create(location)); |
| } |
| |
| /** Create a TransactionCoordinator, initially with no associated {@link TransactionalComponent}s */ |
| public TransactionCoordinator(Journal journal) { |
| this(journal, null , new ArrayList<>()); |
| } |
| |
| /** Create a TransactionCoordinator, initially with {@link TransactionalComponent} in the ComponentGroup */ |
| public TransactionCoordinator(Journal journal, List<TransactionalComponent> components) { |
| this(journal, components , new ArrayList<>()); |
| } |
| |
| // /** Create a TransactionCoordinator, initially with no associated {@link TransactionalComponent}s */ |
| // public TransactionCoordinator(Location journalLocation) { |
| // this(Journal.create(journalLocation), new ArrayList<>() , new ArrayList<>()); |
| // } |
| |
| private TransactionCoordinator(Journal journal, List<TransactionalComponent> txnComp, List<ShutdownHook> shutdownHooks) { |
| this.journal = journal; |
| this.shutdownHooks = new ArrayList<>(shutdownHooks); |
| if ( txnComp != null ) |
| components.addAll(txnComp); |
| } |
| |
| /** Add a {@link TransactionalComponent}. |
| * Safe to call at any time but it is good practice is to add all the |
| * components before any transactions start. |
| * Internally, the coordinator ensures the add will safely happen but it |
| * does not add the component to existing transactions. |
| * This must be setup before recovery is attempted. |
| */ |
| public TransactionCoordinator add(TransactionalComponent elt) { |
| checklAllowModification(); |
| components.add(elt); |
| return this; |
| } |
| |
| /** |
| * Remove a {@link TransactionalComponent}. |
| * @see #add |
| */ |
| public TransactionCoordinator remove(TransactionalComponent elt) { |
| checklAllowModification(); |
| components.remove(elt.getComponentId()); |
| return this; |
| } |
| |
| public TransactionCoordinator addListener(TransactionListener listener) { |
| checklAllowModification(); |
| listeners.add(listener); |
| return this; |
| } |
| |
| public TransactionCoordinator removeListener(TransactionListener listener) { |
| checklAllowModification(); |
| listeners.remove(listener); |
| return this; |
| } |
| |
| /** |
| * Perform modification of this {@code TransactionCoordiator} after it has been |
| * started. |
| * <p> |
| * This operation enters {@linkplain #startExclusiveMode() exclusive mode}, releases the |
| * configuration lock, then calls the {@code action}. On exit from the action, |
| * it resets the configuration lock, and exits exclusive mode. |
| * <p> |
| * Do not call inside a transaction, it may cause a deadlock. |
| * <p> |
| * Use with care! |
| */ |
| public void modifyConfig(Runnable action) { |
| try { |
| startExclusiveMode(); |
| configurable = true; |
| action.run(); |
| } finally { |
| configurable = false; |
| finishExclusiveMode(); |
| } |
| } |
| |
| /** Call the action for each listener */ |
| private void listeners(Consumer<TransactionListener> action) { |
| listeners.forEach(x->action.accept(x)); |
| } |
| |
| /** |
| * Add a shutdown hook. Shutdown is not guaranteed to be called |
| * and hence hooks may not get called. |
| */ |
| public void add(TransactionCoordinator.ShutdownHook hook) { |
| checklAllowModification(); |
| shutdownHooks.add(hook); |
| } |
| |
| /** Remove a shutdown hook */ |
| public void remove(TransactionCoordinator.ShutdownHook hook) { |
| checklAllowModification(); |
| shutdownHooks.remove(hook); |
| } |
| |
| public void setQuorumGenerator(QuorumGenerator qGen) { |
| checklAllowModification(); |
| this.quorumGenerator = qGen; |
| } |
| |
| public void start() { |
| checklAllowModification(); |
| recovery(); |
| configurable = false; |
| } |
| |
| private /*public*/ void recovery() { |
| |
| Iterator<JournalEntry> iter = journal.entries(); |
| if ( ! iter.hasNext() ) { |
| components.forEachComponent(c -> c.cleanStart()); |
| return; |
| } |
| |
| SysLog.info("Journal recovery start"); |
| components.forEachComponent(c -> c.startRecovery()); |
| |
| // Group to commit |
| |
| List<JournalEntry> entries = new ArrayList<>(); |
| |
| iter.forEachRemaining( entry -> { |
| switch(entry.getType()) { |
| case ABORT : |
| entries.clear(); |
| break; |
| case COMMIT : |
| recover(entries); |
| entries.clear(); |
| break; |
| case REDO : case UNDO : |
| entries.add(entry); |
| break; |
| } |
| }); |
| |
| components.forEachComponent(c -> c.finishRecovery()); |
| journal.reset(); |
| SysLog.info("Journal recovery end"); |
| } |
| |
| private void recover(List<JournalEntry> entries) { |
| entries.forEach(e -> { |
| if ( e.getType() == UNDO ) { |
| Log.warn(TransactionCoordinator.this, "UNDO entry : not handled"); |
| return; |
| } |
| ComponentId cid = e.getComponentId(); |
| ByteBuffer bb = e.getByteBuffer(); |
| // find component. |
| TransactionalComponent c = components.findComponent(cid); |
| if ( c == null ) { |
| Log.warn(TransactionCoordinator.this, "No component for "+cid); |
| return; |
| } |
| c.recover(bb); |
| }); |
| } |
| |
| public void setTxnIdGenerator(TxnIdGenerator generator) { |
| this.txnIdGenerator = generator; |
| } |
| |
| public Journal getJournal() { |
| return journal; |
| } |
| |
| public Location getLocation() { |
| return getJournal().getLocation(); |
| } |
| |
| public TransactionCoordinatorState detach(Transaction txn) { |
| txn.detach(); |
| TransactionCoordinatorState coordinatorState = new TransactionCoordinatorState(txn); |
| components.forEach((id, c) -> { |
| SysTransState s = c.detach(); |
| coordinatorState.componentStates.put(id, s); |
| } ); |
| // The txn still counts as "active" for tracking purposes below. |
| return coordinatorState; |
| } |
| |
| public void attach(TransactionCoordinatorState coordinatorState) { |
| Transaction txn = coordinatorState.transaction; |
| txn.attach(); |
| coordinatorState.componentStates.forEach((id, obj) -> { |
| components.findComponent(id).attach(obj); |
| }); |
| } |
| |
| public void shutdown() { |
| shutdown(false); |
| } |
| |
| public void shutdown(boolean silent) { |
| if ( coordinatorLock == null ) |
| return; |
| if ( ! silent && countActive() > 0 ) |
| FmtLog.warn(SysErr, "Transactions active: W=%d, R=%d", countActiveWriter(), countActiveReaders()); |
| components.forEach((id, c) -> c.shutdown()); |
| shutdownHooks.forEach((h)-> h.shutdown()); |
| coordinatorLock = null; |
| journal.close(); |
| } |
| |
| // Can modifications be made? |
| private void checklAllowModification() { |
| if ( ! configurable ) |
| throw new TransactionException("TransactionCoordinator configuration is locked"); |
| } |
| |
| // Is this TransactionCoordinator up and running? |
| private void checkActive() { |
| if ( configurable ) |
| throw new TransactionException("TransactionCoordinator has not been started"); |
| checkNotShutdown(); |
| } |
| |
| private void checkNotShutdown() { |
| if ( coordinatorLock == null ) |
| throw new TransactionException("TransactionCoordinator has been shutdown"); |
| } |
| |
| private void releaseWriterLock() { |
| int x = writersWaiting.availablePermits(); |
| if ( x != 0 ) |
| throw new TransactionException("TransactionCoordinator: Probably mismatch of enable/disableWriter calls"); |
| writersWaiting.release(); |
| } |
| |
| /** Acquire the writer lock - return true if succeeded */ |
| private boolean acquireWriterLock(boolean canBlock) { |
| if ( ! canBlock ) |
| return writersWaiting.tryAcquire(); |
| try { |
| writersWaiting.acquire(); |
| return true; |
| } catch (InterruptedException e) { throw new TransactionException(e); } |
| } |
| |
| /** Enter exclusive mode; block if necessary. |
| * There are no active transactions on return; new transactions will be held up in 'begin'. |
| * Return to normal (release waiting transactions, allow new transactions) |
| * with {@link #finishExclusiveMode}. |
| * <p> |
| * Do not call inside an existing transaction. |
| */ |
| public void startExclusiveMode() { |
| startExclusiveMode(true); |
| } |
| |
| /** Try to enter exclusive mode. |
| * If return is true, then there are no active transactions on return and new transactions will be held up in 'begin'. |
| * If false, there were in-progress transactions. |
| * Return to normal (release waiting transactions, allow new transactions) |
| * with {@link #finishExclusiveMode}. |
| * <p> |
| * Do not call inside an existing transaction. |
| */ |
| public boolean tryExclusiveMode() { |
| return tryExclusiveMode(false); |
| } |
| |
| /** Try to enter exclusive mode. |
| * If return is true, then there are no active transactions on return and new transactions will be held up in 'begin'. |
| * If false, there were in-progress transactions. |
| * Return to normal (release waiting transactions, allow new transactions) |
| * with {@link #finishExclusiveMode}. |
| * <p> |
| * Do not call inside an existing transaction. |
| * @param canBlock Allow the operation block and wait for the exclusive mode lock. |
| */ |
| public boolean tryExclusiveMode(boolean canBlock) { |
| return startExclusiveMode(canBlock); |
| } |
| |
| private boolean startExclusiveMode(boolean canBlock) { |
| if ( canBlock ) { |
| exclusivitylock.writeLock().lock(); |
| return true; |
| } |
| return exclusivitylock.writeLock().tryLock(); |
| } |
| |
| /** Return to normal (release waiting transactions, allow new transactions). |
| * Must be paired with an earlier {@link #startExclusiveMode}. |
| */ |
| public void finishExclusiveMode() { |
| exclusivitylock.writeLock().unlock(); |
| } |
| |
| /** Execute an action in exclusive mode. This method can block. |
| * Equivalent to: |
| * <pre> |
| * startExclusiveMode(); |
| * try { action.run(); } |
| * finally { finishExclusiveMode(); } |
| * </pre> |
| * |
| * @param action |
| */ |
| public void execExclusive(Runnable action) { |
| startExclusiveMode(); |
| try { action.run(); } |
| finally { finishExclusiveMode(); } |
| } |
| |
| /** Block until no writers are active. |
| * When this returns, this guarantees that the database is not changing |
| * and the journal is flushed to disk. |
| * <p> |
| * The application must call {@link #enableWriters} later. |
| * <p> |
| * This operation must not be nested (it will block). |
| * |
| * @see #tryBlockWriters() |
| * @see #enableWriters() |
| */ |
| public void blockWriters() { |
| acquireWriterLock(true); |
| } |
| |
| /** Try to block all writers, or return if can't at the moment. |
| * <p> |
| * Unlike a write transction, there is no associated transaction. |
| * <p> |
| * If it returns true, the application must call {@link #enableWriters} later. |
| * |
| * @see #blockWriters() |
| * @see #enableWriters() |
| * |
| * @return true if the operation succeeded and writers are blocked |
| */ |
| public boolean tryBlockWriters() { |
| return tryBlockWriters(false); |
| } |
| |
| /** |
| * Block until no writers are active, optionally blocking or returning if can't at the moment. |
| * <p> |
| * Unlike a write transction, there is no associated transaction. |
| * <p> |
| * If it returns true, the application must call {@link #enableWriters} later. |
| * @param canBlock |
| * @return true if the operation succeeded and writers are blocked |
| */ |
| public boolean tryBlockWriters(boolean canBlock) { |
| return acquireWriterLock(canBlock); |
| } |
| /** Allow writers. |
| * This must be used in conjunction with {@link #blockWriters()} or {@link #tryBlockWriters()} |
| * |
| * @see #blockWriters() |
| * @see #tryBlockWriters() |
| */ |
| public void enableWriters() { |
| releaseWriterLock(); |
| } |
| |
| /** Execute an action in as if a Write but no write transaction started. |
| * This method can block. |
| * <p> |
| * Equivalent to: |
| * <pre> |
| * blockWriters(); |
| * try { action.run(); } |
| * finally { enableWriters(); } |
| * </pre> |
| * |
| * @param action |
| */ |
| public void execAsWriter(Runnable action) { |
| blockWriters(); |
| try { action.run(); } |
| finally { enableWriters(); } |
| } |
| |
| /** Start a transaction. This may block. */ |
| public Transaction begin(TxnType txnType) { |
| return begin(txnType, true); |
| } |
| |
| /** |
| * Start a transaction. |
| * Returns null if this operation would block. |
| * Readers can start at any time. |
| * A single writer policy is currently imposed so a "begin(WRITE)" may block. |
| */ |
| public Transaction begin(TxnType txnType, boolean canBlock) { |
| Objects.nonNull(txnType); |
| checkActive(); |
| |
| if ( false /* bounceWritersAtTheMoment */) { |
| // Is this stil needed? |
| // Switching happens as copy, not in-place compaction (at the moment). |
| // so we don't need a write-reject mode currently. |
| if ( txnType == TxnType.WRITE ) { |
| throw new TransactionException("Writers currently being rejected"); |
| } |
| } |
| |
| if ( canBlock ) |
| exclusivitylock.readLock().lock(); |
| else { |
| if ( ! exclusivitylock.readLock().tryLock() ) |
| return null; |
| } |
| |
| // Readers never block. |
| if ( txnType == TxnType.WRITE ) { |
| // Writers take a WRITE permit from the semaphore to ensure there |
| // is at most one active writer, else the attempt to start the |
| // transaction blocks. |
| // Released by in notifyCommitFinish/notifyAbortFinish |
| boolean b = acquireWriterLock(canBlock); |
| if ( !b ) { |
| exclusivitylock.readLock().unlock(); |
| return null; |
| } |
| } |
| Transaction transaction = begin$(txnType); |
| startActiveTransaction(transaction); |
| transaction.begin(); |
| notifyBegin(transaction); |
| return transaction; |
| } |
| |
| private Transaction begin$(TxnType txnType) { |
| synchronized(coordinatorLock) { |
| // Inside the lock - check again. |
| checkActive(); |
| // Thread safe part of 'begin' |
| // Allocate the transaction serialization point. |
| TxnId txnId = txnIdGenerator.generate(); |
| List<SysTrans> sysTransList = new ArrayList<>(); |
| Transaction transaction = new Transaction(this, txnType, initialMode(txnType), txnId, dataVersion.get(), sysTransList); |
| |
| ComponentGroup txnComponents = chooseComponents(this.components, txnType); |
| |
| try { |
| txnComponents.forEachComponent(elt -> { |
| SysTrans sysTrans = new SysTrans(elt, transaction, txnId); |
| sysTransList.add(sysTrans); }) ; |
| // Calling each component must be inside the lock |
| // so that a transaction does not commit overlapping with setup. |
| // If it did, different components might end up starting from |
| // different start states of the overall system. |
| txnComponents.forEachComponent(elt -> elt.begin(transaction)); |
| } catch(Throwable ex) { |
| // Careful about incomplete. |
| //abort(); |
| //complete(); |
| throw ex; |
| } |
| return transaction; |
| } |
| } |
| |
| // Detemine ReadWrite for the transaction start from initial TxnType. |
| private static ReadWrite initialMode(TxnType txnType) { |
| return TxnType.initial(txnType); |
| } |
| |
| private ComponentGroup chooseComponents(ComponentGroup components, TxnType txnType) { |
| if ( quorumGenerator == null ) |
| return components; |
| ComponentGroup cg = quorumGenerator.genQuorum(txnType); |
| if ( cg == null ) |
| return components; |
| cg.forEach((id, c) -> { |
| TransactionalComponent tcx = components.findComponent(id); |
| if ( ! tcx.equals(c) ) |
| SysLog.warn("TransactionalComponent not in TransactionCoordinator's ComponentGroup"); |
| }); |
| if ( SysLog.isDebugEnabled() ) |
| SysLog.debug("Custom ComponentGroup for transaction "+txnType+": size="+cg.size()+" of "+components.size()); |
| return cg; |
| } |
| |
| /** Attempt to promote a transaction from READ mode to WRITE mode based. |
| * Whether intervening commits are seen is determined by the boolean flag. |
| * Return true if the transaction is already a writer. |
| */ |
| /*package*/ boolean executePromote(Transaction transaction, boolean readCommittedPromotion) { |
| if ( transaction.getMode() == ReadWrite.WRITE ) |
| return true; |
| // Even if promotion of TxnType.READ allowed, this ability is usually rejected |
| // by the transaction system around it. e.g. TransactionalBase. |
| if ( transaction.getTxnType() == TxnType.READ ) |
| throw new TransactionException("promote: can't promote a READ transaction"); |
| |
| notifyPromoteStart(transaction); |
| boolean b = promoteTxn$(transaction, readCommittedPromotion); |
| notifyPromoteFinish(transaction); |
| return b; |
| } |
| |
| private boolean promoteTxn$(Transaction transaction, boolean readCommittedPromotion) { |
| // == Read committed path. |
| if ( transaction.getTxnType() == TxnType.READ_COMMITTED_PROMOTE ) { |
| if ( ! promotionWaitForWriters() ) |
| return false; |
| // Now single writer. |
| synchronized(coordinatorLock) { |
| try { |
| transaction.promoteComponents(); |
| // Because we want to see the new state of the data. |
| // transaction.resetDataVersion(dataVersion.get()); |
| } catch (TransactionException ex) { |
| try { transaction.abort(); } catch(RuntimeException ex2) {} |
| releaseWriterLock(); |
| return false; |
| } |
| promoteActiveTransaction(transaction); |
| } |
| return true; |
| } |
| |
| // == Read with no committed allowed |
| // Check epoch is current - no "read committed". |
| // Check now outside synchronized (will need to check again to confirm) for speed |
| // and to allow for "no wait for writes". |
| |
| if ( ! checkNoInterveningCommits(transaction) ) |
| return false; |
| |
| // Take writer lock. |
| if ( ! promotionWaitForWriters() ) |
| // Failed to become a writer. |
| return false; |
| |
| // Now a proto-writer. We need to confirm when inside the synchronized. |
| synchronized(coordinatorLock) { |
| // Not read committed. |
| // Need to check the data version once we are the writer and all previous |
| // writers have committed or aborted. |
| // Has there been an writer active since the transaction started? |
| |
| if ( ! checkNoInterveningCommits(transaction) ) { |
| // Failed to promote. |
| releaseWriterLock(); |
| return false; |
| } |
| |
| // ... we have now got the writer lock ... |
| try { |
| transaction.promoteComponents(); |
| // No need to reset the data version because strict isolation. |
| } catch (TransactionException ex) { |
| try { transaction.abort(); } catch(RuntimeException ex2) {} |
| releaseWriterLock(); |
| return false; |
| } |
| promoteActiveTransaction(transaction); |
| } |
| return true; |
| } |
| |
| private boolean checkNoInterveningCommits(Transaction transaction) { |
| long txnEpoch = transaction.getDataVersion(); // The transaction-start point. |
| long currentEpoch = dataVersion.get(); // The current data serialization point. |
| |
| if ( txnEpoch < currentEpoch ) |
| // The data has changed and "read committed" not allowed. |
| // We can reject now. |
| return false; |
| return true; |
| } |
| |
| /** Whether to wait for writers when trying to promote */ |
| private static final boolean promotionWaitForWriters = true; |
| |
| private boolean promotionWaitForWriters() { |
| if ( promotionWaitForWriters ) |
| return acquireWriterLock(true); |
| else |
| return acquireWriterLock(false); |
| } |
| |
| // Called once by Transaction after the action of commit()/abort() or end() |
| /** Signal that the transaction has finished. */ |
| /*package*/ void completed(Transaction transaction) { |
| finishActiveTransaction(transaction); |
| journal.reset(); |
| notifyEnd(transaction); |
| } |
| |
| // Internally, an APi call "commit" is "prepare then commit". |
| |
| /*package*/ void executePrepare(Transaction transaction) { |
| // Do here because it needs access to the journal. |
| notifyPrepareStart(transaction); |
| transaction.getComponents().forEach(sysTrans -> { |
| ByteBuffer data = sysTrans.commitPrepare(); |
| if ( data != null ) { |
| PrepareState s = new PrepareState(sysTrans.getComponentId(), data); |
| journal.write(s); |
| } |
| }); |
| notifyPrepareFinish(transaction); |
| } |
| |
| /*package*/ void executeCommit(Transaction transaction, Runnable commit, Runnable finish, Runnable sysabort) { |
| notifyCommitStart(transaction); |
| if ( transaction.getMode() == ReadWrite.READ ) { |
| |
| //[1746] |
| //executeCommitReader(); |
| // No commit on components, all "end". |
| // Make abort the same? |
| finish.run(); |
| notifyCommitFinish(transaction); |
| return; |
| } |
| journal.startWrite(); |
| try { |
| executeCommitWriter(transaction, commit, finish, sysabort); |
| journal.commitWrite(); |
| } catch (TransactionException ex) { |
| throw ex; |
| } catch (Throwable th) { |
| throw th; |
| } finally { journal.endWrite(); } |
| notifyCommitFinish(transaction); |
| } |
| |
| private void executeCommitWriter(Transaction transaction, Runnable commit, Runnable finish, Runnable sysabort) { |
| synchronized(coordinatorLock) { |
| try { |
| // Simulate a Thread.interrupt during I/O. |
| // if ( true ) |
| // throw new FileException(new ClosedByInterruptException()); |
| |
| // *** COMMIT POINT |
| journal.writeJournal(JournalEntry.COMMIT); |
| journal.sync(); |
| // *** COMMIT POINT |
| } |
| // catch (ClosedByInterruptException ex) {} |
| // Some low level system error - probably a sign of something serious like disk error. |
| catch(FileException ex) { |
| if ( ex.getCause() instanceof ClosedByInterruptException ) { |
| // Thread interrupt during java I/O. |
| // File was closed by java.nio. |
| // Reopen - this truncates to the last write start position. |
| journal.reopen(); |
| // This call should clear up the transaction state. |
| rollback(transaction, sysabort); |
| SysLog.warn("Thread interrupt during I/O in 'commit' : executed transaction rollback: "+ex.getMessage()); |
| throw new TransactionException("Thread interrupt during I/O in 'commit' : transaction rollback.", ex); |
| } |
| if ( isIOException(ex) ) |
| SysErr.warn("IOException during 'commit' : transaction may have committed. Attempting rollback: "+ex.getMessage()); |
| else |
| SysErr.warn("Exception during 'commit' : transaction may have committed. Attempting rollback. Details:",ex); |
| if ( abandonTxn(transaction, sysabort) ) { |
| SysErr.warn("Transaction rollback"); |
| throw new TransactionException("Exception during 'commit' - transaction rollback.", ex); |
| } |
| // Very bad. (This have been dealt with already and should get to here.) |
| SysErr.error("Transaction rollback failed. System unstable."+ |
| "\nPlease contact users@jena.apache.org, giving details of the environment and this incident."); |
| throw new Error("Exception during 'rollback' - System unstable.", ex); |
| } |
| catch (Throwable ex) { |
| SysErr.warn("Unexpected Throwable during 'commit' : transaction may have committed. Attempting rollback: ",ex); |
| if ( abandonTxn(transaction, sysabort) ) { |
| SysErr.warn("Transaction rollback"); |
| throw new TransactionException("Exception during 'commit' - transaction rollback.", ex); |
| } |
| // Very bad. (This should not happen.) |
| SysErr.error("Transaction rollback failed. System unstable."); |
| throw new TransactionException("Exception during 'rollback' - System unstable.", ex); |
| } |
| |
| // Now run the Transactions commit actions. |
| commit.run(); |
| journal.truncate(0); |
| // and tell the Transaction it's finished. |
| finish.run(); |
| // Bump global serialization point |
| advanceDataVersion(); |
| } |
| } |
| |
| // Inside the global transaction start/commit lock. |
| private void advanceDataVersion() { |
| dataVersion.incrementAndGet(); |
| } |
| |
| /** Test whether the thread is interrupted and if it is, abort the transaction. */ |
| private void abandonIfInterruped(Transaction txn, Runnable sysabort, String msg) { |
| // Clears interrupted status |
| if (Thread.interrupted()) { |
| abandonTxn(txn, sysabort); |
| Thread.currentThread().interrupt(); |
| throw new TransactionException(msg); |
| } |
| } |
| |
| /** |
| * Try to abort, including removing the journal entries (including commit if written) |
| * Return true for succeeded and false for throwable, state unknown. |
| */ |
| private boolean abandonTxn(Transaction txn, Runnable sysabort ) { |
| try { |
| journal.abortWrite(); |
| rollback(txn, sysabort); |
| return true; |
| } catch (Throwable th) { |
| SysErr.warn("Exception during system rollback", th); |
| return false; |
| } |
| } |
| |
| private void rollback(Transaction txn, Runnable sysabort) { |
| txn.setState(TxnState.ACTIVE); |
| sysabort.run(); |
| txn.setState(TxnState.ABORTED); |
| } |
| |
| private boolean isIOException(Throwable ex) { |
| while (ex != null) { |
| if ( ex instanceof IOException ) |
| return true; |
| ex = ex.getCause(); |
| } |
| return false; |
| } |
| |
| /*package*/ void executeAbort(Transaction transaction, Runnable abort) { |
| notifyAbortStart(transaction); |
| abort.run(); |
| notifyAbortFinish(transaction); |
| } |
| |
| // Active transactions. |
| private Set<Transaction> activeTransactions = ConcurrentHashMap.newKeySet(); |
| private AtomicLong activeTransactionCount = new AtomicLong(0); |
| private AtomicLong activeReadersCount = new AtomicLong(0); |
| private AtomicLong activeWritersCount = new AtomicLong(0); |
| |
| private void startActiveTransaction(Transaction transaction) { |
| synchronized(coordinatorLock) { |
| // Use lock to ensure all the counters move together. |
| // Thread safe - we have not let the Transaction object out yet. |
| countBegin.incrementAndGet(); |
| switch(transaction.getMode()) { |
| case READ: countBeginRead.incrementAndGet(); activeReadersCount.incrementAndGet() ; break ; |
| case WRITE: countBeginWrite.incrementAndGet(); activeWritersCount.incrementAndGet() ; break ; |
| } |
| activeTransactionCount.incrementAndGet(); |
| activeTransactions.add(transaction); |
| } |
| } |
| |
| private void promoteActiveTransaction(Transaction transaction) { |
| // Called for a real promote as READ-> WRITE |
| activeReadersCount.decrementAndGet(); |
| activeWritersCount.incrementAndGet(); |
| } |
| |
| private void finishActiveTransaction(Transaction transaction) { |
| synchronized(coordinatorLock) { |
| // Idempotent. |
| boolean x = activeTransactions.remove(transaction); |
| if ( ! x ) |
| return; |
| countFinished.incrementAndGet(); |
| activeTransactionCount.decrementAndGet(); |
| switch(transaction.getMode()) { |
| case READ: activeReadersCount.decrementAndGet(); break ; |
| case WRITE: activeWritersCount.decrementAndGet(); break ; |
| } |
| } |
| exclusivitylock.readLock().unlock(); |
| } |
| |
| public long countActiveReaders() { return activeReadersCount.get(); } |
| public long countActiveWriter() { return activeWritersCount.get(); } |
| public long countActive() { return activeTransactionCount.get(); } |
| |
| // notify*Start/Finish called round each transaction lifecycle step |
| |
| private void notifyBegin(Transaction transaction) { |
| listeners(x -> x.notifyTxnStart(transaction)); |
| } |
| |
| private void notifyEnd(Transaction transaction) { |
| listeners(x -> x.notifyTxnFinish(transaction)); |
| } |
| |
| private void notifyPromoteStart(Transaction transaction) { |
| listeners(x -> x.notifyPromoteStart(transaction)); |
| } |
| |
| private void notifyPromoteFinish(Transaction transaction) { |
| listeners(x -> x.notifyPromoteFinish(transaction)); |
| } |
| |
| private void notifyPrepareStart(Transaction transaction) { |
| listeners(x -> x.notifyPrepareStart(transaction)); |
| } |
| |
| private void notifyPrepareFinish(Transaction transaction) { |
| listeners(x -> x.notifyPrepareFinish(transaction)); |
| } |
| |
| // Writers released here - can happen because of commit() or abort(). |
| |
| private void notifyCommitStart(Transaction transaction) { |
| listeners(x -> x.notifyCommitStart(transaction)); |
| } |
| |
| private void notifyCommitFinish(Transaction transaction) { |
| listeners(x->x.notifyCommitFinish(transaction)); |
| if ( transaction.getMode() == ReadWrite.WRITE ) |
| releaseWriterLock(); |
| } |
| |
| private void notifyAbortStart(Transaction transaction) { |
| listeners(x->x.notifyAbortStart(transaction)); |
| } |
| |
| private void notifyAbortFinish(Transaction transaction) { |
| listeners(x->x.notifyAbortFinish(transaction)); |
| if ( transaction.getMode() == ReadWrite.WRITE ) |
| releaseWriterLock(); |
| } |
| |
| /*package*/ void notifyEndStart(Transaction transaction) { |
| listeners(x->x.notifyEndStart(transaction)); |
| } |
| |
| /*package*/ void notifyEndFinish(Transaction transaction) { |
| listeners(x->x.notifyEndFinish(transaction)); |
| } |
| |
| // Called by Transaction once at the end of first commit()/abort() or end() |
| |
| /*package*/ void notifyCompleteStart(Transaction transaction) { |
| listeners(x -> x.notifyCompleteStart(transaction)); |
| } |
| |
| /*package*/ void notifyCompleteFinish(Transaction transaction) { |
| listeners(x -> x.notifyCompleteFinish(transaction)); |
| } |
| |
| // Coordinator state. |
| private final AtomicLong countBegin = new AtomicLong(0); |
| |
| private final AtomicLong countBeginRead = new AtomicLong(0); |
| |
| private final AtomicLong countBeginWrite = new AtomicLong(0); |
| |
| private final AtomicLong countFinished = new AtomicLong(0); |
| |
| // Access counters |
| public long countBegin() { return countBegin.get(); } |
| |
| public long countBeginRead() { return countBeginRead.get(); } |
| |
| public long countBeginWrite() { return countBeginWrite.get(); } |
| |
| public long countFinished() { return countFinished.get(); } |
| } |