| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.Map; |
| import java.util.UUID; |
| import java.util.concurrent.atomic.AtomicReference; |
| import javax.cache.expiry.ExpiryPolicy; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.cache.CacheWriteSynchronizationMode; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; |
| import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; |
| import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; |
| import org.apache.ignite.internal.processors.cache.GridCacheAffinityManager; |
| import org.apache.ignite.internal.processors.cache.GridCacheAtomicFuture; |
| import org.apache.ignite.internal.processors.cache.GridCacheContext; |
| import org.apache.ignite.internal.processors.cache.GridCacheMvccManager; |
| import org.apache.ignite.internal.processors.cache.GridCacheOperation; |
| import org.apache.ignite.internal.processors.cache.GridCacheReturn; |
| import org.apache.ignite.internal.processors.cache.GridCacheTryPutFailedException; |
| import org.apache.ignite.internal.processors.cache.KeyCacheObject; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache; |
| import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; |
| import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; |
| import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; |
| import org.apache.ignite.internal.util.future.GridFutureAdapter; |
| import org.apache.ignite.internal.util.tostring.GridToStringInclude; |
| import org.apache.ignite.internal.util.typedef.CI1; |
| import org.apache.ignite.internal.util.typedef.CI2; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.X; |
| import org.apache.ignite.internal.util.typedef.internal.CU; |
| import org.apache.ignite.internal.util.typedef.internal.S; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteUuid; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK; |
| import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; |
| import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; |
| import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; |
| import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; |
| |
| /** |
| * DHT atomic cache near update future. |
| */ |
| public class GridNearAtomicUpdateFuture extends GridFutureAdapter<Object> |
| implements GridCacheAtomicFuture<Object>{ |
| /** Logger reference. */ |
| private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); |
| |
| /** Logger. */ |
| protected static IgniteLogger log; |
| |
| /** Cache context. */ |
| private final GridCacheContext cctx; |
| |
| /** Cache. */ |
| private GridDhtAtomicCache cache; |
| |
| /** Update operation. */ |
| private final GridCacheOperation op; |
| |
| /** Keys */ |
| private Collection<?> keys; |
| |
| /** Values. */ |
| @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) |
| private Collection<?> vals; |
| |
| /** Optional arguments for entry processor. */ |
| private Object[] invokeArgs; |
| |
| /** Conflict put values. */ |
| @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) |
| private Collection<GridCacheDrInfo> conflictPutVals; |
| |
| /** Conflict remove values. */ |
| @SuppressWarnings({"FieldAccessedSynchronizedAndUnsynchronized"}) |
| private Collection<GridCacheVersion> conflictRmvVals; |
| |
| /** Return value require flag. */ |
| private final boolean retval; |
| |
| /** Expiry policy. */ |
| private final ExpiryPolicy expiryPlc; |
| |
| /** Optional filter. */ |
| private final CacheEntryPredicate[] filter; |
| |
| /** Write synchronization mode. */ |
| private final CacheWriteSynchronizationMode syncMode; |
| |
| /** Raw return value flag. */ |
| private final boolean rawRetval; |
| |
| /** Fast map flag. */ |
| private final boolean fastMap; |
| |
| /** Near cache flag. */ |
| private final boolean nearEnabled; |
| |
| /** Subject ID. */ |
| private final UUID subjId; |
| |
| /** Task name hash. */ |
| private final int taskNameHash; |
| |
| /** Topology locked flag. Set if atomic update is performed inside a TX or explicit lock. */ |
| private boolean topLocked; |
| |
| /** Skip store flag. */ |
| private final boolean skipStore; |
| |
| /** */ |
| private final boolean keepBinary; |
| |
| /** Wait for topology future flag. */ |
| private final boolean waitTopFut; |
| |
| /** Remap count. */ |
| private int remapCnt; |
| |
| /** State. */ |
| private final UpdateState state; |
| |
| /** |
| * @param cctx Cache context. |
| * @param cache Cache instance. |
| * @param syncMode Write synchronization mode. |
| * @param op Update operation. |
| * @param keys Keys to update. |
| * @param vals Values or transform closure. |
| * @param invokeArgs Optional arguments for entry processor. |
| * @param conflictPutVals Conflict put values (optional). |
| * @param conflictRmvVals Conflict remove values (optional). |
| * @param retval Return value require flag. |
| * @param rawRetval {@code True} if should return {@code GridCacheReturn} as future result. |
| * @param expiryPlc Expiry policy explicitly specified for cache operation. |
| * @param filter Entry filter. |
| * @param subjId Subject ID. |
| * @param taskNameHash Task name hash code. |
| * @param skipStore Skip store flag. |
| * @param keepBinary Keep binary flag. |
| * @param remapCnt Maximum number of retries. |
| * @param waitTopFut If {@code false} does not wait for affinity change future. |
| */ |
| public GridNearAtomicUpdateFuture( |
| GridCacheContext cctx, |
| GridDhtAtomicCache cache, |
| CacheWriteSynchronizationMode syncMode, |
| GridCacheOperation op, |
| Collection<?> keys, |
| @Nullable Collection<?> vals, |
| @Nullable Object[] invokeArgs, |
| @Nullable Collection<GridCacheDrInfo> conflictPutVals, |
| @Nullable Collection<GridCacheVersion> conflictRmvVals, |
| final boolean retval, |
| final boolean rawRetval, |
| @Nullable ExpiryPolicy expiryPlc, |
| final CacheEntryPredicate[] filter, |
| UUID subjId, |
| int taskNameHash, |
| boolean skipStore, |
| boolean keepBinary, |
| int remapCnt, |
| boolean waitTopFut |
| ) { |
| this.rawRetval = rawRetval; |
| |
| assert vals == null || vals.size() == keys.size(); |
| assert conflictPutVals == null || conflictPutVals.size() == keys.size(); |
| assert conflictRmvVals == null || conflictRmvVals.size() == keys.size(); |
| assert subjId != null; |
| |
| this.cctx = cctx; |
| this.cache = cache; |
| this.syncMode = syncMode; |
| this.op = op; |
| this.keys = keys; |
| this.vals = vals; |
| this.invokeArgs = invokeArgs; |
| this.conflictPutVals = conflictPutVals; |
| this.conflictRmvVals = conflictRmvVals; |
| this.retval = retval; |
| this.expiryPlc = expiryPlc; |
| this.filter = filter; |
| this.subjId = subjId; |
| this.taskNameHash = taskNameHash; |
| this.skipStore = skipStore; |
| this.keepBinary = keepBinary; |
| this.waitTopFut = waitTopFut; |
| |
| if (log == null) |
| log = U.logger(cctx.kernalContext(), logRef, GridFutureAdapter.class); |
| |
| fastMap = F.isEmpty(filter) && op != TRANSFORM && cctx.config().getWriteSynchronizationMode() == FULL_SYNC && |
| cctx.config().getAtomicWriteOrderMode() == CLOCK && |
| !(cctx.writeThrough() && cctx.config().getInterceptor() != null); |
| |
| nearEnabled = CU.isNearEnabled(cctx); |
| |
| if (!waitTopFut) |
| remapCnt = 1; |
| |
| this.remapCnt = remapCnt; |
| |
| state = new UpdateState(); |
| } |
| |
    /** {@inheritDoc} */
    @Override public IgniteUuid futureId() {
        // Atomic near update futures are identified by their version, not by an IgniteUuid.
        throw new UnsupportedOperationException();
    }

    /** {@inheritDoc} */
    @Override public GridCacheVersion version() {
        return state.futureVersion();
    }

    /**
     * @return {@code True} if this future should block partition map exchange.
     */
    private boolean waitForPartitionExchange() {
        // Wait fast-map near atomic update futures in CLOCK mode.
        return fastMap;
    }

    /** {@inheritDoc} */
    @Override public Collection<?> keys() {
        return keys;
    }

    /** {@inheritDoc} */
    @Override public boolean onNodeLeft(UUID nodeId) {
        state.onNodeLeft(nodeId);

        // Completion (if any) is driven by the artificial failure response generated above.
        return false;
    }

    /** {@inheritDoc} */
    @Override public boolean trackable() {
        return true;
    }

    /** {@inheritDoc} */
    @Override public void markNotTrackable() {
        // No-op.
    }
