blob: 44d7c22718b1d0ba4111c6b81a2af8d724331091 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.wal.filehandle;
import java.io.FileDescriptor;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
import org.apache.ignite.internal.pagemem.wal.record.SwitchSegmentRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
import org.apache.ignite.internal.processors.cache.persistence.StorageException;
import org.apache.ignite.internal.processors.cache.persistence.cdc.CdcProcessor;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SERIALIZER_VERSION;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.SWITCH_SEGMENT_RECORD;
import static org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager.prepareSerializerVersionBuffer;
import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory.LATEST_SERIALIZER_VERSION;
import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE;
import static org.apache.ignite.internal.util.IgniteUtils.findField;
import static org.apache.ignite.internal.util.IgniteUtils.findNonPublicMethod;
import static org.apache.ignite.internal.util.IgniteUtils.jdkVersion;
import static org.apache.ignite.internal.util.IgniteUtils.majorJavaVersion;
/**
 * File handle for one log segment.
 * <p>
 * Records are serialized into a {@link SegmentedRingByteBuffer} by {@link #addRecord(WALRecord)}. When {@code mmap}
 * is enabled, the handle itself advances {@link #written} and fsyncs the mapped buffer; otherwise flushing and
 * forcing are delegated to the {@link FileHandleManagerImpl.WALWriter}.
 */
@SuppressWarnings("SignalWithoutCorrespondingAwait")
class FileWriteHandleImpl extends AbstractFileHandle implements FileWriteHandle {
    /** Memory mapped buffer fsync operation runner. */
    private static final MMapFSyncer FSYNCER = pickFsyncer();

    /** {@link FileWriteHandleImpl#written} atomic field updater. */
    private static final AtomicLongFieldUpdater<FileWriteHandleImpl> WRITTEN_UPD =
        AtomicLongFieldUpdater.newUpdater(FileWriteHandleImpl.class, "written");

    /** Page size. */
    private static final int PAGE_SIZE = GridUnsafe.pageSize();

    /** Serializer latest version to use. */
    private final int serializerVer =
        IgniteSystemProperties.getInteger(IGNITE_WAL_SERIALIZER_VERSION, LATEST_SERIALIZER_VERSION);

    /** Use mapped byte buffer. */
    private final boolean mmap;

    /** Created on resume logging. */
    private volatile boolean resume;

    /**
     * Position in current file after the end of last written record (incremented after file channel write operation).
     * In mmap mode it is advanced by {@link #addRecord(WALRecord)} without holding {@link #lock}; it only grows.
     */
    volatile long written;

    /** Position up to which the segment is known to be fsync'ed; never greater than {@link #written}. */
    protected volatile long lastFsyncPos;

    /** Stop guard to provide warranty that only one thread will be successful in calling {@link #close(boolean)}. */
    protected final AtomicBoolean stop = new AtomicBoolean(false);

    /** Guards fsync, close and next-segment signalling. */
    private final Lock lock = new ReentrantLock();

    /** Condition for timed wait of several threads, see {@link DataStorageConfiguration#getWalFsyncDelayNanos()}. */
    private final Condition fsync = lock.newCondition();

    /**
     * Next segment available condition. Protection from "spurious wakeup" is provided by predicate {@link
     * #fileIO}=<code>null</code>.
     */
    private final Condition nextSegment = lock.newCondition();

    /** Buffer. */
    protected final SegmentedRingByteBuffer buf;

    /** CDC processor, {@code null} if CDC is disabled. */
    private final @Nullable CdcProcessor cdcProc;

    /** WAL mode. */
    private final WALMode mode;

    /** Fsync delay. */
    private final long fsyncDelay;

    /** Persistence metrics tracker. */
    private final DataStorageMetricsImpl metrics;

    /** WAL segment size in bytes. This is maximum value, actual segments may be shorter. */
    private final long maxWalSegmentSize;

    /** Logger. */
    protected final IgniteLogger log;

    /** Serializer used for records written through this handle. */
    private final RecordSerializer serializer;

    /** Context. */
    protected final GridCacheSharedContext cctx;

    /** WAL writer worker. */
    private final FileHandleManagerImpl.WALWriter walWriter;

    /** Switch segment record offset. */
    private int switchSegmentRecordOffset;

