/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.persistence;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.pagemem.FullPageId;
import org.apache.ignite.internal.pagemem.PageIdAllocator;
import org.apache.ignite.internal.pagemem.PageIdUtils;
import org.apache.ignite.internal.pagemem.PageMemory;
import org.apache.ignite.internal.pagemem.PageSupport;
import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.pagemem.wal.WALIterator;
import org.apache.ignite.internal.pagemem.wal.WALPointer;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot;
import org.apache.ignite.internal.pagemem.wal.record.RollbackRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageInitRecord;
import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdatePartitionDataRecordV2;
import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecord;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheDiagnosticManager;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheMvccEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheTtlManager;
import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.PartitionUpdateCounter;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIterator;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion;
import org.apache.ignite.internal.processors.cache.persistence.freelist.AbstractFreeList;
import org.apache.ignite.internal.processors.cache.persistence.freelist.CacheFreeList;
import org.apache.ignite.internal.processors.cache.persistence.freelist.SimpleDataRow;
import org.apache.ignite.internal.processors.cache.persistence.migration.UpgradePendingTreeToPerPartitionTask;
import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx;
import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
import org.apache.ignite.internal.processors.cache.persistence.partstate.PagesAllocationRange;
import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionAllocationMap;
import org.apache.ignite.internal.processors.cache.persistence.partstorage.PartitionMetaStorage;
import org.apache.ignite.internal.processors.cache.persistence.partstorage.PartitionMetaStorageImpl;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageMetaIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionCountersIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO;
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIOV2;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList;
import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseListImpl;
import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer;
import org.apache.ignite.internal.processors.cache.tree.CacheDataRowStore;
import org.apache.ignite.internal.processors.cache.tree.CacheDataTree;
import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree;
import org.apache.ignite.internal.processors.cache.tree.PendingRow;
import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccUpdateResult;
import org.apache.ignite.internal.processors.cache.tree.mvcc.search.MvccLinkAwareSearchRow;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner;
import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.lang.GridCursor;
import org.apache.ignite.internal.util.lang.IgniteInClosure2X;
import org.apache.ignite.internal.util.lang.IgnitePredicateX;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.RENTING;
/**
* Used when persistence enabled.
*/
public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl implements DbCheckpointListener {
/**
* Throttling timeout in milliseconds which avoids excessive PendingTree access on unwind
* if there is nothing to clean yet.
*/
private final long unwindThrottlingTimeout = Long.getLong(
IgniteSystemProperties.IGNITE_UNWIND_THROTTLING_TIMEOUT, 500L);
/** */
private IndexStorage indexStorage;
/** */
private ReuseListImpl reuseList;
/** Page list cache limit for data region of this cache group. */
private AtomicLong pageListCacheLimit;
/** Flag indicates that all group partitions have restored their state from page memory / disk. */
private volatile boolean partitionStatesRestored;
/** {@inheritDoc} */
@Override protected void initPendingTree(GridCacheContext cctx) throws IgniteCheckedException {
// No-op. Per-partition PendingTree should be used.
}
/** {@inheritDoc} */
@Override protected void initDataStructures() throws IgniteCheckedException {
assert ctx.database().checkpointLockIsHeldByThread();
Metas metas = getOrAllocateCacheMetas();
CacheDiagnosticManager diagnosticMgr = ctx.diagnostic();
String reuseListName = grp.cacheOrGroupName() + "##ReuseList";
String indexStorageTreeName = grp.cacheOrGroupName() + "##IndexStorageTree";
RootPage reuseListRoot = metas.reuseListRoot;
pageListCacheLimit = ((GridCacheDatabaseSharedManager)ctx.database()).pageListCacheLimitHolder(grp.dataRegion());
reuseList = new ReuseListImpl(
grp.groupId(),
grp.cacheOrGroupName(),
grp.dataRegion().pageMemory(),
ctx.wal(),
reuseListRoot.pageId().pageId(),
reuseListRoot.isAllocated(),
diagnosticMgr.pageLockTracker().createPageLockTracker(reuseListName),
ctx.kernalContext(),
pageListCacheLimit
);
RootPage metastoreRoot = metas.treeRoot;
indexStorage = new IndexStorageImpl(
grp.dataRegion().pageMemory(),
ctx.wal(),
globalRemoveId(),
grp.groupId(),
grp.sharedGroup(),
PageIdAllocator.INDEX_PARTITION,
PageIdAllocator.FLAG_IDX,
reuseList,
metastoreRoot.pageId().pageId(),
metastoreRoot.isAllocated(),
ctx.kernalContext().failure(),
diagnosticMgr.pageLockTracker().createPageLockTracker(indexStorageTreeName)
);
((GridCacheDatabaseSharedManager)ctx.database()).addCheckpointListener(this);
}
/**
* Get internal IndexStorage.
* See {@link UpgradePendingTreeToPerPartitionTask} for details.
*/
public IndexStorage getIndexStorage() {
return indexStorage;
}
/** {@inheritDoc} */
@Override protected CacheDataStore createCacheDataStore0(int p) throws IgniteCheckedException {
if (ctx.database() instanceof GridCacheDatabaseSharedManager)
((GridCacheDatabaseSharedManager) ctx.database()).cancelOrWaitPartitionDestroy(grp.groupId(), p);
boolean exists = ctx.pageStore() != null && ctx.pageStore().exists(grp.groupId(), p);
return new GridCacheDataStore(p, exists);
}
/** {@inheritDoc} */
@Override public void onCheckpointBegin(Context ctx) throws IgniteCheckedException {
/* No-op. */
}
/** {@inheritDoc} */
@Override public void onMarkCheckpointBegin(Context ctx) throws IgniteCheckedException {
assert grp.dataRegion().pageMemory() instanceof PageMemoryEx;
syncMetadata(ctx);
}
/** {@inheritDoc} */
@Override public void beforeCheckpointBegin(Context ctx) throws IgniteCheckedException {
if (!ctx.nextSnapshot())
syncMetadata(ctx);
}
/**
* Syncs and saves meta-information of all data structures to page memory.
*
* @throws IgniteCheckedException If failed.
*/
private void syncMetadata(Context ctx) throws IgniteCheckedException {
Executor execSvc = ctx.executor();
boolean needSnapshot = ctx.nextSnapshot() && ctx.needToSnapshot(grp.cacheOrGroupName());
if (needSnapshot) {
if (execSvc == null)
addPartitions(ctx);
else {
execSvc.execute(() -> {
try {
addPartitions(ctx);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
});
}
}
syncMetadata(ctx, ctx.executor(), needSnapshot);
}
/**
* Syncs and saves meta-information of all data structures to page memory.
*
* @param ctx Checkpoint context.
* @param execSvc Executor service to run the save process, or {@code null} to save in the current thread.
* @param needSnapshot {@code True} if partition allocation information should be collected for a snapshot.
* @throws IgniteCheckedException If failed.
*/
private void syncMetadata(Context ctx, Executor execSvc, boolean needSnapshot) throws IgniteCheckedException {
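// If no executor is provided, save metadata synchronously in the current thread;
// otherwise offload the reuse list and each partition store to the pool.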
if (execSvc == null) {
reuseList.saveMetadata(grp.statisticsHolderData());
for (CacheDataStore store : partDataStores.values())
saveStoreMetadata(store, ctx, false, needSnapshot);
}
else {
execSvc.execute(() -> {
try {
reuseList.saveMetadata(grp.statisticsHolderData());
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
});
for (CacheDataStore store : partDataStores.values())
execSvc.execute(() -> {
try {
saveStoreMetadata(store, ctx, false, needSnapshot);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
});
}
}
/**
* @param store Store to save metadata for.
* @param ctx Checkpoint context, or {@code null} if invoked outside of a checkpoint.
* @param beforeDestroy {@code True} if metadata is saved before partition destroy.
* @param needSnapshot {@code True} if partition allocation information should be collected for a snapshot.
* @throws IgniteCheckedException If failed.
*/
private void saveStoreMetadata(
CacheDataStore store,
Context ctx,
boolean beforeDestroy,
boolean needSnapshot
) throws IgniteCheckedException {
RowStore rowStore0 = store.rowStore();
if (rowStore0 != null) {
((CacheFreeList)rowStore0.freeList()).saveMetadata(grp.statisticsHolderData());
PartitionMetaStorage<SimpleDataRow> partStore = store.partStorage();
long updCntr = store.updateCounter();
long size = store.fullSize();
long rmvId = globalRemoveId().get();
byte[] updCntrsBytes = store.partUpdateCounter().getBytes();
PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
IgniteWriteAheadLogManager wal = this.ctx.wal();
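// Persist partition meta only when the partition has data, updates or update counter gaps;
// otherwise the partition may only be registered in the snapshot map below.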
if (size > 0 || updCntr > 0 || !store.partUpdateCounter().sequential()) {
GridDhtPartitionState state = null;
// localPartition will not acquire writeLock here because create=false.
GridDhtLocalPartition part = null;
if (!grp.isLocal()) {
if (beforeDestroy)
state = GridDhtPartitionState.EVICTED;
else {
part = getPartition(store);
if (part != null && part.state() != GridDhtPartitionState.EVICTED)
state = part.state();
}
// Do not save meta for evicted partitions on next checkpoints.
if (state == null)
return;
}
int grpId = grp.groupId();
long partMetaId = pageMem.partitionMetaPageId(grpId, store.partId());
long partMetaPage = pageMem.acquirePage(grpId, partMetaId);
try {
long partMetaPageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage);
if (partMetaPageAddr == 0L) {
U.warn(log, "Failed to acquire write lock for meta page [metaPage=" + partMetaPage +
", beforeDestroy=" + beforeDestroy + ", size=" + size +
", updCntr=" + updCntr + ", state=" + state + ']');
return;
}
boolean changed = false;
try {
PagePartitionMetaIOV2 io = PageIO.getPageIO(partMetaPageAddr);
long link = io.getGapsLink(partMetaPageAddr);
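// Reconcile the update counter gaps row referenced from the meta page: drop it when counters
// became sequential, create it when gaps appeared, rewrite it when the gaps changed.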
if (updCntrsBytes == null && link != 0) {
partStore.removeDataRowByLink(link, grp.statisticsHolderData());
io.setGapsLink(partMetaPageAddr, (link = 0));
changed = true;
}
else if (updCntrsBytes != null && link == 0) {
SimpleDataRow row = new SimpleDataRow(store.partId(), updCntrsBytes);
partStore.insertDataRow(row, grp.statisticsHolderData());
io.setGapsLink(partMetaPageAddr, (link = row.link()));
changed = true;
}
else if (updCntrsBytes != null && link != 0) {
byte[] prev = partStore.readRow(link);
assert prev != null : "Read null gaps using link=" + link;
if (!Arrays.equals(prev, updCntrsBytes)) {
partStore.removeDataRowByLink(link, grp.statisticsHolderData());
SimpleDataRow row = new SimpleDataRow(store.partId(), updCntrsBytes);
partStore.insertDataRow(row, grp.statisticsHolderData());
io.setGapsLink(partMetaPageAddr, (link = row.link()));
changed = true;
}
}
if (changed)
partStore.saveMetadata(grp.statisticsHolderData());
changed |= io.setUpdateCounter(partMetaPageAddr, updCntr);
changed |= io.setGlobalRemoveId(partMetaPageAddr, rmvId);
changed |= io.setSize(partMetaPageAddr, size);
if (state != null)
changed |= io.setPartitionState(partMetaPageAddr, (byte)state.ordinal());
else
assert grp.isLocal() : grp.cacheOrGroupName();
long cntrsPageId;
if (grp.sharedGroup()) {
long initCntrPageId = io.getCountersPageId(partMetaPageAddr);
Map<Integer, Long> newSizes = store.cacheSizes();
Map<Integer, Long> prevSizes = readSharedGroupCacheSizes(pageMem, grpId, initCntrPageId);
if (prevSizes != null && prevSizes.equals(newSizes))
cntrsPageId = initCntrPageId; // Sizes have not changed; keep the existing counters pages untouched.
else {
cntrsPageId = writeSharedGroupCacheSizes(pageMem, grpId, initCntrPageId,
store.partId(), newSizes);
if (initCntrPageId == 0 && cntrsPageId != 0) {
io.setCountersPageId(partMetaPageAddr, cntrsPageId);
changed = true;
}
}
}
else
cntrsPageId = 0L;
int pageCnt;
if (needSnapshot) {
pageCnt = this.ctx.pageStore().pages(grpId, store.partId());
io.setCandidatePageCount(partMetaPageAddr, size == 0 ? 0 : pageCnt);
if (state == OWNING) {
assert part != null;
if (!addPartition(
part,
ctx.partitionStatMap(),
partMetaPageAddr,
io,
grpId,
store.partId(),
this.ctx.pageStore().pages(grpId, store.partId()),
store.fullSize()
))
U.warn(log, "Partition was concurrently evicted grpId=" + grpId +
", partitionId=" + part.id());
}
else if (state == MOVING || state == RENTING) {
if (ctx.partitionStatMap().forceSkipIndexPartition(grpId)) {
if (log.isInfoEnabled())
log.info("Will not include SQL indexes to snapshot because there is " +
"a partition not in " + OWNING + " state [grp=" + grp.cacheOrGroupName() +
", partId=" + store.partId() + ", state=" + state + ']');
}
}
changed = true;
}
else
pageCnt = io.getCandidatePageCount(partMetaPageAddr);
if (changed && PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, partMetaId, partMetaPage, wal, null))
wal.log(new MetaPageUpdatePartitionDataRecordV2(
grpId,
partMetaId,
updCntr,
rmvId,
(int)size, // TODO: Partition size may be long
cntrsPageId,
state == null ? -1 : (byte)state.ordinal(),
pageCnt,
link
));
}
finally {
pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, changed);
}
}
finally {
pageMem.releasePage(grpId, partMetaId, partMetaPage);
}
}
else if (needSnapshot)
tryAddEmptyPartitionToSnapshot(store, ctx);
}
else if (needSnapshot)
tryAddEmptyPartitionToSnapshot(store, ctx);
}
/** {@inheritDoc} */
@Override public long restorePartitionStates(Map<GroupPartitionId, Integer> partitionRecoveryStates) throws IgniteCheckedException {
if (grp.isLocal() || !grp.affinityNode() || !grp.dataRegion().config().isPersistenceEnabled())
return 0;
if (partitionStatesRestored)
return 0;
long processed = 0;
PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
for (int p = 0; p < grp.affinity().partitions(); p++) {
Integer recoverState = partitionRecoveryStates.get(new GroupPartitionId(grp.groupId(), p));
if (ctx.pageStore().exists(grp.groupId(), p)) {
ctx.pageStore().ensure(grp.groupId(), p);
if (ctx.pageStore().pages(grp.groupId(), p) <= 1) {
if (log.isDebugEnabled())
log.debug("Skipping partition on recovery (pages less than 1) " +
"[grp=" + grp.cacheOrGroupName() + ", p=" + p + "]");
continue;
}
if (log.isDebugEnabled())
log.debug("Creating partition on recovery (exists in page store) " +
"[grp=" + grp.cacheOrGroupName() + ", p=" + p + "]");
processed++;
GridDhtLocalPartition part = grp.topology().forceCreatePartition(p);
// Triggers initialization of an existing (having a data file) partition before acquiring the checkpoint read lock.
part.dataStore().init();
ctx.database().checkpointReadLock();
try {
long partMetaId = pageMem.partitionMetaPageId(grp.groupId(), p);
long partMetaPage = pageMem.acquirePage(grp.groupId(), partMetaId);
try {
long pageAddr = pageMem.writeLock(grp.groupId(), partMetaId, partMetaPage);
boolean changed = false;
try {
PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.forPage(pageAddr);
if (recoverState != null) {
io.setPartitionState(pageAddr, (byte) recoverState.intValue());
changed = updateState(part, recoverState);
if (log.isDebugEnabled())
log.debug("Restored partition state (from WAL) " +
"[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
", updCntr=" + part.initialUpdateCounter() + "]");
}
else {
int stateId = (int) io.getPartitionState(pageAddr);
changed = updateState(part, stateId);
if (log.isDebugEnabled())
log.debug("Restored partition state (from page memory) " +
"[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
", updCntr=" + part.initialUpdateCounter() + ", stateId=" + stateId + "]");
}
}
finally {
pageMem.writeUnlock(grp.groupId(), partMetaId, partMetaPage, null, changed);
}
}
finally {
pageMem.releasePage(grp.groupId(), partMetaId, partMetaPage);
}
}
finally {
ctx.database().checkpointReadUnlock();
}
}
else if (recoverState != null) { // Pre-create the partition if it has a valid recovery state.
GridDhtLocalPartition part = grp.topology().forceCreatePartition(p);
updateState(part, recoverState);
processed++;
if (log.isDebugEnabled())
log.debug("Restored partition state (from WAL) " +
"[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
", updCntr=" + part.initialUpdateCounter() + "]");
}
else {
if (log.isDebugEnabled())
log.debug("Skipping partition on recovery (no page store OR wal state) " +
"[grp=" + grp.cacheOrGroupName() + ", p=" + p + "]");
}
}
partitionStatesRestored = true;
return processed;
}
/**
* @param part Partition to restore state for.
* @param stateId State enum ordinal.
* @return {@code True} if the state was updated.
*/
private boolean updateState(GridDhtLocalPartition part, int stateId) {
if (stateId != -1) {
GridDhtPartitionState state = GridDhtPartitionState.fromOrdinal(stateId);
assert state != null;
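// An evicted partition is restored as RENTING so that its eviction can be completed after recovery.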
part.restoreState(state == GridDhtPartitionState.EVICTED ? GridDhtPartitionState.RENTING : state);
return true;
}
return false;
}
/**
* Checks whether this partition should be included into the snapshot and, if it is in OWNING state,
* adds it to the allocation map with an empty pages range.
*
* @param store Store.
* @param ctx Snapshot context.
*/
private void tryAddEmptyPartitionToSnapshot(CacheDataStore store, Context ctx) {
if (getPartition(store).state() == OWNING) {
ctx.partitionStatMap().put(
new GroupPartitionId(grp.groupId(), store.partId()),
new PagesAllocationRange(0, 0));
}
}
/**
* @param store Store.
*
* @return Local partition corresponding to the given store.
*/
private GridDhtLocalPartition getPartition(CacheDataStore store) {
return grp.topology().localPartition(store.partId(),
AffinityTopologyVersion.NONE, false, true);
}
/**
* Loads cache sizes for all caches in shared group.
*
* @param pageMem Page memory to perform operations on pages.
* @param grpId Cache group ID.
* @param cntrsPageId Counters page ID; zero means that no counters page exists.
* @return Cache sizes if the store belongs to a group containing multiple caches and sizes are available in memory.
* May return {@code null} if the counters page does not exist.
* @throws IgniteCheckedException If page memory operation failed.
*/
@Nullable private static Map<Integer, Long> readSharedGroupCacheSizes(PageSupport pageMem, int grpId,
long cntrsPageId) throws IgniteCheckedException {
if (cntrsPageId == 0L)
return null;
Map<Integer, Long> cacheSizes = new HashMap<>();
long nextId = cntrsPageId;
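// Cache sizes may span several pages; walk the chain of counters pages until all sizes are read.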
while (true) {
final long curId = nextId;
final long curPage = pageMem.acquirePage(grpId, curId);
try {
final long curAddr = pageMem.readLock(grpId, curId, curPage);
assert curAddr != 0;
try {
PagePartitionCountersIO cntrsIO = PageIO.getPageIO(curAddr);
if (cntrsIO.readCacheSizes(curAddr, cacheSizes))
break;
nextId = cntrsIO.getNextCountersPageId(curAddr);
assert nextId != 0;
}
finally {
pageMem.readUnlock(grpId, curId, curPage);
}
}
finally {
pageMem.releasePage(grpId, curId, curPage);
}
}
return cacheSizes;
}
/**
* Saves cache sizes for all caches in shared group. Unconditionally marks pages as dirty.
*
* @param pageMem Page memory to perform operations on pages.
* @param grpId Cache group ID.
* @param cntrsPageId Counters page ID; zero means that no counters page exists.
* @param partId Partition ID.
* @param sizes Cache sizes of all caches in the group. Not null.
* @return New counters page ID. Same as {@code cntrsPageId}, or a new value if cache size pages were initialized.
* @throws IgniteCheckedException if page memory operation failed.
*/
private static long writeSharedGroupCacheSizes(PageMemory pageMem, int grpId,
long cntrsPageId, int partId, Map<Integer, Long> sizes) throws IgniteCheckedException {
byte[] data = PagePartitionCountersIO.VERSIONS.latest().serializeCacheSizes(sizes);
int items = data.length / PagePartitionCountersIO.ITEM_SIZE;
boolean init = cntrsPageId == 0;
if (init && !sizes.isEmpty())
cntrsPageId = pageMem.allocatePage(grpId, partId, PageIdAllocator.FLAG_DATA);
long nextId = cntrsPageId;
int written = 0;
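// Write the serialized sizes across the chain of counters pages, allocating new pages to the chain as needed.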
while (written != items) {
final long curId = nextId;
final long curPage = pageMem.acquirePage(grpId, curId);
try {
final long curAddr = pageMem.writeLock(grpId, curId, curPage);
assert curAddr != 0;
try {
PagePartitionCountersIO partCntrIo;
if (init) {
partCntrIo = PagePartitionCountersIO.VERSIONS.latest();
partCntrIo.initNewPage(curAddr, curId, pageMem.realPageSize(grpId));
}
else
partCntrIo = PageIO.getPageIO(curAddr);
written += partCntrIo.writeCacheSizes(pageMem.realPageSize(grpId), curAddr, data, written);
nextId = partCntrIo.getNextCountersPageId(curAddr);
if (written != items && (init = nextId == 0)) {
// Allocate a new counters page.
nextId = pageMem.allocatePage(grpId, partId, PageIdAllocator.FLAG_DATA);
partCntrIo.setNextCountersPageId(curAddr, nextId);
}
}
finally {
// Write full page
pageMem.writeUnlock(grpId, curId, curPage, Boolean.TRUE, true);
}
}
finally {
pageMem.releasePage(grpId, curId, curPage);
}
}
return cntrsPageId;
}
/**
* @param ctx Context.
*/
private void addPartitions(Context ctx) throws IgniteCheckedException {
int grpId = grp.groupId();
PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
long metaPageId = pageMem.metaPageId(grpId);
long metaPage = pageMem.acquirePage(grpId, metaPageId);
try {
long metaPageAddr = pageMem.writeLock(grpId, metaPageId, metaPage);
try {
PageMetaIO metaIo = PageMetaIO.getPageIO(metaPageAddr);
addPartition(
null,
ctx.partitionStatMap(),
metaPageAddr,
metaIo,
grpId,
PageIdAllocator.INDEX_PARTITION,
this.ctx.pageStore().pages(grpId, PageIdAllocator.INDEX_PARTITION),
-1);
}
finally {
pageMem.writeUnlock(grpId, metaPageId, metaPage, null, true);
}
}
finally {
pageMem.releasePage(grpId, metaPageId, metaPage);
}
}
/**
* @param part Local partition.
* @param map Map to add values to.
* @param metaPageAddr Meta page address.
* @param io Page meta IO.
* @param grpId Cache group ID.
* @param partId Partition ID.
* @param currAllocatedPageCnt Total number of pages allocated for partition <code>[partition, grpId]</code>.
* @param partSize Partition size.
* @return {@code False} if the partition was concurrently evicted and could not be reserved, {@code true} otherwise.
*/
private static boolean addPartition(
GridDhtLocalPartition part,
final PartitionAllocationMap map,
final long metaPageAddr,
final PageMetaIO io,
final int grpId,
final int partId,
final int currAllocatedPageCnt,
final long partSize
) {
if (part != null) {
boolean reserved = part.reserve();
if (!reserved)
return false;
}
else
assert partId == PageIdAllocator.INDEX_PARTITION : partId;
assert PageIO.getPageId(metaPageAddr) != 0;
int lastAllocatedPageCnt = io.getLastAllocatedPageCount(metaPageAddr);
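// An empty partition contributes zero allocated pages to the snapshot allocation range.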
int curPageCnt = partSize == 0 ? 0 : currAllocatedPageCnt;
map.put(
new GroupPartitionId(grpId, partId),
new PagesAllocationRange(lastAllocatedPageCnt, curPageCnt));
return true;
}
/** {@inheritDoc} */
@Override protected void destroyCacheDataStore0(CacheDataStore store) throws IgniteCheckedException {
assert ctx.database() instanceof GridCacheDatabaseSharedManager
: "Destroying cache data store when persistence is not enabled: " + ctx.database();
int partId = store.partId();
ctx.database().checkpointReadLock();
try {
saveStoreMetadata(store, null, true, false);
}
finally {
ctx.database().checkpointReadUnlock();
}
((GridCacheDatabaseSharedManager)ctx.database()).schedulePartitionDestroy(grp.groupId(), partId);
}
/**
* Invalidates page memory for given partition. Destroys partition store.
* <b>NOTE:</b> This method can be invoked only within checkpoint lock or checkpointer thread.
*
* @param grpId Group ID.
* @param partId Partition ID.
*
* @throws IgniteCheckedException If destroy has failed.
*/
public void destroyPartitionStore(int grpId, int partId) throws IgniteCheckedException {
PageMemoryEx pageMemory = (PageMemoryEx)grp.dataRegion().pageMemory();
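// Invalidate the partition pages in page memory; the returned tag fences writes of stale pages to the destroyed store.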
int tag = pageMemory.invalidate(grp.groupId(), partId);
if (grp.walEnabled())
ctx.wal().log(new PartitionDestroyRecord(grp.groupId(), partId));
ctx.pageStore().onPartitionDestroyed(grpId, partId, tag);
}
/** {@inheritDoc} */
@Override public void onPartitionCounterUpdated(int part, long cntr) {
CacheDataStore store = partDataStores.get(part);
assert store != null;
long oldCnt = store.updateCounter();
if (oldCnt < cntr)
store.updateCounter(cntr);
}
/** {@inheritDoc} */
@Override public void onPartitionInitialCounterUpdated(int part, long start, long delta) {
CacheDataStore store = partDataStores.get(part);
assert store != null;
store.updateInitialCounter(start, delta);
}
/** {@inheritDoc} */
@Override public long lastUpdatedPartitionCounter(int part) {
return partDataStores.get(part).updateCounter();
}
/** {@inheritDoc} */
@Override public RootPage rootPageForIndex(int cacheId, String idxName, int segment) throws IgniteCheckedException {
return indexStorage.allocateCacheIndex(cacheId, idxName, segment);
}
/** {@inheritDoc} */
@Override public void dropRootPageForIndex(int cacheId, String idxName, int segment) throws IgniteCheckedException {
indexStorage.dropCacheIndex(cacheId, idxName, segment);
}
/** {@inheritDoc} */
@Override public ReuseList reuseListForIndex(String idxName) {
return reuseList;
}
/** {@inheritDoc} */
@Override public void stop() {
if (grp.affinityNode())
((GridCacheDatabaseSharedManager)ctx.database()).removeCheckpointListener(this);
}
/**
* @return Meta root pages info.
* @throws IgniteCheckedException If failed.
*/
private Metas getOrAllocateCacheMetas() throws IgniteCheckedException {
PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
IgniteWriteAheadLogManager wal = ctx.wal();
int grpId = grp.groupId();
long metaId = pageMem.metaPageId(grpId);
long metaPage = pageMem.acquirePage(grpId, metaId);
try {
final long pageAddr = pageMem.writeLock(grpId, metaId, metaPage);
boolean allocated = false;
try {
long metastoreRoot, reuseListRoot;
if (PageIO.getType(pageAddr) != PageIO.T_META) {
PageMetaIO pageIO = PageMetaIO.VERSIONS.latest();
pageIO.initNewPage(pageAddr, metaId, pageMem.realPageSize(grpId));
metastoreRoot = pageMem.allocatePage(grpId, PageIdAllocator.INDEX_PARTITION, PageMemory.FLAG_IDX);
reuseListRoot = pageMem.allocatePage(grpId, PageIdAllocator.INDEX_PARTITION, PageMemory.FLAG_IDX);
pageIO.setTreeRoot(pageAddr, metastoreRoot);
pageIO.setReuseListRoot(pageAddr, reuseListRoot);
if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, metaId, metaPage, wal, null)) {
assert pageIO.getType() == PageIO.T_META;
wal.log(new MetaPageInitRecord(
grpId,
metaId,
pageIO.getType(),
pageIO.getVersion(),
metastoreRoot,
reuseListRoot
));
}
allocated = true;
}
else {
PageMetaIO pageIO = PageIO.getPageIO(pageAddr);
metastoreRoot = pageIO.getTreeRoot(pageAddr);
reuseListRoot = pageIO.getReuseListRoot(pageAddr);
assert reuseListRoot != 0L;
}
return new Metas(
new RootPage(new FullPageId(metastoreRoot, grpId), allocated),
new RootPage(new FullPageId(reuseListRoot, grpId), allocated),
null,
null);
}
finally {
pageMem.writeUnlock(grpId, metaId, metaPage, null, allocated);
}
}
finally {
pageMem.releasePage(grpId, metaId, metaPage);
}
}
/** {@inheritDoc} */
@Override @Nullable protected IgniteHistoricalIterator historicalIterator(
CachePartitionPartialCountersMap partCntrs, Set<Integer> missing) throws IgniteCheckedException {
if (partCntrs == null || partCntrs.isEmpty())
return null;
if (grp.mvccEnabled()) // TODO IGNITE-7384
return super.historicalIterator(partCntrs, missing);
GridCacheDatabaseSharedManager database = (GridCacheDatabaseSharedManager)grp.shared().database();
FileWALPointer minPtr = null;
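// Find the earliest WAL pointer that covers the initial update counter of every requested partition.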
for (int i = 0; i < partCntrs.size(); i++) {
int p = partCntrs.partitionAt(i);
long initCntr = partCntrs.initialUpdateCounterAt(i);
FileWALPointer startPtr = (FileWALPointer)database.checkpointHistory().searchPartitionCounter(
grp.groupId(), p, initCntr);
if (startPtr == null)
throw new IgniteCheckedException("Could not find start pointer for partition [part=" + p + ", partCntrSince=" + initCntr + "]");
if (minPtr == null || startPtr.compareTo(minPtr) < 0)
minPtr = startPtr;
}
WALIterator it = grp.shared().wal().replay(minPtr);
WALHistoricalIterator iterator = new WALHistoricalIterator(log, grp, partCntrs, it);
// Add historical partitions that could not be reserved to the missing set.
missing.addAll(iterator.missingParts);
return iterator;
}
/** {@inheritDoc} */
@Override public boolean expire(
GridCacheContext cctx,
IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c,
int amount
) throws IgniteCheckedException {
assert !cctx.isNear() : cctx.name();
// Prevent the manager from being stopped in the middle of a PDS operation.
if (!busyLock.enterBusy())
return false;
try {
int cleared = 0;
for (CacheDataStore store : cacheDataStores()) {
cleared += ((GridCacheDataStore)store).purgeExpired(cctx, c, amount - cleared);
if (amount != -1 && cleared >= amount)
return true;
}
}
finally {
busyLock.leaveBusy();
}
return false;
}
/** {@inheritDoc} */
@Override public long expiredSize() throws IgniteCheckedException {
long size = 0;
for (CacheDataStore store : cacheDataStores())
size += ((GridCacheDataStore)store).expiredSize();
return size;
}
/** {@inheritDoc} */
@Override public void preloadPartition(int part) throws IgniteCheckedException {
if (grp.isLocal()) {
dataStore(part).preload();
return;
}
GridDhtLocalPartition locPart = grp.topology().localPartition(part, AffinityTopologyVersion.NONE, false, false);
assert locPart != null && locPart.reservations() > 0;
locPart.dataStore().preload();
}
/**
* Calculates free space of all partition data stores - number of bytes available for use in allocated pages.
*
* @return Free space size in bytes.
*/
long freeSpace() {
long freeSpace = 0;
for (CacheDataStore store : partDataStores.values()) {
assert store instanceof GridCacheDataStore;
AbstractFreeList freeList = ((GridCacheDataStore)store).freeList;
if (freeList == null)
continue;
freeSpace += freeList.freeSpace();
}
return freeSpace;
}
/**
* Calculates empty data pages of all partition data stores.
*
* @return Empty data pages count.
*/
long emptyDataPages() {
long emptyDataPages = 0;
for (CacheDataStore store : partDataStores.values()) {
assert store instanceof GridCacheDataStore;
AbstractFreeList freeList = ((GridCacheDataStore)store).freeList;
if (freeList == null)
continue;
emptyDataPages += freeList.emptyDataPages();
}
return emptyDataPages;
}
/**
* @param cacheId ID of a cache that was stopped but whose data is still present.
* @throws IgniteCheckedException If failed.
*/
public void findAndCleanupLostIndexesForStoppedCache(int cacheId) throws IgniteCheckedException {
for (String name : indexStorage.getIndexNames()) {
if (indexStorage.nameIsAssosiatedWithCache(name, cacheId)) {
ctx.database().checkpointReadLock();
try {
RootPage page = indexStorage.allocateIndex(name);
ctx.kernalContext().query().getIndexing().destroyOrphanIndex(
page,
name,
grp.groupId(),
grp.dataRegion().pageMemory(), globalRemoveId(),
reuseListForIndex(name),
grp.mvccEnabled()
);
indexStorage.dropIndex(name);
}
finally {
ctx.database().checkpointReadUnlock();
}
}
}
}
/**
*
*/
private static class WALHistoricalIterator implements IgniteHistoricalIterator {
/** */
private static final long serialVersionUID = 0L;
/** Logger. */
private IgniteLogger log;
/** Cache group context. */
private final CacheGroupContext grp;
/** Partition counters map. */
private final CachePartitionPartialCountersMap partMap;
/** Partitions marked as missing (unable to reserve or partition is not in OWNING state). */
private final Set<Integer> missingParts = new HashSet<>();
/** Partitions marked as done. */
private final Set<Integer> doneParts = new HashSet<>();
/** Cache IDs. This collection is stored as field to avoid re-calculation on each iteration. */
private final Set<Integer> cacheIds;
/** WAL iterator. */
private WALIterator walIt;
/** */
private Iterator<DataEntry> entryIt;
/** */
private DataEntry next;
/**
* Rebalanced counters in the range from initialUpdateCntr to updateCntr.
* A partition is considered done when rebalancedCntrs[idx] reaches updateCntr[idx].
*/
private long[] rebalancedCntrs;
/** A partition that will be marked as done on the next iteration. */
private int donePart = -1;
/**
* @param log Logger.
* @param grp Cache group context.
* @param partMap Partition counters map.
* @param walIt WAL iterator.
*/
private WALHistoricalIterator(IgniteLogger log, CacheGroupContext grp, CachePartitionPartialCountersMap partMap,
WALIterator walIt) {
this.log = log;
this.grp = grp;
this.partMap = partMap;
this.walIt = walIt;
cacheIds = grp.cacheIds();
rebalancedCntrs = new long[partMap.size()];
for (int i = 0; i < rebalancedCntrs.length; i++)
rebalancedCntrs[i] = partMap.initialUpdateCounterAt(i);
reservePartitions();
advance();
}
/** {@inheritDoc} */
@Override public boolean contains(int partId) {
return partMap.contains(partId);
}
/** {@inheritDoc} */
@Override public boolean isDone(int partId) {
return doneParts.contains(partId);
}
/** {@inheritDoc} */
@Override public void close() throws IgniteCheckedException {
walIt.close();
releasePartitions();
}
/** {@inheritDoc} */
@Override public boolean isClosed() {
return walIt.isClosed();
}
/** {@inheritDoc} */
@Override public boolean hasNextX() {
return hasNext();
}
/** {@inheritDoc} */
@Override public CacheDataRow nextX() throws IgniteCheckedException {
return next();
}
/** {@inheritDoc} */
@Override public void removeX() throws IgniteCheckedException {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public Iterator<CacheDataRow> iterator() {
return this;
}
/** {@inheritDoc} */
@Override public boolean hasNext() {
return next != null;
}
/** {@inheritDoc} */
@Override public CacheDataRow next() {
if (next == null)
throw new NoSuchElementException();
CacheDataRow val = new DataEntryRow(next);
if (donePart != -1) {
int pIdx = partMap.partitionIndex(donePart);
if (log.isDebugEnabled()) {
log.debug("Partition done [grpId=" + grp.groupId() +
", partId=" + donePart +
", from=" + partMap.initialUpdateCounterAt(pIdx) +
", to=" + partMap.updateCounterAt(pIdx) + ']');
}
doneParts.add(donePart);
donePart = -1;
}
advance();
return val;
}
/** {@inheritDoc} */
@Override public void remove() {
throw new UnsupportedOperationException();
}
/**
* Reserve historical partitions.
* If a partition cannot be reserved, its ID is added to the {@link #missingParts} set.
*/
private void reservePartitions() {
for (int i = 0; i < partMap.size(); i++) {
int p = partMap.partitionAt(i);
GridDhtLocalPartition part = grp.topology().localPartition(p);
if (part == null || !part.reserve()) {
missingParts.add(p);
continue;
}
if (part.state() != OWNING) {
part.release();
missingParts.add(p);
}
}
}
/**
* Release historical partitions.
*/
private void releasePartitions() {
for (int i = 0; i < partMap.size(); i++) {
int p = partMap.partitionAt(i);
if (missingParts.contains(p))
continue;
GridDhtLocalPartition part = grp.topology().localPartition(p);
assert part != null && part.state() == OWNING && part.reservations() > 0
: "Partition should in OWNING state and has at least 1 reservation";
part.release();
}
}
/**
*
*/
private void advance() {
next = null;
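// Two phases per loop iteration: first drain data entries from the current DataRecord,
// then pull the next WAL record, applying RollbackRecord counters along the way.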
outer: while (doneParts.size() != partMap.size()) {
if (entryIt != null) {
while (entryIt.hasNext()) {
DataEntry entry = entryIt.next();
if (cacheIds.contains(entry.cacheId())) {
int idx = partMap.partitionIndex(entry.partitionId());
if (idx < 0 || missingParts.contains(idx))
continue;
long from = partMap.initialUpdateCounterAt(idx);
long to = partMap.updateCounterAt(idx);
if (entry.partitionCounter() > from && entry.partitionCounter() <= to) {
// Partition will be marked as done for current entry on next iteration.
if (++rebalancedCntrs[idx] == to)
donePart = entry.partitionId();
next = entry;
return;
}
}
}
}
entryIt = null;
// Search for next DataEntry while applying rollback counters.
while (walIt.hasNext()) {
IgniteBiTuple<WALPointer, WALRecord> rec = walIt.next();
if (rec.get2() instanceof DataRecord) {
DataRecord data = (DataRecord)rec.get2();
entryIt = data.writeEntries().iterator();
// Move on to the next valid data entry.
continue outer;
}
else if (rec.get2() instanceof RollbackRecord) {
RollbackRecord rbRec = (RollbackRecord)rec.get2();
if (grp.groupId() == rbRec.groupId()) {
int idx = partMap.partitionIndex(rbRec.partitionId());
if (idx < 0 || missingParts.contains(idx))
continue;
long from = partMap.initialUpdateCounterAt(idx);
long to = partMap.updateCounterAt(idx);
rebalancedCntrs[idx] += rbRec.overlap(from, to);
if (rebalancedCntrs[idx] == partMap.updateCounterAt(idx)) {
if (log.isDebugEnabled()) {
log.debug("Partition done [grpId=" + grp.groupId() +
", partId=" + donePart +
", from=" + from +
", to=" + to + ']');
}
doneParts.add(rbRec.partitionId()); // Add to done set immediately.
}
}
}
}
if (entryIt == null && doneParts.size() != partMap.size()) {
for (int i = 0; i < partMap.size(); i++) {
int p = partMap.partitionAt(i);
if (!doneParts.contains(p)) {
log.warning("Some partition entries were missed during historical rebalance [grp=" + grp + ", part=" + p + ", missed=" +
(partMap.updateCounterAt(i) - rebalancedCntrs[i]) + ']');
doneParts.add(p);
}
}
return;
}
}
}
}
/**
* Data entry row.
*/
private static class DataEntryRow implements CacheDataRow {
/** */
private final DataEntry entry;
/**
* @param entry Data entry.
*/
private DataEntryRow(DataEntry entry) {
this.entry = entry;
}
/** {@inheritDoc} */
@Override public KeyCacheObject key() {
return entry.key();
}
/** {@inheritDoc} */
@Override public void key(KeyCacheObject key) {
throw new IllegalStateException();
}
/** {@inheritDoc} */
@Override public CacheObject value() {
return entry.value();
}
/** {@inheritDoc} */
@Override public GridCacheVersion version() {
return entry.writeVersion();
}
/** {@inheritDoc} */
@Override public long expireTime() {
return entry.expireTime();
}
/** {@inheritDoc} */
@Override public int partition() {
return entry.partitionId();
}
/** {@inheritDoc} */
@Override public int size() throws IgniteCheckedException {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public int headerSize() {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public long link() {
return 0;
}
/** {@inheritDoc} */
@Override public void link(long link) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public int hash() {
return entry.key().hashCode();
}
/** {@inheritDoc} */
@Override public int cacheId() {
return entry.cacheId();
}
/** {@inheritDoc} */
@Override public long mvccCoordinatorVersion() {
return 0; // TODO IGNITE-7384
}
/** {@inheritDoc} */
@Override public long mvccCounter() {
return 0; // TODO IGNITE-7384
}
/** {@inheritDoc} */
@Override public int mvccOperationCounter() {
return 0; // TODO IGNITE-7384
}
/** {@inheritDoc} */
@Override public long newMvccCoordinatorVersion() {
return 0; // TODO IGNITE-7384
}
/** {@inheritDoc} */
@Override public long newMvccCounter() {
return 0; // TODO IGNITE-7384
}
/** {@inheritDoc} */
@Override public int newMvccOperationCounter() {
return 0; // TODO IGNITE-7384
}
/** {@inheritDoc} */
@Override public byte mvccTxState() {
return 0; // TODO IGNITE-7384
}
/** {@inheritDoc} */
@Override public byte newMvccTxState() {
return 0; // TODO IGNITE-7384
}
}
/**
*
*/
private static class Metas {
/** */
@GridToStringInclude
private final RootPage reuseListRoot;
/** */
@GridToStringInclude
private final RootPage treeRoot;
/** */
@GridToStringInclude
private final RootPage pendingTreeRoot;
/** */
@GridToStringInclude
private final RootPage partMetastoreReuseListRoot;
/**
* @param treeRoot Metadata storage root.
* @param reuseListRoot Reuse list root.
*/
Metas(RootPage treeRoot, RootPage reuseListRoot, RootPage pendingTreeRoot, RootPage partMetastoreReuseListRoot) {
this.treeRoot = treeRoot;
this.reuseListRoot = reuseListRoot;
this.pendingTreeRoot = pendingTreeRoot;
this.partMetastoreReuseListRoot = partMetastoreReuseListRoot;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(Metas.class, this);
}
}
/**
*
*/
public class GridCacheDataStore implements CacheDataStore {
/** */
private final int partId;
/** */
private volatile AbstractFreeList<CacheDataRow> freeList;
/** */
private PendingEntriesTree pendingTree;
/** */
private volatile CacheDataStoreImpl delegate;
/**
* Cache id which should be throttled.
*/
private volatile int lastThrottledCacheId;
/**
* Timestamp (in nanoseconds) when the next cleanup attempt is allowed for the current partition
* in accordance with the value of {@code lastThrottledCacheId}.
* Used for fine-grained throttling on a per-partition basis.
*/
private volatile long nextStoreCleanTimeNanos;
/** */
private PartitionMetaStorage<SimpleDataRow> partStorage;
/** */
private final boolean exists;
/** */
private final AtomicBoolean init = new AtomicBoolean();
/** */
private final CountDownLatch latch = new CountDownLatch(1);
/**
* @param partId Partition.
* @param exists {@code True} if store exists.
*/
private GridCacheDataStore(int partId, boolean exists) {
this.partId = partId;
this.exists = exists;
}
/**
* @return Name of free pages list.
*/
private String freeListName() {
return grp.cacheOrGroupName() + "-" + partId;
}
/**
* @return Name of partition meta store.
*/
private String partitionMetaStoreName() {
return grp.cacheOrGroupName() + "-partstore-" + partId;
}
/**
* @return Name of data tree.
*/
private String dataTreeName() {
return grp.cacheOrGroupName() + "-" + treeName(partId);
}
/**
* @return Name of pending entries tree.
*/
private String pendingEntriesTreeName() {
return grp.cacheOrGroupName() + "-" +"PendingEntries-" + partId;
}
/**
* @param checkExists If {@code true}, the data store won't be initialized if it doesn't exist
* (i.e. there is no non-empty data file). This is an optimization for lazy store initialization on writes.
*
* @return Store delegate.
* @throws IgniteCheckedException If failed.
*/
private CacheDataStore init0(boolean checkExists) throws IgniteCheckedException {
CacheDataStoreImpl delegate0 = delegate;
if (delegate0 != null)
return delegate0;
if (checkExists) {
if (!exists)
return null;
}
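// Only the first caller performs initialization; concurrent callers wait on the latch below.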
if (init.compareAndSet(false, true)) {
IgniteCacheDatabaseSharedManager dbMgr = ctx.database();
dbMgr.checkpointReadLock();
try {
Metas metas = getOrAllocatePartitionMetas();
if (PageIdUtils.partId(metas.reuseListRoot.pageId().pageId()) != partId ||
PageIdUtils.partId(metas.treeRoot.pageId().pageId()) != partId ||
PageIdUtils.partId(metas.pendingTreeRoot.pageId().pageId()) != partId ||
PageIdUtils.partId(metas.partMetastoreReuseListRoot.pageId().pageId()) != partId
) {
throw new IgniteCheckedException("Invalid meta root allocated [" +
"cacheOrGroupName=" + grp.cacheOrGroupName() +
", partId=" + partId +
", metas=" + metas + ']');
}
String freeListName = freeListName();
RootPage reuseRoot = metas.reuseListRoot;
freeList = new CacheFreeList(
grp.groupId(),
freeListName,
grp.dataRegion().memoryMetrics(),
grp.dataRegion(),
ctx.wal(),
reuseRoot.pageId().pageId(),
reuseRoot.isAllocated(),
ctx.diagnostic().pageLockTracker().createPageLockTracker(freeListName),
ctx.kernalContext(),
pageListCacheLimit
) {
/** {@inheritDoc} */
@Override protected long allocatePageNoReuse() throws IgniteCheckedException {
assert grp.shared().database().checkpointLockIsHeldByThread();
return pageMem.allocatePage(grpId, partId, PageIdAllocator.FLAG_DATA);
}
};
String partitionMetaStoreName = partitionMetaStoreName();
RootPage partMetastoreReuseListRoot = metas.partMetastoreReuseListRoot;
partStorage = new PartitionMetaStorageImpl<SimpleDataRow>(
grp.groupId(),
partitionMetaStoreName,
grp.dataRegion().memoryMetrics(),
grp.dataRegion(),
freeList,
ctx.wal(),
partMetastoreReuseListRoot.pageId().pageId(),
partMetastoreReuseListRoot.isAllocated(),
ctx.diagnostic().pageLockTracker().createPageLockTracker(partitionMetaStoreName),
ctx.kernalContext(),
pageListCacheLimit
) {
/** {@inheritDoc} */
@Override protected long allocatePageNoReuse() throws IgniteCheckedException {
assert grp.shared().database().checkpointLockIsHeldByThread();
return pageMem.allocatePage(grpId, partId, PageIdAllocator.FLAG_DATA);
}
};
String dataTreeName = dataTreeName();
CacheDataRowStore rowStore = new CacheDataRowStore(grp, freeList, partId);
RootPage treeRoot = metas.treeRoot;
CacheDataTree dataTree = new CacheDataTree(
grp,
dataTreeName,
freeList,
rowStore,
treeRoot.pageId().pageId(),
treeRoot.isAllocated(),
ctx.diagnostic().pageLockTracker().createPageLockTracker(dataTreeName)
) {
/** {@inheritDoc} */
@Override protected long allocatePageNoReuse() throws IgniteCheckedException {
assert grp.shared().database().checkpointLockIsHeldByThread();
return pageMem.allocatePage(grpId, partId, PageIdAllocator.FLAG_DATA);
}
};
String pendingEntriesTreeName = pendingEntriesTreeName();
RootPage pendingTreeRoot = metas.pendingTreeRoot;
final PendingEntriesTree pendingTree0 = new PendingEntriesTree(
grp,
pendingEntriesTreeName,
grp.dataRegion().pageMemory(),
pendingTreeRoot.pageId().pageId(),
freeList,
pendingTreeRoot.isAllocated(),
ctx.diagnostic().pageLockTracker().createPageLockTracker(pendingEntriesTreeName)
) {
/** {@inheritDoc} */
@Override protected long allocatePageNoReuse() throws IgniteCheckedException {
assert grp.shared().database().checkpointLockIsHeldByThread();
return pageMem.allocatePage(grpId, partId, PageIdAllocator.FLAG_DATA);
}
};
PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
delegate0 = new CacheDataStoreImpl(partId, rowStore, dataTree) {
/** {@inheritDoc} */
@Override public PendingEntriesTree pendingTree() {
return pendingTree0;
}
/** {@inheritDoc} */
@Override public void preload() throws IgniteCheckedException {
IgnitePageStoreManager pageStoreMgr = ctx.pageStore();
if (pageStoreMgr == null)
return;
final int pages = pageStoreMgr.pages(grp.groupId(), partId);
long pageId = pageMem.partitionMetaPageId(grp.groupId(), partId);
// Acquire and release each partition page sequentially to load it into page memory.
for (int pageNo = 0; pageNo < pages; pageId++, pageNo++) {
long pagePointer = -1;
try {
pagePointer = pageMem.acquirePage(grp.groupId(), pageId);
}
finally {
if (pagePointer != -1)
pageMem.releasePage(grp.groupId(), pageId, pagePointer);
}
}
}
};
pendingTree = pendingTree0;
if (!pendingTree0.isEmpty())
grp.caches().forEach(cctx -> cctx.ttl().hasPendingEntries(true));
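// Restore partition size, update counter (with gaps), cache sizes and global remove ID from the partition meta page.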
int grpId = grp.groupId();
long partMetaId = pageMem.partitionMetaPageId(grpId, partId);
long partMetaPage = pageMem.acquirePage(grpId, partMetaId);
try {
long pageAddr = pageMem.readLock(grpId, partMetaId, partMetaPage);
try {
if (PageIO.getType(pageAddr) != 0) {
PagePartitionMetaIOV2 io = (PagePartitionMetaIOV2)PagePartitionMetaIO.VERSIONS.latest();
Map<Integer, Long> cacheSizes = null;
if (grp.sharedGroup())
cacheSizes = readSharedGroupCacheSizes(pageMem, grpId, io.getCountersPageId(pageAddr));
long link = io.getGapsLink(pageAddr);
byte[] data = link == 0 ? null : partStorage.readRow(link);
delegate0.restoreState(io.getSize(pageAddr), io.getUpdateCounter(pageAddr), cacheSizes, data);
globalRemoveId().setIfGreater(io.getGlobalRemoveId(pageAddr));
}
}
finally {
pageMem.readUnlock(grpId, partMetaId, partMetaPage);
}
}
finally {
pageMem.releasePage(grpId, partMetaId, partMetaPage);
}
delegate = delegate0;
}
catch (Throwable ex) {
U.error(log, "Unhandled exception during page store initialization. All further operations will " +
"be failed and local node will be stopped.", ex);
ctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, ex));
throw ex;
}
finally {
latch.countDown();
dbMgr.checkpointReadUnlock();
}
}
else {
U.await(latch);
delegate0 = delegate;
if (delegate0 == null)
throw new IgniteCheckedException("Cache store initialization failed.");
}
return delegate0;
}
/**
* @return Partition metas.
*/
private Metas getOrAllocatePartitionMetas() throws IgniteCheckedException {
PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
IgniteWriteAheadLogManager wal = ctx.wal();
int grpId = grp.groupId();
long partMetaId = pageMem.partitionMetaPageId(grpId, partId);
AtomicBoolean metaPageAllocated = new AtomicBoolean(false);
long partMetaPage = pageMem.acquirePage(grpId, partMetaId, metaPageAllocated);
if (metaPageAllocated.get())
grp.metrics().incrementInitializedLocalPartitions();
try {
boolean allocated = false;
boolean pendingTreeAllocated = false;
boolean partMetastoreReuseListAllocated = false;
long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage);
try {
long treeRoot, reuseListRoot, pendingTreeRoot, partMetaStoreReuseListRoot;
// Initialize new page.
if (PageIO.getType(pageAddr) != PageIO.T_PART_META) {
PagePartitionMetaIOV2 io = (PagePartitionMetaIOV2)PagePartitionMetaIO.VERSIONS.latest();
io.initNewPage(pageAddr, partMetaId, pageMem.realPageSize(grpId));
treeRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_DATA);
reuseListRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_DATA);
pendingTreeRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_DATA);
partMetaStoreReuseListRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_DATA);
assert PageIdUtils.flag(treeRoot) == PageMemory.FLAG_DATA;
assert PageIdUtils.flag(reuseListRoot) == PageMemory.FLAG_DATA;
assert PageIdUtils.flag(pendingTreeRoot) == PageMemory.FLAG_DATA;
assert PageIdUtils.flag(partMetaStoreReuseListRoot) == PageMemory.FLAG_DATA;
io.setTreeRoot(pageAddr, treeRoot);
io.setReuseListRoot(pageAddr, reuseListRoot);
io.setPendingTreeRoot(pageAddr, pendingTreeRoot);
io.setPartitionMetaStoreReuseListRoot(pageAddr, partMetaStoreReuseListRoot);
if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, partMetaId, partMetaPage, wal, null)) {
wal.log(new PageSnapshot(new FullPageId(partMetaId, grpId), pageAddr,
pageMem.pageSize(), pageMem.realPageSize(grpId)));
}
allocated = true;
}
else {
PagePartitionMetaIO io = PageIO.getPageIO(pageAddr);
treeRoot = io.getTreeRoot(pageAddr);
reuseListRoot = io.getReuseListRoot(pageAddr);
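// Meta pages of version 1 lack pending tree and partition metastore roots; upgrade them to version 2 lazily.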
int pageVer = PagePartitionMetaIO.getVersion(pageAddr);
if (pageVer < 2) {
assert pageVer == 1;
if (log.isDebugEnabled())
log.debug("Upgrading partition meta page version: [part=" + partId +
", grpId=" + grpId + ", oldVer=" + pageVer +
", newVer=" + PagePartitionMetaIO.VERSIONS.latest().getVersion()
);
io = PagePartitionMetaIO.VERSIONS.latest();
((PagePartitionMetaIOV2)io).upgradePage(pageAddr);
pendingTreeRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_DATA);
partMetaStoreReuseListRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_DATA);
io.setPendingTreeRoot(pageAddr, pendingTreeRoot);
io.setPartitionMetaStoreReuseListRoot(pageAddr, partMetaStoreReuseListRoot);
if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, partMetaId, partMetaPage, wal,
null)) {
wal.log(new PageSnapshot(new FullPageId(partMetaId, grpId), pageAddr,
pageMem.pageSize(), pageMem.realPageSize(grpId)));
}
pendingTreeAllocated = partMetastoreReuseListAllocated = true;
}
else {
pendingTreeRoot = io.getPendingTreeRoot(pageAddr);
partMetaStoreReuseListRoot = io.getPartitionMetaStoreReuseListRoot(pageAddr);
if (partMetaStoreReuseListRoot == 0) {
partMetaStoreReuseListRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_DATA);
if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, partMetaId, partMetaPage, wal,
null)) {
wal.log(new PageSnapshot(new FullPageId(partMetaId, grpId), pageAddr,
pageMem.pageSize(), pageMem.realPageSize(grpId)));
}
partMetastoreReuseListAllocated = true;
}
}
if (PageIdUtils.flag(treeRoot) != PageMemory.FLAG_DATA)
throw new StorageException("Wrong tree root page id flag: treeRoot="
+ U.hexLong(treeRoot) + ", part=" + partId + ", grpId=" + grpId);
if (PageIdUtils.flag(reuseListRoot) != PageMemory.FLAG_DATA)
throw new StorageException("Wrong reuse list root page id flag: reuseListRoot="
+ U.hexLong(reuseListRoot) + ", part=" + partId + ", grpId=" + grpId);
if (PageIdUtils.flag(pendingTreeRoot) != PageMemory.FLAG_DATA)
throw new StorageException("Wrong pending tree root page id flag: reuseListRoot="
+ U.hexLong(reuseListRoot) + ", part=" + partId + ", grpId=" + grpId);
if (PageIdUtils.flag(partMetaStoreReuseListRoot) != PageMemory.FLAG_DATA)
throw new StorageException("Wrong partition meta store list root page id flag: partMetaStoreReuseListRoot="
+ U.hexLong(partMetaStoreReuseListRoot) + ", part=" + partId + ", grpId=" + grpId);
}
return new Metas(
new RootPage(new FullPageId(treeRoot, grpId), allocated),
new RootPage(new FullPageId(reuseListRoot, grpId), allocated),
new RootPage(new FullPageId(pendingTreeRoot, grpId), allocated || pendingTreeAllocated),
new RootPage(new FullPageId(partMetaStoreReuseListRoot, grpId), allocated || partMetastoreReuseListAllocated));
}
finally {
pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null,
allocated || pendingTreeAllocated || partMetastoreReuseListAllocated);
}
}
finally {
pageMem.releasePage(grpId, partMetaId, partMetaPage);
}
}
/** {@inheritDoc} */
@Override public boolean init() {
try {
return init0(true) != null;
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}
/** {@inheritDoc} */
@Override public int partId() {
return partId;
}
/** {@inheritDoc} */
@Override public RowStore rowStore() {
CacheDataStore delegate0 = delegate;
return delegate0 == null ? null : delegate0.rowStore();
}
/** {@inheritDoc} */
@Override public long fullSize() {
try {
CacheDataStore delegate0 = init0(true);
return delegate0 == null ? 0 : delegate0.fullSize();
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}
/** {@inheritDoc} */
@Override public boolean isEmpty() {
try {
CacheDataStore delegate0 = init0(true);
return delegate0 == null || delegate0.isEmpty();
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}
/** {@inheritDoc} */
@Override public long cacheSize(int cacheId) {
try {
CacheDataStore delegate0 = init0(true);
return delegate0 == null ? 0 : delegate0.cacheSize(cacheId);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}
/** {@inheritDoc} */
@Override public Map<Integer, Long> cacheSizes() {
try {
CacheDataStore delegate0 = init0(true);
return delegate0 == null ? null : delegate0.cacheSizes();
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}
/** {@inheritDoc} */
@Override public void updateSize(int cacheId, long delta) {
try {
CacheDataStore delegate0 = init0(false);
if (delegate0 != null)
delegate0.updateSize(cacheId, delta);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}
/** {@inheritDoc} */
@Override public long updateCounter() {
try {
CacheDataStore delegate0 = init0(true);
return delegate0 == null ? 0 : delegate0.updateCounter();
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}
/** {@inheritDoc} */
@Override public long reservedCounter() {
try {
CacheDataStore delegate0 = init0(true);
return delegate0 == null ? 0 : delegate0.reservedCounter();
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}
/** {@inheritDoc} */
@Override public PartitionUpdateCounter partUpdateCounter() {
try {
CacheDataStore delegate0 = init0(true);
return delegate0 == null ? null : delegate0.partUpdateCounter();
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}
/** {@inheritDoc} */
@Override public long getAndIncrementUpdateCounter(long delta) {
try {
CacheDataStore delegate0 = init0(false);
return delegate0 == null ? 0 : delegate0.getAndIncrementUpdateCounter(delta);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}
/** {@inheritDoc} */
@Override public long reserve(long delta) {
try {
CacheDataStore delegate0 = init0(false);
if (delegate0 == null)
throw new IllegalStateException("Should be never called.");
return delegate0.reserve(delta);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}
/** {@inheritDoc} */
@Override public void updateCounter(long val) {
try {
CacheDataStore delegate0 = init0(false);
if (delegate0 != null)
delegate0.updateCounter(val);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}
/** {@inheritDoc} */
@Override public boolean updateCounter(long start, long delta) {
try {
CacheDataStore delegate0 = init0(false);
return delegate0 != null && delegate0.updateCounter(start, delta);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}
/** {@inheritDoc} */
@Override public GridLongList finalizeUpdateCounters() {
try {
CacheDataStore delegate0 = init0(true);
return delegate0 != null ? delegate0.finalizeUpdateCounters() : null;
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}
/** {@inheritDoc} */
@Override public long nextUpdateCounter() {
try {
CacheDataStore delegate0 = init0(false);
if (delegate0 == null)
throw new IllegalStateException("Should be never called.");
return delegate0.nextUpdateCounter();
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}
/** {@inheritDoc} */
@Override public long initialUpdateCounter() {
try {
CacheDataStore delegate0 = init0(true);
return delegate0 == null ? 0 : delegate0.initialUpdateCounter();
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}
/** {@inheritDoc} */
@Override public void updateInitialCounter(long start, long delta) {
try {
CacheDataStore delegate0 = init0(false);
// Partition may not exist before recovery starts when counters are recovered from a RollbackRecord.
delegate0.updateInitialCounter(start, delta);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}
/** {@inheritDoc} */
@Override public void setRowCacheCleaner(GridQueryRowCacheCleaner rowCacheCleaner) {
try {
CacheDataStore delegate0 = init0(true);
if (delegate0 != null)
delegate0.setRowCacheCleaner(rowCacheCleaner);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}
/** {@inheritDoc} */
@Override public void update(
GridCacheContext cctx,
KeyCacheObject key,
CacheObject val,
GridCacheVersion ver,
long expireTime,
@Nullable CacheDataRow oldRow
) throws IgniteCheckedException {
assert ctx.database().checkpointLockIsHeldByThread();
CacheDataStore delegate = init0(false);
delegate.update(cctx, key, val, ver, expireTime, oldRow);
}
/** {@inheritDoc} */
@Override public boolean mvccInitialValue(
GridCacheContext cctx,
KeyCacheObject key,
@Nullable CacheObject val,
GridCacheVersion ver,
long expireTime,
MvccVersion mvccVer,
MvccVersion newMvccVer)
throws IgniteCheckedException
{
CacheDataStore delegate = init0(false);
return delegate.mvccInitialValue(cctx, key, val, ver, expireTime, mvccVer, newMvccVer);
}
/** {@inheritDoc} */
@Override public boolean mvccApplyHistoryIfAbsent(
GridCacheContext cctx,
KeyCacheObject key,
List<GridCacheMvccEntryInfo> hist)
throws IgniteCheckedException {
CacheDataStore delegate = init0(false);
return delegate.mvccApplyHistoryIfAbsent(cctx, key, hist);
}
/** {@inheritDoc} */
@Override public boolean mvccUpdateRowWithPreloadInfo(
GridCacheContext cctx,
KeyCacheObject key,
@Nullable CacheObject val,
GridCacheVersion ver,
long expireTime,
MvccVersion mvccVer,
MvccVersion newMvccVer,
byte mvccTxState,
byte newMvccTxState) throws IgniteCheckedException {
CacheDataStore delegate = init0(false);
return delegate.mvccUpdateRowWithPreloadInfo(cctx,
key,
val,
ver,
expireTime,
mvccVer,
newMvccVer,
mvccTxState,
newMvccTxState);
}
/** {@inheritDoc} */
@Override public MvccUpdateResult mvccUpdate(
GridCacheContext cctx,
KeyCacheObject key,
CacheObject val,
GridCacheVersion ver,
long expireTime,
MvccSnapshot mvccVer,
CacheEntryPredicate filter,
EntryProcessor entryProc,
Object[] invokeArgs,
boolean primary,
boolean needHistory,
boolean noCreate,
boolean needOldVal,
boolean retVal,
boolean keepBinary) throws IgniteCheckedException {
CacheDataStore delegate = init0(false);
return delegate.mvccUpdate(cctx, key, val, ver, expireTime, mvccVer, filter, entryProc, invokeArgs, primary,
needHistory, noCreate, needOldVal, retVal, keepBinary);
}
/** {@inheritDoc} */
@Override public MvccUpdateResult mvccRemove(
GridCacheContext cctx,
KeyCacheObject key,
MvccSnapshot mvccVer,
CacheEntryPredicate filter,
boolean primary,
boolean needHistory,
boolean needOldVal,
boolean retVal) throws IgniteCheckedException {
CacheDataStore delegate = init0(false);
return delegate.mvccRemove(cctx, key, mvccVer, filter, primary, needHistory, needOldVal, retVal);
}
/** {@inheritDoc} */
@Override public MvccUpdateResult mvccLock(
GridCacheContext cctx,
KeyCacheObject key,
MvccSnapshot mvccSnapshot) throws IgniteCheckedException {
CacheDataStore delegate = init0(false);
return delegate.mvccLock(cctx, key, mvccSnapshot);
}
/** {@inheritDoc} */
@Override public void mvccRemoveAll(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException {
CacheDataStore delegate = init0(false);
delegate.mvccRemoveAll(cctx, key);
}
/** {@inheritDoc} */
@Override public void mvccApplyUpdate(GridCacheContext cctx, KeyCacheObject key, CacheObject val, GridCacheVersion ver,
long expireTime, MvccVersion mvccVer) throws IgniteCheckedException {
CacheDataStore delegate = init0(false);
delegate.mvccApplyUpdate(cctx, key, val, ver, expireTime, mvccVer);
}
/** {@inheritDoc} */
@Override public CacheDataRow createRow(
GridCacheContext cctx,
KeyCacheObject key,
CacheObject val,
GridCacheVersion ver,
long expireTime,
@Nullable CacheDataRow oldRow) throws IgniteCheckedException {
assert ctx.database().checkpointLockIsHeldByThread();
CacheDataStore delegate = init0(false);
return delegate.createRow(cctx, key, val, ver, expireTime, oldRow);
}
/** {@inheritDoc} */
@Override public void insertRows(Collection<DataRowCacheAware> rows,
IgnitePredicateX<CacheDataRow> initPred) throws IgniteCheckedException {
CacheDataStore delegate = init0(false);
delegate.insertRows(rows, initPred);
}
/** {@inheritDoc} */
@Override public int cleanup(GridCacheContext cctx,
@Nullable List<MvccLinkAwareSearchRow> cleanupRows) throws IgniteCheckedException {
CacheDataStore delegate = init0(false);
return delegate.cleanup(cctx, cleanupRows);
}
/** {@inheritDoc} */
@Override public void updateTxState(GridCacheContext cctx, CacheSearchRow row) throws IgniteCheckedException {
CacheDataStore delegate = init0(false);
delegate.updateTxState(cctx, row);
}
/** {@inheritDoc} */
@Override public void invoke(GridCacheContext cctx, KeyCacheObject key, OffheapInvokeClosure c)
throws IgniteCheckedException {
assert ctx.database().checkpointLockIsHeldByThread();
CacheDataStore delegate = init0(false);
delegate.invoke(cctx, key, c);
}
/** {@inheritDoc} */
@Override public void remove(GridCacheContext cctx, KeyCacheObject key, int partId)
throws IgniteCheckedException {
assert ctx.database().checkpointLockIsHeldByThread();
CacheDataStore delegate = init0(false);
delegate.remove(cctx, key, partId);
}
/** {@inheritDoc} */
@Override public CacheDataRow find(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException {
CacheDataStore delegate = init0(true);
if (delegate != null)
return delegate.find(cctx, key);
return null;
}
/** {@inheritDoc} */
@Override public CacheDataRow mvccFind(GridCacheContext cctx, KeyCacheObject key, MvccSnapshot snapshot)
throws IgniteCheckedException {
CacheDataStore delegate = init0(true);
if (delegate != null)
return delegate.mvccFind(cctx, key, snapshot);
return null;
}
/** {@inheritDoc} */
@Override public List<IgniteBiTuple<Object, MvccVersion>> mvccFindAllVersions(GridCacheContext cctx, KeyCacheObject key)
throws IgniteCheckedException {
CacheDataStore delegate = init0(true);
if (delegate != null)
return delegate.mvccFindAllVersions(cctx, key);
return Collections.emptyList();
}
/** {@inheritDoc} */
@Override public GridCursor<CacheDataRow> mvccAllVersionsCursor(GridCacheContext cctx,
KeyCacheObject key, Object x) throws IgniteCheckedException {
CacheDataStore delegate = init0(true);
if (delegate != null)
return delegate.mvccAllVersionsCursor(cctx, key, x);
return EMPTY_CURSOR;
}
/** {@inheritDoc} */
@Override public GridCursor<? extends CacheDataRow> cursor() throws IgniteCheckedException {
CacheDataStore delegate = init0(true);
if (delegate != null)
return delegate.cursor();
return EMPTY_CURSOR;
}
/** {@inheritDoc} */
@Override public GridCursor<? extends CacheDataRow> cursor(Object x) throws IgniteCheckedException {
CacheDataStore delegate = init0(true);
if (delegate != null)
return delegate.cursor(x);
return EMPTY_CURSOR;
}
/** {@inheritDoc} */
@Override public GridCursor<? extends CacheDataRow> cursor(MvccSnapshot mvccSnapshot)
throws IgniteCheckedException {
CacheDataStore delegate = init0(true);
if (delegate != null)
return delegate.cursor(mvccSnapshot);
return EMPTY_CURSOR;
}
/** {@inheritDoc} */
@Override public GridCursor<? extends CacheDataRow> cursor(
int cacheId,
KeyCacheObject lower,
KeyCacheObject upper) throws IgniteCheckedException {
CacheDataStore delegate = init0(true);
if (delegate != null)
return delegate.cursor(cacheId, lower, upper);
return EMPTY_CURSOR;
}
/** {@inheritDoc} */
@Override public GridCursor<? extends CacheDataRow> cursor(int cacheId,
KeyCacheObject lower,
KeyCacheObject upper,
Object x)
throws IgniteCheckedException {
CacheDataStore delegate = init0(true);
if (delegate != null)
return delegate.cursor(cacheId, lower, upper, x);
return EMPTY_CURSOR;
}
/** {@inheritDoc} */
@Override public GridCursor<? extends CacheDataRow> cursor(int cacheId,
KeyCacheObject lower,
KeyCacheObject upper,
Object x,
MvccSnapshot mvccSnapshot)
throws IgniteCheckedException {
CacheDataStore delegate = init0(true);
if (delegate != null)
return delegate.cursor(cacheId, lower, upper, x, mvccSnapshot);
return EMPTY_CURSOR;
}
/** {@inheritDoc} */
@Override public void destroy() throws IgniteCheckedException {
// No need to destroy delegate.
}
/** {@inheritDoc} */
@Override public GridCursor<? extends CacheDataRow> cursor(int cacheId) throws IgniteCheckedException {
CacheDataStore delegate = init0(true);
if (delegate != null)
return delegate.cursor(cacheId);
return EMPTY_CURSOR;
}
/** {@inheritDoc} */
@Override public GridCursor<? extends CacheDataRow> cursor(int cacheId,
MvccSnapshot mvccSnapshot) throws IgniteCheckedException {
CacheDataStore delegate = init0(true);
if (delegate != null)
return delegate.cursor(cacheId, mvccSnapshot);
return EMPTY_CURSOR;
}
/** {@inheritDoc} */
@Override public void clear(int cacheId) throws IgniteCheckedException {
assert ctx.database().checkpointLockIsHeldByThread();
CacheDataStore delegate0 = init0(true);
if (delegate0 == null)
return;
// Clear the persistent pending tree.
if (pendingTree != null) {
PendingRow row = new PendingRow(cacheId);
GridCursor<PendingRow> cursor = pendingTree.find(row, row, PendingEntriesTree.WITHOUT_KEY);
while (cursor.next()) {
PendingRow row0 = cursor.get();
assert row0.link != 0 : row0;
boolean res = pendingTree.removex(row0);
assert res;
}
}
delegate0.clear(cacheId);
}
/**
* Gets the number of entries pending expiration.
*
* @return Number of pending entries.
* @throws IgniteCheckedException If failed to get number of pending entries.
*/
public long expiredSize() throws IgniteCheckedException {
CacheDataStore delegate0 = init0(true);
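// The pending tree tracks the entries with expiration for this partition, so its size is the number of pending entries.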
return delegate0 == null ? 0 : pendingTree.size();
}
/**
* Tries to remove expired entries from the data store.
*
* @param cctx Cache context.
* @param c Expiry closure that should be applied to expired entry. See {@link GridCacheTtlManager} for details.
* @param amount Limit of processed entries by single call, {@code -1} for no limit.
* @return cleared entries count.
* @throws IgniteCheckedException If failed.
*/
public int purgeExpired(
GridCacheContext cctx,
IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c,
int amount
) throws IgniteCheckedException {
CacheDataStore delegate0 = init0(true);
long nowNanos = System.nanoTime();
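// Skip the purge if the store is not initialized or cleanup for this cache was recently throttled.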
if (delegate0 == null || (cctx.cacheId() == lastThrottledCacheId && nextStoreCleanTimeNanos - nowNanos > 0))
return 0;
assert pendingTree != null : "Partition data store was not initialized.";
int cleared = purgeExpiredInternal(cctx, c, amount);
// Throttle if there is nothing to clean anymore.
if (cleared < amount) {
lastThrottledCacheId = cctx.cacheId();
nextStoreCleanTimeNanos = nowNanos + U.millisToNanos(unwindThrottlingTimeout);
}
return cleared;
}
/**
* Removes expired entries from the data store.
*
* @param cctx Cache context.
* @param c Expiry closure that should be applied to expired entry. See {@link GridCacheTtlManager} for details.
* @param amount Limit of processed entries by single call, {@code -1} for no limit.
* @return cleared entries count.
* @throws IgniteCheckedException If failed.
*/
private int purgeExpiredInternal(
GridCacheContext cctx,
IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c,
int amount
) throws IgniteCheckedException {
GridDhtLocalPartition part = cctx.topology().localPartition(partId, AffinityTopologyVersion.NONE, false, false);
// Skip non-owned partitions.
if (part == null || part.state() != OWNING)
return 0;
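// Purge entries under checkpoint read lock with the partition reserved, so it is not evicted or cleared concurrently.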
cctx.shared().database().checkpointReadLock();
try {
if (!part.reserve())
return 0;
try {
if (part.state() != OWNING)
return 0;
long now = U.currentTimeMillis();
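// In a shared cache group pending rows are bounded by the cache id; otherwise scan all rows expired by now.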
GridCursor<PendingRow> cur;
if (grp.sharedGroup())
cur = pendingTree.find(new PendingRow(cctx.cacheId()), new PendingRow(cctx.cacheId(), now, 0));
else
cur = pendingTree.find(null, new PendingRow(CU.UNDEFINED_CACHE_ID, now, 0));
if (!cur.next())
return 0;
GridCacheVersion obsoleteVer = null;
int cleared = 0;
do {
PendingRow row = cur.get();
if (amount != -1 && cleared > amount)
return cleared;
assert row.key != null && row.link != 0 && row.expireTime != 0 : row;
row.key.partition(partId);
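// Apply the expiry closure only if this thread actually removed the row from the pending tree.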
if (pendingTree.removex(row)) {
if (obsoleteVer == null)
obsoleteVer = ctx.versions().next();
GridCacheEntryEx e1 = cctx.cache().entryEx(row.key);
if (e1 != null)
c.apply(e1, obsoleteVer);
}
cleared++;
}
while (cur.next());
return cleared;
}
finally {
part.release();
}
}
finally {
cctx.shared().database().checkpointReadUnlock();
}
}
/** {@inheritDoc} */
@Override public PendingEntriesTree pendingTree() {
try {
CacheDataStore delegate0 = init0(true);
return delegate0 == null ? null : pendingTree;
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}
/** {@inheritDoc} */
@Override public void preload() throws IgniteCheckedException {
CacheDataStore delegate0 = init0(true);
if (delegate0 != null)
delegate0.preload();
}
/** {@inheritDoc} */
@Override public void resetUpdateCounter() {
try {
CacheDataStore delegate0 = init0(true);
if (delegate0 == null)
return;
delegate0.resetUpdateCounter();
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}
/** {@inheritDoc} */
@Override public void resetInitialUpdateCounter() {
try {
CacheDataStore delegate0 = init0(true);
if (delegate0 == null)
return;
delegate0.resetInitialUpdateCounter();
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}
@Override public PartitionMetaStorage partStorage() {
return partStorage;
}
}
/**
* Empty cursor which is returned when a partition data store has not been initialized yet.
*/
public static final GridCursor<CacheDataRow> EMPTY_CURSOR = new GridCursor<CacheDataRow>() {
/** {@inheritDoc} */
@Override public boolean next() {
return false;
}
/** {@inheritDoc} */
@Override public CacheDataRow get() {
return null;
}
};
}