| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.distributed.dht; |
| |
| import org.apache.ignite.*; |
| import org.apache.ignite.cluster.*; |
| import org.apache.ignite.internal.*; |
| import org.apache.ignite.internal.processors.affinity.*; |
| import org.apache.ignite.internal.processors.cache.*; |
| import org.apache.ignite.internal.processors.cache.distributed.*; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.*; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.*; |
| import org.apache.ignite.internal.processors.cache.distributed.near.*; |
| import org.apache.ignite.internal.processors.cache.version.*; |
| import org.apache.ignite.internal.util.future.*; |
| import org.apache.ignite.internal.util.lang.*; |
| import org.apache.ignite.internal.util.typedef.*; |
| import org.apache.ignite.internal.util.typedef.internal.*; |
| import org.apache.ignite.lang.*; |
| import org.jetbrains.annotations.*; |
| import org.jsr166.*; |
| |
| import javax.cache.*; |
| import javax.cache.expiry.*; |
| import java.io.*; |
| import java.util.*; |
| import java.util.concurrent.*; |
| |
| import static org.apache.ignite.internal.processors.dr.GridDrType.*; |
| |
| /** |
| * DHT cache adapter. |
| */ |
| public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdapter<K, V> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** Topology. */ |
| private GridDhtPartitionTopology top; |
| |
| /** Preloader. */ |
| protected GridCachePreloader preldr; |
| |
| /** Multi tx future holder. */ |
| private ThreadLocal<IgniteBiTuple<IgniteUuid, GridDhtTopologyFuture>> multiTxHolder = new ThreadLocal<>(); |
| |
| /** Multi tx futures. */ |
| private ConcurrentMap<IgniteUuid, MultiUpdateFuture> multiTxFuts = new ConcurrentHashMap8<>(); |
| |
| /** |
| * Empty constructor required for {@link Externalizable}. |
| */ |
| protected GridDhtCacheAdapter() { |
| // No-op. |
| } |
| |
| /** |
| * @param ctx Context. |
| */ |
| protected GridDhtCacheAdapter(GridCacheContext<K, V> ctx) { |
| super(ctx, ctx.config().getStartSize()); |
| |
| top = new GridDhtPartitionTopologyImpl(ctx); |
| } |
| |
| /** |
| * Constructor used for near-only cache. |
| * |
| * @param ctx Cache context. |
| * @param map Cache map. |
| */ |
| protected GridDhtCacheAdapter(GridCacheContext<K, V> ctx, GridCacheConcurrentMap map) { |
| super(ctx, map); |
| |
| top = new GridDhtPartitionTopologyImpl(ctx); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override protected void init() { |
| map.setEntryFactory(new GridCacheMapEntryFactory() { |
| /** {@inheritDoc} */ |
| @Override public GridCacheMapEntry create(GridCacheContext ctx, |
| AffinityTopologyVersion topVer, |
| KeyCacheObject key, |
| int hash, |
| CacheObject val, |
| GridCacheMapEntry next, |
| int hdrId) |
| { |
| if (ctx.useOffheapEntry()) |
| return new GridDhtOffHeapCacheEntry(ctx, topVer, key, hash, val, next, hdrId); |
| |
| return new GridDhtCacheEntry(ctx, topVer, key, hash, val, next, hdrId); |
| } |
| }); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void start() throws IgniteCheckedException { |
| super.start(); |
| |
| ctx.io().addHandler(ctx.cacheId(), GridCacheTtlUpdateRequest.class, new CI2<UUID, GridCacheTtlUpdateRequest>() { |
| @Override public void apply(UUID nodeId, GridCacheTtlUpdateRequest req) { |
| processTtlUpdateRequest(req); |
| } |
| }); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void stop() { |
| super.stop(); |
| |
| if (preldr != null) |
| preldr.stop(); |
| |
| // Clean up to help GC. |
| preldr = null; |
| top = null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onKernalStart() throws IgniteCheckedException { |
| super.onKernalStart(); |
| |
| preldr.onKernalStart(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onKernalStop() { |
| super.onKernalStop(); |
| |
| if (preldr != null) |
| preldr.onKernalStop(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void printMemoryStats() { |
| super.printMemoryStats(); |
| |
| top.printMemoryStats(1024); |
| } |
| |
| /** |
| * @return Near cache. |
| */ |
| public abstract GridNearCacheAdapter<K, V> near(); |
| |
| /** |
| * @return Partition topology. |
| */ |
| public GridDhtPartitionTopology topology() { |
| return top; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridCachePreloader preloader() { |
| return preldr; |
| } |
| |
| /** |
| * @return DHT preloader. |
| */ |
| public GridDhtPreloader dhtPreloader() { |
| assert preldr instanceof GridDhtPreloader; |
| |
| return (GridDhtPreloader)preldr; |
| } |
| |
| /** |
| * @return Topology version future registered for multi-update. |
| */ |
| @Nullable public GridDhtTopologyFuture multiUpdateTopologyFuture() { |
| IgniteBiTuple<IgniteUuid, GridDhtTopologyFuture> tup = multiTxHolder.get(); |
| |
| return tup == null ? null : tup.get2(); |
| } |
| |
| /** |
| * Starts multi-update lock. Will wait for topology future is ready. |
| * |
| * @return Topology version. |
| * @throws IgniteCheckedException If failed. |
| */ |
| public AffinityTopologyVersion beginMultiUpdate() throws IgniteCheckedException { |
| IgniteBiTuple<IgniteUuid, GridDhtTopologyFuture> tup = multiTxHolder.get(); |
| |
| if (tup != null) |
| throw new IgniteCheckedException("Nested multi-update locks are not supported"); |
| |
| top.readLock(); |
| |
| GridDhtTopologyFuture topFut; |
| |
| AffinityTopologyVersion topVer; |
| |
| try { |
| // While we are holding read lock, register lock future for partition release future. |
| IgniteUuid lockId = IgniteUuid.fromUuid(ctx.localNodeId()); |
| |
| topVer = top.topologyVersion(); |
| |
| MultiUpdateFuture fut = new MultiUpdateFuture(topVer); |
| |
| MultiUpdateFuture old = multiTxFuts.putIfAbsent(lockId, fut); |
| |
| assert old == null; |
| |
| topFut = top.topologyVersionFuture(); |
| |
| multiTxHolder.set(F.t(lockId, topFut)); |
| } |
| finally { |
| top.readUnlock(); |
| } |
| |
| topFut.get(); |
| |
| return topVer; |
| } |
| |
| /** |
| * Ends multi-update lock. |
| * |
| * @throws IgniteCheckedException If failed. |
| */ |
| public void endMultiUpdate() throws IgniteCheckedException { |
| IgniteBiTuple<IgniteUuid, GridDhtTopologyFuture> tup = multiTxHolder.get(); |
| |
| if (tup == null) |
| throw new IgniteCheckedException("Multi-update was not started or released twice."); |
| |
| top.readLock(); |
| |
| try { |
| IgniteUuid lockId = tup.get1(); |
| |
| MultiUpdateFuture multiFut = multiTxFuts.remove(lockId); |
| |
| multiTxHolder.set(null); |
| |
| // Finish future. |
| multiFut.onDone(lockId); |
| } |
| finally { |
| top.readUnlock(); |
| } |
| } |
| |
| /** |
| * Creates multi update finish future. Will return {@code null} if no multi-update locks are found. |
| * |
| * @param topVer Topology version. |
| * @return Finish future. |
| */ |
| @Nullable public IgniteInternalFuture<?> multiUpdateFinishFuture(AffinityTopologyVersion topVer) { |
| GridCompoundFuture<IgniteUuid, Object> fut = null; |
| |
| for (MultiUpdateFuture multiFut : multiTxFuts.values()) { |
| if (multiFut.topologyVersion().compareTo(topVer) <= 0) { |
| if (fut == null) |
| fut = new GridCompoundFuture<>(); |
| |
| fut.add(multiFut); |
| } |
| } |
| |
| if (fut != null) |
| fut.markInitialized(); |
| |
| return fut; |
| } |
| |
| /** |
| * @param key Key. |
| * @return DHT entry. |
| */ |
| @Nullable public GridDhtCacheEntry peekExx(KeyCacheObject key) { |
| return (GridDhtCacheEntry)peekEx(key); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * |
| * @throws GridDhtInvalidPartitionException If partition for the key is no longer valid. |
| */ |
| @Override public GridCacheEntryEx entryEx(KeyCacheObject key, boolean touch) |
| throws GridDhtInvalidPartitionException { |
| return super.entryEx(key, touch); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * |
| * @throws GridDhtInvalidPartitionException If partition for the key is no longer valid. |
| */ |
| @Override public GridCacheEntryEx entryEx(KeyCacheObject key, AffinityTopologyVersion topVer) throws GridDhtInvalidPartitionException { |
| return super.entryEx(key, topVer); |
| } |
| |
| /** |
| * @param key Key. |
| * @return DHT entry. |
| * @throws GridDhtInvalidPartitionException If partition for the key is no longer valid. |
| */ |
| public GridDhtCacheEntry entryExx(KeyCacheObject key) throws GridDhtInvalidPartitionException { |
| return (GridDhtCacheEntry)entryEx(key); |
| } |
| |
| /** |
| * @param key Key. |
| * @param topVer Topology version. |
| * @return DHT entry. |
| * @throws GridDhtInvalidPartitionException If partition for the key is no longer valid. |
| */ |
| public GridDhtCacheEntry entryExx(KeyCacheObject key, AffinityTopologyVersion topVer) throws GridDhtInvalidPartitionException { |
| return (GridDhtCacheEntry)entryEx(key, topVer); |
| } |
| |
| /** |
| * Gets or creates entry for given key. If key belongs to local node, dht entry will be returned, otherwise |
| * if {@code allowDetached} is {@code true}, detached entry will be returned, otherwise exception will be |
| * thrown. |
| * |
| * @param key Key for which entry should be returned. |
| * @param allowDetached Whether to allow detached entries. |
| * @param touch {@code True} if entry should be passed to eviction policy. |
| * @return Cache entry. |
| * @throws GridDhtInvalidPartitionException if entry does not belong to this node and |
| * {@code allowDetached} is {@code false}. |
| */ |
| public GridCacheEntryEx entryExx(KeyCacheObject key, AffinityTopologyVersion topVer, boolean allowDetached, boolean touch) { |
| try { |
| return allowDetached && !ctx.affinity().localNode(key, topVer) ? |
| createEntry(key) : entryEx(key, touch); |
| } |
| catch (GridDhtInvalidPartitionException e) { |
| if (!allowDetached) |
| throw e; |
| |
| return createEntry(key); |
| } |
| } |
| |
| /** |
| * @param key Key for which entry should be returned. |
| * @return Cache entry. |
| */ |
| protected GridDistributedCacheEntry createEntry(KeyCacheObject key) { |
| return new GridDhtDetachedCacheEntry(ctx, key, key.hashCode(), null, null, 0); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void localLoad(Collection<? extends K> keys, final ExpiryPolicy plc) |
| throws IgniteCheckedException { |
| if (ctx.store().isLocal()) { |
| super.localLoad(keys, plc); |
| |
| return; |
| } |
| |
| // Version for all loaded entries. |
| final GridCacheVersion ver0 = ctx.shared().versions().nextForLoad(topology().topologyVersion()); |
| |
| final boolean replicate = ctx.isDrEnabled(); |
| |
| final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); |
| |
| final ExpiryPolicy plc0 = plc != null ? plc : ctx.expiry(); |
| |
| Collection<KeyCacheObject> keys0 = ctx.cacheKeysView(keys); |
| |
| ctx.store().loadAll(null, keys0, new CI2<KeyCacheObject, Object>() { |
| @Override public void apply(KeyCacheObject key, Object val) { |
| loadEntry(key, val, ver0, null, topVer, replicate, plc0); |
| } |
| }); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void localLoadCache(final IgniteBiPredicate<K, V> p, Object[] args) throws IgniteCheckedException { |
| if (ctx.store().isLocal()) { |
| super.localLoadCache(p, args); |
| |
| return; |
| } |
| |
| // Version for all loaded entries. |
| final GridCacheVersion ver0 = ctx.shared().versions().nextForLoad(topology().topologyVersion()); |
| |
| final boolean replicate = ctx.isDrEnabled(); |
| |
| final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); |
| |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| ExpiryPolicy plc0 = opCtx != null ? opCtx.expiry() : null; |
| |
| final ExpiryPolicy plc = plc0 != null ? plc0 : ctx.expiry(); |
| |
| if (p != null) |
| ctx.kernalContext().resource().injectGeneric(p); |
| |
| try { |
| ctx.store().loadCache(new CI3<KeyCacheObject, Object, GridCacheVersion>() { |
| @Override public void apply(KeyCacheObject key, Object val, @Nullable GridCacheVersion ver) { |
| assert ver == null; |
| |
| loadEntry(key, val, ver0, p, topVer, replicate, plc); |
| } |
| }, args); |
| |
| } |
| finally { |
| if (p instanceof GridLoadCacheCloseablePredicate) |
| ((GridLoadCacheCloseablePredicate)p).onClose(); |
| } |
| } |
| |
| /** |
| * @param key Key. |
| * @param val Value. |
| * @param ver Cache version. |
| * @param p Optional predicate. |
| * @param topVer Topology version. |
| * @param replicate Replication flag. |
| * @param plc Expiry policy. |
| */ |
| private void loadEntry(KeyCacheObject key, |
| Object val, |
| GridCacheVersion ver, |
| @Nullable IgniteBiPredicate<K, V> p, |
| AffinityTopologyVersion topVer, |
| boolean replicate, |
| @Nullable ExpiryPolicy plc) { |
| if (p != null && !p.apply(key.<K>value(ctx.cacheObjectContext(), false), (V)val)) |
| return; |
| |
| try { |
| GridDhtLocalPartition part = top.localPartition(ctx.affinity().partition(key), |
| AffinityTopologyVersion.NONE, true); |
| |
| // Reserve to make sure that partition does not get unloaded. |
| if (part.reserve()) { |
| GridCacheEntryEx entry = null; |
| |
| try { |
| long ttl = CU.ttlForLoad(plc); |
| |
| if (ttl == CU.TTL_ZERO) |
| return; |
| |
| CacheObject cacheVal = ctx.toCacheObject(val); |
| |
| entry = entryEx(key, false); |
| |
| entry.initialValue(cacheVal, ver, ttl, CU.EXPIRE_TIME_CALCULATE, false, topVer, |
| replicate ? DR_LOAD : DR_NONE); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException("Failed to put cache value: " + entry, e); |
| } |
| catch (GridCacheEntryRemovedException ignore) { |
| if (log.isDebugEnabled()) |
| log.debug("Got removed entry during loadCache (will ignore): " + entry); |
| } |
| finally { |
| if (entry != null) |
| entry.context().evicts().touch(entry, topVer); |
| |
| part.release(); |
| } |
| } |
| else if (log.isDebugEnabled()) |
| log.debug("Will node load entry into cache (partition is invalid): " + part); |
| } |
| catch (GridDhtInvalidPartitionException e) { |
| if (log.isDebugEnabled()) |
| log.debug("Ignoring entry for partition that does not belong [key=" + key + ", val=" + val + |
| ", err=" + e + ']'); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int primarySize() { |
| int sum = 0; |
| |
| AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); |
| |
| for (GridDhtLocalPartition p : topology().currentLocalPartitions()) { |
| if (p.primary(topVer)) |
| sum += p.publicSize(); |
| } |
| |
| return sum; |
| } |
| |
| /** |
| * This method is used internally. Use |
| * {@link #getDhtAsync(UUID, long, LinkedHashMap, boolean, boolean, AffinityTopologyVersion, UUID, int, IgniteCacheExpiryPolicy, boolean)} |
| * method instead to retrieve DHT value. |
| * |
| * @param keys {@inheritDoc} |
| * @param forcePrimary {@inheritDoc} |
| * @param skipTx {@inheritDoc} |
| * @return {@inheritDoc} |
| */ |
| @Override public IgniteInternalFuture<Map<K, V>> getAllAsync( |
| @Nullable Collection<? extends K> keys, |
| boolean forcePrimary, |
| boolean skipTx, |
| @Nullable GridCacheEntryEx entry, |
| @Nullable UUID subjId, |
| String taskName, |
| boolean deserializePortable, |
| boolean skipVals |
| ) { |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| return getAllAsync(keys, |
| opCtx == null || !opCtx.skipStore(), |
| null, |
| /*don't check local tx. */false, |
| subjId, |
| taskName, |
| deserializePortable, |
| forcePrimary, |
| null, |
| skipVals); |
| } |
| |
| /** |
| * @param keys Keys to get |
| * @param readThrough Read through flag. |
| * @param subjId Subject ID. |
| * @param taskName Task name. |
| * @param expiry Expiry policy. |
| * @param skipVals Skip values flag. |
| * @return Get future. |
| */ |
| IgniteInternalFuture<Map<KeyCacheObject, CacheObject>> getDhtAllAsync( |
| Collection<KeyCacheObject> keys, |
| boolean readThrough, |
| @Nullable UUID subjId, |
| String taskName, |
| @Nullable IgniteCacheExpiryPolicy expiry, |
| boolean skipVals |
| ) { |
| return getAllAsync0(keys, |
| readThrough, |
| /*don't check local tx. */false, |
| subjId, |
| taskName, |
| false, |
| expiry, |
| skipVals, |
| /*keep cache objects*/true); |
| } |
| |
| /** |
| * @param reader Reader node ID. |
| * @param msgId Message ID. |
| * @param keys Keys to get. |
| * @param readThrough Read through flag. |
| * @param reload Reload flag. |
| * @param topVer Topology version. |
| * @param subjId Subject ID. |
| * @param taskNameHash Task name hash code. |
| * @param expiry Expiry policy. |
| * @return DHT future. |
| */ |
| public GridDhtFuture<Collection<GridCacheEntryInfo>> getDhtAsync(UUID reader, |
| long msgId, |
| LinkedHashMap<KeyCacheObject, Boolean> keys, |
| boolean readThrough, |
| boolean reload, |
| AffinityTopologyVersion topVer, |
| @Nullable UUID subjId, |
| int taskNameHash, |
| @Nullable IgniteCacheExpiryPolicy expiry, |
| boolean skipVals) { |
| GridDhtGetFuture<K, V> fut = new GridDhtGetFuture<>(ctx, |
| msgId, |
| reader, |
| keys, |
| readThrough, |
| reload, |
| /*tx*/null, |
| topVer, |
| subjId, |
| taskNameHash, |
| expiry, |
| skipVals); |
| |
| fut.init(); |
| |
| return fut; |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param req Get request. |
| */ |
| protected void processNearGetRequest(final UUID nodeId, final GridNearGetRequest req) { |
| assert ctx.affinityNode(); |
| |
| long ttl = req.accessTtl(); |
| |
| final CacheExpiryPolicy expiryPlc = CacheExpiryPolicy.forAccess(ttl); |
| |
| IgniteInternalFuture<Collection<GridCacheEntryInfo>> fut = |
| getDhtAsync(nodeId, |
| req.messageId(), |
| req.keys(), |
| req.readThrough(), |
| req.reload(), |
| req.topologyVersion(), |
| req.subjectId(), |
| req.taskNameHash(), |
| expiryPlc, |
| req.skipValues()); |
| |
| fut.listen(new CI1<IgniteInternalFuture<Collection<GridCacheEntryInfo>>>() { |
| @Override public void apply(IgniteInternalFuture<Collection<GridCacheEntryInfo>> f) { |
| GridNearGetResponse res = new GridNearGetResponse(ctx.cacheId(), |
| req.futureId(), |
| req.miniId(), |
| req.version()); |
| |
| GridDhtFuture<Collection<GridCacheEntryInfo>> fut = |
| (GridDhtFuture<Collection<GridCacheEntryInfo>>)f; |
| |
| try { |
| Collection<GridCacheEntryInfo> entries = fut.get(); |
| |
| res.entries(entries); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed processing get request: " + req, e); |
| |
| res.error(e); |
| } |
| |
| if (!F.isEmpty(fut.invalidPartitions())) |
| res.invalidPartitions(fut.invalidPartitions(), ctx.shared().exchange().readyAffinityVersion()); |
| else |
| res.invalidPartitions(fut.invalidPartitions(), req.topologyVersion()); |
| |
| try { |
| ctx.io().send(nodeId, res, ctx.ioPolicy()); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to send get response to node (is node still alive?) [nodeId=" + nodeId + |
| ",req=" + req + ", res=" + res + ']', e); |
| } |
| |
| sendTtlUpdateRequest(expiryPlc); |
| } |
| }); |
| } |
| |
| /** |
| * @param expiryPlc Expiry policy. |
| */ |
| public void sendTtlUpdateRequest(@Nullable final IgniteCacheExpiryPolicy expiryPlc) { |
| if (expiryPlc != null && expiryPlc.entries() != null) { |
| ctx.closures().runLocalSafe(new Runnable() { |
| @SuppressWarnings({"unchecked", "ForLoopReplaceableByForEach"}) |
| @Override public void run() { |
| Map<KeyCacheObject, GridCacheVersion> entries = expiryPlc.entries(); |
| |
| assert entries != null && !entries.isEmpty(); |
| |
| Map<ClusterNode, GridCacheTtlUpdateRequest> reqMap = new HashMap<>(); |
| |
| AffinityTopologyVersion topVer = new AffinityTopologyVersion(ctx.discovery().topologyVersion()); |
| |
| for (Map.Entry<KeyCacheObject, GridCacheVersion> e : entries.entrySet()) { |
| List<ClusterNode> nodes = ctx.affinity().nodes(e.getKey(), topVer); |
| |
| for (int i = 0; i < nodes.size(); i++) { |
| ClusterNode node = nodes.get(i); |
| |
| if (!node.isLocal()) { |
| GridCacheTtlUpdateRequest req = reqMap.get(node); |
| |
| if (req == null) { |
| reqMap.put(node, req = new GridCacheTtlUpdateRequest(ctx.cacheId(), |
| topVer, |
| expiryPlc.forAccess())); |
| } |
| |
| req.addEntry(e.getKey(), e.getValue()); |
| } |
| } |
| } |
| |
| Map<UUID, Collection<IgniteBiTuple<KeyCacheObject, GridCacheVersion>>> rdrs = expiryPlc.readers(); |
| |
| if (rdrs != null) { |
| assert !rdrs.isEmpty(); |
| |
| for (Map.Entry<UUID, Collection<IgniteBiTuple<KeyCacheObject, GridCacheVersion>>> e : rdrs.entrySet()) { |
| ClusterNode node = ctx.node(e.getKey()); |
| |
| if (node != null) { |
| GridCacheTtlUpdateRequest req = reqMap.get(node); |
| |
| if (req == null) { |
| reqMap.put(node, req = new GridCacheTtlUpdateRequest(ctx.cacheId(), |
| topVer, |
| expiryPlc.forAccess())); |
| } |
| |
| for (IgniteBiTuple<KeyCacheObject, GridCacheVersion> t : e.getValue()) |
| req.addNearEntry(t.get1(), t.get2()); |
| } |
| } |
| } |
| |
| for (Map.Entry<ClusterNode, GridCacheTtlUpdateRequest> req : reqMap.entrySet()) { |
| try { |
| ctx.io().send(req.getKey(), req.getValue(), ctx.ioPolicy()); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to send TTL update request.", e); |
| } |
| } |
| } |
| }); |
| } |
| } |
| |
| /** |
| * @param req Request. |
| */ |
| private void processTtlUpdateRequest(GridCacheTtlUpdateRequest req) { |
| if (req.keys() != null) |
| updateTtl(this, req.keys(), req.versions(), req.ttl()); |
| |
| if (req.nearKeys() != null) { |
| GridNearCacheAdapter<K, V> near = near(); |
| |
| assert near != null; |
| |
| updateTtl(near, req.nearKeys(), req.nearVersions(), req.ttl()); |
| } |
| } |
| |
| /** |
| * @param cache Cache. |
| * @param keys Entries keys. |
| * @param vers Entries versions. |
| * @param ttl TTL. |
| */ |
| private void updateTtl(GridCacheAdapter<K, V> cache, |
| List<KeyCacheObject> keys, |
| List<GridCacheVersion> vers, |
| long ttl) { |
| assert !F.isEmpty(keys); |
| assert keys.size() == vers.size(); |
| |
| int size = keys.size(); |
| |
| boolean swap = cache.context().isSwapOrOffheapEnabled(); |
| |
| for (int i = 0; i < size; i++) { |
| try { |
| GridCacheEntryEx entry = null; |
| |
| try { |
| if (swap) { |
| entry = cache.entryEx(keys.get(i)); |
| |
| entry.unswap(false); |
| } |
| else |
| entry = cache.peekEx(keys.get(i)); |
| |
| if (entry != null) |
| entry.updateTtl(vers.get(i), ttl); |
| } |
| finally { |
| if (entry != null) |
| cache.context().evicts().touch(entry, AffinityTopologyVersion.NONE); |
| } |
| } |
| catch (IgniteCheckedException e) { |
| log.error("Failed to unswap entry.", e); |
| } |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void unlockAll(Collection<? extends K> keys) { |
| assert false; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Set<Cache.Entry<K, V>> entrySet(int part) { |
| return new PartitionEntrySet(part); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(GridDhtCacheAdapter.class, this, super.toString()); |
| } |
| |
| /** |
| * |
| */ |
| private class PartitionEntrySet extends AbstractSet<Cache.Entry<K, V>> { |
| /** */ |
| private int partId; |
| |
| /** |
| * @param partId Partition id. |
| */ |
| private PartitionEntrySet(int partId) { |
| this.partId = partId; |
| } |
| |
| /** {@inheritDoc} */ |
| @NotNull @Override public Iterator<Cache.Entry<K, V>> iterator() { |
| final GridDhtLocalPartition part = ctx.topology().localPartition(partId, |
| ctx.discovery().topologyVersionEx(), false); |
| |
| Iterator<GridDhtCacheEntry> partIt = part == null ? null : part.entries().iterator(); |
| |
| return new PartitionEntryIterator(partIt); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean remove(Object o) { |
| if (!(o instanceof Cache.Entry)) |
| return false; |
| |
| Cache.Entry<K, V> entry = (Cache.Entry<K, V>)o; |
| |
| K key = entry.getKey(); |
| V val = entry.getValue(); |
| |
| if (val == null) |
| return false; |
| |
| try { |
| // Cannot use remove(key, val) since we may be in DHT cache and should go through near. |
| return GridDhtCacheAdapter.this.remove(key, val); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean removeAll(Collection<?> c) { |
| boolean rmv = false; |
| |
| for (Object o : c) |
| rmv |= remove(o); |
| |
| return rmv; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean contains(Object o) { |
| if (!(o instanceof Cache.Entry)) |
| return false; |
| |
| Cache.Entry<K, V> entry = (Cache.Entry<K, V>)o; |
| |
| try { |
| return partId == ctx.affinity().partition(entry.getKey()) && |
| F.eq(entry.getValue(), localPeek(entry.getKey(), CachePeekModes.ONHEAP_ONLY, null)); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException(e); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int size() { |
| GridDhtLocalPartition part = ctx.topology().localPartition(partId, |
| new AffinityTopologyVersion(ctx.discovery().topologyVersion()), false); |
| |
| return part != null ? part.publicSize() : 0; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(PartitionEntrySet.class, this, "super", super.toString()); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public List<GridCacheClearAllRunnable<K, V>> splitClearLocally() { |
| return ctx.affinityNode() ? super.splitClearLocally() : |
| Collections.<GridCacheClearAllRunnable<K, V>>emptyList(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onDeferredDelete(GridCacheEntryEx entry, GridCacheVersion ver) { |
| assert entry.isDht(); |
| |
| GridDhtLocalPartition part = topology().localPartition(entry.partition(), AffinityTopologyVersion.NONE, |
| false); |
| |
| // Do not remove entry on replica topology. Instead, add entry to removal queue. |
| // It will be cleared eventually. |
| if (part != null) { |
| try { |
| part.onDeferredDelete(entry.key(), ver); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to enqueue deleted entry [key=" + entry.key() + ", ver=" + ver + ']', e); |
| } |
| } |
| } |
| |
| /** |
| * @param expVer Expected topology version. |
| * @param curVer Current topology version. |
| * @return {@code True} if cache affinity changed and operation should be remapped. |
| */ |
| protected final boolean needRemap(AffinityTopologyVersion expVer, AffinityTopologyVersion curVer) { |
| if (expVer.equals(curVer)) |
| return false; |
| |
| Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheAffinityNodes(ctx.name(), expVer); |
| Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheAffinityNodes(ctx.name(), curVer); |
| |
| return !cacheNodes0.equals(cacheNodes1); |
| } |
| |
| /** |
| * @param primary If {@code true} includes primary entries. |
| * @param backup If {@code true} includes backup entries. |
| * @return Local entries iterator. |
| */ |
| public Iterator<Cache.Entry<K, V>> localEntriesIterator(final boolean primary, final boolean backup) { |
| assert primary || backup; |
| |
| if (primary && backup) |
| return iterator(map.entries0().iterator(), !ctx.keepPortable()); |
| else { |
| final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); |
| |
| final Iterator<GridDhtLocalPartition> partIt = topology().currentLocalPartitions().iterator(); |
| |
| Iterator<GridCacheEntryEx> it = new Iterator<GridCacheEntryEx>() { |
| private GridCacheEntryEx next; |
| |
| private Iterator<GridDhtCacheEntry> curIt; |
| |
| { |
| advance(); |
| } |
| |
| @Override public boolean hasNext() { |
| return next != null; |
| } |
| |
| @Override public GridCacheEntryEx next() { |
| if (next == null) |
| throw new NoSuchElementException(); |
| |
| GridCacheEntryEx e = next; |
| |
| advance(); |
| |
| return e; |
| } |
| |
| @Override public void remove() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| private void advance() { |
| next = null; |
| |
| do { |
| if (curIt == null) { |
| while (partIt.hasNext()) { |
| GridDhtLocalPartition part = partIt.next(); |
| |
| if (primary == part.primary(topVer)) { |
| curIt = part.entries().iterator(); |
| |
| break; |
| } |
| } |
| } |
| |
| if (curIt != null) { |
| if (curIt.hasNext()) { |
| next = curIt.next(); |
| |
| break; |
| } |
| else |
| curIt = null; |
| } |
| } |
| while (partIt.hasNext()); |
| } |
| }; |
| |
| return iterator(it, !ctx.keepPortable()); |
| } |
| } |
| |
| /** |
| * Complex partition iterator for both partition and swap iteration. |
| */ |
| private class PartitionEntryIterator extends GridIteratorAdapter<Cache.Entry<K, V>> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** Next entry. */ |
| private Cache.Entry<K, V> entry; |
| |
| /** Last seen entry to support remove. */ |
| private Cache.Entry<K, V> last; |
| |
| /** Partition iterator. */ |
| private final Iterator<GridDhtCacheEntry> partIt; |
| |
| /** |
| * @param partIt Partition iterator. |
| */ |
| private PartitionEntryIterator(@Nullable Iterator<GridDhtCacheEntry> partIt) { |
| this.partIt = partIt; |
| |
| advance(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean hasNextX() { |
| return entry != null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Cache.Entry<K, V> nextX() throws IgniteCheckedException { |
| if (!hasNext()) |
| throw new NoSuchElementException(); |
| |
| last = entry; |
| |
| advance(); |
| |
| return last; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void removeX() throws IgniteCheckedException { |
| if (last == null) |
| throw new IllegalStateException(); |
| |
| ctx.grid().cache(ctx.name()).remove(last.getKey(), last.getValue()); |
| } |
| |
| /** |
| * |
| */ |
| private void advance() { |
| if (partIt != null) { |
| while (partIt.hasNext()) { |
| GridDhtCacheEntry next = partIt.next(); |
| |
| if (next.isInternal() || !next.visitable(CU.empty0())) |
| continue; |
| |
| entry = next.wrapLazyValue(); |
| |
| return; |
| } |
| } |
| |
| entry = null; |
| } |
| } |
| |
| /** |
| * Multi update future. |
| */ |
| @SuppressWarnings("TypeMayBeWeakened") |
| private static class MultiUpdateFuture extends GridFutureAdapter<IgniteUuid> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** Topology version. */ |
| private AffinityTopologyVersion topVer; |
| |
| /** |
| * @param topVer Topology version. |
| */ |
| private MultiUpdateFuture(@NotNull AffinityTopologyVersion topVer) { |
| this.topVer = topVer; |
| } |
| |
| /** |
| * @return Topology version. |
| */ |
| private AffinityTopologyVersion topologyVersion() { |
| return topVer; |
| } |
| } |
| } |