opt2
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/Page.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/Page.java
index e08fad6..5276acb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/Page.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/Page.java
@@ -43,6 +43,8 @@
      */
     public ByteBuffer getForRead();
 
+    public long getForReadAddr();
+
     /**
      * Releases reserved page. Released page can be evicted from RAM after flushing modifications to disk.
      */
@@ -84,4 +86,6 @@
      * Release page.
      */
     @Override public void close();
+
+    public long address();
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageMemory.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageMemory.java
index 53b37f6..61414ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageMemory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageMemory.java
@@ -37,6 +37,10 @@
      */
     public Page page(int cacheId, long pageId) throws IgniteCheckedException;
 
+    public Page readPage(int cacheId, long pageId) throws IgniteCheckedException;
+
+    public long pageAddr(int cacheId, long pageId) throws IgniteCheckedException;
+
     /**
      * @see #page(int, long)
      * Will not read page from file if it is not present in memory.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java
index 3106866..d5777de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java
@@ -273,6 +273,20 @@
         return seg.acquirePage(cacheId, pageId, false);
     }
 
+    @Override
+    public Page readPage(int cacheId, long pageId) throws IgniteCheckedException {
+        Segment seg = segment(pageId);
+
+        return seg.acquireReadPage(cacheId, pageId, false);
+    }
+
+    @Override
+    public long pageAddr(int cacheId, long pageId) throws IgniteCheckedException {
+        Segment seg = segment(pageId);
+
+        return seg.address(cacheId, pageId);
+    }
+
     /** {@inheritDoc} */
     @Override public Page page(int cacheId, long pageId, boolean restore) throws IgniteCheckedException {
         Segment seg = segment(pageId);
@@ -534,6 +548,10 @@
             GridUnsafe.putLong(lastAllocatedIdxPtr, 0);
         }
 
+        long address(int cacheId, long pageId) {
+            return absolute(pageId) + PageMemoryNoStoreImpl.PAGE_OVERHEAD;
+        }
+
         /**
          * @param pageId Page ID to pin.
          * @return Pinned page impl.
@@ -542,27 +560,53 @@
         private PageNoStoreImpl acquirePage(int cacheId, long pageId, boolean restore) {
             long absPtr = absolute(pageId);
 
-            long marker = GridUnsafe.getLong(absPtr);
+//            long marker = GridUnsafe.getLong(absPtr);
+//
+//            if (marker != PAGE_MARKER)
+//                throw new IllegalStateException("Page was not allocated [absPtr=" + U.hexLong(absPtr) +
+//                    ", cacheId=" + cacheId + ", pageId=" + U.hexLong(pageId) +
+//                    ", marker=" + U.hexLong(marker) + ']');
+//
+//            while (true) {
+//                long pinCnt = GridUnsafe.getLong(absPtr + PIN_CNT_OFFSET);
+//
+//                if (pinCnt < 0)
+//                    throw new IllegalStateException("Page has been deallocated [absPtr=" + U.hexLong(absPtr) +
+//                        ", cacheId=" + cacheId + ", pageId=" + U.hexLong(pageId) + ", pinCnt=" + pinCnt + ']');
+//
+//                if (GridUnsafe.compareAndSwapLong(null, absPtr + PIN_CNT_OFFSET, pinCnt, pinCnt + 1))
+//                    break;
+//            }
+//
+//            acquiredPages.incrementAndGet();
 
-            if (marker != PAGE_MARKER)
-                throw new IllegalStateException("Page was not allocated [absPtr=" + U.hexLong(absPtr) +
-                    ", cacheId=" + cacheId + ", pageId=" + U.hexLong(pageId) +
-                    ", marker=" + U.hexLong(marker) + ']');
+            return new PageNoStoreImpl(PageMemoryNoStoreImpl.this, idx, absPtr, cacheId, pageId, restore, true);
+        }
 
-            while (true) {
-                long pinCnt = GridUnsafe.getLong(absPtr + PIN_CNT_OFFSET);
+        private PageNoStoreImpl acquireReadPage(int cacheId, long pageId, boolean restore) {
+            long absPtr = absolute(pageId);
 
-                if (pinCnt < 0)
-                    throw new IllegalStateException("Page has been deallocated [absPtr=" + U.hexLong(absPtr) +
-                        ", cacheId=" + cacheId + ", pageId=" + U.hexLong(pageId) + ", pinCnt=" + pinCnt + ']');
+//            long marker = GridUnsafe.getLong(absPtr);
+//
+//            if (marker != PAGE_MARKER)
+//                throw new IllegalStateException("Page was not allocated [absPtr=" + U.hexLong(absPtr) +
+//                    ", cacheId=" + cacheId + ", pageId=" + U.hexLong(pageId) +
+//                    ", marker=" + U.hexLong(marker) + ']');
+//
+//            while (true) {
+//                long pinCnt = GridUnsafe.getLong(absPtr + PIN_CNT_OFFSET);
+//
+//                if (pinCnt < 0)
+//                    throw new IllegalStateException("Page has been deallocated [absPtr=" + U.hexLong(absPtr) +
+//                        ", cacheId=" + cacheId + ", pageId=" + U.hexLong(pageId) + ", pinCnt=" + pinCnt + ']');
+//
+//                if (GridUnsafe.compareAndSwapLong(null, absPtr + PIN_CNT_OFFSET, pinCnt, pinCnt + 1))
+//                    break;
+//            }
+//
+//            acquiredPages.incrementAndGet();
 
-                if (GridUnsafe.compareAndSwapLong(null, absPtr + PIN_CNT_OFFSET, pinCnt, pinCnt + 1))
-                    break;
-            }
-
-            acquiredPages.incrementAndGet();
-
-            return new PageNoStoreImpl(PageMemoryNoStoreImpl.this, idx, absPtr, cacheId, pageId, restore);
+            return new PageNoStoreImpl(PageMemoryNoStoreImpl.this, idx, absPtr, cacheId, pageId, restore, false);
         }
 
         /**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageNoStoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageNoStoreImpl.java
index 404c0b2..5fe15a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageNoStoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageNoStoreImpl.java
@@ -57,7 +57,7 @@
      * @param absPtr Absolute pointer.
      */
     public PageNoStoreImpl(
-        PageMemoryNoStoreImpl pageMem, int segIdx, long absPtr, int cacheId, long pageId, boolean noTagCheck
+        PageMemoryNoStoreImpl pageMem, int segIdx, long absPtr, int cacheId, long pageId, boolean noTagCheck, boolean needBuf
     ) {
         this.pageMem = pageMem;
         this.segIdx = segIdx;
@@ -67,7 +67,14 @@
         this.pageId = pageId;
         this.noTagCheck = noTagCheck;
 
-        buf = pageMem.wrapPointer(absPtr + PageMemoryNoStoreImpl.PAGE_OVERHEAD, pageMem.pageSize());
+        if (needBuf)
+            buf = pageMem.wrapPointer(absPtr + PageMemoryNoStoreImpl.PAGE_OVERHEAD, pageMem.pageSize());
+        else
+            buf = null;
+    }
+
+    @Override public long address() {
+        return absPtr + PageMemoryNoStoreImpl.PAGE_OVERHEAD;
     }
 
     /** {@inheritDoc} */
