| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; |
| |
| import java.io.Externalizable; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import javax.cache.expiry.ExpiryPolicy; |
| import javax.cache.processor.EntryProcessor; |
| import javax.cache.processor.EntryProcessorResult; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.NodeStoppingException; |
| import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; |
| import org.apache.ignite.internal.processors.cache.CacheInvokeEntry; |
| import org.apache.ignite.internal.processors.cache.CacheInvokeResult; |
| import org.apache.ignite.internal.processors.cache.CacheLazyEntry; |
| import org.apache.ignite.internal.processors.cache.CacheMetricsImpl; |
| import org.apache.ignite.internal.processors.cache.CacheObject; |
| import org.apache.ignite.internal.processors.cache.CacheOperationContext; |
| import org.apache.ignite.internal.processors.cache.CacheStorePartialUpdateException; |
| import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap; |
| import org.apache.ignite.internal.processors.cache.GridCacheContext; |
| import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; |
| import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; |
| import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; |
| import org.apache.ignite.internal.processors.cache.GridCacheMapEntryFactory; |
| import org.apache.ignite.internal.processors.cache.GridCacheOperation; |
| import org.apache.ignite.internal.processors.cache.GridCacheReturn; |
| import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult; |
| import org.apache.ignite.internal.processors.cache.GridDeferredAckMessageSender; |
| import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; |
| import org.apache.ignite.internal.processors.cache.KeyCacheObject; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtInvalidPartitionException; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionTopology; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtPreloader; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse; |
| import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo; |
| import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; |
| import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; |
| import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; |
| import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; |
| import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx; |
| import org.apache.ignite.internal.util.GridUnsafe; |
| import org.apache.ignite.internal.util.future.GridEmbeddedFuture; |
| import org.apache.ignite.internal.util.future.GridFinishedFuture; |
| import org.apache.ignite.internal.util.tostring.GridToStringExclude; |
| import org.apache.ignite.internal.util.typedef.C1; |
| import org.apache.ignite.internal.util.typedef.CI1; |
| import org.apache.ignite.internal.util.typedef.CI2; |
| import org.apache.ignite.internal.util.typedef.CO; |
| import org.apache.ignite.internal.util.typedef.CX1; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.P1; |
| import org.apache.ignite.internal.util.typedef.T2; |
| import org.apache.ignite.internal.util.typedef.internal.A; |
| import org.apache.ignite.internal.util.typedef.internal.CU; |
| import org.apache.ignite.internal.util.typedef.internal.S; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteBiTuple; |
| import org.apache.ignite.lang.IgniteClosure; |
| import org.apache.ignite.lang.IgniteOutClosure; |
| import org.apache.ignite.plugin.security.SecurityPermission; |
| import org.apache.ignite.transactions.TransactionIsolation; |
| import org.jetbrains.annotations.Nullable; |
| import org.jsr166.ConcurrentLinkedDeque8; |
| |
| import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_ACK_BUFFER_SIZE; |
| import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT; |
| import static org.apache.ignite.cache.CacheAtomicWriteOrderMode.CLOCK; |
| import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; |
| import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; |
| import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE; |
| import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; |
| import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE; |
| import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled; |
| import static org.apache.ignite.internal.processors.dr.GridDrType.DR_BACKUP; |
| import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE; |
| import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRIMARY; |
| |
| /** |
| * Non-transactional partitioned cache. |
| */ |
| @SuppressWarnings("unchecked") |
| @GridToStringExclude |
| public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { |
| /** Serialization version UID. */ |
| private static final long serialVersionUID = 0L; |
| |
| /** |
| * Deferred update response buffer size. Read via {@link Integer#getInteger(String, int)} using the |
| * {@code IGNITE_ATOMIC_DEFERRED_ACK_BUFFER_SIZE} constant as the property name; falls back to {@code 256}. |
| */ |
| private static final int DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE = |
| Integer.getInteger(IGNITE_ATOMIC_DEFERRED_ACK_BUFFER_SIZE, 256); |
| |
| /** |
| * Deferred update response timeout. Read via {@link Integer#getInteger(String, int)} using the |
| * {@code IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT} constant as the property name; falls back to {@code 500}. |
| */ |
| private static final int DEFERRED_UPDATE_RESPONSE_TIMEOUT = |
| Integer.getInteger(IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT, 500); |
| |
| /** Update reply closure, assigned in {@code init()}. */ |
| @GridToStringExclude |
| private CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> updateReplyClos; |
| |
| /** Sender of deferred DHT update responses, assigned in {@code start()} and stopped in {@code stop()}. */ |
| private GridDeferredAckMessageSender deferredUpdateMsgSnd; |
| |
| /** Near cache, assigned through {@code near(GridNearAtomicCache)}. */ |
| private GridNearAtomicCache<K, V> near; |
| |
| /** Message logger, taken from {@code ctx.shared().atomicMessageLogger()} by the context-taking constructors. */ |
| private IgniteLogger msgLog; |
| |
| /** |
| * Empty constructor required by {@link Externalizable}. It performs no initialization, |
| * so {@code msgLog} is not set when an instance is created this way. |
| */ |
| public GridDhtAtomicCache() { |
| // No-op. |
| } |
| |
| /** |
| * Creates the cache for the given context and takes the atomic message logger from the shared context. |
| * |
| * @param ctx Cache context. |
| */ |
| public GridDhtAtomicCache(GridCacheContext<K, V> ctx) { |
| super(ctx); |
| |
| this.msgLog = ctx.shared().atomicMessageLogger(); |
| } |
| |
| /** |
| * Creates the cache for the given context on top of an existing concurrent map and takes the |
| * atomic message logger from the shared context. |
| * |
| * @param ctx Cache context. |
| * @param map Cache concurrent map. |
| */ |
| public GridDhtAtomicCache(GridCacheContext<K, V> ctx, GridCacheConcurrentMap map) { |
| super(ctx, map); |
| |
| this.msgLog = ctx.shared().atomicMessageLogger(); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * This implementation performs no check. |
| */ |
| @Override protected void checkJta() throws IgniteCheckedException { |
| // No-op. |
| } |
| |
| /** |
| * {@inheritDoc} |
| * |
| * @return Always {@code true} for this cache implementation. |
| */ |
| @Override public boolean isDhtAtomic() { |
| return true; |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * The returned factory creates {@code GridDhtAtomicOffHeapCacheEntry} instances when the context |
| * reports {@code useOffheapEntry()}, and {@code GridDhtAtomicCacheEntry} instances otherwise. |
| */ |
| @Override protected GridCacheMapEntryFactory entryFactory() { |
| return new GridCacheMapEntryFactory() { |
| @Override public GridCacheMapEntry create( |
| GridCacheContext ctx, |
| AffinityTopologyVersion topVer, |
| KeyCacheObject key, |
| int hash, |
| CacheObject val |
| ) { |
| return ctx.useOffheapEntry() |
| ? new GridDhtAtomicOffHeapCacheEntry(ctx, topVer, key, hash, val) |
| : new GridDhtAtomicCacheEntry(ctx, topVer, key, hash, val); |
| } |
| }; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void init() { |
| super.init(); |
| |
| updateReplyClos = new CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse>() { |
| @SuppressWarnings("ThrowableResultOfMethodCallIgnored") |
| @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) { |
| if (ctx.config().getAtomicWriteOrderMode() == CLOCK) { |
| assert req.writeSynchronizationMode() != FULL_ASYNC : req; |
| |
| // Always send reply in CLOCK ordering mode. |
| sendNearUpdateReply(res.nodeId(), res); |
| |
| return; |
| } |
| |
| // Request should be for primary keys only in PRIMARY ordering mode. |
| assert req.hasPrimary() : req; |
| |
| if (req.writeSynchronizationMode() != FULL_ASYNC) |
| sendNearUpdateReply(res.nodeId(), res); |
| else { |
| if (!F.isEmpty(res.remapKeys())) |
| // Remap keys on primary node in FULL_ASYNC mode. |
| remapToNewPrimary(req); |
| else if (res.error() != null) { |
| U.error(log, "Failed to process write update request in FULL_ASYNC mode for keys: " + |
| res.failedKeys(), res.error()); |
| } |
| } |
| } |
| }; |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * After the parent start, this method: |
| * <ul> |
| * <li>creates the deferred update response sender;</li> |
| * <li>creates cache metrics, delegating to the near cache metrics when a near cache is present;</li> |
| * <li>creates and starts the DHT preloader;</li> |
| * <li>registers I/O handlers for near get requests, near/DHT atomic update requests and responses, |
| * and deferred DHT update responses;</li> |
| * <li>registers near get response handlers only when no near cache has been set on this instance.</li> |
| * </ul> |
| */ |
| @SuppressWarnings({"IfMayBeConditional", "SimplifiableIfStatement"}) |
| @Override public void start() throws IgniteCheckedException { |
| super.start(); |
| |
| // Sender for deferred DHT update responses; timeout and buffer size come from the class constants. |
| deferredUpdateMsgSnd = new GridDeferredAckMessageSender(ctx.time(), ctx.closures()) { |
| @Override public int getTimeout() { |
| return DEFERRED_UPDATE_RESPONSE_TIMEOUT; |
| } |
| |
| @Override public int getBufferSize() { |
| return DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE; |
| } |
| |
| @Override public void finish(UUID nodeId, ConcurrentLinkedDeque8<GridCacheVersion> vers) { |
| GridDhtAtomicDeferredUpdateResponse msg = new GridDhtAtomicDeferredUpdateResponse(ctx.cacheId(), |
| vers, ctx.deploymentEnabled()); |
| |
| try { |
| // Send under the kernal gateway read lock; the lock is released in the inner finally. |
| ctx.kernalContext().gateway().readLock(); |
| |
| try { |
| ctx.io().send(nodeId, msg, ctx.ioPolicy()); |
| |
| if (msgLog.isDebugEnabled()) { |
| msgLog.debug("Sent deferred DHT update response [futIds=" + msg.futureVersions() + |
| ", node=" + nodeId + ']'); |
| } |
| } |
| finally { |
| ctx.kernalContext().gateway().readUnlock(); |
| } |
| } |
| // Treated as "node is stopping" (see the debug message); logged at debug level only. |
| catch (IllegalStateException ignored) { |
| if (msgLog.isDebugEnabled()) { |
| msgLog.debug("Failed to send deferred DHT update response, node is stopping [" + |
| "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']'); |
| } |
| } |
| // Target node left the topology; logged at debug level only. |
| catch (ClusterTopologyCheckedException ignored) { |
| if (msgLog.isDebugEnabled()) { |
| msgLog.debug("Failed to send deferred DHT update response, node left [" + |
| "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']'); |
| } |
| } |
| // Any other send failure is reported as an error. |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to send deferred DHT update response to remote node [" + |
| "futIds=" + msg.futureVersions() + ", node=" + nodeId + ']', e); |
| } |
| } |
| }; |
| |
| CacheMetricsImpl m = new CacheMetricsImpl(ctx); |
| |
| // When a near cache exists, its metrics are registered as the delegate of the new metrics object. |
| if (ctx.dht().near() != null) |
| m.delegate(ctx.dht().near().metrics0()); |
| |
| metrics = m; |
| |
| preldr = new GridDhtPreloader(ctx); |
| |
| preldr.start(); |
| |
| // Near get request (multi-key) handler. |
| ctx.io().addHandler( |
| ctx.cacheId(), |
| GridNearGetRequest.class, |
| new CI2<UUID, GridNearGetRequest>() { |
| @Override public void apply( |
| UUID nodeId, |
| GridNearGetRequest req |
| ) { |
| processNearGetRequest( |
| nodeId, |
| req); |
| } |
| }); |
| |
| // Near get request (single-key) handler. |
| ctx.io().addHandler( |
| ctx.cacheId(), |
| GridNearSingleGetRequest.class, |
| new CI2<UUID, GridNearSingleGetRequest>() { |
| @Override public void apply( |
| UUID nodeId, |
| GridNearSingleGetRequest req |
| ) { |
| processNearSingleGetRequest( |
| nodeId, |
| req); |
| } |
| }); |
| |
| // Near atomic update request handler. |
| ctx.io().addHandler( |
| ctx.cacheId(), |
| GridNearAtomicAbstractUpdateRequest.class, |
| new CI2<UUID, GridNearAtomicAbstractUpdateRequest>() { |
| @Override public void apply( |
| UUID nodeId, |
| GridNearAtomicAbstractUpdateRequest req |
| ) { |
| processNearAtomicUpdateRequest( |
| nodeId, |
| req); |
| } |
| |
| @Override public String toString() { |
| return "GridNearAtomicAbstractUpdateRequest handler " + |
| "[msgIdx=" + GridNearAtomicAbstractUpdateRequest.CACHE_MSG_IDX + ']'; |
| } |
| }); |
| |
| // Near atomic update response handler. |
| ctx.io().addHandler(ctx.cacheId(), |
| GridNearAtomicUpdateResponse.class, |
| new CI2<UUID, GridNearAtomicUpdateResponse>() { |
| @Override public void apply( |
| UUID nodeId, |
| GridNearAtomicUpdateResponse res |
| ) { |
| processNearAtomicUpdateResponse( |
| nodeId, |
| res); |
| } |
| |
| @Override public String toString() { |
| return "GridNearAtomicUpdateResponse handler " + |
| "[msgIdx=" + GridNearAtomicUpdateResponse.CACHE_MSG_IDX + ']'; |
| } |
| }); |
| |
| // DHT atomic update request handler. |
| // NOTE(review): toString() below names GridDhtAtomicUpdateRequest (and uses its CACHE_MSG_IDX) although |
| // the handler is registered for GridDhtAtomicAbstractUpdateRequest - confirm this is intended. |
| ctx.io().addHandler( |
| ctx.cacheId(), |
| GridDhtAtomicAbstractUpdateRequest.class, |
| new CI2<UUID, GridDhtAtomicAbstractUpdateRequest>() { |
| @Override public void apply( |
| UUID nodeId, |
| GridDhtAtomicAbstractUpdateRequest req |
| ) { |
| processDhtAtomicUpdateRequest( |
| nodeId, |
| req); |
| } |
| |
| @Override public String toString() { |
| return "GridDhtAtomicUpdateRequest handler " + |
| "[msgIdx=" + GridDhtAtomicUpdateRequest.CACHE_MSG_IDX + ']'; |
| } |
| }); |
| |
| // DHT atomic update response handler. |
| ctx.io().addHandler( |
| ctx.cacheId(), |
| GridDhtAtomicUpdateResponse.class, |
| new CI2<UUID, GridDhtAtomicUpdateResponse>() { |
| @Override public void apply( |
| UUID nodeId, |
| GridDhtAtomicUpdateResponse res |
| ) { |
| processDhtAtomicUpdateResponse( |
| nodeId, |
| res); |
| } |
| |
| @Override public String toString() { |
| return "GridDhtAtomicUpdateResponse handler " + |
| "[msgIdx=" + GridDhtAtomicUpdateResponse.CACHE_MSG_IDX + ']'; |
| } |
| }); |
| |
| // Deferred DHT atomic update response handler. |
| ctx.io().addHandler(ctx.cacheId(), |
| GridDhtAtomicDeferredUpdateResponse.class, |
| new CI2<UUID, GridDhtAtomicDeferredUpdateResponse>() { |
| @Override public void apply( |
| UUID nodeId, |
| GridDhtAtomicDeferredUpdateResponse res |
| ) { |
| processDhtAtomicDeferredUpdateResponse( |
| nodeId, |
| res); |
| } |
| |
| @Override public String toString() { |
| return "GridDhtAtomicDeferredUpdateResponse handler " + |
| "[msgIdx=" + GridDhtAtomicDeferredUpdateResponse.CACHE_MSG_IDX + ']'; |
| } |
| }); |
| |
| // Get response handlers are registered here only when no near cache has been set on this instance. |
| // NOTE(review): presumably the near cache registers its own handlers otherwise - confirm. |
| if (near == null) { |
| ctx.io().addHandler( |
| ctx.cacheId(), |
| GridNearGetResponse.class, |
| new CI2<UUID, GridNearGetResponse>() { |
| @Override public void apply( |
| UUID nodeId, |
| GridNearGetResponse res |
| ) { |
| processNearGetResponse( |
| nodeId, |
| res); |
| } |
| }); |
| |
| ctx.io().addHandler( |
| ctx.cacheId(), |
| GridNearSingleGetResponse.class, |
| new CI2<UUID, GridNearSingleGetResponse>() { |
| @Override public void apply( |
| UUID nodeId, |
| GridNearSingleGetResponse res |
| ) { |
| processNearSingleGetResponse( |
| nodeId, |
| res); |
| } |
| }); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * Stops the deferred update response sender. The sender is only created in {@code start()}, so it is |
| * still {@code null} if this cache is stopped without having been started (or if {@code start()} failed |
| * before creating it); in that case there is nothing to stop. |
| */ |
| @Override public void stop() { |
| // Guard against stop() being called before start() assigned the sender. |
| if (deferredUpdateMsgSnd != null) |
| deferredUpdateMsgSnd.stop(); |
| } |
| |
| /** |
| * Sets the near cache associated with this DHT cache. |
| * |
| * @param near Near cache. |
| */ |
| public void near(GridNearAtomicCache<K, V> near) { |
| this.near = near; |
| } |
| |
| /** |
| * {@inheritDoc} |
| * |
| * @return Near cache previously set through {@code near(GridNearAtomicCache)}, or {@code null} if none was set. |
| */ |
| @Override public GridNearCacheAdapter<K, V> near() { |
| return near; |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * Checks the {@code CACHE_READ} permission, validates the key when key checking is enabled and then |
| * waits for the result of {@code getAsync0}. Expiry policy and skip-store flag are taken from the |
| * per-call operation context when one is present. An {@link IgniteException} raised while getting is |
| * replaced by its {@link IgniteCheckedException} cause when it has one. |
| */ |
| @Override protected V get0(K key, String taskName, boolean deserializeBinary, boolean needVer) |
| throws IgniteCheckedException { |
| ctx.checkSecurity(SecurityPermission.CACHE_READ); |
| |
| if (keyCheck) |
| validateCacheKey(key); |
| |
| CacheOperationContext callCtx = ctx.operationContextPerCall(); |
| |
| UUID subj = ctx.subjectIdPerCall(null, callCtx); |
| |
| ExpiryPolicy expiry = null; |
| boolean noStore = false; |
| |
| if (callCtx != null) { |
| expiry = callCtx.expiry(); |
| noStore = callCtx.skipStore(); |
| } |
| |
| try { |
| IgniteInternalFuture<V> getFut = getAsync0(ctx.toCacheKeyObject(key), |
| !ctx.config().isReadFromBackup(), |
| subj, |
| taskName, |
| deserializeBinary, |
| expiry, |
| false, |
| noStore, |
| true, |
| needVer); |
| |
| return getFut.get(); |
| } |
| catch (IgniteException e) { |
| IgniteCheckedException checked = e.getCause(IgniteCheckedException.class); |
| |
| if (checked == null) |
| throw e; |
| |
| throw checked; |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * Checks the {@code CACHE_READ} permission, validates the key when key checking is enabled and runs |
| * {@code getAsync0} through {@code asyncOp}. The expiry policy is taken from the per-call operation |
| * context unless {@code skipVals} is set; the skip-store flag is taken from the same context. |
| * <p> |
| * The caller-supplied {@code subjId} is forwarded to {@code ctx.subjectIdPerCall(..)}, the same way |
| * {@code getAllAsyncInternal} does it. Previously {@code null} was always passed, which silently |
| * discarded the argument. Callers passing {@code null} observe no change. |
| * |
| * @param key Key. |
| * @param forcePrimary Force primary flag. |
| * @param skipTx Skip tx flag (not used by this implementation). |
| * @param subjId Subject ID, may be {@code null}. |
| * @param taskName Task name. |
| * @param deserializeBinary Deserialize binary flag. |
| * @param skipVals Skip values flag. |
| * @param canRemap Can remap flag. |
| * @param needVer Need version flag. |
| * @return Future for the value. |
| */ |
| @Override protected IgniteInternalFuture<V> getAsync(final K key, |
| final boolean forcePrimary, |
| final boolean skipTx, |
| @Nullable UUID subjId, |
| final String taskName, |
| final boolean deserializeBinary, |
| final boolean skipVals, |
| final boolean canRemap, |
| final boolean needVer) { |
| ctx.checkSecurity(SecurityPermission.CACHE_READ); |
| |
| if (keyCheck) |
| validateCacheKey(key); |
| |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| // Resolve the effective subject from the caller-supplied ID and the operation context. |
| subjId = ctx.subjectIdPerCall(subjId, opCtx); |
| |
| // Effectively-final copy for use inside the closure. |
| final UUID subjId0 = subjId; |
| |
| final ExpiryPolicy expiryPlc = skipVals ? null : opCtx != null ? opCtx.expiry() : null; |
| |
| final boolean skipStore = opCtx != null && opCtx.skipStore(); |
| |
| return asyncOp(new CO<IgniteInternalFuture<V>>() { |
| @Override public IgniteInternalFuture<V> apply() { |
| return getAsync0(ctx.toCacheKeyObject(key), |
| forcePrimary, |
| subjId0, |
| taskName, |
| deserializeBinary, |
| expiryPlc, |
| skipVals, |
| skipStore, |
| canRemap, |
| needVer); |
| } |
| }); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * Delegates to {@code getAllAsyncInternal} without going through {@code asyncOp} and waits for the result. |
| * The primary node is forced when reading from backups is disabled in the cache configuration. |
| */ |
| @Override protected Map<K, V> getAll0(Collection<? extends K> keys, boolean deserializeBinary, boolean needVer) |
| throws IgniteCheckedException { |
| IgniteInternalFuture<Map<K, V>> getFut = getAllAsyncInternal(keys, |
| !ctx.config().isReadFromBackup(), |
| true, |
| null, |
| ctx.kernalContext().job().currentTaskName(), |
| deserializeBinary, |
| false, |
| true, |
| needVer, |
| false); |
| |
| return getFut.get(); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * Forwards all arguments to {@code getAllAsyncInternal} with the async operation flag set. |
| */ |
| @Override public IgniteInternalFuture<Map<K, V>> getAllAsync( |
| @Nullable final Collection<? extends K> keys, |
| final boolean forcePrimary, |
| boolean skipTx, |
| @Nullable UUID subjId, |
| final String taskName, |
| final boolean deserializeBinary, |
| final boolean skipVals, |
| final boolean canRemap, |
| final boolean needVer |
| ) { |
| // Public async entry point: always routed through asyncOp. |
| final boolean viaAsyncOp = true; |
| |
| return getAllAsyncInternal(keys, forcePrimary, skipTx, subjId, taskName, deserializeBinary, skipVals, |
| canRemap, needVer, viaAsyncOp); |
| } |
| |
| /** |
| * @param keys Keys. |
| * @param forcePrimary Force primary flag. |
| * @param skipTx Skip tx flag. |
| * @param subjId Subject ID. |
| * @param taskName Task name. |
| * @param deserializeBinary Deserialize binary flag. |
| * @param skipVals Skip values flag. |
| * @param canRemap Can remap flag. |
| * @param needVer Need version flag. |
| * @param asyncOp Async operation flag. |
| * @return Future. |
| */ |
| private IgniteInternalFuture<Map<K, V>> getAllAsyncInternal( |
| @Nullable final Collection<? extends K> keys, |
| final boolean forcePrimary, |
| boolean skipTx, |
| @Nullable UUID subjId, |
| final String taskName, |
| final boolean deserializeBinary, |
| final boolean skipVals, |
| final boolean canRemap, |
| final boolean needVer, |
| boolean asyncOp |
| ) { |
| ctx.checkSecurity(SecurityPermission.CACHE_READ); |
| |
| if (F.isEmpty(keys)) |
| return new GridFinishedFuture<>(Collections.<K, V>emptyMap()); |
| |
| if (keyCheck) |
| validateCacheKeys(keys); |
| |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| subjId = ctx.subjectIdPerCall(subjId, opCtx); |
| |
| final UUID subjId0 = subjId; |
| |
| final ExpiryPolicy expiryPlc = skipVals ? null : opCtx != null ? opCtx.expiry() : null; |
| |
| final boolean skipStore = opCtx != null && opCtx.skipStore(); |
| |
| if (asyncOp) { |
| return asyncOp(new CO<IgniteInternalFuture<Map<K, V>>>() { |
| @Override public IgniteInternalFuture<Map<K, V>> apply() { |
| return getAllAsync0(ctx.cacheKeysView(keys), |
| forcePrimary, |
| subjId0, |
| taskName, |
| deserializeBinary, |
| expiryPlc, |
| skipVals, |
| skipStore, |
| canRemap, |
| needVer); |
| } |
| }); |
| } |
| else { |
| return getAllAsync0(ctx.cacheKeysView(keys), |
| forcePrimary, |
| subjId0, |
| taskName, |
| deserializeBinary, |
| expiryPlc, |
| skipVals, |
| skipStore, |
| canRemap, |
| needVer); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected V getAndPut0(K key, V val, @Nullable CacheEntryPredicate filter) throws IgniteCheckedException { |
| return (V)update0( |
| key, |
| val, |
| null, |
| null, |
| true, |
| filter, |
| true, |
| false).get(); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * Runs the update through {@code update0}, waits for it and returns its {@code Boolean} result, |
| * which is asserted to be non-null. |
| */ |
| @Override protected boolean put0(K key, V val, CacheEntryPredicate filter) throws IgniteCheckedException { |
| final IgniteInternalFuture updFut = update0(key, val, null, null, false, filter, true, false); |
| |
| Boolean updated = (Boolean)updFut.get(); |
| |
| assert updated != null; |
| |
| return updated; |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * Returns the future produced by {@code update0} without waiting for it. |
| */ |
| @SuppressWarnings("unchecked") |
| @Override public IgniteInternalFuture<V> getAndPutAsync0(K key, V val, @Nullable CacheEntryPredicate filter) { |
| IgniteInternalFuture<V> updFut = update0(key, val, null, null, true, filter, true, true); |
| |
| return updFut; |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * Returns the future produced by {@code update0} without waiting for it. |
| */ |
| @SuppressWarnings("unchecked") |
| @Override public IgniteInternalFuture<Boolean> putAsync0(K key, V val, @Nullable CacheEntryPredicate filter) { |
| IgniteInternalFuture<Boolean> updFut = update0(key, val, null, null, false, filter, true, true); |
| |
| return updFut; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public V tryGetAndPut(K key, V val) throws IgniteCheckedException { |
| A.notNull(key, "key", val, "val"); |
| |
| return (V) update0( |
| key, |
| val, |
| null, |
| null, |
| true, |
| null, |
| false, |
| false).get(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void putAll0(Map<? extends K, ? extends V> m) throws IgniteCheckedException { |
| updateAll0(m, |
| null, |
| null, |
| null, |
| null, |
| false, |
| false, |
| true, |
| UPDATE, |
| false).get(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<?> putAllAsync0(Map<? extends K, ? extends V> m) { |
| return updateAll0(m, |
| null, |
| null, |
| null, |
| null, |
| false, |
| false, |
| true, |
| UPDATE, |
| true).chain(RET2NULL); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void putAllConflict(Map<KeyCacheObject, GridCacheDrInfo> conflictMap) |
| throws IgniteCheckedException { |
| putAllConflictAsync(conflictMap).get(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<?> putAllConflictAsync(Map<KeyCacheObject, GridCacheDrInfo> conflictMap) { |
| ctx.dr().onReceiveCacheEntriesReceived(conflictMap.size()); |
| |
| return updateAll0(null, |
| null, |
| null, |
| conflictMap, |
| null, |
| false, |
| false, |
| true, |
| UPDATE, |
| true); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public V getAndRemove0(K key) throws IgniteCheckedException { |
| return (V)remove0(key, true, null, false).get(); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * Returns the future produced by the four-argument {@code remove0} without waiting for it. |
| */ |
| @SuppressWarnings("unchecked") |
| @Override public IgniteInternalFuture<V> getAndRemoveAsync0(K key) { |
| IgniteInternalFuture<V> rmvFut = remove0(key, true, null, true); |
| |
| return rmvFut; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void removeAll0(Collection<? extends K> keys) throws IgniteCheckedException { |
| removeAllAsync0(keys, null, false, false, false).get(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<Object> removeAllAsync0(Collection<? extends K> keys) { |
| return removeAllAsync0(keys, null, false, false, true).chain(RET2NULL); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected boolean remove0(K key, CacheEntryPredicate filter) throws IgniteCheckedException { |
| return (Boolean)remove0(key, false, filter, false).get(); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * Returns the future produced by the four-argument {@code remove0} without waiting for it. |
| */ |
| @SuppressWarnings("unchecked") |
| @Override public IgniteInternalFuture<Boolean> removeAsync0(K key, @Nullable CacheEntryPredicate filter) { |
| IgniteInternalFuture<Boolean> rmvFut = remove0(key, false, filter, true); |
| |
| return rmvFut; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void removeAllConflict(Map<KeyCacheObject, GridCacheVersion> conflictMap) |
| throws IgniteCheckedException { |
| removeAllConflictAsync(conflictMap).get(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<?> removeAllConflictAsync(Map<KeyCacheObject, GridCacheVersion> conflictMap) { |
| ctx.dr().onReceiveCacheEntriesReceived(conflictMap.size()); |
| |
| return removeAllAsync0(null, conflictMap, false, false, true); |
| } |
| |
| /** |
| * Tells whether updates should be written through to the store: the context must report write-through |
| * and the store must be configured. |
| * |
| * @return {@code True} if store write-through enabled. |
| */ |
| private boolean writeThrough() { |
| if (!ctx.writeThrough()) |
| return false; |
| |
| return ctx.store().configured(); |
| } |
| |
| /** |
| * Runs an asynchronous operation after the future currently kept in the holder. If |
| * {@code asyncOpAcquire()} returns a future, that future is returned and the operation is not run. |
| * Otherwise, under the holder lock: when the held future is absent or done the operation is applied |
| * right away; when it is still running the operation is embedded to run after it (or is replaced by |
| * a failed future if the node is stopping at that moment). The resulting future is saved to the holder. |
| * |
| * @param op Operation closure. |
| * @return Future. |
| */ |
| @SuppressWarnings("unchecked") |
| private <T> IgniteInternalFuture<T> asyncOp(final CO<IgniteInternalFuture<T>> op) { |
| IgniteInternalFuture<T> acquireFail = asyncOpAcquire(); |
| |
| if (acquireFail != null) |
| return acquireFail; |
| |
| FutureHolder futHolder = lastFut.get(); |
| |
| futHolder.lock(); |
| |
| try { |
| IgniteInternalFuture prev = futHolder.future(); |
| |
| IgniteInternalFuture<T> res; |
| |
| if (prev == null || prev.isDone()) |
| res = op.apply(); |
| else { |
| // Previous operation is still in progress: chain this one after it. |
| res = new GridEmbeddedFuture(prev, |
| new IgniteOutClosure<IgniteInternalFuture>() { |
| @Override public IgniteInternalFuture<T> apply() { |
| if (ctx.kernalContext().isStopping()) |
| return new GridFinishedFuture<>( |
| new IgniteCheckedException("Operation has been cancelled (node is stopping).")); |
| |
| return op.apply(); |
| } |
| }); |
| } |
| |
| saveFuture(futHolder, res); |
| |
| return res; |
| } |
| finally { |
| futHolder.unlock(); |
| } |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * Locks are not supported by this cache: the returned future is a {@code FinishedLockFuture} created |
| * with an {@link UnsupportedOperationException}. All arguments are ignored. |
| */ |
| @Override protected IgniteInternalFuture<Boolean> lockAllAsync(Collection<KeyCacheObject> keys, |
| long timeout, |
| @Nullable IgniteTxLocalEx tx, |
| boolean isInvalidate, |
| boolean isRead, |
| boolean retval, |
| @Nullable TransactionIsolation isolation, |
| long accessTtl) { |
| UnsupportedOperationException unsupported = new UnsupportedOperationException("Locks are not supported for " + |
| "CacheAtomicityMode.ATOMIC mode (use CacheAtomicityMode.TRANSACTIONAL instead)"); |
| |
| return new FinishedLockFuture(unsupported); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * Runs {@code invoke0} in non-async mode and waits for it. When the future yields {@code null}, |
| * a new empty {@code CacheInvokeResult} is returned instead. |
| */ |
| @Override public <T> EntryProcessorResult<T> invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args) |
| throws IgniteCheckedException { |
| EntryProcessorResult<T> procRes = invoke0(false, key, entryProcessor, args).get(); |
| |
| if (procRes == null) |
| return new CacheInvokeResult<T>(); |
| |
| return procRes; |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * Runs {@code invokeAll0} in non-async mode and waits for its result. |
| */ |
| @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, |
| EntryProcessor<K, V, T> entryProcessor, |
| Object... args) throws IgniteCheckedException |
| { |
| IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> invokeFut = invokeAll0(false, keys, entryProcessor, args); |
| |
| return invokeFut.get(); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * Returns the future produced by {@code invoke0} in async mode. |
| */ |
| @Override public <T> IgniteInternalFuture<EntryProcessorResult<T>> invokeAsync(K key, |
| EntryProcessor<K, V, T> entryProcessor, |
| Object... args) { |
| IgniteInternalFuture<EntryProcessorResult<T>> invokeFut = invoke0(true, key, entryProcessor, args); |
| |
| return invokeFut; |
| } |
| |
| /** |
| * Single-key invoke implementation. Requires non-null key and entry processor, validates the key when |
| * key checking is enabled, runs the entry processor through {@code update0} and converts the resulting |
| * map (expected to hold at most one entry) into a single result. A {@code CacheInvokeResult} with a |
| * non-null value is re-created around the value passed through {@code ctx.unwrapBinaryIfNeeded}. |
| * |
| * @param async Async operation flag. |
| * @param key Key. |
| * @param entryProcessor Entry processor. |
| * @param args Entry processor arguments. |
| * @return Future whose result is {@code null} when the update produced no map or an empty map. |
| */ |
| private <T> IgniteInternalFuture<EntryProcessorResult<T>> invoke0( |
| boolean async, |
| K key, |
| EntryProcessor<K, V, T> entryProcessor, |
| Object... args) { |
| A.notNull(key, "key", entryProcessor, "entryProcessor"); |
| |
| if (keyCheck) |
| validateCacheKey(key); |
| |
| CacheOperationContext callCtx = ctx.operationContextPerCall(); |
| |
| final boolean keepBinary = callCtx != null && callCtx.isKeepBinary(); |
| |
| IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> updFut = |
| update0(key, null, entryProcessor, args, false, null, true, async); |
| |
| return updFut.chain(new CX1<IgniteInternalFuture<Map<K, EntryProcessorResult<T>>>, EntryProcessorResult<T>>() { |
| @Override public EntryProcessorResult<T> applyx(IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> f) |
| throws IgniteCheckedException { |
| Map<K, EntryProcessorResult<T>> results = f.get(); |
| |
| if (results == null) |
| return null; |
| |
| assert results.isEmpty() || results.size() == 1 : results.size(); |
| |
| if (results.isEmpty()) |
| return null; |
| |
| EntryProcessorResult<T> single = results.values().iterator().next(); |
| |
| if (!(single instanceof CacheInvokeResult)) |
| return single; |
| |
| CacheInvokeResult invokeRes = (CacheInvokeResult)single; |
| |
| if (invokeRes.result() == null) |
| return single; |
| |
| return CacheInvokeResult.fromResult((T)ctx.unwrapBinaryIfNeeded(invokeRes.result(), |
| keepBinary, false)); |
| } |
| }); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * <p> |
| * Returns the future produced by {@code invokeAll0} in async mode. |
| */ |
| @SuppressWarnings("unchecked") |
| @Override public <T> IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys, |
| final EntryProcessor<K, V, T> entryProcessor, |
| Object... args) { |
| IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> invokeFut = invokeAll0(true, keys, entryProcessor, args); |
| |
| return invokeFut; |
| } |
| |
| /** |
| * Multi-key invoke implementation with one entry processor for all keys. Requires non-null keys and |
| * entry processor, validates keys when key checking is enabled, runs a {@code TRANSFORM} operation |
| * through {@code updateAll0} and passes the resulting map through {@code ctx.unwrapInvokeResult}. |
| * |
| * @param async Async operation flag. |
| * @param keys Keys. |
| * @param entryProcessor Entry processor. |
| * @param args Entry processor arguments. |
| * @return Future. |
| */ |
| private <T> IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> invokeAll0( |
| boolean async, |
| Set<? extends K> keys, |
| final EntryProcessor<K, V, T> entryProcessor, |
| Object... args) { |
| A.notNull(keys, "keys", entryProcessor, "entryProcessor"); |
| |
| if (keyCheck) |
| validateCacheKeys(keys); |
| |
| // Map view over the key set in which every key maps to the same entry processor. |
| Map<? extends K, EntryProcessor> procByKey = F.viewAsMap(keys, new C1<K, EntryProcessor>() { |
| @Override public EntryProcessor apply(K k) { |
| return entryProcessor; |
| } |
| }); |
| |
| CacheOperationContext callCtx = ctx.operationContextPerCall(); |
| |
| final boolean keepBinary = callCtx != null && callCtx.isKeepBinary(); |
| |
| IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> updFut = |
| updateAll0(null, procByKey, args, null, null, false, false, true, TRANSFORM, async); |
| |
| return updFut.chain( |
| new CX1<IgniteInternalFuture<Map<K, EntryProcessorResult<T>>>, Map<K, EntryProcessorResult<T>>>() { |
| @Override public Map<K, EntryProcessorResult<T>> applyx( |
| IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> f |
| ) throws IgniteCheckedException { |
| Map<Object, EntryProcessorResult> rawResults = (Map)f.get(); |
| |
| return ctx.unwrapInvokeResult(rawResults, keepBinary); |
| } |
| }); |
| } |
| |
| /** |
|  * {@inheritDoc} |
|  * <p> |
|  * Synchronous variant: issues a {@code TRANSFORM} update through {@code updateAll0} with the async flag |
|  * cleared and blocks on the returned future. |
|  */ |
| @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll( |
|     Map<? extends K, ? extends EntryProcessor<K, V, T>> map, |
|     Object... args) throws IgniteCheckedException { |
|     A.notNull(map, "map"); |
| |
|     if (keyCheck) |
|         validateCacheKeys(map.keySet()); |
| |
|     IgniteInternalFuture updFut = updateAll0( |
|         null, |
|         map, |
|         args, |
|         null, |
|         null, |
|         false, |
|         false, |
|         true, |
|         TRANSFORM, |
|         false); |
| |
|     return (Map<K, EntryProcessorResult<T>>)updFut.get(); |
| } |
| |
| /** |
|  * {@inheritDoc} |
|  * <p> |
|  * Asynchronous variant: issues a {@code TRANSFORM} update through {@code updateAll0} with the async flag |
|  * set and returns its future as is. |
|  */ |
| @SuppressWarnings("unchecked") |
| @Override public <T> IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync( |
|     Map<? extends K, ? extends EntryProcessor<K, V, T>> map, |
|     Object... args) { |
|     A.notNull(map, "map"); |
| |
|     if (keyCheck) |
|         validateCacheKeys(map.keySet()); |
| |
|     IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> updFut = updateAll0( |
|         null, |
|         map, |
|         args, |
|         null, |
|         null, |
|         false, |
|         false, |
|         true, |
|         TRANSFORM, |
|         true); |
| |
|     return updFut; |
| } |
| |
| /** |
|  * Common entry point for the public multi-key put and transform (invoke) operations. |
|  * <p> |
|  * Builds a {@link GridNearAtomicUpdateFuture} over the given keys and maps it, either directly or through |
|  * {@code asyncOp} when {@code async} is set. When the per-call operation context carries a data center ID, |
|  * the plain input map is first swapped for a read-only conflict view whose values carry a version obtained |
|  * for that data center. |
|  * |
|  * @param map Put map. Either {@code map}, {@code invokeMap} or {@code conflictPutMap} should be passed. |
|  * @param invokeMap Invoke map. Either {@code map}, {@code invokeMap} or {@code conflictPutMap} should be passed. |
|  * @param invokeArgs Optional arguments for EntryProcessor. |
|  * @param conflictPutMap Conflict put map. |
|  * @param conflictRmvMap Conflict remove map. |
|  * @param retval Return value required flag. |
|  * @param rawRetval Return {@code GridCacheReturn} instance. |
|  * @param waitTopFut Whether to wait for topology future. |
|  * @param op Cache operation the update future is created for. |
|  * @param async Async operation flag. |
|  * @return Completion future. |
|  */ |
| @SuppressWarnings("ConstantConditions") |
| private IgniteInternalFuture updateAll0( |
|     @Nullable Map<? extends K, ? extends V> map, |
|     @Nullable Map<? extends K, ? extends EntryProcessor> invokeMap, |
|     @Nullable Object[] invokeArgs, |
|     @Nullable Map<KeyCacheObject, GridCacheDrInfo> conflictPutMap, |
|     @Nullable Map<KeyCacheObject, GridCacheVersion> conflictRmvMap, |
|     final boolean retval, |
|     final boolean rawRetval, |
|     final boolean waitTopFut, |
|     final GridCacheOperation op, |
|     boolean async |
| ) { |
|     assert ctx.updatesAllowed(); |
| |
|     if (map != null && keyCheck) |
|         validateCacheKeys(map.keySet()); |
| |
|     ctx.checkSecurity(SecurityPermission.CACHE_PUT); |
| |
|     final CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
|     if (opCtx != null && opCtx.hasDataCenterId()) { |
|         assert conflictPutMap == null : conflictPutMap; |
|         assert conflictRmvMap == null : conflictRmvMap; |
| |
|         if (op == GridCacheOperation.TRANSFORM) { |
|             assert invokeMap != null : invokeMap; |
| |
|             // Pair each processor with a version obtained for the context's data center. |
|             IgniteClosure<EntryProcessor, GridCacheDrInfo> procToDrInfo = |
|                 new IgniteClosure<EntryProcessor, GridCacheDrInfo>() { |
|                     @Override public GridCacheDrInfo apply(EntryProcessor proc) { |
|                         return new GridCacheDrInfo(proc, ctx.versions().next(opCtx.dataCenterId())); |
|                     } |
|                 }; |
| |
|             conflictPutMap = F.viewReadOnly((Map)invokeMap, procToDrInfo); |
| |
|             invokeMap = null; |
|         } |
|         else if (op == GridCacheOperation.DELETE) { |
|             assert map != null : map; |
| |
|             // Removal only needs the version, the value itself is ignored. |
|             IgniteClosure<V, GridCacheVersion> valToVer = new IgniteClosure<V, GridCacheVersion>() { |
|                 @Override public GridCacheVersion apply(V ignored) { |
|                     return ctx.versions().next(opCtx.dataCenterId()); |
|                 } |
|             }; |
| |
|             conflictRmvMap = F.viewReadOnly((Map)map, valToVer); |
| |
|             map = null; |
|         } |
|         else { |
|             assert map != null : map; |
| |
|             // Pair each value, converted to a cache object, with a version for the data center. |
|             IgniteClosure<V, GridCacheDrInfo> valToDrInfo = new IgniteClosure<V, GridCacheDrInfo>() { |
|                 @Override public GridCacheDrInfo apply(V v) { |
|                     return new GridCacheDrInfo(ctx.toCacheObject(v), ctx.versions().next(opCtx.dataCenterId())); |
|                 } |
|             }; |
| |
|             conflictPutMap = F.viewReadOnly((Map)map, valToDrInfo); |
| |
|             map = null; |
|         } |
|     } |
| |
|     UUID subjectId = ctx.subjectIdPerCall(null, opCtx); |
| |
|     int taskHash = ctx.kernalContext().job().currentTaskNameHash(); |
| |
|     // Keys and plain values come from the first map that is present. |
|     Set<?> keys; |
|     Collection<?> vals; |
| |
|     if (map != null) { |
|         keys = map.keySet(); |
|         vals = map.values(); |
|     } |
|     else if (invokeMap != null) { |
|         keys = invokeMap.keySet(); |
|         vals = invokeMap.values(); |
|     } |
|     else { |
|         keys = conflictPutMap != null ? conflictPutMap.keySet() : conflictRmvMap.keySet(); |
|         vals = null; |
|     } |
| |
|     final GridNearAtomicUpdateFuture fut = new GridNearAtomicUpdateFuture( |
|         ctx, |
|         this, |
|         ctx.config().getWriteSynchronizationMode(), |
|         op, |
|         keys, |
|         vals, |
|         invokeArgs, |
|         (Collection)(conflictPutMap != null ? conflictPutMap.values() : null), |
|         conflictRmvMap != null ? conflictRmvMap.values() : null, |
|         retval, |
|         rawRetval, |
|         opCtx != null ? opCtx.expiry() : null, |
|         CU.filterArray(null), |
|         subjectId, |
|         taskHash, |
|         opCtx != null && opCtx.skipStore(), |
|         opCtx != null && opCtx.isKeepBinary(), |
|         opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES, |
|         waitTopFut); |
| |
|     if (!async) { |
|         fut.map(); |
| |
|         return fut; |
|     } |
| |
|     return asyncOp(new CO<IgniteInternalFuture<Object>>() { |
|         @Override public IgniteInternalFuture<Object> apply() { |
|             fut.map(); |
| |
|             return fut; |
|         } |
|     }); |
| } |
| |
| /** |
|  * Entry point for update/invoke with a single key. |
|  * <p> |
|  * Validates the key, checks the {@code CACHE_PUT} permission, creates the single-key update future and |
|  * maps it either directly or through {@code asyncOp}. |
|  * |
|  * @param key Key. |
|  * @param val Value. |
|  * @param proc Entry processor. |
|  * @param invokeArgs Invoke arguments. |
|  * @param retval Return value flag. |
|  * @param filter Filter. |
|  * @param waitTopFut Whether to wait for topology future. |
|  * @param async Async operation flag. |
|  * @return Future. |
|  */ |
| private IgniteInternalFuture update0( |
|     K key, |
|     @Nullable V val, |
|     @Nullable EntryProcessor proc, |
|     @Nullable Object[] invokeArgs, |
|     final boolean retval, |
|     @Nullable final CacheEntryPredicate filter, |
|     final boolean waitTopFut, |
|     boolean async |
| ) { |
|     // A value and a processor must never be supplied together. |
|     assert !(val != null && proc != null); |
| |
|     assert ctx.updatesAllowed(); |
| |
|     validateCacheKey(key); |
| |
|     ctx.checkSecurity(SecurityPermission.CACHE_PUT); |
| |
|     final GridNearAtomicAbstractUpdateFuture fut = createSingleUpdateFuture( |
|         key, |
|         val, |
|         proc, |
|         invokeArgs, |
|         retval, |
|         filter, |
|         waitTopFut); |
| |
|     if (!async) { |
|         fut.map(); |
| |
|         return fut; |
|     } |
| |
|     return asyncOp(new CO<IgniteInternalFuture<Object>>() { |
|         @Override public IgniteInternalFuture<Object> apply() { |
|             fut.map(); |
| |
|             return fut; |
|         } |
|     }); |
| } |
| |
| /** |
|  * Entry point for remove with single key. |
|  * <p> |
|  * Checks the {@code CACHE_REMOVE} permission, creates the single-key update future with neither a value |
|  * nor a processor, and maps it either directly or through {@code asyncOp}. |
|  * |
|  * @param key Key. |
|  * @param retval Return value flag. |
|  * @param filter Filter. |
|  * @param async Async operation flag. |
|  * @return Future. |
|  */ |
| private IgniteInternalFuture remove0(K key, final boolean retval, |
|     @Nullable CacheEntryPredicate filter, |
|     boolean async) { |
|     assert ctx.updatesAllowed(); |
| |
|     ctx.checkSecurity(SecurityPermission.CACHE_REMOVE); |
| |
|     final GridNearAtomicAbstractUpdateFuture fut = |
|         createSingleUpdateFuture(key, null, null, null, retval, filter, true); |
| |
|     if (!async) { |
|         fut.map(); |
| |
|         return fut; |
|     } |
| |
|     return asyncOp(new CO<IgniteInternalFuture<Object>>() { |
|         @Override public IgniteInternalFuture<Object> apply() { |
|             fut.map(); |
| |
|             return fut; |
|         } |
|     }); |
| } |
| |
| /** |
|  * Creates future for single key-val pair update. |
|  * <p> |
|  * The operation is {@code UPDATE} when a value is given, {@code TRANSFORM} when only a processor is given |
|  * and {@code DELETE} otherwise. A {@link GridNearAtomicSingleUpdateFuture} is returned unless the per-call |
|  * operation context carries a data center ID or {@link #isFastMap} holds, in which case a |
|  * {@link GridNearAtomicUpdateFuture} over singleton collections is returned. |
|  * |
|  * @param key Key. |
|  * @param val Value. |
|  * @param proc Processor. |
|  * @param invokeArgs Invoke arguments. |
|  * @param retval Return value flag. |
|  * @param filter Filter. |
|  * @param waitTopFut Whether to wait for topology future. |
|  * @return Future. |
|  */ |
| private GridNearAtomicAbstractUpdateFuture createSingleUpdateFuture( |
|     K key, |
|     @Nullable V val, |
|     @Nullable EntryProcessor proc, |
|     @Nullable Object[] invokeArgs, |
|     boolean retval, |
|     @Nullable CacheEntryPredicate filter, |
|     boolean waitTopFut |
| ) { |
|     CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
|     GridCacheOperation op = val != null ? UPDATE : proc != null ? TRANSFORM : DELETE; |
| |
|     // Payload of the plain (non-conflict) update: the value, else the processor, else nothing. |
|     Object val0 = val != null ? val : proc; |
| |
|     GridCacheDrInfo conflictPutVal = null; |
|     GridCacheVersion conflictRmvVer = null; |
| |
|     if (opCtx != null && opCtx.hasDataCenterId()) { |
|         Byte dcId = opCtx.dataCenterId(); |
| |
|         assert dcId != null; |
| |
|         if (op == DELETE) |
|             conflictRmvVer = ctx.versions().next(dcId); |
|         else { |
|             // The payload travels inside the conflict info, so the plain value is dropped. |
|             if (op == UPDATE) |
|                 conflictPutVal = new GridCacheDrInfo(ctx.toCacheObject(val), ctx.versions().next(dcId)); |
|             else |
|                 conflictPutVal = new GridCacheDrInfo(proc, ctx.versions().next(dcId)); |
| |
|             val0 = null; |
|         } |
|     } |
| |
|     CacheEntryPredicate[] filters = CU.filterArray(filter); |
| |
|     if (conflictPutVal != null || conflictRmvVer != null || isFastMap(filters, op)) { |
|         return new GridNearAtomicUpdateFuture( |
|             ctx, |
|             this, |
|             ctx.config().getWriteSynchronizationMode(), |
|             op, |
|             Collections.singletonList(key), |
|             val0 != null ? Collections.singletonList(val0) : null, |
|             invokeArgs, |
|             conflictPutVal != null ? Collections.singleton(conflictPutVal) : null, |
|             conflictRmvVer != null ? Collections.singleton(conflictRmvVer) : null, |
|             retval, |
|             false, |
|             opCtx != null ? opCtx.expiry() : null, |
|             filters, |
|             ctx.subjectIdPerCall(null, opCtx), |
|             ctx.kernalContext().job().currentTaskNameHash(), |
|             opCtx != null && opCtx.skipStore(), |
|             opCtx != null && opCtx.isKeepBinary(), |
|             opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES, |
|             waitTopFut); |
|     } |
| |
|     return new GridNearAtomicSingleUpdateFuture( |
|         ctx, |
|         this, |
|         ctx.config().getWriteSynchronizationMode(), |
|         op, |
|         key, |
|         val0, |
|         invokeArgs, |
|         retval, |
|         false, |
|         opCtx != null ? opCtx.expiry() : null, |
|         filters, |
|         ctx.subjectIdPerCall(null, opCtx), |
|         ctx.kernalContext().job().currentTaskNameHash(), |
|         opCtx != null && opCtx.skipStore(), |
|         opCtx != null && opCtx.isKeepBinary(), |
|         opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES, |
|         waitTopFut |
|     ); |
| } |
| |
| /** |
| * Whether this is fast-map operation. |
| * |
| * @param filters Filters. |
| * @param op Operation. |
| * @return {@code True} if fast-map. |
| */ |
| public boolean isFastMap(CacheEntryPredicate[] filters, GridCacheOperation op) { |
| return F.isEmpty(filters) && op != TRANSFORM && ctx.config().getWriteSynchronizationMode() == FULL_SYNC && |
| ctx.config().getAtomicWriteOrderMode() == CLOCK && |
| !(ctx.writeThrough() && ctx.config().getInterceptor() != null); |
| } |
| |
| /** |
|  * Entry point for all public API remove methods. |
|  * <p> |
|  * Creates a {@code DELETE} {@link GridNearAtomicUpdateFuture} over either the plain keys or the keys of |
|  * the conflict map and maps it directly or through {@code asyncOp}. When plain keys are given and the |
|  * per-call operation context carries a data center ID, a version for that data center is produced per key. |
|  * |
|  * @param keys Keys to remove. |
|  * @param conflictMap Conflict map. |
|  * @param retval Return value required flag. |
|  * @param rawRetval Return {@code GridCacheReturn} instance. |
|  * @param async Async operation flag. |
|  * @return Completion future. |
|  */ |
| private IgniteInternalFuture removeAllAsync0( |
|     @Nullable Collection<? extends K> keys, |
|     @Nullable Map<KeyCacheObject, GridCacheVersion> conflictMap, |
|     final boolean retval, |
|     boolean rawRetval, |
|     boolean async |
| ) { |
|     assert ctx.updatesAllowed(); |
| |
|     assert keys != null || conflictMap != null; |
| |
|     if (keyCheck) |
|         validateCacheKeys(keys); |
| |
|     ctx.checkSecurity(SecurityPermission.CACHE_REMOVE); |
| |
|     final CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
|     UUID subjectId = ctx.subjectIdPerCall(null, opCtx); |
| |
|     int taskHash = ctx.kernalContext().job().currentTaskNameHash(); |
| |
|     Collection<GridCacheVersion> rmvVers = null; |
| |
|     if (opCtx != null && keys != null && opCtx.hasDataCenterId()) { |
|         assert conflictMap == null : conflictMap; |
| |
|         C1<K, GridCacheVersion> keyToVer = new C1<K, GridCacheVersion>() { |
|             @Override public GridCacheVersion apply(K ignored) { |
|                 return ctx.versions().next(opCtx.dataCenterId()); |
|             } |
|         }; |
| |
|         rmvVers = F.transform(keys, keyToVer); |
|     } |
| |
|     // Without plain keys both the keys and the versions come from the conflict map. |
|     Collection<?> rmvKeys = keys; |
| |
|     if (keys == null) { |
|         rmvKeys = conflictMap.keySet(); |
| |
|         if (rmvVers == null) |
|             rmvVers = conflictMap.values(); |
|     } |
| |
|     final GridNearAtomicUpdateFuture fut = new GridNearAtomicUpdateFuture( |
|         ctx, |
|         this, |
|         ctx.config().getWriteSynchronizationMode(), |
|         DELETE, |
|         rmvKeys, |
|         null, |
|         null, |
|         null, |
|         rmvVers, |
|         retval, |
|         rawRetval, |
|         opCtx != null ? opCtx.expiry() : null, |
|         CU.filterArray(null), |
|         subjectId, |
|         taskHash, |
|         opCtx != null && opCtx.skipStore(), |
|         opCtx != null && opCtx.isKeepBinary(), |
|         opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES, |
|         true); |
| |
|     if (!async) { |
|         fut.map(); |
| |
|         return fut; |
|     } |
| |
|     return asyncOp(new CO<IgniteInternalFuture<Object>>() { |
|         @Override public IgniteInternalFuture<Object> apply() { |
|             fut.map(); |
| |
|             return fut; |
|         } |
|     }); |
| } |
| |
| /** |
|  * Entry point to all public API single get methods. |
|  * <p> |
|  * Picks the topology version (current affinity version when remap is allowed, otherwise the ready |
|  * affinity version of the exchange manager), resolves the expiry policy unless values are skipped, |
|  * then creates and initializes a {@link GridPartitionedSingleGetFuture}. |
|  * |
|  * @param key Key. |
|  * @param forcePrimary Force primary flag. |
|  * @param subjId Subject ID. |
|  * @param taskName Task name. |
|  * @param deserializeBinary Deserialize binary flag. |
|  * @param expiryPlc Expiry policy. |
|  * @param skipVals Skip values flag. |
|  * @param skipStore Skip store flag. |
|  * @param canRemap Can remap flag. |
|  * @param needVer Need version. |
|  * @return Get future. |
|  */ |
| private IgniteInternalFuture<V> getAsync0(KeyCacheObject key, |
|     boolean forcePrimary, |
|     UUID subjId, |
|     String taskName, |
|     boolean deserializeBinary, |
|     @Nullable ExpiryPolicy expiryPlc, |
|     boolean skipVals, |
|     boolean skipStore, |
|     boolean canRemap, |
|     boolean needVer |
| ) { |
|     AffinityTopologyVersion topVer; |
| |
|     if (canRemap) |
|         topVer = ctx.affinity().affinityTopologyVersion(); |
|     else |
|         topVer = ctx.shared().exchange().readyAffinityVersion(); |
| |
|     // No expiry policy is needed when values are skipped. |
|     IgniteCacheExpiryPolicy expiry = null; |
| |
|     if (!skipVals) |
|         expiry = expiryPolicy(expiryPlc); |
| |
|     GridPartitionedSingleGetFuture getFut = new GridPartitionedSingleGetFuture( |
|         ctx, |
|         key, |
|         topVer, |
|         !skipStore, |
|         forcePrimary, |
|         subjId, |
|         taskName, |
|         deserializeBinary, |
|         expiry, |
|         skipVals, |
|         canRemap, |
|         needVer, |
|         false); |
| |
|     getFut.init(); |
| |
|     return (IgniteInternalFuture<V>)getFut; |
| } |
| |
| /** |
| * Entry point to all public API get methods. |
| * <p> |
| * When {@code forcePrimary} is not set and this node is an affinity node, the method first tries to |
| * resolve every key from local entries and returns an already finished future if all of them are found. |
| * As soon as one key cannot be resolved locally the local attempt is abandoned, the expiry policy (if any) |
| * is reset and a {@link GridPartitionedGetFuture} is created and initialized for the whole key set. |
| * <p> |
| * Each entry obtained during the local attempt is touched through {@code ctx.evicts().touch} in a |
| * {@code finally} block, and a {@code GridCacheEntryRemovedException} makes the lookup of that key retry. |
| * <p> |
| * NOTE(review): {@code keys} is annotated {@code @Nullable}, yet the local path calls {@code keys.size()} |
| * and iterates over it without a null check - confirm that callers never pass {@code null}. |
| * <p> |
| * NOTE(review): the versioned read passes a constant {@code true} as its last argument where the plain |
| * read passes {@code !deserializeBinary} - confirm this difference is intended. |
| * |
| * @param keys Keys. |
| * @param forcePrimary Force primary flag. |
| * @param subjId Subject ID. |
| * @param taskName Task name. |
| * @param deserializeBinary Deserialize binary flag. |
| * @param expiryPlc Expiry policy. |
| * @param skipVals Skip values flag. |
| * @param skipStore Skip store flag. |
| * @param canRemap Can remap flag; selects between the current affinity topology version and the ready |
| * affinity version of the exchange manager. |
| * @param needVer Need version. |
| * @return Get future. |
| */ |
| private IgniteInternalFuture<Map<K, V>> getAllAsync0(@Nullable Collection<KeyCacheObject> keys, |
| boolean forcePrimary, |
| UUID subjId, |
| String taskName, |
| boolean deserializeBinary, |
| @Nullable ExpiryPolicy expiryPlc, |
| boolean skipVals, |
| boolean skipStore, |
| boolean canRemap, |
| boolean needVer |
| ) { |
| AffinityTopologyVersion topVer = canRemap ? ctx.affinity().affinityTopologyVersion() : |
| ctx.shared().exchange().readyAffinityVersion(); |
| |
| final IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc); |
| |
| // Optimisation: try to resolve value locally and escape 'get future' creation. |
| if (!forcePrimary && ctx.affinityNode()) { |
| Map<K, V> locVals = U.newHashMap(keys.size()); |
| |
| boolean success = true; |
| |
| // Optimistically expect that all keys are available locally (avoid creation of get future). |
| for (KeyCacheObject key : keys) { |
| GridCacheEntryEx entry = null; |
| |
| while (true) { |
| try { |
| entry = ctx.isSwapOrOffheapEnabled() ? entryEx(key) : peekEx(key); |
| |
| // If our DHT cache does have the value, then we peek it. |
| if (entry != null) { |
| boolean isNew = entry.isNewLocked(); |
| |
| CacheObject v = null; |
| GridCacheVersion ver = null; |
| |
| if (needVer) { |
| T2<CacheObject, GridCacheVersion> res = entry.innerGetVersioned( |
| null, |
| null, |
| /*swap*/true, |
| /*unmarshal*/true, |
| /*update-metrics*/false, |
| /*event*/!skipVals, |
| subjId, |
| null, |
| taskName, |
| expiry, |
| true); |
| |
| if (res != null) { |
| v = res.get1(); |
| ver = res.get2(); |
| } |
| } |
| else { |
| v = entry.innerGet(null, |
| null, |
| /*swap*/true, |
| /*read-through*/false, |
| /*update-metrics*/false, |
| /*event*/!skipVals, |
| /*temporary*/false, |
| subjId, |
| null, |
| taskName, |
| expiry, |
| !deserializeBinary); |
| } |
| |
| // Entry was not in memory or in swap, so we remove it from cache (only if it was newly created and is still empty). |
| if (v == null) { |
| GridCacheVersion obsoleteVer = context().versions().next(); |
| |
| if (isNew && entry.markObsoleteIfEmpty(obsoleteVer)) |
| removeEntry(entry); |
| |
| success = false; |
| } |
| else |
| ctx.addResult(locVals, key, v, skipVals, false, deserializeBinary, true, ver); |
| } |
| else |
| success = false; |
| |
| break; // While. |
| } |
| catch (GridCacheEntryRemovedException ignored) { |
| // No-op, retry. |
| } |
| catch (GridDhtInvalidPartitionException ignored) { |
| success = false; |
| |
| break; // While. |
| } |
| catch (IgniteCheckedException e) { |
| return new GridFinishedFuture<>(e); |
| } |
| finally { |
| if (entry != null) |
| ctx.evicts().touch(entry, topVer); |
| } |
| } |
| |
| if (!success) |
| break; |
| else if (!skipVals && ctx.config().isStatisticsEnabled()) |
| metrics0().onRead(true); |
| } |
| |
| if (success) { |
| sendTtlUpdateRequest(expiry); |
| |
| return new GridFinishedFuture<>(locVals); |
| } |
| } |
| |
| if (expiry != null) |
| expiry.reset(); |
| |
| // Either reload or not all values are available locally. |
| GridPartitionedGetFuture<K, V> fut = new GridPartitionedGetFuture<>(ctx, |
| keys, |
| topVer, |
| !skipStore, |
| forcePrimary, |
| subjId, |
| taskName, |
| deserializeBinary, |
| expiry, |
| skipVals, |
| canRemap, |
| needVer, |
| false); |
| |
| fut.init(); |
| |
| return fut; |
| } |
| |
| /** |
|  * Executes local update. |
|  * <p> |
|  * Asks the preloader for the request's keys first. If that yields no future or an already completed one, |
|  * the update runs immediately on the calling thread; otherwise it runs from a listener once the future |
|  * completes. A {@link NodeStoppingException} from the future silently ends processing, any other |
|  * {@link IgniteCheckedException} is reported through {@code onForceKeysError}. |
|  * |
|  * @param nodeId Node ID. |
|  * @param req Update request. |
|  * @param completionCb Completion callback. |
|  */ |
| public void updateAllAsyncInternal( |
|     final UUID nodeId, |
|     final GridNearAtomicAbstractUpdateRequest req, |
|     final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb |
| ) { |
|     IgniteInternalFuture<Object> forceFut = preldr.request(req, req.topologyVersion()); |
| |
|     // Still pending: continue from the listener once the future is done. |
|     if (forceFut != null && !forceFut.isDone()) { |
|         forceFut.listen(new CI1<IgniteInternalFuture<Object>>() { |
|             @Override public void apply(IgniteInternalFuture<Object> doneFut) { |
|                 try { |
|                     doneFut.get(); |
|                 } |
|                 catch (NodeStoppingException ignored) { |
|                     return; |
|                 } |
|                 catch (IgniteCheckedException err) { |
|                     onForceKeysError(nodeId, req, completionCb, err); |
| |
|                     return; |
|                 } |
| |
|                 updateAllAsyncInternal0(nodeId, req, completionCb); |
|             } |
|         }); |
| |
|         return; |
|     } |
| |
|     try { |
|         if (forceFut != null) |
|             forceFut.get(); |
|     } |
|     catch (NodeStoppingException ignored) { |
|         return; |
|     } |
|     catch (IgniteCheckedException err) { |
|         onForceKeysError(nodeId, req, completionCb, err); |
| |
|         return; |
|     } |
| |
|     updateAllAsyncInternal0(nodeId, req, completionCb); |
| } |
| |
| /** |
|  * Completes the request with a response in which every key of the request is marked as failed |
|  * with the given error. |
|  * |
|  * @param nodeId Node ID. |
|  * @param req Update request. |
|  * @param completionCb Completion callback. |
|  * @param e Error. |
|  */ |
| private void onForceKeysError(final UUID nodeId, |
|     final GridNearAtomicAbstractUpdateRequest req, |
|     final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb, |
|     IgniteCheckedException e |
| ) { |
|     GridNearAtomicUpdateResponse failedRes = new GridNearAtomicUpdateResponse( |
|         ctx.cacheId(), nodeId, req.futureVersion(), ctx.deploymentEnabled()); |
| |
|     failedRes.addFailedKeys(req.keys(), e); |
| |
|     completionCb.apply(req, failedRes); |
| } |
| |
| /** |
| * Executes local update after preloader fetched values. |
| * <p> |
| * Flow, as implemented below: |
| * <ol> |
| * <li>The entries for the request are locked, then the topology read lock is taken.</li> |
| * <li>If the topology is stopping, every key is failed, the callback is invoked and the method returns.</li> |
| * <li>Unless a remap is required, the update version is taken from the request or, when absent, generated |
| * from the current topology version (and reported as the near version if the originating node has a |
| * near cache). The update is then applied through {@code updateWithBatch} when the request has several |
| * keys, write-through is enabled and not skipped, the store is not local and DR receive is disabled; |
| * otherwise through {@code updateSingle}.</li> |
| * <li>The topology read lock is released, entries are unlocked and deferred deletes are enqueued.</li> |
| * <li>Finally the request is either answered with remap keys, handed to the DHT future for backup |
| * mapping, or completed through the callback, and a TTL update request is sent.</li> |
| * </ol> |
| * Any {@code Throwable} other than {@code GridDhtInvalidPartitionException} is logged, fails all keys, |
| * completes the callback and is rethrown only if it is an {@code Error}. |
| * <p> |
| * NOTE(review): when the originating node has already left ({@code node == null}) the method returns |
| * without invoking {@code completionCb} and without sending the TTL update - confirm this is intended. |
| * <p> |
| * NOTE(review): the {@code GridCacheEntryRemovedException} handler uses {@code printStackTrace()} instead |
| * of the logger used elsewhere in this method. |
| * |
| * @param nodeId Node ID. |
| * @param req Update request. |
| * @param completionCb Completion callback. |
| */ |
| private void updateAllAsyncInternal0( |
| UUID nodeId, |
| GridNearAtomicAbstractUpdateRequest req, |
| CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb |
| ) { |
| GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), nodeId, req.futureVersion(), |
| ctx.deploymentEnabled()); |
| |
| assert !req.returnValue() || (req.operation() == TRANSFORM || req.size() == 1); |
| |
| GridDhtAtomicAbstractUpdateFuture dhtFut = null; |
| |
| boolean remap = false; |
| |
| String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash()); |
| |
| IgniteCacheExpiryPolicy expiry = null; |
| |
| try { |
| // If batch store update is enabled, we need to lock all entries. |
| // First, need to acquire locks on cache entries, then check filter. |
| List<GridDhtCacheEntry> locked = lockEntries(req, req.topologyVersion()); |
| |
| Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null; |
| |
| try { |
| GridDhtPartitionTopology top = topology(); |
| |
| top.readLock(); |
| |
| try { |
| if (top.stopping()) { |
| res.addFailedKeys(req.keys(), new IgniteCheckedException("Failed to perform cache operation " + |
| "(cache is stopped): " + name())); |
| |
| completionCb.apply(req, res); |
| |
| return; |
| } |
| |
| // Do not check topology version for CLOCK versioning since |
| // partition exchange will wait for near update future (if future is on server node). |
| // Also do not check topology version if topology was locked on near node by |
| // external transaction or explicit lock. |
| if ((req.fastMap() && !req.clientRequest()) || req.topologyLocked() || |
| !needRemap(req.topologyVersion(), top.topologyVersion())) { |
| ClusterNode node = ctx.discovery().node(nodeId); |
| |
| if (node == null) { |
| U.warn(msgLog, "Skip near update request, node originated update request left [" + |
| "futId=" + req.futureVersion() + ", node=" + nodeId + ']'); |
| |
| return; |
| } |
| |
| boolean hasNear = ctx.discovery().cacheNearNode(node, name()); |
| |
| GridCacheVersion ver = req.updateVersion(); |
| |
| if (ver == null) { |
| // Assign next version for update inside entries lock. |
| ver = ctx.versions().next(top.topologyVersion()); |
| |
| if (hasNear) |
| res.nearVersion(ver); |
| |
| if (msgLog.isDebugEnabled()) { |
| msgLog.debug("Assigned update version [futId=" + req.futureVersion() + |
| ", writeVer=" + ver + ']'); |
| } |
| } |
| |
| assert ver != null : "Got null version for update request: " + req; |
| |
| boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion()); |
| |
| dhtFut = createDhtFuture(ver, req, res, completionCb, false); |
| |
| expiry = expiryPolicy(req.expiry()); |
| |
| GridCacheReturn retVal = null; |
| |
| if (req.size() > 1 && // Several keys ... |
| writeThrough() && !req.skipStore() && // and store is enabled ... |
| !ctx.store().isLocal() && // and this is not local store ... |
| // (conflict resolver should be used for local store) |
| !ctx.dr().receiveEnabled() // and no DR. |
| ) { |
| // This method can only be used when there are no replicated entries in the batch. |
| UpdateBatchResult updRes = updateWithBatch(node, |
| hasNear, |
| req, |
| res, |
| locked, |
| ver, |
| dhtFut, |
| completionCb, |
| ctx.isDrEnabled(), |
| taskName, |
| expiry, |
| sndPrevVal); |
| |
| deleted = updRes.deleted(); |
| dhtFut = updRes.dhtFuture(); |
| |
| if (req.operation() == TRANSFORM) |
| retVal = updRes.invokeResults(); |
| } |
| else { |
| UpdateSingleResult updRes = updateSingle(node, |
| hasNear, |
| req, |
| res, |
| locked, |
| ver, |
| dhtFut, |
| completionCb, |
| ctx.isDrEnabled(), |
| taskName, |
| expiry, |
| sndPrevVal); |
| |
| retVal = updRes.returnValue(); |
| deleted = updRes.deleted(); |
| dhtFut = updRes.dhtFuture(); |
| } |
| |
| if (retVal == null) |
| retVal = new GridCacheReturn(ctx, node.isLocal(), true, null, true); |
| |
| res.returnValue(retVal); |
| |
| if (req.writeSynchronizationMode() != FULL_ASYNC) |
| req.cleanup(!node.isLocal()); |
| |
| if (dhtFut != null) |
| ctx.mvcc().addAtomicFuture(dhtFut.version(), dhtFut); |
| } |
| else |
| // Should remap all keys. |
| remap = true; |
| } |
| finally { |
| top.readUnlock(); |
| } |
| } |
| catch (GridCacheEntryRemovedException e) { |
| assert false : "Entry should not become obsolete while holding lock."; |
| |
| e.printStackTrace(); |
| } |
| finally { |
| if (locked != null) |
| unlockEntries(locked, req.topologyVersion()); |
| |
| // Enqueue if necessary after locks release. |
| if (deleted != null) { |
| assert !deleted.isEmpty(); |
| assert ctx.deferredDelete() : this; |
| |
| for (IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion> e : deleted) |
| ctx.onDeferredDelete(e.get1(), e.get2()); |
| } |
| } |
| } |
| catch (GridDhtInvalidPartitionException ignore) { |
| assert !req.fastMap() || req.clientRequest() : req; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Caught invalid partition exception for cache entry (will remap update request): " + req); |
| |
| remap = true; |
| } |
| catch (Throwable e) { |
| // At least RuntimeException can be thrown by the code above when GridCacheContext is cleaned and there is |
| // an attempt to use cleaned resources. |
| U.error(log, "Unexpected exception during cache update", e); |
| |
| res.addFailedKeys(req.keys(), e); |
| |
| completionCb.apply(req, res); |
| |
| if (e instanceof Error) |
| throw e; |
| |
| return; |
| } |
| |
| if (remap) { |
| assert dhtFut == null; |
| |
| res.remapKeys(req.keys()); |
| |
| completionCb.apply(req, res); |
| } |
| else { |
| // If there are backups, map backup update future. |
| if (dhtFut != null) |
| dhtFut.map(); |
| // Otherwise, complete the call. |
| else |
| completionCb.apply(req, res); |
| } |
| |
| sendTtlUpdateRequest(expiry); |
| } |
| |
| /** |
| * Updates locked entries using batched write-through. |
| * |
| * @param node Sender node. |
| * @param hasNear {@code True} if originating node has near cache. |
| * @param req Update request. |
| * @param res Update response. |
| * @param locked Locked entries. |
| * @param ver Assigned version. |
| * @param dhtFut Optional DHT future. |
| * @param completionCb Completion callback to invoke when DHT future is completed. |
| * @param replicate Whether replication is enabled. |
| * @param taskName Task name. |
| * @param expiry Expiry policy. |
| * @param sndPrevVal If {@code true} sends previous value to backups. |
| * @return Deleted entries. |
| * @throws GridCacheEntryRemovedException Should not be thrown. |
| */ |
| @SuppressWarnings("unchecked") |
| private UpdateBatchResult updateWithBatch( |
| final ClusterNode node, |
| final boolean hasNear, |
| final GridNearAtomicAbstractUpdateRequest req, |
| final GridNearAtomicUpdateResponse res, |
| final List<GridDhtCacheEntry> locked, |
| final GridCacheVersion ver, |
| @Nullable GridDhtAtomicAbstractUpdateFuture dhtFut, |
| final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb, |
| final boolean replicate, |
| final String taskName, |
| @Nullable final IgniteCacheExpiryPolicy expiry, |
| final boolean sndPrevVal |
| ) throws GridCacheEntryRemovedException { |
| assert !ctx.dr().receiveEnabled(); // Cannot update in batches during DR due to possible conflicts. |
| assert !req.returnValue() || req.operation() == TRANSFORM; // Should not request return values for putAll. |
| |
| if (!F.isEmpty(req.filter()) && ctx.loadPreviousValue()) { |
| try { |
| reloadIfNeeded(locked); |
| } |
| catch (IgniteCheckedException e) { |
| res.addFailedKeys(req.keys(), e); |
| |
| return new UpdateBatchResult(); |
| } |
| } |
| |
| int size = req.size(); |
| |
| Map<KeyCacheObject, CacheObject> putMap = null; |
| |
| Map<KeyCacheObject, EntryProcessor<Object, Object, Object>> entryProcessorMap = null; |
| |
| Collection<KeyCacheObject> rmvKeys = null; |
| |
| List<CacheObject> writeVals = null; |
| |
| UpdateBatchResult updRes = new UpdateBatchResult(); |
| |
| List<GridDhtCacheEntry> filtered = new ArrayList<>(size); |
| |
| GridCacheOperation op = req.operation(); |
| |
| GridCacheReturn invokeRes = null; |
| |
| int firstEntryIdx = 0; |
| |
| boolean intercept = ctx.config().getInterceptor() != null; |
| |
| for (int i = 0; i < locked.size(); i++) { |
| GridDhtCacheEntry entry = locked.get(i); |
| |
| if (entry == null) |
| continue; |
| |
| try { |
| if (!checkFilter(entry, req, res)) { |
| if (expiry != null && entry.hasValue()) { |
| long ttl = expiry.forAccess(); |
| |
| if (ttl != CU.TTL_NOT_CHANGED) { |
| entry.updateTtl(null, ttl); |
| |
| expiry.ttlUpdated(entry.key(), |
| entry.version(), |
| entry.readers()); |
| } |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Entry did not pass the filter (will skip write) [entry=" + entry + |
| ", filter=" + Arrays.toString(req.filter()) + ", res=" + res + ']'); |
| |
| if (hasNear) |
| res.addSkippedIndex(i); |
| |
| firstEntryIdx++; |
| |
| continue; |
| } |
| |
| if (op == TRANSFORM) { |
| EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(i); |
| |
| CacheObject old = entry.innerGet( |
| ver, |
| null, |
| /*read swap*/true, |
| /*read through*/true, |
| /*metrics*/true, |
| /*event*/true, |
| /*temporary*/true, |
| req.subjectId(), |
| entryProcessor, |
| taskName, |
| null, |
| req.keepBinary()); |
| |
| Object oldVal = null; |
| Object updatedVal = null; |
| |
| CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry(entry.key(), old, |
| entry.version(), req.keepBinary(), entry); |
| |
| CacheObject updated; |
| |
| try { |
| Object computed = entryProcessor.process(invokeEntry, req.invokeArguments()); |
| |
| if (computed != null) { |
| if (invokeRes == null) |
| invokeRes = new GridCacheReturn(node.isLocal()); |
| |
| computed = ctx.unwrapTemporary(computed); |
| |
| invokeRes.addEntryProcessResult(ctx, entry.key(), invokeEntry.key(), computed, null, |
| req.keepBinary()); |
| } |
| |
| if (!invokeEntry.modified()) |
| continue; |
| |
| updatedVal = ctx.unwrapTemporary(invokeEntry.getValue()); |
| |
| updated = ctx.toCacheObject(updatedVal); |
| } |
| catch (Exception e) { |
| if (invokeRes == null) |
| invokeRes = new GridCacheReturn(node.isLocal()); |
| |
| invokeRes.addEntryProcessResult(ctx, entry.key(), invokeEntry.key(), null, e, req.keepBinary()); |
| |
| updated = old; |
| } |
| |
| if (updated == null) { |
| if (intercept) { |
| CacheLazyEntry e = new CacheLazyEntry(ctx, entry.key(), invokeEntry.key(), old, oldVal, req.keepBinary()); |
| |
| IgniteBiTuple<Boolean, ?> interceptorRes = ctx.config().getInterceptor().onBeforeRemove(e); |
| |
| if (ctx.cancelRemove(interceptorRes)) |
| continue; |
| } |
| |
| // Update previous batch. |
| if (putMap != null) { |
| dhtFut = updatePartialBatch( |
| hasNear, |
| firstEntryIdx, |
| filtered, |
| ver, |
| node, |
| writeVals, |
| putMap, |
| null, |
| entryProcessorMap, |
| dhtFut, |
| completionCb, |
| req, |
| res, |
| replicate, |
| updRes, |
| taskName, |
| expiry, |
| sndPrevVal); |
| |
| firstEntryIdx = i; |
| |
| putMap = null; |
| writeVals = null; |
| entryProcessorMap = null; |
| |
| filtered = new ArrayList<>(); |
| } |
| |
| // Start collecting new batch. |
| if (rmvKeys == null) |
| rmvKeys = new ArrayList<>(size); |
| |
| rmvKeys.add(entry.key()); |
| } |
| else { |
| if (intercept) { |
| CacheLazyEntry e = new CacheLazyEntry(ctx, entry.key(), invokeEntry.key(), old, oldVal, req.keepBinary()); |
| |
| Object val = ctx.config().getInterceptor().onBeforePut(e, updatedVal); |
| |
| if (val == null) |
| continue; |
| |
| updated = ctx.toCacheObject(ctx.unwrapTemporary(val)); |
| } |
| |
| // Update previous batch. |
| if (rmvKeys != null) { |
| dhtFut = updatePartialBatch( |
| hasNear, |
| firstEntryIdx, |
| filtered, |
| ver, |
| node, |
| null, |
| null, |
| rmvKeys, |
| entryProcessorMap, |
| dhtFut, |
| completionCb, |
| req, |
| res, |
| replicate, |
| updRes, |
| taskName, |
| expiry, |
| sndPrevVal); |
| |
| firstEntryIdx = i; |
| |
| rmvKeys = null; |
| entryProcessorMap = null; |
| |
| filtered = new ArrayList<>(); |
| } |
| |
| if (putMap == null) { |
| putMap = new LinkedHashMap<>(size, 1.0f); |
| writeVals = new ArrayList<>(size); |
| } |
| |
| putMap.put(entry.key(), updated); |
| writeVals.add(updated); |
| } |
| |
| if (entryProcessorMap == null) |
| entryProcessorMap = new HashMap<>(); |
| |
| entryProcessorMap.put(entry.key(), entryProcessor); |
| } |
| else if (op == UPDATE) { |
| CacheObject updated = req.value(i); |
| |
| if (intercept) { |
| CacheObject old = entry.innerGet( |
| null, |
| null, |
| /*read swap*/true, |
| /*read through*/ctx.loadPreviousValue(), |
| /*metrics*/true, |
| /*event*/true, |
| /*temporary*/true, |
| req.subjectId(), |
| null, |
| taskName, |
| null, |
| req.keepBinary()); |
| |
| Object val = ctx.config().getInterceptor().onBeforePut( |
| new CacheLazyEntry( |
| ctx, |
| entry.key(), |
| old, |
| req.keepBinary()), |
| ctx.unwrapBinaryIfNeeded( |
| updated, |
| req.keepBinary(), |
| false)); |
| |
| if (val == null) |
| continue; |
| |
| updated = ctx.toCacheObject(ctx.unwrapTemporary(val)); |
| } |
| |
| assert updated != null; |
| |
| if (putMap == null) { |
| putMap = new LinkedHashMap<>(size, 1.0f); |
| writeVals = new ArrayList<>(size); |
| } |
| |
| putMap.put(entry.key(), updated); |
| writeVals.add(updated); |
| } |
| else { |
| assert op == DELETE; |
| |
| if (intercept) { |
| CacheObject old = entry.innerGet( |
| null, |
| null, |
| /*read swap*/true, |
| /*read through*/ctx.loadPreviousValue(), |
| /*metrics*/true, |
| /*event*/true, |
| /*temporary*/true, |
| req.subjectId(), |
| null, |
| taskName, |
| null, |
| req.keepBinary()); |
| |
| IgniteBiTuple<Boolean, ?> interceptorRes = ctx.config().getInterceptor() |
| .onBeforeRemove(new CacheLazyEntry(ctx, entry.key(), old, req.keepBinary())); |
| |
| if (ctx.cancelRemove(interceptorRes)) |
| continue; |
| } |
| |
| if (rmvKeys == null) |
| rmvKeys = new ArrayList<>(size); |
| |
| rmvKeys.add(entry.key()); |
| } |
| |
| filtered.add(entry); |
| } |
| catch (IgniteCheckedException e) { |
| res.addFailedKey(entry.key(), e); |
| } |
| } |
| |
| // Store final batch. |
| if (putMap != null || rmvKeys != null) { |
| dhtFut = updatePartialBatch( |
| hasNear, |
| firstEntryIdx, |
| filtered, |
| ver, |
| node, |
| writeVals, |
| putMap, |
| rmvKeys, |
| entryProcessorMap, |
| dhtFut, |
| completionCb, |
| req, |
| res, |
| replicate, |
| updRes, |
| taskName, |
| expiry, |
| sndPrevVal); |
| } |
| else |
| assert filtered.isEmpty(); |
| |
| updRes.dhtFuture(dhtFut); |
| |
| updRes.invokeResult(invokeRes); |
| |
| return updRes; |
| } |
| |
| /** |
| * @param entries Entries. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void reloadIfNeeded(final List<GridDhtCacheEntry> entries) throws IgniteCheckedException { |
| Map<KeyCacheObject, Integer> needReload = null; |
| |
| for (int i = 0; i < entries.size(); i++) { |
| GridDhtCacheEntry entry = entries.get(i); |
| |
| if (entry == null) |
| continue; |
| |
| CacheObject val = entry.rawGetOrUnmarshal(false); |
| |
| if (val == null) { |
| if (needReload == null) |
| needReload = new HashMap<>(entries.size(), 1.0f); |
| |
| needReload.put(entry.key(), i); |
| } |
| } |
| |
| if (needReload != null) { |
| final Map<KeyCacheObject, Integer> idxMap = needReload; |
| |
| ctx.store().loadAll(null, needReload.keySet(), new CI2<KeyCacheObject, Object>() { |
| @Override public void apply(KeyCacheObject k, Object v) { |
| Integer idx = idxMap.get(k); |
| |
| if (idx != null) { |
| GridDhtCacheEntry entry = entries.get(idx); |
| |
| try { |
| GridCacheVersion ver = entry.version(); |
| |
| entry.versionedValue(ctx.toCacheObject(v), null, ver, null); |
| } |
| catch (GridCacheEntryRemovedException e) { |
| assert false : "Entry should not get obsolete while holding lock [entry=" + entry + |
| ", e=" + e + ']'; |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| } |
| }); |
| } |
| } |
| |
/**
 * Updates locked entries one-by-one.
 * <p>
 * For every key of the request the corresponding locked entry is updated through {@code innerUpdate(...)}.
 * The outcome is then registered with the DHT future (if there is one), reflected in the near response
 * when the originating node has a near cache, and accumulated into the result returned to the caller.
 * An {@link IgniteCheckedException} raised for a key is recorded with {@code res.addFailedKey(...)} and
 * does not stop processing of the remaining keys.
 *
 * @param node Originating node.
 * @param hasNear {@code True} if originating node has near cache.
 * @param req Update request.
 * @param res Update response.
 * @param locked Locked entries.
 * @param ver Assigned update version.
 * @param dhtFut Optional DHT future.
 * @param completionCb Completion callback to invoke when DHT future is completed.
 * @param replicate Whether DR is enabled for that cache.
 * @param taskName Task name.
 * @param expiry Expiry policy.
 * @param sndPrevVal If {@code true} sends previous value to backups.
 * @return Holder of the operation return value, the entries that reported a remove version and the DHT
 *      future (possibly created by this method).
 * @throws GridCacheEntryRemovedException Should be never thrown.
 */
private UpdateSingleResult updateSingle(
    ClusterNode node,
    boolean hasNear,
    GridNearAtomicAbstractUpdateRequest req,
    GridNearAtomicUpdateResponse res,
    List<GridDhtCacheEntry> locked,
    GridCacheVersion ver,
    @Nullable GridDhtAtomicAbstractUpdateFuture dhtFut,
    CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb,
    boolean replicate,
    String taskName,
    @Nullable IgniteCacheExpiryPolicy expiry,
    boolean sndPrevVal
) throws GridCacheEntryRemovedException {
    GridCacheReturn retVal = null;

    // Entries for which the update reported a remove version (collection is created lazily).
    Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null;

    AffinityTopologyVersion topVer = req.topologyVersion();

    // Entry readers are inspected when the originator has a near cache or hasNearCache() reports one
    // for this cache on the request topology version.
    boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), topVer);

    // Set to true below when the DHT future is created only because an entry has near readers;
    // in that case entries are not added to the future as backup writes.
    boolean readersOnly = false;

    boolean intercept = ctx.config().getInterceptor() != null;

    // Avoid iterator creation.
    for (int i = 0; i < req.size(); i++) {
        KeyCacheObject k = req.key(i);

        GridCacheOperation op = req.operation();

        // We are holding java-level locks on entries at this point.
        // No GridCacheEntryRemovedException can be thrown.
        try {
            GridDhtCacheEntry entry = locked.get(i);

            // No entry was locked for this key (lockEntries() stores null for an invalid partition
            // in CLOCK mode).
            if (entry == null)
                continue;

            GridCacheVersion newConflictVer = req.conflictVersion(i);
            long newConflictTtl = req.conflictTtl(i);
            long newConflictExpireTime = req.conflictExpireTime(i);

            assert !(newConflictVer instanceof GridCacheVersionEx) : newConflictVer;

            // Without fast mapping the update is always treated as primary; with fast mapping
            // the local node has to be the affinity primary for the entry partition.
            boolean primary = !req.fastMap() || ctx.affinity().primary(ctx.localNode(), entry.partition(),
                req.topologyVersion());

            // For TRANSFORM the "value" handed to the entry is the entry processor itself.
            Object writeVal = op == TRANSFORM ? req.entryProcessor(i) : req.writeValue(i);

            Collection<UUID> readers = null;
            Collection<UUID> filteredReaders = null;

            if (checkReaders) {
                readers = entry.readers();

                // Readers other than the originating node.
                filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id()));
            }

            GridCacheUpdateAtomicResult updRes = entry.innerUpdate(
                ver,
                node.id(),
                locNodeId,
                op,
                writeVal,
                req.invokeArguments(),
                /*write-through*/(primary || (ctx.store().isLocal() && !ctx.shared().localStorePrimaryOnly()))
                    && writeThrough() && !req.skipStore(),
                /*read-through*/!req.skipStore(),
                /*retval*/sndPrevVal || req.returnValue(),
                req.keepBinary(),
                expiry,
                /*event*/true,
                /*metrics*/true,
                primary,
                ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node.
                topVer,
                req.filter(),
                replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE,
                newConflictTtl,
                newConflictExpireTime,
                newConflictVer,
                /*conflict resolve*/true,
                intercept,
                req.subjectId(),
                taskName,
                /*previous value*/null,
                /*update counter*/null,
                dhtFut);

            // There is no DHT future yet, but other nodes are registered as readers of the entry:
            // force creation of a future that will carry near updates only.
            if (dhtFut == null && !F.isEmpty(filteredReaders)) {
                dhtFut = createDhtFuture(ver, req, res, completionCb, true);

                readersOnly = true;
            }

            if (dhtFut != null) {
                if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios.
                    GridCacheVersionConflictContext<?, ?> conflictCtx = updRes.conflictResolveResult();

                    if (conflictCtx == null)
                        newConflictVer = null;
                    else if (conflictCtx.isMerge())
                        newConflictVer = null; // Conflict version is discarded in case of merge.

                    // Always null here: the resulting value, not the processor, is sent.
                    EntryProcessor<Object, Object, Object> entryProcessor = null;

                    if (!readersOnly) {
                        dhtFut.addWriteEntry(entry,
                            updRes.newValue(),
                            entryProcessor,
                            updRes.newTtl(),
                            updRes.conflictExpireTime(),
                            newConflictVer,
                            sndPrevVal,
                            updRes.oldValue(),
                            updRes.updateCounter());
                    }

                    if (!F.isEmpty(filteredReaders))
                        dhtFut.addNearWriteEntries(filteredReaders,
                            entry,
                            updRes.newValue(),
                            entryProcessor,
                            updRes.newTtl(),
                            updRes.conflictExpireTime());
                }
                else {
                    if (log.isDebugEnabled())
                        log.debug("Entry did not pass the filter or conflict resolution (will skip write) " +
                            "[entry=" + entry + ", filter=" + Arrays.toString(req.filter()) + ']');
                }
            }

            if (hasNear) {
                if (primary && updRes.sendToDht()) {
                    // Originating node is not an affinity node for the partition: report the result
                    // for its near cache and register it as a reader of the entry.
                    if (!ctx.affinity().belongs(node, entry.partition(), topVer)) {
                        // If put the same value as in request then do not need to send it back.
                        if (op == TRANSFORM || writeVal != updRes.newValue()) {
                            res.addNearValue(i,
                                updRes.newValue(),
                                updRes.newTtl(),
                                updRes.conflictExpireTime());
                        }
                        else
                            res.addNearTtl(i, updRes.newTtl(), updRes.conflictExpireTime());

                        if (updRes.newValue() != null) {
                            IgniteInternalFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer);

                            assert f == null : f;
                        }
                    }
                    else if (F.contains(readers, node.id())) // Reader became primary or backup.
                        entry.removeReader(node.id(), req.messageId());
                    else
                        res.addSkippedIndex(i);
                }
                else
                    res.addSkippedIndex(i);
            }

            if (updRes.removeVersion() != null) {
                if (deleted == null)
                    deleted = new ArrayList<>(req.size());

                deleted.add(F.t(entry, updRes.removeVersion()));
            }

            if (op == TRANSFORM) {
                assert !req.returnValue();

                IgniteBiTuple<Object, Exception> compRes = updRes.computedResult();

                // Only a non-null computed value or an exception is reported back.
                if (compRes != null && (compRes.get1() != null || compRes.get2() != null)) {
                    if (retVal == null)
                        retVal = new GridCacheReturn(node.isLocal());

                    retVal.addEntryProcessResult(ctx,
                        k,
                        null,
                        compRes.get1(),
                        compRes.get2(),
                        req.keepBinary());
                }
            }
            else {
                // Create only once.
                if (retVal == null) {
                    CacheObject ret = updRes.oldValue();

                    retVal = new GridCacheReturn(ctx,
                        node.isLocal(),
                        req.keepBinary(),
                        req.returnValue() ? ret : null,
                        updRes.success());
                }
            }
        }
        catch (IgniteCheckedException e) {
            res.addFailedKey(k, e);
        }
    }

    return new UpdateSingleResult(retVal, deleted, dhtFut);
}
| |
| /** |
| * @param hasNear {@code True} if originating node has near cache. |
| * @param firstEntryIdx Index of the first entry in the request keys collection. |
| * @param entries Entries to update. |
| * @param ver Version to set. |
| * @param node Originating node. |
| * @param writeVals Write values. |
| * @param putMap Values to put. |
| * @param rmvKeys Keys to remove. |
| * @param entryProcessorMap Entry processors. |
| * @param dhtFut DHT update future if has backups. |
| * @param completionCb Completion callback to invoke when DHT future is completed. |
| * @param req Request. |
| * @param res Response. |
| * @param replicate Whether replication is enabled. |
| * @param batchRes Batch update result. |
| * @param taskName Task name. |
| * @param expiry Expiry policy. |
| * @param sndPrevVal If {@code true} sends previous value to backups. |
| * @return Deleted entries. |
| */ |
| @SuppressWarnings("ForLoopReplaceableByForEach") |
| @Nullable private GridDhtAtomicAbstractUpdateFuture updatePartialBatch( |
| final boolean hasNear, |
| final int firstEntryIdx, |
| final List<GridDhtCacheEntry> entries, |
| final GridCacheVersion ver, |
| final ClusterNode node, |
| @Nullable final List<CacheObject> writeVals, |
| @Nullable final Map<KeyCacheObject, CacheObject> putMap, |
| @Nullable final Collection<KeyCacheObject> rmvKeys, |
| @Nullable final Map<KeyCacheObject, EntryProcessor<Object, Object, Object>> entryProcessorMap, |
| @Nullable GridDhtAtomicAbstractUpdateFuture dhtFut, |
| final CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb, |
| final GridNearAtomicAbstractUpdateRequest req, |
| final GridNearAtomicUpdateResponse res, |
| final boolean replicate, |
| final UpdateBatchResult batchRes, |
| final String taskName, |
| @Nullable final IgniteCacheExpiryPolicy expiry, |
| final boolean sndPrevVal |
| ) { |
| assert putMap == null ^ rmvKeys == null; |
| |
| assert req.conflictVersions() == null : "Cannot be called when there are conflict entries in the batch."; |
| |
| AffinityTopologyVersion topVer = req.topologyVersion(); |
| |
| boolean checkReaders = hasNear || ctx.discovery().hasNearCache(name(), topVer); |
| |
| CacheStorePartialUpdateException storeErr = null; |
| |
| try { |
| GridCacheOperation op; |
| |
| if (putMap != null) { |
| // If fast mapping, filter primary keys for write to store. |
| Map<KeyCacheObject, CacheObject> storeMap = req.fastMap() ? |
| F.view(putMap, new P1<CacheObject>() { |
| @Override public boolean apply(CacheObject key) { |
| return ctx.affinity().primary(ctx.localNode(), key, req.topologyVersion()); |
| } |
| }) : |
| putMap; |
| |
| try { |
| ctx.store().putAll(null, F.viewReadOnly(storeMap, new C1<CacheObject, IgniteBiTuple<CacheObject, GridCacheVersion>>() { |
| @Override public IgniteBiTuple<CacheObject, GridCacheVersion> apply(CacheObject v) { |
| return F.t(v, ver); |
| } |
| })); |
| } |
| catch (CacheStorePartialUpdateException e) { |
| storeErr = e; |
| } |
| |
| op = UPDATE; |
| } |
| else { |
| // If fast mapping, filter primary keys for write to store. |
| Collection<KeyCacheObject> storeKeys = req.fastMap() ? |
| F.view(rmvKeys, new P1<Object>() { |
| @Override public boolean apply(Object key) { |
| return ctx.affinity().primary(ctx.localNode(), key, req.topologyVersion()); |
| } |
| }) : |
| rmvKeys; |
| |
| try { |
| ctx.store().removeAll(null, storeKeys); |
| } |
| catch (CacheStorePartialUpdateException e) { |
| storeErr = e; |
| } |
| |
| op = DELETE; |
| } |
| |
| boolean intercept = ctx.config().getInterceptor() != null; |
| |
| // Avoid iterator creation. |
| for (int i = 0; i < entries.size(); i++) { |
| GridDhtCacheEntry entry = entries.get(i); |
| |
| assert Thread.holdsLock(entry); |
| |
| if (entry.obsolete()) { |
| assert req.operation() == DELETE : "Entry can become obsolete only after remove: " + entry; |
| |
| continue; |
| } |
| |
| if (storeErr != null && |
| storeErr.failedKeys().contains(entry.key().value(ctx.cacheObjectContext(), false))) |
| continue; |
| |
| try { |
| // We are holding java-level locks on entries at this point. |
| CacheObject writeVal = op == UPDATE ? writeVals.get(i) : null; |
| |
| assert writeVal != null || op == DELETE : "null write value found."; |
| |
| boolean primary = !req.fastMap() || ctx.affinity().primary(ctx.localNode(), entry.key(), |
| req.topologyVersion()); |
| |
| Collection<UUID> readers = null; |
| Collection<UUID> filteredReaders = null; |
| |
| if (checkReaders) { |
| readers = entry.readers(); |
| filteredReaders = F.view(entry.readers(), F.notEqualTo(node.id())); |
| } |
| |
| GridCacheUpdateAtomicResult updRes = entry.innerUpdate( |
| ver, |
| node.id(), |
| locNodeId, |
| op, |
| writeVal, |
| null, |
| /*write-through*/false, |
| /*read-through*/false, |
| /*retval*/sndPrevVal, |
| req.keepBinary(), |
| expiry, |
| /*event*/true, |
| /*metrics*/true, |
| primary, |
| ctx.config().getAtomicWriteOrderMode() == CLOCK, // Check version in CLOCK mode on primary node. |
| topVer, |
| null, |
| replicate ? primary ? DR_PRIMARY : DR_BACKUP : DR_NONE, |
| CU.TTL_NOT_CHANGED, |
| CU.EXPIRE_TIME_CALCULATE, |
| null, |
| /*conflict resolve*/false, |
| /*intercept*/false, |
| req.subjectId(), |
| taskName, |
| null, |
| null, |
| dhtFut); |
| |
| assert !updRes.success() || updRes.newTtl() == CU.TTL_NOT_CHANGED || expiry != null : |
| "success=" + updRes.success() + ", newTtl=" + updRes.newTtl() + ", expiry=" + expiry; |
| |
| if (intercept) { |
| if (op == UPDATE) { |
| ctx.config().getInterceptor().onAfterPut(new CacheLazyEntry( |
| ctx, |
| entry.key(), |
| updRes.newValue(), |
| req.keepBinary())); |
| } |
| else { |
| assert op == DELETE : op; |
| |
| // Old value should be already loaded for 'CacheInterceptor.onBeforeRemove'. |
| ctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(ctx, entry.key(), |
| updRes.oldValue(), req.keepBinary())); |
| } |
| } |
| |
| batchRes.addDeleted(entry, updRes, entries); |
| |
| if (dhtFut == null && !F.isEmpty(filteredReaders)) { |
| dhtFut = createDhtFuture(ver, req, res, completionCb, true); |
| |
| batchRes.readersOnly(true); |
| } |
| |
| if (dhtFut != null) { |
| EntryProcessor<Object, Object, Object> entryProcessor = |
| entryProcessorMap == null ? null : entryProcessorMap.get(entry.key()); |
| |
| if (!batchRes.readersOnly()) { |
| dhtFut.addWriteEntry(entry, |
| writeVal, |
| entryProcessor, |
| updRes.newTtl(), |
| CU.EXPIRE_TIME_CALCULATE, |
| null, |
| sndPrevVal, |
| updRes.oldValue(), |
| updRes.updateCounter()); |
| } |
| |
| if (!F.isEmpty(filteredReaders)) |
| dhtFut.addNearWriteEntries(filteredReaders, |
| entry, |
| writeVal, |
| entryProcessor, |
| updRes.newTtl(), |
| CU.EXPIRE_TIME_CALCULATE); |
| } |
| |
| if (hasNear) { |
| if (primary) { |
| if (!ctx.affinity().belongs(node, entry.partition(), topVer)) { |
| int idx = firstEntryIdx + i; |
| |
| if (req.operation() == TRANSFORM) { |
| res.addNearValue(idx, |
| writeVal, |
| updRes.newTtl(), |
| CU.EXPIRE_TIME_CALCULATE); |
| } |
| else |
| res.addNearTtl(idx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE); |
| |
| if (writeVal != null || entry.hasValue()) { |
| IgniteInternalFuture<Boolean> f = entry.addReader(node.id(), req.messageId(), topVer); |
| |
| assert f == null : f; |
| } |
| } |
| else if (readers.contains(node.id())) // Reader became primary or backup. |
| entry.removeReader(node.id(), req.messageId()); |
| else |
| res.addSkippedIndex(firstEntryIdx + i); |
| } |
| else |
| res.addSkippedIndex(firstEntryIdx + i); |
| } |
| } |
| catch (GridCacheEntryRemovedException e) { |
| assert false : "Entry cannot become obsolete while holding lock."; |
| |
| e.printStackTrace(); |
| } |
| } |
| } |
| catch (IgniteCheckedException e) { |
| res.addFailedKeys(putMap != null ? putMap.keySet() : rmvKeys, e, ctx); |
| } |
| |
| if (storeErr != null) { |
| ArrayList<KeyCacheObject> failed = new ArrayList<>(storeErr.failedKeys().size()); |
| |
| for (Object failedKey : storeErr.failedKeys()) |
| failed.add(ctx.toCacheKeyObject(failedKey)); |
| |
| res.addFailedKeys(failed, storeErr.getCause(), ctx); |
| } |
| |
| return dhtFut; |
| } |
| |
| /** |
| * Acquires java-level locks on cache entries. Returns collection of locked entries. |
| * |
| * @param req Request with keys to lock. |
| * @param topVer Topology version to lock on. |
| * @return Collection of locked entries. |
| * @throws GridDhtInvalidPartitionException If entry does not belong to local node. If exception is thrown, |
| * locks are released. |
| */ |
| @SuppressWarnings("ForLoopReplaceableByForEach") |
| private List<GridDhtCacheEntry> lockEntries(GridNearAtomicAbstractUpdateRequest req, AffinityTopologyVersion topVer) |
| throws GridDhtInvalidPartitionException { |
| if (req.size() == 1) { |
| KeyCacheObject key = req.key(0); |
| |
| while (true) { |
| try { |
| GridDhtCacheEntry entry = entryExx(key, topVer); |
| |
| GridUnsafe.monitorEnter(entry); |
| |
| if (entry.obsolete()) |
| GridUnsafe.monitorExit(entry); |
| else |
| return Collections.singletonList(entry); |
| } |
| catch (GridDhtInvalidPartitionException e) { |
| // Ignore invalid partition exception in CLOCK ordering mode. |
| if (ctx.config().getAtomicWriteOrderMode() == CLOCK) |
| return Collections.singletonList(null); |
| else |
| throw e; |
| } |
| } |
| } |
| else { |
| List<GridDhtCacheEntry> locked = new ArrayList<>(req.size()); |
| |
| while (true) { |
| for (int i = 0; i < req.size(); i++) { |
| try { |
| GridDhtCacheEntry entry = entryExx(req.key(i), topVer); |
| |
| locked.add(entry); |
| } |
| catch (GridDhtInvalidPartitionException e) { |
| // Ignore invalid partition exception in CLOCK ordering mode. |
| if (ctx.config().getAtomicWriteOrderMode() == CLOCK) |
| locked.add(null); |
| else |
| throw e; |
| } |
| } |
| |
| boolean retry = false; |
| |
| for (int i = 0; i < locked.size(); i++) { |
| GridCacheMapEntry entry = locked.get(i); |
| |
| if (entry == null) |
| continue; |
| |
| GridUnsafe.monitorEnter(entry); |
| |
| if (entry.obsolete()) { |
| // Unlock all locked. |
| for (int j = 0; j <= i; j++) { |
| if (locked.get(j) != null) |
| GridUnsafe.monitorExit(locked.get(j)); |
| } |
| |
| // Clear entries. |
| locked.clear(); |
| |
| // Retry. |
| retry = true; |
| |
| break; |
| } |
| } |
| |
| if (!retry) |
| return locked; |
| } |
| } |
| } |
| |
| /** |
| * Releases java-level locks on cache entries. |
| * |
| * @param locked Locked entries. |
| * @param topVer Topology version. |
| */ |
| private void unlockEntries(Collection<GridDhtCacheEntry> locked, AffinityTopologyVersion topVer) { |
| // Process deleted entries before locks release. |
| assert ctx.deferredDelete() : this; |
| |
| // Entries to skip eviction manager notification for. |
| // Enqueue entries while holding locks. |
| Collection<KeyCacheObject> skip = null; |
| |
| try { |
| for (GridCacheMapEntry entry : locked) { |
| if (entry != null && entry.deleted()) { |
| if (skip == null) |
| skip = new HashSet<>(locked.size(), 1.0f); |
| |
| skip.add(entry.key()); |
| } |
| } |
| } |
| finally { |
| // At least RuntimeException can be thrown by the code above when GridCacheContext is cleaned and there is |
| // an attempt to use cleaned resources. |
| // That's why releasing locks in the finally block.. |
| for (GridCacheMapEntry entry : locked) { |
| if (entry != null) |
| GridUnsafe.monitorExit(entry); |
| } |
| } |
| |
| // Try evict partitions. |
| for (GridDhtCacheEntry entry : locked) { |
| if (entry != null) |
| entry.onUnlock(); |
| } |
| |
| if (skip != null && skip.size() == locked.size()) |
| // Optimization. |
| return; |
| |
| // Must touch all entries since update may have deleted entries. |
| // Eviction manager will remove empty entries. |
| for (GridCacheMapEntry entry : locked) { |
| if (entry != null && (skip == null || !skip.contains(entry.key()))) |
| ctx.evicts().touch(entry, topVer); |
| } |
| } |
| |
| /** |
| * @param entry Entry to check. |
| * @param req Update request. |
| * @param res Update response. If filter evaluation failed, key will be added to failed keys and method |
| * will return false. |
| * @return {@code True} if filter evaluation succeeded. |
| */ |
| private boolean checkFilter(GridCacheEntryEx entry, GridNearAtomicAbstractUpdateRequest req, |
| GridNearAtomicUpdateResponse res) { |
| try { |
| return ctx.isAllLocked(entry, req.filter()); |
| } |
| catch (IgniteCheckedException e) { |
| res.addFailedKey(entry.key(), e); |
| |
| return false; |
| } |
| } |
| |
| /** |
| * @param req Request to remap. |
| */ |
| private void remapToNewPrimary(GridNearAtomicAbstractUpdateRequest req) { |
| assert req.writeSynchronizationMode() == FULL_ASYNC : req; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Remapping near update request locally: " + req); |
| |
| Collection<?> vals; |
| Collection<GridCacheDrInfo> drPutVals; |
| Collection<GridCacheVersion> drRmvVals; |
| |
| if (req.conflictVersions() == null) { |
| vals = req.values(); |
| |
| drPutVals = null; |
| drRmvVals = null; |
| } |
| else if (req.operation() == UPDATE) { |
| int size = req.keys().size(); |
| |
| drPutVals = new ArrayList<>(size); |
| |
| for (int i = 0; i < size; i++) { |
| long ttl = req.conflictTtl(i); |
| |
| if (ttl == CU.TTL_NOT_CHANGED) |
| drPutVals.add(new GridCacheDrInfo(req.value(i), req.conflictVersion(i))); |
| else |
| drPutVals.add(new GridCacheDrExpirationInfo(req.value(i), req.conflictVersion(i), ttl, |
| req.conflictExpireTime(i))); |
| } |
| |
| vals = null; |
| drRmvVals = null; |
| } |
| else { |
| assert req.operation() == DELETE : req; |
| |
| drRmvVals = req.conflictVersions(); |
| |
| vals = null; |
| drPutVals = null; |
| } |
| |
| final GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture( |
| ctx, |
| this, |
| ctx.config().getWriteSynchronizationMode(), |
| req.operation(), |
| req.keys(), |
| vals, |
| req.invokeArguments(), |
| drPutVals, |
| drRmvVals, |
| req.returnValue(), |
| false, |
| req.expiry(), |
| req.filter(), |
| req.subjectId(), |
| req.taskNameHash(), |
| req.skipStore(), |
| req.keepBinary(), |
| MAX_RETRIES, |
| true); |
| |
| updateFut.map(); |
| } |
| |
| /** |
| * Creates backup update future if necessary. |
| * |
| * @param writeVer Write version. |
| * @param updateReq Update request. |
| * @param updateRes Update response. |
| * @param completionCb Completion callback to invoke when future is completed. |
| * @param force If {@code true} then creates future without optimizations checks. |
| * @return Backup update future or {@code null} if there are no backups. |
| */ |
| @Nullable private GridDhtAtomicAbstractUpdateFuture createDhtFuture( |
| GridCacheVersion writeVer, |
| GridNearAtomicAbstractUpdateRequest updateReq, |
| GridNearAtomicUpdateResponse updateRes, |
| CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> completionCb, |
| boolean force |
| ) { |
| if (!force) { |
| if (updateReq.fastMap()) |
| return null; |
| |
| AffinityTopologyVersion topVer = updateReq.topologyVersion(); |
| |
| Collection<ClusterNode> nodes = ctx.kernalContext().discovery().cacheAffinityNodes(name(), topVer); |
| |
| // We are on primary node for some key. |
| assert !nodes.isEmpty() : "Failed to find affinity nodes [name=" + name() + ", topVer=" + topVer + |
| ctx.kernalContext().discovery().discoCache(topVer) + ']'; |
| |
| if (nodes.size() == 1) { |
| if (log.isDebugEnabled()) |
| log.debug("Partitioned cache topology has only one node, will not create DHT atomic update future " + |
| "[topVer=" + topVer + ", updateReq=" + updateReq + ']'); |
| |
| return null; |
| } |
| } |
| |
| if (updateReq.size() == 1) |
| return new GridDhtAtomicSingleUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes); |
| else |
| return new GridDhtAtomicUpdateFuture(ctx, completionCb, writeVer, updateReq, updateRes); |
| } |
| |
| /** |
| * @param nodeId Sender node ID. |
| * @param req Near atomic update request. |
| */ |
| private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) { |
| if (msgLog.isDebugEnabled()) { |
| msgLog.debug("Received near atomic update request [futId=" + req.futureVersion() + |
| ", writeVer=" + req.updateVersion() + |
| ", node=" + nodeId + ']'); |
| } |
| |
| req.nodeId(ctx.localNodeId()); |
| |
| updateAllAsyncInternal(nodeId, req, updateReplyClos); |
| } |
| |
| /** |
| * @param nodeId Sender node ID. |
| * @param res Near atomic update response. |
| */ |
| @SuppressWarnings("unchecked") |
| private void processNearAtomicUpdateResponse(UUID nodeId, GridNearAtomicUpdateResponse res) { |
| if (msgLog.isDebugEnabled()) |
| msgLog.debug("Received near atomic update response [futId" + res.futureVersion() + ", node=" + nodeId + ']'); |
| |
| res.nodeId(ctx.localNodeId()); |
| |
| GridNearAtomicAbstractUpdateFuture fut = |
| (GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion()); |
| |
| if (fut != null) |
| fut.onResult(nodeId, res, false); |
| |
| else |
| U.warn(msgLog, "Failed to find near update future for update response (will ignore) " + |
| "[futId" + res.futureVersion() + ", node=" + nodeId + ", res=" + res + ']'); |
| } |
| |
| /** |
| * @param nodeId Sender node ID. |
| * @param req Dht atomic update request. |
| */ |
| private void processDhtAtomicUpdateRequest(UUID nodeId, GridDhtAtomicAbstractUpdateRequest req) { |
| if (msgLog.isDebugEnabled()) { |
| msgLog.debug("Received DHT atomic update request [futId=" + req.futureVersion() + |
| ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']'); |
| } |
| |
| GridCacheVersion ver = req.writeVersion(); |
| |
| // Always send update reply. |
| GridDhtAtomicUpdateResponse res = new GridDhtAtomicUpdateResponse(ctx.cacheId(), req.futureVersion(), |
| ctx.deploymentEnabled()); |
| |
| Boolean replicate = ctx.isDrEnabled(); |
| |
| boolean intercept = req.forceTransformBackups() && ctx.config().getInterceptor() != null; |
| |
| String taskName = ctx.kernalContext().task().resolveTaskName(req.taskNameHash()); |
| |
| for (int i = 0; i < req.size(); i++) { |
| KeyCacheObject key = req.key(i); |
| |
| try { |
| while (true) { |
| GridDhtCacheEntry entry = null; |
| |
| try { |
| entry = entryExx(key); |
| |
| CacheObject val = req.value(i); |
| CacheObject prevVal = req.previousValue(i); |
| |
| EntryProcessor<Object, Object, Object> entryProcessor = req.entryProcessor(i); |
| Long updateIdx = req.updateCounter(i); |
| |
| GridCacheOperation op = entryProcessor != null ? TRANSFORM : |
| (val != null) ? UPDATE : DELETE; |
| |
| long ttl = req.ttl(i); |
| long expireTime = req.conflictExpireTime(i); |
| |
| GridCacheUpdateAtomicResult updRes = entry.innerUpdate( |
| ver, |
| nodeId, |
| nodeId, |
| op, |
| op == TRANSFORM ? entryProcessor : val, |
| op == TRANSFORM ? req.invokeArguments() : null, |
| /*write-through*/(ctx.store().isLocal() && !ctx.shared().localStorePrimaryOnly()) |
| && writeThrough() && !req.skipStore(), |
| /*read-through*/false, |
| /*retval*/false, |
| req.keepBinary(), |
| /*expiry policy*/null, |
| /*event*/true, |
| /*metrics*/true, |
| /*primary*/false, |
| /*check version*/!req.forceTransformBackups(), |
| req.topologyVersion(), |
| CU.empty0(), |
| replicate ? DR_BACKUP : DR_NONE, |
| ttl, |
| expireTime, |
| req.conflictVersion(i), |
| false, |
| intercept, |
| req.subjectId(), |
| taskName, |
| prevVal, |
| updateIdx, |
| null); |
| |
| if (updRes.removeVersion() != null) |
| ctx.onDeferredDelete(entry, updRes.removeVersion()); |
| |
| entry.onUnlock(); |
| |
| break; // While. |
| } |
| catch (GridCacheEntryRemovedException ignored) { |
| if (log.isDebugEnabled()) |
| log.debug("Got removed entry while updating backup value (will retry): " + key); |
| |
| entry = null; |
| } |
| finally { |
| if (entry != null) |
| ctx.evicts().touch(entry, req.topologyVersion()); |
| } |
| } |
| } |
| catch (GridDhtInvalidPartitionException ignored) { |
| // Ignore. |
| } |
| catch (IgniteCheckedException e) { |
| res.addFailedKey(key, new IgniteCheckedException("Failed to update key on backup node: " + key, e)); |
| } |
| } |
| |
| if (isNearEnabled(cacheCfg)) |
| ((GridNearAtomicCache<K, V>)near()).processDhtAtomicUpdateRequest(nodeId, req, res); |
| |
| try { |
| if (res.failedKeys() != null || res.nearEvicted() != null || req.writeSynchronizationMode() == FULL_SYNC) { |
| ctx.io().send(nodeId, res, ctx.ioPolicy()); |
| |
| if (msgLog.isDebugEnabled()) { |
| msgLog.debug("Sent DHT atomic update response [futId=" + req.futureVersion() + |
| ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']'); |
| } |
| } |
| else { |
| if (msgLog.isDebugEnabled()) { |
| msgLog.debug("Will send deferred DHT atomic update response [futId=" + req.futureVersion() + |
| ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']'); |
| } |
| |
| // No failed keys and sync mode is not FULL_SYNC, thus sending deferred response. |
| sendDeferredUpdateResponse(nodeId, req.futureVersion()); |
| } |
| } |
| catch (ClusterTopologyCheckedException ignored) { |
| U.warn(msgLog, "Failed to send DHT atomic update response, node left [futId=" + req.futureVersion() + |
| ", node=" + req.nodeId() + ']'); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(msgLog, "Failed to send DHT atomic update response [futId=" + req.futureVersion() + |
| ", node=" + nodeId + ", res=" + res + ']', e); |
| } |
| } |
| |
| /** |
| * @param nodeId Node ID to send message to. |
| * @param ver Version to ack. |
| */ |
| private void sendDeferredUpdateResponse(UUID nodeId, GridCacheVersion ver) { |
| deferredUpdateMsgSnd.sendDeferredAckMessage(nodeId, ver); |
| } |
| |
| /** |
| * @param nodeId Sender node ID. |
| * @param res Dht atomic update response. |
| */ |
| @SuppressWarnings("unchecked") |
| private void processDhtAtomicUpdateResponse(UUID nodeId, GridDhtAtomicUpdateResponse res) { |
| GridDhtAtomicAbstractUpdateFuture updateFut = (GridDhtAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureVersion()); |
| |
| if (updateFut != null) { |
| if (msgLog.isDebugEnabled()) { |
| msgLog.debug("Received DHT atomic update response [futId=" + res.futureVersion() + |
| ", writeVer=" + updateFut.writeVersion() + ", node=" + nodeId + ']'); |
| } |
| |
| updateFut.onResult(nodeId, res); |
| } |
| else { |
| U.warn(msgLog, "Failed to find DHT update future for update response [futId=" + res.futureVersion() + |
| ", node=" + nodeId + ", res=" + res + ']'); |
| } |
| } |
| |
| /** |
| * @param nodeId Sender node ID. |
| * @param res Deferred atomic update response. |
| */ |
| @SuppressWarnings("unchecked") |
| private void processDhtAtomicDeferredUpdateResponse(UUID nodeId, GridDhtAtomicDeferredUpdateResponse res) { |
| for (GridCacheVersion ver : res.futureVersions()) { |
| GridDhtAtomicAbstractUpdateFuture updateFut = (GridDhtAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(ver); |
| |
| if (updateFut != null) { |
| if (msgLog.isDebugEnabled()) { |
| msgLog.debug("Received DHT atomic deferred update response [futId=" + ver + |
| ", writeVer=" + res + ", node=" + nodeId + ']'); |
| } |
| |
| updateFut.onResult(nodeId); |
| } |
| else { |
| U.warn(msgLog, "Failed to find DHT update future for deferred update response [futId=" + ver + |
| ", nodeId=" + nodeId + ", res=" + res + ']'); |
| } |
| } |
| } |
| |
| /** |
| * @param nodeId Originating node ID. |
| * @param res Near update response. |
| */ |
| private void sendNearUpdateReply(UUID nodeId, GridNearAtomicUpdateResponse res) { |
| try { |
| ctx.io().send(nodeId, res, ctx.ioPolicy()); |
| |
| if (msgLog.isDebugEnabled()) |
| msgLog.debug("Sent near update response [futId=" + res.futureVersion() + ", node=" + nodeId + ']'); |
| } |
| catch (ClusterTopologyCheckedException ignored) { |
| if (msgLog.isDebugEnabled()) { |
| msgLog.debug("Failed to send near update response [futId=" + res.futureVersion() + |
| ", node=" + nodeId + ']'); |
| } |
| } |
| catch (IgniteCheckedException e) { |
| U.error(msgLog, "Failed to send near update response [futId=" + res.futureVersion() + |
| ", node=" + nodeId + ", res=" + res + ']', e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(GridDhtAtomicCache.class, this, super.toString()); |
| } |
| |
| /** |
| * Result of {@link GridDhtAtomicCache#updateSingle} execution. |
| */ |
| private static class UpdateSingleResult { |
| /** */ |
| private final GridCacheReturn retVal; |
| |
| /** */ |
| private final Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted; |
| |
| /** */ |
| private final GridDhtAtomicAbstractUpdateFuture dhtFut; |
| |
| /** |
| * @param retVal Return value. |
| * @param deleted Deleted entries. |
| * @param dhtFut DHT future. |
| */ |
| private UpdateSingleResult(GridCacheReturn retVal, |
| Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted, |
| GridDhtAtomicAbstractUpdateFuture dhtFut) { |
| this.retVal = retVal; |
| this.deleted = deleted; |
| this.dhtFut = dhtFut; |
| } |
| |
| /** |
| * @return Return value. |
| */ |
| private GridCacheReturn returnValue() { |
| return retVal; |
| } |
| |
| /** |
| * @return Deleted entries. |
| */ |
| private Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted() { |
| return deleted; |
| } |
| |
| /** |
| * @return DHT future. |
| */ |
| public GridDhtAtomicAbstractUpdateFuture dhtFuture() { |
| return dhtFut; |
| } |
| } |
| |
| /** |
| * Result of {@link GridDhtAtomicCache#updateWithBatch} execution. |
| */ |
| private static class UpdateBatchResult { |
| /** */ |
| private Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted; |
| |
| /** */ |
| private GridDhtAtomicAbstractUpdateFuture dhtFut; |
| |
| /** */ |
| private boolean readersOnly; |
| |
| /** */ |
| private GridCacheReturn invokeRes; |
| |
| /** |
| * @param entry Entry. |
| * @param updRes Entry update result. |
| * @param entries All entries. |
| */ |
| private void addDeleted(GridDhtCacheEntry entry, |
| GridCacheUpdateAtomicResult updRes, |
| Collection<GridDhtCacheEntry> entries) { |
| if (updRes.removeVersion() != null) { |
| if (deleted == null) |
| deleted = new ArrayList<>(entries.size()); |
| |
| deleted.add(F.t(entry, updRes.removeVersion())); |
| } |
| } |
| |
| /** |
| * @return Deleted entries. |
| */ |
| private Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted() { |
| return deleted; |
| } |
| |
| /** |
| * @return DHT future. |
| */ |
| public GridDhtAtomicAbstractUpdateFuture dhtFuture() { |
| return dhtFut; |
| } |
| |
| /** |
| * @param invokeRes Result for invoke operation. |
| */ |
| private void invokeResult(GridCacheReturn invokeRes) { |
| this.invokeRes = invokeRes; |
| } |
| |
| /** |
| * @return Result for invoke operation. |
| */ |
| GridCacheReturn invokeResults() { |
| return invokeRes; |
| } |
| |
| /** |
| * @param dhtFut DHT future. |
| */ |
| private void dhtFuture(@Nullable GridDhtAtomicAbstractUpdateFuture dhtFut) { |
| this.dhtFut = dhtFut; |
| } |
| |
| /** |
| * @return {@code True} if only readers (not backups) should be updated. |
| */ |
| private boolean readersOnly() { |
| return readersOnly; |
| } |
| |
| /** |
| * @param readersOnly {@code True} if only readers (not backups) should be updated. |
| */ |
| private void readersOnly(boolean readersOnly) { |
| this.readersOnly = readersOnly; |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class FinishedLockFuture extends GridFinishedFuture<Boolean> implements GridDhtFuture<Boolean> { |
| /** |
| * @param err Error. |
| */ |
| private FinishedLockFuture(Throwable err) { |
| super(err); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Collection<Integer> invalidPartitions() { |
| return Collections.emptyList(); |
| } |
| } |
| } |