| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.distributed.dht.atomic; |
| |
| import java.io.Externalizable; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| import javax.cache.expiry.ExpiryPolicy; |
| import javax.cache.processor.EntryProcessor; |
| import javax.cache.processor.EntryProcessorResult; |
| import org.apache.ignite.IgniteCacheRestartingException; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.IgniteSystemProperties; |
| import org.apache.ignite.binary.BinaryInvalidTypeException; |
| import org.apache.ignite.cache.CacheWriteSynchronizationMode; |
| import org.apache.ignite.cache.ReadRepairStrategy; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.failure.FailureContext; |
| import org.apache.ignite.failure.FailureType; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.NodeStoppingException; |
| import org.apache.ignite.internal.UnregisteredBinaryTypeException; |
| import org.apache.ignite.internal.UnregisteredClassException; |
| import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; |
| import org.apache.ignite.internal.mem.IgniteOutOfMemoryException; |
| import org.apache.ignite.internal.processors.affinity.AffinityAssignment; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; |
| import org.apache.ignite.internal.processors.cache.CacheInvokeEntry; |
| import org.apache.ignite.internal.processors.cache.CacheInvokeResult; |
| import org.apache.ignite.internal.processors.cache.CacheLazyEntry; |
| import org.apache.ignite.internal.processors.cache.CacheObject; |
| import org.apache.ignite.internal.processors.cache.CacheOperationContext; |
| import org.apache.ignite.internal.processors.cache.CacheStoppedException; |
| import org.apache.ignite.internal.processors.cache.CacheStorePartialUpdateException; |
| import org.apache.ignite.internal.processors.cache.EntryGetResult; |
| import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap; |
| import org.apache.ignite.internal.processors.cache.GridCacheContext; |
| import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; |
| import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; |
| import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; |
| import org.apache.ignite.internal.processors.cache.GridCacheOperation; |
| import org.apache.ignite.internal.processors.cache.GridCacheReturn; |
| import org.apache.ignite.internal.processors.cache.GridCacheUpdateAtomicResult; |
| import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; |
| import org.apache.ignite.internal.processors.cache.KeyCacheObject; |
| import org.apache.ignite.internal.processors.cache.LockedEntriesInfo; |
| import org.apache.ignite.internal.processors.cache.binary.CacheObjectBinaryProcessorImpl; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFuture; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearAtomicCache; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse; |
| import org.apache.ignite.internal.processors.cache.distributed.near.consistency.GridNearReadRepairCheckOnlyFuture; |
| import org.apache.ignite.internal.processors.cache.dr.GridCacheDrExpirationInfo; |
| import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; |
| import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; |
| import org.apache.ignite.internal.processors.cache.persistence.StorageException; |
| import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; |
| import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; |
| import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext; |
| import org.apache.ignite.internal.processors.cache.version.GridCacheVersionEx; |
| import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessor; |
| import org.apache.ignite.internal.processors.timeout.GridTimeoutObject; |
| import org.apache.ignite.internal.util.GridLongList; |
| import org.apache.ignite.internal.util.future.GridFinishedFuture; |
| import org.apache.ignite.internal.util.nio.GridNioBackPressureControl; |
| import org.apache.ignite.internal.util.nio.GridNioMessageTracker; |
| import org.apache.ignite.internal.util.tostring.GridToStringExclude; |
| import org.apache.ignite.internal.util.typedef.C1; |
| import org.apache.ignite.internal.util.typedef.CI1; |
| import org.apache.ignite.internal.util.typedef.CI2; |
| import org.apache.ignite.internal.util.typedef.CO; |
| import org.apache.ignite.internal.util.typedef.CX1; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.X; |
| import org.apache.ignite.internal.util.typedef.internal.A; |
| import org.apache.ignite.internal.util.typedef.internal.CU; |
| import org.apache.ignite.internal.util.typedef.internal.S; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteBiTuple; |
| import org.apache.ignite.lang.IgniteClosure; |
| import org.apache.ignite.lang.IgniteInClosure; |
| import org.apache.ignite.lang.IgniteRunnable; |
| import org.apache.ignite.lang.IgniteUuid; |
| import org.apache.ignite.plugin.security.SecurityPermission; |
| import org.apache.ignite.thread.IgniteThread; |
| import org.apache.ignite.transactions.TransactionIsolation; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_ACK_BUFFER_SIZE; |
| import static org.apache.ignite.IgniteSystemProperties.IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT; |
| import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_ASYNC; |
| import static org.apache.ignite.cache.CacheWriteSynchronizationMode.FULL_SYNC; |
| import static org.apache.ignite.cache.CacheWriteSynchronizationMode.PRIMARY_SYNC; |
| import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT; |
| import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ; |
| import static org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED; |
| import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE; |
| import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; |
| import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE; |
| import static org.apache.ignite.internal.processors.cache.GridCacheUtils.isNearEnabled; |
| import static org.apache.ignite.internal.processors.dr.GridDrType.DR_BACKUP; |
| import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE; |
| import static org.apache.ignite.internal.processors.dr.GridDrType.DR_PRIMARY; |
| |
| /** |
| * Non-transactional partitioned cache. |
| */ |
| @SuppressWarnings({"unchecked", "TooBroadScope"}) |
| @GridToStringExclude |
| public class GridDhtAtomicCache<K, V> extends GridDhtCacheAdapter<K, V> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** @see IgniteSystemProperties#IGNITE_ATOMIC_DEFERRED_ACK_BUFFER_SIZE */ |
| public static final int DFLT_ATOMIC_DEFERRED_ACK_BUFFER_SIZE = 256; |
| |
| /** @see IgniteSystemProperties#IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT */ |
| public static final int DFLT_ATOMIC_DEFERRED_ACK_TIMEOUT = 500; |
| |
| /** Deferred update response buffer size. */ |
| private static final int DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE = |
| Integer.getInteger(IGNITE_ATOMIC_DEFERRED_ACK_BUFFER_SIZE, DFLT_ATOMIC_DEFERRED_ACK_BUFFER_SIZE); |
| |
| /** Deferred update response timeout. */ |
| private static final int DEFERRED_UPDATE_RESPONSE_TIMEOUT = |
| Integer.getInteger(IGNITE_ATOMIC_DEFERRED_ACK_TIMEOUT, DFLT_ATOMIC_DEFERRED_ACK_TIMEOUT); |
| |
| /** */ |
| private final ThreadLocal<Map<UUID, GridDhtAtomicDeferredUpdateResponse>> defRes = |
| new ThreadLocal<Map<UUID, GridDhtAtomicDeferredUpdateResponse>>() { |
| @Override protected Map<UUID, GridDhtAtomicDeferredUpdateResponse> initialValue() { |
| return new HashMap<>(); |
| } |
| }; |
| |
| /** Locked entries info for each thread. */ |
| private final LockedEntriesInfo lockedEntriesInfo = new LockedEntriesInfo(); |
| |
| /** Update reply closure. */ |
| @GridToStringExclude |
| private UpdateReplyClosure updateReplyClos; |
| |
| /** */ |
| private GridNearAtomicCache<K, V> near; |
| |
| /** Logger. */ |
| private IgniteLogger msgLog; |
| |
| /** |
| * Empty constructor required by {@link Externalizable}. |
| */ |
| public GridDhtAtomicCache() { |
| // No-op. |
| } |
| |
| /** |
| * @param ctx Cache context. |
| */ |
| public GridDhtAtomicCache(GridCacheContext<K, V> ctx) { |
| super(ctx); |
| |
| msgLog = ctx.shared().atomicMessageLogger(); |
| } |
| |
| /** |
| * @param ctx Cache context. |
| * @param map Cache concurrent map. |
| */ |
| public GridDhtAtomicCache(GridCacheContext<K, V> ctx, GridCacheConcurrentMap map) { |
| super(ctx, map); |
| |
| msgLog = ctx.shared().atomicMessageLogger(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void checkJta() throws IgniteCheckedException { |
| // No-op. |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean isDhtAtomic() { |
| return true; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void init() { |
| super.init(); |
| |
| updateReplyClos = new UpdateReplyClosure() { |
| @Override public void apply(GridNearAtomicAbstractUpdateRequest req, GridNearAtomicUpdateResponse res) { |
| if (req.writeSynchronizationMode() != FULL_ASYNC) |
| sendNearUpdateReply(res.nodeId(), res); |
| else { |
| if (res.remapTopologyVersion() != null) |
| // Remap keys on primary node in FULL_ASYNC mode. |
| remapToNewPrimary(req); |
| else if (res.error() != null) { |
| U.error(log, "Failed to process write update request in FULL_ASYNC mode for keys: " + |
| res.failedKeys(), res.error()); |
| } |
| } |
| } |
| }; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onKernalStart() throws IgniteCheckedException { |
| super.onKernalStart(); |
| |
| assert !ctx.isRecoveryMode() : "Registering message handlers in recovery mode [cacheName=" + name() + ']'; |
| |
| ctx.io().addCacheHandler( |
| ctx.cacheId(), |
| ctx.startTopologyVersion(), |
| GridNearGetRequest.class, |
| (CI2<UUID, GridNearGetRequest>)this::processNearGetRequest); |
| |
| ctx.io().addCacheHandler( |
| ctx.cacheId(), |
| ctx.startTopologyVersion(), |
| GridNearSingleGetRequest.class, |
| (CI2<UUID, GridNearSingleGetRequest>)this::processNearSingleGetRequest); |
| |
| ctx.io().addCacheHandler( |
| ctx.cacheId(), |
| ctx.startTopologyVersion(), |
| GridNearAtomicAbstractUpdateRequest.class, |
| new CI2<UUID, GridNearAtomicAbstractUpdateRequest>() { |
| @Override public void apply( |
| UUID nodeId, |
| GridNearAtomicAbstractUpdateRequest req |
| ) { |
| processNearAtomicUpdateRequest( |
| nodeId, |
| req); |
| } |
| |
| @Override public String toString() { |
| return "GridNearAtomicAbstractUpdateRequest handler " + |
| "[msgIdx=" + GridNearAtomicAbstractUpdateRequest.CACHE_MSG_IDX + ']'; |
| } |
| }); |
| |
| ctx.io().addCacheHandler( |
| ctx.cacheId(), |
| ctx.startTopologyVersion(), |
| GridNearAtomicUpdateResponse.class, |
| new CI2<UUID, GridNearAtomicUpdateResponse>() { |
| @Override public void apply( |
| UUID nodeId, |
| GridNearAtomicUpdateResponse res |
| ) { |
| processNearAtomicUpdateResponse( |
| nodeId, |
| res); |
| } |
| |
| @Override public String toString() { |
| return "GridNearAtomicUpdateResponse handler " + |
| "[msgIdx=" + GridNearAtomicUpdateResponse.CACHE_MSG_IDX + ']'; |
| } |
| }); |
| |
| ctx.io().addCacheHandler( |
| ctx.cacheId(), |
| ctx.startTopologyVersion(), |
| GridDhtAtomicAbstractUpdateRequest.class, |
| new CI2<UUID, GridDhtAtomicAbstractUpdateRequest>() { |
| @Override public void apply( |
| UUID nodeId, |
| GridDhtAtomicAbstractUpdateRequest req |
| ) { |
| processDhtAtomicUpdateRequest( |
| nodeId, |
| req); |
| } |
| |
| @Override public String toString() { |
| return "GridDhtAtomicUpdateRequest handler " + |
| "[msgIdx=" + GridDhtAtomicUpdateRequest.CACHE_MSG_IDX + ']'; |
| } |
| }); |
| |
| ctx.io().addCacheHandler( |
| ctx.cacheId(), |
| ctx.startTopologyVersion(), |
| GridDhtAtomicUpdateResponse.class, |
| new CI2<UUID, GridDhtAtomicUpdateResponse>() { |
| @Override public void apply( |
| UUID nodeId, |
| GridDhtAtomicUpdateResponse res |
| ) { |
| processDhtAtomicUpdateResponse( |
| nodeId, |
| res); |
| } |
| |
| @Override public String toString() { |
| return "GridDhtAtomicUpdateResponse handler " + |
| "[msgIdx=" + GridDhtAtomicUpdateResponse.CACHE_MSG_IDX + ']'; |
| } |
| }); |
| |
| ctx.io().addCacheHandler( |
| ctx.cacheId(), |
| ctx.startTopologyVersion(), |
| GridDhtAtomicDeferredUpdateResponse.class, |
| new CI2<UUID, GridDhtAtomicDeferredUpdateResponse>() { |
| @Override public void apply( |
| UUID nodeId, |
| GridDhtAtomicDeferredUpdateResponse res |
| ) { |
| processDhtAtomicDeferredUpdateResponse( |
| nodeId, |
| res); |
| } |
| |
| @Override public String toString() { |
| return "GridDhtAtomicDeferredUpdateResponse handler " + |
| "[msgIdx=" + GridDhtAtomicDeferredUpdateResponse.CACHE_MSG_IDX + ']'; |
| } |
| }); |
| |
| ctx.io().addCacheHandler( |
| ctx.cacheId(), |
| ctx.startTopologyVersion(), |
| GridDhtAtomicNearResponse.class, |
| new CI2<UUID, GridDhtAtomicNearResponse>() { |
| @Override public void apply(UUID uuid, GridDhtAtomicNearResponse msg) { |
| processDhtAtomicNearResponse(uuid, msg); |
| } |
| |
| @Override public String toString() { |
| return "GridDhtAtomicNearResponse handler " + |
| "[msgIdx=" + GridDhtAtomicNearResponse.CACHE_MSG_IDX + ']'; |
| } |
| }); |
| |
| ctx.io().addCacheHandler( |
| ctx.cacheId(), |
| ctx.startTopologyVersion(), |
| GridNearAtomicCheckUpdateRequest.class, |
| new CI2<UUID, GridNearAtomicCheckUpdateRequest>() { |
| @Override public void apply(UUID uuid, GridNearAtomicCheckUpdateRequest msg) { |
| processCheckUpdateRequest(uuid, msg); |
| } |
| |
| @Override public String toString() { |
| return "GridNearAtomicCheckUpdateRequest handler " + |
| "[msgIdx=" + GridNearAtomicCheckUpdateRequest.CACHE_MSG_IDX + ']'; |
| } |
| }); |
| |
| ctx.io().addCacheHandler( |
| ctx.cacheId(), |
| ctx.startTopologyVersion(), |
| GridDhtForceKeysRequest.class, |
| new MessageHandler<GridDhtForceKeysRequest>() { |
| @Override public void onMessage(ClusterNode node, GridDhtForceKeysRequest msg) { |
| processForceKeysRequest(node, msg); |
| } |
| }); |
| |
| ctx.io().addCacheHandler( |
| ctx.cacheId(), |
| ctx.startTopologyVersion(), |
| GridDhtForceKeysResponse.class, |
| new MessageHandler<GridDhtForceKeysResponse>() { |
| @Override public void onMessage(ClusterNode node, GridDhtForceKeysResponse msg) { |
| processForceKeyResponse(node, msg); |
| } |
| }); |
| |
| if (near == null) { |
| ctx.io().addCacheHandler( |
| ctx.cacheId(), |
| ctx.startTopologyVersion(), |
| GridNearGetResponse.class, |
| (CI2<UUID, GridNearGetResponse>)this::processNearGetResponse); |
| |
| ctx.io().addCacheHandler( |
| ctx.cacheId(), |
| ctx.startTopologyVersion(), |
| GridNearSingleGetResponse.class, |
| (CI2<UUID, GridNearSingleGetResponse>)this::processNearSingleGetResponse); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void start() throws IgniteCheckedException { |
| assert metrics != null : "Cache metrics instance isn't initialized."; |
| |
| if (ctx.dht().near() != null) |
| metrics.delegate(ctx.dht().near().metrics0()); |
| } |
| |
| /** |
| * @param near Near cache. |
| */ |
| public void near(GridNearAtomicCache<K, V> near) { |
| this.near = near; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridNearCacheAdapter<K, V> near() { |
| return near; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected IgniteInternalFuture<V> getAsync( |
| final K key, |
| final boolean forcePrimary, |
| final boolean skipTx, |
| final String taskName, |
| final boolean deserializeBinary, |
| final boolean skipVals, |
| final boolean needVer |
| ) { |
| ctx.checkSecurity(SecurityPermission.CACHE_READ); |
| |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| final ExpiryPolicy expiryPlc = skipVals ? null : opCtx != null ? opCtx.expiry() : null; |
| final boolean skipStore = opCtx != null && opCtx.skipStore(); |
| final boolean recovery = opCtx != null && opCtx.recovery(); |
| final ReadRepairStrategy readRepairStrategy = opCtx != null ? opCtx.readRepairStrategy() : null; |
| |
| return asyncOp(new CO<IgniteInternalFuture<V>>() { |
| @Override public IgniteInternalFuture<V> apply() { |
| return getAsync0(ctx.toCacheKeyObject(key), |
| forcePrimary, |
| taskName, |
| deserializeBinary, |
| recovery, |
| readRepairStrategy, |
| expiryPlc, |
| skipVals, |
| skipStore, |
| needVer); |
| } |
| }); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected Map<K, V> getAll( |
| Collection<? extends K> keys, |
| boolean deserializeBinary, |
| boolean needVer, |
| boolean recovery, |
| ReadRepairStrategy readRepairStrategy) throws IgniteCheckedException { |
| return getAllAsyncInternal(keys, |
| !ctx.config().isReadFromBackup(), |
| ctx.kernalContext().job().currentTaskName(), |
| deserializeBinary, |
| recovery, |
| readRepairStrategy, |
| false, |
| needVer, |
| false).get(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<Map<K, V>> getAllAsync( |
| @Nullable final Collection<? extends K> keys, |
| final boolean forcePrimary, |
| boolean skipTx, |
| final String taskName, |
| final boolean deserializeBinary, |
| final boolean recovery, |
| final ReadRepairStrategy readRepairStrategy, |
| final boolean skipVals, |
| final boolean needVer |
| ) { |
| return getAllAsyncInternal(keys, |
| forcePrimary, |
| taskName, |
| deserializeBinary, |
| recovery, |
| readRepairStrategy, |
| skipVals, |
| needVer, |
| true); |
| } |
| |
| /** |
| * @param keys Keys. |
| * @param forcePrimary Force primary flag. |
| * @param taskName Task name. |
| * @param deserializeBinary Deserialize binary flag. |
| * @param readRepairStrategy Read Repair strategy. |
| * @param skipVals Skip values flag. |
| * @param needVer Need version flag. |
| * @param asyncOp Async operation flag. |
| * @return Future. |
| */ |
| private IgniteInternalFuture<Map<K, V>> getAllAsyncInternal( |
| @Nullable final Collection<? extends K> keys, |
| final boolean forcePrimary, |
| final String taskName, |
| final boolean deserializeBinary, |
| final boolean recovery, |
| final ReadRepairStrategy readRepairStrategy, |
| final boolean skipVals, |
| final boolean needVer, |
| boolean asyncOp |
| ) { |
| ctx.checkSecurity(SecurityPermission.CACHE_READ); |
| |
| if (F.isEmpty(keys)) |
| return new GridFinishedFuture<>(Collections.<K, V>emptyMap()); |
| |
| warnIfUnordered(keys, BulkOperation.GET); |
| |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| final ExpiryPolicy expiryPlc = skipVals ? null : opCtx != null ? opCtx.expiry() : null; |
| |
| final boolean skipStore = opCtx != null && opCtx.skipStore(); |
| |
| if (asyncOp) { |
| return asyncOp(new CO<IgniteInternalFuture<Map<K, V>>>() { |
| @Override public IgniteInternalFuture<Map<K, V>> apply() { |
| return getAllAsync0(ctx.cacheKeysView(keys), |
| forcePrimary, |
| taskName, |
| deserializeBinary, |
| recovery, |
| readRepairStrategy, |
| expiryPlc, |
| skipVals, |
| skipStore, |
| needVer); |
| } |
| }); |
| } |
| else { |
| return getAllAsync0(ctx.cacheKeysView(keys), |
| forcePrimary, |
| taskName, |
| deserializeBinary, |
| recovery, |
| readRepairStrategy, |
| expiryPlc, |
| skipVals, |
| skipStore, |
| needVer); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected V getAndPut0(K key, V val, @Nullable CacheEntryPredicate filter) throws IgniteCheckedException { |
| return (V)update0( |
| key, |
| val, |
| null, |
| null, |
| true, |
| filter, |
| false).get(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected boolean put0(K key, V val, CacheEntryPredicate filter) throws IgniteCheckedException { |
| Boolean res = (Boolean)update0( |
| key, |
| val, |
| null, |
| null, |
| false, |
| filter, |
| false).get(); |
| |
| assert res != null; |
| |
| return res; |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("unchecked") |
| @Override public IgniteInternalFuture<V> getAndPutAsync0(K key, V val, @Nullable CacheEntryPredicate filter) { |
| return update0( |
| key, |
| val, |
| null, |
| null, |
| true, |
| filter, |
| true); |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("unchecked") |
| @Override public IgniteInternalFuture<Boolean> putAsync0(K key, V val, @Nullable CacheEntryPredicate filter) { |
| return update0( |
| key, |
| val, |
| null, |
| null, |
| false, |
| filter, |
| true); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void putAll0(Map<? extends K, ? extends V> m) throws IgniteCheckedException { |
| updateAll0( |
| m, |
| null, |
| null, |
| null, |
| null, |
| false, |
| UPDATE, |
| false |
| ).get(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<?> putAllAsync0(Map<? extends K, ? extends V> m) { |
| return updateAll0( |
| m, |
| null, |
| null, |
| null, |
| null, |
| false, |
| UPDATE, |
| true |
| ).chain(RET2NULL); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void putAllConflict(Map<KeyCacheObject, GridCacheDrInfo> conflictMap) |
| throws IgniteCheckedException { |
| putAllConflictAsync(conflictMap).get(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<?> putAllConflictAsync(Map<KeyCacheObject, GridCacheDrInfo> conflictMap) { |
| ctx.dr().onReceiveCacheEntriesReceived(conflictMap.size()); |
| |
| warnIfUnordered(conflictMap, BulkOperation.PUT); |
| |
| return updateAll0( |
| null, |
| null, |
| null, |
| conflictMap, |
| null, |
| false, |
| UPDATE, |
| true); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public V getAndRemove0(K key) throws IgniteCheckedException { |
| return (V)remove0(key, true, null, false).get(); |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("unchecked") |
| @Override public IgniteInternalFuture<V> getAndRemoveAsync0(K key) { |
| return remove0(key, true, null, true); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void removeAll0(Collection<? extends K> keys) throws IgniteCheckedException { |
| removeAllAsync0(keys, null, false, false).get(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<Object> removeAllAsync0(Collection<? extends K> keys) { |
| return removeAllAsync0(keys, null, false, true).chain(RET2NULL); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected boolean remove0(K key, CacheEntryPredicate filter) throws IgniteCheckedException { |
| return (Boolean)remove0(key, false, filter, false).get(); |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("unchecked") |
| @Override public IgniteInternalFuture<Boolean> removeAsync0(K key, @Nullable CacheEntryPredicate filter) { |
| return remove0(key, false, filter, true); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void removeAllConflict(Map<KeyCacheObject, GridCacheVersion> conflictMap) |
| throws IgniteCheckedException { |
| removeAllConflictAsync(conflictMap).get(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<?> removeAllConflictAsync(Map<KeyCacheObject, GridCacheVersion> conflictMap) { |
| ctx.dr().onReceiveCacheEntriesReceived(conflictMap.size()); |
| |
| return removeAllAsync0(null, conflictMap, false, true); |
| } |
| |
| /** |
| * @return {@code True} if store write-through enabled. |
| */ |
| private boolean writeThrough() { |
| return ctx.writeThrough() && ctx.store().configured(); |
| } |
| |
| /** |
| * @param op Operation closure. |
| * @return Future. |
| */ |
| private <T> IgniteInternalFuture<T> asyncOp(final CO<IgniteInternalFuture<T>> op) { |
| IgniteInternalFuture<T> fail = asyncOpAcquire(/*retry*/false); |
| |
| if (fail != null) |
| return fail; |
| |
| IgniteInternalFuture<T> f = op.apply(); |
| |
| f.listen(new CI1<IgniteInternalFuture<?>>() { |
| @Override public void apply(IgniteInternalFuture<?> f) { |
| asyncOpRelease(/*retry*/false); |
| } |
| }); |
| |
| return f; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected IgniteInternalFuture<Boolean> lockAllAsync(Collection<KeyCacheObject> keys, |
| long timeout, |
| @Nullable IgniteTxLocalEx tx, |
| boolean isInvalidate, |
| boolean isRead, |
| boolean retval, |
| @Nullable TransactionIsolation isolation, |
| long createTtl, |
| long accessTtl) { |
| return new FinishedLockFuture(new UnsupportedOperationException("Locks are not supported for " + |
| "CacheAtomicityMode.ATOMIC mode (use CacheAtomicityMode.TRANSACTIONAL instead)")); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public <T> EntryProcessorResult<T> invoke(K key, EntryProcessor<K, V, T> entryProcessor, Object... args) |
| throws IgniteCheckedException { |
| IgniteInternalFuture<EntryProcessorResult<T>> invokeFut = invoke0(false, key, entryProcessor, args); |
| |
| return invokeFut.get(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, |
| EntryProcessor<K, V, T> entryProcessor, |
| Object... args) throws IgniteCheckedException { |
| |
| warnIfUnordered(keys, BulkOperation.INVOKE); |
| |
| return invokeAll0(false, keys, entryProcessor, args).get(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public <T> IgniteInternalFuture<EntryProcessorResult<T>> invokeAsync(K key, |
| EntryProcessor<K, V, T> entryProcessor, |
| Object... args) { |
| return invoke0(true, key, entryProcessor, args); |
| } |
| |
| /** |
| * @param async Async operation flag. |
| * @param key Key. |
| * @param entryProcessor Entry processor. |
| * @param args Entry processor arguments. |
| * @return Future. |
| */ |
| private <T> IgniteInternalFuture<EntryProcessorResult<T>> invoke0( |
| boolean async, |
| K key, |
| EntryProcessor<K, V, T> entryProcessor, |
| Object... args) { |
| A.notNull(key, "key", entryProcessor, "entryProcessor"); |
| |
| final boolean statsEnabled = ctx.statisticsEnabled(); |
| |
| final long start = statsEnabled ? System.nanoTime() : 0L; |
| |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| final boolean keepBinary = opCtx != null && opCtx.isKeepBinary(); |
| |
| IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut = update0( |
| key, |
| null, |
| entryProcessor, |
| args, |
| false, |
| null, |
| async); |
| |
| return fut.chain(new CX1<IgniteInternalFuture<Map<K, EntryProcessorResult<T>>>, EntryProcessorResult<T>>() { |
| @Override public EntryProcessorResult<T> applyx(IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut) |
| throws IgniteCheckedException { |
| Map<K, EntryProcessorResult<T>> resMap = fut.get(); |
| |
| if (statsEnabled) |
| metrics0().addInvokeTimeNanos(System.nanoTime() - start); |
| |
| if (resMap != null) { |
| assert resMap.isEmpty() || resMap.size() == 1 : resMap.size(); |
| |
| EntryProcessorResult<T> res = |
| resMap.isEmpty() ? new CacheInvokeResult<>() : resMap.values().iterator().next(); |
| |
| if (res instanceof CacheInvokeResult) { |
| CacheInvokeResult invokeRes = (CacheInvokeResult)res; |
| |
| if (invokeRes.result() != null) |
| res = CacheInvokeResult.fromResult((T)ctx.unwrapBinaryIfNeeded(invokeRes.result(), |
| keepBinary, false, null)); |
| } |
| |
| return res; |
| } |
| |
| return new CacheInvokeResult<>(); |
| } |
| }); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public <T> IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync(Set<? extends K> keys, |
| final EntryProcessor<K, V, T> entryProcessor, |
| Object... args) { |
| |
| warnIfUnordered(keys, BulkOperation.INVOKE); |
| |
| return invokeAll0(true, keys, entryProcessor, args); |
| } |
| |
| /** |
| * @param async Async operation flag. |
| * @param keys Keys. |
| * @param entryProcessor Entry processor. |
| * @param args Entry processor arguments. |
| * @return Future. |
| */ |
| private <T> IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> invokeAll0( |
| boolean async, |
| Set<? extends K> keys, |
| final EntryProcessor<K, V, T> entryProcessor, |
| Object... args) { |
| A.notNull(keys, "keys", entryProcessor, "entryProcessor"); |
| |
| final boolean statsEnabled = ctx.statisticsEnabled(); |
| |
| final long start = statsEnabled ? System.nanoTime() : 0L; |
| |
| Map<? extends K, EntryProcessor> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor>() { |
| @Override public EntryProcessor apply(K k) { |
| return entryProcessor; |
| } |
| }); |
| |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| final boolean keepBinary = opCtx != null && opCtx.isKeepBinary(); |
| |
| IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> resFut = updateAll0( |
| null, |
| invokeMap, |
| args, |
| null, |
| null, |
| false, |
| TRANSFORM, |
| async); |
| |
| return resFut.chain( |
| new CX1<IgniteInternalFuture<Map<K, EntryProcessorResult<T>>>, Map<K, EntryProcessorResult<T>>>() { |
| @Override public Map<K, EntryProcessorResult<T>> applyx( |
| IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut |
| ) throws IgniteCheckedException { |
| Map<Object, EntryProcessorResult> resMap = (Map)fut.get(); |
| |
| if (statsEnabled) |
| metrics0().addInvokeTimeNanos(System.nanoTime() - start); |
| |
| return ctx.unwrapInvokeResult(resMap, keepBinary); |
| } |
| }); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll( |
| Map<? extends K, ? extends EntryProcessor<K, V, T>> map, |
| Object... args) throws IgniteCheckedException { |
| A.notNull(map, "map"); |
| |
| warnIfUnordered(map, BulkOperation.INVOKE); |
| |
| final boolean statsEnabled = ctx.statisticsEnabled(); |
| |
| final long start = statsEnabled ? System.nanoTime() : 0L; |
| |
| Map<K, EntryProcessorResult<T>> updateResults = (Map<K, EntryProcessorResult<T>>)updateAll0( |
| null, |
| map, |
| args, |
| null, |
| null, |
| false, |
| TRANSFORM, |
| false).get(); |
| |
| if (statsEnabled) |
| metrics0().addInvokeTimeNanos(System.nanoTime() - start); |
| |
| return updateResults; |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("unchecked") |
| @Override public <T> IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync( |
| Map<? extends K, ? extends EntryProcessor<K, V, T>> map, |
| Object... args) { |
| A.notNull(map, "map"); |
| |
| warnIfUnordered(map, BulkOperation.INVOKE); |
| |
| final boolean statsEnabled = ctx.statisticsEnabled(); |
| |
| final long start = statsEnabled ? System.nanoTime() : 0L; |
| |
| IgniteInternalFuture updateResults = updateAll0( |
| null, |
| map, |
| args, |
| null, |
| null, |
| false, |
| TRANSFORM, |
| true); |
| |
| if (statsEnabled) |
| updateResults.listen(new InvokeAllTimeStatClosure(metrics0(), start)); |
| |
| return updateResults; |
| } |
| |
| /** |
| * Entry point for all public API put/transform methods. |
| * |
| * @param map Put map. Either {@code map}, {@code invokeMap} or {@code conflictPutMap} should be passed. |
| * @param invokeMap Invoke map. Either {@code map}, {@code invokeMap} or {@code conflictPutMap} should be passed. |
| * @param invokeArgs Optional arguments for EntryProcessor. |
| * @param conflictPutMap Conflict put map. |
| * @param conflictRmvMap Conflict remove map. |
| * @param retval Return value required flag. |
| * @param async Async operation flag. |
| * @return Completion future. |
| */ |
| @SuppressWarnings("ConstantConditions") |
| private IgniteInternalFuture updateAll0( |
| @Nullable Map<? extends K, ? extends V> map, |
| @Nullable Map<? extends K, ? extends EntryProcessor> invokeMap, |
| @Nullable Object[] invokeArgs, |
| @Nullable Map<KeyCacheObject, GridCacheDrInfo> conflictPutMap, |
| @Nullable Map<KeyCacheObject, GridCacheVersion> conflictRmvMap, |
| final boolean retval, |
| final GridCacheOperation op, |
| boolean async |
| ) { |
| assert ctx.updatesAllowed(); |
| |
| ctx.checkSecurity(SecurityPermission.CACHE_PUT); |
| |
| final CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| if (opCtx != null && opCtx.hasDataCenterId()) { |
| assert conflictPutMap == null : conflictPutMap; |
| assert conflictRmvMap == null : conflictRmvMap; |
| |
| if (op == GridCacheOperation.TRANSFORM) { |
| assert invokeMap != null : invokeMap; |
| |
| conflictPutMap = F.viewReadOnly((Map)invokeMap, |
| new IgniteClosure<EntryProcessor, GridCacheDrInfo>() { |
| @Override public GridCacheDrInfo apply(EntryProcessor o) { |
| return new GridCacheDrInfo(o, nextVersion(opCtx.dataCenterId())); |
| } |
| }); |
| |
| invokeMap = null; |
| } |
| else if (op == GridCacheOperation.DELETE) { |
| assert map != null : map; |
| |
| conflictRmvMap = F.viewReadOnly((Map)map, new IgniteClosure<V, GridCacheVersion>() { |
| @Override public GridCacheVersion apply(V o) { |
| return nextVersion(opCtx.dataCenterId()); |
| } |
| }); |
| |
| map = null; |
| } |
| else { |
| assert map != null : map; |
| |
| conflictPutMap = F.viewReadOnly((Map)map, new IgniteClosure<V, GridCacheDrInfo>() { |
| @Override public GridCacheDrInfo apply(V o) { |
| return new GridCacheDrInfo(ctx.toCacheObject(o), nextVersion(opCtx.dataCenterId())); |
| } |
| }); |
| |
| map = null; |
| } |
| } |
| |
| int taskNameHash = ctx.kernalContext().job().currentTaskNameHash(); |
| |
| final GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture( |
| ctx, |
| this, |
| ctx.config().getWriteSynchronizationMode(), |
| op, |
| map != null ? map.keySet() : invokeMap != null ? invokeMap.keySet() : conflictPutMap != null ? |
| conflictPutMap.keySet() : conflictRmvMap.keySet(), |
| map != null ? map.values() : invokeMap != null ? invokeMap.values() : null, |
| invokeArgs, |
| (Collection)(conflictPutMap != null ? conflictPutMap.values() : null), |
| conflictRmvMap != null ? conflictRmvMap.values() : null, |
| retval, |
| opCtx != null ? opCtx.expiry() : null, |
| CU.filterArray(null), |
| taskNameHash, |
| opCtx != null && opCtx.skipStore(), |
| opCtx != null && opCtx.isKeepBinary(), |
| opCtx != null && opCtx.recovery(), |
| opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES); |
| |
| if (async) { |
| return asyncOp(new CO<IgniteInternalFuture<Object>>() { |
| @Override public IgniteInternalFuture<Object> apply() { |
| updateFut.map(); |
| |
| return updateFut; |
| } |
| }); |
| } |
| else { |
| updateFut.map(); |
| |
| return updateFut; |
| } |
| } |
| |
| /** |
| * Entry point for update/invoke with a single key. |
| * |
| * @param key Key. |
| * @param val Value. |
| * @param proc Entry processor. |
| * @param invokeArgs Invoke arguments. |
| * @param retval Return value flag. |
| * @param filter Filter. |
| * @param async Async operation flag. |
| * @return Future. |
| */ |
| private IgniteInternalFuture update0( |
| K key, |
| @Nullable V val, |
| @Nullable EntryProcessor proc, |
| @Nullable Object[] invokeArgs, |
| final boolean retval, |
| @Nullable final CacheEntryPredicate filter, |
| boolean async |
| ) { |
| assert val == null || proc == null; |
| |
| assert ctx.updatesAllowed(); |
| |
| ctx.checkSecurity(SecurityPermission.CACHE_PUT); |
| |
| final GridNearAtomicAbstractUpdateFuture updateFut = |
| createSingleUpdateFuture(key, val, proc, invokeArgs, retval, filter); |
| |
| if (async) { |
| return asyncOp(new CO<IgniteInternalFuture<Object>>() { |
| @Override public IgniteInternalFuture<Object> apply() { |
| updateFut.map(); |
| |
| return updateFut; |
| } |
| }); |
| } |
| else { |
| updateFut.map(); |
| |
| return updateFut; |
| } |
| } |
| |
| /** |
| * Entry point for remove with single key. |
| * |
| * @param key Key. |
| * @param retval Whether to return |
| * @param filter Filter. |
| * @param async Async operation flag. |
| * @return Future. |
| */ |
| private IgniteInternalFuture remove0(K key, final boolean retval, |
| @Nullable CacheEntryPredicate filter, |
| boolean async) { |
| assert ctx.updatesAllowed(); |
| |
| ctx.checkSecurity(SecurityPermission.CACHE_REMOVE); |
| |
| final GridNearAtomicAbstractUpdateFuture updateFut = createSingleUpdateFuture(key, |
| null, |
| null, |
| null, |
| retval, |
| filter); |
| |
| if (async) { |
| return asyncOp(new CO<IgniteInternalFuture<Object>>() { |
| @Override public IgniteInternalFuture<Object> apply() { |
| updateFut.map(); |
| |
| return updateFut; |
| } |
| }); |
| } |
| else { |
| updateFut.map(); |
| |
| return updateFut; |
| } |
| } |
| |
| /** |
| * Craete future for single key-val pair update. |
| * |
| * @param key Key. |
| * @param val Value. |
| * @param proc Processor. |
| * @param invokeArgs Invoke arguments. |
| * @param retval Return value flag. |
| * @param filter Filter. |
| * @return Future. |
| */ |
| private GridNearAtomicAbstractUpdateFuture createSingleUpdateFuture( |
| K key, |
| @Nullable V val, |
| @Nullable EntryProcessor proc, |
| @Nullable Object[] invokeArgs, |
| boolean retval, |
| @Nullable CacheEntryPredicate filter |
| ) { |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| GridCacheOperation op; |
| Object val0; |
| |
| if (val != null) { |
| op = UPDATE; |
| val0 = val; |
| } |
| else if (proc != null) { |
| op = TRANSFORM; |
| val0 = proc; |
| } |
| else { |
| op = DELETE; |
| val0 = null; |
| } |
| |
| GridCacheDrInfo conflictPutVal = null; |
| GridCacheVersion conflictRmvVer = null; |
| |
| if (opCtx != null && opCtx.hasDataCenterId()) { |
| Byte dcId = opCtx.dataCenterId(); |
| |
| assert dcId != null; |
| |
| if (op == UPDATE) { |
| conflictPutVal = new GridCacheDrInfo(ctx.toCacheObject(val), nextVersion(dcId)); |
| |
| val0 = null; |
| } |
| else if (op == GridCacheOperation.TRANSFORM) { |
| conflictPutVal = new GridCacheDrInfo(proc, nextVersion(dcId)); |
| |
| val0 = null; |
| } |
| else |
| conflictRmvVer = nextVersion(dcId); |
| } |
| |
| CacheEntryPredicate[] filters = CU.filterArray(filter); |
| |
| ReadRepairStrategy readRepairStrategy = opCtx != null ? opCtx.readRepairStrategy() : null; |
| |
| // Providing the guarantee that all copies are updated when read repair operation is finished. |
| CacheWriteSynchronizationMode syncMode = |
| readRepairStrategy != null ? FULL_SYNC : ctx.config().getWriteSynchronizationMode(); |
| |
| if (conflictPutVal == null && conflictRmvVer == null) { |
| return new GridNearAtomicSingleUpdateFuture( |
| ctx, |
| this, |
| syncMode, |
| op, |
| key, |
| val0, |
| invokeArgs, |
| retval, |
| opCtx != null ? opCtx.expiry() : null, |
| filters, |
| ctx.kernalContext().job().currentTaskNameHash(), |
| opCtx != null && opCtx.skipStore(), |
| opCtx != null && opCtx.isKeepBinary(), |
| opCtx != null && opCtx.recovery(), |
| opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES |
| ); |
| } |
| else { |
| return new GridNearAtomicUpdateFuture( |
| ctx, |
| this, |
| syncMode, |
| op, |
| Collections.singletonList(key), |
| val0 != null ? Collections.singletonList(val0) : null, |
| invokeArgs, |
| conflictPutVal != null ? Collections.singleton(conflictPutVal) : null, |
| conflictRmvVer != null ? Collections.singleton(conflictRmvVer) : null, |
| retval, |
| opCtx != null ? opCtx.expiry() : null, |
| filters, |
| ctx.kernalContext().job().currentTaskNameHash(), |
| opCtx != null && opCtx.skipStore(), |
| opCtx != null && opCtx.isKeepBinary(), |
| opCtx != null && opCtx.recovery(), |
| opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES); |
| } |
| } |
| |
| /** |
| * Entry point for all public API remove methods. |
| * |
| * @param keys Keys to remove. |
| * @param conflictMap Conflict map. |
| * @param retval Return value required flag. |
| * @return Completion future. |
| */ |
| private IgniteInternalFuture removeAllAsync0( |
| @Nullable Collection<? extends K> keys, |
| @Nullable Map<KeyCacheObject, GridCacheVersion> conflictMap, |
| final boolean retval, |
| boolean async |
| ) { |
| assert ctx.updatesAllowed(); |
| |
| assert keys != null || conflictMap != null; |
| |
| ctx.checkSecurity(SecurityPermission.CACHE_REMOVE); |
| |
| final CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| int taskNameHash = ctx.kernalContext().job().currentTaskNameHash(); |
| |
| Collection<GridCacheVersion> drVers = null; |
| |
| if (opCtx != null && keys != null && opCtx.hasDataCenterId()) { |
| assert conflictMap == null : conflictMap; |
| |
| drVers = F.transform(keys, new C1<K, GridCacheVersion>() { |
| @Override public GridCacheVersion apply(K k) { |
| return nextVersion(opCtx.dataCenterId()); |
| } |
| }); |
| } |
| |
| final GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture( |
| ctx, |
| this, |
| ctx.config().getWriteSynchronizationMode(), |
| DELETE, |
| keys != null ? keys : conflictMap.keySet(), |
| null, |
| null, |
| null, |
| drVers != null ? drVers : (keys != null ? null : conflictMap.values()), |
| retval, |
| opCtx != null ? opCtx.expiry() : null, |
| CU.filterArray(null), |
| taskNameHash, |
| opCtx != null && opCtx.skipStore(), |
| opCtx != null && opCtx.isKeepBinary(), |
| opCtx != null && opCtx.recovery(), |
| opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES); |
| |
| if (async) { |
| return asyncOp(new CO<IgniteInternalFuture<Object>>() { |
| @Override public IgniteInternalFuture<Object> apply() { |
| updateFut.map(); |
| |
| return updateFut; |
| } |
| }); |
| } |
| else { |
| updateFut.map(); |
| |
| return updateFut; |
| } |
| } |
| |
| /** |
| * Entry point to all public API single get methods. |
| * |
| * @param key Key. |
| * @param forcePrimary Force primary flag. |
| * @param taskName Task name. |
| * @param deserializeBinary Deserialize binary flag. |
| * @param readRepairStrategy Read Repair strategy. |
| * @param expiryPlc Expiry policy. |
| * @param skipVals Skip values flag. |
| * @param skipStore Skip store flag. |
| * @param needVer Need version. |
| * @return Get future. |
| */ |
| private IgniteInternalFuture<V> getAsync0(KeyCacheObject key, |
| boolean forcePrimary, |
| String taskName, |
| boolean deserializeBinary, |
| boolean recovery, |
| ReadRepairStrategy readRepairStrategy, |
| @Nullable ExpiryPolicy expiryPlc, |
| boolean skipVals, |
| boolean skipStore, |
| boolean needVer |
| ) { |
| AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); |
| |
| IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc); |
| |
| if (readRepairStrategy != null) { |
| return new GridNearReadRepairCheckOnlyFuture( |
| topVer, |
| ctx, |
| Collections.singleton(ctx.toCacheKeyObject(key)), |
| readRepairStrategy, |
| !skipStore, |
| taskName, |
| deserializeBinary, |
| recovery, |
| expiry, |
| skipVals, |
| needVer, |
| false, |
| null).single(); |
| } |
| |
| GridPartitionedSingleGetFuture fut = new GridPartitionedSingleGetFuture(ctx, |
| key, |
| topVer, |
| !skipStore, |
| forcePrimary, |
| taskName, |
| deserializeBinary, |
| expiry, |
| skipVals, |
| needVer, |
| false, |
| recovery, |
| null); |
| |
| fut.init(); |
| |
| return (IgniteInternalFuture<V>)fut; |
| } |
| |
| /** |
| * Entry point to all public API get methods. |
| * |
| * @param keys Keys. |
| * @param forcePrimary Force primary flag. |
| * @param taskName Task name. |
| * @param deserializeBinary Deserialize binary flag. |
| * @param expiryPlc Expiry policy. |
| * @param skipVals Skip values flag. |
| * @param skipStore Skip store flag. |
| * @param needVer Need version. |
| * @return Get future. |
| */ |
| private IgniteInternalFuture<Map<K, V>> getAllAsync0(@Nullable Collection<KeyCacheObject> keys, |
| boolean forcePrimary, |
| String taskName, |
| boolean deserializeBinary, |
| boolean recovery, |
| ReadRepairStrategy readRepairStrategy, |
| @Nullable ExpiryPolicy expiryPlc, |
| boolean skipVals, |
| boolean skipStore, |
| boolean needVer |
| ) { |
| AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); |
| |
| final IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc); |
| |
| final boolean evt = !skipVals; |
| |
| if (readRepairStrategy != null) { |
| return new GridNearReadRepairCheckOnlyFuture( |
| topVer, |
| ctx, |
| ctx.cacheKeysView(keys), |
| readRepairStrategy, |
| !skipStore, |
| taskName, |
| deserializeBinary, |
| recovery, |
| expiry, |
| skipVals, |
| needVer, |
| false, |
| null).multi(); |
| } |
| |
| // Optimisation: try to resolve value locally and escape 'get future' creation. |
| if (!forcePrimary && ctx.config().isReadFromBackup() && ctx.affinityNode() && |
| ctx.group().topology().lostPartitions().isEmpty()) { |
| ctx.shared().database().checkpointReadLock(); |
| |
| try { |
| Map<K, V> locVals = U.newHashMap(keys.size()); |
| |
| boolean success = true; |
| boolean readNoEntry = ctx.readNoEntry(expiry, false); |
| |
| // Optimistically expect that all keys are available locally (avoid creation of get future). |
| for (KeyCacheObject key : keys) { |
| if (readNoEntry) { |
| CacheDataRow row = ctx.offheap().read(ctx, key); |
| |
| if (row != null) { |
| long expireTime = row.expireTime(); |
| |
| if (expireTime == 0 || expireTime > U.currentTimeMillis()) { |
| ctx.addResult(locVals, |
| key, |
| row.value(), |
| skipVals, |
| false, |
| deserializeBinary, |
| true, |
| null, |
| row.version(), |
| 0, |
| 0, |
| needVer, |
| U.deploymentClassLoader(ctx.kernalContext(), U.contextDeploymentClassLoaderId(ctx.kernalContext()))); |
| |
| if (evt) { |
| ctx.events().readEvent(key, |
| null, |
| null, |
| row.value(), |
| taskName, |
| !deserializeBinary); |
| } |
| } |
| else |
| success = false; |
| } |
| else |
| success = false; |
| } |
| else { |
| GridCacheEntryEx entry = null; |
| |
| while (true) { |
| try { |
| entry = entryEx(key); |
| |
| // If our DHT cache do has value, then we peek it. |
| if (entry != null) { |
| boolean isNew = entry.isNewLocked(); |
| |
| EntryGetResult getRes = null; |
| CacheObject v = null; |
| GridCacheVersion ver = null; |
| |
| if (needVer) { |
| getRes = entry.innerGetVersioned( |
| null, |
| null, |
| /*update-metrics*/false, |
| /*event*/evt, |
| null, |
| taskName, |
| expiry, |
| true, |
| null); |
| |
| if (getRes != null) { |
| v = getRes.value(); |
| ver = getRes.version(); |
| } |
| } |
| else { |
| v = entry.innerGet( |
| null, |
| null, |
| /*read-through*/false, |
| /*update-metrics*/false, |
| /*event*/evt, |
| null, |
| taskName, |
| expiry, |
| !deserializeBinary); |
| } |
| |
| // Entry was not in memory or in swap, so we remove it from cache. |
| if (v == null) { |
| if (isNew && entry.markObsoleteIfEmpty(nextVersion())) |
| removeEntry(entry); |
| |
| success = false; |
| } |
| else { |
| ctx.addResult(locVals, |
| key, |
| v, |
| skipVals, |
| false, |
| deserializeBinary, |
| true, |
| getRes, |
| ver, |
| 0, |
| 0, |
| needVer, |
| U.deploymentClassLoader( |
| ctx.kernalContext(), |
| U.contextDeploymentClassLoaderId(ctx.kernalContext()) |
| ) |
| ); |
| } |
| } |
| else |
| success = false; |
| |
| break; // While. |
| } |
| catch (GridCacheEntryRemovedException ignored) { |
| // No-op, retry. |
| } |
| catch (GridDhtInvalidPartitionException ignored) { |
| success = false; |
| |
| break; // While. |
| } |
| finally { |
| if (entry != null) |
| entry.touch(); |
| } |
| } |
| } |
| |
| if (!success) |
| break; |
| else if (!skipVals && ctx.statisticsEnabled()) |
| metrics0().onRead(true); |
| } |
| |
| if (success) { |
| sendTtlUpdateRequest(expiry); |
| |
| return new GridFinishedFuture<>(locVals); |
| } |
| } |
| catch (IgniteCheckedException e) { |
| return new GridFinishedFuture<>(e); |
| } |
| finally { |
| ctx.shared().database().checkpointReadUnlock(); |
| } |
| } |
| |
| if (expiry != null) |
| expiry.reset(); |
| |
| // Either reload or not all values are available locally. |
| GridPartitionedGetFuture<K, V> fut = new GridPartitionedGetFuture<>(ctx, |
| keys, |
| !skipStore, |
| forcePrimary, |
| taskName, |
| deserializeBinary, |
| recovery, |
| expiry, |
| skipVals, |
| needVer, |
| false, |
| null, |
| null); |
| |
| fut.init(topVer); |
| |
| return fut; |
| } |
| |
| /** |
| * Executes local update. |
| * |
| * @param node Node. |
| * @param req Update request. |
| * @param completionCb Completion callback. |
| */ |
| void updateAllAsyncInternal( |
| final ClusterNode node, |
| final GridNearAtomicAbstractUpdateRequest req, |
| final UpdateReplyClosure completionCb |
| ) { |
| IgniteInternalFuture<Object> forceFut = ctx.group().preloader().request(ctx, req, req.topologyVersion()); |
| |
| if (forceFut == null || forceFut.isDone()) { |
| try { |
| if (forceFut != null) |
| forceFut.get(); |
| } |
| catch (NodeStoppingException ignored) { |
| return; |
| } |
| catch (IgniteCheckedException e) { |
| onForceKeysError(node.id(), req, completionCb, e); |
| |
| return; |
| } |
| |
| updateAllAsyncInternal0(node, req, completionCb); |
| } |
| else { |
| forceFut.listen(new CI1<IgniteInternalFuture<Object>>() { |
| @Override public void apply(IgniteInternalFuture<Object> fut) { |
| try { |
| fut.get(); |
| } |
| catch (NodeStoppingException ignored) { |
| return; |
| } |
| catch (IgniteCheckedException e) { |
| onForceKeysError(node.id(), req, completionCb, e); |
| |
| return; |
| } |
| |
| updateAllAsyncInternal0(node, req, completionCb); |
| } |
| }); |
| } |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param req Update request. |
| * @param completionCb Completion callback. |
| * @param e Error. |
| */ |
| private void onForceKeysError(final UUID nodeId, |
| final GridNearAtomicAbstractUpdateRequest req, |
| final UpdateReplyClosure completionCb, |
| IgniteCheckedException e |
| ) { |
| GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), |
| nodeId, |
| req.futureId(), |
| req.partition(), |
| false, |
| ctx.deploymentEnabled()); |
| |
| res.addFailedKeys(req.keys(), e); |
| |
| completionCb.apply(req, res); |
| } |
| |
| /** |
| * Executes local update after preloader fetched values. |
| * |
| * @param node Node. |
| * @param req Update request. |
| * @param completionCb Completion callback. |
| */ |
| private void updateAllAsyncInternal0( |
| final ClusterNode node, |
| final GridNearAtomicAbstractUpdateRequest req, |
| final UpdateReplyClosure completionCb |
| ) { |
| GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), |
| node.id(), |
| req.futureId(), |
| req.partition(), |
| false, |
| ctx.deploymentEnabled()); |
| |
| assert !req.returnValue() || (req.operation() == TRANSFORM || req.size() == 1); |
| |
| GridDhtAtomicAbstractUpdateFuture dhtFut = null; |
| |
| IgniteCacheExpiryPolicy expiry = null; |
| |
| boolean needTaskName = ctx.events().isRecordable(EVT_CACHE_OBJECT_READ) || |
| ctx.events().isRecordable(EVT_CACHE_OBJECT_PUT) || |
| ctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED); |
| |
| String taskName = needTaskName ? ctx.kernalContext().task().resolveTaskName(req.taskNameHash()) : null; |
| |
| ctx.shared().database().checkpointReadLock(); |
| |
| try { |
| ctx.shared().database().ensureFreeSpace(ctx.dataRegion()); |
| |
| // If batch store update is enabled, we need to lock all entries. |
| // First, need to acquire locks on cache entries, then check filter. |
| List<GridDhtCacheEntry> locked = lockEntries(req, req.topologyVersion());; |
| |
| Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = null; |
| |
| DhtAtomicUpdateResult updDhtRes = new DhtAtomicUpdateResult(); |
| |
| try { |
| while (true) { |
| try { |
| GridDhtPartitionTopology top = topology(); |
| |
| top.readLock(); |
| |
| try { |
| if (top.stopping()) { |
| if (ctx.shared().cache().isCacheRestarting(name())) |
| res.addFailedKeys(req.keys(), new IgniteCacheRestartingException(name())); |
| else |
| res.addFailedKeys(req.keys(), new CacheStoppedException(name())); |
| |
| completionCb.apply(req, res); |
| |
| return; |
| } |
| |
| boolean remap = false; |
| |
| // Do not check topology version if topology was locked on near node by |
| // external transaction or explicit lock. |
| if (!req.topologyLocked()) { |
| AffinityTopologyVersion waitVer = top.topologyVersionFuture().initialVersion(); |
| |
| // No need to remap if next future version is compatible. |
| boolean compatible = |
| waitVer.isBetween(req.lastAffinityChangedTopologyVersion(), req.topologyVersion()); |
| |
| // Can not wait for topology future since it will break |
| // GridNearAtomicCheckUpdateRequest processing. |
| remap = !compatible && !top.topologyVersionFuture().isDone() || |
| needRemap(req.topologyVersion()); |
| } |
| |
| if (!remap) { |
| update(node, locked, req, res, updDhtRes, taskName); |
| |
| dhtFut = updDhtRes.dhtFuture(); |
| deleted = updDhtRes.deleted(); |
| expiry = updDhtRes.expiryPolicy(); |
| } |
| else |
| // Should remap all keys. |
| res.remapTopologyVersion(top.lastTopologyChangeVersion()); |
| } |
| finally { |
| top.readUnlock(); |
| } |
| |
| // This call will convert entry processor invocation results to cache object instances. |
| // Must be done outside topology read lock to avoid deadlocks. |
| if (res.returnValue() != null) |
| res.returnValue().marshalResult(ctx); |
| |
| break; |
| } |
| catch (UnregisteredClassException ex) { |
| IgniteCacheObjectProcessor cacheObjProc = ctx.cacheObjects(); |
| |
| assert cacheObjProc instanceof CacheObjectBinaryProcessorImpl; |
| |
| ((CacheObjectBinaryProcessorImpl)cacheObjProc) |
| .binaryContext().registerClass(ex.cls(), true, false); |
| } |
| catch (UnregisteredBinaryTypeException ex) { |
| if (ex.future() != null) { |
| // Wait for the future that couldn't be processed because of |
| // IgniteThread#isForbiddenToRequestBinaryMetadata flag being true. Usually this means |
| // that awaiting for the future right there would lead to potential deadlock if |
| // continuous queries are used in parallel with entry processor. |
| ex.future().get(); |
| |
| // Retry and don't update current binary metadata, because it most likely already exists. |
| continue; |
| } |
| |
| IgniteCacheObjectProcessor cacheObjProc = ctx.cacheObjects(); |
| |
| assert cacheObjProc instanceof CacheObjectBinaryProcessorImpl; |
| |
| ((CacheObjectBinaryProcessorImpl)cacheObjProc) |
| .binaryContext().updateMetadata(ex.typeId(), ex.binaryMetadata(), false); |
| } |
| } |
| } |
| catch (GridCacheEntryRemovedException e) { |
| assert false : "Entry should not become obsolete while holding lock."; |
| |
| e.printStackTrace(); |
| } |
| finally { |
| if (locked != null) |
| unlockEntries(locked, req.topologyVersion()); |
| |
| // Enqueue if necessary after locks release. |
| if (deleted != null) { |
| assert !deleted.isEmpty(); |
| assert ctx.deferredDelete() : this; |
| |
| for (IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion> e : deleted) |
| ctx.onDeferredDelete(e.get1(), e.get2()); |
| } |
| |
| // TODO handle failure: probably drop the node from topology |
| // TODO fire events only after successful fsync |
| if (ctx.shared().wal() != null) |
| ctx.shared().wal().flush(null, false); |
| } |
| } |
| catch (GridDhtInvalidPartitionException ignore) { |
| if (log.isDebugEnabled()) |
| log.debug("Caught invalid partition exception for cache entry (will remap update request): " + req); |
| |
| res.remapTopologyVersion(ctx.topology().lastTopologyChangeVersion()); |
| } |
| catch (Throwable e) { |
| // At least RuntimeException can be thrown by the code above when GridCacheContext is cleaned and there is |
| // an attempt to use cleaned resources. |
| U.error(log, "Unexpected exception during cache update", e); |
| |
| res.addFailedKeys(req.keys(), e); |
| |
| completionCb.apply(req, res); |
| |
| if (e instanceof Error) |
| throw (Error)e; |
| |
| return; |
| } |
| finally { |
| ctx.shared().database().checkpointReadUnlock(); |
| } |
| |
| if (res.remapTopologyVersion() != null) { |
| assert dhtFut == null; |
| |
| completionCb.apply(req, res); |
| } |
| else { |
| if (dhtFut != null) |
| dhtFut.map(node, res.returnValue(), res, completionCb); |
| } |
| |
| if (req.writeSynchronizationMode() != FULL_ASYNC) |
| req.cleanup(!node.isLocal()); |
| |
| sendTtlUpdateRequest(expiry); |
| } |
| |
| /** |
| * @param node Node. |
| * @param locked Entries. |
| * @param req Request. |
| * @param res Response. |
| * @param dhtUpdRes DHT update result |
| * @param taskName Task name. |
| * @return Operation result. |
| * @throws GridCacheEntryRemovedException If got obsolete entry. |
| */ |
| private DhtAtomicUpdateResult update( |
| ClusterNode node, |
| List<GridDhtCacheEntry> locked, |
| GridNearAtomicAbstractUpdateRequest req, |
| GridNearAtomicUpdateResponse res, |
| DhtAtomicUpdateResult dhtUpdRes, |
| String taskName |
| ) throws GridCacheEntryRemovedException { |
| GridDhtPartitionTopology top = topology(); |
| |
| boolean hasNear = req.nearCache(); |
| |
| // Assign next version for update inside entries lock. |
| GridCacheVersion ver = dhtUpdRes.dhtFuture() != null /*retry*/ ? dhtUpdRes.dhtFuture().writeVer : nextVersion(); |
| |
| if (hasNear) |
| res.nearVersion(ver); |
| |
| if (msgLog.isDebugEnabled()) { |
| msgLog.debug("Assigned update version [futId=" + req.futureId() + |
| ", writeVer=" + ver + ']'); |
| } |
| |
| assert ver != null : "Got null version for update request: " + req; |
| |
| boolean sndPrevVal = !top.rebalanceFinished(req.topologyVersion()); |
| |
| if (dhtUpdRes.dhtFuture() == null) |
| dhtUpdRes.dhtFuture(createDhtFuture(ver, req)); |
| |
| IgniteCacheExpiryPolicy expiry = expiryPolicy(req.expiry()); |
| |
| GridCacheReturn retVal = null; |
| |
| if (req.size() > 1 && // Several keys ... |
| writeThrough() && !req.skipStore() && // and store is enabled ... |
| !ctx.store().isLocal() && // and this is not local store ... |
| // (conflict resolver should be used for local store) |
| !ctx.dr().receiveEnabled() // and no DR. |
| ) { |
| // This method can only be used when there are no replicated entries in the batch. |
| updateWithBatch(node, |
| hasNear, |
| req, |
| res, |
| locked, |
| ver, |
| ctx.isDrEnabled(), |
| taskName, |
| expiry, |
| sndPrevVal, |
| dhtUpdRes); |
| |
| if (req.operation() == TRANSFORM) |
| retVal = dhtUpdRes.returnValue(); |
| } |
| else { |
| updateSingle(node, |
| hasNear, |
| req, |
| res, |
| locked, |
| ver, |
| ctx.isDrEnabled(), |
| taskName, |
| expiry, |
| sndPrevVal, |
| dhtUpdRes); |
| |
| retVal = dhtUpdRes.returnValue(); |
| } |
| |
| GridDhtAtomicAbstractUpdateFuture dhtFut = dhtUpdRes.dhtFuture(); |
| |
| if (retVal == null) |
| retVal = new GridCacheReturn(ctx, node.isLocal(), true, null, null, true); |
| |
| res.returnValue(retVal); |
| |
| if (dhtFut != null) { |
| if (req.writeSynchronizationMode() == PRIMARY_SYNC |
| // To avoid deadlock disable back-pressure for sender data node. |
| && !ctx.discovery().cacheGroupAffinityNode(node, ctx.groupId()) |
| && !dhtFut.isDone()) { |
| final IgniteRunnable tracker = GridNioBackPressureControl.threadTracker(); |
| |
| if (tracker instanceof GridNioMessageTracker) { |
| ((GridNioMessageTracker)tracker).onMessageReceived(); |
| |
| dhtFut.listen(new IgniteInClosure<IgniteInternalFuture<Void>>() { |
| @Override public void apply(IgniteInternalFuture<Void> fut) { |
| ((GridNioMessageTracker)tracker).onMessageProcessed(); |
| } |
| }); |
| } |
| } |
| |
| ctx.mvcc().addAtomicFuture(dhtFut.id(), dhtFut); |
| } |
| |
| dhtUpdRes.expiryPolicy(expiry); |
| |
| return dhtUpdRes; |
| } |
| |
| /** |
| * Updates locked entries using batched write-through. |
| * |
| * @param node Sender node. |
| * @param hasNear {@code True} if originating node has near cache. |
| * @param req Update request. |
| * @param res Update response. |
| * @param locked Locked entries. |
| * @param ver Assigned version. |
| * @param replicate Whether replication is enabled. |
| * @param taskName Task name. |
| * @param expiry Expiry policy. |
| * @param sndPrevVal If {@code true} sends previous value to backups. |
| * @param dhtUpdRes DHT update result. |
| * @throws GridCacheEntryRemovedException Should not be thrown. |
| */ |
| @SuppressWarnings("unchecked") |
| private void updateWithBatch( |
| final ClusterNode node, |
| final boolean hasNear, |
| final GridNearAtomicAbstractUpdateRequest req, |
| final GridNearAtomicUpdateResponse res, |
| final List<GridDhtCacheEntry> locked, |
| final GridCacheVersion ver, |
| final boolean replicate, |
| final String taskName, |
| @Nullable final IgniteCacheExpiryPolicy expiry, |
| final boolean sndPrevVal, |
| final DhtAtomicUpdateResult dhtUpdRes |
| ) throws GridCacheEntryRemovedException { |
| assert !ctx.dr().receiveEnabled(); // Cannot update in batches during DR due to possible conflicts. |
| assert !req.returnValue() || req.operation() == TRANSFORM; // Should not request return values for putAll. |
| |
| if (!F.isEmpty(req.filter()) && ctx.loadPreviousValue()) { |
| try { |
| reloadIfNeeded(locked); |
| } |
| catch (IgniteCheckedException e) { |
| res.addFailedKeys(req.keys(), e); |
| |
| return; |
| } |
| } |
| |
| int size = req.size(); |
| |
| Map<KeyCacheObject, CacheObject> putMap = null; |
| |
| Map<KeyCacheObject, EntryProcessor<Object, Object, Object>> entryProcMap = null; |
| |
| Collection<KeyCacheObject> rmvKeys = null; |
| |
| List<CacheObject> writeVals = null; |
| |
| List<GridDhtCacheEntry> filtered = new ArrayList<>(size); |
| |
| GridCacheOperation op = req.operation(); |
| |
| GridCacheReturn invokeRes = null; |
| |
| int firstEntryIdx = 0; |
| |
| boolean intercept = ctx.config().getInterceptor() != null; |
| |
| for (int i = dhtUpdRes.processedEntriesCount(); i < locked.size(); i++) { |
| GridDhtCacheEntry entry = locked.get(i); |
| |
| try { |
| if (!checkFilter(entry, req, res)) { |
| if (expiry != null && entry.hasValue()) { |
| long ttl = expiry.forAccess(); |
| |
| if (ttl != CU.TTL_NOT_CHANGED) { |
| entry.updateTtl(null, ttl); |
| |
| expiry.ttlUpdated(entry.key(), |
| entry.version(), |
| entry.readers()); |
| } |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Entry did not pass the filter (will skip write) [entry=" + entry + |
| ", filter=" + Arrays.toString(req.filter()) + ", res=" + res + ']'); |
| |
| if (hasNear) |
| res.addSkippedIndex(i); |
| |
| firstEntryIdx++; |
| |
| continue; |
| } |
| |
| if (op == TRANSFORM) { |
| EntryProcessor<Object, Object, Object> entryProc = req.entryProcessor(i); |
| |
| CacheObject old = entry.innerGet( |
| ver, |
| null, |
| /*read through*/true, |
| /*metrics*/true, |
| /*event*/true, |
| entryProc, |
| taskName, |
| null, |
| req.keepBinary()); |
| |
| Object oldVal = null; |
| Object updatedVal = null; |
| |
| CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry(entry.key(), old, |
| entry.version(), req.keepBinary(), entry); |
| |
| CacheObject updated = null; |
| |
| if (invokeRes == null) |
| invokeRes = new GridCacheReturn(node.isLocal()); |
| |
| CacheInvokeResult curInvokeRes = null; |
| |
| boolean validation = false; |
| |
| IgniteThread.onEntryProcessorEntered(true); |
| |
| try { |
| Object computed = entryProc.process(invokeEntry, req.invokeArguments()); |
| |
| if (computed != null) { |
| computed = ctx.unwrapTemporary(computed); |
| |
| curInvokeRes = CacheInvokeResult.fromResult(computed); |
| } |
| |
| if (!invokeEntry.modified()) { |
| if (ctx.statisticsEnabled()) |
| ctx.cache().metrics0().onReadOnlyInvoke(old != null); |
| |
| continue; |
| } |
| else { |
| updatedVal = ctx.unwrapTemporary(invokeEntry.getValue()); |
| |
| updated = ctx.toCacheObject(updatedVal); |
| |
| validation = true; |
| |
| if (updated != null) |
| ctx.validateKeyAndValue(entry.key(), updated); |
| } |
| } |
| catch (UnregisteredClassException | UnregisteredBinaryTypeException e) { |
| throw e; |
| } |
| catch (Exception e) { |
| curInvokeRes = CacheInvokeResult.fromError(e); |
| |
| updated = old; |
| |
| if (validation) { |
| res.addSkippedIndex(i); |
| |
| continue; |
| } |
| } |
| finally { |
| IgniteThread.onEntryProcessorLeft(); |
| |
| if (curInvokeRes != null) { |
| invokeRes.addEntryProcessResult(ctx, entry.key(), invokeEntry.key(), curInvokeRes.result(), |
| curInvokeRes.error(), req.keepBinary()); |
| } |
| } |
| |
| if (updated == null) { |
| if (intercept) { |
| CacheLazyEntry e = new CacheLazyEntry(ctx, entry.key(), invokeEntry.key(), old, oldVal, req.keepBinary()); |
| |
| IgniteBiTuple<Boolean, ?> interceptorRes = ctx.config().getInterceptor().onBeforeRemove(e); |
| |
| if (ctx.cancelRemove(interceptorRes)) |
| continue; |
| } |
| |
| // Update previous batch. |
| if (putMap != null) { |
| updatePartialBatch( |
| hasNear, |
| firstEntryIdx, |
| filtered, |
| ver, |
| node, |
| writeVals, |
| putMap, |
| null, |
| entryProcMap, |
| req, |
| res, |
| replicate, |
| dhtUpdRes, |
| taskName, |
| expiry, |
| sndPrevVal); |
| |
| firstEntryIdx = i; |
| |
| putMap = null; |
| writeVals = null; |
| entryProcMap = null; |
| |
| filtered = new ArrayList<>(); |
| } |
| |
| // Start collecting new batch. |
| if (rmvKeys == null) |
| rmvKeys = new ArrayList<>(size); |
| |
| rmvKeys.add(entry.key()); |
| } |
| else { |
| if (intercept) { |
| CacheLazyEntry e = new CacheLazyEntry(ctx, entry.key(), invokeEntry.key(), old, oldVal, req.keepBinary()); |
| |
| Object val = ctx.config().getInterceptor().onBeforePut(e, updatedVal); |
| |
| if (val == null) |
| continue; |
| |
| updated = ctx.toCacheObject(ctx.unwrapTemporary(val)); |
| } |
| |
| // Update previous batch. |
| if (rmvKeys != null) { |
| updatePartialBatch( |
| hasNear, |
| firstEntryIdx, |
| filtered, |
| ver, |
| node, |
| null, |
| null, |
| rmvKeys, |
| entryProcMap, |
| req, |
| res, |
| replicate, |
| dhtUpdRes, |
| taskName, |
| expiry, |
| sndPrevVal); |
| |
| firstEntryIdx = i; |
| |
| rmvKeys = null; |
| entryProcMap = null; |
| |
| filtered = new ArrayList<>(); |
| } |
| |
| if (putMap == null) { |
| putMap = new LinkedHashMap<>(size, 1.0f); |
| writeVals = new ArrayList<>(size); |
| } |
| |
| putMap.put(entry.key(), updated); |
| writeVals.add(updated); |
| } |
| |
| if (entryProcMap == null) |
| entryProcMap = new HashMap<>(); |
| |
| entryProcMap.put(entry.key(), entryProc); |
| } |
| else if (op == UPDATE) { |
| CacheObject updated = req.value(i); |
| |
| if (intercept) { |
| CacheObject old = entry.innerGet( |
| null, |
| null, |
| /*read through*/ctx.loadPreviousValue(), |
| /*metrics*/true, |
| /*event*/true, |
| null, |
| taskName, |
| null, |
| req.keepBinary()); |
| |
| Object val = ctx.config().getInterceptor().onBeforePut( |
| new CacheLazyEntry( |
| ctx, |
| entry.key(), |
| old, |
| req.keepBinary()), |
| ctx.unwrapBinaryIfNeeded( |
| updated, |
| req.keepBinary(), |
| false, |
| null)); |
| |
| if (val == null) |
| continue; |
| |
| updated = ctx.toCacheObject(ctx.unwrapTemporary(val)); |
| } |
| |
| assert updated != null; |
| |
| ctx.validateKeyAndValue(entry.key(), updated); |
| |
| if (putMap == null) { |
| putMap = new LinkedHashMap<>(size, 1.0f); |
| writeVals = new ArrayList<>(size); |
| } |
| |
| putMap.put(entry.key(), updated); |
| writeVals.add(updated); |
| } |
| else { |
| assert op == DELETE; |
| |
| if (intercept) { |
| CacheObject old = entry.innerGet( |
| null, |
| null, |
| /*read through*/ctx.loadPreviousValue(), |
| /*metrics*/true, |
| /*event*/true, |
| null, |
| taskName, |
| null, |
| req.keepBinary()); |
| |
| IgniteBiTuple<Boolean, ?> interceptorRes = ctx.config().getInterceptor() |
| .onBeforeRemove(new CacheLazyEntry(ctx, entry.key(), old, req.keepBinary())); |
| |
| if (ctx.cancelRemove(interceptorRes)) |
| continue; |
| } |
| |
| if (rmvKeys == null) |
| rmvKeys = new ArrayList<>(size); |
| |
| rmvKeys.add(entry.key()); |
| } |
| |
| filtered.add(entry); |
| } |
| catch (IgniteCheckedException e) { |
| res.addFailedKey(entry.key(), e); |
| } |
| } |
| |
| // Store final batch. |
| if (putMap != null || rmvKeys != null) { |
| updatePartialBatch( |
| hasNear, |
| firstEntryIdx, |
| filtered, |
| ver, |
| node, |
| writeVals, |
| putMap, |
| rmvKeys, |
| entryProcMap, |
| req, |
| res, |
| replicate, |
| dhtUpdRes, |
| taskName, |
| expiry, |
| sndPrevVal); |
| } |
| else |
| assert filtered.isEmpty(); |
| |
| dhtUpdRes.returnValue(invokeRes); |
| } |
| |
| /** |
| * @param entries Entries. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void reloadIfNeeded(final List<GridDhtCacheEntry> entries) throws IgniteCheckedException { |
| Map<KeyCacheObject, Integer> needReload = null; |
| |
| for (int i = 0; i < entries.size(); i++) { |
| GridDhtCacheEntry entry = entries.get(i); |
| |
| if (entry == null) |
| continue; |
| |
| CacheObject val = entry.rawGet(); |
| |
| if (val == null) { |
| if (needReload == null) |
| needReload = new HashMap<>(entries.size(), 1.0f); |
| |
| needReload.put(entry.key(), i); |
| } |
| } |
| |
| if (needReload != null) { |
| final Map<KeyCacheObject, Integer> idxMap = needReload; |
| |
| ctx.store().loadAll(null, needReload.keySet(), new CI2<KeyCacheObject, Object>() { |
| @Override public void apply(KeyCacheObject k, Object v) { |
| Integer idx = idxMap.get(k); |
| |
| if (idx != null) { |
| GridDhtCacheEntry entry = entries.get(idx); |
| |
| try { |
| GridCacheVersion ver = entry.version(); |
| |
| entry.versionedValue(ctx.toCacheObject(v), null, ver, null, null); |
| } |
| catch (GridCacheEntryRemovedException e) { |
| assert false : "Entry should not get obsolete while holding lock [entry=" + entry + |
| ", e=" + e + ']'; |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| } |
| }); |
| } |
| } |
| |
| /** |
| * Updates locked entries one-by-one. |
| * |
| * @param nearNode Originating node. |
| * @param hasNear {@code True} if originating node has near cache. |
| * @param req Update request. |
| * @param res Update response. |
| * @param locked Locked entries. |
| * @param ver Assigned update version. |
| * @param replicate Whether DR is enabled for that cache. |
| * @param taskName Task name. |
| * @param expiry Expiry policy. |
| * @param sndPrevVal If {@code true} sends previous value to backups. |
| * @param dhtUpdRes Dht update result |
| * @throws GridCacheEntryRemovedException Should be never thrown. |
| */ |
| private void updateSingle( |
| ClusterNode nearNode, |
| boolean hasNear, |
| GridNearAtomicAbstractUpdateRequest req, |
| GridNearAtomicUpdateResponse res, |
| List<GridDhtCacheEntry> locked, |
| GridCacheVersion ver, |
| boolean replicate, |
| String taskName, |
| @Nullable IgniteCacheExpiryPolicy expiry, |
| boolean sndPrevVal, |
| DhtAtomicUpdateResult dhtUpdRes |
| ) throws GridCacheEntryRemovedException { |
| GridCacheReturn retVal = dhtUpdRes.returnValue(); |
| GridDhtAtomicAbstractUpdateFuture dhtFut = dhtUpdRes.dhtFuture(); |
| Collection<IgniteBiTuple<GridDhtCacheEntry, GridCacheVersion>> deleted = dhtUpdRes.deleted(); |
| |
| AffinityTopologyVersion topVer = req.topologyVersion(); |
| |
| boolean intercept = ctx.config().getInterceptor() != null; |
| |
| AffinityAssignment affAssignment = ctx.affinity().assignment(topVer); |
| |
| // Avoid iterator creation. |
| for (int i = dhtUpdRes.processedEntriesCount(); i < req.size(); i++) { |
| KeyCacheObject k = req.key(i); |
| |
| GridCacheOperation op = req.operation(); |
| |
| // We are holding java-level locks on entries at this point. |
| // No GridCacheEntryRemovedException can be thrown. |
| try { |
| GridDhtCacheEntry entry = locked.get(i); |
| |
| GridCacheVersion newConflictVer = req.conflictVersion(i); |
| long newConflictTtl = req.conflictTtl(i); |
| long newConflictExpireTime = req.conflictExpireTime(i); |
| |
| assert !(newConflictVer instanceof GridCacheVersionEx) : newConflictVer; |
| |
| Object writeVal = op == TRANSFORM ? req.entryProcessor(i) : req.writeValue(i); |
| |
| boolean readRepairRecovery = op == TRANSFORM && req.entryProcessor(i) instanceof AtomicReadRepairEntryProcessor; |
| |
| // Get readers before innerUpdate (reader cleared after remove). |
| GridDhtCacheEntry.ReaderId[] readers = entry.readersLocked(); |
| |
| GridCacheUpdateAtomicResult updRes = entry.innerUpdate( |
| ver, |
| nearNode.id(), |
| locNodeId, |
| op, |
| writeVal, |
| req.invokeArguments(), |
| writeThrough() && !req.skipStore(), |
| !req.skipStore(), |
| sndPrevVal || req.returnValue(), |
| req.keepBinary(), |
| expiry, |
| /*event*/true, |
| /*metrics*/true, |
| /*primary*/true, |
| /*verCheck*/false, |
| readRepairRecovery, |
| topVer, |
| req.filter(), |
| replicate ? DR_PRIMARY : DR_NONE, |
| newConflictTtl, |
| newConflictExpireTime, |
| newConflictVer, |
| /*conflictResolve*/true, |
| intercept, |
| taskName, |
| /*prevVal*/null, |
| /*updateCntr*/null, |
| dhtFut, |
| false); |
| |
| if (dhtFut != null) { |
| if (updRes.sendToDht()) { // Send to backups even in case of remove-remove scenarios. |
| GridCacheVersionConflictContext<?, ?> conflictCtx = updRes.conflictResolveResult(); |
| |
| if (conflictCtx == null) |
| newConflictVer = null; |
| else if (conflictCtx.isMerge()) |
| newConflictVer = null; // Conflict version is discarded in case of merge. |
| |
| EntryProcessor<Object, Object, Object> entryProc = null; |
| |
| dhtFut.addWriteEntry( |
| affAssignment, |
| entry, |
| updRes.newValue(), |
| entryProc, |
| updRes.newTtl(), |
| updRes.conflictExpireTime(), |
| newConflictVer, |
| sndPrevVal, |
| updRes.oldValue(), |
| updRes.updateCounter(), |
| op, |
| readRepairRecovery); |
| |
| if (readers != null) |
| dhtFut.addNearWriteEntries( |
| nearNode, |
| readers, |
| entry, |
| updRes.newValue(), |
| entryProc, |
| updRes.newTtl(), |
| updRes.conflictExpireTime(), |
| readRepairRecovery); |
| } |
| else { |
| if (log.isDebugEnabled()) |
| log.debug("Entry did not pass the filter or conflict resolution (will skip write) " + |
| "[entry=" + entry + ", filter=" + Arrays.toString(req.filter()) + ']'); |
| } |
| } |
| |
| if (hasNear) { |
| if (updRes.sendToDht()) { |
| if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) { |
| // If put the same value as in request then do not need to send it back. |
| if (op == TRANSFORM || writeVal != updRes.newValue()) { |
| res.addNearValue(i, |
| updRes.newValue(), |
| updRes.newTtl(), |
| updRes.conflictExpireTime()); |
| } |
| else |
| res.addNearTtl(i, updRes.newTtl(), updRes.conflictExpireTime()); |
| |
| if (updRes.newValue() != null) { |
| IgniteInternalFuture<Boolean> f = |
| entry.addReader(nearNode.id(), req.messageId(), topVer); |
| |
| assert f == null : f; |
| } |
| } |
| else if (GridDhtCacheEntry.ReaderId.contains(readers, nearNode.id())) { |
| // Reader became primary or backup. |
| entry.removeReader(nearNode.id(), req.messageId()); |
| } |
| else |
| res.addSkippedIndex(i); |
| } |
| else |
| res.addSkippedIndex(i); |
| } |
| |
| if (updRes.removeVersion() != null) { |
| if (deleted == null) |
| deleted = new ArrayList<>(req.size()); |
| |
| deleted.add(F.t(entry, updRes.removeVersion())); |
| } |
| |
| if (op == TRANSFORM) { |
| assert !req.returnValue(); |
| |
| IgniteBiTuple<Object, Exception> compRes = updRes.computedResult(); |
| |
| if (compRes != null && (compRes.get1() != null || compRes.get2() != null)) { |
| if (retVal == null) |
| retVal = new GridCacheReturn(nearNode.isLocal()); |
| |
| retVal.addEntryProcessResult(ctx, |
| k, |
| null, |
| compRes.get1(), |
| compRes.get2(), |
| req.keepBinary()); |
| } |
| } |
| else { |
| // Create only once. |
| if (retVal == null) { |
| CacheObject ret = updRes.oldValue(); |
| |
| retVal = new GridCacheReturn(ctx, |
| nearNode.isLocal(), |
| req.keepBinary(), |
| U.deploymentClassLoader(ctx.kernalContext(), U.contextDeploymentClassLoaderId(ctx.kernalContext())), |
| req.returnValue() ? ret : null, |
| updRes.success()); |
| } |
| } |
| } |
| catch (IgniteCheckedException e) { |
| res.addFailedKey(k, e); |
| } |
| |
| dhtUpdRes.processedEntriesCount(i + 1); |
| } |
| |
| dhtUpdRes.returnValue(retVal); |
| dhtUpdRes.deleted(deleted); |
| dhtUpdRes.dhtFuture(dhtFut); |
| } |
| |
| /** |
| * @param hasNear {@code True} if originating node has near cache. |
| * @param firstEntryIdx Index of the first entry in the request keys collection. |
| * @param entries Entries to update. |
| * @param ver Version to set. |
| * @param nearNode Originating node. |
| * @param writeVals Write values. |
| * @param putMap Values to put. |
| * @param rmvKeys Keys to remove. |
| * @param entryProcessorMap Entry processors. |
| * @param req Request. |
| * @param res Response. |
| * @param replicate Whether replication is enabled. |
| * @param dhtUpdRes Batch update result. |
| * @param taskName Task name. |
| * @param expiry Expiry policy. |
| * @param sndPrevVal If {@code true} sends previous value to backups. |
| */ |
| @Nullable private void updatePartialBatch( |
| final boolean hasNear, |
| final int firstEntryIdx, |
| final List<GridDhtCacheEntry> entries, |
| final GridCacheVersion ver, |
| final ClusterNode nearNode, |
| @Nullable final List<CacheObject> writeVals, |
| @Nullable final Map<KeyCacheObject, CacheObject> putMap, |
| @Nullable final Collection<KeyCacheObject> rmvKeys, |
| @Nullable final Map<KeyCacheObject, EntryProcessor<Object, Object, Object>> entryProcessorMap, |
| final GridNearAtomicAbstractUpdateRequest req, |
| final GridNearAtomicUpdateResponse res, |
| final boolean replicate, |
| final DhtAtomicUpdateResult dhtUpdRes, |
| final String taskName, |
| @Nullable final IgniteCacheExpiryPolicy expiry, |
| final boolean sndPrevVal |
| ) { |
| assert putMap == null ^ rmvKeys == null; |
| |
| assert req.conflictVersions() == null : "Cannot be called when there are conflict entries in the batch."; |
| |
| AffinityTopologyVersion topVer = req.topologyVersion(); |
| |
| CacheStorePartialUpdateException storeErr = null; |
| |
| try { |
| GridCacheOperation op; |
| |
| if (putMap != null) { |
| try { |
| Map<? extends KeyCacheObject, IgniteBiTuple<? extends CacheObject, GridCacheVersion>> view = F.viewReadOnly(putMap, |
| new C1<CacheObject, IgniteBiTuple<? extends CacheObject, GridCacheVersion>>() { |
| @Override public IgniteBiTuple<? extends CacheObject, GridCacheVersion> apply(CacheObject val) { |
| return F.t(val, ver); |
| } |
| }); |
| |
| ctx.store().putAll(null, view); |
| } |
| catch (CacheStorePartialUpdateException e) { |
| storeErr = e; |
| } |
| |
| op = UPDATE; |
| } |
| else { |
| try { |
| ctx.store().removeAll(null, rmvKeys); |
| } |
| catch (CacheStorePartialUpdateException e) { |
| storeErr = e; |
| } |
| |
| op = DELETE; |
| } |
| |
| boolean intercept = ctx.config().getInterceptor() != null; |
| |
| AffinityAssignment affAssignment = ctx.affinity().assignment(topVer); |
| |
| final GridDhtAtomicAbstractUpdateFuture dhtFut = dhtUpdRes.dhtFuture(); |
| |
| Collection<Object> failedToUnwrapKeys = null; |
| |
| // Avoid iterator creation. |
| for (int i = 0; i < entries.size(); i++) { |
| GridDhtCacheEntry entry = entries.get(i); |
| |
| assert entry.lockedByCurrentThread(); |
| |
| if (entry.obsolete()) { |
| assert req.operation() == DELETE : "Entry can become obsolete only after remove: " + entry; |
| |
| continue; |
| } |
| |
| if (storeErr != null) { |
| Object key = entry.key(); |
| |
| try { |
| key = entry.key().value(ctx.cacheObjectContext(), false); |
| } |
| catch (BinaryInvalidTypeException e) { |
| if (log.isDebugEnabled()) { |
| if (failedToUnwrapKeys == null) |
| failedToUnwrapKeys = new ArrayList<>(); |
| |
| // To limit keys count in log message. |
| if (failedToUnwrapKeys.size() < 5) |
| failedToUnwrapKeys.add(key); |
| } |
| } |
| |
| if (storeErr.failedKeys().contains(key)) |
| continue; |
| } |
| |
| try { |
| // We are holding java-level locks on entries at this point. |
| CacheObject writeVal = op == UPDATE ? writeVals.get(i) : null; |
| |
| assert writeVal != null || op == DELETE : "null write value found."; |
| |
| // Get readers before innerUpdate (reader cleared after remove). |
| GridDhtCacheEntry.ReaderId[] readers = entry.readersLocked(); |
| |
| EntryProcessor<Object, Object, Object> entryProc = |
| entryProcessorMap == null ? null : entryProcessorMap.get(entry.key()); |
| |
| boolean readRepairRecovery = op == TRANSFORM && req.entryProcessor(i) instanceof AtomicReadRepairEntryProcessor; |
| |
| GridCacheUpdateAtomicResult updRes = entry.innerUpdate( |
| ver, |
| nearNode.id(), |
| locNodeId, |
| op, |
| writeVal, |
| null, |
| /*write-through*/false, |
| /*read-through*/false, |
| /*retval*/sndPrevVal, |
| req.keepBinary(), |
| expiry, |
| /*event*/true, |
| /*metrics*/true, |
| /*primary*/true, |
| /*verCheck*/false, |
| readRepairRecovery, |
| topVer, |
| null, |
| replicate ? DR_PRIMARY : DR_NONE, |
| CU.TTL_NOT_CHANGED, |
| CU.EXPIRE_TIME_CALCULATE, |
| null, |
| /*conflict resolve*/false, |
| /*intercept*/false, |
| taskName, |
| null, |
| null, |
| dhtFut, |
| entryProc != null); |
| |
| assert !updRes.success() || updRes.newTtl() == CU.TTL_NOT_CHANGED || expiry != null : |
| "success=" + updRes.success() + ", newTtl=" + updRes.newTtl() + ", expiry=" + expiry; |
| |
| if (intercept) { |
| if (op == UPDATE) { |
| ctx.config().getInterceptor().onAfterPut(new CacheLazyEntry( |
| ctx, |
| entry.key(), |
| updRes.newValue(), |
| req.keepBinary())); |
| } |
| else { |
| assert op == DELETE : op; |
| |
| // Old value should be already loaded for 'CacheInterceptor.onBeforeRemove'. |
| ctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(ctx, entry.key(), |
| updRes.oldValue(), req.keepBinary())); |
| } |
| } |
| |
| dhtUpdRes.addDeleted(entry, updRes, entries); |
| |
| if (dhtFut != null) { |
| dhtFut.addWriteEntry( |
| affAssignment, |
| entry, |
| writeVal, |
| entryProc, |
| updRes.newTtl(), |
| CU.EXPIRE_TIME_CALCULATE, |
| null, |
| sndPrevVal, |
| updRes.oldValue(), |
| updRes.updateCounter(), |
| op, |
| readRepairRecovery); |
| |
| if (readers != null) |
| dhtFut.addNearWriteEntries( |
| nearNode, |
| readers, |
| entry, |
| writeVal, |
| entryProc, |
| updRes.newTtl(), |
| CU.EXPIRE_TIME_CALCULATE, |
| readRepairRecovery); |
| } |
| |
| if (hasNear) { |
| if (!ctx.affinity().partitionBelongs(nearNode, entry.partition(), topVer)) { |
| int idx = firstEntryIdx + i; |
| |
| if (req.operation() == TRANSFORM) { |
| res.addNearValue(idx, |
| writeVal, |
| updRes.newTtl(), |
| CU.EXPIRE_TIME_CALCULATE); |
| } |
| else |
| res.addNearTtl(idx, updRes.newTtl(), CU.EXPIRE_TIME_CALCULATE); |
| |
| if (writeVal != null || entry.hasValue()) { |
| IgniteInternalFuture<Boolean> f = entry.addReader(nearNode.id(), req.messageId(), topVer); |
| |
| assert f == null : f; |
| } |
| } |
| else if (GridDhtCacheEntry.ReaderId.contains(readers, nearNode.id())) { |
| // Reader became primary or backup. |
| entry.removeReader(nearNode.id(), req.messageId()); |
| } |
| else |
| res.addSkippedIndex(firstEntryIdx + i); |
| } |
| } |
| catch (GridCacheEntryRemovedException e) { |
| assert false : "Entry cannot become obsolete while holding lock."; |
| |
| e.printStackTrace(); |
| } |
| |
| dhtUpdRes.processedEntriesCount(firstEntryIdx + i + 1); |
| } |
| |
| if (failedToUnwrapKeys != null) { |
| log.warning("Failed to get values of keys: " + failedToUnwrapKeys + |
| " (the binary objects will be used instead)."); |
| } |
| } |
| catch (IgniteCheckedException e) { |
| res.addFailedKeys(putMap != null ? putMap.keySet() : rmvKeys, e); |
| } |
| |
| if (storeErr != null) { |
| ArrayList<KeyCacheObject> failed = new ArrayList<>(storeErr.failedKeys().size()); |
| |
| for (Object failedKey : storeErr.failedKeys()) |
| failed.add(ctx.toCacheKeyObject(failedKey)); |
| |
| res.addFailedKeys(failed, storeErr.getCause()); |
| } |
| } |
| |
| /** |
| * Acquires java-level locks on cache entries. Returns collection of locked entries. |
| * |
| * @param req Request with keys to lock. |
| * @param topVer Topology version to lock on. |
| * @return Collection of locked entries. |
| * @throws GridDhtInvalidPartitionException If entry does not belong to local node. If exception is thrown, locks |
| * are released. |
| */ |
| private List<GridDhtCacheEntry> lockEntries(GridNearAtomicAbstractUpdateRequest req, AffinityTopologyVersion topVer) |
| throws GridDhtInvalidPartitionException { |
| if (req.size() == 1) { |
| KeyCacheObject key = req.key(0); |
| |
| while (true) { |
| GridDhtCacheEntry entry = entryExx(key, topVer); |
| |
| entry.lockEntry(); |
| |
| if (entry.obsolete()) |
| entry.unlockEntry(); |
| else |
| return Collections.singletonList(entry); |
| } |
| } |
| else { |
| GridDhtCacheEntry[] locked = new GridDhtCacheEntry[req.size()]; |
| |
| while (true) { |
| for (int i = 0; i < req.size(); i++) { |
| GridDhtCacheEntry entry = entryExx(req.key(i), topVer); |
| |
| locked[i] = entry; |
| } |
| |
| if (lockedEntriesInfo.tryLockEntries(locked)) |
| return Arrays.asList(locked); |
| } |
| } |
| } |
| |
| /** |
| * Releases java-level locks on cache entries. |
| * |
| * @param locked Locked entries. |
| * @param topVer Topology version. |
| */ |
| private void unlockEntries(List<GridDhtCacheEntry> locked, AffinityTopologyVersion topVer) { |
| // Process deleted entries before locks release. |
| assert ctx.deferredDelete() : this; |
| |
| // Entries to skip eviction manager notification for. |
| // Enqueue entries while holding locks. |
| Collection<KeyCacheObject> skip = null; |
| |
| int size = locked.size(); |
| |
| try { |
| for (int i = 0; i < size; i++) { |
| GridCacheMapEntry entry = locked.get(i); |
| if (entry != null && entry.deleted()) { |
| if (skip == null) |
| skip = U.newHashSet(locked.size()); |
| |
| skip.add(entry.key()); |
| } |
| } |
| } |
| finally { |
| // At least RuntimeException can be thrown by the code above when GridCacheContext is cleaned and there is |
| // an attempt to use cleaned resources. |
| // That's why releasing locks in the finally block.. |
| for (int i = 0; i < size; i++) { |
| GridCacheMapEntry entry = locked.get(i); |
| if (entry != null) |
| entry.unlockEntry(); |
| } |
| } |
| |
| // Try evict partitions. |
| for (int i = 0; i < size; i++) { |
| GridDhtCacheEntry entry = locked.get(i); |
| if (entry != null) |
| entry.onUnlock(); |
| } |
| |
| if (skip != null && skip.size() == size) |
| // Optimization. |
| return; |
| |
| // Must touch all entries since update may have deleted entries. |
| // Eviction manager will remove empty entries. |
| for (int i = 0; i < size; i++) { |
| GridCacheMapEntry entry = locked.get(i); |
| if (entry != null && (skip == null || !skip.contains(entry.key()))) |
| entry.touch(); |
| } |
| } |
| |
| /** |
| * @param entry Entry to check. |
| * @param req Update request. |
| * @param res Update response. If filter evaluation failed, key will be added to failed keys and method will return |
| * false. |
| * @return {@code True} if filter evaluation succeeded. |
| */ |
| private boolean checkFilter(GridCacheEntryEx entry, GridNearAtomicAbstractUpdateRequest req, |
| GridNearAtomicUpdateResponse res) { |
| try { |
| return ctx.isAllLocked(entry, req.filter()); |
| } |
| catch (IgniteCheckedException e) { |
| res.addFailedKey(entry.key(), e); |
| |
| return false; |
| } |
| } |
| |
| /** |
| * @param req Request to remap. |
| */ |
| void remapToNewPrimary(GridNearAtomicAbstractUpdateRequest req) { |
| assert req.writeSynchronizationMode() == FULL_ASYNC : req; |
| |
| if (log.isDebugEnabled()) |
| log.debug("Remapping near update request locally: " + req); |
| |
| Collection<?> vals; |
| Collection<GridCacheDrInfo> drPutVals; |
| Collection<GridCacheVersion> drRmvVals; |
| |
| if (req.conflictVersions() == null) { |
| vals = req.values(); |
| |
| drPutVals = null; |
| drRmvVals = null; |
| } |
| else if (req.operation() == UPDATE) { |
| int size = req.keys().size(); |
| |
| drPutVals = new ArrayList<>(size); |
| |
| for (int i = 0; i < size; i++) { |
| long ttl = req.conflictTtl(i); |
| |
| if (ttl == CU.TTL_NOT_CHANGED) |
| drPutVals.add(new GridCacheDrInfo(req.value(i), req.conflictVersion(i))); |
| else |
| drPutVals.add(new GridCacheDrExpirationInfo(req.value(i), req.conflictVersion(i), ttl, |
| req.conflictExpireTime(i))); |
| } |
| |
| vals = null; |
| drRmvVals = null; |
| } |
| else { |
| assert req.operation() == DELETE : req; |
| |
| drRmvVals = req.conflictVersions(); |
| |
| vals = null; |
| drPutVals = null; |
| } |
| |
| GridNearAtomicUpdateFuture updateFut = new GridNearAtomicUpdateFuture( |
| ctx, |
| this, |
| ctx.config().getWriteSynchronizationMode(), |
| req.operation(), |
| req.keys(), |
| vals, |
| req.invokeArguments(), |
| drPutVals, |
| drRmvVals, |
| req.returnValue(), |
| req.expiry(), |
| req.filter(), |
| req.taskNameHash(), |
| req.skipStore(), |
| req.keepBinary(), |
| req.recovery(), |
| MAX_RETRIES); |
| |
| updateFut.map(); |
| } |
| |
| /** |
| * Creates backup update future if necessary. |
| * |
| * @param writeVer Write version. |
| * @param updateReq Update request. |
| * @return Backup update future. |
| */ |
| private GridDhtAtomicAbstractUpdateFuture createDhtFuture( |
| GridCacheVersion writeVer, |
| GridNearAtomicAbstractUpdateRequest updateReq |
| ) { |
| if (updateReq.size() == 1) |
| return new GridDhtAtomicSingleUpdateFuture(ctx, writeVer, updateReq); |
| else |
| return new GridDhtAtomicUpdateFuture(ctx, writeVer, updateReq); |
| } |
| |
| /** |
| * @param nodeId Sender node ID. |
| * @param req Near atomic update request. |
| */ |
| private void processNearAtomicUpdateRequest(UUID nodeId, GridNearAtomicAbstractUpdateRequest req) { |
| if (msgLog.isDebugEnabled()) { |
| msgLog.debug("Received near atomic update request [futId=" + req.futureId() + |
| ", node=" + nodeId + ']'); |
| } |
| |
| ClusterNode node = ctx.discovery().node(nodeId); |
| |
| if (node == null) { |
| U.warn(msgLog, "Skip near update request, node originated update request left [" + |
| "futId=" + req.futureId() + ", node=" + nodeId + ']'); |
| |
| return; |
| } |
| |
| updateAllAsyncInternal(node, req, updateReplyClos); |
| } |
| |
| /** |
| * @param nodeId Sender node ID. |
| * @param res Near atomic update response. |
| */ |
| private void processNearAtomicUpdateResponse(UUID nodeId, GridNearAtomicUpdateResponse res) { |
| if (msgLog.isDebugEnabled()) |
| msgLog.debug("Received near atomic update response [futId" + res.futureId() + |
| ", node=" + nodeId + ']'); |
| |
| res.nodeId(ctx.localNodeId()); |
| |
| GridNearAtomicAbstractUpdateFuture fut = |
| (GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureId()); |
| |
| if (fut != null) |
| fut.onPrimaryResponse(nodeId, res, false); |
| else |
| U.warn(msgLog, "Failed to find near update future for update response (will ignore) " + |
| "[futId=" + res.futureId() + ", node=" + nodeId + ", res=" + res + ']'); |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param checkReq Request. |
| */ |
| private void processCheckUpdateRequest(UUID nodeId, GridNearAtomicCheckUpdateRequest checkReq) { |
| /* |
| * Message is processed in the same stripe, so primary already processed update request. It is possible |
| * response was not sent if operation result was empty. Near node will get original response or this one. |
| */ |
| GridNearAtomicUpdateResponse res = new GridNearAtomicUpdateResponse(ctx.cacheId(), |
| nodeId, |
| checkReq.futureId(), |
| checkReq.partition(), |
| false, |
| false); |
| |
| GridCacheReturn ret = new GridCacheReturn(false, true); |
| |
| res.returnValue(ret); |
| |
| sendNearUpdateReply(nodeId, res); |
| } |
| |
| /** |
| * @param nodeId Sender node ID. |
| * @param req Dht atomic update request. |
| */ |
| private void processDhtAtomicUpdateRequest(UUID nodeId, GridDhtAtomicAbstractUpdateRequest req) { |
| assert Thread.currentThread().getName().startsWith("sys-stripe-") : Thread.currentThread().getName(); |
| |
| if (msgLog.isDebugEnabled()) { |
| msgLog.debug("Received DHT atomic update request [futId=" + req.futureId() + |
| ", writeVer=" + req.writeVersion() + ", node=" + nodeId + ']'); |
| } |
| |
| assert req.partition() >= 0 : req; |
| |
| GridCacheVersion ver = req.writeVersion(); |
| |
| ctx.versions().onReceived(nodeId, ver); |
| |
| GridDhtAtomicNearResponse nearRes = null; |
| |
| if (req.nearNodeId() != null) { |
| nearRes = new GridDhtAtomicNearResponse(ctx.cacheId(), |
| req.partition(), |
| req.nearFutureId(), |
| nodeId, |
| req.flags()); |
| } |
| |
| boolean replicate = ctx.isDrEnabled(); |
| |
| boolean intercept = req.forceTransformBackups() && ctx.config().getInterceptor() != null; |
| |
| boolean needTaskName = ctx.events().isRecordable(EVT_CACHE_OBJECT_READ) || |
| ctx.events().isRecordable(EVT_CACHE_OBJECT_PUT) || |
| ctx.events().isRecordable(EVT_CACHE_OBJECT_REMOVED); |
| |
| String taskName = needTaskName ? ctx.kernalContext().task().resolveTaskName(req.taskNameHash()) : null; |
| |
| ctx.shared().database().checkpointReadLock(); |
| |
| try { |
| for (int i = 0; i < req.size(); i++) { |
| KeyCacheObject key = req.key(i); |
| |
| try { |
| while (true) { |
| GridDhtCacheEntry entry = null; |
| |
| try { |
| entry = entryExx(key); |
| |
| CacheObject val = req.value(i); |
| CacheObject prevVal = req.previousValue(i); |
| |
| EntryProcessor<Object, Object, Object> entryProc = req.entryProcessor(i); |
| Long updateIdx = req.updateCounter(i); |
| |
| GridCacheOperation op = entryProc != null ? TRANSFORM : |
| (val != null) ? UPDATE : DELETE; |
| |
| long ttl = req.ttl(i); |
| long expireTime = req.conflictExpireTime(i); |
| |
| GridCacheUpdateAtomicResult updRes = entry.innerUpdate( |
| ver, |
| nodeId, |
| nodeId, |
| op, |
| op == TRANSFORM ? entryProc : val, |
| op == TRANSFORM ? req.invokeArguments() : null, |
| /*write-through*/(ctx.store().isLocal() && !ctx.shared().localStorePrimaryOnly()) |
| && writeThrough() && !req.skipStore(), |
| /*read-through*/false, |
| /*retval*/false, |
| req.keepBinary(), |
| /*expiry policy*/null, |
| /*event*/true, |
| /*metrics*/true, |
| /*primary*/false, |
| /*check version*/!req.forceTransformBackups(), |
| req.readRepairRecovery(), |
| req.topologyVersion(), |
| CU.empty0(), |
| replicate ? DR_BACKUP : DR_NONE, |
| ttl, |
| expireTime, |
| req.conflictVersion(i), |
| false, |
| intercept, |
| taskName, |
| prevVal, |
| updateIdx, |
| null, |
| req.transformOperation()); |
| |
| if (updRes.removeVersion() != null) |
| ctx.onDeferredDelete(entry, updRes.removeVersion()); |
| |
| entry.onUnlock(); |
| |
| break; // While. |
| } |
| catch (GridCacheEntryRemovedException ignored) { |
| if (log.isDebugEnabled()) |
| log.debug("Got removed entry while updating backup value (will retry): " + key); |
| |
| entry = null; |
| } |
| finally { |
| if (entry != null) |
| entry.touch(); |
| } |
| } |
| } |
| catch (NodeStoppingException e) { |
| U.warn(log, "Failed to update key on backup (local node is stopping): " + key); |
| |
| return; |
| } |
| catch (GridDhtInvalidPartitionException ignored) { |
| // Ignore. |
| } |
| catch (IgniteCheckedException | RuntimeException e) { |
| if (e instanceof RuntimeException && !X.hasCause(e, IgniteOutOfMemoryException.class)) |
| throw (RuntimeException)e; |
| |
| U.error(log, "Failed to update key on backup node: " + key, e); |
| |
| IgniteCheckedException err = |
| new IgniteCheckedException("Failed to update key on backup node: " + key, e); |
| |
| // Trigger failure handler to avoid data inconsistency. |
| ctx.kernalContext().failure().process(new FailureContext(FailureType.CRITICAL_ERROR, err)); |
| |
| if (nearRes != null) |
| nearRes.addFailedKey(key, err); |
| } |
| } |
| } |
| finally { |
| ctx.shared().database().checkpointReadUnlock(); |
| } |
| |
| GridDhtAtomicUpdateResponse dhtRes = null; |
| |
| if (req.nearSize() > 0 || req.obsoleteNearKeysSize() > 0) { |
| List<KeyCacheObject> nearEvicted = null; |
| |
| if (isNearEnabled(ctx)) |
| nearEvicted = ((GridNearAtomicCache<K, V>)near()).processDhtAtomicUpdateRequest(nodeId, req, nearRes); |
| else if (req.nearSize() > 0) { |
| nearEvicted = new ArrayList<>(req.nearSize()); |
| |
| for (int i = 0; i < req.nearSize(); i++) |
| nearEvicted.add(req.nearKey(i)); |
| } |
| |
| if (nearEvicted != null) { |
| dhtRes = new GridDhtAtomicUpdateResponse(ctx.cacheId(), |
| req.partition(), |
| req.futureId(), |
| ctx.deploymentEnabled()); |
| |
| dhtRes.nearEvicted(nearEvicted); |
| } |
| } |
| |
| try { |
| // TODO handle failure: probably drop the node from topology |
| // TODO fire events only after successful fsync |
| if (ctx.shared().wal() != null) |
| ctx.shared().wal().flush(null, false); |
| } |
| catch (StorageException e) { |
| if (dhtRes != null) |
| dhtRes.onError(new IgniteCheckedException(e)); |
| |
| if (nearRes != null) |
| nearRes.onClassError(e); |
| } |
| catch (IgniteCheckedException e) { |
| if (dhtRes != null) |
| dhtRes.onError(e); |
| |
| if (nearRes != null) |
| nearRes.onClassError(e); |
| } |
| |
| if (nearRes != null) |
| sendDhtNearResponse(req, nearRes); |
| |
| if (dhtRes == null && req.replyWithoutDelay()) { |
| dhtRes = new GridDhtAtomicUpdateResponse(ctx.cacheId(), |
| req.partition(), |
| req.futureId(), |
| ctx.deploymentEnabled()); |
| } |
| |
| if (dhtRes != null) |
| sendDhtPrimaryResponse(nodeId, req, dhtRes); |
| else |
| sendDeferredUpdateResponse(req.partition(), nodeId, req.futureId()); |
| } |
| |
| /** |
| * @param nodeId Primary node ID. |
| * @param req Request. |
| * @param dhtRes Response to send. |
| */ |
| private void sendDhtPrimaryResponse(UUID nodeId, |
| GridDhtAtomicAbstractUpdateRequest req, |
| GridDhtAtomicUpdateResponse dhtRes) { |
| try { |
| ctx.io().send(nodeId, dhtRes, ctx.ioPolicy()); |
| |
| if (msgLog.isDebugEnabled()) { |
| msgLog.debug("Sent DHT response [futId=" + req.futureId() + |
| ", nearFutId=" + req.nearFutureId() + |
| ", writeVer=" + req.writeVersion() + |
| ", node=" + nodeId + ']'); |
| } |
| } |
| catch (ClusterTopologyCheckedException ignored) { |
| U.warn(msgLog, "Failed to send DHT response, node left [futId=" + req.futureId() + |
| ", nearFutId=" + req.nearFutureId() + |
| ", node=" + nodeId + ']'); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(msgLog, "Failed to send DHT near response [futId=" + req.futureId() + |
| ", nearFutId=" + req.nearFutureId() + |
| ", node=" + nodeId + |
| ", res=" + dhtRes + ']', e); |
| } |
| } |
| |
| /** |
| * @param part Partition. |
| * @param primaryId Primary ID. |
| * @param futId Future ID. |
| */ |
| private void sendDeferredUpdateResponse(int part, UUID primaryId, long futId) { |
| Map<UUID, GridDhtAtomicDeferredUpdateResponse> resMap = defRes.get(); |
| |
| GridDhtAtomicDeferredUpdateResponse msg = resMap.get(primaryId); |
| |
| if (msg == null) { |
| msg = new GridDhtAtomicDeferredUpdateResponse(ctx.cacheId(), |
| new GridLongList(DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE)); |
| |
| if (DEFERRED_UPDATE_RESPONSE_TIMEOUT > 0) { |
| GridTimeoutObject timeoutSnd = new DeferredUpdateTimeout(part, primaryId); |
| |
| msg.timeoutSender(timeoutSnd); |
| |
| ctx.time().addTimeoutObject(timeoutSnd); |
| } |
| |
| resMap.put(primaryId, msg); |
| } |
| |
| GridLongList futIds = msg.futureIds(); |
| |
| assert futIds.size() < DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE : futIds.size(); |
| |
| futIds.add(futId); |
| |
| if (futIds.size() >= DEFERRED_UPDATE_RESPONSE_BUFFER_SIZE) { |
| resMap.remove(primaryId); |
| |
| sendDeferredUpdateResponse(primaryId, msg); |
| } |
| } |
| |
| /** |
| * @param primaryId Primary ID. |
| * @param msg Message. |
| */ |
| private void sendDeferredUpdateResponse(UUID primaryId, GridDhtAtomicDeferredUpdateResponse msg) { |
| try { |
| GridTimeoutObject timeoutSnd = msg.timeoutSender(); |
| |
| if (timeoutSnd != null) |
| ctx.time().removeTimeoutObject(timeoutSnd); |
| |
| ctx.io().send(primaryId, msg, ctx.ioPolicy()); |
| |
| if (msgLog.isDebugEnabled()) { |
| msgLog.debug("Sent deferred DHT update response [futIds=" + msg.futureIds() + |
| ", node=" + primaryId + ']'); |
| } |
| } |
| catch (ClusterTopologyCheckedException ignored) { |
| if (msgLog.isDebugEnabled()) { |
| msgLog.debug("Failed to send deferred DHT update response, node left [" + |
| "futIds=" + msg.futureIds() + ", node=" + primaryId + ']'); |
| } |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to send deferredDHT update response to remote node [" + |
| "futIds=" + msg.futureIds() + ", node=" + primaryId + ']', e); |
| } |
| } |
| |
| /** |
| * @param req Request. |
| * @param nearRes Response to send. |
| */ |
| private void sendDhtNearResponse(final GridDhtAtomicAbstractUpdateRequest req, GridDhtAtomicNearResponse nearRes) { |
| try { |
| ClusterNode node = ctx.discovery().node(req.nearNodeId()); |
| |
| if (node == null) |
| throw new ClusterTopologyCheckedException("Node failed: " + req.nearNodeId()); |
| |
| if (node.isLocal()) |
| processDhtAtomicNearResponse(node.id(), nearRes); |
| else |
| ctx.io().send(node, nearRes, ctx.ioPolicy()); |
| |
| if (msgLog.isDebugEnabled()) { |
| msgLog.debug("Sent DHT near response [futId=" + req.futureId() + |
| ", nearFutId=" + req.nearFutureId() + |
| ", writeVer=" + req.writeVersion() + |
| ", node=" + req.nearNodeId() + ']'); |
| } |
| } |
| catch (ClusterTopologyCheckedException ignored) { |
| if (msgLog.isDebugEnabled()) { |
| msgLog.debug("Failed to send DHT near response, node left [futId=" + req.futureId() + |
| ", nearFutId=" + req.nearFutureId() + |
| ", node=" + req.nearNodeId() + ']'); |
| } |
| } |
| catch (IgniteCheckedException e) { |
| U.error(msgLog, "Failed to send DHT near response [futId=" + req.futureId() + |
| ", nearFutId=" + req.nearFutureId() + |
| ", node=" + req.nearNodeId() + |
| ", res=" + nearRes + ']', e); |
| } |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param res Response. |
| */ |
| private void processDhtAtomicNearResponse(UUID nodeId, GridDhtAtomicNearResponse res) { |
| GridNearAtomicAbstractUpdateFuture updateFut = |
| (GridNearAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureId()); |
| |
| if (updateFut != null) { |
| if (msgLog.isDebugEnabled()) { |
| msgLog.debug("Received DHT atomic near response [futId=" + res.futureId() + |
| ", node=" + nodeId + ']'); |
| } |
| |
| updateFut.onDhtResponse(nodeId, res); |
| } |
| else { |
| if (msgLog.isDebugEnabled()) { |
| msgLog.debug("Failed to find future for DHT atomic near response [futId=" + res.futureId() + |
| ", node=" + nodeId + |
| ", res=" + res + ']'); |
| } |
| } |
| } |
| |
| /** |
| * @param nodeId Sender node ID. |
| * @param res Dht atomic update response. |
| */ |
| private void processDhtAtomicUpdateResponse(UUID nodeId, GridDhtAtomicUpdateResponse res) { |
| GridDhtAtomicAbstractUpdateFuture updateFut = |
| (GridDhtAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(res.futureId()); |
| |
| if (updateFut != null) { |
| if (msgLog.isDebugEnabled()) { |
| msgLog.debug("Received DHT atomic update response [futId=" + res.futureId() + |
| ", writeVer=" + updateFut.writeVersion() + ", node=" + nodeId + ']'); |
| } |
| |
| updateFut.onDhtResponse(nodeId, res); |
| } |
| else { |
| U.warn(msgLog, "Failed to find DHT update future for update response [futId=" + res.futureId() + |
| ", node=" + nodeId + ", res=" + res + ']'); |
| } |
| } |
| |
| /** |
| * @param nodeId Sender node ID. |
| * @param res Deferred atomic update response. |
| */ |
| private void processDhtAtomicDeferredUpdateResponse(UUID nodeId, GridDhtAtomicDeferredUpdateResponse res) { |
| GridLongList futIds = res.futureIds(); |
| |
| assert futIds != null && !futIds.isEmpty() : futIds; |
| |
| for (int i = 0; i < futIds.size(); i++) { |
| long id = futIds.get(i); |
| |
| GridDhtAtomicAbstractUpdateFuture updateFut = (GridDhtAtomicAbstractUpdateFuture)ctx.mvcc().atomicFuture(id); |
| |
| if (updateFut != null) { |
| if (msgLog.isDebugEnabled()) { |
| msgLog.debug("Received DHT atomic deferred update response [futId=" + id + |
| ", writeVer=" + res + ", node=" + nodeId + ']'); |
| } |
| |
| updateFut.onDeferredResponse(nodeId); |
| } |
| else { |
| U.warn(msgLog, "Failed to find DHT update future for deferred update response [futId=" + id + |
| ", nodeId=" + nodeId + ", res=" + res + ']'); |
| } |
| } |
| } |
| |
| /** |
| * @param nodeId Originating node ID. |
| * @param res Near update response. |
| */ |
| private void sendNearUpdateReply(UUID nodeId, GridNearAtomicUpdateResponse res) { |
| try { |
| ctx.io().send(nodeId, res, ctx.ioPolicy()); |
| |
| if (msgLog.isDebugEnabled()) |
| msgLog.debug("Sent near update response [futId=" + res.futureId() + ", node=" + nodeId + ']'); |
| } |
| catch (ClusterTopologyCheckedException ignored) { |
| if (msgLog.isDebugEnabled()) { |
| msgLog.debug("Failed to send near update response [futId=" + res.futureId() + |
| ", node=" + nodeId + ']'); |
| } |
| } |
| catch (IgniteCheckedException e) { |
| U.error(msgLog, "Failed to send near update response [futId=" + res.futureId() + |
| ", node=" + nodeId + ", res=" + res + ']', e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(GridDhtAtomicCache.class, this, super.toString()); |
| } |
| |
| /** |
| * |
| */ |
| private static class FinishedLockFuture extends GridFinishedFuture<Boolean> implements GridDhtFuture<Boolean> { |
| /** |
| * @param err Error. |
| */ |
| private FinishedLockFuture(Throwable err) { |
| super(err); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Collection<Integer> invalidPartitions() { |
| return Collections.emptyList(); |
| } |
| } |
| |
| /** |
| * |
| */ |
| interface UpdateReplyClosure extends CI2<GridNearAtomicAbstractUpdateRequest, GridNearAtomicUpdateResponse> { |
| // No-op. |
| } |
| |
| /** |
| * |
| */ |
| private class DeferredUpdateTimeout implements GridTimeoutObject, Runnable { |
| /** */ |
| private final int part; |
| |
| /** */ |
| private final UUID primaryId; |
| |
| /** */ |
| private final IgniteUuid id; |
| |
| /** */ |
| private final long endTime; |
| |
| /** |
| * @param part Partition. |
| * @param primaryId Primary ID. |
| */ |
| DeferredUpdateTimeout(int part, UUID primaryId) { |
| this.part = part; |
| this.primaryId = primaryId; |
| |
| endTime = U.currentTimeMillis() + DEFERRED_UPDATE_RESPONSE_TIMEOUT; |
| |
| id = IgniteUuid.fromUuid(primaryId); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteUuid timeoutId() { |
| return id; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long endTime() { |
| return endTime; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void run() { |
| Map<UUID, GridDhtAtomicDeferredUpdateResponse> resMap = defRes.get(); |
| |
| GridDhtAtomicDeferredUpdateResponse msg = resMap.get(primaryId); |
| |
| if (msg != null && msg.timeoutSender() == this) { |
| msg.timeoutSender(null); |
| |
| resMap.remove(primaryId); |
| |
| sendDeferredUpdateResponse(primaryId, msg); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onTimeout() { |
| ctx.kernalContext().pools().getStripedExecutorService().execute(part, this); |
| } |
| } |
| } |