/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed.dht.topology;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.PartitionLossPolicy;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture;
import org.apache.ignite.internal.processors.cluster.GridClusterStateProcessor;
import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.GridAtomicLong;
import org.apache.ignite.internal.util.GridPartitionStateMap;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.EVICTED;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
/**
* Partition topology for a node which does not have any local partitions.
*/
@GridToStringExclude
public class GridClientPartitionTopology implements GridDhtPartitionTopology {
/** */
private static final GridDhtPartitionState[] MOVING_STATES = new GridDhtPartitionState[] {MOVING};
/** If true, then check consistency. */
private static final boolean CONSISTENCY_CHECK = false;
/** Flag to control amount of output for full map. */
private static final boolean FULL_MAP_DEBUG = false;
/** Cache shared context. */
private GridCacheSharedContext cctx;
/** Cache group ID. */
private int grpId;
/** Logger. */
private final IgniteLogger log;
/** Node to partition map. */
private GridDhtPartitionFullMap node2part;
/** Partition to node map. */
private final Map<Integer, Set<UUID>> part2node = new HashMap<>();
/** */
private AffinityTopologyVersion lastExchangeVer;
/** */
private AffinityTopologyVersion topVer = AffinityTopologyVersion.NONE;
/** */
private volatile boolean stopping;
/** A future that will be completed when topology with version {@code topVer} is ready to use. */
private volatile GridDhtTopologyFuture topReadyFut;
/** */
private final GridAtomicLong updateSeq = new GridAtomicLong(1);
/** Lock. */
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
/** Partition update counters. */
private CachePartitionFullCountersMap cntrMap;
/** */
private final Object similarAffKey;
/** */
private volatile DiscoCache discoCache;
/** */
private final int parts;
/** */
private volatile Map<Integer, Long> globalPartSizes;
/** */
private Set<Integer> lostParts;
/** */
private PartitionLossPolicy partLossPlc;
/**
* @param cctx Context.
* @param discoCache Discovery data cache.
* @param grpId Group ID.
* @param parts Number of partitions in the group.
* @param similarAffKey Key to find caches with similar affinity.
* @param partLossPlc Loss policy.
*/
public GridClientPartitionTopology(
GridCacheSharedContext<?, ?> cctx,
DiscoCache discoCache,
int grpId,
int parts,
Object similarAffKey,
PartitionLossPolicy partLossPlc
) {
this.cctx = cctx;
this.discoCache = discoCache;
this.grpId = grpId;
this.similarAffKey = similarAffKey;
this.parts = parts;
topVer = AffinityTopologyVersion.NONE;
log = cctx.logger(getClass());
node2part = new GridDhtPartitionFullMap(cctx.localNode().id(),
cctx.localNode().order(),
updateSeq.get());
cntrMap = new CachePartitionFullCountersMap(parts);
this.partLossPlc = partLossPlc;
}
/** {@inheritDoc} */
@Override public int partitions() {
return parts;
}
/**
* @return Key to find caches with similar affinity.
*/
@Nullable public Object similarAffinityKey() {
return similarAffKey;
}
/**
* @return Full map string representation.
*/
private String fullMapString() {
return node2part == null ? "null" : FULL_MAP_DEBUG ? node2part.toFullString() : node2part.toString();
}
/**
* @param map Map to get string for.
* @return Full map string representation.
*/
private String mapString(GridDhtPartitionMap map) {
return map == null ? "null" : FULL_MAP_DEBUG ? map.toFullString() : map.toString();
}
/** {@inheritDoc} */
@Override public int groupId() {
return grpId;
}
/** {@inheritDoc} */
@SuppressWarnings({"LockAcquiredButNotSafelyReleased"})
@Override public void readLock() {
lock.readLock().lock();
}
/** {@inheritDoc} */
@Override public void readUnlock() {
lock.readLock().unlock();
}
/** {@inheritDoc} */
@Override public boolean holdsLock() {
return lock.isWriteLockedByCurrentThread() || lock.getReadHoldCount() > 0;
}
/** {@inheritDoc} */
@Override public void updateTopologyVersion(
GridDhtTopologyFuture exchFut,
DiscoCache discoCache,
long updSeq,
boolean stopping
) throws IgniteInterruptedCheckedException {
U.writeLock(lock);
try {
AffinityTopologyVersion exchTopVer = exchFut.initialVersion();
assert exchTopVer.compareTo(topVer) > 0 : "Invalid topology version [grp=" + grpId +
", topVer=" + topVer +
", exchVer=" + exchTopVer +
", discoCacheVer=" + (this.discoCache != null ? this.discoCache.version() : "None") +
", exchDiscoCacheVer=" + discoCache.version() + ']';
this.stopping = stopping;
topVer = exchTopVer;
this.discoCache = discoCache;
updateSeq.setIfGreater(updSeq);
topReadyFut = exchFut;
}
finally {
lock.writeLock().unlock();
}
}
/** {@inheritDoc} */
@Override public boolean initialized() {
lock.readLock().lock();
try {
assert topVer != null;
return !AffinityTopologyVersion.NONE.equals(topVer);
}
finally {
lock.readLock().unlock();
}
}
/** {@inheritDoc} */
@Override public AffinityTopologyVersion readyTopologyVersion() {
lock.readLock().lock();
try {
assert topVer.topologyVersion() > 0;
return topVer;
}
finally {
lock.readLock().unlock();
}
}
/** {@inheritDoc} */
@Override public AffinityTopologyVersion lastTopologyChangeVersion() {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public GridDhtTopologyFuture topologyVersionFuture() {
assert topReadyFut != null;
return topReadyFut;
}
/** {@inheritDoc} */
@Override public boolean stopping() {
return stopping;
}
/** {@inheritDoc} */
@Override public boolean initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer,
GridDhtPartitionsExchangeFuture exchFut) {
return false;
}
/** {@inheritDoc} */
@Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut,
boolean initParts,
boolean updateMoving)
throws IgniteCheckedException
{
ClusterNode loc = cctx.localNode();
U.writeLock(lock);
try {
if (stopping)
return;
discoCache = exchFut.events().discoveryCache();
beforeExchange0(loc, exchFut);
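// When requested, pre-mark partitions assigned by the ready affinity as MOVING in the collected node maps.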
if (updateMoving) {
ExchangeDiscoveryEvents evts = exchFut.context().events();
GridAffinityAssignmentCache aff = cctx.affinity().affinity(grpId);
assert aff.lastVersion().equals(evts.topologyVersion());
createMovingPartitions(aff.readyAffinity(evts.topologyVersion()));
}
}
finally {
lock.writeLock().unlock();
}
}
/**
* @param aff Affinity.
*/
private void createMovingPartitions(AffinityAssignment aff) {
for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
GridDhtPartitionMap map = e.getValue();
addMoving(map, aff.backupPartitions(e.getKey()));
addMoving(map, aff.primaryPartitions(e.getKey()));
}
}
/**
* @param map Node partition state map.
* @param parts Partitions assigned to node.
*/
private void addMoving(GridDhtPartitionMap map, Set<Integer> parts) {
if (F.isEmpty(parts))
return;
for (Integer p : parts) {
GridDhtPartitionState state = map.get(p);
if (state == null || state == EVICTED)
map.put(p, MOVING);
}
}
/**
* @param loc Local node.
* @param exchFut Exchange future.
*/
private void beforeExchange0(ClusterNode loc, GridDhtPartitionsExchangeFuture exchFut) {
GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
if (exchFut.context().events().hasServerLeft()) {
for (DiscoveryEvent evt : exchFut.context().events().events()) {
if (ExchangeDiscoveryEvents.serverLeftEvent(evt))
removeNode(evt.eventNode().id());
}
}
// In case a node joins, get the topology at the time of the joining node.
ClusterNode oldest = discoCache.oldestAliveServerNode();
assert oldest != null;
if (log.isDebugEnabled())
log.debug("Partition map beforeExchange [exchId=" + exchId + ", fullMap=" + fullMapString() + ']');
long updateSeq = this.updateSeq.incrementAndGet();
// If this is the oldest node (coordinator) or the cache group was started during this exchange.
if (oldest.id().equals(loc.id()) || exchFut.dynamicCacheGroupStarted(grpId)) {
if (node2part == null) {
node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq);
if (log.isDebugEnabled())
log.debug("Created brand new full topology map on oldest node [exchId=" +
exchId + ", fullMap=" + fullMapString() + ']');
}
else if (!node2part.valid()) {
node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false);
if (log.isDebugEnabled())
log.debug("Created new full topology map on oldest node [exchId=" + exchId + ", fullMap=" +
node2part + ']');
}
else if (!node2part.nodeId().equals(loc.id())) {
node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq, node2part, false);
if (log.isDebugEnabled())
log.debug("Copied old map into new map on oldest node (previous oldest node left) [exchId=" +
exchId + ", fullMap=" + fullMapString() + ']');
}
}
consistencyCheck();
if (log.isDebugEnabled())
log.debug("Partition map after beforeExchange [exchId=" + exchId + ", fullMap=" +
fullMapString() + ']');
}
/** {@inheritDoc} */
@Override public void afterStateRestored(AffinityTopologyVersion topVer) {
// No-op.
}
/** {@inheritDoc} */
@Override public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) throws IgniteCheckedException {
AffinityTopologyVersion topVer = exchFut.topologyVersion();
lock.writeLock().lock();
try {
assert topVer.equals(exchFut.topologyVersion()) : "Invalid topology version [topVer=" +
topVer + ", exchId=" + exchFut.exchangeId() + ']';
if (log.isDebugEnabled())
log.debug("Partition map before afterExchange [exchId=" + exchFut.exchangeId() + ", fullMap=" +
fullMapString() + ']');
updateSeq.incrementAndGet();
consistencyCheck();
}
finally {
lock.writeLock().unlock();
}
return false;
}
/** {@inheritDoc} */
@Nullable @Override public GridDhtLocalPartition localPartition(
int p,
AffinityTopologyVersion topVer,
boolean create
)
throws GridDhtInvalidPartitionException {
if (!create)
return null;
throw new GridDhtInvalidPartitionException(p, "Adding entry to evicted partition (often may be caused by " +
"inconsistent 'key.hashCode()' implementation) " +
"[part=" + p + ", topVer=" + topVer + ", this.topVer=" + this.topVer + ']');
}
/** {@inheritDoc} */
@Nullable @Override public GridDhtLocalPartition localPartition(int p, AffinityTopologyVersion topVer,
boolean create, boolean showRenting) throws GridDhtInvalidPartitionException {
return localPartition(p, topVer, create);
}
/** {@inheritDoc} */
@Override public GridDhtLocalPartition forceCreatePartition(int p) throws IgniteCheckedException {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public GridDhtLocalPartition localPartition(int p) {
return localPartition(p, AffinityTopologyVersion.NONE, false);
}
/** {@inheritDoc} */
@Override public void releasePartitions(int... parts) {
// No-op.
}
/** {@inheritDoc} */
@Override public List<GridDhtLocalPartition> localPartitions() {
return Collections.emptyList();
}
/** {@inheritDoc} */
@Override public Collection<GridDhtLocalPartition> currentLocalPartitions() {
return Collections.emptyList();
}
/** {@inheritDoc} */
@Override public void onRemoved(GridDhtCacheEntry e) {
assert false : "Entry should not be removed from client topology: " + e;
}
/** {@inheritDoc} */
@Override public GridDhtPartitionMap localPartitionMap() {
lock.readLock().lock();
try {
return new GridDhtPartitionMap(cctx.localNodeId(), updateSeq.get(), topVer,
GridPartitionStateMap.EMPTY, true);
}
finally {
lock.readLock().unlock();
}
}
/** {@inheritDoc} */
@Override public GridDhtPartitionState partitionState(UUID nodeId, int part) {
lock.readLock().lock();
try {
GridDhtPartitionMap partMap = node2part.get(nodeId);
if (partMap != null) {
GridDhtPartitionState state = partMap.get(part);
return state == null ? EVICTED : state;
}
return EVICTED;
}
finally {
lock.readLock().unlock();
}
}
/** {@inheritDoc} */
@Nullable @Override public List<ClusterNode> nodes(int p,
AffinityAssignment affAssignment,
List<ClusterNode> affNodes) {
throw new UnsupportedOperationException();
}
/** {@inheritDoc} */
@Override public List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) {
lock.readLock().lock();
try {
assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer +
", node2part=" + node2part + ']';
List<ClusterNode> nodes = null;
Collection<UUID> nodeIds = part2node.get(p);
if (!F.isEmpty(nodeIds)) {
for (UUID nodeId : nodeIds) {
ClusterNode n = discoCache.node(nodeId);
if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) {
if (nodes == null)
nodes = new ArrayList<>(nodeIds.size());
nodes.add(n);
}
}
}
return nodes;
}
finally {
lock.readLock().unlock();
}
}
/**
* @param p Partition.
* @param topVer Topology version ({@code -1} for all nodes).
* @param state Partition state.
* @param states Additional partition states.
* @return List of nodes for the partition.
*/
private List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer, GridDhtPartitionState state, GridDhtPartitionState... states) {
Collection<UUID> allIds = F.nodeIds(discoCache.cacheGroupAffinityNodes(grpId));
lock.readLock().lock();
try {
assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer +
", allIds=" + allIds + ", node2part=" + node2part + ']';
Collection<UUID> nodeIds = part2node.get(p);
// Node IDs can be null if both primary and backup nodes disappear.
int size = nodeIds == null ? 0 : nodeIds.size();
if (size == 0)
return Collections.emptyList();
List<ClusterNode> nodes = new ArrayList<>(size);
for (UUID id : nodeIds) {
if (topVer.topologyVersion() > 0 && !F.contains(allIds, id))
continue;
if (hasState(p, id, state, states)) {
ClusterNode n = discoCache.node(id);
if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion()))
nodes.add(n);
}
}
return nodes;
}
finally {
lock.readLock().unlock();
}
}
/** {@inheritDoc} */
@Override public List<ClusterNode> owners(int p, AffinityTopologyVersion topVer) {
return nodes(p, topVer, OWNING, null);
}
/** {@inheritDoc} */
@Override public List<ClusterNode> owners(int p) {
return owners(p, AffinityTopologyVersion.NONE);
}
/** {@inheritDoc} */
@Override public List<List<ClusterNode>> allOwners() {
lock.readLock().lock();
try {
int parts = partitions();
List<List<ClusterNode>> res = new ArrayList<>(parts);
for (int i = 0; i < parts; i++)
res.add(new ArrayList<>());
List<ClusterNode> allNodes = discoCache.cacheGroupAffinityNodes(grpId);
for (int i = 0; i < allNodes.size(); i++) {
ClusterNode node = allNodes.get(i);
GridDhtPartitionMap nodeParts = node2part.get(node.id());
if (nodeParts != null) {
for (Map.Entry<Integer, GridDhtPartitionState> e : nodeParts.map().entrySet()) {
if (e.getValue() == OWNING) {
int part = e.getKey();
List<ClusterNode> owners = res.get(part);
owners.add(node);
}
}
}
}
return res;
}
finally {
lock.readLock().unlock();
}
}
/** {@inheritDoc} */
@Override public List<ClusterNode> moving(int p) {
return nodes(p, AffinityTopologyVersion.NONE, MOVING, null);
}
/**
* @param p Partition.
* @param topVer Topology version.
* @return List of nodes in state OWNING or MOVING.
*/
private List<ClusterNode> ownersAndMoving(int p, AffinityTopologyVersion topVer) {
return nodes(p, topVer, OWNING, MOVING_STATES);
}
/** {@inheritDoc} */
@Override public long updateSequence() {
return updateSeq.get();
}
/**
* Increments and returns the update sequence.
*
* @return Incremented update sequence.
*/
public long lastUpdateSequence() {
lock.writeLock().lock();
try {
return updateSeq.incrementAndGet();
}
finally {
lock.writeLock().unlock();
}
}
/** {@inheritDoc} */
@Override public GridDhtPartitionFullMap partitionMap(boolean onlyActive) {
lock.readLock().lock();
try {
if (stopping || node2part == null)
return null;
assert node2part.valid() : "Invalid node2part [node2part: " + node2part +
", locNodeId=" + cctx.localNodeId() +
", igniteInstanceName=" + cctx.igniteInstanceName() + ']';
GridDhtPartitionFullMap m = node2part;
return new GridDhtPartitionFullMap(m.nodeId(), m.nodeOrder(), m.updateSequence(), m, onlyActive);
}
finally {
lock.readLock().unlock();
}
}
/**
* Checks whether the current partition map should be overwritten by the new partition map.
* Returns {@code true} if the topology version or update sequence of the new map is greater than that of the current map.
*
* @param currentMap Current partition map.
* @param newMap New partition map.
* @return {@code True} if the current partition map should be overwritten by the new one, {@code false} otherwise.
*/
private boolean shouldOverridePartitionMap(GridDhtPartitionMap currentMap, GridDhtPartitionMap newMap) {
return newMap != null && newMap.compareTo(currentMap) > 0;
}
/** {@inheritDoc} */
@Override public boolean update(
@Nullable AffinityTopologyVersion exchangeVer,
GridDhtPartitionFullMap partMap,
@Nullable CachePartitionFullCountersMap cntrMap,
Set<Integer> partsToReload,
@Nullable Map<Integer, Long> partSizes,
@Nullable AffinityTopologyVersion msgTopVer,
@Nullable GridDhtPartitionsExchangeFuture exchFut,
@Nullable Set<Integer> lostParts
) {
if (log.isDebugEnabled())
log.debug("Updating full partition map [exchVer=" + exchangeVer + ", parts=" + fullMapString() + ']');
lock.writeLock().lock();
try {
if (exchangeVer != null && lastExchangeVer != null && lastExchangeVer.compareTo(exchangeVer) >= 0) {
if (log.isDebugEnabled())
log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" +
lastExchangeVer + ", exchVer=" + exchangeVer + ']');
return false;
}
if (msgTopVer != null && lastExchangeVer != null && lastExchangeVer.compareTo(msgTopVer) > 0) {
if (log.isDebugEnabled())
log.debug("Stale topology version for full partition map update message (will ignore) " +
"[lastExchId=" + lastExchangeVer + ", topVersion=" + msgTopVer + ']');
return false;
}
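// Merge the incoming full map with the locally known per-node maps, keeping whichever map is newer for each node.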
boolean fullMapUpdated = (node2part == null);
if (node2part != null) {
for (GridDhtPartitionMap part : node2part.values()) {
GridDhtPartitionMap newPart = partMap.get(part.nodeId());
if (shouldOverridePartitionMap(part, newPart)) {
fullMapUpdated = true;
if (log.isDebugEnabled())
log.debug("Overriding partition map in full update map [exchId=" + exchangeVer + ", curPart=" +
mapString(part) + ", newPart=" + mapString(newPart) + ']');
}
else {
// If the current map for some node is newer than the received one,
// then keep the newer value.
partMap.put(part.nodeId(), part);
}
}
// Check that we have new nodes.
for (GridDhtPartitionMap part : partMap.values()) {
if (fullMapUpdated)
break;
fullMapUpdated = !node2part.containsKey(part.nodeId());
}
// Remove entry if node left.
for (Iterator<UUID> it = partMap.keySet().iterator(); it.hasNext(); ) {
UUID nodeId = it.next();
if (!cctx.discovery().alive(nodeId)) {
if (log.isDebugEnabled())
log.debug("Removing left node from full map update [nodeId=" + nodeId + ", partMap=" +
partMap + ']');
it.remove();
}
}
}
if (!fullMapUpdated) {
if (log.isDebugEnabled())
log.debug("No updates for full partition map (will ignore) [lastExch=" +
lastExchangeVer + ", exch=" + exchangeVer + ", curMap=" + node2part + ", newMap=" + partMap + ']');
return false;
}
if (exchangeVer != null)
lastExchangeVer = exchangeVer;
node2part = partMap;
updateSeq.incrementAndGet();
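// Rebuild the reverse partition-to-nodes index, tracking only MOVING and OWNING partitions.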
part2node.clear();
for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) {
if (e0.getValue() != MOVING && e0.getValue() != OWNING)
continue;
int p = e0.getKey();
Set<UUID> ids = part2node.get(p);
if (ids == null)
// Initialize HashSet to size 3 in anticipation that there won't be
// more than 3 nodes per partition.
part2node.put(p, ids = U.newHashSet(3));
ids.add(e.getKey());
}
}
if (cntrMap != null)
this.cntrMap = new CachePartitionFullCountersMap(cntrMap);
if (partSizes != null)
this.globalPartSizes = partSizes;
consistencyCheck();
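// Replace the lost partitions set only when the full map belongs to an exchange.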
if (exchangeVer != null)
this.lostParts = lostParts == null ? null : new TreeSet<>(lostParts);
if (log.isDebugEnabled())
log.debug("Partition map after full update: " + fullMapString());
return false;
}
finally {
lock.writeLock().unlock();
}
}
/** {@inheritDoc} */
@Override public void collectUpdateCounters(CachePartitionPartialCountersMap cntrMap) {
assert cntrMap != null;
lock.writeLock().lock();
try {
for (int i = 0; i < cntrMap.size(); i++) {
int pId = cntrMap.partitionAt(i);
long initUpdateCntr = cntrMap.initialUpdateCounterAt(i);
long updateCntr = cntrMap.updateCounterAt(i);
if (this.cntrMap.updateCounter(pId) < updateCntr) {
this.cntrMap.initialUpdateCounter(pId, initUpdateCntr);
this.cntrMap.updateCounter(pId, updateCntr);
}
}
}
finally {
lock.writeLock().unlock();
}
}
/** {@inheritDoc} */
@Override public void applyUpdateCounters() {
// No-op on client topology.
}
/**
* Checks whether the new partition map is stale compared to the current partition map.
* The new map is stale if its topology version or update sequence is less than or equal to that of the current map.
*
* @param currentMap Current partition map.
* @param newMap New partition map.
* @return {@code True} if the new partition map is stale, {@code false} otherwise.
*/
private boolean isStaleUpdate(GridDhtPartitionMap currentMap, GridDhtPartitionMap newMap) {
return currentMap != null && newMap.compareTo(currentMap) <= 0;
}
/** {@inheritDoc} */
@Override public boolean update(
@Nullable GridDhtPartitionExchangeId exchId,
GridDhtPartitionMap parts,
boolean force
) {
if (log.isDebugEnabled())
log.debug("Updating single partition map [exchId=" + exchId + ", parts=" + mapString(parts) + ']');
if (!cctx.discovery().alive(parts.nodeId())) {
if (log.isDebugEnabled())
log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId +
", parts=" + parts + ']');
return false;
}
lock.writeLock().lock();
try {
if (stopping)
return false;
if (!force) {
if (lastExchangeVer != null && exchId != null && lastExchangeVer.compareTo(exchId.topologyVersion()) > 0) {
if (log.isDebugEnabled())
log.debug("Stale exchange id for single partition map update (will ignore) [lastExchVer=" +
lastExchangeVer + ", exchId=" + exchId + ']');
return false;
}
}
if (exchId != null)
lastExchangeVer = exchId.topologyVersion();
if (node2part == null)
// Create invalid partition map.
node2part = new GridDhtPartitionFullMap();
GridDhtPartitionMap cur = node2part.get(parts.nodeId());
if (force) {
if (cur != null && cur.topologyVersion().initialized())
parts.updateSequence(cur.updateSequence(), cur.topologyVersion());
}
else if (isStaleUpdate(cur, parts)) {
if (log.isDebugEnabled())
log.debug("Stale update for single partition map update (will ignore) [exchId=" + exchId +
", curMap=" + cur + ", newMap=" + parts + ']');
return false;
}
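// Accept the update: copy the full map with a new update sequence and apply the node's map.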
long updateSeq = this.updateSeq.incrementAndGet();
node2part = new GridDhtPartitionFullMap(node2part, updateSeq);
boolean changed = false;
if (cur == null || !cur.equals(parts))
changed = true;
node2part.put(parts.nodeId(), parts);
// Add new mappings.
for (Map.Entry<Integer,GridDhtPartitionState> e : parts.entrySet()) {
int p = e.getKey();
Set<UUID> ids = part2node.get(p);
if (e.getValue() == MOVING || e.getValue() == OWNING) {
if (ids == null)
// Initialize HashSet to size 3 in anticipation that there won't be
// more than 3 nodes per partition.
part2node.put(p, ids = U.newHashSet(3));
changed |= ids.add(parts.nodeId());
}
else {
if (ids != null)
changed |= ids.remove(parts.nodeId());
}
}
// Remove obsolete mappings.
if (cur != null) {
for (Integer p : F.view(cur.keySet(), F0.notIn(parts.keySet()))) {
Set<UUID> ids = part2node.get(p);
if (ids != null)
changed |= ids.remove(parts.nodeId());
}
}
consistencyCheck();
if (log.isDebugEnabled())
log.debug("Partition map after single update: " + fullMapString());
return changed;
}
finally {
lock.writeLock().unlock();
}
}
/** {@inheritDoc} */
@Override public void onExchangeDone(GridDhtPartitionsExchangeFuture fut, AffinityAssignment assignment,
boolean updateRebalanceVer) {
// No-op.
}
/** {@inheritDoc} */
@Override public boolean detectLostPartitions(AffinityTopologyVersion affVer, GridDhtPartitionsExchangeFuture fut) {
lock.writeLock().lock();
try {
if (node2part == null)
return false;
final GridClusterStateProcessor state = cctx.kernalContext().state();
boolean isInMemoryCluster = CU.isInMemoryCluster(
cctx.kernalContext().discovery().allNodes(),
cctx.kernalContext().marshallerContext().jdkMarshaller(),
U.resolveClassLoader(cctx.kernalContext().config())
);
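// The IGNORE loss policy is honored only for in-memory clusters with immediate baseline auto-adjust; otherwise losses are handled safely.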
boolean compatibleWithIgnorePlc = isInMemoryCluster
&& state.isBaselineAutoAdjustEnabled() && state.baselineAutoAdjustTimeout() == 0L;
// Calculate how data loss is handled.
boolean safe = partLossPlc != PartitionLossPolicy.IGNORE || !compatibleWithIgnorePlc;
boolean changed = false;
for (int part = 0; part < parts; part++) {
boolean lost = F.contains(lostParts, part);
if (!lost) {
boolean hasOwner = false;
// Detect if all owners have left.
for (GridDhtPartitionMap partMap : node2part.values()) {
if (partMap.get(part) == OWNING) {
hasOwner = true;
break;
}
}
if (!hasOwner) {
lost = true;
// Do not record the lost partition in IGNORE mode.
if (safe) {
if (lostParts == null)
lostParts = new TreeSet<>();
lostParts.add(part);
}
}
}
if (lost) {
// Update remote maps according to policy.
for (Map.Entry<UUID, GridDhtPartitionMap> entry : node2part.entrySet()) {
if (entry.getValue().get(part) != null)
entry.getValue().put(part, safe ? LOST : OWNING);
}
}
}
return changed;
}
finally {
lock.writeLock().unlock();
}
}
/** {@inheritDoc} */
@Override public void resetLostPartitions(AffinityTopologyVersion affVer) {
lock.writeLock().lock();
try {
for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) {
if (e0.getValue() != LOST)
continue;
e0.setValue(OWNING);
}
}
lostParts = null;
} finally {
lock.writeLock().unlock();
}
}
/** {@inheritDoc} */
@Override public Set<Integer> lostPartitions() {
lock.readLock().lock();
try {
return lostParts == null ? Collections.emptySet() : lostParts;
}
finally {
lock.readLock().unlock();
}
}
/**
* Updates value for single partition.
*
* @param p Partition.
* @param nodeId Node ID.
* @param state State.
* @param updateSeq Update sequence.
*/
private void updateLocal(int p, UUID nodeId, GridDhtPartitionState state, long updateSeq) {
assert lock.isWriteLockedByCurrentThread();
assert nodeId.equals(cctx.localNodeId());
// In case a node joins, get the topology at the time of the joining node.
ClusterNode oldest = discoCache.oldestAliveServerNode();
// If this node became the oldest node.
if (cctx.localNode().equals(oldest)) {
long seq = node2part.updateSequence();
if (seq != updateSeq) {
if (seq > updateSeq) {
if (this.updateSeq.get() < seq) {
// Update global counter if necessary.
boolean b = this.updateSeq.compareAndSet(this.updateSeq.get(), seq + 1);
assert b : "Invalid update sequence [updateSeq=" + updateSeq + ", seq=" + seq +
", curUpdateSeq=" + this.updateSeq.get() + ", node2part=" + node2part.toFullString() + ']';
updateSeq = seq + 1;
}
else
updateSeq = seq;
}
node2part.updateSequence(updateSeq);
}
}
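// Update (or create) the local node's partition map and the reverse partition-to-nodes index.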
GridDhtPartitionMap map = node2part.get(nodeId);
if (map == null)
node2part.put(nodeId, map = new GridDhtPartitionMap(nodeId, updateSeq, topVer,
GridPartitionStateMap.EMPTY, false));
map.updateSequence(updateSeq, topVer);
map.put(p, state);
Set<UUID> ids = part2node.get(p);
if (ids == null)
part2node.put(p, ids = U.newHashSet(3));
ids.add(nodeId);
}
/**
* @param nodeId Node to remove.
*/
private void removeNode(UUID nodeId) {
assert nodeId != null;
assert lock.writeLock().isHeldByCurrentThread();
ClusterNode loc = cctx.localNode();
if (node2part != null) {
if (!node2part.nodeId().equals(loc.id())) {
updateSeq.setIfGreater(node2part.updateSequence());
node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq.incrementAndGet(),
node2part, false);
}
else
node2part = new GridDhtPartitionFullMap(node2part, node2part.updateSequence());
GridDhtPartitionMap parts = node2part.remove(nodeId);
if (parts != null) {
for (Integer p : parts.keySet()) {
Set<UUID> nodeIds = part2node.get(p);
if (nodeIds != null) {
nodeIds.remove(nodeId);
if (nodeIds.isEmpty())
part2node.remove(p);
}
}
}
consistencyCheck();
}
}
/** {@inheritDoc} */
@Override public boolean own(GridDhtLocalPartition part) {
assert false : "Client topology should never own a partition: " + part;
return false;
}
/** {@inheritDoc} */
@Override public void ownMoving() {
// No-op
}
/** {@inheritDoc} */
@Override public boolean tryFinishEviction(GridDhtLocalPartition part) {
lock.writeLock().lock();
try {
if (stopping)
return false;
assert part.state() == EVICTED;
long seq = updateSeq.incrementAndGet();
updateLocal(part.id(), cctx.localNodeId(), part.state(), seq);
consistencyCheck();
return true;
}
finally {
lock.writeLock().unlock();
}
}
/** {@inheritDoc} */
@Nullable @Override public GridDhtPartitionMap partitions(UUID nodeId) {
lock.readLock().lock();
try {
return node2part.get(nodeId);
}
finally {
lock.readLock().unlock();
}
}
/** {@inheritDoc} */
@Override public Map<UUID, Set<Integer>> resetOwners(
Map<Integer, Set<UUID>> ownersByUpdCounters,
Set<Integer> haveHist,
GridDhtPartitionsExchangeFuture exchFut
) {
Map<UUID, Set<Integer>> res = new HashMap<>();
lock.writeLock().lock();
try {
// Process remote partitions.
for (Map.Entry<Integer, Set<UUID>> entry : ownersByUpdCounters.entrySet()) {
int part = entry.getKey();
Set<UUID> newOwners = entry.getValue();
for (Map.Entry<UUID, GridDhtPartitionMap> remotes : node2part.entrySet()) {
UUID remoteNodeId = remotes.getKey();
GridDhtPartitionMap partMap = remotes.getValue();
GridDhtPartitionState state = partMap.get(part);
if (state == null || state != OWNING)
continue;
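// A node that is no longer among the up-to-date owners is demoted to MOVING and scheduled for rebalancing.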
if (!newOwners.contains(remoteNodeId)) {
partMap.put(part, MOVING);
partMap.updateSequence(partMap.updateSequence() + 1, partMap.topologyVersion());
res.computeIfAbsent(remoteNodeId, n -> new HashSet<>());
res.get(remoteNodeId).add(part);
}
}
}
for (Map.Entry<UUID, Set<Integer>> entry : res.entrySet()) {
UUID nodeId = entry.getKey();
Set<Integer> partsToRebalance = entry.getValue();
if (!partsToRebalance.isEmpty()) {
Set<Integer> historical = partsToRebalance.stream()
.filter(haveHist::contains)
.collect(Collectors.toSet());
// Filter out partitions having WAL history.
partsToRebalance.removeAll(historical);
U.warn(log, "Partitions have been scheduled for rebalancing due to outdated update counter "
+ "[grpId=" + grpId
+ ", nodeId=" + nodeId
+ ", partsFull=" + S.compact(partsToRebalance)
+ ", partsHistorical=" + S.compact(historical) + "]");
}
}
for (Map.Entry<Integer, Set<UUID>> entry : ownersByUpdCounters.entrySet())
part2node.put(entry.getKey(), entry.getValue());
updateSeq.incrementAndGet();
} finally {
lock.writeLock().unlock();
}
return res;
}
/** {@inheritDoc} */
@Override public CachePartitionFullCountersMap fullUpdateCounters() {
lock.readLock().lock();
try {
return new CachePartitionFullCountersMap(cntrMap);
}
finally {
lock.readLock().unlock();
}
}
/** {@inheritDoc} */
@Override public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros) {
return CachePartitionPartialCountersMap.EMPTY;
}
/** {@inheritDoc} */
@Override public void finalizeUpdateCounters(Set<Integer> parts) {
// No-op.
}
/** {@inheritDoc} */
@Override public Map<Integer, Long> partitionSizes() {
return Collections.emptyMap();
}
/** {@inheritDoc} */
@Override @Nullable public Map<Integer, Long> globalPartSizes() {
lock.readLock().lock();
try {
if (globalPartSizes == null)
return Collections.emptyMap();
return Collections.unmodifiableMap(globalPartSizes);
}
finally {
lock.readLock().unlock();
}
}
/** {@inheritDoc} */
@Override public void globalPartSizes(@Nullable Map<Integer, Long> partSizes) {
lock.writeLock().lock();
try {
this.globalPartSizes = partSizes;
}
finally {
lock.writeLock().unlock();
}
}
/** {@inheritDoc} */
@Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) {
assert false : "Should not be called on non-affinity node";
return false;
}
/** {@inheritDoc} */
@Override public boolean hasMovingPartitions() {
lock.readLock().lock();
try {
assert node2part != null && node2part.valid() : "Invalid node2part [node2part: " + node2part +
", locNodeId=" + cctx.localNodeId() +
", igniteInstanceName=" + cctx.igniteInstanceName() + ']';
for (GridDhtPartitionMap map : node2part.values()) {
if (map.hasMovingPartitions())
return true;
}
return false;
}
finally {
lock.readLock().unlock();
}
}
/** {@inheritDoc} */
@Override public void printMemoryStats(int threshold) {
X.println(">>> Cache partition topology stats [igniteInstanceName=" + cctx.igniteInstanceName() +
", grpId=" + grpId + ']');
}
/**
* @param p Partition.
* @param nodeId Node ID.
* @param match State to match.
* @param matches Additional states.
* @return {@code True} if the node has one of the given states for the partition.
*/
private boolean hasState(final int p, @Nullable UUID nodeId, final GridDhtPartitionState match,
final GridDhtPartitionState... matches) {
if (nodeId == null)
return false;
GridDhtPartitionMap parts = node2part.get(nodeId);
// Set can be null if node has been removed.
if (parts != null) {
GridDhtPartitionState state = parts.get(p);
if (state == match)
return true;
if (matches != null && matches.length > 0)
for (GridDhtPartitionState s : matches)
if (state == s)
return true;
}
return false;
}
/**
* Checks consistency after all operations.
*/
private void consistencyCheck() {
if (CONSISTENCY_CHECK) {
assert lock.writeLock().isHeldByCurrentThread();
if (node2part == null)
return;
for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) {
for (Integer p : e.getValue().keySet()) {
Set<UUID> nodeIds = part2node.get(p);
assert nodeIds != null : "Failed consistency check [part=" + p + ", nodeId=" + e.getKey() + ']';
assert nodeIds.contains(e.getKey()) : "Failed consistency check [part=" + p + ", nodeId=" +
e.getKey() + ", nodeIds=" + nodeIds + ']';
}
}
for (Map.Entry<Integer, Set<UUID>> e : part2node.entrySet()) {
for (UUID nodeId : e.getValue()) {
GridDhtPartitionMap map = node2part.get(nodeId);
assert map != null : "Failed consistency check [part=" + e.getKey() + ", nodeId=" + nodeId + ']';
assert map.containsKey(e.getKey()) : "Failed consistency check [part=" + e.getKey() +
", nodeId=" + nodeId + ']';
}
}
}
}
/** {@inheritDoc} */
@Override public boolean rent(int p) {
return false;
}
}