| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache; |
| |
| import java.io.Externalizable; |
| import java.io.IOException; |
| import java.io.InvalidObjectException; |
| import java.io.ObjectInput; |
| import java.io.ObjectOutput; |
| import java.io.ObjectStreamException; |
| import java.util.AbstractSet; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.NoSuchElementException; |
| import java.util.Set; |
| import java.util.SortedMap; |
| import java.util.SortedSet; |
| import java.util.UUID; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Semaphore; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.locks.ReentrantLock; |
| import java.util.function.Function; |
| import java.util.function.Supplier; |
| import javax.cache.Cache; |
| import javax.cache.expiry.ExpiryPolicy; |
| import javax.cache.processor.EntryProcessor; |
| import javax.cache.processor.EntryProcessorException; |
| import javax.cache.processor.EntryProcessorResult; |
| import javax.cache.processor.MutableEntry; |
| import org.apache.ignite.Ignite; |
| import org.apache.ignite.IgniteCache; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.IgniteSystemProperties; |
| import org.apache.ignite.cache.CacheEntry; |
| import org.apache.ignite.cache.CacheEntryProcessor; |
| import org.apache.ignite.cache.CacheInterceptor; |
| import org.apache.ignite.cache.CacheMetrics; |
| import org.apache.ignite.cache.CachePeekMode; |
| import org.apache.ignite.cache.ReadRepairStrategy; |
| import org.apache.ignite.cache.affinity.Affinity; |
| import org.apache.ignite.cluster.ClusterGroup; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.cluster.ClusterTopologyException; |
| import org.apache.ignite.compute.ComputeJob; |
| import org.apache.ignite.compute.ComputeJobAdapter; |
| import org.apache.ignite.compute.ComputeJobContext; |
| import org.apache.ignite.compute.ComputeJobResult; |
| import org.apache.ignite.compute.ComputeJobResultPolicy; |
| import org.apache.ignite.compute.ComputeTaskAdapter; |
| import org.apache.ignite.configuration.CacheConfiguration; |
| import org.apache.ignite.configuration.IgniteConfiguration; |
| import org.apache.ignite.configuration.TransactionConfiguration; |
| import org.apache.ignite.internal.ComputeTaskInternalFuture; |
| import org.apache.ignite.internal.IgniteEx; |
| import org.apache.ignite.internal.IgniteFeatures; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.IgniteInterruptedCheckedException; |
| import org.apache.ignite.internal.IgniteKernal; |
| import org.apache.ignite.internal.IgniteTransactionsEx; |
| import org.apache.ignite.internal.IgnitionEx; |
| import org.apache.ignite.internal.NodeStoppingException; |
| import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; |
| import org.apache.ignite.internal.cluster.ClusterTopologyServerNotFoundException; |
| import org.apache.ignite.internal.cluster.IgniteClusterEx; |
| import org.apache.ignite.internal.managers.discovery.IgniteClusterNode; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityImpl; |
| import org.apache.ignite.internal.processors.cache.distributed.IgniteExternalizableExpiryPolicy; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtCacheAdapter; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtTopologyFuture; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal; |
| import org.apache.ignite.internal.processors.cache.distributed.near.consistency.GridCompoundReadRepairFuture; |
| import org.apache.ignite.internal.processors.cache.distributed.near.consistency.IgniteAtomicConsistencyViolationException; |
| import org.apache.ignite.internal.processors.cache.distributed.near.consistency.IgniteConsistencyViolationException; |
| import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo; |
| import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; |
| import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils; |
| import org.apache.ignite.internal.processors.cache.persistence.CacheDataRow; |
| import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx; |
| import org.apache.ignite.internal.processors.cache.transactions.IgniteTxKey; |
| import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalAdapter; |
| import org.apache.ignite.internal.processors.cache.transactions.IgniteTxLocalEx; |
| import org.apache.ignite.internal.processors.cache.version.GridCacheRawVersionedEntry; |
| import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; |
| import org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry; |
| import org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl; |
| import org.apache.ignite.internal.processors.dr.IgniteDrDataStreamerCacheUpdater; |
| import org.apache.ignite.internal.processors.performancestatistics.OperationType; |
| import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter; |
| import org.apache.ignite.internal.processors.task.GridInternal; |
| import org.apache.ignite.internal.transactions.IgniteTxHeuristicCheckedException; |
| import org.apache.ignite.internal.transactions.IgniteTxRollbackCheckedException; |
| import org.apache.ignite.internal.transactions.IgniteTxTimeoutCheckedException; |
| import org.apache.ignite.internal.transactions.TransactionCheckedException; |
| import org.apache.ignite.internal.util.GridSerializableMap; |
| import org.apache.ignite.internal.util.future.GridEmbeddedFuture; |
| import org.apache.ignite.internal.util.future.GridFinishedFuture; |
| import org.apache.ignite.internal.util.future.GridFutureAdapter; |
| import org.apache.ignite.internal.util.lang.GridCloseableIterator; |
| import org.apache.ignite.internal.util.lang.GridClosureException; |
| import org.apache.ignite.internal.util.lang.GridPlainCallable; |
| import org.apache.ignite.internal.util.lang.GridPlainRunnable; |
| import org.apache.ignite.internal.util.tostring.GridToStringExclude; |
| import org.apache.ignite.internal.util.typedef.C1; |
| import org.apache.ignite.internal.util.typedef.C2; |
| import org.apache.ignite.internal.util.typedef.CI1; |
| import org.apache.ignite.internal.util.typedef.CI2; |
| import org.apache.ignite.internal.util.typedef.CIX2; |
| import org.apache.ignite.internal.util.typedef.CIX3; |
| import org.apache.ignite.internal.util.typedef.CX1; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.T2; |
| import org.apache.ignite.internal.util.typedef.X; |
| import org.apache.ignite.internal.util.typedef.internal.A; |
| import org.apache.ignite.internal.util.typedef.internal.CU; |
| import org.apache.ignite.internal.util.typedef.internal.GPC; |
| import org.apache.ignite.internal.util.typedef.internal.LT; |
| import org.apache.ignite.internal.util.typedef.internal.S; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteBiPredicate; |
| import org.apache.ignite.lang.IgniteBiTuple; |
| import org.apache.ignite.lang.IgniteCallable; |
| import org.apache.ignite.lang.IgniteClosure; |
| import org.apache.ignite.lang.IgniteInClosure; |
| import org.apache.ignite.lang.IgniteOutClosure; |
| import org.apache.ignite.lang.IgnitePredicate; |
| import org.apache.ignite.lang.IgniteProductVersion; |
| import org.apache.ignite.lang.IgniteRunnable; |
| import org.apache.ignite.mxbean.CacheMetricsMXBean; |
| import org.apache.ignite.plugin.security.SecurityPermission; |
| import org.apache.ignite.resources.IgniteInstanceResource; |
| import org.apache.ignite.resources.JobContextResource; |
| import org.apache.ignite.resources.LoggerResource; |
| import org.apache.ignite.thread.IgniteThreadFactory; |
| import org.apache.ignite.transactions.Transaction; |
| import org.apache.ignite.transactions.TransactionConcurrency; |
| import org.apache.ignite.transactions.TransactionIsolation; |
| import org.jetbrains.annotations.NotNull; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static org.apache.ignite.IgniteSystemProperties.IGNITE_CACHE_RETRIES_COUNT; |
| import static org.apache.ignite.internal.GridClosureCallMode.BROADCAST; |
| import static org.apache.ignite.internal.processors.cache.CacheOperationContext.DFLT_ALLOW_ATOMIC_OPS_IN_TX; |
| import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING; |
| import static org.apache.ignite.internal.processors.dr.GridDrType.DR_LOAD; |
| import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE; |
| import static org.apache.ignite.internal.processors.metric.impl.MetricUtils.cacheMetricsRegistryName; |
| import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_NO_FAILOVER; |
| import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.TC_SUBGRID; |
| import static org.apache.ignite.transactions.TransactionConcurrency.OPTIMISTIC; |
| import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; |
| import static org.apache.ignite.transactions.TransactionIsolation.READ_COMMITTED; |
| import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; |
| import static org.apache.ignite.transactions.TransactionIsolation.SERIALIZABLE; |
| |
| /** |
| * Adapter for different cache implementations. |
| */ |
| @SuppressWarnings("unchecked") |
| public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V>, Externalizable { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** clearLocally() split threshold. */ |
| public static final int CLEAR_ALL_SPLIT_THRESHOLD = 10000; |
| |
| /** @see IgniteSystemProperties#IGNITE_CACHE_START_SIZE */ |
| public static final int DFLT_CACHE_START_SIZE = 4096; |
| |
| /** Default cache start size. */ |
| public static final int DFLT_START_CACHE_SIZE = IgniteSystemProperties.getInteger( |
| IgniteSystemProperties.IGNITE_CACHE_START_SIZE, DFLT_CACHE_START_SIZE); |
| |
| /** Size of keys batch to removeAll. */ |
| private static final int REMOVE_ALL_KEYS_BATCH = 10000; |
| |
| /** @see IgniteSystemProperties#IGNITE_CACHE_RETRIES_COUNT */ |
| public static final int DFLT_CACHE_RETRIES_COUNT = 100; |
| |
| /** Maximum number of retries when topology changes. */ |
| public static final int MAX_RETRIES = |
| IgniteSystemProperties.getInteger(IGNITE_CACHE_RETRIES_COUNT, DFLT_CACHE_RETRIES_COUNT); |
| |
| /** Minimum version supporting partition preloading. */ |
| private static final IgniteProductVersion PRELOAD_PARTITION_SINCE = IgniteProductVersion.fromString("2.7.0"); |
| |
| /** Deserialization stash. */ |
| private static final ThreadLocal<IgniteBiTuple<String, String>> stash = new ThreadLocal<IgniteBiTuple<String, |
| String>>() { |
| @Override protected IgniteBiTuple<String, String> initialValue() { |
| return new IgniteBiTuple<>(); |
| } |
| }; |
| |
| /** {@link GridCacheReturn}-to-value conversion. */ |
| private static final IgniteClosure RET2VAL = |
| new CX1<IgniteInternalFuture<GridCacheReturn>, Object>() { |
| @Nullable @Override public Object applyx(IgniteInternalFuture<GridCacheReturn> fut) |
| throws IgniteCheckedException { |
| return fut.get().value(); |
| } |
| |
| @Override public String toString() { |
| return "Cache return value to value converter."; |
| } |
| }; |
| |
| /** {@link GridCacheReturn}-to-null conversion. */ |
| protected static final IgniteClosure RET2NULL = |
| new CX1<IgniteInternalFuture<GridCacheReturn>, Object>() { |
| @Nullable @Override public Object applyx(IgniteInternalFuture<GridCacheReturn> fut) |
| throws IgniteCheckedException { |
| fut.get(); |
| |
| return null; |
| } |
| |
| @Override public String toString() { |
| return "Cache return value to null converter."; |
| } |
| }; |
| |
| /** {@link GridCacheReturn}-to-success conversion. */ |
| private static final IgniteClosure RET2FLAG = |
| new CX1<IgniteInternalFuture<GridCacheReturn>, Boolean>() { |
| @Override public Boolean applyx(IgniteInternalFuture<GridCacheReturn> fut) throws IgniteCheckedException { |
| return fut.get().success(); |
| } |
| |
| @Override public String toString() { |
| return "Cache return value to boolean flag converter."; |
| } |
| }; |
| |
| /** Last asynchronous future. */ |
| protected ThreadLocal<FutureHolder> lastFut = new ThreadLocal<FutureHolder>() { |
| @Override protected FutureHolder initialValue() { |
| return new FutureHolder(); |
| } |
| }; |
| |
| /** Cache configuration. */ |
| @GridToStringExclude |
| protected GridCacheContext<K, V> ctx; |
| |
| /** Local map. */ |
| @GridToStringExclude |
| protected GridCacheConcurrentMap map; |
| |
| /** Local node ID. */ |
| @GridToStringExclude |
| protected UUID locNodeId; |
| |
| /** Cache configuration. */ |
| @GridToStringExclude |
| protected CacheConfiguration cacheCfg; |
| |
| /** Grid configuration. */ |
| @GridToStringExclude |
| private IgniteConfiguration gridCfg; |
| |
| /** Cache metrics. */ |
| protected CacheMetricsImpl metrics; |
| |
| /** Cache localMxBean. */ |
| private CacheMetricsMXBean locMxBean; |
| |
| /** Cache mxBean. */ |
| private CacheMetricsMXBean clusterMxBean; |
| |
| /** Logger. */ |
| protected IgniteLogger log; |
| |
| /** Logger. */ |
| protected IgniteLogger txLockMsgLog; |
| |
| /** Affinity impl. */ |
| private Affinity<K> aff; |
| |
| /** Asynchronous operations limit semaphore. */ |
| private Semaphore asyncOpsSem; |
| |
| /** {@code True} if attempted to use partition preloading on outdated node. */ |
| private volatile boolean partPreloadBadVerWarned; |
| |
| /** Active. */ |
| private volatile boolean active; |
| |
| /** {@inheritDoc} */ |
| @Override public String name() { |
| return cacheCfg.getName(); |
| } |
| |
| /** |
| * Empty constructor required by {@link Externalizable}. |
| */ |
| protected GridCacheAdapter() { |
| // No-op. |
| } |
| |
| /** |
| * @param ctx Cache context. |
| */ |
| protected GridCacheAdapter(GridCacheContext<K, V> ctx) { |
| this(ctx, null); |
| } |
| |
| /** |
| * @param ctx Cache context. |
| * @param map Concurrent map. |
| */ |
| @SuppressWarnings({"OverriddenMethodCallDuringObjectConstruction", "deprecation"}) |
| protected GridCacheAdapter(final GridCacheContext<K, V> ctx, @Nullable GridCacheConcurrentMap map) { |
| assert ctx != null; |
| |
| this.ctx = ctx; |
| |
| gridCfg = ctx.gridConfig(); |
| cacheCfg = ctx.config(); |
| |
| locNodeId = ctx.gridConfig().getNodeId(); |
| |
| this.map = map; |
| |
| log = ctx.logger(getClass()); |
| txLockMsgLog = ctx.shared().txLockMessageLogger(); |
| |
| metrics = new CacheMetricsImpl(ctx, isNear()); |
| |
| locMxBean = new CacheLocalMetricsMXBeanImpl(this); |
| clusterMxBean = new CacheClusterMetricsMXBeanImpl(this); |
| |
| if (ctx.config().getMaxConcurrentAsyncOperations() > 0) |
| asyncOpsSem = new Semaphore(ctx.config().getMaxConcurrentAsyncOperations()); |
| |
| init(); |
| |
| aff = new GridCacheAffinityImpl<>(ctx); |
| } |
| |
| /** |
| * Prints memory stats. |
| */ |
| public void printMemoryStats() { |
| if (ctx.isNear()) { |
| X.println(">>> Near cache size: " + size()); |
| |
| ctx.near().dht().printMemoryStats(); |
| } |
| else if (ctx.isDht()) |
| X.println(">>> DHT cache size: " + size()); |
| else |
| X.println(">>> Cache size: " + size()); |
| } |
| |
| /** |
| * @return Base map. |
| */ |
| public GridCacheConcurrentMap map() { |
| return map; |
| } |
| |
| /** |
| * Increments map public size. |
| * |
| * @param e Map entry. |
| */ |
| public void incrementSize(GridCacheMapEntry e) { |
| map.incrementPublicSize(null, e); |
| } |
| |
| /** |
| * Decrements map public size. |
| * |
| * @param e Map entry. |
| */ |
| public void decrementSize(GridCacheMapEntry e) { |
| map.decrementPublicSize(null, e); |
| } |
| |
| /** |
| * @return Context. |
| */ |
| @Override public GridCacheContext<K, V> context() { |
| return ctx; |
| } |
| |
| /** |
| * @return Logger. |
| */ |
| protected IgniteLogger log() { |
| return log; |
| } |
| |
| /** |
| * @return {@code True} if this is near cache. |
| */ |
| public boolean isNear() { |
| return false; |
| } |
| |
| /** |
| * @return {@code True} if cache is local. |
| */ |
| public boolean isLocal() { |
| return false; |
| } |
| |
| /** |
| * @return {@code True} if cache is colocated. |
| */ |
| public boolean isColocated() { |
| return false; |
| } |
| |
| /** |
| * @return {@code True} if cache is DHT Atomic. |
| */ |
| public boolean isDhtAtomic() { |
| return false; |
| } |
| |
| /** |
| * @return {@code True} if cache is DHT. |
| */ |
| public boolean isDht() { |
| return false; |
| } |
| |
| /** |
| * |
| */ |
| public boolean active() { |
| return active; |
| } |
| |
| /** |
| * @param active Active. |
| */ |
| public void active(boolean active) { |
| this.active = active; |
| } |
| |
| /** |
| * @return Preloader. |
| */ |
| public abstract GridCachePreloader preloader(); |
| |
| /** {@inheritDoc} */ |
| @Override public final Affinity<K> affinity() { |
| return aff; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public final <K1, V1> IgniteInternalCache<K1, V1> cache() { |
| return (IgniteInternalCache<K1, V1>)this; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public final boolean skipStore() { |
| return false; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public final GridCacheProxyImpl<K, V> setSkipStore(boolean skipStore) { |
| CacheOperationContext opCtx = new CacheOperationContext( |
| true, |
| false, |
| null, |
| false, |
| null, |
| false, |
| null, |
| DFLT_ALLOW_ATOMIC_OPS_IN_TX); |
| |
| return new GridCacheProxyImpl<>(ctx, this, opCtx); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public final <K1, V1> GridCacheProxyImpl<K1, V1> keepBinary() { |
| CacheOperationContext opCtx = new CacheOperationContext( |
| false, |
| true, |
| null, |
| false, |
| null, |
| false, |
| null, |
| DFLT_ALLOW_ATOMIC_OPS_IN_TX); |
| |
| return new GridCacheProxyImpl<>((GridCacheContext<K1, V1>)ctx, (GridCacheAdapter<K1, V1>)this, opCtx); |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public final ExpiryPolicy expiry() { |
| return null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public final GridCacheProxyImpl<K, V> withExpiryPolicy(ExpiryPolicy plc) { |
| assert !CU.isUtilityCache(ctx.name()); |
| |
| CacheOperationContext opCtx = new CacheOperationContext( |
| false, |
| false, |
| plc, |
| false, |
| null, |
| false, |
| null, |
| DFLT_ALLOW_ATOMIC_OPS_IN_TX); |
| |
| return new GridCacheProxyImpl<>(ctx, this, opCtx); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public final IgniteInternalCache<K, V> withNoRetries() { |
| CacheOperationContext opCtx = new CacheOperationContext( |
| false, |
| false, |
| null, |
| true, |
| null, |
| false, |
| null, |
| DFLT_ALLOW_ATOMIC_OPS_IN_TX); |
| |
| return new GridCacheProxyImpl<>(ctx, this, opCtx); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public final IgniteInternalCache<K, V> withAllowAtomicOpsInTx() { |
| CacheOperationContext opCtx = new CacheOperationContext( |
| false, |
| false, |
| null, |
| false, |
| null, |
| false, |
| null, |
| DFLT_ALLOW_ATOMIC_OPS_IN_TX); |
| |
| return new GridCacheProxyImpl<>(ctx, this, opCtx); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public final CacheConfiguration configuration() { |
| return ctx.config(); |
| } |
| |
| /** |
| * @param keys Keys to lock. |
| * @param timeout Lock timeout. |
| * @param tx Transaction. |
| * @param isRead {@code True} for read operations. |
| * @param retval Flag to return value. |
| * @param isolation Transaction isolation. |
| * @param invalidate Invalidate flag. |
| * @param createTtl TTL for create operation. |
| * @param accessTtl TTL for read operation. |
| * @return Locks future. |
| */ |
| public abstract IgniteInternalFuture<Boolean> txLockAsync( |
| Collection<KeyCacheObject> keys, |
| long timeout, |
| IgniteTxLocalEx tx, |
| boolean isRead, |
| boolean retval, |
| TransactionIsolation isolation, |
| boolean invalidate, |
| long createTtl, |
| long accessTtl); |
| |
| /** |
| * Post constructor initialization for subclasses. |
| */ |
| protected void init() { |
| // No-op. |
| } |
| |
| /** |
| * Starts this cache. Child classes should override this method |
| * to provide custom start-up behavior. |
| * |
| * @throws IgniteCheckedException If start failed. |
| */ |
| public abstract void start() throws IgniteCheckedException; |
| |
| /** |
| * Startup info. |
| * |
| * @return Startup info. |
| */ |
| protected final String startInfo() { |
| return "Cache started: " + U.maskName(ctx.config().getName()); |
| } |
| |
| /** |
| * Stops this cache. Child classes should override this method |
| * to provide custom stop behavior. |
| */ |
| public void stop() { |
| // Nulling thread local reference to ensure values will be eventually GCed |
| // no matter what references these futures are holding. |
| lastFut = null; |
| } |
| |
| /** |
| * Remove cache metrics. |
| * |
| * @param destroy Group destroy flag. |
| */ |
| public void removeMetrics(boolean destroy) { |
| if (!ctx.kernalContext().isStopping()) |
| ctx.kernalContext().metric().remove(cacheMetricsRegistryName(ctx.name(), isNear()), destroy); |
| } |
| |
| /** |
| * Stop info. |
| * |
| * @return Stop info. |
| */ |
| protected final String stopInfo() { |
| return "Cache stopped: " + U.maskName(ctx.config().getName()); |
| } |
| |
| /** |
| * Kernal start callback. |
| * |
| * @throws IgniteCheckedException If callback failed. |
| */ |
| protected void onKernalStart() throws IgniteCheckedException { |
| // No-op. |
| } |
| |
| /** |
| * Kernal stop callback. |
| */ |
| public void onKernalStop() { |
| // No-op. |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public final boolean isEmpty() { |
| try { |
| return localSize(null) == 0; |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public final boolean containsKey(K key) { |
| try { |
| return containsKeyAsync(key).get(); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public final IgniteInternalFuture<Boolean> containsKeyAsync(K key) { |
| A.notNull(key, "key"); |
| |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| IgniteInternalFuture fut = getAsync( |
| key, |
| /*force primary*/ !ctx.config().isReadFromBackup(), |
| /*skip tx*/false, |
| /*task name*/null, |
| /*deserialize binary*/false, |
| /*skip values*/true, |
| false); |
| |
| ReadRepairStrategy readRepairStrategy = opCtx != null ? opCtx.readRepairStrategy() : null; |
| |
| if (readRepairStrategy != null) |
| return getWithRepairAsync( |
| fut, |
| (e) -> repairAsync(e, opCtx, true), |
| () -> containsKeyAsync(key)); |
| |
| return fut; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public final boolean containsKeys(Collection<? extends K> keys) { |
| try { |
| return containsKeysAsync(keys).get(); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public final IgniteInternalFuture<Boolean> containsKeysAsync(final Collection<? extends K> keys) { |
| A.notNull(keys, "keys"); |
| |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| return repairableGetAllAsync( |
| keys, |
| /*force primary*/ !ctx.config().isReadFromBackup(), |
| /*skip tx*/false, |
| /*task name*/null, |
| /*deserialize binary*/false, |
| opCtx != null && opCtx.recovery(), |
| opCtx != null ? opCtx.readRepairStrategy() : null, |
| /*skip values*/true, |
| /*need ver*/false).chain(new CX1<IgniteInternalFuture<Map<K, V>>, Boolean>() { |
| @Override public Boolean applyx(IgniteInternalFuture<Map<K, V>> fut) throws IgniteCheckedException { |
| Map<K, V> kvMap = fut.get(); |
| |
| if (keys.size() != kvMap.size()) |
| return false; |
| |
| for (Map.Entry<K, V> entry : kvMap.entrySet()) { |
| if (entry.getValue() == null) |
| return false; |
| } |
| |
| return true; |
| } |
| }); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public final Iterable<Cache.Entry<K, V>> localEntries( |
| CachePeekMode[] peekModes) throws IgniteCheckedException { |
| assert peekModes != null; |
| |
| ctx.checkSecurity(SecurityPermission.CACHE_READ); |
| |
| PeekModes modes = parsePeekModes(peekModes, false); |
| |
| Collection<Iterator<Cache.Entry<K, V>>> its = new ArrayList<>(); |
| |
| final boolean keepBinary = ctx.keepBinary(); |
| |
| if (ctx.isLocal()) { |
| modes.primary = true; |
| modes.backup = true; |
| } |
| |
| if (modes.offheap) { |
| if (modes.heap && modes.near && ctx.isNear()) |
| its.add(ctx.near().nearEntries().iterator()); |
| |
| if (modes.primary || modes.backup) { |
| AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); |
| |
| IgniteCacheOffheapManager offheapMgr = ctx.isNear() ? ctx.near().dht().context().offheap() : ctx.offheap(); |
| |
| MvccSnapshot mvccSnapshot = ctx.mvccEnabled() ? MvccUtils.MVCC_MAX_SNAPSHOT : null; |
| |
| its.add(offheapMgr.cacheEntriesIterator(ctx, modes.primary, modes.backup, topVer, ctx.keepBinary(), |
| mvccSnapshot, null)); |
| } |
| } |
| else if (modes.heap) { |
| if (ctx.mvccEnabled()) |
| return F.emptyIterator(); |
| |
| if (modes.near && ctx.isNear()) |
| its.add(ctx.near().nearEntries().iterator()); |
| |
| if (modes.primary || modes.backup) { |
| GridDhtCacheAdapter<K, V> cache = ctx.isNear() ? ctx.near().dht() : ctx.dht(); |
| |
| its.add(cache.localEntriesIterator(modes.primary, modes.backup, keepBinary)); |
| } |
| } |
| |
| final Iterator<Cache.Entry<K, V>> it = F.flatIterators(its); |
| |
| return new Iterable<Cache.Entry<K, V>>() { |
| @Override public Iterator<Cache.Entry<K, V>> iterator() { |
| return it; |
| } |
| |
| @Override public String toString() { |
| return "CacheLocalEntries []"; |
| } |
| }; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public final V localPeek(K key, |
| CachePeekMode[] peekModes) |
| throws IgniteCheckedException { |
| A.notNull(key, "key"); |
| |
| ctx.checkSecurity(SecurityPermission.CACHE_READ); |
| |
| PeekModes modes = parsePeekModes(peekModes, false); |
| |
| KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); |
| |
| CacheObject cacheVal = null; |
| |
| if (!ctx.isLocal()) { |
| AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); |
| |
| int part = ctx.affinity().partition(cacheKey); |
| |
| boolean nearKey; |
| |
| if (!(modes.near && modes.primary && modes.backup)) { |
| boolean keyPrimary = ctx.affinity().primaryByPartition(ctx.localNode(), part, topVer); |
| |
| if (keyPrimary) { |
| if (!modes.primary) |
| return null; |
| |
| nearKey = false; |
| } |
| else { |
| boolean keyBackup = ctx.affinity().partitionBelongs(ctx.localNode(), part, topVer); |
| |
| if (keyBackup) { |
| if (!modes.backup) |
| return null; |
| |
| nearKey = false; |
| } |
| else { |
| if (!modes.near) |
| return null; |
| |
| nearKey = true; |
| |
| // Swap and offheap are disabled for near cache. |
| modes.offheap = false; |
| } |
| } |
| } |
| else { |
| nearKey = !ctx.affinity().partitionBelongs(ctx.localNode(), part, topVer); |
| |
| if (nearKey) { |
| // Swap and offheap are disabled for near cache. |
| modes.offheap = false; |
| } |
| } |
| |
| if (nearKey && !ctx.isNear()) |
| return null; |
| |
| GridCacheEntryEx e; |
| GridCacheContext ctx0; |
| |
| while (true) { |
| if (nearKey) |
| e = peekEx(key); |
| else { |
| ctx0 = ctx.isNear() ? ctx.near().dht().context() : ctx; |
| e = modes.offheap ? ctx0.cache().entryEx(key) : ctx0.cache().peekEx(key); |
| } |
| |
| if (e != null) { |
| ctx.shared().database().checkpointReadLock(); |
| |
| try { |
| cacheVal = ctx.mvccEnabled() |
| ? e.mvccPeek(modes.heap && !modes.offheap) |
| : e.peek(modes.heap, modes.offheap, topVer, null); |
| } |
| catch (GridCacheEntryRemovedException ignore) { |
| if (log.isDebugEnabled()) |
| log.debug("Got removed entry during 'peek': " + key); |
| |
| continue; |
| } |
| finally { |
| e.touch(); |
| |
| ctx.shared().database().checkpointReadUnlock(); |
| } |
| } |
| |
| break; |
| } |
| } |
| else { |
| while (true) { |
| try { |
| cacheVal = localCachePeek0(cacheKey, modes.heap, modes.offheap); |
| |
| break; |
| } |
| catch (GridCacheEntryRemovedException ignore) { |
| if (log.isDebugEnabled()) |
| log.debug("Got removed entry during 'peek': " + key); |
| |
| // continue |
| } |
| } |
| } |
| |
| Object val = ctx.unwrapBinaryIfNeeded(cacheVal, ctx.keepBinary(), false, null); |
| |
| return (V)val; |
| } |
| |
| /** |
| * @param key Key. |
| * @param heap Read heap flag. |
| * @param offheap Read offheap flag. |
| * @return Value. |
| * @throws GridCacheEntryRemovedException If entry removed. |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Nullable private CacheObject localCachePeek0(KeyCacheObject key, |
| boolean heap, |
| boolean offheap) |
| throws GridCacheEntryRemovedException, IgniteCheckedException { |
| assert ctx.isLocal(); |
| assert heap || offheap; |
| |
| GridCacheEntryEx e = offheap ? entryEx(key) : peekEx(key); |
| |
| if (e != null) { |
| try { |
| return e.peek(heap, offheap, AffinityTopologyVersion.NONE, null); |
| } |
| finally { |
| e.touch(); |
| } |
| } |
| |
| return null; |
| } |
| |
| /** |
| * Undeploys and removes all entries for class loader. |
| * |
| * @param ldr Class loader to undeploy. |
| */ |
| public final void onUndeploy(ClassLoader ldr) { |
| ctx.deploy().onUndeploy(ldr, context()); |
| } |
| |
| /** |
| * |
| * @param key Entry key. |
| * @return Entry or <tt>null</tt>. |
| */ |
| @Nullable public final GridCacheEntryEx peekEx(KeyCacheObject key) { |
| return entry0(key, ctx.affinity().affinityTopologyVersion(), false, false); |
| } |
| |
| /** |
| * |
| * @param key Entry key. |
| * @return Entry or <tt>null</tt>. |
| */ |
| @Nullable public final GridCacheEntryEx peekEx(Object key) { |
| return entry0(ctx.toCacheKeyObject(key), ctx.affinity().affinityTopologyVersion(), false, false); |
| } |
| |
| /** |
| * @param key Entry key. |
| * @return Entry (never {@code null}). |
| */ |
| public final GridCacheEntryEx entryEx(Object key) { |
| return entryEx(ctx.toCacheKeyObject(key)); |
| } |
| |
| /** |
| * @param key Entry key. |
| * @return Entry (never {@code null}). |
| */ |
| public final GridCacheEntryEx entryEx(KeyCacheObject key) { |
| return entryEx(key, ctx.affinity().affinityTopologyVersion()); |
| } |
| |
| /** |
| * @param topVer Topology version. |
| * @param key Entry key. |
| * @return Entry (never {@code null}). |
| */ |
| public GridCacheEntryEx entryEx(KeyCacheObject key, AffinityTopologyVersion topVer) { |
| GridCacheEntryEx e = map.putEntryIfObsoleteOrAbsent(ctx, topVer, key, true, false); |
| |
| assert e != null; |
| |
| return e; |
| } |
| |
| /** |
| * @param key Entry key. |
| * @param topVer Topology version at the time of creation. |
| * @param create Flag to create entry if it does not exist. |
| * @param skipReserve Flag to skip partition reservation. |
| * @return Entry or <tt>null</tt>. |
| */ |
| @Nullable private GridCacheEntryEx entry0(KeyCacheObject key, AffinityTopologyVersion topVer, boolean create, |
| boolean skipReserve) { |
| GridCacheMapEntry cur = map.getEntry(ctx, key); |
| |
| if (cur == null || cur.obsolete()) { |
| cur = map.putEntryIfObsoleteOrAbsent( |
| ctx, |
| topVer, |
| key, |
| create, |
| skipReserve); |
| } |
| |
| return cur; |
| } |
| |
| /** |
| * @return Set of internal cached entry representations. |
| */ |
| public final Iterable<? extends GridCacheEntryEx> entries() { |
| return allEntries(); |
| } |
| |
| /** |
| * @return Set of internal cached entry representations. |
| */ |
| public final Iterable<? extends GridCacheEntryEx> allEntries() { |
| return map.entries(ctx.cacheId()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public final Set<Cache.Entry<K, V>> entrySet() { |
| return entrySet((CacheEntryPredicate[])null); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public final Set<K> keySet() { |
| return new KeySet(map.entrySet(ctx.cacheId())); |
| } |
| |
| /** |
| * |
| * @param key Entry key. |
| */ |
| public final void removeIfObsolete(KeyCacheObject key) { |
| assert key != null; |
| |
| GridCacheMapEntry entry = map.getEntry(ctx, key); |
| |
| if (entry != null && entry.obsolete()) |
| removeEntry(entry); |
| } |
| |
| /** |
| * Split clearLocally all task into multiple runnables. |
| * |
| * @param srv Whether to clear server cache. |
| * @param near Whether to clear near cache. |
| * @param readers Whether to clear readers. |
| * @return Split runnables. |
| */ |
| public List<GridCacheClearAllRunnable<K, V>> splitClearLocally(boolean srv, boolean near, boolean readers) { |
| if ((isNear() && near) || (!isNear() && srv)) { |
| int keySize = size(); |
| |
| int cnt = Math.min(keySize / CLEAR_ALL_SPLIT_THRESHOLD + (keySize % CLEAR_ALL_SPLIT_THRESHOLD != 0 ? 1 : 0), |
| Runtime.getRuntime().availableProcessors()); |
| |
| if (cnt == 0) |
| cnt = 1; // Still perform cleanup since there could be entries in swap. |
| |
| GridCacheVersion obsoleteVer = nextVersion(); |
| |
| List<GridCacheClearAllRunnable<K, V>> res = new ArrayList<>(cnt); |
| |
| for (int i = 0; i < cnt; i++) |
| res.add(new GridCacheClearAllRunnable<>(this, obsoleteVer, i, cnt, readers)); |
| |
| return res; |
| } |
| else |
| return null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean clearLocally(K key) { |
| return clearLocally0(key, false); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void clearLocallyAll(Set<? extends K> keys, boolean srv, boolean near, boolean readers) { |
| if (keys != null && ((isNear() && near) || (!isNear() && srv))) { |
| for (K key : keys) |
| clearLocally0(key, readers); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void clearLocally(boolean srv, boolean near, boolean readers) { |
| ctx.checkSecurity(SecurityPermission.CACHE_REMOVE); |
| |
| //TODO IGNITE-7952 |
| MvccUtils.verifyMvccOperationSupport(ctx, "Clear"); |
| |
| List<GridCacheClearAllRunnable<K, V>> jobs = splitClearLocally(srv, near, readers); |
| |
| if (!F.isEmpty(jobs)) { |
| ExecutorService execSvc = null; |
| |
| try { |
| if (jobs.size() > 1) { |
| execSvc = Executors.newFixedThreadPool(jobs.size() - 1, |
| new IgniteThreadFactory(ctx.igniteInstanceName(), "async-cache-cleaner")); |
| |
| for (int i = 1; i < jobs.size(); i++) |
| execSvc.execute(jobs.get(i)); |
| } |
| |
| jobs.get(0).run(); |
| } |
| finally { |
| if (execSvc != null) { |
| execSvc.shutdown(); |
| |
| try { |
| while (!execSvc.isTerminated() && !Thread.currentThread().isInterrupted()) |
| execSvc.awaitTermination(1000, TimeUnit.MILLISECONDS); |
| } |
| catch (InterruptedException ignore) { |
| U.warn(log, "Got interrupted while waiting for Cache.clearLocally() executor service to " + |
| "finish."); |
| |
| Thread.currentThread().interrupt(); |
| } |
| } |
| } |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void clear() throws IgniteCheckedException { |
| clear((Set<? extends K>)null); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void clear(K key) throws IgniteCheckedException { |
| clear(Collections.singleton(key)); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void clearAll(Set<? extends K> keys) throws IgniteCheckedException { |
| clear(keys); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<?> clearAsync() { |
| return clearAsync((Set<? extends K>)null); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<?> clearAsync(K key) { |
| return clearAsync(Collections.singleton(key)); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<?> clearAllAsync(Set<? extends K> keys) { |
| return clearAsync(keys); |
| } |
| |
| /** |
| * @param keys Keys to clear. |
| * @throws IgniteCheckedException In case of error. |
| */ |
| private void clear(@Nullable Set<? extends K> keys) throws IgniteCheckedException { |
| //TODO IGNITE-7952 |
| MvccUtils.verifyMvccOperationSupport(ctx, "Clear"); |
| |
| ctx.shared().cache().checkReadOnlyState("clear", ctx.config()); |
| |
| if (isLocal()) { |
| if (keys == null) |
| clearLocally(true, false, false); |
| else |
| clearLocallyAll(keys, true, false, false); |
| } |
| else { |
| executeClearTask(keys, false).get(); |
| executeClearTask(keys, true).get(); |
| } |
| } |
| |
| /** |
| * @param keys Keys to clear or {@code null} if all cache should be cleared. |
| * @return Future. |
| */ |
| private IgniteInternalFuture<?> clearAsync(@Nullable final Set<? extends K> keys) { |
| //TODO IGNITE-7952 |
| MvccUtils.verifyMvccOperationSupport(ctx, "Clear"); |
| |
| ctx.shared().cache().checkReadOnlyState("clear", ctx.config()); |
| |
| if (isLocal()) |
| return clearLocallyAsync(keys); |
| else |
| return executeClearTask(keys, false).chainCompose(fut -> executeClearTask(keys, true)); |
| } |
| |
| /** |
| * @param keys Keys to clear. |
| * @return Clear future. |
| */ |
| private IgniteInternalFuture<?> clearLocallyAsync(@Nullable final Set<? extends K> keys) { |
| return ctx.closures().callLocalSafe(new GridPlainCallable<Object>() { |
| @Override public Object call() { |
| if (keys == null) |
| clearLocally(true, false, false); |
| else |
| clearLocallyAll(keys, true, false, false); |
| |
| return null; |
| } |
| }, false); |
| } |
| |
| /** |
| * @param keys Keys to clear. |
| * @param near Near cache flag. |
| * @return Future. |
| */ |
| private IgniteInternalFuture<?> executeClearTask(@Nullable Set<? extends K> keys, boolean near) { |
| Collection<ClusterNode> srvNodes = ctx.grid().cluster().forCacheNodes(name(), !near, near, false).nodes(); |
| |
| if (!srvNodes.isEmpty()) { |
| ctx.kernalContext().task().setThreadContext(TC_SUBGRID, srvNodes); |
| |
| return ctx.kernalContext().task().execute( |
| new ClearTask(ctx.name(), ctx.affinity().affinityTopologyVersion(), keys, near), null); |
| } |
| else |
| return new GridFinishedFuture<>(); |
| } |
| |
| /** |
| * @param part Partition id. |
| * @return Future. |
| */ |
| private IgniteInternalFuture<?> executePreloadTask(int part) throws IgniteCheckedException { |
| ClusterGroup grp = ctx.grid().cluster().forDataNodes(ctx.name()); |
| |
| @Nullable ClusterNode targetNode = ctx.affinity().primaryByPartition(part, ctx.topology().readyTopologyVersion()); |
| |
| if (targetNode == null || targetNode.version().compareTo(PRELOAD_PARTITION_SINCE) < 0) { |
| if (!partPreloadBadVerWarned) { |
| U.warn(log(), "Attempting to execute partition preloading task on outdated or not mapped node " + |
| "[targetNodeVer=" + (targetNode == null ? "NA" : targetNode.version()) + |
| ", minSupportedNodeVer=" + PRELOAD_PARTITION_SINCE + ']'); |
| |
| partPreloadBadVerWarned = true; |
| } |
| |
| return new GridFinishedFuture<>(); |
| } |
| |
| return ctx.closures().affinityRun(Collections.singleton(name()), part, |
| new PartitionPreloadJob(ctx.name(), part), grp.nodes(), null); |
| } |
| |
| /** |
| * @param keys Keys. |
| * @param readers Readers flag. |
| */ |
| public void clearLocally(Collection<KeyCacheObject> keys, boolean readers) { |
| if (F.isEmpty(keys)) |
| return; |
| |
| //TODO IGNITE-7952 |
| MvccUtils.verifyMvccOperationSupport(ctx, "Clear"); |
| |
| GridCacheVersion obsoleteVer = nextVersion(); |
| |
| for (KeyCacheObject key : keys) { |
| GridCacheEntryEx e = peekEx(key); |
| |
| try { |
| if (e != null) |
| e.clear(obsoleteVer, readers); |
| } |
| catch (IgniteCheckedException ex) { |
| U.error(log, "Failed to clearLocally entry (will continue to clearLocally other entries): " + e, |
| ex); |
| } |
| } |
| } |
| |
| /** |
| * @param entry Removes entry from cache if currently mapped value is the same as passed. |
| */ |
| public final void removeEntry(GridCacheEntryEx entry) { |
| boolean rmvd = map.removeEntry(entry); |
| |
| if (log.isDebugEnabled()) { |
| if (rmvd) |
| log.debug("Removed entry from cache: " + entry); |
| else |
| log.debug("Remove will not be done for key (entry got replaced or removed): " + entry.key()); |
| } |
| } |
| |
| /** |
| * Evicts an entry from cache. |
| * |
| * @param key Key. |
| * @param ver Version. |
| * @param filter Filter. |
| * @return {@code True} if entry was evicted. |
| */ |
| private boolean evictx(K key, GridCacheVersion ver, |
| @Nullable CacheEntryPredicate[] filter) { |
| KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); |
| |
| GridCacheEntryEx entry = peekEx(cacheKey); |
| |
| if (entry == null) |
| return true; |
| |
| try { |
| return ctx.evicts().evict(entry, ver, true, filter); |
| } |
| catch (IgniteCheckedException ex) { |
| U.error(log, "Failed to evict entry from cache: " + entry, ex); |
| |
| return false; |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Collection<Integer> lostPartitions() { |
| if (isLocal()) |
| return Collections.emptyList(); |
| |
| return ctx.topology().lostPartitions(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public final V getForcePrimary(K key) throws IgniteCheckedException { |
| String taskName = ctx.kernalContext().job().currentTaskName(); |
| |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| return repairableGetAllAsync( |
| F.asList(key), |
| /*force primary*/true, |
| /*skip tx*/false, |
| taskName, |
| /*deserialize cache objects*/true, |
| opCtx != null && opCtx.recovery(), |
| opCtx != null ? opCtx.readRepairStrategy() : null, |
| /*skip values*/false, |
| /*need ver*/false).get().get(key); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public final IgniteInternalFuture<V> getForcePrimaryAsync(final K key) { |
| String taskName = ctx.kernalContext().job().currentTaskName(); |
| |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| return repairableGetAllAsync( |
| Collections.singletonList(key), |
| /*force primary*/true, |
| /*skip tx*/false, |
| taskName, |
| true, |
| opCtx != null && opCtx.recovery(), |
| opCtx != null ? opCtx.readRepairStrategy() : null, |
| /*skip vals*/false, |
| /*can remap*/false).chain(new CX1<IgniteInternalFuture<Map<K, V>>, V>() { |
| @Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException { |
| return e.get().get(key); |
| } |
| }); |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public final Map<K, V> getAllOutTx(Set<? extends K> keys) throws IgniteCheckedException { |
| return getAllOutTxAsync(keys).get(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public final IgniteInternalFuture<Map<K, V>> getAllOutTxAsync(Set<? extends K> keys) { |
| final boolean statsEnabled = ctx.statisticsEnabled(); |
| final boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled(); |
| |
| final long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L; |
| |
| String taskName = ctx.kernalContext().job().currentTaskName(); |
| |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| IgniteInternalFuture<Map<K, V>> fut = repairableGetAllAsync(keys, |
| !ctx.config().isReadFromBackup(), |
| /*skip tx*/true, |
| taskName, |
| !(opCtx != null && opCtx.isKeepBinary()), |
| opCtx != null && opCtx.recovery(), |
| opCtx != null ? opCtx.readRepairStrategy() : null, |
| /*skip values*/false, |
| /*need ver*/false); |
| |
| if (statsEnabled) |
| fut.listen(new UpdateGetAllTimeStatClosure<>(metrics0(), start)); |
| |
| if (performanceStatsEnabled) |
| fut.listen(f -> writeStatistics(OperationType.CACHE_GET_ALL, start)); |
| |
| return fut; |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public V get(K key) throws IgniteCheckedException { |
| A.notNull(key, "key"); |
| |
| boolean statsEnabled = ctx.statisticsEnabled(); |
| boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled(); |
| |
| long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L; |
| |
| boolean keepBinary = ctx.keepBinary(); |
| |
| if (keepBinary) |
| key = (K)ctx.toCacheKeyObject(key); |
| |
| V val = repairableGet(key, !keepBinary, false); |
| |
| if (ctx.config().getInterceptor() != null) { |
| key = keepBinary ? (K)ctx.unwrapBinaryIfNeeded(key, true, false, null) : key; |
| |
| val = (V)ctx.config().getInterceptor().onGet(key, val); |
| } |
| |
| if (statsEnabled) |
| metrics0().addGetTimeNanos(System.nanoTime() - start); |
| |
| if (performanceStatsEnabled) |
| writeStatistics(OperationType.CACHE_GET, start); |
| |
| return val; |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public CacheEntry<K, V> getEntry(K key) throws IgniteCheckedException { |
| A.notNull(key, "key"); |
| |
| boolean statsEnabled = ctx.statisticsEnabled(); |
| boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled(); |
| |
| long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L; |
| |
| boolean keepBinary = ctx.keepBinary(); |
| |
| if (keepBinary) |
| key = (K)ctx.toCacheKeyObject(key); |
| |
| EntryGetResult t |
| = (EntryGetResult)repairableGet(key, !keepBinary, true); |
| |
| CacheEntry<K, V> val = t != null ? new CacheEntryImplEx<>( |
| keepBinary ? (K)ctx.unwrapBinaryIfNeeded(key, true, false, null) : key, |
| (V)t.value(), |
| t.version()) |
| : null; |
| |
| if (ctx.config().getInterceptor() != null) { |
| key = keepBinary ? (K)ctx.unwrapBinaryIfNeeded(key, true, false, null) : key; |
| |
| V val0 = (V)ctx.config().getInterceptor().onGet(key, t != null ? val.getValue() : null); |
| |
| val = (val0 != null) ? new CacheEntryImplEx<>(key, val0, t != null ? t.version() : null) : null; |
| } |
| |
| if (statsEnabled) |
| metrics0().addGetTimeNanos(System.nanoTime() - start); |
| |
| if (performanceStatsEnabled) |
| writeStatistics(OperationType.CACHE_GET, start); |
| |
| return val; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<V> getAsync(final K key) { |
| A.notNull(key, "key"); |
| |
| final boolean statsEnabled = ctx.statisticsEnabled(); |
| final boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled(); |
| |
| final long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L; |
| |
| final boolean keepBinary = ctx.keepBinary(); |
| |
| final K key0 = keepBinary ? (K)ctx.toCacheKeyObject(key) : key; |
| |
| final CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| IgniteInternalFuture<V> fut = repairableGetAsync( |
| key, |
| !keepBinary, |
| false, |
| opCtx != null ? opCtx.readRepairStrategy() : null); |
| |
| if (ctx.config().getInterceptor() != null) |
| fut = fut.chain(new CX1<IgniteInternalFuture<V>, V>() { |
| @Override public V applyx(IgniteInternalFuture<V> f) throws IgniteCheckedException { |
| K key = keepBinary ? (K)ctx.unwrapBinaryIfNeeded(key0, true, false, null) : key0; |
| |
| return (V)ctx.config().getInterceptor().onGet(key, f.get()); |
| } |
| }); |
| |
| if (statsEnabled) |
| fut.listen(new UpdateGetTimeStatClosure<V>(metrics0(), start)); |
| |
| if (performanceStatsEnabled) |
| fut.listen(f -> writeStatistics(OperationType.CACHE_GET, start)); |
| |
| return fut; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<CacheEntry<K, V>> getEntryAsync(final K key) { |
| A.notNull(key, "key"); |
| |
| final boolean statsEnabled = ctx.statisticsEnabled(); |
| final boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled(); |
| |
| final long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L; |
| |
| final boolean keepBinary = ctx.keepBinary(); |
| |
| final K key0 = keepBinary ? (K)ctx.toCacheKeyObject(key) : key; |
| |
| final CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| IgniteInternalFuture<EntryGetResult> fut = |
| (IgniteInternalFuture<EntryGetResult>)repairableGetAsync( |
| key0, |
| !keepBinary, |
| true, |
| opCtx != null ? opCtx.readRepairStrategy() : null); |
| |
| final boolean intercept = ctx.config().getInterceptor() != null; |
| |
| IgniteInternalFuture<CacheEntry<K, V>> fr = fut.chain( |
| new CX1<IgniteInternalFuture<EntryGetResult>, CacheEntry<K, V>>() { |
| @Override public CacheEntry<K, V> applyx(IgniteInternalFuture<EntryGetResult> f) |
| throws IgniteCheckedException { |
| EntryGetResult t = f.get(); |
| |
| K key = keepBinary ? (K)ctx.unwrapBinaryIfNeeded(key0, true, false, null) : key0; |
| |
| CacheEntry val = t != null ? new CacheEntryImplEx<>( |
| key, |
| t.value(), |
| t.version()) |
| : null; |
| |
| if (intercept) { |
| V val0 = (V)ctx.config().getInterceptor().onGet(key, t != null ? val.getValue() : null); |
| |
| return val0 != null ? new CacheEntryImplEx(key, val0, t != null ? t.version() : null) : null; |
| } |
| else |
| return val; |
| } |
| }); |
| |
| if (statsEnabled) |
| fut.listen(new UpdateGetTimeStatClosure<EntryGetResult>(metrics0(), start)); |
| |
| if (performanceStatsEnabled) |
| fut.listen(f -> writeStatistics(OperationType.CACHE_GET, start)); |
| |
| return fr; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public final Map<K, V> getAll(@Nullable Collection<? extends K> keys) throws IgniteCheckedException { |
| A.notNull(keys, "keys"); |
| |
| boolean statsEnabled = ctx.statisticsEnabled(); |
| boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled(); |
| |
| long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L; |
| |
| final CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| Map<K, V> map = repairableGetAll( |
| keys, |
| !ctx.keepBinary(), |
| false, |
| opCtx != null && opCtx.recovery(), |
| opCtx != null ? opCtx.readRepairStrategy() : null); |
| |
| if (ctx.config().getInterceptor() != null) |
| map = interceptGet(keys, map); |
| |
| if (statsEnabled) |
| metrics0().addGetAllTimeNanos(System.nanoTime() - start); |
| |
| if (performanceStatsEnabled) |
| writeStatistics(OperationType.CACHE_GET_ALL, start); |
| |
| return map; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Collection<CacheEntry<K, V>> getEntries(@Nullable Collection<? extends K> keys) |
| throws IgniteCheckedException { |
| A.notNull(keys, "keys"); |
| |
| boolean statsEnabled = ctx.statisticsEnabled(); |
| boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled(); |
| |
| long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L; |
| |
| final CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| Map<K, EntryGetResult> map = (Map<K, EntryGetResult>)repairableGetAll( |
| keys, |
| !ctx.keepBinary(), |
| true, |
| opCtx != null && opCtx.recovery(), |
| opCtx != null ? opCtx.readRepairStrategy() : null); |
| |
| Collection<CacheEntry<K, V>> res = new HashSet<>(); |
| |
| if (ctx.config().getInterceptor() != null) |
| res = interceptGetEntries(keys, map); |
| else |
| for (Map.Entry<K, EntryGetResult> e : map.entrySet()) |
| res.add(new CacheEntryImplEx<>(e.getKey(), (V)e.getValue().value(), e.getValue().version())); |
| |
| if (statsEnabled) |
| metrics0().addGetAllTimeNanos(System.nanoTime() - start); |
| |
| if (performanceStatsEnabled) |
| writeStatistics(OperationType.CACHE_GET_ALL, start); |
| |
| return res; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<Map<K, V>> getAllAsync(@Nullable final Collection<? extends K> keys) { |
| A.notNull(keys, "keys"); |
| |
| final boolean statsEnabled = ctx.statisticsEnabled(); |
| final boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled(); |
| |
| final long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L; |
| |
| String taskName = ctx.kernalContext().job().currentTaskName(); |
| |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| IgniteInternalFuture<Map<K, V>> fut = repairableGetAllAsync( |
| keys, |
| !ctx.config().isReadFromBackup(), |
| /*skip tx*/false, |
| taskName, |
| !(opCtx != null && opCtx.isKeepBinary()), |
| opCtx != null && opCtx.recovery(), |
| opCtx != null ? opCtx.readRepairStrategy() : null, |
| /*skip vals*/false, |
| /*need ver*/false); |
| |
| if (ctx.config().getInterceptor() != null) |
| return fut.chain(new CX1<IgniteInternalFuture<Map<K, V>>, Map<K, V>>() { |
| @Override public Map<K, V> applyx(IgniteInternalFuture<Map<K, V>> f) throws IgniteCheckedException { |
| return interceptGet(keys, f.get()); |
| } |
| }); |
| |
| if (statsEnabled) |
| fut.listen(new UpdateGetAllTimeStatClosure<Map<K, V>>(metrics0(), start)); |
| |
| if (performanceStatsEnabled) |
| fut.listen(f -> writeStatistics(OperationType.CACHE_GET_ALL, start)); |
| |
| return fut; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<Collection<CacheEntry<K, V>>> getEntriesAsync( |
| @Nullable final Collection<? extends K> keys) { |
| A.notNull(keys, "keys"); |
| |
| final boolean statsEnabled = ctx.statisticsEnabled(); |
| final boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled(); |
| |
| final long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L; |
| |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| String taskName = ctx.kernalContext().job().currentTaskName(); |
| |
| IgniteInternalFuture<Map<K, EntryGetResult>> fut = |
| (IgniteInternalFuture<Map<K, EntryGetResult>>)((IgniteInternalFuture)repairableGetAllAsync( |
| keys, |
| !ctx.config().isReadFromBackup(), |
| /*skip tx*/false, |
| taskName, |
| !(opCtx != null && opCtx.isKeepBinary()), |
| opCtx != null && opCtx.recovery(), |
| opCtx != null ? opCtx.readRepairStrategy() : null, |
| /*skip vals*/false, |
| /*need ver*/true)); |
| |
| final boolean intercept = ctx.config().getInterceptor() != null; |
| |
| IgniteInternalFuture<Collection<CacheEntry<K, V>>> rf = |
| fut.chain(new CX1<IgniteInternalFuture<Map<K, EntryGetResult>>, Collection<CacheEntry<K, V>>>() { |
| @Override public Collection<CacheEntry<K, V>> applyx( |
| IgniteInternalFuture<Map<K, EntryGetResult>> f) throws IgniteCheckedException { |
| if (intercept) |
| return interceptGetEntries(keys, f.get()); |
| else { |
| Map<K, CacheEntry<K, V>> res = U.newHashMap(f.get().size()); |
| |
| for (Map.Entry<K, EntryGetResult> e : f.get().entrySet()) |
| res.put(e.getKey(), |
| new CacheEntryImplEx<>(e.getKey(), (V)e.getValue().value(), e.getValue().version())); |
| |
| return res.values(); |
| } |
| } |
| }); |
| |
| if (statsEnabled) |
| fut.listen(new UpdateGetAllTimeStatClosure<Map<K, EntryGetResult>>(metrics0(), start)); |
| |
| if (performanceStatsEnabled) |
| fut.listen(f -> writeStatistics(OperationType.CACHE_GET_ALL, start)); |
| |
| return rf; |
| } |
| |
| /** |
| * Applies cache interceptor on result of 'get' operation. |
| * |
| * @param keys All requested keys. |
| * @param map Result map. |
| * @return Map with values returned by cache interceptor.. |
| */ |
| private Map<K, V> interceptGet(@Nullable Collection<? extends K> keys, Map<K, V> map) { |
| if (F.isEmpty(keys)) |
| return map; |
| |
| CacheInterceptor<K, V> interceptor = cacheCfg.getInterceptor(); |
| |
| assert interceptor != null; |
| |
| Map<K, V> res = U.newHashMap(keys.size()); |
| |
| for (Map.Entry<K, V> e : map.entrySet()) { |
| V val = interceptor.onGet(e.getKey(), e.getValue()); |
| |
| if (val != null) |
| res.put(e.getKey(), val); |
| } |
| |
| if (map.size() != keys.size()) { // Not all requested keys were in cache. |
| for (K key : keys) { |
| if (key != null) { |
| if (!map.containsKey(key)) { |
| V val = interceptor.onGet(key, null); |
| |
| if (val != null) |
| res.put(key, val); |
| } |
| } |
| } |
| } |
| |
| return res; |
| } |
| |
| /** |
| * Applies cache interceptor on result of 'getEntries' operation. |
| * |
| * @param keys All requested keys. |
| * @param map Result map. |
| * @return Map with values returned by cache interceptor.. |
| */ |
| private Collection<CacheEntry<K, V>> interceptGetEntries( |
| @Nullable Collection<? extends K> keys, Map<K, EntryGetResult> map) { |
| if (F.isEmpty(keys)) { |
| assert map.isEmpty(); |
| |
| return Collections.emptySet(); |
| } |
| |
| Map<K, CacheEntry<K, V>> res = U.newHashMap(keys.size()); |
| |
| CacheInterceptor<K, V> interceptor = cacheCfg.getInterceptor(); |
| |
| assert interceptor != null; |
| |
| for (Map.Entry<K, EntryGetResult> e : map.entrySet()) { |
| V val = interceptor.onGet(e.getKey(), (V)e.getValue().value()); |
| |
| if (val != null) |
| res.put(e.getKey(), new CacheEntryImplEx<>(e.getKey(), val, e.getValue().version())); |
| } |
| |
| if (map.size() != keys.size()) { // Not all requested keys were in cache. |
| for (K key : keys) { |
| if (key != null) { |
| if (!map.containsKey(key)) { |
| V val = interceptor.onGet(key, null); |
| |
| if (val != null) |
| res.put(key, new CacheEntryImplEx<>(key, val, null)); |
| } |
| } |
| } |
| } |
| |
| return res.values(); |
| } |
| |
| /** |
| * @param key Key. |
| * @param forcePrimary Force primary. |
| * @param skipTx Skip tx. |
| * @param taskName Task name. |
| * @param deserializeBinary Deserialize binary. |
| * @param skipVals Skip values. |
| * @param needVer Need version. |
| * @return Future for the get operation. |
| */ |
| protected IgniteInternalFuture<V> getAsync( |
| final K key, |
| boolean forcePrimary, |
| boolean skipTx, |
| String taskName, |
| boolean deserializeBinary, |
| final boolean skipVals, |
| final boolean needVer |
| ) { |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| return getAllAsync(Collections.singletonList(key), |
| forcePrimary, |
| skipTx, |
| taskName, |
| deserializeBinary, |
| opCtx != null && opCtx.recovery(), |
| opCtx != null ? opCtx.readRepairStrategy() : null, |
| skipVals, |
| needVer).chain(new CX1<IgniteInternalFuture<Map<K, V>>, V>() { |
| @Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException { |
| Map<K, V> map = e.get(); |
| |
| assert map.isEmpty() || map.size() == 1 : map.size(); |
| |
| if (skipVals) { |
| Boolean val = map.isEmpty() ? false : (Boolean)F.firstValue(map); |
| |
| return (V)(val); |
| } |
| |
| return F.firstValue(map); |
| } |
| }); |
| } |
| |
| /** |
| * @param keys Keys. |
| * @param forcePrimary Force primary. |
| * @param skipTx Skip tx. |
| * @param taskName Task name. |
| * @param deserializeBinary Deserialize binary. |
| * @param recovery Recovery mode flag. |
| * @param skipVals Skip values. |
| * @param needVer Need version. |
| * @return Future for the get operation. |
| * @see GridCacheAdapter#getAllAsync(Collection) |
| */ |
| protected IgniteInternalFuture<Map<K, V>> getAllAsync( |
| @Nullable Collection<? extends K> keys, |
| boolean forcePrimary, |
| boolean skipTx, |
| String taskName, |
| boolean deserializeBinary, |
| boolean recovery, |
| ReadRepairStrategy readRepairStrategy, |
| boolean skipVals, |
| final boolean needVer |
| ) { |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| return getAllAsync(keys, |
| null, |
| opCtx == null || !opCtx.skipStore(), |
| !skipTx, |
| taskName, |
| deserializeBinary, |
| opCtx != null && opCtx.recovery(), |
| readRepairStrategy, |
| forcePrimary, |
| skipVals ? null : expiryPolicy(opCtx != null ? opCtx.expiry() : null), |
| skipVals, |
| needVer); |
| } |
| |
| /** |
| * @param keys Keys. |
| * @param readerArgs Near cache reader will be added if not null. |
| * @param readThrough Read through. |
| * @param checkTx Check tx. |
| * @param taskName Task name. |
| * @param deserializeBinary Deserialize binary. |
| * @param recovery Recovery flag. |
| * @param forcePrimary Froce primary. |
| * @param expiry Expiry policy. |
| * @param skipVals Skip values. |
| * @param needVer Need version. |
| * @return Future for the get operation. |
| * @see GridCacheAdapter#getAllAsync(Collection) |
| */ |
| public final IgniteInternalFuture<Map<K, V>> getAllAsync(@Nullable final Collection<? extends K> keys, |
| @Nullable final ReaderArguments readerArgs, |
| boolean readThrough, |
| boolean checkTx, |
| final String taskName, |
| final boolean deserializeBinary, |
| final boolean recovery, |
| final ReadRepairStrategy readRepairStrategy, |
| final boolean forcePrimary, |
| @Nullable IgniteCacheExpiryPolicy expiry, |
| final boolean skipVals, |
| final boolean needVer |
| ) { |
| ctx.checkSecurity(SecurityPermission.CACHE_READ); |
| |
| warnIfUnordered(keys, BulkOperation.GET); |
| |
| return getAllAsync0(ctx.cacheKeysView(keys), |
| readerArgs, |
| readThrough, |
| checkTx, |
| taskName, |
| deserializeBinary, |
| expiry, |
| skipVals, |
| /*keep cache objects*/false, |
| recovery, |
| readRepairStrategy, |
| needVer, |
| null, |
| null); // TODO IGNITE-7371 |
| } |
| |
| /** |
| * @param keys Keys. |
| * @param readerArgs Near cache reader will be added if not null. |
| * @param readThrough Read-through flag. |
| * @param checkTx Check local transaction flag. |
| * @param taskName Task name/ |
| * @param deserializeBinary Deserialize binary flag. |
| * @param expiry Expiry policy. |
| * @param skipVals Skip values flag. |
| * @param keepCacheObjects Keep cache objects. |
| * @param needVer If {@code true} returns values as tuples containing value and version. |
| * @param txLbl Transaction label. |
| * @param mvccSnapshot MVCC snapshot. |
| * @return Future. |
| */ |
| protected final <K1, V1> IgniteInternalFuture<Map<K1, V1>> getAllAsync0( |
| @Nullable final Collection<KeyCacheObject> keys, |
| @Nullable final ReaderArguments readerArgs, |
| final boolean readThrough, |
| boolean checkTx, |
| final String taskName, |
| final boolean deserializeBinary, |
| @Nullable final IgniteCacheExpiryPolicy expiry, |
| final boolean skipVals, |
| final boolean keepCacheObjects, |
| final boolean recovery, |
| final ReadRepairStrategy readRepairStrategy, |
| final boolean needVer, |
| @Nullable String txLbl, |
| MvccSnapshot mvccSnapshot |
| ) { |
| if (F.isEmpty(keys)) |
| return new GridFinishedFuture<>(Collections.<K1, V1>emptyMap()); |
| |
| GridNearTxLocal tx = null; |
| |
| if (checkTx) { |
| try { |
| checkJta(); |
| } |
| catch (IgniteCheckedException e) { |
| return new GridFinishedFuture<>(e); |
| } |
| |
| tx = checkCurrentTx(); |
| } |
| |
| if (ctx.mvccEnabled() || tx == null || tx.implicit()) { |
| assert (mvccSnapshot == null) == !ctx.mvccEnabled(); |
| |
| Map<KeyCacheObject, EntryGetResult> misses = null; |
| |
| Set<GridCacheEntryEx> newLocalEntries = null; |
| |
| final AffinityTopologyVersion topVer = tx == null ? ctx.affinity().affinityTopologyVersion() : |
| tx.topologyVersion(); |
| |
| ctx.shared().database().checkpointReadLock(); |
| |
| try { |
| int keysSize = keys.size(); |
| |
| GridDhtTopologyFuture topFut = ctx.shared().exchange().lastFinishedFuture(); |
| |
| Throwable ex = topFut != null ? topFut.validateCache(ctx, recovery, /*read*/true, null, keys) : null; |
| |
| if (ex != null) |
| return new GridFinishedFuture<>(ex); |
| |
| final Map<K1, V1> map = keysSize == 1 ? |
| (Map<K1, V1>)new IgniteBiTuple<>() : |
| U.<K1, V1>newHashMap(keysSize); |
| |
| final boolean storeEnabled = !skipVals && readThrough && ctx.readThrough(); |
| |
| boolean readNoEntry = ctx.readNoEntry(expiry, readerArgs != null); |
| |
| for (KeyCacheObject key : keys) { |
| while (true) { |
| try { |
| EntryGetResult res = null; |
| |
| boolean evt = !skipVals; |
| boolean updateMetrics = !skipVals; |
| |
| GridCacheEntryEx entry = null; |
| |
| boolean skipEntry = readNoEntry; |
| |
| if (readNoEntry) { |
| CacheDataRow row = mvccSnapshot != null ? |
| ctx.offheap().mvccRead(ctx, key, mvccSnapshot) : |
| ctx.offheap().read(ctx, key); |
| |
| if (row != null) { |
| long expireTime = row.expireTime(); |
| |
| if (expireTime != 0) { |
| if (expireTime > U.currentTimeMillis()) { |
| res = new EntryGetWithTtlResult(row.value(), |
| row.version(), |
| false, |
| expireTime, |
| 0); |
| } |
| else |
| skipEntry = false; |
| } |
| else |
| res = new EntryGetResult(row.value(), row.version(), false); |
| } |
| |
| if (res != null) { |
| if (evt) { |
| ctx.events().readEvent(key, |
| null, |
| txLbl, |
| row.value(), |
| taskName, |
| !deserializeBinary); |
| } |
| |
| if (updateMetrics && ctx.statisticsEnabled()) |
| ctx.cache().metrics0().onRead(true); |
| } |
| else if (storeEnabled) |
| skipEntry = false; |
| } |
| |
| if (!skipEntry) { |
| boolean isNewLocalEntry = this.map.getEntry(ctx, key) == null; |
| |
| entry = entryEx(key); |
| |
| if (entry == null) { |
| if (!skipVals && ctx.statisticsEnabled()) |
| ctx.cache().metrics0().onRead(false); |
| |
| break; |
| } |
| |
| if (isNewLocalEntry) { |
| if (newLocalEntries == null) |
| newLocalEntries = new HashSet<>(); |
| |
| newLocalEntries.add(entry); |
| } |
| |
| if (storeEnabled) { |
| res = entry.innerGetAndReserveForLoad(updateMetrics, |
| evt, |
| taskName, |
| expiry, |
| !deserializeBinary, |
| readerArgs); |
| |
| assert res != null; |
| |
| if (res.value() == null) { |
| if (misses == null) |
| misses = new HashMap<>(); |
| |
| misses.put(key, res); |
| |
| res = null; |
| } |
| } |
| else { |
| res = entry.innerGetVersioned( |
| null, |
| null, |
| updateMetrics, |
| evt, |
| null, |
| taskName, |
| expiry, |
| !deserializeBinary, |
| readerArgs); |
| |
| if (res == null) |
| entry.touch(); |
| } |
| } |
| |
| if (res != null) { |
| ctx.addResult(map, |
| key, |
| res, |
| skipVals, |
| keepCacheObjects, |
| deserializeBinary, |
| true, |
| needVer); |
| |
| if (entry != null && (tx == null || (!tx.implicit() && tx.isolation() == READ_COMMITTED))) |
| entry.touch(); |
| |
| if (keysSize == 1) |
| // Safe to return because no locks are required in READ_COMMITTED mode. |
| return new GridFinishedFuture<>(map); |
| } |
| |
| break; |
| } |
| catch (GridCacheEntryRemovedException ignored) { |
| if (log.isDebugEnabled()) |
| log.debug("Got removed entry in getAllAsync(..) method (will retry): " + key); |
| } |
| } |
| } |
| |
| if (storeEnabled && misses != null) { |
| final Map<KeyCacheObject, EntryGetResult> loadKeys = misses; |
| |
| final IgniteTxLocalAdapter tx0 = tx; |
| |
| final Collection<KeyCacheObject> loaded = new HashSet<>(); |
| |
| return new GridEmbeddedFuture( |
| ctx.closures().callLocalSafe(ctx.projectSafe(new GPC<Map<K1, V1>>() { |
| @Override public Map<K1, V1> call() throws Exception { |
| ctx.store().loadAll(null/*tx*/, loadKeys.keySet(), new CI2<KeyCacheObject, Object>() { |
| @Override public void apply(KeyCacheObject key, Object val) { |
| EntryGetResult res = loadKeys.get(key); |
| |
| if (res == null || val == null) |
| return; |
| |
| loaded.add(key); |
| |
| CacheObject cacheVal = ctx.toCacheObject(val); |
| |
| while (true) { |
| GridCacheEntryEx entry = null; |
| |
| try { |
| ctx.shared().database().ensureFreeSpace(ctx.dataRegion()); |
| } |
| catch (IgniteCheckedException e) { |
| // Wrap errors (will be unwrapped). |
| throw new GridClosureException(e); |
| } |
| |
| ctx.shared().database().checkpointReadLock(); |
| |
| try { |
| entry = entryEx(key); |
| |
| entry.unswap(); |
| |
| GridCacheVersion newVer = nextVersion(); |
| |
| EntryGetResult verVal = entry.versionedValue( |
| cacheVal, |
| res.version(), |
| newVer, |
| expiry, |
| readerArgs); |
| |
| if (log.isDebugEnabled()) |
| log.debug("Set value loaded from store into entry [" + |
| "oldVer=" + res.version() + |
| ", newVer=" + verVal.version() + ", " + |
| "entry=" + entry + ']'); |
| |
| // Don't put key-value pair into result map if value is null. |
| if (verVal.value() != null) { |
| ctx.addResult(map, |
| key, |
| verVal, |
| skipVals, |
| keepCacheObjects, |
| deserializeBinary, |
| true, |
| needVer); |
| } |
| else { |
| ctx.addResult( |
| map, |
| key, |
| new EntryGetResult(cacheVal, res.version()), |
| skipVals, |
| keepCacheObjects, |
| deserializeBinary, |
| false, |
| needVer |
| ); |
| } |
| |
| if (tx0 == null || (!tx0.implicit() && |
| tx0.isolation() == READ_COMMITTED)) |
| entry.touch(); |
| |
| break; |
| } |
| catch (GridCacheEntryRemovedException ignore) { |
| if (log.isDebugEnabled()) |
| log.debug("Got removed entry during getAllAsync (will retry): " + |
| entry); |
| } |
| catch (IgniteCheckedException e) { |
| // Wrap errors (will be unwrapped). |
| throw new GridClosureException(e); |
| } |
| finally { |
| ctx.shared().database().checkpointReadUnlock(); |
| } |
| } |
| } |
| }); |
| |
| clearReservationsIfNeeded(topVer, loadKeys, loaded, tx0); |
| |
| return map; |
| } |
| }), true), |
| new C2<Map<K, V>, Exception, IgniteInternalFuture<Map<K, V>>>() { |
| @Override public IgniteInternalFuture<Map<K, V>> apply(Map<K, V> map, Exception e) { |
| if (e != null) { |
| clearReservationsIfNeeded(topVer, loadKeys, loaded, tx0); |
| |
| return new GridFinishedFuture<>(e); |
| } |
| |
| if (tx0 == null || (!tx0.implicit() && tx0.isolation() == READ_COMMITTED)) { |
| Collection<KeyCacheObject> notFound = new HashSet<>(loadKeys.keySet()); |
| |
| notFound.removeAll(loaded); |
| |
| // Touch entries that were not found in store. |
| for (KeyCacheObject key : notFound) { |
| GridCacheEntryEx entry = peekEx(key); |
| |
| if (entry != null) |
| entry.touch(); |
| } |
| } |
| |
| // There were no misses. |
| return new GridFinishedFuture<>(Collections.<K, |
| V>emptyMap()); |
| } |
| }, |
| new C2<Map<K1, V1>, Exception, Map<K1, V1>>() { |
| @Override public Map<K1, V1> apply(Map<K1, V1> loaded, Exception e) { |
| if (e == null) |
| map.putAll(loaded); |
| |
| return map; |
| } |
| } |
| ); |
| } |
| else |
| // Misses can be non-zero only if store is enabled. |
| assert misses == null; |
| |
| return new GridFinishedFuture<>(map); |
| } |
| catch (RuntimeException | AssertionError e) { |
| if (misses != null) { |
| for (KeyCacheObject key0 : misses.keySet()) { |
| GridCacheEntryEx entry = peekEx(key0); |
| if (entry != null) |
| entry.touch(); |
| } |
| } |
| |
| if (newLocalEntries != null) { |
| for (GridCacheEntryEx entry : newLocalEntries) |
| removeEntry(entry); |
| } |
| |
| return new GridFinishedFuture<>(e); |
| } |
| catch (IgniteCheckedException e) { |
| return new GridFinishedFuture<>(e); |
| } |
| finally { |
| ctx.shared().database().checkpointReadUnlock(); |
| } |
| } |
| else { |
| return asyncOp(tx, new AsyncOp<Map<K1, V1>>(keys) { |
| @Override public IgniteInternalFuture<Map<K1, V1>> op(GridNearTxLocal tx, |
| AffinityTopologyVersion readyTopVer) { |
| return tx.getAllAsync(ctx, |
| readyTopVer, |
| keys, |
| deserializeBinary, |
| skipVals, |
| false, |
| !readThrough, |
| recovery, |
| readRepairStrategy, |
| needVer); |
| } |
| }, ctx.operationContextPerCall(), /*retry*/false); |
| } |
| } |
| |
| /** */ |
| protected GridNearTxLocal checkCurrentTx() { |
| if (!ctx.mvccEnabled()) |
| return ctx.tm().threadLocalTx(ctx); |
| |
| return MvccUtils.tx(ctx.kernalContext(), null); |
| } |
| |
| /** |
| * @param topVer Affinity topology version for which load was performed. |
| * @param loadKeys Keys to load. |
| * @param loaded Actually loaded keys. |
| * @param tx0 Transaction within which the load was run, if any. |
| */ |
| private void clearReservationsIfNeeded( |
| AffinityTopologyVersion topVer, |
| Map<KeyCacheObject, EntryGetResult> loadKeys, |
| Collection<KeyCacheObject> loaded, |
| IgniteTxLocalAdapter tx0 |
| ) { |
| if (loaded.size() != loadKeys.size()) { |
| boolean needTouch = |
| tx0 == null || (!tx0.implicit() && tx0.isolation() == READ_COMMITTED); |
| |
| for (Map.Entry<KeyCacheObject, EntryGetResult> e : loadKeys.entrySet()) { |
| if (loaded.contains(e.getKey())) |
| continue; |
| |
| if (needTouch || e.getValue().reserved()) { |
| GridCacheEntryEx entry = peekEx(e.getKey()); |
| |
| if (entry != null) { |
| if (e.getValue().reserved()) |
| entry.clearReserveForLoad(e.getValue().version()); |
| |
| if (needTouch) |
| entry.touch(); |
| } |
| } |
| } |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public final V getAndPut(K key, V val) throws IgniteCheckedException { |
| return getAndPut(key, val, null); |
| } |
| |
| /** |
| * @param key Key. |
| * @param val Value. |
| * @param filter Optional filter. |
| * @return Previous value. |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Nullable public V getAndPut(final K key, final V val, @Nullable final CacheEntryPredicate filter) |
| throws IgniteCheckedException { |
| boolean statsEnabled = ctx.statisticsEnabled(); |
| boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled(); |
| |
| long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L; |
| |
| A.notNull(key, "key", val, "val"); |
| |
| V prevVal = getAndPut0(key, val, filter); |
| |
| if (statsEnabled) |
| metrics0().addPutAndGetTimeNanos(System.nanoTime() - start); |
| |
| if (performanceStatsEnabled) |
| writeStatistics(OperationType.CACHE_GET_AND_PUT, start); |
| |
| return prevVal; |
| } |
| |
| /** |
| * @param key Key. |
| * @param val Value. |
| * @param filter Optional filter. |
| * @return Previous value. |
| * @throws IgniteCheckedException If failed. |
| */ |
| protected V getAndPut0(final K key, final V val, @Nullable final CacheEntryPredicate filter) |
| throws IgniteCheckedException { |
| return syncOp(new SyncOp<V>(true) { |
| @Override public V op(GridNearTxLocal tx) throws IgniteCheckedException { |
| return (V)tx.putAsync(ctx, null, key, val, true, filter).get().value(); |
| } |
| |
| @Override public String toString() { |
| return S.toString("put", |
| "key", key, true, |
| "val", val, true, |
| "filter", filter, false); |
| } |
| }); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public final IgniteInternalFuture<V> getAndPutAsync(K key, V val) { |
| return getAndPutAsync(key, val, null); |
| } |
| |
| /** |
| * @param key Key. |
| * @param val Value. |
| * @param filter Filter. |
| * @return Put operation future. |
| */ |
| protected final IgniteInternalFuture<V> getAndPutAsync(K key, V val, @Nullable CacheEntryPredicate filter) { |
| final boolean statsEnabled = ctx.statisticsEnabled(); |
| final boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled(); |
| |
| final long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L; |
| |
| A.notNull(key, "key", val, "val"); |
| |
| IgniteInternalFuture<V> fut = getAndPutAsync0(key, val, filter); |
| |
| if (statsEnabled) |
| fut.listen(new UpdatePutAndGetTimeStatClosure<V>(metrics0(), start)); |
| |
| if (performanceStatsEnabled) |
| fut.listen(f -> writeStatistics(OperationType.CACHE_GET_AND_PUT, start)); |
| |
| return fut; |
| } |
| |
| /** |
| * @param key Key. |
| * @param val Value. |
| * @param filter Optional filter. |
| * @return Put operation future. |
| */ |
| public IgniteInternalFuture<V> getAndPutAsync0(final K key, |
| final V val, |
| @Nullable final CacheEntryPredicate filter) { |
| return asyncOp(new AsyncOp<V>() { |
| @Override public IgniteInternalFuture<V> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) { |
| return tx.putAsync(ctx, readyTopVer, key, val, true, filter) |
| .chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL); |
| } |
| |
| @Override public String toString() { |
| return S.toString("putAsync", |
| "key", key, true, |
| "val", val, true, |
| "filter", filter, false); |
| } |
| }); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public final boolean put(final K key, final V val) throws IgniteCheckedException { |
| return put(key, val, null); |
| } |
| |
| /** |
| * @param key Key. |
| * @param val Value. |
| * @param filter Filter. |
| * @return {@code True} if optional filter passed and value was stored in cache, |
| * {@code false} otherwise. Note that this method will return {@code true} if filter is not |
| * specified. |
| * @throws IgniteCheckedException If put operation failed. |
| */ |
| public boolean put(final K key, final V val, final CacheEntryPredicate filter) |
| throws IgniteCheckedException { |
| boolean statsEnabled = ctx.statisticsEnabled(); |
| boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled(); |
| |
| long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L; |
| |
| A.notNull(key, "key", val, "val"); |
| |
| boolean stored = put0(key, val, filter); |
| |
| if (statsEnabled && stored) |
| metrics0().addPutTimeNanos(System.nanoTime() - start); |
| |
| if (performanceStatsEnabled) |
| writeStatistics(OperationType.CACHE_PUT, start); |
| |
| return stored; |
| } |
| |
| /** |
| * @param key Key. |
| * @param val Value. |
| * @param filter Filter. |
| * @return {@code True} if optional filter passed and value was stored in cache, |
| * {@code false} otherwise. Note that this method will return {@code true} if filter is not |
| * specified. |
| * @throws IgniteCheckedException If put operation failed. |
| */ |
| protected boolean put0(final K key, final V val, final CacheEntryPredicate filter) |
| throws IgniteCheckedException { |
| Boolean res = syncOp(new SyncOp<Boolean>(true) { |
| @Override public Boolean op(GridNearTxLocal tx) throws IgniteCheckedException { |
| return tx.putAsync(ctx, null, key, val, false, filter).get().success(); |
| } |
| |
| @Override public String toString() { |
| return S.toString("putx", |
| "key", key, true, |
| "val", val, true, |
| "filter", filter, false); |
| } |
| }); |
| |
| assert res != null; |
| |
| return res; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void putAllConflict(final Map<KeyCacheObject, GridCacheDrInfo> drMap) |
| throws IgniteCheckedException { |
| if (F.isEmpty(drMap)) |
| return; |
| |
| ctx.dr().onReceiveCacheEntriesReceived(drMap.size()); |
| |
| syncOp(new SyncInOp(drMap.size() == 1) { |
| @Override public void inOp(GridNearTxLocal tx) throws IgniteCheckedException { |
| tx.putAllDrAsync(ctx, drMap).get(); |
| } |
| |
| @Override public String toString() { |
| return "putAllConflict [drMap=" + drMap + ']'; |
| } |
| }); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<?> putAllConflictAsync(final Map<KeyCacheObject, GridCacheDrInfo> drMap) |
| throws IgniteCheckedException { |
| if (F.isEmpty(drMap)) |
| return new GridFinishedFuture<Object>(); |
| |
| ctx.dr().onReceiveCacheEntriesReceived(drMap.size()); |
| |
| return asyncOp(new AsyncOp(drMap.keySet()) { |
| @Override public IgniteInternalFuture op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) { |
| return tx.putAllDrAsync(ctx, drMap); |
| } |
| |
| @Override public String toString() { |
| return "putAllConflictAsync [drMap=" + drMap + ']'; |
| } |
| }); |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public final <T> EntryProcessorResult<T> invoke(@Nullable AffinityTopologyVersion topVer, |
| K key, |
| EntryProcessor<K, V, T> entryProcessor, |
| Object... args) throws IgniteCheckedException { |
| return invoke0(topVer, key, entryProcessor, args); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public <T> EntryProcessorResult<T> invoke(final K key, |
| final EntryProcessor<K, V, T> entryProcessor, |
| final Object... args) throws IgniteCheckedException { |
| return invoke0(null, key, entryProcessor, args); |
| } |
| |
| /** |
| * @param topVer Locked topology version. |
| * @param key Key. |
| * @param entryProcessor Entry processor. |
| * @param args Entry processor arguments. |
| * @return Invoke result. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private <T> EntryProcessorResult<T> invoke0( |
| @Nullable final AffinityTopologyVersion topVer, |
| final K key, |
| final EntryProcessor<K, V, T> entryProcessor, |
| final Object... args) |
| throws IgniteCheckedException { |
| A.notNull(key, "key", entryProcessor, "entryProcessor"); |
| |
| return syncOp(new SyncOp<EntryProcessorResult<T>>(true) { |
| @Override public EntryProcessorResult<T> op(GridNearTxLocal tx) |
| throws IgniteCheckedException { |
| assert topVer == null || tx.implicit(); |
| |
| if (topVer != null) |
| tx.topologyVersion(topVer); |
| |
| final boolean statsEnabled = ctx.statisticsEnabled(); |
| final boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled(); |
| |
| long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L; |
| |
| IgniteInternalFuture<GridCacheReturn> fut = tx.invokeAsync(ctx, |
| null, |
| key, |
| (EntryProcessor<K, V, Object>)entryProcessor, |
| args); |
| |
| Map<K, EntryProcessorResult<T>> resMap = fut.get().value(); |
| |
| if (statsEnabled) |
| metrics0().addInvokeTimeNanos(System.nanoTime() - start); |
| |
| if (performanceStatsEnabled) |
| writeStatistics(OperationType.CACHE_INVOKE, start); |
| |
| EntryProcessorResult<T> res = null; |
| |
| if (resMap != null) { |
| assert resMap.isEmpty() || resMap.size() == 1 : resMap.size(); |
| |
| res = resMap.isEmpty() ? null : resMap.values().iterator().next(); |
| } |
| |
| return res != null ? res : new CacheInvokeResult(); |
| } |
| }); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll(final Set<? extends K> keys, |
| final EntryProcessor<K, V, T> entryProcessor, |
| final Object... args) throws IgniteCheckedException { |
| A.notNull(keys, "keys", entryProcessor, "entryProcessor"); |
| |
| warnIfUnordered(keys, BulkOperation.INVOKE); |
| |
| final boolean statsEnabled = ctx.statisticsEnabled(); |
| final boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled(); |
| |
| long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L; |
| |
| return syncOp(new SyncOp<Map<K, EntryProcessorResult<T>>>(keys.size() == 1) { |
| @Override public Map<K, EntryProcessorResult<T>> op(GridNearTxLocal tx) |
| throws IgniteCheckedException { |
| Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys, |
| new C1<K, EntryProcessor<K, V, Object>>() { |
| @Override public EntryProcessor apply(K k) { |
| return entryProcessor; |
| } |
| }); |
| |
| IgniteInternalFuture<GridCacheReturn> fut = tx.invokeAsync(ctx, null, invokeMap, args); |
| |
| Map<K, EntryProcessorResult<T>> res = fut.get().value(); |
| |
| if (statsEnabled) |
| metrics0().addInvokeTimeNanos(System.nanoTime() - start); |
| |
| if (performanceStatsEnabled) |
| writeStatistics(OperationType.CACHE_INVOKE_ALL, start); |
| |
| return res != null ? res : Collections.<K, EntryProcessorResult<T>>emptyMap(); |
| } |
| }); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public <T> IgniteInternalFuture<EntryProcessorResult<T>> invokeAsync( |
| final K key, |
| final EntryProcessor<K, V, T> entryProcessor, |
| final Object... args) |
| throws EntryProcessorException { |
| A.notNull(key, "key", entryProcessor, "entryProcessor"); |
| |
| final boolean statsEnabled = ctx.statisticsEnabled(); |
| final boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled(); |
| |
| long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L; |
| |
| IgniteInternalFuture<?> fut = asyncOp(new AsyncOp() { |
| @Override public IgniteInternalFuture op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) { |
| Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = |
| Collections.singletonMap(key, (EntryProcessor<K, V, Object>)entryProcessor); |
| |
| return tx.invokeAsync(ctx, readyTopVer, invokeMap, args); |
| } |
| |
| @Override public String toString() { |
| return S.toString("invokeAsync", |
| "key", key, true, |
| "entryProcessor", entryProcessor, false); |
| } |
| }); |
| |
| IgniteInternalFuture<GridCacheReturn> fut0 = (IgniteInternalFuture<GridCacheReturn>)fut; |
| |
| return fut0.chain(new CX1<IgniteInternalFuture<GridCacheReturn>, EntryProcessorResult<T>>() { |
| @Override public EntryProcessorResult<T> applyx(IgniteInternalFuture<GridCacheReturn> fut) |
| throws IgniteCheckedException { |
| GridCacheReturn ret = fut.get(); |
| |
| if (statsEnabled) |
| metrics0().addInvokeTimeNanos(System.nanoTime() - start); |
| |
| if (performanceStatsEnabled) |
| writeStatistics(OperationType.CACHE_INVOKE, start); |
| |
| Map<K, EntryProcessorResult<T>> resMap = ret.value(); |
| |
| if (resMap != null) { |
| assert resMap.isEmpty() || resMap.size() == 1 : resMap.size(); |
| |
| return resMap.isEmpty() ? new CacheInvokeResult<>() : resMap.values().iterator().next(); |
| } |
| |
| return new CacheInvokeResult<>(); |
| } |
| }); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public <T> IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync( |
| final Set<? extends K> keys, |
| final EntryProcessor<K, V, T> entryProcessor, |
| final Object... args) { |
| A.notNull(keys, "keys", entryProcessor, "entryProcessor"); |
| |
| warnIfUnordered(keys, BulkOperation.INVOKE); |
| |
| final boolean statsEnabled = ctx.statisticsEnabled(); |
| final boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled(); |
| |
| long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L; |
| |
| IgniteInternalFuture<?> fut = asyncOp(new AsyncOp(keys) { |
| @Override public IgniteInternalFuture<GridCacheReturn> op(GridNearTxLocal tx, |
| AffinityTopologyVersion readyTopVer) { |
| Map<? extends K, EntryProcessor<K, V, Object>> invokeMap = F.viewAsMap(keys, new C1<K, EntryProcessor<K, V, Object>>() { |
| @Override public EntryProcessor apply(K k) { |
| return entryProcessor; |
| } |
| }); |
| |
| return tx.invokeAsync(ctx, readyTopVer, invokeMap, args); |
| } |
| |
| @Override public String toString() { |
| return S.toString("invokeAllAsync", |
| "keys", keys, true, |
| "entryProcessor", entryProcessor, false); |
| } |
| }); |
| |
| IgniteInternalFuture<GridCacheReturn> fut0 = |
| (IgniteInternalFuture<GridCacheReturn>)fut; |
| |
| return fut0.chain(new CX1<IgniteInternalFuture<GridCacheReturn>, Map<K, EntryProcessorResult<T>>>() { |
| @Override public Map<K, EntryProcessorResult<T>> applyx(IgniteInternalFuture<GridCacheReturn> fut) |
| throws IgniteCheckedException { |
| GridCacheReturn ret = fut.get(); |
| |
| if (statsEnabled) |
| metrics0().addInvokeTimeNanos(System.nanoTime() - start); |
| |
| if (performanceStatsEnabled) |
| writeStatistics(OperationType.CACHE_INVOKE_ALL, start); |
| |
| assert ret != null; |
| |
| return ret.value() != null ? ret.value() : Collections.emptyMap(); |
| } |
| }); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public <T> IgniteInternalFuture<Map<K, EntryProcessorResult<T>>> invokeAllAsync( |
| final Map<? extends K, ? extends EntryProcessor<K, V, T>> map, |
| final Object... args) { |
| A.notNull(map, "map"); |
| |
| warnIfUnordered(map, BulkOperation.INVOKE); |
| |
| final boolean statsEnabled = ctx.statisticsEnabled(); |
| final boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled(); |
| |
| long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L; |
| |
| IgniteInternalFuture<?> fut = asyncOp(new AsyncOp(map.keySet()) { |
| @Override public IgniteInternalFuture<GridCacheReturn> op(GridNearTxLocal tx, |
| AffinityTopologyVersion readyTopVer) { |
| return tx.invokeAsync(ctx, |
| readyTopVer, |
| (Map<? extends K, ? extends EntryProcessor<K, V, Object>>)map, |
| args); |
| } |
| |
| @Override public String toString() { |
| return S.toString("invokeAllAsync", |
| "map", map, true); |
| } |
| }); |
| |
| IgniteInternalFuture<GridCacheReturn> fut0 = (IgniteInternalFuture<GridCacheReturn>)fut; |
| |
| return fut0.chain(new CX1<IgniteInternalFuture<GridCacheReturn>, Map<K, EntryProcessorResult<T>>>() { |
| @Override public Map<K, EntryProcessorResult<T>> applyx(IgniteInternalFuture<GridCacheReturn> fut) |
| throws IgniteCheckedException { |
| GridCacheReturn ret = fut.get(); |
| |
| if (statsEnabled) |
| metrics0().addInvokeTimeNanos(System.nanoTime() - start); |
| |
| if (performanceStatsEnabled) |
| writeStatistics(OperationType.CACHE_INVOKE_ALL, start); |
| |
| assert ret != null; |
| |
| return ret.value() != null |
| ? ret.<Map<K, EntryProcessorResult<T>>>value() |
| : Collections.<K, EntryProcessorResult<T>>emptyMap(); |
| } |
| }); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public <T> Map<K, EntryProcessorResult<T>> invokeAll( |
| final Map<? extends K, ? extends EntryProcessor<K, V, T>> map, |
| final Object... args) throws IgniteCheckedException { |
| A.notNull(map, "map"); |
| |
| warnIfUnordered(map, BulkOperation.INVOKE); |
| |
| final boolean statsEnabled = ctx.statisticsEnabled(); |
| final boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled(); |
| |
| long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L; |
| |
| return syncOp(new SyncOp<Map<K, EntryProcessorResult<T>>>(map.size() == 1) { |
| @Nullable @Override public Map<K, EntryProcessorResult<T>> op(GridNearTxLocal tx) |
| throws IgniteCheckedException { |
| IgniteInternalFuture<GridCacheReturn> fut = |
| tx.invokeAsync(ctx, null, (Map<? extends K, ? extends EntryProcessor<K, V, Object>>)map, args); |
| |
| Map<K, EntryProcessorResult<T>> value = fut.get().value(); |
| |
| if (statsEnabled) |
| metrics0().addInvokeTimeNanos(System.nanoTime() - start); |
| |
| if (performanceStatsEnabled) |
| writeStatistics(OperationType.CACHE_INVOKE_ALL, start); |
| |
| return value; |
| } |
| }); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public final IgniteInternalFuture<Boolean> putAsync(K key, V val) { |
| return putAsync(key, val, null); |
| } |
| |
| /** |
| * @param key Key. |
| * @param val Value. |
| * @param filter Filter. |
| * @return Put future. |
| */ |
| public final IgniteInternalFuture<Boolean> putAsync(K key, V val, @Nullable CacheEntryPredicate filter) { |
| A.notNull(key, "key", val, "val"); |
| |
| final boolean statsEnabled = ctx.statisticsEnabled(); |
| final boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled(); |
| |
| final long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L; |
| |
| IgniteInternalFuture<Boolean> fut = putAsync0(key, val, filter); |
| |
| if (statsEnabled) |
| fut.listen(new UpdatePutTimeStatClosure<Boolean>(metrics0(), start)); |
| |
| if (performanceStatsEnabled) |
| fut.listen(f -> writeStatistics(OperationType.CACHE_PUT, start)); |
| |
| return fut; |
| } |
| |
| /** |
| * @param key Key. |
| * @param val Value. |
| * @param filter Optional filter. |
| * @return Putx operation future. |
| */ |
| public IgniteInternalFuture<Boolean> putAsync0(final K key, final V val, |
| @Nullable final CacheEntryPredicate filter) { |
| return asyncOp(new AsyncOp<Boolean>() { |
| @Override public IgniteInternalFuture<Boolean> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) { |
| return tx.putAsync(ctx, |
| readyTopVer, |
| key, |
| val, |
| false, |
| filter).chain( |
| (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG); |
| } |
| |
| @Override public String toString() { |
| return S.toString("putxAsync", |
| "key", key, true, |
| "val", val, true, |
| "filter", filter, false); |
| } |
| }); |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public final V getAndPutIfAbsent(final K key, final V val) throws IgniteCheckedException { |
| return getAndPut(key, val, ctx.noVal()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public final IgniteInternalFuture<V> getAndPutIfAbsentAsync(final K key, final V val) { |
| return getAndPutAsync(key, val, ctx.noVal()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public final boolean putIfAbsent(final K key, final V val) throws IgniteCheckedException { |
| return put(key, val, ctx.noVal()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public final IgniteInternalFuture<Boolean> putIfAbsentAsync(final K key, final V val) { |
| return putAsync(key, val, ctx.noVal()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public final V getAndReplace(final K key, final V val) throws IgniteCheckedException { |
| return getAndPut(key, val, ctx.hasVal()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public final IgniteInternalFuture<V> getAndReplaceAsync(final K key, final V val) { |
| return getAndPutAsync(key, val, ctx.hasVal()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public final boolean replace(final K key, final V val) throws IgniteCheckedException { |
| return put(key, val, ctx.hasVal()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public final IgniteInternalFuture<Boolean> replaceAsync(final K key, final V val) { |
| return putAsync(key, val, ctx.hasVal()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public final boolean replace(final K key, final V oldVal, final V newVal) throws IgniteCheckedException { |
| A.notNull(oldVal, "oldVal"); |
| |
| return put(key, newVal, ctx.equalsVal(oldVal)); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<Boolean> replaceAsync(final K key, final V oldVal, final V newVal) { |
| A.notNull(oldVal, "oldVal"); |
| |
| return putAsync(key, newVal, ctx.equalsVal(oldVal)); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void putAll(@Nullable final Map<? extends K, ? extends V> m) throws IgniteCheckedException { |
| A.notNull(m, "map"); |
| |
| if (F.isEmpty(m)) |
| return; |
| |
| boolean statsEnabled = ctx.statisticsEnabled(); |
| boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled(); |
| |
| long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L; |
| |
| warnIfUnordered(m, BulkOperation.PUT); |
| |
| putAll0(m); |
| |
| if (statsEnabled) |
| metrics0().addPutAllTimeNanos(System.nanoTime() - start); |
| |
| if (performanceStatsEnabled) |
| writeStatistics(OperationType.CACHE_PUT_ALL, start); |
| } |
| |
| /** |
| * @param m Map. |
| * @throws IgniteCheckedException If failed. |
| */ |
| protected void putAll0(final Map<? extends K, ? extends V> m) throws IgniteCheckedException { |
| syncOp(new SyncInOp(m.size() == 1) { |
| @Override public void inOp(GridNearTxLocal tx) throws IgniteCheckedException { |
| tx.putAllAsync(ctx, null, m, false).get(); |
| } |
| |
| @Override public String toString() { |
| return S.toString("putAll", |
| "map", m, true); |
| } |
| }); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<?> putAllAsync(final Map<? extends K, ? extends V> m) { |
| if (F.isEmpty(m)) |
| return new GridFinishedFuture<Object>(); |
| |
| boolean statsEnabled = ctx.statisticsEnabled(); |
| boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled(); |
| |
| long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L; |
| |
| warnIfUnordered(m, BulkOperation.PUT); |
| |
| IgniteInternalFuture<?> fut = putAllAsync0(m); |
| |
| if (statsEnabled) |
| fut.listen(new UpdatePutAllTimeStatClosure<>(metrics0(), start)); |
| |
| if (performanceStatsEnabled) |
| fut.listen(f -> writeStatistics(OperationType.CACHE_PUT_ALL, start)); |
| |
| return fut; |
| } |
| |
| /** |
| * @param m Map. |
| * @return Future. |
| */ |
| protected IgniteInternalFuture<?> putAllAsync0(final Map<? extends K, ? extends V> m) { |
| return asyncOp(new AsyncOp(m.keySet()) { |
| @Override public IgniteInternalFuture<?> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) { |
| return tx.putAllAsync(ctx, |
| readyTopVer, |
| m, |
| false).chain(RET2NULL); |
| } |
| |
| @Override public String toString() { |
| return S.toString("putAllAsync", |
| "map", m, true); |
| } |
| }); |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public V getAndRemove(final K key) throws IgniteCheckedException { |
| boolean statsEnabled = ctx.statisticsEnabled(); |
| boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled(); |
| |
| long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L; |
| |
| A.notNull(key, "key"); |
| |
| V prevVal = getAndRemove0(key); |
| |
| if (statsEnabled) |
| metrics0().addRemoveAndGetTimeNanos(System.nanoTime() - start); |
| |
| if (performanceStatsEnabled) |
| writeStatistics(OperationType.CACHE_GET_AND_REMOVE, start); |
| |
| return prevVal; |
| } |
| |
| /** |
| * @param key Key. |
| * @return Previous value. |
| * @throws IgniteCheckedException If failed. |
| */ |
| protected V getAndRemove0(final K key) throws IgniteCheckedException { |
| final boolean keepBinary = ctx.keepBinary(); |
| |
| return syncOp(new SyncOp<V>(true) { |
| @Override public V op(GridNearTxLocal tx) throws IgniteCheckedException { |
| K key0 = keepBinary ? (K)ctx.toCacheKeyObject(key) : key; |
| |
| IgniteInternalFuture<GridCacheReturn> fut = tx.removeAllAsync(ctx, |
| null, |
| Collections.singletonList(key0), |
| /*retval*/true, |
| null, |
| /*singleRmv*/false); |
| |
| V ret = fut.get().value(); |
| |
| if (ctx.config().getInterceptor() != null) { |
| K key = keepBinary ? (K)ctx.unwrapBinaryIfNeeded(key0, true, false, null) : key0; |
| |
| return (V)ctx.config().getInterceptor().onBeforeRemove(new CacheEntryImpl(key, ret)).get2(); |
| } |
| |
| return ret; |
| } |
| |
| @Override public String toString() { |
| return S.toString("remove", |
| "key", key, true); |
| } |
| }); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<V> getAndRemoveAsync(final K key) { |
| final boolean statsEnabled = ctx.statisticsEnabled(); |
| final boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled(); |
| |
| final long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L; |
| |
| A.notNull(key, "key"); |
| |
| IgniteInternalFuture<V> fut = getAndRemoveAsync0(key); |
| |
| if (statsEnabled) |
| fut.listen(new UpdateGetAndRemoveTimeStatClosure<V>(metrics0(), start)); |
| |
| if (performanceStatsEnabled) |
| fut.listen(f -> writeStatistics(OperationType.CACHE_GET_AND_REMOVE, start)); |
| |
| return fut; |
| } |
| |
| /** |
| * @param key Key. |
| * @return Future. |
| */ |
| protected IgniteInternalFuture<V> getAndRemoveAsync0(final K key) { |
| return asyncOp(new AsyncOp<V>() { |
| @Override public IgniteInternalFuture<V> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) { |
| // TODO should we invoke interceptor here? |
| return tx.removeAllAsync(ctx, |
| readyTopVer, |
| Collections.singletonList(key), |
| /*retval*/true, |
| null, |
| /*singleRmv*/false).chain((IgniteClosure<IgniteInternalFuture<GridCacheReturn>, V>)RET2VAL); |
| } |
| |
| @Override public String toString() { |
| return S.toString("removeAsync", |
| "key", key, true); |
| } |
| }); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void removeAll() throws IgniteCheckedException { |
| assert ctx.isLocal(); |
| |
| // We do batch and recreate cursor because removing using a single cursor |
| // will cause it to reinitialize on each merged page. |
| List<K> keys = new ArrayList<>(Math.min(REMOVE_ALL_KEYS_BATCH, size())); |
| |
| do { |
| Iterator<CacheDataRow> it = ctx.offheap().cacheIterator(ctx.cacheId(), |
| true, true, null, null, null); |
| |
| while (it.hasNext() && keys.size() < REMOVE_ALL_KEYS_BATCH) |
| keys.add((K)it.next().key()); |
| |
| removeAll(keys); |
| |
| keys.clear(); |
| } |
| while (!isEmpty()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void removeAll(final Collection<? extends K> keys) throws IgniteCheckedException { |
| boolean statsEnabled = ctx.statisticsEnabled(); |
| boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled(); |
| |
| long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L; |
| |
| A.notNull(keys, "keys"); |
| |
| if (F.isEmpty(keys)) |
| return; |
| |
| warnIfUnordered(keys, BulkOperation.REMOVE); |
| |
| removeAll0(keys); |
| |
| if (statsEnabled) |
| metrics0().addRemoveAllTimeNanos(System.nanoTime() - start); |
| |
| if (performanceStatsEnabled) |
| writeStatistics(OperationType.CACHE_REMOVE_ALL, start); |
| } |
| |
| /** |
| * @param keys Keys. |
| * @throws IgniteCheckedException If failed. |
| */ |
| protected void removeAll0(final Collection<? extends K> keys) throws IgniteCheckedException { |
| syncOp(new SyncInOp(keys.size() == 1) { |
| @Override public void inOp(GridNearTxLocal tx) throws IgniteCheckedException { |
| tx.removeAllAsync(ctx, |
| null, |
| keys, |
| /*retval*/false, |
| null, |
| /*singleRmv*/false).get(); |
| } |
| |
| @Override public String toString() { |
| return S.toString("removeAll", |
| "keys", keys, true); |
| } |
| }); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<?> removeAllAsync(@Nullable final Collection<? extends K> keys) { |
| if (F.isEmpty(keys)) |
| return new GridFinishedFuture<Object>(); |
| |
| final boolean statsEnabled = ctx.statisticsEnabled(); |
| final boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled(); |
| |
| final long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L; |
| |
| warnIfUnordered(keys, BulkOperation.REMOVE); |
| |
| IgniteInternalFuture<Object> fut = removeAllAsync0(keys); |
| |
| if (statsEnabled) |
| fut.listen(new UpdateRemoveAllTimeStatClosure<>(metrics0(), start)); |
| |
| if (performanceStatsEnabled) |
| fut.listen(f -> writeStatistics(OperationType.CACHE_REMOVE_ALL, start)); |
| |
| return fut; |
| } |
| |
| /** |
| * @param keys Keys. |
| * @return Future. |
| */ |
| protected IgniteInternalFuture<Object> removeAllAsync0(final Collection<? extends K> keys) { |
| return asyncOp(new AsyncOp(keys) { |
| @Override public IgniteInternalFuture<?> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) { |
| return tx.removeAllAsync(ctx, |
| readyTopVer, |
| keys, |
| /*retval*/false, |
| null, |
| /*singleRmv*/false).chain(RET2NULL); |
| } |
| |
| @Override public String toString() { |
| return S.toString("removeAllAsync", |
| "keys", keys, true); |
| } |
| }); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean remove(final K key) throws IgniteCheckedException { |
| return remove(key, (CacheEntryPredicate)null); |
| } |
| |
| /** |
| * @param key Key. |
| * @param filter Filter. |
| * @return {@code True} if entry was removed. |
| * @throws IgniteCheckedException If failed. |
| */ |
| public boolean remove(final K key, @Nullable CacheEntryPredicate filter) throws IgniteCheckedException { |
| boolean statsEnabled = ctx.statisticsEnabled(); |
| boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled(); |
| |
| long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L; |
| |
| A.notNull(key, "key"); |
| |
| boolean rmv = remove0(key, filter); |
| |
| if (statsEnabled && rmv) |
| metrics0().addRemoveTimeNanos(System.nanoTime() - start); |
| |
| if (performanceStatsEnabled) |
| writeStatistics(OperationType.CACHE_REMOVE, start); |
| |
| return rmv; |
| } |
| |
| /** |
| * @param key Key. |
| * @param filter Filter. |
| * @return {@code True} if entry was removed. |
| * @throws IgniteCheckedException If failed. |
| */ |
| protected boolean remove0(final K key, final CacheEntryPredicate filter) throws IgniteCheckedException { |
| Boolean res = syncOp(new SyncOp<Boolean>(true) { |
| @Override public Boolean op(GridNearTxLocal tx) throws IgniteCheckedException { |
| return tx.removeAllAsync(ctx, |
| null, |
| Collections.singletonList(key), |
| /*retval*/false, |
| filter, |
| /*singleRmv*/filter == null).get().success(); |
| } |
| |
| @Override public String toString() { |
| return S.toString("removex", |
| "key", key, true); |
| } |
| }); |
| |
| assert res != null; |
| |
| return res; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<Boolean> removeAsync(K key) { |
| A.notNull(key, "key"); |
| |
| return removeAsync(key, (CacheEntryPredicate)null); |
| } |
| |
| /** |
| * @param key Key to remove. |
| * @param filter Optional filter. |
| * @return Putx operation future. |
| */ |
| public IgniteInternalFuture<Boolean> removeAsync(final K key, @Nullable final CacheEntryPredicate filter) { |
| final boolean statsEnabled = ctx.statisticsEnabled(); |
| final boolean performanceStatsEnabled = ctx.kernalContext().performanceStatistics().enabled(); |
| |
| final long start = statsEnabled || performanceStatsEnabled ? System.nanoTime() : 0L; |
| |
| A.notNull(key, "key"); |
| |
| IgniteInternalFuture<Boolean> fut = removeAsync0(key, filter); |
| |
| if (statsEnabled) |
| fut.listen(new UpdateRemoveTimeStatClosure(metrics0(), start)); |
| |
| if (performanceStatsEnabled) |
| fut.listen(f -> writeStatistics(OperationType.CACHE_REMOVE, start)); |
| |
| return fut; |
| } |
| |
| /** |
| * @param key Key. |
| * @param filter Filter. |
| * @return Future. |
| */ |
| protected IgniteInternalFuture<Boolean> removeAsync0(final K key, @Nullable final CacheEntryPredicate filter) { |
| return asyncOp(new AsyncOp<Boolean>() { |
| @Override public IgniteInternalFuture<Boolean> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) { |
| return tx.removeAllAsync(ctx, |
| readyTopVer, |
| Collections.singletonList(key), |
| /*retval*/false, |
| filter, |
| /*singleRmv*/true).chain( |
| (IgniteClosure<IgniteInternalFuture<GridCacheReturn>, Boolean>)RET2FLAG); |
| } |
| |
| @Override public String toString() { |
| return S.toString("removeAsync", |
| "key", key, true, |
| "filter", filter, false); |
| } |
| }); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void removeAllConflict(final Map<KeyCacheObject, GridCacheVersion> drMap) |
| throws IgniteCheckedException { |
| if (F.isEmpty(drMap)) |
| return; |
| |
| ctx.dr().onReceiveCacheEntriesReceived(drMap.size()); |
| |
| syncOp(new SyncInOp(false) { |
| @Override public void inOp(GridNearTxLocal tx) throws IgniteCheckedException { |
| tx.removeAllDrAsync(ctx, drMap).get(); |
| } |
| |
| @Override public String toString() { |
| return "removeAllConflict [drMap=" + drMap + ']'; |
| } |
| }); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<?> removeAllConflictAsync(final Map<KeyCacheObject, GridCacheVersion> drMap) |
| throws IgniteCheckedException { |
| if (F.isEmpty(drMap)) |
| return new GridFinishedFuture<Object>(); |
| |
| ctx.dr().onReceiveCacheEntriesReceived(drMap.size()); |
| |
| return asyncOp(new AsyncOp(drMap.keySet()) { |
| @Override public IgniteInternalFuture<?> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer) { |
| return tx.removeAllDrAsync(ctx, drMap); |
| } |
| |
| @Override public String toString() { |
| return "removeAllDrASync [drMap=" + drMap + ']'; |
| } |
| }); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public final boolean remove(final K key, final V val) throws IgniteCheckedException { |
| A.notNull(val, "val"); |
| |
| return remove(key, ctx.equalsVal(val)); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public final IgniteInternalFuture<Boolean> removeAsync(final K key, final V val) { |
| A.notNull(key, "val"); |
| |
| return removeAsync(key, ctx.equalsVal(val)); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public final CacheMetrics clusterMetrics() { |
| return clusterMetrics(ctx.grid().cluster().forDataNodes(ctx.name())); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public CacheMetrics clusterMetrics(ClusterGroup grp) { |
| List<CacheMetrics> metrics = new ArrayList<>(grp.nodes().size()); |
| |
| for (ClusterNode node : grp.nodes()) { |
| Map<Integer, CacheMetrics> nodeCacheMetrics = ((IgniteClusterNode)node).cacheMetrics(); |
| |
| if (nodeCacheMetrics != null) { |
| CacheMetrics e = nodeCacheMetrics.get(context().cacheId()); |
| |
| if (e != null) |
| metrics.add(e); |
| } |
| } |
| |
| return isCacheMetricsV2Supported() ? new CacheMetricsSnapshotV2(ctx.cache().localMetrics(), metrics) : |
| new CacheMetricsSnapshot(ctx.cache().localMetrics(), metrics); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public CacheMetrics localMetrics() { |
| return isCacheMetricsV2Supported() ? new CacheMetricsSnapshotV2(metrics) : |
| new CacheMetricsSnapshot(metrics); |
| } |
| |
| /** |
| * @return checks cluster server nodes version is compatible with Cache Metrics V2 |
| */ |
| private boolean isCacheMetricsV2Supported() { |
| Collection<ClusterNode> nodes = ctx.discovery().allNodes(); |
| |
| return IgniteFeatures.allNodesSupports(nodes, IgniteFeatures.CACHE_METRICS_V2); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public CacheMetricsMXBean localMxBean() { |
| return locMxBean; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public CacheMetricsMXBean clusterMxBean() { |
| return clusterMxBean; |
| } |
| |
| /** |
| * @return Metrics. |
| */ |
| public CacheMetricsImpl metrics0() { |
| return metrics; |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public GridNearTxLocal tx() { |
| return ctx.tm().threadLocalTx(ctx); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean lock(K key, long timeout) throws IgniteCheckedException { |
| A.notNull(key, "key"); |
| |
| return lockAll(Collections.singletonList(key), timeout); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean lockAll(@Nullable Collection<? extends K> keys, long timeout) |
| throws IgniteCheckedException { |
| if (F.isEmpty(keys)) |
| return true; |
| |
| //TODO: IGNITE-9324: add explicit locks support. |
| MvccUtils.verifyMvccOperationSupport(ctx, "Lock"); |
| |
| IgniteInternalFuture<Boolean> fut = lockAllAsync(keys, timeout); |
| |
| boolean isInterrupted = false; |
| |
| try { |
| IgniteCheckedException e = null; |
| |
| while (!ctx.shared().kernalContext().isStopping()) { |
| try { |
| return fut.get(); |
| } |
| catch (IgniteInterruptedCheckedException ex) { |
| // Interrupted status of current thread was cleared, retry to get lock. |
| isInterrupted = true; |
| |
| e = ex; |
| } |
| } |
| |
| try { |
| fut.cancel(); |
| } |
| catch (IgniteCheckedException ex) { |
| if (e != null) |
| ex.addSuppressed(e); |
| |
| e = ex; |
| } |
| |
| throw new NodeStoppingException(e); |
| } |
| finally { |
| if (isInterrupted) |
| Thread.currentThread().interrupt(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<Boolean> lockAsync(K key, long timeout) { |
| A.notNull(key, "key"); |
| |
| //TODO: IGNITE-9324: add explicit locks support. |
| MvccUtils.verifyMvccOperationSupport(ctx, "Lock"); |
| |
| return lockAllAsync(Collections.singletonList(key), timeout); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void unlock(K key) |
| throws IgniteCheckedException { |
| A.notNull(key, "key"); |
| |
| unlockAll(Collections.singletonList(key)); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean isLocked(K key) { |
| A.notNull(key, "key"); |
| |
| KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); |
| |
| while (true) { |
| try { |
| GridCacheEntryEx entry = peekEx(cacheKey); |
| |
| return entry != null && entry.lockedByAny(); |
| } |
| catch (GridCacheEntryRemovedException ignore) { |
| // No-op. |
| } |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean isLockedByThread(K key) { |
| A.notNull(key, "key"); |
| |
| try { |
| KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); |
| |
| GridCacheEntryEx e = entry0(cacheKey, |
| ctx.shared().exchange().readyAffinityVersion(), |
| false, |
| false); |
| |
| if (e == null) |
| return false; |
| |
| // Delegate to near if dht. |
| if (e.isDht() && CU.isNearEnabled(ctx)) { |
| IgniteInternalCache<K, V> near = ctx.isDht() ? ctx.dht().near() : ctx.near(); |
| |
| return near.isLockedByThread(key) || e.lockedByThread(); |
| } |
| |
| return e.lockedByThread(); |
| } |
| catch (GridCacheEntryRemovedException ignore) { |
| return false; |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Transaction txStart(TransactionConcurrency concurrency, TransactionIsolation isolation) { |
| A.notNull(concurrency, "concurrency"); |
| A.notNull(isolation, "isolation"); |
| |
| TransactionConfiguration cfg = CU.transactionConfiguration(ctx, ctx.kernalContext().config()); |
| |
| return txStart( |
| concurrency, |
| isolation, |
| cfg.getDefaultTxTimeout(), |
| 0 |
| ); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridNearTxLocal txStartEx(TransactionConcurrency concurrency, TransactionIsolation isolation) { |
| IgniteTransactionsEx txs = ctx.kernalContext().cache().transactions(); |
| |
| return txs.txStartEx(ctx, concurrency, isolation); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Transaction txStart(TransactionConcurrency concurrency, |
| TransactionIsolation isolation, long timeout, int txSize) throws IllegalStateException { |
| IgniteTransactionsEx txs = ctx.kernalContext().cache().transactions(); |
| |
| return txs.txStartEx(ctx, concurrency, isolation, timeout, txSize).proxy(); |
| } |
| |
| /** |
| * Checks if cache is working in JTA transaction and enlist cache as XAResource if necessary. |
| * |
| * @throws IgniteCheckedException In case of error. |
| */ |
| protected void checkJta() throws IgniteCheckedException { |
| ctx.jta().checkJta(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void localLoadCache(final IgniteBiPredicate<K, V> p, Object[] args) |
| throws IgniteCheckedException { |
| //TODO IGNITE-7954 |
| MvccUtils.verifyMvccOperationSupport(ctx, "Load"); |
| |
| final boolean replicate = ctx.isDrEnabled(); |
| final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); |
| |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| ExpiryPolicy plc0 = opCtx != null ? opCtx.expiry() : null; |
| |
| final ExpiryPolicy plc = plc0 != null ? plc0 : ctx.expiry(); |
| |
| final boolean keepBinary = opCtx != null && opCtx.isKeepBinary(); |
| |
| if (p != null) |
| ctx.kernalContext().resource().injectGeneric(p); |
| |
| try { |
| if (ctx.store().isLocal()) { |
| DataStreamerImpl ldr = ctx.kernalContext().dataStream().dataStreamer(ctx.name()); |
| |
| try { |
| ldr.skipStore(true); |
| |
| ldr.receiver(new IgniteDrDataStreamerCacheUpdater()); |
| |
| ldr.keepBinary(keepBinary); |
| |
| LocalStoreLoadClosure c = new LocalStoreLoadClosure(p, ldr, plc); |
| |
| ctx.store().loadCache(c, args); |
| |
| c.onDone(); |
| } |
| finally { |
| ldr.closeEx(false); |
| } |
| } |
| else { |
| // Version for all loaded entries. |
| final GridCacheVersion ver0 = ctx.versions().nextForLoad(); |
| |
| ctx.store().loadCache(new CIX3<KeyCacheObject, Object, GridCacheVersion>() { |
| @Override public void applyx(KeyCacheObject key, Object val, @Nullable GridCacheVersion ver) |
| throws IgniteException { |
| assert ver == null; |
| |
| long ttl = CU.ttlForLoad(plc); |
| |
| if (ttl == CU.TTL_ZERO) |
| return; |
| |
| loadEntry(key, val, ver0, (IgniteBiPredicate<Object, Object>)p, topVer, replicate, ttl); |
| } |
| }, args); |
| } |
| } |
| finally { |
| if (p instanceof PlatformCacheEntryFilter) |
| ((PlatformCacheEntryFilter)p).onClose(); |
| } |
| } |
| |
| /** |
| * @param key Key. |
| * @param val Value. |
| * @param ver Cache version. |
| * @param p Optional predicate. |
| * @param topVer Topology version. |
| * @param replicate Replication flag. |
| * @param ttl TTL. |
| */ |
| private void loadEntry(KeyCacheObject key, |
| Object val, |
| GridCacheVersion ver, |
| @Nullable IgniteBiPredicate<Object, Object> p, |
| AffinityTopologyVersion topVer, |
| boolean replicate, |
| long ttl) { |
| if (p != null && !p.apply(key.value(ctx.cacheObjectContext(), false), val)) |
| return; |
| |
| CacheObject cacheVal = ctx.toCacheObject(val); |
| |
| GridCacheEntryEx entry = entryEx(key); |
| |
| try { |
| entry.initialValue(cacheVal, |
| ver, |
| ttl, |
| CU.EXPIRE_TIME_CALCULATE, |
| false, |
| topVer, |
| replicate ? DR_LOAD : DR_NONE, |
| true, |
| false); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException("Failed to put cache value: " + entry, e); |
| } |
| catch (GridCacheEntryRemovedException ignore) { |
| if (log.isDebugEnabled()) |
| log.debug("Got removed entry during loadCache (will ignore): " + entry); |
| } |
| finally { |
| entry.touch(); |
| } |
| |
| CU.unwindEvicts(ctx); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<?> localLoadCacheAsync(final IgniteBiPredicate<K, V> p, |
| final Object[] args) { |
| return ctx.closures().callLocalSafe( |
| ctx.projectSafe(new GridPlainCallable<Object>() { |
| @Nullable @Override public Object call() throws IgniteCheckedException { |
| localLoadCache(p, args); |
| |
| return null; |
| } |
| }), true); |
| } |
| |
| /** |
| * @param keys Keys. |
| * @param replaceExisting Replace existing values flag. |
| * @return Load future. |
| */ |
| public IgniteInternalFuture<?> loadAll( |
| final Set<? extends K> keys, |
| boolean replaceExisting |
| ) { |
| A.notNull(keys, "keys"); |
| |
| for (Object key : keys) |
| A.notNull(key, "key"); |
| |
| //TODO IGNITE-7954 |
| MvccUtils.verifyMvccOperationSupport(ctx, "Load"); |
| |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| ExpiryPolicy plc = opCtx != null ? opCtx.expiry() : null; |
| |
| final boolean keepBinary = opCtx != null && opCtx.isKeepBinary(); |
| |
| return runLoadKeysCallable(keys, plc, keepBinary, replaceExisting); |
| } |
| |
| /** |
| * Run load keys callable on appropriate nodes. |
| * |
| * @param keys Keys. |
| * @param plc Expiry policy. |
| * @param keepBinary Keep binary flag. |
| * @return Operation future. |
| */ |
| private IgniteInternalFuture<?> runLoadKeysCallable(final Set<? extends K> keys, final ExpiryPolicy plc, |
| final boolean keepBinary, final boolean update) { |
| Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes(); |
| |
| if (nodes.isEmpty()) |
| return new GridFinishedFuture<>(); |
| |
| return ctx.closures().callAsyncNoFailover(BROADCAST, |
| new LoadKeysCallable<>(ctx.name(), keys, update, plc, keepBinary), |
| nodes, |
| true, |
| 0, |
| false); |
| } |
| |
| /** |
| * @param keys Keys. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private void localLoadAndUpdate(final Collection<? extends K> keys) throws IgniteCheckedException { |
| try (final DataStreamerImpl<KeyCacheObject, CacheObject> ldr = |
| ctx.kernalContext().<KeyCacheObject, CacheObject>dataStream().dataStreamer(ctx.name())) { |
| ldr.allowOverwrite(true); |
| ldr.skipStore(true); |
| |
| final Collection<DataStreamerEntry> col = new ArrayList<>(ldr.perNodeBufferSize()); |
| |
| Collection<KeyCacheObject> keys0 = ctx.cacheKeysView(keys); |
| |
| ctx.store().loadAll(null, keys0, new CIX2<KeyCacheObject, Object>() { |
| @Override public void applyx(KeyCacheObject key, Object val) { |
| col.add(new DataStreamerEntry(key, ctx.toCacheObject(val))); |
| |
| if (col.size() == ldr.perNodeBufferSize()) { |
| ldr.addDataInternal(col, false); |
| |
| col.clear(); |
| } |
| } |
| }); |
| |
| if (!col.isEmpty()) |
| ldr.addData(col); |
| } |
| } |
| |
| /** |
| * @param keys Keys to load. |
| * @param plc Optional expiry policy. |
| * @throws IgniteCheckedException If failed. |
| */ |
| public void localLoad(Collection<? extends K> keys, @Nullable ExpiryPolicy plc, final boolean keepBinary) |
| throws IgniteCheckedException { |
| //TODO IGNITE-7954 |
| MvccUtils.verifyMvccOperationSupport(ctx, "Load"); |
| |
| final boolean replicate = ctx.isDrEnabled(); |
| final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); |
| |
| final ExpiryPolicy plc0 = plc != null ? plc : ctx.expiry(); |
| |
| Collection<KeyCacheObject> keys0 = ctx.cacheKeysView(keys); |
| |
| if (ctx.store().isLocal()) { |
| DataStreamerImpl ldr = ctx.kernalContext().dataStream().dataStreamer(ctx.name()); |
| |
| try { |
| ldr.skipStore(true); |
| |
| ldr.keepBinary(keepBinary); |
| |
| ldr.receiver(new IgniteDrDataStreamerCacheUpdater()); |
| |
| LocalStoreLoadClosure c = new LocalStoreLoadClosure(null, ldr, plc0); |
| |
| ctx.store().localStoreLoadAll(null, keys0, c); |
| |
| c.onDone(); |
| } |
| finally { |
| ldr.closeEx(false); |
| } |
| } |
| else { |
| // Version for all loaded entries. |
| final GridCacheVersion ver0 = ctx.versions().nextForLoad(); |
| |
| ctx.store().loadAll(null, keys0, new CI2<KeyCacheObject, Object>() { |
| @Override public void apply(KeyCacheObject key, Object val) { |
| long ttl = CU.ttlForLoad(plc0); |
| |
| if (ttl == CU.TTL_ZERO) |
| return; |
| |
| loadEntry(key, val, ver0, null, topVer, replicate, ttl); |
| } |
| }); |
| } |
| } |
| |
| /** |
| * @param p Predicate. |
| * @param args Arguments. |
| * @throws IgniteCheckedException If failed. |
| */ |
| void globalLoadCache(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) throws IgniteCheckedException { |
| globalLoadCacheAsync(p, args).get(); |
| } |
| |
| /** |
| * @param p Predicate. |
| * @param args Arguments. |
| * @return Load cache future. |
| * @throws IgniteCheckedException If failed. |
| */ |
| IgniteInternalFuture<?> globalLoadCacheAsync(@Nullable IgniteBiPredicate<K, V> p, @Nullable Object... args) |
| throws IgniteCheckedException { |
| |
| ctx.kernalContext().task().setThreadContext(TC_NO_FAILOVER, true); |
| |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| ExpiryPolicy plc = opCtx != null ? opCtx.expiry() : null; |
| |
| Collection<ClusterNode> nodes = ctx.kernalContext().grid().cluster().forDataNodes(ctx.name()).nodes(); |
| |
| assert !F.isEmpty(nodes) : "There are not datanodes fo cache: " + ctx.name(); |
| |
| //TODO IGNITE-7954 |
| MvccUtils.verifyMvccOperationSupport(ctx, "Load"); |
| |
| final boolean keepBinary = opCtx != null && opCtx.isKeepBinary(); |
| |
| ComputeTaskInternalFuture fut = ctx.kernalContext().closure().callAsync(BROADCAST, |
| Collections.singletonList( |
| new LoadCacheJobV2<>(ctx.name(), ctx.affinity().affinityTopologyVersion(), p, args, plc, keepBinary)), |
| nodes); |
| |
| return fut; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int size(CachePeekMode[] peekModes) throws IgniteCheckedException { |
| if (isLocal()) |
| return localSize(peekModes); |
| |
| return sizeAsync(peekModes).get(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long sizeLong(CachePeekMode[] peekModes) throws IgniteCheckedException { |
| if (isLocal()) |
| return localSizeLong(peekModes); |
| |
| return sizeLongAsync(peekModes).get(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long sizeLong(int partition, CachePeekMode[] peekModes) throws IgniteCheckedException { |
| if (isLocal()) |
| return localSizeLong(partition, peekModes); |
| |
| return sizeLongAsync(partition, peekModes).get(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<Integer> sizeAsync(final CachePeekMode[] peekModes) { |
| assert peekModes != null; |
| |
| PeekModes modes = parsePeekModes(peekModes, true); |
| |
| IgniteClusterEx cluster = ctx.grid().cluster(); |
| |
| ClusterGroup grp = modes.near ? cluster.forCacheNodes(name(), true, true, false) : cluster.forDataNodes(name()); |
| |
| Collection<ClusterNode> nodes = new ArrayList<>(grp.nodes()); |
| |
| if (nodes.isEmpty()) |
| return new GridFinishedFuture<>(0); |
| |
| ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes); |
| |
| return ctx.kernalContext().task().execute( |
| new SizeTask(ctx.name(), ctx.affinity().affinityTopologyVersion(), peekModes), null); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<Long> sizeLongAsync(final CachePeekMode[] peekModes) { |
| assert peekModes != null; |
| |
| PeekModes modes = parsePeekModes(peekModes, true); |
| |
| IgniteClusterEx cluster = ctx.grid().cluster(); |
| |
| ClusterGroup grp = modes.near ? cluster.forCacheNodes(name(), true, true, false) : cluster.forDataNodes(name()); |
| |
| Collection<ClusterNode> nodes = new ArrayList<>(grp.nodes()); |
| |
| if (nodes.isEmpty()) |
| return new GridFinishedFuture<>(0L); |
| |
| ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes); |
| |
| return ctx.kernalContext().task().execute( |
| new SizeLongTask(ctx.name(), ctx.affinity().affinityTopologyVersion(), peekModes), null); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<Long> sizeLongAsync(final int part, final CachePeekMode[] peekModes) { |
| assert peekModes != null; |
| |
| final PeekModes modes = parsePeekModes(peekModes, true); |
| |
| IgniteClusterEx cluster = ctx.grid().cluster(); |
| final GridCacheAffinityManager aff = ctx.affinity(); |
| final AffinityTopologyVersion topVer = aff.affinityTopologyVersion(); |
| |
| ClusterGroup grp = cluster.forDataNodes(name()); |
| |
| Collection<ClusterNode> nodes = new ArrayList<>(grp.forPredicate(new IgnitePredicate<ClusterNode>() { |
| /** {@inheritDoc} */ |
| @Override public boolean apply(ClusterNode clusterNode) { |
| return ((modes.primary && aff.primaryByPartition(clusterNode, part, topVer)) || |
| (modes.backup && aff.backupByPartition(clusterNode, part, topVer))); |
| } |
| }).nodes()); |
| |
| if (nodes.isEmpty()) |
| return new GridFinishedFuture<>(0L); |
| |
| ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes); |
| |
| return ctx.kernalContext().task().execute( |
| new PartitionSizeLongTask(ctx.name(), ctx.affinity().affinityTopologyVersion(), peekModes, part), null); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int localSize(CachePeekMode[] peekModes) throws IgniteCheckedException { |
| return (int)localSizeLong(peekModes); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int size() { |
| return map.publicSize(ctx.cacheId()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long sizeLong() { |
| return map.publicSize(ctx.cacheId()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int nearSize() { |
| return 0; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int primarySize() { |
| return map.publicSize(ctx.cacheId()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long primarySizeLong() { |
| return map.publicSize(ctx.cacheId()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(GridCacheAdapter.class, this, "name", name(), "size", size()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Iterator<Cache.Entry<K, V>> iterator() { |
| return entrySet().iterator(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Iterator<Cache.Entry<K, V>> scanIterator(boolean keepBinary, |
| @Nullable IgniteBiPredicate<Object, Object> p) |
| throws IgniteCheckedException { |
| return igniteIterator(keepBinary, p); |
| } |
| |
| /** |
| * @return Distributed ignite cache iterator. |
| * @throws IgniteCheckedException If failed. |
| */ |
| public Iterator<Cache.Entry<K, V>> igniteIterator() throws IgniteCheckedException { |
| return igniteIterator(ctx.keepBinary(), null); |
| } |
| |
| /** |
| * @param keepBinary Keep binary flag. |
| * @return Distributed ignite cache iterator. |
| * @throws IgniteCheckedException If failed. |
| */ |
| public Iterator<Cache.Entry<K, V>> igniteIterator(boolean keepBinary) throws IgniteCheckedException { |
| return igniteIterator(keepBinary, null); |
| } |
| |
| /** |
| * Gets next grid cache version. |
| * |
| * @return Next version based on given topology version. |
| */ |
| public GridCacheVersion nextVersion() { |
| return ctx.versions().next(ctx.topology().readyTopologyVersion().topologyVersion()); |
| } |
| |
| /** |
| * Gets next grid cache version. |
| * |
| * @param dataCenterId Data center id. |
| * @return Next version based on given topology version. |
| */ |
| public GridCacheVersion nextVersion(byte dataCenterId) { |
| return ctx.versions().next(ctx.topology().readyTopologyVersion().topologyVersion(), dataCenterId); |
| } |
| |
| /** |
| * @param keepBinary Keep binary flag. |
| * @param p Optional predicate. |
| * @return Distributed ignite cache iterator. |
| * @throws IgniteCheckedException If failed. |
| */ |
| private Iterator<Cache.Entry<K, V>> igniteIterator(boolean keepBinary, |
| @Nullable IgniteBiPredicate<Object, Object> p) |
| throws IgniteCheckedException { |
| GridCacheContext ctx0 = ctx.isNear() ? ctx.near().dht().context() : ctx; |
| |
| final CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| final GridCloseableIterator<Map.Entry<K, V>> iter = ctx0.queries().createScanQuery(p, null, keepBinary, null) |
| .executeScanQuery(); |
| |
| return ctx.itHolder().iterator(iter, new CacheIteratorConverter<Cache.Entry<K, V>, Map.Entry<K, V>>() { |
| @Override protected Cache.Entry<K, V> convert(Map.Entry<K, V> e) { |
| // Actually Scan Query returns Iterator<CacheQueryEntry> by default, |
| // CacheQueryEntry implements both Map.Entry and Cache.Entry interfaces. |
| return (Cache.Entry<K, V>)e; |
| } |
| |
| @Override protected void remove(Cache.Entry<K, V> item) { |
| CacheOperationContext prev = ctx.gate().enter(opCtx); |
| |
| try { |
| GridCacheAdapter.this.remove(item.getKey()); |
| } |
| catch (IgniteCheckedException e) { |
| throw CU.convertToCacheException(e); |
| } |
| finally { |
| ctx.gate().leave(prev); |
| } |
| } |
| }); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long offHeapEntriesCount() { |
| try { |
| IgniteCacheOffheapManager mgr = ctx.offheap(); |
| |
| return mgr != null ? mgr.cacheEntriesCount(ctx.cacheId(), |
| true, |
| true, |
| ctx.affinity().affinityTopologyVersion()) : -1; |
| } |
| catch (IgniteCheckedException ignore) { |
| return 0; |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long offHeapAllocatedSize() { |
| IgniteCacheOffheapManager mgr = ctx.offheap(); |
| |
| return mgr != null ? mgr.offHeapAllocatedSize() : -1; |
| } |
| |
| /** |
| * Asynchronously commits transaction after all previous asynchronous operations are completed. |
| * |
| * @param tx Transaction to commit. |
| * @return Transaction commit future. |
| */ |
| @SuppressWarnings("unchecked") |
| IgniteInternalFuture<IgniteInternalTx> commitTxAsync(final GridNearTxLocal tx) { |
| FutureHolder holder = lastFut.get(); |
| |
| holder.lock(); |
| |
| try { |
| IgniteInternalFuture fut = holder.future(); |
| |
| if (fut != null && !fut.isDone()) { |
| IgniteInternalFuture<IgniteInternalTx> f = new GridEmbeddedFuture<>(fut, |
| new C2<Object, Exception, IgniteInternalFuture<IgniteInternalTx>>() { |
| @Override public IgniteInternalFuture<IgniteInternalTx> apply(Object o, Exception e) { |
| return tx.commitNearTxLocalAsync(); |
| } |
| }); |
| |
| saveFuture(holder, f, /*asyncOp*/false, /*retry*/false); |
| |
| return f; |
| } |
| |
| IgniteInternalFuture<IgniteInternalTx> f = tx.commitNearTxLocalAsync(); |
| |
| saveFuture(holder, f, /*asyncOp*/false, /*retry*/false); |
| |
| ctx.tm().resetContext(); |
| |
| return f; |
| } |
| finally { |
| holder.unlock(); |
| } |
| } |
| |
| /** |
| * Awaits for previous async operation to be completed. |
| */ |
| public void awaitLastFut() { |
| FutureHolder holder = lastFut.get(); |
| |
| IgniteInternalFuture fut = holder.future(); |
| |
| if (fut != null && !fut.isDone()) { |
| try { |
| // Ignore any exception from previous async operation as it should be handled by user. |
| fut.get(); |
| } |
| catch (IgniteCheckedException ignored) { |
| // No-op. |
| } |
| } |
| } |
| |
| /** |
| * @param op Cache operation. |
| * @param <T> Return type. |
| * @return Operation result. |
| * @throws IgniteCheckedException If operation failed. |
| */ |
| @SuppressWarnings({"ErrorNotRethrown", "AssignmentToCatchBlockParameter"}) |
| @Nullable private <T> T syncOp(SyncOp<T> op) throws IgniteCheckedException { |
| checkJta(); |
| |
| awaitLastFut(); |
| |
| GridNearTxLocal tx = checkCurrentTx(); |
| |
| if (tx == null || tx.implicit()) { |
| TransactionConfiguration tCfg = CU.transactionConfiguration(ctx, ctx.kernalContext().config()); |
| |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| int retries = opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES; |
| |
| for (int i = 0; i < retries; i++) { |
| tx = ctx.tm().newTx( |
| true, |
| op.single(), |
| ctx.systemTx() ? ctx : null, |
| ctx.mvccEnabled() ? PESSIMISTIC : OPTIMISTIC, |
| ctx.mvccEnabled() ? REPEATABLE_READ : READ_COMMITTED, |
| tCfg.getDefaultTxTimeout(), |
| !ctx.skipStore(), |
| ctx.mvccEnabled(), |
| 0, |
| null, |
| false |
| ); |
| |
| assert tx != null; |
| |
| try { |
| T t = op.op(tx); |
| |
| assert tx.done() : "Transaction is not done: " + tx; |
| |
| return t; |
| } |
| catch (IgniteInterruptedCheckedException | |
| IgniteTxHeuristicCheckedException | |
| NodeStoppingException | |
| IgniteConsistencyViolationException e) { |
| throw e; |
| } |
| catch (IgniteCheckedException e) { |
| if (!(e instanceof IgniteTxRollbackCheckedException)) { |
| try { |
| tx.rollback(); |
| |
| if (!(e instanceof TransactionCheckedException)) |
| e = new IgniteTxRollbackCheckedException("Transaction has been rolled back: " + |
| tx.xid(), e); |
| } |
| catch (IgniteCheckedException | AssertionError | RuntimeException e1) { |
| U.error(log, "Failed to rollback transaction (cache may contain stale locks): " + |
| CU.txString(tx), e1); |
| |
| if (e != e1) |
| e.addSuppressed(e1); |
| } |
| } |
| |
| if (X.hasCause(e, ClusterTopologyCheckedException.class) && i != retries - 1) { |
| ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class); |
| |
| if (!(topErr instanceof ClusterTopologyServerNotFoundException)) { |
| AffinityTopologyVersion topVer = tx.topologyVersion(); |
| |
| assert topVer != null && topVer.topologyVersion() > 0 : tx; |
| |
| AffinityTopologyVersion awaitVer = new AffinityTopologyVersion( |
| topVer.topologyVersion() + 1, 0); |
| |
| ctx.shared().exchange().affinityReadyFuture(awaitVer).get(); |
| |
| continue; |
| } |
| } |
| |
| throw e; |
| } |
| catch (RuntimeException e) { |
| try { |
| tx.rollback(); |
| } |
| catch (IgniteCheckedException | AssertionError | RuntimeException e1) { |
| U.error(log, "Failed to rollback transaction " + CU.txString(tx), e1); |
| } |
| |
| throw e; |
| } |
| finally { |
| ctx.tm().resetContext(); |
| |
| if (ctx.isNear()) |
| ctx.near().dht().context().tm().resetContext(); |
| } |
| } |
| |
| // Should not happen. |
| throw new IgniteCheckedException("Failed to perform cache operation (maximum number of retries exceeded)."); |
| } |
| else |
| return op.op(tx); |
| } |
| |
| /** |
| * @param op Cache operation. |
| * @param <T> Return type. |
| * @return Future. |
| */ |
| private <T> IgniteInternalFuture<T> asyncOp(final AsyncOp<T> op) { |
| try { |
| checkJta(); |
| } |
| catch (IgniteCheckedException e) { |
| return new GridFinishedFuture<>(e); |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Performing async op: " + op); |
| |
| GridNearTxLocal tx = checkCurrentTx(); |
| |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| final TransactionConfiguration txCfg = CU.transactionConfiguration(ctx, ctx.kernalContext().config()); |
| |
| if (tx == null || tx.implicit()) { |
| boolean skipStore = ctx.skipStore(); // Save value of thread-local flag. |
| |
| int retries = opCtx != null && opCtx.noRetries() ? 1 : MAX_RETRIES; |
| |
| if (retries == 1) { |
| tx = ctx.tm().newTx( |
| true, |
| op.single(), |
| ctx.systemTx() ? ctx : null, |
| ctx.mvccEnabled() ? PESSIMISTIC : OPTIMISTIC, |
| ctx.mvccEnabled() ? REPEATABLE_READ : READ_COMMITTED, |
| txCfg.getDefaultTxTimeout(), |
| !skipStore, |
| ctx.mvccEnabled(), |
| 0, |
| null, |
| false); |
| |
| return asyncOp(tx, op, opCtx, /*retry*/false); |
| } |
| else { |
| AsyncOpRetryFuture<T> fut = new AsyncOpRetryFuture<>(op, retries, opCtx); |
| |
| fut.execute(/*retry*/false); |
| |
| return fut; |
| } |
| } |
| else |
| return asyncOp(tx, op, opCtx, /*retry*/false); |
| } |
| |
| /** |
| * @param tx Transaction. |
| * @param op Cache operation. |
| * @param opCtx Cache operation context. |
| * @param <T> Return type. |
| * @return Future. |
| */ |
| @SuppressWarnings("unchecked") |
| protected <T> IgniteInternalFuture<T> asyncOp( |
| GridNearTxLocal tx, |
| final AsyncOp<T> op, |
| final CacheOperationContext opCtx, |
| final boolean retry |
| ) { |
| IgniteInternalFuture<T> fail = asyncOpAcquire(retry); |
| |
| if (fail != null) |
| return fail; |
| |
| FutureHolder holder = lastFut.get(); |
| |
| holder.lock(); |
| |
| try { |
| IgniteInternalFuture fut = holder.future(); |
| |
| final GridNearTxLocal tx0 = tx; |
| |
| final CX1 clo = new CX1<IgniteInternalFuture<T>, T>() { |
| @Override public T applyx(IgniteInternalFuture<T> tFut) throws IgniteCheckedException { |
| try { |
| return tFut.get(); |
| } |
| catch (IgniteTxTimeoutCheckedException | |
| IgniteTxRollbackCheckedException | |
| NodeStoppingException | |
| IgniteConsistencyViolationException e) { |
| throw e; |
| } |
| catch (IgniteCheckedException e1) { |
| try { |
| tx0.rollbackNearTxLocalAsync(); |
| } |
| catch (Throwable e2) { |
| if (e1 != e2) |
| e1.addSuppressed(e2); |
| } |
| |
| throw e1; |
| } |
| finally { |
| ctx.shared().txContextReset(); |
| } |
| } |
| }; |
| |
| if (fut != null && !fut.isDone()) { |
| IgniteInternalFuture<T> f = new GridEmbeddedFuture(fut, |
| (IgniteOutClosure<IgniteInternalFuture>)() -> { |
| GridFutureAdapter resFut = new GridFutureAdapter(); |
| |
| ctx.kernalContext().closure().runLocalSafe((GridPlainRunnable)() -> { |
| IgniteInternalFuture fut0; |
| |
| if (ctx.kernalContext().isStopping()) |
| fut0 = new GridFinishedFuture<>( |
| new IgniteCheckedException("Operation has been cancelled (node or cache is stopping).")); |
| else if (ctx.gate().isStopped()) |
| fut0 = new GridFinishedFuture<>(new CacheStoppedException(ctx.name())); |
| else { |
| ctx.operationContextPerCall(opCtx); |
| ctx.shared().txContextReset(); |
| |
| try { |
| fut0 = op.op(tx0).chain(clo); |
| } |
| finally { |
| // It is necessary to clear tx context in this thread as well. |
| ctx.shared().txContextReset(); |
| ctx.operationContextPerCall(null); |
| } |
| } |
| |
| fut0.listen((IgniteInClosure<IgniteInternalFuture>)fut01 -> { |
| try { |
| resFut.onDone(fut01.get()); |
| } |
| catch (Throwable ex) { |
| resFut.onDone(ex); |
| } |
| }); |
| }, true); |
| |
| return resFut; |
| }); |
| |
| saveFuture(holder, f, /*asyncOp*/true, retry); |
| |
| return f; |
| } |
| |
| /** |
| * Wait for concurrent tx operation to finish. |
| * See {@link GridDhtTxLocalAdapter#updateLockFuture(IgniteInternalFuture, IgniteInternalFuture)} |
| */ |
| if (!tx0.txState().implicitSingle()) |
| tx0.txState().awaitLastFuture(ctx.shared()); |
| |
| IgniteInternalFuture<T> f; |
| |
| try { |
| f = op.op(tx).chain(clo); |
| } |
| finally { |
| // It is necessary to clear tx context in this thread as well. |
| ctx.shared().txContextReset(); |
| } |
| |
| saveFuture(holder, f, /*asyncOp*/true, retry); |
| |
| if (tx.implicit()) |
| ctx.tm().resetContext(); |
| |
| return f; |
| } |
| finally { |
| holder.unlock(); |
| } |
| } |
| |
| /** |
| * Saves future in thread local holder and adds listener |
| * that will clear holder when future is finished. |
| * |
| * @param holder Future holder. |
| * @param fut Future to save. |
| * @param asyncOp Whether operation is instance of AsyncOp. |
| * @param retry {@code true} for retry operations. |
| */ |
| protected void saveFuture(final FutureHolder holder, IgniteInternalFuture<?> fut, final boolean asyncOp, final boolean retry) { |
| assert holder != null; |
| assert fut != null; |
| assert holder.holdsLock(); |
| |
| holder.future(fut); |
| |
| if (fut.isDone()) { |
| holder.future(null); |
| |
| if (asyncOp) |
| asyncOpRelease(retry); |
| } |
| else { |
| fut.listen(new CI1<IgniteInternalFuture<?>>() { |
| @Override public void apply(IgniteInternalFuture<?> f) { |
| if (asyncOp) |
| asyncOpRelease(retry); |
| |
| if (!holder.tryLock()) |
| return; |
| |
| try { |
| if (holder.future() == f) |
| holder.future(null); |
| } |
| finally { |
| holder.unlock(); |
| } |
| } |
| }); |
| } |
| } |
| |
| /** |
| * Tries to acquire asynchronous operations permit, if limited. |
| * |
| * @param retry Retry flag. |
| * @return Failed future if waiting was interrupted. |
| */ |
| @Nullable protected <T> IgniteInternalFuture<T> asyncOpAcquire(boolean retry) { |
| try { |
| if (!retry && asyncOpsSem != null) |
| asyncOpsSem.acquire(); |
| |
| return null; |
| } |
| catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| |
| return new GridFinishedFuture<>(new IgniteInterruptedCheckedException("Failed to wait for asynchronous " + |
| "operation permit (thread got interrupted).", e)); |
| } |
| } |
| |
| /** |
| * Releases asynchronous operations permit, if limited. |
| * |
| * @param retry Retry flag. |
| */ |
| protected final void asyncOpRelease(boolean retry) { |
| if (!retry && asyncOpsSem != null) |
| asyncOpsSem.release(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void writeExternal(ObjectOutput out) throws IOException { |
| U.writeString(out, ctx.igniteInstanceName()); |
| U.writeString(out, ctx.name()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { |
| IgniteBiTuple<String, String> t = stash.get(); |
| |
| t.set1(U.readString(in)); |
| t.set2(U.readString(in)); |
| } |
| |
| /** |
| * Reconstructs object on unmarshalling. |
| * |
| * @return Reconstructed object. |
| * @throws ObjectStreamException Thrown in case of unmarshalling error. |
| */ |
| protected Object readResolve() throws ObjectStreamException { |
| try { |
| IgniteBiTuple<String, String> t = stash.get(); |
| |
| return IgnitionEx.localIgnite().cachex(t.get2()); |
| } |
| catch (IllegalStateException e) { |
| throw U.withCause(new InvalidObjectException(e.getMessage()), e); |
| } |
| finally { |
| stash.remove(); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<?> rebalance() { |
| return ctx.preloader().forceRebalance(); |
| } |
| |
| /** |
| * @param key Key. |
| * @param readers Whether to clear readers. |
| */ |
| private boolean clearLocally0(K key, boolean readers) { |
| ctx.shared().cache().checkReadOnlyState("clear", ctx.config()); |
| |
| //TODO IGNITE-7952 |
| MvccUtils.verifyMvccOperationSupport(ctx, "Clear"); |
| |
| ctx.checkSecurity(SecurityPermission.CACHE_REMOVE); |
| |
| GridCacheVersion obsoleteVer = nextVersion(); |
| |
| ctx.shared().database().checkpointReadLock(); |
| |
| try { |
| KeyCacheObject cacheKey = ctx.toCacheKeyObject(key); |
| |
| GridCacheEntryEx entry = ctx.isNear() ? peekEx(cacheKey) : entryEx(cacheKey); |
| |
| if (entry != null) |
| return entry.clear(obsoleteVer, readers); |
| } |
| catch (GridDhtInvalidPartitionException ignored) { |
| // No-op. |
| } |
| catch (IgniteCheckedException ex) { |
| U.error(log, "Failed to clearLocally entry for key: " + key, ex); |
| } |
| finally { |
| ctx.shared().database().checkpointReadUnlock(); |
| } |
| |
| return false; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean evict(K key) { |
| A.notNull(key, "key"); |
| |
| //TODO IGNITE-7956 |
| MvccUtils.verifyMvccOperationSupport(ctx, "Evict"); |
| |
| return evictx(key, nextVersion(), CU.empty0()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void evictAll(Collection<? extends K> keys) { |
| A.notNull(keys, "keys"); |
| |
| if (F.isEmpty(keys)) |
| return; |
| |
| //TODO IGNITE-7956 |
| MvccUtils.verifyMvccOperationSupport(ctx, "Evict"); |
| |
| GridCacheVersion obsoleteVer = nextVersion(); |
| |
| try { |
| ctx.evicts().batchEvict(keys, obsoleteVer); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to perform batch evict for keys: " + keys, e); |
| } |
| } |
| |
| /** |
| * @param filter Filters to evaluate. |
| * @return Entry set. |
| */ |
| public Set<Cache.Entry<K, V>> entrySet(@Nullable CacheEntryPredicate... filter) { |
| boolean keepBinary = ctx.keepBinary(); |
| |
| return new EntrySet(map.entrySet(ctx.cacheId(), filter), keepBinary); |
| } |
| |
| /** |
| * @param key Key. |
| * @param deserializeBinary Deserialize binary flag. |
| * @param needVer Need version. |
| * @return Cached value. |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Nullable public final V repairableGet( |
| K key, |
| boolean deserializeBinary, |
| boolean needVer) throws IgniteCheckedException { |
| try { |
| return get( |
| key, |
| ctx.kernalContext().job().currentTaskName(), |
| deserializeBinary, |
| needVer); |
| } |
| catch (IgniteConsistencyViolationException e) { |
| repairAsync(e, ctx.operationContextPerCall(), false).get(); |
| |
| return repairableGet(key, deserializeBinary, needVer); |
| } |
| catch (IgniteException e) { |
| if (e.getCause(IgniteCheckedException.class) != null) |
| throw e.getCause(IgniteCheckedException.class); |
| else |
| throw e; |
| } |
| } |
| |
| /** |
| * @param key Key. |
| * @param taskName Task name. |
| * @param deserializeBinary Deserialize binary flag. |
| * @param needVer Need version. |
| * @return Cached value. |
| * @throws IgniteCheckedException If failed. |
| */ |
| protected V get( |
| final K key, |
| String taskName, |
| boolean deserializeBinary, |
| boolean needVer) throws IgniteCheckedException { |
| checkJta(); |
| |
| return getAsync(key, |
| !ctx.config().isReadFromBackup(), |
| /*skip tx*/false, |
| taskName, |
| deserializeBinary, |
| /*skip vals*/false, |
| needVer).get(); |
| } |
| |
| /** |
| * @param key Key. |
| * @param deserializeBinary Deserialize binary flag. |
| * @param needVer Need version. |
| * @return Read operation future. |
| */ |
| public final IgniteInternalFuture<V> repairableGetAsync( |
| final K key, |
| boolean deserializeBinary, |
| final boolean needVer, |
| ReadRepairStrategy readRepairStrategy) { |
| try { |
| checkJta(); |
| } |
| catch (IgniteCheckedException e) { |
| return new GridFinishedFuture<>(e); |
| } |
| |
| IgniteInternalFuture<V> fut = getAsync(key, |
| !ctx.config().isReadFromBackup(), |
| /*skip tx*/false, |
| ctx.kernalContext().job().currentTaskName(), |
| deserializeBinary, |
| /*skip vals*/false, |
| needVer); |
| |
| if (readRepairStrategy != null) { |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| return getWithRepairAsync( |
| fut, |
| (e) -> repairAsync(e, opCtx, false), |
| () -> repairableGetAsync(key, deserializeBinary, needVer, readRepairStrategy)); |
| } |
| |
| return fut; |
| } |
| |
| /** |
| * @param keys Keys. |
| * @param deserializeBinary Deserialize binary flag. |
| * @param needVer Need version. |
| * @return Map of cached values. |
| * @throws IgniteCheckedException If read failed. |
| */ |
| protected Map<K, V> repairableGetAll( |
| Collection<? extends K> keys, |
| boolean deserializeBinary, |
| boolean needVer, |
| boolean recovery, |
| ReadRepairStrategy readRepairStrategy) throws IgniteCheckedException { |
| try { |
| return getAll(keys, deserializeBinary, needVer, recovery, readRepairStrategy); |
| } |
| catch (IgniteConsistencyViolationException e) { |
| repairAsync(e, ctx.operationContextPerCall(), false).get(); |
| |
| return repairableGetAll(keys, deserializeBinary, needVer, recovery, readRepairStrategy); |
| } |
| } |
| |
| /** |
| * @param keys Keys. |
| * @param deserializeBinary Deserialize binary flag. |
| * @param needVer Need version. |
| * @param recovery Recovery flag. |
| * @param readRepairStrategy Read repair strategy. |
| * @return Map of cached values. |
| * @throws IgniteCheckedException If read failed. |
| */ |
| protected Map<K, V> getAll( |
| Collection<? extends K> keys, |
| boolean deserializeBinary, |
| boolean needVer, |
| boolean recovery, |
| ReadRepairStrategy readRepairStrategy) throws IgniteCheckedException { |
| checkJta(); |
| |
| return getAllAsync(keys, |
| !ctx.config().isReadFromBackup(), |
| /*skip tx*/false, |
| ctx.kernalContext().job().currentTaskName(), |
| deserializeBinary, |
| recovery, |
| readRepairStrategy, |
| /*skip vals*/false, |
| needVer).get(); |
| } |
| |
| /** |
| * @param keys Keys. |
| * @param forcePrimary Force primary. |
| * @param skipTx Skip tx. |
| * @param taskName Task name. |
| * @param deserializeBinary Deserialize binary. |
| * @param recovery Recovery mode flag. |
| * @param skipVals Skip values. |
| * @param needVer Need version. |
| * @return Future for the get operation. |
| * @see GridCacheAdapter#getAllAsync(Collection) |
| */ |
| public IgniteInternalFuture<Map<K, V>> repairableGetAllAsync( |
| @Nullable Collection<? extends K> keys, |
| boolean forcePrimary, |
| boolean skipTx, |
| String taskName, |
| boolean deserializeBinary, |
| boolean recovery, |
| ReadRepairStrategy readRepairStrategy, |
| boolean skipVals, |
| final boolean needVer |
| ) { |
| IgniteInternalFuture<Map<K, V>> fut = getAllAsync( |
| keys, |
| forcePrimary, |
| skipTx, |
| taskName, |
| deserializeBinary, |
| recovery, |
| readRepairStrategy, |
| skipVals, |
| needVer); |
| |
| if (readRepairStrategy != null) { |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| return getWithRepairAsync( |
| fut, |
| (e) -> repairAsync(e, opCtx, skipVals), |
| () -> repairableGetAllAsync( |
| keys, |
| forcePrimary, |
| skipTx, |
| taskName, |
| deserializeBinary, |
| recovery, |
| readRepairStrategy, |
| skipVals, |
| needVer)); |
| } |
| |
| return fut; |
| } |
| |
| /** |
| * Performs repair and retries get if necessary. |
| * @param orig Original get future. |
| * @param repair Repair callback, used in case of inconcstency. |
| * @param retry Callback to original method to retry operation after repair. |
| */ |
| private <R> IgniteInternalFuture<R> getWithRepairAsync( |
| IgniteInternalFuture<R> orig, |
| Function<IgniteConsistencyViolationException, IgniteInternalFuture<Void>> repair, |
| Supplier<IgniteInternalFuture<R>> retry) { |
| final GridNearTxLocal tx = checkCurrentTx(); |
| final CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| GridFutureAdapter<R> fut = new GridFutureAdapter<>(); |
| |
| orig.listen((f) -> { |
| try { |
| fut.onDone(f.get()); |
| } |
| catch (IgniteConsistencyViolationException e1) { |
| repair.apply(e1).listen((repFut) -> { |
| if (repFut.error() != null) |
| fut.onDone(repFut.error()); |
| else { |
| CacheOperationContext prevOpCtx = ctx.operationContextPerCall(); |
| |
| IgniteInternalTx prevTx = ctx.tm().tx(tx); // Within the original tx. |
| ctx.operationContextPerCall(opCtx); // With the same operation context. |
| |
| try { |
| fut.onDone(retry.get().get()); |
| } |
| catch (IgniteCheckedException e2) { |
| fut.onDone(e2); |
| } |
| finally { |
| ctx.tm().tx(prevTx); |
| ctx.operationContextPerCall(prevOpCtx); |
| } |
| } |
| }); |
| } |
| catch (IgniteCheckedException e1) { |
| fut.onDone(e1); |
| } |
| }); |
| |
| return fut; |
| } |
| |
| /** |
| * Checks the given {@code keys} and repairs entries across the topology if needed. |
| * |
| * @param ex Consistency violation exception. |
| * @param opCtx Operation context. |
| * @param skipVals Skip values flag. |
| * @return Compound future that represents a result of repair action. |
| */ |
| private IgniteInternalFuture<Void> repairAsync( |
| IgniteConsistencyViolationException ex, |
| final CacheOperationContext opCtx, |
| boolean skipVals) { |
| GridCompoundReadRepairFuture fut = new GridCompoundReadRepairFuture(); |
| |
| for (KeyCacheObject key : ex.keys()) { |
| fut.add(ctx.transactional() ? |
| repairTxAsync(key, opCtx, skipVals) : |
| repairAtomicAsync(key, (IgniteAtomicConsistencyViolationException)ex, opCtx)); |
| } |
| |
| fut.markInitialized(); |
| |
| return fut; |
| } |
| |
| /** |
| * Checks the given {@code key} and repairs entry across the topology if needed. |
| * |
| * @param key Key. |
| * @param opCtx Operation context. |
| * @param skipVals Skip values flag. |
| * @return Recovery future. |
| */ |
| private IgniteInternalFuture<Void> repairTxAsync( |
| final KeyCacheObject key, |
| final CacheOperationContext opCtx, |
| boolean skipVals) { |
| assert ctx.transactional(); |
| |
| final GridNearTxLocal orig = checkCurrentTx(); |
| |
| assert orig == null || orig.optimistic() || orig.readCommitted() || /*contains*/ skipVals : |
| "Pessimistic non-read-committed 'get' should be fixed inside its own tx, the only exception is 'contains' " + |
| "[tx=" + orig + ", skipVals=" + skipVals + "]"; |
| |
| // Async check and recover if necessary. |
| return ctx.kernalContext().closure().callLocalSafe(new GridPlainCallable<Void>() { |
| @Override public Void call() throws IgniteCheckedException { |
| CacheOperationContext prevOpCtx = ctx.operationContextPerCall(); |
| |
| ctx.operationContextPerCall(opCtx); |
| |
| try (Transaction tx = ctx.grid().transactions().txStart(PESSIMISTIC, SERIALIZABLE)) { |
| get((K)key, null, false, false); // Repair. |
| |
| final GridNearTxLocal tx0 = checkCurrentTx(); |
| |
| final IgniteTxKey txKey = ctx.txKey(ctx.toCacheKeyObject(key)); |
| |
| // Value was broken, fixed locally and should be spread across the topology. |
| if (tx0.txState().hasWriteKey(txKey)) |
| tx.commit(); |
| |
| return null; |
| } |
| finally { |
| ctx.operationContextPerCall(prevOpCtx); |
| } |
| } |
| }); |
| } |
| |
| /** |
| * Checks the given {@code key} and repairs entry across the topology if needed. |
| * |
| * @param key Key. |
| * @param ex Ignite atomic consistency violation exception |
| * @param opCtx Operation context. |
| * @return Recovery future. |
| */ |
| private IgniteInternalFuture<Void> repairAtomicAsync( |
| final KeyCacheObject key, |
| final IgniteAtomicConsistencyViolationException ex, |
| final CacheOperationContext opCtx) { |
| assert ctx.atomic(); |
| |
| EntryGetResult correctedRes = ex.correctedMap().get(key); |
| EntryGetResult primRes = ex.primaryMap().get(key); |
| |
| CacheObject correctedObj = correctedRes != null ? correctedRes.value() : null; |
| |
| V correctedVal = correctedObj != null ? (V)ctx.unwrapBinaryIfNeeded(correctedObj, true, false, null) : null; |
| |
| GridCacheVersion primVer = primRes != null ? primRes.version() : null; |
| |
| return ctx.kernalContext().closure().callLocalSafe(new GridPlainCallable<Boolean>() { |
| @Override public Boolean call() throws IgniteCheckedException { |
| CacheOperationContext prevOpCtx = ctx.operationContextPerCall(); |
| |
| ctx.operationContextPerCall(opCtx.keepBinary()); |
| |
| try { |
| return invoke((K)key, new AtomicReadRepairEntryProcessor<>(correctedVal, primVer)).get(); |
| } |
| finally { |
| ctx.operationContextPerCall(prevOpCtx); |
| } |
| } |
| }).chain(fut -> { |
| if (fut.result()) |
| ex.onRepaired(key); // Event recording after fixed. |
| |
| return null; |
| }); |
| } |
| |
| /** |
| * |
| */ |
| protected static final class AtomicReadRepairEntryProcessor<K, V> implements CacheEntryProcessor<K, V, Boolean> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** Corrected value. */ |
| private final V correctedVal; |
| |
| /** Primary version. */ |
| private final GridCacheVersion primVer; |
| |
| /** |
| * @param correctedVal Corrected value. |
| * @param primVer Primary version. |
| */ |
| public AtomicReadRepairEntryProcessor(V correctedVal, GridCacheVersion primVer) { |
| this.correctedVal = correctedVal; |
| this.primVer = primVer; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Boolean process(MutableEntry<K, V> entry, Object... arguments) throws EntryProcessorException { |
| try { |
| if ((primVer == null && entry.getValue() == null) || // Still null at primary. |
| // No updates since consistency violation has been found. |
| primVer.equals(((CacheInvokeEntry<Object, Object>)entry).entry().version())) { |
| |
| if (correctedVal != null) |
| entry.setValue(correctedVal); |
| else |
| entry.remove(); |
| |
| return true; |
| } |
| else |
| return false; |
| } |
| catch (GridCacheEntryRemovedException e) { |
| throw new EntryProcessorException(e); |
| } |
| } |
| } |
| |
| /** |
| * @param entry Entry. |
| * @param ver Version. |
| */ |
| public abstract void onDeferredDelete(GridCacheEntryEx entry, GridCacheVersion ver); |
| |
| /** |
| * |
| */ |
| public void onReconnected() { |
| // No-op. |
| } |
| |
| /** |
| * Checks that given map is sorted or otherwise constant order, or processed inside deadlock-detecting transaction. |
| * |
| * Issues developer warning otherwise. |
| * |
| * @param m Map to examine. |
| */ |
| protected void warnIfUnordered(Map<?, ?> m, BulkOperation op) { |
| if (ctx.atomic()) |
| return; |
| |
| if (m == null || m.size() <= 1) |
| return; |
| |
| if (m instanceof SortedMap || m instanceof GridSerializableMap) |
| return; |
| |
| Transaction tx = ctx.kernalContext().cache().transactions().tx(); |
| |
| if (tx != null && !op.canBlockTx(tx.concurrency(), tx.isolation())) |
| return; |
| |
| LT.warn(log, "Unordered map " + m.getClass().getName() + |
| " is used for " + op.title() + " operation on cache " + name() + ". " + |
| "This can lead to a distributed deadlock. Switch to a sorted map like TreeMap instead."); |
| } |
| |
| /** |
| * Checks that given collection is sorted set, or processed inside deadlock-detecting transaction. |
| * |
| * Issues developer warning otherwise. |
| * |
| * @param coll Collection to examine. |
| */ |
| protected void warnIfUnordered(Collection<?> coll, BulkOperation op) { |
| if (ctx.atomic()) |
| return; |
| |
| if (coll == null || coll.size() <= 1) |
| return; |
| |
| if (coll instanceof SortedSet || coll instanceof GridCacheAdapter.KeySet) |
| return; |
| |
| // To avoid false positives, once removeAll() is called, cache will never issue Remove All warnings. |
| if (ctx.lastRemoveAllJobFut().get() != null && op == BulkOperation.REMOVE) |
| return; |
| |
| Transaction tx = ctx.kernalContext().cache().transactions().tx(); |
| |
| if (op == BulkOperation.GET && tx == null) |
| return; |
| |
| if (tx != null && !op.canBlockTx(tx.concurrency(), tx.isolation())) |
| return; |
| |
| LT.warn(log, "Unordered collection " + coll.getClass().getName() + |
| " is used for " + op.title() + " operation on cache " + name() + ". " + |
| "This can lead to a distributed deadlock. Switch to a sorted set like TreeSet instead."); |
| } |
| |
| /** */ |
| protected enum BulkOperation { |
| /** */ |
| GET, |
| |
| /** */ |
| PUT, |
| |
| /** */ |
| INVOKE, |
| |
| /** */ |
| REMOVE; |
| |
| /** */ |
| public String title() { |
| return name().toLowerCase() + "All"; |
| } |
| |
| /** */ |
| public boolean canBlockTx(TransactionConcurrency concurrency, TransactionIsolation isolation) { |
| if (concurrency == OPTIMISTIC && isolation == SERIALIZABLE) |
| return false; |
| |
| if (this == GET && concurrency == PESSIMISTIC && isolation == READ_COMMITTED) |
| return false; |
| |
| return true; |
| } |
| } |
| |
| /** |
| * @param it Internal entry iterator. |
| * @param deserializeBinary Deserialize binary flag. |
| * @return Public API iterator. |
| */ |
| protected final Iterator<Cache.Entry<K, V>> iterator(final Iterator<? extends GridCacheEntryEx> it, |
| final boolean deserializeBinary) { |
| return new Iterator<Cache.Entry<K, V>>() { |
| { |
| advance(); |
| } |
| |
| /** */ |
| private Cache.Entry<K, V> next; |
| |
| @Override public boolean hasNext() { |
| return next != null; |
| } |
| |
| @Override public Cache.Entry<K, V> next() { |
| if (next == null) |
| throw new NoSuchElementException(); |
| |
| Cache.Entry<K, V> e = next; |
| |
| advance(); |
| |
| return e; |
| } |
| |
| @Override public void remove() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| /** |
| * Switch to next entry. |
| */ |
| private void advance() { |
| next = null; |
| |
| while (it.hasNext()) { |
| GridCacheEntryEx entry = it.next(); |
| |
| try { |
| next = toCacheEntry(entry, deserializeBinary); |
| |
| if (next == null) |
| continue; |
| |
| break; |
| } |
| catch (IgniteCheckedException e) { |
| throw CU.convertToCacheException(e); |
| } |
| catch (GridCacheEntryRemovedException ignore) { |
| // No-op. |
| } |
| } |
| } |
| }; |
| } |
| |
| /** |
| * @param entry Internal entry. |
| * @param deserializeBinary Deserialize binary flag. |
| * @return Public API entry. |
| * @throws IgniteCheckedException If failed. |
| * @throws GridCacheEntryRemovedException If entry removed. |
| */ |
| @Nullable private Cache.Entry<K, V> toCacheEntry(GridCacheEntryEx entry, |
| boolean deserializeBinary) |
| throws IgniteCheckedException, GridCacheEntryRemovedException { |
| CacheObject val = entry.innerGet( |
| /*ver*/null, |
| /*tx*/null, |
| /*readThrough*/false, |
| /*metrics*/false, |
| /*evt*/false, |
| /*transformClo*/null, |
| /*taskName*/null, |
| /*expiryPlc*/null, |
| !deserializeBinary); |
| |
| if (val == null) |
| return null; |
| |
| KeyCacheObject key = entry.key(); |
| |
| Object key0 = ctx.unwrapBinaryIfNeeded(key, !deserializeBinary, true, null); |
| Object val0 = ctx.unwrapBinaryIfNeeded(val, !deserializeBinary, true, null); |
| |
| return new CacheEntryImpl<>((K)key0, (V)val0, entry.version()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void preloadPartition(int part) throws IgniteCheckedException { |
| if (isLocal()) |
| ctx.offheap().preloadPartition(part); |
| else |
| executePreloadTask(part).get(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<?> preloadPartitionAsync(int part) throws IgniteCheckedException { |
| if (isLocal()) { |
| return ctx.kernalContext().closure().runLocalSafe(new GridPlainRunnable() { |
| @Override public void run() { |
| try { |
| ctx.offheap().preloadPartition(part); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| }); |
| } |
| else |
| return executePreloadTask(part); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean localPreloadPartition(int part) throws IgniteCheckedException { |
| if (!ctx.affinityNode()) |
| return false; |
| |
| GridDhtPartitionTopology top = ctx.group().topology(); |
| |
| @Nullable GridDhtLocalPartition p = top.localPartition(part, top.readyTopologyVersion(), false); |
| |
| if (p == null) |
| return false; |
| |
| try { |
| if (!p.reserve() || p.state() != OWNING) |
| return false; |
| |
| p.dataStore().preload(); |
| } |
| finally { |
| p.release(); |
| } |
| |
| return true; |
| } |
| |
| /** |
| * |
| */ |
| private class AsyncOpRetryFuture<T> extends GridFutureAdapter<T> { |
| /** */ |
| private AsyncOp<T> op; |
| |
| /** */ |
| private int retries; |
| |
| /** */ |
| private GridNearTxLocal tx; |
| |
| /** */ |
| private CacheOperationContext opCtx; |
| |
| /** |
| * @param op Operation. |
| * @param retries Number of retries. |
| * @param opCtx Operation context per call to save. |
| */ |
| public AsyncOpRetryFuture( |
| AsyncOp<T> op, |
| int retries, |
| CacheOperationContext opCtx |
| ) { |
| assert retries > 1 : retries; |
| |
| tx = null; |
| |
| this.op = op; |
| this.retries = retries; |
| this.opCtx = opCtx; |
| } |
| |
| /** |
| * |
| */ |
| public void execute(boolean retry) { |
| tx = ctx.tm().newTx( |
| true, |
| op.single(), |
| ctx.systemTx() ? ctx : null, |
| ctx.mvccEnabled() ? PESSIMISTIC : OPTIMISTIC, |
| ctx.mvccEnabled() ? REPEATABLE_READ : READ_COMMITTED, |
| CU.transactionConfiguration(ctx, ctx.kernalContext().config()).getDefaultTxTimeout(), |
| opCtx == null || !opCtx.skipStore(), |
| ctx.mvccEnabled(), |
| 0, |
| null, |
| false); |
| |
| IgniteInternalFuture<T> fut = asyncOp(tx, op, opCtx, retry); |
| |
| fut.listen(new IgniteInClosure<IgniteInternalFuture<T>>() { |
| @Override public void apply(IgniteInternalFuture<T> fut) { |
| try { |
| T res = fut.get(); |
| |
| onDone(res); |
| } |
| catch (IgniteCheckedException e) { |
| if (X.hasCause(e, ClusterTopologyCheckedException.class) && --retries > 0) { |
| ClusterTopologyCheckedException topErr = e.getCause(ClusterTopologyCheckedException.class); |
| |
| if (!(topErr instanceof ClusterTopologyServerNotFoundException)) { |
| IgniteTxLocalAdapter tx = AsyncOpRetryFuture.this.tx; |
| |
| assert tx != null; |
| |
| AffinityTopologyVersion topVer = tx.topologyVersion(); |
| |
| assert topVer != null && topVer.topologyVersion() > 0 : tx; |
| |
| AffinityTopologyVersion awaitVer = new AffinityTopologyVersion(topVer.topologyVersion() + 1, 0); |
| |
| IgniteInternalFuture<?> topFut = |
| ctx.shared().exchange().affinityReadyFuture(awaitVer); |
| |
| topFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() { |
| @Override public void apply(IgniteInternalFuture<?> topFut) { |
| try { |
| topFut.get(); |
| |
| execute(/*retry*/true); |
| } |
| catch (IgniteCheckedException e) { |
| onDone(e); |
| } |
| finally { |
| ctx.shared().txContextReset(); |
| } |
| } |
| }); |
| |
| return; |
| } |
| } |
| |
| onDone(e); |
| } |
| } |
| }); |
| } |
| } |
| |
| /** |
| * |
| */ |
| protected static final class PeekModes { |
| /** */ |
| public boolean near; |
| |
| /** */ |
| public boolean primary; |
| |
| /** */ |
| public boolean backup; |
| |
| /** */ |
| public boolean heap; |
| |
| /** */ |
| public boolean offheap; |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(PeekModes.class, this); |
| } |
| } |
| |
| /** |
| * @param peekModes Cache peek modes array. |
| * @param primary Defines the default behavior if affinity flags are not specified. |
| * @return Peek modes flags. |
| */ |
| protected static PeekModes parsePeekModes(CachePeekMode[] peekModes, boolean primary) { |
| PeekModes modes = new PeekModes(); |
| |
| if (F.isEmpty(peekModes)) { |
| modes.primary = true; |
| |
| if (!primary) { |
| modes.backup = true; |
| modes.near = true; |
| } |
| |
| modes.heap = true; |
| modes.offheap = true; |
| } |
| else { |
| for (CachePeekMode peekMode : peekModes) { |
| A.notNull(peekMode, "peekMode"); |
| |
| switch (peekMode) { |
| case ALL: |
| modes.near = true; |
| modes.primary = true; |
| modes.backup = true; |
| |
| modes.heap = true; |
| modes.offheap = true; |
| |
| break; |
| |
| case BACKUP: |
| modes.backup = true; |
| |
| break; |
| |
| case PRIMARY: |
| modes.primary = true; |
| |
| break; |
| |
| case NEAR: |
| modes.near = true; |
| |
| break; |
| |
| case ONHEAP: |
| modes.heap = true; |
| |
| break; |
| |
| case OFFHEAP: |
| modes.offheap = true; |
| |
| break; |
| |
| default: |
| assert false : peekMode; |
| } |
| } |
| } |
| |
| if (!(modes.heap || modes.offheap)) { |
| modes.heap = true; |
| modes.offheap = true; |
| } |
| |
| if (!(modes.primary || modes.backup || modes.near)) { |
| modes.primary = true; |
| |
| if (!primary) { |
| modes.backup = true; |
| modes.near = true; |
| } |
| } |
| |
| assert modes.heap || modes.offheap; |
| assert modes.primary || modes.backup || modes.near; |
| |
| return modes; |
| } |
| |
| /** |
| * @param plc Explicitly specified expiry policy for cache operation. |
| * @return Expiry policy wrapper. |
| */ |
| @Nullable public final IgniteCacheExpiryPolicy expiryPolicy(@Nullable ExpiryPolicy plc) { |
| if (plc == null) |
| plc = ctx.expiry(); |
| |
| return CacheExpiryPolicy.forPolicy(plc); |
| } |
| |
| /** |
| * Cache operation. |
| */ |
| private abstract class SyncOp<T> { |
| /** Flag to indicate only-one-key operation. */ |
| private final boolean single; |
| |
| /** |
| * @param single Flag to indicate only-one-key operation. |
| */ |
| SyncOp(boolean single) { |
| this.single = single; |
| } |
| |
| /** |
| * @return Flag to indicate only-one-key operation. |
| */ |
| final boolean single() { |
| return single; |
| } |
| |
| /** |
| * @param tx Transaction. |
| * @return Operation return value. |
| * @throws IgniteCheckedException If failed. |
| */ |
| @Nullable public abstract T op(GridNearTxLocal tx) throws IgniteCheckedException; |
| } |
| |
| /** |
| * Cache operation. |
| */ |
| private abstract class SyncInOp extends SyncOp<Object> { |
| /** |
| * @param single Flag to indicate only-one-key operation. |
| */ |
| SyncInOp(boolean single) { |
| super(single); |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public final Object op(GridNearTxLocal tx) throws IgniteCheckedException { |
| inOp(tx); |
| |
| return null; |
| } |
| |
| /** |
| * @param tx Transaction. |
| * @throws IgniteCheckedException If failed. |
| */ |
| public abstract void inOp(GridNearTxLocal tx) throws IgniteCheckedException; |
| } |
| |
| /** |
| * Cache operation. |
| */ |
| protected abstract class AsyncOp<T> { |
| /** Flag to indicate only-one-key operation. */ |
| private final boolean single; |
| |
| /** |
| * |
| */ |
| protected AsyncOp() { |
| single = true; |
| } |
| |
| /** |
| * @param keys Keys involved. |
| */ |
| protected AsyncOp(Collection<?> keys) { |
| single = keys.size() == 1; |
| } |
| |
| /** |
| * @return Flag to indicate only-one-key operation. |
| */ |
| final boolean single() { |
| return single; |
| } |
| |
| /** |
| * @param tx Transaction. |
| * @param readyTopVer Ready topology version. |
| * @return Operation future. |
| */ |
| public abstract IgniteInternalFuture<T> op(GridNearTxLocal tx, AffinityTopologyVersion readyTopVer); |
| |
| /** |
| * @param tx Transaction. |
| * @return Operation future. |
| */ |
| public IgniteInternalFuture<T> op(final GridNearTxLocal tx) { |
| AffinityTopologyVersion txTopVer = tx.topologyVersionSnapshot(); |
| |
| if (txTopVer != null) |
| return op(tx, (AffinityTopologyVersion)null); |
| |
| // Tx needs affinity for entry creation, wait when affinity is ready to avoid blocking inside async operation. |
| final AffinityTopologyVersion topVer = ctx.shared().exchange().readyAffinityVersion(); |
| |
| IgniteInternalFuture<?> topFut = ctx.shared().exchange().affinityReadyFuture(topVer); |
| |
| if (topFut == null || topFut.isDone()) |
| return op(tx, topVer); |
| else |
| return waitTopologyFuture(topFut, topVer, tx, ctx.operationContextPerCall()); |
| } |
| |
| /** |
| * @param topFut Topology future. |
| * @param topVer Topology version to use. |
| * @param tx Transaction. |
| * @param opCtx Operation context. |
| * @return Operation future. |
| */ |
| private IgniteInternalFuture<T> waitTopologyFuture(IgniteInternalFuture<?> topFut, |
| final AffinityTopologyVersion topVer, |
| final GridNearTxLocal tx, |
| final CacheOperationContext opCtx) { |
| final GridFutureAdapter fut0 = new GridFutureAdapter(); |
| |
| topFut.listen(new CI1<IgniteInternalFuture<?>>() { |
| @Override public void apply(IgniteInternalFuture<?> topFut) { |
| try { |
| topFut.get(); |
| |
| IgniteInternalFuture<?> opFut = runOp(tx, topVer, opCtx); |
| |
| opFut.listen(new CI1<IgniteInternalFuture<?>>() { |
| @Override public void apply(IgniteInternalFuture<?> opFut) { |
| try { |
| fut0.onDone(opFut.get()); |
| } |
| catch (IgniteCheckedException e) { |
| fut0.onDone(e); |
| } |
| } |
| }); |
| } |
| catch (IgniteCheckedException e) { |
| fut0.onDone(e); |
| } |
| } |
| }); |
| |
| return fut0; |
| } |
| |
| /** |
| * @param tx Transaction. |
| * @param topVer Ready topology version. |
| * @param opCtx Operation context. |
| * @return Future. |
| */ |
| private IgniteInternalFuture<T> runOp(GridNearTxLocal tx, |
| AffinityTopologyVersion topVer, |
| CacheOperationContext opCtx) { |
| ctx.operationContextPerCall(opCtx); |
| |
| ctx.shared().txContextReset(); |
| |
| try { |
| return op(tx, topVer); |
| } |
| finally { |
| ctx.shared().txContextReset(); |
| |
| ctx.operationContextPerCall(null); |
| } |
| } |
| } |
| |
| /** |
| * Global clear all. |
| */ |
| private static class GlobalClearAllJob extends TopologyVersionAwareJob { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** |
| * @param cacheName Cache name. |
| * @param topVer Affinity topology version. |
| */ |
| private GlobalClearAllJob(String cacheName, AffinityTopologyVersion topVer) { |
| super(cacheName, topVer); |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache) { |
| if (cache != null) |
| cache.clearLocally(clearServerCache(), clearNearCache(), true); |
| |
| return null; |
| } |
| |
| /** |
| * @return Whether to clear server cache. |
| */ |
| protected boolean clearServerCache() { |
| return true; |
| } |
| |
| /** |
| * @return Whether to clear near cache. |
| */ |
| protected boolean clearNearCache() { |
| return false; |
| } |
| } |
| |
| /** |
| * Global clear keys. |
| */ |
| private static class GlobalClearKeySetJob<K> extends TopologyVersionAwareJob { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** Keys to remove. */ |
| private final Set<? extends K> keys; |
| |
| /** |
| * @param cacheName Cache name. |
| * @param topVer Affinity topology version. |
| * @param keys Keys to clear. |
| */ |
| private GlobalClearKeySetJob(String cacheName, AffinityTopologyVersion topVer, Set<? extends K> keys) { |
| super(cacheName, topVer); |
| |
| this.keys = keys; |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache) { |
| if (cache != null) |
| cache.clearLocallyAll(keys, clearServerCache(), clearNearCache(), true); |
| |
| return null; |
| } |
| |
| /** |
| * @return Whether to clear server cache. |
| */ |
| protected boolean clearServerCache() { |
| return true; |
| } |
| |
| /** |
| * @return Whether to clear near cache. |
| */ |
| protected boolean clearNearCache() { |
| return false; |
| } |
| } |
| |
| /** |
| * Global clear all for near cache. |
| */ |
| private static class GlobalClearAllNearJob extends GlobalClearAllJob { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** |
| * @param cacheName Cache name. |
| * @param topVer Affinity topology version. |
| */ |
| private GlobalClearAllNearJob(String cacheName, AffinityTopologyVersion topVer) { |
| super(cacheName, topVer); |
| } |
| |
| /** |
| * @return Whether to clear server cache. |
| */ |
| @Override protected boolean clearServerCache() { |
| return false; |
| } |
| |
| /** |
| * @return Whether to clear near cache. |
| */ |
| @Override protected boolean clearNearCache() { |
| return true; |
| } |
| } |
| |
| /** |
| * Global clear keys for near cache. |
| */ |
| private static class GlobalClearKeySetNearJob<K> extends GlobalClearKeySetJob<K> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** |
| * @param cacheName Cache name. |
| * @param topVer Affinity topology version. |
| * @param keys Keys to clear. |
| */ |
| private GlobalClearKeySetNearJob(String cacheName, AffinityTopologyVersion topVer, Set<? extends K> keys) { |
| super(cacheName, topVer, keys); |
| } |
| |
| /** |
| * @return Whether to clear server cache. |
| */ |
| @Override protected boolean clearServerCache() { |
| return false; |
| } |
| |
| /** |
| * @return Whether to clear near cache. |
| */ |
| @Override protected boolean clearNearCache() { |
| return true; |
| } |
| } |
| |
| /** |
| * Internal callable for partition size calculation. |
| */ |
| private static class PartitionSizeLongJob extends TopologyVersionAwareJob { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** Partition. */ |
| private final int partition; |
| |
| /** Peek modes. */ |
| private final CachePeekMode[] peekModes; |
| |
| /** |
| * @param cacheName Cache name. |
| * @param topVer Affinity topology version. |
| * @param peekModes Cache peek modes. |
| * @param partition partition. |
| */ |
| private PartitionSizeLongJob(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes, |
| int partition) { |
| super(cacheName, topVer); |
| |
| this.peekModes = peekModes; |
| this.partition = partition; |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache) { |
| if (cache == null) |
| return 0; |
| |
| try { |
| return cache.localSizeLong(partition, peekModes); |
| } |
| catch (IgniteCheckedException e) { |
| throw U.convertException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(PartitionSizeLongJob.class, this); |
| } |
| } |
| |
| /** |
| * Internal callable for global size calculation. |
| */ |
| private static class SizeJob extends TopologyVersionAwareJob { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** Peek modes. */ |
| private final CachePeekMode[] peekModes; |
| |
| /** |
| * @param cacheName Cache name. |
| * @param topVer Affinity topology version. |
| * @param peekModes Cache peek modes. |
| */ |
| private SizeJob(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes) { |
| super(cacheName, topVer); |
| |
| this.peekModes = peekModes; |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache) { |
| if (cache == null) |
| return 0; |
| |
| try { |
| return cache.localSize(peekModes); |
| } |
| catch (IgniteCheckedException e) { |
| throw U.convertException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(SizeJob.class, this); |
| } |
| } |
| |
| /** |
| * Internal callable for global size calculation. |
| */ |
| private static class SizeLongJob extends TopologyVersionAwareJob { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** Peek modes. */ |
| private final CachePeekMode[] peekModes; |
| |
| /** |
| * @param cacheName Cache name. |
| * @param topVer Affinity topology version. |
| * @param peekModes Cache peek modes. |
| */ |
| private SizeLongJob(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes) { |
| super(cacheName, topVer); |
| |
| this.peekModes = peekModes; |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache) { |
| if (cache == null) |
| return 0L; |
| |
| try { |
| return cache.localSizeLong(peekModes); |
| } |
| catch (IgniteCheckedException e) { |
| throw U.convertException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(SizeLongJob.class, this); |
| } |
| } |
| |
| /** |
| * Internal callable for global size calculation. |
| */ |
| @GridInternal |
| private static class LoadCacheJob<K, V> extends TopologyVersionAwareJob { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** */ |
| private final IgniteBiPredicate<K, V> p; |
| |
| /** */ |
| private final Object[] loadArgs; |
| |
| /** */ |
| private final ExpiryPolicy plc; |
| |
| /** |
| * @param cacheName Cache name. |
| * @param topVer Affinity topology version. |
| * @param p Predicate. |
| * @param loadArgs Arguments. |
| * @param plc Policy. |
| */ |
| private LoadCacheJob(String cacheName, AffinityTopologyVersion topVer, IgniteBiPredicate<K, V> p, |
| Object[] loadArgs, |
| ExpiryPolicy plc) { |
| super(cacheName, topVer); |
| |
| this.p = p; |
| this.loadArgs = loadArgs; |
| this.plc = plc; |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache) { |
| try { |
| assert cache != null : "Failed to get a cache [cacheName=" + cacheName + ", topVer=" + topVer + "]"; |
| |
| if (plc != null) |
| cache = cache.withExpiryPolicy(plc); |
| |
| cache.localLoadCache(p, loadArgs); |
| |
| return null; |
| } |
| catch (IgniteCheckedException e) { |
| throw U.convertException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(LoadCacheJob.class, this); |
| } |
| } |
| |
| /** |
| * Load cache job that with keepBinary flag. |
| */ |
| private static class LoadCacheJobV2<K, V> extends LoadCacheJob<K, V> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** */ |
| private final boolean keepBinary; |
| |
| /** |
| * Constructor. |
| * |
| * @param cacheName Cache name. |
| * @param topVer Affinity topology version. |
| * @param p Predicate. |
| * @param loadArgs Arguments. |
| * @param keepBinary Keep binary flag. |
| */ |
| public LoadCacheJobV2(final String cacheName, final AffinityTopologyVersion topVer, |
| final IgniteBiPredicate<K, V> p, final Object[] loadArgs, final ExpiryPolicy plc, |
| final boolean keepBinary) { |
| super(cacheName, topVer, p, loadArgs, plc); |
| |
| this.keepBinary = keepBinary; |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache) { |
| assert cache != null : "Failed to get a cache [cacheName=" + cacheName + ", topVer=" + topVer + "]"; |
| |
| if (keepBinary) |
| cache = cache.keepBinary(); |
| |
| return super.localExecute(cache); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(LoadCacheJobV2.class, this); |
| } |
| } |
| |
| /** |
| * Holder for last async operation future. |
| */ |
| protected static class FutureHolder { |
| /** Lock. */ |
| private final ReentrantLock lock = new ReentrantLock(); |
| |
| /** Future. */ |
| private IgniteInternalFuture fut; |
| |
| /** |
| * Tries to acquire lock. |
| * |
| * @return Whether lock was actually acquired. |
| */ |
| public boolean tryLock() { |
| return lock.tryLock(); |
| } |
| |
| /** |
| * Acquires lock. |
| */ |
| @SuppressWarnings("LockAcquiredButNotSafelyReleased") |
| public void lock() { |
| lock.lock(); |
| } |
| |
| /** |
| * Releases lock. |
| */ |
| public void unlock() { |
| lock.unlock(); |
| } |
| |
| /** |
| * @return Whether lock is held by current thread. |
| */ |
| public boolean holdsLock() { |
| return lock.isHeldByCurrentThread(); |
| } |
| |
| /** |
| * Gets future. |
| * |
| * @return Future. |
| */ |
| public IgniteInternalFuture future() { |
| return fut; |
| } |
| |
| /** |
| * Sets future. |
| * |
| * @param fut Future. |
| */ |
| public void future(@Nullable IgniteInternalFuture fut) { |
| this.fut = fut; |
| } |
| } |
| |
| /** |
| * |
| */ |
| protected abstract static class CacheExpiryPolicy implements IgniteCacheExpiryPolicy { |
| /** */ |
| private Map<KeyCacheObject, GridCacheVersion> entries; |
| |
| /** */ |
| private Map<UUID, Collection<IgniteBiTuple<KeyCacheObject, GridCacheVersion>>> rdrsMap; |
| |
| /** |
| * @param expiryPlc Expiry policy. |
| * @return Access expire policy. |
| */ |
| @Nullable private static CacheExpiryPolicy forPolicy(@Nullable final ExpiryPolicy expiryPlc) { |
| if (expiryPlc == null) |
| return null; |
| |
| return new CacheExpiryPolicy() { |
| @Override public long forAccess() { |
| return CU.toTtl(expiryPlc.getExpiryForAccess()); |
| } |
| |
| @Override public long forCreate() { |
| return CU.toTtl(expiryPlc.getExpiryForCreation()); |
| } |
| |
| @Override public long forUpdate() { |
| return CU.toTtl(expiryPlc.getExpiryForUpdate()); |
| } |
| }; |
| } |
| |
| /** |
| * @param createTtl Create TTL. |
| * @param accessTtl Access TTL. |
| * @return Access expire policy. |
| */ |
| @Nullable public static CacheExpiryPolicy fromRemote(final long createTtl, final long accessTtl) { |
| if (createTtl == CU.TTL_NOT_CHANGED && accessTtl == CU.TTL_NOT_CHANGED) |
| return null; |
| |
| return new CacheExpiryPolicy() { |
| @Override public long forCreate() { |
| return createTtl; |
| } |
| |
| @Override public long forAccess() { |
| return accessTtl; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long forUpdate() { |
| return CU.TTL_NOT_CHANGED; |
| } |
| }; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void reset() { |
| if (entries != null) |
| entries.clear(); |
| |
| if (rdrsMap != null) |
| rdrsMap.clear(); |
| } |
| |
| /** |
| * @param key Entry key. |
| * @param ver Entry version. |
| */ |
| @Override public void ttlUpdated(KeyCacheObject key, |
| GridCacheVersion ver, |
| @Nullable Collection<UUID> rdrs) { |
| if (entries == null) |
| entries = new HashMap<>(); |
| |
| entries.put(key, ver); |
| |
| if (rdrs != null && !rdrs.isEmpty()) { |
| if (rdrsMap == null) |
| rdrsMap = new HashMap<>(); |
| |
| for (UUID nodeId : rdrs) { |
| Collection<IgniteBiTuple<KeyCacheObject, GridCacheVersion>> col = rdrsMap.get(nodeId); |
| |
| if (col == null) |
| rdrsMap.put(nodeId, col = new ArrayList<>()); |
| |
| col.add(new T2<>(key, ver)); |
| } |
| } |
| } |
| |
| /** |
| * @return TTL update request. |
| */ |
| @Nullable @Override public Map<KeyCacheObject, GridCacheVersion> entries() { |
| return entries; |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public Map<UUID, Collection<IgniteBiTuple<KeyCacheObject, GridCacheVersion>>> readers() { |
| return rdrsMap; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean readyToFlush(int cnt) { |
| return (entries != null && entries.size() > cnt) || (rdrsMap != null && rdrsMap.size() > cnt); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(CacheExpiryPolicy.class, this); |
| } |
| } |
| |
| /** |
| * |
| */ |
| static class LoadKeysCallable<K, V> implements IgniteCallable<Void>, Externalizable { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** Cache name. */ |
| private String cacheName; |
| |
| /** Injected grid instance. */ |
| @IgniteInstanceResource |
| private Ignite ignite; |
| |
| /** Keys to load. */ |
| private Collection<? extends K> keys; |
| |
| /** Update flag. */ |
| private boolean update; |
| |
| /** */ |
| private ExpiryPolicy plc; |
| |
| /** */ |
| private boolean keepBinary; |
| |
| /** |
| * Required by {@link Externalizable}. |
| */ |
| public LoadKeysCallable() { |
| // No-op. |
| } |
| |
| /** |
| * @param cacheName Cache name. |
| * @param keys Keys. |
| * @param update If {@code true} calls {@link #localLoadAndUpdate(Collection)} |
| * otherwise {@link #localLoad(Collection, ExpiryPolicy, boolean)}. |
| * @param plc Expiry policy. |
| * @param keepBinary Keep binary flag. |
| */ |
| LoadKeysCallable(final String cacheName, final Collection<? extends K> keys, final boolean update, |
| final ExpiryPolicy plc, final boolean keepBinary) { |
| this.cacheName = cacheName; |
| this.keys = keys; |
| this.update = update; |
| this.plc = plc; |
| this.keepBinary = keepBinary; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Void call() throws Exception { |
| GridCacheAdapter<K, V> cache = ((IgniteKernal)ignite).context().cache().internalCache(cacheName); |
| |
| assert cache != null : cacheName; |
| |
| cache.context().gate().enter(); |
| |
| try { |
| if (update) |
| cache.localLoadAndUpdate(keys); |
| else |
| cache.localLoad(keys, plc, keepBinary); |
| } |
| finally { |
| cache.context().gate().leave(); |
| } |
| |
| return null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void writeExternal(final ObjectOutput out) throws IOException { |
| U.writeString(out, cacheName); |
| |
| U.writeCollection(out, keys); |
| |
| out.writeBoolean(update); |
| |
| out.writeObject(plc != null ? new IgniteExternalizableExpiryPolicy(plc) : null); |
| |
| out.writeBoolean(keepBinary); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void readExternal(final ObjectInput in) throws IOException, ClassNotFoundException { |
| cacheName = U.readString(in); |
| |
| keys = U.readCollection(in); |
| |
| update = in.readBoolean(); |
| |
| plc = (ExpiryPolicy)in.readObject(); |
| |
| keepBinary = in.readBoolean(); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private class LocalStoreLoadClosure extends CIX3<KeyCacheObject, Object, GridCacheVersion> { |
| /** */ |
| final IgniteBiPredicate<K, V> p; |
| |
| /** */ |
| final Collection<GridCacheRawVersionedEntry> col; |
| |
| /** */ |
| final DataStreamerImpl<K, V> ldr; |
| |
| /** */ |
| final ExpiryPolicy plc; |
| |
| /** |
| * @param p Key/value predicate. |
| * @param ldr Loader. |
| * @param plc Optional expiry policy. |
| */ |
| private LocalStoreLoadClosure(@Nullable IgniteBiPredicate<K, V> p, |
| DataStreamerImpl<K, V> ldr, |
| @Nullable ExpiryPolicy plc) { |
| this.p = p; |
| this.ldr = ldr; |
| this.plc = plc; |
| |
| col = new ArrayList<>(ldr.perNodeBufferSize()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void applyx(KeyCacheObject key, Object val, GridCacheVersion ver) |
| throws IgniteCheckedException { |
| assert ver != null; |
| |
| if (p != null && !p.apply(key.<K>value(ctx.cacheObjectContext(), false), (V)val)) |
| return; |
| |
| long ttl = 0; |
| |
| if (plc != null) { |
| ttl = CU.toTtl(plc.getExpiryForCreation()); |
| |
| if (ttl == CU.TTL_ZERO) |
| return; |
| else if (ttl == CU.TTL_NOT_CHANGED) |
| ttl = 0; |
| } |
| |
| GridCacheRawVersionedEntry e = new GridCacheRawVersionedEntry(ctx.toCacheKeyObject(key), |
| ctx.toCacheObject(val), |
| ttl, |
| 0, |
| ver.conflictVersion()); |
| |
| e.prepareDirectMarshal(ctx.cacheObjectContext()); |
| |
| col.add(e); |
| |
| if (col.size() == ldr.perNodeBufferSize()) { |
| ldr.addDataInternal(col, false); |
| |
| col.clear(); |
| } |
| } |
| |
| /** |
| * Adds remaining data to loader. |
| */ |
| void onDone() { |
| if (!col.isEmpty()) |
| ldr.addDataInternal(col, false); |
| } |
| } |
| |
| /** |
| * |
| */ |
| private static class LoadCacheClosure<K, V> implements Callable<Void>, Externalizable { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** */ |
| private String cacheName; |
| |
| /** */ |
| private IgniteBiPredicate<K, V> p; |
| |
| /** */ |
| private Object[] args; |
| |
| /** */ |
| @IgniteInstanceResource |
| private Ignite ignite; |
| |
| /** */ |
| private ExpiryPolicy plc; |
| |
| /** |
| * Required by {@link Externalizable}. |
| */ |
| public LoadCacheClosure() { |
| // No-op. |
| } |
| |
| /** |
| * @param cacheName Cache name. |
| * @param p Predicate. |
| * @param args Arguments. |
| * @param plc Explicitly specified expiry policy. |
| */ |
| private LoadCacheClosure(String cacheName, |
| IgniteBiPredicate<K, V> p, |
| Object[] args, |
| @Nullable ExpiryPolicy plc) { |
| this.cacheName = cacheName; |
| this.p = p; |
| this.args = args; |
| this.plc = plc; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Void call() throws Exception { |
| IgniteCache<K, V> cache = ignite.cache(cacheName); |
| |
| assert cache != null : cacheName; |
| |
| if (plc != null) |
| cache = cache.withExpiryPolicy(plc); |
| |
| cache.localLoadCache(p, args); |
| |
| return null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void writeExternal(ObjectOutput out) throws IOException { |
| out.writeObject(p); |
| |
| out.writeObject(args); |
| |
| U.writeString(out, cacheName); |
| |
| if (plc != null) |
| out.writeObject(new IgniteExternalizableExpiryPolicy(plc)); |
| else |
| out.writeObject(null); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { |
| p = (IgniteBiPredicate<K, V>)in.readObject(); |
| |
| args = (Object[])in.readObject(); |
| |
| cacheName = U.readString(in); |
| |
| plc = (ExpiryPolicy)in.readObject(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(LoadCacheClosure.class, this); |
| } |
| } |
| |
| /** |
| * |
| */ |
| protected abstract static class UpdateTimeStatClosure<T> implements CI1<IgniteInternalFuture<T>> { |
| /** */ |
| protected final CacheMetricsImpl metrics; |
| |
| /** */ |
| protected final long start; |
| |
| /** |
| * @param metrics Metrics. |
| * @param start Start time. |
| */ |
| public UpdateTimeStatClosure(CacheMetricsImpl metrics, long start) { |
| this.metrics = metrics; |
| this.start = start; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void apply(IgniteInternalFuture<T> fut) { |
| try { |
| if (!fut.isCancelled()) { |
| T res = fut.get(); |
| |
| updateTimeStat(res); |
| } |
| } |
| catch (IgniteCheckedException ignore) { |
| //No-op. |
| } |
| } |
| |
| /** |
| * Updates statistics. |
| * |
| * @param res Result of operation. |
| */ |
| protected abstract void updateTimeStat(T res); |
| } |
| |
| /** |
| * |
| */ |
| protected static class UpdateGetTimeStatClosure<T> extends UpdateTimeStatClosure<T> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** |
| * @param metrics Metrics. |
| * @param start Start time. |
| */ |
| public UpdateGetTimeStatClosure(CacheMetricsImpl metrics, long start) { |
| super(metrics, start); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void updateTimeStat(T res) { |
| metrics.addGetTimeNanos(System.nanoTime() - start); |
| } |
| } |
| |
| /** |
| * |
| */ |
| protected static class UpdateGetAllTimeStatClosure<T> extends UpdateTimeStatClosure<T> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** |
| * @param metrics Metrics. |
| * @param start Start time. |
| */ |
| public UpdateGetAllTimeStatClosure(CacheMetricsImpl metrics, long start) { |
| super(metrics, start); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void updateTimeStat(T res) { |
| metrics.addGetAllTimeNanos(System.nanoTime() - start); |
| } |
| } |
| |
| /** |
| * |
| */ |
| protected static class UpdateGetAndRemoveTimeStatClosure<T> extends UpdateTimeStatClosure<T> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** |
| * @param metrics Metrics. |
| * @param start Start time. |
| */ |
| public UpdateGetAndRemoveTimeStatClosure(CacheMetricsImpl metrics, long start) { |
| super(metrics, start); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void updateTimeStat(T res) { |
| metrics.addRemoveAndGetTimeNanos(System.nanoTime() - start); |
| } |
| } |
| |
| /** |
| * |
| */ |
| protected static class UpdateRemoveTimeStatClosure extends UpdateTimeStatClosure<Boolean> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** |
| * @param metrics Metrics. |
| * @param start Start time. |
| */ |
| public UpdateRemoveTimeStatClosure(CacheMetricsImpl metrics, long start) { |
| super(metrics, start); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void updateTimeStat(Boolean res) { |
| if (res) |
| metrics.addRemoveTimeNanos(System.nanoTime() - start); |
| } |
| } |
| |
| /** |
| * |
| */ |
| protected static class UpdateRemoveAllTimeStatClosure<T> extends UpdateTimeStatClosure<T> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** |
| * @param metrics Metrics. |
| * @param start Start time. |
| */ |
| public UpdateRemoveAllTimeStatClosure(CacheMetricsImpl metrics, long start) { |
| super(metrics, start); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void updateTimeStat(T res) { |
| metrics.addRemoveAllTimeNanos(System.nanoTime() - start); |
| } |
| } |
| |
| /** |
| * |
| */ |
| protected static class UpdatePutTimeStatClosure<T> extends UpdateTimeStatClosure<T> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** |
| * @param metrics Metrics. |
| * @param start Start time. |
| */ |
| public UpdatePutTimeStatClosure(CacheMetricsImpl metrics, long start) { |
| super(metrics, start); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void updateTimeStat(T res) { |
| metrics.addPutTimeNanos(System.nanoTime() - start); |
| } |
| } |
| |
| /** |
| * |
| */ |
| protected static class UpdatePutAllTimeStatClosure<T> extends UpdateTimeStatClosure<T> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** |
| * @param metrics Metrics. |
| * @param start Start time. |
| */ |
| public UpdatePutAllTimeStatClosure(CacheMetricsImpl metrics, long start) { |
| super(metrics, start); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void updateTimeStat(T res) { |
| metrics.addPutAllTimeNanos(System.nanoTime() - start); |
| } |
| } |
| |
| /** |
| * |
| */ |
| protected static class UpdatePutAndGetTimeStatClosure<T> extends UpdateTimeStatClosure<T> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** |
| * @param metrics Metrics. |
| * @param start Start time. |
| */ |
| public UpdatePutAndGetTimeStatClosure(CacheMetricsImpl metrics, long start) { |
| super(metrics, start); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void updateTimeStat(T res) { |
| metrics.addPutAndGetTimeNanos(System.nanoTime() - start); |
| } |
| } |
| |
| /** |
| * |
| */ |
| protected static class InvokeAllTimeStatClosure<T> extends UpdateTimeStatClosure<T> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** |
| * @param metrics Metrics. |
| * @param start Start time. |
| */ |
| public InvokeAllTimeStatClosure(CacheMetricsImpl metrics, final long start) { |
| super(metrics, start); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void updateTimeStat(T res) { |
| metrics.addInvokeTimeNanos(System.nanoTime() - start); |
| } |
| } |
| |
| /** |
| * Writes cache operation performance statistics. |
| * |
| * @param op Operation type. |
| * @param start Start time in nanoseconds. |
| */ |
| private void writeStatistics(OperationType op, long start) { |
| ctx.kernalContext().performanceStatistics().cacheOperation( |
| op, |
| ctx.cacheId(), |
| U.currentTimeMillis(), |
| System.nanoTime() - start); |
| } |
| |
| /** |
| * Delayed callable class. |
| */ |
| public abstract static class TopologyVersionAwareJob extends ComputeJobAdapter { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** Injected job context. */ |
| @JobContextResource |
| protected ComputeJobContext jobCtx; |
| |
| /** Injected grid instance. */ |
| @IgniteInstanceResource |
| protected IgniteEx ignite; |
| |
| /** Affinity topology version. */ |
| protected final AffinityTopologyVersion topVer; |
| |
| /** Cache name. */ |
| protected final String cacheName; |
| |
| /** |
| * @param cacheName Cache name. |
| * @param topVer Affinity topology version. |
| */ |
| public TopologyVersionAwareJob(String cacheName, AffinityTopologyVersion topVer) { |
| assert topVer != null; |
| |
| this.cacheName = cacheName; |
| this.topVer = topVer; |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public final Object execute() { |
| if (!waitAffinityReadyFuture()) |
| return null; |
| |
| IgniteInternalCache cache = ignite.context().cache().cache(cacheName); |
| |
| return localExecute(cache); |
| } |
| |
| /** |
| * @param cache Cache. |
| * @return Local execution result. |
| */ |
| @Nullable protected abstract Object localExecute(@Nullable IgniteInternalCache cache); |
| |
| /** |
| * Holds (suspends) job execution until our cache version becomes equal to remote cache's version. |
| * |
| * @return {@code True} if topology check passed. |
| */ |
| private boolean waitAffinityReadyFuture() { |
| GridCacheProcessor cacheProc = ignite.context().cache(); |
| |
| AffinityTopologyVersion locTopVer = cacheProc.context().exchange().readyAffinityVersion(); |
| |
| if (locTopVer.compareTo(topVer) < 0) { |
| IgniteInternalFuture<?> fut = cacheProc.context().exchange().affinityReadyFuture(topVer); |
| |
| if (fut != null && !fut.isDone()) { |
| jobCtx.holdcc(); |
| |
| fut.listen(new CI1<IgniteInternalFuture<?>>() { |
| @Override public void apply(IgniteInternalFuture<?> t) { |
| ignite.context().closure().runLocalSafe(new GridPlainRunnable() { |
| @Override public void run() { |
| jobCtx.callcc(); |
| } |
| }, false); |
| } |
| }); |
| |
| return false; |
| } |
| } |
| |
| return true; |
| } |
| } |
| |
| /** |
| * Size task. |
| */ |
| @GridInternal |
| private static class SizeTask extends ComputeTaskAdapter<Object, Integer> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** Cache name. */ |
| private final String cacheName; |
| |
| /** Affinity topology version. */ |
| private final AffinityTopologyVersion topVer; |
| |
| /** Peek modes. */ |
| private final CachePeekMode[] peekModes; |
| |
| /** |
| * @param cacheName Cache name. |
| * @param topVer Affinity topology version. |
| * @param peekModes Cache peek modes. |
| */ |
| public SizeTask(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes) { |
| this.cacheName = cacheName; |
| this.topVer = topVer; |
| this.peekModes = peekModes; |
| } |
| |
| /** {@inheritDoc} */ |
| @NotNull @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, |
| @Nullable Object arg) throws IgniteException { |
| Map<ComputeJob, ClusterNode> jobs = new HashMap(); |
| |
| for (ClusterNode node : subgrid) |
| jobs.put(new SizeJob(cacheName, topVer, peekModes), node); |
| |
| return jobs; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { |
| IgniteException e = res.getException(); |
| |
| if (e != null) { |
| if (e instanceof ClusterTopologyException) |
| return ComputeJobResultPolicy.WAIT; |
| |
| throw new IgniteException("Remote job threw exception.", e); |
| } |
| |
| return ComputeJobResultPolicy.WAIT; |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public Integer reduce(List<ComputeJobResult> results) throws IgniteException { |
| int size = 0; |
| |
| for (ComputeJobResult res : results) { |
| if (res.getException() == null && res != null) |
| size += res.<Integer>getData(); |
| } |
| |
| return size; |
| } |
| } |
| |
| /** |
| * Size task. |
| */ |
| @GridInternal |
| private static class SizeLongTask extends ComputeTaskAdapter<Object, Long> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** Cache name. */ |
| private final String cacheName; |
| |
| /** Affinity topology version. */ |
| private final AffinityTopologyVersion topVer; |
| |
| /** Peek modes. */ |
| private final CachePeekMode[] peekModes; |
| |
| /** |
| * @param cacheName Cache name. |
| * @param topVer Affinity topology version. |
| * @param peekModes Cache peek modes. |
| */ |
| private SizeLongTask(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes) { |
| this.cacheName = cacheName; |
| this.topVer = topVer; |
| this.peekModes = peekModes; |
| } |
| |
| /** {@inheritDoc} */ |
| @NotNull @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, |
| @Nullable Object arg) throws IgniteException { |
| Map<ComputeJob, ClusterNode> jobs = new HashMap(); |
| |
| for (ClusterNode node : subgrid) |
| jobs.put(new SizeLongJob(cacheName, topVer, peekModes), node); |
| |
| return jobs; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { |
| IgniteException e = res.getException(); |
| |
| if (e != null) { |
| if (e instanceof ClusterTopologyException) |
| return ComputeJobResultPolicy.WAIT; |
| |
| throw new IgniteException("Remote job threw exception.", e); |
| } |
| |
| return ComputeJobResultPolicy.WAIT; |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public Long reduce(List<ComputeJobResult> results) throws IgniteException { |
| long size = 0; |
| |
| for (ComputeJobResult res : results) { |
| if (res != null && res.getException() == null) |
| size += res.<Long>getData(); |
| } |
| |
| return size; |
| } |
| } |
| |
| /** |
| * Partition Size Long task. |
| */ |
| @GridInternal |
| private static class PartitionSizeLongTask extends ComputeTaskAdapter<Object, Long> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** Partition */ |
| private final int partition; |
| |
| /** Cache name. */ |
| private final String cacheName; |
| |
| /** Affinity topology version. */ |
| private final AffinityTopologyVersion topVer; |
| |
| /** Peek modes. */ |
| private final CachePeekMode[] peekModes; |
| |
| /** |
| * @param cacheName Cache name. |
| * @param topVer Affinity topology version. |
| * @param peekModes Cache peek modes. |
| * @param partition partition. |
| */ |
| private PartitionSizeLongTask( |
| String cacheName, |
| AffinityTopologyVersion topVer, |
| CachePeekMode[] peekModes, |
| int partition |
| ) { |
| this.cacheName = cacheName; |
| this.topVer = topVer; |
| this.peekModes = peekModes; |
| this.partition = partition; |
| } |
| |
| /** {@inheritDoc} */ |
| @NotNull @Override public Map<? extends ComputeJob, ClusterNode> map( |
| List<ClusterNode> subgrid, |
| @Nullable Object arg |
| ) throws IgniteException { |
| Map<ComputeJob, ClusterNode> jobs = new HashMap(); |
| |
| for (ClusterNode node : subgrid) |
| jobs.put(new PartitionSizeLongJob(cacheName, topVer, peekModes, partition), node); |
| |
| return jobs; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { |
| IgniteException e = res.getException(); |
| |
| if (e != null) { |
| if (e instanceof ClusterTopologyException) |
| return ComputeJobResultPolicy.WAIT; |
| |
| throw new IgniteException("Remote job threw exception.", e); |
| } |
| |
| return ComputeJobResultPolicy.WAIT; |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public Long reduce(List<ComputeJobResult> results) throws IgniteException { |
| long size = 0; |
| |
| for (ComputeJobResult res : results) { |
| if (res != null) { |
| if (res.getException() == null) |
| size += res.<Long>getData(); |
| else |
| throw res.getException(); |
| } |
| } |
| |
| return size; |
| } |
| } |
| |
| /** |
| * Clear task. |
| */ |
| @GridInternal |
| private static class ClearTask<K> extends ComputeTaskAdapter<Object, Object> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** Cache name. */ |
| private final String cacheName; |
| |
| /** Affinity topology version. */ |
| private final AffinityTopologyVersion topVer; |
| |
| /** Keys to clear. */ |
| private final Set<? extends K> keys; |
| |
| /** Near cache flag. */ |
| private final boolean near; |
| |
| /** |
| * @param cacheName Cache name. |
| * @param topVer Affinity topology version. |
| * @param keys Keys to clear. |
| * @param near Near cache flag. |
| */ |
| public ClearTask(String cacheName, AffinityTopologyVersion topVer, Set<? extends K> keys, boolean near) { |
| this.cacheName = cacheName; |
| this.topVer = topVer; |
| this.keys = keys; |
| this.near = near; |
| } |
| |
| /** {@inheritDoc} */ |
| @NotNull @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, |
| @Nullable Object arg) throws IgniteException { |
| Map<ComputeJob, ClusterNode> jobs = new HashMap<>(); |
| |
| for (ClusterNode node : subgrid) { |
| ComputeJob job; |
| |
| if (near) { |
| job = keys == null ? new GlobalClearAllNearJob(cacheName, topVer) : |
| new GlobalClearKeySetNearJob<>(cacheName, topVer, keys); |
| } |
| else { |
| job = keys == null ? new GlobalClearAllJob(cacheName, topVer) : |
| new GlobalClearKeySetJob<>(cacheName, topVer, keys); |
| } |
| |
| jobs.put(job, node); |
| } |
| |
| return jobs; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) { |
| IgniteException e = res.getException(); |
| |
| if (e != null) { |
| if (e instanceof ClusterTopologyException) |
| return ComputeJobResultPolicy.WAIT; |
| |
| throw new IgniteException("Remote job threw exception.", e); |
| } |
| |
| return ComputeJobResultPolicy.WAIT; |
| } |
| |
| /** {@inheritDoc} */ |
| @Nullable @Override public Object reduce(List<ComputeJobResult> results) throws IgniteException { |
| return null; |
| } |
| } |
| |
| /** |
| * Partition preload job. |
| */ |
| @GridInternal |
| private static class PartitionPreloadJob implements IgniteRunnable { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** */ |
| @IgniteInstanceResource |
| private IgniteEx ignite; |
| |
| /** */ |
| @LoggerResource |
| private IgniteLogger log; |
| |
| /** */ |
| private final String name; |
| |
| /** Cache name. */ |
| private final int part; |
| |
| /** |
| * @param name Name. |
| * @param part Partition. |
| */ |
| public PartitionPreloadJob(String name, int part) { |
| this.name = name; |
| this.part = part; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void run() { |
| IgniteInternalCache cache = ignite.context().cache().cache(name); |
| |
| try { |
| cache.context().offheap().preloadPartition(part); |
| } |
| catch (IgniteCheckedException e) { |
| log.error("Failed to preload the partition [cache=" + name + ", partition=" + part + ']', e); |
| |
| throw new IgniteException(e); |
| } |
| } |
| } |
| |
| /** |
| * Iterator implementation for KeySet. |
| */ |
| private final class KeySetIterator implements Iterator<K> { |
| /** Internal map entry iterator. */ |
| private final Iterator<GridCacheMapEntry> internalIterator; |
| |
| /** Keep binary flag. */ |
| private final boolean keepBinary; |
| |
| /** Current entry. */ |
| private GridCacheMapEntry current; |
| |
| /** |
| * Constructor. |
| * |
| * @param internalIterator Internal iterator. |
| * @param keepBinary Keep binary flag. |
| */ |
| private KeySetIterator(Iterator<GridCacheMapEntry> internalIterator, boolean keepBinary) { |
| this.internalIterator = internalIterator; |
| this.keepBinary = keepBinary; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean hasNext() { |
| return internalIterator.hasNext(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public K next() { |
| current = internalIterator.next(); |
| |
| return (K)ctx.unwrapBinaryIfNeeded(current.key(), keepBinary, true, null); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void remove() { |
| if (current == null) |
| throw new IllegalStateException(); |
| |
| try { |
| getAndRemove((K)current.key()); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| |
| current = null; |
| } |
| } |
| |
| /** |
| * A wrapper over internal map that provides set semantics and constant-time contains() check. |
| */ |
| private final class KeySet extends AbstractSet<K> { |
| /** Internal entry set. */ |
| private final Set<GridCacheMapEntry> internalSet; |
| |
| /** Keep binary flag. */ |
| private final boolean keepBinary; |
| |
| /** |
| * Constructor |
| * |
| * @param internalSet Internal set. |
| */ |
| private KeySet(Set<GridCacheMapEntry> internalSet) { |
| this.internalSet = internalSet; |
| |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| keepBinary = opCtx != null && opCtx.isKeepBinary(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Iterator<K> iterator() { |
| return new KeySetIterator(internalSet.iterator(), keepBinary); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int size() { |
| return F.size(iterator()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean contains(Object o) { |
| GridCacheMapEntry entry = map.getEntry(ctx, ctx.toCacheKeyObject(o)); |
| |
| return entry != null && internalSet.contains(entry); |
| } |
| } |
| |
| /** |
| * Iterator implementation for EntrySet. |
| */ |
| private final class EntryIterator implements Iterator<Cache.Entry<K, V>> { |
| |
| /** Internal iterator. */ |
| private final Iterator<GridCacheMapEntry> internalIterator; |
| |
| /** Current entry. */ |
| private GridCacheMapEntry current; |
| |
| /** Keep binary flag. */ |
| private final boolean keepBinary; |
| |
| /** |
| * Constructor. |
| * |
| * @param internalIterator Internal iterator. |
| * @param keepBinary Keep binary. |
| */ |
| private EntryIterator(Iterator<GridCacheMapEntry> internalIterator, boolean keepBinary) { |
| this.internalIterator = internalIterator; |
| this.keepBinary = keepBinary; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean hasNext() { |
| return internalIterator.hasNext(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Cache.Entry<K, V> next() { |
| current = internalIterator.next(); |
| |
| return current.wrapLazyValue(keepBinary); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void remove() { |
| if (current == null) |
| throw new IllegalStateException(); |
| |
| try { |
| getAndRemove((K)current.wrapLazyValue(keepBinary).getKey()); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| |
| current = null; |
| } |
| } |
| |
| /** |
| * A wrapper over internal map that provides set semantics and constant-time contains() check. |
| */ |
| private final class EntrySet extends AbstractSet<Cache.Entry<K, V>> { |
| |
| /** Internal set. */ |
| private final Set<GridCacheMapEntry> internalSet; |
| |
| /** Keep binary flag. */ |
| private final boolean keepBinary; |
| |
| /** |
| * Constructor. |
| * |
| * @param internalSet Internal set. |
| * @param keepBinary Keep binary flag. |
| */ |
| private EntrySet(Set<GridCacheMapEntry> internalSet, boolean keepBinary) { |
| this.internalSet = internalSet; |
| this.keepBinary = keepBinary; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Iterator<Cache.Entry<K, V>> iterator() { |
| return new EntryIterator(internalSet.iterator(), keepBinary); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int size() { |
| return F.size(iterator()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean contains(Object o) { |
| GridCacheMapEntry entry = map.getEntry(ctx, ctx.toCacheKeyObject(o)); |
| |
| return entry != null && internalSet.contains(entry); |
| } |
| } |
| } |