/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.CacheRebalanceMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryInfoCollection;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.GridCachePartitionExchangeManager;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObjectAdapter;
import org.apache.ignite.internal.util.GridLeanSet;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.CI2;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.spi.IgniteSpiException;
import org.jetbrains.annotations.Nullable;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_LOADED;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STOPPED;
import static org.apache.ignite.internal.GridTopic.TOPIC_CACHE;
import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRELOAD;
/**
* Component responsible for requesting partitions from other nodes and populating the local cache.
*/
@SuppressWarnings("NonConstantFieldWithUpperCaseName")
public class GridDhtPartitionDemander {
/** */
private final GridCacheContext<?, ?> cctx;
/** */
private final IgniteLogger log;
/** Preload predicate. */
private IgnitePredicate<GridCacheEntryInfo> preloadPred;
/** Future for preload mode {@link CacheRebalanceMode#SYNC}. */
@GridToStringInclude
private final GridFutureAdapter syncFut = new GridFutureAdapter();
/** Rebalance future. */
@GridToStringInclude
private volatile RebalanceFuture rebalanceFut;
/** Last timeout object. */
private AtomicReference<GridTimeoutObject> lastTimeoutObj = new AtomicReference<>();
/** Last exchange future. */
private volatile GridDhtPartitionsExchangeFuture lastExchangeFut;
/** Demand lock. */
@Deprecated//Backward compatibility. To be removed in future.
private final ReadWriteLock demandLock;
/** DemandWorker index. */
@Deprecated//Backward compatibility. To be removed in future.
private final AtomicInteger dmIdx = new AtomicInteger();
/** DemandWorker. */
@Deprecated//Backward compatibility. To be removed in future.
private volatile DemandWorker worker;
/** Cached rebalance topics. */
private final Map<Integer, Object> rebalanceTopics;
/**
* Started event sent.
* Makes sense for replicated caches only.
*/
private final AtomicBoolean startedEvtSent = new AtomicBoolean();
/**
* Stopped event sent.
* Makes sense for replicated caches only.
*/
private final AtomicBoolean stoppedEvtSent = new AtomicBoolean();
/**
* @param cctx Cache context.
* @param demandLock Demand lock.
*/
public GridDhtPartitionDemander(GridCacheContext<?, ?> cctx, ReadWriteLock demandLock) {
assert cctx != null;
this.cctx = cctx;
this.demandLock = demandLock;
log = cctx.logger(getClass());
boolean enabled = cctx.rebalanceEnabled() && !cctx.kernalContext().clientNode();
rebalanceFut = new RebalanceFuture();//Dummy.
if (!enabled) {
// Calling onDone() immediately since preloading is disabled.
rebalanceFut.onDone(true);
syncFut.onDone();
}
Map<Integer, Object> tops = new HashMap<>();
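// Pre-compute one rebalance topic per configured rebalance thread.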
for (int idx = 0; idx < cctx.gridConfig().getRebalanceThreadPoolSize(); idx++)
tops.put(idx, GridCachePartitionExchangeManager.rebalanceTopic(idx));
rebalanceTopics = tops;
}
/**
* Start.
*/
void start() {
// No-op.
}
/**
* Stop.
*/
void stop() {
try {
rebalanceFut.cancel();
}
catch (Exception ex) {
rebalanceFut.onDone(false);
}
DemandWorker dw = worker;
if (dw != null)
dw.cancel();
lastExchangeFut = null;
lastTimeoutObj.set(null);
}
/**
* @return Future for {@link CacheRebalanceMode#SYNC} mode.
*/
IgniteInternalFuture<?> syncFuture() {
return syncFut;
}
/**
* @return Rebalance future.
*/
IgniteInternalFuture<Boolean> rebalanceFuture() {
return rebalanceFut;
}
/**
* Sets preload predicate for demand pool.
*
* @param preloadPred Preload predicate.
*/
void preloadPredicate(IgnitePredicate<GridCacheEntryInfo> preloadPred) {
this.preloadPred = preloadPred;
}
/**
* Force preload.
*/
void forcePreload() {
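// Drop any scheduled delayed rebalance and force it to run as soon as the last exchange completes.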
GridTimeoutObject obj = lastTimeoutObj.getAndSet(null);
if (obj != null)
cctx.time().removeTimeoutObject(obj);
final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
if (exchFut != null) {
if (log.isDebugEnabled())
log.debug("Forcing rebalance event for future: " + exchFut);
exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
@Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
cctx.shared().exchange().forcePreloadExchange(exchFut);
}
});
}
else if (log.isDebugEnabled())
log.debug("Ignoring force rebalance request (no topology event happened yet).");
}
/**
* @param fut Future.
* @return {@code True} if topology changed.
*/
private boolean topologyChanged(RebalanceFuture fut) {
return
!cctx.affinity().affinityTopologyVersion().equals(fut.topologyVersion()) || // Topology already changed.
fut != rebalanceFut; // Same topology, but dummy exchange forced because of missing partitions.
}
/**
* @param part Partition.
* @param type Type.
* @param discoEvt Discovery event.
*/
private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) {
assert discoEvt != null;
cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
}
/**
* Sets last exchange future.
*
* @param lastFut Last future to set.
*/
void onTopologyChanged(GridDhtPartitionsExchangeFuture lastFut) {
lastExchangeFut = lastFut;
}
/**
* @param assigns Assignments.
* @param force {@code True} if dummy reassign.
* @param cnt Update sequence counter.
* @param next Runnable that starts rebalancing of the next cache (according to the rebalance order).
* @return Rebalancing runnable.
*/
Runnable addAssignments(final GridDhtPreloaderAssignments assigns,
boolean force,
int cnt,
final Runnable next) {
if (log.isDebugEnabled())
log.debug("Adding partition assignments: " + assigns);
long delay = cctx.config().getRebalanceDelay();
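// With no configured delay (or a forced reassign) rebalancing starts immediately; otherwise it is scheduled via a timeout object below.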
if (delay == 0 || force) {
final RebalanceFuture oldFut = rebalanceFut;
final RebalanceFuture fut = new RebalanceFuture(assigns, cctx, log, startedEvtSent, stoppedEvtSent, cnt);
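// The previous future is cancelled unless it is the initial dummy; the dummy is completed together with the new future instead.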
if (!oldFut.isInitial())
oldFut.cancel();
else {
fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
@Override public void apply(IgniteInternalFuture<Boolean> fut) {
oldFut.onDone(fut.result());
}
});
}
rebalanceFut = fut;
fut.sendRebalanceStartedEvent();
if (assigns.cancelled()) { // Pending exchange.
if (log.isDebugEnabled())
log.debug("Rebalancing skipped due to cancelled assignments.");
fut.onDone(false);
fut.sendRebalanceFinishedEvent();
return null;
}
if (assigns.isEmpty()) { // Nothing to rebalance.
if (log.isDebugEnabled())
log.debug("Rebalancing skipped due to empty assignments.");
fut.onDone(true);
((GridFutureAdapter)cctx.preloader().syncFuture()).onDone();
fut.sendRebalanceFinishedEvent();
return null;
}
return new Runnable() {
@Override public void run() {
try {
if (next != null)
fut.listen(new CI1<IgniteInternalFuture<Boolean>>() {
@Override public void apply(IgniteInternalFuture<Boolean> f) {
try {
if (f.get()) // Not cancelled.
next.run(); // Starts next cache rebalancing (according to the order).
}
catch (IgniteCheckedException ignored) {
if (log.isDebugEnabled())
log.debug(ignored.getMessage());
}
}
});
requestPartitions(fut, assigns);
}
catch (IgniteCheckedException e) {
ClusterTopologyCheckedException cause = e.getCause(ClusterTopologyCheckedException.class);
if (cause != null)
log.warning("Failed to send initial demand request to node. " + e.getMessage());
else
log.error("Failed to send initial demand request to node.", e);
fut.cancel();
}
catch (Throwable th) {
log.error("Runtime error caught during initial demand request sending.", th);
fut.cancel();
if (th instanceof Error)
throw th;
}
}
};
}
else if (delay > 0) {
GridTimeoutObject obj = lastTimeoutObj.get();
if (obj != null)
cctx.time().removeTimeoutObject(obj);
final GridDhtPartitionsExchangeFuture exchFut = lastExchangeFut;
assert exchFut != null : "Delaying rebalance process without topology event.";
obj = new GridTimeoutObjectAdapter(delay) {
@Override public void onTimeout() {
exchFut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
@Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> f) {
cctx.shared().exchange().forcePreloadExchange(exchFut);
}
});
}
};
lastTimeoutObj.set(obj);
cctx.time().addTimeoutObject(obj);
}
return null;
}
/**
* @param fut Future.
* @param assigns Assignments.
* @throws IgniteCheckedException If failed.
*/
private void requestPartitions(
RebalanceFuture fut,
GridDhtPreloaderAssignments assigns
) throws IgniteCheckedException {
if (topologyChanged(fut)) {
fut.cancel();
return;
}
for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
final ClusterNode node = e.getKey();
GridDhtPartitionDemandMessage d = e.getValue();
fut.appendPartitions(node.id(), d.partitions()); //Future preparation.
}
for (Map.Entry<ClusterNode, GridDhtPartitionDemandMessage> e : assigns.entrySet()) {
final ClusterNode node = e.getKey();
final CacheConfiguration cfg = cctx.config();
final Collection<Integer> parts = fut.remaining.get(node.id()).get2();
GridDhtPartitionDemandMessage d = e.getValue();
//Check remote node rebalancing API version.
if (node.version().compareTo(GridDhtPreloader.REBALANCING_VER_2_SINCE) >= 0) {
U.log(log, "Starting rebalancing [mode=" + cfg.getRebalanceMode() +
", fromNode=" + node.id() + ", partitionsCount=" + parts.size() +
", topology=" + fut.topologyVersion() + ", updateSeq=" + fut.updateSeq + "]");
int lsnrCnt = cctx.gridConfig().getRebalanceThreadPoolSize();
List<Set<Integer>> sParts = new ArrayList<>(lsnrCnt);
for (int cnt = 0; cnt < lsnrCnt; cnt++)
sParts.add(new HashSet<Integer>());
Iterator<Integer> it = parts.iterator();
int cnt = 0;
while (it.hasNext())
sParts.get(cnt++ % lsnrCnt).add(it.next());
for (cnt = 0; cnt < lsnrCnt; cnt++) {
if (!sParts.get(cnt).isEmpty()) {
// Create copy.
GridDhtPartitionDemandMessage initD = new GridDhtPartitionDemandMessage(d, sParts.get(cnt));
initD.topic(rebalanceTopics.get(cnt));
initD.updateSequence(fut.updateSeq);
initD.timeout(cctx.config().getRebalanceTimeout());
synchronized (fut) {
if (!fut.isDone()) {
// The future may already be cancelled at this point, with all failovers completed.
// New requests would not be covered by failovers.
cctx.io().sendOrderedMessage(node,
rebalanceTopics.get(cnt), initD, cctx.ioPolicy(), initD.timeout());
}
}
if (log.isDebugEnabled())
log.debug("Requested rebalancing [from node=" + node.id() + ", listener index=" +
cnt + ", partitions count=" + sParts.get(cnt).size() +
" (" + partitionsList(sParts.get(cnt)) + ")]");
}
}
}
else {
U.log(log, "Starting rebalancing (old api) [cache=" + cctx.name() +
", mode=" + cfg.getRebalanceMode() +
", fromNode=" + node.id() +
", partitionsCount=" + parts.size() +
", topology=" + fut.topologyVersion() +
", updateSeq=" + fut.updateSeq + "]");
d.timeout(cctx.config().getRebalanceTimeout());
d.workerId(0);//old api support.
worker = new DemandWorker(dmIdx.incrementAndGet(), fut);
worker.run(node, d);
}
}
}
/**
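* Produces a compact string of partition ranges, e.g. partitions {@code {0, 1, 2, 5, 7, 8}} are rendered as {@code "0-2, 5, 7-8"}.
*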
* @param c Partitions.
* @return String representation of partitions list.
*/
private String partitionsList(Collection<Integer> c) {
List<Integer> s = new ArrayList<>(c);
Collections.sort(s);
StringBuilder sb = new StringBuilder();
int start = -1;
int prev = -1;
Iterator<Integer> sit = s.iterator();
while (sit.hasNext()) {
int p = sit.next();
if (start == -1) {
start = p;
prev = p;
}
if (prev < p - 1) {
sb.append(start);
if (start != prev)
sb.append("-").append(prev);
sb.append(", ");
start = p;
}
if (!sit.hasNext()) {
sb.append(start);
if (start != p)
sb.append("-").append(p);
}
prev = p;
}
return sb.toString();
}
/**
* @param idx Rebalance topic index.
* @param id Supplier node ID.
* @param supply Supply message.
*/
public void handleSupplyMessage(
int idx,
final UUID id,
final GridDhtPartitionSupplyMessageV2 supply
) {
AffinityTopologyVersion topVer = supply.topologyVersion();
final RebalanceFuture fut = rebalanceFut;
ClusterNode node = cctx.node(id);
if (node == null)
return;
if (!fut.isActual(supply.updateSequence())) // Current future has another update sequence.
return; // Supply message is based on another future.
if (topologyChanged(fut)) // Topology already changed (for the future that supply message based on).
return;
if (log.isDebugEnabled())
log.debug("Received supply message: " + supply);
// Check whether there were class loading errors on unmarshal
if (supply.classError() != null) {
U.warn(log, "Rebalancing from node cancelled [node=" + id +
"]. Class got undeployed during preloading: " + supply.classError());
fut.cancel(id);
return;
}
final GridDhtPartitionTopology top = cctx.dht().topology();
try {
// Preload.
for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
int p = e.getKey();
if (cctx.affinity().localNode(p, topVer)) {
GridDhtLocalPartition part = top.localPartition(p, topVer, true);
assert part != null;
boolean last = supply.last().contains(p);
if (part.state() == MOVING) {
boolean reserved = part.reserve();
assert reserved : "Failed to reserve partition [gridName=" +
cctx.gridName() + ", cacheName=" + cctx.namex() + ", part=" + part + ']';
part.lock();
try {
// Loop through all received entries and try to preload them.
for (GridCacheEntryInfo entry : e.getValue().infos()) {
if (!part.preloadingPermitted(entry.key(), entry.version())) {
if (log.isDebugEnabled())
log.debug("Preloading is not permitted for entry due to " +
"evictions [key=" + entry.key() +
", ver=" + entry.version() + ']');
continue;
}
if (!preloadEntry(node, p, entry, topVer)) {
if (log.isDebugEnabled())
log.debug("Got entries for invalid partition during " +
"preloading (will skip) [p=" + p + ", entry=" + entry + ']');
break;
}
}
// If message was last for this partition,
// then we take ownership.
if (last) {
top.own(part);
fut.partitionDone(id, p);
if (log.isDebugEnabled())
log.debug("Finished rebalancing partition: " + part);
}
}
finally {
part.unlock();
part.release();
}
}
else {
if (last)
fut.partitionDone(id, p);
if (log.isDebugEnabled())
log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
}
}
else {
fut.partitionDone(id, p);
if (log.isDebugEnabled())
log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
}
}
// Only request partitions based on latest topology version.
for (Integer miss : supply.missed()) {
if (cctx.affinity().localNode(miss, topVer))
fut.partitionMissed(id, miss);
}
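// Partitions missed on this supplier are also marked as done so the future can complete; they will be reassigned later (see checkIsDone()).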
for (Integer miss : supply.missed())
fut.partitionDone(id, miss);
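// Reply with a demand message on the same topic so the supplier continues with the next batch.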
GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage(
supply.updateSequence(), supply.topologyVersion(), cctx.cacheId());
d.timeout(cctx.config().getRebalanceTimeout());
d.topic(rebalanceTopics.get(idx));
if (!topologyChanged(fut) && !fut.isDone()) {
// Send demand message.
cctx.io().sendOrderedMessage(node, rebalanceTopics.get(idx),
d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
}
}
catch (IgniteCheckedException e) {
if (log.isDebugEnabled())
log.debug("Node left during rebalancing [node=" + node.id() +
", msg=" + e.getMessage() + ']');
}
catch (IgniteSpiException e) {
if (log.isDebugEnabled())
log.debug("Failed to send message to node (current node is stopping?) [node=" + node.id() +
", msg=" + e.getMessage() + ']');
}
}
/**
* @param pick Node picked for preloading.
* @param p Partition.
* @param entry Preloaded entry.
* @param topVer Topology version.
* @return {@code False} if partition has become invalid during preloading.
* @throws IgniteInterruptedCheckedException If interrupted.
*/
private boolean preloadEntry(
ClusterNode pick,
int p,
GridCacheEntryInfo entry,
AffinityTopologyVersion topVer
) throws IgniteCheckedException {
try {
GridCacheEntryEx cached = null;
try {
cached = cctx.dht().entryEx(entry.key());
if (log.isDebugEnabled())
log.debug("Rebalancing key [key=" + entry.key() + ", part=" + p + ", node=" + pick.id() + ']');
if (cctx.dht().isIgfsDataCache() &&
cctx.dht().igfsDataSpaceUsed() > cctx.dht().igfsDataSpaceMax()) {
LT.error(log, null, "Failed to rebalance IGFS data cache (IGFS space size exceeded maximum " +
"value, will ignore rebalance entries)");
if (cached.markObsoleteIfEmpty(null))
cached.context().cache().removeEntry(cached);
return true;
}
if (preloadPred == null || preloadPred.apply(entry)) {
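// initialValue() stores the value only if the entry does not already have one; otherwise the rebalanced value is ignored (see the else branch below).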
if (cached.initialValue(
entry.value(),
entry.version(),
entry.ttl(),
entry.expireTime(),
true,
topVer,
cctx.isDrEnabled() ? DR_PRELOAD : DR_NONE,
false
)) {
cctx.evicts().touch(cached, topVer); // Start tracking.
if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED) && !cached.isInternal())
cctx.events().addEvent(cached.partition(), cached.key(), cctx.localNodeId(),
(IgniteUuid)null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, entry.value(), true, null,
false, null, null, null, true);
}
else {
if (cctx.isSwapOrOffheapEnabled())
cctx.evicts().touch(cached, topVer); // Start tracking.
if (log.isDebugEnabled())
log.debug("Rebalancing entry is already in cache (will ignore) [key=" + cached.key() +
", part=" + p + ']');
}
}
else if (log.isDebugEnabled())
log.debug("Rebalance predicate evaluated to false for entry (will ignore): " + entry);
}
catch (GridCacheEntryRemovedException ignored) {
if (log.isDebugEnabled())
log.debug("Entry has been concurrently removed while rebalancing (will ignore) [key=" +
cached.key() + ", part=" + p + ']');
}
catch (GridDhtInvalidPartitionException ignored) {
if (log.isDebugEnabled())
log.debug("Partition became invalid during rebalancing (will ignore): " + p);
return false;
}
}
catch (IgniteInterruptedCheckedException e) {
throw e;
}
catch (IgniteCheckedException e) {
throw new IgniteCheckedException("Failed to cache rebalanced entry (will stop rebalancing) [local=" +
cctx.nodeId() + ", node=" + pick.id() + ", key=" + entry.key() + ", part=" + p + ']', e);
}
return true;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(GridDhtPartitionDemander.class, this);
}
/**
*
*/
public static class RebalanceFuture extends GridFutureAdapter<Boolean> {
/** */
private static final long serialVersionUID = 1L;
/** Should EVT_CACHE_REBALANCE_STARTED event be sent or not. */
private final AtomicBoolean startedEvtSent;
/** Should EVT_CACHE_REBALANCE_STOPPED event be sent or not. */
private final AtomicBoolean stoppedEvtSent;
/** */
private final GridCacheContext<?, ?> cctx;
/** */
private final IgniteLogger log;
/** Remaining partitions per supplier node. T2: rebalancing start time, remaining partition IDs. */
private final Map<UUID, T2<Long, Collection<Integer>>> remaining = new HashMap<>();
/** Missed. */
private final Map<UUID, Collection<Integer>> missed = new HashMap<>();
/** Exchange future. */
@GridToStringExclude
private final GridDhtPartitionsExchangeFuture exchFut;
/** Topology version. */
private final AffinityTopologyVersion topVer;
/** Unique (per demander) sequence id. */
private final long updateSeq;
/**
* @param assigns Assigns.
* @param cctx Context.
* @param log Logger.
* @param startedEvtSent Start event sent flag.
* @param stoppedEvtSent Stop event sent flag.
* @param updateSeq Update sequence.
*/
RebalanceFuture(GridDhtPreloaderAssignments assigns,
GridCacheContext<?, ?> cctx,
IgniteLogger log,
AtomicBoolean startedEvtSent,
AtomicBoolean stoppedEvtSent,
long updateSeq) {
assert assigns != null;
this.exchFut = assigns.exchangeFuture();
this.topVer = assigns.topologyVersion();
this.cctx = cctx;
this.log = log;
this.startedEvtSent = startedEvtSent;
this.stoppedEvtSent = stoppedEvtSent;
this.updateSeq = updateSeq;
}
/**
* Dummy future. Will be completed by the real one.
*/
public RebalanceFuture() {
this.exchFut = null;
this.topVer = null;
this.cctx = null;
this.log = null;
this.startedEvtSent = null;
this.stoppedEvtSent = null;
this.updateSeq = -1;
}
/**
* @return Topology version.
*/
public AffinityTopologyVersion topologyVersion() {
return topVer;
}
/**
* @param updateSeq Update sequence.
* @return {@code True} if this future was created for the specified update sequence, {@code false} otherwise.
*/
private boolean isActual(long updateSeq) {
return this.updateSeq == updateSeq;
}
/**
* @return {@code True} if this is the initial dummy future (created at demander creation).
*/
private boolean isInitial() {
return topVer == null;
}
/**
* @param nodeId Node id.
* @param parts Parts.
*/
private void appendPartitions(UUID nodeId, Collection<Integer> parts) {
synchronized (this) {
assert parts != null : "Partitions are null [cache=" + cctx.name() + ", fromNode=" + nodeId + "]";
remaining.put(nodeId, new T2<>(U.currentTimeMillis(), parts));
}
}
/**
* Cancels this future.
*
* @return {@code True}.
*/
@Override public boolean cancel() {
synchronized (this) {
if (isDone())
return true;
U.log(log, "Cancelled rebalancing from all nodes [topology=" + topologyVersion() + ']');
if (!cctx.kernalContext().isStopping()) {
for (UUID nodeId : remaining.keySet())
cleanupRemoteContexts(nodeId);
}
remaining.clear();
checkIsDone(true /* cancelled */);
}
return true;
}
/**
* @param nodeId Node id.
*/
private void cancel(UUID nodeId) {
synchronized (this) {
if (isDone())
return;
U.log(log, ("Cancelled rebalancing [cache=" + cctx.name() +
", fromNode=" + nodeId + ", topology=" + topologyVersion() +
", time=" + (U.currentTimeMillis() - remaining.get(nodeId).get1()) + " ms]"));
cleanupRemoteContexts(nodeId);
remaining.remove(nodeId);
onDone(false); // Finishing rebalance future as non completed.
checkIsDone(); // But will finish syncFuture only when other nodes are preloaded or rebalancing cancelled.
}
}
/**
* @param nodeId Node id.
* @param p Partition number.
*/
private void partitionMissed(UUID nodeId, int p) {
synchronized (this) {
if (isDone())
return;
if (missed.get(nodeId) == null)
missed.put(nodeId, new HashSet<Integer>());
missed.get(nodeId).add(p);
}
}
/**
* @param nodeId Node id.
*/
private void cleanupRemoteContexts(UUID nodeId) {
ClusterNode node = cctx.discovery().node(nodeId);
if (node == null)
return;
//Check remote node rebalancing API version.
if (node.version().compareTo(GridDhtPreloader.REBALANCING_VER_2_SINCE) >= 0) {
GridDhtPartitionDemandMessage d = new GridDhtPartitionDemandMessage(
-1/* remove supply context signal */, this.topologyVersion(), cctx.cacheId());
d.timeout(cctx.config().getRebalanceTimeout());
try {
for (int idx = 0; idx < cctx.gridConfig().getRebalanceThreadPoolSize(); idx++) {
d.topic(GridCachePartitionExchangeManager.rebalanceTopic(idx));
cctx.io().sendOrderedMessage(node, GridCachePartitionExchangeManager.rebalanceTopic(idx),
d, cctx.ioPolicy(), cctx.config().getRebalanceTimeout());
}
}
catch (IgniteCheckedException e) {
if (log.isDebugEnabled())
log.debug("Failed to send failover context cleanup request to node");
}
}
}
/**
* @param nodeId Node id.
* @param p Partition number.
*/
private void partitionDone(UUID nodeId, int p) {
synchronized (this) {
if (isDone())
return;
if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
exchFut.discoveryEvent());
T2<Long, Collection<Integer>> t = remaining.get(nodeId);
assert t != null : "Remaining not found [cache=" + cctx.name() + ", fromNode=" + nodeId +
", part=" + p + "]";
Collection<Integer> parts = t.get2();
boolean rmvd = parts.remove(p);
assert rmvd : "Partition already done [cache=" + cctx.name() + ", fromNode=" + nodeId +
", part=" + p + ", left=" + parts + "]";
if (parts.isEmpty()) {
U.log(log, "Completed " + ((remaining.size() == 1 ? "(final) " : "") +
"rebalancing [fromNode=" + nodeId + ", topology=" + topologyVersion() +
", time=" + (U.currentTimeMillis() - t.get1()) + " ms]"));
remaining.remove(nodeId);
}
checkIsDone();
}
}
/**
* @param part Partition.
* @param type Type.
* @param discoEvt Discovery event.
*/
private void preloadEvent(int part, int type, DiscoveryEvent discoEvt) {
assert discoEvt != null;
cctx.events().addPreloadEvent(part, type, discoEvt.eventNode(), discoEvt.type(), discoEvt.timestamp());
}
/**
* @param type Type.
* @param discoEvt Discovery event.
*/
private void preloadEvent(int type, DiscoveryEvent discoEvt) {
preloadEvent(-1, type, discoEvt);
}
/**
*
*/
private void checkIsDone() {
checkIsDone(false);
}
/**
* @param cancelled Is cancelled.
*/
private void checkIsDone(boolean cancelled) {
if (remaining.isEmpty()) {
sendRebalanceFinishedEvent();
if (log.isDebugEnabled())
log.debug("Completed rebalance future: " + this);
cctx.shared().exchange().scheduleResendPartitions();
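// Collect partitions reported as missed by any supplier; if present, complete with 'false' and force a dummy exchange to reassign them.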
Collection<Integer> m = new HashSet<>();
for (Map.Entry<UUID, Collection<Integer>> e : missed.entrySet()) {
if (e.getValue() != null && !e.getValue().isEmpty())
m.addAll(e.getValue());
}
if (!m.isEmpty()) {
U.log(log, ("Reassigning partitions that were missed: " + m));
onDone(false); //Finished but has missed partitions, will force dummy exchange
cctx.shared().exchange().forceDummyExchange(true, exchFut);
return;
}
if (!cancelled && !cctx.preloader().syncFuture().isDone())
((GridFutureAdapter)cctx.preloader().syncFuture()).onDone();
onDone(!cancelled);
}
}
/**
*
*/
private void sendRebalanceStartedEvent() {
if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STARTED) &&
(!cctx.isReplicated() || !startedEvtSent.get())) {
preloadEvent(EVT_CACHE_REBALANCE_STARTED, exchFut.discoveryEvent());
startedEvtSent.set(true);
}
}
/**
*
*/
private void sendRebalanceFinishedEvent() {
if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_STOPPED) &&
(!cctx.isReplicated() || !stoppedEvtSent.get())) {
preloadEvent(EVT_CACHE_REBALANCE_STOPPED, exchFut.discoveryEvent());
stoppedEvtSent.set(true);
}
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(RebalanceFuture.class, this);
}
}
/**
* Supply message wrapper.
*/
@Deprecated//Backward compatibility. To be removed in future.
private static class SupplyMessage {
/** Sender ID. */
private UUID sndId;
/** Supply message. */
private GridDhtPartitionSupplyMessage supply;
/**
* Dummy constructor.
*/
private SupplyMessage() {
// No-op.
}
/**
* @param sndId Sender ID.
* @param supply Supply message.
*/
SupplyMessage(UUID sndId, GridDhtPartitionSupplyMessage supply) {
this.sndId = sndId;
this.supply = supply;
}
/**
* @return Sender ID.
*/
UUID senderId() {
return sndId;
}
/**
* @return Message.
*/
GridDhtPartitionSupplyMessage supply() {
return supply;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(SupplyMessage.class, this);
}
}
/**
*
*/
@Deprecated//Backward compatibility. To be removed in future.
private class DemandWorker {
/** Worker ID. */
private int id;
/** Partition-to-node assignments. */
private final LinkedBlockingDeque<GridDhtPreloaderAssignments> assignQ = new LinkedBlockingDeque<>();
/** Message queue. */
private final LinkedBlockingDeque<SupplyMessage> msgQ =
new LinkedBlockingDeque<>();
/** Counter. */
private long cntr;
/** Hide worker logger and use cache logger instead. */
private IgniteLogger log = GridDhtPartitionDemander.this.log;
/** */
private volatile RebalanceFuture fut;
/**
* @param id Worker ID.
* @param fut Rebalance future.
*/
private DemandWorker(int id, RebalanceFuture fut) {
assert id >= 0;
this.id = id;
this.fut = fut;
}
/**
* @param msg Message.
*/
private void addMessage(SupplyMessage msg) {
msgQ.offer(msg);
}
/**
* @param deque Deque to poll from.
* @param time Time to wait.
* @return Polled item.
* @throws InterruptedException If interrupted.
*/
@Nullable private <T> T poll(BlockingQueue<T> deque, long time) throws InterruptedException {
return deque.poll(time, MILLISECONDS);
}
/**
* @param idx Unique index for this topic.
* @return Topic for partition.
*/
public Object topic(long idx) {
return TOPIC_CACHE.topic(cctx.namexx(), cctx.nodeId(), id, idx);
}
/** */
public void cancel() {
msgQ.clear();
msgQ.offer(new SupplyMessage(null, null));
}
/**
* @param node Node to demand from.
* @param topVer Topology version.
* @param d Demand message.
* @param exchFut Exchange future.
* @throws InterruptedException If interrupted.
* @throws ClusterTopologyCheckedException If node left.
* @throws IgniteCheckedException If failed to send message.
*/
private void demandFromNode(
ClusterNode node,
final AffinityTopologyVersion topVer,
GridDhtPartitionDemandMessage d,
GridDhtPartitionsExchangeFuture exchFut
) throws InterruptedException, IgniteCheckedException {
GridDhtPartitionTopology top = cctx.dht().topology();
cntr++;
d.topic(topic(cntr));
d.workerId(id);
if (fut.isDone() || topologyChanged(fut))
return;
cctx.io().addOrderedHandler(d.topic(), new CI2<UUID, GridDhtPartitionSupplyMessage>() {
@Override public void apply(UUID nodeId, GridDhtPartitionSupplyMessage msg) {
addMessage(new SupplyMessage(nodeId, msg));
}
});
try {
boolean retry;
// DoWhile.
// =======
do {
retry = false;
// Create copy.
d = new GridDhtPartitionDemandMessage(d, fut.remaining.get(node.id()).get2());
long timeout = cctx.config().getRebalanceTimeout();
d.timeout(timeout);
if (log.isDebugEnabled())
log.debug("Sending demand message [node=" + node.id() + ", demand=" + d + ']');
// Send demand message.
cctx.io().send(node, d, cctx.ioPolicy());
// While.
// =====
while (!fut.isDone() && !topologyChanged(fut)) {
SupplyMessage s = poll(msgQ, timeout);
// If timed out.
if (s == null) {
if (msgQ.isEmpty()) { // Safety check.
U.warn(log, "Timed out waiting for partitions to load, will retry in " + timeout +
" ms (you may need to increase 'networkTimeout' or 'rebalanceBatchSize'" +
" configuration properties).");
// Ordered listener was removed if timeout expired.
cctx.io().removeOrderedHandler(d.topic());
// Must create copy to be able to work with IO manager thread local caches.
d = new GridDhtPartitionDemandMessage(d, fut.remaining.get(node.id()).get2());
// Create new topic.
d.topic(topic(++cntr));
// Create new ordered listener.
cctx.io().addOrderedHandler(d.topic(),
new CI2<UUID, GridDhtPartitionSupplyMessage>() {
@Override public void apply(UUID nodeId,
GridDhtPartitionSupplyMessage msg) {
addMessage(new SupplyMessage(nodeId, msg));
}
});
// Resend message with larger timeout.
retry = true;
break; // While.
}
else
continue; // While.
}
if (s.senderId() == null)
return; // Stopping now.
// Check that message was received from expected node.
if (!s.senderId().equals(node.id())) {
U.warn(log, "Received supply message from unexpected node [expectedId=" + node.id() +
", rcvdId=" + s.senderId() + ", msg=" + s + ']');
continue; // While.
}
if (log.isDebugEnabled())
log.debug("Received supply message: " + s);
GridDhtPartitionSupplyMessage supply = s.supply();
// Check whether there were class loading errors on unmarshal
if (supply.classError() != null) {
if (log.isDebugEnabled())
log.debug("Class got undeployed during preloading: " + supply.classError());
retry = true;
// Quit preloading.
break;
}
// Preload.
for (Map.Entry<Integer, CacheEntryInfoCollection> e : supply.infos().entrySet()) {
int p = e.getKey();
if (cctx.affinity().localNode(p, topVer)) {
GridDhtLocalPartition part = top.localPartition(p, topVer, true);
assert part != null;
if (part.state() == MOVING) {
boolean reserved = part.reserve();
assert reserved : "Failed to reserve partition [gridName=" +
cctx.gridName() + ", cacheName=" + cctx.namex() + ", part=" + part + ']';
part.lock();
try {
Collection<Integer> invalidParts = new GridLeanSet<>();
// Loop through all received entries and try to preload them.
for (GridCacheEntryInfo entry : e.getValue().infos()) {
if (!invalidParts.contains(p)) {
if (!part.preloadingPermitted(entry.key(), entry.version())) {
if (log.isDebugEnabled())
log.debug("Preloading is not permitted for entry due to " +
"evictions [key=" + entry.key() +
", ver=" + entry.version() + ']');
continue;
}
if (!preloadEntry(node, p, entry, topVer)) {
invalidParts.add(p);
if (log.isDebugEnabled())
log.debug("Got entries for invalid partition during " +
"preloading (will skip) [p=" + p + ", entry=" + entry + ']');
}
}
}
boolean last = supply.last().contains(p);
// If message was last for this partition,
// then we take ownership.
if (last) {
fut.partitionDone(node.id(), p);
top.own(part);
if (log.isDebugEnabled())
log.debug("Finished rebalancing partition: " + part);
if (cctx.events().isRecordable(EVT_CACHE_REBALANCE_PART_LOADED))
preloadEvent(p, EVT_CACHE_REBALANCE_PART_LOADED,
exchFut.discoveryEvent());
}
}
finally {
part.unlock();
part.release();
}
}
else {
fut.partitionDone(node.id(), p);
if (log.isDebugEnabled())
log.debug("Skipping rebalancing partition (state is not MOVING): " + part);
}
}
else {
fut.partitionDone(node.id(), p);
if (log.isDebugEnabled())
log.debug("Skipping rebalancing partition (it does not belong on current node): " + p);
}
}
// Only request partitions based on latest topology version.
for (Integer miss : s.supply().missed()) {
if (cctx.affinity().localNode(miss, topVer))
fut.partitionMissed(node.id(), miss);
}
for (Integer miss : s.supply().missed())
fut.partitionDone(node.id(), miss);
if (fut.remaining.get(node.id()) == null)
break; // While.
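// An ack() flag on the supply message makes the demander resend the demand message (via the retry loop) to continue receiving data.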
if (s.supply().ack()) {
retry = true;
break;
}
}
}
while (retry && !fut.isDone() && !topologyChanged(fut));
}
finally {
cctx.io().removeOrderedHandler(d.topic());
}
}
/**
* @param node Node.
* @param d Demand message.
* @throws IgniteCheckedException If failed.
*/
public void run(ClusterNode node, GridDhtPartitionDemandMessage d) throws IgniteCheckedException {
demandLock.readLock().lock();
try {
GridDhtPartitionsExchangeFuture exchFut = fut.exchFut;
AffinityTopologyVersion topVer = fut.topVer;
try {
demandFromNode(node, topVer, d, exchFut);
}
catch (InterruptedException e) {
throw new IgniteCheckedException(e);
}
}
finally {
demandLock.readLock().unlock();
}
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(DemandWorker.class, this, "assignQ", assignQ, "msgQ", msgQ, "super", super.toString());
}
}
}