opt2
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/Page.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/Page.java
index e08fad6..5276acb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/Page.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/Page.java
@@ -43,6 +43,8 @@
*/
public ByteBuffer getForRead();
+ public long getForReadAddr();
+
/**
* Releases reserved page. Released page can be evicted from RAM after flushing modifications to disk.
*/
@@ -84,4 +86,6 @@
* Release page.
*/
@Override public void close();
+
+ public long address();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageMemory.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageMemory.java
index 53b37f6..61414ea 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageMemory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/PageMemory.java
@@ -37,6 +37,10 @@
*/
public Page page(int cacheId, long pageId) throws IgniteCheckedException;
+ public Page readPage(int cacheId, long pageId) throws IgniteCheckedException;
+
+ public long pageAddr(int cacheId, long pageId) throws IgniteCheckedException;
+
/**
* @see #page(int, long)
* Will not read page from file if it is not present in memory.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java
index 3106866..d5777de 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageMemoryNoStoreImpl.java
@@ -273,6 +273,20 @@
return seg.acquirePage(cacheId, pageId, false);
}
+ @Override
+ public Page readPage(int cacheId, long pageId) throws IgniteCheckedException {
+ Segment seg = segment(pageId);
+
+ return seg.acquireReadPage(cacheId, pageId, false);
+ }
+
+ @Override
+ public long pageAddr(int cacheId, long pageId) throws IgniteCheckedException {
+ Segment seg = segment(pageId);
+
+ return seg.address(cacheId, pageId);
+ }
+
/** {@inheritDoc} */
@Override public Page page(int cacheId, long pageId, boolean restore) throws IgniteCheckedException {
Segment seg = segment(pageId);
@@ -534,6 +548,10 @@
GridUnsafe.putLong(lastAllocatedIdxPtr, 0);
}
+ long address(int cacheId, long pageId) {
+ return absolute(pageId) + PageMemoryNoStoreImpl.PAGE_OVERHEAD;
+ }
+
/**
* @param pageId Page ID to pin.
* @return Pinned page impl.
@@ -542,27 +560,53 @@
private PageNoStoreImpl acquirePage(int cacheId, long pageId, boolean restore) {
long absPtr = absolute(pageId);
- long marker = GridUnsafe.getLong(absPtr);
+// long marker = GridUnsafe.getLong(absPtr);
+//
+// if (marker != PAGE_MARKER)
+// throw new IllegalStateException("Page was not allocated [absPtr=" + U.hexLong(absPtr) +
+// ", cacheId=" + cacheId + ", pageId=" + U.hexLong(pageId) +
+// ", marker=" + U.hexLong(marker) + ']');
+//
+// while (true) {
+// long pinCnt = GridUnsafe.getLong(absPtr + PIN_CNT_OFFSET);
+//
+// if (pinCnt < 0)
+// throw new IllegalStateException("Page has been deallocated [absPtr=" + U.hexLong(absPtr) +
+// ", cacheId=" + cacheId + ", pageId=" + U.hexLong(pageId) + ", pinCnt=" + pinCnt + ']');
+//
+// if (GridUnsafe.compareAndSwapLong(null, absPtr + PIN_CNT_OFFSET, pinCnt, pinCnt + 1))
+// break;
+// }
+//
+// acquiredPages.incrementAndGet();
- if (marker != PAGE_MARKER)
- throw new IllegalStateException("Page was not allocated [absPtr=" + U.hexLong(absPtr) +
- ", cacheId=" + cacheId + ", pageId=" + U.hexLong(pageId) +
- ", marker=" + U.hexLong(marker) + ']');
+ return new PageNoStoreImpl(PageMemoryNoStoreImpl.this, idx, absPtr, cacheId, pageId, restore, true);
+ }
- while (true) {
- long pinCnt = GridUnsafe.getLong(absPtr + PIN_CNT_OFFSET);
+ private PageNoStoreImpl acquireReadPage(int cacheId, long pageId, boolean restore) {
+ long absPtr = absolute(pageId);
- if (pinCnt < 0)
- throw new IllegalStateException("Page has been deallocated [absPtr=" + U.hexLong(absPtr) +
- ", cacheId=" + cacheId + ", pageId=" + U.hexLong(pageId) + ", pinCnt=" + pinCnt + ']');
+// long marker = GridUnsafe.getLong(absPtr);
+//
+// if (marker != PAGE_MARKER)
+// throw new IllegalStateException("Page was not allocated [absPtr=" + U.hexLong(absPtr) +
+// ", cacheId=" + cacheId + ", pageId=" + U.hexLong(pageId) +
+// ", marker=" + U.hexLong(marker) + ']');
+//
+// while (true) {
+// long pinCnt = GridUnsafe.getLong(absPtr + PIN_CNT_OFFSET);
+//
+// if (pinCnt < 0)
+// throw new IllegalStateException("Page has been deallocated [absPtr=" + U.hexLong(absPtr) +
+// ", cacheId=" + cacheId + ", pageId=" + U.hexLong(pageId) + ", pinCnt=" + pinCnt + ']');
+//
+// if (GridUnsafe.compareAndSwapLong(null, absPtr + PIN_CNT_OFFSET, pinCnt, pinCnt + 1))
+// break;
+// }
+//
+// acquiredPages.incrementAndGet();
- if (GridUnsafe.compareAndSwapLong(null, absPtr + PIN_CNT_OFFSET, pinCnt, pinCnt + 1))
- break;
- }
-
- acquiredPages.incrementAndGet();
-
- return new PageNoStoreImpl(PageMemoryNoStoreImpl.this, idx, absPtr, cacheId, pageId, restore);
+ return new PageNoStoreImpl(PageMemoryNoStoreImpl.this, idx, absPtr, cacheId, pageId, restore, false);
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageNoStoreImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageNoStoreImpl.java
index 404c0b2..5fe15a0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageNoStoreImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/pagemem/impl/PageNoStoreImpl.java
@@ -57,7 +57,7 @@
* @param absPtr Absolute pointer.
*/
public PageNoStoreImpl(
- PageMemoryNoStoreImpl pageMem, int segIdx, long absPtr, int cacheId, long pageId, boolean noTagCheck
+ PageMemoryNoStoreImpl pageMem, int segIdx, long absPtr, int cacheId, long pageId, boolean noTagCheck, boolean needBuf
) {
this.pageMem = pageMem;
this.segIdx = segIdx;
@@ -67,7 +67,14 @@
this.pageId = pageId;
this.noTagCheck = noTagCheck;
- buf = pageMem.wrapPointer(absPtr + PageMemoryNoStoreImpl.PAGE_OVERHEAD, pageMem.pageSize());
+ if (needBuf)
+ buf = pageMem.wrapPointer(absPtr + PageMemoryNoStoreImpl.PAGE_OVERHEAD, pageMem.pageSize());
+ else
+ buf = null;
+ }
+
+ @Override public long address() {
+ return absPtr + PageMemoryNoStoreImpl.PAGE_OVERHEAD;
}
/** {@inheritDoc} */
@@ -88,6 +95,13 @@
return null;
}
+ @Override public long getForReadAddr() {
+ if (pageMem.readLockPage(absPtr, PageIdUtils.tag(pageId)))
+ return absPtr + PageMemoryNoStoreImpl.PAGE_OVERHEAD;
+
+ return -1L;
+ }
+
/** {@inheritDoc} */
@Override public void releaseRead() {
pageMem.readUnlockPage(absPtr);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOffheapEvictionManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOffheapEvictionManager.java
index 6c925ad..adf4e2a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOffheapEvictionManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheOffheapEvictionManager.java
@@ -38,22 +38,24 @@
touch(txEntry.cached(), null);
}
+ private static final GridCacheVersion VER = new GridCacheVersion();
+
/** {@inheritDoc} */
@Override public void touch(GridCacheEntryEx e, AffinityTopologyVersion topVer) {
if (e.detached())
return;
try {
- if (e.markObsoleteIfEmpty(null) || e.obsolete()) {
- e.context().cache().removeEntry(e);
+// if (e.markObsoleteIfEmpty(null) || e.obsolete()) {
+// e.context().cache().removeEntry(e);
+//
+// return;
+// }
- return;
- }
+ boolean evicted = e.evictInternal(VER, null);
- boolean evicted = e.evictInternal(cctx.versions().next(), null);
-
- if (evicted)
- cctx.cache().removeEntry(e);
+// if (evicted)
+// cctx.cache().removeEntry(e);
}
catch (IgniteCheckedException ex) {
U.error(log, "Failed to evict entry from cache: " + e, ex);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
index ea57c34..db5e58a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/IgniteCacheOffheapManagerImpl.java
@@ -54,6 +54,7 @@
import org.apache.ignite.internal.util.GridAtomicLong;
import org.apache.ignite.internal.util.GridCloseableIteratorAdapter;
import org.apache.ignite.internal.util.GridEmptyCloseableIterator;
+import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.lang.GridCloseableIterator;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.lang.GridIterator;
@@ -1133,6 +1134,28 @@
return CacheDataRowAdapter.compare(row.key().valueBytes(cctx.cacheObjectContext()), cctx, link);
}
+ @Override
+ protected CacheDataRow getRow(BPlusIO<CacheSearchRow> io, long buf, int idx) throws IgniteCheckedException {
+ int hash = ((RowLinkIO)io).getHash(buf, idx);
+ long link = ((RowLinkIO)io).getLink(buf, idx);
+
+ return rowStore.dataRow(hash, link);
+ }
+
+ @Override
+ protected int compare(BPlusIO<CacheSearchRow> io, long buf, int idx, CacheSearchRow row) throws IgniteCheckedException {
+ int hash = ((RowLinkIO)io).getHash(buf, idx);
+
+ int cmp = Integer.compare(hash, row.hash());
+
+ if (cmp != 0)
+ return cmp;
+
+ long link = ((RowLinkIO)io).getLink(buf, idx);
+
+ return CacheDataRowAdapter.compare(row.key().valueBytes(cctx.cacheObjectContext()), cctx, link);
+ }
+
/** {@inheritDoc} */
@Override protected CacheDataRow getRow(BPlusIO<CacheSearchRow> io, ByteBuffer buf, int idx)
throws IgniteCheckedException {
@@ -1196,12 +1219,16 @@
*/
public long getLink(ByteBuffer buf, int idx);
+ public long getLink(long buf, int idx);
+
/**
* @param buf Buffer.
* @param idx Index.
* @return Key hash code.
*/
public int getHash(ByteBuffer buf, int idx);
+
+ public int getHash(long buf, int idx);
}
/**
@@ -1251,10 +1278,21 @@
return buf.getLong(offset(idx));
}
+ @Override
+ public long getLink(long buf, int idx) {
+ assert idx < getCount(buf) : idx;
+
+ return GridUnsafe.getLong(buf, offset(idx));
+ }
+
/** {@inheritDoc} */
@Override public int getHash(ByteBuffer buf, int idx) {
return buf.getInt(offset(idx) + 8);
}
+
+ @Override public int getHash(long buf, int idx) {
+ return GridUnsafe.getInt(buf, offset(idx) + 8);
+ }
}
/**
@@ -1305,6 +1343,16 @@
@Override public int getHash(ByteBuffer buf, int idx) {
return buf.getInt(offset(idx) + 8);
}
+
+ @Override public long getLink(long buf, int idx) {
+ assert idx < getCount(buf) : idx;
+
+ return GridUnsafe.getLong(buf, offset(idx));
+ }
+
+ @Override public int getHash(long buf, int idx) {
+ return GridUnsafe.getInt(buf, offset(idx) + 8);
+ }
}
/**
@@ -1415,6 +1463,16 @@
return Long.compare(link, row.link);
}
+ @Override
+ protected int compare(BPlusIO<PendingRow> io, long buf, int idx, PendingRow row) throws IgniteCheckedException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected PendingRow getRow(BPlusIO<PendingRow> io, long buf, int idx) throws IgniteCheckedException {
+ throw new UnsupportedOperationException();
+ }
+
/** {@inheritDoc} */
@Override protected PendingRow getRow(BPlusIO<PendingRow> io, ByteBuffer buf, int idx)
throws IgniteCheckedException {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
index 97c0cf7..89636cb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/CacheDataRowAdapter.java
@@ -30,10 +30,12 @@
import org.apache.ignite.internal.processors.cache.database.tree.io.DataPageIO;
import org.apache.ignite.internal.processors.cache.database.tree.io.CacheVersionIO;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
+import sun.misc.Unsafe;
import static org.apache.ignite.internal.pagemem.PageIdUtils.itemId;
import static org.apache.ignite.internal.pagemem.PageIdUtils.pageId;
@@ -74,6 +76,40 @@
boolean first = true;
do {
+ long buf = cctx.shared().database().pageMemory().pageAddr(cctx.cacheId(), pageId(nextLink));
+
+ DataPageIO io = DataPageIO.VERSION1;
+
+ long addr = io.getPositionOnPayload(buf, itemId(nextLink));
+
+ addr = addr + buf;
+
+ int len = GridUnsafe.UNSAFE.getInt(addr);
+
+ //byte type = buf.get();
+
+ int size = Math.min(bytes.length, len);
+
+ addr += 5;
+
+ for (int i = 0; i < size; i++) {
+ byte b1 = GridUnsafe.UNSAFE.getByte(addr++);
+ byte b2 = bytes[i];
+
+ if (b1 != b2)
+ return b1 > b2 ? 1 : -1;
+ }
+
+ return Integer.compare(len, bytes.length);
+ }
+ while(nextLink != 0);
+ }
+
+ public static int compare0(byte[] bytes, GridCacheContext<?, ?> cctx, long link) throws IgniteCheckedException {
+ long nextLink = link;
+ boolean first = true;
+
+ do {
try (Page page = page(pageId(nextLink), cctx)) {
ByteBuffer buf = page.getForRead(); // Non-empty data page must not be recycled.
@@ -134,6 +170,35 @@
IncompleteObject<?> incomplete = null;
boolean first = true;
+ long buf = cctx.shared().database().pageMemory().pageAddr(cctx.cacheId(), pageId(nextLink));
+
+ DataPageIO io = DataPageIO.VERSION1;
+
+ long addr = io.getPositionOnPayload(buf, itemId(nextLink));
+
+ addr = addr + buf;
+
+ readFullRow(coctx, addr, keyOnly);
+ }
+
+ /**
+ * Read row from data pages.
+ *
+ * @param cctx Cache context.
+ * @param keyOnly {@code true} If need to read only key object.
+ * @throws IgniteCheckedException If failed.
+ */
+ public final void initFromLink0(GridCacheContext<?, ?> cctx, boolean keyOnly) throws IgniteCheckedException {
+ assert cctx != null : "cctx";
+ assert link != 0 : "link";
+ assert key == null : "key";
+
+ final CacheObjectContext coctx = cctx.cacheObjectContext();
+
+ long nextLink = link;
+ IncompleteObject<?> incomplete = null;
+ boolean first = true;
+
do {
try (Page page = page(pageId(nextLink), cctx)) {
ByteBuffer buf = page.getForRead(); // Non-empty data page must not be recycled.
@@ -247,6 +312,37 @@
//assert isReady(): "ready";
}
+ private void readFullRow(CacheObjectContext coctx, long buf, boolean keyOnly) throws IgniteCheckedException {
+// key = coctx.processor().toKeyCacheObject(coctx, buf);
+//
+// if (keyOnly) {
+// assert key != null: "key";
+//
+// return;
+// }
+ int len = GridUnsafe.getInt(buf, 0);
+
+ // skip key type and bytes.
+ buf += len + 5;
+
+ len = GridUnsafe.getInt(buf, 0);
+ byte type = GridUnsafe.getByte(buf, 4);
+
+ byte[] data = U.copyMemory(buf + 5, len);
+
+ val = coctx.processor().toCacheObject(coctx, type, data);
+
+ buf += len + 5;
+
+ ver = CacheVersionIO.read(buf, false);
+
+ buf += 25;
+
+ expireTime = GridUnsafe.getLong(buf);
+
+ //assert isReady(): "ready";
+ }
+
/**
* @param coctx Cache object context.
* @param buf Buffer.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/DataStructure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/DataStructure.java
index 5fd64b0..724196d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/DataStructure.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/DataStructure.java
@@ -129,6 +129,13 @@
return pageMem.page(cacheId, pageId);
}
+ protected final Page pageForRead(long pageId) throws IgniteCheckedException {
+ assert PageIdUtils.flag(pageId) == FLAG_IDX && PageIdUtils.partId(pageId) == INDEX_PARTITION ||
+ PageIdUtils.flag(pageId) == FLAG_DATA && PageIdUtils.partId(pageId) <= MAX_PARTITION_ID : U.hexLong(pageId);
+
+ return pageMem.readPage(cacheId, pageId);
+ }
+
/**
* @param page Page.
* @return Buffer.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java
index 26151ac..2f0258d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/MetadataStorage.java
@@ -211,6 +211,16 @@
return Integer.compare(len, row.idxName.length);
}
+ @Override
+ protected int compare(BPlusIO<IndexItem> io, long buf, int idx, IndexItem row) throws IgniteCheckedException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected IndexItem getRow(BPlusIO<IndexItem> io, long buf, int idx) throws IgniteCheckedException {
+ throw new UnsupportedOperationException();
+ }
+
/** {@inheritDoc} */
@Override protected IndexItem getRow(final BPlusIO<IndexItem> io, final ByteBuffer buf,
final int idx) throws IgniteCheckedException {
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java
index 74887dd..834a6bc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/BPlusTree.java
@@ -56,6 +56,7 @@
import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseBag;
import org.apache.ignite.internal.processors.cache.database.tree.reuse.ReuseList;
import org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler;
+import org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler2;
import org.apache.ignite.internal.util.GridArrays;
import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.lang.GridCursor;
@@ -293,6 +294,82 @@
};
/** */
+ private final GetPageHandler2<Get> search2 = new GetPageHandler2<Get>() {
+ @Override public Result run0(Page page, long buf, BPlusIO<L> io, Get g, int lvl)
+ throws IgniteCheckedException {
+ // Check the triangle invariant.
+ if (io.getForward(buf) != g.fwdId)
+ return RETRY;
+
+ boolean needBackIfRouting = g.backId != 0;
+
+ g.backId = 0; // Usually we'll go left down and don't need it.
+
+ int cnt = io.getCount(buf);
+ int idx = findInsertionPoint(io, buf, 0, cnt, g.row, g.shift);
+
+ boolean found = idx >= 0;
+
+ if (found) { // Found exact match.
+ assert g.getClass() != GetCursor.class;
+
+ if (g.found(io, buf, idx, lvl))
+ return FOUND;
+
+ // Else we need to reach leaf page, go left down.
+ }
+ else {
+ idx = fix(idx);
+
+ if (g.notFound(io, buf, idx, lvl)) // No way down, stop here.
+ return NOT_FOUND;
+ }
+
+ assert !io.isLeaf();
+
+ // If idx == cnt then we go right down, else left down: getLeft(cnt) == getRight(cnt - 1).
+ g.pageId = inner(io).getLeft(buf, idx);
+
+ // If we see the tree in consistent state, then our right down page must be forward for our left down page,
+ // we need to setup fwdId and/or backId to be able to check this invariant on lower level.
+ if (idx < cnt) {
+ // Go left down here.
+ g.fwdId = inner(io).getRight(buf, idx);
+ }
+ else {
+ // Go right down here or it is an empty branch.
+ assert idx == cnt;
+
+ // Here child's forward is unknown to us (we either go right or it is an empty "routing" page),
+ // need to ask our forward about the child's forward (it must be leftmost child of our forward page).
+ // This is ok from the locking standpoint because we take all locks in the forward direction.
+ long fwdId = io.getForward(buf);
+
+ // Setup fwdId.
+ if (fwdId == 0)
+ g.fwdId = 0;
+ else {
+ // We can do askNeighbor on forward page here because we always take locks in forward direction.
+ Result res = askNeighbor(fwdId, g, false);
+
+ if (res != FOUND)
+ return res; // Retry.
+ }
+
+ // Setup backId.
+ if (cnt != 0) // It is not a routing page and we are going to the right, can get backId here.
+ g.backId = inner(io).getLeft(buf, cnt - 1);
+ else if (needBackIfRouting) {
+ // Can't get backId here because of possible deadlock and it is only needed for remove operation.
+ return GO_DOWN_X;
+ }
+ }
+
+ return GO_DOWN;
+ }
+ };
+
+ /** */
private final GetPageHandler<Put> replace = new GetPageHandler<Put>() {
@Override public Result run0(Page page, ByteBuffer buf, BPlusIO<L> io, Put p, int lvl)
throws IgniteCheckedException {
@@ -817,7 +894,7 @@
*/
private Result findDown(final Get g, final long pageId, final long fwdId, final int lvl)
throws IgniteCheckedException {
- Page page = page(pageId);
+ Page page = pageForRead(pageId);
try {
for (;;) {
@@ -825,7 +902,7 @@
g.pageId = pageId;
g.fwdId = fwdId;
- Result res = readPage(page, this, search, g, lvl, RETRY);
+ Result res = PageHandler2.readPage(page, this, search2, g, lvl, RETRY);
switch (res) {
case GO_DOWN:
@@ -1915,20 +1992,26 @@
Page meta = metaPage();
- ByteBuffer buf = readLock(meta); // Meta can't be removed.
+// ByteBuffer buf = readLock(meta); // Meta can't be removed.
+//
+// assert buf != null : "Failed to read lock meta page [page=" + meta + ", metaPageId=" +
+// U.hexLong(meta.id()) + ']';
+//
+// try {
+// BPlusMetaIO io = BPlusMetaIO.VERSIONS.forPage(buf);
+//
+// rootLvl = io.getRootLevel(buf);
+// rootId = io.getFirstPageId(buf, rootLvl);
+// }
+// finally {
+// readUnlock(meta, buf);
+// }
+ BPlusMetaIO io = BPlusMetaIO.VERSION1;
- assert buf != null : "Failed to read lock meta page [page=" + meta + ", metaPageId=" +
- U.hexLong(meta.id()) + ']';
+ long buf = meta.address();
- try {
- BPlusMetaIO io = BPlusMetaIO.VERSIONS.forPage(buf);
-
- rootLvl = io.getRootLevel(buf);
- rootId = io.getFirstPageId(buf, rootLvl);
- }
- finally {
- readUnlock(meta, buf);
- }
+ rootLvl = io.getRootLevel(buf);
+ rootId = io.getFirstPageId(buf, rootLvl);
restartFromRoot(rootId, rootLvl, globalRmvId.get());
}
@@ -1958,6 +2041,12 @@
return lvl == 0; // Stop if we are at the bottom.
}
+ boolean found(BPlusIO<L> io, long buf, int idx, int lvl) throws IgniteCheckedException {
+ assert lvl >= 0;
+
+ return lvl == 0; // Stop if we are at the bottom.
+ }
+
/**
* @param io IO.
* @param buf Buffer.
@@ -1972,6 +2061,12 @@
return lvl == 0; // Stop if we are at the bottom.
}
+ boolean notFound(BPlusIO<L> io, long buf, int idx, int lvl) throws IgniteCheckedException {
+ assert lvl >= 0;
+
+ return lvl == 0; // Stop if we are at the bottom.
+ }
+
/**
* @param page Page.
* @param lvl Level.
@@ -2003,6 +2098,16 @@
return true;
}
+
+ @Override boolean found(BPlusIO<L> io, long buf, int idx, int lvl) throws IgniteCheckedException {
+ // Check if we are on an inner page and can't get row from it.
+ if (lvl != 0 && !canGetRowFromInner)
+ return false;
+
+ row = getRow(io, buf, idx);
+
+ return true;
+ }
}
/**
@@ -3346,6 +3451,32 @@
return -(low + 1); // Not found.
}
+ private int findInsertionPoint(BPlusIO<L> io, long buf, int low, int cnt, L row, int shift)
+ throws IgniteCheckedException {
+ assert row != null;
+
+ int high = cnt - 1;
+
+ while (low <= high) {
+ int mid = (low + high) >>> 1;
+
+ int cmp = compare(io, buf, mid, row);
+
+ if (cmp == 0)
+ cmp = -shift; // We need to fix the case when search row matches multiple data rows.
+
+ //noinspection Duplicates
+ if (cmp < 0)
+ low = mid + 1;
+ else if (cmp > 0)
+ high = mid - 1;
+ else
+ return mid; // Found.
+ }
+
+ return -(low + 1); // Not found.
+ }
+
/**
* @param buf Buffer.
* @return IO.
@@ -3398,6 +3529,8 @@
*/
protected abstract int compare(BPlusIO<L> io, ByteBuffer buf, int idx, L row) throws IgniteCheckedException;
+ protected abstract int compare(BPlusIO<L> io, long buf, int idx, L row) throws IgniteCheckedException;
+
/**
* Get the full detached row. Can be called on inner page only if {@link #canGetRowFromInner} is {@code true}.
*
@@ -3409,6 +3542,8 @@
*/
protected abstract T getRow(BPlusIO<L> io, ByteBuffer buf, int idx) throws IgniteCheckedException;
+ protected abstract T getRow(BPlusIO<L> io, long buf, int idx) throws IgniteCheckedException;
+
/**
* Forward cursor.
*/
@@ -3715,6 +3850,42 @@
}
}
+ private abstract class GetPageHandler2<G extends Get> extends PageHandler2<G, Result> {
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public final Result run(Page page, PageIO iox, long buf, G g, int lvl)
+ throws IgniteCheckedException {
+ assert PageIO.getPageId(buf) == page.id();
+
+ // If we've passed the check for correct page ID, we can safely cast.
+ BPlusIO<L> io = (BPlusIO<L>)iox;
+
+ // In case of intersection with inner replace in remove operation
+ // we need to restart our operation from the tree root.
+ if (lvl == 0 && g.rmvId < io.getRemoveId(buf))
+ return RETRY_ROOT;
+
+ return run0(page, buf, io, g, lvl);
+ }
+
+ /**
+ * @param page Page.
+ * @param buf Buffer.
+ * @param io IO.
+ * @param g Operation.
+ * @param lvl Level.
+ * @return Result code.
+ * @throws IgniteCheckedException If failed.
+ */
+ protected abstract Result run0(Page page, long buf, BPlusIO<L> io, G g, int lvl)
+ throws IgniteCheckedException;
+
+ /** {@inheritDoc} */
+ @Override public final boolean releaseAfterWrite(Page page, G g, int lvl) {
+ return g.canRelease(page, lvl);
+ }
+ }
+
/**
* Reuse bag for destroy.
*/
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusIO.java
index 5fc3d25..86c80cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusIO.java
@@ -20,6 +20,7 @@
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.cache.database.tree.BPlusTree;
+import org.apache.ignite.internal.util.GridUnsafe;
/**
* Abstract IO routines for B+Tree pages.
@@ -88,6 +89,10 @@
return buf.getLong(FORWARD_OFF);
}
+ public final long getForward(long buf) {
+ return GridUnsafe.getLong(buf, FORWARD_OFF);
+ }
+
/**
* @param buf Buffer.
* @param pageId Forward page ID.
@@ -106,6 +111,10 @@
return buf.getLong(REMOVE_ID_OFF);
}
+ public final long getRemoveId(long buf) {
+ return GridUnsafe.getLong(buf, REMOVE_ID_OFF);
+ }
+
/**
* @param buf Buffer.
* @param rmvId Remove ID.
@@ -128,6 +137,14 @@
return cnt;
}
+ public final int getCount(long buf) {
+ int cnt = GridUnsafe.getShort(buf, CNT_OFF) & 0xFFFF;
+
+ assert cnt >= 0: cnt;
+
+ return cnt;
+ }
+
/**
* @param buf Buffer.
* @param cnt Count.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusInnerIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusInnerIO.java
index 90b0f37..92909ca 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusInnerIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusInnerIO.java
@@ -20,6 +20,7 @@
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler;
+import org.apache.ignite.internal.util.GridUnsafe;
/**
* Abstract IO routines for B+Tree inner pages.
@@ -61,6 +62,10 @@
return buf.getLong(offset(idx, SHIFT_LEFT));
}
+ public final long getLeft(long buf, int idx) {
+ return GridUnsafe.getLong(buf, offset(idx, SHIFT_LEFT));
+ }
+
/**
* @param buf Buffer.
* @param idx Index.
@@ -81,6 +86,10 @@
return buf.getLong(offset(idx, SHIFT_RIGHT));
}
+ public final long getRight(long buf, int idx) {
+ return GridUnsafe.getLong(buf, offset(idx, SHIFT_RIGHT));
+ }
+
/**
* @param buf Buffer.
* @param idx Index.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusMetaIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusMetaIO.java
index 15a49ef..ec3a3dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusMetaIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/BPlusMetaIO.java
@@ -18,14 +18,17 @@
package org.apache.ignite.internal.processors.cache.database.tree.io;
import java.nio.ByteBuffer;
+import org.apache.ignite.internal.util.GridUnsafe;
/**
* IO routines for B+Tree meta pages.
*/
public class BPlusMetaIO extends PageIO {
+ public static final BPlusMetaIO VERSION1 = new BPlusMetaIO(1);
+
/** */
public static final IOVersions<BPlusMetaIO> VERSIONS = new IOVersions<>(
- new BPlusMetaIO(1)
+ VERSION1
);
/** */
@@ -58,6 +61,10 @@
return buf.get(LVLS_OFF);
}
+ public int getLevelsCount(long buf) {
+ return GridUnsafe.getByte(buf, LVLS_OFF);
+ }
+
/**
* @param buf Buffer.
* @return Max levels possible for this page size.
@@ -95,6 +102,10 @@
return buf.getLong(offset(lvl));
}
+ public long getFirstPageId(long buf, int lvl) {
+ return GridUnsafe.getLong(buf, offset(lvl));
+ }
+
/**
* @param buf Buffer.
* @param lvl Level.
@@ -120,6 +131,14 @@
return lvls - 1;
}
+ public int getRootLevel(long buf) {
+ int lvls = getLevelsCount(buf); // The highest level page is root.
+
+ assert lvls > 0 : lvls;
+
+ return lvls - 1;
+ }
+
/**
* @param buf Buffer.
* @param rootPageId New root page ID.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/CacheVersionIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/CacheVersionIO.java
index 5e9fd6d..2d57ec4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/CacheVersionIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/CacheVersionIO.java
@@ -20,6 +20,7 @@
import java.nio.ByteBuffer;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridUnsafe;
/**
* Utility to read and write {@link GridCacheVersion} instances.
@@ -139,4 +140,18 @@
return new GridCacheVersion(topVer, nodeOrderDrId, globalTime, order);
}
+
+ public static GridCacheVersion read(long buf, boolean allowNull) throws IgniteCheckedException {
+ byte protoVer = checkProtocolVersion(GridUnsafe.getByte(buf, 0), allowNull);
+
+ if (protoVer == NULL_PROTO_VER)
+ return null;
+
+ int topVer = GridUnsafe.getInt(buf, 1);
+ int nodeOrderDrId = GridUnsafe.getInt(buf, 5);
+ long globalTime = GridUnsafe.getInt(buf, 9);
+ long order = GridUnsafe.getInt(buf, 17);
+
+ return new GridCacheVersion(topVer, nodeOrderDrId, globalTime, order);
+ }
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java
index a69caab..e78277f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/DataPageIO.java
@@ -27,15 +27,18 @@
import org.apache.ignite.internal.processors.cache.database.CacheDataRow;
import org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
+import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.typedef.internal.SB;
+import sun.misc.Unsafe;
/**
* Data pages IO.
*/
public class DataPageIO extends PageIO {
+ public static final DataPageIO VERSION1 = new DataPageIO(1);
/** */
public static final IOVersions<DataPageIO> VERSIONS = new IOVersions<>(
- new DataPageIO(1)
+ VERSION1
);
/** */
@@ -245,6 +248,10 @@
return buf.get(DIRECT_CNT_OFF) & 0xFF;
}
+ private int getDirectCount(long buf) {
+ return GridUnsafe.UNSAFE.getByte(buf + DIRECT_CNT_OFF) & 0xFF;
+ }
+
/**
* @param buf Buffer.
* @param cnt Indirect count.
@@ -279,6 +286,10 @@
return buf.get(INDIRECT_CNT_OFF) & 0xFF;
}
+ private int getIndirectCount(long buf) {
+ return GridUnsafe.UNSAFE.getByte(buf + INDIRECT_CNT_OFF) & 0xFF;
+ }
+
/**
* @param buf Buffer.
* @return Number of free entry slots.
@@ -314,6 +325,26 @@
throw new IllegalStateException("Item not found: " + itemId);
}
+ private int findIndirectItemIndex(long buf, int itemId, int directCnt, int indirectCnt) {
+ int low = directCnt;
+ int high = directCnt + indirectCnt - 1;
+
+ while (low <= high) {
+ int mid = (low + high) >>> 1;
+
+ int cmp = Integer.compare(itemId(getItem(buf, mid)), itemId);
+
+ if (cmp < 0)
+ low = mid + 1;
+ else if (cmp > 0)
+ high = mid - 1;
+ else
+ return mid; // found
+ }
+
+ throw new IllegalStateException("Item not found: " + itemId);
+ }
+
/**
* @param buf Buffer.
* @return String representation.
@@ -390,6 +421,27 @@
return b.toString();
}
+ private int getDataOffset(long buf, int itemId) {
+ assert checkIndex(itemId): itemId;
+
+ int directCnt = getDirectCount(buf);
+
+ if (itemId >= directCnt) { // Need to do indirect lookup.
+ int indirectCnt = getIndirectCount(buf);
+
+ int indirectItemIdx = findIndirectItemIndex(buf, itemId, directCnt, indirectCnt);
+
+ assert indirectItemIdx >= directCnt : indirectItemIdx + " " + directCnt;
+ assert indirectItemIdx < directCnt + indirectCnt: indirectItemIdx + " " + directCnt + " " + indirectCnt;
+
+ itemId = directItemIndex(getItem(buf, indirectItemIdx));
+
+ assert itemId >= 0 && itemId < directCnt: itemId + " " + directCnt + " " + indirectCnt; // Direct item.
+ }
+
+ return directItemToOffset(getItem(buf, itemId));
+ }
+
/**
* @param buf Buffer.
* @param itemId Fixed item ID (the index used for referencing an entry from the outside).
@@ -463,6 +515,19 @@
return nextLink;
}
+ public long getPositionOnPayload(long buf, final int itemId) {
+ int dataOff = getDataOffset(buf, itemId);
+
+ boolean fragmented = false;
+ //long nextLink = 0;//fragmented ? getNextFragmentLink(buf, dataOff) : 0;
+
+// int payloadSize = getPageEntrySize(buf, dataOff, 0);
+// buf.position(dataOff + PAYLOAD_LEN_SIZE + (fragmented ? LINK_SIZE : 0));
+// buf.limit(buf.position() + payloadSize);
+
+ return dataOff + PAYLOAD_LEN_SIZE;
+ }
+
/**
* @param buf Buffer.
* @param idx Item index.
@@ -472,6 +537,10 @@
return buf.getShort(itemOffset(idx));
}
+ private short getItem(long buf, int idx) {
+ return GridUnsafe.UNSAFE.getShort(buf + itemOffset(idx));
+ }
+
/**
* @param buf Buffer.
* @param idx Item index.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java
index aa2d368..38f1520 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/io/PageIO.java
@@ -27,6 +27,7 @@
import org.apache.ignite.internal.processors.cache.database.freelist.io.PagesListNodeIO;
import org.apache.ignite.internal.processors.cache.database.tree.util.PageHandler;
import org.apache.ignite.internal.processors.cache.database.tree.util.PageLockListener;
+import org.apache.ignite.internal.util.GridUnsafe;
/**
* Base format for all the page types.
@@ -170,6 +171,10 @@
return buf.getShort(TYPE_OFF) & 0xFFFF;
}
+ public static int getType(long buf) {
+ return GridUnsafe.UNSAFE.getShort(buf + TYPE_OFF) & 0xFFFF;
+ }
+
/**
* @param buf Buffer.
* @param type Type.
@@ -206,6 +211,10 @@
return buf.getLong(PAGE_ID_OFF);
}
+ public static long getPageId(long buf) {
+ return GridUnsafe.getLong(buf, PAGE_ID_OFF);
+ }
+
/**
* @param buf Buffer.
* @param pageId Page ID.
@@ -303,6 +312,12 @@
return getPageIO(type, ver);
}
+ public static <Q extends PageIO> Q getPageIO(long buf) throws IgniteCheckedException {
+ int type = getType(buf);
+
+ return getPageIO(type, 1);
+ }
+
/**
* @param type IO Type.
* @param ver IO Version.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/util/PageHandler2.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/util/PageHandler2.java
new file mode 100644
index 0000000..b23e4ef
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/database/tree/util/PageHandler2.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.database.tree.util;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.pagemem.Page;
+import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
+import org.apache.ignite.internal.pagemem.wal.record.delta.InitNewPageRecord;
+import org.apache.ignite.internal.processors.cache.database.tree.io.PageIO;
+import org.apache.ignite.internal.util.GridUnsafe;
+import sun.nio.ch.DirectBuffer;
+
+import static java.lang.Boolean.FALSE;
+import static java.lang.Boolean.TRUE;
+
+/**
+ * Page handler.
+ */
+public abstract class PageHandler2<X, R> {
+ /** */
+ private static final PageHandler2<Void, Boolean> NOOP = new PageHandler2<Void, Boolean>() {
+ @Override public Boolean run(Page page, PageIO io, long buf, Void arg, int intArg)
+ throws IgniteCheckedException {
+ return TRUE;
+ }
+ };
+
+ /**
+ * @param page Page.
+ * @param io IO.
+ * @param buf Page buffer.
+ * @param arg Argument.
+ * @param intArg Argument of type {@code int}.
+ * @return Result.
+ * @throws IgniteCheckedException If failed.
+ */
+ public abstract R run(Page page, PageIO io, long buf, X arg, int intArg)
+ throws IgniteCheckedException;
+
+ /**
+ * @param page Page.
+ * @param arg Argument.
+ * @param intArg Argument of type {@code int}.
+ * @return {@code true} If release.
+ */
+ public boolean releaseAfterWrite(Page page, X arg, int intArg) {
+ return true;
+ }
+
+ /**
+ * @param page Page.
+ * @param h Handler.
+ * @param arg Argument.
+ * @param intArg Argument of type {@code int}.
+ * @param lockFailed Result in case of lock failure due to page recycling.
+ * @return Handler result.
+ * @throws IgniteCheckedException If failed.
+ */
+ public static <X, R> R readPage(
+ Page page,
+ PageLockListener lockListener,
+ PageHandler2<X, R> h,
+ X arg,
+ int intArg,
+ R lockFailed
+ ) throws IgniteCheckedException {
+ long buf = readLock(page, lockListener);
+
+ if (buf == -1L)
+ return lockFailed;
+
+ try {
+ PageIO io = PageIO.getPageIO(buf);
+
+ return h.run(page, io, buf, arg, intArg);
+ }
+ finally {
+ readUnlock(page, buf, lockListener);
+ }
+ }
+
+ /**
+ * @param page Page.
+ * @param h Handler.
+ * @param arg Argument.
+ * @param intArg Argument of type {@code int}.
+ * @param lockFailed Result in case of lock failure due to page recycling.
+ * @return Handler result.
+ * @throws IgniteCheckedException If failed.
+ */
+ public static <X, R> R writePage(
+ Page page,
+ PageLockListener lockListener,
+ PageHandler2<X, R> h,
+ X arg,
+ int intArg,
+ R lockFailed
+ ) throws IgniteCheckedException {
+ return writePage(page, lockListener, h, null, null, arg, intArg, lockFailed);
+ }
+
+ /**
+ * @param page Page.
+ * @param lockListener Lock listener.
+ * @param init IO for new page initialization or {@code null} if it is an existing page.
+ * @throws IgniteCheckedException If failed.
+ */
+ public static void initPage(
+ Page page,
+ PageLockListener lockListener,
+ PageIO init,
+ IgniteWriteAheadLogManager wal
+ ) throws IgniteCheckedException {
+ Boolean res = writePage(page, lockListener, NOOP, init, wal, null, 0, FALSE);
+
+ assert res == TRUE : res; // It must be newly allocated page, can't be recycled.
+ }
+
+ /**
+ * @param page Page.
+ * @param lockListener Lock listener.
+ * @return Byte buffer or {@code null} if failed to lock due to recycling.
+ */
+ public static long readLock(Page page, PageLockListener lockListener) {
+ long buf = page.getForReadAddr();
+
+ return buf;
+ }
+
+ /**
+ * @param page Page.
+ * @param buf Page buffer.
+ * @param lockListener Lock listener.
+ */
+ public static void readUnlock(Page page, long buf, PageLockListener lockListener) {
+ //lockListener.onReadUnlock(page, buf);
+
+ page.releaseRead();
+ }
+
+ /**
+ * @param page Page.
+ * @param lockListener Lock listener.
+ * @param tryLock Only try to lock without waiting.
+ * @return Byte buffer or {@code null} if failed to lock due to recycling.
+ */
+ public static ByteBuffer writeLock(Page page, PageLockListener lockListener, boolean tryLock) {
+ lockListener.onBeforeWriteLock(page);
+
+ ByteBuffer buf = tryLock ? page.tryGetForWrite() : page.getForWrite();
+
+ lockListener.onWriteLock(page, buf);
+
+ return buf;
+ }
+
+ /**
+ * @param page Page.
+ * @param buf Page buffer.
+ * @param lockListener Lock listener.
+ * @param dirty Page is dirty.
+ */
+ public static void writeUnlock(Page page, ByteBuffer buf, PageLockListener lockListener, boolean dirty) {
+ lockListener.onWriteUnlock(page, buf);
+
+ page.releaseWrite(dirty);
+ }
+
+ /**
+ * @param page Page.
+ * @param lockListener Lock listener.
+ * @param h Handler.
+ * @param init IO for new page initialization or {@code null} if it is an existing page.
+ * @param arg Argument.
+ * @param intArg Argument of type {@code int}.
+ * @param lockFailed Result in case of lock failure due to page recycling.
+ * @return Handler result.
+ * @throws IgniteCheckedException If failed.
+ */
+ public static <X, R> R writePage(
+ Page page,
+ PageLockListener lockListener,
+ PageHandler2<X, R> h,
+ PageIO init,
+ IgniteWriteAheadLogManager wal,
+ X arg,
+ int intArg,
+ R lockFailed
+ ) throws IgniteCheckedException {
+ ByteBuffer buf = writeLock(page, lockListener, false);
+
+ if (buf == null)
+ return lockFailed;
+
+ R res;
+
+ boolean ok = false;
+
+ try {
+ if (init != null) // It is a new page and we have to initialize it.
+ doInitPage(page, buf, init, wal);
+ else
+ init = PageIO.getPageIO(buf);
+
+ res = null;//h.run(page, init, buf, arg, intArg);
+
+ ok = true;
+ }
+ finally {
+ assert PageIO.getCrc(buf) == 0; //TODO GG-11480
+
+ if (h.releaseAfterWrite(page, arg, intArg))
+ writeUnlock(page, buf, lockListener, ok);
+ }
+
+ return res;
+ }
+
+ /**
+ * @param page Page.
+ * @param buf Buffer.
+ * @param init Initial IO.
+ * @param wal Write ahead log.
+ * @throws IgniteCheckedException If failed.
+ */
+ private static void doInitPage(
+ Page page,
+ ByteBuffer buf,
+ PageIO init,
+ IgniteWriteAheadLogManager wal
+ ) throws IgniteCheckedException {
+ assert PageIO.getCrc(buf) == 0; //TODO GG-11480
+
+ long pageId = page.id();
+
+ init.initNewPage(buf, pageId);
+
+ // Here we should never write full page, because it is known to be new.
+ page.fullPageWalRecordPolicy(FALSE);
+
+ if (isWalDeltaRecordNeeded(wal, page))
+ wal.log(new InitNewPageRecord(page.fullId().cacheId(), page.id(),
+ init.getType(), init.getVersion(), pageId));
+ }
+
+ /**
+ * @param wal Write ahead log.
+ * @param page Page.
+ * @return {@code true} If we need to make a delta WAL record for the change in this page.
+ */
+ public static boolean isWalDeltaRecordNeeded(IgniteWriteAheadLogManager wal, Page page) {
+ // If the page is clean, then it is either newly allocated or just after checkpoint.
+ // In both cases we have to write full page contents to WAL.
+ return wal != null && !wal.isAlwaysWriteFullPages() && page.fullPageWalRecordPolicy() != TRUE &&
+ (page.fullPageWalRecordPolicy() == FALSE || page.isDirty());
+ }
+
+ /**
+ * @param src Source.
+ * @param dst Destination.
+ * @param srcOff Source offset in bytes.
+ * @param dstOff Destination offset in bytes.
+ * @param cnt Bytes count to copy.
+ */
+ public static void copyMemory(ByteBuffer src, ByteBuffer dst, long srcOff, long dstOff, long cnt) {
+ byte[] srcArr = src.hasArray() ? src.array() : null;
+ byte[] dstArr = dst.hasArray() ? dst.array() : null;
+ long srcArrOff = src.hasArray() ? src.arrayOffset() + GridUnsafe.BYTE_ARR_OFF : 0;
+ long dstArrOff = dst.hasArray() ? dst.arrayOffset() + GridUnsafe.BYTE_ARR_OFF : 0;
+
+ long srcPtr = src.isDirect() ? ((DirectBuffer)src).address() : 0;
+ long dstPtr = dst.isDirect() ? ((DirectBuffer)dst).address() : 0;
+
+ GridUnsafe.copyMemory(srcArr, srcPtr + srcArrOff + srcOff, dstArr, dstPtr + dstArrOff + dstOff, cnt);
+ }
+
+ public static void zeroMemory(ByteBuffer buf, int off, int length) {
+ if (buf.isDirect())
+ GridUnsafe.setMemory(((DirectBuffer)buf).address() + off, length, (byte)0);
+
+ else {
+ for (int i = off; i < off + length; i++)
+ buf.put(i, (byte)0);
+
+ }
+ }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
index 1f7a53c..3e554d5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/GridUnsafe.java
@@ -44,7 +44,7 @@
*/
public abstract class GridUnsafe {
/** Unsafe. */
- private static final Unsafe UNSAFE = unsafe();
+ public static final Unsafe UNSAFE = unsafe();
/** Unaligned flag. */
private static final boolean UNALIGNED = unaligned();
@@ -86,6 +86,22 @@
// No-op.
}
+ public static long getLong(long buf, int off) {
+ return UNSAFE.getLong(buf + off);
+ }
+
+ public static int getInt(long buf, int off) {
+ return UNSAFE.getInt(buf + off);
+ }
+
+ public static byte getByte(long buf, int off) {
+ return UNSAFE.getByte(buf + off);
+ }
+
+ public static short getShort(long buf, int off) {
+ return UNSAFE.getShort(buf + off);
+ }
+
/**
* Gets boolean value from object field.
*
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/OffheapReadWriteLock.java b/modules/core/src/main/java/org/apache/ignite/internal/util/OffheapReadWriteLock.java
index 8197bff..948d07a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/OffheapReadWriteLock.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/OffheapReadWriteLock.java
@@ -119,7 +119,7 @@
return false;
if (canReadLock(state)) {
- if (GridUnsafe.compareAndSwapLong(null, lock, state, updateState(state, 1, 0, 0)))
+ if (GridUnsafe.compareAndSwapLong(null, lock, state, incLock(state)))
return true;
else
// Retry CAS, do not count as spin cycle.
@@ -153,11 +153,13 @@
while (true) {
long state = GridUnsafe.getLongVolatile(null, lock);
- if (lockCount(state) <= 0)
+ int lockCnt = lockCount(state);
+
+ if (lockCnt <= 0)
throw new IllegalMonitorStateException("Attempted to release a read lock while not holding it " +
"[lock=" + U.hexLong(lock) + ", state=" + U.hexLong(state) + ']');
- long updated = updateState(state, -1, 0, 0);
+ long updated = decLock(state, lockCnt);
assert updated != 0;
@@ -582,6 +584,35 @@
return buildState(writersWait, readersWait, tag, lock);
}
+ private long incLock(long state) {
+ int lock = lockCount(state);
+ int tag = tag(state);
+ int readersWait = readersWaitCount(state);
+ int writersWait = writersWaitCount(state);
+
+ lock++;
+
+ assert readersWait >= 0 : readersWait;
+ assert writersWait >= 0 : writersWait;
+ assert lock >= -1;
+
+ return buildState(writersWait, readersWait, tag, lock);
+ }
+
+ private long decLock(long state, int lock) {
+ int tag = tag(state);
+ int readersWait = readersWaitCount(state);
+ int writersWait = writersWaitCount(state);
+
+ lock--;
+
+ assert readersWait >= 0 : readersWait;
+ assert writersWait >= 0 : writersWait;
+ assert lock >= -1;
+
+ return buildState(writersWait, readersWait, tag, lock);
+ }
+
/**
* @param state State to update.
* @return Modified state.
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
index 4bc39ea..9dbeadf 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/database/BPlusTreeSelfTest.java
@@ -1206,6 +1206,16 @@
return Long.compare(n1, n2);
}
+ @Override
+ protected int compare(BPlusIO<Long> io, long buf, int idx, Long row) throws IgniteCheckedException {
+ return 0;
+ }
+
+ @Override
+ protected Long getRow(BPlusIO<Long> io, long buf, int idx) throws IgniteCheckedException {
+ return null;
+ }
+
/** {@inheritDoc} */
@Override protected Long getRow(BPlusIO<Long> io, ByteBuffer buf, int idx) throws IgniteCheckedException {
assert io.canGetRow() : io;