blob: 839b16e3a8ef4b20751e22d7cad001520fe66d33 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.freelist;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.metric.IoStatisticsHolder;
import org.apache.ignite.internal.metric.IoStatisticsHolderNoOp;
import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertFragmentRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageRemoveRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageUpdateRecord;
import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl;
import org.apache.ignite.internal.processors.cache.persistence.Storable;
import org.apache.ignite.internal.processors.cache.persistence.diagnostic.pagelocktracker.PageLockTrackerManager;
import org.apache.ignite.internal.processors.cache.persistence.evict.PageEvictionTracker;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.AbstractDataPageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPagePayload;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.LongListReuseBag;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseBag;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
import org.apache.ignite.internal.util.GridCursorIteratorWrapper;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_DATA;
/**
*/
/**
 * Base free-list implementation.
 * <p>
 * Tracks data pages by the amount of free space remaining on them. Pages are distributed over
 * {@link #BUCKETS} striped page lists, where the bucket index is derived from the page's free space
 * (see {@link #bucket(int, boolean)}). The last bucket ({@link #REUSE_BUCKET}) holds completely
 * empty pages available for recycling. Because it also implements {@link ReuseList}, an instance
 * may serve as its own reuse list when none is supplied to the constructor.
 */
public abstract class AbstractFreeList<T extends Storable> extends PagesList implements FreeList<T>, ReuseList {
    /** Number of free-space buckets. Must be a power of 2 (asserted in the constructor). */
    private static final int BUCKETS = 256; // Must be power of 2.

    /** Index of the bucket that stores completely free (reusable) pages. */
    private static final int REUSE_BUCKET = BUCKETS - 1;

    /** Sentinel returned by row-write handlers when the row has been fully written. */
    private static final Integer COMPLETE = Integer.MAX_VALUE;

    /** Sentinel failure value for {@code Integer}-returning page-handler invocations. */
    private static final Integer FAIL_I = Integer.MIN_VALUE;

    /** Sentinel failure value for {@code Long}-returning page-handler invocations. */
    private static final Long FAIL_L = Long.MAX_VALUE;

    /** Pages with free space at or below this threshold (in bytes) are not tracked by the free list. */
    private static final int MIN_PAGE_FREE_SPACE = 8;

    /**
     * Step between buckets in free list, measured in powers of two.
     * For example, for page size 4096 and 256 buckets, shift is 4 and step is 16 bytes.
     */
    private final int shift;

    /** Per-bucket stripe arrays; bucket index encodes the free-space range of its pages. */
    private final AtomicReferenceArray<Stripe[]> buckets = new AtomicReferenceArray<>(BUCKETS);

    /** Onheap bucket page list caches. */
    private final AtomicReferenceArray<PagesCache> bucketCaches = new AtomicReferenceArray<>(BUCKETS);

    /**
     * Minimum row fragment size that warrants a whole data page
     * (page size minus the data-page IO overhead).
     */
    private final int MIN_SIZE_FOR_DATA_PAGE;

    /** Handler that updates an existing row in place on its data page. */
    private final PageHandler<T, Boolean> updateRow = new UpdateRowHandler();

    /** Data region metrics (large-entry page counters, eviction rate). */
    private final DataRegionMetricsImpl memMetrics;

    /** Tracker notified about page touches and used to trigger page eviction. */
    private final PageEvictionTracker evictionTracker;

    /** Page list cache limit. */
    private final AtomicLong pageListCacheLimit;

