| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.persistence.wal; |
| |
| import java.io.BufferedInputStream; |
| import java.io.BufferedOutputStream; |
| import java.io.EOFException; |
| import java.io.File; |
| import java.io.FileFilter; |
| import java.io.FileInputStream; |
| import java.io.FileNotFoundException; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.nio.ByteOrder; |
| import java.nio.file.Files; |
| import java.sql.Time; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.NavigableMap; |
| import java.util.TreeMap; |
| import java.util.TreeSet; |
| import java.util.concurrent.PriorityBlockingQueue; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; |
| import java.util.concurrent.locks.Condition; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReentrantLock; |
| import java.util.regex.Pattern; |
| import java.util.zip.ZipEntry; |
| import java.util.zip.ZipInputStream; |
| import java.util.zip.ZipOutputStream; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.IgniteSystemProperties; |
| import org.apache.ignite.configuration.DataStorageConfiguration; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.configuration.WALMode; |
| import org.apache.ignite.events.EventType; |
| import org.apache.ignite.events.WalSegmentArchivedEvent; |
| import org.apache.ignite.internal.GridKernalContext; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.IgniteInterruptedCheckedException; |
| import org.apache.ignite.internal.IgnitionEx; |
| import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager; |
| import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; |
| import org.apache.ignite.internal.pagemem.wal.StorageException; |
| import org.apache.ignite.internal.pagemem.wal.WALIterator; |
| import org.apache.ignite.internal.pagemem.wal.WALPointer; |
| import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord; |
| import org.apache.ignite.internal.pagemem.wal.record.MarshalledRecord; |
| import org.apache.ignite.internal.pagemem.wal.record.SwitchSegmentRecord; |
| import org.apache.ignite.internal.pagemem.wal.record.WALRecord; |
| import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; |
| import org.apache.ignite.internal.processors.cache.GridCacheSharedManagerAdapter; |
| import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl; |
| import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; |
| import org.apache.ignite.internal.processors.cache.persistence.file.FileIO; |
| import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory; |
| import org.apache.ignite.internal.processors.cache.persistence.filename.PdsFolderSettings; |
| import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32; |
| import org.apache.ignite.internal.processors.cache.persistence.wal.record.HeaderRecord; |
| import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializer; |
| import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactory; |
| import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordSerializerFactoryImpl; |
| import org.apache.ignite.internal.processors.cache.persistence.wal.serializer.RecordV1Serializer; |
| import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; |
| import org.apache.ignite.internal.processors.timeout.GridTimeoutProcessor; |
| import org.apache.ignite.internal.util.GridUnsafe; |
| import org.apache.ignite.internal.util.future.GridFinishedFuture; |
| import org.apache.ignite.internal.util.future.GridFutureAdapter; |
| import org.apache.ignite.internal.util.typedef.CIX1; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.internal.S; |
| import org.apache.ignite.internal.util.typedef.internal.SB; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteBiTuple; |
| import org.apache.ignite.lang.IgnitePredicate; |
| import org.apache.ignite.lang.IgniteUuid; |
| import org.jetbrains.annotations.NotNull; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static java.nio.file.StandardOpenOption.CREATE; |
| import static java.nio.file.StandardOpenOption.READ; |
| import static java.nio.file.StandardOpenOption.WRITE; |
| import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAL_SERIALIZER_VERSION; |
| |
| /** |
| * File WAL manager. |
| */ |
| public class FileWriteAheadLogManager extends GridCacheSharedManagerAdapter implements IgniteWriteAheadLogManager { |
    /** Shared immutable empty array of {@link FileDescriptor}; avoids allocating for empty results. */
    public static final FileDescriptor[] EMPTY_DESCRIPTORS = new FileDescriptor[0];

    /** File extension of WAL segment files. */
    public static final String WAL_SEGMENT_FILE_EXT = ".wal";

    /** 1 MB buffer of zeroes; presumably used to pre-fill newly created segment files — TODO confirm against usage. */
    private static final byte[] FILL_BUF = new byte[1024 * 1024];

    /** Pattern for segment file names: 16 decimal digits followed by {@link #WAL_SEGMENT_FILE_EXT}. */
    private static final Pattern WAL_NAME_PATTERN = Pattern.compile("\\d{16}\\.wal");

    /** Pattern for temporary segment file names (segment still being prepared). */
    private static final Pattern WAL_TEMP_NAME_PATTERN = Pattern.compile("\\d{16}\\.wal\\.tmp");

    /** WAL segment file filter, see {@link #WAL_NAME_PATTERN} */
    public static final FileFilter WAL_SEGMENT_FILE_FILTER = new FileFilter() {
        @Override public boolean accept(File file) {
            return !file.isDirectory() && WAL_NAME_PATTERN.matcher(file.getName()).matches();
        }
    };

    /** Filter accepting temporary WAL segment files, see {@link #WAL_TEMP_NAME_PATTERN}. */
    private static final FileFilter WAL_SEGMENT_TEMP_FILE_FILTER = new FileFilter() {
        @Override public boolean accept(File file) {
            return !file.isDirectory() && WAL_TEMP_NAME_PATTERN.matcher(file.getName()).matches();
        }
    };

    /** Pattern for compacted (zipped) segment file names. */
    private static final Pattern WAL_SEGMENT_FILE_COMPACTED_PATTERN = Pattern.compile("\\d{16}\\.wal\\.zip");

    /** Filter accepting both raw and compacted WAL segment files, see {@link #WAL_NAME_PATTERN} */
    public static final FileFilter WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER = new FileFilter() {
        @Override public boolean accept(File file) {
            return !file.isDirectory() && (WAL_NAME_PATTERN.matcher(file.getName()).matches() ||
                WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches());
        }
    };

    /** Pattern for temporary compacted segment file names (compaction in progress). */
    private static final Pattern WAL_SEGMENT_TEMP_FILE_COMPACTED_PATTERN = Pattern.compile("\\d{16}\\.wal\\.zip\\.tmp");

    /** Filter accepting compacted WAL segment files, see {@link #WAL_SEGMENT_FILE_COMPACTED_PATTERN}. */
    private static final FileFilter WAL_SEGMENT_FILE_COMPACTED_FILTER = new FileFilter() {
        @Override public boolean accept(File file) {
            return !file.isDirectory() && WAL_SEGMENT_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches();
        }
    };

    /** Filter accepting temporary compacted WAL segment files. */
    private static final FileFilter WAL_SEGMENT_TEMP_FILE_COMPACTED_FILTER = new FileFilter() {
        @Override public boolean accept(File file) {
            return !file.isDirectory() && WAL_SEGMENT_TEMP_FILE_COMPACTED_PATTERN.matcher(file.getName()).matches();
        }
    };

    /** Latest serializer version to use. */
    private static final int LATEST_SERIALIZER_VERSION = 2;

    /** Whether full page records are always written, see {@link DataStorageConfiguration#isAlwaysWriteFullPages()}. */
    private final boolean alwaysWriteFullPages;

    /** WAL segment size in bytes */
    private final long maxWalSegmentSize;

    /** Configured WAL mode, see {@link WALMode}. */
    private final WALMode mode;

    /** Thread local byte buffer size, see {@link #tlb} */
    private final int tlbSize;

    /** WAL flush frequency. Makes sense only for {@link WALMode#BACKGROUND} log WALMode. */
    private final long flushFreq;

    /** Fsync delay, in nanoseconds (see {@link DataStorageConfiguration#getWalFsyncDelayNanos()}). */
    private final long fsyncDelay;

    /** Data storage configuration this manager was created from. */
    private final DataStorageConfiguration dsCfg;

    /** Events service */
    private final GridEventStorageManager evt;

    /** Ignite configuration; used to resolve work/archive directories. */
    private IgniteConfiguration igCfg;

    /** Persistence metrics tracker. */
    private DataStorageMetricsImpl metrics;

    /** WAL work directory (including consistent ID as subfolder); holds active segments. */
    private File walWorkDir;

    /** WAL archive directory (including consistent ID as subfolder) */
    private File walArchiveDir;

    /** Serializer of latest version, used to read header record and for write records */
    private RecordSerializer serializer;

    /** Serializer latest version to use. May be overridden via {@code IGNITE_WAL_SERIALIZER_VERSION} system property. */
    private final int serializerVersion =
        IgniteSystemProperties.getInteger(IGNITE_WAL_SERIALIZER_VERSION, LATEST_SERIALIZER_VERSION);

    /** Latest segment cleared by {@link #truncate(WALPointer)}. */
    private volatile long lastTruncatedArchiveIdx = -1L;

    /** Factory to provide I/O interfaces for read/write operations with files */
    private final FileIOFactory ioFactory;

    /** Updater for {@link #currentHnd}, used for verify there are no concurrent update for current log segment handle */
    private static final AtomicReferenceFieldUpdater<FileWriteAheadLogManager, FileWriteHandle> currentHndUpd =
        AtomicReferenceFieldUpdater.newUpdater(FileWriteAheadLogManager.class, FileWriteHandle.class, "currentHnd");

    /**
     * Thread local byte buffer for saving serialized WAL records chain, see {@link FileWriteHandle#head}.
     * Introduced to decrease number of buffers allocation.
     * Used only for record itself is shorter than {@link #tlbSize}.
     */
    private final ThreadLocal<ByteBuffer> tlb = new ThreadLocal<ByteBuffer>() {
        @Override protected ByteBuffer initialValue() {
            // Direct buffer in native byte order to avoid extra copies on file I/O.
            ByteBuffer buf = ByteBuffer.allocateDirect(tlbSize);

            buf.order(GridUnsafe.NATIVE_BYTE_ORDER);

            return buf;
        }
    };

    /** Archiver worker; moves completed segments from the work directory to the archive. */
    private volatile FileArchiver archiver;

    /** Compressor. Active only when WAL compaction is enabled. */
    private volatile FileCompressor compressor;

    /** Decompressor. Active only when WAL compaction is enabled. */
    private volatile FileDecompressor decompressor;

    /** Per-thread pointer of the last WAL record logged by that thread; default fsync target when none is given. */
    private final ThreadLocal<WALPointer> lastWALPtr = new ThreadLocal<>();

    /** Current log segment handle */
    private volatile FileWriteHandle currentHnd;

    /** Environment failure. Non-null once the WAL environment has been invalidated by an error. */
    private volatile Throwable envFailed;

    /**
     * Positive (non-0) value indicates WAL can be archived even if not complete<br>
     * See {@link DataStorageConfiguration#setWalAutoArchiveAfterInactivity(long)}<br>
     */
    private final long walAutoArchiveAfterInactivity;

    /**
     * Container with last WAL record logged timestamp.<br>
     * Zero value means there was no records logged to current segment, skip possible archiving for this case<br>
     * Value is filled only for case {@link #walAutoArchiveAfterInactivity} > 0<br>
     */
    private AtomicLong lastRecordLoggedMs = new AtomicLong();

    /**
     * Cancellable task for {@link WALMode#BACKGROUND}, should be cancelled at shutdown
     * Null for non background modes
     */
    @Nullable private volatile GridTimeoutProcessor.CancelableTask backgroundFlushSchedule;

    /**
     * Reference to the last added next archive timeout check object.
     * Null if mode is not enabled.
     * Should be cancelled at shutdown
     */
    @Nullable private volatile GridTimeoutObject nextAutoArchiveTimeoutObj;
| |
| /** |
| * @param ctx Kernal context. |
| */ |
| public FileWriteAheadLogManager(@NotNull final GridKernalContext ctx) { |
| igCfg = ctx.config(); |
| |
| DataStorageConfiguration dsCfg = igCfg.getDataStorageConfiguration(); |
| |
| assert dsCfg != null; |
| |
| this.dsCfg = dsCfg; |
| |
| maxWalSegmentSize = dsCfg.getWalSegmentSize(); |
| mode = dsCfg.getWalMode(); |
| tlbSize = dsCfg.getWalThreadLocalBufferSize(); |
| flushFreq = dsCfg.getWalFlushFrequency(); |
| fsyncDelay = dsCfg.getWalFsyncDelayNanos(); |
| alwaysWriteFullPages = dsCfg.isAlwaysWriteFullPages(); |
| ioFactory = dsCfg.getFileIOFactory(); |
| walAutoArchiveAfterInactivity = dsCfg.getWalAutoArchiveAfterInactivity(); |
| evt = ctx.event(); |
| } |
| |
    /** {@inheritDoc} */
    @Override public void start0() throws IgniteCheckedException {
        // WAL is maintained only on server nodes; client nodes keep no persistence.
        if (!cctx.kernalContext().clientNode()) {
            final PdsFolderSettings resolveFolders = cctx.kernalContext().pdsFolderResolver().resolveFolders();

            checkWalConfiguration();

            walWorkDir = initDirectory(
                dsCfg.getWalPath(),
                DataStorageConfiguration.DFLT_WAL_PATH,
                resolveFolders.folderName(),
                "write ahead log work directory"
            );

            walArchiveDir = initDirectory(
                dsCfg.getWalArchivePath(),
                DataStorageConfiguration.DFLT_WAL_ARCHIVE_PATH,
                resolveFolders.folderName(),
                "write ahead log archive directory"
            );

            serializer = new RecordSerializerFactoryImpl(cctx).createSerializer(serializerVersion);

            GridCacheDatabaseSharedManager dbMgr = (GridCacheDatabaseSharedManager)cctx.database();

            metrics = dbMgr.persistentStoreMetricsImpl();

            checkOrPrepareFiles();

            // tup = (min, max) absolute indices present in the archive; null if the archive is empty.
            IgniteBiTuple<Long, Long> tup = scanMinMaxArchiveIndices();

            // Everything before the minimal archived index is considered already truncated.
            lastTruncatedArchiveIdx = tup == null ? -1 : tup.get1() - 1;

            archiver = new FileArchiver(tup == null ? -1 : tup.get2());

            if (dsCfg.isWalCompactionEnabled()) {
                compressor = new FileCompressor();

                decompressor = new FileDecompressor();
            }

            if (mode != WALMode.NONE) {
                if (log.isInfoEnabled())
                    log.info("Started write-ahead log manager [mode=" + mode + ']');
            }
            else
                U.quietAndWarn(log, "Started write-ahead log manager in NONE mode, persisted data may be lost in " +
                    "a case of unexpected node failure. Make sure to deactivate the cluster before shutdown.");
        }
    }
| |
| /** |
| * @throws IgniteCheckedException if WAL store path is configured and archive path isn't (or vice versa) |
| */ |
| private void checkWalConfiguration() throws IgniteCheckedException { |
| if (dsCfg.getWalPath() == null ^ dsCfg.getWalArchivePath() == null) { |
| throw new IgniteCheckedException( |
| "Properties should be either both specified or both null " + |
| "[walStorePath = " + dsCfg.getWalPath() + |
| ", walArchivePath = " + dsCfg.getWalArchivePath() + "]" |
| ); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void stop0(boolean cancel) { |
| final GridTimeoutProcessor.CancelableTask schedule = backgroundFlushSchedule; |
| |
| if (schedule != null) |
| schedule.close(); |
| |
| final GridTimeoutObject timeoutObj = nextAutoArchiveTimeoutObj; |
| |
| if (timeoutObj != null) |
| cctx.time().removeTimeoutObject(timeoutObj); |
| |
| final FileWriteHandle currHnd = currentHandle(); |
| |
| try { |
| if (mode == WALMode.BACKGROUND) { |
| if (currHnd != null) |
| currHnd.flush((FileWALPointer)null, true); |
| } |
| |
| if (currHnd != null) |
| currHnd.close(false); |
| |
| if (archiver != null) |
| archiver.shutdown(); |
| |
| if (compressor != null) |
| compressor.shutdown(); |
| |
| if (decompressor != null) |
| decompressor.shutdown(); |
| } |
| catch (Exception e) { |
| U.error(log, "Failed to gracefully close WAL segment: " + currentHnd.fileIO, e); |
| } |
| } |
| |
    /** {@inheritDoc} */
    @Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException {
        if (log.isDebugEnabled())
            log.debug("Activated file write ahead log manager [nodeId=" + cctx.localNodeId() +
                " topVer=" + cctx.discovery().topologyVersionEx() + " ]");

        // Re-initialize directories, serializer and worker objects.
        start0();

        // Workers are created only on server nodes (see start0()).
        if (!cctx.kernalContext().clientNode()) {
            assert archiver != null;
            archiver.start();

            if (compressor != null)
                compressor.start();

            if (decompressor != null)
                decompressor.start();
        }
    }
| |
    /** {@inheritDoc} */
    @Override public void onDeActivate(GridKernalContext kctx) {
        if (log.isDebugEnabled())
            log.debug("DeActivate file write ahead log [nodeId=" + cctx.localNodeId() +
                " topVer=" + cctx.discovery().topologyVersionEx() + " ]");

        stop0(true);

        // Drop the handle so a subsequent activation resumes from a clean state.
        currentHnd = null;
    }
| |
    /** {@inheritDoc} */
    @Override public boolean isAlwaysWriteFullPages() {
        // Mirrors DataStorageConfiguration#isAlwaysWriteFullPages().
        return alwaysWriteFullPages;
    }
| |
    /** {@inheritDoc} */
    @Override public boolean isFullSync() {
        // Only the DEFAULT WAL mode is treated as full-sync here.
        return mode == WALMode.DEFAULT;
    }
| |
    /** {@inheritDoc} */
    @Override public void resumeLogging(WALPointer lastPtr) throws IgniteCheckedException {
        try {
            assert currentHnd == null;
            assert lastPtr == null || lastPtr instanceof FileWALPointer;

            FileWALPointer filePtr = (FileWALPointer)lastPtr;

            currentHnd = restoreWriteHandle(filePtr);

            // If the restored segment was written with a different serializer version,
            // roll over immediately so new records use the configured serializer.
            if (currentHnd.serializer.version() != serializer.version()) {
                if (log.isInfoEnabled())
                    log.info("Record serializer version change detected, will start logging with a new WAL record " +
                        "serializer to a new WAL segment [curFile=" + currentHnd + ", newVer=" + serializer.version() +
                        ", oldVer=" + currentHnd.serializer.version() + ']');

                rollOver(currentHnd);
            }

            // BACKGROUND mode flushes on a timer instead of on each fsync() call.
            if (mode == WALMode.BACKGROUND) {
                backgroundFlushSchedule = cctx.time().schedule(new Runnable() {
                    @Override public void run() {
                        doFlush();
                    }
                }, flushFreq, flushFreq);
            }

            if (walAutoArchiveAfterInactivity > 0)
                scheduleNextInactivityPeriodElapsedCheck();
        }
        catch (StorageException e) {
            throw new IgniteCheckedException(e);
        }
    }
| |
    /**
     * Schedules next check of inactivity period expired. Based on current record update timestamp.
     * At timeout method does check of inactivity period and schedules new launch.
     */
    private void scheduleNextInactivityPeriodElapsedCheck() {
        final long lastRecMs = lastRecordLoggedMs.get();

        // If nothing was logged yet (value <= 0), count inactivity from "now".
        final long nextPossibleAutoArchive = (lastRecMs <= 0 ? U.currentTimeMillis() : lastRecMs) + walAutoArchiveAfterInactivity;

        if (log.isDebugEnabled())
            log.debug("Schedule WAL rollover check at " + new Time(nextPossibleAutoArchive).toString());

        nextAutoArchiveTimeoutObj = new GridTimeoutObject() {
            private final IgniteUuid id = IgniteUuid.randomUuid();

            @Override public IgniteUuid timeoutId() {
                return id;
            }

            @Override public long endTime() {
                return nextPossibleAutoArchive;
            }

            @Override public void onTimeout() {
                if (log.isDebugEnabled())
                    log.debug("Checking if WAL rollover required (" + new Time(U.currentTimeMillis()).toString() + ")");

                checkWalRolloverRequiredDuringInactivityPeriod();

                // Re-arm: each fired timeout schedules the next check.
                scheduleNextInactivityPeriodElapsedCheck();
            }
        };
        cctx.time().addTimeoutObject(nextAutoArchiveTimeoutObj);
    }
| |
    /**
     * @return Serializer version in use (configurable via {@code IGNITE_WAL_SERIALIZER_VERSION}, defaults to latest).
     */
    public int serializerVersion() {
        return serializerVersion;
    }
| |
    /**
     * Checks if there was elapsed significant period of inactivity.
     * If WAL auto-archive is enabled using {@link #walAutoArchiveAfterInactivity} > 0 this method will activate
     * roll over by timeout<br>
     */
    private void checkWalRolloverRequiredDuringInactivityPeriod() {
        if (walAutoArchiveAfterInactivity <= 0)
            return; // feature not configured, nothing to do

        final long lastRecMs = lastRecordLoggedMs.get();

        if (lastRecMs == 0)
            return; //no records were logged to current segment, does not consider inactivity

        final long elapsedMs = U.currentTimeMillis() - lastRecMs;

        if (elapsedMs <= walAutoArchiveAfterInactivity)
            return; // not enough time elapsed since last write

        // CAS guards against a record write racing with this rollover decision:
        // if a write bumped the timestamp concurrently, skip the rollover.
        if (!lastRecordLoggedMs.compareAndSet(lastRecMs, 0))
            return; // record write occurred concurrently

        final FileWriteHandle handle = currentHandle();

        try {
            rollOver(handle);
        }
        catch (IgniteCheckedException e) {
            U.error(log, "Unable to perform segment rollover: " + e.getMessage(), e);

            // Invalidate the environment so writers observe the failure (see checkEnvironment() in log()).
            handle.invalidateEnvironment(e);
        }
    }
| |
| public static volatile boolean print = false; |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("TooBroadScope") |
| @Override public WALPointer log(WALRecord record) throws IgniteCheckedException, StorageException { |
| if (serializer == null || mode == WALMode.NONE) |
| return null; |
| |
| FileWriteHandle currWrHandle = currentHandle(); |
| |
| // Logging was not resumed yet. |
| if (currWrHandle == null) |
| return null; |
| |
| // Need to calculate record size first. |
| int size = serializer.size(record); |
| |
| if (print) |
| System.out.println("RECORD " + size + ": " + record.getClass().getSimpleName()); |
| |
| record.size(size); |
| |
| for (; ; currWrHandle = rollOver(currWrHandle)) { |
| WALPointer ptr = currWrHandle.addRecord(record); |
| |
| if (ptr != null) { |
| metrics.onWalRecordLogged(); |
| |
| lastWALPtr.set(ptr); |
| |
| if (walAutoArchiveAfterInactivity > 0) |
| lastRecordLoggedMs.set(U.currentTimeMillis()); |
| |
| return ptr; |
| } |
| |
| checkEnvironment(); |
| |
| if (isStopping()) |
| throw new IgniteCheckedException("Stopping."); |
| } |
| } |
| |
    /** {@inheritDoc} */
    @Override public void fsync(WALPointer ptr) throws IgniteCheckedException, StorageException {
        if (serializer == null || mode == WALMode.NONE)
            return;

        FileWriteHandle cur = currentHandle();

        // WAL manager was not started (client node).
        if (cur == null)
            return;

        // Default to the last pointer logged by this thread when none is given.
        FileWALPointer filePtr = (FileWALPointer)(ptr == null ? lastWALPtr.get() : ptr);

        boolean forceFlush = filePtr != null && filePtr.forceFlush();

        // In BACKGROUND mode flushing is left to the periodic task unless explicitly forced.
        if (mode == WALMode.BACKGROUND && !forceFlush)
            return;

        // LOG_ONLY (or a forced flush) only flushes buffered records, without a physical fsync.
        if (mode == WALMode.LOG_ONLY || forceFlush) {
            cur.flushOrWait(filePtr, false);

            return;
        }

        // No need to sync if was rolled over.
        if (filePtr != null && !cur.needFsync(filePtr))
            return;

        cur.fsync(filePtr, false);
    }
| |
    /** {@inheritDoc} */
    @Override public WALIterator replay(WALPointer start)
        throws IgniteCheckedException, StorageException {
        assert start == null || start instanceof FileWALPointer : "Invalid start pointer: " + start;

        FileWriteHandle hnd = currentHandle();

        FileWALPointer end = null;

        // Iteration is bounded by the current write position when logging is active.
        if (hnd != null)
            end = hnd.position();

        return new RecordsIterator(
            cctx,
            walWorkDir,
            walArchiveDir,
            (FileWALPointer)start,
            end,
            dsCfg,
            new RecordSerializerFactoryImpl(cctx),
            ioFactory,
            archiver,
            decompressor,
            log
        );
    }
| |
| /** {@inheritDoc} */ |
| @Override public boolean reserve(WALPointer start) throws IgniteCheckedException { |
| assert start != null && start instanceof FileWALPointer : "Invalid start pointer: " + start; |
| |
| if (mode == WALMode.NONE) |
| return false; |
| |
| FileArchiver archiver0 = archiver; |
| |
| if (archiver0 == null) |
| throw new IgniteCheckedException("Could not reserve WAL segment: archiver == null"); |
| |
| archiver0.reserve(((FileWALPointer)start).index()); |
| |
| if (!hasIndex(((FileWALPointer)start).index())) { |
| archiver0.release(((FileWALPointer)start).index()); |
| |
| return false; |
| } |
| |
| return true; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void release(WALPointer start) throws IgniteCheckedException { |
| assert start != null && start instanceof FileWALPointer : "Invalid start pointer: " + start; |
| |
| if (mode == WALMode.NONE) |
| return; |
| |
| FileArchiver archiver0 = archiver; |
| |
| if (archiver0 == null) |
| throw new IgniteCheckedException("Could not release WAL segment: archiver == null"); |
| |
| archiver0.release(((FileWALPointer)start).index()); |
| } |
| |
| /** |
| * @param absIdx Absolulte index to check. |
| * @return {@code true} if has this index. |
| */ |
| private boolean hasIndex(long absIdx) { |
| String segmentName = FileDescriptor.fileName(absIdx); |
| |
| String zipSegmentName = FileDescriptor.fileName(absIdx) + ".zip"; |
| |
| boolean inArchive = new File(walArchiveDir, segmentName).exists() || |
| new File(walArchiveDir, zipSegmentName).exists(); |
| |
| if (inArchive) |
| return true; |
| |
| if (absIdx <= lastArchivedIndex()) |
| return false; |
| |
| FileWriteHandle cur = currentHnd; |
| |
| return cur != null && cur.idx >= absIdx; |
| } |
| |
    /** {@inheritDoc} */
    @Override public int truncate(WALPointer ptr) {
        if (ptr == null)
            return 0;

        assert ptr instanceof FileWALPointer : ptr;

        // File pointer bound: older entries will be deleted from archive
        FileWALPointer fPtr = (FileWALPointer)ptr;

        FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER));

        int deleted = 0;

        FileArchiver archiver0 = archiver;

        for (FileDescriptor desc : descs) {
            // Do not delete reserved or locked segment and any segment after it.
            // (presumably descriptors are sorted ascending by index, so the early
            // return keeps every later segment intact — TODO confirm scan() ordering)
            if (archiver0 != null && archiver0.reserved(desc.idx))
                return deleted;

            // We need to leave at least one archived segment to correctly determine the archive index.
            if (desc.idx + 1 < fPtr.index()) {
                if (!desc.file.delete())
                    U.warn(log, "Failed to remove obsolete WAL segment (make sure the process has enough rights): " +
                        desc.file.getAbsolutePath());
                else
                    deleted++;

                // Bump up the oldest archive segment index.
                if (lastTruncatedArchiveIdx < desc.idx)
                    lastTruncatedArchiveIdx = desc.idx;
            }
        }

        return deleted;
    }
| |
    /** {@inheritDoc} */
    @Override public void allowCompressionUntil(WALPointer ptr) {
        // Compressor is null when WAL compaction is disabled.
        if (compressor != null)
            compressor.allowCompressionUntil(((FileWALPointer)ptr).index());
    }
| |
| /** {@inheritDoc} */ |
| @Override public int walArchiveSegments() { |
| long lastTruncated = lastTruncatedArchiveIdx; |
| |
| long lastArchived = archiver.lastArchivedAbsoluteIndex(); |
| |
| if (lastArchived == -1) |
| return 0; |
| |
| int res = (int)(lastArchived - lastTruncated); |
| |
| return res >= 0 ? res : 0; |
| } |
| |
    /** {@inheritDoc} */
    @Override public boolean reserved(WALPointer ptr) {
        FileWALPointer fPtr = (FileWALPointer)ptr;

        FileArchiver archiver0 = archiver;

        // No archiver (e.g. not started) means nothing can be reserved.
        return archiver0 != null && archiver0.reserved(fPtr.index());
    }
| |
| /** |
| * Lists files in archive directory and returns the index of last archived file. |
| * |
| * @return The absolute index of last archived file. |
| */ |
| private long lastArchivedIndex() { |
| long lastIdx = -1; |
| |
| for (File file : walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)) { |
| try { |
| long idx = Long.parseLong(file.getName().substring(0, 16)); |
| |
| lastIdx = Math.max(lastIdx, idx); |
| } |
| catch (NumberFormatException | IndexOutOfBoundsException ignore) { |
| |
| } |
| } |
| |
| return lastIdx; |
| } |
| |
    /**
     * Lists files in archive directory and returns the indices of least and last archived files.
     * In case of holes, first segment after last "hole" is considered as minimum.
     * Example: minimum(0, 1, 10, 11, 20, 21, 22) should be 20
     *
     * @return The absolute indices of min and max archived files, or {@code null} if the archive is empty.
     */
    private IgniteBiTuple<Long, Long> scanMinMaxArchiveIndices() {
        TreeSet<Long> archiveIndices = new TreeSet<>();

        // NOTE(review): listFiles() may return null (missing dir / I/O error), which would NPE
        // here — verify the archive directory is guaranteed to exist at this point.
        for (File file : walArchiveDir.listFiles(WAL_SEGMENT_COMPACTED_OR_RAW_FILE_FILTER)) {
            try {
                long idx = Long.parseLong(file.getName().substring(0, 16));

                archiveIndices.add(idx);
            }
            catch (NumberFormatException | IndexOutOfBoundsException ignore) {
                // No-op.
            }
        }

        if (archiveIndices.isEmpty())
            return null;
        else {
            Long min = archiveIndices.first();
            Long max = archiveIndices.last();

            // Contiguous range — no holes to account for.
            if (max - min == archiveIndices.size() - 1)
                return F.t(min, max); // Short path.

            // Scan top-down for the start of the topmost contiguous run (first index whose
            // predecessor is missing).
            for (Long idx : archiveIndices.descendingSet()) {
                if (!archiveIndices.contains(idx - 1))
                    return F.t(idx, max);
            }

            throw new IllegalStateException("Should never happen if TreeSet is valid.");
        }
    }
| |
    /**
     * Creates a directory specified by the given arguments.
     *
     * @param cfg Configured directory path, may be {@code null}.
     * @param defDir Default directory path, will be used if cfg is {@code null}.
     * @param consId Local node consistent ID.
     * @param msg File description to print out on successful initialization.
     * @return Initialized directory.
     * @throws IgniteCheckedException If failed to initialize directory.
     */
    private File initDirectory(String cfg, String defDir, String consId, String msg) throws IgniteCheckedException {
        File dir;

        if (cfg != null) {
            File workDir0 = new File(cfg);

            // Absolute paths are used as-is; relative ones are resolved against the Ignite work directory.
            dir = workDir0.isAbsolute() ?
                new File(workDir0, consId) :
                new File(U.resolveWorkDirectory(igCfg.getWorkDirectory(), cfg, false), consId);
        }
        else
            dir = new File(U.resolveWorkDirectory(igCfg.getWorkDirectory(), defDir, false), consId);

        U.ensureDirectory(dir, msg, log);

        return dir;
    }
| |
    /**
     * @return Current log segment handle; {@code null} if logging was not resumed yet.
     */
    private FileWriteHandle currentHandle() {
        return currentHnd;
    }
| |
    /**
     * Rolls the WAL over to a new segment: closes the current handle and atomically installs the next one.
     *
     * @param cur Handle that failed to fit the given entry.
     * @return Handle that will fit the entry.
     * @throws StorageException If an I/O error occurred while switching segments.
     * @throws IgniteCheckedException If failed.
     */
    private FileWriteHandle rollOver(FileWriteHandle cur) throws StorageException, IgniteCheckedException {
        FileWriteHandle hnd = currentHandle();

        // Another thread already performed the rollover.
        if (hnd != cur)
            return hnd;

        // Only the thread that wins close(true) performs the actual switch.
        if (hnd.close(true)) {
            FileWriteHandle next = initNextWriteHandle(cur.idx);

            boolean swapped = currentHndUpd.compareAndSet(this, hnd, next);

            assert swapped : "Concurrent updates on rollover are not allowed";

            // Fresh segment — reset the inactivity-tracking timestamp.
            if (walAutoArchiveAfterInactivity > 0)
                lastRecordLoggedMs.set(0);

            // Let other threads to proceed with new segment.
            hnd.signalNextAvailable();
        }
        else
            hnd.awaitNext();

        return currentHandle();
    }
| |
    /**
     * @param lastReadPtr Last read WAL file pointer.
     * @return Initialized file write handle.
     * @throws IgniteCheckedException If failed to initialize WAL write handle.
     */
    private FileWriteHandle restoreWriteHandle(FileWALPointer lastReadPtr) throws IgniteCheckedException {
        long absIdx = lastReadPtr == null ? 0 : lastReadPtr.index();

        // The work directory keeps a fixed ring of segments; map the absolute index to a slot.
        long segNo = absIdx % dsCfg.getWalSegments();

        File curFile = new File(walWorkDir, FileDescriptor.fileName(segNo));

        int offset = lastReadPtr == null ? 0 : lastReadPtr.fileOffset();
        int len = lastReadPtr == null ? 0 : lastReadPtr.length();

        try {
            FileIO fileIO = ioFactory.create(curFile);

            try {
                int serVer = serializerVersion;

                // If we have existing segment, try to read version from it.
                if (lastReadPtr != null) {
                    try {
                        serVer = readSerializerVersionAndCompactedFlag(fileIO).get1();
                    }
                    catch (SegmentEofException | EOFException ignore) {
                        // Header absent or truncated: fall back to the configured version.
                        serVer = serializerVersion;
                    }
                }

                RecordSerializer ser = new RecordSerializerFactoryImpl(cctx).createSerializer(serVer);

                if (log.isInfoEnabled())
                    log.info("Resuming logging to WAL segment [file=" + curFile.getAbsolutePath() +
                        ", offset=" + offset + ", ver=" + serVer + ']');

                // Resume writing immediately after the last successfully read record.
                FileWriteHandle hnd = new FileWriteHandle(
                    fileIO,
                    absIdx,
                    cctx.igniteInstanceName(),
                    offset + len,
                    maxWalSegmentSize,
                    ser);

                // For new handle write serializer version to it.
                if (lastReadPtr == null)
                    hnd.writeSerializerVersion();

                archiver.currentWalIndex(absIdx);

                return hnd;
            }
            catch (IgniteCheckedException | IOException e) {
                // Do not leak the file handle on failure.
                fileIO.close();

                throw e;
            }
        }
        catch (IOException e) {
            throw new IgniteCheckedException("Failed to restore WAL write handle: " + curFile.getAbsolutePath(), e);
        }
    }
| |
| /** |
| * Fills the file header for a new segment. |
| * Calling this method signals we are done with the segment and it can be archived. |
| * If we don't have prepared file yet and achiever is busy this method blocks |
| * |
| * @param curIdx current absolute segment released by WAL writer |
| * @return Initialized file handle. |
| * @throws StorageException If IO exception occurred. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private FileWriteHandle initNextWriteHandle(long curIdx) throws StorageException, IgniteCheckedException { |
| try { |
| File nextFile = pollNextFile(curIdx); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Switching to a new WAL segment: " + nextFile.getAbsolutePath()); |
| |
| FileIO fileIO = ioFactory.create(nextFile); |
| |
| FileWriteHandle hnd = new FileWriteHandle( |
| fileIO, |
| curIdx + 1, |
| cctx.igniteInstanceName(), |
| 0, |
| maxWalSegmentSize, |
| serializer); |
| |
| hnd.writeSerializerVersion(); |
| |
| return hnd; |
| } |
| catch (IOException e) { |
| throw new StorageException(e); |
| } |
| } |
| |
| /** |
| * Deletes temp files, creates and prepares new; Creates first segment if necessary |
| */ |
| private void checkOrPrepareFiles() throws IgniteCheckedException { |
| // Clean temp files. |
| { |
| File[] tmpFiles = walWorkDir.listFiles(WAL_SEGMENT_TEMP_FILE_FILTER); |
| |
| if (!F.isEmpty(tmpFiles)) { |
| for (File tmp : tmpFiles) { |
| boolean deleted = tmp.delete(); |
| |
| if (!deleted) |
| throw new IgniteCheckedException("Failed to delete previously created temp file " + |
| "(make sure Ignite process has enough rights): " + tmp.getAbsolutePath()); |
| } |
| } |
| } |
| |
| File[] allFiles = walWorkDir.listFiles(WAL_SEGMENT_FILE_FILTER); |
| |
| if (allFiles.length != 0 && allFiles.length > dsCfg.getWalSegments()) |
| throw new IgniteCheckedException("Failed to initialize wal (work directory contains " + |
| "incorrect number of segments) [cur=" + allFiles.length + ", expected=" + dsCfg.getWalSegments() + ']'); |
| |
| // Allocate the first segment synchronously. All other segments will be allocated by archiver in background. |
| if (allFiles.length == 0) { |
| File first = new File(walWorkDir, FileDescriptor.fileName(0)); |
| |
| createFile(first); |
| } |
| else |
| checkFiles(0, false, null); |
| } |
| |
| /** |
| * Clears the file with zeros. |
| * |
| * @param file File to format. |
| */ |
| private void formatFile(File file) throws IgniteCheckedException { |
| if (log.isDebugEnabled()) |
| log.debug("Formatting file [exists=" + file.exists() + ", file=" + file.getAbsolutePath() + ']'); |
| |
| try (FileIO fileIO = ioFactory.create(file, CREATE, READ, WRITE)) { |
| int left = dsCfg.getWalSegmentSize(); |
| |
| if (mode == WALMode.DEFAULT) { |
| while (left > 0) { |
| int toWrite = Math.min(FILL_BUF.length, left); |
| |
| fileIO.write(FILL_BUF, 0, toWrite); |
| |
| left -= toWrite; |
| } |
| |
| fileIO.force(); |
| } |
| else |
| fileIO.clear(); |
| } |
| catch (IOException e) { |
| throw new IgniteCheckedException("Failed to format WAL segment file: " + file.getAbsolutePath(), e); |
| } |
| } |
| |
| /** |
| * Creates a file atomically with temp file. |
| * |
| * @param file File to create. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void createFile(File file) throws IgniteCheckedException { |
| if (log.isDebugEnabled()) |
| log.debug("Creating new file [exists=" + file.exists() + ", file=" + file.getAbsolutePath() + ']'); |
| |
| File tmp = new File(file.getParent(), file.getName() + ".tmp"); |
| |
| formatFile(tmp); |
| |
| try { |
| Files.move(tmp.toPath(), file.toPath()); |
| } |
| catch (IOException e) { |
| throw new IgniteCheckedException("Failed to move temp file to a regular WAL segment file: " + |
| file.getAbsolutePath(), e); |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Created WAL segment [file=" + file.getAbsolutePath() + ", size=" + file.length() + ']'); |
| } |
| |
| /** |
| * Retrieves next available file to write WAL data, waiting |
| * if necessary for a segment to become available. |
| * |
| * @param curIdx Current absolute WAL segment index. |
| * @return File ready for use as new WAL segment. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private File pollNextFile(long curIdx) throws IgniteCheckedException { |
| // Signal to archiver that we are done with the segment and it can be archived. |
| long absNextIdx = archiver.nextAbsoluteSegmentIndex(curIdx); |
| |
| long segmentIdx = absNextIdx % dsCfg.getWalSegments(); |
| |
| return new File(walWorkDir, FileDescriptor.fileName(segmentIdx)); |
| } |
| |
| |
| /** |
| * @return Sorted WAL files descriptors. |
| */ |
| public static FileDescriptor[] scan(File[] allFiles) { |
| if (allFiles == null) |
| return EMPTY_DESCRIPTORS; |
| |
| FileDescriptor[] descs = new FileDescriptor[allFiles.length]; |
| |
| for (int i = 0; i < allFiles.length; i++) { |
| File f = allFiles[i]; |
| |
| descs[i] = new FileDescriptor(f); |
| } |
| |
| Arrays.sort(descs); |
| |
| return descs; |
| } |
| |
| /** |
| * @throws StorageException If environment is no longer valid and we missed a WAL write. |
| */ |
| private void checkEnvironment() throws StorageException { |
| if (envFailed != null) |
| throw new StorageException("Failed to flush WAL buffer (environment was invalidated by a " + |
| "previous error)", envFailed); |
| } |
| |
| /** |
| * File archiver operates on absolute segment indexes. For any given absolute segment index N we can calculate |
| * the work WAL segment: S(N) = N % dsCfg.walSegments. |
| * When a work segment is finished, it is given to the archiver. If the absolute index of last archived segment |
| * is denoted by A and the absolute index of next segment we want to write is denoted by W, then we can allow |
| * write to S(W) if W - A <= walSegments. <br> |
| * |
| * Monitor of current object is used for notify on: |
| * <ul> |
| * <li>exception occurred ({@link FileArchiver#cleanException}!=null)</li> |
| * <li>stopping thread ({@link FileArchiver#stopped}==true)</li> |
| * <li>current file index changed ({@link FileArchiver#curAbsWalIdx})</li> |
| * <li>last archived file index was changed ({@link FileArchiver#lastAbsArchivedIdx})</li> |
| * <li>some WAL index was removed from {@link FileArchiver#locked} map</li> |
| * </ul> |
| */ |
| private class FileArchiver extends Thread { |
| /** Exception which occurred during initial creation of files or during archiving WAL segment */ |
| private IgniteCheckedException cleanException; |
| |
| /** |
| * Absolute current segment index WAL Manager writes to. Guarded by <code>this</code>. |
| * Incremented during rollover. Also may be directly set if WAL is resuming logging after start. |
| */ |
| private long curAbsWalIdx = -1; |
| |
| /** Last archived file index (absolute, 0-based). Guarded by <code>this</code>. */ |
| private volatile long lastAbsArchivedIdx = -1; |
| |
| /** current thread stopping advice */ |
| private volatile boolean stopped; |
| |
| /** */ |
| private NavigableMap<Long, Integer> reserved = new TreeMap<>(); |
| |
| /** |
| * Maps absolute segment index to locks counter. Lock on segment protects from archiving segment and may |
| * come from {@link RecordsIterator} during WAL replay. Map itself is guarded by <code>this</code>. |
| */ |
| private Map<Long, Integer> locked = new HashMap<>(); |
| |
| /** |
| * |
| */ |
| private FileArchiver(long lastAbsArchivedIdx) { |
| super("wal-file-archiver%" + cctx.igniteInstanceName()); |
| |
| this.lastAbsArchivedIdx = lastAbsArchivedIdx; |
| } |
| |
| /** |
| * @return Last archived segment absolute index. |
| */ |
| private long lastArchivedAbsoluteIndex() { |
| return lastAbsArchivedIdx; |
| } |
| |
| /** |
| * @throws IgniteInterruptedCheckedException If failed to wait for thread shutdown. |
| */ |
| private void shutdown() throws IgniteInterruptedCheckedException { |
| synchronized (this) { |
| stopped = true; |
| |
| notifyAll(); |
| } |
| |
| U.join(this); |
| } |
| |
| /** |
| * @param curAbsWalIdx Current absolute WAL segment index. |
| */ |
| private void currentWalIndex(long curAbsWalIdx) { |
| synchronized (this) { |
| this.curAbsWalIdx = curAbsWalIdx; |
| |
| notifyAll(); |
| } |
| } |
| |
| /** |
| * @param absIdx Index for reservation. |
| */ |
| private synchronized void reserve(long absIdx) { |
| Integer cur = reserved.get(absIdx); |
| |
| if (cur == null) |
| reserved.put(absIdx, 1); |
| else |
| reserved.put(absIdx, cur + 1); |
| } |
| |
| /** |
| * Check if WAL segment locked or reserved |
| * |
| * @param absIdx Index for check reservation. |
| * @return {@code True} if index is reserved. |
| */ |
| private synchronized boolean reserved(long absIdx) { |
| return locked.containsKey(absIdx) || reserved.floorKey(absIdx) != null; |
| } |
| |
| /** |
| * @param absIdx Reserved index. |
| */ |
| private synchronized void release(long absIdx) { |
| Integer cur = reserved.get(absIdx); |
| |
| assert cur != null && cur >= 1 : cur; |
| |
| if (cur == 1) |
| reserved.remove(absIdx); |
| else |
| reserved.put(absIdx, cur - 1); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void run() { |
| try { |
| allocateRemainingFiles(); |
| } |
| catch (IgniteCheckedException e) { |
| synchronized (this) { |
| // Stop the thread and report to starter. |
| cleanException = e; |
| |
| notifyAll(); |
| |
| return; |
| } |
| } |
| |
| try { |
| synchronized (this) { |
| while (curAbsWalIdx == -1 && !stopped) |
| wait(); |
| |
| if (curAbsWalIdx != 0 && lastAbsArchivedIdx == -1) |
| changeLastArchivedIndexAndWakeupCompressor(curAbsWalIdx - 1); |
| } |
| |
| while (!Thread.currentThread().isInterrupted() && !stopped) { |
| long toArchive; |
| |
| synchronized (this) { |
| assert lastAbsArchivedIdx <= curAbsWalIdx : "lastArchived=" + lastAbsArchivedIdx + |
| ", current=" + curAbsWalIdx; |
| |
| while (lastAbsArchivedIdx >= curAbsWalIdx - 1 && !stopped) |
| wait(); |
| |
| toArchive = lastAbsArchivedIdx + 1; |
| } |
| |
| if (stopped) |
| break; |
| |
| try { |
| final SegmentArchiveResult res = archiveSegment(toArchive); |
| |
| synchronized (this) { |
| while (locked.containsKey(toArchive) && !stopped) |
| wait(); |
| |
| // Firstly, format working file |
| if (!stopped) |
| formatFile(res.getOrigWorkFile()); |
| |
| // Then increase counter to allow rollover on clean working file |
| changeLastArchivedIndexAndWakeupCompressor(toArchive); |
| |
| notifyAll(); |
| } |
| if (evt.isRecordable(EventType.EVT_WAL_SEGMENT_ARCHIVED)) |
| evt.record(new WalSegmentArchivedEvent(cctx.discovery().localNode(), |
| res.getAbsIdx(), res.getDstArchiveFile())); |
| } |
| catch (IgniteCheckedException e) { |
| synchronized (this) { |
| cleanException = e; |
| |
| notifyAll(); |
| } |
| } |
| } |
| } |
| catch (InterruptedException ignore) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| |
| /** |
| * @param idx Index. |
| */ |
| private void changeLastArchivedIndexAndWakeupCompressor(long idx) { |
| lastAbsArchivedIdx = idx; |
| |
| if (compressor != null) |
| compressor.onNextSegmentArchived(); |
| } |
| |
| /** |
| * Gets the absolute index of the next WAL segment available to write. |
| * Blocks till there are available file to write |
| * |
| * @param curIdx Current absolute index that we want to increment. |
| * @return Next index (curWalSegmIdx+1) when it is ready to be written. |
| * @throws IgniteCheckedException If failed (if interrupted or if exception occurred in the archiver thread). |
| */ |
| private long nextAbsoluteSegmentIndex(long curIdx) throws IgniteCheckedException { |
| try { |
| synchronized (this) { |
| if (cleanException != null) |
| throw cleanException; |
| |
| assert curIdx == curAbsWalIdx; |
| |
| curAbsWalIdx++; |
| |
| // Notify archiver thread. |
| notifyAll(); |
| |
| while (curAbsWalIdx - lastAbsArchivedIdx > dsCfg.getWalSegments() && cleanException == null) |
| wait(); |
| |
| return curAbsWalIdx; |
| } |
| } |
| catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| |
| throw new IgniteInterruptedCheckedException(e); |
| } |
| } |
| |
| /** |
| * @param absIdx Segment absolute index. |
| * @return {@code True} if can read, {@code false} if work segment |
| */ |
| @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext") |
| private boolean checkCanReadArchiveOrReserveWorkSegment(long absIdx) { |
| synchronized (this) { |
| if (lastAbsArchivedIdx >= absIdx) |
| return true; |
| |
| Integer cur = locked.get(absIdx); |
| |
| cur = cur == null ? 1 : cur + 1; |
| |
| locked.put(absIdx, cur); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Reserved work segment [absIdx=" + absIdx + ", pins=" + cur + ']'); |
| |
| return false; |
| } |
| } |
| |
| /** |
| * @param absIdx Segment absolute index. |
| */ |
| @SuppressWarnings("NonPrivateFieldAccessedInSynchronizedContext") |
| private void releaseWorkSegment(long absIdx) { |
| synchronized (this) { |
| Integer cur = locked.get(absIdx); |
| |
| assert cur != null && cur > 0; |
| |
| if (cur == 1) { |
| locked.remove(absIdx); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Fully released work segment (ready to archive) [absIdx=" + absIdx + ']'); |
| } |
| else { |
| locked.put(absIdx, cur - 1); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Partially released work segment [absIdx=" + absIdx + ", pins=" + (cur - 1) + ']'); |
| } |
| |
| notifyAll(); |
| } |
| } |
| |
| /** |
| * Moves WAL segment from work folder to archive folder. |
| * Temp file is used to do movement |
| * |
| * @param absIdx Absolute index to archive. |
| */ |
| private SegmentArchiveResult archiveSegment(long absIdx) throws IgniteCheckedException { |
| long segIdx = absIdx % dsCfg.getWalSegments(); |
| |
| File origFile = new File(walWorkDir, FileDescriptor.fileName(segIdx)); |
| |
| String name = FileDescriptor.fileName(absIdx); |
| |
| File dstTmpFile = new File(walArchiveDir, name + ".tmp"); |
| |
| File dstFile = new File(walArchiveDir, name); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Starting to copy WAL segment [absIdx=" + absIdx + ", segIdx=" + segIdx + |
| ", origFile=" + origFile.getAbsolutePath() + ", dstFile=" + dstFile.getAbsolutePath() + ']'); |
| |
| try { |
| Files.deleteIfExists(dstTmpFile.toPath()); |
| |
| Files.copy(origFile.toPath(), dstTmpFile.toPath()); |
| |
| Files.move(dstTmpFile.toPath(), dstFile.toPath()); |
| |
| if (mode == WALMode.DEFAULT) { |
| try (FileIO f0 = ioFactory.create(dstFile, CREATE, READ, WRITE)) { |
| f0.force(); |
| } |
| } |
| } |
| catch (IOException e) { |
| throw new IgniteCheckedException("Failed to archive WAL segment [" + |
| "srcFile=" + origFile.getAbsolutePath() + |
| ", dstFile=" + dstTmpFile.getAbsolutePath() + ']', e); |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Copied file [src=" + origFile.getAbsolutePath() + |
| ", dst=" + dstFile.getAbsolutePath() + ']'); |
| |
| return new SegmentArchiveResult(absIdx, origFile, dstFile); |
| } |
| |
| /** |
| * |
| */ |
| private boolean checkStop() { |
| return stopped; |
| } |
| |
| /** |
| * Background creation of all segments except first. First segment was created in main thread by |
| * {@link FileWriteAheadLogManager#checkOrPrepareFiles()} |
| */ |
| private void allocateRemainingFiles() throws IgniteCheckedException { |
| checkFiles(1, true, new IgnitePredicate<Integer>() { |
| @Override public boolean apply(Integer integer) { |
| return !checkStop(); |
| } |
| }); |
| } |
| } |
| |
| /** |
| * Responsible for compressing WAL archive segments. |
| * Also responsible for deleting raw copies of already compressed WAL archive segments if they are not reserved. |
| */ |
| private class FileCompressor extends Thread { |
| /** Current thread stopping advice. */ |
| private volatile boolean stopped; |
| |
| /** Last successfully compressed segment. */ |
| private volatile long lastCompressedIdx = -1L; |
| |
| /** All segments prior to this (inclusive) can be compressed. */ |
| private volatile long lastAllowedToCompressIdx = -1L; |
| |
| /** |
| * |
| */ |
| FileCompressor() { |
| super("wal-file-compressor%" + cctx.igniteInstanceName()); |
| } |
| |
| /** |
| * |
| */ |
| private void init() { |
| File[] toDel = walArchiveDir.listFiles(WAL_SEGMENT_TEMP_FILE_COMPACTED_FILTER); |
| |
| for (File f : toDel) { |
| if (stopped) |
| return; |
| |
| f.delete(); |
| } |
| |
| FileDescriptor[] alreadyCompressed = scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_COMPACTED_FILTER)); |
| |
| if (alreadyCompressed.length > 0) |
| lastCompressedIdx = alreadyCompressed[alreadyCompressed.length - 1].getIdx(); |
| } |
| |
| /** |
| * @param lastCpStartIdx Segment index to allow compression until (exclusively). |
| */ |
| synchronized void allowCompressionUntil(long lastCpStartIdx) { |
| lastAllowedToCompressIdx = lastCpStartIdx - 1; |
| |
| notify(); |
| } |
| |
| /** |
| * Callback for waking up compressor when new segment is archived. |
| */ |
| synchronized void onNextSegmentArchived() { |
| notify(); |
| } |
| |
| /** |
| * Pessimistically tries to reserve segment for compression in order to avoid concurrent truncation. |
| * Waits if there's no segment to archive right now. |
| */ |
| private long tryReserveNextSegmentOrWait() throws InterruptedException, IgniteCheckedException { |
| long segmentToCompress = lastCompressedIdx + 1; |
| |
| synchronized (this) { |
| while (segmentToCompress > Math.min(lastAllowedToCompressIdx, archiver.lastArchivedAbsoluteIndex())) { |
| wait(); |
| |
| if (stopped) |
| return -1; |
| } |
| } |
| |
| segmentToCompress = Math.max(segmentToCompress, lastTruncatedArchiveIdx + 1); |
| |
| boolean reserved = reserve(new FileWALPointer(segmentToCompress, 0, 0)); |
| |
| return reserved ? segmentToCompress : -1; |
| } |
| |
| /** |
| * |
| */ |
| private void deleteObsoleteRawSegments() { |
| FileDescriptor[] descs = scan(walArchiveDir.listFiles(WAL_SEGMENT_FILE_FILTER)); |
| |
| FileArchiver archiver0 = archiver; |
| |
| for (FileDescriptor desc : descs) { |
| // Do not delete reserved or locked segment and any segment after it. |
| if (archiver0 != null && archiver0.reserved(desc.idx)) |
| return; |
| |
| if (desc.idx < lastCompressedIdx) { |
| if (!desc.file.delete()) |
| U.warn(log, "Failed to remove obsolete WAL segment (make sure the process has enough rights): " + |
| desc.file.getAbsolutePath() + ", exists: " + desc.file.exists()); |
| } |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void run() { |
| init(); |
| |
| while (!Thread.currentThread().isInterrupted() && !stopped) { |
| try { |
| deleteObsoleteRawSegments(); |
| |
| long nextSegment = tryReserveNextSegmentOrWait(); |
| if (nextSegment == -1) |
| continue; |
| |
| File tmpZip = new File(walArchiveDir, FileDescriptor.fileName(nextSegment) + ".zip" + ".tmp"); |
| |
| File zip = new File(walArchiveDir, FileDescriptor.fileName(nextSegment) + ".zip"); |
| |
| File raw = new File(walArchiveDir, FileDescriptor.fileName(nextSegment)); |
| if (!Files.exists(raw.toPath())) |
| throw new IgniteCheckedException("WAL archive segment is missing: " + raw); |
| |
| compressSegmentToFile(nextSegment, raw, tmpZip); |
| |
| Files.move(tmpZip.toPath(), zip.toPath()); |
| |
| if (mode == WALMode.DEFAULT) { |
| try (FileIO f0 = ioFactory.create(zip, CREATE, READ, WRITE)) { |
| f0.force(); |
| } |
| } |
| |
| lastCompressedIdx = nextSegment; |
| } |
| catch (IgniteCheckedException | IOException e) { |
| U.error(log, "Unexpected error during WAL compression", e); |
| |
| FileWriteHandle handle = currentHandle(); |
| |
| if (handle != null) |
| handle.invalidateEnvironment(e); |
| } |
| catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| |
| /** |
| * @param nextSegment Next segment absolute idx. |
| * @param raw Raw file. |
| * @param zip Zip file. |
| */ |
| private void compressSegmentToFile(long nextSegment, File raw, File zip) |
| throws IOException, IgniteCheckedException { |
| int segmentSerializerVer; |
| |
| try (FileIO fileIO = ioFactory.create(raw)) { |
| IgniteBiTuple<Integer, Boolean> tup = FileWriteAheadLogManager.readSerializerVersionAndCompactedFlag(fileIO); |
| |
| segmentSerializerVer = tup.get1(); |
| } |
| |
| try (ZipOutputStream zos = new ZipOutputStream(new BufferedOutputStream(new FileOutputStream(zip)))) { |
| zos.putNextEntry(new ZipEntry("")); |
| |
| zos.write(prepareSerializerVersionBuffer(nextSegment, segmentSerializerVer, true).array()); |
| |
| final CIX1<WALRecord> appendToZipC = new CIX1<WALRecord>() { |
| @Override public void applyx(WALRecord record) throws IgniteCheckedException { |
| final MarshalledRecord marshRec = (MarshalledRecord)record; |
| |
| try { |
| zos.write(marshRec.buffer().array(), 0, marshRec.buffer().remaining()); |
| } |
| catch (IOException e) { |
| throw new IgniteCheckedException(e); |
| } |
| } |
| }; |
| |
| try (SingleSegmentLogicalRecordsIterator iter = new SingleSegmentLogicalRecordsIterator( |
| log, cctx, ioFactory, tlbSize, nextSegment, walArchiveDir, appendToZipC)) { |
| |
| while (iter.hasNextX()) |
| iter.nextX(); |
| } |
| } |
| finally { |
| release(new FileWALPointer(nextSegment, 0, 0)); |
| } |
| } |
| |
| /** |
| * @throws IgniteInterruptedCheckedException If failed to wait for thread shutdown. |
| */ |
| private void shutdown() throws IgniteInterruptedCheckedException { |
| synchronized (this) { |
| stopped = true; |
| |
| notifyAll(); |
| } |
| |
| U.join(this); |
| } |
| } |
| |
| /** |
| * Responsible for decompressing previously compressed segments of WAL archive if they are needed for replay. |
| */ |
| private class FileDecompressor extends Thread { |
| /** Current thread stopping advice. */ |
| private volatile boolean stopped; |
| |
| /** Decompression futures. */ |
| private Map<Long, GridFutureAdapter<Void>> decompressionFutures = new HashMap<>(); |
| |
| /** Segments queue. */ |
| private PriorityBlockingQueue<Long> segmentsQueue = new PriorityBlockingQueue<>(); |
| |
| /** Byte array for draining data. */ |
| private byte[] arr = new byte[tlbSize]; |
| |
| /** |
| * |
| */ |
| FileDecompressor() { |
| super("wal-file-decompressor%" + cctx.igniteInstanceName()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void run() { |
| while (!Thread.currentThread().isInterrupted() && !stopped) { |
| try { |
| long segmentToDecompress = segmentsQueue.take(); |
| |
| if (stopped) |
| break; |
| |
| File zip = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress) + ".zip"); |
| File unzipTmp = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress) + ".tmp"); |
| File unzip = new File(walArchiveDir, FileDescriptor.fileName(segmentToDecompress)); |
| |
| try (ZipInputStream zis = new ZipInputStream(new BufferedInputStream(new FileInputStream(zip))); |
| FileIO io = ioFactory.create(unzipTmp)) { |
| zis.getNextEntry(); |
| |
| int bytesRead; |
| while ((bytesRead = zis.read(arr)) > 0) |
| io.write(arr, 0, bytesRead); |
| } |
| |
| Files.move(unzipTmp.toPath(), unzip.toPath()); |
| |
| synchronized (this) { |
| decompressionFutures.remove(segmentToDecompress).onDone(); |
| } |
| } |
| catch (InterruptedException e){ |
| Thread.currentThread().interrupt(); |
| } |
| catch (IOException e) { |
| U.error(log, "Unexpected error during WAL decompression", e); |
| |
| FileWriteHandle handle = currentHandle(); |
| |
| if (handle != null) |
| handle.invalidateEnvironment(e); |
| } |
| } |
| } |
| |
| /** |
| * Asynchronously decompresses WAL segment which is present only in .zip file. |
| * |
| * @return Future which is completed once file is decompressed. |
| */ |
| synchronized IgniteInternalFuture<Void> decompressFile(long idx) { |
| if (decompressionFutures.containsKey(idx)) |
| return decompressionFutures.get(idx); |
| |
| File f = new File(walArchiveDir, FileDescriptor.fileName(idx)); |
| |
| if (f.exists()) |
| return new GridFinishedFuture<>(); |
| |
| segmentsQueue.put(idx); |
| |
| GridFutureAdapter<Void> res = new GridFutureAdapter<>(); |
| |
| decompressionFutures.put(idx, res); |
| |
| return res; |
| } |
| |
| /** |
| * @throws IgniteInterruptedCheckedException If failed to wait for thread shutdown. |
| */ |
| private void shutdown() throws IgniteInterruptedCheckedException { |
| synchronized (this) { |
| stopped = true; |
| |
| // Put fake -1 to wake thread from queue.take() |
| segmentsQueue.put(-1L); |
| } |
| |
| U.join(this); |
| } |
| } |
| |
| /** |
| * Validate files depending on {@link DataStorageConfiguration#getWalSegments()} and create if need. |
| * Check end when exit condition return false or all files are passed. |
| * |
| * @param startWith Start with. |
| * @param create Flag create file. |
| * @param p Predicate Exit condition. |
| * @throws IgniteCheckedException if validation or create file fail. |
| */ |
| private void checkFiles(int startWith, boolean create, IgnitePredicate<Integer> p) throws IgniteCheckedException { |
| for (int i = startWith; i < dsCfg.getWalSegments() && (p == null || (p != null && p.apply(i))); i++) { |
| File checkFile = new File(walWorkDir, FileDescriptor.fileName(i)); |
| |
| if (checkFile.exists()) { |
| if (checkFile.isDirectory()) |
| throw new IgniteCheckedException("Failed to initialize WAL log segment (a directory with " + |
| "the same name already exists): " + checkFile.getAbsolutePath()); |
| else if (checkFile.length() != dsCfg.getWalSegmentSize() && mode == WALMode.DEFAULT) |
| throw new IgniteCheckedException("Failed to initialize WAL log segment " + |
| "(WAL segment size change is not supported):" + checkFile.getAbsolutePath()); |
| } |
| else if (create) |
| createFile(checkFile); |
| } |
| } |
| |
| /** |
| * Reads record serializer version from provided {@code io} along with compacted flag. |
| * NOTE: Method mutates position of {@code io}. |
| * |
| * @param io I/O interface for file. |
| * @return Serializer version stored in the file. |
| * @throws IgniteCheckedException If failed to read serializer version. |
| */ |
| public static IgniteBiTuple<Integer, Boolean> readSerializerVersionAndCompactedFlag(FileIO io) |
| throws IgniteCheckedException, IOException { |
| try (ByteBufferExpander buf = new ByteBufferExpander(RecordV1Serializer.HEADER_RECORD_SIZE, ByteOrder.nativeOrder())) { |
| FileInput in = new FileInput(io, buf); |
| |
| in.ensure(RecordV1Serializer.HEADER_RECORD_SIZE); |
| |
| int recordType = in.readUnsignedByte(); |
| |
| if (recordType == WALRecord.RecordType.STOP_ITERATION_RECORD_TYPE) |
| throw new SegmentEofException("Reached logical end of the segment", null); |
| |
| WALRecord.RecordType type = WALRecord.RecordType.fromOrdinal(recordType - 1); |
| |
| if (type != WALRecord.RecordType.HEADER_RECORD) |
| throw new IOException("Can't read serializer version", null); |
| |
| // Read file pointer. |
| FileWALPointer ptr = RecordV1Serializer.readPosition(in); |
| |
| assert ptr.fileOffset() == 0 : "Header record should be placed at the beginning of file " + ptr; |
| |
| long hdrMagicNum = in.readLong(); |
| |
| boolean compacted; |
| if (hdrMagicNum == HeaderRecord.REGULAR_MAGIC) |
| compacted = false; |
| else if (hdrMagicNum == HeaderRecord.COMPACTED_MAGIC) |
| compacted = true; |
| else { |
| throw new IOException("Magic is corrupted [exp=" + U.hexLong(HeaderRecord.REGULAR_MAGIC) + |
| ", actual=" + U.hexLong(hdrMagicNum) + ']'); |
| } |
| |
| // Read serializer version. |
| int ver = in.readInt(); |
| |
| // Read and skip CRC. |
| in.readInt(); |
| |
| return new IgniteBiTuple<>(ver, compacted); |
| } |
| } |
| |
| /** |
| * Writes record serializer version to provided {@code io}. |
| * NOTE: Method mutates position of {@code io}. |
| * |
| * @param io I/O interface for file. |
| * @param idx Segment index. |
| * @param version Serializer version. |
| * @return I/O position after write version. |
| * @throws IOException If failed to write serializer version. |
| */ |
| public static long writeSerializerVersion(FileIO io, long idx, int version) throws IOException { |
| ByteBuffer buffer = prepareSerializerVersionBuffer(idx, version, false); |
| |
| do { |
| io.write(buffer); |
| } |
| while (buffer.hasRemaining()); |
| |
| // Flush |
| io.force(); |
| |
| return io.position(); |
| } |
| |
| /** |
| * @param idx Index. |
| * @param ver Version. |
| * @param compacted Compacted flag. |
| */ |
| @NotNull private static ByteBuffer prepareSerializerVersionBuffer(long idx, int ver, boolean compacted) { |
| ByteBuffer buf = ByteBuffer.allocate(RecordV1Serializer.HEADER_RECORD_SIZE); |
| buf.order(ByteOrder.nativeOrder()); |
| |
| // Write record type. |
| buf.put((byte) (WALRecord.RecordType.HEADER_RECORD.ordinal() + 1)); |
| |
| // Write position. |
| RecordV1Serializer.putPosition(buf, new FileWALPointer(idx, 0, 0)); |
| |
| // Place magic number. |
| buf.putLong(compacted ? HeaderRecord.COMPACTED_MAGIC : HeaderRecord.REGULAR_MAGIC); |
| |
| // Place serializer version. |
| buf.putInt(ver); |
| |
| // Place CRC if needed. |
| if (!RecordV1Serializer.SKIP_CRC) { |
| int curPos = buf.position(); |
| |
| buf.position(0); |
| |
| // This call will move buffer position to the end of the record again. |
| int crcVal = PureJavaCrc32.calcCrc32(buf, curPos); |
| |
| buf.putInt(crcVal); |
| } |
| else |
| buf.putInt(0); |
| |
| // Write header record through io. |
| buf.position(0); |
| |
| return buf; |
| } |
| |
| /** |
| * WAL file descriptor. |
| */ |
| public static class FileDescriptor implements Comparable<FileDescriptor> { |
| /** */ |
| protected final File file; |
| |
| /** Absolute WAL segment file index */ |
| protected final long idx; |
| |
| /** |
| * Creates file descriptor. Index is restored from file name |
| * |
| * @param file WAL segment file. |
| */ |
| public FileDescriptor(@NotNull File file) { |
| this(file, null); |
| } |
| |
| /** |
| * @param file WAL segment file. |
| * @param idx Absolute WAL segment file index. For null value index is restored from file name |
| */ |
| public FileDescriptor(@NotNull File file, @Nullable Long idx) { |
| this.file = file; |
| |
| String fileName = file.getName(); |
| |
| assert fileName.contains(WAL_SEGMENT_FILE_EXT); |
| |
| this.idx = idx == null ? Long.parseLong(fileName.substring(0, 16)) : idx; |
| } |
| |
| /** |
| * @param segment Segment index. |
| * @return Segment file name. |
| */ |
| public static String fileName(long segment) { |
| SB b = new SB(); |
| |
| String segmentStr = Long.toString(segment); |
| |
| for (int i = segmentStr.length(); i < 16; i++) |
| b.a('0'); |
| |
| b.a(segmentStr).a(WAL_SEGMENT_FILE_EXT); |
| |
| return b.toString(); |
| } |
| |
| /** |
| * @param segment Segment number as integer. |
| * @return Segment number as aligned string. |
| */ |
| private static String segmentNumber(long segment) { |
| SB b = new SB(); |
| |
| String segmentStr = Long.toString(segment); |
| |
| for (int i = segmentStr.length(); i < 16; i++) |
| b.a('0'); |
| |
| b.a(segmentStr); |
| |
| return b.toString(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int compareTo(FileDescriptor o) { |
| return Long.compare(idx, o.idx); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean equals(Object o) { |
| if (this == o) |
| return true; |
| |
| if (!(o instanceof FileDescriptor)) |
| return false; |
| |
| FileDescriptor that = (FileDescriptor)o; |
| |
| return idx == that.idx; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int hashCode() { |
| return (int)(idx ^ (idx >>> 32)); |
| } |
| |
| /** |
| * @return Absolute WAL segment file index |
| */ |
| public long getIdx() { |
| return idx; |
| } |
| |
| /** |
| * @return absolute pathname string of this file descriptor pathname. |
| */ |
| public String getAbsolutePath() { |
| return file.getAbsolutePath(); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private abstract static class FileHandle { |
| /** I/O interface for read/write operations with file */ |
| protected FileIO fileIO; |
| |
| /** Absolute WAL segment file index (incremental counter) */ |
| protected final long idx; |
| |
| /** */ |
| protected String gridName; |
| |
| /** |
| * @param fileIO I/O interface for read/write operations of FileHandle. |
| * @param idx Absolute WAL segment file index (incremental counter). |
| */ |
| private FileHandle(FileIO fileIO, long idx, String gridName) { |
| this.fileIO = fileIO; |
| this.idx = idx; |
| this.gridName = gridName; |
| } |
| } |
| |
| /** |
| * |
| */ |
| public static class ReadFileHandle extends FileHandle { |
| /** Entry serializer. */ |
| RecordSerializer ser; |
| |
| /** */ |
| FileInput in; |
| |
| /** |
| * <code>true</code> if this file handle came from work directory. |
| * <code>false</code> if this file handle came from archive directory. |
| */ |
| private boolean workDir; |
| |
| /** |
| * @param fileIO I/O interface for read/write operations of FileHandle. |
| * @param idx Absolute WAL segment file index (incremental counter). |
| * @param ser Entry serializer. |
| * @param in File input. |
| */ |
| ReadFileHandle( |
| FileIO fileIO, |
| long idx, |
| String gridName, |
| RecordSerializer ser, |
| FileInput in |
| ) { |
| super(fileIO, idx, gridName); |
| |
| this.ser = ser; |
| this.in = in; |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed to close the WAL segment file. |
| */ |
| public void close() throws IgniteCheckedException { |
| try { |
| fileIO.close(); |
| } |
| catch (IOException e) { |
| throw new IgniteCheckedException(e); |
| } |
| } |
| } |
| |
| /** |
| * File handle for one log segment. |
| */ |
| @SuppressWarnings("SignalWithoutCorrespondingAwait") |
| private class FileWriteHandle extends FileHandle { |
| /** */ |
| private final RecordSerializer serializer; |
| |
| /** See {@link FileWriteAheadLogManager#maxWalSegmentSize} */ |
| private final long maxSegmentSize; |
| |
| /** |
| * Accumulated WAL records chain. |
| * This reference points to latest WAL record. |
| * When writing records chain is iterated from latest to oldest (see {@link WALRecord#previous()}) |
| * Records from chain are saved into buffer in reverse order |
| */ |
| private final AtomicReference<WALRecord> head = new AtomicReference<>(); |
| |
| /** |
| * Position in current file after the end of last written record (incremented after file channel write |
| * operation) |
| */ |
| private volatile long written; |
| |
| /** */ |
| private volatile long lastFsyncPos; |
| |
| /** Stop guard to provide warranty that only one thread will be successful in calling {@link #close(boolean)}*/ |
| private final AtomicBoolean stop = new AtomicBoolean(false); |
| |
| /** */ |
| private final Lock lock = new ReentrantLock(); |
| |
| /** Condition activated each time writeBuffer() completes. Used to wait previously flushed write to complete */ |
| private final Condition writeComplete = lock.newCondition(); |
| |
| /** Condition for timed wait of several threads, see {@link DataStorageConfiguration#getWalFsyncDelayNanos()} */ |
| private final Condition fsync = lock.newCondition(); |
| |
| /** |
| * Next segment available condition. |
| * Protection from "spurious wakeup" is provided by predicate {@link #fileIO}=<code>null</code> |
| */ |
| private final Condition nextSegment = lock.newCondition(); |
| |
| /** |
| * @param fileIO I/O file interface to use |
| * @param idx Absolute WAL segment file index for easy access. |
| * @param pos Position. |
| * @param maxSegmentSize Max segment size. |
| * @param serializer Serializer. |
| * @throws IOException If failed. |
| */ |
| private FileWriteHandle( |
| FileIO fileIO, |
| long idx, |
| String gridName, |
| long pos, |
| long maxSegmentSize, |
| RecordSerializer serializer |
| ) throws IOException { |
| super(fileIO, idx, gridName); |
| |
| assert serializer != null; |
| |
| fileIO.position(pos); |
| |
| this.maxSegmentSize = maxSegmentSize; |
| this.serializer = serializer; |
| |
| head.set(new FakeRecord(new FileWALPointer(idx, (int)pos, 0), false)); |
| written = pos; |
| lastFsyncPos = pos; |
| } |
| |
| /** |
| * Write serializer version to current handle. |
| * NOTE: Method mutates {@code fileIO} position, written and lastFsyncPos fields. |
| * |
| * @throws IgniteCheckedException If fail to write serializer version. |
| */ |
| public void writeSerializerVersion() throws IgniteCheckedException { |
| try { |
| assert fileIO.position() == 0 : "Serializer version can be written only at the begin of file " + fileIO.position(); |
| |
| long updatedPosition = FileWriteAheadLogManager.writeSerializerVersion(fileIO, idx, serializer.version()); |
| |
| written = updatedPosition; |
| lastFsyncPos = updatedPosition; |
| head.set(new FakeRecord(new FileWALPointer(idx, (int)updatedPosition, 0), false)); |
| } |
| catch (IOException e) { |
| throw new IgniteCheckedException("Unable to write serializer version for segment " + idx, e); |
| } |
| } |
| |
| /** |
| * Checks if current head is a close fake record and returns {@code true} if so. |
| * |
| * @return {@code true} if current head is close record. |
| */ |
| private boolean stopped() { |
| return stopped(head.get()); |
| } |
| |
| /** |
| * @param record Record to check. |
| * @return {@code true} if the record is fake close record. |
| */ |
| private boolean stopped(WALRecord record) { |
| return record instanceof FakeRecord && ((FakeRecord)record).stop; |
| } |
| |
| /** |
| * @param rec Record to be added to record chain as new {@link #head} |
| * @return Pointer or null if roll over to next segment is required or already started by other thread. |
| * @throws StorageException If failed. |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Nullable private WALPointer addRecord(WALRecord rec) throws StorageException, IgniteCheckedException { |
| assert rec.size() > 0 || rec.getClass() == FakeRecord.class; |
| |
| boolean flushed = false; |
| |
| for (; ; ) { |
| WALRecord h = head.get(); |
| |
| long nextPos = nextPosition(h); |
| |
| if (nextPos + rec.size() >= maxSegmentSize || stopped(h)) { |
| // Can not write to this segment, need to switch to the next one. |
| return null; |
| } |
| |
| int newChainSize = h.chainSize() + rec.size(); |
| |
| if (newChainSize > tlbSize && !flushed) { |
| boolean res = h.previous() == null || flush(h, false); |
| |
| if (rec.size() > tlbSize) |
| flushed = res; |
| |
| continue; |
| } |
| |
| rec.chainSize(newChainSize); |
| rec.previous(h); |
| |
| FileWALPointer ptr = new FileWALPointer( |
| idx, |
| (int)nextPos, |
| rec.size(), |
| // We need to force checkpoint records into file in BACKGROUND WALMode. |
| mode == WALMode.BACKGROUND && rec instanceof CheckpointRecord); |
| |
| rec.position(ptr); |
| |
| if (head.compareAndSet(h, rec)) |
| return ptr; |
| } |
| } |
| |
| /** |
| * @param rec Record. |
| * @return Position for the next record. |
| */ |
| private long nextPosition(WALRecord rec) { |
| return recordOffset(rec) + rec.size(); |
| } |
| |
| /** |
| * Flush or wait for concurrent flush completion. |
| * |
| * @param ptr Pointer. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void flushOrWait(FileWALPointer ptr, boolean stop) throws IgniteCheckedException { |
| long expWritten; |
| |
| if (ptr != null) { |
| // If requested obsolete file index, it must be already flushed by close. |
| if (ptr.index() != idx) |
| return; |
| |
| expWritten = ptr.fileOffset(); |
| } |
| else // We read head position before the flush because otherwise we can get wrong position. |
| expWritten = recordOffset(head.get()); |
| |
| if (flush(ptr, stop)) |
| return; |
| else if (stop) { |
| FakeRecord fr = (FakeRecord)head.get(); |
| |
| assert fr.stop : "Invalid fake record on top of the queue: " + fr; |
| |
| expWritten = recordOffset(fr); |
| } |
| |
| // Spin-wait for a while before acquiring the lock. |
| for (int i = 0; i < 64; i++) { |
| if (written >= expWritten) |
| return; |
| } |
| |
| // If we did not flush ourselves then await for concurrent flush to complete. |
| lock.lock(); |
| |
| try { |
| while (written < expWritten && envFailed == null) |
| U.awaitQuiet(writeComplete); |
| } |
| finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** |
| * @param ptr Pointer. |
| * @return {@code true} If the flush really happened. |
| * @throws IgniteCheckedException If failed. |
| * @throws StorageException If failed. |
| */ |
| private boolean flush(FileWALPointer ptr, boolean stop) throws IgniteCheckedException, StorageException { |
| if (ptr == null) { // Unconditional flush. |
| for (; ; ) { |
| WALRecord expHead = head.get(); |
| |
| if (expHead.previous() == null) { |
| FakeRecord frHead = (FakeRecord)expHead; |
| |
| if (frHead.stop == stop || frHead.stop || |
| head.compareAndSet(expHead, new FakeRecord(frHead.position(), stop))) |
| return false; |
| } |
| |
| if (flush(expHead, stop)) |
| return true; |
| } |
| } |
| |
| assert ptr.index() == idx; |
| |
| for (; ; ) { |
| WALRecord h = head.get(); |
| |
| // If current chain begin position is greater than requested, then someone else flushed our changes. |
| if (chainBeginPosition(h) > ptr.fileOffset()) |
| return false; |
| |
| if (flush(h, stop)) |
| return true; // We are lucky. |
| } |
| } |
| |
| /** |
| * @param h Head of the chain. |
| * @return Chain begin position. |
| */ |
| private long chainBeginPosition(WALRecord h) { |
| return recordOffset(h) + h.size() - h.chainSize(); |
| } |
| |
| /** |
| * @param expHead Expected head of chain. If head was changed, flush is not performed in this thread |
| * @throws IgniteCheckedException If failed. |
| * @throws StorageException If failed. |
| */ |
| private boolean flush(WALRecord expHead, boolean stop) throws StorageException, IgniteCheckedException { |
| if (expHead.previous() == null) { |
| FakeRecord frHead = (FakeRecord)expHead; |
| |
| if (!stop || frHead.stop) // Protects from CASing terminal FakeRecord(true) to FakeRecord(false) |
| return false; |
| } |
| |
| // Fail-fast before CAS. |
| checkEnvironment(); |
| |
| if (!head.compareAndSet(expHead, new FakeRecord(new FileWALPointer(idx, (int)nextPosition(expHead), 0), stop))) |
| return false; |
| |
| if (expHead.chainSize() == 0) |
| return false; |
| |
| // At this point we grabbed the piece of WAL chain. |
| // Any failure in this code must invalidate the environment. |
| try { |
| // We can safely allow other threads to start building next chains while we are doing flush here. |
| ByteBuffer buf; |
| |
| boolean tmpBuf = false; |
| |
| if (expHead.chainSize() > tlbSize) { |
| buf = GridUnsafe.allocateBuffer(expHead.chainSize()); |
| |
| tmpBuf = true; // We need to manually release this temporary direct buffer. |
| } |
| else |
| buf = tlb.get(); |
| |
| try { |
| long pos = fillBuffer(buf, expHead); |
| |
| writeBuffer(pos, buf); |
| } |
| finally { |
| if (tmpBuf) |
| GridUnsafe.freeBuffer(buf); |
| } |
| |
| return true; |
| } |
| catch (Throwable e) { |
| invalidateEnvironment(e); |
| |
| // All workers waiting for a next segment must be woken up and stopped |
| signalNextAvailable(); |
| |
| throw e; |
| } |
| } |
| |
| /** |
| * Serializes WAL records chain to provided byte buffer |
| * |
| * @param buf Buffer, will be filled with records chain from end to beginning |
| * @param head Head of the chain to write to the buffer. |
| * @return Position in file for this buffer. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private long fillBuffer(ByteBuffer buf, WALRecord head) throws IgniteCheckedException { |
| final int limit = head.chainSize(); |
| |
| assert limit <= buf.capacity(); |
| |
| buf.rewind(); |
| buf.limit(limit); |
| |
| do { |
| buf.position(head.chainSize() - head.size()); |
| buf.limit(head.chainSize()); // Just to make sure that serializer works in bounds. |
| |
| try { |
| serializer.writeRecord(head, buf); |
| } |
| catch (RuntimeException e) { |
| throw new IllegalStateException("Failed to write record: " + head, e); |
| } |
| |
| assert !buf.hasRemaining() : "Reported record size is greater than actual: " + head; |
| |
| head = head.previous(); |
| } |
| while (head.previous() != null); |
| |
| assert head instanceof FakeRecord : head.getClass(); |
| |
| buf.rewind(); |
| buf.limit(limit); |
| |
| return recordOffset(head); |
| } |
| |
| /** |
| * Non-blocking check if this pointer needs to be sync'ed. |
| * |
| * @param ptr WAL pointer to check. |
| * @return {@code False} if this pointer has been already sync'ed. |
| */ |
| private boolean needFsync(FileWALPointer ptr) { |
| // If index has changed, it means that the log was rolled over and already sync'ed. |
| // If requested position is smaller than last sync'ed, it also means all is good. |
| // If position is equal, then our record is the last not synced. |
| return idx == ptr.index() && lastFsyncPos <= ptr.fileOffset(); |
| } |
| |
| /** |
| * @return Pointer to the end of the last written record (probably not fsync-ed). |
| */ |
| private FileWALPointer position() { |
| lock.lock(); |
| |
| try { |
| return new FileWALPointer(idx, (int)written, 0); |
| } |
| finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** |
| * @param ptr Pointer to sync. |
| * @throws StorageException If failed. |
| */ |
| private void fsync(FileWALPointer ptr, boolean stop) throws StorageException, IgniteCheckedException { |
| lock.lock(); |
| |
| try { |
| if (ptr != null) { |
| if (!needFsync(ptr)) |
| return; |
| |
| if (fsyncDelay > 0 && !stopped()) { |
| // Delay fsync to collect as many updates as possible: trade latency for throughput. |
| U.await(fsync, fsyncDelay, TimeUnit.NANOSECONDS); |
| |
| if (!needFsync(ptr)) |
| return; |
| } |
| } |
| |
| flushOrWait(ptr, stop); |
| |
| if (stopped()) |
| return; |
| |
| if (lastFsyncPos != written) { |
| assert lastFsyncPos < written; // Fsync position must be behind. |
| |
| boolean metricsEnabled = metrics.metricsEnabled(); |
| |
| long start = metricsEnabled ? System.nanoTime() : 0; |
| |
| try { |
| fileIO.force(); |
| } |
| catch (IOException e) { |
| throw new StorageException(e); |
| } |
| |
| lastFsyncPos = written; |
| |
| if (fsyncDelay > 0) |
| fsync.signalAll(); |
| |
| long end = metricsEnabled ? System.nanoTime() : 0; |
| |
| if (metricsEnabled) |
| metrics.onFsync(end - start); |
| } |
| } |
| finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** |
| * @return {@code true} If this thread actually closed the segment. |
| * @throws IgniteCheckedException If failed. |
| * @throws StorageException If failed. |
| */ |
| private boolean close(boolean rollOver) throws IgniteCheckedException, StorageException { |
| if (stop.compareAndSet(false, true)) { |
| lock.lock(); |
| |
| try { |
| flushOrWait(null, true); |
| |
| assert stopped() : "Segment is not closed after close flush: " + head.get(); |
| |
| try { |
| RecordSerializer backwardSerializer = new RecordSerializerFactoryImpl(cctx) |
| .createSerializer(serializerVersion); |
| |
| SwitchSegmentRecord segmentRecord = new SwitchSegmentRecord(); |
| |
| int switchSegmentRecSize = backwardSerializer.size(segmentRecord); |
| |
| if (rollOver && written < (maxSegmentSize - switchSegmentRecSize)) { |
| final ByteBuffer buf = ByteBuffer.allocate(switchSegmentRecSize); |
| |
| segmentRecord.position(new FileWALPointer(idx, (int)written, switchSegmentRecSize)); |
| backwardSerializer.writeRecord(segmentRecord, buf); |
| |
| buf.rewind(); |
| |
| int rem = buf.remaining(); |
| |
| while (rem > 0) { |
| int written0 = fileIO.write(buf, written); |
| |
| written += written0; |
| |
| rem -= written0; |
| } |
| } |
| |
| // Do the final fsync. |
| if (mode == WALMode.DEFAULT) { |
| fileIO.force(); |
| |
| lastFsyncPos = written; |
| } |
| |
| fileIO.close(); |
| } |
| catch (IOException e) { |
| throw new IgniteCheckedException(e); |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Closed WAL write handle [idx=" + idx + "]"); |
| |
| return true; |
| } |
| finally { |
| lock.unlock(); |
| } |
| } |
| else |
| return false; |
| } |
| |
| /** |
| * Signals next segment available to wake up other worker threads waiting for WAL to write |
| */ |
| private void signalNextAvailable() { |
| lock.lock(); |
| |
| try { |
| WALRecord rec = head.get(); |
| |
| if (envFailed == null) { |
| assert rec instanceof FakeRecord : "Expected head FakeRecord, actual head " |
| + (rec != null ? rec.getClass().getSimpleName() : "null"); |
| |
| assert written == lastFsyncPos || mode != WALMode.DEFAULT : |
| "fsync [written=" + written + ", lastFsync=" + lastFsyncPos + ']'; |
| } |
| |
| fileIO = null; |
| |
| nextSegment.signalAll(); |
| } |
| finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void awaitNext() throws IgniteCheckedException { |
| lock.lock(); |
| |
| try { |
| while (fileIO != null) |
| U.awaitQuiet(nextSegment); |
| } |
| finally { |
| lock.unlock(); |
| } |
| } |
| |
| /** |
| * @param pos Position in file to start write from. May be checked against actual position to wait previous |
| * writes to complete |
| * @param buf Buffer to write to file |
| * @throws StorageException If failed. |
| * @throws IgniteCheckedException If failed. |
| */ |
| @SuppressWarnings("TooBroadScope") |
| private void writeBuffer(long pos, ByteBuffer buf) throws StorageException, IgniteCheckedException { |
| boolean interrupted = false; |
| |
| lock.lock(); |
| |
| try { |
| assert fileIO != null : "Writing to a closed segment."; |
| |
| checkEnvironment(); |
| |
| long lastLogged = U.currentTimeMillis(); |
| |
| long logBackoff = 2_000; |
| |
| // If we were too fast, need to wait previous writes to complete. |
| while (written != pos) { |
| assert written < pos : "written = " + written + ", pos = " + pos; // No one can write further than we are now. |
| |
| // Permutation occurred between blocks write operations. |
| // Order of acquiring lock is not the same as order of write. |
| long now = U.currentTimeMillis(); |
| |
| if (now - lastLogged >= logBackoff) { |
| if (logBackoff < 60 * 60_000) |
| logBackoff *= 2; |
| |
| U.warn(log, "Still waiting for a concurrent write to complete [written=" + written + |
| ", pos=" + pos + ", lastFsyncPos=" + lastFsyncPos + ", stop=" + stop.get() + |
| ", actualPos=" + safePosition() + ']'); |
| |
| lastLogged = now; |
| } |
| |
| try { |
| writeComplete.await(2, TimeUnit.SECONDS); |
| } |
| catch (InterruptedException ignore) { |
| interrupted = true; |
| } |
| |
| checkEnvironment(); |
| } |
| |
| // Do the write. |
| int size = buf.remaining(); |
| |
| assert size > 0 : size; |
| |
| try { |
| assert written == fileIO.position(); |
| |
| do { |
| fileIO.write(buf); |
| } |
| while (buf.hasRemaining()); |
| |
| written += size; |
| |
| metrics.onWalBytesWritten(size); |
| |
| assert written == fileIO.position(); |
| } |
| catch (IOException e) { |
| invalidateEnvironmentLocked(e); |
| |
| throw new StorageException(e); |
| } |
| } |
| finally { |
| writeComplete.signalAll(); |
| |
| lock.unlock(); |
| |
| if (interrupted) |
| Thread.currentThread().interrupt(); |
| } |
| } |
| |
| /** |
| * @param e Exception to set as a cause for all further operations. |
| */ |
| private void invalidateEnvironment(Throwable e) { |
| lock.lock(); |
| |
| try { |
| invalidateEnvironmentLocked(e); |
| } |
| finally { |
| writeComplete.signalAll(); |
| |
| lock.unlock(); |
| } |
| } |
| |
| /** |
| * @param e Exception to set as a cause for all further operations. |
| */ |
| private void invalidateEnvironmentLocked(Throwable e) { |
| if (envFailed == null) { |
| envFailed = e; |
| |
| U.error(log, "IO error encountered while running WAL flush. All further operations will be failed and " + |
| "local node will be stopped.", e); |
| |
| new Thread() { |
| @Override public void run() { |
| IgnitionEx.stop(gridName, true, true); |
| } |
| }.start(); |
| } |
| } |
| |
| /** |
| * @return Safely reads current position of the file channel as String. Will return "null" if channel is null. |
| */ |
| private String safePosition() { |
| FileIO io = this.fileIO; |
| |
| if (io == null) |
| return "null"; |
| |
| try { |
| return String.valueOf(io.position()); |
| } |
| catch (IOException e) { |
| return "{Failed to read channel position: " + e.getMessage() + "}"; |
| } |
| } |
| } |
| |
| /** |
| * Gets WAL record offset relative to the WAL segment file beginning. |
| * |
| * @param rec WAL record. |
| * @return File offset. |
| */ |
| private static int recordOffset(WALRecord rec) { |
| FileWALPointer ptr = (FileWALPointer)rec.position(); |
| |
| assert ptr != null; |
| |
| return ptr.fileOffset(); |
| } |
| |
| /** |
| * Fake record is zero-sized record, which is not stored into file. |
| * Fake record is used for storing position in file {@link WALRecord#position()}. |
| * Fake record is allowed to have no previous record. |
| */ |
| private static final class FakeRecord extends WALRecord { |
| /** */ |
| private final boolean stop; |
| |
| /** |
| * @param pos Position. |
| */ |
| FakeRecord(FileWALPointer pos, boolean stop) { |
| position(pos); |
| |
| this.stop = stop; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public RecordType type() { |
| return null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public FileWALPointer position() { |
| return (FileWALPointer) super.position(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(FakeRecord.class, this, "super", super.toString()); |
| } |
| } |
| |
| /** |
| * Iterator over WAL-log. |
| */ |
| private static class RecordsIterator extends AbstractWalRecordsIterator { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| /** */ |
| private final File walWorkDir; |
| |
| /** */ |
| private final File walArchiveDir; |
| |
| /** */ |
| private final FileArchiver archiver; |
| |
| /** */ |
| private final FileDecompressor decompressor; |
| |
| /** */ |
| private final DataStorageConfiguration psCfg; |
| |
| /** Optional start pointer. */ |
| @Nullable |
| private FileWALPointer start; |
| |
| /** Optional end pointer. */ |
| @Nullable |
| private FileWALPointer end; |
| |
| /** |
| * @param cctx Shared context. |
| * @param walWorkDir WAL work dir. |
| * @param walArchiveDir WAL archive dir. |
| * @param start Optional start pointer. |
| * @param end Optional end pointer. |
| * @param psCfg Database configuration. |
| * @param serializerFactory Serializer factory. |
| * @param archiver Archiver. |
| * @param decompressor Decompressor. |
| *@param log Logger @throws IgniteCheckedException If failed to initialize WAL segment. |
| */ |
| private RecordsIterator( |
| GridCacheSharedContext cctx, |
| File walWorkDir, |
| File walArchiveDir, |
| @Nullable FileWALPointer start, |
| @Nullable FileWALPointer end, |
| DataStorageConfiguration psCfg, |
| @NotNull RecordSerializerFactory serializerFactory, |
| FileIOFactory ioFactory, |
| FileArchiver archiver, |
| FileDecompressor decompressor, |
| IgniteLogger log |
| ) throws IgniteCheckedException { |
| super(log, |
| cctx, |
| serializerFactory, |
| ioFactory, |
| psCfg.getWalRecordIteratorBufferSize()); |
| this.walWorkDir = walWorkDir; |
| this.walArchiveDir = walArchiveDir; |
| this.psCfg = psCfg; |
| this.archiver = archiver; |
| this.start = start; |
| this.end = end; |
| this.decompressor = decompressor; |
| |
| init(); |
| |
| advance(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected ReadFileHandle initReadHandle( |
| @NotNull FileDescriptor desc, |
| @Nullable FileWALPointer start |
| ) throws IgniteCheckedException, FileNotFoundException { |
| if (decompressor != null && !desc.file.exists()) { |
| FileDescriptor zipFile = new FileDescriptor( |
| new File(walArchiveDir, FileDescriptor.fileName(desc.getIdx()) + ".zip")); |
| |
| if (!zipFile.file.exists()) { |
| throw new FileNotFoundException("Both compressed and raw segment files are missing in archive " + |
| "[segmentIdx=" + desc.idx + "]"); |
| } |
| |
| decompressor.decompressFile(desc.idx).get(); |
| } |
| |
| return super.initReadHandle(desc, start); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void onClose() throws IgniteCheckedException { |
| super.onClose(); |
| |
| curRec = null; |
| |
| final ReadFileHandle handle = closeCurrentWalSegment(); |
| |
| if (handle != null && handle.workDir) |
| releaseWorkSegment(curWalSegmIdx); |
| |
| curWalSegmIdx = Integer.MAX_VALUE; |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed to initialize first file handle. |
| */ |
| private void init() throws IgniteCheckedException { |
| FileDescriptor[] descs = loadFileDescriptors(walArchiveDir); |
| |
| if (start != null) { |
| if (!F.isEmpty(descs)) { |
| if (descs[0].idx > start.index()) |
| throw new IgniteCheckedException("WAL history is too short " + |
| "[descs=" + Arrays.asList(descs) + ", start=" + start + ']'); |
| |
| for (FileDescriptor desc : descs) { |
| if (desc.idx == start.index()) { |
| curWalSegmIdx = start.index(); |
| |
| break; |
| } |
| } |
| |
| if (curWalSegmIdx == -1) { |
| long lastArchived = descs[descs.length - 1].idx; |
| |
| if (lastArchived > start.index()) |
| throw new IgniteCheckedException("WAL history is corrupted (segment is missing): " + start); |
| |
| // This pointer may be in work files because archiver did not |
| // copy the file yet, check that it is not too far forward. |
| curWalSegmIdx = start.index(); |
| } |
| } |
| else { |
| // This means that whole checkpoint history fits in one segment in WAL work directory. |
| // Will start from this index right away. |
| curWalSegmIdx = start.index(); |
| } |
| } |
| else |
| curWalSegmIdx = !F.isEmpty(descs) ? descs[0].idx : 0; |
| |
| curWalSegmIdx--; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Initialized WAL cursor [start=" + start + ", end=" + end + ", curWalSegmIdx=" + curWalSegmIdx + ']'); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected ReadFileHandle advanceSegment( |
| @Nullable final ReadFileHandle curWalSegment) throws IgniteCheckedException { |
| if (curWalSegment != null) { |
| curWalSegment.close(); |
| |
| if (curWalSegment.workDir) |
| releaseWorkSegment(curWalSegment.idx); |
| |
| } |
| |
| // We are past the end marker. |
| if (end != null && curWalSegmIdx + 1 > end.index()) |
| return null; //stop iteration |
| |
| curWalSegmIdx++; |
| |
| FileDescriptor fd; |
| |
| boolean readArchive = canReadArchiveOrReserveWork(curWalSegmIdx); |
| |
| if (readArchive) { |
| fd = new FileDescriptor(new File(walArchiveDir, |
| FileDescriptor.fileName(curWalSegmIdx))); |
| } |
| else { |
| long workIdx = curWalSegmIdx % psCfg.getWalSegments(); |
| |
| fd = new FileDescriptor( |
| new File(walWorkDir, FileDescriptor.fileName(workIdx)), |
| curWalSegmIdx); |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Reading next file [absIdx=" + curWalSegmIdx + ", file=" + fd.file.getAbsolutePath() + ']'); |
| |
| assert fd != null; |
| |
| ReadFileHandle nextHandle; |
| try { |
| nextHandle = initReadHandle(fd, start != null && curWalSegmIdx == start.index() ? start : null); |
| } |
| catch (FileNotFoundException e) { |
| if (readArchive) |
| throw new IgniteCheckedException("Missing WAL segment in the archive", e); |
| else |
| nextHandle = null; |
| } |
| |
| if (nextHandle != null) |
| nextHandle.workDir = !readArchive; |
| else |
| releaseWorkSegment(curWalSegmIdx); |
| |
| curRec = null; |
| return nextHandle; |
| } |
| |
| /** |
| * @param absIdx Absolute index to check. |
| * @return {@code True} if we can safely read the archive, {@code false} if the segment has not been archived |
| * yet. In this case the corresponding work segment is reserved (will not be deleted until release). |
| */ |
| private boolean canReadArchiveOrReserveWork(long absIdx) { |
| return archiver != null && archiver.checkCanReadArchiveOrReserveWorkSegment(absIdx); |
| } |
| |
| /** |
| * @param absIdx Absolute index to release. |
| */ |
| private void releaseWorkSegment(long absIdx) { |
| if (archiver != null) |
| archiver.releaseWorkSegment(absIdx); |
| } |
| } |
| |
| /** |
| * Flushes current file handle for {@link WALMode#BACKGROUND} WALMode. |
| * Called periodically from scheduler. |
| */ |
| private void doFlush() { |
| final FileWriteHandle hnd = currentHandle(); |
| try { |
| hnd.flush(hnd.head.get(), false); |
| } |
| catch (Exception e) { |
| U.warn(log, "Failed to flush WAL record queue", e); |
| } |
| } |
| } |