    /**
     * @param cctx Context.
     * @param fileIO I/O file interface to use
     * @param rbuf Ring buffer records are serialized into.
     * @param serializer Serializer.
     * @param metrics Data storage metrics.
     * @param writer WAL writer.
     * @param cdcProc CDC processor, {@code null} if CDC is disabled.
     * @param pos Initial position.
     * @param mode WAL mode.
     * @param mmap Mmap.
     * @param resume Created on resume logging flag.
     * @param fsyncDelay Fsync delay.
     * @param maxWalSegmentSize Max WAL segment size.
     * @throws IOException If failed.
     */
    FileWriteHandleImpl(
        GridCacheSharedContext cctx, SegmentIO fileIO, SegmentedRingByteBuffer rbuf, RecordSerializer serializer,
        DataStorageMetricsImpl metrics, FileHandleManagerImpl.WALWriter writer, CdcProcessor cdcProc,
        long pos, WALMode mode, boolean mmap, boolean resume, long fsyncDelay, long maxWalSegmentSize) throws IOException {
        super(fileIO);

        assert serializer != null;

        this.mmap = mmap;
        this.mode = mode;
        this.fsyncDelay = fsyncDelay;
        this.metrics = metrics;
        this.maxWalSegmentSize = maxWalSegmentSize;
        this.log = cctx.logger(FileWriteHandleImpl.class);
        this.cctx = cctx;
        this.walWriter = writer;
        this.cdcProc = cdcProc;
        this.serializer = serializer;
        this.written = pos;
        this.lastFsyncPos = pos;
        this.resume = resume;
        this.buf = rbuf;

        // In mmap mode the position is tracked by the mapped buffer, not by the channel.
        if (!mmap)
            fileIO.position(pos);
    }

    /** {@inheritDoc} */
    @Override public int serializerVersion() {
        return serializer.version();
    }

    /** {@inheritDoc} */
    @Override public void finishResumeLogging() {
        resume = false;
    }

    /**
     * @throws StorageException If node is no longer valid and we missed a WAL operation.
     */
    private void checkNode() throws StorageException {
        if (cctx.kernalContext().invalid())
            throw new StorageException("Failed to perform WAL operation (environment was invalidated by a " +
                "previous error)");
    }

    /**
     * Write serializer version to current handle.
     */
    @Override public void writeHeader() {
        SegmentedRingByteBuffer.WriteSegment seg = buf.offer(HEADER_RECORD_SIZE);

        assert seg != null && seg.position() > 0;

        prepareSerializerVersionBuffer(getSegmentId(), serializerVer, false, seg.buffer());

        seg.release();
    }

    /**
     * @param rec Record to be added to write queue.
     * @return Pointer or null if roll over to next segment is required or already started by other thread.
     * @throws StorageException If failed.
     * @throws IgniteCheckedException If failed.
     */
    @Override @Nullable public WALPointer addRecord(WALRecord rec) throws StorageException, IgniteCheckedException {
        assert rec.size() > 0 : rec;

        for (;;) {
            checkNode();

            SegmentedRingByteBuffer.WriteSegment seg;

            try {
                // Buffer can be in open state in case of resuming with different serializer version.
                if (rec.type() == SWITCH_SEGMENT_RECORD && !resume)
                    seg = buf.offerSafe(rec.size());
                else
                    seg = buf.offer(rec.size());
            }
            catch (IgniteException e) {
                // WAL record size is greater than the buffer's capacity.
                throw new IgniteCheckedException(e);
            }

            if (seg == null) {
                // offer() returned nothing: flush the buffer and retry.
                walWriter.flushAll();

                continue;
            }

            WALPointer ptr = null;

            try {
                int pos = (int)(seg.position() - rec.size());

                ByteBuffer recBuf = seg.buffer();

                if (recBuf == null)
                    return null; // Can not write to this segment, need to switch to the next one.

                ptr = new WALPointer(getSegmentId(), pos, rec.size());

                rec.position(ptr);

                fillBuffer(recBuf, rec);

                if (mmap)
                    advanceWritten(seg.position());

                return ptr;
            }
            finally {
                seg.release();

                if (mode == WALMode.BACKGROUND && rec instanceof CheckpointRecord)
                    flushOrWait(ptr);
            }
        }
    }

    /**
     * Monotonically advances {@link #written} to {@code pos}. A segment with a greater position can be serialized
     * earlier than a segment with a smaller one, so the value is left untouched if it is already {@code >= pos}.
     *
     * @param pos Candidate new value of {@link #written}.
     */
    private void advanceWritten(long pos) {
        for (long cur = written; pos > cur; cur = written) {
            if (WRITTEN_UPD.compareAndSet(this, cur, pos))
                break;
        }
    }

    /**
     * Flush or wait for concurrent flush completion.
     *
     * @param ptr Pointer.
     */
    public void flushOrWait(WALPointer ptr) throws IgniteCheckedException {
        if (ptr != null) {
            // If requested obsolete file index, it must be already flushed by close.
            if (ptr.index() != getSegmentId())
                return;
        }

        flush(ptr);
    }

