| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.distributed.dht.preloader; |
| |
| import org.apache.ignite.*; |
| import org.apache.ignite.cache.affinity.*; |
| import org.apache.ignite.cluster.*; |
| import org.apache.ignite.events.*; |
| import org.apache.ignite.internal.*; |
| import org.apache.ignite.internal.cluster.*; |
| import org.apache.ignite.internal.managers.discovery.*; |
| import org.apache.ignite.internal.processors.affinity.*; |
| import org.apache.ignite.internal.processors.cache.*; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.*; |
| import org.apache.ignite.internal.processors.cache.transactions.*; |
| import org.apache.ignite.internal.processors.cache.version.*; |
| import org.apache.ignite.internal.processors.timeout.*; |
| import org.apache.ignite.internal.util.*; |
| import org.apache.ignite.internal.util.future.*; |
| import org.apache.ignite.internal.util.tostring.*; |
| import org.apache.ignite.internal.util.typedef.*; |
| import org.apache.ignite.internal.util.typedef.internal.*; |
| import org.apache.ignite.lang.*; |
| import org.jetbrains.annotations.*; |
| import org.jsr166.*; |
| |
| import java.util.*; |
| import java.util.concurrent.*; |
| import java.util.concurrent.atomic.*; |
| import java.util.concurrent.locks.*; |
| |
| import static org.apache.ignite.events.EventType.*; |
| import static org.apache.ignite.internal.events.DiscoveryCustomEvent.*; |
| import static org.apache.ignite.internal.managers.communication.GridIoPolicy.*; |
| |
| /** |
| * Future for exchanging partition maps. |
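| * <p> |
| * Each participating node sends its local partition map to the exchange coordinator |
| * (the oldest alive server node) as a {@link GridDhtPartitionsSingleMessage}. Once the |
| * coordinator has collected single maps from all remote nodes, it replies with a |
| * {@link GridDhtPartitionsFullMessage} carrying the aggregated full partition map, and |
| * the future completes with the exchange topology version. |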
| */ |
| public class GridDhtPartitionsExchangeFuture extends GridFutureAdapter<AffinityTopologyVersion> |
| implements Comparable<GridDhtPartitionsExchangeFuture>, GridDhtTopologyFuture { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** Dummy flag. */ |
| private final boolean dummy; |
| |
| /** Force preload flag. */ |
| private final boolean forcePreload; |
| |
| /** Dummy reassign flag. */ |
| private final boolean reassign; |
| |
| /** Discovery event. */ |
| private volatile DiscoveryEvent discoEvt; |
| |
| /** */ |
| @GridToStringInclude |
| private final Collection<UUID> rcvdIds = new GridConcurrentHashSet<>(); |
| |
| /** Remote nodes. */ |
| private volatile Collection<ClusterNode> rmtNodes; |
| |
| /** Remote node IDs. */ |
| @GridToStringInclude |
| private volatile Collection<UUID> rmtIds; |
| |
| /** Oldest node. */ |
| @GridToStringExclude |
| private final AtomicReference<ClusterNode> oldestNode = new AtomicReference<>(); |
| |
| /** Exchange future ID. */ |
| private final GridDhtPartitionExchangeId exchId; |
| |
| /** Init flag. */ |
| @GridToStringInclude |
| private final AtomicBoolean init = new AtomicBoolean(false); |
| |
| /** Ready for reply flag. */ |
| @GridToStringInclude |
| private final AtomicBoolean ready = new AtomicBoolean(false); |
| |
| /** Replied flag. */ |
| @GridToStringInclude |
| private final AtomicBoolean replied = new AtomicBoolean(false); |
| |
| /** Timeout object. */ |
| @GridToStringExclude |
| private volatile GridTimeoutObject timeoutObj; |
| |
| /** Cache context. */ |
| private final GridCacheSharedContext<?, ?> cctx; |
| |
| /** Busy lock to prevent activities from accessing exchanger while it's stopping. */ |
| private ReadWriteLock busyLock; |
| |
| /** */ |
| private AtomicBoolean added = new AtomicBoolean(false); |
| |
| /** Event latch. */ |
| @GridToStringExclude |
| private CountDownLatch evtLatch = new CountDownLatch(1); |
| |
| /** */ |
| private GridFutureAdapter<Boolean> initFut; |
| |
| /** Topology snapshot. */ |
| private AtomicReference<GridDiscoveryTopologySnapshot> topSnapshot = new AtomicReference<>(); |
| |
| /** Last committed cache version before the next topology version is used. */ |
| private AtomicReference<GridCacheVersion> lastVer = new AtomicReference<>(); |
| |
| /** |
| * Messages received on a non-coordinator node are stored in case this node |
| * becomes the coordinator. |
| */ |
| private final Map<UUID, GridDhtPartitionsSingleMessage> singleMsgs = new ConcurrentHashMap8<>(); |
| |
| /** Messages received from new coordinator. */ |
| private final Map<UUID, GridDhtPartitionsFullMessage> fullMsgs = new ConcurrentHashMap8<>(); |
| |
| /** */ |
| @SuppressWarnings({"FieldCanBeLocal", "UnusedDeclaration"}) |
| @GridToStringInclude |
| private volatile IgniteInternalFuture<?> partReleaseFut; |
| |
| /** */ |
| private final Object mux = new Object(); |
| |
| /** Logger. */ |
| private IgniteLogger log; |
| |
| /** Dynamic cache change requests. */ |
| private Collection<DynamicCacheChangeRequest> reqs; |
| |
| /** Cache validation results. */ |
| private volatile Map<Integer, Boolean> cacheValidRes; |
| |
| /** Skip preload flag. */ |
| private boolean skipPreload; |
| |
| /** |
| * Dummy future created to trigger reassignments if partition |
| * topology changed while preloading. |
| * |
| * @param cctx Cache context. |
| * @param reassign Dummy reassign flag. |
| * @param discoEvt Discovery event. |
| * @param exchId Exchange id. |
| */ |
| public GridDhtPartitionsExchangeFuture( |
| GridCacheSharedContext cctx, |
| boolean reassign, |
| DiscoveryEvent discoEvt, |
| GridDhtPartitionExchangeId exchId |
| ) { |
| dummy = true; |
| forcePreload = false; |
| |
| this.exchId = exchId; |
| this.reassign = reassign; |
| this.discoEvt = discoEvt; |
| this.cctx = cctx; |
| |
| onDone(exchId.topologyVersion()); |
| } |
| |
| /** |
| * Force preload future created to trigger reassignments if partition |
| * topology changed while preloading. |
| * |
| * @param cctx Cache context. |
| * @param discoEvt Discovery event. |
| * @param exchId Exchange id. |
| */ |
| public GridDhtPartitionsExchangeFuture(GridCacheSharedContext cctx, DiscoveryEvent discoEvt, |
| GridDhtPartitionExchangeId exchId) { |
| dummy = false; |
| forcePreload = true; |
| |
| this.exchId = exchId; |
| this.discoEvt = discoEvt; |
| this.cctx = cctx; |
| |
| reassign = true; |
| |
| onDone(exchId.topologyVersion()); |
| } |
| |
| /** |
| * @param cctx Cache context. |
| * @param busyLock Busy lock. |
| * @param exchId Exchange ID. |
| * @param reqs Cache change requests. |
| */ |
| public GridDhtPartitionsExchangeFuture( |
| GridCacheSharedContext cctx, |
| ReadWriteLock busyLock, |
| GridDhtPartitionExchangeId exchId, |
| Collection<DynamicCacheChangeRequest> reqs |
| ) { |
| assert busyLock != null; |
| assert exchId != null; |
| |
| dummy = false; |
| forcePreload = false; |
| reassign = false; |
| |
| this.cctx = cctx; |
| this.busyLock = busyLock; |
| this.exchId = exchId; |
| this.reqs = reqs; |
| |
| log = cctx.logger(getClass()); |
| |
| initFut = new GridFutureAdapter<>(); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Creating exchange future [localNode=" + cctx.localNodeId() + ", fut=" + this + ']'); |
| } |
| |
| /** |
| * @param reqs Cache change requests. |
| */ |
| public void cacheChangeRequests(Collection<DynamicCacheChangeRequest> reqs) { |
| this.reqs = reqs; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridDiscoveryTopologySnapshot topologySnapshot() throws IgniteCheckedException { |
| get(); |
| |
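| // Lazily build the snapshot from the discovery event; compareAndSet guarantees |
| // that all callers observe the same instance. |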
| if (topSnapshot.get() == null) |
| topSnapshot.compareAndSet(null, new GridDiscoveryTopologySnapshot(discoEvt.topologyVersion(), |
| discoEvt.topologyNodes())); |
| |
| return topSnapshot.get(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public AffinityTopologyVersion topologyVersion() { |
| return exchId.topologyVersion(); |
| } |
| |
| /** |
| * @return Skip preload flag. |
| */ |
| public boolean skipPreload() { |
| return skipPreload; |
| } |
| |
| /** |
| * @return Dummy flag. |
| */ |
| public boolean dummy() { |
| return dummy; |
| } |
| |
| /** |
| * @return Force preload flag. |
| */ |
| public boolean forcePreload() { |
| return forcePreload; |
| } |
| |
| /** |
| * @return Dummy reassign flag. |
| */ |
| public boolean reassign() { |
| return reassign; |
| } |
| |
| /** |
| * @return {@code True} if dummy reassign. |
| */ |
| public boolean dummyReassign() { |
| return (dummy() || forcePreload()) && reassign(); |
| } |
| |
| /** |
| * @param cacheId Cache ID to check. |
| * @param topVer Topology version. |
| * @return {@code True} if cache was added during this exchange. |
| */ |
| public boolean isCacheAdded(int cacheId, AffinityTopologyVersion topVer) { |
| if (!F.isEmpty(reqs)) { |
| for (DynamicCacheChangeRequest req : reqs) { |
| if (req.start() && !req.clientStartOnly()) { |
| if (CU.cacheId(req.cacheName()) == cacheId) |
| return true; |
| } |
| } |
| } |
| |
| GridCacheContext<?, ?> cacheCtx = cctx.cacheContext(cacheId); |
| |
| return cacheCtx != null && F.eq(cacheCtx.startTopologyVersion(), topVer); |
| } |
| |
| /** |
| * @param cacheId Cache ID. |
| * @return {@code True} if local client has been added. |
| */ |
| public boolean isLocalClientAdded(int cacheId) { |
| if (!F.isEmpty(reqs)) { |
| for (DynamicCacheChangeRequest req : reqs) { |
| if (req.start() && F.eq(req.initiatingNodeId(), cctx.localNodeId())) { |
| if (CU.cacheId(req.cacheName()) == cacheId) |
| return true; |
| } |
| } |
| } |
| |
| return false; |
| } |
| |
| /** |
| * @param cacheCtx Cache context. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void initTopology(GridCacheContext cacheCtx) throws IgniteCheckedException { |
| if (stopping(cacheCtx.cacheId())) |
| return; |
| |
| if (canCalculateAffinity(cacheCtx)) { |
| if (log.isDebugEnabled()) |
| log.debug("Will recalculate affinity [locNodeId=" + cctx.localNodeId() + ", exchId=" + exchId + ']'); |
| |
| cacheCtx.affinity().calculateAffinity(exchId.topologyVersion(), discoEvt); |
| } |
| else { |
| if (log.isDebugEnabled()) |
| log.debug("Will request affinity from remote node [locNodeId=" + cctx.localNodeId() + ", exchId=" + |
| exchId + ']'); |
| |
| // Fetch affinity assignment from remote node. |
| GridDhtAssignmentFetchFuture fetchFut = new GridDhtAssignmentFetchFuture(cacheCtx, |
| exchId.topologyVersion(), |
| CU.affinityNodes(cacheCtx, exchId.topologyVersion())); |
| |
| fetchFut.init(); |
| |
| List<List<ClusterNode>> affAssignment = fetchFut.get(); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Fetched affinity from remote node, initializing affinity assignment [locNodeId=" + |
| cctx.localNodeId() + ", topVer=" + exchId.topologyVersion() + ']'); |
| |
| if (affAssignment == null) { |
| affAssignment = new ArrayList<>(cacheCtx.affinity().partitions()); |
| |
| List<ClusterNode> empty = Collections.emptyList(); |
| |
| for (int i = 0; i < cacheCtx.affinity().partitions(); i++) |
| affAssignment.add(empty); |
| } |
| |
| cacheCtx.affinity().initializeAffinity(exchId.topologyVersion(), affAssignment); |
| } |
| } |
| |
| /** |
| * @param cacheCtx Cache context. |
| * @return {@code True} if local node can calculate affinity on its own for this partition map exchange. |
| */ |
| private boolean canCalculateAffinity(GridCacheContext cacheCtx) { |
| AffinityFunction affFunc = cacheCtx.config().getAffinity(); |
| |
| // Do not request affinity from remote nodes if affinity function is not centralized. |
| if (!U.hasAnnotation(affFunc, AffinityCentralizedFunction.class)) |
| return true; |
| |
| // If local node did not initiate exchange or local node is the only cache node in grid. |
| Collection<ClusterNode> affNodes = CU.affinityNodes(cacheCtx, exchId.topologyVersion()); |
| |
| return !exchId.nodeId().equals(cctx.localNodeId()) || |
| (affNodes.size() == 1 && affNodes.contains(cctx.localNode())); |
| } |
| |
| /** |
| * @return {@code True} if the added flag was flipped by this call, {@code false} if the future was already added. |
| */ |
| public boolean onAdded() { |
| return added.compareAndSet(false, true); |
| } |
| |
| /** |
| * Event callback. |
| * |
| * @param exchId Exchange ID. |
| * @param discoEvt Discovery event. |
| */ |
| public void onEvent(GridDhtPartitionExchangeId exchId, DiscoveryEvent discoEvt) { |
| assert exchId.equals(this.exchId); |
| |
| this.discoEvt = discoEvt; |
| |
| evtLatch.countDown(); |
| } |
| |
| /** |
| * @return Discovery event. |
| */ |
| public DiscoveryEvent discoveryEvent() { |
| return discoEvt; |
| } |
| |
| /** |
| * @return Exchange ID. |
| */ |
| public GridDhtPartitionExchangeId exchangeId() { |
| return exchId; |
| } |
| |
| /** |
| * @return {@code true} if entered busy state. |
| */ |
| private boolean enterBusy() { |
| if (busyLock.readLock().tryLock()) |
| return true; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Failed to enter busy state (exchanger is stopping): " + this); |
| |
| return false; |
| } |
| |
| /** |
| * |
| */ |
| private void leaveBusy() { |
| busyLock.readLock().unlock(); |
| } |
| |
| /** |
| * Starts activity. |
| * |
| * @throws IgniteInterruptedCheckedException If interrupted. |
| */ |
| public void init() throws IgniteInterruptedCheckedException { |
| if (isDone()) |
| return; |
| |
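| // Only the first call may run initialization: a repeated call trips the |
| // assertion in the else-branch below. |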
| if (init.compareAndSet(false, true)) { |
| if (isDone()) |
| return; |
| |
| try { |
| // Wait for event to occur to make sure that discovery |
| // will return corresponding nodes. |
| U.await(evtLatch); |
| |
| assert discoEvt != null : this; |
| assert !dummy && !forcePreload : this; |
| |
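| // The oldest alive server node acts as exchange coordinator: it collects |
| // single partition maps from all nodes and replies with the full map. |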
| ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, exchId.topologyVersion()); |
| |
| oldestNode.set(oldest); |
| |
| startCaches(); |
| |
| // True if a client node joined, left or failed, or if the custom event is a client-only cache start. |
| boolean clientNodeEvt; |
| |
| if (F.isEmpty(reqs)) { |
| int type = discoEvt.type(); |
| |
| assert type == EVT_NODE_JOINED || type == EVT_NODE_LEFT || type == EVT_NODE_FAILED : discoEvt; |
| |
| clientNodeEvt = CU.clientNode(discoEvt.eventNode()); |
| } |
| else { |
| assert discoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT : discoEvt; |
| |
| boolean clientOnlyStart = true; |
| |
| for (DynamicCacheChangeRequest req : reqs) { |
| if (!req.clientStartOnly()) { |
| clientOnlyStart = false; |
| |
| break; |
| } |
| } |
| |
| clientNodeEvt = clientOnlyStart; |
| } |
| |
| if (clientNodeEvt) { |
| ClusterNode node = discoEvt.eventNode(); |
| |
| // Client needs to initialize affinity for a local join event or for started client caches. |
| if (!node.isLocal()) { |
| for (GridCacheContext cacheCtx : cctx.cacheContexts()) { |
| if (cacheCtx.isLocal()) |
| continue; |
| |
| GridDhtPartitionTopology top = cacheCtx.topology(); |
| |
| top.updateTopologyVersion(exchId, this, -1, stopping(cacheCtx.cacheId())); |
| |
| if (cacheCtx.affinity().affinityTopologyVersion() == AffinityTopologyVersion.NONE) { |
| initTopology(cacheCtx); |
| |
| top.beforeExchange(this); |
| } |
| else |
| cacheCtx.affinity().clientEventTopologyChange(discoEvt, exchId.topologyVersion()); |
| } |
| |
| if (exchId.isLeft()) |
| cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion()); |
| |
| onDone(exchId.topologyVersion()); |
| |
| skipPreload = cctx.kernalContext().clientNode(); |
| |
| return; |
| } |
| } |
| |
| if (cctx.kernalContext().clientNode()) { |
| skipPreload = true; |
| |
| for (GridCacheContext cacheCtx : cctx.cacheContexts()) { |
| if (cacheCtx.isLocal()) |
| continue; |
| |
| GridDhtPartitionTopology top = cacheCtx.topology(); |
| |
| top.updateTopologyVersion(exchId, this, -1, stopping(cacheCtx.cacheId())); |
| } |
| |
| for (GridCacheContext cacheCtx : cctx.cacheContexts()) { |
| if (cacheCtx.isLocal()) |
| continue; |
| |
| initTopology(cacheCtx); |
| } |
| |
| if (oldestNode.get() != null) { |
| rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteServerNodesWithCaches(cctx, |
| exchId.topologyVersion())); |
| |
| rmtIds = Collections.unmodifiableSet(new HashSet<>(F.nodeIds(rmtNodes))); |
| |
| ready.set(true); |
| |
| initFut.onDone(true); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Initialized future: " + this); |
| |
| sendPartitions(); |
| } |
| else |
| onDone(exchId.topologyVersion()); |
| |
| return; |
| } |
| |
| assert oldestNode.get() != null; |
| |
| for (GridCacheContext cacheCtx : cctx.cacheContexts()) { |
| if (isCacheAdded(cacheCtx.cacheId(), exchId.topologyVersion())) { |
| if (cacheCtx.discovery().cacheAffinityNodes(cacheCtx.name(), topologyVersion()).isEmpty()) |
| U.quietAndWarn(log, "No server nodes found for cache client: " + cacheCtx.namex()); |
| } |
| |
| cacheCtx.preloader().onExchangeFutureAdded(); |
| } |
| |
| List<String> cachesWithoutNodes = null; |
| |
| if (exchId.isLeft()) { |
| for (String name : cctx.cache().cacheNames()) { |
| if (cctx.discovery().cacheAffinityNodes(name, topologyVersion()).isEmpty()) { |
| if (cachesWithoutNodes == null) |
| cachesWithoutNodes = new ArrayList<>(); |
| |
| cachesWithoutNodes.add(name); |
| |
| // Fire event even if there is no client cache started. |
| if (cctx.gridEvents().isRecordable(EventType.EVT_CACHE_NODES_LEFT)) { |
| Event evt = new CacheEvent( |
| name, |
| cctx.localNode(), |
| cctx.localNode(), |
| "All server nodes have left the cluster.", |
| EventType.EVT_CACHE_NODES_LEFT, |
| 0, |
| false, |
| null, |
| null, |
| null, |
| null, |
| false, |
| null, |
| false, |
| null, |
| null, |
| null |
| ); |
| |
| cctx.gridEvents().record(evt); |
| } |
| } |
| } |
| } |
| |
| if (cachesWithoutNodes != null) { |
| StringBuilder sb = |
| new StringBuilder("All server nodes for the following caches have left the cluster: "); |
| |
| for (int i = 0; i < cachesWithoutNodes.size(); i++) { |
| String cache = cachesWithoutNodes.get(i); |
| |
| sb.append('\'').append(cache).append('\''); |
| |
| if (i != cachesWithoutNodes.size() - 1) |
| sb.append(", "); |
| } |
| |
| U.quietAndWarn(log, sb.toString()); |
| |
| U.quietAndWarn(log, "Must have server nodes for caches to operate."); |
| } |
| |
| assert discoEvt != null; |
| |
| assert exchId.nodeId().equals(discoEvt.eventNode().id()); |
| |
| for (GridCacheContext cacheCtx : cctx.cacheContexts()) { |
| GridClientPartitionTopology clientTop = cctx.exchange().clearClientTopology( |
| cacheCtx.cacheId()); |
| |
| long updSeq = clientTop == null ? -1 : clientTop.lastUpdateSequence(); |
| |
| // Update before waiting for locks. |
| if (!cacheCtx.isLocal()) |
| cacheCtx.topology().updateTopologyVersion(exchId, this, updSeq, stopping(cacheCtx.cacheId())); |
| } |
| |
| // Grab all alive remote server nodes with order equal to or less than that of the last joined node. |
| rmtNodes = new ConcurrentLinkedQueue<>(CU.aliveRemoteServerNodesWithCaches(cctx, |
| exchId.topologyVersion())); |
| |
| rmtIds = Collections.unmodifiableSet(new HashSet<>(F.nodeIds(rmtNodes))); |
| |
| for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> m : singleMsgs.entrySet()) |
| // If received any messages, process them. |
| onReceive(m.getKey(), m.getValue()); |
| |
| for (Map.Entry<UUID, GridDhtPartitionsFullMessage> m : fullMsgs.entrySet()) |
| // If received any messages, process them. |
| onReceive(m.getKey(), m.getValue()); |
| |
| AffinityTopologyVersion topVer = exchId.topologyVersion(); |
| |
| for (GridCacheContext cacheCtx : cctx.cacheContexts()) { |
| if (cacheCtx.isLocal()) |
| continue; |
| |
| // Must initialize topology after we get discovery event. |
| initTopology(cacheCtx); |
| |
| cacheCtx.preloader().updateLastExchangeFuture(this); |
| } |
| |
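| // Future that completes when all cache operations started on the previous |
| // topology version (transactions, explicit locks, atomic updates) are released. |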
| IgniteInternalFuture<?> partReleaseFut = cctx.partitionReleaseFuture(topVer); |
| |
| // Assign to class variable so it will be included into toString() method. |
| this.partReleaseFut = partReleaseFut; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Before waiting for partition release future: " + this); |
| |
| while (true) { |
| try { |
| partReleaseFut.get(2 * cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS); |
| |
| break; |
| } |
| catch (IgniteFutureTimeoutCheckedException ignored) { |
| // Print pending transactions and locks that might have led to hang. |
| dumpPendingObjects(); |
| } |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("After waiting for partition release future: " + this); |
| |
| if (!F.isEmpty(reqs)) |
| blockGateways(); |
| |
| if (exchId.isLeft()) |
| cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion()); |
| |
| IgniteInternalFuture<?> locksFut = cctx.mvcc().finishLocks(exchId.topologyVersion()); |
| |
| while (true) { |
| try { |
| locksFut.get(2 * cctx.gridConfig().getNetworkTimeout(), TimeUnit.MILLISECONDS); |
| |
| break; |
| } |
| catch (IgniteFutureTimeoutCheckedException ignored) { |
| U.warn(log, "Failed to wait for locks release future. " + |
| "Dumping pending objects that might be the cause: " + cctx.localNodeId()); |
| |
| U.warn(log, "Locked entries:"); |
| |
| Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> locks = |
| cctx.mvcc().unfinishedLocks(exchId.topologyVersion()); |
| |
| for (Map.Entry<IgniteTxKey, Collection<GridCacheMvccCandidate>> e : locks.entrySet()) |
| U.warn(log, "Locked entry [key=" + e.getKey() + ", mvcc=" + e.getValue() + ']'); |
| } |
| } |
| |
| for (GridCacheContext cacheCtx : cctx.cacheContexts()) { |
| if (cacheCtx.isLocal()) |
| continue; |
| |
| // Notify replication manager. |
| GridCacheContext drCacheCtx = cacheCtx.isNear() ? cacheCtx.near().dht().context() : cacheCtx; |
| |
| if (drCacheCtx.isDrEnabled()) |
| drCacheCtx.dr().beforeExchange(topVer, exchId.isLeft()); |
| |
| // Partition release future is done so we can flush the write-behind store. |
| cacheCtx.store().forceFlush(); |
| |
| // Process queued undeploys prior to sending/spreading map. |
| cacheCtx.preloader().unwindUndeploys(); |
| |
| GridDhtPartitionTopology top = cacheCtx.topology(); |
| |
| assert topVer.equals(top.topologyVersion()) : |
| "Topology version is updated only in this class instances inside single ExchangeWorker thread."; |
| |
| top.beforeExchange(this); |
| } |
| |
| for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) { |
| top.updateTopologyVersion(exchId, this, -1, stopping(top.cacheId())); |
| |
| top.beforeExchange(this); |
| } |
| } |
| catch (IgniteInterruptedCheckedException e) { |
| onDone(e); |
| |
| throw e; |
| } |
| catch (Throwable e) { |
| U.error(log, "Failed to reinitialize local partitions (preloading will be stopped): " + exchId, e); |
| |
| onDone(e); |
| |
| if (e instanceof Error) |
| throw (Error)e; |
| |
| return; |
| } |
| |
| if (F.isEmpty(rmtIds)) { |
| onDone(exchId.topologyVersion()); |
| |
| return; |
| } |
| |
| ready.set(true); |
| |
| initFut.onDone(true); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Initialized future: " + this); |
| |
| // If this node is not oldest. |
| if (!oldestNode.get().id().equals(cctx.localNodeId())) |
| sendPartitions(); |
| else { |
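| // This node is the coordinator: if single maps from all remote nodes have |
| // already been received, spread the full map and complete the future. |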
| boolean allReceived = allReceived(); |
| |
| if (allReceived && replied.compareAndSet(false, true)) { |
| if (spreadPartitions()) |
| onDone(exchId.topologyVersion()); |
| } |
| } |
| |
| scheduleRecheck(); |
| } |
| else |
| assert false : "Skipped init future: " + this; |
| } |
| |
| /** |
| * |
| */ |
| private void dumpPendingObjects() { |
| U.warn(log, "Failed to wait for partition release future. Dumping pending objects that might be the cause: " + |
| cctx.localNodeId()); |
| |
| U.warn(log, "Pending transactions:"); |
| |
| for (IgniteInternalTx tx : cctx.tm().activeTransactions()) |
| U.warn(log, ">>> " + tx); |
| |
| U.warn(log, "Pending explicit locks:"); |
| |
| for (GridCacheExplicitLockSpan lockSpan : cctx.mvcc().activeExplicitLocks()) |
| U.warn(log, ">>> " + lockSpan); |
| |
| U.warn(log, "Pending cache futures:"); |
| |
| for (GridCacheFuture<?> fut : cctx.mvcc().activeFutures()) |
| U.warn(log, ">>> " + fut); |
| |
| U.warn(log, "Pending atomic cache futures:"); |
| |
| for (GridCacheFuture<?> fut : cctx.mvcc().atomicFutures()) |
| U.warn(log, ">>> " + fut); |
| } |
| |
| /** |
| * @param cacheId Cache ID to check. |
| * @return {@code True} if cache is stopping by this exchange. |
| */ |
| private boolean stopping(int cacheId) { |
| boolean stopping = false; |
| |
| if (!F.isEmpty(reqs)) { |
| for (DynamicCacheChangeRequest req : reqs) { |
| if (cacheId == CU.cacheId(req.cacheName())) { |
| stopping = req.stop(); |
| |
| break; |
| } |
| } |
| } |
| |
| return stopping; |
| } |
| |
| /** |
| * Starts dynamic caches. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void startCaches() throws IgniteCheckedException { |
| cctx.cache().prepareCachesStart(F.view(reqs, new IgnitePredicate<DynamicCacheChangeRequest>() { |
| @Override public boolean apply(DynamicCacheChangeRequest req) { |
| return req.start(); |
| } |
| }), exchId.topologyVersion()); |
| } |
| |
| /** |
| * |
| */ |
| private void blockGateways() { |
| for (DynamicCacheChangeRequest req : reqs) { |
| if (req.stop()) |
| cctx.cache().blockGateway(req); |
| } |
| } |
| |
| /** |
| * @param node Node. |
| * @param id ID. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void sendLocalPartitions(ClusterNode node, @Nullable GridDhtPartitionExchangeId id) |
| throws IgniteCheckedException { |
| GridDhtPartitionsSingleMessage m = new GridDhtPartitionsSingleMessage(id, |
| cctx.kernalContext().clientNode(), |
| cctx.versions().last()); |
| |
| for (GridCacheContext cacheCtx : cctx.cacheContexts()) { |
| if (!cacheCtx.isLocal()) |
| m.addLocalPartitionMap(cacheCtx.cacheId(), cacheCtx.topology().localPartitionMap()); |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + m + ']'); |
| |
| cctx.io().send(node, m, SYSTEM_POOL); |
| } |
| |
| /** |
| * @param nodes Nodes. |
| * @param id ID. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void sendAllPartitions(Collection<? extends ClusterNode> nodes, GridDhtPartitionExchangeId id) |
| throws IgniteCheckedException { |
| GridDhtPartitionsFullMessage m = new GridDhtPartitionsFullMessage(id, |
| lastVer.get(), |
| id.topologyVersion()); |
| |
| for (GridCacheContext cacheCtx : cctx.cacheContexts()) { |
| if (!cacheCtx.isLocal()) { |
| AffinityTopologyVersion startTopVer = cacheCtx.startTopologyVersion(); |
| |
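| // Include the cache only if it was started on or before the exchange topology version. |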
| boolean ready = startTopVer == null || startTopVer.compareTo(id.topologyVersion()) <= 0; |
| |
| if (ready) |
| m.addFullPartitionsMap(cacheCtx.cacheId(), cacheCtx.topology().partitionMap(true)); |
| } |
| } |
| |
| // It is important that client topologies be added after contexts. |
| for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) |
| m.addFullPartitionsMap(top.cacheId(), top.partitionMap(true)); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Sending full partition map [nodeIds=" + F.viewReadOnly(nodes, F.node2id()) + |
| ", exchId=" + exchId + ", msg=" + m + ']'); |
| |
| cctx.io().safeSend(nodes, m, SYSTEM_POOL, null); |
| } |
| |
| /** |
| * |
| */ |
| private void sendPartitions() { |
| ClusterNode oldestNode = this.oldestNode.get(); |
| |
| try { |
| sendLocalPartitions(oldestNode, exchId); |
| } |
| catch (ClusterTopologyCheckedException ignore) { |
| if (log.isDebugEnabled()) |
| log.debug("Oldest node left during partition exchange [nodeId=" + oldestNode.id() + |
| ", exchId=" + exchId + ']'); |
| } |
| catch (IgniteCheckedException e) { |
| scheduleRecheck(); |
| |
| U.error(log, "Failed to send local partitions to oldest node (will retry after timeout) [oldestNodeId=" + |
| oldestNode.id() + ", exchId=" + exchId + ']', e); |
| } |
| } |
| |
| /** |
| * @return {@code True} if succeeded. |
| */ |
| private boolean spreadPartitions() { |
| try { |
| sendAllPartitions(rmtNodes, exchId); |
| |
| return true; |
| } |
| catch (IgniteCheckedException e) { |
| scheduleRecheck(); |
| |
| if (!X.hasCause(e, InterruptedException.class)) |
| U.error(log, "Failed to send full partition map to nodes (will retry after timeout) [nodes=" + |
| F.nodeId8s(rmtNodes) + ", exchangeId=" + exchId + ']', e); |
| |
| return false; |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean onDone(AffinityTopologyVersion res, Throwable err) { |
| Map<Integer, Boolean> m = null; |
| |
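| // Run configured topology validators for all user caches and remember per-cache results. |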
| for (GridCacheContext cacheCtx : cctx.cacheContexts()) { |
| if (cacheCtx.config().getTopologyValidator() != null && !CU.isSystemCache(cacheCtx.name())) { |
| if (m == null) |
| m = new HashMap<>(); |
| |
| m.put(cacheCtx.cacheId(), cacheCtx.config().getTopologyValidator().validate(discoEvt.topologyNodes())); |
| } |
| } |
| |
| cacheValidRes = m != null ? m : Collections.<Integer, Boolean>emptyMap(); |
| |
| cctx.cache().onExchangeDone(exchId.topologyVersion(), reqs, err); |
| |
| cctx.exchange().onExchangeDone(this, err); |
| |
| if (super.onDone(res, err) && !dummy && !forcePreload) { |
| if (log.isDebugEnabled()) |
| log.debug("Completed partition exchange [localNode=" + cctx.localNodeId() + ", exchange= " + this + ']'); |
| |
| initFut.onDone(err == null); |
| |
| GridTimeoutObject timeoutObj = this.timeoutObj; |
| |
| // Deschedule timeout object. |
| if (timeoutObj != null) |
| cctx.kernalContext().timeout().removeTimeoutObject(timeoutObj); |
| |
| if (exchId.isLeft()) { |
| for (GridCacheContext cacheCtx : cctx.cacheContexts()) |
| cacheCtx.config().getAffinity().removeNode(exchId.nodeId()); |
| } |
| |
| return true; |
| } |
| |
| return dummy; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean isCacheTopologyValid(GridCacheContext cctx) { |
| return cctx.config().getTopologyValidator() != null && cacheValidRes.containsKey(cctx.cacheId()) ? |
| cacheValidRes.get(cctx.cacheId()) : true; |
| } |
| |
| /** |
| * Cleans up resources to avoid excessive memory usage. |
| */ |
| public void cleanUp() { |
| topSnapshot.set(null); |
| singleMsgs.clear(); |
| fullMsgs.clear(); |
| rcvdIds.clear(); |
| oldestNode.set(null); |
| partReleaseFut = null; |
| |
| Collection<ClusterNode> rmtNodes = this.rmtNodes; |
| |
| if (rmtNodes != null) |
| rmtNodes.clear(); |
| } |
| |
| /** |
| * @return {@code True} if all replies are received. |
| */ |
| private boolean allReceived() { |
| Collection<UUID> rmtIds = this.rmtIds; |
| |
| assert rmtIds != null : "Remote IDs can't be null: " + this; |
| |
| synchronized (rcvdIds) { |
| return rcvdIds.containsAll(rmtIds); |
| } |
| } |
| |
| /** |
| * @param nodeId Sender node id. |
| * @param msg Single partition info. |
| */ |
| public void onReceive(final UUID nodeId, final GridDhtPartitionsSingleMessage msg) { |
| assert msg != null; |
| |
| assert msg.exchangeId().equals(exchId); |
| |
| // Update last seen version: the CAS loop retains the greatest version observed across senders. |
| while (true) { |
| GridCacheVersion old = lastVer.get(); |
| |
| if (old == null || old.compareTo(msg.lastVersion()) < 0) { |
| if (lastVer.compareAndSet(old, msg.lastVersion())) |
| break; |
| } |
| else |
| break; |
| } |
| |
| if (isDone()) { |
| if (log.isDebugEnabled()) |
| log.debug("Received message for finished future (will reply only to sender) [msg=" + msg + |
| ", fut=" + this + ']'); |
| |
| sendAllPartitions(nodeId, cctx.gridConfig().getNetworkSendRetryCount()); |
| } |
| else { |
| initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() { |
| @Override public void apply(IgniteInternalFuture<Boolean> t) { |
| try { |
| if (!t.get()) // Just to check if there was an error. |
| return; |
| |
| ClusterNode loc = cctx.localNode(); |
| |
| singleMsgs.put(nodeId, msg); |
| |
| boolean match = true; |
| |
| // Check if oldest node has changed. |
| if (!oldestNode.get().equals(loc)) { |
| match = false; |
| |
| synchronized (mux) { |
| // Double check under the mutex: onNodeLeft() may have just reassigned the coordinator to this node. |
| if (oldestNode.get().equals(loc)) |
| match = true; |
| } |
| } |
| |
| if (match) { |
| boolean allReceived; |
| |
| synchronized (rcvdIds) { |
| if (rcvdIds.add(nodeId)) |
| updatePartitionSingleMap(msg); |
| |
| allReceived = allReceived(); |
| } |
| |
| // If got all replies, and initialization finished, and reply has not been sent yet. |
| if (allReceived && ready.get() && replied.compareAndSet(false, true)) { |
| spreadPartitions(); |
| |
| onDone(exchId.topologyVersion()); |
| } |
| else if (log.isDebugEnabled()) |
| log.debug("Exchange future full map is not sent [allReceived=" + allReceived() + |
| ", ready=" + ready + ", replied=" + replied.get() + ", init=" + init.get() + |
| ", fut=" + this + ']'); |
| } |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to initialize exchange future: " + this, e); |
| } |
| } |
| }); |
| } |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param retryCnt Number of retries. |
| */ |
| private void sendAllPartitions(final UUID nodeId, final int retryCnt) { |
| ClusterNode n = cctx.node(nodeId); |
| |
| try { |
| if (n != null) |
| sendAllPartitions(F.asList(n), exchId); |
| } |
| catch (IgniteCheckedException e) { |
| if (e instanceof ClusterTopologyCheckedException || !cctx.discovery().alive(n)) { |
| log.debug("Failed to send full partition map to node, node left grid " + |
| "[rmtNode=" + nodeId + ", exchangeId=" + exchId + ']'); |
| |
| return; |
| } |
| |
| if (retryCnt > 0) { |
| long timeout = cctx.gridConfig().getNetworkSendRetryDelay(); |
| |
| LT.error(log, e, "Failed to send full partition map to node (will retry after timeout) " + |
| "[node=" + nodeId + ", exchangeId=" + exchId + ", timeout=" + timeout + ']'); |
| |
| cctx.time().addTimeoutObject(new GridTimeoutObjectAdapter(timeout) { |
| @Override public void onTimeout() { |
| sendAllPartitions(nodeId, retryCnt - 1); |
| } |
| }); |
| } |
| else |
| U.error(log, "Failed to send full partition map [node=" + n + ", exchangeId=" + exchId + ']', e); |
| } |
| } |
| |
| /** |
| * @param nodeId Sender node ID. |
| * @param msg Full partition info. |
| */ |
| public void onReceive(final UUID nodeId, final GridDhtPartitionsFullMessage msg) { |
| assert msg != null; |
| |
| if (isDone()) { |
| if (log.isDebugEnabled()) |
| log.debug("Received message for finished future [msg=" + msg + ", fut=" + this + ']'); |
| |
| return; |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Received full partition map from node [nodeId=" + nodeId + ", msg=" + msg + ']'); |
| |
| assert exchId.topologyVersion().equals(msg.topologyVersion()); |
| |
| initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() { |
| @Override public void apply(IgniteInternalFuture<Boolean> t) { |
| ClusterNode curOldest = oldestNode.get(); |
| |
| if (!nodeId.equals(curOldest.id())) { |
| if (log.isDebugEnabled()) |
| log.debug("Received full partition map from unexpected node [oldest=" + curOldest.id() + |
| ", unexpectedNodeId=" + nodeId + ']'); |
| |
| ClusterNode snd = cctx.discovery().node(nodeId); |
| |
| if (snd == null) { |
| if (log.isDebugEnabled()) |
| log.debug("Sender node left grid, will ignore message from unexpected node [nodeId=" + nodeId + |
| ", exchId=" + msg.exchangeId() + ']'); |
| |
| return; |
| } |
| |
| // Will process message later if sender node becomes oldest node. |
| if (snd.order() > curOldest.order()) |
| fullMsgs.put(nodeId, msg); |
| |
| return; |
| } |
| |
| assert msg.exchangeId().equals(exchId); |
| |
| assert msg.lastVersion() != null; |
| |
| cctx.versions().onReceived(nodeId, msg.lastVersion()); |
| |
| updatePartitionFullMap(msg); |
| |
| onDone(exchId.topologyVersion()); |
| } |
| }); |
| } |
| |
| /** |
| * Updates partition maps for all caches mentioned in the full message. |
| * |
| * @param msg Partitions full message. |
| */ |
| private void updatePartitionFullMap(GridDhtPartitionsFullMessage msg) { |
| for (Map.Entry<Integer, GridDhtPartitionFullMap> entry : msg.partitions().entrySet()) { |
| Integer cacheId = entry.getKey(); |
| |
| GridCacheContext cacheCtx = cctx.cacheContext(cacheId); |
| |
| if (cacheCtx != null) |
| cacheCtx.topology().update(exchId, entry.getValue()); |
| else { |
| ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx, AffinityTopologyVersion.NONE); |
| |
| if (oldest != null && oldest.isLocal()) |
| cctx.exchange().clientTopology(cacheId, this).update(exchId, entry.getValue()); |
| } |
| } |
| } |
| |
| /** |
| * Updates partition maps for all caches mentioned in the single message. |
| * |
| * @param msg Partitions single message. |
| */ |
| private void updatePartitionSingleMap(GridDhtPartitionsSingleMessage msg) { |
| for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) { |
| Integer cacheId = entry.getKey(); |
| GridCacheContext cacheCtx = cctx.cacheContext(cacheId); |
| |
| GridDhtPartitionTopology top = cacheCtx != null ? cacheCtx.topology() : |
| cctx.exchange().clientTopology(cacheId, this); |
| |
| top.update(exchId, entry.getValue()); |
| } |
| } |
| |
| /** |
| * @param nodeId Left node id. |
| */ |
| public void onNodeLeft(final UUID nodeId) { |
| if (isDone()) |
| return; |
| |
| if (!enterBusy()) |
| return; |
| |
| try { |
| // Wait for initialization part of this future to complete. |
| initFut.listen(new CI1<IgniteInternalFuture<?>>() { |
| @Override public void apply(IgniteInternalFuture<?> f) { |
| if (isDone()) |
| return; |
| |
| if (!enterBusy()) |
| return; |
| |
| try { |
| // Pretend to have received message from this node. |
| rcvdIds.add(nodeId); |
| |
| Collection<UUID> rmtIds = GridDhtPartitionsExchangeFuture.this.rmtIds; |
| |
| assert rmtIds != null; |
| |
| ClusterNode oldest = oldestNode.get(); |
| |
| if (oldest.id().equals(nodeId)) { |
| if (log.isDebugEnabled()) |
| log.debug("Oldest node left or failed on partition exchange " + |
| "(will restart exchange process)) [oldestNodeId=" + oldest.id() + |
| ", exchangeId=" + exchId + ']'); |
| |
| boolean set = false; |
| |
| ClusterNode newOldest = CU.oldestAliveCacheServerNode(cctx, exchId.topologyVersion()); |
| |
| if (newOldest != null) { |
| // If local node is now oldest. |
| if (newOldest.id().equals(cctx.localNodeId())) { |
| synchronized (mux) { |
| if (oldestNode.compareAndSet(oldest, newOldest)) { |
| // If local node is just joining. |
| if (exchId.nodeId().equals(cctx.localNodeId())) { |
| try { |
| for (GridCacheContext cacheCtx : cctx.cacheContexts()) { |
| if (!cacheCtx.isLocal()) |
| cacheCtx.topology().beforeExchange( |
| GridDhtPartitionsExchangeFuture.this); |
| } |
| } |
| catch (IgniteCheckedException e) { |
| onDone(e); |
| |
| return; |
| } |
| } |
| |
| set = true; |
| } |
| } |
| } |
| else { |
| synchronized (mux) { |
| set = oldestNode.compareAndSet(oldest, newOldest); |
| } |
| |
| if (set && log.isDebugEnabled()) |
| log.debug("Reassigned oldest node [this=" + cctx.localNodeId() + |
| ", old=" + oldest.id() + ", new=" + newOldest.id() + ']'); |
| } |
| } |
| |
| if (set) { |
| // If received any messages, process them. |
| for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> m : singleMsgs.entrySet()) |
| onReceive(m.getKey(), m.getValue()); |
| |
| for (Map.Entry<UUID, GridDhtPartitionsFullMessage> m : fullMsgs.entrySet()) |
| onReceive(m.getKey(), m.getValue()); |
| |
| // Reassign oldest node and resend. |
| recheck(); |
| } |
| } |
| else if (rmtIds.contains(nodeId)) { |
| if (log.isDebugEnabled()) |
| log.debug("Remote node left of failed during partition exchange (will ignore) " + |
| "[rmtNode=" + nodeId + ", exchangeId=" + exchId + ']'); |
| |
| assert rmtNodes != null; |
| |
| for (Iterator<ClusterNode> it = rmtNodes.iterator(); it.hasNext(); ) { |
| if (it.next().id().equals(nodeId)) |
| it.remove(); |
| } |
| |
| if (allReceived() && ready.get() && replied.compareAndSet(false, true)) |
| if (spreadPartitions()) |
| onDone(exchId.topologyVersion()); |
| } |
| } |
| finally { |
| leaveBusy(); |
| } |
| } |
| }); |
| } |
| finally { |
| leaveBusy(); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private void recheck() { |
| // If this is the oldest node. |
| if (oldestNode.get().id().equals(cctx.localNodeId())) { |
| Collection<UUID> remaining = remaining(); |
| |
| if (!remaining.isEmpty()) { |
| try { |
| cctx.io().safeSend(cctx.discovery().nodes(remaining), |
| new GridDhtPartitionsSingleRequest(exchId), SYSTEM_POOL, null); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to request partitions from nodes [exchangeId=" + exchId + |
| ", nodes=" + remaining + ']', e); |
| } |
| } |
| // Resend full partition map because last attempt failed. |
| else { |
| if (spreadPartitions()) |
| onDone(exchId.topologyVersion()); |
| } |
| } |
| else |
| sendPartitions(); |
| |
| // Schedule another send. |
| scheduleRecheck(); |
| } |
| |
| /** |
| * |
| */ |
| private void scheduleRecheck() { |
| if (!isDone()) { |
| GridTimeoutObject old = timeoutObj; |
| |
| if (old != null) |
| cctx.kernalContext().timeout().removeTimeoutObject(old); |
| |
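| // Recheck period: network timeout multiplied by the number of configured caches (at least 1). |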
| GridTimeoutObject timeoutObj = new GridTimeoutObjectAdapter( |
| cctx.gridConfig().getNetworkTimeout() * Math.max(1, cctx.gridConfig().getCacheConfiguration().length)) { |
| @Override public void onTimeout() { |
| cctx.kernalContext().closure().runLocalSafe(new Runnable() { |
| @Override public void run() { |
| if (isDone()) |
| return; |
| |
| if (!enterBusy()) |
| return; |
| |
| try { |
| U.warn(log, |
| "Retrying preload partition exchange due to timeout [done=" + isDone() + |
| ", dummy=" + dummy + ", exchId=" + exchId + ", rcvdIds=" + F.id8s(rcvdIds) + |
| ", rmtIds=" + F.id8s(rmtIds) + ", remaining=" + F.id8s(remaining()) + |
| ", init=" + init + ", initFut=" + initFut.isDone() + |
| ", ready=" + ready + ", replied=" + replied + ", added=" + added + |
| ", oldest=" + U.id8(oldestNode.get().id()) + ", oldestOrder=" + |
| oldestNode.get().order() + ", evtLatch=" + evtLatch.getCount() + |
| ", locNodeOrder=" + cctx.localNode().order() + |
| ", locNodeId=" + cctx.localNode().id() + ']', |
| "Retrying preload partition exchange due to timeout."); |
| |
| recheck(); |
| } |
| finally { |
| leaveBusy(); |
| } |
| } |
| }); |
| } |
| }; |
| |
| this.timeoutObj = timeoutObj; |
| |
| cctx.kernalContext().timeout().addTimeoutObject(timeoutObj); |
| } |
| } |
| |
| /** |
| * @return IDs of remote nodes from which replies have not yet been received. |
| */ |
| Collection<UUID> remaining() { |
| if (rmtIds == null) |
| return Collections.emptyList(); |
| |
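| // Drop all already-received IDs from the remote ID set. |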
| return F.lose(rmtIds, true, rcvdIds); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int compareTo(GridDhtPartitionsExchangeFuture fut) { |
| return exchId.compareTo(fut.exchId); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean equals(Object o) { |
| if (this == o) |
| return true; |
| |
| if (!(o instanceof GridDhtPartitionsExchangeFuture)) |
| return false; |
| |
| GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)o; |
| |
| return exchId.equals(fut.exchId); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int hashCode() { |
| return exchId.hashCode(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| ClusterNode oldestNode = this.oldestNode.get(); |
| |
| return S.toString(GridDhtPartitionsExchangeFuture.class, this, |
| "oldest", oldestNode == null ? "null" : oldestNode.id(), |
| "oldestOrder", oldestNode == null ? "null" : oldestNode.order(), |
| "evtLatch", evtLatch == null ? "null" : evtLatch.getCount(), |
| "remaining", remaining(), |
| "super", super.toString()); |
| } |
| } |