| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.persistence; |
| |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.NoSuchElementException; |
| import java.util.Objects; |
| import java.util.Set; |
| import java.util.concurrent.CountDownLatch; |
| import java.util.concurrent.Executor; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.function.ToLongFunction; |
| import java.util.stream.Stream; |
| import javax.cache.processor.EntryProcessor; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.IgniteSystemProperties; |
| import org.apache.ignite.SystemProperty; |
| import org.apache.ignite.failure.FailureContext; |
| import org.apache.ignite.failure.FailureType; |
| import org.apache.ignite.internal.managers.encryption.GridEncryptionManager; |
| import org.apache.ignite.internal.managers.encryption.ReencryptStateUtils; |
| import org.apache.ignite.internal.pagemem.FullPageId; |
| import org.apache.ignite.internal.pagemem.PageIdAllocator; |
| import org.apache.ignite.internal.pagemem.PageIdUtils; |
| import org.apache.ignite.internal.pagemem.PageMemory; |
| import org.apache.ignite.internal.pagemem.PageSupport; |
| import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager; |
| import org.apache.ignite.internal.pagemem.store.PageStore; |
| import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager; |
| import org.apache.ignite.internal.pagemem.wal.WALIterator; |
| import org.apache.ignite.internal.pagemem.wal.record.DataEntry; |
| import org.apache.ignite.internal.pagemem.wal.record.DataRecord; |
| import org.apache.ignite.internal.pagemem.wal.record.PageSnapshot; |
| import org.apache.ignite.internal.pagemem.wal.record.RollbackRecord; |
| import org.apache.ignite.internal.pagemem.wal.record.WALRecord; |
| import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageInitRecord; |
| import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdateIndexDataRecord; |
| import org.apache.ignite.internal.pagemem.wal.record.delta.MetaPageUpdatePartitionDataRecordV3; |
| import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionDestroyRecord; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; |
| import org.apache.ignite.internal.processors.cache.CacheGroupContext; |
| import org.apache.ignite.internal.processors.cache.CacheObject; |
| import org.apache.ignite.internal.processors.cache.GridCacheContext; |
| import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; |
| import org.apache.ignite.internal.processors.cache.GridCacheMvccEntryInfo; |
| import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; |
| import org.apache.ignite.internal.processors.cache.GridCacheTtlManager; |
| import org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManagerImpl; |
| import org.apache.ignite.internal.processors.cache.KeyCacheObject; |
| import org.apache.ignite.internal.processors.cache.PartitionUpdateCounter; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIterator; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIteratorException; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState; |
| import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; |
| import org.apache.ignite.internal.processors.cache.mvcc.MvccVersion; |
| import org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener; |
| import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; |
| import org.apache.ignite.internal.processors.cache.persistence.freelist.AbstractFreeList; |
| import org.apache.ignite.internal.processors.cache.persistence.freelist.CacheFreeList; |
| import org.apache.ignite.internal.processors.cache.persistence.freelist.SimpleDataRow; |
| import org.apache.ignite.internal.processors.cache.persistence.migration.UpgradePendingTreeToPerPartitionTask; |
| import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMemoryEx; |
| import org.apache.ignite.internal.processors.cache.persistence.pagemem.PageMetrics; |
| import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId; |
| import org.apache.ignite.internal.processors.cache.persistence.partstate.PagesAllocationRange; |
| import org.apache.ignite.internal.processors.cache.persistence.partstate.PartitionAllocationMap; |
| import org.apache.ignite.internal.processors.cache.persistence.partstorage.PartitionMetaStorage; |
| import org.apache.ignite.internal.processors.cache.persistence.partstorage.PartitionMetaStorageImpl; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageMetaIO; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageMetaIOV2; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionCountersIO; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIO; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.io.PagePartitionMetaIOV3; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseListImpl; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler; |
| import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer; |
| import org.apache.ignite.internal.processors.cache.tree.CacheDataRowStore; |
| import org.apache.ignite.internal.processors.cache.tree.CacheDataTree; |
| import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree; |
| import org.apache.ignite.internal.processors.cache.tree.PendingRow; |
| import org.apache.ignite.internal.processors.cache.tree.mvcc.data.MvccUpdateResult; |
| import org.apache.ignite.internal.processors.cache.tree.mvcc.search.MvccLinkAwareSearchRow; |
| import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; |
| import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner; |
| import org.apache.ignite.internal.util.GridLongList; |
| import org.apache.ignite.internal.util.GridSpinBusyLock; |
| import org.apache.ignite.internal.util.lang.GridCursor; |
| import org.apache.ignite.internal.util.lang.IgniteInClosure2X; |
| import org.apache.ignite.internal.util.lang.IgnitePredicateX; |
| import org.apache.ignite.internal.util.tostring.GridToStringInclude; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.X; |
| import org.apache.ignite.internal.util.typedef.internal.CU; |
| import org.apache.ignite.internal.util.typedef.internal.S; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteBiTuple; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static org.apache.ignite.internal.processors.cache.GridCacheTtlManager.DFLT_UNWIND_THROTTLING_TIMEOUT; |
| import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.EVICTED; |
| import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING; |
| import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING; |
| import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.RENTING; |
| import static org.apache.ignite.internal.processors.cache.persistence.tree.util.PageHandler.isWalDeltaRecordNeeded; |
| |
/**
 * Offheap manager implementation that is used when native persistence is enabled
 * for the cache group's data region.
 */
