blob: 4e1f783b2c287112a3dd0a7f60a0390ca51c0041 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence.freelist;
import java.util.concurrent.atomic.AtomicReferenceArray;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.PageUtils;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertFragmentRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageInsertRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageRemoveRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.DataPageUpdateRecord;
import org.apache.ignite.internal.processors.cache.persistence.DataRegion;
import org.apache.ignite.internal.processors.cache.persistence.DataRegionMetricsImpl;
import org.apache.ignite.internal.processors.cache.persistence.Storable;
import org.apache.ignite.internal.processors.cache.persistence.evict.PageEvictionTracker;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.AbstractDataPageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPagePayload;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.IOVersions;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.LongListReuseBag;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseBag;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
import org.apache.ignite.internal.util.typedef.internal.U;
/**
 * Free list of data pages, bucketed by the amount of free space left in each page.
 * <p>
 * There are {@code BUCKETS} buckets. A page with {@code freeSpace} free bytes belongs to bucket
 * {@code freeSpace >>> shift}. The last bucket ({@code REUSE_BUCKET}) is not used for data pages
 * with free space; it holds recycled pages added via {@link #addForRecycle(ReuseBag)}. This class does
 * not put a page into a bucket when its free space is not greater than {@code MIN_PAGE_FREE_SPACE}.
 * <p>
 * A row that does not fit into a single page is written as a chain of fragments over several pages.
 * Removing such a row follows the chain of fragment links.
 * <p>
 * This class also implements {@link ReuseList}. When no external reuse list is passed to the
 * constructor, the free list acts as the reuse list for itself.
 *
 * @param <T> Type of rows stored in the data pages.
 */
public abstract class AbstractFreeList<T extends Storable> extends PagesList implements FreeList<T>, ReuseList {
    /** Number of free-space buckets. */
    private static final int BUCKETS = 256; // Must be power of 2.

    /** Index of the last bucket, which holds recycled pages. */
    private static final int REUSE_BUCKET = BUCKETS - 1;

    /**
     * Result of {@link WriteRowHandler} once the whole row has been written.
     * It is boxed once here so that the common case produces no garbage.
     */
    private static final Integer COMPLETE = Integer.MAX_VALUE;

    /** Failure result passed to {@code write(...)} together with {@code writeRow}; asserted never to be returned. */
    private static final Integer FAIL_I = Integer.MIN_VALUE;

    /** Failure result passed to {@code write(...)} together with {@code rmvRow}; asserted never to be returned. */
    private static final Long FAIL_L = Long.MAX_VALUE;

    /** Pages whose free space (in bytes) is not greater than this value are not put into any bucket. */
    private static final int MIN_PAGE_FREE_SPACE = 8;

    /**
     * Step between buckets in free list, measured in powers of two.
     * For example, for page size 4096 and 256 buckets, shift is 4 and step is 16 bytes.
     */
    private final int shift;

    /** Bucket storage: each slot holds the stripes of one bucket, or {@code null}. */
    private final AtomicReferenceArray<Stripe[]> buckets = new AtomicReferenceArray<>(BUCKETS);

    /**
     * Page size minus {@link AbstractDataPageIO#MIN_DATA_PAGE_OVERHEAD}.
     * {@code insertDataRow} handles a not-yet-written row remainder of at least this size specially:
     * it looks for a page only in the reuse bucket, and otherwise allocates a new page.
     */
    private final int MIN_SIZE_FOR_DATA_PAGE;

    /** Handler that updates a row in place on a data page. */
    private final PageHandler<T, Boolean> updateRow = new UpdateRowHandler();

    /** Data region metrics; this class maintains the large-entries page counter in it. */
    private final DataRegionMetricsImpl memMetrics;

    /** Eviction tracker of the data region; pages are touched on write and forgotten when emptied. */
    private final PageEvictionTracker evictionTracker;