| |
| /** |
| * Performs future mapping. |
| */ |
| public void map() { |
| AffinityTopologyVersion topVer = cctx.shared().lockedTopologyVersion(null); |
| |
| if (topVer == null) |
| mapOnTopology(); |
| else { |
| topLocked = true; |
| |
| // Cannot remap. |
| remapCnt = 1; |
| |
| state.map(topVer); |
| } |
| } |
| |
    /** {@inheritDoc} */
    @Override public IgniteInternalFuture<Void> completeFuture(AffinityTopologyVersion topVer) {
        // Only fast-map futures block partition map exchange.
        if (waitForPartitionExchange()) {
            GridFutureAdapter<Void> fut = state.completeFuture(topVer);

            if (fut != null && isDone()) {
                // This future finished concurrently: release the exchange immediately.
                fut.onDone();

                return null;
            }

            return fut;
        }

        return null;
    }

    /** {@inheritDoc} */
    @SuppressWarnings("ConstantConditions")
    @Override public boolean onDone(@Nullable Object res, @Nullable Throwable err) {
        assert res == null || res instanceof GridCacheReturn;

        GridCacheReturn ret = (GridCacheReturn)res;

        // Unwrap the raw cache return into what the caller asked for: the whole
        // GridCacheReturn (rawRetval), the (possibly unwrapped) value, or a success flag.
        Object retval =
            res == null ? null : rawRetval ? ret : (this.retval || op == TRANSFORM) ?
                cctx.unwrapBinaryIfNeeded(ret.value(), keepBinary) : ret.success();

        // Invoke results are always reported as a (possibly empty) map.
        if (op == TRANSFORM && retval == null)
            retval = Collections.emptyMap();

        if (super.onDone(retval, err)) {
            // First (and only) completion: deregister from the MVCC manager.
            GridCacheVersion futVer = state.onFutureDone();

            if (futVer != null)
                cctx.mvcc().removeAtomicFuture(futVer);

            return true;
        }

        return false;
    }

    /**
     * Response callback.
     *
     * @param nodeId Node ID.
     * @param res Update response.
     */
    public void onResult(UUID nodeId, GridNearAtomicUpdateResponse res) {
        state.onResult(nodeId, res, false);
    }
