blob: 8000f5fc6510f98f18f31ea5612bb0d62e9dfb6c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.ToLongFunction;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.ignite.DataRegionMetricsProvider;
import org.apache.ignite.DataStorageMetrics;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteInterruptedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.CheckpointWriteOrder;
import org.apache.ignite.configuration.DataPageEvictionMode;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.LongJVMPauseDetector;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.mem.DirectMemoryProvider;
import org.apache.ignite.internal.mem.DirectMemoryRegion;
import org.apache.ignite.internal.metric.IoStatisticsHolderNoOp;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
import org.apache.ignite.internal.pagemem.store.PageStore;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.pagemem.wal.record.CacheState;
import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.MasterKeyChangeRecord;
import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord;
import org.apache.ignite.internal.pagemem.wal.record.MetastoreDataRecord;
import org.apache.ignite.internal.pagemem.wal.record.MvccDataEntry;
import org.apache.ignite.internal.pagemem.wal.record.MvccTxRecord;
import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
import org.apache.ignite.internal.pagemem.wal.record.RollbackRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.pagemem.wal.record.WalRecordCacheGroupAware;
import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.ExchangeActions;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxLog;
import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntry;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntryType;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointHistory;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.CheckpointMetricsTracker;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl;
import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionAllocationMap;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperation;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
import org.apache.ignite.internal.processors.compress.CompressionProcessor;
import org.apache.ignite.internal.processors.port.GridPortRecord;
import org.apache.ignite.internal.processors.query.GridQueryProcessor;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.GridCountDownCallback;
import org.apache.ignite.internal.util.GridMultiCollectionWrapper;
import org.apache.ignite.internal.util.GridReadOnlyArrayView;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.StripedExecutor;
import org.apache.ignite.internal.util.TimeBag;
import org.apache.ignite.internal.util.future.CountDownFuture;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridInClosure3X;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.mxbean.DataStorageMetricsMXBean;
import org.apache.ignite.thread.IgniteThread;
import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.apache.ignite.transactions.TransactionState;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedHashMap;
import static java.nio.file.StandardOpenOption.READ;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CHECKPOINT_READ_LOCK_TIMEOUT;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_JVM_PAUSE_DETECTOR_THRESHOLD;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_RECOVERY_SEMAPHORE_PERMITS;
import static org.apache.ignite.IgniteSystemProperties.getBoolean;
import static org.apache.ignite.IgniteSystemProperties.getInteger;
import static org.apache.ignite.cache.CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT;
import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
import static org.apache.ignite.failure.FailureType.SYSTEM_CRITICAL_OPERATION_TIMEOUT;
import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
import static org.apache.ignite.internal.LongJVMPauseDetector.DEFAULT_JVM_PAUSE_DETECTOR_THRESHOLD;
import static org.apache.ignite.internal.pagemem.PageIdUtils.partId;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CHECKPOINT_RECORD;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.MASTER_KEY_CHANGE_RECORD;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.METASTORE_DATA_RECORD;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.fromOrdinal;
import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED;
import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.LOCK_RELEASED;
import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.LOCK_TAKEN;
import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.MARKER_STORED_TO_DISK;
import static org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager.TMP_FILE_MATCHER;
import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.getType;
import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.getVersion;
import static org.apache.ignite.internal.util.IgniteUtils.checkpointBufferSize;
import static org.apache.ignite.internal.util.IgniteUtils.hexLong;
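/**
 * Database shared manager for persistent storage: owns the checkpointer, the checkpoint history and the
 * checkpoint marker directory, the file lock on the page store work directory and the metastorage lifecycle.
 */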
@SuppressWarnings({"unchecked", "NonPrivateFieldAccessedInSynchronizedContext"})
public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedManager implements CheckpointWriteProgressSupplier {
/** System property name read into {@code skipSync}; by its name, intended for tests only. */
public static final String IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC = "IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC";
/** System property name read into {@code skipCheckpointOnNodeStop} (defaults to {@code false}). */
public static final String IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP = "IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP";
/** Log read lock holders: if this system property is set, {@code start0()} wraps the checkpoint lock in a tracer. */
public static final String IGNITE_PDS_LOG_CP_READ_LOCK_HOLDERS = "IGNITE_PDS_LOG_CP_READ_LOCK_HOLDERS";
/** Name of the data region reserved for the meta store. */
public static final String METASTORE_DATA_REGION_NAME = "metastoreMemPlc";
/**
* Threshold to calculate limit for pages list on-heap caches.
* <p>
* Note: When a checkpoint is triggered, we need some amount of page memory to store pages list on-heap cache.
* If a checkpoint is triggered by "too many dirty pages" reason and pages list cache is rather big, we can get
* {@code IgniteOutOfMemoryException}. To prevent this, we can limit the total amount of cached page list buckets,
* assuming that checkpoint will be triggered if no more than 3/4 of pages will be marked as dirty (there will be
* at least 1/4 of clean pages) and each cached page list bucket can be stored to up to 2 pages (this value is not
* static, but depends on PagesCache.MAX_SIZE, so if PagesCache.MAX_SIZE > PagesListNodeIO#getCapacity it can take
* more than 2 pages). Also some amount of page memory is needed to store page list metadata.
*/
private static final double PAGE_LIST_CACHE_LIMIT_THRESHOLD = 0.1;
/** Skip sync: value of the {@link #IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC} system property. */
private final boolean skipSync = getBoolean(IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC);
/** WAL rebalance threshold from {@code IGNITE_PDS_WAL_REBALANCE_THRESHOLD}; defaults to 500 000. */
private final int walRebalanceThreshold = getInteger(IGNITE_PDS_WAL_REBALANCE_THRESHOLD, 500_000);
/** Value of property for throttling policy override. */
private final String throttlingPolicyOverride = IgniteSystemProperties.getString(
IgniteSystemProperties.IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED);
/** Whether to skip the checkpoint on node stop ({@link #IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP}, default {@code false}). */
private final boolean skipCheckpointOnNodeStop = getBoolean(IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP, false);
/** If {@code true}, {@code start0()} replaces {@link #checkpointLock} with a {@code U.ReentrantReadWriteLockTracer}. */
private final boolean logReadLockHolders = getBoolean(IGNITE_PDS_LOG_CP_READ_LOCK_HOLDERS);
/**
* Starting from this number of dirty pages in checkpoint, array will be sorted with
* {@link Arrays#parallelSort(Comparable[])} in case of {@link CheckpointWriteOrder#SEQUENTIAL}.
*/
private final int parallelSortThreshold = IgniteSystemProperties.getInteger(
IgniteSystemProperties.CHECKPOINT_PARALLEL_SORT_THRESHOLD, 512 * 1024);
/** Checkpoint lock hold count. */
private static final ThreadLocal<Integer> CHECKPOINT_LOCK_HOLD_COUNT = ThreadLocal.withInitial(() -> 0);
/** Assertion enabled. */
private static final boolean ASSERTION_ENABLED = GridCacheDatabaseSharedManager.class.desiredAssertionStatus();
/** Checkpoint file name pattern: {@code <number>-<id>-<START|END>.bin}. */
public static final Pattern CP_FILE_NAME_PATTERN = Pattern.compile("(\\d+)-(.*)-(START|END)\\.bin");
/** Name under which the data storage metrics MBean is registered. */
private static final String MBEAN_NAME = "DataStorageMetrics";
/** Group under which the data storage metrics MBean is registered. */
private static final String MBEAN_GROUP = "Persistent Store";
/** WAL marker prefix for meta store. */
private static final String WAL_KEY_PREFIX = "grp-wal-";
/** Prefix for meta store records which means that WAL was disabled globally for some group. */
private static final String WAL_GLOBAL_KEY_PREFIX = WAL_KEY_PREFIX + "disabled-";
/** Prefix for meta store records which means that WAL was disabled locally for some group. */
private static final String WAL_LOCAL_KEY_PREFIX = WAL_KEY_PREFIX + "local-disabled-";
/** Prefix for meta store records which means that checkpoint entry for some group is not applicable for WAL rebalance. */
private static final String CHECKPOINT_INAPPLICABLE_FOR_REBALANCE = "cp-wal-rebalance-inapplicable-";
/** Timeout between partition file destroy and checkpoint to handle it. */
private static final long PARTITION_DESTROY_CHECKPOINT_TIMEOUT = 30 * 1000; // 30 Seconds.
/** Thread name prefix for the checkpoint runner thread pool. */
private static final String CHECKPOINT_RUNNER_THREAD_PREFIX = "checkpoint-runner";
/** This number of threads will be created and used for parallel sorting. */
private static final int PARALLEL_SORT_THREADS = Math.min(Runtime.getRuntime().availableProcessors(), 8);
/** Checkpoint thread. Needs to be volatile because it is created in exchange worker. */
private volatile Checkpointer checkpointer;
/** Checkpointer thread instance. */
private volatile IgniteThread checkpointerThread;
/** For testing only. */
private volatile boolean checkpointsEnabled = true;
/** For testing only. */
private volatile GridFutureAdapter<Void> enableChangeApplied;
/** Checkpoint lock. */
ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();
/** Checkpoint frequency, taken from {@link DataStorageConfiguration#getCheckpointFrequency()} in the constructor. */
private long checkpointFreq;
/** Checkpoint history; created in {@code start0()} only on non-client nodes. */
private CheckpointHistory cpHistory;
/** File page store manager; set in {@code start0()} only on non-client nodes. */
private FilePageStoreManager storeMgr;
/** Checkpoint metadata directory ("cp"); contains the checkpoint start and end marker files. */
private File cpDir;
/** Flag named for printing checkpoint statistics (its use is not visible in this part of the file). */
private volatile boolean printCheckpointStats = true;
/** Database configuration. */
private final DataStorageConfiguration persistenceCfg;
/** Checkpoint listeners. */
private final Collection<DbCheckpointListener> lsnrs = new CopyOnWriteArrayList<>();
/** Stopping flag; reset to {@code false} at the end of {@code onDeActivate()}. */
private boolean stopping;
/**
* The position of last seen WAL pointer. Used for resuming logging from this pointer.
*
* If binary memory recovery is performed on node start, the checkpoint END pointer will store
* not the last WAL pointer and can't be used for resuming logging.
*/
private volatile WALPointer walTail;
/** Checkpoint runner thread pool. If {@code null}, tasks are to be run in a single thread. */
@Nullable private IgniteThreadPoolExecutor asyncRunner;
/** Thread local with buffers for the checkpoint threads. Each buffer represents one page for durable memory. */
private ThreadLocal<ByteBuffer> threadBuf;
/** Map from a cacheId to a future indicating that there is an in-progress index rebuild for the given cache. */
private final ConcurrentMap<Integer, GridFutureAdapter<Void>> idxRebuildFuts = new ConcurrentHashMap<>();
/**
* Lock holder for compatible folders mode. Null if lock holder was created at start node. <br>
* In this case lock is held on PDS resolver manager and it is not required to manage locking here.
*/
@Nullable private FileLockHolder fileLockHolder;
/** Lock wait time. */
private final long lockWaitTime;
/**
* Computed in the constructor: {@code true} when the WAL history size (if that parameter is used) or otherwise
* the max WAL archive size is not unlimited ({@code MAX_VALUE}); presumably enables WAL truncation on checkpoint finish.
*/
private final boolean truncateWalOnCpFinish;
/** Update counters and WAL pointers by group ID and partition ID; reserved for exchange, judging by the name. */
private Map</*grpId*/Integer, Map</*partId*/Integer, T2</*updCntr*/Long, WALPointer>>> reservedForExchange;
/** Update counters and WAL pointers by (group ID, partition ID); reserved for preloading, judging by the name. */
private final ConcurrentMap<T2</*grpId*/Integer, /*partId*/Integer>, T2</*updCntr*/Long, WALPointer>> reservedForPreloading = new ConcurrentHashMap<>();
/** Snapshot manager. */
private IgniteCacheSnapshotManager snapshotMgr;
/** Data storage metrics; also registered as the {@code DataStorageMetrics} MBean. */
private DataStorageMetricsImpl persStoreMetrics;
/** Counter for written checkpoint pages. Not null only if checkpoint is running. */
private volatile AtomicInteger writtenPagesCntr = null;
/** Counter for fsynced checkpoint pages. Not null only if checkpoint is running. */
private volatile AtomicInteger syncedPagesCntr = null;
/** Counter for evicted checkpoint pages. Not null only if checkpoint is running. */
private volatile AtomicInteger evictedPagesCntr = null;
/** Number of pages in current checkpoint at the beginning of checkpoint. */
private volatile int currCheckpointPagesCnt;
/**
* MetaStorage instance. Value {@code null} means storage not initialized yet.
* Guarded by {@link GridCacheDatabaseSharedManager#checkpointReadLock()}.
*/
private MetaStorage metaStorage;
/** Listeners notified when the metastorage becomes ready for read and for read-write. */
private List<MetastorageLifecycleListener> metastorageLifecycleLsnrs;
/** Initially globally WAL-disabled cache groups. */
private Collection<Integer> initiallyGlobalWalDisabledGrps = new HashSet<>();
/** Initially locally WAL-disabled cache groups. */
private Collection<Integer> initiallyLocalWalDisabledGrps = new HashSet<>();
/** File I/O factory for writing checkpoint markers. */
private final FileIOFactory ioFactory;
/** Timeout for checkpoint read lock acquisition in milliseconds. */
private volatile long checkpointReadLockTimeout;
/** Flag allows to log additional information about partitions during recovery phases. */
private final boolean recoveryVerboseLogging =
getBoolean(IgniteSystemProperties.IGNITE_RECOVERY_VERBOSE_LOGGING, false);
/** Pointer to a memory recovery record that should be included into the next checkpoint record. */
private volatile WALPointer memoryRecoveryRecordPtr;
/** Page list cache limits per data region. */
private final Map<String, AtomicLong> pageListCacheLimits = new ConcurrentHashMap<>();
/**
 * Creates the manager and captures persistence settings from the node's data storage configuration.
 *
 * @param ctx Kernal context.
 */
public GridCacheDatabaseSharedManager(GridKernalContext ctx) {
    IgniteConfiguration cfg = ctx.config();

    persistenceCfg = cfg.getDataStorageConfiguration();

    assert persistenceCfg != null;

    checkpointFreq = persistenceCfg.getCheckpointFrequency();

    if (persistenceCfg.isWalHistorySizeParameterUsed())
        truncateWalOnCpFinish = persistenceCfg.getWalHistorySize() != Integer.MAX_VALUE;
    else
        truncateWalOnCpFinish = persistenceCfg.getMaxWalArchiveSize() != Long.MAX_VALUE;

    lockWaitTime = persistenceCfg.getLockWaitTime();

    persStoreMetrics = new DataStorageMetricsImpl(
        ctx.metric(),
        persistenceCfg.isMetricsEnabled(),
        persistenceCfg.getMetricsRateTimeInterval(),
        persistenceCfg.getMetricsSubIntervalCount()
    );

    ioFactory = persistenceCfg.getFileIOFactory();

    // Default for the read lock timeout: explicit configuration value first, then the system worker blocked
    // timeout, then the failure detection timeout. The system property, if set, overrides all of them.
    Long cfgReadLockTimeout = persistenceCfg.getCheckpointReadLockTimeout();

    long dfltReadLockTimeout;

    if (cfgReadLockTimeout != null)
        dfltReadLockTimeout = cfgReadLockTimeout;
    else if (ctx.workersRegistry() != null)
        dfltReadLockTimeout = ctx.workersRegistry().getSystemWorkerBlockedTimeout();
    else
        dfltReadLockTimeout = ctx.config().getFailureDetectionTimeout();

    checkpointReadLockTimeout = IgniteSystemProperties.getLong(IGNITE_CHECKPOINT_READ_LOCK_TIMEOUT, dfltReadLockTimeout);
}
/**
 * Returns the file page store manager. It is assigned in {@code start0()} on non-client nodes only,
 * so the result may be {@code null}.
 *
 * @return File store manager.
 */
public FilePageStoreManager getFileStoreManager() {
    return this.storeMgr;
}
/**
 * Tells every registered metastorage lifecycle listener that the current {@code metaStorage} can be read.
 *
 * @throws IgniteCheckedException If a listener fails.
 */
private void notifyMetastorageReadyForRead() throws IgniteCheckedException {
    Iterator<MetastorageLifecycleListener> it = metastorageLifecycleLsnrs.iterator();

    while (it.hasNext())
        it.next().onReadyForRead(metaStorage);
}
/**
 * Tells every registered metastorage lifecycle listener that the current {@code metaStorage} can be read and written.
 *
 * @throws IgniteCheckedException If a listener fails.
 */
private void notifyMetastorageReadyForReadWrite() throws IgniteCheckedException {
    Iterator<MetastorageLifecycleListener> it = metastorageLifecycleLsnrs.iterator();

    while (it.hasNext())
        it.next().onReadyForReadWrite(metaStorage);
}
/**
 * Returns the checkpointer. It is created in {@code start0()} / {@code onActivate()} on non-client nodes only,
 * so the result may be {@code null}.
 *
 * @return Checkpointer.
 */
public Checkpointer getCheckpointer() {
    return this.checkpointer;
}
/**
 * Exposes the checkpointer thread. Intended for tests only.
 *
 * @return Checkpointer thread instance.
 */
public IgniteThread checkpointerThread() {
    return this.checkpointerThread;
}
/**
 * Enables or disables checkpoints and wakes up the checkpointer. Intended for tests only.
 *
 * @param enable {@code true} to enable checkpoints, {@code false} to disable them.
 * @return Future stored in {@code enableChangeApplied}; presumably completed once the change has been applied.
 */
public IgniteInternalFuture<Void> enableCheckpoints(boolean enable) {
    GridFutureAdapter<Void> appliedFut = new GridFutureAdapter<>();

    // Keep this order: the future is published before the volatile flag flips.
    enableChangeApplied = appliedFut;
    checkpointsEnabled = enable;

    wakeupForCheckpoint("enableCheckpoints()");

    return appliedFut;
}
/** {@inheritDoc} */
@Override protected void initDataRegions0(DataStorageConfiguration memCfg) throws IgniteCheckedException {
    super.initDataRegions0(memCfg);

    // Register one more region dedicated to the metastorage.
    DataRegionConfiguration metastoreRegionCfg = createMetastoreDataRegionConfig(memCfg);

    addDataRegion(memCfg, metastoreRegionCfg, false);

    persStoreMetrics.regionMetrics(memMetricsMap.values());
}
/**
 * Builds the configuration of the metastorage data region. It is always persistent and allocated eagerly,
 * and it is sized like the system region.
 *
 * @param storageCfg Data storage configuration.
 * @return Data region configuration.
 */
private DataRegionConfiguration createMetastoreDataRegionConfig(DataStorageConfiguration storageCfg) {
    DataRegionConfiguration regionCfg = new DataRegionConfiguration();

    regionCfg.setName(METASTORE_DATA_REGION_NAME);

    // Sizes are borrowed from the system region settings.
    regionCfg.setInitialSize(storageCfg.getSystemRegionInitialSize());
    regionCfg.setMaxSize(storageCfg.getSystemRegionMaxSize());

    regionCfg.setPersistenceEnabled(true);
    regionCfg.setLazyMemoryAllocation(false);

    return regionCfg;
}
/** {@inheritDoc} */
@Override protected void start0() throws IgniteCheckedException {
    super.start0();

    // Lazily create, per thread, one direct page-sized buffer in native byte order.
    threadBuf = ThreadLocal.withInitial(() -> {
        ByteBuffer pageBuf = ByteBuffer.allocateDirect(pageSize());

        pageBuf.order(ByteOrder.nativeOrder());

        return pageBuf;
    });

    snapshotMgr = cctx.snapshot();

    GridKernalContext kernalCtx = cctx.kernalContext();

    if (logReadLockHolders)
        checkpointLock = new U.ReentrantReadWriteLockTracer(checkpointLock, kernalCtx, 5_000);

    // Everything below is needed on server nodes only.
    if (kernalCtx.clientNode())
        return;

    kernalCtx.internalSubscriptionProcessor().registerDatabaseListener(new MetastorageRecoveryLifecycle());

    checkpointer = new Checkpointer(cctx.igniteInstanceName(), "db-checkpoint-thread", log);

    cpHistory = new CheckpointHistory(kernalCtx);

    IgnitePageStoreManager pageStore = cctx.pageStore();

    assert pageStore instanceof FilePageStoreManager : "Invalid page store manager was created: " + pageStore;

    storeMgr = (FilePageStoreManager)pageStore;

    cpDir = Paths.get(storeMgr.workDir().getAbsolutePath(), "cp").toFile();

    if (!U.mkdirs(cpDir))
        throw new IgniteCheckedException("Could not create directory for checkpoint metadata: " + cpDir);

    FileLockHolder preLocked = kernalCtx.pdsFolderResolver().resolveFolders().getLockedFileLockHolder();

    acquireFileLock(preLocked);

    cleanupTempCheckpointDirectory();

    persStoreMetrics.wal(cctx.wal());
}
/**
 * Deletes every temporary file (as matched by {@code TMP_FILE_MATCHER}) from the checkpoint directory.
 *
 * @throws IgniteCheckedException If the directory cannot be listed or a file cannot be deleted.
 */
@Override public void cleanupTempCheckpointDirectory() throws IgniteCheckedException {
    try (DirectoryStream<Path> tmpFiles = Files.newDirectoryStream(cpDir.toPath(), TMP_FILE_MATCHER::matches)) {
        for (Path tmpFile : tmpFiles)
            Files.delete(tmpFile);
    }
    catch (IOException e) {
        throw new IgniteCheckedException("Failed to cleanup checkpoint directory from temporary files: " + cpDir, e);
    }
}
/** {@inheritDoc} */
@Override public void cleanupRestoredCaches() {
    if (dataRegionMap.isEmpty())
        return;

    boolean mvccCacheFound = false;

    for (CacheGroupDescriptor grpDesc : cctx.cache().cacheGroupDescriptors().values()) {
        mvccCacheFound |= grpDesc.config().getAtomicityMode() == TRANSACTIONAL_SNAPSHOT;

        String regionName = grpDesc.config().getDataRegionName();

        // Groups without an explicit region live in the default one.
        DataRegion region = regionName == null ? dfltDataRegion : dataRegionMap.get(regionName);

        if (region == null)
            continue;

        if (log.isInfoEnabled())
            log.info("Page memory " + region.config().getName() + " for " + grpDesc + " has invalidated.");

        int partCnt = grpDesc.config().getAffinity().partitions();

        if (!(region.pageMemory() instanceof PageMemoryEx))
            continue;

        PageMemoryEx pageMem = (PageMemoryEx)region.pageMemory();

        for (int p = 0; p < partCnt; p++)
            pageMem.invalidate(grpDesc.groupId(), p);

        pageMem.invalidate(grpDesc.groupId(), PageIdAllocator.INDEX_PARTITION);
    }

    // The TX log is only invalidated when no MVCC cache group is present.
    if (!mvccCacheFound && dataRegionMap.containsKey(TxLog.TX_LOG_CACHE_NAME)) {
        PageMemory txLogMem = dataRegionMap.get(TxLog.TX_LOG_CACHE_NAME).pageMemory();

        if (txLogMem instanceof PageMemoryEx)
            ((PageMemoryEx)txLogMem).invalidate(TxLog.TX_LOG_CACHE_ID, PageIdAllocator.INDEX_PARTITION);
    }

    final boolean keepTxLog = mvccCacheFound;

    // Clean page stores of every group except the metastorage, and except the TX log when MVCC caches exist.
    storeMgr.cleanupPageStoreIfMatch(
        grpId -> MetaStorage.METASTORAGE_CACHE_ID != grpId && (TxLog.TX_LOG_CACHE_ID != grpId || !keepTxLog),
        true);
}
/** {@inheritDoc} */
@Override public void cleanupCheckpointDirectory() throws IgniteCheckedException {
    // Reset the in-memory history only if one exists (it is created in start0() on non-client nodes only).
    if (cpHistory != null)
        cpHistory = new CheckpointHistory(cctx.kernalContext());

    try (DirectoryStream<Path> cpFiles = Files.newDirectoryStream(cpDir.toPath())) {
        for (Path cpFile : cpFiles)
            Files.delete(cpFile);
    }
    catch (IOException e) {
        throw new IgniteCheckedException("Failed to cleanup checkpoint directory: " + cpDir, e);
    }
}
/**
 * Acquires the file lock on the page store work directory. Does nothing on client nodes.
 *
 * @param preLocked Lock holder obtained from the PDS folder resolver, or {@code null} to create a new holder
 *      for {@code storeMgr.workDir()}.
 * @throws IgniteCheckedException If locking fails.
 */
private void acquireFileLock(FileLockHolder preLocked) throws IgniteCheckedException {
    if (cctx.kernalContext().clientNode())
        return;

    if (preLocked != null)
        fileLockHolder = preLocked;
    else
        fileLockHolder = new FileLockHolder(storeMgr.workDir().getPath(), cctx.kernalContext(), log);

    // A pre-locked holder needs no further action.
    if (fileLockHolder.isLocked())
        return;

    if (log.isDebugEnabled())
        log.debug("Try to capture file lock [nodeId=" +
            cctx.localNodeId() + " path=" + fileLockHolder.lockPath() + "]");

    fileLockHolder.tryLock(lockWaitTime);
}
/**
 * Closes the file lock holder. Does nothing on client nodes or when no holder was created.
 */
private void releaseFileLock() {
    boolean nothingToRelease = cctx.kernalContext().clientNode() || fileLockHolder == null;

    if (nothingToRelease)
        return;

    if (log.isDebugEnabled())
        log.debug("Release file lock [nodeId=" +
            cctx.localNodeId() + " path=" + fileLockHolder.lockPath() + "]");

    fileLockHolder.close();
}
/**
 * Retrieves checkpoint history from the checkpoint directory ({@code cpDir}). Every file whose name matches
 * {@link #CP_FILE_NAME_PATTERN} is parsed; only START markers yield an entry.
 *
 * @return List of checkpoints in directory listing order, or an empty list if the directory does not exist.
 * @throws IgniteCheckedException If the directory cannot be read or a marker cannot be parsed.
 */
private List<CheckpointEntry> retreiveHistory() throws IgniteCheckedException {
    if (!cpDir.exists())
        return Collections.emptyList();

    DirectoryStream.Filter<Path> cpFileFilter =
        path -> CP_FILE_NAME_PATTERN.matcher(path.toFile().getName()).matches();

    try (DirectoryStream<Path> cpFiles = Files.newDirectoryStream(cpDir.toPath(), cpFileFilter)) {
        // Single scratch buffer reused for reading the WAL pointer of every marker file.
        ByteBuffer ptrBuf = ByteBuffer.allocate(FileWALPointer.POINTER_SIZE);

        ptrBuf.order(ByteOrder.nativeOrder());

        List<CheckpointEntry> res = new ArrayList<>();

        for (Path cpFile : cpFiles) {
            CheckpointEntry entry = parseFromFile(ptrBuf, cpFile.toFile());

            if (entry != null)
                res.add(entry);
        }

        return res;
    }
    catch (IOException e) {
        throw new IgniteCheckedException("Failed to load checkpoint history.", e);
    }
}
/**
 * Parses a checkpoint entry from a checkpoint marker file. Only START markers produce an entry.
 *
 * @param buf Temporary byte buffer used while reading the WAL pointer.
 * @param file Checkpoint file.
 * @return Checkpoint entry, or {@code null} if the file name does not match {@link #CP_FILE_NAME_PATTERN}
 *      or the file is not a START marker.
 * @throws IgniteCheckedException If reading the marker fails.
 */
@Nullable private CheckpointEntry parseFromFile(ByteBuffer buf, File file) throws IgniteCheckedException {
    Matcher m = CP_FILE_NAME_PATTERN.matcher(file.getName());

    // The marker type is checked before the other groups are parsed, so END markers are never parsed further.
    if (!m.matches() || CheckpointEntryType.valueOf(m.group(3)) != CheckpointEntryType.START)
        return null;

    long cpTs = Long.parseLong(m.group(1));
    UUID cpId = UUID.fromString(m.group(2));

    WALPointer ptr = readPointer(file, buf);

    return createCheckPointEntry(cpTs, ptr, cpId, null, CheckpointEntryType.START);
}
/**
 * Removes checkpoint start/end files belonging to the given {@code cpEntry}. Files that are already
 * missing are skipped.
 *
 * @param cpEntry Checkpoint entry.
 *
 * @throws IgniteCheckedException If failed to delete.
 */
private void removeCheckpointFiles(CheckpointEntry cpEntry) throws IgniteCheckedException {
    Path startFile = new File(cpDir.getAbsolutePath(), checkpointFileName(cpEntry, CheckpointEntryType.START)).toPath();
    Path endFile = new File(cpDir.getAbsolutePath(), checkpointFileName(cpEntry, CheckpointEntryType.END)).toPath();

    try {
        // deleteIfExists() checks for the file and deletes it in one step. The previous exists() + delete() pair
        // threw NoSuchFileException (reported as a StorageException) if a file vanished between the two calls.
        Files.deleteIfExists(startFile);
        Files.deleteIfExists(endFile);
    }
    catch (IOException e) {
        throw new StorageException("Failed to delete stale checkpoint files: " + cpEntry, e);
    }
}
/**
 * Under the checkpoint read lock, temporarily starts the metastorage data region and performs these steps:
 * binary memory restore and logical updates for the metastorage group only, filling the WAL-disabled groups, and
 * notifying listeners that the metastorage is ready for read. Afterwards it clears {@code metaStorage}, stops the
 * region again and cleans up the metastorage page store.
 *
 * @throws IgniteCheckedException If recovery fails. A {@link StorageException} is additionally reported to the
 *      failure processor as {@link FailureType#CRITICAL_ERROR}.
 */
private void readMetastore() throws IgniteCheckedException {
    try {
        CheckpointStatus status = readCheckpointStatus();

        checkpointReadLock();

        try {
            dataRegion(METASTORE_DATA_REGION_NAME).pageMemory().start();

            performBinaryMemoryRestore(status, onlyMetastorageGroup(), physicalRecords(), false);

            metaStorage = createMetastorage(true);

            applyLogicalUpdates(status, onlyMetastorageGroup(), onlyMetastorageAndEncryptionRecords(), false);

            fillWalDisabledGroups();

            notifyMetastorageReadyForRead();
        }
        finally {
            // Nested finally: the checkpoint read lock must be released even if stopping the region or
            // cleaning up its page store throws. Otherwise this thread would keep holding the read lock.
            try {
                metaStorage = null;

                dataRegion(METASTORE_DATA_REGION_NAME).pageMemory().stop(false);

                cctx.pageStore().cleanupPageStoreIfMatch(grpId -> MetaStorage.METASTORAGE_CACHE_ID == grpId, false);
            }
            finally {
                checkpointReadUnlock();
            }
        }
    }
    catch (StorageException e) {
        cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));

        throw new IgniteCheckedException(e);
    }
}
/** {@inheritDoc} */
@Override public void onActivate(GridKernalContext ctx) throws IgniteCheckedException {
    if (log.isDebugEnabled()) {
        log.debug("Activate database manager [id=" + cctx.localNodeId() +
            " topVer=" + cctx.discovery().topologyVersionEx() + " ]");
    }

    snapshotMgr = cctx.snapshot();

    // The checkpointer, the checkpoint runner pool and recovery are only set up on server nodes.
    boolean srvNode = !cctx.kernalContext().clientNode();

    // Create a checkpointer only if there is none yet.
    if (srvNode && checkpointer == null)
        checkpointer = new Checkpointer(cctx.igniteInstanceName(), "db-checkpoint-thread", log);

    super.onActivate(ctx);

    if (srvNode) {
        initializeCheckpointPool();

        finishRecovery();
    }
}
/** {@inheritDoc} */
@Override public void onDeActivate(GridKernalContext kctx) {
    if (log.isDebugEnabled()) {
        log.debug("DeActivate database manager [id=" + cctx.localNodeId() +
            " topVer=" + cctx.discovery().topologyVersionEx() + " ]");
    }

    onKernalStop0(false);

    super.onDeActivate(kctx);

    // onKernalStop0() raised the stopping flag. The node may be activated again after deactivation, so the flag
    // is reset here; otherwise checkpointReadLock() would keep failing with "node is stopping".
    stopping = false;
}
/**
 * Creates the checkpoint runner executor ({@code asyncRunner}) when more than one checkpoint thread is
 * configured. With one checkpoint thread or fewer, no executor is created.
 */
private void initializeCheckpointPool() {
    int cpThreads = persistenceCfg.getCheckpointThreads();

    if (cpThreads <= 1)
        return;

    asyncRunner = new IgniteThreadPoolExecutor(
        CHECKPOINT_RUNNER_THREAD_PREFIX,
        cctx.igniteInstanceName(),
        cpThreads,
        cpThreads,
        30_000,
        new LinkedBlockingQueue<Runnable>()
    );
}
/** {@inheritDoc} */
@Override protected void registerMetricsMBeans(IgniteConfiguration cfg) {
    super.registerMetricsMBeans(cfg);

    // The persistent store metrics bean is registered against the kernal context configuration.
    IgniteConfiguration kernalCfg = cctx.kernalContext().config();

    registerMetricsMBean(kernalCfg, MBEAN_GROUP, MBEAN_NAME, persStoreMetrics, DataStorageMetricsMXBean.class);
}
/** {@inheritDoc} */
@Deprecated
@Override protected IgniteOutClosure<Long> freeSpaceProvider(final DataRegionConfiguration dataRegCfg) {
    if (dataRegCfg.isPersistenceEnabled()) {
        final String regionName = dataRegCfg.getName();

        // Sums free space reported by the offheap managers of all cache groups living in this region.
        return new IgniteOutClosure<Long>() {
            @Override public Long apply() {
                long total = 0L;

                for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
                    if (grp.dataRegion().config().getName().equals(regionName)) {
                        assert grp.offheap() instanceof GridCacheOffheapManager;

                        total += ((GridCacheOffheapManager)grp.offheap()).freeSpace();
                    }
                }

                return total;
            }
        };
    }

    return super.freeSpaceProvider(dataRegCfg);
}
/** {@inheritDoc} */
@Override protected DataRegionMetricsProvider dataRegionMetricsProvider(final DataRegionConfiguration dataRegCfg) {
    if (dataRegCfg.isPersistenceEnabled()) {
        final String regionName = dataRegCfg.getName();

        // Both metrics aggregate values from the offheap managers of cache groups placed in this region.
        return new DataRegionMetricsProvider() {
            @Override public long partiallyFilledPagesFreeSpace() {
                long total = 0L;

                for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
                    if (grp.dataRegion().config().getName().equals(regionName)) {
                        assert grp.offheap() instanceof GridCacheOffheapManager;

                        total += ((GridCacheOffheapManager)grp.offheap()).freeSpace();
                    }
                }

                return total;
            }

            @Override public long emptyDataPages() {
                long total = 0L;

                for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
                    if (grp.dataRegion().config().getName().equals(regionName)) {
                        assert grp.offheap() instanceof GridCacheOffheapManager;

                        total += ((GridCacheOffheapManager)grp.offheap()).emptyDataPages();
                    }
                }

                return total;
            }
        };
    }

    return super.dataRegionMetricsProvider(dataRegCfg);
}
/**
* Restores last valid WAL pointer and resumes logging from that pointer.
* Re-creates metastorage if needed.
* <p>
* Lifecycle listeners are notified before WAL logging is resumed. If {@code walTail} was not set earlier
* (it is assigned in {@code startMemoryRestore()}), the tail is taken from the end marker of the last
* checkpoint, or {@code null} if there is none. {@code walTail} is reset to {@code null} once logging resumes.
*
* @throws IgniteCheckedException If failed. Failures caused by {@link StorageException} or {@link IOException}
* are additionally reported to the failure processor as {@link FailureType#CRITICAL_ERROR}.
*/
private void finishRecovery() throws IgniteCheckedException {
assert !cctx.kernalContext().clientNode();
long time = System.currentTimeMillis();
// Makes checkpointLockIsHeldByThread() return true for this thread during recovery.
// NOTE(review): unlike checkpointReadLock()/checkpointReadUnlock(), this update is not gated by ASSERTION_ENABLED.
CHECKPOINT_LOCK_HOLD_COUNT.set(CHECKPOINT_LOCK_HOLD_COUNT.get() + 1);
try {
for (DatabaseLifecycleListener lsnr : getDatabaseListeners(cctx.kernalContext()))
lsnr.beforeResumeWalLogging(this);
// Try to resume logging since last finished checkpoint if possible.
if (walTail == null) {
CheckpointStatus status = readCheckpointStatus();
walTail = CheckpointStatus.NULL_PTR.equals(status.endPtr) ? null : status.endPtr;
}
cctx.wal().resumeLogging(walTail);
walTail = null;
// Recreate metastorage to refresh page memory state after deactivation.
if (metaStorage == null)
metaStorage = createMetastorage(false);
notifyMetastorageReadyForReadWrite();
U.log(log, "Finish recovery performed in " + (System.currentTimeMillis() - time) + " ms.");
}
catch (IgniteCheckedException e) {
if (X.hasCause(e, StorageException.class, IOException.class))
cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
throw e;
}
finally {
CHECKPOINT_LOCK_HOLD_COUNT.set(CHECKPOINT_LOCK_HOLD_COUNT.get() - 1);
}
}
/**
 * Initializes the metastorage page store and creates a new, initialized {@link MetaStorage} instance
 * on top of the metastorage data region.
 *
 * @param readOnly Metastorage read-only mode.
 * @return Instance of Metastorage.
 * @throws IgniteCheckedException If failed to create metastorage.
 */
private MetaStorage createMetastorage(boolean readOnly) throws IgniteCheckedException {
    cctx.pageStore().initializeForMetastorage();

    DataRegion region = dataRegion(METASTORE_DATA_REGION_NAME);
    DataRegionMetricsImpl regionMetrics = (DataRegionMetricsImpl)memMetricsMap.get(METASTORE_DATA_REGION_NAME);

    MetaStorage res = new MetaStorage(cctx, region, regionMetrics, readOnly);

    res.init(this);

    return res;
}
/**
* Performs binary (physical) memory recovery and resumes WAL logging.
* <p>
* Notifies lifecycle listeners before and after the restore, replays physical records for the selected cache
* groups starting from the last checkpoint, resumes WAL logging right after the last read record and logs a
* {@link MemoryRecoveryRecord} whose pointer is stored in {@code memoryRecoveryRecordPtr}.
*
* @param cacheGroupsPredicate Cache groups to restore.
* @param recordTypePredicate Filter records by type.
* @return Binary recovery state; its {@code lastReadRecordPointer()} is the last WAL record seen during
* binary memory recovery.
* @throws IgniteCheckedException If failed. A {@link StorageException} is thrown if no WAL record was read although
* the last checkpoint has an end pointer (part of the WAL archive is missing). Failures caused by
* {@link StorageException} or {@link IOException} are reported to the failure processor as
* {@link FailureType#CRITICAL_ERROR}.
*/
private RestoreBinaryState restoreBinaryMemory(
IgnitePredicate<Integer> cacheGroupsPredicate,
IgniteBiPredicate<WALRecord.RecordType, WALPointer> recordTypePredicate
) throws IgniteCheckedException {
long time = System.currentTimeMillis();
try {
log.info("Starting binary memory restore for: " + cctx.cache().cacheGroupDescriptors().keySet());
for (DatabaseLifecycleListener lsnr : getDatabaseListeners(cctx.kernalContext()))
lsnr.beforeBinaryMemoryRestore(this);
CheckpointStatus status = readCheckpointStatus();
// First, bring memory to the last consistent checkpoint state if needed.
// This method should return a pointer to the last valid record in the WAL.
RestoreBinaryState binaryState = performBinaryMemoryRestore(
status,
cacheGroupsPredicate,
recordTypePredicate,
true
);
WALPointer restored = binaryState.lastReadRecordPointer();
// NULL_PTR means nothing was read: logging starts from the beginning (null). Otherwise resume right
// after the last read record.
if(restored.equals(CheckpointStatus.NULL_PTR))
restored = null; // This record is first
else
restored = restored.next();
if (restored == null && !status.endPtr.equals(CheckpointStatus.NULL_PTR)) {
// NOTE(review): restored is always null in this branch, so tailWalPtr is always printed as "null".
throw new StorageException("The memory cannot be restored. The critical part of WAL archive is missing " +
"[tailWalPtr=" + restored + ", endPtr=" + status.endPtr + ']');
}
else if (restored != null)
U.log(log, "Binary memory state restored at node startup [restoredPtr=" + restored + ']');
// Wal logging is now available.
cctx.wal().resumeLogging(restored);
// Log MemoryRecoveryRecord to make sure that old physical records are not replayed during
// next physical recovery.
memoryRecoveryRecordPtr = cctx.wal().log(new MemoryRecoveryRecord(U.currentTimeMillis()));
for (DatabaseLifecycleListener lsnr : getDatabaseListeners(cctx.kernalContext()))
lsnr.afterBinaryMemoryRestore(this, binaryState);
if (log.isInfoEnabled())
log.info("Binary recovery performed in " + (System.currentTimeMillis() - time) + " ms.");
return binaryState;
}
catch (IgniteCheckedException e) {
if (X.hasCause(e, StorageException.class, IOException.class))
cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
throw e;
}
}
/** {@inheritDoc} */
@Override protected void onKernalStop0(boolean cancel) {
// Raise the stopping flag under the checkpoint write lock so that it flips while no thread holds the read lock.
// checkpointReadLock() re-checks the flag after acquiring the read lock and fails with NodeStoppingException.
checkpointLock.writeLock().lock();
try {
stopping = true;
}
finally {
checkpointLock.writeLock().unlock();
}
shutdownCheckpointer(cancel);
lsnrs.clear();
super.onKernalStop0(cancel);
unregisterMetricsMBean(
cctx.gridConfig(),
MBEAN_GROUP,
MBEAN_NAME
);
// Drop the metastorage instance; finishRecovery() recreates it on the next activation if it is null.
metaStorage = null;
}
/** {@inheritDoc} */
@Override protected void stop0(boolean cancel) {
super.stop0(cancel);
// Release the file lock only after the parent has stopped.
releaseFileLock();
}
/**
 * Computes chunk sizes for persistent page memory: equally sized data segments followed by the checkpoint
 * buffer in the last slot.
 *
 * @param concLvl Concurrency level (number of data segments); values below 2 are replaced by the number of
 *      available processors.
 * @param cacheSize Region size split between the data segments.
 * @param chpBufSize Checkpoint buffer size.
 * @return Chunk sizes: one entry per data segment (each at least 1 MiB) plus the checkpoint buffer size last.
 */
private long[] calculateFragmentSizes(int concLvl, long cacheSize, long chpBufSize) {
    int segments = concLvl < 2 ? Runtime.getRuntime().availableProcessors() : concLvl;

    long segmentSize = Math.max(cacheSize / segments, 1024 * 1024);

    long[] chunks = new long[segments + 1];

    Arrays.fill(chunks, 0, segments, segmentSize);

    chunks[segments] = chpBufSize;

    return chunks;
}
/** {@inheritDoc} */
@Override protected PageMemory createPageMemory(
    DirectMemoryProvider memProvider,
    DataStorageConfiguration memCfg,
    DataRegionConfiguration plcCfg,
    DataRegionMetricsImpl memMetrics,
    final boolean trackable
) {
    if (!plcCfg.isPersistenceEnabled())
        return super.createPageMemory(memProvider, memCfg, plcCfg, memMetrics, trackable);

    memMetrics.persistenceEnabled(true);

    long regionSize = plcCfg.getMaxSize();
    long cpBufSize = checkpointBufferSize(plcCfg);

    // Checkpoint buffer size can not be greater than cache size, it does not make sense.
    if (cpBufSize > regionSize) {
        U.quietAndInfo(log,
            "Configured checkpoint page buffer size is too big, setting to the max region size [size="
                + U.readableSize(regionSize, false) + ", memPlc=" + plcCfg.getName() + ']');

        cpBufSize = regionSize;
    }

    // Only trackable regions get a change tracker; it forwards tracked pages to the snapshot manager.
    GridInClosure3X<Long, FullPageId, PageMemoryEx> changeTracker = !trackable ? null :
        new GridInClosure3X<Long, FullPageId, PageMemoryEx>() {
            @Override public void applyx(
                Long page,
                FullPageId fullId,
                PageMemoryEx pageMem
            ) throws IgniteCheckedException {
                snapshotMgr.onChangeTrackerPage(page, fullId, pageMem);
            }
        };

    long[] chunkSizes = calculateFragmentSizes(memCfg.getConcurrencyLevel(), regionSize, cpBufSize);

    PageMemoryImpl pageMem = new PageMemoryImpl(
        wrapMetricsMemoryProvider(memProvider, memMetrics),
        chunkSizes,
        cctx,
        memCfg.getPageSize(),
        (fullId, pageBuf, tag) -> {
            memMetrics.onPageWritten();

            // We can write only page from disk into snapshot.
            snapshotMgr.beforePageWrite(fullId);

            // Write page to disk.
            storeMgr.write(fullId.groupId(), fullId.pageId(), pageBuf, tag);

            AtomicInteger evictedCntr = evictedPagesCntr;

            if (evictedCntr != null)
                evictedCntr.incrementAndGet();
        },
        changeTracker,
        this,
        memMetrics,
        resolveThrottlingPolicy(),
        this
    );

    memMetrics.pageMemory(pageMem);

    return pageMem;
}
/**
 * Wraps a memory provider so that each allocated region is accounted in the region metrics. The last region
 * handed out (the last entry of the chunk sizes passed to {@code initialize}) is counted as the checkpoint
 * buffer; all other regions are counted as off-heap size.
 *
 * @param memoryProvider0 Memory provider.
 * @param memMetrics Memory metrics.
 * @return Wrapped memory provider.
 */
@Override protected DirectMemoryProvider wrapMetricsMemoryProvider(
    final DirectMemoryProvider memoryProvider0,
    final DataRegionMetricsImpl memMetrics
) {
    return new DirectMemoryProvider() {
        /** Number of regions not yet handed out; reaches zero on the checkpoint buffer chunk. */
        private final AtomicInteger regionsLeft = new AtomicInteger();

        @Override public void initialize(long[] chunkSizes) {
            memoryProvider0.initialize(chunkSizes);

            regionsLeft.set(chunkSizes.length);
        }

        @Override public void shutdown(boolean deallocate) {
            memoryProvider0.shutdown(deallocate);
        }

        @Override public DirectMemoryRegion nextRegion() {
            DirectMemoryRegion region = memoryProvider0.nextRegion();

            if (region != null) {
                boolean cpBufChunk = regionsLeft.decrementAndGet() == 0;

                long regionSize = region.size();

                if (cpBufChunk)
                    memMetrics.updateCheckpointBufferSize(regionSize);
                else
                    memMetrics.updateOffHeapSize(regionSize);
            }

            return region;
        }
    };
}
/**
 * Resolves throttling policy according to the settings.
 * <p>
 * {@code SPEED_BASED} is used when write throttling is enabled in the persistence configuration,
 * {@code CHECKPOINT_BUFFER_ONLY} otherwise. A non-null {@code throttlingPolicyOverride} naming a
 * {@link PageMemoryImpl.ThrottlingPolicy} constant (case-insensitively) takes precedence; an unknown value is
 * logged and ignored.
 *
 * @return Throttling policy to use.
 */
@NotNull private PageMemoryImpl.ThrottlingPolicy resolveThrottlingPolicy() {
    PageMemoryImpl.ThrottlingPolicy plc = persistenceCfg.isWriteThrottlingEnabled()
        ? PageMemoryImpl.ThrottlingPolicy.SPEED_BASED
        : PageMemoryImpl.ThrottlingPolicy.CHECKPOINT_BUFFER_ONLY;

    if (throttlingPolicyOverride != null) {
        try {
            // Locale.ROOT keeps the upper-casing locale-independent. With e.g. a Turkish default locale,
            // 'i' would become a dotted capital I and valueOf() would reject a valid policy name.
            plc = PageMemoryImpl.ThrottlingPolicy.valueOf(
                throttlingPolicyOverride.toUpperCase(java.util.Locale.ROOT));
        }
        catch (IllegalArgumentException e) {
            log.error("Incorrect value of IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED property. " +
                "The default throttling policy will be used [plc=" + throttlingPolicyOverride +
                ", defaultPlc=" + plc + ']');
        }
    }

    return plc;
}
/** {@inheritDoc} */
@Override protected void checkRegionEvictionProperties(DataRegionConfiguration regCfg, DataStorageConfiguration dbCfg)
    throws IgniteCheckedException {
    if (regCfg.isPersistenceEnabled()) {
        // A configured eviction mode has no effect for persistent regions, so only warn about it.
        if (regCfg.getPageEvictionMode() != DataPageEvictionMode.DISABLED) {
            U.warn(log, "Page eviction mode will have no effect because the oldest pages are evicted automatically " +
                "if Ignite persistence is enabled: " + regCfg.getName());
        }
    }
    else
        super.checkRegionEvictionProperties(regCfg, dbCfg);
}
/** {@inheritDoc} */
@Override protected void checkPageSize(DataStorageConfiguration memCfg) {
    // An explicitly configured page size is kept as is.
    if (memCfg.getPageSize() != 0)
        return;

    // Otherwise try to take the page size from an existing index partition file; fall back to the default.
    try {
        assert cctx.pageStore() instanceof FilePageStoreManager :
            "Invalid page store manager was created: " + cctx.pageStore();

        FilePageStoreManager pageStoreMgr = (FilePageStoreManager)cctx.pageStore();

        Path idxFile = IgniteUtils.searchFileRecursively(
            pageStoreMgr.workDir().toPath(), FilePageStoreManager.INDEX_FILE_NAME);

        if (idxFile != null) {
            memCfg.setPageSize(resolvePageSizeFromPartitionFile(idxFile));

            return;
        }
    }
    catch (IgniteCheckedException | IOException | IllegalArgumentException e) {
        U.quietAndWarn(log, "Attempt to resolve pageSize from store files failed: " + e.getMessage());

        U.quietAndWarn(log, "Default page size will be used: " + DataStorageConfiguration.DFLT_PAGE_SIZE + " bytes");
    }

    memCfg.setPageSize(DataStorageConfiguration.DFLT_PAGE_SIZE);
}
/**
 * Reads the page size recorded in the header of an existing page store file.
 *
 * @param partFile Partition file.
 * @return Page size stored in the file header.
 * @throws IOException If the file cannot be read.
 * @throws IgniteCheckedException If the file is shorter than {@code FilePageStore.HEADER_SIZE}.
 */
private int resolvePageSizeFromPartitionFile(Path partFile) throws IOException, IgniteCheckedException {
    try (FileIO io = ioFactory.create(partFile.toFile())) {
        int hdrSize = FilePageStore.HEADER_SIZE;

        if (io.size() < hdrSize)
            throw new IgniteCheckedException("Partition file is too small: " + partFile);

        ByteBuffer hdr = ByteBuffer.allocate(hdrSize).order(ByteOrder.nativeOrder());

        io.readFully(hdr);

        // Header layout: signature (long), version (int), type (byte), then the page size (int).
        int pageSizeOff = Long.BYTES + Integer.BYTES + Byte.BYTES;

        int pageSize = hdr.getInt(pageSizeOff);

        if (pageSize == 2048) {
            U.quietAndWarn(log, "You are currently using persistent store with 2K pages (DataStorageConfiguration#" +
                "pageSize). If you use SSD disk, consider migrating to 4K pages for better IO performance.");
        }

        return pageSize;
    }
}
/**
* Stops the checkpointer and the checkpoint runner pool.
* <p>
* The checkpointer is stopped via {@code shutdownNow()} when {@code cancel} is {@code true}, otherwise via
* {@code cancel()}, and is then joined. If that join is interrupted, the checkpointer is forced to stop, joined
* again while ignoring further interrupts, its scheduled checkpoint is failed with {@link NodeStoppingException},
* and the interrupt flag of the current thread is restored. The checkpoint runner pool, if created, is shut down
* and awaited for up to two minutes.
*
* @param cancel Cancel flag.
*/
@SuppressWarnings("unused")
private void shutdownCheckpointer(boolean cancel) {
// NOTE(review): the @SuppressWarnings("unused") above looks stale, since 'cancel' is used below.
Checkpointer cp = checkpointer;
if (cp != null) {
if (cancel)
cp.shutdownNow();
else
cp.cancel();
try {
U.join(cp);
checkpointer = null;
}
catch (IgniteInterruptedCheckedException ignore) {
U.warn(log, "Was interrupted while waiting for checkpointer shutdown, " +
"will not wait for checkpoint to finish.");
cp.shutdownNow();
// Keep joining until the checkpointer has really stopped; interrupts are restored after the loop.
while (true) {
try {
U.join(cp);
checkpointer = null;
cp.scheduledCp.fail(new NodeStoppingException("Checkpointer is stopped during node stop."));
break;
}
catch (IgniteInterruptedCheckedException ignored) {
// Ignored on purpose: the interrupt flag is restored once the join succeeds.
}
}
Thread.currentThread().interrupt();
}
}
// NOTE(review): asyncRunner is shut down but not reset to null; initializeCheckpointPool() replaces it on activation.
if (asyncRunner != null) {
asyncRunner.shutdownNow();
try {
asyncRunner.awaitTermination(2, TimeUnit.MINUTES);
}
catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
}
}
/** {@inheritDoc} */
@Override public void beforeExchange(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
    boolean restorePartStates = fut.localJoinExchange()
        || fut.activateCluster()
        || (fut.exchangeActions() != null && !F.isEmpty(fut.exchangeActions().cacheGroupsToStart()));

    // Try to restore partition states.
    if (restorePartStates) {
        U.doInParallel(
            cctx.kernalContext().getSystemExecutorService(),
            cctx.cache().cacheGroups(),
            grp -> {
                // Local cache groups are skipped.
                if (!grp.isLocal()) {
                    cctx.database().checkpointReadLock();

                    try {
                        grp.offheap().restorePartitionStates(Collections.emptyMap());

                        if (grp.localStartVersion().equals(fut.initialVersion()))
                            grp.topology().afterStateRestored(fut.initialVersion());

                        fut.timeBag().finishLocalStage("Restore partition states " +
                            "[grp=" + grp.cacheOrGroupName() + "]");
                    }
                    finally {
                        cctx.database().checkpointReadUnlock();
                    }
                }

                return null;
            }
        );

        fut.timeBag().finishGlobalStage("Restore partition states");
    }

    if (!cctx.kernalContext().query().moduleEnabled())
        return;

    ExchangeActions acts = fut.exchangeActions();

    if (acts == null)
        return;

    // Prepare index rebuild futures for caches started by this exchange (or by the local join).
    if (!F.isEmpty(acts.cacheStartRequests())) {
        for (ExchangeActions.CacheActionData actionData : acts.cacheStartRequests())
            prepareIndexRebuildFuture(CU.cacheId(actionData.request().cacheName()));
    }
    else if (acts.localJoinContext() != null && !F.isEmpty(acts.localJoinContext().caches())) {
        for (T2<DynamicCacheDescriptor, NearCacheConfiguration> tup : acts.localJoinContext().caches())
            prepareIndexRebuildFuture(tup.get1().cacheId());
    }
}
/**
 * Registers a fresh index rebuild future for the given cache. The future is completed later, after the exchange
 * is done. It must be created before the exchange is initialized so that a correct future is captured after
 * activation or restore completes. A previously registered future for the same cache is completed immediately.
 *
 * @param cacheId Cache ID.
 */
private void prepareIndexRebuildFuture(int cacheId) {
    GridFutureAdapter<Void> replaced = idxRebuildFuts.put(cacheId, new GridFutureAdapter<>());

    // Do not leave waiters of the replaced future hanging.
    if (replaced != null)
        replaced.onDone();
}
/**
 * {@inheritDoc}
 * <p>
 * Starts index rebuilding (from hash) for every cache started on this exchange and completes the matching
 * user futures registered by {@code prepareIndexRebuildFuture()}. Once every cache context has been accounted
 * for and at least one rebuild actually ran, "Indexes rebuilding completed for all caches." is logged.
 */
@Override public void rebuildIndexesIfNeeded(GridDhtPartitionsExchangeFuture fut) {
    GridQueryProcessor qryProc = cctx.kernalContext().query();

    if (!qryProc.moduleEnabled())
        return;

    // The counter is sized to all cache contexts, so every context below must count down exactly once.
    // Otherwise the counter never reaches zero and the completion message is never logged.
    GridCountDownCallback rebuildIndexesCompleteCntr = new GridCountDownCallback(
        cctx.cacheContexts().size(),
        () -> log().info("Indexes rebuilding completed for all caches."),
        1 //need at least 1 index rebuilded to print message about rebuilding completion
    );

    for (final GridCacheContext cacheCtx : (Collection<GridCacheContext>)cctx.cacheContexts()) {
        // Caches not started on this exchange are not rebuilt, but they still count down.
        if (!cacheCtx.startTopologyVersion().equals(fut.initialVersion())) {
            rebuildIndexesCompleteCntr.countDown(false);

            continue;
        }

        final int cacheId = cacheCtx.cacheId();
        final GridFutureAdapter<Void> usrFut = idxRebuildFuts.get(cacheId);

        IgniteInternalFuture<?> rebuildFut = qryProc.rebuildIndexesFromHash(cacheCtx);

        if (rebuildFut != null) {
            log().info("Started indexes rebuilding for cache [name=" + cacheCtx.name()
                + ", grpName=" + cacheCtx.group().name() + ']');

            assert usrFut != null : "Missing user future for cache: " + cacheCtx.name();

            rebuildFut.listen(new CI1<IgniteInternalFuture>() {
                @Override public void apply(IgniteInternalFuture fut) {
                    idxRebuildFuts.remove(cacheId, usrFut);

                    Throwable err = fut.error();

                    usrFut.onDone(err);

                    CacheConfiguration ccfg = cacheCtx.config();

                    if (ccfg != null) {
                        if (err == null)
                            log().info("Finished indexes rebuilding for cache [name=" + ccfg.getName()
                                + ", grpName=" + ccfg.getGroupName() + ']');
                        else {
                            if (!(err instanceof NodeStoppingException))
                                log().error("Failed to rebuild indexes for cache [name=" + ccfg.getName()
                                    + ", grpName=" + ccfg.getGroupName() + ']', err);
                        }
                    }

                    rebuildIndexesCompleteCntr.countDown(true);
                }
            });
        }
        else {
            if (usrFut != null) {
                idxRebuildFuts.remove(cacheId, usrFut);

                usrFut.onDone();
            }

            // No rebuild was started for this cache, with or without a user future.
            rebuildIndexesCompleteCntr.countDown(false);
        }
    }
}
/** {@inheritDoc} */
@Nullable @Override public IgniteInternalFuture indexRebuildFuture(int cacheId) {
// Null if no future was registered via prepareIndexRebuildFuture() or it has already been removed on completion.
return idxRebuildFuts.get(cacheId);
}
/** {@inheritDoc} */
@Override public void onCacheGroupsStopped(
    Collection<IgniteBiTuple<CacheGroupContext, Boolean>> stoppedGrps
) {
    // Page memory instance -> ids of the persistent groups stopped in it.
    Map<PageMemoryEx, Collection<Integer>> grpsByPageMem = new HashMap<>();

    for (IgniteBiTuple<CacheGroupContext, Boolean> tup : stoppedGrps) {
        CacheGroupContext grp = tup.get1();

        if (grp.persistenceEnabled()) {
            snapshotMgr.onCacheGroupStop(grp, tup.get2());

            PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();

            int grpId = grp.groupId();

            grpsByPageMem.computeIfAbsent(pageMem, k -> new HashSet<>()).add(grpId);

            pageMem.onCacheGroupDestroyed(grpId);

            if (tup.get2())
                cctx.kernalContext().encryption().onCacheGroupDestroyed(grpId);
        }
    }

    // Clear page memory of the stopped groups, running the clears in parallel per page memory instance.
    List<IgniteInternalFuture<Void>> clearFuts = new ArrayList<>(grpsByPageMem.size());

    for (Map.Entry<PageMemoryEx, Collection<Integer>> e : grpsByPageMem.entrySet()) {
        final Collection<Integer> stoppedIds = e.getValue();

        clearFuts.add(e.getKey().clearAsync((grpId, pageId) -> stoppedIds.contains(grpId), false));
    }

    for (IgniteInternalFuture<Void> clearFut : clearFuts) {
        try {
            clearFut.get();
        }
        catch (IgniteCheckedException e) {
            log.error("Failed to clear page memory", e);
        }
    }

    if (cctx.pageStore() == null)
        return;

    for (IgniteBiTuple<CacheGroupContext, Boolean> tup : stoppedGrps) {
        CacheGroupContext grp = tup.get1();

        try {
            cctx.pageStore().shutdownForCacheGroup(grp, tup.get2());
        }
        catch (IgniteCheckedException e) {
            U.error(log, "Failed to gracefully clean page store resources for destroyed cache " +
                "[cache=" + grp.cacheOrGroupName() + "]", e);
        }
    }
}
/**
* Gets the checkpoint read lock. While this lock is held, checkpoint thread will not acquire memory
* state.
* <p>
* Returns immediately if the current thread holds the checkpoint write lock. If the read lock is obtained while some
* persistent page memory is not safe to update, the lock is released, a checkpoint is requested and the method waits
* for that checkpoint's {@code LOCK_RELEASED} stage before retrying. With a positive
* {@code checkpointReadLockTimeout}, exceeding it invokes the failure handler. If the handler does not invalidate
* the node, the error is logged and waiting continues without a timeout. An interrupt received while waiting for
* the lock is remembered and restored before the method returns or throws.
*
* @throws IgniteException If the node is stopping, if waiting for the checkpoint begin failed, or if the failure
* handler invalidated the node after a timeout.
*/
@Override public void checkpointReadLock() {
if (checkpointLock.writeLock().isHeldByCurrentThread())
return;
long timeout = checkpointReadLockTimeout;
long start = U.currentTimeMillis();
boolean interruped = false;
try {
for (; ; ) {
try {
if (timeout > 0 && (U.currentTimeMillis() - start) >= timeout)
failCheckpointReadLock();
try {
if (timeout > 0) {
if (!checkpointLock.readLock().tryLock(timeout - (U.currentTimeMillis() - start),
TimeUnit.MILLISECONDS))
failCheckpointReadLock();
}
else
checkpointLock.readLock().lock();
}
catch (InterruptedException e) {
// Remember the interrupt and retry; the flag is restored in the outer finally.
interruped = true;
continue;
}
// 'stopping' is raised under the write lock, so it is reliably visible once the read lock is held.
if (stopping) {
checkpointLock.readLock().unlock();
throw new IgniteException(new NodeStoppingException("Failed to perform cache update: node is stopping."));
}
// Keep the lock if it is re-entrant, memory is safe to update, or no checkpointer thread exists.
if (checkpointLock.getReadHoldCount() > 1 || safeToUpdatePageMemories() || checkpointerThread == null)
break;
else {
checkpointLock.readLock().unlock();
if (timeout > 0 && U.currentTimeMillis() - start >= timeout)
failCheckpointReadLock();
try {
// NOTE(review): reads the 'checkpointer' field directly, while shutdownCheckpointer() may set it
// to null concurrently. Consider reading it into a local and handling null.
checkpointer.wakeupForCheckpoint(0, "too many dirty pages")
.futureFor(LOCK_RELEASED)
.getUninterruptibly();
}
catch (IgniteFutureTimeoutCheckedException e) {
failCheckpointReadLock();
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to wait for checkpoint begin.", e);
}
}
}
catch (CheckpointReadLockTimeoutException e) {
// The failure handler did not invalidate the node: log and keep waiting without a timeout.
log.error(e.getMessage(), e);
timeout = 0;
}
}
}
finally {
if (interruped)
Thread.currentThread().interrupt();
}
if (ASSERTION_ENABLED)
CHECKPOINT_LOCK_HOLD_COUNT.set(CHECKPOINT_LOCK_HOLD_COUNT.get() + 1);
}
/**
 * Reports a checkpoint read lock acquisition timeout to the failure processor. Never returns normally.
 *
 * @throws CheckpointReadLockTimeoutException If the failure handler did not invalidate the node.
 * @throws IgniteException If the node was invalidated as a result of handling.
 */
private void failCheckpointReadLock() throws CheckpointReadLockTimeoutException, IgniteException {
    String msg = "Checkpoint read lock acquisition has been timed out.";

    IgniteException err = new IgniteException(msg);

    boolean invalidated = cctx.kernalContext().failure().process(
        new FailureContext(SYSTEM_CRITICAL_OPERATION_TIMEOUT, err));

    if (!invalidated)
        throw new CheckpointReadLockTimeoutException(msg);

    throw err;
}
/** {@inheritDoc} */
@Override public boolean checkpointLockIsHeldByThread() {
    // Hold counts are only tracked with assertions enabled; otherwise the check is skipped.
    if (!ASSERTION_ENABLED)
        return true;

    boolean heldByThisThread = checkpointLock.isWriteLockedByCurrentThread() || CHECKPOINT_LOCK_HOLD_COUNT.get() > 0;

    return heldByThisThread || Thread.currentThread().getName().startsWith(CHECKPOINT_RUNNER_THREAD_PREFIX);
}
/**
 * Checks whether every persistent data region's page memory reports it is safe to update.
 * Non-persistent regions are ignored.
 *
 * @return {@code true} if all PageMemory instances are safe to update (or there are no regions).
 */
private boolean safeToUpdatePageMemories() {
    Collection<DataRegion> regions = context().database().dataRegions();

    if (regions != null) {
        for (DataRegion region : regions) {
            if (region.config().isPersistenceEnabled() && !((PageMemoryEx)region.pageMemory()).safeToUpdate())
                return false;
        }
    }

    return true;
}
/**
 * Releases the checkpoint read lock. Does nothing if the current thread holds the checkpoint write lock.
 */
@Override public void checkpointReadUnlock() {
    boolean writeLockedByMe = checkpointLock.writeLock().isHeldByCurrentThread();

    if (!writeLockedByMe) {
        checkpointLock.readLock().unlock();

        if (ASSERTION_ENABLED)
            CHECKPOINT_LOCK_HOLD_COUNT.set(CHECKPOINT_LOCK_HOLD_COUNT.get() - 1);
    }
}
/** {@inheritDoc} */
@Override public synchronized Map<Integer, Map<Integer, Long>> reserveHistoryForExchange() {
assert reservedForExchange == null : reservedForExchange;
reservedForExchange = new HashMap<>();
// OWNING partitions larger than walRebalanceThreshold, grouped by cache group id.
Map</*grpId*/Integer, Set</*partId*/Integer>> applicableGroupsAndPartitions = partitionsApplicableForWalRebalance();
Map</*grpId*/Integer, Map</*partId*/Integer, CheckpointEntry>> earliestValidCheckpoints;
// Presumably held so that a concurrent checkpoint cannot change the history while searching — TODO confirm.
checkpointReadLock();
try {
earliestValidCheckpoints = cpHistory.searchAndReserveCheckpoints(applicableGroupsAndPartitions);
}
finally {
checkpointReadUnlock();
}
Map</*grpId*/Integer, Map</*partId*/Integer, /*updCntr*/Long>> grpPartsWithCnts = new HashMap<>();
for (Map.Entry<Integer, Map<Integer, CheckpointEntry>> e : earliestValidCheckpoints.entrySet()) {
int grpId = e.getKey();
for (Map.Entry<Integer, CheckpointEntry> e0 : e.getValue().entrySet()) {
CheckpointEntry cpEntry = e0.getValue();
int partId = e0.getKey();
assert cctx.wal().reserved(cpEntry.checkpointMark())
: "WAL segment for checkpoint " + cpEntry + " has not reserved";
Long updCntr = cpEntry.partitionCounter(cctx, grpId, partId);
// Only partitions with a known update counter are recorded; releaseHistoryForExchange() later
// releases the earliest pointer found in reservedForExchange.
// NOTE(review): if no partition has a counter, confirm the reservation made above is released elsewhere.
if (updCntr != null) {
reservedForExchange.computeIfAbsent(grpId, k -> new HashMap<>())
.put(partId, new T2<>(updCntr, cpEntry.checkpointMark()));
grpPartsWithCnts.computeIfAbsent(grpId, k -> new HashMap<>()).put(partId, updCntr);
}
}
}
return grpPartsWithCnts;
}
/**
 * Collects the local partitions that may act as WAL rebalance suppliers: OWNING partitions of non-local cache
 * groups whose full size exceeds {@code walRebalanceThreshold}.
 *
 * @return Map of group id -> Set of partitions which can be used as suppliers for WAL rebalance.
 */
private Map<Integer, Set<Integer>> partitionsApplicableForWalRebalance() {
    Map<Integer, Set<Integer>> applicable = new HashMap<>();

    for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
        if (grp.isLocal())
            continue;

        int grpId = grp.groupId();

        for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions()) {
            boolean owning = part.state() == GridDhtPartitionState.OWNING;

            if (owning && part.fullSize() > walRebalanceThreshold)
                applicable.computeIfAbsent(grpId, k -> new HashSet<>()).add(part.id());
        }
    }

    return applicable;
}
/** {@inheritDoc} */
@Override public synchronized void releaseHistoryForExchange() {
    if (reservedForExchange == null)
        return;

    // Only the recorded pointer with the lowest WAL segment index is released.
    FileWALPointer earliestPtr = null;

    for (Map<Integer, T2<Long, WALPointer>> parts : reservedForExchange.values()) {
        for (T2<Long, WALPointer> cntrAndPtr : parts.values()) {
            FileWALPointer ptr = (FileWALPointer)cntrAndPtr.get2();

            if (earliestPtr == null || ptr.index() < earliestPtr.index())
                earliestPtr = ptr;
        }
    }

    reservedForExchange = null;

    if (earliestPtr != null) {
        assert cctx.wal().reserved(earliestPtr)
            : "Earliest checkpoint WAL pointer is not reserved for exchange: " + earliestPtr;

        try {
            cctx.wal().release(earliestPtr);
        }
        catch (IgniteCheckedException e) {
            log.error("Failed to release earliest checkpoint WAL pointer: " + earliestPtr, e);
        }
    }
}
/** {@inheritDoc} */
@Override public boolean reserveHistoryForPreloading(int grpId, int partId, long cntr) {
    CheckpointEntry cpEntry = cpHistory.searchCheckpointEntry(grpId, partId, cntr);

    WALPointer ptr = cpEntry != null ? cpEntry.checkpointMark() : null;

    // Nothing to reserve, or the WAL refused the reservation.
    if (ptr == null || !cctx.wal().reserve(ptr))
        return false;

    reservedForPreloading.put(new T2<>(grpId, partId), new T2<>(cntr, ptr));

    return true;
}
/**
 * {@inheritDoc}
 * <p>
 * Releases every WAL reservation recorded for preloading. Release is attempted for all reservations even if
 * some of them fail. Released reservations stop being tracked; failed ones stay tracked, so a later call can
 * retry them without releasing already-released pointers a second time.
 *
 * @throws IgniteException If at least one reservation could not be released. The first failure is the cause and
 *      later ones are attached as suppressed exceptions.
 */
@Override public void releaseHistoryForPreloading() {
    IgniteCheckedException err = null;

    Iterator<Map.Entry<T2<Integer, Integer>, T2<Long, WALPointer>>> it = reservedForPreloading.entrySet().iterator();

    while (it.hasNext()) {
        Map.Entry<T2<Integer, Integer>, T2<Long, WALPointer>> e = it.next();

        try {
            cctx.wal().release(e.getValue().get2());

            // Forget the reservation only after it has actually been released.
            it.remove();
        }
        catch (IgniteCheckedException ex) {
            U.error(log, "Could not release WAL reservation", ex);

            if (err == null)
                err = ex;
            else
                err.addSuppressed(ex);
        }
    }

    if (err != null)
        throw new IgniteException(err);
}
/** {@inheritDoc} */
@Nullable @Override public IgniteInternalFuture wakeupForCheckpoint(String reason) {
    // Read the field once: shutdownCheckpointer() may reset it to null concurrently.
    Checkpointer cp = checkpointer;

    return cp == null ? null : cp.wakeupForCheckpoint(0, reason).futureFor(LOCK_RELEASED);
}
/** {@inheritDoc} */
@Override public <R> void waitForCheckpoint(String reason, IgniteInClosure<? super IgniteInternalFuture<R>> lsnr)
    throws IgniteCheckedException {
    Checkpointer cp = checkpointer;

    // Without a checkpointer there is nothing to wait for.
    if (cp != null)
        cp.wakeupForCheckpoint(0, reason, lsnr).futureFor(FINISHED).get();
}
/** {@inheritDoc} */
@Override public CheckpointProgress forceCheckpoint(String reason) {
    Checkpointer cp = checkpointer;

    // Returns null when no checkpointer is running.
    return cp != null ? cp.wakeupForCheckpoint(0, reason) : null;
}
/** {@inheritDoc} */
@Override public WALPointer lastCheckpointMarkWalPointer() {
    if (cpHistory == null)
        return null;

    CheckpointEntry lastCp = cpHistory.lastCheckpoint();

    return lastCp != null ? lastCp.checkpointMark() : null;
}
/**
* Returns the directory holding checkpoint START/END marker files.
*
* @return Checkpoint directory.
*/
public File checkpointDirectory() {
return cpDir;
}
/**
* Registers a checkpoint listener. Registered listeners are cleared when the kernal stops.
*
* @param lsnr Listener.
*/
public void addCheckpointListener(DbCheckpointListener lsnr) {
lsnrs.add(lsnr);
}
/**
* Unregisters a previously added checkpoint listener.
*
* @param lsnr Listener.
*/
public void removeCheckpointListener(DbCheckpointListener lsnr) {
lsnrs.remove(lsnr);
}
/**
 * Scans the checkpoint directory for the latest START and END marker files (by timestamp in the file name)
 * and reads the WAL pointers stored in them.
 *
 * @return Read checkpoint status. If the checkpoint directory does not exist, a status with zero timestamp,
 *      {@code NULL_UUID} ids and {@code NULL_PTR} pointers is returned.
 * @throws IgniteCheckedException If the checkpoint directory cannot be listed or a marker file cannot be read.
 */
@SuppressWarnings("TooBroadScope")
private CheckpointStatus readCheckpointStatus() throws IgniteCheckedException {
    long lastStartTs = 0;
    long lastEndTs = 0;

    UUID startId = CheckpointStatus.NULL_UUID;
    UUID endId = CheckpointStatus.NULL_UUID;

    File startFile = null;
    File endFile = null;

    WALPointer startPtr = CheckpointStatus.NULL_PTR;
    WALPointer endPtr = CheckpointStatus.NULL_PTR;

    File dir = cpDir;

    if (!dir.exists()) {
        log.warning("Read checkpoint status: checkpoint directory is not found.");

        return new CheckpointStatus(0, startId, startPtr, endId, endPtr);
    }

    File[] files = dir.listFiles();

    // File.listFiles() returns null if the path is not a directory or an I/O error occurs.
    // Fail with a descriptive error instead of a NullPointerException in the loop below.
    if (files == null)
        throw new IgniteCheckedException("Failed to list checkpoint directory: " + dir.getAbsolutePath());

    for (File file : files) {
        Matcher matcher = CP_FILE_NAME_PATTERN.matcher(file.getName());

        if (matcher.matches()) {
            long ts = Long.parseLong(matcher.group(1));
            UUID id = UUID.fromString(matcher.group(2));
            CheckpointEntryType type = CheckpointEntryType.valueOf(matcher.group(3));

            if (type == CheckpointEntryType.START && ts > lastStartTs) {
                lastStartTs = ts;
                startId = id;
                startFile = file;
            }
            else if (type == CheckpointEntryType.END && ts > lastEndTs) {
                lastEndTs = ts;
                endId = id;
                endFile = file;
            }
        }
    }

    ByteBuffer buf = ByteBuffer.allocate(FileWALPointer.POINTER_SIZE);

    buf.order(ByteOrder.nativeOrder());

    if (startFile != null)
        startPtr = readPointer(startFile, buf);

    if (endFile != null)
        endPtr = readPointer(endFile, buf);

    if (log.isInfoEnabled())
        log.info("Read checkpoint status [startMarker=" + startFile + ", endMarker=" + endFile + ']');

    return new CheckpointStatus(lastStartTs, startId, startPtr, endId, endPtr);
}
/**
 * Loads a WAL pointer from a checkpoint marker file.
 *
 * @param cpMarkerFile Checkpoint mark file.
 * @param buf Buffer to read into. Its position is reset to zero before reading (the limit is not changed),
 *      and its byte order is used to decode the pointer.
 * @return WAL pointer.
 * @throws IgniteCheckedException If failed to read mark file.
 */
private WALPointer readPointer(File cpMarkerFile, ByteBuffer buf) throws IgniteCheckedException {
    buf.position(0);

    try (FileIO io = ioFactory.create(cpMarkerFile, READ)) {
        io.readFully(buf);
    }
    catch (IOException e) {
        throw new IgniteCheckedException(
            "Failed to read checkpoint pointer from marker file: " + cpMarkerFile.getAbsolutePath(), e);
    }

    buf.flip();

    long idx = buf.getLong();
    int fileOff = buf.getInt();
    int len = buf.getInt();

    return new FileWALPointer(idx, fileOff, len);
}
/** {@inheritDoc} */
@Override public void startMemoryRestore(GridKernalContext kctx, TimeBag startTimer) throws IgniteCheckedException {
// Client nodes perform no memory restore.
if (kctx.clientNode())
return;
checkpointReadLock();
try {
// Perform early regions startup before restoring state.
initAndStartRegions(kctx.config().getDataStorageConfiguration());
startTimer.finishGlobalStage("Init and start regions");
// Restore binary memory for all not WAL disabled cache groups.
restoreBinaryMemory(
groupsWithEnabledWal(),
physicalRecords()
);
if (recoveryVerboseLogging && log.isInfoEnabled()) {
log.info("Partition states information after BINARY RECOVERY phase:");
dumpPartitionsInfo(cctx, log);
}
startTimer.finishGlobalStage("Restore binary memory");
CheckpointStatus status = readCheckpointStatus();
RestoreLogicalState logicalState = applyLogicalUpdates(
status,
groupsWithEnabledWal(),
logicalRecords(),
true
);
if (recoveryVerboseLogging && log.isInfoEnabled()) {
log.info("Partition states information after LOGICAL RECOVERY phase:");
dumpPartitionsInfo(cctx, log);
}
startTimer.finishGlobalStage("Restore logical state");
// Remember where WAL logging should resume; finishRecovery() consumes and clears walTail.
walTail = tailPointer(logicalState);
cctx.wal().onDeActivate(kctx);
}
catch (IgniteCheckedException e) {
// Release the file lock taken for this store so that the failed start does not keep it.
releaseFileLock();
throw e;
}
finally {
checkpointReadUnlock();
}
}
/**
 * Evaluates {@code f} on every page store of every cache group known to the cache processor and
 * returns the sum of the results.
 *
 * @param f Function evaluated for each page store.
 * @return Sum of {@code f} over all page stores.
 */
public long forAllPageStores(ToLongFunction<PageStore> f) {
    return cctx.cache().cacheGroups().stream()
        .mapToLong(grp -> forGroupPageStores(grp, f))
        .sum();
}
/**
 * Looks up the page store that backs one partition of a cache group.
 *
 * @param grpId Cache group id.
 * @param partId Partition ID.
 * @return Page store, as provided by the page store manager.
 * @throws IgniteCheckedException If the page store manager fails to provide the store.
 */
public PageStore getPageStore(int grpId, int partId) throws IgniteCheckedException {
    PageStore store = storeMgr.getStore(grpId, partId);

    return store;
}
/**
 * Evaluates {@code f} on each page store of the given cache group and returns the sum of the results.
 *
 * @param gctx Group context.
 * @param f Function evaluated for each page store.
 * @return Sum of {@code f} over the group's page stores, or {@code 0} when the manager has none for the group.
 * @throws IgniteException Wrapping the {@link IgniteCheckedException} thrown while obtaining the stores.
 */
public long forGroupPageStores(CacheGroupContext gctx, ToLongFunction<PageStore> f) {
    Collection<PageStore> stores;

    try {
        stores = storeMgr.getStores(gctx.groupId());
    }
    catch (IgniteCheckedException e) {
        throw new IgniteException(e);
    }

    if (stores == null)
        return 0;

    long total = 0;

    for (PageStore store : stores)
        total += f.applyAsLong(store);

    return total;
}
/**
 * Calculates tail pointer for WAL at the end of logical recovery.
 *
 * <p>WAL buffers are flushed first, because the replay iterator only reads records that are already in
 * files. The result is the later of the flushed pointer and the last pointer read during logical recovery.
 * If only one of them is available, that one is returned. If neither is available, {@code null} is returned.
 * This {@code null} is required: FileWriteAheadLogManager.resumeLogging cannot write the header otherwise.
 *
 * @param logicalState State after logical recovery.
 * @return Tail pointer, or {@code null} if neither a flushed nor a read pointer exists.
 * @throws IgniteCheckedException If failed.
 */
private WALPointer tailPointer(RestoreLogicalState logicalState) throws IgniteCheckedException {
    WALPointer flushed = cctx.wal().flush(null, true);

    WALPointer read = logicalState.lastReadRecordPointer();

    if (flushed == null)
        return read;

    if (read == null)
        return flushed;

    FileWALPointer flushedFile = (FileWALPointer)flushed;
    FileWALPointer readFile = (FileWALPointer)read;

    if (readFile.compareTo(flushedFile) >= 0)
        return read;

    return flushedFile;
}
/**
 * Called when all partitions have been fully restored and pre-created on node start.
 *
 * <p>Starts the {@code db-checkpoint-thread} running the checkpointer and requests a first checkpoint
 * with reason "node started". If the checkpointer returns progress for it, blocks until that checkpoint
 * reaches the {@code LOCK_RELEASED} stage.
 *
 * @param topVer Topology version (not used by this implementation).
 * @throws IgniteCheckedException If first checkpoint has failed.
 */
@Override public void onStateRestored(AffinityTopologyVersion topVer) throws IgniteCheckedException {
    IgniteThread thread = new IgniteThread(cctx.igniteInstanceName(), "db-checkpoint-thread", checkpointer);

    thread.start();

    checkpointerThread = thread;

    CheckpointProgress firstCp = checkpointer.wakeupForCheckpoint(0, "node started");

    if (firstCp == null)
        return;

    firstCp.futureFor(LOCK_RELEASED).get();
}
/**
 * Replays physical WAL records to restore binary (page-level) memory state.
 *
 * <p>When {@code status.needRestoreMemory()} is true, the node stopped in the middle of a checkpoint. In that
 * case the record at {@code status.startPtr} must be a {@link CheckpointRecord}. Replay starts from its
 * checkpoint mark if it has one, otherwise from {@code status.endPtr}, and the interrupted checkpoint is
 * finished afterwards. Otherwise {@code status.startPtr} is passed to {@code cctx.wal().notchLastCheckpointPtr}
 * and replay starts from {@code status.endPtr}.
 *
 * <p>Page snapshots and page deltas are applied only while {@code restoreBinaryState.needApplyBinaryUpdate()}
 * is true. Partition meta state and partition destroy records are processed regardless. All of this work
 * runs on the striped executor: tasks go to a stripe chosen by group/partition, and a semaphore bounds the
 * number of in-flight tasks. The first task failure stops further submissions and is rethrown once the
 * submitted tasks complete.
 *
 * @param status Checkpoint status.
 * @param cacheGroupsPredicate Cache groups to restore.
 * @param recordTypePredicate WAL record filter passed to {@code cctx.wal().replay(...)}.
 * @param finalizeState If {@code false}, returns {@code null} right after replay, skipping the
 *      missing-checkpoint-record check, checkpoint finalization and checkpoint history initialization.
 * @return Binary restore state, or {@code null} when {@code finalizeState} is {@code false}.
 * @throws IgniteCheckedException If failed.
 * @throws StorageException In case I/O error occurred during operations with storage.
 */
private RestoreBinaryState performBinaryMemoryRestore(
    CheckpointStatus status,
    IgnitePredicate<Integer> cacheGroupsPredicate,
    IgniteBiPredicate<WALRecord.RecordType, WALPointer> recordTypePredicate,
    boolean finalizeState
) throws IgniteCheckedException {
    if (log.isInfoEnabled())
        log.info("Checking memory state [lastValidPos=" + status.endPtr + ", lastMarked="
            + status.startPtr + ", lastCheckpointId=" + status.cpStartId + ']');

    // Default replay start; replaced by the checkpoint mark below when memory must be restored.
    WALPointer recPtr = status.endPtr;

    boolean apply = status.needRestoreMemory();

    try {
        // The start record is read when there is a real start pointer, or unconditionally when memory must be restored.
        WALRecord startRec = !CheckpointStatus.NULL_PTR.equals(status.startPtr) || apply ? cctx.wal().read(status.startPtr) : null;

        if (apply) {
            if (finalizeState)
                U.quietAndWarn(log, "Ignite node stopped in the middle of checkpoint. Will restore memory state and " +
                    "finish checkpoint on node start.");

            // Explicitly start page memory of lazily allocated regions used by the groups being restored.
            cctx.cache().cacheGroupDescriptors().forEach((grpId, desc) -> {
                if (!cacheGroupsPredicate.apply(grpId))
                    return;

                try {
                    DataRegion region = cctx.database().dataRegion(desc.config().getDataRegionName());

                    if (region == null || !cctx.isLazyMemoryAllocation(region))
                        return;

                    region.pageMemory().start();
                }
                catch (IgniteCheckedException e) {
                    throw new IgniteException(e);
                }
            });

            cctx.pageStore().beginRecover();

            if (!(startRec instanceof CheckpointRecord))
                throw new StorageException("Checkpoint marker doesn't point to checkpoint record " +
                    "[ptr=" + status.startPtr + ", rec=" + startRec + "]");

            WALPointer cpMark = ((CheckpointRecord)startRec).checkpointMark();

            if (cpMark != null) {
                log.info("Restoring checkpoint after logical recovery, will start physical recovery from " +
                    "back pointer: " + cpMark);

                recPtr = cpMark;
            }
        }
        else
            cctx.wal().notchLastCheckpointPtr(status.startPtr);
    }
    catch (NoSuchElementException e) {
        // Presumably thrown by cctx.wal().read() when the start pointer is not present in WAL — confirm.
        throw new StorageException("Failed to read checkpoint record from WAL, persistence consistency " +
            "cannot be guaranteed. Make sure configuration points to correct WAL folders and WAL folder is " +
            "properly mounted [ptr=" + status.startPtr + ", walPath=" + persistenceCfg.getWalPath() +
            ", walArchive=" + persistenceCfg.getWalArchivePath() + "]");
    }

    // First error reported by any apply task; checked before each submission and after all tasks complete.
    AtomicReference<IgniteCheckedException> applyError = new AtomicReference<>();

    StripedExecutor exec = cctx.kernalContext().getStripedExecutorService();

    // Bounds the number of apply tasks submitted but not yet finished.
    Semaphore semaphore = new Semaphore(semaphorePertmits(exec));

    long start = U.currentTimeMillis();

    long lastArchivedSegment = cctx.wal().lastArchivedSegment();

    WALIterator it = cctx.wal().replay(recPtr, recordTypePredicate);

    RestoreBinaryState restoreBinaryState = new RestoreBinaryState(status, it, lastArchivedSegment, cacheGroupsPredicate);

    // Number of page snapshots/deltas applied successfully (for logging).
    AtomicLong applied = new AtomicLong();

    try {
        while (it.hasNextX()) {
            // Fail fast: stop submitting work once any task has failed.
            if (applyError.get() != null)
                break;

            WALRecord rec = restoreBinaryState.next();

            if (rec == null)
                break;

            switch (rec.type()) {
                case PAGE_RECORD:
                    if (restoreBinaryState.needApplyBinaryUpdate()) {
                        PageSnapshot pageSnapshot = (PageSnapshot)rec;

                        // Here we do not require tag check because we may be applying memory changes after
                        // several repetitive restarts and the same pages may have changed several times.
                        int groupId = pageSnapshot.fullPageId().groupId();
                        int partId = partId(pageSnapshot.fullPageId().pageId());

                        // Exits the switch only: the record is ignored, the loop continues.
                        if (skipRemovedIndexUpdates(groupId, partId))
                            break;

                        stripedApplyPage((pageMem) -> {
                                try {
                                    applyPageSnapshot(pageMem, pageSnapshot);

                                    applied.incrementAndGet();
                                }
                                catch (Throwable t) {
                                    U.error(log, "Failed to apply page snapshot. rec=[" + pageSnapshot + ']');

                                    applyError.compareAndSet(
                                        null,
                                        (t instanceof IgniteCheckedException)?
                                            (IgniteCheckedException)t:
                                            new IgniteCheckedException("Failed to apply page snapshot", t));
                                }
                            }, groupId, partId, exec, semaphore
                        );
                    }

                    break;

                case PART_META_UPDATE_STATE:
                    PartitionMetaStateRecord metaStateRecord = (PartitionMetaStateRecord)rec;

                    {
                        int groupId = metaStateRecord.groupId();
                        int partId = metaStateRecord.partitionId();

                        // Unknown or EVICTED state: schedule the partition for destruction; otherwise cancel
                        // (or wait for) a pending destroy of that partition.
                        stripedApplyPage((pageMem) -> {
                            GridDhtPartitionState state = fromOrdinal(metaStateRecord.state());

                            // NOTE(review): exceptions from schedulePartitionDestroy are not routed into applyError.
                            if (state == null || state == GridDhtPartitionState.EVICTED)
                                schedulePartitionDestroy(groupId, partId);
                            else {
                                try {
                                    cancelOrWaitPartitionDestroy(groupId, partId);
                                }
                                catch (Throwable t) {
                                    U.error(log, "Failed to cancel or wait partition destroy. rec=[" + metaStateRecord + ']');

                                    applyError.compareAndSet(
                                        null,
                                        (t instanceof IgniteCheckedException) ?
                                            (IgniteCheckedException)t :
                                            new IgniteCheckedException("Failed to cancel or wait partition destroy", t));
                                }
                            }
                        }, groupId, partId, exec, semaphore);
                    }

                    break;

                case PARTITION_DESTROY:
                    PartitionDestroyRecord destroyRecord = (PartitionDestroyRecord)rec;

                    {
                        int groupId = destroyRecord.groupId();
                        int partId = destroyRecord.partitionId();

                        // Invalidate the partition's pages in memory, then schedule the partition destroy.
                        stripedApplyPage((pageMem) -> {
                            pageMem.invalidate(groupId, partId);

                            schedulePartitionDestroy(groupId, partId);
                        }, groupId, partId, exec, semaphore);
                    }

                    break;

                default:
                    if (restoreBinaryState.needApplyBinaryUpdate() && rec instanceof PageDeltaRecord) {
                        PageDeltaRecord pageDelta = (PageDeltaRecord)rec;

                        int groupId = pageDelta.groupId();
                        int partId = partId(pageDelta.pageId());

                        if (skipRemovedIndexUpdates(groupId, partId))
                            break;

                        stripedApplyPage((pageMem) -> {
                            try {
                                applyPageDelta(pageMem, pageDelta, true);

                                applied.incrementAndGet();
                            }
                            catch (Throwable t) {
                                U.error(log, "Failed to apply page delta. rec=[" + pageDelta + ']');

                                applyError.compareAndSet(
                                    null,
                                    (t instanceof IgniteCheckedException) ?
                                        (IgniteCheckedException)t :
                                        new IgniteCheckedException("Failed to apply page delta", t));
                            }
                        }, groupId, partId, exec, semaphore);
                    }
            }
        }
    }
    finally {
        it.close();

        // Wait for all submitted tasks and rethrow the first task error.
        // NOTE(review): if the loop above threw, an exception thrown here replaces the original one.
        awaitApplyComplete(exec, applyError);
    }

    if (!finalizeState)
        return null;

    FileWALPointer lastReadPtr = restoreBinaryState.lastReadRecordPointer();

    if (status.needRestoreMemory()) {
        // Per the error message: still needing binary updates after replay means the checkpoint record was
        // never reached in WAL although a checkpoint marker exists on disk.
        if (restoreBinaryState.needApplyBinaryUpdate())
            throw new StorageException("Failed to restore memory state (checkpoint marker is present " +
                "on disk, but checkpoint record is missed in WAL) " +
                "[cpStatus=" + status + ", lastRead=" + lastReadPtr + "]");

        log.info("Finished applying memory changes [changesApplied=" + applied +
            ", time=" + (U.currentTimeMillis() - start) + " ms]");

        // Complete the checkpoint that was interrupted by the node stop.
        finalizeCheckpointOnRecovery(status.cpStartTs, status.cpStartId, status.startPtr, exec);
    }

    cpHistory.initialize(retreiveHistory());

    return restoreBinaryState;
}
/**
 * Calculate the maximum number of concurrent tasks for apply through the striped executor.
 *
 * <p>The default is 4 tasks per stripe. On a small heap it is lowered to about 20% of the max heap
 * divided by 8 KiB per task. A value supplied through {@code IGNITE_RECOVERY_SEMAPHORE_PERMITS} (read via
 * {@code getInteger}) overrides the calculation.
 *
 * <p>The result is never less than {@code 1}. A semaphore without permits would block the first
 * {@code acquire()} in {@code stripedApply} forever, because no task would ever release a permit.
 *
 * @param exec Striped executor.
 * @return Number of permits, at least {@code 1}.
 */
private int semaphorePertmits(StripedExecutor exec) {
    // 4 tasks per stripe by default.
    int permits = exec.stripesCount() * 4;

    long maxMemory = Runtime.getRuntime().maxMemory();

    // Heuristic: ~20% of the heap, 4096 * 2 bytes per in-flight task (presumably two 4 KiB pages — confirm).
    int permits0 = (int)((maxMemory * 0.2) / (4096 * 2));

    // On a small heap take the lower limit.
    if (permits0 < permits)
        permits = permits0;

    // Property for override any calculation.
    int res = getInteger(IGNITE_RECOVERY_SEMAPHORE_PERMITS, permits);

    // Guard against a tiny heap or a non-positive override dead-locking recovery.
    return Math.max(1, res);
}
/**
 * Waits until the apply tasks in all stripes of {@code exec} complete, then rethrows the first error
 * recorded by those tasks, if any.
 *
 * @param exec Striped executor.
 * @param applyError Check error reference.
 * @throws IgniteCheckedException First error recorded in {@code applyError}.
 * @throws IgniteInterruptedException If interrupted while waiting; the thread's interrupt status is restored.
 */
private void awaitApplyComplete(
    StripedExecutor exec,
    AtomicReference<IgniteCheckedException> applyError
) throws IgniteCheckedException {
    try {
        // Await completion apply tasks in all stripes.
        exec.awaitComplete();
    }
    catch (InterruptedException e) {
        // Keep the interrupt visible to code further up the stack before converting to an unchecked exception.
        Thread.currentThread().interrupt();

        throw new IgniteInterruptedException(e);
    }

    // Check the error after all tasks are applied; read the reference once.
    IgniteCheckedException err = applyError.get();

    if (err != null)
        throw err;
}
/**
 * Resolves the page memory of cache group {@code grpId} and schedules {@code consumer} with it on the
 * stripe selected for ({@code grpId}, {@code partId}). Nothing is scheduled when no page memory is found
 * for the group.
 *
 * @param consumer Task that receives the group's page memory.
 * @param grpId Group Id.
 * @param partId Partition Id.
 * @param exec Striped executor.
 * @param semaphore Semaphore bounding the number of in-flight tasks.
 * @throws IgniteCheckedException If the group's page memory cannot be resolved.
 */
public void stripedApplyPage(
    Consumer<PageMemoryEx> consumer,
    int grpId,
    int partId,
    StripedExecutor exec,
    Semaphore semaphore
) throws IgniteCheckedException {
    assert consumer != null;
    assert exec != null;
    assert semaphore != null;

    PageMemoryEx mem = getPageMemoryForCacheGroup(grpId);

    if (mem != null) {
        Runnable task = () -> consumer.accept(mem);

        stripedApply(task, grpId, partId, exec, semaphore);
    }
}
/**
 * Runs {@code run} on the stripe of {@code exec} selected by {@code U.stripeIdx} for
 * ({@code grpId}, {@code partId}), so tasks with the same group and partition land on the same stripe.
 *
 * <p>A permit is taken from {@code semaphore} before submission and returned when the task finishes, which
 * bounds the number of queued tasks. While the task runs, {@code CHECKPOINT_LOCK_HOLD_COUNT} is set to 1 on
 * the stripe thread to satisfy page memory's checkpoint-lock assertion.
 *
 * @param run Runnable task.
 * @param grpId Group Id.
 * @param partId Partition Id.
 * @param exec Striped executor.
 * @param semaphore Semaphore bounding the number of in-flight tasks.
 * @throws IgniteInterruptedException If interrupted while waiting for a permit; the interrupt status is restored.
 */
public void stripedApply(
    Runnable run,
    int grpId,
    int partId,
    StripedExecutor exec,
    Semaphore semaphore
) {
    assert run != null;
    assert exec != null;
    assert semaphore != null;

    int stripes = exec.stripesCount();

    int stripe = U.stripeIdx(stripes, grpId, partId);

    // Valid stripe indexes are [0, stripes).
    assert stripe >= 0 && stripe < stripes : "idx=" + stripe + ", stripes=" + stripes;

    try {
        semaphore.acquire();
    }
    catch (InterruptedException e) {
        Thread.currentThread().interrupt();

        throw new IgniteInterruptedException(e);
    }

    boolean submitted = false;

    try {
        exec.execute(stripe, () -> {
            // WA for avoid assert check in PageMemory, that current thread hold chpLock.
            CHECKPOINT_LOCK_HOLD_COUNT.set(1);

            try {
                run.run();
            }
            finally {
                CHECKPOINT_LOCK_HOLD_COUNT.set(0);

                semaphore.release();
            }
        });

        submitted = true;
    }
    finally {
        // If submission failed, the task will never run to release its permit, so return it here.
        if (!submitted)
            semaphore.release();
    }
}
/**
 * Overwrites a page in page memory with the full page image carried by a WAL page snapshot record.
 * If the stored image is compressed, it is decompressed in place to the real page size.
 *
 * <p>The page is acquired, write-locked, unlocked and released in nested {@code try/finally} blocks, so
 * lock and pin are released even if copying or decompression fails.
 *
 * @param pageMem Page memory.
 * @param pageSnapshotRecord Page snapshot record.
 * @throws IgniteCheckedException If failed.
 */
public void applyPageSnapshot(PageMemoryEx pageMem, PageSnapshot pageSnapshotRecord) throws IgniteCheckedException {
    int grpId = pageSnapshotRecord.fullPageId().groupId();
    long pageId = pageSnapshotRecord.fullPageId().pageId();

    // The trailing 'true' is in the same position as applyPageDelta's 'restore' flag, which binary recovery passes as true.
    long page = pageMem.acquirePage(grpId, pageId, IoStatisticsHolderNoOp.INSTANCE, true);

    try {
        long pageAddr = pageMem.writeLock(grpId, pageId, page, true);

        try {
            // Copy the snapshot bytes to the beginning of the page.
            PageUtils.putBytes(pageAddr, 0, pageSnapshotRecord.pageData());

            // A compressed image is shorter than the real page size (see assert); expand it back in place.
            if (PageIO.getCompressionType(pageAddr) != CompressionProcessor.UNCOMPRESSED_PAGE) {
                int realPageSize = pageMem.realPageSize(pageSnapshotRecord.groupId());

                assert pageSnapshotRecord.pageDataSize() < realPageSize : pageSnapshotRecord.pageDataSize();

                cctx.kernalContext().compress().decompressPage(pageMem.pageBuffer(pageAddr), realPageSize);
            }
        }
        finally {
            pageMem.writeUnlock(grpId, pageId, page, null, true, true);
        }
    }
    finally {
        pageMem.releasePage(grpId, pageId, page);
    }
}
/**
 * Applies a single page delta WAL record to the corresponding page in page memory.
 *
 * <p>The page is acquired and write-locked, the delta is applied, and the lock and page are released in
 * nested {@code try/finally} blocks.
 *
 * @param pageMem Page memory.
 * @param pageDeltaRecord Page delta record.
 * @param restore Forwarded to {@code acquirePage}, {@code writeLock} and {@code writeUnlock}. Binary recovery
 *      passes {@code true}; logical recovery of meta page records passes {@code false}.
 * @throws IgniteCheckedException If failed.
 */
private void applyPageDelta(PageMemoryEx pageMem, PageDeltaRecord pageDeltaRecord, boolean restore) throws IgniteCheckedException {
    int grpId = pageDeltaRecord.groupId();
    long pageId = pageDeltaRecord.pageId();

    // Here we do not require tag check because we may be applying memory changes after
    // several repetitive restarts and the same pages may have changed several times.
    long page = pageMem.acquirePage(grpId, pageId, IoStatisticsHolderNoOp.INSTANCE, restore);

    try {
        long pageAddr = pageMem.writeLock(grpId, pageId, page, restore);

        try {
            pageDeltaRecord.applyDelta(pageMem, pageAddr);
        }
        finally {
            pageMem.writeUnlock(grpId, pageId, page, null, true, restore);
        }
    }
    finally {
        pageMem.releasePage(grpId, pageId, page);
    }
}
/**
 * Tells whether a WAL update should be ignored because it targets the index partition of a group for
 * which the page store manager currently has no index store.
 *
 * @param grpId Group id.
 * @param partId Partition id.
 * @return {@code true} if {@code partId} is the index partition and {@code grpId} has no index store.
 */
private boolean skipRemovedIndexUpdates(int grpId, int partId) {
    if (partId != PageIdAllocator.INDEX_PARTITION)
        return false;

    return !storeMgr.hasIndexStore(grpId);
}
/**
 * Resolves the page memory of a cache group using the cache group descriptor rather than the cache context.
 * The metastorage and TX log groups are mapped to their dedicated regions directly.
 *
 * @param grpId Cache group id.
 * @return Page memory of the group's data region, or {@code null} if there is no descriptor for the group.
 * @throws IgniteCheckedException if no DataRegion is configured for a name obtained from cache descriptor.
 */
private PageMemoryEx getPageMemoryForCacheGroup(int grpId) throws IgniteCheckedException {
    // System groups with well-known regions.
    if (grpId == MetaStorage.METASTORAGE_CACHE_ID)
        return (PageMemoryEx)dataRegion(METASTORE_DATA_REGION_NAME).pageMemory();
    else if (grpId == TxLog.TX_LOG_CACHE_ID) // TODO IGNITE-7792 add generic mapping.
        return (PageMemoryEx)dataRegion(TxLog.TX_LOG_CACHE_NAME).pageMemory();

    // TODO IGNITE-5075: cache descriptor can be removed.
    GridCacheSharedContext sharedCtx = context();

    CacheGroupDescriptor desc = sharedCtx.cache().cacheGroupDescriptors().get(grpId);

    return desc == null
        ? null
        : (PageMemoryEx)sharedCtx.database().dataRegion(desc.config().getDataRegionName()).pageMemory();
}
/**
 * Applies updates read from a WAL iterator, filtered by record and entry predicates.
 *
 * <p>Records are consumed until the iterator is exhausted or {@code recPredicate} rejects a record; the rejected
 * record and everything after it are not applied. All work runs inside {@code cctx.walState().runWithOutWAL(...)}.
 * <ul>
 *     <li>Data records: each entry accepted by {@code entryPredicate} is applied to its cache. Entries of
 *     caches without a started context are logged and skipped.</li>
 *     <li>MVCC transaction records update the MVCC coordinator state.</li>
 *     <li>Other record types are ignored.</li>
 * </ul>
 * The iterator is not closed here.
 *
 * @param it WAL iterator; {@code null} means there is nothing to apply.
 * @param recPredicate WAL record filter; the first record it rejects stops processing.
 * @param entryPredicate Entry filter.
 * @throws IgniteCheckedException If failed.
 */
public void applyUpdatesOnRecovery(
    @Nullable WALIterator it,
    IgniteBiPredicate<WALPointer, WALRecord> recPredicate,
    IgnitePredicate<DataEntry> entryPredicate
) throws IgniteCheckedException {
    if (it == null)
        return;

    cctx.walState().runWithOutWAL(() -> {
        while (it.hasNext()) {
            IgniteBiTuple<WALPointer, WALRecord> tuple = it.next();

            WALRecord rec = tuple.get2();

            // The first rejected record terminates processing.
            if (!recPredicate.apply(tuple.get1(), rec))
                break;

            switch (rec.type()) {
                case MVCC_DATA_RECORD:
                case DATA_RECORD:
                    checkpointReadLock();

                    try {
                        for (DataEntry entry : ((DataRecord)rec).writeEntries()) {
                            if (!entryPredicate.apply(entry))
                                continue;

                            // NOTE(review): nested inside the per-record read lock taken above.
                            checkpointReadLock();

                            try {
                                int cacheId = entry.cacheId();

                                GridCacheContext cacheCtx = cctx.cacheContext(cacheId);

                                if (cacheCtx == null) {
                                    if (log != null)
                                        log.warning("Cache is not started. Updates cannot be applied " +
                                            "[cacheId=" + cacheId + ']');
                                }
                                else
                                    applyUpdate(cacheCtx, entry);
                            }
                            finally {
                                checkpointReadUnlock();
                            }
                        }
                    }
                    catch (IgniteCheckedException e) {
                        throw new IgniteException(e);
                    }
                    finally {
                        checkpointReadUnlock();
                    }

                    break;

                case MVCC_TX_RECORD:
                    checkpointReadLock();

                    try {
                        MvccTxRecord txRec = (MvccTxRecord)rec;

                        byte txState = convertToTxState(txRec.state());

                        cctx.coordinators().updateState(txRec.mvccVersion(), txState, true);
                    }
                    finally {
                        checkpointReadUnlock();
                    }

                    break;

                default:
                    // Skip other records.
            }
        }
    });
}
/**
 * Replays logical WAL records starting from the last checkpoint start marker ({@code status.startPtr}).
 *
 * <p>How records are handled:
 * <ul>
 *     <li>Data entries are applied to cache storage on the striped executor (stripe chosen by group/partition),
 *     with a semaphore bounding in-flight tasks.</li>
 *     <li>Meta page deltas also go through the striped executor.</li>
 *     <li>Other supported records are applied on the calling thread.</li>
 *     <li>Partition states from checkpoint records and partition meta state records are recorded for the
 *     returned recovery state.</li>
 * </ul>
 * Registered database lifecycle listeners are notified once all updates are applied.
 *
 * @param status Last registered checkpoint status.
 * @param cacheGroupsPredicate Cache groups to restore.
 * @param recordTypePredicate WAL record filter passed to {@code cctx.wal().replay(...)}.
 * @param skipFieldLookup If {@code true}, query field lookup skipping is enabled for the duration of the replay
 *      and disabled again afterwards.
 * @return Logical restore state.
 * @throws IgniteCheckedException If failed to apply updates.
 * @throws StorageException If IO exception occurred while reading write-ahead log.
 */
private RestoreLogicalState applyLogicalUpdates(
    CheckpointStatus status,
    IgnitePredicate<Integer> cacheGroupsPredicate,
    IgniteBiPredicate<WALRecord.RecordType, WALPointer> recordTypePredicate,
    boolean skipFieldLookup
) throws IgniteCheckedException {
    if (log.isInfoEnabled())
        log.info("Applying lost cache updates since last checkpoint record [lastMarked="
            + status.startPtr + ", lastCheckpointId=" + status.cpStartId + ']');

    if (skipFieldLookup)
        cctx.kernalContext().query().skipFieldLookup(true);

    long start = U.currentTimeMillis();

    // First error reported by any striped task; rethrown after all tasks complete.
    AtomicReference<IgniteCheckedException> applyError = new AtomicReference<>();

    AtomicLong applied = new AtomicLong();

    long lastArchivedSegment = cctx.wal().lastArchivedSegment();

    StripedExecutor exec = cctx.kernalContext().getStripedExecutorService();

    Semaphore semaphore = new Semaphore(semaphorePertmits(exec));

    // Partition states keyed by group/partition; handed to RestoreLogicalState below.
    Map<GroupPartitionId, Integer> partitionRecoveryStates = new HashMap<>();

    WALIterator it = cctx.wal().replay(status.startPtr, recordTypePredicate);

    RestoreLogicalState restoreLogicalState =
        new RestoreLogicalState(status, it, lastArchivedSegment, cacheGroupsPredicate, partitionRecoveryStates);

    try {
        // NOTE(review): unlike performBinaryMemoryRestore, this loop does not stop early once applyError is set.
        while (it.hasNextX()) {
            WALRecord rec = restoreLogicalState.next();

            if (rec == null)
                break;

            switch (rec.type()) {
                case CHECKPOINT_RECORD: // Calculate initial partition states
                    CheckpointRecord cpRec = (CheckpointRecord)rec;

                    for (Map.Entry<Integer, CacheState> entry : cpRec.cacheGroupStates().entrySet()) {
                        CacheState cacheState = entry.getValue();

                        for (int i = 0; i < cacheState.size(); i++) {
                            int partId = cacheState.partitionByIndex(i);
                            byte state = cacheState.stateByIndex(i);

                            // Ignore undefined state.
                            if (state != -1) {
                                partitionRecoveryStates.put(new GroupPartitionId(entry.getKey(), partId),
                                    (int)state);
                            }
                        }
                    }

                    break;

                case ROLLBACK_TX_RECORD:
                    RollbackRecord rbRec = (RollbackRecord)rec;

                    CacheGroupContext ctx = cctx.cache().cacheGroup(rbRec.groupId());

                    // For non-local groups: make sure the partition exists and pass the rolled back
                    // (start, range) to the initial counter update.
                    if (ctx != null && !ctx.isLocal()) {
                        ctx.topology().forceCreatePartition(rbRec.partitionId());

                        ctx.offheap().onPartitionInitialCounterUpdated(rbRec.partitionId(), rbRec.start(),
                            rbRec.range());
                    }

                    break;

                case MVCC_DATA_RECORD:
                case DATA_RECORD:
                case ENCRYPTED_DATA_RECORD:
                    DataRecord dataRec = (DataRecord)rec;

                    for (DataEntry dataEntry : dataRec.writeEntries()) {
                        int cacheId = dataEntry.cacheId();

                        DynamicCacheDescriptor cacheDesc = cctx.cache().cacheDescriptor(cacheId);

                        // The descriptor may be missing, e.g. when recovering a node after the baseline changed.
                        if (cacheDesc == null)
                            continue;

                        stripedApply(() -> {
                            // NOTE(review): cacheContext() may return null if the cache is not started even though
                            // its descriptor exists; cacheCtx.groupId() would then throw NPE on the stripe — confirm.
                            GridCacheContext cacheCtx = cctx.cacheContext(cacheId);

                            // No index store for the group: index updates are lost, so request an index rebuild.
                            if (skipRemovedIndexUpdates(cacheCtx.groupId(), PageIdAllocator.INDEX_PARTITION))
                                cctx.kernalContext().query().markAsRebuildNeeded(cacheCtx);

                            try {
                                applyUpdate(cacheCtx, dataEntry);
                            }
                            catch (IgniteCheckedException e) {
                                U.error(log, "Failed to apply data entry, dataEntry=" + dataEntry +
                                    ", ptr=" + dataRec.position());

                                applyError.compareAndSet(null, e);
                            }

                            // NOTE(review): counted even when applyUpdate failed.
                            applied.incrementAndGet();
                        }, cacheDesc.groupId(), dataEntry.partitionId(), exec, semaphore);
                    }

                    break;

                case MVCC_TX_RECORD:
                    MvccTxRecord txRecord = (MvccTxRecord)rec;

                    byte txState = convertToTxState(txRecord.state());

                    cctx.coordinators().updateState(txRecord.mvccVersion(), txState, true);

                    break;

                case PART_META_UPDATE_STATE:
                    PartitionMetaStateRecord metaStateRecord = (PartitionMetaStateRecord)rec;

                    GroupPartitionId groupPartitionId = new GroupPartitionId(
                        metaStateRecord.groupId(), metaStateRecord.partitionId()
                    );

                    // Later records override states taken from the checkpoint record. Presumably this is the same
                    // map as the local partitionRecoveryStates passed to the constructor — confirm.
                    restoreLogicalState.partitionRecoveryStates.put(groupPartitionId, (int)metaStateRecord.state());

                    break;

                case METASTORE_DATA_RECORD:
                    MetastoreDataRecord metastoreDataRecord = (MetastoreDataRecord)rec;

                    metaStorage.applyUpdate(metastoreDataRecord.key(), metastoreDataRecord.value());

                    break;

                case META_PAGE_UPDATE_NEXT_SNAPSHOT_ID:
                case META_PAGE_UPDATE_LAST_SUCCESSFUL_SNAPSHOT_ID:
                case META_PAGE_UPDATE_LAST_SUCCESSFUL_FULL_SNAPSHOT_ID:
                case META_PAGE_UPDATE_LAST_ALLOCATED_INDEX:
                    PageDeltaRecord pageDelta = (PageDeltaRecord)rec;

                    // Meta page deltas are applied with restore == false (binary recovery uses true).
                    stripedApplyPage((pageMem) -> {
                        try {
                            applyPageDelta(pageMem, pageDelta, false);
                        }
                        catch (IgniteCheckedException e) {
                            U.error(log, "Failed to apply page delta, " + pageDelta);

                            applyError.compareAndSet(null, e);
                        }
                    }, pageDelta.groupId(), partId(pageDelta.pageId()), exec, semaphore);

                    break;

                case MASTER_KEY_CHANGE_RECORD:
                    cctx.kernalContext().encryption().applyKeys((MasterKeyChangeRecord)rec);

                    break;

                default:
                    // Skip other records.
            }
        }
    }
    finally {
        it.close();

        if (skipFieldLookup)
            cctx.kernalContext().query().skipFieldLookup(false);
    }

    // NOTE(review): not in 'finally': if the loop throws, already submitted tasks are not awaited here.
    awaitApplyComplete(exec, applyError);

    if (log.isInfoEnabled())
        log.info("Finished applying WAL changes [updatesApplied=" + applied +
            ", time=" + (U.currentTimeMillis() - start) + " ms]");

    for (DatabaseLifecycleListener lsnr : getDatabaseListeners(cctx.kernalContext()))
        lsnr.afterLogicalUpdatesApplied(this, restoreLogicalState);

    return restoreLogicalState;
}
/**
 * Maps a {@link TransactionState} stored in WAL to the corresponding MVCC {@link TxState} code.
 *
 * @param state Transaction state; {@code PREPARED}, {@code COMMITTED} and {@code ROLLED_BACK} are supported.
 * @return MVCC transaction state code ({@code ROLLED_BACK} maps to {@code ABORTED}).
 * @throws IllegalStateException For any other transaction state.
 */
private byte convertToTxState(TransactionState state) {
    byte res;

    switch (state) {
        case PREPARED:
            res = TxState.PREPARED;

            break;

        case COMMITTED:
            res = TxState.COMMITTED;

            break;

        case ROLLED_BACK:
            res = TxState.ABORTED;

            break;

        default:
            throw new IllegalStateException("Unsupported TxState.");
    }

    return res;
}
/**
 * WAL truncation callback. Lets the checkpoint history drop the entries affected by the truncation and
 * removes the files of every dropped checkpoint.
 *
 * @param highBound WAL pointer passed to the checkpoint history's truncation handler.
 * @throws IgniteCheckedException If removing checkpoint files fails.
 */
public void onWalTruncated(WALPointer highBound) throws IgniteCheckedException {
    Iterator<CheckpointEntry> removed = cpHistory.onWalTruncated(highBound).iterator();

    while (removed.hasNext())
        removeCheckpointFiles(removed.next());
}
/**
 * Applies one logical WAL data entry to the offheap storage of a cache.
 *
 * <p>A partition id of {@code -1} is replaced by the partition computed from the entry key's affinity. For
 * non-local caches the partition is force-created. {@code CREATE}/{@code UPDATE} write the value and
 * {@code DELETE} removes the key; MVCC entries go through {@code mvccApplyUpdate}. After a write or remove,
 * a non-zero partition counter is passed on as an initial counter update. {@code READ} entries are ignored.
 *
 * @param cacheCtx Cache context to apply an update.
 * @param dataEntry Data entry to apply.
 * @throws IgniteCheckedException If failed to restore, or if the entry has an unsupported operation.
 */
private void applyUpdate(GridCacheContext cacheCtx, DataEntry dataEntry) throws IgniteCheckedException {
    int partId = dataEntry.partitionId();

    if (partId == -1)
        partId = cacheCtx.affinity().partition(dataEntry.key());

    GridDhtLocalPartition locPart = null;

    if (!cacheCtx.isLocal())
        locPart = cacheCtx.topology().forceCreatePartition(partId);

    boolean mvcc = dataEntry instanceof MvccDataEntry;

    switch (dataEntry.op()) {
        case CREATE:
        case UPDATE:
            if (mvcc) {
                cacheCtx.offheap().mvccApplyUpdate(
                    cacheCtx,
                    dataEntry.key(),
                    dataEntry.value(),
                    dataEntry.writeVersion(),
                    dataEntry.expireTime(),
                    locPart,
                    ((MvccDataEntry)dataEntry).mvccVer());
            }
            else {
                cacheCtx.offheap().update(
                    cacheCtx,
                    dataEntry.key(),
                    dataEntry.value(),
                    dataEntry.writeVersion(),
                    dataEntry.expireTime(),
                    locPart,
                    null);
            }

            break;

        case DELETE:
            if (mvcc) {
                cacheCtx.offheap().mvccApplyUpdate(
                    cacheCtx,
                    dataEntry.key(),
                    null,
                    dataEntry.writeVersion(),
                    0L,
                    locPart,
                    ((MvccDataEntry)dataEntry).mvccVer());
            }
            else
                cacheCtx.offheap().remove(cacheCtx, dataEntry.key(), partId, locPart);

            break;

        case READ:
            // Nothing to apply for reads, including no counter update.
            return;

        default:
            throw new IgniteCheckedException("Invalid operation for WAL entry update: " + dataEntry.op());
    }

    // Shared tail of CREATE/UPDATE/DELETE.
    if (dataEntry.partitionCounter() != 0)
        cacheCtx.offheap().onPartitionInitialCounterUpdated(partId, dataEntry.partitionCounter() - 1, 1);
}
/**
 * Finishes, during recovery, the checkpoint that was interrupted by a node stop.
 *
 * <p>Steps:
 * <ol>
 *     <li>Collects the checkpoint pages of every persistent data region via {@code beginCheckpoint}.</li>
 *     <li>Sorts/splits them with {@code splitAndSortCpPagesIfNeeded}.</li>
 *     <li>Writes each resulting collection on stripe {@code i % stripesCount}.</li>
 *     <li>Fsyncs every page store touched by the writes and calls {@code finishCheckpoint} on the regions.</li>
 *     <li>Writes the checkpoint END marker, calls {@code finishRecover} on the page store and logs timings.</li>
 * </ol>
 * The first failure in any write task stops the remaining tasks and is rethrown after all tasks complete.
 *
 * @param cpTs Timestamp of the checkpoint being finished; must be non-zero.
 * @param cpId Id of the checkpoint being finished.
 * @param walPtr WAL pointer stored in the END marker.
 * @param exec Striped executor used to write pages in parallel.
 * @throws IgniteCheckedException If failed.
 */
private void finalizeCheckpointOnRecovery(
    long cpTs,
    UUID cpId,
    WALPointer walPtr,
    StripedExecutor exec
) throws IgniteCheckedException {
    assert cpTs != 0;

    // Same clock as the 'written'/'fsync' timestamps below, so the logged durations are consistent.
    long start = U.currentTimeMillis();

    Collection<DataRegion> regions = dataRegions();

    Collection<GridMultiCollectionWrapper<FullPageId>> res = new ArrayList<>(regions.size());

    int pagesNum = 0;

    GridFinishedFuture finishedFuture = new GridFinishedFuture();

    // Collect collection of dirty pages from all regions.
    for (DataRegion memPlc : regions) {
        if (memPlc.config().isPersistenceEnabled()){
            GridMultiCollectionWrapper<FullPageId> nextCpPagesCol =
                ((PageMemoryEx)memPlc.pageMemory()).beginCheckpoint(finishedFuture);

            pagesNum += nextCpPagesCol.size();

            res.add(nextCpPagesCol);
        }
    }

    // Sort and split all dirty pages set to several stripes.
    GridMultiCollectionWrapper<FullPageId> pages = splitAndSortCpPagesIfNeeded(
        new IgniteBiTuple<>(res, pagesNum), exec.stripesCount());

    // Page stores touched by writes; fsynced once all writes are done.
    Collection<PageStore> updStores = new GridConcurrentHashSet<>();

    AtomicInteger cpPagesCnt = new AtomicInteger();

    // Shared reference for tracking exception during write pages.
    AtomicReference<IgniteCheckedException> writePagesError = new AtomicReference<>();

    for (int i = 0; i < pages.collectionsSize(); i++) {
        // Calculate stripe index.
        int stripeIdx = i % exec.stripesCount();

        // Inner collection index.
        int innerIdx = i;

        exec.execute(stripeIdx, () -> {
            PageStoreWriter pageStoreWriter = (fullPageId, buf, tag) -> {
                assert tag != PageMemoryImpl.TRY_AGAIN_TAG : "Lock is held by other thread for page " + fullPageId;

                int groupId = fullPageId.groupId();
                long pageId = fullPageId.pageId();

                // Write buf to page store.
                PageStore store = storeMgr.writeInternal(groupId, pageId, buf, tag, true);

                // Save store for future fsync.
                updStores.add(store);
            };

            // Local buffer for write pages.
            ByteBuffer writePageBuf = ByteBuffer.allocateDirect(pageSize());

            writePageBuf.order(ByteOrder.nativeOrder());

            Collection<FullPageId> pages0 = pages.innerCollection(innerIdx);

            FullPageId fullPageId = null;

            try {
                for (FullPageId fullId : pages0) {
                    // Fail-fast break if some exception occurred.
                    if (writePagesError.get() != null)
                        break;

                    // Save pageId to local variable for future using if exception occurred.
                    fullPageId = fullId;

                    PageMemoryEx pageMem = getPageMemoryForCacheGroup(fullId.groupId());

                    // Write page content to page store via pageStoreWriter.
                    // Tracker is null, because no need to track checkpoint metrics on recovery.
                    pageMem.checkpointWritePage(fullId, writePageBuf, pageStoreWriter, null);
                }

                // Add number of handled pages.
                cpPagesCnt.addAndGet(pages0.size());
            }
            catch (Throwable t) {
                // Record any failure (not only IgniteCheckedException, e.g. NPE for a group whose page memory is
                // gone) so it is rethrown by awaitApplyComplete instead of escaping into the stripe thread.
                U.error(log, "Failed to write page to pageStore, pageId=" + fullPageId);

                writePagesError.compareAndSet(
                    null,
                    (t instanceof IgniteCheckedException) ?
                        (IgniteCheckedException)t :
                        new IgniteCheckedException("Failed to write page to pageStore, pageId=" + fullPageId, t));
            }
        });
    }

    // Await completion all write tasks.
    awaitApplyComplete(exec, writePagesError);

    long written = U.currentTimeMillis();

    // Fsync all touched stores.
    for (PageStore updStore : updStores)
        updStore.sync();

    long fsync = U.currentTimeMillis();

    for (DataRegion memPlc : regions) {
        if (memPlc.config().isPersistenceEnabled())
            ((PageMemoryEx)memPlc.pageMemory()).finishCheckpoint();
    }

    ByteBuffer tmpWriteBuf = ByteBuffer.allocateDirect(pageSize());

    tmpWriteBuf.order(ByteOrder.nativeOrder());

    CheckpointEntry cp = prepareCheckpointEntry(
        tmpWriteBuf,
        cpTs,
        cpId,
        walPtr,
        null,
        CheckpointEntryType.END);

    writeCheckpointEntry(tmpWriteBuf, cp, CheckpointEntryType.END);

    cctx.pageStore().finishRecover();

    if (log.isInfoEnabled())
        log.info(String.format("Checkpoint finished [cpId=%s, pages=%d, markPos=%s, " +
            "pagesWrite=%dms, fsync=%dms, total=%dms]",
            cpId,
            cpPagesCnt.get(),
            walPtr,
            written - start,
            fsync - written,
            fsync - start));
}
/**
 * Serializes a WAL pointer into {@code entryBuf} and creates the matching checkpoint entry.
 *
 * <p>The buffer is rewound and filled with {@code long index, int fileOffset, int length} (16 bytes).
 * It is then flipped so it is ready to be written out.
 *
 * @param entryBuf Buffer to fill; its limit must leave room for the 16 bytes.
 * @param cpTs Checkpoint timestamp.
 * @param cpId Checkpoint id.
 * @param ptr WAL pointer of the checkpoint record; must be a {@link FileWALPointer}.
 * @param rec Checkpoint WAL record, if available.
 * @param type Checkpoint entry type.
 * @return Checkpoint entry.
 */
private CheckpointEntry prepareCheckpointEntry(
    ByteBuffer entryBuf,
    long cpTs,
    UUID cpId,
    WALPointer ptr,
    @Nullable CheckpointRecord rec,
    CheckpointEntryType type
) {
    assert ptr instanceof FileWALPointer;

    FileWALPointer fileWalPtr = (FileWALPointer)ptr;

    entryBuf.rewind();

    entryBuf.putLong(fileWalPtr.index())
        .putInt(fileWalPtr.fileOffset())
        .putInt(fileWalPtr.length());

    entryBuf.flip();

    return createCheckPointEntry(cpTs, ptr, cpId, rec, type);
}
/**
 * Writes a checkpoint entry buffer to its marker file in the checkpoint directory.
 *
 * <p>Unless {@code skipSync} is set, a 2-phase protocol is used: the buffer goes to a temporary file (name plus
 * {@code TMP_SUFFIX}), is forced to disk, and the file is then moved to its final name. With {@code skipSync}
 * the buffer is written straight to the final file without fsync. The target file must not already exist
 * ({@code CREATE_NEW}). The buffer is cleared after a successful write.
 *
 * @param entryBuf Checkpoint entry buffer to write.
 * @param cp Checkpoint entry.
 * @param type Checkpoint entry type.
 * @throws StorageException If failed to write checkpoint entry.
 */
public void writeCheckpointEntry(ByteBuffer entryBuf, CheckpointEntry cp, CheckpointEntryType type) throws StorageException {
    String fileName = checkpointFileName(cp, type);
    String tmpFileName = fileName + FilePageStoreManager.TMP_SUFFIX;

    String dir = cpDir.getAbsolutePath();

    try {
        File target = Paths.get(dir, skipSync ? fileName : tmpFileName).toFile();

        try (FileIO io = ioFactory.create(target, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
            io.writeFully(entryBuf);

            entryBuf.clear();

            if (!skipSync)
                io.force(true);
        }

        if (!skipSync)
            Files.move(Paths.get(dir, tmpFileName), Paths.get(dir, fileName));
    }
    catch (IOException e) {
        throw new StorageException("Failed to write checkpoint entry [ptr=" + cp.checkpointMark()
            + ", cpTs=" + cp.timestamp()
            + ", cpId=" + cp.checkpointId()
            + ", type=" + type + "]", e);
    }
}
/**
 * {@inheritDoc}
 *
 * <p>Returns the shared counter instance itself, not a snapshot of its value.
 */
@Override public AtomicInteger writtenPagesCounter() {
    return this.writtenPagesCntr;
}
/**
 * {@inheritDoc}
 *
 * <p>Returns the shared counter instance itself, not a snapshot of its value.
 */
@Override public AtomicInteger syncedPagesCounter() {
    return this.syncedPagesCntr;
}
/**
 * {@inheritDoc}
 *
 * <p>Returns the shared counter instance itself, not a snapshot of its value.
 */
@Override public AtomicInteger evictedPagesCntr() {
    return this.evictedPagesCntr;
}
/**
 * {@inheritDoc}
 *
 * <p>Returns the current value of the {@code currCheckpointPagesCnt} field.
 */
@Override public int currentCheckpointPagesCount() {
    return this.currCheckpointPagesCnt;
}
/**
 * Builds the marker file name {@code <cpTs>-<cpId>-<type>.bin} for a checkpoint.
 *
 * @param cpTs Checkpoint timestamp.
 * @param cpId Checkpoint ID.
 * @param type Checkpoint type.
 * @return Checkpoint file name.
 */
private static String checkpointFileName(long cpTs, UUID cpId, CheckpointEntryType type) {
    return new StringBuilder()
        .append(cpTs).append('-')
        .append(cpId).append('-')
        .append(type).append(".bin")
        .toString();
}
/**
 * Builds the marker file name for a checkpoint entry and entry type, using the entry's timestamp and id.
 *
 * @param cp Checkpoint entry.
 * @param type Checkpoint type.
 * @return Checkpoint file name.
 */
public static String checkpointFileName(CheckpointEntry cp, CheckpointEntryType type) {
    long ts = cp.timestamp();
    UUID id = cp.checkpointId();

    return checkpointFileName(ts, id, type);
}
/**
 * Replaces the thread-local supplying write buffers to checkpoint threads. Each thread's buffer is expected
 * to be a direct buffer exactly one page long.
 *
 * @param threadBuf New thread-local with buffers for the checkpoint threads.
 */
public void setThreadBuf(ThreadLocal<ByteBuffer> threadBuf) {
    this.threadBuf = threadBuf;
}
/**
 * Creates a checkpoint entry.
 *
 * <p>Cache group states from {@code rec} are kept in the entry only when a record is given and the
 * checkpoint history still has space. Otherwise they are left out to avoid a possible OOM and are read
 * from WAL on demand.
 *
 * @param cpTs Checkpoint timestamp; must be positive.
 * @param ptr Wal pointer of checkpoint.
 * @param cpId Checkpoint ID.
 * @param rec Checkpoint record, if available.
 * @param type Checkpoint type (only checked to be non-null).
 * @return Checkpoint entry.
 */
public CheckpointEntry createCheckPointEntry(
    long cpTs,
    WALPointer ptr,
    UUID cpId,
    @Nullable CheckpointRecord rec,
    CheckpointEntryType type
) {
    assert cpTs > 0;
    assert ptr != null;
    assert cpId != null;
    assert type != null;

    boolean keepGrpStates = rec != null && cpHistory.hasSpace();

    Map<Integer, CacheState> cacheGrpStates = keepGrpStates ? rec.cacheGroupStates() : null;

    return new CheckpointEntry(cpTs, ptr, cpId, cacheGrpStates);
}
/**
 * Gives access to the history of checkpoints tracked by this manager.
 *
 * @return Checkpoint history, possibly {@code null}.
 */
@Nullable public CheckpointHistory checkpointHistory() {
    return this.cpHistory;
}
/**
 * Adds the given partition to the checkpointer destroy queue. Does nothing if there is no checkpointer.
 *
 * @param grpId Group ID.
 * @param partId Partition ID.
 */
public void schedulePartitionDestroy(int grpId, int partId) {
    Checkpointer worker = checkpointer;

    if (worker == null)
        return;

    worker.schedulePartitionDestroy(cctx.cache().cacheGroup(grpId), grpId, partId);
}

/**
 * Cancels a pending destroy of the given partition or waits for it if it has already begun.
 * Does nothing if there is no checkpointer.
 *
 * @param grpId Group ID.
 * @param partId Partition ID.
 * @throws IgniteCheckedException If failed.
 */
public void cancelOrWaitPartitionDestroy(int grpId, int partId) throws IgniteCheckedException {
    Checkpointer worker = checkpointer;

    if (worker == null)
        return;

    worker.cancelOrWaitPartitionDestroy(grpId, partId);
}
/**
 * Returns the configured timeout for acquiring the checkpoint read lock.
 *
 * @return Timeout for checkpoint read lock acquisition in milliseconds.
 */
@Override public long checkpointReadLockTimeout() {
    return this.checkpointReadLockTimeout;
}

/**
 * Updates the timeout for acquiring the checkpoint read lock.
 *
 * @param val New timeout in milliseconds; a non-positive value denotes an infinite timeout.
 */
@Override public void checkpointReadLockTimeout(long val) {
    this.checkpointReadLockTimeout = val;
}
/**
 * Returns the holder of the page list cache limit for a data region, creating it on the first request.
 * The initial limit is the region's total page count multiplied by {@code PAGE_LIST_CACHE_LIMIT_THRESHOLD}.
 *
 * @param dataRegion Data region.
 * @return Holder for page list cache limit for the given data region, or {@code null} if persistence is
 *      disabled for that region.
 */
public AtomicLong pageListCacheLimitHolder(DataRegion dataRegion) {
    if (!dataRegion.config().isPersistenceEnabled())
        return null;

    return pageListCacheLimits.computeIfAbsent(
        dataRegion.config().getName(),
        regionName -> new AtomicLong(
            (long)(((PageMemoryEx)dataRegion.pageMemory()).totalPages() * PAGE_LIST_CACHE_LIMIT_THRESHOLD))
    );
}
/**
 * Queue of partition destroy requests keyed by (group ID, partition ID).
 */
private static class PartitionDestroyQueue {
    /** Pending destroy requests keyed by (group ID, partition ID). */
    private final ConcurrentMap<T2<Integer, Integer>, PartitionDestroyRequest> pendingReqs =
        new ConcurrentHashMap<>();

    /**
     * Registers a destroy request for the given partition. If a request for the same partition is already
     * pending, the existing request is kept (this is asserted not to happen when {@code grpCtx} is known).
     *
     * @param grpCtx Group context, may be {@code null}.
     * @param grpId Group ID.
     * @param partId Partition ID to destroy.
     */
    private void addDestroyRequest(@Nullable CacheGroupContext grpCtx, int grpId, int partId) {
        T2<Integer, Integer> key = new T2<>(grpId, partId);

        PartitionDestroyRequest prev = pendingReqs.putIfAbsent(key, new PartitionDestroyRequest(grpId, partId));

        assert prev == null || grpCtx == null : "Must wait for old destroy request to finish before adding a new one "
            + "[grpId=" + grpId
            + ", grpName=" + grpCtx.cacheOrGroupName()
            + ", partId=" + partId + ']';
    }

    /**
     * Removes the request from the queue and tries to start its destroy.
     *
     * @param destroyId Destroy ID.
     * @return Destroy request to complete, or {@code null} if there was none or it was concurrently cancelled.
     */
    private PartitionDestroyRequest beginDestroy(T2<Integer, Integer> destroyId) {
        PartitionDestroyRequest removed = pendingReqs.remove(destroyId);

        if (removed == null || !removed.beginDestroy())
            return null;

        return removed;
    }

    /**
     * Removes the request from the queue and tries to cancel it.
     *
     * @param grpId Group ID.
     * @param partId Partition ID.
     * @return Destroy request to wait for if its destroy has already begun, {@code null} otherwise.
     */
    private PartitionDestroyRequest cancelDestroy(int grpId, int partId) {
        PartitionDestroyRequest removed = pendingReqs.remove(new T2<>(grpId, partId));

        if (removed == null || removed.cancel())
            return null;

        return removed;
    }
}
/**
 * Request to destroy a single partition file. A request is either cancelled before its destroy begins or,
 * once begun, owns a future that is completed when the destroy finishes.
 */
private static class PartitionDestroyRequest {
    /** Cache group ID. */
    private final int grpId;

    /** Partition ID. */
    private final int partId;

    /** Destroy cancelled flag. */
    private boolean cancelled;

    /** Destroy future. Not null if partition destroy has begun. */
    private GridFutureAdapter<Void> destroyFut;

    /**
     * @param grpId Group ID.
     * @param partId Partition ID.
     */
    private PartitionDestroyRequest(int grpId, int partId) {
        this.grpId = grpId;
        this.partId = partId;
    }

    /**
     * Cancels this request unless its destroy has already begun.
     *
     * @return {@code True} if the request was cancelled, {@code false} if it has begun and must be waited for.
     */
    private synchronized boolean cancel() {
        boolean started = destroyFut != null;

        if (started)
            assert !cancelled;
        else
            cancelled = true;

        return !started;
    }

    /**
     * Starts the destroy by creating the completion future.
     *
     * @return {@code True} if the caller should execute the destroy, {@code false} if the request was cancelled
     *      or the destroy has already begun.
     */
    private synchronized boolean beginDestroy() {
        if (cancelled) {
            assert destroyFut == null;

            return false;
        }

        if (destroyFut == null) {
            destroyFut = new GridFutureAdapter<>();

            return true;
        }

        return false;
    }

    /**
     * Completes the destroy future.
     *
     * @param err Destroy error, or {@code null} on success.
     */
    private synchronized void onDone(Throwable err) {
        GridFutureAdapter<Void> fut = destroyFut;

        assert fut != null;

        fut.onDone(err);
    }

    /**
     * Blocks until the begun destroy completes.
     *
     * @throws IgniteCheckedException If the destroy has failed.
     */
    private void waitCompleted() throws IgniteCheckedException {
        // The future is read under the monitor, but waited on outside of it.
        startedFuture().get();
    }

    /**
     * @return Destroy future; the destroy must have begun.
     */
    private synchronized GridFutureAdapter<Void> startedFuture() {
        assert destroyFut != null;

        return destroyFut;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return "PartitionDestroyRequest [grpId=" + grpId + ", partId=" + partId + ']';
    }
}
/**
 * Checkpoint worker.
 * <p>
 * The checkpointer object itself is the monitor used for checkpoint-begin notification; the wait predicate is
 * {@code scheduledCp.nextCpNanos - now > 0}. Method {@link #wakeupForCheckpoint} uses {@code notifyAll()},
 * {@link #waitCheckpointEvent} uses {@code wait()}.
 */
@SuppressWarnings("NakedNotify")
public class Checkpointer extends GridWorker {
    /** Checkpoint started log message format. */
    private static final String CHECKPOINT_STARTED_LOG_FORMAT = "Checkpoint started [" +
        "checkpointId=%s, " +
        "startPtr=%s, " +
        "checkpointBeforeLockTime=%dms, " +
        "checkpointLockWait=%dms, " +
        "checkpointListenersExecuteTime=%dms, " +
        "checkpointLockHoldTime=%dms, " +
        "walCpRecordFsyncDuration=%dms, " +
        "writeCheckpointEntryDuration=%dms, " +
        "splitAndSortCpPagesDuration=%dms, " +
        "%s pages=%d, " +
        "reason='%s']";

    /** Temporary write buffer (direct, native byte order, one page long) used to write checkpoint entries. */
    private final ByteBuffer tmpWriteBuf;

    /** Next scheduled checkpoint progress. */
    private volatile CheckpointProgressImpl scheduledCp;

    /** Current checkpoint. This field is updated only by checkpoint thread. */
    @Nullable private volatile CheckpointProgressImpl curCpProgress;

    /** Shutdown now. */
    private volatile boolean shutdownNow;

    /** Timestamp of the last checkpoint; used to keep checkpoint timestamps unique. */
    private long lastCpTs;

    /** Pause detector. */
    private final LongJVMPauseDetector pauseDetector;

    /** Long JVM pause threshold. */
    private final int longJvmPauseThreshold =
        getInteger(IGNITE_JVM_PAUSE_DETECTOR_THRESHOLD, DEFAULT_JVM_PAUSE_DETECTOR_THRESHOLD);

    /**
     * @param gridName Grid name.
     * @param name Thread name.
     * @param log Logger.
     */
    protected Checkpointer(@Nullable String gridName, String name, IgniteLogger log) {
        super(gridName, name, log, cctx.kernalContext().workersRegistry());

        scheduledCp = new CheckpointProgressImpl(checkpointFreq);

        tmpWriteBuf = ByteBuffer.allocateDirect(pageSize());

        tmpWriteBuf.order(ByteOrder.nativeOrder());

        pauseDetector = cctx.kernalContext().longJvmPauseDetector();
    }

    /**
     * @return Progress of the current checkpoint, or {@code null} if there is no checkpoint at this moment.
     */
    public @Nullable CheckpointProgress currentProgress() {
        return curCpProgress;
    }

    /** {@inheritDoc} */
    @Override protected void body() {
        Throwable err = null;

        try {
            while (!isCancelled()) {
                waitCheckpointEvent();

                if (skipCheckpointOnNodeStop && (isCancelled() || shutdownNow)) {
                    // This is a warning: it must not be suppressed just because INFO level is disabled.
                    log.warning("Skipping last checkpoint because node is stopping.");

                    return;
                }

                GridFutureAdapter<Void> enableChangeApplied = GridCacheDatabaseSharedManager.this.enableChangeApplied;

                if (enableChangeApplied != null) {
                    enableChangeApplied.onDone();

                    GridCacheDatabaseSharedManager.this.enableChangeApplied = null;
                }

                if (checkpointsEnabled)
                    doCheckpoint();
                else {
                    // Checkpoints are disabled: only postpone the next scheduled one by the checkpoint frequency.
                    synchronized (this) {
                        scheduledCp.nextCpNanos = System.nanoTime() + U.millisToNanos(checkpointFreq);
                    }
                }
            }

            // Final run after the cancellation.
            if (checkpointsEnabled && !shutdownNow)
                doCheckpoint();
        }
        catch (Throwable t) {
            err = t;

            scheduledCp.fail(t);

            throw t;
        }
        finally {
            if (err == null && !(stopping && isCancelled))
                err = new IllegalStateException("Thread is terminated unexpectedly: " + name());

            if (err instanceof OutOfMemoryError)
                cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err));
            else if (err != null)
                cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err));

            scheduledCp.fail(new NodeStoppingException("Node is stopping."));
        }
    }

    /**
     * Schedules the next checkpoint to start no later than {@code delayFromNow} milliseconds from now.
     *
     * @param delayFromNow Delay from now in milliseconds.
     * @param reason Checkpoint reason.
     * @return Progress of the scheduled checkpoint.
     */
    private CheckpointProgress wakeupForCheckpoint(long delayFromNow, String reason) {
        return wakeupForCheckpoint(delayFromNow, reason, null);
    }

    /**
     * Schedules the next checkpoint to start no later than {@code delayFromNow} milliseconds from now. If the
     * scheduled checkpoint is already due earlier, its time and reason are left unchanged.
     *
     * @param delayFromNow Delay from now in milliseconds.
     * @param reason Checkpoint reason.
     * @param lsnr Optional listener attached to the {@code FINISHED} future of the scheduled checkpoint.
     * @param <R> Listener future result type.
     * @return Progress of the scheduled checkpoint.
     */
    private <R> CheckpointProgress wakeupForCheckpoint(
        long delayFromNow,
        String reason,
        IgniteInClosure<? super IgniteInternalFuture<R>> lsnr
    ) {
        if (lsnr != null) {
            //To be sure lsnr always will be executed in checkpoint thread.
            synchronized (this) {
                CheckpointProgressImpl sched = scheduledCp;

                sched.futureFor(FINISHED).listen(lsnr);
            }
        }

        CheckpointProgressImpl sched = scheduledCp;

        long nextNanos = System.nanoTime() + U.millisToNanos(delayFromNow);

        if (sched.nextCpNanos - nextNanos <= 0)
            return sched;

        synchronized (this) {
            sched = scheduledCp;

            if (sched.nextCpNanos - nextNanos > 0) {
                sched.reason = reason;

                sched.nextCpNanos = nextNanos;
            }

            notifyAll();
        }

        return sched;
    }

    /**
     * Schedules an immediate checkpoint that also takes a snapshot.
     *
     * @param snapshotOperation Snapshot operation.
     * @return Future for the {@code LOCK_RELEASED} stage of the scheduled checkpoint.
     */
    public IgniteInternalFuture wakeupForSnapshotCreation(SnapshotOperation snapshotOperation) {
        GridFutureAdapter<Object> ret;

        synchronized (this) {
            scheduledCp.nextCpNanos = System.nanoTime();

            scheduledCp.reason = "snapshot";

            scheduledCp.nextSnapshot = true;

            scheduledCp.snapshotOperation = snapshotOperation;

            ret = scheduledCp.futureFor(LOCK_RELEASED);

            notifyAll();
        }

        return ret;
    }

    /**
     * Performs a single checkpoint: marks its beginning, writes the collected pages (via {@code asyncRunner}
     * when it is available), syncs the updated page stores unless {@code skipSync} is set, destroys partitions
     * scheduled for destroy and marks the checkpoint end. Checked failures are reported to the failure handler
     * as critical errors.
     */
    private void doCheckpoint() {
        Checkpoint chp = null;

        try {
            CheckpointMetricsTracker tracker = new CheckpointMetricsTracker();

            try {
                chp = markCheckpointBegin(tracker);
            }
            catch (Exception e) {
                if (curCpProgress != null)
                    curCpProgress.fail(e);

                // In case of checkpoint initialization error node should be invalidated and stopped.
                cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));

                throw new IgniteException(e); // Re-throw as unchecked exception to force stopping checkpoint thread.
            }

            updateHeartbeat();

            currCheckpointPagesCnt = chp.pagesSize;

            writtenPagesCntr = new AtomicInteger();
            syncedPagesCntr = new AtomicInteger();
            evictedPagesCntr = new AtomicInteger();

            boolean success = false;

            int destroyedPartitionsCnt;

            try {
                if (chp.hasDelta()) {
                    // Identity stores set.
                    ConcurrentLinkedHashMap<PageStore, LongAdder> updStores = new ConcurrentLinkedHashMap<>();

                    CountDownFuture doneWriteFut = new CountDownFuture(
                        asyncRunner == null ? 1 : chp.cpPages.collectionsSize());

                    tracker.onPagesWriteStart();

                    final int totalPagesToWriteCnt = chp.cpPages.size();

                    if (asyncRunner != null) {
                        for (int i = 0; i < chp.cpPages.collectionsSize(); i++) {
                            Runnable write = new WriteCheckpointPages(
                                tracker,
                                chp.cpPages.innerCollection(i),
                                updStores,
                                doneWriteFut,
                                totalPagesToWriteCnt,
                                new Runnable() {
                                    @Override public void run() {
                                        updateHeartbeat();
                                    }
                                },
                                asyncRunner
                            );

                            try {
                                asyncRunner.execute(write);
                            }
                            catch (RejectedExecutionException ignore) {
                                // Run the task synchronously.
                                updateHeartbeat();

                                write.run();
                            }
                        }
                    }
                    else {
                        // Single-threaded checkpoint.
                        updateHeartbeat();

                        Runnable write = new WriteCheckpointPages(
                            tracker,
                            chp.cpPages,
                            updStores,
                            doneWriteFut,
                            totalPagesToWriteCnt,
                            new Runnable() {
                                @Override public void run() {
                                    updateHeartbeat();
                                }
                            },
                            null);

                        write.run();
                    }

                    updateHeartbeat();

                    // Wait and check for errors.
                    doneWriteFut.get();

                    // Must re-check shutdown flag here because threads may have skipped some pages.
                    // If so, we should not put finish checkpoint mark.
                    if (shutdownNow) {
                        chp.progress.fail(new NodeStoppingException("Node is stopping."));

                        return;
                    }

                    tracker.onFsyncStart();

                    if (!skipSync) {
                        for (Map.Entry<PageStore, LongAdder> updStoreEntry : updStores.entrySet()) {
                            if (shutdownNow) {
                                chp.progress.fail(new NodeStoppingException("Node is stopping."));

                                return;
                            }

                            blockingSectionBegin();

                            try {
                                updStoreEntry.getKey().sync();
                            }
                            finally {
                                blockingSectionEnd();
                            }

                            syncedPagesCntr.addAndGet(updStoreEntry.getValue().intValue());
                        }
                    }
                }
                else {
                    tracker.onPagesWriteStart();
                    tracker.onFsyncStart();
                }

                snapshotMgr.afterCheckpointPageWritten();

                destroyedPartitionsCnt = destroyEvictedPartitions();

                // Must mark successful checkpoint only if there are no exceptions or interrupts.
                success = true;
            }
            finally {
                if (success)
                    markCheckpointEnd(chp);
            }

            tracker.onEnd();

            if (chp.hasDelta() || destroyedPartitionsCnt > 0) {
                if (printCheckpointStats) {
                    if (log.isInfoEnabled()) {
                        String walSegsCoveredMsg = prepareWalSegsCoveredMsg(chp.walSegsCoveredRange);

                        log.info(String.format("Checkpoint finished [cpId=%s, pages=%d, markPos=%s, " +
                                "walSegmentsCleared=%d, walSegmentsCovered=%s, markDuration=%dms, pagesWrite=%dms, fsync=%dms, " +
                                "total=%dms]",
                            chp.cpEntry != null ? chp.cpEntry.checkpointId() : "",
                            chp.pagesSize,
                            chp.cpEntry != null ? chp.cpEntry.checkpointMark() : "",
                            chp.walFilesDeleted,
                            walSegsCoveredMsg,
                            tracker.markDuration(),
                            tracker.pagesWriteDuration(),
                            tracker.fsyncDuration(),
                            tracker.totalDuration()));
                    }
                }
            }

            updateMetrics(chp, tracker);
        }
        catch (IgniteCheckedException e) {
            if (chp != null)
                chp.progress.fail(e);

            cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
        }
    }

    /**
     * Reports checkpoint durations and page counts to persistence metrics, if metrics are enabled.
     *
     * @param chp Checkpoint.
     * @param tracker Tracker.
     */
    private void updateMetrics(Checkpoint chp, CheckpointMetricsTracker tracker) {
        if (persStoreMetrics.metricsEnabled()) {
            persStoreMetrics.onCheckpoint(
                tracker.lockWaitDuration(),
                tracker.markDuration(),
                tracker.pagesWriteDuration(),
                tracker.fsyncDuration(),
                tracker.totalDuration(),
                chp.pagesSize,
                tracker.dataPagesWritten(),
                tracker.cowPagesWritten(),
                forAllPageStores(PageStore::size),
                forAllPageStores(PageStore::getSparseSize));
        }
    }

    /**
     * Formats the range of WAL segments covered by a checkpoint.
     *
     * @param walRange Tuple of (start index, end index).
     * @return {@code "[]"} if the range is empty, {@code "[idx]"} for a single segment,
     *      {@code "[start - end]"} otherwise.
     */
    private String prepareWalSegsCoveredMsg(IgniteBiTuple<Long, Long> walRange) {
        String res;

        long startIdx = walRange.get1();
        long endIdx = walRange.get2();

        if (endIdx < 0 || endIdx < startIdx)
            res = "[]";
        else if (endIdx == startIdx)
            res = "[" + endIdx + "]";
        else
            res = "[" + startIdx + " - " + endIdx + "]";

        return res;
    }

    /**
     * Processes all evicted partitions scheduled for destroy in the current checkpoint. Destroy tasks run on
     * {@code asyncRunner} when available (or synchronously otherwise) and are all awaited before returning.
     *
     * @throws IgniteCheckedException If failed.
     *
     * @return The number of destroyed partition files.
     */
    private int destroyEvictedPartitions() throws IgniteCheckedException {
        PartitionDestroyQueue destroyQueue = curCpProgress.destroyQueue;

        if (destroyQueue.pendingReqs.isEmpty())
            return 0;

        List<PartitionDestroyRequest> reqs = null;

        for (final PartitionDestroyRequest req : destroyQueue.pendingReqs.values()) {
            // Skip requests that were cancelled concurrently.
            if (!req.beginDestroy())
                continue;

            final int grpId = req.grpId;
            final int partId = req.partId;

            CacheGroupContext grp = cctx.cache().cacheGroup(grpId);

            assert grp != null
                : "Cache group is not initialized [grpId=" + grpId + "]";
            assert grp.offheap() instanceof GridCacheOffheapManager
                : "Destroying partition files when persistence is off " + grp.offheap();

            final GridCacheOffheapManager offheap = (GridCacheOffheapManager) grp.offheap();

            Runnable destroyPartTask = () -> {
                try {
                    offheap.destroyPartitionStore(grpId, partId);

                    req.onDone(null);

                    grp.metrics().decrementInitializedLocalPartitions();

                    if (log.isDebugEnabled())
                        log.debug("Partition file has destroyed [grpId=" + grpId + ", partId=" + partId + "]");
                }
                catch (Exception e) {
                    req.onDone(new IgniteCheckedException(
                        "Partition file destroy has failed [grpId=" + grpId + ", partId=" + partId + "]", e));
                }
            };

            if (asyncRunner != null) {
                try {
                    asyncRunner.execute(destroyPartTask);
                }
                catch (RejectedExecutionException ignore) {
                    // Run the task synchronously.
                    destroyPartTask.run();
                }
            }
            else
                destroyPartTask.run();

            if (reqs == null)
                reqs = new ArrayList<>();

            reqs.add(req);
        }

        if (reqs != null)
            for (PartitionDestroyRequest req : reqs)
                req.waitCompleted();

        destroyQueue.pendingReqs.clear();

        return reqs != null ? reqs.size() : 0;
    }

    /**
     * Adds a partition destroy request to the scheduled checkpoint and, when the group context is known,
     * requests a checkpoint within {@code PARTITION_DESTROY_CHECKPOINT_TIMEOUT}.
     *
     * @param grpCtx Group context. Can be {@code null} in case of crash recovery.
     * @param grpId Group ID.
     * @param partId Partition ID.
     */
    private void schedulePartitionDestroy(@Nullable CacheGroupContext grpCtx, int grpId, int partId) {
        synchronized (this) {
            scheduledCp.destroyQueue.addDestroyRequest(grpCtx, grpId, partId);
        }

        if (log.isDebugEnabled())
            log.debug("Partition file has been scheduled to destroy [grpId=" + grpId + ", partId=" + partId + "]");

        if (grpCtx != null)
            wakeupForCheckpoint(PARTITION_DESTROY_CHECKPOINT_TIMEOUT, "partition destroy");
    }

    /**
     * Cancels the destroy of the given partition in both the scheduled and the current checkpoint queues.
     * If a destroy has already begun in either queue, waits for it to complete.
     *
     * @param grpId Group ID.
     * @param partId Partition ID.
     * @throws IgniteCheckedException If an awaited partition destroy has failed.
     */
    private void cancelOrWaitPartitionDestroy(int grpId, int partId) throws IgniteCheckedException {
        PartitionDestroyRequest scheduledReq;

        synchronized (this) {
            scheduledReq = scheduledCp.destroyQueue.cancelDestroy(grpId, partId);
        }

        if (scheduledReq != null)
            scheduledReq.waitCompleted();

        // A separate variable is used so that a request from the scheduled queue is not awaited and reported
        // a second time when no checkpoint is in progress.
        PartitionDestroyRequest curReq = null;

        synchronized (this) {
            CheckpointProgressImpl cur = curCpProgress;

            if (cur != null)
                curReq = cur.destroyQueue.cancelDestroy(grpId, partId);
        }

        if (curReq != null)
            curReq.waitCompleted();

        // A non-null request means its destroy had already begun and has now completed (it was not cancelled).
        if ((scheduledReq != null || curReq != null) && log.isDebugEnabled())
            log.debug("Partition file destroy has completed [grpId=" + grpId + ", partId=" + partId + "]");
    }

    /**
     * Waits until the scheduled checkpoint is due or the worker is cancelled. On interruption the interrupt
     * flag is restored and the worker is marked as cancelled.
     */
    private void waitCheckpointEvent() {
        boolean cancel = false;

        try {
            synchronized (this) {
                long remaining = U.nanosToMillis(scheduledCp.nextCpNanos - System.nanoTime());

                while (remaining > 0 && !isCancelled()) {
                    blockingSectionBegin();

                    try {
                        wait(remaining);

                        remaining = U.nanosToMillis(scheduledCp.nextCpNanos - System.nanoTime());
                    }
                    finally {
                        blockingSectionEnd();
                    }
                }
            }
        }
        catch (InterruptedException ignored) {
            Thread.currentThread().interrupt();

            cancel = true;
        }

        if (cancel)
            isCancelled = true;
    }

    /**
     * Marks the beginning of a checkpoint: notifies listeners, takes the checkpoint write lock, collects dirty
     * pages from all persistent data regions and, when there is anything to checkpoint, logs the checkpoint
     * record to WAL and writes the {@code START} checkpoint entry.
     *
     * @param tracker Checkpoint metrics tracker.
     * @return Checkpoint to write; it has no entry and no pages when no pages were modified and no partitions
     *      are scheduled for destroy.
     * @throws IgniteCheckedException If failed.
     */
    @SuppressWarnings("TooBroadScope")
    private Checkpoint markCheckpointBegin(CheckpointMetricsTracker tracker) throws IgniteCheckedException {
        long cpTs = updateLastCheckpointTime();

        CheckpointProgressImpl curr = scheduledCp;

        CheckpointRecord cpRec = new CheckpointRecord(memoryRecoveryRecordPtr);

        memoryRecoveryRecordPtr = null;

        CheckpointEntry cp = null;

        IgniteFuture snapFut = null;

        IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> cpPagesTuple;

        boolean hasPages, hasPartitionsToDestroy;

        DbCheckpointContextImpl ctx0 = new DbCheckpointContextImpl(curr, new PartitionAllocationMap());

        internalReadLock();

        try {
            for (DbCheckpointListener lsnr : lsnrs)
                lsnr.beforeCheckpointBegin(ctx0);

            ctx0.awaitPendingTasksFinished();
        }
        finally {
            internalReadUnlock();
        }

        tracker.onLockWaitStart();

        checkpointLock.writeLock().lock();

        try {
            updateCurrentCheckpointProgress();

            assert curCpProgress == curr : "Concurrent checkpoint begin should not be happened";

            tracker.onMarkStart();

            // Listeners must be invoked before we write checkpoint record to WAL.
            for (DbCheckpointListener lsnr : lsnrs)
                lsnr.onMarkCheckpointBegin(ctx0);

            ctx0.awaitPendingTasksFinished();

            tracker.onListenersExecuteEnd();

            if (curr.nextSnapshot)
                snapFut = snapshotMgr.onMarkCheckPointBegin(curr.snapshotOperation, ctx0.partitionStatMap());

            fillCacheGroupState(cpRec);

            // Pages may be replaced only after the checkpoint entry has been stored to disk.
            cpPagesTuple = beginAllCheckpoints(curr.futureFor(MARKER_STORED_TO_DISK));

            hasPages = hasPageForWrite(cpPagesTuple.get1());

            hasPartitionsToDestroy = !curr.destroyQueue.pendingReqs.isEmpty();

            WALPointer cpPtr = null;

            if (hasPages || curr.nextSnapshot || hasPartitionsToDestroy) {
                // No page updates for this checkpoint are allowed from now on.
                cpPtr = cctx.wal().log(cpRec);

                if (cpPtr == null)
                    cpPtr = CheckpointStatus.NULL_PTR;
            }

            if (hasPages || hasPartitionsToDestroy) {
                cp = prepareCheckpointEntry(
                    tmpWriteBuf,
                    cpTs,
                    cpRec.checkpointId(),
                    cpPtr,
                    cpRec,
                    CheckpointEntryType.START);

                cpHistory.addCheckpoint(cp);
            }
        }
        finally {
            checkpointLock.writeLock().unlock();

            tracker.onLockRelease();
        }

        DbCheckpointListener.Context ctx = createOnCheckpointBeginContext(ctx0, hasPages);

        curr.transitTo(LOCK_RELEASED);

        for (DbCheckpointListener lsnr : lsnrs)
            lsnr.onCheckpointBegin(ctx);

        if (snapFut != null) {
            try {
                snapFut.get();
            }
            catch (IgniteException e) {
                U.error(log, "Failed to wait for snapshot operation initialization: " +
                    curr.snapshotOperation, e);
            }
        }

        if (hasPages || hasPartitionsToDestroy) {
            assert cp != null;
            assert cp.checkpointMark() != null;

            tracker.onWalCpRecordFsyncStart();

            // Sync log outside the checkpoint write lock.
            cctx.wal().flush(cp.checkpointMark(), true);

            tracker.onWalCpRecordFsyncEnd();

            writeCheckpointEntry(tmpWriteBuf, cp, CheckpointEntryType.START);

            curr.transitTo(MARKER_STORED_TO_DISK);

            tracker.onSplitAndSortCpPagesStart();

            GridMultiCollectionWrapper<FullPageId> cpPages = splitAndSortCpPagesIfNeeded(
                cpPagesTuple, persistenceCfg.getCheckpointThreads());

            tracker.onSplitAndSortCpPagesEnd();

            if (printCheckpointStats && log.isInfoEnabled()) {
                long possibleJvmPauseDur = possibleLongJvmPauseDuration(tracker);

                log.info(
                    String.format(
                        CHECKPOINT_STARTED_LOG_FORMAT,
                        cpRec.checkpointId(),
                        cp.checkpointMark(),
                        tracker.beforeLockDuration(),
                        tracker.lockWaitDuration(),
                        tracker.listenersExecuteDuration(),
                        tracker.lockHoldDuration(),
                        tracker.walCpRecordFsyncDuration(),
                        tracker.writeCheckpointEntryDuration(),
                        tracker.splitAndSortCpPagesDuration(),
                        possibleJvmPauseDur > 0 ? "possibleJvmPauseDuration=" + possibleJvmPauseDur + "ms," : "",
                        cpPages.size(),
                        curr.reason
                    )
                );
            }

            return new Checkpoint(cp, cpPages, curr);
        }
        else {
            if (curr.nextSnapshot)
                cctx.wal().flush(null, true);

            if (printCheckpointStats) {
                if (log.isInfoEnabled())
                    LT.info(log, String.format("Skipping checkpoint (no pages were modified) [" +
                            "checkpointBeforeLockTime=%dms, checkpointLockWait=%dms, " +
                            "checkpointListenersExecuteTime=%dms, checkpointLockHoldTime=%dms, reason='%s']",
                        tracker.beforeLockDuration(),
                        tracker.lockWaitDuration(),
                        tracker.listenersExecuteDuration(),
                        tracker.lockHoldDuration(),
                        curr.reason));
            }

            return new Checkpoint(null, new GridMultiCollectionWrapper<>(new Collection[0]), curr);
        }
    }

    /**
     * @param tracker Checkpoint metrics tracker.
     * @return Duration of possible JVM pause, if it was detected, or {@code -1} otherwise.
     */
    private long possibleLongJvmPauseDuration(CheckpointMetricsTracker tracker) {
        if (LongJVMPauseDetector.enabled()) {
            if (tracker.lockWaitDuration() + tracker.lockHoldDuration() > longJvmPauseThreshold) {
                long now = System.currentTimeMillis();

                // We must get last wake up time before search possible pause in events map.
                long wakeUpTime = pauseDetector.getLastWakeUpTime();

                IgniteBiTuple<Long, Long> lastLongPause = pauseDetector.getLastLongPause();

                if (lastLongPause != null && tracker.checkpointStartTime() < lastLongPause.get1())
                    return lastLongPause.get2();

                if (now - wakeUpTime > longJvmPauseThreshold)
                    return now - wakeUpTime;
            }
        }

        return -1L;
    }

    /**
     * Releases the checkpoint read lock taken for internal use.
     */
    private void internalReadUnlock() {
        checkpointLock.readLock().unlock();

        if (ASSERTION_ENABLED)
            CHECKPOINT_LOCK_HOLD_COUNT.set(CHECKPOINT_LOCK_HOLD_COUNT.get() - 1);
    }

    /**
     * Takes the checkpoint read lock for internal use.
     */
    private void internalReadLock() {
        checkpointLock.readLock().lock();

        if (ASSERTION_ENABLED)
            CHECKPOINT_LOCK_HOLD_COUNT.set(CHECKPOINT_LOCK_HOLD_COUNT.get() + 1);
    }

    /**
     * Fill cache group state in checkpoint record. Local groups and groups with disabled WAL are skipped.
     * Groups are processed on {@code asyncRunner} when it is available.
     *
     * @param cpRec Checkpoint record for filling.
     * @throws IgniteCheckedException if fail.
     */
    private void fillCacheGroupState(CheckpointRecord cpRec) throws IgniteCheckedException {
        GridCompoundFuture grpHandleFut = asyncRunner == null ? null : new GridCompoundFuture();

        for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
            if (grp.isLocal() || !grp.walEnabled())
                continue;

            Runnable r = () -> {
                ArrayList<GridDhtLocalPartition> parts = new ArrayList<>(grp.topology().localPartitions().size());

                for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions())
                    parts.add(part);

                CacheState state = new CacheState(parts.size());

                for (GridDhtLocalPartition part : parts) {
                    state.addPartitionState(
                        part.id(),
                        part.dataStore().fullSize(),
                        part.updateCounter(),
                        (byte)part.state().ordinal()
                    );
                }

                // The record is shared between concurrently running group tasks.
                synchronized (cpRec) {
                    cpRec.addCacheGroupState(grp.groupId(), state);
                }
            };

            if (asyncRunner == null)
                r.run();
            else
                try {
                    GridFutureAdapter<?> res = new GridFutureAdapter<>();

                    asyncRunner.execute(U.wrapIgniteFuture(r, res));

                    grpHandleFut.add(res);
                }
                catch (RejectedExecutionException e) {
                    assert false : "Task should never be rejected by async runner";

                    throw new IgniteException(e); //to protect from disabled asserts and call to failure handler
                }
        }

        if (grpHandleFut != null) {
            grpHandleFut.markInitialized();

            grpHandleFut.get();
        }
    }

    /**
     * Computes the timestamp of a new checkpoint, guaranteeing it differs from the previous one.
     *
     * @return Last checkpoint time.
     */
    private long updateLastCheckpointTime() {
        long cpTs = System.currentTimeMillis();

        // This can happen in an unlikely event of two checkpoints happening
        // within a currentTimeMillis() granularity window.
        if (cpTs == lastCpTs)
            cpTs++;

        lastCpTs = cpTs;

        return cpTs;
    }

    /**
     * Update current checkpoint progress by scheduled: the scheduled progress becomes current (moved to
     * {@code LOCK_TAKEN}, reason defaults to "timeout") and a new scheduled progress is created.
     *
     * @return Current checkpoint progress.
     */
    @NotNull private CheckpointProgress updateCurrentCheckpointProgress() {
        final CheckpointProgressImpl curr;

        synchronized (this) {
            curr = scheduledCp;

            curr.transitTo(LOCK_TAKEN);

            if (curr.reason == null)
                curr.reason = "timeout";

            // It is important that we assign a new progress object before checkpoint mark in page memory.
            scheduledCp = new CheckpointProgressImpl(checkpointFreq);

            curCpProgress = curr;
        }

        return curr;
    }

    /**
     * Wraps the given context so that {@link DbCheckpointListener.Context#hasPages()} returns the known value.
     *
     * @param delegate Context to delegate all other calls to.
     * @param hasPages Whether the checkpoint has pages to write.
     * @return Context for {@code onCheckpointBegin} listeners.
     */
    private DbCheckpointListener.Context createOnCheckpointBeginContext(
        DbCheckpointListener.Context delegate,
        boolean hasPages
    ) {
        return new DbCheckpointListener.Context() {
            /** {@inheritDoc} */
            @Override public boolean nextSnapshot() {
                return delegate.nextSnapshot();
            }

            /** {@inheritDoc} */
            @Override public PartitionAllocationMap partitionStatMap() {
                return delegate.partitionStatMap();
            }

            /** {@inheritDoc} */
            @Override public boolean needToSnapshot(String cacheOrGrpName) {
                return delegate.needToSnapshot(cacheOrGrpName);
            }

            /** {@inheritDoc} */
            @Override public @Nullable Executor executor() {
                return delegate.executor();
            }

            /** {@inheritDoc} */
            @Override public boolean hasPages() {
                return hasPages;
            }
        };
    }

    /**
     * Check that at least one collection is not empty.
     *
     * @param cpPagesCollWrapper Collection of {@link GridMultiCollectionWrapper} checkpoint pages.
     * @return {@code True} if at least one collection contains pages.
     */
    private boolean hasPageForWrite(Collection<GridMultiCollectionWrapper<FullPageId>> cpPagesCollWrapper) {
        boolean hasPages = false;

        for (Collection<?> c : cpPagesCollWrapper)
            if (!c.isEmpty()) {
                hasPages = true;

                break;
            }

        return hasPages;
    }

    /**
     * Begins a checkpoint in every persistent data region's page memory and records the total number of
     * collected pages in {@code currCheckpointPagesCnt}.
     *
     * @param allowToReplace The sign which allows to replace pages from a checkpoint by page replacer.
     * @return Tuple of the page ID collections obtained from each persistent page memory and the overall
     *      number of dirty pages.
     */
    private IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> beginAllCheckpoints(
        IgniteInternalFuture allowToReplace
    ) {
        Collection<GridMultiCollectionWrapper<FullPageId>> res = new ArrayList<>(dataRegions().size());

        int pagesNum = 0;

        for (DataRegion memPlc : dataRegions()) {
            if (!memPlc.config().isPersistenceEnabled())
                continue;

            GridMultiCollectionWrapper<FullPageId> nextCpPagesCol = ((PageMemoryEx)memPlc.pageMemory())
                .beginCheckpoint(allowToReplace);

            pagesNum += nextCpPagesCol.size();

            res.add(nextCpPagesCol);
        }

        currCheckpointPagesCnt = pagesNum;

        return new IgniteBiTuple<>(res, pagesNum);
    }

    /**
     * Finishes the checkpoint: resets page counters, finishes the checkpoint in every persistent page memory,
     * writes the {@code END} entry and notches the WAL when there were pages to write, removes entries evicted
     * from the checkpoint history and moves the progress to {@code FINISHED}.
     *
     * @param chp Checkpoint snapshot.
     * @throws IgniteCheckedException If failed.
     */
    private void markCheckpointEnd(Checkpoint chp) throws IgniteCheckedException {
        synchronized (this) {
            writtenPagesCntr = null;
            syncedPagesCntr = null;
            evictedPagesCntr = null;

            for (DataRegion memPlc : dataRegions()) {
                if (!memPlc.config().isPersistenceEnabled())
                    continue;

                ((PageMemoryEx)memPlc.pageMemory()).finishCheckpoint();
            }

            currCheckpointPagesCnt = 0;
        }

        if (chp.hasDelta()) {
            CheckpointEntry cp = prepareCheckpointEntry(
                tmpWriteBuf,
                chp.cpEntry.timestamp(),
                chp.cpEntry.checkpointId(),
                chp.cpEntry.checkpointMark(),
                null,
                CheckpointEntryType.END);

            writeCheckpointEntry(tmpWriteBuf, cp, CheckpointEntryType.END);

            cctx.wal().notchLastCheckpointPtr(chp.cpEntry.checkpointMark());
        }

        List<CheckpointEntry> removedFromHistory = cpHistory.onCheckpointFinished(chp, truncateWalOnCpFinish);

        for (CheckpointEntry cp : removedFromHistory)
            removeCheckpointFiles(cp);

        if (chp.progress != null)
            chp.progress.transitTo(FINISHED);
    }

    /** {@inheritDoc} */
    @Override public void cancel() {
        if (log.isDebugEnabled())
            log.debug("Cancelling grid runnable: " + this);

        // Do not interrupt runner thread.
        isCancelled = true;

        synchronized (this) {
            notifyAll();
        }
    }

    /**
     * Requests an immediate stop: sets the shutdown flag and cancels the worker if it is not cancelled yet.
     */
    public void shutdownNow() {
        shutdownNow = true;

        if (!isCancelled)
            cancel();
    }

    /**
     * Context with information about current snapshots.
     */
    private class DbCheckpointContextImpl implements DbCheckpointListener.Context {
        /** Current checkpoint progress. */
        private final CheckpointProgressImpl curr;

        /** Partition map. */
        private final PartitionAllocationMap map;

        /** Pending tasks from executor. */
        private GridCompoundFuture pendingTaskFuture;

        /**
         * @param curr Current checkpoint progress.
         * @param map Partition map.
         */
        private DbCheckpointContextImpl(CheckpointProgressImpl curr, PartitionAllocationMap map) {
            this.curr = curr;
            this.map = map;
            this.pendingTaskFuture = asyncRunner == null ? null : new GridCompoundFuture();
        }

        /** {@inheritDoc} */
        @Override public boolean nextSnapshot() {
            return curr.nextSnapshot;
        }

        /** {@inheritDoc} */
        @Override public PartitionAllocationMap partitionStatMap() {
            return map;
        }

        /** {@inheritDoc} */
        @Override public boolean needToSnapshot(String cacheOrGrpName) {
            return curr.snapshotOperation.cacheGroupIds().contains(CU.cacheId(cacheOrGrpName));
        }

        /** {@inheritDoc} */
        @Override public Executor executor() {
            return asyncRunner == null ? null : cmd -> {
                try {
                    GridFutureAdapter<?> res = new GridFutureAdapter<>();

                    res.listen(fut -> updateHeartbeat());

                    asyncRunner.execute(U.wrapIgniteFuture(cmd, res));

                    pendingTaskFuture.add(res);
                }
                catch (RejectedExecutionException e) {
                    assert false : "A task should never be rejected by async runner";

                    // Do not silently drop the task when assertions are disabled: the checkpoint would otherwise
                    // proceed without it. Propagate so the failure handler is invoked.
                    throw new IgniteException(e);
                }
            };
        }

        /** {@inheritDoc} */
        @Override public boolean hasPages() {
            throw new IllegalStateException(
                "Property is unknown at this moment. You should use onCheckpointBegin() method."
            );
        }

        /**
         * Await all async tasks from executor was finished.
         *
         * @throws IgniteCheckedException if fail.
         */
        public void awaitPendingTasksFinished() throws IgniteCheckedException {
            GridCompoundFuture pendingFut = this.pendingTaskFuture;

            this.pendingTaskFuture = new GridCompoundFuture();

            if (pendingFut != null) {
                pendingFut.markInitialized();

                pendingFut.get();
            }
        }
    }
}
/**
 * Flattens the checkpoint pages into one array, optionally sorts it (for
 * {@link DataStorageConfiguration#getCheckpointWriteOrder()} {@code SEQUENTIAL}) and splits it into
 * read-only views, one per subtask, according to {@link DataStorageConfiguration#getCheckpointThreads()}.
 *
 * @param cpPagesTuple Checkpoint pages tuple: page collections and the estimated total page count.
 * @param threads Checkpoint runner threads.
 * @return Pages split into {@code 1} sublist for a single thread, or {@code threads * 4} sublists otherwise.
 * @throws IgniteCheckedException If the parallel sort has failed.
 */
private GridMultiCollectionWrapper<FullPageId> splitAndSortCpPagesIfNeeded(
    IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> cpPagesTuple,
    int threads
) throws IgniteCheckedException {
    FullPageId[] pagesArr = new FullPageId[cpPagesTuple.get2()];

    int cnt = 0;

    for (GridMultiCollectionWrapper<FullPageId> wrapper : cpPagesTuple.get1()) {
        for (int idx = 0; idx < wrapper.collectionsSize(); idx++) {
            for (FullPageId pageId : wrapper.innerCollection(idx)) {
                if (cnt == pagesArr.length)
                    throw new AssertionError("Incorrect estimated dirty pages number: " + pagesArr.length);

                pagesArr[cnt++] = pageId;
            }
        }
    }

    // Some pages may have been replaced: pad the tail with a fake maximal ID so the sort never meets nulls.
    Arrays.fill(pagesArr, cnt, pagesArr.length, new FullPageId(Long.MAX_VALUE, Integer.MAX_VALUE));

    if (persistenceCfg.getCheckpointWriteOrder() == CheckpointWriteOrder.SEQUENTIAL) {
        Comparator<FullPageId> cmp = (a, b) -> {
            int res = Long.compare(a.groupId(), b.groupId());

            return res != 0 ? res : Long.compare(a.effectivePageId(), b.effectivePageId());
        };

        if (pagesArr.length < parallelSortThreshold)
            Arrays.sort(pagesArr, cmp);
        else
            parallelSortInIsolatedPool(pagesArr, cmp);
    }

    // Split into (threads * 4) subtasks so that a faster thread can help slower ones.
    int subListsCnt = threads == 1 ? 1 : threads * 4;

    Collection[] subLists = new Collection[subListsCnt];

    for (int i = 0; i < subListsCnt; i++) {
        int from = (int)((long)cnt * i / subListsCnt);
        int to = (int)((long)cnt * (i + 1) / subListsCnt);

        subLists[i] = new GridReadOnlyArrayView(pagesArr, from, to);
    }

    return new GridMultiCollectionWrapper<FullPageId>(subLists);
}
/**
 * Sorts the pages array with {@link Arrays#parallelSort(Object[], Comparator)} inside a dedicated
 * {@link ForkJoinPool} of {@code PARALLEL_SORT_THREADS + 1} workers named {@code checkpoint-pages-sorter-N}.
 * <p>
 * The pool is shut down on every exit path, including a failed sort or an interrupted wait, so its worker
 * threads are never leaked.
 *
 * @param pagesArr Pages array, sorted in place.
 * @param cmp Comparator defining the order.
 * @throws IgniteInterruptedCheckedException If the calling thread was interrupted while waiting for the sort;
 *      the thread's interrupt flag is restored.
 * @throws IgniteCheckedException If the sort task failed.
 */
private static void parallelSortInIsolatedPool(
    FullPageId[] pagesArr,
    Comparator<FullPageId> cmp
) throws IgniteCheckedException {
    ForkJoinPool.ForkJoinWorkerThreadFactory factory = pool -> {
        ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);

        worker.setName("checkpoint-pages-sorter-" + worker.getPoolIndex());

        return worker;
    };

    ForkJoinPool forkJoinPool = new ForkJoinPool(PARALLEL_SORT_THREADS + 1, factory, null, false);

    try {
        // Run the sort from a worker of the isolated pool. ForkJoinTask#fork() schedules subtasks in the pool
        // of the current worker, which is expected to keep the parallel sort's subtasks inside this pool.
        ForkJoinTask<?> sortTask = forkJoinPool.submit(() -> Arrays.parallelSort(pagesArr, cmp));

        sortTask.get();
    }
    catch (InterruptedException e) {
        Thread.currentThread().interrupt();

        throw new IgniteInterruptedCheckedException(e);
    }
    catch (ExecutionException e) {
        throw new IgniteCheckedException("Failed to perform pages array parallel sort", e.getCause());
    }
    finally {
        // Always release the pool, otherwise a failed or interrupted wait leaks its worker threads.
        forkJoinPool.shutdown();
    }
}
/** Pages write task */
private class WriteCheckpointPages implements Runnable {
/** */
private final CheckpointMetricsTracker tracker;
/** Collection of page IDs to write under this task. Overall pages to write may be greater than this collection */
private final Collection<FullPageId> writePageIds;
/** */
private final ConcurrentLinkedHashMap<PageStore, LongAdder> updStores;
/** */
private final CountDownFuture doneFut;
/** Total pages to write, counter may be greater than {@link #writePageIds} size */
private final int totalPagesToWrite;
/** */
private final Runnable beforePageWrite;
/** If any pages were skipped, new task with remaining pages will be submitted here. */
private final ExecutorService retryWriteExecutor;
/**
* Creates task for write pages
*
* @param tracker
* @param writePageIds Collection of page IDs to write.
* @param updStores
* @param doneFut
* @param totalPagesToWrite total pages to be written under this checkpoint
* @param beforePageWrite Action to be performed before every page write.
* @param retryWriteExecutor Retry write executor.
*/
private WriteCheckpointPages(
final CheckpointMetricsTracker tracker,
final Collection<FullPageId> writePageIds,
final ConcurrentLinkedHashMap<PageStore, LongAdder> updStores,
final CountDownFuture doneFut,
final int totalPagesToWrite,
final Runnable beforePageWrite,
final ExecutorService retryWriteExecutor
) {
this.tracker = tracker;
this.writePageIds = writePageIds;
this.updStores = updStores;
this.doneFut = doneFut;
this.totalPagesToWrite = totalPagesToWrite;
this.beforePageWrite = beforePageWrite;
this.retryWriteExecutor = retryWriteExecutor;
}
/** {@inheritDoc} */
@Override public void run() {
snapshotMgr.beforeCheckpointPageWritten();
Collection<FullPageId> writePageIds = this.writePageIds;
try {
List<FullPageId> pagesToRetry = writePages(writePageIds);
if (pagesToRetry.isEmpty())
doneFut.onDone();
else {
LT.warn(log, pagesToRetry.size() + " checkpoint pages were not written yet due to unsuccessful " +
"page write lock acquisition and will be retried");
while (!pagesToRetry.isEmpty())
pagesToRetry = writePages(pagesToRetry);
doneFut.onDone();
}
}
catch (Throwable e) {
doneFut.onDone(e);
}
}
/**
* @param writePageIds Collections of pages to write.
* @return pagesToRetry Pages which should be retried.
*/
private List<FullPageId> writePages(Collection<FullPageId> writePageIds) throws IgniteCheckedException {
List<FullPageId> pagesToRetry = new ArrayList<>();
CheckpointMetricsTracker tracker = persStoreMetrics.metricsEnabled() ? this.tracker : null;
PageStoreWriter pageStoreWriter = createPageStoreWriter(pagesToRetry);
ByteBuffer tmpWriteBuf = threadBuf.get();
boolean throttlingEnabled = resolveThrottlingPolicy() != PageMemoryImpl.ThrottlingPolicy.DISABLED;
for (FullPageId fullId : writePageIds) {
if (checkpointer.shutdownNow)
break;
beforePageWrite.run();
int grpId = fullId.groupId();
PageMemoryEx pageMem;
// TODO IGNITE-7792 add generic mapping.
if (grpId == MetaStorage.METASTORAGE_CACHE_ID)
pageMem = (PageMemoryEx)metaStorage.pageMemory();
else if (grpId == TxLog.TX_LOG_CACHE_ID)
pageMem = (PageMemoryEx)dataRegion(TxLog.TX_LOG_CACHE_NAME).pageMemory();
else {
CacheGroupContext grp = context().cache().cacheGroup(grpId);
DataRegion region = grp != null ? grp.dataRegion() : null;
if (region == null || !region.config().isPersistenceEnabled())
continue;
pageMem = (PageMemoryEx)region.pageMemory();
}
snapshotMgr.beforePageWrite(fullId);
tmpWriteBuf.rewind();
pageMem.checkpointWritePage(fullId, tmpWriteBuf, pageStoreWriter, tracker);
if (throttlingEnabled) {
while (pageMem.shouldThrottle()) {
FullPageId cpPageId = pageMem.pullPageFromCpBuffer();
if (cpPageId.equals(FullPageId.NULL_PAGE))
break;
snapshotMgr.beforePageWrite(cpPageId);
tmpWriteBuf.rewind();
pageMem.checkpointWritePage(cpPageId, tmpWriteBuf, pageStoreWriter, tracker);
}
}
}
return pagesToRetry;
}
/**
* Factory method for create {@link PageStoreWriter}.
*
* @param pagesToRetry List pages for retry.
* @return Checkpoint page write context.
*/
private PageStoreWriter createPageStoreWriter(List<FullPageId> pagesToRetry) {
return new PageStoreWriter() {
/** {@inheritDoc} */
@Override public void writePage(FullPageId fullPageId, ByteBuffer buf, int tag) throws IgniteCheckedException {
if (tag == PageMemoryImpl.TRY_AGAIN_TAG) {
pagesToRetry.add(fullPageId);
return;
}
int groupId = fullPageId.groupId();
long pageId = fullPageId.pageId();
assert getType(buf) != 0 : "Invalid state. Type is 0! pageId = " + hexLong(pageId);
assert getVersion(buf) != 0 : "Invalid state. Version is 0! pageId = " + hexLong(pageId);
if (persStoreMetrics.metricsEnabled()) {
int pageType = getType(buf);
if (PageIO.isDataPageType(pageType))
tracker.onDataPageWritten();
}
writtenPagesCntr.incrementAndGet();
PageStore store = storeMgr.writeInternal(groupId, pageId, buf, tag, true);
updStores.computeIfAbsent(store, k -> new LongAdder()).increment();
}
};
}
}
/**
 * Data of a single checkpoint: its entry, the pages to write, its progress and WAL cleanup results.
 */
public static class Checkpoint {
    /** Checkpoint entry, possibly {@code null}. */
    @Nullable private final CheckpointEntry cpEntry;

    /** Pages to write to the page store. */
    private final GridMultiCollectionWrapper<FullPageId> cpPages;

    /** Progress of this checkpoint. */
    private final CheckpointProgressImpl progress;

    /** Number of pages in {@link #cpPages}, taken once at construction. */
    private final int pagesSize;

    /** Number of deleted WAL files. */
    private int walFilesDeleted;

    /** WAL segments fully covered by this checkpoint. */
    private IgniteBiTuple<Long, Long> walSegsCoveredRange;

    /**
     * @param cpEntry Checkpoint entry, may be {@code null}.
     * @param cpPages Pages to write to the page store.
     * @param progress Checkpoint progress status.
     */
    private Checkpoint(
        @Nullable CheckpointEntry cpEntry,
        @NotNull GridMultiCollectionWrapper<FullPageId> cpPages,
        CheckpointProgressImpl progress
    ) {
        this.pagesSize = cpPages.size();
        this.cpPages = cpPages;
        this.cpEntry = cpEntry;
        this.progress = progress;
    }

    /**
     * @return {@code true} if there is at least one page to write in this checkpoint.
     */
    public boolean hasDelta() {
        return pagesSize != 0;
    }

    /**
     * Records how many WAL files were deleted.
     *
     * @param walFilesDeleted Number of deleted WAL files.
     */
    public void walFilesDeleted(int walFilesDeleted) {
        this.walFilesDeleted = walFilesDeleted;
    }

    /**
     * Records the range of WAL segments fully covered by this checkpoint.
     *
     * @param walSegsCoveredRange WAL segments fully covered by this checkpoint.
     */
    public void walSegsCoveredRange(IgniteBiTuple<Long, Long> walSegsCoveredRange) {
        this.walSegsCoveredRange = walSegsCoveredRange;
    }
}
/**
 * Status of the last checkpoint as described by its start and end markers.
 */
public static class CheckpointStatus {
    /** Null checkpoint UUID. */
    private static final UUID NULL_UUID = new UUID(0L, 0L);

    /** Null WAL pointer. */
    public static final WALPointer NULL_PTR = new FileWALPointer(0, 0, 0);

    /** Checkpoint start timestamp. */
    private long cpStartTs;

    /** Checkpoint start ID. */
    private UUID cpStartId;

    /** Checkpoint start WAL pointer. */
    @GridToStringInclude
    private WALPointer startPtr;

    /** Checkpoint end ID. */
    private UUID cpEndId;

    /** Checkpoint end WAL pointer. */
    @GridToStringInclude
    private WALPointer endPtr;

    /**
     * @param cpStartTs Checkpoint start timestamp.
     * @param cpStartId Checkpoint start ID.
     * @param startPtr Checkpoint start pointer.
     * @param cpEndId Checkpoint end ID.
     * @param endPtr Checkpoint end pointer.
     */
    private CheckpointStatus(long cpStartTs, UUID cpStartId, WALPointer startPtr, UUID cpEndId, WALPointer endPtr) {
        this.cpStartTs = cpStartTs;
        this.cpStartId = cpStartId;
        this.startPtr = startPtr;
        this.cpEndId = cpEndId;
        this.endPtr = endPtr;
    }

    /**
     * @return {@code True} if binary memory recovery must be performed, i.e. the start and end markers refer to
     *      different checkpoints and a start marker is present. Only {@link PageDeltaRecord} and
     *      {@link PageSnapshot} records need to be applied, starting from {@link #cpStartId}.
     */
    public boolean needRestoreMemory() {
        boolean startMatchesEnd = F.eq(cpStartId, cpEndId);

        return !startMatchesEnd && !F.eq(NULL_UUID, cpStartId);
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return S.toString(CheckpointStatus.class, this);
    }
}
/**
 * Data class representing the state of running/scheduled checkpoint.
 */
public static class CheckpointProgressImpl implements CheckpointProgress {
    /** Scheduled time of checkpoint, in {@link System#nanoTime()} terms. */
    private volatile long nextCpNanos;

    /** Current checkpoint state. Only moves forward, see {@link #transitTo(CheckpointState)}. */
    private volatile AtomicReference<CheckpointState> state = new AtomicReference(CheckpointState.SCHEDULED);

    /** Futures completed when the corresponding state is reached. */
    private final Map<CheckpointState, GridFutureAdapter> stateFutures = new ConcurrentHashMap<>();

    /** Cause of fail, which has happened during the checkpoint or null if checkpoint was successful. */
    private volatile Throwable failCause;

    /** Flag indicates that snapshot operation will be performed after checkpoint. */
    private volatile boolean nextSnapshot;

    /** Snapshot operation that should be performed if {@link #nextSnapshot} set to true. */
    private volatile SnapshotOperation snapshotOperation;

    /** Partitions destroy queue. */
    private final PartitionDestroyQueue destroyQueue = new PartitionDestroyQueue();

    /** Wakeup reason. */
    private String reason;

    /**
     * @param cpFreq Timeout until next checkpoint, in milliseconds.
     */
    private CheckpointProgressImpl(long cpFreq) {
        this.nextCpNanos = System.nanoTime() + U.millisToNanos(cpFreq);
    }

    /**
     * @return {@code true} if the checkpoint reached {@code LOCK_RELEASED} but not yet {@code FINISHED}.
     */
    @Override public boolean inProgress() {
        return greaterOrEqualTo(LOCK_RELEASED) && !greaterOrEqualTo(FINISHED);
    }

    /**
     * @param expectedState Expected state.
     * @return {@code true} if the current state is equal to or later than the given state.
     */
    public boolean greaterOrEqualTo(CheckpointState expectedState) {
        return state.get().ordinal() >= expectedState.ordinal();
    }

    /**
     * Returns the future for the given state. If that state has already been reached, the future is completed
     * before it is returned (exceptionally with {@link #failCause} if it is set).
     *
     * @param state State for which future should be returned.
     * @return Existed or new future which corresponds to the given state.
     */
    @Override public GridFutureAdapter futureFor(CheckpointState state) {
        GridFutureAdapter stateFut = stateFutures.computeIfAbsent(state, (k) -> new GridFutureAdapter());

        if (greaterOrEqualTo(state) && !stateFut.isDone())
            stateFut.onDone(failCause);

        return stateFut;
    }

    /**
     * Mark this checkpoint execution as failed.
     *
     * @param error Causal error of fail.
     */
    public void fail(Throwable error) {
        failCause = error;

        transitTo(FINISHED);
    }

    /**
     * Moves the checkpoint to the given state if it is later than the current one, then completes the
     * futures of all states up to and including it. Transitions to the current or an earlier state are ignored.
     * <p>
     * Uses a compare-and-set loop so a concurrent transition to a lower state cannot cause this one to be lost.
     *
     * @param newState New checkpoint state.
     */
    public void transitTo(@NotNull CheckpointState newState) {
        for (;;) {
            CheckpointState cur = state.get();

            if (cur.ordinal() >= newState.ordinal())
                return;

            if (state.compareAndSet(cur, newState)) {
                doFinishFuturesWhichLessOrEqualTo(newState);

                return;
            }

            // Lost a race with a concurrent transition: re-read the state and decide again.
        }
    }

    /**
     * Finishing futures with correct result in direct state order until lastState(included).
     *
     * @param lastState State until which futures should be done.
     */
    private void doFinishFuturesWhichLessOrEqualTo(@NotNull CheckpointState lastState) {
        for (CheckpointState old : CheckpointState.values()) {
            GridFutureAdapter fut = stateFutures.get(old);

            if (fut != null && !fut.isDone())
                fut.onDone(failCause);

            if (old == lastState)
                return;
        }
    }
}
/**
 * Holder of an exclusive lock on the {@code lock} file in a given directory.
 * <p>
 * The lock covers the first byte of the file. The rest of the file holds a textual description of the holder
 * (node ID, addresses and ports), which is shown in diagnostic messages.
 */
public static class FileLockHolder implements AutoCloseable {
    /** Lock file name. */
    private static final String lockFileName = "lock";

    /** File. */
    private File file;

    /** Channel. */
    private RandomAccessFile lockFile;

    /** Lock. */
    private volatile FileLock lock;

    /** Kernal context to generate Id of locked node in file. */
    @NotNull private GridKernalContext ctx;

    /** Logger. */
    private IgniteLogger log;

    /**
     * @param path Directory in which the lock file is opened (created if absent).
     * @param ctx Kernal context.
     * @param log Logger.
     */
    public FileLockHolder(String path, @NotNull GridKernalContext ctx, IgniteLogger log) {
        try {
            file = Paths.get(path, lockFileName).toFile();

            lockFile = new RandomAccessFile(file, "rw");

            this.ctx = ctx;
            this.log = log;
        }
        catch (IOException e) {
            throw new IgniteException(e);
        }
    }

    /**
     * Tries to acquire the file lock, making one attempt per second while the attempt counter (advanced in
     * 1000 ms steps) is below {@code lockWaitTimeMillis}. A non-positive value makes no attempt at all.
     *
     * @param lockWaitTimeMillis During which time thread will try capture file lock.
     * @throws IgniteCheckedException If failed to capture file lock.
     */
    public void tryLock(long lockWaitTimeMillis) throws IgniteCheckedException {
        assert lockFile != null;

        FileChannel ch = lockFile.getChannel();

        SB sb = new SB();

        // Write node ID.
        sb.a("[").a(ctx.localNodeId().toString()).a("]");

        // Write IP addresses.
        final GridDiscoveryManager discovery = ctx.discovery();

        if (discovery != null) { // Discovery may be not up and running.
            final ClusterNode node = discovery.localNode();

            if (node != null)
                sb.a(node.addresses());
        }

        // Write ports.
        sb.a("[");

        Iterator<GridPortRecord> it = ctx.ports().records().iterator();

        while (it.hasNext()) {
            GridPortRecord rec = it.next();

            sb.a(rec.protocol()).a(":").a(rec.port());

            if (it.hasNext())
                sb.a(", ");
        }

        sb.a("]");

        String failMsg;

        try {
            String content = null;

            // Try to get lock, if not available wait 1 sec and re-try.
            // A long counter cannot overflow into an endless loop for huge timeouts.
            for (long waitedMillis = 0; waitedMillis < lockWaitTimeMillis; waitedMillis += 1000) {
                try {
                    lock = ch.tryLock(0, 1, false);

                    if (lock != null && lock.isValid()) {
                        writeContent(sb.toString());

                        return;
                    }
                }
                catch (OverlappingFileLockException ignore) {
                    if (content == null)
                        content = readContent();

                    log.warning("Failed to acquire file lock. Will try again in 1s " +
                        "[nodeId=" + ctx.localNodeId() + ", holder=" + content +
                        ", path=" + file.getAbsolutePath() + ']');
                }

                U.sleep(1000);
            }

            if (content == null)
                content = readContent();

            failMsg = "Failed to acquire file lock [holder=" + content + ", time=" + (lockWaitTimeMillis / 1000) +
                " sec, path=" + file.getAbsolutePath() + ']';
        }
        catch (Exception e) {
            throw new IgniteCheckedException(e);
        }

        throw new IgniteCheckedException(failMsg);
    }

    /**
     * Write node id (who captured lock) into lock file, right after the locked first byte.
     * Any longer description left by a previous holder is truncated away.
     *
     * @param content Node id.
     * @throws IOException if some fail while write node it.
     */
    private void writeContent(String content) throws IOException {
        FileChannel ch = lockFile.getChannel();

        ByteBuffer buf = ByteBuffer.wrap(content.getBytes());

        long pos = 1;

        // A positional write is not guaranteed to consume the whole buffer in one call.
        while (buf.hasRemaining())
            pos += ch.write(buf, pos);

        // Drop stale trailing bytes from a previous, longer description.
        ch.truncate(pos);

        ch.force(false);
    }

    /**
     * Reads the holder description stored after the first (locked) byte of the file.
     *
     * @return Holder description, or an empty string if the file holds no description.
     * @throws IOException If reading failed.
     */
    private String readContent() throws IOException {
        FileChannel ch = lockFile.getChannel();

        long size = ch.size();

        // Only the lock byte (or nothing) is present: avoid allocating a negative-capacity buffer.
        if (size <= 1)
            return "";

        ByteBuffer buf = ByteBuffer.allocate((int)(size - 1));

        long pos = 1;

        // A positional read may return fewer bytes than requested; stop at end of file.
        while (buf.hasRemaining()) {
            int read = ch.read(buf, pos);

            if (read < 0)
                break;

            pos += read;
        }

        return new String(buf.array(), 0, buf.position());
    }

    /** Locked or not. */
    public boolean isLocked() {
        return lock != null && lock.isValid();
    }

    /** Releases file lock */
    public void release() {
        U.releaseQuiet(lock);
    }

    /** Closes file channel */
    @Override public void close() {
        release();

        U.closeQuiet(lockFile);
    }

    /**
     * @return Absolute path to lock file.
     */
    private String lockPath() {
        return file.getAbsolutePath();
    }
}
/** {@inheritDoc} */
@Override public DataStorageMetrics persistentStoreMetrics() {
    // Callers get a new snapshot object built from the current metrics, not the live instance.
    DataStorageMetricsSnapshot snapshot = new DataStorageMetricsSnapshot(persStoreMetrics);

    return snapshot;
}
/**
 * @return The data storage metrics instance itself (not a snapshot copy).
 */
public DataStorageMetricsImpl persistentStoreMetricsImpl() {
    return this.persStoreMetrics;
}
/** {@inheritDoc} */
@Override public MetaStorage metaStorage() {
    return this.metaStorage;
}
/** {@inheritDoc} */
@Override public void notifyMetaStorageSubscribersOnReadyForRead() throws IgniteCheckedException {
    // Refresh the metastorage subscribers from the subscription processor, then read the metastore.
    metastorageLifecycleLsnrs = cctx.kernalContext()
        .internalSubscriptionProcessor()
        .getMetastorageSubscribers();

    readMetastore();
}
/** {@inheritDoc} */
@Override public boolean walEnabled(int grpId, boolean local) {
    // WAL is enabled unless the group is in the initially-disabled set for the requested scope.
    return local
        ? !initiallyLocalWalDisabledGrps.contains(grpId)
        : !initiallyGlobalWalDisabledGrps.contains(grpId);
}
/** {@inheritDoc} */
@Override public void walEnabled(int grpId, boolean enabled, boolean local) {
    String key = walGroupIdToKey(grpId, local);

    checkpointReadLock();

    try {
        if (!enabled) {
            // Store the "WAL disabled" marker and mark the last checkpoint unusable for WAL rebalance of the group.
            metaStorage.write(key, true);

            lastCheckpointInapplicableForWalRebalance(grpId);
        }
        else
            metaStorage.remove(key);
    }
    catch (IgniteCheckedException e) {
        throw new IgniteException("Failed to write cache group WAL state [grpId=" + grpId +
            ", enabled=" + enabled + ']', e);
    }
    finally {
        checkpointReadUnlock();
    }
}
/**
 * Checks whether the checkpoint with timestamp {@code cpTs} was marked as unusable as a WAL rebalance
 * start point for group {@code grpId}, i.e. whether the corresponding metastorage marker exists.
 *
 * @param cpTs Checkpoint timestamp.
 * @param grpId Group ID.
 * @return {@code true} if checkpoint {@code cpTs} is inapplicable as start point for WAL rebalance for {@code grpId}.
 * @throws IgniteCheckedException If failed to check.
 */
public boolean isCheckpointInapplicableForWalRebalance(Long cpTs, int grpId) throws IgniteCheckedException {
    String markerKey = checkpointInapplicableCpAndGroupIdToKey(cpTs, grpId);

    return metaStorage.read(markerKey) != null;
}
/**
 * Marks the last checkpoint as inapplicable for WAL rebalance of group {@code grpId} by writing a metastorage
 * marker. Does nothing if there is no last checkpoint (or its timestamp is 0). Failures are logged, not thrown.
 *
 * @param grpId Group ID.
 */
@Override public void lastCheckpointInapplicableForWalRebalance(int grpId) {
    checkpointReadLock();

    try {
        CheckpointEntry lastCp = cpHistory.lastCheckpoint();

        long lastCpTs = lastCp == null ? 0 : lastCp.timestamp();

        if (lastCpTs == 0)
            return;

        metaStorage.write(checkpointInapplicableCpAndGroupIdToKey(lastCpTs, grpId), true);
    }
    catch (IgniteCheckedException e) {
        log.error("Failed to mark last checkpoint as inapplicable for WAL rebalance for group: " + grpId, e);
    }
    finally {
        checkpointReadUnlock();
    }
}
/**
 * Populates the initially WAL-disabled group sets (local and global) from the WAL state markers stored in
 * the metastorage under {@code WAL_KEY_PREFIX}.
 *
 * @throws IgniteException If the metastorage could not be read.
 */
private void fillWalDisabledGroups() {
    assert metaStorage != null;

    try {
        metaStorage.iterate(WAL_KEY_PREFIX, (key, val) -> {
            T2<Integer, Boolean> grpAndScope = walKeyToGroupIdAndLocalFlag(key);

            // Keys not matching a known WAL prefix are ignored.
            if (grpAndScope == null)
                return;

            if (grpAndScope.get2())
                initiallyLocalWalDisabledGrps.add(grpAndScope.get1());
            else
                initiallyGlobalWalDisabledGrps.add(grpAndScope.get1());
        }, false);
    }
    catch (IgniteCheckedException e) {
        throw new IgniteException("Failed to read cache groups WAL state.", e);
    }
}
/**
 * Convert cache group ID to WAL state key.
 *
 * @param grpId Group ID.
 * @param local {@code true} for the local WAL state key, {@code false} for the global one.
 * @return Key: the matching prefix followed by the group ID.
 */
private static String walGroupIdToKey(int grpId, boolean local) {
    String prefix = local ? WAL_LOCAL_KEY_PREFIX : WAL_GLOBAL_KEY_PREFIX;

    return prefix + grpId;
}
/**
 * Convert checkpoint timestamp and cache group ID to key for {@link #CHECKPOINT_INAPPLICABLE_FOR_REBALANCE}
 * metastorage records. The key has the form {@code <prefix><cpTs>-<grpId>}.
 *
 * @param cpTs Checkpoint timestamp.
 * @param grpId Group ID.
 * @return Key.
 */
private static String checkpointInapplicableCpAndGroupIdToKey(long cpTs, int grpId) {
    return CHECKPOINT_INAPPLICABLE_FOR_REBALANCE + cpTs + '-' + grpId;
}
/**
 * Convert WAL state key to cache group ID and scope.
 *
 * @param key Key.
 * @return Pair of (group ID, {@code true} for a local key / {@code false} for a global key), or {@code null}
 *      if the key has neither WAL prefix.
 */
private static T2<Integer, Boolean> walKeyToGroupIdAndLocalFlag(String key) {
    boolean local = key.startsWith(WAL_LOCAL_KEY_PREFIX);

    if (!local && !key.startsWith(WAL_GLOBAL_KEY_PREFIX))
        return null;

    String prefix = local ? WAL_LOCAL_KEY_PREFIX : WAL_GLOBAL_KEY_PREFIX;

    return new T2<>(Integer.parseInt(key.substring(prefix.length())), local);
}
/**
 * Method dumps partitions info see {@link #dumpPartitionsInfo(CacheGroupContext, IgniteLogger)}
 * for every cache group that is not local and has persistence enabled.
 *
 * @param cctx Shared context.
 * @param log Logger.
 * @throws IgniteCheckedException If failed.
 */
private static void dumpPartitionsInfo(GridCacheSharedContext cctx, IgniteLogger log) throws IgniteCheckedException {
    for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
        if (!grp.isLocal() && grp.persistenceEnabled())
            dumpPartitionsInfo(grp, log);
    }
}
/**
 * Retrieves from page memory meta information about given {@code grp} group partitions
 * and dumps this information to log INFO level.
 * <p>
 * For a partition present in the local topology, its current state, update counter and size are logged.
 * Otherwise, partitions without a partition file are skipped; partitions whose file holds only the header
 * are logged as {@code N/A}; for the rest, the values stored in the partition meta page are logged.
 *
 * @param grp Cache group.
 * @param log Logger.
 * @throws IgniteCheckedException If failed.
 */
private static void dumpPartitionsInfo(CacheGroupContext grp, IgniteLogger log) throws IgniteCheckedException {
PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
IgnitePageStoreManager pageStore = grp.shared().pageStore();
assert pageStore != null : "Persistent cache should have initialize page store manager.";
for (int p = 0; p < grp.affinity().partitions(); p++) {
GridDhtLocalPartition part = grp.topology().localPartition(p);
// Partition object exists locally: report its in-memory values and skip the page store.
if (part != null) {
log.info("Partition [grp=" + grp.cacheOrGroupName()
+ ", id=" + p
+ ", state=" + part.state()
+ ", counter=" + part.dataStore().partUpdateCounter()
+ ", size=" + part.fullSize() + "]");
continue;
}
// No partition file: nothing to report.
if (!pageStore.exists(grp.groupId(), p))
continue;
// NOTE(review): presumably initializes the partition store so that pages() below is accurate — confirm.
pageStore.ensure(grp.groupId(), p);
// At most one page means only the file header is present (as the log message states): no meta page to read.
if (pageStore.pages(grp.groupId(), p) <= 1) {
log.info("Partition [grp=" + grp.cacheOrGroupName() + ", id=" + p + ", state=N/A (only file header) ]");
continue;
}
long partMetaId = pageMem.partitionMetaPageId(grp.groupId(), p);
// Acquire the meta page, then read-lock it; both are released in the finally blocks below in reverse order.
long partMetaPage = pageMem.acquirePage(grp.groupId(), partMetaId);
try {
long pageAddr = pageMem.readLock(grp.groupId(), partMetaId, partMetaPage);
try {
PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.forPage(pageAddr);
// fromOrdinal() may yield null for the stored ordinal; it is reported as "N/A".
GridDhtPartitionState partState = fromOrdinal(io.getPartitionState(pageAddr));
String state = partState != null ? partState.toString() : "N/A";
long updateCntr = io.getUpdateCounter(pageAddr);
long size = io.getSize(pageAddr);
log.info("Partition [grp=" + grp.cacheOrGroupName()
+ ", id=" + p
+ ", state=" + state
+ ", counter=" + updateCntr
+ ", size=" + size + "]");
}
finally {
pageMem.readUnlock(grp.groupId(), partMetaId, partMetaPage);
}
}
finally {
pageMem.releasePage(grp.groupId(), partMetaId, partMetaPage);
}
}
}
/**
 * Recovery lifecycle for read-write metastorage: initializes its page store before binary memory restore
 * and creates the metastorage instance after it.
 */
private class MetastorageRecoveryLifecycle implements DatabaseLifecycleListener {
    /** {@inheritDoc} */
    @Override public void beforeBinaryMemoryRestore(
        IgniteCacheDatabaseSharedManager mgr
    ) throws IgniteCheckedException {
        cctx.pageStore().initializeForMetastorage();
    }

    /** {@inheritDoc} */
    @Override public void afterBinaryMemoryRestore(
        IgniteCacheDatabaseSharedManager mgr,
        RestoreBinaryState restoreState
    ) throws IgniteCheckedException {
        // The metastorage is expected not to have been created yet at this point.
        assert metaStorage == null;

        metaStorage = createMetastorage(false);
    }
}
/**
 * @return Cache group predicate accepting only the Metastorage cache group ID.
 */
private IgnitePredicate<Integer> onlyMetastorageGroup() {
    return grpId -> grpId == MetaStorage.METASTORAGE_CACHE_ID;
}
/**
 * @return Cache group predicate accepting only groups whose WAL was initially disabled neither globally
 *      nor locally.
 */
private IgnitePredicate<Integer> groupsWithEnabledWal() {
    return grpId -> !(initiallyGlobalWalDisabledGrps.contains(grpId)
        || initiallyLocalWalDisabledGrps.contains(grpId));
}
/**
 * @return WAL records predicate accepting only metastorage data records and master key change records.
 */
private IgniteBiPredicate<WALRecord.RecordType, WALPointer> onlyMetastorageAndEncryptionRecords() {
    return (recType, recPtr) -> recType == METASTORE_DATA_RECORD || recType == MASTER_KEY_CHANGE_RECORD;
}
/**
 * @return WAL records predicate accepting only records whose purpose is physical or mixed.
 */
private IgniteBiPredicate<WALRecord.RecordType, WALPointer> physicalRecords() {
    return (type, ptr) -> {
        WALRecord.RecordPurpose purpose = type.purpose();

        return purpose == WALRecord.RecordPurpose.PHYSICAL || purpose == WALRecord.RecordPurpose.MIXED;
    };
}
/**
 * @return WAL records predicate accepting logical and mixed records, plus checkpoint records
 *      (used for restoring initial partition states).
 */
private IgniteBiPredicate<WALRecord.RecordType, WALPointer> logicalRecords() {
    return (type, ptr) -> {
        WALRecord.RecordPurpose purpose = type.purpose();

        return purpose == WALRecord.RecordPurpose.LOGICAL
            || purpose == WALRecord.RecordPurpose.MIXED
            || type == CHECKPOINT_RECORD;
    };
}
/**
 * Base class of restore contexts: iterates WAL records while filtering them by cache group, and decides
 * whether data integrity (CRC) errors must be rethrown or treated as the end of the readable WAL.
 */
private abstract class RestoreStateContext {
    /** Last archived segment. */
    protected final long lastArchivedSegment;

    /** Checkpoint status. */
    protected final CheckpointStatus status;

    /** WAL iterator. */
    private final WALIterator iterator;

    /** Only {@link WalRecordCacheGroupAware} records satisfied this predicate will be applied. */
    private final IgnitePredicate<Integer> cacheGroupPredicate;

    /**
     * @param status Checkpoint status.
     * @param iterator WAL iterator.
     * @param lastArchivedSegment Last archived segment index.
     * @param cacheGroupPredicate Cache groups predicate.
     */
    protected RestoreStateContext(
        CheckpointStatus status,
        WALIterator iterator,
        long lastArchivedSegment,
        IgnitePredicate<Integer> cacheGroupPredicate
    ) {
        this.status = status;
        this.iterator = iterator;
        this.lastArchivedSegment = lastArchivedSegment;
        this.cacheGroupPredicate = cacheGroupPredicate;
    }

    /**
     * Advances the iterator to the next record accepted by the cache group predicate. Data records are
     * returned with their entries filtered by the predicate.
     *
     * @return Next WAL record, or {@code null} if there are no more records or a tolerated data integrity
     *      violation was met.
     * @throws IgniteCheckedException If CRC check fail during binary recovery state or another exception occurring.
     */
    public WALRecord next() throws IgniteCheckedException {
        try {
            while (iterator.hasNextX()) {
                IgniteBiTuple<WALPointer, WALRecord> tup = iterator.nextX();

                if (tup == null)
                    return null;

                WALRecord rec = tup.get2();

                rec.position(tup.get1());

                // Skip group-aware records of groups rejected by the predicate.
                if (rec instanceof WalRecordCacheGroupAware
                    && !cacheGroupPredicate.apply(((WalRecordCacheGroupAware)rec).groupId()))
                    continue;

                // Data records may contain entries of several groups: keep only the accepted ones.
                return rec instanceof DataRecord ? filterEntriesByGroupId((DataRecord)rec) : rec;
            }

            return null;
        }
        catch (IgniteCheckedException e) {
            // Evaluated exactly once, before anything else: subclasses may have side effects (logging) here.
            boolean throwsCRCError = throwsCRCError();

            if (!X.hasCause(e, IgniteDataIntegrityViolationException.class)) {
                log.error("There is an error during restore state [throwsCRCError=" + throwsCRCError + ']', e);

                throw e;
            }

            if (throwsCRCError)
                throw e;

            return null;
        }
    }

    /**
     * Filter outs data entries from given data record that not satisfy {@link #cacheGroupPredicate}.
     * Entries whose cache context is not found are dropped as well.
     *
     * @param record Original data record.
     * @return Data record with filtered data entries.
     */
    private DataRecord filterEntriesByGroupId(DataRecord record) {
        List<DataEntry> accepted = record.writeEntries().stream()
            .filter(entry -> cctx.cacheContext(entry.cacheId()) != null
                && cacheGroupPredicate.apply(cctx.cacheContext(entry.cacheId()).groupId()))
            .collect(Collectors.toList());

        return record.setWriteEntries(accepted);
    }

    /**
     * @return Pointer of the last read WAL record, or the checkpoint start pointer if nothing was read yet.
     */
    public FileWALPointer lastReadRecordPointer() {
        assert status.startPtr instanceof FileWALPointer;

        return iterator.lastRead()
            .map(FileWALPointer.class::cast)
            .orElseGet(() -> (FileWALPointer)status.startPtr);
    }

    /**
     * @return {@code true} if a CRC error must be rethrown, i.e. the last read record lies in an archived segment.
     */
    public boolean throwsCRCError() {
        long lastReadIdx = lastReadRecordPointer().index();

        return lastReadIdx <= lastArchivedSegment;
    }
}
/**
 * Restore memory context. Tracks the safety of binary recovery: binary updates are applied until the
 * checkpoint record whose ID matches the checkpoint start marker is found.
 */
public class RestoreBinaryState extends RestoreStateContext {
    /** The flag indicates need to apply the binary update or no needed. */
    private boolean needApplyBinaryUpdates;

    /**
     * @param status Checkpoint status.
     * @param iterator WAL iterator.
     * @param lastArchivedSegment Last archived segment index.
     * @param cacheGroupsPredicate Cache groups predicate.
     */
    public RestoreBinaryState(
        CheckpointStatus status,
        WALIterator iterator,
        long lastArchivedSegment,
        IgnitePredicate<Integer> cacheGroupsPredicate
    ) {
        super(status, iterator, lastArchivedSegment, cacheGroupsPredicate);

        this.needApplyBinaryUpdates = status.needRestoreMemory();
    }

    /**
     * Advances the iterator to the next record, stopping binary updates once the checkpoint record registered
     * as the start marker in the status is met. Unexpected checkpoint markers are logged.
     *
     * @return Next WAL record or {@code null}.
     * @throws IgniteCheckedException If CRC check fail during binary recovery state or another exception occurring.
     */
    @Override public WALRecord next() throws IgniteCheckedException {
        WALRecord rec = super.next();

        if (rec == null || rec.type() != CHECKPOINT_RECORD)
            return rec;

        CheckpointRecord cpRec = (CheckpointRecord)rec;

        // We roll memory up until we find a checkpoint start record registered in the status.
        if (F.eq(cpRec.checkpointId(), status.cpStartId)) {
            log.info("Found last checkpoint marker [cpId=" + cpRec.checkpointId() +
                ", pos=" + rec.position() + ']');

            needApplyBinaryUpdates = false;
        }
        else if (!F.eq(cpRec.checkpointId(), status.cpEndId)) {
            U.warn(log, "Found unexpected checkpoint marker, skipping [cpId=" + cpRec.checkpointId() +
                ", expCpId=" + status.cpStartId + ", pos=" + rec.position() + ']');
        }

        return rec;
    }

    /**
     * @return {@code true} if binary updates still need to be applied.
     */
    public boolean needApplyBinaryUpdate() {
        return needApplyBinaryUpdates;
    }

    /**
     * Logs the inputs of the decision, then requires CRC errors to be thrown while binary updates are still
     * being applied, falling back to the base rule otherwise.
     *
     * @return {@code true} if a CRC error must be rethrown.
     */
    @Override public boolean throwsCRCError() {
        log.info("Throws CRC error check [needApplyBinaryUpdates=" + needApplyBinaryUpdates +
            ", lastArchivedSegment=" + lastArchivedSegment + ", lastRead=" + lastReadRecordPointer() + ']');

        return needApplyBinaryUpdates || super.throwsCRCError();
    }
}
/**
 * Restore logical state context. Tracks the safety of logical recovery and keeps the partition states
 * recovered while applying logical updates.
 */
public class RestoreLogicalState extends RestoreStateContext {
    /** States of partitions recovered during applying logical updates. */
    private final Map<GroupPartitionId, Integer> partitionRecoveryStates;

    /**
     * @param status Checkpoint status.
     * @param iterator WAL iterator.
     * @param lastArchivedSegment Last archived segment index.
     * @param cacheGroupsPredicate Cache groups predicate.
     * @param partitionRecoveryStates Initial partition recovery states (kept by reference, not copied).
     */
    public RestoreLogicalState(
        CheckpointStatus status,
        WALIterator iterator,
        long lastArchivedSegment,
        IgnitePredicate<Integer> cacheGroupsPredicate,
        Map<GroupPartitionId, Integer> partitionRecoveryStates
    ) {
        super(status, iterator, lastArchivedSegment, cacheGroupsPredicate);

        this.partitionRecoveryStates = partitionRecoveryStates;
    }

    /**
     * @return Unmodifiable view of the restored partition states for cache groups.
     */
    public Map<GroupPartitionId, Integer> partitionRecoveryStates() {
        Map<GroupPartitionId, Integer> view = Collections.unmodifiableMap(partitionRecoveryStates);

        return view;
    }
}
/**
 * Indicates checkpoint read lock acquisition failure which did not lead to node invalidation.
 */
private static class CheckpointReadLockTimeoutException extends IgniteCheckedException {
    /** Serial version UID. */
    private static final long serialVersionUID = 0L;

    /**
     * @param msg Error message.
     */
    private CheckpointReadLockTimeoutException(String msg) {
        super(msg);
    }
}
}