blob: 65887df7fc85783c820098dac916db8261b6b6f3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.tree;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.internal.metric.IoStatisticsHolder;
import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.pagemem.store.PageStore;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter;
import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow;
import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
import org.apache.ignite.internal.processors.cache.persistence.diagnostic.pagelocktracker.PageLockTrackerManager;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.BPlusIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPagePayload;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccCacheIdAwareDataInnerIO;
import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccCacheIdAwareDataLeafIO;
import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccDataInnerIO;
import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccDataLeafIO;
import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccDataRow;
import org.apache.ignite.internal.processors.cache.tree.mvcc.search.MvccDataPageClosure;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.jetbrains.annotations.TestOnly;
import static java.lang.Boolean.FALSE;
import static java.lang.Boolean.TRUE;
import static org.apache.ignite.internal.pagemem.PageIdUtils.itemId;
import static org.apache.ignite.internal.pagemem.PageIdUtils.pageId;
import static org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO.MVCC_INFO_SIZE;
import static org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO.T_DATA;
import static org.apache.ignite.internal.util.GridArrays.clearTail;
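/**
* {@link BPlusTree} of cache data rows for a cache group. Entries are ordered by cache ID (shared groups only),
* key hash and key bytes, and additionally by MVCC version in descending order when MVCC is enabled.
* Unbounded {@code find(...)} calls may be served by scanning data pages directly when data page scan is enabled.
*/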
public class CacheDataTree extends BPlusTree<CacheSearchRow, CacheDataRow> {
/** Shared empty array used as the initial row buffer of a data page scan cursor. */
private static final CacheDataRow[] EMPTY_ROWS = {};
/**
* Whether the most recent {@code find(...)} call used data page scan. Reset to {@code null} each time it is
* read through {@code isLastFindWithDataPageScan()}. Plain static field (not volatile), shared by all trees.
*/
private static Boolean lastFindWithDataPageScan;
/** Per-thread flag that allows {@code find(...)} to use data page scan; {@code false} by default. */
private static final ThreadLocal<Boolean> dataPageScanEnabled =
ThreadLocal.withInitial(() -> false);
/** Row store used to build rows from links and to obtain the partition ID. */
private final CacheDataRowStore rowStore;
/** Cache group this tree belongs to. */
private final CacheGroupContext grp;
/**
* Creates the tree and initializes it with {@code initTree(initNew)}.
*
* @param grp Cache group.
* @param name Tree name.
* @param reuseList Reuse list.
* @param rowStore Row store, must not be {@code null}.
* @param metaPageId Meta page ID.
* @param initNew Initialize new index.
* @param pageLockTrackerManager Page lock tracker manager.
* @param pageFlag Default flag value for allocated pages.
* @throws IgniteCheckedException If failed.
*/
public CacheDataTree(
CacheGroupContext grp,
String name,
ReuseList reuseList,
CacheDataRowStore rowStore,
long metaPageId,
boolean initNew,
PageLockTrackerManager pageLockTrackerManager,
byte pageFlag
) throws IgniteCheckedException {
super(
name,
grp.groupId(),
grp.name(),
grp.dataRegion().pageMemory(),
// WAL is handed to the tree only when persistence is enabled for the data region.
grp.dataRegion().config().isPersistenceEnabled() ? grp.shared().wal() : null,
grp.offheap().globalRemoveId(),
metaPageId,
reuseList,
// IO versions depend on whether the group is shared and whether MVCC is enabled.
innerIO(grp),
leafIO(grp),
pageFlag,
grp.shared().kernalContext().failure(),
pageLockTrackerManager
);
assert rowStore != null;
this.rowStore = rowStore;
this.grp = grp;
// With persistence enabled the calling thread is expected to hold the checkpoint lock.
assert !grp.dataRegion().config().isPersistenceEnabled() || grp.shared().database().checkpointLockIsHeldByThread();
initTree(initNew);
}
/**
 * Switches data page scan on or off for the calling thread.
 *
 * @param enabled {@code true} to enable data page scan, {@code false} to disable it.
 */
public static void setDataPageScanEnabled(boolean enabled) {
    dataPageScanEnabled.set(Boolean.valueOf(enabled));
}
/**
 * @return {@code true} if data page scan is enabled for the calling thread.
 */
public static boolean isDataPageScanEnabled() {
    Boolean enabled = dataPageScanEnabled.get();

    return enabled.booleanValue();
}
/**
 * Reads and resets the "last find used data page scan" marker.
 *
 * @return {@code true} if the last observed call to the method {@code find(...)} used data page scan,
 *      {@code false} if it did not, or {@code null} if no call was observed since the previous read.
 */
@TestOnly
public static Boolean isLastFindWithDataPageScan() {
    try {
        // The returned value is captured before the finally block resets the marker.
        return lastFindWithDataPageScan;
    }
    finally {
        lastFindWithDataPageScan = null;
    }
}
/**
 * {@inheritDoc}
 * <p>
 * An unbounded search over a persistent group is served by a direct data page scan when the calling thread
 * enabled it and the closure is either absent or an {@link MvccDataPageClosure}.
 */
@Override public GridCursor<CacheDataRow> find(
    CacheSearchRow lower,
    CacheSearchRow upper,
    TreeRowClosure<CacheSearchRow, CacheDataRow> c,
    Object x
) throws IgniteCheckedException {
    // If there is a group of caches, lower and upper bounds will not be null here.
    boolean unbounded = lower == null && upper == null;
    boolean closureSupported = c == null || c instanceof MvccDataPageClosure;

    boolean useDataPageScan = unbounded
        && grp.persistenceEnabled()
        && dataPageScanEnabled.get()
        && closureSupported;

    if (!useDataPageScan) {
        lastFindWithDataPageScan = FALSE;

        return super.find(lower, upper, c, x);
    }

    return scanDataPages(asRowData(x), (MvccDataPageClosure)c);
}
/**
 * Creates a cursor that returns partition rows by reading pages sequentially, starting from the partition
 * meta page ID, instead of traversing the tree. Pages that are not data pages, empty pages and pages where
 * the closure accepted no rows are skipped.
 *
 * @param rowData Required row data.
 * @param c Optional MVCC closure; when not {@code null} only rows accepted by it are returned.
 * @return Cache row cursor.
 * @throws IgniteCheckedException If failed.
 */
private GridCursor<CacheDataRow> scanDataPages(CacheDataRowAdapter.RowData rowData, MvccDataPageClosure c)
    throws IgniteCheckedException {
    lastFindWithDataPageScan = TRUE;

    checkDestroyed();

    assert rowData != null;
    assert grp.persistenceEnabled();

    int partId = rowStore.getPartitionId();
    GridCacheSharedContext shared = grp.shared();
    GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)shared.database();
    PageStore pageStore = db.getPageStore(grpId, partId);
    boolean mvccEnabled = grp.mvccEnabled();
    int pageSize = pageSize();

    long startPageId = ((PageMemoryEx)pageMem).partitionMetaPageId(grp.groupId(), partId);

    /** Cursor that buffers the rows of one data page at a time. */
    final class DataPageScanCursor implements GridCursor<CacheDataRow> {
        /** Number of pages in the page store as of the last check. */
        int pagesCnt = pageStore.pages();

        /** Offset of the current page relative to {@code startPageId}. */
        int curPage = -1;

        /** Rows of the current page: a non-null prefix followed by nulls; {@code null} once exhausted. */
        CacheDataRow[] rows = EMPTY_ROWS;

        /** Index of the current row inside {@link #rows}. */
        int curRow = -1;

        /** {@inheritDoc} */
        @Override public boolean next() throws IgniteCheckedException {
            if (rows == null)
                return false;

            // A null slot marks the end of the rows fetched from the current page.
            if (++curRow < rows.length && rows[curRow] != null)
                return true;

            return readNextDataPage();
        }

        /**
         * Advances to the next page that yields at least one row and loads its rows into {@link #rows}.
         *
         * @return {@code true} If new rows were fetched.
         * @throws IgniteCheckedException If failed.
         */
        private boolean readNextDataPage() throws IgniteCheckedException {
            checkDestroyed();

            for (;;) {
                if (++curPage >= pagesCnt) {
                    // Reread number of pages when we reach it (it may grow).
                    int newPagesCnt = pageStore.pages();

                    if (newPagesCnt <= pagesCnt) {
                        rows = null;

                        return false;
                    }

                    pagesCnt = newPagesCnt;
                }

                long pageId = startPageId + curPage;
                long page = pageMem.acquirePage(grpId, pageId);

                try {
                    boolean skipVer = CacheDataRowStore.getSkipVersion();

                    long pageAddr = ((PageMemoryEx)pageMem).readLock(page, pageId, true, false);

                    try {
                        // TODO https://issues.apache.org/jira/browse/IGNITE-11998.
                        // Here we should also exclude fragmented pages that don't contain the head of the entry.
                        if (PageIO.getType(pageAddr) != T_DATA)
                            continue; // Not a data page.

                        DataPageIO io = PageIO.getPageIO(T_DATA, PageIO.getVersion(pageAddr));

                        int rowsCnt = io.getRowsCount(pageAddr);

                        if (rowsCnt == 0)
                            continue; // Empty page.

                        if (rowsCnt > rows.length)
                            rows = new CacheDataRow[rowsCnt];
                        else
                            clearTail(rows, rowsCnt);

                        int r = 0;

                        for (int i = 0; i < rowsCnt; i++) {
                            if (c == null || c.applyMvcc(io, pageAddr, i, pageSize)) {
                                DataRow row = mvccEnabled ? new MvccDataRow() : new DataRow();

                                row.initFromDataPage(
                                    io,
                                    pageAddr,
                                    i,
                                    grp,
                                    shared,
                                    pageMem,
                                    rowData,
                                    skipVer
                                );

                                rows[r++] = row;
                            }
                        }

                        // The buffer is reused across pages. When the closure rejected some rows, only the
                        // first r slots were overwritten, so slots [r, rowsCnt) may still hold rows of a
                        // previously read page. Drop them, otherwise next() would return them again.
                        for (int i = r; i < rowsCnt; i++)
                            rows[i] = null;

                        if (r == 0)
                            continue; // No rows fetched in this page.

                        curRow = 0;

                        return true;
                    }
                    finally {
                        pageMem.readUnlock(grpId, pageId, page);
                    }
                }
                finally {
                    pageMem.releasePage(grpId, pageId, page);
                }
            }
        }

        /** {@inheritDoc} */
        @Override public CacheDataRow get() {
            return rows[curRow];
        }
    }

    return new DataPageScanCursor();
}
/**
 * Interprets the opaque {@code flags} argument as requested row data.
 *
 * @param flags Flags; expected to be a {@link CacheDataRowAdapter.RowData} or {@code null}.
 * @return Row data, {@link CacheDataRowAdapter.RowData#FULL} when {@code flags} is {@code null}.
 */
private static CacheDataRowAdapter.RowData asRowData(Object flags) {
    if (flags == null)
        return CacheDataRowAdapter.RowData.FULL;

    return (CacheDataRowAdapter.RowData)flags;
}
/**
 * Picks inner page IO versions according to the MVCC and shared-group settings of the cache group.
 *
 * @param grp Cache group.
 * @return Tree inner IO.
 */
private static IOVersions<? extends AbstractDataInnerIO> innerIO(CacheGroupContext grp) {
    boolean mvcc = grp.mvccEnabled();
    boolean sharedGrp = grp.sharedGroup();

    if (mvcc) {
        if (sharedGrp)
            return MvccCacheIdAwareDataInnerIO.VERSIONS;

        return MvccDataInnerIO.VERSIONS;
    }

    if (sharedGrp)
        return CacheIdAwareDataInnerIO.VERSIONS;

    return DataInnerIO.VERSIONS;
}
/**
 * Picks leaf page IO versions according to the MVCC and shared-group settings of the cache group.
 *
 * @param grp Cache group.
 * @return Tree leaf IO.
 */
private static IOVersions<? extends AbstractDataLeafIO> leafIO(CacheGroupContext grp) {
    boolean mvcc = grp.mvccEnabled();
    boolean sharedGrp = grp.sharedGroup();

    if (mvcc) {
        if (sharedGrp)
            return MvccCacheIdAwareDataLeafIO.VERSIONS;

        return MvccDataLeafIO.VERSIONS;
    }

    if (sharedGrp)
        return CacheIdAwareDataLeafIO.VERSIONS;

    return DataLeafIO.VERSIONS;
}
/**
 * @return Row store this tree was created with.
 */
public CacheDataRowStore rowStore() {
    return this.rowStore;
}
/**
 * {@inheritDoc}
 * <p>
 * Compares the stored entry with {@code row} stage by stage: cache ID (shared groups only), key hash,
 * key bytes and finally MVCC version (MVCC groups only, descending).
 */
@Override protected int compare(BPlusIO<CacheSearchRow> iox, long pageAddr, int idx, CacheSearchRow row)
    throws IgniteCheckedException {
    assert !grp.mvccEnabled() || row.mvccCoordinatorVersion() != MvccUtils.MVCC_CRD_COUNTER_NA
        || (row.getClass() == SearchRow.class && row.key() == null) : row;

    RowLinkIO linkIo = (RowLinkIO)iox;

    if (grp.sharedGroup()) {
        assert row.cacheId() != CU.UNDEFINED_CACHE_ID : "Cache ID is not provided: " + row;

        int storedCacheId = linkIo.getCacheId(pageAddr, idx);

        int cacheCmp = Integer.compare(storedCacheId, row.cacheId());

        if (cacheCmp != 0)
            return cacheCmp;

        if (row.key() == null) {
            assert row.getClass() == SearchRow.class : row;

            // A search row with a cache ID only is used as a cache bound.
            // The found position will be shifted until the exact cache bound is found;
            // See for details:
            // o.a.i.i.p.c.database.tree.BPlusTree.ForwardCursor.findLowerBound()
            // o.a.i.i.p.c.database.tree.BPlusTree.ForwardCursor.findUpperBound()
            return 0; // Cache IDs are equal at this point.
        }
    }

    int hashCmp = Integer.compare(linkIo.getHash(pageAddr, idx), row.hash());

    if (hashCmp != 0)
        return hashCmp;

    long link = linkIo.getLink(pageAddr, idx);

    assert row.key() != null : row;

    int keyCmp = compareKeys(row.key(), link);

    if (keyCmp != 0)
        return keyCmp;

    if (!grp.mvccEnabled())
        return 0;

    long storedCrd = linkIo.getMvccCoordinatorVersion(pageAddr, idx);
    long storedCntr = linkIo.getMvccCounter(pageAddr, idx);
    int storedOpCntr = linkIo.getMvccOperationCounter(pageAddr, idx);

    assert MvccUtils.mvccVersionIsValid(storedCrd, storedCntr, storedOpCntr);

    return -MvccUtils.compare(storedCrd, storedCntr, storedOpCntr, row); // descending order
}
/**
 * {@inheritDoc}
 * <p>
 * Reads link, hash and (for shared groups) cache ID from the tree page and asks the row store to build
 * either a plain or an MVCC row.
 */
@Override public CacheDataRow getRow(BPlusIO<CacheSearchRow> io, long pageAddr, int idx, Object flags) {
    RowLinkIO linkIo = (RowLinkIO)io;

    long rowLink = linkIo.getLink(pageAddr, idx);
    int keyHash = linkIo.getHash(pageAddr, idx);

    int cacheId;

    if (grp.sharedGroup())
        cacheId = linkIo.getCacheId(pageAddr, idx);
    else
        cacheId = CU.UNDEFINED_CACHE_ID;

    CacheDataRowAdapter.RowData rowData = asRowData(flags);

    if (!grp.mvccEnabled())
        return rowStore.dataRow(cacheId, keyHash, rowLink, rowData);

    long crdVer = linkIo.getMvccCoordinatorVersion(pageAddr, idx);
    long cntr = linkIo.getMvccCounter(pageAddr, idx);
    int opCntr = linkIo.getMvccOperationCounter(pageAddr, idx);

    return rowStore.mvccRow(cacheId, keyHash, rowLink, rowData, crdVer, cntr, opCntr);
}
/**
 * {@inheritDoc}
 * <p>
 * Delegates to the cache group's {@code statisticsHolderIdx()}.
 */
@Override protected IoStatisticsHolder statisticsHolder() {
    IoStatisticsHolder holder = grp.statisticsHolderIdx();

    return holder;
}
/**
* Compares the key bytes of the data row referenced by {@code link} with the bytes of the given key.
* The stored key is the left operand: lengths are compared first, then 8-byte words via
* {@link Long#compare(long, long)}, then the remaining bytes as signed bytes.
* <p>
* When the payload read from the data page has no next link, the comparison is done directly against
* page memory under a page read lock. Otherwise the key is loaded through
* {@link CacheDataRowAdapter#initFromLink} and compared as a heap byte array.
*
* @param key Key.
* @param link Link.
* @return Compare result: positive if the stored key is greater than {@code key}, negative if less,
*      {@code 0} if the bytes are equal.
* @throws IgniteCheckedException If failed.
*/
private int compareKeys(KeyCacheObject key, final long link) throws IgniteCheckedException {
byte[] bytes = key.valueBytes(grp.cacheObjectContext());
final long pageId = pageId(link);
final long page = acquirePage(pageId);
try {
long pageAddr = readLock(pageId, page); // Non-empty data page must not be recycled.
assert pageAddr != 0L : link;
try {
DataPageIO io = DataPageIO.VERSIONS.forPage(pageAddr);
DataPagePayload data = io.readPayload(pageAddr,
itemId(link),
pageSize());
// NOTE(review): nextLink() == 0 presumably means the row is not fragmented, so the whole key
// is available in this page — confirm against DataPagePayload.
if (data.nextLink() == 0) {
long addr = pageAddr + data.offset();
if (grp.mvccEnabled())
addr += MVCC_INFO_SIZE; // Skip MVCC info.
if (grp.storeCacheIdInDataPage())
addr += 4; // Skip cache id.
// Stored key length, read as an int at the current address.
final int len = PageUtils.getInt(addr, 0);
int lenCmp = Integer.compare(len, bytes.length);
if (lenCmp != 0)
return lenCmp;
addr += 5; // Skip length and type byte.
// Compare 8 bytes at a time first, then the tail byte by byte.
final int words = len / 8;
for (int i = 0; i < words; i++) {
int off = i * 8;
long b1 = PageUtils.getLong(addr, off);
long b2 = GridUnsafe.getLong(bytes, GridUnsafe.BYTE_ARR_OFF + off);
int cmp = Long.compare(b1, b2);
if (cmp != 0)
return cmp;
}
for (int i = words * 8; i < len; i++) {
byte b1 = PageUtils.getByte(addr, i);
byte b2 = bytes[i];
if (b1 != b2)
return b1 > b2 ? 1 : -1;
}
return 0;
}
}
finally {
readUnlock(pageId, page, pageAddr);
}
}
finally {
releasePage(pageId, page);
}
// Fallback path, reached only when the payload has a next link: the page lock and the page are
// already released here, and the stored key is loaded into a heap array instead.
// TODO GG-11768.
CacheDataRowAdapter other = grp.mvccEnabled() ? new MvccDataRow(link) : new CacheDataRowAdapter(link);
other.initFromLink(grp, CacheDataRowAdapter.RowData.KEY_ONLY);
byte[] bytes1 = other.key().valueBytes(grp.cacheObjectContext());
// NOTE(review): same call as the one that produced `bytes` above; presumably yields the same content.
byte[] bytes2 = key.valueBytes(grp.cacheObjectContext());
int lenCmp = Integer.compare(bytes1.length, bytes2.length);
if (lenCmp != 0)
return lenCmp;
final int len = bytes1.length;
// Same word-then-byte comparison as above, this time over two heap arrays.
final int words = len / 8;
for (int i = 0; i < words; i++) {
int off = GridUnsafe.BYTE_ARR_INT_OFF + i * 8;
long b1 = GridUnsafe.getLong(bytes1, off);
long b2 = GridUnsafe.getLong(bytes2, off);
int cmp = Long.compare(b1, b2);
if (cmp != 0)
return cmp;
}
for (int i = words * 8; i < len; i++) {
byte b1 = bytes1[i];
byte b2 = bytes2[i];
if (b1 != b2)
return b1 > b2 ? 1 : -1;
}
return 0;
}
}