| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.distributed.dht; |
| |
| import java.nio.file.Files; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.NoSuchElementException; |
| import java.util.Set; |
| import java.util.UUID; |
| import java.util.concurrent.atomic.AtomicReferenceArray; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.cache.PartitionLossPolicy; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.events.DiscoveryEvent; |
| import org.apache.ignite.events.EventType; |
| import org.apache.ignite.internal.IgniteInterruptedCheckedException; |
| import org.apache.ignite.internal.managers.discovery.DiscoCache; |
| import org.apache.ignite.internal.pagemem.store.IgnitePageStoreManager; |
| import org.apache.ignite.internal.processors.affinity.AffinityAssignment; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.cache.CacheGroupContext; |
| import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents; |
| import org.apache.ignite.internal.processors.cache.GridCacheSharedContext; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionFullCountersMap; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionExchangeId; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionFullMap; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionMap; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPartitionsExchangeFuture; |
| import org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager; |
| import org.apache.ignite.internal.processors.cache.persistence.file.FilePageStoreManager; |
| import org.apache.ignite.internal.util.F0; |
| import org.apache.ignite.internal.util.GridAtomicLong; |
| import org.apache.ignite.internal.util.GridPartitionStateMap; |
| import org.apache.ignite.internal.util.StripedCompositeReadWriteLock; |
| import org.apache.ignite.internal.util.tostring.GridToStringExclude; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.X; |
| import org.apache.ignite.internal.util.typedef.internal.CU; |
| import org.apache.ignite.internal.util.typedef.internal.LT; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST; |
| import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT; |
| import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.EVICTED; |
| import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.LOST; |
| import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING; |
| import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.OWNING; |
| import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.RENTING; |
| |
| /** |
| * Partition topology. |
| */ |
| @GridToStringExclude |
| public class GridDhtPartitionTopologyImpl implements GridDhtPartitionTopology { |
| /** Single-element array containing only the {@code MOVING} state. */ |
| private static final GridDhtPartitionState[] MOVING_STATES = new GridDhtPartitionState[] {MOVING}; |
| |
| /** Flag to control amount of output for full map. */ |
| private static final boolean FULL_MAP_DEBUG = false; |
| |
| /** Flag to enable the fast diff-from-affinity rebuild. */ |
| private static final boolean FAST_DIFF_REBUILD = false; |
| |
| /** */ |
| private static final Long ZERO = 0L; |
| |
| /** Cache shared context. */ |
| private final GridCacheSharedContext ctx; |
| |
| /** Cache group context. */ |
| private final CacheGroupContext grp; |
| |
| /** Logger. */ |
| private final IgniteLogger log; |
| |
| /** Time logger. */ |
| private final IgniteLogger timeLog; |
| |
| /** Local partitions. */ |
| private final AtomicReferenceArray<GridDhtLocalPartition> locParts; |
| |
| /** Node to partition map. */ |
| private GridDhtPartitionFullMap node2part; |
| |
| /** Per-partition set of node IDs that hold the partition but are not in the affinity assignment. */ |
| private final Map<Integer, Set<UUID>> diffFromAffinity = new HashMap<>(); |
| |
| /** Affinity topology version for which {@link #diffFromAffinity} was calculated. */ |
| private volatile AffinityTopologyVersion diffFromAffinityVer = AffinityTopologyVersion.NONE; |
| |
| /** Last started exchange version (always >= readyTopVer). */ |
| private volatile AffinityTopologyVersion lastTopChangeVer = AffinityTopologyVersion.NONE; |
| |
| /** Last finished exchange version. */ |
| private volatile AffinityTopologyVersion readyTopVer = AffinityTopologyVersion.NONE; |
| |
| /** Discovery cache. */ |
| private volatile DiscoCache discoCache; |
| |
| /** Stopping flag. */ |
| private volatile boolean stopping; |
| |
| /** A future that will be completed when the topology with version topVer is ready to use. */ |
| private volatile GridDhtTopologyFuture topReadyFut; |
| |
| /** Local update sequence. */ |
| private final GridAtomicLong updateSeq = new GridAtomicLong(1); |
| |
| /** Lock. */ |
| private final StripedCompositeReadWriteLock lock = new StripedCompositeReadWriteLock(16); |
| |
| /** Partition update counters map. */ |
| private final CachePartitionFullCountersMap cntrMap; |
| |
| /** Topology version for which all partitions were rebalanced, or {@code NONE} if rebalancing is not finished. */ |
| private volatile AffinityTopologyVersion rebalancedTopVer = AffinityTopologyVersion.NONE; |
| |
| /** |
| * @param ctx Cache shared context. |
| * @param grp Cache group. |
| */ |
| public GridDhtPartitionTopologyImpl( |
| GridCacheSharedContext ctx, |
| CacheGroupContext grp |
| ) { |
| assert ctx != null; |
| assert grp != null; |
| |
| this.ctx = ctx; |
| this.grp = grp; |
| |
| log = ctx.logger(getClass()); |
| |
| timeLog = ctx.logger(GridDhtPartitionsExchangeFuture.EXCHANGE_LOG); |
| |
| locParts = new AtomicReferenceArray<>(grp.affinityFunction().partitions()); |
| |
| cntrMap = new CachePartitionFullCountersMap(locParts.length()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int partitions() { |
| return grp.affinityFunction().partitions(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int groupId() { |
| return grp.groupId(); |
| } |
| |
| /** |
| * Resets the topology state after client reconnect. |
| */ |
| public void onReconnected() { |
| lock.writeLock().lock(); |
| |
| try { |
| node2part = null; |
| |
| diffFromAffinity.clear(); |
| |
| updateSeq.set(1); |
| |
| topReadyFut = null; |
| |
| diffFromAffinityVer = AffinityTopologyVersion.NONE; |
| |
| rebalancedTopVer = AffinityTopologyVersion.NONE; |
| |
| readyTopVer = AffinityTopologyVersion.NONE; |
| |
| lastTopChangeVer = AffinityTopologyVersion.NONE; |
| |
| discoCache = ctx.discovery().discoCache(); |
| } |
| finally { |
| lock.writeLock().unlock(); |
| } |
| } |
| |
| /** |
| * @return Full map string representation. |
| */ |
| @SuppressWarnings({"ConstantConditions"}) |
| private String fullMapString() { |
| return node2part == null ? "null" : FULL_MAP_DEBUG ? node2part.toFullString() : node2part.toString(); |
| } |
| |
| /** |
| * @param map Map to get string for. |
| * @return Full map string representation. |
| */ |
| @SuppressWarnings({"ConstantConditions"}) |
| private String mapString(GridDhtPartitionMap map) { |
| return map == null ? "null" : FULL_MAP_DEBUG ? map.toFullString() : map.toString(); |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings({"LockAcquiredButNotSafelyReleased"}) |
| @Override public void readLock() { |
| lock.readLock().lock(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void readUnlock() { |
| lock.readLock().unlock(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void updateTopologyVersion( |
| GridDhtTopologyFuture exchFut, |
| DiscoCache discoCache, |
| long updSeq, |
| boolean stopping |
| ) throws IgniteInterruptedCheckedException { |
| U.writeLock(lock); |
| |
| try { |
| AffinityTopologyVersion exchTopVer = exchFut.initialVersion(); |
| |
| assert exchTopVer.compareTo(readyTopVer) > 0 : "Invalid topology version [grp=" + grp.cacheOrGroupName() + |
| ", topVer=" + readyTopVer + |
| ", exchTopVer=" + exchTopVer + |
| ", fut=" + exchFut + ']'; |
| |
| this.stopping = stopping; |
| |
| updateSeq.setIfGreater(updSeq); |
| |
| topReadyFut = exchFut; |
| |
| rebalancedTopVer = AffinityTopologyVersion.NONE; |
| |
| lastTopChangeVer = exchTopVer; |
| |
| this.discoCache = discoCache; |
| } |
| finally { |
| lock.writeLock().unlock(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public AffinityTopologyVersion readyTopologyVersion() { |
| AffinityTopologyVersion topVer = this.readyTopVer; |
| |
| assert topVer.topologyVersion() > 0 : "Invalid topology version [topVer=" + topVer + |
| ", group=" + grp.cacheOrGroupName() + ']'; |
| |
| return topVer; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public AffinityTopologyVersion lastTopologyChangeVersion() { |
| AffinityTopologyVersion topVer = this.lastTopChangeVer; |
| |
| assert topVer.topologyVersion() > 0 : "Invalid topology version [topVer=" + topVer + |
| ", group=" + grp.cacheOrGroupName() + ']'; |
| |
| return topVer; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridDhtTopologyFuture topologyVersionFuture() { |
| assert topReadyFut != null; |
| |
| return topReadyFut; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean stopping() { |
| return stopping; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void initPartitionsWhenAffinityReady(AffinityTopologyVersion affVer, |
| GridDhtPartitionsExchangeFuture exchFut) |
| throws IgniteInterruptedCheckedException |
| { |
| ctx.database().checkpointReadLock(); |
| |
| try { |
| U.writeLock(lock); |
| |
| try { |
| if (stopping) |
| return; |
| |
| long updateSeq = this.updateSeq.incrementAndGet(); |
| |
| initPartitions0(affVer, exchFut, updateSeq); |
| |
| consistencyCheck(); |
| } |
| finally { |
| lock.writeLock().unlock(); |
| } |
| } |
| finally { |
| ctx.database().checkpointReadUnlock(); |
| } |
| } |
| |
| /** |
| * @param affVer Affinity version to use. |
| * @param exchFut Exchange future. |
| * @param updateSeq Update sequence. |
| */ |
| private void initPartitions0(AffinityTopologyVersion affVer, GridDhtPartitionsExchangeFuture exchFut, long updateSeq) { |
| List<List<ClusterNode>> aff = grp.affinity().readyAssignments(affVer); |
| |
| if (grp.affinityNode()) { |
| ClusterNode loc = ctx.localNode(); |
| |
| ClusterNode oldest = discoCache.oldestAliveServerNode(); |
| |
| GridDhtPartitionExchangeId exchId = exchFut.exchangeId(); |
| |
| assert grp.affinity().lastVersion().equals(affVer) : |
| "Invalid affinity [topVer=" + grp.affinity().lastVersion() + |
| ", grp=" + grp.cacheOrGroupName() + |
| ", affVer=" + affVer + |
| ", fut=" + exchFut + ']'; |
| |
| int num = grp.affinity().partitions(); |
| |
| if (grp.rebalanceEnabled()) { |
| boolean added = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom()); |
| |
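| // The local node initializes partitions first when the group was added on this exchange, or when the local node is the oldest server and has just joined. |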
| boolean first = added || (loc.equals(oldest) && loc.id().equals(exchId.nodeId()) && exchId.isJoined()); |
| |
| if (first) { |
| assert exchId.isJoined() || added; |
| |
| for (int p = 0; p < num; p++) { |
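| // Create the partition if it is assigned to the local node by affinity, or if a persisted copy of it exists on disk for a baseline node (see initLocalPartition). |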
| if (localNode(p, aff) || initLocalPartition(p, discoCache)) { |
| GridDhtLocalPartition locPart = createPartition(p); |
| |
| if (grp.persistenceEnabled()) { |
| GridCacheDatabaseSharedManager db = (GridCacheDatabaseSharedManager)grp.shared().database(); |
| |
| locPart.restoreState(db.readPartitionState(grp, locPart.id())); |
| } |
| else { |
| boolean owned = locPart.own(); |
| |
| assert owned : "Failed to own partition for oldest node [grp=" + grp.cacheOrGroupName() + |
| ", part=" + locPart + ']'; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Owned partition for oldest node [grp=" + grp.cacheOrGroupName() + |
| ", part=" + locPart + ']'); |
| } |
| |
| updateSeq = updateLocal(p, locPart.state(), updateSeq, affVer); |
| } |
| } |
| } |
| else |
| createPartitions(affVer, aff, updateSeq); |
| } |
| else { |
| // If preloader is disabled, then we simply clear out |
| // the partitions this node is not responsible for. |
| for (int p = 0; p < num; p++) { |
| GridDhtLocalPartition locPart = localPartition0(p, affVer, false, true, false); |
| |
| boolean belongs = localNode(p, aff); |
| |
| if (locPart != null) { |
| if (!belongs) { |
| GridDhtPartitionState state = locPart.state(); |
| |
| if (state.active()) { |
| locPart.rent(false); |
| |
| updateSeq = updateLocal(p, locPart.state(), updateSeq, affVer); |
| |
| if (log.isDebugEnabled()) { |
| log.debug("Evicting partition with rebalancing disabled (it does not belong to " + |
| "affinity) [grp=" + grp.cacheOrGroupName() + ", part=" + locPart + ']'); |
| } |
| } |
| } |
| else |
| locPart.own(); |
| } |
| else if (belongs) { |
| locPart = createPartition(p); |
| |
| locPart.own(); |
| |
| updateLocal(p, locPart.state(), updateSeq, affVer); |
| } |
| } |
| } |
| } |
| |
| updateRebalanceVersion(aff); |
| } |
| |
| /** |
| * @param p Partition ID to restore. |
| * @param discoCache Disco cache to use. |
| * @return {@code True} if the local partition should be restored from the page store. |
| */ |
| private boolean initLocalPartition(int p, DiscoCache discoCache) { |
| IgnitePageStoreManager storeMgr = ctx.pageStore(); |
| |
| return |
| grp.persistenceEnabled() && |
| storeMgr instanceof FilePageStoreManager && |
| discoCache.baselineNode(ctx.localNodeId()) && |
| Files.exists(((FilePageStoreManager)storeMgr).getPath(grp.sharedGroup(), grp.cacheOrGroupName(), p)); |
| } |
| |
| /** |
| * @param affVer Affinity version. |
| * @param aff Affinity assignments. |
| * @param updateSeq Update sequence. |
| */ |
| private void createPartitions(AffinityTopologyVersion affVer, List<List<ClusterNode>> aff, long updateSeq) { |
| if (!grp.affinityNode()) |
| return; |
| |
| int num = grp.affinity().partitions(); |
| |
| for (int p = 0; p < num; p++) { |
| if (node2part != null && node2part.valid()) { |
| if (localNode(p, aff)) { |
| // This will make sure that all non-existing partitions |
| // will be created in MOVING state. |
| GridDhtLocalPartition locPart = createPartition(p); |
| |
| updateSeq = updateLocal(p, locPart.state(), updateSeq, affVer); |
| } |
| } |
| // If this node's map is empty, we pre-create local partitions, |
| // so local map will be sent correctly during exchange. |
| else if (localNode(p, aff)) |
| createPartition(p); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void beforeExchange(GridDhtPartitionsExchangeFuture exchFut, |
| boolean affReady, |
| boolean updateMoving) |
| throws IgniteCheckedException { |
| ClusterNode loc = ctx.localNode(); |
| |
| ctx.database().checkpointReadLock(); |
| |
| try { |
| synchronized (ctx.exchange().interruptLock()) { |
| if (Thread.currentThread().isInterrupted()) |
| throw new IgniteInterruptedCheckedException("Thread is interrupted: " + Thread.currentThread()); |
| |
| U.writeLock(lock); |
| |
| try { |
| if (stopping) |
| return; |
| |
| assert lastTopChangeVer.equals(exchFut.initialVersion()) : "Invalid topology version [topVer=" + lastTopChangeVer + |
| ", exchId=" + exchFut.exchangeId() + ']'; |
| |
| ExchangeDiscoveryEvents evts = exchFut.context().events(); |
| |
| if (affReady) { |
| assert grp.affinity().lastVersion().equals(evts.topologyVersion()) : "Invalid affinity version [" + |
| "grp=" + grp.cacheOrGroupName() + |
| ", affVer=" + grp.affinity().lastVersion() + |
| ", evtsVer=" + evts.topologyVersion() + ']'; |
| |
| lastTopChangeVer = readyTopVer = evts.topologyVersion(); |
| } |
| |
| ClusterNode oldest = discoCache.oldestAliveServerNode(); |
| |
| if (log.isDebugEnabled()) { |
| log.debug("Partition map beforeExchange [grp=" + grp.cacheOrGroupName() + |
| ", exchId=" + exchFut.exchangeId() + ", fullMap=" + fullMapString() + ']'); |
| } |
| |
| long updateSeq = this.updateSeq.incrementAndGet(); |
| |
| cntrMap.clear(); |
| |
| boolean grpStarted = exchFut.cacheGroupAddedOnExchange(grp.groupId(), grp.receivedFrom()); |
| |
| // If this is the oldest node. |
| if (oldest != null && (loc.equals(oldest) || grpStarted)) { |
| if (node2part == null) { |
| node2part = new GridDhtPartitionFullMap(oldest.id(), oldest.order(), updateSeq); |
| |
| if (log.isDebugEnabled()) { |
| log.debug("Created brand new full topology map on oldest node [" + |
| "grp=" + grp.cacheOrGroupName() + ", exchId=" + exchFut.exchangeId() + |
| ", fullMap=" + fullMapString() + ']'); |
| } |
| } |
| else if (!node2part.valid()) { |
| node2part = new GridDhtPartitionFullMap(oldest.id(), |
| oldest.order(), |
| updateSeq, |
| node2part, |
| false); |
| |
| if (log.isDebugEnabled()) { |
| log.debug("Created new full topology map on oldest node [" + |
| "grp=" + grp.cacheOrGroupName() + ", exchId=" + exchFut.exchangeId() + |
| ", fullMap=" + node2part + ']'); |
| } |
| } |
| else if (!node2part.nodeId().equals(loc.id())) { |
| node2part = new GridDhtPartitionFullMap(oldest.id(), |
| oldest.order(), |
| updateSeq, |
| node2part, |
| false); |
| |
| if (log.isDebugEnabled()) { |
| log.debug("Copied old map into new map on oldest node (previous oldest node left) [" + |
| "grp=" + grp.cacheOrGroupName() + ", exchId=" + exchFut.exchangeId() + |
| ", fullMap=" + fullMapString() + ']'); |
| } |
| } |
| } |
| |
| if (evts.hasServerLeft()) { |
| List<DiscoveryEvent> evts0 = evts.events(); |
| |
| for (int i = 0; i < evts0.size(); i++) { |
| DiscoveryEvent evt = evts0.get(i); |
| |
| if (ExchangeDiscoveryEvents.serverLeftEvent(evt)) |
| removeNode(evt.eventNode().id()); |
| } |
| } |
| |
| if (grp.affinityNode()) { |
| if (grpStarted || |
| exchFut.firstEvent().type() == EVT_DISCOVERY_CUSTOM_EVT || |
| exchFut.serverNodeDiscoveryEvent()) { |
| if (affReady) { |
| assert grp.affinity().lastVersion().equals(evts.topologyVersion()); |
| |
| initPartitions0(evts.topologyVersion(), exchFut, updateSeq); |
| } |
| else { |
| assert !exchFut.context().mergeExchanges(); |
| |
| List<List<ClusterNode>> aff = grp.affinity().idealAssignment(); |
| |
| createPartitions(exchFut.initialVersion(), aff, updateSeq); |
| } |
| } |
| } |
| |
| consistencyCheck(); |
| |
| if (updateMoving) { |
| assert grp.affinity().lastVersion().equals(evts.topologyVersion()); |
| |
| createMovingPartitions(grp.affinity().readyAffinity(evts.topologyVersion())); |
| } |
| |
| if (log.isDebugEnabled()) { |
| log.debug("Partition map after beforeExchange [grp=" + grp.cacheOrGroupName() + ", " + |
| "exchId=" + exchFut.exchangeId() + ", fullMap=" + fullMapString() + ']'); |
| } |
| } |
| finally { |
| lock.writeLock().unlock(); |
| } |
| } |
| } |
| finally { |
| ctx.database().checkpointReadUnlock(); |
| } |
| } |
| |
| /** |
| * @param p Partition number. |
| * @param topVer Topology version. |
| * @return {@code True} if given partition belongs to local node. |
| */ |
| private boolean partitionLocalNode(int p, AffinityTopologyVersion topVer) { |
| return grp.affinity().nodes(p, topVer).contains(ctx.localNode()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean afterExchange(GridDhtPartitionsExchangeFuture exchFut) { |
| boolean changed = false; |
| |
| int num = grp.affinity().partitions(); |
| |
| AffinityTopologyVersion topVer = exchFut.context().events().topologyVersion(); |
| |
| assert grp.affinity().lastVersion().equals(topVer) : "Affinity is not initialized " + |
| "[grp=" + grp.cacheOrGroupName() + |
| ", topVer=" + topVer + |
| ", affVer=" + grp.affinity().lastVersion() + |
| ", fut=" + exchFut + ']'; |
| |
| ctx.database().checkpointReadLock(); |
| |
| try { |
| lock.writeLock().lock(); |
| |
| try { |
| if (stopping) |
| return false; |
| |
| assert readyTopVer.initialized() : readyTopVer; |
| assert lastTopChangeVer.equals(readyTopVer); |
| |
| if (log.isDebugEnabled()) { |
| log.debug("Partition map before afterExchange [grp=" + grp.cacheOrGroupName() + |
| ", exchId=" + exchFut.exchangeId() + |
| ", fullMap=" + fullMapString() + ']'); |
| } |
| |
| long updateSeq = this.updateSeq.incrementAndGet(); |
| |
| for (int p = 0; p < num; p++) { |
| GridDhtLocalPartition locPart = localPartition0(p, topVer, false, false, false); |
| |
| if (partitionLocalNode(p, topVer)) { |
| // This partition will be created during next topology event, |
| // which obviously has not happened at this point. |
| if (locPart == null) { |
| if (log.isDebugEnabled()) { |
| log.debug("Skipping local partition afterExchange (will not create) [" + |
| "grp=" + grp.cacheOrGroupName() + ", p=" + p + ']'); |
| } |
| |
| continue; |
| } |
| |
| GridDhtPartitionState state = locPart.state(); |
| |
| if (state == MOVING) { |
| if (grp.rebalanceEnabled()) { |
| Collection<ClusterNode> owners = owners(p); |
| |
| // If there are no other owners, then become an owner. |
| if (F.isEmpty(owners)) { |
| boolean owned = locPart.own(); |
| |
| assert owned : "Failed to own partition [grp=" + grp.cacheOrGroupName() + |
| ", locPart=" + locPart + ']'; |
| |
| updateSeq = updateLocal(p, locPart.state(), updateSeq, topVer); |
| |
| changed = true; |
| |
| if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) { |
| DiscoveryEvent discoEvt = exchFut.events().lastEvent(); |
| |
| grp.addRebalanceEvent(p, |
| EVT_CACHE_REBALANCE_PART_DATA_LOST, |
| discoEvt.eventNode(), |
| discoEvt.type(), |
| discoEvt.timestamp()); |
| } |
| |
| if (log.isDebugEnabled()) { |
| log.debug("Owned partition [grp=" + grp.cacheOrGroupName() + |
| ", part=" + locPart + ']'); |
| } |
| } |
| else if (log.isDebugEnabled()) |
| log.debug("Will not own partition (there are owners to rebalance from) [grp=" + grp.cacheOrGroupName() + |
| ", locPart=" + locPart + ", owners = " + owners + ']'); |
| } |
| else |
| updateSeq = updateLocal(p, locPart.state(), updateSeq, topVer); |
| } |
| } |
| else { |
| if (locPart != null) { |
| GridDhtPartitionState state = locPart.state(); |
| |
| if (state == MOVING) { |
| locPart.rent(false); |
| |
| updateSeq = updateLocal(p, locPart.state(), updateSeq, topVer); |
| |
| changed = true; |
| |
| if (log.isDebugEnabled()) { |
| log.debug("Evicting moving partition (it does not belong to affinity) [" + |
| "grp=" + grp.cacheOrGroupName() + ", part=" + locPart + ']'); |
| } |
| } |
| } |
| } |
| } |
| |
| AffinityAssignment aff = grp.affinity().readyAffinity(topVer); |
| |
| if (node2part != null && node2part.valid()) |
| changed |= checkEvictions(updateSeq, aff); |
| |
| updateRebalanceVersion(aff.assignment()); |
| |
| consistencyCheck(); |
| } |
| finally { |
| lock.writeLock().unlock(); |
| } |
| } |
| finally { |
| ctx.database().checkpointReadUnlock(); |
| } |
| |
| return changed; |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public GridDhtLocalPartition localPartition(int p, AffinityTopologyVersion topVer, |
| boolean create) |
| throws GridDhtInvalidPartitionException { |
| return localPartition0(p, topVer, create, false, true); |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public GridDhtLocalPartition localPartition(int p, AffinityTopologyVersion topVer, |
| boolean create, boolean showRenting) throws GridDhtInvalidPartitionException { |
| return localPartition0(p, topVer, create, showRenting, true); |
| } |
| |
| /** |
| * @param p Partition number. |
| * @return Partition. |
| */ |
| private GridDhtLocalPartition createPartition(int p) { |
| assert lock.isWriteLockedByCurrentThread(); |
| |
| assert ctx.database().checkpointLockIsHeldByThread(); |
| |
| GridDhtLocalPartition loc = locParts.get(p); |
| |
| if (loc == null || loc.state() == EVICTED) { |
| if (loc != null) { |
| try { |
| loc.rent(false).get(); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p)); |
| |
| long updCntr = cntrMap.updateCounter(p); |
| |
| if (updCntr != 0) |
| loc.updateCounter(updCntr); |
| |
| if (ctx.pageStore() != null) { |
| try { |
| ctx.pageStore().onPartitionCreated(grp.groupId(), p); |
| } |
| catch (IgniteCheckedException e) { |
| // TODO ignite-db |
| throw new IgniteException(e); |
| } |
| } |
| } |
| |
| return loc; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridDhtLocalPartition forceCreatePartition(int p) throws IgniteCheckedException { |
| lock.writeLock().lock(); |
| |
| try { |
| GridDhtLocalPartition part = locParts.get(p); |
| |
| if (part != null && part.state() != EVICTED) |
| return part; |
| |
| part = new GridDhtLocalPartition(ctx, grp, p); |
| |
| locParts.set(p, part); |
| |
| ctx.pageStore().onPartitionCreated(grp.groupId(), p); |
| |
| return part; |
| } |
| finally { |
| lock.writeLock().unlock(); |
| } |
| } |
| |
| /** |
| * @param p Partition number. |
| * @param topVer Topology version. |
| * @param create Create flag. |
| * @param showRenting {@code True} if partitions in {@code RENTING} state should also be returned. |
| * @param updateSeq If {@code true}, the update sequence will be incremented on partition creation. |
| * @return Local partition. |
| */ |
| @SuppressWarnings("TooBroadScope") |
| private GridDhtLocalPartition localPartition0(int p, |
| AffinityTopologyVersion topVer, |
| boolean create, |
| boolean showRenting, |
| boolean updateSeq) { |
| GridDhtLocalPartition loc; |
| |
| loc = locParts.get(p); |
| |
| GridDhtPartitionState state = loc != null ? loc.state() : null; |
| |
| if (loc != null && state != EVICTED && (state != RENTING || showRenting)) |
| return loc; |
| |
| if (!create) |
| return null; |
| |
| boolean created = false; |
| |
| ctx.database().checkpointReadLock(); |
| |
| try { |
| lock.writeLock().lock(); |
| |
| try { |
| loc = locParts.get(p); |
| |
| state = loc != null ? loc.state() : null; |
| |
| boolean belongs = partitionLocalNode(p, topVer); |
| |
| if (loc != null && state == EVICTED) { |
| try { |
| loc.rent(false).get(); |
| } |
| catch (IgniteCheckedException ex) { |
| throw new IgniteException(ex); |
| } |
| |
| locParts.set(p, loc = null); |
| |
| if (!belongs) { |
| throw new GridDhtInvalidPartitionException(p, "Adding entry to evicted partition " + |
| "(often may be caused by inconsistent 'key.hashCode()' implementation) " + |
| "[grp=" + grp.cacheOrGroupName() + ", part=" + p + ", topVer=" + topVer + |
| ", this.topVer=" + this.readyTopVer + ']'); |
| } |
| } |
| else if (loc != null && state == RENTING && !showRenting) { |
| throw new GridDhtInvalidPartitionException(p, "Adding entry to partition that is concurrently " + |
| "evicted [grp=" + grp.cacheOrGroupName() + ", part=" + p + ", shouldBeMoving=" + |
| loc.reload() + ", belongs=" + belongs + ", topVer=" + topVer + ", curTopVer=" + this.readyTopVer + "]"); |
| } |
| |
| if (loc == null) { |
| if (!belongs) |
| throw new GridDhtInvalidPartitionException(p, "Creating partition which does not belong to " + |
| "local node (often may be caused by inconsistent 'key.hashCode()' implementation) " + |
| "[grp=" + grp.cacheOrGroupName() + ", part=" + p + ", topVer=" + topVer + |
| ", this.topVer=" + this.readyTopVer + ']'); |
| |
| locParts.set(p, loc = new GridDhtLocalPartition(ctx, grp, p)); |
| |
| if (updateSeq) |
| this.updateSeq.incrementAndGet(); |
| |
| created = true; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Created local partition [grp=" + grp.cacheOrGroupName() + ", part=" + loc + ']'); |
| } |
| } |
| finally { |
| lock.writeLock().unlock(); |
| } |
| } |
| finally { |
| ctx.database().checkpointReadUnlock(); |
| } |
| |
| if (created && ctx.pageStore() != null) { |
| try { |
| ctx.pageStore().onPartitionCreated(grp.groupId(), p); |
| } |
| catch (IgniteCheckedException e) { |
| // TODO ignite-db |
| throw new IgniteException(e); |
| } |
| } |
| |
| return loc; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void releasePartitions(int... parts) { |
| assert parts != null; |
| assert parts.length > 0; |
| |
| for (int i = 0; i < parts.length; i++) { |
| GridDhtLocalPartition part = locParts.get(parts[i]); |
| |
| if (part != null) |
| part.release(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridDhtLocalPartition localPartition(int part) { |
| return locParts.get(part); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public List<GridDhtLocalPartition> localPartitions() { |
| List<GridDhtLocalPartition> list = new ArrayList<>(locParts.length()); |
| |
| for (int i = 0; i < locParts.length(); i++) { |
| GridDhtLocalPartition part = locParts.get(i); |
| |
| if (part != null && part.state().active()) |
| list.add(part); |
| } |
| |
| return list; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Iterable<GridDhtLocalPartition> currentLocalPartitions() { |
| return new Iterable<GridDhtLocalPartition>() { |
| @Override public Iterator<GridDhtLocalPartition> iterator() { |
| return new CurrentPartitionsIterator(); |
| } |
| }; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onRemoved(GridDhtCacheEntry e) { |
| /* |
| * Make sure not to acquire any locks here as this method |
| * may be called from sensitive synchronization blocks. |
| * =================================================== |
| */ |
| |
| GridDhtLocalPartition loc = localPartition(e.partition(), readyTopVer, false); |
| |
| if (loc != null) |
| loc.onRemoved(e); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridDhtPartitionMap localPartitionMap() { |
| GridPartitionStateMap map = new GridPartitionStateMap(locParts.length()); |
| |
| lock.readLock().lock(); |
| |
| try { |
| for (int i = 0; i < locParts.length(); i++) { |
| GridDhtLocalPartition part = locParts.get(i); |
| |
| if (part == null) |
| continue; |
| |
| map.put(i, part.state()); |
| } |
| |
| return new GridDhtPartitionMap(ctx.localNodeId(), |
| updateSeq.get(), |
| readyTopVer, |
| map, |
| true); |
| } |
| finally { |
| lock.readLock().unlock(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridDhtPartitionState partitionState(UUID nodeId, int part) { |
| lock.readLock().lock(); |
| |
| try { |
| GridDhtPartitionMap partMap = node2part.get(nodeId); |
| |
| if (partMap != null) { |
| GridDhtPartitionState state = partMap.get(part); |
| |
| return state == null ? EVICTED : state; |
| } |
| |
| return EVICTED; |
| } |
| finally { |
| lock.readLock().unlock(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public List<ClusterNode> nodes(int p, |
| AffinityAssignment affAssignment, |
| List<ClusterNode> affNodes) { |
| return nodes0(p, affAssignment, affNodes); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public List<ClusterNode> nodes(int p, AffinityTopologyVersion topVer) { |
| AffinityAssignment affAssignment = grp.affinity().cachedAffinity(topVer); |
| |
| List<ClusterNode> affNodes = affAssignment.get(p); |
| |
| List<ClusterNode> nodes = nodes0(p, affAssignment, affNodes); |
| |
| return nodes != null ? nodes : affNodes; |
| } |
| |
| /** |
| * @param p Partition. |
| * @param affAssignment Assignments. |
| * @param affNodes Nodes assigned to the given partition by affinity. |
| * @return Nodes responsible for given partition (primary is first). |
| */ |
| @Nullable private List<ClusterNode> nodes0(int p, AffinityAssignment affAssignment, List<ClusterNode> affNodes) { |
| if (grp.isReplicated()) |
| return affNodes; |
| |
| AffinityTopologyVersion topVer = affAssignment.topologyVersion(); |
| |
| lock.readLock().lock(); |
| |
| try { |
| assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer1=" + topVer + |
| ", topVer2=" + this.readyTopVer + |
| ", node=" + ctx.igniteInstanceName() + |
| ", grp=" + grp.cacheOrGroupName() + |
| ", node2part=" + node2part + ']'; |
| |
| List<ClusterNode> nodes = null; |
| |
| if (!topVer.equals(diffFromAffinityVer)) { |
| LT.warn(log, "Requested topology version does not match calculated diff, will require full iteration to " + |
| "calculate mapping [grp=" + grp.cacheOrGroupName() + ", topVer=" + topVer + |
| ", diffVer=" + diffFromAffinityVer + "]"); |
| |
| nodes = new ArrayList<>(); |
| |
| nodes.addAll(affNodes); |
| |
| for (Map.Entry<UUID, GridDhtPartitionMap> entry : node2part.entrySet()) { |
| GridDhtPartitionState state = entry.getValue().get(p); |
| |
| ClusterNode n = ctx.discovery().node(entry.getKey()); |
| |
| if (n != null && state != null && (state == MOVING || state == OWNING || state == RENTING) |
| && !nodes.contains(n) && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) { |
| nodes.add(n); |
| } |
| |
| } |
| |
| return nodes; |
| } |
| |
| Collection<UUID> diffIds = diffFromAffinity.get(p); |
| |
| if (!F.isEmpty(diffIds)) { |
| HashSet<UUID> affIds = affAssignment.getIds(p); |
| |
| for (UUID nodeId : diffIds) { |
| if (affIds.contains(nodeId)) { |
| U.warn(log, "Node from diff is affinity node, skipping it [grp=" + grp.cacheOrGroupName() + |
| ", node=" + nodeId + ']'); |
| |
| continue; |
| } |
| |
| if (hasState(p, nodeId, OWNING, MOVING, RENTING)) { |
| ClusterNode n = ctx.discovery().node(nodeId); |
| |
| if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) { |
| if (nodes == null) { |
| nodes = new ArrayList<>(affNodes.size() + diffIds.size()); |
| |
| nodes.addAll(affNodes); |
| } |
| |
| nodes.add(n); |
| } |
| } |
| } |
| } |
| |
| return nodes; |
| } |
| finally { |
| lock.readLock().unlock(); |
| } |
| } |
| |
| /** |
| * @param p Partition. |
| * @param topVer Topology version ({@code -1} for all nodes). |
| * @param state Partition state. |
| * @param states Additional partition states. |
| * @return List of nodes for the partition. |
| */ |
| private List<ClusterNode> nodes( |
| int p, |
| AffinityTopologyVersion topVer, |
| GridDhtPartitionState state, |
| GridDhtPartitionState... states |
| ) { |
| Collection<UUID> allIds = F.nodeIds(discoCache.cacheGroupAffinityNodes(grp.groupId())); |
| |
| lock.readLock().lock(); |
| |
| try { |
| assert node2part != null && node2part.valid() : "Invalid node-to-partitions map [topVer=" + topVer + |
| ", grp=" + grp.cacheOrGroupName() + |
| ", allIds=" + allIds + |
| ", node2part=" + node2part + ']'; |
| |
| // Node IDs can be null if both primary and backup nodes disappear. |
| List<ClusterNode> nodes = new ArrayList<>(); |
| |
| for (UUID id : allIds) { |
| if (hasState(p, id, state, states)) { |
| ClusterNode n = ctx.discovery().node(id); |
| |
| if (n != null && (topVer.topologyVersion() < 0 || n.order() <= topVer.topologyVersion())) |
| nodes.add(n); |
| } |
| } |
| |
| return nodes; |
| } |
| finally { |
| lock.readLock().unlock(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public List<ClusterNode> owners(int p, AffinityTopologyVersion topVer) { |
| if (!grp.rebalanceEnabled()) |
| return ownersAndMoving(p, topVer); |
| |
| return nodes(p, topVer, OWNING, null); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public List<ClusterNode> owners(int p) { |
| return owners(p, AffinityTopologyVersion.NONE); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public List<ClusterNode> moving(int p) { |
| if (!grp.rebalanceEnabled()) |
| return ownersAndMoving(p, AffinityTopologyVersion.NONE); |
| |
| return nodes(p, AffinityTopologyVersion.NONE, MOVING, null); |
| } |
| |
| /** |
| * @param p Partition. |
| * @param topVer Topology version. |
| * @return List of nodes in state OWNING or MOVING. |
| */ |
| private List<ClusterNode> ownersAndMoving(int p, AffinityTopologyVersion topVer) { |
| return nodes(p, topVer, OWNING, MOVING_STATES); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long updateSequence() { |
| return updateSeq.get(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridDhtPartitionFullMap partitionMap(boolean onlyActive) { |
| lock.readLock().lock(); |
| |
| try { |
| if (node2part == null || stopping) |
| return null; |
| |
| assert node2part.valid() : "Invalid node2part [node2part=" + node2part + |
| ", grp=" + grp.cacheOrGroupName() + |
| ", stopping=" + stopping + |
| ", locNodeId=" + ctx.localNode().id() + |
| ", locName=" + ctx.igniteInstanceName() + ']'; |
| |
| GridDhtPartitionFullMap m = node2part; |
| |
| return new GridDhtPartitionFullMap(m.nodeId(), m.nodeOrder(), m.updateSequence(), m, onlyActive); |
| } |
| finally { |
| lock.readLock().unlock(); |
| } |
| } |
| |
| /** |
| * Checks whether the current partition map should be overwritten by the new partition map. |
| * The new map wins if it has a greater topology version, or the same topology version and a greater update sequence. |
| * |
| * @param currentMap Current partition map. |
| * @param newMap New partition map. |
| * @return {@code True} if the current partition map should be overwritten by the new partition map, {@code false} otherwise. |
| */ |
| private boolean shouldOverridePartitionMap(GridDhtPartitionMap currentMap, GridDhtPartitionMap newMap) { |
| return newMap != null && |
| (newMap.topologyVersion().compareTo(currentMap.topologyVersion()) > 0 || |
| newMap.topologyVersion().compareTo(currentMap.topologyVersion()) == 0 && newMap.updateSequence() > currentMap.updateSequence()); |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) |
| @Override public boolean update( |
| @Nullable AffinityTopologyVersion exchangeVer, |
| GridDhtPartitionFullMap partMap, |
| @Nullable CachePartitionFullCountersMap incomeCntrMap, |
| Set<Integer> partsToReload, |
| @Nullable AffinityTopologyVersion msgTopVer) { |
| if (log.isDebugEnabled()) { |
| log.debug("Updating full partition map [grp=" + grp.cacheOrGroupName() + ", exchVer=" + exchangeVer + |
| ", fullMap=" + fullMapString() + ']'); |
| } |
| |
| assert partMap != null; |
| |
| ctx.database().checkpointReadLock(); |
| |
| try { |
| lock.writeLock().lock(); |
| |
| try { |
| if (stopping || !lastTopChangeVer.initialized() || |
| // Ignore a message not related to exchange if an exchange is in progress. |
| (exchangeVer == null && !lastTopChangeVer.equals(readyTopVer))) |
| return false; |
| |
| if (incomeCntrMap != null) { |
| // Update local counters in partitions. |
| for (int i = 0; i < locParts.length(); i++) { |
| GridDhtLocalPartition part = locParts.get(i); |
| |
| if (part == null) |
| continue; |
| |
| if (part.state() == OWNING || part.state() == MOVING) { |
| long updCntr = incomeCntrMap.updateCounter(part.id()); |
| |
| if (updCntr != 0 && updCntr > part.updateCounter()) |
| part.updateCounter(updCntr); |
| } |
| } |
| } |
| |
| if (exchangeVer != null) { |
| // Ignore if exchange already finished or new exchange started. |
| if (readyTopVer.compareTo(exchangeVer) > 0 || lastTopChangeVer.compareTo(exchangeVer) > 0) { |
| U.warn(log, "Stale exchange id for full partition map update (will ignore) [" + |
| "grp=" + grp.cacheOrGroupName() + |
| ", lastTopChange=" + lastTopChangeVer + |
| ", readTopVer=" + readyTopVer + |
| ", exchVer=" + exchangeVer + ']'); |
| |
| return false; |
| } |
| } |
| |
| if (msgTopVer != null && lastTopChangeVer.compareTo(msgTopVer) > 0) { |
| U.warn(log, "Stale version for full partition map update message (will ignore) [" + |
| "grp=" + grp.cacheOrGroupName() + |
| ", lastTopChange=" + lastTopChangeVer + |
| ", readTopVer=" + readyTopVer + |
| ", msgVer=" + msgTopVer + ']'); |
| |
| return false; |
| } |
| |
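| // Merge the incoming full map with the current one: a per-node map is overridden only if the incoming one is newer. |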
| boolean fullMapUpdated = (node2part == null); |
| |
| if (node2part != null) { |
| for (GridDhtPartitionMap part : node2part.values()) { |
| GridDhtPartitionMap newPart = partMap.get(part.nodeId()); |
| |
| if (shouldOverridePartitionMap(part, newPart)) { |
| fullMapUpdated = true; |
| |
| if (log.isDebugEnabled()) { |
| log.debug("Overriding partition map in full update map [" + |
| "grp=" + grp.cacheOrGroupName() + |
| ", exchVer=" + exchangeVer + |
| ", curPart=" + mapString(part) + |
| ", newPart=" + mapString(newPart) + ']'); |
| } |
| |
| if (newPart.nodeId().equals(ctx.localNodeId())) |
| updateSeq.setIfGreater(newPart.updateSequence()); |
| } |
| else { |
| // If the current map for some node is newer, |
| // keep the newer value. |
| partMap.put(part.nodeId(), part); |
| } |
| } |
| |
| // Check that we have new nodes. |
| for (GridDhtPartitionMap part : partMap.values()) { |
| if (fullMapUpdated) |
| break; |
| |
| fullMapUpdated = !node2part.containsKey(part.nodeId()); |
| } |
| |
| // Remove entry if node left. |
| for (Iterator<UUID> it = partMap.keySet().iterator(); it.hasNext(); ) { |
| UUID nodeId = it.next(); |
| |
| if (!ctx.discovery().alive(nodeId)) { |
| if (log.isDebugEnabled()) |
| log.debug("Removing left node from full map update [grp=" + grp.cacheOrGroupName() + |
| ", nodeId=" + nodeId + ", partMap=" + partMap + ']'); |
| |
| it.remove(); |
| } |
| } |
| } |
| else { |
| GridDhtPartitionMap locNodeMap = partMap.get(ctx.localNodeId()); |
| |
| if (locNodeMap != null) |
| updateSeq.setIfGreater(locNodeMap.updateSequence()); |
| } |
| |
| if (!fullMapUpdated) { |
| if (log.isDebugEnabled()) { |
| log.debug("No updates for full partition map (will ignore) [" + |
| "grp=" + grp.cacheOrGroupName() + |
| ", lastExch=" + lastTopChangeVer + |
| ", exchVer=" + exchangeVer + |
| ", curMap=" + node2part + |
| ", newMap=" + partMap + ']'); |
| } |
| |
| return false; |
| } |
| |
| if (exchangeVer != null) { |
| assert exchangeVer.compareTo(readyTopVer) >= 0 && exchangeVer.compareTo(lastTopChangeVer) >= 0; |
| |
| lastTopChangeVer = readyTopVer = exchangeVer; |
| } |
| |
| node2part = partMap; |
| |
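| // Outside of exchange (exchangeVer == null) recalculate the per-partition diff from the ready affinity assignment. |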
| if (exchangeVer == null && !grp.isReplicated() && |
| (readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0)) { |
| AffinityAssignment affAssignment = grp.affinity().readyAffinity(readyTopVer); |
| |
| for (Map.Entry<UUID, GridDhtPartitionMap> e : partMap.entrySet()) { |
| for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) { |
| int p = e0.getKey(); |
| |
| Set<UUID> diffIds = diffFromAffinity.get(p); |
| |
| if ((e0.getValue() == MOVING || e0.getValue() == OWNING || e0.getValue() == RENTING) && |
| !affAssignment.getIds(p).contains(e.getKey())) { |
| |
| if (diffIds == null) |
| diffFromAffinity.put(p, diffIds = U.newHashSet(3)); |
| |
| diffIds.add(e.getKey()); |
| } |
| else { |
| if (diffIds != null && diffIds.remove(e.getKey())) { |
| if (diffIds.isEmpty()) |
| diffFromAffinity.remove(p); |
| } |
| } |
| } |
| } |
| |
| diffFromAffinityVer = readyTopVer; |
| } |
| |
| boolean changed = false; |
| |
| GridDhtPartitionMap nodeMap = partMap.get(ctx.localNodeId()); |
| |
| if (nodeMap != null && grp.persistenceEnabled() && readyTopVer.initialized()) { |
| for (Map.Entry<Integer, GridDhtPartitionState> e : nodeMap.entrySet()) { |
| int p = e.getKey(); |
| GridDhtPartitionState state = e.getValue(); |
| |
| if (state == OWNING) { |
| GridDhtLocalPartition locPart = locParts.get(p); |
| |
| assert locPart != null : grp.cacheOrGroupName(); |
| |
| if (locPart.state() == MOVING) { |
| boolean success = locPart.own(); |
| |
| assert success : locPart; |
| |
| changed |= success; |
| } |
| } |
| else if (state == MOVING) { |
| GridDhtLocalPartition locPart = locParts.get(p); |
| |
| if (locPart == null || locPart.state() == EVICTED) |
| locPart = createPartition(p); |
| |
| if (locPart.state() == OWNING) { |
| locPart.moving(); |
| |
| changed = true; |
| } |
| } |
| else if (state == RENTING && partsToReload.contains(p)) { |
| GridDhtLocalPartition locPart = locParts.get(p); |
| |
| if (locPart == null || locPart.state() == EVICTED) { |
| createPartition(p); |
| |
| changed = true; |
| } |
| else if (locPart.state() == OWNING || locPart.state() == MOVING) { |
| locPart.reload(true); |
| |
| locPart.rent(false); |
| |
| changed = true; |
| } |
| else |
| locPart.reload(true); |
| } |
| } |
| } |
| |
| long updateSeq = this.updateSeq.incrementAndGet(); |
| |
| if (readyTopVer.initialized() && readyTopVer.equals(lastTopChangeVer)) { |
| AffinityAssignment aff = grp.affinity().readyAffinity(readyTopVer); |
| |
| if (exchangeVer == null) |
| changed |= checkEvictions(updateSeq, aff); |
| |
| updateRebalanceVersion(aff.assignment()); |
| } |
| |
| consistencyCheck(); |
| |
| if (log.isDebugEnabled()) { |
| log.debug("Partition map after full update [grp=" + grp.cacheOrGroupName() + |
| ", map=" + fullMapString() + ']'); |
| } |
| |
| if (changed) |
| ctx.exchange().scheduleResendPartitions(); |
| |
| return changed; |
| } |
| finally { |
| lock.writeLock().unlock(); |
| } |
| } |
| finally { |
| ctx.database().checkpointReadUnlock(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void collectUpdateCounters(CachePartitionPartialCountersMap cntrMap) { |
| assert cntrMap != null; |
| |
| long now = U.currentTimeMillis(); |
| |
| lock.writeLock().lock(); |
| |
| try { |
| long acquired = U.currentTimeMillis(); |
| |
| if (acquired - now >= 100) { |
| if (timeLog.isInfoEnabled()) |
| timeLog.info("Waited too long to acquire topology write lock " + |
| "[grp=" + grp.cacheOrGroupName() + ", waitTime=" + (acquired - now) + ']'); |
| } |
| |
| if (stopping) |
| return; |
| |
| for (int i = 0; i < cntrMap.size(); i++) { |
| int pId = cntrMap.partitionAt(i); |
| |
| long initialUpdateCntr = cntrMap.initialUpdateCounterAt(i); |
| long updateCntr = cntrMap.updateCounterAt(i); |
| |
| if (this.cntrMap.updateCounter(pId) < updateCntr) { |
| this.cntrMap.initialUpdateCounter(pId, initialUpdateCntr); |
| this.cntrMap.updateCounter(pId, updateCntr); |
| } |
| } |
| } |
| finally { |
| lock.writeLock().unlock(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void applyUpdateCounters() { |
| long now = U.currentTimeMillis(); |
| |
| lock.writeLock().lock(); |
| |
| try { |
| long acquired = U.currentTimeMillis(); |
| |
| if (acquired - now >= 100) { |
| if (timeLog.isInfoEnabled()) |
| timeLog.info("Waited too long to acquire topology write lock " + |
| "[grp=" + grp.cacheOrGroupName() + ", waitTime=" + (acquired - now) + ']'); |
| } |
| |
| if (stopping) |
| return; |
| |
| for (int i = 0; i < locParts.length(); i++) { |
| GridDhtLocalPartition part = locParts.get(i); |
| |
| if (part == null) |
| continue; |
| |
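| // Reserve the partition so it is not evicted while its counters are being updated. |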
| boolean reserve = part.reserve(); |
| |
| try { |
| GridDhtPartitionState state = part.state(); |
| |
| if (!reserve || state == EVICTED || state == RENTING) |
| continue; |
| |
| long updCntr = cntrMap.updateCounter(part.id()); |
| |
| if (updCntr > part.updateCounter()) |
| part.updateCounter(updCntr); |
| else if (part.updateCounter() > 0) { |
| cntrMap.initialUpdateCounter(part.id(), part.initialUpdateCounter()); |
| cntrMap.updateCounter(part.id(), part.updateCounter()); |
| } |
| } |
| finally { |
| if (reserve) |
| part.release(); |
| } |
| } |
| } |
| finally { |
| lock.writeLock().unlock(); |
| } |
| } |
| |
| /** |
| * Checks whether the new partition map is staler than the current partition map. |
| * The new map is stale if its topology version is less than that of the current map, or the versions are equal and its update sequence is not greater. |
| * |
| * @param currentMap Current partition map. |
| * @param newMap New partition map. |
| * @return {@code True} if the new partition map is staler than the current partition map, {@code false} otherwise. |
| */ |
| private boolean isStaleUpdate(GridDhtPartitionMap currentMap, GridDhtPartitionMap newMap) { |
| return currentMap != null && |
| (newMap.topologyVersion().compareTo(currentMap.topologyVersion()) < 0 || |
| newMap.topologyVersion().compareTo(currentMap.topologyVersion()) == 0 && newMap.updateSequence() <= currentMap.updateSequence()); |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) |
| @Override public boolean update( |
| @Nullable GridDhtPartitionExchangeId exchId, |
| GridDhtPartitionMap parts, |
| boolean force |
| ) { |
| if (log.isDebugEnabled()) { |
| log.debug("Updating single partition map [grp=" + grp.cacheOrGroupName() + ", exchId=" + exchId + |
| ", parts=" + mapString(parts) + ']'); |
| } |
| |
| if (!ctx.discovery().alive(parts.nodeId())) { |
| if (log.isDebugEnabled()) { |
| log.debug("Received partition update for non-existing node (will ignore) [grp=" + grp.cacheOrGroupName() + |
| ", exchId=" + exchId + ", parts=" + parts + ']'); |
| } |
| |
| return false; |
| } |
| |
| ctx.database().checkpointReadLock(); |
| |
| try { |
| lock.writeLock().lock(); |
| |
| try { |
| if (stopping) |
| return false; |
| |
| if (!force) { |
| if (lastTopChangeVer.initialized() && exchId != null && lastTopChangeVer.compareTo(exchId.topologyVersion()) > 0) { |
| U.warn(log, "Stale exchange id for single partition map update (will ignore) [" + |
| "grp=" + grp.cacheOrGroupName() + |
| ", lastTopChange=" + lastTopChangeVer + |
| ", readTopVer=" + readyTopVer + |
| ", exch=" + exchId.topologyVersion() + ']'); |
| |
| return false; |
| } |
| } |
| |
| if (node2part == null) |
| // Create invalid partition map. |
| node2part = new GridDhtPartitionFullMap(); |
| |
| GridDhtPartitionMap cur = node2part.get(parts.nodeId()); |
| |
| if (force) { |
| if (cur != null && cur.topologyVersion().initialized()) |
| parts.updateSequence(cur.updateSequence(), cur.topologyVersion()); |
| } |
| else if (isStaleUpdate(cur, parts)) { |
| U.warn(log, "Stale update for single partition map update (will ignore) [" + |
| "grp=" + grp.cacheOrGroupName() + |
| ", exchId=" + exchId + |
| ", curMap=" + cur + |
| ", newMap=" + parts + ']'); |
| |
| return false; |
| } |
| |
| long updateSeq = this.updateSeq.incrementAndGet(); |
| |
| node2part.newUpdateSequence(updateSeq); |
| |
| boolean changed = false; |
| |
| if (cur == null || !cur.equals(parts)) |
| changed = true; |
| |
| node2part.put(parts.nodeId(), parts); |
| |
| // During exchange, the diff is calculated after all messages are received and affinity is initialized. |
| if (exchId == null && !grp.isReplicated()) { |
| if (readyTopVer.initialized() && readyTopVer.compareTo(diffFromAffinityVer) >= 0) { |
| AffinityAssignment affAssignment = grp.affinity().readyAffinity(readyTopVer); |
| |
| // Add new mappings. |
| for (Map.Entry<Integer, GridDhtPartitionState> e : parts.entrySet()) { |
| int p = e.getKey(); |
| |
| Set<UUID> diffIds = diffFromAffinity.get(p); |
| |
| if ((e.getValue() == MOVING || e.getValue() == OWNING || e.getValue() == RENTING) |
| && !affAssignment.getIds(p).contains(parts.nodeId())) { |
| if (diffIds == null) |
| diffFromAffinity.put(p, diffIds = U.newHashSet(3)); |
| |
| if (diffIds.add(parts.nodeId())) |
| changed = true; |
| } |
| else { |
| if (diffIds != null && diffIds.remove(parts.nodeId())) { |
| changed = true; |
| |
| if (diffIds.isEmpty()) |
| diffFromAffinity.remove(p); |
| } |
| } |
| } |
| |
| // Remove obsolete mappings. |
| if (cur != null) { |
| for (Integer p : F.view(cur.keySet(), F0.notIn(parts.keySet()))) { |
| Set<UUID> ids = diffFromAffinity.get(p); |
| |
| if (ids != null && ids.remove(parts.nodeId())) { |
| changed = true; |
| |
| if (ids.isEmpty()) |
| diffFromAffinity.remove(p); |
| } |
| } |
| } |
| |
| diffFromAffinityVer = readyTopVer; |
| } |
| } |
| |
| if (readyTopVer.initialized() && readyTopVer.equals(lastTopChangeVer)) { |
| AffinityAssignment aff = grp.affinity().readyAffinity(readyTopVer); |
| |
| if (exchId == null) |
| changed |= checkEvictions(updateSeq, aff); |
| |
| updateRebalanceVersion(aff.assignment()); |
| } |
| |
| consistencyCheck(); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Partition map after single update [grp=" + grp.cacheOrGroupName() + ", map=" + fullMapString() + ']'); |
| |
| if (changed && exchId == null) |
| ctx.exchange().scheduleResendPartitions(); |
| |
| return changed; |
| } |
| finally { |
| lock.writeLock().unlock(); |
| } |
| } |
| finally { |
| ctx.database().checkpointReadUnlock(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onExchangeDone(GridDhtPartitionsExchangeFuture fut, AffinityAssignment assignment, boolean updateRebalanceVer) { |
| lock.writeLock().lock(); |
| |
| try { |
| assert !(topReadyFut instanceof GridDhtPartitionsExchangeFuture) || |
| assignment.topologyVersion().equals(((GridDhtPartitionsExchangeFuture)topReadyFut).context().events().topologyVersion()); |
| |
| readyTopVer = lastTopChangeVer = assignment.topologyVersion(); |
| |
| if (!grp.isReplicated()) { |
| boolean rebuildDiff = fut == null || fut.localJoinExchange() || fut.serverNodeDiscoveryEvent() || |
| fut.firstEvent().type() == EVT_DISCOVERY_CUSTOM_EVT || !diffFromAffinityVer.initialized(); |
| |
| if (rebuildDiff) { |
| if (assignment.topologyVersion().compareTo(diffFromAffinityVer) >= 0) |
| rebuildDiff(assignment); |
| } |
| else |
| diffFromAffinityVer = readyTopVer; |
| } |
| |
| if (updateRebalanceVer) |
| updateRebalanceVersion(assignment.assignment()); |
| } |
| finally { |
| lock.writeLock().unlock(); |
| } |
| } |
| |
| /** |
| * @param aff Affinity. |
| */ |
| private void createMovingPartitions(AffinityAssignment aff) { |
| for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) { |
| GridDhtPartitionMap map = e.getValue(); |
| |
| addMoving(map, aff.backupPartitions(e.getKey())); |
| addMoving(map, aff.primaryPartitions(e.getKey())); |
| } |
| } |
| |
| /** |
| * @param map Node partition state map. |
| * @param parts Partitions assigned to node. |
| */ |
| private void addMoving(GridDhtPartitionMap map, Set<Integer> parts) { |
| if (F.isEmpty(parts)) |
| return; |
| |
| for (Integer p : parts) { |
| GridDhtPartitionState state = map.get(p); |
| |
| if (state == null || state == EVICTED) |
| map.put(p, MOVING); |
| } |
| } |
| |
| /** |
| * @param affAssignment New affinity assignment. |
| */ |
| private void rebuildDiff(AffinityAssignment affAssignment) { |
| assert lock.isWriteLockedByCurrentThread(); |
| |
| if (node2part == null) |
| return; |
| |
| if (FAST_DIFF_REBUILD) { |
| Collection<UUID> affNodes = F.nodeIds(ctx.discovery().cacheGroupAffinityNodes(grp.groupId(), |
| affAssignment.topologyVersion())); |
| |
| for (Map.Entry<Integer, Set<UUID>> e : diffFromAffinity.entrySet()) { |
| int p = e.getKey(); |
| |
| Iterator<UUID> iter = e.getValue().iterator(); |
| |
| while (iter.hasNext()) { |
| UUID nodeId = iter.next(); |
| |
| if (!affNodes.contains(nodeId) || affAssignment.getIds(p).contains(nodeId)) |
| iter.remove(); |
| } |
| } |
| } |
| else { |
| for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) { |
| UUID nodeId = e.getKey(); |
| |
| for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) { |
| Integer p0 = e0.getKey(); |
| |
| GridDhtPartitionState state = e0.getValue(); |
| |
| Set<UUID> ids = diffFromAffinity.get(p0); |
| |
| if ((state == MOVING || state == OWNING || state == RENTING) && !affAssignment.getIds(p0).contains(nodeId)) { |
| if (ids == null) |
| diffFromAffinity.put(p0, ids = U.newHashSet(3)); |
| |
| ids.add(nodeId); |
| } |
| else { |
| if (ids != null) |
| ids.remove(nodeId); |
| } |
| } |
| } |
| } |
| |
| diffFromAffinityVer = affAssignment.topologyVersion(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean detectLostPartitions(AffinityTopologyVersion resTopVer, DiscoveryEvent discoEvt) { |
| ctx.database().checkpointReadLock(); |
| |
| try { |
| lock.writeLock().lock(); |
| |
| try { |
| if (node2part == null) |
| return false; |
| |
| int parts = grp.affinity().partitions(); |
| |
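| // Assume all partitions are lost, then remove those that still have at least one owner in the full map. |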
| Set<Integer> lost = new HashSet<>(parts); |
| |
| for (int p = 0; p < parts; p++) |
| lost.add(p); |
| |
| for (GridDhtPartitionMap partMap : node2part.values()) { |
| for (Map.Entry<Integer, GridDhtPartitionState> e : partMap.entrySet()) { |
| if (e.getValue() == OWNING) { |
| lost.remove(e.getKey()); |
| |
| if (lost.isEmpty()) |
| break; |
| } |
| } |
| } |
| |
| boolean changed = false; |
| |
| if (!F.isEmpty(lost)) { |
| PartitionLossPolicy plc = grp.config().getPartitionLossPolicy(); |
| |
| assert plc != null; |
| |
| // Update partition state on all nodes. |
| for (Integer part : lost) { |
| long updSeq = updateSeq.incrementAndGet(); |
| |
| GridDhtLocalPartition locPart = localPartition(part, resTopVer, false, true); |
| |
| if (locPart != null) { |
| boolean marked = plc == PartitionLossPolicy.IGNORE ? locPart.own() : locPart.markLost(); |
| |
| if (marked) |
| updateLocal(locPart.id(), locPart.state(), updSeq, resTopVer); |
| |
| changed |= marked; |
| } |
| // Update map for remote node. |
| else if (plc != PartitionLossPolicy.IGNORE) { |
| for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) { |
| if (e.getKey().equals(ctx.localNodeId())) |
| continue; |
| |
| if (e.getValue().get(part) != EVICTED) |
| e.getValue().put(part, LOST); |
| } |
| } |
| |
| if (grp.eventRecordable(EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST)) { |
| grp.addRebalanceEvent(part, |
| EVT_CACHE_REBALANCE_PART_DATA_LOST, |
| discoEvt.eventNode(), |
| discoEvt.type(), |
| discoEvt.timestamp()); |
| } |
| } |
| |
| if (plc != PartitionLossPolicy.IGNORE) |
| grp.needsRecovery(true); |
| } |
| |
| return changed; |
| } |
| finally { |
| lock.writeLock().unlock(); |
| } |
| } |
| finally { |
| ctx.database().checkpointReadUnlock(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void resetLostPartitions(AffinityTopologyVersion resTopVer) { |
| ctx.database().checkpointReadLock(); |
| |
| try { |
| lock.writeLock().lock(); |
| |
| try { |
| long updSeq = updateSeq.incrementAndGet(); |
| |
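| // Switch every LOST partition back to OWNING on all nodes and own the local copy, if any.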
| for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) { |
| for (Map.Entry<Integer, GridDhtPartitionState> e0 : e.getValue().entrySet()) { |
| if (e0.getValue() != LOST) |
| continue; |
| |
| e0.setValue(OWNING); |
| |
| GridDhtLocalPartition locPart = localPartition(e0.getKey(), resTopVer, false); |
| |
| if (locPart != null && locPart.state() == LOST) { |
| boolean marked = locPart.own(); |
| |
| if (marked) |
| updateLocal(locPart.id(), locPart.state(), updSeq, resTopVer); |
| } |
| } |
| } |
| |
| checkEvictions(updSeq, grp.affinity().readyAffinity(resTopVer)); |
| |
| grp.needsRecovery(false); |
| } |
| finally { |
| lock.writeLock().unlock(); |
| } |
| } |
| finally { |
| ctx.database().checkpointReadUnlock(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Collection<Integer> lostPartitions() { |
| if (grp.config().getPartitionLossPolicy() == PartitionLossPolicy.IGNORE) |
| return Collections.emptySet(); |
| |
| lock.readLock().lock(); |
| |
| try { |
| Set<Integer> res = null; |
| |
| int parts = grp.affinity().partitions(); |
| |
| for (GridDhtPartitionMap partMap : node2part.values()) { |
| for (Map.Entry<Integer, GridDhtPartitionState> e : partMap.entrySet()) { |
| if (e.getValue() == LOST) { |
| if (res == null) |
| res = new HashSet<>(parts); |
| |
| res.add(e.getKey()); |
| } |
| } |
| } |
| |
| return res == null ? Collections.<Integer>emptySet() : res; |
| } |
| finally { |
| lock.readLock().unlock(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Set<UUID> setOwners(int p, Set<UUID> owners, boolean haveHistory, boolean updateSeq) { |
| Set<UUID> result = haveHistory ? Collections.<UUID>emptySet() : new HashSet<UUID>(); |
| |
| ctx.database().checkpointReadLock(); |
| |
| try { |
| lock.writeLock().lock(); |
| |
| try { |
| GridDhtLocalPartition locPart = locParts.get(p); |
| |
| if (locPart != null) { |
| if (locPart.state() == OWNING && !owners.contains(ctx.localNodeId())) { |
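| // Local copy has an outdated update counter: if history is available it can catch up
| // through the MOVING state, otherwise it is evicted and scheduled for a full reload.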
| if (haveHistory) |
| locPart.moving(); |
| else { |
| locPart.rent(false); |
| |
| locPart.reload(true); |
| |
| result.add(ctx.localNodeId()); |
| } |
| |
| U.warn(log, "Partition has been scheduled for rebalancing due to outdated update counter " + |
| "[nodeId=" + ctx.localNodeId() + ", grp=" + grp.cacheOrGroupName() + |
| ", partId=" + locPart.id() + ", haveHistory=" + haveHistory + "]"); |
| |
| } |
| } |
| |
| for (Map.Entry<UUID, GridDhtPartitionMap> e : node2part.entrySet()) { |
| GridDhtPartitionMap partMap = e.getValue(); |
| |
| if (!partMap.containsKey(p)) |
| continue; |
| |
| if (partMap.get(p) == OWNING && !owners.contains(e.getKey())) { |
| if (haveHistory) |
| partMap.put(p, MOVING); |
| else { |
| partMap.put(p, RENTING); |
| |
| result.add(e.getKey()); |
| } |
| |
| partMap.updateSequence(partMap.updateSequence() + 1, partMap.topologyVersion()); |
| |
| if (partMap.nodeId().equals(ctx.localNodeId())) |
| this.updateSeq.setIfGreater(partMap.updateSequence()); |
| |
| U.warn(log, "Partition has been scheduled for rebalancing due to outdated update counter " + |
| "[nodeId=" + e.getKey() + ", grp=" + grp.cacheOrGroupName() + |
| ", partId=" + p + ", haveHistory=" + haveHistory + "]"); |
| } |
| } |
| |
| if (updateSeq) |
| node2part = new GridDhtPartitionFullMap(node2part, this.updateSeq.incrementAndGet()); |
| } |
| finally { |
| lock.writeLock().unlock(); |
| } |
| } |
| finally { |
| ctx.database().checkpointReadUnlock(); |
| } |
| |
| return result; |
| } |
| |
| /** |
| * @param updateSeq Update sequence. |
| * @param aff Affinity assignments. |
| * @return {@code True} if the state of at least one local partition was changed.
| */ |
| private boolean checkEvictions(long updateSeq, AffinityAssignment aff) { |
| if (!ctx.kernalContext().state().evictionsAllowed()) |
| return false; |
| |
| boolean changed = false; |
| |
| UUID locId = ctx.localNodeId(); |
| |
| for (int p = 0; p < locParts.length(); p++) { |
| GridDhtLocalPartition part = locParts.get(p); |
| |
| if (part == null) |
| continue; |
| |
| GridDhtPartitionState state = part.state(); |
| |
| if (state.active()) { |
| List<ClusterNode> affNodes = aff.get(p); |
| |
| if (!affNodes.contains(ctx.localNode())) { |
| List<ClusterNode> nodes = nodes(p, aff.topologyVersion(), OWNING, null); |
| Collection<UUID> nodeIds = F.nodeIds(nodes); |
| |
| // If all affinity nodes are owners, then evict partition from local node. |
| if (nodeIds.containsAll(F.nodeIds(affNodes))) { |
| part.reload(false); |
| |
| part.rent(false); |
| |
| updateSeq = updateLocal(part.id(), part.state(), updateSeq, aff.topologyVersion()); |
| |
| changed = true; |
| |
| if (log.isDebugEnabled()) { |
| log.debug("Evicted local partition (all affinity nodes are owners) [grp=" + grp.cacheOrGroupName() + |
| ", part=" + part + ']'); |
| } |
| } |
| else { |
| int ownerCnt = nodeIds.size(); |
| int affCnt = affNodes.size(); |
| |
| if (ownerCnt > affCnt) { // TODO !!! We could lose all owners in such a case. Should be fixed by GG-13223.
| // Sort by node orders in ascending order. |
| Collections.sort(nodes, CU.nodeComparator(true)); |
| |
| int diff = nodes.size() - affCnt; |
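| // Evict the extra copies from the owners with the smallest node order;
| // only the local partition can actually be changed here.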
| |
| for (int i = 0; i < diff; i++) { |
| ClusterNode n = nodes.get(i); |
| |
| if (locId.equals(n.id())) { |
| part.reload(false); |
| |
| part.rent(false); |
| |
| updateSeq = updateLocal(part.id(), |
| part.state(), |
| updateSeq, |
| aff.topologyVersion()); |
| |
| changed = true; |
| |
| if (log.isDebugEnabled()) { |
| log.debug("Evicted local partition (this node is oldest non-affinity node) [" + |
| "grp=" + grp.cacheOrGroupName() + ", part=" + part + ']'); |
| } |
| |
| break; |
| } |
| } |
| } |
| } |
| } |
| } |
| } |
| |
| return changed; |
| } |
| |
| /** |
| * Updates state of a single partition for the local node.
| * |
| * @param p Partition. |
| * @param state State. |
| * @param updateSeq Update sequence. |
| * @param affVer Affinity version. |
| * @return Update sequence. |
| */ |
| @SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"}) |
| private long updateLocal(int p, GridDhtPartitionState state, long updateSeq, AffinityTopologyVersion affVer) { |
| assert lock.isWriteLockedByCurrentThread(); |
| |
| ClusterNode oldest = discoCache.oldestAliveServerNode(); |
| |
| assert oldest != null || ctx.kernalContext().clientNode(); |
| |
| // If this node became the oldest node. |
| if (ctx.localNode().equals(oldest) && node2part != null) { |
| long seq = node2part.updateSequence(); |
| |
| if (seq != updateSeq) { |
| if (seq > updateSeq) { |
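| // The full map is ahead of the requested sequence: advance the local counter past it
| // and continue with the new value.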
| long seq0 = this.updateSeq.get(); |
| |
| if (seq0 < seq) { |
| // Update global counter if necessary. |
| boolean b = this.updateSeq.compareAndSet(seq0, seq + 1); |
| |
| assert b : "Invalid update sequence [updateSeq=" + updateSeq + |
| ", grp=" + grp.cacheOrGroupName() + |
| ", seq=" + seq + |
| ", curUpdateSeq=" + this.updateSeq.get() + |
| ", node2part=" + node2part.toFullString() + ']'; |
| |
| updateSeq = seq + 1; |
| } |
| else |
| updateSeq = seq; |
| } |
| |
| node2part.updateSequence(updateSeq); |
| } |
| } |
| |
| if (node2part != null) { |
| UUID locNodeId = ctx.localNodeId(); |
| |
| GridDhtPartitionMap map = node2part.get(locNodeId); |
| |
| if (map == null) { |
| map = new GridDhtPartitionMap(locNodeId, |
| updateSeq, |
| affVer, |
| GridPartitionStateMap.EMPTY, |
| false); |
| |
| node2part.put(locNodeId, map); |
| } |
| |
| map.updateSequence(updateSeq, affVer); |
| |
| map.put(p, state); |
| |
| if (!grp.isReplicated() && (state == MOVING || state == OWNING || state == RENTING)) { |
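| // Track divergence from affinity: remember that the local node holds an active copy
| // of a partition it is not assigned to by the cached affinity.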
| AffinityAssignment assignment = grp.affinity().cachedAffinity(diffFromAffinityVer); |
| |
| if (!assignment.getIds(p).contains(ctx.localNodeId())) { |
| Set<UUID> diffIds = diffFromAffinity.get(p); |
| |
| if (diffIds == null) |
| diffFromAffinity.put(p, diffIds = U.newHashSet(3)); |
| |
| diffIds.add(ctx.localNodeId()); |
| } |
| } |
| } |
| |
| return updateSeq; |
| } |
| |
| /** |
| * @param nodeId Node to remove. |
| */ |
| private void removeNode(UUID nodeId) { |
| assert nodeId != null; |
| assert lock.isWriteLockedByCurrentThread(); |
| |
| ClusterNode oldest = discoCache.oldestAliveServerNode(); |
| |
| assert oldest != null || ctx.kernalContext().clientNode(); |
| |
| ClusterNode loc = ctx.localNode(); |
| |
| if (node2part != null) { |
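| // Copy the full map (claiming ownership of it if this node just became the coordinator),
| // then drop the departed node's row together with its diff-from-affinity entries.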
| if (loc.equals(oldest) && !node2part.nodeId().equals(loc.id())) |
| node2part = new GridDhtPartitionFullMap(loc.id(), loc.order(), updateSeq.get(), |
| node2part, false); |
| else |
| node2part = new GridDhtPartitionFullMap(node2part, node2part.updateSequence()); |
| |
| GridDhtPartitionMap parts = node2part.remove(nodeId); |
| |
| if (!grp.isReplicated()) { |
| if (parts != null) { |
| for (Integer p : parts.keySet()) { |
| Set<UUID> diffIds = diffFromAffinity.get(p); |
| |
| if (diffIds != null) |
| diffIds.remove(nodeId); |
| } |
| } |
| } |
| |
| consistencyCheck(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean own(GridDhtLocalPartition part) { |
| lock.writeLock().lock(); |
| |
| try { |
| if (part.own()) { |
| assert lastTopChangeVer.initialized() : lastTopChangeVer; |
| |
| updateLocal(part.id(), part.state(), updateSeq.incrementAndGet(), lastTopChangeVer); |
| |
| consistencyCheck(); |
| |
| return true; |
| } |
| |
| consistencyCheck(); |
| |
| return false; |
| } |
| finally { |
| lock.writeLock().unlock(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onEvicted(GridDhtLocalPartition part, boolean updateSeq) { |
| ctx.database().checkpointReadLock(); |
| |
| try { |
| lock.writeLock().lock(); |
| |
| try { |
| if (stopping) |
| return; |
| |
| assert part.state() == EVICTED; |
| |
| long seq = updateSeq ? this.updateSeq.incrementAndGet() : this.updateSeq.get(); |
| |
| if (part.reload()) |
| part = createPartition(part.id()); |
| |
| assert lastTopChangeVer.initialized() : lastTopChangeVer; |
| |
| updateLocal(part.id(), part.state(), seq, lastTopChangeVer); |
| |
| consistencyCheck(); |
| } |
| finally { |
| lock.writeLock().unlock(); |
| } |
| } |
| finally { |
| ctx.database().checkpointReadUnlock(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public GridDhtPartitionMap partitions(UUID nodeId) { |
| lock.readLock().lock(); |
| |
| try { |
| return node2part.get(nodeId); |
| } |
| finally { |
| lock.readLock().unlock(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public CachePartitionFullCountersMap fullUpdateCounters() { |
| lock.readLock().lock(); |
| |
| try { |
| return new CachePartitionFullCountersMap(cntrMap); |
| } |
| finally { |
| lock.readLock().unlock(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public CachePartitionPartialCountersMap localUpdateCounters(boolean skipZeros) { |
| lock.readLock().lock(); |
| |
| try { |
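| // First pass counts local partitions to size the result map, second pass records
| // initial and current update counters for each of them.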
| int locPartCnt = 0; |
| |
| for (int i = 0; i < locParts.length(); i++) { |
| GridDhtLocalPartition part = locParts.get(i); |
| |
| if (part != null) |
| locPartCnt++; |
| } |
| |
| CachePartitionPartialCountersMap res = new CachePartitionPartialCountersMap(locPartCnt); |
| |
| for (int i = 0; i < locParts.length(); i++) { |
| GridDhtLocalPartition part = locParts.get(i); |
| |
| if (part == null) |
| continue; |
| |
| long updCntr = part.updateCounter(); |
| long initCntr = part.initialUpdateCounter(); |
| |
| if (skipZeros && initCntr == 0L && updCntr == 0L) |
| continue; |
| |
| res.add(part.id(), initCntr, updCntr); |
| } |
| |
| res.trim(); |
| |
| return res; |
| } |
| finally { |
| lock.readLock().unlock(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean rebalanceFinished(AffinityTopologyVersion topVer) { |
| AffinityTopologyVersion curTopVer = this.readyTopVer; |
| |
| return curTopVer.equals(topVer) && curTopVer.equals(rebalancedTopVer); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean hasMovingPartitions() { |
| lock.readLock().lock(); |
| |
| try { |
| if (node2part == null) |
| return false; |
| |
| assert node2part.valid() : "Invalid node2part [node2part=" + node2part +
| ", grp=" + grp.cacheOrGroupName() + |
| ", stopping=" + stopping + |
| ", locNodeId=" + ctx.localNodeId() + |
| ", locName=" + ctx.igniteInstanceName() + ']'; |
| |
| for (GridDhtPartitionMap map : node2part.values()) { |
| if (map.hasMovingPartitions()) |
| return true; |
| } |
| |
| return false; |
| } |
| finally { |
| lock.readLock().unlock(); |
| } |
| } |
| |
| /** |
| * @param cacheId Cache ID. |
| */ |
| public void onCacheStopped(int cacheId) { |
| if (!grp.sharedGroup()) |
| return; |
| |
| for (int i = 0; i < locParts.length(); i++) { |
| GridDhtLocalPartition part = locParts.get(i); |
| |
| if (part != null) |
| part.onCacheStopped(cacheId); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void printMemoryStats(int threshold) { |
| X.println(">>> Cache partition topology stats [igniteInstanceName=" + ctx.igniteInstanceName() + |
| ", grp=" + grp.cacheOrGroupName() + ']'); |
| |
| lock.readLock().lock(); |
| |
| try { |
| for (int i = 0; i < locParts.length(); i++) { |
| GridDhtLocalPartition part = locParts.get(i); |
| |
| if (part == null) |
| continue; |
| |
| int size = part.dataStore().fullSize(); |
| |
| if (size >= threshold) |
| X.println(">>> Local partition [part=" + part.id() + ", size=" + size + ']'); |
| } |
| } |
| finally { |
| lock.readLock().unlock(); |
| } |
| } |
| |
| /** |
| * @param part Partition. |
| * @param aff Affinity assignments. |
| * @return {@code True} if given partition belongs to local node. |
| */ |
| private boolean localNode(int part, List<List<ClusterNode>> aff) { |
| return aff.get(part).contains(ctx.localNode()); |
| } |
| |
| /** |
| * @param aff Affinity assignments. |
| */ |
| private void updateRebalanceVersion(List<List<ClusterNode>> aff) { |
| if (!rebalancedTopVer.equals(readyTopVer)) { |
| if (node2part == null || !node2part.valid()) |
| return; |
| |
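| // Rebalancing is complete only when every affinity node owns every partition;
| // bail out on the first partition that is not fully owned yet.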
| for (int i = 0; i < grp.affinity().partitions(); i++) { |
| List<ClusterNode> affNodes = aff.get(i); |
| |
| // Topology doesn't contain server nodes (just clients). |
| if (affNodes.isEmpty()) |
| continue; |
| |
| Set<ClusterNode> owners = U.newHashSet(affNodes.size()); |
| |
| for (ClusterNode node : affNodes) { |
| if (hasState(i, node.id(), OWNING)) |
| owners.add(node); |
| } |
| |
| if (!grp.isReplicated()) { |
| Set<UUID> diff = diffFromAffinity.get(i); |
| |
| if (diff != null) { |
| for (UUID nodeId : diff) { |
| if (hasState(i, nodeId, OWNING)) { |
| ClusterNode node = ctx.discovery().node(nodeId); |
| |
| if (node != null) |
| owners.add(node); |
| } |
| } |
| } |
| } |
| |
| if (affNodes.size() != owners.size() || !owners.containsAll(affNodes)) |
| return; |
| } |
| |
| rebalancedTopVer = readyTopVer; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Updated rebalanced version [grp=" + grp.cacheOrGroupName() + ", ver=" + rebalancedTopVer + ']'); |
| } |
| } |
| |
| /** |
| * @param p Partition. |
| * @param nodeId Node ID. |
| * @param match State to match. |
| * @param matches Additional states. |
| * @return {@code True} if the node has one of the given states for the partition.
| */ |
| private boolean hasState(final int p, @Nullable UUID nodeId, final GridDhtPartitionState match, |
| final GridDhtPartitionState... matches) { |
| if (nodeId == null) |
| return false; |
| |
| GridDhtPartitionMap parts = node2part.get(nodeId); |
| |
| // Set can be null if node has been removed. |
| if (parts != null) { |
| GridDhtPartitionState state = parts.get(p); |
| |
| if (state == match) |
| return true; |
| |
| if (matches != null && matches.length > 0) { |
| for (GridDhtPartitionState s : matches) { |
| if (state == s) |
| return true; |
| } |
| } |
| } |
| |
| return false; |
| } |
| |
| /** |
| * Checks consistency after all operations. |
| */ |
| private void consistencyCheck() { |
| // no-op |
| } |
| |
| /** |
| * Iterator over current local partitions. |
| */ |
| private class CurrentPartitionsIterator implements Iterator<GridDhtLocalPartition> { |
| /** Next index. */ |
| private int nextIdx; |
| |
| /** Next partition. */ |
| private GridDhtLocalPartition nextPart; |
| |
| /** |
| * Constructor.
| */ |
| private CurrentPartitionsIterator() { |
| advance(); |
| } |
| |
| /** |
| * Tries to advance to the next active local partition.
| */ |
| private void advance() { |
| while (nextIdx < locParts.length()) { |
| GridDhtLocalPartition part = locParts.get(nextIdx); |
| |
| if (part != null && part.state().active()) { |
| nextPart = part; |
| return; |
| } |
| |
| nextIdx++; |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean hasNext() { |
| return nextPart != null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridDhtLocalPartition next() { |
| if (nextPart == null) |
| throw new NoSuchElementException(); |
| |
| GridDhtLocalPartition retVal = nextPart; |
| |
| nextPart = null; |
| nextIdx++; |
| |
| advance(); |
| |
| return retVal; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void remove() { |
| throw new UnsupportedOperationException("remove"); |
| } |
| } |
| } |