IGNITE-12081 Page replacement can reload invalid page during checkpoint - Fixes #6787.

Signed-off-by: Dmitriy Pavlov <dpavlov@apache.org>
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PageSnapshot.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PageSnapshot.java
index d3a465d..7be735f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PageSnapshot.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/wal/record/PageSnapshot.java
@@ -19,8 +19,6 @@
 
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
-import java.util.Arrays;
-import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.pagemem.FullPageId;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import org.apache.ignite.internal.util.GridUnsafe;
@@ -101,10 +99,6 @@
                 + "],\nsuper = ["
                 + super.toString() + "]]";
         }
-        catch (IgniteCheckedException ignored) {
-            return "Error during call'toString' of PageSnapshot [fullPageId=" + fullPageId() +
-                ", pageData = " + Arrays.toString(pageData) + ", super=" + super.toString() + "]";
-        }
         finally {
             GridUnsafe.cleanDirectBuffer(buf);
         }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
index c4abf8f..4df13a7 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/GridCacheDatabaseSharedManager.java
@@ -177,7 +177,10 @@
 import static org.apache.ignite.failure.FailureType.SYSTEM_WORKER_TERMINATION;
 import static org.apache.ignite.internal.pagemem.wal.record.WALRecord.RecordType.CHECKPOINT_RECORD;
 import static org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage.METASTORAGE_CACHE_ID;
+import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.getType;
+import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.getVersion;
 import static org.apache.ignite.internal.util.IgniteUtils.checkpointBufferSize;
+import static org.apache.ignite.internal.util.IgniteUtils.hexLong;
 
 /**
  *
@@ -2724,6 +2727,19 @@
 
         int cpPagesCnt = 0;
 
+        PageStoreWriter pageStoreWriter = (fullPageId, buf, tag) -> {
+            assert tag != PageMemoryImpl.TRY_AGAIN_TAG : "Lock is held by other thread for page " + fullPageId;
+
+            int groupId = fullPageId.groupId();
+            long pageId = fullPageId.pageId();
+
+            // Write buf to page store.
+            PageStore store = storeMgr.writeInternal(groupId, pageId, buf, tag, true);
+
+            // Save store for future fsync.
+            updStores.add(store);
+        };
+
         for (IgniteBiTuple<PageMemory, Collection<FullPageId>> e : cpEntities) {
             PageMemoryEx pageMem = (PageMemoryEx)e.get1();
 
@@ -2734,20 +2750,9 @@
             for (FullPageId fullId : cpPages) {
                 tmpWriteBuf.rewind();
 
-                Integer tag = pageMem.getForCheckpoint(fullId, tmpWriteBuf, null);
-
-                assert tag == null || tag != PageMemoryImpl.TRY_AGAIN_TAG :
-                        "Lock is held by other thread for page " + fullId;
-
-                if (tag != null) {
-                    tmpWriteBuf.rewind();
-
-                    PageStore store = storeMgr.writeInternal(fullId.groupId(), fullId.pageId(), tmpWriteBuf, tag, true);
-
-                    tmpWriteBuf.rewind();
-
-                    updStores.add(store);
-                }
+                // Write page content to page store via pageStoreWriter.
+                // Tracker is null, because no need to track checkpoint metrics on recovery.
+                pageMem.checkpointWritePage(fullId, tmpWriteBuf, pageStoreWriter, null);
             }
         }
 
@@ -4135,12 +4140,14 @@
          * @return pagesToRetry Pages which should be retried.
          */
         private List<FullPageId> writePages(Collection<FullPageId> writePageIds) throws IgniteCheckedException {
-            ByteBuffer tmpWriteBuf = threadBuf.get();
-
-            long writeAddr = GridUnsafe.bufferAddress(tmpWriteBuf);
-
             List<FullPageId> pagesToRetry = new ArrayList<>();
 
+            CheckpointMetricsTracker tracker = persStoreMetrics.metricsEnabled() ? this.tracker : null;
+
+            PageStoreWriter pageStoreWriter = createPageStoreWriter(pagesToRetry);
+
+            ByteBuffer tmpWriteBuf = threadBuf.get();
+
             for (FullPageId fullId : writePageIds) {
                 if (checkpointer.shutdownNow)
                     break;
@@ -4171,18 +4178,35 @@
                     pageMem = (PageMemoryEx)region.pageMemory();
                 }
 
-                Integer tag = pageMem.getForCheckpoint(
-                    fullId, tmpWriteBuf, persStoreMetrics.metricsEnabled() ? tracker : null);
+                pageMem.checkpointWritePage(fullId, tmpWriteBuf, pageStoreWriter, tracker);
+            }
 
