| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.persistence.wal; |
| |
| import java.io.BufferedInputStream; |
| import java.io.BufferedOutputStream; |
| import java.io.EOFException; |
| import java.io.File; |
| import java.io.FileFilter; |
| import java.io.FileInputStream; |
| import java.io.FileNotFoundException; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.nio.ByteOrder; |
| import java.nio.file.DirectoryStream; |
| import java.nio.file.FileAlreadyExistsException; |
| import java.nio.file.Files; |
| import java.nio.file.Path; |
| import java.sql.Time; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.NavigableMap; |
| import java.util.Set; |
| import java.util.TreeMap; |
| import java.util.TreeSet; |
| import java.util.concurrent.PriorityBlockingQueue; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; |
| import java.util.concurrent.locks.Condition; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.LockSupport; |
| import java.util.concurrent.locks.ReentrantLock; |
| import java.util.regex.Pattern; |
| import java.util.stream.Stream; |
| import java.util.zip.ZipEntry; |
| import java.util.zip.ZipInputStream; |
| import java.util.zip.ZipOutputStream; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.IgniteSystemProperties; |
| import org.apache.ignite.configuration.DataStorageConfiguration; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.configuration.WALMode; |
| import org.apache.ignite.events.EventType; |
| import org.apache.ignite.events.WalSegmentArchivedEvent; |
| import org.apache.ignite.events.WalSegmentCompactedEvent; |
| import org.apache.ignite.failure.FailureContext; |
| import org.apache.ignite.internal.GridKernalContext; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.IgniteInterruptedCheckedException; |
| import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; |
| import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; |
| import org.apache.ignite.internal.pagemem.wal.WALIterator; |
| import org.apache.ignite.internal.pagemem.wal.WALPointer; |
| import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord; |
| import org.apache.ignite.internal.pagemem.wal.record.SwitchSegmentRecord; |
| import org.apache.ignite.internal.pagemem.wal.record.WALRecord; |
| import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; |
| import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; |
| import org.apache.ignite.internal.processors.cache.WalStateManager.WALDisableContext; |
| import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl; |
| import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; |
| import org.apache.ignite.internal.processors.cache.persistence.StorageException; |
| import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; |
| import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; |
| import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; |
| import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings; |
| import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException; |
| import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32; |
| import org.apache.ignite.internal.processors.cache.persistence.wal.io.FileInput; |
| import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentFileInputFactory; |
| import org.apache.ignite.internal.processors.cache.persistence.wal.io.SegmentIO; |
| import org.apache.ignite.internal.processors.cache.persistence.wal.io.SimpleSegmentFileInputFactory; |
| import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord; |
| import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer; |
| import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory; |
| import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl; |
| import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer; |
| import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; |
| import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; |
| import org.apache.ignite.internal.util.GridUnsafe; |
| import org.apache.ignite.internal.util.future.GridFinishedFuture; |
| import org.apache.ignite.internal.util.future.GridFutureAdapter; |
| import org.apache.ignite.internal.util.typedef.CI1; |
| import org.apache.ignite.internal.util.typedef.CIX1; |
| import org.apache.ignite.internal.util.typedef.CO; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.X; |
| import org.apache.ignite.internal.util.typedef.internal.S; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.internal.util.worker.GridWorker; |
| import org.apache.ignite.lang.IgniteBiTuple; |
| import org.apache.ignite.lang.IgniteInClosure; |
| import org.apache.ignite.lang.IgnitePredicate; |
| import org.apache.ignite.lang.IgniteUuid; |
| import org.apache.ignite.thread.IgniteThread; |
| import org.jetbrains.annotations.NotNull; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static java.nio.file.StandardOpenOption.CREATE; |
| import static java.nio.file.StandardOpenOption.READ; |
| import static java.nio.file.StandardOpenOption.WRITE; |
| import static org.apache.ignite.IgniteSystemProperties.IGNITE_CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE; |
| import static org.apache.ignite.IgniteSystemProperties.IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE; |
| import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SERIALIZER_VERSION; |
| import static org.apache.ignite.events.EventType.EVT_WAL_SEGMENT_COMPACTED; |
| import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR; |
| import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION; |
| import static org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer.readSegmentHeader; |
| |
| /** |
| * File WAL manager used for {@link WALMode#FSYNC} mode. |
| */ |
| public class FsyncModeFileWriteAheadLogManager extends GridCacheSharedManagerAdapter implements IgniteWriteAheadLogManager { |
| /** */ |
| public static final FileDescriptor[] EMPTY_DESCRIPTORS = new FileDescriptor[0]; |
| |
| /** */ |
| private static final byte[] FILL_BUF = new byte[1024 * 1024]; |
| |
| /** Pattern for segment file names */ |
| private static final Pattern WAL_NAME_PATTERN = Pattern.compile("\\d{16}\\.wal"); |
| |
| /** */ |
| private static final Pattern WAL_TEMP_NAME_PATTERN = Pattern.compile("\\d{16}\\.wal\\.tmp"); |
| |
| /** WAL segment file filter, see {@link #WAL_NAME_PATTERN} */ |
| public static final FileFilter WAL_SEGMENT_FILE_FILTER = new FileFilter() { |
| @Override public boolean accept(File file) { |
| return !file.isDirectory() && WAL_NAME_PATTERN.matcher(file.getName()).matches(); |
| } |
| }; |
| |
| /** */ |
| private static final FileFilter WAL_SEGMENT_TEMP_FILE_FILTER = new FileFilter() { |
| @Override public boolean accept(File file) { |
| return !file.isDirectory() && WAL_TEMP_NAME_PATTERN.matcher(file.getName()).matches(); |
| } |
| }; |
| |
| /** */ |
| private static final Pattern WAL_SEGMENT_FILE_COMPACTED_PATTERN = Pattern.compile("\\d{16}\\.wal\\.zip"); |
| |
| /** WAL segment file filter, see {@link #WAL_NAME_PATTERN} */ |
| public static final FileFilter WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER = new FileFilter() { |
| @Override public boolean accept(File file) { |
| return !file.isDirectory() && (WAL_NAME_PATTERN.matcher(file.getName()).matches() || |
| WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches()); |
| } |
| }; |
| |
| /** */ |
| private static final Pattern WAL_SEGMENT_TEMP_FILE_COMPACTED_PATTERN = Pattern.compile("\\d{16}\\.wal\\.zip\\.tmp"); |
| |
| /** */ |
| private static final FileFilter WAL_SEGMENT_FILE_COMPACTED_FILTER = new FileFilter() { |
| @Override public boolean accept(File file) { |
| return !file.isDirectory() && WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches(); |
| } |
| }; |
| |
| /** */ |
| private static final FileFilter WAL_SEGMENT_TEMP_FILE_COMPACTED_FILTER = new FileFilter() { |
| @Override public boolean accept(File file) { |
| return !file.isDirectory() && WAL_SEGMENT_TEMP_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches(); |
| } |
| }; |
| |
| /** Latest serializer version to use. */ |
| private static final int LATEST_SERIALIZER_VERSION = 2; |
| |
| /** |
| * Percentage of the archive size used as the checkpoint trigger. It is needed to calculate the maximum size of WAL |
| * after the last checkpoint: a checkpoint should be triggered when the size of WAL after the last checkpoint exceeds |
| * maxWalArchiveSize * thisValue. |
| */ |
| private static final double CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE = |
| IgniteSystemProperties.getDouble(IGNITE_CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE, 0.25); |
| |
| /** |
| * Percentage of the WAL archive size used to calculate the threshold at which removal of old archive segments should start. |
| */ |
| private static final double THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE = |
| IgniteSystemProperties.getDouble(IGNITE_THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE, 0.5); |
| |
| /** */ |
| private final boolean alwaysWriteFullPages; |
| |
| /** WAL segment size in bytes */ |
| private final long maxWalSegmentSize; |
| |
| /** |
| * Maximum number of segments allowed without a checkpoint. If more segments accumulate, a checkpoint should be triggered. |
| * This is a simple way to estimate the WAL size since the last checkpoint instead of calculating it precisely. |
| */ |
| private long maxSegCountWithoutCheckpoint; |
| |
| /** Size of the WAL archive at which removal of old archive segments should start. */ |
| private final long allowedThresholdWalArchiveSize; |
| |
| /** */ |
| private final WALMode mode; |
| |
| /** Thread local byte buffer size, see {@link #tlb} */ |
| private final int tlbSize; |
| |
| /** WAL flush frequency. Makes sense only for {@link WALMode#BACKGROUND} log WALMode. */ |
| private final long flushFreq; |
| |
| /** Fsync delay. */ |
| private final long fsyncDelay; |
| |
| /** */ |
| private final DataStorageConfiguration dsCfg; |
| |
| /** Events service */ |
| private final GridEventStorageManager evt; |
| |
| /** */ |
| private IgniteConfiguration igCfg; |
| |
| /** Persistence metrics tracker. */ |
| private DataStorageMetricsImpl metrics; |
| |
| /** */ |
| private File walWorkDir; |
| |
| /** WAL archive directory (including consistent ID as subfolder) */ |
| private File walArchiveDir; |
| |
| /** Serializer of the latest version, used to read the header record and to write records. */ |
| private RecordSerializer serializer; |
| |
| /** Serializer version to use. */ |
| private final int serializerVersion = |
| IgniteSystemProperties.getInteger(IGNITE_WAL_SERIALIZER_VERSION, LATEST_SERIALIZER_VERSION); |
| |
| /** Latest segment cleared by {@link #truncate(WALPointer, WALPointer)}. */ |
| private volatile long lastTruncatedArchiveIdx = -1L; |
| |
| /** Factory to provide I/O interfaces for read/write operations with files */ |
| private volatile FileIOFactory ioFactory; |
| |
| /** Factory to provide I/O interfaces for reading primitives from files. */ |
| private final SegmentFileInputFactory segmentFileInputFactory; |
| |
| /** Updater for {@link #currentHnd}, used to verify that there are no concurrent updates of the current log segment handle. */ |
| private static final AtomicReferenceFieldUpdater<FsyncModeFileWriteAheadLogManager, FileWriteHandle> currentHndUpd = |
| AtomicReferenceFieldUpdater.newUpdater(FsyncModeFileWriteAheadLogManager.class, FileWriteHandle.class, "currentHnd"); |
| |
| /** |
| * Thread local byte buffer for saving serialized WAL records chain, see {@link FileWriteHandle#head}. |
| * Introduced to decrease number of buffers allocation. |
| * Used only when the record itself is shorter than {@link #tlbSize}. |
| */ |
| private final ThreadLocal<ByteBuffer> tlb = new ThreadLocal<ByteBuffer>() { |
| @Override protected ByteBuffer initialValue() { |
| ByteBuffer buf = ByteBuffer.allocateDirect(tlbSize); |
| |
| buf.order(GridUnsafe.NATIVE_BYTE_ORDER); |
| |
| return buf; |
| } |
| }; |
| |
| /** */ |
| private volatile FileArchiver archiver; |
| |
| /** Compressor. */ |
| private volatile FileCompressor compressor; |
| |
| /** Decompressor. */ |
| private volatile FileDecompressor decompressor; |
| |
| /** */ |
| private final ThreadLocal<WALPointer> lastWALPtr = new ThreadLocal<>(); |
| |
| /** Current log segment handle */ |
| private volatile FileWriteHandle currentHnd; |
| |
| /** */ |
| private volatile WALDisableContext walDisableContext; |
| |
| /** |
| * A positive (non-zero) value indicates that a WAL segment can be archived even if it is not complete.<br> |
| * See {@link DataStorageConfiguration#setWalAutoArchiveAfterInactivity(long)}<br> |
| */ |
| private final long walAutoArchiveAfterInactivity; |
| |
| /** |
| * Container with the timestamp of the last logged WAL record.<br> |
| * A zero value means no records were logged to the current segment, so possible archiving is skipped in this case.<br> |
| * The value is filled only when {@link #walAutoArchiveAfterInactivity} > 0.<br> |
| */ |
| private AtomicLong lastRecordLoggedMs = new AtomicLong(); |
| |
| /** |
| * Cancelable task for {@link WALMode#BACKGROUND}; should be cancelled at shutdown. |
| * Null for non-background modes. |
| */ |
| @Nullable private volatile GridTimeoutProcessor.CancelableTask backgroundFlushSchedule; |
| |
| /** |
| * Reference to the last added timeout object that checks whether the next auto-archive is required. |
| * Null if auto-archiving is not enabled. |
| * Should be cancelled at shutdown. |
| */ |
| @Nullable private volatile GridTimeoutObject nextAutoArchiveTimeoutObj; |
| |
| /** |
| * @param ctx Kernal context. |
| */ |
| public FsyncModeFileWriteAheadLogManager(@NotNull final GridKernalContext ctx) { |
| igCfg = ctx.config(); |
| |
| DataStorageConfiguration dsCfg = igCfg.getDataStorageConfiguration(); |
| |
| assert dsCfg != null; |
| |
| this.dsCfg = dsCfg; |
| |
| maxWalSegmentSize = dsCfg.getWalSegmentSize(); |
| mode = dsCfg.getWalMode(); |
| tlbSize = dsCfg.getWalThreadLocalBufferSize(); |
| flushFreq = dsCfg.getWalFlushFrequency(); |
| fsyncDelay = dsCfg.getWalFsyncDelayNanos(); |
| alwaysWriteFullPages = dsCfg.isAlwaysWriteFullPages(); |
| ioFactory = dsCfg.getFileIOFactory(); |
| segmentFileInputFactory = new SimpleSegmentFileInputFactory(); |
| walAutoArchiveAfterInactivity = dsCfg.getWalAutoArchiveAfterInactivity(); |
| evt = ctx.event(); |
| |
| allowedThresholdWalArchiveSize = (long)(dsCfg.getMaxWalArchiveSize() * THRESHOLD_WAL_ARCHIVE_SIZE_PERCENTAGE); |
| |
| assert mode == WALMode.FSYNC : dsCfg; |
| } |
| |
| /** |
| * For test purposes only. |
| * |
| * @param ioFactory IO factory. |
| */ |
| public void setFileIOFactory(FileIOFactory ioFactory) { |
| this.ioFactory = ioFactory; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void start0() throws IgniteCheckedException { |
| if (!cctx.kernalContext().clientNode()) { |
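| // Estimate how many segments may be written after the last checkpoint before a checkpoint is forced: |
| // the adjusted WAL history size, scaled by the checkpoint trigger percentage, expressed in whole segments. |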
| maxSegCountWithoutCheckpoint = |
| (long)((U.adjustedWalHistorySize(dsCfg, log) * CHECKPOINT_TRIGGER_ARCHIVE_SIZE_PERCENTAGE) |
| / dsCfg.getWalSegmentSize()); |
| |
| final PdsFolderSettings resolveFolders = cctx.kernalContext().pdsFolderResolver().resolveFolders(); |
| |
| checkWalConfiguration(); |
| |
| final File walWorkDir0 = walWorkDir = initDirectory( |
| dsCfg.getWalPath(), |
| DataStorageConfiguration.DFLT_WAL_PATH, |
| resolveFolders.folderName(), |
| "write ahead log work directory" |
| ); |
| |
| final File walArchiveDir0 = walArchiveDir = initDirectory( |
| dsCfg.getWalArchivePath(), |
| DataStorageConfiguration.DFLT_WAL_ARCHIVE_PATH, |
| resolveFolders.folderName(), |
| "write ahead log archive directory" |
| ); |
| |
| serializer = new RecordSerializerFactoryImpl(cctx).createSerializer(serializerVersion); |
| |
| GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)cctx.database(); |
| |
| metrics = dbMgr.persistentStoreMetricsImpl(); |
| |
| checkOrPrepareFiles(); |
| |
| metrics.setWalSizeProvider(new CO<Long>() { |
| @Override public Long apply() { |
| long size = 0; |
| |
| for (File f : walWorkDir0.listFiles()) |
| size += f.length(); |
| |
| for (File f : walArchiveDir0.listFiles()) |
| size += f.length(); |
| |
| return size; |
| } |
| }); |
| |
| IgniteBiTuple<Long, Long> tup = scanMinMaxArchiveIndices(); |
| |
| lastTruncatedArchiveIdx = tup == null ? -1 : tup.get1() - 1; |
| |
| archiver = isArchiverEnabled() ? new FileArchiver(tup == null ? -1 : tup.get2(), log) : null; |
| |
| if (archiver != null && dsCfg.isWalCompactionEnabled()) { |
| compressor = new FileCompressor(); |
| |
| if (decompressor == null) { // Preventing of two file-decompressor thread instantiations. |
| decompressor = new FileDecompressor(log); |
| |
| new IgniteThread(decompressor).start(); |
| } |
| } |
| |
| walDisableContext = cctx.walState().walDisableContext(); |
| |
| if (mode != WALMode.NONE) { |
| if (log.isInfoEnabled()) |
| log.info("Started write-ahead log manager [mode=" + mode + ']'); |
| } |
| else |
| U.quietAndWarn(log, "Started write-ahead log manager in NONE mode, persisted data may be lost in " + |
| "a case of unexpected node failure. Make sure to deactivate the cluster before shutdown."); |
| } |
| } |
| |
| /** |
| * @throws IgniteCheckedException if WAL store path is configured and archive path isn't (or vice versa) |
| */ |
| private void checkWalConfiguration() throws IgniteCheckedException { |
| if (dsCfg.getWalPath() == null ^ dsCfg.getWalArchivePath() == null) { |
| throw new IgniteCheckedException( |
| "Properties should be either both specified or both null " + |
| "[walStorePath = " + dsCfg.getWalPath() + |
| ", walArchivePath = " + dsCfg.getWalArchivePath() + "]" |
| ); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void stop0(boolean cancel) { |
| final GridTimeoutProcessor.CancelableTask schedule = backgroundFlushSchedule; |
| |
| if (schedule != null) |
| schedule.close(); |
| |
| final GridTimeoutObject timeoutObj = nextAutoArchiveTimeoutObj; |
| |
| if (timeoutObj != null) |
| cctx.time().removeTimeoutObject(timeoutObj); |
| |
| final FileWriteHandle currHnd = currentHandle(); |
| |
| try { |
| if (mode == WALMode.BACKGROUND) { |
| if (currHnd != null) |
| currHnd.flush((FileWALPointer)null, true); |
| } |
| |
| if (currHnd != null) |
| currHnd.close(false); |
| |
| if (archiver != null) |
| archiver.shutdown(); |
| |
| if (compressor != null) |
| compressor.shutdown(); |
| |
| if (decompressor != null) |
| decompressor.shutdown(); |
| } |
| catch (Exception e) { |
| U.error(log, "Failed to gracefully close WAL segment: " + currentHnd.fileIO, e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException { |
| if (log.isDebugEnabled()) |
| log.debug("Activated file write ahead log manager [nodeId=" + cctx.localNodeId() + |
| " topVer=" + cctx.discovery().topologyVersionEx() + " ]"); |
| |
| start0(); |
| |
| if (!cctx.kernalContext().clientNode()) { |
| if (isArchiverEnabled()) { |
| assert archiver != null; |
| |
| new IgniteThread(archiver).start(); |
| } |
| |
| if (compressor != null) |
| compressor.start(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onDeActivate(GridKernalContext kctx) { |
| if (log.isDebugEnabled()) |
| log.debug("DeActivate file write ahead log [nodeId=" + cctx.localNodeId() + |
| " topVer=" + cctx.discovery().topologyVersionEx() + " ]"); |
| |
| stop0(true); |
| |
| currentHnd = null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean isAlwaysWriteFullPages() { |
| return alwaysWriteFullPages; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean isFullSync() { |
| return mode == WALMode.FSYNC; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void resumeLogging(WALPointer lastPtr) throws IgniteCheckedException { |
| assert currentHnd == null; |
| assert lastPtr == null || lastPtr instanceof FileWALPointer; |
| |
| FileWALPointer filePtr = (FileWALPointer)lastPtr; |
| |
| currentHnd = restoreWriteHandle(filePtr); |
| |
| if (currentHnd.serializer.version() != serializer.version()) { |
| if (log.isInfoEnabled()) |
| log.info("Record serializer version change detected, will start logging with a new WAL record " + |
| "serializer to a new WAL segment [curFile=" + currentHnd + ", newVer=" + serializer.version() + |
| ", oldVer=" + currentHnd.serializer.version() + ']'); |
| |
| rollOver(currentHnd); |
| } |
| |
| if (mode == WALMode.BACKGROUND) { |
| backgroundFlushSchedule = cctx.time().schedule(new Runnable() { |
| @Override public void run() { |
| doFlush(); |
| } |
| }, flushFreq, flushFreq); |
| } |
| |
| if (walAutoArchiveAfterInactivity > 0) |
| scheduleNextInactivityPeriodElapsedCheck(); |
| } |
| |
| /** |
| * Schedules the next check of whether the inactivity period has elapsed, based on the timestamp of the last logged record. |
| * On timeout the method checks the inactivity period and schedules the next check. |
| */ |
| private void scheduleNextInactivityPeriodElapsedCheck() { |
| final long lastRecMs = lastRecordLoggedMs.get(); |
| final long nextPossibleAutoArchive = (lastRecMs <= 0 ? U.currentTimeMillis() : lastRecMs) + walAutoArchiveAfterInactivity; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Schedule WAL rollover check at " + new Time(nextPossibleAutoArchive).toString()); |
| |
| nextAutoArchiveTimeoutObj = new GridTimeoutObject() { |
| private final IgniteUuid id = IgniteUuid.randomUuid(); |
| |
| @Override public IgniteUuid timeoutId() { |
| return id; |
| } |
| |
| @Override public long endTime() { |
| return nextPossibleAutoArchive; |
| } |
| |
| @Override public void onTimeout() { |
| if (log.isDebugEnabled()) |
| log.debug("Checking if WAL rollover required (" + new Time(U.currentTimeMillis()).toString() + ")"); |
| |
| checkWalRolloverRequiredDuringInactivityPeriod(); |
| |
| scheduleNextInactivityPeriodElapsedCheck(); |
| } |
| }; |
| cctx.time().addTimeoutObject(nextAutoArchiveTimeoutObj); |
| } |
| |
| /** |
| * The archiver may not be created; in that case all files are written to the WAL work folder using the absolute segment index. |
| * |
| * @return Flag indicating whether the archiver is enabled. |
| */ |
| private boolean isArchiverEnabled() { |
| if (walArchiveDir != null && walWorkDir != null) |
| return !walArchiveDir.equals(walWorkDir); |
| |
| return !new File(dsCfg.getWalArchivePath()).equals(new File(dsCfg.getWalPath())); |
| } |
| |
| /** |
| * Collects WAL segment files from the low pointer (inclusive) to the high pointer (exclusive) and reserves the low pointer. |
| * |
| * @param low Low bound. |
| * @param high High bound. |
| */ |
| public Collection<File> getAndReserveWalFiles(FileWALPointer low, FileWALPointer high) throws IgniteCheckedException { |
| final long awaitIdx = high.index() - 1; |
| |
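| // Wait until the archiver has copied every requested segment (up to high - 1) into the archive. |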
| while (archiver != null && archiver.lastArchivedAbsoluteIndex() < awaitIdx) |
| LockSupport.parkNanos(Thread.currentThread(), 1_000_000); |
| |
| if (!reserve(low)) |
| throw new IgniteCheckedException("WAL archive segment has been deleted [idx=" + low.index() + "]"); |
| |
| List<File> res = new ArrayList<>(); |
| |
| for (long i = low.index(); i < high.index(); i++) { |
| String segmentName = FileDescriptor.fileName(i); |
| |
| File file = new File(walArchiveDir, segmentName); |
| File fileZip = new File(walArchiveDir, segmentName + FilePageStoreManager.ZIP_SUFFIX); |
| |
| if (file.exists()) |
| res.add(file); |
| else if (fileZip.exists()) |
| res.add(fileZip); |
| else { |
| if (log.isInfoEnabled()) { |
| log.info("Segment not found: " + file.getName() + "/" + fileZip.getName()); |
| |
| log.info("Stopped iteration on idx: " + i); |
| } |
| |
| break; |
| } |
| } |
| |
| return res; |
| } |
| |
| /** {@inheritDoc}*/ |
| @Override public int serializerVersion() { |
| return serializerVersion; |
| } |
| |
| /** |
| * Checks whether a significant period of inactivity has elapsed. |
| * If WAL auto-archiving is enabled ({@link #walAutoArchiveAfterInactivity} > 0), this method triggers |
| * a rollover by timeout.<br> |
| */ |
| private void checkWalRolloverRequiredDuringInactivityPeriod() { |
| if (walAutoArchiveAfterInactivity <= 0) |
| return; // feature not configured, nothing to do |
| |
| final long lastRecMs = lastRecordLoggedMs.get(); |
| |
| if (lastRecMs == 0) |
| return; // No records were logged to the current segment; inactivity is not considered in this case. |
| |
| final long elapsedMs = U.currentTimeMillis() - lastRecMs; |
| |
| if (elapsedMs <= walAutoArchiveAfterInactivity) |
| return; // not enough time elapsed since last write |
| |
| if (!lastRecordLoggedMs.compareAndSet(lastRecMs, 0)) |
| return; // record write occurred concurrently |
| |
| final FileWriteHandle handle = currentHandle(); |
| |
| try { |
| rollOver(handle); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Unable to perform segment rollover: " + e.getMessage(), e); |
| |
| cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, e)); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("TooBroadScope") |
| @Override public WALPointer log(WALRecord record) throws IgniteCheckedException, StorageException { |
| if (serializer == null || mode == WALMode.NONE) |
| return null; |
| |
| FileWriteHandle currWrHandle = currentHandle(); |
| |
| WALDisableContext isDisable = walDisableContext; |
| |
| // Logging was not resumed yet. |
| if (currWrHandle == null || (isDisable != null && isDisable.check())) |
| return null; |
| |
| // Need to calculate record size first. |
| record.size(serializer.size(record)); |
| |
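| // Try to append the record to the current segment; if it does not fit, roll over to a new segment and retry. |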
| while (true) { |
| if (record.rollOver()) { |
| assert cctx.database().checkpointLockIsHeldByThread(); |
| |
| currWrHandle = rollOver(currWrHandle); |
| } |
| |
| WALPointer ptr = currWrHandle.addRecord(record); |
| |
| if (ptr != null) { |
| metrics.onWalRecordLogged(); |
| |
| lastWALPtr.set(ptr); |
| |
| if (walAutoArchiveAfterInactivity > 0) |
| lastRecordLoggedMs.set(U.currentTimeMillis()); |
| |
| return ptr; |
| } |
| else |
| currWrHandle = rollOver(currWrHandle); |
| |
| checkNode(); |
| |
| if (isStopping()) |
| throw new IgniteCheckedException("Stopping."); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void flush(WALPointer ptr, boolean explicitFsync) throws IgniteCheckedException, StorageException { |
| if (serializer == null || mode == WALMode.NONE) |
| return; |
| |
| FileWriteHandle cur = currentHandle(); |
| |
| // WAL manager was not started (client node). |
| if (cur == null) |
| return; |
| |
| FileWALPointer filePtr = (FileWALPointer)(ptr == null ? lastWALPtr.get() : ptr); |
| |
| // No need to sync if was rolled over. |
| if (filePtr != null && !cur.needFsync(filePtr)) |
| return; |
| |
| cur.fsync(filePtr, false); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public WALIterator replay(WALPointer start) |
| throws IgniteCheckedException, StorageException { |
| assert start == null || start instanceof FileWALPointer : "Invalid start pointer: " + start; |
| |
| FileWriteHandle hnd = currentHandle(); |
| |
| FileWALPointer end = null; |
| |
| if (hnd != null) |
| end = hnd.position(); |
| |
| return new RecordsIterator( |
| cctx, |
| walWorkDir, |
| walArchiveDir, |
| (FileWALPointer)start, |
| end, |
| dsCfg, |
| new RecordSerializerFactoryImpl(cctx), |
| ioFactory, |
| archiver, |
| decompressor, |
| log, |
| segmentFileInputFactory |
| ); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean reserve(WALPointer start) throws IgniteCheckedException { |
| assert start != null && start instanceof FileWALPointer : "Invalid start pointer: " + start; |
| |
| if (mode == WALMode.NONE) |
| return false; |
| |
| FileArchiver archiver0 = archiver; |
| |
| if (archiver0 == null) |
| throw new IgniteCheckedException("Could not reserve WAL segment: archiver == null"); |
| |
| archiver0.reserve(((FileWALPointer)start).index()); |
| |
| if (!hasIndex(((FileWALPointer)start).index())) { |
| archiver0.release(((FileWALPointer)start).index()); |
| |
| return false; |
| } |
| |
| return true; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void release(WALPointer start) throws IgniteCheckedException { |
| assert start != null && start instanceof FileWALPointer : "Invalid start pointer: " + start; |
| |
| if (mode == WALMode.NONE) |
| return; |
| |
| FileArchiver archiver0 = archiver; |
| |
| if (archiver0 == null) |
| throw new IgniteCheckedException("Could not release WAL segment: archiver == null"); |
| |
| archiver0.release(((FileWALPointer)start).index()); |
| } |
| |
| /** |
| * @param absIdx Absolute index to check. |
| * @return {@code true} if has this index. |
| */ |
| private boolean hasIndex(long absIdx) { |
| String segmentName = FileDescriptor.fileName(absIdx); |
| |
| String zipSegmentName = FileDescriptor.fileName(absIdx) + FilePageStoreManager.ZIP_SUFFIX; |
| |
| boolean inArchive = new File(walArchiveDir, segmentName).exists() || |
| new File(walArchiveDir, zipSegmentName).exists(); |
| |
| if (inArchive) |
| return true; |
| |
| if (absIdx <= lastArchivedIndex()) |
| return false; |
| |
| FileWriteHandle cur = currentHnd; |
| |
| return cur != null && cur.getSegmentId() >= absIdx; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int truncate(WALPointer low, WALPointer high) { |
| if (high == null) |
| return 0; |
| |
| assert high instanceof FileWALPointer : high; |
| |
| // File pointer bound: older entries will be deleted from archive |
| FileWALPointer lowPtr = (FileWALPointer)low; |
| FileWALPointer highPtr = (FileWALPointer)high; |
| |
| FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)); |
| |
| int deleted = 0; |
| |
| FileArchiver archiver0 = archiver; |
| |
| for (FileDescriptor desc : descs) { |
| if (lowPtr != null && desc.idx < lowPtr.index()) |
| continue; |
| |
| // Do not delete reserved or locked segment and any segment after it. |
| if (archiver0 != null && archiver0.reserved(desc.idx)) |
| return deleted; |
| |
| long lastArchived = archiver0 != null ? archiver0.lastArchivedAbsoluteIndex() : lastArchivedIndex(); |
| |
| // We need to leave at least one archived segment to correctly determine the archive index. |
| if (desc.idx < highPtr.index() && desc.idx < lastArchived) { |
| if (!desc.file.delete()) |
| U.warn(log, "Failed to remove obsolete WAL segment (make sure the process has enough rights): " + |
| desc.file.getAbsolutePath()); |
| else |
| deleted++; |
| |
| // Bump up the oldest archive segment index. |
| if (lastTruncatedArchiveIdx < desc.idx) |
| lastTruncatedArchiveIdx = desc.idx; |
| } |
| } |
| |
| return deleted; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void notchLastCheckpointPtr(WALPointer ptr) { |
| if (compressor != null) |
| compressor.keepUncompressedIdxFrom(((FileWALPointer)ptr).index()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int walArchiveSegments() { |
| if (archiver == null) |
| return 0; |
| |
| long lastTruncated = lastTruncatedArchiveIdx; |
| |
| long lastArchived = archiver.lastArchivedAbsoluteIndex(); |
| |
| if (lastArchived == -1) |
| return 0; |
| |
| int res = (int)(lastArchived - lastTruncated); |
| |
| return res >= 0 ? res : 0; |
| } |
| |
| /** |
| * @return Descriptors of files in the WAL archive directory. |
| */ |
| private FileDescriptor[] walArchiveFiles() { |
| return scan(walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long maxArchivedSegmentToDelete() { |
| // When maxWalArchiveSize == Long.MAX_VALUE, deleting files is not permitted. |
| if (dsCfg.getMaxWalArchiveSize() == Long.MAX_VALUE) |
| return -1; |
| |
| FileDescriptor[] archivedFiles = walArchiveFiles(); |
| |
| Long totalArchiveSize = Stream.of(archivedFiles) |
| .map(desc -> desc.file().length()) |
| .reduce(0L, Long::sum); |
| |
| if (archivedFiles.length == 0 || totalArchiveSize < allowedThresholdWalArchiveSize) |
| return -1; |
| |
| long sizeOfOldestArchivedFiles = 0; |
| |
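| // Walk from the oldest archived segment forward: the first segment whose removal (together with all |
| // older ones) brings the archive back under the threshold is the maximum index allowed for deletion. |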
| for (FileDescriptor desc : archivedFiles) { |
| sizeOfOldestArchivedFiles += desc.file().length(); |
| |
| if (totalArchiveSize - sizeOfOldestArchivedFiles < allowedThresholdWalArchiveSize) |
| return desc.getIdx(); |
| } |
| |
| return archivedFiles[archivedFiles.length - 1].getIdx(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long lastArchivedSegment() { |
| return archiver != null ? archiver.lastArchivedAbsoluteIndex() : -1L; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long lastCompactedSegment() { |
| return compressor != null ? compressor.lastCompressedIdx : -1L; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean reserved(WALPointer ptr) { |
| FileWALPointer fPtr = (FileWALPointer)ptr; |
| |
| FileArchiver archiver0 = archiver; |
| |
| return archiver0 != null && archiver0.reserved(fPtr.index()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int reserved(WALPointer low, WALPointer high) { |
| // It is not clear how to get the highest WAL pointer, so when high is null the method returns 0. |
| if (high == null) |
| return 0; |
| |
| assert high instanceof FileWALPointer : high; |
| |
| assert low == null || low instanceof FileWALPointer : low; |
| |
| FileWALPointer lowPtr = (FileWALPointer)low; |
| |
| FileWALPointer highPtr = (FileWALPointer)high; |
| |
| FileArchiver archiver0 = archiver; |
| |
| long lowIdx = lowPtr != null ? lowPtr.index() : 0; |
| |
| long highIdx = highPtr.index(); |
| |
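| // Advance lowIdx until the first reserved segment is found; everything from there through high (inclusive) is counted. |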
| while (lowIdx < highIdx) { |
| if (archiver0 != null && archiver0.reserved(lowIdx)) |
| break; |
| |
| lowIdx++; |
| } |
| |
| return (int)(highIdx - lowIdx + 1); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean disabled(int grpId) { |
| return cctx.walState().isDisabled(grpId); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void cleanupWalDirectories() throws IgniteCheckedException { |
| try { |
| try (DirectoryStream<Path> files = Files.newDirectoryStream(walWorkDir.toPath())) { |
| for (Path path : files) |
| Files.delete(path); |
| } |
| } |
| catch (IOException e) { |
| throw new IgniteCheckedException("Failed to cleanup wal work directory: " + walWorkDir, e); |
| } |
| |
| try { |
| try (DirectoryStream<Path> files = Files.newDirectoryStream(walArchiveDir.toPath())) { |
| for (Path path : files) |
| Files.delete(path); |
| } |
| } |
| catch (IOException e) { |
| throw new IgniteCheckedException("Failed to cleanup wal archive directory: " + walArchiveDir, e); |
| } |
| } |
| |
| /** |
| * Lists files in archive directory and returns the index of last archived file. |
| * |
| * @return The absolute index of last archived file. |
| */ |
| private long lastArchivedIndex() { |
| long lastIdx = -1; |
| |
| for (File file : walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)) { |
| try { |
| long idx = Long.parseLong(file.getName().substring(0, 16)); |
| |
| lastIdx = Math.max(lastIdx, idx); |
| } |
| catch (NumberFormatException | IndexOutOfBoundsException ignore) { |
| |
| } |
| } |
| |
| return lastIdx; |
| } |
| |
| /** |
| * Lists files in the archive directory and returns the indices of the minimum and maximum archived files. |
| * In case of holes, the first segment after the last "hole" is considered the minimum. |
| * Example: minimum(0, 1, 10, 11, 20, 21, 22) should be 20 |
| * |
| * @return The absolute indices of min and max archived files. |
| */ |
| private IgniteBiTuple<Long, Long> scanMinMaxArchiveIndices() { |
| TreeSet<Long> archiveIndices = new TreeSet<>(); |
| |
| for (File file : walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)) { |
| try { |
| long idx = Long.parseLong(file.getName().substring(0, 16)); |
| |
| archiveIndices.add(idx); |
| } |
| catch (NumberFormatException | IndexOutOfBoundsException ignore) { |
| // No-op. |
| } |
| } |
| |
| if (archiveIndices.isEmpty()) |
| return null; |
| else { |
| Long min = archiveIndices.first(); |
| Long max = archiveIndices.last(); |
| |
| if (max - min == archiveIndices.size() - 1) |
| return F.t(min, max); // Short path. |
| |
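| // There are holes: scan from the newest index down and take the first index whose predecessor is missing, i.e. the start of the latest contiguous run. |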
| for (Long idx : archiveIndices.descendingSet()) { |
| if (!archiveIndices.contains(idx - 1)) |
| return F.t(idx, max); |
| } |
| |
| throw new IllegalStateException("Should never happen if TreeSet is valid."); |
| } |
| } |
| |
| /** |
| * Creates a directory specified by the given arguments. |
| * |
| * @param cfg Configured directory path, may be {@code null}. |
| * @param defDir Default directory path, will be used if cfg is {@code null}. |
| * @param consId Local node consistent ID. |
| * @param msg File description to print out on successful initialization. |
| * @return Initialized directory. |
| * @throws IgniteCheckedException If failed to initialize directory. |
| */ |
| private File initDirectory(String cfg, String defDir, String consId, String msg) throws IgniteCheckedException { |
| File dir; |
| |
| if (cfg != null) { |
| File workDir0 = new File(cfg); |
| |
| dir = workDir0.isAbsolute() ? |
| new File(workDir0, consId) : |
| new File(U.resolveWorkDirectory(igCfg.getWorkDirectory(), cfg, false), consId); |
| } |
| else |
| dir = new File(U.resolveWorkDirectory(igCfg.getWorkDirectory(), defDir, false), consId); |
| |
| U.ensureDirectory(dir, msg, log); |
| |
| return dir; |
| } |
| |
| /** |
| * @return Current log segment handle. |
| */ |
| private FileWriteHandle currentHandle() { |
| return currentHnd; |
| } |
| |
| /** |
| * @param cur Handle that failed to fit the given entry. |
| * @return Handle that will fit the entry. |
| */ |
| private FileWriteHandle rollOver(FileWriteHandle cur) throws IgniteCheckedException { |
| FileWriteHandle hnd = currentHandle(); |
| |
| if (hnd != cur) |
| return hnd; |
| |
| if (hnd.close(true)) { |
| if (metrics.metricsEnabled()) |
| metrics.onWallRollOver(); |
| |
| FileWriteHandle next = initNextWriteHandle(cur.getSegmentId()); |
| |
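| // Too many segments were written since the last checkpoint; force one so the amount of WAL kept since the last checkpoint stays bounded. |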
| if (next.getSegmentId() - lashCheckpointFileIdx() >= maxSegCountWithoutCheckpoint) |
| cctx.database().forceCheckpoint("too big size of WAL without checkpoint"); |
| |
| boolean swapped = currentHndUpd.compareAndSet(this, hnd, next); |
| |
| assert swapped : "Concurrent updates on rollover are not allowed"; |
| |
| if (walAutoArchiveAfterInactivity > 0) |
| lastRecordLoggedMs.set(0); |
| |
| // Let other threads to proceed with new segment. |
| hnd.signalNextAvailable(); |
| } |
| else |
| hnd.awaitNext(); |
| |
| return currentHandle(); |
| } |
| |
| /** |
| * @return Index of the WAL segment that contains the last checkpoint mark, or 0 if there is none. |
| */ |
| private long lashCheckpointFileIdx() { |
| WALPointer lastCheckpointMark = cctx.database().lastCheckpointMarkWalPointer(); |
| |
| return lastCheckpointMark == null ? 0 : ((FileWALPointer)lastCheckpointMark).index(); |
| } |
| |
| /** |
| * @param lastReadPtr Last read WAL file pointer. |
| * @return Initialized file write handle. |
| * @throws StorageException If failed to initialize WAL write handle. |
| */ |
| private FileWriteHandle restoreWriteHandle(FileWALPointer lastReadPtr) throws StorageException { |
| long absIdx = lastReadPtr == null ? 0 : lastReadPtr.index(); |
| |
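| // Work files are reused in a ring: the absolute index maps onto one of walSegments work files. |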
| long segNo = absIdx % dsCfg.getWalSegments(); |
| |
| File curFile = new File(walWorkDir, FileDescriptor.fileName(segNo)); |
| |
| int offset = lastReadPtr == null ? 0 : lastReadPtr.fileOffset(); |
| int len = lastReadPtr == null ? 0 : lastReadPtr.length(); |
| |
| try { |
| SegmentIO fileIO = new SegmentIO(absIdx, ioFactory.create(curFile)); |
| |
| try { |
| int serVer = serializerVersion; |
| |
| // If we have existing segment, try to read version from it. |
| if (lastReadPtr != null) { |
| try { |
| serVer = readSegmentHeader(fileIO, segmentFileInputFactory).getSerializerVersion(); |
| } |
| catch (SegmentEofException | EOFException ignore) { |
| serVer = serializerVersion; |
| } |
| } |
| |
| RecordSerializer ser = new RecordSerializerFactoryImpl(cctx).createSerializer(serVer); |
| |
| if (log.isInfoEnabled()) |
| log.info("Resuming logging to WAL segment [file=" + curFile.getAbsolutePath() + |
| ", offset=" + offset + ", ver=" + serVer + ']'); |
| |
| FileWriteHandle hnd = new FileWriteHandle( |
| fileIO, |
| offset + len, |
| maxWalSegmentSize, |
| ser); |
| |
| // For new handle write serializer version to it. |
| if (lastReadPtr == null) |
| hnd.writeSerializerVersion(); |
| |
| if (archiver != null) |
| archiver.currentWalIndex(absIdx); |
| |
| return hnd; |
| } |
| catch (IgniteCheckedException | IOException e) { |
| try { |
| fileIO.close(); |
| } |
| catch (IOException suppressed) { |
| e.addSuppressed(suppressed); |
| } |
| |
| if (e instanceof StorageException) |
| throw (StorageException) e; |
| |
| throw e instanceof IOException ? (IOException) e : new IOException(e); |
| } |
| } |
| catch (IOException e) { |
| throw new StorageException("Failed to restore WAL write handle: " + curFile.getAbsolutePath(), e); |
| } |
| } |
| |
| /** |
| * Fills the file header for a new segment. |
| * Calling this method signals that we are done with the previous segment and it can be archived. |
| * If there is no prepared file yet and the archiver is busy, this method blocks. |
| * |
| * @param curIdx Current absolute segment index released by the WAL writer. |
| * @return Initialized file handle. |
| * @throws IgniteCheckedException If exception occurred. |
| */ |
| private FileWriteHandle initNextWriteHandle(long curIdx) throws IgniteCheckedException { |
| IgniteCheckedException error = null; |
| |
| try { |
| File nextFile = pollNextFile(curIdx); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Switching to a new WAL segment: " + nextFile.getAbsolutePath()); |
| |
| SegmentIO fileIO = new SegmentIO(curIdx + 1, ioFactory.create(nextFile)); |
| |
| FileWriteHandle hnd = new FileWriteHandle( |
| fileIO, |
| 0, |
| maxWalSegmentSize, |
| serializer); |
| |
| hnd.writeSerializerVersion(); |
| |
| return hnd; |
| } |
| catch (IgniteCheckedException e) { |
| throw error = e; |
| } |
| catch (IOException e) { |
| throw error = new StorageException("Unable to initialize WAL segment", e); |
| } |
| finally { |
| if (error != null) |
| cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, error)); |
| } |
| } |
| |
| /** |
| * Deletes temp files and creates and prepares new ones; creates the first segment if necessary. |
| * |
| * @throws StorageException If failed. |
| */ |
| private void checkOrPrepareFiles() throws StorageException { |
| // Clean temp files. |
| { |
| File[] tmpFiles = walWorkDir.listFiles(WAL_SEGMENT_TEMP_FILE_FILTER); |
| |
| if (!F.isEmpty(tmpFiles)) { |
| for (File tmp : tmpFiles) { |
| boolean deleted = tmp.delete(); |
| |
| if (!deleted) |
| throw new StorageException("Failed to delete previously created temp file " + |
| "(make sure Ignite process has enough rights): " + tmp.getAbsolutePath()); |
| } |
| } |
| } |
| |
| File[] allFiles = walWorkDir.listFiles(WAL_SEGMENT_FILE_FILTER); |
| |
| if (allFiles.length != 0 && allFiles.length > dsCfg.getWalSegments()) |
| throw new StorageException("Failed to initialize wal (work directory contains " + |
| "incorrect number of segments) [cur=" + allFiles.length + ", expected=" + dsCfg.getWalSegments() + ']'); |
| |
| // Allocate the first segment synchronously. All other segments will be allocated by archiver in background. |
| if (allFiles.length == 0) { |
| File first = new File(walWorkDir, FileDescriptor.fileName(0)); |
| |
| createFile(first); |
| } |
| else |
| checkFiles(0, false, null, null); |
| } |
| |
| /** |
| * Clears the whole file, filling it with zeros for the default mode. |
| * |
| * @param file File to format. |
| * @throws StorageException if formatting failed. |
| */ |
| private void formatFile(File file) throws StorageException { |
| formatFile(file, dsCfg.getWalSegmentSize()); |
| } |
| |
| /** |
| * Clears the file, filling it with zeros for the default mode. |
| * |
| * @param file File to format. |
| * @param bytesCntToFormat Count of first bytes to format. |
| * @throws StorageException If formatting failed. |
| */ |
| private void formatFile(File file, int bytesCntToFormat) throws StorageException { |
| if (log.isDebugEnabled()) |
| log.debug("Formatting file [exists=" + file.exists() + ", file=" + file.getAbsolutePath() + ']'); |
| |
| try (FileIO fileIO = ioFactory.create(file, CREATE, READ, WRITE)) { |
| int left = bytesCntToFormat; |
| |
| if (mode == WALMode.FSYNC) { |
| while ((left -= fileIO.writeFully(FILL_BUF, 0, Math.min(FILL_BUF.length, left))) > 0) |
| ; |
| |
| fileIO.force(); |
| } |
| else |
| fileIO.clear(); |
| } |
| catch (IOException e) { |
| throw new StorageException("Failed to format WAL segment file: " + file.getAbsolutePath(), e); |
| } |
| } |
| |
| /** |
| * Creates a file atomically, using a temp file. |
| * |
| * @param file File to create. |
| * @throws StorageException If failed. |
| */ |
| private void createFile(File file) throws StorageException { |
| if (log.isDebugEnabled()) |
| log.debug("Creating new file [exists=" + file.exists() + ", file=" + file.getAbsolutePath() + ']'); |
| |
| File tmp = new File(file.getParent(), file.getName() + FilePageStoreManager.TMP_SUFFIX); |
| |
| formatFile(tmp); |
| |
| try { |
| Files.move(tmp.toPath(), file.toPath()); |
| } |
| catch (IOException e) { |
| throw new StorageException("Failed to move temp file to a regular WAL segment file: " + |
| file.getAbsolutePath(), e); |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Created WAL segment [file=" + file.getAbsolutePath() + ", size=" + file.length() + ']'); |
| } |
| |
| /** |
| * Retrieves next available file to write WAL data, waiting |
| * if necessary for a segment to become available. |
| * |
| * @param curIdx Current absolute WAL segment index. |
| * @return File ready for use as new WAL segment. |
| * @throws StorageException If exception occurred in the archiver thread. |
| * @throws IgniteInterruptedCheckedException If interrupted. |
| */ |
| private File pollNextFile(long curIdx) throws StorageException, IgniteInterruptedCheckedException { |
| FileArchiver archiver0 = archiver; |
| |
| if (archiver0 == null) |
| return new File(walWorkDir, FileDescriptor.fileName(curIdx + 1)); |
| |
| // Signal to archiver that we are done with the segment and it can be archived. |
| long absNextIdx = archiver0.nextAbsoluteSegmentIndex(curIdx); |
| |
| long segmentIdx = absNextIdx % dsCfg.getWalSegments(); |
| |
| return new File(walWorkDir, FileDescriptor.fileName(segmentIdx)); |
| } |
| |
| |
| /** |
| * @param allFiles Files to scan, or {@code null}. |
| * @return Sorted WAL file descriptors. |
| */ |
| public static FileDescriptor[] scan(File[] allFiles) { |
| if (allFiles == null) |
| return EMPTY_DESCRIPTORS; |
| |
| FileDescriptor[] descs = new FileDescriptor[allFiles.length]; |
| |
| for (int i = 0; i < allFiles.length; i++) { |
| File f = allFiles[i]; |
| |
| descs[i] = new FileDescriptor(f); |
| } |
| |
| Arrays.sort(descs); |
| |
| return descs; |
| } |
| |
| /** |
| * @throws StorageException If node is no longer valid and we missed a WAL operation. |
| */ |
| private void checkNode() throws StorageException { |
| if (cctx.kernalContext().invalid()) |
| throw new StorageException("Failed to perform WAL operation (environment was invalidated by a " + |
| "previous error)"); |
| } |
| |
| /** |
| * File archiver operates on absolute segment indexes. For any given absolute segment index N we can calculate |
| * the work WAL segment: S(N) = N % dsCfg.walSegments. |
| * When a work segment is finished, it is given to the archiver. If the absolute index of last archived segment |
| * is denoted by A and the absolute index of next segment we want to write is denoted by W, then we can allow |
| * write to S(W) if W - A <= walSegments. <br> |
| * |
| * The monitor of this object is used to notify on: |
| * <ul> |
| * <li>exception occurred ({@link FileArchiver#cleanException}!=null)</li> |
| * <li>stopping thread ({@link FileArchiver#isCancelled}==true)</li> |
| * <li>current file index changed ({@link FileArchiver#curAbsWalIdx})</li> |
| * <li>last archived file index was changed ({@link FileArchiver#lastAbsArchivedIdx})</li> |
| * <li>some WAL index was removed from {@link FileArchiver#locked} map</li> |
| * </ul> |
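| * |
| * For example (purely illustrative), with {@code walSegments = 10} the absolute segment 25 maps to work file |
| * S(25) = 25 % 10 = 5; if the last archived absolute index is A = 17, writing to W = 26 is allowed because |
| * 26 - 17 <= 10, while W = 28 would have to wait for the archiver to catch up. |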
| */ |
| private class FileArchiver extends GridWorker { |
| /** Exception which occurred during the initial creation of files or during archiving of a WAL segment. */ |
| private StorageException cleanException; |
| |
| /** |
| * Absolute current segment index WAL Manager writes to. Guarded by <code>this</code>. |
| * Incremented during rollover. Also may be directly set if WAL is resuming logging after start. |
| */ |
| private long curAbsWalIdx = -1; |
| |
| /** Last archived file index (absolute, 0-based). Guarded by <code>this</code>. */ |
| private volatile long lastAbsArchivedIdx = -1; |
| |
| /** */ |
| private NavigableMap<Long, Integer> reserved = new TreeMap<>(); |
| |
| /** |
| * Maps an absolute segment index to a lock counter. A lock on a segment protects it from being archived and may |
| * be taken by {@link RecordsIterator} during WAL replay. The map itself is guarded by <code>this</code>. |
| */ |
| private Map<Long, Integer> locked = new HashMap<>(); |
| |
| /** Formatted index. */ |
| private int formatted; |
| |
| /** |
| * |
| */ |
| private FileArchiver(long lastAbsArchivedIdx, IgniteLogger log) { |
| super(cctx.igniteInstanceName(), "wal-file-archiver%" + cctx.igniteInstanceName(), log, |
| cctx.kernalContext().workersRegistry()); |
| |
| this.lastAbsArchivedIdx = lastAbsArchivedIdx; |
| } |
| |
| /** |
| * @return Last archived segment absolute index. |
| */ |
| private long lastArchivedAbsoluteIndex() { |
| return lastAbsArchivedIdx; |
| } |
| |
| /** |
| * @throws IgniteInterruptedCheckedException If failed to wait for thread shutdown. |
| */ |
| private void shutdown() throws IgniteInterruptedCheckedException { |
| synchronized (this) { |
| isCancelled = true; |
| |
| notifyAll(); |
| } |
| |
| U.join(runner()); |
| } |
| |
| /** |
| * @param curAbsWalIdx Current absolute WAL segment index. |
| */ |
| private void currentWalIndex(long curAbsWalIdx) { |
| synchronized (this) { |
| this.curAbsWalIdx = curAbsWalIdx; |
| |
| notifyAll(); |
| } |
| } |
| |
| /** |
| * @param absIdx Index for reservation. |
| */ |
| private synchronized void reserve(long absIdx) { |
| Integer cur = reserved.get(absIdx); |
| |
| if (cur == null) |
| reserved.put(absIdx, 1); |
| else |
| reserved.put(absIdx, cur + 1); |
| } |
| |
| /** |
| * Checks if a WAL segment is locked or reserved. |
| * |
| * @param absIdx Index for check reservation. |
| * @return {@code True} if index is reserved. |
| */ |
| private synchronized boolean reserved(long absIdx) { |
| return locked.containsKey(absIdx) || reserved.floorKey(absIdx) != null; |
| } |
| |
| /** |
| * @param absIdx Reserved index. |
| */ |
| private synchronized void release(long absIdx) { |
| Integer cur = reserved.get(absIdx); |
| |
| assert cur != null && cur >= 1 : cur; |
| |
| if (cur == 1) |
| reserved.remove(absIdx); |
| else |
| reserved.put(absIdx, cur - 1); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void body() { |
| blockingSectionBegin(); |
| |
| try { |
| allocateRemainingFiles(); |
| } |
| catch (StorageException e) { |
| synchronized (this) { |
| // Stop the thread and report to starter. |
| cleanException = e; |
| |
| notifyAll(); |
| } |
| |
| cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, e)); |
| |
| return; |
| } |
| finally { |
| blockingSectionEnd(); |
| } |
| |
| Throwable err = null; |
| |
| try { |
| synchronized (this) { |
| while (curAbsWalIdx == -1 && !isCancelled()) { |
| blockingSectionBegin(); |
| |
| try { |
| wait(); |
| } |
| finally { |
| blockingSectionEnd(); |
| } |
| } |
| |
| // If the archive directory is empty, we can be sure that there were no WAL segments archived. |
| // This is ensured by the check in truncate() which will leave at least one file there |
| // once it was archived. |
| } |
| |
| while (!Thread.currentThread().isInterrupted() && !isCancelled()) { |
| long toArchive; |
| |
| synchronized (this) { |
| assert lastAbsArchivedIdx <= curAbsWalIdx : "lastArchived=" + lastAbsArchivedIdx + |
| ", current=" + curAbsWalIdx; |
| |
| while (lastAbsArchivedIdx >= curAbsWalIdx - 1 && !isCancelled()) { |
| blockingSectionBegin(); |
| |
| try { |
| wait(); |
| } |
| finally { |
| blockingSectionEnd(); |
| } |
| } |
| |
| toArchive = lastAbsArchivedIdx + 1; |
| } |
| |
| if (isCancelled()) |
| break; |
| |
| SegmentArchiveResult res; |
| |
| blockingSectionBegin(); |
| |
| try { |
| res = archiveSegment(toArchive); |
| } |
| finally { |
| blockingSectionEnd(); |
| } |
| |
| synchronized (this) { |
| while (locked.containsKey(toArchive) && !isCancelled()) { |
| blockingSectionBegin(); |
| |
| try { |
| wait(); |
| } |
| finally { |
| blockingSectionEnd(); |
| } |
| } |
| |
| changeLastArchivedIndexAndWakeupCompressor(toArchive); |
| |
| notifyAll(); |
| } |
| |
| if (evt.isRecordable(EventType.EVT_WAL_SEGMENT_ARCHIVED)) { |
| evt.record(new WalSegmentArchivedEvent(cctx.discovery().localNode(), |
| res.getAbsIdx(), res.getDstArchiveFile())); |
| } |
| |
| onIdle(); |
| } |
| } |
| catch (InterruptedException t) { |
| Thread.currentThread().interrupt(); |
| |
| if (!isCancelled()) |
| err = t; |
| } |
| catch (Throwable t) { |
| err = t; |
| } |
| finally { |
| if (err == null && !isCancelled()) |
| err = new IllegalStateException("Worker " + name() + " is terminated unexpectedly"); |
| |
| if (err instanceof OutOfMemoryError) |
| cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err)); |
| else if (err != null) |
| cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err)); |
| } |
| } |
| |
| /** |
| * @param idx Index. |
| */ |
| private void changeLastArchivedIndexAndWakeupCompressor(long idx) { |
| lastAbsArchivedIdx = idx; |
| |
| if (compressor != null) |
| compressor.onNextSegmentArchived(); |
| } |
| |
| /** |
| * Gets the absolute index of the next WAL segment available for writing. |
| * Blocks until a file is available for writing. |
| * |
| * @param curIdx Current absolute index that we want to increment. |
| * @return Next index (curWalSegmIdx+1) when it is ready to be written. |
| * @throws StorageException If exception occurred in the archiver thread. |
| * @throws IgniteInterruptedCheckedException If interrupted. |
| */ |
| private long nextAbsoluteSegmentIndex(long curIdx) throws StorageException, IgniteInterruptedCheckedException { |
| try { |
| synchronized (this) { |
| if (cleanException != null) |
| throw cleanException; |
| |
| assert curIdx == curAbsWalIdx; |
| |
| curAbsWalIdx++; |
| |
| // Notify archiver thread. |
| notifyAll(); |
| |
| int segments = dsCfg.getWalSegments(); |
| |
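| // Back-pressure: block the writer until the archiver catches up, so the work directory |
| // never holds more than walSegments unarchived segments (see the class-level javadoc). |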
| if (isArchiverEnabled()) { |
| while ((curAbsWalIdx - lastAbsArchivedIdx > segments && cleanException == null)) |
| wait(); |
| } |
| |
| if (cleanException != null) |
| throw cleanException; |
| |
| // Wait for formatter so that we do not open an empty file in DEFAULT mode. |
| while (curAbsWalIdx % dsCfg.getWalSegments() > formatted && cleanException == null) |
| wait(); |
| |
| if (cleanException != null) |
| throw cleanException; |
| |
| return curAbsWalIdx; |
| } |
| } |
| catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| |
| throw new IgniteInterruptedCheckedException(e); |
| } |
| } |
| |
| /** |
| * @param absIdx Segment absolute index. |
| * @return <ul><li>{@code True} if the segment is already archived and can be read without holding a lock,</li> |
| * <li>{@code false} if it is a work segment; the caller must release it later via {@link #releaseWorkSegment} to unlock it.</li></ul> |
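| * <p> |
| * Illustrative caller pattern (a minimal sketch; {@code readSegment} is a hypothetical helper, not part of this class): |
| * <pre>{@code |
| * boolean archived = checkCanReadArchiveOrReserveWorkSegment(absIdx); |
| * |
| * try { |
| *     readSegment(absIdx, archived); // Read from the archive if archived, otherwise from the work directory. |
| * } |
| * finally { |
| *     if (!archived) |
| *         releaseWorkSegment(absIdx); // The work segment was reserved above and must be released. |
| * } |
| * }</pre> |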
| */ |
| @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext") |
| private boolean checkCanReadArchiveOrReserveWorkSegment(long absIdx) { |
| synchronized (this) { |
| if (lastAbsArchivedIdx >= absIdx) { |
| if (log.isDebugEnabled()) |
| log.debug("Not needed to reserve WAL segment: absIdx=" + absIdx + ";" + |
| " lastAbsArchivedIdx=" + lastAbsArchivedIdx); |
| |
| return true; |
| } |
| |
| Integer cur = locked.get(absIdx); |
| |
| cur = cur == null ? 1 : cur + 1; |
| |
| locked.put(absIdx, cur); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Reserved work segment [absIdx=" + absIdx + ", pins=" + cur + ']'); |
| |
| return false; |
| } |
| } |
| |
| /** |
| * @param absIdx Segment absolute index. |
| */ |
| @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext") |
| private void releaseWorkSegment(long absIdx) { |
| synchronized (this) { |
| Integer cur = locked.get(absIdx); |
| |
| assert cur != null && cur > 0 : "WAL Segment with Index " + absIdx + " is not locked;" + |
| " lastAbsArchivedIdx = " + lastAbsArchivedIdx; |
| |
| if (cur == 1) { |
| locked.remove(absIdx); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Fully released work segment (ready to archive) [absIdx=" + absIdx + ']'); |
| } |
| else { |
| locked.put(absIdx, cur - 1); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Partially released work segment [absIdx=" + absIdx + ", pins=" + (cur - 1) + ']'); |
| } |
| |
| notifyAll(); |
| } |
| } |
| |
| /** |
| * Moves a WAL segment from the work directory to the archive directory. |
| * A temporary file is used so that a partially copied segment never appears under its final name. |
| * |
| * @param absIdx Absolute index to archive. |
| * @return Result containing the archived absolute index, the source file and the destination file. |
| */ |
| private SegmentArchiveResult archiveSegment(long absIdx) throws IgniteCheckedException { |
| long segIdx = absIdx % dsCfg.getWalSegments(); |
| |
| File origFile = new File(walWorkDir, FileDescriptor.fileName(segIdx)); |
| |
| String name = FileDescriptor.fileName(absIdx); |
| |
| File dstTmpFile = new File(walArchiveDir, name + FilePageStoreManager.TMP_SUFFIX); |
| |
| File dstFile = new File(walArchiveDir, name); |
| |
| if (log.isInfoEnabled()) |
| log.info("Starting to copy WAL segment [absIdx=" + absIdx + ", segIdx=" + segIdx + |
| ", origFile=" + origFile.getAbsolutePath() + ", dstFile=" + dstFile.getAbsolutePath() + ']'); |
| |
| try { |
| Files.deleteIfExists(dstTmpFile.toPath()); |
| |
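| // Copy into a temporary file first and rename it afterwards, so a partially copied |
| // segment never becomes visible in the archive under its final name. |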
| Files.copy(origFile.toPath(), dstTmpFile.toPath()); |
| |
| Files.move(dstTmpFile.toPath(), dstFile.toPath()); |
| |
| if (mode == WALMode.FSYNC) { |
| try (FileIO f0 = ioFactory.create(dstFile, CREATE, READ, WRITE)) { |
| f0.force(); |
| } |
| } |
| } |
| catch (IOException e) { |
| throw new IgniteCheckedException("Failed to archive WAL segment [" + |
| "srcFile=" + origFile.getAbsolutePath() + |
| ", dstFile=" + dstTmpFile.getAbsolutePath() + ']', e); |
| } |
| |
| if (log.isInfoEnabled()) |
| log.info("Copied file [src=" + origFile.getAbsolutePath() + |
| ", dst=" + dstFile.getAbsolutePath() + ']'); |
| |
| return new SegmentArchiveResult(absIdx, origFile, dstFile); |
| } |
| |
| /** |
| * @return {@code True} if the worker has been cancelled and archiving should stop. |
| */ |
| private boolean checkStop() { |
| return isCancelled(); |
| } |
| |
| /** |
| * Background creation of all segments except the first one. The first segment is created in the main thread by |
| * {@link FsyncModeFileWriteAheadLogManager#checkOrPrepareFiles()}. |
| */ |
| private void allocateRemainingFiles() throws StorageException { |
| final FileArchiver archiver = this; |
| |
| checkFiles(1, |
| true, |
| new IgnitePredicate<Integer>() { |
| @Override public boolean apply(Integer integer) { |
| return !checkStop(); |
| } |
| }, new CI1<Integer>() { |
| @Override public void apply(Integer idx) { |
| synchronized (archiver) { |
| formatted = idx; |
| |
| archiver.notifyAll(); |
| } |
| } |
| }); |
| } |
| } |
| |
| /** |
| * Responsible for compressing WAL archive segments. |
| * Also responsible for deleting raw copies of already compressed WAL archive segments if they are not reserved. |
| */ |
| private class FileCompressor extends Thread { |
| /** Current thread stopping advice. */ |
| private volatile boolean stopped; |
| |
| /** Last successfully compressed segment. */ |
| private volatile long lastCompressedIdx = -1L; |
| |
| /** Minimum raw segment index to keep: raw copies of segments below this index may be deleted once compressed. */ |
| private volatile long minUncompressedIdxToKeep = -1L; |
| |
| /** */ |
| FileCompressor() { |
| super("wal-file-compressor%" + cctx.igniteInstanceName()); |
| } |
| |
| /** */ |
| private void init() { |
| File[] toDel = walArchiveDir.listFiles(WAL_SEGMENT_TEMP_FILE_COMPACTED_FILTER); |
| |
| for (File f : toDel) { |
| if (stopped) |
| return; |
| |
| f.delete(); |
| } |
| |
| FileDescriptor[] alreadyCompressed = scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_COMPACTED_FILTER)); |
| |
| if (alreadyCompressed.length > 0) |
| lastCompressedIdx = alreadyCompressed[alreadyCompressed.length - 1].idx(); |
| } |
| |
| /** |
| * @param idx Minimum raw segment index that should be preserved from deletion. |
| */ |
| synchronized void keepUncompressedIdxFrom(long idx) { |
| minUncompressedIdxToKeep = idx; |
| |
| notify(); |
| } |
| |
| /** |
| * Callback for waking up the compressor when a new segment is archived. |
| */ |
| synchronized void onNextSegmentArchived() { |
| notify(); |
| } |
| |
| /** |
| * Pessimistically tries to reserve a segment for compression in order to avoid concurrent truncation. |
| * Waits if there is no archived segment available for compression yet. |
| * |
| * @return Absolute index of the reserved segment, or {@code -1} if the compressor is stopped or the reservation failed. |
| */ |
| private long tryReserveNextSegmentOrWait() throws InterruptedException, IgniteCheckedException { |
| long segmentToCompress = lastCompressedIdx + 1; |
| |
| synchronized (this) { |
| if (stopped) |
| return -1; |
| |
| while (segmentToCompress > archiver.lastArchivedAbsoluteIndex()) { |
| wait(); |
| |
| if (stopped) |
| return -1; |
| } |
| } |
| |
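| // Never compress a segment that has already been truncated from the archive. |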
| segmentToCompress = Math.max(segmentToCompress, lastTruncatedArchiveIdx + 1); |
| |
| boolean reserved = reserve(new FileWALPointer(segmentToCompress, 0, 0)); |
| |
| return reserved ? segmentToCompress : -1; |
| } |
| |
| /** |
| * Deletes raw copies of WAL segments that are not reserved and already have compressed counterparts. |
| */ |
| private void deleteObsoleteRawSegments() { |
| FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)); |
| |
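| // An index that occurs more than once has both a raw (.wal) and a compressed (.wal.zip) copy; |
| // only such raw duplicates are candidates for deletion below. |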
| Set<Long> indices = new HashSet<>(); |
| Set<Long> duplicateIndices = new HashSet<>(); |
| |
| for (FileDescriptor desc : descs) { |
| if (!indices.add(desc.idx)) |
| duplicateIndices.add(desc.idx); |
| } |
| |
| FileArchiver archiver0 = archiver; |
| |
| for (FileDescriptor desc : descs) { |
| if (desc.isCompressed()) |
| continue; |
| |
| // Do not delete reserved or locked segment and any segment after it. |
| if (archiver0 != null && archiver0.reserved(desc.idx)) |
| return; |
| |
| if (desc.idx < minUncompressedIdxToKeep && duplicateIndices.contains(desc.idx)) { |
| if (!desc.file.delete()) |
| U.warn(log, "Failed to remove obsolete WAL segment (make sure the process has enough rights): " + |
| desc.file.getAbsolutePath() + ", exists: " + desc.file.exists()); |
| } |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void run() { |
| init(); |
| |
| while (!Thread.currentThread().isInterrupted() && !stopped) { |
| long currReservedSegment = -1; |
| |
| try { |
| deleteObsoleteRawSegments(); |
| |
| currReservedSegment = tryReserveNextSegmentOrWait(); |
| if (currReservedSegment == -1) |
| continue; |
| |
| File tmpZip = new File(walArchiveDir, FileDescriptor.fileName(currReservedSegment) |
| + FilePageStoreManager.ZIP_SUFFIX + FilePageStoreManager.TMP_SUFFIX); |
| |
| File zip = new File(walArchiveDir, FileDescriptor.fileName(currReservedSegment) |
| + FilePageStoreManager.ZIP_SUFFIX); |
| |
| File raw = new File(walArchiveDir, FileDescriptor.fileName(currReservedSegment)); |
| if (!Files.exists(raw.toPath())) |
| throw new IgniteCheckedException("WAL archive segment is missing: " + raw); |
| |
| compressSegmentToFile(currReservedSegment, raw, tmpZip); |
| |
| Files.move(tmpZip.toPath(), zip.toPath()); |
| |
| if (mode != WALMode.NONE) { |
| try (FileIO f0 = ioFactory.create(zip, CREATE, READ, WRITE)) { |
| f0.force(); |
| } |
| |
| if (evt.isRecordable(EVT_WAL_SEGMENT_COMPACTED)) { |
| evt.record(new WalSegmentCompactedEvent( |
| cctx.discovery().localNode(), |
| currReservedSegment, |
| zip.getAbsoluteFile()) |
| ); |
| } |
| } |
| |
| lastCompressedIdx = currReservedSegment; |
| } |
| catch (IgniteCheckedException | IOException e) { |
| U.error(log, "Compression of WAL segment [idx=" + currReservedSegment + |
| "] was skipped due to unexpected error", e); |
| |
| lastCompressedIdx++; |
| } |
| catch (InterruptedException ignore) { |
| Thread.currentThread().interrupt(); |
| } |
| finally { |
| try { |
| if (currReservedSegment != -1) |
| release(new FileWALPointer(currReservedSegment, 0, 0)); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Can't release raw WAL segment [idx=" + currReservedSegment + |
| "] after compression", e); |
| } |
| } |
| } |
| } |
| |
| /** |
| * @param nextSegment Next segment absolute idx. |
| * @param raw Raw file. |
| * @param zip Zip file. |
| */ |
| private void compressSegmentToFile(long nextSegment, File raw, File zip) |
| throws IOException, IgniteCheckedException { |
| int segmentSerializerVer; |
| |
| try (FileIO fileIO = ioFactory.create(raw)) { |
| segmentSerializerVer = readSegmentHeader(new SegmentIO(nextSegment, fileIO), segmentFileInputFactory).getSerializerVersion(); |
| } |
| |
| try (ZipOutputStream zos = new ZipOutputStream(new BufferedOutputStream(new FileOutputStream(zip)))) { |
| zos.setLevel(dsCfg.getWalCompactionLevel()); |
| zos.putNextEntry(new ZipEntry("")); |
| |
| zos.write(prepareSerializerVersionBuffer(nextSegment, segmentSerializerVer, true).array()); |
| |
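| // Only logical records are appended to the zip: the SingleSegmentLogicalRecordsIterator below |
| // replays logical records of the segment and feeds them to this closure. |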
| final CIX1<WALRecord> appendToZipC = new CIX1<WALRecord>() { |
| @Override public void applyx(WALRecord record) throws IgniteCheckedException { |
| final MarshalledRecord marshRec = (MarshalledRecord)record; |
| |
| try { |
| zos.write(marshRec.buffer().array(), 0, marshRec.buffer().remaining()); |
| } |
| catch (IOException e) { |
| throw new IgniteCheckedException(e); |
| } |
| } |
| }; |
| |
| try (SingleSegmentLogicalRecordsIterator iter = new SingleSegmentLogicalRecordsIterator( |
| log, cctx, ioFactory, tlbSize, nextSegment, walArchiveDir, appendToZipC)) { |
| |
| while (iter.hasNextX()) |
| iter.nextX(); |
| } |
| } |
| } |
| |
| /** |
| * @throws IgniteInterruptedCheckedException If failed to wait for thread shutdown. |
| */ |
| private void shutdown() throws IgniteInterruptedCheckedException { |
| synchronized (this) { |
| stopped = true; |
| |
| notifyAll(); |
| } |
| |
| U.join(this); |
| } |
| } |
| |
| /** |
| * Responsible for decompressing previously compressed segments of WAL archive if they are needed for replay. |
| */ |
| private class FileDecompressor extends GridWorker { |
| /** Decompression futures. */ |
| private Map<Long, GridFutureAdapter<Void>> decompressionFutures = new HashMap<>(); |
| |
| /** Segments queue. */ |
| private PriorityBlockingQueue<Long> segmentsQueue = new PriorityBlockingQueue<>(); |
| |
| /** Byte array for draining data. */ |
| private byte[] arr = new byte[tlbSize]; |
| |
| /** |
| * @param log Logger. |
| */ |
| FileDecompressor(IgniteLogger log) { |
| super(cctx.igniteInstanceName(), "wal-file-decompressor%" + cctx.igniteInstanceName(), log, |
| cctx.kernalContext().workersRegistry()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void body() { |
| Throwable err = null; |
| |
| try { |
| while (!isCancelled()) { |
| long segmentToDecompress = -1L; |
| |
| try { |
| blockingSectionBegin(); |
| |
| try { |
| segmentToDecompress = segmentsQueue.take(); |
| } |
| finally { |
| blockingSectionEnd(); |
| } |
| |
| if (isCancelled()) |
| break; |
| |
| File zip = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress) |
| + FilePageStoreManager.ZIP_SUFFIX); |
| File unzipTmp = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress) |
| + FilePageStoreManager.TMP_SUFFIX); |
| File unzip = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress)); |
| |
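| // Stream the zip entry into a temporary file first; it is renamed to the raw segment name below. |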
| try (ZipInputStream zis = new ZipInputStream(new BufferedInputStream(new FileInputStream(zip))); |
| FileIO io = ioFactory.create(unzipTmp)) { |
| zis.getNextEntry(); |
| |
| while (io.writeFully(arr, 0, zis.read(arr)) > 0) |
| updateHeartbeat(); |
| } |
| |
| try { |
| Files.move(unzipTmp.toPath(), unzip.toPath()); |
| } |
| catch (FileAlreadyExistsException e) { |
| U.error(log, "Can't rename temporary unzipped segment: raw segment is already present " + |
| "[tmp=" + unzipTmp + ", raw=" + unzip + ']', e); |
| |
| if (!unzipTmp.delete()) |
| U.error(log, "Can't delete temporary unzipped segment [tmp=" + unzipTmp + ']'); |
| } |
| |
| updateHeartbeat(); |
| |
| synchronized (this) { |
| decompressionFutures.remove(segmentToDecompress).onDone(); |
| } |
| } |
| catch (IOException ex) { |
| if (!isCancelled && segmentToDecompress != -1L) { |
| IgniteCheckedException e = new IgniteCheckedException("Error during WAL segment " + |
| "decompression [segmentIdx=" + segmentToDecompress + ']', ex); |
| |
| synchronized (this) { |
| decompressionFutures.remove(segmentToDecompress).onDone(e); |
| } |
| } |
| } |
| } |
| } |
| catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| |
| if (!isCancelled) |
| err = e; |
| } |
| catch (Throwable t) { |
| err = t; |
| } |
| finally { |
| if (err == null && !isCancelled) |
| err = new IllegalStateException("Worker " + name() + " is terminated unexpectedly"); |
| |
| if (err instanceof OutOfMemoryError) |
| cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err)); |
| else if (err != null) |
| cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err)); |
| } |
| } |
| |
| /** |
| * Asynchronously decompresses a WAL segment that is present only as a .zip file. |
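| * <p> |
| * Illustrative usage (a minimal sketch; {@code decompressor} stands for an instance of this class): |
| * <pre>{@code |
| * // Block until the raw segment file appears in the archive directory. |
| * decompressor.decompressFile(segmentIdx).get(); |
| * }</pre> |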
| * |
| * @param idx Absolute segment index. |
| * @return Future which is completed once the file is decompressed. |
| */ |
| synchronized IgniteInternalFuture<Void> decompressFile(long idx) { |
| if (decompressionFutures.containsKey(idx)) |
| return decompressionFutures.get(idx); |
| |
| File f = new File(walArchiveDir, FileDescriptor.fileName(idx)); |
| |
| if (f.exists()) |
| return new GridFinishedFuture<>(); |
| |
| segmentsQueue.put(idx); |
| |
| GridFutureAdapter<Void> res = new GridFutureAdapter<>(); |
| |
| decompressionFutures.put(idx, res); |
| |
| return res; |
| } |
| |
| /** */ |
| private void shutdown() { |
| synchronized (this) { |
| U.cancel(this); |
| |
| // Put fake -1 to wake the thread up from queue.take(). |
| segmentsQueue.put(-1L); |
| } |
| |
| U.join(this, log); |
| } |
| } |
| |
| /** |
| * Validates WAL work files according to {@link DataStorageConfiguration#getWalSegments()} and creates missing ones if requested. |
| * Checking stops when the exit condition returns {@code false} or all files have been processed. |
| * |
| * @param startWith Index to start from. |
| * @param create Whether to create missing files. |
| * @param p Exit condition predicate; checking stops once it returns {@code false}. |
| * @param completionCallback Callback invoked for each file after it has been checked or created. |
| * @throws StorageException If file validation or creation fails. |
| */ |
| private void checkFiles( |
| int startWith, |
| boolean create, |
| @Nullable IgnitePredicate<Integer> p, |
| @Nullable IgniteInClosure<Integer> completionCallback |
| ) throws StorageException { |
| for (int i = startWith; i < dsCfg.getWalSegments() && (p == null || p.apply(i)); i++) { |
| File checkFile = new File(walWorkDir, FileDescriptor.fileName(i)); |
| |
| if (checkFile.exists()) { |
| if (checkFile.isDirectory()) |
| throw new StorageException("Failed to initialize WAL log segment (a directory with " + |
| "the same name already exists): " + checkFile.getAbsolutePath()); |
| else if (checkFile.length() != dsCfg.getWalSegmentSize() && mode == WALMode.FSYNC) |
| throw new StorageException("Failed to initialize WAL log segment " + |
| "(WAL segment size change is not supported):" + checkFile.getAbsolutePath()); |
| } |
| else if (create) |
| createFile(checkFile); |
| |
| if (completionCallback != null) |
| completionCallback.apply(i); |
| } |
| } |
| |
| /** |
| * Writes record serializer version to provided {@code io}. |
| * NOTE: Method mutates position of {@code io}. |
| * |
| * @param io I/O interface for file. |
| * @param idx Segment index. |
| * @param version Serializer version. |
| * @return I/O position after the version has been written. |
| * @throws IOException If failed to write serializer version. |
| */ |
| public static long writeSerializerVersion(FileIO io, long idx, int version, WALMode mode) throws IOException { |
| ByteBuffer buffer = prepareSerializerVersionBuffer(idx, version, false); |
| |
| io.writeFully(buffer); |
| |
| // Flush |
| if (mode == WALMode.FSYNC) |
| io.force(); |
| |
| return io.position(); |
| } |
| |
| /** |
| * @param idx Index. |
| * @param ver Version. |
| * @param compacted Compacted flag. |
| * @return Byte buffer with the serialized header record, positioned at the beginning. |
| */ |
| @NotNull private static ByteBuffer prepareSerializerVersionBuffer(long idx, int ver, boolean compacted) { |
| ByteBuffer buf = ByteBuffer.allocate(RecordV1Serializer.HEADER_RECORD_SIZE); |
| buf.order(ByteOrder.nativeOrder()); |
| |
| // Write record type. |
| buf.put((byte) (WALRecord.RecordType.HEADER_RECORD.ordinal() + 1)); |
| |
| // Write position. |
| RecordV1Serializer.putPosition(buf, new FileWALPointer(idx, 0, 0)); |
| |
| // Place magic number. |
| buf.putLong(compacted ? HeaderRecord.COMPACTED_MAGIC : HeaderRecord.REGULAR_MAGIC); |
| |
| // Place serializer version. |
| buf.putInt(ver); |
| |
| // Place CRC if needed. |
| if (!RecordV1Serializer.skipCrc) { |
| int curPos = buf.position(); |
| |
| buf.position(0); |
| |
| // This call will move buffer position to the end of the record again. |
| int crcVal = PureJavaCrc32.calcCrc32(buf, curPos); |
| |
| buf.putInt(crcVal); |
| } |
| else |
| buf.putInt(0); |
| |
| // Write header record through io. |
| buf.position(0); |
| |
| return buf; |
| } |
| |
| /** |
| * |
| */ |
| private abstract static class FileHandle { |
| /** I/O interface for read/write operations with file */ |
| protected SegmentIO fileIO; |
| /** Segment index corresponding to {@code fileIO}. */ |
| final long segmentIdx; |
| |
| /** |
| * @param fileIO I/O interface for read/write operations of FileHandle. |
| */ |
| private FileHandle(@NotNull SegmentIO fileIO) { |
| this.fileIO = fileIO; |
| this.segmentIdx = fileIO.getSegmentId(); |
| } |
| |
| /** |
| * @return Current segment id. |
| */ |
| public long getSegmentId() { |
| return segmentIdx; |
| } |
| } |
| |
| /** |
| * |
| */ |
| public static class ReadFileHandle extends FileHandle implements AbstractWalRecordsIterator.AbstractReadFileHandle { |
| /** Entry serializer. */ |
| RecordSerializer ser; |
| |
| /** */ |
| FileInput in; |
| |
| /** |
| * <code>true</code> if this file handle came from the work directory. |
| * <code>false</code> if this file handle came from the archive directory. |
| */ |
| private boolean workDir; |
| |
| /** |
| * @param fileIO I/O interface for read/write operations of FileHandle. |
| * @param ser Entry serializer. |
| * @param in File input. |
| */ |
| ReadFileHandle( |
| SegmentIO fileIO, |
| RecordSerializer ser, |
| FileInput in |
| ) { |
| super(fileIO); |
| |
| this.ser = ser; |
| this.in = in; |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed to close the WAL segment file. |
| */ |
| @Override public void close() throws IgniteCheckedException { |
| try { |
| fileIO.close(); |
| } |
| catch (IOException e) { |
| throw new IgniteCheckedException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long idx() { |
| return getSegmentId(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public FileInput in() { |
| return in; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public RecordSerializer ser() { |
| return ser; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean workDir() { |
| return workDir; |
| } |
| } |
| |
| /** |
| * File handle for one log segment. |
| */ |
| @SuppressWarnings("SignalWithoutCorrespondingAwait") |
| private class FileWriteHandle extends FileHandle { |
| /** */ |
| private final RecordSerializer serializer; |
| |
| /** See {@link FsyncModeFileWriteAheadLogManager#maxWalSegmentSize} */ |
| private final long maxSegmentSize; |
| |
| /** |
| * Accumulated WAL records chain. |
| * This reference points to the latest WAL record. |
| * When writing, the chain is iterated from the latest record to the oldest one (see {@link WALRecord#previous()}), |
| * so records from the chain are written into the buffer in reverse order. |
| */ |
| private final AtomicReference<WALRecord> head = new AtomicReference<>(); |
| |
| /** |
| * Position in the current file after the end of the last written record (incremented after each file channel |
| * write operation). |
| */ |
| private volatile long written; |
| |
| /** */ |
| private volatile long lastFsyncPos; |
| |
| /** Stop guard that guarantees only one thread succeeds in calling {@link #close(boolean)}. */ |
| private final AtomicBoolean stop = new AtomicBoolean(false); |
| |
| /** */ |
| private final Lock lock = new ReentrantLock(); |
| |
| /** Condition signalled each time writeBuffer() completes. Used to wait for a previously started write to complete. */ |
| private final Condition writeComplete = lock.newCondition(); |
| |
| /** Condition for timed wait of several threads, see {@link DataStorageConfiguration#getWalFsyncDelayNanos()} */ |
| private final Condition fsync = lock.newCondition(); |
| |
| /** |
| * Next segment available condition. |
| * Protection from "spurious wakeup" is provided by the predicate {@link #fileIO} == <code>null</code>. |
| */ |
| private final Condition nextSegment = lock.newCondition(); |
| |
| /** |
| * @param fileIO I/O file interface to use |
| * @param pos Position. |
| * @param maxSegmentSize Max segment size. |
| * @param serializer Serializer. |
| * @throws IOException If failed. |
| */ |
| private FileWriteHandle( |
| SegmentIO fileIO, |
| long pos, |
| long maxSegmentSize, |
| RecordSerializer serializer |
| ) throws IOException { |
| super(fileIO); |
| |
| assert serializer != null; |
| |
| fileIO.position(pos); |
| |
| this.maxSegmentSize = maxSegmentSize; |
| this.serializer = serializer; |
| |
| head.set(new FakeRecord(new FileWALPointer(fileIO.getSegmentId(), (int)pos, 0), false)); |
| written = pos; |
| lastFsyncPos = pos; |
| } |
| |
| /** |
| * Writes the serializer version to the current handle. |
| * NOTE: Method mutates the {@code fileIO} position and the {@code written} and {@code lastFsyncPos} fields. |
| * |
| * @throws IOException If the serializer version could not be written. |
| */ |
| private void writeSerializerVersion() throws IOException { |
| try { |
| assert fileIO.position() == 0 : "Serializer version can be written only at the beginning of the file " + |
| fileIO.position(); |
| |
| long updatedPosition = FsyncModeFileWriteAheadLogManager.writeSerializerVersion(fileIO, getSegmentId(), |
| serializer.version(), mode); |
| |
| written = updatedPosition; |
| lastFsyncPos = updatedPosition; |
| head.set(new FakeRecord(new FileWALPointer(getSegmentId(), (int)updatedPosition, 0), false)); |
| } |
| catch (IOException e) { |
| throw new IOException("Unable to write serializer version for segment " + getSegmentId(), e); |
| } |
| } |
| |
| /** |
| * Checks if current head is a close fake record and returns {@code true} if so. |
| * |
| * @return {@code true} if current head is close record. |
| */ |
| private boolean stopped() { |
| return stopped(head.get()); |
| } |
| |
| /** |
| * @param record Record to check. |
| * @return {@code true} if the record is fake close record. |
| */ |
| private boolean stopped(WALRecord record) { |
| return record instanceof FakeRecord && ((FakeRecord)record).stop; |
| } |
| |
| /** |
| * @param rec Record to be added to record chain as new {@link #head} |
| * @return Pointer, or {@code null} if rollover to the next segment is required or has already been started by another thread. |
| * @throws StorageException If failed. |
| */ |
| @Nullable private WALPointer addRecord(WALRecord rec) throws StorageException { |
| assert rec.size() > 0 || rec.getClass() == FakeRecord.class; |
| |
| boolean flushed = false; |
| |
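| // Lock-free append: link the record on top of the current head via CAS. |
| // chainSize() accumulates the total size of records pending flush in this chain. |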
| for (; ; ) { |
| WALRecord h = head.get(); |
| |
| long nextPos = nextPosition(h); |
| |
| if (nextPos + rec.size() >= maxSegmentSize || stopped(h)) { |
| // Can not write to this segment, need to switch to the next one. |
| return null; |
| } |
| |
| int newChainSize = h.chainSize() + rec.size(); |
| |
| if (newChainSize > tlbSize && !flushed) { |
| boolean res = h.previous() == null || flush(h, false); |
| |
| if (rec.size() > tlbSize) |
| flushed = res; |
| |
| continue; |
| } |
| |
| rec.chainSize(newChainSize); |
| rec.previous(h); |
| |
| FileWALPointer ptr = new FileWALPointer( |
| getSegmentId(), |
| (int)nextPos, |
| rec.size()); |
| |
| rec.position(ptr); |
| |
| if (head.compareAndSet(h, rec)) |
| return ptr; |
| } |
| } |
| |
| /** |
| * @param rec Record. |
| * @return Position for the next record. |
| */ |
| private long nextPosition(WALRecord rec) { |
| return recordOffset(rec) + rec.size(); |
| } |
| |
| /** |
| * Flush or wait for concurrent flush completion. |
| * |
| * @param ptr Pointer. |
| * @throws StorageException If failed. |
| */ |
| private void flushOrWait(FileWALPointer ptr, boolean stop) throws StorageException { |
| long expWritten; |
| |
| if (ptr != null) { |
| // If an obsolete file index is requested, it must already have been flushed by close. |
| if (ptr.index() != getSegmentId()) |
| return; |
| |
| expWritten = ptr.fileOffset(); |
| } |
| else // We read head position before the flush because otherwise we can get wrong position. |
| expWritten = recordOffset(head.get()); |
| |
| if (flush(ptr, stop)) |
| return; |
| else if (stop) { |
| FakeRecord fr = (FakeRecord)head.get(); |
| |
| assert fr.stop : "Invalid fake record on top of the queue: " + fr; |
| |
| expWritten = recordOffset(fr); |
| } |
| |
| // Spin-wait for a while before acquiring the lock. |
| for (int i = 0; i < 64; i++) { |
| if (written >= expWritten) |
| return; |
| } |
| |
| // If we did not flush ourselves, then wait for the concurrent flush to complete. |
| lock.lock(); |
| |
| try { |
| while (written < expWritten && !cctx.kernalContext().invalid()) |
| U.awaitQuiet(writeComplete); |
| } |
| finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** |
| * @param ptr Pointer. |
| * @return {@code true} If the flush really happened. |
| * @throws StorageException If failed. |
| */ |
| private boolean flush(FileWALPointer ptr, boolean stop) throws StorageException { |
| if (ptr == null) { // Unconditional flush. |
| for (; ; ) { |
| WALRecord expHead = head.get(); |
| |
| if (expHead.previous() == null) { |
| FakeRecord frHead = (FakeRecord)expHead; |
| |
| if (frHead.stop == stop || frHead.stop || |
| head.compareAndSet(expHead, new FakeRecord(frHead.position(), stop))) |
| return false; |
| } |
| |
| if (flush(expHead, stop)) |
| return true; |
| } |
| } |
| |
| assert ptr.index() == getSegmentId(); |
| |
| for (; ; ) { |
| WALRecord h = head.get(); |
| |
| // If current chain begin position is greater than requested, then someone else flushed our changes. |
| if (chainBeginPosition(h) > ptr.fileOffset()) |
| return false; |
| |
| if (flush(h, stop)) |
| return true; // We are lucky. |
| } |
| } |
| |
| /** |
| * @param h Head of the chain. |
| * @return Chain begin position. |
| */ |
| private long chainBeginPosition(WALRecord h) { |
| return recordOffset(h) + h.size() - h.chainSize(); |
| } |
| |
| /** |
| * @param expHead Expected head of the chain. If the head has changed, the flush is not performed by this thread. |
| * @param stop {@code True} if the handle is being stopped, so the new head becomes a terminal fake record. |
| * @return {@code True} if the chain was actually flushed by this thread. |
| * @throws StorageException If failed. |
| */ |
| private boolean flush(WALRecord expHead, boolean stop) throws StorageException { |
| if (expHead.previous() == null) { |
| FakeRecord frHead = (FakeRecord)expHead; |
| |
| if (!stop || frHead.stop) // Protects from CASing terminal FakeRecord(true) to FakeRecord(false) |
| return false; |
| } |
| |
| // Fail-fast before CAS. |
| checkNode(); |
| |
| if (!head.compareAndSet(expHead, new FakeRecord(new FileWALPointer(getSegmentId(), (int)nextPosition(expHead), 0), stop))) |
| return false; |
| |
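| // The grabbed chain consists of the fake head only, so there is nothing to write. |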
| if (expHead.chainSize() == 0) |
| return false; |
| |
| // At this point we grabbed the piece of WAL chain. |
| // Any failure in this code must invalidate the environment. |
| try { |
| // We can safely allow other threads to start building next chains while we are doing flush here. |
| ByteBuffer buf; |
| |
| boolean tmpBuf = false; |
| |
| if (expHead.chainSize() > tlbSize) { |
| buf = GridUnsafe.allocateBuffer(expHead.chainSize()); |
| |
| tmpBuf = true; // We need to manually release this temporary direct buffer. |
| } |
| else |
| buf = tlb.get(); |
| |
| try { |
| long pos = fillBuffer(buf, expHead); |
| |
| writeBuffer(pos, buf); |
| } |
| finally { |
| if (tmpBuf) |
| GridUnsafe.freeBuffer(buf); |
| } |
| |
| return true; |
| } |
| catch (Throwable e) { |
| StorageException se = e instanceof StorageException ? (StorageException) e : |
| new StorageException("Unable to write", new IOException(e)); |
| |
| cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, se)); |
| |
| // All workers waiting for the next segment must be woken up and stopped. |
| signalNextAvailable(); |
| |
| throw se; |
| } |
| } |
| |
| /** |
| * Serializes the WAL record chain into the provided byte buffer. |
| * |
| * @param buf Buffer to fill; it is populated with the record chain from end to beginning. |
| * @param head Head of the chain to write to the buffer. |
| * @return Position in file for this buffer. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private long fillBuffer(ByteBuffer buf, WALRecord head) throws IgniteCheckedException { |
| final int limit = head.chainSize(); |
| |
| assert limit <= buf.capacity(); |
| |
| buf.rewind(); |
| buf.limit(limit); |
| |
| do { |
| buf.position(head.chainSize() - head.size()); |
| buf.limit(head.chainSize()); // Just to make sure that serializer works in bounds. |
| |
| try { |
| serializer.writeRecord(head, buf); |
| } |
| catch (RuntimeException e) { |
| throw new IllegalStateException("Failed to write record: " + head, e); |
| } |
| |
| assert !buf.hasRemaining() : "Reported record size is greater than actual: " + head; |
| |
| head = head.previous(); |
| } |
| while (head.previous() != null); |
| |
| assert head instanceof FakeRecord : head.getClass(); |
| |
| buf.rewind(); |
| buf.limit(limit); |
| |
| return recordOffset(head); |
| } |
| |
| /** |
| * Non-blocking check if this pointer needs to be sync'ed. |
| * |
| * @param ptr WAL pointer to check. |
| * @return {@code False} if this pointer has been already sync'ed. |
| */ |
| private boolean needFsync(FileWALPointer ptr) { |
| // If index has changed, it means that the log was rolled over and already sync'ed. |
| // If requested position is smaller than last sync'ed, it also means all is good. |
| // If position is equal, then our record is the last not synced. |
| return getSegmentId() == ptr.index() && lastFsyncPos <= ptr.fileOffset(); |
| } |
| |
| /** |
| * @return Pointer to the end of the last written record (probably not fsync-ed). |
| */ |
| private FileWALPointer position() { |
| lock.lock(); |
| |
| try { |
| return new FileWALPointer(getSegmentId(), (int)written, 0); |
| } |
| finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** |
| * @param ptr Pointer to sync. |
| * @throws StorageException If failed. |
| * @throws IgniteInterruptedCheckedException If interrupted. |
| */ |
| private void fsync(FileWALPointer ptr, boolean stop) throws StorageException, IgniteInterruptedCheckedException { |
| lock.lock(); |
| |
| try { |
| if (ptr != null) { |
| if (!needFsync(ptr)) |
| return; |
| |
| if (fsyncDelay > 0 && !stopped()) { |
| // Delay fsync to collect as many updates as possible: trade latency for throughput. |
| U.await(fsync, fsyncDelay, TimeUnit.NANOSECONDS); |
| |
| if (!needFsync(ptr)) |
| return; |
| } |
| } |
| |
| flushOrWait(ptr, stop); |
| |
| if (stopped()) |
| return; |
| |
| if (lastFsyncPos != written) { |
| assert lastFsyncPos < written; // Fsync position must be behind. |
| |
| boolean metricsEnabled = metrics.metricsEnabled(); |
| |
| long start = metricsEnabled ? System.nanoTime() : 0; |
| |
| try { |
| fileIO.force(); |
| } |
| catch (IOException e) { |
| throw new StorageException(e); |
| } |
| |
| lastFsyncPos = written; |
| |
| if (fsyncDelay > 0) |
| fsync.signalAll(); |
| |
| long end = metricsEnabled ? System.nanoTime() : 0; |
| |
| if (metricsEnabled) |
| metrics.onFsync(end - start); |
| } |
| } |
| finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** |
| * @param rollOver {@code True} if the handle is closed because of a rollover to the next segment. |
| * @return {@code true} If this thread actually closed the segment. |
| * @throws StorageException If failed. |
| */ |
| private boolean close(boolean rollOver) throws StorageException { |
| if (stop.compareAndSet(false, true)) { |
| lock.lock(); |
| |
| try { |
| flushOrWait(null, true); |
| |
| assert stopped() : "Segment is not closed after close flush: " + head.get(); |
| |
| try { |
| try { |
| RecordSerializer backwardSerializer = new RecordSerializerFactoryImpl(cctx) |
| .createSerializer(serializerVersion); |
| |
| SwitchSegmentRecord segmentRecord = new SwitchSegmentRecord(); |
| |
| int switchSegmentRecSize = backwardSerializer.size(segmentRecord); |
| |
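| // On rollover, append a switch-segment record (if it fits) so that readers know |
| // the remainder of this segment contains no more records. |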
| if (rollOver && written < (maxSegmentSize - switchSegmentRecSize)) { |
| final ByteBuffer buf = ByteBuffer.allocate(switchSegmentRecSize); |
| |
| segmentRecord.position(new FileWALPointer(getSegmentId(), (int)written, switchSegmentRecSize)); |
| backwardSerializer.writeRecord(segmentRecord, buf); |
| |
| buf.rewind(); |
| |
| written += fileIO.writeFully(buf, written); |
| } |
| } |
| catch (IgniteCheckedException e) { |
| throw new IOException(e); |
| } |
| finally { |
| assert mode == WALMode.FSYNC; |
| |
| // Do the final fsync. |
| fileIO.force(); |
| |
| lastFsyncPos = written; |
| |
| fileIO.close(); |
| } |
| } |
| catch (IOException e) { |
| throw new StorageException("Failed to close WAL write handle [idx=" + getSegmentId() + "]", e); |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Closed WAL write handle [idx=" + getSegmentId() + "]"); |
| |
| return true; |
| } |
| finally { |
| lock.unlock(); |
| } |
| } |
| else |
| return false; |
| } |
| |
| /** |
| * Signals that the next segment is available, waking up other worker threads waiting to write to the WAL. |
| */ |
| private void signalNextAvailable() { |
| lock.lock(); |
| |
| try { |
| WALRecord rec = head.get(); |
| |
| if (!cctx.kernalContext().invalid()) { |
| assert rec instanceof FakeRecord : "Expected head FakeRecord, actual head " |
| + (rec != null ? rec.getClass().getSimpleName() : "null"); |
| |
| assert written == lastFsyncPos || mode != WALMode.FSYNC : |
| "fsync [written=" + written + ", lastFsync=" + lastFsyncPos + ']'; |
| |
| fileIO = null; |
| } |
| else { |
| try { |
| fileIO.close(); |
| } |
| catch (IOException e) { |
| U.error(log, "Failed to close WAL file [idx=" + getSegmentId() + ", fileIO=" + fileIO + "]", e); |
| } |
| } |
| |
| nextSegment.signalAll(); |
| } |
| finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** |
| * Blocks until the current {@link #fileIO} is released (the next segment becomes available) or the node is invalidated. |
| */ |
| private void awaitNext() { |
| lock.lock(); |
| |
| try { |
| while (fileIO != null && !cctx.kernalContext().invalid()) |
| U.awaitQuiet(nextSegment); |
| } |
| finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** |
| * @param pos Position in the file to start writing from. May be checked against the actual position to wait for |
| * previous writes to complete. |
| * @param buf Buffer to write to the file. |
| * @throws StorageException If failed. |
| */ |
| @SuppressWarnings("TooBroadScope") |
| private void writeBuffer(long pos, ByteBuffer buf) throws StorageException { |
| boolean interrupted = false; |
| |
| lock.lock(); |
| |
| try { |
| assert fileIO != null : "Writing to a closed segment."; |
| |
| checkNode(); |
| |
| long lastLogged = U.currentTimeMillis(); |
| |
| long logBackoff = 2_000; |
| |
| // If we were too fast, need to wait previous writes to complete. |
| while (written != pos) { |
| assert written < pos : "written = " + written + ", pos = " + pos; // No one can write further than we are now. |
| |
| // Reordering occurred between block write operations: |
| // the order of acquiring the lock is not the same as the order of writing. |
| long now = U.currentTimeMillis(); |
| |
| if (now - lastLogged >= logBackoff) { |
| if (logBackoff < 60 * 60_000) |
| logBackoff *= 2; |
| |
| U.warn(log, "Still waiting for a concurrent write to complete [written=" + written + |
| ", pos=" + pos + ", lastFsyncPos=" + lastFsyncPos + ", stop=" + stop.get() + |
| ", actualPos=" + safePosition() + ']'); |
| |
| lastLogged = now; |
| } |
| |
| try { |
| writeComplete.await(2, TimeUnit.SECONDS); |
| } |
| catch (InterruptedException ignore) { |
| interrupted = true; |
| } |
| |
| checkNode(); |
| } |
| |
| // Do the write. |
| int size = buf.remaining(); |
| |
| assert size > 0 : size; |
| |
| try { |
| assert written == fileIO.position(); |
| |
| fileIO.writeFully(buf); |
| |
| written += size; |
| |
| metrics.onWalBytesWritten(size); |
| |
| assert written == fileIO.position(); |
| } |
| catch (IOException e) { |
| StorageException se = new StorageException("Unable to write", e); |
| |
| cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, se)); |
| |
| throw se; |
| } |
| } |
| finally { |
| writeComplete.signalAll(); |
| |
| lock.unlock(); |
| |
| if (interrupted) |
| Thread.currentThread().interrupt(); |
| } |
| } |
| |
| /** |
| * @return Current position of the file channel as a String, or {@code "null"} if the channel is null. |
| */ |
| private String safePosition() { |
| FileIO io = this.fileIO; |
| |
| if (io == null) |
| return "null"; |
| |
| try { |
| return String.valueOf(io.position()); |
| } |
| catch (IOException e) { |
| return "{Failed to read channel position: " + e.getMessage() + "}"; |
| } |
| } |
| } |
| |
| /** |
| * Gets WAL record offset relative to the WAL segment file beginning. |
| * |
| * @param rec WAL record. |
| * @return File offset. |
| */ |
| private static int recordOffset(WALRecord rec) { |
| FileWALPointer ptr = (FileWALPointer)rec.position(); |
| |
| assert ptr != null; |
| |
| return ptr.fileOffset(); |
| } |
| |
| /** |
| * Fake record is a zero-sized record that is never stored in the file. |
| * It is used to hold a position in the file (see {@link WALRecord#position()}) |
| * and is allowed to have no previous record. |
| */ |
| private static final class FakeRecord extends WALRecord { |
| /** */ |
| private final boolean stop; |
| |
| /** |
| * @param pos Position. |
| * @param stop {@code True} if this fake record marks the segment as closed for writes. |
| */ |
| FakeRecord(FileWALPointer pos, boolean stop) { |
| position(pos); |
| |
| this.stop = stop; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public RecordType type() { |
| return null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public FileWALPointer position() { |
| return (FileWALPointer) super.position(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(FakeRecord.class, this, "super", super.toString()); |
| } |
| } |
| |
| /** |
| * Iterator over WAL-log. |
| */ |
| private class RecordsIterator extends AbstractWalRecordsIterator { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** */ |
| private final File walWorkDir; |
| |
| /** */ |
| private final File walArchiveDir; |
| |
| /** */ |
| private final FileArchiver archiver; |
| |
| /** */ |
| private final FileDecompressor decompressor; |
| |
| /** */ |
| private final DataStorageConfiguration psCfg; |
| |
| /** Optional start pointer. */ |
| @Nullable |
| private FileWALPointer start; |
| |
| /** Optional end pointer. */ |
| @Nullable |
| private FileWALPointer end; |
| |
| /** |
| * @param cctx Shared context. |
| * @param walWorkDir WAL work dir. |
| * @param walArchiveDir WAL archive dir. |
| * @param start Optional start pointer. |
| * @param end Optional end pointer. |
| * @param psCfg Database configuration. |
| * @param serializerFactory Serializer factory. |
| * @param archiver Archiver. |
| * @param decompressor Decompressor. |
| * @param log Logger. |
| * @param segmentFileInputFactory Segment file input factory. |
| * @throws IgniteCheckedException If failed to initialize WAL segment. |
| */ |
| private RecordsIterator( |
| GridCacheSharedContext cctx, |
| File walWorkDir, |
| File walArchiveDir, |
| @Nullable FileWALPointer start, |
| @Nullable FileWALPointer end, |
| DataStorageConfiguration psCfg, |
| @NotNull RecordSerializerFactory serializerFactory, |
| FileIOFactory ioFactory, |
| FileArchiver archiver, |
| FileDecompressor decompressor, |
| IgniteLogger log, |
| SegmentFileInputFactory segmentFileInputFactory |
| ) throws IgniteCheckedException { |
| super( |
| log, |
| cctx, |
| serializerFactory, |
| ioFactory, |
| psCfg.getWalRecordIteratorBufferSize(), |
| segmentFileInputFactory |
| ); |
| this.walWorkDir = walWorkDir; |
| this.walArchiveDir = walArchiveDir; |
| this.psCfg = psCfg; |
| this.archiver = archiver; |
| this.start = start; |
| this.end = end; |
| this.decompressor = decompressor; |
| |
| init(); |
| |
| advance(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected ReadFileHandle initReadHandle( |
| @NotNull AbstractFileDescriptor desc, |
| @Nullable FileWALPointer start |
| ) throws IgniteCheckedException, FileNotFoundException { |
| AbstractFileDescriptor currDesc = desc; |
| |
| if (!desc.file().exists()) { |
| FileDescriptor zipFile = new FileDescriptor( |
| new File(walArchiveDir, FileDescriptor.fileName(desc.idx()) |
| + FilePageStoreManager.ZIP_SUFFIX)); |
| |
| if (!zipFile.file.exists()) { |
| throw new FileNotFoundException("Both compressed and raw segment files are missing in archive " + |
| "[segmentIdx=" + desc.idx() + "]"); |
| } |
| |
| if (decompressor != null) |
| decompressor.decompressFile(desc.idx()).get(); |
| else |
| currDesc = zipFile; |
| } |
| |
| return (ReadFileHandle) super.initReadHandle(currDesc, start); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void onClose() throws IgniteCheckedException { |
| super.onClose(); |
| |
| curRec = null; |
| |
| final AbstractReadFileHandle handle = closeCurrentWalSegment(); |
| |
| if (handle != null && handle.workDir()) |
| releaseWorkSegment(curWalSegmIdx); |
| |
| curWalSegmIdx = Integer.MAX_VALUE; |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed to initialize first file handle. |
| */ |
| private void init() throws IgniteCheckedException { |
| AbstractFileDescriptor[] descs = loadFileDescriptors(walArchiveDir); |
| |
| if (start != null) { |
| if (!F.isEmpty(descs)) { |
| if (descs[0].idx() > start.index()) |
| throw new IgniteCheckedException("WAL history is too short " + |
| "[descs=" + Arrays.asList(descs) + ", start=" + start + ']'); |
| |
| for (AbstractFileDescriptor desc : descs) { |
| if (desc.idx() == start.index()) { |
| curWalSegmIdx = start.index(); |
| |
| break; |
| } |
| } |
| |
| if (curWalSegmIdx == -1) { |
| long lastArchived = descs[descs.length - 1].idx(); |
| |
| if (lastArchived > start.index()) |
| throw new IgniteCheckedException("WAL history is corrupted (segment is missing): " + start); |
| |
| // This pointer may be in work files because archiver did not |
| // copy the file yet, check that it is not too far forward. |
| curWalSegmIdx = start.index(); |
| } |
| } |
| else { |
| // This means that whole checkpoint history fits in one segment in WAL work directory. |
| // Will start from this index right away. |
| curWalSegmIdx = start.index(); |
| } |
| } |
| else |
| curWalSegmIdx = !F.isEmpty(descs) ? descs[0].idx() : 0; |
| |
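| // advanceSegment() increments the index before opening a segment, |
| // so start one position before the first segment to read. |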
| curWalSegmIdx--; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Initialized WAL cursor [start=" + start + ", end=" + end + ", curWalSegmIdx=" + curWalSegmIdx + ']'); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected AbstractReadFileHandle advanceSegment( |
| @Nullable final AbstractReadFileHandle curWalSegment |
| ) throws IgniteCheckedException { |
| if (curWalSegment != null) { |
| curWalSegment.close(); |
| |
| if (curWalSegment.workDir()) |
| releaseWorkSegment(curWalSegment.idx()); |
| |
| } |
| |
| // We are past the end marker. |
| if (end != null && curWalSegmIdx + 1 > end.index()) |
| return null; // Stop iteration. |
| |
| curWalSegmIdx++; |
| |
| FileDescriptor fd; |
| |
| boolean readArchive = canReadArchiveOrReserveWork(curWalSegmIdx); |
| |
| if (readArchive) |
| fd = new FileDescriptor(new File(walArchiveDir, FileDescriptor.fileName(curWalSegmIdx))); |
| else { |
| long workIdx = curWalSegmIdx % psCfg.getWalSegments(); |
| |
| fd = new FileDescriptor( |
| new File(walWorkDir, FileDescriptor.fileName(workIdx)), |
| curWalSegmIdx); |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Reading next file [absIdx=" + curWalSegmIdx + ", file=" + fd.file().getAbsolutePath() + ']'); |
| |
| ReadFileHandle nextHandle; |
| |
| try { |
| nextHandle = initReadHandle(fd, start != null && curWalSegmIdx == start.index() ? start : null); |
| } |
| catch (FileNotFoundException e) { |
| if (readArchive) |
| throw new IgniteCheckedException("Missing WAL segment in the archive", e); |
| else |
| nextHandle = null; |
| } |
| |
| if (nextHandle == null) { |
| if (!readArchive) |
| releaseWorkSegment(curWalSegmIdx); |
| } |
| else |
| nextHandle.workDir = !readArchive; |
| |
| curRec = null; |
| |
| return nextHandle; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected IgniteCheckedException handleRecordException( |
| @NotNull Exception e, |
| @Nullable FileWALPointer ptr) { |
| |
| if (e instanceof IgniteCheckedException) |
| if (X.hasCause(e, IgniteDataIntegrityViolationException.class)) |
| // This means that there is no explicit last segment, so we iterate until the very end. |
| if (end == null) { |
| long nextWalSegmentIdx = curWalSegmIdx + 1; |
| |
| // Check that we should not look this segment up in archive directory. |
| // Basically the same check as in "advanceSegment" method. |
| if (archiver != null) |
| if (!canReadArchiveOrReserveWork(nextWalSegmentIdx)) |
| try { |
| long workIdx = nextWalSegmentIdx % dsCfg.getWalSegments(); |
| |
| FileDescriptor fd = new FileDescriptor( |
| new File(walWorkDir, FileDescriptor.fileName(workIdx)), |
| nextWalSegmentIdx |
| ); |
| |
| try { |
| ReadFileHandle nextHandle = initReadHandle(fd, null); |
| |
| // "nextHandle == null" is true only if current segment is the last one in the |
| // whole history. Only in such case we ignore crc validation error and just stop |
| // as if we reached the end of the WAL. |
| if (nextHandle == null) |
| return null; |
| } |
| catch (IgniteCheckedException | FileNotFoundException initReadHandleException) { |
| e.addSuppressed(initReadHandleException); |
| } |
| } |
| finally { |
| releaseWorkSegment(nextWalSegmentIdx); |
| } |
| } |
| |
| return super.handleRecordException(e, ptr); |
| } |
| |
| /** |
| * @param absIdx Absolute index to check. |
| * @return <ul><li> {@code True} if we can safely read the archive, </li> <li>{@code false} if the segment has |
| * not been archived yet. In this case the corresponding work segment is reserved (will not be deleted until |
| * release). Use {@link #releaseWorkSegment} for unlock </li></ul> |
| */ |
| private boolean canReadArchiveOrReserveWork(long absIdx) { |
| return archiver != null && archiver.checkCanReadArchiveOrReserveWorkSegment(absIdx); |
| } |
| |
| /** |
| * @param absIdx Absolute index to release. |
| */ |
| private void releaseWorkSegment(long absIdx) { |
| if (archiver != null) |
| archiver.releaseWorkSegment(absIdx); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected AbstractReadFileHandle createReadFileHandle(SegmentIO fileIO, |
| RecordSerializer ser, FileInput in) { |
| return new ReadFileHandle(fileIO, ser, in); |
| } |
| } |
| |
| /** |
| * Flushes the current file handle for {@link WALMode#BACKGROUND} WAL mode. |
| * Called periodically from scheduler. |
| */ |
| private void doFlush() { |
| final FileWriteHandle hnd = currentHandle(); |
| try { |
| hnd.flush(hnd.head.get(), false); |
| } |
| catch (Exception e) { |
| U.warn(log, "Failed to flush WAL record queue", e); |
| } |
| } |
| |
| /** |
| * Scans the provided folder for WAL segment files. |
| * |
| * @param walFilesDir Directory to scan. |
| * @return Found WAL file descriptors. |
| * @throws IgniteCheckedException If the directory cannot be listed. |
| */ |
| private static FileDescriptor[] loadFileDescriptors(@NotNull final File walFilesDir) throws IgniteCheckedException { |
| final File[] files = walFilesDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER); |
| |
| if (files == null) { |
| throw new IgniteCheckedException("WAL files directory does not denote a directory, " + |
| "or an I/O error occurred: [" + walFilesDir.getAbsolutePath() + "]"); |
| } |
| return scan(files); |
| } |
| } |