/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteDiagnosticAware;
import org.apache.ignite.internal.IgniteDiagnosticPrepareContext;
import org.apache.ignite.internal.IgniteFutureTimeoutCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.IgniteNeedReconnectException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.events.DiscoveryCustomEvent;
import org.apache.ignite.internal.managers.communication.GridIoPolicy;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
import org.apache.ignite.internal.pagemem.wal.record.ExchangeRecord;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.CacheAffinityChangeMessage;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.CacheGroupDescriptor;
import org.apache.ignite.internal.processors.cache.CachePartitionExchangeWorkerTask;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeFailureMessage;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeRequest;
import org.apache.ignite.internal.processors.cache.DynamicCacheDescriptor;
import org.apache.ignite.internal.processors.cache.ExchangeActions;
import org.apache.ignite.internal.processors.cache.ExchangeContext;
import org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate;
import org.apache.ignite.internal.processors.cache.GridCacheProcessor;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.StateChangeRequest;
import org.apache.ignite.internal.processors.cache.WalStateAbstractMessage;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFutureAdapter;
import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.latch.Latch;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridClientPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionsStateValidator;
import org.apache.ignite.internal.processors.cache.persistence.DatabaseLifecycleListener;
import org.apache.ignite.internal.processors.cache.persistence.snapshot.SnapshotDiscoveryMessage;
import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cluster.BaselineTopology;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.lang.IgniteRunnable;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT_LIMIT;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PARTITION_RELEASE_FUTURE_DUMP_THRESHOLD;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT;
import static org.apache.ignite.IgniteSystemProperties.getBoolean;
import static org.apache.ignite.IgniteSystemProperties.getLong;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_DYNAMIC_CACHE_START_ROLLBACK_SUPPORTED;
import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
import static org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents.serverJoinEvent;
import static org.apache.ignite.internal.processors.cache.ExchangeDiscoveryEvents.serverLeftEvent;
import static org.apache.ignite.internal.processors.cache.distributed.dht.preloader.CachePartitionPartialCountersMap.PARTIAL_COUNTERS_MAP_SINCE;
import static org.apache.ignite.internal.util.IgniteUtils.doInParallel;
/**
* Future for exchanging partition maps.
*/
@SuppressWarnings({"TypeMayBeWeakened", "unchecked"})
public class GridDhtPartitionsExchangeFuture extends GridDhtTopologyFutureAdapter
implements Comparable<GridDhtPartitionsExchangeFuture>, CachePartitionExchangeWorkerTask, IgniteDiagnosticAware {
/** */
public static final String EXCHANGE_LOG = "org.apache.ignite.internal.exchange.time";
/** */
private static final int RELEASE_FUTURE_DUMP_THRESHOLD =
IgniteSystemProperties.getInteger(IGNITE_PARTITION_RELEASE_FUTURE_DUMP_THRESHOLD, 0);
/** */
private static final IgniteProductVersion FORCE_AFF_REASSIGNMENT_SINCE = IgniteProductVersion.fromString("2.4.3");
/**
* This may be useful when per-entry (not per-cache based) partition policy is in use.
* See {@link IgniteSystemProperties#IGNITE_SKIP_PARTITION_SIZE_VALIDATION} for details.
* Default value is {@code false}.
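* Typically enabled with a JVM argument, e.g. {@code -DIGNITE_SKIP_PARTITION_SIZE_VALIDATION=true}.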
*/
private static final boolean SKIP_PARTITION_SIZE_VALIDATION = Boolean.getBoolean(IgniteSystemProperties.IGNITE_SKIP_PARTITION_SIZE_VALIDATION);
/** */
private static final String DISTRIBUTED_LATCH_ID = "exchange";
/** */
@GridToStringExclude
private final Object mux = new Object();
/** */
@GridToStringExclude
private volatile DiscoCache firstEvtDiscoCache;
/** Discovery event that triggered this exchange. */
private volatile DiscoveryEvent firstDiscoEvt;
/** */
@GridToStringExclude
private final Set<UUID> remaining = new HashSet<>();
/** Guarded by this */
@GridToStringExclude
private int pendingSingleUpdates;
/** */
@GridToStringExclude
private List<ClusterNode> srvNodes;
/** */
private volatile ClusterNode crd;
/** ExchangeFuture id. */
private final GridDhtPartitionExchangeId exchId;
/** Cache context. */
private final GridCacheSharedContext<?, ?> cctx;
/** Busy lock to prevent activities from accessing exchanger while it's stopping. */
private ReadWriteLock busyLock;
/** */
private AtomicBoolean added = new AtomicBoolean(false);
/**
* Discovery event receive latch. There is a race between discovery event processing and single message
* processing, so it is possible to create an exchange future before the actual discovery event is received.
* This latch is notified when the discovery event arrives.
*/
@GridToStringExclude
private final CountDownLatch evtLatch = new CountDownLatch(1);
/** Exchange future init method completes this future. */
private GridFutureAdapter<Boolean> initFut;
/** */
@GridToStringExclude
private final List<IgniteRunnable> discoEvts = new ArrayList<>();
/** */
private boolean init;
/** Last committed cache version before next topology version use. */
private AtomicReference<GridCacheVersion> lastVer = new AtomicReference<>();
/**
* Message received from a node joining the cluster (if this is a 'node join' exchange);
* needed if this exchange is merged with another one.
*/
@GridToStringExclude
private GridDhtPartitionsSingleMessage pendingJoinMsg;
/**
* Messages received on a non-coordinator node are stored in case this node
* becomes the coordinator.
*/
private final Map<UUID, GridDhtPartitionsSingleMessage> pendingSingleMsgs = new ConcurrentHashMap<>();
/** Messages received from new coordinator. */
private final Map<ClusterNode, GridDhtPartitionsFullMessage> fullMsgs = new ConcurrentHashMap<>();
/** */
@GridToStringInclude
private volatile IgniteInternalFuture<?> partReleaseFut;
/** Logger. */
private final IgniteLogger log;
/** Cache change requests. */
private ExchangeActions exchActions;
/** */
private final IgniteLogger exchLog;
/** */
private CacheAffinityChangeMessage affChangeMsg;
/** Init timestamp. Used to track the amount of time spent to complete the future. */
private long initTs;
/**
* Centralized affinity assignment required. Activated when a node has left or failed. In this mode the
* coordinator sends full partition maps to nodes over discovery (ring) instead of communication.
*/
private boolean centralizedAff;
/**
* Enforce affinity reassignment based on actual partition distribution. This mode should be used when partitions
* might not be distributed according to the affinity assignment.
*/
private boolean forceAffReassignment;
/** Exception that was thrown during init phase on local node. */
private Exception exchangeLocE;
/** Exchange exceptions from all participating nodes. */
private final Map<UUID, Exception> exchangeGlobalExceptions = new ConcurrentHashMap<>();
/** Used to track the fact that {@link DynamicCacheChangeFailureMessage} was sent. */
private volatile boolean cacheChangeFailureMsgSent;
/** */
private ConcurrentMap<UUID, GridDhtPartitionsSingleMessage> msgs = new ConcurrentHashMap<>();
/** Single messages from merged 'node join' exchanges. */
@GridToStringExclude
private Map<UUID, GridDhtPartitionsSingleMessage> mergedJoinExchMsgs;
/** Number of awaited messages for merged 'node join' exchanges. */
@GridToStringExclude
private int awaitMergedMsgs;
/** */
@GridToStringExclude
private volatile IgniteDhtPartitionHistorySuppliersMap partHistSuppliers = new IgniteDhtPartitionHistorySuppliersMap();
/** */
private volatile Map<Integer, Map<Integer, Long>> partHistReserved;
/** */
@GridToStringExclude
private final IgniteDhtPartitionsToReloadMap partsToReload = new IgniteDhtPartitionsToReloadMap();
/** */
private final AtomicBoolean done = new AtomicBoolean();
/** */
private ExchangeLocalState state;
/** */
@GridToStringExclude
private ExchangeContext exchCtx;
/** */
@GridToStringExclude
private FinishState finishState;
/** Initialized when node becomes new coordinator. */
@GridToStringExclude
private InitNewCoordinatorFuture newCrdFut;
/** */
@GridToStringExclude
private GridDhtPartitionsExchangeFuture mergedWith;
/** Validator for partition states. */
@GridToStringExclude
private final GridDhtPartitionsStateValidator validator;
/** Register caches future. Initialized on exchange init. Must be awaited at exchange end. */
private IgniteInternalFuture<?> registerCachesFuture;
/** Partitions sent flag (for coordinator node). */
private volatile boolean partitionsSent;
/** Partitions received flag (for non-coordinator node). */
private volatile boolean partitionsReceived;
/** Latest (by update sequences) full message with exchangeId == null; needs to be processed right after the future is done. */
private GridDhtPartitionsFullMessage delayedLatestMsg;
/** Future that is completed when all exchange listeners have finished. */
private final GridFutureAdapter<?> afterLsnrCompleteFut = new GridFutureAdapter<>();
/**
* @param cctx Cache context.
* @param busyLock Busy lock.
* @param exchId Exchange ID.
* @param exchActions Cache change requests.
* @param affChangeMsg Affinity change message.
*/
public GridDhtPartitionsExchangeFuture(
GridCacheSharedContext cctx,
ReadWriteLock busyLock,
GridDhtPartitionExchangeId exchId,
ExchangeActions exchActions,
CacheAffinityChangeMessage affChangeMsg
) {
assert busyLock != null;
assert exchId != null;
assert exchId.topologyVersion() != null;
assert exchActions == null || !exchActions.empty();
this.cctx = cctx;
this.busyLock = busyLock;
this.exchId = exchId;
this.exchActions = exchActions;
this.affChangeMsg = affChangeMsg;
this.validator = new GridDhtPartitionsStateValidator(cctx);
if (exchActions != null && exchActions.deactivate())
this.clusterIsActive = false;
log = cctx.logger(getClass());
exchLog = cctx.logger(EXCHANGE_LOG);
initFut = new GridFutureAdapter<Boolean>() {
@Override public IgniteLogger logger() {
return log;
}
};
if (log.isDebugEnabled())
log.debug("Creating exchange future [localNode=" + cctx.localNodeId() + ", fut=" + this + ']');
}
/**
* @return Future mutex.
*/
public Object mutex() {
return mux;
}
/**
* @return Shared cache context.
*/
public GridCacheSharedContext sharedContext() {
return cctx;
}
/** {@inheritDoc} */
@Override public boolean skipForExchangeMerge() {
return false;
}
/**
* @return Exchange context.
*/
public ExchangeContext context() {
assert exchCtx != null : this;
return exchCtx;
}
/**
* Sets exchange actions associated with the exchange future (such as cache start or stop).
* Exchange actions are created from the discovery event, so they must be set before the event is processed;
* therefore the setter requires that {@code evtLatch} is still armed.
*
* @param exchActions Exchange actions.
*/
public void exchangeActions(ExchangeActions exchActions) {
assert exchActions == null || !exchActions.empty() : exchActions;
assert evtLatch != null && evtLatch.getCount() == 1L : this;
this.exchActions = exchActions;
}
/**
* Gets exchange actions (such as cache start or stop) associated with the exchange future.
* Exchange actions can be {@code null} (for example, if the exchange is created for topology
* change event).
*
* @return Exchange actions.
*/
@Nullable public ExchangeActions exchangeActions() {
return exchActions;
}
/**
* Sets affinity change message associated with the exchange. Affinity change message is required when
* centralized affinity change is performed.
*
* @param affChangeMsg Affinity change message.
*/
public void affinityChangeMessage(CacheAffinityChangeMessage affChangeMsg) {
this.affChangeMsg = affChangeMsg;
}
/**
* Gets the affinity topology version for which this exchange was created. If several exchanges
* were merged, initial version is the version of the earliest merged exchange.
*
* @return Initial exchange version.
*/
@Override public AffinityTopologyVersion initialVersion() {
return exchId.topologyVersion();
}
/** {@inheritDoc} */
@Override public AffinityTopologyVersion topologyVersion() {
/*
Should not be called before exchange is finished since result version can change in
case of merged exchanges.
*/
assert exchangeDone() : "Should not be called before exchange is finished";
return isDone() ? result() : exchCtx.events().topologyVersion();
}
/**
* Retrieves the node which has WAL history since {@code cntrSince}.
*
* @param grpId Cache group ID.
* @param partId Partition ID.
* @param cntrSince Partition update counter starting from which history supply is requested.
* @return ID of history supplier node or null if it doesn't exist.
*/
@Nullable public UUID partitionHistorySupplier(int grpId, int partId, long cntrSince) {
return partHistSuppliers.getSupplier(grpId, partId, cntrSince);
}
/**
* @param cacheId Cache ID.
* @param rcvdFrom Node ID cache was received from.
* @return {@code True} if cache was added during this exchange.
*/
public boolean cacheAddedOnExchange(int cacheId, UUID rcvdFrom) {
return dynamicCacheStarted(cacheId) || exchCtx.events().nodeJoined(rcvdFrom);
}
/**
* @param grpId Cache group ID.
* @param rcvdFrom Node ID cache group was received from.
* @return {@code True} if cache group was added during this exchange.
*/
public boolean cacheGroupAddedOnExchange(int grpId, UUID rcvdFrom) {
return dynamicCacheGroupStarted(grpId) || exchCtx.events().nodeJoined(rcvdFrom);
}
/**
* @param cacheId Cache ID.
* @return {@code True} if non-client cache was added during this exchange.
*/
private boolean dynamicCacheStarted(int cacheId) {
return exchActions != null && exchActions.cacheStarted(cacheId);
}
/**
* @param grpId Cache group ID.
* @return {@code True} if non-client cache group was added during this exchange.
*/
public boolean dynamicCacheGroupStarted(int grpId) {
return exchActions != null && exchActions.cacheGroupStarting(grpId);
}
/**
* @return {@code True} if this call was the first to mark the future as added.
*/
public boolean onAdded() {
return added.compareAndSet(false, true);
}
/**
* Event callback.
*
* @param exchId Exchange ID.
* @param discoEvt Discovery event.
* @param discoCache Discovery data cache.
*/
public void onEvent(GridDhtPartitionExchangeId exchId, DiscoveryEvent discoEvt, DiscoCache discoCache) {
assert exchId.equals(this.exchId);
this.exchId.discoveryEvent(discoEvt);
this.firstDiscoEvt = discoEvt;
this.firstEvtDiscoCache = discoCache;
evtLatch.countDown();
}
/**
* @return {@code True} if cluster state change exchange.
*/
private boolean stateChangeExchange() {
return exchActions != null && exchActions.stateChangeRequest() != null;
}
/**
* @return {@code True} if this exchange was triggered by DynamicCacheChangeBatch message
* in order to start cache(s).
*/
private boolean dynamicCacheStartExchange() {
return exchActions != null && !exchActions.cacheStartRequests().isEmpty()
&& exchActions.cacheStopRequests().isEmpty();
}
/**
* @param cacheOrGroupName Group or cache name for reset lost partitions.
* @return {@code True} if reset lost partition exchange.
*/
public boolean resetLostPartitionFor(String cacheOrGroupName) {
return exchActions != null && exchActions.cachesToResetLostPartitions().contains(cacheOrGroupName);
}
/**
* @return {@code True} if activate cluster exchange.
*/
public boolean activateCluster() {
return exchActions != null && exchActions.activate();
}
/**
* @return {@code True} if deactivate cluster exchange.
*/
private boolean deactivateCluster() {
return exchActions != null && exchActions.deactivate();
}
/** */
public boolean changedBaseline() {
return exchActions != null && exchActions.changedBaseline();
}
/** {@inheritDoc} */
@Override public boolean changedAffinity() {
DiscoveryEvent firstDiscoEvt0 = firstDiscoEvt;
assert firstDiscoEvt0 != null;
return firstDiscoEvt0.type() == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT
|| !firstDiscoEvt0.eventNode().isClient() || firstDiscoEvt0.eventNode().isLocal();
}
/**
* @return {@code True} if there are caches to start.
*/
public boolean hasCachesToStart() {
return exchActions != null && !exchActions.cacheStartRequests().isEmpty();
}
/**
* @return First discovery event.
*/
public DiscoveryEvent firstEvent() {
return firstDiscoEvt;
}
/**
* @return Discovery cache for first event.
*/
public DiscoCache firstEventCache() {
return firstEvtDiscoCache;
}
/**
* @return Events processed in this exchange.
*/
public ExchangeDiscoveryEvents events() {
return exchCtx.events();
}
/**
* @return Exchange ID.
*/
public GridDhtPartitionExchangeId exchangeId() {
return exchId;
}
/**
* @return {@code true} if entered to busy state.
*/
private boolean enterBusy() {
if (busyLock.readLock().tryLock())
return true;
if (log.isDebugEnabled())
log.debug("Failed to enter busy state (exchanger is stopping): " + this);
return false;
}
/**
*
*/
private void leaveBusy() {
busyLock.readLock().unlock();
}
/**
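* Initializes affinity caches on this node if it has become the new exchange coordinator.
*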
* @param newCrd {@code True} if node became the coordinator on this exchange.
* @throws IgniteCheckedException If failed.
*/
private void initCoordinatorCaches(boolean newCrd) throws IgniteCheckedException {
if (newCrd) {
IgniteInternalFuture<?> fut = cctx.affinity().initCoordinatorCaches(this, false);
if (fut != null) {
fut.get();
cctx.exchange().exchangerUpdateHeartbeat();
}
cctx.exchange().onCoordinatorInitialized();
cctx.exchange().exchangerUpdateHeartbeat();
}
}
/**
* Starts exchange initialization activity.
*
* @param newCrd {@code True} if node became the coordinator on this exchange.
* @throws IgniteInterruptedCheckedException If interrupted.
*/
public void init(boolean newCrd) throws IgniteInterruptedCheckedException {
if (isDone())
return;
assert !cctx.kernalContext().isDaemon();
initTs = U.currentTimeMillis();
cctx.exchange().exchangerBlockingSectionBegin();
try {
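// Block until the discovery event that triggered this exchange has been received (see evtLatch).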
U.await(evtLatch);
}
finally {
cctx.exchange().exchangerBlockingSectionEnd();
}
assert firstDiscoEvt != null : this;
assert exchId.nodeId().equals(firstDiscoEvt.eventNode().id()) : this;
try {
AffinityTopologyVersion topVer = initialVersion();
srvNodes = new ArrayList<>(firstEvtDiscoCache.serverNodes());
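// The coordinator is the first (oldest) node in the server nodes list; the other server nodes are
// tracked in 'remaining' until their single partition map messages arrive.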
remaining.addAll(F.nodeIds(F.view(srvNodes, F.remoteNodes(cctx.localNodeId()))));
crd = srvNodes.isEmpty() ? null : srvNodes.get(0);
boolean crdNode = crd != null && crd.isLocal();
exchCtx = new ExchangeContext(crdNode, this);
cctx.exchange().exchangerBlockingSectionBegin();
assert state == null : state;
if (crdNode)
state = ExchangeLocalState.CRD;
else
state = cctx.kernalContext().clientNode() ? ExchangeLocalState.CLIENT : ExchangeLocalState.SRV;
if (exchLog.isInfoEnabled()) {
exchLog.info("Started exchange init [topVer=" + topVer +
", crd=" + crdNode +
", evt=" + IgniteUtils.gridEventName(firstDiscoEvt.type()) +
", evtNode=" + firstDiscoEvt.eventNode().id() +
", customEvt=" + (firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT ? ((DiscoveryCustomEvent)firstDiscoEvt).customMessage() : null) +
", allowMerge=" + exchCtx.mergeExchanges() + ']');
}
ExchangeType exchange;
if (firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
assert !exchCtx.mergeExchanges();
DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)firstDiscoEvt).customMessage();
forceAffReassignment = DiscoveryCustomEvent.requiresCentralizedAffinityAssignment(msg)
&& firstEventCache().minimumNodeVersion().compareToIgnoreTimestamp(FORCE_AFF_REASSIGNMENT_SINCE) >= 0;
if (msg instanceof ChangeGlobalStateMessage) {
assert exchActions != null && !exchActions.empty();
exchange = onClusterStateChangeRequest(crdNode);
}
else if (msg instanceof DynamicCacheChangeBatch) {
assert exchActions != null && !exchActions.empty();
exchange = onCacheChangeRequest(crdNode);
}
else if (msg instanceof SnapshotDiscoveryMessage) {
exchange = onCustomMessageNoAffinityChange(crdNode);
}
else if (msg instanceof WalStateAbstractMessage)
exchange = onCustomMessageNoAffinityChange(crdNode);
else {
assert affChangeMsg != null : this;
exchange = onAffinityChangeRequest(crdNode);
}
if (forceAffReassignment)
cctx.affinity().onCentralizedAffinityChange(this, crdNode);
initCoordinatorCaches(newCrd);
}
else {
if (firstDiscoEvt.type() == EVT_NODE_JOINED) {
if (!firstDiscoEvt.eventNode().isLocal()) {
Collection<DynamicCacheDescriptor> receivedCaches = cctx.cache().startReceivedCaches(
firstDiscoEvt.eventNode().id(),
topVer);
registerCachesFuture = cctx.affinity().initStartedCaches(crdNode, this, receivedCaches);
}
else
registerCachesFuture = initCachesOnLocalJoin();
}
initCoordinatorCaches(newCrd);
if (exchCtx.mergeExchanges()) {
if (localJoinExchange()) {
if (cctx.kernalContext().clientNode()) {
onClientNodeEvent(crdNode);
exchange = ExchangeType.CLIENT;
}
else {
onServerNodeEvent(crdNode);
exchange = ExchangeType.ALL;
}
}
else {
if (firstDiscoEvt.eventNode().isClient())
exchange = onClientNodeEvent(crdNode);
else
exchange = cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL;
}
if (exchId.isLeft())
onLeft();
}
else {
exchange = firstDiscoEvt.eventNode().isClient() ? onClientNodeEvent(crdNode) :
onServerNodeEvent(crdNode);
}
}
cctx.cache().registrateProxyRestart(resolveCacheRequests(exchActions), afterLsnrCompleteFut);
updateTopologies(crdNode);
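// ALL - full distributed exchange, CLIENT - client-only exchange,
// NONE - no exchange messaging is needed, the future completes right after topology initialization.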
switch (exchange) {
case ALL: {
distributedExchange();
break;
}
case CLIENT: {
if (!exchCtx.mergeExchanges() && exchCtx.fetchAffinityOnJoin())
initTopologies();
clientOnlyExchange();
break;
}
case NONE: {
initTopologies();
onDone(topVer);
break;
}
default:
assert false;
}
if (cctx.localNode().isClient()) {
cctx.exchange().exchangerBlockingSectionBegin();
try {
tryToPerformLocalSnapshotOperation();
}
finally {
cctx.exchange().exchangerBlockingSectionEnd();
}
}
if (exchLog.isInfoEnabled())
exchLog.info("Finished exchange init [topVer=" + topVer + ", crd=" + crdNode + ']');
}
catch (IgniteInterruptedCheckedException e) {
onDone(e);
throw e;
}
catch (IgniteNeedReconnectException e) {
onDone(e);
}
catch (Throwable e) {
if (reconnectOnError(e))
onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
else {
U.error(log, "Failed to reinitialize local partitions (rebalancing will be stopped): " + exchId, e);
onDone(e);
}
if (e instanceof Error)
throw (Error)e;
}
}
/**
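* Initializes caches when the exchange was triggered by the local node joining the cluster.
*
* @return Caches registration future.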
* @throws IgniteCheckedException If failed.
*/
private IgniteInternalFuture<?> initCachesOnLocalJoin() throws IgniteCheckedException {
if (!cctx.kernalContext().clientNode() && !isLocalNodeInBaseline()) {
cctx.exchange().exchangerBlockingSectionBegin();
try {
List<DatabaseLifecycleListener> listeners = cctx.kernalContext().internalSubscriptionProcessor()
.getDatabaseListeners();
for (DatabaseLifecycleListener lsnr : listeners)
lsnr.onBaselineChange();
}
finally {
cctx.exchange().exchangerBlockingSectionEnd();
}
}
cctx.exchange().exchangerBlockingSectionBegin();
try {
cctx.activate();
}
finally {
cctx.exchange().exchangerBlockingSectionEnd();
}
IgniteInternalFuture<?> cachesRegistrationFut = cctx.cache().startCachesOnLocalJoin(initialVersion(),
exchActions == null ? null : exchActions.localJoinContext());
if (!cctx.kernalContext().clientNode())
cctx.cache().shutdownNotFinishedRecoveryCaches();
ensureClientCachesStarted();
return cachesRegistrationFut;
}
/**
* Start client caches if absent.
*/
private void ensureClientCachesStarted() {
GridCacheProcessor cacheProcessor = cctx.cache();
Set<String> cacheNames = new HashSet<>(cacheProcessor.cacheNames());
List<CacheConfiguration> notStartedCacheConfigs = new ArrayList<>();
for (CacheConfiguration cCfg : cctx.gridConfig().getCacheConfiguration()) {
if (!cacheNames.contains(cCfg.getName()) && !GridCacheUtils.isCacheTemplateName(cCfg.getName()))
notStartedCacheConfigs.add(cCfg);
}
if (!notStartedCacheConfigs.isEmpty())
cacheProcessor.dynamicStartCaches(notStartedCacheConfigs, false, false, false);
}
/**
* @return {@code true} if local node is in baseline and {@code false} otherwise.
*/
private boolean isLocalNodeInBaseline() {
BaselineTopology topology = cctx.discovery().discoCache().state().baselineTopology();
return topology != null && topology.consistentIds().contains(cctx.localNode().consistentId());
}
/**
* @throws IgniteCheckedException If failed.
*/
private void initTopologies() throws IgniteCheckedException {
cctx.database().checkpointReadLock();
try {
if (crd != null) {
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
if (grp.isLocal())
continue;
grp.topology().beforeExchange(this, !centralizedAff && !forceAffReassignment, false);
cctx.exchange().exchangerUpdateHeartbeat();
}
}
}
finally {
cctx.database().checkpointReadUnlock();
}
}
/**
* Updates topology versions and discovery caches on all topologies.
*
* @param crd Coordinator flag.
* @throws IgniteCheckedException If failed.
*/
private void updateTopologies(boolean crd) throws IgniteCheckedException {
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
if (grp.isLocal())
continue;
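// If a client-only topology was previously maintained for this group, the coordinator copies its
// partition map and update counters into the real group topology below.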
GridClientPartitionTopology clientTop = cctx.exchange().clearClientTopology(grp.groupId());
long updSeq = clientTop == null ? -1 : clientTop.lastUpdateSequence();
GridDhtPartitionTopology top = grp.topology();
if (crd) {
boolean updateTop = exchId.topologyVersion().equals(grp.localStartVersion());
if (updateTop && clientTop != null) {
cctx.exchange().exchangerBlockingSectionBegin();
try {
top.update(null,
clientTop.partitionMap(true),
clientTop.fullUpdateCounters(),
Collections.emptySet(),
null,
null);
}
finally {
cctx.exchange().exchangerBlockingSectionEnd();
}
}
}
cctx.exchange().exchangerBlockingSectionBegin();
try {
top.updateTopologyVersion(
this,
events().discoveryCache(),
updSeq,
cacheGroupStopping(grp.groupId()));
}
finally {
cctx.exchange().exchangerBlockingSectionEnd();
}
}
cctx.exchange().exchangerBlockingSectionBegin();
try {
for (GridClientPartitionTopology top : cctx.exchange().clientTopologies()) {
top.updateTopologyVersion(this,
events().discoveryCache(),
-1,
cacheGroupStopping(top.groupId()));
}
}
finally {
cctx.exchange().exchangerBlockingSectionEnd();
}
}
/**
* @param crd Coordinator flag.
* @return Exchange type.
*/
private ExchangeType onClusterStateChangeRequest(boolean crd) {
assert exchActions != null && !exchActions.empty() : this;
StateChangeRequest req = exchActions.stateChangeRequest();
assert req != null : exchActions;
DiscoveryDataClusterState state = cctx.kernalContext().state().clusterState();
if (state.transitionError() != null)
exchangeLocE = state.transitionError();
if (req.activeChanged()) {
if (req.activate()) {
if (log.isInfoEnabled()) {
log.info("Start activation process [nodeId=" + cctx.localNodeId() +
", client=" + cctx.kernalContext().clientNode() +
", topVer=" + initialVersion() + "]");
}
try {
cctx.exchange().exchangerBlockingSectionBegin();
try {
cctx.activate();
}
finally {
cctx.exchange().exchangerBlockingSectionEnd();
}
assert registerCachesFuture == null : "No caches registration should be scheduled before new caches have started.";
cctx.exchange().exchangerBlockingSectionBegin();
try {
registerCachesFuture = cctx.affinity().onCacheChangeRequest(this, crd, exchActions);
if (!cctx.kernalContext().clientNode())
cctx.cache().shutdownNotFinishedRecoveryCaches();
}
finally {
cctx.exchange().exchangerBlockingSectionEnd();
}
if (log.isInfoEnabled()) {
log.info("Successfully activated caches [nodeId=" + cctx.localNodeId() +
", client=" + cctx.kernalContext().clientNode() +
", topVer=" + initialVersion() + "]");
}
}
catch (Exception e) {
U.error(log, "Failed to activate node components [nodeId=" + cctx.localNodeId() +
", client=" + cctx.kernalContext().clientNode() +
", topVer=" + initialVersion() + "]", e);
exchangeLocE = e;
if (crd) {
cctx.exchange().exchangerBlockingSectionBegin();
try {
synchronized (mux) {
exchangeGlobalExceptions.put(cctx.localNodeId(), e);
}
}
finally {
cctx.exchange().exchangerBlockingSectionEnd();
}
}
}
}
else {
if (log.isInfoEnabled()) {
log.info("Start deactivation process [nodeId=" + cctx.localNodeId() +
", client=" + cctx.kernalContext().clientNode() +
", topVer=" + initialVersion() + "]");
}
cctx.exchange().exchangerBlockingSectionBegin();
try {
cctx.kernalContext().dataStructures().onDeActivate(cctx.kernalContext());
cctx.kernalContext().service().onDeActivate(cctx.kernalContext());
assert registerCachesFuture == null : "No caches registration should be scheduled before new caches have started.";
registerCachesFuture = cctx.affinity().onCacheChangeRequest(this, crd, exchActions);
cctx.kernalContext().encryption().onDeActivate(cctx.kernalContext());
if (log.isInfoEnabled()) {
log.info("Successfully deactivated data structures, services and caches [" +
"nodeId=" + cctx.localNodeId() +
", client=" + cctx.kernalContext().clientNode() +
", topVer=" + initialVersion() + "]");
}
}
catch (Exception e) {
U.error(log, "Failed to deactivate node components [nodeId=" + cctx.localNodeId() +
", client=" + cctx.kernalContext().clientNode() +
", topVer=" + initialVersion() + "]", e);
exchangeLocE = e;
}
finally {
cctx.exchange().exchangerBlockingSectionEnd();
}
}
}
else if (req.activate()) {
cctx.exchange().exchangerBlockingSectionBegin();
// TODO: BLT changes on inactive cluster can't be handled easily because persistent storage hasn't been initialized yet.
try {
if (!forceAffReassignment) {
// possible only if cluster contains nodes without forceAffReassignment mode
assert firstEventCache().minimumNodeVersion()
.compareToIgnoreTimestamp(FORCE_AFF_REASSIGNMENT_SINCE) < 0
: firstEventCache().minimumNodeVersion();
cctx.affinity().onBaselineTopologyChanged(this, crd);
}
if (CU.isPersistenceEnabled(cctx.kernalContext().config()) && !cctx.kernalContext().clientNode())
cctx.kernalContext().state().onBaselineTopologyChanged(req.baselineTopology(),
req.prevBaselineTopologyHistoryItem());
}
catch (Exception e) {
U.error(log, "Failed to change baseline topology [nodeId=" + cctx.localNodeId() +
", client=" + cctx.kernalContext().clientNode() +
", topVer=" + initialVersion() + "]", e);
exchangeLocE = e;
}
finally {
cctx.exchange().exchangerBlockingSectionEnd();
}
}
return cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL;
}
/**
* @param crd Coordinator flag.
* @return Exchange type.
* @throws IgniteCheckedException If failed.
*/
private ExchangeType onCacheChangeRequest(boolean crd) throws IgniteCheckedException {
assert exchActions != null && !exchActions.empty() : this;
assert !exchActions.clientOnlyExchange() : exchActions;
cctx.exchange().exchangerBlockingSectionBegin();
try {
assert registerCachesFuture == null : "No caches registration should be scheduled before new caches have started.";
registerCachesFuture = cctx.affinity().onCacheChangeRequest(this, crd, exchActions);
}
catch (Exception e) {
if (reconnectOnError(e) || !isRollbackSupported())
// This exception will be handled by init() method.
throw e;
U.error(log, "Failed to initialize cache(s) (will try to rollback) [exchId=" + exchId +
", caches=" + exchActions.cacheGroupsToStart() + ']', e);
exchangeLocE = new IgniteCheckedException(
"Failed to initialize exchange locally [locNodeId=" + cctx.localNodeId() + "]", e);
exchangeGlobalExceptions.put(cctx.localNodeId(), exchangeLocE);
}
finally {
cctx.exchange().exchangerBlockingSectionEnd();
}
return cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL;
}
/**
* @param crd Coordinator flag.
* @return Exchange type.
*/
private ExchangeType onCustomMessageNoAffinityChange(boolean crd) {
if (!forceAffReassignment)
cctx.affinity().onCustomMessageNoAffinityChange(this, crd, exchActions);
return cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL;
}
/**
* @param crd Coordinator flag.
* @throws IgniteCheckedException If failed.
* @return Exchange type.
*/
private ExchangeType onAffinityChangeRequest(boolean crd) throws IgniteCheckedException {
assert affChangeMsg != null : this;
cctx.affinity().onChangeAffinityMessage(this, crd, affChangeMsg);
if (cctx.kernalContext().clientNode())
return ExchangeType.CLIENT;
return ExchangeType.ALL;
}
/**
* @param crd Coordinator flag.
* @throws IgniteCheckedException If failed.
* @return Exchange type.
*/
private ExchangeType onClientNodeEvent(boolean crd) throws IgniteCheckedException {
assert firstDiscoEvt.eventNode().isClient() : this;
if (firstDiscoEvt.type() == EVT_NODE_LEFT || firstDiscoEvt.type() == EVT_NODE_FAILED) {
onLeft();
assert !firstDiscoEvt.eventNode().isLocal() : firstDiscoEvt;
}
else
assert firstDiscoEvt.type() == EVT_NODE_JOINED || firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT : firstDiscoEvt;
cctx.affinity().onClientEvent(this, crd);
return firstDiscoEvt.eventNode().isLocal() ? ExchangeType.CLIENT : ExchangeType.NONE;
}
/**
* @param crd Coordinator flag.
* @throws IgniteCheckedException If failed.
* @return Exchange type.
*/
private ExchangeType onServerNodeEvent(boolean crd) throws IgniteCheckedException {
assert !firstDiscoEvt.eventNode().isClient() : this;
if (firstDiscoEvt.type() == EVT_NODE_LEFT || firstDiscoEvt.type() == EVT_NODE_FAILED) {
onLeft();
exchCtx.events().warnNoAffinityNodes(cctx);
centralizedAff = cctx.affinity().onCentralizedAffinityChange(this, crd);
}
else
cctx.affinity().onServerJoin(this, crd);
return cctx.kernalContext().clientNode() ? ExchangeType.CLIENT : ExchangeType.ALL;
}
/**
* @throws IgniteCheckedException If failed.
*/
private void clientOnlyExchange() throws IgniteCheckedException {
if (crd != null) {
assert !crd.isLocal() : crd;
cctx.exchange().exchangerBlockingSectionBegin();
try {
if (!centralizedAff)
sendLocalPartitions(crd);
initDone();
}
finally {
cctx.exchange().exchangerBlockingSectionEnd();
}
}
else {
if (centralizedAff) { // Last server node failed.
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
GridAffinityAssignmentCache aff = grp.affinity();
aff.initialize(initialVersion(), aff.idealAssignment());
cctx.exchange().exchangerUpdateHeartbeat();
}
}
else
onAllServersLeft();
cctx.exchange().exchangerBlockingSectionBegin();
try {
onDone(initialVersion());
}
finally {
cctx.exchange().exchangerBlockingSectionEnd();
}
}
}
/**
* @throws IgniteCheckedException If failed.
*/
private void distributedExchange() throws IgniteCheckedException {
assert crd != null;
assert !cctx.kernalContext().clientNode();
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
if (grp.isLocal())
continue;
cctx.exchange().exchangerBlockingSectionBegin();
try {
grp.preloader().onTopologyChanged(this);
}
finally {
cctx.exchange().exchangerBlockingSectionEnd();
}
}
cctx.exchange().exchangerBlockingSectionBegin();
try {
cctx.database().releaseHistoryForPreloading();
// To correctly rebalance when persistence is enabled, it is necessary to reserve history within exchange.
partHistReserved = cctx.database().reserveHistoryForExchange();
}
finally {
cctx.exchange().exchangerBlockingSectionEnd();
}
// Skipping the wait on local join is possible only when all cluster nodes use the same protocol.
boolean skipWaitOnLocalJoin = cctx.exchange().latch().canSkipJoiningNodes(initialVersion())
&& localJoinExchange();
// Skip partition release if node has locally joined (it doesn't have any updates to be finished).
if (!skipWaitOnLocalJoin) {
boolean distributed = true;
// Do not perform distributed partition release in case of cluster activation.
if (activateCluster())
distributed = false;
// In the first phase we wait for all local tx updates, atomic updates and lock releases to finish on all nodes.
waitPartitionRelease(distributed, true);
// The second phase waits for the remaining tx updates from primary to backup nodes to finish after the first phase.
if (distributed)
waitPartitionRelease(false, false);
}
else {
if (log.isInfoEnabled())
log.info("Skipped waiting for partitions release future (local node is joining) " +
"[topVer=" + initialVersion() + "]");
}
boolean topChanged = firstDiscoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT || affChangeMsg != null;
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (cacheCtx.isLocal() || cacheStopping(cacheCtx.cacheId()))
continue;
if (topChanged) {
// Partition release future is done so we can flush the write-behind store.
cctx.exchange().exchangerBlockingSectionBegin();
try {
cacheCtx.store().forceFlush();
}
finally {
cctx.exchange().exchangerBlockingSectionEnd();
}
}
}
cctx.exchange().exchangerBlockingSectionBegin();
try {
/* It is necessary to run the database callback before all topology callbacks.
If the persistent store is enabled, we first restore the partitions present on disk.
We need to guarantee that there are no partition state changes logged to WAL before this callback
to make sure that we correctly restore the last actual states. */
cctx.database().beforeExchange(this);
}
finally {
cctx.exchange().exchangerBlockingSectionEnd();
}
// Pre-create missing partitions using current affinity.
if (!exchCtx.mergeExchanges()) {
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
if (grp.isLocal() || cacheGroupStopping(grp.groupId()))
continue;
// It is possible that affinity is not initialized yet if the node has just joined the cluster.
if (grp.affinity().lastVersion().topologyVersion() > 0) {
cctx.exchange().exchangerBlockingSectionBegin();
try {
grp.topology().beforeExchange(this, !centralizedAff && !forceAffReassignment, false);
}
finally {
cctx.exchange().exchangerBlockingSectionEnd();
}
}
}
}
// After all partitions have been restored and pre-created it's safe to make first checkpoint.
if (localJoinExchange() || activateCluster()) {
cctx.exchange().exchangerBlockingSectionBegin();
try {
cctx.database().onStateRestored(initialVersion());
}
finally {
cctx.exchange().exchangerBlockingSectionEnd();
}
}
changeWalModeIfNeeded();
if (events().hasServerLeft())
finalizePartitionCounters();
cctx.exchange().exchangerBlockingSectionBegin();
try {
if (crd.isLocal()) {
if (remaining.isEmpty())
onAllReceived(null);
}
else
sendPartitions(crd);
initDone();
}
finally {
cctx.exchange().exchangerBlockingSectionEnd();
}
}
/**
* Tries to start a local snapshot operation if it is required by the discovery event.
*/
private void tryToPerformLocalSnapshotOperation() {
try {
long start = U.currentTimeMillis();
IgniteInternalFuture fut = cctx.snapshot().tryStartLocalSnapshotOperation(firstDiscoEvt, exchId.topologyVersion());
if (fut != null) {
fut.get();
long end = U.currentTimeMillis();
if (log.isInfoEnabled())
log.info("Snapshot initialization completed [topVer=" + exchangeId().topologyVersion() +
", time=" + (end - start) + "ms]");
}
}
catch (IgniteCheckedException e) {
U.error(log, "Error while starting snapshot operation", e);
}
}
/**
* Change WAL mode if needed.
*/
private void changeWalModeIfNeeded() {
WalStateAbstractMessage msg = firstWalMessage();
if (msg != null) {
cctx.exchange().exchangerBlockingSectionBegin();
try {
cctx.walState().onProposeExchange(msg.exchangeMessage());
}
finally {
cctx.exchange().exchangerBlockingSectionEnd();
}
}
}
/**
* Gets the first custom message if and only if it is a WAL state message.
*
* @return WAL message or {@code null}.
*/
@Nullable private WalStateAbstractMessage firstWalMessage() {
if (firstDiscoEvt != null && firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
DiscoveryCustomMessage customMsg = ((DiscoveryCustomEvent)firstDiscoEvt).customMessage();
if (customMsg instanceof WalStateAbstractMessage) {
WalStateAbstractMessage msg0 = (WalStateAbstractMessage)customMsg;
assert msg0.needExchange();
return msg0;
}
}
return null;
}
/**
* The main purpose of this method is to wait for all ongoing updates (transactional and atomic), initiated on
* the previous topology version, to finish to prevent inconsistencies during rebalancing and to prevent two
* different simultaneous owners of the same lock.
* For the exact list of the objects being awaited for see
* {@link GridCacheSharedContext#partitionReleaseFuture(AffinityTopologyVersion)} javadoc.
*
* @param distributed If {@code true} then node should wait for partition release completion on all other nodes.
* @param doRollback If {@code true} tries to rollback transactions which lock partitions. Avoids unnecessary calls
* of {@link org.apache.ignite.internal.processors.cache.transactions.IgniteTxManager#rollbackOnTopologyChange}
*
* @throws IgniteCheckedException If failed.
*/
private void waitPartitionRelease(boolean distributed, boolean doRollback) throws IgniteCheckedException {
Latch releaseLatch = null;
IgniteInternalFuture<?> partReleaseFut;
cctx.exchange().exchangerBlockingSectionBegin();
try {
// Wait for other nodes only on first phase.
if (distributed)
releaseLatch = cctx.exchange().latch().getOrCreate(DISTRIBUTED_LATCH_ID, initialVersion());
partReleaseFut = cctx.partitionReleaseFuture(initialVersion());
// Assign to class variable so it will be included into toString() method.
this.partReleaseFut = partReleaseFut;
if (exchId.isLeft())
cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion());
}
finally {
cctx.exchange().exchangerBlockingSectionEnd();
}
if (log.isTraceEnabled())
log.trace("Before waiting for partition release future: " + this);
int dumpCnt = 0;
long nextDumpTime = 0;
IgniteConfiguration cfg = cctx.gridConfig();
long waitStart = U.currentTimeMillis();
long waitTimeout = 2 * cfg.getNetworkTimeout();
boolean txRolledBack = !doRollback;
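// Poll the partition release future, periodically dumping pending transactions and locks that block it.
// If txTimeoutOnPartitionMapExchange is configured and has expired, blocking transactions are rolled back once.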
while (true) {
// Read txTimeoutOnPME from configuration after every iteration.
long curTimeout = cfg.getTransactionConfiguration().getTxTimeoutOnPartitionMapExchange();
cctx.exchange().exchangerBlockingSectionBegin();
try {
// This avoids unnecessary waiting for rollback.
partReleaseFut.get(curTimeout > 0 && !txRolledBack ?
Math.min(curTimeout, waitTimeout) : waitTimeout, TimeUnit.MILLISECONDS);
break;
}
catch (IgniteFutureTimeoutCheckedException ignored) {
// Print pending transactions and locks that might have led to hang.
if (nextDumpTime <= U.currentTimeMillis()) {
dumpPendingObjects(partReleaseFut, curTimeout <= 0 && !txRolledBack);
nextDumpTime = U.currentTimeMillis() + nextDumpTimeout(dumpCnt++, waitTimeout);
}
if (!txRolledBack && curTimeout > 0 && U.currentTimeMillis() - waitStart >= curTimeout) {
txRolledBack = true;
cctx.tm().rollbackOnTopologyChange(initialVersion());
}
}
catch (IgniteCheckedException e) {
U.warn(log,"Unable to await partitions release future", e);
throw e;
}
finally {
cctx.exchange().exchangerBlockingSectionEnd();
}
}
long waitEnd = U.currentTimeMillis();
if (log.isInfoEnabled()) {
long waitTime = (waitEnd - waitStart);
String futInfo = RELEASE_FUTURE_DUMP_THRESHOLD > 0 && waitTime > RELEASE_FUTURE_DUMP_THRESHOLD ?
partReleaseFut.toString() : "NA";
String mode = distributed ? "DISTRIBUTED" : "LOCAL";
if (log.isInfoEnabled())
log.info("Finished waiting for partition release future [topVer=" + exchangeId().topologyVersion() +
", waitTime=" + (waitEnd - waitStart) + "ms, futInfo=" + futInfo + ", mode=" + mode + "]");
}
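// After partition release, wait until the remaining locks acquired on the outdated topology version are released.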
IgniteInternalFuture<?> locksFut = cctx.mvcc().finishLocks(exchId.topologyVersion());
nextDumpTime = 0;
dumpCnt = 0;
while (true) {
cctx.exchange().exchangerBlockingSectionBegin();
try {
locksFut.get(waitTimeout, TimeUnit.MILLISECONDS);
break;
}
catch (IgniteFutureTimeoutCheckedException ignored) {
if (nextDumpTime <= U.currentTimeMillis()) {
U.warn(log, "Failed to wait for locks release future. " +
"Dumping pending objects that might be the cause: " + cctx.localNodeId());
U.warn(log, "Locked keys:");
for (IgniteTxKey key : cctx.mvcc().lockedKeys())
U.warn(log, "Locked key: " + key);
for (IgniteTxKey key : cctx.mvcc().nearLockedKeys())
U.warn(log, "Locked near key: " + key);
Map<IgniteTxKey, Collection<GridCacheMvccCandidate>> locks =
cctx.mvcc().unfinishedLocks(exchId.topologyVersion());
for (Map.Entry<IgniteTxKey, Collection<GridCacheMvccCandidate>> e : locks.entrySet())
U.warn(log, "Awaited locked entry [key=" + e.getKey() + ", mvcc=" + e.getValue() + ']');
nextDumpTime = U.currentTimeMillis() + nextDumpTimeout(dumpCnt++, waitTimeout);
if (getBoolean(IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT, false))
U.dumpThreads(log);
}
}
finally {
cctx.exchange().exchangerBlockingSectionEnd();
}
}
if (releaseLatch == null) {
assert !distributed : "Partitions release latch must be initialized in distributed mode.";
return;
}
releaseLatch.countDown();
// For compatibility with older versions where joining nodes do not wait for the latch.
if (localJoinExchange() && !cctx.exchange().latch().canSkipJoiningNodes(initialVersion()))
return;
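// Wait until all participating nodes have counted the distributed latch down; on timeout, re-send the ack and keep waiting.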
try {
while (true) {
try {
cctx.exchange().exchangerBlockingSectionBegin();
try {
releaseLatch.await(waitTimeout, TimeUnit.MILLISECONDS);
}
finally {
cctx.exchange().exchangerBlockingSectionEnd();
}
if (log.isInfoEnabled())
log.info("Finished waiting for partitions release latch: " + releaseLatch);
break;
}
catch (IgniteFutureTimeoutCheckedException ignored) {
U.warn(log, "Unable to await partitions release latch within timeout: " + releaseLatch);
// Try to resend ack.
releaseLatch.countDown();
}
}
}
catch (IgniteCheckedException e) {
U.warn(log, "Stop waiting for partitions release latch: " + e.getMessage());
}
}
/**
*
*/
private void onLeft() {
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
if (grp.isLocal())
continue;
grp.preloader().unwindUndeploys();
cctx.exchange().exchangerUpdateHeartbeat();
}
cctx.mvcc().removeExplicitNodeLocks(exchId.nodeId(), exchId.topologyVersion());
}
/**
* @param partReleaseFut Partition release future.
* @param txTimeoutNotifyFlag If {@code true}, prints a hint to configure the transaction timeout on partition map exchange.
*/
private void dumpPendingObjects(IgniteInternalFuture<?> partReleaseFut, boolean txTimeoutNotifyFlag) {
U.warn(cctx.kernalContext().cluster().diagnosticLog(),
"Failed to wait for partition release future [topVer=" + initialVersion() +
", node=" + cctx.localNodeId() + "]");
if (txTimeoutNotifyFlag)
U.warn(cctx.kernalContext().cluster().diagnosticLog(), "Consider changing TransactionConfiguration." +
"txTimeoutOnPartitionMapExchange to non default value to avoid this message.");
U.warn(log, "Partition release future: " + partReleaseFut);
U.warn(cctx.kernalContext().cluster().diagnosticLog(),
"Dumping pending objects that might be the cause: ");
try {
cctx.exchange().dumpDebugInfo(this);
}
catch (Exception e) {
U.error(cctx.kernalContext().cluster().diagnosticLog(), "Failed to dump debug information: " + e, e);
}
}
/**
* @param grpId Cache group ID to check.
* @return {@code True} if cache group is being stopped by this exchange.
*/
private boolean cacheGroupStopping(int grpId) {
return exchActions != null && exchActions.cacheGroupStopping(grpId);
}
/**
* @param cacheId Cache ID to check.
* @return {@code True} if cache is being stopped by this exchange.
*/
private boolean cacheStopping(int cacheId) {
return exchActions != null && exchActions.cacheStopped(cacheId);
}
/**
* @return {@code True} if exchange for local node join.
*/
public boolean localJoinExchange() {
return firstDiscoEvt.type() == EVT_NODE_JOINED && firstDiscoEvt.eventNode().isLocal();
}
/**
* @param node Target node.
* @throws IgniteCheckedException If failed.
*/
private void sendLocalPartitions(ClusterNode node) throws IgniteCheckedException {
assert node != null;
long time = System.currentTimeMillis();
GridDhtPartitionsSingleMessage msg;
// Reset lost partitions before sending local partitions to coordinator.
if (exchActions != null) {
Set<String> caches = exchActions.cachesToResetLostPartitions();
if (!F.isEmpty(caches))
resetLostPartitions(caches, false);
}
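// Client nodes, as well as nodes where a dynamic cache start failed locally, send a lightweight
// single message; server nodes send their full local partition state.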
if (cctx.kernalContext().clientNode() || (dynamicCacheStartExchange() && exchangeLocE != null)) {
msg = new GridDhtPartitionsSingleMessage(exchangeId(),
cctx.kernalContext().clientNode(),
cctx.versions().last(),
true);
}
else {
msg = cctx.exchange().createPartitionsSingleMessage(exchangeId(),
false,
true,
node.version().compareToIgnoreTimestamp(PARTIAL_COUNTERS_MAP_SINCE) >= 0,
exchActions);
Map<Integer, Map<Integer, Long>> partHistReserved0 = partHistReserved;
if (partHistReserved0 != null)
msg.partitionHistoryCounters(partHistReserved0);
}
if ((stateChangeExchange() || dynamicCacheStartExchange()) && exchangeLocE != null)
msg.setError(exchangeLocE);
else if (localJoinExchange())
msg.cacheGroupsAffinityRequest(exchCtx.groupsAffinityRequestOnJoin());
if (log.isTraceEnabled())
log.trace("Sending local partitions [nodeId=" + node.id() + ", exchId=" + exchId + ", msg=" + msg + ']');
try {
cctx.io().send(node, msg, SYSTEM_POOL);
}
catch (ClusterTopologyCheckedException ignored) {
if (log.isDebugEnabled())
log.debug("Node left during partition exchange [nodeId=" + node.id() + ", exchId=" + exchId + ']');
}
if (log.isInfoEnabled())
log.info("Sending Single Message performed in " + (System.currentTimeMillis() - time) + " ms.");
}
/**
* @param compress Message compress flag.
* @param newCntrMap {@code True} if possible to use {@link CachePartitionFullCountersMap}.
* @return Message.
*/
private GridDhtPartitionsFullMessage createPartitionsMessage(boolean compress,
boolean newCntrMap) {
GridCacheVersion last = lastVer.get();
GridDhtPartitionsFullMessage m = cctx.exchange().createPartitionsFullMessage(
compress,
newCntrMap,
exchangeId(),
last != null ? last : cctx.versions().last(),
partHistSuppliers,
partsToReload);
if (stateChangeExchange() && !F.isEmpty(exchangeGlobalExceptions))
m.setErrorsMap(exchangeGlobalExceptions);
return m;
}
/**
* @param fullMsg Message to send.
* @param nodes Nodes.
* @param mergedJoinExchMsgs Messages received from merged 'join node' exchanges.
* @param affinityForJoinedNodes Affinity, if it was requested by some nodes.
*/
private void sendAllPartitions(
GridDhtPartitionsFullMessage fullMsg,
Collection<ClusterNode> nodes,
Map<UUID, GridDhtPartitionsSingleMessage> mergedJoinExchMsgs,
Map<Integer, CacheGroupAffinityMessage> affinityForJoinedNodes
) {
assert !nodes.contains(cctx.localNode());
if (log.isTraceEnabled()) {
log.trace("Sending full partition map [nodeIds=" + F.viewReadOnly(nodes, F.node2id()) +
", exchId=" + exchId + ", msg=" + fullMsg + ']');
}
// Find any single message with affinity request. This request exists only for newly joined nodes.
Optional<GridDhtPartitionsSingleMessage> singleMsgWithAffinityReq = nodes.stream()
.flatMap(node -> Optional.ofNullable(msgs.get(node.id()))
.filter(singleMsg -> singleMsg.cacheGroupsAffinityRequest() != null)
.map(Stream::of)
.orElse(Stream.empty()))
.findAny();
// Prepare full message for newly joined nodes with affinity request.
final GridDhtPartitionsFullMessage fullMsgWithAffinity = singleMsgWithAffinityReq
.filter(singleMessage -> affinityForJoinedNodes != null)
.map(singleMessage -> fullMsg.copy().joinedNodeAffinity(affinityForJoinedNodes))
.orElse(null);
// Prepare and send full messages for given nodes.
nodes.stream()
.map(node -> {
// No joined nodes, just send a regular full message.
if (fullMsgWithAffinity == null)
return new T2<>(node, fullMsg);
return new T2<>(
node,
// If single message contains affinity request, use special full message for such single messages.
Optional.ofNullable(msgs.get(node.id()))
.filter(singleMsg -> singleMsg.cacheGroupsAffinityRequest() != null)
.map(singleMsg -> fullMsgWithAffinity)
.orElse(fullMsg)
);
})
.map(nodeAndMsg -> {
ClusterNode node = nodeAndMsg.get1();
GridDhtPartitionsFullMessage fullMsgToSend = nodeAndMsg.get2();
// If exchange has merged, use merged version of exchange id.
GridDhtPartitionExchangeId sndExchId = mergedJoinExchMsgs != null
? Optional.ofNullable(mergedJoinExchMsgs.get(node.id()))
.map(GridDhtPartitionsAbstractMessage::exchangeId)
.orElse(exchangeId())
: exchangeId();
if (sndExchId != null && !sndExchId.equals(exchangeId())) {
GridDhtPartitionsFullMessage fullMsgWithUpdatedExchangeId = fullMsgToSend.copy();
fullMsgWithUpdatedExchangeId.exchangeId(sndExchId);
return new T2<>(node, fullMsgWithUpdatedExchangeId);
}
return new T2<>(node, fullMsgToSend);
})
.forEach(nodeAndMsg -> {
ClusterNode node = nodeAndMsg.get1();
GridDhtPartitionsFullMessage fullMsgToSend = nodeAndMsg.get2();
try {
cctx.io().send(node, fullMsgToSend, SYSTEM_POOL);
}
catch (ClusterTopologyCheckedException e) {
if (log.isDebugEnabled())
log.debug("Failed to send partitions, node failed: " + node);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send partitions [node=" + node + ']', e);
}
});
}
/**
* @param oldestNode Oldest node. Target node to send message to.
*/
private void sendPartitions(ClusterNode oldestNode) {
try {
sendLocalPartitions(oldestNode);
}
catch (ClusterTopologyCheckedException ignore) {
if (log.isDebugEnabled())
log.debug("Coordinator left during partition exchange [nodeId=" + oldestNode.id() +
", exchId=" + exchId + ']');
}
catch (IgniteCheckedException e) {
if (reconnectOnError(e))
onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
else {
U.error(log, "Failed to send local partitions to coordinator [crd=" + oldestNode.id() +
", exchId=" + exchId + ']', e);
}
}
}
/**
* @return {@code True} if the exchange was triggered by a server node join or failure.
*/
public boolean serverNodeDiscoveryEvent() {
assert exchCtx != null;
return exchCtx.events().hasServerJoin() || exchCtx.events().hasServerLeft();
}
/** {@inheritDoc} */
@Override public boolean exchangeDone() {
return done.get();
}
/**
* Finish merged future to allow GridCachePartitionExchangeManager.ExchangeFutureSet cleanup.
*/
public void finishMerged() {
super.onDone(null, null);
}
/** {@inheritDoc} */
@Override public boolean onDone(@Nullable AffinityTopologyVersion res, @Nullable Throwable err) {
assert res != null || err != null : "TopVer=" + res + ", err=" + err;
if (isDone() || !done.compareAndSet(false, true))
return false;
if (log.isInfoEnabled()) {
log.info("Finish exchange future [startVer=" + initialVersion() +
", resVer=" + res +
", err=" + err + ']');
}
assert res != null || err != null;
waitUntilNewCachesAreRegistered();
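// On server nodes, flush continuous query backup queues for the resulting topology version
// if the exchange was triggered by a server node event or an affinity change message.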
if (err == null &&
!cctx.kernalContext().clientNode() &&
(serverNodeDiscoveryEvent() || affChangeMsg != null)) {
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (!cacheCtx.affinityNode() || cacheCtx.isLocal())
continue;
cacheCtx.continuousQueries().flushBackupQueue(res);
}
}
if (err == null) {
if (centralizedAff || forceAffReassignment) {
assert !exchCtx.mergeExchanges();
Collection<CacheGroupContext> grpToRefresh = U.newHashSet(cctx.cache().cacheGroups().size());
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
if (grp.isLocal())
continue;
try {
if (grp.topology().initPartitionsWhenAffinityReady(res, this))
grpToRefresh.add(grp);
}
catch (IgniteInterruptedCheckedException e) {
U.error(log, "Failed to initialize partitions.", e);
}
}
if (!grpToRefresh.isEmpty())
cctx.exchange().refreshPartitions(grpToRefresh);
}
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
GridCacheContext drCacheCtx = cacheCtx.isNear() ? cacheCtx.near().dht().context() : cacheCtx;
if (drCacheCtx.isDrEnabled()) {
try {
drCacheCtx.dr().onExchange(res, exchId.isLeft(), activateCluster());
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to notify DR: " + e, e);
}
}
}
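// Lost partitions are detected on topology changes caused by server node events or on local join.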
if (serverNodeDiscoveryEvent() || localJoinExchange())
detectLostPartitions(res, false);
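// Validate each cache group against the nodes of the last event topology and remember the results.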
Map<Integer, CacheGroupValidation> m = U.newHashMap(cctx.cache().cacheGroups().size());
for (CacheGroupContext grp : cctx.cache().cacheGroups())
m.put(grp.groupId(), validateCacheGroup(grp, events().lastEvent().topologyNodes()));
grpValidRes = m;
}
if (!cctx.localNode().isClient())
tryToPerformLocalSnapshotOperation();
if (err == null)
cctx.coordinators().onExchangeDone(exchCtx.events().discoveryCache());
// Create and destroy caches and cache proxies.
cctx.cache().onExchangeDone(initialVersion(), exchActions, err);
cctx.kernalContext().authentication().onActivate();
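// Reserve WAL history for preloading according to the reservations assigned to the local node.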
Map<T2<Integer, Integer>, Long> localReserved = partHistSuppliers.getReservations(cctx.localNodeId());
if (localReserved != null) {
for (Map.Entry<T2<Integer, Integer>, Long> e : localReserved.entrySet()) {
boolean success = cctx.database().reserveHistoryForPreloading(
e.getKey().get1(), e.getKey().get2(), e.getValue());
if (!success) {
// TODO: how to handle?
err = new IgniteCheckedException("Could not reserve history");
}
}
}
cctx.database().releaseHistoryForExchange();
if (err == null) {
cctx.database().rebuildIndexesIfNeeded(this);
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
if (!grp.isLocal())
grp.topology().onExchangeDone(this, grp.affinity().readyAffinity(res), false);
}
cctx.walState().changeLocalStatesOnExchangeDone(res);
}
final Throwable err0 = err;
// Should execute this listener first, before any external listeners.
// Listeners use a stack as the underlying data structure.
listen(f -> {
// Update the last finished future first.
cctx.exchange().lastFinishedFuture(this);
// Complete any affReady futures and update last exchange done version.
cctx.exchange().onExchangeDone(res, initialVersion(), err0);
cctx.cache().completeProxyRestart(resolveCacheRequests(exchActions), initialVersion(), res);
if (exchActions != null && err0 == null)
exchActions.completeRequestFutures(cctx, null);
if (stateChangeExchange() && err0 == null)
cctx.kernalContext().state().onStateChangeExchangeDone(exchActions.stateChangeRequest());
});
if (super.onDone(res, err)) {
afterLsnrCompleteFut.onDone();
if (log.isDebugEnabled()) {
log.debug("Completed partition exchange [localNode=" + cctx.localNodeId() + ", exchange= " + this +
", durationFromInit=" + (U.currentTimeMillis() - initTs) + ']');
}
else if (log.isInfoEnabled()) {
log.info("Completed partition exchange [localNode=" + cctx.localNodeId() +
", exchange=" + shortInfo() + ", topVer=" + topologyVersion() +
", durationFromInit=" + (U.currentTimeMillis() - initTs) + ']');
}
initFut.onDone(err == null);
if (exchCtx != null && exchCtx.events().hasServerLeft()) {
ExchangeDiscoveryEvents evts = exchCtx.events();
for (DiscoveryEvent evt : evts.events()) {
if (serverLeftEvent(evt)) {
for (CacheGroupContext grp : cctx.cache().cacheGroups())
grp.affinityFunction().removeNode(evt.eventNode().id());
}
}
}
if (firstDiscoEvt instanceof DiscoveryCustomEvent)
((DiscoveryCustomEvent)firstDiscoEvt).customMessage(null);
if (err == null) {
if (exchCtx != null && (exchCtx.events().hasServerLeft() || exchCtx.events().hasServerJoin())) {
ExchangeDiscoveryEvents evts = exchCtx.events();
for (DiscoveryEvent evt : evts.events()) {
if (serverLeftEvent(evt) || serverJoinEvent(evt))
logExchange(evt);
}
}
}
return true;
}
return false;
}
/**
* @param exchangeActions Exchange actions.
* @return Map of cache names to their start requests.
*/
private Map<String, DynamicCacheChangeRequest> resolveCacheRequests(ExchangeActions exchangeActions) {
if (exchangeActions == null)
return Collections.emptyMap();
return exchangeActions.cacheStartRequests()
.stream()
.map(ExchangeActions.CacheActionData::request)
.collect(Collectors.toMap(DynamicCacheChangeRequest::cacheName, r -> r));
}
/**
* Waits for registration of new caches and for their configurations to be persisted to disk.
*/
private void waitUntilNewCachesAreRegistered() {
try {
IgniteInternalFuture<?> registerCachesFut = registerCachesFuture;
if (registerCachesFut != null && !registerCachesFut.isDone()) {
final int timeout = Math.max(1000,
(int)(cctx.kernalContext().config().getFailureDetectionTimeout() / 2));
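// Wait in bounded intervals inside an exchanger blocking section and log a warning on each timeout.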
for (;;) {
cctx.exchange().exchangerBlockingSectionBegin();
try {
registerCachesFut.get(timeout, TimeUnit.MILLISECONDS);
break;
}
catch (IgniteFutureTimeoutCheckedException te) {
List<String> cacheNames = exchActions.cacheStartRequests().stream()
.map(req -> req.descriptor().cacheName())
.collect(Collectors.toList());
U.warn(log, "Failed to wait for caches configuration registration and saving within timeout. " +
"Probably disk is too busy or slow." +
"[caches=" + cacheNames + "]");
}
finally {
cctx.exchange().exchangerBlockingSectionEnd();
}
}
}
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to wait for caches registration and saving", e);
}
}
/**
* Log exchange event.
*
* @param evt Discovery event.
*/
private void logExchange(DiscoveryEvent evt) {
if (cctx.kernalContext().state().publicApiActiveState(false) && cctx.wal() != null) {
if (cctx.wal().serializerVersion() > 1)
try {
ExchangeRecord.Type type = null;
if (evt.type() == EVT_NODE_JOINED)
type = ExchangeRecord.Type.JOIN;
else if (evt.type() == EVT_NODE_LEFT || evt.type() == EVT_NODE_FAILED)
type = ExchangeRecord.Type.LEFT;
BaselineTopology blt = cctx.kernalContext().state().clusterState().baselineTopology();
if (type != null && blt != null) {
Short constId = blt.consistentIdMapping().get(evt.eventNode().consistentId());
if (constId != null)
cctx.wal().log(new ExchangeRecord(constId, type));
}
}
catch (IgniteCheckedException e) {
U.error(log, "Fail during log exchange record.", e);
}
}
}
/**
* Cleans up resources to avoid excessive memory usage.
*/
public void cleanUp() {
pendingSingleMsgs.clear();
fullMsgs.clear();
msgs.clear();
crd = null;
partReleaseFut = null;
exchActions = null;
mergedJoinExchMsgs = null;
pendingJoinMsg = null;
exchCtx = null;
newCrdFut = null;
exchangeLocE = null;
exchangeGlobalExceptions.clear();
}
/**
* @param ver Version.
*/
private void updateLastVersion(GridCacheVersion ver) {
assert ver != null;
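// CAS loop: keep the version with the greatest order.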
while (true) {
GridCacheVersion old = lastVer.get();
if (old == null || Long.compare(old.order(), ver.order()) < 0) {
if (lastVer.compareAndSet(old, ver))
break;
}
else
break;
}
}
/**
* Records that this exchange is merged with another 'node join' exchange.
*
* @param node Joined node.
* @param msg Joined node message if already received.
* @return {@code True} if need to wait for message from joined server node.
*/
private boolean addMergedJoinExchange(ClusterNode node, @Nullable GridDhtPartitionsSingleMessage msg) {
assert Thread.holdsLock(mux);
assert node != null;
assert state == ExchangeLocalState.CRD : state;
if (msg == null && newCrdFut != null)
msg = newCrdFut.joinExchangeMessage(node.id());
UUID nodeId = node.id();
boolean wait = false;
if (node.isClient()) {
if (msg != null)
waitAndReplyToNode(nodeId, msg);
}
else {
if (mergedJoinExchMsgs == null)
mergedJoinExchMsgs = new LinkedHashMap<>();
if (msg != null) {
assert msg.exchangeId().topologyVersion().equals(new AffinityTopologyVersion(node.order()));
if (log.isInfoEnabled()) {
log.info("Merge server join exchange, message received [curFut=" + initialVersion() +
", node=" + nodeId + ']');
}
mergedJoinExchMsgs.put(nodeId, msg);
}
else {
if (cctx.discovery().alive(nodeId)) {
if (log.isInfoEnabled()) {
log.info("Merge server join exchange, wait for message [curFut=" + initialVersion() +
", node=" + nodeId + ']');
}
wait = true;
mergedJoinExchMsgs.put(nodeId, null);
awaitMergedMsgs++;
}
else {
if (log.isInfoEnabled()) {
log.info("Merge server join exchange, awaited node left [curFut=" + initialVersion() +
", node=" + nodeId + ']');
}
}
}
}
return wait;
}
/**
* Merges this exchange with given one.
*
* @param fut Exchange future to merge this exchange with.
* @return {@code True} if need wait for message from joined server node.
*/
public boolean mergeJoinExchange(GridDhtPartitionsExchangeFuture fut) {
boolean wait;
synchronized (mux) {
assert (!isDone() && !initFut.isDone()) || cctx.kernalContext().isStopping() : this;
assert (mergedWith == null && state == null) || cctx.kernalContext().isStopping() : this;
state = ExchangeLocalState.MERGED;
mergedWith = fut;
ClusterNode joinedNode = firstDiscoEvt.eventNode();
wait = fut.addMergedJoinExchange(joinedNode, pendingJoinMsg);
}
return wait;
}
/**
* @param fut Exchange future to merge this exchange with.
* @return Pending join request if any.
*/
@Nullable public GridDhtPartitionsSingleMessage mergeJoinExchangeOnDone(GridDhtPartitionsExchangeFuture fut) {
synchronized (mux) {
assert !isDone();
assert !initFut.isDone();
assert mergedWith == null;
assert state == null;
state = ExchangeLocalState.MERGED;
mergedWith = fut;
return pendingJoinMsg;
}
}
/**
* @param node Sender node.
* @param msg Message.
*/
private void processMergedMessage(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
if (msg.client()) {
waitAndReplyToNode(node.id(), msg);
return;
}
boolean done = false;
FinishState finishState0 = null;
synchronized (mux) {
if (state == ExchangeLocalState.DONE) {
assert finishState != null;
finishState0 = finishState;
}
else {
boolean process = mergedJoinExchMsgs != null &&
mergedJoinExchMsgs.containsKey(node.id()) &&
mergedJoinExchMsgs.get(node.id()) == null;
if (log.isInfoEnabled()) {
log.info("Merge server join exchange, received message [curFut=" + initialVersion() +
", node=" + node.id() +
", msgVer=" + msg.exchangeId().topologyVersion() +
", process=" + process +
", awaited=" + awaitMergedMsgs + ']');
}
if (process) {
mergedJoinExchMsgs.put(node.id(), msg);
assert awaitMergedMsgs > 0 : awaitMergedMsgs;
awaitMergedMsgs--;
done = awaitMergedMsgs == 0;
}
}
}
if (finishState0 != null) {
sendAllPartitionsToNode(finishState0, msg, node.id());
return;
}
if (done)
finishExchangeOnCoordinator(null);
}
/**
* Method is called on the coordinator when the initial ExchangeFuture created on a client join event has been
* evicted from the exchange history because of the IGNITE_EXCHANGE_HISTORY_SIZE property.
*
* @param node Client node that should try to reconnect to the cluster.
* @param msg Single message received from the client which didn't find original ExchangeFuture.
*/
public void forceClientReconnect(ClusterNode node, GridDhtPartitionsSingleMessage msg) {
Exception reconnectException = new IgniteNeedReconnectException(node, null);
exchangeGlobalExceptions.put(node.id(), reconnectException);
onDone(null, reconnectException);
GridDhtPartitionsFullMessage fullMsg = createPartitionsMessage(true, false);
fullMsg.setErrorsMap(exchangeGlobalExceptions);
try {
cctx.io().send(node, fullMsg, SYSTEM_POOL);
if (log.isDebugEnabled())
log.debug("Full message for reconnect client was sent to node: " + node + ", fullMsg: " + fullMsg);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send reconnect client message [node=" + node + ']', e);
}
}
/**
* Processes a received single message. Actual processing may be delayed if the init method has not
* completed, see {@link #initDone()}.
*
* @param node Sender node.
* @param msg Single partition info.
*/
public void onReceiveSingleMessage(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
assert !node.isDaemon() : node;
assert msg != null;
assert exchId.equals(msg.exchangeId()) : msg;
assert !cctx.kernalContext().clientNode();
if (msg.restoreState()) {
InitNewCoordinatorFuture newCrdFut0;
synchronized (mux) {
assert newCrdFut != null;
newCrdFut0 = newCrdFut;
}
newCrdFut0.onMessage(node, msg);
return;
}
if (!msg.client()) {
assert msg.lastVersion() != null : msg;
updateLastVersion(msg.lastVersion());
}
GridDhtPartitionsExchangeFuture mergedWith0 = null;
synchronized (mux) {
if (state == ExchangeLocalState.MERGED) {
assert mergedWith != null;
mergedWith0 = mergedWith;
}
else {
assert state != ExchangeLocalState.CLIENT;
if (exchangeId().isJoined() && node.id().equals(exchId.nodeId()))
pendingJoinMsg = msg;
}
}
if (mergedWith0 != null) {
mergedWith0.processMergedMessage(node, msg);
if (log.isDebugEnabled())
log.debug("Merged message processed, message handling finished: " + msg);
return;
}
initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
@Override public void apply(IgniteInternalFuture<Boolean> f) {
try {
if (!f.get())
return;
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to initialize exchange future: " + this, e);
return;
}
processSingleMessage(node.id(), msg);
}
});
}
/**
* Tries to quickly reply with a {@link GridDhtPartitionsFullMessage} to a received single message
* in case the exchange future has already completed.
*
* @param node Cluster node which sent single message.
* @param msg Single message.
* @return {@code true} if the fast reply succeeded.
*/
public boolean fastReplyOnSingleMessage(final ClusterNode node, final GridDhtPartitionsSingleMessage msg) {
GridDhtPartitionsExchangeFuture futToFastReply = this;
ExchangeLocalState currState;
synchronized (mux) {
currState = state;
if (currState == ExchangeLocalState.MERGED)
futToFastReply = mergedWith;
}
if (currState == ExchangeLocalState.DONE)
futToFastReply.processSingleMessage(node.id(), msg);
else if (currState == ExchangeLocalState.MERGED)
futToFastReply.processMergedMessage(node, msg);
return currState == ExchangeLocalState.MERGED || currState == ExchangeLocalState.DONE;
}
/**
* @param nodeId Node ID.
* @param msg Client's message.
*/
public void waitAndReplyToNode(final UUID nodeId, final GridDhtPartitionsSingleMessage msg) {
if (log.isDebugEnabled())
log.debug("Single message will be handled on completion of exchange future: " + this);
listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
@Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
if (cctx.kernalContext().isStopping())
return;
// DynamicCacheChangeFailureMessage was sent.
// Thus, there is no need to create and send GridDhtPartitionsFullMessage.
if (cacheChangeFailureMsgSent)
return;
FinishState finishState0;
synchronized (mux) {
finishState0 = finishState;
}
if (finishState0 == null) {
assert firstDiscoEvt.type() == EVT_NODE_JOINED && firstDiscoEvt.eventNode().isClient() : this;
ClusterNode node = cctx.node(nodeId);
if (node == null) {
if (log.isDebugEnabled()) {
log.debug("No node found for nodeId: " +
nodeId +
", handling of single message will be stopped: " +
msg
);
}
return;
}
finishState0 = new FinishState(cctx.localNodeId(),
initialVersion(),
createPartitionsMessage(true, node.version().compareToIgnoreTimestamp(PARTIAL_COUNTERS_MAP_SINCE) >= 0));
}
sendAllPartitionsToNode(finishState0, msg, nodeId);
}
});
}
/**
* Note: this method performs the heavy updatePartitionSingleMap operation outside the synchronized block.
* Only the count of such updates, {@link #pendingSingleUpdates}, is managed under the critical section.
*
* @param nodeId Sender node.
* @param msg Partition single message.
*/
private void processSingleMessage(UUID nodeId, GridDhtPartitionsSingleMessage msg) {
if (msg.client()) {
waitAndReplyToNode(nodeId, msg);
return;
}
boolean allReceived = false; // Received all expected messages.
boolean updateSingleMap = false;
FinishState finishState0 = null;
synchronized (mux) {
assert crd != null;
switch (state) {
case DONE: {
if (log.isInfoEnabled()) {
log.info("Received single message, already done [ver=" + initialVersion() +
", node=" + nodeId + ']');
}
assert finishState != null;
finishState0 = finishState;
break;
}
case CRD: {
assert crd.isLocal() : crd;
if (remaining.remove(nodeId)) {
updateSingleMap = true;
pendingSingleUpdates++;
if ((stateChangeExchange() || dynamicCacheStartExchange()) && msg.getError() != null)
exchangeGlobalExceptions.put(nodeId, msg.getError());
allReceived = remaining.isEmpty();
if (log.isInfoEnabled()) {
log.info("Coordinator received single message [ver=" + initialVersion() +
", node=" + nodeId +
", allReceived=" + allReceived + ']');
}
}
else if (log.isDebugEnabled())
log.debug("Coordinator received single message it didn't expect to receive: " + msg);
break;
}
case SRV:
case BECOME_CRD: {
if (log.isInfoEnabled()) {
log.info("Non-coordinator received single message [ver=" + initialVersion() +
", node=" + nodeId + ", state=" + state + ']');
}
pendingSingleMsgs.put(nodeId, msg);
break;
}
default:
assert false : state;
}
}
if (finishState0 != null) {
// DynamicCacheChangeFailureMessage was sent.
// Thus, there is no need to create and send GridDhtPartitionsFullMessage.
if (!cacheChangeFailureMsgSent)
sendAllPartitionsToNode(finishState0, msg, nodeId);
return;
}
if (updateSingleMap) {
try {
// Do not update the partition map if the cluster is transitioning to the inactive state.
if (!deactivateCluster())
updatePartitionSingleMap(nodeId, msg);
}
finally {
synchronized (mux) {
assert pendingSingleUpdates > 0;
pendingSingleUpdates--;
if (pendingSingleUpdates == 0)
mux.notifyAll();
}
}
}
if (allReceived) {
if (!awaitSingleMapUpdates())
return;
onAllReceived(null);
}
}
/**
* @return {@code False} if interrupted.
*/
private boolean awaitSingleMapUpdates() {
try {
synchronized (mux) {
while (pendingSingleUpdates > 0)
U.wait(mux);
}
return true;
}
catch (IgniteInterruptedCheckedException e) {
U.warn(log, "Failed to wait for partition map updates, thread was interrupted: " + e);
return false;
}
}
/**
* @param fut Affinity future.
*/
private void onAffinityInitialized(IgniteInternalFuture<Map<Integer, Map<Integer, List<UUID>>>> fut) {
try {
assert fut.isDone();
Map<Integer, Map<Integer, List<UUID>>> assignmentChange = fut.get();
GridDhtPartitionsFullMessage m = createPartitionsMessage(false, false);
CacheAffinityChangeMessage msg = new CacheAffinityChangeMessage(exchId, m, assignmentChange);
if (log.isDebugEnabled())
log.debug("Centralized affinity exchange, send affinity change message: " + msg);
cctx.discovery().sendCustomEvent(msg);
}
catch (IgniteCheckedException e) {
onDone(e);
}
}
/**
* @param top Topology.
*/
private void assignPartitionSizes(GridDhtPartitionTopology top) {
Map<Integer, Long> partSizes = new HashMap<>();
for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) {
GridDhtPartitionsSingleMessage singleMsg = e.getValue();
GridDhtPartitionMap partMap = singleMsg.partitions().get(top.groupId());
if (partMap == null)
continue;
for (Map.Entry<Integer, GridDhtPartitionState> e0 : partMap.entrySet()) {
int p = e0.getKey();
GridDhtPartitionState state = e0.getValue();
if (state == GridDhtPartitionState.OWNING)
partSizes.put(p, singleMsg.partitionSizes(top.groupId()).get(p));
}
}
for (GridDhtLocalPartition locPart : top.currentLocalPartitions()) {
if (locPart.state() == GridDhtPartitionState.OWNING)
partSizes.put(locPart.id(), locPart.fullSize());
}
top.globalPartSizes(partSizes);
}
/**
* Collects partition update counters and determines new partition owners for the given {@code top}.
*
* @param top Topology to assign.
*/
private void assignPartitionStates(GridDhtPartitionTopology top) {
Map<Integer, CounterWithNodes> maxCntrs = new HashMap<>();
Map<Integer, Long> minCntrs = new HashMap<>();
for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) {
CachePartitionPartialCountersMap nodeCntrs = e.getValue().partitionUpdateCounters(top.groupId(),
top.partitions());
assert nodeCntrs != null;
for (int i = 0; i < nodeCntrs.size(); i++) {
int p = nodeCntrs.partitionAt(i);
UUID uuid = e.getKey();
GridDhtPartitionState state = top.partitionState(uuid, p);
if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.MOVING)
continue;
long cntr = state == GridDhtPartitionState.MOVING ?
nodeCntrs.initialUpdateCounterAt(i) :
nodeCntrs.updateCounterAt(i);
Long minCntr = minCntrs.get(p);
if (minCntr == null || minCntr > cntr)
minCntrs.put(p, cntr);
if (state != GridDhtPartitionState.OWNING)
continue;
CounterWithNodes maxCntr = maxCntrs.get(p);
if (maxCntr == null || cntr > maxCntr.cnt)
maxCntrs.put(p, new CounterWithNodes(cntr, e.getValue().partitionSizes(top.groupId()).get(p), uuid));
else if (cntr == maxCntr.cnt)
maxCntr.nodes.add(uuid);
}
}
// Also must process counters from the local node.
for (GridDhtLocalPartition part : top.currentLocalPartitions()) {
GridDhtPartitionState state = top.partitionState(cctx.localNodeId(), part.id());
if (state != GridDhtPartitionState.OWNING && state != GridDhtPartitionState.MOVING)
continue;
final long cntr = state == GridDhtPartitionState.MOVING ? part.initialUpdateCounter() : part.updateCounter();
Long minCntr = minCntrs.get(part.id());
if (minCntr == null || minCntr > cntr)
minCntrs.put(part.id(), cntr);
if (state != GridDhtPartitionState.OWNING)
continue;
CounterWithNodes maxCntr = maxCntrs.get(part.id());
if (maxCntr == null && cntr == 0) {
CounterWithNodes cntrObj = new CounterWithNodes(0, 0L, cctx.localNodeId());
for (UUID nodeId : msgs.keySet()) {
if (top.partitionState(nodeId, part.id()) == GridDhtPartitionState.OWNING)
cntrObj.nodes.add(nodeId);
}
maxCntrs.put(part.id(), cntrObj);
}
else if (maxCntr == null || cntr > maxCntr.cnt)
maxCntrs.put(part.id(), new CounterWithNodes(cntr, part.fullSize(), cctx.localNodeId()));
else if (cntr == maxCntr.cnt)
maxCntr.nodes.add(cctx.localNodeId());
}
Map<Integer, Map<Integer, Long>> partHistReserved0 = partHistReserved;
Map<Integer, Long> localReserved = partHistReserved0 != null ? partHistReserved0.get(top.groupId()) : null;
Set<Integer> haveHistory = new HashSet<>();
for (Map.Entry<Integer, Long> e : minCntrs.entrySet()) {
int p = e.getKey();
long minCntr = e.getValue();
CounterWithNodes maxCntrObj = maxCntrs.get(p);
long maxCntr = maxCntrObj != null ? maxCntrObj.cnt : 0;
// If the minimal counter is zero or all copies have the same counter, do clean (full) preloading.
if (minCntr == 0 || minCntr == maxCntr)
continue;
if (localReserved != null) {
Long localCntr = localReserved.get(p);
if (localCntr != null && localCntr <= minCntr && maxCntrObj.nodes.contains(cctx.localNodeId())) {
partHistSuppliers.put(cctx.localNodeId(), top.groupId(), p, localCntr);
haveHistory.add(p);
continue;
}
}
for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e0 : msgs.entrySet()) {
Long histCntr = e0.getValue().partitionHistoryCounters(top.groupId()).get(p);
if (histCntr != null && histCntr <= minCntr && maxCntrObj.nodes.contains(e0.getKey())) {
partHistSuppliers.put(e0.getKey(), top.groupId(), p, histCntr);
haveHistory.add(p);
break;
}
}
}
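// Nodes holding the maximum update counter become the partition owners; outdated copies are scheduled for reloading below.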
Map<Integer, Set<UUID>> ownersByUpdCounters = new HashMap<>(maxCntrs.size());
for (Map.Entry<Integer, CounterWithNodes> e : maxCntrs.entrySet())
ownersByUpdCounters.put(e.getKey(), e.getValue().nodes);
Map<Integer, Long> partSizes = new HashMap<>(maxCntrs.size());
for (Map.Entry<Integer, CounterWithNodes> e : maxCntrs.entrySet())
partSizes.put(e.getKey(), e.getValue().size);
top.globalPartSizes(partSizes);
Map<UUID, Set<Integer>> partitionsToRebalance = top.resetOwners(ownersByUpdCounters, haveHistory);
for (Map.Entry<UUID, Set<Integer>> e : partitionsToRebalance.entrySet()) {
UUID nodeId = e.getKey();
Set<Integer> parts = e.getValue();
for (int part : parts)
partsToReload.put(nodeId, top.groupId(), part);
}
}
/**
* Detect lost partitions.
*
* @param resTopVer Result topology version.
* @param crd {@code True} if run on coordinator.
*/
private void detectLostPartitions(AffinityTopologyVersion resTopVer, boolean crd) {
boolean detected = false;
long time = System.currentTimeMillis();
synchronized (cctx.exchange().interruptLock()) {
if (Thread.currentThread().isInterrupted())
return;
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
if (!grp.isLocal()) {
// Do not trigger lost partition events on start.
boolean event = !localJoinExchange() && !activateCluster();
boolean detectedOnGrp = grp.topology().detectLostPartitions(resTopVer, event ? events().lastEvent() : null);
detected |= detectedOnGrp;
}
}
if (crd) {
for (GridClientPartitionTopology top : cctx.exchange().clientTopologies())
top.detectLostPartitions(resTopVer, null);
}
}
if (detected) {
if (log.isDebugEnabled())
log.debug("Partitions have been scheduled to resend [reason=" +
"Lost partitions detect on " + resTopVer + "]");
cctx.exchange().scheduleResendPartitions();
}
if (log.isInfoEnabled())
log.info("Detecting lost partitions performed in " + (System.currentTimeMillis() - time) + " ms.");
}
/**
* @param cacheNames Cache names.
* @param crd {@code True} if run on coordinator.
*/
private void resetLostPartitions(Collection<String> cacheNames, boolean crd) {
assert !exchCtx.mergeExchanges();
synchronized (cctx.exchange().interruptLock()) {
if (Thread.currentThread().isInterrupted())
return;
for (String cacheName : cacheNames) {
DynamicCacheDescriptor cacheDesc = cctx.affinity().caches().get(CU.cacheId(cacheName));
if (cacheDesc == null || cacheDesc.cacheConfiguration().getCacheMode() == CacheMode.LOCAL)
continue;
CacheGroupContext grp = cctx.cache().cacheGroup(cacheDesc.groupId());
if (grp != null)
grp.topology().resetLostPartitions(initialVersion());
else if (crd) {
GridDhtPartitionTopology top = cctx.exchange().clientTopology(cacheDesc.groupId(), context().events().discoveryCache());
top.resetLostPartitions(initialVersion());
}
}
}
}
/**
* Creates an IgniteCheckedException that is used as root cause of the exchange initialization failure.
* This method aggregates all the exceptions provided from all participating nodes.
*
* @param globalExceptions Collection of exceptions from all participating nodes.
* @return Exception that represents the cause of the exchange initialization failure.
*/
private IgniteCheckedException createExchangeException(Map<UUID, Exception> globalExceptions) {
IgniteCheckedException ex = new IgniteCheckedException("Failed to complete exchange process.");
for (Map.Entry<UUID, Exception> entry : globalExceptions.entrySet())
if (ex != entry.getValue())
ex.addSuppressed(entry.getValue());
return ex;
}
/**
* @return {@code true} if the discovery event that triggered this exchange supports the rollback procedure.
*/
private boolean isRollbackSupported() {
if (!firstEvtDiscoCache.checkAttribute(ATTR_DYNAMIC_CACHE_START_ROLLBACK_SUPPORTED, Boolean.TRUE))
return false;
// Currently the rollback process is supported for dynamically started caches only.
return firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT && dynamicCacheStartExchange();
}
/**
* Sends a {@link DynamicCacheChangeFailureMessage}, which represents the cause of the exchange failure,
* to all participating nodes.
*/
private void sendExchangeFailureMessage() {
assert crd != null && crd.isLocal();
try {
IgniteCheckedException err = createExchangeException(exchangeGlobalExceptions);
List<String> cacheNames = new ArrayList<>(exchActions.cacheStartRequests().size());
for (ExchangeActions.CacheActionData actionData : exchActions.cacheStartRequests())
cacheNames.add(actionData.request().cacheName());
DynamicCacheChangeFailureMessage msg = new DynamicCacheChangeFailureMessage(
cctx.localNode(), exchId, err, cacheNames);
if (log.isDebugEnabled())
log.debug("Dynamic cache change failed (send message to all participating nodes): " + msg);
cacheChangeFailureMsgSent = true;
cctx.discovery().sendCustomEvent(msg);
return;
}
catch (IgniteCheckedException e) {
if (reconnectOnError(e))
onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
else
onDone(e);
}
}
/**
* @param sndResNodes Additional nodes to send finish message to.
*/
private void onAllReceived(@Nullable Collection<ClusterNode> sndResNodes) {
try {
assert crd.isLocal();
assert partHistSuppliers.isEmpty() : partHistSuppliers;
if (!exchCtx.mergeExchanges() && !crd.equals(events().discoveryCache().serverNodes().get(0))) {
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
if (grp.isLocal())
continue;
// It is possible affinity is not initialized.
// For example, dynamic cache start failed.
if (grp.affinity().lastVersion().topologyVersion() > 0)
grp.topology().beforeExchange(this, !centralizedAff && !forceAffReassignment, false);
else
assert exchangeLocE != null :
"Affinity is not calculated for the cache group [groupName=" + grp.name() + "]";
}
}
if (exchCtx.mergeExchanges()) {
if (log.isInfoEnabled())
log.info("Coordinator received all messages, try merge [ver=" + initialVersion() + ']');
long time = System.currentTimeMillis();
boolean finish = cctx.exchange().mergeExchangesOnCoordinator(this);
if (log.isInfoEnabled())
log.info("Exchanges merging performed in " + (System.currentTimeMillis() - time) + " ms.");
if (!finish)
return;
}
finishExchangeOnCoordinator(sndResNodes);
}
catch (IgniteCheckedException e) {
if (reconnectOnError(e))
onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
else
onDone(e);
}
}
/**
* @param sndResNodes Additional nodes to send finish message to.
*/
private void finishExchangeOnCoordinator(@Nullable Collection<ClusterNode> sndResNodes) {
try {
if (!F.isEmpty(exchangeGlobalExceptions) && dynamicCacheStartExchange() && isRollbackSupported()) {
sendExchangeFailureMessage();
return;
}
AffinityTopologyVersion resTopVer = exchCtx.events().topologyVersion();
if (log.isInfoEnabled()) {
log.info("finishExchangeOnCoordinator [topVer=" + initialVersion() +
", resVer=" + resTopVer + ']');
}
Map<Integer, CacheGroupAffinityMessage> idealAffDiff = null;
long time = System.currentTimeMillis();
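// For merged exchanges, apply single messages collected from merged 'join node' exchanges
// and recalculate affinity using the merge protocol.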
if (exchCtx.mergeExchanges()) {
synchronized (mux) {
if (mergedJoinExchMsgs != null) {
for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : mergedJoinExchMsgs.entrySet()) {
msgs.put(e.getKey(), e.getValue());
updatePartitionSingleMap(e.getKey(), e.getValue());
}
}
}
assert exchCtx.events().hasServerJoin() || exchCtx.events().hasServerLeft();
exchCtx.events().processEvents(this);
if (exchCtx.events().hasServerLeft())
idealAffDiff = cctx.affinity().onServerLeftWithExchangeMergeProtocol(this);
else
cctx.affinity().onServerJoinWithExchangeMergeProtocol(this, true);
for (CacheGroupDescriptor desc : cctx.affinity().cacheGroups().values()) {
if (desc.config().getCacheMode() == CacheMode.LOCAL)
continue;
CacheGroupContext grp = cctx.cache().cacheGroup(desc.groupId());
GridDhtPartitionTopology top = grp != null ? grp.topology() :
cctx.exchange().clientTopology(desc.groupId(), events().discoveryCache());
top.beforeExchange(this, true, true);
}
}
if (log.isInfoEnabled())
log.info("Affinity changes (coordinator) applied in " + (System.currentTimeMillis() - time) + " ms.");
Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = null;
for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) {
GridDhtPartitionsSingleMessage msg = e.getValue();
// Apply update counters after all single messages are received.
for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) {
Integer grpId = entry.getKey();
CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
GridDhtPartitionTopology top = grp != null ? grp.topology() :
cctx.exchange().clientTopology(grpId, events().discoveryCache());
CachePartitionPartialCountersMap cntrs = msg.partitionUpdateCounters(grpId,
top.partitions());
if (cntrs != null)
top.collectUpdateCounters(cntrs);
}
Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
if (affReq != null) {
joinedNodeAff = CacheGroupAffinityMessage.createAffinityMessages(cctx,
resTopVer,
affReq,
joinedNodeAff);
}
}
validatePartitionsState();
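// Assign partition states depending on the event that triggered the exchange (custom discovery event vs. server join/leave).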
if (firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT) {
assert firstDiscoEvt instanceof DiscoveryCustomEvent;
assert !events().hasServerJoin() && !events().hasServerLeft();
if (activateCluster() || changedBaseline())
assignPartitionsStates();
DiscoveryCustomMessage discoveryCustomMessage = ((DiscoveryCustomEvent) firstDiscoEvt).customMessage();
if (discoveryCustomMessage instanceof DynamicCacheChangeBatch) {
if (exchActions != null) {
Set<String> caches = exchActions.cachesToResetLostPartitions();
if (!F.isEmpty(caches))
resetLostPartitions(caches, true);
assignPartitionsStates();
}
}
else if (discoveryCustomMessage instanceof SnapshotDiscoveryMessage
&& ((SnapshotDiscoveryMessage)discoveryCustomMessage).needAssignPartitions())
assignPartitionsStates();
}
else {
if (exchCtx.events().hasServerLeft())
detectLostPartitions(resTopVer, true);
if (exchCtx.events().hasServerJoin())
assignPartitionsStates();
}
// Recalculate new affinity based on partitions availability.
if (!exchCtx.mergeExchanges() && forceAffReassignment)
idealAffDiff = cctx.affinity().onCustomEventWithEnforcedAffinityReassignment(this);
for (CacheGroupContext grpCtx : cctx.cache().cacheGroups()) {
if (!grpCtx.isLocal())
grpCtx.topology().applyUpdateCounters();
}
updateLastVersion(cctx.versions().last());
cctx.versions().onExchange(lastVer.get().order());
IgniteProductVersion minVer = exchCtx.events().discoveryCache().minimumNodeVersion();
time = System.currentTimeMillis();
GridDhtPartitionsFullMessage msg = createPartitionsMessage(true,
minVer.compareToIgnoreTimestamp(PARTIAL_COUNTERS_MAP_SINCE) >= 0);
if (exchCtx.mergeExchanges()) {
assert !centralizedAff;
msg.resultTopologyVersion(resTopVer);
if (exchCtx.events().hasServerLeft())
msg.idealAffinityDiff(idealAffDiff);
}
else if (forceAffReassignment)
msg.idealAffinityDiff(idealAffDiff);
msg.prepareMarshal(cctx);
if (log.isInfoEnabled())
log.info("Preparing Full Message performed in " + (System.currentTimeMillis() - time) + " ms.");
synchronized (mux) {
finishState = new FinishState(crd.id(), resTopVer, msg);
state = ExchangeLocalState.DONE;
}
if (centralizedAff) {
assert !exchCtx.mergeExchanges();
time = System.currentTimeMillis();
IgniteInternalFuture<Map<Integer, Map<Integer, List<UUID>>>> fut = cctx.affinity().initAffinityOnNodeLeft(this);
if (!fut.isDone()) {
fut.listen(new IgniteInClosure<IgniteInternalFuture<Map<Integer, Map<Integer, List<UUID>>>>>() {
@Override public void apply(IgniteInternalFuture<Map<Integer, Map<Integer, List<UUID>>>> fut) {
onAffinityInitialized(fut);
}
});
}
else
onAffinityInitialized(fut);
if (log.isInfoEnabled())
log.info("Centralized affinity changes are performed in " + (System.currentTimeMillis() - time) + " ms.");
}
else {
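// Regular (non-centralized affinity) flow: send the full map to all remaining server nodes,
// including nodes from merged join exchanges.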
Set<ClusterNode> nodes;
Map<UUID, GridDhtPartitionsSingleMessage> mergedJoinExchMsgs0;
synchronized (mux) {
srvNodes.remove(cctx.localNode());
nodes = new LinkedHashSet<>(srvNodes);
mergedJoinExchMsgs0 = mergedJoinExchMsgs;
if (mergedJoinExchMsgs != null) {
for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : mergedJoinExchMsgs.entrySet()) {
if (e.getValue() != null) {
ClusterNode node = cctx.discovery().node(e.getKey());
if (node != null)
nodes.add(node);
}
}
}
if (!F.isEmpty(sndResNodes))
nodes.addAll(sndResNodes);
}
if (!nodes.isEmpty())
sendAllPartitions(msg, nodes, mergedJoinExchMsgs0, joinedNodeAff);
partitionsSent = true;
if (!stateChangeExchange())
onDone(exchCtx.events().topologyVersion(), null);
for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> e : pendingSingleMsgs.entrySet()) {
if (log.isInfoEnabled()) {
log.info("Process pending message on coordinator [node=" + e.getKey() +
", ver=" + initialVersion() +
", resVer=" + resTopVer + ']');
}
processSingleMessage(e.getKey(), e.getValue());
}
}
if (stateChangeExchange()) {
StateChangeRequest req = exchActions.stateChangeRequest();
assert req != null : exchActions;
boolean stateChangeErr = false;
if (!F.isEmpty(exchangeGlobalExceptions)) {
stateChangeErr = true;
cctx.kernalContext().state().onStateChangeError(exchangeGlobalExceptions, req);
}
else {
boolean hasMoving = !partsToReload.isEmpty();
Set<Integer> waitGrps = cctx.affinity().waitGroups();
if (!hasMoving) {
for (CacheGroupContext grpCtx : cctx.cache().cacheGroups()) {
if (waitGrps.contains(grpCtx.groupId()) && grpCtx.topology().hasMovingPartitions()) {
hasMoving = true;
break;
}
}
}
cctx.kernalContext().state().onExchangeFinishedOnCoordinator(this, hasMoving);
}
boolean active = !stateChangeErr && req.activate();
ChangeGlobalStateFinishMessage stateFinishMsg = new ChangeGlobalStateFinishMessage(
req.requestId(),
active,
!stateChangeErr);
cctx.discovery().sendCustomEvent(stateFinishMsg);
if (!centralizedAff)
onDone(exchCtx.events().topologyVersion(), null);
}
}
catch (IgniteCheckedException e) {
if (reconnectOnError(e))
onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
else
onDone(e);
}
}
/**
* Collects non local cache group descriptors.
*
* @return Collection of non local cache group descriptors.
*/
private List<CacheGroupDescriptor> nonLocalCacheGroupDescriptors() {
return cctx.affinity().cacheGroups().values().stream()
.filter(grpDesc -> grpDesc.config().getCacheMode() != CacheMode.LOCAL)
.collect(Collectors.toList());
}
/**
* Collects non local cache groups.
*
* @return Collection of non local cache groups.
*/
private List<CacheGroupContext> nonLocalCacheGroups() {
return cctx.cache().cacheGroups().stream()
.filter(grp -> !grp.isLocal() && !cacheGroupStopping(grp.groupId()))
.collect(Collectors.toList());
}
/**
* Validates that partition update counters and cache sizes for all caches are consistent.
*/
private void validatePartitionsState() {
long time = System.currentTimeMillis();
try {
U.doInParallel(
cctx.kernalContext().getSystemExecutorService(),
nonLocalCacheGroupDescriptors(),
grpDesc -> {
CacheGroupContext grpCtx = cctx.cache().cacheGroup(grpDesc.groupId());
GridDhtPartitionTopology top = grpCtx != null
? grpCtx.topology()
: cctx.exchange().clientTopology(grpDesc.groupId(), events().discoveryCache());
// Do not validate read- or write-through caches, caches with disabled rebalance,
// caches with an ExpiryPolicy set, or when validation is disabled.
if (grpCtx == null
|| grpCtx.config().isReadThrough()
|| grpCtx.config().isWriteThrough()
|| grpCtx.config().getCacheStoreFactory() != null
|| grpCtx.config().getRebalanceDelay() == -1
|| grpCtx.config().getRebalanceMode() == CacheRebalanceMode.NONE
|| grpCtx.config().getExpiryPolicyFactory() == null
|| SKIP_PARTITION_SIZE_VALIDATION)
return null;
try {
validator.validatePartitionCountersAndSizes(GridDhtPartitionsExchangeFuture.this, top, msgs);
}
catch (IgniteCheckedException ex) {
log.warning("Partition states validation has failed for group: " + grpCtx.cacheOrGroupName() + ". " + ex.getMessage());
// TODO: Handle such errors https://issues.apache.org/jira/browse/IGNITE-7833
}
return null;
}
);
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to validate partitions state", e);
}
if (log.isInfoEnabled())
log.info("Partitions validation performed in " + (System.currentTimeMillis() - time) + " ms.");
}
/**
*
*/
private void assignPartitionsStates() {
long time = System.currentTimeMillis();
try {
U.doInParallel(
cctx.kernalContext().getSystemExecutorService(),
nonLocalCacheGroupDescriptors(),
grpDesc -> {
CacheGroupContext grpCtx = cctx.cache().cacheGroup(grpDesc.groupId());
GridDhtPartitionTopology top = grpCtx != null
? grpCtx.topology()
: cctx.exchange().clientTopology(grpDesc.groupId(), events().discoveryCache());
if (!CU.isPersistentCache(grpDesc.config(), cctx.gridConfig().getDataStorageConfiguration()))
assignPartitionSizes(top);
else
assignPartitionStates(top);
return null;
}
);
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to assign partition states", e);
}
if (log.isInfoEnabled())
log.info("Partitions assignment performed in " + (System.currentTimeMillis() - time) + " ms.");
}
/**
* Removes gaps in the local update counters. Gaps in update counters are possible on a backup node
* when the primary node failed to send update counter deltas to the backup.
*/
private void finalizePartitionCounters() {
int parallelismLvl = cctx.kernalContext().config().getSystemThreadPoolSize();
// Reserve at least 2 threads for system operations.
parallelismLvl = Math.max(1, parallelismLvl - 2);
long time = System.currentTimeMillis();
try {
U.<CacheGroupContext, Void>doInParallel(
parallelismLvl,
cctx.kernalContext().getSystemExecutorService(),
nonLocalCacheGroups(),
grp -> {
grp.topology().finalizeUpdateCounters();
return null;
}
);
}
catch (IgniteCheckedException e) {
throw new IgniteException("Failed to finalize partition counters", e);
}
if (log.isInfoEnabled())
log.info("Partition counters finalization performed in " + (System.currentTimeMillis() - time) + " ms.");
}
/**
* @param finishState State.
* @param msg Request.
* @param nodeId Node ID.
*/
private void sendAllPartitionsToNode(FinishState finishState, GridDhtPartitionsSingleMessage msg, UUID nodeId) {
ClusterNode node = cctx.node(nodeId);
if (node == null) {
if (log.isDebugEnabled())
log.debug("Failed to send partitions, node failed: " + nodeId);
return;
}
GridDhtPartitionsFullMessage fullMsg = finishState.msg.copy();
Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
if (affReq != null) {
Map<Integer, CacheGroupAffinityMessage> aff = CacheGroupAffinityMessage.createAffinityMessages(
cctx,
finishState.resTopVer,
affReq,
null);
fullMsg.joinedNodeAffinity(aff);
}
if (!fullMsg.exchangeId().equals(msg.exchangeId())) {
fullMsg = fullMsg.copy();
fullMsg.exchangeId(msg.exchangeId());
}
try {
cctx.io().send(node, fullMsg, SYSTEM_POOL);
if (log.isTraceEnabled()) {
log.trace("Full message was sent to node: " +
node +
", fullMsg: " + fullMsg
);
}
}
catch (ClusterTopologyCheckedException e) {
if (log.isDebugEnabled())
log.debug("Failed to send partitions, node failed: " + node);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send partitions [node=" + node + ']', e);
}
}
/**
* @param node Sender node.
* @param msg Full partition info.
*/
public void onReceiveFullMessage(final ClusterNode node, final GridDhtPartitionsFullMessage msg) {
assert msg != null;
assert msg.exchangeId() != null : msg;
assert !node.isDaemon() : node;
initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
@Override public void apply(IgniteInternalFuture<Boolean> f) {
try {
if (!f.get())
return;
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to initialize exchange future: " + this, e);
return;
}
processFullMessage(true, node, msg);
}
});
}
/**
* @param node Sender node.
* @param msg Single partitions request.
*/
public void onReceivePartitionRequest(final ClusterNode node, final GridDhtPartitionsSingleRequest msg) {
assert !cctx.kernalContext().clientNode() || msg.restoreState();
assert !node.isDaemon() && !node.isClient() : node;
initFut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
@Override public void apply(IgniteInternalFuture<Boolean> fut) {
processSinglePartitionRequest(node, msg);
}
});
}
/**
* @param node Sender node.
* @param msg Message.
*/
private void processSinglePartitionRequest(ClusterNode node, GridDhtPartitionsSingleRequest msg) {
FinishState finishState0 = null;
synchronized (mux) {
if (crd == null) {
if (log.isInfoEnabled())
log.info("Ignore partitions request, no coordinator [node=" + node.id() + ']');
return;
}
switch (state) {
case DONE: {
assert finishState != null;
if (node.id().equals(finishState.crdId)) {
if (log.isInfoEnabled())
log.info("Ignore partitions request, finished exchange with this coordinator: " + msg);
return;
}
finishState0 = finishState;
break;
}
case CRD:
case BECOME_CRD: {
if (log.isInfoEnabled())
log.info("Ignore partitions request, node is coordinator: " + msg);
return;
}
case CLIENT:
case SRV: {
if (!cctx.discovery().alive(node)) {
if (log.isInfoEnabled())
log.info("Ignore partitions request, node is not alive [node=" + node.id() + ']');
return;
}
if (msg.restoreState()) {
if (!node.equals(crd)) {
if (node.order() > crd.order()) {
if (log.isInfoEnabled()) {
log.info("Received partitions request, change coordinator [oldCrd=" + crd.id() +
", newCrd=" + node.id() + ']');
}
crd = node; // Do not allow to process FullMessage from old coordinator.
}
else {
if (log.isInfoEnabled()) {
log.info("Ignore restore state request, coordinator changed [oldCrd=" + crd.id() +
", newCrd=" + node.id() + ']');
}
return;
}
}
}
break;
}
default:
assert false : state;
}
}
if (msg.restoreState()) {
try {
assert msg.restoreExchangeId() != null : msg;
GridDhtPartitionsSingleMessage res;
if (dynamicCacheStartExchange() && exchangeLocE != null) {
res = new GridDhtPartitionsSingleMessage(msg.restoreExchangeId(),
cctx.kernalContext().clientNode(),
cctx.versions().last(),
true);
res.setError(exchangeLocE);
}
else {
res = cctx.exchange().createPartitionsSingleMessage(
msg.restoreExchangeId(),
cctx.kernalContext().clientNode(),
true,
node.version().compareToIgnoreTimestamp(PARTIAL_COUNTERS_MAP_SINCE) >= 0,
exchActions);
if (localJoinExchange() && finishState0 == null)
res.cacheGroupsAffinityRequest(exchCtx.groupsAffinityRequestOnJoin());
}
res.restoreState(true);
if (log.isInfoEnabled()) {
log.info("Send restore state response [node=" + node.id() +
", exchVer=" + msg.restoreExchangeId().topologyVersion() +
", hasState=" + (finishState0 != null) +
", affReq=" + !F.isEmpty(res.cacheGroupsAffinityRequest()) + ']');
}
res.finishMessage(finishState0 != null ? finishState0.msg : null);
cctx.io().send(node, res, SYSTEM_POOL);
}
catch (ClusterTopologyCheckedException ignored) {
if (log.isDebugEnabled())
log.debug("Node left during partition exchange [nodeId=" + node.id() + ", exchId=" + exchId + ']');
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send partitions message [node=" + node + ", msg=" + msg + ']', e);
}
return;
}
try {
sendLocalPartitions(node);
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to send message to coordinator: " + e);
}
}
/**
* @param checkCrd If {@code true} checks that local node is exchange coordinator.
* @param node Sender node.
* @param msg Message.
*/
private void processFullMessage(boolean checkCrd, ClusterNode node, GridDhtPartitionsFullMessage msg) {
try {
assert exchId.equals(msg.exchangeId()) : msg;
assert msg.lastVersion() != null : msg;
if (checkCrd) {
assert node != null;
synchronized (mux) {
if (crd == null) {
if (log.isInfoEnabled())
log.info("Ignore full message, all server nodes left: " + msg);
return;
}
switch (state) {
case CRD:
case BECOME_CRD: {
if (log.isInfoEnabled())
log.info("Ignore full message, node is coordinator: " + msg);
return;
}
case DONE: {
if (log.isInfoEnabled())
log.info("Ignore full message, future is done: " + msg);
return;
}
case SRV:
case CLIENT: {
if (!crd.equals(node)) {
if (log.isInfoEnabled()) {
log.info("Received full message from non-coordinator [node=" + node.id() +
", nodeOrder=" + node.order() +
", crd=" + crd.id() +
", crdOrder=" + crd.order() + ']');
}
if (node.order() > crd.order())
fullMsgs.put(node, msg);
return;
}
else {
if (!F.isEmpty(msg.getErrorsMap())) {
Exception e = msg.getErrorsMap().get(cctx.localNodeId());
if (e instanceof IgniteNeedReconnectException) {
onDone(e);
return;
}
}
AffinityTopologyVersion resVer = msg.resultTopologyVersion() != null ? msg.resultTopologyVersion() : initialVersion();
if (log.isInfoEnabled()) {
log.info("Received full message, will finish exchange [node=" + node.id() +
", resVer=" + resVer + ']');
}
finishState = new FinishState(crd.id(), resVer, msg);
state = ExchangeLocalState.DONE;
break;
}
}
}
}
}
else
assert node == null : node;
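// Apply the received full message: merge events if required, update affinity and then the full partition map.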
AffinityTopologyVersion resTopVer = initialVersion();
long time = System.currentTimeMillis();
if (exchCtx.mergeExchanges()) {
if (msg.resultTopologyVersion() != null && !initialVersion().equals(msg.resultTopologyVersion())) {
if (log.isInfoEnabled()) {
log.info("Received full message, need merge [curFut=" + initialVersion() +
", resVer=" + msg.resultTopologyVersion() + ']');
}
resTopVer = msg.resultTopologyVersion();
if (cctx.exchange().mergeExchanges(this, msg)) {
assert cctx.kernalContext().isStopping();
return; // Node is stopping, no need to further process exchange.
}
assert resTopVer.equals(exchCtx.events().topologyVersion()) : "Unexpected result version [" +
"msgVer=" + resTopVer +
", locVer=" + exchCtx.events().topologyVersion() + ']';
}
exchCtx.events().processEvents(this);
if (localJoinExchange())
cctx.affinity().onLocalJoin(this, msg, resTopVer);
else {
if (exchCtx.events().hasServerLeft())
cctx.affinity().applyAffinityFromFullMessage(this, msg);
else
cctx.affinity().onServerJoinWithExchangeMergeProtocol(this, false);
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
if (grp.isLocal() || cacheGroupStopping(grp.groupId()))
continue;
grp.topology().beforeExchange(this, true, false);
}
}
}
else if (localJoinExchange() && !exchCtx.fetchAffinityOnJoin())
cctx.affinity().onLocalJoin(this, msg, resTopVer);
else if (forceAffReassignment)
cctx.affinity().applyAffinityFromFullMessage(this, msg);
if (log.isInfoEnabled())
log.info("Affinity changes applied in " + (System.currentTimeMillis() - time) + " ms.");
if (dynamicCacheStartExchange() && !F.isEmpty(exchangeGlobalExceptions)) {
assert cctx.localNode().isClient();
// TODO: https://issues.apache.org/jira/browse/IGNITE-8796
// The current exchange has been successfully completed on all server nodes,
// but has failed on that client node for some reason.
// It looks like we need to roll back dynamically started caches on the client node,
// complete DynamicCacheStartFutures (if they are registered) with the cause of that failure
// and complete current exchange without errors.
onDone(exchangeLocE);
return;
}
updatePartitionFullMap(resTopVer, msg);
if (stateChangeExchange() && !F.isEmpty(msg.getErrorsMap()))
cctx.kernalContext().state().onStateChangeError(msg.getErrorsMap(), exchActions.stateChangeRequest());
onDone(resTopVer, null);
}
catch (IgniteCheckedException e) {
onDone(e);
}
}
/**
* Updates partition map in all caches.
*
* @param resTopVer Result topology version.
* @param msg Partitions full messages.
*/
private void updatePartitionFullMap(AffinityTopologyVersion resTopVer, GridDhtPartitionsFullMessage msg) {
cctx.versions().onExchange(msg.lastVersion().order());
assert partHistSuppliers.isEmpty();
partHistSuppliers.putAll(msg.partitionHistorySuppliers());
long time = System.currentTimeMillis();
int parallelismLvl = cctx.kernalContext().config().getSystemThreadPoolSize();
// Reserve at least 2 threads for system operations.
parallelismLvl = Math.max(1, parallelismLvl - 2);
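// Update topologies of all received cache groups in parallel on the system executor service.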
try {
doInParallel(
parallelismLvl,
cctx.kernalContext().getSystemExecutorService(),
msg.partitions().keySet(), grpId -> {
CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
if (grp != null) {
CachePartitionFullCountersMap cntrMap = msg.partitionUpdateCounters(grpId,
grp.topology().partitions());
grp.topology().update(resTopVer,
msg.partitions().get(grpId),
cntrMap,
msg.partsToReload(cctx.localNodeId(), grpId),
msg.partitionSizes(grpId),
null);
}
else {
ClusterNode oldest = cctx.discovery().oldestAliveServerNode(AffinityTopologyVersion.NONE);
if (oldest != null && oldest.isLocal()) {
GridDhtPartitionTopology top = cctx.exchange().clientTopology(grpId, events().discoveryCache());
CachePartitionFullCountersMap cntrMap = msg.partitionUpdateCounters(grpId,
top.partitions());
top.update(resTopVer,
msg.partitions().get(grpId),
cntrMap,
Collections.emptySet(),
null,
null);
}
}
return null;
});
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);
}
partitionsReceived = true;
if (log.isInfoEnabled())
log.info("Full map updating for " + msg.partitions().size()
+ " groups performed in " + (System.currentTimeMillis() - time) + " ms.");
}
/**
* Updates partition map in all caches.
*
* @param nodeId Node message received from.
* @param msg Partitions single message.
*/
private void updatePartitionSingleMap(UUID nodeId, GridDhtPartitionsSingleMessage msg) {
msgs.put(nodeId, msg);
for (Map.Entry<Integer, GridDhtPartitionMap> entry : msg.partitions().entrySet()) {
Integer grpId = entry.getKey();
CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
GridDhtPartitionTopology top = grp != null ? grp.topology() :
cctx.exchange().clientTopology(grpId, events().discoveryCache());
top.update(exchId, entry.getValue(), false);
}
}
/**
* Cache change failure message callback, processed from the discovery thread.
*
* @param node Message sender node.
* @param msg Failure message.
*/
public void onDynamicCacheChangeFail(final ClusterNode node, final DynamicCacheChangeFailureMessage msg) {
assert exchId.equals(msg.exchangeId()) : msg;
assert firstDiscoEvt.type() == EVT_DISCOVERY_CUSTOM_EVT && dynamicCacheStartExchange();
final ExchangeActions actions = exchangeActions();
onDiscoveryEvent(new IgniteRunnable() {
@Override public void run() {
// The rollbackExchange() method has to wait for checkpoint.
// That operation is time-consuming, and therefore it should be executed outside the discovery thread.
cctx.kernalContext().getSystemExecutorService().submit(new Runnable() {
@Override public void run() {
if (isDone() || !enterBusy())
return;
try {
assert msg.error() != null: msg;
// Try to revert all the changes that were done during the initialization phase.
cctx.affinity().forceCloseCaches(
GridDhtPartitionsExchangeFuture.this,
crd.isLocal(),
msg.exchangeActions()
);
synchronized (mux) {
finishState = new FinishState(crd.id(), initialVersion(), null);
state = ExchangeLocalState.DONE;
}
if (actions != null)
actions.completeRequestFutures(cctx, msg.error());
onDone(exchId.topologyVersion());
}
catch (Throwable e) {
onDone(e);
}
finally {
leaveBusy();
}
}
});
}
});
}
/**
* Affinity change message callback, processed from the same thread as {@link #onNodeLeft}.
*
* @param node Message sender node.
* @param msg Message.
*/
public void onAffinityChangeMessage(final ClusterNode node, final CacheAffinityChangeMessage msg) {
assert exchId.equals(msg.exchangeId()) : msg;
onDiscoveryEvent(new IgniteRunnable() {
@Override public void run() {
if (isDone() || !enterBusy())
return;
try {
assert centralizedAff;
if (crd.equals(node)) {
AffinityTopologyVersion resTopVer = initialVersion();
cctx.affinity().onExchangeChangeAffinityMessage(GridDhtPartitionsExchangeFuture.this,
crd.isLocal(),
msg);
IgniteCheckedException err = !F.isEmpty(msg.partitionsMessage().getErrorsMap()) ?
new IgniteCheckedException("Cluster state change failed.") : null;
if (!crd.isLocal()) {
GridDhtPartitionsFullMessage partsMsg = msg.partitionsMessage();
assert partsMsg != null : msg;
assert partsMsg.lastVersion() != null : partsMsg;
updatePartitionFullMap(resTopVer, partsMsg);
if (exchActions != null && exchActions.stateChangeRequest() != null && err != null)
cctx.kernalContext().state().onStateChangeError(msg.partitionsMessage().getErrorsMap(), exchActions.stateChangeRequest());
}
onDone(resTopVer, err);
}
else {
if (log.isDebugEnabled()) {
log.debug("Ignore affinity change message, coordinator changed [node=" + node.id() +
", crd=" + crd.id() +
", msg=" + msg +
']');
}
}
}
finally {
leaveBusy();
}
}
});
}
/**
* Runs the given closure if the future is already initialized, otherwise buffers it
* to be run from {@link #initDone()}.
*
* @param c Closure.
*/
private void onDiscoveryEvent(IgniteRunnable c) {
synchronized (discoEvts) {
if (!init) {
discoEvts.add(c);
return;
}
assert discoEvts.isEmpty() : discoEvts;
}
c.run();
}
/**
* Moves exchange future to state 'init done': runs the discovery event closures buffered
* by {@link #onDiscoveryEvent(IgniteRunnable)} and completes {@link #initFut}.
*/
private void initDone() {
while (!isDone()) {
List<IgniteRunnable> evts;
synchronized (discoEvts) {
if (discoEvts.isEmpty()) {
init = true;
break;
}
evts = new ArrayList<>(discoEvts);
discoEvts.clear();
}
for (IgniteRunnable c : evts)
c.run();
}
initFut.onDone(true);
}
/**
*
*/
private void onAllServersLeft() {
assert cctx.kernalContext().clientNode() : cctx.localNode();
List<ClusterNode> empty = Collections.emptyList();
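// With no server nodes left, assign an empty node list to every partition of every cache group.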
for (CacheGroupContext grp : cctx.cache().cacheGroups()) {
List<List<ClusterNode>> affAssignment = new ArrayList<>(grp.affinity().partitions());
for (int i = 0; i < grp.affinity().partitions(); i++)
affAssignment.add(empty);
grp.affinity().idealAssignment(affAssignment);
grp.affinity().initialize(initialVersion(), affAssignment);
cctx.exchange().exchangerUpdateHeartbeat();
}
}
/**
* Node left callback, processed from the same thread as {@link #onAffinityChangeMessage}.
*
* @param node Left node.
*/
public void onNodeLeft(final ClusterNode node) {
if (isDone() || !enterBusy())
return;
cctx.mvcc().removeExplicitNodeLocks(node.id(), initialVersion());
try {
onDiscoveryEvent(new IgniteRunnable() {
@Override public void run() {
if (isDone() || !enterBusy())
return;
try {
boolean crdChanged = false;
boolean allReceived = false;
ClusterNode crd0;
events().discoveryCache().updateAlives(node);
InitNewCoordinatorFuture newCrdFut0;
synchronized (mux) {
newCrdFut0 = newCrdFut;
}
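// Notify the new-coordinator initialization future, if any, that the node has left.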
if (newCrdFut0 != null)
newCrdFut0.onNodeLeft(node.id());
synchronized (mux) {
if (!srvNodes.remove(node))
return;
boolean rmvd = remaining.remove(node.id());
if (!rmvd) {
if (mergedJoinExchMsgs != null && mergedJoinExchMsgs.containsKey(node.id())) {
if (mergedJoinExchMsgs.get(node.id()) == null) {
mergedJoinExchMsgs.remove(node.id());
rmvd = true;
}
}
}
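// If the left node was the coordinator, promote the first remaining server node (or null if none are left).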
if (node.equals(crd)) {
crdChanged = true;
crd = !srvNodes.isEmpty() ? srvNodes.get(0) : null;
}
switch (state) {
case DONE:
return;
case CRD:
allReceived = rmvd && (remaining.isEmpty() && F.isEmpty(mergedJoinExchMsgs));
break;
case SRV:
assert crd != null;
if (crdChanged && crd.isLocal()) {
state = ExchangeLocalState.BECOME_CRD;
newCrdFut = new InitNewCoordinatorFuture(cctx);
}
break;
}
crd0 = crd;
if (crd0 == null)
finishState = new FinishState(null, initialVersion(), null);
}
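// No server nodes remain: finish the exchange right away with empty affinity assignments.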
if (crd0 == null) {
onAllServersLeft();
onDone(initialVersion());
return;
}
if (crd0.isLocal()) {
if (stateChangeExchange() && exchangeLocE != null)
exchangeGlobalExceptions.put(crd0.id(), exchangeLocE);
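// This node has just become the coordinator: initialize the new-coordinator future asynchronously
// and continue in onBecomeCoordinator().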
if (crdChanged) {
if (log.isInfoEnabled()) {
log.info("Coordinator failed, node is new coordinator [ver=" + initialVersion() +
", prev=" + node.id() + ']');
}
assert newCrdFut != null;
cctx.kernalContext().closure().callLocal(new Callable<Void>() {
@Override public Void call() throws Exception {
newCrdFut.init(GridDhtPartitionsExchangeFuture.this);
newCrdFut.listen(new CI1<IgniteInternalFuture>() {
@Override public void apply(IgniteInternalFuture fut) {
if (isDone())
return;
Lock lock = cctx.io().readLock();
if (lock == null)
return;
try {
onBecomeCoordinator((InitNewCoordinatorFuture) fut);
}
finally {
lock.unlock();
}
}
});
return null;
}
}, GridIoPolicy.SYSTEM_POOL);
return;
}
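// The left node was the last one whose single message was awaited: finish the exchange in the system pool.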
if (allReceived) {
cctx.kernalContext().getSystemExecutorService().submit(new Runnable() {
@Override public void run() {
awaitSingleMapUpdates();
onAllReceived(null);
}
});
}
}
else {
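// Non-coordinator path: if the coordinator changed, re-process a pending full message
// from the new coordinator (if any) and re-send local partitions to it.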
if (crdChanged) {
for (Map.Entry<ClusterNode, GridDhtPartitionsFullMessage> m : fullMsgs.entrySet()) {
if (crd0.equals(m.getKey())) {
if (log.isInfoEnabled()) {
log.info("Coordinator changed, process pending full message [" +
"ver=" + initialVersion() +
", crd=" + node.id() +
", pendingMsgNode=" + m.getKey() + ']');
}
processFullMessage(true, m.getKey(), m.getValue());
if (isDone())
return;
}
}
if (log.isInfoEnabled()) {
log.info("Coordinator changed, send partitions to new coordinator [" +
"ver=" + initialVersion() +
", crd=" + node.id() +
", newCrd=" + crd0.id() + ']');
}
final ClusterNode newCrd = crd0;
cctx.kernalContext().getSystemExecutorService().submit(new Runnable() {
@Override public void run() {
sendPartitions(newCrd);
}
});
}
}
}
catch (IgniteCheckedException e) {
if (reconnectOnError(e))
onDone(new IgniteNeedReconnectException(cctx.localNode(), e));
else
U.error(log, "Failed to process node left event: " + e, e);
}
finally {
leaveBusy();
}
}
});
}
finally {
leaveBusy();
}
}
/**
* @param newCrdFut Coordinator initialization future.
*/
private void onBecomeCoordinator(InitNewCoordinatorFuture newCrdFut) {
boolean allRcvd = false;
cctx.exchange().onCoordinatorInitialized();
if (newCrdFut.restoreState()) {
GridDhtPartitionsFullMessage fullMsg = newCrdFut.fullMessage();
assert msgs.isEmpty() : msgs;
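// The previous coordinator had already prepared the final full message: finish the exchange locally
// and re-send that message to the nodes that were still waiting for it.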
if (fullMsg != null) {
if (log.isInfoEnabled()) {
log.info("New coordinator restored state [ver=" + initialVersion() +
", resVer=" + fullMsg.resultTopologyVersion() + ']');
}
synchronized (mux) {
state = ExchangeLocalState.DONE;
finishState = new FinishState(crd.id(), fullMsg.resultTopologyVersion(), fullMsg);
}
fullMsg.exchangeId(exchId);
processFullMessage(false, null, fullMsg);
Map<ClusterNode, GridDhtPartitionsSingleMessage> msgs = newCrdFut.messages();
if (!F.isEmpty(msgs)) {
Map<Integer, CacheGroupAffinityMessage> joinedNodeAff = null;
for (Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage> e : msgs.entrySet()) {
this.msgs.put(e.getKey().id(), e.getValue());
GridDhtPartitionsSingleMessage msg = e.getValue();
Collection<Integer> affReq = msg.cacheGroupsAffinityRequest();
if (!F.isEmpty(affReq)) {
joinedNodeAff = CacheGroupAffinityMessage.createAffinityMessages(cctx,
fullMsg.resultTopologyVersion(),
affReq,
joinedNodeAff);
}
}
Map<UUID, GridDhtPartitionsSingleMessage> mergedJoins = newCrdFut.mergedJoinExchangeMessages();
if (log.isInfoEnabled()) {
log.info("New coordinator sends full message [ver=" + initialVersion() +
", resVer=" + fullMsg.resultTopologyVersion() +
", nodes=" + F.nodeIds(msgs.keySet()) +
", mergedJoins=" + (mergedJoins != null ? mergedJoins.keySet() : null) + ']');
}
sendAllPartitions(fullMsg, msgs.keySet(), mergedJoins, joinedNodeAff);
}
return;
}
else {
if (log.isInfoEnabled())
log.info("New coordinator restore state finished [ver=" + initialVersion() + ']');
for (Map.Entry<ClusterNode, GridDhtPartitionsSingleMessage> e : newCrdFut.messages().entrySet()) {
GridDhtPartitionsSingleMessage msg = e.getValue();
if (!msg.client()) {
msgs.put(e.getKey().id(), e.getValue());
if (dynamicCacheStartExchange() && msg.getError() != null)
exchangeGlobalExceptions.put(e.getKey().id(), msg.getError());
updatePartitionSingleMap(e.getKey().id(), msg);
}
}
}
allRcvd = true;
synchronized (mux) {
remaining.clear(); // Do not process messages.
assert crd != null && crd.isLocal();
state = ExchangeLocalState.CRD;
assert mergedJoinExchMsgs == null;
}
}
else {
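// State restore was not performed: act as coordinator and request single messages from the remaining nodes.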
Set<UUID> remaining0 = null;
synchronized (mux) {
assert crd != null && crd.isLocal();
state = ExchangeLocalState.CRD;
assert mergedJoinExchMsgs == null;
if (log.isInfoEnabled()) {
log.info("New coordinator initialization finished [ver=" + initialVersion() +
", remaining=" + remaining + ']');
}
if (!remaining.isEmpty())
remaining0 = new HashSet<>(remaining);
}
if (remaining0 != null) {
// It is possible that some nodes finished the exchange with the previous coordinator.
GridDhtPartitionsSingleRequest req = new GridDhtPartitionsSingleRequest(exchId);
for (UUID nodeId : remaining0) {
try {
if (!pendingSingleMsgs.containsKey(nodeId)) {
if (log.isInfoEnabled()) {
log.info("New coordinator sends request [ver=" + initialVersion() +
", node=" + nodeId + ']');
}
cctx.io().send(nodeId, req, SYSTEM_POOL);
}
}
catch (ClusterTopologyCheckedException ignored) {
if (log.isDebugEnabled())
log.debug("Node left during partition exchange [nodeId=" + nodeId +
", exchId=" + exchId + ']');
}
catch (IgniteCheckedException e) {
U.error(log, "Failed to request partitions from node: " + nodeId, e);
}
}
for (Map.Entry<UUID, GridDhtPartitionsSingleMessage> m : pendingSingleMsgs.entrySet()) {
if (log.isInfoEnabled()) {
log.info("New coordinator process pending message [ver=" + initialVersion() +
", node=" + m.getKey() + ']');
}
processSingleMessage(m.getKey(), m.getValue());
}
}
}
if (allRcvd) {
awaitSingleMapUpdates();
onAllReceived(newCrdFut.messages().keySet());
}
}
/**
* @param e Exception.
* @return {@code True} if the local node should try to reconnect in case of an error.
*/
public boolean reconnectOnError(Throwable e) {
return (e instanceof IgniteNeedReconnectException
|| X.hasCause(e, IOException.class, IgniteClientDisconnectedCheckedException.class))
&& cctx.discovery().reconnectSupported();
}
/**
* @return {@code True} if partition changes triggered by receiving Single/Full messages are not finished yet.
*/
public boolean partitionChangesInProgress() {
ClusterNode crd0 = crd;
if (crd0 == null)
return false;
return crd0.equals(cctx.localNode()) ? !partitionsSent : !partitionsReceived;
}
/**
* Adds or merges updates received from the coordinator while the exchange is in progress.
*
* @param node Sender node.
* @param fullMsg Full message with {@code exchangeId == null}.
* @return {@code True} if the message should be ignored for now and processed after the exchange is done.
*/
public synchronized boolean addOrMergeDelayedFullMessage(ClusterNode node, GridDhtPartitionsFullMessage fullMsg) {
assert fullMsg.exchangeId() == null : fullMsg.exchangeId();
if (isDone())
return false;
GridDhtPartitionsFullMessage prev = delayedLatestMsg;
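// On the first delayed message, register a listener that applies the latest accumulated full message
// once this exchange completes; subsequent messages are merged into it.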
if (prev == null) {
delayedLatestMsg = fullMsg;
listen(f -> {
GridDhtPartitionsFullMessage msg;
synchronized (this) {
msg = delayedLatestMsg;
delayedLatestMsg = null;
}
if (msg != null)
cctx.exchange().processFullPartitionUpdate(node, msg);
});
}
else
delayedLatestMsg.merge(fullMsg, cctx.discovery());
return true;
}
/** {@inheritDoc} */
@Override public int compareTo(GridDhtPartitionsExchangeFuture fut) {
return exchId.compareTo(fut.exchId);
}
/** {@inheritDoc} */
@Override public boolean equals(Object o) {
if (this == o)
return true;
if (o == null || o.getClass() != getClass())
return false;
GridDhtPartitionsExchangeFuture fut = (GridDhtPartitionsExchangeFuture)o;
return exchId.equals(fut.exchId);
}
/** {@inheritDoc} */
@Override public int hashCode() {
return exchId.hashCode();
}
/** {@inheritDoc} */
@Override public void addDiagnosticRequest(IgniteDiagnosticPrepareContext diagCtx) {
if (!isDone()) {
ClusterNode crd;
Set<UUID> remaining;
synchronized (mux) {
crd = this.crd;
remaining = new HashSet<>(this.remaining);
}
if (crd != null) {
if (!crd.isLocal()) {
diagCtx.exchangeInfo(crd.id(), initialVersion(), "Exchange future waiting for coordinator " +
"response [crd=" + crd.id() + ", topVer=" + initialVersion() + ']');
}
else if (!remaining.isEmpty()) {
UUID nodeId = remaining.iterator().next();
diagCtx.exchangeInfo(nodeId, initialVersion(), "Exchange future on coordinator waiting for " +
"server response [node=" + nodeId + ", topVer=" + initialVersion() + ']');
}
}
}
}
/**
* @return Short information string.
*/
public String shortInfo() {
return "GridDhtPartitionsExchangeFuture [topVer=" + initialVersion() +
", evt=" + (firstDiscoEvt != null ? IgniteUtils.gridEventName(firstDiscoEvt.type()) : -1) +
", evtNode=" + (firstDiscoEvt != null ? firstDiscoEvt.eventNode() : null) +
", done=" + isDone() + ']';
}
/** {@inheritDoc} */
@Override public String toString() {
Set<UUID> remaining;
synchronized (mux) {
remaining = new HashSet<>(this.remaining);
}
return S.toString(GridDhtPartitionsExchangeFuture.class, this,
"evtLatch", evtLatch == null ? "null" : evtLatch.getCount(),
"remaining", remaining,
"super", super.toString());
}
/**
*
*/
private static class CounterWithNodes {
/** Update counter. */
private final long cnt;
/** Partition size. */
private final long size;
/** IDs of the nodes that reported this counter. */
private final Set<UUID> nodes = new HashSet<>();
/**
* @param cnt Count.
* @param size Size ({@code null} is treated as zero).
* @param firstNode Node ID.
*/
private CounterWithNodes(long cnt, @Nullable Long size, UUID firstNode) {
this.cnt = cnt;
this.size = size != null ? size : 0;
nodes.add(firstNode);
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(CounterWithNodes.class, this);
}
}
/**
* Computes the time to wait before the next debug dump: {@code timeout * 2^step},
* capped by {@code IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT_LIMIT} (30 minutes by default).
*
* @param step Exponent coefficient.
* @param timeout Base timeout.
* @return Time to wait before next debug dump.
*/
public static long nextDumpTimeout(int step, long timeout) {
long limit = getLong(IGNITE_LONG_OPERATIONS_DUMP_TIMEOUT_LIMIT, 30 * 60_000);
if (limit <= 0)
limit = 30 * 60_000;
assert step >= 0 : step;
long dumpFactor = Math.round(Math.pow(2, step));
long nextTimeout = timeout * dumpFactor;
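// A non-positive result indicates numeric overflow; fall back to the limit.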
if (nextTimeout <= 0)
return limit;
return nextTimeout <= limit ? nextTimeout : limit;
}
/**
*
*/
private static class FinishState {
/** Coordinator node ID. */
private final UUID crdId;
/** Result topology version. */
private final AffinityTopologyVersion resTopVer;
/** Result full message. */
private final GridDhtPartitionsFullMessage msg;
/**
* @param crdId Coordinator node ID.
* @param resTopVer Result version.
* @param msg Result message.
*/
FinishState(UUID crdId, AffinityTopologyVersion resTopVer, GridDhtPartitionsFullMessage msg) {
this.crdId = crdId;
this.resTopVer = resTopVer;
this.msg = msg;
}
}
/**
*
*/
enum ExchangeType {
/** */
CLIENT,
/** */
ALL,
/** */
NONE
}
/**
*
*/
private enum ExchangeLocalState {
/** Local node is coordinator. */
CRD,
/** Local node is non-coordinator server. */
SRV,
/** Local node is client node. */
CLIENT,
/**
* Previous coordinator failed before the exchange finished and
* the local node performs initialization to become the new coordinator.
*/
BECOME_CRD,
/** Exchange finished. */
DONE,
/** This exchange was merged with another one. */
MERGED
}
}