/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed.dht.topology;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.pagemem.wal.record.delta.PartitionMetaStateRecord;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheAdapter;
import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMapImpl;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntry;
import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridReservable;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionDemander;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader;
import org.apache.ignite.internal.processors.cache.extras.GridCacheObsoleteEntryExtras;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
import org.apache.ignite.internal.processors.cache.transactions.TxCounters;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.query.GridQueryRowCacheCleaner;
import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.collection.IntMap;
import org.apache.ignite.internal.util.collection.IntRWHashMap;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridIterator;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.util.deque.FastSizeDeque;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_REMOVED_ENTRIES_TTL;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_UNLOADED;
import static org.apache.ignite.internal.processors.cache.IgniteCacheOffheapManager.CacheDataStore;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.EVICTED;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.RENTING;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionsEvictManager.EvictReason.CLEARING;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.PartitionsEvictManager.EvictReason.EVICTION;
import static org.apache.ignite.internal.processors.cache.persistence.CheckpointState.FINISHED;
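/*
 * Partition lifecycle sketch (as implemented below): a partition starts in MOVING state,
 * is promoted to OWNING via own() or marked LOST via markLost(), may be demoted back to
 * MOVING via moving(), and is retired via rent() -> RENTING -> clearing -> EVICTED ->
 * destroy(). Reservations taken with reserve()/release() delay the RENTING and EVICTED
 * transitions until the last reservation is released.
 */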
/**
* Local partition of a DHT cache group.
*/
public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements Comparable<GridDhtLocalPartition>, GridReservable {
/** */
private static final GridCacheMapEntryFactory ENTRY_FACTORY = GridDhtCacheEntry::new;
/** Maximum size for delete queue. */
public static final int MAX_DELETE_QUEUE_SIZE = Integer.getInteger(IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE, 200_000);
/** ONLY FOR TEST PURPOSES: force test checkpoint on partition eviction. */
private static boolean forceTestCheckpointOnEviction = IgniteSystemProperties.getBoolean("TEST_CHECKPOINT_ON_EVICTION", false);
/** ONLY FOR TEST PURPOSES: partition id where test checkpoint was enforced during eviction. */
static volatile Integer partWhereTestCheckpointEnforced;
/** Maximum size for {@link #rmvQueue}. */
private final int rmvQueueMaxSize;
/** Removed items TTL. */
private final long rmvdEntryTtl;
/** Static logger to avoid re-creation. */
private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
/** Logger. */
private static volatile IgniteLogger log;
/** Partition ID. */
private final int id;
/** State. 32 bits - size, 16 bits - reservations, 13 bits - reserved, 3 bits - GridDhtPartitionState. */
@GridToStringExclude
private final AtomicLong state = new AtomicLong((long)MOVING.ordinal() << 32);
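// Layout of the composite state word, per the masks in getPartState()/getReservations()/getSize() below:
// bits 63..32 - size, bits 31..16 - reservations, bits 15..3 - reserved, bits 2..0 - state ordinal.
// E.g. a partition with size 5, 2 reservations and state ordinal 1 packs to (5L << 32) | (2L << 16) | 1.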
/** Evict guard. Must be CASed to -1 only when partition state is EVICTED. */
@GridToStringExclude
private final AtomicInteger evictGuard = new AtomicInteger();
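// Eviction guard protocol: 0 - no clearing thread, 1 - a clearing thread is running
// (addEvicting()/clearEvicting()), -1 - partition is marked for destroy (markForDestroy()).
// The CAS semantics guarantee a partition can be marked for destroy only while no clearing thread is active.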
/** Rent future. */
@GridToStringExclude
private final GridFutureAdapter<?> rent;
/** Clear future. */
@GridToStringExclude
private final ClearFuture clearFuture;
/** */
@GridToStringExclude
private final GridCacheSharedContext ctx;
/** */
@GridToStringExclude
private final CacheGroupContext grp;
/** Create time. */
@GridToStringExclude
private final long createTime = U.currentTimeMillis();
/** */
@GridToStringExclude
private final IntMap<CacheMapHolder> cacheMaps;
/** */
@GridToStringExclude
private final CacheMapHolder singleCacheEntryMap;
/** Remove queue. */
@GridToStringExclude
private final FastSizeDeque<RemovedEntryHolder> rmvQueue = new FastSizeDeque<>(new ConcurrentLinkedDeque<>());
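// Holds recently removed entries for a short time (see rmvdEntryTtl) so that out-of-order
// updates arriving on backups cannot resurrect an already removed entry; drained by
// cleanupRemoveQueue().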
/** Group reservations. */
@GridToStringExclude
private final CopyOnWriteArrayList<GridDhtPartitionsReservation> reservations = new CopyOnWriteArrayList<>();
/** */
@GridToStringExclude
private volatile CacheDataStore store;
/** Topology version on which renting was delayed because the partition still had reservations;
 * checked when a reservation is released. */
private volatile long delayedRentingTopVer;
/** Set if topology update sequence should be updated on partition destroy. */
private boolean updateSeqOnDestroy;
/**
* @param ctx Context.
* @param grp Cache group.
* @param id Partition ID.
* @param recovery Flag indicating that the partition is created during the recovery phase.
*/
public GridDhtLocalPartition(
GridCacheSharedContext ctx,
CacheGroupContext grp,
int id,
boolean recovery
) {
super(ENTRY_FACTORY);
this.id = id;
this.ctx = ctx;
this.grp = grp;
log = U.logger(ctx.kernalContext(), logRef, this);
if (grp.sharedGroup()) {
singleCacheEntryMap = null;
cacheMaps = new IntRWHashMap<>();
}
else {
singleCacheEntryMap = new CacheMapHolder(grp.singleCacheContext(), createEntriesMap());
cacheMaps = null;
}
rent = new GridFutureAdapter<Object>() {
@Override public String toString() {
return "PartitionRentFuture [part=" + GridDhtLocalPartition.this + ']';
}
};
clearFuture = new ClearFuture();
int delQueueSize = grp.systemCache() ? 100 :
Math.max(MAX_DELETE_QUEUE_SIZE / grp.affinity().partitions(), 20);
rmvQueueMaxSize = U.ceilPow2(delQueueSize);
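// E.g. with the default MAX_DELETE_QUEUE_SIZE of 200_000 and 1024 partitions (an assumed
// affinity setting) the per-partition budget is 195, rounded up to 256 by U.ceilPow2.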
rmvdEntryTtl = Long.getLong(IGNITE_CACHE_REMOVED_ENTRIES_TTL, 10_000);
try {
store = grp.offheap().createCacheDataStore(id);
// Log partition creation for further crash recovery purposes.
if (grp.walEnabled() && !recovery)
ctx.wal().log(new PartitionMetaStateRecord(grp.groupId(), id, state(), 0));
// Inject row cache cleaner on store creation.
// Used when a cache with SQL on-heap cache enabled is the single cache in the cache group.
if (ctx.kernalContext().query().moduleEnabled()) {
GridQueryRowCacheCleaner cleaner = ctx.kernalContext().query().getIndexing()
.rowCacheCleaner(grp.groupId());
if (store != null && cleaner != null)
store.setRowCacheCleaner(cleaner);
}
}
catch (IgniteCheckedException e) {
// TODO ignite-db
throw new IgniteException(e);
}
if (log.isDebugEnabled())
log.debug("Partition has been created [grp=" + grp.cacheOrGroupName()
+ ", p=" + id + ", state=" + state() + "]");
}
/**
* @return Entries map.
*/
private ConcurrentMap<KeyCacheObject, GridCacheMapEntry> createEntriesMap() {
return new ConcurrentHashMap<>(Math.max(10, GridCacheAdapter.DFLT_START_CACHE_SIZE / grp.affinity().partitions()),
0.75f,
Runtime.getRuntime().availableProcessors() * 2);
}
/** {@inheritDoc} */
@Override public int internalSize() {
if (grp.sharedGroup()) {
final AtomicInteger size = new AtomicInteger(0);
cacheMaps.forEach((key, hld) -> size.addAndGet(hld.map.size()));
return size.get();
}
return singleCacheEntryMap.map.size();
}
/** {@inheritDoc} */
@Override protected CacheMapHolder entriesMap(GridCacheContext cctx) {
if (grp.sharedGroup())
return cacheMapHolder(cctx);
return singleCacheEntryMap;
}
/** {@inheritDoc} */
@Nullable @Override protected CacheMapHolder entriesMapIfExists(Integer cacheId) {
return grp.sharedGroup() ? cacheMaps.get(cacheId) : singleCacheEntryMap;
}
/**
* @param cctx Cache context.
* @return Map holder.
*/
private CacheMapHolder cacheMapHolder(GridCacheContext cctx) {
assert grp.sharedGroup();
CacheMapHolder hld = cacheMaps.get(cctx.cacheIdBoxed());
if (hld != null)
return hld;
CacheMapHolder old = cacheMaps.putIfAbsent(cctx.cacheIdBoxed(), hld = new CacheMapHolder(cctx, createEntriesMap()));
if (old != null)
hld = old;
return hld;
}
/**
* @return Data store.
*/
public CacheDataStore dataStore() {
return store;
}
/**
* Adds group reservation to this partition.
*
* @param r Reservation.
* @return {@code False} if such a reservation has already been added.
*/
public boolean addReservation(GridDhtPartitionsReservation r) {
assert (getPartState(state.get())) != EVICTED : "we can reserve only active partitions";
assert (getReservations(state.get())) != 0 : "partition must be already reserved before adding group reservation";
return reservations.addIfAbsent(r);
}
/**
* @param r Reservation.
*/
public void removeReservation(GridDhtPartitionsReservation r) {
if (!reservations.remove(r))
throw new IllegalStateException("Reservation was already removed.");
}
/**
* @return Partition ID.
*/
public int id() {
return id;
}
/**
* @return Create time.
*/
public long createTime() {
return createTime;
}
/**
* @return Partition state.
*/
public GridDhtPartitionState state() {
return getPartState(state.get());
}
/**
* @return Reservations.
*/
public int reservations() {
return getReservations(state.get());
}
/**
* @return {@code True} if partition is empty.
*/
public boolean isEmpty() {
return store.isEmpty() && internalSize() == 0;
}
/**
* @return {@code True} if partition is in MOVING, OWNING or RENTING state.
*/
public boolean valid() {
GridDhtPartitionState state = state();
return state == MOVING || state == OWNING || state == RENTING;
}
/**
* @param entry Entry to remove.
*/
public void onRemoved(GridDhtCacheEntry entry) {
assert entry.obsolete() : entry;
// Make sure to remove exactly this entry.
removeEntry(entry);
}
/**
* @param cacheId Cache ID.
* @param key Key.
* @param ver Version.
*/
private void removeVersionedEntry(int cacheId, KeyCacheObject key, GridCacheVersion ver) {
CacheMapHolder hld = grp.sharedGroup() ? cacheMaps.get(cacheId) : singleCacheEntryMap;
GridCacheMapEntry entry = hld != null ? hld.map.get(key) : null;
if (entry != null && entry.markObsoleteVersion(ver))
removeEntry(entry);
}
/**
* TODO FIXME Get rid of deferred delete queue https://issues.apache.org/jira/browse/IGNITE-11704
*/
public void cleanupRemoveQueue() {
if (state() == MOVING) {
if (rmvQueue.sizex() >= rmvQueueMaxSize) {
LT.warn(log, "Deletion queue cleanup for moving partition was delayed until rebalance is finished. " +
"[grpId=" + this.grp.groupId() +
", partId=" + id() +
", grpParts=" + this.grp.affinity().partitions() +
", maxRmvQueueSize=" + rmvQueueMaxSize + ']');
}
return;
}
while (rmvQueue.sizex() >= rmvQueueMaxSize) {
RemovedEntryHolder item = rmvQueue.pollFirst();
if (item != null)
removeVersionedEntry(item.cacheId(), item.key(), item.version());
}
if (!grp.isDrEnabled()) {
RemovedEntryHolder item = rmvQueue.peekFirst();
while (item != null && item.expireTime() < U.currentTimeMillis()) {
item = rmvQueue.pollFirst();
if (item == null)
break;
removeVersionedEntry(item.cacheId(), item.key(), item.version());
item = rmvQueue.peekFirst();
}
}
}
/**
* @param cacheId Cache ID.
* @param key Removed key.
* @param ver Removed version.
*/
public void onDeferredDelete(int cacheId, KeyCacheObject key, GridCacheVersion ver) {
cleanupRemoveQueue();
rmvQueue.add(new RemovedEntryHolder(cacheId, key, ver, rmvdEntryTtl));
}
/**
* Reserves a partition so it won't be cleared or evicted.
*
* @return {@code True} if reserved.
*/
@Override public boolean reserve() {
while (true) {
long state = this.state.get();
if (getPartState(state) == EVICTED)
return false;
long newState = setReservations(state, getReservations(state) + 1);
if (this.state.compareAndSet(state, newState))
return true;
}
}
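/*
 * A minimal usage sketch (not from this file): callers pin a partition before reading its
 * data and release it in a finally block, so delayed renting or clearing can resume once
 * the last reservation is gone:
 *
 *   if (part.reserve()) {
 *       try {
 *           // ... read partition data ...
 *       }
 *       finally {
 *           part.release();
 *       }
 *   }
 */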
/**
* Releases previously reserved partition.
*/
@Override public void release() {
release0(0);
}
/** {@inheritDoc} */
@Override protected void release(int sizeChange, CacheMapHolder hld, GridCacheEntryEx e) {
if (grp.sharedGroup() && sizeChange != 0)
hld.size.addAndGet(sizeChange);
release0(sizeChange);
}
/**
* @param sizeChange Size change delta.
*/
private void release0(int sizeChange) {
while (true) {
long state = this.state.get();
int reservations = getReservations(state);
assert reservations > 0;
assert getPartState(state) != EVICTED : this;
long newState = setReservations(state, --reservations);
newState = setSize(newState, getSize(newState) + sizeChange);
assert getSize(newState) == getSize(state) + sizeChange;
// Decrement reservations.
if (this.state.compareAndSet(state, newState)) {
// If no more reservations try to continue delayed renting.
if (reservations == 0) {
if (delayedRentingTopVer != 0 &&
// Prevents delayed renting on topology which expects ownership.
delayedRentingTopVer == ctx.exchange().readyAffinityVersion().topologyVersion())
rent(true);
else if (getPartState(state) == RENTING)
tryContinueClearing();
}
return;
}
}
}
/**
* @param stateToRestore State to restore.
*/
public void restoreState(GridDhtPartitionState stateToRestore) {
state.set(setPartState(state.get(), stateToRestore));
}
/**
* For testing purposes only.
* @param toState State to set.
*/
public void setState(GridDhtPartitionState toState) {
if (grp.persistenceEnabled() && grp.walEnabled()) {
synchronized (this) {
long state0 = state.get();
this.state.compareAndSet(state0, setPartState(state0, toState));
try {
ctx.wal().log(new PartitionMetaStateRecord(grp.groupId(), id, toState, 0));
}
catch (IgniteCheckedException e) {
U.error(log, "Error while writing to log", e);
}
}
}
else
restoreState(toState);
}
/**
* @param state Current aggregated value.
* @param toState State to switch to.
* @return {@code true} if cas succeeds.
*/
private boolean casState(long state, GridDhtPartitionState toState) {
if (grp.persistenceEnabled() && grp.walEnabled()) {
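// Synchronize so that the state CAS and the WAL record are applied atomically with respect
// to other persistent state changes; this keeps PartitionMetaStateRecord entries in the WAL
// in the same order as the actual state transitions.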
synchronized (this) {
GridDhtPartitionState prevState = state();
boolean update = this.state.compareAndSet(state, setPartState(state, toState));
if (update) {
assert toState != EVICTED || reservations() == 0 : this;
try {
ctx.wal().log(new PartitionMetaStateRecord(grp.groupId(), id, toState, 0));
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to log partition state change to WAL.", e);
ctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
}
if (log.isDebugEnabled())
log.debug("Partition changed state [grp=" + grp.cacheOrGroupName()
+ ", p=" + id + ", prev=" + prevState + ", to=" + toState + "]");
}
return update;
}
}
else {
GridDhtPartitionState prevState = state();
boolean update = this.state.compareAndSet(state, setPartState(state, toState));
if (update) {
assert toState != EVICTED || reservations() == 0 : this;
if (log.isDebugEnabled())
log.debug("Partition changed state [grp=" + grp.cacheOrGroupName()
+ ", p=" + id + ", prev=" + prevState + ", to=" + toState + "]");
}
return update;
}
}
/**
* @return {@code True} if transitioned to OWNING state.
*/
public boolean own() {
while (true) {
long state = this.state.get();
GridDhtPartitionState partState = getPartState(state);
if (partState == RENTING || partState == EVICTED)
return false;
if (partState == OWNING)
return true;
assert partState == MOVING || partState == LOST;
if (casState(state, OWNING))
return true;
}
}
/**
* Forcibly moves partition to a MOVING state.
*/
public void moving() {
while (true) {
long state = this.state.get();
GridDhtPartitionState partState = getPartState(state);
assert partState == OWNING || partState == RENTING : "Only partitions in state OWNING or RENTING can be moved to MOVING state";
if (casState(state, MOVING))
break;
}
}
/**
* @return {@code True} if partition state changed.
*/
public boolean markLost() {
while (true) {
long state = this.state.get();
GridDhtPartitionState partState = getPartState(state);
if (partState == LOST)
return false;
if (casState(state, LOST))
return true;
}
}
/**
* Initiates partition eviction process.
*
* If partition has reservations, eviction is delayed and resumed after all reservations are released.
*
* @param updateSeq If {@code true} topology update sequence will be updated after eviction is finished.
* @return Future to signal that this node is no longer an owner or backup.
*/
public IgniteInternalFuture<?> rent(boolean updateSeq) {
return rent(updateSeq, true);
}
/**
* Initiates partition eviction process.
*
* If partition has reservations, eviction is delayed and resumed after all reservations are released.
*
* @param updateSeq If {@code true} topology update sequence will be updated after eviction is finished.
* @param alwaysReturnRentingFut If {@code true} the renting future is returned even if the partition is already
* {@code RENTING} or {@code EVICTED}.
* @return Future to signal that this node is no longer an owner or backup, or {@code null} if the partition is
* already {@code RENTING} or {@code EVICTED} and {@code alwaysReturnRentingFut} is {@code false}.
*/
public IgniteInternalFuture<?> rent(boolean updateSeq, boolean alwaysReturnRentingFut) {
long state0 = this.state.get();
GridDhtPartitionState partState = getPartState(state0);
if (partState == RENTING || partState == EVICTED)
return alwaysReturnRentingFut ? rent : null;
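// Remember the topology version on which renting was requested: if reservations currently
// prevent the RENTING transition, the last release0() call on the same topology version
// will re-invoke rent() (see release0).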
delayedRentingTopVer = ctx.exchange().readyAffinityVersion().topologyVersion();
if (getReservations(state0) == 0 && casState(state0, RENTING)) {
delayedRentingTopVer = 0;
// Evict asynchronously, as the 'rent' method may be called
// from within write locks on local partition.
clearAsync0(updateSeq);
}
return rent;
}
/**
* Starts the clearing process asynchronously if it is requested and not already running.
* The method may finish the clearing process ahead of time if the partition is empty and has no reservations.
*
* @param updateSeq Update sequence.
*/
private void clearAsync0(boolean updateSeq) {
// Method expected to be called from exchange worker or rebalancing thread when rebalancing is done.
long state = this.state.get();
GridDhtPartitionState partState = getPartState(state);
boolean evictionRequested = partState == RENTING;
boolean clearingRequested = partState == MOVING;
if (!evictionRequested && !clearingRequested)
return;
boolean reinitialized = clearFuture.initialize(updateSeq, evictionRequested);
// Clearing process is already running at the moment. No need to run it again.
if (!reinitialized)
return;
// Reset the initial update counter value to prevent historical rebalancing on this partition.
if (grp.persistenceEnabled())
store.resetInitialUpdateCounter();
// Make sure current rebalance future is finished before start clearing
// to avoid clearing currently rebalancing partition (except "initial" dummy rebalance).
if (clearingRequested) {
GridDhtPartitionDemander.RebalanceFuture rebFut =
(GridDhtPartitionDemander.RebalanceFuture)grp.preloader().rebalanceFuture();
if (!rebFut.isInitial() && !rebFut.isDone()) {
rebFut.listen(fut -> {
// Partition could be owned after rebalance future is done. Skip clearing in such case.
// Otherwise continue clearing.
if (fut.error() == null && state() == MOVING) {
if (freeAndEmpty(state) && !grp.queriesEnabled() && !groupReserved()) {
clearFuture.finish();
return;
}
ctx.evict().evictPartitionAsync(grp, this, CLEARING);
}
});
return;
}
}
// Try fast eviction.
if (freeAndEmpty(state) && !grp.queriesEnabled() && !groupReserved()) {
if (partState == RENTING && casState(state, EVICTED) || clearingRequested) {
clearFuture.finish();
if (state() == EVICTED && markForDestroy()) {
updateSeqOnDestroy = updateSeq;
destroy();
}
if (log.isDebugEnabled() && evictionRequested)
log.debug("Partition has been fast evicted [grp=" + grp.cacheOrGroupName()
+ ", p=" + id + ", state=" + state() + "]");
return;
}
}
ctx.evict().evictPartitionAsync(grp, this, EVICTION);
}
/**
* Initiates single clear process if partition is in MOVING state, or continues clearing for RENTING state.
* Method does nothing if clear process is already running.
*
* IMPORTANT: if clearing is required, the clear future must be initialized by the time this method returns.
* This ensures that clearing happens before demand requests are sent.
*/
public void clearAsync() {
GridDhtPartitionState state0 = state();
if (state0 != MOVING && state0 != RENTING)
return;
clearAsync0(false);
}
/**
* Continues delayed clearing of partition if possible.
* Clearing may be delayed because of existing reservations.
*/
public void tryContinueClearing() {
clearAsync0(true);
}
/**
* @return {@code true} If there is a group reservation.
*/
private boolean groupReserved() {
for (GridDhtPartitionsReservation reservation : reservations) {
if (!reservation.invalidate())
return true; // Failed to invalidate reservation -> we are reserved.
}
return false;
}
/**
* @param state State.
* @return {@code True} if partition has no reservations and empty.
*/
private boolean freeAndEmpty(long state) {
return isEmpty() && getSize(state) == 0 && getReservations(state) == 0;
}
/**
* @return {@code True} if evicting thread was added.
*/
private boolean addEvicting() {
return evictGuard.compareAndSet(0, 1);
}
/**
* Clears evicting flag.
*/
private void clearEvicting() {
evictGuard.set(0);
}
/**
* @return {@code True} if partition is marked for destroy.
*/
public boolean markForDestroy() {
return evictGuard.compareAndSet(0, -1);
}
/**
* Moves partition state to {@code EVICTED} if possible and initiates the partition destroy process
* after the state transition succeeds.
*
* @param updateSeq If {@code true} increment update sequence on cache group topology after successful eviction.
*/
private void finishEviction(boolean updateSeq) {
long state0 = this.state.get();
GridDhtPartitionState state = getPartState(state0);
// Some entries still might be present in partition cache maps due to concurrent updates on backup nodes,
// but it's safe to finish eviction because no physical updates are possible.
if (state == EVICTED ||
(store.isEmpty() && getReservations(state0) == 0 && state == RENTING && casState(state0, EVICTED)))
updateSeqOnDestroy = updateSeq;
}
/**
* Destroys partition data store and invokes appropriate callbacks.
*/
public void destroy() {
assert state() == EVICTED : this;
assert reservations() == 0;
assert evictGuard.get() == -1;
grp.onPartitionEvicted(id);
destroyCacheDataStore();
rent.onDone();
((GridDhtPreloader)grp.preloader()).onPartitionEvicted(this, updateSeqOnDestroy);
clearDeferredDeletes();
}
/**
* Awaits completion of the partition destroy process if the partition is in {@code EVICTED} state.
*/
public void awaitDestroy() {
if (state() != EVICTED)
return;
final long timeout = 10_000;
for (;;) {
try {
rent.get(timeout);
break;
}
catch (IgniteFutureTimeoutCheckedException ignored) {
U.warn(log, "Failed to await partition destroy within timeout " + this);
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to await partition destroy " + this, e);
}
}
}
/**
* Adds listener on {@link #clearFuture} finish.
*
* @param lsnr Listener.
*/
public void onClearFinished(IgniteInClosure<? super IgniteInternalFuture<?>> lsnr) {
clearFuture.listen(lsnr);
}
/**
* @return {@code True} if clearing process is running at the moment on the partition.
*/
public boolean isClearing() {
return !clearFuture.isDone();
}
/**
* Tries to start the partition clearing process (see {@link GridDhtLocalPartition#clearAll(EvictionContext)}).
* Only one thread is allowed to perform clearing at a time.
* When clearing completes, the method finishes {@code clearFuture}.
*
* @param evictionCtx Eviction context.
*
* @return {@code false} if clearing is not started due to existing reservations.
* @throws NodeStoppingException If node is stopping.
*/
public boolean tryClear(EvictionContext evictionCtx) throws NodeStoppingException {
if (clearFuture.isDone())
return true;
long state = this.state.get();
if (getReservations(state) != 0 || groupReserved())
return false;
if (addEvicting()) {
try {
// Attempt to evict partition entries from cache.
long clearedEntities = clearAll(evictionCtx);
if (log.isDebugEnabled())
log.debug("Partition has been cleared [grp=" + grp.cacheOrGroupName()
+ ", p=" + id + ", state=" + state() + ", clearedCnt=" + clearedEntities + "]");
}
catch (NodeStoppingException e) {
clearFuture.finish(e);
throw e;
}
finally {
clearEvicting();
clearFuture.finish();
}
}
return true;
}
/**
* Release created data store for this partition.
*/
private void destroyCacheDataStore() {
try {
grp.offheap().destroyCacheDataStore(dataStore());
}
catch (IgniteCheckedException e) {
log.error("Unable to destroy cache data store on partition eviction [id=" + id + "]", e);
}
}
/**
* On partition unlock callback.
* Tries to continue delayed partition clearing.
*/
public void onUnlock() {
// No-op.
}
/**
* @param topVer Topology version.
* @return {@code True} if local node is primary for this partition.
*/
public boolean primary(AffinityTopologyVersion topVer) {
List<ClusterNode> nodes = grp.affinity().cachedAffinity(topVer).get(id);
return !nodes.isEmpty() && ctx.localNode().equals(nodes.get(0));
}
/**
* @param topVer Topology version.
* @return {@code True} if local node is backup for this partition.
*/
public boolean backup(AffinityTopologyVersion topVer) {
List<ClusterNode> nodes = grp.affinity().cachedAffinity(topVer).get(id);
return nodes.indexOf(ctx.localNode()) > 0;
}
/**
* Returns new update counter for primary node or passed counter for backup node.
* <p>
* Used for non-tx cases.
* <p>
* Counter generation/update logic is delegated to counter implementation.
*
* @param cacheId ID of cache initiated counter update.
* @param topVer Topology version for current operation.
* @param primary {@code True} if update is initiated on the primary node.
* @param init {@code True} if initial update.
* @param primaryCntr Counter assigned by the primary node, or {@code null} if this node is the primary.
* @return Next update counter.
*/
public long nextUpdateCounter(int cacheId, AffinityTopologyVersion topVer, boolean primary, boolean init,
@Nullable Long primaryCntr) {
long nextCntr;
if (primaryCntr == null) // Primary node.
nextCntr = store.nextUpdateCounter();
else {
assert !init : "Initial update must generate a counter for partition " + this;
// Backup.
assert primaryCntr != 0;
store.updateCounter(nextCntr = primaryCntr);
}
if (grp.sharedGroup())
grp.onPartitionCounterUpdate(cacheId, id, nextCntr, topVer, primary);
return nextCntr;
}
/**
* Used for transactions.
*
* @param cacheId Cache id.
* @param tx Tx.
* @param primaryCntr Primary counter.
* @return Next update counter.
*/
public long nextUpdateCounter(int cacheId, IgniteInternalTx tx, @Nullable Long primaryCntr) {
Long nextCntr;
if (primaryCntr != null)
nextCntr = primaryCntr;
else {
TxCounters txCounters = tx.txCounters(false);
assert txCounters != null : "Must have counters for tx [nearXidVer=" + tx.nearXidVersion() + ']';
// Null must never be returned on primary node.
nextCntr = txCounters.generateNextCounter(cacheId, id());
assert nextCntr != null : this;
}
if (grp.sharedGroup())
grp.onPartitionCounterUpdate(cacheId, id, nextCntr, tx.topologyVersion(), tx.local());
return nextCntr;
}
/**
* @return Current update counter (LWM).
*/
public long updateCounter() {
return store.updateCounter();
}
/**
* @return Current reserved counter (HWM).
*/
public long reservedCounter() {
return store.reservedCounter();
}
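// Counter semantics (as used above): the update counter (LWM) is the highest counter up to
// which all updates are applied without gaps; the reserved counter (HWM) is the highest
// counter handed out to in-flight updates, so LWM <= HWM at all times.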
/**
* @param val Update counter value.
*/
public void updateCounter(long val) {
store.updateCounter(val);
}
/**
* @return Initial update counter.
*/
public long initialUpdateCounter() {
return store.initialUpdateCounter();
}
/**
* Increments cache update counter on primary node.
*
* @param delta Value to be added to update counter.
* @return Update counter value before update.
*/
public long getAndIncrementUpdateCounter(long delta) {
return store.getAndIncrementUpdateCounter(delta);
}
/**
* Updates MVCC cache update counter on backup node.
*
* @param start Start position.
* @param delta Delta.
* @return {@code True} if counter has been updated.
*/
public boolean updateCounter(long start, long delta) {
return store.updateCounter(start, delta);
}
/**
* Reset partition update counter.
*/
public void resetUpdateCounter() {
store.resetUpdateCounter();
}
/**
* @return Total size of all caches.
*/
public long fullSize() {
return store.fullSize();
}
/**
* Removes all entries and rows from this partition.
*
* @return Number of rows cleared from page memory.
* @throws NodeStoppingException If node stopping.
*/
private long clearAll(EvictionContext evictionCtx) throws NodeStoppingException {
GridCacheVersion clearVer = ctx.versions().next();
GridCacheObsoleteEntryExtras extras = new GridCacheObsoleteEntryExtras(clearVer);
boolean rec = grp.eventRecordable(EVT_CACHE_REBALANCE_OBJECT_UNLOADED);
if (grp.sharedGroup())
cacheMaps.forEach((key, hld) -> clear(hld.map, extras, rec));
else
clear(singleCacheEntryMap.map, extras, rec);
long cleared = 0;
final int stopCheckingFreq = 1000;
CacheMapHolder hld = grp.sharedGroup() ? null : singleCacheEntryMap;
try {
GridIterator<CacheDataRow> it0 = grp.offheap().partitionIterator(id);
while (it0.hasNext()) {
ctx.database().checkpointReadLock();
try {
CacheDataRow row = it0.next();
// Do not clear fresh rows in case of partition reloading.
// This is required because normal updates may arrive to a MOVING partition while it is being cleared.
if (row.version().compareTo(clearVer) >= 0 && state() == MOVING)
continue;
if (grp.sharedGroup() && (hld == null || hld.cctx.cacheId() != row.cacheId()))
hld = cacheMapHolder(ctx.cacheContext(row.cacheId()));
assert hld != null;
GridCacheMapEntry cached = putEntryIfObsoleteOrAbsent(
hld,
hld.cctx,
grp.affinity().lastVersion(),
row.key(),
true,
false);
if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(clearVer, extras)) {
removeEntry(cached);
if (rec && !hld.cctx.config().isEventsDisabled()) {
hld.cctx.events().addEvent(cached.partition(),
cached.key(),
ctx.localNodeId(),
null,
null,
null,
EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
null,
false,
cached.rawGet(),
cached.hasValue(),
null,
null,
null,
false);
}
cleared++;
}
// After every 'stopCheckingFreq' cleared entries, check whether the clearing process should stop.
if (cleared % stopCheckingFreq == 0 && evictionCtx.shouldStop())
return cleared;
}
catch (GridDhtInvalidPartitionException e) {
assert isEmpty() && state() == EVICTED : "Invalid error [e=" + e + ", part=" + this + ']';
break; // Partition is already concurrently cleared and evicted.
}
finally {
ctx.database().checkpointReadUnlock();
}
}
if (forceTestCheckpointOnEviction) {
if (partWhereTestCheckpointEnforced == null && cleared >= fullSize()) {
ctx.database().forceCheckpoint("test").futureFor(FINISHED).get();
log.warning("Forced checkpoint by test reasons for partition: " + this);
partWhereTestCheckpointEnforced = id;
}
}
}
catch (NodeStoppingException e) {
if (log.isDebugEnabled())
log.debug("Failed to get iterator for evicted partition: " + id);
throw e;
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to get iterator for evicted partition: " + id, e);
}
return cleared;
}
/**
* Removes all cache entries from specified {@code map}.
*
* @param map Map to clear.
* @param extras Obsolete extras.
* @param evt Unload event flag.
* @throws NodeStoppingException If current node is stopping.
*/
private void clear(ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map,
GridCacheObsoleteEntryExtras extras,
boolean evt) throws NodeStoppingException {
Iterator<GridCacheMapEntry> it = map.values().iterator();
while (it.hasNext()) {
GridCacheMapEntry cached = null;
ctx.database().checkpointReadLock();
try {
cached = it.next();
if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(extras.obsoleteVersion(), extras)) {
removeEntry(cached);
if (!cached.isInternal()) {
if (evt) {
grp.addCacheEvent(cached.partition(),
cached.key(),
ctx.localNodeId(),
EVT_CACHE_REBALANCE_OBJECT_UNLOADED,
null,
false,
cached.rawGet(),
cached.hasValue(),
false);
}
}
}
}
catch (GridDhtInvalidPartitionException e) {
assert isEmpty() && state() == EVICTED : "Invalid error [e=" + e + ", part=" + this + ']';
break; // Partition is already concurrently cleared and evicted.
}
catch (NodeStoppingException e) {
if (log.isDebugEnabled())
log.debug("Failed to clear cache entry for evicted partition: " + cached.partition());
throw e;
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to clear cache entry for evicted partition: " + cached, e);
}
finally {
ctx.database().checkpointReadUnlock();
}
}
}
/**
* Removes all deferred delete requests from {@code rmvQueue}.
*/
private void clearDeferredDeletes() {
for (RemovedEntryHolder e : rmvQueue)
removeVersionedEntry(e.cacheId(), e.key(), e.version());
}
/** {@inheritDoc} */
@Override public int hashCode() {
return 31 * id + grp.groupId();
}
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
GridDhtLocalPartition part = (GridDhtLocalPartition)o;
return id == part.id && grp.groupId() == part.group().groupId();
}
/** {@inheritDoc} */
@Override public int compareTo(@NotNull GridDhtLocalPartition part) {
if (part == null)
return 1;
return Integer.compare(id, part.id());
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridDhtLocalPartition.class, this,
"grp", grp.cacheOrGroupName(),
"state", state(),
"reservations", reservations(),
"empty", isEmpty(),
"createTime", U.format(createTime),
"fullSize", fullSize(),
"cntr", dataStore().partUpdateCounter());
}
/** {@inheritDoc} */
@Override public int publicSize(int cacheId) {
if (grp.sharedGroup()) {
CacheMapHolder hld = cacheMaps.get(cacheId);
return hld != null ? hld.size.get() : 0;
}
return getSize(state.get());
}
/** {@inheritDoc} */
@Override public void incrementPublicSize(@Nullable CacheMapHolder hld, GridCacheEntryEx e) {
if (grp.sharedGroup()) {
if (hld == null)
hld = cacheMapHolder(e.context());
hld.size.incrementAndGet();
}
while (true) {
long state = this.state.get();
if (this.state.compareAndSet(state, setSize(state, getSize(state) + 1)))
return;
}
}
/** {@inheritDoc} */
@Override public void decrementPublicSize(@Nullable CacheMapHolder hld, GridCacheEntryEx e) {
if (grp.sharedGroup()) {
if (hld == null)
hld = cacheMapHolder(e.context());
hld.size.decrementAndGet();
}
while (true) {
long state = this.state.get();
assert getPartState(state) != EVICTED;
if (this.state.compareAndSet(state, setSize(state, getSize(state) - 1)))
return;
}
}
/**
* Returns group context.
*
* @return Group context.
*/
public CacheGroupContext group() {
return grp;
}
/**
* @param cacheId Cache ID.
*/
public void onCacheStopped(int cacheId) {
assert grp.sharedGroup() : grp.cacheOrGroupName();
for (Iterator<RemovedEntryHolder> it = rmvQueue.iterator(); it.hasNext();) {
RemovedEntryHolder e = it.next();
if (e.cacheId() == cacheId)
it.remove();
}
cacheMaps.remove(cacheId);
}
/**
* @param state Composite state.
* @return Partition state.
*/
private static GridDhtPartitionState getPartState(long state) {
return GridDhtPartitionState.fromOrdinal((int)(state & (0x0000000000000007L)));
}
/**
* @param state Composite state to update.
* @param partState Partition state.
* @return Updated composite state.
*/
private static long setPartState(long state, GridDhtPartitionState partState) {
return (state & (~0x0000000000000007L)) | partState.ordinal();
}
/**
* @param state Composite state.
* @return Reservations.
*/
private static int getReservations(long state) {
return (int)((state & 0x00000000FFFF0000L) >> 16);
}
/**
* @param state Composite state to update.
* @param reservations Reservations to set.
* @return Updated composite state.
*/
private static long setReservations(long state, int reservations) {
return (state & (~0x00000000FFFF0000L)) | (reservations << 16);
}
/**
* @param state Composite state.
* @return Size.
*/
private static int getSize(long state) {
return (int)((state & 0xFFFFFFFF00000000L) >> 32);
}
/**
* @param state Composite state to update.
* @param size Size to set.
* @return Updated composite state.
*/
private static long setSize(long state, int size) {
return (state & (~0xFFFFFFFF00000000L)) | ((long)size << 32);
}
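// Worked example for the accessors above: for state word 0x0000000500020001L,
// getSize() returns 5, getReservations() returns 2 and getPartState() returns the state
// with ordinal 1; setSize(state, 6) rewrites only the upper 32 bits.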
/**
* Flushes pending update counters closing all possible gaps.
*
* @return Even-length array of pairs [start, end] for each gap.
*/
public GridLongList finalizeUpdateCounters() {
return store.finalizeUpdateCounters();
}
/**
* Called before the next batch is applied during rebalance. Currently used only in tests.
*
* @param last {@code True} if last batch for partition.
*/
public void beforeApplyBatch(boolean last) {
// No-op.
}
/**
* Removed entry holder.
*/
private static class RemovedEntryHolder {
/** */
private final int cacheId;
/** Cache key. */
private final KeyCacheObject key;
/** Entry version. */
private final GridCacheVersion ver;
/** Entry expire time. */
private final long expireTime;
/**
* @param cacheId Cache ID.
* @param key Key.
* @param ver Entry version.
* @param ttl TTL.
*/
private RemovedEntryHolder(int cacheId, KeyCacheObject key, GridCacheVersion ver, long ttl) {
this.cacheId = cacheId;
this.key = key;
this.ver = ver;
expireTime = U.currentTimeMillis() + ttl;
}
/**
* @return Cache ID.
*/
int cacheId() {
return cacheId;
}
/**
* @return Key.
*/
KeyCacheObject key() {
return key;
}
/**
* @return Version.
*/
GridCacheVersion version() {
return ver;
}
/**
* @return Item expire time.
*/
long expireTime() {
return expireTime;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(RemovedEntryHolder.class, this);
}
}
/**
* Future is needed to control partition clearing process.
* Future can be used both for single clearing or eviction processes.
*/
class ClearFuture extends GridFutureAdapter<Boolean> {
/** Flag indicates that eviction callback was registered on the current future. */
private volatile boolean evictionCbRegistered;
/** Flag indicates that clearing callback was registered on the current future. */
private volatile boolean clearingCbRegistered;
/** Flag indicates that future with all callbacks was finished. */
private volatile boolean finished;
/**
* Constructor.
*/
ClearFuture() {
onDone();
finished = true;
}
/**
* Registers finish eviction callback on the future.
*
* @param updateSeq If {@code true} update topology sequence after successful eviction.
*/
private void registerEvictionCallback(boolean updateSeq) {
if (evictionCbRegistered)
return;
synchronized (this) {
// Double check
if (evictionCbRegistered)
return;
evictionCbRegistered = true;
// Initiates partition eviction and destroy.
listen(f -> {
try {
// Check for errors.
f.get();
finishEviction(updateSeq);
}
catch (Exception e) {
rent.onDone(e);
}
evictionCbRegistered = false;
});
}
}
/**
* Registers clearing callback on the future.
*/
private void registerClearingCallback() {
if (clearingCbRegistered)
return;
synchronized (this) {
// Double check
if (clearingCbRegistered)
return;
clearingCbRegistered = true;
// Reset the clearing-callback flag once the future completes so a new callback can be registered on reinitialization.
listen(f -> {
clearingCbRegistered = false;
});
}
}
/**
* Successfully finishes the future.
*/
public void finish() {
synchronized (this) {
onDone();
finished = true; // Marks state when all future listeners are finished.
}
}
/**
* Finishes the future with error.
*
* @param t Error.
*/
public void finish(Throwable t) {
synchronized (this) {
onDone(t);
finished = true;
}
}
/**
* Reuses future if it's done.
* Adds appropriate callbacks to the future in case of eviction or single clearing.
*
* @param updateSeq Update sequence.
* @param evictionRequested If {@code true} adds the eviction callback, otherwise adds the single clearing callback.
* @return {@code true} if future has been reinitialized.
*/
public boolean initialize(boolean updateSeq, boolean evictionRequested) {
// In case of running clearing just try to add missing callbacks to avoid extra synchronization.
if (!finished) {
if (evictionRequested)
registerEvictionCallback(updateSeq);
else
registerClearingCallback();
return false;
}
synchronized (this) {
boolean done = isDone();
if (done) {
reset();
finished = false;
evictionCbRegistered = false;
clearingCbRegistered = false;
}
if (evictionRequested)
registerEvictionCallback(updateSeq);
else
registerClearingCallback();
return done;
}
}
}
}