    /**
     * Updates an existing row in place on a data page.
     * <p>
     * {@code row} is the new row and the int argument is the item ID of the row on the page.
     * Returns whether the page IO reported the row as updated. If it was updated and a WAL delta
     * record is needed, a {@link DataPageUpdateRecord} with the new row bytes is logged.
     */
    private final class UpdateRowHandler extends PageHandler<T, Boolean> {
        @Override public Boolean run(
            int cacheId,
            long pageId,
            long page,
            long pageAddr,
            PageIO iox,
            Boolean walPlc,
            T row,
            int itemId)
            throws IgniteCheckedException {
            AbstractDataPageIO<T> io = (AbstractDataPageIO<T>)iox;

            int rowSize = row.size();

            boolean updated = io.updateRow(pageAddr, itemId, pageSize(), null, row, rowSize);

            // The page is touched regardless of whether the in-place update succeeded.
            evictionTracker.touchPage(pageId);

            if (updated && needWalDeltaRecord(pageId, page, walPlc)) {
                // TODO This record must contain only a reference to a logical WAL record with the actual data.
                byte[] payload = new byte[rowSize];

                // Copy the freshly written row bytes back out of the page for the WAL record.
                DataPagePayload data = io.readPayload(pageAddr, itemId, pageSize());

                assert data.payloadSize() == rowSize;

                PageUtils.getBytes(pageAddr, data.offset(), payload, 0, rowSize);

                wal.log(new DataPageUpdateRecord(
                    cacheId,
                    pageId,
                    itemId,
                    payload));
            }

            return updated;
        }
    }

    /** Handler that writes a row, or its next fragment, to a data page. */
    private final PageHandler<T, Integer> writeRow = new WriteRowHandler();

    /**
     * Writes a row, or the next fragment of a row, to a data page.
     * <p>
     * The int argument is the number of row bytes already written.
     * Returns {@link #COMPLETE} once the whole row has been written; otherwise it returns the updated
     * number of written bytes. After the write, the page is put into the bucket that matches its
     * remaining free space, if that free space exceeds {@link #MIN_PAGE_FREE_SPACE}.
     */
    private final class WriteRowHandler extends PageHandler<T, Integer> {
        @Override public Integer run(
            int cacheId,
            long pageId,
            long page,
            long pageAddr,
            PageIO iox,
            Boolean walPlc,
            T row,
            int written)
            throws IgniteCheckedException {
            AbstractDataPageIO<T> io = (AbstractDataPageIO<T>)iox;

            int rowSize = row.size();

            int oldFreeSpace = io.getFreeSpace(pageAddr);

            assert oldFreeSpace > 0 : oldFreeSpace;

            // Write the whole row only if nothing has been written yet and it fits into this page;
            // otherwise write just the next fragment.
            written = (written == 0 && oldFreeSpace >= rowSize) ? addRow(pageId, page, pageAddr, io, row, rowSize) :
                addRowFragment(pageId, page, pageAddr, io, row, written, rowSize);

            // Reread free space after update.
            int newFreeSpace = io.getFreeSpace(pageAddr);

            if (newFreeSpace > MIN_PAGE_FREE_SPACE) {
                int bucket = bucket(newFreeSpace, false);

                put(null, pageId, page, pageAddr, bucket);
            }

            // Only the page on which the row write completes is touched in the eviction tracker.
            if (written == rowSize)
                evictionTracker.touchPage(pageId);

            // Avoid boxing with garbage generation for usual case.
            return written == rowSize ? COMPLETE : written;
        }

        /**
         * Adds the whole row to the page.
         * If a WAL delta record is needed, it also logs a {@link DataPageInsertRecord} with the row bytes.
         *
         * @param pageId Page ID.
         * @param page Page pointer.
         * @param pageAddr Page address.
         * @param io IO.
         * @param row Row.
         * @param rowSize Row size.
         * @return Written size which is always equal to row size here.
         * @throws IgniteCheckedException If failed.
         */
        private int addRow(
            long pageId,
            long page,
            long pageAddr,
            AbstractDataPageIO<T> io,
            T row,
            int rowSize
        ) throws IgniteCheckedException {
            io.addRow(pageId, pageAddr, row, rowSize, pageSize());

            if (needWalDeltaRecord(pageId, page, null)) {
                // TODO IGNITE-5829 This record must contain only a reference to a logical WAL record with the actual data.
                byte[] payload = new byte[rowSize];

                // row.link() is read after io.addRow(...); its item ID is used to locate the payload.
                DataPagePayload data = io.readPayload(pageAddr, PageIdUtils.itemId(row.link()), pageSize());

                assert data.payloadSize() == rowSize;

                PageUtils.getBytes(pageAddr, data.offset(), payload, 0, rowSize);

                wal.log(new DataPageInsertRecord(
                    grpId,
                    pageId,
                    payload));
            }

            return rowSize;
        }