    /**
     * Updates a row in place on a data page and logs a WAL delta record if required.
     */
    private final class UpdateRowHandler extends PageHandler<T, Boolean> {
        /** {@inheritDoc} */
        @Override public Boolean run(
            int cacheId,
            long pageId,
            long page,
            long pageAddr,
            PageIO iox,
            Boolean walPlc,
            T row,
            int itemId,
            IoStatisticsHolder statHolder)
            throws IgniteCheckedException {
            AbstractDataPageIO<T> io = (AbstractDataPageIO<T>)iox;

            int rowSize = row.size();

            boolean updated = io.updateRow(pageAddr, itemId, pageSize(), null, row, rowSize);

            // Mark the page as recently used regardless of the update outcome.
            evictionTracker.touchPage(pageId);

            if (updated && needWalDeltaRecord(pageId, page, walPlc)) {
                // TODO This record must contain only a reference to a logical WAL record with the actual data.
                byte[] payload = new byte[rowSize];

                DataPagePayload data = io.readPayload(pageAddr, itemId, pageSize());

                assert data.payloadSize() == rowSize;

                PageUtils.getBytes(pageAddr, data.offset(), payload, 0, rowSize);

                wal.log(new DataPageUpdateRecord(
                    cacheId,
                    pageId,
                    itemId,
                    payload));
            }

            return updated;
        }
    }

    /** Write a single row on a single page. */
    private final WriteRowHandler writeRowHnd = new WriteRowHandler();

    /** Write multiple rows on a single page. */
    private final WriteRowsHandler writeRowsHnd = new WriteRowsHandler();

    /**
     * Writes a row (or a fragment of it) to a data page and re-registers the page in the
     * appropriate free-list bucket afterwards.
     */
    private class WriteRowHandler extends PageHandler<T, Integer> {
        /** {@inheritDoc} */
        @Override public Integer run(
            int cacheId,
            long pageId,
            long page,
            long pageAddr,
            PageIO iox,
            Boolean walPlc,
            T row,
            int written,
            IoStatisticsHolder statHolder)
            throws IgniteCheckedException {
            written = addRow(pageId, page, pageAddr, iox, row, written);

            // Put the page back into the free list according to its remaining free space.
            putPage(((AbstractDataPageIO)iox).getFreeSpace(pageAddr), pageId, page, pageAddr, statHolder);

            return written;
        }

        /**
         * @param pageId Page ID.
         * @param page Page absolute pointer.
         * @param pageAddr Page address.
         * @param iox IO.
         * @param row Row to write.
         * @param written Written size.
         * @return Number of bytes written, {@link #COMPLETE} if the row was fully written.
         * @throws IgniteCheckedException If failed.
         */
        protected Integer addRow(
            long pageId,
            long page,
            long pageAddr,
            PageIO iox,
            T row,
            int written)
            throws IgniteCheckedException {
            AbstractDataPageIO<T> io = (AbstractDataPageIO<T>)iox;

            int rowSize = row.size();

            int oldFreeSpace = io.getFreeSpace(pageAddr);

            assert oldFreeSpace > 0 : oldFreeSpace;

            // If the full row does not fit into this page write only a fragment.
            written = (written == 0 && oldFreeSpace >= rowSize) ? addRowFull(pageId, page, pageAddr, io, row, rowSize) :
                addRowFragment(pageId, page, pageAddr, io, row, written, rowSize);

            if (written == rowSize)
                evictionTracker.touchPage(pageId);

            // Avoid boxing with garbage generation for usual case.
            return written == rowSize ? COMPLETE : written;
        }

        /**
         * @param pageId Page ID.
         * @param page Page pointer.
         * @param pageAddr Page address.
         * @param io IO.
         * @param row Row.
         * @param rowSize Row size.
         * @return Written size which is always equal to row size here.
         * @throws IgniteCheckedException If failed.
         */
        protected int addRowFull(
            long pageId,
            long page,
            long pageAddr,
            AbstractDataPageIO<T> io,
            T row,
            int rowSize
        ) throws IgniteCheckedException {
            io.addRow(pageId, pageAddr, row, rowSize, pageSize());

            if (needWalDeltaRecord(pageId, page, null)) {
                // TODO IGNITE-5829 This record must contain only a reference to a logical WAL record with the actual data.
                byte[] payload = new byte[rowSize];

                DataPagePayload data = io.readPayload(pageAddr, PageIdUtils.itemId(row.link()), pageSize());

                assert data.payloadSize() == rowSize;

                PageUtils.getBytes(pageAddr, data.offset(), payload, 0, rowSize);

                wal.log(new DataPageInsertRecord(
                    grpId,
                    pageId,
                    payload));
            }

            return rowSize;
        }

