IGNITE-16392 PageMemory data regions configuration; porting of PageMemoryNoStoreImpl and all other necessary components. (#591)

diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/CoreDistributedConfigurationModule.java b/modules/api/src/main/java/org/apache/ignite/configuration/CoreDistributedConfigurationModule.java
index e47b02f..e0639e0 100644
--- a/modules/api/src/main/java/org/apache/ignite/configuration/CoreDistributedConfigurationModule.java
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/CoreDistributedConfigurationModule.java
@@ -22,6 +22,9 @@
 import org.apache.ignite.configuration.annotation.ConfigurationType;
 import org.apache.ignite.configuration.schemas.runner.ClusterConfiguration;
 import org.apache.ignite.configuration.schemas.store.DataStorageConfiguration;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfigurationSchema;
+import org.apache.ignite.configuration.schemas.store.RocksDbDataRegionConfigurationSchema;
+import org.apache.ignite.configuration.schemas.store.UnsafeMemoryAllocatorConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.PartialIndexConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.SortedIndexConfigurationSchema;
@@ -54,7 +57,10 @@
         return List.of(
                 HashIndexConfigurationSchema.class,
                 SortedIndexConfigurationSchema.class,
-                PartialIndexConfigurationSchema.class
+                PartialIndexConfigurationSchema.class,
+                RocksDbDataRegionConfigurationSchema.class,
+                PageMemoryDataRegionConfigurationSchema.class,
+                UnsafeMemoryAllocatorConfigurationSchema.class
         );
     }
 }
diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/DataRegionConfigurationSchema.java b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/DataRegionConfigurationSchema.java
index f32381f..fe60b92 100644
--- a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/DataRegionConfigurationSchema.java
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/DataRegionConfigurationSchema.java
@@ -17,48 +17,24 @@
 
 package org.apache.ignite.configuration.schemas.store;
 
-import org.apache.ignite.configuration.annotation.Config;
-import org.apache.ignite.configuration.annotation.Value;
+import static org.apache.ignite.configuration.schemas.store.RocksDbDataRegionConfigurationSchema.ROCKSDB_DATA_REGION_TYPE;
+
+import org.apache.ignite.configuration.annotation.InjectedName;
+import org.apache.ignite.configuration.annotation.PolymorphicConfig;
+import org.apache.ignite.configuration.annotation.PolymorphicId;
 import org.apache.ignite.configuration.validation.Immutable;
-import org.apache.ignite.configuration.validation.Min;
-import org.apache.ignite.configuration.validation.OneOf;
 
 /**
- * Configuration schema for data region. Currently it represents configuration for rocksdb storage engine only.
+ * Configuration schema for data region.
  */
-@Config
+@PolymorphicConfig
 public class DataRegionConfigurationSchema {
-    /** Type of the RocksDB data region. */
-    public static final String ROCKSDB_DATA_REGION_TYPE = "rocksdb";
-
-    /** Cache type for the RocksDB LRU cache. */
-    public static final String ROCKSDB_LRU_CACHE = "lru";
-
-    /** Cache type for the RocksDB LRU cache. */
-    public static final String ROCKSDB_CLOCK_CACHE = "clock";
-
     /** Type for the future polymorphic configuration schemas. */
     @Immutable
-    @OneOf(ROCKSDB_DATA_REGION_TYPE)
-    @Value(hasDefault = true)
+    @PolymorphicId(hasDefault = true)
     public String type = ROCKSDB_DATA_REGION_TYPE;
 
-    /** Size of the rocksdb offheap cache. */
-    @Value(hasDefault = true)
-    public long size = 256 * 1024 * 1024;
-
-    /** Size of rocksdb write buffer. */
-    @Value(hasDefault = true)
-    @Min(1)
-    public long writeBufferSize = 64 * 1024 * 1024;
-
-    /** Cache type - only {@code LRU} is supported at the moment. {@code Clock} implementation has known bugs. */
-    @OneOf({ROCKSDB_LRU_CACHE})
-    @Value(hasDefault = true)
-    public String cache = ROCKSDB_LRU_CACHE;
-
-    /** The cache is sharded to 2^numShardBits shards, by hash of the key. */
-    @Min(-1)
-    @Value(hasDefault = true)
-    public int numShardBits = -1;
+    /** Name of the data region. */
+    @InjectedName
+    public String name;
 }
diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/DataStorageConfigurationSchema.java b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/DataStorageConfigurationSchema.java
index aff2f1e..a10858a 100644
--- a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/DataStorageConfigurationSchema.java
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/DataStorageConfigurationSchema.java
@@ -20,6 +20,7 @@
 import org.apache.ignite.configuration.annotation.ConfigValue;
 import org.apache.ignite.configuration.annotation.ConfigurationRoot;
 import org.apache.ignite.configuration.annotation.ConfigurationType;
+import org.apache.ignite.configuration.annotation.Name;
 import org.apache.ignite.configuration.annotation.NamedConfigValue;
 import org.apache.ignite.configuration.validation.ExceptKeys;
 
@@ -33,6 +34,7 @@
 
     /** Default data region. */
     @ConfigValue
+    @Name(DEFAULT_DATA_REGION_NAME)
     public DataRegionConfigurationSchema defaultRegion;
 
     /** Other data regions. */
diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/MemoryAllocatorConfigurationSchema.java b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/MemoryAllocatorConfigurationSchema.java
new file mode 100644
index 0000000..6ca6a73
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/MemoryAllocatorConfigurationSchema.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.configuration.schemas.store;
+
+import static org.apache.ignite.configuration.schemas.store.UnsafeMemoryAllocatorConfigurationSchema.UNSAFE_MEMORY_ALLOCATOR_TYPE;
+
+import org.apache.ignite.configuration.annotation.PolymorphicConfig;
+import org.apache.ignite.configuration.annotation.PolymorphicId;
+
+/**
+ * Configuration schema for memory allocation strategies.
+ */
+@PolymorphicConfig
+public class MemoryAllocatorConfigurationSchema {
+    /** Type of the memory allocator. */
+    @PolymorphicId(hasDefault = true)
+    public String type = UNSAFE_MEMORY_ALLOCATOR_TYPE;
+}
diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/PageMemoryDataRegionConfigurationSchema.java b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/PageMemoryDataRegionConfigurationSchema.java
new file mode 100644
index 0000000..eb58218
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/PageMemoryDataRegionConfigurationSchema.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.configuration.schemas.store;
+
+import static org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfigurationSchema.PAGE_MEMORY_DATA_REGION_TYPE;
+
+import org.apache.ignite.configuration.annotation.ConfigValue;
+import org.apache.ignite.configuration.annotation.PolymorphicConfigInstance;
+import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.configuration.validation.Immutable;
+import org.apache.ignite.configuration.validation.OneOf;
+
+/**
+ * Data region configuration for Page Memory storage engine.
+ */
+@PolymorphicConfigInstance(PAGE_MEMORY_DATA_REGION_TYPE)
+public class PageMemoryDataRegionConfigurationSchema extends DataRegionConfigurationSchema {
+    /** Type of the Page Memory data region. */
+    public static final String PAGE_MEMORY_DATA_REGION_TYPE = "pagemem";
+
+    /** Default initial size. */
+    public static final long DFLT_DATA_REGION_INITIAL_SIZE = 256 * 1024 * 1024;
+
+    /** Default max size. */
+    public static final long DFLT_DATA_REGION_MAX_SIZE = 256 * 1024 * 1024;
+
+    /** Eviction is disabled. */
+    public static final String DISABLED_EVICTION_MODE = "DISABLED";
+
+    /** Random-LRU algorithm. */
+    public static final String RANDOM_LRU_EVICTION_MODE = "RANDOM_LRU";
+
+    /** Random-2-LRU algorithm: scan-resistant version of Random-LRU. */
+    public static final String RANDOM_2_LRU_EVICTION_MODE = "RANDOM_2_LRU";
+
+    /** Random-LRU page replacement algorithm. */
+    public static final String RANDOM_LRU_REPLACEMENT_MODE = "RANDOM_LRU";
+
+    /** Segmented-LRU algorithm. */
+    public static final String SEGMENTED_LRU_REPLACEMENT_MODE = "SEGMENTED_LRU";
+
+    /** CLOCK algorithm. */
+    public static final String CLOCK_REPLACEMENT_MODE = "CLOCK";
+
+    /** Memory page size in bytes. */
+    @Immutable
+    @Value(hasDefault = true)
+    public int pageSize = 16 * 1024;
+
+    /** Whether the data region is persistent. */
+    @Value(hasDefault = true)
+    public boolean persistent = false;
+
+    /** Initial size of the data region in bytes. */
+    @Value(hasDefault = true)
+    public long initSize = DFLT_DATA_REGION_INITIAL_SIZE;
+
+    /** Maximum size of the data region in bytes. */
+    @Value(hasDefault = true)
+    public long maxSize = DFLT_DATA_REGION_MAX_SIZE;
+
+    /** Memory allocator configuration. */
+    @ConfigValue
+    public MemoryAllocatorConfigurationSchema memoryAllocator;
+
+    /** Page eviction mode. */
+    @OneOf({DISABLED_EVICTION_MODE, RANDOM_LRU_EVICTION_MODE, RANDOM_2_LRU_EVICTION_MODE})
+    @Value(hasDefault = true)
+    public String evictionMode = DISABLED_EVICTION_MODE;
+
+    /** Page replacement mode. */
+    @OneOf({RANDOM_LRU_REPLACEMENT_MODE, SEGMENTED_LRU_REPLACEMENT_MODE, CLOCK_REPLACEMENT_MODE})
+    @Value(hasDefault = true)
+    public String replacementMode = CLOCK_REPLACEMENT_MODE;
+
+    /** Fraction of the data region that must be filled before page eviction starts. */
+    @Value(hasDefault = true)
+    public double evictionThreshold = 0.9;
+
+    /** Minimum number of empty pages to keep in the reuse pool. */
+    @Value(hasDefault = true)
+    public int emptyPagesPoolSize = 100;
+
+    /** Size of the checkpoint page buffer in bytes. */
+    @Value(hasDefault = true)
+    public long checkpointPageBufSize = 0;
+
+    /** Whether memory for the region is allocated lazily (on first use) rather than at startup. */
+    @Value(hasDefault = true)
+    public boolean lazyMemoryAllocation = true;
+}
diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/RocksDbDataRegionConfigurationSchema.java b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/RocksDbDataRegionConfigurationSchema.java
new file mode 100644
index 0000000..247e935
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/RocksDbDataRegionConfigurationSchema.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.configuration.schemas.store;
+
+import static org.apache.ignite.configuration.schemas.store.RocksDbDataRegionConfigurationSchema.ROCKSDB_DATA_REGION_TYPE;
+
+import org.apache.ignite.configuration.annotation.PolymorphicConfigInstance;
+import org.apache.ignite.configuration.annotation.Value;
+import org.apache.ignite.configuration.validation.Min;
+import org.apache.ignite.configuration.validation.OneOf;
+
+/**
+ * Data region configuration for rocksdb storage engine.
+ */
+@PolymorphicConfigInstance(ROCKSDB_DATA_REGION_TYPE)
+public class RocksDbDataRegionConfigurationSchema extends DataRegionConfigurationSchema {
+    /** Type of the RocksDB data region. */
+    public static final String ROCKSDB_DATA_REGION_TYPE = "rocksdb";
+
+    /** Cache type for the RocksDB LRU cache. */
+    public static final String ROCKSDB_LRU_CACHE = "lru";
+
+    /** Cache type for the RocksDB Clock cache. */
+    public static final String ROCKSDB_CLOCK_CACHE = "clock";
+
+    /** Size of the rocksdb offheap cache. */
+    @Value(hasDefault = true)
+    public long size = 256 * 1024 * 1024;
+
+    /** Size of rocksdb write buffer. */
+    @Value(hasDefault = true)
+    @Min(1)
+    public long writeBufferSize = 64 * 1024 * 1024;
+
+    /** Cache type - only {@code LRU} is supported at the moment. {@code Clock} implementation has known bugs. */
+    @OneOf(ROCKSDB_LRU_CACHE)
+    @Value(hasDefault = true)
+    public String cache = ROCKSDB_LRU_CACHE;
+
+    /** The cache is sharded to 2^numShardBits shards, by hash of the key. */
+    @Min(-1)
+    @Value(hasDefault = true)
+    public int numShardBits = -1;
+}
diff --git a/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/UnsafeMemoryAllocatorConfigurationSchema.java b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/UnsafeMemoryAllocatorConfigurationSchema.java
new file mode 100644
index 0000000..d31b813
--- /dev/null
+++ b/modules/api/src/main/java/org/apache/ignite/configuration/schemas/store/UnsafeMemoryAllocatorConfigurationSchema.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.configuration.schemas.store;
+
+import static org.apache.ignite.configuration.schemas.store.UnsafeMemoryAllocatorConfigurationSchema.UNSAFE_MEMORY_ALLOCATOR_TYPE;
+
+import org.apache.ignite.configuration.annotation.PolymorphicConfigInstance;
+
+/**
+ * Memory allocator that allocates data in offheap using {@link sun.misc.Unsafe}.
+ */
+@PolymorphicConfigInstance(UNSAFE_MEMORY_ALLOCATOR_TYPE)
+public class UnsafeMemoryAllocatorConfigurationSchema extends MemoryAllocatorConfigurationSchema {
+    /** Type of the unsafe memory allocator. */
+    public static final String UNSAFE_MEMORY_ALLOCATOR_TYPE = "unsafe";
+}
diff --git a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/ConfigurationTestUtils.java b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/ConfigurationTestUtils.java
new file mode 100644
index 0000000..5f73467
--- /dev/null
+++ b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/ConfigurationTestUtils.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.configuration;
+
+import org.apache.ignite.configuration.ConfigurationTree;
+
+/**
+ * Utility methods, related to testing configurations.
+ */
+public class ConfigurationTestUtils {
+    /**
+     * Casts a dynamic configuration to its specific type. Can be applied to polymorphic configuration instances.
+     *
+     * @param cfg Configuration to cast.
+     * @return The same configuration, represented by its specific configuration tree (for example, a polymorphic configuration instance).
+     */
+    public static <C extends ConfigurationTree> C fixConfiguration(C cfg) {
+        return (C) ((DynamicConfiguration) cfg).specificConfigTree();
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
index 4aa50af..2d37a5c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
@@ -61,6 +61,12 @@
     /** Unsafe. */
     private static final Unsafe UNSAFE = unsafe();
 
+    /** Page size. */
+    private static final int PAGE_SIZE = UNSAFE.pageSize();
+
+    /** Empty page. */
+    private static final byte[] EMPTY_PAGE = new byte[PAGE_SIZE];
+
     /** Unaligned flag. */
     private static final boolean UNALIGNED = unaligned();
 
@@ -1258,6 +1264,24 @@
     }
 
     /**
+     * Fills memory with zeroes.
+     *
+     * @param addr Address.
+     * @param len Length.
+     */
+    public static void zeroMemory(long addr, long len) {
+        long off = 0;
+
+        for (; off + PAGE_SIZE <= len; off += PAGE_SIZE) {
+            GridUnsafe.copyMemory(EMPTY_PAGE, GridUnsafe.BYTE_ARR_OFF, null, addr + off, PAGE_SIZE);
+        }
+
+        if (len != off) {
+            GridUnsafe.copyMemory(EMPTY_PAGE, GridUnsafe.BYTE_ARR_OFF, null, addr + off, len - off);
+        }
+    }
+
+    /**
      * Copy memory between offheap locations.
      *
      * @param srcAddr Source address.
@@ -1499,7 +1523,7 @@
      * @return Page size.
      */
     public static int pageSize() {
-        return UNSAFE.pageSize();
+        return PAGE_SIZE;
     }
 
     /**
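A minimal sketch of exercising the new zeroMemory helper. The class name is illustrative; allocateMemory and pageSize appear elsewhere in this patch, while getByte and freeMemory are assumed to be available in the ported GridUnsafe:

    import org.apache.ignite.internal.util.GridUnsafe;

    /** Illustration only. */
    public class ZeroMemoryDemo {
        public static void main(String[] args) {
            long size = 3L * GridUnsafe.pageSize() + 17; // Deliberately not a multiple of the page size.

            long addr = GridUnsafe.allocateMemory(size);

            try {
                GridUnsafe.zeroMemory(addr, size);

                // Spot-check the first and the last byte of the zeroed range (run with -ea).
                assert GridUnsafe.getByte(addr) == 0;
                assert GridUnsafe.getByte(addr + size - 1) == 0;
            } finally {
                GridUnsafe.freeMemory(addr);
            }
        }
    }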
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 355c75b..3a9cd8c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -285,6 +285,27 @@
     }
 
     /**
+     * Returns size in human-readable format.
+     *
+     * @param bytes Number of bytes to display.
+     * @param si If {@code true}, then unit base is 1000, otherwise unit base is 1024.
+     * @return Formatted size.
+     */
+    public static String readableSize(long bytes, boolean si) {
+        int unit = si ? 1000 : 1024;
+
+        if (bytes < unit) {
+            return bytes + " B";
+        }
+
+        int exp = (int) (Math.log(bytes) / Math.log(unit));
+
+        String pre = (si ? "kMGTPE" : "KMGTPE").charAt(exp - 1) + (si ? "" : "i");
+
+        return String.format("%.1f %sB", bytes / Math.pow(unit, exp), pre);
+    }
+
+    /**
      * Gets absolute value for integer. If integer is {@link Integer#MIN_VALUE}, then {@code 0} is returned.
      *
      * @param i Integer.
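A quick sketch of the new readableSize helper (the class name is illustrative); note that the decimal separator in the output depends on the default locale:

    import org.apache.ignite.internal.util.IgniteUtils;

    /** Illustration only. */
    public class ReadableSizeDemo {
        public static void main(String[] args) {
            // Binary units (base 1024): 268435456 bytes prints as "256.0 MiB".
            System.out.println(IgniteUtils.readableSize(256L * 1024 * 1024, false));

            // SI units (base 1000): 1500 bytes prints as "1.5 kB".
            System.out.println(IgniteUtils.readableSize(1500, true));

            // Values below one unit are printed as plain bytes: "512 B".
            System.out.println(IgniteUtils.readableSize(512, false));
        }
    }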
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/OffheapReadWriteLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/OffheapReadWriteLock.java
new file mode 100644
index 0000000..f04f470
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/OffheapReadWriteLock.java
@@ -0,0 +1,708 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.ignite.lang.IgniteSystemProperties;
+
+/**
+ * Read-write lock that stores its state in offheap memory and spins before falling back to blocking on striped monitors.
+ *
+ * <p>Lock state structure is as follows.
+ * <pre>
+ *     +----------------+---------------+---------+----------+
+ *     | WRITE WAIT CNT | READ WAIT CNT |   TAG   | LOCK CNT |
+ *     +----------------+---------------+---------+----------+
+ *     |     2 bytes    |     2 bytes   | 2 bytes |  2 bytes |
+ *     +----------------+---------------+---------+----------+
+ * </pre>
+ */
+public class OffheapReadWriteLock {
+    /**
+     * Default empirical value for the spin count.
+     *
+     * @see #IGNITE_OFFHEAP_RWLOCK_SPIN_COUNT
+     */
+    private static final int DFLT_OFFHEAP_RWLOCK_SPIN_COUNT = 32;
+
+    /** A number of spin-lock iterations to take before falling back to the blocking approach. */
+    private static final String IGNITE_OFFHEAP_RWLOCK_SPIN_COUNT = "IGNITE_OFFHEAP_RWLOCK_SPIN_COUNT";
+
+    /** Count of spins before the fallback to {@link ReentrantLock} and {@link Condition}. */
+    private static final int SPIN_CNT = IgniteSystemProperties.getInteger(IGNITE_OFFHEAP_RWLOCK_SPIN_COUNT, DFLT_OFFHEAP_RWLOCK_SPIN_COUNT);
+
+    /** Always lock tag. */
+    public static final int TAG_LOCK_ALWAYS = -1;
+
+    /** Lock size. */
+    public static final int LOCK_SIZE = 8;
+
+    /** Maximum number of waiting threads, read or write. */
+    public static final int MAX_WAITERS = 0xFFFF;
+
+    /** Striped locks array. */
+    private final ReentrantLock[] locks;
+
+    /** Conditions for reads. */
+    private final Condition[] readConditions;
+
+    /** Conditions for write. */
+    private final Condition[] writeConditions;
+
+    /** Mask to extract stripe index from the hash. */
+    private final int monitorsMask;
+
+    /**
+     * Constructor.
+     *
+     * @param concLvl Concurrency level, must be a power of two.
+     */
+    public OffheapReadWriteLock(int concLvl) {
+        if ((concLvl & concLvl - 1) != 0) {
+            throw new IllegalArgumentException("Concurrency level must be a power of 2: " + concLvl);
+        }
+
+        monitorsMask = concLvl - 1;
+
+        locks = new ReentrantLock[concLvl];
+        readConditions = new Condition[concLvl];
+        writeConditions = new Condition[concLvl];
+
+        for (int i = 0; i < locks.length; i++) {
+            ReentrantLock lock = new ReentrantLock();
+
+            locks[i] = lock;
+            readConditions[i] = lock.newCondition();
+            writeConditions[i] = lock.newCondition();
+        }
+    }
+
+    /**
+     * Initializes the lock.
+     *
+     * @param lock Lock pointer to initialize.
+     * @param tag Initial tag value; only the lower 16 bits are used and must be non-zero.
+     */
+    public void init(long lock, int tag) {
+        tag &= 0xFFFF;
+
+        assert tag != 0;
+
+        GridUnsafe.putLong(lock, (long) tag << 16);
+    }
+
+    /**
+     * Acquires a read lock.
+     *
+     * @param lock Lock address.
+     * @param tag Expected tag value.
+     * @return {@code True} if the read lock was acquired, {@code false} if the tag check failed.
+     */
+    public boolean readLock(long lock, int tag) {
+        long state = GridUnsafe.getLongVolatile(null, lock);
+
+        assert state != 0;
+
+        // Check write waiters first.
+        int writeWaitCnt = writersWaitCount(state);
+
+        if (writeWaitCnt == 0) {
+            for (int i = 0; i < SPIN_CNT; i++) {
+                if (!checkTag(state, tag)) {
+                    return false;
+                }
+
+                if (canReadLock(state)) {
+                    if (GridUnsafe.compareAndSwapLong(null, lock, state, updateState(state, 1, 0, 0))) {
+                        return true;
+                    } else {
+                        // Retry CAS, do not count as spin cycle.
+                        i--;
+                    }
+                }
+
+                state = GridUnsafe.getLongVolatile(null, lock);
+            }
+        }
+
+        int idx = lockIndex(lock);
+
+        ReentrantLock lockObj = locks[idx];
+
+        lockObj.lock();
+
+        try {
+            updateReadersWaitCount(lock, lockObj, 1);
+
+            return waitAcquireReadLock(lock, idx, tag);
+        } finally {
+            lockObj.unlock();
+        }
+    }
+
+    /**
+     * Releases read lock.
+     *
+     * @param lock Lock address.
+     */
+    public void readUnlock(long lock) {
+        while (true) {
+            long state = GridUnsafe.getLongVolatile(null, lock);
+
+            if (lockCount(state) <= 0) {
+                throw new IllegalMonitorStateException("Attempted to release a read lock while not holding it "
+                        + "[lock=" + IgniteUtils.hexLong(lock) + ", state=" + IgniteUtils.hexLong(state) + ']');
+            }
+
+            long updated = updateState(state, -1, 0, 0);
+
+            assert updated != 0;
+
+            if (GridUnsafe.compareAndSwapLong(null, lock, state, updated)) {
+                // Notify monitor if we were CASed to zero and there is a write waiter.
+                if (lockCount(updated) == 0 && writersWaitCount(updated) > 0) {
+                    int idx = lockIndex(lock);
+
+                    ReentrantLock lockObj = locks[idx];
+
+                    lockObj.lock();
+
+                    try {
+                        // Note that we signal all waiters for this stripe. Since not all waiters in this
+                        // stripe/index belong to this particular lock, we can't wake up just one of them.
+                        writeConditions[idx].signalAll();
+                    } finally {
+                        lockObj.unlock();
+                    }
+                }
+
+                return;
+            }
+        }
+    }
+
+    /**
+     * Tries to acquire a write lock.
+     *
+     * @param lock Lock address.
+     * @param tag Expected tag value.
+     * @return {@code True} if the write lock was acquired without waiting, {@code false} otherwise.
+     */
+    public boolean tryWriteLock(long lock, int tag) {
+        long state = GridUnsafe.getLongVolatile(null, lock);
+
+        return checkTag(state, tag) && canWriteLock(state)
+                && GridUnsafe.compareAndSwapLong(null, lock, state, updateState(state, -1, 0, 0));
+    }
+
+    /**
+     * Acquires a write lock.
+     *
+     * @param lock Lock address.
+     * @param tag Expected tag value.
+     * @return {@code True} if the write lock was acquired, {@code false} if the tag check failed.
+     */
+    public boolean writeLock(long lock, int tag) {
+        assert tag != 0;
+
+        for (int i = 0; i < SPIN_CNT; i++) {
+            long state = GridUnsafe.getLongVolatile(null, lock);
+
+            assert state != 0;
+
+            if (!checkTag(state, tag)) {
+                return false;
+            }
+
+            if (canWriteLock(state)) {
+                if (GridUnsafe.compareAndSwapLong(null, lock, state, updateState(state, -1, 0, 0))) {
+                    return true;
+                } else {
+                    // Retry CAS, do not count as spin cycle.
+                    i--;
+                }
+            }
+        }
+
+        int idx = lockIndex(lock);
+
+        ReentrantLock lockObj = locks[idx];
+
+        lockObj.lock();
+
+        try {
+            updateWritersWaitCount(lock, lockObj, 1);
+
+            return waitAcquireWriteLock(lock, idx, tag);
+        } finally {
+            lockObj.unlock();
+        }
+    }
+
+    /**
+     * Checks whether write lock is acquired.
+     *
+     * @param lock Lock to check.
+     * @return {@code True} if write lock is held by any thread for the given offheap RW lock.
+     */
+    public boolean isWriteLocked(long lock) {
+        return lockCount(GridUnsafe.getLongVolatile(null, lock)) == -1;
+    }
+
+    /**
+     * Checks whether read lock is acquired.
+     *
+     * @param lock Lock to check.
+     * @return {@code True} if at least one read lock is held by any thread for the given offheap RW lock.
+     */
+    public boolean isReadLocked(long lock) {
+        return lockCount(GridUnsafe.getLongVolatile(null, lock)) > 0;
+    }
+
+    /**
+     * Releases write lock.
+     *
+     * @param lock Lock address.
+     * @param tag New tag value to write on release, or {@link #TAG_LOCK_ALWAYS} to keep the current tag.
+     */
+    public void writeUnlock(long lock, int tag) {
+        long updated;
+
+        assert tag != 0;
+
+        while (true) {
+            long state = GridUnsafe.getLongVolatile(null, lock);
+
+            if (lockCount(state) != -1) {
+                throw new IllegalMonitorStateException("Attempted to release write lock while not holding it "
+                        + "[lock=" + IgniteUtils.hexLong(lock) + ", state=" + IgniteUtils.hexLong(state) + ']');
+            }
+
+            updated = releaseWithTag(state, tag);
+
+            assert updated != 0;
+
+            if (GridUnsafe.compareAndSwapLong(null, lock, state, updated)) {
+                break;
+            }
+        }
+
+        int writeWaitCnt = writersWaitCount(updated);
+        int readWaitCnt = readersWaitCount(updated);
+
+        if (writeWaitCnt > 0 || readWaitCnt > 0) {
+            int idx = lockIndex(lock);
+
+            ReentrantLock lockObj = locks[idx];
+
+            lockObj.lock();
+
+            try {
+                signalNextWaiter(writeWaitCnt, idx);
+            } finally {
+                lockObj.unlock();
+            }
+        }
+    }
+
+    /**
+     * Signals readers or writers depending on a counter value.
+     *
+     * @param writeWaitCnt Writers wait count.
+     * @param idx Lock index.
+     */
+    private void signalNextWaiter(int writeWaitCnt, int idx) {
+        // Note that we signal all waiters for this stripe. Since not all waiters in this stripe/index belong
+        // to this particular lock, we can't wake up just one of them.
+        if (writeWaitCnt == 0) {
+            Condition readCondition = readConditions[idx];
+
+            readCondition.signalAll();
+        } else {
+            Condition writeCond = writeConditions[idx];
+
+            writeCond.signalAll();
+        }
+    }
+
+    /**
+     * Upgrades a read lock to a write lock. If this thread is the only read-owner of the read lock,
+     * this method will atomically upgrade the read lock to the write lock. In this case {@code true}
+     * will be returned. If not, the read lock will be released and write lock will be acquired, leaving
+     * a potential gap for other threads to modify a protected resource. In this case this method will return
+     * {@code false}.
+     *
+     * <p>After this method has been called, there is no need to call to {@link #readUnlock(long)} because
+     * read lock will be released in any case.
+     *
+     * @param lock Lock to upgrade.
+     * @param tag Expected tag value.
+     * @return {@code null} if tag validation failed, {@code true} if successfully traded the read lock to
+     *      the write lock without leaving a gap. Returns {@code false} otherwise, in this case the resource
+     *      state must be re-validated.
+     */
+    public Boolean upgradeToWriteLock(long lock, int tag) {
+        for (int i = 0; i < SPIN_CNT; i++) {
+            long state = GridUnsafe.getLongVolatile(null, lock);
+
+            if (!checkTag(state, tag)) {
+                return null;
+            }
+
+            if (lockCount(state) == 1) {
+                if (GridUnsafe.compareAndSwapLong(null, lock, state, updateState(state, -2, 0, 0))) {
+                    return true;
+                } else {
+                    // Retry CAS, do not count as spin cycle.
+                    i--;
+                }
+            }
+        }
+
+        int idx = lockIndex(lock);
+
+        ReentrantLock lockObj = locks[idx];
+
+        lockObj.lock();
+
+        try {
+            // First, add write waiter.
+            while (true) {
+                long state = GridUnsafe.getLongVolatile(null, lock);
+
+                if (!checkTag(state, tag)) {
+                    return null;
+                }
+
+                if (lockCount(state) == 1) {
+                    if (GridUnsafe.compareAndSwapLong(null, lock, state, updateState(state, -2, 0, 0))) {
+                        return true;
+                    } else {
+                        continue;
+                    }
+                }
+
+                // Remove read lock and add write waiter simultaneously.
+                if (GridUnsafe.compareAndSwapLong(null, lock, state, updateState(state, -1, 0, 1))) {
+                    break;
+                }
+            }
+
+            return waitAcquireWriteLock(lock, idx, tag);
+        } finally {
+            lockObj.unlock();
+        }
+    }
+
+    /**
+     * Acquires read lock in waiting loop.
+     *
+     * @param lock Lock address.
+     * @param lockIdx Lock index.
+     * @param tag Validation tag.
+     * @return {@code True} if lock was acquired, {@code false} if tag validation failed.
+     */
+    private boolean waitAcquireReadLock(long lock, int lockIdx, int tag) {
+        ReentrantLock lockObj = locks[lockIdx];
+        Condition waitCond = readConditions[lockIdx];
+
+        assert lockObj.isHeldByCurrentThread();
+
+        boolean interrupted = false;
+
+        try {
+            while (true) {
+                try {
+                    long state = GridUnsafe.getLongVolatile(null, lock);
+
+                    if (!checkTag(state, tag)) {
+                        // We cannot lock with this tag, release waiter.
+                        long updated = updateState(state, 0, -1, 0);
+
+                        if (GridUnsafe.compareAndSwapLong(null, lock, state, updated)) {
+                            int writeWaitCnt = writersWaitCount(updated);
+
+                            signalNextWaiter(writeWaitCnt, lockIdx);
+
+                            return false;
+                        }
+                    } else if (canReadLock(state)) {
+                        long updated = updateState(state, 1, -1, 0);
+
+                        if (GridUnsafe.compareAndSwapLong(null, lock, state, updated)) {
+                            return true;
+                        }
+                    } else {
+                        waitCond.await();
+                    }
+                } catch (InterruptedException ignore) {
+                    interrupted = true;
+                }
+            }
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Acquires write lock in waiting loop.
+     *
+     * @param lock Lock address.
+     * @param lockIdx Lock index.
+     * @param tag Validation tag.
+     * @return {@code True} if lock was acquired, {@code false} if tag validation failed.
+     */
+    private boolean waitAcquireWriteLock(long lock, int lockIdx, int tag) {
+        ReentrantLock lockObj = locks[lockIdx];
+        Condition waitCond = writeConditions[lockIdx];
+
+        assert lockObj.isHeldByCurrentThread();
+
+        boolean interrupted = false;
+
+        try {
+            while (true) {
+                try {
+                    long state = GridUnsafe.getLongVolatile(null, lock);
+
+                    if (!checkTag(state, tag)) {
+                        // We cannot lock with this tag, release waiter.
+                        long updated = updateState(state, 0, 0, -1);
+
+                        if (GridUnsafe.compareAndSwapLong(null, lock, state, updated)) {
+                            int writeWaitCnt = writersWaitCount(updated);
+
+                            signalNextWaiter(writeWaitCnt, lockIdx);
+
+                            return false;
+                        }
+                    } else if (canWriteLock(state)) {
+                        long updated = updateState(state, -1, 0, -1);
+
+                        if (GridUnsafe.compareAndSwapLong(null, lock, state, updated)) {
+                            return true;
+                        }
+                    } else {
+                        waitCond.await();
+                    }
+                } catch (InterruptedException ignore) {
+                    interrupted = true;
+                }
+            }
+        } finally {
+            if (interrupted) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /**
+     * Returns index of lock object corresponding to the stripe of this lock address.
+     *
+     * @param lock Lock address.
+     * @return Lock monitor object that corresponds to the stripe for this lock address.
+     */
+    private int lockIndex(long lock) {
+        return IgniteUtils.safeAbs(IgniteUtils.hash(lock)) & monitorsMask;
+    }
+
+    /**
+     * Checks that read lock can be acquired.
+     *
+     * @param state Lock state.
+     * @return {@code True} if write lock is not acquired.
+     */
+    private boolean canReadLock(long state) {
+        return lockCount(state) >= 0;
+    }
+
+    /**
+     * Checks that write lock can be acquired.
+     *
+     * @param state Lock state.
+     * @return {@code True} if the lock is held in neither read nor write mode.
+     */
+    private boolean canWriteLock(long state) {
+        return lockCount(state) == 0;
+    }
+
+    /**
+     * Checks that tag in the state matches the expected value.
+     *
+     * @param state State.
+     * @param tag Tag.
+     */
+    private boolean checkTag(long state, int tag) {
+        // If passed in tag is negative, lock regardless of the state.
+        return tag < 0 || tag(state) == tag;
+    }
+
+    /**
+     * Extracts lock count from the state.
+     *
+     * @param state State.
+     * @return Lock count.
+     */
+    private int lockCount(long state) {
+        return (short) (state & 0xFFFF);
+    }
+
+    /**
+     * Extracts tag value from the state.
+     *
+     * @param state Lock state.
+     * @return Lock tag.
+     */
+    private int tag(long state) {
+        return (int) ((state >>> 16) & 0xFFFF);
+    }
+
+    /**
+     * Extracts writers wait count from the state.
+     *
+     * @param state State.
+     * @return Writers wait count.
+     */
+    private int writersWaitCount(long state) {
+        return (int) ((state >>> 48) & 0xFFFF);
+    }
+
+    /**
+     * Extracts readers wait count from the state.
+     *
+     * @param state State.
+     * @return Readers wait count.
+     */
+    private int readersWaitCount(long state) {
+        return (int) ((state >>> 32) & 0xFFFF);
+    }
+
+    /**
+     * Updates lock state with deltas.
+     *
+     * @param state State to update.
+     * @param lockDelta Lock counter delta.
+     * @param readersWaitDelta Readers wait delta.
+     * @param writersWaitDelta Writers wait delta.
+     * @return Modified state.
+     */
+    private long updateState(long state, int lockDelta, int readersWaitDelta, int writersWaitDelta) {
+        int lock = lockCount(state);
+        int tag = tag(state);
+        int readersWait = readersWaitCount(state);
+        int writersWait = writersWaitCount(state);
+
+        lock += lockDelta;
+        readersWait += readersWaitDelta;
+        writersWait += writersWaitDelta;
+
+        if (readersWait > MAX_WAITERS) {
+            throw new IllegalStateException("Failed to add read waiter (too many waiting threads): " + MAX_WAITERS);
+        }
+
+        if (writersWait > MAX_WAITERS) {
+            throw new IllegalStateException("Failed to add write waiter (too many waiting threads): " + MAX_WAITERS);
+        }
+
+        assert readersWait >= 0 : readersWait;
+        assert writersWait >= 0 : writersWait;
+        assert lock >= -1;
+
+        return buildState(writersWait, readersWait, tag, lock);
+    }
+
+    /**
+     * Releases write lock and updates tag value.
+     *
+     * @param state State to update.
+     * @return Modified state.
+     */
+    private long releaseWithTag(long state, int newTag) {
+        int lock = lockCount(state);
+        int readersWait = readersWaitCount(state);
+        int writersWait = writersWaitCount(state);
+        int tag = newTag == TAG_LOCK_ALWAYS ? tag(state) : newTag & 0xFFFF;
+
+        lock += 1;
+
+        assert readersWait >= 0 : readersWait;
+        assert writersWait >= 0 : writersWait;
+        assert lock >= -1;
+
+        return buildState(writersWait, readersWait, tag, lock);
+    }
+
+    /**
+     * Creates state from counters.
+     *
+     * @param writersWait Writers wait count.
+     * @param readersWait Readers wait count.
+     * @param tag Tag.
+     * @param lock Lock count.
+     * @return State.
+     */
+    private long buildState(int writersWait, int readersWait, int tag, int lock) {
+        assert (tag & 0xFFFF0000) == 0;
+
+        return ((long) writersWait << 48) | ((long) readersWait << 32) | ((tag & 0x0000FFFFL) << 16) | (lock & 0xFFFFL);
+    }
+
+    /**
+     * Updates readers wait count.
+     *
+     * @param lock Lock to update.
+     * @param delta Delta to update.
+     */
+    private void updateReadersWaitCount(long lock, ReentrantLock lockObj, int delta) {
+        assert lockObj.isHeldByCurrentThread();
+
+        while (true) {
+            // Volatile read of the current state; the CAS below makes the update atomic.
+            long state = GridUnsafe.getLongVolatile(null, lock);
+
+            long updated = updateState(state, 0, delta, 0);
+
+            if (GridUnsafe.compareAndSwapLong(null, lock, state, updated)) {
+                return;
+            }
+        }
+    }
+
+    /**
+     * Updates writers wait count.
+     *
+     * @param lock Lock to update.
+     * @param delta Delta to update.
+     */
+    private void updateWritersWaitCount(long lock, ReentrantLock lockObj, int delta) {
+        assert lockObj.isHeldByCurrentThread();
+
+        while (true) {
+            long state = GridUnsafe.getLongVolatile(null, lock);
+
+            long updated = updateState(state, 0, 0, delta);
+
+            if (GridUnsafe.compareAndSwapLong(null, lock, state, updated)) {
+                return;
+            }
+        }
+    }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java b/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
index 0d58dcf..5cee67b 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
@@ -23,10 +23,15 @@
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
 import java.lang.reflect.Modifier;
+import java.util.ArrayList;
 import java.util.BitSet;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ThreadFactory;
 import java.util.function.BooleanSupplier;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.lang.IgniteInternalException;
@@ -203,7 +208,7 @@
      * @param task Runnable.
      * @return Future with task result.
      */
-    public static CompletableFuture runAsync(final Runnable task) {
+    public static CompletableFuture<?> runAsync(final Runnable task) {
         return runAsync(task, "async-runnable-runner");
     }
 
@@ -213,7 +218,7 @@
      * @param task Runnable.
      * @return Future with task result.
      */
-    public static CompletableFuture runAsync(final Runnable task, String threadName) {
+    public static CompletableFuture<?> runAsync(final Runnable task, String threadName) {
         return runAsync(() -> {
             task.run();
 
@@ -258,6 +263,96 @@
     }
 
     /**
+     * Runs callable tasks each in separate threads.
+     *
+     * @param calls Callable tasks.
+     * @param threadFactory Thread factory.
+     * @return Execution time in milliseconds.
+     * @throws Exception If failed.
+     */
+    public static long runMultiThreaded(Iterable<Callable<?>> calls, ThreadFactory threadFactory) throws Exception {
+        Collection<Thread> threads = new ArrayList<>();
+
+        Collection<CompletableFuture<?>> futures = new ArrayList<>();
+
+        for (Callable<?> task : calls) {
+            CompletableFuture<?> fut = new CompletableFuture<>();
+
+            futures.add(fut);
+
+            threads.add(threadFactory.newThread(() -> {
+                try {
+                    // Execute task.
+                    task.call();
+
+                    fut.complete(null);
+                } catch (Throwable e) {
+                    fut.completeExceptionally(e);
+                }
+            }));
+        }
+
+        long time = System.currentTimeMillis();
+
+        for (Thread t : threads) {
+            t.start();
+        }
+
+        // Wait threads finish their job.
+        try {
+            for (Thread t : threads) {
+                t.join();
+            }
+        } catch (InterruptedException e) {
+            for (Thread t : threads) {
+                t.interrupt();
+            }
+
+            throw e;
+        }
+
+        time = System.currentTimeMillis() - time;
+
+        for (CompletableFuture<?> fut : futures) {
+            fut.join();
+        }
+
+        return time;
+    }
+
+    /**
+     * Runs runnable object in specified number of threads.
+     *
+     * @param run Target runnable.
+     * @param threadNum Number of threads.
+     * @param threadName Thread name.
+     * @return Future for the run. Future returns execution time in milliseconds.
+     */
+    public static CompletableFuture<Long> runMultiThreadedAsync(Runnable run, int threadNum, String threadName) {
+        return runMultiThreadedAsync(() -> {
+            run.run();
+
+            return null;
+        }, threadNum, threadName);
+    }
+
+    /**
+     * Runs callable object in specified number of threads.
+     *
+     * @param call Callable.
+     * @param threadNum Number of threads.
+     * @param threadName Thread name.
+     * @return Future for the run. Future returns execution time in milliseconds.
+     */
+    public static CompletableFuture<Long> runMultiThreadedAsync(Callable<?> call, int threadNum, final String threadName) {
+        List<Callable<?>> calls = Collections.<Callable<?>>nCopies(threadNum, call);
+
+        NamedThreadFactory threadFactory = new NamedThreadFactory(threadName);
+
+        return runAsync(() -> runMultiThreaded(calls, threadFactory));
+    }
+
+    /**
      * Waits for the condition.
      *
      * @param cond          Condition.
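A minimal sketch of the new runMultiThreaded helper (the class name and task body are illustrative); the test class below uses the asynchronous variant in a realistic scenario:

    import java.util.Collections;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.atomic.AtomicInteger;
    import org.apache.ignite.internal.testframework.IgniteTestUtils;
    import org.apache.ignite.internal.thread.NamedThreadFactory;

    /** Illustration only. */
    public class RunMultiThreadedDemo {
        public static void main(String[] args) throws Exception {
            AtomicInteger counter = new AtomicInteger();

            // Four copies of the same callable, each executed in its own thread.
            List<Callable<?>> calls = Collections.<Callable<?>>nCopies(4, () -> counter.incrementAndGet());

            long millis = IgniteTestUtils.runMultiThreaded(calls, new NamedThreadFactory("demo-worker"));

            System.out.println("Executed " + counter.get() + " tasks in " + millis + " ms");
        }
    }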
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteOffheapReadWriteLockSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteOffheapReadWriteLockSelfTest.java
new file mode 100644
index 0000000..1828710
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/IgniteOffheapReadWriteLockSelfTest.java
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.junit.jupiter.api.Test;
+
+/** Tests basic invariants of {@link OffheapReadWriteLock}. */
+public class IgniteOffheapReadWriteLockSelfTest extends BaseIgniteAbstractTest {
+    /** Initial value for tag in tests. */
+    private static final int TAG_0 = 1;
+
+    /** Number of test rounds; each round lasts {@link #SLEEP_TIME} milliseconds. */
+    private static final int ROUNDS_PER_TEST = 3;
+
+    /** Sleep interval for the test. */
+    private static final long SLEEP_TIME = 50L;
+
+    @Test
+    public void testConcurrentUpdatesSingleLock() throws Exception {
+        final int numPairs = 100;
+        final Pair[] data = new Pair[numPairs];
+
+        for (int i = 0; i < numPairs; i++) {
+            data[i] = new Pair();
+        }
+
+        final OffheapReadWriteLock lock = new OffheapReadWriteLock(16);
+
+        final long ptr = GridUnsafe.allocateMemory(OffheapReadWriteLock.LOCK_SIZE);
+
+        lock.init(ptr, TAG_0);
+
+        final AtomicInteger reads = new AtomicInteger();
+        final AtomicInteger writes = new AtomicInteger();
+        final AtomicBoolean done = new AtomicBoolean(false);
+
+        CompletableFuture<Long> fut = IgniteTestUtils.runMultiThreadedAsync(() -> {
+            try {
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                while (!done.get()) {
+                    boolean write = rnd.nextInt(10) < 2;
+
+                    if (write) {
+                        boolean locked = lock.writeLock(ptr, TAG_0);
+
+                        try {
+                            // No tag change in this test.
+                            assert locked;
+
+                            assertTrue(lock.isWriteLocked(ptr));
+                            assertFalse(lock.isReadLocked(ptr));
+
+                            int idx = rnd.nextInt(numPairs);
+                            int delta = rnd.nextInt(100_000);
+
+                            data[idx].left += delta;
+                            data[idx].right -= delta;
+                        } finally {
+                            lock.writeUnlock(ptr, TAG_0);
+                        }
+
+                        writes.incrementAndGet();
+                    } else {
+                        boolean locked = lock.readLock(ptr, TAG_0);
+
+                        try {
+                            assert locked;
+
+                            assertFalse(lock.isWriteLocked(ptr));
+                            assertTrue(lock.isReadLocked(ptr));
+
+                            for (int i1 = 0; i1 < data.length; i1++) {
+                                Pair pair = data[i1];
+
+                                assertEquals(pair.left, -pair.right, "Failed check for index: " + i1);
+                            }
+                        } finally {
+                            lock.readUnlock(ptr);
+                        }
+
+                        reads.incrementAndGet();
+                    }
+                }
+            } catch (Throwable e) {
+                log.error(e.getMessage(), e);
+            }
+
+            return null;
+        }, 32, "tester");
+
+        for (int i = 0; i < ROUNDS_PER_TEST; i++) {
+            Thread.sleep(SLEEP_TIME);
+
+            log.info("Reads: " + reads.getAndSet(0) + ", writes=" + writes.getAndSet(0));
+        }
+
+        done.set(true);
+
+        fut.get();
+
+        validate(data);
+    }
+
+    @Test
+    public void testConcurrentUpdatesMultipleLocks() throws Exception {
+        final int numPairs = 100;
+        final Pair[] data = new Pair[numPairs];
+
+        final OffheapReadWriteLock lock = new OffheapReadWriteLock(16);
+
+        final long ptr = GridUnsafe.allocateMemory(OffheapReadWriteLock.LOCK_SIZE * numPairs);
+
+        for (int i = 0; i < numPairs; i++) {
+            data[i] = new Pair();
+
+            lock.init(ptr + i * OffheapReadWriteLock.LOCK_SIZE, TAG_0);
+        }
+
+        final AtomicInteger reads = new AtomicInteger();
+        final AtomicInteger writes = new AtomicInteger();
+        final AtomicBoolean done = new AtomicBoolean(false);
+
+        CompletableFuture<Long> fut = IgniteTestUtils.runMultiThreadedAsync(() -> {
+            ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+            while (!done.get()) {
+                boolean write = rnd.nextInt(10) < 2;
+                int idx = rnd.nextInt(numPairs);
+
+                long lockPtr = ptr + idx * OffheapReadWriteLock.LOCK_SIZE;
+
+                if (write) {
+                    lock.writeLock(lockPtr, TAG_0);
+
+                    try {
+                        assertTrue(lock.isWriteLocked(lockPtr));
+                        assertFalse(lock.isReadLocked(lockPtr));
+
+                        int delta = rnd.nextInt(100_000);
+
+                        data[idx].left += delta;
+                        data[idx].right -= delta;
+                    } finally {
+                        lock.writeUnlock(lockPtr, TAG_0);
+                    }
+
+                    writes.incrementAndGet();
+                } else {
+                    lock.readLock(lockPtr, TAG_0);
+
+                    try {
+                        assertFalse(lock.isWriteLocked(lockPtr));
+                        assertTrue(lock.isReadLocked(lockPtr));
+
+                        Pair pair = data[idx];
+
+                        assertEquals(pair.left, -pair.right, "Failed check for index: " + idx);
+                    } finally {
+                        lock.readUnlock(lockPtr);
+                    }
+
+                    reads.incrementAndGet();
+                }
+            }
+
+            return null;
+        }, 32, "tester");
+
+        for (int i = 0; i < ROUNDS_PER_TEST; i++) {
+            Thread.sleep(SLEEP_TIME);
+
+            log.info("Reads: " + reads.getAndSet(0) + ", writes=" + writes.getAndSet(0));
+        }
+
+        done.set(true);
+
+        fut.get();
+
+        validate(data);
+    }
+
+    @Test
+    public void testLockUpgradeMultipleLocks() throws Exception {
+        final int numPairs = 100;
+        final Pair[] data = new Pair[numPairs];
+
+        final OffheapReadWriteLock lock = new OffheapReadWriteLock(16);
+
+        final long ptr = GridUnsafe.allocateMemory(OffheapReadWriteLock.LOCK_SIZE * numPairs);
+
+        for (int i = 0; i < numPairs; i++) {
+            data[i] = new Pair();
+
+            lock.init(ptr + i * OffheapReadWriteLock.LOCK_SIZE, TAG_0);
+        }
+
+        final AtomicInteger reads = new AtomicInteger();
+        final AtomicInteger writes = new AtomicInteger();
+        final AtomicInteger successfulUpgrades = new AtomicInteger();
+        final AtomicBoolean done = new AtomicBoolean(false);
+
+        CompletableFuture<Long> fut = IgniteTestUtils.runMultiThreadedAsync(() -> {
+            ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+            while (!done.get()) {
+                int idx = rnd.nextInt(numPairs);
+
+                long lockPtr = ptr + idx * OffheapReadWriteLock.LOCK_SIZE;
+
+                boolean locked = lock.readLock(lockPtr, TAG_0);
+
+                boolean write = false;
+
+                try {
+                    assert locked;
+
+                    Pair pair = data[idx];
+
+                    assertEquals(pair.left, -pair.right, "Failed check for index: " + idx);
+
+                    write = rnd.nextInt(10) < 2;
+
+                    if (write) {
+                        // TAG fail will cause NPE.
+                        boolean upg = lock.upgradeToWriteLock(lockPtr, TAG_0);
+
+                        writes.incrementAndGet();
+
+                        if (upg) {
+                            successfulUpgrades.incrementAndGet();
+                        }
+
+                        int delta = rnd.nextInt(100_000);
+
+                        pair.left += delta;
+                        pair.right -= delta;
+                    }
+                } finally {
+                    if (write) {
+                        lock.writeUnlock(lockPtr, TAG_0);
+                    } else {
+                        lock.readUnlock(lockPtr);
+                    }
+                }
+
+                reads.incrementAndGet();
+            }
+
+            return null;
+        }, 32, "tester");
+
+        for (int i = 0; i < ROUNDS_PER_TEST; i++) {
+            Thread.sleep(SLEEP_TIME);
+
+            log.info("Reads=" + reads.getAndSet(0) + ", writes=" + writes.getAndSet(0) + ", upgrades=" + successfulUpgrades.getAndSet(0));
+        }
+
+        done.set(true);
+
+        fut.get();
+
+        validate(data);
+    }
+
+    @Test
+    public void testTagIdUpdateWait() throws Exception {
+        checkTagIdUpdate(true);
+    }
+
+    @Test
+    public void testTagIdUpdateContinuous() throws Exception {
+        checkTagIdUpdate(false);
+    }
+
+    private void checkTagIdUpdate(final boolean waitBeforeSwitch) throws Exception {
+        final int numPairs = 100;
+        final Pair[] data = new Pair[numPairs];
+
+        for (int i = 0; i < numPairs; i++) {
+            data[i] = new Pair();
+        }
+
+        final OffheapReadWriteLock lock = new OffheapReadWriteLock(16);
+
+        final long ptr = GridUnsafe.allocateMemory(OffheapReadWriteLock.LOCK_SIZE);
+
+        lock.init(ptr, TAG_0);
+
+        final AtomicInteger reads = new AtomicInteger();
+        final AtomicInteger writes = new AtomicInteger();
+        final AtomicBoolean done = new AtomicBoolean(false);
+        final AtomicBoolean run = new AtomicBoolean(true);
+
+        final int threadCnt = 32;
+
+        final CyclicBarrier barr = new CyclicBarrier(threadCnt, () -> {
+            if (done.get()) {
+                run.set(false);
+            }
+        });
+
+        CompletableFuture<Long> fut = IgniteTestUtils.runMultiThreadedAsync(() -> {
+            try {
+                ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+                int tag = TAG_0;
+
+                long lastSwitch = System.currentTimeMillis();
+
+                while (run.get()) {
+                    boolean write = rnd.nextInt(10) < 2;
+
+                    boolean locked;
+
+                    boolean switched = false;
+
+                    if (write) {
+                        locked = lock.writeLock(ptr, tag);
+
+                        if (locked) {
+                            try {
+                                assertTrue(lock.isWriteLocked(ptr));
+                                assertFalse(lock.isReadLocked(ptr));
+
+                                int idx = rnd.nextInt(numPairs);
+                                int delta = rnd.nextInt(100_000);
+
+                                data[idx].left += delta;
+                                data[idx].right -= delta;
+                            } finally {
+                                switched = System.currentTimeMillis() - lastSwitch > 20 || !waitBeforeSwitch;
+
+                                if (switched && waitBeforeSwitch) {
+                                    log.info("Switching...");
+                                }
+
+                                int tag1 = (tag + (switched ? 1 : 0)) & 0xFFFF;
+
+                                if (tag1 == 0) {
+                                    tag1 = 1;
+                                }
+
+                                lock.writeUnlock(ptr, tag1);
+                            }
+
+                            writes.incrementAndGet();
+                        }
+                    } else {
+                        locked = lock.readLock(ptr, tag);
+
+                        if (locked) {
+                            try {
+                                assert locked;
+
+                                assertFalse(lock.isWriteLocked(ptr));
+                                assertTrue(lock.isReadLocked(ptr));
+
+                                for (int i1 = 0; i1 < data.length; i1++) {
+                                    Pair pair = data[i1];
+
+                                    assertEquals(pair.left, -pair.right, "Failed check for index: " + i1);
+                                }
+                            } finally {
+                                lock.readUnlock(ptr);
+                            }
+
+                            reads.incrementAndGet();
+                        }
+                    }
+
+                    if (!locked || switched) {
+                        try {
+                            barr.await();
+                        } catch (BrokenBarrierException e) {
+                            // Done.
+                            log.error(e.getMessage(), e);
+
+                            return null;
+                        }
+
+                        tag = (tag + 1) & 0xFFFF;
+
+                        if (tag == 0) {
+                            tag = 1;
+                        }
+
+                        if (waitBeforeSwitch || tag == 1) {
+                            log.info("Switch to a new tag: " + tag);
+                        }
+
+                        lastSwitch = System.currentTimeMillis();
+                    }
+                }
+            } catch (Throwable e) {
+                log.error(e.getMessage(), e);
+            }
+
+            return null;
+        }, threadCnt, "tester");
+
+        for (int i = 0; i < ROUNDS_PER_TEST; i++) {
+            Thread.sleep(SLEEP_TIME);
+
+            log.info("Reads: " + reads.getAndSet(0) + ", writes=" + writes.getAndSet(0));
+        }
+
+        done.set(true);
+
+        fut.get();
+
+        validate(data);
+    }
+
+    /**
+     * Validates data integrity.
+     *
+     * @param data Data to validate.
+     */
+    private void validate(Pair[] data) {
+        for (int i = 0; i < data.length; i++) {
+            Pair pair = data[i];
+
+            assertEquals(pair.left, -pair.right, "Failed for index: " + i);
+        }
+    }
+
+    /** Pair of integers. */
+    private static class Pair {
+        /** Left value of the pair. */
+        private int left;
+
+        /** Right value of the pair. */
+        private int right;
+    }
+}
diff --git a/modules/page-memory/pom.xml b/modules/page-memory/pom.xml
index 9785896..55a8871 100644
--- a/modules/page-memory/pom.xml
+++ b/modules/page-memory/pom.xml
@@ -35,6 +35,11 @@
     <dependencies>
         <dependency>
             <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-api</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
             <artifactId>ignite-core</artifactId>
         </dependency>
 
@@ -44,5 +49,25 @@
             <artifactId>junit-jupiter-engine</artifactId>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-core</artifactId>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.ignite</groupId>
+            <artifactId>ignite-configuration</artifactId>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
     </dependencies>
 </project>
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemory.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemory.java
index 88e63f3..80d92ed 100644
--- a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemory.java
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/PageMemory.java
@@ -20,6 +20,7 @@
 import java.nio.ByteBuffer;
 import org.apache.ignite.internal.pagememory.io.PageIo;
 import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.lang.IgniteInternalException;
 
 /**
  * Class responsible for pages storage and handling.
@@ -27,6 +28,18 @@
 //TODO IGNITE-16350 Improve javadoc in this class.
 public interface PageMemory extends PageIdAllocator, PageSupport {
     /**
+     * Starts page memory.
+     */
+    void start() throws IgniteInternalException;
+
+    /**
+     * Stops page memory.
+     *
+     * @param deallocate {@code True} to deallocate memory, {@code false} to allow memory reuse on a subsequent {@link #start()}.
+     */
+    void stop(boolean deallocate) throws IgniteInternalException;
+
+    /**
      * Returns a page's size in bytes.
      */
     int pageSize();
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/impl/PageMemoryNoStoreImpl.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/impl/PageMemoryNoStoreImpl.java
new file mode 100644
index 0000000..dd69e13
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/impl/PageMemoryNoStoreImpl.java
@@ -0,0 +1,890 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.impl;
+
+import static org.apache.ignite.internal.util.GridUnsafe.wrapPointer;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionView;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.io.PageIo;
+import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.pagememory.mem.DirectMemoryProvider;
+import org.apache.ignite.internal.pagememory.mem.DirectMemoryRegion;
+import org.apache.ignite.internal.pagememory.mem.IgniteOutOfMemoryException;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolder;
+import org.apache.ignite.internal.pagememory.metric.IoStatisticsHolderNoOp;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.OffheapReadWriteLock;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.apache.ignite.lang.IgniteSystemProperties;
+
+/**
+ * Page header structure is described by the following diagram.
+ *
+ * <p>When page is not allocated (in a free list):
+ * <pre>
+ * +--------+--------+---------------------------------------------+
+ * |8 bytes |8 bytes |     PAGE_SIZE + PAGE_OVERHEAD - 16 bytes    |
+ * +--------+--------+---------------------------------------------+
+ * |Next ptr|Rel ptr |              Empty                          |
+ * +--------+--------+---------------------------------------------+
+ * </pre>
+ *
+ * <p>When the page is allocated and in use:
+ * <pre>
+ * +--------+--------+--------+--------+---------------------------+
+ * |8 bytes |8 bytes |8 bytes |8 bytes |        PAGE_SIZE          |
+ * +--------+--------+--------+--------+---------------------------+
+ * | Marker |Page ID |Pin CNT |  Lock  |        Page data          |
+ * +--------+--------+--------+--------+---------------------------+
+ * </pre>
+ *
+ * <p>Note that the first 8 bytes of the page header are used either for the page marker or for the next relative pointer,
+ * depending on whether the page is in use or not.
+ */
+public class PageMemoryNoStoreImpl implements PageMemory {
+    /** Logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(PageMemoryNoStoreImpl.class);
+
+    /** Ignite page memory concurrency level. */
+    private static final String IGNITE_OFFHEAP_LOCK_CONCURRENCY_LEVEL = "IGNITE_OFFHEAP_LOCK_CONCURRENCY_LEVEL";
+
+    /** Marker bytes that signify the beginning of a used page in memory. */
+    public static final long PAGE_MARKER = 0xBEEAAFDEADBEEF01L;
+
+    /** Full relative pointer mask. */
+    private static final long RELATIVE_PTR_MASK = 0xFFFFFFFFFFFFFFL;
+
+    /** Invalid relative pointer value. */
+    private static final long INVALID_REL_PTR = RELATIVE_PTR_MASK;
+
+    /** Address mask to avoid ABA problem. */
+    private static final long ADDRESS_MASK = 0xFFFFFFFFFFFFFFL;
+
+    /** Counter mask to avoid ABA problem. */
+    private static final long COUNTER_MASK = ~ADDRESS_MASK;
+
+    /** Counter increment to avoid ABA problem. */
+    private static final long COUNTER_INC = ADDRESS_MASK + 1;
+
+    /** Page ID offset. */
+    public static final int PAGE_ID_OFFSET = 8;
+
+    /** Page lock offset. */
+    public static final int LOCK_OFFSET = 16;
+
+    /**
+     * Page header overhead: 8 bytes for the page marker (or free-list next pointer), 8 bytes for the page ID,
+     * plus the off-heap read-write lock state ({@link OffheapReadWriteLock#LOCK_SIZE} bytes).
+     */
+    public static final int PAGE_OVERHEAD = LOCK_OFFSET + OffheapReadWriteLock.LOCK_SIZE;
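+
+    // A rough sketch (illustrative only) of how the offsets above map onto an allocated page at absolute address absPtr,
+    // following the layout described in the class javadoc:
+    //
+    //   long marker  = GridUnsafe.getLong(absPtr);                  // PAGE_MARKER for a used page
+    //   long pageId  = GridUnsafe.getLong(absPtr + PAGE_ID_OFFSET); // full page ID
+    //   long lockPtr = absPtr + LOCK_OFFSET;                        // off-heap read-write lock state
+    //   long dataPtr = absPtr + PAGE_OVERHEAD;                      // first byte of the page payload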
+
+    /** Number of bits required to store segment index. */
+    private static final int SEG_BITS = 4;
+
+    /** Maximum number of segments. */
+    private static final int SEG_CNT = (1 << SEG_BITS);
+
+    /** Number of bits left to store page index. */
+    private static final int IDX_BITS = PageIdUtils.PAGE_IDX_SIZE - SEG_BITS;
+
+    /** Segment mask. */
+    private static final int SEG_MASK = ~(-1 << SEG_BITS);
+
+    /** Index mask. */
+    private static final int IDX_MASK = ~(-1 << IDX_BITS);
+
+    /** System page size: configured page size plus {@link #PAGE_OVERHEAD}. */
+    private final int sysPageSize;
+
+    /** Direct memory allocator. */
+    private final DirectMemoryProvider directMemoryProvider;
+
+    /** Data region configuration view. */
+    private final PageMemoryDataRegionView dataRegionCfg;
+
+    /** Head of the singly linked list of free pages. */
+    private final AtomicLong freePageListHead = new AtomicLong(INVALID_REL_PTR);
+
+    /** Segments array. */
+    private volatile Segment[] segments;
+
+    /** Lock for segments changes. */
+    private final Object segmentsLock = new Object();
+
+    /** Total number of pages loaded into memory. */
+    private final AtomicInteger allocatedPages = new AtomicInteger();
+
+    /** Offheap read write lock instance. */
+    private final OffheapReadWriteLock rwLock;
+
+    /** Concurrency level. */
+    private final int lockConcLvl = IgniteSystemProperties.getInteger(
+            IGNITE_OFFHEAP_LOCK_CONCURRENCY_LEVEL,
+            Integer.highestOneBit(Runtime.getRuntime().availableProcessors() * 4)
+    );
+
+    /** Total number of pages that may be allocated for this instance. */
+    private final int totalPages;
+
+    /** Flag for enabling of acquired pages tracking. */
+    private final boolean trackAcquiredPages;
+
+    /** Page IO registry. */
+    private final PageIoRegistry ioRegistry;
+
+    /**
+     * {@code False} if the memory was not started or has already been stopped and must not be used.
+     */
+    private volatile boolean started;
+
+    /**
+     * Constructor.
+     *
+     * @param directMemoryProvider Memory allocator to use.
+     * @param dataRegionCfg Data region configuration.
+     * @param ioRegistry IO registry.
+     */
+    public PageMemoryNoStoreImpl(
+            DirectMemoryProvider directMemoryProvider,
+            PageMemoryDataRegionConfiguration dataRegionCfg,
+            PageIoRegistry ioRegistry
+    ) {
+        this.directMemoryProvider = directMemoryProvider;
+        this.ioRegistry = ioRegistry;
+        this.trackAcquiredPages = false;
+        this.dataRegionCfg = (PageMemoryDataRegionView) dataRegionCfg.value();
+
+        int pageSize = this.dataRegionCfg.pageSize();
+
+        sysPageSize = pageSize + PAGE_OVERHEAD;
+
+        assert sysPageSize % 8 == 0 : sysPageSize;
+
+        totalPages = (int) (this.dataRegionCfg.maxSize() / sysPageSize);
+
+        rwLock = new OffheapReadWriteLock(lockConcLvl);
+    }
+
+    @Override
+    public void start() throws IgniteInternalException {
+        synchronized (segmentsLock) {
+            if (started) {
+                return;
+            }
+
+            started = true;
+
+            long startSize = dataRegionCfg.initSize();
+            long maxSize = dataRegionCfg.maxSize();
+
+            long[] chunks = new long[SEG_CNT];
+
+            chunks[0] = startSize;
+
+            long total = startSize;
+
+            long allocChunkSize = Math.max((maxSize - startSize) / (SEG_CNT - 1), 256L * 1024 * 1024);
+
+            int lastIdx = 0;
+
+            for (int i = 1; i < SEG_CNT; i++) {
+                long allocSize = Math.min(allocChunkSize, maxSize - total);
+
+                if (allocSize <= 0) {
+                    break;
+                }
+
+                chunks[i] = allocSize;
+
+                total += allocSize;
+
+                lastIdx = i;
+            }
+
+            if (lastIdx != SEG_CNT - 1) {
+                chunks = Arrays.copyOf(chunks, lastIdx + 1);
+            }
+
+            if (segments == null) {
+                directMemoryProvider.initialize(chunks);
+            }
+
+            addSegment(null);
+        }
+    }
+
+    @Override
+    public void stop(boolean deallocate) throws IgniteInternalException {
+        synchronized (segmentsLock) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Stopping page memory.");
+            }
+
+            started = false;
+
+            directMemoryProvider.shutdown(deallocate);
+
+            if (directMemoryProvider instanceof Closeable) {
+                try {
+                    ((Closeable) directMemoryProvider).close();
+                } catch (IOException e) {
+                    throw new IgniteInternalException(e);
+                }
+            }
+        }
+    }
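+
+    // A minimal lifecycle sketch (illustrative only; cfg, ioRegistry, grpId, partId and flags are placeholders):
+    //
+    //   PageMemory mem = new PageMemoryNoStoreImpl(new UnsafeMemoryProvider(null), cfg, ioRegistry);
+    //
+    //   mem.start();
+    //
+    //   long pageId = mem.allocatePage(grpId, partId, flags);
+    //   long page = mem.acquirePage(grpId, pageId);
+    //
+    //   try {
+    //       long pageAddr = mem.writeLock(grpId, pageId, page);
+    //
+    //       // ... write to the page via pageAddr ...
+    //
+    //       mem.writeUnlock(grpId, pageId, page, true);
+    //   } finally {
+    //       mem.releasePage(grpId, pageId, page);
+    //   }
+    //
+    //   mem.stop(true);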
+
+    /** {@inheritDoc} */
+    @Override public ByteBuffer pageBuffer(long pageAddr) {
+        return wrapPointer(pageAddr, pageSize());
+    }
+
+    /** {@inheritDoc} */
+    @Override public long allocatePage(int grpId, int partId, byte flags) {
+        assert started;
+
+        long relPtr = borrowFreePage();
+        long absPtr = 0;
+
+        if (relPtr != INVALID_REL_PTR) {
+            int pageIdx = PageIdUtils.pageIndex(relPtr);
+
+            Segment seg = segment(pageIdx);
+
+            absPtr = seg.absolute(pageIdx);
+        } else {
+            // No segments contained a free page.
+            Segment[] seg0 = segments;
+            Segment allocSeg = seg0[seg0.length - 1];
+
+            while (allocSeg != null) {
+                relPtr = allocSeg.allocateFreePage(flags);
+
+                if (relPtr != INVALID_REL_PTR) {
+                    absPtr = allocSeg.absolute(PageIdUtils.pageIndex(relPtr));
+
+                    allocatedPages.incrementAndGet();
+
+                    break;
+                } else {
+                    allocSeg = addSegment(seg0);
+                }
+            }
+        }
+
+        if (relPtr == INVALID_REL_PTR) {
+            IgniteOutOfMemoryException oom = new IgniteOutOfMemoryException("Out of memory in data region ["
+                    + "name=" + dataRegionCfg.name()
+                    + ", initSize=" + IgniteUtils.readableSize(dataRegionCfg.initSize(), false)
+                    + ", maxSize=" + IgniteUtils.readableSize(dataRegionCfg.maxSize(), false)
+                    + ", persistenceEnabled=" + dataRegionCfg.persistent() + "] Try the following:\n"
+                    + "  ^-- Increase maximum off-heap memory size (DataRegionConfiguration.maxSize)\n"
+                    + "  ^-- Enable Ignite persistence (DataRegionConfiguration.persistenceEnabled)\n"
+                    + "  ^-- Enable eviction or expiration policies"
+            );
+
+            //TODO Fail node with failure handler.
+
+            throw oom;
+        }
+
+        assert (relPtr & ~PageIdUtils.PAGE_IDX_MASK) == 0 : IgniteUtils.hexLong(relPtr & ~PageIdUtils.PAGE_IDX_MASK);
+
+        // Assign page ID according to flags and partition ID.
+        long pageId = PageIdUtils.pageId(partId, flags, (int) relPtr);
+
+        writePageId(absPtr, pageId);
+
+        GridUnsafe.zeroMemory(absPtr + PAGE_OVERHEAD, sysPageSize - PAGE_OVERHEAD);
+
+        return pageId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean freePage(int grpId, long pageId) {
+        assert started;
+
+        releaseFreePage(pageId);
+
+        return true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int pageSize() {
+        return sysPageSize - PAGE_OVERHEAD;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int systemPageSize() {
+        return sysPageSize;
+    }
+
+    /** {@inheritDoc} */
+    @Override public int realPageSize(int grpId) {
+        return pageSize();
+    }
+
+    /** {@inheritDoc} */
+    @Override public long loadedPages() {
+        return allocatedPages.get();
+    }
+
+    /**
+     * Returns the total number of pages that may be allocated for this instance.
+     */
+    public int totalPages() {
+        return totalPages;
+    }
+
+    /**
+     * Returns the total number of currently acquired pages.
+     */
+    public long acquiredPages() {
+        long total = 0;
+
+        for (Segment seg : segments) {
+            seg.readLock().lock();
+
+            try {
+                int acquired = seg.acquiredPages();
+
+                assert acquired >= 0;
+
+                total += acquired;
+            } finally {
+                seg.readLock().unlock();
+            }
+        }
+
+        return total;
+    }
+
+    /**
+     * Writes page ID to the page at the given absolute position.
+     *
+     * @param absPtr Absolute memory pointer to the page header.
+     * @param pageId Page ID to write.
+     */
+    private void writePageId(long absPtr, long pageId) {
+        GridUnsafe.putLong(absPtr + PAGE_ID_OFFSET, pageId);
+    }
+
+    /**
+     * Returns the segment that contains given page.
+     *
+     * @param pageIdx Page index.
+     * @return Segment.
+     */
+    private Segment segment(int pageIdx) {
+        int segIdx = segmentIndex(pageIdx);
+
+        return segments[segIdx];
+    }
+
+    /**
+     * Extracts a segment index from the full page index.
+     *
+     * @param pageIdx Page index to extract segment index from.
+     * @return Segment index.
+     */
+    private int segmentIndex(long pageIdx) {
+        return (int) ((pageIdx >> IDX_BITS) & SEG_MASK);
+    }
+
+    /**
+     * Creates a full page index.
+     *
+     * @param segIdx Segment index.
+     * @param pageIdx Page index inside of the segment.
+     * @return Full page index.
+     */
+    private long fromSegmentIndex(int segIdx, long pageIdx) {
+        long res = 0;
+
+        res = (res << SEG_BITS) | (segIdx & SEG_MASK);
+        res = (res << IDX_BITS) | (pageIdx & IDX_MASK);
+
+        return res;
+    }
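+
+    // Round-trip sketch for the segment/index encoding above (values are illustrative):
+    //
+    //   long fullIdx  = fromSegmentIndex(3, 10);    // == (3L << IDX_BITS) | 10
+    //   int  segIdx   = segmentIndex(fullIdx);      // == 3
+    //   long localIdx = fullIdx & IDX_MASK;         // == 10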
+
+    // *** PageSupport methods ***
+
+    /** {@inheritDoc} */
+    @Override public long acquirePage(int cacheId, long pageId) {
+        return acquirePage(cacheId, pageId, IoStatisticsHolderNoOp.INSTANCE);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long acquirePage(int cacheId, long pageId, IoStatisticsHolder statHolder) {
+        assert started;
+
+        int pageIdx = PageIdUtils.pageIndex(pageId);
+
+        Segment seg = segment(pageIdx);
+
+        long absPtr = seg.acquirePage(pageIdx);
+
+        statHolder.trackLogicalRead(absPtr + PAGE_OVERHEAD);
+
+        return absPtr;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void releasePage(int cacheId, long pageId, long page) {
+        assert started;
+
+        if (trackAcquiredPages) {
+            Segment seg = segment(PageIdUtils.pageIndex(pageId));
+
+            seg.onPageRelease();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public long readLock(int cacheId, long pageId, long page) {
+        assert started;
+
+        if (rwLock.readLock(page + LOCK_OFFSET, PageIdUtils.tag(pageId))) {
+            return page + PAGE_OVERHEAD;
+        }
+
+        return 0L;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long readLockForce(int cacheId, long pageId, long page) {
+        assert started;
+
+        if (rwLock.readLock(page + LOCK_OFFSET, -1)) {
+            return page + PAGE_OVERHEAD;
+        }
+
+        return 0L;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readUnlock(int cacheId, long pageId, long page) {
+        assert started;
+
+        rwLock.readUnlock(page + LOCK_OFFSET);
+    }
+
+    /** {@inheritDoc} */
+    @Override public long writeLock(int cacheId, long pageId, long page) {
+        assert started;
+
+        if (rwLock.writeLock(page + LOCK_OFFSET, PageIdUtils.tag(pageId))) {
+            return page + PAGE_OVERHEAD;
+        }
+
+        return 0L;
+    }
+
+    /** {@inheritDoc} */
+    @Override public long tryWriteLock(int cacheId, long pageId, long page) {
+        assert started;
+
+        if (rwLock.tryWriteLock(page + LOCK_OFFSET, PageIdUtils.tag(pageId))) {
+            return page + PAGE_OVERHEAD;
+        }
+
+        return 0L;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void writeUnlock(
+            int cacheId,
+            long pageId,
+            long page,
+            boolean dirtyFlag
+    ) {
+        assert started;
+
+        long actualId = PageIo.getPageId(page + PAGE_OVERHEAD);
+
+        rwLock.writeUnlock(page + LOCK_OFFSET, PageIdUtils.tag(actualId));
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isDirty(int cacheId, long pageId, long page) {
+        // Always false for in-memory page memory without a page store.
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public PageIoRegistry ioRegistry() {
+        return ioRegistry;
+    }
+
+    /**
+     * Converts page index into a sequence number.
+     *
+     * @param pageIdx Page index.
+     * @return Total page sequence number.
+     */
+    public int pageSequenceNumber(int pageIdx) {
+        Segment seg = segment(pageIdx);
+
+        return seg.sequenceNumber(pageIdx);
+    }
+
+    /**
+     * Converts sequential page number to the page index.
+     *
+     * @param seqNo Page sequence number.
+     * @return Page index.
+     */
+    public int pageIndex(int seqNo) {
+        Segment[] segs = segments;
+
+        int low = 0;
+        int high = segs.length - 1;
+
+        while (low <= high) {
+            int mid = (low + high) >>> 1;
+
+            Segment seg = segs[mid];
+
+            int cmp = seg.containsPageBySequence(seqNo);
+
+            if (cmp < 0) {
+                high = mid - 1;
+            } else if (cmp > 0) {
+                low = mid + 1;
+            } else {
+                return seg.pageIndex(seqNo);
+            }
+        }
+
+        throw new IgniteInternalException("Allocated page must always be present in one of the segments [seqNo=" + seqNo
+                + ", segments=" + Arrays.toString(segs) + ']');
+    }
+
+    /**
+     * Adds a page to the free pages list.
+     *
+     * @param pageId Page ID to release.
+     */
+    private void releaseFreePage(long pageId) {
+        int pageIdx = PageIdUtils.pageIndex(pageId);
+
+        // Clear out flags and file ID.
+        long relPtr = PageIdUtils.pageId(0, (byte) 0, pageIdx);
+
+        Segment seg = segment(pageIdx);
+
+        long absPtr = seg.absolute(pageIdx);
+
+        // Second, write clean relative pointer instead of page ID.
+        writePageId(absPtr, relPtr);
+
+        // Third, link the free page.
+        while (true) {
+            long freePageRelPtrMasked = freePageListHead.get();
+
+            long freePageRelPtr = freePageRelPtrMasked & RELATIVE_PTR_MASK;
+
+            GridUnsafe.putLong(absPtr, freePageRelPtr);
+
+            if (freePageListHead.compareAndSet(freePageRelPtrMasked, relPtr)) {
+                allocatedPages.decrementAndGet();
+
+                return;
+            }
+        }
+    }
+
+    /**
+     * Returns a relative pointer to a free page that was borrowed from the allocated pool.
+     */
+    private long borrowFreePage() {
+        while (true) {
+            long freePageRelPtrMasked = freePageListHead.get();
+
+            long freePageRelPtr = freePageRelPtrMasked & ADDRESS_MASK;
+
+            // No free pages available.
+            if (freePageRelPtr == INVALID_REL_PTR) {
+                return INVALID_REL_PTR;
+            }
+
+            int pageIdx = PageIdUtils.pageIndex(freePageRelPtr);
+
+            Segment seg = segment(pageIdx);
+
+            long freePageAbsPtr = seg.absolute(pageIdx);
+            long nextFreePageRelPtr = GridUnsafe.getLong(freePageAbsPtr) & ADDRESS_MASK;
+            long cnt = ((freePageRelPtrMasked & COUNTER_MASK) + COUNTER_INC) & COUNTER_MASK;
+
+            if (freePageListHead.compareAndSet(freePageRelPtrMasked, nextFreePageRelPtr | cnt)) {
+                GridUnsafe.putLong(freePageAbsPtr, PAGE_MARKER);
+
+                allocatedPages.incrementAndGet();
+
+                return freePageRelPtr;
+            }
+        }
+    }
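+
+    // The free list above is a lock-free stack of relative pointers: the first 8 bytes of a free page store the
+    // previous list head, and the COUNTER_MASK bits of freePageListHead carry a stamp that borrowFreePage()
+    // advances by COUNTER_INC on every successful pop to guard the CAS against the ABA problem:
+    //
+    //   freePageListHead = (stamp & COUNTER_MASK) | (headRelPtr & ADDRESS_MASK)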
+
+    /**
+     * Attempts to add a new memory segment.
+     *
+     * @param oldRef Old segments array. If the current segments array is still {@code oldRef}, a new segment will be
+     *      allocated (if possible). If the array has already been updated by another thread, the last element of the
+     *      updated array is returned.
+     * @return Last segment of the (possibly updated) segments array, or {@code null} if no more memory could be allocated.
+     */
+    private synchronized Segment addSegment(Segment[] oldRef) {
+        if (segments == oldRef) {
+            DirectMemoryRegion region = directMemoryProvider.nextRegion();
+
+            // No more memory is available.
+            if (region == null) {
+                return null;
+            }
+
+            if (oldRef != null) {
+                if (LOG.isInfoEnabled()) {
+                    LOG.info("Allocated next memory segment [plcName=" + dataRegionCfg.name()
+                            + ", chunkSize=" + IgniteUtils.readableSize(region.size(), true) + ']');
+                }
+            }
+
+            Segment[] newRef = new Segment[oldRef == null ? 1 : oldRef.length + 1];
+
+            if (oldRef != null) {
+                System.arraycopy(oldRef, 0, newRef, 0, oldRef.length);
+            }
+
+            Segment lastSeg = oldRef == null ? null : oldRef[oldRef.length - 1];
+
+            Segment allocated = new Segment(newRef.length - 1, region, lastSeg == null ? 0 : lastSeg.sumPages());
+
+            allocated.init();
+
+            newRef[newRef.length - 1] = allocated;
+
+            segments = newRef;
+        }
+
+        // Only this synchronized method writes to segments, so it is safe to read twice.
+        return segments[segments.length - 1];
+    }
+
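+    /**
+     * Memory segment: a contiguous {@link DirectMemoryRegion} that pages are carved out of. Extends
+     * {@link ReentrantReadWriteLock} to guard segment-wide operations such as acquired pages accounting.
+     */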
+    private class Segment extends ReentrantReadWriteLock {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** Segment index. */
+        private final int idx;
+
+        /** Direct memory chunk. */
+        private final DirectMemoryRegion region;
+
+        /** Pointer to the counter of the last allocated page index. */
+        private long lastAllocatedIdxPtr;
+
+        /** Base address for all pages. */
+        private long pagesBase;
+
+        /** Capacity of all previous segments combined. */
+        private final int pagesInPrevSegments;
+
+        /** Segments capacity. */
+        private int maxPages;
+
+        /** Total number of currently acquired pages. */
+        private final AtomicInteger acquiredPages;
+
+        /**
+         * Constructor.
+         *
+         * @param idx Index.
+         * @param region Memory region to use.
+         * @param pagesInPrevSegments Number of pages in previously allocated segments.
+         */
+        private Segment(int idx, DirectMemoryRegion region, int pagesInPrevSegments) {
+            this.idx = idx;
+            this.region = region;
+            this.pagesInPrevSegments = pagesInPrevSegments;
+
+            acquiredPages = new AtomicInteger();
+        }
+
+        /**
+         * Initializes page memory segment.
+         */
+        private void init() {
+            long base = region.address();
+
+            lastAllocatedIdxPtr = base;
+
+            base += 8;
+
+            // Align by 8 bytes.
+            pagesBase = (base + 7) & ~0x7;
+
+            GridUnsafe.putLong(lastAllocatedIdxPtr, 0);
+
+            long limit = region.address() + region.size();
+
+            maxPages = (int) ((limit - pagesBase) / sysPageSize);
+        }
+
+        /**
+         * Acquires a page from the segment..
+         *
+         * @param pageIdx Page index.
+         * @return Page absolute pointer.
+         */
+        private long acquirePage(int pageIdx) {
+            long absPtr = absolute(pageIdx);
+
+            assert absPtr % 8 == 0 : absPtr;
+
+            if (trackAcquiredPages) {
+                acquiredPages.incrementAndGet();
+            }
+
+            return absPtr;
+        }
+
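+        /**
+         * Called on page release when acquired pages tracking is enabled; decrements the acquired pages counter.
+         */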
+        private void onPageRelease() {
+            acquiredPages.decrementAndGet();
+        }
+
+        /**
+         * Returns absolute pointer to the page with given index.
+         *
+         * @param pageIdx Page index.
+         * @return Absolute pointer.
+         */
+        private long absolute(int pageIdx) {
+            pageIdx &= IDX_MASK;
+
+            long off = ((long) pageIdx) * sysPageSize;
+
+            return pagesBase + off;
+        }
+
+        /**
+         * Converts page index into a sequence number.
+         *
+         * @param pageIdx Page index with encoded segment.
+         * @return Absolute page sequence number.
+         */
+        private int sequenceNumber(int pageIdx) {
+            pageIdx &= IDX_MASK;
+
+            return pagesInPrevSegments + pageIdx;
+        }
+
+        /**
+         * Returns a page sequence number upper bound.
+         */
+        private int sumPages() {
+            return pagesInPrevSegments + maxPages;
+        }
+
+        /**
+         * Returns a total number of currently acquired pages.
+         */
+        private int acquiredPages() {
+            return acquiredPages.get();
+        }
+
+        /**
+         * Allocates new page in the segment.
+         *
+         * @param tag Tag to initialize RW lock.
+         * @return Relative pointer of the allocated page, or {@link #INVALID_REL_PTR} if the segment is full.
+         */
+        private long allocateFreePage(int tag) {
+            long limit = region.address() + region.size();
+
+            while (true) {
+                long lastIdx = GridUnsafe.getLongVolatile(null, lastAllocatedIdxPtr);
+
+                // Check if we have enough space to allocate a page.
+                if (pagesBase + (lastIdx + 1) * sysPageSize > limit) {
+                    return INVALID_REL_PTR;
+                }
+
+                if (GridUnsafe.compareAndSwapLong(null, lastAllocatedIdxPtr, lastIdx, lastIdx + 1)) {
+                    long absPtr = pagesBase + lastIdx * sysPageSize;
+
+                    assert lastIdx <= PageIdUtils.MAX_PAGE_NUM : lastIdx;
+
+                    long pageIdx = fromSegmentIndex(idx, lastIdx);
+
+                    assert pageIdx != INVALID_REL_PTR;
+
+                    writePageId(absPtr, pageIdx);
+
+                    GridUnsafe.putLong(absPtr, PAGE_MARKER);
+
+                    rwLock.init(absPtr + LOCK_OFFSET, tag);
+
+                    return pageIdx;
+                }
+            }
+        }
+
+        /**
+         * Checks if given sequence number belongs to the current segment.
+         *
+         * @param seqNo Page sequence number.
+         * @return {@code 0} if this segment contains the page with the given sequence number,
+         *      {@code -1} if one of the previous segments contains the page with the given sequence number,
+         *      {@code 1} if one of the next segments contains the page with the given sequence number.
+         */
+        public int containsPageBySequence(int seqNo) {
+            if (seqNo < pagesInPrevSegments) {
+                return -1;
+            } else if (seqNo < pagesInPrevSegments + maxPages) {
+                return 0;
+            } else {
+                return 1;
+            }
+        }
+
+        /**
+         * Converts sequential page number to the page index.
+         *
+         * @param seqNo Page sequence number.
+         * @return Page index.
+         */
+        public int pageIndex(int seqNo) {
+            return PageIdUtils.pageIndex(fromSegmentIndex(idx, seqNo - pagesInPrevSegments));
+        }
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/DirectMemoryProvider.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/DirectMemoryProvider.java
new file mode 100644
index 0000000..f8dfcdc
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/DirectMemoryProvider.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.mem;
+
+/**
+ * Direct memory provider interface. Not thread-safe.
+ */
+public interface DirectMemoryProvider {
+    /**
+     * Initializes provider with the chunk sizes.
+     *
+     * @param chunkSizes Chunk sizes.
+     */
+    public void initialize(long[] chunkSizes);
+
+    /**
+     * Shuts down the provider.
+     *
+     * @param deallocate {@code True} to deallocate memory, {@code false} to allow memory reuse.
+     */
+    public void shutdown(boolean deallocate);
+
+    /**
+     * Attempts to allocate next memory region. Will return {@code null} if no more regions are available.
+     *
+     * @return Next memory region.
+     */
+    public DirectMemoryRegion nextRegion();
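+
+    // A minimal usage sketch of this contract (sizes are illustrative):
+    //
+    //   provider.initialize(new long[] {256L * 1024 * 1024, 256L * 1024 * 1024});
+    //
+    //   DirectMemoryRegion region;
+    //
+    //   while ((region = provider.nextRegion()) != null) {
+    //       // Carve pages out of [region.address(), region.address() + region.size()).
+    //   }
+    //
+    //   provider.shutdown(true); // Pass false to keep the chunks for reuse.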
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/DirectMemoryRegion.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/DirectMemoryRegion.java
new file mode 100644
index 0000000..1070c96
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/DirectMemoryRegion.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.mem;
+
+/**
+ * Interface for a direct memory region allocated by provider.
+ */
+public interface DirectMemoryRegion {
+    /**
+     * Returns a region start address.
+     */
+    public long address();
+
+    /**
+     * Returns a region size.
+     */
+    public long size();
+
+    /**
+     * Creates a sub-region of this region starting from the given offset.
+     *
+     * @param offset Offset within this region.
+     * @return Sub-region.
+     */
+    public DirectMemoryRegion slice(long offset);
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/IgniteOutOfMemoryException.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/IgniteOutOfMemoryException.java
new file mode 100644
index 0000000..a7e8647
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/IgniteOutOfMemoryException.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.mem;
+
+import java.io.IOException;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.lang.IgniteInternalException;
+
+/**
+ * Exception thrown when there are no free pages left in the data region.
+ *
+ * @see PageMemory#allocatePage(int, int, byte)
+ */
+public class IgniteOutOfMemoryException extends IgniteInternalException {
+    /** Serial version uid. */
+    private static final long serialVersionUID = 0L;
+
+    /**
+     * Constructor.
+     */
+    public IgniteOutOfMemoryException() {
+        // No-op.
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param msg Error message.
+     */
+    public IgniteOutOfMemoryException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * Constructor.
+     *
+     * @param msg Error message.
+     * @param e Cause exception.
+     */
+    public IgniteOutOfMemoryException(String msg, IOException e) {
+        super(msg, e);
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/MemoryAllocator.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/MemoryAllocator.java
new file mode 100644
index 0000000..4918610
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/MemoryAllocator.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.mem;
+
+/**
+ * Base interface for offheap memory allocator.
+ */
+public interface MemoryAllocator {
+    /**
+     * Allocates memory.
+     *
+     * @param size Size of allocated memory.
+     *
+     * @return Pointer to memory or {@code 0} if failed.
+     */
+    public long allocateMemory(long size);
+
+    /**
+     * Deallocates memory.
+     *
+     * @param addr Address of memory.
+     */
+    public void freeMemory(long addr);
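+
+    // Sketch of the expected allocate/free pairing (size is illustrative):
+    //
+    //   long addr = allocator.allocateMemory(4096);
+    //
+    //   if (addr != 0) {
+    //       try {
+    //           // Use the 4096 bytes starting at addr.
+    //       } finally {
+    //           allocator.freeMemory(addr);
+    //       }
+    //   }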
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/unsafe/UnsafeChunk.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/unsafe/UnsafeChunk.java
new file mode 100644
index 0000000..72c2c3c
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/unsafe/UnsafeChunk.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.mem.unsafe;
+
+import org.apache.ignite.internal.pagememory.mem.DirectMemoryRegion;
+import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.IgniteUtils;
+
+/**
+ * Basic implementation of {@link DirectMemoryRegion} that stores direct memory address and the length of the region.
+ */
+public class UnsafeChunk implements DirectMemoryRegion {
+    /** Raw pointer. */
+    private long ptr;
+
+    /** Size of the chunk. */
+    private long len;
+
+    /**
+     * Constructor.
+     *
+     * @param ptr Pointer to the memory start.
+     * @param len Memory length.
+     */
+    public UnsafeChunk(long ptr, long len) {
+        this.ptr = ptr;
+        this.len = len;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long address() {
+        return ptr;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long size() {
+        return len;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public DirectMemoryRegion slice(long offset) {
+        if (offset < 0 || offset >= len) {
+            throw new IllegalArgumentException("Failed to create a memory region slice [ptr=" + IgniteUtils.hexLong(ptr)
+                    + ", len=" + len + ", offset=" + offset + ']');
+        }
+
+        return new UnsafeChunk(ptr + offset, len - offset);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String toString() {
+        return S.toString(UnsafeChunk.class, this);
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/unsafe/UnsafeMemoryAllocator.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/unsafe/UnsafeMemoryAllocator.java
new file mode 100644
index 0000000..784424a
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/unsafe/UnsafeMemoryAllocator.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.mem.unsafe;
+
+import org.apache.ignite.internal.pagememory.mem.MemoryAllocator;
+import org.apache.ignite.internal.util.GridUnsafe;
+
+/** Memory allocator implementation that uses {@link GridUnsafe}. */
+public class UnsafeMemoryAllocator implements MemoryAllocator {
+    /** {@inheritDoc} */
+    @Override
+    public long allocateMemory(long size) {
+        return GridUnsafe.allocateMemory(size);
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void freeMemory(long addr) {
+        GridUnsafe.freeMemory(addr);
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/unsafe/UnsafeMemoryProvider.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/unsafe/UnsafeMemoryProvider.java
new file mode 100644
index 0000000..1b23185
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/mem/unsafe/UnsafeMemoryProvider.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.mem.unsafe;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.ignite.internal.pagememory.mem.DirectMemoryProvider;
+import org.apache.ignite.internal.pagememory.mem.DirectMemoryRegion;
+import org.apache.ignite.internal.pagememory.mem.MemoryAllocator;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteInternalException;
+import org.apache.ignite.lang.IgniteLogger;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Memory provider implementation based on unsafe memory access.
+ *
+ * <p>Supports memory reuse semantics.
+ */
+public class UnsafeMemoryProvider implements DirectMemoryProvider {
+    /** Logger. */
+    private static final IgniteLogger LOG = IgniteLogger.forClass(UnsafeMemoryProvider.class);
+
+    /** Array with memory regions sizes. */
+    private long[] sizes;
+
+    /** List of allocated memory regions. */
+    private List<DirectMemoryRegion> regions;
+
+    /** Flag indicating whether this memory provider has already been initialized. */
+    private boolean isInit;
+
+    /** Number of memory regions from the {@link #regions} list that are currently in use. */
+    private int used = 0;
+
+    /** Memory allocator. */
+    private final MemoryAllocator allocator;
+
+    /**
+     * Public constructor.
+     *
+     * @param allocator Memory allocator. If {@code null}, default {@link UnsafeMemoryAllocator} will be used.
+     */
+    public UnsafeMemoryProvider(@Nullable MemoryAllocator allocator) {
+        this.allocator = allocator == null ? new UnsafeMemoryAllocator() : allocator;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void initialize(long[] sizes) {
+        if (isInit) {
+            return;
+        }
+
+        this.sizes = sizes;
+
+        regions = new ArrayList<>();
+
+        isInit = true;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void shutdown(boolean deallocate) {
+        if (!deallocate) {
+            used = 0;
+
+            return;
+        }
+
+        if (regions != null) {
+            for (Iterator<DirectMemoryRegion> it = regions.iterator(); it.hasNext(); ) {
+                DirectMemoryRegion chunk = it.next();
+
+                allocator.freeMemory(chunk.address());
+
+                // Safety.
+                it.remove();
+            }
+        }
+    }
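+
+    // Sketch of the reuse semantics mentioned in the class javadoc: shutdown(false) only resets the "used" cursor,
+    // so already allocated chunks are handed out again instead of being re-allocated:
+    //
+    //   provider.shutdown(false);                      // Chunks stay allocated.
+    //   DirectMemoryRegion r = provider.nextRegion();  // Returns the first previously allocated region.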
+
+    /** {@inheritDoc} */
+    @Override public DirectMemoryRegion nextRegion() {
+        if (used == sizes.length) {
+            return null;
+        }
+
+        if (used < regions.size()) {
+            return regions.get(used++);
+        }
+
+        long chunkSize = sizes[regions.size()];
+
+        long ptr;
+
+        try {
+            ptr = allocator.allocateMemory(chunkSize);
+        } catch (IllegalArgumentException e) {
+            String msg = "Failed to allocate next memory chunk: " + IgniteUtils.readableSize(chunkSize, true)
+                    + ". Check whether the chunk size is too large or a 32-bit JVM is used.";
+
+            if (regions.isEmpty()) {
+                throw new IgniteInternalException(msg, e);
+            }
+
+            LOG.error(msg);
+
+            return null;
+        }
+
+        if (ptr <= 0) {
+            LOG.error("Failed to allocate next memory chunk: " + IgniteUtils.readableSize(chunkSize, true));
+
+            return null;
+        }
+
+        DirectMemoryRegion region = new UnsafeChunk(ptr, chunkSize);
+
+        regions.add(region);
+
+        used++;
+
+        return region;
+    }
+}
diff --git a/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/metric/IoStatisticsHolderNoOp.java b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/metric/IoStatisticsHolderNoOp.java
new file mode 100644
index 0000000..3816419
--- /dev/null
+++ b/modules/page-memory/src/main/java/org/apache/ignite/internal/pagememory/metric/IoStatisticsHolderNoOp.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.metric;
+
+/**
+ * No-operation IO statistics holder. Used when statistics should not be gathered.
+ */
+public class IoStatisticsHolderNoOp implements IoStatisticsHolder {
+    /** No-op statistics. */
+    public static final IoStatisticsHolderNoOp INSTANCE = new IoStatisticsHolderNoOp();
+
+    /**
+     * Private constructor.
+     */
+    private IoStatisticsHolderNoOp() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void trackLogicalRead(long pageAddr) {
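+        // No-op.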
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public void trackPhysicalAndLogicalRead(long pageAddr) {
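+        // No-op.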
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long logicalReads() {
+        return 0;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public long physicalReads() {
+        return 0;
+    }
+}
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/TestPageIoModule.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/TestPageIoModule.java
index 21f33c8..6171af1 100644
--- a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/TestPageIoModule.java
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/TestPageIoModule.java
@@ -45,11 +45,8 @@
     public static class TestPageIo extends PageIo {
         /**
          * Constructor.
-         *
-         * @param type Page type.
-         * @param ver  Page format version.
          */
-        protected TestPageIo() {
+        public TestPageIo() {
             super(TEST_PAGE_TYPE, TEST_PAGE_VER);
         }
 
diff --git a/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/impl/PageMemoryNoLoadSelfTest.java b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/impl/PageMemoryNoLoadSelfTest.java
new file mode 100644
index 0000000..3678af3
--- /dev/null
+++ b/modules/page-memory/src/test/java/org/apache/ignite/internal/pagememory/impl/PageMemoryNoLoadSelfTest.java
@@ -0,0 +1,391 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.pagememory.impl;
+
+import static org.apache.ignite.internal.configuration.ConfigurationTestUtils.fixConfiguration;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.fail;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import org.apache.ignite.configuration.schemas.store.DataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionChange;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfigurationSchema;
+import org.apache.ignite.configuration.schemas.store.UnsafeMemoryAllocatorConfigurationSchema;
+import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
+import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
+import org.apache.ignite.internal.pagememory.FullPageId;
+import org.apache.ignite.internal.pagememory.PageIdAllocator;
+import org.apache.ignite.internal.pagememory.PageMemory;
+import org.apache.ignite.internal.pagememory.TestPageIoModule.TestPageIo;
+import org.apache.ignite.internal.pagememory.io.PageIo;
+import org.apache.ignite.internal.pagememory.io.PageIoRegistry;
+import org.apache.ignite.internal.pagememory.mem.DirectMemoryProvider;
+import org.apache.ignite.internal.pagememory.mem.IgniteOutOfMemoryException;
+import org.apache.ignite.internal.pagememory.mem.unsafe.UnsafeMemoryProvider;
+import org.apache.ignite.internal.pagememory.util.PageIdUtils;
+import org.apache.ignite.internal.pagememory.util.PageUtils;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.lang.IgniteInternalCheckedException;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+/**
+ * Tests {@link PageMemoryNoStoreImpl}.
+ */
+@ExtendWith(ConfigurationExtension.class)
+public class PageMemoryNoLoadSelfTest extends BaseIgniteAbstractTest {
+    protected static final int PAGE_SIZE = 8 * 1024;
+
+    private static final int MAX_MEMORY_SIZE = 10 * 1024 * 1024;
+
+    private static final PageIo PAGE_IO = new TestPageIo();
+
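+    /** Data region configuration with the {@code pagemem} polymorphic type and the unsafe memory allocator extension. */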
+    @InjectConfiguration(
+            value = "mock.type = pagemem",
+            polymorphicExtensions = {
+                PageMemoryDataRegionConfigurationSchema.class,
+                UnsafeMemoryAllocatorConfigurationSchema.class
+            })
+    private DataRegionConfiguration dataRegionCfg;
+
+    @Test
+    public void testPageTearingInner() throws Exception {
+        PageMemory mem = memory();
+
+        mem.start();
+
+        try {
+            FullPageId fullId1 = allocatePage(mem);
+            FullPageId fullId2 = allocatePage(mem);
+
+            long page1 = mem.acquirePage(fullId1.groupId(), fullId1.pageId());
+
+            try {
+                long page2 = mem.acquirePage(fullId2.groupId(), fullId2.pageId());
+
+                log.info("Allocated pages [page1Id=" + fullId1.pageId() + ", page1=" + page1
+                        + ", page2Id=" + fullId2.pageId() + ", page2=" + page2 + ']');
+
+                try {
+                    writePage(mem, fullId1, page1, 1);
+                    writePage(mem, fullId2, page2, 2);
+
+                    readPage(mem, fullId1.pageId(), page1, 1);
+                    readPage(mem, fullId2.pageId(), page2, 2);
+
+                    // Check read after read.
+                    readPage(mem, fullId1.pageId(), page1, 1);
+                    readPage(mem, fullId2.pageId(), page2, 2);
+                } finally {
+                    mem.releasePage(fullId2.groupId(), fullId2.pageId(), page2);
+                }
+            } finally {
+                mem.releasePage(fullId1.groupId(), fullId1.pageId(), page1);
+            }
+        } finally {
+            mem.stop(true);
+        }
+    }
+
+    @Test
+    public void testLoadedPagesCount() throws Exception {
+        PageMemory mem = memory();
+
+        mem.start();
+
+        int expPages = MAX_MEMORY_SIZE / mem.systemPageSize();
+
+        try {
+            for (int i = 0; i < expPages * 2; i++) {
+                allocatePage(mem);
+            }
+        } catch (IgniteOutOfMemoryException e) {
+            log.error(e.getMessage(), e);
+
+            // Expected.
+            assertEquals(expPages, mem.loadedPages());
+        } finally {
+            mem.stop(true);
+        }
+    }
+
+    @Test
+    public void testPageTearingSequential() throws Exception {
+        PageMemory mem = memory();
+
+        mem.start();
+
+        try {
+            int pagesCnt = 1024;
+
+            List<FullPageId> pages = new ArrayList<>(pagesCnt);
+
+            for (int i = 0; i < pagesCnt; i++) {
+                FullPageId fullId = allocatePage(mem);
+
+                pages.add(fullId);
+
+                long page = mem.acquirePage(fullId.groupId(), fullId.pageId());
+
+                try {
+                    if (i % 64 == 0) {
+                        log.info("Writing page [idx=" + i + ", pageId=" + fullId.pageId() + ", page=" + page + ']');
+                    }
+
+                    writePage(mem, fullId, page, i + 1);
+                } finally {
+                    mem.releasePage(fullId.groupId(), fullId.pageId(), page);
+                }
+            }
+
+            for (int i = 0; i < pagesCnt; i++) {
+                FullPageId fullId = pages.get(i);
+
+                long page = mem.acquirePage(fullId.groupId(), fullId.pageId());
+
+                try {
+                    if (i % 64 == 0) {
+                        log.info("Reading page [idx=" + i + ", pageId=" + fullId.pageId() + ", page=" + page + ']');
+                    }
+
+                    readPage(mem, fullId.pageId(), page, i + 1);
+                } finally {
+                    mem.releasePage(fullId.groupId(), fullId.pageId(), page);
+                }
+            }
+        } finally {
+            mem.stop(true);
+        }
+    }
+
+    @Test
+    public void testPageHandleDeallocation() throws Exception {
+        PageMemory mem = memory();
+
+        mem.start();
+
+        try {
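+            // Allocate 3 MiB worth of 8 KiB pages.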
+            int pages = 3 * 1024 * 1024 / (8 * 1024);
+
+            Collection<FullPageId> handles = new HashSet<>();
+
+            for (int i = 0; i < pages; i++) {
+                handles.add(allocatePage(mem));
+            }
+
+            for (FullPageId fullId : handles) {
+                mem.freePage(fullId.groupId(), fullId.pageId());
+            }
+
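+            // Newly allocated pages must reuse the freed page IDs, so re-adding them to the set returns false.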
+            for (int i = 0; i < pages; i++) {
+                assertFalse(handles.add(allocatePage(mem)));
+            }
+        } finally {
+            mem.stop(true);
+        }
+    }
+
+    @Test
+    public void testPageIdRotation() throws Exception {
+        PageMemory mem = memory();
+
+        mem.start();
+
+        try {
+            int pages = 5;
+
+            Collection<FullPageId> old = new ArrayList<>();
+            Collection<FullPageId> updated = new ArrayList<>();
+
+            for (int i = 0; i < pages; i++) {
+                old.add(allocatePage(mem));
+            }
+
+            // Check that initial pages are accessible.
+            for (FullPageId id : old) {
+                long pageApsPtr = mem.acquirePage(id.groupId(), id.pageId());
+                try {
+                    long pageAddr = mem.writeLock(id.groupId(), id.pageId(), pageApsPtr);
+
+                    assertNotNull(pageAddr);
+
+                    try {
+                        PAGE_IO.initNewPage(pageAddr, id.pageId(), mem.realPageSize(id.groupId()));
+
+                        long updId = PageIdUtils.rotatePageId(id.pageId());
+
+                        PageIo.setPageId(pageAddr, updId);
+
+                        updated.add(new FullPageId(updId, id.groupId()));
+                    } finally {
+                        mem.writeUnlock(id.groupId(), id.pageId(), pageApsPtr, true);
+                    }
+                } finally {
+                    mem.releasePage(id.groupId(), id.pageId(), pageApsPtr);
+                }
+            }
+
+            // Check that updated pages are inaccessible using old IDs.
+            for (FullPageId id : old) {
+                long pageApsPtr = mem.acquirePage(id.groupId(), id.pageId());
+                try {
+                    long pageAddr = mem.writeLock(id.groupId(), id.pageId(), pageApsPtr);
+
+                    if (pageAddr != 0L) {
+                        mem.writeUnlock(id.groupId(), id.pageId(), pageApsPtr, false);
+
+                        fail("Was able to acquire page write lock.");
+                    }
+
+                    pageAddr = mem.readLock(id.groupId(), id.pageId(), pageApsPtr);
+
+                    if (pageAddr != 0) {
+                        mem.readUnlock(id.groupId(), id.pageId(), pageApsPtr);
+
+                        fail("Was able to acquire page read lock.");
+                    }
+                } finally {
+                    mem.releasePage(id.groupId(), id.pageId(), pageApsPtr);
+                }
+            }
+
+            // Check that updated pages are accessible using new IDs.
+            for (FullPageId id : updated) {
+                long pageApsPtr = mem.acquirePage(id.groupId(), id.pageId());
+                try {
+                    long pageAddr = mem.writeLock(id.groupId(), id.pageId(), pageApsPtr);
+
+                    assertNotSame(0L, pageAddr);
+
+                    try {
+                        assertEquals(id.pageId(), PageIo.getPageId(pageAddr));
+                    } finally {
+                        mem.writeUnlock(id.groupId(), id.pageId(), pageApsPtr, false);
+                    }
+
+                    pageAddr = mem.readLock(id.groupId(), id.pageId(), pageApsPtr);
+
+                    assertNotSame(0L, pageAddr);
+
+                    try {
+                        assertEquals(id.pageId(), PageIo.getPageId(pageAddr));
+                    } finally {
+                        mem.readUnlock(id.groupId(), id.pageId(), pageApsPtr);
+                    }
+                } finally {
+                    mem.releasePage(id.groupId(), id.pageId(), pageApsPtr);
+                }
+            }
+        } finally {
+            mem.stop(true);
+        }
+    }
+
+    /**
+     * Creates a new page memory instance.
+     *
+     * @return Page memory implementation.
+     * @throws Exception If failed.
+     */
+    protected PageMemory memory() throws Exception {
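+        // Set the page size and memory limits on the injected "pagemem" data region.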
+        dataRegionCfg.change(cfg ->
+                cfg.convert(PageMemoryDataRegionChange.class)
+                        .changePageSize(PAGE_SIZE)
+                        .changeInitSize(MAX_MEMORY_SIZE)
+                        .changeMaxSize(MAX_MEMORY_SIZE)
+        );
+
+        DirectMemoryProvider provider = new UnsafeMemoryProvider(null);
+
+        PageIoRegistry ioRegistry = new PageIoRegistry();
+
+        ioRegistry.loadFromServiceLoader();
+
+        return new PageMemoryNoStoreImpl(
+            provider,
+            (PageMemoryDataRegionConfiguration) fixConfiguration(dataRegionCfg),
+            ioRegistry
+        );
+    }
+
+    /**
+     * Fills a page with the given value.
+     *
+     * @param mem Page memory.
+     * @param fullId Page ID.
+     * @param page Page pointer.
+     * @param val Value to write.
+     */
+    private void writePage(PageMemory mem, FullPageId fullId, long page, int val) {
+        long pageAddr = mem.writeLock(-1, fullId.pageId(), page);
+
+        try {
+            PAGE_IO.initNewPage(pageAddr, fullId.pageId(), mem.realPageSize(fullId.groupId()));
+
+            for (int i = PageIo.COMMON_HEADER_END; i < PAGE_SIZE; i++) {
+                PageUtils.putByte(pageAddr, i, (byte) val);
+            }
+        } finally {
+            mem.writeUnlock(-1, fullId.pageId(), page, true);
+        }
+    }
+
+    /**
+     * Reads a page from page memory and asserts that every byte after the common header equals the expected value.
+     *
+     * @param mem Page memory.
+     * @param pageId Page ID.
+     * @param page Page pointer.
+     * @param expVal Expected value.
+     */
+    private void readPage(PageMemory mem, long pageId, long page, int expVal) {
+        expVal &= 0xFF;
+
+        long pageAddr = mem.readLock(-1, pageId, page);
+
+        assert pageAddr != 0;
+
+        try {
+            for (int i = PageIo.COMMON_HEADER_END; i < PAGE_SIZE; i++) {
+                int val = PageUtils.getByte(pageAddr, i) & 0xFF;
+
+                assertEquals(expVal, val, "Unexpected value at position: " + i);
+            }
+        } finally {
+            mem.readUnlock(-1, pageId, page);
+        }
+    }
+
+    /**
+     * Allocates a page.
+     *
+     * @param mem Page ID allocator.
+     * @return Full ID of the allocated page.
+     * @throws IgniteInternalCheckedException If the allocation fails.
+     */
+    public static FullPageId allocatePage(PageIdAllocator mem) throws IgniteInternalCheckedException {
+        return new FullPageId(mem.allocatePage(-1, 1, PageIdAllocator.FLAG_DATA), -1);
+    }
+}
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationConverterTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationConverterTest.java
index f8c9964..0d46a52 100644
--- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationConverterTest.java
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/SchemaConfigurationConverterTest.java
@@ -30,6 +30,7 @@
 import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 import org.apache.ignite.configuration.schemas.store.DataStorageConfiguration;
+import org.apache.ignite.configuration.schemas.store.RocksDbDataRegionConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.PartialIndexConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.SortedIndexConfigurationSchema;
@@ -80,7 +81,10 @@
                 Map.of(TableValidator.class, Set.of(TableValidatorImpl.INSTANCE)),
                 new TestConfigurationStorage(DISTRIBUTED),
                 List.of(),
-                List.of(HashIndexConfigurationSchema.class, SortedIndexConfigurationSchema.class, PartialIndexConfigurationSchema.class)
+                List.of(
+                        HashIndexConfigurationSchema.class, SortedIndexConfigurationSchema.class, PartialIndexConfigurationSchema.class,
+                        RocksDbDataRegionConfigurationSchema.class
+                )
         );
 
         confRegistry.start();
diff --git a/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/TableValidatorImplTest.java b/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/TableValidatorImplTest.java
index 62fc4d3..07522d5 100644
--- a/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/TableValidatorImplTest.java
+++ b/modules/schema/src/test/java/org/apache/ignite/internal/schema/configuration/TableValidatorImplTest.java
@@ -34,6 +34,9 @@
 import org.apache.ignite.configuration.NamedListView;
 import org.apache.ignite.configuration.schemas.store.DataStorageConfiguration;
 import org.apache.ignite.configuration.schemas.store.DataStorageView;
+import org.apache.ignite.configuration.schemas.store.PageMemoryDataRegionConfigurationSchema;
+import org.apache.ignite.configuration.schemas.store.RocksDbDataRegionConfigurationSchema;
+import org.apache.ignite.configuration.schemas.store.UnsafeMemoryAllocatorConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.HashIndexChange;
 import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.PartialIndexConfigurationSchema;
@@ -72,7 +75,9 @@
 
     /** Tests that validator finds no issues in a simple valid configuration. */
     @Test
-    public void testNoIssues(@InjectConfiguration DataStorageConfiguration dbCfg) {
+    public void testNoIssues(
+            @InjectConfiguration(polymorphicExtensions = RocksDbDataRegionConfigurationSchema.class) DataStorageConfiguration dbCfg
+    ) {
         ValidationContext<NamedListView<TableView>> ctx = mockContext(null, dbCfg.value());
 
         ArgumentCaptor<ValidationIssue> issuesCaptor = validate(ctx);
@@ -82,7 +87,9 @@
 
     /** Tests that the validator catches nonexistent data regions. */
     @Test
-    public void testMissingDataRegion(@InjectConfiguration DataStorageConfiguration dbCfg) throws Exception {
+    public void testMissingDataRegion(
+            @InjectConfiguration(polymorphicExtensions = RocksDbDataRegionConfigurationSchema.class) DataStorageConfiguration dbCfg
+    ) throws Exception {
         tablesCfg.tables().get("table").dataRegion().update("r0").get(1, TimeUnit.SECONDS);
 
         ValidationContext<NamedListView<TableView>> ctx = mockContext(null, dbCfg.value());
@@ -100,7 +107,13 @@
     /** Tests that new data region must have the same type. */
     @Test
     public void testChangeDataRegionType(
-            @InjectConfiguration("mock.regions.r0.type = foo") DataStorageConfiguration dbCfg
+            @InjectConfiguration(
+                    value = "mock.regions.r0.type = pagemem",
+                    polymorphicExtensions = {
+                            RocksDbDataRegionConfigurationSchema.class, PageMemoryDataRegionConfigurationSchema.class,
+                            UnsafeMemoryAllocatorConfigurationSchema.class
+                    })
+                    DataStorageConfiguration dbCfg
     ) throws Exception {
         NamedListView<TableView> oldValue = tablesCfg.tables().value();
 
@@ -114,7 +127,7 @@
 
         assertEquals(
                 "Unable to move table 'schema.table' from region 'default' to region 'r0' because it has"
-                        + " different type (old=rocksdb, new=foo)",
+                        + " different type (old=rocksdb, new=pagemem)",
                 issuesCaptor.getValue().message()
         );
     }
@@ -123,7 +136,9 @@
      * Tests that column names and column keys inside a Named List must be equal.
      */
     @Test
-    void testMisalignedColumnNamedListKeys(@InjectConfiguration DataStorageConfiguration dbCfg) {
+    void testMisalignedColumnNamedListKeys(
+            @InjectConfiguration(polymorphicExtensions = RocksDbDataRegionConfigurationSchema.class) DataStorageConfiguration dbCfg
+    ) {
         NamedListView<TableView> oldValue = tablesCfg.tables().value();
 
         TableConfiguration tableCfg = tablesCfg.tables().get("table");
@@ -153,7 +168,9 @@
      * Tests that index names and index keys inside a Named List must be equal.
      */
     @Test
-    void testMisalignedIndexNamedListKeys(@InjectConfiguration DataStorageConfiguration dbCfg) {
+    void testMisalignedIndexNamedListKeys(
+            @InjectConfiguration(polymorphicExtensions = RocksDbDataRegionConfigurationSchema.class) DataStorageConfiguration dbCfg
+    ) {
         NamedListView<TableView> oldValue = tablesCfg.tables().value();
 
         TableConfiguration tableCfg = tablesCfg.tables().get("table");
diff --git a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/DdlWithMockedManagersTest.java b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/DdlWithMockedManagersTest.java
index 8b29299..b558c62 100644
--- a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/DdlWithMockedManagersTest.java
+++ b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/DdlWithMockedManagersTest.java
@@ -31,6 +31,7 @@
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.configuration.schemas.store.DataStorageConfiguration;
+import org.apache.ignite.configuration.schemas.store.RocksDbDataRegionConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.PartialIndexConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.SortedIndexConfigurationSchema;
@@ -112,7 +113,7 @@
     private TablesConfiguration tblsCfg;
 
     /** Data storage configuration. */
-    @InjectConfiguration
+    @InjectConfiguration(polymorphicExtensions = RocksDbDataRegionConfigurationSchema.class)
     private DataStorageConfiguration dataStorageCfg;
 
     TableManager tblManager;
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbDataRegion.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbDataRegion.java
index b6a1dc3..de246af 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbDataRegion.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbDataRegion.java
@@ -17,13 +17,13 @@
 
 package org.apache.ignite.internal.storage.rocksdb;
 
-import static org.apache.ignite.configuration.schemas.store.DataRegionConfigurationSchema.ROCKSDB_CLOCK_CACHE;
-import static org.apache.ignite.configuration.schemas.store.DataRegionConfigurationSchema.ROCKSDB_DATA_REGION_TYPE;
-import static org.apache.ignite.configuration.schemas.store.DataRegionConfigurationSchema.ROCKSDB_LRU_CACHE;
+import static org.apache.ignite.configuration.schemas.store.RocksDbDataRegionConfigurationSchema.ROCKSDB_CLOCK_CACHE;
+import static org.apache.ignite.configuration.schemas.store.RocksDbDataRegionConfigurationSchema.ROCKSDB_DATA_REGION_TYPE;
+import static org.apache.ignite.configuration.schemas.store.RocksDbDataRegionConfigurationSchema.ROCKSDB_LRU_CACHE;
 
 import java.util.Locale;
-import org.apache.ignite.configuration.schemas.store.DataRegionConfiguration;
-import org.apache.ignite.configuration.schemas.store.DataRegionView;
+import org.apache.ignite.configuration.schemas.store.RocksDbDataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.store.RocksDbDataRegionView;
 import org.apache.ignite.internal.storage.engine.DataRegion;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.rocksdb.Cache;
@@ -36,7 +36,7 @@
  */
 public class RocksDbDataRegion implements DataRegion {
     /** Region configuration. */
-    private final DataRegionConfiguration cfg;
+    private final RocksDbDataRegionConfiguration cfg;
 
     /** RocksDB cache instance. */
     private Cache cache;
@@ -49,7 +49,7 @@
      *
      * @param cfg Data region configuration.
      */
-    public RocksDbDataRegion(DataRegionConfiguration cfg) {
+    public RocksDbDataRegion(RocksDbDataRegionConfiguration cfg) {
         this.cfg = cfg;
 
         assert ROCKSDB_DATA_REGION_TYPE.equalsIgnoreCase(cfg.type().value());
@@ -58,7 +58,7 @@
     /** {@inheritDoc} */
     @Override
     public void start() {
-        DataRegionView dataRegionView = cfg.value();
+        RocksDbDataRegionView dataRegionView = (RocksDbDataRegionView) cfg.value();
 
         long writeBufferSize = dataRegionView.writeBufferSize();
 
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
index 942ad53..56acd89 100644
--- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
+++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbStorageEngine.java
@@ -22,6 +22,7 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import org.apache.ignite.configuration.schemas.store.DataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.store.RocksDbDataRegionConfiguration;
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
 import org.apache.ignite.internal.storage.StorageException;
 import org.apache.ignite.internal.storage.engine.DataRegion;
@@ -59,7 +60,9 @@
     /** {@inheritDoc} */
     @Override
     public DataRegion createDataRegion(DataRegionConfiguration regionCfg) {
-        return new RocksDbDataRegion(regionCfg);
+        assert regionCfg instanceof RocksDbDataRegionConfiguration : regionCfg;
+
+        return new RocksDbDataRegion((RocksDbDataRegionConfiguration) regionCfg);
     }
 
     /** {@inheritDoc} */
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbPartitionStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbPartitionStorageTest.java
index a91e5e6..c55784a 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbPartitionStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbPartitionStorageTest.java
@@ -17,12 +17,15 @@
 
 package org.apache.ignite.internal.storage.rocksdb;
 
+import static org.apache.ignite.internal.configuration.ConfigurationTestUtils.fixConfiguration;
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
 import java.nio.file.Path;
 import org.apache.ignite.configuration.schemas.store.DataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.store.RocksDbDataRegionChange;
+import org.apache.ignite.configuration.schemas.store.RocksDbDataRegionConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
@@ -56,10 +59,14 @@
     @BeforeEach
     public void setUp(
             @WorkDirectory Path workDir,
-            @InjectConfiguration DataRegionConfiguration dataRegionCfg,
+            @InjectConfiguration(polymorphicExtensions = RocksDbDataRegionConfigurationSchema.class) DataRegionConfiguration dataRegionCfg,
             @InjectConfiguration(polymorphicExtensions = HashIndexConfigurationSchema.class) TableConfiguration tableCfg
     ) throws Exception {
-        dataRegionCfg.change(cfg -> cfg.changeSize(16 * 1024).changeWriteBufferSize(16 * 1024)).get();
+        dataRegionCfg.change(cfg ->
+                cfg.convert(RocksDbDataRegionChange.class).changeSize(16 * 1024).changeWriteBufferSize(16 * 1024)
+        ).get();
+
+        dataRegionCfg = fixConfiguration(dataRegionCfg);
 
         dataRegion = engine.createDataRegion(dataRegionCfg);
 
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorageTest.java
index dea4a40..31d0be6 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbTableStorageTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.storage.rocksdb;
 
+import static org.apache.ignite.internal.configuration.ConfigurationTestUtils.fixConfiguration;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.instanceOf;
@@ -32,6 +33,8 @@
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.configuration.schemas.store.DataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.store.RocksDbDataRegionChange;
+import org.apache.ignite.configuration.schemas.store.RocksDbDataRegionConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
 import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
@@ -67,10 +70,12 @@
 
     @BeforeEach
     public void setUp(
-            @InjectConfiguration DataRegionConfiguration dataRegionCfg,
+            @InjectConfiguration(polymorphicExtensions = RocksDbDataRegionConfigurationSchema.class) DataRegionConfiguration dataRegionCfg,
             @InjectConfiguration(polymorphicExtensions = HashIndexConfigurationSchema.class) TableConfiguration tableCfg
     ) throws Exception {
-        CompletableFuture<Void> changeFuture = dataRegionCfg.change(cfg -> cfg.changeSize(16 * 1024).changeWriteBufferSize(16 * 1024));
+        CompletableFuture<Void> changeFuture = dataRegionCfg.change(cfg ->
+                cfg.convert(RocksDbDataRegionChange.class).changeSize(16 * 1024).changeWriteBufferSize(16 * 1024)
+        );
 
         assertThat(changeFuture, willBe(nullValue(Void.class)));
 
@@ -78,6 +83,8 @@
 
         assertThat(changeFuture, willBe(nullValue(Void.class)));
 
+        dataRegionCfg = fixConfiguration(dataRegionCfg);
+
         dataRegion = engine.createDataRegion(dataRegionCfg);
 
         assertThat(dataRegion, is(instanceOf(RocksDbDataRegion.class)));
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorageTest.java
index 140c3df..768e189 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/index/RocksDbSortedIndexStorageTest.java
@@ -19,6 +19,7 @@
 
 import static java.util.stream.Collectors.toList;
 import static java.util.stream.Collectors.toUnmodifiableList;
+import static org.apache.ignite.internal.configuration.ConfigurationTestUtils.fixConfiguration;
 import static org.apache.ignite.internal.schema.SchemaTestUtils.generateRandomValue;
 import static org.apache.ignite.internal.schema.configuration.SchemaConfigurationConverter.convert;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.randomBytes;
@@ -47,6 +48,8 @@
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import org.apache.ignite.configuration.schemas.store.DataRegionConfiguration;
+import org.apache.ignite.configuration.schemas.store.RocksDbDataRegionChange;
+import org.apache.ignite.configuration.schemas.store.RocksDbDataRegionConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.SortedIndexConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.TableConfiguration;
@@ -116,7 +119,10 @@
     private final List<AutoCloseable> resources = new ArrayList<>();
 
     @BeforeEach
-    void setUp(@WorkDirectory Path workDir, @InjectConfiguration DataRegionConfiguration dataRegionCfg) {
+    void setUp(
+            @WorkDirectory Path workDir,
+            @InjectConfiguration(polymorphicExtensions = RocksDbDataRegionConfigurationSchema.class) DataRegionConfiguration dataRegionCfg
+    ) {
         long seed = System.currentTimeMillis();
 
         log.info("Using random seed: " + seed);
@@ -125,6 +131,8 @@
 
         createTestConfiguration(dataRegionCfg);
 
+        dataRegionCfg = fixConfiguration(dataRegionCfg);
+
         var engine = new RocksDbStorageEngine();
 
         engine.start();
@@ -152,7 +160,7 @@
      */
     private void createTestConfiguration(DataRegionConfiguration dataRegionCfg) {
         CompletableFuture<Void> dataRegionChangeFuture = dataRegionCfg
-                .change(cfg -> cfg.changeSize(16 * 1024).changeWriteBufferSize(16 * 1024));
+                .change(cfg -> cfg.convert(RocksDbDataRegionChange.class).changeSize(16 * 1024).changeWriteBufferSize(16 * 1024));
 
         assertThat(dataRegionChangeFuture, willBe(nullValue(Void.class)));
 
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
index 095c25a..96c8e54 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/TableManagerTest.java
@@ -43,6 +43,7 @@
 import java.util.concurrent.Phaser;
 import java.util.function.Consumer;
 import org.apache.ignite.configuration.schemas.store.DataStorageConfiguration;
+import org.apache.ignite.configuration.schemas.store.RocksDbDataRegionConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.HashIndexConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.PartialIndexConfigurationSchema;
 import org.apache.ignite.configuration.schemas.table.SortedIndexConfigurationSchema;
@@ -140,7 +141,7 @@
     private TablesConfiguration tblsCfg;
 
     /** Data storage configuration. */
-    @InjectConfiguration
+    @InjectConfiguration(polymorphicExtensions = RocksDbDataRegionConfigurationSchema.class)
     private DataStorageConfiguration dataStorageCfg;
 
     /** Test node. */