/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.database;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataPageEvictionMode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.MemoryConfiguration;
import org.apache.ignite.configuration.MemoryPolicyConfiguration;
import org.apache.ignite.configuration.PersistentStoreConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.mem.DirectMemoryProvider;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.pagemem.snapshot.SnapshotOperation;
import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
import org.apache.ignite.internal.pagemem.store.PageStore;
import org.apache.ignite.internal.pagemem.wal.StorageException;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.pagemem.wal.record.CacheState;
import org.apache.ignite.internal.pagemem.wal.record.CheckpointRecord;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.MemoryRecoveryRecord;
import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.PageDeltaRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.ClusterState;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.database.file.FilePageStoreManager;
import org.apache.ignite.internal.processors.cache.database.pagemem.PageMemoryEx;
import org.apache.ignite.internal.processors.cache.database.pagemem.PageMemoryImpl;
import org.apache.ignite.internal.processors.cache.database.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.database.tree.io.PagePartitionMetaIO;
import org.apache.ignite.internal.processors.cache.database.wal.FileWALPointer;
import org.apache.ignite.internal.processors.cache.database.wal.crc.PureJavaCrc32;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.port.GridPortRecord;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.GridMultiCollectionWrapper;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.future.CountDownFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridInClosure3X;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P3;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.thread.IgniteThread;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_PARTITION_DESTROY_CHECKPOINT_DELAY;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_SKIP_CRC;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PDS_WAL_REBALANCE_THRESHOLD;
/**
* Database shared manager for the persistent store: manages checkpoints, WAL-based crash
* recovery and checkpoint history.
*/
@SuppressWarnings({"unchecked", "NonPrivateFieldAccessedInSynchronizedContext"})
public class GridCacheDatabaseSharedManager extends IgniteCacheDatabaseSharedManager {
/** */
public static final String IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC = "IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC";
/** Skip sync. */
private final boolean skipSync = IgniteSystemProperties.getBoolean(IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC);
/** */
private boolean skipCrc = IgniteSystemProperties.getBoolean(IGNITE_PDS_SKIP_CRC, false);
/** Checkpoint initiation delay after a partition has been scheduled for destroy. */
private volatile long partDestroyCheckpointDelay =
IgniteSystemProperties.getLong(IGNITE_PDS_PARTITION_DESTROY_CHECKPOINT_DELAY, 30_000);
/** */
private final int walRebalanceThreshold = IgniteSystemProperties.getInteger(
IGNITE_PDS_WAL_REBALANCE_THRESHOLD, 500_000);
/** Checkpoint lock hold count. */
private static final ThreadLocal<Integer> CHECKPOINT_LOCK_HOLD_COUNT = new ThreadLocal<Integer>() {
@Override protected Integer initialValue() {
return 0;
}
};
/** Assertion enabled. */
private static final boolean ASSERTION_ENABLED = GridCacheDatabaseSharedManager.class.desiredAssertionStatus();
/** Checkpoint file name pattern. */
private static final Pattern CP_FILE_NAME_PATTERN = Pattern.compile("(\\d+)-(.*)-(START|END)\\.bin");
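// Example (hypothetical values): a checkpoint started at timestamp 1498230000000 with ID
// f3f4c2a0-8d12-4e6f-9a3b-1c2d3e4f5a6b produces the marker file
// "1498230000000-f3f4c2a0-8d12-4e6f-9a3b-1c2d3e4f5a6b-START.bin"; group(1) is parsed as the
// timestamp, group(2) as the checkpoint ID and group(3) as the entry type.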
/** */
private static final FileFilter CP_FILE_FILTER = new FileFilter() {
@Override public boolean accept(File f) {
return CP_FILE_NAME_PATTERN.matcher(f.getName()).matches();
}
};
/** */
private static final Comparator<File> CP_TS_COMPARATOR = new Comparator<File>() {
/** {@inheritDoc} */
@Override public int compare(File o1, File o2) {
Matcher m1 = CP_FILE_NAME_PATTERN.matcher(o1.getName());
Matcher m2 = CP_FILE_NAME_PATTERN.matcher(o2.getName());
boolean s1 = m1.matches();
boolean s2 = m2.matches();
assert s1 : "Failed to match CP file: " + o1.getAbsolutePath();
assert s2 : "Failed to match CP file: " + o2.getAbsolutePath();
long ts1 = Long.parseLong(m1.group(1));
long ts2 = Long.parseLong(m2.group(1));
int res = Long.compare(ts1, ts2);
if (res == 0) {
CheckpointEntryType type1 = CheckpointEntryType.valueOf(m1.group(3));
CheckpointEntryType type2 = CheckpointEntryType.valueOf(m2.group(3));
assert type1 != type2 : "o1=" + o1.getAbsolutePath() + ", o2=" + o2.getAbsolutePath();
res = type1 == CheckpointEntryType.START ? -1 : 1;
}
return res;
}
};
/** Checkpoint thread. Needs to be volatile because it is created in exchange worker. */
private volatile Checkpointer checkpointer;
/** For testing only. */
private volatile boolean checkpointsEnabled = true;
/** For testing only. */
private volatile GridFutureAdapter<Void> enableChangeApplied;
/** */
public ReentrantReadWriteLock checkpointLock = new ReentrantReadWriteLock();
/** */
private long checkpointFreq;
/** */
private long checkpointPageBufSize;
/** */
private FilePageStoreManager storeMgr;
/** */
private File cpDir;
/** */
private volatile boolean printCheckpointStats = true;
/** Database configuration. */
private final PersistentStoreConfiguration dbCfg;
/** */
private final Collection<DbCheckpointListener> lsnrs = new CopyOnWriteArrayList<>();
/** Checkpoint history. */
private final CheckpointHistory checkpointHist = new CheckpointHistory();
/** */
private boolean stopping;
/** Checkpoint runner thread pool. */
private ExecutorService asyncRunner;
/** Buffer for the checkpoint threads. */
private ThreadLocal<ByteBuffer> threadBuf;
/** */
private final ConcurrentMap<Integer, IgniteInternalFuture> idxRebuildFuts = new ConcurrentHashMap<>();
/** Lock holder. */
private FileLockHolder fileLockHolder;
/** Lock wait time. */
private final int lockWaitTime;
/** */
private Map<Integer, Map<Integer, T2<Long, WALPointer>>> reservedForExchange;
/** */
private final ConcurrentMap<T2<Integer, Integer>, T2<Long, WALPointer>> reservedForPreloading = new ConcurrentHashMap<>();
/** Snapshot manager. */
private IgniteCacheSnapshotManager snapshotMgr;
/**
* @param ctx Kernal context.
*/
public GridCacheDatabaseSharedManager(GridKernalContext ctx) {
IgniteConfiguration cfg = ctx.config();
dbCfg = cfg.getPersistentStoreConfiguration();
assert dbCfg != null : "PageStore should not be created if persistence is disabled.";
checkpointFreq = dbCfg.getCheckpointFrequency();
lockWaitTime = dbCfg.getLockWaitTime();
final int pageSize = cfg.getMemoryConfiguration().getPageSize();
threadBuf = new ThreadLocal<ByteBuffer>() {
/** {@inheritDoc} */
@Override protected ByteBuffer initialValue() {
ByteBuffer tmpWriteBuf = ByteBuffer.allocateDirect(pageSize);
tmpWriteBuf.order(ByteOrder.nativeOrder());
return tmpWriteBuf;
}
};
}
/**
* @return Checkpointer instance, or {@code null} if the checkpointer has not been started yet.
*/
public Checkpointer getCheckpointer() {
return checkpointer;
}
/**
* For test use only.
*
* @param enable {@code True} to enable checkpoints, {@code false} to disable.
* @return Future that completes when the checkpointer applies the change.
*/
public IgniteInternalFuture<Void> enableCheckpoints(boolean enable) {
GridFutureAdapter<Void> fut = new GridFutureAdapter<>();
enableChangeApplied = fut;
checkpointsEnabled = enable;
wakeupForCheckpoint("enableCheckpoints()");
return fut;
}
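// A minimal sketch of how a test might use this (hypothetical usage): freeze checkpointing,
// perform updates that must reach only the WAL, then re-enable it.
//
//   db.enableCheckpoints(false).get(); // Wait until the checkpointer observes the change.
//   // ... updates ...
//   db.enableCheckpoints(true).get();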
/** {@inheritDoc} */
@Override protected void start0() throws IgniteCheckedException {
snapshotMgr = cctx.snapshot();
initDataBase();
if (!cctx.kernalContext().clientNode()) {
IgnitePageStoreManager store = cctx.pageStore();
assert store instanceof FilePageStoreManager : "Invalid page store manager was created: " + store;
storeMgr = (FilePageStoreManager)store;
cpDir = Paths.get(storeMgr.workDir().getAbsolutePath(), "cp").toFile();
if (!U.mkdirs(cpDir))
throw new IgniteCheckedException("Could not create directory for checkpoint metadata: " + cpDir);
fileLockHolder = new FileLockHolder(storeMgr.workDir().getPath(), cctx.kernalContext(), log);
}
}
/** {@inheritDoc} */
@Override public void initDataBase() throws IgniteCheckedException {
Long cpBufSize = dbCfg.getCheckpointPageBufferSize();
if (dbCfg.getCheckpointThreads() > 1)
asyncRunner = new ThreadPoolExecutor(
dbCfg.getCheckpointThreads(),
dbCfg.getCheckpointThreads(),
30L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>()
);
// Intentionally use identity comparison to check if configuration default has changed.
//noinspection NumberEquality
if (cpBufSize == PersistentStoreConfiguration.DFLT_CHECKPOINT_PAGE_BUFFER_SIZE) {
MemoryConfiguration memCfg = cctx.kernalContext().config().getMemoryConfiguration();
assert memCfg != null;
long totalSize = memCfg.getSystemCacheMaxSize();
if (memCfg.getMemoryPolicies() == null)
totalSize += MemoryConfiguration.DFLT_MEMORY_POLICY_MAX_SIZE;
else {
for (MemoryPolicyConfiguration memPlc : memCfg.getMemoryPolicies()) {
if (Long.MAX_VALUE - memPlc.getMaxSize() > totalSize)
totalSize += memPlc.getMaxSize();
else {
totalSize = Long.MAX_VALUE;
break;
}
}
assert totalSize > 0;
}
// Cap the checkpoint page buffer size at 2 GB.
long dfltSize = 2 * 1024L * 1024L * 1024L;
long adjusted = Math.min(totalSize / 4, dfltSize);
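// Worked example (hypothetical config): memory policies of 4 GB and 8 GB plus a 100 MB
// system cache give a totalSize of roughly 12.1 GB, so adjusted = min(12.1 GB / 4, 2 GB) = 2 GB
// and the default buffer size is bumped to 2 GB.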
if (cpBufSize < adjusted) {
U.quietAndInfo(log,
"Default checkpoint page buffer size is too small, setting to an adjusted value: "
+ U.readableSize(adjusted, false)
);
cpBufSize = adjusted;
}
}
checkpointPageBufSize = cpBufSize;
super.start0();
}
/** {@inheritDoc} */
@Override protected void initPageMemoryDataStructures(MemoryConfiguration dbCfg) throws IgniteCheckedException {
// No-op.
}
/** {@inheritDoc} */
@Override protected void onKernalStart0(boolean reconnect) throws IgniteCheckedException {
if (!reconnect && !cctx.kernalContext().clientNode() && cctx.kernalContext().state().active()) {
Collection<String> cacheNames = new HashSet<>();
for (CacheConfiguration ccfg : cctx.kernalContext().config().getCacheConfiguration())
if (CU.isSystemCache(ccfg.getName())) {
storeMgr.initializeForCache(ccfg);
cacheNames.add(ccfg.getName());
}
for (CacheConfiguration ccfg : cctx.kernalContext().config().getCacheConfiguration())
if (!CU.isSystemCache(ccfg.getName())) {
storeMgr.initializeForCache(ccfg);
cacheNames.add(ccfg.getName());
}
for (String name : cctx.pageStore().savedCacheNames()) {
CacheConfiguration ccfg = cctx.pageStore().readConfiguration(name);
if (ccfg != null && !cacheNames.contains(name))
storeMgr.initializeForCache(ccfg);
}
readCheckpointAndRestoreMemory();
}
}
/** {@inheritDoc} */
@Override public void onActivate(GridKernalContext kctx) throws IgniteCheckedException {
super.onActivate(kctx);
if (log.isDebugEnabled())
log.debug("Activate database manager [id=" + cctx.localNodeId() +
" topVer=" + cctx.discovery().topologyVersionEx() + " ]");
if (!cctx.kernalContext().clientNode()) {
readCheckpointAndRestoreMemory();
if (log.isDebugEnabled())
log.debug("Restore state after activation [nodeId=" + cctx.localNodeId() + " ]");
}
}
/** {@inheritDoc} */
@Override public void onDeActivate(GridKernalContext kctx) throws IgniteCheckedException {
super.onDeActivate(kctx);
if (log.isDebugEnabled())
log.debug("DeActivate database manager [id=" + cctx.localNodeId() +
" topVer=" + cctx.discovery().topologyVersionEx() + " ]");
onKernalStop0(false);
/* Must be reset here: activate() may be invoked after deactivation, and the file lock must be configured again by that time. */
stopping = false;
fileLockHolder = new FileLockHolder(storeMgr.workDir().getPath(), cctx.kernalContext(), log);
}
/**
* @throws IgniteCheckedException If failed to restore memory state from the last checkpoint.
*/
private void readCheckpointAndRestoreMemory() throws IgniteCheckedException {
checkpointReadLock();
try {
CheckpointStatus status = readCheckpointStatus();
// First, bring memory to the last consistent checkpoint state if needed.
// This method should return a pointer to the last valid record in the WAL.
WALPointer restore = restoreMemory(status);
cctx.wal().resumeLogging(restore);
cctx.wal().log(new MemoryRecoveryRecord(U.currentTimeMillis()));
}
catch (StorageException e) {
throw new IgniteCheckedException(e);
}
finally {
checkpointReadUnlock();
}
}
/** {@inheritDoc} */
@Override public void lock() throws IgniteCheckedException {
if (log.isDebugEnabled())
log.debug("Try to capture file lock [nodeId=" +
cctx.localNodeId() + " path=" + fileLockHolder.lockPath() + "]");
fileLockHolder.tryLock(lockWaitTime);
}
/** {@inheritDoc} */
@Override public void unLock() {
if (log.isDebugEnabled())
log.debug("Release file lock [nodeId=" +
cctx.localNodeId() + " path=" + fileLockHolder.lockPath() + "]");
fileLockHolder.release();
}
/** {@inheritDoc} */
@Override public void onCacheStop(GridCacheContext cctx) {
snapshotMgr.onCacheStop(cctx);
}
/** {@inheritDoc} */
@Override protected void onKernalStop0(boolean cancel) {
checkpointLock.writeLock().lock();
try {
stopping = true;
}
finally {
checkpointLock.writeLock().unlock();
}
snapshotMgr.onKernalStop(cancel);
shutdownCheckpointer(cancel);
lsnrs.clear();
super.onKernalStop0(cancel);
if (!cctx.kernalContext().clientNode()) {
unLock();
fileLockHolder.close();
}
}
/** {@inheritDoc} */
@Override protected void stop0(boolean cancel) {
super.stop0(cancel);
snapshotMgr.stop(cancel);
}
/**
* @param concLvl Concurrency level.
* @param cacheSize Cache size to split into fragments.
* @return Fragment sizes with the checkpoint page buffer size appended as the last element.
*/
protected long[] calculateFragmentSizes(int concLvl, long cacheSize) {
if (concLvl < 2)
concLvl = Runtime.getRuntime().availableProcessors();
long fragmentSize = cacheSize / concLvl;
if (fragmentSize < 1024 * 1024)
fragmentSize = 1024 * 1024;
long[] sizes = new long[concLvl + 1];
for (int i = 0; i < concLvl; i++)
sizes[i] = fragmentSize;
sizes[concLvl] = checkpointPageBufSize;
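// Example (hypothetical values): concLvl = 4 and cacheSize = 1 GB produce four 256 MB
// fragments plus the checkpoint page buffer:
// sizes = [256 MB, 256 MB, 256 MB, 256 MB, checkpointPageBufSize].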
return sizes;
}
/** {@inheritDoc} */
@Override protected PageMemory createPageMemory(
DirectMemoryProvider memProvider,
MemoryConfiguration memCfg,
MemoryPolicyConfiguration plcCfg,
MemoryMetricsImpl memMetrics
) {
return new PageMemoryImpl(
memProvider,
calculateFragmentSizes(
memCfg.getConcurrencyLevel(),
plcCfg.getMaxSize()
),
cctx,
memCfg.getPageSize(),
new GridInClosure3X<FullPageId, ByteBuffer, Integer>() {
@Override public void applyx(
FullPageId fullId,
ByteBuffer pageBuf,
Integer tag
) throws IgniteCheckedException {
storeMgr.write(fullId.cacheId(), fullId.pageId(), pageBuf, tag);
snapshotMgr.flushDirtyPageHandler(fullId, pageBuf, tag);
}
},
new GridInClosure3X<Long, FullPageId, PageMemoryEx>() {
@Override public void applyx(
Long page,
FullPageId fullId,
PageMemoryEx pageMem
) throws IgniteCheckedException {
snapshotMgr.onChangeTrackerPage(page, fullId, pageMem);
}
},
this
);
}
/** {@inheritDoc} */
@Override protected void checkPolicyEvictionProperties(MemoryPolicyConfiguration plcCfg, MemoryConfiguration dbCfg)
throws IgniteCheckedException {
if (plcCfg.getPageEvictionMode() != DataPageEvictionMode.DISABLED)
throw new IgniteCheckedException("Page eviction is not compatible with persistence: " + plcCfg.getName());
}
/**
* @param cancel Cancel flag.
*/
@SuppressWarnings("unused")
private void shutdownCheckpointer(boolean cancel) {
Checkpointer cp = checkpointer;
if (cp != null) {
if (cancel)
cp.shutdownNow();
else
cp.cancel();
try {
U.join(cp);
checkpointer = null;
}
catch (IgniteInterruptedCheckedException ignore) {
U.warn(log, "Was interrupted while waiting for checkpointer shutdown, " +
"will not wait for checkpoint to finish.");
cp.shutdownNow();
while (true) {
try {
U.join(cp);
checkpointer = null;
break;
}
catch (IgniteInterruptedCheckedException ignored) {
//Ignore
}
}
Thread.currentThread().interrupt();
}
}
if (asyncRunner != null) {
asyncRunner.shutdownNow();
try {
asyncRunner.awaitTermination(2, TimeUnit.MINUTES);
}
catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
}
}
/** {@inheritDoc} */
@Override public void beforeExchange(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
DiscoveryEvent discoEvt = fut.discoveryEvent();
boolean joinEvt = discoEvt.type() == EventType.EVT_NODE_JOINED;
boolean locNode = discoEvt.eventNode().isLocal();
boolean isSrvNode = !cctx.kernalContext().clientNode();
boolean clusterStatusActive = cctx.kernalContext().state().active();
boolean clusterInTransitionStateToActive = fut.newClusterState() == ClusterState.ACTIVE;
// Before local node join event.
if (clusterInTransitionStateToActive ||
(joinEvt && locNode && isSrvNode && clusterStatusActive))
restoreState();
if (cctx.kernalContext().query().moduleEnabled()) {
for (GridCacheContext cacheCtx : (Collection<GridCacheContext>)cctx.cacheContexts()) {
if (cacheCtx.startTopologyVersion().equals(fut.topologyVersion()) &&
!cctx.pageStore().hasIndexStore(cacheCtx.cacheId()) && cacheCtx.affinityNode()) {
final int cacheId = cacheCtx.cacheId();
final IgniteInternalFuture<?> rebuildFut = cctx.kernalContext().query()
.rebuildIndexesFromHash(Collections.singletonList(cacheCtx.cacheId()));
idxRebuildFuts.put(cacheId, rebuildFut);
rebuildFut.listen(new CI1<IgniteInternalFuture>() {
@Override public void apply(IgniteInternalFuture igniteInternalFut) {
idxRebuildFuts.remove(cacheId, rebuildFut);
}
});
}
}
}
}
/** {@inheritDoc} */
@Nullable @Override public IgniteInternalFuture indexRebuildFuture(int cacheId) {
return idxRebuildFuts.get(cacheId);
}
/** {@inheritDoc} */
@Override public boolean persistenceEnabled() {
return true;
}
/** {@inheritDoc} */
@Override public void onCachesStopped(Collection<IgniteBiTuple<GridCacheContext, Boolean>> stoppedCtxs) {
try {
waitForCheckpoint("caches stop");
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to wait for checkpoint finish during cache stop.", e);
}
Map<PageMemoryEx, Collection<Integer>> destroyed = new HashMap<>();
for (IgniteBiTuple<GridCacheContext, Boolean> tup : stoppedCtxs) {
PageMemoryEx pageMem = (PageMemoryEx)tup.get1().memoryPolicy().pageMemory();
Collection<Integer> cacheIds = destroyed.get(pageMem);
if (cacheIds == null) {
cacheIds = new HashSet<>();
destroyed.put(pageMem, cacheIds);
}
cacheIds.add(tup.get1().cacheId());
pageMem.onCacheDestroyed(tup.get1().cacheId());
}
Collection<IgniteInternalFuture<Void>> clearFuts = new ArrayList<>(destroyed.size());
for (Map.Entry<PageMemoryEx, Collection<Integer>> entry : destroyed.entrySet()) {
final Collection<Integer> cacheIds = entry.getValue();
clearFuts.add(entry.getKey().clearAsync(new P3<Integer, Long, Integer>() {
@Override public boolean apply(Integer cacheId, Long pageId, Integer tag) {
return cacheIds.contains(cacheId);
}
}, false));
}
for (IgniteInternalFuture<Void> clearFut : clearFuts) {
try {
clearFut.get();
}
catch (IgniteCheckedException e) {
log.error("Failed to clear page memory", e);
}
}
if (cctx.pageStore() != null) {
for (IgniteBiTuple<GridCacheContext, Boolean> tup : stoppedCtxs) {
GridCacheContext cacheCtx = tup.get1();
try {
cctx.pageStore().shutdownForCache(cacheCtx, tup.get2());
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to gracefully clean page store resources for destroyed cache " +
"[cache=" + cacheCtx.name() + "]", e);
}
}
}
}
/**
* Gets the checkpoint read lock. While this lock is held, the checkpoint thread cannot capture a consistent memory state.
*/
@SuppressWarnings("LockAcquiredButNotSafelyReleased")
@Override public void checkpointReadLock() {
if (checkpointLock.writeLock().isHeldByCurrentThread())
return;
for (; ; ) {
checkpointLock.readLock().lock();
if (stopping) {
checkpointLock.readLock().unlock();
throw new RuntimeException("Failed to perform cache update: node is stopping.");
}
if (safeToUpdatePageMemories() || checkpointLock.getReadHoldCount() > 1)
break;
else {
checkpointLock.readLock().unlock();
try {
checkpointer.wakeupForCheckpoint(0, "too many dirty pages").cpBeginFut.get();
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to wait for checkpoint begin.", e);
}
}
}
if (ASSERTION_ENABLED)
CHECKPOINT_LOCK_HOLD_COUNT.set(CHECKPOINT_LOCK_HOLD_COUNT.get() + 1);
}
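// Typical caller pattern (hypothetical usage): every page-memory update is bracketed as
//
//   db.checkpointReadLock();
//   try {
//       // ... update pages ...
//   }
//   finally {
//       db.checkpointReadUnlock();
//   }
//
// so the checkpointer can only acquire its write lock between atomic updates.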
/** {@inheritDoc} */
@Override public boolean checkpointLockIsHeldByThread() {
return !ASSERTION_ENABLED ||
checkpointLock.isWriteLockedByCurrentThread() ||
CHECKPOINT_LOCK_HOLD_COUNT.get() > 0;
}
/**
* @return {@code true} if all PageMemory instances are safe to update.
*/
private boolean safeToUpdatePageMemories() {
Collection<MemoryPolicy> memPlcs = context().database().memoryPolicies();
if (memPlcs == null)
return true;
for (MemoryPolicy memPlc : memPlcs) {
PageMemoryEx pageMemEx = (PageMemoryEx) memPlc.pageMemory();
if (!pageMemEx.safeToUpdate())
return false;
}
return true;
}
/**
* Releases the checkpoint read lock.
*/
@Override public void checkpointReadUnlock() {
if (checkpointLock.writeLock().isHeldByCurrentThread())
return;
checkpointLock.readLock().unlock();
if (checkpointer != null) {
Collection<GridCacheContext> cacheCtxs = context().cacheContexts();
for (GridCacheContext cacheCtx : cacheCtxs) {
PageMemoryEx mem = (PageMemoryEx) cacheCtx.memoryPolicy().pageMemory();
if (mem != null && !mem.safeToUpdate()) {
checkpointer.wakeupForCheckpoint(0, "too many dirty pages");
break;
}
}
}
if (ASSERTION_ENABLED)
CHECKPOINT_LOCK_HOLD_COUNT.set(CHECKPOINT_LOCK_HOLD_COUNT.get() - 1);
}
/**
* @throws IgniteCheckedException If failed to restore database status from WAL.
*/
public void restoreState() throws IgniteCheckedException {
try {
CheckpointStatus status = readCheckpointStatus();
checkpointReadLock();
try {
applyLastUpdates(status);
}
finally {
checkpointReadUnlock();
}
snapshotMgr.restoreState();
checkpointer = new Checkpointer(cctx.igniteInstanceName(), "db-checkpoint-thread", log);
new IgniteThread(cctx.igniteInstanceName(), "db-checkpoint-thread", checkpointer).start();
}
catch (StorageException e) {
throw new IgniteCheckedException(e);
}
}
/** {@inheritDoc} */
@Override public synchronized Map<Integer, Map<Integer, Long>> reserveHistoryForExchange() {
assert reservedForExchange == null : reservedForExchange;
reservedForExchange = new HashMap<>();
for (GridCacheContext cacheCtx : (Collection<GridCacheContext>)cctx.cacheContexts()) {
if (cacheCtx.isLocal())
continue;
for (GridDhtLocalPartition part : cacheCtx.topology().currentLocalPartitions()) {
if (part.state() != GridDhtPartitionState.OWNING || part.dataStore().size() <= walRebalanceThreshold)
continue;
CheckpointEntry cpEntry = searchCheckpointEntry(cacheCtx, part.id(), null);
try {
if (cpEntry != null && cctx.wal().reserve(cpEntry.cpMark)) {
Map<Integer, T2<Long, WALPointer>> cacheMap = reservedForExchange.get(cacheCtx.cacheId());
if (cacheMap == null) {
cacheMap = new HashMap<>();
reservedForExchange.put(cacheCtx.cacheId(), cacheMap);
}
cacheMap.put(part.id(), new T2<>(cpEntry.partitionCounter(cacheCtx.cacheId(), part.id()), cpEntry.cpMark));
}
}
catch (IgniteCheckedException ex) {
U.error(log, "Error while trying to reserve history", ex);
}
}
}
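// Flatten the reservations into a cacheId -> (partId -> reserved counter) map for the
// exchange; the WAL pointers themselves are kept only in reservedForExchange.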
Map<Integer, Map<Integer, Long>> resMap = new HashMap<>();
for (Map.Entry<Integer, Map<Integer, T2<Long, WALPointer>>> e : reservedForExchange.entrySet()) {
Map<Integer, Long> cacheMap = new HashMap<>();
for (Map.Entry<Integer, T2<Long, WALPointer>> e0 : e.getValue().entrySet())
cacheMap.put(e0.getKey(), e0.getValue().get1());
resMap.put(e.getKey(), cacheMap);
}
return resMap;
}
/** {@inheritDoc} */
@Override public synchronized void releaseHistoryForExchange() {
if (reservedForExchange == null)
return;
for (Map.Entry<Integer, Map<Integer, T2<Long, WALPointer>>> e : reservedForExchange.entrySet()) {
for (Map.Entry<Integer, T2<Long, WALPointer>> e0 : e.getValue().entrySet()) {
try {
cctx.wal().release(e0.getValue().get2());
}
catch (IgniteCheckedException ex) {
U.error(log, "Could not release history lock", ex);
}
}
}
reservedForExchange = null;
}
/** {@inheritDoc} */
@Override public boolean reserveHistoryForPreloading(int cacheId, int partId, long cntr) {
CheckpointEntry cpEntry = searchCheckpointEntry(cctx.cacheContext(cacheId), partId, cntr);
if (cpEntry == null)
return false;
WALPointer ptr = cpEntry.cpMark;
if (ptr == null)
return false;
boolean reserved;
try {
reserved = cctx.wal().reserve(ptr);
}
catch (IgniteCheckedException e) {
U.error(log, "Error while trying to reserve history", e);
reserved = false;
}
if (reserved)
reservedForPreloading.put(new T2<>(cacheId, partId), new T2<>(cntr, ptr));
return reserved;
}
/** {@inheritDoc} */
@Override public void releaseHistoryForPreloading() {
for (Map.Entry<T2<Integer, Integer>, T2<Long, WALPointer>> e : reservedForPreloading.entrySet()) {
try {
cctx.wal().release(e.getValue().get2());
}
catch (IgniteCheckedException ex) {
U.error(log, "Could not release WAL reservation", ex);
throw new IgniteException(ex);
}
}
reservedForPreloading.clear();
}
/**
* For debugging only. TODO: remove.
*/
public Map<T2<Integer, Integer>, T2<Long, WALPointer>> reservedForPreloading() {
return reservedForPreloading;
}
/** {@inheritDoc} */
@Nullable @Override public IgniteInternalFuture wakeupForCheckpoint(String reason) {
Checkpointer cp = checkpointer;
if (cp != null)
return cp.wakeupForCheckpoint(0, reason).cpBeginFut;
return null;
}
/** {@inheritDoc} */
@Override public void waitForCheckpoint(String reason) throws IgniteCheckedException {
Checkpointer cp = checkpointer;
if (cp == null)
return;
CheckpointProgressSnapshot progSnapshot = cp.wakeupForCheckpoint(0, reason);
IgniteInternalFuture fut1 = progSnapshot.cpFinishFut;
fut1.get();
if (!progSnapshot.started)
return;
IgniteInternalFuture fut2 = cp.wakeupForCheckpoint(0, reason).cpFinishFut;
assert fut1 != fut2;
fut2.get();
}
/**
* Schedules partition destroy during next checkpoint. This method must be called inside checkpoint read lock.
*
* @param cacheCtx Cache context.
* @param partId Partition ID.
*/
public void schedulePartitionDestroy(GridCacheContext<?, ?> cacheCtx, int partId) {
Checkpointer cp = checkpointer;
if (cp != null)
cp.schedulePartitionDestroy(cacheCtx, partId);
}
/**
* Cancels partition destroy if it has not begun yet. Otherwise, will wait for cleanup to finish.
*
* @param cacheCtx Cache context.
* @param partId Partition ID.
*/
public void cancelOrWaitPartitionDestroy(GridCacheContext<?, ?> cacheCtx, int partId)
throws IgniteCheckedException {
Checkpointer cp = checkpointer;
if (cp != null)
cp.cancelOrWaitPartitionDestroy(cacheCtx, partId);
}
/**
* Tries to search for a WAL pointer for the given partition counter start.
*
* @param cacheCtx Cache context.
* @param part Partition ID.
* @param partCntrSince Partition counter or {@code null} to search for minimal counter.
* @return WAL pointer for the found checkpoint or {@code null} if none was found.
*/
@Nullable public WALPointer searchPartitionCounter(GridCacheContext cacheCtx, int part, @Nullable Long partCntrSince) {
CheckpointEntry entry = searchCheckpointEntry(cacheCtx, part, partCntrSince);
if (entry == null)
return null;
return entry.cpMark;
}
/**
* Tries to find a checkpoint entry covering the given partition counter start.
*
* @param cacheCtx Cache context.
* @param part Partition ID.
* @param partCntrSince Partition counter or {@code null} to search for minimal counter.
* @return Checkpoint entry or {@code null} if failed to search.
*/
@Nullable private CheckpointEntry searchCheckpointEntry(GridCacheContext cacheCtx, int part, @Nullable Long partCntrSince) {
boolean hasGap = false;
CheckpointEntry first = null;
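// Example walk (hypothetical counters for this partition, oldest checkpoint first):
// [10, 25, <missing>, 40] with partCntrSince = 30 sets first to the 10-entry, then to the
// 25-entry, then marks a gap; when 40 > 30 the method returns null because of the gap.
// Without the missing entry it would return the 25-entry.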
for (Long cpTs : checkpointHist.checkpoints()) {
try {
CheckpointEntry entry = checkpointHist.entry(cpTs);
Long foundCntr = entry.partitionCounter(cacheCtx.cacheId(), part);
if (foundCntr != null) {
if (partCntrSince == null) {
if (hasGap) {
first = entry;
hasGap = false;
}
if (first == null)
first = entry;
}
else if (foundCntr <= partCntrSince) {
first = entry;
hasGap = false;
}
else
return hasGap ? null : first;
}
else
hasGap = true;
}
catch (IgniteCheckedException ignore) {
// Treat exception the same way as a gap.
hasGap = true;
}
}
return hasGap ? null : first;
}
/**
* @return Checkpoint history. For tests only.
*/
public CheckpointHistory checkpointHistory() {
return checkpointHist;
}
/**
* @return Checkpoint directory.
*/
public File checkpointDirectory() {
return cpDir;
}
/**
* @param lsnr Listener.
*/
public void addCheckpointListener(DbCheckpointListener lsnr) {
lsnrs.add(lsnr);
}
/**
* @param lsnr Listener.
*/
public void removeCheckpointListener(DbCheckpointListener lsnr) {
lsnrs.remove(lsnr);
}
/**
* @return Read checkpoint status.
* @throws IgniteCheckedException If failed to read checkpoint status page.
*/
@SuppressWarnings("TooBroadScope")
private CheckpointStatus readCheckpointStatus() throws IgniteCheckedException {
long lastStartTs = 0;
long lastEndTs = 0;
UUID startId = CheckpointStatus.NULL_UUID;
UUID endId = CheckpointStatus.NULL_UUID;
File startFile = null;
File endFile = null;
WALPointer startPtr = CheckpointStatus.NULL_PTR;
WALPointer endPtr = CheckpointStatus.NULL_PTR;
File dir = cpDir;
if (!dir.exists()) {
// TODO: remove excessive logging after GG-12116 fix.
File[] files = dir.listFiles();
if (files != null && files.length > 0) {
log.warning("Read checkpoint status: cpDir.exists() is false, cpDir.listFiles() is: " +
Arrays.toString(files));
}
if (Files.exists(dir.toPath()))
log.warning("Read checkpoint status: cpDir.exists() is false, Files.exists(cpDir) is true.");
log.info("Read checkpoint status: checkpoint directory is not found.");
return new CheckpointStatus(0, startId, startPtr, endId, endPtr);
}
File[] files = dir.listFiles();
for (File file : files) {
Matcher matcher = CP_FILE_NAME_PATTERN.matcher(file.getName());
if (matcher.matches()) {
long ts = Long.parseLong(matcher.group(1));
UUID id = UUID.fromString(matcher.group(2));
CheckpointEntryType type = CheckpointEntryType.valueOf(matcher.group(3));
if (type == CheckpointEntryType.START && ts > lastStartTs) {
lastStartTs = ts;
startId = id;
startFile = file;
}
else if (type == CheckpointEntryType.END && ts > lastEndTs) {
lastEndTs = ts;
endId = id;
endFile = file;
}
}
}
ByteBuffer buf = ByteBuffer.allocate(20);
buf.order(ByteOrder.nativeOrder());
if (startFile != null)
startPtr = readPointer(startFile, buf);
if (endFile != null)
endPtr = readPointer(endFile, buf);
// TODO: remove excessive logging after GG-12116 fix.
log.info("Read checkpoint status: start marker = " + startFile + ", end marker = " + endFile);
return new CheckpointStatus(lastStartTs, startId, startPtr, endId, endPtr);
}
/**
* @param cpMarkerFile Checkpoint mark file.
* @param buf Buffer to read the pointer into.
* @return WAL pointer.
* @throws IgniteCheckedException If failed to read mark file.
*/
private WALPointer readPointer(File cpMarkerFile, ByteBuffer buf) throws IgniteCheckedException {
buf.position(0);
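// Marker layout mirrors writeCheckpointEntry(): 8-byte WAL segment index, 4-byte file
// offset and 4-byte record length, all in native byte order.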
try (FileChannel ch = FileChannel.open(cpMarkerFile.toPath(), StandardOpenOption.READ)) {
ch.read(buf);
buf.flip();
return new FileWALPointer(buf.getLong(), buf.getInt(), buf.getInt());
}
catch (IOException e) {
throw new IgniteCheckedException("Failed to read checkpoint pointer from marker file: " +
cpMarkerFile.getAbsolutePath(), e);
}
}
/**
* @param status Checkpoint status.
* @return WAL pointer from which logging can be resumed, or {@code null} if nothing was read.
* @throws IgniteCheckedException If failed to restore memory state.
*/
private WALPointer restoreMemory(CheckpointStatus status) throws IgniteCheckedException {
if (log.isInfoEnabled())
log.info("Checking memory state [lastValidPos=" + status.endPtr + ", lastMarked="
+ status.startPtr + ", lastCheckpointId=" + status.cpStartId + ']');
boolean apply = status.needRestoreMemory();
if (apply) {
U.quietAndWarn(log, "Ignite node crashed in the middle of checkpoint. Will restore memory state and " +
"enforce checkpoint on node start.");
cctx.pageStore().beginRecover();
}
long start = U.currentTimeMillis();
int applied = 0;
WALPointer lastRead = null;
try (WALIterator it = cctx.wal().replay(status.endPtr)) {
while (it.hasNextX()) {
IgniteBiTuple<WALPointer, WALRecord> tup = it.nextX();
WALRecord rec = tup.get2();
lastRead = tup.get1();
switch (rec.type()) {
case CHECKPOINT_RECORD:
CheckpointRecord cpRec = (CheckpointRecord)rec;
// We roll memory up until we find a checkpoint start record registered in the status.
if (F.eq(cpRec.checkpointId(), status.cpStartId)) {
log.info("Found last checkpoint marker [cpId=" + cpRec.checkpointId() +
", pos=" + tup.get1() + ']');
apply = false;
}
else if (!F.eq(cpRec.checkpointId(), status.cpEndId))
U.warn(log, "Found unexpected checkpoint marker, skipping [cpId=" + cpRec.checkpointId() +
", expCpId=" + status.cpStartId + ", pos=" + tup.get1() + ']');
break;
case PAGE_RECORD:
if (apply) {
PageSnapshot pageRec = (PageSnapshot)rec;
// Here we do not require tag check because we may be applying memory changes after
// several repetitive restarts and the same pages may have changed several times.
int cacheId = pageRec.fullPageId().cacheId();
long pageId = pageRec.fullPageId().pageId();
PageMemoryEx pageMem = getPageMemoryForCacheId(cacheId);
long page = pageMem.acquirePage(cacheId, pageId, true);
try {
long pageAddr = pageMem.writeLock(cacheId, pageId, page);
try {
PageUtils.putBytes(pageAddr, 0, pageRec.pageData());
}
finally {
pageMem.writeUnlock(cacheId, pageId, page, null, true, true);
}
}
finally {
pageMem.releasePage(cacheId, pageId, page);
}
applied++;
}
break;
case PARTITION_DESTROY:
if (apply) {
PartitionDestroyRecord destroyRec = (PartitionDestroyRecord)rec;
final int cId = destroyRec.cacheId();
final int pId = destroyRec.partitionId();
PageMemoryEx pageMem = getPageMemoryForCacheId(cId);
pageMem.clearAsync(new P3<Integer, Long, Integer>() {
@Override public boolean apply(Integer cacheId, Long pageId, Integer tag) {
return cacheId == cId && PageIdUtils.partId(pageId) == pId;
}
}, true).get();
}
break;
default:
if (apply && rec instanceof PageDeltaRecord) {
PageDeltaRecord r = (PageDeltaRecord)rec;
int cacheId = r.cacheId();
long pageId = r.pageId();
PageMemoryEx pageMem = getPageMemoryForCacheId(cacheId);
// Here we do not require tag check because we may be applying memory changes after
// several repetitive restarts and the same pages may have changed several times.
long page = pageMem.acquirePage(cacheId, pageId, true);
try {
long pageAddr = pageMem.writeLock(cacheId, pageId, page);
try {
r.applyDelta(pageMem, pageAddr);
}
finally {
pageMem.writeUnlock(cacheId, pageId, page, null, true, true);
}
}
finally {
pageMem.releasePage(cacheId, pageId, page);
}
applied++;
}
}
}
}
if (status.needRestoreMemory()) {
assert !apply : "Restoring memory state failed, checkpoint marker [cpId=" + status.cpStartId +
"] was not found in WAL";
log.info("Finished applying memory changes [changesApplied=" + applied +
", time=" + (U.currentTimeMillis() - start) + "ms]");
if (applied > 0)
finalizeCheckpointOnRecovery(status.cpStartTs, status.cpStartId, status.startPtr);
}
checkpointHist.loadHistory(cpDir);
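// Resume WAL logging from the record immediately following the last one successfully read.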
return lastRead == null ? null : lastRead.next();
}
/**
* Obtains PageMemory reference from cache descriptor instead of cache context.
*
* @param cacheId Cache id.
* @return PageMemoryEx instance.
* @throws IgniteCheckedException if no MemoryPolicy is configured for a name obtained from cache descriptor.
*/
private PageMemoryEx getPageMemoryForCacheId(int cacheId) throws IgniteCheckedException {
GridCacheSharedContext sharedCtx = context();
String memPlcName = sharedCtx
.cache()
.cacheDescriptor(cacheId)
.cacheConfiguration()
.getMemoryPolicyName();
return (PageMemoryEx) sharedCtx.database().memoryPolicy(memPlcName).pageMemory();
}
/**
* @param status Last registered checkpoint status.
* @throws IgniteCheckedException If failed to apply updates.
* @throws StorageException If IO exception occurred while reading write-ahead log.
*/
private void applyLastUpdates(CheckpointStatus status) throws IgniteCheckedException {
if (log.isInfoEnabled())
log.info("Applying lost cache updates since last checkpoint record [lastMarked="
+ status.startPtr + ", lastCheckpointId=" + status.cpStartId + ']');
cctx.kernalContext().query().skipFieldLookup(true);
long start = U.currentTimeMillis();
int applied = 0;
try (WALIterator it = cctx.wal().replay(status.startPtr)) {
Map<T2<Integer, Integer>, T2<Integer, Long>> partStates = new HashMap<>();
while (it.hasNextX()) {
IgniteBiTuple<WALPointer, WALRecord> next = it.nextX();
WALRecord rec = next.get2();
switch (rec.type()) {
case DATA_RECORD:
DataRecord dataRec = (DataRecord)rec;
for (DataEntry dataEntry : dataRec.writeEntries()) {
int cacheId = dataEntry.cacheId();
GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
applyUpdate(cacheCtx, dataEntry);
applied++;
}
break;
case PART_META_UPDATE_STATE:
PartitionMetaStateRecord metaStateRecord = (PartitionMetaStateRecord)rec;
partStates.put(new T2<>(metaStateRecord.cacheId(), metaStateRecord.partitionId()),
new T2<>((int)metaStateRecord.state(), metaStateRecord.updateCounter()));
break;
default:
// Skip other records.
}
}
restorePartitionState(partStates);
}
finally {
cctx.kernalContext().query().skipFieldLookup(false);
}
if (log.isInfoEnabled())
log.info("Finished applying WAL changes [updatesApplied=" + applied +
", time=" + (U.currentTimeMillis() - start) + "ms]");
}
/**
* @param partStates Partition states.
* @throws IgniteCheckedException If failed to restore.
*/
private void restorePartitionState(
Map<T2<Integer, Integer>, T2<Integer, Long>> partStates
) throws IgniteCheckedException {
Collection<GridCacheContext> cacheContexts = cctx.cacheContexts();
for (GridCacheContext context : cacheContexts) {
int cacheId = context.cacheId();
GridCacheContext cacheCtx = cctx.cacheContext(cacheId);
PageMemoryEx pageMem = (PageMemoryEx)cacheCtx.memoryPolicy().pageMemory();
for (int i = 0; i < context.affinity().partitions(); i++) {
if (storeMgr.exists(cacheId, i)) {
storeMgr.ensure(cacheId, i);
if (storeMgr.pages(cacheId, i) <= 1)
continue;
long partMetaId = pageMem.partitionMetaPageId(cacheId, i);
long partMetaPage = pageMem.acquirePage(cacheId, partMetaId);
try {
long pageAddr = pageMem.writeLock(cacheId, partMetaId, partMetaPage);
boolean changed = false;
try {
PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.forPage(pageAddr);
T2<Integer, Long> fromWal = partStates.get(new T2<>(cacheId, i));
GridDhtLocalPartition part = context.topology()
.localPartition(i, AffinityTopologyVersion.NONE, true);
assert part != null;
if (fromWal != null) {
int stateId = fromWal.get1();
io.setPartitionState(pageAddr, (byte)stateId);
changed = updateState(part, stateId);
if (stateId == GridDhtPartitionState.OWNING.ordinal()) {
cacheCtx.offheap().onPartitionInitialCounterUpdated(i, fromWal.get2());
if (part.initialUpdateCounter() < fromWal.get2()) {
part.initialUpdateCounter(fromWal.get2());
changed = true;
}
}
}
else
changed = updateState(part, (int)io.getPartitionState(pageAddr));
}
finally {
pageMem.writeUnlock(cacheId, partMetaId, partMetaPage, null, changed);
}
}
finally {
pageMem.releasePage(cacheId, partMetaId, partMetaPage);
}
}
}
}
}
/**
* @param part Partition to restore state for.
* @param stateId State enum ordinal.
* @return Updated flag.
*/
private boolean updateState(GridDhtLocalPartition part, int stateId) {
if (stateId != -1) {
GridDhtPartitionState state = GridDhtPartitionState.fromOrdinal(stateId);
assert state != null;
part.restoreState(state == GridDhtPartitionState.EVICTED ? GridDhtPartitionState.RENTING : state);
return true;
}
return false;
}
/**
* @param cacheCtx Cache context to apply an update.
* @param dataEntry Data entry to apply.
*/
private void applyUpdate(GridCacheContext cacheCtx, DataEntry dataEntry) throws IgniteCheckedException {
GridDhtLocalPartition locPart = cacheCtx.topology()
.localPartition(dataEntry.partitionId(), AffinityTopologyVersion.NONE, true);
switch (dataEntry.op()) {
case CREATE:
case UPDATE:
cacheCtx.offheap().update(
dataEntry.key(),
dataEntry.value(),
dataEntry.writeVersion(),
0L,
locPart,
null);
if (dataEntry.partitionCounter() != 0)
cacheCtx.offheap().onPartitionInitialCounterUpdated(dataEntry.partitionId(), dataEntry.partitionCounter());
break;
case DELETE:
cacheCtx.offheap().remove(dataEntry.key(), dataEntry.partitionId(), locPart);
if (dataEntry.partitionCounter() != 0)
cacheCtx.offheap().onPartitionInitialCounterUpdated(dataEntry.partitionId(), dataEntry.partitionCounter());
break;
default:
throw new IgniteCheckedException("Invalid operation for WAL entry update: " + dataEntry.op());
}
}
/**
* @throws IgniteCheckedException If failed.
*/
private void finalizeCheckpointOnRecovery(long cpTs, UUID cpId, WALPointer walPtr) throws IgniteCheckedException {
assert cpTs != 0;
ByteBuffer tmpWriteBuf = ByteBuffer.allocateDirect(pageSize());
long start = System.currentTimeMillis();
Collection<MemoryPolicy> memPolicies = context().database().memoryPolicies();
List<IgniteBiTuple<PageMemory, Collection<FullPageId>>> cpEntities = new ArrayList<>(memPolicies.size());
for (MemoryPolicy memPlc : memPolicies) {
PageMemoryEx pageMem = (PageMemoryEx) memPlc.pageMemory();
cpEntities.add(new IgniteBiTuple<PageMemory, Collection<FullPageId>>(pageMem,
(pageMem).beginCheckpoint()));
}
tmpWriteBuf.order(ByteOrder.nativeOrder());
// Identity stores set.
Collection<PageStore> updStores = new HashSet<>();
int cpPagesCnt = 0;
for (IgniteBiTuple<PageMemory, Collection<FullPageId>> e : cpEntities) {
PageMemoryEx pageMem = (PageMemoryEx) e.get1();
Collection<FullPageId> cpPages = e.get2();
cpPagesCnt += cpPages.size();
for (FullPageId fullId : cpPages) {
tmpWriteBuf.rewind();
Integer tag = pageMem.getForCheckpoint(fullId, tmpWriteBuf);
if (tag != null) {
tmpWriteBuf.rewind();
PageStore store = storeMgr.writeInternal(fullId.cacheId(), fullId.pageId(), tmpWriteBuf, tag);
tmpWriteBuf.rewind();
updStores.add(store);
}
}
}
long written = U.currentTimeMillis();
for (PageStore updStore : updStores)
updStore.sync();
long fsync = U.currentTimeMillis();
for (IgniteBiTuple<PageMemory, Collection<FullPageId>> e : cpEntities)
((PageMemoryEx)e.get1()).finishCheckpoint();
writeCheckpointEntry(
tmpWriteBuf,
cpTs,
cpId,
walPtr,
null,
CheckpointEntryType.END);
cctx.pageStore().finishRecover();
if (log.isInfoEnabled())
log.info(String.format("Checkpoint finished [cpId=%s, pages=%d, markPos=%s, " +
"pagesWrite=%dms, fsync=%dms, total=%dms]",
cpId,
cpPagesCnt,
walPtr,
written - start,
fsync - written,
fsync - start));
}
/**
* @param tmpWriteBuf Buffer to write the marker with.
* @param cpTs Checkpoint timestamp.
* @param cpId Checkpoint ID.
* @param ptr WAL pointer of the current checkpoint.
* @param rec Checkpoint record ({@code null} for END markers).
* @param type Checkpoint entry type.
* @return Checkpoint entry for START markers, {@code null} otherwise.
* @throws IgniteCheckedException If failed to write the marker file.
*/
private CheckpointEntry writeCheckpointEntry(
ByteBuffer tmpWriteBuf,
long cpTs,
UUID cpId,
WALPointer ptr,
CheckpointRecord rec,
CheckpointEntryType type
) throws IgniteCheckedException {
assert ptr instanceof FileWALPointer;
FileWALPointer filePtr = (FileWALPointer)ptr;
String fileName = checkpointFileName(cpTs, cpId, type);
try (FileChannel ch = FileChannel.open(Paths.get(cpDir.getAbsolutePath(), fileName),
StandardOpenOption.CREATE_NEW, StandardOpenOption.APPEND)) {
tmpWriteBuf.rewind();
tmpWriteBuf.putLong(filePtr.index());
tmpWriteBuf.putInt(filePtr.fileOffset());
tmpWriteBuf.putInt(filePtr.length());
tmpWriteBuf.flip();
ch.write(tmpWriteBuf);
tmpWriteBuf.clear();
if (!skipSync)
ch.force(true);
return type == CheckpointEntryType.START ?
new CheckpointEntry(cpTs, ptr, cpId, rec.cacheStates()) : null;
}
catch (IOException e) {
throw new IgniteCheckedException(e);
}
}
/**
* @param cpTs Checkpoint timestamp.
* @param cpId Checkpoint ID.
* @param type Checkpoint type.
* @return Checkpoint file name.
*/
private static String checkpointFileName(long cpTs, UUID cpId, CheckpointEntryType type) {
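// Example (hypothetical values): cpTs = 1498230000000 and type = END produce
// "1498230000000-<cpId>-END.bin", the exact form CP_FILE_NAME_PATTERN recognizes.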
return cpTs + "-" + cpId + "-" + type + ".bin";
}
/**
* @param reqs Destroy requests.
*/
@SuppressWarnings("TypeMayBeWeakened")
private void finishDestroyPartitionsAsync(final Collection<PartitionDestroyRequest> reqs) {
final Map<Integer, Collection<Integer>> filterMap = new HashMap<>();
final Set<PageMemoryEx> pageMemSet = new HashSet<>();
for (PartitionDestroyRequest req : reqs) {
Collection<Integer> partIds = filterMap.get(req.cacheId);
if (partIds == null) {
partIds = new HashSet<>();
filterMap.put(req.cacheId, partIds);
}
partIds.add(req.partId);
pageMemSet.add((PageMemoryEx)cctx.cacheContext(req.cacheId).memoryPolicy().pageMemory());
}
for (PageMemoryEx pageMem : pageMemSet) {
IgniteInternalFuture<Void> clearFut = pageMem.clearAsync(new P3<Integer, Long, Integer>() {
@Override public boolean apply(Integer cacheId, Long pageId, Integer tag) {
assert cacheId != null;
assert pageId != null;
int partId = PageIdUtils.partId(pageId);
Collection<Integer> parts = filterMap.get(cacheId);
return parts != null && parts.contains(partId);
}
}, true);
clearFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() {
@Override public void apply(IgniteInternalFuture<Void> clearFut) {
for (PartitionDestroyRequest req : reqs) {
try {
assert !req.allowFastEviction;
// Tag should never grow in this case.
cctx.pageStore().onPartitionDestroyed(req.cacheId, req.partId, 1);
}
catch (IgniteCheckedException e) {
req.onDone(e);
}
finally {
req.onDone(clearFut.error());
}
}
}
});
}
}
/**
*
*/
@SuppressWarnings("NakedNotify")
public class Checkpointer extends GridWorker {
/** Temporary write buffer. */
private final ByteBuffer tmpWriteBuf;
/** Next scheduled checkpoint progress. */
private volatile CheckpointProgress scheduledCp;
/** Current checkpoint. This field is updated only by checkpoint thread. */
private volatile CheckpointProgress curCpProgress;
/** Shutdown now. */
private volatile boolean shutdownNow;
/**
* @param gridName Grid name.
* @param name Thread name.
* @param log Logger.
*/
protected Checkpointer(@Nullable String gridName, String name, IgniteLogger log) {
super(gridName, name, log);
scheduledCp = new CheckpointProgress(U.currentTimeMillis() + checkpointFreq);
tmpWriteBuf = ByteBuffer.allocateDirect(pageSize());
tmpWriteBuf.order(ByteOrder.nativeOrder());
}
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException {
while (!isCancelled()) {
waitCheckpointEvent();
GridFutureAdapter<Void> enableChangeApplied = GridCacheDatabaseSharedManager.this.enableChangeApplied;
if (enableChangeApplied != null) {
enableChangeApplied.onDone();
GridCacheDatabaseSharedManager.this.enableChangeApplied = null;
}
if (checkpointsEnabled)
doCheckpoint();
else {
synchronized (this) {
scheduledCp.nextCpTs = U.currentTimeMillis() + checkpointFreq;
}
}
}
// Final run after the cancellation.
if (checkpointsEnabled && !shutdownNow)
doCheckpoint();
}
/**
*
*/
private CheckpointProgressSnapshot wakeupForCheckpoint(long delayFromNow, String reason) {
CheckpointProgress sched = scheduledCp;
long next = U.currentTimeMillis() + delayFromNow;
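// Fast path: if a checkpoint is already scheduled at or before the requested time,
// piggyback on it without taking the monitor.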
if (sched.nextCpTs <= next)
return new CheckpointProgressSnapshot(sched);
CheckpointProgressSnapshot ret;
synchronized (this) {
sched = scheduledCp;
if (sched.nextCpTs > next) {
sched.reason = reason;
sched.nextCpTs = next;
}
ret = new CheckpointProgressSnapshot(sched);
notifyAll();
}
return ret;
}
/**
* @param snapshotOperation Snapshot operation.
*/
public IgniteInternalFuture wakeupForSnapshotCreation(SnapshotOperation snapshotOperation) {
GridFutureAdapter<Object> ret;
synchronized (this) {
scheduledCp.nextCpTs = U.currentTimeMillis();
scheduledCp.reason = "snapshot";
scheduledCp.nextSnapshot = true;
scheduledCp.snapshotOperation = snapshotOperation;
ret = scheduledCp.cpBeginFut;
notifyAll();
}
return ret;
}
/**
* @param cacheCtx Cache context.
* @param partId Partition ID.
*/
private void schedulePartitionDestroy(GridCacheContext<?, ?> cacheCtx, int partId) {
synchronized (this) {
scheduledCp.destroyQueue.addDestroyRequest(cacheCtx, partId);
}
wakeupForCheckpoint(partDestroyCheckpointDelay, "partition destroy");
}
/**
* @param cacheCtx Cache context.
* @param partId Partition ID.
*/
private void cancelOrWaitPartitionDestroy(GridCacheContext<?, ?> cacheCtx, int partId)
throws IgniteCheckedException {
CheckpointProgress cur = curCpProgress;
PartitionDestroyRequest req;
if (cur != null) {
req = cur.destroyQueue.cancelDestroy(cacheCtx.cacheId(), partId);
if (req != null)
req.waitCompleted();
}
synchronized (this) {
req = scheduledCp.destroyQueue.cancelDestroy(cacheCtx.cacheId(), partId);
}
if (req != null)
req.waitCompleted();
}
/**
*
*/
private void doCheckpoint() {
try {
long start = U.currentTimeMillis();
Checkpoint chp = markCheckpointBegin();
snapshotMgr.onCheckPointBegin();
long written, fsync, marked = U.currentTimeMillis();
int pages = chp.cpPages.size();
boolean interrupted = true;
try {
// Identity stores set.
GridConcurrentHashSet<PageStore> updStores = new GridConcurrentHashSet<>();
CountDownFuture doneWriteFut = new CountDownFuture(
asyncRunner == null ? 1 : chp.cpPages.collectionsSize());
if (asyncRunner != null) {
for (int i = 0; i < chp.cpPages.collectionsSize(); i++) {
Runnable write = new WriteCheckpointPages(
chp.cpPages.innerCollection(i),
updStores,
doneWriteFut
);
try {
asyncRunner.execute(write);
}
catch (RejectedExecutionException ignore) {
// Run the task synchronously.
write.run();
}
}
}
else {
// Single-threaded checkpoint.
Runnable write = new WriteCheckpointPages(chp.cpPages, updStores, doneWriteFut);
write.run();
}
// Wait and check for errors.
doneWriteFut.get();
// Must re-check shutdown flag here because threads may have skipped some pages.
// If so, we should not put finish checkpoint mark.
if (shutdownNow)
return;
snapshotMgr.afterCheckpointPageWritten();
written = U.currentTimeMillis();
if (!skipSync) {
for (PageStore updStore : updStores) {
if (shutdownNow)
return;
updStore.sync();
}
}
fsync = U.currentTimeMillis();
// Must mark successful checkpoint only if there are no exceptions or interrupts.
interrupted = false;
}
finally {
if (!interrupted)
markCheckpointEnd(chp);
}
long fsyncEnd = U.currentTimeMillis();
// We finished this checkpoint, now it's time to clean up partitions.
PartitionDestroyQueue destroyQueue = chp.progress.destroyQueue;
Collection<PartitionDestroyRequest> reqs = null;
WALPointer lastPtr = null;
for (T2<Integer, Integer> destroyId : destroyQueue.pendingReqs.keySet()) {
PartitionDestroyRequest req = destroyQueue.beginDestroy(destroyId);
if (req != null) {
// Log destroy record before actual partition clear.
lastPtr = cctx.wal().log(new PartitionDestroyRecord(req.cacheId, req.partId));
if (reqs == null)
reqs = new ArrayList<>();
reqs.add(req);
}
}
if (reqs != null) {
assert lastPtr != null;
cctx.wal().fsync(lastPtr);
finishDestroyPartitionsAsync(reqs);
}
if (printCheckpointStats)
log.info(String.format("Checkpoint finished [cpId=%s, pages=%d, markPos=%s, " +
"walSegmentsCleared=%d, markBegin=%dms, pagesWrite=%dms, fsync=%dms, markEnd=%dms, " +
"total=%dms]",
chp.cpEntry.checkpointId(),
pages,
chp.cpEntry.checkpointMark(),
chp.walFilesDeleted,
marked - start,
written - marked,
fsync - written,
fsyncEnd - fsync,
fsyncEnd - start));
}
catch (IgniteCheckedException e) {
// TODO-ignite-db how to handle exception?
U.error(log, "Failed to create checkpoint.", e);
}
}
/**
*
*/
@SuppressWarnings("WaitNotInLoop")
private void waitCheckpointEvent() {
boolean cancel = false;
try {
long now = U.currentTimeMillis();
synchronized (this) {
long remaining;
while ((remaining = scheduledCp.nextCpTs - now) > 0 && !isCancelled()) {
wait(remaining);
now = U.currentTimeMillis();
}
}
}
catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
cancel = true;
}
if (cancel)
isCancelled = true;
}
/**
*
*/
@SuppressWarnings("TooBroadScope")
private Checkpoint markCheckpointBegin() throws IgniteCheckedException {
CheckpointRecord cpRec = new CheckpointRecord(null, false);
WALPointer cpPtr;
GridMultiCollectionWrapper<FullPageId> cpPages;
long lockAcquired, lockReleased, lockStart = U.currentTimeMillis();
final CheckpointProgress curr;
checkpointLock.writeLock().lock();
try {
lockAcquired = U.currentTimeMillis();
synchronized (this) {
curr = scheduledCp;
curr.started = true;
if (curr.reason == null)
curr.reason = "timeout";
// It is important that we assign a new progress object before checkpoint mark in page memory.
scheduledCp = new CheckpointProgress(U.currentTimeMillis() + checkpointFreq);
curCpProgress = curr;
}
final NavigableMap<T2<Integer, Integer>, T2<Integer, Integer>> map =
new TreeMap<>(FullPageIdIterableComparator.INSTANCE);
DbCheckpointListener.Context ctx0 = new DbCheckpointListener.Context() {
@Override public boolean nextSnapshot() {
return curr.nextSnapshot;
}
@Override public Map<T2<Integer, Integer>, T2<Integer, Integer>> partitionStatMap() {
return map;
}
};
// Listeners must be invoked before we write checkpoint record to WAL.
for (DbCheckpointListener lsnr : lsnrs)
lsnr.onCheckpointBegin(ctx0);
Collection<GridCacheContext> cacheCtxs = ((GridCacheSharedContext<Object, Object>)cctx).cacheContexts();
for (GridCacheContext cacheCtx : cacheCtxs) {
CacheState state = new CacheState();
if (cacheCtx.isLocal())
continue;
for (GridDhtLocalPartition part : cacheCtx.topology().currentLocalPartitions())
state.addPartitionState(part.id(), part.dataStore().size(), part.lastAppliedUpdate());
cpRec.addCacheState(cacheCtx.cacheId(), state);
}
if (curr.nextSnapshot)
snapshotMgr.onMarkCheckPointBegin(curr.snapshotOperation, map);
// No page updates for this checkpoint are allowed from now on.
cpPtr = cctx.wal().log(cpRec);
if (cpPtr == null)
cpPtr = CheckpointStatus.NULL_PTR;
IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> tup = beginAllCheckpoints();
// TODO: this can probably be done more optimally.
Collection<FullPageId> cpPagesList = new ArrayList<>(tup.get2());
for (GridMultiCollectionWrapper<FullPageId> col : tup.get1()) {
for (int i = 0; i < col.collectionsSize(); i++)
cpPagesList.addAll(col.innerCollection(i));
}
cpPages = new GridMultiCollectionWrapper<>(cpPagesList);
}
finally {
checkpointLock.writeLock().unlock();
lockReleased = U.currentTimeMillis();
}
curr.cpBeginFut.onDone();
// Sync log outside the checkpoint write lock.
cctx.wal().fsync(cpPtr);
long cpTs = System.currentTimeMillis();
CheckpointEntry cpEntry = writeCheckpointEntry(
tmpWriteBuf,
cpTs,
cpRec.checkpointId(),
cpPtr,
cpRec,
CheckpointEntryType.START);
checkpointHist.addCheckpointEntry(cpEntry);
if (printCheckpointStats && log.isInfoEnabled())
log.info(String.format("Checkpoint started [checkpointId=%s, startPtr=%s, checkpointLockWait=%dms, " +
"checkpointLockHoldTime=%dms, pages=%d, reason='%s']",
cpRec.checkpointId(),
cpPtr,
lockAcquired - lockStart,
lockReleased - lockAcquired,
cpPages.size(),
curr.reason)
);
return new Checkpoint(cpEntry, cpPages, curr);
}
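// Note on ordering in markCheckpointBegin(): listeners run and the CheckpointRecord is logged while the
// checkpoint write lock is held, so the collected dirty-page set is consistent with the WAL mark; the WAL
// fsync and the START entry are deliberately written after the lock is released to keep the hold time short.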
/**
* @return Tuple with collections of FullPageIds obtained from each PageMemory and the total number of dirty pages.
*/
private IgniteBiTuple<Collection<GridMultiCollectionWrapper<FullPageId>>, Integer> beginAllCheckpoints() {
Collection<GridMultiCollectionWrapper<FullPageId>> res = new ArrayList<>(memoryPolicies().size());
int pagesNum = 0;
for (MemoryPolicy memPlc : memoryPolicies()) {
GridMultiCollectionWrapper<FullPageId> nextCpPagesCol = ((PageMemoryEx)memPlc.pageMemory()).beginCheckpoint();
pagesNum += nextCpPagesCol.size();
res.add(nextCpPagesCol);
}
return new IgniteBiTuple<>(res, pagesNum);
}
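// beginCheckpoint() on each PageMemoryEx atomically grabs that memory's current dirty-page set; pages dirtied
// afterwards belong to the next checkpoint (presumably via copy-on-write while write-out is in progress).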
/**
* @param chp Checkpoint snapshot.
* @throws IgniteCheckedException If failed to write the checkpoint end entry.
*/
private void markCheckpointEnd(Checkpoint chp) throws IgniteCheckedException {
synchronized (this) {
for (MemoryPolicy memPlc : memoryPolicies())
((PageMemoryEx)memPlc.pageMemory()).finishCheckpoint();
writeCheckpointEntry(
tmpWriteBuf,
chp.cpEntry.checkpointTimestamp(),
chp.cpEntry.checkpointId(),
chp.cpEntry.checkpointMark(),
null,
CheckpointEntryType.END);
}
chp.walFilesDeleted = checkpointHist.onCheckpointFinished();
chp.progress.cpFinishFut.onDone();
}
/** {@inheritDoc} */
@Override public void cancel() {
if (log.isDebugEnabled())
log.debug("Cancelling grid runnable: " + this);
// Do not interrupt runner thread.
isCancelled = true;
synchronized (this) {
notifyAll();
}
}
/**
* Requests immediate shutdown: in-progress page write-out checks this flag and aborts, and the checkpointer
* itself is cancelled.
*/
public void shutdownNow() {
shutdownNow = true;
if (!isCancelled)
cancel();
}
}
/**
* Task that writes a chunk of checkpoint pages to the corresponding page stores.
*/
private class WriteCheckpointPages implements Runnable {
/** */
private Collection<FullPageId> writePageIds;
/** */
private GridConcurrentHashSet<PageStore> updStores;
/** */
private CountDownFuture doneFut;
/**
* @param writePageIds Write page IDs.
* @param updStores Set of updated page stores to fsync after write-out.
* @param doneFut Future completed when this chunk has been written.
*/
private WriteCheckpointPages(
Collection<FullPageId> writePageIds,
GridConcurrentHashSet<PageStore> updStores,
CountDownFuture doneFut
) {
this.writePageIds = writePageIds;
this.updStores = updStores;
this.doneFut = doneFut;
}
/** {@inheritDoc} */
@Override public void run() {
ByteBuffer tmpWriteBuf = threadBuf.get();
long writeAddr = GridUnsafe.bufferAddress(tmpWriteBuf);
snapshotMgr.beforeCheckpointPageWritten();
try {
for (FullPageId fullId : writePageIds) {
if (checkpointer.shutdownNow)
break;
tmpWriteBuf.rewind();
snapshotMgr.beforePageWrite(fullId);
int cacheId = fullId.cacheId();
GridCacheContext cacheCtx = context().cacheContext(cacheId);
if (cacheCtx == null)
continue;
PageMemoryEx pageMem = (PageMemoryEx)cacheCtx.memoryPolicy().pageMemory();
// Copy the page into the thread-local buffer; a null tag means there is nothing to write for this page.
Integer tag = pageMem.getForCheckpoint(fullId, tmpWriteBuf);
if (tag != null) {
tmpWriteBuf.rewind();
// Stamp the CRC so the snapshot manager observes a page with a valid checksum.
if (!skipCrc) {
PageIO.setCrc(writeAddr, PureJavaCrc32.calcCrc32(tmpWriteBuf, pageSize()));
tmpWriteBuf.rewind();
}
snapshotMgr.onPageWrite(fullId, tmpWriteBuf);
tmpWriteBuf.rewind();
// Reset the CRC field before handing the page to the store, which computes its own checksum on write.
PageIO.setCrc(writeAddr, 0);
PageStore store = storeMgr.writeInternal(cacheId, fullId.pageId(), tmpWriteBuf, tag);
// Remember the store so it can be fsync'ed at the end of the checkpoint.
updStores.add(store);
}
}
doneFut.onDone((Void)null);
}
catch (Throwable e) {
doneFut.onDone(e);
}
}
}
/**
* Checkpoint entry file type.
*/
private enum CheckpointEntryType {
/** */
START,
/** */
END
}
/**
* Data holder for a checkpoint being written.
*/
private static class Checkpoint {
/** Checkpoint entry. */
private final CheckpointEntry cpEntry;
/** Checkpoint pages. */
private final GridMultiCollectionWrapper<FullPageId> cpPages;
/** */
private final CheckpointProgress progress;
/** Number of deleted WAL files. */
private int walFilesDeleted;
/**
* @param cpEntry Checkpoint entry.
* @param cpPages Pages to write to the page store.
* @param progress Checkpoint progress status.
*/
private Checkpoint(
CheckpointEntry cpEntry,
GridMultiCollectionWrapper<FullPageId> cpPages,
CheckpointProgress progress
) {
assert cpEntry.initGuard != 0;
this.cpEntry = cpEntry;
this.cpPages = cpPages;
this.progress = progress;
}
}
/**
* Checkpoint status: start and end markers of the most recent checkpoint.
*/
private static class CheckpointStatus {
/** Null checkpoint UUID. */
private static final UUID NULL_UUID = new UUID(0L, 0L);
/** Null WAL pointer. */
private static final WALPointer NULL_PTR = new FileWALPointer(0, 0, 0);
/** */
private long cpStartTs;
/** */
private UUID cpStartId;
/** */
private WALPointer startPtr;
/** */
private UUID cpEndId;
/** */
private WALPointer endPtr;
/**
* @param cpStartTs Checkpoint start timestamp.
* @param cpStartId Checkpoint start ID.
* @param startPtr Checkpoint start pointer.
* @param cpEndId Checkpoint end ID.
* @param endPtr Checkpoint end pointer.
*/
private CheckpointStatus(long cpStartTs, UUID cpStartId, WALPointer startPtr, UUID cpEndId, WALPointer endPtr) {
this.cpStartTs = cpStartTs;
this.cpStartId = cpStartId;
this.startPtr = startPtr;
this.cpEndId = cpEndId;
this.endPtr = endPtr;
}
/**
* @return {@code True} if the last checkpoint did not finish, so the page log must be applied to restore memory state.
*/
public boolean needRestoreMemory() {
return !F.eq(cpStartId, cpEndId) && !F.eq(NULL_UUID, cpStartId);
}
}
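// Example of needRestoreMemory() semantics: the START marker is written before page write-out and the END marker
// after it (see markCheckpointEnd()), so cpStartId != cpEndId on restart means the node stopped mid-checkpoint
// and page memory must be restored from the WAL.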
/**
* Current checkpoint progress descriptor shared between the checkpointer and waiting threads.
*/
public static class CheckpointProgress {
/** */
private volatile long nextCpTs;
/** */
private GridFutureAdapter cpBeginFut = new GridFutureAdapter<>();
/** */
private GridFutureAdapter cpFinishFut = new GridFutureAdapter<>();
/** */
public volatile boolean nextSnapshot;
/** */
private volatile boolean started;
/** */
public volatile SnapshotOperation snapshotOperation;
/** Wakeup reason. */
private String reason;
/** */
private final PartitionDestroyQueue destroyQueue = new PartitionDestroyQueue();
/**
* @param nextCpTs Next checkpoint timestamp.
*/
private CheckpointProgress(long nextCpTs) {
this.nextCpTs = nextCpTs;
}
}
/**
* Immutable snapshot of {@link CheckpointProgress}.
*/
private static class CheckpointProgressSnapshot {
/** */
private final boolean started;
/** */
private final GridFutureAdapter<Object> cpBeginFut;
/** */
private final GridFutureAdapter<Object> cpFinishFut;
/**
* @param cpProgress Checkpoint progress to snapshot.
*/
CheckpointProgressSnapshot(CheckpointProgress cpProgress) {
started = cpProgress.started;
cpBeginFut = cpProgress.cpBeginFut;
cpFinishFut = cpProgress.cpFinishFut;
}
}
/**
* Checkpoint history.
*/
@SuppressWarnings("PublicInnerClass")
public class CheckpointHistory {
/** */
private final NavigableMap<Long, CheckpointEntry> histMap = new ConcurrentSkipListMap<>();
/**
* Loads history from the checkpoint directory.
*
* @param dir Checkpoint state dir.
* @throws IgniteCheckedException If failed to read checkpoint markers.
*/
private void loadHistory(File dir) throws IgniteCheckedException {
if (!dir.exists())
return;
File[] files = dir.listFiles(CP_FILE_FILTER);
if (!F.isEmpty(files)) {
Arrays.sort(files, CP_TS_COMPARATOR);
ByteBuffer buf = ByteBuffer.allocate(16);
buf.order(ByteOrder.nativeOrder());
for (File file : files) {
Matcher matcher = CP_FILE_NAME_PATTERN.matcher(file.getName());
if (matcher.matches()) {
CheckpointEntryType type = CheckpointEntryType.valueOf(matcher.group(3));
if (type == CheckpointEntryType.START) {
long cpTs = Long.parseLong(matcher.group(1));
WALPointer ptr = readPointer(file, buf);
if (ptr == null)
continue;
// Create lazy checkpoint entry.
CheckpointEntry entry = new CheckpointEntry(cpTs, ptr);
histMap.put(cpTs, entry);
}
}
}
}
}
/**
* @param cpTs Checkpoint timestamp.
* @return Initialized entry.
* @throws IgniteCheckedException If failed to initialize entry.
*/
private CheckpointEntry entry(Long cpTs) throws IgniteCheckedException {
CheckpointEntry entry = histMap.get(cpTs);
if (entry == null)
throw new IgniteCheckedException("Checkpoint entry was removed: " + cpTs);
entry.initIfNeeded(cctx);
return entry;
}
/**
* @return Collection of checkpoint timestamps.
*/
public Collection<Long> checkpoints() {
return histMap.keySet();
}
/**
* Adds a checkpoint entry after the corresponding WAL record has been written to WAL. The checkpoint itself
* is not finished yet.
*
* @param entry Entry to add.
*/
private void addCheckpointEntry(CheckpointEntry entry) {
histMap.put(entry.checkpointTimestamp(), entry);
}
/**
* Clears checkpoint history and truncates the WAL archive up to the oldest retained checkpoint.
*
* @return Number of deleted WAL segments.
*/
private int onCheckpointFinished() {
int deleted = 0;
while (histMap.size() > dbCfg.getWalHistorySize()) {
Map.Entry<Long, CheckpointEntry> entry = histMap.firstEntry();
CheckpointEntry cpEntry = entry.getValue();
if (cctx.wal().reserved(cpEntry.checkpointMark()))
break;
File startFile = new File(cpDir.getAbsolutePath(), cpEntry.startFile());
File endFile = new File(cpDir.getAbsolutePath(), cpEntry.endFile());
boolean rmvdStart = !startFile.exists() || startFile.delete();
boolean rmvdEnd = !endFile.exists() || endFile.delete();
boolean fail = !rmvdStart || !rmvdEnd;
if (fail) {
U.warn(log, "Failed to remove stale checkpoint files [startFile=" + startFile.getAbsolutePath() +
", endFile=" + endFile.getAbsolutePath() + ']');
if (histMap.size() > 2 * dbCfg.getWalHistorySize()) {
U.error(log, "Too many stale checkpoint entries in the map, will truncate WAL archive anyway.");
fail = false;
}
}
if (!fail) {
deleted += cctx.wal().truncate(cpEntry.checkpointMark());
histMap.remove(entry.getKey());
}
else
break;
}
return deleted;
}
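// WAL truncation above stops at the first checkpoint whose mark is still reserved (e.g. for historical
// rebalance). If stale marker files cannot be deleted, the archive is truncated anyway once the history map
// grows past twice the configured WAL history size.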
/**
* Tries to reserve WAL history starting from the earliest checkpoint that contains the given partition.
*
* @param cacheId Cache ID.
* @param partId Partition ID.
* @return Reserved partition counter or {@code null} if the reservation failed.
*/
@Nullable private Long reserve(int cacheId, int partId) {
for (CheckpointEntry entry : histMap.values()) {
try {
entry.initIfNeeded(cctx);
if (entry.cacheStates == null)
continue;
CacheState cacheState = entry.cacheStates.get(cacheId);
if (cacheState == null)
continue;
CacheState.PartitionState partState = cacheState.partitions().get(partId);
if (partState != null) {
if (cctx.wal().reserve(entry.checkpointMark()))
return partState.partitionCounter();
}
}
catch (Exception e) {
U.error(log, "Error while trying to reserve history", e);
}
}
return null;
}
}
/**
*
*/
private static class CheckpointEntry {
/** */
private static final AtomicIntegerFieldUpdater<CheckpointEntry> initGuardUpdater =
AtomicIntegerFieldUpdater.newUpdater(CheckpointEntry.class, "initGuard");
/** Checkpoint timestamp. */
private long cpTs;
/** Checkpoint end mark. */
private WALPointer cpMark;
/** Initialization latch. */
private CountDownLatch initLatch;
/** */
@SuppressWarnings("unused")
private volatile int initGuard;
/** Checkpoint ID. Initialized lazily. */
private UUID cpId;
/** Cache states. Initialized lazily. */
private Map<Integer, CacheState> cacheStates;
/** Initialization exception. */
private IgniteCheckedException initEx;
/**
* Lazy entry constructor.
*
* @param cpTs Checkpoint timestamp.
* @param cpMark Checkpoint WAL mark.
*/
private CheckpointEntry(long cpTs, WALPointer cpMark) {
assert cpMark != null;
this.cpTs = cpTs;
this.cpMark = cpMark;
initLatch = new CountDownLatch(1);
}
/**
* Creates complete entry.
*
* @param cpTs Checkpoint timestamp.
* @param cpMark Checkpoint mark pointer.
* @param cpId Checkpoint ID.
* @param cacheStates Cache states.
*/
private CheckpointEntry(long cpTs, WALPointer cpMark, UUID cpId, Map<Integer, CacheState> cacheStates) {
this.cpTs = cpTs;
this.cpMark = cpMark;
this.cpId = cpId;
this.cacheStates = cacheStates;
initGuard = 1;
initLatch = new CountDownLatch(0);
}
/**
* @return Checkpoint timestamp.
*/
private long checkpointTimestamp() {
return cpTs;
}
/**
* @return Checkpoint ID.
*/
private UUID checkpointId() {
return cpId;
}
/**
* @return Checkpoint mark.
*/
private WALPointer checkpointMark() {
return cpMark;
}
/**
* @return Start file name.
*/
private String startFile() {
return checkpointFileName(cpTs, cpId, CheckpointEntryType.START);
}
/**
* @return End file name.
*/
private String endFile() {
return checkpointFileName(cpTs, cpId, CheckpointEntryType.END);
}
/**
* @param cacheId Cache ID.
* @param part Partition ID.
* @return Partition counter or {@code null} if not found.
*/
private Long partitionCounter(int cacheId, int part) {
assert initGuard != 0;
if (initEx != null || cacheStates == null)
return null;
CacheState state = cacheStates.get(cacheId);
if (state != null) {
CacheState.PartitionState partState = state.partitions().get(part);
return partState == null ? null : partState.partitionCounter();
}
return null;
}
/**
* @param cctx Cache shared context.
* @throws IgniteCheckedException If failed to read WAL entry.
*/
private void initIfNeeded(GridCacheSharedContext cctx) throws IgniteCheckedException {
if (initGuardUpdater.compareAndSet(this, 0, 1)) {
try (WALIterator it = cctx.wal().replay(cpMark)) {
if (it.hasNextX()) {
IgniteBiTuple<WALPointer, WALRecord> tup = it.nextX();
CheckpointRecord rec = (CheckpointRecord)tup.get2();
cpId = rec.checkpointId();
cacheStates = rec.cacheStates();
}
else
initEx = new IgniteCheckedException("Failed to find checkpoint record at " +
"the given WAL pointer: " + cpMark);
}
catch (IgniteCheckedException e) {
initEx = e;
}
finally {
initLatch.countDown();
}
}
else {
U.await(initLatch);
if (initEx != null)
throw initEx;
}
}
}
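// Lazy initialization pattern used above: exactly one thread wins the initGuard CAS and replays the WAL to read
// the CheckpointRecord; all other threads park on initLatch and rethrow any failure recorded in initEx.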
/**
* Partition destroy queue.
*/
private static class PartitionDestroyQueue {
/** */
private final ConcurrentMap<T2<Integer, Integer>, PartitionDestroyRequest> pendingReqs =
new ConcurrentHashMap<>();
/**
* @param cacheCtx Cache context.
* @param partId Partition ID to destroy.
*/
private void addDestroyRequest(GridCacheContext<?, ?> cacheCtx, int partId) {
PartitionDestroyRequest req = new PartitionDestroyRequest(cacheCtx, partId);
PartitionDestroyRequest old = pendingReqs.putIfAbsent(new T2<>(cacheCtx.cacheId(), partId), req);
assert old == null : "Must wait for old destroy request to finish before adding a new one " +
"[cacheId=" + cacheCtx.cacheId() + ", cacheName=" + cacheCtx.name() + ", partId=" + partId + ']';
}
/**
* @param destroyId Destroy ID.
* @return Destroy request to complete, or {@code null} if it was concurrently cancelled.
*/
private PartitionDestroyRequest beginDestroy(T2<Integer, Integer> destroyId) {
PartitionDestroyRequest rmvd = pendingReqs.remove(destroyId);
return rmvd == null ? null : rmvd.beginDestroy() ? rmvd : null;
}
/**
* @param cacheId Cache ID.
* @param partId Partition ID.
* @return Destroy request to wait for if destroy has already begun, {@code null} otherwise.
*/
private PartitionDestroyRequest cancelDestroy(int cacheId, int partId) {
PartitionDestroyRequest rmvd = pendingReqs.remove(new T2<>(cacheId, partId));
return rmvd == null ? null : !rmvd.cancel() ? rmvd : null;
}
}
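// beginDestroy() and cancel() on a request are mutually exclusive: whichever synchronized call runs first wins,
// so a request is either executed once (destroyFut created) or cancelled once, never both.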
/**
* Partition destroy request.
*/
private static class PartitionDestroyRequest {
/** */
private int cacheId;
/** */
private String cacheName;
/** */
private int partId;
/** */
private boolean allowFastEviction;
/** Destroy cancelled flag. */
private boolean cancelled;
/** Destroy future. Not null if partition destroy has begun. */
private GridFutureAdapter<Void> destroyFut;
/**
* @param cacheCtx Cache context.
* @param partId Partition ID.
*/
private PartitionDestroyRequest(GridCacheContext<?, ?> cacheCtx, int partId) {
cacheId = cacheCtx.cacheId();
cacheName = cacheCtx.name();
allowFastEviction = cacheCtx.allowFastEviction();
this.partId = partId;
}
/**
* Cancels partition destroy request.
*
* @return {@code False} if this request needs to be waited for.
*/
private synchronized boolean cancel() {
if (destroyFut != null) {
assert !cancelled;
return false;
}
cancelled = true;
return true;
}
/**
* Initiates partition destroy.
*
* @return {@code True} if destroy request should be executed, {@code false} otherwise.
*/
private synchronized boolean beginDestroy() {
if (cancelled) {
assert destroyFut == null;
return false;
}
if (destroyFut != null)
return false;
destroyFut = new GridFutureAdapter<>();
return true;
}
/**
* @param err Error that caused the destroy to fail, or {@code null} on success.
*/
private synchronized void onDone(Throwable err) {
assert destroyFut != null;
destroyFut.onDone(err);
}
/**
* @throws IgniteCheckedException If partition destroy failed.
*/
private void waitCompleted() throws IgniteCheckedException {
GridFutureAdapter<Void> fut;
synchronized (this) {
assert destroyFut != null;
fut = destroyFut;
}
fut.get();
}
/** {@inheritDoc} */
@Override public String toString() {
return "PartitionDestroyRequest [cacheId=" + cacheId + ", cacheName=" + cacheName +
", partId=" + partId + ']';
}
}
/**
* Holder of the file lock that guards the persistent store directory against concurrent use by another node.
*/
private static class FileLockHolder {
/** Lock file name. */
private static final String LOCK_FILE_NAME = "lock";
/** File. */
private File file;
/** Lock file. */
private RandomAccessFile lockFile;
/** Lock. */
private FileLock lock;
/** Kernal context. */
private GridKernalContext ctx;
/** Logger. */
private IgniteLogger log;
/**
* @param path Directory path where the lock file is created.
* @param ctx Kernal context.
* @param log Logger.
*/
private FileLockHolder(String path, GridKernalContext ctx, IgniteLogger log) {
try {
file = Paths.get(path, LOCK_FILE_NAME).toFile();
lockFile = new RandomAccessFile(file, "rw");
this.ctx = ctx;
this.log = log;
}
catch (IOException e) {
throw new IgniteException(e);
}
}
/**
* @param lockWaitTime Maximum time, in milliseconds, during which to try to acquire the file lock.
* @throws IgniteCheckedException If failed to acquire the file lock within the given time.
*/
public void tryLock(int lockWaitTime) throws IgniteCheckedException {
assert lockFile != null;
FileChannel ch = lockFile.getChannel();
SB sb = new SB();
// Write node ID.
sb.a("[").a(ctx.localNodeId().toString()).a("]");
// Write IP addresses.
sb.a(ctx.discovery().localNode().addresses());
// Write ports.
sb.a("[");
Iterator<GridPortRecord> it = ctx.ports().records().iterator();
while (it.hasNext()) {
GridPortRecord rec = it.next();
sb.a(rec.protocol()).a(":").a(rec.port());
if (it.hasNext())
sb.a(", ");
}
sb.a("]");
String failMsg;
try {
String content = null;
// Try to acquire the lock; if it is not available, wait 1 sec and retry.
for (int i = 0; i < lockWaitTime; i += 1000) {
try {
lock = ch.tryLock(0, 1, false);
if (lock != null && lock.isValid()) {
writeContent(sb.toString());
return;
}
}
catch (OverlappingFileLockException ignore) {
if (content == null)
content = readContent();
log.warning("Failed to acquire file lock (already locked by " + content
+ "), will try again in 1s: " + file.getAbsolutePath());
}
U.sleep(1000);
}
if (content == null)
content = readContent();
failMsg = "Failed to acquire file lock within " + (lockWaitTime / 1000) +
" sec (locked by " + content + "): " + file.getAbsolutePath();
}
catch (Exception e) {
throw new IgniteCheckedException(e);
}
if (failMsg != null)
throw new IgniteCheckedException(failMsg);
}
/**
* Writes the ID of the node that captured the lock into the lock file. Content is written starting at
* offset 1, past the byte covered by the file lock itself.
*
* @param content Node ID and address info.
* @throws IOException If failed to write the node ID.
*/
private void writeContent(String content) throws IOException {
FileChannel ch = lockFile.getChannel();
byte[] bytes = content.getBytes();
ByteBuffer buf = ByteBuffer.allocate(bytes.length);
buf.put(bytes);
buf.flip();
ch.write(buf, 1);
ch.force(false);
}
/**
* @return Contents of the lock file (info on the node holding the lock).
* @throws IOException If failed to read the lock file.
*/
private String readContent() throws IOException {
FileChannel ch = lockFile.getChannel();
ByteBuffer buf = ByteBuffer.allocate((int)(ch.size() - 1));
ch.read(buf, 1);
String content = new String(buf.array());
buf.clear();
return content;
}
/**
* Releases the file lock.
*/
private void release() {
U.releaseQuiet(lock);
}
/**
* Closes the lock file.
*/
private void close() {
U.closeQuiet(lockFile);
}
/**
* @return Absolute path to lock file.
*/
private String lockPath() {
return file.getAbsolutePath();
}
}
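// Illustrative usage sketch of FileLockHolder (not taken verbatim from this codebase; the path and wait time
// below are made-up values):
//
//   FileLockHolder lock = new FileLockHolder("/path/to/store", kernalCtx, log);
//   try {
//       lock.tryLock(10_000); // Wait up to 10 seconds for the lock.
//       // ... this node now owns the persistent store directory ...
//   }
//   finally {
//       lock.release();
//       lock.close();
//   }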
}