/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.wal.filehandle;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.pagemem.wal.record.SwitchSegmentRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
import org.apache.ignite.internal.processors.cache.persistence.StorageException;
import org.apache.ignite.internal.processors.cache.persistence.cdc.CdcProcessor;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SERIALIZER_VERSION;
import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.prepareSerializerVersionBuffer;
import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory.LATEST_SERIALIZER_VERSION;
/**
* File handle for one log segment.
*/
@SuppressWarnings("SignalWithoutCorrespondingAwait")
class FsyncFileWriteHandle extends AbstractFileHandle implements FileWriteHandle {
    /** Record serializer. */
private final RecordSerializer serializer;
/** Max segment size. */
private final long maxSegmentSize;
    /** Latest serializer version to use. */
private final int serializerVersion =
IgniteSystemProperties.getInteger(IGNITE_WAL_SERIALIZER_VERSION, LATEST_SERIALIZER_VERSION);
    /**
     * Accumulated WAL records chain. This reference points to the latest WAL record. When writing, the chain is
     * iterated from the latest record to the oldest (see {@link WALRecord#previous()}), so records from the chain
     * are saved into the buffer in reverse order.
     */
final AtomicReference<WALRecord> head = new AtomicReference<>();
    /**
     * Position in the current file after the end of the last written record (incremented after each file channel
     * write operation).
     */
private volatile long written;
    /** Position in the current file up to which the data has been fsynced. */
private volatile long lastFsyncPos;
    /** Stop guard guaranteeing that only one thread succeeds in calling {@link #close(boolean)}. */
private final AtomicBoolean stop = new AtomicBoolean(false);
    /** Lock guarding the write, fsync and next-segment conditions. */
private final Lock lock = new ReentrantLock();
    /**
     * Condition signalled each time {@link #writeBuffer(long, ByteBuffer)} completes. Used to wait for a previously
     * initiated write to complete.
     */
private final Condition writeComplete = lock.newCondition();
    /** Condition for the timed wait of several threads; see {@link DataStorageConfiguration#getWalFsyncDelayNanos()}. */
private final Condition fsync = lock.newCondition();
    /**
     * Next segment available condition. Protection from spurious wakeups is provided by the predicate
     * {@link #fileIO} == {@code null}.
     */
private final Condition nextSegment = lock.newCondition();
    /** WAL mode. */
private final WALMode mode;
    /** Thread-local byte buffer size; see {@link #tlb}. */
private final int tlbSize;
/** Context. */
protected final GridCacheSharedContext cctx;
/** Persistence metrics tracker. */
private final DataStorageMetricsImpl metrics;
    /** Change Data Capture processor, possibly {@code null}. */
private final @Nullable CdcProcessor cdcProc;
/** Logger. */
protected final IgniteLogger log;
/** Fsync delay. */
private final long fsyncDelay;
/** Switch segment record offset. */
private int switchSegmentRecordOffset;
    /**
     * Thread-local byte buffer for saving the serialized WAL records chain, see {@link FsyncFileWriteHandle#head}.
     * Introduced to decrease the number of buffer allocations. Used only when the record itself is shorter than
     * {@link #tlbSize}.
     */
private final ThreadLocal<ByteBuffer> tlb = new ThreadLocal<ByteBuffer>() {
@Override protected ByteBuffer initialValue() {
ByteBuffer buf = ByteBuffer.allocateDirect(tlbSize);
buf.order(GridUnsafe.NATIVE_BYTE_ORDER);
return buf;
}
};
    /**
     * @param cctx Context.
     * @param fileIO I/O file interface to use.
     * @param metrics Data storage metrics.
     * @param serializer Serializer.
     * @param cdcProc CDC processor.
     * @param pos Position.
     * @param mode WAL mode.
     * @param maxSegmentSize Max segment size.
     * @param size Thread-local byte buffer size.
     * @param fsyncDelay Fsync delay.
     * @throws IOException If failed.
     */
    FsyncFileWriteHandle(
        GridCacheSharedContext cctx,
        SegmentIO fileIO,
        DataStorageMetricsImpl metrics,
        RecordSerializer serializer,
        CdcProcessor cdcProc,
        long pos,
        WALMode mode,
        long maxSegmentSize,
        int size,
        long fsyncDelay) throws IOException {
super(fileIO);
assert serializer != null;
this.mode = mode;
tlbSize = size;
this.cctx = cctx;
this.metrics = metrics;
this.log = cctx.logger(FsyncFileWriteHandle.class);
this.fsyncDelay = fsyncDelay;
this.maxSegmentSize = maxSegmentSize;
this.serializer = serializer;
this.cdcProc = cdcProc;
this.written = pos;
this.lastFsyncPos = pos;
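        // Seed the records chain with a fake record at the current position, so the first real record
        // appended via addRecord() has a predecessor to link to.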
head.set(new FakeRecord(new WALPointer(fileIO.getSegmentId(), (int)pos, 0), false));
fileIO.position(pos);
}
/** {@inheritDoc} */
@Override public int serializerVersion() {
return serializer.version();
}
/** {@inheritDoc} */
@Override public void finishResumeLogging() {
// NOOP.
}
    /**
     * Writes the serializer version to the current handle. NOTE: the method mutates the {@code fileIO} position and
     * the {@code written} and {@code lastFsyncPos} fields.
     *
     * @throws StorageException If the serializer version could not be written.
     */
@Override public void writeHeader() throws StorageException {
try {
            assert fileIO.position() == 0 : "Serializer version can only be written at the beginning of the file: " +
                fileIO.position();
long updatedPosition = writeSerializerVersion(fileIO, getSegmentId(),
serializer.version(), mode);
written = updatedPosition;
lastFsyncPos = updatedPosition;
head.set(new FakeRecord(new WALPointer(getSegmentId(), (int)updatedPosition, 0), false));
}
catch (IOException e) {
throw new StorageException("Unable to write serializer version for segment " + getSegmentId(), e);
}
}
    /**
     * Writes the record serializer version to the provided {@code io}. NOTE: the method mutates the position of
     * {@code io}.
     *
     * @param io I/O interface for the file.
     * @param idx Segment index.
     * @param version Serializer version.
     * @param mode WAL mode.
     * @return I/O position after the version is written.
     * @throws IOException If failed to write the serializer version.
     */
private static long writeSerializerVersion(FileIO io, long idx, int version, WALMode mode) throws IOException {
ByteBuffer buf = ByteBuffer.allocate(RecordV1Serializer.HEADER_RECORD_SIZE);
buf.order(ByteOrder.nativeOrder());
io.writeFully(prepareSerializerVersionBuffer(idx, version, false, buf));
// Flush
if (mode == WALMode.FSYNC)
io.force();
return io.position();
}
    /**
     * Checks whether the current head is a closing fake record.
     *
     * @return {@code true} if the current head is a closing record.
     */
private boolean stopped() {
return stopped(head.get());
}
/**
* @param record Record to check.
     * @return {@code true} if the record is a closing fake record.
*/
private boolean stopped(WALRecord record) {
return record instanceof FakeRecord && ((FakeRecord)record).stop;
}
/** {@inheritDoc} */
@Nullable @Override public WALPointer addRecord(WALRecord rec) throws StorageException {
assert rec.size() > 0 || rec.getClass() == FakeRecord.class;
boolean flushed = false;
for (; ; ) {
WALRecord h = head.get();
long nextPos = nextPosition(h);
if (nextPos + rec.size() >= maxSegmentSize || stopped(h)) {
                // Cannot write to this segment; we need to switch to the next one.
return null;
}
int newChainSize = h.chainSize() + rec.size();
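            // The chain would outgrow the thread-local buffer: flush what has accumulated so far and retry.
            // A record that is itself larger than the buffer is flushed once and then appended anyway;
            // a temporary direct buffer is allocated for it during the actual flush.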
if (newChainSize > tlbSize && !flushed) {
boolean res = h.previous() == null || flush(h, false);
if (rec.size() > tlbSize)
flushed = res;
continue;
}
rec.chainSize(newChainSize);
rec.previous(h);
WALPointer ptr = new WALPointer(
getSegmentId(),
(int)nextPos,
rec.size());
rec.position(ptr);
if (head.compareAndSet(h, rec))
return ptr;
}
}
/** {@inheritDoc} */
@Override public void flushAll() throws IgniteCheckedException {
flush(head.get(), false);
}
    /**
     * Flushes all records on stop.
     *
     * @throws IgniteCheckedException If failed.
     */
public void flushAllOnStop() throws IgniteCheckedException {
flush(head.get(), true);
}
/**
* @param rec Record.
* @return Position for the next record.
*/
private long nextPosition(WALRecord rec) {
return recordOffset(rec) + rec.size();
}
/**
* Gets WAL record offset relative to the WAL segment file beginning.
*
* @param rec WAL record.
* @return File offset.
*/
private static int recordOffset(WALRecord rec) {
WALPointer ptr = rec.position();
assert ptr != null;
return ptr.fileOffset();
}
    /**
     * Flushes or waits for a concurrent flush to complete.
     *
     * @param ptr Pointer.
     * @param stop Stop flag.
     * @throws StorageException If failed.
     */
private void flushOrWait(WALPointer ptr, boolean stop) throws StorageException {
long expWritten;
if (ptr != null) {
            // If an obsolete file index was requested, it must already have been flushed by close.
if (ptr.index() != getSegmentId())
return;
expWritten = ptr.fileOffset();
}
        else // Read the head position before the flush, otherwise we may get a wrong position.
expWritten = recordOffset(head.get());
if (flush(ptr, stop))
return;
else if (stop) {
FakeRecord fr = (FakeRecord)head.get();
assert fr.stop : "Invalid fake record on top of the queue: " + fr;
expWritten = recordOffset(fr);
}
// Spin-wait for a while before acquiring the lock.
for (int i = 0; i < 64; i++) {
if (written >= expWritten)
return;
}
        // If we did not flush ourselves, wait for the concurrent flush to complete.
lock.lock();
try {
while (written < expWritten && !cctx.kernalContext().invalid())
U.awaitQuiet(writeComplete);
}
finally {
lock.unlock();
}
}
    /**
     * @param ptr Pointer.
     * @param stop Stop flag.
     * @return {@code true} If the flush really happened.
     * @throws StorageException If failed.
     */
private boolean flush(WALPointer ptr, boolean stop) throws StorageException {
if (ptr == null) { // Unconditional flush.
for (; ; ) {
WALRecord expHead = head.get();
if (expHead.previous() == null) {
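                    // The chain is empty: nothing to flush. If a stop was requested, try to upgrade the
                    // fake head to a stopping one; either way, report that no flush happened in this thread.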
FakeRecord frHead = (FakeRecord)expHead;
if (frHead.stop == stop || frHead.stop ||
head.compareAndSet(expHead, new FakeRecord(frHead.position(), stop)))
return false;
}
if (flush(expHead, stop))
return true;
}
}
assert ptr.index() == getSegmentId();
for (; ; ) {
WALRecord h = head.get();
            // If the current chain begin position is greater than requested, someone else has flushed our changes.
if (chainBeginPosition(h) > ptr.fileOffset())
return false;
if (flush(h, stop))
return true; // We are lucky.
}
}
/**
* @param h Head of the chain.
* @return Chain begin position.
*/
private long chainBeginPosition(WALRecord h) {
return recordOffset(h) + h.size() - h.chainSize();
}
/**
     * @throws StorageException If the node is no longer valid and we missed a WAL operation.
*/
private void checkNode() throws StorageException {
if (cctx.kernalContext().invalid())
throw new StorageException("Failed to perform WAL operation (environment was invalidated by a " +
"previous error)");
}
    /**
     * @param expHead Expected head of the chain. If the head has changed, the flush is not performed by this thread.
     * @param stop Stop flag.
     * @return {@code true} if the flush actually happened in this thread.
     * @throws StorageException If failed.
     */
private boolean flush(WALRecord expHead, boolean stop) throws StorageException {
if (expHead.previous() == null) {
FakeRecord frHead = (FakeRecord)expHead;
if (!stop || frHead.stop) // Protects from CASing terminal FakeRecord(true) to FakeRecord(false)
return false;
}
// Fail-fast before CAS.
checkNode();
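        // Atomically detach the grabbed chain: CAS the head to a fresh fake record positioned
        // right after the last record of the chain.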
if (!head.compareAndSet(expHead, new FakeRecord(new WALPointer(getSegmentId(), (int)nextPosition(expHead), 0), stop)))
return false;
if (expHead.chainSize() == 0)
return false;
// At this point we grabbed the piece of WAL chain.
// Any failure in this code must invalidate the environment.
try {
// We can safely allow other threads to start building next chains while we are doing flush here.
ByteBuffer buf;
boolean tmpBuf = false;
if (expHead.chainSize() > tlbSize) {
buf = GridUnsafe.allocateBuffer(expHead.chainSize());
tmpBuf = true; // We need to manually release this temporary direct buffer.
}
else
buf = tlb.get();
try {
long pos = fillBuffer(buf, expHead);
writeBuffer(pos, buf);
}
finally {
if (tmpBuf)
GridUnsafe.freeBuffer(buf);
}
return true;
}
catch (Throwable e) {
StorageException se = e instanceof StorageException ? (StorageException)e :
new StorageException("Unable to write", new IOException(e));
cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, se));
            // All workers waiting for the next segment must be woken up and stopped.
signalNextAvailable();
throw se;
}
}
/**
* Serializes WAL records chain to provided byte buffer.
*
     * @param buf Buffer; will be filled with the records chain from end to beginning.
* @param head Head of the chain to write to the buffer.
* @return Position in file for this buffer.
* @throws IgniteCheckedException If failed.
*/
private long fillBuffer(ByteBuffer buf, WALRecord head) throws IgniteCheckedException {
final int limit = head.chainSize();
assert limit <= buf.capacity();
buf.rewind();
buf.limit(limit);
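        // Walk the chain from the newest record back to the oldest (see WALRecord#previous()):
        // each record is serialized at its precomputed offset, so the buffer ends up in file order.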
do {
buf.position(head.chainSize() - head.size());
buf.limit(head.chainSize()); // Just to make sure that serializer works in bounds.
try {
serializer.writeRecord(head, buf);
}
catch (RuntimeException e) {
throw new IllegalStateException("Failed to write record: " + head, e);
}
assert !buf.hasRemaining() : "Reported record size is greater than actual: " + head;
head = head.previous();
}
while (head.previous() != null);
assert head instanceof FakeRecord : head.getClass();
buf.rewind();
buf.limit(limit);
return recordOffset(head);
}
    /**
     * Non-blocking check whether this pointer needs to be fsynced.
     *
     * @param ptr WAL pointer to check.
     * @return {@code False} if this pointer has already been fsynced.
     */
@Override public boolean needFsync(WALPointer ptr) {
// If index has changed, it means that the log was rolled over and already sync'ed.
// If requested position is smaller than last sync'ed, it also means all is good.
// If position is equal, then our record is the last not synced.
return getSegmentId() == ptr.index() && lastFsyncPos <= ptr.fileOffset();
}
/** {@inheritDoc} */
@Override public WALPointer position() {
lock.lock();
try {
return new WALPointer(getSegmentId(), (int)written, 0);
}
finally {
lock.unlock();
}
}
/** {@inheritDoc} */
@Override public void fsync(WALPointer ptr) throws StorageException, IgniteCheckedException {
fsync(ptr, false);
}
/** {@inheritDoc} */
@Override public void closeBuffer() {
        // NOOP.
}
    /**
     * @param ptr Pointer to sync.
     * @param stop Stop flag.
     * @throws StorageException If failed.
     * @throws IgniteInterruptedCheckedException If interrupted.
     */
protected void fsync(WALPointer ptr, boolean stop) throws StorageException, IgniteInterruptedCheckedException {
lock.lock();
try {
if (ptr != null) {
if (!needFsync(ptr))
return;
if (fsyncDelay > 0 && !stopped()) {
// Delay fsync to collect as many updates as possible: trade latency for throughput.
U.await(fsync, fsyncDelay, TimeUnit.NANOSECONDS);
if (!needFsync(ptr))
return;
}
}
flushOrWait(ptr, stop);
if (stopped())
return;
if (lastFsyncPos != written) {
assert lastFsyncPos < written; // Fsync position must be behind.
boolean metricsEnabled = metrics.metricsEnabled();
long start = metricsEnabled ? System.nanoTime() : 0;
try {
fileIO.force();
}
catch (IOException e) {
throw new StorageException(e);
}
lastFsyncPos = written;
if (fsyncDelay > 0)
fsync.signalAll();
long end = metricsEnabled ? System.nanoTime() : 0;
if (metricsEnabled)
metrics.onFsync(end - start);
}
}
finally {
lock.unlock();
}
}
    /**
     * @param rollOver {@code true} if the segment is being closed due to rollover to the next segment.
     * @return {@code true} If this thread actually closed the segment.
     * @throws StorageException If failed.
     */
@Override public boolean close(boolean rollOver) throws StorageException {
if (stop.compareAndSet(false, true)) {
lock.lock();
try {
flushOrWait(null, true);
assert stopped() : "Segment is not closed after close flush: " + head.get();
try {
try {
RecordSerializer backwardSerializer = new RecordSerializerFactoryImpl(cctx)
.createSerializer(serializerVersion);
SwitchSegmentRecord segmentRecord = new SwitchSegmentRecord();
int switchSegmentRecSize = backwardSerializer.size(segmentRecord);
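                        // On rollover, append a switch segment record (if it fits) so that readers know
                        // the segment logically ends here and continue from the next segment.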
if (rollOver && written + switchSegmentRecSize < maxSegmentSize) {
final ByteBuffer buf = ByteBuffer.allocate(switchSegmentRecSize);
segmentRecord.position(new WALPointer(getSegmentId(), (int)written, switchSegmentRecSize));
backwardSerializer.writeRecord(segmentRecord, buf);
buf.rewind();
written += fileIO.writeFully(buf, written);
switchSegmentRecordOffset = (int)written;
}
}
catch (IgniteCheckedException e) {
throw new IOException(e);
}
finally {
assert mode == WALMode.FSYNC;
// Do the final fsync.
fileIO.force();
lastFsyncPos = written;
fileIO.close();
}
}
catch (IOException e) {
throw new StorageException("Failed to close WAL write handle [idx=" + getSegmentId() + "]", e);
}
if (log.isDebugEnabled())
log.debug("Closed WAL write handle [idx=" + getSegmentId() + "]");
return true;
}
finally {
lock.unlock();
}
}
else
return false;
}
/** {@inheritDoc} */
@Override public void signalNextAvailable() {
lock.lock();
try {
WALRecord rec = head.get();
if (!cctx.kernalContext().invalid()) {
assert rec instanceof FakeRecord : "Expected head FakeRecord, actual head "
+ (rec != null ? rec.getClass().getSimpleName() : "null");
assert written == lastFsyncPos || mode != WALMode.FSYNC :
"fsync [written=" + written + ", lastFsync=" + lastFsyncPos + ']';
fileIO = null;
}
else {
try {
fileIO.close();
}
catch (IOException e) {
U.error(log, "Failed to close WAL file [idx=" + getSegmentId() + ", fileIO=" + fileIO + "]", e);
}
}
nextSegment.signalAll();
}
finally {
lock.unlock();
}
}
/** {@inheritDoc} */
@Override public void awaitNext() {
lock.lock();
try {
while (fileIO != null && !cctx.kernalContext().invalid())
U.awaitQuiet(nextSegment);
}
finally {
lock.unlock();
}
}
    /**
     * @param pos Position in the file to start the write from. May be checked against the actual position to wait
     * for previous writes to complete.
     * @param buf Buffer to write to the file.
     * @throws StorageException If failed.
     */
@SuppressWarnings("TooBroadScope")
private void writeBuffer(long pos, ByteBuffer buf) throws StorageException {
boolean interrupted = false;
lock.lock();
try {
assert fileIO != null : "Writing to a closed segment.";
checkNode();
long lastLogged = U.currentTimeMillis();
long logBackoff = 2_000;
            // If we were too fast, we need to wait for previous writes to complete.
while (written != pos) {
assert written < pos : "written = " + written + ", pos = " + pos; // No one can write further than we are now.
                // Writes may have been reordered: the order of acquiring the lock
                // is not the same as the order of the writes themselves.
long now = U.currentTimeMillis();
if (now - lastLogged >= logBackoff) {
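                    // Exponential backoff: the warning interval doubles each time until it exceeds one hour.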
if (logBackoff < 60 * 60_000)
logBackoff *= 2;
U.warn(log, "Still waiting for a concurrent write to complete [written=" + written +
", pos=" + pos + ", lastFsyncPos=" + lastFsyncPos + ", stop=" + stop.get() +
", actualPos=" + safePosition() + ']');
lastLogged = now;
}
try {
writeComplete.await(2, TimeUnit.SECONDS);
}
catch (InterruptedException ignore) {
interrupted = true;
}
checkNode();
}
// Do the write.
int size = buf.remaining();
int bufPos = buf.position();
assert size > 0 : size;
try {
assert written == fileIO.position();
fileIO.writeFully(buf);
written += size;
assert written == fileIO.position();
}
catch (IOException e) {
StorageException se = new StorageException("Unable to write", e);
cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, se));
throw se;
}
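            // Hand the written region to the CDC processor via a duplicate view, so the position and
            // limit of the original buffer remain untouched.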
if (cdcProc != null) {
ByteBuffer cdcBuf = buf.duplicate();
cdcBuf.position(bufPos);
cdcBuf.limit(buf.limit());
cdcBuf.order(buf.order());
cdcProc.collect(cdcBuf);
}
}
finally {
writeComplete.signalAll();
lock.unlock();
if (interrupted)
Thread.currentThread().interrupt();
}
}
    /**
     * Safely reads the current position of the file channel as a String.
     *
     * @return Current position, or {@code "null"} if the channel is {@code null}.
     */
public String safePosition() {
FileIO io = this.fileIO;
if (io == null)
return "null";
try {
return String.valueOf(io.position());
}
catch (IOException e) {
return "{Failed to read channel position: " + e.getMessage() + "}";
}
}
/** {@inheritDoc} */
@Override public int getSwitchSegmentRecordOffset() {
return switchSegmentRecordOffset;
}
    /**
     * A fake record is a zero-sized record that is not stored in the file. It is used to hold a position in the
     * file, see {@link WALRecord#position()}. A fake record is allowed to have no previous record.
     */
static final class FakeRecord extends WALRecord {
        /** Stop flag: {@code true} for the terminal fake record of a closed segment. */
private final boolean stop;
        /**
         * @param pos Position.
         * @param stop Stop flag.
         */
FakeRecord(WALPointer pos, boolean stop) {
position(pos);
this.stop = stop;
}
/** {@inheritDoc} */
@Override public RecordType type() {
return null;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(FakeRecord.class, this, "super", super.toString());
}
}
}