| |
| /** |
| * Updates near cache. |
| * |
| * @param req Update request. |
| * @param res Update response. |
| */ |
| private void updateNear(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { |
| assert nearEnabled; |
| |
| if (res.remapKeys() != null || !req.hasPrimary()) |
| return; |
| |
| GridNearAtomicCache near = (GridNearAtomicCache)cctx.dht().near(); |
| |
| near.processNearAtomicUpdateResponse(req, res); |
| } |
| |
| /** |
| * Maps future on ready topology. |
| */ |
| private void mapOnTopology() { |
| cache.topology().readLock(); |
| |
| AffinityTopologyVersion topVer = null; |
| |
| try { |
| if (cache.topology().stopping()) { |
| onDone(new IgniteCheckedException("Failed to perform cache operation (cache is stopped): " + |
| cache.name())); |
| |
| return; |
| } |
| |
| GridDhtTopologyFuture fut = cache.topology().topologyVersionFuture(); |
| |
| if (fut.isDone()) { |
| Throwable err = fut.validateCache(cctx); |
| |
| if (err != null) { |
| onDone(err); |
| |
| return; |
| } |
| |
| topVer = fut.topologyVersion(); |
| } |
| else { |
| if (waitTopFut) { |
| assert !topLocked : this; |
| |
| fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { |
| @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> t) { |
| cctx.kernalContext().closure().runLocalSafe(new Runnable() { |
| @Override public void run() { |
| mapOnTopology(); |
| } |
| }); |
| } |
| }); |
| } |
| else |
| onDone(new GridCacheTryPutFailedException()); |
| |
| return; |
| } |
| } |
| finally { |
| cache.topology().readUnlock(); |
| } |
| |
| state.map(topVer); |
| } |
| |
| /** |
| * @return {@code True} future is stored by {@link GridCacheMvccManager#addAtomicFuture}. |
| */ |
| private boolean storeFuture() { |
| return cctx.config().getAtomicWriteOrderMode() == CLOCK || syncMode != FULL_ASYNC; |
| } |
| |
| /** |
| * Maps key to nodes. If filters are absent and operation is not TRANSFORM, then we can assign version on near |
| * node and send updates in parallel to all participating nodes. |
| * |
| * @param key Key to map. |
| * @param topVer Topology version to map. |
| * @param fastMap Flag indicating whether mapping is performed for fast-circuit update. |
| * @return Collection of nodes to which key is mapped. |
| */ |
| private Collection<ClusterNode> mapKey( |
| KeyCacheObject key, |
| AffinityTopologyVersion topVer, |
| boolean fastMap |
| ) { |
| GridCacheAffinityManager affMgr = cctx.affinity(); |
| |
| // If we can send updates in parallel - do it. |
| return fastMap ? |
| cctx.topology().nodes(affMgr.partition(key), topVer) : |
| Collections.singletonList(affMgr.primary(key, topVer)); |
| } |
| |
| /** |
| * Maps future to single node. |
| * |
| * @param nodeId Node ID. |
| * @param req Request. |
| */ |
| private void mapSingle(UUID nodeId, GridNearAtomicUpdateRequest req) { |
| if (cctx.localNodeId().equals(nodeId)) { |
| cache.updateAllAsyncInternal(nodeId, req, |
| new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { |
| @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { |
| onResult(res.nodeId(), res); |
| } |
| }); |
| } |
| else { |
| try { |
| if (log.isDebugEnabled()) |
| log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); |
| |
| cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); |
| |
| if (syncMode == FULL_ASYNC) |
| onDone(new GridCacheReturn(cctx, true, true, null, true)); |
| } |
| catch (IgniteCheckedException e) { |
| state.onSendError(req, e); |
| } |
| } |
| } |
| |
| /** |
| * Sends messages to remote nodes and updates local cache. |
| * |
| * @param mappings Mappings to send. |
| */ |
| private void doUpdate(Map<UUID, GridNearAtomicUpdateRequest> mappings) { |
| UUID locNodeId = cctx.localNodeId(); |
| |
| GridNearAtomicUpdateRequest locUpdate = null; |
| |
| // Send messages to remote nodes first, then run local update. |
| for (GridNearAtomicUpdateRequest req : mappings.values()) { |
| if (locNodeId.equals(req.nodeId())) { |
| assert locUpdate == null : "Cannot have more than one local mapping [locUpdate=" + locUpdate + |
| ", req=" + req + ']'; |
| |
| locUpdate = req; |
| } |
| else { |
| try { |
| if (log.isDebugEnabled()) |
| log.debug("Sending near atomic update request [nodeId=" + req.nodeId() + ", req=" + req + ']'); |
| |
| cctx.io().send(req.nodeId(), req, cctx.ioPolicy()); |
| } |
| catch (IgniteCheckedException e) { |
| state.onSendError(req, e); |
| } |
| } |
| } |
| |
| if (locUpdate != null) { |
| cache.updateAllAsyncInternal(cctx.localNodeId(), locUpdate, |
| new CI2<GridNearAtomicUpdateRequest, GridNearAtomicUpdateResponse>() { |
| @Override public void apply(GridNearAtomicUpdateRequest req, GridNearAtomicUpdateResponse res) { |
| onResult(res.nodeId(), res); |
| } |
| }); |
| } |
| |
| if (syncMode == FULL_ASYNC) |
| onDone(new GridCacheReturn(cctx, true, true, null, true)); |
| } |
| |
| /** |
| * |
| */ |
| private class UpdateState { |
| /** Current topology version. */ |
| private AffinityTopologyVersion topVer = AffinityTopologyVersion.ZERO; |
| |
| /** */ |
| private GridCacheVersion updVer; |
| |
| /** Topology version when got mapping error. */ |
| private AffinityTopologyVersion mapErrTopVer; |
| |
| /** Mappings if operations is mapped to more than one node. */ |
| @GridToStringInclude |
| private Map<UUID, GridNearAtomicUpdateRequest> mappings; |
| |
| /** */ |
| private int resCnt; |
| |
| /** Error. */ |
| private CachePartialUpdateCheckedException err; |
| |
| /** Future ID. */ |
| private GridCacheVersion futVer; |
| |
| /** Completion future for a particular topology version. */ |
| private GridFutureAdapter<Void> topCompleteFut; |
| |
| /** Keys to remap. */ |
| private Collection<KeyCacheObject> remapKeys; |
| |
| /** Not null is operation is mapped to single node. */ |
| private GridNearAtomicUpdateRequest singleReq; |
| |
| /** Operation result. */ |
| private GridCacheReturn opRes; |
| |
| /** |
| * @return Future version. |
| */ |
| @Nullable synchronized GridCacheVersion futureVersion() { |
| return futVer; |
| } |
| |
| /** |
| * @param nodeId Left node ID. |
| */ |
| void onNodeLeft(UUID nodeId) { |
| GridNearAtomicUpdateResponse res = null; |
| |
| synchronized (this) { |
| GridNearAtomicUpdateRequest req; |
| |
| if (singleReq != null) |
| req = singleReq.nodeId().equals(nodeId) ? singleReq : null; |
| else |
| req = mappings != null ? mappings.get(nodeId) : null; |
| |
| if (req != null && req.response() == null) { |
| res = new GridNearAtomicUpdateResponse(cctx.cacheId(), nodeId, req.futureVersion(), |
| cctx.deploymentEnabled()); |
| |
| ClusterTopologyCheckedException e = new ClusterTopologyCheckedException("Primary node left grid " + |
| "before response is received: " + nodeId); |
| |
| e.retryReadyFuture(cctx.shared().nextAffinityReadyFuture(req.topologyVersion())); |
| |
| res.addFailedKeys(req.keys(), e); |
| } |
| } |
| |
| if (res != null) |
| onResult(nodeId, res, true); |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param res Response. |
| * @param nodeErr {@code True} if response was created on node failure. |
| */ |
| void onResult(UUID nodeId, GridNearAtomicUpdateResponse res, boolean nodeErr) { |
| GridNearAtomicUpdateRequest req; |
| |
| AffinityTopologyVersion remapTopVer = null; |
| |
| GridCacheReturn opRes0 = null; |
| CachePartialUpdateCheckedException err0 = null; |
| |
| boolean rcvAll; |
| |
| GridFutureAdapter<?> fut0 = null; |
| |
| synchronized (this) { |
| if (!res.futureVersion().equals(futVer)) |
| return; |
| |
| if (singleReq != null) { |
| if (!singleReq.nodeId().equals(nodeId)) |
| return; |
| |
| req = singleReq; |
| |
| singleReq = null; |
| |
| rcvAll = true; |
| } |
| else { |
| req = mappings != null ? mappings.get(nodeId) : null; |
| |
| if (req != null && req.onResponse(res)) { |
| resCnt++; |
| |
| rcvAll = mappings.size() == resCnt; |
| } |
| else |
| return; |
| } |
| |
| assert req != null && req.topologyVersion().equals(topVer) : req; |
| |
| if (res.remapKeys() != null) { |
| assert !fastMap || cctx.kernalContext().clientNode(); |
| |
| if (remapKeys == null) |
| remapKeys = U.newHashSet(res.remapKeys().size()); |
| |
| remapKeys.addAll(res.remapKeys()); |
| |
| if (mapErrTopVer == null || mapErrTopVer.compareTo(req.topologyVersion()) < 0) |
| mapErrTopVer = req.topologyVersion(); |
| } |
| else if (res.error() != null) { |
| if (res.failedKeys() != null) |
| addFailedKeys(res.failedKeys(), req.topologyVersion(), res.error()); |
| } |
| else { |
| if (!req.fastMap() || req.hasPrimary()) { |
| GridCacheReturn ret = res.returnValue(); |
| |
| if (op == TRANSFORM) { |
| if (ret != null) |
| addInvokeResults(ret); |
| } |
| else |
| opRes = ret; |
| } |
| } |
| |
| if (rcvAll) { |
| if (remapKeys != null) { |
| assert mapErrTopVer != null; |
| |
| remapTopVer = new AffinityTopologyVersion(mapErrTopVer.topologyVersion() + 1); |
| } |
| else { |
| if (err != null && |
| X.hasCause(err, CachePartialUpdateCheckedException.class) && |
| X.hasCause(err, ClusterTopologyCheckedException.class) && |
| storeFuture() && |
| --remapCnt > 0) { |
| ClusterTopologyCheckedException topErr = |
| X.cause(err, ClusterTopologyCheckedException.class); |
| |
| if (!(topErr instanceof ClusterTopologyServerNotFoundException)) { |
| CachePartialUpdateCheckedException cause = |
| X.cause(err, CachePartialUpdateCheckedException.class); |
| |
| assert cause != null && cause.topologyVersion() != null : err; |
| |
| remapTopVer = |
| new AffinityTopologyVersion(cause.topologyVersion().topologyVersion() + 1); |
| |
| err = null; |
| |
| Collection<Object> failedKeys = cause.failedKeys(); |
| |
| remapKeys = new ArrayList<>(failedKeys.size()); |
| |
| for (Object key : failedKeys) |
| remapKeys.add(cctx.toCacheKeyObject(key)); |
| |
| updVer = null; |
| } |
| } |
| } |
| |
| if (remapTopVer == null) { |
| err0 = err; |
| opRes0 = opRes; |
| } |
| else { |
| fut0 = topCompleteFut; |
| |
| topCompleteFut = null; |
| |
| cctx.mvcc().removeAtomicFuture(futVer); |
| |
| futVer = null; |
| topVer = AffinityTopologyVersion.ZERO; |
| } |
| } |
| } |
| |
| if (res.error() != null && res.failedKeys() == null) { |
| onDone(res.error()); |
| |
| return; |
| } |
| |
| if (rcvAll && nearEnabled) { |
| if (mappings != null) { |
| for (GridNearAtomicUpdateRequest req0 : mappings.values()) { |
| GridNearAtomicUpdateResponse res0 = req0.response(); |
| |
| assert res0 != null : req0; |
| |
| updateNear(req0, res0); |
| } |
| } |
| else if (!nodeErr) |
| updateNear(req, res); |
| } |
| |
| if (remapTopVer != null) { |
| if (fut0 != null) |
| fut0.onDone(); |
| |
| if (!waitTopFut) { |
| onDone(new GridCacheTryPutFailedException()); |
| |
| return; |
| } |
| |
| if (topLocked) { |
| assert !F.isEmpty(remapKeys) : remapKeys; |
| |
| CachePartialUpdateCheckedException e = |
| new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible)."); |
| |
| ClusterTopologyCheckedException cause = new ClusterTopologyCheckedException( |
| "Failed to update keys, topology changed while execute atomic update inside transaction."); |
| |
| cause.retryReadyFuture(cctx.affinity().affinityReadyFuture(remapTopVer)); |
| |
| e.add(remapKeys, cause); |
| |
| onDone(e); |
| |
| return; |
| } |
| |
| IgniteInternalFuture<AffinityTopologyVersion> fut = cctx.affinity().affinityReadyFuture(remapTopVer); |
| |
| fut.listen(new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() { |
| @Override public void apply(final IgniteInternalFuture<AffinityTopologyVersion> fut) { |
| cctx.kernalContext().closure().runLocalSafe(new Runnable() { |
| @Override public void run() { |
| try { |
| AffinityTopologyVersion topVer = fut.get(); |
| |
| map(topVer); |
| } |
| catch (IgniteCheckedException e) { |
| onDone(e); |
| } |
| } |
| }); |
| } |
| }); |
| |
| return; |
| } |
| |
| if (rcvAll) |
| onDone(opRes0, err0); |
| } |
| |
| /** |
| * @param req Request. |
| * @param e Error. |
| */ |
| void onSendError(GridNearAtomicUpdateRequest req, IgniteCheckedException e) { |
| synchronized (this) { |
| GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(cctx.cacheId(), |
| req.nodeId(), |
| req.futureVersion(), |
| cctx.deploymentEnabled()); |
| |
| res.addFailedKeys(req.keys(), e); |
| |
| onResult(req.nodeId(), res, true); |
| } |
| } |
| |
| /** |
| * @param topVer Topology version. |
| */ |
| void map(AffinityTopologyVersion topVer) { |
| Collection<ClusterNode> topNodes = CU.affinityNodes(cctx, topVer); |
| |
| if (F.isEmpty(topNodes)) { |
| onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " + |
| "left the grid).")); |
| |
| return; |
| } |
| |
| Exception err = null; |
| GridNearAtomicUpdateRequest singleReq0 = null; |
| Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = null; |
| |
| int size = keys.size(); |
| |
| synchronized (this) { |
| assert futVer == null : this; |
| assert this.topVer == AffinityTopologyVersion.ZERO : this; |
| |
| resCnt = 0; |
| |
| this.topVer = topVer; |
| |
| futVer = cctx.versions().next(topVer); |
| |
| if (storeFuture()) { |
| if (!cctx.mvcc().addAtomicFuture(futVer, GridNearAtomicUpdateFuture.this)) { |
| assert isDone() : GridNearAtomicUpdateFuture.this; |
| |
| return; |
| } |
| } |
| |
| // Assign version on near node in CLOCK ordering mode even if fastMap is false. |
| if (updVer == null) |
| updVer = cctx.config().getAtomicWriteOrderMode() == CLOCK ? cctx.versions().next(topVer) : null; |
| |
| if (updVer != null && log.isDebugEnabled()) |
| log.debug("Assigned fast-map version for update on near node: " + updVer); |
| |
| try { |
| if (size == 1 && !fastMap) { |
| assert remapKeys == null || remapKeys.size() == 1; |
| |
| singleReq0 = singleReq = mapSingleUpdate(); |
| } |
| else { |
| pendingMappings = mapUpdate(topNodes); |
| |
| if (pendingMappings.size() == 1) |
| singleReq0 = singleReq = F.firstValue(pendingMappings); |
| else { |
| if (syncMode == PRIMARY_SYNC) { |
| mappings = U.newHashMap(pendingMappings.size()); |
| |
| for (GridNearAtomicUpdateRequest req : pendingMappings.values()) { |
| if (req.hasPrimary()) |
| mappings.put(req.nodeId(), req); |
| } |
| } |
| else |
| mappings = new HashMap<>(pendingMappings); |
| |
| assert !mappings.isEmpty() || size == 0 : GridNearAtomicUpdateFuture.this; |
| } |
| } |
| |
| remapKeys = null; |
| } |
| catch (Exception e) { |
| err = e; |
| } |
| } |
| |
| if (err != null) { |
| onDone(err); |
| |
| return; |
| } |
| |
| // Optimize mapping for single key. |
| if (singleReq0 != null) |
| mapSingle(singleReq0.nodeId(), singleReq0); |
| else { |
| assert pendingMappings != null; |
| |
| if (size == 0) |
| onDone(new GridCacheReturn(cctx, true, true, null, true)); |
| else |
| doUpdate(pendingMappings); |
| } |
| } |
| |
| /** |
| * @param topVer Topology version. |
| * @return Future. |
| */ |
| @Nullable synchronized GridFutureAdapter<Void> completeFuture(AffinityTopologyVersion topVer) { |
| if (this.topVer == AffinityTopologyVersion.ZERO) |
| return null; |
| |
| if (this.topVer.compareTo(topVer) < 0) { |
| if (topCompleteFut == null) |
| topCompleteFut = new GridFutureAdapter<>(); |
| |
| return topCompleteFut; |
| } |
| |
| return null; |
| } |
| |
| /** |
| * @return Future version. |
| */ |
| GridCacheVersion onFutureDone() { |
| GridCacheVersion ver0; |
| |
| GridFutureAdapter<Void> fut0; |
| |
| synchronized (this) { |
| fut0 = topCompleteFut; |
| |
| topCompleteFut = null; |
| |
| ver0 = futVer; |
| |
| futVer = null; |
| } |
| |
| if (fut0 != null) |
| fut0.onDone(); |
| |
| return ver0; |
| } |
| |
| /** |
| * @param topNodes Cache nodes. |
| * @return Mapping. |
| * @throws Exception If failed. |
| */ |
| private Map<UUID, GridNearAtomicUpdateRequest> mapUpdate(Collection<ClusterNode> topNodes) throws Exception { |
| Iterator<?> it = null; |
| |
| if (vals != null) |
| it = vals.iterator(); |
| |
| Iterator<GridCacheDrInfo> conflictPutValsIt = null; |
| |
| if (conflictPutVals != null) |
| conflictPutValsIt = conflictPutVals.iterator(); |
| |
| Iterator<GridCacheVersion> conflictRmvValsIt = null; |
| |
| if (conflictRmvVals != null) |
| conflictRmvValsIt = conflictRmvVals.iterator(); |
| |
| Map<UUID, GridNearAtomicUpdateRequest> pendingMappings = U.newHashMap(topNodes.size()); |
| |
| // Create mappings first, then send messages. |
| for (Object key : keys) { |
| if (key == null) |
| throw new NullPointerException("Null key."); |
| |
| Object val; |
| GridCacheVersion conflictVer; |
| long conflictTtl; |
| long conflictExpireTime; |
| |
| if (vals != null) { |
| val = it.next(); |
| conflictVer = null; |
| conflictTtl = CU.TTL_NOT_CHANGED; |
| conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; |
| |
| if (val == null) |
| throw new NullPointerException("Null value."); |
| } |
| else if (conflictPutVals != null) { |
| GridCacheDrInfo conflictPutVal = conflictPutValsIt.next(); |
| |
| val = conflictPutVal.value(); |
| conflictVer = conflictPutVal.version(); |
| conflictTtl = conflictPutVal.ttl(); |
| conflictExpireTime = conflictPutVal.expireTime(); |
| } |
| else if (conflictRmvVals != null) { |
| val = null; |
| conflictVer = conflictRmvValsIt.next(); |
| conflictTtl = CU.TTL_NOT_CHANGED; |
| conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; |
| } |
| else { |
| val = null; |
| conflictVer = null; |
| conflictTtl = CU.TTL_NOT_CHANGED; |
| conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; |
| } |
| |
| if (val == null && op != GridCacheOperation.DELETE) |
| continue; |
| |
| KeyCacheObject cacheKey = cctx.toCacheKeyObject(key); |
| |
| if (remapKeys != null && !remapKeys.contains(cacheKey)) |
| continue; |
| |
| if (op != TRANSFORM) |
| val = cctx.toCacheObject(val); |
| |
| Collection<ClusterNode> affNodes = mapKey(cacheKey, topVer, fastMap); |
| |
| if (affNodes.isEmpty()) |
| throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + |
| "(all partition nodes left the grid)."); |
| |
| int i = 0; |
| |
| for (ClusterNode affNode : affNodes) { |
| if (affNode == null) |
| throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache " + |
| "(all partition nodes left the grid)."); |
| |
| UUID nodeId = affNode.id(); |
| |
| GridNearAtomicUpdateRequest mapped = pendingMappings.get(nodeId); |
| |
| if (mapped == null) { |
| mapped = new GridNearAtomicUpdateRequest( |
| cctx.cacheId(), |
| nodeId, |
| futVer, |
| fastMap, |
| updVer, |
| topVer, |
| topLocked, |
| syncMode, |
| op, |
| retval, |
| expiryPlc, |
| invokeArgs, |
| filter, |
| subjId, |
| taskNameHash, |
| skipStore, |
| keepBinary, |
| cctx.kernalContext().clientNode(), |
| cctx.deploymentEnabled()); |
| |
| pendingMappings.put(nodeId, mapped); |
| } |
| |
| mapped.addUpdateEntry(cacheKey, val, conflictTtl, conflictExpireTime, conflictVer, i == 0); |
| |
| i++; |
| } |
| } |
| |
| return pendingMappings; |
| } |
| |
| /** |
| * @return Request. |
| * @throws Exception If failed. |
| */ |
| private GridNearAtomicUpdateRequest mapSingleUpdate() throws Exception { |
| Object key = F.first(keys); |
| |
| Object val; |
| GridCacheVersion conflictVer; |
| long conflictTtl; |
| long conflictExpireTime; |
| |
| if (vals != null) { |
| // Regular PUT. |
| val = F.first(vals); |
| conflictVer = null; |
| conflictTtl = CU.TTL_NOT_CHANGED; |
| conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; |
| } |
| else if (conflictPutVals != null) { |
| // Conflict PUT. |
| GridCacheDrInfo conflictPutVal = F.first(conflictPutVals); |
| |
| val = conflictPutVal.value(); |
| conflictVer = conflictPutVal.version(); |
| conflictTtl = conflictPutVal.ttl(); |
| conflictExpireTime = conflictPutVal.expireTime(); |
| } |
| else if (conflictRmvVals != null) { |
| // Conflict REMOVE. |
| val = null; |
| conflictVer = F.first(conflictRmvVals); |
| conflictTtl = CU.TTL_NOT_CHANGED; |
| conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; |
| } |
| else { |
| // Regular REMOVE. |
| val = null; |
| conflictVer = null; |
| conflictTtl = CU.TTL_NOT_CHANGED; |
| conflictExpireTime = CU.EXPIRE_TIME_CALCULATE; |
| } |
| |
| // We still can get here if user pass map with single element. |
| if (key == null) |
| throw new NullPointerException("Null key."); |
| |
| if (val == null && op != GridCacheOperation.DELETE) |
| throw new NullPointerException("Null value."); |
| |
| KeyCacheObject cacheKey = cctx.toCacheKeyObject(key); |
| |
| if (op != TRANSFORM) |
| val = cctx.toCacheObject(val); |
| |
| ClusterNode primary = cctx.affinity().primary(cacheKey, topVer); |
| |
| if (primary == null) |
| throw new ClusterTopologyServerNotFoundException("Failed to map keys for cache (all partition nodes " + |
| "left the grid)."); |
| |
| GridNearAtomicUpdateRequest req = new GridNearAtomicUpdateRequest( |
| cctx.cacheId(), |
| primary.id(), |
| futVer, |
| fastMap, |
| updVer, |
| topVer, |
| topLocked, |
| syncMode, |
| op, |
| retval, |
| expiryPlc, |
| invokeArgs, |
| filter, |
| subjId, |
| taskNameHash, |
| skipStore, |
| keepBinary, |
| cctx.kernalContext().clientNode(), |
| cctx.deploymentEnabled()); |
| |
| req.addUpdateEntry(cacheKey, |
| val, |
| conflictTtl, |
| conflictExpireTime, |
| conflictVer, |
| true); |
| |
| return req; |
| } |
| |
| /** |
| * @param ret Result from single node. |
| */ |
| @SuppressWarnings("unchecked") |
| private void addInvokeResults(GridCacheReturn ret) { |
| assert op == TRANSFORM : op; |
| assert ret.value() == null || ret.value() instanceof Map : ret.value(); |
| |
| if (ret.value() != null) { |
| if (opRes != null) |
| opRes.mergeEntryProcessResults(ret); |
| else |
| opRes = ret; |
| } |
| } |
| |
| /** |
| * @param failedKeys Failed keys. |
| * @param topVer Topology version for failed update. |
| * @param err Error cause. |
| */ |
| private void addFailedKeys(Collection<KeyCacheObject> failedKeys, |
| AffinityTopologyVersion topVer, |
| Throwable err) { |
| CachePartialUpdateCheckedException err0 = this.err; |
| |
| if (err0 == null) |
| err0 = this.err = new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible)."); |
| |
| Collection<Object> keys = new ArrayList<>(failedKeys.size()); |
| |
| for (KeyCacheObject key : failedKeys) |
| keys.add(cctx.cacheObjectContext().unwrapBinaryIfNeeded(key, keepBinary, false)); |
| |
| err0.add(keys, err, topVer); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public synchronized String toString() { |
| return S.toString(UpdateState.class, this); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| public String toString() { |
| return S.toString(GridNearAtomicUpdateFuture.class, this, super.toString()); |
| } |
| } |