        /**
         * Writes the next fragment of the row to the page.
         * If a WAL delta record is needed, it also logs a {@link DataPageInsertFragmentRecord} with the
         * fragment bytes and the row link as it was before this write.
         *
         * @param pageId Page ID.
         * @param page Page pointer.
         * @param pageAddr Page address.
         * @param io IO.
         * @param row Row.
         * @param written Written size.
         * @param rowSize Row size.
         * @return Updated written size.
         * @throws IgniteCheckedException If failed.
         */
        private int addRowFragment(
            long pageId,
            long page,
            long pageAddr,
            AbstractDataPageIO<T> io,
            T row,
            int written,
            int rowSize
        ) throws IgniteCheckedException {
            // Read last link before the fragment write, because it will be updated there.
            long lastLink = row.link();

            int payloadSize = io.addRowFragment(pageMem, pageId, pageAddr, row, written, rowSize, pageSize());

            assert payloadSize > 0 : payloadSize;

            if (needWalDeltaRecord(pageId, page, null)) {
                // TODO IGNITE-5829 This record must contain only a reference to a logical WAL record with the actual data.
                byte[] payload = new byte[payloadSize];

                // row.link() now refers to the fragment just written on this page.
                DataPagePayload data = io.readPayload(pageAddr, PageIdUtils.itemId(row.link()), pageSize());

                PageUtils.getBytes(pageAddr, data.offset(), payload, 0, payloadSize);

                wal.log(new DataPageInsertFragmentRecord(grpId, pageId, payload, lastLink));
            }

            return written + payloadSize;
        }
    }

    /** Handler that removes a row item (or row fragment) from a data page. */
    private final PageHandler<ReuseBag, Long> rmvRow;

    /**
     * Removes a row item, or a row fragment, from a data page.
     * <p>
     * The int argument is the item ID on the page. A page that becomes empty is recycled into the
     * passed {@link ReuseBag} instead of being put into a data bucket. Returns the link to the next
     * fragment of the row as reported by the page IO; {@code 0} means there is no next fragment.
     */
    private final class RemoveRowHandler extends PageHandler<ReuseBag, Long> {
        /** Indicates whether partition ID should be masked from page ID. */
        private final boolean maskPartId;

        /**
         * @param maskPartId Whether the partition ID is masked out of the page ID before bucket removal.
         */
        RemoveRowHandler(boolean maskPartId) {
            this.maskPartId = maskPartId;
        }

        @Override public Long run(
            int cacheId,
            long pageId,
            long page,
            long pageAddr,
            PageIO iox,
            Boolean walPlc,
            ReuseBag reuseBag,
            int itemId)
            throws IgniteCheckedException {
            AbstractDataPageIO<T> io = (AbstractDataPageIO<T>)iox;

            int oldFreeSpace = io.getFreeSpace(pageAddr);

            assert oldFreeSpace >= 0 : oldFreeSpace;

            long nextLink = io.removeRow(pageAddr, itemId, pageSize());

            if (needWalDeltaRecord(pageId, page, walPlc))
                wal.log(new DataPageRemoveRecord(cacheId, pageId, itemId));

            int newFreeSpace = io.getFreeSpace(pageAddr);

            if (newFreeSpace > MIN_PAGE_FREE_SPACE) {
                int newBucket = bucket(newFreeSpace, false);

                // This class does not keep pages with so little free space in a bucket,
                // so such a page has to be put now.
                boolean putIsNeeded = oldFreeSpace <= MIN_PAGE_FREE_SPACE;

                if (!putIsNeeded) {
                    int oldBucket = bucket(oldFreeSpace, false);

                    if (oldBucket != newBucket) {
                        // It is possible that page was concurrently taken for put, in this case put will handle bucket change.
                        // NOTE(review): the possibly masked pageId is also used by forgetPage/recyclePage/put below.
                        pageId = maskPartId ? PageIdUtils.maskPartitionId(pageId) : pageId;

                        putIsNeeded = removeDataPage(pageId, page, pageAddr, io, oldBucket);
                    }
                }

                if (io.isEmpty(pageAddr)) {
                    evictionTracker.forgetPage(pageId);

                    // An empty page goes to the reuse bag rather than to a data bucket.
                    if (putIsNeeded)
                        reuseBag.addFreePage(recyclePage(pageId, page, pageAddr, null));
                }
                else if (putIsNeeded)
                    put(null, pageId, page, pageAddr, newBucket);
            }

            // For common case boxed 0L will be cached inside of Long, so no garbage will be produced.
            return nextLink;
        }
    }

