blob: 9912e0a35b95853d9f276da5def9f38f46a43c27 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.pagememory.persistence.checkpoint;
import static org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency.MUST_TRIGGER;
import static org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency.NOT_REQUIRED;
import static org.apache.ignite.internal.util.IgniteUtils.closeAll;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.components.LongJvmPauseDetector;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.pagememory.DataRegion;
import org.apache.ignite.internal.pagememory.FullPageId;
import org.apache.ignite.internal.pagememory.PageMemory;
import org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointConfiguration;
import org.apache.ignite.internal.pagememory.configuration.schema.PageMemoryCheckpointView;
import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
import org.apache.ignite.internal.pagememory.persistence.CheckpointUrgency;
import org.apache.ignite.internal.pagememory.persistence.GroupPartitionId;
import org.apache.ignite.internal.pagememory.persistence.PartitionMetaManager;
import org.apache.ignite.internal.pagememory.persistence.PersistentPageMemory;
import org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointDirtyPages.CheckpointDirtyPagesView;
import org.apache.ignite.internal.pagememory.persistence.compaction.Compactor;
import org.apache.ignite.internal.pagememory.persistence.store.DeltaFilePageStoreIo;
import org.apache.ignite.internal.pagememory.persistence.store.FilePageStore;
import org.apache.ignite.internal.pagememory.persistence.store.FilePageStoreManager;
import org.apache.ignite.internal.util.worker.IgniteWorkerListener;
import org.jetbrains.annotations.Nullable;
/**
* Main class to abstract checkpoint-related processes and actions and hide them from higher-level components.
*
* <p>Implements sharp checkpointing algorithm.
*
* <p>Represents only an intermediate step in refactoring of checkpointing component and may change in the future.
*
* <p>This checkpoint ensures that all pages marked as dirty under {@link #checkpointTimeoutLock} will be consistently saved to disk.
*
* <p>Configuration of this checkpoint allows the following:
* <ul>
* <li>Collecting all pages from configured dataRegions which was marked as dirty under {@link #checkpointTimeoutLock}.</li>
* <li>Marking the start of checkpoint on disk.</li>
* <li>Notifying the subscribers of different checkpoint states through {@link CheckpointListener}.</li>
* <li>Synchronizing collected pages with disk using {@link FilePageStoreManager}.</li>
* </ul>
*/
public class CheckpointManager {
/** Checkpoint worker. */
private final Checkpointer checkpointer;
/** Main checkpoint steps. */
private final CheckpointWorkflow checkpointWorkflow;
/** Timeout checkpoint lock which should be used while write to memory happened. */
private final CheckpointTimeoutLock checkpointTimeoutLock;
/** Checkpoint page writer factory. */
private final CheckpointPagesWriterFactory checkpointPagesWriterFactory;
/** File page store manager. */
private final FilePageStoreManager filePageStoreManager;
/** Delta file compactor. */
private final Compactor compactor;
/**
* Constructor.
*
* @param igniteInstanceName Ignite instance name.
* @param checkpointConfig Checkpoint configuration.
* @param workerListener Listener for life-cycle checkpoint worker events.
* @param longJvmPauseDetector Long JVM pause detector.
* @param failureProcessor Failure processor that is used to handle critical errors.
* @param filePageStoreManager File page store manager.
* @param partitionMetaManager Partition meta information manager.
* @param dataRegions Data regions.
* @param ioRegistry Page IO registry.
* @param pageSize Page size in bytes.
* @throws IgniteInternalCheckedException If failed.
*/
public CheckpointManager(
String igniteInstanceName,
@Nullable IgniteWorkerListener workerListener,
@Nullable LongJvmPauseDetector longJvmPauseDetector,
FailureProcessor failureProcessor,
PageMemoryCheckpointConfiguration checkpointConfig,
FilePageStoreManager filePageStoreManager,
PartitionMetaManager partitionMetaManager,
Collection<? extends DataRegion<PersistentPageMemory>> dataRegions,
PageIoRegistry ioRegistry,
LogSyncer logSyncer,
// TODO: IGNITE-17017 Move to common config
int pageSize
) throws IgniteInternalCheckedException {
this.filePageStoreManager = filePageStoreManager;
PageMemoryCheckpointView checkpointConfigView = checkpointConfig.value();
long logReadLockThresholdTimeout = checkpointConfigView.logReadLockThresholdTimeout();
ReentrantReadWriteLockWithTracking reentrantReadWriteLockWithTracking = logReadLockThresholdTimeout > 0
? new ReentrantReadWriteLockWithTracking(Loggers.forClass(CheckpointReadWriteLock.class), logReadLockThresholdTimeout)
: new ReentrantReadWriteLockWithTracking();
CheckpointReadWriteLock checkpointReadWriteLock = new CheckpointReadWriteLock(reentrantReadWriteLockWithTracking);
checkpointWorkflow = new CheckpointWorkflow(
igniteInstanceName,
checkpointReadWriteLock,
dataRegions,
checkpointConfigView.checkpointThreads()
);
checkpointPagesWriterFactory = new CheckpointPagesWriterFactory(
(pageMemory, fullPageId, pageBuf) -> writePageToDeltaFilePageStore(pageMemory, fullPageId, pageBuf, true),
ioRegistry,
partitionMetaManager,
pageSize
);
compactor = new Compactor(
Loggers.forClass(Compactor.class),
igniteInstanceName,
workerListener,
checkpointConfig.compactionThreads(),
filePageStoreManager,
pageSize,
failureProcessor
);
checkpointer = new Checkpointer(
igniteInstanceName,
workerListener,
longJvmPauseDetector,
failureProcessor,
checkpointWorkflow,
checkpointPagesWriterFactory,
filePageStoreManager,
compactor,
checkpointConfig,
logSyncer
);
checkpointTimeoutLock = new CheckpointTimeoutLock(
checkpointReadWriteLock,
checkpointConfigView.readLockTimeout(),
() -> checkpointUrgency(dataRegions),
checkpointer,
failureProcessor
);
}
/**
* Starts a checkpoint manger.
*/
public void start() {
checkpointWorkflow.start();
checkpointer.start();
checkpointTimeoutLock.start();
compactor.start();
}
/**
* Stops a checkpoint manger.
*/
public void stop() throws Exception {
closeAll(
checkpointTimeoutLock::stop,
checkpointer::stop,
checkpointWorkflow::stop,
compactor::stop
);
}
/**
* Returns checkpoint timeout lock which can be used for protection of writing to memory.
*/
public CheckpointTimeoutLock checkpointTimeoutLock() {
return checkpointTimeoutLock;
}
/**
* Adds a listener to be called for the corresponding persistent data region.
*
* @param listener Listener.
* @param dataRegion Persistent data region for which listener is corresponded to, {@code null} for all regions.
*/
public void addCheckpointListener(CheckpointListener listener, @Nullable DataRegion<PersistentPageMemory> dataRegion) {
checkpointWorkflow.addCheckpointListener(listener, dataRegion);
}
/**
* Removes the listener.
*
* @param listener Listener.
*/
public void removeCheckpointListener(CheckpointListener listener) {
checkpointWorkflow.removeCheckpointListener(listener);
}
/**
* Start the new checkpoint immediately.
*
* @param reason Checkpoint reason.
* @return Triggered checkpoint progress.
*/
public CheckpointProgress forceCheckpoint(String reason) {
return checkpointer.scheduleCheckpoint(0, reason);
}
/**
* Schedules a checkpoint in the future.
*
* @param delayMillis Delay in milliseconds from the curent moment.
* @param reason Checkpoint reason.
* @return Triggered checkpoint progress.
*/
public CheckpointProgress scheduleCheckpoint(long delayMillis, String reason) {
return checkpointer.scheduleCheckpoint(delayMillis, reason);
}
/**
* Returns the progress of the last checkpoint, or the current checkpoint if in progress, {@code null} if no checkpoint has occurred.
*/
public @Nullable CheckpointProgress lastCheckpointProgress() {
return checkpointer.lastCheckpointProgress();
}
/**
* Marks partition as dirty, forcing partition's meta-page to be written on disk during next checkpoint.
*/
public void markPartitionAsDirty(DataRegion<?> dataRegion, int groupId, int partitionId) {
checkpointer.markPartitionAsDirty(dataRegion, groupId, partitionId);
}
/**
* Returns checkpoint urgency status. {@link CheckpointUrgency#NOT_REQUIRED} if it is safe for all {@link DataRegion data regions} to
* update their {@link PageMemory}.
*
* @param dataRegions Data regions.
* @see PersistentPageMemory#checkpointUrgency()
* @see CheckpointUrgency
*/
static CheckpointUrgency checkpointUrgency(Collection<? extends DataRegion<PersistentPageMemory>> dataRegions) {
CheckpointUrgency urgency = NOT_REQUIRED;
for (DataRegion<PersistentPageMemory> dataRegion : dataRegions) {
CheckpointUrgency regionCheckpointUrgency = dataRegion.pageMemory().checkpointUrgency();
if (regionCheckpointUrgency.compareTo(urgency) > 0) {
urgency = regionCheckpointUrgency;
}
if (urgency == MUST_TRIGGER) {
return MUST_TRIGGER;
}
}
return urgency;
}
/**
* Writes a page to delta file page store.
*
* <p>Must be used at breakpoint and page replacement.
*
* @param pageMemory Page memory.
* @param pageId Page ID.
* @param pageBuf Page buffer to write from.
* @param calculateCrc If {@code false} crc calculation will be forcibly skipped.
* @throws IgniteInternalCheckedException If page writing failed (IO error occurred).
*/
public void writePageToDeltaFilePageStore(
PersistentPageMemory pageMemory,
FullPageId pageId,
ByteBuffer pageBuf,
boolean calculateCrc
) throws IgniteInternalCheckedException {
FilePageStore filePageStore = filePageStoreManager.getStore(new GroupPartitionId(pageId.groupId(), pageId.partitionId()));
// If the partition is deleted (or will be soon), then such writes to the disk should be skipped.
if (filePageStore == null || filePageStore.isMarkedToDestroy()) {
return;
}
CheckpointProgress lastCheckpointProgress = lastCheckpointProgress();
assert lastCheckpointProgress != null : "Checkpoint has not happened yet";
assert lastCheckpointProgress.inProgress() : "Checkpoint must be in progress";
CheckpointDirtyPages pagesToWrite = lastCheckpointProgress.pagesToWrite();
assert pagesToWrite != null : "Dirty pages must be sorted out";
CompletableFuture<DeltaFilePageStoreIo> deltaFilePageStoreFuture = filePageStore.getOrCreateNewDeltaFile(
index -> filePageStoreManager.tmpDeltaFilePageStorePath(pageId.groupId(), pageId.partitionId(), index),
() -> pageIndexesForDeltaFilePageStore(pagesToWrite.getPartitionView(pageMemory, pageId.groupId(), pageId.partitionId()))
);
deltaFilePageStoreFuture.join().write(pageId.pageId(), pageBuf, calculateCrc);
}
/**
* Returns the indexes of the dirty pages to be written to the delta file page store.
*
* @param partitionDirtyPages Dirty pages of the partition.
*/
static int[] pageIndexesForDeltaFilePageStore(CheckpointDirtyPagesView partitionDirtyPages) {
// If there is no partition meta page among the dirty pages, then we add an additional page to the result.
int offset = partitionDirtyPages.get(0).pageIdx() == 0 ? 0 : 1;
int[] pageIndexes = new int[partitionDirtyPages.size() + offset];
for (int i = 0; i < partitionDirtyPages.size(); i++) {
pageIndexes[i + offset] = partitionDirtyPages.get(i).pageIdx();
}
return pageIndexes;
}
/**
* Triggers compacting for new delta files.
*/
public void triggerCompaction() {
compactor.triggerCompaction();
}
/**
* Callback on destruction of the partition of the corresponding group.
*
* <p>Prepares the checkpointer and compactor for partition destruction.
*
* @param groupPartitionId Pair of group ID with partition ID.
* @return Future that will complete when the callback completes.
*/
public CompletableFuture<Void> onPartitionDestruction(GroupPartitionId groupPartitionId) {
return CompletableFuture.allOf(
checkpointer.prepareToDestroyPartition(groupPartitionId),
compactor.prepareToDestroyPartition(groupPartitionId)
);
}
}