/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.management.ObjectName;
import org.apache.ignite.DataStorageMetrics;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CheckpointWriteOrder;
import org.apache.ignite.configuration.DataPageEvictionMode;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
import org.apache.ignite.internal.mem.DirectMemoryProvider;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
import org.apache.ignite.internal.pagemem.store.PageStore;
import org.apache.ignite.internal.pagemem.wal.StorageException;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.pagemem.wal.record.CacheState;
import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord;
import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.StoredCacheData;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.persistence.file.FileIO;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStore;
import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.CheckpointMetricsTracker;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl;
import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionAllocationMap;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperation;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
import org.apache.ignite.internal.processors.cache.persistence.wal.crc.PureJavaCrc32;
import org.apache.ignite.internal.processors.port.GridPortRecord;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.GridMultiCollectionWrapper;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.future.CountDownFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridInClosure3X;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P3;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteFuture;
import org.apache.ignite.lang.IgniteOutClosure;
import org.apache.ignite.mxbean.DataStorageMetricsMXBean;
import org.apache.ignite.thread.IgniteThread;
import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static java.nio.file.StandardOpenOption.READ;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
/**
 * Database shared manager for Ignite native persistence: owns checkpointing, checkpoint marker
 * files and WAL-based memory recovery.
 */
@SuppressWarnings({"unchecked", "NonPrivateFieldAccessedInSynchronizedContext"})
public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedManager {
/** Name of the system property that disables fsync during checkpoints (test use only). */
public static final String IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC = "IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC";
/** Bytes in one gigabyte. */
private static final long GB = 1024L * 1024 * 1024;
/** Default minimum checkpointing page buffer size (may be adjusted by Ignite). */
public static final Long DFLT_MIN_CHECKPOINTING_PAGE_BUFFER_SIZE = GB / 4;
/** Default maximum checkpointing page buffer size (may be adjusted by Ignite). */
public static final Long DFLT_MAX_CHECKPOINTING_PAGE_BUFFER_SIZE = 2 * GB;
/** Skip sync. */
private final boolean skipSync = IgniteSystemProperties.getBoolean(IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC);
/** Skip CRC calculation flag. */
private boolean skipCrc = IgniteSystemProperties.getBoolean(IGNITE_PDS_SKIP_CRC, false);
/** Partition size threshold above which WAL history is reserved for rebalancing. */
private final int walRebalanceThreshold = IgniteSystemProperties.getInteger(
IGNITE_PDS_WAL_REBALANCE_THRESHOLD, 500_000);
/** Checkpoint lock hold count. */
private static final ThreadLocal<Integer> CHECKPOINT_LOCK_HOLD_COUNT = new ThreadLocal<Integer>() {
@Override protected Integer initialValue() {
return 0;
}
};
/** Assertion enabled. */
private static final boolean ASSERTION_ENABLED = GridCacheDatabaseSharedManager.class.desiredAssertionStatus();
/** Checkpoint file name pattern. */
private static final Pattern CP_FILE_NAME_PATTERN = Pattern.compile("(\\d+)-(.*)-(START|END)\\.bin");
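// Capture groups: (1) checkpoint timestamp, (2) checkpoint ID (UUID), (3) entry type (START or END).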
/** Checkpoint marker file filter. */
private static final FileFilter CP_FILE_FILTER = new FileFilter() {
@Override public boolean accept(File f) {
return CP_FILE_NAME_PATTERN.matcher(f.getName()).matches();
}
};
/** Partition comparator by ascending partition ID. */
private static final Comparator<GridDhtLocalPartition> ASC_PART_COMPARATOR = new Comparator<GridDhtLocalPartition>() {
@Override public int compare(GridDhtLocalPartition a, GridDhtLocalPartition b) {
return Integer.compare(a.id(), b.id());
}
};
/** Checkpoint marker file comparator: by timestamp, START before END on equal timestamps. */
private static final Comparator<File> CP_TS_COMPARATOR = new Comparator<File>() {
/** {@inheritDoc} */
@Override public int compare(File o1, File o2) {
Matcher m1 = CP_FILE_NAME_PATTERN.matcher(o1.getName());
Matcher m2 = CP_FILE_NAME_PATTERN.matcher(o2.getName());
boolean s1 = m1.matches();
boolean s2 = m2.matches();
assert s1 : "Failed to match CP file: " + o1.getAbsolutePath();
assert s2 : "Failed to match CP file: " + o2.getAbsolutePath();
long ts1 = Long.parseLong(m1.group(1));
long ts2 = Long.parseLong(m2.group(1));
int res = Long.compare(ts1, ts2);
if (res == 0) {
CheckpointEntryType type1 = CheckpointEntryType.valueOf(m1.group(3));
CheckpointEntryType type2 = CheckpointEntryType.valueOf(m2.group(3));
assert type1 != type2 : "o1=" + o1.getAbsolutePath() + ", o2=" + o2.getAbsolutePath();
res = type1 == CheckpointEntryType.START ? -1 : 1;
}
return res;
}
};
/** Data storage metrics MBean name. */
private static final String MBEAN_NAME = "DataStorageMetrics";
/** Data storage metrics MBean group. */
private static final String MBEAN_GROUP = "Persistent Store";
/** Checkpoint thread. Needs to be volatile because it is created in exchange worker. */
private volatile Checkpointer checkpointer;
/** For testing only. */
private volatile boolean checkpointsEnabled = true;
/** For testing only. */
private volatile GridFutureAdapter<Void> enableChangeApplied;
/** Checkpoint read-write lock. */
private ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();
/** Checkpoint frequency in milliseconds. */
private long checkpointFreq;
/** File page store manager. */
private FilePageStoreManager storeMgr;
/** Checkpoint metadata directory ("cp"); contains checkpoint START and END marker files. */
private File cpDir;
/** Whether to log checkpoint statistics. */
private volatile boolean printCheckpointStats = true;
/** Database configuration. */
private final DataStorageConfiguration persistenceCfg;
/** Checkpoint listeners. */
private final Collection<DbCheckpointListener> lsnrs = new CopyOnWriteArrayList<>();
/** Checkpoint history. */
private final CheckpointHistory checkpointHist = new CheckpointHistory();
/** Node stopping flag; set under the checkpoint write lock. */
private boolean stopping;
/** Checkpoint runner thread pool. If {@code null}, checkpoint pages are written in a single thread. */
@Nullable private ExecutorService asyncRunner;
/** Buffer for the checkpoint threads. */
private ThreadLocal<ByteBuffer> threadBuf;
/** Index rebuild futures mapped by cache ID. */
private final ConcurrentMap<Integer, IgniteInternalFuture> idxRebuildFuts = new ConcurrentHashMap<>();
/**
 * Lock holder for compatible folders mode. {@code Null} if the lock holder was created at node start.
 * In that case the lock is held by the PDS folder resolver and does not need to be managed here.
 */
@Nullable private FileLockHolder fileLockHolder;
/** Lock wait time. */
private final long lockWaitTime;
/** WAL history reserved for exchange: group ID -> partition ID -> (partition counter, WAL pointer). */
private Map<Integer, Map<Integer, T2<Long, WALPointer>>> reservedForExchange;
/** WAL history reserved for preloading: (group ID, partition ID) -> (partition counter, WAL pointer). */
private final ConcurrentMap<T2<Integer, Integer>, T2<Long, WALPointer>> reservedForPreloading = new ConcurrentHashMap<>();
/** Snapshot manager. */
private IgniteCacheSnapshotManager snapshotMgr;
/** Data storage metrics. */
private DataStorageMetricsImpl persStoreMetrics;
/** Registered metrics MBean name. */
private ObjectName persistenceMetricsMbeanName;
/** Counter of written checkpoint pages. Non-null only while a checkpoint is in progress. */
private volatile AtomicInteger writtenPagesCntr = null;
/** Number of pages in current checkpoint. */
private volatile int currCheckpointPagesCnt;
/**
* @param ctx Kernal context.
*/
public GridCacheDatabaseSharedManager(GridKernalContext ctx) {
IgniteConfiguration cfg = ctx.config();
persistenceCfg = cfg.getDataStorageConfiguration();
assert persistenceCfg != null;
checkpointFreq = persistenceCfg.getCheckpointFrequency();
lockWaitTime = persistenceCfg.getLockWaitTime();
persStoreMetrics = new DataStorageMetricsImpl(
persistenceCfg.isMetricsEnabled(),
persistenceCfg.getMetricsRateTimeInterval(),
persistenceCfg.getMetricsSubIntervalCount()
);
}
/**
 * @return Checkpointer instance.
 */
public Checkpointer getCheckpointer() {
return checkpointer;
}
/**
 * For test use only.
 *
 * @param enable {@code True} to enable checkpoints, {@code false} to disable.
 * @return Future completed when the change is applied by the checkpointer.
 */
public IgniteInternalFuture<Void> enableCheckpoints(boolean enable) {
GridFutureAdapter<Void> fut = new GridFutureAdapter<>();
enableChangeApplied = fut;
checkpointsEnabled = enable;
wakeupForCheckpoint("enableCheckpoints()");
return fut;
}
/** {@inheritDoc} */
@Override protected void start0() throws IgniteCheckedException {
super.start0();
threadBuf = new ThreadLocal<ByteBuffer>() {
/** {@inheritDoc} */
@Override protected ByteBuffer initialValue() {
ByteBuffer tmpWriteBuf = ByteBuffer.allocateDirect(pageSize());
tmpWriteBuf.order(ByteOrder.nativeOrder());
return tmpWriteBuf;
}
};
snapshotMgr = cctx.snapshot();
final GridKernalContext kernalCtx = cctx.kernalContext();
if (!kernalCtx.clientNode()) {
IgnitePageStoreManager store = cctx.pageStore();
assert store instanceof FilePageStoreManager : "Invalid page store manager was created: " + store;
storeMgr = (FilePageStoreManager)store;
cpDir = Paths.get(storeMgr.workDir().getAbsolutePath(), "cp").toFile();
if (!U.mkdirs(cpDir))
throw new IgniteCheckedException("Could not create directory for checkpoint metadata: " + cpDir);
final FileLockHolder preLocked = kernalCtx.pdsFolderResolver()
.resolveFolders()
.getLockedFileLockHolder();
if (preLocked == null)
fileLockHolder = new FileLockHolder(storeMgr.workDir().getPath(), kernalCtx, log);
persStoreMetrics.wal(cctx.wal());
}
}
/**
 * Initializes the checkpoint runner thread pool if more than one checkpoint thread is configured.
 */
private void initDataBase() {
if (persistenceCfg.getCheckpointThreads() > 1)
asyncRunner = new IgniteThreadPoolExecutor(
"checkpoint-runner",
cctx.igniteInstanceName(),
persistenceCfg.getCheckpointThreads(),
persistenceCfg.getCheckpointThreads(),
30_000,
new LinkedBlockingQueue<Runnable>()
);
}
/**
* Get checkpoint buffer size for the given configuration.
*
* @param regCfg Configuration.
* @return Checkpoint buffer size.
*/
public static long checkpointBufferSize(DataRegionConfiguration regCfg) {
if (!regCfg.isPersistenceEnabled())
return 0L;
long res = regCfg.getCheckpointPageBufferSize();
if (res == 0L) {
if (regCfg.getMaxSize() < GB)
res = Math.min(DFLT_MIN_CHECKPOINTING_PAGE_BUFFER_SIZE, regCfg.getMaxSize());
else if (regCfg.getMaxSize() < 8 * GB)
res = regCfg.getMaxSize() / 4;
else
res = DFLT_MAX_CHECKPOINTING_PAGE_BUFFER_SIZE;
}
return res;
}
/** {@inheritDoc} */
@Override public void onActivate(GridKernalContext ctx) throws IgniteCheckedException {
if (log.isDebugEnabled())
log.debug("Activate database manager [id=" + cctx.localNodeId() +
" topVer=" + cctx.discovery().topologyVersionEx() + " ]");
snapshotMgr = cctx.snapshot();
if (!cctx.localNode().isClient()) {
initDataBase();
registrateMetricsMBean();
}
super.onActivate(ctx);
}
/** {@inheritDoc} */
@Override public void onDeActivate(GridKernalContext kctx) {
if (log.isDebugEnabled())
log.debug("DeActivate database manager [id=" + cctx.localNodeId() +
" topVer=" + cctx.discovery().topologyVersionEx() + " ]");
onKernalStop0(false);
stop0(false);
/* Must be reset here: activate() may be invoked after deactivation, and the file lock must be configured again by then. */
stopping = false;
if (!cctx.localNode().isClient()) {
// Replace the lock with a new instance (only if this node is responsible for locking folders).
if (fileLockHolder != null)
fileLockHolder = new FileLockHolder(storeMgr.workDir().getPath(), cctx.kernalContext(), log);
}
}
/**
* Try to register Metrics MBean.
*
* @throws IgniteCheckedException If failed.
*/
private void registrateMetricsMBean() throws IgniteCheckedException {
if (U.IGNITE_MBEANS_DISABLED)
return;
try {
persistenceMetricsMbeanName = U.registerMBean(
cctx.kernalContext().config().getMBeanServer(),
cctx.kernalContext().igniteInstanceName(),
MBEAN_GROUP,
MBEAN_NAME,
persStoreMetrics,
DataStorageMetricsMXBean.class);
}
catch (Throwable e) {
throw new IgniteCheckedException("Failed to register " + MBEAN_NAME + " MBean.", e);
}
}
/**
* Unregister metrics MBean.
*/
private void unRegistrateMetricsMBean() {
if (persistenceMetricsMbeanName == null)
return;
assert !U.IGNITE_MBEANS_DISABLED;
try {
cctx.kernalContext().config().getMBeanServer().unregisterMBean(persistenceMetricsMbeanName);
persistenceMetricsMbeanName = null;
}
catch (Throwable e) {
U.error(log, "Failed to unregister " + MBEAN_NAME + " MBean.", e);
}
}
/** {@inheritDoc} */
@Override protected IgniteOutClosure<Float> fillFactorProvider(final DataRegionConfiguration dataRegCfg) {
if (!dataRegCfg.isPersistenceEnabled())
return super.fillFactorProvider(dataRegCfg);
final String dataRegName = dataRegCfg.getName();
return new IgniteOutClosure<Float>() {
@Override public Float apply() {
long loadSize = 0L;
long totalSize = 0L;
for (CacheGroupContext grpCtx : cctx.cache().cacheGroups()) {
if (!grpCtx.dataRegion().config().getName().equals(dataRegName))
continue;
assert grpCtx.offheap() instanceof GridCacheOffheapManager;
T2<Long, Long> fillFactor = ((GridCacheOffheapManager)grpCtx.offheap()).fillFactor();
loadSize += fillFactor.get1();
totalSize += fillFactor.get2();
}
if (totalSize == 0)
return (float)0;
return (float)loadSize / totalSize;
}
};
}
/** {@inheritDoc} */
@Override public void readCheckpointAndRestoreMemory(
List<DynamicCacheDescriptor> cachesToStart) throws IgniteCheckedException {
checkpointReadLock();
try {
if (!F.isEmpty(cachesToStart)) {
for (DynamicCacheDescriptor desc : cachesToStart) {
if (CU.affinityNode(cctx.localNode(), desc.cacheConfiguration().getNodeFilter()))
storeMgr.initializeForCache(desc.groupDescriptor(), new StoredCacheData(desc.cacheConfiguration()));
}
}
CheckpointStatus status = readCheckpointStatus();
// First, bring memory to the last consistent checkpoint state if needed.
// This method should return a pointer to the last valid record in the WAL.
WALPointer restore = restoreMemory(status);
cctx.wal().resumeLogging(restore);
cctx.wal().log(new MemoryRecoveryRecord(U.currentTimeMillis()));
}
catch (StorageException e) {
throw new IgniteCheckedException(e);
}
finally {
checkpointReadUnlock();
}
}
/** {@inheritDoc} */
@Override public void lock() throws IgniteCheckedException {
if (fileLockHolder != null) {
if (log.isDebugEnabled())
log.debug("Try to capture file lock [nodeId=" +
cctx.localNodeId() + " path=" + fileLockHolder.lockPath() + "]");
fileLockHolder.tryLock(lockWaitTime);
}
}
/** {@inheritDoc} */
@Override public void unLock() {
if (fileLockHolder != null) {
if (log.isDebugEnabled())
log.debug("Release file lock [nodeId=" +
cctx.localNodeId() + " path=" + fileLockHolder.lockPath() + "]");
fileLockHolder.release();
}
}
/** {@inheritDoc} */
@Override protected void onKernalStop0(boolean cancel) {
checkpointLock.writeLock().lock();
try {
stopping = true;
}
finally {
checkpointLock.writeLock().unlock();
}
shutdownCheckpointer(cancel);
lsnrs.clear();
super.onKernalStop0(cancel);
if (!cctx.kernalContext().clientNode()) {
unLock();
if (fileLockHolder != null)
fileLockHolder.close();
}
unRegistrateMetricsMBean();
}
/**
 * @param concLvl Concurrency level (number of segments); values below 2 fall back to the number of available processors.
 * @param cacheSize Data region maximum size in bytes.
 * @param chpBufSize Checkpoint buffer size in bytes.
 * @return Segment sizes; the last element is the checkpoint buffer size.
 */
private long[] calculateFragmentSizes(int concLvl, long cacheSize, long chpBufSize) {
if (concLvl < 2)
concLvl = Runtime.getRuntime().availableProcessors();
long fragmentSize = cacheSize / concLvl;
if (fragmentSize < 1024 * 1024)
fragmentSize = 1024 * 1024;
long[] sizes = new long[concLvl + 1];
for (int i = 0; i < concLvl; i++)
sizes[i] = fragmentSize;
sizes[concLvl] = chpBufSize;
return sizes;
}
/** {@inheritDoc} */
@Override protected PageMemory createPageMemory(
DirectMemoryProvider memProvider,
DataStorageConfiguration memCfg,
DataRegionConfiguration plcCfg,
DataRegionMetricsImpl memMetrics
) {
if (!plcCfg.isPersistenceEnabled())
return super.createPageMemory(memProvider, memCfg, plcCfg, memMetrics);
memMetrics.persistenceEnabled(true);
long cacheSize = plcCfg.getMaxSize();
// Checkpoint buffer size cannot be greater than the region size; a larger buffer makes no sense.
long chpBufSize = checkpointBufferSize(plcCfg);
if (chpBufSize > cacheSize) {
U.quietAndInfo(log,
"Configured checkpoint page buffer size is too big, setting to the max region size [size="
+ U.readableSize(cacheSize, false) + ", memPlc=" + plcCfg.getName() + ']');
chpBufSize = cacheSize;
}
boolean writeThrottlingEnabled = persistenceCfg.isWriteThrottlingEnabled();
if (IgniteSystemProperties.getBoolean(IgniteSystemProperties.IGNITE_OVERRIDE_WRITE_THROTTLING_ENABLED, false))
writeThrottlingEnabled = true;
PageMemoryImpl pageMem = new PageMemoryImpl(
memProvider,
calculateFragmentSizes(
memCfg.getConcurrencyLevel(),
cacheSize,
chpBufSize
),
cctx,
memCfg.getPageSize(),
new GridInClosure3X<FullPageId, ByteBuffer, Integer>() {
@Override public void applyx(
FullPageId fullId,
ByteBuffer pageBuf,
Integer tag
) throws IgniteCheckedException {
// First of all, write page to disk.
storeMgr.write(fullId.groupId(), fullId.pageId(), pageBuf, tag);
// Only after the write can the page be copied into the snapshot.
snapshotMgr.flushDirtyPageHandler(fullId, pageBuf, tag);
}
},
new GridInClosure3X<Long, FullPageId, PageMemoryEx>() {
@Override public void applyx(
Long page,
FullPageId fullId,
PageMemoryEx pageMem
) throws IgniteCheckedException {
snapshotMgr.onChangeTrackerPage(page, fullId, pageMem);
}
},
this,
memMetrics,
writeThrottlingEnabled
);
memMetrics.pageMemory(pageMem);
return pageMem;
}
/** {@inheritDoc} */
@Override protected void checkRegionEvictionProperties(DataRegionConfiguration regCfg, DataStorageConfiguration dbCfg)
throws IgniteCheckedException {
if (!regCfg.isPersistenceEnabled()) {
super.checkRegionEvictionProperties(regCfg, dbCfg);
return;
}
if (regCfg.getPageEvictionMode() != DataPageEvictionMode.DISABLED)
U.warn(log, "Page eviction mode for [" + regCfg.getName() + "] memory region is ignored " +
"because Ignite Native Persistence is enabled");
}
/** {@inheritDoc} */
@Override protected void checkPageSize(DataStorageConfiguration memCfg) {
if (memCfg.getPageSize() == 0) {
try {
assert cctx.pageStore() instanceof FilePageStoreManager :
"Invalid page store manager was created: " + cctx.pageStore();
Path anyIdxPartFile = IgniteUtils.searchFileRecursively(
((FilePageStoreManager)cctx.pageStore()).workDir().toPath(), FilePageStoreManager.INDEX_FILE_NAME);
if (anyIdxPartFile != null) {
memCfg.setPageSize(resolvePageSizeFromPartitionFile(anyIdxPartFile));
return;
}
}
catch (IgniteCheckedException | IOException | IllegalArgumentException e) {
U.quietAndWarn(log, "Attempt to resolve pageSize from store files failed: " + e.getMessage());
U.quietAndWarn(log, "Default page size will be used: " + DataStorageConfiguration.DFLT_PAGE_SIZE + " bytes");
}
memCfg.setPageSize(DataStorageConfiguration.DFLT_PAGE_SIZE);
}
}
/**
 * @param partFile Partition file.
 * @return Page size read from the partition file header.
 * @throws IOException If an I/O error occurs.
 * @throws IgniteCheckedException If the partition file is too small to contain a header.
 */
private int resolvePageSizeFromPartitionFile(Path partFile) throws IOException, IgniteCheckedException {
try (FileIO fileIO = persistenceCfg.getFileIOFactory().create(partFile.toFile())) {
int minimalHdr = FilePageStore.HEADER_SIZE;
if (fileIO.size() < minimalHdr)
throw new IgniteCheckedException("Partition file is too small: " + partFile);
ByteBuffer hdr = ByteBuffer.allocate(minimalHdr).order(ByteOrder.LITTLE_ENDIAN);
while (hdr.remaining() > 0)
fileIO.read(hdr);
hdr.rewind();
hdr.getLong(); // Read signature.
hdr.getInt(); // Read version.
hdr.get(); // Read type.
int pageSize = hdr.getInt();
if (pageSize == 2048) {
U.quietAndWarn(log, "You are currently using persistent store with 2K pages (DataStorageConfiguration#" +
"pageSize). If you use SSD disk, consider migrating to 4K pages for better IO performance.");
}
return pageSize;
}
}
/**
* @param cancel Cancel flag.
*/
@SuppressWarnings("unused")
private void shutdownCheckpointer(boolean cancel) {
Checkpointer cp = checkpointer;
if (cp != null) {
if (cancel)
cp.shutdownNow();
else
cp.cancel();
try {
U.join(cp);
checkpointer = null;
}
catch (IgniteInterruptedCheckedException ignore) {
U.warn(log, "Was interrupted while waiting for checkpointer shutdown, " +
"will not wait for checkpoint to finish.");
cp.shutdownNow();
while (true) {
try {
U.join(cp);
checkpointer = null;
cp.scheduledCp.cpFinishFut.onDone(
new NodeStoppingException("Checkpointer is stopped during node stop."));
break;
}
catch (IgniteInterruptedCheckedException ignored) {
//Ignore
}
}
Thread.currentThread().interrupt();
}
}
if (asyncRunner != null) {
asyncRunner.shutdownNow();
try {
asyncRunner.awaitTermination(2, TimeUnit.MINUTES);
}
catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
}
}
/** {@inheritDoc} */
@Override public void beforeExchange(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
DiscoveryEvent discoEvt = fut.firstEvent();
boolean joinEvt = discoEvt.type() == EventType.EVT_NODE_JOINED;
boolean locNode = discoEvt.eventNode().isLocal();
boolean isSrvNode = !cctx.kernalContext().clientNode();
boolean clusterInTransitionStateToActive = fut.activateCluster();
// Before local node join event.
if (clusterInTransitionStateToActive || (joinEvt && locNode && isSrvNode))
restoreState();
if (cctx.kernalContext().query().moduleEnabled()) {
for (final GridCacheContext cacheCtx : (Collection<GridCacheContext>)cctx.cacheContexts()) {
if (cacheCtx.startTopologyVersion().equals(fut.initialVersion()) &&
!cctx.pageStore().hasIndexStore(cacheCtx.groupId()) && cacheCtx.affinityNode()) {
final int cacheId = cacheCtx.cacheId();
final IgniteInternalFuture<?> rebuildFut = cctx.kernalContext().query()
.rebuildIndexesFromHash(Collections.singletonList(cacheCtx.cacheId()));
idxRebuildFuts.put(cacheId, rebuildFut);
rebuildFut.listen(new CI1<IgniteInternalFuture>() {
@Override public void apply(IgniteInternalFuture igniteInternalFut) {
idxRebuildFuts.remove(cacheId, rebuildFut);
log().info("Finished indexes rebuilding for cache: [name=" + cacheCtx.config().getName()
+ ", grpName=" + cacheCtx.config().getGroupName() + ']');
}
});
}
}
}
}
/** {@inheritDoc} */
@Nullable @Override public IgniteInternalFuture indexRebuildFuture(int cacheId) {
return idxRebuildFuts.get(cacheId);
}
/** {@inheritDoc} */
@Override public void onCacheGroupsStopped(
Collection<IgniteBiTuple<CacheGroupContext, Boolean>> stoppedGrps
) {
Map<PageMemoryEx, Collection<Integer>> destroyed = new HashMap<>();
for (IgniteBiTuple<CacheGroupContext, Boolean> tup : stoppedGrps) {
CacheGroupContext gctx = tup.get1();
if (!gctx.persistenceEnabled())
continue;
snapshotMgr.onCacheGroupStop(gctx);
PageMemoryEx pageMem = (PageMemoryEx)gctx.dataRegion().pageMemory();
Collection<Integer> grpIds = destroyed.get(pageMem);
if (grpIds == null) {
grpIds = new HashSet<>();
destroyed.put(pageMem, grpIds);
}
grpIds.add(tup.get1().groupId());
pageMem.onCacheGroupDestroyed(tup.get1().groupId());
}
Collection<IgniteInternalFuture<Void>> clearFuts = new ArrayList<>(destroyed.size());
for (Map.Entry<PageMemoryEx, Collection<Integer>> entry : destroyed.entrySet()) {
final Collection<Integer> grpIds = entry.getValue();
clearFuts.add(entry.getKey().clearAsync(new P3<Integer, Long, Integer>() {
@Override public boolean apply(Integer grpId, Long pageId, Integer tag) {
return grpIds.contains(grpId);
}
}, false));
}
for (IgniteInternalFuture<Void> clearFut : clearFuts) {
try {
clearFut.get();
}
catch (IgniteCheckedException e) {
log.error("Failed to clear page memory", e);
}
}
if (cctx.pageStore() != null) {
for (IgniteBiTuple<CacheGroupContext, Boolean> tup : stoppedGrps) {
CacheGroupContext grp = tup.get1();
if (grp.affinityNode()) {
try {
cctx.pageStore().shutdownForCacheGroup(grp, tup.get2());
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to gracefully clean page store resources for destroyed cache " +
"[cache=" + grp.cacheOrGroupName() + "]", e);
}
}
}
}
}
/**
 * Gets the checkpoint read lock. While this lock is held, the checkpoint thread will not be able
 * to capture a consistent memory state (checkpoint begin is blocked).
 */
@SuppressWarnings("LockAcquiredButNotSafelyReleased")
@Override public void checkpointReadLock() {
if (checkpointLock.writeLock().isHeldByCurrentThread())
return;
for (; ; ) {
checkpointLock.readLock().lock();
if (stopping) {
checkpointLock.readLock().unlock();
throw new RuntimeException("Failed to perform cache update: node is stopping.");
}
if (safeToUpdatePageMemories() || checkpointLock.getReadHoldCount() > 1)
break;
else {
checkpointLock.readLock().unlock();
try {
checkpointer.wakeupForCheckpoint(0, "too many dirty pages").cpBeginFut.getUninterruptibly();
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to wait for checkpoint begin.", e);
}
}
}
if (ASSERTION_ENABLED)
CHECKPOINT_LOCK_HOLD_COUNT.set(CHECKPOINT_LOCK_HOLD_COUNT.get() + 1);
}
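// Typical usage of the checkpoint read lock (a minimal sketch; 'db' denotes this manager):
//
// db.checkpointReadLock();
// try {
//     // ... update page memory under the checkpoint read lock ...
// }
// finally {
//     db.checkpointReadUnlock();
// }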
/** {@inheritDoc} */
@Override public boolean checkpointLockIsHeldByThread() {
return !ASSERTION_ENABLED ||
checkpointLock.isWriteLockedByCurrentThread() ||
CHECKPOINT_LOCK_HOLD_COUNT.get() > 0;
}
/**
* @return {@code true} if all PageMemory instances are safe to update.
*/
private boolean safeToUpdatePageMemories() {
Collection<DataRegion> memPlcs = context().database().dataRegions();
if (memPlcs == null)
return true;
for (DataRegion memPlc : memPlcs) {
if (!memPlc.config().isPersistenceEnabled())
continue;
PageMemoryEx pageMemEx = (PageMemoryEx)memPlc.pageMemory();
if (!pageMemEx.safeToUpdate())
return false;
}
return true;
}
/**
* Releases the checkpoint read lock.
*/
@Override public void checkpointReadUnlock() {
if (checkpointLock.writeLock().isHeldByCurrentThread())
return;
checkpointLock.readLock().unlock();
if (checkpointer != null) {
Collection<DataRegion> dataRegs = context().database().dataRegions();
if (dataRegs != null) {
for (DataRegion dataReg : dataRegs) {
if (!dataReg.config().isPersistenceEnabled())
continue;
PageMemoryEx mem = (PageMemoryEx)dataReg.pageMemory();
if (mem != null && !mem.safeToUpdate()) {
checkpointer.wakeupForCheckpoint(0, "too many dirty pages");
break;
}
}
}
}
if (ASSERTION_ENABLED)
CHECKPOINT_LOCK_HOLD_COUNT.set(CHECKPOINT_LOCK_HOLD_COUNT.get() - 1);
}
/**
* @throws IgniteCheckedException If failed to restore database status from WAL.
*/
private void restoreState() throws IgniteCheckedException {
try {
CheckpointStatus status = readCheckpointStatus();
checkpointReadLock();
try {
applyLastUpdates(status);
}
finally {
checkpointReadUnlock();
}
snapshotMgr.restoreState();
checkpointer = new Checkpointer(cctx.igniteInstanceName(), "db-checkpoint-thread", log);
new IgniteThread(cctx.igniteInstanceName(), "db-checkpoint-thread", checkpointer).start();
}
catch (StorageException e) {
throw new IgniteCheckedException(e);
}
}
/** {@inheritDoc} */
@Override public synchronized Map<Integer, Map<Integer, Long>> reserveHistoryForExchange() {
assert reservedForExchange == null : reservedForExchange;
reservedForExchange = new HashMap<>();
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
if (grp.isLocal())
continue;
for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions()) {
if (part.state() != GridDhtPartitionState.OWNING || part.dataStore().fullSize() <= walRebalanceThreshold)
continue;
CheckpointEntry cpEntry = searchCheckpointEntry(grp.groupId(), part.id(), null);
try {
if (cpEntry != null && cctx.wal().reserve(cpEntry.cpMark)) {
Map<Integer, T2<Long, WALPointer>> cacheMap = reservedForExchange.get(grp.groupId());
if (cacheMap == null) {
cacheMap = new HashMap<>();
reservedForExchange.put(grp.groupId(), cacheMap);
}
cacheMap.put(part.id(), new T2<>(cpEntry.partitionCounter(grp.groupId(), part.id()), cpEntry.cpMark));
}
}
catch (IgniteCheckedException ex) {
U.error(log, "Error while trying to reserve history", ex);
}
}
}
Map<Integer, Map<Integer, Long>> resMap = new HashMap<>();
for (Map.Entry<Integer, Map<Integer, T2<Long, WALPointer>>> e : reservedForExchange.entrySet()) {
Map<Integer, Long> cacheMap = new HashMap<>();
for (Map.Entry<Integer, T2<Long, WALPointer>> e0 : e.getValue().entrySet())
cacheMap.put(e0.getKey(), e0.getValue().get1());
resMap.put(e.getKey(), cacheMap);
}
return resMap;
}
/** {@inheritDoc} */
@Override public synchronized void releaseHistoryForExchange() {
if (reservedForExchange == null)
return;
for (Map.Entry<Integer, Map<Integer, T2<Long, WALPointer>>> e : reservedForExchange.entrySet()) {
for (Map.Entry<Integer, T2<Long, WALPointer>> e0 : e.getValue().entrySet()) {
try {
cctx.wal().release(e0.getValue().get2());
}
catch (IgniteCheckedException ex) {
U.error(log, "Could not release history lock", ex);
}
}
}
reservedForExchange = null;
}
/** {@inheritDoc} */
@Override public boolean reserveHistoryForPreloading(int grpId, int partId, long cntr) {
CheckpointEntry cpEntry = searchCheckpointEntry(grpId, partId, cntr);
if (cpEntry == null)
return false;
WALPointer ptr = cpEntry.cpMark;
if (ptr == null)
return false;
boolean reserved;
try {
reserved = cctx.wal().reserve(ptr);
}
catch (IgniteCheckedException e) {
U.error(log, "Error while trying to reserve history", e);
reserved = false;
}
if (reserved)
reservedForPreloading.put(new T2<>(grpId, partId), new T2<>(cntr, ptr));
return reserved;
}
/** {@inheritDoc} */
@Override public void releaseHistoryForPreloading() {
for (Map.Entry<T2<Integer, Integer>, T2<Long, WALPointer>> e : reservedForPreloading.entrySet()) {
try {
cctx.wal().release(e.getValue().get2());
}
catch (IgniteCheckedException ex) {
U.error(log, "Could not release WAL reservation", ex);
throw new IgniteException(ex);
}
}
reservedForPreloading.clear();
}
/**
* For debugging only. TODO: remove.
*/
public Map<T2<Integer, Integer>, T2<Long, WALPointer>> reservedForPreloading() {
return reservedForPreloading;
}
/** {@inheritDoc} */
@Nullable @Override public IgniteInternalFuture wakeupForCheckpoint(String reason) {
Checkpointer cp = checkpointer;
if (cp != null)
return cp.wakeupForCheckpoint(0, reason).cpBeginFut;
return null;
}
/** {@inheritDoc} */
@Override public void waitForCheckpoint(String reason) throws IgniteCheckedException {
Checkpointer cp = checkpointer;
if (cp == null)
return;
CheckpointProgressSnapshot progSnapshot = cp.wakeupForCheckpoint(0, reason);
IgniteInternalFuture fut1 = progSnapshot.cpFinishFut;
fut1.get();
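// If the woken-up checkpoint was already in progress, it may not cover updates made before this
// call, so one more checkpoint is triggered below and awaited as well.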
if (!progSnapshot.started)
return;
IgniteInternalFuture fut2 = cp.wakeupForCheckpoint(0, reason).cpFinishFut;
assert fut1 != fut2;
fut2.get();
}
/**
* Tries to search for a WAL pointer for the given partition counter start.
*
* @param grpId Cache group ID.
* @param part Partition ID.
* @param partCntrSince Partition counter or {@code null} to search for minimal counter.
* @return WAL pointer of the matching checkpoint entry, or {@code null} if none was found.
*/
@Nullable public WALPointer searchPartitionCounter(int grpId, int part, @Nullable Long partCntrSince) {
CheckpointEntry entry = searchCheckpointEntry(grpId, part, partCntrSince);
if (entry == null)
return null;
return entry.cpMark;
}
/**
* Tries to search for a WAL pointer for the given partition counter start.
*
* @param grpId Cache group ID.
* @param part Partition ID.
* @param partCntrSince Partition counter or {@code null} to search for minimal counter.
* @return Checkpoint entry or {@code null} if failed to search.
*/
@Nullable private CheckpointEntry searchCheckpointEntry(int grpId, int part, @Nullable Long partCntrSince) {
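// Scan checkpoint history from the oldest entry: 'first' tracks the earliest usable entry after
// the last gap. A gap (a checkpoint with no counter for this partition, or an unreadable entry)
// invalidates older history, because rebalancing from WAL needs a continuous counter sequence.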
boolean hasGap = false;
CheckpointEntry first = null;
for (Long cpTs : checkpointHist.checkpoints()) {
try {
CheckpointEntry entry = checkpointHist.entry(cpTs);
Long foundCntr = entry.partitionCounter(grpId, part);
if (foundCntr != null) {
if (partCntrSince == null) {
if (hasGap) {
first = entry;
hasGap = false;
}
if (first == null)
first = entry;
}
else if (foundCntr <= partCntrSince) {
first = entry;
hasGap = false;
}
else
return hasGap ? null : first;
}
else
hasGap = true;
}
catch (IgniteCheckedException ignore) {
// Treat exception the same way as a gap.
hasGap = true;
}
}
return hasGap ? null : first;
}
/**
* @return Checkpoint history. For tests only.
*/
public CheckpointHistory checkpointHistory() {
return checkpointHist;
}
/**
* @return Checkpoint directory.
*/
public File checkpointDirectory() {
return cpDir;
}
/**
* @param lsnr Listener.
*/
public void addCheckpointListener(DbCheckpointListener lsnr) {
lsnrs.add(lsnr);
}
/**
* @param lsnr Listener.
*/
public void removeCheckpointListener(DbCheckpointListener lsnr) {
lsnrs.remove(lsnr);
}
/**
* @return Checkpoint status read from the checkpoint marker files.
* @throws IgniteCheckedException If failed to read checkpoint status page.
*/
@SuppressWarnings("TooBroadScope")
private CheckpointStatus readCheckpointStatus() throws IgniteCheckedException {
long lastStartTs = 0;
long lastEndTs = 0;
UUID startId = CheckpointStatus.NULL_UUID;
UUID endId = CheckpointStatus.NULL_UUID;
File startFile = null;
File endFile = null;
WALPointer startPtr = CheckpointStatus.NULL_PTR;
WALPointer endPtr = CheckpointStatus.NULL_PTR;
File dir = cpDir;
if (!dir.exists()) {
// TODO: remove excessive logging after GG-12116 fix.
File[] files = dir.listFiles();
if (files != null && files.length > 0) {
log.warning("Read checkpoint status: cpDir.exists() is false, cpDir.listFiles() is: " +
Arrays.toString(files));
}
if (Files.exists(dir.toPath()))
log.warning("Read checkpoint status: cpDir.exists() is false, Files.exists(cpDir) is true.");
if (log.isInfoEnabled())
log.info("Read checkpoint status: checkpoint directory is not found.");
return new CheckpointStatus(0, startId, startPtr, endId, endPtr);
}
File[] files = dir.listFiles();
for (File file : files) {
Matcher matcher = CP_FILE_NAME_PATTERN.matcher(file.getName());
if (matcher.matches()) {
long ts = Long.parseLong(matcher.group(1));
UUID id = UUID.fromString(matcher.group(2));
CheckpointEntryType type = CheckpointEntryType.valueOf(matcher.group(3));
if (type == CheckpointEntryType.START && ts > lastStartTs) {
lastStartTs = ts;
startId = id;
startFile = file;
}
else if (type == CheckpointEntryType.END && ts > lastEndTs) {
lastEndTs = ts;
endId = id;
endFile = file;
}
}
}
ByteBuffer buf = ByteBuffer.allocate(20);
buf.order(ByteOrder.nativeOrder());
if (startFile != null)
startPtr = readPointer(startFile, buf);
if (endFile != null)
endPtr = readPointer(endFile, buf);
if (log.isInfoEnabled())
log.info("Read checkpoint status [startMarker=" + startFile + ", endMarker=" + endFile + ']');
return new CheckpointStatus(lastStartTs, startId, startPtr, endId, endPtr);
}
/**
 * Loads WAL pointer from a checkpoint marker file.
 *
 * @param cpMarkerFile Checkpoint marker file.
 * @param buf Byte buffer to read the pointer into.
 * @return WAL pointer.
 * @throws IgniteCheckedException If failed to read the marker file.
 */
private WALPointer readPointer(File cpMarkerFile, ByteBuffer buf) throws IgniteCheckedException {
buf.position(0);
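// The marker stores a serialized FileWALPointer: 8-byte segment index, 4-byte file offset and
// 4-byte record length, in native byte order (see writeCheckpointEntry).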
try (FileChannel ch = FileChannel.open(cpMarkerFile.toPath(), READ)) {
ch.read(buf);
buf.flip();
return new FileWALPointer(buf.getLong(), buf.getInt(), buf.getInt());
}
catch (IOException e) {
throw new IgniteCheckedException("Failed to read checkpoint pointer from marker file: " +
cpMarkerFile.getAbsolutePath(), e);
}
}
/**
 * @param status Checkpoint status.
 * @return WAL pointer to the record following the last one read, or {@code null} if nothing was read.
 * @throws IgniteCheckedException If memory restore failed.
 */
private WALPointer restoreMemory(CheckpointStatus status) throws IgniteCheckedException {
if (log.isInfoEnabled())
log.info("Checking memory state [lastValidPos=" + status.endPtr + ", lastMarked="
+ status.startPtr + ", lastCheckpointId=" + status.cpStartId + ']');
boolean apply = status.needRestoreMemory();
if (apply) {
U.quietAndWarn(log, "Ignite node stopped in the middle of checkpoint. Will restore memory state and " +
"finish checkpoint on node start.");
cctx.pageStore().beginRecover();
}
long start = U.currentTimeMillis();
int applied = 0;
WALPointer lastRead = null;
try (WALIterator it = cctx.wal().replay(status.endPtr)) {
while (it.hasNextX()) {
IgniteBiTuple<WALPointer, WALRecord> tup = it.nextX();
WALRecord rec = tup.get2();
lastRead = tup.get1();
switch (rec.type()) {
case CHECKPOINT_RECORD:
CheckpointRecord cpRec = (CheckpointRecord)rec;
// We roll memory up until we find a checkpoint start record registered in the status.
if (F.eq(cpRec.checkpointId(), status.cpStartId)) {
log.info("Found last checkpoint marker [cpId=" + cpRec.checkpointId() +
", pos=" + tup.get1() + ']');
apply = false;
}
else if (!F.eq(cpRec.checkpointId(), status.cpEndId))
U.warn(log, "Found unexpected checkpoint marker, skipping [cpId=" + cpRec.checkpointId() +
", expCpId=" + status.cpStartId + ", pos=" + tup.get1() + ']');
break;
case PAGE_RECORD:
if (apply) {
PageSnapshot pageRec = (PageSnapshot)rec;
// Here we do not require tag check because we may be applying memory changes after
// several repetitive restarts and the same pages may have changed several times.
int grpId = pageRec.fullPageId().groupId();
long pageId = pageRec.fullPageId().pageId();
PageMemoryEx pageMem = getPageMemoryForCacheGroup(grpId);
long page = pageMem.acquirePage(grpId, pageId, true);
try {
long pageAddr = pageMem.writeLock(grpId, pageId, page);
try {
PageUtils.putBytes(pageAddr, 0, pageRec.pageData());
}
finally {
pageMem.writeUnlock(grpId, pageId, page, null, true, true);
}
}
finally {
pageMem.releasePage(grpId, pageId, page);
}
applied++;
}
break;
case PARTITION_DESTROY:
if (apply) {
PartitionDestroyRecord destroyRec = (PartitionDestroyRecord)rec;
final int gId = destroyRec.groupId();
final int pId = destroyRec.partitionId();
PageMemoryEx pageMem = getPageMemoryForCacheGroup(gId);
pageMem.clearAsync(new P3<Integer, Long, Integer>() {
@Override public boolean apply(Integer cacheId, Long pageId, Integer tag) {
return cacheId == gId && PageIdUtils.partId(pageId) == pId;
}
}, true).get();
}
break;
default:
if (apply && rec instanceof PageDeltaRecord) {
PageDeltaRecord r = (PageDeltaRecord)rec;
int grpId = r.groupId();
long pageId = r.pageId();
PageMemoryEx pageMem = getPageMemoryForCacheGroup(grpId);
// Here we do not require tag check because we may be applying memory changes after
// several repetitive restarts and the same pages may have changed several times.
long page = pageMem.acquirePage(grpId, pageId, true);
try {
long pageAddr = pageMem.writeLock(grpId, pageId, page);
try {
r.applyDelta(pageMem, pageAddr);
}
finally {
pageMem.writeUnlock(grpId, pageId, page, null, true, true);
}
}
finally {
pageMem.releasePage(grpId, pageId, page);
}
applied++;
}
}
}
}
if (status.needRestoreMemory()) {
if (apply)
throw new IgniteCheckedException("Failed to restore memory state (checkpoint marker is present " +
"on disk, but checkpoint record is missed in WAL) " +
"[cpStatus=" + status + ", lastRead=" + lastRead + "]");
log.info("Finished applying memory changes [changesApplied=" + applied +
", time=" + (U.currentTimeMillis() - start) + "ms]");
if (applied > 0)
finalizeCheckpointOnRecovery(status.cpStartTs, status.cpStartId, status.startPtr);
}
checkpointHist.loadHistory(cpDir);
return lastRead == null ? null : lastRead.next();
}
/**
* Obtains PageMemory reference from cache descriptor instead of cache context.
*
* @param grpId Cache group id.
* @return PageMemoryEx instance.
* @throws IgniteCheckedException if no DataRegion is configured for a name obtained from cache descriptor.
*/
private PageMemoryEx getPageMemoryForCacheGroup(int grpId) throws IgniteCheckedException {
// TODO IGNITE-5075: cache descriptor can be removed.
GridCacheSharedContext sharedCtx = context();
CacheGroupDescriptor desc = sharedCtx.cache().cacheGroupDescriptors().get(grpId);
if (desc == null)
throw new IgniteCheckedException("Failed to find cache group descriptor [grpId=" + grpId + ']');
String memPlcName = desc.config().getDataRegionName();
return (PageMemoryEx)sharedCtx.database().dataRegion(memPlcName).pageMemory();
}
/**
* @param status Last registered checkpoint status.
* @throws IgniteCheckedException If failed to apply updates.
* @throws StorageException If IO exception occurred while reading write-ahead log.
*/
private void applyLastUpdates(CheckpointStatus status) throws IgniteCheckedException {
if (log.isInfoEnabled())
log.info("Applying lost cache updates since last checkpoint record [lastMarked="
+ status.startPtr + ", lastCheckpointId=" + status.cpStartId + ']');
cctx.kernalContext().query().skipFieldLookup(true);
long start = U.currentTimeMillis();
int applied = 0;
try (WALIterator it = cctx.wal().replay(status.startPtr)) {
Map<T2<Integer, Integer>, T2<Integer, Long>> partStates = new HashMap<>();
while (it.hasNextX()) {
IgniteBiTuple<WALPointer, WALRecord> next = it.nextX();
WALRecord rec = next.get2();
switch (rec.type()) {
case DATA_RECORD:
DataRecord dataRec = (DataRecord)rec;
for (DataEntry dataEntry : dataRec.writeEntries()) {
int cacheId = dataEntry.cacheId();
GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
applyUpdate(cacheCtx, dataEntry);
applied++;
}
break;
case PART_META_UPDATE_STATE:
PartitionMetaStateRecord metaStateRecord = (PartitionMetaStateRecord)rec;
partStates.put(new T2<>(metaStateRecord.groupId(), metaStateRecord.partitionId()),
new T2<>((int)metaStateRecord.state(), metaStateRecord.updateCounter()));
break;
default:
// Skip other records.
}
}
restorePartitionState(partStates);
}
finally {
cctx.kernalContext().query().skipFieldLookup(false);
}
if (log.isInfoEnabled())
log.info("Finished applying WAL changes [updatesApplied=" + applied +
", time=" + (U.currentTimeMillis() - start) + "ms]");
}
/**
* @param partStates Partition states.
* @throws IgniteCheckedException If failed to restore.
*/
private void restorePartitionState(
Map<T2<Integer, Integer>, T2<Integer, Long>> partStates
) throws IgniteCheckedException {
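// For every local partition that exists on disk, restore its state from the latest
// PART_META_UPDATE_STATE record seen in the WAL or, if none was logged, from the partition meta page.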
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
if (grp.isLocal() || !grp.affinityNode()) {
// Local caches and non-affinity nodes have no partition states to restore.
continue;
}
if (!grp.dataRegion().config().isPersistenceEnabled())
continue;
int grpId = grp.groupId();
PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
for (int i = 0; i < grp.affinity().partitions(); i++) {
if (storeMgr.exists(grpId, i)) {
storeMgr.ensure(grpId, i);
if (storeMgr.pages(grpId, i) <= 1)
continue;
GridDhtLocalPartition part = grp.topology().forceCreatePartition(i);
assert part != null;
// TODO: https://issues.apache.org/jira/browse/IGNITE-6097
grp.offheap().onPartitionInitialCounterUpdated(i, 0);
long partMetaId = pageMem.partitionMetaPageId(grpId, i);
long partMetaPage = pageMem.acquirePage(grpId, partMetaId);
try {
long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage);
boolean changed = false;
try {
PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.forPage(pageAddr);
T2<Integer, Long> fromWal = partStates.get(new T2<>(grpId, i));
if (fromWal != null) {
int stateId = fromWal.get1();
io.setPartitionState(pageAddr, (byte)stateId);
changed = updateState(part, stateId);
if (stateId == GridDhtPartitionState.OWNING.ordinal()) {
grp.offheap().onPartitionInitialCounterUpdated(i, fromWal.get2());
if (part.initialUpdateCounter() < fromWal.get2()) {
part.initialUpdateCounter(fromWal.get2());
changed = true;
}
}
}
else
changed = updateState(part, (int)io.getPartitionState(pageAddr));
}
finally {
pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, changed);
}
}
finally {
pageMem.releasePage(grpId, partMetaId, partMetaPage);
}
}
}
}
}
/**
* @param part Partition to restore state for.
* @param stateId State enum ordinal.
* @return Updated flag.
*/
private boolean updateState(GridDhtLocalPartition part, int stateId) {
if (stateId != -1) {
GridDhtPartitionState state = GridDhtPartitionState.fromOrdinal(stateId);
assert state != null;
part.restoreState(state == GridDhtPartitionState.EVICTED ? GridDhtPartitionState.RENTING : state);
return true;
}
return false;
}
/**
* @param cacheCtx Cache context to apply an update.
* @param dataEntry Data entry to apply.
* @throws IgniteCheckedException If failed to restore.
*/
private void applyUpdate(GridCacheContext cacheCtx, DataEntry dataEntry) throws IgniteCheckedException {
int partId = dataEntry.partitionId();
if (partId == -1)
partId = cacheCtx.affinity().partition(dataEntry.key());
GridDhtLocalPartition locPart = cacheCtx.topology().forceCreatePartition(partId);
switch (dataEntry.op()) {
case CREATE:
case UPDATE:
cacheCtx.offheap().update(
cacheCtx,
dataEntry.key(),
dataEntry.value(),
dataEntry.writeVersion(),
0L,
locPart,
null);
if (dataEntry.partitionCounter() != 0)
cacheCtx.offheap().onPartitionInitialCounterUpdated(partId, dataEntry.partitionCounter());
break;
case DELETE:
cacheCtx.offheap().remove(cacheCtx, dataEntry.key(), partId, locPart);
if (dataEntry.partitionCounter() != 0)
cacheCtx.offheap().onPartitionInitialCounterUpdated(partId, dataEntry.partitionCounter());
break;
default:
throw new IgniteCheckedException("Invalid operation for WAL entry update: " + dataEntry.op());
}
}
/**
 * @param cpTs Checkpoint timestamp.
 * @param cpId Checkpoint ID.
 * @param walPtr Checkpoint WAL pointer.
 * @throws IgniteCheckedException If failed.
 */
private void finalizeCheckpointOnRecovery(long cpTs, UUID cpId, WALPointer walPtr) throws IgniteCheckedException {
assert cpTs != 0;
ByteBuffer tmpWriteBuf = ByteBuffer.allocateDirect(pageSize());
long start = System.currentTimeMillis();
Collection<DataRegion> memPolicies = context().database().dataRegions();
List<IgniteBiTuple<PageMemory, Collection<FullPageId>>> cpEntities = new ArrayList<>(memPolicies.size());
for (DataRegion memPlc : memPolicies) {
if (memPlc.config().isPersistenceEnabled()) {
PageMemoryEx pageMem = (PageMemoryEx)memPlc.pageMemory();
cpEntities.add(new IgniteBiTuple<PageMemory, Collection<FullPageId>>(
pageMem, (pageMem).beginCheckpoint()));
}
}
tmpWriteBuf.order(ByteOrder.nativeOrder());
// Identity stores set.
Collection<PageStore> updStores = new HashSet<>();
int cpPagesCnt = 0;
for (IgniteBiTuple<PageMemory, Collection<FullPageId>> e : cpEntities) {
PageMemoryEx pageMem = (PageMemoryEx)e.get1();
Collection<FullPageId> cpPages = e.get2();
cpPagesCnt += cpPages.size();
for (FullPageId fullId : cpPages) {
tmpWriteBuf.rewind();
Integer tag = pageMem.getForCheckpoint(fullId, tmpWriteBuf, null);
if (tag != null) {
tmpWriteBuf.rewind();
PageStore store = storeMgr.writeInternal(fullId.groupId(), fullId.pageId(), tmpWriteBuf, tag);
tmpWriteBuf.rewind();
updStores.add(store);
}
}
}
long written = U.currentTimeMillis();
for (PageStore updStore : updStores)
updStore.sync();
long fsync = U.currentTimeMillis();
for (IgniteBiTuple<PageMemory, Collection<FullPageId>> e : cpEntities)
((PageMemoryEx)e.get1()).finishCheckpoint();
writeCheckpointEntry(
tmpWriteBuf,
cpTs,
cpId,
walPtr,
null,
CheckpointEntryType.END);
cctx.pageStore().finishRecover();
if (log.isInfoEnabled())
log.info(String.format("Checkpoint finished [cpId=%s, pages=%d, markPos=%s, " +
"pagesWrite=%dms, fsync=%dms, total=%dms]",
cpId,
cpPagesCnt,
walPtr,
written - start,
fsync - written,
fsync - start));
}
/**
 * @param tmpWriteBuf Temporary write buffer.
 * @param cpTs Checkpoint timestamp.
 * @param cpId Checkpoint ID.
 * @param ptr WAL pointer of the checkpoint.
 * @param rec Checkpoint record ({@code null} for END markers).
 * @param type Checkpoint entry type.
 * @return Checkpoint entry for START markers, {@code null} otherwise.
 * @throws IgniteCheckedException If failed to write the marker file.
 */
private CheckpointEntry writeCheckpointEntry(
ByteBuffer tmpWriteBuf,
long cpTs,
UUID cpId,
WALPointer ptr,
CheckpointRecord rec,
CheckpointEntryType type
) throws IgniteCheckedException {
assert ptr instanceof FileWALPointer;
FileWALPointer filePtr = (FileWALPointer)ptr;
String fileName = checkpointFileName(cpTs, cpId, type);
try (FileChannel ch = FileChannel.open(Paths.get(cpDir.getAbsolutePath(), fileName),
StandardOpenOption.CREATE_NEW, StandardOpenOption.APPEND)) {
tmpWriteBuf.rewind();
tmpWriteBuf.putLong(filePtr.index());
tmpWriteBuf.putInt(filePtr.fileOffset());
tmpWriteBuf.putInt(filePtr.length());
tmpWriteBuf.flip();
ch.write(tmpWriteBuf);
tmpWriteBuf.clear();
if (!skipSync)
ch.force(true);
return type == CheckpointEntryType.START ?
new CheckpointEntry(cpTs, ptr, cpId, rec.cacheGroupStates()) : null;
}
catch (IOException e) {
throw new IgniteCheckedException(e);
}
}
/**
 * @return Counter of written checkpoint pages, or {@code null} if no checkpoint is in progress.
 */
public AtomicInteger writtenPagesCounter() {
return writtenPagesCntr;
}
/**
* @return Number of pages in current checkpoint. If checkpoint is not running, returns 0.
*/
public int currentCheckpointPagesCount() {
return currCheckpointPagesCnt;
}
/**
* @param cpTs Checkpoint timestamp.
* @param cpId Checkpoint ID.
* @param type Checkpoint type.
* @return Checkpoint file name.
*/
private static String checkpointFileName(long cpTs, UUID cpId, CheckpointEntryType type) {
return cpTs + "-" + cpId + "-" + type + ".bin";
}
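// checkpointFileName(...) example with hypothetical arguments cpTs=1514764800000,
// cpId=f3b56ffa-3539-4c80-b035-5e09e0aafb0a and type=START:
// "1514764800000-f3b56ffa-3539-4c80-b035-5e09e0aafb0a-START.bin" (matched by CP_FILE_NAME_PATTERN).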
/**
 * Checkpointer thread: waits for the next scheduled checkpoint, marks checkpoint begin, writes
 * dirty pages to page stores and fsyncs them, then marks checkpoint end.
 */
@SuppressWarnings("NakedNotify")
public class Checkpointer extends GridWorker {
/** Temporary write buffer. */
private final ByteBuffer tmpWriteBuf;
/** Next scheduled checkpoint progress. */
private volatile CheckpointProgress scheduledCp;
/** Current checkpoint. This field is updated only by checkpoint thread. */
private volatile CheckpointProgress curCpProgress;
/** Shutdown now. */
private volatile boolean shutdownNow;
/** Last checkpoint timestamp. */
private long lastCpTs;
/**
* @param gridName Grid name.
* @param name Thread name.
* @param log Logger.
*/
protected Checkpointer(@Nullable String gridName, String name, IgniteLogger log) {
super(gridName, name, log);
scheduledCp = new CheckpointProgress(U.currentTimeMillis() + checkpointFreq);
tmpWriteBuf = ByteBuffer.allocateDirect(pageSize());
tmpWriteBuf.order(ByteOrder.nativeOrder());
}
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
while (!isCancelled()) {
waitCheckpointEvent();
GridFutureAdapter<Void> enableChangeApplied = GridCacheDatabaseSharedManager.this.enableChangeApplied;
if (enableChangeApplied != null) {
enableChangeApplied.onDone();
GridCacheDatabaseSharedManager.this.enableChangeApplied = null;
}
if (checkpointsEnabled)
doCheckpoint();
else {
synchronized (this) {
scheduledCp.nextCpTs = U.currentTimeMillis() + checkpointFreq;
}
}
}
// Final run after the cancellation.
if (checkpointsEnabled && !shutdownNow)
doCheckpoint();
scheduledCp.cpFinishFut.onDone(new NodeStoppingException("Node is stopping."));
}
/**
 * @param delayFromNow Delay from the current time, in milliseconds.
 * @param reason Wakeup reason.
 * @return Snapshot of the scheduled checkpoint progress.
 */
private CheckpointProgressSnapshot wakeupForCheckpoint(long delayFromNow, String reason) {
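// Fast path: if a checkpoint is already scheduled at or before the requested time, return a
// snapshot of it without synchronization; otherwise re-check and reschedule under the monitor.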
CheckpointProgress sched = scheduledCp;
long next = U.currentTimeMillis() + delayFromNow;
if (sched.nextCpTs <= next)
return new CheckpointProgressSnapshot(sched);
CheckpointProgressSnapshot ret;
synchronized (this) {
sched = scheduledCp;
if (sched.nextCpTs > next) {
sched.reason = reason;
sched.nextCpTs = next;
}
ret = new CheckpointProgressSnapshot(sched);
notifyAll();
}
return ret;
}
/**
 * @param snapshotOperation Snapshot operation.
 * @return Future completed when the triggered checkpoint begins.
 */
public IgniteInternalFuture wakeupForSnapshotCreation(SnapshotOperation snapshotOperation) {
GridFutureAdapter<Object> ret;
synchronized (this) {
scheduledCp.nextCpTs = U.currentTimeMillis();
scheduledCp.reason = "snapshot";
scheduledCp.nextSnapshot = true;
scheduledCp.snapshotOperation = snapshotOperation;
ret = scheduledCp.cpBeginFut;
notifyAll();
}
return ret;
}
/**
 * Executes a single checkpoint: marks begin, writes collected dirty pages, fsyncs the updated
 * page stores and marks end.
 */
private void doCheckpoint() {
try {
CheckpointMetricsTracker tracker = new CheckpointMetricsTracker();
Checkpoint chp = markCheckpointBegin(tracker);
currCheckpointPagesCnt = chp.pagesSize;
writtenPagesCntr = new AtomicInteger();
boolean interrupted = true;
try {
if (chp.hasDelta()) {
// Identity stores set.
GridConcurrentHashSet<PageStore> updStores = new GridConcurrentHashSet<>();
CountDownFuture doneWriteFut = new CountDownFuture(
asyncRunner == null ? 1 : chp.cpPages.collectionsSize());
tracker.onPagesWriteStart();
final int totalPagesToWriteCnt = chp.cpPages.size();
if (asyncRunner != null) {
for (int i = 0; i < chp.cpPages.collectionsSize(); i++) {
Runnable write = new WriteCheckpointPages(
tracker,
chp.cpPages.innerCollection(i),
updStores,
doneWriteFut,
totalPagesToWriteCnt
);
try {
asyncRunner.execute(write);
}
catch (RejectedExecutionException ignore) {
// Run the task synchronously.
write.run();
}
}
}
else {
// Single-threaded checkpoint.
Runnable write = new WriteCheckpointPages(tracker,
chp.cpPages,
updStores,
doneWriteFut,
totalPagesToWriteCnt);
write.run();
}
// Wait and check for errors.
doneWriteFut.get();
// Must re-check the shutdown flag here because writer threads may have skipped some pages.
// If so, we must not write the checkpoint end mark.
if (shutdownNow) {
chp.progress.cpFinishFut.onDone(new NodeStoppingException("Node is stopping."));
return;
}
tracker.onFsyncStart();
if (!skipSync) {
for (PageStore updStore : updStores) {
if (shutdownNow) {
chp.progress.cpFinishFut.onDone(new NodeStoppingException("Node is stopping."));
return;
}
updStore.sync();
}
}
}
else {
tracker.onPagesWriteStart();
tracker.onFsyncStart();
}
snapshotMgr.afterCheckpointPageWritten();
// Must mark successful checkpoint only if there are no exceptions or interrupts.
interrupted = false;
}
finally {
if (!interrupted)
markCheckpointEnd(chp);
}
tracker.onEnd();
if (chp.hasDelta() && printCheckpointStats && log.isInfoEnabled())
log.info(String.format("Checkpoint finished [cpId=%s, pages=%d, markPos=%s, " +
"walSegmentsCleared=%d, markDuration=%dms, pagesWrite=%dms, fsync=%dms, " +
"total=%dms]",
chp.cpEntry.checkpointId(),
chp.pagesSize,
chp.cpEntry.checkpointMark(),
chp.walFilesDeleted,
tracker.markDuration(),
tracker.pagesWriteDuration(),
tracker.fsyncDuration(),
tracker.totalDuration()));
persStoreMetrics.onCheckpoint(
tracker.lockWaitDuration(),
tracker.markDuration(),
tracker.pagesWriteDuration(),
tracker.fsyncDuration(),
tracker.totalDuration(),
chp.pagesSize,
tracker.dataPagesWritten(),
tracker.cowPagesWritten());
}
catch (IgniteCheckedException e) {
// TODO-ignite-db how to handle exception?
U.error(log, "Failed to create checkpoint.", e);
}
}
/**
* Blocks until the next scheduled checkpoint time arrives or the worker is cancelled.
*/
@SuppressWarnings("WaitNotInLoop")
private void waitCheckpointEvent() {
boolean cancel = false;
try {
long now = U.currentTimeMillis();
synchronized (this) {
long remaining;
while ((remaining = scheduledCp.nextCpTs - now) > 0 && !isCancelled()) {
wait(remaining);
now = U.currentTimeMillis();
}
}
}
catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
cancel = true;
}
if (cancel)
isCancelled = true;
}
/**
* Marks the checkpoint beginning: collects dirty page IDs under the checkpoint write lock, logs a
* checkpoint record to the WAL and writes the checkpoint start file.
*
* @param tracker Checkpoint metrics tracker.
* @return Checkpoint descriptor with the pages to write.
* @throws IgniteCheckedException If failed.
*/
@SuppressWarnings("TooBroadScope")
private Checkpoint markCheckpointBegin(CheckpointMetricsTracker tracker) throws IgniteCheckedException {
CheckpointRecord cpRec = new CheckpointRecord(null);
WALPointer cpPtr = null;
final CheckpointProgress curr;
IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> cpPagesTuple;
tracker.onLockWaitStart();
boolean hasPages;
IgniteFuture snapFut = null;
checkpointLock.writeLock().lock();
try {
tracker.onMarkStart();
synchronized (this) {
curr = scheduledCp;
curr.started = true;
if (curr.reason == null)
curr.reason = "timeout";
// It is important to assign a new progress object before the checkpoint mark is set in page memory.
scheduledCp = new CheckpointProgress(U.currentTimeMillis() + checkpointFreq);
curCpProgress = curr;
}
final PartitionAllocationMap map = new PartitionAllocationMap();
DbCheckpointListener.Context ctx0 = new DbCheckpointListener.Context() {
@Override public boolean nextSnapshot() {
return curr.nextSnapshot;
}
/** {@inheritDoc} */
@Override public PartitionAllocationMap partitionStatMap() {
return map;
}
@Override public boolean needToSnapshot(String cacheOrGrpName) {
return curr.snapshotOperation.cacheGroupIds().contains(CU.cacheId(cacheOrGrpName));
}
};
// Listeners must be invoked before we write the checkpoint record to the WAL.
for (DbCheckpointListener lsnr : lsnrs)
lsnr.onCheckpointBegin(ctx0);
if (curr.nextSnapshot)
snapFut = snapshotMgr.onMarkCheckPointBegin(curr.snapshotOperation, map);
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
if (grp.isLocal())
continue;
List<GridDhtLocalPartition> locParts = new ArrayList<>();
for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions())
locParts.add(part);
Collections.sort(locParts, ASC_PART_COMPARATOR);
CacheState state = new CacheState(locParts.size());
for (GridDhtLocalPartition part : grp.topology().currentLocalPartitions())
state.addPartitionState(part.id(), part.dataStore().fullSize(), part.updateCounter());
cpRec.addCacheGroupState(grp.groupId(), state);
}
cpPagesTuple = beginAllCheckpoints();
hasPages = hasPageForWrite(cpPagesTuple.get1());
if (hasPages) {
// No page updates for this checkpoint are allowed from now on.
cpPtr = cctx.wal().log(cpRec);
if (cpPtr == null)
cpPtr = CheckpointStatus.NULL_PTR;
}
}
finally {
checkpointLock.writeLock().unlock();
tracker.onLockRelease();
}
curr.cpBeginFut.onDone();
if (snapFut != null) {
try {
snapFut.get();
}
catch (IgniteException e) {
U.error(log, "Failed to wait for snapshot operation initialization: " +
curr.snapshotOperation + "]", e);
}
}
if (hasPages) {
assert cpPtr != null;
// Sync log outside the checkpoint write lock.
cctx.wal().fsync(cpPtr);
long cpTs = System.currentTimeMillis();
// This can happen in the unlikely event of two checkpoints falling
// within the same currentTimeMillis() granularity window.
if (cpTs == lastCpTs)
cpTs++;
lastCpTs = cpTs;
CheckpointEntry cpEntry = writeCheckpointEntry(
tmpWriteBuf,
cpTs,
cpRec.checkpointId(),
cpPtr,
cpRec,
CheckpointEntryType.START);
checkpointHist.addCheckpointEntry(cpEntry);
GridMultiCollectionWrapper<FullPageId> cpPages = splitAndSortCpPagesIfNeeded(cpPagesTuple);
if (printCheckpointStats && log.isInfoEnabled())
log.info(String.format("Checkpoint started [checkpointId=%s, startPtr=%s, checkpointLockWait=%dms, " +
"checkpointLockHoldTime=%dms, pages=%d, reason='%s']",
cpRec.checkpointId(),
cpPtr,
tracker.lockWaitDuration(),
tracker.lockHoldDuration(),
cpPages.size(),
curr.reason)
);
return new Checkpoint(cpEntry, cpPages, curr);
}
else {
if (curr.nextSnapshot)
cctx.wal().fsync(null);
if (printCheckpointStats && log.isInfoEnabled()) {
LT.info(log, String.format("Skipping checkpoint (no pages were modified) [" +
"checkpointLockWait=%dms, checkpointLockHoldTime=%dms, reason='%s']",
tracker.lockWaitDuration(),
tracker.lockHoldDuration(),
curr.reason));
}
GridMultiCollectionWrapper<FullPageId> wrapper = new GridMultiCollectionWrapper<>(new Collection[0]);
return new Checkpoint(null, wrapper, curr);
}
}
/**
* Checks that at least one collection is not empty.
*
* @param cpPagesCollWrapper Collection of {@link GridMultiCollectionWrapper} checkpoint pages.
* @return {@code True} if there is at least one page to write.
*/
private boolean hasPageForWrite(Collection<GridMultiCollectionWrapper<FullPageId>> cpPagesCollWrapper) {
boolean hasPages = false;
for (Collection c : cpPagesCollWrapper)
if (!c.isEmpty()) {
hasPages = true;
break;
}
return hasPages;
}
/**
* @return Tuple with collections of FullPageIds obtained from each PageMemory and the overall
* number of dirty pages.
*/
private IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> beginAllCheckpoints() {
Collection<GridMultiCollectionWrapper<FullPageId>> res = new ArrayList(dataRegions().size());
int pagesNum = 0;
for (DataRegion memPlc : dataRegions()) {
if (!memPlc.config().isPersistenceEnabled())
continue;
GridMultiCollectionWrapper<FullPageId> nextCpPagesCol = ((PageMemoryEx)memPlc.pageMemory()).beginCheckpoint();
pagesNum += nextCpPagesCol.size();
res.add(nextCpPagesCol);
}
return new IgniteBiTuple<>(res, pagesNum);
}
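// For example (hypothetical sizes): with two persistence-enabled regions holding 1200 and 300
// dirty pages, the returned tuple is ([pagesOfRegion1, pagesOfRegion2], 1500).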
/**
* @param chp Finished checkpoint descriptor.
* @throws IgniteCheckedException If failed to write the checkpoint end entry.
*/
private void markCheckpointEnd(Checkpoint chp) throws IgniteCheckedException {
synchronized (this) {
for (DataRegion memPlc : dataRegions()) {
if (!memPlc.config().isPersistenceEnabled())
continue;
((PageMemoryEx)memPlc.pageMemory()).finishCheckpoint();
}
if (chp.hasDelta())
writeCheckpointEntry(
tmpWriteBuf,
chp.cpEntry.checkpointTimestamp(),
chp.cpEntry.checkpointId(),
chp.cpEntry.checkpointMark(),
null,
CheckpointEntryType.END);
writtenPagesCntr = null;
currCheckpointPagesCnt = 0;
}
checkpointHist.onCheckpointFinished(chp);
if (chp.progress != null)
chp.progress.cpFinishFut.onDone();
}
/** {@inheritDoc} */
@Override public void cancel() {
if (log.isDebugEnabled())
log.debug("Cancelling grid runnable: " + this);
// Do not interrupt runner thread.
isCancelled = true;
synchronized (this) {
notifyAll();
}
}
/**
* Requests immediate shutdown: remaining pages are skipped and the worker is cancelled.
*/
public void shutdownNow() {
shutdownNow = true;
if (!isCancelled)
cancel();
}
}
/**
* Reorders the list of checkpoint pages and splits it into the needed number of sublists according to
* {@link DataStorageConfiguration#getCheckpointThreads()} and
* {@link DataStorageConfiguration#getCheckpointWriteOrder()}.
*
* @param cpPagesTuple Checkpoint pages tuple.
* @return Checkpoint pages split into write sublists.
*/
private GridMultiCollectionWrapper<FullPageId> splitAndSortCpPagesIfNeeded(
IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> cpPagesTuple
) {
List<FullPageId> cpPagesList = new ArrayList<>(cpPagesTuple.get2());
for (GridMultiCollectionWrapper<FullPageId> col : cpPagesTuple.get1()) {
for (int i = 0; i < col.collectionsSize(); i++)
cpPagesList.addAll(col.innerCollection(i));
}
if (persistenceCfg.getCheckpointWriteOrder() == CheckpointWriteOrder.SEQUENTIAL) {
Collections.sort(cpPagesList, new Comparator<FullPageId>() {
@Override public int compare(FullPageId o1, FullPageId o2) {
int cmp = Long.compare(o1.groupId(), o2.groupId());
if (cmp != 0)
return cmp;
return Long.compare(PageIdUtils.effectivePageId(o1.pageId()),
PageIdUtils.effectivePageId(o2.pageId()));
}
});
}
int cpThreads = persistenceCfg.getCheckpointThreads();
int pagesSubLists = cpThreads == 1 ? 1 : cpThreads * 4;
// Split pages into (threads * 4) subtasks so that a thread that finishes early can pick up work for slower ones.
Collection[] pagesSubListArr = new Collection[pagesSubLists];
for (int i = 0; i < pagesSubLists; i++) {
int totalSize = cpPagesList.size();
int from = totalSize * i / (pagesSubLists);
int to = totalSize * (i + 1) / (pagesSubLists);
pagesSubListArr[i] = cpPagesList.subList(from, to);
}
return new GridMultiCollectionWrapper<FullPageId>(pagesSubListArr);
}
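/*
* Illustrative split arithmetic (hypothetical numbers): with 1000 checkpoint pages and 4 checkpoint
* threads the list is cut into 16 sublists, where sublist i covers the index range
* [1000 * i / 16, 1000 * (i + 1) / 16), i.e. 62 or 63 pages each.
*/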
/** Checkpoint pages write task. */
private class WriteCheckpointPages implements Runnable {
/** Checkpoint metrics tracker. */
private CheckpointMetricsTracker tracker;
/** Collection of page IDs to write under this task. The overall number of pages to write may be greater than the size of this collection. */
private Collection<FullPageId> writePageIds;
/** Set of page stores updated by this task (to be fsynced later). */
private GridConcurrentHashSet<PageStore> updStores;
/** Future to complete when the task finishes. */
private CountDownFuture doneFut;
/** Total number of pages to write under this checkpoint; may be greater than the size of {@link #writePageIds}. */
private final int totalPagesToWrite;
/**
* Creates a task that writes a collection of checkpoint pages.
*
* @param tracker Checkpoint metrics tracker.
* @param writePageIds Collection of page IDs to write.
* @param updStores Set of page stores updated by this task.
* @param doneFut Future to complete when the task finishes.
* @param totalPagesToWrite Total number of pages to be written under this checkpoint.
*/
private WriteCheckpointPages(
final CheckpointMetricsTracker tracker,
final Collection<FullPageId> writePageIds,
final GridConcurrentHashSet<PageStore> updStores,
final CountDownFuture doneFut,
final int totalPagesToWrite) {
this.tracker = tracker;
this.writePageIds = writePageIds;
this.updStores = updStores;
this.doneFut = doneFut;
this.totalPagesToWrite = totalPagesToWrite;
}
/** {@inheritDoc} */
@Override public void run() {
ByteBuffer tmpWriteBuf = threadBuf.get();
long writeAddr = GridUnsafe.bufferAddress(tmpWriteBuf);
snapshotMgr.beforeCheckpointPageWritten();
try {
for (FullPageId fullId : writePageIds) {
if (checkpointer.shutdownNow)
break;
tmpWriteBuf.rewind();
snapshotMgr.beforePageWrite(fullId);
int grpId = fullId.groupId();
CacheGroupContext grp = context().cache().cacheGroup(grpId);
if (grp == null)
continue;
if (!grp.dataRegion().config().isPersistenceEnabled())
continue;
PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
Integer tag = pageMem.getForCheckpoint(
fullId, tmpWriteBuf, persStoreMetrics.metricsEnabled() ? tracker : null);
if (tag != null) {
tmpWriteBuf.rewind();
if (persStoreMetrics.metricsEnabled()) {
int pageType = PageIO.getType(tmpWriteBuf);
if (PageIO.isDataPageType(pageType))
tracker.onDataPageWritten();
}
if (!skipCrc) {
PageIO.setCrc(writeAddr, PureJavaCrc32.calcCrc32(tmpWriteBuf, pageSize()));
tmpWriteBuf.rewind();
}
int curWrittenPages = writtenPagesCntr.incrementAndGet();
snapshotMgr.onPageWrite(fullId, tmpWriteBuf, curWrittenPages, totalPagesToWrite);
tmpWriteBuf.rewind();
PageIO.setCrc(writeAddr, 0);
PageStore store = storeMgr.writeInternal(grpId, fullId.pageId(), tmpWriteBuf, tag);
updStores.add(store);
}
}
doneFut.onDone((Void)null);
}
catch (Throwable e) {
doneFut.onDone(e);
}
}
}
/**
* Type of the checkpoint marker file.
*/
private enum CheckpointEntryType {
/** Checkpoint start marker. */
START,
/** Checkpoint end marker. */
END
}
/**
* Descriptor of an in-progress checkpoint: its entry, the pages to write and the progress status.
*/
private static class Checkpoint {
/** Checkpoint entry. */
private final CheckpointEntry cpEntry;
/** Checkpoint pages. */
private final GridMultiCollectionWrapper<FullPageId> cpPages;
/** Checkpoint progress status. */
private final CheckpointProgress progress;
/** Number of deleted WAL files. */
private int walFilesDeleted;
/** Number of checkpoint pages. */
private final int pagesSize;
/**
* @param cpEntry Checkpoint entry.
* @param cpPages Pages to write to the page store.
* @param progress Checkpoint progress status.
*/
private Checkpoint(
CheckpointEntry cpEntry,
@NotNull GridMultiCollectionWrapper<FullPageId> cpPages,
CheckpointProgress progress
) {
assert cpEntry == null || cpEntry.initGuard != 0;
this.cpEntry = cpEntry;
this.cpPages = cpPages;
this.progress = progress;
pagesSize = cpPages.size();
}
/**
* @return {@code true} if this checkpoint contains at least one dirty page.
*/
private boolean hasDelta() {
return pagesSize != 0;
}
}
/**
* Checkpoint status used on node start to decide whether page memory must be restored from the WAL.
*/
private static class CheckpointStatus {
/** Null checkpoint UUID. */
private static final UUID NULL_UUID = new UUID(0L, 0L);
/** Null WAL pointer. */
private static final WALPointer NULL_PTR = new FileWALPointer(0, 0, 0);
/** Checkpoint start timestamp. */
private long cpStartTs;
/** Checkpoint start ID. */
private UUID cpStartId;
/** Checkpoint start pointer. */
@GridToStringInclude
private WALPointer startPtr;
/** Checkpoint end ID. */
private UUID cpEndId;
/** Checkpoint end pointer. */
@GridToStringInclude
private WALPointer endPtr;
/**
* @param cpStartTs Checkpoint start timestamp.
* @param cpStartId Checkpoint start ID.
* @param startPtr Checkpoint start pointer.
* @param cpEndId Checkpoint end ID.
* @param endPtr Checkpoint end pointer.
*/
private CheckpointStatus(long cpStartTs, UUID cpStartId, WALPointer startPtr, UUID cpEndId, WALPointer endPtr) {
this.cpStartTs = cpStartTs;
this.cpStartId = cpStartId;
this.startPtr = startPtr;
this.cpEndId = cpEndId;
this.endPtr = endPtr;
}
/**
* @return {@code True} if the page log needs to be applied to restore the tree structure.
*/
public boolean needRestoreMemory() {
return !F.eq(cpStartId, cpEndId) && !F.eq(NULL_UUID, cpStartId);
}
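// E.g. after a crash between writing the START and END markers the recovered status has
// cpStartId != cpEndId, so page memory must be restored by replaying the WAL from the start pointer.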
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(CheckpointStatus.class, this);
}
}
/**
* Mutable state of a scheduled or in-progress checkpoint.
*/
private static class CheckpointProgress {
/** Scheduled checkpoint timestamp. */
private volatile long nextCpTs;
/** Future that completes when the checkpoint begins. */
private GridFutureAdapter cpBeginFut = new GridFutureAdapter<>();
/** Future that completes when the checkpoint finishes. */
private GridFutureAdapter cpFinishFut = new GridFutureAdapter<>();
/** Flag indicating that a snapshot must be taken when the next checkpoint begins. */
private volatile boolean nextSnapshot;
/** Flag indicating that the checkpoint has started. */
private volatile boolean started;
/** Snapshot operation to execute along with the checkpoint, if any. */
private volatile SnapshotOperation snapshotOperation;
/** Wakeup reason. */
private String reason;
/**
* @param nextCpTs Next checkpoint timestamp.
*/
private CheckpointProgress(long nextCpTs) {
this.nextCpTs = nextCpTs;
}
}
/**
* Immutable snapshot of {@link CheckpointProgress}.
*/
private static class CheckpointProgressSnapshot {
/** Started flag. */
private final boolean started;
/** Checkpoint begin future. */
private final GridFutureAdapter<Object> cpBeginFut;
/** Checkpoint finish future. */
private final GridFutureAdapter<Object> cpFinishFut;
/**
* @param cpProgress Checkpoint progress to snapshot.
*/
CheckpointProgressSnapshot(CheckpointProgress cpProgress) {
started = cpProgress.started;
cpBeginFut = cpProgress.cpBeginFut;
cpFinishFut = cpProgress.cpFinishFut;
}
}
/**
* Checkpoint history. Holds a chronologically ordered map of {@link GridCacheDatabaseSharedManager.CheckpointEntry
* CheckpointEntries}. Data is loaded from the corresponding checkpoint directory, which holds the files
* for checkpoint start and end.
*/
@SuppressWarnings("PublicInnerClass")
public class CheckpointHistory {
/**
* Maps a checkpoint timestamp (taken from the checkpoint file name) to the checkpoint entry.
* Using the timestamp keeps the entries in chronological order (the first entry is the oldest).
*/
private final NavigableMap<Long, CheckpointEntry> histMap = new ConcurrentSkipListMap<>();
/**
* Loads history from the checkpoint directory.
*
* @param dir Checkpoint state directory.
* @throws IgniteCheckedException If failed.
*/
private void loadHistory(File dir) throws IgniteCheckedException {
if (!dir.exists())
return;
File[] files = dir.listFiles(CP_FILE_FILTER);
if (!F.isEmpty(files)) {
Arrays.sort(files, CP_TS_COMPARATOR);
ByteBuffer buf = ByteBuffer.allocate(16);
buf.order(ByteOrder.nativeOrder());
for (File file : files) {
Matcher matcher = CP_FILE_NAME_PATTERN.matcher(file.getName());
if (matcher.matches()) {
CheckpointEntryType type = CheckpointEntryType.valueOf(matcher.group(3));
if (type == CheckpointEntryType.START) {
long cpTs = Long.parseLong(matcher.group(1));
WALPointer ptr = readPointer(file, buf);
if (ptr == null)
continue;
// Create lazy checkpoint entry.
CheckpointEntry entry = new CheckpointEntry(cpTs, ptr);
histMap.put(cpTs, entry);
}
}
}
}
}
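// Note: the matcher groups mirror checkpointFileName(): group(1) is the checkpoint timestamp,
// group(2) the checkpoint ID and group(3) the entry type (START or END).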
/**
* @param cpTs Checkpoint timestamp.
* @return Initialized entry.
* @throws IgniteCheckedException If failed to initialize entry.
*/
private CheckpointEntry entry(Long cpTs) throws IgniteCheckedException {
CheckpointEntry entry = histMap.get(cpTs);
if (entry == null)
throw new IgniteCheckedException("Checkpoint entry was removed: " + cpTs);
entry.initIfNeeded(cctx);
return entry;
}
/**
* @return Collection of checkpoint timestamps.
*/
public Collection<Long> checkpoints() {
return histMap.keySet();
}
/**
* Adds a checkpoint entry after the corresponding WAL record has been written to the WAL. The
* checkpoint itself is not finished yet.
*
* @param entry Entry to add.
*/
private void addCheckpointEntry(CheckpointEntry entry) {
histMap.put(entry.checkpointTimestamp(), entry);
}
/**
* Clears checkpoint history that exceeds the configured WAL history size and truncates the
* corresponding WAL segments.
*
* @param chp Finished checkpoint.
*/
private void onCheckpointFinished(Checkpoint chp) {
int deleted = 0;
while (histMap.size() > persistenceCfg.getWalHistorySize()) {
Map.Entry<Long, CheckpointEntry> entry = histMap.firstEntry();
CheckpointEntry cpEntry = entry.getValue();
if (cctx.wal().reserved(cpEntry.checkpointMark())) {
U.warn(log, "Could not clear historyMap due to WAL reservation on cpEntry " + cpEntry.cpId +
", history map size is " + histMap.size());
break;
}
File startFile = new File(cpDir.getAbsolutePath(), cpEntry.startFile());
File endFile = new File(cpDir.getAbsolutePath(), cpEntry.endFile());
boolean rmvdStart = !startFile.exists() || startFile.delete();
boolean rmvdEnd = !endFile.exists() || endFile.delete();
boolean fail = !rmvdStart || !rmvdEnd;
if (fail) {
U.warn(log, "Failed to remove stale checkpoint files [startFile=" + startFile.getAbsolutePath() +
", endFile=" + endFile.getAbsolutePath() + ']');
if (histMap.size() > 2 * persistenceCfg.getWalHistorySize()) {
U.error(log, "Too many stale checkpoint entries in the map, will truncate WAL archive anyway.");
fail = false;
}
}
if (!fail) {
deleted += cctx.wal().truncate(cpEntry.checkpointMark());
histMap.remove(entry.getKey());
}
else
break;
}
chp.walFilesDeleted = deleted;
}
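// For example (hypothetical sizes): with getWalHistorySize() == 20 and 23 entries in the map, up
// to three oldest entries are removed together with their WAL segments, stopping early if a
// checkpoint mark is still reserved.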
/**
* @param cacheId Cache ID.
* @param partId Partition ID.
* @return Reserved partition counter, or {@code null} if the reservation failed.
*/
@Nullable private Long reserve(int cacheId, int partId) {
for (CheckpointEntry entry : histMap.values()) {
try {
entry.initIfNeeded(cctx);
if (entry.cacheGrpStates == null)
continue;
CacheState grpState = entry.cacheGrpStates.get(cacheId);
if (grpState == null)
continue;
long partCntr = grpState.counterByPartition(partId);
if (partCntr >= 0) {
if (cctx.wal().reserve(entry.checkpointMark()))
return partCntr;
}
}
catch (Exception e) {
U.error(log, "Error while trying to reserve history", e);
}
}
return null;
}
}
/**
* Checkpoint history entry. Cheap to construct: the checkpoint ID and cache group states are
* initialized lazily from the WAL when first needed.
*/
private static class CheckpointEntry {
/** Updater for {@link #initGuard}. */
private static final AtomicIntegerFieldUpdater<CheckpointEntry> initGuardUpdater =
AtomicIntegerFieldUpdater.newUpdater(CheckpointEntry.class, "initGuard");
/** Checkpoint timestamp. */
private long cpTs;
/** Checkpoint end mark. */
private WALPointer cpMark;
/** Initialization latch. */
private CountDownLatch initLatch;
/** Initialization guard: {@code 0} - not initialized, {@code 1} - initialization started. */
@SuppressWarnings("unused")
private volatile int initGuard;
/** Checkpoint ID. Initialized lazily. */
private UUID cpId;
/** Cache states. Initialized lazily. */
private Map<Integer, CacheState> cacheGrpStates;
/** Initialization exception. */
private IgniteCheckedException initEx;
/**
* Lazy entry constructor.
*
* @param cpTs Checkpoint timestamp.
* @param cpMark Checkpoint end mark (WAL pointer).
*/
private CheckpointEntry(long cpTs, WALPointer cpMark) {
assert cpMark != null;
this.cpTs = cpTs;
this.cpMark = cpMark;
initLatch = new CountDownLatch(1);
}
/**
* Creates complete entry.
*
* @param cpTs Checkpoint timestamp.
* @param cpMark Checkpoint mark pointer.
* @param cpId Checkpoint ID.
* @param cacheGrpStates Cache groups states.
*/
private CheckpointEntry(long cpTs, WALPointer cpMark, UUID cpId, Map<Integer, CacheState> cacheGrpStates) {
this.cpTs = cpTs;
this.cpMark = cpMark;
this.cpId = cpId;
this.cacheGrpStates = cacheGrpStates;
initGuard = 1;
initLatch = new CountDownLatch(0);
}
/**
* @return Checkpoint timestamp.
*/
private long checkpointTimestamp() {
return cpTs;
}
/**
* @return Checkpoint ID.
*/
private UUID checkpointId() {
return cpId;
}
/**
* @return Checkpoint mark.
*/
private WALPointer checkpointMark() {
return cpMark;
}
/**
* @return Start file name.
*/
private String startFile() {
return checkpointFileName(cpTs, cpId, CheckpointEntryType.START);
}
/**
* @return End file name.
*/
private String endFile() {
return checkpointFileName(cpTs, cpId, CheckpointEntryType.END);
}
/**
* @param grpId Cache group ID.
* @param part Partition ID.
* @return Partition counter or {@code null} if not found.
*/
private Long partitionCounter(int grpId, int part) {
assert initGuard != 0;
if (initEx != null || cacheGrpStates == null)
return null;
CacheState state = cacheGrpStates.get(grpId);
if (state != null) {
long cntr = state.counterByPartition(part);
return cntr < 0 ? null : cntr;
}
return null;
}
/**
* @param cctx Cache shared context.
* @throws IgniteCheckedException If failed to read the WAL entry.
*/
private void initIfNeeded(GridCacheSharedContext cctx) throws IgniteCheckedException {
if (initGuardUpdater.compareAndSet(this, 0, 1)) {
try (WALIterator it = cctx.wal().replay(cpMark)) {
if (it.hasNextX()) {
IgniteBiTuple<WALPointer, WALRecord> tup = it.nextX();
CheckpointRecord rec = (CheckpointRecord)tup.get2();
cpId = rec.checkpointId();
cacheGrpStates = rec.cacheGroupStates();
}
else
initEx = new IgniteCheckedException("Failed to find checkpoint record at " +
"the given WAL pointer: " + cpMark);
}
catch (IgniteCheckedException e) {
initEx = e;
}
finally {
initLatch.countDown();
}
}
else {
U.await(initLatch);
if (initEx != null)
throw initEx;
}
}
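/*
* The initGuard + initLatch combination above is a general lazy-initialization idiom: the first
* caller wins the CAS and performs the work, later callers park on the latch. A minimal
* standalone sketch of the same pattern (hypothetical Lazy class, not part of this file):
*
*   class Lazy<T> {
*       private final AtomicBoolean guard = new AtomicBoolean();
*       private final CountDownLatch latch = new CountDownLatch(1);
*       private volatile T val;
*
*       T get(Supplier<T> factory) throws InterruptedException {
*           if (guard.compareAndSet(false, true)) {
*               val = factory.get();
*               latch.countDown();
*           }
*           else
*               latch.await();
*           return val;
*       }
*   }
*/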
}
/**
* Holds an exclusive file lock on the storage directory so that several nodes cannot use the
* same persistence directory at once.
*/
public static class FileLockHolder implements AutoCloseable {
/** Lock file name. */
private static final String lockFileName = "lock";
/** File. */
private File file;
/** Lock file. */
private RandomAccessFile lockFile;
/** Lock. */
private FileLock lock;
/** Kernal context used to record the ID of the locking node in the lock file. */
@NotNull private GridKernalContext ctx;
/** Logger. */
private IgniteLogger log;
/**
* @param path Path to the directory to lock.
* @param ctx Kernal context.
* @param log Logger.
*/
public FileLockHolder(String path, @NotNull GridKernalContext ctx, IgniteLogger log) {
try {
file = Paths.get(path, lockFileName).toFile();
lockFile = new RandomAccessFile(file, "rw");
this.ctx = ctx;
this.log = log;
}
catch (IOException e) {
throw new IgniteException(e);
}
}
/**
* @param lockWaitTimeMillis Time, in milliseconds, during which the thread will try to capture the file lock.
* @throws IgniteCheckedException If failed to capture the file lock.
*/
public void tryLock(long lockWaitTimeMillis) throws IgniteCheckedException {
assert lockFile != null;
FileChannel ch = lockFile.getChannel();
SB sb = new SB();
// Write node ID.
sb.a("[").a(ctx.localNodeId().toString()).a("]");
// Write IP addresses.
final GridDiscoveryManager discovery = ctx.discovery();
if (discovery != null) { // Discovery may not be up and running yet.
final ClusterNode node = discovery.localNode();
if (node != null)
sb.a(node.addresses());
}
// Write ports.
sb.a("[");
Iterator<GridPortRecord> it = ctx.ports().records().iterator();
while (it.hasNext()) {
GridPortRecord rec = it.next();
sb.a(rec.protocol()).a(":").a(rec.port());
if (it.hasNext())
sb.a(", ");
}
sb.a("]");
String failMsg;
try {
String content = null;
// Try to get the lock; if it is not available, wait 1 sec and retry.
for (int i = 0; i < lockWaitTimeMillis; i += 1000) {
try {
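// Only the first byte of the file is locked; the owner info is stored starting at
// offset 1 (see writeContent()/readContent()).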
lock = ch.tryLock(0, 1, false);
if (lock != null && lock.isValid()) {
writeContent(sb.toString());
return;
}
}
catch (OverlappingFileLockException ignore) {
if (content == null)
content = readContent();
log.warning("Failed to acquire file lock (local nodeId:" + ctx.localNodeId()
+ ", already locked by " + content + "), will try again in 1s: "
+ file.getAbsolutePath());
}
U.sleep(1000);
}
if (content == null)
content = readContent();
failMsg = "Failed to acquire file lock during " + (lockWaitTimeMillis / 1000) +
" sec, (locked by " + content + "): " + file.getAbsolutePath();
}
catch (Exception e) {
throw new IgniteCheckedException(e);
}
if (failMsg != null)
throw new IgniteCheckedException(failMsg);
}
/**
* Writes the ID of the node that captured the lock into the lock file.
*
* @param content Node ID and related info.
* @throws IOException If failed to write the content.
*/
private void writeContent(String content) throws IOException {
FileChannel ch = lockFile.getChannel();
byte[] bytes = content.getBytes();
ByteBuffer buf = ByteBuffer.allocate(bytes.length);
buf.put(bytes);
buf.flip();
ch.write(buf, 1);
ch.force(false);
}
/**
* @return Content of the lock file (info about the node holding the lock).
* @throws IOException If failed to read the lock file.
*/
private String readContent() throws IOException {
FileChannel ch = lockFile.getChannel();
ByteBuffer buf = ByteBuffer.allocate((int)(ch.size() - 1));
ch.read(buf, 1);
String content = new String(buf.array());
buf.clear();
return content;
}
/** Releases the file lock. */
public void release() {
U.releaseQuiet(lock);
}
/** Closes the lock file. */
public void close() {
U.closeQuiet(lockFile);
}
/**
* @return Absolute path to lock file.
*/
private String lockPath() {
return file.getAbsolutePath();
}
}
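/*
* Usage sketch (hypothetical values): acquire an exclusive lock on a storage directory, waiting
* up to 10 seconds, and release it on shutdown.
*
*   FileLockHolder lockHolder = new FileLockHolder("/path/to/storage", kernalCtx, log);
*   try {
*       lockHolder.tryLock(10_000);
*       // ... the node now owns the directory ...
*   }
*   finally {
*       lockHolder.release();
*       lockHolder.close();
*   }
*/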
/** {@inheritDoc} */
@Override public DataStorageMetrics persistentStoreMetrics() {
return new DataStorageMetricsSnapshot(persStoreMetrics);
}
/**
* @return Data storage metrics implementation.
*/
public DataStorageMetricsImpl persistentStoreMetricsImpl() {
return persStoreMetrics;
}
}