    /**
     * @param cacheId Cache ID.
     * @param name Name (for debug purpose).
     * @param memMetrics Memory metrics.
     * @param memPlc Data region.
     * @param reuseList Reuse list or {@code null} if this free list will be a reuse list for itself.
     * @param wal Write ahead log manager.
     * @param metaPageId Metadata page ID.
     * @param initNew {@code True} if new metadata should be initialized.
     * @throws IgniteCheckedException If failed.
     */
    public AbstractFreeList(
        int cacheId,
        String name,
        DataRegionMetricsImpl memMetrics,
        DataRegion memPlc,
        ReuseList reuseList,
        IgniteWriteAheadLogManager wal,
        long metaPageId,
        boolean initNew) throws IgniteCheckedException {
        super(cacheId, name, memPlc.pageMemory(), BUCKETS, wal, metaPageId);

        // Partition IDs are masked out of page IDs in RemoveRowHandler only when cacheId == 0.
        rmvRow = new RemoveRowHandler(cacheId == 0);

        this.evictionTracker = memPlc.evictionTracker();

        // Without an external reuse list, this free list recycles pages into its own reuse bucket.
        this.reuseList = reuseList == null ? this : reuseList;

        int pageSize = pageMem.pageSize();

        assert U.isPow2(pageSize) : "Page size must be a power of 2: " + pageSize;
        assert U.isPow2(BUCKETS);
        assert BUCKETS <= pageSize : pageSize;

        // TODO this constant is used because currently we cannot reuse data pages as index pages
        // TODO and vice-versa. It should be removed when data storage format is finalized.
        MIN_SIZE_FOR_DATA_PAGE = pageSize - AbstractDataPageIO.MIN_DATA_PAGE_OVERHEAD;

        // shift = log2(pageSize / BUCKETS) for power-of-two sizes. The loop destroys the local
        // pageSize, so it must stay after the last other use of that variable.
        int shift = 0;

        while (pageSize > BUCKETS) {
            shift++;

            pageSize >>>= 1;
        }

        this.shift = shift;

        this.memMetrics = memMetrics;

        init(metaPageId, initNew);
    }

    /**
     * Calculates free space tracked by this free list instance.
     * <p>
     * The result is approximate. Every page in bucket {@code b} is counted at the lower bound of the
     * bucket, {@code b << shift} bytes. Bucket 0 and the reuse bucket are skipped.
     *
     * @return Free space available for use, in bytes.
     */
    public long freeSpace() {
        long freeSpace = 0;

        for (int b = BUCKETS - 2; b > 0; b--) {
            long perPageFreeSpace = b << shift;

            long pages = bucketsSize[b].longValue();

            freeSpace += pages * perPageFreeSpace;
        }

        return freeSpace;
    }

    /** {@inheritDoc} */
    @Override public void dumpStatistics(IgniteLogger log) {
        long dataPages = 0;

        // Per-bucket logging is disabled; set this to true locally when debugging bucket distribution.
        final boolean dumpBucketsInfo = false;

        for (int b = 0; b < BUCKETS; b++) {
            long size = bucketsSize[b].longValue();

            // Pages in the reuse bucket are reported separately as reusePages.
            if (!isReuseBucket(b))
                dataPages += size;

            if (dumpBucketsInfo) {
                Stripe[] stripes = getBucket(b);

                boolean empty = true;

                if (stripes != null) {
                    for (Stripe stripe : stripes) {
                        if (!stripe.empty) {
                            empty = false;

                            break;
                        }
                    }
                }

                if (log.isInfoEnabled())
                    log.info("Bucket [b=" + b +
                        ", size=" + size +
                        ", stripes=" + (stripes != null ? stripes.length : 0) +
                        ", stripesEmpty=" + empty + ']');
            }
        }

        // The summary is logged only for free lists that contain data pages.
        if (dataPages > 0) {
            if (log.isInfoEnabled())
                log.info("FreeList [name=" + name +
                    ", buckets=" + BUCKETS +
                    ", dataPages=" + dataPages +
                    ", reusePages=" + bucketsSize[REUSE_BUCKET].longValue() + "]");
        }
    }

