/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.checkpoint;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.LongJVMPauseDetector;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.store.PageStore;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
import org.apache.ignite.internal.processors.cache.persistence.CheckpointState;
import org.apache.ignite.internal.processors.cache.persistence.DataStorageMetricsImpl;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheOffheapManager;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.CheckpointMetricsTracker;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteCacheSnapshotManager;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotOperation;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.failure.FailureProcessor;
import org.apache.ignite.internal.processors.performancestatistics.PerformanceStatisticsProcessor;
import org.apache.ignite.internal.util.GridConcurrentMultiPairQueue;
import org.apache.ignite.internal.util.StripedExecutor;
import org.apache.ignite.internal.util.future.CountDownFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.internal.util.worker.GridWorker;
import org.apache.ignite.internal.util.worker.WorkProgressDispatcher;
import org.apache.ignite.internal.worker.WorkersRegistry;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.thread.IgniteThread;
import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentLinkedHashMap;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_JVM_PAUSE_DETECTOR_THRESHOLD;
import static org.apache.ignite.IgniteSystemProperties.getBoolean;
import static org.apache.ignite.IgniteSystemProperties.getInteger;
import static org.apache.ignite.failure.FailureType.CRITICAL_ERROR;
import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
import static org.apache.ignite.internal.LongJVMPauseDetector.DEFAULT_JVM_PAUSE_DETECTOR_THRESHOLD;
import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED;
import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.LOCK_RELEASED;
import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC;
import static org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager.IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP;
import static org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointReadWriteLock.CHECKPOINT_RUNNER_THREAD_PREFIX;
/**
* Checkpointer object is used for notification on checkpoint begin; the wait predicate is
* {@link #scheduledCp}<code>.nextCpNanos() - System.nanoTime() > 0</code>. Method {@link #scheduleCheckpoint} uses
* notify, {@link #waitCheckpointEvent} uses wait.
*
* Checkpointer is single-threaded, which means that only one checkpoint is possible at any given moment.
*
* Responsibility: provides the API to schedule/trigger a checkpoint, schedules the next checkpoint after the current
* one according to the checkpoint frequency, handles failures, manages the page write threads, and logs and reports
* checkpoint metrics.
*
* Checkpointer steps:
* <p> Awaiting checkpoint event. </p>
* <p> Collect all dirty pages from page memory under write lock. </p>
* <p> Start to write dirty pages to disk. </p>
* <p> Finish the checkpoint and write end marker to disk. </p>
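*
* <p>Illustrative sketch (not part of the original documentation) of how a caller can trigger a checkpoint and block
* until it completes, assuming a started {@code checkpointer} instance is available:</p>
* <pre>{@code
* // Ask for a checkpoint as soon as possible and wait for it to finish.
* CheckpointProgress progress = checkpointer.scheduleCheckpoint(0, "manual trigger");
* progress.futureFor(CheckpointState.FINISHED).get();
* }</pre>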
*/
@SuppressWarnings("NakedNotify")
public class Checkpointer extends GridWorker {
/** Checkpoint started log message format. */
private static final String CHECKPOINT_STARTED_LOG_FORMAT = "Checkpoint started [" +
"checkpointId=%s, " +
"startPtr=%s, " +
"checkpointBeforeLockTime=%dms, " +
"checkpointLockWait=%dms, " +
"checkpointListenersExecuteTime=%dms, " +
"checkpointLockHoldTime=%dms, " +
"walCpRecordFsyncDuration=%dms, " +
"writeCheckpointEntryDuration=%dms, " +
"splitAndSortCpPagesDuration=%dms, " +
"%s" +
"pages=%d, " +
"reason='%s']";
/** Skip sync. */
private final boolean skipSync = getBoolean(IGNITE_PDS_CHECKPOINT_TEST_SKIP_SYNC);
/** Timeout between partition file destroy and checkpoint to handle it. */
private static final long PARTITION_DESTROY_CHECKPOINT_TIMEOUT = 30 * 1000; // 30 Seconds.
/** Skip the last checkpoint on node stop if the checkpointer was cancelled. */
private volatile boolean skipCheckpointOnNodeStop = getBoolean(IGNITE_PDS_SKIP_CHECKPOINT_ON_NODE_STOP, false);
/** Long JVM pause threshold. */
private final int longJvmPauseThreshold =
getInteger(IGNITE_JVM_PAUSE_DETECTOR_THRESHOLD, DEFAULT_JVM_PAUSE_DETECTOR_THRESHOLD);
/** Pause detector. */
private final LongJVMPauseDetector pauseDetector;
/** The interval in ms after which the checkpoint is triggered if there are no other events. */
private final long checkpointFreq;
/** Failure processor. */
private final FailureProcessor failureProcessor;
/** Snapshot manager. */
private final IgniteCacheSnapshotManager snapshotMgr;
/** Metrics. */
private final DataStorageMetricsImpl persStoreMetrics;
/**
* Cache processor.
*
* @deprecated Checkpointer should not know about the cache at all.
*/
private final GridCacheProcessor cacheProcessor;
/** Strategy of where and how to get the pages. */
private final CheckpointWorkflow checkpointWorkflow;
/** Factory for the creation of page-write workers. */
private final CheckpointPagesWriterFactory checkpointPagesWriterFactory;
/** The number of IO-bound threads which will write pages to disk. */
private final int checkpointWritePageThreads;
/** Checkpoint frequency deviation. */
private final Supplier<Integer> cpFreqDeviation;
/** Checkpoint runner thread pool. If {@code null}, tasks are run in a single thread. */
@Nullable private volatile IgniteThreadPoolExecutor checkpointWritePagesPool;
/** Next scheduled checkpoint progress. */
private volatile CheckpointProgressImpl scheduledCp;
/** Current checkpoint. This field is updated only by checkpoint thread. */
private volatile CheckpointProgressImpl curCpProgress;
/** Shutdown now. */
private volatile boolean shutdownNow;
/** Last checkpoint timestamp. */
private long lastCpTs;
/** Performance statistics processor. */
private final PerformanceStatisticsProcessor psproc;
/** For testing only. */
private GridFutureAdapter<Void> enableChangeApplied;
/** For testing only. */
private volatile boolean checkpointsEnabled = true;
/**
* @param gridName Grid name.
* @param name Thread name.
* @param workersRegistry Worker registry.
* @param logger Logger.
* @param detector Long JVM pause detector.
* @param failureProcessor Failure processor.
* @param snapshotManager Snapshot manager.
* @param dsMetrics Data storage metrics.
* @param cacheProcessor Cache processor.
* @param checkpoint Implementation of checkpoint.
* @param factory Page writer factory.
* @param checkpointFrequency Checkpoint frequency.
* @param checkpointWritePageThreads The number of IO-bound threads which will write pages to disk.
* @param cpFreqDeviation Deviation of checkpoint frequency.
*/
Checkpointer(
@Nullable String gridName,
String name,
WorkersRegistry workersRegistry,
Function<Class<?>, IgniteLogger> logger,
LongJVMPauseDetector detector,
FailureProcessor failureProcessor,
IgniteCacheSnapshotManager snapshotManager,
DataStorageMetricsImpl dsMetrics,
GridCacheProcessor cacheProcessor,
CheckpointWorkflow checkpoint,
CheckpointPagesWriterFactory factory,
long checkpointFrequency,
int checkpointWritePageThreads,
Supplier<Integer> cpFreqDeviation
) {
super(gridName, name, logger.apply(Checkpointer.class), workersRegistry);
this.pauseDetector = detector;
this.checkpointFreq = checkpointFrequency;
this.failureProcessor = failureProcessor;
this.snapshotMgr = snapshotManager;
this.checkpointWorkflow = checkpoint;
this.checkpointPagesWriterFactory = factory;
this.persStoreMetrics = dsMetrics;
this.cacheProcessor = cacheProcessor;
this.checkpointWritePageThreads = Math.max(checkpointWritePageThreads, 1);
this.checkpointWritePagesPool = initializeCheckpointPool();
this.cpFreqDeviation = cpFreqDeviation;
this.psproc = cacheProcessor.context().kernalContext().performanceStatistics();
scheduledCp = new CheckpointProgressImpl(nextCheckpointInterval());
}
/**
* @return Initialized checkpoint page write pool, or {@code null} if pages are written in a single thread.
*/
private IgniteThreadPoolExecutor initializeCheckpointPool() {
if (checkpointWritePageThreads > 1)
return new IgniteThreadPoolExecutor(
CHECKPOINT_RUNNER_THREAD_PREFIX + "-IO",
igniteInstanceName(),
checkpointWritePageThreads,
checkpointWritePageThreads,
30_000,
new LinkedBlockingQueue<>()
);
return null;
}
/** {@inheritDoc} */
@Override protected void body() {
Throwable err = null;
try {
while (!isCancelled()) {
waitCheckpointEvent();
if (skipCheckpointOnNodeStop && (isCancelled() || shutdownNow)) {
log.warning("Skipping last checkpoint because node is stopping.");
return;
}
GridFutureAdapter<Void> enableChangeApplied = this.enableChangeApplied;
if (enableChangeApplied != null) {
enableChangeApplied.onDone();
this.enableChangeApplied = null;
}
if (checkpointsEnabled)
doCheckpoint();
else {
synchronized (this) {
scheduledCp.nextCpNanos(System.nanoTime() + U.millisToNanos(nextCheckpointInterval()));
}
}
}
// Final run after the cancellation.
if (checkpointsEnabled && !shutdownNow)
doCheckpoint();
}
catch (Throwable t) {
err = t;
scheduledCp.fail(t);
throw t;
}
finally {
if (err == null && !(isCancelled))
err = new IllegalStateException("Thread is terminated unexpectedly: " + name());
if (err instanceof OutOfMemoryError)
failureProcessor.process(new FailureContext(CRITICAL_ERROR, err));
else if (err != null)
failureProcessor.process(new FailureContext(SYSTEM_WORKER_TERMINATION, err));
scheduledCp.fail(new NodeStoppingException("Node is stopping."));
}
}
/**
* @param delayFromNow Delay from now in milliseconds.
* @param reason Wakeup reason.
* @return Nearest scheduled checkpoint which has not started yet.
*/
public CheckpointProgress scheduleCheckpoint(long delayFromNow, String reason) {
return scheduleCheckpoint(delayFromNow, reason, null);
}
/**
* Gets a checkpoint interval with a randomized delay.
* The randomization helps to avoid all cluster nodes running their checkpoints at the same time.
*
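* For example (illustrative values, not necessarily the configured defaults): with {@code checkpointFreq} = 180,000 ms
* and a deviation of 40, the random start delay falls in roughly [-36,000 ms; 36,000 ms), so the returned interval
* lies in approximately [144,000 ms; 216,000 ms).
*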
* @return Next checkpoint interval.
*/
private long nextCheckpointInterval() {
Integer deviation = cpFreqDeviation.get();
if (deviation == null || deviation == 0)
return checkpointFreq;
long startDelay = ThreadLocalRandom.current().nextLong(U.ensurePositive(U.safeAbs(checkpointFreq * deviation) / 100, 1))
- U.ensurePositive(U.safeAbs(checkpointFreq * deviation) / 200, 1);
return U.safeAbs(checkpointFreq + startDelay);
}
/**
* Change the information for a scheduled checkpoint if it was scheduled further than {@code delayFromNow}, or do
* nothing otherwise.
*
* If {@code lsnr} is not {@code null}, a new checkpoint is always scheduled and the listener is registered on its
* {@code FINISHED} future.
*
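* <p>Minimal usage sketch (illustrative, not from the original documentation; assumes the caller only needs a
* callback once the checkpoint finishes):</p>
* <pre>{@code
* checkpointer.scheduleCheckpoint(0, "manual trigger", fut -> {
*     // Invoked in the checkpoint thread once the checkpoint reaches the FINISHED state.
*     System.out.println("Checkpoint completed: " + fut);
* });
* }</pre>
*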
* @return Nearest scheduled checkpoint which has not started yet (dirty pages have not been collected yet).
*/
public <R> CheckpointProgress scheduleCheckpoint(
long delayFromNow,
String reason,
IgniteInClosure<? super IgniteInternalFuture<R>> lsnr
) {
CheckpointProgressImpl sched = curCpProgress;
// If the current checkpoint hasn't taken the write lock yet, don't trigger a new one; return the current checkpoint instead.
if (lsnr == null && sched != null && !sched.greaterOrEqualTo(CheckpointState.LOCK_TAKEN))
return sched;
if (lsnr != null) {
// Synchronize to make sure the listener is always executed in the checkpoint thread.
synchronized (this) {
sched = scheduledCp;
sched.futureFor(FINISHED).listen(lsnr);
}
}
sched = scheduledCp;
long nextNanos = System.nanoTime() + U.millisToNanos(delayFromNow);
if (sched.nextCpNanos() - nextNanos <= 0)
return sched;
synchronized (this) {
sched = scheduledCp;
if (sched.nextCpNanos() - nextNanos > 0) {
sched.reason(reason);
sched.nextCpNanos(nextNanos);
}
notifyAll();
}
return sched;
}
/**
* @param snapshotOperation Snapshot operation.
* @return Future which is completed when the triggered checkpoint releases its write lock.
*/
public IgniteInternalFuture wakeupForSnapshotCreation(SnapshotOperation snapshotOperation) {
GridFutureAdapter<Object> ret;
synchronized (this) {
scheduledCp.nextCpNanos(System.nanoTime());
scheduledCp.reason("snapshot");
scheduledCp.nextSnapshot(true);
scheduledCp.snapshotOperation(snapshotOperation);
ret = scheduledCp.futureFor(LOCK_RELEASED);
notifyAll();
}
return ret;
}
/**
* Executes a single checkpoint: marks the checkpoint begin, writes dirty pages to disk, destroys evicted
* partitions and marks the checkpoint end.
*/
private void doCheckpoint() {
Checkpoint chp = null;
try {
CheckpointMetricsTracker tracker = new CheckpointMetricsTracker();
startCheckpointProgress();
try {
chp = checkpointWorkflow.markCheckpointBegin(lastCpTs, curCpProgress, tracker, this);
}
catch (Exception e) {
if (curCpProgress != null)
curCpProgress.fail(e);
// In case of checkpoint initialization error node should be invalidated and stopped.
failureProcessor.process(new FailureContext(FailureType.CRITICAL_ERROR, e));
throw new IgniteException(e); // Re-throw as unchecked exception to force stopping checkpoint thread.
}
updateHeartbeat();
currentProgress().initCounters(chp.pagesSize);
if (chp.hasDelta()) {
if (log.isInfoEnabled()) {
long possibleJvmPauseDur = possibleLongJvmPauseDuration(tracker);
log.info(
String.format(
CHECKPOINT_STARTED_LOG_FORMAT,
chp.cpEntry == null ? "" : chp.cpEntry.checkpointId(),
chp.cpEntry == null ? "" : chp.cpEntry.checkpointMark(),
tracker.beforeLockDuration(),
tracker.lockWaitDuration(),
tracker.listenersExecuteDuration(),
tracker.lockHoldDuration(),
tracker.walCpRecordFsyncDuration(),
tracker.writeCheckpointEntryDuration(),
tracker.splitAndSortCpPagesDuration(),
possibleJvmPauseDur > 0 ? "possibleJvmPauseDuration=" + possibleJvmPauseDur + "ms, " : "",
chp.pagesSize,
chp.progress.reason()
)
);
}
if (!writePages(tracker, chp.cpPages, chp.progress, this, this::isShutdownNow))
return;
}
else {
if (log.isInfoEnabled())
LT.info(log, String.format(
"Skipping checkpoint (no pages were modified) [" +
"checkpointBeforeLockTime=%dms, checkpointLockWait=%dms, " +
"checkpointListenersExecuteTime=%dms, checkpointLockHoldTime=%dms, reason='%s']",
tracker.beforeLockDuration(),
tracker.lockWaitDuration(),
tracker.listenersExecuteDuration(),
tracker.lockHoldDuration(),
chp.progress.reason())
);
tracker.onPagesWriteStart();
tracker.onFsyncStart();
}
snapshotMgr.afterCheckpointPageWritten();
int destroyedPartitionsCnt = destroyEvictedPartitions();
// Must mark successful checkpoint only if there are no exceptions or interrupts.
checkpointWorkflow.markCheckpointEnd(chp);
tracker.onEnd();
if (chp.hasDelta() || destroyedPartitionsCnt > 0) {
if (log.isInfoEnabled()) {
log.info(String.format("Checkpoint finished [cpId=%s, pages=%d, markPos=%s, " +
"walSegmentsCovered=%s, markDuration=%dms, pagesWrite=%dms, fsync=%dms, total=%dms]",
chp.cpEntry != null ? chp.cpEntry.checkpointId() : "",
chp.pagesSize,
chp.cpEntry != null ? chp.cpEntry.checkpointMark() : "",
walRangeStr(chp.walSegsCoveredRange),
tracker.markDuration(),
tracker.pagesWriteDuration(),
tracker.fsyncDuration(),
tracker.totalDuration()));
}
}
updateMetrics(chp, tracker);
}
catch (IgniteCheckedException e) {
chp.progress.fail(e);
failureProcessor.process(new FailureContext(FailureType.CRITICAL_ERROR, e));
}
}
/**
* @param tracker Checkpoint metrics tracker.
* @param cpPages Queue of pages to write.
* @param curCpProgress Current checkpoint progress.
* @param workProgressDispatcher Work progress dispatcher.
* @param shutdownNow Supplier which returns {@code true} if the write should be stopped immediately.
* @return {@code True} if all pages were written successfully, {@code false} if the write was interrupted because
* the node is stopping.
* @throws IgniteCheckedException If the page write failed.
*/
boolean writePages(
CheckpointMetricsTracker tracker,
GridConcurrentMultiPairQueue<PageMemoryEx, FullPageId> cpPages,
CheckpointProgressImpl curCpProgress,
WorkProgressDispatcher workProgressDispatcher,
BooleanSupplier shutdownNow
) throws IgniteCheckedException {
IgniteThreadPoolExecutor pageWritePool = checkpointWritePagesPool;
int checkpointWritePageThreads = pageWritePool == null ? 1 : pageWritePool.getMaximumPoolSize();
// Page stores updated by this checkpoint, with a per-store counter of written pages.
ConcurrentLinkedHashMap<PageStore, LongAdder> updStores = new ConcurrentLinkedHashMap<>();
CountDownFuture doneWriteFut = new CountDownFuture(checkpointWritePageThreads);
tracker.onPagesWriteStart();
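// One writer task per configured thread: all writers concurrently drain the shared cpPages queue and
// count down doneWriteFut when they finish.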
for (int i = 0; i < checkpointWritePageThreads; i++) {
Runnable write = checkpointPagesWriterFactory.build(
tracker,
cpPages,
updStores,
doneWriteFut,
workProgressDispatcher::updateHeartbeat,
curCpProgress,
shutdownNow
);
if (pageWritePool == null)
write.run();
else {
try {
pageWritePool.execute(write);
}
catch (RejectedExecutionException ignore) {
// Run the task synchronously.
write.run();
}
}
}
workProgressDispatcher.updateHeartbeat();
// Wait and check for errors.
doneWriteFut.get();
// Must re-check shutdown flag here because threads may have skipped some pages.
// If so, we should not put finish checkpoint mark.
if (shutdownNow.getAsBoolean()) {
curCpProgress.fail(new NodeStoppingException("Node is stopping."));
return false;
}
tracker.onFsyncStart();
if (!skipSync) {
for (Map.Entry<PageStore, LongAdder> updStoreEntry : updStores.entrySet()) {
if (shutdownNow.getAsBoolean()) {
curCpProgress.fail(new NodeStoppingException("Node is stopping."));
return false;
}
workProgressDispatcher.blockingSectionBegin();
try {
updStoreEntry.getKey().sync();
}
finally {
workProgressDispatcher.blockingSectionEnd();
}
curCpProgress.updateSyncedPages(updStoreEntry.getValue().intValue());
}
}
return true;
}
/**
* @param chp Checkpoint.
* @param tracker Tracker.
*/
private void updateMetrics(Checkpoint chp, CheckpointMetricsTracker tracker) {
if (psproc.enabled()) {
psproc.checkpoint(
tracker.beforeLockDuration(),
tracker.lockWaitDuration(),
tracker.listenersExecuteDuration(),
tracker.markDuration(),
tracker.lockHoldDuration(),
tracker.pagesWriteDuration(),
tracker.fsyncDuration(),
tracker.walCpRecordFsyncDuration(),
tracker.writeCheckpointEntryDuration(),
tracker.splitAndSortCpPagesDuration(),
tracker.totalDuration(),
tracker.checkpointStartTime(),
chp.pagesSize,
tracker.dataPagesWritten(),
tracker.cowPagesWritten());
}
if (persStoreMetrics.metricsEnabled()) {
persStoreMetrics.onCheckpoint(
tracker.beforeLockDuration(),
tracker.lockWaitDuration(),
tracker.listenersExecuteDuration(),
tracker.markDuration(),
tracker.lockHoldDuration(),
tracker.pagesWriteDuration(),
tracker.fsyncDuration(),
tracker.walCpRecordFsyncDuration(),
tracker.writeCheckpointEntryDuration(),
tracker.splitAndSortCpPagesDuration(),
tracker.totalDuration(),
tracker.checkpointStartTime(),
chp.pagesSize,
tracker.dataPagesWritten(),
tracker.cowPagesWritten()
);
}
}
/**
* Creates a string representation of a range of WAL segments.
*
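* For example (values are illustrative): a range of {@code [10, 12]} is rendered as {@code "[10 - 12]"},
* {@code [10, 10]} as {@code "[10]"}, and an empty or inverted range as {@code "[]"}.
*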
* @param walRange Range of WAL segments.
* @return The message describing which WAL segments were covered between the previous checkpoint and the current one.
*/
private String walRangeStr(@Nullable IgniteBiTuple<Long, Long> walRange) {
if (walRange == null)
return "";
String res;
long startIdx = walRange.get1();
long endIdx = walRange.get2();
if (endIdx < 0 || endIdx < startIdx)
res = "[]";
else if (endIdx == startIdx)
res = "[" + endIdx + "]";
else
res = "[" + startIdx + " - " + endIdx + "]";
return res;
}
/**
* Processes all evicted partitions scheduled for destroy.
*
* @return The number of destroyed partition files.
* @throws IgniteCheckedException If failed.
*/
private int destroyEvictedPartitions() throws IgniteCheckedException {
PartitionDestroyQueue destroyQueue = curCpProgress.getDestroyQueue();
if (destroyQueue.pendingReqs().isEmpty())
return 0;
List<PartitionDestroyRequest> reqs = null;
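// Kick off destroy tasks (in the checkpoint pool when available) and collect the requests to await below.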
for (final PartitionDestroyRequest req : destroyQueue.pendingReqs().values()) {
if (!req.beginDestroy())
continue;
final int grpId = req.groupId();
final int partId = req.partitionId();
CacheGroupContext grp = cacheProcessor.cacheGroup(grpId);
assert grp != null
: "Cache group is not initialized [grpId=" + grpId + "]";
assert grp.offheap() instanceof GridCacheOffheapManager
: "Destroying partition files when persistence is off " + grp.offheap();
final GridCacheOffheapManager offheap = (GridCacheOffheapManager)grp.offheap();
Runnable destroyPartTask = () -> {
try {
offheap.destroyPartitionStore(partId);
req.onDone(null);
grp.metrics().decrementInitializedLocalPartitions();
if (log.isDebugEnabled())
log.debug("Partition file has destroyed [grpId=" + grpId + ", partId=" + partId + "]");
}
catch (Exception e) {
req.onDone(new IgniteCheckedException(
"Partition file destroy has failed [grpId=" + grpId + ", partId=" + partId + "]", e));
}
};
IgniteThreadPoolExecutor pool = checkpointWritePagesPool;
if (pool != null) {
try {
pool.execute(destroyPartTask);
}
catch (RejectedExecutionException ignore) {
// Run the task synchronously.
destroyPartTask.run();
}
}
else
destroyPartTask.run();
if (reqs == null)
reqs = new ArrayList<>();
reqs.add(req);
}
if (reqs != null)
for (PartitionDestroyRequest req : reqs)
req.waitCompleted();
destroyQueue.pendingReqs().clear();
return reqs != null ? reqs.size() : 0;
}
/**
* @param grpCtx Group context. Can be {@code null} in case of crash recovery.
* @param grpId Group ID.
* @param partId Partition ID.
*/
public void schedulePartitionDestroy(@Nullable CacheGroupContext grpCtx, int grpId, int partId) {
synchronized (this) {
scheduledCp.getDestroyQueue().addDestroyRequest(grpCtx, grpId, partId);
}
if (log.isDebugEnabled())
log.debug("Partition file has been scheduled to destroy [grpId=" + grpId + ", partId=" + partId + "]");
if (grpCtx != null)
scheduleCheckpoint(PARTITION_DESTROY_CHECKPOINT_TIMEOUT, "partition destroy");
}
/**
* @param grpId Group ID.
* @param partId Partition ID.
* @return {@code True} if the request to destroy the partition was canceled.
*/
public boolean cancelOrWaitPartitionDestroy(int grpId, int partId) throws IgniteCheckedException {
PartitionDestroyRequest req;
boolean canceled = false;
synchronized (this) {
req = scheduledCp.getDestroyQueue().removeRequest(grpId, partId);
if (req != null) {
canceled = req.cancel();
assert canceled;
}
CheckpointProgressImpl cur = curCpProgress;
if (cur != null) {
req = cur.getDestroyQueue().removeRequest(grpId, partId);
if (req != null)
canceled = req.cancel();
}
}
if (!canceled) {
if (req != null)
req.waitCompleted();
return false;
}
if (log.isDebugEnabled())
log.debug("Partition file destroy has cancelled [grpId=" + grpId + ", partId=" + partId + "]");
return true;
}
/**
* Waits until the next checkpoint time.
*/
private void waitCheckpointEvent() {
try {
synchronized (this) {
long remaining = U.nanosToMillis(scheduledCp.nextCpNanos() - System.nanoTime());
while (remaining > 0 && !isCancelled()) {
blockingSectionBegin();
try {
wait(remaining);
remaining = U.nanosToMillis(scheduledCp.nextCpNanos() - System.nanoTime());
}
finally {
blockingSectionEnd();
}
}
}
}
catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
isCancelled = true;
}
}
/**
* @param tracker Checkpoint metrics tracker.
* @return Duration of possible JVM pause, if it was detected, or {@code -1} otherwise.
*/
private long possibleLongJvmPauseDuration(CheckpointMetricsTracker tracker) {
if (LongJVMPauseDetector.enabled()) {
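// Investigate only if lock wait/hold took longer than the pause threshold: a long JVM pause may have
// overlapped the checkpoint start and would explain the delay.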
if (tracker.lockWaitDuration() + tracker.lockHoldDuration() > longJvmPauseThreshold) {
long now = System.currentTimeMillis();
// We must get the last wake-up time before searching for a possible pause in the events map.
long wakeUpTime = pauseDetector.getLastWakeUpTime();
IgniteBiTuple<Long, Long> lastLongPause = pauseDetector.getLastLongPause();
if (lastLongPause != null && tracker.checkpointStartTime() < lastLongPause.get1())
return lastLongPause.get2();
if (now - wakeUpTime > longJvmPauseThreshold)
return now - wakeUpTime;
}
}
return -1L;
}
/**
* Update the current checkpoint info from the scheduled one.
*/
private void startCheckpointProgress() {
long cpTs = System.currentTimeMillis();
// This can happen in an unlikely event of two checkpoints happening
// within a currentTimeMillis() granularity window.
if (cpTs == lastCpTs)
cpTs++;
lastCpTs = cpTs;
synchronized (this) {
CheckpointProgressImpl curr = scheduledCp;
if (curr.reason() == null)
curr.reason("timeout");
// It is important that we assign a new progress object before checkpoint mark in page memory.
scheduledCp = new CheckpointProgressImpl(nextCheckpointInterval());
curCpProgress = curr;
}
}
/** {@inheritDoc} */
@Override public void cancel() {
if (log.isDebugEnabled())
log.debug("Cancelling grid runnable: " + this);
// Do not interrupt runner thread.
isCancelled = true;
synchronized (this) {
notifyAll();
}
}
/**
* For test use only.
*
* @deprecated Should be rewritten to public API.
*/
public IgniteInternalFuture<Void> enableCheckpoints(boolean enable) {
GridFutureAdapter<Void> fut = new GridFutureAdapter<>();
enableChangeApplied = fut;
checkpointsEnabled = enable;
return fut;
}
/**
* Stops all checkpoint activity immediately, even if the current checkpoint is in progress.
*/
public void shutdownNow() {
shutdownNow = true;
if (!isCancelled)
cancel();
}
/**
* Starts the checkpointer worker in a new IgniteThread if it is not already running.
*/
public void start() {
if (runner() != null)
return;
assert runner() == null : "Checkpointer is running.";
new IgniteThread(this).start();
}
/**
* @param cancel Cancel flag.
*/
@SuppressWarnings("unused")
public void shutdownCheckpointer(boolean cancel) {
if (cancel)
shutdownNow();
else
cancel();
try {
U.join(this);
}
catch (IgniteInterruptedCheckedException ignore) {
U.warn(log, "Was interrupted while waiting for checkpointer shutdown, " +
"will not wait for checkpoint to finish.");
shutdownNow();
while (true) {
try {
U.join(this);
scheduledCp.fail(new NodeStoppingException("Checkpointer is stopped during node stop."));
break;
}
catch (IgniteInterruptedCheckedException ignored) {
//Ignore
}
}
Thread.currentThread().interrupt();
}
IgniteThreadPoolExecutor pool = checkpointWritePagesPool;
if (pool != null) {
pool.shutdownNow();
try {
pool.awaitTermination(2, TimeUnit.MINUTES);
}
catch (InterruptedException ignore) {
Thread.currentThread().interrupt();
}
checkpointWritePagesPool = null;
}
}
/**
* @throws IgniteCheckedException If failed.
*/
public void finalizeCheckpointOnRecovery(
long cpTs,
UUID cpId,
WALPointer walPtr,
StripedExecutor exec
) throws IgniteCheckedException {
checkpointWorkflow.finalizeCheckpointOnRecovery(cpTs, cpId, walPtr, exec, checkpointPagesWriterFactory);
}
/**
* @return Progress of the current checkpoint, the last finished one, or {@code null} if a checkpoint has never started.
*/
public CheckpointProgress currentProgress() {
return curCpProgress;
}
/**
* @return {@code True} if checkpoint should be stopped immediately.
*/
private boolean isShutdownNow() {
return shutdownNow;
}
/**
* Skip checkpoint on node stop.
*
* @param skip If {@code true} skips checkpoint on node stop.
*/
public void skipCheckpointOnNodeStop(boolean skip) {
skipCheckpointOnNodeStop = skip;
}
}