blob: 97e025ebdf20b30a90b4c79a6172aa622d1675f7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed.dht;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.EntryGetResult;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheEntryInfo;
import org.apache.ignite.internal.processors.cache.GridCacheEntryRemovedException;
import org.apache.ignite.internal.processors.cache.IgniteCacheExpiryPolicy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
import org.apache.ignite.internal.processors.cache.ReaderArguments;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtInvalidPartitionException;
import org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtLocalPartition;
import org.apache.ignite.internal.processors.cache.mvcc.MvccSnapshot;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.util.future.GridCompoundFuture;
import org.apache.ignite.internal.util.future.GridCompoundIdentityFuture;
import org.apache.ignite.internal.util.future.GridEmbeddedFuture;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.lang.GridClosureException;
import org.apache.ignite.internal.util.typedef.C2;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteUuid;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.LOST;
import static org.apache.ignite.internal.processors.cache.distributed.dht.topology.GridDhtPartitionState.OWNING;
/**
*
*/
public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Collection<GridCacheEntryInfo>>
implements GridDhtFuture<Collection<GridCacheEntryInfo>> {
/** */
private static final long serialVersionUID = 0L;
/** Logger reference. */
private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
/** Logger. */
private static IgniteLogger log;
/** Message ID. */
private long msgId;
/** */
private UUID reader;
/** Read through flag. */
private boolean readThrough;
/** Context. */
private GridCacheContext<K, V> cctx;
/** Keys. */
private Map<KeyCacheObject, Boolean> keys;
/** Reserved partitions. */
private int[] parts;
/** Future ID. */
private IgniteUuid futId;
/** Version. */
private GridCacheVersion ver;
/** Topology version .*/
private AffinityTopologyVersion topVer;
/** Retries because ownership changed. */
private Collection<Integer> retries;
/** Task name. */
private int taskNameHash;
/** Expiry policy. */
private IgniteCacheExpiryPolicy expiryPlc;
/** Skip values flag. */
private boolean skipVals;
/** */
private final boolean recovery;
/** */
private final boolean addReaders;
/** Transaction label. */
private final String txLbl;
/** */
private final MvccSnapshot mvccSnapshot;
/**
* @param cctx Context.
* @param msgId Message ID.
* @param reader Reader.
* @param keys Keys.
* @param readThrough Read through flag.
* @param topVer Topology version.
* @param taskNameHash Task name hash code.
* @param expiryPlc Expiry policy.
* @param skipVals Skip values flag.
* @param txLbl Transaction label.
* @param mvccSnapshot MVCC snapshot.
*/
public GridDhtGetFuture(
GridCacheContext<K, V> cctx,
long msgId,
UUID reader,
Map<KeyCacheObject, Boolean> keys,
boolean readThrough,
@NotNull AffinityTopologyVersion topVer,
int taskNameHash,
@Nullable IgniteCacheExpiryPolicy expiryPlc,
boolean skipVals,
boolean recovery,
boolean addReaders,
@Nullable String txLbl,
MvccSnapshot mvccSnapshot
) {
super(CU.<GridCacheEntryInfo>collectionsReducer(keys.size()));
assert reader != null;
assert !F.isEmpty(keys);
this.reader = reader;
this.cctx = cctx;
this.msgId = msgId;
this.keys = keys;
this.readThrough = readThrough;
this.topVer = topVer;
this.taskNameHash = taskNameHash;
this.expiryPlc = expiryPlc;
this.skipVals = skipVals;
this.recovery = recovery;
this.addReaders = addReaders;
this.txLbl = txLbl;
this.mvccSnapshot = mvccSnapshot;
futId = IgniteUuid.randomUuid();
ver = cctx.cache().nextVersion();
if (log == null)
log = U.logger(cctx.kernalContext(), logRef, GridDhtGetFuture.class);
}
/**
* Initializes future.
*/
void init() {
// TODO get rid of force keys request https://issues.apache.org/jira/browse/IGNITE-10251
GridDhtFuture<Object> fut = cctx.group().preloader().request(cctx, keys.keySet(), topVer);
if (fut != null) {
if (!F.isEmpty(fut.invalidPartitions())) {
if (retries == null)
retries = new HashSet<>();
retries.addAll(fut.invalidPartitions());
}
fut.listen(new CI1<IgniteInternalFuture<Object>>() {
@Override public void apply(IgniteInternalFuture<Object> fut) {
try {
fut.get();
}
catch (IgniteCheckedException e) {
if (log.isDebugEnabled())
log.debug("Failed to request keys from preloader [keys=" + keys + ", err=" + e + ']');
onDone(e);
return;
}
map0(keys, true);
markInitialized();
}
});
}
else {
map0(keys, false);
markInitialized();
}
}
/** {@inheritDoc} */
@Override public Collection<Integer> invalidPartitions() {
return retries == null ? Collections.<Integer>emptyList() : retries;
}
/**
* @return Future ID.
*/
public IgniteUuid futureId() {
return futId;
}
/**
* @return Future version.
*/
public GridCacheVersion version() {
return ver;
}
/** {@inheritDoc} */
@Override public boolean onDone(Collection<GridCacheEntryInfo> res, Throwable err) {
if (super.onDone(res, err)) {
// Release all partitions reserved by this future.
if (parts != null)
cctx.topology().releasePartitions(parts);
return true;
}
return false;
}
/**
* @param keys Keys to map.
*/
private void map0(Map<KeyCacheObject, Boolean> keys, boolean forceKeys) {
Map<KeyCacheObject, Boolean> mappedKeys = null;
// Assign keys to primary nodes.
for (Map.Entry<KeyCacheObject, Boolean> key : keys.entrySet()) {
int part = cctx.affinity().partition(key.getKey());
if (retries == null || !retries.contains(part)) {
if (!map(key.getKey(), forceKeys)) {
if (retries == null)
retries = new HashSet<>();
retries.add(part);
if (mappedKeys == null) {
mappedKeys = U.newLinkedHashMap(keys.size());
for (Map.Entry<KeyCacheObject, Boolean> key1 : keys.entrySet()) {
if (key1.getKey() == key.getKey())
break;
mappedKeys.put(key.getKey(), key1.getValue());
}
}
}
else if (mappedKeys != null)
mappedKeys.put(key.getKey(), key.getValue());
}
}
// Add new future.
IgniteInternalFuture<Collection<GridCacheEntryInfo>> fut = getAsync(mappedKeys == null ? keys : mappedKeys);
// Optimization to avoid going through compound future,
// if getAsync() has been completed and no other futures added to this
// compound future.
if (fut.isDone() && !hasFutures()) {
if (fut.error() != null)
onDone(fut.error());
else
onDone(fut.result());
return;
}
add(fut);
}
/**
* @param key Key.
* @return {@code True} if mapped.
*/
private boolean map(KeyCacheObject key, boolean forceKeys) {
try {
int keyPart = cctx.affinity().partition(key);
GridDhtLocalPartition part = topVer.topologyVersion() > 0 ?
cache().topology().localPartition(keyPart, topVer, true) :
cache().topology().localPartition(keyPart);
if (part == null)
return false;
if (parts == null || !F.contains(parts, part.id())) {
// By reserving, we make sure that partition won't be unloaded while processed.
if (part.reserve()) {
if (forceKeys || (part.state() == OWNING || part.state() == LOST)) {
parts = parts == null ? new int[1] : Arrays.copyOf(parts, parts.length + 1);
parts[parts.length - 1] = part.id();
return true;
}
else {
part.release();
return false;
}
}
else
return false;
}
else
return true;
}
catch (GridDhtInvalidPartitionException e) {
if (log.isDebugEnabled())
log.debug("Attempted to create a partition which does not belong to local node, will remap " +
"[key=" + key + ", part=" + e.partition() + ']');
return false;
}
}
/**
* @param keys Keys to get.
* @return Future for local get.
*/
private IgniteInternalFuture<Collection<GridCacheEntryInfo>> getAsync(
final Map<KeyCacheObject, Boolean> keys
) {
if (F.isEmpty(keys))
return new GridFinishedFuture<Collection<GridCacheEntryInfo>>(
Collections.<GridCacheEntryInfo>emptyList());
String taskName0 = cctx.kernalContext().job().currentTaskName();
if (taskName0 == null)
taskName0 = cctx.kernalContext().task().resolveTaskName(taskNameHash);
final String taskName = taskName0;
GridCompoundFuture<Boolean, Boolean> txFut = null;
ReaderArguments readerArgs = null;
if (addReaders && !skipVals && !cctx.localNodeId().equals(reader)) {
for (Map.Entry<KeyCacheObject, Boolean> k : keys.entrySet()) {
if (!k.getValue())
continue;
while (true) {
GridDhtCacheEntry e = cache().entryExx(k.getKey(), topVer);
try {
if (e.obsolete())
continue;
boolean addReader = !e.deleted();
if (addReader) {
e.unswap(false);
// Entry will be removed on touch() if no data in cache,
// but they could be loaded from store,
// we have to add reader again later.
if (readerArgs == null)
readerArgs = new ReaderArguments(reader, msgId, topVer);
}
// Register reader. If there are active transactions for this entry,
// then will wait for their completion before proceeding.
// TODO: IGNITE-3498:
// TODO: What if any transaction we wait for actually removes this entry?
// TODO: In this case seems like we will be stuck with untracked near entry.
// TODO: To fix, check that reader is contained in the list of readers once
// TODO: again after the returned future completes - if not, try again.
IgniteInternalFuture<Boolean> f = addReader ? e.addReader(reader, msgId, topVer) : null;
if (f != null) {
if (txFut == null)
txFut = new GridCompoundFuture<>(CU.boolReducer());
txFut.add(f);
}
break;
}
catch (IgniteCheckedException err) {
return new GridFinishedFuture<>(err);
}
catch (GridCacheEntryRemovedException ignore) {
if (log.isDebugEnabled())
log.debug("Got removed entry when getting a DHT value: " + e);
}
finally {
e.touch();
}
}
}
if (txFut != null)
txFut.markInitialized();
}
IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>> fut;
if (txFut == null || txFut.isDone()) {
fut = cache().getDhtAllAsync(
keys.keySet(),
readerArgs,
readThrough,
taskName,
expiryPlc,
skipVals,
recovery,
txLbl,
mvccSnapshot);
}
else {
final ReaderArguments args = readerArgs;
// If we are here, then there were active transactions for some entries
// when we were adding the reader. In that case we must wait for those
// transactions to complete.
fut = new GridEmbeddedFuture<>(
txFut,
new C2<Boolean, Exception, IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>>>() {
@Override public IgniteInternalFuture<Map<KeyCacheObject, EntryGetResult>> apply(Boolean b, Exception e) {
if (e != null)
throw new GridClosureException(e);
return cache().getDhtAllAsync(
keys.keySet(),
args,
readThrough,
taskName,
expiryPlc,
skipVals,
recovery,
txLbl,
mvccSnapshot);
}
}
);
}
if (fut.isDone()) {
if (fut.error() != null)
onDone(fut.error());
else
return new GridFinishedFuture<>(toEntryInfos(fut.result()));
}
return new GridEmbeddedFuture<>(
new C2<Map<KeyCacheObject, EntryGetResult>, Exception, Collection<GridCacheEntryInfo>>() {
@Override public Collection<GridCacheEntryInfo> apply(
Map<KeyCacheObject, EntryGetResult> map, Exception e
) {
if (e != null) {
onDone(e);
return Collections.emptyList();
}
else
return toEntryInfos(map);
}
},
fut);
}
/**
* @param map Map to convert.
* @return List of infos.
*/
private Collection<GridCacheEntryInfo> toEntryInfos(Map<KeyCacheObject, EntryGetResult> map) {
if (map.isEmpty())
return Collections.emptyList();
Collection<GridCacheEntryInfo> infos = new ArrayList<>(map.size());
for (Map.Entry<KeyCacheObject, EntryGetResult> entry : map.entrySet()) {
EntryGetResult val = entry.getValue();
assert val != null;
GridCacheEntryInfo info = new GridCacheEntryInfo();
info.cacheId(cctx.cacheId());
info.key(entry.getKey());
info.value(skipVals ? null : (CacheObject)val.value());
info.version(val.version());
info.expireTime(val.expireTime());
info.ttl(val.ttl());
infos.add(info);
}
return infos;
}
/**
* @return DHT cache.
*/
private GridDhtCacheAdapter<K, V> cache() {
return (GridDhtCacheAdapter<K, V>)cctx.cache();
}
}