| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.distributed.dht.topology; |
| |
| import java.util.Collection; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.NavigableSet; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentLinkedDeque; |
| import java.util.concurrent.ConcurrentMap; |
| import java.util.concurrent.CopyOnWriteArrayList; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.stream.Collectors; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.IgniteSystemProperties; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.failure.FailureContext; |
| import org.apache.ignite.failure.FailureType; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.NodeStoppingException; |
| import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.cache.CacheGroupContext; |
| import org.apache.ignite.internal.processors.cache.GridCacheAdapter; |
| import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMapImpl; |
| import org.apache.ignite.internal.processors.cache.GridCacheContext; |
| import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; |
| import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; |
| import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory; |
| import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; |
| import org.apache.ignite.internal.processors.cache.KeyCacheObject; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; |
| import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras; |
| import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; |
| import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; |
| import org.apache.ignite.internal.processors.cache.transactions.TxCounters; |
| import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; |
| import org.apache.ignite.internal.util.GridLongList; |
| import org.apache.ignite.internal.util.collection.IntMap; |
| import org.apache.ignite.internal.util.collection.IntRWHashMap; |
| import org.apache.ignite.internal.util.future.GridFinishedFuture; |
| import org.apache.ignite.internal.util.future.GridFutureAdapter; |
| import org.apache.ignite.internal.util.lang.GridIterator; |
| import org.apache.ignite.internal.util.tostring.GridToStringExclude; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.internal.LT; |
| import org.apache.ignite.internal.util.typedef.internal.S; |
| import org.apache.ignite.internal.util.typedef.internal.SB; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteInClosure; |
| import org.apache.ignite.util.deque.FastSizeDeque; |
| import org.jetbrains.annotations.NotNull; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE; |
| import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_REMOVED_ENTRIES_TTL; |
| import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_UNLOADED; |
| import static org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager.CacheDataStore; |
| import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.EVICTED; |
| import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST; |
| import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING; |
| import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING; |
| import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.RENTING; |
| import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED; |
| |
| /** |
| * Key partition. |
| */ |
| public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements Comparable<GridDhtLocalPartition>, GridReservable { |
| /** */ |
| private static final GridCacheMapEntryFactory ENTRY_FACTORY = GridDhtCacheEntry::new; |
| |
| /** @see IgniteSystemProperties#IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE */ |
| public static final int DFLT_ATOMIC_CACHE_DELETE_HISTORY_SIZE = 200_000; |
| |
| /** Maximum size for delete queue. */ |
| public static final int MAX_DELETE_QUEUE_SIZE = |
| Integer.getInteger(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, DFLT_ATOMIC_CACHE_DELETE_HISTORY_SIZE); |
| |
| /** @see IgniteSystemProperties#IGNITE_CACHE_REMOVED_ENTRIES_TTL */ |
| public static final int DFLT_CACHE_REMOVE_ENTRIES_TTL = 10_000; |
| |
| /** ONLY FOR TEST PURPOSES: force test checkpoint on partition eviction. */ |
| private static boolean forceTestCheckpointOnEviction = IgniteSystemProperties.getBoolean("TEST_CHECKPOINT_ON_EVICTION", false); |
| |
| /** ONLY FOR TEST PURPOSES: partition id where test checkpoint was enforced during eviction. */ |
| static volatile Integer partWhereTestCheckpointEnforced; |
| |
| /** Maximum size for {@link #rmvQueue}. */ |
| private final int rmvQueueMaxSize; |
| |
| /** Removed items TTL. */ |
| private final long rmvdEntryTtl; |
| |
| /** Static logger to avoid re-creation. */ |
| private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); |
| |
| /** Logger. */ |
| private static volatile IgniteLogger log; |
| |
| /** Partition ID. */ |
| private final int id; |
| |
| /** State. 32 bits - size, 16 bits - reservations, 13 bits - reserved, 3 bits - GridDhtPartitionState. */ |
| @GridToStringExclude |
| private final AtomicLong state = new AtomicLong((long)MOVING.ordinal() << 32); |
| |
| /** Rent future. */ |
| @GridToStringExclude |
| private final GridFutureAdapter<?> rent; |
| |
| /** */ |
| @GridToStringExclude |
| private final GridCacheSharedContext ctx; |
| |
| /** */ |
| @GridToStringExclude |
| private final CacheGroupContext grp; |
| |
| /** Create time. */ |
| @GridToStringExclude |
| private final long createTime = U.currentTimeMillis(); |
| |
| /** */ |
| @GridToStringExclude |
| private final IntMap<CacheMapHolder> cacheMaps; |
| |
| /** */ |
| @GridToStringExclude |
| private final CacheMapHolder singleCacheEntryMap; |
| |
| /** Remove queue. */ |
| @GridToStringExclude |
| private final FastSizeDeque<RemovedEntryHolder> rmvQueue = new FastSizeDeque<>(new ConcurrentLinkedDeque<>()); |
| |
| /** Group reservations. */ |
| @GridToStringExclude |
| private final CopyOnWriteArrayList<GridDhtPartitionsReservation> reservations = new CopyOnWriteArrayList<>(); |
| |
| /** */ |
| @GridToStringExclude |
| private volatile CacheDataStore store; |
| |
| /** Set if failed to move partition to RENTING state due to reservations, to be checked when |
| * reservation is released. */ |
| private volatile boolean delayedRenting; |
| |
| /** */ |
| private final AtomicReference<GridFutureAdapter<?>> finishFutRef = new AtomicReference<>(); |
| |
| /** */ |
| private volatile long clearVer; |
| |
| /** |
| * @param ctx Context. |
| * @param grp Cache group. |
| * @param id Partition ID. |
| * @param recovery Flag indicates that partition is created during recovery phase. |
| */ |
| public GridDhtLocalPartition( |
| GridCacheSharedContext ctx, |
| CacheGroupContext grp, |
| int id, |
| boolean recovery |
| ) { |
| super(ENTRY_FACTORY); |
| |
| this.id = id; |
| this.ctx = ctx; |
| this.grp = grp; |
| |
| log = U.logger(ctx.kernalContext(), logRef, this); |
| |
| if (grp.sharedGroup()) { |
| singleCacheEntryMap = null; |
| cacheMaps = new IntRWHashMap<>(); |
| } |
| else { |
| GridCacheContext cctx = grp.singleCacheContext(); |
| |
| if (cctx.isNear()) |
| cctx = cctx.near().dht().context(); |
| |
| singleCacheEntryMap = ctx.kernalContext().resource().resolve( |
| new CacheMapHolder(cctx, createEntriesMap())); |
| |
| cacheMaps = null; |
| } |
| |
| rent = new GridFutureAdapter<Object>() { |
| @Override public String toString() { |
| return "PartitionRentFuture [part=" + GridDhtLocalPartition.this + ']'; |
| } |
| }; |
| |
| int delQueueSize = grp.systemCache() ? 100 : |
| Math.max(MAX_DELETE_QUEUE_SIZE / grp.affinity().partitions(), 20); |
| |
| rmvQueueMaxSize = U.ceilPow2(delQueueSize); |
| |
| rmvdEntryTtl = Long.getLong(IGNITE_CACHE_REMOVED_ENTRIES_TTL, DFLT_CACHE_REMOVE_ENTRIES_TTL); |
| |
| try { |
| store = grp.offheap().createCacheDataStore(id); |
| |
| // Log partition creation for further crash recovery purposes. |
| if (grp.walEnabled() && !recovery) |
| ctx.wal().log(new PartitionMetaStateRecord(grp.groupId(), id, state(), 0)); |
| |
| // Inject row cache cleaner on store creation. |
| // Used in case the cache with enabled SqlOnheapCache is single cache at the cache group. |
| if (ctx.kernalContext().query().moduleEnabled()) |
| store.setRowCacheCleaner(ctx.kernalContext().indexProcessor().rowCacheCleaner(grp.groupId())); |
| } |
| catch (IgniteCheckedException e) { |
| // TODO ignite-db |
| throw new IgniteException(e); |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Partition has been created [grp=" + grp.cacheOrGroupName() |
| + ", p=" + id + ", state=" + state() + "]"); |
| |
| clearVer = ctx.versions().localOrder(); |
| } |
| |
| /** |
| * @return Entries map. |
| */ |
| private ConcurrentMap<KeyCacheObject, GridCacheMapEntry> createEntriesMap() { |
| return new ConcurrentHashMap<>(Math.max(10, GridCacheAdapter.DFLT_START_CACHE_SIZE / grp.affinity().partitions()), |
| 0.75f, |
| Runtime.getRuntime().availableProcessors() * 2); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int internalSize() { |
| if (grp.sharedGroup()) { |
| final AtomicInteger size = new AtomicInteger(0); |
| |
| cacheMaps.forEach((key, hld) -> size.addAndGet(hld.map.size())); |
| |
| return size.get(); |
| } |
| |
| return singleCacheEntryMap.map.size(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected CacheMapHolder entriesMap(GridCacheContext cctx) { |
| if (grp.sharedGroup()) |
| return cacheMapHolder(cctx); |
| |
| return singleCacheEntryMap; |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override protected CacheMapHolder entriesMapIfExists(Integer cacheId) { |
| return grp.sharedGroup() ? cacheMaps.get(cacheId) : singleCacheEntryMap; |
| } |
| |
| /** |
| * @param cctx Cache context. |
| * @return Map holder. |
| */ |
| private CacheMapHolder cacheMapHolder(GridCacheContext cctx) { |
| assert grp.sharedGroup(); |
| |
| CacheMapHolder hld = cacheMaps.get(cctx.cacheIdBoxed()); |
| |
| if (hld != null) |
| return hld; |
| |
| if (cctx.isNear()) |
| cctx = cctx.near().dht().context(); |
| |
| CacheMapHolder old = cacheMaps.putIfAbsent(cctx.cacheIdBoxed(), hld = ctx.kernalContext().resource().resolve( |
| new CacheMapHolder(cctx, createEntriesMap()))); |
| |
| if (old != null) |
| hld = old; |
| |
| return hld; |
| } |
| |
| /** |
| * @return Data store. |
| */ |
| public CacheDataStore dataStore() { |
| return store; |
| } |
| |
| /** |
| * Adds group reservation to this partition. |
| * |
| * @param r Reservation. |
| * @return {@code false} If such reservation already added. |
| */ |
| public boolean addReservation(GridDhtPartitionsReservation r) { |
| assert (getPartState(state.get())) != EVICTED : "we can reserve only active partitions"; |
| assert (getReservations(state.get())) != 0 : "partition must be already reserved before adding group reservation"; |
| |
| return reservations.addIfAbsent(r); |
| } |
| |
| /** |
| * @param r Reservation. |
| */ |
| public void removeReservation(GridDhtPartitionsReservation r) { |
| if (!reservations.remove(r)) |
| throw new IllegalStateException("Reservation was already removed."); |
| } |
| |
| /** |
| * @return Partition ID. |
| */ |
| public int id() { |
| return id; |
| } |
| |
| /** |
| * @return Create time. |
| */ |
| public long createTime() { |
| return createTime; |
| } |
| |
| /** |
| * @return Partition state. |
| */ |
| public GridDhtPartitionState state() { |
| return getPartState(state.get()); |
| } |
| |
| /** |
| * @return Reservations. |
| */ |
| public int reservations() { |
| return getReservations(state.get()); |
| } |
| |
| /** |
| * @return {@code True} if partition is empty. |
| */ |
| public boolean isEmpty() { |
| return store.isEmpty() && internalSize() == 0; |
| } |
| |
| /** |
| * @return If partition is moving or owning or renting. |
| */ |
| public boolean valid() { |
| GridDhtPartitionState state = state(); |
| |
| return state == MOVING || state == OWNING || state == RENTING; |
| } |
| |
| /** |
| * @param entry Entry to remove. |
| */ |
| public void onRemoved(GridDhtCacheEntry entry) { |
| assert entry.obsolete() : entry; |
| |
| // Make sure to remove exactly this entry. |
| removeEntry(entry); |
| } |
| |
| /** |
| * @param cacheId Cache ID. |
| * @param key Key. |
| * @param ver Version. |
| */ |
| private void removeVersionedEntry(int cacheId, KeyCacheObject key, GridCacheVersion ver) { |
| CacheMapHolder hld = grp.sharedGroup() ? cacheMaps.get(cacheId) : singleCacheEntryMap; |
| |
| GridCacheMapEntry entry = hld != null ? hld.map.get(key) : null; |
| |
| if (entry != null && entry.markObsoleteVersion(ver)) |
| removeEntry(entry); |
| } |
| |
| /** |
| * TODO FIXME Get rid of deferred delete queue https://issues.apache.org/jira/browse/IGNITE-11704 |
| */ |
| void cleanupRemoveQueue() { |
| if (state() == MOVING) { |
| if (rmvQueue.sizex() >= rmvQueueMaxSize) { |
| LT.warn(log, "Deletion queue cleanup for moving partition was delayed until rebalance is finished. " + |
| "[grpId=" + this.grp.groupId() + |
| ", partId=" + id() + |
| ", grpParts=" + this.grp.affinity().partitions() + |
| ", maxRmvQueueSize=" + rmvQueueMaxSize + ']'); |
| } |
| |
| return; |
| } |
| |
| while (rmvQueue.sizex() >= rmvQueueMaxSize) { |
| RemovedEntryHolder item = rmvQueue.pollFirst(); |
| |
| if (item != null) |
| removeVersionedEntry(item.cacheId(), item.key(), item.version()); |
| } |
| |
| if (!grp.isDrEnabled()) { |
| RemovedEntryHolder item = rmvQueue.peekFirst(); |
| |
| while (item != null && item.expireTime() < U.currentTimeMillis()) { |
| item = rmvQueue.pollFirst(); |
| |
| if (item == null) |
| break; |
| |
| removeVersionedEntry(item.cacheId(), item.key(), item.version()); |
| |
| item = rmvQueue.peekFirst(); |
| } |
| } |
| } |
| |
| /** |
| * @param cacheId cacheId Cache ID. |
| * @param key Removed key. |
| * @param ver Removed version. |
| */ |
| public void onDeferredDelete(int cacheId, KeyCacheObject key, GridCacheVersion ver) { |
| cleanupRemoveQueue(); |
| |
| rmvQueue.add(new RemovedEntryHolder(cacheId, key, ver, rmvdEntryTtl)); |
| } |
| |
| /** |
| * Reserves the partition so it won't be cleared or evicted. |
| * Only MOVING, OWNING and LOST partitions can be reserved. |
| * |
| * @return {@code True} if reserved. |
| */ |
| @Override public boolean reserve() { |
| while (true) { |
| long state = this.state.get(); |
| |
| int ordinal = ordinal(state); |
| |
| if (ordinal == RENTING.ordinal() || ordinal == EVICTED.ordinal()) |
| return false; |
| |
| long newState = setReservations(state, getReservations(state) + 1); |
| |
| if (this.state.compareAndSet(state, newState)) |
| return true; |
| } |
| } |
| |
| /** |
| * Releases previously reserved partition. |
| */ |
| @Override public void release() { |
| release0(0); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void release(int sizeChange, CacheMapHolder hld, GridCacheEntryEx e) { |
| if (grp.sharedGroup() && sizeChange != 0) |
| hld.size.addAndGet(sizeChange); |
| |
| release0(sizeChange); |
| } |
| |
| /** |
| * @param sizeChange Size change delta. |
| */ |
| private void release0(int sizeChange) { |
| while (true) { |
| long state = this.state.get(); |
| |
| int reservations = getReservations(state); |
| |
| assert reservations > 0; |
| |
| assert getPartState(state) != EVICTED : this; |
| |
| long newState = setReservations(state, --reservations); |
| newState = setSize(newState, getSize(newState) + sizeChange); |
| |
| assert getSize(newState) == getSize(state) + sizeChange; |
| |
| // Decrement reservations. |
| if (this.state.compareAndSet(state, newState)) { |
| // If no more reservations try to continue delayed renting. |
| if (reservations == 0) |
| tryContinueClearing(); |
| |
| return; |
| } |
| } |
| } |
| |
| /** |
| * @param stateToRestore State to restore. |
| */ |
| public void restoreState(GridDhtPartitionState stateToRestore) { |
| state.set(setPartState(state.get(), stateToRestore)); |
| } |
| |
| /** |
| * For testing purposes only. |
| * @param toState State to set. |
| */ |
| public void setState(GridDhtPartitionState toState) { |
| if (grp.persistenceEnabled() && grp.walEnabled()) { |
| synchronized (this) { |
| long state0 = state.get(); |
| |
| this.state.compareAndSet(state0, setPartState(state0, toState)); |
| |
| try { |
| ctx.wal().log(new PartitionMetaStateRecord(grp.groupId(), id, toState, 0)); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Error while writing to log", e); |
| } |
| } |
| } |
| else |
| restoreState(toState); |
| } |
| |
| /** |
| * @param state Current aggregated value. |
| * @param toState State to switch to. |
| * @return {@code true} if cas succeeds. |
| */ |
| private boolean casState(long state, GridDhtPartitionState toState) { |
| if (grp.persistenceEnabled() && grp.walEnabled()) { |
| synchronized (this) { |
| GridDhtPartitionState prevState = state(); |
| |
| boolean updated = this.state.compareAndSet(state, setPartState(state, toState)); |
| |
| if (updated) { |
| assert toState != EVICTED || reservations() == 0 : this; |
| |
| try { |
| // Optimization: do not log OWNING -> OWNING. |
| if (prevState == OWNING && toState == LOST) |
| return true; |
| |
| // Log LOST partitions as OWNING. |
| ctx.wal().log( |
| new PartitionMetaStateRecord(grp.groupId(), id, toState == LOST ? OWNING : toState, 0)); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to log partition state change to WAL.", e); |
| |
| ctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e)); |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Partition changed state [grp=" + grp.cacheOrGroupName() |
| + ", p=" + id + ", prev=" + prevState + ", to=" + toState + "]"); |
| } |
| |
| return updated; |
| } |
| } |
| else { |
| GridDhtPartitionState prevState = state(); |
| |
| boolean updated = this.state.compareAndSet(state, setPartState(state, toState)); |
| |
| if (updated) { |
| assert toState != EVICTED || reservations() == 0 : this; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Partition changed state [grp=" + grp.cacheOrGroupName() |
| + ", p=" + id + ", prev=" + prevState + ", to=" + toState + "]"); |
| } |
| |
| return updated; |
| } |
| } |
| |
| /** |
| * @return {@code True} if transitioned to OWNING state. |
| */ |
| public boolean own() { |
| while (true) { |
| long state = this.state.get(); |
| |
| GridDhtPartitionState partState = getPartState(state); |
| if (partState == RENTING || partState == EVICTED) |
| return false; |
| |
| if (partState == OWNING) |
| return true; |
| |
| assert partState == MOVING || partState == LOST; |
| |
| if (casState(state, OWNING)) |
| return true; |
| } |
| } |
| |
| /** |
| * Forcibly moves partition to a MOVING state. |
| * |
| * @return {@code True} if a partition was switched to MOVING state. |
| */ |
| public boolean moving() { |
| while (true) { |
| long state = this.state.get(); |
| |
| GridDhtPartitionState partState = getPartState(state); |
| |
| if (partState == EVICTED) |
| return false; |
| |
| assert partState == OWNING || partState == RENTING : |
| "Only partitions in state OWNING or RENTING can be moved to MOVING state " + partState + " " + id; |
| |
| if (casState(state, MOVING)) { |
| // The state is switched under global topology lock, safe to record version here. |
| updateClearVersion(); |
| |
| return true; |
| } |
| } |
| } |
| |
| /** |
| * Records a version for row clearing. Must be called when a partition is marked for full rebalancing. |
| * @see #clearAll(EvictionContext) |
| */ |
| public void updateClearVersion() { |
| clearVer = ctx.versions().localOrder(); |
| } |
| |
| /** |
| * @return {@code True} if partition state changed. |
| */ |
| public boolean markLost() { |
| while (true) { |
| long state = this.state.get(); |
| |
| GridDhtPartitionState partState = getPartState(state); |
| |
| if (partState == LOST) |
| return false; |
| |
| if (casState(state, LOST)) |
| return true; |
| } |
| } |
| |
| /** |
| * Initiates partition eviction process and returns an eviction future. |
| * Future will be completed when a partition is moved to EVICTED state (possibly not yet physically deleted). |
| * |
| * If partition has reservations, eviction will be delayed and continued after all reservations will be released. |
| * |
| * @return Future to signal that this node is no longer an owner or backup or null if corresponding partition |
| * state is {@code RENTING} or {@code EVICTED}. |
| */ |
| public IgniteInternalFuture<?> rent() { |
| long state0 = this.state.get(); |
| |
| GridDhtPartitionState partState = getPartState(state0); |
| |
| if (partState == EVICTED) |
| return rent; |
| |
| if (partState == RENTING) { |
| // If for some reason a partition has stuck in renting state try restart clearing. |
| if (finishFutRef.get() == null) |
| clearAsync(); |
| |
| return rent; |
| } |
| |
| if (tryInvalidateGroupReservations() && getReservations(state0) == 0 && casState(state0, RENTING)) { |
| // Evict asynchronously, as the 'rent' method may be called from within write locks on local partition. |
| clearAsync(); |
| } |
| else |
| delayedRenting = true; |
| |
| return rent; |
| } |
| |
| /** |
| * Continue clearing if it was delayed before due to reservation and topology version not changed. |
| */ |
| public void tryContinueClearing() { |
| if (delayedRenting) |
| group().topology().rent(id); |
| } |
| |
| /** |
| * Initiates a partition clearing attempt. |
| * |
| * @return A future what will be finished then a current clearing attempt is done. |
| */ |
| public IgniteInternalFuture<?> clearAsync() { |
| long state = this.state.get(); |
| |
| GridDhtPartitionState partState = getPartState(state); |
| |
| boolean evictionRequested = partState == RENTING; |
| boolean clearingRequested = partState == MOVING; |
| |
| if (!evictionRequested && !clearingRequested) |
| return new GridFinishedFuture<>(); |
| |
| GridFutureAdapter<?> finishFut = new GridFutureAdapter<>(); |
| |
| do { |
| GridFutureAdapter<?> curFut = finishFutRef.get(); |
| |
| if (curFut != null) |
| return curFut; |
| } |
| while (!finishFutRef.compareAndSet(null, finishFut)); |
| |
| finishFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() { |
| @Override public void apply(IgniteInternalFuture<?> fut) { |
| // A partition cannot be reused after the eviction, it's not necessary to reset a clearing state. |
| if (state() == EVICTED) |
| rent.onDone(fut.error()); |
| else |
| finishFutRef.set(null); |
| } |
| }); |
| |
| // Evict partition asynchronously to avoid deadlocks. |
| ctx.evict().evictPartitionAsync(grp, this, finishFut); |
| |
| return finishFut; |
| } |
| |
| /** |
| * Invalidates all partition group reservations, so they can't be reserved again any more. |
| * |
| * @return {@code true} If all group reservations are invalidated (or no such reservations). |
| */ |
| private boolean tryInvalidateGroupReservations() { |
| for (GridDhtPartitionsReservation reservation : reservations) { |
| if (!reservation.invalidate()) |
| return false; // Failed to invalidate reservation -> we are reserved. |
| } |
| |
| return true; |
| } |
| |
| /** |
| * @param state State. |
| * @return {@code True} if partition has no reservations and empty. |
| */ |
| private boolean freeAndEmpty(long state) { |
| return isEmpty() && getSize(state) == 0 && getReservations(state) == 0; |
| } |
| |
| /** |
| * Moves partition state to {@code EVICTED} if possible. |
| */ |
| public void finishEviction() { |
| long state0 = this.state.get(); |
| |
| GridDhtPartitionState state = getPartState(state0); |
| |
| // Some entries still might be present in partition cache maps due to concurrent updates on backup nodes, |
| // but it's safe to finish eviction because no physical updates are possible. |
| // A partition is promoted to EVICTED state if it is not reserved and empty. |
| if (store.isEmpty() && getReservations(state0) == 0 && state == RENTING) |
| casState(state0, EVICTED); |
| } |
| |
| /** |
| * @return {@code True} if clearing process is running at the moment on the partition. |
| */ |
| public boolean isClearing() { |
| return finishFutRef.get() != null; |
| } |
| |
| /** |
| * On partition unlock callback. |
| * Tries to continue delayed partition clearing. |
| */ |
| public void onUnlock() { |
| // No-op. |
| } |
| |
| /** |
| * @param topVer Topology version. |
| * @return {@code True} if local node is primary for this partition. |
| */ |
| public boolean primary(AffinityTopologyVersion topVer) { |
| List<ClusterNode> nodes = grp.affinity().cachedAffinity(topVer).get(id); |
| |
| return !nodes.isEmpty() && ctx.localNode().equals(nodes.get(0)); |
| } |
| |
| /** |
| * @param topVer Topology version. |
| * @return {@code True} if local node is backup for this partition. |
| */ |
| public boolean backup(AffinityTopologyVersion topVer) { |
| List<ClusterNode> nodes = grp.affinity().cachedAffinity(topVer).get(id); |
| |
| return nodes.indexOf(ctx.localNode()) > 0; |
| } |
| |
| /** |
| * Returns new update counter for primary node or passed counter for backup node. |
| * <p> |
| * Used for non-tx cases. |
| * <p> |
| * Counter generation/update logic is delegated to counter implementation. |
| * |
| * @param cacheId ID of cache initiated counter update. |
| * @param topVer Topology version for current operation. |
| * @param init {@code True} if initial update. |
| * @return Next update index. |
| */ |
| public long nextUpdateCounter(int cacheId, AffinityTopologyVersion topVer, boolean primary, boolean init, |
| @Nullable Long primaryCntr) { |
| long nextCntr; |
| |
| if (primaryCntr == null) // Primary node. |
| nextCntr = store.nextUpdateCounter(); |
| else { |
| assert !init : "Initial update must generate a counter for partition " + this; |
| |
| // Backup. |
| assert primaryCntr != 0; |
| |
| store.updateCounter(nextCntr = primaryCntr); |
| } |
| |
| if (grp.sharedGroup()) |
| grp.onPartitionCounterUpdate(cacheId, id, nextCntr, topVer, primary); |
| |
| return nextCntr; |
| } |
| |
| /** |
| * Used for transactions. |
| * |
| * @param cacheId Cache id. |
| * @param tx Tx. |
| * @param primaryCntr Primary counter. |
| */ |
| public long nextUpdateCounter(int cacheId, IgniteInternalTx tx, @Nullable Long primaryCntr) { |
| Long nextCntr; |
| |
| if (primaryCntr != null) |
| nextCntr = primaryCntr; |
| else { |
| TxCounters txCounters = tx.txCounters(false); |
| |
| assert txCounters != null : "Must have counters for tx [nearXidVer=" + tx.nearXidVersion() + ']'; |
| |
| // Null must never be returned on primary node. |
| nextCntr = txCounters.generateNextCounter(cacheId, id()); |
| |
| assert nextCntr != null : this; |
| } |
| |
| if (grp.sharedGroup()) |
| grp.onPartitionCounterUpdate(cacheId, id, nextCntr, tx.topologyVersion(), tx.local()); |
| |
| return nextCntr; |
| } |
| |
| /** |
| * @return Current update counter (LWM). |
| */ |
| public long updateCounter() { |
| return store.updateCounter(); |
| } |
| |
| /** |
| * @return Current reserved counter (HWM). |
| */ |
| public long reservedCounter() { |
| return store.reservedCounter(); |
| } |
| |
| /** |
| * @param val Update counter value. |
| */ |
| public void updateCounter(long val) { |
| store.updateCounter(val); |
| } |
| |
| /** |
| * @return Initial update counter. |
| */ |
| public long initialUpdateCounter() { |
| return store.initialUpdateCounter(); |
| } |
| |
| /** |
| * Increments cache update counter on primary node. |
| * |
| * @param delta Value to be added to update counter. |
| * @return Update counter value before update. |
| */ |
| public long getAndIncrementUpdateCounter(long delta) { |
| return store.getAndIncrementUpdateCounter(delta); |
| } |
| |
| /** |
| * Updates MVCC cache update counter on backup node. |
| * |
| * @param start Start position |
| * @param delta Delta. |
| */ |
| public boolean updateCounter(long start, long delta) { |
| return store.updateCounter(start, delta); |
| } |
| |
| /** |
| * Reset partition update counter. |
| */ |
| public void resetUpdateCounter() { |
| store.resetUpdateCounter(); |
| } |
| |
| /** |
| * Reset partition initial update counter. |
| */ |
| public void resetInitialUpdateCounter() { |
| store.resetInitialUpdateCounter(); |
| } |
| |
| /** |
| * @return Total size of all caches. |
| */ |
| public long fullSize() { |
| return store.fullSize(); |
| } |
| |
| /** |
| * Removes all entries and rows from this partition. |
| * |
| * @return Number of rows cleared from page memory. |
| * @throws NodeStoppingException If node stopping. |
| */ |
| protected long clearAll(EvictionContext evictionCtx) throws NodeStoppingException { |
| long order = clearVer; |
| |
| GridCacheVersion clearVer = ctx.versions().startVersion(); |
| |
| GridCacheObsoleteEntryExtras extras = new GridCacheObsoleteEntryExtras(clearVer); |
| |
| boolean rec = grp.eventRecordable(EVT_CACHE_REBALANCE_OBJECT_UNLOADED); |
| |
| long cleared = 0; |
| int stopCntr = 0; |
| |
| CacheMapHolder hld = grp.sharedGroup() ? null : singleCacheEntryMap; |
| |
| try { |
| GridIterator<CacheDataRow> it0 = grp.offheap().partitionIterator(id); |
| |
| while (it0.hasNext()) { |
| if ((stopCntr = (stopCntr + 1) & 1023) == 0 && evictionCtx.shouldStop()) |
| return cleared; |
| |
| ctx.database().checkpointReadLock(); |
| |
| try { |
| CacheDataRow row = it0.next(); |
| |
| // Do not clear fresh rows in case of partition reloading. |
| // This is required because normal updates are possible to moving partition which is currently cleared. |
| // We can clean OWNING partition if a partition has been reset from lost state. |
| // In this case new updates must be preserved. |
| // Partition state can be switched from RENTING to MOVING and vice versa during clearing. |
| long order0 = row.version().order(); |
| |
| if (state() == MOVING && (order0 == 0 /** Inserted by isolated updater. */ || order0 > order)) |
| continue; |
| |
| if (grp.sharedGroup() && (hld == null || hld.cctx.cacheId() != row.cacheId())) |
| hld = cacheMapHolder(ctx.cacheContext(row.cacheId())); |
| |
| assert hld != null; |
| |
| GridCacheMapEntry cached = putEntryIfObsoleteOrAbsent( |
| hld, |
| hld.cctx, |
| grp.affinity().lastVersion(), |
| row.key(), |
| true, |
| true); |
| |
| assert cached != null : "Expecting the reservation " + this; |
| |
| if (cached.deleted()) |
| continue; |
| |
| if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry) cached).clearInternal(clearVer, extras)) { |
| removeEntry(cached); |
| |
| if (rec && !hld.cctx.config().isEventsDisabled()) { |
| hld.cctx.events().addEvent(cached.partition(), |
| cached.key(), |
| ctx.localNodeId(), |
| null, |
| null, |
| null, |
| EVT_CACHE_REBALANCE_OBJECT_UNLOADED, |
| null, |
| false, |
| cached.rawGet(), |
| cached.hasValue(), |
| null, |
| null, |
| null, |
| false); |
| } |
| |
| cleared++; |
| } |
| } |
| catch (GridDhtInvalidPartitionException e) { |
| assert isEmpty() && state() == EVICTED : "Invalid error [e=" + e + ", part=" + this + ']'; |
| |
| break; // Partition is already concurrently cleared and evicted. |
| } |
| finally { |
| ctx.database().checkpointReadUnlock(); |
| } |
| } |
| |
| if (forceTestCheckpointOnEviction) { |
| if (partWhereTestCheckpointEnforced == null && cleared >= fullSize()) { |
| ctx.database().forceCheckpoint("test").futureFor(FINISHED).get(); |
| |
| log.warning("Forced checkpoint by test reasons for partition: " + this); |
| |
| partWhereTestCheckpointEnforced = id; |
| } |
| } |
| |
| // Attempt to destroy. |
| ((GridDhtPreloader)grp.preloader()).tryFinishEviction(this); |
| } |
| catch (NodeStoppingException e) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to get iterator for evicted partition: " + id); |
| |
| throw e; |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to get iterator for evicted partition: " + id, e); |
| } |
| |
| return cleared; |
| } |
| |
| /** |
| * Removes all deferred delete requests from {@code rmvQueue}. |
| */ |
| public void clearDeferredDeletes() { |
| for (RemovedEntryHolder e : rmvQueue) |
| removeVersionedEntry(e.cacheId(), e.key(), e.version()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int hashCode() { |
| return 31 * id + grp.groupId(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean equals(Object o) { |
| if (this == o) |
| return true; |
| |
| if (o == null || getClass() != o.getClass()) |
| return false; |
| |
| GridDhtLocalPartition part = (GridDhtLocalPartition)o; |
| |
| return id == part.id && grp.groupId() == part.group().groupId(); |
| } |
| |
    /** {@inheritDoc} */
    @Override public int compareTo(@NotNull GridDhtLocalPartition part) {
        // NOTE(review): the @NotNull annotation makes this null check dead code, and the
        // Comparable contract specifies throwing NPE for null arguments rather than
        // ordering nulls. Kept as-is to preserve existing behavior — confirm before removing.
        if (part == null)
            return 1;

        // Order partitions by id; consistent with equals() only within a single group.
        return Integer.compare(id, part.id());
    }
| |
    /** {@inheritDoc} */
    @Override public String toString() {
        // Note: fullSize() delegates to the data store, so rendering this string touches storage.
        return S.toString(GridDhtLocalPartition.class, this,
            "grp", grp.cacheOrGroupName(),
            "state", state(),
            "reservations", reservations(),
            "empty", isEmpty(),
            "createTime", U.format(createTime),
            "fullSize", fullSize(),
            "cntr", dataStore().partUpdateCounter());
    }
| |
| /** {@inheritDoc} */ |
| @Override public int publicSize(int cacheId) { |
| if (grp.sharedGroup()) { |
| CacheMapHolder hld = cacheMaps.get(cacheId); |
| |
| return hld != null ? hld.size.get() : 0; |
| } |
| |
| return getSize(state.get()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void incrementPublicSize(@Nullable CacheMapHolder hld, GridCacheEntryEx e) { |
| if (grp.sharedGroup()) { |
| if (hld == null) |
| hld = cacheMapHolder(e.context()); |
| |
| hld.size.incrementAndGet(); |
| } |
| |
| while (true) { |
| long state = this.state.get(); |
| |
| if (this.state.compareAndSet(state, setSize(state, getSize(state) + 1))) |
| return; |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void decrementPublicSize(@Nullable CacheMapHolder hld, GridCacheEntryEx e) { |
| if (grp.sharedGroup()) { |
| if (hld == null) |
| hld = cacheMapHolder(e.context()); |
| |
| hld.size.decrementAndGet(); |
| } |
| |
| while (true) { |
| long state = this.state.get(); |
| |
| assert getPartState(state) != EVICTED; |
| |
| if (this.state.compareAndSet(state, setSize(state, getSize(state) - 1))) |
| return; |
| } |
| } |
| |
| /** |
| * Returns group context. |
| * |
| * @return Group context. |
| */ |
| public CacheGroupContext group() { |
| return grp; |
| } |
| |
| /** |
| * @param cacheId Cache ID. |
| */ |
| public void onCacheStopped(int cacheId) { |
| assert grp.sharedGroup() : grp.cacheOrGroupName(); |
| |
| for (Iterator<RemovedEntryHolder> it = rmvQueue.iterator(); it.hasNext();) { |
| RemovedEntryHolder e = it.next(); |
| |
| if (e.cacheId() == cacheId) |
| it.remove(); |
| } |
| |
| cacheMaps.remove(cacheId); |
| } |
| |
| /** |
| * @param state Composite state. |
| * @return Partition state. |
| */ |
| private static GridDhtPartitionState getPartState(long state) { |
| return GridDhtPartitionState.fromOrdinal((int)(state & (0x0000000000000007L))); |
| } |
| |
| /** |
| * @param state State. |
| */ |
| private static int ordinal(long state) { |
| return (int)(state & (0x0000000000000007L)); |
| } |
| |
| /** |
| * @param state Composite state to update. |
| * @param partState Partition state. |
| * @return Updated composite state. |
| */ |
| private static long setPartState(long state, GridDhtPartitionState partState) { |
| return (state & (~0x0000000000000007L)) | partState.ordinal(); |
| } |
| |
| /** |
| * @param state Composite state. |
| * @return Reservations. |
| */ |
| private static int getReservations(long state) { |
| return (int)((state & 0x00000000FFFF0000L) >> 16); |
| } |
| |
| /** |
| * @param state Composite state to update. |
| * @param reservations Reservations to set. |
| * @return Updated composite state. |
| */ |
| private static long setReservations(long state, int reservations) { |
| return (state & (~0x00000000FFFF0000L)) | (reservations << 16); |
| } |
| |
| /** |
| * @param state Composite state. |
| * @return Size. |
| */ |
| private static int getSize(long state) { |
| return (int)((state & 0xFFFFFFFF00000000L) >> 32); |
| } |
| |
| /** |
| * @param state Composite state to update. |
| * @param size Size to set. |
| * @return Updated composite state. |
| */ |
| private static long setSize(long state, int size) { |
| return (state & (~0xFFFFFFFF00000000L)) | ((long)size << 32); |
| } |
| |
| /** |
| * Flushes pending update counters closing all possible gaps. |
| * |
| * @return Even-length array of pairs [start, end] for each gap. |
| */ |
| public GridLongList finalizeUpdateCounters() { |
| return store.finalizeUpdateCounters(); |
| } |
| |
| /** |
| * Called before next batch is about to be applied during rebalance. Currently used for tests. |
| * |
| * @param last {@code True} if last batch for partition. |
| */ |
| public void beforeApplyBatch(boolean last) { |
| // No-op. |
| } |
| |
| /** |
| * Removed entry holder. |
| */ |
| private static class RemovedEntryHolder { |
| /** */ |
| private final int cacheId; |
| |
| /** Cache key */ |
| private final KeyCacheObject key; |
| |
| /** Entry version */ |
| private final GridCacheVersion ver; |
| |
| /** Entry expire time. */ |
| private final long expireTime; |
| |
| /** |
| * @param cacheId Cache ID. |
| * @param key Key. |
| * @param ver Entry version. |
| * @param ttl TTL. |
| */ |
| private RemovedEntryHolder(int cacheId, KeyCacheObject key, GridCacheVersion ver, long ttl) { |
| this.cacheId = cacheId; |
| this.key = key; |
| this.ver = ver; |
| |
| expireTime = U.currentTimeMillis() + ttl; |
| } |
| |
| /** |
| * @return Cache ID. |
| */ |
| int cacheId() { |
| return cacheId; |
| } |
| |
| /** |
| * @return Key. |
| */ |
| KeyCacheObject key() { |
| return key; |
| } |
| |
| /** |
| * @return Version. |
| */ |
| GridCacheVersion version() { |
| return ver; |
| } |
| |
| /** |
| * @return item expired time |
| */ |
| long expireTime() { |
| return expireTime; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(RemovedEntryHolder.class, this); |
| } |
| } |
| |
| /** |
| * Collects detailed info about the partition. |
| * |
| * @param buf Buffer. |
| */ |
| public void dumpDebugInfo(SB buf) { |
| GridDhtPartitionTopology top = grp.topology(); |
| AffinityTopologyVersion topVer = top.readyTopologyVersion(); |
| |
| if (!topVer.initialized()) { |
| buf.a(toString()); |
| |
| return; |
| } |
| |
| final int limit = 3; |
| |
| buf.a("[topVer=").a(topVer); |
| buf.a(", lastChangeTopVer=").a(top.lastTopologyChangeVersion()); |
| buf.a(", waitRebalance=").a(ctx.kernalContext().cache().context().affinity().waitRebalance(grp.groupId(), id)); |
| buf.a(", nodes=").a(F.nodeIds(top.nodes(id, topVer)).stream().limit(limit).collect(Collectors.toList())); |
| buf.a(", locPart=").a(toString()); |
| |
| NavigableSet<AffinityTopologyVersion> versions = grp.affinity().cachedVersions(); |
| |
| int i = 5; |
| |
| Iterator<AffinityTopologyVersion> iter = versions.descendingIterator(); |
| |
| while (--i >= 0 && iter.hasNext()) { |
| AffinityTopologyVersion topVer0 = iter.next(); |
| buf.a(", ver").a(i).a('=').a(topVer0); |
| |
| Collection<UUID> nodeIds = F.nodeIds(grp.affinity().cachedAffinity(topVer0).get(id)); |
| buf.a(", affOwners").a(i).a('=').a(nodeIds.stream().limit(limit).collect(Collectors.toList())); |
| } |
| |
| buf.a(']'); |
| } |
| } |