blob: d129b5b23fa90ece36d1be5ba21a1da642cd5b9f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cluster.placement.plugins;
import com.google.common.collect.Ordering;
import com.google.common.collect.TreeMultimap;
import org.apache.solr.cluster.*;
import org.apache.solr.cluster.placement.*;
import org.apache.solr.cluster.placement.impl.NodeMetricImpl;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.SuppressForbidden;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
/**
* <p>This factory is instantiated by config from its class name. Using it is the only way to create instances of
* {@link AffinityPlacementPlugin}.</p>
*
* <p>In order to configure this plugin to be used for placement decisions, the following {@code curl} command (or something
* equivalent) has to be executed once the cluster is already running in order to set
* the appropriate Zookeeper stored configuration. Replace {@code localhost:8983} by one of your servers' IP address and port.</p>
*
* <pre>
*
* curl -X POST -H 'Content-type:application/json' -d '{
* "add": {
* "name": ".placement-plugin",
* "class": "org.apache.solr.cluster.placement.plugins.AffinityPlacementFactory",
* "config": {
* "minimalFreeDiskGB": 10,
* "prioritizedFreeDiskGB": 50
* }
* }
* }' http://localhost:8983/api/cluster/plugin
* </pre>
*
* <p>In order to delete the placement-plugin section (and to fallback to either Legacy
* or rule based placement if configured for a collection), execute:</p>
*
* <pre>
*
* curl -X POST -H 'Content-type:application/json' -d '{
* "remove" : ".placement-plugin"
* }' http://localhost:8983/api/cluster/plugin
* </pre>
*
*
* <p>{@link AffinityPlacementPlugin} implements placing replicas in a way that replicate past Autoscaling config defined
* <a href="https://github.com/lucidworks/fusion-cloud-native/blob/master/policy.json#L16">here</a>.</p>
*
* <p>This specification is doing the following:
* <p><i>Spread replicas per shard as evenly as possible across multiple availability zones (given by a sys prop),
* assign replicas based on replica type to specific kinds of nodes (another sys prop), and avoid having more than
* one replica per shard on the same node.<br>
* Only after these constraints are satisfied do minimize cores per node or disk usage.</i></p>
*
* <p>Overall strategy of this plugin:</p>
* <ul><li>
* The set of nodes in the cluster is obtained and transformed into 3 independent sets (that can overlap) of nodes
* accepting each of the three replica types.
* </li><li>
* For each shard on which placing replicas is required and then for each replica type to place (starting with NRT,
* then TLOG then PULL): <ul>
* <li>The set of candidates nodes corresponding to the replica type is used and from that set are removed nodes
* that already have a replica (of any type) for that shard</li>
* <li>If there are not enough nodes, an error is thrown (this is checked further down during processing).</li>
* <li>The number of (already existing) replicas of the current type on each Availability Zone is collected.</li>
* <li>Separate the set of available nodes to as many subsets (possibly some are empty) as there are Availability Zones
* defined for the candidate nodes</li>
* <li>In each AZ nodes subset, sort the nodes by increasing total number of cores count, with possibly a condition
* that pushes nodes with low disk space to the end of the list? Or a weighted combination of the relative
* importance of these two factors? Some randomization? Marking as non available nodes with not enough disk space?
* These and other are likely aspects to be played with once the plugin is tested or observed to be running in prod,
* don't expect the initial code drop(s) to do all of that.</li>
* <li>Iterate over the number of replicas to place (for the current replica type for the current shard):
* <ul>
* <li>Based on the number of replicas per AZ collected previously, pick the non empty set of nodes having the
* lowest number of replicas. Then pick the first node in that set. That's the node the replica is placed one.
* Remove the node from the set of available nodes for the given AZ and increase the number of replicas placed
* on that AZ.</li>
* </ul></li>
* <li>During this process, the number of cores on the nodes in general is tracked to take into account placement
* decisions so that not all shards decide to put their replicas on the same nodes (they might though if these are
* the less loaded nodes).</li>
* </ul>
* </li>
* </ul>
*
* <p>This code is a realistic placement computation, based on a few assumptions. The code is written in such a way to
* make it relatively easy to adapt it to (somewhat) different assumptions. Additional configuration options could be introduced
* to allow configuration base option selection as well...</p>
*/
public class AffinityPlacementFactory implements PlacementPluginFactory<AffinityPlacementConfig> {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
/**
* <p>Name of the system property on a node indicating which (public cloud) Availability Zone that node is in. The value
* is any string, different strings denote different availability zones.
*
* <p>Nodes on which this system property is not defined are considered being in the same Availability Zone
* {@link #UNDEFINED_AVAILABILITY_ZONE} (hopefully the value of this constant is not the name of a real Availability Zone :).
*/
public static final String AVAILABILITY_ZONE_SYSPROP = "availability_zone";
/**
* <p>Name of the system property on a node indicating the type of replicas allowed on that node.
* The value of that system property is a comma separated list or a single string of value names of
* {@link org.apache.solr.cluster.Replica.ReplicaType} (case insensitive). If that property is not defined, that node is
* considered accepting all replica types (i.e. undefined is equivalent to {@code "NRT,Pull,tlog"}).
*/
public static final String REPLICA_TYPE_SYSPROP = "replica_type";
/**
* This is the "AZ" name for nodes that do not define an AZ. Should not match a real AZ name (I think we're safe)
*/
public static final String UNDEFINED_AVAILABILITY_ZONE = "uNd3f1NeD";
private AffinityPlacementConfig config = AffinityPlacementConfig.DEFAULT;
/**
* Empty public constructor is used to instantiate this factory. Using a factory pattern to allow the factory to do one
* time costly operations if needed, and to only have to instantiate a default constructor class by name, rather than
* having to call a constructor with more parameters (if we were to instantiate the plugin class directly without going
* through a factory).
*/
public AffinityPlacementFactory() {
}
@Override
public PlacementPlugin createPluginInstance() {
return new AffinityPlacementPlugin(config.minimalFreeDiskGB, config.prioritizedFreeDiskGB);
}
@Override
public void configure(AffinityPlacementConfig cfg) {
Objects.requireNonNull(cfg, "configuration must never be null");
this.config = cfg;
}
@Override
public AffinityPlacementConfig getConfig() {
return config;
}
/**
* See {@link AffinityPlacementFactory} for instructions on how to configure a cluster to use this plugin and details
* on what the plugin does.
*/
static class AffinityPlacementPlugin implements PlacementPlugin {
private final long minimalFreeDiskGB;
private final long prioritizedFreeDiskGB;
private final Random replicaPlacementRandom = new Random(); // ok even if random sequence is predictable.
/**
* The factory has decoded the configuration for the plugin instance and passes it the parameters it needs.
*/
private AffinityPlacementPlugin(long minimalFreeDiskGB, long prioritizedFreeDiskGB) {
this.minimalFreeDiskGB = minimalFreeDiskGB;
this.prioritizedFreeDiskGB = prioritizedFreeDiskGB;
// We make things reproducible in tests by using test seed if any
String seed = System.getProperty("tests.seed");
if (seed != null) {
replicaPlacementRandom.setSeed(seed.hashCode());
}
}
@Override
@SuppressForbidden(reason = "Ordering.arbitrary() has no equivalent in Comparator class. Rather reuse than copy.")
public PlacementPlan computePlacement(PlacementRequest request, PlacementContext placementContext) throws PlacementException {
Set<Node> nodes = request.getTargetNodes();
SolrCollection solrCollection = request.getCollection();
nodes = filterNodesWithCollection(placementContext.getCluster(), request, nodes);
// Request all needed attributes
AttributeFetcher attributeFetcher = placementContext.getAttributeFetcher();
attributeFetcher.requestNodeSystemProperty(AVAILABILITY_ZONE_SYSPROP).requestNodeSystemProperty(REPLICA_TYPE_SYSPROP);
attributeFetcher
.requestNodeMetric(NodeMetricImpl.NUM_CORES)
.requestNodeMetric(NodeMetricImpl.FREE_DISK_GB);
attributeFetcher.fetchFrom(nodes);
final AttributeValues attrValues = attributeFetcher.fetchAttributes();
// Split the set of nodes into 3 sets of nodes accepting each replica type (sets can overlap if nodes accept multiple replica types)
// These subsets sets are actually maps, because we capture the number of cores (of any replica type) present on each node.
// Also get the number of currently existing cores per node, so we can keep update as we place new cores to not end up
// always selecting the same node(s).
Pair<EnumMap<Replica.ReplicaType, Set<Node>>, Map<Node, Integer>> p = getNodesPerReplicaType(nodes, attrValues);
EnumMap<Replica.ReplicaType, Set<Node>> replicaTypeToNodes = p.first();
Map<Node, Integer> coresOnNodes = p.second();
// All available zones of live nodes. Due to some nodes not being candidates for placement, and some existing replicas
// being one availability zones that might be offline (i.e. their nodes are not live), this set might contain zones
// on which it is impossible to place replicas. That's ok.
Set<String> availabilityZones = getZonesFromNodes(nodes, attrValues);
// Build the replica placement decisions here
Set<ReplicaPlacement> replicaPlacements = new HashSet<>();
// Let's now iterate on all shards to create replicas for and start finding home sweet homes for the replicas
for (String shardName : request.getShardNames()) {
// Inventory nodes (if any) that already have a replica of any type for the shard, because we can't be placing
// additional replicas on these. This data structure is updated after each replica to node assign and is used to
// make sure different replica types are not allocated to the same nodes (protecting same node assignments within
// a given replica type is done "by construction" in makePlacementDecisions()).
Set<Node> nodesWithReplicas = new HashSet<>();
Shard shard = solrCollection.getShard(shardName);
if (shard != null) {
for (Replica r : shard.replicas()) {
nodesWithReplicas.add(r.getNode());
}
}
// Iterate on the replica types in the enum order. We place more strategic replicas first
// (NRT is more strategic than TLOG more strategic than PULL). This is in case we eventually decide that less
// strategic replica placement impossibility is not a problem that should lead to replica placement computation
// failure. Current code does fail if placement is impossible (constraint is at most one replica of a shard on any node).
for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) {
makePlacementDecisions(solrCollection, shardName, availabilityZones, replicaType, request.getCountReplicasToCreate(replicaType),
attrValues, replicaTypeToNodes, nodesWithReplicas, coresOnNodes, placementContext.getPlacementPlanFactory(), replicaPlacements);
}
}
return placementContext.getPlacementPlanFactory().createPlacementPlan(request, replicaPlacements);
}
@Override
public void verifyAllowedModification(ModificationRequest modificationRequest, PlacementContext placementContext) throws PlacementModificationException, InterruptedException {
if (modificationRequest instanceof DeleteShardsRequest) {
log.warn("DeleteShardsRequest not implemented yet, skipping: {}", modificationRequest);
} else if (modificationRequest instanceof DeleteCollectionRequest) {
verifyDeleteCollection((DeleteCollectionRequest) modificationRequest, placementContext);
} else if (modificationRequest instanceof DeleteReplicasRequest) {
verifyDeleteReplicas((DeleteReplicasRequest) modificationRequest, placementContext);
} else {
log.warn("unsupported request type, skipping: {}", modificationRequest);
}
}
private void verifyDeleteCollection(DeleteCollectionRequest deleteCollectionRequest, PlacementContext placementContext) throws PlacementModificationException, InterruptedException {
Cluster cluster = placementContext.getCluster();
Set<String> colocatedWith = new HashSet<>();
for (SolrCollection collection : cluster.collections()) {
String withCollection = collection.getCustomProperty(AffinityPlacementConfig.WITH_COLLECTION_PROPERTY);
if (withCollection != null && withCollection.equals(deleteCollectionRequest.getCollection().getName())) {
colocatedWith.add(collection.getName());
}
}
if (!colocatedWith.isEmpty()) {
// still exists
throw new PlacementModificationException("primary colocated collections: " + colocatedWith +
" of the secondary collection " + deleteCollectionRequest.getCollection().getName() + " still present!");
}
}
private void verifyDeleteReplicas(DeleteReplicasRequest deleteReplicasRequest, PlacementContext placementContext) throws PlacementModificationException, InterruptedException {
Cluster cluster = placementContext.getCluster();
SolrCollection secondaryCollection = deleteReplicasRequest.getCollection();
Set<String> colocatedCollections = new HashSet<>();
for (SolrCollection collection : cluster.collections()) {
String withCollection = collection.getCustomProperty(AffinityPlacementConfig.WITH_COLLECTION_PROPERTY);
if (withCollection != null && withCollection.equals(deleteReplicasRequest.getCollection().getName())) {
colocatedCollections.add(collection.getName());
}
}
if (colocatedCollections.isEmpty()) {
return;
}
Map<Node, Map<String, AtomicInteger>> secondaryNodeShardReplicas = new HashMap<>();
secondaryCollection.shards().forEach(shard ->
shard.replicas().forEach(replica -> {
secondaryNodeShardReplicas.computeIfAbsent(replica.getNode(), n -> new HashMap<>())
.computeIfAbsent(replica.getShard().getShardName(), s -> new AtomicInteger())
.incrementAndGet();
}));
// find the colocated-with collections
Map<Node, Set<String>> colocatingNodes = new HashMap<>();
try {
for (String colocatedCollection : colocatedCollections) {
SolrCollection coll = cluster.getCollection(colocatedCollection);
coll.shards().forEach(shard ->
shard.replicas().forEach(replica -> {
colocatingNodes.computeIfAbsent(replica.getNode(), n -> new HashSet<>())
.add(coll.getName());
}));
}
} catch (IOException ioe) {
throw new PlacementModificationException("failed to retrieve colocated collection information", ioe);
}
PlacementModificationException exception = null;
for (Replica replica : deleteReplicasRequest.getReplicas()) {
if (!colocatingNodes.containsKey(replica.getNode())) {
continue;
}
// check that there will be at least one replica remaining
AtomicInteger secondaryCount = secondaryNodeShardReplicas
.getOrDefault(replica.getNode(), Map.of())
.getOrDefault(replica.getShard().getShardName(), new AtomicInteger());
if (secondaryCount.get() > 1) {
// we can delete it - record the deletion
secondaryCount.decrementAndGet();
continue;
}
// fail - this replica cannot be removed
if (exception == null) {
exception = new PlacementModificationException("delete replica(s) rejected");
}
exception.addRejectedModification(replica.toString(), "co-located with replicas of " + colocatingNodes.get(replica.getNode()));
}
if (exception != null) {
throw exception;
}
}
private Set<String> getZonesFromNodes(Set<Node> nodes, final AttributeValues attrValues) {
Set<String> azs = new HashSet<>();
for (Node n : nodes) {
azs.add(getNodeAZ(n, attrValues));
}
return Collections.unmodifiableSet(azs);
}
/**
* Resolves the AZ of a node and takes care of nodes that have no defined AZ in system property {@link #AVAILABILITY_ZONE_SYSPROP}
* to then return {@link #UNDEFINED_AVAILABILITY_ZONE} as the AZ name.
*/
private String getNodeAZ(Node n, final AttributeValues attrValues) {
Optional<String> nodeAz = attrValues.getSystemProperty(n, AVAILABILITY_ZONE_SYSPROP);
// All nodes with undefined AZ will be considered part of the same AZ. This also works for deployments that do not care about AZ's
return nodeAz.orElse(UNDEFINED_AVAILABILITY_ZONE);
}
/**
* This class captures an availability zone and the nodes that are legitimate targets for replica placement in that
* Availability Zone. Instances are used as values in a {@link java.util.TreeMap} in which the total number of already
* existing replicas in the AZ is the key. This allows easily picking the set of nodes from which to select a node for
* placement in order to balance the number of replicas per AZ. Picking one of the nodes from the set is done using
* different criteria unrelated to the Availability Zone (picking the node is based on the {@link CoresAndDiskComparator}
* ordering).
*/
private static class AzWithNodes {
final String azName;
List<Node> availableNodesForPlacement;
boolean hasBeenSorted;
AzWithNodes(String azName, List<Node> availableNodesForPlacement) {
this.azName = azName;
this.availableNodesForPlacement = availableNodesForPlacement;
// Once the list is sorted to an order we're happy with, this flag is set to true to avoid sorting multiple times
// unnecessarily.
this.hasBeenSorted = false;
}
}
/**
* Given the set of all nodes on which to do placement and fetched attributes, builds the sets representing
* candidate nodes for placement of replicas of each replica type.
* These sets are packaged and returned in an EnumMap keyed by replica type (1st member of the Pair).
* Also builds the number of existing cores on each node present in the returned EnumMap (2nd member of the returned Pair).
* Nodes for which the number of cores is not available for whatever reason are excluded from acceptable candidate nodes
* as it would not be possible to make any meaningful placement decisions.
*
* @param nodes all nodes on which this plugin should compute placement
* @param attrValues attributes fetched for the nodes. This method uses system property {@link #REPLICA_TYPE_SYSPROP} as
* well as the number of cores on each node.
*/
private Pair<EnumMap<Replica.ReplicaType, Set<Node>>, Map<Node, Integer>> getNodesPerReplicaType(Set<Node> nodes, final AttributeValues attrValues) {
EnumMap<Replica.ReplicaType, Set<Node>> replicaTypeToNodes = new EnumMap<>(Replica.ReplicaType.class);
Map<Node, Integer> coresOnNodes = new HashMap<>();
for (Replica.ReplicaType replicaType : Replica.ReplicaType.values()) {
replicaTypeToNodes.put(replicaType, new HashSet<>());
}
for (Node node : nodes) {
// Exclude nodes with unknown or too small disk free space
if (attrValues.getNodeMetric(node, NodeMetricImpl.FREE_DISK_GB).isEmpty()) {
if (log.isWarnEnabled()) {
log.warn("Unknown free disk on node {}, excluding it from placement decisions.", node.getName());
}
// We rely later on the fact that the free disk optional is present (see CoresAndDiskComparator), be careful it you change anything here.
continue;
}
if (attrValues.getNodeMetric(node, NodeMetricImpl.FREE_DISK_GB).get() < minimalFreeDiskGB) {
if (log.isWarnEnabled()) {
log.warn("Node {} free disk ({}GB) lower than configured minimum {}GB, excluding it from placement decisions.", node.getName(), attrValues.getNodeMetric(node, NodeMetricImpl.FREE_DISK_GB).get(), minimalFreeDiskGB);
}
continue;
}
if (attrValues.getNodeMetric(node, NodeMetricImpl.NUM_CORES).isEmpty()) {
if (log.isWarnEnabled()) {
log.warn("Unknown number of cores on node {}, excluding it from placement decisions.", node.getName());
}
// We rely later on the fact that the number of cores optional is present (see CoresAndDiskComparator), be careful it you change anything here.
continue;
}
Integer coresCount = attrValues.getNodeMetric(node, NodeMetricImpl.NUM_CORES).get();
coresOnNodes.put(node, coresCount);
String supportedReplicaTypes = attrValues.getSystemProperty(node, REPLICA_TYPE_SYSPROP).isPresent() ? attrValues.getSystemProperty(node, REPLICA_TYPE_SYSPROP).get() : null;
// If property not defined or is only whitespace on a node, assuming node can take any replica type
if (supportedReplicaTypes == null || supportedReplicaTypes.isBlank()) {
for (Replica.ReplicaType rt : Replica.ReplicaType.values()) {
replicaTypeToNodes.get(rt).add(node);
}
} else {
Set<String> acceptedTypes = Arrays.stream(supportedReplicaTypes.split(",")).map(String::trim).map(s -> s.toLowerCase(Locale.ROOT)).collect(Collectors.toSet());
for (Replica.ReplicaType rt : Replica.ReplicaType.values()) {
if (acceptedTypes.contains(rt.name().toLowerCase(Locale.ROOT))) {
replicaTypeToNodes.get(rt).add(node);
}
}
}
}
return new Pair<>(replicaTypeToNodes, coresOnNodes);
}
/**
* <p>Picks nodes from {@code targetNodes} for placing {@code numReplicas} replicas.
*
* <p>The criteria used in this method are, in this order:
* <ol>
* <li>No more than one replica of a given shard on a given node (strictly enforced)</li>
* <li>Balance as much as possible replicas of a given {@link org.apache.solr.cluster.Replica.ReplicaType} over available AZ's.
* This balancing takes into account existing replicas <b>of the corresponding replica type</b>, if any.</li>
* <li>Place replicas if possible on nodes having more than a certain amount of free disk space (note that nodes with a too small
* amount of free disk space were eliminated as placement targets earlier, in {@link #getNodesPerReplicaType}). There's
* a threshold here rather than sorting on the amount of free disk space, because sorting on that value would in
* practice lead to never considering the number of cores on a node.</li>
* <li>Place replicas on nodes having a smaller number of cores (the number of cores considered
* for this decision includes previous placement decisions made during the processing of the placement request)</li>
* </ol>
*/
@SuppressForbidden(reason = "Ordering.arbitrary() has no equivalent in Comparator class. Rather reuse than copy.")
private void makePlacementDecisions(SolrCollection solrCollection, String shardName, Set<String> availabilityZones,
Replica.ReplicaType replicaType, int numReplicas, final AttributeValues attrValues,
EnumMap<Replica.ReplicaType, Set<Node>> replicaTypeToNodes, Set<Node> nodesWithReplicas,
Map<Node, Integer> coresOnNodes, PlacementPlanFactory placementPlanFactory,
Set<ReplicaPlacement> replicaPlacements) throws PlacementException {
// Count existing replicas per AZ. We count only instances of the type of replica for which we need to do placement.
// If we ever want to balance replicas of any type across AZ's (and not each replica type balanced independently),
// we'd have to move this data structure to the caller of this method so it can be reused across different replica
// type placements for a given shard. Note then that this change would be risky. For example all NRT's and PULL
// replicas for a shard my be correctly balanced over three AZ's, but then all NRT can end up in the same AZ...
Map<String, Integer> azToNumReplicas = new HashMap<>();
for (String az : availabilityZones) {
azToNumReplicas.put(az, 0);
}
// Build the set of candidate nodes for the placement, i.e. nodes that can accept the replica type
Set<Node> candidateNodes = new HashSet<>(replicaTypeToNodes.get(replicaType));
// Remove nodes that already have a replica for the shard (no two replicas of same shard can be put on same node)
candidateNodes.removeAll(nodesWithReplicas);
Shard shard = solrCollection.getShard(shardName);
if (shard != null) {
// shard is non null if we're adding replicas to an already existing collection.
// If we're creating the collection, the shards do not exist yet.
for (Replica replica : shard.replicas()) {
// The node's AZ is counted as having a replica if it has a replica of the same type as the one we need
// to place here.
if (replica.getType() == replicaType) {
final String az = getNodeAZ(replica.getNode(), attrValues);
if (azToNumReplicas.containsKey(az)) {
// We do not count replicas on AZ's for which we don't have any node to place on because it would not help
// the placement decision. If we did want to do that, note the dereferencing below can't be assumed as the
// entry will not exist in the map.
azToNumReplicas.put(az, azToNumReplicas.get(az) + 1);
}
}
}
}
// We now have the set of real candidate nodes, we've enforced "No more than one replica of a given shard on a given node".
// We also counted for the shard and replica type under consideration how many replicas were per AZ, so we can place
// (or try to place) replicas on AZ's that have fewer replicas
// Get the candidate nodes per AZ in order to build (further down) a mapping of AZ to placement candidates.
Map<String, List<Node>> nodesPerAz = new HashMap<>();
for (Node node : candidateNodes) {
String nodeAz = getNodeAZ(node, attrValues);
List<Node> nodesForAz = nodesPerAz.computeIfAbsent(nodeAz, k -> new ArrayList<>());
nodesForAz.add(node);
}
// Build a treeMap sorted by the number of replicas per AZ and including candidates nodes suitable for placement on the
// AZ, so we can easily select the next AZ to get a replica assignment and quickly (constant time) decide if placement
// on this AZ is possible or not.
TreeMultimap<Integer, AzWithNodes> azByExistingReplicas = TreeMultimap.create(Comparator.naturalOrder(), Ordering.arbitrary());
for (Map.Entry<String, List<Node>> e : nodesPerAz.entrySet()) {
azByExistingReplicas.put(azToNumReplicas.get(e.getKey()), new AzWithNodes(e.getKey(), e.getValue()));
}
CoresAndDiskComparator coresAndDiskComparator = new CoresAndDiskComparator(attrValues, coresOnNodes, prioritizedFreeDiskGB);
for (int i = 0; i < numReplicas; i++) {
// We have for each AZ on which we might have a chance of placing a replica, the list of candidate nodes for replicas
// (candidate: does not already have a replica of this shard and is in the corresponding AZ).
// Among the AZ's with the minimal number of replicas of the given replica type for the shard, we must pick the AZ that
// offers the best placement (based on number of cores and free disk space). In order to do so, for these "minimal" AZ's
// we sort the nodes from best to worst placement candidate (based on the number of cores and free disk space) then pick
// the AZ that has the best best node. We don't sort all AZ's because that will not necessarily be needed.
int minNumberOfReplicasPerAz = 0; // This value never observed but compiler can't tell
Set<Map.Entry<Integer, AzWithNodes>> candidateAzEntries = null;
// Iterate over AZ's (in the order of increasing number of replicas on that AZ) and do two things: 1. remove those AZ's that
// have no nodes, no use iterating over these again and again (as we compute placement for more replicas), and 2. collect
// all those AZ with a minimal number of replicas.
for (Iterator<Map.Entry<Integer, AzWithNodes>> it = azByExistingReplicas.entries().iterator(); it.hasNext(); ) {
Map.Entry<Integer, AzWithNodes> entry = it.next();
int numberOfNodes = entry.getValue().availableNodesForPlacement.size();
if (numberOfNodes == 0) {
it.remove();
} else { // AZ does have node(s) for placement
if (candidateAzEntries == null) {
// First AZ with nodes that can take the replica. Initialize tracking structures
minNumberOfReplicasPerAz = numberOfNodes;
candidateAzEntries = new HashSet<>();
}
if (minNumberOfReplicasPerAz != numberOfNodes) {
// AZ's with more replicas than the minimum number seen are not placement candidates
break;
}
candidateAzEntries.add(entry);
// We remove all entries that are candidates: the "winner" will be modified, all entries might also be sorted,
// so we'll insert back the updated versions later.
it.remove();
}
}
if (candidateAzEntries == null) {
// This can happen because not enough nodes for the placement request or already too many nodes with replicas of
// the shard that can't accept new replicas or not enough nodes with enough free disk space.
throw new PlacementException("Not enough eligible nodes to place " + numReplicas + " replica(s) of type " + replicaType +
" for shard " + shardName + " of collection " + solrCollection.getName());
}
// Iterate over all candidate AZ's, sort them if needed and find the best one to use for this placement
Map.Entry<Integer, AzWithNodes> selectedAz = null;
Node selectedAzBestNode = null;
for (Map.Entry<Integer, AzWithNodes> candidateAzEntry : candidateAzEntries) {
AzWithNodes azWithNodes = candidateAzEntry.getValue();
List<Node> nodes = azWithNodes.availableNodesForPlacement;
if (!azWithNodes.hasBeenSorted) {
// Make sure we do not tend to use always the same nodes (within an AZ) if all conditions are identical (well, this
// likely is not the case since after having added a replica to a node its number of cores increases for the next
// placement decision, but let's be defensive here, given that multiple concurrent placement decisions might see
// the same initial cluster state, and we want placement to be reasonable even in that case without creating an
// unnecessary imbalance).
// For example, if all nodes have 0 cores and same amount of free disk space, ideally we want to pick a random node
// for placement, not always the same one due to some internal ordering.
Collections.shuffle(nodes, replicaPlacementRandom);
// Sort by increasing number of cores but pushing nodes with low free disk space to the end of the list
nodes.sort(coresAndDiskComparator);
azWithNodes.hasBeenSorted = true;
}
// Which one is better, the new one or the previous best?
if (selectedAz == null || coresAndDiskComparator.compare(nodes.get(0), selectedAzBestNode) < 0) {
selectedAz = candidateAzEntry;
selectedAzBestNode = nodes.get(0);
}
}
// Now actually remove the selected node from the winning AZ
AzWithNodes azWithNodes = selectedAz.getValue();
List<Node> nodes = selectedAz.getValue().availableNodesForPlacement;
Node assignTarget = nodes.remove(0);
// Insert back all the qualifying but non winning AZ's removed while searching for the one
for (Map.Entry<Integer, AzWithNodes> removedAzs : candidateAzEntries) {
if (removedAzs != selectedAz) {
azByExistingReplicas.put(removedAzs.getKey(), removedAzs.getValue());
}
}
// Insert back a corrected entry for the winning AZ: one more replica living there and one less node that can accept new replicas
// (the remaining candidate node list might be empty, in which case it will be cleaned up on the next iteration).
azByExistingReplicas.put(selectedAz.getKey() + 1, azWithNodes);
// Do not assign that node again for replicas of other replica type for this shard
// (this update of the set is not useful in the current execution of this method but for following ones only)
nodesWithReplicas.add(assignTarget);
// Track that the node has one more core. These values are only used during the current run of the plugin.
coresOnNodes.merge(assignTarget, 1, Integer::sum);
// Register the replica assignment just decided
replicaPlacements.add(placementPlanFactory.createReplicaPlacement(solrCollection, shardName, assignTarget, replicaType));
}
}
private Set<Node> filterNodesWithCollection(Cluster cluster, PlacementRequest request, Set<Node> initialNodes) throws PlacementException {
// if there's a `withCollection` constraint for this collection then remove nodes
// that are not eligible
String withCollectionName = request.getCollection().getCustomProperty(AffinityPlacementConfig.WITH_COLLECTION_PROPERTY);
if (withCollectionName == null) {
return initialNodes;
}
SolrCollection withCollection;
try {
withCollection = cluster.getCollection(withCollectionName);
} catch (Exception e) {
throw new PlacementException("Error getting info of withCollection=" + withCollectionName, e);
}
Set<Node> withCollectionNodes = new HashSet<>();
withCollection.shards().forEach(s -> s.replicas().forEach(r -> withCollectionNodes.add(r.getNode())));
if (withCollectionNodes.isEmpty()) {
throw new PlacementException("Collection " + withCollection + " defined in `withCollection` has no replicas on eligible nodes.");
}
HashSet<Node> filteredNodes = new HashSet<>(initialNodes);
filteredNodes.retainAll(withCollectionNodes);
if (filteredNodes.isEmpty()) {
throw new PlacementException("Collection " + withCollection + " defined in `withCollection` has no replicas on eligible nodes.");
}
return filteredNodes;
}
/**
* Comparator implementing the placement strategy based on free space and number of cores: we want to place new replicas
* on nodes with the less number of cores, but only if they do have enough disk space (expressed as a threshold value).
*/
static class CoresAndDiskComparator implements Comparator<Node> {
private final AttributeValues attrValues;
private final Map<Node, Integer> coresOnNodes;
private final long prioritizedFreeDiskGB;
/**
* The data we sort on is not part of the {@link Node} instances but has to be retrieved from the attributes and configuration.
* The number of cores per node is passed in a map whereas the free disk is fetched from the attributes due to the
* fact that we update the number of cores per node as we do allocations, but we do not update the free disk. The
* attrValues corresponding to the number of cores per node are the initial values, but we want to compare the actual
* value taking into account placement decisions already made during the current execution of the placement plugin.
*/
CoresAndDiskComparator(AttributeValues attrValues, Map<Node, Integer> coresOnNodes, long prioritizedFreeDiskGB) {
this.attrValues = attrValues;
this.coresOnNodes = coresOnNodes;
this.prioritizedFreeDiskGB = prioritizedFreeDiskGB;
}
@Override
public int compare(Node a, Node b) {
// Note all nodes do have free disk defined. This has been verified earlier.
boolean aHasLowFreeSpace = attrValues.getNodeMetric(a, NodeMetricImpl.FREE_DISK_GB).get() < prioritizedFreeDiskGB;
boolean bHasLowFreeSpace = attrValues.getNodeMetric(b, NodeMetricImpl.FREE_DISK_GB).get() < prioritizedFreeDiskGB;
if (aHasLowFreeSpace != bHasLowFreeSpace) {
// A node with low free space should be considered > node with high free space since it needs to come later in sort order
return Boolean.compare(aHasLowFreeSpace, bHasLowFreeSpace);
}
// The ordering on the number of cores is the natural order.
return Integer.compare(coresOnNodes.get(a), coresOnNodes.get(b));
}
}
}
}