        /**
         * @param pageId Page ID.
         * @param page Page pointer.
         * @param pageAddr Page address.
         * @param io IO.
         * @param row Row.
         * @param written Written size.
         * @param rowSize Row size.
         * @return Updated written size.
         * @throws IgniteCheckedException If failed.
         */
        protected int addRowFragment(
            long pageId,
            long page,
            long pageAddr,
            AbstractDataPageIO<T> io,
            T row,
            int written,
            int rowSize
        ) throws IgniteCheckedException {
            // Read last link before the fragment write, because it will be updated there.
            long lastLink = row.link();

            int payloadSize = io.addRowFragment(pageMem, pageId, pageAddr, row, written, rowSize, pageSize());

            assert payloadSize > 0 : payloadSize;

            if (needWalDeltaRecord(pageId, page, null)) {
                // TODO IGNITE-5829 This record must contain only a reference to a logical WAL record with the actual data.
                byte[] payload = new byte[payloadSize];

                DataPagePayload data = io.readPayload(pageAddr, PageIdUtils.itemId(row.link()), pageSize());

                PageUtils.getBytes(pageAddr, data.offset(), payload, 0, payloadSize);

                wal.log(new DataPageInsertFragmentRecord(grpId, pageId, payload, lastLink));
            }

            return written + payloadSize;
        }

        /**
         * Put page into the free list if needed.
         *
         * @param freeSpace Page free space.
         * @param pageId Page ID.
         * @param page Page pointer.
         * @param pageAddr Page address.
         * @param statHolder Statistics holder to track IO operations.
         */
        protected void putPage(int freeSpace, long pageId, long page, long pageAddr, IoStatisticsHolder statHolder)
            throws IgniteCheckedException {
            // Pages with too little free space left are not worth tracking.
            if (freeSpace > MIN_PAGE_FREE_SPACE) {
                int bucket = bucket(freeSpace, false);

                put(null, pageId, page, pageAddr, bucket, statHolder);
            }
        }
    }

    /**
     * Writes as many rows from a cursor as fit onto a single locked data page
     * (delegating per-row work to {@link WriteRowHandler}).
     */
    private final class WriteRowsHandler extends PageHandler<GridCursor<T>, Integer> {
        /** {@inheritDoc} */
        @Override public Integer run(
            int cacheId,
            long pageId,
            long page,
            long pageAddr,
            PageIO iox,
            Boolean walPlc,
            GridCursor<T> cur,
            int written,
            IoStatisticsHolder statHolder)
            throws IgniteCheckedException {
            AbstractDataPageIO<T> io = (AbstractDataPageIO<T>)iox;

            // Fill the page up to the end.
            // Keep going while a row is partially written, or advance to the next row
            // unless eviction pressure requires releasing the page first.
            while (written != COMPLETE || (!evictionTracker.evictionRequired() && cur.next())) {
                T row = cur.get();

                if (written == COMPLETE) {
                    // If the data row was completely written without remainder, proceed to the next.
                    if ((written = writeWholePages(row, statHolder)) == COMPLETE)
                        continue;

                    // The remainder of this row does not fit here; let the caller take another page.
                    if (io.getFreeSpace(pageAddr) < row.size() - written)
                        break;
                }

                written = writeRowHnd.addRow(pageId, page, pageAddr, io, row, written);

                assert written == COMPLETE;

                evictionTracker.touchPage(pageId);
            }

            writeRowHnd.putPage(io.getFreeSpace(pageAddr), pageId, page, pageAddr, statHolder);

            return written;
        }
    }

    /** Handler that removes a row from a data page (initialized in the constructor). */
    private final PageHandler<ReuseBag, Long> rmvRow;

    /**
     * Removes a row from a data page, relocates the page between buckets when its free space
     * changes, and recycles the page when it becomes empty.
     */
    private final class RemoveRowHandler extends PageHandler<ReuseBag, Long> {
        /** Indicates whether partition ID should be masked from page ID. */
        private final boolean maskPartId;

        /** */
        RemoveRowHandler(boolean maskPartId) {
            this.maskPartId = maskPartId;
        }

