/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import javax.management.ObjectName;
import org.apache.ignite.DataStorageMetrics;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.CheckpointWriteOrder;
import org.apache.ignite.configuration.DataPageEvictionMode;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.EventType;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.mem.DirectMemoryProvider;
import org.apache.ignite.internal.mem.DirectMemoryRegion;
import org.apache.ignite.internal.mem.file.MappedFileMemoryProvider;
import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
import org.apache.ignite.internal.pagemem.store.PageStore;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.pagemem.wal.record.CacheState;
import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord;
import org.apache.ignite.internal.pagemem.wal.record.MetastoreDataRecord;
import org.apache.ignite.internal.pagemem.wal.record.MvccDataEntry;
import org.apache.ignite.internal.pagemem.wal.record.MvccTxRecord;
import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.ExchangeActions;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.StoredCacheData;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxLog;
import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntry;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointEntryType;
import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointHistory;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIOFactory;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
import org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.CheckpointMetricsTracker;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl;
import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionAllocationMap;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperation;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.IgniteDataIntegrityViolationException;
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32;
import org.apache.ignite.internal.processors.port.GridPortRecord;
import org.apache.ignite.internal.util.GridMultiCollectionWrapper;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.future.CountDownFuture;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridInClosure3X;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.mxbean.DataStorageMetricsMXBean;
import org.apache.ignite.thread.IgniteThread;
import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.apache.ignite.transactions.TransactionState;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedHashMap;
import static java.nio.file.StandardOpenOption.READ;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CHECKPOINT_READ_LOCK_TIMEOUT;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
import static org.apache.ignite.failure.FailureType.SYSTEM_CRITICAL_OPERATION_TIMEOUT;
import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CHECKPOINT_RECORD;
import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_ID;
import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.getType;
import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.getVersion;
import static org.apache.ignite.internal.util.IgniteUtils.checkpointBufferSize;
import static org.apache.ignite.internal.util.IgniteUtils.hexLong;
/**
* Database shared manager for Ignite native persistence: owns checkpointing, checkpoint
* markers, WAL-based memory recovery and the metastorage lifecycle.
*/
@SuppressWarnings({"unchecked", "NonPrivateFieldAccessedInSynchronizedContext"})
public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedManager implements CheckpointWriteProgressSupplier {
/** */
public static final String IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC = "IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC";
/** Data region name reserved for the meta store. */
public static final String METASTORE_DATA_REGION_NAME = "metastoreMemPlc";
/** Skip sync. */
private final boolean skipSync = IgniteSystemProperties.getBoolean(IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC);
/** */
private boolean skipCrc = IgniteSystemProperties.getBoolean(IGNITE_PDS_SKIP_CRC, false);
/** */
private final int walRebalanceThreshold = IgniteSystemProperties.getInteger(
IGNITE_PDS_WAL_REBALANCE_THRESHOLD, 500_000);
/** Value of property for throttling policy override. */
private final String throttlingPolicyOverride = IgniteSystemProperties.getString(
IgniteSystemProperties.IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED);
/** Checkpoint lock hold count. */
private static final ThreadLocal<Integer> CHECKPOINT_LOCK_HOLD_COUNT = new ThreadLocal<Integer>() {
@Override protected Integer initialValue() {
return 0;
}
};
/** Assertion enabled. */
private static final boolean ASSERTION_ENABLED = GridCacheDatabaseSharedManager.class.desiredAssertionStatus();
/** Checkpoint file name pattern. */
public static final Pattern CP_FILE_NAME_PATTERN = Pattern.compile("(\\d+)-(.*)-(START|END)\\.bin");
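/*
* A minimal sketch (not part of the original source) of how a checkpoint marker file name
* decomposes under the pattern above; the timestamp and UUID below are hypothetical:
*
*   Matcher m = CP_FILE_NAME_PATTERN.matcher("1542050985230-8ec24684-2b7f-4f48-9b0e-81ba94dd6cbc-START.bin");
*
*   if (m.matches()) {
*       long ts = Long.parseLong(m.group(1));                               // Checkpoint timestamp.
*       UUID id = UUID.fromString(m.group(2));                              // Checkpoint ID.
*       CheckpointEntryType type = CheckpointEntryType.valueOf(m.group(3)); // START or END.
*   }
*/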
/** Node started file suffix. */
public static final String NODE_STARTED_FILE_NAME_SUFFIX = "-node-started.bin";
/** */
private static final String MBEAN_NAME = "DataStorageMetrics";
/** */
private static final String MBEAN_GROUP = "Persistent Store";
/** WAL marker prefix for meta store. */
private static final String WAL_KEY_PREFIX = "grp-wal-";
/** Prefix for meta store records indicating that WAL was disabled globally for some group. */
private static final String WAL_GLOBAL_KEY_PREFIX = WAL_KEY_PREFIX + "disabled-";
/** Prefix for meta store records indicating that WAL was disabled locally for some group. */
private static final String WAL_LOCAL_KEY_PREFIX = WAL_KEY_PREFIX + "local-disabled-";
/** Prefix for meta store records indicating that a checkpoint entry for some group is not applicable for WAL rebalance. */
private static final String CHECKPOINT_INAPPLICABLE_FOR_REBALANCE = "cp-wal-rebalance-inapplicable-";
/** WAL marker predicate for meta store. */
private static final IgnitePredicate<String> WAL_KEY_PREFIX_PRED = new IgnitePredicate<String>() {
@Override public boolean apply(String key) {
return key.startsWith(WAL_KEY_PREFIX);
}
};
/** Timeout between partition file destroy and checkpoint to handle it. */
private static final long PARTITION_DESTROY_CHECKPOINT_TIMEOUT = 30 * 1000; // 30 Seconds.
/** */
private static final String CHECKPOINT_RUNNER_THREAD_PREFIX = "checkpoint-runner";
/** Checkpoint thread. Needs to be volatile because it is created in exchange worker. */
private volatile Checkpointer checkpointer;
/** For testing only. */
private volatile boolean checkpointsEnabled = true;
/** For testing only. */
private volatile GridFutureAdapter<Void> enableChangeApplied;
/** */
ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();
/** */
private long checkpointFreq;
/** */
private CheckpointHistory cpHistory;
/** */
private FilePageStoreManager storeMgr;
/** Checkpoint metadata directory ("cp"); contains checkpoint start and end marker files. */
private File cpDir;
/** */
private volatile boolean printCheckpointStats = true;
/** Database configuration. */
private final DataStorageConfiguration persistenceCfg;
/** */
private final Collection<DbCheckpointListener> lsnrs = new CopyOnWriteArrayList<>();
/** */
private boolean stopping;
/** Checkpoint runner thread pool. If {@code null}, tasks are run in a single thread. */
@Nullable private IgniteThreadPoolExecutor asyncRunner;
/** Thread local with buffers for the checkpoint threads. Each buffer represents one page for durable memory. */
private ThreadLocal<ByteBuffer> threadBuf;
/** Map from a cacheId to a future indicating that there is an in-progress index rebuild for the given cache. */
private final ConcurrentMap<Integer, GridFutureAdapter<Void>> idxRebuildFuts = new ConcurrentHashMap<>();
/**
* Lock holder for compatible folders mode. If the file lock was already acquired by the PDS folder
* resolver at node start, that pre-locked holder is reused and no extra lock management is required here.
*/
@Nullable private FileLockHolder fileLockHolder;
/** Lock wait time. */
private final long lockWaitTime;
/** */
private final boolean truncateWalOnCpFinish;
/** */
private Map</*grpId*/Integer, Map</*partId*/Integer, T2</*updCntr*/Long, WALPointer>>> reservedForExchange;
/** */
private final ConcurrentMap<T2</*grpId*/Integer, /*partId*/Integer>, T2</*updCntr*/Long, WALPointer>> reservedForPreloading = new ConcurrentHashMap<>();
/** Snapshot manager. */
private IgniteCacheSnapshotManager snapshotMgr;
/** */
private DataStorageMetricsImpl persStoreMetrics;
/** */
private ObjectName persistenceMetricsMbeanName;
/** Counter for written checkpoint pages. Not null only if checkpoint is running. */
private volatile AtomicInteger writtenPagesCntr = null;
/** Counter for fsynced checkpoint pages. Not null only if checkpoint is running. */
private volatile AtomicInteger syncedPagesCntr = null;
/** Counter for evicted checkpoint pages. Not null only if checkpoint is running. */
private volatile AtomicInteger evictedPagesCntr = null;
/** Number of pages in current checkpoint at the beginning of checkpoint. */
private volatile int currCheckpointPagesCnt;
/** */
private MetaStorage metaStorage;
/** */
private List<MetastorageLifecycleListener> metastorageLifecycleLsnrs;
/** Cache groups with WAL disabled globally at node start. */
private Collection<Integer> initiallyGlobalWalDisabledGrps = new HashSet<>();
/** Cache groups with WAL disabled locally at node start. */
private Collection<Integer> initiallyLocalWalDisabledGrps = new HashSet<>();
/** File I/O factory for writing checkpoint markers. */
private final FileIOFactory ioFactory;
/** Timeout for checkpoint read lock acquisition in milliseconds. */
private volatile long checkpointReadLockTimeout;
/**
* @param ctx Kernal context.
*/
public GridCacheDatabaseSharedManager(GridKernalContext ctx) {
IgniteConfiguration cfg = ctx.config();
persistenceCfg = cfg.getDataStorageConfiguration();
assert persistenceCfg != null;
checkpointFreq = persistenceCfg.getCheckpointFrequency();
truncateWalOnCpFinish = persistenceCfg.isWalHistorySizeParameterUsed()
? persistenceCfg.getWalHistorySize() != Integer.MAX_VALUE
: persistenceCfg.getMaxWalArchiveSize() != Long.MAX_VALUE;
lockWaitTime = persistenceCfg.getLockWaitTime();
persStoreMetrics = new DataStorageMetricsImpl(
persistenceCfg.isMetricsEnabled(),
persistenceCfg.getMetricsRateTimeInterval(),
persistenceCfg.getMetricsSubIntervalCount()
);
ioFactory = persistenceCfg.getFileIOFactory();
Long cfgCheckpointReadLockTimeout = ctx.config().getDataStorageConfiguration() != null
? ctx.config().getDataStorageConfiguration().getCheckpointReadLockTimeout()
: null;
checkpointReadLockTimeout = IgniteSystemProperties.getLong(IGNITE_CHECKPOINT_READ_LOCK_TIMEOUT,
cfgCheckpointReadLockTimeout != null
? cfgCheckpointReadLockTimeout
: (ctx.workersRegistry() != null
? ctx.workersRegistry().getSystemWorkerBlockedTimeout()
: ctx.config().getFailureDetectionTimeout()));
}
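/*
* A hedged note on the read lock timeout resolution above: the IGNITE_CHECKPOINT_READ_LOCK_TIMEOUT
* system property wins over the configured value, which in turn wins over the system-worker-blocked
* and failure-detection timeouts. For example (values are hypothetical):
*
*   // JVM flag, overrides everything else:
*   -DIGNITE_CHECKPOINT_READ_LOCK_TIMEOUT=10000
*
*   // Used only when the flag above is absent:
*   new DataStorageConfiguration().setCheckpointReadLockTimeout(5_000L);
*/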
/** */
private void notifyMetastorageReadyForRead() throws IgniteCheckedException {
for (MetastorageLifecycleListener lsnr : metastorageLifecycleLsnrs)
lsnr.onReadyForRead(metaStorage);
}
/** */
private void notifyMetastorageReadyForReadWrite() throws IgniteCheckedException {
for (MetastorageLifecycleListener lsnr : metastorageLifecycleLsnrs)
lsnr.onReadyForReadWrite(metaStorage);
}
/**
* @return Checkpointer instance.
*/
public Checkpointer getCheckpointer() {
return checkpointer;
}
/**
* For test use only.
*/
public IgniteInternalFuture<Void> enableCheckpoints(boolean enable) {
GridFutureAdapter<Void> fut = new GridFutureAdapter<>();
enableChangeApplied = fut;
checkpointsEnabled = enable;
wakeupForCheckpoint("enableCheckpoints()");
return fut;
}
/** {@inheritDoc} */
@Override protected void initDataRegions0(DataStorageConfiguration memCfg) throws IgniteCheckedException {
super.initDataRegions0(memCfg);
addDataRegion(
memCfg,
createDataRegionConfiguration(memCfg),
false
);
persStoreMetrics.regionMetrics(memMetricsMap.values());
}
/**
* @param storageCfg Data storage configuration.
* @return Data region configuration.
*/
private DataRegionConfiguration createDataRegionConfiguration(DataStorageConfiguration storageCfg) {
DataRegionConfiguration cfg = new DataRegionConfiguration();
cfg.setName(METASTORE_DATA_REGION_NAME);
cfg.setInitialSize(storageCfg.getSystemRegionInitialSize());
cfg.setMaxSize(storageCfg.getSystemRegionMaxSize());
cfg.setPersistenceEnabled(true);
return cfg;
}
/** {@inheritDoc} */
@Override protected void start0() throws IgniteCheckedException {
super.start0();
threadBuf = new ThreadLocal<ByteBuffer>() {
/** {@inheritDoc} */
@Override protected ByteBuffer initialValue() {
ByteBuffer tmpWriteBuf = ByteBuffer.allocateDirect(pageSize());
tmpWriteBuf.order(ByteOrder.nativeOrder());
return tmpWriteBuf;
}
};
snapshotMgr = cctx.snapshot();
final GridKernalContext kernalCtx = cctx.kernalContext();
if (!kernalCtx.clientNode()) {
checkpointer = new Checkpointer(cctx.igniteInstanceName(), "db-checkpoint-thread", log);
cpHistory = new CheckpointHistory(kernalCtx);
IgnitePageStoreManager store = cctx.pageStore();
assert store instanceof FilePageStoreManager : "Invalid page store manager was created: " + store;
storeMgr = (FilePageStoreManager)store;
cpDir = Paths.get(storeMgr.workDir().getAbsolutePath(), "cp").toFile();
if (!U.mkdirs(cpDir))
throw new IgniteCheckedException("Could not create directory for checkpoint metadata: " + cpDir);
final FileLockHolder preLocked = kernalCtx.pdsFolderResolver()
.resolveFolders()
.getLockedFileLockHolder();
fileLockHolder = preLocked == null ?
new FileLockHolder(storeMgr.workDir().getPath(), kernalCtx, log) : preLocked;
if (log.isDebugEnabled())
log.debug("Try to capture file lock [nodeId=" +
cctx.localNodeId() + " path=" + fileLockHolder.lockPath() + "]");
if (!fileLockHolder.isLocked())
fileLockHolder.tryLock(lockWaitTime);
cleanupTempCheckpointDirectory();
persStoreMetrics.wal(cctx.wal());
}
}
/**
* Removes all temporary files from the checkpoint directory.
*/
@Override public void cleanupTempCheckpointDirectory() throws IgniteCheckedException {
try {
try (DirectoryStream<Path> files = Files.newDirectoryStream(
cpDir.toPath(),
path -> path.endsWith(FilePageStoreManager.TMP_SUFFIX))
) {
for (Path path : files)
Files.delete(path);
}
}
catch (IOException e) {
throw new IgniteCheckedException("Failed to cleanup checkpoint directory from temporary files: " + cpDir, e);
}
}
/**
* Removes all files from the checkpoint directory.
*/
@Override public void cleanupCheckpointDirectory() throws IgniteCheckedException {
try {
try (DirectoryStream<Path> files = Files.newDirectoryStream(cpDir.toPath())) {
for (Path path : files)
Files.delete(path);
}
}
catch (IOException e) {
throw new IgniteCheckedException("Failed to cleanup checkpoint directory: " + cpDir, e);
}
}
/**
* Retrieves checkpoint history from the checkpoint directory.
*
* @return List of checkpoints.
*/
private List<CheckpointEntry> retreiveHistory() throws IgniteCheckedException {
if (!cpDir.exists())
return Collections.emptyList();
try (DirectoryStream<Path> cpFiles = Files.newDirectoryStream(
cpDir.toPath(),
path -> CP_FILE_NAME_PATTERN.matcher(path.toFile().getName()).matches())
) {
List<CheckpointEntry> checkpoints = new ArrayList<>();
ByteBuffer buf = ByteBuffer.allocate(FileWALPointer.POINTER_SIZE);
buf.order(ByteOrder.nativeOrder());
for (Path cpFile : cpFiles) {
CheckpointEntry cp = parseFromFile(buf, cpFile.toFile());
if (cp != null)
checkpoints.add(cp);
}
return checkpoints;
}
catch (IOException e) {
throw new IgniteCheckedException("Failed to load checkpoint history.", e);
}
}
/**
* Parses checkpoint entry from given file.
*
* @param buf Temporary byte buffer.
* @param file Checkpoint file.
*/
@Nullable private CheckpointEntry parseFromFile(ByteBuffer buf, File file) throws IgniteCheckedException {
Matcher matcher = CP_FILE_NAME_PATTERN.matcher(file.getName());
if (!matcher.matches())
return null;
CheckpointEntryType type = CheckpointEntryType.valueOf(matcher.group(3));
if (type != CheckpointEntryType.START)
return null;
long cpTs = Long.parseLong(matcher.group(1));
UUID cpId = UUID.fromString(matcher.group(2));
WALPointer ptr = readPointer(file, buf);
return createCheckPointEntry(cpTs, ptr, cpId, null, CheckpointEntryType.START);
}
/**
* Removes checkpoint start/end files belonging to the given {@code cpEntry}.
*
* @param cpEntry Checkpoint entry.
*
* @throws IgniteCheckedException If failed to delete.
*/
private void removeCheckpointFiles(CheckpointEntry cpEntry) throws IgniteCheckedException {
Path startFile = new File(cpDir.getAbsolutePath(), checkpointFileName(cpEntry, CheckpointEntryType.START)).toPath();
Path endFile = new File(cpDir.getAbsolutePath(), checkpointFileName(cpEntry, CheckpointEntryType.END)).toPath();
try {
if (Files.exists(startFile))
Files.delete(startFile);
if (Files.exists(endFile))
Files.delete(endFile);
}
catch (IOException e) {
throw new StorageException("Failed to delete stale checkpoint files: " + cpEntry, e);
}
}
/** */
private void readMetastore() throws IgniteCheckedException {
try {
DataStorageConfiguration memCfg = cctx.kernalContext().config().getDataStorageConfiguration();
DataRegionConfiguration plcCfg = createDataRegionConfiguration(memCfg);
File allocPath = buildAllocPath(plcCfg);
DirectMemoryProvider memProvider = allocPath == null ?
new UnsafeMemoryProvider(log) :
new MappedFileMemoryProvider(
log,
allocPath);
DataRegionMetricsImpl memMetrics = new DataRegionMetricsImpl(plcCfg);
PageMemoryEx storePageMem = (PageMemoryEx)createPageMemory(memProvider, memCfg, plcCfg, memMetrics, false);
DataRegion regCfg = new DataRegion(storePageMem, plcCfg, memMetrics, createPageEvictionTracker(plcCfg, storePageMem));
CheckpointStatus status = readCheckpointStatus();
cctx.pageStore().initializeForMetastorage();
storePageMem.start();
checkpointReadLock();
try {
restoreMemory(status, true, storePageMem);
metaStorage = new MetaStorage(cctx, regCfg, memMetrics, true);
metaStorage.init(this);
applyLastUpdates(status, true);
fillWalDisabledGroups();
notifyMetastorageReadyForRead();
}
finally {
checkpointReadUnlock();
}
metaStorage = null;
storePageMem.stop(true);
}
catch (StorageException e) {
cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
throw new IgniteCheckedException(e);
}
}
/** {@inheritDoc} */
@Override public void onActivate(GridKernalContext ctx) throws IgniteCheckedException {
if (log.isDebugEnabled())
log.debug("Activate database manager [id=" + cctx.localNodeId() +
" topVer=" + cctx.discovery().topologyVersionEx() + " ]");
snapshotMgr = cctx.snapshot();
if (!cctx.localNode().isClient()) {
initDataBase();
registrateMetricsMBean();
}
if (checkpointer == null)
checkpointer = new Checkpointer(cctx.igniteInstanceName(), "db-checkpoint-thread", log);
super.onActivate(ctx);
}
/** {@inheritDoc} */
@Override public void onDeActivate(GridKernalContext kctx) {
if (log.isDebugEnabled())
log.debug("DeActivate database manager [id=" + cctx.localNodeId() +
" topVer=" + cctx.discovery().topologyVersionEx() + " ]");
onKernalStop0(false);
super.onDeActivate(kctx);
/* Must stay here: activate may be invoked after deactivate, and the file lock must already be configured by then. */
stopping = false;
}
/**
* Creates the checkpoint runner thread pool if more than one checkpoint thread is configured.
*/
private void initDataBase() {
if (persistenceCfg.getCheckpointThreads() > 1)
asyncRunner = new IgniteThreadPoolExecutor(
CHECKPOINT_RUNNER_THREAD_PREFIX,
cctx.igniteInstanceName(),
persistenceCfg.getCheckpointThreads(),
persistenceCfg.getCheckpointThreads(),
30_000,
new LinkedBlockingQueue<Runnable>()
);
}
/**
* Try to register Metrics MBean.
*
* @throws IgniteCheckedException If failed.
*/
private void registrateMetricsMBean() throws IgniteCheckedException {
if (U.IGNITE_MBEANS_DISABLED)
return;
try {
persistenceMetricsMbeanName = U.registerMBean(
cctx.kernalContext().config().getMBeanServer(),
cctx.kernalContext().igniteInstanceName(),
MBEAN_GROUP,
MBEAN_NAME,
persStoreMetrics,
DataStorageMetricsMXBean.class);
}
catch (Throwable e) {
throw new IgniteCheckedException("Failed to register " + MBEAN_NAME + " MBean.", e);
}
}
/**
* Unregister metrics MBean.
*/
private void unRegistrateMetricsMBean() {
if (persistenceMetricsMbeanName == null)
return;
assert !U.IGNITE_MBEANS_DISABLED;
try {
cctx.kernalContext().config().getMBeanServer().unregisterMBean(persistenceMetricsMbeanName);
persistenceMetricsMbeanName = null;
}
catch (Throwable e) {
U.error(log, "Failed to unregister " + MBEAN_NAME + " MBean.", e);
}
}
/** {@inheritDoc} */
@Override protected IgniteOutClosure<Long> freeSpaceProvider(final DataRegionConfiguration dataRegCfg) {
if (!dataRegCfg.isPersistenceEnabled())
return super.freeSpaceProvider(dataRegCfg);
final String dataRegName = dataRegCfg.getName();
return new IgniteOutClosure<Long>() {
@Override public Long apply() {
long freeSpace = 0L;
for (CacheGroupContext grpCtx : cctx.cache().cacheGroups()) {
if (!grpCtx.dataRegion().config().getName().equals(dataRegName))
continue;
assert grpCtx.offheap() instanceof GridCacheOffheapManager;
freeSpace += ((GridCacheOffheapManager)grpCtx.offheap()).freeSpace();
}
return freeSpace;
}
};
}
/** {@inheritDoc} */
@Override public void readCheckpointAndRestoreMemory(
List<DynamicCacheDescriptor> cachesToStart
) throws IgniteCheckedException {
assert !cctx.localNode().isClient();
long time = System.currentTimeMillis();
checkpointReadLock();
try {
for (DatabaseLifecycleListener lsnr : getDatabaseListeners(cctx.kernalContext()))
lsnr.beforeMemoryRestore(this);
if (!F.isEmpty(cachesToStart)) {
for (DynamicCacheDescriptor desc : cachesToStart) {
if (CU.affinityNode(cctx.localNode(), desc.cacheConfiguration().getNodeFilter()))
storeMgr.initializeForCache(desc.groupDescriptor(), new StoredCacheData(desc.cacheConfiguration()));
}
}
CheckpointStatus status = readCheckpointStatus();
cctx.pageStore().initializeForMetastorage();
metaStorage = new MetaStorage(
cctx,
dataRegionMap.get(METASTORE_DATA_REGION_NAME),
(DataRegionMetricsImpl)memMetricsMap.get(METASTORE_DATA_REGION_NAME)
);
WALPointer restore = restoreMemory(status);
if (restore == null && !status.endPtr.equals(CheckpointStatus.NULL_PTR)) {
throw new StorageException("Restore wal pointer = " + restore + ", while status.endPtr = " +
status.endPtr + ". Can't restore memory - critical part of WAL archive is missing.");
}
// First, bring memory to the last consistent checkpoint state if needed.
// This method should return a pointer to the last valid record in the WAL.
cctx.wal().resumeLogging(restore);
WALPointer ptr = cctx.wal().log(new MemoryRecoveryRecord(U.currentTimeMillis()));
if (ptr != null) {
cctx.wal().flush(ptr, true);
nodeStart(ptr);
}
metaStorage.init(this);
notifyMetastorageReadyForReadWrite();
for (DatabaseLifecycleListener lsnr : getDatabaseListeners(cctx.kernalContext()))
lsnr.afterMemoryRestore(this);
}
catch (IgniteCheckedException e) {
if (X.hasCause(e, StorageException.class, IOException.class))
cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
throw e;
}
finally {
checkpointReadUnlock();
if (log.isInfoEnabled())
log.info("Binary recovery performed in " + (System.currentTimeMillis() - time) + " ms.");
}
}
/**
* Creates a file named with the current timestamp and the "node-started.bin" suffix
* and writes the memory recovery pointer into it.
*
* @param ptr Memory recovery wal pointer.
*/
private void nodeStart(WALPointer ptr) throws IgniteCheckedException {
FileWALPointer p = (FileWALPointer)ptr;
String fileName = U.currentTimeMillis() + NODE_STARTED_FILE_NAME_SUFFIX;
String tmpFileName = fileName + FilePageStoreManager.TMP_SUFFIX;
ByteBuffer buf = ByteBuffer.allocate(FileWALPointer.POINTER_SIZE);
buf.order(ByteOrder.nativeOrder());
try {
try (FileIO io = ioFactory.create(Paths.get(cpDir.getAbsolutePath(), tmpFileName).toFile(),
StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
buf.putLong(p.index());
buf.putInt(p.fileOffset());
buf.putInt(p.length());
buf.flip();
io.writeFully(buf);
buf.clear();
io.force(true);
}
Files.move(Paths.get(cpDir.getAbsolutePath(), tmpFileName), Paths.get(cpDir.getAbsolutePath(), fileName));
}
catch (IOException e) {
throw new StorageException("Failed to write node start marker: " + ptr, e);
}
}
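/*
* On-disk layout of the marker written above; it is the FileWALPointer serialization used
* throughout this class (FileWALPointer.POINTER_SIZE bytes total):
*
*   [8 bytes] WAL segment index (long)
*   [4 bytes] offset within the segment (int)
*   [4 bytes] record length (int)
*
* The marker is written under a ".tmp" suffix first and renamed into place afterwards, so a
* partially written file is never picked up by nodeStartedPointers().
*/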
/**
* Collects memory recovery pointers from node started files. See {@link #nodeStart(WALPointer)}.
* Each pointer is associated with the timestamp extracted from its file name.
* Tuples are sorted by that timestamp.
*
* @return Sorted list of tuples (node started timestamp, memory recovery pointer).
*
* @throws IgniteCheckedException If failed.
*/
public List<T2<Long, WALPointer>> nodeStartedPointers() throws IgniteCheckedException {
List<T2<Long, WALPointer>> res = new ArrayList<>();
try (DirectoryStream<Path> nodeStartedFiles = Files.newDirectoryStream(
cpDir.toPath(),
path -> path.toFile().getName().endsWith(NODE_STARTED_FILE_NAME_SUFFIX))
) {
ByteBuffer buf = ByteBuffer.allocate(FileWALPointer.POINTER_SIZE);
buf.order(ByteOrder.nativeOrder());
for (Path path : nodeStartedFiles) {
File f = path.toFile();
String name = f.getName();
Long ts = Long.valueOf(name.substring(0, name.length() - NODE_STARTED_FILE_NAME_SUFFIX.length()));
try (FileIO io = ioFactory.create(f, READ)) {
io.readFully(buf);
buf.flip();
FileWALPointer ptr = new FileWALPointer(
buf.getLong(), buf.getInt(), buf.getInt());
res.add(new T2<>(ts, ptr));
buf.clear();
}
catch (IOException e) {
throw new StorageException("Failed to read node started marker file: " + f.getAbsolutePath(), e);
}
}
}
catch (IOException e) {
throw new StorageException("Failed to retreive node started files.", e);
}
// Sort start markers by file timestamp.
res.sort(Comparator.comparingLong(IgniteBiTuple::get1));
return res;
}
/** {@inheritDoc} */
@Override protected void onKernalStop0(boolean cancel) {
checkpointLock.writeLock().lock();
try {
stopping = true;
}
finally {
checkpointLock.writeLock().unlock();
}
shutdownCheckpointer(cancel);
lsnrs.clear();
super.onKernalStop0(cancel);
unRegistrateMetricsMBean();
}
/** {@inheritDoc} */
@Override protected void stop0(boolean cancel) {
super.stop0(cancel);
if (!cctx.kernalContext().clientNode()) {
if (fileLockHolder != null) {
if (log.isDebugEnabled())
log.debug("Release file lock [nodeId=" +
cctx.localNodeId() + " path=" + fileLockHolder.lockPath() + "]");
fileLockHolder.close();
}
}
}
/**
* Splits the data region into {@code concLvl} equally sized segments (at least 1 MiB each)
* and appends the checkpoint buffer size as the last array element.
*/
private long[] calculateFragmentSizes(int concLvl, long cacheSize, long chpBufSize) {
if (concLvl < 2)
concLvl = Runtime.getRuntime().availableProcessors();
long fragmentSize = cacheSize / concLvl;
if (fragmentSize < 1024 * 1024)
fragmentSize = 1024 * 1024;
long[] sizes = new long[concLvl + 1];
for (int i = 0; i < concLvl; i++)
sizes[i] = fragmentSize;
sizes[concLvl] = chpBufSize;
return sizes;
}
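/*
* Worked example for the sizing above (numbers are hypothetical): with concLvl = 4,
* cacheSize = 4 GiB and chpBufSize = 1 GiB the method returns
*
*   sizes = [1 GiB, 1 GiB, 1 GiB, 1 GiB, 1 GiB]
*
* i.e. four equal data segments plus the checkpoint buffer as the extra last element.
* Segments never go below 1 MiB: for tiny regions the fragment size is rounded up.
*/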
/** {@inheritDoc} */
@Override protected PageMemory createPageMemory(
DirectMemoryProvider memProvider,
DataStorageConfiguration memCfg,
DataRegionConfiguration plcCfg,
DataRegionMetricsImpl memMetrics,
final boolean trackable
) {
if (!plcCfg.isPersistenceEnabled())
return super.createPageMemory(memProvider, memCfg, plcCfg, memMetrics, trackable);
memMetrics.persistenceEnabled(true);
long cacheSize = plcCfg.getMaxSize();
// Checkpoint buffer size cannot exceed the data region size; a larger value makes no sense.
long chpBufSize = checkpointBufferSize(plcCfg);
if (chpBufSize > cacheSize) {
U.quietAndInfo(log,
"Configured checkpoint page buffer size is too big, setting to the max region size [size="
+ U.readableSize(cacheSize, false) + ", memPlc=" + plcCfg.getName() + ']');
chpBufSize = cacheSize;
}
GridInClosure3X<Long, FullPageId, PageMemoryEx> changeTracker;
if (trackable)
changeTracker = new GridInClosure3X<Long, FullPageId, PageMemoryEx>() {
@Override public void applyx(
Long page,
FullPageId fullId,
PageMemoryEx pageMem
) throws IgniteCheckedException {
if (trackable)
snapshotMgr.onChangeTrackerPage(page, fullId, pageMem);
}
};
else
changeTracker = null;
PageMemoryImpl pageMem = new PageMemoryImpl(
wrapMetricsMemoryProvider(memProvider, memMetrics),
calculateFragmentSizes(
memCfg.getConcurrencyLevel(),
cacheSize,
chpBufSize
),
cctx,
memCfg.getPageSize(),
(fullId, pageBuf, tag) -> {
memMetrics.onPageWritten();
// First of all, write page to disk.
storeMgr.write(fullId.groupId(), fullId.pageId(), pageBuf, tag);
pageBuf.rewind();
// Only after write we can write page into snapshot.
snapshotMgr.flushDirtyPageHandler(fullId, pageBuf, tag);
AtomicInteger cntr = evictedPagesCntr;
if (cntr != null)
cntr.incrementAndGet();
},
changeTracker,
this,
memMetrics,
resolveThrottlingPolicy(),
this
);
memMetrics.pageMemory(pageMem);
return pageMem;
}
/**
* @param memoryProvider0 Memory provider.
* @param memMetrics Memory metrics.
* @return Wrapped memory provider.
*/
@Override protected DirectMemoryProvider wrapMetricsMemoryProvider(
final DirectMemoryProvider memoryProvider0,
final DataRegionMetricsImpl memMetrics
) {
return new DirectMemoryProvider() {
private AtomicInteger checkPointBufferIdxCnt = new AtomicInteger();
private final DirectMemoryProvider memProvider = memoryProvider0;
@Override public void initialize(long[] chunkSizes) {
memProvider.initialize(chunkSizes);
checkPointBufferIdxCnt.set(chunkSizes.length);
}
@Override public void shutdown(boolean deallocate) {
memProvider.shutdown(deallocate);
}
@Override public DirectMemoryRegion nextRegion() {
DirectMemoryRegion nextMemoryRegion = memProvider.nextRegion();
if (nextMemoryRegion == null)
return null;
int idx = checkPointBufferIdxCnt.decrementAndGet();
long chunkSize = nextMemoryRegion.size();
// The checkpoint buffer chunk is the last element of chunkSizes, so idx == 0 identifies it.
if (idx != 0)
memMetrics.updateOffHeapSize(chunkSize);
else
memMetrics.updateCheckpointBufferSize(chunkSize);
return nextMemoryRegion;
}
};
}
/**
* Resolves throttling policy according to the settings.
*/
@NotNull private PageMemoryImpl.ThrottlingPolicy resolveThrottlingPolicy() {
PageMemoryImpl.ThrottlingPolicy plc = persistenceCfg.isWriteThrottlingEnabled()
? PageMemoryImpl.ThrottlingPolicy.SPEED_BASED
: PageMemoryImpl.ThrottlingPolicy.CHECKPOINT_BUFFER_ONLY;
if (throttlingPolicyOverride != null) {
try {
plc = PageMemoryImpl.ThrottlingPolicy.valueOf(throttlingPolicyOverride.toUpperCase());
}
catch (IllegalArgumentException e) {
log.error("Incorrect value of IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED property: " +
throttlingPolicyOverride + ". Default throttling policy " + plc + " will be used.");
}
}
return plc;
}
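/*
* A minimal sketch of overriding the resolved policy from the command line; the property name is
* the one read into throttlingPolicyOverride above, and the value must match one of the
* PageMemoryImpl.ThrottlingPolicy constants referenced in this class:
*
*   -DIGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED=SPEED_BASED
*
* An unparsable value is logged and the policy derived from
* DataStorageConfiguration#isWriteThrottlingEnabled() is kept.
*/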
/** {@inheritDoc} */
@Override protected void checkRegionEvictionProperties(DataRegionConfiguration regCfg, DataStorageConfiguration dbCfg)
throws IgniteCheckedException {
if (!regCfg.isPersistenceEnabled())
super.checkRegionEvictionProperties(regCfg, dbCfg);
else if (regCfg.getPageEvictionMode() != DataPageEvictionMode.DISABLED) {
U.warn(log, "Page eviction mode set for [" + regCfg.getName() + "] data will have no effect" +
" because the oldest pages are evicted automatically if Ignite persistence is enabled.");
}
}
/** {@inheritDoc} */
@Override protected void checkPageSize(DataStorageConfiguration memCfg) {
if (memCfg.getPageSize() == 0) {
try {
assert cctx.pageStore() instanceof FilePageStoreManager :
"Invalid page store manager was created: " + cctx.pageStore();
Path anyIdxPartFile = IgniteUtils.searchFileRecursively(
((FilePageStoreManager)cctx.pageStore()).workDir().toPath(), FilePageStoreManager.INDEX_FILE_NAME);
if (anyIdxPartFile != null) {
memCfg.setPageSize(resolvePageSizeFromPartitionFile(anyIdxPartFile));
return;
}
}
catch (IgniteCheckedException | IOException | IllegalArgumentException e) {
U.quietAndWarn(log, "Attempt to resolve pageSize from store files failed: " + e.getMessage());
U.quietAndWarn(log, "Default page size will be used: " + DataStorageConfiguration.DFLT_PAGE_SIZE + " bytes");
}
memCfg.setPageSize(DataStorageConfiguration.DFLT_PAGE_SIZE);
}
}
/**
* @param partFile Partition file.
* @return Page size resolved from the partition file header.
*/
private int resolvePageSizeFromPartitionFile(Path partFile) throws IOException, IgniteCheckedException {
try (FileIO fileIO = ioFactory.create(partFile.toFile())) {
int minimalHdr = FilePageStore.HEADER_SIZE;
if (fileIO.size() < minimalHdr)
throw new IgniteCheckedException("Partition file is too small: " + partFile);
ByteBuffer hdr = ByteBuffer.allocate(minimalHdr).order(ByteOrder.LITTLE_ENDIAN);
fileIO.readFully(hdr);
hdr.rewind();
hdr.getLong(); // Read signature.
hdr.getInt(); // Read version.
hdr.get(); // Read type.
int pageSize = hdr.getInt();
if (pageSize == 2048) {
U.quietAndWarn(log, "You are currently using persistent store with 2K pages (DataStorageConfiguration#" +
"pageSize). If you use SSD disk, consider migrating to 4K pages for better IO performance.");
}
return pageSize;
}
}
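/*
* Partition file header fields consumed above (little-endian), in read order:
*
*   [8 bytes] signature (long) - skipped
*   [4 bytes] version (int)    - skipped
*   [1 byte ] type             - skipped
*   [4 bytes] page size (int)  - returned
*/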
/**
* @param cancel Cancel flag.
*/
@SuppressWarnings("unused")
private void shutdownCheckpointer(boolean cancel) {
Checkpointer cp = checkpointer;
if (cp != null) {
if (cancel)
cp.shutdownNow();
else
cp.cancel();
try {
U.join(cp);
checkpointer = null;
}
catch (IgniteInterruptedCheckedException ignore) {
U.warn(log, "Was interrupted while waiting for checkpointer shutdown, " +
"will not wait for checkpoint to finish.");
cp.shutdownNow();
while (true) {
try {
U.join(cp);
checkpointer = null;
cp.scheduledCp.cpFinishFut.onDone(
new NodeStoppingException("Checkpointer is stopped during node stop."));
break;
}
catch (IgniteInterruptedCheckedException ignored) {
//Ignore
}
}
Thread.currentThread().interrupt();
}
}
if (asyncRunner != null) {
asyncRunner.shutdownNow();
try {
asyncRunner.awaitTermination(2, TimeUnit.MINUTES);
}
catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
}
}
/** {@inheritDoc} */
@Override public boolean beforeExchange(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
DiscoveryEvent discoEvt = fut.firstEvent();
boolean joinEvt = discoEvt.type() == EventType.EVT_NODE_JOINED;
boolean locNode = discoEvt.eventNode().isLocal();
boolean isSrvNode = !cctx.kernalContext().clientNode();
boolean clusterInTransitionStateToActive = fut.activateCluster();
boolean restored = false;
long time = System.currentTimeMillis();
// In case of cluster activation or local server node join, restore the whole manager state.
if (clusterInTransitionStateToActive || (joinEvt && locNode && isSrvNode)) {
restoreState();
restored = true;
}
// In case of starting groups, restore partition states only for these groups.
else if (fut.exchangeActions() != null && !F.isEmpty(fut.exchangeActions().cacheGroupsToStart())) {
Set<Integer> restoreGroups = fut.exchangeActions().cacheGroupsToStart().stream()
.map(actionData -> actionData.descriptor().groupId())
.collect(Collectors.toSet());
restorePartitionStates(Collections.emptyMap(), restoreGroups);
}
if (cctx.kernalContext().query().moduleEnabled()) {
ExchangeActions acts = fut.exchangeActions();
if (acts != null) {
if (!F.isEmpty(acts.cacheStartRequests())) {
for (ExchangeActions.CacheActionData actionData : acts.cacheStartRequests())
prepareIndexRebuildFuture(CU.cacheId(actionData.request().cacheName()));
}
else if (acts.localJoinContext() != null && !F.isEmpty(acts.localJoinContext().caches())) {
for (T2<DynamicCacheDescriptor, NearCacheConfiguration> tup : acts.localJoinContext().caches())
prepareIndexRebuildFuture(tup.get1().cacheId());
}
}
}
if (log.isInfoEnabled())
log.info("Logical recovery performed in " + (System.currentTimeMillis() - time) + " ms.");
return restored;
}
/**
* Creates a new index rebuild future that should be completed later after exchange is done. The future
* has to be created before exchange is initialized to guarantee that we will capture a correct future
* after activation or restore completes.
* If there was an old future for the given ID, it will be completed.
*
* @param cacheId Cache ID.
*/
private void prepareIndexRebuildFuture(int cacheId) {
GridFutureAdapter<Void> old = idxRebuildFuts.put(cacheId, new GridFutureAdapter<>());
if (old != null)
old.onDone();
}
/** {@inheritDoc} */
@Override public void rebuildIndexesIfNeeded(GridDhtPartitionsExchangeFuture fut) {
if (cctx.kernalContext().query().moduleEnabled()) {
for (final GridCacheContext cacheCtx : (Collection<GridCacheContext>)cctx.cacheContexts()) {
if (cacheCtx.startTopologyVersion().equals(fut.initialVersion())) {
final int cacheId = cacheCtx.cacheId();
final GridFutureAdapter<Void> usrFut = idxRebuildFuts.get(cacheId);
if (!cctx.pageStore().hasIndexStore(cacheCtx.groupId()) && cacheCtx.affinityNode()
&& cacheCtx.group().persistenceEnabled()) {
IgniteInternalFuture<?> rebuildFut = cctx.kernalContext().query()
.rebuildIndexesFromHash(Collections.singleton(cacheCtx.cacheId()));
assert usrFut != null : "Missing user future for cache: " + cacheCtx.name();
rebuildFut.listen(new CI1<IgniteInternalFuture>() {
@Override public void apply(IgniteInternalFuture igniteInternalFut) {
idxRebuildFuts.remove(cacheId, usrFut);
usrFut.onDone(igniteInternalFut.error());
CacheConfiguration ccfg = cacheCtx.config();
if (ccfg != null) {
log().info("Finished indexes rebuilding for cache [name=" + ccfg.getName()
+ ", grpName=" + ccfg.getGroupName() + ']');
}
}
});
}
else {
if (usrFut != null) {
idxRebuildFuts.remove(cacheId, usrFut);
usrFut.onDone();
}
}
}
}
}
}
/** {@inheritDoc} */
@Nullable @Override public IgniteInternalFuture indexRebuildFuture(int cacheId) {
return idxRebuildFuts.get(cacheId);
}
/** {@inheritDoc} */
@Override public void onCacheGroupsStopped(
Collection<IgniteBiTuple<CacheGroupContext, Boolean>> stoppedGrps
) {
Map<PageMemoryEx, Collection<Integer>> destroyed = new HashMap<>();
for (IgniteBiTuple<CacheGroupContext, Boolean> tup : stoppedGrps) {
CacheGroupContext gctx = tup.get1();
if (!gctx.persistenceEnabled())
continue;
snapshotMgr.onCacheGroupStop(gctx);
PageMemoryEx pageMem = (PageMemoryEx)gctx.dataRegion().pageMemory();
Collection<Integer> grpIds = destroyed.computeIfAbsent(pageMem, k -> new HashSet<>());
grpIds.add(tup.get1().groupId());
pageMem.onCacheGroupDestroyed(tup.get1().groupId());
if (tup.get2())
cctx.kernalContext().encryption().onCacheGroupDestroyed(gctx.groupId());
}
Collection<IgniteInternalFuture<Void>> clearFuts = new ArrayList<>(destroyed.size());
for (Map.Entry<PageMemoryEx, Collection<Integer>> entry : destroyed.entrySet()) {
final Collection<Integer> grpIds = entry.getValue();
clearFuts.add(entry.getKey().clearAsync((grpId, pageIdg) -> grpIds.contains(grpId), false));
}
for (IgniteInternalFuture<Void> clearFut : clearFuts) {
try {
clearFut.get();
}
catch (IgniteCheckedException e) {
log.error("Failed to clear page memory", e);
}
}
if (cctx.pageStore() != null) {
for (IgniteBiTuple<CacheGroupContext, Boolean> tup : stoppedGrps) {
CacheGroupContext grp = tup.get1();
if (grp.affinityNode()) {
try {
cctx.pageStore().shutdownForCacheGroup(grp, tup.get2());
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to gracefully clean page store resources for destroyed cache " +
"[cache=" + grp.cacheOrGroupName() + "]", e);
}
}
}
}
}
/**
* Gets the checkpoint read lock. While this lock is held, the checkpoint thread will not begin a new
* checkpoint, so it cannot capture a partially updated memory state.
* @throws IgniteException If failed.
*/
@SuppressWarnings("LockAcquiredButNotSafelyReleased")
@Override public void checkpointReadLock() {
if (checkpointLock.writeLock().isHeldByCurrentThread())
return;
long timeout = checkpointReadLockTimeout;
long start = U.currentTimeMillis();
boolean interrupted = false;
try {
for (; ; ) {
try {
if (timeout > 0 && (U.currentTimeMillis() - start) >= timeout)
failCheckpointReadLock();
try {
if (timeout > 0) {
if (!checkpointLock.readLock().tryLock(timeout - (U.currentTimeMillis() - start),
TimeUnit.MILLISECONDS))
failCheckpointReadLock();
}
else
checkpointLock.readLock().lock();
}
catch (InterruptedException e) {
interrupted = true;
continue;
}
if (stopping) {
checkpointLock.readLock().unlock();
throw new IgniteException(new NodeStoppingException("Failed to perform cache update: node is stopping."));
}
if (checkpointLock.getReadHoldCount() > 1 || safeToUpdatePageMemories())
break;
else {
checkpointLock.readLock().unlock();
if (timeout > 0 && U.currentTimeMillis() - start >= timeout)
failCheckpointReadLock();
try {
checkpointer.wakeupForCheckpoint(0, "too many dirty pages").cpBeginFut
.getUninterruptibly();
}
catch (IgniteFutureTimeoutCheckedException e) {
failCheckpointReadLock();
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to wait for checkpoint begin.", e);
}
}
}
catch (CheckpointReadLockTimeoutException e) {
log.error(e.getMessage(), e);
timeout = 0;
}
}
}
finally {
if (interrupted)
Thread.currentThread().interrupt();
}
if (ASSERTION_ENABLED)
CHECKPOINT_LOCK_HOLD_COUNT.set(CHECKPOINT_LOCK_HOLD_COUNT.get() + 1);
}
/**
* Invokes critical failure processing. Always throws.
*
* @throws CheckpointReadLockTimeoutException If the node was not invalidated as a result of failure handling.
* @throws IgniteException If the node was invalidated as a result of failure handling.
*/
private void failCheckpointReadLock() throws CheckpointReadLockTimeoutException, IgniteException {
String msg = "Checkpoint read lock acquisition has been timed out.";
IgniteException e = new IgniteException(msg);
if (cctx.kernalContext().failure().process(new FailureContext(SYSTEM_CRITICAL_OPERATION_TIMEOUT, e)))
throw e;
throw new CheckpointReadLockTimeoutException(msg);
}
/** {@inheritDoc} */
@Override public boolean checkpointLockIsHeldByThread() {
return !ASSERTION_ENABLED ||
checkpointLock.isWriteLockedByCurrentThread() ||
CHECKPOINT_LOCK_HOLD_COUNT.get() > 0 ||
Thread.currentThread().getName().startsWith(CHECKPOINT_RUNNER_THREAD_PREFIX);
}
/**
* @return {@code true} if all PageMemory instances are safe to update.
*/
private boolean safeToUpdatePageMemories() {
Collection<DataRegion> memPlcs = context().database().dataRegions();
if (memPlcs == null)
return true;
for (DataRegion memPlc : memPlcs) {
if (!memPlc.config().isPersistenceEnabled())
continue;
PageMemoryEx pageMemEx = (PageMemoryEx)memPlc.pageMemory();
if (!pageMemEx.safeToUpdate())
return false;
}
return true;
}
/**
* Releases the checkpoint read lock.
*/
@Override public void checkpointReadUnlock() {
if (checkpointLock.writeLock().isHeldByCurrentThread())
return;
checkpointLock.readLock().unlock();
if (checkpointer != null) {
Collection<DataRegion> dataRegs = context().database().dataRegions();
if (dataRegs != null) {
for (DataRegion dataReg : dataRegs) {
if (!dataReg.config().isPersistenceEnabled())
continue;
PageMemoryEx mem = (PageMemoryEx)dataReg.pageMemory();
if (mem != null && !mem.safeToUpdate()) {
checkpointer.wakeupForCheckpoint(0, "too many dirty pages");
break;
}
}
}
}
if (ASSERTION_ENABLED)
CHECKPOINT_LOCK_HOLD_COUNT.set(CHECKPOINT_LOCK_HOLD_COUNT.get() - 1);
}
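/*
* Canonical usage of the read lock pair above; any code path that mutates persistent page memory
* is expected to follow this shape (a sketch, where "db" stands for this manager):
*
*   db.checkpointReadLock();
*
*   try {
*       // Update pages here: a checkpoint cannot begin while the lock is held,
*       // so it never captures a half-applied change.
*   }
*   finally {
*       db.checkpointReadUnlock();
*   }
*/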
/**
* Restores from last checkpoint and applies WAL changes since this checkpoint.
*
* @throws IgniteCheckedException If failed to restore database status from WAL.
*/
private void restoreState() throws IgniteCheckedException {
try {
CheckpointStatus status = readCheckpointStatus();
checkpointReadLock();
try {
applyLastUpdates(status, false);
}
finally {
checkpointReadUnlock();
}
snapshotMgr.restoreState();
}
catch (StorageException e) {
throw new IgniteCheckedException(e);
}
}
/**
* Called when all partitions have been fully restored and pre-created on node start.
*
* Starts the checkpointing process and initiates the first checkpoint.
*
* @throws IgniteCheckedException If first checkpoint has failed.
*/
@Override public void onStateRestored() throws IgniteCheckedException {
long time = System.currentTimeMillis();
new IgniteThread(cctx.igniteInstanceName(), "db-checkpoint-thread", checkpointer).start();
CheckpointProgressSnapshot chp = checkpointer.wakeupForCheckpoint(0, "node started");
if (chp != null)
chp.cpBeginFut.get();
if (log.isInfoEnabled())
log.info("Checkpointer initilialzation performed in " + (System.currentTimeMillis() - time) + " ms.");
}
/** {@inheritDoc} */
@Override public synchronized Map<Integer, Map<Integer, Long>> reserveHistoryForExchange() {
assert reservedForExchange == null : reservedForExchange;
reservedForExchange = new HashMap<>();
Map</*grpId*/Integer, Set</*partId*/Integer>> applicableGroupsAndPartitions = partitionsApplicableForWalRebalance();
Map</*grpId*/Integer, Map</*partId*/Integer, CheckpointEntry>> earliestValidCheckpoints;
checkpointReadLock();
try {
earliestValidCheckpoints = cpHistory.searchAndReserveCheckpoints(applicableGroupsAndPartitions);
}
finally {
checkpointReadUnlock();
}
Map</*grpId*/Integer, Map</*partId*/Integer, /*updCntr*/Long>> grpPartsWithCnts = new HashMap<>();
for (Map.Entry<Integer, Map<Integer, CheckpointEntry>> e : earliestValidCheckpoints.entrySet()) {
int grpId = e.getKey();
for (Map.Entry<Integer, CheckpointEntry> e0 : e.getValue().entrySet()) {
CheckpointEntry cpEntry = e0.getValue();
int partId = e0.getKey();
assert cctx.wal().reserved(cpEntry.checkpointMark())
: "WAL segment for checkpoint " + cpEntry + " has not reserved";
Long updCntr = cpEntry.partitionCounter(cctx, grpId, partId);
if (updCntr != null) {
reservedForExchange.computeIfAbsent(grpId, k -> new HashMap<>())
.put(partId, new T2<>(updCntr, cpEntry.checkpointMark()));
grpPartsWithCnts.computeIfAbsent(grpId, k -> new HashMap<>()).put(partId, updCntr);
}
}
}
return grpPartsWithCnts;
}
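/*
* Shape of the map returned above (group/partition IDs and counters are hypothetical):
*
*   { grpId 100 -> { part 0 -> updCntr 1024, part 3 -> updCntr 2048 },
*     grpId 200 -> { part 1 -> updCntr 512 } }
*
* Each entry means: starting from the reserved checkpoint, WAL history covers this partition
* from the given update counter onwards, so it can be rebalanced from WAL deltas instead of
* being copied in full.
*/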
/**
* @return Map of group id -> Set of partitions which can be used as suppliers for WAL rebalance.
*/
private Map<Integer, Set<Integer>> partitionsApplicableForWalRebalance() {
Map<Integer, Set<Integer>> res = new HashMap<>();
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
if (grp.isLocal())
continue;
for (GridDhtLocalPartition locPart : grp.topology().currentLocalPartitions()) {
if (locPart.state() == GridDhtPartitionState.OWNING && locPart.fullSize() > walRebalanceThreshold)
res.computeIfAbsent(grp.groupId(), k -> new HashSet<>()).add(locPart.id());
}
}
return res;
}
/** {@inheritDoc} */
@Override public synchronized void releaseHistoryForExchange() {
if (reservedForExchange == null)
return;
FileWALPointer earliestPtr = null;
for (Map.Entry<Integer, Map<Integer, T2<Long, WALPointer>>> e : reservedForExchange.entrySet()) {
for (Map.Entry<Integer, T2<Long, WALPointer>> e0 : e.getValue().entrySet()) {
FileWALPointer ptr = (FileWALPointer) e0.getValue().get2();
if (earliestPtr == null || ptr.index() < earliestPtr.index())
earliestPtr = ptr;
}
}
reservedForExchange = null;
if (earliestPtr == null)
return;
assert cctx.wal().reserved(earliestPtr)
: "Earliest checkpoint WAL pointer is not reserved for exchange: " + earliestPtr;
try {
cctx.wal().release(earliestPtr);
}
catch (IgniteCheckedException e) {
log.error("Failed to release earliest checkpoint WAL pointer: " + earliestPtr, e);
}
}
/** {@inheritDoc} */
@Override public boolean reserveHistoryForPreloading(int grpId, int partId, long cntr) {
CheckpointEntry cpEntry = cpHistory.searchCheckpointEntry(grpId, partId, cntr);
if (cpEntry == null)
return false;
WALPointer ptr = cpEntry.checkpointMark();
if (ptr == null)
return false;
boolean reserved;
try {
reserved = cctx.wal().reserve(ptr);
}
catch (IgniteCheckedException e) {
U.error(log, "Error while trying to reserve history", e);
reserved = false;
}
if (reserved)
reservedForPreloading.put(new T2<>(grpId, partId), new T2<>(cntr, ptr));
return reserved;
}
/** {@inheritDoc} */
@Override public void releaseHistoryForPreloading() {
for (Map.Entry<T2<Integer, Integer>, T2<Long, WALPointer>> e : reservedForPreloading.entrySet()) {
try {
cctx.wal().release(e.getValue().get2());
}
catch (IgniteCheckedException ex) {
U.error(log, "Could not release WAL reservation", ex);
throw new IgniteException(ex);
}
}
reservedForPreloading.clear();
}
/** {@inheritDoc} */
@Nullable @Override public IgniteInternalFuture wakeupForCheckpoint(String reason) {
Checkpointer cp = checkpointer;
if (cp != null)
return cp.wakeupForCheckpoint(0, reason).cpBeginFut;
return null;
}
/** {@inheritDoc} */
@Override public void waitForCheckpoint(String reason) throws IgniteCheckedException {
Checkpointer cp = checkpointer;
if (cp == null)
return;
CheckpointProgressSnapshot progSnapshot = cp.wakeupForCheckpoint(0, reason);
IgniteInternalFuture fut1 = progSnapshot.cpFinishFut;
fut1.get();
if (!progSnapshot.started)
return;
IgniteInternalFuture fut2 = cp.wakeupForCheckpoint(0, reason).cpFinishFut;
assert fut1 != fut2;
fut2.get();
}
/** {@inheritDoc} */
@Override public CheckpointFuture forceCheckpoint(String reason) {
Checkpointer cp = checkpointer;
if (cp == null)
return null;
return cp.wakeupForCheckpoint(0, reason);
}
/** {@inheritDoc} */
@Override public WALPointer lastCheckpointMarkWalPointer() {
CheckpointEntry lastCheckpointEntry = cpHistory == null ? null : cpHistory.lastCheckpoint();
return lastCheckpointEntry == null ? null : lastCheckpointEntry.checkpointMark();
}
/**
* @return Checkpoint directory.
*/
public File checkpointDirectory() {
return cpDir;
}
/**
* @param lsnr Listener.
*/
public void addCheckpointListener(DbCheckpointListener lsnr) {
lsnrs.add(lsnr);
}
/**
* @param lsnr Listener.
*/
public void removeCheckpointListener(DbCheckpointListener lsnr) {
lsnrs.remove(lsnr);
}
/**
* @return Checkpoint status read from the latest checkpoint marker files.
* @throws IgniteCheckedException If failed to read the checkpoint status.
*/
@SuppressWarnings("TooBroadScope")
private CheckpointStatus readCheckpointStatus() throws IgniteCheckedException {
long lastStartTs = 0;
long lastEndTs = 0;
UUID startId = CheckpointStatus.NULL_UUID;
UUID endId = CheckpointStatus.NULL_UUID;
File startFile = null;
File endFile = null;
WALPointer startPtr = CheckpointStatus.NULL_PTR;
WALPointer endPtr = CheckpointStatus.NULL_PTR;
File dir = cpDir;
if (!dir.exists()) {
log.warning("Read checkpoint status: checkpoint directory is not found.");
return new CheckpointStatus(0, startId, startPtr, endId, endPtr);
}
File[] files = dir.listFiles();
for (File file : files) {
Matcher matcher = CP_FILE_NAME_PATTERN.matcher(file.getName());
if (matcher.matches()) {
long ts = Long.parseLong(matcher.group(1));
UUID id = UUID.fromString(matcher.group(2));
CheckpointEntryType type = CheckpointEntryType.valueOf(matcher.group(3));
if (type == CheckpointEntryType.START && ts > lastStartTs) {
lastStartTs = ts;
startId = id;
startFile = file;
}
else if (type == CheckpointEntryType.END && ts > lastEndTs) {
lastEndTs = ts;
endId = id;
endFile = file;
}
}
}
ByteBuffer buf = ByteBuffer.allocate(FileWALPointer.POINTER_SIZE);
buf.order(ByteOrder.nativeOrder());
if (startFile != null)
startPtr = readPointer(startFile, buf);
if (endFile != null)
endPtr = readPointer(endFile, buf);
if (log.isInfoEnabled())
log.info("Read checkpoint status [startMarker=" + startFile + ", endMarker=" + endFile + ']');
return new CheckpointStatus(lastStartTs, startId, startPtr, endId, endPtr);
}
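/*
 * Marker semantics (see CheckpointStatus.needRestoreMemory()): if the newest
 * START marker's ID differs from the newest END marker's ID, the node stopped in
 * the middle of a checkpoint, and page memory must be restored by replaying
 * physical WAL records from the last completed checkpoint.
 */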
/**
 * Loads WAL pointer from a checkpoint marker file.
 *
 * @param cpMarkerFile Checkpoint marker file.
 * @param buf Byte buffer to read the pointer into.
 * @return WAL pointer.
 * @throws IgniteCheckedException If failed to read the marker file.
 */
private WALPointer readPointer(File cpMarkerFile, ByteBuffer buf) throws IgniteCheckedException {
buf.position(0);
try (FileIO io = ioFactory.create(cpMarkerFile, READ)) {
io.readFully(buf);
buf.flip();
return new FileWALPointer(buf.getLong(), buf.getInt(), buf.getInt());
}
catch (IOException e) {
throw new IgniteCheckedException(
"Failed to read checkpoint pointer from marker file: " + cpMarkerFile.getAbsolutePath(), e);
}
}
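/*
 * Marker file layout (native byte order), written by prepareCheckpointEntry()
 * and read back above:
 *
 *   +---------------+------------------+---------------+
 *   | idx (8 bytes) | offset (4 bytes) | len (4 bytes) |
 *   +---------------+------------------+---------------+
 */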
/**
 * @param status Checkpoint status.
 * @return Pointer to the WAL record following the last read one, or {@code null}.
 * @throws IgniteCheckedException If failed.
 * @throws StorageException If an I/O error occurred during operations with storage.
 */
@Nullable private WALPointer restoreMemory(CheckpointStatus status) throws IgniteCheckedException {
return restoreMemory(status, false, (PageMemoryEx)metaStorage.pageMemory());
}
/**
 * @param status Checkpoint status.
 * @param metastoreOnly If {@code true}, restores the metastorage only.
 * @param storePageMem Metastore page memory.
 * @return Pointer to the WAL record following the last read one, or {@code null}.
 * @throws IgniteCheckedException If failed.
 * @throws StorageException If an I/O error occurred during operations with storage.
 */
@Nullable private WALPointer restoreMemory(
CheckpointStatus status,
boolean metastoreOnly,
PageMemoryEx storePageMem
) throws IgniteCheckedException {
assert !metastoreOnly || storePageMem != null;
if (log.isInfoEnabled())
log.info("Checking memory state [lastValidPos=" + status.endPtr + ", lastMarked="
+ status.startPtr + ", lastCheckpointId=" + status.cpStartId + ']');
boolean apply = status.needRestoreMemory();
if (apply) {
U.quietAndWarn(log, "Ignite node stopped in the middle of checkpoint. Will restore memory state and " +
"finish checkpoint on node start.");
cctx.pageStore().beginRecover();
}
else
cctx.wal().notchLastCheckpointPtr(status.startPtr);
long start = U.currentTimeMillis();
long lastArchivedSegment = cctx.wal().lastArchivedSegment();
RestoreBinaryState restoreBinaryState = new RestoreBinaryState(status, lastArchivedSegment, log);
Collection<Integer> ignoreGrps = metastoreOnly ? Collections.emptySet() :
F.concat(false, initiallyGlobalWalDisabledGrps, initiallyLocalWalDisabledGrps);
int applied = 0;
try (WALIterator it = cctx.wal().replay(status.endPtr)) {
while (it.hasNextX()) {
WALRecord rec = restoreBinaryState.next(it);
if (rec == null)
break;
switch (rec.type()) {
case PAGE_RECORD:
if (restoreBinaryState.needApplyBinaryUpdate()) {
PageSnapshot pageRec = (PageSnapshot)rec;
// Here we do not require tag check because we may be applying memory changes after
// several repetitive restarts and the same pages may have changed several times.
int grpId = pageRec.fullPageId().groupId();
if (metastoreOnly && grpId != METASTORAGE_CACHE_ID)
continue;
if (!ignoreGrps.contains(grpId)) {
long pageId = pageRec.fullPageId().pageId();
PageMemoryEx pageMem = grpId == METASTORAGE_CACHE_ID ? storePageMem : getPageMemoryForCacheGroup(grpId);
long page = pageMem.acquirePage(grpId, pageId, true);
try {
long pageAddr = pageMem.writeLock(grpId, pageId, page);
try {
PageUtils.putBytes(pageAddr, 0, pageRec.pageData());
}
finally {
pageMem.writeUnlock(grpId, pageId, page, null, true, true);
}
}
finally {
pageMem.releasePage(grpId, pageId, page);
}
applied++;
}
}
break;
case PART_META_UPDATE_STATE:
PartitionMetaStateRecord metaStateRecord = (PartitionMetaStateRecord)rec;
{
int grpId = metaStateRecord.groupId();
if (metastoreOnly && grpId != METASTORAGE_CACHE_ID)
continue;
if (ignoreGrps.contains(grpId))
continue;
int partId = metaStateRecord.partitionId();
GridDhtPartitionState state = GridDhtPartitionState.fromOrdinal(metaStateRecord.state());
if (state == null || state == GridDhtPartitionState.EVICTED)
schedulePartitionDestroy(grpId, partId);
else
cancelOrWaitPartitionDestroy(grpId, partId);
}
break;
case PARTITION_DESTROY:
PartitionDestroyRecord destroyRecord = (PartitionDestroyRecord)rec;
{
int grpId = destroyRecord.groupId();
if (metastoreOnly && grpId != METASTORAGE_CACHE_ID)
continue;
if (ignoreGrps.contains(grpId))
continue;
PageMemoryEx pageMem = grpId == METASTORAGE_CACHE_ID ? storePageMem : getPageMemoryForCacheGroup(grpId);
pageMem.invalidate(grpId, destroyRecord.partitionId());
schedulePartitionDestroy(grpId, destroyRecord.partitionId());
}
break;
default:
if (restoreBinaryState.needApplyBinaryUpdate() && rec instanceof PageDeltaRecord) {
PageDeltaRecord r = (PageDeltaRecord)rec;
int grpId = r.groupId();
if (metastoreOnly && grpId != METASTORAGE_CACHE_ID)
continue;
if (!ignoreGrps.contains(grpId)) {
long pageId = r.pageId();
PageMemoryEx pageMem = grpId == METASTORAGE_CACHE_ID ? storePageMem : getPageMemoryForCacheGroup(grpId);
// Here we do not require tag check because we may be applying memory changes after
// several repetitive restarts and the same pages may have changed several times.
long page = pageMem.acquirePage(grpId, pageId, true);
try {
long pageAddr = pageMem.writeLock(grpId, pageId, page);
try {
r.applyDelta(pageMem, pageAddr);
}
finally {
pageMem.writeUnlock(grpId, pageId, page, null, true, true);
}
}
finally {
pageMem.releasePage(grpId, pageId, page);
}
applied++;
}
}
}
}
}
if (metastoreOnly)
return null;
WALPointer lastReadPtr = restoreBinaryState.lastReadRecordPointer();
if (status.needRestoreMemory()) {
if (restoreBinaryState.needApplyBinaryUpdate())
throw new StorageException("Failed to restore memory state (checkpoint marker is present " +
"on disk, but checkpoint record is missed in WAL) " +
"[cpStatus=" + status + ", lastRead=" + lastReadPtr + "]");
log.info("Finished applying memory changes [changesApplied=" + applied +
", time=" + (U.currentTimeMillis() - start) + "ms]");
if (applied > 0)
finalizeCheckpointOnRecovery(status.cpStartTs, status.cpStartId, status.startPtr);
}
cpHistory.initialize(retreiveHistory());
return lastReadPtr == null ? null : lastReadPtr.next();
}
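/*
 * Recovery is two-phased: restoreMemory() above replays physical records (page
 * snapshots and page deltas) starting from the last completed checkpoint to
 * rebuild page memory, and applyLastUpdates() below replays logical records
 * (data entries, partition states, metastore updates) on top of it.
 */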
/**
* Obtains PageMemory reference from cache descriptor instead of cache context.
*
* @param grpId Cache group id.
* @return PageMemoryEx instance.
* @throws IgniteCheckedException if no DataRegion is configured for a name obtained from cache descriptor.
*/
private PageMemoryEx getPageMemoryForCacheGroup(int grpId) throws IgniteCheckedException {
// TODO IGNITE-7792 add generic mapping.
if (grpId == TxLog.TX_LOG_CACHE_ID)
return (PageMemoryEx)dataRegion(TxLog.TX_LOG_CACHE_NAME).pageMemory();
// TODO IGNITE-5075: cache descriptor can be removed.
GridCacheSharedContext sharedCtx = context();
CacheGroupDescriptor desc = sharedCtx.cache().cacheGroupDescriptors().get(grpId);
if (desc == null)
throw new IgniteCheckedException("Failed to find cache group descriptor [grpId=" + grpId + ']');
String memPlcName = desc.config().getDataRegionName();
return (PageMemoryEx)sharedCtx.database().dataRegion(memPlcName).pageMemory();
}
/**
 * Applies updates from the given iterator, filtered by the specified predicates.
 *
 * @param it WAL iterator, possibly {@code null}.
 * @param recPredicate WAL record filter.
 * @param entryPredicate Data entry filter.
 * @param partStates Partition states to restore.
 * @throws IgniteCheckedException If failed to apply updates.
 */
public void applyUpdatesOnRecovery(
@Nullable WALIterator it,
IgnitePredicate<IgniteBiTuple<WALPointer, WALRecord>> recPredicate,
IgnitePredicate<DataEntry> entryPredicate,
Map<T2<Integer, Integer>, T2<Integer, Long>> partStates
) throws IgniteCheckedException {
cctx.walState().runWithOutWAL(() -> {
if (it != null) {
while (it.hasNext()) {
IgniteBiTuple<WALPointer, WALRecord> next = it.next();
WALRecord rec = next.get2();
if (!recPredicate.apply(next))
break;
switch (rec.type()) {
case MVCC_DATA_RECORD:
case DATA_RECORD:
checkpointReadLock();
try {
DataRecord dataRec = (DataRecord)rec;
for (DataEntry dataEntry : dataRec.writeEntries()) {
if (entryPredicate.apply(dataEntry)) {
checkpointReadLock();
try {
int cacheId = dataEntry.cacheId();
GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
if (cacheCtx != null)
applyUpdate(cacheCtx, dataEntry);
else if (log != null)
log.warning("Cache (cacheId=" + cacheId + ") is not started, can't apply updates.");
}
finally {
checkpointReadUnlock();
}
}
}
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
finally {
checkpointReadUnlock();
}
break;
case MVCC_TX_RECORD:
checkpointReadLock();
try {
MvccTxRecord txRecord = (MvccTxRecord)rec;
byte txState = convertToTxState(txRecord.state());
cctx.coordinators().updateState(txRecord.mvccVersion(), txState, false);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
finally {
checkpointReadUnlock();
}
break;
default:
// Skip other records.
}
}
}
checkpointReadLock();
try {
restorePartitionStates(partStates, null);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
finally {
checkpointReadUnlock();
}
});
}
/**
 * @param status Last registered checkpoint status.
 * @param metastoreOnly If {@code true}, applies metastorage records only.
 * @throws IgniteCheckedException If failed to apply updates.
 * @throws StorageException If an I/O exception occurred while reading the write-ahead log.
 */
private void applyLastUpdates(CheckpointStatus status, boolean metastoreOnly) throws IgniteCheckedException {
if (log.isInfoEnabled())
log.info("Applying lost cache updates since last checkpoint record [lastMarked="
+ status.startPtr + ", lastCheckpointId=" + status.cpStartId + ']');
if (!metastoreOnly)
cctx.kernalContext().query().skipFieldLookup(true);
long lastArchivedSegment = cctx.wal().lastArchivedSegment();
RestoreLogicalState restoreLogicalState = new RestoreLogicalState(lastArchivedSegment, log);
long start = U.currentTimeMillis();
int applied = 0;
Collection<Integer> ignoreGrps = metastoreOnly ? Collections.emptySet() :
F.concat(false, initiallyGlobalWalDisabledGrps, initiallyLocalWalDisabledGrps);
try (WALIterator it = cctx.wal().replay(status.startPtr)) {
Map<T2<Integer, Integer>, T2<Integer, Long>> partStates = new HashMap<>();
while (it.hasNextX()) {
WALRecord rec = restoreLogicalState.next(it);
if (rec == null)
break;
switch (rec.type()) {
case MVCC_DATA_RECORD:
case DATA_RECORD:
if (metastoreOnly)
continue;
DataRecord dataRec = (DataRecord)rec;
for (DataEntry dataEntry : dataRec.writeEntries()) {
int cacheId = dataEntry.cacheId();
int grpId = cctx.cache().cacheDescriptor(cacheId).groupId();
if (!ignoreGrps.contains(grpId)) {
GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
applyUpdate(cacheCtx, dataEntry);
applied++;
}
}
break;
case PART_META_UPDATE_STATE:
if (metastoreOnly)
continue;
PartitionMetaStateRecord metaStateRecord = (PartitionMetaStateRecord)rec;
if (!ignoreGrps.contains(metaStateRecord.groupId())) {
partStates.put(new T2<>(metaStateRecord.groupId(), metaStateRecord.partitionId()),
new T2<>((int)metaStateRecord.state(), metaStateRecord.updateCounter()));
}
break;
case METASTORE_DATA_RECORD:
MetastoreDataRecord metastoreDataRecord = (MetastoreDataRecord)rec;
metaStorage.applyUpdate(metastoreDataRecord.key(), metastoreDataRecord.value());
break;
case META_PAGE_UPDATE_NEXT_SNAPSHOT_ID:
case META_PAGE_UPDATE_LAST_SUCCESSFUL_SNAPSHOT_ID:
case META_PAGE_UPDATE_LAST_SUCCESSFUL_FULL_SNAPSHOT_ID:
if (metastoreOnly)
continue;
PageDeltaRecord rec0 = (PageDeltaRecord)rec;
PageMemoryEx pageMem = getPageMemoryForCacheGroup(rec0.groupId());
long page = pageMem.acquirePage(rec0.groupId(), rec0.pageId(), true);
try {
long addr = pageMem.writeLock(rec0.groupId(), rec0.pageId(), page, true);
try {
rec0.applyDelta(pageMem, addr);
}
finally {
pageMem.writeUnlock(rec0.groupId(), rec0.pageId(), page, null, true, true);
}
}
finally {
pageMem.releasePage(rec0.groupId(), rec0.pageId(), page);
}
break;
case MVCC_TX_RECORD:
if (metastoreOnly)
continue;
MvccTxRecord txRecord = (MvccTxRecord)rec;
byte txState = convertToTxState(txRecord.state());
cctx.coordinators().updateState(txRecord.mvccVersion(), txState, false);
break;
default:
// Skip other records.
}
}
if (!metastoreOnly) {
long startRestorePart = U.currentTimeMillis();
if (log.isInfoEnabled())
log.info("Restoring partition state for local groups [cntPartStateWal="
+ partStates.size() + ", lastCheckpointId=" + status.cpStartId + ']');
long proc = restorePartitionStates(partStates, null);
if (log.isInfoEnabled())
log.info("Finished restoring partition state for local groups [cntProcessed=" + proc +
", cntPartStateWal=" + partStates.size() +
", time=" + (U.currentTimeMillis() - startRestorePart) + "ms]");
}
}
finally {
if (!metastoreOnly)
cctx.kernalContext().query().skipFieldLookup(false);
}
if (log.isInfoEnabled())
log.info("Finished applying WAL changes [updatesApplied=" + applied +
", time=" + (U.currentTimeMillis() - start) + "ms]");
}
/**
 * Initializes non-empty partitions and restores their state from page memory or WAL.
 * Partition states present in page memory may be overridden by states restored from WAL ({@code partStates}).
 *
 * @param partStates Partition states restored from WAL.
 * @param onlyForGroups If not {@code null}, restores states only for the specified cache groups.
 * @return Count of partitions processed.
 * @throws IgniteCheckedException If failed to restore partition states.
 */
private long restorePartitionStates(
Map<T2<Integer, Integer>, T2<Integer, Long>> partStates,
@Nullable Set<Integer> onlyForGroups
) throws IgniteCheckedException {
long cntParts = 0;
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
if (grp.isLocal() || !grp.affinityNode()) {
// Local caches have no partitions and no partition state to restore.
continue;
}
if (!grp.dataRegion().config().isPersistenceEnabled())
continue;
if (onlyForGroups != null && !onlyForGroups.contains(grp.groupId()))
continue;
int grpId = grp.groupId();
PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
for (int i = 0; i < grp.affinity().partitions(); i++) {
T2<Integer, Long> restore = partStates.get(new T2<>(grpId, i));
if (storeMgr.exists(grpId, i)) {
storeMgr.ensure(grpId, i);
if (storeMgr.pages(grpId, i) <= 1)
continue;
if (log.isDebugEnabled())
log.debug("Creating partition on recovery (exists in page store) " +
"[grp=" + grp.cacheOrGroupName() + ", p=" + i + "]");
GridDhtLocalPartition part = grp.topology().forceCreatePartition(i);
assert part != null;
// TODO: https://issues.apache.org/jira/browse/IGNITE-6097
grp.offheap().onPartitionInitialCounterUpdated(i, 0);
checkpointReadLock();
try {
long partMetaId = pageMem.partitionMetaPageId(grpId, i);
long partMetaPage = pageMem.acquirePage(grpId, partMetaId);
try {
long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage);
boolean changed = false;
try {
PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.forPage(pageAddr);
if (restore != null) {
int stateId = restore.get1();
io.setPartitionState(pageAddr, (byte)stateId);
changed = updateState(part, stateId);
if (stateId == GridDhtPartitionState.OWNING.ordinal()
|| (stateId == GridDhtPartitionState.MOVING.ordinal()
&& part.initialUpdateCounter() < restore.get2())) {
part.initialUpdateCounter(restore.get2());
changed = true;
}
if (log.isDebugEnabled())
log.debug("Restored partition state (from WAL) " +
"[grp=" + grp.cacheOrGroupName() + ", p=" + i + ", state=" + part.state() +
"updCntr=" + part.initialUpdateCounter() + "]");
}
else {
changed = updateState(part, (int)io.getPartitionState(pageAddr));
if (log.isDebugEnabled())
log.debug("Restored partition state (from page memory) " +
"[grp=" + grp.cacheOrGroupName() + ", p=" + i + ", state=" + part.state() +
"updCntr=" + part.initialUpdateCounter() + "]");
}
}
finally {
pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, changed);
}
}
finally {
pageMem.releasePage(grpId, partMetaId, partMetaPage);
}
}
finally {
checkpointReadUnlock();
}
}
else if (restore != null) {
if (log.isDebugEnabled())
log.debug("Creating partition on recovery (exists in WAL) " +
"[grp=" + grp.cacheOrGroupName() + ", p=" + i + "]");
GridDhtLocalPartition part = grp.topology().forceCreatePartition(i);
assert part != null;
// TODO: https://issues.apache.org/jira/browse/IGNITE-6097
grp.offheap().onPartitionInitialCounterUpdated(i, 0);
updateState(part, restore.get1());
if (log.isDebugEnabled())
log.debug("Restored partition state (from WAL) " +
"[grp=" + grp.cacheOrGroupName() + ", p=" + i + ", state=" + part.state() +
"updCntr=" + part.initialUpdateCounter() + "]");
}
cntParts++;
}
// After partition states are restored, it is necessary to update internal data structures in topology.
grp.topology().afterStateRestored(grp.topology().lastTopologyChangeVersion());
}
return cntParts;
}
/**
* Converts {@link TransactionState} to MVCC {@link TxState}.
*
* @param state TransactionState.
* @return TxState.
*/
private byte convertToTxState(TransactionState state) {
switch (state) {
case PREPARED:
return TxState.PREPARED;
case COMMITTED:
return TxState.COMMITTED;
case ROLLED_BACK:
return TxState.ABORTED;
default:
throw new IllegalStateException("Unexpected transaction state: " + state);
}
}
/**
 * WAL truncation callback.
 *
 * @param highBound WAL pointer.
 * @throws IgniteCheckedException If failed to remove checkpoint files.
 */
public void onWalTruncated(WALPointer highBound) throws IgniteCheckedException {
List<CheckpointEntry> removedFromHistory = cpHistory.onWalTruncated(highBound);
for (CheckpointEntry cp : removedFromHistory)
removeCheckpointFiles(cp);
}
/**
* @param part Partition to restore state for.
* @param stateId State enum ordinal.
* @return Updated flag.
*/
private boolean updateState(GridDhtLocalPartition part, int stateId) {
if (stateId != -1) {
GridDhtPartitionState state = GridDhtPartitionState.fromOrdinal(stateId);
assert state != null;
part.restoreState(state == GridDhtPartitionState.EVICTED ? GridDhtPartitionState.RENTING : state);
return true;
}
return false;
}
/**
* @param cacheCtx Cache context to apply an update.
* @param dataEntry Data entry to apply.
* @throws IgniteCheckedException If failed to restore.
*/
private void applyUpdate(GridCacheContext cacheCtx, DataEntry dataEntry) throws IgniteCheckedException {
int partId = dataEntry.partitionId();
if (partId == -1)
partId = cacheCtx.affinity().partition(dataEntry.key());
GridDhtLocalPartition locPart = cacheCtx.isLocal() ? null : cacheCtx.topology().forceCreatePartition(partId);
switch (dataEntry.op()) {
case CREATE:
case UPDATE:
if (dataEntry instanceof MvccDataEntry) {
cacheCtx.offheap().mvccApplyUpdate(
cacheCtx,
dataEntry.key(),
dataEntry.value(),
dataEntry.writeVersion(),
0L,
locPart,
((MvccDataEntry)dataEntry).mvccVer());
}
else {
cacheCtx.offheap().update(
cacheCtx,
dataEntry.key(),
dataEntry.value(),
dataEntry.writeVersion(),
0L,
locPart,
null);
}
if (dataEntry.partitionCounter() != 0)
cacheCtx.offheap().onPartitionInitialCounterUpdated(partId, dataEntry.partitionCounter());
break;
case DELETE:
if (dataEntry instanceof MvccDataEntry) {
cacheCtx.offheap().mvccApplyUpdate(
cacheCtx,
dataEntry.key(),
null,
dataEntry.writeVersion(),
0L,
locPart,
((MvccDataEntry)dataEntry).mvccVer());
}
else
cacheCtx.offheap().remove(cacheCtx, dataEntry.key(), partId, locPart);
if (dataEntry.partitionCounter() != 0)
cacheCtx.offheap().onPartitionInitialCounterUpdated(partId, dataEntry.partitionCounter());
break;
case READ:
// do nothing
break;
default:
throw new IgniteCheckedException("Invalid operation for WAL entry update: " + dataEntry.op());
}
}
/**
 * @param cpTs Checkpoint timestamp.
 * @param cpId Checkpoint ID.
 * @param walPtr Checkpoint mark WAL pointer.
 * @throws IgniteCheckedException If failed.
 */
private void finalizeCheckpointOnRecovery(long cpTs, UUID cpId, WALPointer walPtr) throws IgniteCheckedException {
assert cpTs != 0;
ByteBuffer tmpWriteBuf = ByteBuffer.allocateDirect(pageSize());
long start = System.currentTimeMillis();
Collection<DataRegion> memPolicies = context().database().dataRegions();
List<IgniteBiTuple<PageMemory, Collection<FullPageId>>> cpEntities = new ArrayList<>(memPolicies.size());
for (DataRegion memPlc : memPolicies) {
if (memPlc.config().isPersistenceEnabled()) {
PageMemoryEx pageMem = (PageMemoryEx)memPlc.pageMemory();
cpEntities.add(new IgniteBiTuple<PageMemory, Collection<FullPageId>>(
pageMem, pageMem.beginCheckpoint()));
}
}
tmpWriteBuf.order(ByteOrder.nativeOrder());
// Identity stores set.
Collection<PageStore> updStores = new HashSet<>();
int cpPagesCnt = 0;
PageStoreWriter pageStoreWriter = (fullPageId, buf, tag) -> {
assert tag != PageMemoryImpl.TRY_AGAIN_TAG : "Lock is held by other thread for page " + fullPageId;
int groupId = fullPageId.groupId();
long pageId = fullPageId.pageId();
// Write buf to page store.
PageStore store = storeMgr.writeInternal(groupId, pageId, buf, tag, true);
// Save store for future fsync.
updStores.add(store);
};
for (IgniteBiTuple<PageMemory, Collection<FullPageId>> e : cpEntities) {
PageMemoryEx pageMem = (PageMemoryEx)e.get1();
Collection<FullPageId> cpPages = e.get2();
cpPagesCnt += cpPages.size();
for (FullPageId fullId : cpPages) {
tmpWriteBuf.rewind();
// Write page content to page store via pageStoreWriter.
// Tracker is null, because no need to track checkpoint metrics on recovery.
pageMem.checkpointWritePage(fullId, tmpWriteBuf, pageStoreWriter, null);
}
}
long written = U.currentTimeMillis();
for (PageStore updStore : updStores)
updStore.sync();
long fsync = U.currentTimeMillis();
for (IgniteBiTuple<PageMemory, Collection<FullPageId>> e : cpEntities)
((PageMemoryEx)e.get1()).finishCheckpoint();
CheckpointEntry cp = prepareCheckpointEntry(
tmpWriteBuf,
cpTs,
cpId,
walPtr,
null,
CheckpointEntryType.END);
writeCheckpointEntry(tmpWriteBuf, cp, CheckpointEntryType.END);
cctx.pageStore().finishRecover();
if (log.isInfoEnabled())
log.info(String.format("Checkpoint finished [cpId=%s, pages=%d, markPos=%s, " +
"pagesWrite=%dms, fsync=%dms, total=%dms]",
cpId,
cpPagesCnt,
walPtr,
written - start,
fsync - written,
fsync - start));
}
/**
 * Prepares checkpoint entry containing WAL pointer to checkpoint record.
 * Writes the WAL pointer content into the given {@code entryBuf}.
 *
 * @param entryBuf Buffer to fill.
 * @param cpTs Checkpoint timestamp.
 * @param cpId Checkpoint id.
 * @param ptr WAL pointer containing record.
 * @param rec Checkpoint WAL record.
 * @param type Checkpoint type.
 * @return Checkpoint entry.
 */
private CheckpointEntry prepareCheckpointEntry(
ByteBuffer entryBuf,
long cpTs,
UUID cpId,
WALPointer ptr,
@Nullable CheckpointRecord rec,
CheckpointEntryType type
) {
assert ptr instanceof FileWALPointer;
FileWALPointer filePtr = (FileWALPointer)ptr;
entryBuf.rewind();
entryBuf.putLong(filePtr.index());
entryBuf.putInt(filePtr.fileOffset());
entryBuf.putInt(filePtr.length());
entryBuf.flip();
return createCheckPointEntry(cpTs, ptr, cpId, rec, type);
}
/**
* Writes checkpoint entry buffer {@code entryBuf} to specified checkpoint file with 2-phase protocol.
*
* @param entryBuf Checkpoint entry buffer to write.
* @param cp Checkpoint entry.
* @param type Checkpoint entry type.
* @throws StorageException If failed to write checkpoint entry.
*/
public void writeCheckpointEntry(ByteBuffer entryBuf, CheckpointEntry cp, CheckpointEntryType type) throws StorageException {
String fileName = checkpointFileName(cp, type);
String tmpFileName = fileName + FilePageStoreManager.TMP_SUFFIX;
try {
try (FileIO io = ioFactory.create(Paths.get(cpDir.getAbsolutePath(), skipSync ? fileName : tmpFileName).toFile(),
StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
io.writeFully(entryBuf);
entryBuf.clear();
if (!skipSync)
io.force(true);
}
if (!skipSync)
Files.move(Paths.get(cpDir.getAbsolutePath(), tmpFileName), Paths.get(cpDir.getAbsolutePath(), fileName));
}
catch (IOException e) {
throw new StorageException("Failed to write checkpoint entry [ptr=" + cp.checkpointMark()
+ ", cpTs=" + cp.timestamp()
+ ", cpId=" + cp.checkpointId()
+ ", type=" + type + "]", e);
}
}
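/*
 * The 2-phase protocol above first writes and fsyncs a "*.tmp" file, then renames
 * it to the final marker name, so a crash can leave at most a stale temporary
 * file, never a truncated checkpoint marker. A minimal sketch of the same idiom
 * (the file name is hypothetical):
 *
 *   try (FileIO io = ioFactory.create(new File(cpDir, "marker.bin" + FilePageStoreManager.TMP_SUFFIX),
 *       StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE)) {
 *       io.writeFully(entryBuf);
 *       io.force(true);
 *   }
 *   Files.move(cpDir.toPath().resolve("marker.bin" + FilePageStoreManager.TMP_SUFFIX),
 *       cpDir.toPath().resolve("marker.bin"));
 */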
/** {@inheritDoc} */
@Override public AtomicInteger writtenPagesCounter() {
return writtenPagesCntr;
}
/** {@inheritDoc} */
@Override public AtomicInteger syncedPagesCounter() {
return syncedPagesCntr;
}
/** {@inheritDoc} */
@Override public AtomicInteger evictedPagesCntr() {
return evictedPagesCntr;
}
/** {@inheritDoc} */
@Override public int currentCheckpointPagesCount() {
return currCheckpointPagesCnt;
}
/**
* @param cpTs Checkpoint timestamp.
* @param cpId Checkpoint ID.
* @param type Checkpoint type.
* @return Checkpoint file name.
*/
private static String checkpointFileName(long cpTs, UUID cpId, CheckpointEntryType type) {
return cpTs + "-" + cpId + "-" + type + ".bin";
}
/**
 * @param cp Checkpoint entry.
 * @param type Checkpoint type.
 * @return Checkpoint file name.
 */
private static String checkpointFileName(CheckpointEntry cp, CheckpointEntryType type) {
return checkpointFileName(cp.timestamp(), cp.checkpointId(), type);
}
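/*
 * Example (hypothetical values): checkpointFileName(1550000000000L, someUuid,
 * CheckpointEntryType.START) yields "1550000000000-<someUuid>-START.bin", which
 * is exactly what CP_FILE_NAME_PATTERN parses back in readCheckpointStatus().
 */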
/**
 * Replaces the thread-local buffer. The thread local must provide a direct buffer one page in length.
 *
 * @param threadBuf New thread-local buffer for the checkpoint threads.
 */
public void setThreadBuf(final ThreadLocal<ByteBuffer> threadBuf) {
this.threadBuf = threadBuf;
}
/**
* @param cpTs Checkpoint timestamp.
* @param ptr Wal pointer of checkpoint.
* @param cpId Checkpoint ID.
* @param rec Checkpoint record.
* @param type Checkpoint type.
*
* @return Checkpoint entry.
*/
public CheckpointEntry createCheckPointEntry(
long cpTs,
WALPointer ptr,
UUID cpId,
@Nullable CheckpointRecord rec,
CheckpointEntryType type
) {
assert cpTs > 0;
assert ptr != null;
assert cpId != null;
assert type != null;
Map<Integer, CacheState> cacheGrpStates = null;
// Do not hold groups state in-memory if there is no space in the checkpoint history to prevent possible OOM.
// In this case the actual group states will be read from WAL on demand.
if (rec != null && cpHistory.hasSpace())
cacheGrpStates = rec.cacheGroupStates();
return new CheckpointEntry(cpTs, ptr, cpId, cacheGrpStates);
}
/**
* @return Checkpoint history.
*/
@Nullable public CheckpointHistory checkpointHistory() {
return cpHistory;
}
/**
* Adds given partition to checkpointer destroy queue.
*
* @param grpId Group ID.
* @param partId Partition ID.
*/
public void schedulePartitionDestroy(int grpId, int partId) {
Checkpointer cp = checkpointer;
if (cp != null)
cp.schedulePartitionDestroy(cctx.cache().cacheGroup(grpId), grpId, partId);
}
/**
* Cancels or waits for partition destroy.
*
* @param grpId Group ID.
* @param partId Partition ID.
* @throws IgniteCheckedException If failed.
*/
public void cancelOrWaitPartitionDestroy(int grpId, int partId) throws IgniteCheckedException {
Checkpointer cp = checkpointer;
if (cp != null)
cp.cancelOrWaitPartitionDestroy(grpId, partId);
}
/**
* Timeout for checkpoint read lock acquisition.
*
* @return Timeout for checkpoint read lock acquisition in milliseconds.
*/
@Override public long checkpointReadLockTimeout() {
return checkpointReadLockTimeout;
}
/**
* Sets timeout for checkpoint read lock acquisition.
*
* @param val New timeout in milliseconds, non-positive value denotes infinite timeout.
*/
@Override public void checkpointReadLockTimeout(long val) {
checkpointReadLockTimeout = val;
}
/**
* Partition destroy queue.
*/
private static class PartitionDestroyQueue {
/** */
private final ConcurrentMap<T2<Integer, Integer>, PartitionDestroyRequest> pendingReqs =
new ConcurrentHashMap<>();
/**
 * @param grpCtx Group context.
 * @param grpId Group ID.
 * @param partId Partition ID to destroy.
 */
private void addDestroyRequest(@Nullable CacheGroupContext grpCtx, int grpId, int partId) {
PartitionDestroyRequest req = new PartitionDestroyRequest(grpId, partId);
PartitionDestroyRequest old = pendingReqs.putIfAbsent(new T2<>(grpId, partId), req);
assert old == null || grpCtx == null : "Must wait for old destroy request to finish before adding a new one "
+ "[grpId=" + grpId
+ ", grpName=" + grpCtx.cacheOrGroupName()
+ ", partId=" + partId + ']';
}
/**
* @param destroyId Destroy ID.
* @return Destroy request to complete if it was not concurrently cancelled.
*/
private PartitionDestroyRequest beginDestroy(T2<Integer, Integer> destroyId) {
PartitionDestroyRequest rmvd = pendingReqs.remove(destroyId);
return rmvd == null ? null : rmvd.beginDestroy() ? rmvd : null;
}
/**
* @param grpId Group ID.
* @param partId Partition ID.
* @return Destroy request to wait for if destroy has begun.
*/
private PartitionDestroyRequest cancelDestroy(int grpId, int partId) {
PartitionDestroyRequest rmvd = pendingReqs.remove(new T2<>(grpId, partId));
return rmvd == null ? null : !rmvd.cancel() ? rmvd : null;
}
}
/**
* Partition destroy request.
*/
private static class PartitionDestroyRequest {
/** */
private final int grpId;
/** */
private final int partId;
/** Destroy cancelled flag. */
private boolean cancelled;
/** Destroy future. Not null if partition destroy has begun. */
private GridFutureAdapter<Void> destroyFut;
/**
* @param grpId Group ID.
* @param partId Partition ID.
*/
private PartitionDestroyRequest(int grpId, int partId) {
this.grpId = grpId;
this.partId = partId;
}
/**
* Cancels partition destroy request.
*
* @return {@code False} if this request needs to be waited for.
*/
private synchronized boolean cancel() {
if (destroyFut != null) {
assert !cancelled;
return false;
}
cancelled = true;
return true;
}
/**
* Initiates partition destroy.
*
* @return {@code True} if destroy request should be executed, {@code false} otherwise.
*/
private synchronized boolean beginDestroy() {
if (cancelled) {
assert destroyFut == null;
return false;
}
if (destroyFut != null)
return false;
destroyFut = new GridFutureAdapter<>();
return true;
}
/**
 * Completes the destroy future.
 *
 * @param err Error, or {@code null} on success.
 */
private synchronized void onDone(Throwable err) {
assert destroyFut != null;
destroyFut.onDone(err);
}
/**
 * Waits for the destroy operation to complete.
 *
 * @throws IgniteCheckedException If destroy failed.
 */
private void waitCompleted() throws IgniteCheckedException {
GridFutureAdapter<Void> fut;
synchronized (this) {
assert destroyFut != null;
fut = destroyFut;
}
fut.get();
}
/** {@inheritDoc} */
@Override public String toString() {
return "PartitionDestroyRequest [grpId=" + grpId + ", partId=" + partId + ']';
}
}
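/*
 * PartitionDestroyRequest is a small monitor-guarded state machine: cancel()
 * succeeds only while destroyFut is still null, beginDestroy() succeeds only
 * while the request is not cancelled, and once destroy has begun the only
 * remaining option is waitCompleted().
 */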
/**
 * Checkpointer object is used for notification on checkpoint begin; the scheduling predicate is
 * {@code scheduledCp.nextCpTs - now > 0}. {@link #wakeupForCheckpoint} uses {@code notifyAll()},
 * {@link #waitCheckpointEvent} uses {@code wait()}.
 */
@SuppressWarnings("NakedNotify")
public class Checkpointer extends GridWorker {
/** Temporary write buffer. */
private final ByteBuffer tmpWriteBuf;
/** Next scheduled checkpoint progress. */
private volatile CheckpointProgress scheduledCp;
/** Current checkpoint. This field is updated only by checkpoint thread. */
@Nullable private volatile CheckpointProgress curCpProgress;
/** Shutdown now. */
private volatile boolean shutdownNow;
/** */
private long lastCpTs;
/**
* @param gridName Grid name.
* @param name Thread name.
* @param log Logger.
*/
protected Checkpointer(@Nullable String gridName, String name, IgniteLogger log) {
super(gridName, name, log, cctx.kernalContext().workersRegistry());
scheduledCp = new CheckpointProgress(U.currentTimeMillis() + checkpointFreq);
tmpWriteBuf = ByteBuffer.allocateDirect(pageSize());
tmpWriteBuf.order(ByteOrder.nativeOrder());
}
/** {@inheritDoc} */
@Override protected void body() {
Throwable err = null;
try {
while (!isCancelled()) {
waitCheckpointEvent();
GridFutureAdapter<Void> enableChangeApplied = GridCacheDatabaseSharedManager.this.enableChangeApplied;
if (enableChangeApplied != null) {
enableChangeApplied.onDone();
GridCacheDatabaseSharedManager.this.enableChangeApplied = null;
}
if (checkpointsEnabled)
doCheckpoint();
else {
synchronized (this) {
scheduledCp.nextCpTs = U.currentTimeMillis() + checkpointFreq;
}
}
}
}
catch (Throwable t) {
err = t;
scheduledCp.cpFinishFut.onDone(t);
throw t;
}
finally {
if (err == null && !(stopping && isCancelled))
err = new IllegalStateException("Thread " + name() + " is terminated unexpectedly");
if (err instanceof OutOfMemoryError)
cctx.kernalContext().failure().process(new FailureContext(CRITICAL_ERROR, err));
else if (err != null)
cctx.kernalContext().failure().process(new FailureContext(SYSTEM_WORKER_TERMINATION, err));
}
// Final run after the cancellation.
if (checkpointsEnabled && !shutdownNow) {
try {
doCheckpoint();
scheduledCp.cpFinishFut.onDone(new NodeStoppingException("Node is stopping."));
}
catch (Throwable e) {
scheduledCp.cpFinishFut.onDone(e);
}
}
}
/**
 * @param delayFromNow Delay from now in milliseconds.
 * @param reason Wakeup reason.
 * @return Checkpoint progress snapshot.
 */
private CheckpointProgressSnapshot wakeupForCheckpoint(long delayFromNow, String reason) {
CheckpointProgress sched = scheduledCp;
long next = U.currentTimeMillis() + delayFromNow;
if (sched.nextCpTs <= next)
return new CheckpointProgressSnapshot(sched);
CheckpointProgressSnapshot ret;
synchronized (this) {
sched = scheduledCp;
if (sched.nextCpTs > next) {
sched.reason = reason;
sched.nextCpTs = next;
}
ret = new CheckpointProgressSnapshot(sched);
notifyAll();
}
return ret;
}
/**
 * @param snapshotOperation Snapshot operation.
 * @return Future that completes when the triggered checkpoint begins.
 */
public IgniteInternalFuture wakeupForSnapshotCreation(SnapshotOperation snapshotOperation) {
GridFutureAdapter<Object> ret;
synchronized (this) {
scheduledCp.nextCpTs = U.currentTimeMillis();
scheduledCp.reason = "snapshot";
scheduledCp.nextSnapshot = true;
scheduledCp.snapshotOperation = snapshotOperation;
ret = scheduledCp.cpBeginFut;
notifyAll();
}
return ret;
}
/**
*
*/
private void doCheckpoint() {
Checkpoint chp = null;
try {
CheckpointMetricsTracker tracker = new CheckpointMetricsTracker();
try {
chp = markCheckpointBegin(tracker);
}
catch (IgniteCheckedException e) {
if (curCpProgress != null)
curCpProgress.cpFinishFut.onDone(e);
// In case of checkpoint initialization error node should be invalidated and stopped.
cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
throw new IgniteException(e); // Re-throw as unchecked exception to force stopping checkpoint thread.
}
updateHeartbeat();
currCheckpointPagesCnt = chp.pagesSize;
writtenPagesCntr = new AtomicInteger();
syncedPagesCntr = new AtomicInteger();
evictedPagesCntr = new AtomicInteger();
boolean success = false;
int destroyedPartitionsCnt;
try {
if (chp.hasDelta()) {
// Identity stores set.
ConcurrentLinkedHashMap<PageStore, LongAdder> updStores = new ConcurrentLinkedHashMap<>();
CountDownFuture doneWriteFut = new CountDownFuture(
asyncRunner == null ? 1 : chp.cpPages.collectionsSize());
tracker.onPagesWriteStart();
final int totalPagesToWriteCnt = chp.cpPages.size();
if (asyncRunner != null) {
for (int i = 0; i < chp.cpPages.collectionsSize(); i++) {
Runnable write = new WriteCheckpointPages(
tracker,
chp.cpPages.innerCollection(i),
updStores,
doneWriteFut,
totalPagesToWriteCnt,
new Runnable() {
@Override public void run() {
updateHeartbeat();
}
},
asyncRunner
);
try {
asyncRunner.execute(write);
}
catch (RejectedExecutionException ignore) {
// Run the task synchronously.
updateHeartbeat();
write.run();
}
}
}
else {
// Single-threaded checkpoint.
updateHeartbeat();
Runnable write = new WriteCheckpointPages(
tracker,
chp.cpPages,
updStores,
doneWriteFut,
totalPagesToWriteCnt,
new Runnable() {
@Override public void run() {
updateHeartbeat();
}
},
null);
write.run();
}
updateHeartbeat();
// Wait and check for errors.
doneWriteFut.get();
// Must re-check shutdown flag here because threads may have skipped some pages.
// If so, we should not put finish checkpoint mark.
if (shutdownNow) {
chp.progress.cpFinishFut.onDone(new NodeStoppingException("Node is stopping."));
return;
}
tracker.onFsyncStart();
if (!skipSync) {
for (Map.Entry<PageStore, LongAdder> updStoreEntry : updStores.entrySet()) {
if (shutdownNow) {
chp.progress.cpFinishFut.onDone(new NodeStoppingException("Node is stopping."));
return;
}
blockingSectionBegin();
try {
updStoreEntry.getKey().sync();
}
finally {
blockingSectionEnd();
}
syncedPagesCntr.addAndGet(updStoreEntry.getValue().intValue());
}
}
}
else {
tracker.onPagesWriteStart();
tracker.onFsyncStart();
}
snapshotMgr.afterCheckpointPageWritten();
destroyedPartitionsCnt = destroyEvictedPartitions();
// Must mark successful checkpoint only if there are no exceptions or interrupts.
success = true;
}
finally {
if (success)
markCheckpointEnd(chp);
}
tracker.onEnd();
if (chp.hasDelta() || destroyedPartitionsCnt > 0) {
if (printCheckpointStats) {
if (log.isInfoEnabled()) {
String walSegsCoveredMsg = prepareWalSegsCoveredMsg(chp.walSegsCoveredRange);
log.info(String.format("Checkpoint finished [cpId=%s, pages=%d, markPos=%s, " +
"walSegmentsCleared=%d, walSegmentsCovered=%s, markDuration=%dms, pagesWrite=%dms, fsync=%dms, " +
"total=%dms]",
chp.cpEntry != null ? chp.cpEntry.checkpointId() : "",
chp.pagesSize,
chp.cpEntry != null ? chp.cpEntry.checkpointMark() : "",
chp.walFilesDeleted,
walSegsCoveredMsg,
tracker.markDuration(),
tracker.pagesWriteDuration(),
tracker.fsyncDuration(),
tracker.totalDuration()));
}
}
persStoreMetrics.onCheckpoint(
tracker.lockWaitDuration(),
tracker.markDuration(),
tracker.pagesWriteDuration(),
tracker.fsyncDuration(),
tracker.totalDuration(),
chp.pagesSize,
tracker.dataPagesWritten(),
tracker.cowPagesWritten());
}
else {
persStoreMetrics.onCheckpoint(
tracker.lockWaitDuration(),
tracker.markDuration(),
tracker.pagesWriteDuration(),
tracker.fsyncDuration(),
tracker.totalDuration(),
chp.pagesSize,
tracker.dataPagesWritten(),
tracker.cowPagesWritten());
}
}
catch (IgniteCheckedException e) {
if (chp != null)
chp.progress.cpFinishFut.onDone(e);
cctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
}
}
/** */
private String prepareWalSegsCoveredMsg(IgniteBiTuple<Long, Long> walRange) {
String res;
long startIdx = walRange.get1();
long endIdx = walRange.get2();
if (endIdx < 0 || endIdx < startIdx)
res = "[]";
else if (endIdx == startIdx)
res = "[" + endIdx + "]";
else
res = "[" + startIdx + " - " + endIdx + "]";
return res;
}
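/*
 * Examples (hypothetical segment indexes): walRange = (5, 5) -> "[5]",
 * walRange = (5, 9) -> "[5 - 9]", walRange = (5, -1) -> "[]".
 */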
/**
* Processes all evicted partitions scheduled for destroy.
*
* @throws IgniteCheckedException If failed.
*
* @return The number of destroyed partition files.
*/
private int destroyEvictedPartitions() throws IgniteCheckedException {
PartitionDestroyQueue destroyQueue = curCpProgress.destroyQueue;
if (destroyQueue.pendingReqs.isEmpty())
return 0;
List<PartitionDestroyRequest> reqs = null;
for (final PartitionDestroyRequest req : destroyQueue.pendingReqs.values()) {
if (!req.beginDestroy())
continue;
final int grpId = req.grpId;
final int partId = req.partId;
CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
assert grp != null
: "Cache group is not initialized [grpId=" + grpId + "]";
assert grp.offheap() instanceof GridCacheOffheapManager
: "Destroying partition files when persistence is off " + grp.offheap();
final GridCacheOffheapManager offheap = (GridCacheOffheapManager)grp.offheap();
Runnable destroyPartTask = () -> {
try {
offheap.destroyPartitionStore(grpId, partId);
req.onDone(null);
if (log.isDebugEnabled())
log.debug("Partition file has destroyed [grpId=" + grpId + ", partId=" + partId + "]");
}
catch (Exception e) {
req.onDone(new IgniteCheckedException(
"Partition file destroy has failed [grpId=" + grpId + ", partId=" + partId + "]", e));
}
};
if (asyncRunner != null) {
try {
asyncRunner.execute(destroyPartTask);
}
catch (RejectedExecutionException ignore) {
// Run the task synchronously.
destroyPartTask.run();
}
}
else
destroyPartTask.run();
if (reqs == null)
reqs = new ArrayList<>();
reqs.add(req);
}
if (reqs != null)
for (PartitionDestroyRequest req : reqs)
req.waitCompleted();
destroyQueue.pendingReqs.clear();
return reqs != null ? reqs.size() : 0;
}
/**
* @param grpCtx Group context. Can be {@code null} in case of crash recovery.
* @param grpId Group ID.
* @param partId Partition ID.
*/
private void schedulePartitionDestroy(@Nullable CacheGroupContext grpCtx, int grpId, int partId) {
synchronized (this) {
scheduledCp.destroyQueue.addDestroyRequest(grpCtx, grpId, partId);
}
if (log.isDebugEnabled())
log.debug("Partition file has been scheduled to destroy [grpId=" + grpId + ", partId=" + partId + "]");
if (grpCtx != null)
wakeupForCheckpoint(PARTITION_DESTROY_CHECKPOINT_TIMEOUT, "partition destroy");
}
/**
* @param grpId Group ID.
* @param partId Partition ID.
*/
private void cancelOrWaitPartitionDestroy(int grpId, int partId) throws IgniteCheckedException {
PartitionDestroyRequest req;
synchronized (this) {
req = scheduledCp.destroyQueue.cancelDestroy(grpId, partId);
}
if (req != null)
req.waitCompleted();
CheckpointProgress cur;
synchronized (this) {
cur = curCpProgress;
if (cur != null)
req = cur.destroyQueue.cancelDestroy(grpId, partId);
}
if (req != null)
req.waitCompleted();
if (log.isDebugEnabled())
log.debug("Partition file destroy has cancelled [grpId=" + grpId + ", partId=" + partId + "]");
}
/**
*
*/
@SuppressWarnings("WaitNotInLoop")
private void waitCheckpointEvent() {
boolean cancel = false;
try {
synchronized (this) {
long remaining;
while ((remaining = scheduledCp.nextCpTs - U.currentTimeMillis()) > 0 && !isCancelled()) {
blockingSectionBegin();
try {
wait(remaining);
}
finally {
blockingSectionEnd();
}
}
}
}
catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
cancel = true;
}
if (cancel)
isCancelled = true;
}
/**
*
*/
@SuppressWarnings("TooBroadScope")
private Checkpoint markCheckpointBegin(CheckpointMetricsTracker tracker) throws IgniteCheckedException {
CheckpointRecord cpRec = new CheckpointRecord(null);
WALPointer cpPtr = null;
final CheckpointProgress curr;
CheckpointEntry cp = null;
IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> cpPagesTuple;
tracker.onLockWaitStart();
boolean hasPages;
boolean hasPartitionsToDestroy;
IgniteFuture snapFut = null;
long cpTs = System.currentTimeMillis();
// This can happen in an unlikely event of two checkpoints happening
// within a currentTimeMillis() granularity window.
if (cpTs == lastCpTs)
cpTs++;
lastCpTs = cpTs;
checkpointLock.writeLock().lock();
try {
tracker.onMarkStart();
synchronized (this) {
curr = scheduledCp;
curr.started = true;
if (curr.reason == null)
curr.reason = "timeout";
// It is important that we assign a new progress object before checkpoint mark in page memory.
scheduledCp = new CheckpointProgress(U.currentTimeMillis() + checkpointFreq);
curCpProgress = curr;
}
final PartitionAllocationMap map = new PartitionAllocationMap();
GridCompoundFuture asyncLsnrFut = asyncRunner == null ? null : new GridCompoundFuture();
DbCheckpointListener.Context ctx0 = new DbCheckpointListener.Context() {
@Override public boolean nextSnapshot() {
return curr.nextSnapshot;
}
/** {@inheritDoc} */
@Override public PartitionAllocationMap partitionStatMap() {
return map;
}
/** {@inheritDoc} */
@Override public boolean needToSnapshot(String cacheOrGrpName) {
return curr.snapshotOperation.cacheGroupIds().contains(CU.cacheId(cacheOrGrpName));
}
/** {@inheritDoc} */
@Override public Executor executor() {
return asyncRunner == null ? null : cmd -> {
try {
GridFutureAdapter<?> res = new GridFutureAdapter<>();
asyncRunner.execute(U.wrapIgniteFuture(cmd, res));
asyncLsnrFut.add(res);
}
catch (RejectedExecutionException e) {
assert false : "A task should never be rejected by async runner";
}
};
}
};
// Listeners must be invoked before we write checkpoint record to WAL.
for (DbCheckpointListener lsnr : lsnrs)
lsnr.onCheckpointBegin(ctx0);
if (asyncLsnrFut != null) {
asyncLsnrFut.markInitialized();
asyncLsnrFut.get();
}
if (curr.nextSnapshot)
snapFut = snapshotMgr.onMarkCheckPointBegin(curr.snapshotOperation, map);
GridCompoundFuture grpHandleFut = asyncRunner == null ? null : new GridCompoundFuture();
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
if (grp.isLocal() || !grp.walEnabled())
continue;
Runnable r = () -> {
ArrayList<GridDhtLocalPartition> parts = new ArrayList<>(grp.topology().localPartitions().size());
for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions())
parts.add(part);
CacheState state = new CacheState(parts.size());
for (GridDhtLocalPartition part : parts) {
state.addPartitionState(
part.id(),
part.dataStore().fullSize(),
part.updateCounter(),
(byte)part.state().ordinal()
);
}
synchronized (cpRec) {
cpRec.addCacheGroupState(grp.groupId(), state);
}
};
if (asyncRunner == null)
r.run();
else
try {
GridFutureAdapter<?> res = new GridFutureAdapter<>();
asyncRunner.execute(U.wrapIgniteFuture(r, res));
grpHandleFut.add(res);
}
catch (RejectedExecutionException e) {
assert false : "Task should never be rejected by async runner";
}
}
if (grpHandleFut != null) {
grpHandleFut.markInitialized();
grpHandleFut.get();
}
cpPagesTuple = beginAllCheckpoints();
hasPages = hasPageForWrite(cpPagesTuple.get1());
hasPartitionsToDestroy = !curr.destroyQueue.pendingReqs.isEmpty();
if (hasPages || curr.nextSnapshot || hasPartitionsToDestroy) {
// No page updates for this checkpoint are allowed from now on.
cpPtr = cctx.wal().log(cpRec);
if (cpPtr == null)
cpPtr = CheckpointStatus.NULL_PTR;
}
if (hasPages || hasPartitionsToDestroy) {
cp = prepareCheckpointEntry(
tmpWriteBuf,
cpTs,
cpRec.checkpointId(),
cpPtr,
cpRec,
CheckpointEntryType.START);
cpHistory.addCheckpoint(cp);
}
}
finally {
checkpointLock.writeLock().unlock();
tracker.onLockRelease();
}
curr.cpBeginFut.onDone();
if (snapFut != null) {
try {
snapFut.get();
}
catch (IgniteException e) {
U.error(log, "Failed to wait for snapshot operation initialization: " +
curr.snapshotOperation + "]", e);
}
}
if (hasPages || hasPartitionsToDestroy) {
assert cpPtr != null;
assert cp != null;
tracker.onWalCpRecordFsyncStart();
// Sync log outside the checkpoint write lock.
cctx.wal().flush(cpPtr, true);
tracker.onWalCpRecordFsyncEnd();
writeCheckpointEntry(tmpWriteBuf, cp, CheckpointEntryType.START);
GridMultiCollectionWrapper<FullPageId> cpPages = splitAndSortCpPagesIfNeeded(cpPagesTuple);
if (printCheckpointStats)
if (log.isInfoEnabled())
log.info(String.format("Checkpoint started [checkpointId=%s, startPtr=%s, checkpointLockWait=%dms, " +
"checkpointLockHoldTime=%dms, walCpRecordFsyncDuration=%dms, pages=%d, reason='%s']",
cpRec.checkpointId(),
cpPtr,
tracker.lockWaitDuration(),
tracker.lockHoldDuration(),
tracker.walCpRecordFsyncDuration(),
cpPages.size(),
curr.reason)
);
return new Checkpoint(cp, cpPages, curr);
}
else {
if (curr.nextSnapshot)
cctx.wal().flush(null, true);
if (printCheckpointStats) {
if (log.isInfoEnabled())
LT.info(log, String.format("Skipping checkpoint (no pages were modified) [" +
"checkpointLockWait=%dms, checkpointLockHoldTime=%dms, reason='%s']",
tracker.lockWaitDuration(),
tracker.lockHoldDuration(),
curr.reason));
}
GridMultiCollectionWrapper<FullPageId> wrapper = new GridMultiCollectionWrapper<>(new Collection[0]);
return new Checkpoint(null, wrapper, curr);
}
}
/**
 * Check that at least one collection is not empty.
 *
 * @param cpPagesCollWrapper Collection of {@link GridMultiCollectionWrapper} checkpoint pages.
 * @return {@code True} if at least one collection contains pages to write.
 */
private boolean hasPageForWrite(Collection<GridMultiCollectionWrapper<FullPageId>> cpPagesCollWrapper) {
boolean hasPages = false;
for (Collection c : cpPagesCollWrapper)
if (!c.isEmpty()) {
hasPages = true;
break;
}
return hasPages;
}
/**
* @return Tuple with collections of FullPageIds obtained from each PageMemory and the overall number of
* dirty pages.
*/
private IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> beginAllCheckpoints() {
Collection<GridMultiCollectionWrapper<FullPageId>> res = new ArrayList<>(dataRegions().size());
int pagesNum = 0;
for (DataRegion memPlc : dataRegions()) {
if (!memPlc.config().isPersistenceEnabled())
continue;
GridMultiCollectionWrapper<FullPageId> nextCpPagesCol = ((PageMemoryEx)memPlc.pageMemory()).beginCheckpoint();
pagesNum += nextCpPagesCol.size();
res.add(nextCpPagesCol);
}
currCheckpointPagesCnt = pagesNum;
return new IgniteBiTuple<>(res, pagesNum);
}
/**
* @param chp Checkpoint snapshot.
*/
private void markCheckpointEnd(Checkpoint chp) throws IgniteCheckedException {
synchronized (this) {
writtenPagesCntr = null;
syncedPagesCntr = null;
evictedPagesCntr = null;
for (DataRegion memPlc : dataRegions()) {
if (!memPlc.config().isPersistenceEnabled())
continue;
((PageMemoryEx)memPlc.pageMemory()).finishCheckpoint();
}
currCheckpointPagesCnt = 0;
}
if (chp.hasDelta()) {
CheckpointEntry cp = prepareCheckpointEntry(
tmpWriteBuf,
chp.cpEntry.timestamp(),
chp.cpEntry.checkpointId(),
chp.cpEntry.checkpointMark(),
null,
CheckpointEntryType.END);
writeCheckpointEntry(tmpWriteBuf, cp, CheckpointEntryType.END);
cctx.wal().notchLastCheckpointPtr(chp.cpEntry.checkpointMark());
}
List<CheckpointEntry> removedFromHistory = cpHistory.onCheckpointFinished(chp, truncateWalOnCpFinish);
for (CheckpointEntry cp : removedFromHistory)
removeCheckpointFiles(cp);
if (chp.progress != null)
chp.progress.cpFinishFut.onDone();
}
/** {@inheritDoc} */
@Override public void cancel() {
if (log.isDebugEnabled())
log.debug("Cancelling grid runnable: " + this);
// Do not interrupt runner thread.
isCancelled = true;
synchronized (this) {
notifyAll();
}
}
/**
*
*/
public void shutdownNow() {
shutdownNow = true;
if (!isCancelled)
cancel();
}
}
/**
* Reorders list of checkpoint pages and splits them into needed number of sublists according to
* {@link DataStorageConfiguration#getCheckpointThreads()} and
* {@link DataStorageConfiguration#getCheckpointWriteOrder()}.
*
* @param cpPagesTuple Checkpoint pages tuple.
* @return Checkpoint pages, possibly reordered, split into sublists.
*/
private GridMultiCollectionWrapper<FullPageId> splitAndSortCpPagesIfNeeded(
IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> cpPagesTuple
) {
List<FullPageId> cpPagesList = new ArrayList<>(cpPagesTuple.get2());
for (GridMultiCollectionWrapper<FullPageId> col : cpPagesTuple.get1()) {
for (int i = 0; i < col.collectionsSize(); i++)
cpPagesList.addAll(col.innerCollection(i));
}
if (persistenceCfg.getCheckpointWriteOrder() == CheckpointWriteOrder.SEQUENTIAL) {
FullPageId[] objects = cpPagesList.toArray(new FullPageId[cpPagesList.size()]);
Arrays.parallelSort(objects, new Comparator<FullPageId>() {
@Override public int compare(FullPageId o1, FullPageId o2) {
int cmp = Long.compare(o1.groupId(), o2.groupId());
if (cmp != 0)
return cmp;
return Long.compare(PageIdUtils.effectivePageId(o1.pageId()),
PageIdUtils.effectivePageId(o2.pageId()));
}
});
cpPagesList = Arrays.asList(objects);
}
int cpThreads = persistenceCfg.getCheckpointThreads();
int pagesSubLists = cpThreads == 1 ? 1 : cpThreads * 4;
// Split pages into (threads * 4) subtasks. If any thread is faster, it will help the slower ones.
Collection[] pagesSubListArr = new Collection[pagesSubLists];
for (int i = 0; i < pagesSubLists; i++) {
int totalSize = cpPagesList.size();
int from = totalSize * i / (pagesSubLists);
int to = totalSize * (i + 1) / (pagesSubLists);
pagesSubListArr[i] = cpPagesList.subList(from, to);
}
return new GridMultiCollectionWrapper<FullPageId>(pagesSubListArr);
}
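/*
 * Split arithmetic sketch: for a hypothetical totalSize = 10 and pagesSubLists = 4
 * the [from, to) bounds come out as [0, 2), [2, 5), [5, 7), [7, 10) - the integer
 * arithmetic spreads the remainder evenly without losing or duplicating pages.
 */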
/** Pages write task */
private class WriteCheckpointPages implements Runnable {
/** */
private final CheckpointMetricsTracker tracker;
/** Collection of page IDs to write under this task. The overall number of pages to write may be greater than this collection. */
private final Collection<FullPageId> writePageIds;
/** */
private final ConcurrentLinkedHashMap<PageStore, LongAdder> updStores;
/** */
private final CountDownFuture doneFut;
/** Total pages to write; the counter may be greater than the size of {@link #writePageIds}. */
private final int totalPagesToWrite;
/** */
private final Runnable beforePageWrite;
/** If any pages were skipped, new task with remaining pages will be submitted here. */
private final ExecutorService retryWriteExecutor;
/**
 * Creates a task for writing pages.
 *
 * @param tracker Checkpoint metrics tracker.
 * @param writePageIds Collection of page IDs to write.
 * @param updStores Map of updated page stores to written page counters.
 * @param doneFut Future completed when all pages of this task are written.
 * @param totalPagesToWrite Total pages to be written under this checkpoint.
 * @param beforePageWrite Action to be performed before every page write.
 * @param retryWriteExecutor Retry write executor.
 */
private WriteCheckpointPages(
final CheckpointMetricsTracker tracker,
final Collection<FullPageId> writePageIds,
final ConcurrentLinkedHashMap<PageStore, LongAdder> updStores,
final CountDownFuture doneFut,
final int totalPagesToWrite,
final Runnable beforePageWrite,
final ExecutorService retryWriteExecutor
) {
this.tracker = tracker;
this.writePageIds = writePageIds;
this.updStores = updStores;
this.doneFut = doneFut;
this.totalPagesToWrite = totalPagesToWrite;
this.beforePageWrite = beforePageWrite;
this.retryWriteExecutor = retryWriteExecutor;
}
/** {@inheritDoc} */
@Override public void run() {
snapshotMgr.beforeCheckpointPageWritten();
Collection<FullPageId> writePageIds = this.writePageIds;
try {
List<FullPageId> pagesToRetry = writePages(writePageIds);
if (pagesToRetry.isEmpty())
doneFut.onDone((Void)null);
else {
if (retryWriteExecutor == null) {
while (!pagesToRetry.isEmpty())
pagesToRetry = writePages(pagesToRetry);
doneFut.onDone((Void)null);
}
else {
// Submit current retry pages to the end of the queue to avoid starvation.
WriteCheckpointPages retryWritesTask = new WriteCheckpointPages(
tracker,
pagesToRetry,
updStores,
doneFut,
totalPagesToWrite,
beforePageWrite,
retryWriteExecutor);
retryWriteExecutor.submit(retryWritesTask);
}
}
}
catch (Throwable e) {
doneFut.onDone(e);
}
}
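/*
 * Retry note: writePages() returns the pages whose write lock was held by another
 * thread at write time (TRY_AGAIN_TAG). Without a retry executor the task loops on
 * the remainder itself; with one, the remainder is re-submitted to the back of the
 * executor queue so that other checkpoint tasks are not starved.
 */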
/**
 * @param writePageIds Collection of pages to write.
 * @return Pages which should be retried.
 * @throws IgniteCheckedException If write failed.
 */
private List<FullPageId> writePages(Collection<FullPageId> writePageIds) throws IgniteCheckedException {
List<FullPageId> pagesToRetry = new ArrayList<>();
CheckpointMetricsTracker tracker = persStoreMetrics.metricsEnabled() ? this.tracker : null;
PageStoreWriter pageStoreWriter = createPageStoreWriter(pagesToRetry);
ByteBuffer tmpWriteBuf = threadBuf.get();
for (FullPageId fullId : writePageIds) {
if (checkpointer.shutdownNow)
break;
tmpWriteBuf.rewind();
beforePageWrite.run();
snapshotMgr.beforePageWrite(fullId);
int grpId = fullId.groupId();
PageMemoryEx pageMem;
// TODO IGNITE-7792 add generic mapping.
if (grpId == MetaStorage.METASTORAGE_CACHE_ID)
pageMem = (PageMemoryEx)metaStorage.pageMemory();
else if (grpId == TxLog.TX_LOG_CACHE_ID)
pageMem = (PageMemoryEx)dataRegion(TxLog.TX_LOG_CACHE_NAME).pageMemory();
else {
CacheGroupContext grp = context().cache().cacheGroup(grpId);
DataRegion region = grp != null ? grp.dataRegion() : null;
if (region == null || !region.config().isPersistenceEnabled())
continue;
pageMem = (PageMemoryEx)region.pageMemory();
}
pageMem.checkpointWritePage(fullId, tmpWriteBuf, pageStoreWriter, tracker);
}
return pagesToRetry;
}
/**
 * Factory method for creating a {@link PageStoreWriter}.
 *
 * @param pagesToRetry List of pages to retry.
 * @return Checkpoint page write context.
 */
private PageStoreWriter createPageStoreWriter(List<FullPageId> pagesToRetry) {
return new PageStoreWriter() {
/** {@inheritDoc} */
@Override public void writePage(FullPageId fullPageId, ByteBuffer tmpWriteBuf, int tag) throws IgniteCheckedException {
if (tag == PageMemoryImpl.TRY_AGAIN_TAG) {
pagesToRetry.add(fullPageId);
return;
}
long writeAddr = GridUnsafe.bufferAddress(tmpWriteBuf);
int groupId = fullPageId.groupId();
long pageId = fullPageId.pageId();
assert getType(tmpWriteBuf) != 0 : "Invalid state. Type is 0! pageId = " + hexLong(pageId);
assert getVersion(tmpWriteBuf) != 0 : "Invalid state. Version is 0! pageId = " + hexLong(pageId);
tmpWriteBuf.rewind();
if (persStoreMetrics.metricsEnabled()) {
int pageType = PageIO.getType(tmpWriteBuf);
if (PageIO.isDataPageType(pageType))
tracker.onDataPageWritten();
}
if (!skipCrc) {
PageIO.setCrc(writeAddr, PureJavaCrc32.calcCrc32(tmpWriteBuf, pageSize()));
tmpWriteBuf.rewind();
}
int curWrittenPages = writtenPagesCntr.incrementAndGet();
snapshotMgr.onPageWrite(fullPageId, tmpWriteBuf, curWrittenPages, totalPagesToWrite);
tmpWriteBuf.rewind();
PageStore store = storeMgr.writeInternal(groupId, pageId, tmpWriteBuf, tag, false);
updStores.computeIfAbsent(store, k -> new LongAdder()).increment();
}
};
}
}
/**
*
*/
public static class Checkpoint {
/** Checkpoint entry. */
@Nullable private final CheckpointEntry cpEntry;
/** Checkpoint pages. */
private final GridMultiCollectionWrapper<FullPageId> cpPages;
/** */
private final CheckpointProgress progress;
/** Number of deleted WAL files. */
private int walFilesDeleted;
/** WAL segments fully covered by this checkpoint. */
private IgniteBiTuple<Long, Long> walSegsCoveredRange;
/** */
private final int pagesSize;
/**
* @param cpEntry Checkpoint entry.
* @param cpPages Pages to write to the page store.
* @param progress Checkpoint progress status.
*/
private Checkpoint(
@Nullable CheckpointEntry cpEntry,
@NotNull GridMultiCollectionWrapper<FullPageId> cpPages,
CheckpointProgress progress
) {
this.cpEntry = cpEntry;
this.cpPages = cpPages;
this.progress = progress;
pagesSize = cpPages.size();
}
/**
* @return {@code true} if this checkpoint contains at least one dirty page.
*/
public boolean hasDelta() {
return pagesSize != 0;
}
/**
* @param walFilesDeleted Number of deleted WAL files.
*/
public void walFilesDeleted(int walFilesDeleted) {
this.walFilesDeleted = walFilesDeleted;
}
/**
* @param walSegsCoveredRange WAL segments fully covered by this checkpoint.
*/
public void walSegsCoveredRange(final IgniteBiTuple<Long, Long> walSegsCoveredRange) {
this.walSegsCoveredRange = walSegsCoveredRange;
}
}
/**
*
*/
private static class CheckpointStatus {
/** Null checkpoint UUID. */
private static final UUID NULL_UUID = new UUID(0L, 0L);
/** Null WAL pointer. */
private static final WALPointer NULL_PTR = new FileWALPointer(0, 0, 0);
/** */
private long cpStartTs;
/** */
private UUID cpStartId;
/** */
@GridToStringInclude
private WALPointer startPtr;
/** */
private UUID cpEndId;
/** */
@GridToStringInclude
private WALPointer endPtr;
/**
 * @param cpStartTs Checkpoint start timestamp.
 * @param cpStartId Checkpoint start ID.
 * @param startPtr Checkpoint start pointer.
 * @param cpEndId Checkpoint end ID.
 * @param endPtr Checkpoint end pointer.
 */
private CheckpointStatus(long cpStartTs, UUID cpStartId, WALPointer startPtr, UUID cpEndId, WALPointer endPtr) {
this.cpStartTs = cpStartTs;
this.cpStartId = cpStartId;
this.startPtr = startPtr;
this.cpEndId = cpEndId;
this.endPtr = endPtr;
}
/**
 * @return {@code True} if the page log needs to be applied to restore tree structure, i.e. the node stopped
 * between writing the checkpoint start and end markers, so memory state must be restored.
 */
public boolean needRestoreMemory() {
return !F.eq(cpStartId, cpEndId) && !F.eq(NULL_UUID, cpStartId);
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(CheckpointStatus.class, this);
}
}
/**
* Data class representing the state of running/scheduled checkpoint.
*/
private static class CheckpointProgress {
/** Scheduled time of checkpoint. */
private volatile long nextCpTs;
/** Checkpoint begin phase future. */
private GridFutureAdapter cpBeginFut = new GridFutureAdapter<>();
/** Checkpoint finish phase future. */
private GridFutureAdapter cpFinishFut = new GridFutureAdapter<Void>() {
@Override protected boolean onDone(@Nullable Void res, @Nullable Throwable err, boolean cancel) {
if (err != null && !cpBeginFut.isDone())
cpBeginFut.onDone(err);
return super.onDone(res, err, cancel);
}
};
/** Flag indicates that snapshot operation will be performed after checkpoint. */
private volatile boolean nextSnapshot;
/** Flag indicates that checkpoint is started. */
private volatile boolean started;
/** Snapshot operation that should be performed if {@link #nextSnapshot} is set to {@code true}. */
private volatile SnapshotOperation snapshotOperation;
/** Partitions destroy queue. */
private final PartitionDestroyQueue destroyQueue = new PartitionDestroyQueue();
/** Wakeup reason. */
private String reason;
/**
* @param nextCpTs Next checkpoint timestamp.
*/
private CheckpointProgress(long nextCpTs) {
this.nextCpTs = nextCpTs;
}
}
/**
*
*/
private static class CheckpointProgressSnapshot implements CheckpointFuture {
/** */
private final boolean started;
/** */
private final GridFutureAdapter<Object> cpBeginFut;
/** */
private final GridFutureAdapter<Object> cpFinishFut;
/**
 * @param cpProgress Checkpoint progress to take a snapshot of.
 */
CheckpointProgressSnapshot(CheckpointProgress cpProgress) {
started = cpProgress.started;
cpBeginFut = cpProgress.cpBeginFut;
cpFinishFut = cpProgress.cpFinishFut;
}
/** {@inheritDoc} */
@Override public GridFutureAdapter beginFuture() {
return cpBeginFut;
}
/** {@inheritDoc} */
@Override public GridFutureAdapter finishFuture() {
return cpFinishFut;
}
}
/**
 * Holds an exclusive inter-process lock on the node's persistent storage directory to prevent
 * another node from using the same directory concurrently.
 */
public static class FileLockHolder implements AutoCloseable {
/** Lock file name. */
private static final String LOCK_FILE_NAME = "lock";
/** File. */
private File file;
/** Channel. */
private RandomAccessFile lockFile;
/** Lock. */
private volatile FileLock lock;
/** Kernal context to generate Id of locked node in file. */
@NotNull private GridKernalContext ctx;
/** Logger. */
private IgniteLogger log;
/**
 * @param path Path to the directory containing the lock file.
 * @param ctx Kernal context.
 * @param log Logger.
 */
public FileLockHolder(String path, @NotNull GridKernalContext ctx, IgniteLogger log) {
try {
file = Paths.get(path, LOCK_FILE_NAME).toFile();
lockFile = new RandomAccessFile(file, "rw");
this.ctx = ctx;
this.log = log;
}
catch (IOException e) {
throw new IgniteException(e);
}
}
/**
* @param lockWaitTimeMillis Maximum time, in milliseconds, during which the thread will try to capture the file lock.
* @throws IgniteCheckedException If the file lock could not be captured within the given time.
*/
public void tryLock(long lockWaitTimeMillis) throws IgniteCheckedException {
assert lockFile != null;
FileChannel ch = lockFile.getChannel();
SB sb = new SB();
// Write node id.
sb.a("[").a(ctx.localNodeId().toString()).a("]");
// Write ip addresses.
final GridDiscoveryManager discovery = ctx.discovery();
if (discovery != null) { // Discovery may not be up and running.
final ClusterNode node = discovery.localNode();
if (node != null)
sb.a(node.addresses());
}
// Write ports.
sb.a("[");
Iterator<GridPortRecord> it = ctx.ports().records().iterator();
while (it.hasNext()) {
GridPortRecord rec = it.next();
sb.a(rec.protocol()).a(":").a(rec.port());
if (it.hasNext())
sb.a(", ");
}
sb.a("]");
String failMsg;
try {
String content = null;
// Try to get lock, if not available wait 1 sec and re-try.
for (int i = 0; i < lockWaitTimeMillis; i += 1000) {
try {
lock = ch.tryLock(0, 1, false);
if (lock != null && lock.isValid()) {
writeContent(sb.toString());
return;
}
}
catch (OverlappingFileLockException ignore) {
if (content == null)
content = readContent();
log.warning("Failed to acquire file lock (local nodeId:" + ctx.localNodeId()
+ ", already locked by " + content + "), will try again in 1s: "
+ file.getAbsolutePath());
}
U.sleep(1000);
}
if (content == null)
content = readContent();
failMsg = "Failed to acquire file lock during " + (lockWaitTimeMillis / 1000) +
" sec, (locked by " + content + "): " + file.getAbsolutePath();
}
catch (Exception e) {
throw new IgniteCheckedException(e);
}
if (failMsg != null)
throw new IgniteCheckedException(failMsg);
}
/**
* Write node id (who captured the lock) into the lock file.
*
* @param content Node id.
* @throws IOException If failed to write the node id.
*/
private void writeContent(String content) throws IOException {
FileChannel ch = lockFile.getChannel();
byte[] bytes = content.getBytes();
ByteBuffer buf = ByteBuffer.allocate(bytes.length);
buf.put(bytes);
buf.flip();
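// Write starting at offset 1: byte 0 is the range held by the exclusive file lock acquired in
// tryLock() (which locks the region [0, 1)), so the content must not overlap it.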
ch.write(buf, 1);
ch.force(false);
}
/**
 * @return Content of the lock file (id of the node holding the lock).
 * @throws IOException If failed to read the lock file.
 */
private String readContent() throws IOException {
FileChannel ch = lockFile.getChannel();
ByteBuffer buf = ByteBuffer.allocate((int)(ch.size() - 1));
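// Read everything after the lock byte at offset 0 (the content written by writeContent()).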
ch.read(buf, 1);
String content = new String(buf.array());
buf.clear();
return content;
}
/** @return {@code True} if the file lock is held and valid. */
public boolean isLocked() {
return lock != null && lock.isValid();
}
/** Releases the file lock. */
public void release() {
U.releaseQuiet(lock);
}
/** Releases the lock and closes the file channel. */
@Override public void close() {
release();
U.closeQuiet(lockFile);
}
/**
* @return Absolute path to lock file.
*/
private String lockPath() {
return file.getAbsolutePath();
}
}
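/*
 * Minimal usage sketch for FileLockHolder (illustrative only; storagePath, kernalCtx and log are
 * placeholders supplied by the surrounding manager):
 *
 *   try (FileLockHolder lock = new FileLockHolder(storagePath, kernalCtx, log)) {
 *       lock.tryLock(10_000); // Wait up to 10 seconds, re-trying once per second.
 *       assert lock.isLocked();
 *       // Work with the persistence directory exclusively...
 *   } // close() releases the lock and closes the underlying file.
 */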
/** {@inheritDoc} */
@Override public DataStorageMetrics persistentStoreMetrics() {
return new DataStorageMetricsSnapshot(persStoreMetrics);
}
/**
*
*/
public DataStorageMetricsImpl persistentStoreMetricsImpl() {
return persStoreMetrics;
}
/** {@inheritDoc} */
@Override public MetaStorage metaStorage() {
return metaStorage;
}
/** {@inheritDoc} */
public void notifyMetaStorageSubscribersOnReadyForRead() throws IgniteCheckedException {
metastorageLifecycleLsnrs = cctx.kernalContext().internalSubscriptionProcessor().getMetastorageSubscribers();
readMetastore();
}
/** {@inheritDoc} */
@Override public boolean walEnabled(int grpId, boolean local) {
if (local)
return !initiallyLocalWalDisabledGrps.contains(grpId);
else
return !initiallyGlobalWalDisabledGrps.contains(grpId);
}
/** {@inheritDoc} */
@Override public void walEnabled(int grpId, boolean enabled, boolean local) {
String key = walGroupIdToKey(grpId, local);
checkpointReadLock();
try {
if (enabled)
metaStorage.remove(key);
else {
metaStorage.write(key, true);
lastCheckpointInapplicableForWalRebalance(grpId);
}
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to write cache group WAL state [grpId=" + grpId +
", enabled=" + enabled + ']', e);
}
finally {
checkpointReadUnlock();
}
}
/**
 * Checks whether the checkpoint with timestamp {@code cpTs} is inapplicable as a start point for WAL rebalance
 * for the given group {@code grpId}.
 *
 * @param cpTs Checkpoint timestamp.
 * @param grpId Group ID.
 * @return {@code true} if the checkpoint is inapplicable as a start point for WAL rebalance for {@code grpId}.
 * @throws IgniteCheckedException If failed to check.
 */
public boolean isCheckpointInapplicableForWalRebalance(Long cpTs, int grpId) throws IgniteCheckedException {
return metaStorage.read(checkpointInapplicableCpAndGroupIdToKey(cpTs, grpId)) != null;
}
/**
* Marks the last checkpoint as inapplicable for WAL rebalance for the given group {@code grpId}.
*
* @param grpId Group ID.
*/
@Override public void lastCheckpointInapplicableForWalRebalance(int grpId) {
checkpointReadLock();
try {
CheckpointEntry lastCp = cpHistory.lastCheckpoint();
long lastCpTs = lastCp != null ? lastCp.timestamp() : 0;
if (lastCpTs != 0)
metaStorage.write(checkpointInapplicableCpAndGroupIdToKey(lastCpTs, grpId), true);
}
catch (IgniteCheckedException e) {
log.error("Failed to mark last checkpoint as inapplicable for WAL rebalance for group: " + grpId, e);
}
finally {
checkpointReadUnlock();
}
}
/**
*
*/
private void fillWalDisabledGroups() {
MetaStorage meta = cctx.database().metaStorage();
try {
Set<String> keys = meta.readForPredicate(WAL_KEY_PREFIX_PRED).keySet();
if (keys.isEmpty())
return;
for (String key : keys) {
T2<Integer, Boolean> t2 = walKeyToGroupIdAndLocalFlag(key);
if (t2 == null)
continue;
if (t2.get2())
initiallyLocalWalDisabledGrps.add(t2.get1());
else
initiallyGlobalWalDisabledGrps.add(t2.get1());
}
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to read cache groups WAL state.", e);
}
}
/**
 * Convert cache group ID to WAL state key.
 *
 * @param grpId Group ID.
 * @param local Local flag.
 * @return Key.
 */
private static String walGroupIdToKey(int grpId, boolean local) {
if (local)
return WAL_LOCAL_KEY_PREFIX + grpId;
else
return WAL_GLOBAL_KEY_PREFIX + grpId;
}
/**
* Convert checkpoint timestamp and cache group ID to key for {@link #CHECKPOINT_INAPPLICABLE_FOR_REBALANCE} metastorage records.
*
* @param cpTs Checkpoint timestamp.
* @param grpId Group ID.
* @return Key.
*/
private static String checkpointInapplicableCpAndGroupIdToKey(long cpTs, int grpId) {
return CHECKPOINT_INAPPLICABLE_FOR_REBALANCE + cpTs + "-" + grpId;
}
/**
 * Convert WAL state key to cache group ID and local flag.
 *
 * @param key Key.
 * @return Tuple of group ID and local flag, or {@code null} if the key has an unknown prefix.
 */
private static T2<Integer, Boolean> walKeyToGroupIdAndLocalFlag(String key) {
if (key.startsWith(WAL_LOCAL_KEY_PREFIX))
return new T2<>(Integer.parseInt(key.substring(WAL_LOCAL_KEY_PREFIX.length())), true);
else if (key.startsWith(WAL_GLOBAL_KEY_PREFIX))
return new T2<>(Integer.parseInt(key.substring(WAL_GLOBAL_KEY_PREFIX.length())), false);
else
return null;
}
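// Round trip of the two helpers above (hypothetical group ID): walGroupIdToKey(123, true) yields
// WAL_LOCAL_KEY_PREFIX + "123", which walKeyToGroupIdAndLocalFlag() parses back into
// new T2<>(123, true); keys with an unknown prefix parse to null.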
/**
* Abstract base class for restore state contexts.
*/
public abstract static class RestoreStateContext {
/** */
protected final IgniteLogger log;
/** Last archived segment. */
protected final long lastArchivedSegment;
/** Last read record WAL pointer. */
protected FileWALPointer lastRead;
/**
* @param lastArchivedSegment Last archived segment index.
* @param log Ignite logger.
*/
public RestoreStateContext(long lastArchivedSegment, IgniteLogger log) {
this.lastArchivedSegment = lastArchivedSegment;
this.log = log;
}
/**
* Advance iterator to the next record.
*
* @param it WAL iterator.
* @return WAL record entry, or {@code null} if a tolerable CRC error was treated as the end of the log.
* @throws IgniteCheckedException If the CRC check fails during binary recovery or another exception occurs.
*/
public WALRecord next(WALIterator it) throws IgniteCheckedException {
try {
IgniteBiTuple<WALPointer, WALRecord> tup = it.nextX();
WALRecord rec = tup.get2();
WALPointer ptr = tup.get1();
lastRead = (FileWALPointer)ptr;
rec.position(ptr);
return rec;
}
catch (IgniteCheckedException e) {
boolean throwsCRCError = throwsCRCError();
if (X.hasCause(e, IgniteDataIntegrityViolationException.class)) {
if (throwsCRCError)
throw e;
else
return null;
}
log.error("Catch error during restore state, throwsCRCError=" + throwsCRCError, e);
throw e;
}
}
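// Illustrative consumer loop over this context (ctx and it are placeholders):
//
//   WALRecord rec;
//   while (it.hasNextX() && (rec = ctx.next(it)) != null) {
//       // Apply rec...
//   }
//
// A null return means a CRC error occurred beyond the last archived segment and is treated as the
// natural end of the log rather than as corruption.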
/**
*
* @return Last read WAL record pointer.
*/
public WALPointer lastReadRecordPointer() {
return lastRead;
}
/**
 * @return {@code True} if a CRC error must be propagated, {@code false} if it may be treated as the end of the log.
 */
public boolean throwsCRCError() {
FileWALPointer lastReadPtr = lastRead;
return lastReadPtr != null && lastReadPtr.index() <= lastArchivedSegment;
}
}
/**
* Restore memory context. Tracks the safety of binary recovery.
*/
public static class RestoreBinaryState extends RestoreStateContext {
/** Checkpoint status. */
private final CheckpointStatus status;
/** Flag indicating whether binary updates still need to be applied. */
private boolean needApplyBinaryUpdates;
/**
* @param status Checkpoint status.
* @param lastArchivedSegment Last archived segment index.
* @param log Ignite logger.
*/
public RestoreBinaryState(CheckpointStatus status, long lastArchivedSegment, IgniteLogger log) {
super(lastArchivedSegment, log);
this.status = status;
needApplyBinaryUpdates = status.needRestoreMemory();
}
/**
* Advance iterator to the next record.
*
* @param it WAL iterator.
* @return WAL record entry, or {@code null} if a tolerable CRC error was treated as the end of the log.
* @throws IgniteCheckedException If the CRC check fails during binary recovery or another exception occurs.
*/
@Override public WALRecord next(WALIterator it) throws IgniteCheckedException {
WALRecord rec = super.next(it);
if (rec == null)
return null;
if (rec.type() == CHECKPOINT_RECORD) {
CheckpointRecord cpRec = (CheckpointRecord)rec;
// We roll memory up until we find a checkpoint start record registered in the status.
if (F.eq(cpRec.checkpointId(), status.cpStartId)) {
log.info("Found last checkpoint marker [cpId=" + cpRec.checkpointId() +
", pos=" + rec.position() + ']');
needApplyBinaryUpdates = false;
}
else if (!F.eq(cpRec.checkpointId(), status.cpEndId))
U.warn(log, "Found unexpected checkpoint marker, skipping [cpId=" + cpRec.checkpointId() +
", expCpId=" + status.cpStartId + ", pos=" + rec.position() + ']');
}
return rec;
}
/**
 * @return {@code True} if binary updates still need to be applied.
 */
public boolean needApplyBinaryUpdate() {
return needApplyBinaryUpdates;
}
/**
 * @return {@code True} if a CRC error must be propagated, {@code false} if it may be treated as the end of the log.
 */
@Override public boolean throwsCRCError() {
log.info("Throws CRC error check, needApplyBinaryUpdates=" + needApplyBinaryUpdates +
", lastArchivedSegment=" + lastArchivedSegment + ", lastRead=" + lastRead);
if (needApplyBinaryUpdates)
return true;
return super.throwsCRCError();
}
}
/**
* Restore logical state context. Tracks the safety of logical recovery.
*/
public static class RestoreLogicalState extends RestoreStateContext {
/**
* @param lastArchivedSegment Last archived segment index.
* @param log Ignite logger.
*/
public RestoreLogicalState(long lastArchivedSegment, IgniteLogger log) {
super(lastArchivedSegment, log);
}
}
/** Indicates checkpoint read lock acquisition failure which did not lead to node invalidation. */
private static class CheckpointReadLockTimeoutException extends IgniteCheckedException {
/** */
private static final long serialVersionUID = 0L;
/** */
private CheckpointReadLockTimeoutException(String msg) {
super(msg);
}
}
}