@@ -88,6 +95,13 @@
         return null;
     }
 
+    @Override public long getForReadAddr() {
+        if (pageMem.readLockPage(absPtr, PageIdUtils.tag(pageId)))
+            return absPtr + PageMemoryNoStoreImpl.PAGE_OVERHEAD;
+
+        return -1L;
+    }
+
     /** {@inheritDoc} */
     @Override public void releaseRead() {
         pageMem.readUnlockPage(absPtr);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOffheapEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOffheapEvictionManager.java
index 6c925ad..adf4e2a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOffheapEvictionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOffheapEvictionManager.java
@@ -38,22 +38,24 @@
         touch(txEntry.cached(), null);
     }
 
+    private static final GridCacheVersion VER = new GridCacheVersion();
+
     /** {@inheritDoc} */
     @Override public void touch(GridCacheEntryEx e, AffinityTopologyVersion topVer) {
         if (e.detached())
             return;
 
         try {
-            if (e.markObsoleteIfEmpty(null) || e.obsolete()) {
-                e.context().cache().removeEntry(e);
+//            if (e.markObsoleteIfEmpty(null) || e.obsolete()) {
+//                e.context().cache().removeEntry(e);
+//
+//                return;
+//            }
 
-                return;
-            }
+            boolean evicted = e.evictInternal(VER, null);
 
-            boolean evicted = e.evictInternal(cctx.versions().next(), null);
-
-            if (evicted)
-                cctx.cache().removeEntry(e);
+//            if (evicted)
+//                cctx.cache().removeEntry(e);
         }
         catch (IgniteCheckedException ex) {
             U.error(log, "Failed to evict entry from cache: " + e, ex);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index ea57c34..db5e58a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -54,6 +54,7 @@
 import org.apache.ignite.internal.util.GridAtomicLong;
 import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
 import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
+import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.lang.GridCloseableIterator;
 import org.apache.ignite.internal.util.lang.GridCursor;
 import org.apache.ignite.internal.util.lang.GridIterator;
@@ -1133,6 +1134,28 @@
             return CacheDataRowAdapter.compare(row.key().valueBytes(cctx.cacheObjectContext()), cctx, link);
         }
 
+        @Override
+        protected CacheDataRow getRow(BPlusIO<CacheSearchRow> io, long buf, int idx) throws IgniteCheckedException {
+            int hash = ((RowLinkIO)io).getHash(buf, idx);
+            long link = ((RowLinkIO)io).getLink(buf, idx);
+
+            return rowStore.dataRow(hash, link);
+        }
+
+        @Override
+        protected int compare(BPlusIO<CacheSearchRow> io, long buf, int idx, CacheSearchRow row) throws IgniteCheckedException {
+            int hash = ((RowLinkIO)io).getHash(buf, idx);
+
+            int cmp = Integer.compare(hash, row.hash());
+
+            if (cmp != 0)
+                return cmp;
+
+            long link = ((RowLinkIO)io).getLink(buf, idx);
+
+            return CacheDataRowAdapter.compare(row.key().valueBytes(cctx.cacheObjectContext()), cctx, link);
+        }
+
         /** {@inheritDoc} */
         @Override protected CacheDataRow getRow(BPlusIO<CacheSearchRow> io, ByteBuffer buf, int idx)
             throws IgniteCheckedException {
@@ -1196,12 +1219,16 @@
          */
         public long getLink(ByteBuffer buf, int idx);
 
+        public long getLink(long buf, int idx);
+
         /**
          * @param buf Buffer.
          * @param idx Index.
          * @return Key hash code.
          */
         public int getHash(ByteBuffer buf, int idx);
+
+        public int getHash(long buf, int idx);
     }
 
     /**
@@ -1251,10 +1278,21 @@
             return buf.getLong(offset(idx));
         }
 
+        @Override
+        public long getLink(long buf, int idx) {
+            assert idx < getCount(buf) : idx;
+
+            return GridUnsafe.getLong(buf, offset(idx));
+        }
+
         /** {@inheritDoc} */
         @Override public int getHash(ByteBuffer buf, int idx) {
             return buf.getInt(offset(idx) + 8);
         }
+
+        @Override public int getHash(long buf, int idx) {
+            return GridUnsafe.getInt(buf, offset(idx) + 8);
+        }
     }
 
     /**
@@ -1305,6 +1343,16 @@
         @Override public int getHash(ByteBuffer buf, int idx) {
             return buf.getInt(offset(idx) + 8);
         }
+
+        @Override public long getLink(long buf, int idx) {
+            assert idx < getCount(buf) : idx;
+
+            return GridUnsafe.getLong(buf, offset(idx));
+        }
+
+        @Override public int getHash(long buf, int idx) {
+            return GridUnsafe.getInt(buf, offset(idx) + 8);
+        }
     }
 
     /**
@@ -1415,6 +1463,16 @@
             return Long.compare(link, row.link);
         }
 
+        @Override
+        protected int compare(BPlusIO<PendingRow> io, long buf, int idx, PendingRow row) throws IgniteCheckedException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        protected PendingRow getRow(BPlusIO<PendingRow> io, long buf, int idx) throws IgniteCheckedException {
+            throw new UnsupportedOperationException();
+        }
+
         /** {@inheritDoc} */
         @Override protected PendingRow getRow(BPlusIO<PendingRow> io, ByteBuffer buf, int idx)
             throws IgniteCheckedException {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
index 97c0cf7..89636cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
@@ -30,10 +30,12 @@
 import org.apache.ignite.internal.processors.cache.database.tree.io.DataPageIO;
 import org.apache.ignite.internal.processors.cache.database.tree.io.CacheVersionIO;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.tostring.GridToStringInclude;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
+import sun.misc.Unsafe;
 
 import static org.apache.ignite.internal.pagemem.PageIdUtils.itemId;
 import static org.apache.ignite.internal.pagemem.PageIdUtils.pageId;
@@ -74,6 +76,40 @@
         boolean first = true;
 
         do {
+            long buf = cctx.shared().database().pageMemory().pageAddr(cctx.cacheId(), pageId(nextLink));
+
+            DataPageIO io = DataPageIO.VERSION1;
+
+            long addr = io.getPositionOnPayload(buf, itemId(nextLink));
+
+            addr = addr + buf;
+
+            int len = GridUnsafe.UNSAFE.getInt(addr);
+
+            //byte type = buf.get();
+
+            int size = Math.min(bytes.length, len);
+
+            addr += 5;
+
+            for (int i = 0; i < size; i++) {
+                byte b1 = GridUnsafe.UNSAFE.getByte(addr++);
+                byte b2 = bytes[i];
+
+                if (b1 != b2)
+                    return b1 > b2 ? 1 : -1;
+            }
+
+            return Integer.compare(len, bytes.length);
+        }
+        while(nextLink != 0);
+    }
+
+    public static int compare0(byte[] bytes, GridCacheContext<?, ?> cctx, long link) throws IgniteCheckedException {
+        long nextLink = link;
+        boolean first = true;
+
+        do {
             try (Page page = page(pageId(nextLink), cctx)) {
                 ByteBuffer buf = page.getForRead(); // Non-empty data page must not be recycled.
 
@@ -134,6 +170,35 @@
         IncompleteObject<?> incomplete = null;
         boolean first = true;
 
+        long buf = cctx.shared().database().pageMemory().pageAddr(cctx.cacheId(), pageId(nextLink));
+
+        DataPageIO io = DataPageIO.VERSION1;
+
+        long addr = io.getPositionOnPayload(buf, itemId(nextLink));
+
+        addr = addr + buf;
+
+        readFullRow(coctx, addr, keyOnly);
+    }
+
+    /**
+     * Read row from data pages.
+     *
+     * @param cctx Cache context.
+     * @param keyOnly {@code true} If need to read only key object.
+     * @throws IgniteCheckedException If failed.
+     */
+    public final void initFromLink0(GridCacheContext<?, ?> cctx, boolean keyOnly) throws IgniteCheckedException {
+        assert cctx != null : "cctx";
+        assert link != 0 : "link";
+        assert key == null : "key";
+
+        final CacheObjectContext coctx = cctx.cacheObjectContext();
+
+        long nextLink = link;
+        IncompleteObject<?> incomplete = null;
+        boolean first = true;
+
         do {
             try (Page page = page(pageId(nextLink), cctx)) {
                 ByteBuffer buf = page.getForRead(); // Non-empty data page must not be recycled.
@@ -247,6 +312,37 @@
         //assert isReady(): "ready";
     }
 
+    private void readFullRow(CacheObjectContext coctx, long buf, boolean keyOnly) throws IgniteCheckedException {
+//        key = coctx.processor().toKeyCacheObject(coctx, buf);
+//
+//        if (keyOnly) {
+//            assert key != null: "key";
+//
+//            return;
+//        }
+        int len = GridUnsafe.getInt(buf, 0);
+
+        // skip key type and bytes.
+        buf += len + 5;
+
+        len = GridUnsafe.getInt(buf, 0);
+        byte type = GridUnsafe.getByte(buf, 4);
+
+        byte[] data = U.copyMemory(buf + 5, len);
+
+        val = coctx.processor().toCacheObject(coctx, type, data);
+
+        buf += len + 5;
+
+        ver = CacheVersionIO.read(buf, false);
+
+        buf += 25;
+
+        expireTime = GridUnsafe.getLong(buf);
+
+        //assert isReady(): "ready";
+    }
+
     /**
      * @param coctx Cache object context.
      * @param buf Buffer.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/DataStructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/DataStructure.java
index 5fd64b0..724196d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/DataStructure.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/DataStructure.java
@@ -129,6 +129,13 @@
         return pageMem.page(cacheId, pageId);
     }
 
+    protected final Page pageForRead(long pageId) throws IgniteCheckedException {
+        assert PageIdUtils.flag(pageId) == FLAG_IDX && PageIdUtils.partId(pageId) == INDEX_PARTITION ||
+            PageIdUtils.flag(pageId) == FLAG_DATA && PageIdUtils.partId(pageId) <= MAX_PARTITION_ID : U.hexLong(pageId);
+
+        return pageMem.readPage(cacheId, pageId);
+    }
+
     /**
      * @param page Page.
      * @return Buffer.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java
index 26151ac..2f0258d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java
@@ -211,6 +211,16 @@
             return Integer.compare(len, row.idxName.length);
         }
 
+        @Override
+        protected int compare(BPlusIO<IndexItem> io, long buf, int idx, IndexItem row) throws IgniteCheckedException {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        protected IndexItem getRow(BPlusIO<IndexItem> io, long buf, int idx) throws IgniteCheckedException {
+            throw new UnsupportedOperationException();
+        }
+
         /** {@inheritDoc} */
         @Override protected IndexItem getRow(final BPlusIO<IndexItem> io, final ByteBuffer buf,
             final int idx) throws IgniteCheckedException {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java
index 74887dd..834a6bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java
@@ -56,6 +56,7 @@
 import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseBag;
 import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList;
 import org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler;
+import org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler2;
 import org.apache.ignite.internal.util.GridArrays;
 import org.apache.ignite.internal.util.GridLongList;
 import org.apache.ignite.internal.util.lang.GridCursor;
@@ -293,6 +294,82 @@
     };
 
     /** */
+    private final GetPageHandler2<Get> search2 = new GetPageHandler2<Get>() {
+        @Override public Result run0(Page page, long buf, BPlusIO<L> io, Get g, int lvl)
+            throws IgniteCheckedException {
+            // Check the triangle invariant.
+            if (io.getForward(buf) != g.fwdId)
+                return RETRY;
+
+            boolean needBackIfRouting = g.backId != 0;
+
+            g.backId = 0; // Usually we'll go left down and don't need it.
+
+            int cnt = io.getCount(buf);
+            int idx = findInsertionPoint(io, buf, 0, cnt, g.row, g.shift);
+
+            boolean found = idx >= 0;
+
+            if (found) { // Found exact match.
+                assert g.getClass() != GetCursor.class;
+
+                if (g.found(io, buf, idx, lvl))
+                    return FOUND;
+
+                // Else we need to reach leaf page, go left down.
+            }
+            else {
+                idx = fix(idx);
+
+                if (g.notFound(io, buf, idx, lvl)) // No way down, stop here.
+                    return NOT_FOUND;
+            }
+
+            assert !io.isLeaf();
+
+            // If idx == cnt then we go right down, else left down: getLeft(cnt) == getRight(cnt - 1).
+            g.pageId = inner(io).getLeft(buf, idx);
+
+            // If we see the tree in consistent state, then our right down page must be forward for our left down page,
+            // we need to setup fwdId and/or backId to be able to check this invariant on lower level.
+            if (idx < cnt) {
+                // Go left down here.
+                g.fwdId = inner(io).getRight(buf, idx);
+            }
+            else {
+                // Go right down here or it is an empty branch.
+                assert idx == cnt;
+
+                // Here child's forward is unknown to us (we either go right or it is an empty "routing" page),
+                // need to ask our forward about the child's forward (it must be leftmost child of our forward page).
+                // This is ok from the locking standpoint because we take all locks in the forward direction.
+                long fwdId = io.getForward(buf);
+
+                // Setup fwdId.
+                if (fwdId == 0)
+                    g.fwdId = 0;
+                else {
+                    // We can do askNeighbor on forward page here because we always take locks in forward direction.
+                    Result res = askNeighbor(fwdId, g, false);
+
+                    if (res != FOUND)
+                        return res; // Retry.
+                }
+
+                // Setup backId.
+                if (cnt != 0) // It is not a routing page and we are going to the right, can get backId here.
+                    g.backId = inner(io).getLeft(buf, cnt - 1);
+                else if (needBackIfRouting) {
+                    // Can't get backId here because of possible deadlock and it is only needed for remove operation.
+                    return GO_DOWN_X;
+                }
+            }
+
+            return GO_DOWN;
+        }
+    };
+
+    /** */
     private final GetPageHandler<Put> replace = new GetPageHandler<Put>() {
         @Override public Result run0(Page page, ByteBuffer buf, BPlusIO<L> io, Put p, int lvl)
             throws IgniteCheckedException {
@@ -817,7 +894,7 @@
      */
     private Result findDown(final Get g, final long pageId, final long fwdId, final int lvl)
         throws IgniteCheckedException {
-        Page page = page(pageId);
+        Page page = pageForRead(pageId);
 
         try {
             for (;;) {
@@ -825,7 +902,7 @@
                 g.pageId = pageId;
                 g.fwdId = fwdId;
 
-                Result res = readPage(page, this, search, g, lvl, RETRY);
+                Result res = PageHandler2.readPage(page, this, search2, g, lvl, RETRY);
 
                 switch (res) {
                     case GO_DOWN:
@@ -1915,20 +1992,26 @@
 
             Page meta = metaPage();
 
-            ByteBuffer buf = readLock(meta); // Meta can't be removed.
+//            ByteBuffer buf = readLock(meta); // Meta can't be removed.
+//
+//            assert buf != null : "Failed to read lock meta page [page=" + meta + ", metaPageId=" +
+//                U.hexLong(meta.id()) + ']';
+//
+//            try {
+//                BPlusMetaIO io = BPlusMetaIO.VERSIONS.forPage(buf);
+//
+//                rootLvl = io.getRootLevel(buf);
+//                rootId = io.getFirstPageId(buf, rootLvl);
+//            }
+//            finally {
+//                readUnlock(meta, buf);
+//            }
+            BPlusMetaIO io = BPlusMetaIO.VERSION1;
 
-            assert buf != null : "Failed to read lock meta page [page=" + meta + ", metaPageId=" +
-                U.hexLong(meta.id()) + ']';
+            long buf = meta.address();
 
-            try {
-                BPlusMetaIO io = BPlusMetaIO.VERSIONS.forPage(buf);
-
-                rootLvl = io.getRootLevel(buf);
-                rootId = io.getFirstPageId(buf, rootLvl);
-            }
-            finally {
-                readUnlock(meta, buf);
-            }
+            rootLvl = io.getRootLevel(buf);
+            rootId = io.getFirstPageId(buf, rootLvl);
 
             restartFromRoot(rootId, rootLvl, globalRmvId.get());
         }
@@ -1958,6 +2041,12 @@
             return lvl == 0; // Stop if we are at the bottom.
         }
 
+        boolean found(BPlusIO<L> io, long buf, int idx, int lvl) throws IgniteCheckedException {
+            assert lvl >= 0;
+
+            return lvl == 0; // Stop if we are at the bottom.
+        }
+
         /**
          * @param io IO.
          * @param buf Buffer.
@@ -1972,6 +2061,12 @@
             return lvl == 0; // Stop if we are at the bottom.
         }
 
+        boolean notFound(BPlusIO<L> io, long buf, int idx, int lvl) throws IgniteCheckedException {
+            assert lvl >= 0;
+
+            return lvl == 0; // Stop if we are at the bottom.
+        }
+
         /**
          * @param page Page.
          * @param lvl Level.
@@ -2003,6 +2098,16 @@
 
             return true;
         }
+
+        @Override boolean found(BPlusIO<L> io, long buf, int idx, int lvl) throws IgniteCheckedException {
+            // Check if we are on an inner page and can't get row from it.
+            if (lvl != 0 && !canGetRowFromInner)
+                return false;
+
+            row = getRow(io, buf, idx);
+
+            return true;
+        }
     }
 
     /**
@@ -3346,6 +3451,32 @@
         return -(low + 1);  // Not found.
     }
 
+    private int findInsertionPoint(BPlusIO<L> io, long buf, int low, int cnt, L row, int shift)
+        throws IgniteCheckedException {
+        assert row != null;
+
+        int high = cnt - 1;
+
+        while (low <= high) {
+            int mid = (low + high) >>> 1;
+
+            int cmp = compare(io, buf, mid, row);
+
+            if (cmp == 0)
+                cmp = -shift; // We need to fix the case when search row matches multiple data rows.
+
+            //noinspection Duplicates
+            if (cmp < 0)
+                low = mid + 1;
+            else if (cmp > 0)
+                high = mid - 1;
+            else
+                return mid; // Found.
+        }
+
+        return -(low + 1);  // Not found.
+    }
+
     /**
      * @param buf Buffer.
      * @return IO.
@@ -3398,6 +3529,8 @@
      */
     protected abstract int compare(BPlusIO<L> io, ByteBuffer buf, int idx, L row) throws IgniteCheckedException;
 
+    protected abstract int compare(BPlusIO<L> io, long buf, int idx, L row) throws IgniteCheckedException;
+
     /**
      * Get the full detached row. Can be called on inner page only if {@link #canGetRowFromInner} is {@code true}.
      *
@@ -3409,6 +3542,8 @@
      */
     protected abstract T getRow(BPlusIO<L> io, ByteBuffer buf, int idx) throws IgniteCheckedException;
 
+    protected abstract T getRow(BPlusIO<L> io, long buf, int idx) throws IgniteCheckedException;
+
     /**
      * Forward cursor.
      */
@@ -3715,6 +3850,42 @@
         }
     }
 
+    private abstract class GetPageHandler2<G extends Get> extends PageHandler2<G, Result> {
+        /** {@inheritDoc} */
+        @SuppressWarnings("unchecked")
+        @Override public final Result run(Page page, PageIO iox, long buf, G g, int lvl)
+            throws IgniteCheckedException {
+            assert PageIO.getPageId(buf) == page.id();
+
+            // If we've passed the check for correct page ID, we can safely cast.
+            BPlusIO<L> io = (BPlusIO<L>)iox;
+
+            // In case of intersection with inner replace in remove operation
+            // we need to restart our operation from the tree root.
+            if (lvl == 0 && g.rmvId < io.getRemoveId(buf))
+                return RETRY_ROOT;
+
+            return run0(page, buf, io, g, lvl);
+        }
+
+        /**
+         * @param page Page.
+         * @param buf Buffer.
+         * @param io IO.
+         * @param g Operation.
+         * @param lvl Level.
+         * @return Result code.
+         * @throws IgniteCheckedException If failed.
+         */
+        protected abstract Result run0(Page page, long buf, BPlusIO<L> io, G g, int lvl)
+            throws IgniteCheckedException;
+
+        /** {@inheritDoc} */
+        @Override public final boolean releaseAfterWrite(Page page, G g, int lvl) {
+            return g.canRelease(page, lvl);
+        }
+    }
+
     /**
      * Reuse bag for destroy.
      */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusIO.java
index 5fc3d25..86c80cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusIO.java
@@ -20,6 +20,7 @@
 import java.nio.ByteBuffer;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.cache.database.tree.BPlusTree;
+import org.apache.ignite.internal.util.GridUnsafe;
 
 /**
  * Abstract IO routines for B+Tree pages.
@@ -88,6 +89,10 @@
         return buf.getLong(FORWARD_OFF);
     }
 
+    public final long getForward(long buf) {
+        return GridUnsafe.getLong(buf, FORWARD_OFF);
+    }
+
     /**
      * @param buf Buffer.
      * @param pageId Forward page ID.
@@ -106,6 +111,10 @@
         return buf.getLong(REMOVE_ID_OFF);
     }
 
+    public final long getRemoveId(long buf) {
+        return GridUnsafe.getLong(buf, REMOVE_ID_OFF);
+    }
+
     /**
      * @param buf Buffer.
      * @param rmvId Remove ID.
@@ -128,6 +137,14 @@
         return cnt;
     }
 
+    public final int getCount(long buf) {
+        int cnt = GridUnsafe.getShort(buf, CNT_OFF) & 0xFFFF;
+
+        assert cnt >= 0: cnt;
+
+        return cnt;
+    }
+
     /**
      * @param buf Buffer.
      * @param cnt Count.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusInnerIO.java
index 90b0f37..92909ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusInnerIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusInnerIO.java
@@ -20,6 +20,7 @@
 import java.nio.ByteBuffer;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler;
+import org.apache.ignite.internal.util.GridUnsafe;
 
 /**
  * Abstract IO routines for B+Tree inner pages.
@@ -61,6 +62,10 @@
         return buf.getLong(offset(idx, SHIFT_LEFT));
     }
 
+    public final long getLeft(long buf, int idx) {
+        return GridUnsafe.getLong(buf, offset(idx, SHIFT_LEFT));
+    }
+
     /**
      * @param buf Buffer.
      * @param idx Index.
@@ -81,6 +86,10 @@
         return buf.getLong(offset(idx, SHIFT_RIGHT));
     }
 
+    public final long getRight(long buf, int idx) {
+        return GridUnsafe.getLong(buf, offset(idx, SHIFT_RIGHT));
+    }
+
     /**
      * @param buf Buffer.
      * @param idx Index.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusMetaIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusMetaIO.java
index 15a49ef..ec3a3dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusMetaIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusMetaIO.java
@@ -18,14 +18,17 @@
 package org.apache.ignite.internal.processors.cache.database.tree.io;
 
 import java.nio.ByteBuffer;
+import org.apache.ignite.internal.util.GridUnsafe;
 
 /**
  * IO routines for B+Tree meta pages.
  */
 public class BPlusMetaIO extends PageIO {
+    public static final BPlusMetaIO VERSION1 = new BPlusMetaIO(1);
+
     /** */
     public static final IOVersions<BPlusMetaIO> VERSIONS = new IOVersions<>(
-        new BPlusMetaIO(1)
+        VERSION1
     );
 
     /** */
@@ -58,6 +61,10 @@
         return buf.get(LVLS_OFF);
     }
 
+    public int getLevelsCount(long buf) {
+        return GridUnsafe.getByte(buf, LVLS_OFF);
+    }
+
     /**
      * @param buf Buffer.
      * @return Max levels possible for this page size.
@@ -95,6 +102,10 @@
         return buf.getLong(offset(lvl));
     }
 
+    public long getFirstPageId(long buf, int lvl) {
+        return GridUnsafe.getLong(buf, offset(lvl));
+    }
+
     /**
      * @param buf    Buffer.
      * @param lvl    Level.
@@ -120,6 +131,14 @@
         return lvls - 1;
     }
 
+    public int getRootLevel(long buf) {
+        int lvls = getLevelsCount(buf); // The highest level page is root.
+
+        assert lvls > 0 : lvls;
+
+        return lvls - 1;
+    }
+
     /**
      * @param buf Buffer.
      * @param rootPageId New root page ID.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/CacheVersionIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/CacheVersionIO.java
index 5e9fd6d..2d57ec4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/CacheVersionIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/CacheVersionIO.java
@@ -20,6 +20,7 @@
 import java.nio.ByteBuffer;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridUnsafe;
 
 /**
  * Utility to read and write {@link GridCacheVersion} instances.
@@ -139,4 +140,18 @@
 
         return new GridCacheVersion(topVer, nodeOrderDrId, globalTime, order);
     }
+
+    public static GridCacheVersion read(long buf, boolean allowNull) throws IgniteCheckedException {
+        byte protoVer = checkProtocolVersion(GridUnsafe.getByte(buf, 0), allowNull);
+
+        if (protoVer == NULL_PROTO_VER)
+            return null;
+
+        int topVer = GridUnsafe.getInt(buf, 1);
+        int nodeOrderDrId = GridUnsafe.getInt(buf, 5);
+        long globalTime = GridUnsafe.getInt(buf, 9);
+        long order = GridUnsafe.getInt(buf, 17);
+
+        return new GridCacheVersion(topVer, nodeOrderDrId, globalTime, order);
+    }
 }
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java
index a69caab..e78277f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java
@@ -27,15 +27,18 @@
 import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
 import org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler;
 import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridUnsafe;
 import org.apache.ignite.internal.util.typedef.internal.SB;
+import sun.misc.Unsafe;
 
 /**
  * Data pages IO.
  */
 public class DataPageIO extends PageIO {
+    public static final DataPageIO VERSION1 = new DataPageIO(1);
     /** */
     public static final IOVersions<DataPageIO> VERSIONS = new IOVersions<>(
-        new DataPageIO(1)
+        VERSION1
     );
 
     /** */
@@ -245,6 +248,10 @@
         return buf.get(DIRECT_CNT_OFF) & 0xFF;
     }
 
+    private int getDirectCount(long buf) {
+        return GridUnsafe.UNSAFE.getByte(buf + DIRECT_CNT_OFF) & 0xFF;
+    }
+
     /**
      * @param buf Buffer.
      * @param cnt Indirect count.
@@ -279,6 +286,10 @@
         return buf.get(INDIRECT_CNT_OFF) & 0xFF;
     }
 
+    private int getIndirectCount(long buf) {
+        return GridUnsafe.UNSAFE.getByte(buf + INDIRECT_CNT_OFF) & 0xFF;
+    }
+
     /**
      * @param buf Buffer.
      * @return Number of free entry slots.
@@ -314,6 +325,26 @@
         throw new IllegalStateException("Item not found: " + itemId);
     }
 
+    private int findIndirectItemIndex(long buf, int itemId, int directCnt, int indirectCnt) {
+        int low = directCnt;
+        int high = directCnt + indirectCnt - 1;
+
+        while (low <= high) {
+            int mid = (low + high) >>> 1;
+
+            int cmp = Integer.compare(itemId(getItem(buf, mid)), itemId);
+
+            if (cmp < 0)
+                low = mid + 1;
+            else if (cmp > 0)
+                high = mid - 1;
+            else
+                return mid; // found
+        }
+
+        throw new IllegalStateException("Item not found: " + itemId);
+    }
+
     /**
      * @param buf Buffer.
      * @return String representation.
@@ -390,6 +421,27 @@
         return b.toString();
     }
 
+    private int getDataOffset(long buf, int itemId) {
+        assert checkIndex(itemId): itemId;
+
+        int directCnt = getDirectCount(buf);
+
+        if (itemId >= directCnt) { // Need to do indirect lookup.
+            int indirectCnt = getIndirectCount(buf);
+
+            int indirectItemIdx = findIndirectItemIndex(buf, itemId, directCnt, indirectCnt);
+
+            assert indirectItemIdx >= directCnt : indirectItemIdx + " " + directCnt;
+            assert indirectItemIdx < directCnt + indirectCnt: indirectItemIdx + " " + directCnt + " " + indirectCnt;
+
+            itemId = directItemIndex(getItem(buf, indirectItemIdx));
+
+            assert itemId >= 0 && itemId < directCnt: itemId + " " + directCnt + " " + indirectCnt; // Direct item.
+        }
+
+        return directItemToOffset(getItem(buf, itemId));
+    }
+
     /**
      * @param buf Buffer.
      * @param itemId Fixed item ID (the index used for referencing an entry from the outside).
@@ -463,6 +515,19 @@
         return nextLink;
     }
 
+    public long getPositionOnPayload(long buf, final int itemId) {
+        int dataOff = getDataOffset(buf, itemId);
+
+        boolean fragmented = false;
+        //long nextLink = 0;//fragmented ? getNextFragmentLink(buf, dataOff) : 0;
+
+//        int payloadSize = getPageEntrySize(buf, dataOff, 0);
+//        buf.position(dataOff + PAYLOAD_LEN_SIZE + (fragmented ? LINK_SIZE : 0));
+//        buf.limit(buf.position() + payloadSize);
+
+        return dataOff + PAYLOAD_LEN_SIZE;
+    }
+
     /**
      * @param buf Buffer.
      * @param idx Item index.
@@ -472,6 +537,10 @@
         return buf.getShort(itemOffset(idx));
     }
 
+    private short getItem(long buf, int idx) {
+        return GridUnsafe.UNSAFE.getShort(buf + itemOffset(idx));
+    }
+
     /**
      * @param buf Buffer.
      * @param idx Item index.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java
index aa2d368..38f1520 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java
@@ -27,6 +27,7 @@
 import org.apache.ignite.internal.processors.cache.database.freelist.io.PagesListNodeIO;
 import org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler;
 import org.apache.ignite.internal.processors.cache.database.tree.util.PageLockListener;
+import org.apache.ignite.internal.util.GridUnsafe;
 
 /**
  * Base format for all the page types.
@@ -170,6 +171,10 @@
         return buf.getShort(TYPE_OFF) & 0xFFFF;
     }
 
+    public static int getType(long buf) {
+        return GridUnsafe.UNSAFE.getShort(buf + TYPE_OFF) & 0xFFFF;
+    }
+
     /**
      * @param buf Buffer.
      * @param type Type.
@@ -206,6 +211,10 @@
         return buf.getLong(PAGE_ID_OFF);
     }
 
+    public static long getPageId(long buf) {
+        return GridUnsafe.getLong(buf, PAGE_ID_OFF);
+    }
+
     /**
      * @param buf Buffer.
      * @param pageId Page ID.
@@ -303,6 +312,12 @@
         return getPageIO(type, ver);
     }
 
+    public static <Q extends PageIO> Q getPageIO(long buf) throws IgniteCheckedException {
+        int type = getType(buf);
+
+        return getPageIO(type, 1);
+    }
+
     /**
      * @param type IO Type.
      * @param ver IO Version.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/util/PageHandler2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/util/PageHandler2.java
new file mode 100644
index 0000000..b23e4ef
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/util/PageHandler2.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.database.tree.util;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.Page;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.record.delta.InitNewPageRecord;
+import org.apache.ignite.internal.processors.cache.database.tree.io.PageIO;
+import org.apache.ignite.internal.util.GridUnsafe;
+import sun.nio.ch.DirectBuffer;
+
+import static java.lang.Boolean.FALSE;
+import static java.lang.Boolean.TRUE;
+
+/**
+ * Page handler.
+ */
+public abstract class PageHandler2<X, R> {
+    /** */
+    private static final PageHandler2<Void, Boolean> NOOP = new PageHandler2<Void, Boolean>() {
+        @Override public Boolean run(Page page, PageIO io, long buf, Void arg, int intArg)
+            throws IgniteCheckedException {
+            return TRUE;
+        }
+    };
+
+    /**
+     * @param page Page.
+     * @param io IO.
+     * @param buf Page buffer.
+     * @param arg Argument.
+     * @param intArg Argument of type {@code int}.
+     * @return Result.
+     * @throws IgniteCheckedException If failed.
+     */
+    public abstract R run(Page page, PageIO io, long buf, X arg, int intArg)
+        throws IgniteCheckedException;
+
+    /**
+     * @param page Page.
+     * @param arg Argument.
+     * @param intArg Argument of type {@code int}.
+     * @return {@code true} If release.
+     */
+    public boolean releaseAfterWrite(Page page, X arg, int intArg) {
+        return true;
+    }
+
+    /**
+     * @param page Page.
+     * @param h Handler.
+     * @param arg Argument.
+     * @param intArg Argument of type {@code int}.
+     * @param lockFailed Result in case of lock failure due to page recycling.
+     * @return Handler result.
+     * @throws IgniteCheckedException If failed.
+     */
+    public static <X, R> R readPage(
+        Page page,
+        PageLockListener lockListener,
+        PageHandler2<X, R> h,
+        X arg,
+        int intArg,
+        R lockFailed
+    ) throws IgniteCheckedException {
+        long buf = readLock(page, lockListener);
+
+        if (buf == -1L)
+            return lockFailed;
+
+        try {
+            PageIO io = PageIO.getPageIO(buf);
+
+            return h.run(page, io, buf, arg, intArg);
+        }
+        finally {
+            readUnlock(page, buf, lockListener);
+        }
+    }
+
+    /**
+     * @param page Page.
+     * @param h Handler.
+     * @param arg Argument.
+     * @param intArg Argument of type {@code int}.
+     * @param lockFailed Result in case of lock failure due to page recycling.
+     * @return Handler result.
+     * @throws IgniteCheckedException If failed.
+     */
+    public static <X, R> R writePage(
+        Page page,
+        PageLockListener lockListener,
+        PageHandler2<X, R> h,
+        X arg,
+        int intArg,
+        R lockFailed
+    ) throws IgniteCheckedException {
+        return writePage(page, lockListener, h, null, null, arg, intArg, lockFailed);
+    }
+
+    /**
+     * @param page Page.
+     * @param lockListener Lock listener.
+     * @param init IO for new page initialization or {@code null} if it is an existing page.
+     * @throws IgniteCheckedException If failed.
+     */
+    public static void initPage(
+        Page page,
+        PageLockListener lockListener,
+        PageIO init,
+        IgniteWriteAheadLogManager wal
+    ) throws IgniteCheckedException {
+        Boolean res = writePage(page, lockListener, NOOP, init, wal, null, 0, FALSE);
+
+        assert res == TRUE : res; // It must be newly allocated page, can't be recycled.
+    }
+
+    /**
+     * @param page Page.
+     * @param lockListener Lock listener.
+     * @return Byte buffer or {@code null} if failed to lock due to recycling.
+     */
+    public static long readLock(Page page, PageLockListener lockListener) {
+        long buf = page.getForReadAddr();
+
+        return buf;
+    }
+
+    /**
+     * @param page Page.
+     * @param buf Page buffer.
+     * @param lockListener Lock listener.
+     */
+    public static void readUnlock(Page page, long buf, PageLockListener lockListener) {
+        //lockListener.onReadUnlock(page, buf);
+
+        page.releaseRead();
+    }
+
+    /**
+     * @param page Page.
+     * @param lockListener Lock listener.
+     * @param tryLock Only try to lock without waiting.
+     * @return Byte buffer or {@code null} if failed to lock due to recycling.
+     */
+    public static ByteBuffer writeLock(Page page, PageLockListener lockListener, boolean tryLock) {
+        lockListener.onBeforeWriteLock(page);
+
+        ByteBuffer buf = tryLock ? page.tryGetForWrite() : page.getForWrite();
+
+        lockListener.onWriteLock(page, buf);
+
+        return buf;
+    }
+
+    /**
+     * @param page Page.
+     * @param buf Page buffer.
+     * @param lockListener Lock listener.
+     * @param dirty Page is dirty.
+     */
+    public static void writeUnlock(Page page, ByteBuffer buf, PageLockListener lockListener, boolean dirty) {
+        lockListener.onWriteUnlock(page, buf);
+
+        page.releaseWrite(dirty);
+    }
+
+    /**
+     * @param page Page.
+     * @param lockListener Lock listener.
+     * @param h Handler.
+     * @param init IO for new page initialization or {@code null} if it is an existing page.
+     * @param arg Argument.
+     * @param intArg Argument of type {@code int}.
+     * @param lockFailed Result in case of lock failure due to page recycling.
+     * @return Handler result.
+     * @throws IgniteCheckedException If failed.
+     */
+    public static <X, R> R writePage(
+        Page page,
+        PageLockListener lockListener,
+        PageHandler2<X, R> h,
+        PageIO init,
+        IgniteWriteAheadLogManager wal,
+        X arg,
+        int intArg,
+        R lockFailed
+    ) throws IgniteCheckedException {
+        ByteBuffer buf = writeLock(page, lockListener, false);
+
+        if (buf == null)
+            return lockFailed;
+
+        R res;
+
+        boolean ok = false;
+
+        try {
+            if (init != null) // It is a new page and we have to initialize it.
+                doInitPage(page, buf, init, wal);
+            else
+                init = PageIO.getPageIO(buf);
+
+            res = null;//h.run(page, init, buf, arg, intArg);
+
+            ok = true;
+        }
+        finally {
+            assert PageIO.getCrc(buf) == 0; //TODO GG-11480
+
+            if (h.releaseAfterWrite(page, arg, intArg))
+                writeUnlock(page, buf, lockListener, ok);
+        }
+
+        return res;
+    }
+
+    /**
+     * @param page Page.
+     * @param buf Buffer.
+     * @param init Initial IO.
+     * @param wal Write ahead log.
+     * @throws IgniteCheckedException If failed.
+     */
+    private static void doInitPage(
+        Page page,
+        ByteBuffer buf,
+        PageIO init,
+        IgniteWriteAheadLogManager wal
+    ) throws IgniteCheckedException {
+        assert PageIO.getCrc(buf) == 0; //TODO GG-11480
+
+        long pageId = page.id();
+
+        init.initNewPage(buf, pageId);
+
+        // Here we should never write full page, because it is known to be new.
+        page.fullPageWalRecordPolicy(FALSE);
+
+        if (isWalDeltaRecordNeeded(wal, page))
+            wal.log(new InitNewPageRecord(page.fullId().cacheId(), page.id(),
+                init.getType(), init.getVersion(), pageId));
+    }
+
+    /**
+     * @param wal Write ahead log.
+     * @param page Page.
+     * @return {@code true} If we need to make a delta WAL record for the change in this page.
+     */
+    public static boolean isWalDeltaRecordNeeded(IgniteWriteAheadLogManager wal, Page page) {
+        // If the page is clean, then it is either newly allocated or just after checkpoint.
+        // In both cases we have to write full page contents to WAL.
+        return wal != null && !wal.isAlwaysWriteFullPages() && page.fullPageWalRecordPolicy() != TRUE &&
+            (page.fullPageWalRecordPolicy() == FALSE || page.isDirty());
+    }
+
+    /**
+     * @param src Source.
+     * @param dst Destination.
+     * @param srcOff Source offset in bytes.
+     * @param dstOff Destination offset in bytes.
+     * @param cnt Bytes count to copy.
+     */
+    public static void copyMemory(ByteBuffer src, ByteBuffer dst, long srcOff, long dstOff, long cnt) {
+        byte[] srcArr = src.hasArray() ? src.array() : null;
+        byte[] dstArr = dst.hasArray() ? dst.array() : null;
+        long srcArrOff = src.hasArray() ? src.arrayOffset() + GridUnsafe.BYTE_ARR_OFF : 0;
+        long dstArrOff = dst.hasArray() ? dst.arrayOffset() + GridUnsafe.BYTE_ARR_OFF : 0;
+
+        long srcPtr = src.isDirect() ? ((DirectBuffer)src).address() : 0;
+        long dstPtr = dst.isDirect() ? ((DirectBuffer)dst).address() : 0;
+
+        GridUnsafe.copyMemory(srcArr, srcPtr + srcArrOff + srcOff, dstArr, dstPtr + dstArrOff + dstOff, cnt);
+    }
+
+    public static void zeroMemory(ByteBuffer buf, int off, int length) {
+        if (buf.isDirect())
+            GridUnsafe.setMemory(((DirectBuffer)buf).address() + off, length, (byte)0);
+
+        else {
+            for (int i = off; i < off + length; i++)
+                buf.put(i, (byte)0);
+
+        }
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
index 1f7a53c..3e554d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
@@ -44,7 +44,7 @@
  */
 public abstract class GridUnsafe {
     /** Unsafe. */
-    private static final Unsafe UNSAFE = unsafe();
+    public static final Unsafe UNSAFE = unsafe();
 
     /** Unaligned flag. */
     private static final boolean UNALIGNED = unaligned();
@@ -86,6 +86,22 @@
         // No-op.
     }
 
+    public static long getLong(long buf, int off) {
+        return UNSAFE.getLong(buf + off);
+    }
+
+    public static int getInt(long buf, int off) {
+        return UNSAFE.getInt(buf + off);
+    }
+
+    public static byte getByte(long buf, int off) {
+        return UNSAFE.getByte(buf + off);
+    }
+
+    public static short getShort(long buf, int off) {
+        return UNSAFE.getShort(buf + off);
+    }
+
     /**
      * Gets boolean value from object field.
      *
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/OffheapReadWriteLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/OffheapReadWriteLock.java
index 8197bff..948d07a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/OffheapReadWriteLock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/OffheapReadWriteLock.java
@@ -119,7 +119,7 @@
                     return false;
 
                 if (canReadLock(state)) {
-                    if (GridUnsafe.compareAndSwapLong(null, lock, state, updateState(state, 1, 0, 0)))
+                    if (GridUnsafe.compareAndSwapLong(null, lock, state, incLock(state)))
                         return true;
                     else
                         // Retry CAS, do not count as spin cycle.
@@ -153,11 +153,13 @@
         while (true) {
             long state = GridUnsafe.getLongVolatile(null, lock);
 
-            if (lockCount(state) <= 0)
+            int lockCnt = lockCount(state);
+
+            if (lockCnt <= 0)
                 throw new IllegalMonitorStateException("Attempted to release a read lock while not holding it " +
                     "[lock=" + U.hexLong(lock) + ", state=" + U.hexLong(state) + ']');
 
-            long updated = updateState(state, -1, 0, 0);
+            long updated = decLock(state, lockCnt);
 
             assert updated != 0;
 
@@ -582,6 +584,35 @@
         return buildState(writersWait, readersWait, tag, lock);
     }
 
+    private long incLock(long state) {
+        int lock = lockCount(state);
+        int tag = tag(state);
+        int readersWait = readersWaitCount(state);
+        int writersWait = writersWaitCount(state);
+
+        lock++;
+
+        assert readersWait >= 0 : readersWait;
+        assert writersWait >= 0 : writersWait;
+        assert lock >= -1;
+
+        return buildState(writersWait, readersWait, tag, lock);
+    }
+
+    private long decLock(long state, int lock) {
+        int tag = tag(state);
+        int readersWait = readersWaitCount(state);
+        int writersWait = writersWaitCount(state);
+
+        lock--;
+
+        assert readersWait >= 0 : readersWait;
+        assert writersWait >= 0 : writersWait;
+        assert lock >= -1;
+
+        return buildState(writersWait, readersWait, tag, lock);
+    }
+
     /**
      * @param state State to update.
      * @return Modified state.
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
index 4bc39ea..9dbeadf 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
@@ -1206,6 +1206,16 @@
             return Long.compare(n1, n2);
         }
 
+        @Override
+        protected int compare(BPlusIO<Long> io, long buf, int idx, Long row) throws IgniteCheckedException {
+            return 0;
+        }
+
+        @Override
+        protected Long getRow(BPlusIO<Long> io, long buf, int idx) throws IgniteCheckedException {
+            return null;
+        }
+
         /** {@inheritDoc} */
         @Override protected Long getRow(BPlusIO<Long> io, ByteBuffer buf, int idx) throws IgniteCheckedException {
             assert io.canGetRow() : io;