blob: 4f5d554f4a9a85197886a1bb429e784315fbe5f1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jena.system;
import static org.apache.jena.query.ReadWrite.WRITE ;
import java.util.concurrent.Semaphore ;
import java.util.concurrent.atomic.AtomicLong ;
import org.apache.jena.atlas.lib.Lib ;
import org.apache.jena.query.ReadWrite ;
import org.apache.jena.sparql.JenaTransactionException ;
import org.apache.jena.sparql.core.Transactional ;
/** A MR+SW transactional Counter */
public class TxnCounter implements Transactional {
// ---- TransactionCoordinator.
// Semaphore to implement "Single Active Writer" - independent of readers
// This is not reentrant.
private Semaphore writersWaiting = new Semaphore(1, true) ;
private void releaseWriterLock() {
int x = writersWaiting.availablePermits() ;
if ( x != 0 )
throw new JenaTransactionException("TransactionCoordinator: Probably mismatch of enable/disableWriter calls") ;
writersWaiting.release() ;
}
private boolean acquireWriterLock(boolean canBlock) {
if ( ! canBlock )
return writersWaiting.tryAcquire() ;
try {
writersWaiting.acquire() ;
return true;
} catch (InterruptedException e) { throw new JenaTransactionException(e) ; }
}
// ---- TransactionCoordinator.
// Transaction state.
static class IntegerState {
long txnValue ;
public IntegerState(long v) { this.txnValue = v ; }
}
// Global state - the exterally visible value and the starting point for any
// transaction. This is set to a new value when a write transaction commits.
private final AtomicLong value = new AtomicLong(-1712) ;
// ---- Transaction state.
// The per-transaction state (inside a transaction). Null outside a transaction
// cleared by commit or abort in a write transaction.
private ThreadLocal<IntegerState> txnValue = ThreadLocal.withInitial(()->null) ;
// The kind of transaction.
private ThreadLocal<ReadWrite> txnMode = ThreadLocal.withInitial(()->null) ;
// Syncrhonization for making changes.
private Object txnLifecycleLock = new Object() ;
public TxnCounter(long x) {
value.set(x) ;
}
@Override
public void begin(ReadWrite readWrite) {
begin(readWrite, true) ;
}
public void begin(ReadWrite readWrite, boolean canBlock) {
// Ensure a single writer.
// (Readers never block at this point.)
if ( readWrite == WRITE ) {
// Writers take a WRITE permit from the semaphore to ensure there
// is at most one active writer, else the attempt to start the
// transaction blocks.
// Released by in commit/abort.
acquireWriterLock(canBlock) ;
}
// at this point,
// One writer or one of many readers.
synchronized(txnLifecycleLock) {
if ( txnMode.get() != null )
throw new JenaTransactionException("Already in a transaction") ;
IntegerState state = new IntegerState(value.get()) ;
txnValue.set(state) ;
txnMode.set(readWrite);
}
}
@Override
public void commit() {
checkTxn();
if ( isWriteTxn() ) {
// Set global.
value.set(getDataState().txnValue) ;
txnValue.set(null);
releaseWriterLock();
}
endOnce() ;
}
@Override
public void abort() {
checkTxn();
if ( isWriteTxn() ) {
txnValue.set(null);
releaseWriterLock();
}
endOnce() ;
}
@Override
public boolean isInTransaction() {
ReadWrite mode = txnMode.get() ;
if ( mode == null )
// Remove it - avoid holding the memory.
txnMode.remove();
return mode != null ;
}
@Override
public void end() {
if ( ! isInTransaction() )
return ;
if ( isWriteTxn() && txnValue.get() != null )
throw new JenaTransactionException("No commit or abort before end for a write transaction") ;
endOnce() ;
}
private void endOnce() {
if ( isActiveTxn() ) {
synchronized(txnLifecycleLock) {
txnValue.remove();
txnMode.remove();
}
}
}
/** Increment the value inside a write transaction */
public void inc() {
checkWriteTxn() ;
IntegerState ts = getDataState() ;
ts.txnValue++ ;
}
/** Decrement the value inside a write transaction */
public void dec() {
checkWriteTxn() ;
IntegerState ts = getDataState() ;
ts.txnValue-- ;
}
/** Set the value inside a write transaction, return the old value*/
public long set(long x) {
checkWriteTxn() ;
IntegerState ts = getDataState() ;
long v = ts.txnValue ;
ts.txnValue = x ;
return v ;
}
/** Return the current value in a transaction.
* Must be inside a transaction.
* @see #get
*/
public long read() {
checkTxn();
return getDataState().txnValue ;
}
/** Return the current value.
* If inside a transaction, return the transaction view of the value.
* If not in a transaction return the state value (effectively
* a read transaction, optimized by the fact that reading the
* {@code TransInteger} state is atomic).
*/
public long get() {
if ( isActiveTxn() )
return getDataState().txnValue ;
else
return value.get() ;
}
/** Read the current global state (that is, the last committed value) outside a transaction. */
public long value() {
return value.get() ;
}
// These two operations not clear the thread local if we are not in a transaction.
// This is a potential memory leak.
// Use "isInTransaction" to read and clear.
/** Is this a write transaction? Shoudl be called inside a transaction. */
private boolean isWriteTxn() {
ReadWrite rw = txnMode.get() ;
if ( rw == null )
throw new JenaTransactionException(Lib.classShortName(this.getClass())+".isWriteTxn called outside a transaction") ;
return txnMode.get() == ReadWrite.WRITE ;
}
private boolean isActiveTxn() {
ReadWrite rw = txnMode.get() ;
return rw != null ;
}
private IntegerState getDataState() {
return txnValue.get() ;
}
private void checkWriteTxn() {
if ( ! isWriteTxn() )
throw new JenaTransactionException("Not in a write transaction") ;
}
private void checkTxn() {
if ( ! isActiveTxn() )
throw new JenaTransactionException("Not in a transaction") ;
}
}