blob: b996ce4116a8d6400614f40830e49c099a2afb68 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed.dht.atomic;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import javax.cache.expiry.ExpiryPolicy;
import javax.cache.processor.EntryProcessor;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException;
import org.apache.ignite.internal.processors.cache.EntryProcessorResourceInjectorProxy;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheOperation;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedException;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK;
import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM;
/**
 * DHT atomic cache near update future for a single key.
 * <p>
 * The future converts its one key/value pair into a single {@link GridNearAtomicUpdateRequest}
 * addressed to the key's primary node (see {@link #mapSingleUpdate}), waits for the matching
 * {@link GridNearAtomicUpdateResponse} and either completes or remaps itself to a newer topology
 * version (see {@link #onResult}).
 * <p>
 * Mutable state, both the {@link #req} field and the fields inherited from the superclass
 * ({@code futVer}, {@code topVer}, {@code updVer}, {@code err}, {@code opRes}, ...), is read and
 * written under {@code synchronized (mux)} throughout this class.
 */
public class GridNearAtomicSingleUpdateFuture extends GridNearAtomicAbstractUpdateFuture {
    /** Key to update. Assigned once in the constructor. */
    private final Object key;

    /** Value to store or, for {@code TRANSFORM}, the entry processor. Assigned once in the constructor. */
    private final Object val;

    /** Not null if operation is mapped to single node and the response has not been processed yet. */
    private GridNearAtomicUpdateRequest req;

    /**
     * @param cctx Cache context.
     * @param cache Cache instance.
     * @param syncMode Write synchronization mode.
     * @param op Update operation.
     * @param key Key to update.
     * @param val Value or transform closure.
     * @param invokeArgs Optional arguments for entry processor.
     * @param retval Return value require flag.
     * @param rawRetval {@code True} if should return {@code GridCacheReturn} as future result.
     * @param expiryPlc Expiry policy explicitly specified for cache operation.
     * @param filter Entry filter.
     * @param subjId Subject ID.
     * @param taskNameHash Task name hash code.
     * @param skipStore Skip store flag.
     * @param keepBinary Keep binary flag.
     * @param remapCnt Maximum number of retries.
     * @param waitTopFut If {@code false} does not wait for affinity change future.
     */
    public GridNearAtomicSingleUpdateFuture(
        GridCacheContext cctx,
        GridDhtAtomicCache cache,
        CacheWriteSynchronizationMode syncMode,
        GridCacheOperation op,
        Object key,
        @Nullable Object val,
        @Nullable Object[] invokeArgs,
        final boolean retval,
        final boolean rawRetval,
        @Nullable ExpiryPolicy expiryPlc,
        final CacheEntryPredicate[] filter,
        UUID subjId,
        int taskNameHash,
        boolean skipStore,
        boolean keepBinary,
        int remapCnt,
        boolean waitTopFut
    ) {
        super(cctx, cache, syncMode, op, invokeArgs, retval, rawRetval, expiryPlc, filter, subjId, taskNameHash,
            skipStore, keepBinary, remapCnt, waitTopFut);

        assert subjId != null;

        this.key = key;
        this.val = val;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Reads the current future version under the mutex; it is {@code null} once the future has
     * been completed or is being remapped (see {@link #onFutureDone()} and {@link #onResult}).
     */
    @Override public GridCacheVersion version() {
        synchronized (mux) {
            return futVer;
        }
    }

    /**
     * {@inheritDoc}
     * <p>
     * If the pending request was sent to the node that left and no response has been recorded on
     * it, a failure response carrying a {@link ClusterTopologyCheckedException} for all request
     * keys is synthesised and passed to {@link #onResult}. This implementation always returns
     * {@code false}.
     */
    @Override public boolean onNodeLeft(UUID nodeId) {
        GridNearAtomicUpdateResponse res = null;

        GridNearAtomicUpdateRequest req;

        synchronized (mux) {
            // Only the request addressed to the departed node is of interest.
            req = this.req != null && this.req.nodeId().equals(nodeId) ? this.req : null;

            if (req != null && req.response() == null) {
                res = new GridNearAtomicUpdateResponse(cctx.cacheId(),
                    nodeId,
                    req.futureVersion(),
                    cctx.deploymentEnabled());

                ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " +
                    "before response is received: " + nodeId);

                // Lets the retry logic wait for the next affinity version after the request's one.
                e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion()));

                res.addFailedKeys(req.keys(), e);
            }
        }

        // Response is processed outside of the lock.
        if (res != null) {
            if (msgLog.isDebugEnabled()) {
                msgLog.debug("Near update single fut, node left [futId=" + req.futureVersion() +
                    ", writeVer=" + req.updateVersion() +
                    ", node=" + nodeId + ']');
            }

            onResult(nodeId, res, true);
        }

        return false;
    }

