blob: 41cc400ff4ccf05504644a43bd57740d266658ea [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
import org.apache.ignite.*;
import org.apache.ignite.cache.*;
import org.apache.ignite.cluster.*;
import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.cluster.*;
import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.distributed.dht.*;
import org.apache.ignite.internal.processors.cache.distributed.near.*;
import org.apache.ignite.internal.processors.cache.dr.*;
import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
import org.jetbrains.annotations.*;
import org.jsr166.*;
import javax.cache.expiry.*;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.*;
import static org.apache.ignite.cache.CacheWriteSynchronizationMode.*;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.*;
/**
* DHT atomic cache near update future.
*/
public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object>
implements GridCacheAtomicFuture<Object>{
/** Logger reference. */
private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
/** Logger. */
protected static IgniteLogger log;
/** Cache context. */
private final GridCacheContext cctx;
/** Cache. */
private GridDhtAtomicCache cache;
/** Future ID. */
private volatile GridCacheVersion futVer;
/** Update operation. */
private final GridCacheOperation op;
/** Keys */
private Collection<?> keys;
/** Values. */
@SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
private Collection<?> vals;
/** Optional arguments for entry processor. */
private Object[] invokeArgs;
/** Conflict put values. */
@SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
private Collection<GridCacheDrInfo> conflictPutVals;
/** Conflict remove values. */
@SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"})
private Collection<GridCacheVersion> conflictRmvVals;
/** Mappings. */
@GridToStringInclude
private ConcurrentMap<UUID, GridNearAtomicUpdateRequest> mappings;
/** Error. */
private volatile CachePartialUpdateCheckedException err;
/** Operation result. */
private volatile GridCacheReturn opRes;
/** Return value require flag. */
private final boolean retval;
/** Expiry policy. */
private final ExpiryPolicy expiryPlc;
/** Future map topology version. */
private volatile AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO;
/** Completion future for a particular topology version. */
private GridFutureAdapter<Void> topCompleteFut;
/** Optional filter. */
private final CacheEntryPredicate[] filter;
/** Write synchronization mode. */
private final CacheWriteSynchronizationMode syncMode;
/** If this future mapped to single node. */
private volatile Boolean single;
/** If this future is mapped to a single node, this field will contain that node ID. */
private UUID singleNodeId;
/** Single update request. */
private GridNearAtomicUpdateRequest singleReq;
/** Raw return value flag. */
private final boolean rawRetval;
/** Fast map flag. */
private final boolean fastMap;
/** */
private boolean fastMapRemap;
/** */
private GridCacheVersion updVer;
/** Near cache flag. */
private final boolean nearEnabled;
/** Subject ID. */
private final UUID subjId;
/** Task name hash. */
private final int taskNameHash;
/** Topology locked flag. Set if atomic update is performed inside a TX or explicit lock. */
private boolean topLocked;
/** Skip store flag. */
private final boolean skipStore;
/** Wait for topology future flag. */
private final boolean waitTopFut;
/** Remap count. */
private AtomicInteger remapCnt;
/**
* @param cctx Cache context.
* @param cache Cache instance.
* @param syncMode Write synchronization mode.
* @param op Update operation.
* @param keys Keys to update.
* @param vals Values or transform closure.
* @param invokeArgs Optional arguments for entry processor.
* @param conflictPutVals Conflict put values (optional).
* @param conflictRmvVals Conflict remove values (optional).
* @param retval Return value require flag.
* @param rawRetval {@code True} if should return {@code GridCacheReturn} as future result.
* @param expiryPlc Expiry policy explicitly specified for cache operation.
* @param filter Entry filter.
* @param subjId Subject ID.
* @param taskNameHash Task name hash code.
* @param skipStore Skip store flag.
*/
public GridNearAtomicUpdateFuture(
GridCacheContext cctx,
GridDhtAtomicCache cache,
CacheWriteSynchronizationMode syncMode,
GridCacheOperation op,
Collection<?> keys,
@Nullable Collection<?> vals,
@Nullable Object[] invokeArgs,
@Nullable Collection<GridCacheDrInfo> conflictPutVals,
@Nullable Collection<GridCacheVersion> conflictRmvVals,
final boolean retval,
final boolean rawRetval,
@Nullable ExpiryPolicy expiryPlc,
final CacheEntryPredicate[] filter,
UUID subjId,
int taskNameHash,
boolean skipStore,
int remapCnt,
boolean waitTopFut
) {
this.rawRetval = rawRetval;
assert vals == null || vals.size() == keys.size();
assert conflictPutVals == null || conflictPutVals.size() == keys.size();
assert conflictRmvVals == null || conflictRmvVals.size() == keys.size();
assert subjId != null;
this.cctx = cctx;
this.cache = cache;
this.syncMode = syncMode;
this.op = op;
this.keys = keys;
this.vals = vals;
this.invokeArgs = invokeArgs;
this.conflictPutVals = conflictPutVals;
this.conflictRmvVals = conflictRmvVals;
this.retval = retval;
this.expiryPlc = expiryPlc;
this.filter = filter;
this.subjId = subjId;
this.taskNameHash = taskNameHash;
this.skipStore = skipStore;
this.waitTopFut = waitTopFut;
if (log == null)
log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class);
mappings = new ConcurrentHashMap8<>(keys.size(), 1.0f);
fastMap = F.isEmpty(filter) && op != TRANSFORM && cctx.config().getWriteSynchronizationMode() == FULL_SYNC &&
cctx.config().getAtomicWriteOrderMode() == CLOCK &&
!(cctx.writeThrough() && cctx.config().getInterceptor() != null);
nearEnabled = CU.isNearEnabled(cctx);
this.remapCnt = new AtomicInteger(remapCnt);
}
/** {@inheritDoc} */
@Override public IgniteUuid futureId() {
return futVer.asGridUuid();
}
/** {@inheritDoc} */
@Override public GridCacheVersion version() {
return futVer;
}
/** {@inheritDoc} */
@Override public Collection<? extends ClusterNode> nodes() {
return F.view(F.viewReadOnly(mappings.keySet(), U.id2Node(cctx.kernalContext())), F.notNull());
}
/**
* @return {@code True} if this future should block partition map exchange.
*/
private boolean waitForPartitionExchange() {
// Wait fast-map near atomic update futures in CLOCK mode.
return fastMap;
}
/** {@inheritDoc} */
@Override public AffinityTopologyVersion topologyVersion() {
return topVer;
}
/** {@inheritDoc} */
@Override public Collection<?> keys() {
return keys;
}
/** {@inheritDoc} */
@Override public boolean onNodeLeft(UUID nodeId) {
Boolean single0 = single;
if (single0 != null && single0) {
if (singleNodeId.equals(nodeId)) {
onDone(addFailedKeys(
singleReq.keys(),
new ClusterTopologyCheckedException("Primary node left grid before response is received: " + nodeId)));
return true;
}
return false;
}
GridNearAtomicUpdateRequest req = mappings.get(nodeId);
if (req != null) {
addFailedKeys(req.keys(), new ClusterTopologyCheckedException("Primary node left grid before response is " +
"received: " + nodeId));
mappings.remove(nodeId);
checkComplete();
return true;
}
return false;
}
/** {@inheritDoc} */
@Override public boolean trackable() {
return true;
}
/** {@inheritDoc} */
@Override public void markNotTrackable() {
// No-op.
}
/**
* Performs future mapping.
*/
public void map() {
AffinityTopologyVersion topVer = null;
IgniteInternalTx tx = cctx.tm().anyActiveThreadTx();
if (tx != null && tx.topologyVersionSnapshot() != null)
topVer = tx.topologyVersionSnapshot();
if (topVer == null)
topVer = cctx.mvcc().lastExplicitLockTopologyVersion(Thread.currentThread().getId());
if (topVer == null)
mapOnTopology(null, false, null);
else {
topLocked = true;
// Cannot remap.
remapCnt.set(1);
map0(topVer, null, false, null);
}
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
if (waitForPartitionExchange() && topologyVersion().compareTo(topVer) < 0) {
synchronized (this) {
if (this.topVer == AffinityTopologyVersion.ZERO)
return null;
if (this.topVer.compareTo(topVer) < 0) {
if (topCompleteFut == null)
topCompleteFut = new GridFutureAdapter<>();
return topCompleteFut;
}
}
}
return null;
}
/**
* @param failed Keys to remap.
*/
private void remap(Collection<?> failed) {
if (futVer != null)
cctx.mvcc().removeAtomicFuture(version());
Collection<Object> remapKeys = new ArrayList<>(failed.size());
Collection<Object> remapVals = vals != null ? new ArrayList<>(failed.size()) : null;
Collection<GridCacheDrInfo> remapConflictPutVals = conflictPutVals != null ? new ArrayList<GridCacheDrInfo>(failed.size()) : null;
Collection<GridCacheVersion> remapConflictRmvVals = conflictRmvVals != null ? new ArrayList<GridCacheVersion>(failed.size()) : null;
Iterator<?> keyIt = keys.iterator();
Iterator<?> valsIt = vals != null ? vals.iterator() : null;
Iterator<GridCacheDrInfo> conflictPutValsIt = conflictPutVals != null ? conflictPutVals.iterator() : null;
Iterator<GridCacheVersion> conflictRmvValsIt = conflictRmvVals != null ? conflictRmvVals.iterator() : null;
for (Object key : failed) {
while (keyIt.hasNext()) {
Object nextKey = keyIt.next();
Object nextVal = valsIt != null ? valsIt.next() : null;
GridCacheDrInfo nextConflictPutVal = conflictPutValsIt != null ? conflictPutValsIt.next() : null;
GridCacheVersion nextConflictRmvVal = conflictRmvValsIt != null ? conflictRmvValsIt.next() : null;
if (F.eq(key, nextKey)) {
remapKeys.add(nextKey);
if (remapVals != null)
remapVals.add(nextVal);
if (remapConflictPutVals != null)
remapConflictPutVals.add(nextConflictPutVal);
if (remapConflictRmvVals != null)
remapConflictRmvVals.add(nextConflictRmvVal);
break;
}
}
}
keys = remapKeys;
vals = remapVals;
conflictPutVals = remapConflictPutVals;
conflictRmvVals = remapConflictRmvVals;
single = null;
futVer = null;
err = null;
opRes = null;
GridFutureAdapter<Void> fut0;
synchronized (this) {
mappings = new ConcurrentHashMap8<>(keys.size(), 1.0f);
topVer = AffinityTopologyVersion.ZERO;
fut0 = topCompleteFut;
topCompleteFut = null;
}
if (fut0 != null)
fut0.onDone();
singleNodeId = null;
singleReq = null;
fastMapRemap = false;
updVer = null;
topLocked = false;
map();
}
/** {@inheritDoc} */
@SuppressWarnings("ConstantConditions")
@Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
assert res == null || res instanceof GridCacheReturn;
GridCacheReturn ret = (GridCacheReturn)res;
Object retval =
res == null ? null : rawRetval ? ret : (this.retval || op == TRANSFORM) ? ret.value() : ret.success();
if (op == TRANSFORM && retval == null)
retval = Collections.emptyMap();
if (err != null && X.hasCause(err, CachePartialUpdateCheckedException.class) &&
X.hasCause(err, ClusterTopologyCheckedException.class) &&
remapCnt.decrementAndGet() > 0) {
CachePartialUpdateCheckedException cause = X.cause(err, CachePartialUpdateCheckedException.class);
if (F.isEmpty(cause.failedKeys()))
cause.printStackTrace();
remap(cause.failedKeys());
return false;
}
if (super.onDone(retval, err)) {
if (futVer != null)
cctx.mvcc().removeAtomicFuture(version());
GridFutureAdapter<Void> fut0;
synchronized (this) {
fut0 = topCompleteFut;
}
if (fut0 != null)
fut0.onDone();
return true;
}
return false;
}
/**
* Response callback.
*
* @param nodeId Node ID.
* @param res Update response.
*/
public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) {
if (res.remapKeys() != null) {
assert !fastMap || cctx.kernalContext().clientNode();
Collection<KeyCacheObject> remapKeys = fastMap ? null : res.remapKeys();
mapOnTopology(remapKeys, true, nodeId);
return;
}
GridCacheReturn ret = res.returnValue();
Boolean single0 = single;
if (single0 != null && single0) {
assert singleNodeId.equals(nodeId) : "Invalid response received for single-node mapped future " +
"[singleNodeId=" + singleNodeId + ", nodeId=" + nodeId + ", res=" + res + ']';
updateNear(singleReq, res);
if (res.error() != null)
onDone(res.failedKeys() != null ? addFailedKeys(res.failedKeys(), res.error()) : res.error());
else {
if (op == TRANSFORM) {
if (ret != null)
addInvokeResults(ret);
onDone(opRes);
}
else {
GridCacheReturn opRes0 = opRes = ret;
onDone(opRes0);
}
}
}
else {
GridNearAtomicUpdateRequest req = mappings.get(nodeId);
if (req != null) { // req can be null if onResult is being processed concurrently with onNodeLeft.
updateNear(req, res);
if (res.error() != null)
addFailedKeys(req.keys(), res.error());
else {
if (op == TRANSFORM) {
assert !req.fastMap();
if (ret != null)
addInvokeResults(ret);
}
else if (req.fastMap() && req.hasPrimary())
opRes = ret;
}
mappings.remove(nodeId);
}
checkComplete();
}
}
/**
* Updates near cache.
*
* @param req Update request.
* @param res Update response.
*/
private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
if (!nearEnabled || !req.hasPrimary())
return;
GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();
near.processNearAtomicUpdateResponse(req, res);
}
/**
* Maps future on ready topology.
*
* @param keys Keys to map.
* @param remap Boolean flag indicating if this is partial future remap.
* @param oldNodeId Old node ID if remap.
*/
private void mapOnTopology(final Collection<?> keys, final boolean remap, final UUID oldNodeId) {
cache.topology().readLock();
AffinityTopologyVersion topVer = null;
try {
if (cache.topology().stopping()) {
onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
cache.name()));
return;
}
GridDhtTopologyFuture fut = cctx.topologyVersionFuture();
if (fut.isDone()) {
if (!fut.isCacheTopologyValid(cctx)) {
onDone(new IgniteCheckedException("Failed to perform cache operation (cache topology is not valid): " +
cctx.name()));
return;
}
topVer = fut.topologyVersion();
}
else {
if (waitTopFut) {
fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
@Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
cctx.kernalContext().closure().runLocalSafe(new Runnable() {
@Override public void run() {
mapOnTopology(keys, remap, oldNodeId);
}
});
}
});
}
else
onDone(new GridCacheTryPutFailedException());
return;
}
}
finally {
cache.topology().readUnlock();
}
map0(topVer, keys, remap, oldNodeId);
}
/**
* Checks if future is ready to be completed.
*/
private void checkComplete() {
boolean remap = false;
synchronized (this) {
if (topVer != AffinityTopologyVersion.ZERO &&
((syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY) || mappings.isEmpty())) {
CachePartialUpdateCheckedException err0 = err;
if (err0 != null)
onDone(err0);
else {
if (fastMapRemap) {
assert cctx.kernalContext().clientNode();
remap = true;
}
else
onDone(opRes);
}
}
}
if (remap)
mapOnTopology(null, true, null);
}
/**
* @param topVer Topology version.
* @param remapKeys Keys to remap or {@code null} to map all keys.
* @param remap Flag indicating if this is partial remap for this future.
* @param oldNodeId Old node ID if was remap.
*/
private void map0(
AffinityTopologyVersion topVer,
@Nullable Collection<?> remapKeys,
boolean remap,
@Nullable UUID oldNodeId) {
assert oldNodeId == null || remap || fastMapRemap;
Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);
if (F.isEmpty(topNodes)) {
onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
"left the grid)."));
return;
}
if (futVer == null)
// Assign future version in topology read lock before first exception may be thrown.
futVer = cctx.versions().next(topVer);
if (!remap && (cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC))
cctx.mvcc().addAtomicFuture(version(), this);
CacheConfiguration ccfg = cctx.config();
// Assign version on near node in CLOCK ordering mode even if fastMap is false.
if (updVer == null)
updVer = ccfg.getAtomicWriteOrderMode() == CLOCK ? cctx.versions().next(topVer) : null;
if (updVer != null && log.isDebugEnabled())
log.debug("Assigned fast-map version for update on near node: " + updVer);
if (keys.size() == 1 && !fastMap && (single == null || single)) {
assert remapKeys == null || remapKeys.size() == 1 : remapKeys;
Object key = F.first(keys);
Object val;
GridCacheVersion conflictVer;
long conflictTtl;
long conflictExpireTime;
if (vals != null) {
// Regular PUT.
val = F.first(vals);
conflictVer = null;
conflictTtl = CU.TTL_NOT_CHANGED;
conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
}
else if (conflictPutVals != null) {
// Conflict PUT.
GridCacheDrInfo conflictPutVal = F.first(conflictPutVals);
val = conflictPutVal.value();
conflictVer = conflictPutVal.version();
conflictTtl = conflictPutVal.ttl();
conflictExpireTime = conflictPutVal.expireTime();
}
else if (conflictRmvVals != null) {
// Conflict REMOVE.
val = null;
conflictVer = F.first(conflictRmvVals);
conflictTtl = CU.TTL_NOT_CHANGED;
conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
}
else {
// Regular REMOVE.
val = null;
conflictVer = null;
conflictTtl = CU.TTL_NOT_CHANGED;
conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
}
// We still can get here if user pass map with single element.
if (key == null) {
NullPointerException err = new NullPointerException("Null key.");
onDone(err);
return;
}
if (val == null && op != GridCacheOperation.DELETE) {
NullPointerException err = new NullPointerException("Null value.");
onDone(err);
return;
}
KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
if (op != TRANSFORM)
val = cctx.toCacheObject(val);
ClusterNode primary = cctx.affinity().primary(cacheKey, topVer);
if (primary == null) {
onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
"left the grid)."));
return;
}
GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest(
cctx.cacheId(),
primary.id(),
futVer,
fastMap,
updVer,
topVer,
topLocked,
syncMode,
op,
retval,
expiryPlc,
invokeArgs,
filter,
subjId,
taskNameHash,
skipStore,
cctx.kernalContext().clientNode());
req.addUpdateEntry(cacheKey,
val,
conflictTtl,
conflictExpireTime,
conflictVer,
true);
synchronized (this) {
this.topVer = topVer;
single = true;
}
// Optimize mapping for single key.
mapSingle(primary.id(), req);
return;
}
Iterator<?> it = null;
if (vals != null)
it = vals.iterator();
Iterator<GridCacheDrInfo> conflictPutValsIt = null;
if (conflictPutVals != null)
conflictPutValsIt = conflictPutVals.iterator();
Iterator<GridCacheVersion> conflictRmvValsIt = null;
if (conflictRmvVals != null)
conflictRmvValsIt = conflictRmvVals.iterator();
Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = new HashMap<>(topNodes.size(), 1.0f);
// Must do this in synchronized block because we need to atomically remove and add mapping.
// Otherwise checkComplete() may see empty intermediate state.
synchronized (this) {
if (oldNodeId != null)
removeMapping(oldNodeId);
// For fastMap mode wait for all responses before remapping.
if (remap && fastMap && !mappings.isEmpty()) {
fastMapRemap = true;
return;
}
// Create mappings first, then send messages.
for (Object key : keys) {
if (key == null) {
NullPointerException err = new NullPointerException("Null key.");
onDone(err);
return;
}
Object val;
GridCacheVersion conflictVer;
long conflictTtl;
long conflictExpireTime;
if (vals != null) {
val = it.next();
conflictVer = null;
conflictTtl = CU.TTL_NOT_CHANGED;
conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
if (val == null) {
NullPointerException err = new NullPointerException("Null value.");
onDone(err);
return;
}
}
else if (conflictPutVals != null) {
GridCacheDrInfo conflictPutVal = conflictPutValsIt.next();
val = conflictPutVal.value();
conflictVer = conflictPutVal.version();
conflictTtl = conflictPutVal.ttl();
conflictExpireTime = conflictPutVal.expireTime();
}
else if (conflictRmvVals != null) {
val = null;
conflictVer = conflictRmvValsIt.next();
conflictTtl = CU.TTL_NOT_CHANGED;
conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
}
else {
val = null;
conflictVer = null;
conflictTtl = CU.TTL_NOT_CHANGED;
conflictExpireTime = CU.EXPIRE_TIME_CALCULATE;
}
if (val == null && op != GridCacheOperation.DELETE)
continue;
KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);
if (remapKeys != null && !remapKeys.contains(cacheKey))
continue;
if (op != TRANSFORM)
val = cctx.toCacheObject(val);
Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap);
if (affNodes.isEmpty()) {
onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
"(all partition nodes left the grid)."));
return;
}
int i = 0;
for (ClusterNode affNode : affNodes) {
if (affNode == null) {
onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
"(all partition nodes left the grid)."));
return;
}
UUID nodeId = affNode.id();
GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId);
if (mapped == null) {
mapped = new GridNearAtomicUpdateRequest(
cctx.cacheId(),
nodeId,
futVer,
fastMap,
updVer,
topVer,
topLocked,
syncMode,
op,
retval,
expiryPlc,
invokeArgs,
filter,
subjId,
taskNameHash,
skipStore,
cctx.kernalContext().clientNode());
pendingMappings.put(nodeId, mapped);
GridNearAtomicUpdateRequest old = mappings.put(nodeId, mapped);
assert old == null || (old != null && remap) :
"Invalid mapping state [old=" + old + ", remap=" + remap + ']';
}
mapped.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, i == 0);
i++;
}
}
this.topVer = topVer;
fastMapRemap = false;
}
if ((single == null || single) && pendingMappings.size() == 1) {
Map.Entry<UUID, GridNearAtomicUpdateRequest> entry = F.first(pendingMappings.entrySet());
single = true;
mapSingle(entry.getKey(), entry.getValue());
return;
}
else
single = false;
doUpdate(pendingMappings);
}
/**
* Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near
* node and send updates in parallel to all participating nodes.
*
* @param key Key to map.
* @param topVer Topology version to map.
* @param fastMap Flag indicating whether mapping is performed for fast-circuit update.
* @return Collection of nodes to which key is mapped.
*/
private Collection<ClusterNode> mapKey(
KeyCacheObject key,
AffinityTopologyVersion topVer,
boolean fastMap
) {
GridCacheAffinityManager affMgr = cctx.affinity();
// If we can send updates in parallel - do it.
return fastMap ?
cctx.topology().nodes(affMgr.partition(key), topVer) :
Collections.singletonList(affMgr.primary(key, topVer));
}
/**
* Maps future to single node.
*
* @param nodeId Node ID.
* @param req Request.
*/
private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) {
singleNodeId = nodeId;
singleReq = req;
if (cctx.localNodeId().equals(nodeId)) {
cache.updateAllAsyncInternal(nodeId, req,
new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
@Override public void apply(GridNearAtomicUpdateRequest req,
GridNearAtomicUpdateResponse res) {
assert res.futureVersion().equals(futVer);
onResult(res.nodeId(), res);
}
});
}
else {
try {
if (log.isDebugEnabled())
log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
if (syncMode == FULL_ASYNC && cctx.config().getAtomicWriteOrderMode() == PRIMARY)
onDone(new GridCacheReturn(cctx, true, null, true));
}
catch (IgniteCheckedException e) {
onDone(addFailedKeys(req.keys(), e));
}
}
}
/**
* Sends messages to remote nodes and updates local cache.
*
* @param mappings Mappings to send.
*/
private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) {
UUID locNodeId = cctx.localNodeId();
GridNearAtomicUpdateRequest locUpdate = null;
// Send messages to remote nodes first, then run local update.
for (GridNearAtomicUpdateRequest req : mappings.values()) {
if (locNodeId.equals(req.nodeId())) {
assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate +
", req=" + req + ']';
locUpdate = req;
}
else {
try {
if (log.isDebugEnabled())
log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']');
cctx.io().send(req.nodeId(), req, cctx.ioPolicy());
}
catch (IgniteCheckedException e) {
addFailedKeys(req.keys(), e);
removeMapping(req.nodeId());
}
if (syncMode == PRIMARY_SYNC && !req.hasPrimary())
removeMapping(req.nodeId());
}
}
if (syncMode == FULL_ASYNC)
// In FULL_ASYNC mode always return (null, true).
opRes = new GridCacheReturn(cctx, true, null, true);
if (locUpdate != null) {
cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate,
new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() {
@Override public void apply(GridNearAtomicUpdateRequest req,
GridNearAtomicUpdateResponse res) {
assert res.futureVersion().equals(futVer);
onResult(res.nodeId(), res);
}
});
}
checkComplete();
}
/**
* Removes mapping from future mappings map.
*
* @param nodeId Node ID to remove mapping for.
*/
private void removeMapping(UUID nodeId) {
mappings.remove(nodeId);
}
/**
* @param ret Result from single node.
*/
@SuppressWarnings("unchecked")
private synchronized void addInvokeResults(GridCacheReturn ret) {
assert op == TRANSFORM : op;
assert ret.value() == null || ret.value() instanceof Map : ret.value();
if (ret.value() != null) {
if (opRes != null)
opRes.mergeEntryProcessResults(ret);
else
opRes = ret;
}
}
/**
* @param failedKeys Failed keys.
* @param err Error cause.
* @return Root {@link org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException}.
*/
private synchronized IgniteCheckedException addFailedKeys(Collection<KeyCacheObject> failedKeys, Throwable err) {
CachePartialUpdateCheckedException err0 = this.err;
if (err0 == null)
err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");
Collection<Object> keys = new ArrayList<>(failedKeys.size());
for (KeyCacheObject key : failedKeys)
keys.add(key.value(cctx.cacheObjectContext(), false));
err0.add(keys, err);
return err0;
}
/** {@inheritDoc} */
public String toString() {
return S.toString(GridNearAtomicUpdateFuture.class, this, super.toString());
}
}