| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.distributed.dht.colocated; |
| |
| import java.io.Externalizable; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.LinkedList; |
| import java.util.Map; |
| import java.util.UUID; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.cache.ReadRepairStrategy; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; |
| import org.apache.ignite.internal.processors.cache.CacheObject; |
| import org.apache.ignite.internal.processors.cache.CacheOperationContext; |
| import org.apache.ignite.internal.processors.cache.EntryGetResult; |
| import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap; |
| import org.apache.ignite.internal.processors.cache.GridCacheContext; |
| import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; |
| import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; |
| import org.apache.ignite.internal.processors.cache.GridCacheLockTimeoutException; |
| import org.apache.ignite.internal.processors.cache.GridCacheMvccCandidate; |
| import org.apache.ignite.internal.processors.cache.GridCacheReturn; |
| import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; |
| import org.apache.ignite.internal.processors.cache.KeyCacheObject; |
| import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry; |
| import org.apache.ignite.internal.processors.cache.distributed.GridDistributedLockCancelledException; |
| import org.apache.ignite.internal.processors.cache.distributed.GridDistributedUnlockRequest; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheEntry; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtEmbeddedFuture; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtFinishedFuture; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtLockFuture; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTransactionalCacheAdapter; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedGetFuture; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridPartitionedSingleGetFuture; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearLockResponse; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTransactionalCache; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearUnlockRequest; |
| import org.apache.ignite.internal.processors.cache.distributed.near.consistency.GridNearReadRepairCheckOnlyFuture; |
| import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; |
| import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; |
| import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; |
| import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; |
| import org.apache.ignite.internal.util.future.GridEmbeddedFuture; |
| import org.apache.ignite.internal.util.future.GridFinishedFuture; |
| import org.apache.ignite.internal.util.lang.IgnitePair; |
| import org.apache.ignite.internal.util.typedef.C2; |
| import org.apache.ignite.internal.util.typedef.CI2; |
| import org.apache.ignite.internal.util.typedef.CX1; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.internal.CU; |
| import org.apache.ignite.internal.util.typedef.internal.S; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.plugin.security.SecurityPermission; |
| import org.apache.ignite.transactions.TransactionIsolation; |
| import org.jetbrains.annotations.Nullable; |
| |
| /** |
| * Colocated cache. |
| */ |
| public class GridDhtColocatedCache<K, V> extends GridDhtTransactionalCacheAdapter<K, V> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** |
| * Empty constructor required for {@link Externalizable} |
| */ |
| public GridDhtColocatedCache() { |
| // No-op. |
| } |
| |
| /** |
| * @param ctx Cache context. |
| */ |
| public GridDhtColocatedCache(GridCacheContext<K, V> ctx) { |
| super(ctx); |
| } |
| |
| /** |
| * Creates colocated cache with specified map. |
| * |
| * @param ctx Cache context. |
| * @param map Cache map. |
| */ |
| public GridDhtColocatedCache(GridCacheContext<K, V> ctx, GridCacheConcurrentMap map) { |
| super(ctx, map); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean isColocated() { |
| return true; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onKernalStart() throws IgniteCheckedException { |
| super.onKernalStart(); |
| |
| assert !ctx.isRecoveryMode() : "Registering message handlers in recovery mode [cacheName=" + name() + ']'; |
| |
| ctx.io().addCacheHandler( |
| ctx.cacheId(), |
| ctx.startTopologyVersion(), |
| GridNearGetResponse.class, |
| (CI2<UUID, GridNearGetResponse>)this::processNearGetResponse); |
| |
| ctx.io().addCacheHandler( |
| ctx.cacheId(), |
| ctx.startTopologyVersion(), |
| GridNearSingleGetResponse.class, |
| (CI2<UUID, GridNearSingleGetResponse>)this::processNearSingleGetResponse); |
| |
| ctx.io().addCacheHandler( |
| ctx.cacheId(), |
| ctx.startTopologyVersion(), |
| GridNearLockResponse.class, |
| (CI2<UUID, GridNearLockResponse>)this::processNearLockResponse); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void start() throws IgniteCheckedException { |
| } |
| |
| /** |
| * Gets or creates entry for given key and given topology version. |
| * |
| * @param key Key for entry. |
| * @param topVer Topology version. |
| * @param allowDetached Whether to allow detached entries. If {@code true} and node is not primary |
| * for given key, a new detached entry will be created. Otherwise, entry will be obtained from |
| * dht cache map. |
| * @return Cache entry. |
| * @throws GridDhtInvalidPartitionException If {@code allowDetached} is false and node is not primary |
| * for given key. |
| */ |
| public GridDistributedCacheEntry entryExx( |
| KeyCacheObject key, |
| AffinityTopologyVersion topVer, |
| boolean allowDetached |
| ) { |
| return allowDetached && !ctx.affinity().primaryByKey(ctx.localNode(), key, topVer) ? |
| createEntry(key) : entryExx(key, topVer); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean isLocked(K key) { |
| KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); |
| |
| return ctx.mvcc().isLockedByThread(ctx.txKey(cacheKey), -1); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean isLockedByThread(K key) { |
| KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); |
| |
| return ctx.mvcc().isLockedByThread(ctx.txKey(cacheKey), Thread.currentThread().getId()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected IgniteInternalFuture<V> getAsync(final K key, |
| boolean forcePrimary, |
| boolean skipTx, |
| String taskName, |
| final boolean deserializeBinary, |
| final boolean skipVals, |
| final boolean needVer) { |
| ctx.checkSecurity(SecurityPermission.CACHE_READ); |
| |
| GridNearTxLocal tx = checkCurrentTx(); |
| |
| final CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| final boolean recovery = opCtx != null && opCtx.recovery(); |
| final ReadRepairStrategy readRepairStrategy = opCtx != null ? opCtx.readRepairStrategy() : null; |
| |
| // Get operation bypass Tx in Mvcc mode. |
| if (tx != null && !tx.implicit() && !skipTx) { |
| return asyncOp(tx, new AsyncOp<V>() { |
| @Override public IgniteInternalFuture<V> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) { |
| IgniteInternalFuture<Map<Object, Object>> fut = tx.getAllAsync(ctx, |
| readyTopVer, |
| Collections.singleton(ctx.toCacheKeyObject(key)), |
| deserializeBinary, |
| skipVals, |
| false, |
| opCtx != null && opCtx.skipStore(), |
| recovery, |
| readRepairStrategy, |
| needVer); |
| |
| return fut.chain(new CX1<IgniteInternalFuture<Map<Object, Object>>, V>() { |
| @Override public V applyx(IgniteInternalFuture<Map<Object, Object>> e) |
| throws IgniteCheckedException { |
| Map<Object, Object> map = e.get(); |
| |
| assert map.isEmpty() || map.size() == 1 : map.size(); |
| |
| if (skipVals) { |
| Boolean val = map.isEmpty() ? false : (Boolean)F.firstValue(map); |
| |
| return (V)(val); |
| } |
| |
| return (V)F.firstValue(map); |
| } |
| }); |
| } |
| }, opCtx, /*retry*/false); |
| } |
| |
| AffinityTopologyVersion topVer; |
| |
| if (tx != null) |
| topVer = tx.topologyVersion(); |
| else |
| topVer = ctx.affinity().affinityTopologyVersion(); |
| |
| if (readRepairStrategy != null) { |
| return new GridNearReadRepairCheckOnlyFuture( |
| topVer, |
| ctx, |
| Collections.singleton(ctx.toCacheKeyObject(key)), |
| readRepairStrategy, |
| opCtx == null || !opCtx.skipStore(), |
| taskName, |
| deserializeBinary, |
| recovery, |
| skipVals ? null : expiryPolicy(opCtx != null ? opCtx.expiry() : null), |
| skipVals, |
| needVer, |
| false, |
| tx).single(); |
| } |
| |
| GridPartitionedSingleGetFuture fut = new GridPartitionedSingleGetFuture(ctx, |
| ctx.toCacheKeyObject(key), |
| topVer, |
| opCtx == null || !opCtx.skipStore(), |
| forcePrimary, |
| taskName, |
| deserializeBinary, |
| skipVals ? null : expiryPolicy(opCtx != null ? opCtx.expiry() : null), |
| skipVals, |
| needVer, |
| /*keepCacheObjects*/false, |
| opCtx != null && opCtx.recovery(), |
| null); |
| |
| fut.init(); |
| |
| return (IgniteInternalFuture<V>)fut; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<Map<K, V>> getAllAsync( |
| @Nullable final Collection<? extends K> keys, |
| boolean forcePrimary, |
| boolean skipTx, |
| String taskName, |
| final boolean deserializeBinary, |
| final boolean recovery, |
| final ReadRepairStrategy readRepairStrategy, |
| final boolean skipVals, |
| final boolean needVer |
| ) { |
| ctx.checkSecurity(SecurityPermission.CACHE_READ); |
| |
| if (F.isEmpty(keys)) |
| return new GridFinishedFuture<>(Collections.<K, V>emptyMap()); |
| |
| warnIfUnordered(keys, BulkOperation.GET); |
| |
| GridNearTxLocal tx = checkCurrentTx(); |
| |
| final CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| if (tx != null && !tx.implicit() && !skipTx) { |
| return asyncOp(tx, new AsyncOp<Map<K, V>>(keys) { |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<Map<K, V>> op(GridNearTxLocal tx, |
| AffinityTopologyVersion readyTopVer) { |
| return tx.getAllAsync(ctx, |
| readyTopVer, |
| ctx.cacheKeysView(keys), |
| deserializeBinary, |
| skipVals, |
| false, |
| opCtx != null && opCtx.skipStore(), |
| recovery, |
| readRepairStrategy, |
| needVer); |
| } |
| }, opCtx, /*retry*/false); |
| } |
| |
| AffinityTopologyVersion topVer; |
| |
| if (tx != null) |
| topVer = tx.topologyVersion(); |
| else |
| topVer = ctx.affinity().affinityTopologyVersion(); |
| |
| if (readRepairStrategy != null) { |
| return new GridNearReadRepairCheckOnlyFuture( |
| topVer, |
| ctx, |
| ctx.cacheKeysView(keys), |
| readRepairStrategy, |
| opCtx == null || !opCtx.skipStore(), |
| taskName, |
| deserializeBinary, |
| recovery, |
| skipVals ? null : expiryPolicy(opCtx != null ? opCtx.expiry() : null), |
| skipVals, |
| needVer, |
| false, |
| tx).multi(); |
| } |
| |
| IgniteInternalFuture<Map<K, V>> fut = loadAsync( |
| ctx.cacheKeysView(keys), |
| opCtx == null || !opCtx.skipStore(), |
| forcePrimary, |
| topVer, |
| taskName, |
| deserializeBinary, |
| recovery, |
| skipVals ? null : expiryPolicy(opCtx != null ? opCtx.expiry() : null), |
| skipVals, |
| needVer, |
| false, |
| null |
| ); |
| |
| return fut; |
| } |
| |
| /** |
| * @param key Key to load. |
| * @param readThrough Read through flag. |
| * @param forcePrimary Force get from primary node flag. |
| * @param topVer Topology version. |
| * @param taskName Task name. |
| * @param deserializeBinary Deserialize binary flag. |
| * @param expiryPlc Expiry policy. |
| * @param skipVals Skip values flag. |
| * @param needVer If {@code true} returns values as tuples containing value and version. |
| * @param keepCacheObj Keep cache objects flag. |
| * @param txLbl Transaction label. |
| * @return Load future. |
| */ |
| public final IgniteInternalFuture<Object> loadAsync( |
| KeyCacheObject key, |
| boolean readThrough, |
| boolean forcePrimary, |
| AffinityTopologyVersion topVer, |
| String taskName, |
| boolean deserializeBinary, |
| @Nullable IgniteCacheExpiryPolicy expiryPlc, |
| boolean skipVals, |
| boolean needVer, |
| boolean keepCacheObj, |
| boolean recovery, |
| @Nullable String txLbl |
| ) { |
| GridPartitionedSingleGetFuture fut = new GridPartitionedSingleGetFuture(ctx, |
| ctx.toCacheKeyObject(key), |
| topVer, |
| readThrough, |
| forcePrimary, |
| taskName, |
| deserializeBinary, |
| expiryPlc, |
| skipVals, |
| needVer, |
| keepCacheObj, |
| recovery, |
| txLbl); |
| |
| fut.init(); |
| |
| return fut; |
| } |
| |
| /** |
| * @param keys Keys to load. |
| * @param readThrough Read through flag. |
| * @param forcePrimary Force get from primary node flag. |
| * @param topVer Topology version. |
| * @param taskName Task name. |
| * @param deserializeBinary Deserialize binary flag. |
| * @param expiryPlc Expiry policy. |
| * @param skipVals Skip values flag. |
| * @param needVer If {@code true} returns values as tuples containing value and version. |
| * @param keepCacheObj Keep cache objects flag. |
| * @param txLbl Transaction label. |
| * @return Load future. |
| */ |
| public final IgniteInternalFuture<Map<K, V>> loadAsync( |
| @Nullable Collection<KeyCacheObject> keys, |
| boolean readThrough, |
| boolean forcePrimary, |
| AffinityTopologyVersion topVer, |
| String taskName, |
| boolean deserializeBinary, |
| boolean recovery, |
| @Nullable IgniteCacheExpiryPolicy expiryPlc, |
| boolean skipVals, |
| boolean needVer, |
| boolean keepCacheObj, |
| @Nullable String txLbl |
| ) { |
| if (keys == null || keys.isEmpty()) |
| return new GridFinishedFuture<>(Collections.<K, V>emptyMap()); |
| |
| if (expiryPlc == null) |
| expiryPlc = expiryPolicy(null); |
| |
| // Optimization: try to resolve value locally and escape 'get future' creation. |
| if (!forcePrimary && ctx.config().isReadFromBackup() && ctx.affinityNode() && |
| ctx.topology().lostPartitions().isEmpty()) { |
| ctx.shared().database().checkpointReadLock(); |
| |
| try { |
| Map<K, V> locVals = null; |
| |
| boolean success = true; |
| boolean readNoEntry = ctx.readNoEntry(expiryPlc, false); |
| boolean evt = !skipVals; |
| |
| for (KeyCacheObject key : keys) { |
| if (readNoEntry) { |
| CacheDataRow row = ctx.offheap().read(ctx, key); |
| |
| if (row != null) { |
| long expireTime = row.expireTime(); |
| |
| if (expireTime == 0 || expireTime > U.currentTimeMillis()) { |
| if (locVals == null) |
| locVals = U.newHashMap(keys.size()); |
| |
| ctx.addResult(locVals, |
| key, |
| row.value(), |
| skipVals, |
| keepCacheObj, |
| deserializeBinary, |
| true, |
| null, |
| row.version(), |
| 0, |
| 0, |
| needVer, |
| U.deploymentClassLoader(ctx.kernalContext(), U.contextDeploymentClassLoaderId(ctx.kernalContext()))); |
| |
| if (evt) { |
| ctx.events().readEvent(key, |
| null, |
| txLbl, |
| row.value(), |
| taskName, |
| !deserializeBinary); |
| } |
| } |
| else |
| success = false; |
| } |
| else |
| success = false; |
| } |
| else { |
| GridCacheEntryEx entry = null; |
| |
| while (true) { |
| try { |
| entry = entryEx(key); |
| |
| // If our DHT cache do has value, then we peek it. |
| if (entry != null) { |
| boolean isNew = entry.isNewLocked(); |
| |
| EntryGetResult getRes = null; |
| CacheObject v = null; |
| GridCacheVersion ver = null; |
| |
| if (needVer) { |
| getRes = entry.innerGetVersioned( |
| null, |
| null, |
| /*update-metrics*/false, |
| /*event*/evt, |
| null, |
| taskName, |
| expiryPlc, |
| !deserializeBinary, |
| null); |
| |
| if (getRes != null) { |
| v = getRes.value(); |
| ver = getRes.version(); |
| } |
| } |
| else { |
| v = entry.innerGet( |
| null, |
| null, |
| /*read-through*/false, |
| /*update-metrics*/false, |
| /*event*/evt, |
| null, |
| taskName, |
| expiryPlc, |
| !deserializeBinary); |
| } |
| |
| // Entry was not in memory or in swap, so we remove it from cache. |
| if (v == null) { |
| GridCacheVersion obsoleteVer = nextVersion(); |
| |
| if (isNew && entry.markObsoleteIfEmpty(obsoleteVer)) |
| removeEntry(entry); |
| |
| success = false; |
| } |
| else { |
| if (locVals == null) |
| locVals = U.newHashMap(keys.size()); |
| |
| ctx.addResult(locVals, |
| key, |
| v, |
| skipVals, |
| keepCacheObj, |
| deserializeBinary, |
| true, |
| getRes, |
| ver, |
| 0, |
| 0, |
| needVer, |
| U.deploymentClassLoader( |
| ctx.kernalContext(), |
| U.contextDeploymentClassLoaderId(ctx.kernalContext()) |
| ) |
| ); |
| } |
| } |
| else |
| success = false; |
| |
| break; // While. |
| } |
| catch (GridCacheEntryRemovedException ignored) { |
| // No-op, retry. |
| } |
| catch (GridDhtInvalidPartitionException ignored) { |
| success = false; |
| |
| break; // While. |
| } |
| finally { |
| if (entry != null) |
| entry.touch(); |
| } |
| } |
| } |
| |
| if (!success) |
| break; |
| else if (!skipVals && ctx.statisticsEnabled()) |
| ctx.cache().metrics0().onRead(true); |
| } |
| |
| if (success) { |
| sendTtlUpdateRequest(expiryPlc); |
| |
| return new GridFinishedFuture<>(locVals); |
| } |
| } |
| catch (IgniteCheckedException e) { |
| return new GridFinishedFuture<>(e); |
| } |
| finally { |
| ctx.shared().database().checkpointReadUnlock(); |
| } |
| } |
| |
| if (expiryPlc != null) |
| expiryPlc.reset(); |
| |
| // Either reload or not all values are available locally. |
| GridPartitionedGetFuture<K, V> fut = new GridPartitionedGetFuture<>( |
| ctx, |
| keys, |
| readThrough, |
| forcePrimary, |
| taskName, |
| deserializeBinary, |
| recovery, |
| expiryPlc, |
| skipVals, |
| needVer, |
| keepCacheObj, |
| txLbl, |
| null); |
| |
| fut.init(topVer); |
| |
| return fut; |
| } |
| |
| /** |
| * This is an entry point to pessimistic locking within transaction. |
| * |
| * {@inheritDoc} |
| */ |
| @Override public IgniteInternalFuture<Boolean> lockAllAsync( |
| Collection<KeyCacheObject> keys, |
| long timeout, |
| @Nullable IgniteTxLocalEx tx, |
| boolean isInvalidate, |
| boolean isRead, |
| boolean retval, |
| @Nullable TransactionIsolation isolation, |
| long createTtl, |
| long accessTtl |
| ) { |
| assert tx == null || tx instanceof GridNearTxLocal : tx; |
| |
| GridNearTxLocal txx = (GridNearTxLocal)tx; |
| |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| GridDhtColocatedLockFuture fut = new GridDhtColocatedLockFuture(ctx, |
| keys, |
| txx, |
| isRead, |
| retval, |
| timeout, |
| createTtl, |
| accessTtl, |
| CU.empty0(), |
| opCtx != null && opCtx.skipStore(), |
| opCtx != null && opCtx.isKeepBinary(), |
| opCtx != null && opCtx.recovery()); |
| |
| // Future will be added to mvcc only if it was mapped to remote nodes. |
| fut.map(); |
| |
| return fut; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridNearTransactionalCache<K, V> near() { |
| assert false : "Near cache is not available in colocated mode."; |
| |
| return null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void unlockAll(Collection<? extends K> keys) { |
| if (keys.isEmpty()) |
| return; |
| |
| try { |
| GridCacheVersion ver = null; |
| |
| int keyCnt = -1; |
| |
| Map<ClusterNode, GridNearUnlockRequest> map = null; |
| |
| Collection<KeyCacheObject> locKeys = new ArrayList<>(); |
| |
| for (K key : keys) { |
| KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); |
| IgniteTxKey txKey = ctx.txKey(cacheKey); |
| |
| GridDistributedCacheEntry entry = peekExx(cacheKey); |
| |
| GridCacheMvccCandidate lock = |
| ctx.mvcc().removeExplicitLock(Thread.currentThread().getId(), txKey, null); |
| |
| if (lock != null) { |
| final AffinityTopologyVersion topVer = lock.topologyVersion(); |
| |
| assert topVer.compareTo(AffinityTopologyVersion.ZERO) > 0; |
| |
| // Send request to remove from remote nodes. |
| ClusterNode primary = ctx.affinity().primaryByKey(key, topVer); |
| |
| if (primary == null) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to unlock keys (all partition nodes left the grid)."); |
| |
| continue; |
| } |
| |
| if (map == null) { |
| Collection<ClusterNode> affNodes = CU.affinityNodes(ctx, topVer); |
| |
| keyCnt = (int)Math.ceil((double)keys.size() / affNodes.size()); |
| |
| map = U.newHashMap(affNodes.size()); |
| } |
| |
| if (ver == null) |
| ver = lock.version(); |
| |
| if (!lock.reentry()) { |
| if (!ver.equals(lock.version())) |
| throw new IgniteCheckedException("Failed to unlock (if keys were locked separately, " + |
| "then they need to be unlocked separately): " + keys); |
| |
| if (!primary.isLocal()) { |
| GridNearUnlockRequest req = map.get(primary); |
| |
| if (req == null) { |
| map.put(primary, req = new GridNearUnlockRequest(ctx.cacheId(), keyCnt, |
| ctx.deploymentEnabled())); |
| |
| req.version(ver); |
| } |
| |
| KeyCacheObject key0 = entry != null ? entry.key() : cacheKey; |
| |
| req.addKey(key0, ctx); |
| } |
| else |
| locKeys.add(cacheKey); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Removed lock (will distribute): " + lock); |
| } |
| else if (log.isDebugEnabled()) |
| log.debug("Current thread still owns lock (or there are no other nodes)" + |
| " [lock=" + lock + ", curThreadId=" + Thread.currentThread().getId() + ']'); |
| } |
| } |
| |
| if (ver == null) |
| return; |
| |
| if (!locKeys.isEmpty()) |
| removeLocks(ctx.localNodeId(), ver, locKeys, true); |
| |
| for (Map.Entry<ClusterNode, GridNearUnlockRequest> mapping : map.entrySet()) { |
| ClusterNode n = mapping.getKey(); |
| |
| GridDistributedUnlockRequest req = mapping.getValue(); |
| |
| assert !n.isLocal(); |
| |
| if (!F.isEmpty(req.keys())) { |
| try { |
| // We don't wait for reply to this message. |
| ctx.io().send(n, req, ctx.ioPolicy()); |
| } |
| catch (ClusterTopologyCheckedException e) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to send unlock request (node has left the grid) [keys=" + req.keys() + |
| ", n=" + n + ", e=" + e + ']'); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to send unlock request [keys=" + req.keys() + ", n=" + n + ']', e); |
| } |
| } |
| } |
| } |
| catch (IgniteCheckedException ex) { |
| U.error(log, "Failed to unlock the lock for keys: " + keys, ex); |
| } |
| } |
| |
| /** |
| * Removes locks regardless of whether they are owned or not for given |
| * version and keys. |
| * |
| * @param threadId Thread ID. |
| * @param ver Lock version. |
| * @param keys Keys. |
| */ |
| public void removeLocks(long threadId, GridCacheVersion ver, Collection<KeyCacheObject> keys) { |
| if (keys.isEmpty()) |
| return; |
| |
| try { |
| int keyCnt = -1; |
| |
| Map<ClusterNode, GridNearUnlockRequest> map = null; |
| |
| Collection<KeyCacheObject> locKeys = new LinkedList<>(); |
| |
| for (KeyCacheObject key : keys) { |
| IgniteTxKey txKey = ctx.txKey(key); |
| |
| GridCacheMvccCandidate lock = ctx.mvcc().removeExplicitLock(threadId, txKey, ver); |
| |
| if (lock != null) { |
| AffinityTopologyVersion topVer = lock.topologyVersion(); |
| |
| if (map == null) { |
| Collection<ClusterNode> affNodes = CU.affinityNodes(ctx, topVer); |
| |
| keyCnt = (int)Math.ceil((double)keys.size() / affNodes.size()); |
| |
| map = U.newHashMap(affNodes.size()); |
| } |
| |
| ClusterNode primary = ctx.affinity().primaryByKey(key, topVer); |
| |
| if (primary == null) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to remove locks (all partition nodes left the grid)."); |
| |
| continue; |
| } |
| |
| if (!primary.isLocal()) { |
| // Send request to remove from remote nodes. |
| GridNearUnlockRequest req = map.get(primary); |
| |
| if (req == null) { |
| map.put(primary, req = new GridNearUnlockRequest(ctx.cacheId(), keyCnt, |
| ctx.deploymentEnabled())); |
| |
| req.version(ver); |
| } |
| |
| GridCacheEntryEx entry = peekEx(key); |
| |
| KeyCacheObject key0 = entry != null ? entry.key() : key; |
| |
| req.addKey(key0, ctx); |
| } |
| else |
| locKeys.add(key); |
| } |
| } |
| |
| if (!locKeys.isEmpty()) |
| removeLocks(ctx.localNodeId(), ver, locKeys, true); |
| |
| if (map == null || map.isEmpty()) |
| return; |
| |
| IgnitePair<Collection<GridCacheVersion>> versPair = ctx.tm().versions(ver); |
| |
| Collection<GridCacheVersion> committed = versPair.get1(); |
| Collection<GridCacheVersion> rolledback = versPair.get2(); |
| |
| for (Map.Entry<ClusterNode, GridNearUnlockRequest> mapping : map.entrySet()) { |
| ClusterNode n = mapping.getKey(); |
| |
| GridDistributedUnlockRequest req = mapping.getValue(); |
| |
| if (!F.isEmpty(req.keys())) { |
| req.completedVersions(committed, rolledback); |
| |
| try { |
| // We don't wait for reply to this message. |
| ctx.io().send(n, req, ctx.ioPolicy()); |
| } |
| catch (ClusterTopologyCheckedException e) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to send unlock request (node has left the grid) [keys=" + req.keys() + |
| ", n=" + n + ", e=" + e + ']'); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to send unlock request [keys=" + req.keys() + ", n=" + n + ']', e); |
| } |
| } |
| } |
| } |
| catch (IgniteCheckedException ex) { |
| U.error(log, "Failed to unlock the lock for keys: " + keys, ex); |
| } |
| } |
| |
| /** |
| * @param cacheCtx Cache context. |
| * @param tx Started colocated transaction (if any). |
| * @param threadId Thread ID. |
| * @param ver Lock version. |
| * @param topVer Topology version. |
| * @param keys Mapped keys. |
| * @param txRead Tx read. |
| * @param retval Return value flag. |
| * @param timeout Lock timeout. |
| * @param createTtl TTL for create operation. |
| * @param accessTtl TTL for read operation. |
| * @param filter filter Optional filter. |
| * @param skipStore Skip store flag. |
| * @return Lock future. |
| */ |
| IgniteInternalFuture<Exception> lockAllAsync( |
| final GridCacheContext<?, ?> cacheCtx, |
| @Nullable final GridNearTxLocal tx, |
| final long threadId, |
| final GridCacheVersion ver, |
| final AffinityTopologyVersion topVer, |
| final Collection<KeyCacheObject> keys, |
| final boolean txRead, |
| final boolean retval, |
| final long timeout, |
| final long createTtl, |
| final long accessTtl, |
| @Nullable final CacheEntryPredicate[] filter, |
| final boolean skipStore, |
| final boolean keepBinary |
| ) { |
| assert keys != null; |
| |
| IgniteInternalFuture<Object> keyFut = ctx.group().preloader().request(cacheCtx, keys, topVer); |
| |
| // Prevent embedded future creation if possible. |
| if (keyFut == null || keyFut.isDone()) { |
| // Check for exception. |
| if (keyFut != null && keyFut.error() != null) |
| return new GridFinishedFuture<>(keyFut.error()); |
| |
| return lockAllAsync0(cacheCtx, |
| tx, |
| threadId, |
| ver, |
| topVer, |
| keys, |
| txRead, |
| retval, |
| timeout, |
| createTtl, |
| accessTtl, |
| filter, |
| skipStore, |
| keepBinary); |
| } |
| else { |
| return new GridEmbeddedFuture<>(keyFut, |
| new C2<Object, Exception, IgniteInternalFuture<Exception>>() { |
| @Override public IgniteInternalFuture<Exception> apply(Object o, Exception exx) { |
| if (exx != null) |
| return new GridDhtFinishedFuture<>(exx); |
| |
| return lockAllAsync0(cacheCtx, |
| tx, |
| threadId, |
| ver, |
| topVer, |
| keys, |
| txRead, |
| retval, |
| timeout, |
| createTtl, |
| accessTtl, |
| filter, |
| skipStore, |
| keepBinary); |
| } |
| } |
| ); |
| } |
| } |
| |
| /** |
| * @param cacheCtx Cache context. |
| * @param tx Started colocated transaction (if any). |
| * @param threadId Thread ID. |
| * @param ver Lock version. |
| * @param topVer Topology version. |
| * @param keys Mapped keys. |
| * @param txRead Tx read. |
| * @param retval Return value flag. |
| * @param timeout Lock timeout. |
| * @param createTtl TTL for create operation. |
| * @param accessTtl TTL for read operation. |
| * @param filter filter Optional filter. |
| * @param skipStore Skip store flag. |
| * @return Lock future. |
| */ |
| private IgniteInternalFuture<Exception> lockAllAsync0( |
| GridCacheContext<?, ?> cacheCtx, |
| @Nullable final GridNearTxLocal tx, |
| long threadId, |
| final GridCacheVersion ver, |
| AffinityTopologyVersion topVer, |
| final Collection<KeyCacheObject> keys, |
| final boolean txRead, |
| boolean retval, |
| final long timeout, |
| final long createTtl, |
| final long accessTtl, |
| @Nullable final CacheEntryPredicate[] filter, |
| boolean skipStore, |
| boolean keepBinary) { |
| int cnt = keys.size(); |
| |
| if (tx == null) { |
| GridDhtLockFuture fut = new GridDhtLockFuture(ctx, |
| ctx.localNodeId(), |
| ver, |
| topVer, |
| cnt, |
| txRead, |
| retval, |
| timeout, |
| tx, |
| threadId, |
| createTtl, |
| accessTtl, |
| skipStore, |
| keepBinary); |
| |
| // Add before mapping. |
| if (!ctx.mvcc().addFuture(fut)) |
| throw new IllegalStateException("Duplicate future ID: " + fut); |
| |
| boolean timedout = false; |
| |
| for (KeyCacheObject key : keys) { |
| if (timedout) |
| break; |
| |
| while (true) { |
| GridDhtCacheEntry entry = entryExx(key, topVer); |
| |
| try { |
| fut.addEntry(key == null ? null : entry); |
| |
| if (fut.isDone()) |
| timedout = true; |
| |
| break; |
| } |
| catch (GridCacheEntryRemovedException ignore) { |
| if (log.isDebugEnabled()) |
| log.debug("Got removed entry when adding lock (will retry): " + entry); |
| } |
| catch (GridDistributedLockCancelledException e) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to add entry [err=" + e + ", entry=" + entry + ']'); |
| |
| fut.onError(e); |
| |
| return new GridDhtFinishedFuture<>(e); |
| } |
| } |
| } |
| |
| // This will send remote messages. |
| fut.map(); |
| |
| return new GridDhtEmbeddedFuture<>( |
| new C2<Boolean, Exception, Exception>() { |
| @Override public Exception apply(Boolean b, Exception e) { |
| if (e != null) |
| e = U.unwrap(e); |
| else if (!b) |
| e = new GridCacheLockTimeoutException(ver); |
| |
| return e; |
| } |
| }, |
| fut); |
| } |
| else { |
| // Handle implicit locks for pessimistic transactions. |
| ctx.tm().txContext(tx); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Performing colocated lock [tx=" + tx + ", keys=" + keys + ']'); |
| |
| IgniteInternalFuture<GridCacheReturn> txFut = tx.lockAllAsync(cacheCtx, |
| keys, |
| retval, |
| txRead, |
| createTtl, |
| accessTtl, |
| skipStore, |
| keepBinary); |
| |
| return new GridDhtEmbeddedFuture<>( |
| new C2<GridCacheReturn, Exception, Exception>() { |
| @Override public Exception apply(GridCacheReturn ret, |
| Exception e) { |
| if (e != null) |
| e = U.unwrap(e); |
| |
| assert !tx.empty(); |
| |
| return e; |
| } |
| }, |
| txFut); |
| } |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param res Response. |
| */ |
| private void processNearLockResponse(UUID nodeId, GridNearLockResponse res) { |
| if (txLockMsgLog.isDebugEnabled()) |
| txLockMsgLog.debug("Received near lock response [txId=" + res.version() + ", node=" + nodeId + ']'); |
| |
| assert nodeId != null; |
| assert res != null; |
| |
| GridDhtColocatedLockFuture fut = (GridDhtColocatedLockFuture)ctx.mvcc(). |
| <Boolean>versionedFuture(res.version(), res.futureId()); |
| |
| if (fut != null) |
| fut.onResult(nodeId, res); |
| else { |
| if (txLockMsgLog.isDebugEnabled()) { |
| txLockMsgLog.debug("Received near lock response for unknown future [txId=" + res.version() + |
| ", node=" + nodeId + |
| ", res=" + res + ']'); |
| } |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(GridDhtColocatedCache.class, this, super.toString()); |
| } |
| } |