blob: 16cc24428cdaf4c60a65acbb52b0acf47b2d9603 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ignite.internal.processors.cache.distributed.dht.preloader;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.internal.GridDirectCollection;
import org.apache.ignite.internal.GridDirectMap;
import org.apache.ignite.internal.managers.discovery.DiscoCache;
import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
import org.apache.ignite.internal.processors.affinity.GridAffinityAssignmentCache;
import org.apache.ignite.internal.processors.cache.CacheGroupContext;
import org.apache.ignite.internal.processors.cache.GridCacheSharedContext;
import org.apache.ignite.internal.util.GridLongList;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.plugin.extensions.communication.Message;
import org.apache.ignite.plugin.extensions.communication.MessageCollectionItemType;
import org.apache.ignite.plugin.extensions.communication.MessageReader;
import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import org.jetbrains.annotations.Nullable;
/**
* Information about affinity assignment.
*/
public class CacheGroupAffinityMessage implements Message {
/** */
private static final long serialVersionUID = 0L;
/** */
@GridDirectCollection(GridLongList.class)
private List<GridLongList> assigns;
/** */
@GridDirectCollection(GridLongList.class)
private List<GridLongList> idealAssigns;
/** */
@GridDirectMap(keyType = Integer.class, valueType = GridLongList.class)
private Map<Integer, GridLongList> assignsDiff;
/**
*
*/
public CacheGroupAffinityMessage() {
// No-op.
}
/**
* @param assign0 Assignment.
* @param idealAssign0 Ideal assignment.
* @param assignDiff0 Difference with ideal affinity assignment.
*/
private CacheGroupAffinityMessage(List<List<ClusterNode>> assign0,
List<List<ClusterNode>> idealAssign0,
Map<Integer, List<Long>> assignDiff0) {
if (assign0 != null)
assigns = createAssigns(assign0);
if (idealAssign0 != null)
idealAssigns = createAssigns(idealAssign0);
if (assignDiff0 != null) {
assignsDiff = U.newHashMap(assignDiff0.size());
for (Map.Entry<Integer, List<Long>> e : assignDiff0.entrySet()) {
List<Long> orders = e.getValue();
GridLongList l = new GridLongList(orders.size());
for (int n = 0; n < orders.size(); n++)
l.add(orders.get(n));
assignsDiff.put(e.getKey(), l);
}
}
}
/** */
private List<GridLongList> createAssigns(List<List<ClusterNode>> assign0) {
if (assign0 != null) {
List<GridLongList> assigns = new ArrayList<>(assign0.size());
for (int i = 0; i < assign0.size(); i++) {
List<ClusterNode> nodes = assign0.get(i);
GridLongList l = new GridLongList(nodes.size());
for (int n = 0; n < nodes.size(); n++)
l.add(nodes.get(n).order());
assigns.add(l);
}
return assigns;
}
return null;
}
/**
* @param affDiff Affinity diff.
* @return Affinity diff messages.
*/
public static Map<Integer, CacheGroupAffinityMessage> createAffinityDiffMessages(
Map<Integer, Map<Integer, List<Long>>> affDiff) {
if (F.isEmpty(affDiff))
return null;
Map<Integer, CacheGroupAffinityMessage> map = U.newHashMap(affDiff.size());
for (Map.Entry<Integer, Map<Integer, List<Long>>> e : affDiff.entrySet())
map.put(e.getKey(), new CacheGroupAffinityMessage(null, null, e.getValue()));
return map;
}
/**
* Fill Map of CacheGroupAffinityMessages.
*
* @param cctx Context.
* @param topVer Topology version.
* @param affReq Cache group IDs.
* @param cachesAff Optional already prepared affinity.
*/
static void createAffinityMessages(
GridCacheSharedContext cctx,
AffinityTopologyVersion topVer,
Collection<Integer> affReq,
Map<Integer, CacheGroupAffinityMessage> cachesAff
) {
assert !F.isEmpty(affReq) : affReq;
for (Integer grpId : affReq) {
cachesAff.computeIfAbsent(grpId, (integer) -> {
GridAffinityAssignmentCache aff = cctx.affinity().groupAffinity(grpId);
// If no coordinator group holder on the node, try fetch affinity from existing cache group.
if (aff == null) {
CacheGroupContext grp = cctx.cache().cacheGroup(grpId);
if (grp == null)
return null;
aff = grp.affinity();
}
List<List<ClusterNode>> assign = aff.readyAssignments(topVer);
return new CacheGroupAffinityMessage(
assign,
aff.centralizedAffinityFunction() ? aff.idealAssignmentRaw() : null,
null
);
});
}
}
/**
* @param assign Nodes orders.
* @param nodesByOrder Nodes by order cache.
* @param discoCache Discovery data cache.
* @return Nodes list.
*/
public static List<ClusterNode> toNodes(GridLongList assign, Map<Long, ClusterNode> nodesByOrder,
DiscoCache discoCache) {
List<ClusterNode> assign0 = new ArrayList<>(assign.size());
for (int n = 0; n < assign.size(); n++) {
long order = assign.get(n);
ClusterNode affNode = nodesByOrder.computeIfAbsent(order, o -> discoCache.serverNodeByOrder(order));
assert affNode != null : "Failed to find node by order [order=" + order +
", topVer=" + discoCache.version() + ']';
assign0.add(affNode);
}
return assign0;
}
/**
* @param nodesByOrder Nodes by order cache.
* @param discoCache Discovery data cache.
* @return Nodes list.
*/
@Nullable public List<List<ClusterNode>> createIdealAssignments(Map<Long, ClusterNode> nodesByOrder,
DiscoCache discoCache) {
if (idealAssigns == null)
return null;
return createAssignments(idealAssigns, nodesByOrder, discoCache);
}
/**
* @param nodesByOrder Nodes by order cache.
* @param discoCache Discovery data cache.
* @return Assignments.
*/
public List<List<ClusterNode>> createAssignments(Map<Long, ClusterNode> nodesByOrder, DiscoCache discoCache) {
return createAssignments(assigns, nodesByOrder, discoCache);
}
/**
* @param assigns Nodes orders.
* @param nodesByOrder Nodes by order cache.
* @param discoCache Discovery data cache.
* @return Nodes list.
*/
private List<List<ClusterNode>> createAssignments(List<GridLongList> assigns,
Map<Long, ClusterNode> nodesByOrder,
DiscoCache discoCache) {
List<List<ClusterNode>> assignments0 = new ArrayList<>(assigns.size());
for (int p = 0; p < assigns.size(); p++) {
GridLongList assign = assigns.get(p);
assignments0.add(toNodes(assign, nodesByOrder, discoCache));
}
return assignments0;
}
/**
* @return Difference with ideal affinity assignment.
*/
public Map<Integer, GridLongList> assignmentsDiff() {
return assignsDiff;
}
/** {@inheritDoc} */
@Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
writer.setBuffer(buf);
if (!writer.isHeaderWritten()) {
if (!writer.writeHeader(directType(), fieldsCount()))
return false;
writer.onHeaderWritten();
}
switch (writer.state()) {
case 0:
if (!writer.writeCollection("assigns", assigns, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 1:
if (!writer.writeMap("assignsDiff", assignsDiff, MessageCollectionItemType.INT, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
case 2:
if (!writer.writeCollection("idealAssigns", idealAssigns, MessageCollectionItemType.MSG))
return false;
writer.incrementState();
}
return true;
}
/** {@inheritDoc} */
@Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
reader.setBuffer(buf);
if (!reader.beforeMessageRead())
return false;
switch (reader.state()) {
case 0:
assigns = reader.readCollection("assigns", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
reader.incrementState();
case 1:
assignsDiff = reader.readMap("assignsDiff", MessageCollectionItemType.INT, MessageCollectionItemType.MSG, false);
if (!reader.isLastRead())
return false;
reader.incrementState();
case 2:
idealAssigns = reader.readCollection("idealAssigns", MessageCollectionItemType.MSG);
if (!reader.isLastRead())
return false;
reader.incrementState();
}
return reader.afterMessageRead(CacheGroupAffinityMessage.class);
}
/** {@inheritDoc} */
@Override public short directType() {
return 128;
}
/** {@inheritDoc} */
@Override public byte fieldsCount() {
return 3;
}
/** {@inheritDoc} */
@Override public void onAckReceived() {
// No-op.
}
/** {@inheritDoc} */
@Override public String toString() {
return S.toString(CacheGroupAffinityMessage.class, this);
}
}