/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteSystemProperties;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.failure.FailureContext;
import org.apache.ignite.failure.FailureType;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheMvccEntryInfo;
import org.apache.ignite.internal.processors.cache.IgniteRebalanceIterator;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.processors.cache.mvcc.MvccUpdateVersionAware;
import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils;
import org.apache.ignite.internal.processors.cache.mvcc.MvccVersionAware;
import org.apache.ignite.internal.processors.cache.mvcc.txlog.TxState;
import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T3;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.IgniteSpiException;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_MISSED;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_SUPPLIED;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
/**
* Class for supplying partitions to demanding nodes.
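*
* The supplier answers {@link GridDhtPartitionDemandMessage} requests with batched
* {@link GridDhtPartitionSupplyMessage} replies and keeps per-demander iteration state in a {@link SupplyContext},
* so that large transfers can be resumed across consecutive demand messages.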
*/
public class GridDhtPartitionSupplier {
/** */
private final CacheGroupContext grp;
/** */
private final IgniteLogger log;
/** */
private GridDhtPartitionTopology top;
/** Supply context map. T3: nodeId, topicId, topVer. All access is synchronized on the map itself. */
private final Map<T3<UUID, Integer, AffinityTopologyVersion>, SupplyContext> scMap = new HashMap<>();
/**
* Override for rebalance throttle.
* @deprecated Use {@link IgniteConfiguration#getRebalanceThrottle()} instead.
*/
@Deprecated
private long rebalanceThrottleOverride =
IgniteSystemProperties.getLong(IgniteSystemProperties.IGNITE_REBALANCE_THROTTLE_OVERRIDE, 0);
/**
* @param grp Cache group.
*/
GridDhtPartitionSupplier(CacheGroupContext grp) {
assert grp != null;
this.grp = grp;
log = grp.shared().logger(getClass());
top = grp.topology();
if (rebalanceThrottleOverride > 0)
LT.info(log, "Using rebalance throttle override: " + rebalanceThrottleOverride);
}
/**
* Clears all supply contexts when the node is stopping.
*/
void stop() {
synchronized (scMap) {
Iterator<T3<UUID, Integer, AffinityTopologyVersion>> it = scMap.keySet().iterator();
while (it.hasNext()) {
T3<UUID, Integer, AffinityTopologyVersion> t = it.next();
clearContext(scMap.get(t), log);
it.remove();
}
}
}
/**
* Clears supply context.
*
* @param sc Supply context.
* @param log Logger.
*/
private static void clearContext(SupplyContext sc, IgniteLogger log) {
if (sc != null) {
final IgniteRebalanceIterator it = sc.iterator;
if (it != null && !it.isClosed()) {
try {
it.close();
}
catch (IgniteCheckedException e) {
U.error(log, "Iterator close failed.", e);
}
}
}
}
/**
* Handles topology change: removes supply contexts whose demander nodes are no longer alive.
*/
void onTopologyChanged() {
synchronized (scMap) {
Iterator<T3<UUID, Integer, AffinityTopologyVersion>> it = scMap.keySet().iterator();
Collection<UUID> aliveNodes = grp.shared().discovery().aliveServerNodes().stream()
.map(ClusterNode::id)
.collect(Collectors.toList());
while (it.hasNext()) {
T3<UUID, Integer, AffinityTopologyVersion> t = it.next();
if (!aliveNodes.contains(t.get1())) { // Clear all obsolete contexts.
clearContext(scMap.get(t), log);
it.remove();
if (log.isDebugEnabled())
log.debug("Supply context removed [grp=" + grp.cacheOrGroupName() + ", demander=" + t.get1() + "]");
}
}
}
}
/**
* Checks whether this cache group currently has any active supply context.
*
* @return {@code True} if this node is supplying the cache group to at least one demander node, {@code false} otherwise.
*/
public boolean isSupply() {
return !F.isEmpty(scMap);
}
/**
* For each demand message this method looks up (or creates) a supply context and iterates over entries
* of the requested partitions. Each entry produced by the iterator is added to the prepared supply message.
*
* If the supply message size in bytes exceeds {@link IgniteConfiguration#getRebalanceBatchSize()},
* the method sends the message to the demander node and saves the partial iteration state to the supply context,
* restoring that context when a new demand message with the same context id arrives.
*
* @param topicId Id of the topic used for the supply-demand communication.
* @param nodeId Id of the node which sent the demand message.
* @param demandMsg Demand message.
*/
public void handleDemandMessage(int topicId, UUID nodeId, GridDhtPartitionDemandMessage demandMsg) {
assert demandMsg != null;
assert nodeId != null;
T3<UUID, Integer, AffinityTopologyVersion> contextId = new T3<>(nodeId, topicId, demandMsg.topologyVersion());
if (demandMsg.rebalanceId() < 0) { // Demand node requested context cleanup.
synchronized (scMap) {
SupplyContext sctx = scMap.get(contextId);
if (sctx != null && sctx.rebalanceId == -demandMsg.rebalanceId()) {
clearContext(scMap.remove(contextId), log);
if (log.isDebugEnabled())
log.debug("Supply context cleaned [" + supplyRoutineInfo(topicId, nodeId, demandMsg)
+ ", supplyContext=" + sctx + "]");
}
else {
if (log.isDebugEnabled())
log.debug("Stale supply context cleanup message [" + supplyRoutineInfo(topicId, nodeId, demandMsg)
+ ", supplyContext=" + sctx + "]");
}
return;
}
}
ClusterNode demanderNode = grp.shared().discovery().node(nodeId);
if (demanderNode == null) {
if (log.isDebugEnabled())
log.debug("Demand message rejected (demander left cluster) ["
+ supplyRoutineInfo(topicId, nodeId, demandMsg) + "]");
return;
}
IgniteRebalanceIterator iter = null;
SupplyContext sctx = null;
Set<Integer> remainingParts = null;
GridDhtPartitionSupplyMessage supplyMsg = new GridDhtPartitionSupplyMessage(
demandMsg.rebalanceId(),
grp.groupId(),
demandMsg.topologyVersion(),
grp.deploymentEnabled()
);
try {
synchronized (scMap) {
sctx = scMap.remove(contextId);
if (sctx != null && demandMsg.rebalanceId() < sctx.rebalanceId) {
// Stale message: put the context back and ignore the request.
scMap.put(contextId, sctx);
if (log.isDebugEnabled())
log.debug("Stale demand message [" + supplyRoutineInfo(topicId, nodeId, demandMsg) +
", actualContext=" + sctx + "]");
return;
}
}
// Without an existing supply context, the demand request must specify a non-empty partition set.
if (sctx == null && (demandMsg.partitions() == null || demandMsg.partitions().isEmpty())) {
if (log.isDebugEnabled())
log.debug("Empty demand message (no context and partitions) ["
+ supplyRoutineInfo(topicId, nodeId, demandMsg) + "]");
return;
}
if (log.isDebugEnabled())
log.debug("Demand message accepted ["
+ supplyRoutineInfo(topicId, nodeId, demandMsg) + "]");
assert !(sctx != null && !demandMsg.partitions().isEmpty());
long maxBatchesCnt = /* Each rebalance thread gets its own quota of prefetched batches. */
grp.preloader().batchesPrefetchCount() * grp.shared().gridConfig().getRebalanceThreadPoolSize();
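// Flow control: for a fresh context the supplier may push up to maxBatchesCnt batches back-to-back;
// when resuming a previously saved context only a single batch is sent per demand message.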
if (sctx == null) {
if (log.isDebugEnabled())
log.debug("Starting supplying rebalancing [" + supplyRoutineInfo(topicId, nodeId, demandMsg) +
", fullPartitions=" + S.compact(demandMsg.partitions().fullSet()) +
", histPartitions=" + S.compact(demandMsg.partitions().historicalSet()) + "]");
}
else
maxBatchesCnt = 1;
if (sctx == null || sctx.iterator == null) {
remainingParts = new HashSet<>(demandMsg.partitions().fullSet());
CachePartitionPartialCountersMap histMap = demandMsg.partitions().historicalMap();
for (int i = 0; i < histMap.size(); i++) {
int p = histMap.partitionAt(i);
remainingParts.add(p);
}
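// Create a rebalance iterator over both the requested full partitions and the historical (WAL delta) range.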
iter = grp.offheap().rebalanceIterator(demandMsg.partitions(), demandMsg.topologyVersion());
for (Integer part : demandMsg.partitions().fullSet()) {
if (iter.isPartitionMissing(part))
continue;
GridDhtLocalPartition loc = top.localPartition(part, demandMsg.topologyVersion(), false);
assert loc != null && loc.state() == GridDhtPartitionState.OWNING
: "Partition should be in OWNING state: " + loc;
supplyMsg.addEstimatedKeysCount(loc.dataStore().fullSize());
}
for (int i = 0; i < histMap.size(); i++) {
int p = histMap.partitionAt(i);
if (iter.isPartitionMissing(p))
continue;
supplyMsg.addEstimatedKeysCount(histMap.updateCounterAt(i) - histMap.initialUpdateCounterAt(i));
}
}
else {
iter = sctx.iterator;
remainingParts = sctx.remainingParts;
}
final int msgMaxSize = grp.preloader().batchSize();
long batchesCnt = 0;
CacheDataRow prevRow = null;
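// Main supply loop: entries are appended to the current supply message; once the message reaches
// the configured batch size it is flushed to the demander and either a new message is started or,
// after maxBatchesCnt batches, the iteration state is saved and the routine yields until the next demand.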
while (iter.hasNext()) {
CacheDataRow row = iter.peek();
// Prevent an mvcc entry's version history from being split across separate batches.
boolean canFlushHistory = !grp.mvccEnabled() ||
prevRow != null && ((grp.sharedGroup() && row.cacheId() != prevRow.cacheId()) ||
!row.key().equals(prevRow.key()));
if (canFlushHistory && supplyMsg.messageSize() >= msgMaxSize) {
if (++batchesCnt >= maxBatchesCnt) {
saveSupplyContext(contextId,
iter,
remainingParts,
demandMsg.rebalanceId()
);
reply(topicId, demanderNode, demandMsg, supplyMsg, contextId);
return;
}
else {
if (!reply(topicId, demanderNode, demandMsg, supplyMsg, contextId))
return;
supplyMsg = new GridDhtPartitionSupplyMessage(demandMsg.rebalanceId(),
grp.groupId(),
demandMsg.topologyVersion(),
grp.deploymentEnabled());
}
}
row = iter.next();
prevRow = row;
int part = row.partition();
GridDhtLocalPartition loc = top.localPartition(part, demandMsg.topologyVersion(), false);
assert (loc != null && loc.state() == OWNING && loc.reservations() > 0) || iter.isPartitionMissing(part)
: "Partition should be in OWNING state and has at least 1 reservation " + loc;
if (iter.isPartitionMissing(part) && remainingParts.contains(part)) {
supplyMsg.missed(part);
remainingParts.remove(part);
if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_MISSED))
grp.addRebalanceMissEvent(part);
if (log.isDebugEnabled())
log.debug("Requested partition is marked as missing ["
+ supplyRoutineInfo(topicId, nodeId, demandMsg) + ", p=" + part + "]");
continue;
}
if (!remainingParts.contains(part))
continue;
GridCacheEntryInfo info = extractEntryInfo(row);
if (info == null)
continue;
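// Append the entry to the batch, flagging whether its partition is being rebalanced historically.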
supplyMsg.addEntry0(part, iter.historical(part), info, grp.shared(), grp.cacheObjectContext());
if (iter.isPartitionDone(part)) {
supplyMsg.last(part, loc.updateCounter());
remainingParts.remove(part);
if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_SUPPLIED))
grp.addRebalanceSupplyEvent(part);
}
}
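// Finalize partitions that produced no further entries: completed ones are marked last with their
// update counter, missing ones are reported back to the demander.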
Iterator<Integer> remainingIter = remainingParts.iterator();
while (remainingIter.hasNext()) {
int p = remainingIter.next();
if (iter.isPartitionDone(p)) {
GridDhtLocalPartition loc = top.localPartition(p, demandMsg.topologyVersion(), false);
assert loc != null
: "Supply partition is gone: grp=" + grp.cacheOrGroupName() + ", p=" + p;
supplyMsg.last(p, loc.updateCounter());
remainingIter.remove();
if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_SUPPLIED))
grp.addRebalanceSupplyEvent(p);
}
else if (iter.isPartitionMissing(p)) {
supplyMsg.missed(p);
remainingIter.remove();
if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_MISSED))
grp.addRebalanceMissEvent(p);
}
}
assert remainingParts.isEmpty()
: "Partitions after rebalance should be either done or missing: " + remainingParts;
if (sctx != null)
clearContext(sctx, log);
else
iter.close();
reply(topicId, demanderNode, demandMsg, supplyMsg, contextId);
if (log.isInfoEnabled())
log.info("Finished supplying rebalancing [" + supplyRoutineInfo(topicId, nodeId, demandMsg) + "]");
}
catch (Throwable t) {
if (iter != null && !iter.isClosed()) {
try {
iter.close();
}
catch (IgniteCheckedException e) {
t.addSuppressed(e);
}
}
if (grp.shared().kernalContext().isStopping())
return;
// Sending supply messages that carry an error requires the newer protocol (GridDhtPartitionSupplyMessageV2).
boolean sendErrMsg = demanderNode.version().compareTo(GridDhtPartitionSupplyMessageV2.AVAILABLE_SINCE) >= 0;
if (t instanceof IgniteSpiException) {
if (log.isDebugEnabled())
log.debug("Failed to send message to node (current node is stopping?) ["
+ supplyRoutineInfo(topicId, nodeId, demandMsg) + ", msg=" + t.getMessage() + ']');
sendErrMsg = false;
}
else
U.error(log, "Failed to continue supplying ["
+ supplyRoutineInfo(topicId, nodeId, demandMsg) + ']', t);
try {
if (sctx != null)
clearContext(sctx, log);
}
catch (Throwable t1) {
U.error(log, "Failed to cleanup supplying context ["
+ supplyRoutineInfo(topicId, nodeId, demandMsg) + ']', t1);
}
if (!sendErrMsg)
return;
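// A failure of the historical (WAL) iterator is not fatal: fall back to full rebalance by marking
// the remaining partitions as missed instead of invoking the failure handler.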
boolean fallbackToFullRebalance = X.hasCause(t, IgniteHistoricalIteratorException.class);
try {
GridDhtPartitionSupplyMessage errMsg;
if (fallbackToFullRebalance) {
// Mark the last checkpoint as not applicable for WAL rebalance.
grp.shared().database().lastCheckpointInapplicableForWalRebalance(grp.groupId());
// Mark all remaining partitions as missed to trigger full rebalance.
if (iter == null && F.isEmpty(remainingParts)) {
remainingParts = new HashSet<>(demandMsg.partitions().fullSet());
remainingParts.addAll(demandMsg.partitions().historicalSet());
}
for (int p : Optional.ofNullable(remainingParts).orElseGet(Collections::emptySet))
supplyMsg.missed(p);
errMsg = supplyMsg;
}
else {
errMsg = new GridDhtPartitionSupplyMessageV2(
demandMsg.rebalanceId(),
grp.groupId(),
demandMsg.topologyVersion(),
grp.deploymentEnabled(),
t
);
}
reply(topicId, demanderNode, demandMsg, errMsg, contextId);
}
catch (Throwable t1) {
U.error(log, "Failed to send supply error message ["
+ supplyRoutineInfo(topicId, nodeId, demandMsg) + ']', t1);
}
// If fallback to full rebalance is possible then let's try to switch to it
// instead of triggering failure handler.
if (!fallbackToFullRebalance) {
grp.shared().kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR,
new IgniteCheckedException("Failed to continue supplying ["
+ supplyRoutineInfo(topicId, nodeId, demandMsg) + ']', t)
));
}
}
}
/**
* Extracts entry info from row.
*
* @param row Cache data row.
* @return Entry info.
*/
private GridCacheEntryInfo extractEntryInfo(CacheDataRow row) {
GridCacheEntryInfo info = grp.mvccEnabled() ?
new GridCacheMvccEntryInfo() : new GridCacheEntryInfo();
info.key(row.key());
info.cacheId(row.cacheId());
if (grp.mvccEnabled()) {
assert row.mvccCoordinatorVersion() != MvccUtils.MVCC_CRD_COUNTER_NA;
// Rows from rebalance iterator have actual states already.
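// Entries that are not committed are skipped: the caller ignores null results.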
if (row.mvccTxState() != TxState.COMMITTED)
return null;
((MvccVersionAware)info).mvccVersion(row);
((GridCacheMvccEntryInfo)info).mvccTxState(TxState.COMMITTED);
if (row.newMvccCoordinatorVersion() != MvccUtils.MVCC_CRD_COUNTER_NA &&
row.newMvccTxState() == TxState.COMMITTED) {
((MvccUpdateVersionAware)info).newMvccVersion(row);
((GridCacheMvccEntryInfo)info).newMvccTxState(TxState.COMMITTED);
}
}
info.value(row.value());
info.version(row.version());
info.expireTime(row.expireTime());
return info;
}
/**
* Sends supply message to demand node.
*
* @param topicId Id of the topic used for the supply-demand communication.
* @param demander Recipient of supply message.
* @param demandMsg Demand message.
* @param supplyMsg Supply message.
* @param ctxId Supply context id.
* @return {@code True} if message was sent, {@code false} if recipient left grid.
* @throws IgniteCheckedException If failed.
*/
private boolean reply(
int topicId,
ClusterNode demander,
GridDhtPartitionDemandMessage demandMsg,
GridDhtPartitionSupplyMessage supplyMsg,
T3<UUID, Integer, AffinityTopologyVersion> ctxId
) throws IgniteCheckedException {
try {
if (log.isDebugEnabled())
log.debug("Send next supply message [" + supplyRoutineInfo(topicId, demander.id(), demandMsg) + "]");
grp.shared().io().sendOrderedMessage(demander, demandMsg.topic(), supplyMsg, grp.ioPolicy(), demandMsg.timeout());
// Throttle preloading.
if (rebalanceThrottleOverride > 0)
U.sleep(rebalanceThrottleOverride);
else if (grp.preloader().throttle() > 0)
U.sleep(grp.preloader().throttle());
return true;
}
catch (ClusterTopologyCheckedException ignore) {
if (log.isDebugEnabled())
log.debug("Failed to send supply message (demander left): [" + supplyRoutineInfo(topicId, demander.id(), demandMsg) + "]");
synchronized (scMap) {
clearContext(scMap.remove(ctxId), log);
}
return false;
}
}
/**
* String representation of supply routine.
*
* @param topicId Topic id.
* @param demander Demander.
* @param demandMsg Demand message.
* @return Supply routine info string for logging.
*/
private String supplyRoutineInfo(int topicId, UUID demander, GridDhtPartitionDemandMessage demandMsg) {
return "grp=" + grp.cacheOrGroupName() +
", demander=" + demander +
", topVer=" + demandMsg.topologyVersion() +
(topicId > 0 ? ", topic=" + topicId : "");
}
/**
* Saves supply context with given parameters to {@code scMap}.
*
* @param ctxId Supply context id.
* @param entryIt Entries rebalance iterator.
* @param remainingParts Set of partitions that weren't sent yet.
* @param rebalanceId Rebalance id.
*/
private void saveSupplyContext(
T3<UUID, Integer, AffinityTopologyVersion> ctxId,
IgniteRebalanceIterator entryIt,
Set<Integer> remainingParts,
long rebalanceId
) {
synchronized (scMap) {
assert scMap.get(ctxId) == null;
scMap.put(ctxId, new SupplyContext(entryIt, remainingParts, rebalanceId));
}
}
/**
* Supply context.
*/
private static class SupplyContext {
/** Entries iterator. */
@GridToStringExclude
private final IgniteRebalanceIterator iterator;
/** Set of partitions which weren't sent yet. */
private final Set<Integer> remainingParts;
/** Rebalance id. */
private final long rebalanceId;
/**
* Constructor.
*
* @param iterator Entries rebalance iterator.
* @param remainingParts Set of partitions which weren't sent yet.
* @param rebalanceId Rebalance id.
*/
SupplyContext(IgniteRebalanceIterator iterator, Set<Integer> remainingParts, long rebalanceId) {
this.iterator = iterator;
this.remainingParts = remainingParts;
this.rebalanceId = rebalanceId;
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(SupplyContext.class, this);
}
}
}