        /** {@inheritDoc} */
        @Override public Long run(
            int cacheId,
            long pageId,
            long page,
            long pageAddr,
            PageIO iox,
            Boolean walPlc,
            ReuseBag reuseBag,
            int itemId,
            IoStatisticsHolder statHolder)
            throws IgniteCheckedException {
            AbstractDataPageIO<T> io = (AbstractDataPageIO<T>)iox;

            int oldFreeSpace = io.getFreeSpace(pageAddr);

            assert oldFreeSpace >= 0 : oldFreeSpace;

            // Next link of a fragmented row chain, or 0 if the row has no further fragments.
            long nextLink = io.removeRow(pageAddr, itemId, pageSize());

            if (needWalDeltaRecord(pageId, page, walPlc))
                wal.log(new DataPageRemoveRecord(cacheId, pageId, itemId));

            int newFreeSpace = io.getFreeSpace(pageAddr);

            if (newFreeSpace > MIN_PAGE_FREE_SPACE) {
                int newBucket = bucket(newFreeSpace, false);

                // If the page was previously untracked (too full), it must be put now.
                boolean putIsNeeded = oldFreeSpace <= MIN_PAGE_FREE_SPACE;

                if (!putIsNeeded) {
                    int oldBucket = bucket(oldFreeSpace, false);

                    if (oldBucket != newBucket) {
                        // It is possible that page was concurrently taken for put, in this case put will handle bucket change.
                        pageId = maskPartId ? PageIdUtils.maskPartitionId(pageId) : pageId;

                        putIsNeeded = removeDataPage(pageId, page, pageAddr, io, oldBucket, statHolder);
                    }
                }

                if (io.isEmpty(pageAddr)) {
                    evictionTracker.forgetPage(pageId);

                    // Empty pages are recycled into the reuse bag instead of a free-space bucket.
                    if (putIsNeeded)
                        reuseBag.addFreePage(recyclePage(pageId, page, pageAddr, null));
                }
                else if (putIsNeeded)
                    put(null, pageId, page, pageAddr, newBucket, statHolder);
            }

            // For common case boxed 0L will be cached inside of Long, so no garbage will be produced.
            return nextLink;
        }
    }

    /**
     * @param cacheGrpId Cache group ID.
     * @param name Name (for debug purpose).
     * @param dataRegion Data region.
     * @param reuseList Reuse list or {@code null} if this free list will be a reuse list for itself.
     * @param wal Write ahead log manager.
     * @param metaPageId Metadata page ID.
     * @param initNew {@code True} if new metadata should be initialized.
     * @param pageLockTrackerManager Page lock tracker manager.
     * @param ctx Kernal context.
     * @param pageListCacheLimit Page list cache limit, or {@code null} if not limited.
     * @param pageFlag Default flag value for allocated pages.
     * @throws IgniteCheckedException If failed.
     */
    public AbstractFreeList(
        int cacheGrpId,
        String name,
        DataRegion dataRegion,
        @Nullable ReuseList reuseList,
        @Nullable IgniteWriteAheadLogManager wal,
        long metaPageId,
        boolean initNew,
        PageLockTrackerManager pageLockTrackerManager,
        GridKernalContext ctx,
        @Nullable AtomicLong pageListCacheLimit,
        byte pageFlag
    ) throws IgniteCheckedException {
        super(cacheGrpId, name, dataRegion.pageMemory(), BUCKETS, wal, metaPageId, pageLockTrackerManager, ctx, pageFlag);

        // Partition ID is masked only for group id 0 — presumably index/meta structures; confirm against callers.
        rmvRow = new RemoveRowHandler(cacheGrpId == 0);

        this.evictionTracker = dataRegion.evictionTracker();
        this.reuseList = reuseList == null ? this : reuseList;
        int pageSize = pageMem.pageSize();

        assert U.isPow2(pageSize) : "Page size must be a power of 2: " + pageSize;
        assert U.isPow2(BUCKETS);
        assert BUCKETS <= pageSize : pageSize;

        // TODO this constant is used because currently we cannot reuse data pages as index pages
        // TODO and vice-versa. It should be removed when data storage format is finalized.
        MIN_SIZE_FOR_DATA_PAGE = pageSize - AbstractDataPageIO.MIN_DATA_PAGE_OVERHEAD;

        // Compute log2(pageSize / BUCKETS): the per-bucket free-space step in powers of two.
        int shift = 0;

        while (pageSize > BUCKETS) {
            shift++;
            pageSize >>>= 1;
        }

        this.shift = shift;

        this.memMetrics = dataRegion.metrics();

        this.pageListCacheLimit = pageListCacheLimit;

        init(metaPageId, initNew);
    }

