/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.wal.filehandle;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Supplier;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
import org.apache.ignite.internal.processors.cache.persistence.StorageException;
import org.apache.ignite.internal.processors.cache.persistence.cdc.CdcProcessor;
import org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
import static java.lang.Long.MAX_VALUE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SEGMENT_SYNC_TIMEOUT;
import static org.apache.ignite.configuration.WALMode.LOG_ONLY;
import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
import static org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer.BufferMode.DIRECT;
import static org.apache.ignite.internal.util.IgniteUtils.sleep;
/**
* Manager for {@link FileWriteHandleImpl}.
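* <p>
* Creates {@link FileWriteHandleImpl} instances backed either by a memory-mapped segment buffer or by a
* direct {@link SegmentedRingByteBuffer}, and owns the background {@link WALWriter} and
* {@link WalSegmentSyncer} workers that drain and fsync those buffers.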
*/
public class FileHandleManagerImpl implements FileHandleManager {
/** Default WAL segment sync timeout, in milliseconds. */
public static final long DFLT_WAL_SEGMENT_SYNC_TIMEOUT = 500L;
/** WAL writer worker. */
private final WALWriter walWriter;
/** WAL segment sync worker, {@code null} when the WAL mode is NONE or FSYNC. */
private final WalSegmentSyncer walSegmentSyncWorker;
/** CDC processor, {@code null} if CDC is disabled. */
private final @Nullable CdcProcessor cdcProc;
/** Context. */
protected final GridCacheSharedContext cctx;
/** Logger. */
private final IgniteLogger log;
/** WAL mode. */
private final WALMode mode;
/** Persistence metrics tracker. */
private final DataStorageMetricsImpl metrics;
/** Use mapped byte buffer. */
private final boolean mmap;
/** Record serializer. */
private final RecordSerializer serializer;
/** Current handle supplier. */
private final Supplier<FileWriteHandle> currentHandleSupplier;
/** WAL buffer size. */
private final int walBufferSize;
/** WAL segment size in bytes. This is the maximum value; actual segments may be shorter. */
private final long maxWalSegmentSize;
/** Fsync delay. */
private final long fsyncDelay;
/**
* @param cctx Context.
* @param metrics Data storage metrics.
* @param mmap Whether to use memory-mapped WAL segment buffers.
* @param serializer Serializer.
* @param cdcProc CDC processor, {@code null} if CDC is disabled.
* @param currentHandleSupplier Current handle supplier.
* @param mode WAL mode.
* @param walBufferSize WAL buffer size.
* @param maxWalSegmentSize Max WAL segment size.
* @param fsyncDelay Fsync delay.
*/
public FileHandleManagerImpl(
GridCacheSharedContext cctx,
DataStorageMetricsImpl metrics,
boolean mmap,
RecordSerializer serializer,
@Nullable CdcProcessor cdcProc,
Supplier<FileWriteHandle> currentHandleSupplier,
WALMode mode,
int walBufferSize,
long maxWalSegmentSize,
long fsyncDelay
) {
this.cctx = cctx;
log = cctx.logger(FileHandleManagerImpl.class);
this.mode = mode;
this.metrics = metrics;
this.mmap = mmap;
this.serializer = serializer;
this.currentHandleSupplier = currentHandleSupplier;
this.walBufferSize = walBufferSize;
this.maxWalSegmentSize = maxWalSegmentSize;
this.fsyncDelay = fsyncDelay;
walWriter = new WALWriter(log);
this.cdcProc = cdcProc;
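// The background segment syncer is only needed for modes with deferred fsync (e.g. LOG_ONLY and
// BACKGROUND); NONE and FSYNC modes do not start it.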
if (mode != WALMode.NONE && mode != WALMode.FSYNC) {
walSegmentSyncWorker = new WalSegmentSyncer(
cctx.igniteInstanceName(),
cctx.kernalContext().log(WalSegmentSyncer.class)
);
if (log.isInfoEnabled())
log.info("Initialized write-ahead log manager [mode=" + mode + ']');
}
else {
U.quietAndWarn(log, "Initialized write-ahead log manager in NONE mode, persisted data may be lost in " +
"a case of unexpected node failure. Make sure to deactivate the cluster before shutdown.");
walSegmentSyncWorker = null;
}
}
/** {@inheritDoc} */
@Override public FileWriteHandle initHandle(
SegmentIO fileIO,
long position,
RecordSerializer serializer
) throws IOException {
SegmentedRingByteBuffer rbuf;
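// In mmap mode the whole segment file is mapped into memory; otherwise records are staged in a
// direct ring buffer of walBufferSize bytes that is drained by the WALWriter worker.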
if (mmap) {
MappedByteBuffer buf = fileIO.map((int)maxWalSegmentSize);
rbuf = new SegmentedRingByteBuffer(buf, metrics);
}
else
rbuf = new SegmentedRingByteBuffer(walBufferSize, maxWalSegmentSize, DIRECT, metrics);
rbuf.init(position);
return new FileWriteHandleImpl(
cctx, fileIO, rbuf, serializer, metrics, walWriter, cdcProc, position,
mode, mmap, true, fsyncDelay, maxWalSegmentSize
);
}
/** {@inheritDoc} */
@Override public FileWriteHandle nextHandle(SegmentIO fileIO, RecordSerializer serializer) throws IOException {
SegmentedRingByteBuffer rbuf;
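// When rolling over to the next segment in non-mmap mode, the existing ring buffer is reset and
// reused instead of being reallocated.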
if (mmap) {
MappedByteBuffer buf = fileIO.map((int)maxWalSegmentSize);
rbuf = new SegmentedRingByteBuffer(buf, metrics);
}
else
rbuf = currentHandle().buf.reset();
return new FileWriteHandleImpl(
cctx, fileIO, rbuf, serializer, metrics, walWriter, cdcProc, 0,
mode, mmap, false, fsyncDelay, maxWalSegmentSize
);
}
/**
* @return Current handle.
*/
private FileWriteHandleImpl currentHandle() {
return (FileWriteHandleImpl)currentHandleSupplier.get();
}
/** {@inheritDoc} */
@Override public void onDeactivate() throws IgniteCheckedException {
FileWriteHandleImpl currHnd = currentHandle();
try {
if (mode == WALMode.BACKGROUND) {
if (currHnd != null)
currHnd.flush(null);
}
if (currHnd != null)
currHnd.close(false);
}
finally {
if (walSegmentSyncWorker != null)
walSegmentSyncWorker.shutdown();
walWriter.shutdown();
}
}
/** {@inheritDoc} */
@Override public void resumeLogging() {
if (!mmap)
walWriter.restart();
if (cctx.kernalContext().clientNode())
return;
if (walSegmentSyncWorker != null)
walSegmentSyncWorker.restart();
}
/** {@inheritDoc} */
@Override public WALPointer flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException {
if (serializer == null || mode == WALMode.NONE)
return null;
FileWriteHandleImpl cur = currentHandle();
// WAL manager was not started (client node).
if (cur == null)
return null;
WALPointer filePtr;
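// If no explicit pointer is given, flush everything up to the current tail of the buffer.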
if (ptr == null) {
long pos = cur.buf.tail();
filePtr = new WALPointer(cur.getSegmentId(), (int)pos, 0);
}
else
filePtr = ptr;
if (mode == LOG_ONLY)
cur.flushOrWait(filePtr);
if (!explicitFsync && mode != WALMode.FSYNC)
return filePtr; // No need to sync in LOG_ONLY or BACKGROUND unless explicit fsync is required.
// No need to sync if the segment was already rolled over.
if (!cur.needFsync(filePtr))
return filePtr;
cur.fsync(filePtr);
return filePtr;
}
/**
* @throws StorageException If the node is no longer valid and we missed a WAL operation.
*/
private void checkNode() throws StorageException {
if (cctx.kernalContext().invalid())
throw new StorageException("Failed to perform WAL operation (environment was invalidated by a " +
"previous error)");
}
/**
* WAL writer worker.
*/
public class WALWriter extends GridWorker {
/** Unconditional flush. */
private static final long UNCONDITIONAL_FLUSH = -1L;
/** File close. */
private static final long FILE_CLOSE = -2L;
/** File force. */
private static final long FILE_FORCE = -3L;
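// Waiter values are interpreted as follows: FILE_CLOSE and FILE_FORCE request an I/O operation from
// the writer thread, non-negative values are flush positions, and Long.MIN_VALUE marks a waiter that
// has already been served (see unparkWaiters and flushBuffer).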
/** Error observed in the WAL writer thread, if any. */
private volatile Throwable err;
//TODO: replace with GC free data structure.
/** Parked threads mapped to the positions they are waiting to be written. */
final Map<Thread, Long> waiters = new ConcurrentHashMap<>();
/**
* Default constructor.
*
* @param log Logger.
*/
WALWriter(IgniteLogger log) {
super(cctx.igniteInstanceName(), "wal-write-worker%" + cctx.igniteInstanceName(), log,
cctx.kernalContext().workersRegistry());
}
/** {@inheritDoc} */
@Override protected void body() {
Throwable err = null;
try {
while (!isCancelled()) {
onIdle();
while (waiters.isEmpty()) {
if (!isCancelled()) {
blockingSectionBegin();
try {
LockSupport.park();
}
finally {
blockingSectionEnd();
}
}
else {
unparkWaiters(MAX_VALUE);
return;
}
}
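// Pick a pending flush position among the waiters; Long.MIN_VALUE entries have already been served.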
Long pos = null;
for (Long val : waiters.values()) {
if (val > Long.MIN_VALUE)
pos = val;
}
updateHeartbeat();
if (pos == null)
continue;
else if (pos < UNCONDITIONAL_FLUSH) {
try {
assert pos == FILE_CLOSE || pos == FILE_FORCE : pos;
if (pos == FILE_CLOSE)
currentHandle().fileIO.close();
else if (pos == FILE_FORCE)
currentHandle().fileIO.force();
}
catch (IOException e) {
log.error("Exception in WAL writer thread: ", e);
err = e;
unparkWaiters(MAX_VALUE);
return;
}
unparkWaiters(pos);
}
updateHeartbeat();
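// Drain all filled ring buffer segments up to the requested position and write them to the file.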
List<SegmentedRingByteBuffer.ReadSegment> segs = currentHandle().buf.poll(pos);
if (segs == null) {
unparkWaiters(pos);
continue;
}
for (int i = 0; i < segs.size(); i++) {
SegmentedRingByteBuffer.ReadSegment seg = segs.get(i);
updateHeartbeat();
try {
int bufPos = seg.buffer().position();
writeBuffer(seg.position(), seg.buffer());
// TODO: do not duplicate if processor disabled.
if (cdcProc != null) {
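// Hand the just-written bytes to CDC via a duplicate: it shares the content but has an independent
// position/limit, so rewinding it does not disturb the original buffer.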
ByteBuffer cdcBuf = seg.buffer().duplicate();
cdcBuf.position(bufPos);
cdcBuf.limit(seg.buffer().limit());
cdcBuf.order(seg.buffer().order());
cdcProc.collect(cdcBuf);
}
}
catch (Throwable e) {
log.error("Exception in WAL writer thread:", e);
err = e;
}
finally {
seg.release();
boolean unparkAll = (pos == UNCONDITIONAL_FLUSH || pos == FILE_CLOSE) || err != null;
long p = unparkAll ? MAX_VALUE : currentHandle().written;
unparkWaiters(p);
}
}
}
}
catch (Throwable t) {
err = t;
}
finally {
this.err = err;
unparkWaiters(MAX_VALUE);
if (err == null && !isCancelled.get())
err = new IllegalStateException("Worker " + name() + " is terminated unexpectedly");
if (err instanceof OutOfMemoryError)
cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err));
else if (err != null)
cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err));
}
}
/**
* Shuts down the worker thread.
*/
private void shutdown() throws IgniteInterruptedCheckedException {
U.cancel(this);
Thread runner = runner();
if (runner != null) {
LockSupport.unpark(runner);
U.join(runner);
}
assert walWriter.runner() == null : "WALWriter should be stopped.";
}
/**
* Unparks waiting threads.
*
* @param pos Position up to which data has been written; waiters with positions not greater than it are unparked.
*/
private void unparkWaiters(long pos) {
assert pos > Long.MIN_VALUE : pos;
for (Map.Entry<Thread, Long> e : waiters.entrySet()) {
Long val = e.getValue();
if (val <= pos) {
if (val != Long.MIN_VALUE)
waiters.put(e.getKey(), Long.MIN_VALUE);
LockSupport.unpark(e.getKey());
}
}
}
/**
* Forces all changes made so far to the file.
*/
void force() throws IgniteCheckedException {
flushBuffer(FILE_FORCE);
}
/**
* Closes file.
*/
void close() throws IgniteCheckedException {
flushBuffer(FILE_CLOSE);
}
/**
* Flushes all data from the buffer.
*/
void flushAll() throws IgniteCheckedException {
flushBuffer(UNCONDITIONAL_FLUSH);
}
/**
* @param expPos Expected position.
*/
void flushBuffer(long expPos) throws IgniteCheckedException {
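// In mmap mode data is already placed directly into the mapped segment buffer, so there is nothing
// for the writer thread to flush.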
if (mmap)
return;
Throwable err = walWriter.err;
if (err != null)
cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err));
if (expPos == UNCONDITIONAL_FLUSH)
expPos = (currentHandle().buf.tail());
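// Register this thread as a waiter for expPos and wake up the writer thread; the writer resets the
// entry to Long.MIN_VALUE once data up to that position has been written (or an error occurred).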
Thread t = Thread.currentThread();
waiters.put(t, expPos);
LockSupport.unpark(walWriter.runner());
while (true) {
Long val = waiters.get(t);
assert val != null : "Only this thread can remove thread from waiters";
if (val == Long.MIN_VALUE) {
waiters.remove(t);
Throwable walWriterError = walWriter.err;
if (walWriterError != null)
throw new IgniteCheckedException("Flush buffer failed.", walWriterError);
return;
}
else
LockSupport.park();
}
}
/**
* @param pos Position in file to start write from. May be checked against actual position to wait previous
* writes to complete.
* @param buf Buffer to write to file.
* @throws StorageException If failed.
* @throws IgniteCheckedException If failed.
*/
private void writeBuffer(long pos, ByteBuffer buf) throws StorageException, IgniteCheckedException {
FileWriteHandleImpl hdl = currentHandle();
assert hdl.fileIO != null : "Writing to a closed segment.";
checkNode();
long lastLogged = U.currentTimeMillis();
long logBackoff = 2_000;
// If we were too fast, need to wait previous writes to complete.
while (hdl.written != pos) {
assert hdl.written < pos : "written = " + hdl.written + ", pos = " + pos; // No one can write further than we are now.
// Writes may arrive out of order: the order in which threads reserve space in the buffer is not
// necessarily the order in which they reach this write, so spin until the preceding writes complete.
long now = U.currentTimeMillis();
if (now - lastLogged >= logBackoff) {
if (logBackoff < 60 * 60_000)
logBackoff *= 2;
U.warn(log, "Still waiting for a concurrent write to complete [written=" + hdl.written +
", pos=" + pos + ", lastFsyncPos=" + hdl.lastFsyncPos + ", stop=" + hdl.stop.get() +
", actualPos=" + hdl.safePosition() + ']');
lastLogged = now;
}
checkNode();
}
// Do the write.
int size = buf.remaining();
assert size > 0 : size;
try {
assert hdl.written == hdl.fileIO.position();
hdl.written += hdl.fileIO.writeFully(buf);
assert hdl.written == hdl.fileIO.position();
}
catch (IOException e) {
err = e;
StorageException se = new StorageException("Failed to write buffer.", e);
cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, se));
throw se;
}
}
/**
* Restart worker in IgniteThread.
*/
public void restart() {
assert runner() == null : "WALWriter is still running.";
isCancelled.set(false);
new IgniteThread(this).start();
}
}
/**
* Syncs WAL segment file.
*/
private class WalSegmentSyncer extends GridWorker {
/** Sync timeout in milliseconds (at least 100 ms). */
private final long syncTimeout;
/**
* @param igniteInstanceName Ignite instance name.
* @param log Logger.
*/
private WalSegmentSyncer(String igniteInstanceName, IgniteLogger log) {
super(igniteInstanceName, "wal-segment-syncer", log);
syncTimeout = Math.max(IgniteSystemProperties.getLong(IGNITE_WAL_SEGMENT_SYNC_TIMEOUT,
DFLT_WAL_SEGMENT_SYNC_TIMEOUT), 100L);
}
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
while (!isCancelled()) {
sleep(syncTimeout);
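// Periodically force an fsync of whatever has been logged since the last pass.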
try {
flush(null, true);
}
catch (IgniteCheckedException e) {
U.error(log, "Exception when flushing WAL.", e);
}
}
}
/**
* Shuts down the worker.
*
* @throws IgniteInterruptedCheckedException If interrupted while waiting for the worker to stop.
*/
private void shutdown() throws IgniteInterruptedCheckedException {
synchronized (this) {
U.cancel(this);
}
U.join(runner());
}
/**
* Restart worker in IgniteThread.
*/
public void restart() {
assert runner() == null : "WalSegmentSyncer is running.";
isCancelled.set(false);
new IgniteThread(walSegmentSyncWorker).start();
}
}
}