* Key partition.
public class GridDhtLocalPartition extends GridCacheConcurrentMapImpl implements Comparable<GridDhtLocalPartition>, GridReservable {
/** Factory creating {@link GridDhtCacheEntry} instances; handed to the superclass constructor. */
private static final GridCacheMapEntryFactory ENTRY_FACTORY = GridDhtCacheEntry::new;
/**
 * Default value for {@link IgniteSystemProperties#IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE}.
 *
 * @see IgniteSystemProperties#IGNITE_ATOMIC_CACHE_DELETE_HISTORY_SIZE
 */
public static final int DFLT_ATOMIC_CACHE_DELETE_HISTORY_SIZE = 200_000;
/**
 * Maximum size for delete queue, divided among the partitions of a group (each partition's share is
 * computed in the constructor).
 * NOTE(review): the initializer expression of this constant is missing from this copy of the source.
 */
public static final int MAX_DELETE_QUEUE_SIZE =
/** Default value for {@link IgniteSystemProperties#IGNITE_CACHE_REMOVED_ENTRIES_TTL}. */
public static final int DFLT_CACHE_REMOVE_ENTRIES_TTL = 10_000;
/** ONLY FOR TEST PURPOSES: force test checkpoint on partition eviction (property TEST_CHECKPOINT_ON_EVICTION). */
private static boolean forceTestCheckpointOnEviction = IgniteSystemProperties.getBoolean("TEST_CHECKPOINT_ON_EVICTION", false);
/** ONLY FOR TEST PURPOSES: partition id where test checkpoint was enforced during eviction. */
static volatile Integer partWhereTestCheckpointEnforced;
/**
 * Maximum size for {@link #rmvQueue}: 100 for system caches, otherwise this partition's share of
 * {@link #MAX_DELETE_QUEUE_SIZE} (at least 20), rounded up to a power of two.
 */
private final int rmvQueueMaxSize;
/**
 * Time to live, in milliseconds, of items placed into {@link #rmvQueue}.
 * NOTE(review): no assignment of this final field is visible in the constructor in this copy of the source.
 */
private final long rmvdEntryTtl;
/** Logger holder shared by all partitions, so that the logger is not re-created for each partition. */
private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
/** Logger (static; re-assigned from {@link #logRef} by every constructor call). */
private static volatile IgniteLogger log;
/** Partition ID. */
private final int id;
/**
 * Packed partition state, always updated with CAS:
 * <ul>
 *     <li>bits 32..63 - public size;</li>
 *     <li>bits 16..31 - reservations count;</li>
 *     <li>bits 3..15 - reserved;</li>
 *     <li>bits 0..2 - {@link GridDhtPartitionState} ordinal.</li>
 * </ul>
 * A new partition starts in {@code MOVING} state with zero size and no reservations. The initial value
 * is built with {@code setPartState} so the state lands in bits 0..2 (shifting the ordinal left by 32,
 * as before, would have stored it in the size bits instead).
 */
private final AtomicLong state = new AtomicLong(setPartState(0L, MOVING));
/** Rent future returned by {@code rent()}; signals that this node is no longer an owner or backup. */
private final GridFutureAdapter<?> rent;
/** Shared cache context. */
private final GridCacheSharedContext ctx;
/** Cache group this partition belongs to. */
private final CacheGroupContext grp;
/** Partition creation time in milliseconds ({@code U.currentTimeMillis()} at construction). */
private final long createTime = U.currentTimeMillis();
/** Per-cache entry map holders keyed by cache ID; used for shared groups only ({@code null} otherwise). */
private final IntMap<CacheMapHolder> cacheMaps;
/** Entry map holder of the only cache of a non-shared group ({@code null} for shared groups). */
private final CacheMapHolder singleCacheEntryMap;
/** Deferred-delete queue of removed entries, purged by {@code cleanupRemoveQueue()}. */
private final FastSizeDeque<RemovedEntryHolder> rmvQueue = new FastSizeDeque<>(new ConcurrentLinkedDeque<>());
/** Group reservations; invalidated when the partition is rented. */
private final CopyOnWriteArrayList<GridDhtPartitionsReservation> reservations = new CopyOnWriteArrayList<>();
/** Offheap data store of this partition, created in the constructor. */
private volatile CacheDataStore store;
/**
 * Set if the partition could not be moved to RENTING state because of reservations; checked when a
 * reservation is released.
 */
private volatile boolean delayedRenting;
/** Future of the clearing attempt currently in progress, or {@code null} if none (see {@code clearAsync()}). */
private final AtomicReference<GridFutureAdapter<?>> finishFutRef = new AtomicReference<>();
/**
 * Local version order recorded at creation and on every switch to MOVING; while MOVING, rows with a
 * newer order are treated as fresh updates by {@code clearAll}.
 */
private volatile long clearVer;
/**
 * Creates a local partition for the given cache group.
 * <p>
 * Sets up the entry map(s) (one per cache for shared groups, created lazily; a single map otherwise),
 * sizes the deferred-delete queue and creates the offheap data store. Unless {@code recovery} is set,
 * it also writes the initial partition state to the WAL when WAL is enabled for the group.
 *
 * @param ctx Context.
 * @param grp Cache group.
 * @param id Partition ID.
 * @param recovery Flag indicates that partition is created during recovery phase.
 * @throws IgniteException Wrapping the {@link IgniteCheckedException} raised while creating the data store
 *      or writing the WAL record.
 */
public GridDhtLocalPartition(
GridCacheSharedContext ctx,
CacheGroupContext grp,
int id,
boolean recovery
) {
// NOTE(review): the assignment target on this line looks truncated (presumably "this.id = id;"), and no
// assignment of rmvdEntryTtl is visible in this constructor - confirm against the full file.
super(ENTRY_FACTORY); = id;
this.ctx = ctx;
this.grp = grp;
// The logger is static and cached in logRef so it is not re-created for every partition.
log = U.logger(ctx.kernalContext(), logRef, this);
if (grp.sharedGroup()) {
// Shared group: per-cache holders are created on demand by cacheMapHolder().
singleCacheEntryMap = null;
cacheMaps = new IntRWHashMap<>();
else {
GridCacheContext cctx = grp.singleCacheContext();
// For a near cache, the holder is bound to the corresponding DHT cache context.
if (cctx.isNear())
cctx = cctx.near().dht().context();
singleCacheEntryMap = ctx.kernalContext().resource().resolve(
new CacheMapHolder(cctx, createEntriesMap()));
cacheMaps = null;
rent = new GridFutureAdapter<Object>() {
@Override public String toString() {
return "PartitionRentFuture [part=" + GridDhtLocalPartition.this + ']';
// Deferred-delete queue limit: fixed for system caches, otherwise this partition's share of
// MAX_DELETE_QUEUE_SIZE (at least 20), then rounded up to a power of two.
int delQueueSize = grp.systemCache() ? 100 :
Math.max(MAX_DELETE_QUEUE_SIZE / grp.affinity().partitions(), 20);
rmvQueueMaxSize = U.ceilPow2(delQueueSize);
try {
store = grp.offheap().createCacheDataStore(id);
// Log partition creation for further crash recovery purposes.
if (grp.walEnabled() && !recovery)
ctx.wal().log(new PartitionMetaStateRecord(grp.groupId(), id, state(), 0));
// Inject row cache cleaner on store creation
// Used in case the cache with enabled SqlOnheapCache is single cache at the cache group
if (ctx.kernalContext().query().moduleEnabled()) {
// NOTE(review): the rest of this expression and the statement guarded by the next "if" are missing
// from this copy of the source.
GridQueryRowCacheCleaner cleaner = ctx.kernalContext().query().getIndexing()
if (store != null && cleaner != null)
catch (IgniteCheckedException e) {
// TODO ignite-db
throw new IgniteException(e);
if (log.isDebugEnabled())
log.debug("Partition has been created [grp=" + grp.cacheOrGroupName()
+ ", p=" + id + ", state=" + state() + "]");
// Baseline version order: rows newer than this are treated as fresh by clearAll() while MOVING.
clearVer = ctx.versions().localOrder();
* @return Entries map.
private ConcurrentMap<KeyCacheObject, GridCacheMapEntry> createEntriesMap() {
return new ConcurrentHashMap<>(Math.max(10, GridCacheAdapter.DFLT_START_CACHE_SIZE / grp.affinity().partitions()),
Runtime.getRuntime().availableProcessors() * 2);
/** {@inheritDoc} */
@Override public int internalSize() {
// Shared groups: sum the sizes of all per-cache entry maps.
if (grp.sharedGroup()) {
// AtomicInteger serves as a mutable accumulator that the lambda below can capture.
final AtomicInteger size = new AtomicInteger(0);
// NOTE(review): the argument to addAndGet is missing from this copy of the source.
cacheMaps.forEach((key, hld) -> size.addAndGet(;
return size.get();
// NOTE(review): the branch for non-shared groups is missing from this copy of the source.
/** {@inheritDoc} */
@Override protected CacheMapHolder entriesMap(GridCacheContext cctx) {
if (grp.sharedGroup())
return cacheMapHolder(cctx);
return singleCacheEntryMap;
/** {@inheritDoc} */
@Nullable @Override protected CacheMapHolder entriesMapIfExists(Integer cacheId) {
return grp.sharedGroup() ? cacheMaps.get(cacheId) : singleCacheEntryMap;
* @param cctx Cache context.
* @return Map holder.
private CacheMapHolder cacheMapHolder(GridCacheContext cctx) {
assert grp.sharedGroup();
CacheMapHolder hld = cacheMaps.get(cctx.cacheIdBoxed());
if (hld != null)
return hld;
if (cctx.isNear())
cctx = cctx.near().dht().context();
CacheMapHolder old = cacheMaps.putIfAbsent(cctx.cacheIdBoxed(), hld = ctx.kernalContext().resource().resolve(
new CacheMapHolder(cctx, createEntriesMap())));
if (old != null)
hld = old;
return hld;
* @return Data store.
public CacheDataStore dataStore() {
return store;
* Adds group reservation to this partition.
* @param r Reservation.
* @return {@code false} If such reservation already added.
public boolean addReservation(GridDhtPartitionsReservation r) {
assert (getPartState(state.get())) != EVICTED : "we can reserve only active partitions";
assert (getReservations(state.get())) != 0 : "partition must be already reserved before adding group reservation";
return reservations.addIfAbsent(r);
* @param r Reservation.
public void removeReservation(GridDhtPartitionsReservation r) {
if (!reservations.remove(r))
throw new IllegalStateException("Reservation was already removed.");
* @return Partition ID.
public int id() {
return id;
* @return Create time.
public long createTime() {
return createTime;
* @return Partition state.
public GridDhtPartitionState state() {
return getPartState(state.get());
* @return Reservations.
public int reservations() {
return getReservations(state.get());
* @return {@code True} if partition is empty.
public boolean isEmpty() {
return store.isEmpty() && internalSize() == 0;
* @return If partition is moving or owning or renting.
public boolean valid() {
GridDhtPartitionState state = state();
return state == MOVING || state == OWNING || state == RENTING;
/**
 * Callback for an entry that has been removed; the entry must already be obsolete (asserted).
 *
 * @param entry Entry to remove.
 */
public void onRemoved(GridDhtCacheEntry entry) {
assert entry.obsolete() : entry;
// Make sure to remove exactly this entry.
// NOTE(review): the removal statement that followed this comment is missing from this copy of the source.
/**
 * Looks up the entry of {@code key} in the owning cache's entry map and, if it can be marked obsolete
 * for version {@code ver}, presumably removes it (the follow-up statement is missing in this copy).
 *
 * @param cacheId Cache ID.
 * @param key Key.
 * @param ver Version.
 */
private void removeVersionedEntry(int cacheId, KeyCacheObject key, GridCacheVersion ver) {
CacheMapHolder hld = grp.sharedGroup() ? cacheMaps.get(cacheId) : singleCacheEntryMap;
// NOTE(review): the map lookup expression of this ternary is missing from this copy of the source.
GridCacheMapEntry entry = hld != null ? : null;
// NOTE(review): the statement guarded by this condition is missing from this copy of the source.
if (entry != null && entry.markObsoleteVersion(ver))
/**
 * Purges the deferred-delete queue.
 * <p>
 * While the queue holds at least {@code rmvQueueMaxSize} items, the oldest items are polled and their
 * entries removed. If {@code grp.isDrEnabled()} is false, items whose expire time has passed are removed
 * as well. For a MOVING partition with a full queue a warning is emitted, stating that cleanup is delayed
 * until rebalance is finished.
 * <p>
 * TODO FIXME Get rid of deferred delete queue
 */
void cleanupRemoveQueue() {
if (state() == MOVING) {
if (rmvQueue.sizex() >= rmvQueueMaxSize) {
LT.warn(log, "Deletion queue cleanup for moving partition was delayed until rebalance is finished. " +
"[grpId=" + this.grp.groupId() +
", partId=" + id() +
", grpParts=" + this.grp.affinity().partitions() +
", maxRmvQueueSize=" + rmvQueueMaxSize + ']');
// NOTE(review): the remainder of the MOVING branch (presumably an early return) is missing from this copy.
while (rmvQueue.sizex() >= rmvQueueMaxSize) {
RemovedEntryHolder item = rmvQueue.pollFirst();
// pollFirst() may return null if another thread drained the queue concurrently.
if (item != null)
removeVersionedEntry(item.cacheId(), item.key(), item.version());
if (!grp.isDrEnabled()) {
// Items are appended with expireTime = now + TTL, so the head is assumed to expire first.
RemovedEntryHolder item = rmvQueue.peekFirst();
while (item != null && item.expireTime() < U.currentTimeMillis()) {
item = rmvQueue.pollFirst();
// NOTE(review): as shown, a null item would be dereferenced below; the statement guarded by this
// condition (presumably "break;") appears to be missing from this copy of the source.
if (item == null)
removeVersionedEntry(item.cacheId(), item.key(), item.version());
item = rmvQueue.peekFirst();
* @param cacheId cacheId Cache ID.
* @param key Removed key.
* @param ver Removed version.
public void onDeferredDelete(int cacheId, KeyCacheObject key, GridCacheVersion ver) {
rmvQueue.add(new RemovedEntryHolder(cacheId, key, ver, rmvdEntryTtl));
* Reserves the partition so it won't be cleared or evicted.
* Only MOVING, OWNING and LOST partitions can be reserved.
* @return {@code True} if reserved.
@Override public boolean reserve() {
while (true) {
long state = this.state.get();
int ordinal = ordinal(state);
if (ordinal == RENTING.ordinal() || ordinal == EVICTED.ordinal())
return false;
long newState = setReservations(state, getReservations(state) + 1);
if (this.state.compareAndSet(state, newState))
return true;
/**
 * Releases previously reserved partition.
 */
@Override public void release() {
// NOTE(review): the method body is missing from this copy of the source.
/** {@inheritDoc} */
@Override protected void release(int sizeChange, CacheMapHolder hld, GridCacheEntryEx e) {
// NOTE(review): the statement guarded by this condition and the rest of the body are missing from this
// copy of the source.
if (grp.sharedGroup() && sizeChange != 0)
/**
 * Decrements the reservation count and applies {@code sizeChange} to the public size in one CAS, retrying
 * on contention. Per the inline comment, delayed renting is continued once no reservations remain.
 *
 * @param sizeChange Size change delta.
 */
private void release0(int sizeChange) {
while (true) {
long state = this.state.get();
int reservations = getReservations(state);
assert reservations > 0;
assert getPartState(state) != EVICTED : this;
// Build the new packed value: one reservation less and the adjusted size.
long newState = setReservations(state, --reservations);
newState = setSize(newState, getSize(newState) + sizeChange);
assert getSize(newState) == getSize(state) + sizeChange;
// Decrement reservations.
if (this.state.compareAndSet(state, newState)) {
// If no more reservations try to continue delayed renting.
// NOTE(review): the statement guarded by this condition is missing from this copy of the source.
if (reservations == 0)
* @param stateToRestore State to restore.
public void restoreState(GridDhtPartitionState stateToRestore) {
state.set(setPartState(state.get(), stateToRestore));
/**
 * Sets the partition state bits. For testing purposes only.
 * <p>
 * When persistence and WAL are enabled for the group, a single CAS is attempted under this object's
 * monitor (its result is ignored) and the new state is logged to the WAL. A WAL failure is only logged,
 * not rethrown.
 *
 * @param toState State to set.
 */
public void setState(GridDhtPartitionState toState) {
if (grp.persistenceEnabled() && grp.walEnabled()) {
synchronized (this) {
long state0 = state.get();
// Single CAS attempt; if the state changed concurrently the new value is not applied.
this.state.compareAndSet(state0, setPartState(state0, toState));
try {
ctx.wal().log(new PartitionMetaStateRecord(grp.groupId(), id, toState, 0));
catch (IgniteCheckedException e) {
U.error(log, "Error while writing to log", e);
// NOTE(review): no branch for groups without persistence/WAL is visible in this copy of the source.
/**
 * Attempts to CAS the packed state from {@code state} to the same value with the partition state bits
 * replaced by {@code toState}.
 * <p>
 * When persistence and WAL are enabled for the group, the CAS runs under this object's monitor and a
 * successful transition is written to the WAL. LOST is recorded as OWNING, and OWNING -> LOST is not
 * recorded at all. A WAL failure is logged and reported to the failure processor as {@code CRITICAL_ERROR}.
 *
 * @param state Current aggregated value.
 * @param toState State to switch to.
 * @return {@code true} if cas succeeds.
 */
private boolean casState(long state, GridDhtPartitionState toState) {
if (grp.persistenceEnabled() && grp.walEnabled()) {
synchronized (this) {
GridDhtPartitionState prevState = state();
boolean updated = this.state.compareAndSet(state, setPartState(state, toState));
if (updated) {
// A partition may only become EVICTED when nobody holds a reservation.
assert toState != EVICTED || reservations() == 0 : this;
try {
// Optimization: do not log OWNING -> OWNING.
// (LOST is logged as OWNING below, so OWNING -> LOST would produce an OWNING -> OWNING record.)
if (prevState == OWNING && toState == LOST)
return true;
// Log LOST partitions as OWNING.
// NOTE(review): the call that opens this expression (presumably ctx.wal().log( ) is missing from this
// copy of the source.
new PartitionMetaStateRecord(grp.groupId(), id, toState == LOST ? OWNING : toState, 0));
catch (IgniteCheckedException e) {
U.error(log, "Failed to log partition state change to WAL.", e);
ctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, e));
if (log.isDebugEnabled())
log.debug("Partition changed state [grp=" + grp.cacheOrGroupName()
+ ", p=" + id + ", prev=" + prevState + ", to=" + toState + "]");
return updated;
else {
// No WAL: a plain CAS is sufficient.
GridDhtPartitionState prevState = state();
boolean updated = this.state.compareAndSet(state, setPartState(state, toState));
if (updated) {
assert toState != EVICTED || reservations() == 0 : this;
if (log.isDebugEnabled())
log.debug("Partition changed state [grp=" + grp.cacheOrGroupName()
+ ", p=" + id + ", prev=" + prevState + ", to=" + toState + "]");
return updated;
* @return {@code True} if transitioned to OWNING state.
public boolean own() {
while (true) {
long state = this.state.get();
GridDhtPartitionState partState = getPartState(state);
if (partState == RENTING || partState == EVICTED)
return false;
if (partState == OWNING)
return true;
assert partState == MOVING || partState == LOST;
if (casState(state, OWNING))
return true;
* Forcibly moves partition to a MOVING state.
* @return {@code True} if a partition was switched to MOVING state.
public boolean moving() {
while (true) {
long state = this.state.get();
GridDhtPartitionState partState = getPartState(state);
if (partState == EVICTED)
return false;
assert partState == OWNING || partState == RENTING :
"Only partitions in state OWNING or RENTING can be moved to MOVING state " + partState + " " + id;
if (casState(state, MOVING)) {
// The state is switched under global topology lock, safe to record version here.
clearVer = ctx.versions().localOrder();
return true;
* @return {@code True} if partition state changed.
public boolean markLost() {
while (true) {
long state = this.state.get();
GridDhtPartitionState partState = getPartState(state);
if (partState == LOST)
return false;
if (casState(state, LOST))
return true;
/**
 * Initiates partition eviction process and returns an eviction future.
 * Future will be completed when a partition is moved to EVICTED state (possibly not yet physically deleted).
 * If partition has reservations, eviction will be delayed and continued after all reservations will be released.
 *
 * @return Future to signal that this node is no longer an owner or backup. In the visible code the same
 *      rent future is returned for every state, including {@code RENTING} and {@code EVICTED}; no path
 *      returns {@code null}.
 */
public IgniteInternalFuture<?> rent() {
long state0 = this.state.get();
GridDhtPartitionState partState = getPartState(state0);
if (partState == EVICTED)
return rent;
if (partState == RENTING) {
// If for some reason a partition has stuck in renting state try restart clearing.
// NOTE(review): the statement guarded by this condition appears to be missing from this copy of the source.
if (finishFutRef.get() == null)
return rent;
// Switch to RENTING only if no group or plain reservation holds the partition.
if (tryInvalidateGroupReservations() && getReservations(state0) == 0 && casState(state0, RENTING)) {
// Evict asynchronously, as the 'rent' method may be called from within write locks on local partition.
// NOTE(review): the eviction call and the "else" keyword before the next line appear to be missing from
// this copy of the source.
delayedRenting = true;
return rent;
/**
 * Continue clearing if it was delayed before due to reservation and topology version not changed.
 */
public void tryContinueClearing() {
// delayedRenting is set by rent() when the partition could not be moved to RENTING.
// NOTE(review): the statement guarded by this condition is missing from this copy of the source.
if (delayedRenting)
/**
 * Initiates a partition clearing attempt.
 * <p>
 * Only RENTING (eviction) and MOVING (clearing) partitions are cleared; for any other state an already
 * completed future is returned. At most one attempt is registered at a time: if one is in progress, its
 * future is returned instead of starting a new one. The actual work is scheduled asynchronously via
 * {@code ctx.evict().evictPartitionAsync}.
 *
 * @return A future that completes when the current clearing attempt is done.
 */
public IgniteInternalFuture<?> clearAsync() {
long state = this.state.get();
GridDhtPartitionState partState = getPartState(state);
boolean evictionRequested = partState == RENTING;
boolean clearingRequested = partState == MOVING;
if (!evictionRequested && !clearingRequested)
return new GridFinishedFuture<>();
GridFutureAdapter<?> finishFut = new GridFutureAdapter<>();
// Publish our future unless another clearing attempt is already registered.
do {
GridFutureAdapter<?> curFut = finishFutRef.get();
if (curFut != null)
return curFut;
while (!finishFutRef.compareAndSet(null, finishFut));
finishFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> fut) {
// A partition cannot be reused after the eviction, it's not necessary to reset a clearing state.
// NOTE(review): the statement guarded by this condition and the rest of the listener body are missing
// from this copy of the source.
if (state() == EVICTED)
// Evict partition asynchronously to avoid deadlocks.
ctx.evict().evictPartitionAsync(grp, this, finishFut);
return finishFut;
* Invalidates all partition group reservations, so they can't be reserved again any more.
* @return {@code true} If all group reservations are invalidated (or no such reservations).
private boolean tryInvalidateGroupReservations() {
for (GridDhtPartitionsReservation reservation : reservations) {
if (!reservation.invalidate())
return false; // Failed to invalidate reservation -> we are reserved.
return true;
* @param state State.
* @return {@code True} if partition has no reservations and empty.
private boolean freeAndEmpty(long state) {
return isEmpty() && getSize(state) == 0 && getReservations(state) == 0;
* Moves partition state to {@code EVICTED} if possible.
public void finishEviction() {
long state0 = this.state.get();
GridDhtPartitionState state = getPartState(state0);
// Some entries still might be present in partition cache maps due to concurrent updates on backup nodes,
// but it's safe to finish eviction because no physical updates are possible.
// A partition is promoted to EVICTED state if it is not reserved and empty.
if (store.isEmpty() && getReservations(state0) == 0 && state == RENTING)
casState(state0, EVICTED);
* @return {@code True} if clearing process is running at the moment on the partition.
public boolean isClearing() {
return finishFutRef.get() != null;
* On partition unlock callback.
* Tries to continue delayed partition clearing.
public void onUnlock() {
// No-op.
* @param topVer Topology version.
* @return {@code True} if local node is primary for this partition.
public boolean primary(AffinityTopologyVersion topVer) {
List<ClusterNode> nodes = grp.affinity().cachedAffinity(topVer).get(id);
return !nodes.isEmpty() && ctx.localNode().equals(nodes.get(0));
* @param topVer Topology version.
* @return {@code True} if local node is backup for this partition.
public boolean backup(AffinityTopologyVersion topVer) {
List<ClusterNode> nodes = grp.affinity().cachedAffinity(topVer).get(id);
return nodes.indexOf(ctx.localNode()) > 0;
* Returns new update counter for primary node or passed counter for backup node.
* <p>
* Used for non-tx cases.
* <p>
* Counter generation/update logic is delegated to counter implementation.
* @param cacheId ID of cache initiated counter update.
* @param topVer Topology version for current operation.
* @param init {@code True} if initial update.
* @return Next update index.
public long nextUpdateCounter(int cacheId, AffinityTopologyVersion topVer, boolean primary, boolean init,
@Nullable Long primaryCntr) {
long nextCntr;
if (primaryCntr == null) // Primary node.
nextCntr = store.nextUpdateCounter();
else {
assert !init : "Initial update must generate a counter for partition " + this;
// Backup.
assert primaryCntr != 0;
store.updateCounter(nextCntr = primaryCntr);
if (grp.sharedGroup())
grp.onPartitionCounterUpdate(cacheId, id, nextCntr, topVer, primary);
return nextCntr;
* Used for transactions.
* @param cacheId Cache id.
* @param tx Tx.
* @param primaryCntr Primary counter.
public long nextUpdateCounter(int cacheId, IgniteInternalTx tx, @Nullable Long primaryCntr) {
Long nextCntr;
if (primaryCntr != null)
nextCntr = primaryCntr;
else {
TxCounters txCounters = tx.txCounters(false);
assert txCounters != null : "Must have counters for tx [nearXidVer=" + tx.nearXidVersion() + ']';
// Null must never be returned on primary node.
nextCntr = txCounters.generateNextCounter(cacheId, id());
assert nextCntr != null : this;
if (grp.sharedGroup())
grp.onPartitionCounterUpdate(cacheId, id, nextCntr, tx.topologyVersion(), tx.local());
return nextCntr;
* @return Current update counter (LWM).
public long updateCounter() {
return store.updateCounter();
* @return Current reserved counter (HWM).
public long reservedCounter() {
return store.reservedCounter();
/**
 * Sets the partition update counter.
 *
 * @param val Update counter value.
 */
public void updateCounter(long val) {
// NOTE(review): the method body is missing from this copy of the source.
* @return Initial update counter.
public long initialUpdateCounter() {
return store.initialUpdateCounter();
* Increments cache update counter on primary node.
* @param delta Value to be added to update counter.
* @return Update counter value before update.
public long getAndIncrementUpdateCounter(long delta) {
return store.getAndIncrementUpdateCounter(delta);
* Updates MVCC cache update counter on backup node.
* @param start Start position
* @param delta Delta.
public boolean updateCounter(long start, long delta) {
return store.updateCounter(start, delta);
/**
 * Reset partition update counter.
 */
public void resetUpdateCounter() {
// NOTE(review): the method body is missing from this copy of the source.
/**
 * Reset partition initial update counter.
 */
public void resetInitialUpdateCounter() {
// NOTE(review): the method body is missing from this copy of the source.
* @return Total size of all caches.
public long fullSize() {
return store.fullSize();
/**
 * Removes all entries and rows from this partition.
 * <p>
 * Iterates over the partition's offheap rows and clears the corresponding cache entries. Every 1024 rows
 * the eviction context is asked whether to stop early, in which case the number of rows cleared so far is
 * returned. While the partition is MOVING, rows whose version order is 0 or newer than the recorded clear
 * version are fresh updates and must be preserved.
 *
 * @param evictionCtx Eviction context polled for early termination.
 * @return Number of rows cleared from page memory.
 * @throws NodeStoppingException If node stopping.
 */
protected long clearAll(EvictionContext evictionCtx) throws NodeStoppingException {
// Snapshot of the clearVer field, taken before the local variable below shadows that name.
long order = clearVer;
GridCacheVersion clearVer = ctx.versions().startVersion();
GridCacheObsoleteEntryExtras extras = new GridCacheObsoleteEntryExtras(clearVer);
boolean rec = grp.eventRecordable(EVT_CACHE_REBALANCE_OBJECT_UNLOADED);
long cleared = 0;
int stopCntr = 0;
// For shared groups the holder is resolved per row's cache, and cached while consecutive rows share a cache.
CacheMapHolder hld = grp.sharedGroup() ? null : singleCacheEntryMap;
try {
GridIterator<CacheDataRow> it0 = grp.offheap().partitionIterator(id);
while (it0.hasNext()) {
// Check for a stop request once every 1024 rows.
if ((stopCntr = (stopCntr + 1) & 1023) == 0 && evictionCtx.shouldStop())
return cleared;
try {
// NOTE(review): the right-hand side of this assignment is missing from this copy of the source.
CacheDataRow row =;
// Do not clear fresh rows in case of partition reloading.
// This is required because normal updates are possible to moving partition which is currently cleared.
// We can clean OWNING partition if a partition has been reset from lost state.
// In this case new updates must be preserved.
// Partition state can be switched from RENTING to MOVING and vice versa during clearing.
long order0 = row.version().order();
// NOTE(review): the statement that skips such rows is missing from this copy of the source.
if (state() == MOVING && (order0 == 0 /** Inserted by isolated updater. */ || order0 > order))
if (grp.sharedGroup() && (hld == null || hld.cctx.cacheId() != row.cacheId()))
hld = cacheMapHolder(ctx.cacheContext(row.cacheId()));
assert hld != null;
// NOTE(review): the arguments of this call and several following statements are missing from this copy.
GridCacheMapEntry cached = putEntryIfObsoleteOrAbsent(
assert cached != null : "Expecting the reservation " + this;
if (cached.deleted())
if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry) cached).clearInternal(clearVer, extras)) {
if (rec && !hld.cctx.config().isEventsDisabled()) {,
catch (GridDhtInvalidPartitionException e) {
assert isEmpty() && state() == EVICTED : "Invalid error [e=" + e + ", part=" + this + ']';
break; // Partition is already concurrently cleared and evicted.
finally {
// Test-only hook (TEST_CHECKPOINT_ON_EVICTION): remember the first partition whose clearing reached fullSize().
if (forceTestCheckpointOnEviction) {
if (partWhereTestCheckpointEnforced == null && cleared >= fullSize()) {
log.warning("Forced checkpoint by test reasons for partition: " + this);
partWhereTestCheckpointEnforced = id;
// Attempt to destroy.
catch (NodeStoppingException e) {
if (log.isDebugEnabled())
log.debug("Failed to get iterator for evicted partition: " + id);
throw e;
catch (IgniteCheckedException e) {
// Other failures are logged and the count cleared so far is returned.
U.error(log, "Failed to get iterator for evicted partition: " + id, e);
return cleared;
/**
 * Removes all cache entries from specified {@code map}.
 * <p>
 * Iteration stops early if the partition turns out to be concurrently cleared and evicted. Node-stopping
 * errors are rethrown, and other checked errors are logged per entry.
 *
 * @param map Map to clear.
 * @param extras Obsolete extras; their obsolete version is used to clear each entry.
 * @param evt Unload event flag.
 * @throws NodeStoppingException If current node is stopping.
 */
private void clear(ConcurrentMap<KeyCacheObject, GridCacheMapEntry> map,
GridCacheObsoleteEntryExtras extras,
boolean evt) throws NodeStoppingException {
Iterator<GridCacheMapEntry> it = map.values().iterator();
while (it.hasNext()) {
// Declared outside the try so the catch blocks can mention the failing entry.
GridCacheMapEntry cached = null;
try {
// NOTE(review): the right-hand side of this assignment and the bodies of the following "if" blocks are
// missing from this copy of the source.
cached =;
if (cached instanceof GridDhtCacheEntry && ((GridDhtCacheEntry)cached).clearInternal(extras.obsoleteVersion(), extras)) {
if (!cached.isInternal()) {
if (evt) {
catch (GridDhtInvalidPartitionException e) {
assert isEmpty() && state() == EVICTED : "Invalid error [e=" + e + ", part=" + this + ']';
break; // Partition is already concurrently cleared and evicted.
catch (NodeStoppingException e) {
if (log.isDebugEnabled())
log.debug("Failed to clear cache entry for evicted partition: " + cached.partition());
throw e;
catch (IgniteCheckedException e) {
U.error(log, "Failed to clear cache entry for evicted partition: " + cached, e);
finally {
* Removes all deferred delete requests from {@code rmvQueue}.
public void clearDeferredDeletes() {
for (RemovedEntryHolder e : rmvQueue)
removeVersionedEntry(e.cacheId(), e.key(), e.version());
/** {@inheritDoc} */
@Override public int hashCode() {
return 31 * id + grp.groupId();
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || getClass() != o.getClass())
return false;
GridDhtLocalPartition part = (GridDhtLocalPartition)o;
return id == && grp.groupId() ==;
/** {@inheritDoc} */
@Override public int compareTo(@NotNull GridDhtLocalPartition part) {
// Defensive check despite @NotNull: a null argument orders before this partition.
if (part == null)
return 1;
// NOTE(review): the comparison for a non-null argument is missing from this copy of the source.
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridDhtLocalPartition.class, this,
"grp", grp.cacheOrGroupName(),
"state", state(),
"reservations", reservations(),
"empty", isEmpty(),
"createTime", U.format(createTime),
"fullSize", fullSize(),
"cntr", dataStore().partUpdateCounter());
/** {@inheritDoc} */
@Override public int publicSize(int cacheId) {
if (grp.sharedGroup()) {
CacheMapHolder hld = cacheMaps.get(cacheId);
return hld != null ? hld.size.get() : 0;
return getSize(state.get());
/** {@inheritDoc} */
@Override public void incrementPublicSize(@Nullable CacheMapHolder hld, GridCacheEntryEx e) {
// Shared groups track size per cache holder; otherwise the size lives in the packed state word.
if (grp.sharedGroup()) {
if (hld == null)
hld = cacheMapHolder(e.context());
// NOTE(review): the holder size update for shared groups is missing from this copy of the source.
while (true) {
long state = this.state.get();
// NOTE(review): the statement executed after a successful CAS (presumably "return;") is missing.
if (this.state.compareAndSet(state, setSize(state, getSize(state) + 1)))
/** {@inheritDoc} */
@Override public void decrementPublicSize(@Nullable CacheMapHolder hld, GridCacheEntryEx e) {
// Shared groups track size per cache holder; otherwise the size lives in the packed state word.
if (grp.sharedGroup()) {
if (hld == null)
hld = cacheMapHolder(e.context());
// NOTE(review): the holder size update for shared groups is missing from this copy of the source.
while (true) {
long state = this.state.get();
assert getPartState(state) != EVICTED;
// NOTE(review): the statement executed after a successful CAS (presumably "return;") is missing.
if (this.state.compareAndSet(state, setSize(state, getSize(state) - 1)))
* Returns group context.
* @return Group context.
public CacheGroupContext group() {
return grp;
/**
 * Cache stop callback for shared groups. It walks the deferred-delete queue looking for items of the
 * stopped cache; the statement applied to matching items is missing in this copy (presumably
 * {@code it.remove()}).
 *
 * @param cacheId Cache ID.
 */
public void onCacheStopped(int cacheId) {
assert grp.sharedGroup() : grp.cacheOrGroupName();
for (Iterator<RemovedEntryHolder> it = rmvQueue.iterator(); it.hasNext();) {
// NOTE(review): the right-hand side of this assignment and the statement guarded by the next "if" are
// missing from this copy of the source.
RemovedEntryHolder e =;
if (e.cacheId() == cacheId)
* @param state Composite state.
* @return Partition state.
private static GridDhtPartitionState getPartState(long state) {
return GridDhtPartitionState.fromOrdinal((int)(state & (0x0000000000000007L)));
* @param state State.
private static int ordinal(long state) {
return (int)(state & (0x0000000000000007L));
* @param state Composite state to update.
* @param partState Partition state.
* @return Updated composite state.
private static long setPartState(long state, GridDhtPartitionState partState) {
return (state & (~0x0000000000000007L)) | partState.ordinal();
* @param state Composite state.
* @return Reservations.
private static int getReservations(long state) {
return (int)((state & 0x00000000FFFF0000L) >> 16);
* @param state Composite state to update.
* @param reservations Reservations to set.
* @return Updated composite state.
private static long setReservations(long state, int reservations) {
return (state & (~0x00000000FFFF0000L)) | (reservations << 16);
* @param state Composite state.
* @return Size.
private static int getSize(long state) {
return (int)((state & 0xFFFFFFFF00000000L) >> 32);
* @param state Composite state to update.
* @param size Size to set.
* @return Updated composite state.
private static long setSize(long state, int size) {
return (state & (~0xFFFFFFFF00000000L)) | ((long)size << 32);
* Flushes pending update counters closing all possible gaps.
* @return Even-length array of pairs [start, end] for each gap.
public GridLongList finalizeUpdateCounters() {
return store.finalizeUpdateCounters();
* Called before next batch is about to be applied during rebalance. Currently used for tests.
* @param last {@code True} if last batch for partition.
public void beforeApplyBatch(boolean last) {
// No-op.
* Removed entry holder.
private static class RemovedEntryHolder {
/** */
private final int cacheId;
/** Cache key */
private final KeyCacheObject key;
/** Entry version */
private final GridCacheVersion ver;
/** Entry expire time. */
private final long expireTime;
* @param cacheId Cache ID.
* @param key Key.
* @param ver Entry version.
* @param ttl TTL.
private RemovedEntryHolder(int cacheId, KeyCacheObject key, GridCacheVersion ver, long ttl) {
this.cacheId = cacheId;
this.key = key;
this.ver = ver;
expireTime = U.currentTimeMillis() + ttl;
* @return Cache ID.
int cacheId() {
return cacheId;
* @return Key.
KeyCacheObject key() {
return key;
* @return Version.
GridCacheVersion version() {
return ver;
* @return item expired time
long expireTime() {
return expireTime;
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(RemovedEntryHolder.class, this);
* Collects detailed info about the partition.
* @param buf Buffer.
public void dumpDebugInfo(SB buf) {
GridDhtPartitionTopology top = grp.topology();
AffinityTopologyVersion topVer = top.readyTopologyVersion();
if (!topVer.initialized()) {
final int limit = 3;
buf.a(", lastChangeTopVer=").a(top.lastTopologyChangeVersion());
buf.a(", waitRebalance=").a(ctx.kernalContext().cache().context().affinity().waitRebalance(grp.groupId(), id));
buf.a(", nodes=").a(F.nodeIds(top.nodes(id, topVer)).stream().limit(limit).collect(Collectors.toList()));
buf.a(", locPart=").a(toString());
NavigableSet<AffinityTopologyVersion> versions = grp.affinity().cachedVersions();
int i = 5;
Iterator<AffinityTopologyVersion> iter = versions.descendingIterator();
while (--i >= 0 && iter.hasNext()) {
AffinityTopologyVersion topVer0 =;
buf.a(", ver").a(i).a('=').a(topVer0);
Collection<UUID> nodeIds = F.nodeIds(grp.affinity().cachedAffinity(topVer0).get(id));
buf.a(", affOwners").a(i).a('=').a(;