/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.geode.internal.cache;
import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.logging.log4j.Logger;
import org.apache.geode.annotations.VisibleForTesting;
import org.apache.geode.cache.DiskAccessException;
import org.apache.geode.cache.EntryDestroyedException;
import org.apache.geode.distributed.OplogCancelledException;
import org.apache.geode.internal.Assert;
import org.apache.geode.internal.cache.DiskStoreImpl.OplogCompactor;
import org.apache.geode.internal.cache.Oplog.OplogDiskEntry;
import org.apache.geode.internal.cache.entries.DiskEntry;
import org.apache.geode.internal.cache.entries.DiskEntry.Helper.Flushable;
import org.apache.geode.internal.cache.entries.DiskEntry.Helper.ValueWrapper;
import org.apache.geode.internal.cache.persistence.BytesAndBits;
import org.apache.geode.internal.cache.persistence.DiskRegionView;
import org.apache.geode.internal.logging.log4j.LogMarker;
import org.apache.geode.logging.internal.log4j.api.LogService;
/**
* An oplog used for overflow-only regions. For regions that are persistent (i.e. they can be
* recovered) see {@link Oplog}.
*
* @since GemFire prPersistSprint2
*/
class OverflowOplog implements CompactableOplog, Flushable {
private static final Logger logger = LogService.getLogger();
/* System property to override the disk store write buffer size. */
public final String WRITE_BUFFER_SIZE_SYS_PROP_NAME = "WRITE_BUF_SIZE";
  /** Extension of the oplog file */
static final String CRF_FILE_EXT = ".crf";
  /** The file which will be created on disk */
private final File diskFile;
  /** boolean marked true when this oplog is closed */
private volatile boolean closed;
private final OplogFile crf = new OplogFile();
private final ByteBuffer[] bbArray = new ByteBuffer[2];
/** The stats for this store */
private final DiskStoreStats stats;
  /** The store that owns this Oplog */
private final DiskStoreImpl parent;
/**
* The oplog set this oplog is part of
*/
private final OverflowOplogSet oplogSet;
  /** oplog id */
protected final int oplogId;
  /** Directory in which the file is present */
private final DirectoryHolder dirHolder;
/**
* The max Oplog size (user configurable). Set to zero when oplog is deleted.
*/
private long maxOplogSize;
private final OpState opState;
/**
* Set to true when this oplog will no longer be written to. Never set to false once it becomes
* true.
*/
private boolean doneAppending = false;
private final OplogDiskEntry liveEntries = new OplogDiskEntry();
  /**
   * Creates a new oplog based on the previous active oplog, reusing its write buffer. This
   * constructor is invoked only from the function switchOplog.
   *
   * @param oplogId integer identifying the new oplog
   * @param parent the overflow oplog set that owns this oplog
   * @param dirHolder the directory in which to create the new oplog
   * @param minSize minimum oplog file size in bytes
   */
OverflowOplog(int oplogId, OverflowOplogSet parent, DirectoryHolder dirHolder, long minSize) {
this.oplogId = oplogId;
this.parent = parent.getParent();
oplogSet = parent;
this.dirHolder = dirHolder;
opState = new OpState();
long maxOplogSizeParam = this.parent.getMaxOplogSizeInBytes();
if (maxOplogSizeParam < minSize) {
maxOplogSizeParam = minSize;
}
long availableSpace = this.dirHolder.getAvailableSpace();
if (availableSpace < minSize
// fix for bug 42464
&& !this.parent.isCompactionEnabled()) {
availableSpace = minSize;
}
if (availableSpace < maxOplogSizeParam
// fix for bug 42464
&& !this.parent.isCompactionEnabled() && availableSpace > 0) {
// createOverflowOplog decided that we should use this dir. So do it.
maxOplogSize = availableSpace;
} else {
maxOplogSize = maxOplogSizeParam;
}
stats = this.parent.getStats();
closed = false;
String n = this.parent.getName();
diskFile = new File(this.dirHolder.getDir(), "OVERFLOW" + n + "_" + oplogId);
try {
createCrf(parent.getActiveOverflowOplog());
} catch (IOException ex) {
throw new DiskAccessException(
String.format("Failed creating operation log because: %s", ex),
this.parent);
}
}
DiskStoreImpl getParent() {
return parent;
}
private OverflowOplogSet getOplogSet() {
return oplogSet;
}
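  /**
   * Preallocates the crf file to the full maximum oplog size so that later appends never
   * need to extend the file, then seeks back to the start of the file for writing.
   */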
private void preblow() {
dirHolder.incrementTotalOplogSize(maxOplogSize);
final OplogFile olf = getOLF();
try {
olf.raf.setLength(maxOplogSize);
olf.raf.seek(0);
    } catch (IOException ignore) {
      // TODO: log a warning here, since a failed preallocation can hurt write performance.
      // If setLength throws, the file is still usable; we simply lose the benefit of
      // preallocating it.
    }
}
/**
* Creates the crf oplog file
*/
private void createCrf(OverflowOplog previous) throws IOException {
File f = new File(diskFile.getPath() + CRF_FILE_EXT);
if (logger.isDebugEnabled()) {
logger.debug("Creating operation log file {}", f);
}
crf.f = f;
crf.raf = new RandomAccessFile(f, "rw");
crf.writeBuf = allocateWriteBuf(previous);
preblow();
logger.info("Created {} {} for disk store {}.",
new Object[] {toString(), "crf", parent.getName()});
crf.channel = crf.raf.getChannel();
stats.incOpenOplogs();
}
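  /**
   * Reads the optional system property that overrides the disk store write buffer size.
   * For example, a JVM started with -DWRITE_BUF_SIZE=65536 would use a 64 KB direct write
   * buffer. Returns null when the property is not set.
   */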
@VisibleForTesting
Integer getWriteBufferSizeProperty() {
return Integer.getInteger(WRITE_BUFFER_SIZE_SYS_PROP_NAME);
}
@VisibleForTesting
Integer getWriteBufferCapacity() {
Integer writeBufferSizeProperty = getWriteBufferSizeProperty();
if (writeBufferSizeProperty != null) {
return writeBufferSizeProperty;
}
return getParent().getWriteBufferSize();
}
private ByteBuffer allocateWriteBuf(OverflowOplog previous) {
ByteBuffer result = null;
if (previous != null) {
result = previous.consumeWriteBuf();
}
if (result == null) {
return ByteBuffer.allocateDirect(getWriteBufferCapacity());
}
return result;
}
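  /**
   * Hands this oplog's write buffer off to the oplog that replaces it during a switch.
   * Returns null if the buffer has already been consumed.
   */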
private ByteBuffer consumeWriteBuf() {
synchronized (crf) {
ByteBuffer result = crf.writeBuf;
crf.writeBuf = null;
return result;
}
}
/**
* Returns the <code>DiskStoreStats</code> for this oplog
*/
public DiskStoreStats getStats() {
return stats;
}
  /**
   * Test method: gets the underlying File object for this oplog. The Oplog class uses this
   * File object to obtain the RandomAccessFile object. Before the File object is returned,
   * the data present in the buffers of the RandomAccessFile object is flushed; otherwise, on
   * Windows, the actual file length would not match the file size obtained from the File
   * object.
   */
File getOplogFile() throws IOException {
synchronized (crf) {
if (!crf.RAFClosed) {
crf.raf.getFD().sync();
}
return crf.f;
}
}
  /** the oplog identifier */
public int getOplogId() {
return oplogId;
}
  /**
   * Returns the unserialized bytes and bits for the given entry. If the oplog is destroyed
   * while being queried, the DiskRegion is queried again to obtain the value. This method
   * should never be invoked for an entry that has been destroyed.
   *
   * @since GemFire 3.2.1
   * @param dr the disk region the entry belongs to
   * @param id the DiskId for the entry
   * @param faultingIn whether the value is being faulted into memory
   * @param bitOnly whether to extract just the UserBit or the UserBit with the value
   * @return BytesAndBits object wrapping the value and user bit
   */
@Override
public BytesAndBits getBytesAndBits(DiskRegionView dr, DiskId id, boolean faultingIn,
boolean bitOnly) {
OverflowOplog retryOplog = null;
long offset = 0;
synchronized (id) {
int opId = (int) id.getOplogId();
if (opId != getOplogId()) {
// the oplog changed on us so we need to do a recursive
// call after unsyncing
retryOplog = getOplogSet().getChild(opId);
} else {
// fetch this while synced so it will be consistent with oplogId
offset = id.getOffsetInOplog();
}
}
if (retryOplog != null) {
return retryOplog.getBytesAndBits(dr, id, faultingIn, bitOnly);
}
BytesAndBits bb = null;
long start = stats.startRead();
    // Even if the offset happens to be -1, it is still possible that
    // the data is present in the current oplog file.
    if (offset == -1) {
      // Since the get operation has already taken a lock on the entry, no put
      // operation could have modified the oplog id; therefore synchronizing
      // on the DiskId is not needed here.
      offset = id.getOffsetInOplog();
    }
    // If the current oplog is not destroyed (its RandomAccessFile is still
    // open) we can retrieve the value from this oplog.
try {
bb = basicGet(dr, offset, bitOnly, id.getValueLength(), id.getUserBits());
} catch (DiskAccessException dae) {
logger.error(
String.format("Oplog::basicGet: Error in reading the data from disk for Disk ID %s",
id),
dae);
throw dae;
}
    // If bb is still null, the entry has been compacted away or, in the case of a
    // concurrent get and put, moved to a new oplog. Since the compactor takes a lock
    // on the entry as well as the DiskId, that situation is not possible here, so a
    // null value is treated as a destroyed entry.
if (bb == null) {
throw new EntryDestroyedException(
String.format(
"No value was found for entry with disk Id %s on a region with synchronous writing set to %s",
id, dr.isSync()));
}
if (bitOnly) {
dr.endRead(start, stats.endRead(start, 1), 1);
} else {
dr.endRead(start, stats.endRead(start, bb.getBytes().length), bb.getBytes().length);
}
return bb;
}
  /**
   * Returns the object stored on disk with the given id. This method is used for testing
   * purposes only; as such, it bypasses the buffer and goes directly to the disk. It is not
   * thread safe, in the sense that by the time the oplog is queried, the data may have moved
   * elsewhere and the oplog may have been destroyed.
   *
   * @param id A DiskId object for which the value on disk will be fetched
   */
@Override
public BytesAndBits getNoBuffer(DiskRegion dr, DiskId id) {
if (logger.isTraceEnabled()) {
logger.trace("Oplog::getNoBuffer:Before invoking Oplog.basicGet for DiskID ={}", id);
}
try {
return basicGet(dr, id.getOffsetInOplog(), false, id.getValueLength(), id.getUserBits());
} catch (DiskAccessException dae) {
logger.error(
String.format("Oplog::getNoBuffer:Exception in retrieving value from disk for diskId=%s",
id),
dae);
throw dae;
} catch (IllegalStateException ise) {
logger.error(
String.format("Oplog::getNoBuffer:Exception in retrieving value from disk for diskId=%s",
id),
ise);
throw ise;
}
}
void freeEntry(DiskEntry de) {
rmLive(de);
}
  /**
   * Call this when the cache is closed or the region is destroyed. Closes the oplog's files
   * and, since this oplog is overflow only, deletes them as well.
   */
public void close() {
if (closed) {
return;
}
if (logger.isDebugEnabled()) {
logger.debug("Oplog::close: Store name ={} Oplog ID = {}", parent.getName(), oplogId);
}
basicClose();
}
/**
   * Close the files of an oplog but don't set any state. Used by unit tests.
*/
public void testClose() {
try {
crf.channel.close();
} catch (IOException ignore) {
}
try {
crf.raf.close();
} catch (IOException ignore) {
}
}
private void basicClose() {
flushAll();
synchronized (crf) {
if (!crf.RAFClosed) {
try {
crf.channel.close();
} catch (IOException ignore) {
}
try {
crf.raf.close();
} catch (IOException ignore) {
}
crf.RAFClosed = true;
stats.decOpenOplogs();
}
closed = true;
}
deleteFiles();
}
  /**
   * Destroys this oplog. It first closes the oplog's files cleanly, under the compactor
   * lock; the close also takes care of deleting the files, since this oplog is overflow
   * only.
   */
public void destroy() {
if (!closed) {
lockCompactor();
try {
basicClose();
} finally {
unlockCompactor();
}
}
}
  /**
   * Checks whether a cancellation is in progress and throws if this oplog has already been
   * closed.
   */
private void checkClosed() {
parent.getCancelCriterion().checkCancelInProgress(null);
if (!closed) {
return;
}
throw new OplogCancelledException("This Oplog has been closed.");
}
/**
* Return absolute value of v.
*/
static long abs(long v) {
if (v < 0) {
return -v;
} else {
return v;
}
}
private void initOpState(DiskEntry entry, ValueWrapper value, byte userBits) {
opState.initialize(entry, value, userBits);
}
private void clearOpState() {
opState.clear();
}
/**
* Returns the number of bytes it will take to serialize this.opState.
*/
private int getOpStateSize() {
return opState.getSize();
}
private byte calcUserBits(ValueWrapper value) {
return value.getUserBits();
}
  /**
   * Modifies a key/value pair from a region entry on disk. Updates all of the necessary
   * {@linkplain DiskStoreStats statistics} and invokes basicModify.
   * <p>
   * The ByteBuffer already created for the previous oplog is reused during a switch, and
   * synchronization is kept minimal so that put operations for different entries can proceed
   * concurrently in async mode.
   *
   * @param entry DiskEntry object representing the current Entry
   * @param value the wrapped value to be written
   * @return true if the modify was done; false if this file did not have room
   */
public boolean modify(DiskRegion dr, DiskEntry entry, ValueWrapper value, boolean async) {
try {
byte userBits = calcUserBits(value);
return basicModify(entry, value, userBits, async);
} catch (IOException ex) {
throw new DiskAccessException(
String.format("Failed writing key to %s", diskFile.getPath()),
ex, dr.getName());
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
dr.getCancelCriterion().checkCancelInProgress(ie);
throw new DiskAccessException(
String.format(
"Failed writing key to %s due to failure in acquiring read lock for asynch writing",
diskFile.getPath()),
ie, dr.getName());
}
}
public boolean copyForwardForOverflowCompact(DiskEntry entry, byte[] value, int length,
byte userBits) {
try {
ValueWrapper vw = new DiskEntry.Helper.CompactorValueWrapper(value, length);
return basicModify(entry, vw, userBits, true);
} catch (IOException ex) {
throw new DiskAccessException(
String.format("Failed writing key to %s", diskFile.getPath()),
ex, getParent().getName());
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
getParent().getCancelCriterion().checkCancelInProgress(ie);
throw new DiskAccessException(
String.format(
"Failed writing key to %s due to failure in acquiring read lock for asynch writing",
diskFile.getPath()),
ie, getParent().getName());
}
}
  /**
   * A helper function which determines whether to modify the entry in the current oplog or
   * to switch to the next oplog. It enables us to reuse the byte buffer created for an oplog
   * that no longer permits us to use it. It will also take care of compaction if required.
   *
   * @param entry DiskEntry object representing the current Entry
   * @return true if the modify was done; false if this file did not have room
   */
private boolean basicModify(DiskEntry entry, ValueWrapper value, byte userBits, boolean async)
throws IOException, InterruptedException {
DiskId id = entry.getDiskId();
long startPosForSynchOp = -1L;
OverflowOplog emptyOplog = null;
synchronized (crf) {
initOpState(entry, value, userBits);
int adjustment = getOpStateSize();
assert adjustment > 0;
int oldOplogId;
// do the io while holding lock so that switch can set doneAppending
// Write the data to the opLog for the synch mode
startPosForSynchOp = writeOpLogBytes(async);
if (startPosForSynchOp == -1) {
return false;
} else {
if (logger.isTraceEnabled()) {
logger.trace("Oplog::basicModify:Released ByteBuffer with data for Disk ID = {}", id);
}
synchronized (id) {
// Need to do this while synced on id
oldOplogId = (int) id.setOplogId(getOplogId());
id.setOffsetInOplog(startPosForSynchOp);
if (EntryBits.isNeedsValue(userBits)) {
id.setValueLength(value.getLength());
} else {
id.setValueLength(0);
}
id.setUserBits(userBits);
}
// Note: we always call rmLive (and addLive) even if this mod was to an entry
// last modified in this same oplog to cause the list to be sorted by offset
// so when the compactor iterates over it will read values with a sequential scan
// instead of hopping around the oplog.
if (oldOplogId > 0) {
OverflowOplog oldOplog = getOplogSet().getChild(oldOplogId);
if (oldOplog != null) {
if (oldOplog.rmLive(entry)) {
if (oldOplogId != getOplogId()) {
emptyOplog = oldOplog;
}
}
}
}
addLive(entry);
}
clearOpState();
}
if (LocalRegion.ISSUE_CALLBACKS_TO_CACHE_OBSERVER) {
CacheObserverHolder.getInstance().afterSettingOplogOffSet(startPosForSynchOp);
}
if (emptyOplog != null
&& (!emptyOplog.isCompacting() || emptyOplog.calledByCompactorThread())) {
if (emptyOplog.calledByCompactorThread() && emptyOplog.hasNoLiveValues()) {
flushAll();
}
emptyOplog.handleNoLiveValues();
}
return true;
}
/**
* Removes the key/value pair with the given id on disk.
*
* @param entry DiskEntry object on which remove operation is called
*/
public void remove(DiskRegion dr, DiskEntry entry) {
try {
basicRemove(dr, entry);
} catch (IOException ex) {
throw new DiskAccessException(
String.format("Failed writing key to %s", diskFile.getPath()),
ex, dr.getName());
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
dr.getCancelCriterion().checkCancelInProgress(ie);
throw new DiskAccessException(
String.format(
"Failed writing key to %s due to failure in acquiring read lock for asynch writing",
diskFile.getPath()),
ie, dr.getName());
}
}
  /**
   * A helper function which determines whether to record the removal of an entry in the
   * current oplog or to switch to the next oplog. It enables us to reuse the byte buffer
   * created for an oplog that no longer permits us to use it.
   *
   * @param entry DiskEntry object representing the current Entry
   */
private void basicRemove(DiskRegion dr, DiskEntry entry)
throws IOException, InterruptedException {
DiskId id = entry.getDiskId();
// no need to lock since this code no longer does io
if (EntryBits.isNeedsValue(id.getUserBits())) {
// oplogId already done up in DiskStoreImpl
long oldOffset = id.getOffsetInOplog();
if (oldOffset != -1) {
id.setOffsetInOplog(-1);
if (rmLive(entry)) {
if (!isCompacting() || calledByCompactorThread()) {
handleNoLiveValues();
}
}
}
}
}
/**
* test hook
*/
public ByteBuffer getWriteBuf() {
return crf.writeBuf;
}
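  /**
   * Maximum number of times {@link #flush()} retries a channel write that reports fewer
   * bytes written than the buffer position actually advanced, before failing with an
   * IOException.
   */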
private static final int MAX_CHANNEL_RETRIES = 5;
@Override
public void flush() throws IOException {
final OplogFile olf = crf;
synchronized (olf) {
if (olf.RAFClosed) {
return;
}
try {
ByteBuffer bb = olf.writeBuf;
if (bb != null && bb.position() != 0) {
bb.flip();
int flushed = 0;
int numChannelRetries = 0;
do {
int channelBytesWritten = 0;
final int bbStartPos = bb.position();
final long channelStartPos = olf.channel.position();
// differentiate between bytes written on this channel.write() iteration and the
// total number of bytes written to the channel on this call
channelBytesWritten = olf.channel.write(bb);
          // Expect channelBytesWritten and the changes in bb.position() and channel.position() to
// be the same. If they are not, then the channel.write() silently failed. The following
// retry separates spurious failures from permanent channel failures.
if (channelBytesWritten != bb.position() - bbStartPos) {
if (numChannelRetries++ < MAX_CHANNEL_RETRIES) {
// Reset the ByteBuffer position, but take into account anything that did get
// written to the channel
channelBytesWritten = (int) (olf.channel.position() - channelStartPos);
bb.position(bbStartPos + channelBytesWritten);
} else {
throw new IOException("Failed to write Oplog entry to " + olf.f.getName() + ": "
+ "channel.write() returned " + channelBytesWritten + ", "
+ "change in channel position = " + (olf.channel.position() - channelStartPos)
+ ", " + "change in source buffer position = " + (bb.position() - bbStartPos));
}
}
flushed += channelBytesWritten;
} while (bb.hasRemaining());
// update bytesFlushed after entire writeBuffer is flushed to fix bug 41201
olf.bytesFlushed += flushed;
bb.clear();
}
} catch (ClosedChannelException ignore) {
// It is possible for a channel to be closed when our code does not
// explicitly call channel.close (when we will set RAFclosed).
// This can happen when a thread is doing an io op and is interrupted.
// That thread will see ClosedByInterruptException but it will also
// close the channel and then we will see ClosedChannelException.
}
}
}
/**
* Method to be used only for testing
*
* @param ch Object to replace the channel in the Oplog.crf
* @return original channel object
*/
FileChannel testSetCrfChannel(FileChannel ch) {
FileChannel chPrev = crf.channel;
crf.channel = ch;
return chPrev;
}
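  /**
   * Flushes two buffers to the channel with a single gathering write: b1 (the oplog's write
   * buffer, which this method flips) followed by b2, which the caller has already positioned.
   * The write loops until b2 is fully drained.
   */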
@Override
public void flush(ByteBuffer b1, ByteBuffer b2) throws IOException {
final OplogFile olf = crf;
synchronized (olf) {
if (olf.RAFClosed) {
return;
}
try {
bbArray[0] = b1;
bbArray[1] = b2;
b1.flip();
long flushed = 0;
do {
flushed += olf.channel.write(bbArray);
} while (b2.hasRemaining());
bbArray[0] = null;
bbArray[1] = null;
// update bytesFlushed after entire writeBuffer is flushed to fix bug 41201
olf.bytesFlushed += flushed;
b1.clear();
} catch (ClosedChannelException ignore) {
// It is possible for a channel to be closed when our code does not
// explicitly call channel.close (when we will set RAFclosed).
// This can happen when a thread is doing an io op and is interrupted.
// That thread will see ClosedByInterruptException but it will also
// close the channel and then we will see ClosedChannelException.
}
}
}
public void flushAll() {
try {
flush();
} catch (IOException ex) {
throw new DiskAccessException(
String.format("Failed writing key to %s", diskFile.getPath()),
ex, parent);
}
}
  /**
   * Writes the current opState to the oplog. Since the ByteBuffer being written can contain
   * additional bytes used only for extending the size of the file, the ByteBuffer's limit
   * must be set to the position up to which it holds actual data. This method must be
   * synchronized on the file, whether the write is sync or async, because fault-in
   * operations can clash with async writing. Note that since extending a file is expensive,
   * this code will possibly write OPLOG_EXTEND_SIZE zero bytes to reduce the number of times
   * the file is extended.
   *
   * @return the offset at which the data present in the ByteBuffer gets written, or -1 if
   *         this oplog no longer accepts appends
   */
private long writeOpLogBytes(boolean async) throws IOException {
long startPos = -1L;
final OplogFile olf = crf;
synchronized (olf) {
if (doneAppending) {
return -1;
}
if (closed) {
Assert.assertTrue(false, this + " for store " + parent.getName()
+ " has been closed for synch mode while writing is going on. This should not happen");
}
      // It is assumed that the file pointer is already at the appropriate position in the
      // file, ready for appending at the end. Any fault-in operation will set the pointer
      // back to the write location. Only in the case of sync writing, where more is written
      // than actually needed, does the pointer have to be reset. The offset within writeBuf
      // also needs to be added in, in case writeBuf is not being flushed.
long curFileOffset = olf.channel.position() + olf.writeBuf.position();
startPos = allocate(curFileOffset, getOpStateSize());
if (startPos != -1) {
if (startPos != curFileOffset) {
flush();
olf.channel.position(startPos);
olf.bytesFlushed = startPos;
stats.incOplogSeeks();
}
if (logger.isTraceEnabled(LogMarker.PERSIST_WRITES_VERBOSE)) {
logger.trace(LogMarker.PERSIST_WRITES_VERBOSE, "writeOpLogBytes startPos={} oplog#{}",
startPos, getOplogId());
}
long oldBytesFlushed = olf.bytesFlushed;
long bytesWritten = opState.write();
if ((startPos + bytesWritten) > olf.currSize) {
olf.currSize = startPos + bytesWritten;
}
if (logger.isTraceEnabled(LogMarker.PERSIST_WRITES_VERBOSE)) {
logger.trace(LogMarker.PERSIST_WRITES_VERBOSE,
"writeOpLogBytes bytesWritten={} oldBytesFlushed={} byteFlushed={} oplog#{}",
bytesWritten, oldBytesFlushed, olf.bytesFlushed, getOplogId());
}
if (oldBytesFlushed != olf.bytesFlushed) {
// opState.write must have done an implicit flush
// so we need to do an explicit flush so the value
// can be read back in entirely from disk.
flush();
}
getStats().incWrittenBytes(bytesWritten, async);
}
}
return startPos;
}
private BytesAndBits attemptGet(DiskRegionView dr, long offsetInOplog, int valueLength,
byte userBits) throws IOException {
synchronized (crf) {
assert offsetInOplog >= 0;
RandomAccessFile myRAF = crf.raf;
BytesAndBits bb = null;
long writePosition = 0;
if (!doneAppending) {
writePosition = myRAF.getFilePointer();
bb = attemptWriteBufferGet(writePosition, offsetInOplog, valueLength, userBits);
if (bb == null) {
if ((offsetInOplog + valueLength) > crf.bytesFlushed && !closed) {
flushAll(); // fix for bug 41205
writePosition = myRAF.getFilePointer();
}
}
}
if (bb == null) {
myRAF.seek(offsetInOplog);
try {
stats.incOplogSeeks();
byte[] valueBytes = new byte[valueLength];
myRAF.readFully(valueBytes);
stats.incOplogReads();
bb = new BytesAndBits(valueBytes, userBits);
} finally {
// if this oplog is no longer being appended to then don't waste disk io
if (!doneAppending) {
myRAF.seek(writePosition);
stats.incOplogSeeks();
}
}
}
return bb;
} // sync
}
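  /**
   * Attempts to serve a read directly from the in-memory write buffer. If the requested
   * range [readPosition, readPosition + valueLength) falls entirely within the bytes
   * currently buffered (which begin at writePosition in the file), the value is copied out
   * of the buffer without any disk io; otherwise null is returned.
   */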
private BytesAndBits attemptWriteBufferGet(long writePosition, long readPosition, int valueLength,
byte userBits) {
BytesAndBits bb = null;
ByteBuffer writeBuf = crf.writeBuf;
int curWriteBufPos = writeBuf.position();
if (writePosition <= readPosition
&& (writePosition + curWriteBufPos) >= (readPosition + valueLength)) {
int bufOffset = (int) (readPosition - writePosition);
byte[] valueBytes = new byte[valueLength];
int oldLimit = writeBuf.limit();
writeBuf.limit(curWriteBufPos);
writeBuf.position(bufOffset);
writeBuf.get(valueBytes);
writeBuf.position(curWriteBufPos);
writeBuf.limit(oldLimit);
bb = new BytesAndBits(valueBytes, userBits);
}
return bb;
}
/**
* Extracts the Value byte array & UserBit from the OpLog
*
* @param offsetInOplog The starting position from which to read the data in the opLog
* @param bitOnly boolean indicating whether the value needs to be extracted along with the
* UserBit or not.
* @param valueLength The length of the byte array which represents the value
* @param userBits The userBits of the value.
* @return BytesAndBits object which wraps the extracted value & user bit
*/
private BytesAndBits basicGet(DiskRegionView dr, long offsetInOplog, boolean bitOnly,
int valueLength, byte userBits) {
BytesAndBits bb = null;
if (EntryBits.isAnyInvalid(userBits) || EntryBits.isTombstone(userBits) || bitOnly
|| valueLength == 0) {
if (EntryBits.isInvalid(userBits)) {
bb = new BytesAndBits(DiskEntry.INVALID_BYTES, userBits);
} else if (EntryBits.isTombstone(userBits)) {
bb = new BytesAndBits(DiskEntry.TOMBSTONE_BYTES, userBits);
} else {
bb = new BytesAndBits(DiskEntry.LOCAL_INVALID_BYTES, userBits);
}
} else {
if (offsetInOplog == -1) {
return null;
}
try {
for (;;) {
dr.getCancelCriterion().checkCancelInProgress(null);
boolean interrupted = Thread.interrupted();
try {
bb = attemptGet(dr, offsetInOplog, valueLength, userBits);
break;
} catch (InterruptedIOException ignore) {
// bug 39756
// ignore, we'll clear and retry.
} finally {
if (interrupted) {
Thread.currentThread().interrupt();
}
}
} // for
} catch (IOException ex) {
throw new DiskAccessException(
String.format(
"Failed reading from %s. oplogID, %s Offset being read= %s Current Oplog Size= %s Actual File Size, %s IS ASYNCH MODE, %s IS ASYNCH WRITER ALIVE= %s",
diskFile.getPath(), (long) oplogId, offsetInOplog,
crf.currSize, crf.bytesFlushed, !dr.isSync(), false),
ex, dr.getName());
} catch (IllegalStateException ex) {
checkClosed();
throw ex;
}
}
return bb;
}
private final AtomicBoolean deleted = new AtomicBoolean();
/**
* deletes the oplog's file(s)
*/
void deleteFiles() {
boolean needsDestroy = deleted.compareAndSet(false, true);
if (needsDestroy) {
getOplogSet().removeOverflow(this);
deleteFile();
}
}
private void deleteFile() {
final OplogFile olf = getOLF();
if (maxOplogSize != 0) {
dirHolder.decrementTotalOplogSize(maxOplogSize);
maxOplogSize = 0;
olf.currSize = 0;
}
if (olf.f == null) {
return;
}
if (!olf.f.exists()) {
return;
}
if (!olf.f.delete() && olf.f.exists()) {
throw new DiskAccessException(
String.format("Could not delete %s.", olf.f.getAbsolutePath()),
parent);
}
logger.info("Deleted {} {} for disk store {}.",
toString(), "crf", parent.getName());
}
  /**
   * Helper function for tests.
   *
   * @return FileChannel object representing the Oplog
   */
FileChannel getFileChannel() {
return crf.channel;
}
DirectoryHolder getDirectoryHolder() {
return dirHolder;
}
  /**
   * The current size of the oplog. It may differ from the actual oplog file size: the file
   * is preallocated, and in the case of async writing this size also includes data present
   * in the async buffers, which will be flushed to disk in due course.
   *
   * @return long value indicating the current size of the oplog.
   */
long getOplogSize() {
return crf.currSize;
}
  /**
   * The high water mark of the number of records written to this oplog.
   */
private final AtomicLong totalCount = new AtomicLong(0);
/**
* The number of records in this oplog that contain the most recent value of the entry.
*/
private final AtomicLong totalLiveCount = new AtomicLong(0);
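  /**
   * Reserves space for a record of the given length at the suggested offset. If the record
   * would not fit under maxOplogSize, the oplog is flushed, marked as done appending, and -1
   * is returned so that the caller switches to a new oplog.
   */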
private long allocate(long suggestedOffset, int length) {
if (suggestedOffset + length > maxOplogSize) {
flushAll();
doneAppending = true;
return -1;
} else {
return suggestedOffset;
}
}
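  /**
   * Records an entry as live in this oplog: bumps the record counters and, when compaction
   * is possible, links the entry into the live-entry list used by the compactor.
   */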
private void addLive(DiskEntry de) {
totalCount.incrementAndGet();
totalLiveCount.incrementAndGet();
if (isCompactionPossible()) {
liveEntries.insert(de);
}
}
private boolean rmLive(DiskEntry de) {
if (isCompactionPossible()) {
// Fix for 49898 - by synchronizing on the live entries, we ensure
// that if the compaction thread does not find an entry in the live list,
// it will also see an updated totalLiveCount.
synchronized (liveEntries) {
if (liveEntries.remove(de)) {
totalLiveCount.decrementAndGet();
return true;
} else {
return false;
}
}
} else {
totalLiveCount.decrementAndGet();
return true;
}
}
/**
* Return true if it is possible that compaction of this oplog will be done.
*/
private boolean isCompactionPossible() {
return getParent().isCompactionPossible();
}
boolean needsCompaction() {
if (!isCompactionPossible()) {
return false;
}
if (getParent().getCompactionThreshold() == 100) {
return true;
}
if (getParent().getCompactionThreshold() == 0) {
return false;
}
// otherwise check if we have enough garbage to collect with a compact
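    // For example, with totalCount = 100 records ever written and totalLiveCount = 40 of
    // them still live, 40% of the oplog is live data; with a compaction threshold of 50,
    // (40 / 100) * 100 <= 50 holds, so this oplog needs compaction.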
long rvHWMtmp = totalCount.get();
if (rvHWMtmp > 0) {
long tlc = totalLiveCount.get();
if (tlc < 0) {
tlc = 0;
}
double rv = tlc;
return ((rv / (double) rvHWMtmp) * 100) <= parent.getCompactionThreshold();
} else {
return true;
}
}
public boolean hasNoLiveValues() {
return totalLiveCount.get() <= 0;
}
private void handleEmpty(boolean calledByCompactor) {
if (!calledByCompactor) {
logger.info("Closing {} early since it is empty. It is for disk store {}.",
new Object[] {parent.getName(), toString()});
}
destroy();
}
private void handleNoLiveValues() {
if (!doneAppending) {
return;
}
// At one point this method was a noop for the pure overflow case.
// But it turns out it was cheaper to delete empty oplogs and create
// new ones instead of overwriting the empty one.
// This is surprising and may be specific to Linux ext3.
// So at some point we may want to comment out the following block
// of code and check performance again.
if (hasNoLiveValues()) {
getOplogSet().removeOverflow(this);
if (calledByCompactorThread()) {
handleEmpty(true);
} else {
getParent().executeDiskStoreTask(() -> handleEmpty(false));
}
} else if (!isCompacting() && needsCompaction()) {
addToBeCompacted();
}
}
private void addToBeCompacted() {
getOplogSet().addOverflowToBeCompacted(this);
}
long testGetOplogFileLength() throws IOException {
long result = 0;
if (crf.raf != null) {
result += crf.raf.length();
}
return result;
}
private OplogFile getOLF() {
return crf;
}
private static class OplogFile {
public File f;
public RandomAccessFile raf;
public boolean RAFClosed;
public FileChannel channel;
public ByteBuffer writeBuf;
    public long currSize; // high water mark of the oplog's logical size
public long bytesFlushed;
}
/**
* Holds all the state for the current operation. Since an oplog can only have one operation in
* progress at any given time we only need a single instance of this class per oplog.
*/
private class OpState {
private byte userBits;
/**
* How many bytes it will be when serialized
*/
private int size;
private boolean needsValue;
private ValueWrapper value;
public int getSize() {
return size;
}
/**
* Free up any references to possibly large data.
*/
public void clear() {
value = null;
}
private void write(ValueWrapper vw) throws IOException {
vw.sendTo(getOLF().writeBuf, OverflowOplog.this);
}
public void initialize(DiskEntry entry, ValueWrapper value, byte userBits) {
this.userBits = userBits;
this.value = value;
size = 0;
needsValue = EntryBits.isNeedsValue(this.userBits);
if (needsValue) {
size += this.value.getLength();
}
}
public long write() throws IOException {
long bytesWritten = 0;
if (needsValue) {
int valueLength = value.getLength();
if (valueLength > 0) {
write(value);
bytesWritten += valueLength;
}
}
return bytesWritten;
}
}
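  /**
   * Returns the next live entry for the compactor to copy forward, taken from the tail of
   * the circular live-entry list, or null when only the list header remains.
   */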
private DiskEntry getNextLiveEntry() {
DiskEntry result = liveEntries.getPrev();
if (result == liveEntries) {
result = null;
}
return result;
}
@Override
public String toString() {
return "oplog#OV" + getOplogId();
}
private boolean compacting;
private boolean isCompacting() {
return compacting;
}
@Override
public void prepareForCompact() {
compacting = true;
}
  private static final ThreadLocal<Boolean> isCompactorThread = new ThreadLocal<>();
private boolean calledByCompactorThread() {
if (!compacting) {
return false;
}
    return Boolean.TRUE.equals(isCompactorThread.get());
}
private final Lock compactorLock = new ReentrantLock();
private void lockCompactor() {
compactorLock.lock();
}
private void unlockCompactor() {
compactorLock.unlock();
}
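  /**
   * Copies every remaining live entry in this oplog forward to the active overflow oplog and
   * then removes this now-empty oplog. Returns the number of entries copied forward, or 0 if
   * no compaction was needed.
   */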
@Override
public int compact(OplogCompactor compactor) {
if (!needsCompaction()) {
return 0;
}
isCompactorThread.set(Boolean.TRUE);
getParent().acquireCompactorReadLock();
try {
lockCompactor();
try {
if (hasNoLiveValues()) {
handleNoLiveValues();
return 0;
}
        // Start with a fresh wrapper on every compaction so that if a previous run grew an
        // exceptionally large byte array, it can be garbage collected.
long opStart = getStats().getStatTime();
BytesAndBitsForCompactor wrapper = new BytesAndBitsForCompactor();
DiskEntry de;
DiskEntry lastDe = null;
boolean compactFailed = !compactor.keepCompactorRunning();
int totalCount = 0;
boolean didCompact = false;
while ((de = getNextLiveEntry()) != null) {
if (!compactor.keepCompactorRunning()) {
compactFailed = true;
break;
}
if (lastDe != null) {
if (lastDe == de) {
throw new IllegalStateException("compactor would have gone into infinite loop");
}
assert lastDe != de;
}
lastDe = de;
didCompact = false;
synchronized (de) { // fix for bug 41797
DiskId did = de.getDiskId();
assert did != null;
synchronized (did) {
long oplogId = did.getOplogId();
if (oplogId != getOplogId()) {
if (oplogId == -1) {
// to prevent bug 42304 do a rmLive call
rmLive(de);
}
continue;
}
// Bug 42304 - If the entry has been invalidated, don't copy it forward.
boolean toCompact = getBytesAndBitsForCompaction(de, wrapper);
if (toCompact) {
byte[] valueBytes = wrapper.getBytes();
int length = wrapper.getValidLength();
byte userBits = wrapper.getBits();
if (oplogId != did.getOplogId()) {
              // TODO: is this even possible? Perhaps this should just be an assertion.
              // Skip this entry; its oplogId changed.
if (did.getOplogId() == -1) {
// to prevent bug 42304 do a rmLive call
rmLive(de);
}
if (!wrapper.isReusable()) {
wrapper = new BytesAndBitsForCompactor();
}
continue;
}
if (EntryBits.isAnyInvalid(userBits)) {
rmLive(de);
if (!wrapper.isReusable()) {
wrapper = new BytesAndBitsForCompactor();
}
continue;
}
// write it to the current oplog
getOplogSet().copyForwardForOverflowCompact(de, valueBytes, length, userBits);
// the did's oplogId will now be set to the current active oplog
didCompact = true;
}
} // did
} // de
if (didCompact) {
totalCount++;
getStats().endCompactionUpdate(opStart);
opStart = getStats().getStatTime();
          // Check whether the value byte array happens to be one of the constant static
          // byte arrays, or references the value byte array of the underlying RegionEntry.
          // If so, recreate the wrapper to prevent data corruption across regions (in the
          // case of the static byte arrays) and for the RegionEntry.
if (!wrapper.isReusable()) {
wrapper = new BytesAndBitsForCompactor();
}
}
}
if (!compactFailed) {
// Need to still remove the oplog even if it had nothing to compact.
handleNoLiveValues();
}
return totalCount;
} finally {
unlockCompactor();
isCompactorThread.remove();
}
} finally {
getParent().releaseCompactorReadLock();
}
}
  /**
   * Retrieves the value for an entry being compacted, provided the entry still references
   * the oplog being compacted. An attempt is made to retrieve the value from memory, if
   * available; else from the async buffers (if async mode is enabled); else from the oplog
   * being compacted. It is invoked from switchOplog as well as from OplogCompactor's compact
   * function.
   *
   * @param entry DiskEntry being compacted that references the oplog being compacted
   * @param wrapper a BytesAndBitsForCompactor into which the data, if found, is set, along
   *        with the user bit associated with the entry
   * @return false if the entry need not be compacted; true if the wrapper has been
   *         appropriately filled with data
   */
private boolean getBytesAndBitsForCompaction(DiskEntry entry, BytesAndBitsForCompactor wrapper) {
// caller is synced on did
DiskId did = entry.getDiskId();
byte userBits = 0;
long oplogOffset = did.getOffsetInOplog();
boolean foundData = false;
if (entry.isValueNull()) {
// If the mode is synch it is guaranteed to be present in the disk
foundData = basicGetForCompactor(oplogOffset, false, did.getValueLength(), did.getUserBits(),
wrapper);
// after we have done the get do one more check to see if the
// disk id of interest is still stored in the current oplog.
// Do this to fix bug 40648
// Since we now call this with the diskId synced I think
// it is impossible for this oplogId to change.
if (did.getOplogId() != getOplogId()) {
return false;
} else {
// if the disk id indicates its most recent value is in oplogInFocus
// then we should have found data
assert foundData : "compactor get failed on oplog#" + getOplogId();
}
userBits = wrapper.getBits();
if (EntryBits.isAnyInvalid(userBits)) {
if (EntryBits.isInvalid(userBits)) {
wrapper.setData(DiskEntry.INVALID_BYTES, userBits, DiskEntry.INVALID_BYTES.length,
false/* Can not be reused */);
} else {
wrapper.setData(DiskEntry.LOCAL_INVALID_BYTES, userBits,
DiskEntry.LOCAL_INVALID_BYTES.length, false/* Can not be reused */);
}
} else if (EntryBits.isTombstone(userBits)) {
wrapper.setData(DiskEntry.TOMBSTONE_BYTES, userBits, DiskEntry.TOMBSTONE_BYTES.length,
false/* Can not be reused */);
}
} else {
entry.getDiskId().markForWriting();
rmLive(entry);
foundData = false;
}
if (foundData) {
// since the compactor is writing it out clear the async flag
entry.getDiskId().setPendingAsync(false);
}
return foundData;
}
/**
   * Extracts the value byte array and user bit from the oplog and stores them in the
   * supplied wrapper object of type BytesAndBitsForCompactor.
*
* @param offsetInOplog The starting position from which to read the data in the opLog
* @param bitOnly boolean indicating whether the value needs to be extracted along with the
* UserBit or not.
* @param valueLength The length of the byte array which represents the value
* @param userBits The userBits of the value.
* @param wrapper Object of type BytesAndBitsForCompactor. The data is set in the wrapper Object.
* The wrapper Object also contains the user bit associated with the entry
* @return true if data is found false if not
*/
private boolean basicGetForCompactor(long offsetInOplog, boolean bitOnly, int valueLength,
byte userBits, BytesAndBitsForCompactor wrapper) {
if (EntryBits.isAnyInvalid(userBits) || EntryBits.isTombstone(userBits) || bitOnly
|| valueLength == 0) {
if (EntryBits.isInvalid(userBits)) {
wrapper.setData(DiskEntry.INVALID_BYTES, userBits, DiskEntry.INVALID_BYTES.length,
false /* Cannot be reused */);
} else if (EntryBits.isTombstone(userBits)) {
wrapper.setData(DiskEntry.TOMBSTONE_BYTES, userBits, DiskEntry.TOMBSTONE_BYTES.length,
false /* Cannot be reused */);
} else {
wrapper.setData(DiskEntry.LOCAL_INVALID_BYTES, userBits,
DiskEntry.LOCAL_INVALID_BYTES.length, false /* Cannot be reused */);
}
} else {
try {
synchronized (crf) {
          // Note: a !getParent().isSync() clause was dropped from this condition since the
          // compactor groups writes.
          if ((offsetInOplog + valueLength) > crf.bytesFlushed && !closed) {
            flushAll(); // fix for bug 41205
          }
final long writePosition =
(doneAppending) ? crf.bytesFlushed : crf.raf.getFilePointer();
if ((offsetInOplog + valueLength) > writePosition) {
throw new DiskAccessException(
String.format(
"Tried to seek to %s, but the file length is %s. Oplog File object used for reading=%s",
offsetInOplog + valueLength, writePosition, crf.raf),
getParent().getName());
} else if (offsetInOplog < 0) {
throw new DiskAccessException(
String.format("Cannot find record %s when reading from %s",
offsetInOplog, diskFile.getPath()),
getParent().getName());
}
try {
crf.raf.seek(offsetInOplog);
stats.incOplogSeeks();
byte[] valueBytes = null;
if (wrapper.getBytes().length < valueLength) {
valueBytes = new byte[valueLength];
crf.raf.readFully(valueBytes);
} else {
valueBytes = wrapper.getBytes();
crf.raf.readFully(valueBytes, 0, valueLength);
}
stats.incOplogReads();
wrapper.setData(valueBytes, userBits, valueLength, true);
} finally {
// if this oplog is no longer being appended to then don't waste disk io
if (!doneAppending) {
crf.raf.seek(writePosition);
stats.incOplogSeeks();
}
}
}
} catch (IOException ex) {
throw new DiskAccessException(
String.format(
"Failed reading from %s. oplogID, %s Offset being read=%s Current Oplog Size=%s Actual File Size,%s IS ASYNCH MODE,%s IS ASYNCH WRITER ALIVE=%s",
diskFile.getPath(), (long) oplogId, offsetInOplog,
crf.currSize, crf.bytesFlushed, false, false),
ex, getParent().getName());
} catch (IllegalStateException ex) {
checkClosed();
throw ex;
}
}
return true;
}
}