| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.local.atomic; |
| |
| import java.io.Externalizable; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Iterator; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.Callable; |
| import javax.cache.expiry.ExpiryPolicy; |
| import javax.cache.processor.EntryProcessor; |
| import javax.cache.processor.EntryProcessorException; |
| import javax.cache.processor.EntryProcessorResult; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.cache.ReadRepairStrategy; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.cache.CacheEntryPredicate; |
| import org.apache.ignite.internal.processors.cache.CacheInvokeEntry; |
| import org.apache.ignite.internal.processors.cache.CacheInvokeResult; |
| import org.apache.ignite.internal.processors.cache.CacheLazyEntry; |
| import org.apache.ignite.internal.processors.cache.CacheObject; |
| import org.apache.ignite.internal.processors.cache.CacheOperationContext; |
| import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; |
| import org.apache.ignite.internal.processors.cache.CacheStorePartialUpdateException; |
| import org.apache.ignite.internal.processors.cache.EntryGetResult; |
| import org.apache.ignite.internal.processors.cache.GridCacheContext; |
| import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; |
| import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; |
| import org.apache.ignite.internal.processors.cache.GridCacheOperation; |
| import org.apache.ignite.internal.processors.cache.GridCachePreloader; |
| import org.apache.ignite.internal.processors.cache.GridCachePreloaderAdapter; |
| import org.apache.ignite.internal.processors.cache.GridCacheReturn; |
| import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; |
| import org.apache.ignite.internal.processors.cache.KeyCacheObject; |
| import org.apache.ignite.internal.processors.cache.LockedEntriesInfo; |
| import org.apache.ignite.internal.processors.cache.local.GridLocalCache; |
| import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; |
| import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; |
| import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; |
| import org.apache.ignite.internal.processors.resource.GridResourceIoc; |
| import org.apache.ignite.internal.util.F0; |
| import org.apache.ignite.internal.util.future.GridFinishedFuture; |
| import org.apache.ignite.internal.util.lang.GridPlainCallable; |
| import org.apache.ignite.internal.util.lang.GridTuple3; |
| import org.apache.ignite.internal.util.typedef.C1; |
| import org.apache.ignite.internal.util.typedef.CI1; |
| import org.apache.ignite.internal.util.typedef.CX1; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.T2; |
| import org.apache.ignite.internal.util.typedef.internal.A; |
| import org.apache.ignite.internal.util.typedef.internal.CU; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteBiTuple; |
| import org.apache.ignite.plugin.security.SecurityPermission; |
| import org.apache.ignite.thread.IgniteThread; |
| import org.apache.ignite.transactions.TransactionIsolation; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static org.apache.ignite.internal.processors.cache.GridCacheOperation.DELETE; |
| import static org.apache.ignite.internal.processors.cache.GridCacheOperation.TRANSFORM; |
| import static org.apache.ignite.internal.processors.cache.GridCacheOperation.UPDATE; |
| |
| /** |
| * Non-transactional local cache. |
| */ |
| public class GridLocalAtomicCache<K, V> extends GridLocalCache<K, V> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** */ |
| private GridCachePreloader preldr; |
| |
| /** Locked entries info for each thread. */ |
| private final LockedEntriesInfo lockedEntriesInfo = new LockedEntriesInfo(); |
| |
| /** |
| * Empty constructor required by {@link Externalizable}. |
| */ |
| public GridLocalAtomicCache() { |
| // No-op. |
| } |
| |
| /** |
| * @param ctx Cache context. |
| */ |
| public GridLocalAtomicCache(GridCacheContext<K, V> ctx) { |
| super(ctx); |
| |
| preldr = new GridCachePreloaderAdapter(ctx.group()); |
| } |
| |
    /** {@inheritDoc} */
    @Override protected void checkJta() throws IgniteCheckedException {
        // No-op: nothing to check for this cache implementation.
    }
| |
    /** {@inheritDoc} */
    @Override public boolean isLocal() {
        // This is a local cache by definition.
        return true;
    }
| |
    /** {@inheritDoc} */
    @Override public GridCachePreloader preloader() {
        // No-op adapter created in constructor.
        return preldr;
    }
