blob: db091e711832dc67a8abe54c565930d80cea61af [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jena.tdb.transaction;
import static java.lang.String.format ;
import static org.apache.jena.tdb.sys.SystemTDB.syslog ;
import static org.apache.jena.tdb.transaction.TransactionManager.TxnPoint.BEGIN ;
import static org.apache.jena.tdb.transaction.TransactionManager.TxnPoint.CLOSE ;
import java.io.File ;
import java.util.ArrayList ;
import java.util.HashSet ;
import java.util.List ;
import java.util.Set ;
import java.util.concurrent.BlockingQueue ;
import java.util.concurrent.LinkedBlockingDeque ;
import java.util.concurrent.Semaphore ;
import java.util.concurrent.atomic.AtomicLong ;
import java.util.concurrent.atomic.AtomicReference ;
import java.util.concurrent.locks.ReadWriteLock ;
import java.util.concurrent.locks.ReentrantReadWriteLock ;
import org.apache.jena.atlas.lib.Pair ;
import org.apache.jena.atlas.logging.Log ;
import org.apache.jena.query.ReadWrite ;
import org.apache.jena.shared.Lock ;
import org.apache.jena.tdb.store.DatasetGraphTDB ;
import org.apache.jena.tdb.sys.SystemTDB ;
import org.slf4j.Logger ;
import org.slf4j.LoggerFactory ;
public class TransactionManager
{
private static boolean checking = true ;
private static Logger log = LoggerFactory.getLogger(TransactionManager.class) ;
private Set<Transaction> activeTransactions = new HashSet<>() ;
synchronized public boolean activeTransactions() { return !activeTransactions.isEmpty() ; }
// Setting this true cause the TransactionManager to keep lists of transactions
// and what has happened. Nothing is thrown away, but eventually it will
// consume too much memory.
// Record happenings.
private boolean recordHistory = false ;
/** This controls how many write transactions we batch up before
* deciding to flush the journal to the main database.
*/
// Temporarily public ....
// When improved, rename to chase down any systems directly setting it.
public static /*final*/ int QueueBatchSize = setQueueBatchSize() ;
private static int setQueueBatchSize() {
if ( SystemTDB.is64bitSystem )
return 10 ;
// On 32bit systems are memory constrained. The Java address space is
// limited to about 1.5G - the heap can not be bigger.
// So we don't do batching (change if batching is less memory hungry).
return 0 ;
}
// Records the states that a transaction goes though.
enum TxnPoint { BEGIN, COMMIT, ABORT, CLOSE, QUEUE, UNQUEUE }
private List<Pair<Transaction, TxnPoint>> transactionStateTransition ;
private void record(Transaction txn, TxnPoint state) {
if ( !recordHistory )
return ;
initRecordingState() ;
transactionStateTransition.add(new Pair<>(txn, state)) ;
}
// Statistic variables to record the maximum length of the flush queue.
int maxQueue = 0 ;
List<Transaction> commitedAwaitingFlush = new ArrayList<>() ;
static AtomicLong transactionId = new AtomicLong(1) ;
// Accessed by SysTxnState
// These must be AtomicLong
/*package*/ AtomicLong activeReaders = new AtomicLong(0) ;
/*package*/ AtomicLong activeWriters = new AtomicLong(0) ; // 0 or 1
public long getCountActiveReaders() { return activeReaders.get() ; }
public long getCountActiveWriters() { return activeWriters.get() ; }
// Misc stats (should be LongAdder / Java8?)
/*package*/ AtomicLong finishedReaders = new AtomicLong(0) ;
/*package*/ AtomicLong committedWriters = new AtomicLong(0) ;
/*package*/ AtomicLong abortedWriters = new AtomicLong(0) ;
// This is the DatasetGraphTDB for the first read-transaction created for
// a particular view. The read DatasetGraphTDB can be used by all the readers
// seeing the same view.
// A write transaction clears this when it commits; the first reader of a
// particular state creates the view datasetgraph and sets the lastreader.
private AtomicReference<DatasetGraphTDB> currentReaderView = new AtomicReference<>(null) ;
// Ensure single writer.
private Semaphore writersWaiting = new Semaphore(1, true) ;
// All transactions need a "read" lock throughout their lifetime.
// Do not confuse with read/write transactions. We need a
// "one exclusive, or many other" lock which happens to be called a ReadWriteLock
private ReadWriteLock exclusivitylock = new ReentrantReadWriteLock() ;
// Delayes enacting transactions.
private BlockingQueue<Transaction> queue = new LinkedBlockingDeque<>() ;
private DatasetGraphTDB baseDataset ;
private Journal journal ;
/*
* The order of calls is:
* 1/ transactionStarts
* 2/ readerStarts or writerStarts
* 3/ readerFinishes or writerCommits or writerAborts
* 4/ transactionFinishes
* 5/ transactionCloses
*/
private interface TSM {
void transactionStarts(Transaction txn) ;
void transactionFinishes(Transaction txn) ;
void transactionCloses(Transaction txn) ;
void readerStarts(Transaction txn) ;
void readerFinishes(Transaction txn) ;
void writerStarts(Transaction txn) ;
void writerCommits(Transaction txn) ;
void writerAborts(Transaction txn) ;
}
class TSM_Base implements TSM {
@Override public void transactionStarts(Transaction txn) {}
@Override public void transactionFinishes(Transaction txn) {}
@Override public void transactionCloses(Transaction txn) {}
@Override public void readerStarts(Transaction txn) {}
@Override public void readerFinishes(Transaction txn) {}
@Override public void writerStarts(Transaction txn) {}
@Override public void writerCommits(Transaction txn) {}
@Override public void writerAborts(Transaction txn) {}
}
class TSM_Logger extends TSM_Base {
TSM_Logger() {}
@Override public void readerStarts(Transaction txn) { log("start", txn) ; }
@Override public void readerFinishes(Transaction txn) { log("finish", txn) ; }
@Override public void writerStarts(Transaction txn) { log("begin", txn) ; }
@Override public void writerCommits(Transaction txn) { log("commit", txn) ; }
@Override public void writerAborts(Transaction txn) { log("abort", txn) ; }
}
/** More detailed */
class TSM_LoggerDebug extends TSM_Base {
TSM_LoggerDebug() {}
@Override public void readerStarts(Transaction txn) { logInternal("start", txn) ; }
@Override public void readerFinishes(Transaction txn) { logInternal("finish", txn) ; }
@Override public void writerStarts(Transaction txn) { logInternal("begin", txn) ; }
@Override public void writerCommits(Transaction txn) { logInternal("commit", txn) ; }
@Override public void writerAborts(Transaction txn) { logInternal("abort", txn) ; }
}
// Mixes stats and state variables :-(
class TSM_Counters implements TSM {
TSM_Counters() {}
@Override public void transactionStarts(Transaction txn) { activeTransactions.add(txn) ; }
@Override public void transactionFinishes(Transaction txn) { activeTransactions.remove(txn) ; }
@Override public void transactionCloses(Transaction txn) { }
@Override public void readerStarts(Transaction txn) { inc(activeReaders) ; }
@Override public void readerFinishes(Transaction txn) { dec(activeReaders) ; inc(finishedReaders); }
@Override public void writerStarts(Transaction txn) { inc(activeWriters) ; }
@Override public void writerCommits(Transaction txn) { dec(activeWriters) ; inc(committedWriters) ; }
@Override public void writerAborts(Transaction txn) { dec(activeWriters) ; inc(abortedWriters) ; }
}
// Short name: x++
static long inc(AtomicLong x) { return x.getAndIncrement() ; }
// Short name: --x
static long dec(AtomicLong x) { return x.decrementAndGet() ; }
// Transaction policy:
// TSM + WriterEnters, WriterLeaves which may use the semaphore. (+ReaderEnters, ReaderLeaves ??)
// Policy for writing back journal'ed data to the base datasetgraph
// Writes if no reader at end of writer, else queues.
// Queue cleared at en dof any transaction finding itself the only transaction.
class TSM_WriteBackEndTxn extends TSM_Base {
// Safe mode.
// Take a READ lock over the base dataset.
// Write-back takes a WRITE lock.
@Override public void readerStarts(Transaction txn) { txn.getBaseDataset().getLock().enterCriticalSection(Lock.READ) ; }
@Override public void writerStarts(Transaction txn) { txn.getBaseDataset().getLock().enterCriticalSection(Lock.READ) ; }
// Currently, the writer semaphore is managed explicitly in the main code.
@Override
public void readerFinishes(Transaction txn) {
txn.getBaseDataset().getLock().leaveCriticalSection() ;
readerFinishesWorker(txn) ;
}
@Override
public void writerCommits(Transaction txn) {
txn.getBaseDataset().getLock().leaveCriticalSection() ;
writerCommitsWorker(txn) ;
}
@Override
public void writerAborts(Transaction txn) {
txn.getBaseDataset().getLock().leaveCriticalSection() ;
writerAbortsWorker(txn) ;
}
}
// Policy for writing back that always writes from the writer by using a
// MRSW lock, with a write step at the end of the writer.
// Always a read loc for any active transaction (reader or the writer)
// Still use semaphore for writer entry control.
class TSM_WriterWriteBack extends TSM_Base
{
// Not implemented
}
// Policy for writing where a transaction takes an MRSW at the start.
// Semaphore for writer entry unnecessary.
class TSM_MRSW_Writer extends TSM_Base
{
// Not implemented
}
class TSM_Record extends TSM_Base {
// Later - record on one list the state transition.
@Override
public void transactionStarts(Transaction txn) { record(txn, BEGIN) ; }
@Override
public void transactionFinishes(Transaction txn) { record(txn, CLOSE) ; }
}
private TSM[] actions = new TSM[] {
new TSM_Counters() , // Must be first.
//new TSM_LoggerDebug() ,
new TSM_Logger() ,
(recordHistory ? new TSM_Record() : null ) ,
new TSM_WriteBackEndTxn() // Write back policy. Must be last.
} ;
public TransactionManager(DatasetGraphTDB dsg){
this.baseDataset = dsg ;
this.journal = Journal.create(dsg.getLocation()) ;
// LATER
// Committer c = new Committer() ;
// this.committerThread = new Thread(c) ;
// committerThread.setDaemon(true) ;
// committerThread.start() ;
}
public void closedown() {
processDelayedReplayQueue(null) ;
journal.close() ;
}
public DatasetGraphTxn begin(ReadWrite mode) {
return begin(mode, null) ;
}
public /*for testing only*/ static final boolean DEBUG = false ;
public DatasetGraphTxn begin(ReadWrite mode, String label) {
exclusivitylock.readLock().lock() ;
// Not synchronized (else blocking on semaphore will never wake up
// because Semaphore.release is inside synchronized.
// Allow only one active writer.
if ( mode == ReadWrite.WRITE ) {
// Writers take a WRITE permit from the semaphore to ensure there
// is at most one active writer, else the attempt to start the
// transaction blocks.
acquireWriterLock(true) ;
}
// entry synchronized part
return begin$(mode, label) ;
}
// If DatasetGraphTransaction has a sync lock on sConn, this
// does not need to be sync'ed. But it's possible to use some
// of the low level object directly so we'll play safe.
synchronized
private DatasetGraphTxn begin$(ReadWrite mode, String label) {
if ( mode == ReadWrite.WRITE && activeWriters.get() > 0 ) // Guard
throw new TDBTransactionException("Existing active write transaction") ;
if ( DEBUG )
switch ( mode )
{
case READ : System.out.print("r") ; break ;
case WRITE : System.out.print("w") ; break ;
}
DatasetGraphTDB dsg = baseDataset ;
// *** But, if there are pending, committed transactions, use latest.
if ( !commitedAwaitingFlush.isEmpty() ) {
if ( DEBUG )
System.out.print(commitedAwaitingFlush.size()) ;
dsg = commitedAwaitingFlush.get(commitedAwaitingFlush.size() - 1).getActiveDataset().getView() ;
} else {
if ( DEBUG )
System.out.print('_') ;
}
Transaction txn = createTransaction(dsg, mode, label) ;
log("begin$", txn) ;
DatasetGraphTxn dsgTxn = createDSGTxn(dsg, txn, mode) ;
txn.setActiveDataset(dsgTxn) ;
// Empty for READ ; only WRITE transactions have components that need notifiying.
List<TransactionLifecycle> components = dsgTxn.getTransaction().lifecycleComponents() ;
if ( mode == ReadWrite.READ ) {
// ---- Consistency check. View caching does not reset components.
if ( components.size() != 0 )
log.warn("read transaction, non-empty lifecycleComponents list") ;
}
for ( TransactionLifecycle component : components )
component.begin(dsgTxn.getTransaction()) ;
noteStartTxn(txn) ;
return dsgTxn ;
}
private Transaction createTransaction(DatasetGraphTDB dsg, ReadWrite mode, String label) {
Transaction txn = new Transaction(dsg, mode, transactionId.getAndIncrement(), label, this) ;
return txn ;
}
private DatasetGraphTxn createDSGTxn(DatasetGraphTDB dsg, Transaction txn, ReadWrite mode) {
// A read transaction (if it has no lifecycle components) can be shared over all
// read transactions at the same commit level.
// lastreader
if ( mode == ReadWrite.READ ) {
// If a READ transaction, and a previously built one is cached, use it.
DatasetGraphTDB dsgCached = currentReaderView.get() ;
if ( dsgCached != null ) {
// No components so we don't need to notify them.
// We can just reuse the storage dataset.
return new DatasetGraphTxn(dsgCached, txn) ;
}
}
DatasetGraphTxn dsgTxn = new DatasetBuilderTxn(this).build(txn, mode, dsg) ;
if ( mode == ReadWrite.READ ) {
// If a READ transaction, cache the storage view.
// This is cleared when a WRITE commits
currentReaderView.set(dsgTxn.getView()) ;
}
return dsgTxn ;
}
/* Signal a transaction has commited. The journal has a commit record
* and a sync to disk. The code here manages the inter-transaction stage
* of deciding how to play the changes back to the base data
* together with general recording of transaction details and status.
*/
synchronized
public void notifyCommit(Transaction transaction) {
if ( ! activeTransactions.contains(transaction) )
SystemTDB.errlog.warn("Transaction not active: "+transaction.getTxnId()) ;
noteTxnCommit(transaction) ;
switch ( transaction.getMode() ) {
case READ: break ;
case WRITE:
currentReaderView.set(null) ; // Clear the READ transaction cache.
releaseWriterLock();
}
}
synchronized
public void notifyAbort(Transaction transaction) {
// Transaction has done the abort on all the transactional elements.
if ( ! activeTransactions.contains(transaction) )
SystemTDB.errlog.warn("Transaction not active: "+transaction.getTxnId()) ;
noteTxnAbort(transaction) ;
switch ( transaction.getMode() )
{
case READ: break ;
case WRITE: releaseWriterLock();
}
}
private void releaseWriterLock() {
int x = writersWaiting.availablePermits() ;
if ( x != 0 )
throw new TDBTransactionException("TransactionCoordinator: Probably mismatch of enable/disableWriter calls") ;
writersWaiting.release() ;
}
private boolean acquireWriterLock(boolean canBlock) {
if ( ! canBlock )
return writersWaiting.tryAcquire() ;
try {
writersWaiting.acquire() ;
return true;
} catch (InterruptedException e) { throw new TDBTransactionException(e) ; }
}
/** Block until no writers are active.
* When this returns, yhis guarantees that the database is not changing
* and the jounral is flush to disk.
* <p>
* The application must call {@link #enableWriters} later.
* <p>
* This operation must not be nested (it will block).
*
* @see #tryBlockWriters()
* @see #enableWriters()
*
*/
public void blockWriters() {
acquireWriterLock(true) ;
}
/** Block until no writers are active or, optionally, return if can't at the moment.
* Return 'true' if the operation succeeded.
* <p>
* If it returns true, the application must call {@link #enableWriters} later.
*
* @see #blockWriters()
* @see #enableWriters()
*/
public boolean tryBlockWriters() {
return acquireWriterLock(false) ;
}
/** Allow writers.
* This must be used in conjunction with {@link #blockWriters()} or {@link #tryBlockWriters()}
*
* @see #blockWriters()
* @see #tryBlockWriters()
*/
public void enableWriters() {
releaseWriterLock();
}
/** Enter exclusive mode.
* <p>
* There are no active transactions on return; new transactions will be held up in 'begin'.
* Return to normal (release waiting transactions, allow new transactions)
* with {@link #finishExclusiveMode}.
* <p>
* The caller must not be inside a transaction associated with this TransactionManager.
* (The call will block waiting for that transaction to finish.)
*/
public void startExclusiveMode() {
startExclusiveMode(true);
}
/** Try to enter exclusive mode.
* If return is true, then are no active transactions on return and new transactions will be held up in 'begin'.
* If false, there is an in-progress transactions.
* Return to normal (release waiting transactions, allow new transactions)
* with {@link #finishExclusiveMode}.
* <p>
* The call must not itself be in a transaction (this call wil return false).
*/
public boolean tryExclusiveMode() {
return startExclusiveMode(false);
}
private boolean startExclusiveMode(boolean canBlock) {
if ( canBlock ) {
exclusivitylock.writeLock().lock() ;
processDelayedReplayQueue(null);
return true ;
}
boolean b = exclusivitylock.writeLock().tryLock() ;
if ( ! b ) return false ;
processDelayedReplayQueue(null);
return true ;
}
/** Return the exclusivity lock. Testing and internal use only. */
public ReadWriteLock getExclusivityLock$() { return exclusivitylock ; }
/** Return to normal (release waiting transactions, allow new transactions).
* Must be paired with an earlier {@link #startExclusiveMode}.
*/
public void finishExclusiveMode() {
exclusivitylock.writeLock().unlock() ;
}
/** The stage in a commit after committing - make the changes permanent in the base data */
private void enactTransaction(Transaction transaction)
{
// Really, really do it!
for ( TransactionLifecycle x : transaction.lifecycleComponents() )
{
x.commitEnact(transaction) ;
x.commitClearup(transaction) ;
}
transaction.signalEnacted() ;
}
/** Try to flush the delayed write queue - only happens if there are no active transactions */
synchronized
public void flush() {
processDelayedReplayQueue(null) ;
}
// -- The main operations to undertake when a transaction finishes.
// Called from TSM_WriteBackEndTxn but the worker code is shere so all
// related code, including queue flushing is close together.
private void readerFinishesWorker(Transaction txn) {
if ( queue.size() >= QueueBatchSize )
processDelayedReplayQueue(txn) ;
}
private void writerAbortsWorker(Transaction txn) {
if ( queue.size() >= QueueBatchSize )
processDelayedReplayQueue(txn) ;
}
private void writerCommitsWorker(Transaction txn) {
if ( activeReaders.get() == 0 && queue.size() >= QueueBatchSize ) {
// Can commit immediately.
// Ensure the queue is empty though.
// Could simply add txn to the commit queue and do it that way.
if ( log() ) log("Commit immediately", txn) ;
// Currently, all we need is
// JournalControl.replay(txn) ;
// because that plays queued transactions.
// But for long term generallity, at the cost of one check of the journal size
// we do this sequence.
processDelayedReplayQueue(txn) ;
enactTransaction(txn) ;
JournalControl.replay(txn) ;
} else {
// Can't write back to the base database at the moment.
commitedAwaitingFlush.add(txn) ;
maxQueue = Math.max(commitedAwaitingFlush.size(), maxQueue) ;
if ( log() ) log("Add to pending queue", txn) ;
queue.add(txn) ;
}
}
private void processDelayedReplayQueue(Transaction txn)
{
// Can we do work?
if ( activeReaders.get() != 0 || activeWriters.get() != 0 ) {
if ( queue.size() > 0 && log() )
log(format("Pending transactions: R=%s / W=%s", activeReaders, activeWriters), txn) ;
return ;
}
if ( DEBUG ) {
if ( queue.size() > 0 )
System.out.print("!"+queue.size()+"!") ;
}
if ( DEBUG ) checkNodesDatJrnl("1", txn) ;
if ( queue.size() == 0 && txn != null )
// Nothing to do - journal should be empty.
return ;
if ( log() )
log("Start flush delayed commits", txn) ;
// Drop the cached reader view so that next time it is recreated
// against the updated database.
currentReaderView.set(null) ;
while (queue.size() > 0) {
// Currently, replay is replay everything
// so looping on a per-transaction basis is
// pointless but harmless.
try {
Transaction txn2 = queue.take() ;
if ( txn2.getMode() == ReadWrite.READ )
continue ;
if ( log() )
log(" Flush delayed commit of "+txn2.getLabel(), txn) ;
if ( DEBUG ) checkNodesDatJrnl("2", txn) ;
checkReplaySafe() ;
enactTransaction(txn2) ;
commitedAwaitingFlush.remove(txn2) ;
} catch (InterruptedException ex)
{ Log.fatal(this, "Interruped!", ex) ; }
}
checkReplaySafe() ;
if ( DEBUG ) checkNodesDatJrnl("3", txn) ;
// Whole journal to base database
JournalControl.replay(journal, baseDataset) ;
if ( DEBUG ) checkNodesDatJrnl("4", txn) ;
checkReplaySafe() ;
if ( log() )
log("End flush delayed commits", txn) ;
}
private void checkNodesDatJrnl(String label, Transaction txn) {
if ( txn != null ) {
String x = txn.getBaseDataset().getLocation().getPath(label + ": nodes.dat-jrnl") ;
long len = new File(x).length() ;
if ( len != 0 )
log("nodes.dat-jrnl: not empty", txn) ;
}
}
private void checkReplaySafe() {
if ( ! checking ) return ;
if ( activeReaders.get() != 0 || activeWriters.get() != 0 )
log.error("There are now active transactions") ;
}
synchronized
public void notifyClose(Transaction txn) {
if ( txn.getState() == TxnState.ACTIVE )
{
String x = txn.getBaseDataset().getLocation().getDirectoryPath() ;
syslog.warn("close: Transaction not commited or aborted: Transaction: "+txn.getTxnId()+" @ "+x) ;
// Force abort then close
txn.abort() ;
txn.close() ;
return ;
}
noteTxnClose(txn) ;
}
private void noteStartTxn(Transaction transaction) {
switch (transaction.getMode())
{
case READ : readerStarts(transaction) ; break ;
case WRITE : writerStarts(transaction) ; break ;
}
transactionStarts(transaction) ;
}
private void noteTxnCommit(Transaction transaction) {
switch (transaction.getMode())
{
case READ : readerFinishes(transaction) ; break ;
case WRITE : writerCommits(transaction) ; break ;
}
transactionFinishes(transaction) ;
exclusivitylock.readLock().unlock() ;
}
private void noteTxnAbort(Transaction transaction) {
switch (transaction.getMode())
{
case READ : readerFinishes(transaction) ; break ;
case WRITE : writerAborts(transaction) ; break ;
}
transactionFinishes(transaction) ;
exclusivitylock.readLock().unlock() ;
}
private void noteTxnClose(Transaction transaction) {
transactionCloses(transaction) ;
}
// ---- Recording
/** Get recording state */
public boolean recording() { return recordHistory ; }
/** Set recording on or off */
public void recording(boolean flag) {
recordHistory = flag ;
if ( recordHistory )
initRecordingState() ;
}
/** Clear all recording state - does not clear stats */
public void clearRecordingState() {
initRecordingState() ;
transactionStateTransition.clear() ;
}
private void initRecordingState() {
if ( transactionStateTransition == null )
transactionStateTransition = new ArrayList<>() ;
}
public Journal getJournal() {
return journal ;
}
// ---- Logging
// Choose log output once when this object is created.
private final boolean logstate = (syslog.isDebugEnabled() || log.isDebugEnabled()) ;
private boolean log() {
return logstate ;
}
private void log(String msg, Transaction txn) {
if ( ! log() )
return ;
if ( txn == null )
logger().debug("<No txn>: "+msg) ;
else
logger().debug(txn.getLabel()+": "+msg) ;
}
private void logInternal(String action, Transaction txn) {
if ( ! log() )
return ;
String txnStr = ( txn == null ) ? "<null>" : txn.getLabel() ;
//System.err.printf(format("%6s %s -- %s", action, txnStr, state())) ;
logger().debug(format("%6s %s -- %s", action, txnStr, state())) ;
}
private static Logger logger() {
if ( syslog.isDebugEnabled() )
return syslog ;
else
return log ;
}
synchronized
public SysTxnState state() {
return new SysTxnState(this) ;
}
// LATER.
class Committer implements Runnable {
@Override
public void run() {
for ( ;; ) {
// Wait until the reader count goes to zero.
// This wakes up for every transation but maybe
// able to play several transactions at once (later).
try {
Transaction txn = queue.take() ;
// This takes a Write lock on the DSG - this is where it blocks.
JournalControl.replay(txn) ;
synchronized (TransactionManager.this) {
commitedAwaitingFlush.remove(txn) ;
}
} catch (InterruptedException ex)
{ Log.fatal(this, "Interruped!", ex) ; }
}
}
}
private void transactionStarts(Transaction txn) {
for ( TSM tsm : actions )
if ( tsm != null )
tsm.transactionStarts(txn) ;
}
private void transactionFinishes(Transaction txn) {
for ( TSM tsm : actions )
if ( tsm != null )
tsm.transactionFinishes(txn) ;
}
private void transactionCloses(Transaction txn) {
for ( TSM tsm : actions )
if ( tsm != null )
tsm.transactionCloses(txn) ;
}
private void readerStarts(Transaction txn) {
for ( TSM tsm : actions )
if ( tsm != null )
tsm.readerStarts(txn) ;
}
private void readerFinishes(Transaction txn) {
for ( TSM tsm : actions )
if ( tsm != null )
tsm.readerFinishes(txn) ;
}
private void writerStarts(Transaction txn) {
for ( TSM tsm : actions )
if ( tsm != null )
tsm.writerStarts(txn) ;
}
private void writerCommits(Transaction txn) {
for ( TSM tsm : actions )
if ( tsm != null )
tsm.writerCommits(txn) ;
}
private void writerAborts(Transaction txn) {
for ( TSM tsm : actions )
if ( tsm != null )
tsm.writerAborts(txn) ;
}
}