| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.distributed.dht; |
| |
| import java.io.Externalizable; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.NoSuchElementException; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentMap; |
| import javax.cache.Cache; |
| import javax.cache.expiry.ExpiryPolicy; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteException; |
| import org.apache.ignite.cluster.ClusterNode; |
| import org.apache.ignite.events.DiscoveryEvent; |
| import org.apache.ignite.events.Event; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.NodeStoppingException; |
| import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException; |
| import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.cache.CacheObject; |
| import org.apache.ignite.internal.processors.cache.CacheOperationContext; |
| import org.apache.ignite.internal.processors.cache.EntryGetResult; |
| import org.apache.ignite.internal.processors.cache.GridCacheAdapter; |
| import org.apache.ignite.internal.processors.cache.GridCacheClearAllRunnable; |
| import org.apache.ignite.internal.processors.cache.GridCacheConcurrentMap; |
| import org.apache.ignite.internal.processors.cache.GridCacheContext; |
| import org.apache.ignite.internal.processors.cache.GridCacheEntryEx; |
| import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; |
| import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; |
| import org.apache.ignite.internal.processors.cache.GridCacheMapEntry; |
| import org.apache.ignite.internal.processors.cache.GridCachePreloader; |
| import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; |
| import org.apache.ignite.internal.processors.cache.KeyCacheObject; |
| import org.apache.ignite.internal.processors.cache.ReaderArguments; |
| import org.apache.ignite.internal.processors.cache.distributed.GridCacheTtlUpdateRequest; |
| import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheAdapter; |
| import org.apache.ignite.internal.processors.cache.distributed.GridDistributedCacheEntry; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.colocated.GridDhtDetachedCacheEntry; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysFuture; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysRequest; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.preloader.GridDhtForceKeysResponse; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionTopology; |
| import org.apache.ignite.internal.processors.cache.distributed.near.CacheVersionedValue; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearCacheAdapter; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetRequest; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearGetResponse; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetRequest; |
| import org.apache.ignite.internal.processors.cache.distributed.near.GridNearSingleGetResponse; |
| import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot; |
| import org.apache.ignite.internal.processors.cache.mvcc.MvccUtils; |
| import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; |
| import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter; |
| import org.apache.ignite.internal.processors.security.SecurityUtils; |
| import org.apache.ignite.internal.util.future.GridFutureAdapter; |
| import org.apache.ignite.internal.util.lang.GridPlainRunnable; |
| import org.apache.ignite.internal.util.typedef.CI1; |
| import org.apache.ignite.internal.util.typedef.CI2; |
| import org.apache.ignite.internal.util.typedef.CI3; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.internal.CU; |
| import org.apache.ignite.internal.util.typedef.internal.S; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteBiInClosure; |
| import org.apache.ignite.lang.IgniteBiPredicate; |
| import org.apache.ignite.lang.IgniteBiTuple; |
| import org.apache.ignite.lang.IgniteUuid; |
| import org.apache.ignite.plugin.extensions.communication.Message; |
| import org.jetbrains.annotations.NotNull; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static org.apache.ignite.events.EventType.EVT_NODE_FAILED; |
| import static org.apache.ignite.events.EventType.EVT_NODE_LEFT; |
| import static org.apache.ignite.internal.processors.dr.GridDrType.DR_LOAD; |
| import static org.apache.ignite.internal.processors.dr.GridDrType.DR_NONE; |
| import static org.apache.ignite.internal.util.GridConcurrentFactory.newMap; |
| |
| /** |
| * DHT cache adapter. |
| */ |
| public abstract class GridDhtCacheAdapter<K, V> extends GridDistributedCacheAdapter<K, V> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** Force key futures. */ |
| private final ConcurrentMap<IgniteUuid, GridDhtForceKeysFuture<?, ?>> forceKeyFuts = newMap(); |
| |
| /** */ |
| private volatile boolean stopping; |
| |
| /** Discovery listener. */ |
| private final GridLocalEventListener discoLsnr = new GridLocalEventListener() { |
| @Override public void onEvent(Event evt) { |
| DiscoveryEvent e = (DiscoveryEvent)evt; |
| |
| ClusterNode loc = ctx.localNode(); |
| |
| assert e.type() == EVT_NODE_LEFT || e.type() == EVT_NODE_FAILED : e; |
| |
| final ClusterNode n = e.eventNode(); |
| |
| assert !loc.id().equals(n.id()); |
| |
| for (GridDhtForceKeysFuture<?, ?> f : forceKeyFuts.values()) |
| f.onDiscoveryEvent(e); |
| } |
| }; |
| |
| /** |
| * Empty constructor required for {@link Externalizable}. |
| */ |
| protected GridDhtCacheAdapter() { |
| // No-op. |
| } |
| |
| /** |
| * Adds future to future map. |
| * |
| * @param fut Future to add. |
| * @return {@code False} if the cache or node is stopping and the future was completed with an error. |
| */ |
| public boolean addFuture(GridDhtForceKeysFuture<?, ?> fut) { |
| forceKeyFuts.put(fut.futureId(), fut); |
| |
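| // Re-check the stopping flag after the put: onKernalStop() may already have completed |
| // the registered futures, so finish this one with the same stop error. |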
| if (stopping) { |
| fut.onDone(stopError()); |
| |
| return false; |
| } |
| |
| return true; |
| } |
| |
| /** |
| * Removes future from future map. |
| * |
| * @param fut Future to remove. |
| */ |
| public void removeFuture(GridDhtForceKeysFuture<?, ?> fut) { |
| forceKeyFuts.remove(fut.futureId(), fut); |
| } |
| |
| /** |
| * @param node Node. |
| * @param msg Message. |
| */ |
| protected final void processForceKeyResponse(ClusterNode node, GridDhtForceKeysResponse msg) { |
| GridDhtForceKeysFuture<?, ?> f = forceKeyFuts.get(msg.futureId()); |
| |
| if (f != null) |
| f.onResult(msg); |
| else if (log.isDebugEnabled()) |
| log.debug("Receive force key response for unknown future (is it duplicate?) [nodeId=" + node.id() + |
| ", res=" + msg + ']'); |
| } |
| |
| /** |
| * @param node Node that originated the request. |
| * @param msg Force keys message. |
| */ |
| protected final void processForceKeysRequest(final ClusterNode node, final GridDhtForceKeysRequest msg) { |
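| // Make sure all pending operations on the requested keys have finished before collecting entry info. |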
| IgniteInternalFuture<?> fut = ctx.mvcc().finishKeys(msg.keys(), msg.cacheId(), msg.topologyVersion()); |
| |
| if (fut.isDone()) |
| processForceKeysRequest0(node, msg); |
| else |
| fut.listen(new CI1<IgniteInternalFuture<?>>() { |
| @Override public void apply(IgniteInternalFuture<?> t) { |
| processForceKeysRequest0(node, msg); |
| } |
| }); |
| } |
| |
| /** |
| * @param node Node that originated the request. |
| * @param msg Force keys message. |
| */ |
| private void processForceKeysRequest0(ClusterNode node, GridDhtForceKeysRequest msg) { |
| try { |
| ClusterNode loc = ctx.localNode(); |
| |
| GridDhtForceKeysResponse res = new GridDhtForceKeysResponse( |
| ctx.cacheId(), |
| msg.futureId(), |
| msg.miniId(), |
| ctx.deploymentEnabled()); |
| |
| GridDhtPartitionTopology top = ctx.topology(); |
| |
| for (KeyCacheObject k : msg.keys()) { |
| int p = ctx.affinity().partition(k); |
| |
| GridDhtLocalPartition locPart = top.localPartition(p, AffinityTopologyVersion.NONE, false); |
| |
| // If this node is no longer an owner. |
| if (locPart == null && !top.owners(p).contains(loc)) { |
| res.addMissed(k); |
| |
| continue; |
| } |
| |
| GridCacheEntryEx entry; |
| |
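| // Retry until the entry is read: a concurrently removed (obsolete) entry forces another attempt, |
| // while an invalid partition marks the key as missed. |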
| while (true) { |
| ctx.shared().database().checkpointReadLock(); |
| |
| try { |
| entry = ctx.dht().entryEx(k); |
| |
| entry.unswap(); |
| |
| if (ctx.mvccEnabled()) { |
| List<GridCacheEntryInfo> infos = entry.allVersionsInfo(); |
| |
| if (infos == null) { |
| assert entry.obsolete() : entry; |
| |
| continue; |
| } |
| |
| for (int i = 0; i < infos.size(); i++) |
| res.addInfo(infos.get(i)); |
| } |
| else { |
| GridCacheEntryInfo info = entry.info(); |
| |
| if (info == null) { |
| assert entry.obsolete() : entry; |
| |
| continue; |
| } |
| |
| if (!info.isNew()) |
| res.addInfo(info); |
| } |
| |
| entry.touch(); |
| |
| break; |
| } |
| catch (GridCacheEntryRemovedException ignore) { |
| if (log.isDebugEnabled()) |
| log.debug("Got removed entry: " + k); |
| } |
| catch (GridDhtInvalidPartitionException ignore) { |
| if (log.isDebugEnabled()) |
| log.debug("Local node is no longer an owner: " + p); |
| |
| res.addMissed(k); |
| |
| break; |
| } |
| finally { |
| ctx.shared().database().checkpointReadUnlock(); |
| } |
| } |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Sending force key response [node=" + node.id() + ", res=" + res + ']'); |
| |
| ctx.io().send(node, res, ctx.ioPolicy()); |
| } |
| catch (ClusterTopologyCheckedException ignore) { |
| if (log.isDebugEnabled()) |
| log.debug("Received force key request form failed node (will ignore) [nodeId=" + node.id() + |
| ", req=" + msg + ']'); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to reply to force key request [nodeId=" + node.id() + ", req=" + msg + ']', e); |
| } |
| } |
| |
| /** |
| * Dumps debug information about pending force key futures. |
| */ |
| public void dumpDebugInfo() { |
| if (!forceKeyFuts.isEmpty()) { |
| U.warn(log, "Pending force key futures [cache=" + ctx.name() + "]:"); |
| |
| for (GridDhtForceKeysFuture fut : forceKeyFuts.values()) |
| U.warn(log, ">>> " + fut); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onKernalStop() { |
| super.onKernalStop(); |
| |
| stopping = true; |
| |
| IgniteCheckedException err = stopError(); |
| |
| for (GridDhtForceKeysFuture fut : forceKeyFuts.values()) |
| fut.onDone(err); |
| |
| ctx.gridEvents().removeLocalEventListener(discoLsnr); |
| } |
| |
| /** |
| * @return Node stop exception. |
| */ |
| private IgniteCheckedException stopError() { |
| return new NodeStoppingException("Operation has been cancelled (cache or node is stopping)."); |
| } |
| |
| /** |
| * @param nodeId Sender node ID. |
| * @param res Near get response. |
| */ |
| protected final void processNearGetResponse(UUID nodeId, GridNearGetResponse res) { |
| if (log.isDebugEnabled()) |
| log.debug("Processing near get response [nodeId=" + nodeId + ", res=" + res + ']'); |
| |
| CacheGetFuture fut = (CacheGetFuture)ctx.mvcc().future(res.futureId()); |
| |
| if (fut == null) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']'); |
| |
| return; |
| } |
| |
| fut.onResult(nodeId, res); |
| } |
| |
| /** |
| * @param nodeId Sender node ID. |
| * @param res Near get response. |
| */ |
| protected void processNearSingleGetResponse(UUID nodeId, GridNearSingleGetResponse res) { |
| if (log.isDebugEnabled()) |
| log.debug("Processing near get response [nodeId=" + nodeId + ", res=" + res + ']'); |
| |
| GridPartitionedSingleGetFuture fut = (GridPartitionedSingleGetFuture)ctx.mvcc() |
| .future(new IgniteUuid(IgniteUuid.VM_ID, res.futureId())); |
| |
| if (fut == null) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to find future for get response [sender=" + nodeId + ", res=" + res + ']'); |
| |
| return; |
| } |
| |
| fut.onResult(nodeId, res); |
| } |
| |
| /** |
| * @param ctx Context. |
| */ |
| protected GridDhtCacheAdapter(GridCacheContext<K, V> ctx) { |
| this(ctx, new GridCachePartitionedConcurrentMap(ctx.group())); |
| } |
| |
| /** |
| * Constructor used for near-only cache. |
| * |
| * @param ctx Cache context. |
| * @param map Cache map. |
| */ |
| protected GridDhtCacheAdapter(GridCacheContext<K, V> ctx, GridCacheConcurrentMap map) { |
| super(ctx, map); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void start() throws IgniteCheckedException { |
| ctx.io().addCacheHandler(ctx.cacheId(), GridCacheTtlUpdateRequest.class, |
| (CI2<UUID, GridCacheTtlUpdateRequest>)this::processTtlUpdateRequest); |
| |
| ctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_LEFT, EVT_NODE_FAILED); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void printMemoryStats() { |
| super.printMemoryStats(); |
| |
| ctx.group().topology().printMemoryStats(1024); |
| } |
| |
| /** |
| * @return Near cache. |
| */ |
| public abstract GridNearCacheAdapter<K, V> near(); |
| |
| /** |
| * @return Partition topology. |
| */ |
| public GridDhtPartitionTopology topology() { |
| return ctx.group().topology(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public GridCachePreloader preloader() { |
| return ctx.group().preloader(); |
| } |
| |
| /** |
| * @param key Key. |
| * @return DHT entry. |
| */ |
| @Nullable public GridDhtCacheEntry peekExx(KeyCacheObject key) { |
| return (GridDhtCacheEntry)peekEx(key); |
| } |
| |
| /** |
| * {@inheritDoc} |
| * |
| * @throws GridDhtInvalidPartitionException If partition for the key is no longer valid. |
| */ |
| @Override public GridCacheEntryEx entryEx(KeyCacheObject key, |
| AffinityTopologyVersion topVer) throws GridDhtInvalidPartitionException { |
| return super.entryEx(key, topVer); |
| } |
| |
| /** |
| * @param key Key. |
| * @return DHT entry. |
| * @throws GridDhtInvalidPartitionException If partition for the key is no longer valid. |
| */ |
| public GridDhtCacheEntry entryExx(KeyCacheObject key) throws GridDhtInvalidPartitionException { |
| return (GridDhtCacheEntry)entryEx(key); |
| } |
| |
| /** |
| * @param key Key. |
| * @param topVer Topology version. |
| * @return DHT entry. |
| * @throws GridDhtInvalidPartitionException If partition for the key is no longer valid. |
| */ |
| public GridDhtCacheEntry entryExx(KeyCacheObject key, |
| AffinityTopologyVersion topVer) throws GridDhtInvalidPartitionException { |
| return (GridDhtCacheEntry)entryEx(key, topVer); |
| } |
| |
| /** |
| * @param key Key for which entry should be returned. |
| * @return Cache entry. |
| */ |
| protected GridDistributedCacheEntry createEntry(KeyCacheObject key) { |
| return new GridDhtDetachedCacheEntry(ctx, key); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void localLoad(Collection<? extends K> keys, final ExpiryPolicy plc, final boolean keepBinary) |
| throws IgniteCheckedException { |
| if (ctx.store().isLocal()) { |
| super.localLoad(keys, plc, keepBinary); |
| |
| return; |
| } |
| |
| // Version for all loaded entries. |
| final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); |
| |
| final GridCacheVersion ver0 = ctx.shared().versions().nextForLoad(topVer.topologyVersion()); |
| |
| final boolean replicate = ctx.isDrEnabled(); |
| |
| final ExpiryPolicy plc0 = plc != null ? plc : ctx.expiry(); |
| |
| Collection<KeyCacheObject> keys0 = ctx.cacheKeysView(keys); |
| |
| ctx.store().loadAll(null, keys0, new CI2<KeyCacheObject, Object>() { |
| @Override public void apply(KeyCacheObject key, Object val) { |
| loadEntry(key, val, ver0, null, topVer, replicate, plc0); |
| } |
| }); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void localLoadCache(final IgniteBiPredicate<K, V> p, Object[] args) throws IgniteCheckedException { |
| if (ctx.store().isLocal()) { |
| super.localLoadCache(p, args); |
| |
| return; |
| } |
| |
| //TODO IGNITE-7954 |
| MvccUtils.verifyMvccOperationSupport(ctx, "Load"); |
| |
| final AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); |
| |
| // Version for all loaded entries. |
| final GridCacheVersion ver0 = ctx.shared().versions().nextForLoad(topVer.topologyVersion()); |
| |
| final boolean replicate = ctx.isDrEnabled(); |
| |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| ExpiryPolicy plc0 = opCtx != null ? opCtx.expiry() : null; |
| |
| final ExpiryPolicy plc = plc0 != null ? plc0 : ctx.expiry(); |
| |
| final IgniteBiPredicate<K, V> pred; |
| |
| if (p != null) { |
| ctx.kernalContext().resource().injectGeneric(p); |
| |
| pred = SecurityUtils.sandboxedProxy(ctx.kernalContext(), IgniteBiPredicate.class, p); |
| } |
| else |
| pred = null; |
| |
| try { |
| ctx.store().loadCache(new CI3<KeyCacheObject, Object, GridCacheVersion>() { |
| @Override public void apply(KeyCacheObject key, Object val, @Nullable GridCacheVersion ver) { |
| assert ver == null; |
| |
| loadEntry(key, val, ver0, pred, topVer, replicate, plc); |
| } |
| }, args); |
| |
| } |
| finally { |
| if (p instanceof PlatformCacheEntryFilter) |
| ((PlatformCacheEntryFilter)p).onClose(); |
| } |
| } |
| |
| /** |
| * @param key Key. |
| * @param val Value. |
| * @param ver Cache version. |
| * @param p Optional predicate. |
| * @param topVer Topology version. |
| * @param replicate Replication flag. |
| * @param plc Expiry policy. |
| */ |
| private void loadEntry(KeyCacheObject key, |
| Object val, |
| GridCacheVersion ver, |
| @Nullable IgniteBiPredicate<K, V> p, |
| AffinityTopologyVersion topVer, |
| boolean replicate, |
| @Nullable ExpiryPolicy plc) { |
| if (p != null && !p.apply(key.<K>value(ctx.cacheObjectContext(), false), (V)val)) |
| return; |
| |
| try { |
| GridDhtLocalPartition part = ctx.group().topology().localPartition(ctx.affinity().partition(key), |
| AffinityTopologyVersion.NONE, true); |
| |
| // Reserve to make sure that partition does not get unloaded. |
| if (part.reserve()) { |
| GridCacheEntryEx entry = null; |
| |
| ctx.shared().database().checkpointReadLock(); |
| |
| try { |
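| // A TTL of zero means the expiry policy expires the entry immediately, so there is nothing to load for this key. |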
| long ttl = CU.ttlForLoad(plc); |
| |
| if (ttl == CU.TTL_ZERO) |
| return; |
| |
| CacheObject cacheVal = ctx.toCacheObject(val); |
| |
| entry = entryEx(key); |
| |
| entry.initialValue(cacheVal, |
| ver, |
| ttl, |
| CU.EXPIRE_TIME_CALCULATE, |
| false, |
| topVer, |
| replicate ? DR_LOAD : DR_NONE, |
| false); |
| } |
| catch (IgniteCheckedException e) { |
| throw new IgniteException("Failed to put cache value: " + entry, e); |
| } |
| catch (GridCacheEntryRemovedException ignore) { |
| if (log.isDebugEnabled()) |
| log.debug("Got removed entry during loadCache (will ignore): " + entry); |
| } |
| finally { |
| if (entry != null) |
| entry.touch(); |
| |
| part.release(); |
| |
| ctx.shared().database().checkpointReadUnlock(); |
| } |
| } |
| else if (log.isDebugEnabled()) |
| log.debug("Will node load entry into cache (partition is invalid): " + part); |
| } |
| catch (GridDhtInvalidPartitionException e) { |
| if (log.isDebugEnabled()) |
| log.debug(S.toString("Ignoring entry for partition that does not belong", |
| "key", key, true, |
| "val", val, true, |
| "err", e, false)); |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int size() { |
| return (int)sizeLong(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long sizeLong() { |
| long sum = 0; |
| |
| for (GridDhtLocalPartition p : topology().currentLocalPartitions()) |
| sum += p.dataStore().cacheSize(ctx.cacheId()); |
| |
| return sum; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public int primarySize() { |
| return (int)primarySizeLong(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public long primarySizeLong() { |
| long sum = 0; |
| |
| AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion(); |
| |
| for (GridDhtLocalPartition p : topology().currentLocalPartitions()) { |
| if (p.primary(topVer)) |
| sum += p.dataStore().cacheSize(ctx.cacheId()); |
| } |
| |
| return sum; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public IgniteInternalFuture<Map<K, V>> getAllAsync( |
| @Nullable Collection<? extends K> keys, |
| boolean forcePrimary, |
| boolean skipTx, |
| @Nullable UUID subjId, |
| String taskName, |
| boolean deserializeBinary, |
| boolean recovery, |
| boolean readRepair, |
| boolean skipVals, |
| boolean needVer |
| ) { |
| CacheOperationContext opCtx = ctx.operationContextPerCall(); |
| |
| return getAllAsync(keys, |
| null, |
| opCtx == null || !opCtx.skipStore(), |
| /*don't check local tx. */false, |
| subjId, |
| taskName, |
| deserializeBinary, |
| opCtx != null && opCtx.recovery(), |
| readRepair, |
| forcePrimary, |
| null, |
| skipVals, |
| needVer); |
| } |
| |
| /** |
| * @param keys Keys to get. |
| * @param readerArgs Reader will be added if not null. |
| * @param readThrough Read through flag. |
| * @param subjId Subject ID. |
| * @param taskName Task name. |
| * @param expiry Expiry policy. |
| * @param skipVals Skip values flag. |
| * @param txLbl Transaction label. |
| * @param mvccSnapshot MVCC snapshot. |
| * @return Get future. |
| */ |
| IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>> getDhtAllAsync( |
| Collection<KeyCacheObject> keys, |
| @Nullable final ReaderArguments readerArgs, |
| boolean readThrough, |
| @Nullable UUID subjId, |
| String taskName, |
| @Nullable IgniteCacheExpiryPolicy expiry, |
| boolean skipVals, |
| boolean recovery, |
| @Nullable String txLbl, |
| MvccSnapshot mvccSnapshot |
| ) { |
| return getAllAsync0(keys, |
| readerArgs, |
| readThrough, |
| /*don't check local tx. */false, |
| subjId, |
| taskName, |
| false, |
| expiry, |
| skipVals, |
| /*keep cache objects*/true, |
| recovery, |
| false, |
| /*need version*/true, |
| txLbl, |
| mvccSnapshot); |
| } |
| |
| /** |
| * @param reader Reader node ID. |
| * @param msgId Message ID. |
| * @param keys Keys to get. |
| * @param addReaders Add readers flag. |
| * @param readThrough Read through flag. |
| * @param topVer Topology version. |
| * @param subjId Subject ID. |
| * @param taskNameHash Task name hash code. |
| * @param expiry Expiry policy. |
| * @param skipVals Skip values flag. |
| * @param txLbl Transaction label. |
| * @param mvccSnapshot MVCC snapshot. |
| * @return DHT future. |
| */ |
| public GridDhtFuture<Collection<GridCacheEntryInfo>> getDhtAsync(UUID reader, |
| long msgId, |
| Map<KeyCacheObject, Boolean> keys, |
| boolean addReaders, |
| boolean readThrough, |
| AffinityTopologyVersion topVer, |
| @Nullable UUID subjId, |
| int taskNameHash, |
| @Nullable IgniteCacheExpiryPolicy expiry, |
| boolean skipVals, |
| boolean recovery, |
| @Nullable String txLbl, |
| MvccSnapshot mvccSnapshot |
| ) { |
| GridDhtGetFuture<K, V> fut = new GridDhtGetFuture<>(ctx, |
| msgId, |
| reader, |
| keys, |
| readThrough, |
| topVer, |
| subjId, |
| taskNameHash, |
| expiry, |
| skipVals, |
| recovery, |
| addReaders, |
| txLbl, |
| mvccSnapshot); |
| |
| fut.init(); |
| |
| return fut; |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param msgId Message ID. |
| * @param key Key. |
| * @param addRdr Add reader flag. |
| * @param readThrough Read through flag. |
| * @param topVer Topology version. |
| * @param subjId Subject ID. |
| * @param taskNameHash Task name hash. |
| * @param expiry Expiry. |
| * @param skipVals Skip vals flag. |
| * @param txLbl Transaction label. |
| * @param mvccSnapshot Mvcc snapshot. |
| * @return Future for the operation. |
| */ |
| GridDhtGetSingleFuture getDhtSingleAsync( |
| UUID nodeId, |
| long msgId, |
| KeyCacheObject key, |
| boolean addRdr, |
| boolean readThrough, |
| AffinityTopologyVersion topVer, |
| @Nullable UUID subjId, |
| int taskNameHash, |
| @Nullable IgniteCacheExpiryPolicy expiry, |
| boolean skipVals, |
| boolean recovery, |
| String txLbl, |
| MvccSnapshot mvccSnapshot |
| ) { |
| GridDhtGetSingleFuture fut = new GridDhtGetSingleFuture<>( |
| ctx, |
| msgId, |
| nodeId, |
| key, |
| addRdr, |
| readThrough, |
| topVer, |
| subjId, |
| taskNameHash, |
| expiry, |
| skipVals, |
| recovery, |
| txLbl, |
| mvccSnapshot); |
| |
| fut.init(); |
| |
| return fut; |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param req Get request. |
| */ |
| protected void processNearSingleGetRequest(final UUID nodeId, final GridNearSingleGetRequest req) { |
| assert ctx.affinityNode(); |
| |
| final CacheExpiryPolicy expiryPlc = CacheExpiryPolicy.fromRemote(req.createTtl(), req.accessTtl()); |
| |
| IgniteInternalFuture<GridCacheEntryInfo> fut = |
| getDhtSingleAsync( |
| nodeId, |
| req.messageId(), |
| req.key(), |
| req.addReader(), |
| req.readThrough(), |
| req.topologyVersion(), |
| req.subjectId(), |
| req.taskNameHash(), |
| expiryPlc, |
| req.skipValues(), |
| req.recovery(), |
| req.txLabel(), |
| req.mvccSnapshot()); |
| |
| fut.listen(new CI1<IgniteInternalFuture<GridCacheEntryInfo>>() { |
| @Override public void apply(IgniteInternalFuture<GridCacheEntryInfo> f) { |
| GridNearSingleGetResponse res; |
| |
| GridDhtFuture<GridCacheEntryInfo> fut = (GridDhtFuture<GridCacheEntryInfo>)f; |
| |
| try { |
| GridCacheEntryInfo info = fut.get(); |
| |
| if (F.isEmpty(fut.invalidPartitions())) { |
| Message res0 = null; |
| |
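| // Pick the response payload: full entry info, versioned value, or plain value, depending on what the requester needs. |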
| if (info != null) { |
| if (req.needEntryInfo()) { |
| info.key(null); |
| |
| res0 = info; |
| } |
| else if (req.needVersion()) |
| res0 = new CacheVersionedValue(info.value(), info.version()); |
| else |
| res0 = info.value(); |
| } |
| |
| res = new GridNearSingleGetResponse( |
| ctx.cacheId(), |
| req.futureId(), |
| null, |
| res0, |
| false, |
| req.addDeploymentInfo() |
| ); |
| |
| if (info != null && req.skipValues()) |
| res.setContainsValue(); |
| } |
| else { |
| AffinityTopologyVersion topVer = ctx.shared().exchange().lastTopologyFuture().initialVersion(); |
| |
| res = new GridNearSingleGetResponse( |
| ctx.cacheId(), |
| req.futureId(), |
| topVer, |
| null, |
| true, |
| req.addDeploymentInfo() |
| ); |
| } |
| } |
| catch (NodeStoppingException ignored) { |
| return; |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed processing get request: " + req, e); |
| |
| res = new GridNearSingleGetResponse(ctx.cacheId(), |
| req.futureId(), |
| req.topologyVersion(), |
| null, |
| false, |
| req.addDeploymentInfo()); |
| |
| res.error(e); |
| } |
| |
| try { |
| ctx.io().send(nodeId, res, ctx.ioPolicy()); |
| } |
| catch (ClusterTopologyCheckedException e) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to send get response to node, node failed: " + nodeId); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to send get response to node (is node still alive?) [nodeId=" + nodeId + |
| ",req=" + req + ", res=" + res + ']', e); |
| } |
| |
| sendTtlUpdateRequest(expiryPlc); |
| } |
| }); |
| } |
| |
| /** |
| * @param nodeId Node ID. |
| * @param req Get request. |
| */ |
| protected void processNearGetRequest(final UUID nodeId, final GridNearGetRequest req) { |
| assert ctx.affinityNode(); |
| |
| final CacheExpiryPolicy expiryPlc = CacheExpiryPolicy.fromRemote(req.createTtl(), req.accessTtl()); |
| |
| IgniteInternalFuture<Collection<GridCacheEntryInfo>> fut = |
| getDhtAsync(nodeId, |
| req.messageId(), |
| req.keys(), |
| req.addReaders(), |
| req.readThrough(), |
| req.topologyVersion(), |
| req.subjectId(), |
| req.taskNameHash(), |
| expiryPlc, |
| req.skipValues(), |
| req.recovery(), |
| req.txLabel(), |
| req.mvccSnapshot()); |
| |
| fut.listen(new CI1<IgniteInternalFuture<Collection<GridCacheEntryInfo>>>() { |
| @Override public void apply(IgniteInternalFuture<Collection<GridCacheEntryInfo>> f) { |
| GridNearGetResponse res = new GridNearGetResponse(ctx.cacheId(), |
| req.futureId(), |
| req.miniId(), |
| req.version(), |
| req.deployInfo() != null); |
| |
| GridDhtFuture<Collection<GridCacheEntryInfo>> fut = |
| (GridDhtFuture<Collection<GridCacheEntryInfo>>)f; |
| |
| try { |
| Collection<GridCacheEntryInfo> entries = fut.get(); |
| |
| res.entries(entries); |
| } |
| catch (NodeStoppingException ignored) { |
| return; |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed processing get request: " + req, e); |
| |
| res.error(e); |
| } |
| |
| if (!F.isEmpty(fut.invalidPartitions())) { |
| AffinityTopologyVersion topVer = ctx.shared().exchange().lastTopologyFuture().initialVersion(); |
| |
| res.invalidPartitions(fut.invalidPartitions(), topVer); |
| } |
| |
| try { |
| ctx.io().send(nodeId, res, ctx.ioPolicy()); |
| } |
| catch (IgniteCheckedException e) { |
| U.error(log, "Failed to send get response to node (is node still alive?) [nodeId=" + nodeId + |
| ",req=" + req + ", res=" + res + ']', e); |
| } |
| |
| sendTtlUpdateRequest(expiryPlc); |
| } |
| }); |
| } |
| |
| /** |
| * Initiates process of notifying all interested nodes that TTL was changed. |
| * Sends requests directly to primary nodes (or to their backups when the local node is the primary) and to {@link IgniteCacheExpiryPolicy#readers()}. |
| * |
| * @param expiryPlc Expiry policy. |
| */ |
| public void sendTtlUpdateRequest(@Nullable final IgniteCacheExpiryPolicy expiryPlc) { |
| if (expiryPlc != null && !F.isEmpty(expiryPlc.entries())) { |
| ctx.closures().runLocalSafe(new GridPlainRunnable() { |
| @SuppressWarnings({"ForLoopReplaceableByForEach"}) |
| @Override public void run() { |
| Map<KeyCacheObject, GridCacheVersion> entries = expiryPlc.entries(); |
| |
| assert entries != null && !entries.isEmpty(); |
| |
| Map<ClusterNode, GridCacheTtlUpdateRequest> reqMap = new HashMap<>(); |
| |
| AffinityTopologyVersion topVer = ctx.shared().exchange().readyAffinityVersion(); |
| |
| for (Map.Entry<KeyCacheObject, GridCacheVersion> e : entries.entrySet()) { |
| ClusterNode primaryNode = ctx.affinity().primaryByKey(e.getKey(), topVer); |
| |
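| // If the local node is the primary for the key, notify its backups directly; |
| // otherwise route the TTL change through the key's primary node. |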
| if (primaryNode.isLocal()) { |
| Collection<ClusterNode> nodes = ctx.affinity().backupsByKey(e.getKey(), topVer); |
| |
| for (Iterator<ClusterNode> nodesIter = nodes.iterator(); nodesIter.hasNext(); ) { |
| ClusterNode node = nodesIter.next(); |
| GridCacheTtlUpdateRequest req = reqMap.get(node); |
| |
| if (req == null) { |
| reqMap.put(node, req = new GridCacheTtlUpdateRequest(ctx.cacheId(), |
| topVer, |
| expiryPlc.forAccess())); |
| } |
| |
| req.addEntry(e.getKey(), e.getValue()); |
| } |
| } |
| else { |
| GridCacheTtlUpdateRequest req = reqMap.get(primaryNode); |
| |
| if (req == null) { |
| reqMap.put(primaryNode, req = new GridCacheTtlUpdateRequest(ctx.cacheId(), |
| topVer, |
| expiryPlc.forAccess())); |
| } |
| |
| req.addEntry(e.getKey(), e.getValue()); |
| } |
| } |
| |
| Map<UUID, Collection<IgniteBiTuple<KeyCacheObject, GridCacheVersion>>> rdrs = expiryPlc.readers(); |
| |
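| // Near cache readers keep their own copies of the entries, so they need the TTL update as well. |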
| if (rdrs != null) { |
| assert !rdrs.isEmpty(); |
| |
| for (Map.Entry<UUID, Collection<IgniteBiTuple<KeyCacheObject, GridCacheVersion>>> e : rdrs.entrySet()) { |
| ClusterNode node = ctx.node(e.getKey()); |
| |
| if (node != null) { |
| GridCacheTtlUpdateRequest req = reqMap.get(node); |
| |
| if (req == null) { |
| reqMap.put(node, req = new GridCacheTtlUpdateRequest(ctx.cacheId(), |
| topVer, |
| expiryPlc.forAccess())); |
| } |
| |
| for (IgniteBiTuple<KeyCacheObject, GridCacheVersion> t : e.getValue()) |
| req.addNearEntry(t.get1(), t.get2()); |
| } |
| } |
| } |
| |
| for (Map.Entry<ClusterNode, GridCacheTtlUpdateRequest> req : reqMap.entrySet()) { |
| try { |
| ctx.io().send(req.getKey(), req.getValue(), ctx.ioPolicy()); |
| } |
| catch (IgniteCheckedException e) { |
| if (e instanceof ClusterTopologyCheckedException) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to send TTC update request, node left: " + req.getKey()); |
| } |
| else |
| U.error(log, "Failed to send TTL update request.", e); |
| } |
| } |
| } |
| }); |
| } |
| } |
| |
| /** |
| * @param srcNodeId ID of the node that sent the original TTL update request. |
| * @param incomingReq Original TTL update request. |
| */ |
| private void sendTtlUpdateRequest(UUID srcNodeId, GridCacheTtlUpdateRequest incomingReq) { |
| ctx.closures().runLocalSafe(new Runnable() { |
| @SuppressWarnings({"ForLoopReplaceableByForEach"}) |
| @Override public void run() { |
| Map<ClusterNode, GridCacheTtlUpdateRequest> reqMap = new HashMap<>(); |
| |
| for (int i = 0; i < incomingReq.keys().size(); i++) { |
| KeyCacheObject key = incomingReq.keys().get(i); |
| |
| // TTL update requests only need to be broadcast if this node is the primary for the given key. |
| if (!ctx.affinity().primaryByKey(key, incomingReq.topologyVersion()).isLocal()) |
| continue; |
| |
| Collection<ClusterNode> nodes = ctx.affinity().backupsByKey(key, incomingReq.topologyVersion()); |
| |
| for (Iterator<ClusterNode> nodesIter = nodes.iterator(); nodesIter.hasNext(); ) { |
| ClusterNode node = nodesIter.next(); |
| |
| // There's no need to send a TTL update request to the node that sent us the initial |
| // TTL update request. |
| if (node.id().equals(srcNodeId)) |
| continue; |
| |
| GridCacheTtlUpdateRequest req = reqMap.get(node); |
| |
| if (req == null) { |
| reqMap.put(node, req = new GridCacheTtlUpdateRequest(ctx.cacheId(), |
| incomingReq.topologyVersion(), |
| incomingReq.ttl())); |
| } |
| |
| req.addEntry(key, incomingReq.version(i)); |
| } |
| |
| GridDhtCacheEntry entry = ctx.dht().entryExx(key, incomingReq.topologyVersion()); |
| |
| Collection<UUID> readers; |
| |
| try { |
| readers = entry.readers(); |
| } |
| catch (GridCacheEntryRemovedException e) { |
| if (log.isDebugEnabled()) |
| log.debug("Got removed entry while collecting readers for TTL update (will skip): " + entry); |
| |
| continue; |
| } |
| |
| for (UUID reader : readers) { |
| // There's no need to send a TTL update request to the node that sent us the initial |
| // TTL update request. |
| if (reader.equals(srcNodeId)) |
| continue; |
| |
| ClusterNode node = ctx.node(reader); |
| |
| if (node != null) { |
| GridCacheTtlUpdateRequest req = reqMap.get(node); |
| |
| if (req == null) { |
| reqMap.put(node, req = new GridCacheTtlUpdateRequest(ctx.cacheId(), |
| incomingReq.topologyVersion(), |
| incomingReq.ttl())); |
| } |
| |
| req.addNearEntry(key, incomingReq.version(i)); |
| } |
| } |
| } |
| |
| for (Map.Entry<ClusterNode, GridCacheTtlUpdateRequest> req : reqMap.entrySet()) { |
| try { |
| ctx.io().send(req.getKey(), req.getValue(), ctx.ioPolicy()); |
| } |
| catch (IgniteCheckedException e) { |
| if (e instanceof ClusterTopologyCheckedException) { |
| if (log.isDebugEnabled()) |
| log.debug("Failed to send TTC update request, node left: " + req.getKey()); |
| } |
| else |
| U.error(log, "Failed to send TTL update request.", e); |
| } |
| } |
| } |
| }); |
| } |
| |
| /** |
| * @param srcNodeId Source node ID. |
| * @param req Request. |
| */ |
| private void processTtlUpdateRequest(UUID srcNodeId, GridCacheTtlUpdateRequest req) { |
| if (req.keys() != null) |
| updateTtl(this, req.keys(), req.versions(), req.ttl()); |
| |
| if (req.nearKeys() != null) { |
| GridNearCacheAdapter<K, V> near = near(); |
| |
| assert near != null; |
| |
| updateTtl(near, req.nearKeys(), req.nearVersions(), req.ttl()); |
| } |
| |
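| // If this node is the primary for any of the keys, propagate the TTL change to its backups and near readers. |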
| sendTtlUpdateRequest(srcNodeId, req); |
| } |
| |
| /** |
| * @param cache Cache. |
| * @param keys Entries keys. |
| * @param vers Entries versions. |
| * @param ttl TTL. |
| */ |
| private void updateTtl(GridCacheAdapter<K, V> cache, |
| List<KeyCacheObject> keys, |
| List<GridCacheVersion> vers, |
| long ttl) { |
| assert !F.isEmpty(keys); |
| assert keys.size() == vers.size(); |
| |
| int size = keys.size(); |
| |
| for (int i = 0; i < size; i++) { |
| try { |
| GridCacheEntryEx entry = null; |
| |
| try { |
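| // Retry on concurrently removed entries; stop for this key if its partition is no longer local. |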
| while (true) { |
| try { |
| entry = cache.entryEx(keys.get(i)); |
| |
| entry.unswap(false); |
| |
| entry.updateTtl(vers.get(i), ttl); |
| |
| break; |
| } |
| catch (GridCacheEntryRemovedException ignore) { |
| if (log.isDebugEnabled()) |
| log.debug("Got removed entry: " + entry); |
| } |
| catch (GridDhtInvalidPartitionException e) { |
| if (log.isDebugEnabled()) |
| log.debug("Got GridDhtInvalidPartitionException: " + e); |
| |
| break; |
| } |
| } |
| } |
| finally { |
| if (entry != null) |
| entry.touch(); |
| } |
| } |
| catch (IgniteCheckedException e) { |
| log.error("Failed to unswap entry.", e); |
| } |
| } |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void unlockAll(Collection<? extends K> keys) { |
| assert false; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public String toString() { |
| return S.toString(GridDhtCacheAdapter.class, this, super.toString()); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public List<GridCacheClearAllRunnable<K, V>> splitClearLocally(boolean srv, boolean near, |
| boolean readers) { |
| return ctx.affinityNode() ? super.splitClearLocally(srv, near, readers) : |
| Collections.<GridCacheClearAllRunnable<K, V>>emptyList(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public void onDeferredDelete(GridCacheEntryEx entry, GridCacheVersion ver) { |
| assert entry.isDht(); |
| |
| GridDhtLocalPartition part = topology().localPartition(entry.partition(), AffinityTopologyVersion.NONE, |
| false); |
| |
| if (part != null) |
| part.onDeferredDelete(entry.context().cacheId(), entry.key(), ver); |
| } |
| |
| /** |
| * @param mapVer Mapped topology version. |
| * @param curVer Current topology version. |
| * @return {@code True} if cache affinity changed and operation should be remapped. |
| */ |
| protected final boolean needRemap(AffinityTopologyVersion mapVer, AffinityTopologyVersion curVer) { |
| if (curVer.equals(mapVer)) |
| return false; |
| |
| AffinityTopologyVersion lastAffChangedTopVer = ctx.shared().exchange().lastAffinityChangedTopologyVersion(mapVer); |
| |
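| // If affinity has not actually changed between the mapped and current versions, the mapping is still valid. |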
| if (curVer.isBetween(lastAffChangedTopVer, mapVer)) |
| return false; |
| |
| // TODO IGNITE-7164 check mvcc crd for mvcc enabled txs. |
| |
| Collection<ClusterNode> cacheNodes0 = ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), mapVer); |
| Collection<ClusterNode> cacheNodes1 = ctx.discovery().cacheGroupAffinityNodes(ctx.groupId(), curVer); |
| |
| if (!cacheNodes0.equals(cacheNodes1) || ctx.affinity().affinityTopologyVersion().before(curVer)) |
| return true; |
| |
| try { |
| List<List<ClusterNode>> aff1 = ctx.affinity().assignments(mapVer); |
| List<List<ClusterNode>> aff2 = ctx.affinity().assignments(curVer); |
| |
| return !aff1.equals(aff2); |
| } |
| catch (IllegalStateException ignored) { |
| return true; |
| } |
| } |
| |
| /** |
| * @param primary If {@code true} includes primary entries. |
| * @param backup If {@code true} includes backup entries. |
| * @param keepBinary Keep binary flag. |
| * @return Local entries iterator. |
| */ |
| public Iterator<Cache.Entry<K, V>> localEntriesIterator(final boolean primary, |
| final boolean backup, |
| final boolean keepBinary) { |
| return localEntriesIterator(primary, |
| backup, |
| keepBinary, |
| ctx.affinity().affinityTopologyVersion()); |
| } |
| |
| /** |
| * @param primary If {@code true} includes primary entries. |
| * @param backup If {@code true} includes backup entries. |
| * @param keepBinary Keep binary flag. |
| * @param topVer Specified affinity topology version. |
| * @return Local entries iterator. |
| */ |
| private Iterator<Cache.Entry<K, V>> localEntriesIterator(final boolean primary, |
| final boolean backup, |
| final boolean keepBinary, |
| final AffinityTopologyVersion topVer) { |
| |
| return iterator(localEntriesIteratorEx(primary, backup, topVer), !keepBinary); |
| } |
| |
| /** |
| * @param primary If {@code true} includes primary entries. |
| * @param backup If {@code true} includes backup entries. |
| * @param topVer Specified affinity topology version. |
| * @return Local entries iterator. |
| */ |
| private Iterator<? extends GridCacheEntryEx> localEntriesIteratorEx(final boolean primary, |
| final boolean backup, |
| final AffinityTopologyVersion topVer) { |
| assert primary || backup; |
| |
| if (primary && backup) |
| return entries().iterator(); |
| else { |
| final Iterator<GridDhtLocalPartition> partIt = topology().currentLocalPartitions().iterator(); |
| |
| return new Iterator<GridCacheMapEntry>() { |
| private GridCacheMapEntry next; |
| |
| private Iterator<GridCacheMapEntry> curIt; |
| |
| { |
| advance(); |
| } |
| |
| @Override public boolean hasNext() { |
| return next != null; |
| } |
| |
| @Override public GridCacheMapEntry next() { |
| if (next == null) |
| throw new NoSuchElementException(); |
| |
| GridCacheMapEntry e = next; |
| |
| advance(); |
| |
| return e; |
| } |
| |
| @Override public void remove() { |
| throw new UnsupportedOperationException(); |
| } |
| |
| private void advance() { |
| next = null; |
| |
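| // Walk local partitions with the requested role (primary or backup) and |
| // pull entries from each partition's iterator in turn. |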
| do { |
| if (curIt == null) { |
| while (partIt.hasNext()) { |
| GridDhtLocalPartition part = partIt.next(); |
| |
| if (primary == part.primary(topVer)) { |
| curIt = part.entries(ctx.cacheId()).iterator(); |
| |
| break; |
| } |
| } |
| } |
| |
| if (curIt != null) { |
| if (curIt.hasNext()) { |
| next = curIt.next(); |
| |
| break; |
| } |
| else |
| curIt = null; |
| } |
| } |
| while (partIt.hasNext()); |
| } |
| }; |
| } |
| } |
| |
| /** |
| * Multi update future. |
| */ |
| private static class MultiUpdateFuture extends GridFutureAdapter<IgniteUuid> { |
| /** Topology version. */ |
| private AffinityTopologyVersion topVer; |
| |
| /** |
| * @param topVer Topology version. |
| */ |
| private MultiUpdateFuture(@NotNull AffinityTopologyVersion topVer) { |
| this.topVer = topVer; |
| } |
| |
| /** |
| * @return Topology version. |
| */ |
| private AffinityTopologyVersion topologyVersion() { |
| return topVer; |
| } |
| } |
| |
| /** |
| * Message handler that resolves the sender node by its ID before delegating to {@link #onMessage}. |
| */ |
| protected abstract class MessageHandler<M> implements IgniteBiInClosure<UUID, M> { |
| /** */ |
| private static final long serialVersionUID = 0L; |
| |
| /** {@inheritDoc} */ |
| @Override public void apply(UUID nodeId, M msg) { |
| ClusterNode node = ctx.node(nodeId); |
| |
| if (node == null) { |
| if (log.isDebugEnabled()) |
| log.debug("Received message from failed node [node=" + nodeId + ", msg=" + msg + ']'); |
| |
| return; |
| } |
| |
| if (log.isDebugEnabled()) |
| log.debug("Received message from node [node=" + nodeId + ", msg=" + msg + ']'); |
| |
| onMessage(node, msg); |
| } |
| |
| /** |
| * @param node Node. |
| * @param msg Message. |
| */ |
| protected abstract void onMessage(ClusterNode node, M msg); |
| } |
| } |