/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridNearAtomicAbstractUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridPlainRunnable;
import org.apache.ignite.internal.util.typedef.CI1;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_DATA_LOST;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_PART_UNLOADED;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.EVICTED;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.RENTING;
/**
* DHT cache preloader.
*/
public class GridDhtPreloader extends GridCachePreloaderAdapter {
/** Default preload resend timeout. */
public static final long DFLT_PRELOAD_RESEND_TIMEOUT = 1500;
/** Partition topology. */
private GridDhtPartitionTopology top;
/** Partition supplier. */
private GridDhtPartitionSupplier supplier;
/** Partition demander. */
private GridDhtPartitionDemander demander;
/** Start future. */
private GridFutureAdapter<Object> startFut;
/** Busy lock to prevent activities from accessing the preloader while it is stopping. */
private final ReadWriteLock busyLock = new ReentrantReadWriteLock();
/** Stopped flag, set when the preloader is stopped on kernal stop. */
private boolean stopped;
/**
* @param grp Cache group.
*/
public GridDhtPreloader(CacheGroupContext grp) {
super(grp);
top = grp.topology();
startFut = new GridFutureAdapter<>();
}
/** {@inheritDoc} */
@Override public void start() {
if (log.isDebugEnabled())
log.debug("Starting DHT rebalancer...");
supplier = new GridDhtPartitionSupplier(grp);
demander = new GridDhtPartitionDemander(grp);
demander.start();
}
/** {@inheritDoc} */
@Override public void onKernalStop() {
if (log.isDebugEnabled())
log.debug("DHT rebalancer onKernalStop callback.");
pause();
try {
if (supplier != null)
supplier.stop();
if (demander != null)
demander.stop();
top = null;
stopped = true;
}
finally {
resume();
}
}
/**
* @return Node stop exception.
*/
private IgniteCheckedException stopError() {
return new NodeStoppingException("Operation has been cancelled (cache or node is stopping).");
}
/** {@inheritDoc} */
@Override public void onInitialExchangeComplete(@Nullable Throwable err) {
if (err == null)
startFut.onDone();
else
startFut.onDone(err);
}
/** {@inheritDoc} */
@Override public void onTopologyChanged(GridDhtPartitionsExchangeFuture lastFut) {
supplier.onTopologyChanged();
demander.onTopologyChanged(lastFut);
}
/** {@inheritDoc} */
@Override public boolean rebalanceRequired(AffinityTopologyVersion rebTopVer,
GridDhtPartitionsExchangeFuture exchFut) {
if (ctx.kernalContext().clientNode() || rebTopVer.equals(AffinityTopologyVersion.NONE))
return false; // No-op.
if (exchFut.resetLostPartitionFor(grp.cacheOrGroupName()))
return true;
if (exchFut.localJoinExchange())
return true; // Required, a reconnecting node can have outdated partition update counters (updSeq).
if (!grp.affinity().cachedVersions().contains(rebTopVer)) {
assert rebTopVer.compareTo(grp.localStartVersion()) <= 0 :
"Empty history allowed only for newly started cache group [rebTopVer=" + rebTopVer +
", localStartTopVer=" + grp.localStartVersion() + ']';
return true; // Required, since no history info available.
}
final IgniteInternalFuture<Boolean> rebFut = rebalanceFuture();
if (rebFut.isDone() && !rebFut.result())
return true; // Required, previous rebalance cancelled.
AffinityTopologyVersion lastAffChangeTopVer =
ctx.exchange().lastAffinityChangedTopologyVersion(exchFut.topologyVersion());
return lastAffChangeTopVer.compareTo(rebTopVer) > 0;
}
/** {@inheritDoc} */
@Override public GridDhtPreloaderAssignments generateAssignments(GridDhtPartitionExchangeId exchId,
GridDhtPartitionsExchangeFuture exchFut) {
assert exchFut == null || exchFut.isDone();
// No assignments for disabled preloader.
GridDhtPartitionTopology top = grp.topology();
if (!grp.rebalanceEnabled())
return new GridDhtPreloaderAssignments(exchId, top.readyTopologyVersion());
int partitions = grp.affinity().partitions();
AffinityTopologyVersion topVer = top.readyTopologyVersion();
assert exchFut == null || exchFut.context().events().topologyVersion().equals(top.readyTopologyVersion()) :
"Topology version mismatch [exchId=" + exchId +
", grp=" + grp.name() +
", topVer=" + top.readyTopologyVersion() + ']';
GridDhtPreloaderAssignments assignments = new GridDhtPreloaderAssignments(exchId, topVer);
AffinityAssignment aff = grp.affinity().cachedAffinity(topVer);
CachePartitionFullCountersMap countersMap = grp.topology().fullUpdateCounters();
for (int p = 0; p < partitions; p++) {
if (ctx.exchange().hasPendingServerExchange()) {
if (log.isDebugEnabled())
log.debug("Skipping assignments creation, exchange worker has pending assignments: " +
exchId);
assignments.cancelled(true);
return assignments;
}
// If partition belongs to local node.
if (aff.get(p).contains(ctx.localNode())) {
GridDhtLocalPartition part = top.localPartition(p);
assert part != null;
assert part.id() == p;
// Do not rebalance OWNING or LOST partitions.
if (part.state() == OWNING || part.state() == LOST)
continue;
// If the partition is currently RENTING, prevent its destruction: switch it back to MOVING and restart clearing.
if (part.state() == RENTING) {
if (part.reserve()) {
part.moving();
part.clearAsync();
part.release();
}
}
// If the partition was destroyed, recreate it.
if (part.state() == EVICTED) {
part.awaitDestroy();
part = top.localPartition(p, topVer, true);
assert part != null : "Partition was not created [grp=" + grp.name() + ", topVer=" + topVer + ", p=" + p + ']';
part.resetUpdateCounter();
}
assert part.state() == MOVING : "Partition has invalid state for rebalance " + aff.topologyVersion() + " " + part;
ClusterNode histSupplier = null;
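// For a persistent cache group, try to find a supplier that can provide the missing updates from its WAL history (historical rebalance).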
if (grp.persistenceEnabled() && exchFut != null) {
UUID nodeId = exchFut.partitionHistorySupplier(grp.groupId(), p, part.initialUpdateCounter());
if (nodeId != null)
histSupplier = ctx.discovery().node(nodeId);
}
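// A historical supplier is available and the partition is not being cleared: demand only the missing update counter range.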
if (histSupplier != null && !exchFut.isClearingPartition(grp, p)) {
assert grp.persistenceEnabled();
assert remoteOwners(p, topVer).contains(histSupplier) : remoteOwners(p, topVer);
GridDhtPartitionDemandMessage msg = assignments.get(histSupplier);
if (msg == null) {
assignments.put(histSupplier, msg = new GridDhtPartitionDemandMessage(
top.updateSequence(),
assignments.topologyVersion(),
grp.groupId())
);
}
// TODO FIXME https://issues.apache.org/jira/browse/IGNITE-11790
msg.partitions().addHistorical(p, part.initialUpdateCounter(), countersMap.updateCounter(p), partitions);
}
else {
// If for some reason (for example, if the supplier failed and a new supplier was elected) the partition
// is assigned for full rebalance, force clearing if it has not been started yet.
if (grp.persistenceEnabled() && exchFut != null && !exchFut.isClearingPartition(grp, p))
part.clearAsync();
List<ClusterNode> picked = remoteOwners(p, topVer);
if (picked.isEmpty()) {
top.own(part);
if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_DATA_LOST)) {
grp.addRebalanceEvent(p,
EVT_CACHE_REBALANCE_PART_DATA_LOST,
exchId.eventNode(),
exchId.event(),
exchId.eventTimestamp());
}
if (log.isDebugEnabled())
log.debug("Owning partition as there are no other owners: " + part);
}
else {
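// Spread full rebalance demands across remote owners by partition id.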
ClusterNode n = picked.get(p % picked.size());
GridDhtPartitionDemandMessage msg = assignments.get(n);
if (msg == null) {
assignments.put(n, msg = new GridDhtPartitionDemandMessage(
top.updateSequence(),
assignments.topologyVersion(),
grp.groupId()));
}
msg.partitions().addFull(p);
}
}
}
}
if (!assignments.isEmpty()) {
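// Mark the last checkpoint as inapplicable for WAL (historical) rebalance of this group.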
ctx.database().lastCheckpointInapplicableForWalRebalance(grp.groupId());
assert exchFut == null || !exchFut.rebalanced() :
"Unexpected rebalance on rebalanced cluster " +
"[top=" + topVer + ", grp=" + grp.groupId() + ", assignments=" + assignments + "]";
}
return assignments;
}
/** {@inheritDoc} */
@Override public void onReconnected() {
startFut = new GridFutureAdapter<>();
}
/**
* Returns remote owners (excluding local node) for specified partition {@code p}.
*
* @param p Partition.
* @param topVer Topology version.
* @return Nodes owning this partition.
*/
private List<ClusterNode> remoteOwners(int p, AffinityTopologyVersion topVer) {
List<ClusterNode> owners = grp.topology().owners(p, topVer);
List<ClusterNode> res = new ArrayList<>(owners.size());
for (ClusterNode owner : owners) {
if (!owner.id().equals(ctx.localNodeId()))
res.add(owner);
}
return res;
}
/** {@inheritDoc} */
@Override public void handleSupplyMessage(UUID nodeId, final GridDhtPartitionSupplyMessage s) {
demander.registerSupplyMessage(nodeId, s, () -> {
if (!enterBusy())
return;
try {
demander.handleSupplyMessage(nodeId, s);
}
finally {
leaveBusy();
}
});
}
/** {@inheritDoc} */
@Override public void handleDemandMessage(int idx, UUID nodeId, GridDhtPartitionDemandMessage d) {
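// Process demand messages from the same node on the same stripe to preserve per-node ordering.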
ctx.kernalContext().getStripedRebalanceExecutorService().execute(() -> {
if (!enterBusy())
return;
try {
supplier.handleDemandMessage(idx, nodeId, d);
}
finally {
leaveBusy();
}
}, Math.abs(nodeId.hashCode()));
}
/** {@inheritDoc} */
@Override public Runnable addAssignments(
GridDhtPreloaderAssignments assignments,
boolean forceRebalance,
long rebalanceId,
Runnable next,
@Nullable GridCompoundFuture<Boolean, Boolean> forcedRebFut
) {
return demander.addAssignments(assignments, forceRebalance, rebalanceId, next, forcedRebFut);
}
/**
* @return Start future.
*/
@Override public IgniteInternalFuture<Object> startFuture() {
return startFut;
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<?> syncFuture() {
return ctx.kernalContext().clientNode() ? startFut : demander.syncFuture();
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> rebalanceFuture() {
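// Client nodes do not rebalance, so their rebalance future is always completed successfully.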
return ctx.kernalContext().clientNode() ? new GridFinishedFuture<>(true) : demander.rebalanceFuture();
}
/**
* @return {@code true} if busy state was entered.
*/
@SuppressWarnings("LockAcquiredButNotSafelyReleased")
private boolean enterBusy() {
busyLock.readLock().lock();
if (stopped) {
busyLock.readLock().unlock();
return false;
}
return true;
}
/**
* Leaves busy state.
*/
private void leaveBusy() {
busyLock.readLock().unlock();
}
/**
* Handles partition eviction: updates the topology and, if requested, schedules resending of partition maps within the configured timeout.
*
* @param part Evicted partition.
* @param updateSeq {@code true} if the topology update sequence should be updated.
*/
public void onPartitionEvicted(GridDhtLocalPartition part, boolean updateSeq) {
if (!enterBusy())
return;
try {
top.onEvicted(part, updateSeq);
if (grp.eventRecordable(EVT_CACHE_REBALANCE_PART_UNLOADED))
grp.addUnloadEvent(part.id());
if (updateSeq) {
if (log.isDebugEnabled())
log.debug("Partitions have been scheduled to resend [reason=" +
"Eviction [grp" + grp.cacheOrGroupName() + " " + part.id() + "]");
ctx.exchange().scheduleResendPartitions();
}
}
finally {
leaveBusy();
}
}
/** {@inheritDoc} */
@Override public boolean needForceKeys() {
// Do not use force key requests when MVCC is enabled.
if (grp.mvccEnabled())
return false;
if (grp.rebalanceEnabled()) {
IgniteInternalFuture<Boolean> rebalanceFut = rebalanceFuture();
if (rebalanceFut.isDone() && Boolean.TRUE.equals(rebalanceFut.result()))
return false;
}
return true;
}
/** {@inheritDoc} */
@Override public GridDhtFuture<Object> request(GridCacheContext cctx,
GridNearAtomicAbstractUpdateRequest req,
AffinityTopologyVersion topVer) {
if (!needForceKeys())
return null;
return request0(cctx, req.keys(), topVer);
}
/**
* @param cctx Cache context.
* @param keys Keys to request.
* @param topVer Topology version.
* @return Future for request.
*/
@Override public GridDhtFuture<Object> request(GridCacheContext cctx,
Collection<KeyCacheObject> keys,
AffinityTopologyVersion topVer) {
if (!needForceKeys())
return null;
return request0(cctx, keys, topVer);
}
/**
* @param cctx Cache context.
* @param keys Keys to request.
* @param topVer Topology version.
* @return Future for request.
*/
@SuppressWarnings({"unchecked", "RedundantCast"})
private GridDhtFuture<Object> request0(GridCacheContext cctx, Collection<KeyCacheObject> keys,
AffinityTopologyVersion topVer) {
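// Force key requests are always served by the DHT cache context.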
if (cctx.isNear())
cctx = cctx.near().dht().context();
final GridDhtForceKeysFuture<?, ?> fut = new GridDhtForceKeysFuture<>(cctx, topVer, keys);
IgniteInternalFuture<?> topReadyFut = cctx.affinity().affinityReadyFuturex(topVer);
if (startFut.isDone() && topReadyFut == null)
fut.init();
else {
if (topReadyFut == null)
startFut.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> syncFut) {
ctx.kernalContext().closure().runLocalSafe(
new GridPlainRunnable() {
@Override public void run() {
fut.init();
}
});
}
});
else {
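// Wait for both the preloader start future and the affinity ready future before initializing the force keys future.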
GridCompoundFuture<Object, Object> compound = new GridCompoundFuture<>();
compound.add((IgniteInternalFuture<Object>)startFut);
compound.add((IgniteInternalFuture<Object>)topReadyFut);
compound.markInitialized();
compound.listen(new CI1<IgniteInternalFuture<?>>() {
@Override public void apply(IgniteInternalFuture<?> syncFut) {
fut.init();
}
});
}
}
return (GridDhtFuture)fut;
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Boolean> forceRebalance() {
return demander.forceRebalance();
}
/** {@inheritDoc} */
@SuppressWarnings("LockAcquiredButNotSafelyReleased")
@Override public void pause() {
busyLock.writeLock().lock();
}
/** {@inheritDoc} */
@Override public void resume() {
busyLock.writeLock().unlock();
}
}