/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed.dht.topology;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.EventType;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.pagemem.wal.record.RollbackRecord;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.GridAtomicLong;
import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.GridPartitionStateMap;
import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.SB;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.PartitionLossPolicy.IGNORE;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.ExchangeType.ALL;
import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture.ExchangeType.NONE;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.EVICTED;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.RENTING;
/**
* Partition topology.
*/
@GridToStringExclude
public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology {
/** */
private static final GridDhtPartitionState[] MOVING_STATES = new GridDhtPartitionState[] {MOVING};
/** Flag to control amount of output for full map. */
private static final boolean FULL_MAP_DEBUG = false;
/** Flag to use fast (prune-only) rebuilding of the diff-from-affinity map. */
private static final boolean FAST_DIFF_REBUILD = false;
/** */
private final GridCacheSharedContext ctx;
/** */
private final CacheGroupContext grp;
/** Logger. */
private final IgniteLogger log;
/** Time logger. */
private final IgniteLogger timeLog;
/** */
private final AtomicReferenceArray<GridDhtLocalPartition> locParts;
/** Node to partition map. */
private GridDhtPartitionFullMap node2part;
/** Set of partition ids that are currently lost (have no owners). */
private Set<Integer> lostParts;
/** Partition id to ids of nodes that hold the partition outside of the current affinity assignment. */
private final Map<Integer, Set<UUID>> diffFromAffinity = new HashMap<>();
/** */
private volatile AffinityTopologyVersion diffFromAffinityVer = AffinityTopologyVersion.NONE;
/** Last started exchange version (always >= readyTopVer). */
private volatile AffinityTopologyVersion lastTopChangeVer = AffinityTopologyVersion.NONE;
/** Last finished exchange version. */
private volatile AffinityTopologyVersion readyTopVer = AffinityTopologyVersion.NONE;
/** Discovery cache. */
private volatile DiscoCache discoCache;
/** */
private volatile boolean stopping;
/** A future that will be completed when the topology with version topVer is ready to use. */
private volatile GridDhtTopologyFuture topReadyFut;
/** */
private final GridAtomicLong updateSeq = new GridAtomicLong(1);
/** Lock. */
private final StripedCompositeReadWriteLock lock = new StripedCompositeReadWriteLock(16);
/** Partition update counter. */
private final CachePartitionFullCountersMap cntrMap;
/** */
private volatile Map<Integer, Long> globalPartSizes;
/** */
private volatile AffinityTopologyVersion rebalancedTopVer = AffinityTopologyVersion.NONE;
/** Factory used for re-creating a partition during its lifecycle. */
private PartitionFactory partFactory;
/**
* @param ctx Cache shared context.
* @param grp Cache group.
*/
public GridDhtPartitionTopologyImpl(
GridCacheSharedContext ctx,
CacheGroupContext grp
) {
assert ctx != null;
assert grp != null;
this.ctx = ctx;
this.grp = grp;
log = ctx.logger(getClass());
timeLog = ctx.logger(GridDhtPartitionsExchangeFuture.EXCHANGE_LOG);
locParts = new AtomicReferenceArray<>(grp.affinityFunction().partitions());
cntrMap = new CachePartitionFullCountersMap(locParts.length());
partFactory = GridDhtLocalPartition::new;
}
/**
* Sets the partition factory to use. Currently used for tests.
*
* @param factory Factory.
*/
public void partitionFactory(PartitionFactory factory) {
this.partFactory = factory;
}
/** {@inheritDoc} */
@Override public int partitions() {
return grp.affinityFunction().partitions();
}
/** {@inheritDoc} */
@Override public int groupId() {
return grp.groupId();
}
/**
 * Resets the in-memory topology state after the local node has reconnected to the cluster.
 */
public void onReconnected() {
lock.writeLock().lock();
try {
node2part = null;
diffFromAffinity.clear();
updateSeq.set(1);
topReadyFut = null;
diffFromAffinityVer = AffinityTopologyVersion.NONE;
rebalancedTopVer = AffinityTopologyVersion.NONE;
readyTopVer = AffinityTopologyVersion.NONE;
lastTopChangeVer = AffinityTopologyVersion.NONE;
discoCache = ctx.discovery().discoCache();
}
finally {
lock.writeLock().unlock();
}
}
/**
* @return Full map string representation.
*/
private String fullMapString() {
return node2part == null ? "null" : FULL_MAP_DEBUG ? node2part.toFullString() : node2part.toString();
}
/**
* @param map Map to get string for.
* @return Full map string representation.
*/
private String mapString(GridDhtPartitionMap map) {
return map == null ? "null" : FULL_MAP_DEBUG ? map.toFullString() : map.toString();
}
/** {@inheritDoc} */
@SuppressWarnings({"LockAcquiredButNotSafelyReleased"})
@Override public void readLock() {
lock.readLock().lock();
}
/** {@inheritDoc} */
@Override public void readUnlock() {
lock.readLock().unlock();
}
/** {@inheritDoc} */
@Override public boolean holdsLock() {
return lock.isWriteLockedByCurrentThread() || lock.getReadHoldCount() > 0;
}
/** {@inheritDoc} */
@Override public void updateTopologyVersion(
GridDhtTopologyFuture exchFut,
@NotNull DiscoCache discoCache,
long updSeq,
boolean stopping
) throws IgniteInterruptedCheckedException {
U.writeLock(lock);
try {
AffinityTopologyVersion exchTopVer = exchFut.initialVersion();
assert exchTopVer.compareTo(readyTopVer) > 0 : "Invalid topology version [grp=" + grp.cacheOrGroupName() +
", topVer=" + readyTopVer +
", exchTopVer=" + exchTopVer +
", discoCacheVer=" + (this.discoCache != null ? this.discoCache.version() : "None") +
", exchDiscoCacheVer=" + discoCache.version() +
", fut=" + exchFut + ']';
this.stopping = stopping;
updateSeq.setIfGreater(updSeq);
topReadyFut = exchFut;
rebalancedTopVer = AffinityTopologyVersion.NONE;
lastTopChangeVer = exchTopVer;
this.discoCache = discoCache;
}
finally {
lock.writeLock().unlock();
}
}
/** {@inheritDoc} */
@Override public boolean initialized() {
AffinityTopologyVersion topVer = readyTopVer;
assert topVer != null;
return topVer.initialized();
}
/** {@inheritDoc} */
@Override public AffinityTopologyVersion readyTopologyVersion() {
AffinityTopologyVersion topVer = this.readyTopVer;
assert topVer.topologyVersion() > 0 : "Invalid topology version [topVer=" + topVer +
", group=" + grp.cacheOrGroupName() + ']';
return topVer;
}
/** {@inheritDoc} */
@Override public AffinityTopologyVersion lastTopologyChangeVersion() {
AffinityTopologyVersion topVer = this.lastTopChangeVer;
assert topVer.topologyVersion() > 0 : "Invalid topology version [topVer=" + topVer +
", group=" + grp.cacheOrGroupName() + ']';
return topVer;
}
/** {@inheritDoc} */
@Override public GridDhtTopologyFuture topologyVersionFuture() {
GridDhtTopologyFuture topReadyFut0 = topReadyFut;
assert topReadyFut0 != null;
if (!topReadyFut0.changedAffinity()) {
GridDhtTopologyFuture lastFut = ctx.exchange().lastFinishedFuture();
if (lastFut != null)
return lastFut;
}
return topReadyFut0;
}
/** {@inheritDoc} */
@Override public boolean stopping() {
return stopping;
}
/** {@inheritDoc} */
@Override public boolean initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer,
GridDhtPartitionsExchangeFuture exchFut)
throws IgniteInterruptedCheckedException
{
boolean needRefresh;
ctx.database().checkpointReadLock();
try {
U.writeLock(lock);
try {
if (stopping)
return false;
long updateSeq = this.updateSeq.incrementAndGet();
needRefresh = initPartitions(affVer, grp.affinity().readyAssignments(affVer), exchFut, updateSeq);
consistencyCheck();
}
finally {
lock.writeLock().unlock();
}
}
finally {
ctx.database().checkpointReadUnlock();
}
return needRefresh;
}
/**
* Creates and initializes partitions using given {@code affVer} and {@code affAssignment}.
*
* @param affVer Affinity version to use.
* @param affAssignment Affinity assignment to use.
* @param exchFut Exchange future.
* @param updateSeq Update sequence.
* @return {@code True} if partitions must be refreshed.
*/
private boolean initPartitions(
AffinityTopologyVersion affVer,
List<List<ClusterNode>> affAssignment,
GridDhtPartitionsExchangeFuture exchFut,
long updateSeq
) {
boolean needRefresh = false;
if (grp.affinityNode()) {
ClusterNode loc = ctx.localNode();
ClusterNode oldest = discoCache.oldestAliveServerNode();
GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
int partitions = grp.affinity().partitions();
if (grp.rebalanceEnabled()) {
boolean added = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom());
boolean first = added || (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined())
|| exchFut.activateCluster();
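// 'first' means this node is the first to bring the group into the topology: the group was just
// added, the coordinator is the very first node joining, or the cluster is being activated.
// In that case local partitions can be owned right away since there is nothing to rebalance from.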
if (first) {
assert exchId.isJoined() || added || exchFut.activateCluster();
if (log.isDebugEnabled()) {
String reason;
if (exchId.isJoined())
reason = "First node in cluster";
else if (added)
reason = "Cache group added";
else
reason = "Cluster activate";
log.debug("Initialize partitions (" + reason + ")" + " [grp=" + grp.cacheOrGroupName() + "]");
}
for (int p = 0; p < partitions; p++) {
if (localNode(p, affAssignment)) {
// Partition is created for the first time, so it's safe to own it.
boolean shouldOwn = locParts.get(p) == null;
GridDhtLocalPartition locPart = getOrCreatePartition(p);
if (shouldOwn) {
locPart.own();
if (log.isDebugEnabled())
log.debug("Partition has been owned (created first time) " +
"[grp=" + grp.cacheOrGroupName() + ", p=" + locPart.id() + ']');
}
needRefresh = true;
updateSeq = updateLocal(p, locPart.state(), updateSeq, affVer);
}
else {
// Reflect partitions that do not belong to this node by affinity in the partition map.
GridDhtLocalPartition locPart = locParts.get(p);
if (locPart != null) {
needRefresh = true;
updateSeq = updateLocal(p, locPart.state(), updateSeq, affVer);
}
}
}
}
else
createPartitions(affVer, affAssignment, updateSeq);
}
else {
// If preloader is disabled, then we simply clear out
// the partitions this node is not responsible for.
for (int p = 0; p < partitions; p++) {
GridDhtLocalPartition locPart = localPartition0(p, affVer, false, true);
boolean belongs = localNode(p, affAssignment);
if (locPart != null) {
if (!belongs) {
GridDhtPartitionState state = locPart.state();
if (state.active()) {
locPart.rent();
updateSeq = updateLocal(p, locPart.state(), updateSeq, affVer);
if (log.isDebugEnabled()) {
log.debug("Evicting partition with rebalancing disabled (it does not belong to " +
"affinity) [grp=" + grp.cacheOrGroupName() + ", part=" + locPart + ']');
}
}
}
else {
locPart.own();
// Make sure partition map is initialized.
updateSeq = updateLocal(p, locPart.state(), updateSeq, affVer);
}
}
else if (belongs) {
locPart = getOrCreatePartition(p);
locPart.own();
updateLocal(p, locPart.state(), updateSeq, affVer);
}
}
}
}
updateRebalanceVersion(affVer, affAssignment);
return needRefresh;
}
/**
* Creates non-existing partitions belonging to the given affinity assignment {@code aff}.
*
* @param affVer Affinity version.
* @param aff Affinity assignments.
* @param updateSeq Update sequence.
*/
private void createPartitions(
AffinityTopologyVersion affVer,
List<List<ClusterNode>> aff,
long updateSeq
) {
if (!grp.affinityNode())
return;
int partitions = grp.affinity().partitions();
if (log.isDebugEnabled())
log.debug("Create non-existing partitions [grp=" + grp.cacheOrGroupName() + "]");
for (int p = 0; p < partitions; p++) {
if (node2part != null && node2part.valid()) {
if (localNode(p, aff)) {
GridDhtLocalPartition locPart = locParts.get(p);
if (locPart != null) {
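// A partition that is assigned by the new affinity but was being evicted is revived:
// back to MOVING, or to LOST if it was previously marked as lost.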
if (locPart.state() == RENTING) {
if (lostPartitions().contains(locPart.id()))
locPart.markLost(); // Preserve LOST state.
else
locPart.moving();
}
if (locPart.state() == EVICTED)
locPart = getOrCreatePartition(p);
}
else
locPart = getOrCreatePartition(p);
updateSeq = updateLocal(p, locPart.state(), updateSeq, affVer);
}
}
// If this node's map is empty, we pre-create local partitions,
// so local map will be sent correctly during exchange.
else if (localNode(p, aff))
getOrCreatePartition(p);
}
}
/** {@inheritDoc} */
@Override public void beforeExchange(
GridDhtPartitionsExchangeFuture exchFut,
boolean affReady,
boolean updateMoving
) throws IgniteCheckedException {
ctx.database().checkpointReadLock();
try {
U.writeLock(lock);
try {
if (stopping)
return;
assert lastTopChangeVer.equals(exchFut.initialVersion()) : "Invalid topology version [topVer=" + lastTopChangeVer +
", exchId=" + exchFut.exchangeId() + ']';
ExchangeDiscoveryEvents evts = exchFut.context().events();
if (affReady) {
assert grp.affinity().lastVersion().equals(evts.topologyVersion()) : "Invalid affinity version [" +
"grp=" + grp.cacheOrGroupName() +
", affVer=" + grp.affinity().lastVersion() +
", evtsVer=" + evts.topologyVersion() + ']';
lastTopChangeVer = readyTopVer = evts.topologyVersion();
discoCache = evts.discoveryCache();
}
if (log.isDebugEnabled()) {
log.debug("Partition map beforeExchange [grp=" + grp.cacheOrGroupName() +
", exchId=" + exchFut.exchangeId() + ", fullMap=" + fullMapString() + ']');
}
long updateSeq = this.updateSeq.incrementAndGet();
cntrMap.clear();
initializeFullMap(updateSeq);
boolean grpStarted = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom());
if (evts.hasServerLeft()) {
for (DiscoveryEvent evt : evts.events()) {
if (ExchangeDiscoveryEvents.serverLeftEvent(evt))
removeNode(evt.eventNode().id());
}
}
else if (affReady && grpStarted && exchFut.exchangeType() == NONE) {
assert !exchFut.context().mergeExchanges() : exchFut;
assert node2part != null && node2part.valid() : exchFut;
// Initialize node maps if group was started from joining client.
final List<ClusterNode> nodes = exchFut.firstEventCache().cacheGroupAffinityNodes(grp.groupId());
for (ClusterNode node : nodes) {
if (!node2part.containsKey(node.id()) && ctx.discovery().alive(node)) {
final GridDhtPartitionMap partMap = new GridDhtPartitionMap(node.id(),
1L,
exchFut.initialVersion(),
new GridPartitionStateMap(),
false);
final AffinityAssignment aff = grp.affinity().cachedAffinity(exchFut.initialVersion());
for (Integer p0 : aff.primaryPartitions(node.id()))
partMap.put(p0, OWNING);
for (Integer p0 : aff.backupPartitions(node.id()))
partMap.put(p0, OWNING);
node2part.put(node.id(), partMap);
}
}
}
if (grp.affinityNode()) {
if (grpStarted ||
exchFut.firstEvent().type() == EVT_DISCOVERY_CUSTOM_EVT ||
exchFut.serverNodeDiscoveryEvent()) {
AffinityTopologyVersion affVer;
List<List<ClusterNode>> affAssignment;
if (affReady) {
affVer = evts.topologyVersion();
assert grp.affinity().lastVersion().equals(affVer) :
"Invalid affinity [topVer=" + grp.affinity().lastVersion() +
", grp=" + grp.cacheOrGroupName() +
", affVer=" + affVer +
", fut=" + exchFut + ']';
affAssignment = grp.affinity().readyAssignments(affVer);
}
else {
assert !exchFut.context().mergeExchanges();
affVer = exchFut.initialVersion();
affAssignment = grp.affinity().idealAssignmentRaw();
}
initPartitions(affVer, affAssignment, exchFut, updateSeq);
}
}
consistencyCheck();
if (updateMoving) {
assert grp.affinity().lastVersion().equals(evts.topologyVersion());
createMovingPartitions(grp.affinity().readyAffinity(evts.topologyVersion()));
}
if (log.isDebugEnabled()) {
log.debug("Partition map after beforeExchange [grp=" + grp.cacheOrGroupName() + ", " +
"exchId=" + exchFut.exchangeId() + ", fullMap=" + fullMapString() + ']');
}
if (log.isTraceEnabled()) {
log.trace("Partition states after beforeExchange [grp=" + grp.cacheOrGroupName()
+ ", exchId=" + exchFut.exchangeId() + ", states=" + dumpPartitionStates() + ']');
}
}
finally {
lock.writeLock().unlock();
}
}
finally {
ctx.database().checkpointReadUnlock();
}
}
/**
* Initializes the full map if the current full map is empty or invalid; used on the coordinator or when a cache group starts.
*
* @param updateSeq Update sequence to initialize full map.
*/
private void initializeFullMap(long updateSeq) {
if (!(topReadyFut instanceof GridDhtPartitionsExchangeFuture))
return;
GridDhtPartitionsExchangeFuture exchFut = (GridDhtPartitionsExchangeFuture) topReadyFut;
boolean grpStarted = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom());
ClusterNode oldest = discoCache.oldestAliveServerNode();
// If this is the oldest node (coordinator) or cache was added during this exchange
if (oldest != null && (ctx.localNode().equals(oldest) || grpStarted)) {
if (node2part == null) {
node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
if (log.isDebugEnabled()) {
log.debug("Created brand new full topology map on oldest node [" +
"grp=" + grp.cacheOrGroupName() + ", exchId=" + exchFut.exchangeId() +
", fullMap=" + fullMapString() + ']');
}
}
else if (!node2part.valid()) {
node2part = new GridDhtPartitionFullMap(oldest.id(),
oldest.order(),
updateSeq,
node2part,
false);
if (log.isDebugEnabled()) {
log.debug("Created new full topology map on oldest node [" +
"grp=" + grp.cacheOrGroupName() + ", exchId=" + exchFut.exchangeId() +
", fullMap=" + node2part + ']');
}
}
else if (!node2part.nodeId().equals(ctx.localNode().id())) {
node2part = new GridDhtPartitionFullMap(oldest.id(),
oldest.order(),
updateSeq,
node2part,
false);
if (log.isDebugEnabled()) {
log.debug("Copied old map into new map on oldest node (previous oldest node left) [" +
"grp=" + grp.cacheOrGroupName() + ", exchId=" + exchFut.exchangeId() +
", fullMap=" + fullMapString() + ']');
}
}
}
}
/**
* @param p Partition number.
* @param topVer Topology version.
* @return {@code True} if given partition belongs to local node.
*/
private boolean partitionLocalNode(int p, AffinityTopologyVersion topVer) {
return grp.affinity().nodes(p, topVer).contains(ctx.localNode());
}
/** {@inheritDoc} */
@Override public void afterStateRestored(AffinityTopologyVersion topVer) {
// No-op. If some partitions are restored in the RENTING state, clearing will be started after PME.
// Otherwise RENTING could finish while node2part is not yet ready and the partition
// map would not be updated.
}
/** {@inheritDoc} */
@Override public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) {
boolean changed = false;
int partitions = grp.affinity().partitions();
AffinityTopologyVersion topVer = exchFut.context().events().topologyVersion();
assert grp.affinity().lastVersion().equals(topVer) : "Affinity is not initialized " +
"[grp=" + grp.cacheOrGroupName() +
", topVer=" + topVer +
", affVer=" + grp.affinity().lastVersion() +
", fut=" + exchFut + ']';
ctx.database().checkpointReadLock();
try {
lock.writeLock().lock();
try {
if (stopping)
return false;
assert readyTopVer.initialized() : readyTopVer;
assert lastTopChangeVer.equals(readyTopVer);
if (log.isDebugEnabled()) {
log.debug("Partition map before afterExchange [grp=" + grp.cacheOrGroupName() +
", exchId=" + exchFut.exchangeId() +
", fullMap=" + fullMapString() + ']');
}
if (log.isTraceEnabled()) {
log.trace("Partition states before afterExchange [grp=" + grp.cacheOrGroupName()
+ ", exchVer=" + exchFut.exchangeId() + ", states=" + dumpPartitionStates() + ']');
}
long updateSeq = this.updateSeq.incrementAndGet();
// Skip partition updates if this is not a real exchange.
if (!ctx.localNode().isClient() && exchFut.exchangeType() == ALL) {
for (int p = 0; p < partitions; p++) {
GridDhtLocalPartition locPart = localPartition0(p, topVer, false, true);
if (partitionLocalNode(p, topVer)) {
// Prepare the partition for rebalancing if that has not already happened during the full map update phase.
if (locPart == null || locPart.state() == RENTING || locPart.state() == EVICTED)
locPart = rebalancePartition(p, true, exchFut);
GridDhtPartitionState state = locPart.state();
if (state == MOVING) {
if (grp.rebalanceEnabled()) {
Collection<ClusterNode> owners = owners(p);
// If an owner node left during the exchange,
// then a new exchange should be started to detect lost partitions.
if (!F.isEmpty(owners)) {
if (log.isDebugEnabled())
log.debug("Will not own partition (there are owners to rebalance from) " +
"[grp=" + grp.cacheOrGroupName() + ", p=" + p + ", owners = " + owners + ']');
}
}
else
updateSeq = updateLocal(p, locPart.state(), updateSeq, topVer);
}
}
else {
if (locPart != null) {
GridDhtPartitionState state = locPart.state();
if (state == MOVING) {
locPart.rent();
updateSeq = updateLocal(p, locPart.state(), updateSeq, topVer);
changed = true;
if (log.isDebugEnabled()) {
log.debug("Evicting MOVING partition (it does not belong to affinity) [" +
"grp=" + grp.cacheOrGroupName() + ", p=" + locPart.id() + ']');
}
}
}
}
}
}
AffinityAssignment aff = grp.affinity().readyAffinity(topVer);
if (node2part != null && node2part.valid())
changed |= checkEvictions(updateSeq, aff);
updateRebalanceVersion(aff.topologyVersion(), aff.assignment());
consistencyCheck();
if (log.isTraceEnabled()) {
log.trace("Partition states after afterExchange [grp=" + grp.cacheOrGroupName()
+ ", exchVer=" + exchFut.exchangeId() + ", states=" + dumpPartitionStates() + ']');
}
}
finally {
lock.writeLock().unlock();
}
}
finally {
ctx.database().checkpointReadUnlock();
}
return changed;
}
/** {@inheritDoc} */
@Nullable @Override public GridDhtLocalPartition localPartition(int p, AffinityTopologyVersion topVer,
boolean create)
throws GridDhtInvalidPartitionException {
return localPartition0(p, topVer, create, false);
}
/** {@inheritDoc} */
@Nullable @Override public GridDhtLocalPartition localPartition(int p, AffinityTopologyVersion topVer,
boolean create, boolean showRenting) throws GridDhtInvalidPartitionException {
return localPartition0(p, topVer, create, showRenting);
}
/**
* Creates a partition with id {@code p} if it doesn't exist or was evicted.
* Otherwise returns the existing partition.
*
* @param p Partition number.
* @return Partition.
*/
public GridDhtLocalPartition getOrCreatePartition(int p) {
assert lock.isWriteLockedByCurrentThread();
assert ctx.database().checkpointLockIsHeldByThread();
GridDhtLocalPartition loc = locParts.get(p);
if (loc == null || loc.state() == EVICTED) {
boolean recreate = false;
// Make sure that after eviction partition is destroyed.
if (loc != null)
recreate = true;
locParts.set(p, loc = partFactory.create(ctx, grp, p, false));
if (recreate)
loc.resetUpdateCounter();
long updCntr = cntrMap.updateCounter(p);
if (updCntr != 0)
loc.updateCounter(updCntr);
// Create a partition in lost state.
if (lostParts != null && lostParts.contains(p))
loc.markLost();
}
return loc;
}
/** {@inheritDoc} */
@Override public GridDhtLocalPartition forceCreatePartition(int p) throws IgniteCheckedException {
lock.writeLock().lock();
try {
GridDhtLocalPartition part = locParts.get(p);
boolean recreate = false;
if (part != null) {
if (part.state() != EVICTED)
return part;
else
recreate = true;
}
part = partFactory.create(ctx, grp, p, true);
if (recreate)
part.resetUpdateCounter();
locParts.set(p, part);
return part;
}
finally {
lock.writeLock().unlock();
}
}
/**
* @param p Partition number.
* @param topVer Topology version.
* @param create If {@code true}, create the partition if it doesn't exist or was evicted.
* @param showRenting If {@code true}, return the partition in RENTING state if it exists.
* @return Local partition.
*/
@SuppressWarnings("TooBroadScope")
private GridDhtLocalPartition localPartition0(int p,
AffinityTopologyVersion topVer,
boolean create,
boolean showRenting) {
GridDhtLocalPartition loc;
loc = locParts.get(p);
GridDhtPartitionState state = loc != null ? loc.state() : null;
if (loc != null && state != EVICTED && (state != RENTING || showRenting))
return loc;
if (!create)
return null;
boolean created = false;
ctx.database().checkpointReadLock();
try {
lock.writeLock().lock();
try {
loc = locParts.get(p);
state = loc != null ? loc.state() : null;
boolean belongs = partitionLocalNode(p, topVer);
boolean recreate = false;
if (loc != null && state == EVICTED) {
recreate = true;
locParts.set(p, loc = null);
if (!belongs) {
throw new GridDhtInvalidPartitionException(p, "Adding entry to evicted partition " +
"(often may be caused by inconsistent 'key.hashCode()' implementation) " +
"[grp=" + grp.cacheOrGroupName() + ", part=" + p + ", topVer=" + topVer +
", this.topVer=" + this.readyTopVer + ']');
}
}
else if (loc != null && state == RENTING && !showRenting) {
boolean belongsNow = topVer.equals(this.readyTopVer) ? belongs : partitionLocalNode(p, this.readyTopVer);
throw new GridDhtInvalidPartitionException(p, "Adding entry to partition that is concurrently " +
"evicted [grp=" + grp.cacheOrGroupName() + ", part=" + p + ", belongsNow=" + belongsNow
+ ", belongs=" + belongs + ", topVer=" + topVer + ", curTopVer=" + this.readyTopVer + "]");
}
if (loc == null) {
if (!belongs)
throw new GridDhtInvalidPartitionException(p, "Creating partition which does not belong to " +
"local node (often may be caused by inconsistent 'key.hashCode()' implementation) " +
"[grp=" + grp.cacheOrGroupName() + ", part=" + p + ", topVer=" + topVer +
", this.topVer=" + this.readyTopVer + ']');
locParts.set(p, loc = partFactory.create(ctx, grp, p, false));
if (recreate)
loc.resetUpdateCounter();
this.updateSeq.incrementAndGet();
created = true;
}
}
finally {
lock.writeLock().unlock();
}
}
finally {
ctx.database().checkpointReadUnlock();
}
return loc;
}
/** {@inheritDoc} */
@Override public void releasePartitions(int... parts) {
assert parts != null;
assert parts.length > 0;
for (int i = 0; i < parts.length; i++) {
GridDhtLocalPartition part = locParts.get(parts[i]);
if (part != null)
part.release();
}
}
/** {@inheritDoc} */
@Override public GridDhtLocalPartition localPartition(int part) {
return locParts.get(part);
}
/** {@inheritDoc} */
@Override public List<GridDhtLocalPartition> localPartitions() {
List<GridDhtLocalPartition> list = new ArrayList<>(locParts.length());
for (int i = 0; i < locParts.length(); i++) {
GridDhtLocalPartition part = locParts.get(i);
if (part != null && part.state().active())
list.add(part);
}
return list;
}
/** {@inheritDoc} */
@Override public Iterable<GridDhtLocalPartition> currentLocalPartitions() {
return new Iterable<GridDhtLocalPartition>() {
@Override public Iterator<GridDhtLocalPartition> iterator() {
return new CurrentPartitionsIterator();
}
};
}
/** {@inheritDoc} */
@Override public void onRemoved(GridDhtCacheEntry e) {
/*
* Make sure not to acquire any locks here as this method
* may be called from sensitive synchronization blocks.
* ===================================================
*/
GridDhtLocalPartition loc = localPartition(e.partition(), readyTopVer, false);
if (loc != null)
loc.onRemoved(e);
}
/** {@inheritDoc} */
@Override public GridDhtPartitionMap localPartitionMap() {
GridPartitionStateMap map = new GridPartitionStateMap(locParts.length());
lock.readLock().lock();
try {
for (int i = 0; i < locParts.length(); i++) {
GridDhtLocalPartition part = locParts.get(i);
if (part == null)
continue;
map.put(i, part.state());
}
GridDhtPartitionMap locPartMap = node2part != null ? node2part.get(ctx.localNodeId()) : null;
return new GridDhtPartitionMap(ctx.localNodeId(),
updateSeq.get(),
locPartMap != null ? locPartMap.topologyVersion() : readyTopVer,
map,
true);
}
finally {
lock.readLock().unlock();
}
}
/** {@inheritDoc} */
@Override public GridDhtPartitionState partitionState(UUID nodeId, int part) {
lock.readLock().lock();
try {
GridDhtPartitionMap partMap = node2part.get(nodeId);
if (partMap != null) {
GridDhtPartitionState state = partMap.get(part);
return state == null ? EVICTED : state;
}
return EVICTED;
}
finally {
lock.readLock().unlock();
}
}
/** {@inheritDoc} */
@Nullable @Override public List<ClusterNode> nodes(int p,
AffinityAssignment affAssignment,
List<ClusterNode> affNodes) {
return nodes0(p, affAssignment, affNodes);
}
/** {@inheritDoc} */
@Override public List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
AffinityAssignment affAssignment = grp.affinity().cachedAffinity(topVer);
List<ClusterNode> affNodes = affAssignment.get(p);
List<ClusterNode> nodes = nodes0(p, affAssignment, affNodes);
return nodes != null ? nodes : affNodes;
}
/**
* @param p Partition.
* @param affAssignment Assignments.
* @param affNodes Nodes assigned to the given partition by affinity.
* @return Nodes responsible for given partition (primary is first).
*/
@Nullable private List<ClusterNode> nodes0(int p, AffinityAssignment affAssignment, List<ClusterNode> affNodes) {
if (grp.isReplicated())
return affNodes;
AffinityTopologyVersion topVer = affAssignment.topologyVersion();
lock.readLock().lock();
try {
assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer1=" + topVer +
", topVer2=" + readyTopVer +
", node=" + ctx.igniteInstanceName() +
", grp=" + grp.cacheOrGroupName() +
", node2part=" + node2part + ']';
List<ClusterNode> nodes = null;
AffinityTopologyVersion diffVer = diffFromAffinityVer;
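// Fast path: if the requested version matches the version for which diffFromAffinity was
// calculated, affinity owners plus the precomputed diff are sufficient. Otherwise, if affinity
// really changed between the two versions, fall back to a full scan of node2part below.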
if (!diffVer.equals(topVer)) {
if (log.isDebugEnabled()) {
log.debug("Requested topology version does not match calculated diff, need to check if " +
"affinity has changed [grp=" + grp.cacheOrGroupName() + ", topVer=" + topVer +
", diffVer=" + diffVer + "]");
}
boolean affChanged;
if (diffVer.compareTo(topVer) < 0)
affChanged = ctx.exchange().affinityChanged(diffVer, topVer);
else
affChanged = ctx.exchange().affinityChanged(topVer, diffVer);
if (affChanged) {
if (log.isDebugEnabled()) {
log.debug("Requested topology version does not match calculated diff, will require full iteration to" +
"calculate mapping [grp=" + grp.cacheOrGroupName() + ", topVer=" + topVer +
", diffVer=" + diffVer + "]");
}
nodes = new ArrayList<>();
nodes.addAll(affNodes);
for (Map.Entry<UUID, GridDhtPartitionMap> entry : node2part.entrySet()) {
GridDhtPartitionState state = entry.getValue().get(p);
ClusterNode n = ctx.discovery().node(entry.getKey());
if (n != null && state != null && (state == MOVING || state == OWNING || state == RENTING)
&& !nodes.contains(n) && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion()))
nodes.add(n);
}
}
return nodes;
}
Collection<UUID> diffIds = diffFromAffinity.get(p);
if (!F.isEmpty(diffIds)) {
Collection<UUID> affIds = affAssignment.getIds(p);
for (UUID nodeId : diffIds) {
if (affIds.contains(nodeId))
continue;
if (hasState(p, nodeId, OWNING, MOVING, RENTING)) {
ClusterNode n = ctx.discovery().node(nodeId);
if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) {
if (nodes == null) {
nodes = new ArrayList<>(affNodes.size() + diffIds.size());
nodes.addAll(affNodes);
}
nodes.add(n);
}
}
}
}
return nodes;
}
finally {
lock.readLock().unlock();
}
}
/**
* @param p Partition.
* @param topVer Topology version ({@code -1} for all nodes).
* @param state Partition state.
* @param states Additional partition states.
* @return List of nodes for the partition.
*/
private List<ClusterNode> nodes(
int p,
AffinityTopologyVersion topVer,
GridDhtPartitionState state,
GridDhtPartitionState... states
) {
Collection<UUID> allIds = F.nodeIds(discoCache.cacheGroupAffinityNodes(grp.groupId()));
lock.readLock().lock();
try {
assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer +
", grp=" + grp.cacheOrGroupName() +
", allIds=" + allIds +
", node2part=" + node2part + ']';
// Node IDs can be null if both primary and backup nodes disappear.
// Empirical size to reduce growing of ArrayList.
// We bear in mind that most of the time we filter OWNING partitions.
List<ClusterNode> nodes = new ArrayList<>(allIds.size() / 2 + 1);
for (UUID id : allIds) {
if (hasState(p, id, state, states)) {
ClusterNode n = ctx.discovery().node(id);
if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion()))
nodes.add(n);
}
}
return nodes;
}
finally {
lock.readLock().unlock();
}
}
/** {@inheritDoc} */
@Override public List<ClusterNode> owners(int p, AffinityTopologyVersion topVer) {
if (!grp.rebalanceEnabled())
return ownersAndMoving(p, topVer);
return nodes(p, topVer, OWNING, null);
}
/** {@inheritDoc} */
@Override public List<ClusterNode> owners(int p) {
return owners(p, AffinityTopologyVersion.NONE);
}
/** {@inheritDoc} */
@Override public List<List<ClusterNode>> allOwners() {
lock.readLock().lock();
try {
int parts = partitions();
List<List<ClusterNode>> res = new ArrayList<>(parts);
for (int i = 0; i < parts; i++)
res.add(new ArrayList<>());
List<ClusterNode> allNodes = discoCache.cacheGroupAffinityNodes(grp.groupId());
for (int i = 0; i < allNodes.size(); i++) {
ClusterNode node = allNodes.get(i);
GridDhtPartitionMap nodeParts = node2part.get(node.id());
if (nodeParts != null) {
for (Map.Entry<Integer, GridDhtPartitionState> e : nodeParts.map().entrySet()) {
if (e.getValue() == OWNING) {
int part = e.getKey();
List<ClusterNode> owners = res.get(part);
owners.add(node);
}
}
}
}
return res;
}
finally {
lock.readLock().unlock();
}
}
/** {@inheritDoc} */
@Override public List<ClusterNode> moving(int p) {
if (!grp.rebalanceEnabled())
return ownersAndMoving(p, AffinityTopologyVersion.NONE);
return nodes(p, AffinityTopologyVersion.NONE, MOVING, null);
}
/**
* @param p Partition.
* @param topVer Topology version.
* @return List of nodes in state OWNING or MOVING.
*/
private List<ClusterNode> ownersAndMoving(int p, AffinityTopologyVersion topVer) {
return nodes(p, topVer, OWNING, MOVING_STATES);
}
/** {@inheritDoc} */
@Override public long updateSequence() {
return updateSeq.get();
}
/** {@inheritDoc} */
@Override public GridDhtPartitionFullMap partitionMap(boolean onlyActive) {
lock.readLock().lock();
try {
if (node2part == null || stopping)
return null;
assert node2part.valid() : "Invalid node2part [node2part=" + node2part +
", grp=" + grp.cacheOrGroupName() +
", stopping=" + stopping +
", locNodeId=" + ctx.localNode().id() +
", locName=" + ctx.igniteInstanceName() + ']';
GridDhtPartitionFullMap m = node2part;
return new GridDhtPartitionFullMap(m.nodeId(), m.nodeOrder(), m.updateSequence(), m, onlyActive);
}
finally {
lock.readLock().unlock();
}
}
/**
* Checks whether the current partition map should be overwritten by the new partition map.
* Returns {@code true} if the topology version or update sequence of the new map is greater than that of the current map.
*
* @param currentMap Current partition map.
* @param newMap New partition map.
* @return {@code True} if the current partition map should be overwritten by the new partition map, {@code false} otherwise.
*/
private boolean shouldOverridePartitionMap(GridDhtPartitionMap currentMap, GridDhtPartitionMap newMap) {
return newMap != null && newMap.compareTo(currentMap) > 0;
}
/** {@inheritDoc} */
@Override public boolean update(
@Nullable AffinityTopologyVersion exchangeVer,
GridDhtPartitionFullMap partMap,
@Nullable CachePartitionFullCountersMap incomeCntrMap,
Set<Integer> partsToReload,
@Nullable Map<Integer, Long> partSizes,
@Nullable AffinityTopologyVersion msgTopVer,
@Nullable GridDhtPartitionsExchangeFuture exchFut,
@Nullable Set<Integer> lostParts
) {
if (log.isDebugEnabled()) {
log.debug("Updating full partition map " +
"[grp=" + grp.cacheOrGroupName() + ", exchVer=" + exchangeVer + ", fullMap=" + fullMapString() + ']');
}
assert partMap != null;
ctx.database().checkpointReadLock();
try {
lock.writeLock().lock();
try {
if (log.isTraceEnabled() && exchangeVer != null) {
log.trace("Partition states before full update [grp=" + grp.cacheOrGroupName()
+ ", exchVer=" + exchangeVer + ", states=" + dumpPartitionStates() + ']');
}
if (stopping || !lastTopChangeVer.initialized() ||
// Ignore message not-related to exchange if exchange is in progress.
(exchangeVer == null && !lastTopChangeVer.equals(readyTopVer)))
return false;
if (incomeCntrMap != null) {
// Update local counters in partitions.
for (int i = 0; i < locParts.length(); i++) {
cntrMap.updateCounter(i, incomeCntrMap.updateCounter(i));
GridDhtLocalPartition part = locParts.get(i);
if (part == null)
continue;
if (part.state() == OWNING || part.state() == MOVING) {
long updCntr = incomeCntrMap.updateCounter(part.id());
long curCntr = part.updateCounter();
// Avoid zero counter update to empty partition to prevent lazy init.
if (updCntr != 0 || curCntr != 0) {
part.updateCounter(updCntr);
if (updCntr > curCntr) {
if (log.isDebugEnabled())
log.debug("Partition update counter has updated [grp=" + grp.cacheOrGroupName() + ", p=" + part.id()
+ ", state=" + part.state() + ", prevCntr=" + curCntr + ", nextCntr=" + updCntr + "]");
}
}
}
}
}
// TODO FIXME https://issues.apache.org/jira/browse/IGNITE-11800
if (exchangeVer != null) {
// Ignore if exchange already finished or new exchange started.
if (readyTopVer.after(exchangeVer) || lastTopChangeVer.after(exchangeVer)) {
U.warn(log, "Stale exchange id for full partition map update (will ignore) [" +
"grp=" + grp.cacheOrGroupName() +
", lastTopChange=" + lastTopChangeVer +
", readTopVer=" + readyTopVer +
", exchVer=" + exchangeVer + ']');
return false;
}
}
boolean fullMapUpdated = node2part == null;
if (node2part != null) {
// Merge maps.
for (GridDhtPartitionMap part : node2part.values()) {
GridDhtPartitionMap newPart = partMap.get(part.nodeId());
if (shouldOverridePartitionMap(part, newPart)) {
fullMapUpdated = true;
if (log.isDebugEnabled()) {
log.debug("Overriding partition map in full update map [" +
"node=" + part.nodeId() +
", grp=" + grp.cacheOrGroupName() +
", exchVer=" + exchangeVer +
", curPart=" + mapString(part) +
", newPart=" + mapString(newPart) + ']');
}
if (newPart.nodeId().equals(ctx.localNodeId()))
updateSeq.setIfGreater(newPart.updateSequence());
}
else {
// If the current map for some node is newer than the received one, keep the newer value.
if (log.isDebugEnabled()) {
log.debug("Partitions map for the node keeps newer value than message [" +
"node=" + part.nodeId() +
", grp=" + grp.cacheOrGroupName() +
", exchVer=" + exchangeVer +
", curPart=" + mapString(part) +
", newPart=" + mapString(newPart) + ']');
}
partMap.put(part.nodeId(), part);
}
}
// Check that we have new nodes.
for (GridDhtPartitionMap part : partMap.values()) {
if (fullMapUpdated)
break;
fullMapUpdated = !node2part.containsKey(part.nodeId());
}
GridDhtPartitionsExchangeFuture topFut =
exchFut == null ? ctx.exchange().lastFinishedFuture() : exchFut;
// topFut can be null if lastFinishedFuture has completed with error.
if (topFut != null) {
for (Iterator<UUID> it = partMap.keySet().iterator(); it.hasNext(); ) {
UUID nodeId = it.next();
final ClusterNode node = topFut.events().discoveryCache().node(nodeId);
if (node == null) {
if (log.isTraceEnabled())
log.trace("Removing left node from full map update [grp=" + grp.cacheOrGroupName() +
", exchTopVer=" + exchangeVer + ", futVer=" + topFut.initialVersion() +
", nodeId=" + nodeId + ", partMap=" + partMap + ']');
it.remove();
}
}
}
}
else {
GridDhtPartitionMap locNodeMap = partMap.get(ctx.localNodeId());
if (locNodeMap != null)
updateSeq.setIfGreater(locNodeMap.updateSequence());
}
if (!fullMapUpdated) {
if (log.isTraceEnabled()) {
log.trace("No updates for full partition map (will ignore) [" +
"grp=" + grp.cacheOrGroupName() +
", lastExch=" + lastTopChangeVer +
", exchVer=" + exchangeVer +
", curMap=" + node2part +
", newMap=" + partMap + ']');
}
return false;
}
if (exchangeVer != null) {
assert exchangeVer.compareTo(readyTopVer) >= 0 && exchangeVer.compareTo(lastTopChangeVer) >= 0;
lastTopChangeVer = readyTopVer = exchangeVer;
// Apply lost partitions from full message.
if (lostParts != null) {
this.lostParts = new HashSet<>(lostParts);
for (Integer part : lostParts) {
GridDhtLocalPartition locPart = localPartition(part);
// EVICTED partitions should not be marked directly as LOST, or
// part.clearFuture lifecycle will be broken after resetting.
// New partition should be created instead.
if (locPart != null && locPart.state() != EVICTED) {
locPart.markLost();
GridDhtPartitionMap locMap = partMap.get(ctx.localNodeId());
locMap.put(part, LOST);
}
}
}
}
node2part = partMap;
if (log.isDebugEnabled()) {
log.debug("Partition map after processFullMessage [grp=" + grp.cacheOrGroupName() +
", exchId=" + (exchFut == null ? null : exchFut.exchangeId()) +
", fullMap=" + fullMapString() + ']');
}
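// Outside of an exchange, refresh the diff from affinity for a non-replicated group
// using the freshly received full map.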
if (exchangeVer == null && !grp.isReplicated() &&
(readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0)) {
AffinityAssignment affAssignment = grp.affinity().readyAffinity(readyTopVer);
for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) {
for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) {
int p = e0.getKey();
Set<UUID> diffIds = diffFromAffinity.get(p);
if ((e0.getValue() == MOVING || e0.getValue() == OWNING || e0.getValue() == RENTING) &&
!affAssignment.getIds(p).contains(e.getKey())) {
if (diffIds == null)
diffFromAffinity.put(p, diffIds = U.newHashSet(3));
diffIds.add(e.getKey());
}
else {
if (diffIds != null && diffIds.remove(e.getKey())) {
if (diffIds.isEmpty())
diffFromAffinity.remove(p);
}
}
}
}
diffFromAffinityVer = readyTopVer;
}
boolean changed = false;
GridDhtPartitionMap nodeMap = partMap.get(ctx.localNodeId());
// Only if a real exchange occurred.
if (exchangeVer != null &&
nodeMap != null &&
grp.persistenceEnabled() &&
readyTopVer.initialized()) {
assert exchFut != null;
for (Map.Entry<Integer, GridDhtPartitionState> e : nodeMap.entrySet()) {
int p = e.getKey();
GridDhtPartitionState state = e.getValue();
if (state == OWNING) {
GridDhtLocalPartition locPart = locParts.get(p);
assert locPart != null : grp.cacheOrGroupName();
if (locPart.state() == MOVING) {
boolean success = locPart.own();
assert success : locPart;
changed |= success;
}
}
else if (state == MOVING) {
GridDhtLocalPartition locPart = locParts.get(p);
rebalancePartition(p, partsToReload.contains(p) ||
locPart != null && locPart.state() == MOVING && exchFut.localJoinExchange(), exchFut);
changed = true;
}
}
}
long updateSeq = this.updateSeq.incrementAndGet();
if (readyTopVer.initialized() && readyTopVer.equals(lastTopChangeVer)) {
AffinityAssignment aff = grp.affinity().readyAffinity(readyTopVer);
// Check evictions while preloading.
// Evictions on exchange are checked in exchange worker thread before rebalancing.
if (exchangeVer == null)
changed |= checkEvictions(updateSeq, aff);
updateRebalanceVersion(aff.topologyVersion(), aff.assignment());
}
if (partSizes != null)
this.globalPartSizes = partSizes;
consistencyCheck();
if (log.isDebugEnabled()) {
log.debug("Partition map after full update [grp=" + grp.cacheOrGroupName() +
", map=" + fullMapString() + ']');
}
if (log.isTraceEnabled() && exchangeVer != null) {
log.trace("Partition states after full update [grp=" + grp.cacheOrGroupName()
+ ", exchVer=" + exchangeVer + ", states=" + dumpPartitionStates() + ']');
}
if (changed) {
if (log.isDebugEnabled())
log.debug("Partitions have been scheduled to resend [reason=" +
"Full map update [grp" + grp.cacheOrGroupName() + "]");
ctx.exchange().scheduleResendPartitions();
}
return changed;
} finally {
lock.writeLock().unlock();
}
}
finally {
ctx.database().checkpointReadUnlock();
}
}
/** {@inheritDoc} */
@Override public void collectUpdateCounters(CachePartitionPartialCountersMap cntrMap) {
assert cntrMap != null;
long nowNanos = System.nanoTime();
lock.writeLock().lock();
try {
long acquiredNanos = System.nanoTime();
if (acquiredNanos - nowNanos >= U.millisToNanos(100)) {
if (timeLog.isInfoEnabled())
timeLog.info("Waited too long to acquire topology write lock " +
"[grp=" + grp.cacheOrGroupName() + ", waitTime=" +
U.nanosToMillis(acquiredNanos - nowNanos) + ']');
}
if (stopping)
return;
for (int i = 0; i < cntrMap.size(); i++) {
int pId = cntrMap.partitionAt(i);
long initialUpdateCntr = cntrMap.initialUpdateCounterAt(i);
long updateCntr = cntrMap.updateCounterAt(i);
if (this.cntrMap.updateCounter(pId) < updateCntr) {
this.cntrMap.initialUpdateCounter(pId, initialUpdateCntr);
this.cntrMap.updateCounter(pId, updateCntr);
}
}
}
finally {
lock.writeLock().unlock();
}
}
/** {@inheritDoc} */
@Override public void applyUpdateCounters() {
long nowNanos = System.nanoTime();
lock.writeLock().lock();
try {
long acquiredNanos = System.nanoTime();
if (acquiredNanos - nowNanos >= U.millisToNanos(100)) {
if (timeLog.isInfoEnabled())
timeLog.info("Waited too long to acquire topology write lock " +
"[grp=" + grp.cacheOrGroupName() + ", waitTime=" +
U.nanosToMillis(acquiredNanos - nowNanos) + ']');
}
if (stopping)
return;
for (int i = 0; i < locParts.length(); i++) {
GridDhtLocalPartition part = locParts.get(i);
if (part == null)
continue;
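// Reserve the partition so it is not concurrently evicted or destroyed while the counters are applied.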
boolean reserve = part.reserve();
try {
GridDhtPartitionState state = part.state();
if (!reserve || state == EVICTED || state == RENTING || state == LOST)
continue;
long updCntr = cntrMap.updateCounter(part.id());
long locUpdCntr = part.updateCounter();
if (updCntr != 0 || locUpdCntr != 0) { // Avoid creation of empty partition.
part.updateCounter(updCntr);
if (updCntr > locUpdCntr) {
if (log.isDebugEnabled())
log.debug("Partition update counter has updated [grp=" + grp.cacheOrGroupName() + ", p=" + part.id()
+ ", state=" + part.state() + ", prevCntr=" + locUpdCntr + ", nextCntr=" + updCntr + "]");
}
}
if (locUpdCntr > updCntr) {
cntrMap.initialUpdateCounter(part.id(), part.initialUpdateCounter());
cntrMap.updateCounter(part.id(), locUpdCntr);
}
}
finally {
if (reserve)
part.release();
}
}
}
finally {
lock.writeLock().unlock();
}
}
/**
* Checks whether the new partition map is staler than the current partition map.
* The new partition map is stale if its topology version or update sequence is less than or equal to that of the current map.
*
* @param currentMap Current partition map.
* @param newMap New partition map.
* @return {@code True} if the new partition map is staler than the current partition map, {@code false} otherwise.
*/
private boolean isStaleUpdate(GridDhtPartitionMap currentMap, GridDhtPartitionMap newMap) {
return currentMap != null && newMap.compareTo(currentMap) <= 0;
}
/** {@inheritDoc} */
@Override public boolean update(
@Nullable GridDhtPartitionExchangeId exchId,
GridDhtPartitionMap parts,
boolean force
) {
if (log.isDebugEnabled()) {
log.debug("Updating single partition map [grp=" + grp.cacheOrGroupName() + ", exchId=" + exchId +
", parts=" + mapString(parts) + ']');
}
if (!ctx.discovery().alive(parts.nodeId())) {
if (log.isTraceEnabled()) {
log.trace("Received partition update for non-existing node (will ignore) [grp=" + grp.cacheOrGroupName() +
", exchId=" + exchId + ", parts=" + parts + ']');
}
return false;
}
ctx.database().checkpointReadLock();
try {
lock.writeLock().lock();
try {
if (stopping)
return false;
if (!force) {
if (lastTopChangeVer.initialized() && exchId != null && lastTopChangeVer.compareTo(exchId.topologyVersion()) > 0) {
U.warn(log, "Stale exchange id for single partition map update (will ignore) [" +
"grp=" + grp.cacheOrGroupName() +
", lastTopChange=" + lastTopChangeVer +
", readTopVer=" + readyTopVer +
", exch=" + exchId.topologyVersion() + ']');
return false;
}
}
if (node2part == null)
// Create invalid partition map.
node2part = new GridDhtPartitionFullMap();
GridDhtPartitionMap cur = node2part.get(parts.nodeId());
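// A forced update replaces the current map unconditionally; it inherits the current update
// sequence and topology version, if any.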
if (force) {
if (cur != null && cur.topologyVersion().initialized())
parts.updateSequence(cur.updateSequence(), cur.topologyVersion());
}
else if (isStaleUpdate(cur, parts)) {
assert cur != null;
String msg = "Stale update for single partition map update (will ignore) [" +
"nodeId=" + parts.nodeId() +
", grp=" + grp.cacheOrGroupName() +
", exchId=" + exchId +
", curMap=" + cur +
", newMap=" + parts + ']';
// This is a usual situation when partition maps are equal; just print a debug message.
if (cur.compareTo(parts) == 0) {
if (log.isTraceEnabled())
log.trace(msg);
}
else
U.warn(log, msg);
return false;
}
long updateSeq = this.updateSeq.incrementAndGet();
node2part.newUpdateSequence(updateSeq);
boolean changed = false;
if (cur == null || !cur.equals(parts))
changed = true;
node2part.put(parts.nodeId(), parts);
// During exchange diff is calculated after all messages are received and affinity initialized.
if (exchId == null && !grp.isReplicated()) {
if (readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0) {
AffinityAssignment affAssignment = grp.affinity().readyAffinity(readyTopVer);
// Add new mappings.
for (Map.Entry<Integer, GridDhtPartitionState> e : parts.entrySet()) {
int p = e.getKey();
Set<UUID> diffIds = diffFromAffinity.get(p);
if ((e.getValue() == MOVING || e.getValue() == OWNING || e.getValue() == RENTING)
&& !affAssignment.getIds(p).contains(parts.nodeId())) {
if (diffIds == null)
diffFromAffinity.put(p, diffIds = U.newHashSet(3));
if (diffIds.add(parts.nodeId()))
changed = true;
}
else {
if (diffIds != null && diffIds.remove(parts.nodeId())) {
changed = true;
if (diffIds.isEmpty())
diffFromAffinity.remove(p);
}
}
}
// Remove obsolete mappings.
if (cur != null) {
for (Integer p : F.view(cur.keySet(), F0.notIn(parts.keySet()))) {
Set<UUID> ids = diffFromAffinity.get(p);
if (ids != null && ids.remove(parts.nodeId())) {
changed = true;
if (ids.isEmpty())
diffFromAffinity.remove(p);
}
}
}
diffFromAffinityVer = readyTopVer;
}
}
if (readyTopVer.initialized() && readyTopVer.equals(lastTopChangeVer)) {
AffinityAssignment aff = grp.affinity().readyAffinity(readyTopVer);
if (exchId == null)
changed |= checkEvictions(updateSeq, aff);
updateRebalanceVersion(aff.topologyVersion(), aff.assignment());
}
consistencyCheck();
if (log.isDebugEnabled())
log.debug("Partition map after single update [grp=" + grp.cacheOrGroupName() + ", map=" + fullMapString() + ']');
if (changed && exchId == null) {
if (log.isDebugEnabled())
log.debug("Partitions have been scheduled to resend [reason=" +
"Single map update [grp" + grp.cacheOrGroupName() + "]");
ctx.exchange().scheduleResendPartitions();
}
return changed;
}
finally {
lock.writeLock().unlock();
}
}
finally {
ctx.database().checkpointReadUnlock();
}
}
/** {@inheritDoc} */
@Override public void onExchangeDone(@Nullable GridDhtPartitionsExchangeFuture fut,
AffinityAssignment assignment,
boolean updateRebalanceVer) {
lock.writeLock().lock();
try {
assert !(topReadyFut instanceof GridDhtPartitionsExchangeFuture) ||
assignment.topologyVersion().equals(((GridDhtPartitionsExchangeFuture)topReadyFut).context().events().topologyVersion());
readyTopVer = lastTopChangeVer = assignment.topologyVersion();
if (fut != null)
discoCache = fut.events().discoveryCache();
if (!grp.isReplicated()) {
boolean rebuildDiff = fut == null || fut.localJoinExchange() || fut.serverNodeDiscoveryEvent() ||
fut.firstEvent().type() == EVT_DISCOVERY_CUSTOM_EVT || !diffFromAffinityVer.initialized();
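// The diff from affinity is rebuilt when the topology could have changed in a way that invalidates it
// (local join, server node join/leave, custom discovery event) or when it was never calculated.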
if (rebuildDiff) {
if (assignment.topologyVersion().compareTo(diffFromAffinityVer) >= 0)
rebuildDiff(assignment);
}
else
diffFromAffinityVer = readyTopVer;
if (!updateRebalanceVer)
updateRebalanceVersion(assignment.topologyVersion(), assignment.assignment());
}
if (updateRebalanceVer)
updateRebalanceVersion(assignment.topologyVersion(), assignment.assignment());
// Own orphan moving partitions (having no suppliers).
if (fut != null && (fut.events().hasServerJoin() || fut.changedBaseline()))
ownOrphans();
}
finally {
lock.writeLock().unlock();
}
}
/**
* Check if some local moving partitions have no suitable supplier and own them.
* This can happen if a partition has been created by affinity assignment on a new node and no supplier exists.
* For example, if a node joins topology being a first data node for cache with node filter configured.
*/
private void ownOrphans() {
for (int p = 0; p < grp.affinity().partitions(); p++) {
boolean hasOwner = false;
for (GridDhtPartitionMap map : node2part.values()) {
if (map.get(p) == OWNING) {
hasOwner = true;
break;
}
}
if (!hasOwner) {
GridDhtLocalPartition locPart = localPartition(p);
if (locPart != null && locPart.state() != EVICTED && locPart.state() != LOST) {
locPart.own();
for (GridDhtPartitionMap map : node2part.values()) {
if (map.get(p) != null)
map.put(p, OWNING);
}
}
}
}
}
/**
* @param aff Affinity.
*/
private void createMovingPartitions(AffinityAssignment aff) {
for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
GridDhtPartitionMap map = e.getValue();
addMoving(map, aff.backupPartitions(e.getKey()));
addMoving(map, aff.primaryPartitions(e.getKey()));
}
}
/**
* @param map Node partition state map.
* @param parts Partitions assigned to node.
*/
private void addMoving(GridDhtPartitionMap map, Set<Integer> parts) {
if (F.isEmpty(parts))
return;
for (Integer p : parts) {
GridDhtPartitionState state = map.get(p);
if (state == null || state == EVICTED)
map.put(p, MOVING);
}
}
/**
* Rebuilds {@link #diffFromAffinity} from given assignment.
*
* @param affAssignment New affinity assignment.
*/
private void rebuildDiff(AffinityAssignment affAssignment) {
assert lock.isWriteLockedByCurrentThread();
if (node2part == null)
return;
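// Fast rebuild only prunes stale node ids from the existing diff entries; the full rebuild
// recomputes the diff from scratch by scanning node2part.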
if (FAST_DIFF_REBUILD) {
Collection<UUID> affNodes = F.nodeIds(ctx.discovery().cacheGroupAffinityNodes(grp.groupId(),
affAssignment.topologyVersion()));
for (Map.Entry<Integer, Set<UUID>> e : diffFromAffinity.entrySet()) {
int p = e.getKey();
Iterator<UUID> iter = e.getValue().iterator();
while (iter.hasNext()) {
UUID nodeId = iter.next();
if (!affNodes.contains(nodeId) || affAssignment.getIds(p).contains(nodeId))
iter.remove();
}
}
}
else {
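// Full rebuild: recompute the diff for every node and partition from the node2part map.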
for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
UUID nodeId = e.getKey();
for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) {
Integer p0 = e0.getKey();
GridDhtPartitionState state = e0.getValue();
Set<UUID> ids = diffFromAffinity.get(p0);
if ((state == MOVING || state == OWNING || state == RENTING) && !affAssignment.getIds(p0).contains(nodeId)) {
if (ids == null)
diffFromAffinity.put(p0, ids = U.newHashSet(3));
ids.add(nodeId);
}
else {
if (ids != null)
ids.remove(nodeId);
}
}
}
}
diffFromAffinityVer = affAssignment.topologyVersion();
}
/** {@inheritDoc} */
@Override public boolean detectLostPartitions(AffinityTopologyVersion resTopVer, GridDhtPartitionsExchangeFuture fut) {
ctx.database().checkpointReadLock();
try {
lock.writeLock().lock();
try {
if (node2part == null)
return false;
// Do not trigger lost partition events on activation.
DiscoveryEvent discoEvt = fut.activateCluster() ? null : fut.firstEvent();
final GridClusterStateProcessor state = grp.shared().kernalContext().state();
boolean isInMemoryCluster = CU.isInMemoryCluster(
grp.shared().kernalContext().discovery().allNodes(),
grp.shared().kernalContext().marshallerContext().jdkMarshaller(),
U.resolveClassLoader(grp.shared().kernalContext().config())
);
boolean compatibleWithIgnorePlc = isInMemoryCluster
&& state.isBaselineAutoAdjustEnabled() && state.baselineAutoAdjustTimeout() == 0L;
// Calculate how data loss is handled.
boolean safe = grp.config().getPartitionLossPolicy() != IGNORE || !compatibleWithIgnorePlc;
int parts = grp.affinity().partitions();
Set<Integer> recentlyLost = null;
boolean changed = false;
for (int part = 0; part < parts; part++) {
boolean lost = F.contains(lostParts, part);
if (!lost) {
boolean hasOwner = false;
// Detect whether all owners have left.
for (GridDhtPartitionMap partMap : node2part.values()) {
if (partMap.get(part) == OWNING) {
hasOwner = true;
break;
}
}
if (!hasOwner) {
lost = true;
// Do not detect or record lost partitions in IGNORE mode.
if (safe) {
if (lostParts == null)
lostParts = new TreeSet<>();
lostParts.add(part);
if (discoEvt != null) {
if (recentlyLost == null)
recentlyLost = new HashSet<>();
recentlyLost.add(part);
if (grp.eventRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
grp.addRebalanceEvent(part,
EVT_CACHE_REBALANCE_PART_DATA_LOST,
discoEvt.eventNode(),
discoEvt.type(),
discoEvt.timestamp());
}
}
}
}
}
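// Adjust local state and remote maps for the lost partition according to the loss policy.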
if (lost) {
GridDhtLocalPartition locPart = localPartition(part, resTopVer, false, true);
if (locPart != null) {
if (locPart.state() == LOST)
continue;
final GridDhtPartitionState prevState = locPart.state();
changed = safe ? locPart.markLost() : locPart.own();
if (changed) {
long updSeq = updateSeq.incrementAndGet();
updateLocal(locPart.id(), locPart.state(), updSeq, resTopVer);
// If a partition was lost while rebalancing, reset its counter to force demander mode.
if (prevState == MOVING)
locPart.resetUpdateCounter();
}
}
// Update remote maps according to policy.
for (Map.Entry<UUID, GridDhtPartitionMap> entry : node2part.entrySet()) {
if (entry.getKey().equals(ctx.localNodeId()))
continue;
GridDhtPartitionState p0 = entry.getValue().get(part);
if (p0 != null && p0 != EVICTED)
entry.getValue().put(part, safe ? LOST : OWNING);
}
}
}
if (recentlyLost != null) {
U.warn(log, "Detected lost partitions" + (!safe ? " (will ignore)" : "")
+ " [grp=" + grp.cacheOrGroupName()
+ ", parts=" + S.compact(recentlyLost)
+ ", topVer=" + resTopVer + "]");
}
return changed;
}
finally {
lock.writeLock().unlock();
}
}
finally {
ctx.database().checkpointReadUnlock();
}
}
/** {@inheritDoc} */
@Override public void resetLostPartitions(AffinityTopologyVersion resTopVer) {
ctx.database().checkpointReadLock();
try {
lock.writeLock().lock();
try {
long updSeq = updateSeq.incrementAndGet();
for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) {
if (e0.getValue() != LOST)
continue;
e0.setValue(OWNING);
GridDhtLocalPartition locPart = localPartition(e0.getKey(), resTopVer, false);
if (locPart != null && locPart.state() == LOST) {
boolean marked = locPart.own();
if (marked) {
updateLocal(locPart.id(), locPart.state(), updSeq, resTopVer);
// Reset counters to zero to trigger full rebalance.
locPart.resetInitialUpdateCounter();
}
}
}
}
lostParts = null;
}
finally {
lock.writeLock().unlock();
}
}
finally {
ctx.database().checkpointReadUnlock();
}
}
/** {@inheritDoc} */
@Override public Set<Integer> lostPartitions() {
lock.readLock().lock();
try {
return lostParts == null ? Collections.<Integer>emptySet() : new HashSet<>(lostParts);
}
finally {
lock.readLock().unlock();
}
}
/** {@inheritDoc} */
@Override public Map<UUID, Set<Integer>> resetOwners(
Map<Integer, Set<UUID>> ownersByUpdCounters,
Set<Integer> haveHist,
GridDhtPartitionsExchangeFuture exchFut) {
Map<UUID, Set<Integer>> res = new HashMap<>();
Collection<DiscoveryEvent> evts = exchFut.events().events();
Set<UUID> joinedNodes = U.newHashSet(evts.size());
for (DiscoveryEvent evt : evts) {
if (evt.type() == EVT_NODE_JOINED)
joinedNodes.add(evt.eventNode().id());
}
ctx.database().checkpointReadLock();
try {
Map<UUID, Set<Integer>> addToWaitGroups = new HashMap<>();
lock.writeLock().lock();
try {
// First process local partitions.
UUID locNodeId = ctx.localNodeId();
for (Map.Entry<Integer, Set<UUID>> entry : ownersByUpdCounters.entrySet()) {
int part = entry.getKey();
Set<UUID> maxCounterPartOwners = entry.getValue();
GridDhtLocalPartition locPart = localPartition(part);
if (locPart == null || locPart.state() != OWNING)
continue;
// Partition state should be mutated only on joining nodes if they exist for the exchange.
if (joinedNodes.isEmpty() && !maxCounterPartOwners.contains(locNodeId)) {
rebalancePartition(part, !haveHist.contains(part), exchFut);
res.computeIfAbsent(locNodeId, n -> new HashSet<>()).add(part);
}
}
// Then process node maps.
for (Map.Entry<Integer, Set<UUID>> entry : ownersByUpdCounters.entrySet()) {
int part = entry.getKey();
Set<UUID> maxCounterPartOwners = entry.getValue();
for (Map.Entry<UUID, GridDhtPartitionMap> remotes : node2part.entrySet()) {
UUID remoteNodeId = remotes.getKey();
if (!joinedNodes.isEmpty() && !joinedNodes.contains(remoteNodeId))
continue;
GridDhtPartitionMap partMap = remotes.getValue();
GridDhtPartitionState state = partMap.get(part);
if (state != OWNING)
continue;
if (!maxCounterPartOwners.contains(remoteNodeId)) {
partMap.put(part, MOVING);
partMap.updateSequence(partMap.updateSequence() + 1, partMap.topologyVersion());
if (partMap.nodeId().equals(locNodeId))
updateSeq.setIfGreater(partMap.updateSequence());
res.computeIfAbsent(remoteNodeId, n -> new HashSet<>()).add(part);
}
}
}
for (Map.Entry<UUID, Set<Integer>> entry : res.entrySet()) {
UUID nodeId = entry.getKey();
Set<Integer> rebalancedParts = entry.getValue();
addToWaitGroups.put(nodeId, new HashSet<>(rebalancedParts));
if (!rebalancedParts.isEmpty()) {
Set<Integer> historical = rebalancedParts.stream()
.filter(haveHist::contains)
.collect(Collectors.toSet());
// Filter out partitions having WAL history.
rebalancedParts.removeAll(historical);
U.warn(log, "Partitions have been scheduled for rebalancing due to outdated update counter "
+ "[grp=" + grp.cacheOrGroupName()
+ ", readyTopVer=" + readyTopVer
+ ", topVer=" + exchFut.initialVersion()
+ ", nodeId=" + nodeId
+ ", partsFull=" + S.compact(rebalancedParts)
+ ", partsHistorical=" + S.compact(historical) + "]");
}
}
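// Re-create the full map with a new update sequence to reflect the state changes.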
node2part = new GridDhtPartitionFullMap(node2part, updateSeq.incrementAndGet());
}
finally {
lock.writeLock().unlock();
}
List<List<ClusterNode>> ideal = ctx.affinity().affinity(groupId()).idealAssignmentRaw();
for (Map.Entry<UUID, Set<Integer>> entry : addToWaitGroups.entrySet()) {
// Add to wait groups to ensure late assignment switch after all partitions are rebalanced.
for (Integer part : entry.getValue()) {
ctx.cache().context().affinity().addToWaitGroup(
groupId(),
part,
topologyVersionFuture().initialVersion(),
ideal.get(part)
);
}
}
}
finally {
ctx.database().checkpointReadUnlock();
}
return res;
}
/**
* Prepares given partition {@code p} for rebalance.
* Changes partition state to MOVING and starts clearing if needed.
* Prevents ongoing renting if required.
*
* @param p Partition id.
* @param clear If {@code true}, the partition has to be cleared before rebalance (full rebalance or rebalance
* restart after cancellation).
* @param exchFut Future related to partition state change.
* @return Local partition prepared for rebalancing.
*/
private GridDhtLocalPartition rebalancePartition(int p, boolean clear, GridDhtPartitionsExchangeFuture exchFut) {
GridDhtLocalPartition part = getOrCreatePartition(p);
if (part.state() == LOST)
return part;
// Prevent renting.
if (part.state() == RENTING) {
if (!part.moving()) {
assert part.state() == EVICTED : part;
part = getOrCreatePartition(p);
if (part.state() == LOST)
return part;
}
}
if (part.state() != MOVING)
part.moving();
else
part.updateClearVersion();
if (clear)
exchFut.addClearingPartition(grp, part.id());
assert part.state() == MOVING : part;
return part;
}
/**
* Finds local partitions which don't belong to affinity and runs eviction process for such partitions.
*
* @param updateSeq Update sequence.
* @param aff Affinity assignments.
* @return {@code True} if there are local partitions that need to be evicted.
*/
private boolean checkEvictions(long updateSeq, AffinityAssignment aff) {
assert lock.isWriteLockedByCurrentThread();
if (!ctx.kernalContext().state().evictionsAllowed())
return false;
boolean hasEvictedPartitions = false;
UUID locId = ctx.localNodeId();
for (int p = 0; p < locParts.length(); p++) {
GridDhtLocalPartition part = locParts.get(p);
if (part == null || !part.state().active())
continue;
List<ClusterNode> affNodes = aff.get(p);
// This node is an affinity node for the partition, no need to run eviction.
if (affNodes.contains(ctx.localNode()))
continue;
List<ClusterNode> nodes = nodes(p, aff.topologyVersion(), OWNING);
Collection<UUID> nodeIds = F.nodeIds(nodes);
// If all affinity nodes are owners, then evict partition from local node.
if (nodeIds.containsAll(F.nodeIds(affNodes))) {
GridDhtPartitionState state0 = part.state();
part.rent();
updateSeq = updateLocal(part.id(), part.state(), updateSeq, aff.topologyVersion());
boolean stateChanged = state0 != part.state();
hasEvictedPartitions |= stateChanged;
if (stateChanged && log.isDebugEnabled()) {
log.debug("Partition has been scheduled for eviction (all affinity nodes are owners) " +
"[grp=" + grp.cacheOrGroupName() + ", p=" + part.id() + ", prevState=" + state0 + ", state=" + part.state() + "]");
}
}
else {
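// More owners than affinity nodes: evict the local copy if this node is among the oldest non-affinity owners.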
int ownerCnt = nodeIds.size();
int affCnt = affNodes.size();
if (ownerCnt > affCnt) {
// Sort by node orders in ascending order.
Collections.sort(nodes, CU.nodeComparator(true));
int diff = nodes.size() - affCnt;
for (int i = 0; i < diff; i++) {
ClusterNode n = nodes.get(i);
if (locId.equals(n.id())) {
GridDhtPartitionState state0 = part.state();
part.rent();
updateSeq = updateLocal(part.id(), part.state(), updateSeq, aff.topologyVersion());
boolean stateChanged = state0 != part.state();
hasEvictedPartitions |= stateChanged;
if (stateChanged && log.isDebugEnabled()) {
log.debug("Partition has been scheduled for eviction (this node is oldest non-affinity node) " +
"[grp=" + grp.cacheOrGroupName() + ", p=" + part.id() + ", prevState=" + state0 +
", state=" + part.state() + "]");
}
break;
}
}
}
}
}
return hasEvictedPartitions;
}
/**
* Updates state of partition in local {@link #node2part} map and recalculates {@link #diffFromAffinity}.
*
* @param p Partition.
* @param state Partition state.
* @param updateSeq Update sequence.
* @param affVer Affinity version.
* @return Update sequence.
*/
private long updateLocal(int p, GridDhtPartitionState state, long updateSeq, AffinityTopologyVersion affVer) {
assert lock.isWriteLockedByCurrentThread();
ClusterNode oldest = discoCache.oldestAliveServerNode();
assert oldest != null || ctx.kernalContext().clientNode();
// If this node became the oldest node.
if (ctx.localNode().equals(oldest) && node2part != null) {
long seq = node2part.updateSequence();
if (seq != updateSeq) {
if (seq > updateSeq) {
long seq0 = this.updateSeq.get();
if (seq0 < seq) {
// Update global counter if necessary.
boolean b = this.updateSeq.compareAndSet(seq0, seq + 1);
assert b : "Invalid update sequence [updateSeq=" + updateSeq +
", grp=" + grp.cacheOrGroupName() +
", seq=" + seq +
", curUpdateSeq=" + this.updateSeq.get() +
", node2part=" + node2part.toFullString() + ']';
updateSeq = seq + 1;
}
else
updateSeq = seq;
}
node2part.updateSequence(updateSeq);
}
}
if (node2part != null) {
UUID locNodeId = ctx.localNodeId();
GridDhtPartitionMap map = node2part.get(locNodeId);
if (map == null) {
map = new GridDhtPartitionMap(locNodeId,
updateSeq,
affVer,
GridPartitionStateMap.EMPTY,
false);
node2part.put(locNodeId, map);
}
else
map.updateSequence(updateSeq, affVer);
map.put(p, state);
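// For non-replicated groups, record local states that diverge from the affinity assignment.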
if (!grp.isReplicated() && (state == MOVING || state == OWNING || state == RENTING)) {
AffinityAssignment assignment = grp.affinity().cachedAffinity(diffFromAffinityVer);
if (!assignment.getIds(p).contains(ctx.localNodeId())) {
Set<UUID> diffIds = diffFromAffinity.get(p);
if (diffIds == null)
diffFromAffinity.put(p, diffIds = U.newHashSet(3));
diffIds.add(ctx.localNodeId());
}
}
}
return updateSeq;
}
/**
* Removes node from local {@link #node2part} map and recalculates {@link #diffFromAffinity}.
*
* @param nodeId Node to remove.
*/
private void removeNode(UUID nodeId) {
assert nodeId != null;
assert lock.isWriteLockedByCurrentThread();
ClusterNode oldest = discoCache.oldestAliveServerNode();
assert oldest != null || ctx.kernalContext().clientNode();
ClusterNode loc = ctx.localNode();
if (node2part != null) {
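// If this node has become the oldest node, re-create the full map on its behalf; otherwise make a fresh copy before removing the node.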
if (loc.equals(oldest) && !node2part.nodeId().equals(loc.id()))
node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq.get(),
node2part, false);
else
node2part = new GridDhtPartitionFullMap(node2part, node2part.updateSequence());
GridDhtPartitionMap parts = node2part.remove(nodeId);
if (!grp.isReplicated()) {
if (parts != null) {
for (Integer p : parts.keySet()) {
Set<UUID> diffIds = diffFromAffinity.get(p);
if (diffIds != null)
diffIds.remove(nodeId);
}
}
}
consistencyCheck();
}
}
/** {@inheritDoc} */
@Override public boolean own(GridDhtLocalPartition part) {
lock.writeLock().lock();
try {
if (part.own()) {
assert lastTopChangeVer.initialized() : lastTopChangeVer;
long updSeq = updateSeq.incrementAndGet();
updateLocal(part.id(), part.state(), updSeq, lastTopChangeVer);
consistencyCheck();
return true;
}
consistencyCheck();
return false;
}
finally {
lock.writeLock().unlock();
}
}
/** {@inheritDoc} */
@Override public void ownMoving() {
ctx.database().checkpointReadLock();
try {
lock.writeLock().lock();
try {
for (GridDhtLocalPartition locPart : currentLocalPartitions()) {
if (locPart.state() == MOVING) {
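// Reserve the partition so it cannot be concurrently evicted while it is being owned.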
boolean reserved = locPart.reserve();
try {
if (reserved && locPart.state() == MOVING)
own(locPart);
} finally {
if (reserved)
locPart.release();
}
}
}
} finally {
lock.writeLock().unlock();
}
} finally {
ctx.database().checkpointReadUnlock();
}
}
/** {@inheritDoc} */
@Override public boolean tryFinishEviction(GridDhtLocalPartition part) {
ctx.database().checkpointReadLock();
try {
// Write lock protects from concurrent partition creation.
lock.writeLock().lock();
try {
if (stopping)
return false;
part.finishEviction();
if (part.state() != EVICTED)
return false;
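// Partition is fully evicted: record the new state in the local map and destroy its data store.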
long seq = this.updateSeq.incrementAndGet();
assert lastTopChangeVer.initialized() : lastTopChangeVer;
updateLocal(part.id(), part.state(), seq, lastTopChangeVer);
consistencyCheck();
grp.onPartitionEvicted(part.id());
try {
grp.offheap().destroyCacheDataStore(part.dataStore());
}
catch (IgniteCheckedException e) {
log.error("Unable to destroy cache data store on partition eviction [id=" + part.id() + "]", e);
}
part.clearDeferredDeletes();
List<GridDhtLocalPartition> parts = localPartitions();
boolean renting = false;
for (GridDhtLocalPartition part0 : parts) {
if (part0.state() == RENTING) {
renting = true;
break;
}
}
// Refresh partitions when everything is evicted and the local map is updated.
if (!renting)
ctx.exchange().scheduleResendPartitions();
return true;
}
finally {
lock.writeLock().unlock();
}
}
finally {
ctx.database().checkpointReadUnlock();
}
}
/** {@inheritDoc} */
@Nullable @Override public GridDhtPartitionMap partitions(UUID nodeId) {
lock.readLock().lock();
try {
return node2part.get(nodeId);
}
finally {
lock.readLock().unlock();
}
}
/** {@inheritDoc} */
@Override public void finalizeUpdateCounters(Set<Integer> parts) {
// The checkpoint read lock must be acquired before the topology lock.
ctx.database().checkpointReadLock();
try {
WALPointer ptr = null;
lock.readLock().lock();
try {
for (int p : parts) {
GridDhtLocalPartition part = locParts.get(p);
if (part != null && part.state().active()) {
// We need to close all gaps in the partition update counter sequence. We assume this finalization
// happens on exchange and hence all txs are completed. Therefore each gap in the update counter
// sequence is a result of an undelivered DhtTxFinishMessage on a backup (sequences on primary nodes
// do not have gaps). Here we close these gaps and asynchronously notify the continuous query engine
// about the skipped events.
AffinityTopologyVersion topVer = ctx.exchange().readyAffinityVersion();
GridLongList gaps = part.finalizeUpdateCounters();
if (gaps != null) {
for (int j = 0; j < gaps.size() / 2; j++) {
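// Gaps are stored as pairs of (start, stop) counter values.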
long gapStart = gaps.get(j * 2);
long gapStop = gaps.get(j * 2 + 1);
if (part.group().persistenceEnabled() &&
part.group().walEnabled() &&
!part.group().mvccEnabled()) {
// A rollback record tracks applied out-of-order updates, while finalizeUpdateCounters
// returns gaps (missing updates). The code below transforms gaps into updates.
RollbackRecord rec = new RollbackRecord(part.group().groupId(), part.id(),
gapStart - 1, gapStop - gapStart + 1);
try {
ptr = ctx.wal().log(rec);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
}
}
for (GridCacheContext ctx0 : grp.caches())
ctx0.continuousQueries().closeBackupUpdateCountersGaps(ctx0, part.id(), topVer, gaps);
}
}
}
}
finally {
try {
if (ptr != null)
ctx.wal().flush(ptr, false);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
finally {
lock.readLock().unlock();
}
}
}
finally {
ctx.database().checkpointReadUnlock();
}
}
/** {@inheritDoc} */
@Override public CachePartitionFullCountersMap fullUpdateCounters() {
lock.readLock().lock();
try {
return new CachePartitionFullCountersMap(cntrMap);
}
finally {
lock.readLock().unlock();
}
}
/** {@inheritDoc} */
@Override public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros) {
lock.readLock().lock();
try {
int locPartCnt = 0;
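// Count local partitions first to size the counters map.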
for (int i = 0; i < locParts.length(); i++) {
GridDhtLocalPartition part = locParts.get(i);
if (part != null)
locPartCnt++;
}
CachePartitionPartialCountersMap res = new CachePartitionPartialCountersMap(locPartCnt);
for (int i = 0; i < locParts.length(); i++) {
GridDhtLocalPartition part = locParts.get(i);
if (part == null)
continue;
long initCntr = part.initialUpdateCounter();
long updCntr = part.updateCounter();
if (skipZeros && initCntr == 0L && updCntr == 0L)
continue;
res.add(part.id(), initCntr, updCntr);
}
res.trim();
return res;
}
finally {
lock.readLock().unlock();
}
}
/** {@inheritDoc} */
@Override public Map<Integer, Long> partitionSizes() {
lock.readLock().lock();
try {
Map<Integer, Long> partitionSizes = new HashMap<>();
for (int p = 0; p < locParts.length(); p++) {
GridDhtLocalPartition part = locParts.get(p);
if (part == null || part.fullSize() == 0)
continue;
partitionSizes.put(part.id(), part.fullSize());
}
return partitionSizes;
}
finally {
lock.readLock().unlock();
}
}
/** {@inheritDoc} */
@Override public Map<Integer, Long> globalPartSizes() {
lock.readLock().lock();
try {
if (globalPartSizes == null)
return Collections.emptyMap();
return Collections.unmodifiableMap(globalPartSizes);
}
finally {
lock.readLock().unlock();
}
}
/** {@inheritDoc} */
@Override public void globalPartSizes(@Nullable Map<Integer, Long> partSizes) {
lock.writeLock().lock();
try {
this.globalPartSizes = partSizes;
}
finally {
lock.writeLock().unlock();
}
}
/** {@inheritDoc} */
@Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) {
AffinityTopologyVersion curTopVer = this.readyTopVer;
return curTopVer.equals(topVer) && curTopVer.equals(rebalancedTopVer);
}
/** {@inheritDoc} */
@Override public boolean hasMovingPartitions() {
lock.readLock().lock();
try {
if (node2part == null)
return false;
assert node2part.valid() : "Invalid node2part [node2part: " + node2part +
", grp=" + grp.cacheOrGroupName() +
", stopping=" + stopping +
", locNodeId=" + ctx.localNodeId() +
", locName=" + ctx.igniteInstanceName() + ']';
for (GridDhtPartitionMap map : node2part.values()) {
if (map.hasMovingPartitions())
return true;
}
return false;
}
finally {
lock.readLock().unlock();
}
}
/**
* @param cacheId Cache ID.
*/
public void onCacheStopped(int cacheId) {
if (!grp.sharedGroup())
return;
for (int i = 0; i < locParts.length(); i++) {
GridDhtLocalPartition part = locParts.get(i);
if (part != null)
part.onCacheStopped(cacheId);
}
}
/** {@inheritDoc} */
@Override public void printMemoryStats(int threshold) {
X.println(">>> Cache partition topology stats [igniteInstanceName=" + ctx.igniteInstanceName() +
", grp=" + grp.cacheOrGroupName() + ']');
lock.readLock().lock();
try {
for (int i = 0; i < locParts.length(); i++) {
GridDhtLocalPartition part = locParts.get(i);
if (part == null)
continue;
long size = part.dataStore().fullSize();
if (size >= threshold)
X.println(">>> Local partition [part=" + part.id() + ", size=" + size + ']');
}
}
finally {
lock.readLock().unlock();
}
}
/**
* @param part Partition.
* @param aff Affinity assignments.
* @return {@code True} if given partition belongs to local node.
*/
private boolean localNode(int part, List<List<ClusterNode>> aff) {
return aff.get(part).contains(ctx.localNode());
}
/**
* @param affVer Affinity version.
* @param aff Affinity assignments.
*/
private void updateRebalanceVersion(AffinityTopologyVersion affVer, List<List<ClusterNode>> aff) {
if (!grp.isReplicated() && !affVer.equals(diffFromAffinityVer))
return;
if (!rebalancedTopVer.equals(readyTopVer)) {
if (node2part == null || !node2part.valid())
return;
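// Rebalancing is finished only when every affinity node owns every partition and no stale owners remain outside the affinity assignment.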
for (int i = 0; i < grp.affinity().partitions(); i++) {
List<ClusterNode> affNodes = aff.get(i);
// Topology doesn't contain server nodes (just clients).
if (affNodes.isEmpty())
continue;
for (ClusterNode node : affNodes) {
if (!hasState(i, node.id(), OWNING))
return;
}
if (!grp.isReplicated()) {
Set<UUID> diff = diffFromAffinity.get(i);
if (diff != null) {
for (UUID nodeId : diff) {
if (hasState(i, nodeId, OWNING)) {
ClusterNode node = ctx.discovery().node(nodeId);
if (node != null && !affNodes.contains(node))
return;
}
}
}
}
}
rebalancedTopVer = readyTopVer;
if (log.isDebugEnabled())
log.debug("Updated rebalanced version [grp=" + grp.cacheOrGroupName() + ", ver=" + rebalancedTopVer + ']');
}
}
/**
* Checks that the state of partition {@code p} for node {@code nodeId} in the local {@code node2part} map
* matches one of the specified states {@code match} or {@code matches}.
*
* @param p Partition id.
* @param nodeId Node ID.
* @param match State to match.
* @param matches Additional states.
* @return {@code True} if the partition state matches one of the specified states.
*/
private boolean hasState(final int p, @Nullable UUID nodeId, final GridDhtPartitionState match,
final GridDhtPartitionState... matches) {
if (nodeId == null)
return false;
GridDhtPartitionMap parts = node2part.get(nodeId);
// Map can be null if the node has been removed.
if (parts != null) {
GridDhtPartitionState state = parts.get(p);
if (state == match)
return true;
if (matches != null && matches.length > 0) {
for (GridDhtPartitionState s : matches) {
if (state == s)
return true;
}
}
}
return false;
}
/** {@inheritDoc} */
@Override public boolean rent(int p) {
ctx.database().checkpointReadLock();
try {
lock.writeLock().lock();
try {
// Do not rent if a PME is in progress; the partition will be rented later if applicable.
if (lastTopChangeVer.after(readyTopVer))
return false;
GridDhtLocalPartition locPart = localPartition(p);
if (locPart == null)
return false;
GridDhtPartitionState state0 = locPart.state();
if (state0 == RENTING || state0 == EVICTED || partitionLocalNode(p, readyTopVer))
return false;
locPart.rent();
GridDhtPartitionState state = locPart.state();
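// Update the local map and schedule a resend only if the partition has actually switched to RENTING.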
if (state == RENTING && state != state0) {
long updateSeq = this.updateSeq.incrementAndGet();
updateLocal(p, state, updateSeq, readyTopVer);
ctx.exchange().scheduleResendPartitions();
}
return true;
}
finally {
lock.writeLock().unlock();
}
}
finally {
ctx.database().checkpointReadUnlock();
}
}
/**
* Checks consistency after all operations.
*/
private void consistencyCheck() {
// No-op.
}
/**
* Collects states of local partitions.
*
* @return String representation of all local partition states.
*/
private String dumpPartitionStates() {
SB sb = new SB();
for (int p = 0; p < locParts.length(); p++) {
GridDhtLocalPartition part = locParts.get(p);
if (part == null)
continue;
sb.a("Part [");
sb.a("id=" + part.id() + ", ");
sb.a("state=" + part.state() + ", ");
sb.a("initCounter=" + part.initialUpdateCounter() + ", ");
sb.a("updCounter=" + part.updateCounter() + ", ");
sb.a("size=" + part.fullSize() + "] ");
}
return sb.toString();
}
/**
* Iterator over current local partitions.
*/
private class CurrentPartitionsIterator implements Iterator<GridDhtLocalPartition> {
/** Next index. */
private int nextIdx;
/** Next partition. */
private GridDhtLocalPartition nextPart;
/**
* Constructor
*/
private CurrentPartitionsIterator() {
advance();
}
/**
* Tries to advance to the next active partition.
*/
private void advance() {
while (nextIdx < locParts.length()) {
GridDhtLocalPartition part = locParts.get(nextIdx);
if (part != null && part.state().active()) {
nextPart = part;
return;
}
nextIdx++;
}
}
/** {@inheritDoc} */
@Override public boolean hasNext() {
return nextPart != null;
}
/** {@inheritDoc} */
@Override public GridDhtLocalPartition next() {
if (nextPart == null)
throw new NoSuchElementException();
GridDhtLocalPartition retVal = nextPart;
nextPart = null;
nextIdx++;
advance();
return retVal;
}
/** {@inheritDoc} */
@Override public void remove() {
throw new UnsupportedOperationException("remove");
}
}
/**
* Partition factory used for (re-)creating partitions during their lifecycle.
* Currently used in tests for overriding default partition behavior.
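*
* <p>For example, a test might install a delegating factory (hypothetical sketch; the
* {@code partitionFactory(PartitionFactory)} setter on the topology implementation is assumed):
* <pre>{@code
* topology.partitionFactory((ctx, grp, id, recovery) ->
*     new GridDhtLocalPartition(ctx, grp, id, recovery));
* }</pre>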
*/
public interface PartitionFactory {
/**
* @param ctx Context.
* @param grp Group.
* @param id Partition id.
* @param recovery Recovery mode.
* @return New partition instance.
*/
public GridDhtLocalPartition create(
GridCacheSharedContext ctx,
CacheGroupContext grp,
int id,
boolean recovery
);
}
}