    /**
     * Calculates free space tracked by this FreeListImpl instance.
     *
     * @return Free space available for use, in bytes.
     */
    public long freeSpace() {
        long freeSpace = 0;

        // Skip the reuse bucket (BUCKETS - 1) and bucket 0 (pages with negligible free space).
        // Each page in bucket b has at least (b << shift) bytes free.
        for (int b = BUCKETS - 2; b > 0; b--) {
            long perPageFreeSpace = b << shift;

            long pages = bucketsSize.get(b);

            freeSpace += pages * perPageFreeSpace;
        }

        return freeSpace;
    }

    /** {@inheritDoc} */
    @Override public void dumpStatistics(IgniteLogger log) {
        long dataPages = 0;

        // Compile-time debug switch: flip to true to log per-bucket details.
        final boolean dumpBucketsInfo = false;

        for (int b = 0; b < BUCKETS; b++) {
            long size = bucketsSize.get(b);

            if (!isReuseBucket(b))
                dataPages += size;

            if (dumpBucketsInfo) {
                Stripe[] stripes = getBucket(b);

                boolean empty = true;

                if (stripes != null) {
                    for (Stripe stripe : stripes) {
                        if (!stripe.empty) {
                            empty = false;

                            break;
                        }
                    }
                }

                if (log.isInfoEnabled())
                    log.info("Bucket [b=" + b +
                        ", size=" + size +
                        ", stripes=" + (stripes != null ? stripes.length : 0) +
                        ", stripesEmpty=" + empty + ']');
            }
        }

        if (dataPages > 0) {
            if (log.isInfoEnabled())
                log.info("FreeList [name=" + name() +
                    ", buckets=" + BUCKETS +
                    ", dataPages=" + dataPages +
                    ", reusePages=" + bucketsSize.get(REUSE_BUCKET) + "]");
        }
    }

    /**
     * @param freeSpace Page free space.
     * @param allowReuse {@code True} if it is allowed to get reuse bucket.
     * @return Bucket.
     */
    private int bucket(int freeSpace, boolean allowReuse) {
        assert freeSpace > 0 : freeSpace;

        int bucket = freeSpace >>> shift;

        assert bucket >= 0 && bucket < BUCKETS : bucket;

        // The last bucket is reserved for empty pages; non-empty pages land one bucket lower.
        if (!allowReuse && isReuseBucket(bucket))
            bucket--;

        return bucket;
    }

    /** {@inheritDoc} */
    @Override protected int getBucketIndex(int freeSpace) {
        // -1 means the page is too full to be tracked by the free list.
        return freeSpace > MIN_PAGE_FREE_SPACE ? bucket(freeSpace, false) : -1;
    }

    /**
     * @param part Partition.
     * @return Page ID.
     * @throws IgniteCheckedException If failed.
     */
    private long allocateDataPage(int part) throws IgniteCheckedException {
        assert part <= PageIdAllocator.MAX_PARTITION_ID;

        return pageMem.allocatePage(grpId, part, FLAG_DATA);
    }

    /** {@inheritDoc} */
    @Override public void insertDataRow(T row, IoStatisticsHolder statHolder) throws IgniteCheckedException {
        int written = 0;

        try {
            // Keep writing fragments to new pages until the whole row is stored.
            do {
                if (written != 0)
                    memMetrics.incrementLargeEntriesPages();

                written = writeSinglePage(row, written, statHolder);
            }
            while (written != COMPLETE);
        }
        catch (IgniteCheckedException | Error e) {
            throw e;
        }
        catch (Throwable t) {
            throw new CorruptedFreeListException("Failed to insert data row", t, grpId);
        }
    }

