| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.database.wal; |
| |
| import java.io.EOFException; |
| import java.io.File; |
| import java.io.FileFilter; |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.io.RandomAccessFile; |
| import java.nio.ByteBuffer; |
| import java.nio.ByteOrder; |
| import java.nio.channels.FileChannel; |
| import java.nio.file.Files; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.NavigableMap; |
| import java.util.TreeMap; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; |
| import java.util.concurrent.locks.Condition; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.LockSupport; |
| import java.util.concurrent.locks.ReentrantLock; |
| import java.util.regex.Pattern; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.IgniteSystemProperties; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.configuration.PersistentStoreConfiguration; |
| import org.apache.ignite.internal.GridKernalContext; |
| import org.apache.ignite.internal.IgniteInterruptedCheckedException; |
| import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; |
| import org.apache.ignite.internal.pagemem.wal.StorageException; |
| import org.apache.ignite.internal.pagemem.wal.WALIterator; |
| import org.apache.ignite.internal.pagemem.wal.WALPointer; |
| import org.apache.ignite.internal.pagemem.wal.record.WALRecord; |
| import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; |
| import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; |
| import org.apache.ignite.internal.processors.cache.database.wal.record.HeaderRecord; |
| import org.apache.ignite.internal.processors.cache.database.wal.serializer.RecordV1Serializer; |
| import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; |
| import org.apache.ignite.internal.util.GridUnsafe; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.G; |
| import org.apache.ignite.internal.util.typedef.internal.A; |
| import org.apache.ignite.internal.util.typedef.internal.SB; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteBiTuple; |
| import org.apache.ignite.lang.IgnitePredicate; |
| import org.jetbrains.annotations.Nullable; |
| |
| /** |
| * File WAL manager. |
| */ |
| public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter implements IgniteWriteAheadLogManager { |
    /** WAL segment file extension. */
    public static final String WAL_SEGMENT_FILE_EXT = ".wal";

    /** Zero-filled buffer used to preallocate (format) segment files chunk by chunk. */
    private static final byte[] FILL_BUF = new byte[1024 * 1024];

    /** Pattern of a WAL segment file name: 16-digit index, serializer version, ".wal" extension. */
    private static final Pattern WAL_NAME_PATTERN = Pattern.compile("\\d{16}\\.v\\d+\\.wal");

    /** Pattern of a temporary WAL segment file name (segment still being created or archived). */
    private static final Pattern WAL_TEMP_NAME_PATTERN = Pattern.compile("\\d{16}\\.v\\d+\\.wal\\.tmp");

    /** Accepts regular files matching {@link #WAL_NAME_PATTERN}. */
    private static final FileFilter WAL_SEGMENT_FILE_FILTER = new FileFilter() {
        @Override public boolean accept(File file) {
            return !file.isDirectory() && WAL_NAME_PATTERN.matcher(file.getName()).matches();
        }
    };

    /** Accepts regular files matching {@link #WAL_TEMP_NAME_PATTERN}. */
    private static final FileFilter WAL_SEGMENT_TEMP_FILE_FILTER = new FileFilter() {
        @Override public boolean accept(File file) {
            return !file.isDirectory() && WAL_TEMP_NAME_PATTERN.matcher(file.getName()).matches();
        }
    };
| |
    /** System property (env variable) for configuring DB WAL mode, see values in {@link Mode} */
    public static final String IGNITE_PDS_WAL_MODE = "IGNITE_PDS_WAL_MODE";

    /** Thread local byte buffer size, see {@link #tlbSize}. */
    public static final String IGNITE_PDS_WAL_TLB_SIZE = "IGNITE_PDS_WAL_TLB_SIZE";

    /** WAL flush frequency for {@link Mode#BACKGROUND} log mode */
    public static final String IGNITE_PDS_WAL_FLUSH_FREQ = "IGNITE_PDS_WAL_FLUSH_FREQUENCY";

    /** Fsync delay in nanoseconds, see {@link #fsyncDelayNanos}. */
    public static final String IGNITE_PDS_WAL_FSYNC_DELAY = "IGNITE_PDS_WAL_FSYNC_DELAY"; // TODO may be move to config

    /** Ignite pds wal record iterator buffer size. */
    public static final String IGNITE_PDS_WAL_RECORD_ITERATOR_BUFFER_SIZE = "IGNITE_PDS_WAL_RECORD_ITERATOR_BUFFER_SIZE";

    /** Property forcing full-page records to always be written, see {@link #alwaysWriteFullPages}. */
    public static final String IGNITE_PDS_WAL_ALWAYS_WRITE_FULL_PAGES = "IGNITE_PDS_WAL_ALWAYS_WRITE_FULL_PAGES";

    /** Delay before fsync is performed, in nanoseconds (default 1 ns). */
    private static long fsyncDelayNanos = IgniteSystemProperties.getLong(IGNITE_PDS_WAL_FSYNC_DELAY, 1);

    /** Thread local byte buffer size, see {@link #tlb} */
    public final int tlbSize = IgniteSystemProperties.getInteger(IGNITE_PDS_WAL_TLB_SIZE, 128 * 1024);

    /** WAL flush frequency (ms). Makes sense only for {@link Mode#BACKGROUND} log mode. */
    public static final int FLUSH_FREQ = IgniteSystemProperties.getInteger(IGNITE_PDS_WAL_FLUSH_FREQ, 2_000);

    /** When {@code true}, page records are always logged as full pages (see {@link #isAlwaysWriteFullPages()}). */
    private final boolean alwaysWriteFullPages =
        IgniteSystemProperties.getBoolean(IGNITE_PDS_WAL_ALWAYS_WRITE_FULL_PAGES, false);

    /** WAL segment size in bytes, taken from {@link PersistentStoreConfiguration#getWalSegmentSize()}. */
    private long maxWalSegmentSize;
| |
    /** Persistent store configuration; never {@code null} — the manager is not created without persistence. */
    private final PersistentStoreConfiguration dbCfg;

    /** Ignite node configuration, used to resolve the work directory. */
    private IgniteConfiguration igCfg;

    /** Directory holding the fixed pool of active (work) WAL segments. */
    private File walWorkDir;

    /** Directory to which filled work segments are copied by the {@link FileArchiver}. */
    private File walArchiveDir;

    /** Current log segment handle */
    private volatile FileWriteHandle currentHnd;

    /** Updater for {@link #currentHnd}, used for verify there are no concurrent update for current log segment handle */
    private static final AtomicReferenceFieldUpdater<FileWriteAheadLogManager, FileWriteHandle> currentHndUpd =
        AtomicReferenceFieldUpdater.newUpdater(FileWriteAheadLogManager.class, FileWriteHandle.class, "currentHnd");

    /**
     * Thread local byte buffer for saving serialized WAL records chain, see {@link FileWriteHandle#head}.
     * Introduced to decrease number of buffers allocation.
     * Used only for record itself is shorter than {@link #tlbSize}.
     */
    private final ThreadLocal<ByteBuffer> tlb = new ThreadLocal<ByteBuffer>() {
        @Override protected ByteBuffer initialValue() {
            // Direct buffer in native byte order to avoid extra copying on file writes.
            ByteBuffer buf = ByteBuffer.allocateDirect(tlbSize);

            buf.order(GridUnsafe.NATIVE_BYTE_ORDER);

            return buf;
        }
    };

    /** Record serializer; {@code null} until {@link #start0()} runs on a server node. */
    private RecordSerializer serializer;

    /** WAL file archiver thread, started on server nodes at cluster init */
    private volatile FileArchiver archiver;

    /** WAL operating mode, resolved from the {@link #IGNITE_PDS_WAL_MODE} system property. */
    private final Mode mode;

    /** Background flusher thread; created only in {@link Mode#BACKGROUND}. */
    private QueueFlusher flusher;

    /** Last WAL pointer logged by the current thread; used as the default target for {@link #fsync(WALPointer)}. */
    private ThreadLocal<WALPointer> lastWALPtr = new ThreadLocal<>();
| |
| /** |
| * @param ctx Kernal context. |
| */ |
| public FileWriteAheadLogManager(GridKernalContext ctx) { |
| igCfg = ctx.config(); |
| |
| PersistentStoreConfiguration dbCfg = igCfg.getPersistentStoreConfiguration(); |
| |
| assert dbCfg != null : "WAL should not be created if persistence is disabled."; |
| |
| this.dbCfg = dbCfg; |
| this.igCfg = igCfg; |
| |
| maxWalSegmentSize = dbCfg.getWalSegmentSize(); |
| |
| String modeStr = IgniteSystemProperties.getString(IGNITE_PDS_WAL_MODE); |
| mode = modeStr == null ? Mode.DEFAULT : Mode.valueOf(modeStr.trim().toUpperCase()); |
| } |
| |
    /** {@inheritDoc} */
    @Override public void start0() throws IgniteCheckedException {
        String consId = consistentId();

        // WAL is only maintained on server nodes; client nodes skip initialization entirely.
        if (!cctx.kernalContext().clientNode()) {
            A.notNullOrEmpty(consId, "consistentId");

            // Consistent ID becomes part of directory names, so escape characters illegal in file names.
            consId = U.maskForFileName(consId);

            checkWalConfiguration();
            walWorkDir = initDirectory(dbCfg.getWalStorePath(), "db/wal", consId, "write ahead log work directory");
            walArchiveDir = initDirectory(dbCfg.getWalArchivePath(), "db/wal/archive", consId,
                "write ahead log archive directory");

            serializer = new RecordV1Serializer(cctx);

            // Remove stale temp files and pre-create the first segment if the work directory is empty.
            checkOrPrepareFiles();

            archiver = new FileArchiver();

            if (mode != Mode.DEFAULT) {
                if (log.isInfoEnabled())
                    log.info("Started write-ahead log manager [mode=" + mode + ']');
            }
        }
    }
| |
| /** |
| * @throws IgniteCheckedException if WAL store path is configured and archive path isn't (or vice versa) |
| */ |
| private void checkWalConfiguration() throws IgniteCheckedException { |
| if (dbCfg.getWalStorePath() == null ^ dbCfg.getWalArchivePath() == null) { |
| throw new IgniteCheckedException("Properties should be either both specified or both null " + |
| "[walStorePath = " + dbCfg.getWalStorePath() + ", walArchivePath = " + dbCfg.getWalArchivePath() + "]"); |
| } |
| } |
| |
    /** {@inheritDoc} */
    @Override protected void onKernalStart0(boolean reconnect) throws IgniteCheckedException {
        super.onKernalStart0(reconnect);

        // The archiver thread runs only on active server nodes; clients do not keep a WAL.
        if (!cctx.kernalContext().clientNode() && cctx.kernalContext().state().active())
            archiver.start();
    }
| |
    /**
     * @return Consistent ID of the local node; used to build per-node WAL directory names.
     */
    protected String consistentId() {
        return cctx.discovery().consistentId().toString();
    }
| |
| /** {@inheritDoc} */ |
| @Override protected void stop0(boolean cancel) { |
| FileWriteHandle currentHnd = currentHandle(); |
| |
| try { |
| QueueFlusher flusher0 = flusher; |
| |
| if (flusher0 != null) { |
| flusher0.shutdown(); |
| |
| if (currentHnd != null) |
| currentHnd.flush((FileWALPointer)null); |
| } |
| |
| if (currentHnd != null) |
| currentHnd.close(false); |
| |
| if (archiver != null) |
| archiver.shutdown(); |
| } |
| catch (Exception e) { |
| U.error(log, "Failed to gracefully close WAL segment: " + currentHnd.file, e); |
| } |
| } |
| |
    /** {@inheritDoc} */
    @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException {
        if (log.isDebugEnabled())
            log.debug("Activate file write ahead log [nodeId=" + cctx.localNodeId() +
                " topVer=" + cctx.discovery().topologyVersionEx() + " ]");

        // Re-run regular startup: directories, serializer, file checks.
        start0();

        // A fresh archiver thread is created on every activation (the previous one is stopped on deactivation).
        if (!cctx.kernalContext().clientNode()) {
            archiver = new FileArchiver();

            archiver.start();
        }
    }
| |
    /** {@inheritDoc} */
    @Override public void onDeActivate(GridKernalContext kctx) {
        if (log.isDebugEnabled())
            log.debug("DeActivate file write ahead log [nodeId=" + cctx.localNodeId() +
                " topVer=" + cctx.discovery().topologyVersionEx() + " ]");

        stop0(true);

        // Drop the handle so a subsequent activation starts from a clean state (see resumeLogging assert).
        currentHnd = null;
    }
| |
    /** {@inheritDoc} */
    @Override public boolean isAlwaysWriteFullPages() {
        // Controlled by the IGNITE_PDS_WAL_ALWAYS_WRITE_FULL_PAGES system property.
        return alwaysWriteFullPages;
    }
| |
    /** {@inheritDoc} */
    @Override public boolean isFullSync() {
        // Only the DEFAULT mode guarantees fsync on commit.
        return mode == Mode.DEFAULT;
    }
| |
    /** {@inheritDoc} */
    @Override public void resumeLogging(WALPointer lastPtr) throws IgniteCheckedException {
        try {
            assert currentHnd == null;
            assert lastPtr == null || lastPtr instanceof FileWALPointer;

            FileWALPointer filePtr = (FileWALPointer)lastPtr;

            // Re-open the segment containing the last read record and continue writing right after it.
            currentHnd = restoreWriteHandle(filePtr);

            // In BACKGROUND mode a dedicated thread periodically flushes the WAL instead of per-commit fsyncs.
            if (mode == Mode.BACKGROUND) {
                flusher = new QueueFlusher(cctx.igniteInstanceName());

                flusher.start();
            }
        }
        catch (StorageException e) {
            throw new IgniteCheckedException(e);
        }
    }
| |
    /** {@inheritDoc} */
    @SuppressWarnings("TooBroadScope")
    @Override public WALPointer log(WALRecord record) throws IgniteCheckedException, StorageException {
        // Serializer is null until logging is resumed; Mode.NONE disables the WAL entirely.
        if (serializer == null || mode == Mode.NONE)
            return null;

        FileWriteHandle current = currentHandle();

        // Logging was not resumed yet.
        if (current == null)
            return null;

        // Need to calculate record size first.
        record.size(serializer.size(record));

        // Append to the current segment; when it is full, roll over to the next one and retry.
        for (; ; current = rollOver(current)) {
            WALPointer ptr = current.addRecord(record);

            if (ptr != null) {
                // Remember the pointer so a later fsync(null) from this thread covers this record.
                lastWALPtr.set(ptr);

                return ptr;
            }

            // Do not keep spinning on rollover while the node is stopping.
            if (isStopping())
                throw new IgniteCheckedException("Stopping.");
        }
    }
| |
    /** {@inheritDoc} */
    @Override public void fsync(WALPointer ptr) throws IgniteCheckedException, StorageException {
        // NONE never syncs; BACKGROUND relies on the periodic flusher instead of explicit fsync.
        if (serializer == null || mode == Mode.NONE || mode == Mode.BACKGROUND)
            return;

        FileWriteHandle cur = currentHandle();

        // WAL manager was not started (client node).
        if (cur == null)
            return;

        // Default to the last pointer logged by this thread when no explicit pointer is given.
        FileWALPointer filePtr = (FileWALPointer)(ptr == null ? lastWALPtr.get() : ptr);

        // LOG_ONLY pushes the record to the file but does not force it to disk.
        if (mode == Mode.LOG_ONLY) {
            cur.flushOrWait(filePtr);

            return;
        }

        // No need to sync if was rolled over.
        if (filePtr != null && !cur.needFsync(filePtr))
            return;

        cur.fsync(filePtr);
    }
| |
    /** {@inheritDoc} */
    @Override public WALIterator replay(WALPointer start)
        throws IgniteCheckedException, StorageException {
        assert start == null || start instanceof FileWALPointer : "Invalid start pointer: " + start;

        FileWriteHandle hnd = currentHandle();

        FileWALPointer end = null;

        // Iteration stops at the current write position - nothing beyond it has been logged yet.
        if (hnd != null)
            end = hnd.position();

        return new RecordsIterator(cctx, walWorkDir, walArchiveDir, (FileWALPointer)start, end, dbCfg, serializer,
            archiver, log, tlbSize);
    }
| |
| /** {@inheritDoc} */ |
| @Override public boolean reserve(WALPointer start) throws IgniteCheckedException { |
| assert start != null && start instanceof FileWALPointer : "Invalid start pointer: " + start; |
| |
| if (mode == Mode.NONE) |
| return false; |
| |
| FileArchiver archiver0 = archiver; |
| |
| if (archiver0 == null) |
| throw new IgniteCheckedException("Could not reserve WAL segment: archiver == null"); |
| |
| archiver0.reserve(((FileWALPointer)start).index()); |
| |
| if (!hasIndex(((FileWALPointer)start).index())) { |
| archiver0.release(((FileWALPointer)start).index()); |
| |
| return false; |
| } |
| |
| return true; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void release(WALPointer start) throws IgniteCheckedException { |
| assert start != null && start instanceof FileWALPointer : "Invalid start pointer: " + start; |
| |
| if (mode == Mode.NONE) |
| return; |
| |
| FileArchiver archiver0 = archiver; |
| |
| if (archiver0 == null) |
| throw new IgniteCheckedException("Could not release WAL segment: archiver == null"); |
| |
| archiver0.release(((FileWALPointer)start).index()); |
| } |
| |
    /**
     * Checks whether the WAL segment with the given absolute index is available for reading:
     * either it has already been copied to the archive, or it is still one of the work segments.
     *
     * @param absIdx Absolute WAL segment index.
     * @return {@code True} if the segment can be read.
     */
    private boolean hasIndex(int absIdx) {
        String name = FileDescriptor.fileName(absIdx, serializer.version());

        boolean inArchive = new File(walArchiveDir, name).exists();

        if (inArchive)
            return true;

        // Already archived but the archive file is missing - the segment is gone.
        if (absIdx <= archiver.lastArchivedIndex())
            return false;

        // Not archived yet: readable only if the writer has reached (or passed) this index.
        FileWriteHandle cur = currentHnd;

        return cur != null && cur.idx >= absIdx;
    }
| |
    /** {@inheritDoc} */
    @Override public int truncate(WALPointer ptr) {
        if (ptr == null)
            return 0;

        assert ptr instanceof FileWALPointer : ptr;

        FileWALPointer fPtr = (FileWALPointer)ptr;

        // Archived segments sorted by index; deletion proceeds in order and stops at the first reserved one.
        FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_FILTER));

        int deleted = 0;

        FileArchiver archiver0 = archiver;

        for (FileDescriptor desc : descs) {
            // Do not delete reserved segment and any segment after it.
            if (archiver0 != null && archiver0.reserved(desc.idx))
                return deleted;

            // We need to leave at least one archived segment to correctly determine the archive index.
            if (desc.idx + 1 < fPtr.index()) {
                if (!desc.file.delete())
                    U.warn(log, "Failed to remove obsolete WAL segment (make sure the process has enough rights): " +
                        desc.file.getAbsolutePath());
                else
                    deleted++;
            }
        }

        return deleted;
    }
