| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.ignite.internal.processors.cache.distributed.dht; |
| |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Map; |
| import java.util.UUID; |
| import java.util.concurrent.atomic.AtomicReference; |
| import org.apache.ignite.IgniteCheckedException; |
| import org.apache.ignite.IgniteLogger; |
| import org.apache.ignite.internal.IgniteInternalFuture; |
| import org.apache.ignite.internal.NodeStoppingException; |
| import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion; |
| import org.apache.ignite.internal.processors.cache.CacheObject; |
| import org.apache.ignite.internal.processors.cache.EntryGetResult; |
| import org.apache.ignite.internal.processors.cache.GridCacheContext; |
| import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo; |
| import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException; |
| import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy; |
| import org.apache.ignite.internal.processors.cache.KeyCacheObject; |
| import org.apache.ignite.internal.processors.cache.ReaderArguments; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException; |
| import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition; |
| import org.apache.ignite.internal.processors.cache.version.GridCacheVersion; |
| import org.apache.ignite.internal.util.future.GridFutureAdapter; |
| import org.apache.ignite.internal.util.typedef.F; |
| import org.apache.ignite.internal.util.typedef.internal.U; |
| import org.apache.ignite.lang.IgniteInClosure; |
| import org.apache.ignite.lang.IgniteUuid; |
| import org.jetbrains.annotations.NotNull; |
| import org.jetbrains.annotations.Nullable; |
| |
| import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST; |
| import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING; |
| |
| /** |
| * |
| */ |
| public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCacheEntryInfo> |
| implements GridDhtFuture<GridCacheEntryInfo> { |
| /** Logger reference. */ |
| private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>(); |
| |
| /** Logger. */ |
| private static IgniteLogger log; |
| |
| /** Message ID. */ |
| private long msgId; |
| |
| /** */ |
| private UUID reader; |
| |
| /** Read through flag. */ |
| private boolean readThrough; |
| |
| /** Context. */ |
| private GridCacheContext<K, V> cctx; |
| |
| /** Key. */ |
| private KeyCacheObject key; |
| |
| /** */ |
| private final boolean addRdr; |
| |
| /** Reserved partitions. */ |
| private int part = -1; |
| |
| /** Future ID. */ |
| private IgniteUuid futId; |
| |
| /** Version. */ |
| private GridCacheVersion ver; |
| |
| /** Topology version .*/ |
| private AffinityTopologyVersion topVer; |
| |
| /** Retry because ownership changed. */ |
| private Integer retry; |
| |
| /** Task name. */ |
| private int taskNameHash; |
| |
| /** Expiry policy. */ |
| private IgniteCacheExpiryPolicy expiryPlc; |
| |
| /** Skip values flag. */ |
| private boolean skipVals; |
| |
| /** Recovery context flag. */ |
| private final boolean recovery; |
| |
| /** Transaction label. */ |
| private final String txLbl; |
| |
| /** |
| * @param cctx Context. |
| * @param msgId Message ID. |
| * @param reader Reader. |
| * @param key Key. |
| * @param addRdr Add reader flag. |
| * @param readThrough Read through flag. |
| * @param topVer Topology version. |
| * @param taskNameHash Task name hash code. |
| * @param expiryPlc Expiry policy. |
| * @param skipVals Skip values flag. |
| * @param txLbl Transaction label. |
| */ |
| public GridDhtGetSingleFuture( |
| GridCacheContext<K, V> cctx, |
| long msgId, |
| UUID reader, |
| KeyCacheObject key, |
| boolean addRdr, |
| boolean readThrough, |
| @NotNull AffinityTopologyVersion topVer, |
| int taskNameHash, |
| @Nullable IgniteCacheExpiryPolicy expiryPlc, |
| boolean skipVals, |
| boolean recovery, |
| @Nullable String txLbl |
| ) { |
| assert reader != null; |
| assert key != null; |
| |
| this.reader = reader; |
| this.cctx = cctx; |
| this.msgId = msgId; |
| this.key = key; |
| this.addRdr = addRdr; |
| this.readThrough = readThrough; |
| this.topVer = topVer; |
| this.taskNameHash = taskNameHash; |
| this.expiryPlc = expiryPlc; |
| this.skipVals = skipVals; |
| this.recovery = recovery; |
| this.txLbl = txLbl; |
| |
| futId = IgniteUuid.randomUuid(); |
| |
| ver = cctx.cache().nextVersion(); |
| |
| if (log == null) |
| log = U.logger(cctx.kernalContext(), logRef, GridDhtGetSingleFuture.class); |
| } |
| |
| /** |
| * Initializes future. |
| */ |
| void init() { |
| map(); |
| } |
| |
| /** |
| * @return Future ID. |
| */ |
| public IgniteUuid futureId() { |
| return futId; |
| } |
| |
| /** |
| * @return Future version. |
| */ |
| public GridCacheVersion version() { |
| return ver; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public boolean onDone(GridCacheEntryInfo res, Throwable err) { |
| if (super.onDone(res, err)) { |
| // Release all partitions reserved by this future. |
| if (part != -1) |
| cctx.topology().releasePartitions(part); |
| |
| return true; |
| } |
| |
| return false; |
| } |
| |
| /** |
| * |
| */ |
| private void map() { |
| // TODO Get rid of force keys request https://issues.apache.org/jira/browse/IGNITE-10251. |
| if (cctx.group().preloader().needForceKeys()) { |
| GridDhtFuture<Object> fut = cctx.group().preloader().request( |
| cctx, |
| Collections.singleton(key), |
| topVer); |
| |
| if (fut != null) { |
| if (!F.isEmpty(fut.invalidPartitions())) { |
| assert fut.invalidPartitions().size() == 1 : fut.invalidPartitions(); |
| |
| retry = F.first(fut.invalidPartitions()); |
| |
| onDone((GridCacheEntryInfo)null); |
| |
| return; |
| } |
| |
| fut.listen( |
| new IgniteInClosure<IgniteInternalFuture<Object>>() { |
| @Override public void apply(IgniteInternalFuture<Object> fut) { |
| Throwable e = fut.error(); |
| |
| if (e != null) { // Check error first. |
| if (log.isDebugEnabled()) |
| log.debug("Failed to request keys from preloader " + |
| "[keys=" + key + ", err=" + e + ']'); |
| |
| if (e instanceof NodeStoppingException) |
| return; |
| |
| onDone(e); |
| } |
| else |
| map0(true); |
| } |
| } |
| ); |
| |
| return; |
| } |
| } |
| |
| map0(false); |
| } |
| |
| /** |
| * |
| */ |
| private void map0(boolean forceKeys) { |
| assert retry == null : retry; |
| |
| if (!map(key, forceKeys)) { |
| retry = cctx.affinity().partition(key); |
| |
| if (!isDone()) |
| onDone((GridCacheEntryInfo)null); |
| |
| return; |
| } |
| |
| getAsync(); |
| } |
| |
| /** {@inheritDoc} */ |
| @Override public Collection<Integer> invalidPartitions() { |
| return retry == null ? Collections.<Integer>emptyList() : Collections.singletonList(retry); |
| } |
| |
| /** |
| * @param key Key. |
| * @return {@code True} if mapped. |
| */ |
| private boolean map(KeyCacheObject key, boolean forceKeys) { |
| try { |
| int keyPart = cctx.affinity().partition(key); |
| |
| GridDhtLocalPartition part = topVer.topologyVersion() > 0 ? |
| cache().topology().localPartition(keyPart, topVer, true) : |
| cache().topology().localPartition(keyPart); |
| |
| if (part == null) |
| return false; |
| |
| assert this.part == -1; |
| |
| // By reserving, we make sure that partition won't be unloaded while processed. |
| if (part.reserve()) { |
| if (forceKeys || (part.state() == OWNING || part.state() == LOST)) { |
| this.part = part.id(); |
| |
| return true; |
| } |
| else { |
| part.release(); |
| |
| return false; |
| } |
| } |
| else |
| return false; |
| } |
| catch (GridDhtInvalidPartitionException ex) { |
| return false; |
| } |
| } |
| |
| /** |
| * |
| */ |
| private void getAsync() { |
| assert part != -1; |
| |
| String taskName0 = cctx.kernalContext().job().currentTaskName(); |
| |
| if (taskName0 == null) |
| taskName0 = cctx.kernalContext().task().resolveTaskName(taskNameHash); |
| |
| final String taskName = taskName0; |
| |
| IgniteInternalFuture<Boolean> rdrFut = null; |
| |
| ReaderArguments readerArgs = null; |
| |
| if (addRdr && !skipVals && !cctx.localNodeId().equals(reader)) { |
| while (true) { |
| GridDhtCacheEntry e = cache().entryExx(key, topVer); |
| |
| try { |
| if (e.obsolete()) |
| continue; |
| |
| boolean addReader = !e.deleted(); |
| |
| if (addReader) { |
| e.unswap(false); |
| |
| // Entry will be removed on touch() if no data in cache, |
| // but they could be loaded from store, |
| // we have to add reader again later. |
| if (readerArgs == null) |
| readerArgs = new ReaderArguments(reader, msgId, topVer); |
| } |
| |
| // Register reader. If there are active transactions for this entry, |
| // then will wait for their completion before proceeding. |
| // TODO: IGNITE-3498: |
| // TODO: What if any transaction we wait for actually removes this entry? |
| // TODO: In this case seems like we will be stuck with untracked near entry. |
| // TODO: To fix, check that reader is contained in the list of readers once |
| // TODO: again after the returned future completes - if not, try again. |
| rdrFut = addReader ? e.addReader(reader, msgId, topVer) : null; |
| |
| break; |
| } |
| catch (IgniteCheckedException err) { |
| onDone(err); |
| |
| return; |
| } |
| catch (GridCacheEntryRemovedException ignore) { |
| if (log.isDebugEnabled()) |
| log.debug("Got removed entry when getting a DHT value: " + e); |
| } |
| finally { |
| e.touch(); |
| } |
| } |
| } |
| |
| IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>> fut; |
| |
| if (rdrFut == null || rdrFut.isDone()) { |
| fut = cache().getDhtAllAsync( |
| Collections.singleton(key), |
| readerArgs, |
| readThrough, |
| taskName, |
| expiryPlc, |
| skipVals, |
| recovery, |
| txLbl); |
| } |
| else { |
| final ReaderArguments args = readerArgs; |
| |
| rdrFut.listen( |
| new IgniteInClosure<IgniteInternalFuture<Boolean>>() { |
| @Override public void apply(IgniteInternalFuture<Boolean> fut) { |
| Throwable e = fut.error(); |
| |
| if (e != null) { |
| onDone(e); |
| |
| return; |
| } |
| |
| IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>> fut0 = |
| cache().getDhtAllAsync( |
| Collections.singleton(key), |
| args, |
| readThrough, |
| taskName, |
| expiryPlc, |
| skipVals, |
| recovery, |
| null); |
| |
| fut0.listen(createGetFutureListener()); |
| } |
| } |
| ); |
| |
| return; |
| } |
| |
| if (fut.isDone()) |
| onResult(fut); |
| else |
| fut.listen(createGetFutureListener()); |
| } |
| |
| /** |
| * @return Listener for get future. |
| */ |
| @NotNull private IgniteInClosure<IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>>> |
| createGetFutureListener() { |
| return new IgniteInClosure<IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>>>() { |
| @Override public void apply( |
| IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>> fut |
| ) { |
| onResult(fut); |
| } |
| }; |
| } |
| |
| /** |
| * @param fut Completed future to finish this process with. |
| */ |
| private void onResult(IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>> fut) { |
| assert fut.isDone(); |
| |
| if (fut.error() != null) |
| onDone(fut.error()); |
| else { |
| try { |
| onDone(toEntryInfo(fut.get())); |
| } |
| catch (IgniteCheckedException ignored) { |
| assert false; // Should never happen. |
| } |
| } |
| } |
| |
| /** |
| * @param map Map to convert. |
| * @return List of infos. |
| */ |
| private GridCacheEntryInfo toEntryInfo(Map<KeyCacheObject, EntryGetResult> map) { |
| if (map.isEmpty()) |
| return null; |
| |
| EntryGetResult val = map.get(key); |
| |
| assert val != null; |
| |
| GridCacheEntryInfo info = new GridCacheEntryInfo(); |
| |
| info.cacheId(cctx.cacheId()); |
| info.key(key); |
| info.value(skipVals ? null : (CacheObject)val.value()); |
| info.version(val.version()); |
| info.expireTime(val.expireTime()); |
| info.ttl(val.ttl()); |
| |
| return info; |
| } |
| |
| /** |
| * @return DHT cache. |
| */ |
| private GridDhtCacheAdapter<K, V> cache() { |
| return (GridDhtCacheAdapter<K, V>)cctx.cache(); |
| } |
| } |