| public class GridCacheOffheapManager extends IgniteCacheOffheapManagerImpl implements CheckpointListener { |
    /** @see #WAL_MARGIN_FOR_ATOMIC_CACHE_HISTORICAL_REBALANCE */
    public static final int DFLT_WAL_MARGIN_FOR_ATOMIC_CACHE_HISTORICAL_REBALANCE = 5;

    /** Name of the system property that overrides {@link #walAtomicCacheMargin}. */
    @SystemProperty(value = "The WAL iterator margin that is used to prevent partitions divergence on the historical " +
        "rebalance of atomic caches", type = Long.class,
        defaults = "" + DFLT_WAL_MARGIN_FOR_ATOMIC_CACHE_HISTORICAL_REBALANCE)
    public static final String WAL_MARGIN_FOR_ATOMIC_CACHE_HISTORICAL_REBALANCE =
        "WAL_MARGIN_FOR_ATOMIC_CACHE_HISTORICAL_REBALANCE";

    /**
     * Margin for WAL iterator, that used for historical rebalance on atomic cache.
     * It is intended for prevent partition divergence due to reordering in WAL.
     * <p>
     * Default is {@code 5}. Iterator starts from 5 updates earlier than expected.
     *
     */
    private final long walAtomicCacheMargin = IgniteSystemProperties.getLong(
        WAL_MARGIN_FOR_ATOMIC_CACHE_HISTORICAL_REBALANCE, DFLT_WAL_MARGIN_FOR_ATOMIC_CACHE_HISTORICAL_REBALANCE);

    /**
     * Throttling timeout in millis which avoid excessive PendingTree access on unwind
     * if there is nothing to clean yet.
     */
    private final long unwindThrottlingTimeout = Long.getLong(
        IgniteSystemProperties.IGNITE_UNWIND_THROTTLING_TIMEOUT, DFLT_UNWIND_THROTTLING_TIMEOUT);

    /** Storage for index tree roots of this cache group; created in {@code initDataStructures()}. */
    private IndexStorage indexStorage;

    /** Reuse list for index pages of this cache group; created in {@code initDataStructures()}. */
    private ReuseListImpl reuseList;

    /** Page list cache limit for data region of this cache group. */
    private AtomicLong pageListCacheLimit;

    /** Flag indicates that all group partitions have restored their state from page memory / disk. */
    private volatile boolean partitionStatesRestored;

    /** Data storage metrics updated after each checkpoint; obtained from the database manager. */
    private DataStorageMetricsImpl persStoreMetrics;
| |
    /** {@inheritDoc} */
    @Override protected void initPendingTree(GridCacheContext cctx) throws IgniteCheckedException {
        // No-op. Per-partition PendingTree should be used.
        // The base class creates one shared pending tree; with persistence each
        // partition owns its own tree, so the shared one must not be initialized.
    }
| |
| /** {@inheritDoc} */ |
| @Override protected void initDataStructures() throws IgniteCheckedException { |
| assert ctx.database().checkpointLockIsHeldByThread(); |
| |
| Metas metas = getOrAllocateCacheMetas(); |
| |
| String reuseListName = grp.cacheOrGroupName() + "##ReuseList"; |
| String indexStorageTreeName = grp.cacheOrGroupName() + "##IndexStorageTree"; |
| |
| RootPage reuseListRoot = metas.reuseListRoot; |
| |
| GridCacheDatabaseSharedManager databaseSharedManager = (GridCacheDatabaseSharedManager)ctx.database(); |
| |
| pageListCacheLimit = databaseSharedManager.pageListCacheLimitHolder(grp.dataRegion()); |
| |
| reuseList = new ReuseListImpl( |
| grp.groupId(), |
| reuseListName, |
| grp.dataRegion().pageMemory(), |
| ctx.wal(), |
| reuseListRoot.pageId().pageId(), |
| reuseListRoot.isAllocated(), |
| ctx.diagnostic().pageLockTracker(), |
| ctx.kernalContext(), |
| pageListCacheLimit, |
| PageIdAllocator.FLAG_IDX |
| ); |
| |
| RootPage metastoreRoot = metas.treeRoot; |
| |
| indexStorage = new IndexStorageImpl( |
| indexStorageTreeName, |
| grp.dataRegion().pageMemory(), |
| ctx.wal(), |
| globalRemoveId(), |
| grp.groupId(), |
| grp.sharedGroup(), |
| PageIdAllocator.INDEX_PARTITION, |
| PageIdAllocator.FLAG_IDX, |
| reuseList, |
| metastoreRoot.pageId().pageId(), |
| metastoreRoot.isAllocated(), |
| ctx.kernalContext().failure(), |
| ctx.diagnostic().pageLockTracker() |
| ); |
| |
| persStoreMetrics = databaseSharedManager.persistentStoreMetricsImpl(); |
| |
| databaseSharedManager.addCheckpointListener(this, grp.dataRegion()); |
| } |
| |
    /**
     * Get internal IndexStorage.
     * See {@link UpgradePendingTreeToPerPartitionTask} for details.
     *
     * @return Index storage of this cache group (initialized in {@code initDataStructures()}).
     */
    public IndexStorage getIndexStorage() {
        return indexStorage;
    }
| |
| /** {@inheritDoc} */ |
| @Override protected CacheDataStore createCacheDataStore0(int p) throws IgniteCheckedException { |
| if (ctx.database() instanceof GridCacheDatabaseSharedManager) { |
| boolean canceled = |
| ((GridCacheDatabaseSharedManager)ctx.database()).cancelOrWaitPartitionDestroy(grp.groupId(), p); |
| |
| if (canceled && grp.config().isEncryptionEnabled()) |
| ctx.kernalContext().encryption().onCancelDestroyPartitionStore(grp, p); |
| } |
| |
| boolean exists = ctx.pageStore() != null && ctx.pageStore().exists(grp.groupId(), p); |
| |
| return createGridCacheDataStore(grp, p, exists, log); |
| } |
| |
    /** {@inheritDoc} */
    @Override public void onCheckpointBegin(Context ctx) throws IgniteCheckedException {
        /* No-op. */
        // Metadata sync happens in onMarkCheckpointBegin/beforeCheckpointBegin instead.
    }
| |
    /** {@inheritDoc} */
    @Override public void onMarkCheckpointBegin(Context ctx) throws IgniteCheckedException {
        assert grp.dataRegion().pageMemory() instanceof PageMemoryEx;

        // Flush all data-structure metadata while the checkpoint write lock is held.
        syncMetadata(ctx);
    }
| |
    /** {@inheritDoc} */
    @Override public void beforeCheckpointBegin(Context ctx) throws IgniteCheckedException {
        // No store may already be destroyed at this point.
        assert F.size(cacheDataStores().iterator(), CacheDataStore::destroyed) == 0;

        // Optimization: reducing the holding time of checkpoint write lock.
        // Pre-sync metadata outside of the write-lock phase (no snapshot handling here).
        syncMetadata(ctx, ctx.executor(), false);
    }
| |
    /** {@inheritDoc} */
    @Override public void afterCheckpointEnd(Context ctx) throws IgniteCheckedException {
        // Refresh storage size metrics (full and sparse) from all page stores of the group.
        persStoreMetrics.onStorageSizeChanged(
            forAllPageStores(PageStore::size),
            forAllPageStores(PageStore::getSparseSize)
        );
    }
| |
    /**
     * Accumulates a long value over all page stores of this cache group.
     *
     * @param f Consumer.
     * @return Accumulated result for all page stores.
     */
    private long forAllPageStores(ToLongFunction<PageStore> f) {
        return forGroupPageStores(grp, f);
    }
| |
| /** |
| * @param gctx Group context. |
| * @param f Consumer. |
| * @return Accumulated result for all page stores. |
| */ |
| private long forGroupPageStores(CacheGroupContext gctx, ToLongFunction<PageStore> f) { |
| int groupId = gctx.groupId(); |
| |
| long res = 0; |
| |
| try { |
| Collection<PageStore> stores = ((FilePageStoreManager)ctx.cache().context().pageStore()).getStores(groupId); |
| |
| if (stores != null) { |
| for (PageStore store : stores) |
| res += f.applyAsLong(store); |
| } |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| |
| return res; |
| } |
| |
| /** |
| * Syncs and saves meta-information of all data structures to page memory. |
| * |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void syncMetadata(Context ctx) throws IgniteCheckedException { |
| Executor execSvc = ctx.executor(); |
| |
| boolean needSnapshot = ctx.nextSnapshot() && ctx.needToSnapshot(grp.cacheOrGroupName()); |
| |
| if (needSnapshot) { |
| if (execSvc == null) |
| addPartitions(ctx); |
| else { |
| execSvc.execute(() -> { |
| try { |
| addPartitions(ctx); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| }); |
| } |
| } |
| |
| syncMetadata(ctx, ctx.executor(), needSnapshot); |
| } |
| |
| /** |
| * Syncs and saves meta-information of all data structures to page memory. |
| * |
| * @param execSvc Executor service to run save process |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void syncMetadata(Context ctx, Executor execSvc, boolean needSnapshot) throws IgniteCheckedException { |
| if (execSvc == null) { |
| reuseList.saveMetadata(grp.statisticsHolderData()); |
| |
| for (CacheDataStore store : cacheDataStores()) |
| saveStoreMetadata(store, ctx, false, needSnapshot); |
| } |
| else { |
| execSvc.execute(() -> { |
| try { |
| reuseList.saveMetadata(grp.statisticsHolderData()); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| }); |
| |
| for (CacheDataStore store : cacheDataStores()) |
| execSvc.execute(() -> { |
| try { |
| saveStoreMetadata(store, ctx, false, needSnapshot); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| }); |
| } |
| |
| if (grp.config().isEncryptionEnabled()) |
| saveIndexReencryptionStatus(grp.groupId()); |
| } |
| |
    /**
     * Saves metadata of a single partition store to its partition meta page.
     * <p>
     * Persists the update counter, partition size, global remove id, update-counter
     * gaps link, re-encryption progress (if encryption is enabled), partition state
     * and, for shared groups, per-cache sizes. When anything changed, a WAL delta
     * record is logged so the meta page can be replayed on recovery. When a snapshot
     * is in progress, partition page counts are registered in the snapshot map.
     *
     * @param store Store to save metadata.
     * @param ctx Checkpoint context.
     * @param beforeDestroy {@code True} if the partition is about to be destroyed;
     *      its persisted state is then forced to {@code EVICTED}.
     * @param needSnapshot {@code True} if a snapshot of this group is being taken.
     * @throws IgniteCheckedException If failed.
     */
    private void saveStoreMetadata(
        CacheDataStore store,
        Context ctx,
        boolean beforeDestroy,
        boolean needSnapshot
    ) throws IgniteCheckedException {
        RowStore rowStore0 = store.rowStore();

        // Skip stores that are not initialized yet or whose partition states were not restored.
        if (rowStore0 != null && (partitionStatesRestored || grp.isLocal())) {
            ((CacheFreeList)rowStore0.freeList()).saveMetadata(grp.statisticsHolderData());

            PartitionMetaStorage<SimpleDataRow> partStore = store.partStorage();

            long updCntr = store.updateCounter();
            long size = store.fullSize();
            long rmvId = globalRemoveId().get();

            // Serialized update-counter gaps; null when the counter is gap-free.
            byte[] updCntrsBytes = store.partUpdateCounter().getBytes();

            PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();
            IgniteWriteAheadLogManager wal = this.ctx.wal();
            GridEncryptionManager encMgr = this.ctx.kernalContext().encryption();

            // Write meta only for non-empty partitions, non-sequential counters,
            // or partitions with pending re-encryption.
            if (size > 0 || updCntr > 0 || !store.partUpdateCounter().sequential() ||
                (grp.config().isEncryptionEnabled() && encMgr.getEncryptionState(grp.groupId(), store.partId()) > 0)) {
                GridDhtPartitionState state = null;

                // localPartition will not acquire writeLock here because create=false.
                GridDhtLocalPartition part = null;

                if (!grp.isLocal()) {
                    if (beforeDestroy)
                        state = GridDhtPartitionState.EVICTED;
                    else {
                        part = getPartition(store);

                        if (part != null && part.state() != GridDhtPartitionState.EVICTED)
                            state = part.state();
                    }

                    // Do not save meta for evicted partitions on next checkpoints.
                    if (state == null)
                        return;
                }

                int grpId = grp.groupId();
                long partMetaId = pageMem.partitionMetaPageId(grpId, store.partId());

                long partMetaPage = pageMem.acquirePage(grpId, partMetaId);
                try {
                    long partMetaPageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage);

                    // Zero address means the write lock could not be taken; warn and skip.
                    if (partMetaPageAddr == 0L) {
                        U.warn(log, "Failed to acquire write lock for meta page [metaPage=" + partMetaPage +
                            ", beforeDestroy=" + beforeDestroy + ", size=" + size +
                            ", updCntr=" + updCntr + ", state=" + state + ']');

                        return;
                    }

                    boolean changed = false;

                    try {
                        PagePartitionMetaIOV3 io = PageIO.getPageIO(partMetaPageAddr);

                        // Link to the row in the partition metastore holding counter gaps.
                        long link = io.getGapsLink(partMetaPageAddr);

                        if (updCntrsBytes == null && link != 0) {
                            // Gaps disappeared: drop the stale row and clear the link.
                            partStore.removeDataRowByLink(link, grp.statisticsHolderData());

                            io.setGapsLink(partMetaPageAddr, (link = 0));

                            changed = true;
                        }
                        else if (updCntrsBytes != null && link == 0) {
                            // Gaps appeared for the first time: store them and remember the link.
                            SimpleDataRow row = new SimpleDataRow(store.partId(), updCntrsBytes);

                            partStore.insertDataRow(row, grp.statisticsHolderData());

                            io.setGapsLink(partMetaPageAddr, (link = row.link()));

                            changed = true;
                        }
                        else if (updCntrsBytes != null && link != 0) {
                            // Gaps exist and were stored before: rewrite only if they changed.
                            byte[] prev = partStore.readRow(link);

                            assert prev != null : "Read null gaps using link=" + link;

                            if (!Arrays.equals(prev, updCntrsBytes)) {
                                partStore.removeDataRowByLink(link, grp.statisticsHolderData());

                                SimpleDataRow row = new SimpleDataRow(store.partId(), updCntrsBytes);

                                partStore.insertDataRow(row, grp.statisticsHolderData());

                                io.setGapsLink(partMetaPageAddr, (link = row.link()));

                                changed = true;
                            }
                        }

                        if (changed)
                            partStore.saveMetadata(grp.statisticsHolderData());

                        changed |= io.setUpdateCounter(partMetaPageAddr, updCntr);
                        changed |= io.setGlobalRemoveId(partMetaPageAddr, rmvId);
                        changed |= io.setSize(partMetaPageAddr, size);

                        int encryptIdx = 0;
                        int encryptCnt = 0;

                        if (grp.config().isEncryptionEnabled()) {
                            long reencryptState = encMgr.getEncryptionState(grpId, store.partId());

                            if (reencryptState != 0) {
                                encryptIdx = ReencryptStateUtils.pageIndex(reencryptState);
                                encryptCnt = ReencryptStateUtils.pageCount(reencryptState);

                                // Re-encryption finished for this partition: reset progress to zero.
                                if (encryptIdx == encryptCnt) {
                                    encMgr.setEncryptionState(grp, store.partId(), 0, 0);

                                    encryptIdx = encryptCnt = 0;
                                }

                                changed |= io.setEncryptedPageIndex(partMetaPageAddr, encryptIdx);
                                changed |= io.setEncryptedPageCount(partMetaPageAddr, encryptCnt);
                            }
                        }

                        if (state != null)
                            changed |= io.setPartitionState(partMetaPageAddr, (byte)state.ordinal());
                        else
                            assert grp.isLocal() : grp.cacheOrGroupName();

                        long cntrsPageId;

                        if (grp.sharedGroup()) {
                            long initCntrPageId = io.getCountersPageId(partMetaPageAddr);

                            Map<Integer, Long> newSizes = store.cacheSizes();
                            Map<Integer, Long> prevSizes = readSharedGroupCacheSizes(pageMem, grpId, initCntrPageId);

                            if (prevSizes != null && prevSizes.equals(newSizes))
                                cntrsPageId = initCntrPageId; // Preventing modification of sizes pages for store
                            else {
                                cntrsPageId = writeSharedGroupCacheSizes(pageMem, grpId, initCntrPageId,
                                    store.partId(), newSizes);

                                // First counters page was just allocated: remember it in the meta page.
                                if (initCntrPageId == 0 && cntrsPageId != 0) {
                                    io.setCountersPageId(partMetaPageAddr, cntrsPageId);

                                    changed = true;
                                }
                            }
                        }
                        else
                            cntrsPageId = 0L;

                        int pageCnt;

                        if (needSnapshot) {
                            pageCnt = this.ctx.pageStore().pages(grpId, store.partId());

                            io.setCandidatePageCount(partMetaPageAddr, size == 0 ? 0 : pageCnt);

                            if (state == OWNING) {
                                assert part != null;

                                if (!addPartition(
                                    part,
                                    ctx.partitionStatMap(),
                                    partMetaPageAddr,
                                    io,
                                    grpId,
                                    store.partId(),
                                    this.ctx.pageStore().pages(grpId, store.partId()),
                                    store.fullSize()
                                ))
                                    U.warn(log, "Partition was concurrently evicted grpId=" + grpId +
                                        ", partitionId=" + part.id());
                            }
                            else if (state == MOVING || state == RENTING) {
                                if (ctx.partitionStatMap().forceSkipIndexPartition(grpId)) {
                                    if (log.isInfoEnabled())
                                        log.info("Will not include SQL indexes to snapshot because there is " +
                                            "a partition not in " + OWNING + " state [grp=" + grp.cacheOrGroupName() +
                                            ", partId=" + store.partId() + ", state=" + state + ']');
                                }
                            }

                            changed = true;
                        }
                        else
                            pageCnt = io.getCandidatePageCount(partMetaPageAddr);

                        // Log a delta record so recovery can replay the meta page update.
                        if (changed && isWalDeltaRecordNeeded(pageMem, grpId, partMetaId, partMetaPage, wal, null))
                            wal.log(new MetaPageUpdatePartitionDataRecordV3(
                                grpId,
                                partMetaId,
                                updCntr,
                                rmvId,
                                (int)size, // TODO: Partition size may be long
                                cntrsPageId,
                                state == null ? -1 : (byte)state.ordinal(),
                                pageCnt,
                                link,
                                encryptIdx,
                                encryptCnt
                            ));
                    }
                    finally {
                        pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, changed);
                    }
                }
                finally {
                    pageMem.releasePage(grpId, partMetaId, partMetaPage);
                }
            }
            else if (needSnapshot)
                tryAddEmptyPartitionToSnapshot(store, ctx);
        }
        else if (needSnapshot)
            tryAddEmptyPartitionToSnapshot(store, ctx);
    }
