| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.persistence; |
| |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.NoSuchElementException; |
| import java.util.Set; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import javax.cache.processor.EntryProcessor; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.failure.FailureContext; |
| import org.apache.ignite.failure.FailureType; |
| import org.apache.ignite.internal.pagemem.FullPageId; |
| import org.apache.ignite.internal.pagemem.PageIdAllocator; |
| import org.apache.ignite.internal.pagemem.PageIdUtils; |
| import org.apache.ignite.internal.pagemem.PageMemory; |
| import org.apache.ignite.internal.pagemem.PageSupport; |
| import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; |
| import org.apache.ignite.internal.pagemem.wal.WALIterator; |
| import org.apache.ignite.internal.pagemem.wal.WALPointer; |
| import org.apache.ignite.internal.pagemem.wal.record.DataEntry; |
| import org.apache.ignite.internal.pagemem.wal.record.DataRecord; |
| import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot; |
| import org.apache.ignite.internal.pagemem.wal.record.WALRecord; |
| import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageInitRecord; |
| import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateNextSnapshotId; |
| import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdatePartitionDataRecord; |
| import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecord; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; |
| import org.apache.ignite.internal.processors.cache.CacheGroupContext; |
| import org.apache.ignite.internal.processors.cache.CacheObject; |
| import org.apache.ignite.internal.processors.cache.GridCacheContext; |
| import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; |
| import org.apache.ignite.internal.processors.cache.GridCacheTtlManager; |
| import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl; |
| import org.apache.ignite.internal.processors.cache.KeyCacheObject; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIterator; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; |
| import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; |
| import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion; |
| import org.apache.ignite.internal.processors.cache.persistence.freelist.CacheFreeListImpl; |
| import org.apache.ignite.internal.processors.cache.persistence.migration.UpgradePendingTreeToPerPartitionTask; |
| import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx; |
| import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId; |
| import org.apache.ignite.internal.processors.cache.persistence.partstate.PagesAllocationRange; |
| import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionAllocationMap; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageMetaIO; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionCountersIO; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIOV2; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseListImpl; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler; |
| import org.apache.ignite.internal.processors.cache.persistence.wal.FileWALPointer; |
| import org.apache.ignite.internal.processors.cache.tree.CacheDataRowStore; |
| import org.apache.ignite.internal.processors.cache.tree.CacheDataTree; |
| import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree; |
| import org.apache.ignite.internal.processors.cache.tree.PendingRow; |
| import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccUpdateResult; |
| import org.apache.ignite.internal.processors.cache.tree.mvcc.search.MvccLinkAwareSearchRow; |
| import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; |
| import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner; |
| import org.apache.ignite.internal.util.GridLongList; |
| import org.apache.ignite.internal.util.lang.GridCursor; |
| import org.apache.ignite.internal.util.lang.IgniteInClosure2X; |
| import org.apache.ignite.internal.util.tostring.GridToStringInclude; |
| import org.apache.ignite.internal.util.typedef.internal.CU; |
| import org.apache.ignite.internal.util.typedef.internal.S; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteBiTuple; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING; |
| import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING; |
| import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.RENTING; |
| |
| /** |
| * Used when persistence enabled. |
| */ |
| public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl implements DbCheckpointListener { |
| /** */ |
| private IndexStorage indexStorage; |
| |
| /** */ |
| private ReuseListImpl reuseList; |
| |
| /** {@inheritDoc} */ |
| @Override protected void initPendingTree(GridCacheContext cctx) throws IgniteCheckedException { |
| // No-op. Per-partition PendingTree should be used. |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void initDataStructures() throws IgniteCheckedException { |
| assert ctx.database().checkpointLockIsHeldByThread(); |
| |
| Metas metas = getOrAllocateCacheMetas(); |
| |
| RootPage reuseListRoot = metas.reuseListRoot; |
| |
| reuseList = new ReuseListImpl(grp.groupId(), |
| grp.cacheOrGroupName(), |
| grp.dataRegion().pageMemory(), |
| ctx.wal(), |
| reuseListRoot.pageId().pageId(), |
| reuseListRoot.isAllocated()); |
| |
| RootPage metastoreRoot = metas.treeRoot; |
| |
| indexStorage = new IndexStorageImpl(grp.dataRegion().pageMemory(), |
| ctx.wal(), |
| globalRemoveId(), |
| grp.groupId(), |
| PageIdAllocator.INDEX_PARTITION, |
| PageIdAllocator.FLAG_IDX, |
| reuseList, |
| metastoreRoot.pageId().pageId(), |
| metastoreRoot.isAllocated(), |
| ctx.kernalContext().failure()); |
| |
| ((GridCacheDatabaseSharedManager)ctx.database()).addCheckpointListener(this); |
| } |
| |
| /** |
| * Get internal IndexStorage. |
| * See {@link UpgradePendingTreeToPerPartitionTask} for details. |
| */ |
| public IndexStorage getIndexStorage() { |
| return indexStorage; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected CacheDataStore createCacheDataStore0(int p) throws IgniteCheckedException { |
| if (ctx.database() instanceof GridCacheDatabaseSharedManager) |
| ((GridCacheDatabaseSharedManager) ctx.database()).cancelOrWaitPartitionDestroy(grp.groupId(), p); |
| |
| boolean exists = ctx.pageStore() != null && ctx.pageStore().exists(grp.groupId(), p); |
| |
| return new GridCacheDataStore(p, exists); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onCheckpointBegin(Context ctx) throws IgniteCheckedException { |
| assert grp.dataRegion().pageMemory() instanceof PageMemoryEx; |
| |
| Executor execSvc = ctx.executor(); |
| |
| boolean needSnapshot = ctx.nextSnapshot() && ctx.needToSnapshot(grp.cacheOrGroupName()); |
| |
| boolean hasNonEmptyGroups = false; |
| |
| for (CacheDataStore store : partDataStores.values()) { |
| if (notEmpty(store)) { |
| hasNonEmptyGroups = true; |
| |
| break; |
| } |
| } |
| |
| if (needSnapshot && hasNonEmptyGroups) { |
| if (execSvc == null) |
| updateSnapshotTag(ctx); |
| else { |
| execSvc.execute(() -> { |
| try { |
| updateSnapshotTag(ctx); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| }); |
| } |
| } |
| |
| if (execSvc == null) { |
| reuseList.saveMetadata(); |
| |
| for (CacheDataStore store : partDataStores.values()) |
| saveStoreMetadata(store, ctx, false, needSnapshot); |
| } |
| else { |
| execSvc.execute(() -> { |
| try { |
| reuseList.saveMetadata(); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| }); |
| |
| for (CacheDataStore store : partDataStores.values()) |
| execSvc.execute(() -> { |
| try { |
| saveStoreMetadata(store, ctx, false, needSnapshot); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| }); |
| } |
| } |
| |
| /** |
| * @return {@code True} is group is not empty. |
| */ |
| private boolean notEmpty(CacheDataStore store) { |
| return store.rowStore() != null && (store.fullSize() > 0 || store.updateCounter() > 0); |
| } |
| |
| /** |
| * @param store Store to save metadata. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void saveStoreMetadata( |
| CacheDataStore store, |
| Context ctx, |
| boolean beforeDestroy, |
| boolean needSnapshot |
| ) throws IgniteCheckedException { |
| RowStore rowStore0 = store.rowStore(); |
| |
| if (rowStore0 != null) { |
| CacheFreeListImpl freeList = (CacheFreeListImpl)rowStore0.freeList(); |
| |
| freeList.saveMetadata(); |
| |
| long updCntr = store.updateCounter(); |
| long size = store.fullSize(); |
| long rmvId = globalRemoveId().get(); |
| |
| PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory(); |
| IgniteWriteAheadLogManager wal = this.ctx.wal(); |
| |
| if (size > 0 || updCntr > 0) { |
| GridDhtPartitionState state = null; |
| |
| // localPartition will not acquire writeLock here because create=false. |
| GridDhtLocalPartition part = null; |
| |
| if (!grp.isLocal()) { |
| if (beforeDestroy) |
| state = GridDhtPartitionState.EVICTED; |
| else { |
| part = getPartition(store); |
| |
| if (part != null && part.state() != GridDhtPartitionState.EVICTED) |
| state = part.state(); |
| } |
| |
| // Do not save meta for evicted partitions on next checkpoints. |
| if (state == null) |
| return; |
| } |
| |
| int grpId = grp.groupId(); |
| long partMetaId = pageMem.partitionMetaPageId(grpId, store.partId()); |
| |
| long partMetaPage = pageMem.acquirePage(grpId, partMetaId); |
| try { |
| long partMetaPageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage); |
| |
| if (partMetaPageAddr == 0L) { |
| U.warn(log, "Failed to acquire write lock for meta page [metaPage=" + partMetaPage + |
| ", beforeDestroy=" + beforeDestroy + ", size=" + size + |
| ", updCntr=" + updCntr + ", state=" + state + ']'); |
| |
| return; |
| } |
| |
| boolean changed = false; |
| |
| try { |
| PagePartitionMetaIO io = PageIO.getPageIO(partMetaPageAddr); |
| |
| changed |= io.setUpdateCounter(partMetaPageAddr, updCntr); |
| changed |= io.setGlobalRemoveId(partMetaPageAddr, rmvId); |
| changed |= io.setSize(partMetaPageAddr, size); |
| |
| if (state != null) |
| changed |= io.setPartitionState(partMetaPageAddr, (byte)state.ordinal()); |
| else |
| assert grp.isLocal() : grp.cacheOrGroupName(); |
| |
| long cntrsPageId; |
| |
| if (grp.sharedGroup()) { |
| long initCntrPageId = io.getCountersPageId(partMetaPageAddr); |
| |
| Map<Integer, Long> newSizes = store.cacheSizes(); |
| Map<Integer, Long> prevSizes = readSharedGroupCacheSizes(pageMem, grpId, initCntrPageId); |
| |
| if (prevSizes != null && prevSizes.equals(newSizes)) |
| cntrsPageId = initCntrPageId; // Preventing modification of sizes pages for store |
| else { |
| cntrsPageId = writeSharedGroupCacheSizes(pageMem, grpId, initCntrPageId, |
| store.partId(), newSizes); |
| |
| if (initCntrPageId == 0 && cntrsPageId != 0) { |
| io.setCountersPageId(partMetaPageAddr, cntrsPageId); |
| |
| changed = true; |
| } |
| } |
| } |
| else |
| cntrsPageId = 0L; |
| |
| int pageCnt; |
| |
| if (needSnapshot) { |
| pageCnt = this.ctx.pageStore().pages(grpId, store.partId()); |
| |
| io.setCandidatePageCount(partMetaPageAddr, size == 0 ? 0 : pageCnt); |
| |
| if (state == OWNING) { |
| assert part != null; |
| |
| if (!addPartition( |
| part, |
| ctx.partitionStatMap(), |
| partMetaPageAddr, |
| io, |
| grpId, |
| store.partId(), |
| this.ctx.pageStore().pages(grpId, store.partId()), |
| store.fullSize() |
| )) |
| U.warn(log, "Partition was concurrently evicted grpId=" + grpId + |
| ", partitionId=" + part.id()); |
| } |
| else if (state == MOVING || state == RENTING) { |
| if (ctx.partitionStatMap().forceSkipIndexPartition(grpId)) { |
| if (log.isInfoEnabled()) |
| log.info("Will not include SQL indexes to snapshot because there is " + |
| "a partition not in " + OWNING + " state [grp=" + grp.cacheOrGroupName() + |
| ", partId=" + store.partId() + ", state=" + state + ']'); |
| } |
| } |
| |
| changed = true; |
| } |
| else |
| pageCnt = io.getCandidatePageCount(partMetaPageAddr); |
| |
| if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, partMetaId, partMetaPage, wal, null)) |
| wal.log(new MetaPageUpdatePartitionDataRecord( |
| grpId, |
| partMetaId, |
| updCntr, |
| rmvId, |
| (int)size, // TODO: Partition size may be long |
| cntrsPageId, |
| state == null ? -1 : (byte)state.ordinal(), |
| pageCnt |
| )); |
| } |
| finally { |
| pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, changed); |
| } |
| } |
| finally { |
| pageMem.releasePage(grpId, partMetaId, partMetaPage); |
| } |
| } |
| else if (needSnapshot) |
| tryAddEmptyPartitionToSnapshot(store, ctx); |
| } |
| else if (needSnapshot) |
| tryAddEmptyPartitionToSnapshot(store, ctx); |
| } |
| |
| /** |
| * Check that we need to snapshot this partition and add it to map. |
| * |
| * @param store Store. |
| * @param ctx Snapshot context. |
| */ |
| private void tryAddEmptyPartitionToSnapshot(CacheDataStore store, Context ctx) { |
| if (getPartition(store).state() == OWNING) { |
| ctx.partitionStatMap().put( |
| new GroupPartitionId(grp.groupId(), store.partId()), |
| new PagesAllocationRange(0, 0)); |
| } |
| } |
| |
| /** |
| * @param store Store. |
| * |
| * @return corresponding to store local partition |
| */ |
| private GridDhtLocalPartition getPartition(CacheDataStore store) { |
| return grp.topology().localPartition(store.partId(), |
| AffinityTopologyVersion.NONE, false, true); |
| } |
| |
| /** |
| * Loads cache sizes for all caches in shared group. |
| * |
| * @param pageMem page memory to perform operations on pages. |
| * @param grpId Cache group ID. |
| * @param cntrsPageId Counters page ID, if zero is provided that means no counters page exist. |
| * @return Cache sizes if store belongs to group containing multiple caches and sizes are available in memory. May |
| * return null if counter page does not exist. |
| * @throws IgniteCheckedException If page memory operation failed. |
| */ |
| @Nullable private static Map<Integer, Long> readSharedGroupCacheSizes(PageSupport pageMem, int grpId, |
| long cntrsPageId) throws IgniteCheckedException { |
| |
| if (cntrsPageId == 0L) |
| return null; |
| |
| Map<Integer, Long> cacheSizes = new HashMap<>(); |
| |
| long nextId = cntrsPageId; |
| |
| while (true) { |
| final long curId = nextId; |
| final long curPage = pageMem.acquirePage(grpId, curId); |
| |
| try { |
| final long curAddr = pageMem.readLock(grpId, curId, curPage); |
| |
| assert curAddr != 0; |
| |
| try { |
| PagePartitionCountersIO cntrsIO = PageIO.getPageIO(curAddr); |
| |
| if (cntrsIO.readCacheSizes(curAddr, cacheSizes)) |
| break; |
| |
| nextId = cntrsIO.getNextCountersPageId(curAddr); |
| |
| assert nextId != 0; |
| } |
| finally { |
| pageMem.readUnlock(grpId, curId, curPage); |
| } |
| } |
| finally { |
| pageMem.releasePage(grpId, curId, curPage); |
| } |
| } |
| return cacheSizes; |
| } |
| |
| /** |
| * Saves cache sizes for all caches in shared group. Unconditionally marks pages as dirty. |
| * |
| * @param pageMem page memory to perform operations on pages. |
| * @param grpId Cache group ID. |
| * @param cntrsPageId Counters page ID, if zero is provided that means no counters page exist. |
| * @param partId Partition ID. |
| * @param sizes Cache sizes of all caches in group. Not null. |
| * @return new counter page Id. Same as {@code cntrsPageId} or new value if cache size pages were initialized. |
| * @throws IgniteCheckedException if page memory operation failed. |
| */ |
| private static long writeSharedGroupCacheSizes(PageMemory pageMem, int grpId, |
| long cntrsPageId, int partId, Map<Integer, Long> sizes) throws IgniteCheckedException { |
| byte[] data = PagePartitionCountersIO.VERSIONS.latest().serializeCacheSizes(sizes); |
| |
| int items = data.length / PagePartitionCountersIO.ITEM_SIZE; |
| boolean init = cntrsPageId == 0; |
| |
| if (init && !sizes.isEmpty()) |
| cntrsPageId = pageMem.allocatePage(grpId, partId, PageIdAllocator.FLAG_DATA); |
| |
| long nextId = cntrsPageId; |
| int written = 0; |
| |
| while (written != items) { |
| final long curId = nextId; |
| final long curPage = pageMem.acquirePage(grpId, curId); |
| |
| try { |
| final long curAddr = pageMem.writeLock(grpId, curId, curPage); |
| |
| assert curAddr != 0; |
| |
| try { |
| PagePartitionCountersIO partCntrIo; |
| |
| if (init) { |
| partCntrIo = PagePartitionCountersIO.VERSIONS.latest(); |
| |
| partCntrIo.initNewPage(curAddr, curId, pageMem.realPageSize(grpId)); |
| } |
| else |
| partCntrIo = PageIO.getPageIO(curAddr); |
| |
| written += partCntrIo.writeCacheSizes(pageMem.realPageSize(grpId), curAddr, data, written); |
| |
| nextId = partCntrIo.getNextCountersPageId(curAddr); |
| |
| if (written != items && (init = nextId == 0)) { |
| //allocate new counters page |
| nextId = pageMem.allocatePage(grpId, partId, PageIdAllocator.FLAG_DATA); |
| partCntrIo.setNextCountersPageId(curAddr, nextId); |
| } |
| } |
| finally { |
| // Write full page |
| pageMem.writeUnlock(grpId, curId, curPage, Boolean.TRUE, true); |
| } |
| } |
| finally { |
| pageMem.releasePage(grpId, curId, curPage); |
| } |
| } |
| |
| return cntrsPageId; |
| } |
| |
| /** |
| * @param ctx Context. |
| */ |
| private void updateSnapshotTag(Context ctx) throws IgniteCheckedException { |
| int grpId = grp.groupId(); |
| PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory(); |
| IgniteWriteAheadLogManager wal = this.ctx.wal(); |
| |
| long metaPageId = pageMem.metaPageId(grpId); |
| long metaPage = pageMem.acquirePage(grpId, metaPageId); |
| |
| try { |
| long metaPageAddr = pageMem.writeLock(grpId, metaPageId, metaPage); |
| |
| try { |
| PageMetaIO metaIo = PageMetaIO.getPageIO(metaPageAddr); |
| |
| long nextSnapshotTag = metaIo.getNextSnapshotTag(metaPageAddr); |
| |
| metaIo.setNextSnapshotTag(metaPageAddr, nextSnapshotTag + 1); |
| |
| if (log != null && log.isDebugEnabled()) |
| log.debug("Save next snapshot before checkpoint start for grId = " + grpId |
| + ", nextSnapshotTag = " + nextSnapshotTag); |
| |
| if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, metaPageId, |
| metaPage, wal, null)) |
| wal.log(new MetaPageUpdateNextSnapshotId(grpId, metaPageId, |
| nextSnapshotTag + 1)); |
| |
| addPartition( |
| null, |
| ctx.partitionStatMap(), |
| metaPageAddr, |
| metaIo, |
| grpId, |
| PageIdAllocator.INDEX_PARTITION, |
| this.ctx.pageStore().pages(grpId, PageIdAllocator.INDEX_PARTITION), |
| -1); |
| } |
| finally { |
| pageMem.writeUnlock(grpId, metaPageId, metaPage, null, true); |
| } |
| } |
| finally { |
| pageMem.releasePage(grpId, metaPageId, metaPage); |
| } |
| } |
| |
| /** |
| * @param part Local partition. |
| * @param map Map to add values to. |
| * @param metaPageAddr Meta page address |
| * @param io Page Meta IO |
| * @param grpId Cache Group ID. |
| * @param currAllocatedPageCnt total number of pages allocated for partition <code>[partition, grpId]</code> |
| */ |
| private static boolean addPartition( |
| GridDhtLocalPartition part, |
| final PartitionAllocationMap map, |
| final long metaPageAddr, |
| final PageMetaIO io, |
| final int grpId, |
| final int partId, |
| final int currAllocatedPageCnt, |
| final long partSize |
| ) { |
| if (part != null) { |
| boolean reserved = part.reserve(); |
| |
| if (!reserved) |
| return false; |
| } |
| else |
| assert partId == PageIdAllocator.INDEX_PARTITION : partId; |
| |
| assert PageIO.getPageId(metaPageAddr) != 0; |
| |
| int lastAllocatedPageCnt = io.getLastAllocatedPageCount(metaPageAddr); |
| |
| int curPageCnt = partSize == 0 ? 0 : currAllocatedPageCnt; |
| |
| map.put( |
| new GroupPartitionId(grpId, partId), |
| new PagesAllocationRange(lastAllocatedPageCnt, curPageCnt)); |
| |
| return true; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void destroyCacheDataStore0(CacheDataStore store) throws IgniteCheckedException { |
| assert ctx.database() instanceof GridCacheDatabaseSharedManager |
| : "Destroying cache data store when persistence is not enabled: " + ctx.database(); |
| |
| int partId = store.partId(); |
| |
| ctx.database().checkpointReadLock(); |
| |
| try { |
| saveStoreMetadata(store, null, true, false); |
| } |
| finally { |
| ctx.database().checkpointReadUnlock(); |
| } |
| |
| ((GridCacheDatabaseSharedManager)ctx.database()).schedulePartitionDestroy(grp.groupId(), partId); |
| } |
| |
| /** |
| * Invalidates page memory for given partition. Destroys partition store. |
| * <b>NOTE:</b> This method can be invoked only within checkpoint lock or checkpointer thread. |
| * |
| * @param grpId Group ID. |
| * @param partId Partition ID. |
| * |
| * @throws IgniteCheckedException If destroy has failed. |
| */ |
| public void destroyPartitionStore(int grpId, int partId) throws IgniteCheckedException { |
| PageMemoryEx pageMemory = (PageMemoryEx)grp.dataRegion().pageMemory(); |
| |
| int tag = pageMemory.invalidate(grp.groupId(), partId); |
| |
| if (grp.walEnabled()) |
| ctx.wal().log(new PartitionDestroyRecord(grp.groupId(), partId)); |
| |
| ctx.pageStore().onPartitionDestroyed(grpId, partId, tag); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onPartitionCounterUpdated(int part, long cntr) { |
| CacheDataStore store = partDataStores.get(part); |
| |
| assert store != null; |
| |
| long oldCnt = store.updateCounter(); |
| |
| if (oldCnt < cntr) |
| store.updateCounter(cntr); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onPartitionInitialCounterUpdated(int part, long cntr) { |
| CacheDataStore store = partDataStores.get(part); |
| |
| assert store != null; |
| |
| long oldCnt = store.initialUpdateCounter(); |
| |
| if (oldCnt < cntr) |
| store.updateInitialCounter(cntr); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long lastUpdatedPartitionCounter(int part) { |
| return partDataStores.get(part).updateCounter(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public RootPage rootPageForIndex(int cacheId, String idxName) throws IgniteCheckedException { |
| if (grp.sharedGroup()) |
| idxName = Integer.toString(cacheId) + "_" + idxName; |
| |
| return indexStorage.getOrAllocateForTree(idxName); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void dropRootPageForIndex(int cacheId, String idxName) throws IgniteCheckedException { |
| if (grp.sharedGroup()) |
| idxName = Integer.toString(cacheId) + "_" + idxName; |
| |
| indexStorage.dropRootPage(idxName); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public ReuseList reuseListForIndex(String idxName) { |
| return reuseList; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void stop() { |
| if (grp.affinityNode()) |
| ((GridCacheDatabaseSharedManager)ctx.database()).removeCheckpointListener(this); |
| } |
| |
| /** |
| * @return Meta root pages info. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private Metas getOrAllocateCacheMetas() throws IgniteCheckedException { |
| PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory(); |
| IgniteWriteAheadLogManager wal = ctx.wal(); |
| |
| int grpId = grp.groupId(); |
| long metaId = pageMem.metaPageId(grpId); |
| long metaPage = pageMem.acquirePage(grpId, metaId); |
| |
| try { |
| final long pageAddr = pageMem.writeLock(grpId, metaId, metaPage); |
| |
| boolean allocated = false; |
| |
| try { |
| long metastoreRoot, reuseListRoot; |
| |
| if (PageIO.getType(pageAddr) != PageIO.T_META) { |
| PageMetaIO pageIO = PageMetaIO.VERSIONS.latest(); |
| |
| pageIO.initNewPage(pageAddr, metaId, pageMem.realPageSize(grpId)); |
| |
| metastoreRoot = pageMem.allocatePage(grpId, PageIdAllocator.INDEX_PARTITION, PageMemory.FLAG_IDX); |
| reuseListRoot = pageMem.allocatePage(grpId, PageIdAllocator.INDEX_PARTITION, PageMemory.FLAG_IDX); |
| |
| pageIO.setTreeRoot(pageAddr, metastoreRoot); |
| pageIO.setReuseListRoot(pageAddr, reuseListRoot); |
| |
| if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, metaId, metaPage, wal, null)) |
| wal.log(new MetaPageInitRecord( |
| grpId, |
| metaId, |
| pageIO.getType(), |
| pageIO.getVersion(), |
| metastoreRoot, |
| reuseListRoot |
| )); |
| |
| allocated = true; |
| } |
| else { |
| PageMetaIO pageIO = PageIO.getPageIO(pageAddr); |
| |
| metastoreRoot = pageIO.getTreeRoot(pageAddr); |
| reuseListRoot = pageIO.getReuseListRoot(pageAddr); |
| |
| assert reuseListRoot != 0L; |
| } |
| |
| return new Metas( |
| new RootPage(new FullPageId(metastoreRoot, grpId), allocated), |
| new RootPage(new FullPageId(reuseListRoot, grpId), allocated), |
| null); |
| } |
| finally { |
| pageMem.writeUnlock(grpId, metaId, metaPage, null, allocated); |
| } |
| } |
| finally { |
| pageMem.releasePage(grpId, metaId, metaPage); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override @Nullable protected IgniteHistoricalIterator historicalIterator( |
| CachePartitionPartialCountersMap partCntrs, Set<Integer> missing) throws IgniteCheckedException { |
| if (partCntrs == null || partCntrs.isEmpty()) |
| return null; |
| |
| if (grp.mvccEnabled()) // TODO IGNITE-7384 |
| return super.historicalIterator(partCntrs, missing); |
| |
| GridCacheDatabaseSharedManager database = (GridCacheDatabaseSharedManager)grp.shared().database(); |
| |
| FileWALPointer minPtr = null; |
| |
| for (int i = 0; i < partCntrs.size(); i++) { |
| int p = partCntrs.partitionAt(i); |
| long initCntr = partCntrs.initialUpdateCounterAt(i); |
| |
| FileWALPointer startPtr = (FileWALPointer)database.checkpointHistory().searchPartitionCounter( |
| grp.groupId(), p, initCntr); |
| |
| if (startPtr == null) |
| throw new IgniteCheckedException("Could not find start pointer for partition [part=" + p + ", partCntrSince=" + initCntr + "]"); |
| |
| if (minPtr == null || startPtr.compareTo(minPtr) < 0) |
| minPtr = startPtr; |
| } |
| |
| WALIterator it = grp.shared().wal().replay(minPtr); |
| |
| WALHistoricalIterator iterator = new WALHistoricalIterator(grp, partCntrs, it); |
| |
| // Add historical partitions which are unabled to reserve to missing set. |
| missing.addAll(iterator.missingParts); |
| |
| return iterator; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean expire( |
| GridCacheContext cctx, |
| IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c, |
| int amount |
| ) throws IgniteCheckedException { |
| assert !cctx.isNear() : cctx.name(); |
| |
| if (!hasPendingEntries || nextCleanTime > U.currentTimeMillis()) |
| return false; |
| |
| // Prevent manager being stopped in the middle of pds operation. |
| if (!busyLock.enterBusy()) |
| return false; |
| |
| try { |
| int cleared = 0; |
| |
| for (CacheDataStore store : cacheDataStores()) { |
| cleared += ((GridCacheDataStore)store).purgeExpired(cctx, c, amount - cleared); |
| |
| if (amount != -1 && cleared >= amount) |
| return true; |
| } |
| |
| // Throttle if there is nothing to clean anymore. |
| if (cleared < amount) |
| nextCleanTime = U.currentTimeMillis() + UNWIND_THROTTLING_TIMEOUT; |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| |
| return false; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long expiredSize() throws IgniteCheckedException { |
| long size = 0; |
| |
| for (CacheDataStore store : cacheDataStores()) |
| size += ((GridCacheDataStore)store).expiredSize(); |
| |
| return size; |
| } |
| |
| /** |
| * Calculates free space of all partition data stores - number of bytes available for use in allocated pages. |
| * |
| * @return Tuple (numenator, denominator). |
| */ |
| long freeSpace() { |
| long freeSpace = 0; |
| |
| for (CacheDataStore store : partDataStores.values()) { |
| assert store instanceof GridCacheDataStore; |
| |
| CacheFreeListImpl freeList = ((GridCacheDataStore)store).freeList; |
| |
| if (freeList == null) |
| continue; |
| |
| freeSpace += freeList.freeSpace(); |
| } |
| |
| return freeSpace; |
| } |
| |
| /** |
| * |
| */ |
| private static class WALHistoricalIterator implements IgniteHistoricalIterator { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** Cache context. */ |
| private final CacheGroupContext grp; |
| |
| /** Partition counters map. */ |
| private final CachePartitionPartialCountersMap partMap; |
| |
| /** Partitions marked as missing (unable to reserve or partition is not in OWNING state). */ |
| private final Set<Integer> missingParts = new HashSet<>(); |
| |
| /** Partitions marked as done. */ |
| private final Set<Integer> doneParts = new HashSet<>(); |
| |
| /** Cache IDs. This collection is stored as field to avoid re-calculation on each iteration. */ |
| private final Set<Integer> cacheIds; |
| |
| /** WAL iterator. */ |
| private WALIterator walIt; |
| |
| /** */ |
| private Iterator<DataEntry> entryIt; |
| |
| /** */ |
| private DataEntry next; |
| |
| /** Flag indicates that partition belongs to current {@link #next} is finished and no longer needs to rebalance. */ |
| private boolean reachedPartitionEnd; |
| |
| /** Flag indicates that update counters for requested partitions have been reached and done. |
| * It means that no further iteration is needed. */ |
| private boolean doneAllPartitions; |
| |
| /** |
| * @param grp Cache context. |
| * @param walIt WAL iterator. |
| */ |
| private WALHistoricalIterator(CacheGroupContext grp, CachePartitionPartialCountersMap partMap, WALIterator walIt) { |
| this.grp = grp; |
| this.partMap = partMap; |
| this.walIt = walIt; |
| |
| cacheIds = grp.cacheIds(); |
| |
| reservePartitions(); |
| |
| advance(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean contains(int partId) { |
| return partMap.contains(partId); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean isDone(int partId) { |
| return doneParts.contains(partId); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void close() throws IgniteCheckedException { |
| walIt.close(); |
| releasePartitions(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean isClosed() { |
| return walIt.isClosed(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean hasNextX() { |
| return hasNext(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public CacheDataRow nextX() throws IgniteCheckedException { |
| return next(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void removeX() throws IgniteCheckedException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Iterator<CacheDataRow> iterator() { |
| return this; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean hasNext() { |
| return next != null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public CacheDataRow next() { |
| if (next == null) |
| throw new NoSuchElementException(); |
| |
| CacheDataRow val = new DataEntryRow(next); |
| |
| if (reachedPartitionEnd) { |
| doneParts.add(next.partitionId()); |
| |
| reachedPartitionEnd = false; |
| |
| if (doneParts.size() == partMap.size()) |
| doneAllPartitions = true; |
| } |
| |
| advance(); |
| |
| return val; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void remove() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| /** |
| * Reserve historical partitions. |
| * If partition is unable to reserve, id of that partition is placed to {@link #missingParts} set. |
| */ |
| private void reservePartitions() { |
| for (int i = 0; i < partMap.size(); i++) { |
| int p = partMap.partitionAt(i); |
| GridDhtLocalPartition part = grp.topology().localPartition(p); |
| |
| if (part == null || !part.reserve()) { |
| missingParts.add(p); |
| continue; |
| } |
| |
| if (part.state() != OWNING) { |
| part.release(); |
| missingParts.add(p); |
| } |
| } |
| } |
| |
| /** |
| * Release historical partitions. |
| */ |
| private void releasePartitions() { |
| for (int i = 0; i < partMap.size(); i++) { |
| int p = partMap.partitionAt(i); |
| |
| if (missingParts.contains(p)) |
| continue; |
| |
| GridDhtLocalPartition part = grp.topology().localPartition(p); |
| |
| assert part != null && part.state() == OWNING && part.reservations() > 0 |
| : "Partition should in OWNING state and has at least 1 reservation"; |
| |
| part.release(); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private void advance() { |
| next = null; |
| |
| if (doneAllPartitions) |
| return; |
| |
| while (true) { |
| if (entryIt != null) { |
| while (entryIt.hasNext()) { |
| DataEntry entry = entryIt.next(); |
| |
| if (cacheIds.contains(entry.cacheId())) { |
| int idx = partMap.partitionIndex(entry.partitionId()); |
| |
| if (idx < 0 || missingParts.contains(idx)) |
| continue; |
| |
| long from = partMap.initialUpdateCounterAt(idx); |
| long to = partMap.updateCounterAt(idx); |
| |
| if (entry.partitionCounter() > from && entry.partitionCounter() <= to) { |
| if (entry.partitionCounter() == to) |
| reachedPartitionEnd = true; |
| |
| next = entry; |
| |
| return; |
| } |
| } |
| } |
| } |
| |
| entryIt = null; |
| |
| while (walIt.hasNext()) { |
| IgniteBiTuple<WALPointer, WALRecord> rec = walIt.next(); |
| |
| if (rec.get2() instanceof DataRecord) { |
| DataRecord data = (DataRecord)rec.get2(); |
| |
| entryIt = data.writeEntries().iterator(); |
| // Move on to the next valid data entry. |
| |
| break; |
| } |
| } |
| |
| if (entryIt == null) |
| return; |
| } |
| } |
| } |
| |
| /** |
| * Data entry row. |
| */ |
| private static class DataEntryRow implements CacheDataRow { |
| /** */ |
| private final DataEntry entry; |
| |
| /** |
| * @param entry Data entry. |
| */ |
| private DataEntryRow(DataEntry entry) { |
| this.entry = entry; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public KeyCacheObject key() { |
| return entry.key(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void key(KeyCacheObject key) { |
| throw new IllegalStateException(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public CacheObject value() { |
| return entry.value(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridCacheVersion version() { |
| return entry.writeVersion(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long expireTime() { |
| return entry.expireTime(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int partition() { |
| return entry.partitionId(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int size() throws IgniteCheckedException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int headerSize() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long link() { |
| return 0; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void link(long link) { |
| throw new UnsupportedOperationException(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int hash() { |
| return entry.key().hashCode(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int cacheId() { |
| return entry.cacheId(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long mvccCoordinatorVersion() { |
| return 0; // TODO IGNITE-7384 |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long mvccCounter() { |
| return 0; // TODO IGNITE-7384 |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int mvccOperationCounter() { |
| return 0; // TODO IGNITE-7384 |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long newMvccCoordinatorVersion() { |
| return 0; // TODO IGNITE-7384 |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long newMvccCounter() { |
| return 0; // TODO IGNITE-7384 |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int newMvccOperationCounter() { |
| return 0; // TODO IGNITE-7384 |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public byte mvccTxState() { |
| return 0; // TODO IGNITE-7384 |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public byte newMvccTxState() { |
| return 0; // TODO IGNITE-7384 |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class Metas { |
| /** */ |
| @GridToStringInclude |
| private final RootPage reuseListRoot; |
| |
| /** */ |
| @GridToStringInclude |
| private final RootPage treeRoot; |
| |
| /** */ |
| @GridToStringInclude |
| private final RootPage pendingTreeRoot; |
| |
| /** |
| * @param treeRoot Metadata storage root. |
| * @param reuseListRoot Reuse list root. |
| */ |
| Metas(RootPage treeRoot, RootPage reuseListRoot, RootPage pendingTreeRoot) { |
| this.treeRoot = treeRoot; |
| this.reuseListRoot = reuseListRoot; |
| this.pendingTreeRoot = pendingTreeRoot; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(Metas.class, this); |
| } |
| } |
| |
| /** |
| * |
| */ |
| public class GridCacheDataStore implements CacheDataStore { |
| /** */ |
| private final int partId; |
| |
| /** */ |
| private String name; |
| |
| /** */ |
| private volatile CacheFreeListImpl freeList; |
| |
| /** */ |
| private PendingEntriesTree pendingTree; |
| |
| /** */ |
| private volatile CacheDataStore delegate; |
| |
| /** Timestamp when next clean try will be allowed for current partition. |
| * Used for fine-grained throttling on per-partition basis. */ |
| private volatile long nextStoreCleanTime; |
| |
| /** */ |
| private final boolean exists; |
| |
| /** */ |
| private final AtomicBoolean init = new AtomicBoolean(); |
| |
| /** */ |
| private final CountDownLatch latch = new CountDownLatch(1); |
| |
| /** |
| * @param partId Partition. |
| * @param exists {@code True} if store for this index exists. |
| */ |
| private GridCacheDataStore(int partId, boolean exists) { |
| this.partId = partId; |
| this.exists = exists; |
| |
| name = treeName(partId); |
| } |
| |
| /** |
| * @return Store delegate. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private CacheDataStore init0(boolean checkExists) throws IgniteCheckedException { |
| CacheDataStore delegate0 = delegate; |
| |
| if (delegate0 != null) |
| return delegate0; |
| |
| if (checkExists) { |
| if (!exists) |
| return null; |
| } |
| |
| if (init.compareAndSet(false, true)) { |
| IgniteCacheDatabaseSharedManager dbMgr = ctx.database(); |
| |
| dbMgr.checkpointReadLock(); |
| |
| try { |
| Metas metas = getOrAllocatePartitionMetas(); |
| |
| RootPage reuseRoot = metas.reuseListRoot; |
| |
| freeList = new CacheFreeListImpl( |
| grp.groupId(), |
| grp.cacheOrGroupName() + "-" + partId, |
| grp.dataRegion().memoryMetrics(), |
| grp.dataRegion(), |
| null, |
| ctx.wal(), |
| reuseRoot.pageId().pageId(), |
| reuseRoot.isAllocated()) { |
| /** {@inheritDoc} */ |
| @Override protected long allocatePageNoReuse() throws IgniteCheckedException { |
| assert grp.shared().database().checkpointLockIsHeldByThread(); |
| |
| return pageMem.allocatePage(grpId, partId, PageIdAllocator.FLAG_DATA); |
| } |
| }; |
| |
| CacheDataRowStore rowStore = new CacheDataRowStore(grp, freeList, partId); |
| |
| RootPage treeRoot = metas.treeRoot; |
| |
| CacheDataTree dataTree = new CacheDataTree( |
| grp, |
| name, |
| freeList, |
| rowStore, |
| treeRoot.pageId().pageId(), |
| treeRoot.isAllocated()) { |
| /** {@inheritDoc} */ |
| @Override protected long allocatePageNoReuse() throws IgniteCheckedException { |
| assert grp.shared().database().checkpointLockIsHeldByThread(); |
| |
| return pageMem.allocatePage(grpId, partId, PageIdAllocator.FLAG_DATA); |
| } |
| }; |
| |
| RootPage pendingTreeRoot = metas.pendingTreeRoot; |
| |
| final PendingEntriesTree pendingTree0 = new PendingEntriesTree( |
| grp, |
| "PendingEntries-" + partId, |
| grp.dataRegion().pageMemory(), |
| pendingTreeRoot.pageId().pageId(), |
| freeList, |
| pendingTreeRoot.isAllocated()) { |
| /** {@inheritDoc} */ |
| @Override protected long allocatePageNoReuse() throws IgniteCheckedException { |
| assert grp.shared().database().checkpointLockIsHeldByThread(); |
| |
| return pageMem.allocatePage(grpId, partId, PageIdAllocator.FLAG_DATA); |
| } |
| }; |
| |
| PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory(); |
| |
| delegate0 = new CacheDataStoreImpl(partId, name, rowStore, dataTree) { |
| /** {@inheritDoc} */ |
| @Override public PendingEntriesTree pendingTree() { |
| return pendingTree0; |
| } |
| }; |
| |
| pendingTree = pendingTree0; |
| |
| if (!hasPendingEntries && pendingTree0.size() > 0) |
| hasPendingEntries = true; |
| |
| int grpId = grp.groupId(); |
| long partMetaId = pageMem.partitionMetaPageId(grpId, partId); |
| long partMetaPage = pageMem.acquirePage(grpId, partMetaId); |
| |
| try { |
| long pageAddr = pageMem.readLock(grpId, partMetaId, partMetaPage); |
| |
| try { |
| if (PageIO.getType(pageAddr) != 0) { |
| PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.latest(); |
| |
| Map<Integer, Long> cacheSizes = null; |
| |
| if (grp.sharedGroup()) |
| cacheSizes = readSharedGroupCacheSizes(pageMem, grpId, io.getCountersPageId(pageAddr)); |
| |
| delegate0.init(io.getSize(pageAddr), io.getUpdateCounter(pageAddr), cacheSizes); |
| |
| globalRemoveId().setIfGreater(io.getGlobalRemoveId(pageAddr)); |
| } |
| } |
| finally { |
| pageMem.readUnlock(grpId, partMetaId, partMetaPage); |
| } |
| } |
| finally { |
| pageMem.releasePage(grpId, partMetaId, partMetaPage); |
| } |
| |
| delegate = delegate0; |
| } |
| catch (Throwable ex) { |
| U.error(log, "Unhandled exception during page store initialization. All further operations will " + |
| "be failed and local node will be stopped.", ex); |
| |
| ctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, ex)); |
| |
| throw ex; |
| } |
| finally { |
| latch.countDown(); |
| |
| dbMgr.checkpointReadUnlock(); |
| } |
| } |
| else { |
| U.await(latch); |
| |
| delegate0 = delegate; |
| |
| if (delegate0 == null) |
| throw new IgniteCheckedException("Cache store initialization failed."); |
| } |
| |
| return delegate0; |
| } |
| |
| /** |
| * @return Partition metas. |
| */ |
| private Metas getOrAllocatePartitionMetas() throws IgniteCheckedException { |
| PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory(); |
| IgniteWriteAheadLogManager wal = ctx.wal(); |
| |
| int grpId = grp.groupId(); |
| long partMetaId = pageMem.partitionMetaPageId(grpId, partId); |
| |
| long partMetaPage = pageMem.acquirePage(grpId, partMetaId); |
| try { |
| boolean allocated = false; |
| boolean pendingTreeAllocated = false; |
| |
| long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage); |
| try { |
| long treeRoot, reuseListRoot, pendingTreeRoot; |
| |
| // Initialize new page. |
| if (PageIO.getType(pageAddr) != PageIO.T_PART_META) { |
| PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.latest(); |
| |
| io.initNewPage(pageAddr, partMetaId, pageMem.realPageSize(grpId)); |
| |
| treeRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_DATA); |
| reuseListRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_DATA); |
| pendingTreeRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_DATA); |
| |
| assert PageIdUtils.flag(treeRoot) == PageMemory.FLAG_DATA; |
| assert PageIdUtils.flag(reuseListRoot) == PageMemory.FLAG_DATA; |
| assert PageIdUtils.flag(pendingTreeRoot) == PageMemory.FLAG_DATA; |
| |
| io.setTreeRoot(pageAddr, treeRoot); |
| io.setReuseListRoot(pageAddr, reuseListRoot); |
| io.setPendingTreeRoot(pageAddr, pendingTreeRoot); |
| |
| if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, partMetaId, partMetaPage, wal, null)) { |
| wal.log(new PageSnapshot(new FullPageId(partMetaId, grpId), pageAddr, |
| pageMem.pageSize(), pageMem.realPageSize(grpId))); |
| } |
| |
| allocated = true; |
| } |
| else { |
| PagePartitionMetaIO io = PageIO.getPageIO(pageAddr); |
| |
| treeRoot = io.getTreeRoot(pageAddr); |
| reuseListRoot = io.getReuseListRoot(pageAddr); |
| |
| int pageVersion = PagePartitionMetaIO.getVersion(pageAddr); |
| |
| if (pageVersion < 2) { |
| assert pageVersion == 1; |
| |
| if (log.isDebugEnabled()) |
| log.info("Upgrade partition meta page version: [part=" + partId + |
| ", grpId=" + grpId + ", oldVer=" + pageVersion + |
| ", newVer=" + io.getVersion() |
| ); |
| |
| io = PagePartitionMetaIO.VERSIONS.latest(); |
| |
| ((PagePartitionMetaIOV2)io).upgradePage(pageAddr); |
| |
| pendingTreeRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_DATA); |
| |
| io.setPendingTreeRoot(pageAddr, pendingTreeRoot); |
| |
| if (PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, partMetaId, partMetaPage, wal, |
| null)) { |
| wal.log(new PageSnapshot(new FullPageId(partMetaId, grpId), pageAddr, |
| pageMem.pageSize(), pageMem.realPageSize(grpId))); |
| } |
| |
| pendingTreeAllocated = true; |
| } |
| else |
| pendingTreeRoot = io.getPendingTreeRoot(pageAddr); |
| |
| if (PageIdUtils.flag(treeRoot) != PageMemory.FLAG_DATA) |
| throw new StorageException("Wrong tree root page id flag: treeRoot=" |
| + U.hexLong(treeRoot) + ", part=" + partId + ", grpId=" + grpId); |
| |
| if (PageIdUtils.flag(reuseListRoot) != PageMemory.FLAG_DATA) |
| throw new StorageException("Wrong reuse list root page id flag: reuseListRoot=" |
| + U.hexLong(reuseListRoot) + ", part=" + partId + ", grpId=" + grpId); |
| |
| if (PageIdUtils.flag(pendingTreeRoot) != PageMemory.FLAG_DATA) |
| throw new StorageException("Wrong pending tree root page id flag: reuseListRoot=" |
| + U.hexLong(reuseListRoot) + ", part=" + partId + ", grpId=" + grpId); |
| } |
| |
| return new Metas( |
| new RootPage(new FullPageId(treeRoot, grpId), allocated), |
| new RootPage(new FullPageId(reuseListRoot, grpId), allocated), |
| new RootPage(new FullPageId(pendingTreeRoot, grpId), allocated || pendingTreeAllocated)); |
| } |
| finally { |
| pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, allocated || pendingTreeAllocated); |
| } |
| } |
| finally { |
| pageMem.releasePage(grpId, partMetaId, partMetaPage); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int partId() { |
| return partId; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String name() { |
| return name; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public RowStore rowStore() { |
| CacheDataStore delegate0 = delegate; |
| |
| return delegate0 == null ? null : delegate0.rowStore(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long fullSize() { |
| try { |
| CacheDataStore delegate0 = init0(true); |
| |
| return delegate0 == null ? 0 : delegate0.fullSize(); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long cacheSize(int cacheId) { |
| try { |
| CacheDataStore delegate0 = init0(true); |
| |
| return delegate0 == null ? 0 : delegate0.cacheSize(cacheId); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Map<Integer, Long> cacheSizes() { |
| try { |
| CacheDataStore delegate0 = init0(true); |
| |
| return delegate0 == null ? null : delegate0.cacheSizes(); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void updateSize(int cacheId, long delta) { |
| try { |
| CacheDataStore delegate0 = init0(false); |
| |
| if (delegate0 != null) |
| delegate0.updateSize(cacheId, delta); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long updateCounter() { |
| try { |
| CacheDataStore delegate0 = init0(true); |
| |
| return delegate0 == null ? 0 : delegate0.updateCounter(); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long getAndIncrementUpdateCounter(long delta) { |
| try { |
| CacheDataStore delegate0 = init0(true); |
| |
| return delegate0 == null ? 0 : delegate0.getAndIncrementUpdateCounter(delta); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void init(long size, long updCntr, @Nullable Map<Integer, Long> cacheSizes) { |
| throw new IllegalStateException("Should be never called."); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void updateCounter(long val) { |
| try { |
| CacheDataStore delegate0 = init0(false); |
| |
| if (delegate0 != null) |
| delegate0.updateCounter(val); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void updateCounter(long start, long delta) { |
| try { |
| CacheDataStore delegate0 = init0(false); |
| |
| if (delegate0 != null) |
| delegate0.updateCounter(start, delta); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void finalizeUpdateCountres() { |
| try { |
| CacheDataStore delegate0 = init0(true); |
| |
| if (delegate0 != null) |
| delegate0.finalizeUpdateCountres(); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long nextUpdateCounter() { |
| try { |
| CacheDataStore delegate0 = init0(false); |
| |
| return delegate0 == null ? 0 : delegate0.nextUpdateCounter(); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long initialUpdateCounter() { |
| try { |
| CacheDataStore delegate0 = init0(true); |
| |
| return delegate0 == null ? 0 : delegate0.initialUpdateCounter(); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void updateInitialCounter(long cntr) { |
| try { |
| CacheDataStore delegate0 = init0(true); |
| |
| if (delegate0 != null) |
| delegate0.updateInitialCounter(cntr); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void setRowCacheCleaner(GridQueryRowCacheCleaner rowCacheCleaner) { |
| try { |
| CacheDataStore delegate0 = init0(true); |
| |
| if (delegate0 != null) |
| delegate0.setRowCacheCleaner(rowCacheCleaner); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void update( |
| GridCacheContext cctx, |
| KeyCacheObject key, |
| CacheObject val, |
| GridCacheVersion ver, |
| long expireTime, |
| @Nullable CacheDataRow oldRow |
| ) throws IgniteCheckedException { |
| assert ctx.database().checkpointLockIsHeldByThread(); |
| |
| CacheDataStore delegate = init0(false); |
| |
| delegate.update(cctx, key, val, ver, expireTime, oldRow); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean mvccInitialValue( |
| GridCacheContext cctx, |
| KeyCacheObject key, |
| @Nullable CacheObject val, |
| GridCacheVersion ver, |
| long expireTime, |
| MvccVersion mvccVer, |
| MvccVersion newMvccVer) |
| throws IgniteCheckedException |
| { |
| CacheDataStore delegate = init0(false); |
| |
| return delegate.mvccInitialValue(cctx, key, val, ver, expireTime, mvccVer, newMvccVer); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean mvccInitialValueIfAbsent( |
| GridCacheContext cctx, |
| KeyCacheObject key, |
| @Nullable CacheObject val, |
| GridCacheVersion ver, |
| long expireTime, |
| MvccVersion mvccVer, |
| MvccVersion newMvccVer, |
| byte txState, |
| byte newTxState) |
| throws IgniteCheckedException |
| { |
| CacheDataStore delegate = init0(false); |
| |
| return delegate.mvccInitialValueIfAbsent(cctx, key, val, ver, expireTime, mvccVer, newMvccVer, |
| txState, newTxState); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean mvccUpdateRowWithPreloadInfo( |
| GridCacheContext cctx, |
| KeyCacheObject key, |
| @Nullable CacheObject val, |
| GridCacheVersion ver, |
| long expireTime, |
| MvccVersion mvccVer, |
| MvccVersion newMvccVer, |
| byte mvccTxState, |
| byte newMvccTxState) throws IgniteCheckedException { |
| |
| CacheDataStore delegate = init0(false); |
| |
| return delegate.mvccUpdateRowWithPreloadInfo(cctx, |
| key, |
| val, |
| ver, |
| expireTime, |
| mvccVer, |
| newMvccVer, |
| mvccTxState, |
| newMvccTxState); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public MvccUpdateResult mvccUpdate( |
| GridCacheContext cctx, |
| KeyCacheObject key, |
| CacheObject val, |
| GridCacheVersion ver, |
| long expireTime, |
| MvccSnapshot mvccVer, |
| CacheEntryPredicate filter, |
| EntryProcessor entryProc, |
| Object[] invokeArgs, |
| boolean primary, |
| boolean needHistory, |
| boolean noCreate, |
| boolean needOldVal, |
| boolean retVal) throws IgniteCheckedException { |
| CacheDataStore delegate = init0(false); |
| |
| return delegate.mvccUpdate(cctx, key, val, ver, expireTime, mvccVer, filter, entryProc, invokeArgs, primary, |
| needHistory, noCreate, needOldVal, retVal); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public MvccUpdateResult mvccRemove( |
| GridCacheContext cctx, |
| KeyCacheObject key, |
| MvccSnapshot mvccVer, |
| CacheEntryPredicate filter, |
| boolean primary, |
| boolean needHistory, |
| boolean needOldVal, |
| boolean retVal) throws IgniteCheckedException { |
| CacheDataStore delegate = init0(false); |
| |
| return delegate.mvccRemove(cctx, key, mvccVer,filter, primary, needHistory, needOldVal, retVal); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public MvccUpdateResult mvccLock( |
| GridCacheContext cctx, |
| KeyCacheObject key, |
| MvccSnapshot mvccSnapshot) throws IgniteCheckedException { |
| CacheDataStore delegate = init0(false); |
| |
| return delegate.mvccLock(cctx, key, mvccSnapshot); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridLongList mvccUpdateNative(GridCacheContext cctx, boolean primary, KeyCacheObject key, CacheObject val, GridCacheVersion ver, long expireTime, MvccSnapshot mvccSnapshot) throws IgniteCheckedException { |
| CacheDataStore delegate = init0(false); |
| |
| return delegate.mvccUpdateNative(cctx, primary, key, val, ver, expireTime, mvccSnapshot); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridLongList mvccRemoveNative(GridCacheContext cctx, boolean primary, KeyCacheObject key, MvccSnapshot mvccSnapshot) throws IgniteCheckedException { |
| CacheDataStore delegate = init0(false); |
| |
| return delegate.mvccRemoveNative(cctx, primary, key, mvccSnapshot); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void mvccRemoveAll(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException { |
| CacheDataStore delegate = init0(false); |
| |
| delegate.mvccRemoveAll(cctx, key); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void mvccApplyUpdate(GridCacheContext cctx, KeyCacheObject key, CacheObject val, GridCacheVersion ver, |
| long expireTime, MvccVersion mvccVer) throws IgniteCheckedException { |
| CacheDataStore delegate = init0(false); |
| |
| delegate.mvccApplyUpdate(cctx, key, val, ver, expireTime, mvccVer); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public CacheDataRow createRow( |
| GridCacheContext cctx, |
| KeyCacheObject key, |
| CacheObject val, |
| GridCacheVersion ver, |
| long expireTime, |
| @Nullable CacheDataRow oldRow) throws IgniteCheckedException { |
| assert ctx.database().checkpointLockIsHeldByThread(); |
| |
| CacheDataStore delegate = init0(false); |
| |
| return delegate.createRow(cctx, key, val, ver, expireTime, oldRow); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int cleanup(GridCacheContext cctx, |
| @Nullable List<MvccLinkAwareSearchRow> cleanupRows) throws IgniteCheckedException { |
| CacheDataStore delegate = init0(false); |
| |
| return delegate.cleanup(cctx, cleanupRows); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void updateTxState(GridCacheContext cctx, CacheSearchRow row) throws IgniteCheckedException { |
| CacheDataStore delegate = init0(false); |
| |
| delegate.updateTxState(cctx, row); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void invoke(GridCacheContext cctx, KeyCacheObject key, OffheapInvokeClosure c) |
| throws IgniteCheckedException { |
| assert ctx.database().checkpointLockIsHeldByThread(); |
| |
| CacheDataStore delegate = init0(false); |
| |
| delegate.invoke(cctx, key, c); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void remove(GridCacheContext cctx, KeyCacheObject key, int partId) |
| throws IgniteCheckedException { |
| assert ctx.database().checkpointLockIsHeldByThread(); |
| |
| CacheDataStore delegate = init0(false); |
| |
| delegate.remove(cctx, key, partId); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public CacheDataRow find(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException { |
| CacheDataStore delegate = init0(true); |
| |
| if (delegate != null) |
| return delegate.find(cctx, key); |
| |
| return null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public CacheDataRow mvccFind(GridCacheContext cctx, KeyCacheObject key, MvccSnapshot snapshot) |
| throws IgniteCheckedException { |
| CacheDataStore delegate = init0(true); |
| |
| if (delegate != null) |
| return delegate.mvccFind(cctx, key, snapshot); |
| |
| return null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public List<IgniteBiTuple<Object, MvccVersion>> mvccFindAllVersions(GridCacheContext cctx, KeyCacheObject key) |
| throws IgniteCheckedException { |
| CacheDataStore delegate = init0(true); |
| |
| if (delegate != null) |
| return delegate.mvccFindAllVersions(cctx, key); |
| |
| return Collections.emptyList(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridCursor<CacheDataRow> mvccAllVersionsCursor(GridCacheContext cctx, |
| KeyCacheObject key, Object x) throws IgniteCheckedException { |
| CacheDataStore delegate = init0(true); |
| |
| if (delegate != null) |
| return delegate.mvccAllVersionsCursor(cctx, key, x); |
| |
| return EMPTY_CURSOR; |
| } |
| |
| |
| /** {@inheritDoc} */ |
| @Override public GridCursor<? extends CacheDataRow> cursor() throws IgniteCheckedException { |
| CacheDataStore delegate = init0(true); |
| |
| if (delegate != null) |
| return delegate.cursor(); |
| |
| return EMPTY_CURSOR; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridCursor<? extends CacheDataRow> cursor(Object x) throws IgniteCheckedException { |
| CacheDataStore delegate = init0(true); |
| |
| if (delegate != null) |
| return delegate.cursor(x); |
| |
| return EMPTY_CURSOR; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridCursor<? extends CacheDataRow> cursor(MvccSnapshot mvccSnapshot) |
| throws IgniteCheckedException { |
| CacheDataStore delegate = init0(true); |
| |
| if (delegate != null) |
| return delegate.cursor(mvccSnapshot); |
| |
| return EMPTY_CURSOR; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridCursor<? extends CacheDataRow> cursor( |
| int cacheId, |
| KeyCacheObject lower, |
| KeyCacheObject upper) throws IgniteCheckedException { |
| CacheDataStore delegate = init0(true); |
| |
| if (delegate != null) |
| return delegate.cursor(cacheId, lower, upper); |
| |
| return EMPTY_CURSOR; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId, |
| KeyCacheObject lower, |
| KeyCacheObject upper, |
| Object x) |
| throws IgniteCheckedException { |
| CacheDataStore delegate = init0(true); |
| |
| if (delegate != null) |
| return delegate.cursor(cacheId, lower, upper, x); |
| |
| return EMPTY_CURSOR; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId, |
| KeyCacheObject lower, |
| KeyCacheObject upper, |
| Object x, |
| MvccSnapshot mvccSnapshot) |
| throws IgniteCheckedException { |
| CacheDataStore delegate = init0(true); |
| |
| if (delegate != null) |
| return delegate.cursor(cacheId, lower, upper, x, mvccSnapshot); |
| |
| return EMPTY_CURSOR; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void destroy() throws IgniteCheckedException { |
| // No need to destroy delegate. |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId) throws IgniteCheckedException { |
| CacheDataStore delegate = init0(true); |
| |
| if (delegate != null) |
| return delegate.cursor(cacheId); |
| |
| return EMPTY_CURSOR; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId, |
| MvccSnapshot mvccSnapshot) throws IgniteCheckedException { |
| CacheDataStore delegate = init0(true); |
| |
| if (delegate != null) |
| return delegate.cursor(cacheId, mvccSnapshot); |
| |
| return EMPTY_CURSOR; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void clear(int cacheId) throws IgniteCheckedException { |
| CacheDataStore delegate0 = init0(true); |
| |
| if (delegate0 == null) |
| return; |
| |
| ctx.database().checkpointReadLock(); |
| try { |
| // Clear persistent pendingTree |
| if (pendingTree != null) { |
| PendingRow row = new PendingRow(cacheId); |
| |
| GridCursor<PendingRow> cursor = pendingTree.find(row, row, PendingEntriesTree.WITHOUT_KEY); |
| |
| while (cursor.next()) { |
| PendingRow row0 = cursor.get(); |
| |
| assert row0.link != 0 : row; |
| |
| boolean res = pendingTree.removex(row0); |
| |
| assert res; |
| } |
| } |
| |
| delegate0.clear(cacheId); |
| } |
| finally { |
| ctx.database().checkpointReadUnlock(); |
| } |
| } |
| |
| /** |
| * Gets the number of entries pending expire. |
| * |
| * @return Number of pending entries. |
| * @throws IgniteCheckedException If failed to get number of pending entries. |
| */ |
| public long expiredSize() throws IgniteCheckedException { |
| CacheDataStore delegate0 = init0(true); |
| |
| return delegate0 == null ? 0 : pendingTree.size(); |
| } |
| |
| /** |
| * Try to remove expired entries from data store. |
| * |
| * @param cctx Cache context. |
| * @param c Expiry closure that should be applied to expired entry. See {@link GridCacheTtlManager} for details. |
| * @param amount Limit of processed entries by single call, {@code -1} for no limit. |
| * @return cleared entries count. |
| * @throws IgniteCheckedException If failed. |
| */ |
| public int purgeExpired(GridCacheContext cctx, |
| IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c, |
| int amount) throws IgniteCheckedException { |
| CacheDataStore delegate0 = init0(true); |
| |
| long now = U.currentTimeMillis(); |
| |
| if (delegate0 == null || nextStoreCleanTime > now) |
| return 0; |
| |
| assert pendingTree != null : "Partition data store was not initialized."; |
| |
| int cleared = purgeExpiredInternal(cctx, c, amount); |
| |
| // Throttle if there is nothing to clean anymore. |
| if (cleared < amount) |
| nextStoreCleanTime = now + UNWIND_THROTTLING_TIMEOUT; |
| |
| return cleared; |
| } |
| |
| /** |
| * Removes expired entries from data store. |
| * |
| * @param cctx Cache context. |
| * @param c Expiry closure that should be applied to expired entry. See {@link GridCacheTtlManager} for details. |
| * @param amount Limit of processed entries by single call, {@code -1} for no limit. |
| * @return cleared entries count. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private int purgeExpiredInternal(GridCacheContext cctx, |
| IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c, |
| int amount) throws IgniteCheckedException { |
| |
| GridDhtLocalPartition part = cctx.topology().localPartition(partId, AffinityTopologyVersion.NONE, false, false); |
| |
| // Skip non-owned partitions. |
| if (part == null || part.state() != OWNING) |
| return 0; |
| |
| cctx.shared().database().checkpointReadLock(); |
| try { |
| if (!part.reserve()) |
| return 0; |
| |
| try { |
| if (part.state() != OWNING) |
| return 0; |
| |
| long now = U.currentTimeMillis(); |
| |
| GridCursor<PendingRow> cur; |
| |
| if (grp.sharedGroup()) |
| cur = pendingTree.find(new PendingRow(cctx.cacheId()), new PendingRow(cctx.cacheId(), now, 0)); |
| else |
| cur = pendingTree.find(null, new PendingRow(CU.UNDEFINED_CACHE_ID, now, 0)); |
| |
| if (!cur.next()) |
| return 0; |
| |
| GridCacheVersion obsoleteVer = null; |
| |
| int cleared = 0; |
| |
| do { |
| PendingRow row = cur.get(); |
| |
| if (amount != -1 && cleared > amount) |
| return cleared; |
| |
| assert row.key != null && row.link != 0 && row.expireTime != 0 : row; |
| |
| row.key.partition(partId); |
| |
| if (pendingTree.removex(row)) { |
| if (obsoleteVer == null) |
| obsoleteVer = ctx.versions().next(); |
| |
| GridCacheEntryEx e1 = cctx.cache().entryEx(row.key); |
| |
| if (e1 != null) |
| c.apply(e1, obsoleteVer); |
| } |
| |
| cleared++; |
| } |
| while (cur.next()); |
| |
| return cleared; |
| } |
| finally { |
| part.release(); |
| } |
| } |
| finally { |
| cctx.shared().database().checkpointReadUnlock(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public PendingEntriesTree pendingTree() { |
| try { |
| CacheDataStore delegate0 = init0(true); |
| |
| return delegate0 == null ? null : pendingTree; |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| } |
| |
| /** |
| * |
| */ |
| public static final GridCursor<CacheDataRow> EMPTY_CURSOR = new GridCursor<CacheDataRow>() { |
| /** {@inheritDoc} */ |
| @Override public boolean next() { |
| return false; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public CacheDataRow get() { |
| return null; |
| } |
| }; |
| } |