| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache; |
| |
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Supplier;
| import javax.cache.Cache; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.IgniteSystemProperties; |
| import org.apache.ignite.cluster.ClusterState; |
| import org.apache.ignite.failure.FailureContext; |
| import org.apache.ignite.failure.FailureType; |
| import org.apache.ignite.internal.GridKernalContext; |
| import org.apache.ignite.internal.GridKernalState; |
| import org.apache.ignite.internal.NodeStoppingException; |
| import org.apache.ignite.internal.pagemem.FullPageId; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteDhtDemandedPartitionsMap; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteHistoricalIterator; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.IgniteRebalanceIteratorImpl; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; |
| import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; |
| import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; |
| import org.apache.ignite.internal.processors.cache.persistence.CacheDataRowAdapter; |
| import org.apache.ignite.internal.processors.cache.persistence.CacheSearchRow; |
| import org.apache.ignite.internal.processors.cache.persistence.DataRowCacheAware; |
| import org.apache.ignite.internal.processors.cache.persistence.RootPage; |
| import org.apache.ignite.internal.processors.cache.persistence.RowStore; |
| import org.apache.ignite.internal.processors.cache.persistence.freelist.SimpleDataRow; |
| import org.apache.ignite.internal.processors.cache.persistence.partstorage.PartitionMetaStorage; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.BPlusTree; |
| import org.apache.ignite.internal.processors.cache.persistence.tree.reuse.ReuseList; |
| import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager; |
| import org.apache.ignite.internal.processors.cache.tree.CacheDataRowStore; |
| import org.apache.ignite.internal.processors.cache.tree.CacheDataTree; |
| import org.apache.ignite.internal.processors.cache.tree.DataRow; |
| import org.apache.ignite.internal.processors.cache.tree.PendingEntriesTree; |
| import org.apache.ignite.internal.processors.cache.tree.PendingRow; |
| import org.apache.ignite.internal.processors.cache.tree.SearchRow; |
| import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; |
| import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner; |
| import org.apache.ignite.internal.util.GridAtomicLong; |
| import org.apache.ignite.internal.util.GridCloseableIteratorAdapter; |
| import org.apache.ignite.internal.util.GridEmptyCloseableIterator; |
| import org.apache.ignite.internal.util.GridLongList; |
| import org.apache.ignite.internal.util.GridSpinBusyLock; |
| import org.apache.ignite.internal.util.GridStripedLock; |
| import org.apache.ignite.internal.util.collection.ImmutableIntSet; |
| import org.apache.ignite.internal.util.collection.IntMap; |
| import org.apache.ignite.internal.util.collection.IntRWHashMap; |
| import org.apache.ignite.internal.util.collection.IntSet; |
| import org.apache.ignite.internal.util.lang.GridCloseableIterator; |
| import org.apache.ignite.internal.util.lang.GridCursor; |
| import org.apache.ignite.internal.util.lang.GridIterator; |
| import org.apache.ignite.internal.util.lang.IgniteInClosure2X; |
| import org.apache.ignite.internal.util.lang.IgnitePredicateX; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.internal.CU; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgnitePredicate; |
| import org.jetbrains.annotations.NotNull; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static org.apache.ignite.internal.pagemem.PageIdAllocator.FLAG_IDX; |
| import static org.apache.ignite.internal.pagemem.PageIdAllocator.INDEX_PARTITION; |
| import static org.apache.ignite.internal.processors.cache.GridCacheUtils.TTL_ETERNAL; |
| import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING; |
| |
/**
 * Implementation of {@link IgniteCacheOffheapManager} used for cache groups without enabled persistence.
 */