    /**
     * Reduces the workload on the free list by writing multiple rows into a single memory page at once.<br>
     * <br>
     * Rows are sequentially added to the page as long as there is enough free space on it. If the row is large then
     * those fragments that occupy the whole memory page are written to other pages, and the remainder is added to the
     * current one.
     *
     * @param rows Rows.
     * @param statHolder Statistics holder to track IO operations.
     * @throws IgniteCheckedException If failed.
     */
    @Override public void insertDataRows(Collection<T> rows,
        IoStatisticsHolder statHolder) throws IgniteCheckedException {
        try {
            GridCursor<T> cur = new GridCursorIteratorWrapper<>(rows.iterator());

            int written = COMPLETE;

            while (written != COMPLETE || cur.next()) {
                T row = cur.get();

                // If eviction is required - free up memory before locking the next page.
                while (evictionTracker.evictionRequired()) {
                    evictionTracker.evictDataPage();

                    memMetrics.updateEvictionRate();
                }

                if (written == COMPLETE) {
                    // Write whole-page fragments of a large row first; remainder goes below.
                    written = writeWholePages(row, statHolder);

                    continue;
                }

                AbstractDataPageIO initIo = null;

                long pageId = takePage(row.size() - written, row, statHolder);

                if (pageId == 0L) {
                    pageId = allocateDataPage(row.partition());

                    initIo = row.ioVersions().latest();
                }

                written = write(pageId, writeRowsHnd, initIo, cur, written, FAIL_I, statHolder);

                assert written != FAIL_I; // We can't fail here.
            }
        }
        catch (RuntimeException e) {
            throw new CorruptedFreeListException("Failed to insert data rows", e, grpId);
        }
    }

    /**
     * Write fragments of the row, which occupy the whole memory page. A data row is ignored if it is less than the max
     * payload of an empty data page.
     *
     * @param row Row to process.
     * @param statHolder Statistics holder to track IO operations.
     * @return Number of bytes written, {@link #COMPLETE} if the row was fully written, {@code 0} if data row was
     * ignored because it is less than the max payload of an empty data page.
     * @throws IgniteCheckedException If failed.
     */
    private int writeWholePages(T row, IoStatisticsHolder statHolder) throws IgniteCheckedException {
        assert row.link() == 0 : row.link();

        int written = 0;

        int rowSize = row.size();

        while (rowSize - written >= MIN_SIZE_FOR_DATA_PAGE) {
            written = writeSinglePage(row, written, statHolder);

            memMetrics.incrementLargeEntriesPages();
        }

        return written;
    }

    /**
     * Take a page and write row on it.
     *
     * @param row Row to write.
     * @param written Written size.
     * @param statHolder Statistics holder to track IO operations.
     * @return Number of bytes written, {@link #COMPLETE} if the row was fully written.
     * @throws IgniteCheckedException If failed.
     */
    private int writeSinglePage(T row, int written, IoStatisticsHolder statHolder) throws IgniteCheckedException {
        AbstractDataPageIO initIo = null;

        long pageId = takePage(row.size() - written, row, statHolder);

        // No suitable page in the free list — allocate a fresh one and initialize it.
        if (pageId == 0L) {
            pageId = allocateDataPage(row.partition());

            initIo = row.ioVersions().latest();
        }

        written = write(pageId, writeRowHnd, initIo, row, written, FAIL_I, statHolder);

        assert written != FAIL_I; // We can't fail here.

        return written;
    }

