/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.wal;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.MappedByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.file.DirectoryStream;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.Time;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.WALMode;
import org.apache.ignite.events.WalSegmentArchivedEvent;
import org.apache.ignite.events.WalSegmentCompactedEvent;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord;
import org.apache.ignite.internal.pagemem.wal.record.SwitchSegmentRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter;
import org.apache.ignite.internal.processors.cache.WalStateManager.WALDisableContext;
import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.StorageException;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.file.RandomAccessFileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings;
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32;
import org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput;
import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentFileInputFactory;
import org.apache.ignite.internal.processors.cache.persistence.wal.io.LockedSegmentFileInputFactory;
import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.io.SimpleSegmentFileInputFactory;
import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord;
import org.apache.ignite.internal.processors.cache.persistence.wal.aware.SegmentAware;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl;
import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CIX1;
import org.apache.ignite.internal.util.typedef.CO;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.READ;
import static java.nio.file.StandardOpenOption.WRITE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_MMAP;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SEGMENT_SYNC_TIMEOUT;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SERIALIZER_VERSION;
import static org.apache.ignite.configuration.WALMode.LOG_ONLY;
import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_ARCHIVED;
import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_COMPACTED;
import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.SWITCH_SEGMENT_RECORD;
import static org.apache.ignite.internal.processors.cache.persistence.wal.SegmentedRingByteBuffer.BufferMode.DIRECT;
import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.HEADER_RECORD_SIZE;
import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readSegmentHeader;
import static org.apache.ignite.internal.util.IgniteUtils.findField;
import static org.apache.ignite.internal.util.IgniteUtils.findNonPublicMethod;
import static org.apache.ignite.internal.util.IgniteUtils.sleep;
/**
* File WAL manager.
*/
@SuppressWarnings("IfMayBeConditional")
public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter implements IgniteWriteAheadLogManager {
/** Default WAL segment sync timeout. */
public static final long DFLT_WAL_SEGMENT_SYNC_TIMEOUT = 500L;
/** {@link MappedByteBuffer#force0(java.io.FileDescriptor, long, long)}. */
private static final Method force0 = findNonPublicMethod(
MappedByteBuffer.class, "force0",
java.io.FileDescriptor.class, long.class, long.class
);
/** {@link MappedByteBuffer#mappingOffset()}. */
private static final Method mappingOffset = findNonPublicMethod(MappedByteBuffer.class, "mappingOffset");
/** {@link MappedByteBuffer#mappingAddress(long)}. */
private static final Method mappingAddress = findNonPublicMethod(
MappedByteBuffer.class, "mappingAddress", long.class
);
/** {@link MappedByteBuffer#fd}. */
private static final Field fd = findField(MappedByteBuffer.class, "fd");
/** Page size. */
private static final int PAGE_SIZE = GridUnsafe.pageSize();
/** */
private static final FileDescriptor[] EMPTY_DESCRIPTORS = new FileDescriptor[0];
/** */
private static final byte[] FILL_BUF = new byte[1024 * 1024];
/** Pattern for segment file names. */
public static final Pattern WAL_NAME_PATTERN = Pattern.compile("\\d{16}\\.wal");
/** */
public static final Pattern WAL_TEMP_NAME_PATTERN = Pattern.compile("\\d{16}\\.wal\\.tmp");
/** WAL segment file filter, see {@link #WAL_NAME_PATTERN}. */
public static final FileFilter WAL_SEGMENT_FILE_FILTER = new FileFilter() {
@Override public boolean accept(File file) {
return !file.isDirectory() && WAL_NAME_PATTERN.matcher(file.getName()).matches();
}
};
/** */
private static final FileFilter WAL_SEGMENT_TEMP_FILE_FILTER = new FileFilter() {
@Override public boolean accept(File file) {
return !file.isDirectory() && WAL_TEMP_NAME_PATTERN.matcher(file.getName()).matches();
}
};
/** */
public static final Pattern WAL_SEGMENT_FILE_COMPACTED_PATTERN = Pattern.compile("\\d{16}\\.wal\\.zip");
/** Filter for both raw and compacted WAL segment files, see {@link #WAL_NAME_PATTERN} and {@link #WAL_SEGMENT_FILE_COMPACTED_PATTERN}. */
public static final FileFilter WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER = new FileFilter() {
@Override public boolean accept(File file) {
return !file.isDirectory() && (WAL_NAME_PATTERN.matcher(file.getName()).matches() ||
WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches());
}
};
/** */
private static final Pattern WAL_SEGMENT_TEMP_FILE_COMPACTED_PATTERN = Pattern.compile("\\d{16}\\.wal\\.zip\\.tmp");
/** */
private static final FileFilter WAL_SEGMENT_FILE_COMPACTED_FILTER = new FileFilter() {
@Override public boolean accept(File file) {
return !file.isDirectory() && WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches();
}
};
/** */
private static final FileFilter WAL_SEGMENT_TEMP_FILE_COMPACTED_FILTER = new FileFilter() {
@Override public boolean accept(File file) {
return !file.isDirectory() && WAL_SEGMENT_TEMP_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches();
}
};
/** Latest serializer version to use. */
private static final int LATEST_SERIALIZER_VERSION = 2;
/** Buffer size. */
private static final int BUF_SIZE = 1024 * 1024;
/** Use mapped byte buffer. */
private final boolean mmap = IgniteSystemProperties.getBoolean(IGNITE_WAL_MMAP, true);
/** {@link FileWriteHandle#written} atomic field updater. */
private static final AtomicLongFieldUpdater<FileWriteHandle> WRITTEN_UPD =
AtomicLongFieldUpdater.newUpdater(FileWriteHandle.class, "written");
/**
* Percentage of WAL archive size used as the checkpoint trigger. Needed to calculate the maximum size of WAL
* written after the last checkpoint: a checkpoint should be triggered when the WAL size since the last
* checkpoint exceeds maxWalArchiveSize * this value.
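* <p>
* Illustrative example (assumed values): with maxWalArchiveSize = 1 GiB and the default ratio of 0.25,
* a checkpoint is triggered once the WAL written since the last checkpoint exceeds 256 MiB.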
*/
private static final double CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE =
IgniteSystemProperties.getDouble(IGNITE_CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE, 0.25);
/**
* Percentage of the maximum WAL archive size used to calculate the threshold at which removal of old archive
* segments should start.
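* <p>
* Illustrative example (assumed values): with maxWalArchiveSize = 1 GiB and the default ratio of 0.5,
* removal of old archive segments starts once the archive grows beyond 512 MiB.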
*/
private static final double THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE =
IgniteSystemProperties.getDouble(IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE, 0.5);
/** */
private final boolean alwaysWriteFullPages;
/** WAL segment size in bytes. This is the maximum value; actual segments may be shorter. */
private final long maxWalSegmentSize;
/**
* Maximum number of segments allowed without a checkpoint. If more segments accumulate, a checkpoint should be
* triggered. Counting segments is a simple way to estimate the WAL size since the last checkpoint instead of
* calculating it exactly.
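* <p>
* Illustrative example (assumed values): with an adjusted WAL history size of 1 GiB, the default checkpoint
* trigger ratio of 0.25 and a 64 MiB segment size, this limit is 1 GiB * 0.25 / 64 MiB = 4 segments.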
*/
private long maxSegCountWithoutCheckpoint;
/** Size of the WAL archive at which removal of old archive segments should start. */
private final long allowedThresholdWalArchiveSize;
/** */
private final WALMode mode;
/** WAL flush frequency. Makes sense only for {@link WALMode#BACKGROUND}. */
private final long flushFreq;
/** Fsync delay. */
private final long fsyncDelay;
/** */
private final DataStorageConfiguration dsCfg;
/** Events service. */
private final GridEventStorageManager evt;
/** Failure processor. */
private final FailureProcessor failureProcessor;
/** */
private IgniteConfiguration igCfg;
/** Persistence metrics tracker. */
private DataStorageMetricsImpl metrics;
/** */
private File walWorkDir;
/** WAL archive directory (including consistent ID as subfolder). */
private File walArchiveDir;
/** Serializer of the latest version, used to read the header record and to write records. */
private RecordSerializer serializer;
/** Latest serializer version to use. */
private final int serializerVer =
IgniteSystemProperties.getInteger(IGNITE_WAL_SERIALIZER_VERSION, LATEST_SERIALIZER_VERSION);
/** Factory to provide I/O interfaces for read/write operations with files. */
private volatile FileIOFactory ioFactory;
/** Factory to provide I/O interfaces for reading primitives from files. */
private final SegmentFileInputFactory segmentFileInputFactory;
/** Holder of up-to-date information about the latest manipulations on WAL segments. */
private volatile SegmentAware segmentAware;
/** Updater for {@link #currHnd}, used to verify that there are no concurrent updates of the current log segment handle. */
private static final AtomicReferenceFieldUpdater<FileWriteAheadLogManager, FileWriteHandle> CURR_HND_UPD =
AtomicReferenceFieldUpdater.newUpdater(FileWriteAheadLogManager.class, FileWriteHandle.class, "currHnd");
/**
* File archiver moves segments from the work directory to the archive. Locked segments are kept in place until
* released. When the archive and work folders are configured to the same value, the archiver is not created.
*/
@Nullable private volatile FileArchiver archiver;
/** Compressor. */
private volatile FileCompressor compressor;
/** Decompressor. */
private volatile FileDecompressor decompressor;
/** */
private final ThreadLocal<WALPointer> lastWALPtr = new ThreadLocal<>();
/** Current log segment handle. */
private volatile FileWriteHandle currHnd;
/** */
private volatile WALDisableContext walDisableContext;
/**
* Positive (non-zero) value indicates that the WAL segment can be archived even if it is not complete.<br>
* See {@link DataStorageConfiguration#setWalAutoArchiveAfterInactivity(long)}<br>
*/
private final long walAutoArchiveAfterInactivity;
/**
* Container with the timestamp of the last logged WAL record.<br> A zero value means no records were logged to
* the current segment, in which case possible archiving is skipped.<br> The value is filled only when {@link
* #walAutoArchiveAfterInactivity} > 0.<br>
*/
private AtomicLong lastRecordLoggedMs = new AtomicLong();
/**
* Cancellable task for {@link WALMode#BACKGROUND}, should be cancelled at shutdown.
* Null for non background modes.
*/
@Nullable private volatile GridTimeoutProcessor.CancelableTask backgroundFlushSchedule;
/**
* Reference to the last added auto-archive timeout check object. Null if the mode is not enabled. Should be
* cancelled at shutdown.
*/
@Nullable private volatile GridTimeoutObject nextAutoArchiveTimeoutObj;
/** WAL writer worker. */
private WALWriter walWriter;
/**
* Listener invoked for each segment file IO initialization.
*/
@Nullable private volatile IgniteInClosure<FileIO> createWalFileListener;
/** WAL segment sync worker. */
private WalSegmentSyncer walSegmentSyncWorker;
/**
* Manages segment location.
*/
private SegmentRouter segmentRouter;
/** Segment file input factory with the ability to lock the segment during reading. */
private SegmentFileInputFactory lockedSegmentFileInputFactory;
/**
* @param ctx Kernal context.
*/
public FileWriteAheadLogManager(@NotNull final GridKernalContext ctx) {
igCfg = ctx.config();
DataStorageConfiguration dsCfg = igCfg.getDataStorageConfiguration();
assert dsCfg != null;
this.dsCfg = dsCfg;
maxWalSegmentSize = dsCfg.getWalSegmentSize();
mode = dsCfg.getWalMode();
flushFreq = dsCfg.getWalFlushFrequency();
fsyncDelay = dsCfg.getWalFsyncDelayNanos();
alwaysWriteFullPages = dsCfg.isAlwaysWriteFullPages();
ioFactory = new RandomAccessFileIOFactory();
segmentFileInputFactory = new SimpleSegmentFileInputFactory();
walAutoArchiveAfterInactivity = dsCfg.getWalAutoArchiveAfterInactivity();
allowedThresholdWalArchiveSize = (long)(dsCfg.getMaxWalArchiveSize() * THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE);
evt = ctx.event();
failureProcessor = ctx.failure();
}
/**
* For test purposes only.
*
* @param ioFactory IO factory.
*/
public void setFileIOFactory(FileIOFactory ioFactory) {
this.ioFactory = ioFactory;
}
/** {@inheritDoc} */
@Override public void start0() throws IgniteCheckedException {
if (!cctx.kernalContext().clientNode()) {
maxSegCountWithoutCheckpoint =
(long)((U.adjustedWalHistorySize(dsCfg, log) * CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE)
/ dsCfg.getWalSegmentSize());
final PdsFolderSettings resolveFolders = cctx.kernalContext().pdsFolderResolver().resolveFolders();
checkWalConfiguration();
final File walWorkDir0 = walWorkDir = initDirectory(
dsCfg.getWalPath(),
DataStorageConfiguration.DFLT_WAL_PATH,
resolveFolders.folderName(),
"write ahead log work directory"
);
final File walArchiveDir0 = walArchiveDir = initDirectory(
dsCfg.getWalArchivePath(),
DataStorageConfiguration.DFLT_WAL_ARCHIVE_PATH,
resolveFolders.folderName(),
"write ahead log archive directory"
);
serializer = new RecordSerializerFactoryImpl(cctx).createSerializer(serializerVer);
GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)cctx.database();
metrics = dbMgr.persistentStoreMetricsImpl();
checkOrPrepareFiles();
if (metrics != null)
metrics.setWalSizeProvider(new CO<Long>() {
@Override public Long apply() {
long size = 0;
for (File f : walWorkDir0.listFiles())
size += f.length();
for (File f : walArchiveDir0.listFiles())
size += f.length();
return size;
}
});
IgniteBiTuple<Long, Long> tup = scanMinMaxArchiveIndices();
segmentAware = new SegmentAware(dsCfg.getWalSegments());
segmentAware.lastTruncatedArchiveIdx(tup == null ? -1 : tup.get1() - 1);
long lastAbsArchivedIdx = tup == null ? -1 : tup.get2();
if (isArchiverEnabled())
archiver = new FileArchiver(lastAbsArchivedIdx, log);
else
archiver = null;
if (lastAbsArchivedIdx > 0)
segmentAware.setLastArchivedAbsoluteIndex(lastAbsArchivedIdx);
if (dsCfg.isWalCompactionEnabled()) {
compressor = new FileCompressor();
if (decompressor == null) { // Prevent two file-decompressor thread instantiations.
decompressor = new FileDecompressor(log);
new IgniteThread(decompressor).start();
}
}
segmentRouter = new SegmentRouter(walWorkDir, walArchiveDir, segmentAware, dsCfg);
walDisableContext = cctx.walState().walDisableContext();
if (mode != WALMode.NONE && mode != WALMode.FSYNC) {
walSegmentSyncWorker = new WalSegmentSyncer(igCfg.getIgniteInstanceName(),
cctx.kernalContext().log(WalSegmentSyncer.class));
if (log.isInfoEnabled())
log.info("Started write-ahead log manager [mode=" + mode + ']');
}
else
U.quietAndWarn(log, "Started write-ahead log manager in NONE mode, persisted data may be lost in " +
"case of unexpected node failure. Make sure to deactivate the cluster before shutdown.");
lockedSegmentFileInputFactory = new LockedSegmentFileInputFactory(
segmentAware,
segmentRouter,
ioFactory
);
}
}
/**
* The archiver may be absent, in which case all files are written to the WAL work folder using the absolute
* segment index.
*
* @return {@code True} if the archiver is enabled.
*/
private boolean isArchiverEnabled() {
if (walArchiveDir != null && walWorkDir != null)
return !walArchiveDir.equals(walWorkDir);
return !new File(dsCfg.getWalArchivePath()).equals(new File(dsCfg.getWalPath()));
}
/**
* Collects WAL segment files from the low pointer (inclusive) to the high pointer (exclusive) and reserves the
* low pointer.
*
* @param low Low bound.
* @param high High bound.
*/
public Collection<File> getAndReserveWalFiles(FileWALPointer low, FileWALPointer high) throws IgniteCheckedException {
final long awaitIdx = high.index() - 1;
segmentAware.awaitSegmentArchived(awaitIdx);
if (!reserve(low))
throw new IgniteCheckedException("WAL archive segment has been deleted [idx=" + low.index() + "]");
List<File> res = new ArrayList<>();
for (long i = low.index(); i < high.index(); i++) {
String segmentName = FileDescriptor.fileName(i);
File file = new File(walArchiveDir, segmentName);
File fileZip = new File(walArchiveDir, segmentName + FilePageStoreManager.ZIP_SUFFIX);
if (file.exists())
res.add(file);
else if (fileZip.exists())
res.add(fileZip);
else {
if (log.isInfoEnabled()) {
log.info("Segment not found: " + file.getName() + "/" + fileZip.getName());
log.info("Stopped iteration on idx: " + i);
}
break;
}
}
return res;
}
/**
* @throws IgniteCheckedException if WAL store path is configured and archive path isn't (or vice versa)
*/
private void checkWalConfiguration() throws IgniteCheckedException {
if (dsCfg.getWalPath() == null ^ dsCfg.getWalArchivePath() == null) {
throw new IgniteCheckedException(
"Properties should be either both specified or both null " +
"[walStorePath = " + dsCfg.getWalPath() +
", walArchivePath = " + dsCfg.getWalArchivePath() + "]"
);
}
}
/** {@inheritDoc} */
@Override protected void stop0(boolean cancel) {
final GridTimeoutProcessor.CancelableTask schedule = backgroundFlushSchedule;
if (schedule != null)
schedule.close();
final GridTimeoutObject timeoutObj = nextAutoArchiveTimeoutObj;
if (timeoutObj != null)
cctx.time().removeTimeoutObject(timeoutObj);
final FileWriteHandle currHnd = currentHandle();
try {
if (mode == WALMode.BACKGROUND) {
if (currHnd != null)
currHnd.flush(null);
}
if (currHnd != null)
currHnd.close(false);
if (walSegmentSyncWorker != null)
walSegmentSyncWorker.shutdown();
if (walWriter != null)
walWriter.shutdown();
segmentAware.interrupt();
if (archiver != null)
archiver.shutdown();
if (compressor != null)
compressor.shutdown();
if (decompressor != null)
decompressor.shutdown();
}
catch (Exception e) {
U.error(log, "Failed to gracefully close WAL segment: " + this.currHnd.fileIO, e);
}
}
/** {@inheritDoc} */
@Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException {
if (log.isDebugEnabled())
log.debug("Activated file write ahead log manager [nodeId=" + cctx.localNodeId() +
" topVer=" + cctx.discovery().topologyVersionEx() + " ]");
start0();
if (!cctx.kernalContext().clientNode()) {
if (isArchiverEnabled()) {
assert archiver != null;
new IgniteThread(archiver).start();
}
if (walSegmentSyncWorker != null)
new IgniteThread(walSegmentSyncWorker).start();
if (compressor != null)
compressor.start();
}
}
/** {@inheritDoc} */
@Override public void onDeActivate(GridKernalContext kctx) {
if (log.isDebugEnabled())
log.debug("DeActivate file write ahead log [nodeId=" + cctx.localNodeId() +
" topVer=" + cctx.discovery().topologyVersionEx() + " ]");
stop0(true);
currHnd = null;
}
/** {@inheritDoc} */
@Override public boolean isAlwaysWriteFullPages() {
return alwaysWriteFullPages;
}
/** {@inheritDoc} */
@Override public boolean isFullSync() {
return mode == WALMode.FSYNC;
}
/** {@inheritDoc} */
@Override public void resumeLogging(WALPointer lastPtr) throws IgniteCheckedException {
assert currHnd == null;
assert lastPtr == null || lastPtr instanceof FileWALPointer;
FileWALPointer filePtr = (FileWALPointer)lastPtr;
walWriter = new WALWriter(log);
if (!mmap)
new IgniteThread(walWriter).start();
currHnd = restoreWriteHandle(filePtr);
// For a new handle, write the serializer version to it.
if (filePtr == null)
currHnd.writeHeader();
if (currHnd.serializer.version() != serializer.version()) {
if (log.isInfoEnabled())
log.info("Record serializer version change detected, will start logging with a new WAL record " +
"serializer to a new WAL segment [curFile=" + currHnd + ", newVer=" + serializer.version() +
", oldVer=" + currHnd.serializer.version() + ']');
rollOver(currHnd);
}
currHnd.resume = false;
if (mode == WALMode.BACKGROUND) {
backgroundFlushSchedule = cctx.time().schedule(new Runnable() {
@Override public void run() {
doFlush();
}
}, flushFreq, flushFreq);
}
if (walAutoArchiveAfterInactivity > 0)
scheduleNextInactivityPeriodElapsedCheck();
}
/**
* Schedules the next check of inactivity period expiration, based on the timestamp of the last record update.
* On timeout the method checks the inactivity period and schedules the next check.
*/
private void scheduleNextInactivityPeriodElapsedCheck() {
final long lastRecMs = lastRecordLoggedMs.get();
final long nextPossibleAutoArchive = (lastRecMs <= 0 ? U.currentTimeMillis() : lastRecMs) + walAutoArchiveAfterInactivity;
if (log.isDebugEnabled())
log.debug("Schedule WAL rollover check at " + new Time(nextPossibleAutoArchive).toString());
nextAutoArchiveTimeoutObj = new GridTimeoutObject() {
private final IgniteUuid id = IgniteUuid.randomUuid();
@Override public IgniteUuid timeoutId() {
return id;
}
@Override public long endTime() {
return nextPossibleAutoArchive;
}
@Override public void onTimeout() {
if (log.isDebugEnabled())
log.debug("Checking if WAL rollover required (" + new Time(U.currentTimeMillis()).toString() + ")");
checkWalRolloverRequiredDuringInactivityPeriod();
scheduleNextInactivityPeriodElapsedCheck();
}
};
cctx.time().addTimeoutObject(nextAutoArchiveTimeoutObj);
}
/** {@inheritDoc} */
@Override public int serializerVersion() {
return serializerVer;
}
/**
* Checks whether a significant period of inactivity has elapsed. If WAL auto-archiving is enabled
* ({@link #walAutoArchiveAfterInactivity} > 0), this method triggers a rollover by timeout.<br>
*/
private void checkWalRolloverRequiredDuringInactivityPeriod() {
if (walAutoArchiveAfterInactivity <= 0)
return; // feature not configured, nothing to do
final long lastRecMs = lastRecordLoggedMs.get();
if (lastRecMs == 0)
return; // No records were logged to the current segment; do not consider inactivity.
final long elapsedMs = U.currentTimeMillis() - lastRecMs;
if (elapsedMs <= walAutoArchiveAfterInactivity)
return; // not enough time elapsed since last write
if (!lastRecordLoggedMs.compareAndSet(lastRecMs, 0))
return; // record write occurred concurrently
final FileWriteHandle handle = currentHandle();
try {
handle.buf.close();
rollOver(handle);
}
catch (IgniteCheckedException e) {
U.error(log, "Unable to perform segment rollover: " + e.getMessage(), e);
cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, e));
}
}
/** {@inheritDoc} */
@SuppressWarnings("TooBroadScope")
@Override public WALPointer log(WALRecord rec) throws IgniteCheckedException, StorageException {
if (serializer == null || mode == WALMode.NONE)
return null;
FileWriteHandle currWrHandle = currentHandle();
WALDisableContext isDisable = walDisableContext;
// Logging was not resumed yet.
if (currWrHandle == null || (isDisable != null && isDisable.check()))
return null;
// Need to calculate record size first.
rec.size(serializer.size(rec));
while (true) {
if (rec.rollOver()) {
assert cctx.database().checkpointLockIsHeldByThread();
long idx = currWrHandle.getSegmentId();
currWrHandle.buf.close();
currWrHandle = rollOver(currWrHandle);
if (log != null && log.isInfoEnabled())
log.info("Rollover segment [" + idx + " to " + currWrHandle.getSegmentId() + "], recordType=" + rec.type());
}
WALPointer ptr = currWrHandle.addRecord(rec);
if (ptr != null) {
metrics.onWalRecordLogged();
lastWALPtr.set(ptr);
if (walAutoArchiveAfterInactivity > 0)
lastRecordLoggedMs.set(U.currentTimeMillis());
return ptr;
}
else
currWrHandle = rollOver(currWrHandle);
checkNode();
if (isStopping())
throw new IgniteCheckedException("Stopping.");
}
}
/** {@inheritDoc} */
@Override public void flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException {
if (serializer == null || mode == WALMode.NONE)
return;
FileWriteHandle cur = currentHandle();
// WAL manager was not started (client node).
if (cur == null)
return;
FileWALPointer filePtr = (FileWALPointer)(ptr == null ? lastWALPtr.get() : ptr);
if (mode == LOG_ONLY)
cur.flushOrWait(filePtr);
if (!explicitFsync && mode != WALMode.FSYNC)
return; // No need to sync in LOG_ONLY or BACKGROUND unless explicit fsync is required.
// No need to sync if was rolled over.
if (filePtr != null && !cur.needFsync(filePtr))
return;
cur.fsync(filePtr);
}
/** {@inheritDoc} */
@Override public WALIterator replay(WALPointer start) throws IgniteCheckedException, StorageException {
assert start == null || start instanceof FileWALPointer : "Invalid start pointer: " + start;
FileWriteHandle hnd = currentHandle();
FileWALPointer end = null;
if (hnd != null)
end = hnd.position();
return new RecordsIterator(
cctx,
walArchiveDir,
walWorkDir,
(FileWALPointer)start,
end,
dsCfg,
new RecordSerializerFactoryImpl(cctx),
ioFactory,
archiver,
decompressor,
log,
segmentAware,
segmentRouter,
lockedSegmentFileInputFactory);
}
/** {@inheritDoc} */
@Override public boolean reserve(WALPointer start) throws IgniteCheckedException {
assert start != null && start instanceof FileWALPointer : "Invalid start pointer: " + start;
if (mode == WALMode.NONE)
return false;
segmentAware.reserve(((FileWALPointer)start).index());
if (!hasIndex(((FileWALPointer)start).index())) {
segmentAware.release(((FileWALPointer)start).index());
return false;
}
return true;
}
/** {@inheritDoc} */
@Override public void release(WALPointer start) {
assert start != null && start instanceof FileWALPointer : "Invalid start pointer: " + start;
if (mode == WALMode.NONE)
return;
segmentAware.release(((FileWALPointer)start).index());
}
/**
* @param absIdx Absolute index to check.
* @return {@code true} if has this index.
*/
private boolean hasIndex(long absIdx) {
String segmentName = FileDescriptor.fileName(absIdx);
String zipSegmentName = FileDescriptor.fileName(absIdx) + FilePageStoreManager.ZIP_SUFFIX;
boolean inArchive = new File(walArchiveDir, segmentName).exists() ||
new File(walArchiveDir, zipSegmentName).exists();
if (inArchive)
return true;
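// Not in the archive: the segment can only still be in the work directory if it has not been archived yet,
// i.e. its index is above the last archived index.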
if (absIdx <= lastArchivedIndex())
return false;
FileWriteHandle cur = currHnd;
return cur != null && cur.getSegmentId() >= absIdx;
}
/** {@inheritDoc} */
@Override public int truncate(WALPointer low, WALPointer high) {
if (high == null)
return 0;
assert high instanceof FileWALPointer : high;
// File pointer bound: older entries will be deleted from archive
FileWALPointer lowPtr = (FileWALPointer)low;
FileWALPointer highPtr = (FileWALPointer)high;
FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER));
int deleted = 0;
for (FileDescriptor desc : descs) {
if (lowPtr != null && desc.idx < lowPtr.index())
continue;
// Do not delete reserved or locked segment and any segment after it.
if (segmentReservedOrLocked(desc.idx))
return deleted;
long archivedAbsIdx = segmentAware.lastArchivedAbsoluteIndex();
long lastArchived = archivedAbsIdx >= 0 ? archivedAbsIdx : lastArchivedIndex();
// We need to leave at least one archived segment to correctly determine the archive index.
if (desc.idx < highPtr.index() && desc.idx < lastArchived) {
if (!desc.file.delete())
U.warn(log, "Failed to remove obsolete WAL segment (make sure the process has enough rights): " +
desc.file.getAbsolutePath());
else
deleted++;
// Bump up the oldest archive segment index.
if (segmentAware.lastTruncatedArchiveIdx() < desc.idx)
segmentAware.lastTruncatedArchiveIdx(desc.idx);
}
}
return deleted;
}
/**
* Checks whether a WAL segment is locked (protected from being moved to the archive) or reserved (protected
* from deletion by WAL cleanup).
*
* @param absIdx Absolute WAL segment index to check.
* @return {@code True} if the segment is reserved or locked.
*/
private boolean segmentReservedOrLocked(long absIdx) {
FileArchiver archiver0 = archiver;
return ((archiver0 != null) && segmentAware.locked(absIdx)) || (segmentAware.reserved(absIdx));
}
/** {@inheritDoc} */
@Override public void notchLastCheckpointPtr(WALPointer ptr) {
if (compressor != null)
compressor.keepUncompressedIdxFrom(((FileWALPointer)ptr).index());
}
/** {@inheritDoc} */
@Override public int walArchiveSegments() {
long lastTruncated = segmentAware.lastTruncatedArchiveIdx();
long lastArchived = segmentAware.lastArchivedAbsoluteIndex();
if (lastArchived == -1)
return 0;
int res = (int)(lastArchived - lastTruncated);
return res >= 0 ? res : 0;
}
/** {@inheritDoc} */
@Override public long lastArchivedSegment() {
return segmentAware.lastArchivedAbsoluteIndex();
}
/** {@inheritDoc} */
@Override public long lastCompactedSegment() {
return segmentAware.lastCompressedIdx();
}
/** {@inheritDoc} */
@Override public boolean reserved(WALPointer ptr) {
FileWALPointer fPtr = (FileWALPointer)ptr;
return segmentReservedOrLocked(fPtr.index());
}
/** {@inheritDoc} */
@Override public int reserved(WALPointer low, WALPointer high) {
// It is not clear now how to get the highest WAL pointer, so when high is null the method returns 0.
if (high == null)
return 0;
assert high instanceof FileWALPointer : high;
assert low == null || low instanceof FileWALPointer : low;
FileWALPointer lowPtr = (FileWALPointer)low;
FileWALPointer highPtr = (FileWALPointer)high;
long lowIdx = lowPtr != null ? lowPtr.index() : 0;
long highIdx = highPtr.index();
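// Advance lowIdx past segments that are no longer reserved or locked; everything from the first reserved
// index up to highIdx is counted as reserved below.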
while (lowIdx < highIdx) {
if (segmentReservedOrLocked(lowIdx))
break;
lowIdx++;
}
return (int)(highIdx - lowIdx + 1);
}
/** {@inheritDoc} */
@Override public boolean disabled(int grpId) {
return cctx.walState().isDisabled(grpId);
}
/**
* Lists files in the archive directory and returns the index of the last archived file.
*
* @return The absolute index of the last archived file.
*/
private long lastArchivedIndex() {
long lastIdx = -1;
for (File file : walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)) {
try {
long idx = Long.parseLong(file.getName().substring(0, 16));
lastIdx = Math.max(lastIdx, idx);
}
catch (NumberFormatException | IndexOutOfBoundsException ignore) {
}
}
return lastIdx;
}
/**
* Lists files in the archive directory and returns the indices of the lowest and the highest archived files.
* In case of holes, the first segment after the last "hole" is considered the minimum.
* Example: minimum(0, 1, 10, 11, 20, 21, 22) should be 20
*
* @return The absolute indices of min and max archived files.
*/
private IgniteBiTuple<Long, Long> scanMinMaxArchiveIndices() {
TreeSet<Long> archiveIndices = new TreeSet<>();
for (File file : walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)) {
try {
long idx = Long.parseLong(file.getName().substring(0, 16));
archiveIndices.add(idx);
}
catch (NumberFormatException | IndexOutOfBoundsException ignore) {
// No-op.
}
}
if (archiveIndices.isEmpty())
return null;
else {
Long min = archiveIndices.first();
Long max = archiveIndices.last();
if (max - min == archiveIndices.size() - 1)
return F.t(min, max); // Short path.
for (Long idx : archiveIndices.descendingSet()) {
if (!archiveIndices.contains(idx - 1))
return F.t(idx, max);
}
throw new IllegalStateException("Should never happen if TreeSet is valid.");
}
}
/**
* Creates a directory specified by the given arguments.
*
* @param cfg Configured directory path, may be {@code null}.
* @param defDir Default directory path, will be used if cfg is {@code null}.
* @param consId Local node consistent ID.
* @param msg File description to print out on successful initialization.
* @return Initialized directory.
* @throws IgniteCheckedException If failed to initialize directory.
*/
private File initDirectory(String cfg, String defDir, String consId, String msg) throws IgniteCheckedException {
File dir;
if (cfg != null) {
File workDir0 = new File(cfg);
dir = workDir0.isAbsolute() ?
new File(workDir0, consId) :
new File(U.resolveWorkDirectory(igCfg.getWorkDirectory(), cfg, false), consId);
}
else
dir = new File(U.resolveWorkDirectory(igCfg.getWorkDirectory(), defDir, false), consId);
U.ensureDirectory(dir, msg, log);
return dir;
}
/**
* @return Current log segment handle.
*/
private FileWriteHandle currentHandle() {
return currHnd;
}
/**
* @param cur Handle that failed to fit the given entry.
* @return Handle that will fit the entry.
*/
private FileWriteHandle rollOver(FileWriteHandle cur) throws StorageException, IgniteCheckedException {
FileWriteHandle hnd = currentHandle();
if (hnd != cur)
return hnd;
if (hnd.close(true)) {
if (metrics.metricsEnabled())
metrics.onWallRollOver();
FileWriteHandle next = initNextWriteHandle(cur);
next.writeHeader();
if (next.getSegmentId() - lastCheckpointFileIdx() >= maxSegCountWithoutCheckpoint)
cctx.database().forceCheckpoint("too big size of WAL without checkpoint");
boolean swapped = CURR_HND_UPD.compareAndSet(this, hnd, next);
assert swapped : "Concurrent updates on rollover are not allowed";
if (walAutoArchiveAfterInactivity > 0)
lastRecordLoggedMs.set(0);
// Let other threads proceed with the new segment.
hnd.signalNextAvailable();
}
else
hnd.awaitNext();
return currentHandle();
}
/**
* @return Last checkpoint file index.
*/
private long lastCheckpointFileIdx() {
WALPointer lastCheckpointMark = cctx.database().lastCheckpointMarkWalPointer();
return lastCheckpointMark == null ? 0 : ((FileWALPointer)lastCheckpointMark).index();
}
/**
* @param lastReadPtr Last read WAL file pointer.
* @return Initialized file write handle.
* @throws StorageException If failed to initialize WAL write handle.
*/
private FileWriteHandle restoreWriteHandle(FileWALPointer lastReadPtr) throws StorageException {
long absIdx = lastReadPtr == null ? 0 : lastReadPtr.index();
@Nullable FileArchiver archiver0 = archiver;
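// Without an archiver, work segment files are named by the absolute index; with an archiver, the work
// directory cycles through dsCfg.getWalSegments() files, so the absolute index is mapped onto them.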
long segNo = archiver0 == null ? absIdx : absIdx % dsCfg.getWalSegments();
File curFile = new File(walWorkDir, FileDescriptor.fileName(segNo));
int off = lastReadPtr == null ? 0 : lastReadPtr.fileOffset();
int len = lastReadPtr == null ? 0 : lastReadPtr.length();
try {
SegmentIO fileIO = new SegmentIO(absIdx, ioFactory.create(curFile));
IgniteInClosure<FileIO> lsnr = createWalFileListener;
if (lsnr != null)
lsnr.apply(fileIO);
try {
int serVer = serializerVer;
// If we have existing segment, try to read version from it.
if (lastReadPtr != null) {
try {
serVer = readSegmentHeader(fileIO, segmentFileInputFactory).getSerializerVersion();
}
catch (SegmentEofException | EOFException ignore) {
serVer = serializerVer;
}
}
RecordSerializer ser = new RecordSerializerFactoryImpl(cctx).createSerializer(serVer);
if (log.isInfoEnabled())
log.info("Resuming logging to WAL segment [file=" + curFile.getAbsolutePath() +
", offset=" + off + ", ver=" + serVer + ']');
SegmentedRingByteBuffer rbuf;
if (mmap) {
MappedByteBuffer buf = fileIO.map((int)maxWalSegmentSize);
rbuf = new SegmentedRingByteBuffer(buf, metrics);
}
else
rbuf = new SegmentedRingByteBuffer(dsCfg.getWalBufferSize(), maxWalSegmentSize, DIRECT, metrics);
if (lastReadPtr != null)
rbuf.init(lastReadPtr.fileOffset() + lastReadPtr.length());
FileWriteHandle hnd = new FileWriteHandle(
fileIO,
off + len,
true,
ser,
rbuf);
if (archiver0 != null)
segmentAware.curAbsWalIdx(absIdx);
else
segmentAware.setLastArchivedAbsoluteIndex(absIdx - 1);
return hnd;
}
catch (IgniteCheckedException | IOException e) {
try {
fileIO.close();
}
catch (IOException suppressed) {
e.addSuppressed(suppressed);
}
if (e instanceof StorageException)
throw (StorageException) e;
throw e instanceof IOException ? (IOException) e : new IOException(e);
}
}
catch (IOException e) {
throw new StorageException("Failed to restore WAL write handle: " + curFile.getAbsolutePath(), e);
}
}
/**
* Initializes the write handle for a new segment. Calling this method signals that we are done with the
* current segment and it can be archived. If no prepared file is available yet and the archiver is busy,
* this method blocks.
*
* @param cur Current file write handle released by WAL writer
* @return Initialized file handle.
* @throws IgniteCheckedException If exception occurred.
*/
private FileWriteHandle initNextWriteHandle(FileWriteHandle cur) throws IgniteCheckedException {
IgniteCheckedException error = null;
try {
File nextFile = pollNextFile(cur.getSegmentId());
if (log.isDebugEnabled())
log.debug("Switching to a new WAL segment: " + nextFile.getAbsolutePath());
SegmentedRingByteBuffer rbuf = null;
SegmentIO fileIO = null;
FileWriteHandle hnd;
boolean interrupted = false;
while (true) {
try {
fileIO = new SegmentIO(cur.getSegmentId() + 1, ioFactory.create(nextFile));
IgniteInClosure<FileIO> lsnr = createWalFileListener;
if (lsnr != null)
lsnr.apply(fileIO);
if (mmap) {
MappedByteBuffer buf = fileIO.map((int)maxWalSegmentSize);
rbuf = new SegmentedRingByteBuffer(buf, metrics);
}
else
rbuf = cur.buf.reset();
hnd = new FileWriteHandle(
fileIO,
0,
false,
serializer,
rbuf);
if (interrupted)
Thread.currentThread().interrupt();
break;
}
catch (ClosedByInterruptException ignore) {
interrupted = true;
Thread.interrupted();
if (fileIO != null) {
try {
fileIO.close();
}
catch (IOException ignored) {
// No-op.
}
fileIO = null;
}
if (rbuf != null) {
rbuf.free();
rbuf = null;
}
}
}
return hnd;
}
catch (IgniteCheckedException e) {
throw error = e;
}
catch (IOException e) {
throw error = new StorageException("Unable to initialize WAL segment", e);
}
finally {
if (error != null)
cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, error));
}
}
/**
* Deletes temp files, creates and prepares new ones; creates the first segment if necessary.
*
* @throws StorageException If failed.
*/
private void checkOrPrepareFiles() throws StorageException {
// Clean temp files.
{
File[] tmpFiles = walWorkDir.listFiles(WAL_SEGMENT_TEMP_FILE_FILTER);
if (!F.isEmpty(tmpFiles)) {
for (File tmp : tmpFiles) {
boolean deleted = tmp.delete();
if (!deleted)
throw new StorageException("Failed to delete previously created temp file " +
"(make sure Ignite process has enough rights): " + tmp.getAbsolutePath());
}
}
}
File[] allFiles = walWorkDir.listFiles(WAL_SEGMENT_FILE_FILTER);
if (isArchiverEnabled())
if (allFiles.length != 0 && allFiles.length > dsCfg.getWalSegments())
throw new StorageException("Failed to initialize wal (work directory contains " +
"incorrect number of segments) [cur=" + allFiles.length + ", expected=" + dsCfg.getWalSegments() + ']');
// Allocate the first segment synchronously. All other segments will be allocated by archiver in background.
if (allFiles.length == 0) {
File first = new File(walWorkDir, FileDescriptor.fileName(0));
createFile(first);
}
else
checkFiles(0, false, null, null);
}
/** {@inheritDoc} */
@Override public void cleanupWalDirectories() throws IgniteCheckedException {
try {
try (DirectoryStream<Path> files = Files.newDirectoryStream(walWorkDir.toPath())) {
for (Path path : files)
Files.delete(path);
}
}
catch (IOException e) {
throw new IgniteCheckedException("Failed to cleanup wal work directory: " + walWorkDir, e);
}
try {
try (DirectoryStream<Path> files = Files.newDirectoryStream(walArchiveDir.toPath())) {
for (Path path : files)
Files.delete(path);
}
}
catch (IOException e) {
throw new IgniteCheckedException("Failed to cleanup wal archive directory: " + walArchiveDir, e);
}
}
/**
* Clears the whole file, filling it with zeros (in FSYNC mode or when mmap is used).
*
* @param file File to format.
* @throws StorageException if formatting failed
*/
private void formatFile(File file) throws StorageException {
formatFile(file, dsCfg.getWalSegmentSize());
}
/**
* Clears the file, filling it with zeros (in FSYNC mode or when mmap is used).
*
* @param file File to format.
* @param bytesCntToFormat Count of first bytes to format.
* @throws StorageException if formatting failed
*/
private void formatFile(File file, int bytesCntToFormat) throws StorageException {
if (log.isDebugEnabled())
log.debug("Formatting file [exists=" + file.exists() + ", file=" + file.getAbsolutePath() + ']');
try (FileIO fileIO = ioFactory.create(file, CREATE, READ, WRITE)) {
int left = bytesCntToFormat;
if (mode == WALMode.FSYNC || mmap) {
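// Write zeros in FILL_BUF-sized chunks until the requested number of bytes has been formatted.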
while ((left -= fileIO.writeFully(FILL_BUF, 0, Math.min(FILL_BUF.length, left))) > 0)
;
fileIO.force();
}
else
fileIO.clear();
}
catch (IOException e) {
StorageException ex = new StorageException("Failed to format WAL segment file: " + file.getAbsolutePath(), e);
if (failureProcessor != null)
failureProcessor.process(new FailureContext(FailureType.CRITICAL_ERROR, ex));
throw ex;
}
}
/**
* Creates a file atomically with temp file.
*
* @param file File to create.
* @throws StorageException If failed.
*/
private void createFile(File file) throws StorageException {
if (log.isDebugEnabled())
log.debug("Creating new file [exists=" + file.exists() + ", file=" + file.getAbsolutePath() + ']');
File tmp = new File(file.getParent(), file.getName() + FilePageStoreManager.TMP_SUFFIX);
formatFile(tmp);
try {
Files.move(tmp.toPath(), file.toPath());
}
catch (IOException e) {
throw new StorageException("Failed to move temp file to a regular WAL segment file: " +
file.getAbsolutePath(), e);
}
if (log.isDebugEnabled())
log.debug("Created WAL segment [file=" + file.getAbsolutePath() + ", size=" + file.length() + ']');
}
/**
* Retrieves next available file to write WAL data, waiting if necessary for a segment to become available.
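* <p>
* Illustrative example (assumed values): with dsCfg.getWalSegments() == 10, finishing absolute segment index
* 24 yields next absolute index 25, which maps to work file "0000000000000005.wal" (25 % 10 == 5).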
*
* @param curIdx Current absolute WAL segment index.
* @return File ready for use as new WAL segment.
* @throws StorageException If exception occurred in the archiver thread.
*/
private File pollNextFile(long curIdx) throws StorageException, IgniteInterruptedCheckedException {
FileArchiver archiver0 = archiver;
if (archiver0 == null) {
segmentAware.setLastArchivedAbsoluteIndex(curIdx);
return new File(walWorkDir, FileDescriptor.fileName(curIdx + 1));
}
// Signal to archiver that we are done with the segment and it can be archived.
long absNextIdx = archiver0.nextAbsoluteSegmentIndex();
long segmentIdx = absNextIdx % dsCfg.getWalSegments();
return new File(walWorkDir, FileDescriptor.fileName(segmentIdx));
}
/**
* Files from archive WAL directory.
*/
private FileDescriptor[] walArchiveFiles() {
return scan(walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER));
}
/** {@inheritDoc} */
@Override public long maxArchivedSegmentToDelete() {
// When maxWalArchiveSize == Long.MAX_VALUE, deleting files is not permitted.
if (dsCfg.getMaxWalArchiveSize() == Long.MAX_VALUE)
return -1;
FileDescriptor[] archivedFiles = walArchiveFiles();
Long totalArchiveSize = Stream.of(archivedFiles)
.map(desc -> desc.file().length())
.reduce(0L, Long::sum);
if (archivedFiles.length == 0 || totalArchiveSize < allowedThresholdWalArchiveSize)
return -1;
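// Walk from the oldest archived segment and accumulate sizes: return the index of the first segment whose
// removal (together with all older ones) brings the remaining archive below the threshold.
// Illustrative example (assumed sizes): four 64 MiB segments (256 MiB total) and a 128 MiB threshold yield
// index 2, since deleting segments 0..2 leaves 64 MiB < 128 MiB.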
long sizeOfOldestArchivedFiles = 0;
for (FileDescriptor desc : archivedFiles) {
sizeOfOldestArchivedFiles += desc.file().length();
if (totalArchiveSize - sizeOfOldestArchivedFiles < allowedThresholdWalArchiveSize)
return desc.getIdx();
}
return archivedFiles[archivedFiles.length - 1].getIdx();
}
/**
* @param allFiles Raw file list to scan, may be {@code null}.
* @return Sorted WAL file descriptors.
*/
public static FileDescriptor[] scan(File[] allFiles) {
if (allFiles == null)
return EMPTY_DESCRIPTORS;
FileDescriptor[] descs = new FileDescriptor[allFiles.length];
for (int i = 0; i < allFiles.length; i++) {
File f = allFiles[i];
descs[i] = new FileDescriptor(f);
}
Arrays.sort(descs);
return descs;
}
/**
* @throws StorageException If node is no longer valid and we missed a WAL operation.
*/
private void checkNode() throws StorageException {
if (cctx.kernalContext().invalid())
throw new StorageException("Failed to perform WAL operation (environment was invalidated by a " +
"previous error)");
}
/**
* Sets up a listener for WAL segment write file IO creation.
* @param createWalFileListener Listener to be invoked for new segment file IO creation.
*/
public void setCreateWalFileListener(@Nullable IgniteInClosure<FileIO> createWalFileListener) {
this.createWalFileListener = createWalFileListener;
}
/**
* @return {@link #maxWalSegmentSize}.
*/
public long maxWalSegmentSize() {
return maxWalSegmentSize;
}
/**
* File archiver operates on absolute segment indexes. For any given absolute segment index N we can calculate the
* work WAL segment: S(N) = N % dsCfg.walSegments. When a work segment is finished, it is given to the archiver. If
* the absolute index of last archived segment is denoted by A and the absolute index of next segment we want to
* write is denoted by W, then we can allow write to S(W) if W - A <= walSegments. <br>
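* For example (illustrative): with walSegments = 10 and last archived absolute index A = 25, writing is
* allowed up to absolute index W = 35; the writer blocks before switching to segment 36 until segment 26 has
* been archived. <br>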
*
* Monitor of current object is used for notify on: <ul>
* <li>exception occurred ({@link FileArchiver#cleanErr} != null)</li>
* <li>stopping thread ({@code isCancelled} == true)</li>
* <li>current file index changed </li>
* <li>last archived file index was changed</li>
* <li>some WAL index was removed from map</li>
* </ul>
*/
private class FileArchiver extends GridWorker {
/** Exception which occurred during initial creation of files or during archiving of a WAL segment. */
private StorageException cleanErr;
/** Formatted index. */
private int formatted;
/**
* @param lastAbsArchivedIdx Last archived segment absolute index.
* @param log Logger.
*/
private FileArchiver(long lastAbsArchivedIdx, IgniteLogger log) {
super(cctx.igniteInstanceName(), "wal-file-archiver%" + cctx.igniteInstanceName(), log,
cctx.kernalContext().workersRegistry());
segmentAware.setLastArchivedAbsoluteIndex(lastAbsArchivedIdx);
}
/**
* @throws IgniteInterruptedCheckedException If failed to wait for thread shutdown.
*/
private void shutdown() throws IgniteInterruptedCheckedException {
synchronized (this) {
isCancelled = true;
notifyAll();
}
U.join(runner());
}
/** {@inheritDoc} */
@Override protected void body() {
blockingSectionBegin();
try {
allocateRemainingFiles();
}
catch (StorageException e) {
synchronized (this) {
// Stop the thread and report to starter.
cleanErr = e;
segmentAware.forceInterrupt();
notifyAll();
}
cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, e));
return;
}
finally {
blockingSectionEnd();
}
Throwable err = null;
try {
blockingSectionBegin();
try {
segmentAware.awaitSegment(0); // Wait for initialization of at least one work segment.
}
finally {
blockingSectionEnd();
}
while (!Thread.currentThread().isInterrupted() && !isCancelled()) {
long toArchive;
blockingSectionBegin();
try {
toArchive = segmentAware.waitNextSegmentForArchivation();
}
finally {
blockingSectionEnd();
}
if (isCancelled())
break;
SegmentArchiveResult res;
blockingSectionBegin();
try {
res = archiveSegment(toArchive);
}
finally {
blockingSectionEnd();
}
blockingSectionBegin();
try {
segmentAware.markAsMovedToArchive(toArchive);
}
finally {
blockingSectionEnd();
}
if (evt.isRecordable(EVT_WAL_SEGMENT_ARCHIVED)) {
evt.record(new WalSegmentArchivedEvent(
cctx.discovery().localNode(),
res.getAbsIdx(),
res.getDstArchiveFile())
);
}
onIdle();
}
}
catch (IgniteInterruptedCheckedException e) {
Thread.currentThread().interrupt();
synchronized (this) {
isCancelled = true;
}
}
catch (Throwable t) {
err = t;
}
finally {
if (err == null && !isCancelled())
err = new IllegalStateException("Worker " + name() + " is terminated unexpectedly");
if (err instanceof OutOfMemoryError)
failureProcessor.process(new FailureContext(CRITICAL_ERROR, err));
else if (err != null)
failureProcessor.process(new FailureContext(SYSTEM_WORKER_TERMINATION, err));
}
}
/**
* Gets the absolute index of the next WAL segment available to write. Blocks until a file becomes available
* to write.
*
* @return Next index (curWalSegmIdx+1) when it is ready to be written.
* @throws StorageException If exception occurred in the archiver thread.
*/
private long nextAbsoluteSegmentIndex() throws StorageException, IgniteInterruptedCheckedException {
synchronized (this) {
if (cleanErr != null)
throw cleanErr;
try {
long nextIdx = segmentAware.nextAbsoluteSegmentIndex();
// Wait for formatter so that we do not open an empty file in DEFAULT mode.
while (nextIdx % dsCfg.getWalSegments() > formatted && cleanErr == null)
wait();
if (cleanErr != null)
throw cleanErr;
return nextIdx;
}
catch (IgniteInterruptedCheckedException e) {
if (cleanErr != null)
throw cleanErr;
throw e;
}
catch (InterruptedException e) {
throw new IgniteInterruptedCheckedException(e);
}
}
}
/**
* @param absIdx Segment absolute index.
* @return <ul><li>{@code True} if the segment can be read and no lock is held,</li><li>{@code false} if it is
* a work segment that must be released later via {@link #releaseWorkSegment}</li></ul>
*/
@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
public boolean checkCanReadArchiveOrReserveWorkSegment(long absIdx) {
return segmentAware.checkCanReadArchiveOrReserveWorkSegment(absIdx);
}
/**
* @param absIdx Segment absolute index.
*/
@SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
public void releaseWorkSegment(long absIdx) {
segmentAware.releaseWorkSegment(absIdx);
}
/**
* Moves a WAL segment from the work folder to the archive folder. A temp file is used for the move.
*
* @param absIdx Absolute index to archive.
*/
private SegmentArchiveResult archiveSegment(long absIdx) throws StorageException {
long segIdx = absIdx % dsCfg.getWalSegments();
File origFile = new File(walWorkDir, FileDescriptor.fileName(segIdx));
String name = FileDescriptor.fileName(absIdx);
File dstTmpFile = new File(walArchiveDir, name + FilePageStoreManager.TMP_SUFFIX);
File dstFile = new File(walArchiveDir, name);
if (log.isInfoEnabled())
log.info("Starting to copy WAL segment [absIdx=" + absIdx + ", segIdx=" + segIdx +
", origFile=" + origFile.getAbsolutePath() + ", dstFile=" + dstFile.getAbsolutePath() + ']');
try {
Files.deleteIfExists(dstTmpFile.toPath());
Files.copy(origFile.toPath(), dstTmpFile.toPath());
Files.move(dstTmpFile.toPath(), dstFile.toPath());
if (mode != WALMode.NONE) {
try (FileIO f0 = ioFactory.create(dstFile, CREATE, READ, WRITE)) {
f0.force();
}
}
}
catch (IOException e) {
throw new StorageException("Failed to archive WAL segment [" +
"srcFile=" + origFile.getAbsolutePath() +
", dstFile=" + dstTmpFile.getAbsolutePath() + ']', e);
}
if (log.isInfoEnabled())
log.info("Copied file [src=" + origFile.getAbsolutePath() +
", dst=" + dstFile.getAbsolutePath() + ']');
return new SegmentArchiveResult(absIdx, origFile, dstFile);
}
/**
* @return {@code True} if the worker is cancelled.
*/
private boolean checkStop() {
return isCancelled();
}
/**
* Background creation of all segments except the first. The first segment is created in the main thread by
* {@link FileWriteAheadLogManager#checkOrPrepareFiles()}.
*/
private void allocateRemainingFiles() throws StorageException {
checkFiles(
1,
true,
new IgnitePredicate<Integer>() {
@Override public boolean apply(Integer integer) {
return !checkStop();
}
},
new CI1<Integer>() {
@Override public void apply(Integer idx) {
synchronized (FileArchiver.this) {
formatted = idx;
FileArchiver.this.notifyAll();
}
}
}
);
}
}
/**
* Responsible for compressing WAL archive segments.
* Also responsible for deleting raw copies of already compressed WAL archive segments if they are not reserved.
*/
private class FileCompressor extends Thread {
/** Flag advising the current thread to stop. */
private volatile boolean stopped;
/** All segments prior to this (inclusive) can be compressed. */
private volatile long minUncompressedIdxToKeep = -1L;
/** */
FileCompressor() {
super("wal-file-compressor%" + cctx.igniteInstanceName());
}
/** */
private void init() {
File[] toDel = walArchiveDir.listFiles(WAL_SEGMENT_TEMP_FILE_COMPACTED_FILTER);
for (File f : toDel) {
if (stopped)
return;
f.delete();
}
FileDescriptor[] alreadyCompressed = scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_COMPACTED_FILTER));
if (alreadyCompressed.length > 0)
segmentAware.lastCompressedIdx(alreadyCompressed[alreadyCompressed.length - 1].idx());
}
/**
* @param idx Minimum raw segment index that should be preserved from deletion.
*/
void keepUncompressedIdxFrom(long idx) {
minUncompressedIdxToKeep = idx;
}
/**
* Pessimistically tries to reserve segment for compression in order to avoid concurrent truncation.
* Waits if there is no segment to compress right now.
*/
private long tryReserveNextSegmentOrWait() throws IgniteCheckedException {
long segmentToCompress = segmentAware.waitNextSegmentToCompress();
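// Reservation may fail (e.g. the segment was truncated concurrently); return -1 so the caller retries.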
boolean reserved = reserve(new FileWALPointer(segmentToCompress, 0, 0));
return reserved ? segmentToCompress : -1;
}
/**
* Deletes raw WAL segments that aren't reserved or locked and already have compressed copies.
*/
private void deleteObsoleteRawSegments() {
FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER));
Set<Long> indices = new HashSet<>();
Set<Long> duplicateIndices = new HashSet<>();
for (FileDescriptor desc : descs) {
if (!indices.add(desc.idx))
duplicateIndices.add(desc.idx);
}
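// A raw segment is eligible for deletion only if a compressed duplicate with the same index exists.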
for (FileDescriptor desc : descs) {
if (desc.isCompressed())
continue;
// Do not delete a reserved or locked segment, or any segment after it.
if (segmentReservedOrLocked(desc.idx))
return;
if (desc.idx < minUncompressedIdxToKeep && duplicateIndices.contains(desc.idx)) {
if (!desc.file.delete())
U.warn(log, "Failed to remove obsolete WAL segment (make sure the process has enough rights): " +
desc.file.getAbsolutePath() + ", exists: " + desc.file.exists());
}
}
}
/** {@inheritDoc} */
@Override public void run() {
init();
while (!Thread.currentThread().isInterrupted() && !stopped) {
long currReservedSegment = -1;
try {
deleteObsoleteRawSegments();
currReservedSegment = tryReserveNextSegmentOrWait();
if (currReservedSegment == -1)
continue;
File tmpZip = new File(walArchiveDir, FileDescriptor.fileName(currReservedSegment)
+ FilePageStoreManager.ZIP_SUFFIX + FilePageStoreManager.TMP_SUFFIX);
File zip = new File(walArchiveDir, FileDescriptor.fileName(currReservedSegment) + FilePageStoreManager.ZIP_SUFFIX);
File raw = new File(walArchiveDir, FileDescriptor.fileName(currReservedSegment));
if (!Files.exists(raw.toPath()))
throw new IgniteCheckedException("WAL archive segment is missing: " + raw);
compressSegmentToFile(currReservedSegment, raw, tmpZip);
Files.move(tmpZip.toPath(), zip.toPath());
if (mode != WALMode.NONE) {
try (FileIO f0 = ioFactory.create(zip, CREATE, READ, WRITE)) {
f0.force();
}
if (evt.isRecordable(EVT_WAL_SEGMENT_COMPACTED)) {
evt.record(new WalSegmentCompactedEvent(
cctx.discovery().localNode(),
currReservedSegment,
zip.getAbsoluteFile())
);
}
}
segmentAware.lastCompressedIdx(currReservedSegment);
}
catch (IgniteInterruptedCheckedException ignore) {
Thread.currentThread().interrupt();
}
catch (IgniteCheckedException | IOException e) {
U.error(log, "Compression of WAL segment [idx=" + currReservedSegment +
"] was skipped due to unexpected error", e);
segmentAware.lastCompressedIdx(currReservedSegment);
}
finally {
if (currReservedSegment != -1)
release(new FileWALPointer(currReservedSegment, 0, 0));
}
}
}
/**
* @param nextSegment Next segment absolute idx.
* @param raw Raw file.
* @param zip Zip file.
*/
private void compressSegmentToFile(long nextSegment, File raw, File zip)
throws IOException, IgniteCheckedException {
int segmentSerializerVer;
try (FileIO fileIO = ioFactory.create(raw)) {
segmentSerializerVer = readSegmentHeader(new SegmentIO(nextSegment, fileIO), segmentFileInputFactory).getSerializerVersion();
}
try (ZipOutputStream zos = new ZipOutputStream(new BufferedOutputStream(new FileOutputStream(zip)))) {
zos.setLevel(dsCfg.getWalCompactionLevel());
zos.putNextEntry(new ZipEntry(""));
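// The whole segment is stored as a single unnamed ZIP entry; FileDecompressor reads it back via getNextEntry().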
ByteBuffer buf = ByteBuffer.allocate(HEADER_RECORD_SIZE);
buf.order(ByteOrder.nativeOrder());
zos.write(prepareSerializerVersionBuffer(nextSegment, segmentSerializerVer, true, buf).array());
final CIX1<WALRecord> appendToZipC = new CIX1<WALRecord>() {
@Override public void applyx(WALRecord record) throws IgniteCheckedException {
final MarshalledRecord marshRec = (MarshalledRecord)record;
try {
zos.write(marshRec.buffer().array(), 0, marshRec.buffer().remaining());
}
catch (IOException e) {
throw new IgniteCheckedException(e);
}
}
};
try (SingleSegmentLogicalRecordsIterator iter = new SingleSegmentLogicalRecordsIterator(
log, cctx, ioFactory, BUF_SIZE, nextSegment, walArchiveDir, appendToZipC)) {
while (iter.hasNextX())
iter.nextX();
}
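// Terminate the compacted segment with a switch-segment record so that replay can detect its logical end.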
RecordSerializer ser = new RecordSerializerFactoryImpl(cctx).createSerializer(segmentSerializerVer);
ByteBuffer heapBuf = prepareSwitchSegmentRecordBuffer(nextSegment, ser);
zos.write(heapBuf.array());
}
}
/**
* @param nextSegment Segment index.
* @param ser Record Serializer.
*/
@NotNull private ByteBuffer prepareSwitchSegmentRecordBuffer(long nextSegment, RecordSerializer ser)
throws IgniteCheckedException {
SwitchSegmentRecord switchRecord = new SwitchSegmentRecord();
int switchRecordSize = ser.size(switchRecord);
switchRecord.size(switchRecordSize);
switchRecord.position(new FileWALPointer(nextSegment, 0, switchRecordSize));
ByteBuffer heapBuf = ByteBuffer.allocate(switchRecordSize);
ser.writeRecord(switchRecord, heapBuf);
return heapBuf;
}
/**
* @throws IgniteInterruptedCheckedException If failed to wait for thread shutdown.
*/
private void shutdown() throws IgniteInterruptedCheckedException {
synchronized (this) {
stopped = true;
notifyAll();
}
U.join(this);
}
}
/**
* Responsible for decompressing previously compressed segments of WAL archive if they are needed for replay.
*/
private class FileDecompressor extends GridWorker {
/** Decompression futures. */
private Map<Long, GridFutureAdapter<Void>> decompressionFutures = new HashMap<>();
/** Segments queue. */
private final PriorityBlockingQueue<Long> segmentsQueue = new PriorityBlockingQueue<>();
/** Byte array for draining data. */
private byte[] arr = new byte[BUF_SIZE];
/**
* @param log Logger.
*/
FileDecompressor(IgniteLogger log) {
super(cctx.igniteInstanceName(), "wal-file-decompressor%" + cctx.igniteInstanceName(), log,
cctx.kernalContext().workersRegistry());
}
/** {@inheritDoc} */
@Override protected void body() {
Throwable err = null;
try {
while (!isCancelled()) {
long segmentToDecompress = -1L;
try {
blockingSectionBegin();
try {
segmentToDecompress = segmentsQueue.take();
}
finally {
blockingSectionEnd();
}
if (isCancelled())
break;
File zip = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress)
+ FilePageStoreManager.ZIP_SUFFIX);
File unzipTmp = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress)
+ FilePageStoreManager.TMP_SUFFIX);
File unzip = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress));
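// Decompress into a temporary file first and rename afterwards, so readers never observe
// a partially written raw segment.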
try (ZipInputStream zis = new ZipInputStream(new BufferedInputStream(new FileInputStream(zip)));
FileIO io = ioFactory.create(unzipTmp)) {
zis.getNextEntry();
// zis.read() returns -1 at end of stream; write only chunks that were actually read.
int bytesRead;
while ((bytesRead = zis.read(arr)) > 0) {
io.writeFully(arr, 0, bytesRead);
updateHeartbeat();
}
}
try {
Files.move(unzipTmp.toPath(), unzip.toPath());
}
catch (FileAlreadyExistsException e) {
U.error(log, "Can't rename temporary unzipped segment: raw segment is already present " +
"[tmp=" + unzipTmp + ", raw=" + unzip + "]", e);
if (!unzipTmp.delete())
U.error(log, "Can't delete temporary unzipped segment [tmp=" + unzipTmp + "]");
}
updateHeartbeat();
synchronized (this) {
decompressionFutures.remove(segmentToDecompress).onDone();
}
}
catch (IOException ex) {
if (!isCancelled && segmentToDecompress != -1L) {
IgniteCheckedException e = new IgniteCheckedException("Error during WAL segment " +
"decompression [segmentIdx=" + segmentToDecompress + "]", ex);
synchronized (this) {
decompressionFutures.remove(segmentToDecompress).onDone(e);
}
}
}
}
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
if (!isCancelled)
err = e;
}
catch (Throwable t) {
err = t;
}
finally {
if (err == null && !isCancelled)
err = new IllegalStateException("Worker " + name() + " is terminated unexpectedly");
if (err instanceof OutOfMemoryError)
failureProcessor.process(new FailureContext(CRITICAL_ERROR, err));
else if (err != null)
failureProcessor.process(new FailureContext(SYSTEM_WORKER_TERMINATION, err));
}
}
/**
* Asynchronously decompresses a WAL segment that is present only as a .zip file.
*
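* <p>A minimal usage sketch (the {@code decompressor} receiver is illustrative):
* <pre>{@code
* // Blocks until segment 42 exists as a raw file in the archive:
* decompressor.decompressFile(42L).get();
* }</pre>
*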
* @return Future which is completed once file is decompressed.
*/
synchronized IgniteInternalFuture<Void> decompressFile(long idx) {
if (decompressionFutures.containsKey(idx))
return decompressionFutures.get(idx);
File f = new File(walArchiveDir, FileDescriptor.fileName(idx));
if (f.exists())
return new GridFinishedFuture<>();
segmentsQueue.put(idx);
GridFutureAdapter<Void> res = new GridFutureAdapter<>();
decompressionFutures.put(idx, res);
return res;
}
/** */
private void shutdown() {
synchronized (this) {
U.cancel(this);
// Put fake -1 to wake thread from queue.take()
segmentsQueue.put(-1L);
}
U.join(this, log);
}
}
/**
* Validates WAL work files up to {@link DataStorageConfiguration#getWalSegments()} and creates them if needed.
* Checking stops when the exit condition returns {@code false} or all files have been checked.
*
* @param startWith Index of the first file to check.
* @param create Flag to create the file if it is missing.
* @param p Exit condition predicate.
* @param completionCallback Callback invoked after each checked index.
* @throws StorageException If validation or file creation fails.
*/
private void checkFiles(
int startWith,
boolean create,
@Nullable IgnitePredicate<Integer> p,
@Nullable IgniteInClosure<Integer> completionCallback
) throws StorageException {
for (int i = startWith; i < dsCfg.getWalSegments() && (p == null || p.apply(i)); i++) {
File checkFile = new File(walWorkDir, FileDescriptor.fileName(i));
if (checkFile.exists()) {
if (checkFile.isDirectory())
throw new StorageException("Failed to initialize WAL log segment (a directory with " +
"the same name already exists): " + checkFile.getAbsolutePath());
else if (checkFile.length() != dsCfg.getWalSegmentSize() && mode == WALMode.FSYNC)
throw new StorageException("Failed to initialize WAL log segment " +
"(WAL segment size change is not supported in 'FSYNC' WAL mode) " +
"[filePath=" + checkFile.getAbsolutePath() +
", fileSize=" + checkFile.length() +
", configSize=" + dsCfg.getWalSegmentSize() + ']');
}
else if (create)
createFile(checkFile);
if (completionCallback != null)
completionCallback.apply(i);
}
}
/**
* Needed only for WAL compaction.
*
* @param idx Index.
* @param ver Version.
* @param compacted Compacted flag.
* @param buf Buffer to fill.
* @return The provided buffer with the header record written and the position reset to zero.
*/
@NotNull private static ByteBuffer prepareSerializerVersionBuffer(long idx, int ver, boolean compacted, ByteBuffer buf) {
// Write record type.
buf.put((byte) (WALRecord.RecordType.HEADER_RECORD.ordinal() + 1));
// Write position.
RecordV1Serializer.putPosition(buf, new FileWALPointer(idx, 0, 0));
// Place magic number.
buf.putLong(compacted ? HeaderRecord.COMPACTED_MAGIC : HeaderRecord.REGULAR_MAGIC);
// Place serializer version.
buf.putInt(ver);
// Place CRC if needed.
if (!RecordV1Serializer.skipCrc) {
int curPos = buf.position();
buf.position(0);
// This call will move buffer position to the end of the record again.
int crcVal = PureJavaCrc32.calcCrc32(buf, curPos);
buf.putInt(crcVal);
}
else
buf.putInt(0);
// Write header record through io.
buf.position(0);
return buf;
}
/**
*
*/
private abstract static class FileHandle {
/** I/O interface for read/write operations with file */
SegmentIO fileIO;
/** Segment index corresponding to fileIO. */
final long segmentIdx;
/**
* @param fileIO I/O interface for read/write operations of FileHandle.
*/
private FileHandle(@NotNull SegmentIO fileIO) {
this.fileIO = fileIO;
segmentIdx = fileIO.getSegmentId();
}
/**
* @return Absolute WAL segment file index (incremental counter).
*/
public long getSegmentId() {
return segmentIdx;
}
}
/**
*
*/
public static class ReadFileHandle extends FileHandle implements AbstractWalRecordsIterator.AbstractReadFileHandle {
/** Entry serializer. */
RecordSerializer ser;
/** */
FileInput in;
/** Holder of actual information of latest manipulation on WAL segments. */
private final SegmentAware segmentAware;
/**
* @param fileIO I/O interface for read/write operations of FileHandle.
* @param ser Entry serializer.
* @param in File input.
* @param aware Segment aware.
*/
public ReadFileHandle(
SegmentIO fileIO,
RecordSerializer ser,
FileInput in,
SegmentAware aware) {
super(fileIO);
this.ser = ser;
this.in = in;
segmentAware = aware;
}
/**
* @throws IgniteCheckedException If failed to close the WAL segment file.
*/
@Override public void close() throws IgniteCheckedException {
try {
fileIO.close();
in.io().close();
}
catch (IOException e) {
throw new IgniteCheckedException(e);
}
}
/** {@inheritDoc} */
@Override public long idx() {
return getSegmentId();
}
/** {@inheritDoc} */
@Override public FileInput in() {
return in;
}
/** {@inheritDoc} */
@Override public RecordSerializer ser() {
return ser;
}
/** {@inheritDoc} */
@Override public boolean workDir() {
return segmentAware != null && segmentAware.lastArchivedAbsoluteIndex() < getSegmentId();
}
}
/**
* File handle for one log segment.
*/
@SuppressWarnings("SignalWithoutCorrespondingAwait")
private class FileWriteHandle extends FileHandle {
/** */
private final RecordSerializer serializer;
/** Created on resume logging. */
private volatile boolean resume;
/**
* Position in current file after the end of last written record (incremented after file channel write
* operation)
*/
volatile long written;
/** */
private volatile long lastFsyncPos;
/** Stop guard to guarantee that only one thread succeeds in calling {@link #close(boolean)} */
private final AtomicBoolean stop = new AtomicBoolean(false);
/** */
private final Lock lock = new ReentrantLock();
/** Condition for timed wait of several threads, see {@link DataStorageConfiguration#getWalFsyncDelayNanos()} */
private final Condition fsync = lock.newCondition();
/**
* Next segment available condition. Protection from spurious wakeups is provided by the
* predicate {@link #fileIO} == {@code null}.
*/
private final Condition nextSegment = lock.newCondition();
/** Buffer. */
private final SegmentedRingByteBuffer buf;
/**
* @param fileIO I/O file interface to use
* @param pos Position.
* @param resume Created on resume logging flag.
* @param serializer Serializer.
* @param buf Buffer.
* @throws IOException If failed.
*/
private FileWriteHandle(
SegmentIO fileIO,
long pos,
boolean resume,
RecordSerializer serializer,
SegmentedRingByteBuffer buf
) throws IOException {
super(fileIO);
assert serializer != null;
if (!mmap)
fileIO.position(pos);
this.serializer = serializer;
written = pos;
lastFsyncPos = pos;
this.resume = resume;
this.buf = buf;
}
/**
* Write serializer version to current handle.
*/
public void writeHeader() {
SegmentedRingByteBuffer.WriteSegment seg = buf.offer(HEADER_RECORD_SIZE);
assert seg != null && seg.position() > 0;
prepareSerializerVersionBuffer(getSegmentId(), serializerVersion(), false, seg.buffer());
seg.release();
}
/**
* @param rec Record to be added to write queue.
* @return Pointer or null if roll over to next segment is required or already started by other thread.
* @throws StorageException If failed.
* @throws IgniteCheckedException If failed.
*/
@Nullable private WALPointer addRecord(WALRecord rec) throws StorageException, IgniteCheckedException {
assert rec.size() > 0 : rec;
for (;;) {
checkNode();
SegmentedRingByteBuffer.WriteSegment seg;
// Buffer can be in open state in case of resuming with different serializer version.
if (rec.type() == SWITCH_SEGMENT_RECORD && !currHnd.resume)
seg = buf.offerSafe(rec.size());
else
seg = buf.offer(rec.size());
FileWALPointer ptr = null;
if (seg != null) {
try {
int pos = (int)(seg.position() - rec.size());
ByteBuffer buf = seg.buffer();
if (buf == null)
return null; // Cannot write to this segment; need to switch to the next one.
ptr = new FileWALPointer(getSegmentId(), pos, rec.size());
rec.position(ptr);
fillBuffer(buf, rec);
if (mmap) {
// The written field must only grow, but a segment with a greater position can be
// serialized earlier than a segment with a smaller position.
while (true) {
long written0 = written;
if (seg.position() > written0) {
if (WRITTEN_UPD.compareAndSet(this, written0, seg.position()))
break;
}
else
break;
}
}
return ptr;
}
finally {
seg.release();
if (mode == WALMode.BACKGROUND && rec instanceof CheckpointRecord)
flushOrWait(ptr);
}
}
else
walWriter.flushAll();
}
}
/**
* Flush or wait for concurrent flush completion.
*
* @param ptr Pointer.
*/
private void flushOrWait(FileWALPointer ptr) throws IgniteCheckedException {
if (ptr != null) {
// If an obsolete file index is requested, it must already have been flushed by close().
if (ptr.index() != getSegmentId())
return;
}
flush(ptr);
}
/**
* @param ptr Pointer.
*/
private void flush(FileWALPointer ptr) throws IgniteCheckedException {
if (ptr == null) { // Unconditional flush.
walWriter.flushAll();
return;
}
assert ptr.index() == getSegmentId();
walWriter.flushBuffer(ptr.fileOffset());
}
/**
* @param buf Buffer.
* @param rec WAL record.
* @throws IgniteCheckedException If failed.
*/
private void fillBuffer(ByteBuffer buf, WALRecord rec) throws IgniteCheckedException {
try {
serializer.writeRecord(rec, buf);
}
catch (RuntimeException e) {
throw new IllegalStateException("Failed to write record: " + rec, e);
}
}
/**
* Non-blocking check if this pointer needs to be sync'ed.
*
* @param ptr WAL pointer to check.
* @return {@code False} if this pointer has been already sync'ed.
*/
private boolean needFsync(FileWALPointer ptr) {
// If index has changed, it means that the log was rolled over and already sync'ed.
// If requested position is smaller than last sync'ed, it also means all is good.
// If position is equal, then our record is the last not synced.
return getSegmentId() == ptr.index() && lastFsyncPos <= ptr.fileOffset();
}
/**
* @return Pointer to the end of the last written record (probably not fsync-ed).
*/
private FileWALPointer position() {
lock.lock();
try {
return new FileWALPointer(getSegmentId(), (int)written, 0);
}
finally {
lock.unlock();
}
}
/**
* @param ptr Pointer to sync.
* @throws StorageException If failed.
*/
private void fsync(FileWALPointer ptr) throws StorageException, IgniteCheckedException {
lock.lock();
try {
if (ptr != null) {
if (!needFsync(ptr))
return;
if (fsyncDelay > 0 && !stop.get()) {
// Delay fsync to collect as many updates as possible: trade latency for throughput.
U.await(fsync, fsyncDelay, TimeUnit.NANOSECONDS);
if (!needFsync(ptr))
return;
}
}
flushOrWait(ptr);
if (stop.get())
return;
long lastFsyncPos0 = lastFsyncPos;
long written0 = written;
if (lastFsyncPos0 != written0) {
// Fsync position must be behind.
assert lastFsyncPos0 < written0 : "lastFsyncPos=" + lastFsyncPos0 + ", written=" + written0;
boolean metricsEnabled = metrics.metricsEnabled();
long start = metricsEnabled ? System.nanoTime() : 0;
if (mmap) {
long pos = ptr == null ? -1 : ptr.fileOffset();
List<SegmentedRingByteBuffer.ReadSegment> segs = buf.poll(pos);
if (segs != null) {
assert segs.size() == 1;
SegmentedRingByteBuffer.ReadSegment seg = segs.get(0);
int off = seg.buffer().position();
int len = seg.buffer().limit() - off;
fsync((MappedByteBuffer)buf.buf, off, len);
seg.release();
}
}
else
walWriter.force();
lastFsyncPos = written;
if (fsyncDelay > 0)
fsync.signalAll();
long end = metricsEnabled ? System.nanoTime() : 0;
if (metricsEnabled)
metrics.onFsync(end - start);
}
}
finally {
lock.unlock();
}
}
/**
* @param buf Mapped byte buffer.
* @param off Offset.
* @param len Length.
*/
private void fsync(MappedByteBuffer buf, int off, int len) throws IgniteCheckedException {
try {
long mappedOff = (Long)mappingOffset.invoke(buf);
assert mappedOff == 0 : mappedOff;
long addr = (Long)mappingAddress.invoke(buf, mappedOff);
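// The native force call requires a page-aligned address: align down to the page
// boundary and extend the length by the remainder.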
long delta = (addr + off) % PAGE_SIZE;
long alignedAddr = (addr + off) - delta;
force0.invoke(buf, fd.get(buf), alignedAddr, len + delta);
}
catch (IllegalAccessException | InvocationTargetException e) {
throw new IgniteCheckedException(e);
}
}
/**
* @return {@code true} If this thread actually closed the segment.
* @throws IgniteCheckedException If failed.
* @throws StorageException If failed.
*/
private boolean close(boolean rollOver) throws IgniteCheckedException, StorageException {
if (stop.compareAndSet(false, true)) {
lock.lock();
try {
flushOrWait(null);
try {
RecordSerializer backwardSerializer = new RecordSerializerFactoryImpl(cctx)
.createSerializer(serializerVer);
SwitchSegmentRecord segmentRecord = new SwitchSegmentRecord();
int switchSegmentRecSize = backwardSerializer.size(segmentRecord);
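// On rollover, append a switch-segment record if it still fits, marking the logical end of this segment.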
if (rollOver && written < (maxWalSegmentSize - switchSegmentRecSize)) {
segmentRecord.size(switchSegmentRecSize);
WALPointer segRecPtr = addRecord(segmentRecord);
if (segRecPtr != null)
fsync((FileWALPointer)segRecPtr);
}
if (mmap) {
List<SegmentedRingByteBuffer.ReadSegment> segs = buf.poll(maxWalSegmentSize);
if (segs != null) {
assert segs.size() == 1;
segs.get(0).release();
}
}
// Do the final fsync.
if (mode != WALMode.NONE) {
if (mmap)
((MappedByteBuffer)buf.buf).force();
else
fileIO.force();
lastFsyncPos = written;
}
if (mmap) {
try {
fileIO.close();
}
catch (IOException ignore) {
// No-op.
}
}
else {
walWriter.close();
if (!rollOver)
buf.free();
}
}
catch (IOException e) {
throw new StorageException("Failed to close WAL write handle [idx=" + getSegmentId() + "]", e);
}
if (log.isDebugEnabled())
log.debug("Closed WAL write handle [idx=" + getSegmentId() + "]");
return true;
}
finally {
if (mmap)
buf.free();
lock.unlock();
}
}
else
return false;
}
/**
* Signals that the next segment is available, waking up other worker threads waiting to write to the WAL.
*/
private void signalNextAvailable() {
lock.lock();
try {
assert cctx.kernalContext().invalid() ||
written == lastFsyncPos || mode != WALMode.FSYNC :
"fsync [written=" + written + ", lastFsync=" + lastFsyncPos + ", idx=" + getSegmentId() + ']';
fileIO = null;
nextSegment.signalAll();
}
finally {
lock.unlock();
}
}
/**
*
*/
private void awaitNext() {
lock.lock();
try {
while (fileIO != null)
U.awaitQuiet(nextSegment);
}
finally {
lock.unlock();
}
}
/**
* Safely reads the current position of the file channel.
*
* @return Position as a String, or "null" if the channel is null.
*/
private String safePosition() {
FileIO io = fileIO;
if (io == null)
return "null";
try {
return String.valueOf(io.position());
}
catch (IOException e) {
return "{Failed to read channel position: " + e.getMessage() + '}';
}
}
}
/**
* Iterator over WAL-log.
*/
private static class RecordsIterator extends AbstractWalRecordsIterator {
/** */
private static final long serialVersionUID = 0L;
/** */
private final File walArchiveDir;
/** */
private final File walWorkDir;
/** See {@link FileWriteAheadLogManager#archiver}. */
@Nullable private final FileArchiver archiver;
/** */
private final FileDecompressor decompressor;
/** */
private final DataStorageConfiguration dsCfg;
/** Optional start pointer. */
@Nullable
private FileWALPointer start;
/** Optional end pointer. */
@Nullable
private FileWALPointer end;
/** Manager of segment location. */
private SegmentRouter segmentRouter;
/** Holder of actual information of latest manipulation on WAL segments. */
private SegmentAware segmentAware;
/**
* @param cctx Shared context.
* @param walArchiveDir WAL archive dir.
* @param walWorkDir WAL dir.
* @param start Optional start pointer.
* @param end Optional end pointer.
* @param dsCfg Database configuration.
* @param serializerFactory Serializer factory.
* @param archiver File Archiver.
* @param decompressor Decompressor.
* @param log Logger.
* @param segmentAware Segment aware.
* @param segmentRouter Segment router.
* @param segmentFileInputFactory Segment file input factory.
* @throws IgniteCheckedException If failed to initialize WAL segment.
*/
private RecordsIterator(
GridCacheSharedContext cctx,
File walArchiveDir,
File walWorkDir,
@Nullable FileWALPointer start,
@Nullable FileWALPointer end,
DataStorageConfiguration dsCfg,
@NotNull RecordSerializerFactory serializerFactory,
FileIOFactory ioFactory,
@Nullable FileArchiver archiver,
FileDecompressor decompressor,
IgniteLogger log,
SegmentAware segmentAware,
SegmentRouter segmentRouter,
SegmentFileInputFactory segmentFileInputFactory
) throws IgniteCheckedException {
super(log,
cctx,
serializerFactory,
ioFactory,
dsCfg.getWalRecordIteratorBufferSize(),
segmentFileInputFactory
);
this.walArchiveDir = walArchiveDir;
this.walWorkDir = walWorkDir;
this.archiver = archiver;
this.start = start;
this.end = end;
this.dsCfg = dsCfg;
this.decompressor = decompressor;
this.segmentRouter = segmentRouter;
this.segmentAware = segmentAware;
init();
advance();
}
/** {@inheritDoc} */
@Override protected ReadFileHandle initReadHandle(
@NotNull AbstractFileDescriptor desc,
@Nullable FileWALPointer start
) throws IgniteCheckedException, FileNotFoundException {
AbstractFileDescriptor currDesc = desc;
if (!desc.file().exists()) {
FileDescriptor zipFile = new FileDescriptor(
new File(walArchiveDir, FileDescriptor.fileName(desc.idx())
+ FilePageStoreManager.ZIP_SUFFIX));
if (!zipFile.file.exists()) {
throw new FileNotFoundException("Both compressed and raw segment files are missing in archive " +
"[segmentIdx=" + desc.idx() + "]");
}
if (decompressor != null)
decompressor.decompressFile(desc.idx()).get();
else
currDesc = zipFile;
}
return (ReadFileHandle) super.initReadHandle(currDesc, start);
}
/** {@inheritDoc} */
@Override protected void onClose() throws IgniteCheckedException {
super.onClose();
curRec = null;
closeCurrentWalSegment();
curWalSegmIdx = Integer.MAX_VALUE;
}
/**
* @throws IgniteCheckedException If failed to initialize first file handle.
*/
private void init() throws IgniteCheckedException {
AbstractFileDescriptor[] descs = loadFileDescriptors(walArchiveDir);
if (start != null) {
if (!F.isEmpty(descs)) {
if (descs[0].idx() > start.index())
throw new IgniteCheckedException("WAL history is too short " +
"[descs=" + Arrays.asList(descs) + ", start=" + start + ']');
for (AbstractFileDescriptor desc : descs) {
if (desc.idx() == start.index()) {
curWalSegmIdx = start.index();
break;
}
}
if (curWalSegmIdx == -1) {
long lastArchived = descs[descs.length - 1].idx();
if (lastArchived > start.index())
throw new IgniteCheckedException("WAL history is corrupted (segment is missing): " + start);
// This pointer may be in work files because archiver did not
// copy the file yet, check that it is not too far forward.
curWalSegmIdx = start.index();
}
}
else {
// This means that whole checkpoint history fits in one segment in WAL work directory.
// Will start from this index right away.
curWalSegmIdx = start.index();
}
}
else
curWalSegmIdx = !F.isEmpty(descs) ? descs[0].idx() : 0;
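// Step back one segment: advanceSegment() pre-increments the index before reading.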
curWalSegmIdx--;
if (log.isDebugEnabled())
log.debug("Initialized WAL cursor [start=" + start + ", end=" + end + ", curWalSegmIdx=" + curWalSegmIdx + ']');
}
/** {@inheritDoc} */
@Override protected AbstractReadFileHandle advanceSegment(
@Nullable final AbstractReadFileHandle curWalSegment
) throws IgniteCheckedException {
if (curWalSegment != null)
curWalSegment.close();
// Stop iteration when we are past the end marker.
if (end != null && curWalSegmIdx + 1 > end.index())
return null;
curWalSegmIdx++;
boolean readArchive = canReadArchiveOrReserveWork(curWalSegmIdx); //lock during creation handle.
ReadFileHandle nextHandle;
try {
FileDescriptor fd = segmentRouter.findSegment(curWalSegmIdx);
if (log.isDebugEnabled())
log.debug("Reading next file [absIdx=" + curWalSegmIdx + ", file=" + fd.file.getAbsolutePath() + ']');
nextHandle = initReadHandle(fd, start != null && curWalSegmIdx == start.index() ? start : null);
}
catch (FileNotFoundException e) {
if (readArchive)
throw new IgniteCheckedException("Missing WAL segment in the archive", e);
else
nextHandle = null;
}
if (!readArchive)
releaseWorkSegment(curWalSegmIdx);
curRec = null;
return nextHandle;
}
/** {@inheritDoc} */
@Override protected IgniteCheckedException handleRecordException(
@NotNull Exception e,
@Nullable FileWALPointer ptr) {
if (e instanceof IgniteCheckedException)
if (X.hasCause(e, IgniteDataIntegrityViolationException.class))
// This means that there is no explicit last segment, so we iterate until the very end.
if (end == null) {
long nextWalSegmentIdx = curWalSegmIdx + 1;
// Check that we should not look this segment up in archive directory.
// Basically the same check as in "advanceSegment" method.
if (archiver != null)
if (!canReadArchiveOrReserveWork(nextWalSegmentIdx))
try {
long workIdx = nextWalSegmentIdx % dsCfg.getWalSegments();
FileDescriptor fd = new FileDescriptor(
new File(walWorkDir, FileDescriptor.fileName(workIdx)),
nextWalSegmentIdx
);
try {
ReadFileHandle nextHandle = initReadHandle(fd, null);
// "nextHandle == null" is true only if current segment is the last one in the
// whole history. Only in such case we ignore crc validation error and just stop
// as if we reached the end of the WAL.
if (nextHandle == null)
return null;
}
catch (IgniteCheckedException | FileNotFoundException initReadHandleException) {
e.addSuppressed(initReadHandleException);
}
}
finally {
releaseWorkSegment(nextWalSegmentIdx);
}
}
return super.handleRecordException(e, ptr);
}
/**
* @param absIdx Absolute index to check.
* @return <ul><li> {@code True} if we can safely read the archive, </li> <li>{@code false} if the segment has
* not been archived yet. In this case the corresponding work segment is reserved (will not be deleted until
* release). Use {@link #releaseWorkSegment} for unlock </li></ul>
*/
private boolean canReadArchiveOrReserveWork(long absIdx) {
return archiver != null && archiver.checkCanReadArchiveOrReserveWorkSegment(absIdx);
}
/**
* @param absIdx Absolute index to release.
*/
private void releaseWorkSegment(long absIdx) {
if (archiver != null)
archiver.releaseWorkSegment(absIdx);
}
/** {@inheritDoc} */
@Override protected AbstractReadFileHandle createReadFileHandle(SegmentIO fileIO,
RecordSerializer ser, FileInput in) {
return new ReadFileHandle(fileIO, ser, in, segmentAware);
}
}
/**
* Flushes the current file handle for {@link WALMode#BACKGROUND} WAL mode. Called periodically from the scheduler.
*/
private void doFlush() {
FileWriteHandle hnd = currentHandle();
try {
hnd.flush(null);
}
catch (Exception e) {
U.warn(log, "Failed to flush WAL record queue", e);
}
}
/**
* WAL writer worker.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
private class WALWriter extends GridWorker {
/** Unconditional flush. */
private static final long UNCONDITIONAL_FLUSH = -1L;
/** File close. */
private static final long FILE_CLOSE = -2L;
/** File force. */
private static final long FILE_FORCE = -3L;
/** Err. */
private volatile Throwable err;
// TODO: replace with a GC-free data structure.
/** Parked threads. */
final Map<Thread, Long> waiters = new ConcurrentHashMap<>();
/**
* Default constructor.
*
* @param log Logger.
*/
WALWriter(IgniteLogger log) {
super(cctx.igniteInstanceName(), "wal-write-worker%" + cctx.igniteInstanceName(), log,
cctx.kernalContext().workersRegistry());
}
/** {@inheritDoc} */
@Override protected void body() {
Throwable err = null;
try {
while (!isCancelled()) {
onIdle();
while (waiters.isEmpty()) {
if (!isCancelled()) {
blockingSectionBegin();
try {
LockSupport.park();
}
finally {
blockingSectionEnd();
}
}
else {
unparkWaiters(Long.MAX_VALUE);
return;
}
}
Long pos = null;
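// Pick any pending flush position (map iteration order is arbitrary); Long.MIN_VALUE marks
// waiters that have already been served.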
for (Long val : waiters.values()) {
if (val > Long.MIN_VALUE)
pos = val;
}
updateHeartbeat();
if (pos == null)
continue;
else if (pos < UNCONDITIONAL_FLUSH) {
try {
assert pos == FILE_CLOSE || pos == FILE_FORCE : pos;
if (pos == FILE_CLOSE)
currHnd.fileIO.close();
else if (pos == FILE_FORCE)
currHnd.fileIO.force();
}
catch (IOException e) {
log.error("Exception in WAL writer thread: ", e);
err = e;
unparkWaiters(Long.MAX_VALUE);
return;
}
unparkWaiters(pos);
}
updateHeartbeat();
List<SegmentedRingByteBuffer.ReadSegment> segs = currentHandle().buf.poll(pos);
if (segs == null) {
unparkWaiters(pos);
continue;
}
for (int i = 0; i < segs.size(); i++) {
SegmentedRingByteBuffer.ReadSegment seg = segs.get(i);
updateHeartbeat();
try {
writeBuffer(seg.position(), seg.buffer());
}
catch (Throwable e) {
log.error("Exception in WAL writer thread:", e);
err = e;
}
finally {
seg.release();
long p = pos <= UNCONDITIONAL_FLUSH || err != null ? Long.MAX_VALUE : currentHandle().written;
unparkWaiters(p);
}
}
}
}
catch (Throwable t) {
err = t;
}
finally {
unparkWaiters(Long.MAX_VALUE);
if (err == null && !isCancelled)
err = new IllegalStateException("Worker " + name() + " is terminated unexpectedly");
if (err instanceof OutOfMemoryError)
cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err));
else if (err != null)
cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err));
}
}
/**
* Shuts down the thread.
*/
public void shutdown() throws IgniteInterruptedCheckedException {
U.cancel(this);
LockSupport.unpark(runner());
U.join(runner());
}
/**
* Unparks waiting threads.
*
* @param pos Pos.
*/
private void unparkWaiters(long pos) {
assert pos > Long.MIN_VALUE : pos;
for (Map.Entry<Thread, Long> e : waiters.entrySet()) {
Long val = e.getValue();
if (val <= pos) {
if (val != Long.MIN_VALUE)
waiters.put(e.getKey(), Long.MIN_VALUE);
LockSupport.unpark(e.getKey());
}
}
}
/**
* Forces all made changes to the file.
*/
void force() throws IgniteCheckedException {
flushBuffer(FILE_FORCE);
}
/**
* Closes file.
*/
void close() throws IgniteCheckedException {
flushBuffer(FILE_CLOSE);
}
/**
* Flushes all data from the buffer.
*/
void flushAll() throws IgniteCheckedException {
flushBuffer(UNCONDITIONAL_FLUSH);
}
/**
* @param expPos Expected position.
*/
@SuppressWarnings("ForLoopReplaceableByForEach")
void flushBuffer(long expPos) throws IgniteCheckedException {
if (mmap)
return;
Throwable err = walWriter.err;
if (err != null)
cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err));
if (expPos == UNCONDITIONAL_FLUSH)
expPos = (currentHandle().buf.tail());
Thread t = Thread.currentThread();
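// Register this thread as a waiter, then wake the writer; the writer resets the entry to
// Long.MIN_VALUE once data up to expPos has been written.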
waiters.put(t, expPos);
LockSupport.unpark(walWriter.runner());
while (true) {
Long val = waiters.get(t);
assert val != null : "Only this thread can remove thread from waiters";
if (val == Long.MIN_VALUE) {
waiters.remove(t);
Throwable walWriterError = walWriter.err;
if (walWriterError != null)
throw new IgniteCheckedException("Flush buffer failed.", walWriterError);
return;
}
else
LockSupport.park();
}
}
/**
* @param pos Position in file to start writing from. May be checked against the actual position
* to wait for previous writes to complete.
* @param buf Buffer to write to file
* @throws StorageException If failed.
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("TooBroadScope")
private void writeBuffer(long pos, ByteBuffer buf) throws StorageException, IgniteCheckedException {
FileWriteHandle hdl = currentHandle();
assert hdl.fileIO != null : "Writing to a closed segment.";
checkNode();
long lastLogged = U.currentTimeMillis();
long logBackoff = 2_000;
// If we were too fast, we need to wait for previous writes to complete.
while (hdl.written != pos) {
assert hdl.written < pos : "written = " + hdl.written + ", pos = " + pos; // No one can write further than we are now.
// A permutation occurred between block write operations:
// the order of acquiring the lock is not the same as the order of writes.
long now = U.currentTimeMillis();
if (now - lastLogged >= logBackoff) {
if (logBackoff < 60 * 60_000)
logBackoff *= 2;
U.warn(log, "Still waiting for a concurrent write to complete [written=" + hdl.written +
", pos=" + pos + ", lastFsyncPos=" + hdl.lastFsyncPos + ", stop=" + hdl.stop.get() +
", actualPos=" + hdl.safePosition() + ']');
lastLogged = now;
}
checkNode();
}
// Do the write.
int size = buf.remaining();
assert size > 0 : size;
try {
assert hdl.written == hdl.fileIO.position();
hdl.written += hdl.fileIO.writeFully(buf);
metrics.onWalBytesWritten(size);
assert hdl.written == hdl.fileIO.position();
}
catch (IOException e) {
StorageException se = new StorageException("Failed to write buffer.", e);
cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, se));
throw se;
}
}
}
/**
* Syncs WAL segment file.
*/
private class WalSegmentSyncer extends GridWorker {
/** Sync timeout. */
long syncTimeout;
/**
* @param igniteInstanceName Ignite instance name.
* @param log Logger.
*/
public WalSegmentSyncer(String igniteInstanceName, IgniteLogger log) {
super(igniteInstanceName, "wal-segment-syncer", log);
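// Clamp the configured timeout so the syncer never runs more often than once per 100 ms.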
syncTimeout = Math.max(IgniteSystemProperties.getLong(IGNITE_WAL_SEGMENT_SYNC_TIMEOUT,
DFLT_WAL_SEGMENT_SYNC_TIMEOUT), 100L);
}
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
while (!isCancelled()) {
sleep(syncTimeout);
try {
flush(null, true);
}
catch (IgniteCheckedException e) {
U.error(log, "Exception when flushing WAL.", e);
}
}
}
/** Shuts down the worker. */
private void shutdown() {
synchronized (this) {
U.cancel(this);
}
U.join(this, log);
}
}
/**
* Scans the provided folder for WAL segment files.
*
* @param walFilesDir Directory to scan.
* @return Found WAL file descriptors.
* @throws IgniteCheckedException If the directory cannot be listed.
*/
public static FileDescriptor[] loadFileDescriptors(@NotNull final File walFilesDir) throws IgniteCheckedException {
final File[] files = walFilesDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER);
if (files == null) {
throw new IgniteCheckedException("WAL files directory does not not denote a " +
"directory, or if an I/O error occurs: [" + walFilesDir.getAbsolutePath() + "]");
}
return scan(files);
}
}