    /**
     * {@inheritDoc}
     * <p>
     * This implementation always returns {@code null}.
     */
    @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
        return null;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Converts the raw {@link GridCacheReturn} into the value exposed to the caller:
     * <ul>
     * <li>{@code null} result stays {@code null} (replaced by an empty map for {@code TRANSFORM});</li>
     * <li>the {@link GridCacheReturn} itself when {@code rawRetval} is set;</li>
     * <li>the (optionally binary-unwrapped) value when a return value was requested or the
     * operation is {@code TRANSFORM};</li>
     * <li>the success flag otherwise.</li>
     * </ul>
     * If this call is the one that completes the future, it is also removed from the MVCC
     * manager's atomic futures.
     */
    @SuppressWarnings("ConstantConditions")
    @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
        assert res == null || res instanceof GridCacheReturn;

        GridCacheReturn ret = (GridCacheReturn)res;

        Object retval =
            res == null ? null : rawRetval ? ret : (this.retval || op == TRANSFORM) ?
                cctx.unwrapBinaryIfNeeded(ret.value(), keepBinary) : ret.success();

        if (op == TRANSFORM && retval == null)
            retval = Collections.emptyMap();

        if (super.onDone(retval, err)) {
            GridCacheVersion futVer = onFutureDone();

            if (futVer != null)
                cctx.mvcc().removeAtomicFuture(futVer);

            return true;
        }