    /**
     * Maps the free space of a page to a bucket index, {@code freeSpace >>> shift}.
     *
     * @param freeSpace Page free space.
     * @param allowReuse {@code True} if it is allowed to get reuse bucket.
     * @return Bucket.
     */
    private int bucket(int freeSpace, boolean allowReuse) {
        assert freeSpace > 0 : freeSpace;

        int bucket = freeSpace >>> shift;

        assert bucket >= 0 && bucket < BUCKETS : bucket;

        // When the reuse bucket is not allowed, fall back to the bucket just below it.
        if (!allowReuse && isReuseBucket(bucket))
            bucket--;

        return bucket;
    }

    /**
     * Allocates a new data page ({@link PageIdAllocator#FLAG_DATA}) in the given partition of this group.
     *
     * @param part Partition.
     * @return Page ID.
     * @throws IgniteCheckedException If failed.
     */
    private long allocateDataPage(int part) throws IgniteCheckedException {
        assert part <= PageIdAllocator.MAX_PARTITION_ID;
        assert part != PageIdAllocator.INDEX_PARTITION;

        return pageMem.allocatePage(grpId, part, PageIdAllocator.FLAG_DATA);
    }

    /** {@inheritDoc} */
    @Override public void insertDataRow(T row) throws IgniteCheckedException {
        int rowSize = row.size();

        int written = 0;

        // Each iteration writes the whole row, or its next fragment, to one page.
        do {
            // Every page after the first one holds a fragment of a large row.
            if (written != 0)
                memMetrics.incrementLargeEntriesPages();

            int remaining = rowSize - written;

            long pageId = 0L;

            // A small remainder starts one bucket above its own bucket, so any page found has more free
            // space than the bucket's lower bound. A remainder of at least MIN_SIZE_FOR_DATA_PAGE
            // only checks the reuse bucket.
            for (int b = remaining < MIN_SIZE_FOR_DATA_PAGE ? bucket(remaining, false) + 1 : REUSE_BUCKET; b < BUCKETS; b++) {
                pageId = takeEmptyPage(b, ioVersions());

                if (pageId != 0L)
                    break;
            }

            // Non-null only for a freshly allocated page; passed to write(...) (presumably to initialize it).
            AbstractDataPageIO<T> initIo = null;

            if (pageId == 0L) {
                pageId = allocateDataPage(row.partition());

                initIo = ioVersions().latest();
            }
            else if (PageIdUtils.tag(pageId) != PageIdAllocator.FLAG_DATA) // Reused page that is not a data page yet.
                pageId = initReusedPage(pageId, row.partition());
            else // Already a data page: only the partition ID is adjusted to the row's partition.
                pageId = PageIdUtils.changePartitionId(pageId, (row.partition()));

            written = write(pageId, writeRow, initIo, row, written, FAIL_I);

            assert written != FAIL_I; // We can't fail here.
        }
        while (written != COMPLETE);
    }

    /**
     * Write-locks a page taken from a free list bucket and reinitializes it as a data page of the given
     * partition, using the latest data page IO.
     *
     * @param reusedPageId Reused page id.
     * @param partId Partition id.
     * @return Prepared page id.
     * @throws IgniteCheckedException If failed.
     *
     * @see PagesList#initReusedPage(long, long, long, int, byte, PageIO)
     */
    private long initReusedPage(long reusedPageId, int partId) throws IgniteCheckedException {
        long reusedPage = acquirePage(reusedPageId);

        try {
            long reusedPageAddr = writeLock(reusedPageId, reusedPage);

            assert reusedPageAddr != 0;

            try {
                return initReusedPage(reusedPageId, reusedPage, reusedPageAddr,
                    partId, PageIdAllocator.FLAG_DATA, ioVersions().latest());
            }
            finally {
                writeUnlock(reusedPageId, reusedPage, reusedPageAddr, true);
            }
        }
        finally {
            releasePage(reusedPageId, reusedPage);
        }
    }