    /** {@inheritDoc} */
    @Override public void flushAll() throws IgniteCheckedException {
        flush(null);
    }

    /**
     * @param ptr Pointer, {@code null} for an unconditional flush.
     */
    public void flush(WALPointer ptr) throws IgniteCheckedException {
        if (ptr == null) { // Unconditional flush.
            walWriter.flushAll();

            return;
        }

        assert ptr.index() == getSegmentId() : "Pointer segment idx is not equals to current write segment idx. " +
            "ptr=" + ptr + " segmetntId=" + getSegmentId();

        walWriter.flushBuffer(ptr.fileOffset() + ptr.length());
    }

    /**
     * @param buf Buffer.
     * @param rec WAL record.
     * @throws IgniteCheckedException If failed.
     * @throws IllegalStateException If the serializer threw a runtime exception.
     */
    private void fillBuffer(ByteBuffer buf, WALRecord rec) throws IgniteCheckedException {
        try {
            serializer.writeRecord(rec, buf);
        }
        catch (RuntimeException e) {
            throw new IllegalStateException("Failed to write record: " + rec, e);
        }
    }

    /**
     * Non-blocking check if this pointer needs to be sync'ed.
     *
     * @param ptr WAL pointer to check.
     * @return {@code False} if this pointer has been already sync'ed.
     */
    @Override public boolean needFsync(WALPointer ptr) {
        // If index has changed, it means that the log was rolled over and already sync'ed.
        // If requested position is smaller than last sync'ed, it also means all is good.
        // If position is equal, then our record is the last not synced.
        return getSegmentId() == ptr.index() && lastFsyncPos <= ptr.fileOffset();
    }

