blob: 1079d5b71cf8e723e0817cc6191accca86584905 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology;
import org.apache.ignite.internal.util.F0;
import org.apache.ignite.internal.util.GridLeanSet;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.Nullable;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_OBJECT_LOADED;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.MOVING;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE;
import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRELOAD;
/**
* Force keys request future.
*/
/**
 * Force keys request future. Issued when the local node must obtain specific keys
 * from current partition owners (e.g. while its local partitions are still MOVING,
 * or when rebalancing is disabled). One {@link MiniFuture} is created per remote
 * node the keys are requested from; the compound future completes when all
 * mini-futures complete or the whole operation fails.
 */
public final class GridDhtForceKeysFuture<K, V> extends GridCompoundFuture<Object, Collection<K>>
    implements GridDhtFuture<Collection<K>> {
    /** */
    private static final long serialVersionUID = 0L;

    /** Logger reference. */
    private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();

    /** Logger (lazily initialized once per class, shared across instances). */
    private static IgniteLogger log;

    /** Wait for 1 second for topology to change. */
    private static final long REMAP_PAUSE = 1000;

    /** Cache context. */
    private final GridCacheContext<K, V> cctx;

    /** Topology. */
    private final GridDhtPartitionTopology top;

    /** Keys to request. */
    private final Collection<KeyCacheObject> keys;

    /** Keys for which local node is no longer primary. */
    private final Collection<Integer> invalidParts = new GridLeanSet<>();

    /**
     * Topology change counter. Incremented on every discovery event; mini-futures
     * snapshot it at creation time to detect whether topology changed while they
     * were in flight (used to decide between immediate remap and a paused remap).
     */
    private final AtomicInteger topCntr = new AtomicInteger(1);

    /** Topology version. */
    private final AffinityTopologyVersion topVer;

    /** Future ID. */
    private final IgniteUuid futId = IgniteUuid.randomUuid();

    /** Trackable flag: set once this future is registered with the DHT future map. */
    private boolean trackable;

    /**
     * @param cctx Cache context.
     * @param topVer Topology version.
     * @param keys Keys.
     */
    public GridDhtForceKeysFuture(
        GridCacheContext<K, V> cctx,
        AffinityTopologyVersion topVer,
        Collection<KeyCacheObject> keys
    ) {
        assert topVer.topologyVersion() != 0 : topVer;
        assert !F.isEmpty(keys) : keys;
        assert !cctx.isNear();

        this.cctx = cctx;
        this.keys = keys;
        this.topVer = topVer;

        top = cctx.dht().topology();

        if (log == null)
            log = U.logger(cctx.kernalContext(), logRef, GridDhtForceKeysFuture.class);
    }

    /**
     * @return Future ID.
     */
    public IgniteUuid futureId() {
        return futId;
    }

    /** {@inheritDoc} */
    @Override public Collection<Integer> invalidPartitions() {
        return invalidParts;
    }

    /**
     * @param f Future.
     * @return {@code True} if mini-future.
     */
    private boolean isMini(IgniteInternalFuture<?> f) {
        return f.getClass().equals(MiniFuture.class);
    }

    /** {@inheritDoc} */
    @Override public boolean onDone(@Nullable Collection<K> res, @Nullable Throwable err) {
        if (super.onDone(res, err)) {
            // Deregister from the DHT future map only if we registered in map().
            if (trackable)
                cctx.dht().removeFuture(this);

            return true;
        }

        return false;
    }

    /**
     * Notifies all pending mini-futures about a discovery event. If a node that a
     * mini-future is waiting on left or failed, that mini-future is remapped.
     *
     * @param evt Discovery event.
     */
    public void onDiscoveryEvent(DiscoveryEvent evt) {
        topCntr.incrementAndGet();

        int type = evt.type();

        for (IgniteInternalFuture<?> f : futures()) {
            if (isMini(f)) {
                MiniFuture mini = (MiniFuture)f;

                mini.onDiscoveryEvent();

                if (type == EVT_NODE_LEFT || type == EVT_NODE_FAILED) {
                    if (mini.node().id().equals(evt.eventNode().id())) {
                        mini.onResult();

                        break;
                    }
                }
            }
        }
    }

    /**
     * Routes a response to the matching mini-future by mini-future ID.
     *
     * @param res Response.
     */
    public void onResult(GridDhtForceKeysResponse res) {
        for (IgniteInternalFuture<Object> f : futures())
            if (isMini(f)) {
                MiniFuture mini = (MiniFuture)f;

                if (mini.miniId().equals(res.miniId())) {
                    mini.onResult(res);

                    return;
                }
            }

        if (log.isDebugEnabled())
            log.debug("Failed to find mini future for response [cacheName=" + cctx.name() + ", res=" + res + ']');
    }

    /**
     * Initializes this future.
     */
    public void init() {
        assert cctx.preloader().startFuture().isDone();

        map(keys, Collections.<ClusterNode>emptyList());

        markInitialized();
    }

    /**
     * Maps the given keys to current partition owners and sends a force-keys request
     * to each mapped node, creating one mini-future per node.
     *
     * @param keys Keys.
     * @param exc Exclude nodes.
     * @return {@code True} if some mapping was added.
     */
    private boolean map(Iterable<KeyCacheObject> keys, Collection<ClusterNode> exc) {
        Map<ClusterNode, Set<KeyCacheObject>> mappings = null;

        for (KeyCacheObject key : keys)
            mappings = map(key, mappings, exc);

        if (isDone())
            return false;

        boolean ret = false;

        if (mappings != null) {
            ClusterNode loc = cctx.localNode();

            // Snapshot the topology counter so mini-futures can detect later changes.
            int curTopVer = topCntr.get();

            if (!cctx.dht().addFuture(this)) {
                assert isDone() : this;

                return false;
            }

            trackable = true;

            // Create mini futures.
            for (Map.Entry<ClusterNode, Set<KeyCacheObject>> mapped : mappings.entrySet()) {
                ClusterNode n = mapped.getKey();
                Set<KeyCacheObject> mappedKeys = mapped.getValue();

                int cnt = F.size(mappedKeys);

                if (cnt > 0) {
                    ret = true;

                    MiniFuture fut = new MiniFuture(n, mappedKeys, curTopVer, exc);

                    GridDhtForceKeysRequest req = new GridDhtForceKeysRequest(
                        cctx.cacheId(),
                        futId,
                        fut.miniId(),
                        mappedKeys,
                        topVer,
                        cctx.deploymentEnabled());

                    try {
                        add(fut); // Append new future.

                        assert !n.id().equals(loc.id());

                        if (log.isTraceEnabled())
                            // Fixed missing ", " separator before "node=".
                            log.trace("Sending force key request [cacheName=" + cctx.name() + ", node=" + n.id() +
                                ", req=" + req + ']');

                        cctx.io().send(n, req, cctx.ioPolicy());
                    }
                    catch (IgniteCheckedException e) {
                        // Fail the whole thing.
                        if (e instanceof ClusterTopologyCheckedException)
                            fut.onResult();
                        else if (!cctx.kernalContext().isStopping())
                            fut.onResult(e);
                    }
                }
            }
        }

        return ret;
    }

    /**
     * Maps a single key: decides whether it needs to be requested at all and, if so,
     * picks the owner node to request it from, accumulating the choice into {@code mappings}.
     *
     * @param key Key.
     * @param mappings Mappings.
     * @param exc Exclude nodes.
     * @return Mappings.
     */
    private Map<ClusterNode, Set<KeyCacheObject>> map(KeyCacheObject key,
        @Nullable Map<ClusterNode, Set<KeyCacheObject>> mappings,
        Collection<ClusterNode> exc) {
        ClusterNode loc = cctx.localNode();

        GridCacheEntryEx e = cctx.dht().peekEx(key);

        try {
            if (e != null && !e.isNewLocked()) {
                if (log.isTraceEnabled()) {
                    int part = cctx.affinity().partition(key);

                    log.trace("Will not rebalance key (entry is not new) [cacheName=" + cctx.name() +
                        ", key=" + key + ", part=" + part + ", locId=" + cctx.nodeId() + ']');
                }

                // Key has been rebalanced or retrieved already.
                return mappings;
            }
        }
        catch (GridCacheEntryRemovedException ignore) {
            if (log.isTraceEnabled())
                log.trace("Received removed DHT entry for force keys request [entry=" + e +
                    ", locId=" + cctx.nodeId() + ']');
        }

        int part = cctx.affinity().partition(key);

        List<ClusterNode> owners = F.isEmpty(exc) ? top.owners(part, topVer) :
            new ArrayList<>(F.view(top.owners(part, topVer), F.notIn(exc)));

        if (owners.isEmpty() || (owners.contains(loc) && cctx.rebalanceEnabled())) {
            if (log.isTraceEnabled())
                // Fixed missing ", " separator before "topVer=".
                log.trace("Will not rebalance key (local node is owner) [key=" + key + ", part=" + part +
                    ", topVer=" + topVer + ", locId=" + cctx.nodeId() + ']');

            // Key is already rebalanced.
            return mappings;
        }

        // Create partition.
        GridDhtLocalPartition locPart = top.localPartition(part, topVer, false);

        if (log.isTraceEnabled())
            // Fixed missing "=" after "topVer".
            log.trace("Mapping local partition [loc=" + cctx.localNodeId() + ", topVer=" + topVer +
                ", part=" + locPart + ", owners=" + owners + ", allOwners=" + U.toShortString(top.owners(part)) + ']');

        if (locPart == null)
            invalidParts.add(part);
        else if (!cctx.rebalanceEnabled() || locPart.state() == MOVING) {
            Collections.sort(owners, CU.nodeComparator(false));

            // Load from youngest owner.
            ClusterNode pick = F.first(owners);

            assert pick != null;

            // With rebalancing disabled the local node may itself be an owner; request from a remote one.
            if (!cctx.rebalanceEnabled() && loc.id().equals(pick.id()))
                pick = F.first(F.view(owners, F.remoteNodes(loc.id())));

            if (pick == null) {
                if (log.isTraceEnabled())
                    log.trace("Will not rebalance key (no nodes to request from with rebalancing disabled) [key=" +
                        key + ", part=" + part + ", locId=" + cctx.nodeId() + ']');

                return mappings;
            }

            if (mappings == null)
                mappings = U.newHashMap(keys.size());

            Collection<KeyCacheObject> mappedKeys = F.addIfAbsent(mappings, pick, F.<KeyCacheObject>newSet());

            assert mappedKeys != null;

            mappedKeys.add(key);

            if (log.isTraceEnabled())
                log.trace("Will rebalance key from node [cacheName=" + cctx.name() + ", key=" + key + ", part=" +
                    part + ", node=" + pick.id() + ", locId=" + cctx.nodeId() + ']');
        }
        else if (locPart.state() != OWNING)
            invalidParts.add(part);
        else {
            if (log.isTraceEnabled())
                log.trace("Will not rebalance key (local partition is not MOVING) [cacheName=" + cctx.name() +
                    ", key=" + key + ", part=" + locPart + ", locId=" + cctx.nodeId() + ']');
        }

        return mappings;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        Collection<String> futs = F.viewReadOnly(futures(), new C1<IgniteInternalFuture<?>, String>() {
            @Override public String apply(IgniteInternalFuture<?> f) {
                return f.toString();
            }
        });

        return S.toString(GridDhtForceKeysFuture.class, this,
            "innerFuts", futs,
            "super", super.toString());
    }

    /**
     * Mini-future for get operations. Mini-futures are only waiting on a single node as opposed to multiple nodes.
     */
    private class MiniFuture extends GridFutureAdapter<Object> {
        /** Mini-future ID. */
        private final IgniteUuid miniId = IgniteUuid.randomUuid();

        /** Node. */
        private final ClusterNode node;

        /** Requested keys. */
        private final Collection<KeyCacheObject> keys;

        /** Topology version (counter snapshot) for this mini-future. */
        private final int curTopVer;

        /**
         * Pause latch for remapping missed keys. Tripped by any discovery event so
         * that a pending {@link #pause()} wakes up as soon as topology changes.
         */
        private final CountDownLatch pauseLatch = new CountDownLatch(1);

        /** Excludes. */
        private final Collection<ClusterNode> exc;

        /**
         * @param node Node.
         * @param keys Keys.
         * @param curTopVer Topology version for this mini-future.
         * @param exc Exclude node list.
         */
        MiniFuture(ClusterNode node, Collection<KeyCacheObject> keys, int curTopVer, Collection<ClusterNode> exc) {
            assert node != null;
            assert curTopVer > 0;
            assert exc != null;

            this.node = node;
            this.keys = keys;
            this.curTopVer = curTopVer;
            this.exc = exc;
        }

        /**
         * @return Mini-future ID.
         */
        IgniteUuid miniId() {
            return miniId;
        }

        /**
         * @return Node ID.
         */
        ClusterNode node() {
            return node;
        }

        /**
         * Releases the pause latch on any topology change.
         */
        void onDiscoveryEvent() {
            pauseLatch.countDown();
        }

        /**
         * @param e Error.
         */
        void onResult(Throwable e) {
            if (log.isDebugEnabled())
                log.debug("Failed to get future result [fut=" + this + ", err=" + e + ']');

            // Fail.
            onDone(e);
        }

        /**
         * Callback for the case when the target node left the grid: remaps the keys
         * to the remaining owners, excluding the departed node.
         */
        void onResult() {
            if (log.isDebugEnabled())
                log.debug("Remote node left grid while sending or waiting for reply (will retry): " + this);

            // Remap.
            map(keys, /*exclude*/F.asList(node));

            onDone(true);
        }

        /**
         * @param res Result callback.
         */
        void onResult(GridDhtForceKeysResponse res) {
            if (res.error() != null) {
                onDone(res.error());

                return;
            }

            Collection<KeyCacheObject> missedKeys = res.missedKeys();

            boolean remapMissed = false;

            if (!F.isEmpty(missedKeys)) {
                // If topology already changed (or a change was observed via the latch),
                // remap immediately; otherwise pause first to avoid a hot resend loop.
                if (curTopVer != topCntr.get() || pauseLatch.getCount() == 0)
                    map(missedKeys, Collections.<ClusterNode>emptyList());
                else
                    remapMissed = true;
            }

            // If rebalancing is disabled, we need to check other backups.
            if (!cctx.rebalanceEnabled()) {
                Collection<KeyCacheObject> retryKeys = F.view(
                    keys,
                    F0.notIn(missedKeys),
                    F0.notIn(F.viewReadOnly(res.forcedInfos(), CU.<KeyCacheObject, V>info2Key())));

                if (!retryKeys.isEmpty())
                    map(retryKeys, F.concat(false, node, exc));
            }

            boolean rec = cctx.events().isRecordable(EVT_CACHE_REBALANCE_OBJECT_LOADED);

            boolean replicate = cctx.isDrEnabled();

            for (GridCacheEntryInfo info : res.forcedInfos()) {
                int p = cctx.affinity().partition(info.key());

                GridDhtLocalPartition locPart = top.localPartition(p, AffinityTopologyVersion.NONE, false);

                // Reserve the partition so it is not evicted while we apply the entry.
                if (locPart != null && (!cctx.rebalanceEnabled() || locPart.state() == MOVING) && locPart.reserve()) {
                    GridCacheEntryEx entry = cctx.dht().entryEx(info.key());

                    cctx.shared().database().checkpointReadLock();

                    try {
                        if (entry.initialValue(
                            info.value(),
                            info.version(),
                            info.ttl(),
                            info.expireTime(),
                            true,
                            topVer,
                            replicate ? DR_PRELOAD : DR_NONE,
                            false,
                            false
                        )) {
                            if (rec && !entry.isInternal())
                                cctx.events().addEvent(entry.partition(), entry.key(), cctx.localNodeId(), null,
                                    null, null, EVT_CACHE_REBALANCE_OBJECT_LOADED, info.value(), true, null,
                                    false, null, null, false);
                        }
                    }
                    catch (IgniteCheckedException e) {
                        onDone(e);

                        return;
                    }
                    catch (GridCacheEntryRemovedException ignore) {
                        if (log.isTraceEnabled())
                            log.trace("Trying to rebalance removed entry (will ignore) [cacheName=" +
                                cctx.name() + ", entry=" + entry + ']');
                    }
                    finally {
                        cctx.shared().database().checkpointReadUnlock();

                        locPart.release();
                    }
                }
            }

            if (remapMissed && pause())
                map(missedKeys, Collections.<ClusterNode>emptyList());

            // Finish mini future.
            onDone(true);
        }

        /**
         * Pause to avoid crazy resending in case of topology changes.
         *
         * @return {@code True} if was not interrupted.
         */
        private boolean pause() {
            try {
                U.await(pauseLatch, REMAP_PAUSE, MILLISECONDS);

                return true;
            }
            catch (IgniteInterruptedCheckedException e) {
                // Fail.
                onDone(e);

                return false;
            }
        }

        /** {@inheritDoc} */
        @Override public String toString() {
            return S.toString(MiniFuture.class, this, super.toString());
        }
    }
}