    /**
     * Take page from free list.
     *
     * @param size Required free space on page.
     * @param row Row to write.
     * @param statHolder Statistics holder to track IO operations.
     * @return Page identifier or 0 if no page found in free list.
     * @throws IgniteCheckedException If failed.
     */
    private long takePage(int size, T row, IoStatisticsHolder statHolder) throws IgniteCheckedException {
        long pageId = 0;

        if (size < MIN_SIZE_FOR_DATA_PAGE) {
            // Scan buckets with at least the required free space (excluding the reuse bucket).
            for (int b = bucket(size, false) + 1; b < REUSE_BUCKET; b++) {
                pageId = takeEmptyPage(b, row.ioVersions(), statHolder);

                if (pageId != 0L)
                    break;
            }
        }

        if (pageId == 0L) { // Handle reuse bucket.
            if (reuseList == this)
                pageId = takeEmptyPage(REUSE_BUCKET, row.ioVersions(), statHolder);
            else {
                pageId = reuseList.takeRecycledPage();

                if (pageId != 0)
                    pageId = reuseList.initRecycledPage(pageId, FLAG_DATA, row.ioVersions().latest());
            }
        }

        if (pageId == 0L)
            return 0;

        assert PageIdUtils.flag(pageId) == FLAG_DATA
            : "rowVersions=" + row.ioVersions() + ", pageId=" + PageIdUtils.toDetailString(pageId);

        // Recycled pages may carry a foreign partition ID; rebind to the row's partition.
        return PageIdUtils.changePartitionId(pageId, row.partition());
    }

    /**
     * @param row Row.
     * @param reusedPageId Reused page id.
     * @param statHolder Statistics holder to track IO operations.
     * @return Prepared page id.
     *
     * @see PagesList#initReusedPage(long, long, long, int, byte, PageIO)
     */
    private long initReusedPage(T row, long reusedPageId, IoStatisticsHolder statHolder) throws IgniteCheckedException {
        long reusedPage = acquirePage(reusedPageId, statHolder);

        try {
            long reusedPageAddr = writeLock(reusedPageId, reusedPage);

            assert reusedPageAddr != 0;

            try {
                return initReusedPage(reusedPageId, reusedPage, reusedPageAddr,
                    row.partition(), PageIdAllocator.FLAG_DATA, row.ioVersions().latest());
            }
            finally {
                writeUnlock(reusedPageId, reusedPage, reusedPageAddr, true);
            }
        }
        finally {
            releasePage(reusedPageId, reusedPage);
        }
    }

    /** {@inheritDoc} */
    @Override public boolean updateDataRow(long link, T row,
        IoStatisticsHolder statHolder) throws IgniteCheckedException {
        assert link != 0;

        try {
            long pageId = PageIdUtils.pageId(link);
            int itemId = PageIdUtils.itemId(link);

            Boolean updated = write(pageId, updateRow, row, itemId, null, statHolder);

            assert updated != null; // Can't fail here.

            return updated;
        }
        catch (AssertionError e) {
            throw corruptedFreeListException(e);
        }
        catch (IgniteCheckedException | Error e) {
            throw e;
        }
        catch (Throwable t) {
            throw new CorruptedFreeListException("Failed to update data row", t, grpId);
        }
    }

    /** {@inheritDoc} */
    @Override public <S, R> R updateDataRow(long link, PageHandler<S, R> pageHnd, S arg,
        IoStatisticsHolder statHolder) throws IgniteCheckedException {
        assert link != 0;

        try {
            long pageId = PageIdUtils.pageId(link);
            int itemId = PageIdUtils.itemId(link);

            R updRes = write(pageId, pageHnd, arg, itemId, null, statHolder);

            assert updRes != null; // Can't fail here.

            return updRes;
        }
        catch (AssertionError e) {
            throw corruptedFreeListException(e);
        }
        catch (IgniteCheckedException | Error e) {
            throw e;
        }
        catch (Throwable t) {
            throw new CorruptedFreeListException("Failed to update data row", t, grpId);
        }
    }