| public class IgniteCacheOffheapManagerImpl implements IgniteCacheOffheapManager { |
| /** The maximum number of entries that can be preloaded under checkpoint read lock. */ |
| public static final int PRELOAD_SIZE_UNDER_CHECKPOINT_LOCK = 100; |
| |
| /** Batch size for cache removals during destroy. */ |
| private static final int BATCH_SIZE = 1000; |
| |
| /** */ |
| protected GridCacheSharedContext ctx; |
| |
| /** */ |
| protected CacheGroupContext grp; |
| |
| /** */ |
| protected IgniteLogger log; |
| |
| /** */ |
| private PendingEntriesTree pendingEntries; |
| |
| /** */ |
| private final GridAtomicLong globalRmvId = new GridAtomicLong(U.currentTimeMillis() * 1000_000); |
| |
| /** */ |
| protected final GridSpinBusyLock busyLock = new GridSpinBusyLock(); |
| |
| /** */ |
| protected GridStripedLock partStoreLock = new GridStripedLock(Runtime.getRuntime().availableProcessors()); |
| |
| /** */ |
| private final AtomicBoolean stopping = new AtomicBoolean(); |
| |
| /** {@inheritDoc} */ |
| @Override public GridAtomicLong globalRemoveId() { |
| return globalRmvId; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void start(GridCacheSharedContext ctx, CacheGroupContext grp) throws IgniteCheckedException { |
| this.ctx = ctx; |
| this.grp = grp; |
| this.log = ctx.logger(getClass()); |
| |
| if (grp.affinityNode()) { |
| ctx.database().checkpointReadLock(); |
| |
| try { |
| initDataStructures(); |
| } |
| finally { |
| ctx.database().checkpointReadUnlock(); |
| } |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onCacheStarted(GridCacheContext cctx) throws IgniteCheckedException { |
| initPendingTree(cctx); |
| } |
| |
| /** |
| * @param cctx Cache context. |
| * @throws IgniteCheckedException If failed. |
| */ |
| protected void initPendingTree(GridCacheContext<?, ?> cctx) throws IgniteCheckedException { |
| assert !cctx.group().persistenceEnabled(); |
| |
| if (cctx.affinityNode() && cctx.ttl().eagerTtlEnabled() && pendingEntries == null) { |
| String pendingEntriesTreeName = cctx.name() + "##PendingEntries"; |
| |
| long rootPage = allocateForTree(); |
| |
| pendingEntries = new PendingEntriesTree( |
| grp, |
| pendingEntriesTreeName, |
| grp.dataRegion().pageMemory(), |
| rootPage, |
| grp.reuseList(), |
| true, |
| ctx.diagnostic().pageLockTracker(), |
| FLAG_IDX |
| ); |
| } |
| } |
| |
| /** |
| * @throws IgniteCheckedException If failed. |
| */ |
| protected void initDataStructures() throws IgniteCheckedException { |
| // No-op. |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void stopCache(int cacheId, boolean destroy) { |
| if (destroy && grp.affinityNode()) |
| removeCacheData(cacheId); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void stop() { |
| GridKernalContext kctx = ctx.kernalContext(); |
| |
| // For in-memory mode, if we stop caches on grid stopping or cluster deactivation, skip data deletion from |
| // the trees and just close trees to release resources. |
| if (kctx.gateway().getState() == GridKernalState.STOPPING |
| || kctx.state().clusterState().state() == ClusterState.INACTIVE) { |
| for (CacheDataStore store : cacheDataStores()) |
| store.tree().close(); |
| |
| if (pendingEntries != null) |
| pendingEntries.close(); |
| |
| return; |
| } |
| |
| // In other cases (cache stop, for example) perform destroy with data deletion (through tree iteration). |
| try { |
| for (CacheDataStore store : cacheDataStores()) |
| destroyCacheDataStore(store); |
| |
| if (pendingEntries != null) |
| pendingEntries.destroy(); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e.getMessage(), e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long restoreStateOfPartition(int p, @Nullable Integer recoveryState) throws IgniteCheckedException { |
| return 0; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void restorePartitionStates() throws IgniteCheckedException { |
| // No-op. |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void confirmPartitionStatesRestored() { |
| // No-op. |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onKernalStop() { |
| if (stopping.compareAndSet(false, true)) // Avoid concurrent blocking by prepareToStop and onKernalStop. |
| busyLock.block(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void prepareToStop() { |
| if (stopping.compareAndSet(false, true)) // Avoid concurrent blocking by prepareToStop and onKernalStop. |
| busyLock.block(); |
| } |
| |
| /** |
| * @param cacheId Cache ID. |
| */ |
| private void removeCacheData(int cacheId) { |
| assert grp.affinityNode(); |
| |
| try { |
| if (grp.sharedGroup()) { |
| assert cacheId != CU.UNDEFINED_CACHE_ID; |
| |
| for (CacheDataStore store : cacheDataStores()) |
| store.clear(cacheId); |
| |
| // Clear non-persistent pending tree if needed. |
| if (pendingEntries != null) { |
| PendingRow row = new PendingRow(cacheId); |
| |
| GridCursor<PendingRow> cursor = pendingEntries.find(row, row, PendingEntriesTree.WITHOUT_KEY); |
| |
| while (cursor.next()) { |
| boolean res = pendingEntries.removex(cursor.get()); |
| |
| assert res; |
| } |
| } |
| } |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e.getMessage(), e); |
| } |
| } |
| |
| /** |
| * @param cctx Cache context. |
| * @param key Key. |
| * @return Data store. |
| */ |
| @Nullable private CacheDataStore dataStore(GridCacheContext<?, ?> cctx, KeyCacheObject key) { |
| return dataStore(cctx.affinity().partition(key), false); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public CacheDataStore dataStore(@Nullable GridDhtLocalPartition part) { |
| assert part != null; |
| |
| return part.dataStore(); |
| } |
| |
| /** |
| * @param partId Partition id. |
| * @param includeRenting {@code true} if includeRenting partitions must also be shown. |
| * @return Related partition cache data store or {@code null} if partition haven't been initialized. |
| */ |
| @Nullable private CacheDataStore dataStore(int partId, boolean includeRenting) { |
| GridDhtLocalPartition part = grp.topology().localPartition(partId, AffinityTopologyVersion.NONE, false, includeRenting); |
| |
| return part == null ? null : part.dataStore(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long cacheEntriesCount(int cacheId) { |
| long size = 0; |
| |
| for (CacheDataStore store : cacheDataStores()) |
| size += store.cacheSize(cacheId); |
| |
| return size; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void preloadPartition(int partId) throws IgniteCheckedException { |
| throw new IgniteCheckedException("Operation only applicable to caches with enabled persistence"); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long cacheEntriesCount( |
| int cacheId, |
| boolean primary, |
| boolean backup, |
| AffinityTopologyVersion topVer |
| ) { |
| long cnt = 0; |
| |
| Iterator<CacheDataStore> it = cacheData(primary, backup, topVer); |
| |
| while (it.hasNext()) |
| cnt += it.next().cacheSize(cacheId); |
| |
| return cnt; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long cacheEntriesCount(int cacheId, int part) { |
| CacheDataStore store = dataStore(part, true); |
| |
| return store == null ? 0 : store.cacheSize(cacheId); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Iterable<CacheDataStore> cacheDataStores() { |
| return cacheDataStores(F.alwaysTrue()); |
| } |
| |
| /** |
| * @param filter Filtering predicate. |
| * @return Iterable over all existing cache data stores except which one is marked as <tt>destroyed</tt>. |
| */ |
| private Iterable<CacheDataStore> cacheDataStores( |
| IgnitePredicate<GridDhtLocalPartition> filter |
| ) { |
| return F.iterator(grp.topology().currentLocalPartitions(), GridDhtLocalPartition::dataStore, true, |
| filter, p -> !p.dataStore().destroyed()); |
| } |
| |
| /** |
| * @param primary Primary data flag. |
| * @param backup Backup data flag. |
| * @param topVer Topology version. |
| * @return Data stores iterator. |
| */ |
| private Iterator<CacheDataStore> cacheData(boolean primary, boolean backup, AffinityTopologyVersion topVer) { |
| assert primary || backup; |
| |
| IgnitePredicate<GridDhtLocalPartition> filter; |
| |
| if (primary && backup) |
| filter = F.alwaysTrue(); |
| else { |
| IntSet parts = ImmutableIntSet.wrap(primary ? grp.affinity().primaryPartitions(ctx.localNodeId(), topVer) : |
| grp.affinity().backupPartitions(ctx.localNodeId(), topVer)); |
| |
| filter = part -> parts.contains(part.id()); |
| } |
| |
| return cacheDataStores(filter).iterator(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void invoke( |
| GridCacheContext cctx, |
| KeyCacheObject key, |
| GridDhtLocalPartition part, |
| OffheapInvokeClosure c) |
| throws IgniteCheckedException { |
| dataStore(part).invoke(cctx, key, c); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void update( |
| GridCacheContext cctx, |
| KeyCacheObject key, |
| CacheObject val, |
| GridCacheVersion ver, |
| long expireTime, |
| GridDhtLocalPartition part, |
| @Nullable CacheDataRow oldRow |
| ) throws IgniteCheckedException { |
| assert expireTime >= 0; |
| |
| dataStore(part).update(cctx, key, val, ver, expireTime, oldRow); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void remove( |
| GridCacheContext cctx, |
| KeyCacheObject key, |
| int partId, |
| GridDhtLocalPartition part |
| ) throws IgniteCheckedException { |
| dataStore(part).remove(cctx, key, partId); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override @Nullable public CacheDataRow read(GridCacheMapEntry entry) |
| throws IgniteCheckedException { |
| KeyCacheObject key = entry.key(); |
| |
| assert entry.localPartition() != null : entry; |
| |
| return dataStore(entry.localPartition()).find(entry.context(), key); |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public CacheDataRow read(GridCacheContext cctx, KeyCacheObject key) |
| throws IgniteCheckedException { |
| CacheDataStore dataStore = dataStore(cctx, key); |
| |
| CacheDataRow row = dataStore != null ? dataStore.find(cctx, key) : null; |
| |
| assert row == null || row.value() != null : row; |
| |
| return row; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean containsKey(GridCacheMapEntry entry) { |
| try { |
| return read(entry) != null; |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to read value", e); |
| |
| return false; |
| } |
| } |
| |
| /** |
| * Clears offheap entries. |
| * |
| * @param readers {@code True} to clear readers. |
| */ |
| @Override public void clearCache(GridCacheContext cctx, boolean readers) { |
| GridCacheVersion obsoleteVer = null; |
| |
| try (GridCloseableIterator<CacheDataRow> it = evictionSafeIterator(cctx.cacheId(), cacheDataStores().iterator())) { |
| while (it.hasNext()) { |
| cctx.shared().database().checkpointReadLock(); |
| |
| try { |
| KeyCacheObject key = it.next().key(); |
| |
| try { |
| if (obsoleteVer == null) |
| obsoleteVer = cctx.cache().nextVersion(); |
| |
| GridCacheEntryEx entry = cctx.cache().entryEx(key); |
| |
| entry.clear(obsoleteVer, readers); |
| } |
| catch (GridDhtInvalidPartitionException ignore) { |
| // Ignore. |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to clear cache entry: " + key, e); |
| } |
| } |
| finally { |
| cctx.shared().database().checkpointReadUnlock(); |
| } |
| } |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to close iterator", e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int onUndeploy(ClassLoader ldr) { |
| // TODO: GG-11141. |
| return 0; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long offHeapAllocatedSize() { |
| // TODO GG-10884. |
| return 0; |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("unchecked") |
| @Override public <K, V> GridCloseableIterator<Cache.Entry<K, V>> cacheEntriesIterator( |
| GridCacheContext cctx, |
| boolean primary, |
| boolean backup, |
| AffinityTopologyVersion topVer, |
| boolean keepBinary, |
| @Nullable MvccSnapshot mvccSnapshot, |
| Boolean dataPageScanEnabled |
| ) { |
| Iterator<CacheDataRow> it = cacheIterator(cctx.cacheId(), primary, backup, |
| topVer, mvccSnapshot, dataPageScanEnabled); |
| |
| return new GridCloseableIteratorAdapter<Cache.Entry<K, V>>() { |
| /** */ |
| private CacheEntryImplEx next; |
| |
| @Override protected Cache.Entry<K, V> onNext() { |
| CacheEntryImplEx ret = next; |
| |
| next = null; |
| |
| return ret; |
| } |
| |
| @Override protected boolean onHasNext() { |
| if (next != null) |
| return true; |
| |
| CacheDataRow nextRow = null; |
| |
| if (it.hasNext()) |
| nextRow = it.next(); |
| |
| if (nextRow != null) { |
| KeyCacheObject key = nextRow.key(); |
| CacheObject val = nextRow.value(); |
| |
| Object key0 = cctx.unwrapBinaryIfNeeded(key, keepBinary, false, null); |
| Object val0 = cctx.unwrapBinaryIfNeeded(val, keepBinary, false, null); |
| |
| next = new CacheEntryImplEx(key0, val0, nextRow.version()); |
| |
| return true; |
| } |
| |
| return false; |
| } |
| }; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridCloseableIterator<KeyCacheObject> cacheKeysIterator(int cacheId, int part) |
| throws IgniteCheckedException { |
| CacheDataStore data = dataStore(part, true); |
| |
| if (data == null) |
| return new GridEmptyCloseableIterator<>(); |
| |
| GridCursor<? extends CacheDataRow> cur = |
| data.cursor(cacheId, null, null, CacheDataRowAdapter.RowData.KEY_ONLY); |
| |
| return new GridCloseableIteratorAdapter<KeyCacheObject>() { |
| /** */ |
| private KeyCacheObject next; |
| |
| @Override protected KeyCacheObject onNext() { |
| KeyCacheObject res = next; |
| |
| next = null; |
| |
| return res; |
| } |
| |
| @Override protected boolean onHasNext() throws IgniteCheckedException { |
| if (next != null) |
| return true; |
| |
| if (cur.next()) { |
| CacheDataRow row = cur.get(); |
| |
| next = row.key(); |
| } |
| |
| return next != null; |
| } |
| }; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridIterator<CacheDataRow> cacheIterator( |
| int cacheId, |
| boolean primary, |
| boolean backups, |
| AffinityTopologyVersion topVer, |
| @Nullable MvccSnapshot mvccSnapshot, |
| Boolean dataPageScanEnabled |
| ) { |
| return iterator(cacheId, cacheData(primary, backups, topVer), mvccSnapshot, dataPageScanEnabled); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridIterator<CacheDataRow> cachePartitionIterator(int cacheId, int part, |
| @Nullable MvccSnapshot mvccSnapshot, Boolean dataPageScanEnabled) { |
| CacheDataStore data = dataStore(part, true); |
| |
| if (data == null) |
| return new GridEmptyCloseableIterator<>(); |
| |
| return iterator(cacheId, singletonIterator(data), mvccSnapshot, dataPageScanEnabled); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridIterator<CacheDataRow> partitionIterator(int part) { |
| CacheDataStore data = dataStore(part, true); |
| |
| if (data == null) |
| return new GridEmptyCloseableIterator<>(); |
| |
| return iterator(CU.UNDEFINED_CACHE_ID, singletonIterator(data), null, null); |
| } |
| |
| /** |
| * |
| * @param cacheId Cache ID. |
| * @param dataIt Data store iterator. |
| * @param mvccSnapshot Mvcc snapshot. |
| * @param dataPageScanEnabled Flag to enable data page scan. |
| * @return Rows iterator |
| */ |
| private GridCloseableIterator<CacheDataRow> iterator(int cacheId, |
| Iterator<CacheDataStore> dataIt, |
| MvccSnapshot mvccSnapshot, |
| Boolean dataPageScanEnabled |
| ) { |
| return new GridCloseableIteratorAdapter<CacheDataRow>() { |
| /** */ |
| private GridCursor<? extends CacheDataRow> cur; |
| |
| /** */ |
| private int curPart; |
| |
| /** */ |
| private CacheDataRow next; |
| |
| @Override protected CacheDataRow onNext() { |
| CacheDataRow res = next; |
| |
| next = null; |
| |
| return res; |
| } |
| |
| @Override protected boolean onHasNext() throws IgniteCheckedException { |
| if (next != null) |
| return true; |
| |
| while (true) { |
| try { |
| if (cur == null) { |
| if (dataIt.hasNext()) { |
| CacheDataStore ds = dataIt.next(); |
| |
| curPart = ds.partId(); |
| |
| // Data page scan is disabled by default for scan queries. |
| // TODO https://issues.apache.org/jira/browse/IGNITE-11998 |
| CacheDataTree.setDataPageScanEnabled(false); |
| |
| try { |
| if (mvccSnapshot == null) |
| cur = cacheId == CU.UNDEFINED_CACHE_ID ? ds.cursor() : ds.cursor(cacheId); |
| else { |
| cur = cacheId == CU.UNDEFINED_CACHE_ID ? |
| ds.cursor(mvccSnapshot) : ds.cursor(cacheId, mvccSnapshot); |
| } |
| } |
| finally { |
| CacheDataTree.setDataPageScanEnabled(false); |
| } |
| } |
| else |
| break; |
| } |
| |
| if (cur.next()) { |
| next = cur.get(); |
| next.key().partition(curPart); |
| |
| break; |
| } |
| else |
| cur = null; |
| } |
| catch (IgniteCheckedException ex) { |
| throw new IgniteCheckedException("Failed to get next data row due to underlying cursor " + |
| "invalidation", ex); |
| } |
| } |
| |
| return next != null; |
| } |
| }; |
| } |
| |
| /** |
| * @param cacheId Cache ID. |
| * @param dataIt Data store iterator. |
| * @return Rows iterator |
| */ |
| private GridCloseableIterator<CacheDataRow> evictionSafeIterator(int cacheId, Iterator<CacheDataStore> dataIt) { |
| return new GridCloseableIteratorAdapter<CacheDataRow>() { |
| /** */ |
| private GridCursor<? extends CacheDataRow> cur; |
| |
| /** */ |
| private GridDhtLocalPartition curPart; |
| |
| /** */ |
| private CacheDataRow next; |
| |
| @Override protected CacheDataRow onNext() { |
| CacheDataRow res = next; |
| |
| next = null; |
| |
| return res; |
| } |
| |
| @Override protected boolean onHasNext() throws IgniteCheckedException { |
| if (next != null) |
| return true; |
| |
| while (true) { |
| if (cur == null) { |
| if (dataIt.hasNext()) { |
| CacheDataStore ds = dataIt.next(); |
| |
| if (!reservePartition(ds.partId())) |
| continue; |
| |
| cur = cacheId == CU.UNDEFINED_CACHE_ID ? ds.cursor() : ds.cursor(cacheId); |
| } |
| else |
| break; |
| } |
| |
| if (cur.next()) { |
| next = cur.get(); |
| next.key().partition(curPart.id()); |
| |
| break; |
| } |
| else { |
| cur = null; |
| |
| releaseCurrentPartition(); |
| } |
| } |
| |
| return next != null; |
| } |
| |
| /** */ |
| private void releaseCurrentPartition() { |
| GridDhtLocalPartition p = curPart; |
| |
| assert p != null; |
| |
| curPart = null; |
| |
| p.release(); |
| } |
| |
| /** |
| * @param partId Partition number. |
| * @return {@code True} if partition was reserved. |
| */ |
| private boolean reservePartition(int partId) { |
| GridDhtLocalPartition p = grp.topology().localPartition(partId); |
| |
| if (p != null && p.reserve()) { |
| curPart = p; |
| |
| return true; |
| } |
| |
| return false; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void onClose() throws IgniteCheckedException { |
| if (curPart != null) |
| releaseCurrentPartition(); |
| } |
| }; |
| } |
| |
| /** |
| * @param item Item. |
| * @return Single item iterator. |
| * @param <T> Type of item. |
| */ |
| private <T> Iterator<T> singletonIterator(T item) { |
| return new Iterator<T>() { |
| /** */ |
| private boolean hasNext = true; |
| |
| /** {@inheritDoc} */ |
| @Override public boolean hasNext() { |
| return hasNext; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public T next() { |
| if (hasNext) { |
| hasNext = false; |
| |
| return item; |
| } |
| |
| throw new NoSuchElementException(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void remove() { |
| throw new UnsupportedOperationException(); |
| } |
| }; |
| } |
| |
| /** |
| * @return Page ID. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private long allocateForTree() throws IgniteCheckedException { |
| ReuseList reuseList = grp.reuseList(); |
| |
| long pageId; |
| |
| if (reuseList == null || (pageId = reuseList.takeRecycledPage()) == 0L) |
| pageId = grp.dataRegion().pageMemory().allocatePage(grp.groupId(), INDEX_PARTITION, FLAG_IDX); |
| |
| return pageId; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public RootPage rootPageForIndex(int cacheId, String idxName, int segment) throws IgniteCheckedException { |
| long pageId = allocateForTree(); |
| |
| return new RootPage(new FullPageId(pageId, grp.groupId()), true); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @Nullable RootPage findRootPageForIndex(int cacheId, String idxName, int segment) throws IgniteCheckedException { |
| return null; // No-op. |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @Nullable RootPage dropRootPageForIndex( |
| int cacheId, |
| String idxName, |
| int segment |
| ) throws IgniteCheckedException { |
| return null; // No-op. |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public @Nullable RootPage renameRootPageForIndex( |
| int cacheId, |
| String oldIdxName, |
| String newIdxName, |
| int segment |
| ) throws IgniteCheckedException { |
| return null; // No-op. |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public ReuseList reuseListForIndex(String idxName) { |
| return grp.reuseList(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridCloseableIterator<CacheDataRow> reservedIterator(int part, AffinityTopologyVersion topVer) { |
| GridDhtLocalPartition loc = grp.topology().localPartition(part, topVer, false); |
| |
| if (loc == null || !loc.reserve()) |
| return null; |
| |
| // It is necessary to check state after reservation to avoid race conditions. |
| if (loc.state() != OWNING) { |
| loc.release(); |
| |
| return null; |
| } |
| |
| CacheDataStore data = dataStore(loc); |
| |
| return new GridCloseableIteratorAdapter<CacheDataRow>() { |
| /** */ |
| private CacheDataRow next; |
| |
| /** */ |
| private GridCursor<? extends CacheDataRow> cur; |
| |
| @Override protected CacheDataRow onNext() { |
| CacheDataRow res = next; |
| |
| next = null; |
| |
| return res; |
| } |
| |
| @Override protected boolean onHasNext() throws IgniteCheckedException { |
| if (cur == null) |
| cur = data.cursor(CacheDataRowAdapter.RowData.FULL_WITH_HINTS); |
| |
| if (next != null) |
| return true; |
| |
| if (cur.next()) |
| next = cur.get(); |
| |
| boolean hasNext = next != null; |
| |
| if (!hasNext) |
| cur = null; |
| |
| return hasNext; |
| } |
| |
| @Override protected void onClose() throws IgniteCheckedException { |
| assert loc != null && loc.state() == OWNING && loc.reservations() > 0 |
| : "Partition should be in OWNING state and has at least 1 reservation: " + loc; |
| |
| loc.release(); |
| |
| cur = null; |
| } |
| }; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteRebalanceIterator rebalanceIterator(IgniteDhtDemandedPartitionsMap parts, |
| AffinityTopologyVersion topVer) |
| throws IgniteCheckedException { |
| |
| TreeMap<Integer, GridCloseableIterator<CacheDataRow>> iterators = new TreeMap<>(); |
| |
| Set<Integer> missing = new HashSet<>(); |
| |
| for (Integer p : parts.fullSet()) { |
| GridCloseableIterator<CacheDataRow> partIter = reservedIterator(p, topVer); |
| |
| if (partIter == null) { |
| missing.add(p); |
| |
| continue; |
| } |
| |
| iterators.put(p, partIter); |
| } |
| |
| IgniteHistoricalIterator historicalIter = historicalIterator(parts.historicalMap(), missing); |
| |
| IgniteRebalanceIterator iter = new IgniteRebalanceIteratorImpl(iterators, historicalIter); |
| |
| for (Integer p : missing) |
| iter.setPartitionMissing(p); |
| |
| return iter; |
| } |
| |
| /** |
| * @param partCntrs Partition counters map. |
| * @param missing Set of partitions need to populate if partition is missing or failed to reserve. |
| * @return Historical iterator. |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Nullable protected IgniteHistoricalIterator historicalIterator(CachePartitionPartialCountersMap partCntrs, Set<Integer> missing) |
| throws IgniteCheckedException { |
| return null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void storeEntries(GridDhtLocalPartition part, Iterator<GridCacheEntryInfo> infos, |
| IgnitePredicateX<CacheDataRow> initPred) throws IgniteCheckedException { |
| CacheDataStore dataStore = dataStore(part); |
| |
| List<DataRowCacheAware> batch = new ArrayList<>(PRELOAD_SIZE_UNDER_CHECKPOINT_LOCK); |
| |
| while (infos.hasNext()) { |
| GridCacheEntryInfo info = infos.next(); |
| |
| assert info.ttl() == TTL_ETERNAL : info.ttl(); |
| |
| batch.add(new DataRowCacheAware(info.key(), |
| info.value(), |
| info.version(), |
| part.id(), |
| info.expireTime(), |
| info.cacheId(), |
| grp.storeCacheIdInDataPage())); |
| |
| if (batch.size() == PRELOAD_SIZE_UNDER_CHECKPOINT_LOCK || !infos.hasNext()) { |
| ctx.database().checkpointReadLock(); |
| |
| try { |
| dataStore.insertRows(batch, initPred); |
| } |
| finally { |
| ctx.database().checkpointReadUnlock(); |
| } |
| |
| batch.clear(); |
| } |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public final CacheDataStore createCacheDataStore(int p) throws IgniteCheckedException { |
| partStoreLock.lock(p); |
| |
| try { |
| return createCacheDataStore0(p); |
| } |
| finally { |
| partStoreLock.unlock(p); |
| } |
| } |
| |
| /** |
| * @param p Partition. |
| * @return Cache data store. |
| * @throws IgniteCheckedException If failed. |
| */ |
| protected CacheDataStore createCacheDataStore0(int p) throws IgniteCheckedException { |
| long rootPage = allocateForTree(); |
| |
| CacheDataRowStore rowStore = new CacheDataRowStore(grp, grp.freeList(), p); |
| |
| String dataTreeName = grp.cacheOrGroupName() + "-" + treeName(p); |
| |
| CacheDataTree dataTree = new CacheDataTree( |
| grp, |
| dataTreeName, |
| grp.reuseList(), |
| rowStore, |
| rootPage, |
| true, |
| ctx.diagnostic().pageLockTracker(), |
| FLAG_IDX |
| ); |
| |
| return new CacheDataStoreImpl(p, rowStore, dataTree, () -> pendingEntries, grp, busyLock, log, null); |
| } |
| |
/** {@inheritDoc} */
@Override public final void destroyCacheDataStore(CacheDataStore store) throws IgniteCheckedException {
    int p = store.partId();

    // Same per-partition lock as in createCacheDataStore(): create and destroy are mutually exclusive.
    partStoreLock.lock(p);

    try {
        // Already destroyed by a concurrent call - nothing to do.
        if (store.destroyed())
            return;

        destroyCacheDataStore0(store);
    }
    catch (IgniteCheckedException e) {
        // NOTE(review): the checked failure is rethrown as unchecked IgniteException even though
        // the method declares IgniteCheckedException - confirm this wrapping is intentional.
        throw new IgniteException(e);
    }
    finally {
        partStoreLock.unlock(p);
    }
}
| |
| /** |
| * @param store Cache data store. |
| * @throws IgniteCheckedException If failed. |
| */ |
| protected void destroyCacheDataStore0(CacheDataStore store) throws IgniteCheckedException { |
| store.destroy(); |
| } |
| |
| /** |
| * @param p Partition. |
| * @return Tree name for given partition. |
| */ |
| protected final String treeName(int p) { |
| return BPlusTree.treeName("p-" + p, "CacheData"); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean expire( |
| GridCacheContext cctx, |
| IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c, |
| int amount |
| ) throws IgniteCheckedException { |
| assert !cctx.isNear() : cctx.name(); |
| |
| assert pendingEntries != null; |
| |
| int cleared = expireInternal(cctx, c, amount); |
| |
| return amount != -1 && cleared >= amount; |
| } |
| |
| /** |
| * @param cctx Cache context. |
| * @param c Closure. |
| * @param amount Limit of processed entries by single call, {@code -1} for no limit. |
| * @return cleared entries count. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private int expireInternal( |
| GridCacheContext cctx, |
| IgniteInClosure2X<GridCacheEntryEx, GridCacheVersion> c, |
| int amount |
| ) throws IgniteCheckedException { |
| GridCacheVersion obsoleteVer = null; |
| |
| cctx.shared().database().checkpointReadLock(); |
| |
| try { |
| int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; |
| |
| if (!busyLock.enterBusy()) |
| return 0; |
| |
| try { |
| List<PendingRow> rows = pendingEntries.remove( |
| new PendingRow(cacheId, Long.MIN_VALUE, 0), new PendingRow(cacheId, U.currentTimeMillis(), 0), amount); |
| |
| for (PendingRow row : rows) { |
| if (row.key.partition() == -1) |
| row.key.partition(cctx.affinity().partition(row.key)); |
| |
| assert row.key != null && row.link != 0 && row.expireTime != 0 : row; |
| |
| if (obsoleteVer == null) |
| obsoleteVer = cctx.cache().nextVersion(); |
| |
| GridCacheEntryEx entry = cctx.cache().entryEx(row.key instanceof KeyCacheObjectImpl |
| ? new ExpiredKeyCacheObject((KeyCacheObjectImpl)row.key, row.expireTime, row.link) : row.key); |
| |
| if (entry != null) |
| c.apply(entry, obsoleteVer); |
| } |
| |
| return rows.size(); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| finally { |
| cctx.shared().database().checkpointReadUnlock(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long expiredSize() throws IgniteCheckedException { |
| return pendingEntries != null ? pendingEntries.size() : 0; |
| } |
| |
| /** |
| * |
| */ |
| public static class CacheDataStoreImpl implements CacheDataStore { |
| /** */ |
| private final int partId; |
| |
| /** */ |
| private final CacheDataRowStore rowStore; |
| |
| /** */ |
| private final CacheDataTree dataTree; |
| |
| /** */ |
| private final Supplier<PendingEntriesTree> pendingEntries; |
| |
| /** */ |
| private final CacheGroupContext grp; |
| |
| /** */ |
| private final GridSpinBusyLock busyLock; |
| |
| /** Update counter. */ |
| protected final PartitionUpdateCounter pCntr; |
| |
| /** Partition size. */ |
| private final LongAdder storageSize = new LongAdder(); |
| |
| /** */ |
| private final IntMap<AtomicLong> cacheSizes = new IntRWHashMap<>(); |
| |
| /** */ |
| private final IgniteLogger log; |
| |
| /** */ |
| private final Boolean failNodeOnPartitionInconsistency = Boolean.getBoolean( |
| IgniteSystemProperties.IGNITE_FAIL_NODE_ON_UNRECOVERABLE_PARTITION_INCONSISTENCY |
| ); |
| |
| /** */ |
| private final int updateValSizeThreshold; |
| |
| /** */ |
| private volatile GridQueryRowCacheCleaner rowCacheCleaner; |
| |
| /** |
| * @param partId Partition number. |
| * @param rowStore Row store. |
| * @param dataTree Data tree. |
| */ |
| public CacheDataStoreImpl( |
| int partId, |
| CacheDataRowStore rowStore, |
| CacheDataTree dataTree, |
| Supplier<PendingEntriesTree> pendingEntries, |
| CacheGroupContext grp, |
| GridSpinBusyLock busyLock, |
| IgniteLogger log, |
| @Nullable Supplier<GridQueryRowCacheCleaner> cleaner |
| ) { |
| this.partId = partId; |
| this.rowStore = rowStore; |
| this.dataTree = dataTree; |
| this.pendingEntries = pendingEntries; |
| this.grp = grp; |
| this.busyLock = busyLock; |
| this.log = log; |
| |
| PartitionUpdateCounter delegate = !grp.persistenceEnabled() || grp.hasAtomicCaches() ? |
| new PartitionUpdateCounterVolatileImpl(grp) : |
| new PartitionUpdateCounterTrackingImpl(grp); |
| |
| pCntr = grp.shared().logger(PartitionUpdateCounterDebugWrapper.class).isDebugEnabled() ? |
| new PartitionUpdateCounterDebugWrapper(partId, delegate) : new PartitionUpdateCounterErrorWrapper(partId, delegate); |
| |
| updateValSizeThreshold = grp.shared().database().pageSize() / 2; |
| |
| if (cleaner == null) |
| rowStore.setRowCacheCleaner(() -> rowCacheCleaner); |
| else |
| rowStore.setRowCacheCleaner(cleaner); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public CacheDataTree tree() { |
| return dataTree; |
| } |
| |
| /** |
| * @param cacheId Cache ID. |
| */ |
| void incrementSize(int cacheId) { |
| updateSize(cacheId, 1); |
| } |
| |
| /** |
| * @param cacheId Cache ID. |
| */ |
| void decrementSize(int cacheId) { |
| updateSize(cacheId, -1); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean init() { |
| return false; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int partId() { |
| return partId; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long cacheSize(int cacheId) { |
| if (grp.sharedGroup()) { |
| AtomicLong size = cacheSizes.get(cacheId); |
| |
| return size != null ? (int)size.get() : 0; |
| } |
| |
| return storageSize.sum(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Map<Integer, Long> cacheSizes() { |
| if (!grp.sharedGroup()) |
| return null; |
| |
| Map<Integer, Long> res = new HashMap<>(); |
| |
| cacheSizes.forEach((key, val) -> res.put(key, val.longValue())); |
| |
| return res; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long fullSize() { |
| return storageSize.sum(); |
| } |
| |
| /** |
| * @return {@code True} if there are no items in the store. |
| */ |
| @Override public boolean isEmpty() { |
| return storageSize.sum() == 0; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void updateSize(int cacheId, long delta) { |
| storageSize.add(delta); |
| |
| if (grp.sharedGroup()) { |
| AtomicLong size = cacheSizes.get(cacheId); |
| |
| if (size == null) { |
| AtomicLong old = cacheSizes.putIfAbsent(cacheId, size = new AtomicLong()); |
| |
| if (old != null) |
| size = old; |
| } |
| |
| size.addAndGet(delta); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long nextUpdateCounter() { |
| return pCntr.next(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long initialUpdateCounter() { |
| return pCntr.initial(); |
| } |
| |
| /** {@inheritDoc} |
| * @param start Start. |
| * @param delta Delta. |
| */ |
| @Override public void updateInitialCounter(long start, long delta) { |
| pCntr.updateInitial(start, delta); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long getAndIncrementUpdateCounter(long delta) { |
| return pCntr.reserve(delta); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long updateCounter() { |
| return pCntr.get(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long highestAppliedCounter() { |
| return pCntr.highestAppliedCounter(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long reservedCounter() { |
| return pCntr.reserved(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public PartitionUpdateCounter partUpdateCounter() { |
| return pCntr; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long reserve(long delta) { |
| return pCntr.reserve(delta); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void updateCounter(long val) { |
| try { |
| pCntr.update(val); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to update partition counter. " + |
| "Most probably a node with most actual data is out of topology or data streamer is used " + |
| "in preload mode (allowOverride=false) concurrently with cache transactions [grpName=" + |
| grp.cacheOrGroupName() + ", partId=" + partId + ']', e); |
| |
| if (failNodeOnPartitionInconsistency) |
| grp.shared().kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean updateCounter(long start, long delta) { |
| return pCntr.update(start, delta); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridLongList finalizeUpdateCounters() { |
| return pCntr.finalizeUpdateCounters(); |
| } |
| |
| /** |
| * @param cctx Cache context. |
| * @param oldRow Old row. |
| * @param dataRow New row. |
| * @return {@code True} if it is possible to update old row data. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private boolean canUpdateOldRow(GridCacheContext cctx, @Nullable CacheDataRow oldRow, DataRow dataRow) |
| throws IgniteCheckedException { |
| if (oldRow == null || cctx.queries().enabled()) |
| return false; |
| |
| if (oldRow.expireTime() != dataRow.expireTime()) |
| return false; |
| |
| int oldLen = oldRow.size(); |
| |
| // Use grp.sharedGroup() flag since it is possible cacheId is not yet set here. |
| if (!grp.storeCacheIdInDataPage() && grp.sharedGroup() && oldRow.cacheId() != CU.UNDEFINED_CACHE_ID) |
| oldLen -= 4; |
| |
| if (oldLen > updateValSizeThreshold) |
| return false; |
| |
| int newLen = dataRow.size(); |
| |
| return oldLen == newLen; |
| } |
| |
| /** */ |
| private IgniteCheckedException operationCancelledException() { |
| if (grp.isPreparedToStop()) { |
| return new IgniteCheckedException("Operation has been cancelled (cache group: " + |
| grp.cacheOrGroupName() + " is stopping)."); |
| } |
| else |
| return new NodeStoppingException("Operation has been cancelled (node is stopping)."); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void invoke(GridCacheContext cctx, KeyCacheObject key, OffheapInvokeClosure c) |
| throws IgniteCheckedException { |
| if (!busyLock.enterBusy()) |
| throw operationCancelledException(); |
| |
| int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; |
| |
| try { |
| invoke0(cctx, new SearchRow(cacheId, key), c); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * @param cctx Cache context. |
| * @param row Search row. |
| * @param c Closure. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void invoke0(GridCacheContext cctx, CacheSearchRow row, OffheapInvokeClosure c) |
| throws IgniteCheckedException { |
| assert cctx.shared().database().checkpointLockIsHeldByThread(); |
| |
| dataTree.invoke(row, CacheDataRowAdapter.RowData.NO_KEY, c); |
| |
| switch (c.operationType()) { |
| case PUT: { |
| assert c.newRow() != null : c; |
| |
| CacheDataRow oldRow = c.oldRow(); |
| |
| finishUpdate(cctx, c.newRow(), oldRow, c.oldRowExpiredFlag()); |
| |
| break; |
| } |
| |
| case REMOVE: { |
| CacheDataRow oldRow = c.oldRow(); |
| |
| finishRemove(cctx, row.key(), oldRow); |
| |
| break; |
| } |
| |
| case NOOP: |
| case IN_PLACE: |
| break; |
| |
| default: |
| assert false : c.operationType(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public CacheDataRow createRow( |
| GridCacheContext cctx, |
| KeyCacheObject key, |
| CacheObject val, |
| GridCacheVersion ver, |
| long expireTime, |
| @Nullable CacheDataRow oldRow) throws IgniteCheckedException { |
| int cacheId = grp.storeCacheIdInDataPage() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; |
| |
| DataRow dataRow = makeDataRow(key, val, ver, expireTime, cacheId); |
| |
| if (canUpdateOldRow(cctx, oldRow, dataRow) && rowStore.updateRow(oldRow.link(), dataRow, grp.statisticsHolderData())) |
| dataRow.link(oldRow.link()); |
| else { |
| CacheObjectContext coCtx = cctx.cacheObjectContext(); |
| |
| key.valueBytes(coCtx); |
| val.valueBytes(coCtx); |
| |
| rowStore.addRow(dataRow, grp.statisticsHolderData()); |
| } |
| |
| assert dataRow.link() != 0 : dataRow; |
| |
| if (grp.sharedGroup() && dataRow.cacheId() == CU.UNDEFINED_CACHE_ID) |
| dataRow.cacheId(cctx.cacheId()); |
| |
| return dataRow; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void insertRows(Collection<DataRowCacheAware> rows, |
| IgnitePredicateX<CacheDataRow> initPred) throws IgniteCheckedException { |
| if (!busyLock.enterBusy()) |
| throw operationCancelledException(); |
| |
| try { |
| rowStore.addRows(F.view(rows, row -> row.value() != null), grp.statisticsHolderData()); |
| |
| boolean cacheIdAwareGrp = grp.sharedGroup() || grp.storeCacheIdInDataPage(); |
| |
| for (DataRowCacheAware row : rows) { |
| row.storeCacheId(cacheIdAwareGrp); |
| |
| if (!initPred.applyx(row) && row.value() != null) |
| rowStore.removeRow(row.link(), grp.statisticsHolderData()); |
| } |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * @param key Cache key. |
| * @param val Cache value. |
| * @param ver Version. |
| * @param expireTime Expired time. |
| * @param cacheId Cache id. |
| * @return Made data row. |
| */ |
| @NotNull private DataRow makeDataRow(KeyCacheObject key, CacheObject val, GridCacheVersion ver, long expireTime, |
| int cacheId) { |
| if (key.partition() == -1) |
| key.partition(partId); |
| |
| return new DataRow(key, val, ver, partId, expireTime, cacheId); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void update(GridCacheContext cctx, |
| KeyCacheObject key, |
| CacheObject val, |
| GridCacheVersion ver, |
| long expireTime, |
| @Nullable CacheDataRow oldRow |
| ) throws IgniteCheckedException { |
| assert oldRow == null || oldRow.link() != 0L : oldRow; |
| |
| if (!busyLock.enterBusy()) |
| throw operationCancelledException(); |
| |
| try { |
| int cacheId = grp.storeCacheIdInDataPage() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; |
| |
| assert oldRow == null || oldRow.cacheId() == cacheId : oldRow; |
| |
| DataRow dataRow = makeDataRow(key, val, ver, expireTime, cacheId); |
| |
| CacheObjectContext coCtx = cctx.cacheObjectContext(); |
| |
| // Make sure value bytes initialized. |
| key.valueBytes(coCtx); |
| val.valueBytes(coCtx); |
| |
| CacheDataRow old; |
| |
| assert cctx.shared().database().checkpointLockIsHeldByThread(); |
| |
| if (canUpdateOldRow(cctx, oldRow, dataRow) && rowStore.updateRow(oldRow.link(), dataRow, grp.statisticsHolderData())) { |
| old = oldRow; |
| |
| dataRow.link(oldRow.link()); |
| } |
| else { |
| rowStore.addRow(dataRow, grp.statisticsHolderData()); |
| |
| assert dataRow.link() != 0 : dataRow; |
| |
| if (grp.sharedGroup() && dataRow.cacheId() == CU.UNDEFINED_CACHE_ID) |
| dataRow.cacheId(cctx.cacheId()); |
| |
| if (oldRow != null) { |
| old = oldRow; |
| |
| dataTree.putx(dataRow); |
| } |
| else |
| old = dataTree.put(dataRow); |
| } |
| |
| finishUpdate(cctx, dataRow, old); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * @param cctx Cache context. |
| * @param newRow New row. |
| * @param oldRow Old row if available. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void finishUpdate(GridCacheContext cctx, CacheDataRow newRow, @Nullable CacheDataRow oldRow) |
| throws IgniteCheckedException { |
| finishUpdate(cctx, newRow, oldRow, false); |
| } |
| |
| /** |
| * @param cctx Cache context. |
| * @param newRow New row. |
| * @param oldRow Old row if available. |
| * @param oldRowExpired Old row expiration flag |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void finishUpdate(GridCacheContext cctx, CacheDataRow newRow, @Nullable CacheDataRow oldRow, boolean oldRowExpired) |
| throws IgniteCheckedException { |
| if (oldRow == null && !oldRowExpired) |
| incrementSize(cctx.cacheId()); |
| |
| GridCacheQueryManager qryMgr = cctx.queries(); |
| |
| if (qryMgr.enabled()) |
| qryMgr.store(newRow, oldRow, true); |
| |
| updatePendingEntries(cctx, newRow, oldRow); |
| |
| if (oldRow != null) { |
| assert oldRow.link() != 0 : oldRow; |
| |
| if (newRow.link() != oldRow.link()) |
| rowStore.removeRow(oldRow.link(), grp.statisticsHolderData()); |
| } |
| } |
| |
| /** |
| * @param cctx Cache context. |
| * @param newRow New row. |
| * @param oldRow Old row. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void updatePendingEntries( |
| GridCacheContext cctx, |
| CacheDataRow newRow, |
| @Nullable CacheDataRow oldRow |
| ) throws IgniteCheckedException { |
| long expireTime = newRow.expireTime(); |
| |
| int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; |
| |
| if (oldRow != null) { |
| assert oldRow.link() != 0 : oldRow; |
| |
| if (pendingTree() != null && oldRow.expireTime() != 0) |
| pendingTree().removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link())); |
| } |
| |
| if (pendingTree() != null && expireTime != 0) { |
| pendingTree().putx(new PendingRow(cacheId, expireTime, newRow.link())); |
| |
| if (!cctx.ttl().hasPendingEntries()) |
| cctx.ttl().hasPendingEntries(true); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void remove(GridCacheContext cctx, KeyCacheObject key, int partId) throws IgniteCheckedException { |
| if (!busyLock.enterBusy()) |
| throw operationCancelledException(); |
| |
| try { |
| int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; |
| |
| assert cctx.shared().database().checkpointLockIsHeldByThread(); |
| |
| CacheDataRow oldRow = dataTree.remove(new SearchRow(cacheId, key)); |
| |
| finishRemove(cctx, key, oldRow); |
| } |
| finally { |
| busyLock.leaveBusy(); |
| } |
| } |
| |
| /** |
| * @param cctx Cache context. |
| * @param key Key. |
| * @param oldRow Removed row. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void finishRemove(GridCacheContext cctx, KeyCacheObject key, @Nullable CacheDataRow oldRow) throws IgniteCheckedException { |
| if (oldRow != null) { |
| if (!(key instanceof ExpiredKeyCacheObject) |
| || ((ExpiredKeyCacheObject)key).expireTime != oldRow.expireTime() |
| || ((ExpiredKeyCacheObject)key).link != oldRow.link() |
| ) |
| clearPendingEntries(cctx, oldRow); |
| |
| decrementSize(cctx.cacheId()); |
| } |
| |
| GridCacheQueryManager qryMgr = cctx.queries(); |
| |
| if (qryMgr.enabled()) |
| qryMgr.remove(key, oldRow); |
| |
| if (oldRow != null) |
| rowStore.removeRow(oldRow.link(), grp.statisticsHolderData()); |
| } |
| |
| /** |
| * @param cctx Cache context. |
| * @param oldRow Old row. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void clearPendingEntries(GridCacheContext cctx, CacheDataRow oldRow) |
| throws IgniteCheckedException { |
| int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; |
| |
| assert oldRow.link() != 0 : oldRow; |
| assert cacheId == CU.UNDEFINED_CACHE_ID || oldRow.cacheId() == cacheId : |
| "Incorrect cache ID [expected=" + cacheId + ", actual=" + oldRow.cacheId() + "]."; |
| |
| if (pendingTree() != null && oldRow.expireTime() != 0) |
| pendingTree().removex(new PendingRow(cacheId, oldRow.expireTime(), oldRow.link())); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public CacheDataRow find(GridCacheContext cctx, KeyCacheObject key) throws IgniteCheckedException { |
| key.valueBytes(cctx.cacheObjectContext()); |
| |
| int cacheId = grp.sharedGroup() ? cctx.cacheId() : CU.UNDEFINED_CACHE_ID; |
| |
| CacheDataRow row = dataTree.findOne(new SearchRow(cacheId, key), CacheDataRowAdapter.RowData.NO_KEY); |
| |
| afterRowFound(row, key); |
| |
| return row; |
| } |
| |
| /** |
| * @param row Row. |
| * @param key Key. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void afterRowFound(@Nullable CacheDataRow row, KeyCacheObject key) throws IgniteCheckedException { |
| if (row != null) { |
| row.key(key); |
| |
| grp.dataRegion().evictionTracker().touchPage(row.link()); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridCursor<? extends CacheDataRow> cursor() throws IgniteCheckedException { |
| return dataTree.find(null, null); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridCursor<? extends CacheDataRow> cursor(Object x) throws IgniteCheckedException { |
| return dataTree.find(null, null, x); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridCursor<? extends CacheDataRow> cursor(MvccSnapshot mvccSnapshot) |
| throws IgniteCheckedException { |
| |
| return dataTree.find(null, null); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId) throws IgniteCheckedException { |
| return cursor(cacheId, null, null); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId, |
| MvccSnapshot mvccSnapshot) throws IgniteCheckedException { |
| return cursor(cacheId, null, null, null, mvccSnapshot); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId, KeyCacheObject lower, |
| KeyCacheObject upper) throws IgniteCheckedException { |
| return cursor(cacheId, lower, upper, null); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId, KeyCacheObject lower, |
| KeyCacheObject upper, Object x) throws IgniteCheckedException { |
| return cursor(cacheId, lower, upper, null, null); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridCursor<? extends CacheDataRow> cursor(int cacheId, KeyCacheObject lower, |
| KeyCacheObject upper, Object x, MvccSnapshot snapshot) throws IgniteCheckedException { |
| SearchRow lowerRow; |
| SearchRow upperRow; |
| |
| if (grp.sharedGroup()) { |
| assert cacheId != CU.UNDEFINED_CACHE_ID; |
| |
| lowerRow = lower != null ? new SearchRow(cacheId, lower) : new SearchRow(cacheId); |
| upperRow = upper != null ? new SearchRow(cacheId, upper) : new SearchRow(cacheId); |
| } |
| else { |
| lowerRow = lower != null ? new SearchRow(CU.UNDEFINED_CACHE_ID, lower) : null; |
| upperRow = upper != null ? new SearchRow(CU.UNDEFINED_CACHE_ID, upper) : null; |
| } |
| |
| return dataTree.find(lowerRow, upperRow, x); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void destroy() throws IgniteCheckedException { |
| AtomicReference<IgniteCheckedException> exRef = new AtomicReference<>(); |
| |
| dataTree.destroy(row -> { |
| try { |
| rowStore.removeRow(row.link(), grp.statisticsHolderData()); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to remove row [link=" + row.link() + "]"); |
| |
| IgniteCheckedException ex = exRef.get(); |
| |
| if (ex == null) |
| exRef.set(e); |
| else |
| ex.addSuppressed(e); |
| } |
| }, false); |
| |
| if (exRef.get() != null) |
| throw new IgniteCheckedException("Failed to destroy store", exRef.get()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void markDestroyed() { |
| dataTree.markDestroyed(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean destroyed() { |
| return dataTree.destroyed(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void clear(int cacheId) throws IgniteCheckedException { |
| assert cacheId != CU.UNDEFINED_CACHE_ID; |
| |
| if (cacheSize(cacheId) == 0) |
| return; |
| |
| Exception ex = null; |
| |
| GridCursor<? extends CacheDataRow> cur = |
| cursor(cacheId, null, null, CacheDataRowAdapter.RowData.KEY_ONLY); |
| |
| int rmv = 0; |
| |
| while (cur.next()) { |
| if (++rmv == BATCH_SIZE) { |
| grp.shared().database().checkpointReadUnlock(); |
| |
| rmv = 0; |
| |
| grp.shared().database().checkpointReadLock(); |
| } |
| |
| CacheDataRow row = cur.get(); |
| |
| assert row.link() != 0 : row; |
| |
| try { |
| boolean res = dataTree.removex(row); |
| |
| assert res : row; |
| |
| rowStore.removeRow(row.link(), grp.statisticsHolderData()); |
| |
| decrementSize(cacheId); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Fail remove row [link=" + row.link() + "]"); |
| |
| if (ex == null) |
| ex = e; |
| else |
| ex.addSuppressed(e); |
| } |
| } |
| |
| if (ex != null) |
| throw new IgniteCheckedException("Fail destroy store", ex); |
| |
| // Allow checkpointer to progress if a partition contains less than BATCH_SIZE keys. |
| grp.shared().database().checkpointReadUnlock(); |
| |
| grp.shared().database().checkpointReadLock(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public RowStore rowStore() { |
| return rowStore; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void setRowCacheCleaner(GridQueryRowCacheCleaner rowCacheCleaner) { |
| this.rowCacheCleaner = rowCacheCleaner; |
| } |
| |
| /** |
| * @param size Size to init. |
| * @param updCntr Update counter. |
| * @param cacheSizes Cache sizes if store belongs to group containing multiple caches. |
| * @param cntrUpdData Counter updates. |
| */ |
| public void restoreState(long size, long updCntr, @Nullable Map<Integer, Long> cacheSizes, byte[] cntrUpdData) { |
| pCntr.init(updCntr, cntrUpdData); |
| |
| storageSize.reset(); |
| storageSize.add(size); |
| |
| if (cacheSizes != null) { |
| for (Map.Entry<Integer, Long> e : cacheSizes.entrySet()) |
| this.cacheSizes.put(e.getKey(), new AtomicLong(e.getValue())); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public PendingEntriesTree pendingTree() { |
| return pendingEntries.get(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void preload() throws IgniteCheckedException { |
| // No-op. |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void resetUpdateCounter() { |
| pCntr.reset(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void resetInitialUpdateCounter() { |
| pCntr.resetInitialCounter(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public PartitionMetaStorage<SimpleDataRow> partStorage() { |
| return null; |
| } |
| } |
| |
| /** |
| * This entry key is used to indicate that an expired entry has already been deleted from |
| * PendingEntriesTree and doesn't need to participate in PendingEntriesTree cleanup again. |
| */ |
| private static class ExpiredKeyCacheObject extends KeyCacheObjectImpl { |
| /** Serial version uid. */ |
| private static final long serialVersionUID = 0L; |
| |
| /** */ |
| private long expireTime; |
| |
| /** */ |
| private long link; |
| |
| /** */ |
| private ExpiredKeyCacheObject(KeyCacheObjectImpl keyCacheObj, long expireTime, long link) { |
| super(keyCacheObj.val, keyCacheObj.valBytes, keyCacheObj.partition()); |
| |
| this.expireTime = expireTime; |
| |
| this.link = link; |
| } |
| |
| /** */ |
| public ExpiredKeyCacheObject() { |
| } |
| } |
| } |