| |
| /** {@inheritDoc} */ |
| @Override protected V getAndPut0(K key, V val, @Nullable CacheEntryPredicate filter) throws IgniteCheckedException { |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| return (V)updateAllInternal(UPDATE, |
| Collections.singleton(key), |
| Collections.singleton(val), |
| null, |
| expiryPerCall(), |
| true, |
| false, |
| filter, |
| ctx.writeThrough(), |
| ctx.readThrough(), |
| opCtx != null && opCtx.isKeepBinary()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected boolean put0(K key, V val, CacheEntryPredicate filter) throws IgniteCheckedException { |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| Boolean res = (Boolean)updateAllInternal(UPDATE, |
| Collections.singleton(key), |
| Collections.singleton(val), |
| null, |
| expiryPerCall(), |
| false, |
| false, |
| filter, |
| ctx.writeThrough(), |
| ctx.readThrough(), |
| opCtx != null && opCtx.isKeepBinary()); |
| |
| assert res != null; |
| |
| return res; |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("unchecked") |
| @Override public IgniteInternalFuture<V> getAndPutAsync0(K key, V val, @Nullable CacheEntryPredicate filter) { |
| return updateAllAsync0(F0.asMap(key, val), |
| null, |
| null, |
| true, |
| false, |
| filter); |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("unchecked") |
| @Override public IgniteInternalFuture<Boolean> putAsync0(K key, V val, @Nullable CacheEntryPredicate filter) { |
| return updateAllAsync0(F0.asMap(key, val), |
| null, |
| null, |
| false, |
| false, |
| filter); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void putAll0(Map<? extends K, ? extends V> m) throws IgniteCheckedException { |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| updateAllInternal(UPDATE, |
| m.keySet(), |
| m.values(), |
| null, |
| expiryPerCall(), |
| false, |
| false, |
| null, |
| ctx.writeThrough(), |
| ctx.readThrough(), |
| opCtx != null && opCtx.isKeepBinary()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<?> putAllAsync0(Map<? extends K, ? extends V> m) { |
| return updateAllAsync0(m, |
| null, |
| null, |
| false, |
| false, |
| null).chain(RET2NULL); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected V getAndRemove0(K key) throws IgniteCheckedException { |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| return (V)updateAllInternal(DELETE, |
| Collections.singleton(key), |
| null, |
| null, |
| expiryPerCall(), |
| true, |
| false, |
| null, |
| ctx.writeThrough(), |
| ctx.readThrough(), |
| opCtx != null && opCtx.isKeepBinary()); |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("unchecked") |
| @Override public IgniteInternalFuture<V> getAndRemoveAsync0(K key) { |
| return removeAllAsync0(Collections.singletonList(key), true, false, null); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void removeAll0(Collection<? extends K> keys) throws IgniteCheckedException { |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| updateAllInternal(DELETE, |
| keys, |
| null, |
| null, |
| expiryPerCall(), |
| false, |
| false, |
| null, |
| ctx.writeThrough(), |
| ctx.readThrough(), |
| opCtx != null && opCtx.isKeepBinary()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<Object> removeAllAsync0(Collection<? extends K> keys) { |
| return removeAllAsync0(keys, false, false, null).chain(RET2NULL); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean remove0(K key, final CacheEntryPredicate filter) throws IgniteCheckedException { |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| Boolean rmv = (Boolean)updateAllInternal(DELETE, |
| Collections.singleton(key), |
| null, |
| null, |
| expiryPerCall(), |
| false, |
| false, |
| filter, |
| ctx.writeThrough(), |
| ctx.readThrough(), |
| opCtx != null && opCtx.isKeepBinary()); |
| |
| assert rmv != null; |
| |
| return rmv; |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("unchecked") |
| @Override public IgniteInternalFuture<Boolean> removeAsync0(K key, @Nullable CacheEntryPredicate filter) { |
| return removeAllAsync0(Collections.singletonList(key), false, false, filter); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<?> removeAllAsync() { |
| return ctx.closures().callLocalSafe(new GridPlainCallable<Void>() { |
| @Override public Void call() throws Exception { |
| removeAll(); |
| |
| return null; |
| } |
| }); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected V get( |
| final K key, |
| String taskName, |
| boolean deserializeBinary, |
| boolean needVer) throws IgniteCheckedException { |
| Map<K, V> m = getAllInternal(Collections.singleton(key), |
| ctx.readThrough(), |
| taskName, |
| deserializeBinary, |
| false, |
| needVer); |
| |
| assert m.isEmpty() || m.size() == 1 : m.size(); |
| |
| return F.firstValue(m); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public final Map<K, V> getAll( |
| Collection<? extends K> keys, |
| boolean deserializeBinary, |
| boolean needVer, |
| boolean recovery, |
| ReadRepairStrategy readRepairStrategy) throws IgniteCheckedException { |
| A.notNull(keys, "keys"); |
| |
| String taskName = ctx.kernalContext().job().currentTaskName(); |
| |
| return getAllInternal(keys, |
| ctx.readThrough(), |
| taskName, |
| deserializeBinary, |
| false, |
| needVer); |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("unchecked") |
| @Override public IgniteInternalFuture<Map<K, V>> getAllAsync( |
| @Nullable final Collection<? extends K> keys, |
| final boolean forcePrimary, |
| boolean skipTx, |
| final String taskName, |
| final boolean deserializeBinary, |
| boolean recovery, |
| ReadRepairStrategy readRepairStrategy, |
| final boolean skipVals, |
| final boolean needVer |
| ) { |
| A.notNull(keys, "keys"); |
| |
| final boolean storeEnabled = ctx.readThrough(); |
| |
| return asyncOp(new GridPlainCallable<Map<K, V>>() { |
| @Override public Map<K, V> call() throws Exception { |
| return getAllInternal(keys, storeEnabled, taskName, deserializeBinary, skipVals, needVer); |
| } |
| }); |
| } |
| |
| /** |
| * Entry point to all public API get methods. |
| * |
| * @param keys Keys to remove. |
| * @param storeEnabled Store enabled flag. |
| * @param taskName Task name. |
| * @param deserializeBinary Deserialize binary . |
| * @param skipVals Skip value flag. |
| * @param needVer Need version. |
| * @return Key-value map. |
| * @throws IgniteCheckedException If failed. |
| */ |
| @SuppressWarnings("ConstantConditions") |
| private Map<K, V> getAllInternal(@Nullable Collection<? extends K> keys, |
| boolean storeEnabled, |
| String taskName, |
| boolean deserializeBinary, |
| boolean skipVals, |
| boolean needVer |
| ) throws IgniteCheckedException { |
| ctx.checkSecurity(SecurityPermission.CACHE_READ); |
| |
| if (F.isEmpty(keys)) |
| return Collections.emptyMap(); |
| |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| Map<K, V> vals = U.newHashMap(keys.size()); |
| |
| warnIfUnordered(keys, BulkOperation.GET); |
| |
| final IgniteCacheExpiryPolicy expiry = expiryPolicy(opCtx != null ? opCtx.expiry() : null); |
| |
| boolean success = true; |
| boolean readNoEntry = ctx.readNoEntry(expiry, false); |
| final boolean evt = !skipVals; |
| |
| ctx.shared().database().checkpointReadLock(); |
| |
| try { |
| for (K key : keys) { |
| if (key == null) |
| throw new NullPointerException("Null key."); |
| |
| KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); |
| |
| boolean skipEntry = readNoEntry; |
| |
| if (readNoEntry) { |
| CacheDataRow row = ctx.offheap().read(ctx, cacheKey); |
| |
| if (row != null) { |
| long expireTime = row.expireTime(); |
| |
| if (expireTime == 0 || expireTime > U.currentTimeMillis()) { |
| ctx.addResult(vals, |
| cacheKey, |
| row.value(), |
| skipVals, |
| false, |
| deserializeBinary, |
| true, |
| null, |
| row.version(), |
| 0, |
| 0, |
| needVer, |
| null); |
| |
| if (ctx.statisticsEnabled() && !skipVals) |
| metrics0().onRead(true); |
| |
| if (evt) { |
| ctx.events().readEvent(cacheKey, |
| null, |
| null, |
| row.value(), |
| taskName, |
| !deserializeBinary); |
| } |
| } |
| else |
| skipEntry = false; |
| } |
| else |
| success = false; |
| } |
| |
| if (!skipEntry) { |
| GridCacheEntryEx entry = null; |
| |
| while (true) { |
| try { |
| entry = entryEx(cacheKey); |
| |
| if (entry != null) { |
| CacheObject v; |
| |
| if (needVer) { |
| EntryGetResult res = entry.innerGetVersioned( |
| null, |
| null, |
| /*update-metrics*/false, |
| /*event*/evt, |
| null, |
| taskName, |
| expiry, |
| !deserializeBinary, |
| null); |
| |
| if (res != null) { |
| ctx.addResult( |
| vals, |
| cacheKey, |
| res, |
| skipVals, |
| false, |
| deserializeBinary, |
| true, |
| needVer); |
| } |
| else |
| success = false; |
| } |
| else { |
| v = entry.innerGet( |
| null, |
| null, |
| /*read-through*/false, |
| /*update-metrics*/true, |
| /*event*/evt, |
| null, |
| taskName, |
| expiry, |
| !deserializeBinary); |
| |
| if (v != null) { |
| ctx.addResult(vals, |
| cacheKey, |
| v, |
| skipVals, |
| false, |
| deserializeBinary, |
| true, |
| null, |
| 0, |
| 0, |
| null); |
| } |
| else |
| success = false; |
| } |
| } |
| |
| break; // While. |
| } |
| catch (GridCacheEntryRemovedException ignored) { |
| // No-op, retry. |
| } |
| finally { |
| if (entry != null) |
| entry.touch(); |
| } |
| |
| if (!success && storeEnabled) |
| break; |
| } |
| } |
| if (!success) { |
| if (!storeEnabled && ctx.statisticsEnabled() && !skipVals) |
| metrics0().onRead(false); |
| } |
| } |
| } |
| finally { |
| ctx.shared().database().checkpointReadUnlock(); |
| } |
| |
| if (success || !storeEnabled) |
| return vals; |
| |
| return getAllAsync( |
| keys, |
| null, |
| opCtx == null || !opCtx.skipStore(), |
| false, |
| taskName, |
| deserializeBinary, |
| opCtx != null && opCtx.recovery(), |
| null, |
| /*force primary*/false, |
| expiry, |
| skipVals, |
| needVer).get(); |
| } |
| |
    /** {@inheritDoc} */
    @Override public <T> EntryProcessorResult<T> invoke(K key,
        EntryProcessor<K, V, T> entryProcessor,
        Object... args) throws IgniteCheckedException {
        // Synchronous variant simply blocks on the async counterpart.
        return invokeAsync(key, entryProcessor, args).get();
    }
| |
| /** {@inheritDoc} */ |
| @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(Set<? extends K> keys, |
| final EntryProcessor<K, V, T> entryProcessor, |
| Object... args) throws IgniteCheckedException { |
| A.notNull(keys, "keys", entryProcessor, "entryProcessor"); |
| |
| warnIfUnordered(keys, BulkOperation.INVOKE); |
| |
| final boolean statsEnabled = ctx.statisticsEnabled(); |
| |
| final long start = statsEnabled ? System.nanoTime() : 0L; |
| |
| Map<? extends K, EntryProcessor> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor>() { |
| @Override public EntryProcessor apply(K k) { |
| return entryProcessor; |
| } |
| }); |
| |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| final boolean keepBinary = opCtx != null && opCtx.isKeepBinary(); |
| |
| Map<K, EntryProcessorResult<T>> entryProcessorRes = (Map<K, EntryProcessorResult<T>>)updateAllInternal( |
| TRANSFORM, |
| invokeMap.keySet(), |
| invokeMap.values(), |
| args, |
| expiryPerCall(), |
| false, |
| false, |
| null, |
| ctx.writeThrough(), |
| ctx.readThrough(), |
| keepBinary); |
| |
| if (statsEnabled) |
| metrics0().addInvokeTimeNanos(System.nanoTime() - start); |
| |
| return entryProcessorRes; |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("unchecked") |
| @Override public <T> IgniteInternalFuture<EntryProcessorResult<T>> invokeAsync(K key, |
| EntryProcessor<K, V, T> entryProcessor, |
| Object... args) throws EntryProcessorException { |
| A.notNull(key, "key", entryProcessor, "entryProcessor"); |
| |
| final boolean statsEnabled = ctx.statisticsEnabled(); |
| |
| final long start = statsEnabled ? System.nanoTime() : 0L; |
| |
| Map<? extends K, EntryProcessor> invokeMap = |
| Collections.singletonMap(key, (EntryProcessor)entryProcessor); |
| |
| IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut = updateAllAsync0(null, |
| invokeMap, |
| args, |
| false, |
| false, |
| null); |
| |
| return fut.chain(new CX1<IgniteInternalFuture<Map<K, EntryProcessorResult<T>>>, EntryProcessorResult<T>>() { |
| @Override public EntryProcessorResult<T> applyx(IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> fut) |
| throws IgniteCheckedException { |
| Map<K, EntryProcessorResult<T>> resMap = fut.get(); |
| |
| if (statsEnabled) |
| metrics0().addInvokeTimeNanos(System.nanoTime() - start); |
| |
| if (resMap != null) { |
| assert resMap.isEmpty() || resMap.size() == 1 : resMap.size(); |
| |
| return resMap.isEmpty() ? new CacheInvokeResult<>() : resMap.values().iterator().next(); |
| } |
| |
| return new CacheInvokeResult<>(); |
| } |
| }); |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("unchecked") |
| @Override public <T> IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync( |
| Set<? extends K> keys, |
| final EntryProcessor<K, V, T> entryProcessor, |
| Object... args) { |
| A.notNull(keys, "keys", entryProcessor, "entryProcessor"); |
| |
| warnIfUnordered(keys, BulkOperation.INVOKE); |
| |
| final boolean statsEnabled = ctx.statisticsEnabled(); |
| |
| final long start = statsEnabled ? System.nanoTime() : 0L; |
| |
| Map<? extends K, EntryProcessor> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor>() { |
| @Override public EntryProcessor apply(K k) { |
| return entryProcessor; |
| } |
| }); |
| |
| IgniteInternalFuture fut = updateAllAsync0(null, |
| invokeMap, |
| args, |
| true, |
| false, |
| null); |
| |
| if (statsEnabled) |
| fut.listen(new InvokeAllTimeStatClosure(metrics0(), start)); |
| |
| return fut; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll( |
| Map<? extends K, ? extends EntryProcessor<K, V, T>> map, |
| Object... args) throws IgniteCheckedException { |
| A.notNull(map, "map"); |
| |
| warnIfUnordered(map, BulkOperation.INVOKE); |
| |
| final boolean statsEnabled = ctx.statisticsEnabled(); |
| |
| final long start = statsEnabled ? System.nanoTime() : 0L; |
| |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| Map<K, EntryProcessorResult<T>> entryProcessorResult = (Map<K, EntryProcessorResult<T>>)updateAllInternal( |
| TRANSFORM, |
| map.keySet(), |
| map.values(), |
| args, |
| expiryPerCall(), |
| false, |
| false, |
| null, |
| ctx.writeThrough(), |
| ctx.readThrough(), |
| opCtx != null && opCtx.isKeepBinary()); |
| |
| if (statsEnabled) |
| metrics0().addInvokeTimeNanos(System.nanoTime() - start); |
| |
| return entryProcessorResult; |
| } |
| |
| /** {@inheritDoc} */ |
| @SuppressWarnings("unchecked") |
| @Override public <T> IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync( |
| Map<? extends K, ? extends EntryProcessor<K, V, T>> map, |
| Object... args) { |
| A.notNull(map, "map"); |
| |
| warnIfUnordered(map, BulkOperation.INVOKE); |
| |
| final boolean statsEnabled = ctx.statisticsEnabled(); |
| |
| final long start = statsEnabled ? System.nanoTime() : 0L; |
| |
| IgniteInternalFuture fut = updateAllAsync0(null, |
| map, |
| args, |
| true, |
| false, |
| null); |
| |
| if (statsEnabled) |
| fut.listen(new InvokeAllTimeStatClosure(metrics0(), start)); |
| |
| return fut; |
| } |
| |
| /** |
| * Entry point for public API update methods. |
| * |
| * @param map Put map. Either {@code map} or {@code invokeMap} should be passed. |
| * @param invokeMap Transform map. Either {@code map} or {@code invokeMap} should be passed. |
| * @param invokeArgs Optional arguments for EntryProcessor. |
| * @param retval Return value required flag. |
| * @param rawRetval Return {@code GridCacheReturn} instance. |
| * @param filter Cache entry filter for atomic updates. |
| * @return Completion future. |
| */ |
| private IgniteInternalFuture updateAllAsync0( |
| @Nullable final Map<? extends K, ? extends V> map, |
| @Nullable final Map<? extends K, ? extends EntryProcessor> invokeMap, |
| @Nullable final Object[] invokeArgs, |
| final boolean retval, |
| final boolean rawRetval, |
| @Nullable final CacheEntryPredicate filter |
| ) { |
| final GridCacheOperation op = invokeMap != null ? TRANSFORM : UPDATE; |
| |
| final Collection<? extends K> keys = |
| map != null ? map.keySet() : invokeMap != null ? invokeMap.keySet() : null; |
| |
| final Collection<?> vals = map != null ? map.values() : invokeMap != null ? invokeMap.values() : null; |
| |
| final boolean writeThrough = ctx.writeThrough(); |
| |
| final boolean readThrough = ctx.readThrough(); |
| |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| final ExpiryPolicy expiry = expiryPerCall(); |
| |
| final boolean keepBinary = opCtx != null && opCtx.isKeepBinary(); |
| |
| return asyncOp(new GridPlainCallable<Object>() { |
| @Override public Object call() throws Exception { |
| return updateAllInternal(op, |
| keys, |
| vals, |
| invokeArgs, |
| expiry, |
| retval, |
| rawRetval, |
| filter, |
| writeThrough, |
| readThrough, |
| keepBinary); |
| } |
| }); |
| } |
| |
| /** |
| * Entry point for public API remove methods. |
| * |
| * @param keys Keys to remove. |
| * @param retval Return value required flag. |
| * @param rawRetval Return {@code GridCacheReturn} instance. |
| * @param filter Cache entry filter. |
| * @return Completion future. |
| */ |
| private IgniteInternalFuture removeAllAsync0( |
| @Nullable final Collection<? extends K> keys, |
| final boolean retval, |
| final boolean rawRetval, |
| @Nullable final CacheEntryPredicate filter |
| ) { |
| final boolean writeThrough = ctx.writeThrough(); |
| |
| final boolean readThrough = ctx.readThrough(); |
| |
| final ExpiryPolicy expiryPlc = expiryPerCall(); |
| |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| final boolean keepBinary = opCtx != null && opCtx.isKeepBinary(); |
| |
| return asyncOp(new GridPlainCallable<Object>() { |
| @Override public Object call() throws Exception { |
| return updateAllInternal(DELETE, |
| keys, |
| null, |
| null, |
| expiryPlc, |
| retval, |
| rawRetval, |
| filter, |
| writeThrough, |
| readThrough, |
| keepBinary); |
| } |
| }); |
| } |
| |
| /** |
| * Entry point for all public update methods (put, remove, invoke). |
| * |
| * @param op Operation. |
| * @param keys Keys. |
| * @param vals Values. |
| * @param invokeArgs Optional arguments for EntryProcessor. |
| * @param expiryPlc Expiry policy. |
| * @param retval Return value required flag. |
| * @param rawRetval Return {@code GridCacheReturn} instance. |
| * @param filter Cache entry filter. |
| * @param writeThrough Write through. |
| * @param readThrough Read through. |
| * @return Update result. |
| * @throws IgniteCheckedException If failed. |
| */ |
| @SuppressWarnings("unchecked") |
| private Object updateAllInternal(GridCacheOperation op, |
| Collection<? extends K> keys, |
| @Nullable Iterable<?> vals, |
| @Nullable Object[] invokeArgs, |
| @Nullable ExpiryPolicy expiryPlc, |
| boolean retval, |
| boolean rawRetval, |
| CacheEntryPredicate filter, |
| boolean writeThrough, |
| boolean readThrough, |
| boolean keepBinary |
| ) throws IgniteCheckedException { |
| if (op == DELETE) |
| ctx.checkSecurity(SecurityPermission.CACHE_REMOVE); |
| else |
| ctx.checkSecurity(SecurityPermission.CACHE_PUT); |
| |
| String taskName = ctx.kernalContext().job().currentTaskName(); |
| |
| GridCacheVersion ver = nextVersion(); |
| |
| CacheEntryPredicate[] filters = CU.filterArray(filter); |
| |
| IgniteBiTuple<Boolean, ?> res = null; |
| |
| CachePartialUpdateCheckedException err = null; |
| |
| ctx.shared().database().checkpointReadLock(); |
| |
| try { |
| ctx.shared().database().ensureFreeSpace(ctx.dataRegion()); |
| |
| if (writeThrough && keys.size() > 1) { |
| return updateWithBatch(op, |
| keys, |
| vals, |
| invokeArgs, |
| expiryPlc, |
| ver, |
| filters, |
| keepBinary, |
| taskName); |
| } |
| |
| Iterator<?> valsIter = vals != null ? vals.iterator() : null; |
| |
| boolean intercept = ctx.config().getInterceptor() != null; |
| |
| for (K key : keys) { |
| if (key == null) |
| throw new NullPointerException("Null key."); |
| |
| Object val = valsIter != null ? valsIter.next() : null; |
| |
| if (val == null && op != DELETE) |
| throw new NullPointerException("Null value."); |
| |
| KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); |
| |
| if (op == UPDATE) { |
| val = ctx.toCacheObject(val); |
| |
| ctx.validateKeyAndValue(cacheKey, (CacheObject)val); |
| } |
| else if (op == TRANSFORM) |
| ctx.kernalContext().resource().inject(val, GridResourceIoc.AnnotationSet.ENTRY_PROCESSOR, ctx.name()); |
| |
| while (true) { |
| GridCacheEntryEx entry = null; |
| |
| try { |
| entry = entryEx(cacheKey); |
| |
| GridTuple3<Boolean, Object, EntryProcessorResult<Object>> t = entry.innerUpdateLocal( |
| ver, |
| val == null ? DELETE : op, |
| val, |
| invokeArgs, |
| writeThrough, |
| readThrough, |
| retval, |
| keepBinary, |
| expiryPlc, |
| true, |
| true, |
| filters, |
| intercept, |
| taskName, |
| false); |
| |
| if (op == TRANSFORM) { |
| if (t.get3() != null) { |
| Map<K, EntryProcessorResult> computedMap; |
| |
| if (res == null) { |
| computedMap = U.newHashMap(keys.size()); |
| |
| res = new IgniteBiTuple<>(true, computedMap); |
| } |
| else |
| computedMap = (Map<K, EntryProcessorResult>)res.get2(); |
| |
| computedMap.put(key, t.get3()); |
| } |
| } |
| else if (res == null) |
| res = new T2(t.get1(), t.get2()); |
| |
| break; // While. |
| } |
| catch (GridCacheEntryRemovedException ignored) { |
| if (log.isDebugEnabled()) |
| log.debug("Got removed entry while updating (will retry): " + key); |
| |
| entry = null; |
| } |
| catch (IgniteCheckedException e) { |
| if (err == null) |
| err = partialUpdateException(); |
| |
| err.add(F.asList(key), e); |
| |
| U.error(log, "Failed to update key : " + key, e); |
| |
| break; |
| } |
| finally { |
| if (entry != null) |
| entry.touch(); |
| } |
| } |
| } |
| } |
| finally { |
| ctx.shared().database().checkpointReadUnlock(); |
| } |
| |
| if (err != null) |
| throw err; |
| |
| Object ret = res == null ? null : rawRetval ? new GridCacheReturn( |
| ctx, |
| true, |
| keepBinary, |
| U.deploymentClassLoader(ctx.kernalContext(), U.contextDeploymentClassLoaderId(ctx.kernalContext())), |
| res.get2(), |
| res.get1() |
| ) : (retval || op == TRANSFORM) ? res.get2() : res.get1(); |
| |
| if (op == TRANSFORM && ret == null) |
| ret = Collections.emptyMap(); |
| |
| return ret; |
| } |
| |
| /** |
| * Updates entries using batched write-through. |
| * |
| * @param op Operation. |
| * @param keys Keys. |
| * @param vals Values. |
| * @param invokeArgs Optional arguments for EntryProcessor. |
| * @param expiryPlc Expiry policy. |
| * @param ver Cache version. |
| * @param filter Optional filter. |
| * @param taskName Task name. |
| * @return Results map for invoke operation. |
| * @throws CachePartialUpdateCheckedException If update failed. |
| */ |
| @SuppressWarnings({"ForLoopReplaceableByForEach", "unchecked"}) |
| private Map<K, EntryProcessorResult> updateWithBatch( |
| GridCacheOperation op, |
| Collection<? extends K> keys, |
| @Nullable Iterable<?> vals, |
| @Nullable Object[] invokeArgs, |
| @Nullable ExpiryPolicy expiryPlc, |
| GridCacheVersion ver, |
| @Nullable CacheEntryPredicate[] filter, |
| boolean keepBinary, |
| String taskName |
| ) throws IgniteCheckedException { |
| List<GridCacheEntryEx> locked = lockEntries(keys); |
| |
| try { |
| int size = locked.size(); |
| |
| Map<KeyCacheObject, CacheObject> putMap = null; |
| |
| Collection<KeyCacheObject> rmvKeys = null; |
| |
| List<CacheObject> writeVals = null; |
| |
| Map<K, EntryProcessorResult> invokeResMap = |
| op == TRANSFORM ? U.<K, EntryProcessorResult>newHashMap(size) : null; |
| |
| List<GridCacheEntryEx> filtered = new ArrayList<>(size); |
| |
| CachePartialUpdateCheckedException err = null; |
| |
| Iterator<?> valsIter = vals != null ? vals.iterator() : null; |
| |
| boolean intercept = ctx.config().getInterceptor() != null; |
| |
| for (int i = 0; i < size; i++) { |
| GridCacheEntryEx entry = locked.get(i); |
| |
| Object val = valsIter != null ? valsIter.next() : null; |
| |
| if (val == null && op != DELETE) |
| throw new NullPointerException("Null value."); |
| |
| try { |
| try { |
| if (!ctx.isAllLocked(entry, filter)) { |
| if (log.isDebugEnabled()) |
| log.debug("Entry did not pass the filter (will skip write) [entry=" + entry + |
| ", filter=" + Arrays.toString(filter) + ']'); |
| |
| continue; |
| } |
| } |
| catch (IgniteCheckedException e) { |
| if (err == null) |
| err = partialUpdateException(); |
| |
| err.add(F.asList(entry.key()), e); |
| |
| continue; |
| } |
| |
| if (op == TRANSFORM) { |
| ctx.kernalContext().resource().inject(val, |
| GridResourceIoc.AnnotationSet.ENTRY_PROCESSOR, |
| ctx.name()); |
| |
| EntryProcessor<Object, Object, Object> entryProcessor = |
| (EntryProcessor<Object, Object, Object>)val; |
| |
| CacheObject old = entry.innerGet( |
| null, |
| null, |
| /*read-through*/true, |
| /*update-metrics*/true, |
| /*event*/true, |
| entryProcessor, |
| taskName, |
| null, |
| keepBinary); |
| |
| Object oldVal = null; |
| |
| CacheInvokeEntry<Object, Object> invokeEntry = new CacheInvokeEntry<>(entry.key(), old, |
| entry.version(), keepBinary, entry); |
| |
| CacheObject updated; |
| Object updatedVal = null; |
| CacheInvokeResult invokeRes = null; |
| |
| boolean validation = false; |
| |
| IgniteThread.onEntryProcessorEntered(false); |
| |
| try { |
| Object computed = entryProcessor.process(invokeEntry, invokeArgs); |
| |
| updatedVal = ctx.unwrapTemporary(invokeEntry.getValue()); |
| |
| updated = ctx.toCacheObject(updatedVal); |
| |
| if (computed != null) |
| invokeRes = CacheInvokeResult.fromResult(ctx.unwrapTemporary(computed)); |
| |
| if (invokeEntry.modified() && updated != null) { |
| validation = true; |
| |
| ctx.validateKeyAndValue(entry.key(), updated); |
| } |
| else if (ctx.statisticsEnabled() && !invokeEntry.modified()) |
| ctx.cache().metrics0().onReadOnlyInvoke(old != null); |
| } |
| catch (Exception e) { |
| invokeRes = CacheInvokeResult.fromError(e); |
| |
| updated = old; |
| |
| if (validation) { |
| invokeResMap.put((K)entry.key().value(ctx.cacheObjectContext(), false), invokeRes); |
| |
| continue; |
| } |
| } |
| finally { |
| IgniteThread.onEntryProcessorLeft(); |
| } |
| |
| if (invokeRes != null) |
| invokeResMap.put((K)entry.key().value(ctx.cacheObjectContext(), false), invokeRes); |
| |
| if (updated == null) { |
| if (intercept) { |
| IgniteBiTuple<Boolean, ?> interceptorRes = ctx.config().getInterceptor() |
| .onBeforeRemove(new CacheLazyEntry(ctx, entry.key(), invokeEntry.key(), |
| old, oldVal, keepBinary)); |
| |
| if (ctx.cancelRemove(interceptorRes)) |
| continue; |
| } |
| |
| // Update previous batch. |
| if (putMap != null) { |
| err = updatePartialBatch( |
| filtered, |
| ver, |
| writeVals, |
| putMap, |
| null, |
| expiryPlc, |
| keepBinary, |
| err, |
| taskName, |
| true); |
| |
| putMap = null; |
| writeVals = null; |
| |
| filtered = new ArrayList<>(); |
| } |
| |
| // Start collecting new batch. |
| if (rmvKeys == null) |
| rmvKeys = new ArrayList<>(size); |
| |
| rmvKeys.add(entry.key()); |
| } |
| else { |
| if (intercept) { |
| Object interceptorVal = ctx.config().getInterceptor() |
| .onBeforePut(new CacheLazyEntry(ctx, entry.key(), invokeEntry.getKey(), |
| old, oldVal, keepBinary), updatedVal); |
| |
| if (interceptorVal == null) |
| continue; |
| |
| updated = ctx.toCacheObject(ctx.unwrapTemporary(interceptorVal)); |
| } |
| |
| // Update previous batch. |
| if (rmvKeys != null) { |
| err = updatePartialBatch( |
| filtered, |
| ver, |
| null, |
| null, |
| rmvKeys, |
| expiryPlc, |
| keepBinary, |
| err, |
| taskName, |
| true); |
| |
| rmvKeys = null; |
| |
| filtered = new ArrayList<>(); |
| } |
| |
| if (putMap == null) { |
| putMap = new LinkedHashMap<>(size, 1.0f); |
| writeVals = new ArrayList<>(size); |
| } |
| |
| putMap.put(entry.key(), updated); |
| writeVals.add(updated); |
| } |
| } |
| else if (op == UPDATE) { |
| CacheObject cacheVal = ctx.toCacheObject(val); |
| |
| if (intercept) { |
| CacheObject old = entry.innerGet( |
| null, |
| null, |
| /*read-through*/ctx.loadPreviousValue(), |
| /*update-metrics*/true, |
| /*event*/true, |
| null, |
| taskName, |
| null, |
| keepBinary); |
| |
| Object interceptorVal = ctx.config().getInterceptor().onBeforePut(new CacheLazyEntry( |
| ctx, entry.key(), old, keepBinary), val); |
| |
| if (interceptorVal == null) |
| continue; |
| |
| cacheVal = ctx.toCacheObject(ctx.unwrapTemporary(interceptorVal)); |
| } |
| |
| ctx.validateKeyAndValue(entry.key(), cacheVal); |
| |
| if (putMap == null) { |
| putMap = new LinkedHashMap<>(size, 1.0f); |
| writeVals = new ArrayList<>(size); |
| } |
| |
| putMap.put(entry.key(), cacheVal); |
| writeVals.add(cacheVal); |
| } |
| else { |
| assert op == DELETE; |
| |
| if (intercept) { |
| CacheObject old = entry.innerGet( |
| null, |
| null, |
| /*read-through*/ctx.loadPreviousValue(), |
| /*update-metrics*/true, |
| /*event*/true, |
| null, |
| taskName, |
| null, |
| keepBinary); |
| |
| IgniteBiTuple<Boolean, ?> interceptorRes = ctx.config().getInterceptor() |
| .onBeforeRemove(new CacheLazyEntry(ctx, entry.key(), old, keepBinary)); |
| |
| if (ctx.cancelRemove(interceptorRes)) |
| continue; |
| } |
| |
| if (rmvKeys == null) |
| rmvKeys = new ArrayList<>(size); |
| |
| rmvKeys.add(entry.key()); |
| } |
| |
| filtered.add(entry); |
| } |
| catch (IgniteCheckedException e) { |
| if (err == null) |
| err = partialUpdateException(); |
| |
| err.add(F.asList(entry.key()), e); |
| } |
| catch (GridCacheEntryRemovedException ignore) { |
| assert false : "Entry cannot become obsolete while holding lock."; |
| } |
| } |
| |
| // Store final batch. |
| if (putMap != null || rmvKeys != null) { |
| err = updatePartialBatch( |
| filtered, |
| ver, |
| writeVals, |
| putMap, |
| rmvKeys, |
| expiryPlc, |
| keepBinary, |
| err, |
| taskName, |
| op == TRANSFORM); |
| } |
| else |
| assert filtered.isEmpty(); |
| |
| if (err != null) |
| throw err; |
| |
| return invokeResMap; |
| } |
| finally { |
| unlockEntries(locked); |
| } |
| } |
| |
| /** |
| * @param entries Entries to update. |
| * @param ver Cache version. |
| * @param writeVals Cache values. |
| * @param putMap Values to put. |
| * @param rmvKeys Keys to remove. |
| * @param expiryPlc Expiry policy. |
| * @param err Optional partial update exception. |
| * @param taskName Task name. |
| * @param transformed {@code True} if transform operation performed. |
| * @return Partial update exception. |
| */ |
| @SuppressWarnings({"unchecked", "ConstantConditions"}) |
| @Nullable private CachePartialUpdateCheckedException updatePartialBatch( |
| List<GridCacheEntryEx> entries, |
| final GridCacheVersion ver, |
| @Nullable List<CacheObject> writeVals, |
| @Nullable Map<KeyCacheObject, CacheObject> putMap, |
| @Nullable Collection<KeyCacheObject> rmvKeys, |
| @Nullable ExpiryPolicy expiryPlc, |
| boolean keepBinary, |
| @Nullable CachePartialUpdateCheckedException err, |
| String taskName, |
| boolean transformed) { |
| assert putMap == null ^ rmvKeys == null; |
| GridCacheOperation op; |
| |
| CacheStorePartialUpdateException storeErr = null; |
| |
| try { |
| if (putMap != null) { |
| try { |
| Map<? extends KeyCacheObject, IgniteBiTuple<? extends CacheObject, GridCacheVersion>> view = F.viewReadOnly(putMap, |
| new C1<CacheObject, IgniteBiTuple<? extends CacheObject, GridCacheVersion>>() { |
| @Override public IgniteBiTuple<? extends CacheObject, GridCacheVersion> apply(CacheObject val) { |
| return F.t(val, ver); |
| } |
| }); |
| |
| ctx.store().putAll(null, view); |
| } |
| catch (CacheStorePartialUpdateException e) { |
| storeErr = e; |
| } |
| |
| op = UPDATE; |
| } |
| else { |
| try { |
| ctx.store().removeAll(null, rmvKeys); |
| } |
| catch (CacheStorePartialUpdateException e) { |
| storeErr = e; |
| } |
| |
| op = DELETE; |
| } |
| } |
| catch (IgniteCheckedException e) { |
| if (err == null) |
| err = partialUpdateException(); |
| |
| err.add(putMap != null ? putMap.keySet() : rmvKeys, e); |
| |
| return err; |
| } |
| |
| boolean intercept = ctx.config().getInterceptor() != null; |
| |
| for (int i = 0; i < entries.size(); i++) { |
| GridCacheEntryEx entry = entries.get(i); |
| |
| assert entry.lockedByCurrentThread(); |
| |
| if (entry.obsolete() || |
| (storeErr != null && storeErr.failedKeys().contains(entry.key().value(ctx.cacheObjectContext(), false)))) |
| continue; |
| |
| try { |
| // We are holding java-level locks on entries at this point. |
| CacheObject writeVal = op == UPDATE ? writeVals.get(i) : null; |
| |
| assert writeVal != null || op == DELETE : "null write value found."; |
| |
| GridTuple3<Boolean, Object, EntryProcessorResult<Object>> t = entry.innerUpdateLocal( |
| ver, |
| op, |
| writeVal, |
| null, |
| false, |
| false, |
| false, |
| keepBinary, |
| expiryPlc, |
| true, |
| true, |
| null, |
| false, |
| taskName, |
| transformed); |
| |
| if (intercept) { |
| if (op == UPDATE) |
| ctx.config().getInterceptor().onAfterPut(new CacheLazyEntry(ctx, entry.key(), writeVal, keepBinary)); |
| else |
| ctx.config().getInterceptor().onAfterRemove(new CacheLazyEntry(ctx, entry.key(), t.get2(), keepBinary)); |
| } |
| } |
| catch (GridCacheEntryRemovedException ignore) { |
| assert false : "Entry cannot become obsolete while holding lock."; |
| } |
| catch (IgniteCheckedException e) { |
| if (err == null) |
| err = partialUpdateException(); |
| |
| err.add(Collections.singleton(entry.key()), e); |
| } |
| } |
| |
| return err; |
| } |
| |
| /** |
| * Acquires java-level locks on cache entries. |
| * |
| * @param keys Keys to lock. |
| * @return Collection of locked entries. |
| */ |
| private List<GridCacheEntryEx> lockEntries(Collection<? extends K> keys) { |
| GridCacheEntryEx[] locked = new GridCacheEntryEx[keys.size()]; |
| |
| boolean nullKeys = false; |
| |
| while (true) { |
| int i = 0; |
| |
| for (K key : keys) { |
| if (key == null) { |
| nullKeys = true; |
| |
| break; |
| } |
| |
| GridCacheEntryEx entry = entryEx(ctx.toCacheKeyObject(key)); |
| |
| locked[i++] = entry; |
| } |
| |
| if (nullKeys) |
| break; |
| |
| if (lockedEntriesInfo.tryLockEntries(locked)) |
| return Arrays.asList(locked); |
| } |
| |
| assert nullKeys; |
| |
| AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); |
| |
| for (GridCacheEntryEx entry : locked) { |
| if (entry != null) |
| entry.touch(); |
| } |
| |
| throw new NullPointerException("Null key."); |
| } |
| |
| /** |
| * Releases java-level locks on cache entries. |
| * |
| * @param locked Locked entries. |
| */ |
| private void unlockEntries(Iterable<GridCacheEntryEx> locked) { |
| for (GridCacheEntryEx entry : locked) |
| entry.unlockEntry(); |
| |
| AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); |
| |
| for (GridCacheEntryEx entry : locked) |
| entry.touch(); |
| } |
| |
| /** |
| * @return New partial update exception. |
| */ |
| private static CachePartialUpdateCheckedException partialUpdateException() { |
| return new CachePartialUpdateCheckedException("Failed to update keys (retry update if possible)."); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<Boolean> txLockAsync(Collection<KeyCacheObject> keys, |
| long timeout, |
| IgniteTxLocalEx tx, |
| boolean isRead, |
| boolean retval, |
| TransactionIsolation isolation, |
| boolean invalidate, |
| long createTtl, |
| long accessTtl) { |
| return new GridFinishedFuture<>(new UnsupportedOperationException("Locks are not supported for " + |
| "CacheAtomicityMode.ATOMIC mode (use CacheAtomicityMode.TRANSACTIONAL instead)")); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<Boolean> lockAllAsync(@Nullable Collection<? extends K> keys, |
| long timeout) { |
| return new GridFinishedFuture<>(new UnsupportedOperationException("Locks are not supported for " + |
| "CacheAtomicityMode.ATOMIC mode (use CacheAtomicityMode.TRANSACTIONAL instead)")); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void unlockAll(@Nullable Collection<? extends K> keys) throws IgniteCheckedException { |
| throw new UnsupportedOperationException("Locks are not supported for " + |
| "CacheAtomicityMode.ATOMIC mode (use CacheAtomicityMode.TRANSACTIONAL instead)"); |
| } |
| |
| /** |
| * @return Expiry policy. |
| */ |
| @Nullable private ExpiryPolicy expiryPerCall() { |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| ExpiryPolicy expiry = opCtx != null ? opCtx.expiry() : null; |
| |
| if (expiry == null) |
| expiry = ctx.expiry(); |
| |
| return expiry; |
| } |
| |
| /** |
| * @param op Operation closure. |
| * @return Future. |
| */ |
| @SuppressWarnings("unchecked") |
| private IgniteInternalFuture asyncOp(final Callable<?> op) { |
| IgniteInternalFuture fail = asyncOpAcquire(/*retry*/false); |
| |
| if (fail != null) |
| return fail; |
| |
| IgniteInternalFuture f = ctx.closures().callLocalSafe(op); |
| |
| f.listen(new CI1<IgniteInternalFuture<?>>() { |
| @Override public void apply(IgniteInternalFuture<?> f) { |
| asyncOpRelease(false); |
| } |
| }); |
| |
| return f; |
| } |
| |
    /** {@inheritDoc} */
    @Override public void onDeferredDelete(GridCacheEntryEx entry, GridCacheVersion ver) {
        // NOTE(review): callers are never expected to reach this for this cache implementation;
        // enforced by assertion only, so this is a silent no-op when assertions are disabled.
        assert false : "Should not be called";
    }
| } |