        return false;
    }

    /**
     * {@inheritDoc}
     * <p>
     * Processes the response for the single pending request. Responses for another future
     * version, from another node, or arriving after the request has already been consumed
     * (for example a real response racing with the one synthesised by {@link #onNodeLeft})
     * are ignored.
     * <p>
     * Depending on the response the future is either completed (with the accumulated result
     * or error) or remapped to a newer topology version.
     */
    @SuppressWarnings({"unchecked", "ThrowableResultOfMethodCallIgnored"})
    @Override public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) {
        GridNearAtomicUpdateRequest req;

        AffinityTopologyVersion remapTopVer = null;

        GridCacheReturn opRes0 = null;
        CachePartialUpdateCheckedException err0 = null;

        GridFutureAdapter<?> fut0 = null;

        synchronized (mux) {
            if (!res.futureVersion().equals(futVer))
                return;

            // The request is nulled below on the first accepted response while futVer may still be
            // set until onDone() runs outside of the lock, so a duplicate response can get here
            // with no pending request. Treat it as already processed instead of failing with NPE.
            if (this.req == null || !this.req.nodeId().equals(nodeId))
                return;

            req = this.req;

            this.req = null;

            boolean remapKey = !F.isEmpty(res.remapKeys());

            if (remapKey) {
                // Remember the highest topology version on which mapping turned out to be wrong.
                if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0)
                    mapErrTopVer = req.topologyVersion();
            }
            else if (res.error() != null) {
                // Errors without failed keys are handled after the synchronized block.
                if (res.failedKeys() != null) {
                    if (err == null)
                        err = new CachePartialUpdateCheckedException(
                            "Failed to update keys (retry update if possible).");

                    Collection<Object> keys = new ArrayList<>(res.failedKeys().size());

                    for (KeyCacheObject failedKey : res.failedKeys())
                        keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(failedKey, keepBinary, false));

                    err.add(keys, res.error(), req.topologyVersion());
                }
            }
            else {
                if (!req.fastMap() || req.hasPrimary()) {
                    GridCacheReturn ret = res.returnValue();

                    if (op == TRANSFORM) {
                        if (ret != null) {
                            assert ret.value() == null || ret.value() instanceof Map : ret.value();

                            if (ret.value() != null) {
                                if (opRes != null)
                                    opRes.mergeEntryProcessResults(ret);
                                else
                                    opRes = ret;
                            }
                        }
                    }
                    else
                        opRes = ret;
                }
            }

            if (remapKey) {
                assert mapErrTopVer != null;

                remapTopVer = cctx.shared().exchange().topologyVersion();
            }
            else {
                // Retry on topology errors while retries are left. Note that storeFuture() and
                // the remapCnt decrement are only evaluated when the preceding checks pass.
                if (err != null &&
                    X.hasCause(err, CachePartialUpdateCheckedException.class) &&
                    X.hasCause(err, ClusterTopologyCheckedException.class) &&
                    storeFuture() &&
                    --remapCnt > 0) {
                    ClusterTopologyCheckedException topErr =
                        X.cause(err, ClusterTopologyCheckedException.class);

                    // No point in remapping if there are no server nodes at all.
                    if (!(topErr instanceof ClusterTopologyServerNotFoundException)) {
                        CachePartialUpdateCheckedException cause =
                            X.cause(err, CachePartialUpdateCheckedException.class);

                        assert cause != null && cause.topologyVersion() != null : err;

                        remapTopVer =
                            new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1);

                        err = null;

                        updVer = null;
                    }
                }
            }

            if (remapTopVer == null) {
                // No remap: snapshot the outcome to complete the future outside of the lock.
                err0 = err;
                opRes0 = opRes;
            }
            else {
                // Remap: reset per-attempt state; the future is re-registered by mapOnTopology().
                fut0 = topCompleteFut;

                topCompleteFut = null;

                cctx.mvcc().removeAtomicFuture(futVer);

                futVer = null;
                topVer = AffinityTopologyVersion.ZERO;
            }
        }

        // An error that is not attributed to particular keys fails the whole operation.
        if (res.error() != null && res.failedKeys() == null) {
            onDone(res.error());

            return;
        }

        if (nearEnabled && !nodeErr)
            updateNear(req, res);

        if (remapTopVer != null) {
            if (fut0 != null)
                fut0.onDone();

            if (!waitTopFut) {
                onDone(new GridCacheTryPutFailedException());

                return;
            }

            if (topLocked) {
                CachePartialUpdateCheckedException e =
                    new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible).");

                ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException(
                    "Failed to update keys, topology changed while execute atomic update inside transaction.");

                cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer));

                e.add(Collections.singleton(cctx.toCacheKeyObject(key)), cause);

                onDone(e);

                return;
            }

            IgniteInternalFuture<AffinityTopologyVersion> fut =
                cctx.shared().exchange().affinityReadyFuture(remapTopVer);

            // Null means there is nothing to wait for, so proceed with the target version right away.
            if (fut == null)
                fut = new GridFinishedFuture<>(remapTopVer);

            fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) {
                    cctx.kernalContext().closure().runLocalSafe(new Runnable() {
                        @Override public void run() {
                            mapOnTopology();
                        }
                    });
                }
            });

            return;
        }

        onDone(opRes0, err0);
    }

    /**
     * Updates near cache.
     * <p>
     * Does nothing if the response contains remap keys or the request has no primary.
     *
     * @param req Update request.
     * @param res Update response.
     */
    private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) {
        assert nearEnabled;

        if (res.remapKeys() != null || !req.hasPrimary())
            return;

        GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near();

        near.processNearAtomicUpdateResponse(req, res);
    }

    /**
     * {@inheritDoc}
     * <p>
     * Under the topology read lock: fails the future if the cache is stopping or fails
     * validation; if the topology future is done, registers this future for the current
     * topology version; otherwise either re-runs itself once the topology future completes
     * (when {@code waitTopFut} is set) or fails with {@link GridCacheTryPutFailedException}.
     * The actual mapping is performed after the lock is released.
     */
    @Override protected void mapOnTopology() {
        cache.topology().readLock();

        AffinityTopologyVersion topVer;

        GridCacheVersion futVer;

        try {
            if (cache.topology().stopping()) {
                onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " +
                    cache.name()));

                return;
            }

            GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture();

            if (fut.isDone()) {
                Throwable err = fut.validateCache(cctx);

                if (err != null) {
                    onDone(err);

                    return;
                }

                topVer = fut.topologyVersion();

                futVer = addAtomicFuture(topVer);
            }
            else {
                if (waitTopFut) {
                    assert !topLocked : this;

                    fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
                        @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) {
                            cctx.kernalContext().closure().runLocalSafe(new Runnable() {
                                @Override public void run() {
                                    mapOnTopology();
                                }
                            });
                        }
                    });
                }
                else
                    onDone(new GridCacheTryPutFailedException());

                return;
            }
        }
        finally {
            cache.topology().readUnlock();
        }

        // A null future version means there is nothing to map.
        if (futVer != null)
            map(topVer, futVer);
    }

    /**
     * {@inheritDoc}
     * <p>
     * Fails the future if there are no affinity nodes for the topology version. Otherwise
     * builds the single update request, publishes it (together with the update version) under
     * the mutex and sends it to the request's target node. Any exception thrown while
     * building the request completes the future with that exception.
     */
    @Override protected void map(AffinityTopologyVersion topVer, GridCacheVersion futVer) {
        Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer);

        if (F.isEmpty(topNodes)) {
            onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
                "left the grid)."));

            return;
        }

        GridCacheVersion updVer;

        // Assign version on near node in CLOCK ordering mode even if fastMap is false.
        if (cctx.config().getAtomicWriteOrderMode() == CLOCK) {
            updVer = this.updVer;

            if (updVer == null) {
                updVer = cctx.versions().next(topVer);

                if (log.isDebugEnabled())
                    log.debug("Assigned fast-map version for update on near node: " + updVer);
            }
        }
        else
            updVer = null;

        Exception err = null;

        GridNearAtomicUpdateRequest singleReq0 = null;

        try {
            singleReq0 = mapSingleUpdate(topVer, futVer, updVer);

            synchronized (mux) {
                assert this.futVer == futVer || (this.isDone() && this.error() != null);
                assert this.topVer == topVer;

                this.updVer = updVer;

                resCnt = 0;

                req = singleReq0;
            }
        }
        catch (Exception e) {
            err = e;
        }

        if (err != null) {
            onDone(err);

            return;
        }

        // Optimize mapping for single key.
        mapSingle(singleReq0.nodeId(), singleReq0);
    }

    /**
     * Clears the future version and the topology completion future under the mutex, then
     * completes the latter (if any) outside of the lock.
     *
     * @return Future version that was set before the call, possibly {@code null}.
     */
    private GridCacheVersion onFutureDone() {
        GridCacheVersion ver0;

        GridFutureAdapter<Void> fut0;

        synchronized (mux) {
            fut0 = topCompleteFut;

            topCompleteFut = null;

            ver0 = futVer;

            futVer = null;
        }

        if (fut0 != null)
            fut0.onDone();

        return ver0;
    }

    /**
     * Builds the update request for the single key/value pair of this future, addressed to the
     * key's primary node for the given topology version.
     *
     * @param topVer Topology version.
     * @param futVer Future version.
     * @param updVer Update version.
     * @return Request.
     * @throws Exception If failed: {@link NullPointerException} for a null key or a null value
     *      with a non-{@code DELETE} operation, {@link ClusterTopologyServerNotFoundException}
     *      if the key has no primary node.
     */
    private GridNearAtomicUpdateRequest mapSingleUpdate(AffinityTopologyVersion topVer,
        GridCacheVersion futVer,
        @Nullable GridCacheVersion updVer) throws Exception {
        if (key == null)
            throw new NullPointerException("Null key.");

        Object val = this.val;

        if (val == null && op != GridCacheOperation.DELETE)
            throw new NullPointerException("Null value.");

        KeyCacheObject cacheKey = cctx.toCacheKeyObject(key);

        // For TRANSFORM the "value" is the entry processor, which is wrapped rather than converted.
        if (op != TRANSFORM)
            val = cctx.toCacheObject(val);
        else
            val = EntryProcessorResourceInjectorProxy.wrap(cctx.kernalContext(), (EntryProcessor)val);

        ClusterNode primary = cctx.affinity().primary(cacheKey, topVer);

        if (primary == null)
            throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " +
                "left the grid).");

        // NOTE(review): the literal 'false' is presumably the fast-map flag and the trailing '1'
        // the expected number of entries - confirm against the GridNearAtomicUpdateRequest constructor.
        GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest(
            cctx.cacheId(),
            primary.id(),
            futVer,
            false,
            updVer,
            topVer,
            topLocked,
            syncMode,
            op,
            retval,
            expiryPlc,
            invokeArgs,
            filter,
            subjId,
            taskNameHash,
            skipStore,
            keepBinary,
            cctx.kernalContext().clientNode(),
            cctx.deploymentEnabled(),
            1);

        // NOTE(review): the trailing 'null, true' presumably mean "no conflict version" and
        // "entry is for the primary node" - confirm against addUpdateEntry's signature.
        req.addUpdateEntry(cacheKey,
            val,
            CU.TTL_NOT_CHANGED,
            CU.EXPIRE_TIME_CALCULATE,
            null,
            true);

        return req;
    }

    /** {@inheritDoc} */
    @Override public String toString() {
        synchronized (mux) {
            return S.toString(GridNearAtomicSingleUpdateFuture.class, this, super.toString());
        }
    }
}