| |
| /** {@inheritDoc} */ |
| @Override public boolean reserved(WALPointer ptr) { |
| FileWALPointer fPtr = (FileWALPointer)ptr; |
| |
| FileArchiver archiver0 = archiver; |
| |
| return archiver0 != null && archiver0.reserved(fPtr.index()); |
| } |
| |
    /**
     * Creates a directory specified by the given arguments.
     *
     * @param cfg Configured directory path, may be {@code null}.
     * @param defDir Default directory path, will be used if cfg is {@code null}.
     * @param consId Local node consistent ID.
     * @param msg File description to print out on successful initialization.
     * @return Initialized directory.
     * @throws IgniteCheckedException If the directory cannot be resolved or created.
     */
    private File initDirectory(String cfg, String defDir, String consId, String msg) throws IgniteCheckedException {
        File dir;

        if (cfg != null) {
            File workDir0 = new File(cfg);

            // Relative configured paths are resolved against the Ignite work directory.
            //TODO check path
            dir = workDir0.isAbsolute() ?
                new File(workDir0, consId) :
                new File(U.resolveWorkDirectory(igCfg.getWorkDirectory(), cfg, false), consId);
        }
        else
            dir = new File(U.resolveWorkDirectory(igCfg.getWorkDirectory(), defDir, false), consId);

        U.ensureDirectory(dir, msg, log);

        return dir;
    }
