blob: f58ac9d30b896ccd4b9256d81d7b592a7e5bb716 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.NodeStoppingException;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.EntryGetResult;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.ReaderArguments;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
/**
*
*/
public final class GridDhtGetSingleFuture<K, V> extends GridFutureAdapter<GridCacheEntryInfo>
implements GridDhtFuture<GridCacheEntryInfo> {
/** Logger reference. */
private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
/** Logger. */
private static IgniteLogger log;
/** Message ID. */
private long msgId;
/** */
private UUID reader;
/** Read through flag. */
private boolean readThrough;
/** Context. */
private GridCacheContext<K, V> cctx;
/** Key. */
private KeyCacheObject key;
/** */
private final boolean addRdr;
/** Reserved partitions. */
private int part = -1;
/** Future ID. */
private IgniteUuid futId;
/** Version. */
private GridCacheVersion ver;
/** Topology version .*/
private AffinityTopologyVersion topVer;
/** Retry because ownership changed. */
private Integer retry;
/** Task name. */
private int taskNameHash;
/** Expiry policy. */
private IgniteCacheExpiryPolicy expiryPlc;
/** Skip values flag. */
private boolean skipVals;
/** Recovery context flag. */
private final boolean recovery;
/** Transaction label. */
private final String txLbl;
/**
* @param cctx Context.
* @param msgId Message ID.
* @param reader Reader.
* @param key Key.
* @param addRdr Add reader flag.
* @param readThrough Read through flag.
* @param topVer Topology version.
* @param taskNameHash Task name hash code.
* @param expiryPlc Expiry policy.
* @param skipVals Skip values flag.
* @param txLbl Transaction label.
*/
public GridDhtGetSingleFuture(
GridCacheContext<K, V> cctx,
long msgId,
UUID reader,
KeyCacheObject key,
boolean addRdr,
boolean readThrough,
@NotNull AffinityTopologyVersion topVer,
int taskNameHash,
@Nullable IgniteCacheExpiryPolicy expiryPlc,
boolean skipVals,
boolean recovery,
@Nullable String txLbl
) {
assert reader != null;
assert key != null;
this.reader = reader;
this.cctx = cctx;
this.msgId = msgId;
this.key = key;
this.addRdr = addRdr;
this.readThrough = readThrough;
this.topVer = topVer;
this.taskNameHash = taskNameHash;
this.expiryPlc = expiryPlc;
this.skipVals = skipVals;
this.recovery = recovery;
this.txLbl = txLbl;
futId = IgniteUuid.randomUuid();
ver = cctx.cache().nextVersion();
if (log == null)
log = U.logger(cctx.kernalContext(), logRef, GridDhtGetSingleFuture.class);
}
/**
* Initializes future.
*/
void init() {
map();
}
/**
* @return Future ID.
*/
public IgniteUuid futureId() {
return futId;
}
/**
* @return Future version.
*/
public GridCacheVersion version() {
return ver;
}
/** {@inheritDoc} */
@Override public boolean onDone(GridCacheEntryInfo res, Throwable err) {
if (super.onDone(res, err)) {
// Release all partitions reserved by this future.
if (part != -1)
cctx.topology().releasePartitions(part);
return true;
}
return false;
}
/**
*
*/
private void map() {
// TODO Get rid of force keys request https://issues.apache.org/jira/browse/IGNITE-10251.
if (cctx.group().preloader().needForceKeys()) {
GridDhtFuture<Object> fut = cctx.group().preloader().request(
cctx,
Collections.singleton(key),
topVer);
if (fut != null) {
if (!F.isEmpty(fut.invalidPartitions())) {
assert fut.invalidPartitions().size() == 1 : fut.invalidPartitions();
retry = F.first(fut.invalidPartitions());
onDone((GridCacheEntryInfo)null);
return;
}
fut.listen(
new IgniteInClosure<IgniteInternalFuture<Object>>() {
@Override public void apply(IgniteInternalFuture<Object> fut) {
Throwable e = fut.error();
if (e != null) { // Check error first.
if (log.isDebugEnabled())
log.debug("Failed to request keys from preloader " +
"[keys=" + key + ", err=" + e + ']');
if (e instanceof NodeStoppingException)
return;
onDone(e);
}
else
map0(true);
}
}
);
return;
}
}
map0(false);
}
/**
*
*/
private void map0(boolean forceKeys) {
assert retry == null : retry;
if (!map(key, forceKeys)) {
retry = cctx.affinity().partition(key);
if (!isDone())
onDone((GridCacheEntryInfo)null);
return;
}
getAsync();
}
/** {@inheritDoc} */
@Override public Collection<Integer> invalidPartitions() {
return retry == null ? Collections.<Integer>emptyList() : Collections.singletonList(retry);
}
/**
* @param key Key.
* @return {@code True} if mapped.
*/
private boolean map(KeyCacheObject key, boolean forceKeys) {
try {
int keyPart = cctx.affinity().partition(key);
GridDhtLocalPartition part = topVer.topologyVersion() > 0 ?
cache().topology().localPartition(keyPart, topVer, true) :
cache().topology().localPartition(keyPart);
if (part == null)
return false;
assert this.part == -1;
// By reserving, we make sure that partition won't be unloaded while processed.
if (part.reserve()) {
if (forceKeys || (part.state() == OWNING || part.state() == LOST)) {
this.part = part.id();
return true;
}
else {
part.release();
return false;
}
}
else
return false;
}
catch (GridDhtInvalidPartitionException ex) {
return false;
}
}
/**
*
*/
private void getAsync() {
assert part != -1;
String taskName0 = cctx.kernalContext().job().currentTaskName();
if (taskName0 == null)
taskName0 = cctx.kernalContext().task().resolveTaskName(taskNameHash);
final String taskName = taskName0;
IgniteInternalFuture<Boolean> rdrFut = null;
ReaderArguments readerArgs = null;
if (addRdr && !skipVals && !cctx.localNodeId().equals(reader)) {
while (true) {
GridDhtCacheEntry e = cache().entryExx(key, topVer);
try {
if (e.obsolete())
continue;
boolean addReader = !e.deleted();
if (addReader) {
e.unswap(false);
// Entry will be removed on touch() if no data in cache,
// but they could be loaded from store,
// we have to add reader again later.
if (readerArgs == null)
readerArgs = new ReaderArguments(reader, msgId, topVer);
}
// Register reader. If there are active transactions for this entry,
// then will wait for their completion before proceeding.
// TODO: IGNITE-3498:
// TODO: What if any transaction we wait for actually removes this entry?
// TODO: In this case seems like we will be stuck with untracked near entry.
// TODO: To fix, check that reader is contained in the list of readers once
// TODO: again after the returned future completes - if not, try again.
rdrFut = addReader ? e.addReader(reader, msgId, topVer) : null;
break;
}
catch (IgniteCheckedException err) {
onDone(err);
return;
}
catch (GridCacheEntryRemovedException ignore) {
if (log.isDebugEnabled())
log.debug("Got removed entry when getting a DHT value: " + e);
}
finally {
e.touch();
}
}
}
IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>> fut;
if (rdrFut == null || rdrFut.isDone()) {
fut = cache().getDhtAllAsync(
Collections.singleton(key),
readerArgs,
readThrough,
taskName,
expiryPlc,
skipVals,
recovery,
txLbl);
}
else {
final ReaderArguments args = readerArgs;
rdrFut.listen(
new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
@Override public void apply(IgniteInternalFuture<Boolean> fut) {
Throwable e = fut.error();
if (e != null) {
onDone(e);
return;
}
IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>> fut0 =
cache().getDhtAllAsync(
Collections.singleton(key),
args,
readThrough,
taskName,
expiryPlc,
skipVals,
recovery,
null);
fut0.listen(createGetFutureListener());
}
}
);
return;
}
if (fut.isDone())
onResult(fut);
else
fut.listen(createGetFutureListener());
}
/**
* @return Listener for get future.
*/
@NotNull private IgniteInClosure<IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>>>
createGetFutureListener() {
return new IgniteInClosure<IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>>>() {
@Override public void apply(
IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>> fut
) {
onResult(fut);
}
};
}
/**
* @param fut Completed future to finish this process with.
*/
private void onResult(IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>> fut) {
assert fut.isDone();
if (fut.error() != null)
onDone(fut.error());
else {
try {
onDone(toEntryInfo(fut.get()));
}
catch (IgniteCheckedException ignored) {
assert false; // Should never happen.
}
}
}
/**
* @param map Map to convert.
* @return List of infos.
*/
private GridCacheEntryInfo toEntryInfo(Map<KeyCacheObject, EntryGetResult> map) {
if (map.isEmpty())
return null;
EntryGetResult val = map.get(key);
assert val != null;
GridCacheEntryInfo info = new GridCacheEntryInfo();
info.cacheId(cctx.cacheId());
info.key(key);
info.value(skipVals ? null : (CacheObject)val.value());
info.version(val.version());
info.expireTime(val.expireTime());
info.ttl(val.ttl());
return info;
}
/**
* @return DHT cache.
*/
private GridDhtCacheAdapter<K, V> cache() {
return (GridDhtCacheAdapter<K, V>)cctx.cache();
}
}