    /** {@inheritDoc} */
    @Override public void removeDataRowByLink(long link, IoStatisticsHolder statHolder) throws IgniteCheckedException {
        assert link != 0;

        try {
            long pageId = PageIdUtils.pageId(link);
            int itemId = PageIdUtils.itemId(link);

            ReuseBag bag = new LongListReuseBag();

            long nextLink = write(pageId, rmvRow, bag, itemId, FAIL_L, statHolder);

            assert nextLink != FAIL_L; // Can't fail here.

            // Walk the fragment chain of a large row, removing each fragment in turn.
            while (nextLink != 0L) {
                memMetrics.decrementLargeEntriesPages();

                itemId = PageIdUtils.itemId(nextLink);
                pageId = PageIdUtils.pageId(nextLink);

                nextLink = write(pageId, rmvRow, bag, itemId, FAIL_L, statHolder);

                assert nextLink != FAIL_L; // Can't fail here.
            }

            // Hand pages that became empty during removal over for recycling.
            reuseList.addForRecycle(bag);
        }
        catch (AssertionError e) {
            throw corruptedFreeListException(e);
        }
        catch (IgniteCheckedException | Error e) {
            throw e;
        }
        catch (Throwable t) {
            throw new CorruptedFreeListException("Failed to remove data by link", t, grpId);
        }
    }

    /** {@inheritDoc} */
    @Override protected Stripe[] getBucket(int bucket) {
        return buckets.get(bucket);
    }

    /** {@inheritDoc} */
    @Override protected boolean casBucket(int bucket, Stripe[] exp, Stripe[] upd) {
        boolean res = buckets.compareAndSet(bucket, exp, upd);

        if (log.isDebugEnabled()) {
            log.debug("CAS bucket [list=" + name() + ", bucket=" + bucket + ", old=" + Arrays.toString(exp) +
                ", new=" + Arrays.toString(upd) + ", res=" + res + ']');
        }

        return res;
    }

    /** {@inheritDoc} */
    @Override protected boolean isReuseBucket(int bucket) {
        return bucket == REUSE_BUCKET;
    }

    /** {@inheritDoc} */
    @Override protected PagesCache getBucketCache(int bucket, boolean create) {
        PagesCache pagesCache = bucketCaches.get(bucket);

        // Lazily create the cache; on CAS race, adopt the instance installed by the winner.
        if (pagesCache == null && create &&
            !bucketCaches.compareAndSet(bucket, null, pagesCache = new PagesCache(pageListCacheLimit)))
            pagesCache = bucketCaches.get(bucket);

        return pagesCache;
    }

    /**
     * @return Number of empty data pages in free list.
     */
    public int emptyDataPages() {
        return (int)bucketsSize.get(REUSE_BUCKET);
    }

    /** {@inheritDoc} */
    @Override public void addForRecycle(ReuseBag bag) throws IgniteCheckedException {
        assert reuseList == this : "not allowed to be a reuse list";

        try {
            put(bag, 0, 0, 0L, REUSE_BUCKET, IoStatisticsHolderNoOp.INSTANCE);
        }
        catch (AssertionError e) {
            throw corruptedFreeListException(e);
        }
        catch (IgniteCheckedException | Error e) {
            throw e;
        }
        catch (Throwable t) {
            throw new CorruptedFreeListException("Failed to add page for recycle", t, grpId);
        }
    }

    /** {@inheritDoc} */
    @Override public long takeRecycledPage() throws IgniteCheckedException {
        assert reuseList == this : "not allowed to be a reuse list";

        try {
            return takeEmptyPage(REUSE_BUCKET, null, IoStatisticsHolderNoOp.INSTANCE);
        }
        catch (AssertionError e) {
            throw corruptedFreeListException(e);
        }
        catch (IgniteCheckedException | Error e) {
            throw e;
        }
        catch (Throwable t) {
            throw new CorruptedFreeListException("Failed to take recycled page", t, grpId);
        }
    }

    /** {@inheritDoc} */
    @Override public long initRecycledPage(long pageId, byte flag, PageIO initIO) throws IgniteCheckedException {
        return initRecycledPage0(pageId, flag, initIO);
    }

    /** {@inheritDoc} */
    @Override public long recycledPagesCount() throws IgniteCheckedException {
        assert reuseList == this : "not allowed to be a reuse list";

        try {
            return storedPagesCount(REUSE_BUCKET);
        }
        catch (AssertionError e) {
            throw corruptedFreeListException(e);
        }
        catch (IgniteCheckedException | Error e) {
            throw e;
        }
        catch (Throwable t) {
            throw new CorruptedFreeListException("Failed to count recycled pages", t, grpId);
        }
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        return "FreeList [name=" + name() + ']';
    }
}