IGNITE-1275 - Use topology-safe method in marshaller context.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
index 87bd3b6..dc0fd57 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/MarshallerContextImpl.java
@@ -136,7 +136,7 @@
throw new IllegalStateException("Failed to initialize marshaller context (grid is stopping).");
}
- String clsName = cache0.get(id);
+ String clsName = cache0.getTopologySafe(id);
if (clsName == null) {
File file = new File(workDir, id + ".classname");
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 992edd8..c7fbbfc 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -526,7 +526,8 @@
/*subj id*/null,
/*task name*/null,
/*deserialize portable*/false,
- /*skip values*/true
+ /*skip values*/true,
+ /*can remap*/true
).chain(new CX1<IgniteInternalFuture<Map<K, V>>, Boolean>() {
@Override public Boolean applyx(IgniteInternalFuture<Map<K, V>> fut) throws IgniteCheckedException {
Map<K, V> map = fut.get();
@@ -560,7 +561,8 @@
/*subj id*/null,
/*task name*/null,
/*deserialize portable*/false,
- /*skip values*/true
+ /*skip values*/true,
+ /*can remap*/true
).chain(new CX1<IgniteInternalFuture<Map<K, V>>, Boolean>() {
@Override public Boolean applyx(IgniteInternalFuture<Map<K, V>> fut) throws IgniteCheckedException {
Map<K, V> kvMap = fut.get();
@@ -894,7 +896,7 @@
/** {@inheritDoc} */
@Override public Set<Cache.Entry<K, V>> entrySet() {
- return entrySet((CacheEntryPredicate[]) null);
+ return entrySet((CacheEntryPredicate[])null);
}
/** {@inheritDoc} */
@@ -919,12 +921,12 @@
/** {@inheritDoc} */
@Override public Set<K> primaryKeySet() {
- return primaryKeySet((CacheEntryPredicate[]) null);
+ return primaryKeySet((CacheEntryPredicate[])null);
}
/** {@inheritDoc} */
@Override public Collection<V> values() {
- return values((CacheEntryPredicate[]) null);
+ return values((CacheEntryPredicate[])null);
}
/**
@@ -1210,22 +1212,57 @@
@Override public V getForcePrimary(K key) throws IgniteCheckedException {
String taskName = ctx.kernalContext().job().currentTaskName();
- return getAllAsync(F.asList(key), /*force primary*/true, /*skip tx*/false, null, null, taskName, true, false)
- .get().get(key);
+ return getAllAsync(
+ F.asList(key),
+ /*force primary*/true,
+ /*skip tx*/false,
+ /*cached entry*/null,
+ /*subject id*/null,
+ taskName,
+ /*deserialize portable*/true,
+ /*skip values*/false,
+ /*can remap*/true
+ ).get().get(key);
}
/** {@inheritDoc} */
@Override public IgniteInternalFuture<V> getForcePrimaryAsync(final K key) {
String taskName = ctx.kernalContext().job().currentTaskName();
- return getAllAsync(Collections.singletonList(key), /*force primary*/true, /*skip tx*/false, null, null,
- taskName, true, false).chain(new CX1<IgniteInternalFuture<Map<K, V>>, V>() {
- @Override public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException {
+ return getAllAsync(
+ Collections.singletonList(key),
+ /*force primary*/true,
+ /*skip tx*/false,
+ /*cached entry*/null,
+ /*subject id*/null,
+ taskName,
+ /*deserialize portable*/true,
+ /*skip values*/false,
+ /*can remap*/true
+ ).chain(new CX1<IgniteInternalFuture<Map<K, V>>, V>() {
+ @Override
+ public V applyx(IgniteInternalFuture<Map<K, V>> e) throws IgniteCheckedException {
return e.get().get(key);
}
});
}
+ public V getTopologySafe(K key) throws IgniteCheckedException {
+ String taskName = ctx.kernalContext().job().currentTaskName();
+ // Remap is disabled below: get implementations in this change then read on the ready affinity version.
+ return getAllAsync(
+ F.asList(key),
+ /*force primary*/false,
+ /*skip tx*/false,
+ /*cached entry*/null,
+ /*subject id*/null,
+ taskName,
+ /*deserialize portable*/true,
+ /*skip values*/false,
+ /*can remap*/false
+ ).get().get(key);
+ }
+
/** {@inheritDoc} */
@Nullable @Override public Map<K, V> getAllOutTx(Set<? extends K> keys) throws IgniteCheckedException {
return getAllOutTxAsync(keys).get();
@@ -1242,7 +1279,8 @@
null,
taskName,
!ctx.keepPortable(),
- false);
+ /*skip values*/false,
+ /*can remap*/true);
}
/**
@@ -1582,7 +1620,8 @@
@Nullable UUID subjId,
String taskName,
boolean deserializePortable,
- boolean skipVals
+ boolean skipVals,
+ boolean canRemap
) {
CacheOperationContext opCtx = ctx.operationContextPerCall();
@@ -1597,7 +1636,8 @@
deserializePortable,
forcePrimary,
skipVals ? null : expiryPolicy(opCtx != null ? opCtx.expiry() : null),
- skipVals);
+ skipVals,
+ canRemap);
}
/**
@@ -1623,7 +1663,8 @@
final boolean deserializePortable,
final boolean forcePrimary,
@Nullable IgniteCacheExpiryPolicy expiry,
- final boolean skipVals
+ final boolean skipVals,
+ boolean canRemap
) {
ctx.checkSecurity(SecurityPermission.CACHE_READ);
@@ -1638,7 +1679,8 @@
deserializePortable,
expiry,
skipVals,
- false);
+ false,
+ canRemap);
}
/**
@@ -1661,7 +1703,8 @@
final boolean deserializePortable,
@Nullable IgniteCacheExpiryPolicy expiry,
final boolean skipVals,
- final boolean keepCacheObjects
+ final boolean keepCacheObjects,
+ boolean canRemap
) {
if (F.isEmpty(keys))
return new GridFinishedFuture<>(Collections.<K1, V1>emptyMap());
@@ -1684,7 +1727,7 @@
assert keys != null;
final AffinityTopologyVersion topVer = tx == null
- ? ctx.affinity().affinityTopologyVersion()
+ ? (canRemap ? ctx.affinity().affinityTopologyVersion(): ctx.shared().exchange().readyAffinityVersion())
: tx.topologyVersion();
final Map<K1, V1> map = new GridLeanMap<>(keys.size());
@@ -4461,7 +4504,8 @@
null,
taskName,
deserializePortable,
- false);
+ false,
+ /*can remap*/true);
}
/**
@@ -4682,7 +4726,7 @@
}
/**
- * @param tx Transaction.
+ *
*/
public void execute() {
tx = ctx.tm().newTx(
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
index adea9e0..a7b3b1e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtCacheAdapter.java
@@ -527,7 +527,8 @@
@Nullable UUID subjId,
String taskName,
boolean deserializePortable,
- boolean skipVals
+ boolean skipVals,
+ boolean canRemap
) {
CacheOperationContext opCtx = ctx.operationContextPerCall();
@@ -540,7 +541,8 @@
deserializePortable,
forcePrimary,
null,
- skipVals);
+ skipVals,
+ canRemap);
}
/**
@@ -558,7 +560,8 @@
@Nullable UUID subjId,
String taskName,
@Nullable IgniteCacheExpiryPolicy expiry,
- boolean skipVals
+ boolean skipVals,
+ boolean canRemap
) {
return getAllAsync0(keys,
readThrough,
@@ -568,7 +571,8 @@
false,
expiry,
skipVals,
- /*keep cache objects*/true);
+ /*keep cache objects*/true,
+ canRemap);
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
index 742fbfe..9005541 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtGetFuture.java
@@ -349,12 +349,14 @@
}
else {
if (tx == null) {
- fut = cache().getDhtAllAsync(keys.keySet(),
+ fut = cache().getDhtAllAsync(
+ keys.keySet(),
readThrough,
subjId,
taskName,
expiryPlc,
- skipVals);
+ skipVals,
+ /*can remap*/true);
}
else {
fut = tx.getAllAsync(cctx,
@@ -387,12 +389,14 @@
}
else {
if (tx == null) {
- return cache().getDhtAllAsync(keys.keySet(),
+ return cache().getDhtAllAsync(
+ keys.keySet(),
readThrough,
subjId,
taskName,
expiryPlc,
- skipVals);
+ skipVals,
+ /*can remap*/true);
}
else {
return tx.getAllAsync(cctx,
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
index 79d5e75..a85962f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridPartitionedGetFuture.java
@@ -61,7 +61,7 @@
DFLT_MAX_REMAP_CNT);
/** Context. */
- private GridCacheContext<K, V> cctx;
+ private final GridCacheContext<K, V> cctx;
/** Keys. */
private Collection<KeyCacheObject> keys;
@@ -105,6 +105,9 @@
/** Skip values flag. */
private boolean skipVals;
+ /** Flag indicating whether future can be remapped on a newer topology version. */
+ private final boolean canRemap;
+
/**
* @param cctx Context.
* @param keys Keys.
@@ -130,7 +133,8 @@
String taskName,
boolean deserializePortable,
@Nullable IgniteCacheExpiryPolicy expiryPlc,
- boolean skipVals
+ boolean skipVals,
+ boolean canRemap
) {
super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size()));
@@ -147,6 +151,7 @@
this.taskName = taskName;
this.expiryPlc = expiryPlc;
this.skipVals = skipVals;
+ this.canRemap = canRemap;
futId = IgniteUuid.randomUuid();
@@ -160,7 +165,8 @@
* Initializes future.
*/
public void init() {
- AffinityTopologyVersion topVer = this.topVer.topologyVersion() > 0 ? this.topVer : cctx.affinity().affinityTopologyVersion();
+ AffinityTopologyVersion topVer = this.topVer.topologyVersion() > 0 ? this.topVer :
+ canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion();
map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer);
@@ -334,7 +340,7 @@
remapKeys.add(key);
}
- AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion());
+ AffinityTopologyVersion updTopVer = cctx.discovery().topologyVersionEx();
assert updTopVer.compareTo(topVer) > 0 : "Got invalid partitions for local node but topology version did " +
"not change [topVer=" + topVer + ", updTopVer=" + updTopVer +
@@ -461,7 +467,7 @@
}
}
- ClusterNode node = cctx.affinity().primary(key, topVer);
+ ClusterNode node = affinityNode(key, topVer);
if (node == null) {
onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
@@ -522,6 +528,28 @@
}
/**
+ * Finds affinity node to send get request to: primary if remap is allowed, else first alive affinity node.
+ *
+ * @param key Key to get.
+ * @param topVer Topology version.
+ * @return Affinity node from which the key will be requested, {@code null} if no alive affinity node was found.
+ */
+ private ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) {
+ if (!canRemap) {
+ List<ClusterNode> nodes = cctx.affinity().nodes(key, topVer);
+
+ for (ClusterNode node : nodes) {
+ if (cctx.discovery().alive(node))
+ return node;
+ }
+
+ return null;
+ }
+ else
+ return cctx.affinity().primary(key, topVer);
+ }
+
+ /**
* @param infos Entry infos.
* @return Result map.
*/
@@ -557,14 +585,14 @@
private final IgniteUuid futId = IgniteUuid.randomUuid();
/** Node ID. */
- private ClusterNode node;
+ private final ClusterNode node;
/** Keys. */
@GridToStringInclude
- private LinkedHashMap<KeyCacheObject, Boolean> keys;
+ private final LinkedHashMap<KeyCacheObject, Boolean> keys;
/** Topology version on which this future was mapped. */
- private AffinityTopologyVersion topVer;
+ private final AffinityTopologyVersion topVer;
/** {@code True} if remapped after node left. */
private boolean remapped;
@@ -625,30 +653,38 @@
if (log.isDebugEnabled())
log.debug("Remote node left grid while sending or waiting for reply (will retry): " + this);
- final AffinityTopologyVersion updTopVer =
- new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion()));
+ // Try getting from existing nodes.
+ if (!canRemap) {
+ map(keys.keySet(), F.t(node, keys), topVer);
- final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this,
- cctx.kernalContext().config().getNetworkTimeout(),
- updTopVer,
- e);
+ onDone(Collections.<K, V>emptyMap());
+ }
+ else {
+ final AffinityTopologyVersion updTopVer =
+ new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion()));
- cctx.affinity().affinityReadyFuture(updTopVer).listen(
- new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
- if (timeout.finish()) {
- cctx.kernalContext().timeout().removeTimeoutObject(timeout);
+ final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this,
+ cctx.kernalContext().config().getNetworkTimeout(),
+ updTopVer,
+ e);
- // Remap.
- map(keys.keySet(), F.t(node, keys), updTopVer);
+ cctx.affinity().affinityReadyFuture(updTopVer).listen(
+ new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+ if (timeout.finish()) {
+ cctx.kernalContext().timeout().removeTimeoutObject(timeout);
- onDone(Collections.<K, V>emptyMap());
+ // Remap.
+ map(keys.keySet(), F.t(node, keys), updTopVer);
+
+ onDone(Collections.<K, V>emptyMap());
+ }
}
}
- }
- );
+ );
- cctx.kernalContext().timeout().addTimeoutObject(timeout);
+ cctx.kernalContext().timeout().addTimeoutObject(timeout);
+ }
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
index 96e6edc..5b82162 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/atomic/GridDhtAtomicCache.java
@@ -248,7 +248,8 @@
@Nullable UUID subjId,
final String taskName,
final boolean deserializePortable,
- final boolean skipVals
+ final boolean skipVals,
+ final boolean canRemap
) {
ctx.checkSecurity(SecurityPermission.CACHE_READ);
@@ -278,7 +279,8 @@
deserializePortable,
expiryPlc,
skipVals,
- skipStore);
+ skipStore,
+ canRemap);
}
});
}
@@ -870,8 +872,11 @@
boolean deserializePortable,
@Nullable ExpiryPolicy expiryPlc,
boolean skipVals,
- boolean skipStore) {
- AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
+ boolean skipStore,
+ boolean canRemap
+ ) {
+ AffinityTopologyVersion topVer = canRemap ? ctx.affinity().affinityTopologyVersion() :
+ ctx.shared().exchange().readyAffinityVersion();
final IgniteCacheExpiryPolicy expiry = skipVals ? null : expiryPolicy(expiryPlc);
@@ -971,7 +976,8 @@
taskName,
deserializePortable,
expiry,
- skipVals);
+ skipVals,
+ canRemap);
fut.init();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
index 221b230..eb7c78f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/colocated/GridDhtColocatedCache.java
@@ -155,7 +155,8 @@
@Nullable UUID subjId,
String taskName,
final boolean deserializePortable,
- final boolean skipVals
+ final boolean skipVals,
+ boolean canRemap
) {
ctx.checkSecurity(SecurityPermission.CACHE_READ);
@@ -183,7 +184,9 @@
});
}
- AffinityTopologyVersion topVer = tx == null ? ctx.affinity().affinityTopologyVersion() : tx.topologyVersion();
+ AffinityTopologyVersion topVer = tx == null ?
+ (canRemap ? ctx.affinity().affinityTopologyVersion() : ctx.shared().exchange().readyAffinityVersion()) :
+ tx.topologyVersion();
subjId = ctx.subjectIdPerCall(subjId, opCtx);
@@ -197,7 +200,8 @@
taskName,
deserializePortable,
skipVals ? null : expiryPolicy(opCtx != null ? opCtx.expiry() : null),
- skipVals);
+ skipVals,
+ canRemap);
}
/** {@inheritDoc} */
@@ -226,7 +230,8 @@
* @param skipVals Skip values flag.
* @return Loaded values.
*/
- public IgniteInternalFuture<Map<K, V>> loadAsync(@Nullable Collection<KeyCacheObject> keys,
+ public IgniteInternalFuture<Map<K, V>> loadAsync(
+ @Nullable Collection<KeyCacheObject> keys,
boolean readThrough,
boolean reload,
boolean forcePrimary,
@@ -235,7 +240,8 @@
String taskName,
boolean deserializePortable,
@Nullable IgniteCacheExpiryPolicy expiryPlc,
- boolean skipVals
+ boolean skipVals,
+ boolean canRemap
) {
if (keys == null || keys.isEmpty())
return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
@@ -340,7 +346,8 @@
taskName,
deserializePortable,
expiryPlc,
- skipVals);
+ skipVals,
+ canRemap);
fut.init();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
index 041f83a..2bf5365 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearAtomicCache.java
@@ -364,7 +364,8 @@
@Nullable UUID subjId,
String taskName,
boolean deserializePortable,
- boolean skipVals
+ boolean skipVals,
+ boolean canRemap
) {
ctx.checkSecurity(SecurityPermission.CACHE_READ);
@@ -387,7 +388,8 @@
deserializePortable,
skipVals ? null : opCtx != null ? opCtx.expiry() : null,
skipVals,
- opCtx != null && opCtx.skipStore());
+ opCtx != null && opCtx.skipStore(),
+ canRemap);
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
index 351d6cd..ba0692c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheAdapter.java
@@ -195,13 +195,14 @@
return (IgniteInternalFuture)loadAsync(tx,
keys,
reload,
- false,
+ /*force primary*/false,
subjId,
taskName,
- true,
- null,
+ /*deserialize portable*/true,
+ /*expiry policy*/null,
skipVals,
- /*skip store*/false);
+ /*skip store*/false,
+ /*can remap*/true);
}
/**
@@ -226,7 +227,8 @@
boolean deserializePortable,
@Nullable ExpiryPolicy expiryPlc,
boolean skipVal,
- boolean skipStore
+ boolean skipStore,
+ boolean canRemap
) {
if (F.isEmpty(keys))
return new GridFinishedFuture<>(Collections.<K, V>emptyMap());
@@ -245,7 +247,8 @@
taskName,
deserializePortable,
expiry,
- skipVal);
+ skipVal,
+ canRemap);
// init() will register future for responses if future has remote mappings.
fut.init();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
index 194c68a..6f4f15e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearCacheEntry.java
@@ -333,7 +333,9 @@
true,
null,
false,
- /*skip store*/false).get().get(keyValue(false));
+ /*skip store*/false,
+ /*can remap*/true
+ ).get().get(keyValue(false));
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
index d109d2b..ca460c5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearGetFuture.java
@@ -62,7 +62,7 @@
private static final int MAX_REMAP_CNT = getInteger(IGNITE_NEAR_GET_MAX_REMAPS, DFLT_MAX_REMAP_CNT);
/** Context. */
- private GridCacheContext<K, V> cctx;
+ private final GridCacheContext<K, V> cctx;
/** Keys. */
private Collection<KeyCacheObject> keys;
@@ -106,6 +106,9 @@
/** Expiry policy. */
private IgniteCacheExpiryPolicy expiryPlc;
+ /** Flag indicating whether future can be remapped on a newer topology version. */
+ private final boolean canRemap;
+
/**
* @param cctx Context.
* @param keys Keys.
@@ -131,7 +134,8 @@
String taskName,
boolean deserializePortable,
@Nullable IgniteCacheExpiryPolicy expiryPlc,
- boolean skipVals
+ boolean skipVals,
+ boolean canRemap
) {
super(cctx.kernalContext(), CU.<K, V>mapsReducer(keys.size()));
@@ -148,6 +152,7 @@
this.deserializePortable = deserializePortable;
this.expiryPlc = expiryPlc;
this.skipVals = skipVals;
+ this.canRemap = canRemap;
futId = IgniteUuid.randomUuid();
@@ -161,7 +166,9 @@
* Initializes future.
*/
public void init() {
- AffinityTopologyVersion topVer = tx == null ? cctx.affinity().affinityTopologyVersion() : tx.topologyVersion();
+ AffinityTopologyVersion topVer = tx == null ?
+ (canRemap ? cctx.affinity().affinityTopologyVersion() : cctx.shared().exchange().readyAffinityVersion()) :
+ tx.topologyVersion();
map(keys, Collections.<ClusterNode, LinkedHashMap<KeyCacheObject, Boolean>>emptyMap(), topVer);
@@ -327,7 +334,7 @@
remapKeys.add(key);
}
- AffinityTopologyVersion updTopVer = new AffinityTopologyVersion(cctx.discovery().topologyVersion());
+ AffinityTopologyVersion updTopVer = cctx.discovery().topologyVersionEx();
assert updTopVer.compareTo(topVer) > 0 : "Got invalid partitions for local node but topology version did " +
"not change [topVer=" + topVer + ", updTopVer=" + updTopVer +
@@ -435,7 +442,7 @@
taskName,
expiryPlc);
- ClusterNode primary = null;
+ ClusterNode affNode = null;
if (v == null && allowLocRead && cctx.affinityNode()) {
GridDhtCacheAdapter<K, V> dht = cache().dht();
@@ -472,16 +479,16 @@
near.metrics0().onRead(true);
}
else {
- primary = cctx.affinity().primary(key, topVer);
+ affNode = affinityNode(key, topVer);
- if (primary == null) {
+ if (affNode == null) {
onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
"(all partition nodes left the grid)."));
return savedVers;
}
- if (!primary.isLocal() && cctx.cache().configuration().isStatisticsEnabled() && !skipVals)
+ if (!affNode.isLocal() && cctx.cache().configuration().isStatisticsEnabled() && !skipVals)
near.metrics0().onRead(false);
}
}
@@ -507,10 +514,10 @@
add(new GridFinishedFuture<>(Collections.singletonMap(key0, val0)));
}
else {
- if (primary == null) {
- primary = cctx.affinity().primary(key, topVer);
+ if (affNode == null) {
+ affNode = affinityNode(key, topVer);
- if (primary == null) {
+ if (affNode == null) {
onDone(new ClusterTopologyServerNotFoundException("Failed to map keys for cache " +
"(all partition nodes left the grid)."));
@@ -527,13 +534,13 @@
savedVers.put(key, nearEntry == null ? null : nearEntry.dhtVersion());
- LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(primary);
+ LinkedHashMap<KeyCacheObject, Boolean> keys = mapped.get(affNode);
if (keys != null && keys.containsKey(key)) {
if (remapCnt.incrementAndGet() > MAX_REMAP_CNT) {
onDone(new ClusterTopologyCheckedException("Failed to remap key to a new node after " +
MAX_REMAP_CNT + " attempts (key got remapped to the same node) " +
- "[key=" + key + ", node=" + U.toShortString(primary) + ", mappings=" + mapped + ']'));
+ "[key=" + key + ", node=" + U.toShortString(affNode) + ", mappings=" + mapped + ']'));
return savedVers;
}
@@ -545,10 +552,10 @@
if (!addRdr && tx.readCommitted() && !tx.writeSet().contains(cctx.txKey(key)))
addRdr = true;
- LinkedHashMap<KeyCacheObject, Boolean> old = mappings.get(primary);
+ LinkedHashMap<KeyCacheObject, Boolean> old = mappings.get(affNode);
if (old == null)
- mappings.put(primary, old = new LinkedHashMap<>(3, 1f));
+ mappings.put(affNode, old = new LinkedHashMap<>(3, 1f));
old.put(key, addRdr);
}
@@ -579,6 +586,28 @@
}
/**
+ * Finds affinity node to send get request to: primary if remap is allowed, else first alive affinity node.
+ *
+ * @param key Key to get.
+ * @param topVer Topology version.
+ * @return Affinity node to get key from, {@code null} if no alive affinity node was found.
+ */
+ private ClusterNode affinityNode(KeyCacheObject key, AffinityTopologyVersion topVer) {
+ if (!canRemap) {
+ List<ClusterNode> affNodes = cctx.affinity().nodes(key, topVer);
+
+ for (ClusterNode node : affNodes) {
+ if (cctx.discovery().alive(node))
+ return node;
+ }
+
+ return null;
+ }
+ else
+ return cctx.affinity().primary(key, topVer);
+ }
+
+ /**
* @return Near cache.
*/
private GridNearCacheAdapter<K, V> cache() {
@@ -752,30 +781,38 @@
if (log.isDebugEnabled())
log.debug("Remote node left grid while sending or waiting for reply (will retry): " + this);
- final AffinityTopologyVersion updTopVer =
- new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion()));
+ // Try getting value from alive nodes.
+ if (!canRemap) {
+ // Remap on the same topology version.
+ map(keys.keySet(), F.t(node, keys), topVer);
- final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this,
- cctx.kernalContext().config().getNetworkTimeout(),
- updTopVer,
- e);
+ onDone(Collections.<K, V>emptyMap());
+ } else {
+ final AffinityTopologyVersion updTopVer =
+ new AffinityTopologyVersion(Math.max(topVer.topologyVersion() + 1, cctx.discovery().topologyVersion()));
- cctx.affinity().affinityReadyFuture(updTopVer).listen(
- new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
- @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
- if (timeout.finish()) {
- cctx.kernalContext().timeout().removeTimeoutObject(timeout);
+ final GridFutureRemapTimeoutObject timeout = new GridFutureRemapTimeoutObject(this,
+ cctx.kernalContext().config().getNetworkTimeout(),
+ updTopVer,
+ e);
- // Remap.
- map(keys.keySet(), F.t(node, keys), updTopVer);
+ cctx.affinity().affinityReadyFuture(updTopVer).listen(
+ new CI1<IgniteInternalFuture<AffinityTopologyVersion>>() {
+ @Override public void apply(IgniteInternalFuture<AffinityTopologyVersion> fut) {
+ if (timeout.finish()) {
+ cctx.kernalContext().timeout().removeTimeoutObject(timeout);
- onDone(Collections.<K, V>emptyMap());
+ // Remap.
+ map(keys.keySet(), F.t(node, keys), updTopVer);
+
+ onDone(Collections.<K, V>emptyMap());
+ }
}
}
- }
- );
+ );
- cctx.kernalContext().timeout().addTimeoutObject(timeout);
+ cctx.kernalContext().timeout().addTimeoutObject(timeout);
+ }
}
/**
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
index 696acfb..a1f1383 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTransactionalCache.java
@@ -101,7 +101,8 @@
@Nullable UUID subjId,
String taskName,
final boolean deserializePortable,
- final boolean skipVals
+ final boolean skipVals,
+ boolean canRemap
) {
ctx.checkSecurity(SecurityPermission.CACHE_READ);
@@ -142,7 +143,8 @@
deserializePortable,
skipVals ? null : opCtx != null ? opCtx.expiry() : null,
skipVals,
- skipStore);
+ skipStore,
+ canRemap);
}
/**
@@ -172,7 +174,8 @@
tx.resolveTaskName(),
deserializePortable,
expiryPlc,
- skipVals);
+ skipVals,
+ /*can remap*/true);
// init() will register future for responses if it has remote mappings.
fut.init();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
index cb391e4..5ff7345 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/near/GridNearTxLocal.java
@@ -313,7 +313,8 @@
});
}
else if (cacheCtx.isColocated()) {
- return cacheCtx.colocated().loadAsync(keys,
+ return cacheCtx.colocated().loadAsync(
+ keys,
readThrough,
/*reload*/false,
/*force primary*/false,
@@ -322,7 +323,9 @@
resolveTaskName(),
deserializePortable,
accessPolicy(cacheCtx, keys),
- skipVals).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Boolean>() {
+ skipVals,
+ /*can remap*/true
+ ).chain(new C1<IgniteInternalFuture<Map<Object, Object>>, Boolean>() {
@Override public Boolean apply(IgniteInternalFuture<Map<Object, Object>> f) {
try {
Map<Object, Object> map = f.get();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
index bcbdec4..c648f11 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/local/atomic/GridLocalAtomicCache.java
@@ -458,7 +458,8 @@
@Nullable UUID subjId,
final String taskName,
final boolean deserializePortable,
- final boolean skipVals
+ final boolean skipVals,
+ boolean canRemap
) {
A.notNull(keys, "keys");
@@ -570,8 +571,18 @@
if (success || !storeEnabled)
return vals;
- return getAllAsync(keys, opCtx == null || !opCtx.skipStore(), null, false, subjId, taskName, deserializePortable,
- false, expiry, skipVals).get();
+ return getAllAsync(
+ keys,
+ opCtx == null || !opCtx.skipStore(),
+ null,
+ false,
+ subjId,
+ taskName,
+ deserializePortable,
+ /*force primary*/false,
+ expiry,
+ skipVals,
+ /*can remap*/true).get();
}
/** {@inheritDoc} */
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTopologySafeGetSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTopologySafeGetSelfTest.java
new file mode 100644
index 0000000..ef031f6
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/IgniteCacheTopologySafeGetSelfTest.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.cache.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.testframework.*;
+import org.apache.ignite.testframework.junits.common.*;
+import org.apache.ignite.transactions.*;
+
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.cache.CacheAtomicityMode.*;
+import static org.apache.ignite.transactions.TransactionConcurrency.*;
+import static org.apache.ignite.transactions.TransactionIsolation.*;
+
+/**
+ * Tests {@code getTopologySafe} reads made after a transaction is left open and an extra node start is launched.
+ */
+public class IgniteCacheTopologySafeGetSelfTest extends GridCommonAbstractTest {
+ /** Number of initial grids. */
+ public static final int GRID_CNT = 4;
+
+ /** IP finder set on the discovery SPI in {@link #getConfiguration(String)}. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Latch the blocking TX thread waits on before leaving its transaction; counted down by {@link #releaseTx()}. */
+ private CountDownLatch releaseLatch;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setCacheConfiguration(
+ cacheCfg("tx", TRANSACTIONAL, false),
+ cacheCfg("atomic", ATOMIC, false),
+ cacheCfg("tx_near", TRANSACTIONAL, true),
+ cacheCfg("atomic_near", ATOMIC, true));
+
+ TcpDiscoverySpi disco = new TcpDiscoverySpi();
+
+ disco.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(disco);
+
+ return cfg;
+ }
+
+ /**
+ * @param name Cache name.
+ * @param cacheMode Cache atomicity mode.
+ * @param near Whether to set a near cache configuration ({@code null} is set when {@code false}).
+ * @return Cache configuration with one backup.
+ */
+ @SuppressWarnings("unchecked")
+ private CacheConfiguration cacheCfg(String name, CacheAtomicityMode cacheMode, boolean near) {
+ CacheConfiguration cfg = new CacheConfiguration(name);
+
+ cfg.setAtomicityMode(cacheMode);
+ cfg.setBackups(1);
+
+ if (near)
+ cfg.setNearConfiguration(new NearCacheConfiguration());
+ else
+ cfg.setNearConfiguration(null);
+
+ return cfg;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testGetTopologySafeNodeJoin() throws Exception {
+ checkGetTopologySafeNodeJoin(false);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testGetTopologySafeNodeJoinPrimaryLeave() throws Exception {
+ checkGetTopologySafeNodeJoin(true);
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void checkGetTopologySafeNodeJoin(boolean failPrimary) throws Exception {
+ startGrids(GRID_CNT);
+
+ awaitPartitionMapExchange();
+
+ try {
+ ClusterNode targetNode = ignite(1).cluster().localNode();
+
+ info(">>> Target node: " + targetNode.id());
+
+ // Find a key (by "tx" cache affinity) whose primary is ignite(1) and which ignite(0) does not own; put it into all caches.
+ int key = -1;
+ for (int i = 0; i < 100; i++) {
+ Collection<ClusterNode> nodes = ignite(0).affinity("tx").mapKeyToPrimaryAndBackups(i);
+ ClusterNode primaryNode = F.first(nodes);
+
+ if (!nodes.contains(ignite(0).cluster().localNode()) && primaryNode.id().equals(targetNode.id())) {
+ ignite(1).cache("tx").put(i, i);
+ ignite(1).cache("atomic").put(i, i);
+ ignite(1).cache("tx_near").put(i, i);
+ ignite(1).cache("atomic_near").put(i, i);
+
+ key = i;
+
+
+ break;
+ }
+ }
+
+ assertTrue(key != -1);
+
+ IgniteInternalFuture<?> txFut = startBlockingTxAsync();
+
+ IgniteInternalFuture<?> nodeFut = startNodeAsync();
+
+ if (failPrimary) // Grid 1 is the node selected above as the key's primary.
+ stopGrid(1);
+
+ assertEquals(key, ((IgniteKernal)ignite(0)).internalCache("tx").getTopologySafe(key));
+ assertEquals(key, ((IgniteKernal)ignite(0)).internalCache("atomic").getTopologySafe(key));
+ assertEquals(key, ((IgniteKernal)ignite(0)).internalCache("tx_near").getTopologySafe(key));
+ assertEquals(key, ((IgniteKernal)ignite(0)).internalCache("atomic_near").getTopologySafe(key));
+
+ releaseTx();
+
+ txFut.get();
+ nodeFut.get();
+ }
+ finally {
+ stopAllGrids();
+ }
+ }
+
+ private IgniteInternalFuture<?> startNodeAsync() throws Exception { // Starts grid GRID_CNT via runAsync; returns that future.
+ IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override
+ public Object call() throws Exception {
+ startGrid(GRID_CNT);
+
+ return null;
+ }
+ });
+
+ U.sleep(1000); // NOTE(review): presumably gives the new node time to begin joining before the caller proceeds - confirm.
+
+ return fut;
+ }
+
+ /**
+ * @return Future of the task that holds the transaction open; the task returns once {@link #releaseTx()} is called.
+ * @throws Exception If failed.
+ */
+ private IgniteInternalFuture<?> startBlockingTxAsync() throws Exception {
+ final CountDownLatch lockLatch = new CountDownLatch(1);
+
+ IgniteInternalFuture<Object> fut = GridTestUtils.runAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ try (Transaction ignore = ignite(0).transactions().txStart(PESSIMISTIC, REPEATABLE_READ)) {
+ for (int i = 0; i < 30; i++) // NOTE(review): reads inside a pessimistic TX presumably lock these keys - confirm.
+ ignite(0).cache("tx").get("value-" + i);
+
+ releaseLatch = new CountDownLatch(1);
+
+ lockLatch.countDown();
+
+ releaseLatch.await();
+ }
+
+ return null;
+ }
+ });
+
+ lockLatch.await(); // Wait until the TX thread has done its reads and created releaseLatch.
+
+ return fut;
+ }
+
+ /**
+ * Counts down the release latch so the blocking transaction thread can finish.
+ */
+ private void releaseTx() {
+ assert releaseLatch != null;
+
+ releaseLatch.countDown();
+ }
+}
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
index 9c4446d..c2fc46c 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/dht/IgniteCachePutRetryTransactionalSelfTest.java
@@ -120,6 +120,8 @@
stopGrid(stopIdx);
+ U.sleep(500);
+
startGrid(stopIdx);
}
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
index af2b85c..b64471b 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheFailoverTestSuite.java
@@ -80,6 +80,8 @@
suite.addTestSuite(IgniteCacheSizeFailoverTest.class);
+ suite.addTestSuite(IgniteCacheTopologySafeGetSelfTest.class);
+
return suite;
}
}