/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.ignite.internal.processors.cache.distributed.dht;

import org.apache.ignite.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.processors.affinity.*;
import org.apache.ignite.internal.processors.cache.*;
import org.apache.ignite.internal.processors.cache.transactions.*;
import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.internal.util.*;
import org.apache.ignite.internal.util.future.*;
import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
import org.jetbrains.annotations.*;

import java.util.*;
import java.util.concurrent.atomic.*;

/**
 * Future that serves a DHT-side get request: it reserves the local partitions that hold
 * the requested keys, registers the requesting near node as a reader where required, and
 * completes with the {@link GridCacheEntryInfo} objects collected for those keys.
 */
public final class GridDhtGetFuture<K, V> extends GridCompoundIdentityFuture<Collection<GridCacheEntryInfo>>
implements GridDhtFuture<Collection<GridCacheEntryInfo>> {
    /** */
    private static final long serialVersionUID = 0L;

    /** Logger reference. */
    private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();

    /** Logger. */
    private static IgniteLogger log;

    /** Message ID. */
    private long msgId;

    /** Reader node ID. */
    private UUID reader;

    /** Reload flag. */
    private boolean reload;

    /** Read through flag. */
    private boolean readThrough;

    /** Context. */
    private GridCacheContext<K, V> cctx;

    /** Keys. */
    private LinkedHashMap<KeyCacheObject, Boolean> keys;

    /** Reserved partitions. */
    private Collection<GridDhtLocalPartition> parts = new GridLeanSet<>(5);

    /** Future ID. */
    private IgniteUuid futId;

    /** Version. */
    private GridCacheVersion ver;

    /** Topology version. */
    private AffinityTopologyVersion topVer;

    /** Transaction. */
    private IgniteTxLocalEx tx;

    /** Retries because ownership changed. */
    private Collection<Integer> retries = new GridLeanSet<>();

    /** Subject ID. */
    private UUID subjId;

    /** Task name hash. */
    private int taskNameHash;

    /** Expiry policy. */
    private IgniteCacheExpiryPolicy expiryPlc;

    /** Skip values flag. */
    private boolean skipVals;

/**
* @param cctx Context.
* @param msgId Message ID.
* @param reader Reader.
* @param keys Keys.
* @param readThrough Read through flag.
* @param reload Reload flag.
* @param tx Transaction.
* @param topVer Topology version.
* @param subjId Subject ID.
* @param taskNameHash Task name hash code.
* @param expiryPlc Expiry policy.
* @param skipVals Skip values flag.
*/
public GridDhtGetFuture(
GridCacheContext<K, V> cctx,
long msgId,
UUID reader,
LinkedHashMap<KeyCacheObject, Boolean> keys,
boolean readThrough,
boolean reload,
@Nullable IgniteTxLocalEx tx,
@NotNull AffinityTopologyVersion topVer,
@Nullable UUID subjId,
int taskNameHash,
@Nullable IgniteCacheExpiryPolicy expiryPlc,
boolean skipVals
) {
super(cctx.kernalContext(), CU.<GridCacheEntryInfo>collectionsReducer());
assert reader != null;
assert !F.isEmpty(keys);
this.reader = reader;
this.cctx = cctx;
this.msgId = msgId;
this.keys = keys;
this.readThrough = readThrough;
this.reload = reload;
this.tx = tx;
this.topVer = topVer;
this.subjId = subjId;
this.taskNameHash = taskNameHash;
this.expiryPlc = expiryPlc;
this.skipVals = skipVals;
futId = IgniteUuid.randomUuid();
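        // Use the transaction xid version when running inside a transaction;
        // otherwise allocate a fresh cache version for this future.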
ver = tx == null ? cctx.versions().next() : tx.xidVersion();
if (log == null)
log = U.logger(cctx.kernalContext(), logRef, GridDhtGetFuture.class);
    }

/**
* Initializes future.
*/
void init() {
map(keys);
markInitialized();
    }

/** {@inheritDoc} */
@Override public Collection<Integer> invalidPartitions() {
return retries;
    }

/**
* @return Future ID.
*/
public IgniteUuid futureId() {
return futId;
    }

/**
* @return Future version.
*/
public GridCacheVersion version() {
return ver;
    }

/** {@inheritDoc} */
@Override public boolean onDone(Collection<GridCacheEntryInfo> res, Throwable err) {
if (super.onDone(res, err)) {
// Release all partitions reserved by this future.
for (GridDhtLocalPartition part : parts)
part.release();
return true;
}
return false;
    }

/**
* @param keys Keys.
*/
private void map(final LinkedHashMap<KeyCacheObject, Boolean> keys) {
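        // Request the keys from the preloader; partitions it reports as invalid are
        // recorded as retries and exposed to the caller via invalidPartitions().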
GridDhtFuture<Object> fut = cctx.dht().dhtPreloader().request(keys.keySet(), topVer);
if (!F.isEmpty(fut.invalidPartitions()))
retries.addAll(fut.invalidPartitions());
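        // Continue once the preloader future completes: map each key to a reserved
        // local partition and start the actual local get.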
add(new GridEmbeddedFuture<>(
new IgniteBiClosure<Object, Exception, Collection<GridCacheEntryInfo>>() {
@Override public Collection<GridCacheEntryInfo> apply(Object o, Exception e) {
if (e != null) { // Check error first.
if (log.isDebugEnabled())
log.debug("Failed to request keys from preloader [keys=" + keys + ", err=" + e + ']');
                        onDone(e);

                        return Collections.emptyList();
                    }
LinkedHashMap<KeyCacheObject, Boolean> mappedKeys = U.newLinkedHashMap(keys.size());
                    // Map each key to a reserved local partition; keys that cannot be mapped are marked for retry.
for (Map.Entry<KeyCacheObject, Boolean> key : keys.entrySet()) {
int part = cctx.affinity().partition(key.getKey());
if (!retries.contains(part)) {
if (!map(key.getKey(), parts))
retries.add(part);
else
mappedKeys.put(key.getKey(), key.getValue());
}
}
// Add new future.
add(getAsync(mappedKeys));
// Finish this one.
return Collections.emptyList();
}
},
fut));
    }

/**
* @param key Key.
* @param parts Parts to map.
* @return {@code True} if mapped.
*/
private boolean map(KeyCacheObject key, Collection<GridDhtLocalPartition> parts) {
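        // Resolve the local partition for the key; a missing partition means this node
        // does not own it and the key has to be retried.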
GridDhtLocalPartition part = topVer.topologyVersion() > 0 ?
cache().topology().localPartition(cctx.affinity().partition(key), topVer, true) :
cache().topology().localPartition(key, false);
if (part == null)
return false;
if (!parts.contains(part)) {
// By reserving, we make sure that partition won't be unloaded while processed.
if (part.reserve()) {
parts.add(part);
return true;
}
else
return false;
}
else
return true;
    }

/**
* @param keys Keys to get.
* @return Future for local get.
*/
@SuppressWarnings( {"unchecked", "IfMayBeConditional"})
private IgniteInternalFuture<Collection<GridCacheEntryInfo>> getAsync(
final LinkedHashMap<KeyCacheObject, Boolean> keys)
{
if (F.isEmpty(keys))
return new GridFinishedFuture<Collection<GridCacheEntryInfo>>(
Collections.<GridCacheEntryInfo>emptyList());
final Collection<GridCacheEntryInfo> infos = new LinkedList<>();
String taskName0 = cctx.kernalContext().job().currentTaskName();
if (taskName0 == null)
taskName0 = cctx.kernalContext().task().resolveTaskName(taskNameHash);
final String taskName = taskName0;
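        // Aggregates completion futures of active transactions that must finish before
        // the values can be read (see reader registration below).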
GridCompoundFuture<Boolean, Boolean> txFut = null;
for (Map.Entry<KeyCacheObject, Boolean> k : keys.entrySet()) {
while (true) {
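                // Retry this key until a usable entry is read: obsolete or concurrently
                // removed entries trigger another attempt.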
GridDhtCacheEntry e = cache().entryExx(k.getKey(), topVer);
try {
GridCacheEntryInfo info = e.info();
// If entry is obsolete.
if (info == null)
continue;
boolean addReader = (!e.deleted() && k.getValue() && !skipVals);
if (addReader)
e.unswap(false);
// Register reader. If there are active transactions for this entry,
// then will wait for their completion before proceeding.
// TODO: GG-4003:
// TODO: What if any transaction we wait for actually removes this entry?
// TODO: In this case seems like we will be stuck with untracked near entry.
// TODO: To fix, check that reader is contained in the list of readers once
// TODO: again after the returned future completes - if not, try again.
// TODO: Also, why is info read before transactions are complete, and not after?
IgniteInternalFuture<Boolean> f = addReader ? e.addReader(reader, msgId, topVer) : null;
if (f != null) {
if (txFut == null)
txFut = new GridCompoundFuture<>(CU.boolReducer());
txFut.add(f);
}
infos.add(info);
break;
}
catch (IgniteCheckedException err) {
return new GridFinishedFuture<>(err);
}
catch (GridCacheEntryRemovedException ignore) {
if (log.isDebugEnabled())
log.debug("Got removed entry when getting a DHT value: " + e);
}
finally {
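                    // Notify the eviction manager that the entry was touched, regardless of the outcome.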
cctx.evicts().touch(e, topVer);
}
}
}
if (txFut != null)
txFut.markInitialized();
IgniteInternalFuture<Map<KeyCacheObject, CacheObject>> fut;
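        // If no transactions need to be waited for, fetch the values right away;
        // otherwise chain the fetch after txFut completes.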
if (txFut == null || txFut.isDone()) {
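            // Three read paths: forced reload through the configured store, read via the
            // enclosing transaction, or a plain DHT read.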
if (reload && cctx.readThrough() && cctx.store().configured()) {
fut = cache().reloadAllAsync0(keys.keySet(),
true,
skipVals,
subjId,
taskName);
}
else {
if (tx == null) {
fut = cache().getDhtAllAsync(keys.keySet(),
readThrough,
subjId,
taskName,
expiryPlc,
skipVals);
}
else {
fut = tx.getAllAsync(cctx,
keys.keySet(),
null,
/*deserialize portable*/false,
skipVals,
/*keep cache objects*/true,
/*skip store*/!readThrough);
}
}
}
else {
// If we are here, then there were active transactions for some entries
// when we were adding the reader. In that case we must wait for those
// transactions to complete.
fut = new GridEmbeddedFuture<>(
txFut,
new C2<Boolean, Exception, IgniteInternalFuture<Map<KeyCacheObject, CacheObject>>>() {
@Override public IgniteInternalFuture<Map<KeyCacheObject, CacheObject>> apply(Boolean b, Exception e) {
if (e != null)
throw new GridClosureException(e);
if (reload && cctx.readThrough() && cctx.store().configured()) {
return cache().reloadAllAsync0(keys.keySet(),
true,
skipVals,
subjId,
taskName);
}
else {
if (tx == null) {
return cache().getDhtAllAsync(keys.keySet(),
readThrough,
subjId,
taskName,
expiryPlc,
skipVals);
}
else {
return tx.getAllAsync(cctx,
keys.keySet(),
null,
/*deserialize portable*/false,
skipVals,
/*keep cache objects*/true,
/*skip store*/!readThrough);
}
}
}
}
);
}
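        // Merge fetched values into the collected entry infos: entries with no value are
        // dropped, and values are not attached when only presence is requested (skipVals).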
return new GridEmbeddedFuture<>(
new C2<Map<KeyCacheObject, CacheObject>, Exception, Collection<GridCacheEntryInfo>>() {
@Override public Collection<GridCacheEntryInfo> apply(Map<KeyCacheObject, CacheObject> map, Exception e) {
if (e != null) {
onDone(e);
return Collections.emptyList();
}
else {
for (Iterator<GridCacheEntryInfo> it = infos.iterator(); it.hasNext();) {
GridCacheEntryInfo info = it.next();
Object v = map.get(info.key());
if (v == null)
it.remove();
else if (!skipVals)
info.value((CacheObject)v);
}
return infos;
}
}
},
fut);
    }

/**
* @return DHT cache.
*/
private GridDhtCacheAdapter<K, V> cache() {
return (GridDhtCacheAdapter<K, V>)cctx.cache();
}
}