| |
    /**
     * @return Current log segment handle (volatile read; may be {@code null} before logging is resumed).
     */
    private FileWriteHandle currentHandle() {
        return currentHnd;
    }
| |
    /**
     * Switches the WAL to the next segment when the current one cannot fit a record.
     * Safe to call concurrently: only one thread performs the actual switch, the rest wait or retry.
     *
     * @param cur Handle that failed to fit the given entry.
     * @return Handle that will fit the entry.
     * @throws StorageException If an IO error occurred.
     * @throws IgniteCheckedException If failed.
     */
    private FileWriteHandle rollOver(FileWriteHandle cur) throws StorageException, IgniteCheckedException {
        FileWriteHandle hnd = currentHandle();

        // Another thread has already rolled the segment over - just use the new handle.
        if (hnd != cur)
            return hnd;

        // close(true) returns true for exactly one caller; that thread creates the next segment.
        if (hnd.close(true)) {
            FileWriteHandle next = initNextWriteHandle(cur.idx);

            boolean swapped = currentHndUpd.compareAndSet(this, hnd, next);

            assert swapped : "Concurrent updates on rollover are not allowed";

            hnd.signalNextAvailable(); // let other threads to proceed with new segment
        }
        else
            hnd.awaitNext();

        return currentHandle();
    }
| |
| /** |
| * @param lastReadPtr Last read WAL file pointer. |
| * @return Initialized file write handle. |
| * @throws IgniteCheckedException If failed to initialize WAL write handle. |
| */ |
| private FileWriteHandle restoreWriteHandle(FileWALPointer lastReadPtr) throws IgniteCheckedException { |
| int absIdx = lastReadPtr == null ? 0 : lastReadPtr.index(); |
| |
| archiver.currentWalIndex(absIdx); |
| |
| int segNo = absIdx % dbCfg.getWalSegments(); |
| |
| File curFile = new File(walWorkDir, FileDescriptor.fileName(segNo, serializer.version())); |
| |
| int offset = lastReadPtr == null ? 0 : lastReadPtr.fileOffset(); |
| int len = lastReadPtr == null ? 0 : lastReadPtr.length(); |
| |
| log.info("Resuming logging in WAL segment [file=" + curFile.getAbsolutePath() + |
| ", offset=" + offset + ']'); |
| |
| try { |
| RandomAccessFile file = new RandomAccessFile(curFile, "rw"); |
| |
| try { |
| FileWriteHandle hnd = new FileWriteHandle( |
| file, |
| absIdx, |
| cctx.igniteInstanceName(), |
| offset + len, |
| maxWalSegmentSize, |
| serializer); |
| |
| if (lastReadPtr == null) { |
| HeaderRecord header = new HeaderRecord(serializer.version()); |
| |
| header.size(serializer.size(header)); |
| |
| hnd.addRecord(header); |
| } |
| |
| return hnd; |
| } |
| catch (IgniteCheckedException | IOException e) { |
| file.close(); |
| |
| throw e; |
| } |
| } |
| catch (IOException e) { |
| throw new IgniteCheckedException("Failed to restore WAL write handle: " + curFile.getAbsolutePath(), e); |
| } |
| } |
| |
    /**
     * Creates the write handle for the next segment and fills its file header.
     * Calling this method signals we are done with the current segment and it can be archived.
     * If we don't have a prepared file yet and the archiver is busy this method blocks.
     *
     * @param curIdx current segment released by WAL writer
     * @return Initialized file handle.
     * @throws StorageException If IO exception occurred.
     * @throws IgniteCheckedException If failed.
     */
    private FileWriteHandle initNextWriteHandle(int curIdx) throws StorageException, IgniteCheckedException {
        try {
            // May block until the archiver has freed a work segment (see nextAbsoluteSegmentIndex).
            File nextFile = pollNextFile(curIdx);

            if (log.isDebugEnabled())
                log.debug("Switching to a new WAL segment: " + nextFile.getAbsolutePath());

            RandomAccessFile file = new RandomAccessFile(nextFile, "rw");

            FileWriteHandle hnd = new FileWriteHandle(
                file,
                curIdx + 1,
                cctx.igniteInstanceName(),
                0,
                maxWalSegmentSize,
                serializer);

            // Every segment starts with a header record carrying the serializer version.
            HeaderRecord header = new HeaderRecord(serializer.version());

            header.size(serializer.size(header));

            hnd.addRecord(header);

            return hnd;
        }
        catch (IOException e) {
            throw new StorageException(e);
        }
    }
| |
| /** |
| * Deletes temp files, creates and prepares new; Creates first segment if necessary |
| */ |
| private void checkOrPrepareFiles() throws IgniteCheckedException { |
| // Clean temp files. |
| { |
| File[] tmpFiles = walWorkDir.listFiles(WAL_SEGMENT_TEMP_FILE_FILTER); |
| |
| if (!F.isEmpty(tmpFiles)) { |
| for (File tmp : tmpFiles) { |
| boolean deleted = tmp.delete(); |
| |
| if (!deleted) |
| throw new IgniteCheckedException("Failed to delete previously created temp file " + |
| "(make sure Ignite process has enough rights): " + tmp.getAbsolutePath()); |
| } |
| } |
| } |
| |
| File[] allFiles = walWorkDir.listFiles(WAL_SEGMENT_FILE_FILTER); |
| |
| if (allFiles.length != 0 && allFiles.length > dbCfg.getWalSegments()) |
| throw new IgniteCheckedException("Failed to initialize wal (work directory contains " + |
| "incorrect number of segments) [cur=" + allFiles.length + ", expected=" + dbCfg.getWalSegments() + ']'); |
| |
| // Allocate the first segment synchronously. All other segments will be allocated by archiver in background. |
| if (allFiles.length == 0) { |
| File first = new File(walWorkDir, FileDescriptor.fileName(0, serializer.version())); |
| |
| createFile(first); |
| } |
| else |
| checkFiles(0, false, null); |
| } |
| |
| /** |
| * Clears the file with zeros. |
| * |
| * @param file File to format. |
| */ |
| private void formatFile(File file) throws IgniteCheckedException { |
| if (log.isDebugEnabled()) |
| log.debug("Formatting file [exists=" + file.exists() + ", file=" + file.getAbsolutePath() + ']'); |
| |
| try (RandomAccessFile rnd = new RandomAccessFile(file, "rw")) { |
| int left = dbCfg.getWalSegmentSize(); |
| |
| if (mode == Mode.DEFAULT) { |
| while (left > 0) { |
| int toWrite = Math.min(FILL_BUF.length, left); |
| |
| rnd.write(FILL_BUF, 0, toWrite); |
| |
| left -= toWrite; |
| } |
| |
| rnd.getChannel().force(false); |
| } |
| else { |
| rnd.setLength(0); |
| } |
| } |
| catch (IOException e) { |
| throw new IgniteCheckedException("Failed to format WAL segment file: " + file.getAbsolutePath(), e); |
| } |
| } |
| |
    /**
     * Creates a file atomically with temp file.
     * The ".tmp" name does not match {@link #WAL_NAME_PATTERN}, so a half-written segment
     * is never picked up as a valid one; the final rename makes it visible in one step.
     *
     * @param file File to create.
     * @throws IgniteCheckedException If failed.
     */
    private void createFile(File file) throws IgniteCheckedException {
        if (log.isDebugEnabled())
            log.debug("Creating new file [exists=" + file.exists() + ", file=" + file.getAbsolutePath() + ']');

        File tmp = new File(file.getParent(), file.getName() + ".tmp");

        formatFile(tmp);

        try {
            Files.move(tmp.toPath(), file.toPath());
        }
        catch (IOException e) {
            throw new IgniteCheckedException("Failed to move temp file to a regular WAL segment file: " +
                file.getAbsolutePath(), e);
        }

        if (log.isDebugEnabled())
            log.debug("Created WAL segment [file=" + file.getAbsolutePath() + ", size=" + file.length() + ']');
    }