    /** {@inheritDoc} */
    @Override public boolean updateDataRow(long link, T row) throws IgniteCheckedException {
        assert link != 0;

        // The link encodes both the page ID and the item ID within that page.
        long pageId = PageIdUtils.pageId(link);
        int itemId = PageIdUtils.itemId(link);

        Boolean updated = write(pageId, updateRow, row, itemId, null);

        assert updated != null; // Can't fail here.

        return updated;
    }

    /** {@inheritDoc} */
    @Override public <S, R> R updateDataRow(long link, PageHandler<S, R> pageHnd, S arg) throws IgniteCheckedException {
        assert link != 0;

        // The link encodes both the page ID and the item ID within that page.
        long pageId = PageIdUtils.pageId(link);
        int itemId = PageIdUtils.itemId(link);

        R updRes = write(pageId, pageHnd, arg, itemId, null);

        assert updRes != null; // Can't fail here.

        return updRes;
    }

    /** {@inheritDoc} */
    @Override public void removeDataRowByLink(long link) throws IgniteCheckedException {
        assert link != 0;

        long pageId = PageIdUtils.pageId(link);
        int itemId = PageIdUtils.itemId(link);

        // Collects pages that became empty, so they can be handed to the reuse list in one call.
        ReuseBag bag = new LongListReuseBag();

        long nextLink = write(pageId, rmvRow, bag, itemId, FAIL_L);

        assert nextLink != FAIL_L; // Can't fail here.

        // Follow the fragment chain of a multi-page row; each extra page was counted as a large-entry page.
        while (nextLink != 0L) {
            memMetrics.decrementLargeEntriesPages();

            itemId = PageIdUtils.itemId(nextLink);
            pageId = PageIdUtils.pageId(nextLink);

            nextLink = write(pageId, rmvRow, bag, itemId, FAIL_L);

            assert nextLink != FAIL_L; // Can't fail here.
        }

        reuseList.addForRecycle(bag);
    }

    /** {@inheritDoc} */
    @Override protected Stripe[] getBucket(int bucket) {
        return buckets.get(bucket);
    }

    /** {@inheritDoc} */
    @Override protected boolean casBucket(int bucket, Stripe[] exp, Stripe[] upd) {
        return buckets.compareAndSet(bucket, exp, upd);
    }

    /** {@inheritDoc} */
    @Override protected boolean isReuseBucket(int bucket) {
        return bucket == REUSE_BUCKET;
    }

    /**
     * @return Number of pages in the reuse bucket of this free list, as an {@code int}.
     */
    public int emptyDataPages() {
        return bucketsSize[REUSE_BUCKET].intValue();
    }

    /** {@inheritDoc} */
    @Override public void addForRecycle(ReuseBag bag) throws IgniteCheckedException {
        // Only valid when this free list serves as its own reuse list.
        assert reuseList == this : "not allowed to be a reuse list";

        put(bag, 0, 0, 0L, REUSE_BUCKET);
    }

    /** {@inheritDoc} */
    @Override public long takeRecycledPage() throws IgniteCheckedException {
        // Only valid when this free list serves as its own reuse list.
        assert reuseList == this : "not allowed to be a reuse list";

        return takeEmptyPage(REUSE_BUCKET, null);
    }

    /** {@inheritDoc} */
    @Override public long recycledPagesCount() throws IgniteCheckedException {
        // Only valid when this free list serves as its own reuse list.
        assert reuseList == this : "not allowed to be a reuse list";

        return storedPagesCount(REUSE_BUCKET);
    }

    /**
     * @return IOVersions.
     */
    public abstract IOVersions<? extends AbstractDataPageIO<T>> ioVersions();

    /** {@inheritDoc} */
    @Override public String toString() {
        return "FreeList [name=" + name + ']';
    }
}