Merge remote-tracking branch 'remotes/community/ignite-1.6.11' into ignite-4154-opt2
diff --git a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
index ec12973..f32aadd 100644
--- a/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
+++ b/modules/core/src/main/java/org/apache/ignite/cache/affinity/rendezvous/RendezvousAffinityFunction.java
@@ -17,7 +17,6 @@
package org.apache.ignite.cache.affinity.rendezvous;
-import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
@@ -355,29 +354,43 @@
/**
* Returns collection of nodes (primary first) for specified partition.
*/
- public List<ClusterNode> assignPartition(int part, List<ClusterNode> nodes, int backups,
+ public List<ClusterNode> assignPartition(MessageDigest d,
+ int part,
+ List<ClusterNode> nodes,
+ Map<ClusterNode, byte[]> nodesHash,
+ int backups,
@Nullable Map<UUID, Collection<ClusterNode>> neighborhoodCache) {
if (nodes.size() <= 1)
return nodes;
- List<IgniteBiTuple<Long, ClusterNode>> lst = new ArrayList<>();
+ if (d == null)
+ d = digest.get();
- MessageDigest d = digest.get();
+ List<IgniteBiTuple<Long, ClusterNode>> lst = new ArrayList<>(nodes.size());
- for (ClusterNode node : nodes) {
- Object nodeHash = resolveNodeHash(node);
+ try {
+ for (int i = 0; i < nodes.size(); i++) {
+ ClusterNode node = nodes.get(i);
- try {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
+ // Cached buffer layout: [4-byte partition number | marshalled node hash].
- byte[] nodeHashBytes = U.marshal(ignite.configuration().getMarshaller(), nodeHash);
+ byte[] nodeHashBytes = nodesHash.get(node);
- out.write(U.intToBytes(part), 0, 4); // Avoid IOException.
- out.write(nodeHashBytes, 0, nodeHashBytes.length); // Avoid IOException.
+ if (nodeHashBytes == null) {
+ byte[] nodeHashBytes0 = U.marshal(ignite.configuration().getMarshaller(), resolveNodeHash(node));
+
+ nodeHashBytes = new byte[nodeHashBytes0.length + 4];
+
+ System.arraycopy(nodeHashBytes0, 0, nodeHashBytes, 4, nodeHashBytes0.length);
+
+ nodesHash.put(node, nodeHashBytes); // Cache the padded buffer: the raw bytes have no prefix slot.
+ }
+
+ U.intToBytes(part, nodeHashBytes, 0); // Overwrites only the partition prefix; hash bytes stay intact.
d.reset();
- byte[] bytes = d.digest(out.toByteArray());
+ byte[] bytes = d.digest(nodeHashBytes);
long hash =
(bytes[0] & 0xFFL)
@@ -391,9 +404,9 @@
lst.add(F.t(hash, node));
}
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
}
Collections.sort(lst, COMPARATOR);
@@ -474,8 +487,18 @@
Map<UUID, Collection<ClusterNode>> neighborhoodCache = exclNeighbors ?
GridCacheUtils.neighbors(affCtx.currentTopologySnapshot()) : null;
+ MessageDigest d = digest.get();
+
+ List<ClusterNode> nodes = affCtx.currentTopologySnapshot();
+
+ Map<ClusterNode, byte[]> nodesHash = U.newHashMap(nodes.size());
+
for (int i = 0; i < parts; i++) {
- List<ClusterNode> partAssignment = assignPartition(i, affCtx.currentTopologySnapshot(), affCtx.backups(),
+ List<ClusterNode> partAssignment = assignPartition(d,
+ i,
+ nodes,
+ nodesHash,
+ affCtx.backups(),
neighborhoodCache);
assignments.add(partAssignment);
diff --git a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
index f65bf52..354ea10 100644
--- a/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
+++ b/modules/core/src/main/java/org/apache/ignite/configuration/CacheConfiguration.java
@@ -2402,7 +2402,7 @@
/**
* Filter that accepts all nodes.
*/
- public static class IgniteAllNodesPredicate implements IgnitePredicate<ClusterNode> {
+ public static class IgniteAllNodesPredicate implements IgnitePredicate<ClusterNode> {
/** */
private static final long serialVersionUID = 0L;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 345cfb4..640b7cf 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -1621,14 +1621,14 @@
}
/**
- * Gets cache remote nodes for cache with given name.
- *
- * @param cacheName Cache name.
+ * Gets names of caches whose affinity node collection contains the given node.
+ *
* @param topVer Topology version.
- * @return Collection of cache nodes.
+ * @param node Node.
+ * @return Names of caches whose affinity node collection contains the given node.
*/
- public Collection<ClusterNode> remoteCacheNodes(@Nullable String cacheName, AffinityTopologyVersion topVer) {
- return resolveDiscoCache(cacheName, topVer).remoteCacheNodes(cacheName, topVer.topologyVersion());
+ public Collection<String> nodeCaches(AffinityTopologyVersion topVer, ClusterNode node) {
+ return resolveDiscoCache(null, topVer).nodeCaches(node);
}
/**
@@ -2742,6 +2740,21 @@
this.nodeMap = nodeMap;
}
+ /**
+ * @param node Node.
+ * @return Names of caches whose affinity node collection ({@code affCacheNodes}) contains the given node.
+ */
+ Collection<String> nodeCaches(ClusterNode node) {
+ List<String> res = new ArrayList<>();
+
+ for (Map.Entry<String, Collection<ClusterNode>> e : affCacheNodes.entrySet()) {
+ if (F.contains(e.getValue(), node))
+ res.add(e.getKey());
+ }
+
+ return res;
+ }
+
/**
* Adds node to map.
*
@@ -2820,17 +2829,6 @@
}
/**
- * Gets all remote nodes that have cache with given name.
- *
- * @param cacheName Cache name.
- * @param topVer Topology version.
- * @return Collection of nodes.
- */
- Collection<ClusterNode> remoteCacheNodes(@Nullable String cacheName, final long topVer) {
- return filter(topVer, rmtCacheNodes.get(cacheName));
- }
-
- /**
* Gets all remote nodes that have at least one cache configured.
*
* @param topVer Topology version.
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityCalculateCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityCalculateCache.java
new file mode 100644
index 0000000..45758b0
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityCalculateCache.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.affinity;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionContext;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Cache of affinity calculation results for a single topology version.
+ * <p>
+ * Caches that belong to the same affinity group reuse the assignment already calculated for that group
+ * instead of calling the affinity function again, and an assignment equal to the one previously
+ * calculated for the same similar affinity key is replaced with that cached instance.
+ * <p>
+ * NOTE(review): plain {@link HashMap}s and unsynchronized counters are used, so an instance is assumed
+ * to be accessed by one thread at a time; confirm for callers that use it from future listeners.
+ */
+public class AffinityCalculateCache {
+    /** Last assignment calculated per similar affinity key; used to deduplicate equal assignments. */
+    private final Map<Object, List<List<ClusterNode>>> assignCache = new HashMap<>();
+
+    /** Topology version assignments are calculated for. */
+    private final AffinityTopologyVersion topVer;
+
+    /** Discovery event passed to the affinity function context. */
+    private final DiscoveryEvent discoEvt;
+
+    /** Assignment calculated per affinity group (created lazily). */
+    private Map<Integer, List<List<ClusterNode>>> grpAssign;
+
+    /** Number of affinity function invocations. */
+    private int calcCnt;
+
+    /** Accumulated late affinity calculation time. */
+    private long lateAffTime;
+
+    /**
+     * @param topVer Topology version assignments are calculated for.
+     * @param discoEvt Discovery event passed to the affinity function context.
+     */
+    public AffinityCalculateCache(AffinityTopologyVersion topVer, DiscoveryEvent discoEvt) {
+        this.topVer = topVer;
+        this.discoEvt = discoEvt;
+    }
+
+    /**
+     * @return Number of times the affinity function was actually invoked.
+     */
+    public int calculateCount() {
+        return calcCnt;
+    }
+
+    /**
+     * @param time Time to add to the accumulated late affinity calculation time.
+     */
+    public void addLateAffinityCalculateTime(long time) {
+        lateAffTime += time;
+    }
+
+    /**
+     * @return Accumulated late affinity calculation time.
+     */
+    public long lateAffinityCalculateTime() {
+        return lateAffTime;
+    }
+
+    /**
+     * Calculates partition assignment, reusing the result already calculated for the same affinity group.
+     *
+     * @param aff Affinity function.
+     * @param backups Number of backups.
+     * @param nodes Nodes to assign partitions to.
+     * @param prevAssignment Previous assignment.
+     * @param affGrp Affinity group, or {@code null} if the result must not be shared with other caches.
+     * @param affKey Similar affinity key used to deduplicate equal assignments.
+     * @return Partition assignment.
+     */
+    public List<List<ClusterNode>> assignPartitions(AffinityFunction aff,
+        int backups,
+        List<ClusterNode> nodes,
+        List<List<ClusterNode>> prevAssignment,
+        @Nullable Integer affGrp,
+        Object affKey) {
+        if (affGrp != null && grpAssign != null) {
+            List<List<ClusterNode>> calcAssign = grpAssign.get(affGrp);
+
+            if (calcAssign != null) {
+                assert calcAssign.size() == aff.partitions() : calcAssign.size();
+
+                return calcAssign;
+            }
+        }
+
+        calcCnt++;
+
+        AffinityFunctionContext ctx = new GridAffinityFunctionContextImpl(nodes,
+            prevAssignment,
+            discoEvt,
+            topVer,
+            backups);
+
+        List<List<ClusterNode>> assign = aff.assignPartitions(ctx);
+
+        // Share one instance between caches whose calculated assignments are equal.
+        List<List<ClusterNode>> assign0 = assignCache.get(affKey);
+
+        if (assign0 != null && assign0.equals(assign))
+            assign = assign0;
+        else
+            assignCache.put(affKey, assign);
+
+        if (affGrp != null) {
+            if (grpAssign == null)
+                grpAssign = new HashMap<>();
+
+            grpAssign.put(affGrp, assign);
+        }
+
+        assert assign.size() == aff.partitions() : assign.size();
+
+        return assign;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityConfiguration.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityConfiguration.java
new file mode 100644
index 0000000..c9ebbbc
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/AffinityConfiguration.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.affinity;
+
+import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.lang.IgnitePredicate;
+
+/**
+ * Affinity-related part of a cache configuration: affinity function, node filter and number of backups.
+ * All fields are final.
+ */
+public class AffinityConfiguration {
+    /** Affinity function. */
+    private final AffinityFunction aff;
+
+    /** Node filter. */
+    private final IgnitePredicate<ClusterNode> nodeFilter;
+
+    /** Number of backups. */
+    private final int backups;
+
+    /**
+     * @param aff Affinity function.
+     * @param nodeFilter Node filter.
+     * @param backups Number of backups.
+     */
+    public AffinityConfiguration(AffinityFunction aff, IgnitePredicate<ClusterNode> nodeFilter, int backups) {
+        this.aff = aff;
+        this.nodeFilter = nodeFilter;
+        this.backups = backups;
+    }
+
+    /**
+     * @return Affinity function.
+     */
+    public AffinityFunction affinityFunction() {
+        return aff;
+    }
+
+    /**
+     * @return Node filter.
+     */
+    public IgnitePredicate<ClusterNode> nodeFilter() {
+        return nodeFilter;
+    }
+
+    /**
+     * @return Number of backups.
+     */
+    public int backups() {
+        return backups;
+    }
+}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
index a388c7a..c77124b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityAssignmentCache.java
@@ -34,6 +34,7 @@
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cache.affinity.AffinityCentralizedFunction;
import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityFunctionContext;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.internal.GridKernalContext;
@@ -110,35 +111,35 @@
/** */
private final Object similarAffKey;
+ /** */
+ private final Integer affGrp;
+
/**
* Constructs affinity cached calculations.
*
* @param ctx Kernal context.
* @param cacheName Cache name.
- * @param aff Affinity function.
- * @param nodeFilter Node filter.
- * @param backups Number of backups.
+ * @param affCfg Affinity configuration.
* @param locCache Local cache flag.
*/
@SuppressWarnings("unchecked")
public GridAffinityAssignmentCache(GridKernalContext ctx,
String cacheName,
- AffinityFunction aff,
- IgnitePredicate<ClusterNode> nodeFilter,
- int backups,
+ AffinityConfiguration affCfg,
boolean locCache)
{
assert ctx != null;
- assert aff != null;
- assert nodeFilter != null;
this.ctx = ctx;
- this.aff = aff;
- this.nodeFilter = nodeFilter;
this.cacheName = cacheName;
- this.backups = backups;
+ this.aff = affCfg.affinityFunction();
+ this.nodeFilter = affCfg.nodeFilter();
+ this.backups = affCfg.backups();
this.locCache = locCache;
+ assert aff != null;
+ assert nodeFilter != null;
+
cacheId = CU.cacheId(cacheName);
log = ctx.log(GridAffinityAssignmentCache.class);
@@ -148,8 +149,11 @@
head = new AtomicReference<>(new GridAffinityAssignment(AffinityTopologyVersion.NONE));
similarAffKey = ctx.affinity().similaryAffinityKey(aff, nodeFilter, backups, partsCnt);
-
assert similarAffKey != null;
+
+ affGrp = locCache ? null : ctx.cache().context().affinity().equalAffinityGroup(cacheId, affCfg);
+
+ log.info("Initialized cache affinity group [cache=" + cacheName + ", grp=" + affGrp + ']');
}
/**
@@ -205,6 +209,8 @@
* @param assignment Assignment.
*/
public void idealAssignment(List<List<ClusterNode>> assignment) {
+ assert assignment == null || assignment.size() == partsCnt : partsCnt;
+
this.idealAssignment = assignment;
}
@@ -255,7 +261,9 @@
* @return Affinity assignments.
*/
@SuppressWarnings("IfMayBeConditional")
- public List<List<ClusterNode>> calculate(AffinityTopologyVersion topVer, DiscoveryEvent discoEvt) {
+ public List<List<ClusterNode>> calculate(AffinityTopologyVersion topVer,
+ DiscoveryEvent discoEvt,
+ @Nullable AffinityCalculateCache cache) {
if (log.isDebugEnabled())
log.debug("Calculating affinity [topVer=" + topVer + ", locNodeId=" + ctx.localNodeId() +
", discoEvt=" + discoEvt + ']');
@@ -273,20 +281,28 @@
else
sorted = Collections.singletonList(ctx.discovery().localNode());
- List<List<ClusterNode>> assignment;
+ List<List<ClusterNode>> assignment = null;
if (prevAssignment != null && discoEvt != null) {
boolean affNode = CU.affinityNode(discoEvt.eventNode(), nodeFilter);
if (!affNode)
assignment = prevAssignment;
- else
- assignment = aff.assignPartitions(new GridAffinityFunctionContextImpl(sorted, prevAssignment,
- discoEvt, topVer, backups));
}
- else
- assignment = aff.assignPartitions(new GridAffinityFunctionContextImpl(sorted, prevAssignment, discoEvt,
- topVer, backups));
+
+ if (assignment == null) {
+ if (cache != null)
+ assignment = cache.assignPartitions(aff, backups, sorted, prevAssignment, affGrp, similarAffKey);
+ else {
+ AffinityFunctionContext ctx = new GridAffinityFunctionContextImpl(sorted,
+ prevAssignment,
+ discoEvt,
+ topVer,
+ backups);
+
+ assignment = aff.assignPartitions(ctx);
+ }
+ }
assert assignment != null;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
index 2890887..9727905 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheAffinitySharedManager.java
@@ -30,12 +30,18 @@
import java.util.concurrent.ConcurrentMap;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cache.affinity.AffinityFunction;
+import org.apache.ignite.cache.affinity.AffinityNodeAddressHashResolver;
+import org.apache.ignite.cache.affinity.AffinityNodeIdHashResolver;
+import org.apache.ignite.cache.affinity.fair.FairAffinityFunction;
+import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.affinity.AffinityCalculateCache;
+import org.apache.ignite.internal.processors.affinity.AffinityConfiguration;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtAffinityAssignmentResponse;
@@ -97,6 +103,133 @@
private final ConcurrentMap<T2<Integer, AffinityTopologyVersion>, GridDhtAssignmentFetchFuture>
pendingAssignmentFetchFuts = new ConcurrentHashMap8<>();
+ /** Next affinity group ID; accessed only while holding the {@link #eqAffCacheGroups} monitor. */
+ private int eqAffGrp;
+
+ /** Groups of caches with equal affinity configuration, by group ID; guarded by its own monitor. */
+ private final Map<Integer, EqualAffinityCacheGroup> eqAffCacheGroups = new HashMap<>();
+
+ /**
+ * Caches whose affinity configurations are equal, so their assignments can be calculated once.
+ */
+ private static class EqualAffinityCacheGroup {
+ /** Affinity configuration by cache ID. */
+ private final Map<Integer, AffinityConfiguration> caches = new HashMap<>();
+
+ /**
+ * @param cacheId ID of the first cache in the group.
+ * @param cfg Affinity configuration of that cache.
+ */
+ private EqualAffinityCacheGroup(Integer cacheId, AffinityConfiguration cfg) {
+ caches.put(cacheId, cfg);
+ }
+
+ /**
+ * @param cacheId Cache ID.
+ * @param cfg Affinity configuration, expected to pass {@link #equalAffinity(AffinityConfiguration)}.
+ */
+ void add(Integer cacheId, AffinityConfiguration cfg) {
+ caches.put(cacheId, cfg);
+ }
+
+ /**
+ * @param cfg Affinity configuration.
+ * @return {@code True} if {@code cfg} is equal to the affinity configuration of the caches in this group.
+ */
+ boolean equalAffinity(AffinityConfiguration cfg) {
+ assert !caches.isEmpty();
+
+ AffinityConfiguration cfg0 = F.firstValue(caches);
+
+ assert cfg0 != null;
+
+ if (cfg0.backups() != cfg.backups())
+ return false;
+
+ if (!cfg0.nodeFilter().equals(cfg.nodeFilter()))
+ return false;
+
+ if (cfg0.affinityFunction().getClass() != cfg.affinityFunction().getClass())
+ return false;
+
+ if (cfg0.affinityFunction() == cfg.affinityFunction())
+ return true;
+
+ if (cfg0.affinityFunction() instanceof RendezvousAffinityFunction) {
+ RendezvousAffinityFunction f1 = (RendezvousAffinityFunction)cfg0.affinityFunction();
+ RendezvousAffinityFunction f2 = (RendezvousAffinityFunction)cfg.affinityFunction();
+
+ if (f1.getHashIdResolver() != f2.getHashIdResolver()) {
+ if (f1.getHashIdResolver() == null || f2.getHashIdResolver() == null)
+ return false;
+
+ // Distinct resolver instances are treated as equal only for the known built-in resolver classes.
+ boolean eqRslvr = (f1.getHashIdResolver().getClass() == f2.getHashIdResolver().getClass()) &&
+ (f1.getHashIdResolver().getClass() == AffinityNodeAddressHashResolver.class ||
+ f1.getHashIdResolver().getClass() == AffinityNodeIdHashResolver.class);
+
+ if (!eqRslvr)
+ return false;
+ }
+
+ return f1.partitions() == f2.partitions() &&
+ f1.isExcludeNeighbors() == f2.isExcludeNeighbors() &&
+ f1.getBackupFilter() == f2.getBackupFilter() &&
+ f1.getAffinityBackupFilter() == f2.getAffinityBackupFilter();
+ }
+ else if (cfg0.affinityFunction() instanceof FairAffinityFunction) {
+ FairAffinityFunction f1 = (FairAffinityFunction)cfg0.affinityFunction();
+ FairAffinityFunction f2 = (FairAffinityFunction)cfg.affinityFunction();
+
+ return f1.partitions() == f2.partitions() &&
+ f1.isExcludeNeighbors() == f2.isExcludeNeighbors() &&
+ f1.getBackupFilter() == f2.getBackupFilter() &&
+ f1.getAffinityBackupFilter() == f2.getAffinityBackupFilter();
+ }
+ else
+ return false;
+ }
+ }
+
+ /**
+ * Finds the group of caches with affinity configuration equal to {@code cfg} and adds the cache to it,
+ * or creates a new group. Only exact {@link RendezvousAffinityFunction} and {@link FairAffinityFunction}
+ * classes (not subclasses) are grouped.
+ *
+ * @param cacheId Cache ID.
+ * @param cfg Cache affinity configuration.
+ * @return Affinity group ID, or {@code null} if the affinity function is not supported for grouping.
+ */
+ @Nullable public Integer equalAffinityGroup(Integer cacheId, AffinityConfiguration cfg) {
+ if (!(cfg.affinityFunction().getClass() == RendezvousAffinityFunction.class ||
+ cfg.affinityFunction().getClass() == FairAffinityFunction.class))
+ return null;
+
+ synchronized (eqAffCacheGroups) {
+ for (Map.Entry<Integer, EqualAffinityCacheGroup> e : eqAffCacheGroups.entrySet()) {
+ EqualAffinityCacheGroup grp = e.getValue();
+
+ if (grp.caches.containsKey(cacheId)) {
+ assert grp.equalAffinity(cfg);
+
+ return e.getKey();
+ }
+
+ if (grp.equalAffinity(cfg)) {
+ grp.add(cacheId, cfg);
+
+ return e.getKey();
+ }
+ }
+
+ Integer grp = eqAffGrp++;
+
+ eqAffCacheGroups.put(grp, new EqualAffinityCacheGroup(cacheId, cfg));
+
+ return grp;
+ }
+ }
+
/** Discovery listener. */
private final GridLocalEventListener discoLsnr = new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
@@ -372,6 +487,8 @@
Set<Integer> stoppedCaches = null;
+ AffinityCalculateCache affCache = new AffinityCalculateCache(fut.topologyVersion(), fut.discoveryEvent());
+
for (DynamicCacheChangeRequest req : reqs) {
if (!(req.clientStartOnly() || req.close()))
clientOnly = false;
@@ -394,7 +511,7 @@
req.clientStartOnly() && req.initiatingNodeId().equals(cctx.localNodeId());
if (clientCacheStarted)
- initAffinity(cacheCtx.affinity().affinityCache(), fut, lateAffAssign);
+ initAffinity(cacheCtx.affinity().affinityCache(), fut, lateAffAssign, affCache);
else if (!req.clientStartOnly()) {
assert fut.topologyVersion().equals(cacheCtx.startTopologyVersion());
@@ -403,7 +520,8 @@
assert aff.lastVersion().equals(AffinityTopologyVersion.NONE) : aff.lastVersion();
List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(),
- fut.discoveryEvent());
+ fut.discoveryEvent(),
+ affCache);
aff.initialize(fut.topologyVersion(), assignment);
}
@@ -415,6 +533,21 @@
else if (req.stop() || req.close()) {
cctx.cache().blockGateway(req);
+ if (req.stop()) {
+ synchronized (eqAffCacheGroups) {
+ for (Iterator<EqualAffinityCacheGroup> it = eqAffCacheGroups.values().iterator(); it.hasNext();) {
+ EqualAffinityCacheGroup grp = it.next();
+
+ if (grp.caches.remove(cacheId) != null) {
+ if (grp.caches.isEmpty())
+ it.remove();
+
+ break;
+ }
+ }
+ }
+ }
+
if (crd) {
boolean rmvCache = false;
@@ -753,7 +886,9 @@
assert old == null : old;
- List<List<ClusterNode>> newAff = cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent());
+ List<List<ClusterNode>> newAff = cache.affinity().calculate(fut.topologyVersion(),
+ fut.discoveryEvent(),
+ null);
cache.affinity().initialize(fut.topologyVersion(), newAff);
}
@@ -785,13 +920,16 @@
}
if (crd && lateAffAssign) {
+ final AffinityCalculateCache affCache = new AffinityCalculateCache(fut.topologyVersion(),
+ fut.discoveryEvent());
+
forAllRegisteredCaches(new IgniteInClosureX<DynamicCacheDescriptor>() {
@Override public void applyx(DynamicCacheDescriptor desc) throws IgniteCheckedException {
CacheHolder cache = cache(fut, desc);
if (cache.affinity().lastVersion().equals(AffinityTopologyVersion.NONE)) {
List<List<ClusterNode>> assignment =
- cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent());
+ cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), affCache);
cache.affinity().initialize(fut.topologyVersion(), assignment);
}
@@ -799,10 +937,13 @@
});
}
else {
+ final AffinityCalculateCache affCache = new AffinityCalculateCache(fut.topologyVersion(),
+ fut.discoveryEvent());
+
forAllCaches(false, new IgniteInClosureX<GridAffinityAssignmentCache>() {
@Override public void applyx(GridAffinityAssignmentCache aff) throws IgniteCheckedException {
if (aff.lastVersion().equals(AffinityTopologyVersion.NONE))
- initAffinity(aff, fut, false);
+ initAffinity(aff, fut, false, affCache);
}
});
}
@@ -814,10 +955,13 @@
* @param fetch Force fetch flag.
* @throws IgniteCheckedException If failed.
*/
- private void initAffinity(GridAffinityAssignmentCache aff, GridDhtPartitionsExchangeFuture fut, boolean fetch)
+ private void initAffinity(GridAffinityAssignmentCache aff,
+ GridDhtPartitionsExchangeFuture fut,
+ boolean fetch,
+ AffinityCalculateCache affCache)
throws IgniteCheckedException {
if (!fetch && canCalculateAffinity(aff, fut)) {
- List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(), fut.discoveryEvent());
+ List<List<ClusterNode>> assignment = aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), affCache);
aff.initialize(fut.topologyVersion(), assignment);
}
@@ -872,13 +1016,18 @@
if (lateAffAssign) {
if (locJoin) {
if (crd) {
+ final AffinityCalculateCache affCache = new AffinityCalculateCache(fut.topologyVersion(),
+ fut.discoveryEvent());
+
forAllRegisteredCaches(new IgniteInClosureX<DynamicCacheDescriptor>() {
@Override public void applyx(DynamicCacheDescriptor cacheDesc) throws IgniteCheckedException {
AffinityTopologyVersion topVer = fut.topologyVersion();
CacheHolder cache = cache(fut, cacheDesc);
- List<List<ClusterNode>> newAff = cache.affinity().calculate(topVer, fut.discoveryEvent());
+ List<List<ClusterNode>> newAff = cache.affinity().calculate(topVer,
+ fut.discoveryEvent(),
+ affCache);
cache.affinity().initialize(topVer, newAff);
}
@@ -933,10 +1082,14 @@
* @throws IgniteCheckedException If failed.
*/
private void fetchAffinityOnJoin(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
+ long start = System.currentTimeMillis();
+
AffinityTopologyVersion topVer = fut.topologyVersion();
List<GridDhtAssignmentFetchFuture> fetchFuts = new ArrayList<>();
+ AffinityCalculateCache affCache = new AffinityCalculateCache(fut.topologyVersion(), fut.discoveryEvent());
+
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (cacheCtx.isLocal())
continue;
@@ -944,8 +1097,9 @@
DynamicCacheDescriptor cacheDesc = registeredCaches.get(cacheCtx.cacheId());
if (cctx.localNodeId().equals(cacheDesc.receivedFrom())) {
- List<List<ClusterNode>> assignment =
- cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent());
+ List<List<ClusterNode>> assignment = cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(),
+ fut.discoveryEvent(),
+ affCache);
cacheCtx.affinity().affinityCache().initialize(fut.topologyVersion(), assignment);
}
@@ -967,6 +1121,8 @@
fetchAffinity(fut, cctx.cacheContext(cacheId).affinity().affinityCache(), fetchFut);
}
+
+ log.info("Affinity fetch time [topVer=" + topVer + ", time=" + (System.currentTimeMillis() - start) + ']');
}
/**
@@ -986,7 +1142,7 @@
GridDhtAffinityAssignmentResponse res = fetchFut.get();
if (res == null) {
- List<List<ClusterNode>> aff = affCache.calculate(topVer, fut.discoveryEvent());
+ List<List<ClusterNode>> aff = affCache.calculate(topVer, fut.discoveryEvent(), null);
affCache.initialize(topVer, aff);
}
@@ -998,7 +1154,7 @@
else {
assert !affCache.centralizedAffinityFunction() || !lateAffAssign;
- affCache.calculate(topVer, fut.discoveryEvent());
+ affCache.calculate(topVer, fut.discoveryEvent(), null);
}
List<List<ClusterNode>> aff = res.affinityAssignment(cctx.discovery());
@@ -1024,14 +1180,20 @@
boolean centralizedAff;
if (lateAffAssign) {
+ AffinityCalculateCache affCache = new AffinityCalculateCache(fut.topologyVersion(),
+ fut.discoveryEvent());
+
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (cacheCtx.isLocal())
continue;
- cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent());
+ cacheCtx.affinity().affinityCache().calculate(fut.topologyVersion(), fut.discoveryEvent(), affCache);
}
centralizedAff = true;
+
+ log.info("Initialized affinity on node left [topVer=" + fut.topologyVersion() +
+ ", calcCnt=" + affCache.calculateCount() + ']');
}
else {
initCachesAffinity(fut);
@@ -1055,12 +1217,18 @@
private void initCachesAffinity(GridDhtPartitionsExchangeFuture fut) throws IgniteCheckedException {
assert !lateAffAssign;
+ long start = System.currentTimeMillis();
+
+ AffinityCalculateCache affCache = new AffinityCalculateCache(fut.topologyVersion(), fut.discoveryEvent());
+
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (cacheCtx.isLocal())
continue;
- initAffinity(cacheCtx.affinity().affinityCache(), fut, false);
+ initAffinity(cacheCtx.affinity().affinityCache(), fut, false, affCache);
}
+
+ log.info("Affinity init time [topVer=" + fut.topologyVersion() + ", time=" + (System.currentTimeMillis() - start) + ']');
}
/**
@@ -1072,13 +1240,15 @@
throws IgniteCheckedException {
final List<IgniteInternalFuture<AffinityTopologyVersion>> futs = new ArrayList<>();
+ final AffinityCalculateCache affCache = new AffinityCalculateCache(fut.topologyVersion(), fut.discoveryEvent());
+
forAllRegisteredCaches(new IgniteInClosureX<DynamicCacheDescriptor>() {
@Override public void applyx(DynamicCacheDescriptor desc) throws IgniteCheckedException {
CacheHolder cache = caches.get(desc.cacheId());
if (cache != null) {
if (cache.client())
- cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent());
+ cache.affinity().calculate(fut.topologyVersion(), fut.discoveryEvent(), affCache);
return;
}
@@ -1129,7 +1299,7 @@
throws IgniteCheckedException {
fetchAffinity(prev, aff, (GridDhtAssignmentFetchFuture)fetchFut);
- aff.calculate(fut.topologyVersion(), fut.discoveryEvent());
+ aff.calculate(fut.topologyVersion(), fut.discoveryEvent(), affCache);
affFut.onDone(fut.topologyVersion());
}
@@ -1210,34 +1380,40 @@
throws IgniteCheckedException {
AffinityTopologyVersion topVer = fut.topologyVersion();
- final Map<Object, List<List<ClusterNode>>> affCache = new HashMap<>();
+ final AffinityCalculateCache affCache = new AffinityCalculateCache(fut.topologyVersion(), fut.discoveryEvent());
- if (!crd) {
- for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
- if (cacheCtx.isLocal())
- continue;
+ try {
+ if (!crd) {
+ for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
+ if (cacheCtx.isLocal())
+ continue;
- boolean latePrimary = cacheCtx.rebalanceEnabled();
+ boolean latePrimary = cacheCtx.rebalanceEnabled();
- initAffinityOnNodeJoin(fut, cacheCtx.affinity().affinityCache(), null, latePrimary, affCache);
- }
-
- return null;
- }
- else {
- final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(topVer);
-
- forAllRegisteredCaches(new IgniteInClosureX<DynamicCacheDescriptor>() {
- @Override public void applyx(DynamicCacheDescriptor cacheDesc) throws IgniteCheckedException {
- CacheHolder cache = cache(fut, cacheDesc);
-
- boolean latePrimary = cache.rebalanceEnabled;
-
- initAffinityOnNodeJoin(fut, cache.affinity(), waitRebalanceInfo, latePrimary, affCache);
+ initAffinityOnNodeJoin(fut, cacheCtx.affinity().affinityCache(), null, latePrimary, affCache);
}
- });
- return waitRebalanceInfo;
+ return null;
+ }
+ else {
+ final WaitRebalanceInfo waitRebalanceInfo = new WaitRebalanceInfo(topVer);
+
+ forAllRegisteredCaches(new IgniteInClosureX<DynamicCacheDescriptor>() {
+ @Override public void applyx(DynamicCacheDescriptor cacheDesc) throws IgniteCheckedException {
+ CacheHolder cache = cache(fut, cacheDesc);
+
+ boolean latePrimary = cache.rebalanceEnabled;
+
+ initAffinityOnNodeJoin(fut, cache.affinity(), waitRebalanceInfo, latePrimary, affCache);
+ }
+ });
+
+ return waitRebalanceInfo;
+ }
+ }
+ finally {
+ log.info("Initialized affinity on node join [topVer=" + topVer +
+ ", calcCnt=" + affCache.calculateCount() + ", lateAffCalcTime=" + affCache.lateAffinityCalculateTime() + ']');
}
}
@@ -1253,7 +1429,7 @@
GridAffinityAssignmentCache aff,
WaitRebalanceInfo rebalanceInfo,
boolean latePrimary,
- Map<Object, List<List<ClusterNode>>> affCache)
+ AffinityCalculateCache affCache)
throws IgniteCheckedException
{
assert lateAffAssign;
@@ -1265,14 +1441,16 @@
assert affTopVer.topologyVersion() > 0 : "Affinity is not initialized [cache=" + aff.cacheName() +
", topVer=" + affTopVer + ", node=" + cctx.localNodeId() + ']';
- List<List<ClusterNode>> curAff = aff.assignments(affTopVer);
-
- assert aff.idealAssignment() != null : "Previous assignment is not available.";
-
- List<List<ClusterNode>> idealAssignment = aff.calculate(topVer, fut.discoveryEvent());
+ List<List<ClusterNode>> idealAssignment = aff.calculate(topVer, fut.discoveryEvent(), affCache);
List<List<ClusterNode>> newAssignment = null;
if (latePrimary) {
+ long start = U.currentTimeMillis();
+
+ List<List<ClusterNode>> curAff = aff.assignments(affTopVer);
+
+ assert aff.idealAssignment() != null : "Previous assignment is not available.";
+
for (int p = 0; p < idealAssignment.size(); p++) {
List<ClusterNode> newNodes = idealAssignment.get(p);
List<ClusterNode> curNodes = curAff.get(p);
@@ -1295,12 +1473,15 @@
newAssignment.set(p, nodes0);
}
}
+
+ if (affCache != null)
+ affCache.addLateAffinityCalculateTime(U.currentTimeMillis() - start);
}
if (newAssignment == null)
newAssignment = idealAssignment;
- aff.initialize(fut.topologyVersion(), cachedAssignment(aff, newAssignment, affCache));
+ aff.initialize(fut.topologyVersion(), newAssignment);
}
/**
@@ -1517,6 +1698,18 @@
return assignment;
}
+ /**
+ * @param cacheId Cache ID.
+ * @return Affinity assignment cache held for the given cache ID.
+ */
+ public GridAffinityAssignmentCache cacheAssignment(Integer cacheId) {
+ CacheHolder holder = caches.get(cacheId);
+
+ assert holder != null : "Cache holder not found [cacheId=" + cacheId + ']';
+
+ return holder.affinity();
+ }
+
/**
*
*/
@@ -1716,9 +1905,7 @@
GridAffinityAssignmentCache aff = new GridAffinityAssignmentCache(cctx.kernalContext(),
ccfg.getName(),
- affFunc,
- ccfg.getNodeFilter(),
- ccfg.getBackups(),
+ new AffinityConfiguration(affFunc, ccfg.getNodeFilter(), ccfg.getBackups()),
ccfg.getCacheMode() == LOCAL);
return new CacheHolder2(ccfg.getRebalanceMode() != NONE, cctx, aff, initAff);
@@ -1730,7 +1917,7 @@
* @param aff Affinity.
* @param initAff Current affinity.
*/
- public CacheHolder2(
+ CacheHolder2(
boolean rebalanceEnabled,
GridCacheSharedContext cctx,
GridAffinityAssignmentCache aff,
@@ -1754,7 +1941,7 @@
/**
*
*/
- class WaitRebalanceInfo {
+ private class WaitRebalanceInfo {
/** */
private final AffinityTopologyVersion topVer;
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
index c6e7ee6..6b47f82 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAffinityManager.java
@@ -26,6 +26,7 @@
import org.apache.ignite.internal.IgniteClientDisconnectedCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.affinity.AffinityAssignment;
+import org.apache.ignite.internal.processors.affinity.AffinityConfiguration;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.util.GridLeanSet;
@@ -67,9 +68,7 @@
aff = new GridAffinityAssignmentCache(cctx.kernalContext(),
cctx.namex(),
- affFunction,
- cctx.config().getNodeFilter(),
- cctx.config().getBackups(),
+ new AffinityConfiguration(affFunction, cctx.config().getNodeFilter(), cctx.config().getBackups()),
cctx.isLocal());
}
@@ -77,7 +76,7 @@
@Override protected void onKernalStart0() throws IgniteCheckedException {
if (cctx.isLocal())
// No discovery event needed for local affinity.
- aff.calculate(LOC_CACHE_TOP_VER, null);
+ aff.calculate(LOC_CACHE_TOP_VER, null, null);
}
/** {@inheritDoc} */
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
index 503b334..3e1e31f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCachePartitionExchangeManager.java
@@ -102,6 +102,7 @@
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_PRELOAD_RESEND_TIMEOUT;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_THREAD_DUMP_ON_EXCHANGE_TIMEOUT;
+import static org.apache.ignite.IgniteSystemProperties.getBoolean;
import static org.apache.ignite.IgniteSystemProperties.getLong;
import static org.apache.ignite.events.EventType.EVT_CACHE_REBALANCE_STARTED;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
@@ -119,6 +120,9 @@
/** Exchange history size. */
private static final int EXCHANGE_HISTORY_SIZE = 1000;
+ /** */
+ private boolean skipFirstExchangeMsg;
+
/** Atomic reference for pending timeout object. */
private AtomicReference<ResendTimeoutObject> pendingResend = new AtomicReference<>();
@@ -300,10 +304,27 @@
}
};
+ /**
+ * @return {@code True} if the SKIP_FIRST_EXCHANGE_MSG optimization was enabled for this node on start.
+ */
+ public boolean skipFirstExchangeMessage() {
+ return skipFirstExchangeMsg;
+ }
+
/** {@inheritDoc} */
@Override protected void start0() throws IgniteCheckedException {
super.start0();
+ if (getBoolean("SKIP_FIRST_EXCHANGE_MSG", false)) {
+ if (cctx.kernalContext().config().isLateAffinityAssignment()) {
+ skipFirstExchangeMsg = true;
+
+ cctx.kernalContext().addNodeAttribute("SKIP_FIRST_EXCHANGE_MSG", true);
+ }
+ else
+ U.warn(log, "Can not use SKIP_FIRST_EXCHANGE_MSG optimization when late affinity assignment disabled.");
+ }
+
exchWorker = new ExchangeWorker();
cctx.gridEvents().addLocalEventListener(discoLsnr, EVT_NODE_JOINED, EVT_NODE_LEFT, EVT_NODE_FAILED,
@@ -1105,6 +1123,17 @@
if (log.isDebugEnabled())
log.debug("Exchange done [topVer=" + topVer + ", fut=" + exchFut + ", err=" + err + ']');
+ if (exchFut.singleMsgUpdateCnt > 0) {
+ log.info("Exchange done [topVer=" + topVer +
+ ", singleMsgUpdateTime=" + exchFut.singleMsgUpdateTime +
+ ", singleMsgUpdateCnt=" + exchFut.singleMsgUpdateCnt +
+ ", singleMsgUpdateMaxTime=" + exchFut.singleMsgUpdateMaxTime +
+ ", singleMsgUpdateMinTime=" + exchFut.singleMsgUpdateMinTime +
+ ", err=" + err + ']');
+ }
+ else
+ log.info("Exchange done [topVer=" + topVer + ", err=" + err + ']');
+
IgniteProductVersion minVer = cctx.localNode().version();
IgniteProductVersion maxVer = cctx.localNode().version();
@@ -1224,7 +1253,7 @@
top = cacheCtx.topology();
if (top != null)
- updated |= top.update(null, entry.getValue(), null) != null;
+ updated |= top.update(null, entry.getValue(), null);
}
if (!cctx.kernalContext().clientNode() && updated)
@@ -1273,7 +1302,7 @@
top = cacheCtx.topology();
if (top != null) {
- updated |= top.update(null, entry.getValue(), null) != null;
+ updated |= top.update(null, entry.getValue(), null);
cctx.affinity().checkRebalanceState(top, cacheId);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
index 5efb317..4ddb0f8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridClientPartitionTopology.java
@@ -549,7 +549,7 @@
/** {@inheritDoc} */
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
- @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+ @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
GridDhtPartitionFullMap partMap,
Map<Integer, Long> cntrMap) {
if (log.isDebugEnabled())
@@ -563,7 +563,7 @@
log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" +
lastExchangeId + ", exchId=" + exchId + ']');
- return null;
+ return false;
}
if (node2part != null && node2part.compareTo(partMap) >= 0) {
@@ -571,7 +571,7 @@
log.debug("Stale partition map for full partition map update (will ignore) [lastExchId=" +
lastExchangeId + ", exchId=" + exchId + ", curMap=" + node2part + ", newMap=" + partMap + ']');
- return null;
+ return false;
}
updateSeq.incrementAndGet();
@@ -634,7 +634,7 @@
if (log.isDebugEnabled())
log.debug("Partition map after full update: " + fullMapString());
- return null;
+ return false;
}
finally {
lock.writeLock().unlock();
@@ -643,7 +643,7 @@
/** {@inheritDoc} */
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
- @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+ @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
GridDhtPartitionMap2 parts,
Map<Integer, Long> cntrMap) {
if (log.isDebugEnabled())
@@ -654,21 +654,21 @@
log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId +
", parts=" + parts + ']');
- return null;
+ return false;
}
lock.writeLock().lock();
try {
if (stopping)
- return null;
+ return false;
if (lastExchangeId != null && exchId != null && lastExchangeId.compareTo(exchId) > 0) {
if (log.isDebugEnabled())
log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" +
lastExchangeId + ", exchId=" + exchId + ']');
- return null;
+ return false;
}
if (exchId != null)
@@ -688,7 +688,7 @@
log.debug("Stale update sequence for single partition map update (will ignore) [exchId=" + exchId +
", curSeq=" + cur.updateSequence() + ", newSeq=" + parts.updateSequence() + ']');
- return null;
+ return false;
}
long updateSeq = this.updateSeq.incrementAndGet();
@@ -740,7 +740,7 @@
if (log.isDebugEnabled())
log.debug("Partition map after single update: " + fullMapString());
- return changed ? localPartitionMap() : null;
+ return changed;
}
finally {
lock.writeLock().unlock();
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
index cad1617..af66a82 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtAffinityAssignmentResponse.java
@@ -142,9 +142,11 @@
List<ClusterNode> nodes = new ArrayList<>(ids.size());
for (int j = 0; j < ids.size(); j++) {
- ClusterNode node = disco.node(topVer, ids.get(j));
+ UUID nodeId = ids.get(j);
- assert node != null;
+ ClusterNode node = disco.node(topVer, nodeId);
+
+ assert node != null : nodeId;
nodes.add(node);
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
index 4ae4e47..b0a4183 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopology.java
@@ -205,7 +205,7 @@
* @param cntrMap Partition update counters.
- * @return Local partition map if there were evictions or {@code null} otherwise.
+ * @return {@code True} if the local partition map changed (e.g. due to evictions), {@code false} otherwise.
*/
- public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+ public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
GridDhtPartitionFullMap partMap,
@Nullable Map<Integer, Long> cntrMap);
@@ -215,7 +215,7 @@
* @param cntrMap Partition update counters.
- * @return Local partition map if there were evictions or {@code null} otherwise.
+ * @return {@code True} if the local partition map changed (e.g. due to evictions), {@code false} otherwise.
*/
- @Nullable public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+ public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
GridDhtPartitionMap2 parts,
@Nullable Map<Integer, Long> cntrMap);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
index f3751ac..12a0d43 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/GridDhtPartitionTopologyImpl.java
@@ -350,7 +350,9 @@
long updateSeq = this.updateSeq.incrementAndGet();
- initPartitions0(exchFut, updateSeq);
+ ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
+
+ initPartitions0(oldest, exchFut, updateSeq);
consistencyCheck();
}
@@ -363,11 +365,9 @@
* @param exchFut Exchange future.
* @param updateSeq Update sequence.
*/
- private void initPartitions0(GridDhtPartitionsExchangeFuture exchFut, long updateSeq) {
+ private void initPartitions0(ClusterNode oldest, GridDhtPartitionsExchangeFuture exchFut, long updateSeq) {
ClusterNode loc = cctx.localNode();
- ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
-
assert oldest != null || cctx.kernalContext().clientNode();
GridDhtPartitionExchangeId exchId = exchFut.exchangeId();
@@ -407,12 +407,12 @@
if (log.isDebugEnabled())
log.debug("Owned partition for oldest node: " + locPart);
- updateLocal(p, loc.id(), locPart.state(), updateSeq);
+ updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq);
}
}
}
else
- createPartitions(aff, updateSeq);
+ createPartitions(oldest, aff, updateSeq);
}
else {
// If preloader is disabled, then we simply clear out
@@ -429,7 +429,7 @@
if (state.active()) {
locPart.rent(false);
- updateLocal(p, loc.id(), locPart.state(), updateSeq);
+ updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq);
if (log.isDebugEnabled())
log.debug("Evicting partition with rebalancing disabled " +
@@ -443,7 +443,7 @@
}
if (node2part != null && node2part.valid())
- checkEvictions(updateSeq, aff);
+ checkEvictions(oldest, updateSeq, aff);
updateRebalanceVersion(aff);
}
@@ -452,7 +452,7 @@
* @param aff Affinity assignments.
* @param updateSeq Update sequence.
*/
- private void createPartitions(List<List<ClusterNode>> aff, long updateSeq) {
+ private void createPartitions(ClusterNode oldest, List<List<ClusterNode>> aff, long updateSeq) {
ClusterNode loc = cctx.localNode();
int num = cctx.affinity().partitions();
@@ -464,7 +464,7 @@
// will be created in MOVING state.
GridDhtLocalPartition locPart = createPartition(p);
- updateLocal(p, loc.id(), locPart.state(), updateSeq);
+ updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq);
}
}
// If this node's map is empty, we pre-create local partitions,
@@ -533,11 +533,11 @@
}
if (affReady)
- initPartitions0(exchFut, updateSeq);
+ initPartitions0(oldest, exchFut, updateSeq);
else {
List<List<ClusterNode>> aff = cctx.affinity().idealAssignment();
- createPartitions(aff, updateSeq);
+ createPartitions(oldest, aff, updateSeq);
}
consistencyCheck();
@@ -584,6 +584,8 @@
long updateSeq = this.updateSeq.incrementAndGet();
+ ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
+
for (int p = 0; p < num; p++) {
GridDhtLocalPartition locPart = localPartition(p, topVer, false, false);
@@ -610,7 +612,7 @@
assert owned : "Failed to own partition [cacheName" + cctx.name() + ", locPart=" +
locPart + ']';
- updateLocal(p, loc.id(), locPart.state(), updateSeq);
+ updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq);
changed = true;
@@ -630,7 +632,7 @@
locPart + ", owners = " + owners + ']');
}
else
- updateLocal(p, loc.id(), locPart.state(), updateSeq);
+ updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq);
}
}
else {
@@ -640,7 +642,7 @@
if (state == MOVING) {
locPart.rent(false);
- updateLocal(p, loc.id(), locPart.state(), updateSeq);
+ updateLocal(oldest, p, loc.id(), locPart.state(), updateSeq);
changed = true;
@@ -1014,7 +1016,7 @@
/** {@inheritDoc} */
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
- @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+ @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
GridDhtPartitionFullMap partMap,
@Nullable Map<Integer, Long> cntrMap) {
if (log.isDebugEnabled())
@@ -1026,7 +1028,7 @@
try {
if (stopping)
- return null;
+ return false;
if (cntrMap != null) {
for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
@@ -1054,7 +1056,7 @@
log.debug("Stale exchange id for full partition map update (will ignore) [lastExchId=" +
lastExchangeId + ", exchId=" + exchId + ']');
- return null;
+ return false;
}
if (node2part != null && node2part.compareTo(partMap) >= 0) {
@@ -1062,7 +1064,7 @@
log.debug("Stale partition map for full partition map update (will ignore) [lastExchId=" +
lastExchangeId + ", exchId=" + exchId + ", curMap=" + node2part + ", newMap=" + partMap + ']');
- return null;
+ return false;
}
long updateSeq = this.updateSeq.incrementAndGet();
@@ -1129,7 +1131,9 @@
if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
- changed = checkEvictions(updateSeq, aff);
+ ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
+
+ changed = checkEvictions(oldest, updateSeq, aff);
updateRebalanceVersion(aff);
}
@@ -1139,7 +1143,7 @@
if (log.isDebugEnabled())
log.debug("Partition map after full update: " + fullMapString());
- return changed ? localPartitionMap() : null;
+ return changed;
}
finally {
lock.writeLock().unlock();
@@ -1148,7 +1152,7 @@
/** {@inheritDoc} */
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
- @Nullable @Override public GridDhtPartitionMap2 update(@Nullable GridDhtPartitionExchangeId exchId,
+ @Override public boolean update(@Nullable GridDhtPartitionExchangeId exchId,
GridDhtPartitionMap2 parts,
@Nullable Map<Integer, Long> cntrMap) {
if (log.isDebugEnabled())
@@ -1159,33 +1163,28 @@
log.debug("Received partition update for non-existing node (will ignore) [exchId=" + exchId +
", parts=" + parts + ']');
- return null;
+ return false;
}
lock.writeLock().lock();
try {
if (stopping)
- return null;
+ return false;
if (cntrMap != null) {
for (Map.Entry<Integer, Long> e : cntrMap.entrySet()) {
- Long cntr = this.cntrMap.get(e.getKey());
+ Integer p = e.getKey();
+
+ Long cntr = this.cntrMap.get(p);
if (cntr == null || cntr < e.getValue())
- this.cntrMap.put(e.getKey(), e.getValue());
- }
+ this.cntrMap.put(p, e.getValue());
- for (int i = 0; i < locParts.length; i++) {
- GridDhtLocalPartition part = locParts[i];
+ GridDhtLocalPartition part = locParts[p];
- if (part == null)
- continue;
-
- Long cntr = cntrMap.get(part.id());
-
- if (cntr != null)
- part.updateCounter(cntr);
+ if (part != null)
+ part.updateCounter(e.getValue());
}
}
@@ -1194,7 +1193,7 @@
log.debug("Stale exchange id for single partition map update (will ignore) [lastExchId=" +
lastExchangeId + ", exchId=" + exchId + ']');
- return null;
+ return false;
}
if (exchId != null)
@@ -1211,7 +1210,7 @@
log.debug("Stale update sequence for single partition map update (will ignore) [exchId=" + exchId +
", curSeq=" + cur.updateSequence() + ", newSeq=" + parts.updateSequence() + ']');
- return null;
+ return false;
}
long updateSeq = this.updateSeq.incrementAndGet();
@@ -1254,7 +1253,9 @@
if (!affVer.equals(AffinityTopologyVersion.NONE) && affVer.compareTo(topVer) >= 0) {
List<List<ClusterNode>> aff = cctx.affinity().assignments(topVer);
- changed |= checkEvictions(updateSeq, aff);
+ ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
+
+ changed |= checkEvictions(oldest, updateSeq, aff);
updateRebalanceVersion(aff);
}
@@ -1264,7 +1265,7 @@
if (log.isDebugEnabled())
log.debug("Partition map after single update: " + fullMapString());
- return changed ? localPartitionMap() : null;
+ return changed;
}
finally {
lock.writeLock().unlock();
@@ -1276,7 +1277,7 @@
* @param aff Affinity assignments.
* @return Checks if any of the local partitions need to be evicted.
*/
- private boolean checkEvictions(long updateSeq, List<List<ClusterNode>> aff) {
+ private boolean checkEvictions(ClusterNode oldest, long updateSeq, List<List<ClusterNode>> aff) {
boolean changed = false;
UUID locId = cctx.nodeId();
@@ -1299,7 +1300,7 @@
if (nodeIds.containsAll(F.nodeIds(affNodes))) {
part.rent(false);
- updateLocal(part.id(), locId, part.state(), updateSeq);
+ updateLocal(oldest, part.id(), locId, part.state(), updateSeq);
changed = true;
@@ -1324,7 +1325,7 @@
if (locId.equals(n.id())) {
part.rent(false);
- updateLocal(part.id(), locId, part.state(), updateSeq);
+ updateLocal(oldest, part.id(), locId, part.state(), updateSeq);
changed = true;
@@ -1353,12 +1354,9 @@
* @param updateSeq Update sequence.
*/
@SuppressWarnings({"MismatchedQueryAndUpdateOfCollection"})
- private void updateLocal(int p, UUID nodeId, GridDhtPartitionState state, long updateSeq) {
+ private void updateLocal(ClusterNode oldest, int p, UUID nodeId, GridDhtPartitionState state, long updateSeq) {
assert nodeId.equals(cctx.nodeId());
- // In case if node joins, get topology at the time of joining node.
- ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
-
assert oldest != null || cctx.kernalContext().clientNode();
// If this node became the oldest node.
@@ -1453,7 +1451,9 @@
try {
if (part.own()) {
- updateLocal(part.id(), loc.id(), part.state(), updateSeq.incrementAndGet());
+ ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
+
+ updateLocal(oldest, part.id(), loc.id(), part.state(), updateSeq.incrementAndGet());
consistencyCheck();
@@ -1481,7 +1481,9 @@
long seq = updateSeq ? this.updateSeq.incrementAndGet() : this.updateSeq.get();
- updateLocal(part.id(), cctx.localNodeId(), part.state(), seq);
+ ClusterNode oldest = CU.oldestAliveCacheServerNode(cctx.shared(), topVer);
+
+ updateLocal(oldest, part.id(), cctx.localNodeId(), part.state(), seq);
consistencyCheck();
}
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
index f391265..66f3ed1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/dht/preloader/GridDhtPartitionsExchangeFuture.java
@@ -65,7 +65,6 @@
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.internal.util.typedef.internal.LT;
import org.apache.ignite.internal.util.typedef.internal.S;
@@ -81,6 +80,7 @@
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.MOVING;
/**
* Future for exchanging partition maps.
@@ -112,6 +112,22 @@
/** */
@GridToStringExclude
+ private int pendingSingleUpdates;
+
+ /** */
+ public int singleMsgUpdateCnt;
+
+ /** */
+ public long singleMsgUpdateTime;
+
+ /** */
+ public long singleMsgUpdateMaxTime;
+
+ /** */
+ public long singleMsgUpdateMinTime = Long.MAX_VALUE;
+
+ /** */
+ @GridToStringExclude
private List<ClusterNode> srvNodes;
/** */
@@ -431,6 +447,10 @@
assert !dummy && !forcePreload : this;
try {
+ long initStart = System.currentTimeMillis();
+
+ log.info("Start exchange init [topVer=" + topologyVersion() + ']');
+
srvNodes = new ArrayList<>(cctx.discovery().serverNodes(topologyVersion()));
remaining.addAll(F.nodeIds(F.view(srvNodes, F.remoteNodes(cctx.localNodeId()))));
@@ -462,14 +482,22 @@
cctx.affinity().initStartedCaches(crdNode, this, receivedCaches);
}
+ long affStart = System.currentTimeMillis();
+
if (CU.clientNode(discoEvt.eventNode()))
exchange = onClientNodeEvent(crdNode);
else
exchange = onServerNodeEvent(crdNode);
+
+ log.info("Affinity call time [topVer=" + topologyVersion() + ", time=" + (System.currentTimeMillis() - affStart) + ']');
}
+ long topUpdateStart = System.currentTimeMillis();
+
updateTopologies(crdNode);
+ log.info("Top update time [topVer=" + topologyVersion() + ", time=" + (System.currentTimeMillis() - topUpdateStart) + ']');
+
switch (exchange) {
case ALL: {
distributedExchange();
@@ -495,7 +523,10 @@
default:
assert false;
+
}
+
+ log.info("Finish exchange init [topVer=" + topologyVersion() + ", time=" + (System.currentTimeMillis() - initStart) + ']');
}
catch (IgniteInterruptedCheckedException e) {
onDone(e);
@@ -719,6 +750,8 @@
boolean topChanged = discoEvt.type() != EVT_DISCOVERY_CUSTOM_EVT || affChangeMsg != null;
+ long beforeExchStart = System.currentTimeMillis();
+
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (cacheCtx.isLocal() || stopping(cacheCtx.cacheId()))
continue;
@@ -733,12 +766,68 @@
cacheCtx.topology().beforeExchange(this, !centralizedAff);
}
+ log.info("Before exchange time [topVer=" + topologyVersion() + ", time=" + (System.currentTimeMillis() - beforeExchStart) + ']');
+
if (crd.isLocal()) {
+ ClusterNode node = discoEvt.eventNode();
+
+ Object attr = node.attribute("SKIP_FIRST_EXCHANGE_MSG");
+
+ boolean skipFirstExchange = Boolean.TRUE.equals(attr) || ((attr instanceof String) && "true".equalsIgnoreCase((String)attr));
+
+ if (discoEvt.type() == EVT_NODE_JOINED && !node.isLocal() && skipFirstExchange) {
+ assert !CU.clientNode(node) : discoEvt;
+ assert srvNodes.contains(node);
+
+ boolean rmv = remaining.remove(node.id());
+ assert rmv;
+
+ Collection<String> caches = cctx.discovery().nodeCaches(topologyVersion(), node);
+
+ if (!caches.isEmpty()) {
+ GridDhtPartitionsSingleMessage msg = new GridDhtPartitionsSingleMessage(exchangeId(),
+ false,
+ null,
+ false);
+
+ for (String cache : caches) {
+ Map<Integer, GridDhtPartitionState> m = new HashMap<>();
+
+ GridDhtPartitionMap2 partMap = new GridDhtPartitionMap2(node.id(), 1, topologyVersion(), m, true);
+
+ GridAffinityAssignmentCache assign = cctx.affinity().cacheAssignment(CU.cacheId(cache));
+
+ for (Integer part : assign.primaryPartitions(node.id(), topologyVersion()))
+ partMap.put(part, MOVING);
+ for (Integer part : assign.backupPartitions(node.id(), topologyVersion()))
+ partMap.put(part, MOVING);
+
+ msg.addLocalPartitionMap(CU.cacheId(cache), partMap, null);
+ }
+
+ updatePartitionSingleMap(msg);
+ }
+ }
+
if (remaining.isEmpty())
onAllReceived(false);
}
- else
- sendPartitions(crd);
+ else {
+ boolean skipSnd = false;
+
+ if (cctx.exchange().skipFirstExchangeMessage() && discoEvt.type() == EVT_NODE_JOINED && discoEvt.eventNode().isLocal())
+ skipSnd = true;
+
+ if (!skipSnd) {
+ long sndStart = System.currentTimeMillis();
+
+ sendPartitions(crd);
+
+ log.info("Send parts time [topVer=" + topologyVersion() + ", time=" + (System.currentTimeMillis() - sndStart) + ']');
+ }
+ else
+ log.info("Skip first exchange message [topVer=" + topologyVersion() + ']');
+ }
initDone();
}
@@ -1162,23 +1251,66 @@
*/
private void processMessage(ClusterNode node, GridDhtPartitionsSingleMessage msg) {
boolean allReceived = false;
+ boolean updateSingleMap = false;
synchronized (mux) {
assert crd != null;
if (crd.isLocal()) {
if (remaining.remove(node.id())) {
- updatePartitionSingleMap(msg);
+ updateSingleMap = true;
+
+ pendingSingleUpdates++;
allReceived = remaining.isEmpty();
}
}
else
singleMsgs.put(node, msg);
}
- if (allReceived)
+ if (updateSingleMap) {
+ long start = U.currentTimeMillis();
+
+ try {
+ updatePartitionSingleMap(msg);
+ }
+ finally {
+ synchronized (mux) {
+ long time = U.currentTimeMillis() - start;
+
+ if (time > singleMsgUpdateMaxTime)
+ singleMsgUpdateMaxTime = time;
+ if (time < singleMsgUpdateMinTime)
+ singleMsgUpdateMinTime = time;
+
+ singleMsgUpdateTime += time;
+ // Count only timed updates so that cnt/min/max/total statistics stay consistent.
+ singleMsgUpdateCnt++;
+
+ assert pendingSingleUpdates > 0;
+
+ pendingSingleUpdates--;
+
+ if (pendingSingleUpdates == 0)
+ mux.notifyAll();
+ }
+ }
+ }
+
+ if (allReceived) {
+ synchronized (mux) {
+ try {
+ while (pendingSingleUpdates > 0)
+ U.wait(mux);
+ }
+ catch (IgniteInterruptedCheckedException e) {
+ log.warning("Failed to wait for partition map updates.", e);
+ }
+ }
+
onAllReceived(false);
+ }
}
/**
@@ -1211,6 +1343,8 @@
try {
assert crd.isLocal();
+ log.info("Coordinator exchange all received [topVer=" + topologyVersion() + ']');
+
if (!crd.equals(cctx.discovery().serverNodes(topologyVersion()).get(0))) {
for (GridCacheContext cacheCtx : cctx.cacheContexts()) {
if (!cacheCtx.isLocal())
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 8814745..67bd252 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -129,6 +129,7 @@
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryPingResponse;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRedirectToClient;
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage;
+import org.apache.ignite.thread.IgniteThreadPoolExecutor;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
@@ -136,6 +137,7 @@
import static org.apache.ignite.IgniteSystemProperties.IGNITE_DISCOVERY_CLIENT_RECONNECT_HISTORY_SIZE;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_OPTIMIZED_MARSHALLER_USE_DEFAULT_SUID;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SERVICES_COMPATIBILITY_MODE;
+import static org.apache.ignite.IgniteSystemProperties.getBoolean;
import static org.apache.ignite.IgniteSystemProperties.getInteger;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_JOINED;
@@ -174,8 +176,10 @@
IgniteProductVersion.fromString("1.5.0");
/** */
- private final ThreadPoolExecutor utilityPool = new ThreadPoolExecutor(0, 1, 2000, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<Runnable>());
+ private static final boolean SEND_JOIN_REQ_DIRECTLY = getBoolean("SEND_JOIN_REQ_DIRECTLY", false);
+
+ /** */
+ private IgniteThreadPoolExecutor utilityPool;
/** Nodes ring. */
@GridToStringExclude
@@ -297,6 +301,13 @@
spiState = DISCONNECTED;
}
+ utilityPool = new IgniteThreadPoolExecutor("disco-pool",
+ spi.ignite().name(),
+ 0,
+ 1,
+ 2000,
+ new LinkedBlockingQueue<Runnable>());
+
if (debugMode) {
if (!log.isInfoEnabled())
throw new IgniteSpiException("Info log level should be enabled for TCP discovery to work " +
@@ -918,6 +929,8 @@
if (log.isDebugEnabled())
log.debug("Discovery SPI has been connected to topology with order: " + locNode.internalOrder());
+
+ log.info("Node joined topology: " + locNode);
}
/**
@@ -974,7 +987,7 @@
for (InetSocketAddress addr : addrs) {
try {
- Integer res = sendMessageDirectly(joinReq, addr);
+ Integer res = sendMessageDirectly(joinReq, addr, true);
assert res != null;
@@ -1087,13 +1100,15 @@
*
* @param msg Message to send.
* @param addr Address to send message to.
+ * @param join {@code True} if sends initial node join request.
* @return Response read from the recipient or {@code null} if no response is supposed.
* @throws IgniteSpiException If an error occurs.
*/
- @Nullable private Integer sendMessageDirectly(TcpDiscoveryAbstractMessage msg, InetSocketAddress addr)
+ @Nullable private Integer sendMessageDirectly(TcpDiscoveryAbstractMessage msg, InetSocketAddress addr, boolean join)
throws IgniteSpiException {
assert msg != null;
assert addr != null;
+ assert !join || msg instanceof TcpDiscoveryJoinRequestMessage;
Collection<Throwable> errs = null;
@@ -1180,7 +1195,7 @@
// Connection has been established, but
// join request may not be unmarshalled on remote host.
// E.g. due to class not found issue.
- joinReqSent = msg instanceof TcpDiscoveryJoinRequestMessage;
+ joinReqSent = join;
int receipt = spi.readReceipt(sock, timeoutHelper.nextTimeoutChunk(ackTimeout0));
@@ -2403,9 +2418,12 @@
/** Connection check threshold. */
private long connCheckThreshold;
+ /** */
+ private long lastRingMsgTime;
+
/**
*/
- protected RingMessageWorker() {
+ RingMessageWorker() {
super("tcp-disco-msg-worker", 10);
initConnectionCheckFrequency();
@@ -2500,6 +2518,8 @@
* @param msg Message to process.
*/
@Override protected void processMessage(TcpDiscoveryAbstractMessage msg) {
+ sendHeartbeatMessage();
+
DebugLogger log = messageLogger(msg);
if (log.isDebugEnabled())
@@ -2508,6 +2528,11 @@
if (debugMode)
debugLog(msg, "Processing message [cls=" + msg.getClass().getSimpleName() + ", id=" + msg.id() + ']');
+ boolean ensured = spi.ensured(msg);
+
+ if (!locNode.id().equals(msg.senderNodeId()) && ensured)
+ lastRingMsgTime = U.currentTimeMillis();
+
if (locNode.internalOrder() == 0) {
boolean proc = false;
@@ -2564,7 +2589,7 @@
else
assert false : "Unknown message type: " + msg.getClass().getSimpleName();
- if (spi.ensured(msg) && redirectToClients(msg))
+ if (ensured && redirectToClients(msg))
msgHist.add(msg);
if (msg.senderNodeId() != null && !msg.senderNodeId().equals(getLocalNodeId())) {
@@ -3744,8 +3769,79 @@
if (nodeAddedMsg.verified())
msgHist.add(nodeAddedMsg);
}
- else if (sendMessageToRemotes(msg))
- sendMessageAcrossRing(msg);
+ else {
+ if (sendMessageToRemotes(msg)) {
+ if (SEND_JOIN_REQ_DIRECTLY && !msg.directSendFailed()) {
+ final TcpDiscoveryNode crd = resolveCoordinator();
+
+ Collection<TcpDiscoveryNode> failedNodes;
+
+ synchronized (mux) {
+ failedNodes = U.arrayList(ServerImpl.this.failedNodes.keySet());
+ }
+
+ TcpDiscoveryNode next = ring.nextNode(failedNodes);
+
+ if (crd != null && !crd.equals(next)) {
+ if (log.isDebugEnabled()) {
+ log.debug("Will send join request directly to coordinator " +
+ "[msg=" + msg + ", crd=" + crd + ", next=" + next + ']');
+ }
+
+ log.info("Will send join request directly to coordinator " +
+ "[cnt=" + joiningNodes.size() + ", msg=" + msg + ", crd=" + crd + ", next=" + next + ']');
+
+ utilityPool.submit(new Runnable() {
+ @Override public void run() {
+ IgniteSpiException sndErr = null;
+ Integer res = null;
+
+ TcpDiscoveryJoinRequestMessage msg0 =
+ new TcpDiscoveryJoinRequestMessage(msg.node(), msg.discoveryData());
+
+ try {
+ res = trySendMessageDirectly(crd, msg0);
+
+ if (F.eq(RES_OK, res)) {
+ if (log.isDebugEnabled()) {
+ log.debug("Sent join request directly to coordinator " +
+ "[msg=" + msg0 + ", crd=" + crd + ']');
+ }
+
+ log.info("Sent join request directly to coordinator " +
+ "[msg=" + msg0 + ", crd=" + crd + ']');
+
+ return;
+ }
+ }
+ catch (IgniteSpiException e) {
+ sndErr = e;
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("Failed to send join request to coordinator, will process from " +
+ "message worker [msg=" + msg0 + ", crd=" + crd + ", err=" + sndErr +
+ ", res=" + res + ']');
+ }
+
+ log.info("Failed to send join request to coordinator, will process from " +
+ "message worker [msg=" + msg0 + ", crd=" + crd + ", err=" + sndErr +
+ ", res=" + res + ']');
+
+ msg.directSendFailed(true);
+
+ msgWorker.addMessage(msg);
+ }
+ });
+
+ return;
+ }
+ }
+
+ sendMessageAcrossRing(msg);
+ }
+
+ }
}
/**
@@ -3780,7 +3876,7 @@
* @param msg Message.
* @throws IgniteSpiException Last failure if all attempts failed.
*/
- private void trySendMessageDirectly(TcpDiscoveryNode node, TcpDiscoveryAbstractMessage msg)
+ private Integer trySendMessageDirectly(TcpDiscoveryNode node, TcpDiscoveryAbstractMessage msg)
throws IgniteSpiException {
if (node.isClient()) {
TcpDiscoveryNode routerNode = ring.node(node.clientRouterNodeId());
@@ -3801,25 +3897,21 @@
worker.addMessage(msg);
- return;
+ return null;
}
- trySendMessageDirectly(routerNode, msg);
-
- return;
+ return trySendMessageDirectly(routerNode, msg);
}
IgniteSpiException ex = null;
for (InetSocketAddress addr : spi.getNodeAddresses(node, U.sameMacs(locNode, node))) {
try {
- sendMessageDirectly(msg, addr);
+ Integer res = sendMessageDirectly(msg, addr, false);
node.lastSuccessfulAddress(addr);
- ex = null;
-
- break;
+ return res;
}
catch (IgniteSpiException e) {
ex = e;
@@ -3828,6 +3920,8 @@
if (ex != null)
throw ex;
+
+ return null;
}
/**
@@ -5336,12 +5430,9 @@
* Sends heartbeat message if needed.
*/
private void sendHeartbeatMessage() {
- if (!isLocalNodeCoordinator())
- return;
-
long elapsed = (lastTimeHbMsgSent + spi.hbFreq) - U.currentTimeMillis();
- if (elapsed > 0)
+ if (elapsed > 0 || !isLocalNodeCoordinator())
return;
TcpDiscoveryHeartbeatMessage msg = new TcpDiscoveryHeartbeatMessage(getConfiguredNodeId());
@@ -5361,7 +5452,9 @@
if (lastTimeStatusMsgSent < locNode.lastUpdateTime())
lastTimeStatusMsgSent = locNode.lastUpdateTime();
- long elapsed = (lastTimeStatusMsgSent + hbCheckFreq) - U.currentTimeMillis();
+ long updateTime = Math.max(lastTimeStatusMsgSent, lastRingMsgTime);
+
+ long elapsed = (updateTime + hbCheckFreq) - U.currentTimeMillis();
if (elapsed > 0)
return;
@@ -5544,6 +5637,9 @@
ClientMessageWorker clientMsgWrk = null;
+ TcpDiscoveryAbstractMessage msg = null;
+ Exception sockE = null;
+
try {
InputStream in;
@@ -5604,7 +5700,7 @@
// Restore timeout.
sock.setSoTimeout(timeout);
- TcpDiscoveryAbstractMessage msg = spi.readMessage(sock, in, spi.netTimeout);
+ msg = spi.readMessage(sock, in, spi.netTimeout);
// Ping.
if (msg instanceof TcpDiscoveryPingRequest) {
@@ -5709,6 +5805,8 @@
}
}
catch (IOException e) {
+ sockE = e;
+
if (log.isDebugEnabled())
U.error(log, "Caught exception on handshake [err=" + e +", sock=" + sock + ']', e);
@@ -5736,6 +5834,8 @@
return;
}
catch (IgniteCheckedException e) {
+ sockE = e;
+
if (log.isDebugEnabled())
U.error(log, "Caught exception on handshake [err=" + e +", sock=" + sock + ']', e);
@@ -5765,8 +5865,7 @@
while (!isInterrupted()) {
try {
- TcpDiscoveryAbstractMessage msg = U.unmarshal(spi.marshaller(), in,
- U.resolveClassLoader(spi.ignite().configuration()));
+ msg = U.unmarshal(spi.marshaller(), in, U.resolveClassLoader(spi.ignite().configuration()));
msg.senderNodeId(nodeId);
@@ -5793,9 +5892,12 @@
if (clientMsgWrk != null && ok)
continue;
- else
+ else {
+ log.info("Processed join request, close connection [msg=" + msg + ']');
+
// Direct join request - no need to handle this socket anymore.
break;
+ }
}
}
else if (msg instanceof TcpDiscoveryClientReconnectMessage) {
@@ -5968,6 +6070,8 @@
processClientHeartbeatMessage(heartbeatMsg);
}
catch (IgniteCheckedException e) {
+ sockE = e;
+
if (log.isDebugEnabled())
U.error(log, "Caught exception on message read [sock=" + sock +
", locNodeId=" + locNodeId + ", rmtNodeId=" + nodeId + ']', e);
@@ -5995,6 +6099,8 @@
return;
}
catch (IOException e) {
+ sockE = e;
+
if (log.isDebugEnabled())
U.error(log, "Caught exception on message read [sock=" + sock + ", locNodeId=" + locNodeId +
", rmtNodeId=" + nodeId + ']', e);
@@ -6018,6 +6124,8 @@
}
}
finally {
+ log.info("Close sock [readers=" + spi.stats.socketReaders() + ", msg=" + msg + ", err=" + sockE + ']');
+
if (clientMsgWrk != null) {
if (log.isDebugEnabled())
log.debug("Client connection failed [sock=" + sock + ", locNodeId=" + locNodeId +
@@ -6062,11 +6170,11 @@
TcpDiscoverySpiState state = spiStateCopy();
- long socketTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
+ long sockTimeout = spi.failureDetectionTimeoutEnabled() ? spi.failureDetectionTimeout() :
spi.getSocketTimeout();
if (state == CONNECTED) {
- spi.writeToSocket(msg, sock, RES_OK, socketTimeout);
+ spi.writeToSocket(msg, sock, RES_OK, sockTimeout);
if (log.isDebugEnabled())
log.debug("Responded to join request message [msg=" + msg + ", res=" + RES_OK + ']');
@@ -6103,7 +6211,7 @@
// Local node is stopping. Remote node should try next one.
res = RES_CONTINUE_JOIN;
- spi.writeToSocket(msg, sock, res, socketTimeout);
+ spi.writeToSocket(msg, sock, res, sockTimeout);
if (log.isDebugEnabled())
log.debug("Responded to join request message [msg=" + msg + ", res=" + res + ']');
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java
index 9e73632..b434c04 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/TcpDiscoveryStatistics.java
@@ -628,6 +628,10 @@
return sockReadersCreated;
}
+ /**
+ * Gets number of socket readers that were created and have not been removed yet
+ * (created count minus removed count).
+ *
+ * @return Socket readers created minus socket readers removed.
+ */
+ public synchronized int socketReaders() {
+ return sockReadersCreated - sockReadersRmv;
+ }
+
/**
* Gets socket readers removed count.
*
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
index 22ffae8..4422919 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
@@ -18,6 +18,7 @@
package org.apache.ignite.spi.discovery.tcp.messages;
import java.util.Map;
+import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
@@ -36,6 +37,10 @@
/** Discovery data. */
private final Map<Integer, byte[]> discoData;
+ /** Set once an attempt to send this join request directly to the coordinator has failed; transient local processing state, not part of the serialized message. */
+ @GridToStringExclude
+ private transient boolean directSndFailed;
+
/**
* Constructor.
*
@@ -79,6 +84,14 @@
setFlag(RESPONDED_FLAG_POS, responded);
}
+ /**
+ * @return {@code True} if an attempt to send this join request directly to the coordinator has failed,
+ * so the request should be processed along the ring instead.
+ */
+ public boolean directSendFailed() {
+ return directSndFailed;
+ }
+
+ /**
+ * @param directSndFailed {@code True} if an attempt to send this join request directly to the
+ * coordinator has failed.
+ */
+ public void directSendFailed(boolean directSndFailed) {
+ this.directSndFailed = directSndFailed;
+ }
+
/** {@inheritDoc} */
@Override public boolean equals(Object obj) {
// NOTE!
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java
index a59ca8b..2d46cf4 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/GridCachePartitionedAffinitySpreadTest.java
@@ -76,7 +76,12 @@
Map<ClusterNode, Integer> parts = new HashMap<>(nodes.size());
for (int part = 0; part < aff.getPartitions(); part++) {
- Collection<ClusterNode> affNodes = aff.assignPartition(part, new ArrayList(nodes), 0, null);
+ Collection<ClusterNode> affNodes = aff.assignPartition(null,
+ part,
+ new ArrayList<>(nodes),
+ new HashMap<ClusterNode, byte[]>(),
+ 0,
+ null);
assertEquals(1, affNodes.size());
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
index 7e37450..643aa86 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/CacheLateAffinityAssignmentTest.java
@@ -2581,6 +2581,17 @@
return !excludeNodes.contains(name);
}
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ if (this == obj)
+ return true;
+
+ // Filters are equal only if they are of the exact same class and exclude the same nodes.
+ if (obj == null || getClass() != obj.getClass())
+ return false;
+
+ return F.eq(excludeNodes, ((CacheNodeFilter)obj).excludeNodes);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ // Must agree with equals(): equal excludeNodes (including both null) give equal hash codes.
+ return excludeNodes != null ? excludeNodes.hashCode() : 0;
+ }
}
/**