| |
| /** |
| * Retrieves next available file to write WAL data, waiting |
| * if necessary for a segment to become available. |
| * |
| * @param curIdx Current absolute WAL segment index. |
| * @return File ready for use as new WAL segment. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private File pollNextFile(int curIdx) throws IgniteCheckedException { |
| // Signal to archiver that we are done with the segment and it can be archived. |
| int absNextIdx = archiver.nextAbsoluteSegmentIndex(curIdx); |
| |
| int segmentIdx = absNextIdx % dbCfg.getWalSegments(); |
| |
| return new File(walWorkDir, FileDescriptor.fileName(segmentIdx, serializer.version())); |
| } |
| |
| /** |
| * @param ver Serializer version. |
| * @return Entry serializer. |
| */ |
| private static RecordSerializer forVersion(GridCacheSharedContext cctx, int ver) throws IgniteCheckedException { |
| if (ver <= 0) |
| throw new IgniteCheckedException("Failed to create a serializer (corrupted WAL file)."); |
| |
| switch (ver) { |
| case 1: |
| return new RecordV1Serializer(cctx); |
| |
| default: |
| throw new IgniteCheckedException("Failed to create a serializer with the given version " + |
| "(forward compatibility is not supported): " + ver); |
| } |
| } |
| |
| /** |
| * @return Sorted WAL files descriptors. |
| */ |
| private static FileDescriptor[] scan(File[] allFiles) { |
| if (allFiles == null) |
| return new FileDescriptor[0]; |
| |
| FileDescriptor[] descs = new FileDescriptor[allFiles.length]; |
| |
| for (int i = 0; i < allFiles.length; i++) { |
| File f = allFiles[i]; |
| |
| descs[i] = new FileDescriptor(f); |
| } |
| |
| Arrays.sort(descs); |
| |
| return descs; |
| } |
| |
| /** |
| * File archiver operates on absolute segment indexes. For any given absolute segment index N we can calculate |
| * the work WAL segment: S(N) = N % dbCfg.walSegments. |
| * When a work segment is finished, it is given to the archiver. If the absolute index of last archived segment |
| * is denoted by A and the absolute index of next segment we want to write is denoted by W, then we can allow |
| * write to S(W) if W - A <= walSegments. <br> |
| * |
| * Monitor of current object is used for notify on: |
| * <ul> |
| * <li>exception occurred ({@link FileArchiver#cleanException}!=null)</li> |
| * <li>stopping thread ({@link FileArchiver#stopped}==true)</li> |
| * <li>current file index changed ({@link FileArchiver#curAbsWalIdx})</li> |
| * <li>last archived file index was changed ({@link FileArchiver#lastAbsArchivedIdx})</li> |
| * <li>some WAL index was removed from {@link FileArchiver#locked} map</li> |
| * </ul> |
| */ |
| private class FileArchiver extends Thread { |
        /** Exception which occurred during initial creation of files or during archiving WAL segment */
        private IgniteCheckedException cleanException;

        /**
         * Absolute current segment index WAL Manger writes to. Guarded by <code>this</code>.
         * Incremented during rollover. Also may be directly set if WAL is resuming logging after start.
         */
        private int curAbsWalIdx = -1;

        /** Last archived file index (absolute, 0-based). Guarded by <code>this</code>. */
        private int lastAbsArchivedIdx = -1;

        /** Set to {@code true} by {@link #shutdown()} to ask the thread to stop. */
        private volatile boolean stopped;

        /** Reservation counters keyed by absolute segment index; a reservation protects the segment and all after it. */
        private NavigableMap<Integer, Integer> reserved = new TreeMap<>();

        /**
         * Maps absolute segment index to locks counter. Lock on segment protects from archiving segment and may
         * come from {@link RecordsIterator} during WAL replay. Map itself is guarded by <code>this</code>.
         */
        private Map<Integer, Integer> locked = new HashMap<>();
| |
        /**
         * Creates the archiver thread and picks up the highest already-archived index from disk,
         * so archiving resumes correctly after a restart.
         */
        private FileArchiver() {
            super("wal-file-archiver%" + cctx.igniteInstanceName());

            lastAbsArchivedIdx = lastArchivedIndex();
        }
| |
        /**
         * Asks the archiver thread to stop and waits for it to finish.
         * Note: the join is deliberately outside the monitor - run() synchronizes on {@code this},
         * so joining under the lock would deadlock.
         *
         * @throws IgniteInterruptedCheckedException If failed to wait for thread shutdown.
         */
        private void shutdown() throws IgniteInterruptedCheckedException {
            synchronized (this) {
                stopped = true;

                notifyAll();
            }

            U.join(this);
        }
| |
| /** |
| * @param curAbsWalIdx Current absolute WAL segment index. |
| */ |
| private void currentWalIndex(int curAbsWalIdx) { |
| synchronized (this) { |
| this.curAbsWalIdx = curAbsWalIdx; |
| |
| notifyAll(); |
| } |
| } |
| |
        /**
         * Increments the reservation counter for the given segment (works like a reference count).
         * A reserved segment - and every segment after it - is protected from truncation.
         *
         * @param absIdx Index for reservation.
         */
        private synchronized void reserve(int absIdx) {
            Integer cur = reserved.get(absIdx);

            if (cur == null)
                reserved.put(absIdx, 1);
            else
                reserved.put(absIdx, cur + 1);
        }
| |
        /**
         * @param absIdx Index for reservation.
         * @return {@code True} if the segment is pinned by a reader ({@link #locked}) or covered by a
         *      reservation at this index or any lower one ({@code floorKey}): reserving index {@code i}
         *      protects all segments {@code >= i}.
         */
        private synchronized boolean reserved(int absIdx) {
            return locked.containsKey(absIdx) || reserved.floorKey(absIdx) != null;
        }
| |
        /**
         * Decrements the reservation counter for the given segment; removes the entry when it drops to zero.
         * Must be balanced with a prior {@link #reserve(int)}.
         *
         * @param absIdx Reserved index.
         */
        private synchronized void release(int absIdx) {
            Integer cur = reserved.get(absIdx);

            assert cur != null && cur >= 1 : cur;

            if (cur == 1)
                reserved.remove(absIdx);
            else
                reserved.put(absIdx, cur - 1);
        }
| |
        /** {@inheritDoc} */
        @Override public void run() {
            try {
                // Prepare the remaining WAL work files up front; on failure report to the starter thread.
                allocateRemainingFiles();
            }
            catch (IgniteCheckedException e) {
                synchronized (this) {
                    // Stop the thread and report to starter.
                    cleanException = e;

                    notifyAll();

                    return;
                }
            }

            try {
                // Wait until the WAL manager publishes the current segment index (see currentWalIndex).
                synchronized (this) {
                    while (curAbsWalIdx == -1 && !stopped)
                        wait();

                    // Resuming mid-log: everything before the current segment is considered archived.
                    if (curAbsWalIdx != 0 && lastAbsArchivedIdx == -1)
                        lastAbsArchivedIdx = curAbsWalIdx - 1;
                }

                while (!Thread.currentThread().isInterrupted() && !stopped) {
                    int toArchive;

                    synchronized (this) {
                        assert lastAbsArchivedIdx <= curAbsWalIdx : "lastArchived=" + lastAbsArchivedIdx +
                            ", current=" + curAbsWalIdx;

                        // Only fully written segments (strictly before the current one) may be archived.
                        while (lastAbsArchivedIdx >= curAbsWalIdx - 1 && !stopped)
                            wait();

                        toArchive = lastAbsArchivedIdx + 1;
                    }

                    if (stopped)
                        break;

                    try {
                        // The copy happens outside the monitor so WAL writers are not blocked meanwhile.
                        File workFile = archiveSegment(toArchive);

                        synchronized (this) {
                            // Wait for readers (replay iterators) pinning this work segment to release it.
                            while (locked.containsKey(toArchive) && !stopped)
                                wait();

                            // Firstly, format working file
                            if (!stopped)
                                formatFile(workFile);

                            // Then increase counter to allow rollover on clean working file
                            lastAbsArchivedIdx = toArchive;

                            notifyAll();
                        }
                    }
                    catch (IgniteCheckedException e) {
                        synchronized (this) {
                            cleanException = e;

                            notifyAll();
                        }
                    }
                }
            }
            catch (InterruptedException ignore) {
                Thread.currentThread().interrupt();
            }
        }
| |
        /**
         * Gets the absolute index of the next WAL segment available to write.
         * Blocks till there are available file to write
         *
         * @param curIdx Current absolute index that we want to increment.
         * @return Next index (curIdx+1) when it is ready to be written.
         * @throws IgniteCheckedException If failed (if interrupted or if exception occurred in the archiver thread).
         */
        private int nextAbsoluteSegmentIndex(int curIdx) throws IgniteCheckedException {
            try {
                synchronized (this) {
                    // Propagate any failure that happened inside the archiver thread.
                    if (cleanException != null)
                        throw cleanException;

                    assert curIdx == curAbsWalIdx;

                    curAbsWalIdx++;

                    // Notify archiver thread.
                    notifyAll();

                    // Back-pressure: wait until the archiver catches up enough that the target work file is clean.
                    while (curAbsWalIdx - lastAbsArchivedIdx > dbCfg.getWalSegments() && cleanException == null)
                        wait();

                    return curAbsWalIdx;
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();

                throw new IgniteInterruptedCheckedException(e);
            }
        }
| |
        /**
         * Checks whether the segment has already been archived (then it can be read from the archive)
         * and, if not, pins the corresponding work segment so the archiver will not format it while it
         * is being read. A pin taken here must be released via {@link #releaseWorkSegment(int)}.
         *
         * @param absIdx Segment absolute index.
         * @return {@code True} if can read, {@code false} if work segment
         */
        @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext")
        private boolean checkCanReadArchiveOrReserveWorkSegment(int absIdx) {
            synchronized (this) {
                // Already archived - safe to read the archive copy, no pin needed.
                if (lastAbsArchivedIdx >= absIdx)
                    return true;

                Integer cur = locked.get(absIdx);

                cur = cur == null ? 1 : cur + 1;

                locked.put(absIdx, cur);

                if (log.isDebugEnabled())
                    log.debug("Reserved work segment [absIdx=" + absIdx + ", pins=" + cur + ']');

                return false;
            }
        }
| |
| /** |
| * @param absIdx Segment absolute index. |
| */ |
| @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext") |
| private void releaseWorkSegment(int absIdx) { |
| synchronized (this) { |
| Integer cur = locked.get(absIdx); |
| |
| assert cur != null && cur > 0; |
| |
| if (cur == 1) { |
| locked.remove(absIdx); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Fully released work segment (ready to archive) [absIdx=" + absIdx + ']'); |
| } |
| else { |
| locked.put(absIdx, cur - 1); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Partially released work segment [absIdx=" + absIdx + ", pins=" + (cur - 1) + ']'); |
| } |
| |
| notifyAll(); |
| } |
| } |
| |
| /** |
| * @param absIdx Absolute index to archive. |
| */ |
| private File archiveSegment(int absIdx) throws IgniteCheckedException { |
| int segIdx = absIdx % dbCfg.getWalSegments(); |
| |
| File origFile = new File(walWorkDir, FileDescriptor.fileName(segIdx, serializer.version())); |
| |
| String name = FileDescriptor.fileName(absIdx, serializer.version()); |
| |
| File dstTmpFile = new File(walArchiveDir, name + ".tmp"); |
| |
| File dstFile = new File(walArchiveDir, name); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Starting to copy WAL segment [absIdx=" + absIdx + ", segIdx=" + segIdx + |
| ", origFile=" + origFile.getAbsolutePath() + ", dstFile=" + dstFile.getAbsolutePath() + ']'); |
| |
| try { |
| Files.deleteIfExists(dstTmpFile.toPath()); |
| |
| Files.copy(origFile.toPath(), dstTmpFile.toPath()); |
| |
| Files.move(dstTmpFile.toPath(), dstFile.toPath()); |
| |
| if (mode == Mode.DEFAULT) { |
| try (RandomAccessFile f0 = new RandomAccessFile(dstFile, "rw")) { |
| f0.getChannel().force(false); |
| } |
| } |
| } |
| catch (IOException e) { |
| throw new IgniteCheckedException("Failed to archive WAL segment [" + |
| "srcFile=" + origFile.getAbsolutePath() + |
| ", dstFile=" + dstTmpFile.getAbsolutePath() + ']', e); |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Copied file [src=" + origFile.getAbsolutePath() + |
| ", dst=" + dstFile.getAbsolutePath() + ']'); |
| |
| return origFile; |
| } |
| |
| /** |
| * Lists files in archive directory and returns the index of last archived file. |
| * |
| * @return The absolute index of last archived file. |
| */ |
| private int lastArchivedIndex() { |
| int lastIdx = -1; |
| |
| for (File file : walArchiveDir.listFiles(WAL_SEGMENT_FILE_FILTER)) { |
| try { |
| int idx = Integer.parseInt(file.getName().substring(0, 16)); |
| |
| lastIdx = Math.max(lastIdx, idx); |
| } |
| catch (NumberFormatException | IndexOutOfBoundsException ignore) { |
| |
| } |
| } |
| |
| return lastIdx; |
| } |
| |
| /** |
| * |
| */ |
| private boolean checkStop() { |
| return stopped; |
| } |
| |
| /** |
| * Background creation of all segments except first. First segment was created in main thread by |
| * {@link FileWriteAheadLogManager#checkOrPrepareFiles()} |
| */ |
| private void allocateRemainingFiles() throws IgniteCheckedException { |
| checkFiles(1, true, new IgnitePredicate<Integer>() { |
| @Override public boolean apply(Integer integer) { |
| return !checkStop(); |
| } |
| }); |
| } |
| } |
| |
| /** |
| * Validate files depending on {@link PersistentStoreConfiguration#getWalSegments()} and create if need. |
| * Check end when exit condition return false or all files are passed. |
| * |
| * @param startWith Start with. |
| * @param create Flag create file. |
| * @param p Predicate Exit condition. |
| * @throws IgniteCheckedException if validation or create file fail. |
| */ |
| private void checkFiles(int startWith, boolean create, IgnitePredicate<Integer> p) throws IgniteCheckedException { |
| for (int i = startWith; i < dbCfg.getWalSegments() && (p == null || (p != null && p.apply(i))); i++) { |
| File checkFile = new File(walWorkDir, FileDescriptor.fileName(i, serializer.version())); |
| |
| if (checkFile.exists()) { |
| if (checkFile.isDirectory()) |
| throw new IgniteCheckedException("Failed to initialize WAL log segment (a directory with " + |
| "the same name already exists): " + checkFile.getAbsolutePath()); |
| else if (checkFile.length() != dbCfg.getWalSegmentSize() && mode == Mode.DEFAULT) |
| throw new IgniteCheckedException("Failed to initialize WAL log segment " + |
| "(WAL segment size change is not supported):" + checkFile.getAbsolutePath()); |
| } |
| else if (create) |
| createFile(checkFile); |
| } |
| } |
| |
| /** |
| * WAL file descriptor. |
| */ |
| private static class FileDescriptor implements Comparable<FileDescriptor> { |
| /** */ |
| protected final File file; |
| |
| /** Absolute WAL segment file index */ |
| protected final int idx; |
| |
| /** */ |
| protected final int ver; |
| |
| /** |
| * @param file File. |
| */ |
| private FileDescriptor(File file) { |
| this(file, null); |
| } |
| |
| /** |
| * @param file File. |
| * @param idx Absolute WAL segment file index. |
| */ |
| private FileDescriptor(File file, Integer idx) { |
| this.file = file; |
| |
| String fileName = file.getName(); |
| |
| assert fileName.endsWith(WAL_SEGMENT_FILE_EXT); |
| |
| int v = fileName.lastIndexOf(".v"); |
| |
| assert v > 0; |
| |
| int begin = v + 2; |
| int end = fileName.length() - WAL_SEGMENT_FILE_EXT.length(); |
| |
| if (idx == null) |
| this.idx = Integer.parseInt(fileName.substring(0, v)); |
| else |
| this.idx = idx; |
| |
| ver = Integer.parseInt(fileName.substring(begin, end)); |
| } |
| |
| /** |
| * @param segment Segment index. |
| * @param ver Serializer version. |
| * @return Segment file name. |
| */ |
| private static String fileName(long segment, int ver) { |
| SB b = new SB(); |
| |
| String segmentStr = Long.toString(segment); |
| |
| for (int i = segmentStr.length(); i < 16; i++) |
| b.a('0'); |
| |
| b.a(segmentStr).a(".v").a(ver).a(WAL_SEGMENT_FILE_EXT); |
| |
| return b.toString(); |
| } |
| |
| /** |
| * @param segment Segment number as integer. |
| * @return Segment number as aligned string. |
| */ |
| private static String segmentNumber(long segment) { |
| SB b = new SB(); |
| |
| String segmentStr = Long.toString(segment); |
| |
| for (int i = segmentStr.length(); i < 16; i++) |
| b.a('0'); |
| |
| b.a(segmentStr); |
| |
| return b.toString(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int compareTo(FileDescriptor o) { |
| return Long.compare(idx, o.idx); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean equals(Object o) { |
| if (this == o) |
| return true; |
| |
| if (!(o instanceof FileDescriptor)) |
| return false; |
| |
| FileDescriptor that = (FileDescriptor)o; |
| |
| return idx == that.idx; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int hashCode() { |
| return idx; |
| } |
| } |
| |
| /** |
| * |
| */ |
| private abstract static class FileHandle { |
| /** */ |
| protected RandomAccessFile file; |
| |
| /** */ |
| protected FileChannel ch; |
| |
| /** */ |
| protected final int idx; |
| |
| /** */ |
| protected String gridName; |
| |
| /** |
| * @param file File. |
| * @param idx Index. |
| */ |
| private FileHandle(RandomAccessFile file, int idx, String gridName) { |
| this.file = file; |
| this.idx = idx; |
| this.gridName = gridName; |
| |
| ch = file.getChannel(); |
| |
| assert ch != null; |
| } |
| } |
| |
/**
 * Read-only handle for a WAL segment, pairing the open file with a serializer and
 * a buffered {@link FileInput} positioned within the segment.
 */
private static class ReadFileHandle extends FileHandle {
    /** Entry serializer. */
    private RecordSerializer ser;

    /** Buffered input over the file channel. */
    private FileInput in;

    /**
     * {@code True} if the segment was opened from the work directory (not the archive);
     * in that case the reader holds a reservation that must be released on close.
     * Set externally by the records iterator after construction.
     */
    private boolean workDir;

    /**
     * @param file File to read.
     * @param idx File index.
     * @param gridName Grid name.
     * @param ser Entry serializer.
     * @param in File input.
     */
    private ReadFileHandle(
        RandomAccessFile file,
        int idx,
        String gridName,
        RecordSerializer ser,
        FileInput in
    ) {
        super(file, idx, gridName);

        this.ser = ser;
        this.in = in;
    }

    /**
     * @throws IgniteCheckedException If failed to close the WAL segment file.
     */
    public void close() throws IgniteCheckedException {
        try {
            file.close();
        }
        catch (IOException e) {
            throw new IgniteCheckedException(e);
        }
    }
}
| |
| /** |
| * File handle for one log segment. |
| */ |
| @SuppressWarnings("SignalWithoutCorrespondingAwait") |
| private class FileWriteHandle extends FileHandle { |
| /** */ |
| private final RecordSerializer serializer; |
| |
| /** See {@link FileWriteAheadLogManager#maxWalSegmentSize} */ |
| private final long maxSegmentSize; |
| |
| /** |
| * Accumulated WAL records chain. |
| * This reference points to latest WAL record. |
| * When writing records chain is iterated from latest to oldest (see {@link WALRecord#previous()}) |
| * Records from chain are saved into buffer in reverse order |
| */ |
| private final AtomicReference<WALRecord> head = new AtomicReference<>(); |
| |
| /** Position in current file after the end of last written record (incremented after file channel write operation) */ |
| private volatile long written; |
| |
| /** */ |
| private volatile long lastFsyncPos; |
| |
| /** Environment failure. */ |
| private volatile Throwable envFailed; |
| |
| /** Stop guard to provide warranty that only one thread will be successful in calling {@link #close(boolean)}*/ |
| private final AtomicBoolean stop = new AtomicBoolean(false); |
| |
| /** */ |
| private final Lock lock = new ReentrantLock(); |
| |
| /** Condition activated each time writeBuffer() completes. Used to wait previously flushed write to complete */ |
| private final Condition writeComplete = lock.newCondition(); |
| |
| /** Condition for timed wait of several threads, see {@link #fsyncDelayNanos} */ |
| private final Condition fsync = lock.newCondition(); |
| |
| /** |
| * Next segment available condition. |
| * Protection from "spurious wakeup" is provided by predicate {@link #ch}=<code>null</code> |
| */ |
| private final Condition nextSegment = lock.newCondition(); |
| |
| /** |
| * @param file Mapped file to use. |
| * @param idx Absolute WAL segment file index for easy access. |
| * @param pos Position. |
| * @param maxSegmentSize Max segment size. |
| * @param serializer Serializer. |
| * @throws IOException If failed. |
| */ |
| private FileWriteHandle( |
| RandomAccessFile file, |
| int idx, |
| String gridName, |
| long pos, |
| long maxSegmentSize, |
| RecordSerializer serializer |
| ) throws IOException { |
| super(file, idx, gridName); |
| |
| assert serializer != null; |
| |
| ch.position(pos); |
| |
| this.maxSegmentSize = maxSegmentSize; |
| this.serializer = serializer; |
| |
| head.set(new FakeRecord(pos)); |
| written = pos; |
| lastFsyncPos = pos; |
| } |
| |
| /** |
| * @param rec Record to be added to record chain as new {@link #head} |
| * @return Pointer or null if roll over to next segment is required or already started by other thread. |
| * @throws StorageException If failed. |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Nullable private WALPointer addRecord(WALRecord rec) throws StorageException, IgniteCheckedException { |
| assert rec.size() > 0 || rec.getClass() == FakeRecord.class; |
| |
| boolean flushed = false; |
| |
| for (; ; ) { |
| WALRecord h = head.get(); |
| |
| long nextPos = nextPosition(h); |
| |
| // It is important that we read `stop` after `head` in this loop for correct close, |
| // because otherwise we will have a race on the last flush in close. |
| if (nextPos + rec.size() >= maxSegmentSize || stop.get()) { |
| // Can not write to this segment, need to switch to the next one. |
| return null; |
| } |
| |
| int newChainSize = h.chainSize() + rec.size(); |
| |
| if (newChainSize > tlbSize && !flushed) { |
| boolean res = h.previous() == null || flush(h); |
| |
| if (rec.size() > tlbSize) |
| flushed = res; |
| |
| continue; |
| } |
| |
| rec.chainSize(newChainSize); |
| rec.previous(h); |
| rec.position(nextPos); |
| |
| if (head.compareAndSet(h, rec)) |
| return new FileWALPointer(idx, (int)rec.position(), rec.size()); |
| } |
| } |
| |
| /** |
| * @param rec Record. |
| * @return Position for the next record. |
| */ |
| private long nextPosition(WALRecord rec) { |
| return rec.position() + rec.size(); |
| } |
| |
| /** |
| * Flush or wait for concurrent flush completion. |
| * |
| * @param ptr Pointer. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void flushOrWait(FileWALPointer ptr) throws IgniteCheckedException { |
| long expWritten; |
| |
| if (ptr != null) { |
| // If requested obsolete file index, it must be already flushed by close. |
| if (ptr.index() != idx) |
| return; |
| |
| expWritten = ptr.fileOffset(); |
| } |
| else // We read head position before the flush because otherwise we can get wrong position. |
| expWritten = head.get().position(); |
| |
| if (flush(ptr)) |
| return; |
| |
| // Spin-wait for a while before acquiring the lock. |
| for (int i = 0; i < 64; i++) { |
| if (written >= expWritten) |
| return; |
| } |
| |
| // If we did not flush ourselves then await for concurrent flush to complete. |
| lock.lock(); |
| |
| try { |
| while (written < expWritten && envFailed == null) |
| U.await(writeComplete); |
| } |
| finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** |
| * @param ptr Pointer. |
| * @return {@code true} If the flush really happened. |
| * @throws IgniteCheckedException If failed. |
| * @throws StorageException If failed. |
| */ |
| private boolean flush(FileWALPointer ptr) throws IgniteCheckedException, StorageException { |
| if (ptr == null) { // Unconditional flush. |
| for (; ; ) { |
| WALRecord expHead = head.get(); |
| |
| if (expHead.previous() == null) { |
| assert expHead instanceof FakeRecord; |
| |
| return false; |
| } |
| |
| if (flush(expHead)) |
| return true; |
| } |
| } |
| |
| assert ptr.index() == idx; |
| |
| for (; ; ) { |
| WALRecord h = head.get(); |
| |
| // If current chain begin position is greater than requested, then someone else flushed our changes. |
| if (chainBeginPosition(h) > ptr.fileOffset()) |
| return false; |
| |
| if (flush(h)) |
| return true; // We are lucky. |
| } |
| } |
| |
| /** |
| * @param h Head of the chain. |
| * @return Chain begin position. |
| */ |
| private long chainBeginPosition(WALRecord h) { |
| return h.position() + h.size() - h.chainSize(); |
| } |
| |
| /** |
| * @param expHead Expected head of chain. If head was changed, flush is not performed in this thread |
| * @throws IgniteCheckedException If failed. |
| * @throws StorageException If failed. |
| */ |
| private boolean flush(WALRecord expHead) throws StorageException, IgniteCheckedException { |
| if (expHead.previous() == null) { |
| assert expHead instanceof FakeRecord; |
| |
| return false; |
| } |
| |
| // Fail-fast before CAS. |
| checkEnvironment(); |
| |
| if (!head.compareAndSet(expHead, new FakeRecord(nextPosition(expHead)))) |
| return false; |
| |
| // At this point we grabbed the piece of WAL chain. |
| // Any failure in this code must invalidate the environment. |
| try { |
| // We can safely allow other threads to start building next chains while we are doing flush here. |
| ByteBuffer buf; |
| |
| boolean tmpBuf = false; |
| |
| if (expHead.chainSize() > tlbSize) { |
| buf = GridUnsafe.allocateBuffer(expHead.chainSize()); |
| |
| tmpBuf = true; // We need to manually release this temporary direct buffer. |
| } |
| else |
| buf = tlb.get(); |
| |
| try { |
| long pos = fillBuffer(buf, expHead); |
| |
| writeBuffer(pos, buf); |
| } |
| finally { |
| if (tmpBuf) |
| GridUnsafe.freeBuffer(buf); |
| } |
| |
| return true; |
| } |
| catch (Throwable e) { |
| invalidateEnvironment(e); |
| |
| throw e; |
| } |
| } |
| |
| /** |
| * Serializes WAL records chain to provided byte buffer |
| * @param buf Buffer, will be filled with records chain from end to beginning |
| * @param head Head of the chain to write to the buffer. |
| * @return Position in file for this buffer. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private long fillBuffer(ByteBuffer buf, WALRecord head) throws IgniteCheckedException { |
| final int limit = head.chainSize(); |
| |
| assert limit <= buf.capacity(); |
| |
| buf.rewind(); |
| buf.limit(limit); |
| |
| do { |
| buf.position(head.chainSize() - head.size()); |
| buf.limit(head.chainSize()); // Just to make sure that serializer works in bounds. |
| |
| try { |
| serializer.writeRecord(head, buf); |
| } |
| catch (RuntimeException e) { |
| throw new IllegalStateException("Failed to write record: " + head, e); |
| } |
| |
| assert !buf.hasRemaining() : "Reported record size is greater than actual: " + head; |
| |
| head = head.previous(); |
| } |
| while (head.previous() != null); |
| |
| assert head instanceof FakeRecord : head.getClass(); |
| |
| buf.rewind(); |
| buf.limit(limit); |
| |
| return head.position(); |
| } |
| |
| /** |
| * Non-blocking check if this pointer needs to be sync'ed. |
| * |
| * @param ptr WAL pointer to check. |
| * @return {@code False} if this pointer has been already sync'ed. |
| */ |
| private boolean needFsync(FileWALPointer ptr) { |
| // If index has changed, it means that the log was rolled over and already sync'ed. |
| // If requested position is smaller than last sync'ed, it also means all is good. |
| // If position is equal, then our record is the last not synced. |
| return idx == ptr.index() && lastFsyncPos <= ptr.fileOffset(); |
| } |
| |
| /** |
| * @return Pointer to the end of the last written record (probably not fsync-ed). |
| */ |
| private FileWALPointer position() { |
| lock.lock(); |
| |
| try { |
| return new FileWALPointer(idx, (int)written, 0); |
| } |
| finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** |
| * @param ptr Pointer to sync. |
| * @throws StorageException If failed. |
| */ |
| private void fsync(FileWALPointer ptr) throws StorageException, IgniteCheckedException { |
| lock.lock(); |
| |
| try { |
| if (ptr != null) { |
| if (!needFsync(ptr)) |
| return; |
| |
| if (fsyncDelayNanos > 0 && !stop.get()) { |
| // Delay fsync to collect as many updates as possible: trade latency for throughput. |
| U.await(fsync, fsyncDelayNanos, TimeUnit.NANOSECONDS); |
| |
| if (!needFsync(ptr)) |
| return; |
| } |
| } |
| |
| flushOrWait(ptr); |
| |
| if (lastFsyncPos != written) { |
| assert lastFsyncPos < written; // Fsync position must be behind. |
| |
| try { |
| ch.force(false); |
| } |
| catch (IOException e) { |
| throw new StorageException(e); |
| } |
| |
| lastFsyncPos = written; |
| |
| if (fsyncDelayNanos > 0) |
| fsync.signalAll(); |
| } |
| } |
| finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** |
| * @return {@code true} If this thread actually closed the segment. |
| * @throws IgniteCheckedException If failed. |
| * @throws StorageException If failed. |
| */ |
| private boolean close(boolean rollOver) throws IgniteCheckedException, StorageException { |
| if (stop.compareAndSet(false, true)) { |
| // Here we can be sure that no other records will be added and this fsync will be the last. |
| if (mode == Mode.DEFAULT) |
| fsync(null); |
| else |
| flushOrWait(null); |
| |
| try { |
| if (rollOver && written < (maxSegmentSize - 1)) { |
| ByteBuffer allocate = ByteBuffer.allocate(1); |
| allocate.put((byte) WALRecord.RecordType.SWITCH_SEGMENT_RECORD.ordinal()); |
| |
| ch.write(allocate, written); |
| |
| if (mode == Mode.DEFAULT) |
| ch.force(false); |
| } |
| |
| ch.close(); |
| } |
| catch (IOException e) { |
| throw new IgniteCheckedException(e); |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Closed WAL write handle [idx=" + idx + "]"); |
| |
| return true; |
| } |
| |
| return false; |
| } |
| |
| |
| /** Signals next segment available to wake up other worker threads waiting for WAL to write */ |
| private void signalNextAvailable() { |
| lock.lock(); |
| |
| try { |
| assert head.get() instanceof FakeRecord: "head"; |
| assert written == lastFsyncPos || mode != Mode.DEFAULT : |
| "fsync [written=" + written + ", lastFsync=" + lastFsyncPos + ']'; |
| |
| ch = null; |
| |
| nextSegment.signalAll(); |
| } |
| finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void awaitNext() throws IgniteCheckedException { |
| lock.lock(); |
| |
| try { |
| while (ch != null) |
| U.await(nextSegment); |
| } |
| finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** |
| * @param pos Position in file to start write from. |
| * May be checked against actual position to wait previous writes to complete |
| * @param buf Buffer to write to file |
| * @throws StorageException If failed. |
| * @throws IgniteCheckedException If failed. |
| */ |
| @SuppressWarnings("TooBroadScope") |
| private void writeBuffer(long pos, ByteBuffer buf) throws StorageException, IgniteCheckedException { |
| boolean interrupted = false; |
| |
| lock.lock(); |
| |
| try { |
| assert ch != null : "Writing to a closed segment."; |
| |
| checkEnvironment(); |
| |
| long lastLogged = U.currentTimeMillis(); |
| |
| long logBackoff = 2_000; |
| |
| // If we were too fast, need to wait previous writes to complete. |
| while (written != pos) { |
| assert written < pos : "written = " + written + ", pos = " + pos; // No one can write further than we are now. |
| |
| // Permutation occurred between blocks write operations |
| // order of acquiring lock is not the same as order of write |
| long now = U.currentTimeMillis(); |
| |
| if (now - lastLogged >= logBackoff) { |
| if (logBackoff < 60 * 60_000) |
| logBackoff *= 2; |
| |
| U.warn(log, "Still waiting for a concurrent write to complete [written=" + written + |
| ", pos=" + pos + ", lastFsyncPos=" + lastFsyncPos + ", stop=" + stop.get() + |
| ", actualPos=" + safePosition() + ']'); |
| |
| lastLogged = now; |
| } |
| |
| try { |
| writeComplete.await(2, TimeUnit.SECONDS); |
| } |
| catch (InterruptedException ignore) { |
| interrupted = true; |
| } |
| |
| checkEnvironment(); |
| } |
| |
| // Do the write. |
| int size = buf.remaining(); |
| |
| assert size > 0 : size; |
| |
| try { |
| assert written == ch.position(); |
| |
| do { |
| ch.write(buf); |
| } |
| while (buf.hasRemaining()); |
| |
| written += size; |
| |
| assert written == ch.position(); |
| } |
| catch (IOException e) { |
| invalidateEnvironmentLocked(e); |
| |
| throw new StorageException(e); |
| } |
| } |
| finally { |
| writeComplete.signalAll(); |
| |
| lock.unlock(); |
| |
| if (interrupted) |
| Thread.currentThread().interrupt(); |
| } |
| } |
| |
| /** |
| * @param e Exception to set as a cause for all further operations. |
| */ |
| private void invalidateEnvironment(Throwable e) { |
| lock.lock(); |
| |
| try { |
| invalidateEnvironmentLocked(e); |
| } |
| finally { |
| writeComplete.signalAll(); |
| |
| lock.unlock(); |
| } |
| } |
| |
| /** |
| * @param e Exception to set as a cause for all further operations. |
| */ |
| private void invalidateEnvironmentLocked(Throwable e) { |
| if (envFailed == null) { |
| envFailed = e; |
| |
| U.error(log, "IO error encountered while running WAL flush. All further operations will be failed and " + |
| "local node will be stopped.", e); |
| |
| new Thread() { |
| @Override public void run() { |
| G.stop(gridName, true); |
| } |
| }.start(); |
| } |
| } |
| |
| /** |
| * @throws StorageException If environment is no longer valid and we missed a WAL write. |
| */ |
| private void checkEnvironment() throws StorageException { |
| if (envFailed != null) |
| throw new StorageException("Failed to flush WAL buffer (environment was invalidated by a " + |
| "previous error)", envFailed); |
| } |
| |
| /** |
| * @return Safely reads current position of the file channel as String. Will return "null" if channel is null. |
| */ |
| private String safePosition() { |
| FileChannel ch = this.ch; |
| |
| if (ch == null) |
| return "null"; |
| |
| try { |
| return String.valueOf(ch.position()); |
| } |
| catch (IOException e) { |
| return "{Failed to read channel position: " + e.getMessage() + "}"; |
| } |
| } |
| } |
| |
| /** |
| * Fake record is zero-sized record, which is not stored into file. |
| * Fake record is used for storing position in file {@link WALRecord#position()}. |
| * Fake record is allowed to have no previous record. |
| */ |
| private static final class FakeRecord extends WALRecord { |
| /** |
| * @param pos Position. |
| */ |
| FakeRecord(long pos) { |
| position(pos); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public RecordType type() { |
| return null; |
| } |
| } |
| |
| /** |
| * Iterator over WAL-log. |
| */ |
| public static class RecordsIterator extends GridCloseableIteratorAdapter<IgniteBiTuple<WALPointer, WALRecord>> |
| implements WALIterator { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** Buffer size. */ |
| private final int buffSize = IgniteSystemProperties.getInteger( |
| IGNITE_PDS_WAL_RECORD_ITERATOR_BUFFER_SIZE, 64 * 1024 * 1024); |
| |
| /** */ |
| private final File walWorkDir; |
| |
| /** */ |
| private final File walArchiveDir; |
| |
| /** */ |
| private final FileArchiver archiver; |
| |
| /** */ |
| private final PersistentStoreConfiguration dbCfg; |
| |
| /** */ |
| private final RecordSerializer serializer; |
| |
| /** */ |
| private final GridCacheSharedContext cctx; |
| |
| /** */ |
| private FileWALPointer start; |
| |
| /** */ |
| private FileWALPointer end; |
| |
| /** */ |
| private IgniteBiTuple<WALPointer, WALRecord> curRec; |
| |
| /** */ |
| private int curIdx = -1; |
| |
| /** */ |
| private ReadFileHandle curHandle; |
| |
| /** */ |
| private ByteBuffer buf; |
| |
| /** */ |
| private IgniteLogger log; |
| |
| /** |
| * @param cctx Shared context. |
| * @param walWorkDir WAL work dir. |
| * @param walArchiveDir WAL archive dir. |
| * @param start Optional start pointer. |
| * @param end Optional end pointer. |
| * @param dbCfg Database configuration. |
| * @param serializer Serializer. |
| * @param archiver Archiver. |
| * @throws IgniteCheckedException If failed to initialize WAL segment. |
| */ |
| public RecordsIterator( |
| GridCacheSharedContext cctx, |
| File walWorkDir, |
| File walArchiveDir, |
| FileWALPointer start, |
| FileWALPointer end, |
| PersistentStoreConfiguration dbCfg, |
| RecordSerializer serializer, |
| FileArchiver archiver, |
| IgniteLogger log, |
| int tlbSize |
| ) throws IgniteCheckedException { |
| this.cctx = cctx; |
| this.walWorkDir = walWorkDir; |
| this.walArchiveDir = walArchiveDir; |
| this.dbCfg = dbCfg; |
| this.serializer = serializer; |
| this.archiver = archiver; |
| this.start = start; |
| this.end = end; |
| this.log = log; |
| |
| int buffSize0 = Math.min(16 * tlbSize, buffSize); |
| |
| // Do not allocate direct buffer for iterator. |
| buf = ByteBuffer.allocate(buffSize0); |
| buf.order(ByteOrder.nativeOrder()); |
| |
| init(); |
| |
| advance(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected IgniteBiTuple<WALPointer, WALRecord> onNext() throws IgniteCheckedException { |
| IgniteBiTuple<WALPointer, WALRecord> ret = curRec; |
| |
| advance(); |
| |
| return ret; |
| } |
| |
/** {@inheritDoc} */
@Override protected boolean onHasNext() throws IgniteCheckedException {
    // advance() pre-fetches into curRec; null means the WAL is exhausted.
    return curRec != null;
}
| |
| /** {@inheritDoc} */ |
| @Override protected void onClose() throws IgniteCheckedException { |
| curRec = null; |
| |
| if (curHandle != null) { |
| curHandle.close(); |
| |
| if (curHandle.workDir) |
| releaseWorkSegment(curIdx); |
| |
| curHandle = null; |
| } |
| |
| curIdx = Integer.MAX_VALUE; |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed to initialize first file handle. |
| */ |
| private void init() throws IgniteCheckedException { |
| FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_FILTER)); |
| |
| if (start != null) { |
| if (!F.isEmpty(descs)) { |
| if (descs[0].idx > start.index()) |
| throw new IgniteCheckedException("WAL history is too short " + |
| "[descs=" + Arrays.asList(descs) + ", start=" + start + ']'); |
| |
| for (FileDescriptor desc : descs) { |
| if (desc.idx == start.index()) { |
| curIdx = start.index(); |
| |
| break; |
| } |
| } |
| |
| if (curIdx == -1) { |
| int lastArchived = descs[descs.length - 1].idx; |
| |
| if (lastArchived > start.index()) |
| throw new IgniteCheckedException("WAL history is corrupted (segment is missing): " + start); |
| |
| // This pointer may be in work files because archiver did not |
| // copy the file yet, check that it is not too far forward. |
| curIdx = start.index(); |
| } |
| } |
| else { |
| // This means that whole checkpoint history fits in one segment in WAL work directory. |
| // Will start from this index right away. |
| curIdx = start.index(); |
| } |
| } |
| else |
| curIdx = !F.isEmpty(descs) ? descs[0].idx : 0; |
| |
| curIdx--; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Initialized WAL cursor [start=" + start + ", end=" + end + ", curIdx=" + curIdx + ']'); |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void advance() throws IgniteCheckedException { |
| while (true) { |
| advanceRecord(); |
| |
| if (curRec != null) |
| return; |
| else { |
| advanceSegment(); |
| |
| if (curHandle == null) |
| return; |
| } |
| } |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void advanceRecord() throws IgniteCheckedException { |
| try { |
| ReadFileHandle hnd = curHandle; |
| |
| if (hnd != null) { |
| RecordSerializer ser = hnd.ser; |
| |
| int pos = (int)hnd.in.position(); |
| |
| WALRecord rec = ser.readRecord(hnd.in); |
| |
| WALPointer ptr = new FileWALPointer(hnd.idx, pos, rec.size()); |
| |
| curRec = new IgniteBiTuple<>(ptr, rec); |
| } |
| } |
| catch (IOException | IgniteCheckedException e) { |
| // TODO: verify that wrapped IntegrityException is acceptable in this case. |
| curRec = null; |
| } |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void advanceSegment() throws IgniteCheckedException { |
| ReadFileHandle cur0 = curHandle; |
| |
| if (cur0 != null) { |
| cur0.close(); |
| |
| if (cur0.workDir) |
| releaseWorkSegment(cur0.idx); |
| |
| curHandle = null; |
| } |
| |
| // We are past the end marker. |
| if (end != null && curIdx + 1 > end.index()) |
| return; |
| |
| curIdx++; |
| |
| FileDescriptor fd; |
| |
| boolean readArchive = canReadArchiveOrReserveWork(curIdx); |
| |
| if (readArchive) { |
| fd = new FileDescriptor(new File(walArchiveDir, |
| FileDescriptor.fileName(curIdx, serializer.version()))); |
| } |
| else { |
| int workIdx = curIdx % dbCfg.getWalSegments(); |
| |
| fd = new FileDescriptor( |
| new File(walWorkDir, FileDescriptor.fileName(workIdx, serializer.version())), |
| curIdx); |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Reading next file [absIdx=" + curIdx + ", file=" + fd.file.getAbsolutePath() + ']'); |
| |
| assert fd != null; |
| |
| try { |
| curHandle = initReadHandle(fd, start != null && curIdx == start.index() ? start : null); |
| } |
| catch (FileNotFoundException e) { |
| if (readArchive) |
| throw new IgniteCheckedException("Missing WAL segment in the archive", e); |
| else |
| curHandle = null; |
| } |
| |
| if (curHandle != null) |
| curHandle.workDir = !readArchive; |
| else |
| releaseWorkSegment(curIdx); |
| |
| curRec = null; |
| } |
| |
| /** |
| * @param desc File descriptor. |
| * @param start Optional start pointer. |
| * @return Initialized file handle. |
| * @throws FileNotFoundException If segment file is missing. |
| * @throws IgniteCheckedException If initialized failed due to another unexpected error. |
| */ |
| private ReadFileHandle initReadHandle(FileDescriptor desc, FileWALPointer start) |
| throws IgniteCheckedException, FileNotFoundException { |
| try { |
| RandomAccessFile rf = new RandomAccessFile(desc.file, "r"); |
| |
| try { |
| RecordSerializer ser = forVersion(cctx, desc.ver); |
| FileInput in = new FileInput(rf.getChannel(), buf); |
| |
| WALRecord rec = ser.readRecord(in); |
| |
| if (rec == null) |
| return null; |
| |
| if (rec.type() != WALRecord.RecordType.HEADER_RECORD) |
| throw new IOException("Missing file header record: " + desc.file.getAbsoluteFile()); |
| |
| int ver = ((HeaderRecord)rec).version(); |
| |
| if (ver != ser.version()) |
| throw new IOException("Unexpected file format version: " + ver + ", " + |
| desc.file.getAbsoluteFile()); |
| |
| if (start != null && desc.idx == start.index()) |
| in.seek(start.fileOffset()); |
| |
| return new ReadFileHandle(rf, desc.idx, cctx.igniteInstanceName(), ser, in); |
| } |
| catch (SegmentEofException | EOFException ignore) { |
| try { |
| rf.close(); |
| } |
| catch (IOException ce) { |
| throw new IgniteCheckedException(ce); |
| } |
| |
| return null; |
| } |
| catch (IOException | IgniteCheckedException e) { |
| try { |
| rf.close(); |
| } |
| catch (IOException ce) { |
| e.addSuppressed(ce); |
| } |
| |
| throw e; |
| } |
| } |
| catch (FileNotFoundException e) { |
| throw e; |
| } |
| catch (IOException e) { |
| throw new IgniteCheckedException( |
| "Failed to initialize WAL segment: " + desc.file.getAbsolutePath(), e); |
| } |
| } |
| |
| /** |
| * @param absIdx Absolute index to check. |
| * @return {@code True} if we can safely read the archive, {@code false} if the segment has not been |
| * archived yet. In this case the corresponding work segment is reserved (will not be deleted until |
| * release). |
| */ |
| private boolean canReadArchiveOrReserveWork(int absIdx) { |
| return archiver != null && archiver.checkCanReadArchiveOrReserveWorkSegment(absIdx); |
| } |
| |
| /** |
| * @param absIdx Absolute index to release. |
| */ |
| private void releaseWorkSegment(int absIdx) { |
| if (archiver != null) |
| archiver.releaseWorkSegment(absIdx); |
| } |
| } |
| |
| /** Periodically flushes current file handle for {@link Mode#BACKGROUND} mode */ |
| private class QueueFlusher extends Thread { |
| /** */ |
| private volatile boolean stopped; |
| |
| /** |
| * @param gridName Grid name. |
| */ |
| private QueueFlusher(String gridName) { |
| super("wal-queue-flusher-#" + gridName); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void run() { |
| while (!stopped) { |
| long wakeup = U.currentTimeMillis() + FLUSH_FREQ; |
| |
| LockSupport.parkUntil(wakeup); |
| |
| FileWriteHandle hnd = currentHandle(); |
| |
| try { |
| hnd.flush(hnd.head.get()); |
| } |
| catch (IgniteCheckedException e) { |
| U.warn(log, "Failed to flush WAL record queue", e); |
| } |
| } |
| } |
| |
| /** Signals stop, wakes up thread and waiting until completion */ |
| private void shutdown() { |
| stopped = true; |
| |
| LockSupport.unpark(this); |
| |
| try { |
| join(); |
| } |
| catch (InterruptedException ignore) { |
| // Got interrupted while waiting for flusher to shutdown. |
| } |
| } |
| } |
| |
| /** |
| * WAL Mode. |
| */ |
| private enum Mode { |
| NONE, LOG_ONLY, |
| |
| /** |
| * Write is performed periodically, initiated by background thread, |
| * calls to {@link IgniteWriteAheadLogManager#fsync(org.apache.ignite.internal.pagemem.wal.WALPointer)} have no effect. |
| * Using this mode will decrease persistence reliability for performance |
| */ |
| BACKGROUND, DEFAULT |
| } |
| } |