blob: b2d0a7a993405813cde633851c0dc29a9dd5bdf9 [file] [log] [blame]
/*-
* Copyright (C) 2002, 2018, Oracle and/or its affiliates. All rights reserved.
*
* This file was distributed by Oracle as part of a version of Oracle Berkeley
* DB Java Edition made available at:
*
* http://www.oracle.com/technetwork/database/database-technologies/berkeleydb/downloads/index.html
*
* Please see the LICENSE file included in the top-level directory of the
* appropriate version of Oracle Berkeley DB Java Edition for a copy of the
* license and additional information.
*/
package com.sleepycat.je;
import static com.sleepycat.je.dbi.SequenceStatDefinition.SEQUENCE_CACHED_GETS;
import static com.sleepycat.je.dbi.SequenceStatDefinition.SEQUENCE_CACHE_LAST;
import static com.sleepycat.je.dbi.SequenceStatDefinition.SEQUENCE_CACHE_SIZE;
import static com.sleepycat.je.dbi.SequenceStatDefinition.SEQUENCE_CACHE_VALUE;
import static com.sleepycat.je.dbi.SequenceStatDefinition.SEQUENCE_GETS;
import static com.sleepycat.je.dbi.SequenceStatDefinition.SEQUENCE_RANGE_MAX;
import static com.sleepycat.je.dbi.SequenceStatDefinition.SEQUENCE_RANGE_MIN;
import static com.sleepycat.je.dbi.SequenceStatDefinition.SEQUENCE_STORED_VALUE;
import java.io.Closeable;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.logging.Level;
import java.util.logging.Logger;
import com.sleepycat.je.dbi.SequenceStatDefinition;
import com.sleepycat.je.log.LogUtils;
import com.sleepycat.je.txn.Locker;
import com.sleepycat.je.txn.LockerFactory;
import com.sleepycat.je.utilint.IntStat;
import com.sleepycat.je.utilint.LoggerUtils;
import com.sleepycat.je.utilint.LongStat;
import com.sleepycat.je.utilint.StatGroup;
/**
* A Sequence handle is used to manipulate a sequence record in a
* database. Sequence handles are opened using the {@link
* com.sleepycat.je.Database#openSequence Database.openSequence} method.
*/
public class Sequence implements Closeable {
/*
 * Bit flags packed into the single flags byte of the sequence record.
 * FLAG_INCR mirrors the increment field, FLAG_WRAP mirrors wrapAllowed and
 * FLAG_OVER mirrors overflow.
 */
private static final byte FLAG_INCR = ((byte) 0x1);
private static final byte FLAG_WRAP = ((byte) 0x2);
private static final byte FLAG_OVER = ((byte) 0x4);
/* Allocation size, in bytes, of the buffer used to build the record data. */
private static final int MAX_DATA_SIZE = 50;
/*
 * Version of the format for fields stored in the sequence record. It is
 * written as the first byte of the record; records with a version less than
 * 1 are read with unpacked longs.
 */
private static final byte CURRENT_VERSION = 1;
/* A sequence is a unique record in a database. */
private final Database db;
/* Private copy of the key that identifies the sequence record. */
private final DatabaseEntry key;
/* Persistent fields: read from and written to the sequence record. */
private boolean wrapAllowed;
private boolean increment;
private boolean overflow;
private long rangeMin;
private long rangeMax;
private long storedValue;
/* Handle-specific fields: never stored in the sequence record. */
private final int cacheSize;
private long cacheValue;
private long cacheLast;
/* Counters reported by getStats: all gets, and gets served from the cache. */
private int nGets;
private int nCachedGets;
/*
 * Transaction config passed when obtaining a writable locker; null means
 * the environment's default transaction config is used.
 */
private TransactionConfig autoCommitConfig;
private final Logger logger;
/*
* The cache holds the range of values [cacheValue, cacheLast], which is
* the same as [cacheValue, storedValue) at the time the record is written.
* At store time, cacheLast is set to one before storedValue for an
* incrementing sequence, or one after storedValue for a decrementing one.
*
* storedValue may be used by other Sequence handles with separate caches.
* storedValue is always the next value to be returned by any handle that
* runs out of cached values.
*/
/**
 * Opens a sequence handle, adding the sequence record if appropriate.
 *
 * <p>The record is first looked up with a readable locker. If it is absent
 * and creation is allowed, the readable locker is exchanged for a writable
 * one when necessary, the record is inserted with putNoOverwrite, and the
 * persistent fields are then re-read from the database.</p>
 *
 * @param db the database containing (or to contain) the sequence record.
 *
 * @param txn the transaction passed to the locker factory; may be null.
 *
 * @param key the key of the sequence record; a private copy is retained.
 *
 * @param config the sequence configuration, or null to use
 * SequenceConfig.DEFAULT.
 *
 * @throws SequenceNotFoundException if the record does not exist and
 * AllowCreate is false.
 *
 * @throws SequenceExistsException if the record exists and both AllowCreate
 * and ExclusiveCreate are true.
 *
 * @throws UnsupportedOperationException if the database is configured for
 * sorted duplicates.
 *
 * @throws IllegalArgumentException via Database.openSequence.
 *
 * @throws IllegalStateException via Database.openSequence.
 */
Sequence(Database db,
         Transaction txn,
         DatabaseEntry key,
         SequenceConfig config)
    throws SequenceNotFoundException, SequenceExistsException {

    if (db.getDbImpl().getSortedDuplicates()) {
        throw new UnsupportedOperationException
            ("Sequences not supported in databases configured for " +
             "duplicates");
    }

    SequenceConfig useConfig = (config != null) ?
        config : SequenceConfig.DEFAULT;

    if (useConfig.getRangeMin() >= useConfig.getRangeMax()) {
        throw new IllegalArgumentException
            ("Minimum sequence value must be less than the maximum");
    }

    if (useConfig.getInitialValue() > useConfig.getRangeMax() ||
        useConfig.getInitialValue() < useConfig.getRangeMin()) {
        throw new IllegalArgumentException
            ("Initial sequence value is out of range");
    }

    /*
     * rangeMax - cacheSize underflows (wraps to a large positive value) when
     * rangeMax lies within cacheSize of Long.MIN_VALUE. In that case the
     * range is necessarily smaller than the cache, so it is rejected
     * explicitly rather than relying on the wrapped subtraction.
     */
    final int configCacheSize = useConfig.getCacheSize();
    final boolean differenceUnderflows =
        configCacheSize > 0 &&
        useConfig.getRangeMax() < Long.MIN_VALUE + configCacheSize;
    if (differenceUnderflows ||
        useConfig.getRangeMin() >
        useConfig.getRangeMax() - configCacheSize) {
        throw new IllegalArgumentException
            ("The cache size is larger than the sequence range");
    }

    if (useConfig.getAutoCommitNoSync()) {
        autoCommitConfig =
            DbInternal.getDefaultTxnConfig(db.getEnvironment()).clone();
        autoCommitConfig.overrideDurability(Durability.COMMIT_NO_SYNC);
    } else {
        /* Use the environment's default transaction config. */
        autoCommitConfig = null;
    }

    this.db = db;
    this.key = copyEntry(key);
    logger = db.getEnvironment().getNonNullEnvImpl().getLogger();

    /* Perform an auto-commit transaction to create the sequence. */
    Locker locker = null;
    Cursor cursor = null;
    OperationStatus status = OperationStatus.NOTFOUND;
    try {
        locker = LockerFactory.getReadableLocker(
            db, txn, false /*readCommitedIsolation*/);
        cursor = new Cursor(db, locker, null);

        boolean sequenceExists = readData(cursor, null);
        boolean isWritableLocker = !db.getConfig().getTransactional() ||
            (locker.isTransactional() &&
             !DbInternal.getNonNullEnvImpl(db.getEnvironment()).
             isReplicated());

        if (sequenceExists) {
            if (useConfig.getAllowCreate() &&
                useConfig.getExclusiveCreate()) {
                throw new SequenceExistsException
                    ("ExclusiveCreate=true and the sequence record " +
                     "already exists.");
            }
        } else {
            if (useConfig.getAllowCreate()) {
                if (!isWritableLocker) {

                    /*
                     * Exchange the readable locker for a writable one.
                     * Each reference is detached before it is released so
                     * that, if anything below throws, the finally block
                     * does not close the cursor or end the locker a second
                     * time.
                     */
                    if (cursor != null) {
                        final Cursor readCursor = cursor;
                        cursor = null;
                        readCursor.close();
                    }
                    final Locker readLocker = locker;
                    locker = null;
                    readLocker.operationEnd(OperationStatus.SUCCESS);

                    locker = LockerFactory.getWritableLocker
                        (db.getEnvironment(),
                         txn,
                         db.getDbImpl().isInternalDb(),
                         db.isTransactional(),
                         db.getDbImpl().isReplicated(),
                         autoCommitConfig);
                    cursor = new Cursor(db, locker, null);
                }

                /* Get the persistent fields from the config. */
                rangeMin = useConfig.getRangeMin();
                rangeMax = useConfig.getRangeMax();
                increment = !useConfig.getDecrement();
                wrapAllowed = useConfig.getWrap();
                storedValue = useConfig.getInitialValue();

                /*
                 * To avoid dependence on SerializableIsolation, try
                 * putNoOverwrite first.  If it fails, then try to get an
                 * existing record.
                 */
                status = cursor.putNoOverwrite(key, makeData());

                if (!readData(cursor, null)) {
                    /* A retry loop should be performed here. */
                    throw new IllegalStateException
                        ("Sequence record removed during openSequence.");
                }
                status = OperationStatus.SUCCESS;
            } else {
                throw new SequenceNotFoundException
                    ("AllowCreate=false and the sequence record " +
                     "does not exist.");
            }
        }
    } finally {
        if (cursor != null) {
            cursor.close();
        }
        if (locker != null) {
            locker.operationEnd(status);
        }
    }

    /*
     * cacheLast is initialized such that the cache will be considered
     * empty the first time get() is called.
     */
    cacheSize = configCacheSize;
    cacheValue = storedValue;
    cacheLast = increment ? (storedValue - 1) : (storedValue + 1);
}
/**
* Closes a sequence. Any unused cached values are lost.
*
* <p>The current implementation performs no work; the method body is
* empty.</p>
*
* <p>The sequence handle may not be used again after this method has
* been called, regardless of the method's success or failure.</p>
*
* <p>WARNING: To guard against memory leaks, the application should
* discard all references to the closed handle. While BDB makes an effort
* to discard references from closed objects to the allocated memory for an
* environment, this behavior is not guaranteed. The safe course of action
* for an application is to discard all references to closed BDB
* objects.</p>
*
* @throws EnvironmentFailureException if an unexpected, internal or
* environment-wide failure occurs.
*/
public void close()
throws DatabaseException {
/* Defined only for DB compatibility and possible future use. */
}
/**
 * <p>Returns the next available element in the sequence and changes the
 * sequence value by <code>delta</code>.  The value of <code>delta</code>
 * must be greater than zero.  If there are enough cached values in the
 * sequence handle then they will be returned.  Otherwise the next value
 * will be fetched from the database and incremented (decremented) by
 * enough to cover the <code>delta</code> and the next batch of cached
 * values.</p>
 *
 * <p>This method is synchronized to protect updating of the cached value,
 * since multiple threads may share a single handle.  Multiple handles for
 * the same database/key may be used to increase concurrency.</p>
 *
 * <p>The <code>txn</code> handle must be null if the sequence handle was
 * opened with a non-zero cache size.</p>
 *
 * <p>For maximum concurrency, a non-zero cache size should be specified
 * prior to opening the sequence handle, the <code>txn</code> handle should
 * be <code>null</code>, and {@link
 * com.sleepycat.je.SequenceConfig#setAutoCommitNoSync
 * SequenceConfig.setAutoCommitNoSync} should be called to disable log
 * flushes.</p>
 *
 * @param txn For a transactional database, an explicit transaction may be
 * specified, or null may be specified to use auto-commit.  For a
 * non-transactional database, null must be specified.
 *
 * @param delta the amount by which to increment or decrement the sequence
 *
 * @return the next available element in the sequence
 *
 * @throws SequenceOverflowException if the end of the sequence is reached
 * and wrapping is not configured.
 *
 * @throws SequenceIntegrityException if the sequence record has been
 * deleted.
 *
 * @throws OperationFailureException if one of the <a
 * href="../je/OperationFailureException.html#writeFailures">Write
 * Operation Failures</a> occurs.
 *
 * @throws EnvironmentFailureException if an unexpected, internal or
 * environment-wide failure occurs.
 *
 * @throws IllegalArgumentException if the delta is less than or equal to
 * zero, or larger than the size of the sequence's range.
 */
public synchronized long get(Transaction txn, int delta)
    throws DatabaseException {

    /* Check parameters, being careful of overflow. */
    if (delta <= 0) {
        throw new IllegalArgumentException
            ("Sequence delta must be greater than zero");
    }

    /*
     * rangeMax - delta underflows (wraps to a large positive value) when
     * rangeMax lies within delta of Long.MIN_VALUE.  In that case the range
     * is necessarily smaller than delta, so it is rejected explicitly
     * rather than relying on the wrapped subtraction.  Long.MIN_VALUE +
     * delta cannot overflow because delta is positive here.
     */
    final boolean differenceUnderflows = rangeMax < Long.MIN_VALUE + delta;
    if (differenceUnderflows || rangeMin > rangeMax - delta) {
        throw new IllegalArgumentException
            ("Sequence delta is larger than the range");
    }

    /* Status variables for tracing. */
    boolean cached = true;
    boolean wrapped = false;

    /*
     * Determine whether we have exceeded the cache.  The cache size is
     * always <= Integer.MAX_VALUE, so we don't have to worry about
     * overflow here as long as we subtract the two long values first.
     */
    if ((increment && delta > ((cacheLast - cacheValue) + 1)) ||
        (!increment && delta > ((cacheValue - cacheLast) + 1))) {

        cached = false;

        /*
         * We need to allocate delta or cacheSize values, whichever is
         * larger, by incrementing or decrementing the stored value by
         * adjust.
         */
        int adjust = (delta > cacheSize) ? delta : cacheSize;

        /* Perform an auto-commit transaction to update the sequence. */
        Locker locker = null;
        Cursor cursor = null;
        OperationStatus status = OperationStatus.NOTFOUND;
        try {
            locker = LockerFactory.getWritableLocker
                (db.getEnvironment(),
                 txn,
                 db.getDbImpl().isInternalDb(),
                 db.isTransactional(),
                 db.getDbImpl().isReplicated(),
                 // autoTxnIsReplicated
                 autoCommitConfig);

            cursor = new Cursor(db, locker, null);

            /* Get the existing record. */
            readDataRequired(cursor, LockMode.RMW);

            /* If we would have wrapped when not allowed, overflow. */
            if (overflow) {
                throw new SequenceOverflowException
                    ("Sequence overflow " + storedValue);
            }

            /*
             * Handle wrapping.  The range size can be larger than a long
             * can hold, so to avoid arithmetic overflow we use BigInteger
             * arithmetic.  Since we are going to write, the BigInteger
             * overhead is acceptable.
             */
            BigInteger availBig;
            if (increment) {
                /* Available amount: rangeMax - storedValue */
                availBig = BigInteger.valueOf(rangeMax).
                    subtract(BigInteger.valueOf(storedValue));
            } else {
                /* Available amount: storedValue - rangeMin */
                availBig = BigInteger.valueOf(storedValue).
                    subtract(BigInteger.valueOf(rangeMin));
            }

            if (availBig.compareTo(BigInteger.valueOf(adjust)) < 0) {
                /* If availBig < adjust then availBig fits in an int. */
                int availInt = (int) availBig.longValue();
                if (availInt < delta) {
                    if (wrapAllowed) {
                        /* Wrap to the opposite range end point. */
                        storedValue = increment ? rangeMin : rangeMax;
                        wrapped = true;
                    } else {
                        /* Signal an overflow next time. */
                        overflow = true;
                        adjust = 0;
                    }
                } else {

                    /*
                     * If the delta fits in the cache available, don't wrap
                     * just to allocate the full cacheSize; instead,
                     * allocate as much as is available.
                     */
                    adjust = availInt;
                }
            }

            /* Negate the adjustment for decrementing. */
            if (!increment) {
                adjust = -adjust;
            }

            /* Set the stored value one past the cached amount. */
            storedValue += adjust;

            /* Write the new stored value. */
            cursor.put(key, makeData());
            status = OperationStatus.SUCCESS;
        } finally {
            if (cursor != null) {
                cursor.close();
            }
            if (locker != null) {
                locker.operationEnd(status);
            }
        }

        /* The cache now contains the range: [cacheValue, storedValue) */
        cacheValue = storedValue - adjust;
        cacheLast = storedValue + (increment ? (-1) : 1);
    }

    /* Return the current value and increment/decrement it by delta. */
    long retVal = cacheValue;
    if (increment) {
        cacheValue += delta;
    } else {
        cacheValue -= delta;
    }

    /* Increment stats. */
    nGets += 1;
    if (cached) {
        nCachedGets += 1;
    }

    /* Trace this method at the FINEST level. */
    if (logger.isLoggable(Level.FINEST)) {
        LoggerUtils.finest(logger,
                           db.getEnvironment().getNonNullEnvImpl(),
                           "Sequence.get" + " value=" + retVal +
                           " cached=" + cached + " wrapped=" + wrapped);
    }

    return retVal;
}
/**
 * Returns the Database handle that this sequence was opened on.
 *
 * @return the Database handle associated with this sequence.
 */
public Database getDatabase() {
    return this.db;
}
/**
 * Returns the key of this sequence's record.
 *
 * <p>A fresh copy of the key is produced on every call, so the returned
 * entry is independent of the one held by this handle.</p>
 *
 * @return a copy of the DatabaseEntry used to open this sequence.
 */
public DatabaseEntry getKey() {
    final DatabaseEntry keyCopy = copyEntry(this.key);
    return keyCopy;
}
/**
 * Returns statistical information about the sequence.
 *
 * <p>In the presence of multiple threads or processes accessing an active
 * sequence, the information returned by this method may be
 * out-of-date.</p>
 *
 * <p>The getStats method cannot be transaction-protected.  For this reason,
 * it should be called in a thread of control that has no open cursors or
 * active transactions.</p>
 *
 * <p>Unless the fast option is set, the persistent fields are re-read from
 * the sequence record before the statistics are assembled.  When the clear
 * option is set, the get counters are reset after the snapshot is
 * taken.</p>
 *
 * @param config The statistics returned; if null, default statistics are
 * returned.
 *
 * @return Sequence statistics.
 *
 * @throws SequenceIntegrityException if the sequence record has been
 * deleted.
 */
public SequenceStats getStats(StatsConfig config)
    throws DatabaseException {

    final StatsConfig effectiveConfig =
        (config != null) ? config : StatsConfig.DEFAULT;

    final boolean refreshFromRecord = !effectiveConfig.getFast();
    if (refreshFromRecord) {

        /*
         * Another handle may have advanced storedValue since this handle
         * last read it, so pick up the most recently written value.
         * READ_UNCOMMITTED is required to avoid lock conflicts.
         */
        final Cursor statsCursor = db.openCursor(null, null);
        try {
            readDataRequired(statsCursor, LockMode.READ_UNCOMMITTED);
        } finally {
            statsCursor.close();
        }
    }

    /* Snapshot the handle and record state into a new stat group. */
    final StatGroup group =
        new StatGroup(SequenceStatDefinition.GROUP_NAME,
                      SequenceStatDefinition.GROUP_DESC);
    new IntStat(group, SEQUENCE_GETS, nGets);
    new IntStat(group, SEQUENCE_CACHED_GETS, nCachedGets);
    new IntStat(group, SEQUENCE_CACHE_SIZE, cacheSize);
    new LongStat(group, SEQUENCE_STORED_VALUE, storedValue);
    new LongStat(group, SEQUENCE_CACHE_VALUE, cacheValue);
    new LongStat(group, SEQUENCE_CACHE_LAST, cacheLast);
    new LongStat(group, SEQUENCE_RANGE_MIN, rangeMin);
    new LongStat(group, SEQUENCE_RANGE_MAX, rangeMax);
    final SequenceStats snapshot = new SequenceStats(group);

    /* Reset the counters only after they have been captured. */
    if (effectiveConfig.getClear()) {
        nGets = 0;
        nCachedGets = 0;
    }

    return snapshot;
}
/**
 * Loads the persistent fields from the sequence record, treating a missing
 * record as an integrity failure.
 *
 * @throws SequenceIntegrityException if the key is not present in the
 * database.
 */
private void readDataRequired(Cursor cursor, LockMode lockMode)
    throws DatabaseException {

    final boolean recordFound = readData(cursor, lockMode);
    if (recordFound) {
        return;
    }

    throw new SequenceIntegrityException
        ("The sequence record has been deleted while it is open.");
}
/**
 * Loads the persistent fields from the sequence record.
 *
 * <p>The record layout is a version byte, a flags byte, and then rangeMin,
 * rangeMax and storedValue as longs.  Records with a version less than 1
 * are read as unpacked longs.</p>
 *
 * @return true if the record was found and the fields were updated, or
 * false if the key is not present in the database.
 */
private boolean readData(Cursor cursor, LockMode lockMode)
    throws DatabaseException {

    /* Look up the sequence record by key. */
    final DatabaseEntry record = new DatabaseEntry();
    final boolean found =
        (cursor.getSearchKey(key, record, lockMode) ==
         OperationStatus.SUCCESS);
    if (!found) {
        return false;
    }

    final ByteBuffer in = ByteBuffer.wrap(record.getData());

    /* Header: format version followed by the flag bits. */
    final byte formatVersion = in.get();
    final byte flagBits = in.get();
    final boolean unpackedLongs = (formatVersion < 1);

    /* Body: the three long fields, in record order. */
    rangeMin = LogUtils.readLong(in, unpackedLongs);
    rangeMax = LogUtils.readLong(in, unpackedLongs);
    storedValue = LogUtils.readLong(in, unpackedLongs);

    /* Decode the flag bits into the boolean fields. */
    increment = (flagBits & FLAG_INCR) != 0;
    wrapAllowed = (flagBits & FLAG_WRAP) != 0;
    overflow = (flagBits & FLAG_OVER) != 0;

    return true;
}
/**
 * Serializes the persistent fields into a new database entry.
 *
 * <p>The layout is the current version byte, a flags byte, and then
 * rangeMin, rangeMax and storedValue written as packed longs.  The entry
 * covers only the bytes actually written to the buffer.</p>
 */
private DatabaseEntry makeData() {

    /* Fold the three boolean fields into a single flags byte. */
    final byte flagBits = (byte)
        ((increment ? FLAG_INCR : 0) |
         (wrapAllowed ? FLAG_WRAP : 0) |
         (overflow ? FLAG_OVER : 0));

    final ByteBuffer out = ByteBuffer.wrap(new byte[MAX_DATA_SIZE]);
    out.put(CURRENT_VERSION);
    out.put(flagBits);
    LogUtils.writePackedLong(out, rangeMin);
    LogUtils.writePackedLong(out, rangeMax);
    LogUtils.writePackedLong(out, storedValue);

    /* The buffer position is the number of bytes written. */
    return new DatabaseEntry(out.array(), 0, out.position());
}
/**
 * Returns a deep copy of the given database entry.
 *
 * <p>Only the bytes from the entry's offset up to its size are copied.  An
 * entry of size zero yields an entry backed by the shared zero-length
 * array.</p>
 */
private DatabaseEntry copyEntry(DatabaseEntry entry) {

    final int size = entry.getSize();
    if (size == 0) {
        return new DatabaseEntry(LogUtils.ZERO_LENGTH_BYTE_ARRAY);
    }

    final byte[] copy = new byte[size];
    System.arraycopy(entry.getData(), entry.getOffset(), copy, 0, size);
    return new DatabaseEntry(copy);
}
}