-                if (tag != null) {
+            return pagesToRetry;
+        }
+
+        /**
+         * Factory method for create {@link PageStoreWriter}.
+         *
+         * @param pagesToRetry List pages for retry.
+         * @return Checkpoint page write context.
+         */
+        private PageStoreWriter createPageStoreWriter(List<FullPageId> pagesToRetry) {
+            return new PageStoreWriter() {
+                /** {@inheritDoc} */
+                @Override public void writePage(FullPageId fullPageId, ByteBuffer tmpWriteBuf, int tag) throws IgniteCheckedException {
                     if (tag == PageMemoryImpl.TRY_AGAIN_TAG) {
-                        pagesToRetry.add(fullId);
+                        pagesToRetry.add(fullPageId);
 
-                        continue;
+                        return;
                     }
 
-                    assert PageIO.getType(tmpWriteBuf) != 0 : "Invalid state. Type is 0! pageId = " + U.hexLong(fullId.pageId());
-                    assert PageIO.getVersion(tmpWriteBuf) != 0 : "Invalid state. Version is 0! pageId = " + U.hexLong(fullId.pageId());
+                    long writeAddr = GridUnsafe.bufferAddress(tmpWriteBuf);
+
+                    int grpId = fullPageId.groupId();
+                    long pageId = fullPageId.pageId();
+
+                    assert getType(tmpWriteBuf) != 0 : "Invalid state. Type is 0! pageId = " + hexLong(pageId);
+                    assert getVersion(tmpWriteBuf) != 0 : "Invalid state. Version is 0! pageId = " + hexLong(pageId);
 
                     tmpWriteBuf.rewind();
 
@@ -4201,17 +4225,15 @@
 
                     int curWrittenPages = writtenPagesCntr.incrementAndGet();
 
-                    snapshotMgr.onPageWrite(fullId, tmpWriteBuf, curWrittenPages, totalPagesToWrite);
+                    snapshotMgr.onPageWrite(fullPageId, tmpWriteBuf, curWrittenPages, totalPagesToWrite);
 
                     tmpWriteBuf.rewind();
 
-                    PageStore store = storeMgr.writeInternal(grpId, fullId.pageId(), tmpWriteBuf, tag, false);
+                    PageStore store = storeMgr.writeInternal(grpId, pageId, tmpWriteBuf, tag, false);
 
                     updStores.computeIfAbsent(store, k -> new LongAdder()).increment();
                 }
-            }
-
-            return pagesToRetry;
+            };
         }
     }
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/PageStoreWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/PageStoreWriter.java
new file mode 100644
index 0000000..2808a48
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/PageStoreWriter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.persistence;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.pagemem.store.PageStore;
+import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
+
+/**
+ * Interface for write page to {@link PageStore}.
+ */
+public interface PageStoreWriter {
+    /**
+     * Callback for write page. {@link PageMemoryEx} will copy page content to buffer before call.
+     *
+     * @param fullPageId Page ID to get byte buffer for. The page ID must be present in the collection returned by
+     *      the {@link PageMemoryEx#beginCheckpoint()} method call.
+     * @param buf Temporary buffer to write changes into.
+     * @param tag  {@code Partition generation} if data was read, {@code null} otherwise (data already saved to storage).
+     * @throws IgniteCheckedException If write page failed.
+     */
+    void writePage(FullPageId fullPageId, ByteBuffer buf, int tag) throws IgniteCheckedException;
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedDirtyPageWrite.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedDirtyPageStoreWrite.java
similarity index 89%
rename from modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedDirtyPageWrite.java
rename to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedDirtyPageStoreWrite.java
index b08ddc2..2061b4a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedDirtyPageWrite.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedDirtyPageStoreWrite.java
@@ -20,6 +20,7 @@
 import java.nio.ByteBuffer;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter;
 import org.apache.ignite.internal.util.GridUnsafe;
 import org.jetbrains.annotations.Nullable;
 
@@ -28,9 +29,9 @@
  * content without holding segment lock. Page data is copied into temp buffer during {@link #writePage(FullPageId,
  * ByteBuffer, int)} and then sent to real implementation by {@link #finishReplacement()}.
  */
-public class DelayedDirtyPageWrite implements ReplacedPageWriter {
+public class DelayedDirtyPageStoreWrite implements PageStoreWriter {
     /** Real flush dirty page implementation. */
-    private final ReplacedPageWriter flushDirtyPage;
+    private final PageStoreWriter flushDirtyPage;
 
     /** Page size. */
     private final int pageSize;
@@ -56,9 +57,12 @@
      * @param pageSize page size.
      * @param tracker tracker to lock/unlock page reads.
      */
-    public DelayedDirtyPageWrite(ReplacedPageWriter flushDirtyPage,
-        ThreadLocal<ByteBuffer> byteBufThreadLoc, int pageSize,
-        DelayedPageReplacementTracker tracker) {
+    public DelayedDirtyPageStoreWrite(
+        PageStoreWriter flushDirtyPage,
+        ThreadLocal<ByteBuffer> byteBufThreadLoc,
+        int pageSize,
+        DelayedPageReplacementTracker tracker
+    ) {
         this.flushDirtyPage = flushDirtyPage;
         this.pageSize = pageSize;
         this.byteBufThreadLoc = byteBufThreadLoc;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedPageReplacementTracker.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedPageReplacementTracker.java
index aa1b061..e1d9137 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedPageReplacementTracker.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/DelayedPageReplacementTracker.java
@@ -26,6 +26,7 @@
 import org.apache.ignite.IgniteInterruptedException;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.pagemem.FullPageId;
+import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter;
 
 /**
  * Delayed page writes tracker. Provides delayed write implementations and allows to check if page is actually being
@@ -36,7 +37,7 @@
     private final int pageSize;
 
     /** Flush dirty page real implementation. */
-    private final ReplacedPageWriter flushDirtyPage;
+    private final PageStoreWriter flushDirtyPage;
 
     /** Logger. */
     private final IgniteLogger log;
@@ -57,11 +58,11 @@
     };
 
     /**
-     * Dirty page write for replacement operations thread local. Because page write {@link DelayedDirtyPageWrite} is
+     * Dirty page write for replacement operations thread local. Because page write {@link PageStoreWriter} is
      * stateful and not thread safe, this thread local protects from GC pressure on pages replacement. <br> Map is used
      * instead of build-in thread local to allow GC to remove delayed writers for alive threads after node stop.
      */
-    private final Map<Long, DelayedDirtyPageWrite> delayedPageWriteThreadLocMap = new ConcurrentHashMap<>();
+    private final Map<Long, DelayedDirtyPageStoreWrite> delayedPageWriteThreadLocMap = new ConcurrentHashMap<>();
 
     /**
      * @param pageSize Page size.
@@ -69,8 +70,12 @@
      * @param log Logger.
      * @param segmentCnt Segments count.
      */
-    public DelayedPageReplacementTracker(int pageSize, ReplacedPageWriter flushDirtyPage,
-        IgniteLogger log, int segmentCnt) {
+    public DelayedPageReplacementTracker(
+        int pageSize,
+        PageStoreWriter flushDirtyPage,
+        IgniteLogger log,
+        int segmentCnt
+    ) {
         this.pageSize = pageSize;
         this.flushDirtyPage = flushDirtyPage;
         this.log = log;
@@ -83,9 +88,9 @@
     /**
      * @return delayed page write implementation, finish method to be called to actually write page.
      */
-    public DelayedDirtyPageWrite delayedPageWrite() {
+    public DelayedDirtyPageStoreWrite delayedPageWrite() {
         return delayedPageWriteThreadLocMap.computeIfAbsent(Thread.currentThread().getId(),
-            id -> new DelayedDirtyPageWrite(flushDirtyPage, byteBufThreadLoc, pageSize, this));
+            id -> new DelayedDirtyPageStoreWrite(flushDirtyPage, byteBufThreadLoc, pageSize, this));
     }
 
     /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java
index af204dd..5cd848f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryEx.java
@@ -23,9 +23,9 @@
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.pagemem.FullPageId;
 import org.apache.ignite.internal.pagemem.PageMemory;
+import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter;
 import org.apache.ignite.internal.processors.cache.persistence.StorageException;
 import org.apache.ignite.internal.util.GridMultiCollectionWrapper;
-import org.jetbrains.annotations.Nullable;
 
 /**
  *
@@ -112,17 +112,22 @@
     public void finishCheckpoint();
 
     /**
-     * Gets page byte buffer for the checkpoint procedure.
+     * Prepare page for write during checkpoint.
+     *{@link PageStoreWriter} will be called when the page will be ready to write.
      *
      * @param pageId Page ID to get byte buffer for. The page ID must be present in the collection returned by
      *      the {@link #beginCheckpoint()} method call.
-     * @param outBuf Temporary buffer to write changes into.
+     * @param buf Temporary buffer to write changes into.
+     * @param pageWriter Checkpoint page write context.
      * @param tracker Checkpoint metrics tracker.
-     * @return {@code Partition generation} if data was read, {@code null} otherwise (data already saved to storage).
-     * @throws IgniteException If failed to obtain page data.
+     * @throws IgniteCheckedException If failed to obtain page data.
      */
-    @Nullable public Integer getForCheckpoint(FullPageId pageId, ByteBuffer outBuf, CheckpointMetricsTracker tracker);
-
+    public void checkpointWritePage(
+        FullPageId pageId,
+        ByteBuffer buf,
+        PageStoreWriter pageWriter,
+        CheckpointMetricsTracker tracker
+    ) throws IgniteCheckedException;
     /**
      * Marks partition as invalid / outdated.
      *
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
index 6e00a10..c339ab6 100755
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImpl.java
@@ -64,6 +64,7 @@
 import org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker;
 import org.apache.ignite.internal.processors.cache.persistence.CheckpointWriteProgressSupplier;
 import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl;
+import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter;
 import org.apache.ignite.internal.processors.cache.persistence.StorageException;
 import org.apache.ignite.internal.processors.cache.persistence.freelist.io.PagesListMetaIO;
 import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
@@ -85,7 +86,6 @@
 import org.apache.ignite.internal.util.typedef.internal.CU;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteBiTuple;
-import org.apache.ignite.spi.encryption.EncryptionSpi;
 import org.apache.ignite.spi.encryption.noop.NoopEncryptionSpi;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -246,7 +246,7 @@
     private OffheapReadWriteLock rwLock;
 
     /** Flush dirty page closure. When possible, will be called by evictPage(). */
-    private final ReplacedPageWriter flushDirtyPage;
+    private final PageStoreWriter flushDirtyPage;
 
     /**
      * Delayed page replacement (rotation with disk) tracker. Because other thread may require exactly the same page to be loaded from store,
@@ -301,7 +301,7 @@
         long[] sizes,
         GridCacheSharedContext<?, ?> ctx,
         int pageSize,
-        ReplacedPageWriter flushDirtyPage,
+        PageStoreWriter flushDirtyPage,
         @Nullable GridInClosure3X<Long, FullPageId, PageMemoryEx> changeTracker,
         CheckpointLockStateChecker stateChecker,
         DataRegionMetricsImpl memMetrics,
@@ -523,7 +523,7 @@
         // because there is no crc inside them.
         Segment seg = segment(grpId, pageId);
 
-        DelayedDirtyPageWrite delayedWriter = delayedPageReplacementTracker != null
+        DelayedDirtyPageStoreWrite delayedWriter = delayedPageReplacementTracker != null
             ? delayedPageReplacementTracker.delayedPageWrite() : null;
 
         FullPageId fullId = new FullPageId(pageId, grpId);
@@ -717,7 +717,7 @@
             seg.readLock().unlock();
         }
 
-        DelayedDirtyPageWrite delayedWriter = delayedPageReplacementTracker != null
+        DelayedDirtyPageStoreWrite delayedWriter = delayedPageReplacementTracker != null
             ? delayedPageReplacementTracker.delayedPageWrite() : null;
 
         seg.writeLock().lock();
@@ -1108,8 +1108,13 @@
     }
 
     /** {@inheritDoc} */
-    @Override public Integer getForCheckpoint(FullPageId fullId, ByteBuffer outBuf, CheckpointMetricsTracker tracker) {
-        assert outBuf.remaining() == pageSize();
+    @Override public void checkpointWritePage(
+        FullPageId fullId,
+        ByteBuffer buf,
+        PageStoreWriter pageStoreWriter,
+        CheckpointMetricsTracker metricsTracker
+    ) throws IgniteCheckedException {
+        assert buf.remaining() == pageSize();
 
         Segment seg = segment(fullId.groupId(), fullId.pageId());
 
@@ -1125,21 +1130,13 @@
 
         try {
             if (!isInCheckpoint(fullId))
-                return null;
+                return;
 
-            tag = seg.partGeneration(fullId.groupId(), PageIdUtils.partId(fullId.pageId()));
-
-            relPtr = seg.loadedPages.get(
-                fullId.groupId(),
-                PageIdUtils.effectivePageId(fullId.pageId()),
-                tag,
-                INVALID_REL_PTR,
-                OUTDATED_REL_PTR
-            );
+            relPtr = resolveRelativePointer(seg, fullId, tag = generationTag(seg, fullId));
 
             // Page may have been cleared during eviction. We have nothing to do in this case.
             if (relPtr == INVALID_REL_PTR)
-                return null;
+                return;
 
             if (relPtr != OUTDATED_REL_PTR) {
                 absPtr = seg.absolute(relPtr);
@@ -1160,19 +1157,10 @@
 
             try {
                 // Double-check.
-                relPtr = seg.loadedPages.get(
-                    fullId.groupId(),
-                    PageIdUtils.effectivePageId(fullId.pageId()),
-                    seg.partGeneration(
-                        fullId.groupId(),
-                        PageIdUtils.partId(fullId.pageId())
-                    ),
-                    INVALID_REL_PTR,
-                    OUTDATED_REL_PTR
-                );
+                relPtr = resolveRelativePointer(seg, fullId, generationTag(seg, fullId));
 
                 if (relPtr == INVALID_REL_PTR)
-                    return null;
+                    return;
 
                 if (relPtr == OUTDATED_REL_PTR) {
                     relPtr = refreshOutdatedPage(
@@ -1185,36 +1173,40 @@
                     seg.pool.releaseFreePage(relPtr);
                 }
 
-                return null;
+                return;
             }
             finally {
                 seg.writeLock().unlock();
             }
         }
-        else
-            return copyPageForCheckpoint(absPtr, fullId, outBuf, pageSingleAcquire, tracker) ? tag : TRY_AGAIN_TAG;
+
+        copyPageForCheckpoint(absPtr, fullId, buf, tag, pageSingleAcquire, pageStoreWriter, metricsTracker);
     }
 
     /**
      * @param absPtr Absolute ptr.
      * @param fullId Full id.
-     * @param outBuf Output buffer to write page content into.
+     * @param buf Buffer for copy page content for future write via {@link PageStoreWriter}.
      * @param pageSingleAcquire Page is acquired only once. We don't pin the page second time (until page will not be
      * copied) in case checkpoint temporary buffer is used.
-     * @param tracker Checkpoint statistics tracker.
-     *
-     * @return False if someone else holds lock on page.
+     * @param pageStoreWriter Checkpoint page write context.
      */
-    private boolean copyPageForCheckpoint(
+    private void copyPageForCheckpoint(
         long absPtr,
         FullPageId fullId,
-        ByteBuffer outBuf,
+        ByteBuffer buf,
+        Integer tag,
         boolean pageSingleAcquire,
+        PageStoreWriter pageStoreWriter,
         CheckpointMetricsTracker tracker
-    ) {
+    ) throws IgniteCheckedException {
         assert absPtr != 0;
         assert PageHeader.isAcquired(absPtr);
 
+        // Exception protection flag.
+        // No need to write if exception occurred.
+        boolean canWrite = false;
+
         boolean locked = rwLock.tryWriteLock(absPtr + PAGE_LOCK_OFFSET, OffheapReadWriteLock.TAG_LOCK_ALWAYS);
 
         if (!locked) {
@@ -1223,7 +1215,11 @@
             if (!pageSingleAcquire)
                 PageHeader.releasePage(absPtr);
 
-            return false;
+            buf.clear();
+
+            pageStoreWriter.writePage(fullId, buf, TRY_AGAIN_TAG);
+
+            return;
         }
 
         try {
@@ -1238,7 +1234,7 @@
 
                 long tmpAbsPtr = checkpointPool.absolute(tmpRelPtr);
 
-                copyInBuffer(tmpAbsPtr, outBuf);
+                copyInBuffer(tmpAbsPtr, buf);
 
                 GridUnsafe.setMemory(tmpAbsPtr + PAGE_OVERHEAD, pageSize(), (byte)0);
 
@@ -1251,24 +1247,31 @@
                 // and page did not have tmp buffer page.
                 if (!pageSingleAcquire)
                     PageHeader.releasePage(absPtr);
-
             }
             else {
-                copyInBuffer(absPtr, outBuf);
+                copyInBuffer(absPtr, buf);
 
                 PageHeader.dirty(absPtr, false);
             }
 
-            assert PageIO.getType(outBuf) != 0 : "Invalid state. Type is 0! pageId = " + U.hexLong(fullId.pageId());
-            assert PageIO.getVersion(outBuf) != 0 : "Invalid state. Version is 0! pageId = " + U.hexLong(fullId.pageId());
+            assert PageIO.getType(buf) != 0 : "Invalid state. Type is 0! pageId = " + U.hexLong(fullId.pageId());
+            assert PageIO.getVersion(buf) != 0 : "Invalid state. Version is 0! pageId = " + U.hexLong(fullId.pageId());
 
-            memMetrics.onPageWritten();
-
-            return true;
+            canWrite = true;
         }
         finally {
             rwLock.writeUnlock(absPtr + PAGE_LOCK_OFFSET, OffheapReadWriteLock.TAG_LOCK_ALWAYS);
 
+            if (canWrite) {
+                buf.rewind();
+
+                pageStoreWriter.writePage(fullId, buf, tag);
+
+                memMetrics.onPageWritten();
+
+                buf.rewind();
+            }
+
             // We pinned the page either when allocated the temp buffer, or when resolved abs pointer.
             // Must release the page only after write unlock.
             PageHeader.releasePage(absPtr);
@@ -1298,6 +1301,38 @@
         }
     }
 
+    /**
+     * Get current prartition generation tag.
+     *
+     * @param seg Segment.
+     * @param fullId Full page id.
+     * @return Current partition generation tag.
+     */
+    private int generationTag(Segment seg, FullPageId fullId) {
+        return seg.partGeneration(
+            fullId.groupId(),
+            PageIdUtils.partId(fullId.pageId())
+        );
+    }
+
+    /**
+     * Resolver relative pointer via {@link LoadedPagesMap}.
+     *
+     * @param seg Segment.
+     * @param fullId Full page id.
+     * @param reqVer Required version.
+     * @return Relative pointer.
+     */
+    private long resolveRelativePointer(Segment seg, FullPageId fullId, int reqVer) {
+        return seg.loadedPages.get(
+            fullId.groupId(),
+            PageIdUtils.effectivePageId(fullId.pageId()),
+            reqVer,
+            INVALID_REL_PTR,
+            OUTDATED_REL_PTR
+        );
+    }
+
     /** {@inheritDoc} */
     @Override public int invalidate(int grpId, int partId) {
         int tag = 0;
@@ -1408,6 +1443,30 @@
     }
 
     /**
+     * @param fullPageId Full page ID to check.
+     * @return {@code true} if the page is contained in the loaded pages table, {@code false} otherwise.
+     */
+    public boolean hasLoadedPage(FullPageId fullPageId) {
+        int grpId = fullPageId.groupId();
+        long pageId = PageIdUtils.effectivePageId(fullPageId.pageId());
+        int partId = PageIdUtils.partId(pageId);
+
+        Segment seg = segment(grpId, pageId);
+
+        seg.readLock().lock();
+
+        try {
+            long res =
+                seg.loadedPages.get(grpId, pageId, seg.partGeneration(grpId, partId), INVALID_REL_PTR, INVALID_REL_PTR);
+
+            return res != INVALID_REL_PTR;
+        }
+        finally {
+            seg.readLock().unlock();
+        }
+    }
+
+    /**
      * @param absPtr Absolute pointer to read lock.
      * @param fullId Full page ID.
      * @param force Force flag.
@@ -2095,7 +2154,7 @@
          * @return {@code True} if it is ok to replace this page, {@code false} if another page should be selected.
          * @throws IgniteCheckedException If failed to write page to the underlying store during eviction.
          */
-        private boolean preparePageRemoval(FullPageId fullPageId, long absPtr, ReplacedPageWriter saveDirtyPage) throws IgniteCheckedException {
+        private boolean preparePageRemoval(FullPageId fullPageId, long absPtr, PageStoreWriter saveDirtyPage) throws IgniteCheckedException {
             assert writeLock().isHeldByCurrentThread();
 
             // Do not evict cache meta pages.
@@ -2188,7 +2247,7 @@
          * @throws IgniteCheckedException If failed to evict page.
          * @param saveDirtyPage Replaced page writer, implementation to save dirty page to persistent storage.
          */
-        private long removePageForReplacement(ReplacedPageWriter saveDirtyPage) throws IgniteCheckedException {
+        private long removePageForReplacement(PageStoreWriter saveDirtyPage) throws IgniteCheckedException {
             assert getWriteHoldCount() > 0;
 
             if (!pageReplacementWarned) {
@@ -2360,7 +2419,7 @@
          * @param cap Capacity.
          * @param saveDirtyPage Evicted page writer.
          */
-        private long tryToFindSequentially(int cap, ReplacedPageWriter saveDirtyPage) throws IgniteCheckedException {
+        private long tryToFindSequentially(int cap, PageStoreWriter saveDirtyPage) throws IgniteCheckedException {
             assert getWriteHoldCount() > 0;
 
             long prevAddr = INVALID_REL_PTR;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/ReplacedPageWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/ReplacedPageWriter.java
deleted file mode 100644
index 30f9633..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/ReplacedPageWriter.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.ignite.internal.processors.cache.persistence.pagemem;
-
-import java.nio.ByteBuffer;
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.internal.pagemem.FullPageId;
-
-/**
- * Flush (write) dirty page implementation for freed page during page replacement. When possible, will be called by
- * removePageForReplacement().
- */
-public interface ReplacedPageWriter {
-    /**
-     * @param fullPageId Full page ID being evicted.
-     * @param byteBuf Buffer with page data.
-     * @param tag partition update tag, increasing counter.
-     * @throws IgniteCheckedException if page write failed.
-     */
-    void writePage(FullPageId fullPageId, ByteBuffer byteBuf, int tag) throws IgniteCheckedException;
-}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
index ee61e25..8341e1a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/tree/io/PageIO.java
@@ -21,7 +21,6 @@
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.spi.encryption.EncryptionSpi;
 import org.apache.ignite.internal.pagemem.PageIdUtils;
 import org.apache.ignite.internal.pagemem.PageMemory;
 import org.apache.ignite.internal.pagemem.PageUtils;
@@ -36,17 +35,18 @@
 import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageLockListener;
 import org.apache.ignite.internal.processors.cache.tree.CacheIdAwareDataInnerIO;
 import org.apache.ignite.internal.processors.cache.tree.CacheIdAwareDataLeafIO;
-import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccCacheIdAwareDataInnerIO;
-import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccCacheIdAwareDataLeafIO;
 import org.apache.ignite.internal.processors.cache.tree.CacheIdAwarePendingEntryInnerIO;
 import org.apache.ignite.internal.processors.cache.tree.CacheIdAwarePendingEntryLeafIO;
 import org.apache.ignite.internal.processors.cache.tree.DataInnerIO;
 import org.apache.ignite.internal.processors.cache.tree.DataLeafIO;
-import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccDataInnerIO;
-import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccDataLeafIO;
 import org.apache.ignite.internal.processors.cache.tree.PendingEntryInnerIO;
 import org.apache.ignite.internal.processors.cache.tree.PendingEntryLeafIO;
+import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccCacheIdAwareDataInnerIO;
+import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccCacheIdAwareDataLeafIO;
+import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccDataInnerIO;
+import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccDataLeafIO;
 import org.apache.ignite.internal.util.GridStringBuilder;
+import org.apache.ignite.spi.encryption.EncryptionSpi;
 
 /**
  * Base format for all the page types.
@@ -717,17 +717,22 @@
     /**
      * @param addr Address.
      */
-    public static String printPage(long addr, int pageSize) throws IgniteCheckedException {
-        PageIO io = getPageIO(addr);
-
+    public static String printPage(long addr, int pageSize) {
         GridStringBuilder sb = new GridStringBuilder("Header [\n\ttype=");
 
-        sb.a(getType(addr)).a(" (").a(io.getClass().getSimpleName())
-            .a("),\n\tver=").a(getVersion(addr)).a(",\n\tcrc=").a(getCrc(addr))
-            .a(",\n\t").a(PageIdUtils.toDetailString(getPageId(addr)))
-            .a("\n],\n");
+        try {
+            PageIO io = getPageIO(addr);
 
-        io.printPage(addr, pageSize, sb);
+            sb.a(getType(addr)).a(" (").a(io.getClass().getSimpleName())
+                .a("),\n\tver=").a(getVersion(addr)).a(",\n\tcrc=").a(getCrc(addr))
+                .a(",\n\t").a(PageIdUtils.toDetailString(getPageId(addr)))
+                .a("\n],\n");
+
+            io.printPage(addr, pageSize, sb);
+        }
+        catch (IgniteCheckedException e) {
+            sb.a("Failed to print page: ").a(e.getMessage());
+        }
 
         return sb.toString();
     }
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java
index 15205e0..4af7298 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/IgnitePdsRecoveryAfterFileCorruptionTest.java
@@ -21,6 +21,7 @@
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
 import org.apache.ignite.IgniteCache;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.cache.CacheRebalanceMode;
@@ -322,44 +323,43 @@
         info("Acquired pages for checkpoint: " + pageIds.size());
 
         try {
-            ByteBuffer tmpBuf = ByteBuffer.allocate(mem.pageSize());
-
-            tmpBuf.order(ByteOrder.nativeOrder());
-
             long begin = System.currentTimeMillis();
 
             long cp = 0;
 
-            long write = 0;
+            AtomicLong write = new AtomicLong();
+
+            PageStoreWriter pageStoreWriter = (fullPageId, buf, tag) -> {
+                int groupId = fullPageId.groupId();
+                long pageId = fullPageId.pageId();
+
+                for (int j = PageIO.COMMON_HEADER_END; j < mem.realPageSize(groupId); j += 4)
+                    assertEquals(j + (int)pageId, buf.getInt(j));
+
+                buf.rewind();
+
+                long writeStart = System.nanoTime();
+
+                storeMgr.write(cacheId, pageId, buf, tag);
+
+                long writeEnd = System.nanoTime();
+
+                write.getAndAdd(writeEnd - writeStart);
+            };
+
+            ByteBuffer tmpBuf = ByteBuffer.allocate(mem.pageSize());
+
+            tmpBuf.order(ByteOrder.nativeOrder());
 
             for (FullPageId fullId : pages) {
                 if (pageIds.contains(fullId)) {
                     long cpStart = System.nanoTime();
 
-                    Integer tag = mem.getForCheckpoint(fullId, tmpBuf, null);
-
-                    if (tag == null)
-                        continue;
+                    mem.checkpointWritePage(fullId, tmpBuf, pageStoreWriter, null);
 
                     long cpEnd = System.nanoTime();
 
                     cp += cpEnd - cpStart;
-                    tmpBuf.rewind();
-
-                    for (int j = PageIO.COMMON_HEADER_END; j < mem.realPageSize(fullId.groupId()); j += 4)
-                        assertEquals(j + (int)fullId.pageId(), tmpBuf.getInt(j));
-
-                    tmpBuf.rewind();
-
-                    long writeStart = System.nanoTime();
-
-                    storeMgr.write(cacheId, fullId.pageId(), tmpBuf, tag);
-
-                    long writeEnd = System.nanoTime();
-
-                    write += writeEnd - writeStart;
-
-                    tmpBuf.rewind();
                 }
             }
 
@@ -370,7 +370,7 @@
             long end = System.currentTimeMillis();
 
             info("Written pages in " + (end - begin) + "ms, copy took " + (cp / 1_000_000) + "ms, " +
-                "write took " + (write / 1_000_000) + "ms, sync took " + (end - syncStart) + "ms");
+                "write took " + (write.get() / 1_000_000) + "ms, sync took " + (end - syncStart) + "ms");
         }
         finally {
             info("Finishing checkpoint...");
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java
index 620814f..e72c5ca 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/db/file/IgnitePdsCheckpointSimulationWithRealCpDisabledTest.java
@@ -32,6 +32,7 @@
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import org.apache.ignite.IgniteCheckedException;
@@ -65,6 +66,7 @@
 import org.apache.ignite.internal.processors.cache.persistence.DummyPageIO;
 import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter;
 import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
 import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
@@ -595,7 +597,8 @@
 
                     buf.rewind();
 
-                    mem.getForCheckpoint(fullId, buf, null);
+                    mem.checkpointWritePage(fullId, buf, (fullPageId, buffer, tag) -> {
+                    }, null);
 
                     buf.position(PageIO.COMMON_HEADER_END);
 
@@ -906,8 +909,16 @@
 
                     Integer tag;
 
+                    AtomicReference<Integer> tag0 = new AtomicReference<>();
+
+                    PageStoreWriter pageStoreWriter = (fullPageId, buf, tagx) -> {
+                        tag0.set(tagx);
+                    };
+
                     while (true) {
-                        tag = mem.getForCheckpoint(fullId, tmpBuf, null);
+                        mem.checkpointWritePage(fullId, tmpBuf, pageStoreWriter, null);
+
+                        tag = tag0.get();
 
                         if (tag != null && tag == PageMemoryImpl.TRY_AGAIN_TAG)
                             continue;
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java
index 7b35499..4a8c7b1 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/IgnitePageMemReplaceDelayedWriteUnitTest.java
@@ -41,6 +41,7 @@
 import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl;
 import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
 import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter;
 import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
 import org.apache.ignite.internal.util.GridMultiCollectionWrapper;
 import org.apache.ignite.internal.util.typedef.internal.U;
@@ -89,7 +90,7 @@
 
         AtomicInteger totalEvicted = new AtomicInteger();
 
-        ReplacedPageWriter pageWriter = (FullPageId fullPageId, ByteBuffer byteBuf, int tag) -> {
+        PageStoreWriter pageWriter = (FullPageId fullPageId, ByteBuffer byteBuf, int tag) -> {
             log.info("Evicting " + fullPageId);
 
             assert getLockedPages(fullPageId).contains(fullPageId);
@@ -146,7 +147,7 @@
 
         AtomicInteger totalEvicted = new AtomicInteger();
 
-        ReplacedPageWriter pageWriter = (FullPageId fullPageId, ByteBuffer byteBuf, int tag) -> {
+        PageStoreWriter pageWriter = (FullPageId fullPageId, ByteBuffer byteBuf, int tag) -> {
             log.info("Evicting " + fullPageId);
 
             assert getSegment(fullPageId).writeLock().isHeldByCurrentThread();
@@ -211,7 +212,7 @@
      * @return implementation for test
      */
     @NotNull
-    private PageMemoryImpl createPageMemory(IgniteConfiguration cfg, ReplacedPageWriter pageWriter, int pageSize) {
+    private PageMemoryImpl createPageMemory(IgniteConfiguration cfg, PageStoreWriter pageWriter, int pageSize) {
         IgniteCacheDatabaseSharedManager db = mock(GridCacheDatabaseSharedManager.class);
 
         when(db.checkpointLockIsHeldByThread()).thenReturn(true);
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
index cfd9543..c0be692 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/pagemem/PageMemoryImplTest.java
@@ -20,7 +20,9 @@
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -31,30 +33,38 @@
 import org.apache.ignite.failure.NoOpFailureHandler;
 import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
 import org.apache.ignite.internal.managers.encryption.GridEncryptionManager;
+import org.apache.ignite.internal.managers.eventstorage.GridEventStorageManager;
 import org.apache.ignite.internal.mem.DirectMemoryProvider;
 import org.apache.ignite.internal.mem.IgniteOutOfMemoryException;
 import org.apache.ignite.internal.mem.unsafe.UnsafeMemoryProvider;
 import org.apache.ignite.internal.pagemem.FullPageId;
-import org.apache.ignite.internal.pagemem.PageIdAllocator;
 import org.apache.ignite.internal.pagemem.PageUtils;
+import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
 import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
 import org.apache.ignite.internal.processors.cache.persistence.CheckpointLockStateChecker;
 import org.apache.ignite.internal.processors.cache.persistence.CheckpointWriteProgressSupplier;
 import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl;
+import org.apache.ignite.internal.processors.cache.persistence.DummyPageIO;
 import org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
+import org.apache.ignite.internal.processors.cache.persistence.PageStoreWriter;
 import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
 import org.apache.ignite.internal.processors.failure.FailureProcessor;
 import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor;
 import org.apache.ignite.internal.processors.subscription.GridInternalSubscriptionProcessor;
+import org.apache.ignite.internal.util.GridMultiCollectionWrapper;
 import org.apache.ignite.internal.util.lang.GridInClosure3X;
 import org.apache.ignite.plugin.PluginProvider;
 import org.apache.ignite.spi.encryption.noop.NoopEncryptionSpi;
+import org.apache.ignite.spi.eventstorage.NoopEventStorageSpi;
 import org.apache.ignite.testframework.GridTestUtils;
 import org.apache.ignite.testframework.junits.GridTestKernalContext;
 import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
 import org.apache.ignite.testframework.junits.logger.GridTestLog4jLogger;
+import org.junit.Test;
 import org.mockito.Mockito;
 
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX;
+import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION;
 import static org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryImpl.CHECKPOINT_POOL_OVERFLOW_ERROR_MSG;
 
 /**
@@ -73,12 +83,13 @@
     /**
      * @throws Exception if failed.
      */
+    @Test
     public void testThatAllocationTooMuchPagesCauseToOOMException() throws Exception {
         PageMemoryImpl memory = createPageMemory(PageMemoryImpl.ThrottlingPolicy.DISABLED);
 
         try {
             while (!Thread.currentThread().isInterrupted())
-                memory.allocatePage(1, PageIdAllocator.INDEX_PARTITION, PageIdAllocator.FLAG_IDX);
+                memory.allocatePage(1, INDEX_PARTITION, FLAG_IDX);
         }
         catch (IgniteOutOfMemoryException ignore) {
             //Success
@@ -90,6 +101,7 @@
     /**
      * @throws Exception If failed.
      */
+    @Test
     public void testCheckpointBufferOverusageDontCauseWriteLockLeak() throws Exception {
         PageMemoryImpl memory = createPageMemory(PageMemoryImpl.ThrottlingPolicy.DISABLED);
 
@@ -97,7 +109,7 @@
 
         try {
             while (!Thread.currentThread().isInterrupted()) {
-                long pageId = memory.allocatePage(1, PageIdAllocator.INDEX_PARTITION, PageIdAllocator.FLAG_IDX);
+                long pageId = memory.allocatePage(1, INDEX_PARTITION, FLAG_IDX);
 
                 FullPageId fullPageId = new FullPageId(pageId, 1);
 
@@ -143,6 +155,7 @@
      * Tests that checkpoint buffer won't be overflowed with enabled CHECKPOINT_BUFFER_ONLY throttling.
      * @throws Exception If failed.
      */
+    @Test
     public void testCheckpointBufferCantOverflowMixedLoad() throws Exception {
         testCheckpointBufferCantOverflowWithThrottlingMixedLoad(PageMemoryImpl.ThrottlingPolicy.CHECKPOINT_BUFFER_ONLY);
     }
@@ -151,6 +164,7 @@
      * Tests that checkpoint buffer won't be overflowed with enabled SPEED_BASED throttling.
      * @throws Exception If failed.
      */
+    @Test
     public void testCheckpointBufferCantOverflowMixedLoadSpeedBased() throws Exception {
         testCheckpointBufferCantOverflowWithThrottlingMixedLoad(PageMemoryImpl.ThrottlingPolicy.SPEED_BASED);
     }
@@ -159,11 +173,176 @@
      * Tests that checkpoint buffer won't be overflowed with enabled TARGET_RATIO_BASED throttling.
      * @throws Exception If failed.
      */
+    @Test
     public void testCheckpointBufferCantOverflowMixedLoadRatioBased() throws Exception {
         testCheckpointBufferCantOverflowWithThrottlingMixedLoad(PageMemoryImpl.ThrottlingPolicy.TARGET_RATIO_BASED);
     }
 
     /**
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testCheckpointProtocolWriteDirtyPageAfterWriteUnlock() throws Exception {
+        TestPageStoreManager pageStoreMgr = new TestPageStoreManager();
+
+        // Create a 1 mb page memory.
+        PageMemoryImpl memory = createPageMemory(
+            1,
+            PageMemoryImpl.ThrottlingPolicy.TARGET_RATIO_BASED,
+            pageStoreMgr,
+            pageStoreMgr
+        );
+
+        int initPageCnt = 10;
+
+        List<FullPageId> allocated = new ArrayList<>(initPageCnt);
+
+        for (int i = 0; i < initPageCnt; i++) {
+            long id = memory.allocatePage(1, INDEX_PARTITION, FLAG_IDX);
+
+            FullPageId fullId = new FullPageId(id, 1);
+
+            allocated.add(fullId);
+
+            writePage(memory, fullId, (byte)1);
+        }
+
+        doCheckpoint(memory.beginCheckpoint(), memory, pageStoreMgr);
+
+        FullPageId cowPageId = allocated.get(0);
+
+        // Mark some pages as dirty.
+        writePage(memory, cowPageId, (byte)2);
+
+        GridMultiCollectionWrapper<FullPageId> cpPages = memory.beginCheckpoint();
+
+        assertEquals(1, cpPages.size());
+
+        // At this point COW mechanics kicks in.
+        writePage(memory, cowPageId, (byte)3);
+
+        doCheckpoint(cpPages, memory, pageStoreMgr);
+
+        byte[] data = pageStoreMgr.storedPages.get(cowPageId);
+
+        for (int i = PageIO.COMMON_HEADER_END; i < PAGE_SIZE; i++)
+            assertEquals(2, data[i]);
+    }
+
+    /**
+     * @param cpPages Checkpoint pages acuiqred by {@code beginCheckpoint()}.
+     * @param memory Page memory.
+     * @param pageStoreMgr Test page store manager.
+     * @throws Exception If failed.
+     */
+    private void doCheckpoint(
+        GridMultiCollectionWrapper<FullPageId> cpPages,
+        PageMemoryImpl memory,
+        TestPageStoreManager pageStoreMgr
+    ) throws Exception {
+        PageStoreWriter pageStoreWriter = (fullPageId, buf, tag) -> {
+            assertNotNull(tag);
+
+            pageStoreMgr.write(fullPageId.groupId(), fullPageId.pageId(), buf, 1);
+        };
+
+        for (FullPageId cpPage : cpPages) {
+            byte[] data = new byte[PAGE_SIZE];
+
+            ByteBuffer buf = ByteBuffer.wrap(data);
+
+            memory.checkpointWritePage(cpPage, buf, pageStoreWriter, null);
+        }
+
+        memory.finishCheckpoint();
+    }
+
+    /**
+     * @throws Exception if failed.
+     */
+    @Test
+    public void testCheckpointProtocolCannotReplaceUnwrittenPage() throws Exception {
+        TestPageStoreManager pageStoreMgr = new TestPageStoreManager();
+
+        // Create a 1 mb page memory.
+        PageMemoryImpl memory = createPageMemory(
+            1,
+            PageMemoryImpl.ThrottlingPolicy.TARGET_RATIO_BASED,
+            pageStoreMgr,
+            pageStoreMgr);
+
+        int initPageCnt = 500;
+
+        List<FullPageId> allocated = new ArrayList<>(initPageCnt);
+
+        for (int i = 0; i < initPageCnt; i++) {
+            long id = memory.allocatePage(1, INDEX_PARTITION, FLAG_IDX);
+
+            FullPageId fullId = new FullPageId(id, 1);
+            allocated.add(fullId);
+
+            writePage(memory, fullId, (byte)1);
+        }
+
+        // CP Write lock.
+        memory.beginCheckpoint();
+        // CP Write unlock.
+
+        byte[] buf = new byte[PAGE_SIZE];
+
+        memory.checkpointWritePage(allocated.get(0), ByteBuffer.wrap(buf),
+            (fullPageId, buf0, tag) -> {
+                assertNotNull(tag);
+
+                boolean oom = false;
+
+                try {
+                    // Try force page replacement.
+                    while (true) {
+                        memory.allocatePage(1, INDEX_PARTITION, FLAG_IDX);
+                    }
+                }
+                catch (IgniteOutOfMemoryException ex) {
+                    oom = true;
+                }
+
+                assertTrue("Should oom before check replaced page.", oom);
+
+                assertTrue("Missing page: " + fullPageId, memory.hasLoadedPage(fullPageId));
+            }
+            , null);
+    }
+
+    /**
+     * @param mem Page memory.
+     * @param fullPageId Full page ID to write.
+     * @param val Value to write.
+     * @throws Exception If failed.
+     */
+    private void writePage(PageMemoryImpl mem, FullPageId fullPageId, byte val) throws Exception {
+        int grpId = fullPageId.groupId();
+        long pageId = fullPageId.pageId();
+        long page = mem.acquirePage(grpId, pageId);
+
+        try {
+            long ptr = mem.writeLock(grpId, pageId, page);
+
+            try {
+                new DummyPageIO().initNewPage(ptr, pageId, PAGE_SIZE);
+
+                for (int i = PageIO.COMMON_HEADER_END; i < mem.pageSize(); i++)
+                    PageUtils.putByte(ptr, i, val);
+            }
+            finally {
+                mem.writeUnlock(grpId, pageId, page, Boolean.FALSE, true);
+            }
+        }
+        finally {
+            mem.releasePage(grpId, pageId, page);
+        }
+    }
+
+    /**
      * @throws Exception If failed.
      */
     private void testCheckpointBufferCantOverflowWithThrottlingMixedLoad(PageMemoryImpl.ThrottlingPolicy plc) throws Exception {
@@ -172,7 +351,7 @@
         List<FullPageId> pages = new ArrayList<>();
 
         for (int i = 0; i < (MAX_SIZE - 10) * MB / PAGE_SIZE / 2; i++) {
-            long pageId = memory.allocatePage(1, PageIdAllocator.INDEX_PARTITION, PageIdAllocator.FLAG_IDX);
+            long pageId = memory.allocatePage(1, INDEX_PARTITION, FLAG_IDX);
 
             FullPageId fullPageId = new FullPageId(pageId, 1);
 
@@ -186,12 +365,15 @@
         CheckpointMetricsTracker mockTracker = Mockito.mock(CheckpointMetricsTracker.class);
 
         for (FullPageId checkpointPage : pages)
-            memory.getForCheckpoint(checkpointPage, ByteBuffer.allocate(PAGE_SIZE), mockTracker);
+            memory.checkpointWritePage(checkpointPage, ByteBuffer.allocate(PAGE_SIZE),
+                (fullPageId, buffer, tag) -> {
+                    // No-op.
+                }, mockTracker);
 
         memory.finishCheckpoint();
 
         for (int i = (int)((MAX_SIZE - 10) * MB / PAGE_SIZE / 2); i < (MAX_SIZE - 20) * MB / PAGE_SIZE; i++) {
-            long pageId = memory.allocatePage(1, PageIdAllocator.INDEX_PARTITION, PageIdAllocator.FLAG_IDX);
+            long pageId = memory.allocatePage(1, INDEX_PARTITION, FLAG_IDX);
 
             FullPageId fullPageId = new FullPageId(pageId, 1);
 
@@ -261,11 +443,31 @@
      * @param throttlingPlc Throttling Policy.
      * @throws Exception If creating mock failed.
      */
-    private PageMemoryImpl createPageMemory(PageMemoryImpl.ThrottlingPolicy throttlingPlc) throws Exception {
+    private PageMemoryImpl createPageMemory(
+        PageMemoryImpl.ThrottlingPolicy throttlingPlc) throws Exception {
+        return createPageMemory(
+            MAX_SIZE,
+            throttlingPlc,
+            new NoOpPageStoreManager(),
+            (fullPageId, byteBuf, tag) -> {
+                assert false : "No page replacement (rotation with disk) should happen during the test";
+            });
+    }
+
+    /**
+     * @param throttlingPlc Throttling Policy.
+     * @throws Exception If creating mock failed.
+     */
+    private PageMemoryImpl createPageMemory(
+        int maxSize,
+        PageMemoryImpl.ThrottlingPolicy throttlingPlc,
+        IgnitePageStoreManager mgr,
+        PageStoreWriter replaceWriter
+    ) throws Exception {
         long[] sizes = new long[5];
 
         for (int i = 0; i < sizes.length; i++)
-            sizes[i] = MAX_SIZE * MB / 4;
+            sizes[i] = maxSize * MB / 4;
 
         sizes[4] = 5 * MB;
 
@@ -275,12 +477,14 @@
         igniteCfg.setDataStorageConfiguration(new DataStorageConfiguration());
         igniteCfg.setFailureHandler(new NoOpFailureHandler());
         igniteCfg.setEncryptionSpi(new NoopEncryptionSpi());
+        igniteCfg.setEventStorageSpi(new NoopEventStorageSpi());
 
         GridTestKernalContext kernalCtx = new GridTestKernalContext(new GridTestLog4jLogger(), igniteCfg);
 
         kernalCtx.add(new IgnitePluginProcessor(kernalCtx, igniteCfg, Collections.<PluginProvider>emptyList()));
         kernalCtx.add(new GridInternalSubscriptionProcessor(kernalCtx));
         kernalCtx.add(new GridEncryptionManager(kernalCtx));
+        kernalCtx.add(new GridEventStorageManager(kernalCtx));
 
         FailureProcessor failureProc = new FailureProcessor(kernalCtx);
 
@@ -293,7 +497,7 @@
             null,
             null,
             null,
-            new NoOpPageStoreManager(),
+            mgr,
             new NoOpWALManager(),
             null,
             new IgniteCacheDatabaseSharedManager(),
@@ -321,17 +525,15 @@
             sizes,
             sharedCtx,
             PAGE_SIZE,
-            (fullPageId, byteBuf, tag) -> {
-                assert false : "No page replacement (rotation with disk) should happen during the test";
-            },
+            replaceWriter,
             new GridInClosure3X<Long, FullPageId, PageMemoryEx>() {
                 @Override public void applyx(Long page, FullPageId fullId, PageMemoryEx pageMem) {
                 }
             }, new CheckpointLockStateChecker() {
-                @Override public boolean checkpointLockIsHeldByThread() {
-                    return true;
-                }
-            },
+            @Override public boolean checkpointLockIsHeldByThread() {
+                return true;
+            }
+        },
             new DataRegionMetricsImpl(igniteCfg.getDataStorageConfiguration().getDefaultDataRegionConfiguration()),
             throttlingPlc,
             noThrottle
@@ -341,4 +543,42 @@
 
         return mem;
     }
+
+    /**
+     *
+     */
+    private static class TestPageStoreManager extends NoOpPageStoreManager implements PageStoreWriter {
+        /** */
+        private Map<FullPageId, byte[]> storedPages = new HashMap<>();
+
+        /** {@inheritDoc} */
+        @Override public void read(int grpId, long pageId, ByteBuffer pageBuf) throws IgniteCheckedException {
+            FullPageId fullPageId = new FullPageId(pageId, grpId);
+
+            byte[] bytes = storedPages.get(fullPageId);
+
+            if (bytes != null)
+                pageBuf.put(bytes);
+            else
+                pageBuf.put(new byte[PAGE_SIZE]);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void write(int grpId, long pageId, ByteBuffer pageBuf, int tag) throws IgniteCheckedException {
+            byte[] data = new byte[PAGE_SIZE];
+
+            pageBuf.get(data);
+
+            storedPages.put(new FullPageId(pageId, grpId), data);
+        }
+
+        /** {@inheritDoc} */
+        @Override public void writePage(FullPageId fullPageId, ByteBuffer pageBuf, int tag) throws IgniteCheckedException {
+            byte[] data = new byte[PAGE_SIZE];
+
+            pageBuf.get(data);
+
+            storedPages.put(fullPageId, data);
+        }
+    }
 }