| |
    /** {@inheritDoc} */
    @Override public Map<Integer, Long> restorePartitionStates(
        Map<GroupPartitionId, Integer> partRecoveryStates
    ) throws IgniteCheckedException {
        // Nothing to restore for local/non-affinity groups, in-memory regions,
        // or when states were already restored earlier.
        if (grp.isLocal() || !grp.affinityNode() || !grp.dataRegion().config().isPersistenceEnabled()
            || partitionStatesRestored)
            return Collections.emptyMap();

        // Per-partition restore duration in milliseconds, keyed by partition id.
        Map<Integer, Long> processed = new HashMap<>();

        PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory();

        for (int p = 0; p < grp.affinity().partitions(); p++) {
            // State recovered from WAL for this partition, if any.
            Integer recoverState = partRecoveryStates.get(new GroupPartitionId(grp.groupId(), p));

            long startTime = U.currentTimeMillis();

            if (log.isDebugEnabled())
                log.debug("Started restoring partition state [grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');

            if (ctx.pageStore().exists(grp.groupId(), p)) {
                ctx.pageStore().ensure(grp.groupId(), p);

                // A store with at most one page has no meta to restore from.
                if (ctx.pageStore().pages(grp.groupId(), p) <= 1) {
                    if (log.isDebugEnabled()) {
                        log.debug("Skipping partition on recovery (pages less than 1) " +
                            "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
                    }

                    continue;
                }

                if (log.isDebugEnabled()) {
                    log.debug("Creating partition on recovery (exists in page store) " +
                        "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
                }

                GridDhtLocalPartition part = grp.topology().forceCreatePartition(p);

                // Triggers initialization of existing(having datafile) partition before acquiring cp read lock.
                part.dataStore().init();

                ctx.database().checkpointReadLock();

                try {
                    long partMetaId = pageMem.partitionMetaPageId(grp.groupId(), p);
                    long partMetaPage = pageMem.acquirePage(grp.groupId(), partMetaId);

                    try {
                        long pageAddr = pageMem.writeLock(grp.groupId(), partMetaId, partMetaPage);

                        boolean changed = false;

                        try {
                            PagePartitionMetaIO io = PagePartitionMetaIO.VERSIONS.forPage(pageAddr);

                            if (recoverState != null) {
                                // WAL state wins over the state persisted in the meta page.
                                changed = io.setPartitionState(pageAddr, (byte)recoverState.intValue());

                                updateState(part, recoverState);

                                if (log.isDebugEnabled()) {
                                    log.debug("Restored partition state (from WAL) " +
                                        "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
                                        ", updCntr=" + part.initialUpdateCounter() +
                                        ", size=" + part.fullSize() + ']');
                                }
                            }
                            else {
                                // No WAL record: fall back to the state stored in the meta page.
                                int stateId = io.getPartitionState(pageAddr);

                                updateState(part, stateId);

                                if (log.isDebugEnabled()) {
                                    log.debug("Restored partition state (from page memory) " +
                                        "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
                                        ", updCntr=" + part.initialUpdateCounter() + ", stateId=" + stateId +
                                        ", size=" + part.fullSize() + ']');
                                }
                            }
                        }
                        finally {
                            pageMem.writeUnlock(grp.groupId(), partMetaId, partMetaPage, null, changed);
                        }
                    }
                    finally {
                        pageMem.releasePage(grp.groupId(), partMetaId, partMetaPage);
                    }
                }
                finally {
                    ctx.database().checkpointReadUnlock();
                }

                processed.put(p, U.currentTimeMillis() - startTime);
            }
            else if (recoverState != null) { // Pre-create partition if having valid state.
                GridDhtLocalPartition part = grp.topology().forceCreatePartition(p);

                updateState(part, recoverState);

                processed.put(p, U.currentTimeMillis() - startTime);

                if (log.isDebugEnabled()) {
                    log.debug("Restored partition state (from WAL) " +
                        "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", state=" + part.state() +
                        ", updCntr=" + part.initialUpdateCounter() +
                        ", size=" + part.fullSize() + ']');
                }
            }
            else {
                if (log.isDebugEnabled()) {
                    log.debug("Skipping partition on recovery (no page store OR wal state) " +
                        "[grp=" + grp.cacheOrGroupName() + ", p=" + p + ']');
                }
            }

            if (log.isDebugEnabled()) {
                log.debug("Finished restoring partition state " +
                    "[grp=" + grp.cacheOrGroupName() + ", p=" + p +
                    ", time=" + U.humanReadableDuration(U.currentTimeMillis() - startTime) + ']');
            }
        }

        partitionStatesRestored = true;

        return processed;
    }
| |
| /** |
| * @param part Partition to restore state for. |
| * @param stateId State enum ordinal. |
| */ |
| private void updateState(GridDhtLocalPartition part, int stateId) { |
| if (stateId != -1) { |
| GridDhtPartitionState state = GridDhtPartitionState.fromOrdinal(stateId); |
| |
| assert state != null; |
| |
| part.restoreState(state == EVICTED ? RENTING : state); |
| } |
| } |
| |
| /** |
| * Check that we need to snapshot this partition and add it to map. |
| * |
| * @param store Store. |
| * @param ctx Snapshot context. |
| */ |
| private void tryAddEmptyPartitionToSnapshot(CacheDataStore store, Context ctx) { |
| GridDhtLocalPartition locPart = getPartition(store); |
| |
| if (locPart != null && locPart.state() == OWNING) { |
| ctx.partitionStatMap().put( |
| new GroupPartitionId(grp.groupId(), store.partId()), |
| new PagesAllocationRange(0, 0)); |
| } |
| } |
| |
    /**
     * @param store Store.
     *
     * @return corresponding to store local partition, or {@code null} if it does not exist
     *      (lookup only: create=false, so no partition is created here).
     */
    private GridDhtLocalPartition getPartition(CacheDataStore store) {
        return grp.topology().localPartition(store.partId(),
            AffinityTopologyVersion.NONE, false, true);
    }
| |
    /**
     * Loads cache sizes for all caches in shared group.
     * <p>
     * Counters pages form a linked list; this method walks the list under per-page
     * read locks until a page reports that all sizes have been read.
     *
     * @param pageMem page memory to perform operations on pages.
     * @param grpId Cache group ID.
     * @param cntrsPageId Counters page ID, if zero is provided that means no counters page exist.
     * @return Cache sizes if store belongs to group containing multiple caches and sizes are available in memory. May
     * return null if counter page does not exist.
     * @throws IgniteCheckedException If page memory operation failed.
     */
    @Nullable public static Map<Integer, Long> readSharedGroupCacheSizes(PageSupport pageMem, int grpId,
        long cntrsPageId) throws IgniteCheckedException {

        if (cntrsPageId == 0L)
            return null;

        Map<Integer, Long> cacheSizes = new HashMap<>();

        long nextId = cntrsPageId;

        while (true) {
            long curId = nextId;
            long curPage = pageMem.acquirePage(grpId, curId);

            try {
                long curAddr = pageMem.readLock(grpId, curId, curPage);

                assert curAddr != 0;

                try {
                    PagePartitionCountersIO cntrsIO = PageIO.getPageIO(curAddr);

                    // True return means the last chunk was read; the map is complete.
                    if (cntrsIO.readCacheSizes(curAddr, cacheSizes))
                        break;

                    // More data remains: follow the link to the next counters page.
                    nextId = cntrsIO.getNextCountersPageId(curAddr);

                    assert nextId != 0;
                }
                finally {
                    pageMem.readUnlock(grpId, curId, curPage);
                }
            }
            finally {
                pageMem.releasePage(grpId, curId, curPage);
            }
        }
        return cacheSizes;
    }
| |
    /**
     * Saves cache sizes for all caches in shared group. Unconditionally marks pages as dirty.
     * <p>
     * Sizes are serialized into fixed-size items and written chunk by chunk into a
     * linked list of counters pages, allocating new pages as the list runs out.
     *
     * @param pageMem page memory to perform operations on pages.
     * @param grpId Cache group ID.
     * @param cntrsPageId Counters page ID, if zero is provided that means no counters page exist.
     * @param partId Partition ID.
     * @param sizes Cache sizes of all caches in group. Not null.
     * @return new counter page Id. Same as {@code cntrsPageId} or new value if cache size pages were initialized.
     * @throws IgniteCheckedException if page memory operation failed.
     */
    public static long writeSharedGroupCacheSizes(PageMemory pageMem, int grpId,
        long cntrsPageId, int partId, Map<Integer, Long> sizes) throws IgniteCheckedException {
        byte[] data = PagePartitionCountersIO.VERSIONS.latest().serializeCacheSizes(sizes);

        int items = data.length / PagePartitionCountersIO.ITEM_SIZE;
        boolean init = cntrsPageId == 0;

        // No counters page exists yet: allocate the head of the list (unless there is nothing to write).
        if (init && !sizes.isEmpty())
            cntrsPageId = pageMem.allocatePage(grpId, partId, PageIdAllocator.FLAG_AUX);

        long nextId = cntrsPageId;
        int written = 0;

        PageMetrics metrics = pageMem.metrics().cacheGrpPageMetrics(grpId);

        while (written != items) {
            long curId = nextId;
            long curPage = pageMem.acquirePage(grpId, curId);

            try {
                long curAddr = pageMem.writeLock(grpId, curId, curPage);

                assert curAddr != 0;

                try {
                    PagePartitionCountersIO partCntrIo;

                    // Freshly allocated pages must be initialized before use.
                    if (init) {
                        partCntrIo = PagePartitionCountersIO.VERSIONS.latest();

                        partCntrIo.initNewPage(curAddr, curId, pageMem.realPageSize(grpId), metrics);
                    }
                    else
                        partCntrIo = PageIO.getPageIO(curAddr);

                    written += partCntrIo.writeCacheSizes(pageMem.realPageSize(grpId), curAddr, data, written);

                    nextId = partCntrIo.getNextCountersPageId(curAddr);

                    // NOTE: 'init' is reassigned here — it becomes true when the list ran out
                    // and the next page is newly allocated (and so needs initNewPage above).
                    if (written != items && (init = nextId == 0)) {
                        //allocate new counters page
                        nextId = pageMem.allocatePage(grpId, partId, PageIdAllocator.FLAG_AUX);
                        partCntrIo.setNextCountersPageId(curAddr, nextId);
                    }
                }
                finally {
                    // Write full page
                    pageMem.writeUnlock(grpId, curId, curPage, Boolean.TRUE, true);
                }
            }
            finally {
                pageMem.releasePage(grpId, curId, curPage);
            }
        }

        return cntrsPageId;
    }
| |
| /** |
| * @param ctx Context. |
| */ |
| private void addPartitions(Context ctx) throws IgniteCheckedException { |
| int grpId = grp.groupId(); |
| PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory(); |
| |
| long metaPageId = PageMemory.META_PAGE_ID; |
| long metaPage = pageMem.acquirePage(grpId, metaPageId); |
| |
| try { |
| long metaPageAddr = pageMem.writeLock(grpId, metaPageId, metaPage); |
| |
| if (metaPageAddr == 0L) { |
| U.warn(log, "Failed to acquire write lock for index meta page [grpId=" + grpId + |
| ", metaPageId=" + metaPageId + ']'); |
| |
| return; |
| } |
| |
| boolean changed = false; |
| |
| try { |
| PageMetaIO metaIo = PageMetaIO.getPageIO(metaPageAddr); |
| |
| int pageCnt = this.ctx.pageStore().pages(grpId, PageIdAllocator.INDEX_PARTITION); |
| |
| changed = metaIo.setCandidatePageCount(metaPageAddr, pageCnt); |
| |
| // Following method doesn't modify page data, it only reads last allocated page count from it. |
| addPartition( |
| null, |
| ctx.partitionStatMap(), |
| metaPageAddr, |
| metaIo, |
| grpId, |
| PageIdAllocator.INDEX_PARTITION, |
| pageCnt, |
| -1); |
| } |
| finally { |
| pageMem.writeUnlock(grpId, metaPageId, metaPage, null, changed); |
| } |
| } |
| finally { |
| pageMem.releasePage(grpId, metaPageId, metaPage); |
| } |
| } |
| |
| /** |
| * @param part Local partition. |
| * @param map Map to add values to. |
| * @param metaPageAddr Meta page address |
| * @param io Page Meta IO |
| * @param grpId Cache Group ID. |
| * @param currAllocatedPageCnt total number of pages allocated for partition <code>[partition, grpId]</code> |
| */ |
| private static boolean addPartition( |
| GridDhtLocalPartition part, |
| PartitionAllocationMap map, |
| long metaPageAddr, |
| PageMetaIO io, |
| int grpId, |
| int partId, |
| int currAllocatedPageCnt, |
| long partSize |
| ) { |
| if (part != null) { |
| boolean reserved = part.reserve(); |
| |
| if (!reserved) |
| return false; |
| } |
| else |
| assert partId == PageIdAllocator.INDEX_PARTITION : partId; |
| |
| assert PageIO.getPageId(metaPageAddr) != 0; |
| |
| int lastAllocatedPageCnt = io.getLastAllocatedPageCount(metaPageAddr); |
| |
| int curPageCnt = partSize == 0 ? 0 : currAllocatedPageCnt; |
| |
| map.put( |
| new GroupPartitionId(grpId, partId), |
| new PagesAllocationRange(lastAllocatedPageCnt, curPageCnt)); |
| |
| return true; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void destroyCacheDataStore0(CacheDataStore store) throws IgniteCheckedException { |
| assert ctx.database() instanceof GridCacheDatabaseSharedManager |
| : "Destroying cache data store when persistence is not enabled: " + ctx.database(); |
| |
| int partId = store.partId(); |
| |
| ctx.database().checkpointReadLock(); |
| |
| try { |
| saveStoreMetadata(store, null, true, false); |
| } |
| finally { |
| ctx.database().checkpointReadUnlock(); |
| } |
| |
| store.markDestroyed(); |
| |
| ((GridCacheDatabaseSharedManager)ctx.database()).schedulePartitionDestroy(grp.groupId(), partId); |
| } |
| |
| /** |
| * Invalidates page memory for given partition. Destroys partition store. |
| * <b>NOTE:</b> This method can be invoked only within checkpoint lock or checkpointer thread. |
| * |
| * @param partId Partition ID. |
| * |
| * @throws IgniteCheckedException If destroy has failed. |
| */ |
| public void destroyPartitionStore(int partId) throws IgniteCheckedException { |
| PageMemoryEx pageMemory = (PageMemoryEx)grp.dataRegion().pageMemory(); |
| |
| int tag = pageMemory.invalidate(grp.groupId(), partId); |
| |
| if (grp.walEnabled()) |
| ctx.wal().log(new PartitionDestroyRecord(grp.groupId(), partId)); |
| |
| ctx.pageStore().truncate(grp.groupId(), partId, tag); |
| |
| if (grp.config().isEncryptionEnabled()) |
| ctx.kernalContext().encryption().onDestroyPartitionStore(grp, partId); |
| } |
| |
    /** {@inheritDoc} */
    @Override public RootPage rootPageForIndex(int cacheId, String idxName, int segment) throws IgniteCheckedException {
        // Delegates allocation of the index root page to the index storage.
        return indexStorage.allocateCacheIndex(cacheId, idxName, segment);
    }
| |
    /** {@inheritDoc} */
    @Override public @Nullable RootPage findRootPageForIndex(int cacheId, String idxName, int segment) throws IgniteCheckedException {
        // Lookup only — does not allocate a root page if none exists.
        return indexStorage.findCacheIndex(cacheId, idxName, segment);
    }
| |
    /** {@inheritDoc} */
    @Override public @Nullable RootPage dropRootPageForIndex(
        int cacheId,
        String idxName,
        int segment
    ) throws IgniteCheckedException {
        // Delegates removal of the index root page to the index storage.
        return indexStorage.dropCacheIndex(cacheId, idxName, segment);
    }
| |
    /** {@inheritDoc} */
    @Override public @Nullable RootPage renameRootPageForIndex(
        int cacheId,
        String oldIdxName,
        String newIdxName,
        int segment
    ) throws IgniteCheckedException {
        // Delegates renaming of the index root page record to the index storage.
        return indexStorage.renameCacheIndex(cacheId, oldIdxName, newIdxName, segment);
    }
| |
    /** {@inheritDoc} */
    @Override public ReuseList reuseListForIndex(String idxName) {
        // A single shared reuse list is returned for all indexes regardless of name.
        return reuseList;
    }
| |
    /** {@inheritDoc} */
    @Override public void stop() {
        if (reuseList != null)
            reuseList.close();

        // NOTE(review): checkpoint listener is assumed to be registered only on affinity
        // nodes — confirm against the corresponding start/registration code.
        if (grp.affinityNode())
            ((GridCacheDatabaseSharedManager)ctx.database()).removeCheckpointListener(this);
    }
| |
| /** |
| * @return Meta root pages info. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private Metas getOrAllocateCacheMetas() throws IgniteCheckedException { |
| PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory(); |
| IgniteWriteAheadLogManager wal = ctx.wal(); |
| |
| int grpId = grp.groupId(); |
| long metaId = PageMemory.META_PAGE_ID; |
| long metaPage = pageMem.acquirePage(grpId, metaId); |
| |
| try { |
| long pageAddr = pageMem.writeLock(grpId, metaId, metaPage); |
| |
| boolean allocated = false; |
| boolean markDirty = false; |
| |
| try { |
| long metastoreRoot, reuseListRoot; |
| |
| PageMetaIOV2 io = (PageMetaIOV2)PageMetaIO.VERSIONS.latest(); |
| |
| if (PageIO.getType(pageAddr) != PageIO.T_META) { |
| PageMetrics metrics = pageMem.metrics().cacheGrpPageMetrics(grpId); |
| |
| io.initNewPage(pageAddr, metaId, pageMem.realPageSize(grpId), metrics); |
| |
| metastoreRoot = pageMem.allocatePage(grpId, PageIdAllocator.INDEX_PARTITION, PageMemory.FLAG_IDX); |
| reuseListRoot = pageMem.allocatePage(grpId, PageIdAllocator.INDEX_PARTITION, PageMemory.FLAG_IDX); |
| |
| io.setTreeRoot(pageAddr, metastoreRoot); |
| io.setReuseListRoot(pageAddr, reuseListRoot); |
| |
| if (isWalDeltaRecordNeeded(pageMem, grpId, metaId, metaPage, wal, null)) { |
| assert io.getType() == PageIO.T_META; |
| |
| wal.log(new MetaPageInitRecord( |
| grpId, |
| metaId, |
| io.getType(), |
| io.getVersion(), |
| metastoreRoot, |
| reuseListRoot |
| )); |
| } |
| |
| allocated = true; |
| } |
| else { |
| if (io != PageIO.getPageIO(pageAddr)) { |
| if (log.isDebugEnabled()) { |
| log.debug("Upgrade index partition meta page version: [grpId=" + grpId + |
| ", oldVer=" + PagePartitionMetaIO.getVersion(pageAddr) + |
| ", newVer=" + io.getVersion() + ']'); |
| } |
| |
| io.upgradePage(pageAddr); |
| |
| markDirty = true; |
| } |
| |
| metastoreRoot = io.getTreeRoot(pageAddr); |
| reuseListRoot = io.getReuseListRoot(pageAddr); |
| |
| int encrPageCnt = io.getEncryptedPageCount(pageAddr); |
| |
| if (encrPageCnt > 0) { |
| ctx.kernalContext().encryption().setEncryptionState(grp, PageIdAllocator.INDEX_PARTITION, |
| io.getEncryptedPageIndex(pageAddr), encrPageCnt); |
| |
| markDirty = true; |
| } |
| |
| assert reuseListRoot != 0L; |
| |
| if (markDirty && isWalDeltaRecordNeeded(pageMem, grpId, metaId, metaPage, wal, null)) { |
| wal.log(new PageSnapshot(new FullPageId(PageIdAllocator.INDEX_PARTITION, grpId), pageAddr, |
| pageMem.pageSize(), pageMem.realPageSize(grpId))); |
| } |
| } |
| |
| return new Metas( |
| new RootPage(new FullPageId(metastoreRoot, grpId), allocated), |
| new RootPage(new FullPageId(reuseListRoot, grpId), allocated), |
| null, |
| null); |
| } |
| finally { |
| pageMem.writeUnlock(grpId, metaId, metaPage, null, allocated || markDirty); |
| } |
| } |
| finally { |
| pageMem.releasePage(grpId, metaId, metaPage); |
| } |
| } |
| |
    /** {@inheritDoc} */
    @Override @Nullable protected IgniteHistoricalIterator historicalIterator(
        CachePartitionPartialCountersMap partCntrs,
        Set<Integer> missing
    ) throws IgniteCheckedException {
        if (partCntrs == null || partCntrs.isEmpty())
            return null;

        if (grp.mvccEnabled()) // TODO IGNITE-7384
            return super.historicalIterator(partCntrs, missing);

        GridCacheDatabaseSharedManager database = (GridCacheDatabaseSharedManager)grp.shared().database();

        // Collect the initial update counter per partition to find the earliest WAL pointer needed.
        Map<Integer, Long> partsCounters = new HashMap<>();

        for (int i = 0; i < partCntrs.size(); i++) {
            int p = partCntrs.partitionAt(i);
            long initCntr = partCntrs.initialUpdateCounterAt(i);

            partsCounters.put(p, initCntr);
        }

        try {
            WALPointer minPtr = database.checkpointHistory().searchEarliestWalPointer(grp.groupId(),
                partsCounters, grp.hasAtomicCaches() ? walAtomicCacheMargin : 0L);

            WALPointer latestReservedPointer = database.latestWalPointerReservedForPreloading();

            assert latestReservedPointer == null || latestReservedPointer.compareTo(minPtr) <= 0
                : "Historical iterator tries to iterate WAL out of reservation [cache=" + grp.cacheOrGroupName()
                + ", reservedPointer=" + latestReservedPointer
                + ", historicalPointer=" + minPtr + ']';

            if (latestReservedPointer == null)
                log.warning("History for the preloading has not reserved yet.");

            WALIterator it = grp.shared().wal().replay(minPtr);

            WALHistoricalIterator histIt = new WALHistoricalIterator(log, grp, partCntrs, partsCounters, it);

            // Add historical partitions which are unable to reserve to missing set.
            missing.addAll(histIt.missingParts);

            return histIt;
        }
        catch (Exception ex) {
            // Wrap unless an IgniteHistoricalIteratorException is already present in the cause chain.
            if (!X.hasCause(ex, IgniteHistoricalIteratorException.class))
                throw new IgniteHistoricalIteratorException(ex);

            throw ex;
        }
    }
| |
| /** {@inheritDoc} */ |
| @Override public boolean expire( |
| GridCacheContext cctx, |
| IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c, |
| int amount |
| ) throws IgniteCheckedException { |
| assert !cctx.isNear() : cctx.name(); |
| |
| // Prevent manager being stopped in the middle of pds operation. |
| if (!busyLock.enterBusy()) |
| return false; |
| |
| try { |
| int cleared = 0; |
| |
| for (CacheDataStore store : cacheDataStores()) { |
| cleared += ((GridCacheDataStore)store).purgeExpired(cctx, c, unwindThrottlingTimeout, amount - cleared); |
| |
| if (amount != -1 && cleared >= amount) |
| return true; |
| } |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| |
| return false; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long expiredSize() throws IgniteCheckedException { |
| long size = 0; |
| |
| for (CacheDataStore store : cacheDataStores()) |
| size += ((GridCacheDataStore)store).expiredSize(); |
| |
| return size; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void preloadPartition(int partId) throws IgniteCheckedException { |
| if (grp.isLocal()) { |
| dataStore(null).preload(); |
| |
| return; |
| } |
| |
| GridDhtLocalPartition locPart = grp.topology().localPartition(partId, AffinityTopologyVersion.NONE, false, false); |
| |
| assert locPart != null && locPart.reservations() > 0; |
| |
| locPart.dataStore().preload(); |
| } |
| |
| /** |
| * Calculates free space of all partition data stores - number of bytes available for use in allocated pages. |
| * |
| * @return free space size in bytes. |
| */ |
| long freeSpace() { |
| long freeSpace = 0; |
| |
| for (CacheDataStore store : cacheDataStores()) { |
| assert store instanceof GridCacheDataStore; |
| |
| AbstractFreeList freeList = ((GridCacheDataStore)store).getCacheStoreFreeList(); |
| |
| if (freeList == null) |
| continue; |
| |
| freeSpace += freeList.freeSpace(); |
| } |
| |
| return freeSpace; |
| } |
| |
| /** |
| * Calculates empty data pages of all partition data stores. |
| * |
| * @return empty data pages count. |
| */ |
| long emptyDataPages() { |
| long emptyDataPages = 0; |
| |
| for (CacheDataStore store : cacheDataStores()) { |
| assert store instanceof GridCacheDataStore; |
| |
| AbstractFreeList freeList = ((GridCacheDataStore)store).getCacheStoreFreeList(); |
| |
| if (freeList == null) |
| continue; |
| |
| emptyDataPages += freeList.emptyDataPages(); |
| } |
| |
| return emptyDataPages; |
| } |
| |
| /** |
| * @param cacheId Which was stopped, but its data still presented. |
| * @throws IgniteCheckedException If failed. |
| */ |
| public void findAndCleanupLostIndexesForStoppedCache(int cacheId) throws IgniteCheckedException { |
| for (String name : indexStorage.getIndexNames()) { |
| if (indexStorage.nameIsAssosiatedWithCache(name, cacheId)) { |
| ctx.database().checkpointReadLock(); |
| |
| try { |
| RootPage page = indexStorage.allocateIndex(name); |
| |
| ctx.kernalContext().indexProcessor().destroyOrphanIndex( |
| ctx.kernalContext(), |
| page, |
| name, |
| grp.groupId(), |
| grp.dataRegion().pageMemory(), |
| globalRemoveId(), |
| reuseListForIndex(name), |
| grp.mvccEnabled() |
| ); |
| |
| indexStorage.dropIndex(name); |
| } |
| finally { |
| ctx.database().checkpointReadUnlock(); |
| } |
| } |
| } |
| } |
| |
| /** |
| * @param grpId Cache group ID. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void saveIndexReencryptionStatus(int grpId) throws IgniteCheckedException { |
| long state = ctx.kernalContext().encryption().getEncryptionState(grpId, PageIdAllocator.INDEX_PARTITION); |
| |
| if (state == 0) |
| return; |
| |
| PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory(); |
| |
| long metaPageId = PageIdAllocator.META_PAGE_ID; |
| long metaPage = pageMem.acquirePage(grpId, metaPageId); |
| |
| try { |
| boolean changed = false; |
| |
| long metaPageAddr = pageMem.writeLock(grpId, metaPageId, metaPage); |
| |
| try { |
| PageMetaIOV2 metaIo = PageMetaIO.getPageIO(metaPageAddr); |
| |
| int encryptIdx = ReencryptStateUtils.pageIndex(state); |
| int encryptCnt = ReencryptStateUtils.pageCount(state); |
| |
| if (encryptIdx == encryptCnt) { |
| ctx.kernalContext().encryption().setEncryptionState(grp, PageIdAllocator.INDEX_PARTITION, 0, 0); |
| |
| encryptIdx = encryptCnt = 0; |
| } |
| |
| changed |= metaIo.setEncryptedPageIndex(metaPageAddr, encryptIdx); |
| changed |= metaIo.setEncryptedPageCount(metaPageAddr, encryptCnt); |
| |
| IgniteWriteAheadLogManager wal = ctx.cache().context().wal(); |
| |
| if (changed && PageHandler.isWalDeltaRecordNeeded(pageMem, grpId, metaPageId, metaPage, wal, null)) |
| wal.log(new MetaPageUpdateIndexDataRecord(grpId, metaPageId, encryptIdx, encryptCnt)); |
| } |
| finally { |
| pageMem.writeUnlock(grpId, metaPageId, metaPage, null, changed); |
| } |
| } |
| finally { |
| pageMem.releasePage(grpId, metaPageId, metaPage); |
| } |
| } |
| |
| /** */ |
| public GridCacheDataStore createGridCacheDataStore( |
| CacheGroupContext grpCtx, |
| int partId, |
| boolean exists, |
| IgniteLogger log |
| ) { |
| return new GridCacheDataStore( |
| grpCtx, |
| partId, |
| exists, |
| busyLock, |
| log |
| ); |
| } |
| |
| /** |
| * |
| */ |
| private static class WALHistoricalIterator implements IgniteHistoricalIterator { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** Logger. */ |
| private final IgniteLogger log; |
| |
| /** Cache context. */ |
| private final CacheGroupContext grp; |
| |
| /** Partition counters map. */ |
| private final CachePartitionPartialCountersMap partMap; |
| |
| /** Partitions marked as missing (unable to reserve or partition is not in OWNING state). */ |
| private final Set<Integer> missingParts = new HashSet<>(); |
| |
| /** Partitions marked as done. */ |
| private final Set<Integer> doneParts = new HashSet<>(); |
| |
| /** Cache IDs. This collection is stored as field to avoid re-calculation on each iteration. */ |
| private final Set<Integer> cacheIds; |
| |
| /** WAL iterator. */ |
| private final WALIterator walIt; |
| |
| /** */ |
| private Iterator<DataEntry> entryIt; |
| |
| /** */ |
| private DataEntry next; |
| |
| /** |
| * Rebalanced counters in the range from initialUpdateCntr to updateCntr. |
| * Invariant: initUpdCntr[idx] + rebalancedCntrs[idx] = updateCntr[idx] |
| */ |
| private final long[] rebalancedCntrs; |
| |
| /** A partition what will be finished on next iteration. */ |
| private int donePart = -1; |
| |
| /** |
| * @param log Logger. |
| * @param grp Cache context. |
| * @param walIt WAL iterator. |
| */ |
| private WALHistoricalIterator( |
| IgniteLogger log, |
| CacheGroupContext grp, |
| CachePartitionPartialCountersMap partMap, |
| Map<Integer, Long> updatedPartCntr, |
| WALIterator walIt) { |
| this.log = log; |
| this.grp = grp; |
| this.partMap = partMap; |
| this.walIt = walIt; |
| |
| cacheIds = grp.cacheIds(); |
| |
| rebalancedCntrs = new long[partMap.size()]; |
| |
| for (int i = 0; i < rebalancedCntrs.length; i++) { |
| int p = partMap.partitionAt(i); |
| |
| rebalancedCntrs[i] = updatedPartCntr.get(p); |
| |
| partMap.initialUpdateCounterAt(i, rebalancedCntrs[i]); |
| } |
| |
| reservePartitions(); |
| |
| advance(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean contains(int partId) { |
| return partMap.contains(partId); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean isDone(int partId) { |
| return doneParts.contains(partId); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void close() throws IgniteCheckedException { |
| walIt.close(); |
| releasePartitions(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean isClosed() { |
| return walIt.isClosed(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean hasNextX() { |
| return hasNext(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public CacheDataRow nextX() throws IgniteCheckedException { |
| return next(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void removeX() throws IgniteCheckedException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Iterator<CacheDataRow> iterator() { |
| return this; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean hasNext() { |
| return next != null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public CacheDataRow next() { |
| if (next == null) |
| throw new NoSuchElementException(); |
| |
| CacheDataRow val = new DataEntryRow(next); |
| |
| if (donePart != -1) { |
| int pIdx = partMap.partitionIndex(donePart); |
| |
| if (log.isDebugEnabled()) { |
| log.debug("Partition done [grpId=" + grp.groupId() + |
| ", partId=" + donePart + |
| ", from=" + partMap.initialUpdateCounterAt(pIdx) + |
| ", to=" + partMap.updateCounterAt(pIdx) + ']'); |
| } |
| |
| doneParts.add(donePart); |
| |
| donePart = -1; |
| } |
| |
| advance(); |
| |
| return val; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void remove() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| /** |
| * Reserve historical partitions. |
| * If partition is unable to reserve, id of that partition is placed to {@link #missingParts} set. |
| */ |
| private void reservePartitions() { |
| for (int i = 0; i < partMap.size(); i++) { |
| int p = partMap.partitionAt(i); |
| GridDhtLocalPartition part = grp.topology().localPartition(p); |
| |
| if (part == null || !part.reserve()) { |
| missingParts.add(p); |
| continue; |
| } |
| |
| if (part.state() != OWNING) { |
| part.release(); |
| missingParts.add(p); |
| } |
| } |
| } |
| |
| /** |
| * Release historical partitions. |
| */ |
| private void releasePartitions() { |
| for (int i = 0; i < partMap.size(); i++) { |
| int p = partMap.partitionAt(i); |
| |
| if (missingParts.contains(p)) |
| continue; |
| |
| GridDhtLocalPartition part = grp.topology().localPartition(p); |
| |
| assert part != null && part.state() == OWNING && part.reservations() > 0 |
| : "Partition should in OWNING state and has at least 1 reservation"; |
| |
| part.release(); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private void advance() { |
| try { |
| next = null; |
| |
| outer: |
| while (doneParts.size() != partMap.size()) { |
| if (entryIt != null) { |
| while (entryIt.hasNext()) { |
| DataEntry entry = entryIt.next(); |
| |
| if (cacheIds.contains(entry.cacheId())) { |
| int idx = partMap.partitionIndex(entry.partitionId()); |
| |
| if (idx < 0 || missingParts.contains(idx)) |
| continue; |
| |
| long from = partMap.initialUpdateCounterAt(idx); |
| long to = partMap.updateCounterAt(idx); |
| |
| if (entry.partitionCounter() > from && entry.partitionCounter() <= to) { |
| // Partition will be marked as done for current entry on next iteration. |
| if (++rebalancedCntrs[idx] == to) |
| donePart = entry.partitionId(); |
| |
| next = entry; |
| |
| return; |
| } |
| } |
| } |
| } |
| |
| entryIt = null; |
| |
| // Search for next DataEntry while applying rollback counters. |
| while (walIt.hasNext()) { |
| IgniteBiTuple<WALPointer, WALRecord> rec = walIt.next(); |
| |
| if (rec.get2() instanceof DataRecord) { |
| DataRecord data = (DataRecord)rec.get2(); |
| |
| entryIt = data.writeEntries().iterator(); |
| |
| // Move on to the next valid data entry. |
| continue outer; |
| } |
| else if (rec.get2() instanceof RollbackRecord) { |
| RollbackRecord rbRec = (RollbackRecord)rec.get2(); |
| |
| if (grp.groupId() == rbRec.groupId()) { |
| int idx = partMap.partitionIndex(rbRec.partitionId()); |
| |
| if (idx < 0 || missingParts.contains(idx)) |
| continue; |
| |
| long from = partMap.initialUpdateCounterAt(idx); |
| long to = partMap.updateCounterAt(idx); |
| |
| rebalancedCntrs[idx] += rbRec.overlap(from, to); |
| |
| if (rebalancedCntrs[idx] == partMap.updateCounterAt(idx)) { |
| if (log.isDebugEnabled()) { |
| log.debug("Partition done [grpId=" + grp.groupId() + |
| ", partId=" + donePart + |
| ", from=" + from + |
| ", to=" + to + ']'); |
| } |
| |
| doneParts.add(rbRec.partitionId()); // Add to done set immediately. |
| } |
| } |
| } |
| } |
| |
| if (entryIt == null && doneParts.size() != partMap.size()) { |
| for (int i = 0; i < partMap.size(); i++) { |
| int p = partMap.partitionAt(i); |
| |
| if (!doneParts.contains(p)) { |
| log.warning("Some partition entries were missed during historical rebalance " + |
| "[grp=" + grp + ", part=" + p + ", missed=" + (partMap.updateCounterAt(i) - rebalancedCntrs[i]) + ']'); |
| |
| doneParts.add(p); |
| } |
| } |
| |
| return; |
| } |
| } |
| } |
| catch (Exception ex) { |
| throw new IgniteHistoricalIteratorException(ex); |
| } |
| } |
| } |
| |
| /** |
| * Data entry row. |
| */ |
| private static class DataEntryRow implements CacheDataRow { |
| /** */ |
| private final DataEntry entry; |
| |
| /** |
| * @param entry Data entry. |
| */ |
| private DataEntryRow(DataEntry entry) { |
| this.entry = entry; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public KeyCacheObject key() { |
| return entry.key(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void key(KeyCacheObject key) { |
| throw new IllegalStateException(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public CacheObject value() { |
| return entry.value(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridCacheVersion version() { |
| return entry.writeVersion(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long expireTime() { |
| return entry.expireTime(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int partition() { |
| return entry.partitionId(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int size() throws IgniteCheckedException { |
| throw new UnsupportedOperationException(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int headerSize() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long link() { |
| return 0; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void link(long link) { |
| throw new UnsupportedOperationException(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int hash() { |
| return entry.key().hashCode(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int cacheId() { |
| return entry.cacheId(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long mvccCoordinatorVersion() { |
| return 0; // TODO IGNITE-7384 |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long mvccCounter() { |
| return 0; // TODO IGNITE-7384 |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int mvccOperationCounter() { |
| return 0; // TODO IGNITE-7384 |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long newMvccCoordinatorVersion() { |
| return 0; // TODO IGNITE-7384 |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long newMvccCounter() { |
| return 0; // TODO IGNITE-7384 |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int newMvccOperationCounter() { |
| return 0; // TODO IGNITE-7384 |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public byte mvccTxState() { |
| return 0; // TODO IGNITE-7384 |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public byte newMvccTxState() { |
| return 0; // TODO IGNITE-7384 |
| } |
| } |
| |
| /** |
| * |
| */ |
| static class Metas { |
| /** */ |
| @GridToStringInclude |
| public final RootPage reuseListRoot; |
| |
| /** */ |
| @GridToStringInclude |
| public final RootPage treeRoot; |
| |
| /** */ |
| @GridToStringInclude |
| public final RootPage pendingTreeRoot; |
| |
| /** */ |
| @GridToStringInclude |
| public final RootPage partMetastoreReuseListRoot; |
| |
| /** |
| * @param treeRoot Metadata storage root. |
| * @param reuseListRoot Reuse list root. |
| */ |
| Metas(RootPage treeRoot, RootPage reuseListRoot, RootPage pendingTreeRoot, RootPage partMetastoreReuseListRoot) { |
| this.treeRoot = treeRoot; |
| this.reuseListRoot = reuseListRoot; |
| this.pendingTreeRoot = pendingTreeRoot; |
| this.partMetastoreReuseListRoot = partMetastoreReuseListRoot; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(Metas.class, this); |
| } |
| } |
| |
| /** |
| * |
| */ |
| public static class GridCacheDataStore implements CacheDataStore { |
| /** */ |
| private final int partId; |
| |
| /** */ |
| private final CacheGroupContext grp; |
| |
| /** */ |
| private volatile AbstractFreeList<CacheDataRow> freeList; |
| |
| /** */ |
| private PendingEntriesTree pendingTree; |
| |
| /** */ |
| private volatile CacheDataStoreImpl delegate; |
| |
| /** |
| * Cache id which should be throttled. |
| */ |
| private volatile int lastThrottledCacheId; |
| |
| /** |
| * Timestamp when next clean try will be allowed for the current partition |
| * in accordance with the value of {@code lastThrottledCacheId}. |
| * Used for fine-grained throttling on per-partition basis. |
| */ |
| private volatile long nextStoreCleanTimeNanos; |
| |
| /** */ |
| private volatile GridQueryRowCacheCleaner rowCacheCleaner; |
| |
| /** */ |
| private PartitionMetaStorageImpl<SimpleDataRow> partStorage; |
| |
| /** */ |
| private final boolean exists; |
| |
| /** */ |
| private final GridSpinBusyLock busyLock; |
| |
| /** */ |
| private final IgniteLogger log; |
| |
| /** */ |
| private final AtomicBoolean init = new AtomicBoolean(); |
| |
| /** */ |
| private final CountDownLatch latch = new CountDownLatch(1); |
| |
| /** */ |
| private CacheDataTree dataTree; |
| |
| /** |
| * @param partId Partition. |
| * @param exists {@code True} if store exists. |
| */ |
| public GridCacheDataStore(CacheGroupContext grp, int partId, boolean exists, |
| GridSpinBusyLock busyLock, |
| IgniteLogger log) { |
| this.grp = grp; |
| this.partId = partId; |
| this.exists = exists; |
| this.busyLock = busyLock; |
| this.log = log; |
| } |
| |
| /** */ |
| public AbstractFreeList<CacheDataRow> getCacheStoreFreeList() { |
| return freeList; |
| } |
| |
| /** |
| * @return Name of free pages list. |
| */ |
| private String freeListName() { |
| return grp.cacheOrGroupName() + "-" + partId; |
| } |
| |
| /** |
| * @return Name of partition meta store. |
| */ |
| private String partitionMetaStoreName() { |
| return grp.cacheOrGroupName() + "-partstore-" + partId; |
| } |
| |
| /** |
| * @return Name of data tree. |
| */ |
| private String dataTreeName() { |
| return grp.cacheOrGroupName() + "-" + BPlusTree.treeName("p-" + partId, "CacheData"); |
| } |
| |
| /** |
| * @return Name of pending entires tree. |
| */ |
| private String pendingEntriesTreeName() { |
| return grp.cacheOrGroupName() + "-PendingEntries-" + partId; |
| } |
| |
| /** |
| * @param checkExists If {@code true} data store won't be initialized if it doesn't exists |
| * (has non empty data file). This is an optimization for lazy store initialization on writes. |
| * |
| * @return Store delegate. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private CacheDataStore init0(boolean checkExists) throws IgniteCheckedException { |
| CacheDataStoreImpl delegate0 = delegate; |
| |
| if (delegate0 != null) |
| return delegate0; |
| |
| if (checkExists) { |
| if (!exists) |
| return null; |
| } |
| |
| GridCacheSharedContext ctx = grp.shared(); |
| |
| AtomicLong pageListCacheLimit = ((GridCacheDatabaseSharedManager) ctx.database()).pageListCacheLimitHolder(grp.dataRegion()); |
| |
| IgniteCacheDatabaseSharedManager dbMgr = ctx.database(); |
| |
| dbMgr.checkpointReadLock(); |
| |
| if (init.compareAndSet(false, true)) { |
| try { |
| Metas metas = getOrAllocatePartitionMetas(); |
| |
| if (PageIdUtils.partId(metas.reuseListRoot.pageId().pageId()) != partId || |
| PageIdUtils.partId(metas.treeRoot.pageId().pageId()) != partId || |
| PageIdUtils.partId(metas.pendingTreeRoot.pageId().pageId()) != partId || |
| PageIdUtils.partId(metas.partMetastoreReuseListRoot.pageId().pageId()) != partId |
| ) { |
| throw new IgniteCheckedException("Invalid meta root allocated [" + |
| "cacheOrGroupName=" + grp.cacheOrGroupName() + |
| ", partId=" + partId + |
| ", metas=" + metas + ']'); |
| } |
| |
| String freeListName = freeListName(); |
| |
| RootPage reuseRoot = metas.reuseListRoot; |
| |
| freeList = new CacheFreeList( |
| grp.groupId(), |
| freeListName, |
| grp.dataRegion(), |
| ctx.wal(), |
| reuseRoot.pageId().pageId(), |
| reuseRoot.isAllocated(), |
| ctx.diagnostic().pageLockTracker(), |
| ctx.kernalContext(), |
| pageListCacheLimit, |
| PageIdAllocator.FLAG_AUX |
| ) { |
| /** {@inheritDoc} */ |
| @Override protected long allocatePageNoReuse() throws IgniteCheckedException { |
| assert grp.shared().database().checkpointLockIsHeldByThread(); |
| |
| return pageMem.allocatePage(grpId, partId, PageIdAllocator.FLAG_AUX); |
| } |
| }; |
| |
| RootPage partMetastoreReuseListRoot = metas.partMetastoreReuseListRoot; |
| |
| String partMetastoreName = partitionMetaStoreName(); |
| |
| partStorage = new PartitionMetaStorageImpl<SimpleDataRow>( |
| grp.groupId(), |
| partMetastoreName, |
| grp.dataRegion(), |
| freeList, |
| ctx.wal(), |
| partMetastoreReuseListRoot.pageId().pageId(), |
| partMetastoreReuseListRoot.isAllocated(), |
| ctx.diagnostic().pageLockTracker(), |
| ctx.kernalContext(), |
| pageListCacheLimit, |
| PageIdAllocator.FLAG_AUX |
| ) { |
| /** {@inheritDoc} */ |
| @Override protected long allocatePageNoReuse() throws IgniteCheckedException { |
| assert ctx.database().checkpointLockIsHeldByThread(); |
| |
| return pageMem.allocatePage(grpId, partId, PageIdAllocator.FLAG_AUX); |
| } |
| }; |
| |
| String dataTreeName = dataTreeName(); |
| |
| CacheDataRowStore rowStore = new CacheDataRowStore(grp, freeList, partId); |
| |
| RootPage treeRoot = metas.treeRoot; |
| |
| dataTree = new CacheDataTree( |
| grp, |
| dataTreeName, |
| freeList, |
| rowStore, |
| treeRoot.pageId().pageId(), |
| treeRoot.isAllocated(), |
| ctx.diagnostic().pageLockTracker(), |
| PageIdAllocator.FLAG_AUX |
| ) { |
| /** {@inheritDoc} */ |
| @Override protected long allocatePageNoReuse() throws IgniteCheckedException { |
| assert ctx.database().checkpointLockIsHeldByThread(); |
| |
| return pageMem.allocatePage(grpId, partId, PageIdAllocator.FLAG_AUX); |
| } |
| }; |
| |
| String pendingEntriesTreeName = pendingEntriesTreeName(); |
| |
| RootPage pendingTreeRoot = metas.pendingTreeRoot; |
| |
| PendingEntriesTree pendingTree0 = new PendingEntriesTree( |
| grp, |
| pendingEntriesTreeName, |
| grp.dataRegion().pageMemory(), |
| pendingTreeRoot.pageId().pageId(), |
| freeList, |
| pendingTreeRoot.isAllocated(), |
| ctx.diagnostic().pageLockTracker(), |
| PageIdAllocator.FLAG_AUX |
| ) { |
| /** {@inheritDoc} */ |
| @Override protected long allocatePageNoReuse() throws IgniteCheckedException { |
| assert ctx.database().checkpointLockIsHeldByThread(); |
| |
| return pageMem.allocatePage(grpId, partId, PageIdAllocator.FLAG_AUX); |
| } |
| }; |
| |
| PageMemoryEx pageMem = (PageMemoryEx) grp.dataRegion().pageMemory(); |
| |
| int grpId = grp.groupId(); |
| |
| delegate0 = new CacheDataStoreImpl(partId, |
| rowStore, |
| dataTree, |
| () -> pendingTree0, |
| grp, |
| busyLock, |
| log, |
| () -> rowCacheCleaner |
| ) { |
| /** {@inheritDoc} */ |
| @Override public PendingEntriesTree pendingTree() { |
| return pendingTree0; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void preload() throws IgniteCheckedException { |
| IgnitePageStoreManager pageStoreMgr = ctx.pageStore(); |
| |
| if (pageStoreMgr == null) |
| return; |
| |
| int pages = pageStoreMgr.pages(grpId, partId); |
| |
| long pageId = pageMem.partitionMetaPageId(grpId, partId); |
| |
| // For each page sequentially pin/unpin. |
| for (int pageNo = 0; pageNo < pages; pageId++, pageNo++) { |
| long pagePointer = -1; |
| |
| try { |
| pagePointer = pageMem.acquirePage(grpId, pageId); |
| } |
| finally { |
| if (pagePointer != -1) |
| pageMem.releasePage(grpId, pageId, pagePointer); |
| } |
| } |
| } |
| }; |
| |
| pendingTree = pendingTree0; |
| |
| if (!pendingTree0.isEmpty()) |
| grp.caches().forEach(cctx -> cctx.ttl().hasPendingEntries(true)); |
| |
| long partMetaId = pageMem.partitionMetaPageId(grpId, partId); |
| long partMetaPage = pageMem.acquirePage(grpId, partMetaId); |
| |
| try { |
| long pageAddr = pageMem.readLock(grpId, partMetaId, partMetaPage); |
| |
| try { |
| if (PageIO.getType(pageAddr) != 0) { |
| PagePartitionMetaIOV3 io = (PagePartitionMetaIOV3)PagePartitionMetaIO.VERSIONS.latest(); |
| |
| Map<Integer, Long> cacheSizes = null; |
| |
| if (grp.sharedGroup()) |
| cacheSizes = readSharedGroupCacheSizes(pageMem, grpId, io.getCountersPageId(pageAddr)); |
| |
| long link = io.getGapsLink(pageAddr); |
| |
| byte[] data = link == 0 ? null : partStorage.readRow(link); |
| |
| delegate0.restoreState(io.getSize(pageAddr), io.getUpdateCounter(pageAddr), cacheSizes, data); |
| |
| int encrPageCnt = io.getEncryptedPageCount(pageAddr); |
| |
| if (encrPageCnt > 0) { |
| ctx.kernalContext().encryption().setEncryptionState( |
| grp, partId, io.getEncryptedPageIndex(pageAddr), encrPageCnt); |
| } |
| |
| grp.offheap().globalRemoveId().setIfGreater(io.getGlobalRemoveId(pageAddr)); |
| } |
| } |
| finally { |
| pageMem.readUnlock(grpId, partMetaId, partMetaPage); |
| } |
| } |
| finally { |
| pageMem.releasePage(grpId, partMetaId, partMetaPage); |
| } |
| |
| delegate = delegate0; |
| } |
| catch (Throwable ex) { |
| U.error(log, "Unhandled exception during page store initialization. All further operations will " + |
| "be failed and local node will be stopped.", ex); |
| |
| ctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, ex)); |
| |
| throw ex; |
| } |
| finally { |
| latch.countDown(); |
| |
| dbMgr.checkpointReadUnlock(); |
| } |
| } |
| else { |
| dbMgr.checkpointReadUnlock(); |
| |
| U.await(latch); |
| |
| delegate0 = delegate; |
| |
| if (delegate0 == null) |
| throw new IgniteCheckedException("Cache store initialization failed."); |
| } |
| |
| return delegate0; |
| } |
| |
| /** |
| * @return Partition metas. |
| */ |
| private Metas getOrAllocatePartitionMetas() throws IgniteCheckedException { |
| PageMemoryEx pageMem = (PageMemoryEx)grp.dataRegion().pageMemory(); |
| IgniteWriteAheadLogManager wal = grp.shared().wal(); |
| |
| int grpId = grp.groupId(); |
| long partMetaId = pageMem.partitionMetaPageId(grpId, partId); |
| |
| AtomicBoolean metaPageAllocated = new AtomicBoolean(false); |
| |
| long partMetaPage = pageMem.acquirePage(grpId, partMetaId, metaPageAllocated); |
| |
| if (metaPageAllocated.get()) |
| grp.metrics().incrementInitializedLocalPartitions(); |
| |
| try { |
| boolean allocated = false; |
| boolean pageUpgraded = false; |
| boolean pendingTreeAllocated = false; |
| boolean partMetastoreReuseListAllocated = false; |
| boolean updateLogTreeRootAllocated = false; |
| |
| long pageAddr = pageMem.writeLock(grpId, partMetaId, partMetaPage); |
| try { |
| long treeRoot, reuseListRoot, pendingTreeRoot, partMetaStoreReuseListRoot; |
| |
| PagePartitionMetaIOV3 io = (PagePartitionMetaIOV3)PagePartitionMetaIO.VERSIONS.latest(); |
| |
| // Initialize new page. |
| if (PageIO.getType(pageAddr) != PageIO.T_PART_META) { |
| PageMetrics metrics = pageMem.metrics().cacheGrpPageMetrics(grpId); |
| |
| io.initNewPage(pageAddr, partMetaId, pageMem.realPageSize(grpId), metrics); |
| |
| treeRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_AUX); |
| reuseListRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_AUX); |
| pendingTreeRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_AUX); |
| partMetaStoreReuseListRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_AUX); |
| |
| assert PageIdUtils.flag(treeRoot) == PageMemory.FLAG_AUX; |
| assert PageIdUtils.flag(reuseListRoot) == PageMemory.FLAG_AUX; |
| assert PageIdUtils.flag(pendingTreeRoot) == PageMemory.FLAG_AUX; |
| assert PageIdUtils.flag(partMetaStoreReuseListRoot) == PageMemory.FLAG_AUX; |
| |
| io.setTreeRoot(pageAddr, treeRoot); |
| io.setReuseListRoot(pageAddr, reuseListRoot); |
| io.setPendingTreeRoot(pageAddr, pendingTreeRoot); |
| io.setPartitionMetaStoreReuseListRoot(pageAddr, partMetaStoreReuseListRoot); |
| |
| allocated = true; |
| } |
| else { |
| if (io != PageIO.getPageIO(pageAddr)) { |
| if (log.isDebugEnabled()) { |
| log.debug("Upgrade partition meta page version: [part=" + partId + |
| ", grpId=" + grpId + |
| ", oldVer=" + PagePartitionMetaIO.getVersion(pageAddr) + |
| ", newVer=" + io.getVersion() + ']'); |
| } |
| |
| io.upgradePage(pageAddr); |
| |
| pageUpgraded = true; |
| } |
| |
| treeRoot = io.getTreeRoot(pageAddr); |
| reuseListRoot = io.getReuseListRoot(pageAddr); |
| |
| if ((pendingTreeRoot = io.getPendingTreeRoot(pageAddr)) == 0) { |
| pendingTreeRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_AUX); |
| |
| io.setPendingTreeRoot(pageAddr, pendingTreeRoot); |
| |
| pendingTreeAllocated = true; |
| } |
| |
| if ((partMetaStoreReuseListRoot = io.getPartitionMetaStoreReuseListRoot(pageAddr)) == 0) { |
| partMetaStoreReuseListRoot = pageMem.allocatePage(grpId, partId, PageMemory.FLAG_AUX); |
| |
| io.setPartitionMetaStoreReuseListRoot(pageAddr, partMetaStoreReuseListRoot); |
| |
| partMetastoreReuseListAllocated = true; |
| } |
| |
| if (PageIdUtils.flag(treeRoot) != PageMemory.FLAG_AUX |
| && PageIdUtils.flag(treeRoot) != PageMemory.FLAG_DATA) |
| throw new StorageException("Wrong tree root page id flag: treeRoot=" |
| + U.hexLong(treeRoot) + ", part=" + partId + ", grpId=" + grpId); |
| |
| if (PageIdUtils.flag(reuseListRoot) != PageMemory.FLAG_AUX |
| && PageIdUtils.flag(reuseListRoot) != PageMemory.FLAG_DATA) |
| throw new StorageException("Wrong reuse list root page id flag: reuseListRoot=" |
| + U.hexLong(reuseListRoot) + ", part=" + partId + ", grpId=" + grpId); |
| |
| if (PageIdUtils.flag(pendingTreeRoot) != PageMemory.FLAG_AUX |
| && PageIdUtils.flag(pendingTreeRoot) != PageMemory.FLAG_DATA) |
| throw new StorageException("Wrong pending tree root page id flag: reuseListRoot=" |
| + U.hexLong(pendingTreeRoot) + ", part=" + partId + ", grpId=" + grpId); |
| |
| if (PageIdUtils.flag(partMetaStoreReuseListRoot) != PageMemory.FLAG_AUX |
| && PageIdUtils.flag(partMetaStoreReuseListRoot) != PageMemory.FLAG_DATA) |
| throw new StorageException("Wrong partition meta store list root page id flag: partMetaStoreReuseListRoot=" |
| + U.hexLong(partMetaStoreReuseListRoot) + ", part=" + partId + ", grpId=" + grpId); |
| } |
| |
| if ((allocated || pageUpgraded || pendingTreeAllocated || partMetastoreReuseListAllocated || updateLogTreeRootAllocated) |
| && isWalDeltaRecordNeeded(pageMem, grpId, partMetaId, partMetaPage, wal, null)) { |
| wal.log(new PageSnapshot(new FullPageId(partMetaId, grpId), pageAddr, |
| pageMem.pageSize(), pageMem.realPageSize(grpId))); |
| } |
| |
| return new Metas( |
| new RootPage(new FullPageId(treeRoot, grpId), allocated), |
| new RootPage(new FullPageId(reuseListRoot, grpId), allocated), |
| new RootPage(new FullPageId(pendingTreeRoot, grpId), allocated || pendingTreeAllocated), |
| new RootPage(new FullPageId(partMetaStoreReuseListRoot, grpId), allocated || partMetastoreReuseListAllocated)); |
| } |
| finally { |
| pageMem.writeUnlock(grpId, partMetaId, partMetaPage, null, |
| allocated || pageUpgraded || pendingTreeAllocated || partMetastoreReuseListAllocated || updateLogTreeRootAllocated); |
| } |
| } |
| finally { |
| pageMem.releasePage(grpId, partMetaId, partMetaPage); |
| } |
| } |
| |
        /** {@inheritDoc} */
        @Override public CacheDataTree tree() {
            // May be null until init0() has run.
            return dataTree;
        }
| |
| /** {@inheritDoc} */ |
| @Override public boolean init() { |
| try { |
| return init0(true) != null; |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
        /** {@inheritDoc} */
        @Override public int partId() {
            return partId;
        }
| |
| /** {@inheritDoc} */ |
| @Override public RowStore rowStore() { |
| CacheDataStore delegate0 = delegate; |
| |
| return delegate0 == null ? null : delegate0.rowStore(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long fullSize() { |
| try { |
| CacheDataStore delegate0 = init0(true); |
| |
| return delegate0 == null ? 0 : delegate0.fullSize(); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean isEmpty() { |
| try { |
| CacheDataStore delegate0 = init0(true); |
| |
| return delegate0 == null || delegate0.isEmpty(); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long cacheSize(int cacheId) { |
| try { |
| CacheDataStore delegate0 = init0(true); |
| |
| return delegate0 == null ? 0 : delegate0.cacheSize(cacheId); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Map<Integer, Long> cacheSizes() { |
| try { |
| CacheDataStore delegate0 = init0(true); |
| |
| return delegate0 == null ? null : delegate0.cacheSizes(); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void updateSize(int cacheId, long delta) { |
| try { |
| CacheDataStore delegate0 = init0(false); |
| |
| if (delegate0 != null) |
| delegate0.updateSize(cacheId, delta); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long updateCounter() { |
| try { |
| CacheDataStore delegate0 = init0(true); |
| |
| return delegate0 == null ? 0 : delegate0.updateCounter(); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long reservedCounter() { |
| try { |
| CacheDataStore delegate0 = init0(true); |
| |
| return delegate0 == null ? 0 : delegate0.reservedCounter(); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public PartitionUpdateCounter partUpdateCounter() { |
| try { |
| CacheDataStore delegate0 = init0(true); |
| |
| return delegate0 == null ? null : delegate0.partUpdateCounter(); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long getAndIncrementUpdateCounter(long delta) { |
| try { |
| CacheDataStore delegate0 = init0(false); |
| |
| return delegate0 == null ? 0 : delegate0.getAndIncrementUpdateCounter(delta); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long reserve(long delta) { |
| try { |
| CacheDataStore delegate0 = init0(false); |
| |
| if (delegate0 == null) |
| throw new IllegalStateException("Should be never called."); |
| |
| return delegate0.reserve(delta); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void updateCounter(long val) { |
| try { |
| CacheDataStore delegate0 = init0(false); |
| |
| if (delegate0 != null) |
| delegate0.updateCounter(val); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean updateCounter(long start, long delta) { |
| try { |
| CacheDataStore delegate0 = init0(false); |
| |
| return delegate0 != null && delegate0.updateCounter(start, delta); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridLongList finalizeUpdateCounters() { |
| try { |
| CacheDataStore delegate0 = init0(true); |
| |
| return delegate0 != null ? delegate0.finalizeUpdateCounters() : null; |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long nextUpdateCounter() { |
| try { |
| CacheDataStore delegate0 = init0(false); |
| |
| if (delegate0 == null) |
| throw new IllegalStateException("Should be never called."); |
| |
| return delegate0.nextUpdateCounter(); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long initialUpdateCounter() { |
| try { |
| CacheDataStore delegate0 = init0(true); |
| |
| return delegate0 == null ? 0 : delegate0.initialUpdateCounter(); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
        /** {@inheritDoc} */
        @Override public void updateInitialCounter(long start, long delta) {
            try {
                CacheDataStore delegate0 = init0(false);

                // Partition may not exists before recovery starts in case of recovering counters from RollbackRecord.
                // NOTE(review): no null check here; init0(false) appears to either return an
                // initialized delegate or throw, so delegate0 is non-null - confirm this invariant.
                delegate0.updateInitialCounter(start, delta);
            }
            catch (IgniteCheckedException e) {
                throw new IgniteException(e);
            }
        }
| |
        /** {@inheritDoc} */
        @Override public void setRowCacheCleaner(GridQueryRowCacheCleaner rowCacheCleaner) {
            // Picked up lazily by the delegate via the supplier passed in init0().
            this.rowCacheCleaner = rowCacheCleaner;
        }
| |
| /** {@inheritDoc} */ |
| @Override public void update( |
| GridCacheContext cctx, |
| KeyCacheObject key, |
| CacheObject val, |
| GridCacheVersion ver, |
| long expireTime, |
| @Nullable CacheDataRow oldRow |
| ) throws IgniteCheckedException { |
| assert grp.shared().database().checkpointLockIsHeldByThread(); |
| |
| CacheDataStore delegate = init0(false); |
| |
| delegate.update(cctx, key, val, ver, expireTime, oldRow); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean mvccInitialValue( |
| GridCacheContext cctx, |
| KeyCacheObject key, |
| @Nullable CacheObject val, |
| GridCacheVersion ver, |
| long expireTime, |
| MvccVersion mvccVer, |
| MvccVersion newMvccVer) |
| throws IgniteCheckedException |
| { |
| CacheDataStore delegate = init0(false); |
| |
| return delegate.mvccInitialValue(cctx, key, val, ver, expireTime, mvccVer, newMvccVer); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean mvccApplyHistoryIfAbsent( |
| GridCacheContext cctx, |
| KeyCacheObject key, |
| List<GridCacheMvccEntryInfo> hist) |
| throws IgniteCheckedException { |
| CacheDataStore delegate = init0(false); |
| |
| return delegate.mvccApplyHistoryIfAbsent(cctx, key, hist); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean mvccUpdateRowWithPreloadInfo( |
| GridCacheContext cctx, |
| KeyCacheObject key, |
| @Nullable CacheObject val, |
| GridCacheVersion ver, |
| long expireTime, |
| MvccVersion mvccVer, |
| MvccVersion newMvccVer, |
| byte mvccTxState, |
| byte newMvccTxState) throws IgniteCheckedException { |
| |
| CacheDataStore delegate = init0(false); |
| |
| return delegate.mvccUpdateRowWithPreloadInfo(cctx, |
| key, |
| val, |
| ver, |
| expireTime, |
| mvccVer, |
| newMvccVer, |
| mvccTxState, |
| newMvccTxState); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public MvccUpdateResult mvccUpdate( |
| GridCacheContext cctx, |
| KeyCacheObject key, |
| CacheObject val, |
| GridCacheVersion ver, |
| long expireTime, |
| MvccSnapshot mvccVer, |
| CacheEntryPredicate filter, |
| EntryProcessor entryProc, |
| Object[] invokeArgs, |
| boolean primary, |
| boolean needHistory, |
| boolean noCreate, |
| boolean needOldVal, |
| boolean retVal, |
| boolean keepBinary) throws IgniteCheckedException { |
| CacheDataStore delegate = init0(false); |
| |
| return delegate.mvccUpdate(cctx, key, val, ver, expireTime, mvccVer, filter, entryProc, invokeArgs, primary, |
| needHistory, noCreate, needOldVal, retVal, keepBinary); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public MvccUpdateResult mvccRemove( |
| GridCacheContext cctx, |
| KeyCacheObject key, |
| MvccSnapshot mvccVer, |
| CacheEntryPredicate filter, |
| boolean primary, |
| boolean needHistory, |
| boolean needOldVal, |
| boolean retVal) throws IgniteCheckedException { |
| CacheDataStore delegate = init0(false); |
| |
| return delegate.mvccRemove(cctx, key, mvccVer, filter, primary, needHistory, needOldVal, retVal); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public MvccUpdateResult mvccLock( |
| GridCacheContext cctx, |
| KeyCacheObject key, |
| MvccSnapshot mvccSnapshot) throws IgniteCheckedException { |
| CacheDataStore delegate = init0(false); |
| |
| return delegate.mvccLock(cctx, key, mvccSnapshot); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void mvccRemoveAll(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException { |
| CacheDataStore delegate = init0(false); |
| |
| delegate.mvccRemoveAll(cctx, key); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void mvccApplyUpdate(GridCacheContext cctx, KeyCacheObject key, CacheObject val, GridCacheVersion ver, |
| long expireTime, MvccVersion mvccVer) throws IgniteCheckedException { |
| CacheDataStore delegate = init0(false); |
| |
| delegate.mvccApplyUpdate(cctx, key, val, ver, expireTime, mvccVer); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public CacheDataRow createRow( |
| GridCacheContext cctx, |
| KeyCacheObject key, |
| CacheObject val, |
| GridCacheVersion ver, |
| long expireTime, |
| @Nullable CacheDataRow oldRow) throws IgniteCheckedException { |
| assert grp.shared().database().checkpointLockIsHeldByThread(); |
| |
| CacheDataStore delegate = init0(false); |
| |
| return delegate.createRow(cctx, key, val, ver, expireTime, oldRow); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void insertRows(Collection<DataRowCacheAware> rows, |
| IgnitePredicateX<CacheDataRow> initPred) throws IgniteCheckedException { |
| CacheDataStore delegate = init0(false); |
| |
| delegate.insertRows(rows, initPred); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int cleanup(GridCacheContext cctx, |
| @Nullable List<MvccLinkAwareSearchRow> cleanupRows) throws IgniteCheckedException { |
| CacheDataStore delegate = init0(false); |
| |
| return delegate.cleanup(cctx, cleanupRows); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void updateTxState(GridCacheContext cctx, CacheSearchRow row) throws IgniteCheckedException { |
| CacheDataStore delegate = init0(false); |
| |
| delegate.updateTxState(cctx, row); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void invoke(GridCacheContext cctx, KeyCacheObject key, OffheapInvokeClosure c) |
| throws IgniteCheckedException { |
| assert grp.shared().database().checkpointLockIsHeldByThread(); |
| |
| CacheDataStore delegate = init0(false); |
| |
| delegate.invoke(cctx, key, c); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void remove(GridCacheContext cctx, KeyCacheObject key, int partId) |
| throws IgniteCheckedException { |
| assert grp.shared().database().checkpointLockIsHeldByThread(); |
| |
| CacheDataStore delegate = init0(false); |
| |
| delegate.remove(cctx, key, partId); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public CacheDataRow find(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException { |
| CacheDataStore delegate = init0(true); |
| |
| if (delegate != null) |
| return delegate.find(cctx, key); |
| |
| return null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public CacheDataRow mvccFind(GridCacheContext cctx, KeyCacheObject key, MvccSnapshot snapshot) |
| throws IgniteCheckedException { |
| CacheDataStore delegate = init0(true); |
| |
| if (delegate != null) |
| return delegate.mvccFind(cctx, key, snapshot); |
| |
| return null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public List<IgniteBiTuple<Object, MvccVersion>> mvccFindAllVersions(GridCacheContext cctx, KeyCacheObject key) |
| throws IgniteCheckedException { |
| CacheDataStore delegate = init0(true); |
| |
| if (delegate != null) |
| return delegate.mvccFindAllVersions(cctx, key); |
| |
| return Collections.emptyList(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridCursor<CacheDataRow> mvccAllVersionsCursor(GridCacheContext cctx, |
| KeyCacheObject key, Object x) throws IgniteCheckedException { |
| CacheDataStore delegate = init0(true); |
| |
| if (delegate != null) |
| return delegate.mvccAllVersionsCursor(cctx, key, x); |
| |
| return EMPTY_CURSOR; |
| } |
| |
| |
| /** {@inheritDoc} */ |
| @Override public GridCursor<? extends CacheDataRow> cursor() throws IgniteCheckedException { |
| CacheDataStore delegate = init0(true); |
| |
| if (delegate != null) |
| return delegate.cursor(); |
| |
| return EMPTY_CURSOR; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridCursor<? extends CacheDataRow> cursor(Object x) throws IgniteCheckedException { |
| CacheDataStore delegate = init0(true); |
| |
| if (delegate != null) |
| return delegate.cursor(x); |
| |
| return EMPTY_CURSOR; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridCursor<? extends CacheDataRow> cursor(MvccSnapshot mvccSnapshot) |
| throws IgniteCheckedException { |
| CacheDataStore delegate = init0(true); |
| |
| if (delegate != null) |
| return delegate.cursor(mvccSnapshot); |
| |
| return EMPTY_CURSOR; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridCursor<? extends CacheDataRow> cursor( |
| int cacheId, |
| KeyCacheObject lower, |
| KeyCacheObject upper) throws IgniteCheckedException { |
| CacheDataStore delegate = init0(true); |
| |
| if (delegate != null) |
| return delegate.cursor(cacheId, lower, upper); |
| |
| return EMPTY_CURSOR; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId, |
| KeyCacheObject lower, |
| KeyCacheObject upper, |
| Object x) |
| throws IgniteCheckedException { |
| CacheDataStore delegate = init0(true); |
| |
| if (delegate != null) |
| return delegate.cursor(cacheId, lower, upper, x); |
| |
| return EMPTY_CURSOR; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId, |
| KeyCacheObject lower, |
| KeyCacheObject upper, |
| Object x, |
| MvccSnapshot mvccSnapshot) |
| throws IgniteCheckedException { |
| CacheDataStore delegate = init0(true); |
| |
| if (delegate != null) |
| return delegate.cursor(cacheId, lower, upper, x, mvccSnapshot); |
| |
| return EMPTY_CURSOR; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void destroy() throws IgniteCheckedException { |
| // No need to destroy delegate. |
| |
| Stream.of(freeList, partStorage, dataTree, pendingTree) |
| .filter(Objects::nonNull) |
| .forEach(DataStructure::close); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void markDestroyed() throws IgniteCheckedException { |
| CacheDataStore delegate = init0(true); |
| |
| if (delegate != null) |
| delegate.markDestroyed(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean destroyed() { |
| try { |
| CacheDataStore delegate = init0(true); |
| |
| if (delegate != null) |
| return delegate.destroyed(); |
| |
| return false; |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId) throws IgniteCheckedException { |
| CacheDataStore delegate = init0(true); |
| |
| if (delegate != null) |
| return delegate.cursor(cacheId); |
| |
| return EMPTY_CURSOR; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId, |
| MvccSnapshot mvccSnapshot) throws IgniteCheckedException { |
| CacheDataStore delegate = init0(true); |
| |
| if (delegate != null) |
| return delegate.cursor(cacheId, mvccSnapshot); |
| |
| return EMPTY_CURSOR; |
| } |
| |
        /** {@inheritDoc} */
        @Override public void clear(int cacheId) throws IgniteCheckedException {
            assert grp.shared().database().checkpointLockIsHeldByThread();

            CacheDataStore delegate0 = init0(true);

            // Never initialized - nothing to clear.
            if (delegate0 == null)
                return;

            // Clear persistent pendingTree
            if (pendingTree != null) {
                // Same row as both bounds: with WITHOUT_KEY the range presumably matches
                // all pending rows of the given cache - confirm against PendingEntriesTree.
                PendingRow row = new PendingRow(cacheId);

                GridCursor<PendingRow> cursor = pendingTree.find(row, row, PendingEntriesTree.WITHOUT_KEY);

                while (cursor.next()) {
                    PendingRow row0 = cursor.get();

                    assert row0.link != 0 : row;

                    boolean res = pendingTree.removex(row0);

                    assert res;
                }
            }

            // Clear the data rows themselves.
            delegate0.clear(cacheId);
        }
| |
| /** |
| * Gets the number of entries pending expire. |
| * |
| * @return Number of pending entries. |
| * @throws IgniteCheckedException If failed to get number of pending entries. |
| */ |
| public long expiredSize() throws IgniteCheckedException { |
| CacheDataStore delegate0 = init0(true); |
| |
| return delegate0 == null ? 0 : pendingTree.size(); |
| } |
| |
| /** |
| * Try to remove expired entries from data store. |
| * |
| * @param cctx Cache context. |
| * @param c Expiry closure that should be applied to expired entry. See {@link GridCacheTtlManager} for details. |
| * @param amount Limit of processed entries by single call, {@code -1} for no limit. |
| * @return cleared entries count. |
| * @throws IgniteCheckedException If failed. |
| */ |
| public int purgeExpired( |
| GridCacheContext cctx, |
| IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c, |
| long throttlingTimeout, |
| int amount |
| ) throws IgniteCheckedException { |
| CacheDataStore delegate0 = init0(true); |
| |
| long nowNanos = System.nanoTime(); |
| |
| if (delegate0 == null || (cctx.cacheId() == lastThrottledCacheId && nextStoreCleanTimeNanos - nowNanos > 0)) |
| return 0; |
| |
| assert pendingTree != null : "Partition data store was not initialized."; |
| |
| int cleared = purgeExpiredInternal(cctx, c, amount); |
| |
| // Throttle if there is nothing to clean anymore. |
| if (cleared < amount) { |
| lastThrottledCacheId = cctx.cacheId(); |
| |
| nextStoreCleanTimeNanos = nowNanos + U.millisToNanos(throttlingTimeout); |
| } |
| |
| return cleared; |
| } |
| |
| /** |
| * Removes expired entries from data store. |
| * |
| * @param cctx Cache context. |
| * @param c Expiry closure that should be applied to expired entry. See {@link GridCacheTtlManager} for details. |
| * @param amount Limit of processed entries by single call, {@code -1} for no limit. |
| * @return cleared entries count. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private int purgeExpiredInternal( |
| GridCacheContext cctx, |
| IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c, |
| int amount |
| ) throws IgniteCheckedException { |
| GridDhtLocalPartition part = null; |
| |
| if (!grp.isLocal()) { |
| part = cctx.topology().localPartition(partId, AffinityTopologyVersion.NONE, false, false); |
| |
| // Skip non-owned partitions. |
| if (part == null || part.state() != OWNING) |
| return 0; |
| } |
| |
| cctx.shared().database().checkpointReadLock(); |
| |
| try { |
| if (part != null && !part.reserve()) |
| return 0; |
| |
| try { |
| if (part != null && part.state() != OWNING) |
| return 0; |
| |
| long now = U.currentTimeMillis(); |
| |
| GridCursor<PendingRow> cur; |
| |
| if (grp.sharedGroup()) |
| cur = pendingTree.find(new PendingRow(cctx.cacheId()), new PendingRow(cctx.cacheId(), now, 0)); |
| else |
| cur = pendingTree.find(null, new PendingRow(CU.UNDEFINED_CACHE_ID, now, 0)); |
| |
| if (!cur.next()) |
| return 0; |
| |
| GridCacheVersion obsoleteVer = null; |
| |
| int cleared = 0; |
| |
| do { |
| PendingRow row = cur.get(); |
| |
| if (amount != -1 && cleared > amount) |
| return cleared; |
| |
| assert row.key != null && row.link != 0 && row.expireTime != 0 : row; |
| |
| row.key.partition(partId); |
| |
| if (pendingTree.removex(row)) { |
| if (obsoleteVer == null) |
| obsoleteVer = cctx.cache().nextVersion(); |
| |
| GridCacheEntryEx e1 = cctx.cache().entryEx(row.key); |
| |
| if (e1 != null) |
| c.apply(e1, obsoleteVer); |
| } |
| |
| cleared++; |
| } |
| while (cur.next()); |
| |
| return cleared; |
| } |
| finally { |
| if (part != null) |
| part.release(); |
| } |
| } |
| finally { |
| cctx.shared().database().checkpointReadUnlock(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public PendingEntriesTree pendingTree() { |
| try { |
| CacheDataStore delegate0 = init0(true); |
| |
| return delegate0 == null ? null : pendingTree; |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void preload() throws IgniteCheckedException { |
| CacheDataStore delegate0 = init0(true); |
| |
| if (delegate0 != null) |
| delegate0.preload(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void resetUpdateCounter() { |
| try { |
| CacheDataStore delegate0 = init0(true); |
| |
| if (delegate0 == null) |
| return; |
| |
| delegate0.resetUpdateCounter(); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void resetInitialUpdateCounter() { |
| try { |
| CacheDataStore delegate0 = init0(true); |
| |
| if (delegate0 == null) |
| return; |
| |
| delegate0.resetInitialUpdateCounter(); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** */ |
| @Override public PartitionMetaStorage<SimpleDataRow> partStorage() { |
| return partStorage; |
| } |
| } |
| |
| /** |
| * |
| */ |
| public static final GridCursor<CacheDataRow> EMPTY_CURSOR = new GridCursor<CacheDataRow>() { |
| /** {@inheritDoc} */ |
| @Override public boolean next() { |
| return false; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public CacheDataRow get() { |
| return null; |
| } |
| }; |
| } |