    /**
     * @return Pointer to the end of the last written record (probably not fsync-ed).
     */
    @Override public WALPointer position() {
        lock.lock();

        try {
            return new WALPointer(getSegmentId(), (int)written, 0);
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * @param ptr Pointer to sync.
     * @throws StorageException If failed.
     */
    @Override public void fsync(WALPointer ptr) throws StorageException, IgniteCheckedException {
        lock.lock();

        try {
            if (ptr != null) {
                if (!needFsync(ptr))
                    return;

                if (fsyncDelay > 0 && !stop.get()) {
                    // Delay fsync to collect as many updates as possible: trade latency for throughput.
                    U.await(fsync, fsyncDelay, TimeUnit.NANOSECONDS);

                    if (!needFsync(ptr))
                        return;
                }
            }

            flushOrWait(ptr);

            if (stop.get())
                return;

            long lastFsyncPos0 = lastFsyncPos;
            long written0 = written;

            if (lastFsyncPos0 != written0) {
                // Fsync position must be behind.
                assert lastFsyncPos0 < written0 : "lastFsyncPos=" + lastFsyncPos0 + ", written=" + written0;

                boolean metricsEnabled = metrics.metricsEnabled();

                long start = metricsEnabled ? System.nanoTime() : 0;

                if (mmap)
                    fsyncMapped(ptr);
                else
                    walWriter.force();

                // Publish the value sampled BEFORE the fsync. 'written' may be advanced concurrently (in mmap mode
                // addRecord() does it without holding 'lock'), and bytes appended after the sample are not
                // necessarily covered by the fsync above. Re-reading 'written' here could make needFsync() report
                // such records as already synced.
                lastFsyncPos = written0;

                if (fsyncDelay > 0)
                    fsync.signalAll();

                if (metricsEnabled)
                    metrics.onFsync(System.nanoTime() - start);
            }
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * Polls the ring buffer (up to {@code ptr}'s file offset, or with {@code -1} when {@code ptr} is {@code null}),
     * fsyncs the returned region of the memory-mapped buffer and, if CDC is enabled, passes that region to the
     * {@link CdcProcessor}. Does nothing if the poll returns {@code null}.
     *
     * @param ptr Pointer to sync, may be {@code null}.
     * @throws StorageException If failed.
     * @throws IgniteCheckedException If failed.
     */
    private void fsyncMapped(@Nullable WALPointer ptr) throws StorageException, IgniteCheckedException {
        long pos = ptr == null ? -1 : ptr.fileOffset();

        List<SegmentedRingByteBuffer.ReadSegment> segs = buf.poll(pos);

        if (segs == null)
            return;

        assert segs.size() == 1;

        SegmentedRingByteBuffer.ReadSegment seg = segs.get(0);

        int off = seg.buffer().position();
        int len = seg.buffer().limit() - off;

        fsync((MappedByteBuffer)buf.buf, off, len);

        if (cdcProc != null) {
            ByteBuffer cdcBuf = buf.buf.duplicate();

            cdcBuf.position(off);
            cdcBuf.limit(off + len);
            cdcBuf.order(buf.buf.order());

            cdcProc.collect(cdcBuf);
        }

        seg.release();
    }

    /**
     * @param buf Mapped byte buffer.
     * @param off Offset.
     * @param len Length.
     */
    private static void fsync(MappedByteBuffer buf, int off, int len) throws IgniteCheckedException {
        FSYNCER.fsync(buf, off, len);
    }

    /** {@inheritDoc} */
    @Override public void closeBuffer() {
        buf.close();
    }

    /**
     * @param rollOver {@code True} if the segment is closed because of a rollover: a switch segment record is then
     *      appended if it fits into the segment, and in non-mmap mode the buffer is not freed.
     * @return {@code true} If this thread actually closed the segment.
     * @throws IgniteCheckedException If failed.
     * @throws StorageException If failed.
     */
    @Override public boolean close(boolean rollOver) throws IgniteCheckedException, StorageException {
        if (!stop.compareAndSet(false, true))
            return false;

        lock.lock();

        try {
            flushOrWait(null);

            RecordSerializer backwardSerializer = new RecordSerializerFactoryImpl(cctx)
                .createSerializer(serializerVer);

            SwitchSegmentRecord segmentRecord = new SwitchSegmentRecord();

            int switchSegmentRecSize = backwardSerializer.size(segmentRecord);

            if (rollOver && written + switchSegmentRecSize < maxWalSegmentSize) {
                segmentRecord.size(switchSegmentRecSize);

                WALPointer segRecPtr = addRecord(segmentRecord);

                if (segRecPtr != null) {
                    fsync(segRecPtr);

                    switchSegmentRecordOffset = segRecPtr.fileOffset() + switchSegmentRecSize;
                }
                else {
                    if (log.isDebugEnabled())
                        log.debug("Not enough space in wal segment to write segment switch");
                }
            }
            else {
                if (log.isDebugEnabled()) {
                    log.debug("Not enough space in wal segment to write segment switch, written="
                        + written + ", switchSegmentRecSize=" + switchSegmentRecSize);
                }
            }

            // Unconditional flush (tail of the buffer)
            flushOrWait(null);

            if (mmap) {
                List<SegmentedRingByteBuffer.ReadSegment> segs = buf.poll(maxWalSegmentSize);

                if (segs != null) {
                    assert segs.size() == 1;

                    segs.get(0).release();
                }
            }

            // Do the final fsync.
            if (mode != WALMode.NONE) {
                if (mmap)
                    ((MappedByteBuffer)buf.buf).force();
                else
                    walWriter.force();

                lastFsyncPos = written;
            }

            if (mmap)
                U.closeQuiet(fileIO);
            else
                walWriter.close();

            if (!mmap && !rollOver)
                buf.free();

            if (log.isDebugEnabled())
                log.debug("Closed WAL write handle [idx=" + getSegmentId() + "]");

            return true;
        }
        finally {
            if (mmap)
                buf.free();

            lock.unlock();
        }
    }

    /**
     * Signals next segment available to wake up other worker threads waiting for WAL to write.
     */
    @Override public void signalNextAvailable() {
        lock.lock();

        try {
            assert cctx.kernalContext().invalid() ||
                written == lastFsyncPos || mode != WALMode.FSYNC :
                "fsync [written=" + written + ", lastFsync=" + lastFsyncPos + ", idx=" + getSegmentId() + ']';

            fileIO = null;

            nextSegment.signalAll();
        }
        finally {
            lock.unlock();
        }
    }

    /** {@inheritDoc} */
    @Override public void awaitNext() {
        lock.lock();

        try {
            // fileIO is reset to null by signalNextAvailable(); this guards against spurious wakeups.
            while (fileIO != null)
                U.awaitQuiet(nextSegment);
        }
        finally {
            lock.unlock();
        }
    }

    /**
     * @return Safely reads current position of the file channel as String. Will return "null" if channel is null.
     */
    public String safePosition() {
        FileIO io = fileIO;

        if (io == null)
            return "null";

        try {
            return String.valueOf(io.position());
        }
        catch (IOException e) {
            return "{Failed to read channel position: " + e.getMessage() + '}';
        }
    }

    /** {@inheritDoc} */
    @Override public int getSwitchSegmentRecordOffset() {
        return switchSegmentRecordOffset;
    }

    /**
     * Interface for performing {@link MappedByteBuffer} fsync.
     */
    private interface MMapFSyncer {
        /** {@link MappedByteBuffer#fd} */
        static final Field fd = findField(MappedByteBuffer.class, "fd");

        /**
         * Performs fsync.
         *
         * @param buf Mmapped byte buffer.
         * @param index Index of the syncing segment in the buffer.
         * @param len Length of the syncing segment part.
         * @throws IgniteCheckedException If failed.
         */
        void fsync(MappedByteBuffer buf, int index, int len) throws IgniteCheckedException;
    }

    /**
     * @return FSyncer suitable for the current JRE.
     */
    private static MMapFSyncer pickFsyncer() {
        int javaVersion = majorJavaVersion(jdkVersion());

        if (javaVersion >= 15)
            return new JDK15FSyncer();

        return new LegacyFSyncer();
    }

    /**
     * Runs fsync operation on JRE15 and higher using MappedMemoryUtils which provides aligned fsync.
     */
    private static class JDK15FSyncer implements MMapFSyncer {
        /** {@link MappedByteBuffer#address}. */
        private static final Field address = findField(MappedByteBuffer.class, "address");

        /**
         * A flag true if this buffer is mapped against non-volatile
         * memory using one of the extended FileChannel.MapMode modes.
         */
        private static final Field isSync = findField(MappedByteBuffer.class, "isSync");

        /**
         * Mapped memory utils class. For compatibility reasons it might only be accessed via
         * {@link Class#forName(String)}.
         */
        private final Class<?> mappedMemoryUtils;

        /**
         * MappedMemoryUtils#force method.
         */
        private final Method force;

        /** Constructor. */
        public JDK15FSyncer() {
            try {
                mappedMemoryUtils = Class.forName("java.nio.MappedMemoryUtils");

                force = findNonPublicMethod(
                    mappedMemoryUtils,
                    "force",
                    FileDescriptor.class,
                    long.class, // Address
                    boolean.class, // Is sync?
                    long.class, // Index
                    long.class // Length
                );
            }
            catch (ClassNotFoundException e) {
                throw new IgniteException(e);
            }
        }

        /** {@inheritDoc} */
        @Override public void fsync(MappedByteBuffer buf, int index, int len) throws IgniteCheckedException {
            try {
                boolean bufIsSync = (boolean)isSync.get(buf);
                long bufAddr = (long)address.get(buf);

                assert bufAddr % PAGE_SIZE == 0 : "Buffer's address is not aligned: " + bufAddr;

                // Don't need to align manually as MappedMemoryUtils does the alignment.
                // The target object is ignored by Method.invoke for static methods.
                force.invoke(mappedMemoryUtils, fd.get(buf), bufAddr, bufIsSync, index, len);
            }
            catch (IllegalAccessException | InvocationTargetException e) {
                throw new IgniteCheckedException(e);
            }
        }
    }

    /**
     * Runs fsync on pre-java15 JVMs that don't offer a possibility to fsync mapped byte buffers in an aligned way.
     */
    private static class LegacyFSyncer implements MMapFSyncer {
        /** {@link MappedByteBuffer#force0(java.io.FileDescriptor, long, long)}. */
        private static final Method force0 = findNonPublicMethod(
            MappedByteBuffer.class, "force0",
            FileDescriptor.class, long.class, long.class
        );

        /** {@link MappedByteBuffer#mappingOffset()}. */
        private static final Method mappingOffset = findNonPublicMethod(MappedByteBuffer.class, "mappingOffset");

        /** {@link MappedByteBuffer#mappingAddress(long)}. */
        private static final Method mappingAddress = findNonPublicMethod(
            MappedByteBuffer.class, "mappingAddress", long.class
        );

        /** {@inheritDoc} */
        @Override public void fsync(MappedByteBuffer buf, int index, int len) throws IgniteCheckedException {
            try {
                long mappedOff = (Long)mappingOffset.invoke(buf);

                assert mappedOff == 0 : mappedOff;

                long addr = (Long)mappingAddress.invoke(buf, mappedOff);

                long alignmentDelta = (addr + index) % PAGE_SIZE;

                // Given an alignment delta calculate the largest page aligned address
                // of the mapping less than or equal to the address of the buffer
                // element identified by the index.
                long alignedAddr = (addr + index) - alignmentDelta;

                force0.invoke(buf, fd.get(buf), alignedAddr, len + alignmentDelta);
            }
            catch (IllegalAccessException | InvocationTargetException e) {
                throw new IgniteCheckedException(e);
            }
        }
    }
}