// blob: 196e079089f829de9d06a1bdad0edcca65905976 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jena.dboe.transaction;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.jena.atlas.io.IO;
import org.apache.jena.atlas.lib.FileOps;
import org.apache.jena.atlas.lib.InternalErrorException;
import org.apache.jena.atlas.logging.Log;
import org.apache.jena.dboe.transaction.txn.ComponentId;
import org.apache.jena.dboe.transaction.txn.TransactionalComponentLifecycle;
import org.apache.jena.dboe.transaction.txn.TxnId;
import org.apache.jena.query.ReadWrite;
/** A transaction component with an integer supporting MR+SW (=one writer AND many readers).
*/
public class TransInteger extends TransactionalComponentLifecycle<TransInteger.IntegerState> {
private final AtomicLong value = new AtomicLong(-1712);
private final String filename;
/** Per transaction state - and per thread safe because we subclass
* TransactionalComponentLifecycle
*/
static class IntegerState {
long txnValue;
public IntegerState(long v) { this.txnValue = v; }
}
static int counter = 0;
/** In-memory, non persistent, transactional integer */
public TransInteger() { this(0L); }
/** In-memory, non persistent, transactional integer */
public TransInteger(long v) {
this(v, ComponentId.allocLocal());
}
/** In-memory, non persistent, transactional integer */
public TransInteger(long v, ComponentId componentId) {
this(null, componentId);
value.set(v);
}
/** Persistent, transactional integer. The persistent state is held in
* filename. When first initialized, the value is 0L.
* @param filename Persistent state
* @param cid Component id
*/
public TransInteger(String filename, ComponentId cid) {
super(cid);
this.filename = filename;
// Set the value now for "fast read" transactions.
readLocation();
}
private void readLocation() {
if ( filename != null ) {
if ( ! FileOps.exists(filename) ) {
value.set(0L);
writeLocation();
return;
}
long x = read(filename);
value.set(x);
}
}
private void writeLocation() {
writeLocation(value.get());
}
private void writeLocation(long value) {
if ( filename != null ) {
write(filename, value);
}
}
//-- Read/write the value
// This should really be checksum'ed or other internal check to make sure IO worked.
private static long read(String filename) {
try {
String str = IO.readWholeFileAsUTF8(filename);
if ( str.endsWith("\n") ) {
str = str.substring(0, str.length()-1);
}
str = str.trim();
return Long.parseLong(str);
}
catch (IOException ex) {
Log.error(TransInteger.class, "IOException: " + ex.getMessage(), ex);
IO.exception(ex);
}
catch (NumberFormatException ex) {
Log.error(TransInteger.class, "NumberformatException: " + ex.getMessage());
throw new InternalErrorException(ex);
}
// Not reached.
return Long.MIN_VALUE;
}
private static void write(String filename, long value) {
try { IO.writeStringAsUTF8(filename, Long.toString(value)); }
catch (IOException ex) {}
catch (NumberFormatException ex) {}
}
private boolean recoveryAction = false;
@Override
public void startRecovery() {
recoveryAction = false;
}
@Override
public void recover(ByteBuffer ref) {
long x = ref.getLong();
value.set(x);
recoveryAction = true;
}
@Override
public void finishRecovery() {
if ( recoveryAction )
writeLocation();
// Leave true as a record.
}
@Override
public void cleanStart() {
recoveryAction = false;
}
/** Set the value, return the old value*/
public void inc() {
requireWriteTxn();
IntegerState ts = getDataState();
ts.txnValue++;
}
/** Set the value, return the old value*/
public long set(long x) {
requireWriteTxn();
IntegerState ts = getDataState();
long v = ts.txnValue;
ts.txnValue = x;
return v;
}
/** Return the current value in a transaction. s*/
public long read() {
checkTxn();
return getDataState().txnValue;
}
/** Return the current value.
* If inside a transaction, return the transaction view of the value.
* If not in a transaction return the state value (effectively
* a read transaction, optimized by the fact that reading the
* {@code TransInteger} state is atomic).
*/
public long get() {
if ( super.isActiveTxn() )
return getDataState().txnValue;
else
return value.get();
}
/** Read the current global state (that is, the last committed value) outside a transaction. */
public long value() {
return value.get();
}
@Override
protected IntegerState _begin(ReadWrite readWrite, TxnId txnId) {
return createState();
}
private IntegerState createState() {
return new IntegerState(value.get());
}
@Override
protected IntegerState _promote(TxnId txnId, IntegerState state) {
return createState();
}
@Override
protected ByteBuffer _commitPrepare(TxnId txnId, IntegerState state) {
if ( isReadTxn() )
return null;
ByteBuffer x = ByteBuffer.allocate(Long.BYTES);
x.putLong(state.txnValue);
return x;
}
@Override
protected void _commit(TxnId txnId, IntegerState state) {
if ( isReadTxn() )
return;
writeLocation(state.txnValue);
}
@Override
protected void _commitEnd(TxnId txnId, IntegerState state) {
if ( isReadTxn() )
return;
value.set(state.txnValue);
}
@Override
protected void _abort(TxnId txnId, IntegerState state) {
// Nothing
}
@Override
protected void _complete(TxnId txnId, IntegerState state) {
// Nothing
}
@Override
protected void _shutdown() {
}
@Override
public String toString() {
return String.valueOf(super.getComponentId());
}
}