blob: 877f0d515b6d33d3de6199b718e9fae897166da8 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cluster.placement.plugins;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.cluster.Cluster;
import org.apache.solr.cluster.Node;
import org.apache.solr.cluster.Replica;
import org.apache.solr.cluster.Shard;
import org.apache.solr.cluster.SolrCollection;
import org.apache.solr.cluster.placement.*;
import org.apache.solr.cluster.placement.Builders;
import org.apache.solr.cluster.placement.impl.ModificationRequestImpl;
import org.apache.solr.cluster.placement.impl.PlacementRequestImpl;
import org.apache.solr.common.util.Pair;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
/**
* Unit test for {@link AffinityPlacementFactory}
*/
public class AffinityPlacementFactoryTest extends SolrTestCaseJ4 {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
/** Plugin instance shared by all tests, created once by {@link #setupPlugin()}. */
private static PlacementPlugin plugin;
/** Minimal free disk (GB) configured on the plugin: nodes below this are expected to receive no replicas. */
private static final long MINIMAL_FREE_DISK_GB = 10L;
/** Prioritized free disk (GB) configured on the plugin: nodes below this are only used when there is no other option. */
private static final long PRIORITIZED_FREE_DISK_GB = 50L;
/** Name of the collection referenced by the primary collection's withCollection property. */
private static final String secondaryCollectionName = "withCollection_secondary";
/** Name of the collection declaring {@link AffinityPlacementConfig#WITH_COLLECTION_PROPERTY}. */
private static final String primaryCollectionName = "withCollection_primary";
/**
 * Builds the shared {@link #plugin} from an {@link AffinityPlacementFactory} configured with this test's
 * minimal and prioritized free disk thresholds.
 */
@BeforeClass
public static void setupPlugin() {
  final AffinityPlacementConfig affinityConfig =
      new AffinityPlacementConfig(MINIMAL_FREE_DISK_GB, PRIORITIZED_FREE_DISK_GB);
  final AffinityPlacementFactory placementFactory = new AffinityPlacementFactory();
  placementFactory.configure(affinityConfig);
  plugin = placementFactory.createPluginInstance();
}
/** Basic placement for a collection that is not yet part of the cluster state. */
@Test
public void testBasicPlacementNewCollection() throws Exception {
  final boolean collectionAlreadyExists = false;
  testBasicPlacementInternal(collectionAlreadyExists);
}
/** Basic placement for a collection that already has a replica in the cluster state. */
@Test
public void testBasicPlacementExistingCollection() throws Exception {
  final boolean collectionAlreadyExists = true;
  testBasicPlacementInternal(collectionAlreadyExists);
}
/**
 * Places one NRT replica for the single shard of a collection on a two node cluster in which node 0 hosts 1 core,
 * node 1 hosts 10 cores, and both have more than the prioritized free disk.
 * <p>
 * For a new collection the replica is expected on node 0, the node with fewer cores. For an existing collection
 * (one NRT replica already set up by the builder) the replica is expected on node 1: the node with fewer cores
 * among those not already hosting a replica of the shard.
 *
 * @param hasExistingCollection whether the collection already has a replica and is visible in the cluster state.
 */
private void testBasicPlacementInternal(boolean hasExistingCollection) throws Exception {
  final String collectionName = "basicCollection";
  final double plentyOfFreeDisk = (double) (PRIORITIZED_FREE_DISK_GB + 1);

  Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(2);
  LinkedList<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
  nodeBuilders.get(0).setCoreCount(1).setFreeDiskGB(plentyOfFreeDisk);
  nodeBuilders.get(1).setCoreCount(10).setFreeDiskGB(plentyOfFreeDisk);

  Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(collectionName);
  if (!hasExistingCollection) {
    // New collection: shards are defined but have no replicas, and it is absent from the cluster state
    collectionBuilder.initializeShardsReplicas(1, 0, 0, 0, List.of());
  } else {
    // Existing collection: its shard already has a replica and it is visible in the cluster state
    collectionBuilder.initializeShardsReplicas(1, 1, 0, 0, nodeBuilders);
    clusterBuilder.addCollection(collectionBuilder);
  }

  PlacementContext placementContext = clusterBuilder.buildPlacementContext();
  SolrCollection solrCollection = collectionBuilder.build();
  List<Node> liveNodes = clusterBuilder.buildLiveNodes();

  // Ask for one additional NRT replica of the collection's only shard
  String onlyShardName = solrCollection.shards().iterator().next().getShardName();
  PlacementRequestImpl placementRequest = new PlacementRequestImpl(solrCollection, Set.of(onlyShardName),
      new HashSet<>(liveNodes), 1, 0, 0);
  PlacementPlan placementPlan = plugin.computePlacement(placementRequest, placementContext);

  assertEquals(1, placementPlan.getReplicaPlacements().size());
  Node expectedNode = liveNodes.get(hasExistingCollection ? 1 : 0);
  ReplicaPlacement onlyPlacement = placementPlan.getReplicaPlacements().iterator().next();
  assertEquals(expectedNode, onlyPlacement.getNode());
}
/**
 * Tests that replicas are not placed on a node with low free disk (above the minimal but below the prioritized
 * threshold) unless there is no other option, and are never placed on a node below the minimal free disk threshold.
 */
@Test
public void testLowSpaceNode() throws Exception {
  String collectionName = "lowSpaceCollection";
  final int LOW_SPACE_NODE_INDEX = 0;
  final int NO_SPACE_NODE_INDEX = 1;
  // Cluster nodes and their attributes
  Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(8);
  LinkedList<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
  for (int i = 0; i < nodeBuilders.size(); i++) {
    if (i == LOW_SPACE_NODE_INDEX) {
      nodeBuilders.get(i).setCoreCount(1).setFreeDiskGB((double) (MINIMAL_FREE_DISK_GB + 1)); // Low space
    } else if (i == NO_SPACE_NODE_INDEX) {
      nodeBuilders.get(i).setCoreCount(10).setFreeDiskGB(1.0); // Really not enough space
    } else {
      nodeBuilders.get(i).setCoreCount(10).setFreeDiskGB((double) (PRIORITIZED_FREE_DISK_GB + 1));
    }
  }
  List<Node> liveNodes = clusterBuilder.buildLiveNodes();
  Node lowSpaceNode = liveNodes.get(LOW_SPACE_NODE_INDEX);
  Node noSpaceNode = liveNodes.get(NO_SPACE_NODE_INDEX);

  // The collection to create (shards are defined but no replicas)
  Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(collectionName);
  collectionBuilder.initializeShardsReplicas(3, 0, 0, 0, List.of());
  SolrCollection solrCollection = collectionBuilder.build();

  // Place two replicas of each type for each shard: 6 per shard fit on the 6 nodes with plenty of space
  PlacementRequestImpl placementRequest = new PlacementRequestImpl(solrCollection, solrCollection.getShardNames(),
      new HashSet<>(liveNodes), 2, 2, 2);
  PlacementPlan pp = plugin.computePlacement(placementRequest, clusterBuilder.buildPlacementContext());
  assertEquals(18, pp.getReplicaPlacements().size()); // 3 shards, 6 replicas total each
  Set<Pair<String, Node>> placements = new HashSet<>();
  for (ReplicaPlacement rp : pp.getReplicaPlacements()) {
    assertTrue("two replicas for same shard placed on same node", placements.add(new Pair<>(rp.getShardName(), rp.getNode())));
    assertNotEquals("Replica unnecessarily placed on node with low free space", rp.getNode(), lowSpaceNode);
    assertNotEquals("Replica placed on node with not enough free space", rp.getNode(), noSpaceNode);
  }

  // Verify that if we ask for 7 replicas, the placement will use the low free space node
  placementRequest = new PlacementRequestImpl(solrCollection, solrCollection.getShardNames(), new HashSet<>(liveNodes),
      7, 0, 0);
  pp = plugin.computePlacement(placementRequest, clusterBuilder.buildPlacementContext());
  assertEquals(21, pp.getReplicaPlacements().size()); // 3 shards, 7 replicas each
  placements = new HashSet<>();
  int lowSpacePlacements = 0;
  for (ReplicaPlacement rp : pp.getReplicaPlacements()) {
    assertEquals("Only NRT replicas should be created", Replica.ReplicaType.NRT, rp.getReplicaType());
    assertTrue("two replicas for same shard placed on same node", placements.add(new Pair<>(rp.getShardName(), rp.getNode())));
    assertNotEquals("Replica placed on node with not enough free space", rp.getNode(), noSpaceNode);
    if (rp.getNode().equals(lowSpaceNode)) {
      lowSpacePlacements++;
    }
  }
  // Only 7 nodes have enough space, so each of the 3 shards must have one replica on the low space node
  assertEquals("Low free space node should be used when there is no other option", 3, lowSpacePlacements);

  // Verify that if we ask for 8 replicas, the placement fails: only 7 nodes have enough space
  PlacementRequestImpl eightReplicasRequest = new PlacementRequestImpl(solrCollection, solrCollection.getShardNames(),
      new HashSet<>(liveNodes), 8, 0, 0);
  PlacementContext placementContext = clusterBuilder.buildPlacementContext();
  expectThrows(PlacementException.class, () -> plugin.computePlacement(eightReplicasRequest, placementContext));
}
/**
 * Tests that replicas already present in a collection are taken into account so that new replicas of a shard never
 * land on a node that already hosts a replica of that shard.
 */
@Test
public void testPlacementWithExistingReplicas() throws Exception {
  String collectionName = "existingCollection";
  // Five nodes with plenty of free disk; node i hosts 10 * (i + 1) cores (10, 20, 30, 40, 50)
  Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(5);
  LinkedList<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
  for (int i = 0; i < nodeBuilders.size(); i++) {
    nodeBuilders.get(i).setCoreCount(10 * (i + 1)).setFreeDiskGB((double) (PRIORITIZED_FREE_DISK_GB + 1));
  }

  // Shard 1 deliberately has two replicas on node 0, a state the placement plugin itself would never produce.
  // The plugin should still be able to add replicas as long as the new ones don't break the rules.
  List<List<String>> shardsReplicas = List.of(
      List.of("NRT 0", "TLOG 0", "NRT 3"),  // shard 1
      List.of("NRT 1", "NRT 3", "TLOG 2")); // shard 2
  Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(collectionName);
  collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
  SolrCollection solrCollection = collectionBuilder.build();
  List<Node> liveNodes = clusterBuilder.buildLiveNodes();

  // One more NRT and one more TLOG replica for each shard. Each must go to a node without a replica of the shard,
  // preferring the node with the fewest cores. NRT replicas are placed first, so this cluster state admits exactly
  // one correct placement (which keeps the test deterministic).
  PlacementRequestImpl placementRequest = new PlacementRequestImpl(solrCollection, solrCollection.getShardNames(),
      new HashSet<>(liveNodes), 1, 1, 0);
  PlacementPlan placementPlan = plugin.computePlacement(placementRequest, clusterBuilder.buildPlacementContext());

  // Expected placements, each as "shard replica-type node"
  verifyPlacements(Set.of("1 NRT 1", "1 TLOG 2", "2 NRT 0", "2 TLOG 4"), placementPlan,
      collectionBuilder.getShardBuilders(), liveNodes);
}
/**
 * Tests placement with multiple criteria combined: nodes restricted to some replica types, availability zones, free
 * disk and an existing collection. Nine nodes are spread over three AZs; in each AZ, two nodes only accept NRT
 * replicas (one with fewer cores than the other) and one node accepts TLOG or PULL replicas.
 */
@Test
public void testPlacementMultiCriteria() throws Exception {
String collectionName = "multiCollection";
// Note: node numbering deliberately does not follow the AZ structure
final int AZ1_NRT_LOWCORES = 0;
final int AZ1_NRT_HIGHCORES = 3;
final int AZ1_TLOGPULL_LOWFREEDISK = 5;
final int AZ2_NRT_MEDCORES = 2;
final int AZ2_NRT_HIGHCORES = 1;
final int AZ2_TLOGPULL = 7;
final int AZ3_NRT_LOWCORES = 4;
final int AZ3_NRT_HIGHCORES = 6;
final int AZ3_TLOGPULL = 8;
final String AZ1 = "AZ1";
final String AZ2 = "AZ2";
final String AZ3 = "AZ3";
final int LOW_CORES = 10;
final int MED_CORES = 50;
final int HIGH_CORES = 100;
// Values of the REPLICA_TYPE_SYSPROP; mixed case and spacing are used on purpose
final String TLOG_PULL_REPLICA_TYPE = "TLOG, PULL";
final String NRT_REPLICA_TYPE = "Nrt";
// Cluster nodes and their attributes.
// 3 AZs with three nodes each: 2 of them can only take NRT replicas, one can take TLOG or PULL replicas.
// In each AZ, one of the NRT nodes has fewer cores than the other.
// The TLOG/PULL node in AZ1 has less free disk than PRIORITIZED_FREE_DISK_GB (but more than MINIMAL_FREE_DISK_GB).
Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(9);
LinkedList<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
for (int i = 0; i < 9; i++) {
final String az;
final int numcores;
final double freedisk;
final String acceptedReplicaType;
if (i == AZ1_NRT_LOWCORES || i == AZ1_NRT_HIGHCORES || i == AZ1_TLOGPULL_LOWFREEDISK) {
az = AZ1;
} else if (i == AZ2_NRT_HIGHCORES || i == AZ2_NRT_MEDCORES || i == AZ2_TLOGPULL) {
az = AZ2;
} else {
az = AZ3;
}
if (i == AZ1_NRT_LOWCORES || i == AZ3_NRT_LOWCORES) {
numcores = LOW_CORES;
} else if (i == AZ2_NRT_MEDCORES) {
numcores = MED_CORES;
} else {
numcores = HIGH_CORES;
}
if (i == AZ1_TLOGPULL_LOWFREEDISK) {
freedisk = PRIORITIZED_FREE_DISK_GB - 10;
} else {
freedisk = PRIORITIZED_FREE_DISK_GB + 10;
}
if (i == AZ1_TLOGPULL_LOWFREEDISK || i == AZ2_TLOGPULL || i == AZ3_TLOGPULL) {
acceptedReplicaType = TLOG_PULL_REPLICA_TYPE;
} else {
acceptedReplicaType = NRT_REPLICA_TYPE;
}
nodeBuilders.get(i).setSysprop(AffinityPlacementFactory.AVAILABILITY_ZONE_SYSPROP, az)
.setSysprop(AffinityPlacementFactory.REPLICA_TYPE_SYSPROP, acceptedReplicaType)
.setCoreCount(numcores)
.setFreeDiskGB(freedisk);
}
// The collection already exists with shards and replicas.
Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(collectionName);
List<List<String>> shardsReplicas = List.of(
List.of("NRT " + AZ1_NRT_HIGHCORES, "TLOG " + AZ3_TLOGPULL), // shard 1
List.of("TLOG " + AZ2_TLOGPULL)); // shard 2
collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
SolrCollection solrCollection = collectionBuilder.build();
List<Node> liveNodes = clusterBuilder.buildLiveNodes();
// Add 2 NRT replicas and 1 TLOG replica to each shard.
PlacementRequestImpl placementRequest = new PlacementRequestImpl(solrCollection, solrCollection.getShardNames(), new HashSet<>(liveNodes),
2, 1, 0);
PlacementPlan pp = plugin.computePlacement(placementRequest, clusterBuilder.buildPlacementContext());
// Shard 1: the NRTs should go to the med cores node in AZ2 and the low cores node in AZ3 (even though a low cores
// node in AZ1 could take a replica, there's already an NRT replica in AZ1 and we want spreading across AZs).
// The TLOG should go to the TLOG node in AZ2 (because the TLOG node in AZ1 has low free disk).
// Shard 2: the NRTs should go to the low cores nodes in AZ1 and AZ3 because the AZ2 NRT nodes have more cores (and
// there's no NRT replica in any AZ for this shard). The TLOG should go to AZ3 because the AZ1 TLOG node has low free disk.
// Each expected placement is represented as a string "shard replica-type node"
Set<String> expectedPlacements = Set.of("1 NRT " + AZ2_NRT_MEDCORES, "1 NRT " + AZ3_NRT_LOWCORES, "1 TLOG " + AZ2_TLOGPULL,
"2 NRT " + AZ1_NRT_LOWCORES, "2 NRT " + AZ3_NRT_LOWCORES, "2 TLOG " + AZ3_TLOGPULL);
verifyPlacements(expectedPlacements, pp, collectionBuilder.getShardBuilders(), liveNodes);
// If we instead add 2 PULL replicas to each shard (starting from the same initial cluster state)
placementRequest = new PlacementRequestImpl(solrCollection, solrCollection.getShardNames(), new HashSet<>(liveNodes),
0, 0, 2);
pp = plugin.computePlacement(placementRequest, clusterBuilder.buildPlacementContext());
// Shard 1: since node AZ3_TLOGPULL already has the shard's TLOG replica, the PULLs should go to AZ1_TLOGPULL_LOWFREEDISK and AZ2_TLOGPULL
// Shard 2: similarly AZ2_TLOGPULL is taken, so the PULLs should go to AZ1_TLOGPULL_LOWFREEDISK and AZ3_TLOGPULL
expectedPlacements = Set.of("1 PULL " + AZ1_TLOGPULL_LOWFREEDISK, "1 PULL " + AZ2_TLOGPULL,
"2 PULL " + AZ1_TLOGPULL_LOWFREEDISK, "2 PULL " + AZ3_TLOGPULL);
verifyPlacements(expectedPlacements, pp, collectionBuilder.getShardBuilders(), liveNodes);
}
/**
 * Tests placement for a new collection on nodes with a varying number of cores spread over multiple AZs. Placing 1
 * to 9 NRT replicas must grow the placement one replica at a time, in a fixed order.
 */
@Test
public void testPlacementAzsCores() throws Exception {
  String collectionName = "coresAzsCollection";
  final int nodeCount = 9;
  // Node i hosts i cores and lives in AZ "AZ" + (i / 3): AZ0 = {0, 1, 2}, AZ1 = {3, 4, 5}, AZ2 = {6, 7, 8}
  Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(nodeCount);
  LinkedList<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
  for (int nodeIndex = 0; nodeIndex < nodeCount; nodeIndex++) {
    Builders.NodeBuilder nodeBuilder = nodeBuilders.get(nodeIndex);
    nodeBuilder.setSysprop(AffinityPlacementFactory.AVAILABILITY_ZONE_SYSPROP, "AZ" + (nodeIndex / 3))
        .setCoreCount(nodeIndex)
        .setFreeDiskGB((double) (PRIORITIZED_FREE_DISK_GB + 10));
  }

  // The collection does not exist yet and has a single shard without replicas
  Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(collectionName);
  List<List<String>> shardsReplicas = List.of(List.of());
  collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
  SolrCollection solrCollection = collectionBuilder.build();
  List<Node> liveNodes = clusterBuilder.buildLiveNodes();

  // Expected order in which nodes receive replicas: the least loaded node of each AZ in turn, then the next least
  // loaded node of each AZ, and finally the most loaded ones
  final int[] expectedNodeOrder = {0, 3, 6, 1, 4, 7, 2, 5, 8};
  Set<String> expectedPlacements = new HashSet<>();
  for (int countNrtToPlace = 1; countNrtToPlace <= nodeCount; countNrtToPlace++) {
    // Placing one more replica adds exactly the next node in the expected order
    expectedPlacements.add("1 NRT " + expectedNodeOrder[countNrtToPlace - 1]);
    PlacementRequestImpl placementRequest = new PlacementRequestImpl(solrCollection, solrCollection.getShardNames(),
        new HashSet<>(liveNodes), countNrtToPlace, 0, 0);
    PlacementPlan placementPlan = plugin.computePlacement(placementRequest, clusterBuilder.buildPlacementContext());
    verifyPlacements(expectedPlacements, placementPlan, collectionBuilder.getShardBuilders(), liveNodes);
  }
}
/**
 * Tests that placement of new replicas still works when a collection has replicas on nodes that are not live.
 */
@Test
public void testCollectionOnDeadNodes() throws Exception {
  String collectionName = "walkingDead";
  // Three live nodes with plenty of free disk; node i hosts i cores
  Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(3);
  LinkedList<Builders.NodeBuilder> nodeBuilders = clusterBuilder.getLiveNodeBuilders();
  for (int i = 0; i < nodeBuilders.size(); i++) {
    nodeBuilders.get(i).setCoreCount(i).setFreeDiskGB((double) (PRIORITIZED_FREE_DISK_GB + 1));
  }

  // Existing collection whose shard 1 only has replicas on dead nodes (10 and 11) and whose shard 2 has no replica
  // at all. That is hard to recover from, but the placement computations should still run without trouble.
  List<List<String>> shardsReplicas = List.of(
      List.of("NRT 10", "TLOG 11"), // shard 1
      List.of());                   // shard 2
  Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(collectionName);
  collectionBuilder.customCollectionSetup(shardsReplicas, nodeBuilders);
  SolrCollection solrCollection = collectionBuilder.build();
  List<Node> liveNodes = clusterBuilder.buildLiveNodes();

  Iterator<Shard> shardIterator = solrCollection.iterator();
  String firstShardName = shardIterator.next().getShardName();
  String secondShardName = shardIterator.next().getShardName();

  // One PULL replica for shard 1: node 0 hosts fewer cores than node 1 (0 vs 1), so it should go there.
  // Expected placements are strings "shard replica-type node".
  PlacementPlan placementPlan = plugin.computePlacement(
      new PlacementRequestImpl(solrCollection, Set.of(firstShardName), new HashSet<>(liveNodes), 0, 0, 1),
      clusterBuilder.buildPlacementContext());
  verifyPlacements(Set.of("1 PULL 0"), placementPlan, collectionBuilder.getShardBuilders(), liveNodes);

  // Starting again from the same initial cluster state (the placement above is not applied), one PULL replica for
  // shard 2 should also go to node 0, the node with the fewest cores
  placementPlan = plugin.computePlacement(
      new PlacementRequestImpl(solrCollection, Set.of(secondShardName), new HashSet<>(liveNodes), 0, 0, 1),
      clusterBuilder.buildPlacementContext());
  verifyPlacements(Set.of("2 PULL 0"), placementPlan, collectionBuilder.getShardBuilders(), liveNodes);
}
/**
 * Verifies that a computed set of placements matches the expected placements on nodes.
 * @param expectedPlacements a set of strings of the form {@code "1 NRT 3"} where 1 is the shard index, NRT the
 *                           replica type and 3 the node on which the replica is placed. Shards are 1-based, nodes 0-based.<p>
 *                           Read carefully: <b>shard index</b> and not shard name. The index is the <b>order</b> of
 *                           shards as defined for the collection in the call to
 *                           {@link org.apache.solr.cluster.placement.Builders.CollectionBuilder#customCollectionSetup(List, List)}
 * @param placementPlan the plan computed by the plugin under test.
 * @param shardBuilders the shard builders, used to get the shard names by index (1-based) rather than by parsing
 *                      the shard names (which would break if the shard naming scheme changed).
 * @param liveNodes the live nodes of the cluster. Their order in this list defines the 0-based node index used in
 *                  {@code expectedPlacements}.
 */
private static void verifyPlacements(Set<String> expectedPlacements, PlacementPlan placementPlan,
                                     List<Builders.ShardBuilder> shardBuilders, List<Node> liveNodes) {
  Set<ReplicaPlacement> computedPlacements = placementPlan.getReplicaPlacements();
  // Lookup structures from shard name to 1-based shard index, and from node to 0-based node index
  Map<String, Integer> shardNumbering = new HashMap<>();
  int index = 1; // first shard is 1 not 0
  for (Builders.ShardBuilder sb : shardBuilders) {
    shardNumbering.put(sb.getShardName(), index++);
  }
  Map<Node, Integer> nodeNumbering = new HashMap<>();
  index = 0;
  for (Node n : liveNodes) {
    nodeNumbering.put(n, index++);
  }
  if (expectedPlacements.size() != computedPlacements.size()) {
    fail("Wrong number of placements, expected " + expectedPlacements.size() + " computed " + computedPlacements.size() + ". " +
        getExpectedVsComputedPlacement(expectedPlacements, computedPlacements, shardNumbering, nodeNumbering));
  }
  // Sizes are equal, so removing each computed placement from a copy of the expected set detects any mismatch,
  // including two computed placements described by the same string
  Set<String> expected = new HashSet<>(expectedPlacements);
  for (ReplicaPlacement p : computedPlacements) {
    String placementDescription = describePlacement(p, shardNumbering, nodeNumbering);
    if (!expected.remove(placementDescription)) {
      fail("Computed placement [" + placementDescription + "] not expected. " +
          getExpectedVsComputedPlacement(expectedPlacements, computedPlacements, shardNumbering, nodeNumbering));
    }
  }
}

/**
 * Describes a computed placement in the {@code "shardIndex REPLICA_TYPE nodeIndex"} format used for expected
 * placements. An unknown shard or node shows up as {@code null}.
 */
private static String describePlacement(ReplicaPlacement placement, Map<String, Integer> shardNumbering,
                                        Map<Node, Integer> nodeNumbering) {
  return shardNumbering.get(placement.getShardName()) + " " + placement.getReplicaType().name() + " "
      + nodeNumbering.get(placement.getNode());
}

/**
 * Builds a diagnostic message listing expected and computed placements. Both lists are sorted so they can be
 * compared side by side.
 */
private static String getExpectedVsComputedPlacement(Set<String> expectedPlacements, Set<ReplicaPlacement> computedPlacements,
                                                     Map<String, Integer> shardNumbering, Map<Node, Integer> nodeNumbering) {
  List<String> expected = new ArrayList<>(expectedPlacements);
  Collections.sort(expected);
  // A list (not a set) so that duplicated descriptions remain visible in the message
  List<String> computed = new ArrayList<>(computedPlacements.size());
  for (ReplicaPlacement placement : computedPlacements) {
    computed.add(describePlacement(placement, shardNumbering, nodeNumbering));
  }
  Collections.sort(computed);

  StringBuilder sb = new StringBuilder("Expected placement: ");
  for (String placement : expected) {
    sb.append("[").append(placement).append("] ");
  }
  sb.append("Computed placement: ");
  for (String placement : computed) {
    sb.append("[").append(placement).append("] ");
  }
  return sb.toString();
}
/**
 * Tests that, for each shard and replica type, replicas are spread over both availability zones of a 6 node cluster
 * in which the first half of the nodes are in "az1" and the other half in "az2".
 */
@Test
public void testAvailabilityZones() throws Exception {
  String collectionName = "azCollection";
  int numNodes = 6;
  Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(numNodes);
  for (int i = 0; i < numNodes; i++) {
    Builders.NodeBuilder nodeBuilder = clusterBuilder.getLiveNodeBuilders().get(i);
    nodeBuilder.setCoreCount(0);
    nodeBuilder.setFreeDiskGB(100.0);
    if (i < numNodes / 2) {
      nodeBuilder.setSysprop(AffinityPlacementFactory.AVAILABILITY_ZONE_SYSPROP, "az1");
    } else {
      nodeBuilder.setSysprop(AffinityPlacementFactory.AVAILABILITY_ZONE_SYSPROP, "az2");
    }
  }
  Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(collectionName);
  collectionBuilder.initializeShardsReplicas(2, 0, 0, 0, clusterBuilder.getLiveNodeBuilders());
  clusterBuilder.addCollection(collectionBuilder);
  PlacementContext placementContext = clusterBuilder.buildPlacementContext();
  Cluster cluster = placementContext.getCluster();
  SolrCollection solrCollection = cluster.getCollection(collectionName);
  PlacementRequestImpl placementRequest = new PlacementRequestImpl(solrCollection,
      StreamSupport.stream(solrCollection.shards().spliterator(), false)
          .map(Shard::getShardName).collect(Collectors.toSet()),
      cluster.getLiveNodes(), 2, 2, 2);
  PlacementPlan pp = plugin.computePlacement(placementRequest, placementContext);
  // 2 shards, 6 replicas each (2 NRT, 2 TLOG, 2 PULL)
  assertEquals(12, pp.getReplicaPlacements().size());
  // replica type -> shard -> AZ -> replica count
  Map<Replica.ReplicaType, Map<String, Map<String, AtomicInteger>>> replicas = new HashMap<>();
  AttributeValues attributeValues = placementContext.getAttributeFetcher().fetchAttributes();
  for (ReplicaPlacement rp : pp.getReplicaPlacements()) {
    String az = attributeValues.getSystemProperty(rp.getNode(), AffinityPlacementFactory.AVAILABILITY_ZONE_SYSPROP)
        .orElseThrow(() -> new AssertionError("missing AZ sysprop for node " + rp.getNode()));
    replicas.computeIfAbsent(rp.getReplicaType(), type -> new HashMap<>())
        .computeIfAbsent(rp.getShardName(), shard -> new HashMap<>())
        .computeIfAbsent(az, zone -> new AtomicInteger()).incrementAndGet();
  }
  // Each (replica type, shard) pair must have replicas in both AZs
  replicas.forEach((type, perTypeReplicas) -> {
    perTypeReplicas.forEach((shard, azCounts) -> {
      assertEquals("number of AZs", 2, azCounts.size());
      azCounts.forEach((az, count) -> {
        assertTrue("too few replicas shard=" + shard + ", type=" + type + ", az=" + az,
            count.get() >= 1);
      });
    });
  });
}
/**
 * Tests that replicas only land on nodes accepting their type. Group "one" nodes accept NRT and TLOG replicas, group
 * "two" nodes accept PULL replicas (plus an unknown "foobar" type). Replica type values use mixed case and spacing
 * on purpose.
 */
@Test
public void testReplicaType() throws Exception {
  String collectionName = "replicaTypeCollection";
  int numNodes = 6;
  Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(numNodes);
  for (int i = 0; i < numNodes; i++) {
    Builders.NodeBuilder nodeBuilder = clusterBuilder.getLiveNodeBuilders().get(i);
    nodeBuilder.setCoreCount(0);
    nodeBuilder.setFreeDiskGB(100.0);
    if (i < numNodes / 3 * 2) {
      nodeBuilder.setSysprop(AffinityPlacementFactory.REPLICA_TYPE_SYSPROP, "Nrt, TlOg");
      nodeBuilder.setSysprop("group", "one");
    } else {
      nodeBuilder.setSysprop(AffinityPlacementFactory.REPLICA_TYPE_SYSPROP, "Pull,foobar");
      nodeBuilder.setSysprop("group", "two");
    }
  }
  Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(collectionName);
  collectionBuilder.initializeShardsReplicas(2, 0, 0, 0, clusterBuilder.getLiveNodeBuilders());
  clusterBuilder.addCollection(collectionBuilder);
  PlacementContext placementContext = clusterBuilder.buildPlacementContext();
  Cluster cluster = placementContext.getCluster();
  SolrCollection solrCollection = cluster.getCollection(collectionName);
  PlacementRequestImpl placementRequest = new PlacementRequestImpl(solrCollection,
      StreamSupport.stream(solrCollection.shards().spliterator(), false)
          .map(Shard::getShardName).collect(Collectors.toSet()),
      cluster.getLiveNodes(), 2, 2, 2);
  PlacementPlan pp = plugin.computePlacement(placementRequest, placementContext);
  // 2 shards, 6 replicas each (2 NRT, 2 TLOG, 2 PULL)
  assertEquals(12, pp.getReplicaPlacements().size());
  // replica type -> shard -> group -> replica count
  Map<Replica.ReplicaType, Map<String, Map<String, AtomicInteger>>> replicas = new HashMap<>();
  AttributeValues attributeValues = placementContext.getAttributeFetcher().fetchAttributes();
  for (ReplicaPlacement rp : pp.getReplicaPlacements()) {
    String group = attributeValues.getSystemProperty(rp.getNode(), "group")
        .orElseThrow(() -> new AssertionError("missing group sysprop for node " + rp.getNode()));
    if (group.equals("one")) {
      assertTrue("wrong replica type in group one",
          rp.getReplicaType() == Replica.ReplicaType.NRT || rp.getReplicaType() == Replica.ReplicaType.TLOG);
    } else {
      assertEquals("wrong replica type in group two", Replica.ReplicaType.PULL, rp.getReplicaType());
    }
    replicas.computeIfAbsent(rp.getReplicaType(), type -> new HashMap<>())
        .computeIfAbsent(rp.getShardName(), shard -> new HashMap<>())
        .computeIfAbsent(group, g -> new AtomicInteger()).incrementAndGet();
  }
  // All replicas of a given (replica type, shard) pair must be within a single group
  replicas.forEach((type, perTypeReplicas) -> {
    perTypeReplicas.forEach((shard, groupCounts) -> {
      assertEquals("number of groups", 1, groupCounts.size());
      groupCounts.forEach((group, count) -> {
        assertTrue("too few replicas shard=" + shard + ", type=" + type + ", group=" + group,
            count.get() >= 1);
      });
    });
  });
}
/**
 * Tests that no replica is placed on a node whose free disk is below the minimal free disk configured on the plugin.
 */
@Test
public void testFreeDiskConstraints() throws Exception {
  String collectionName = "freeDiskCollection";
  int numNodes = 3;
  Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(numNodes);
  Node smallNode = null;
  for (int i = 0; i < numNodes; i++) {
    Builders.NodeBuilder nodeBuilder = clusterBuilder.getLiveNodeBuilders().get(i);
    nodeBuilder.setCoreCount(0);
    if (i == 0) {
      // 1 GB is below MINIMAL_FREE_DISK_GB, the threshold the plugin is configured with in setupPlugin()
      nodeBuilder.setFreeDiskGB(1.0);
      smallNode = nodeBuilder.build();
    } else {
      nodeBuilder.setFreeDiskGB(100.0);
    }
  }
  Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(collectionName);
  collectionBuilder.initializeShardsReplicas(2, 0, 0, 0, clusterBuilder.getLiveNodeBuilders());
  clusterBuilder.addCollection(collectionBuilder);
  PlacementContext placementContext = clusterBuilder.buildPlacementContext();
  Cluster cluster = placementContext.getCluster();
  SolrCollection solrCollection = cluster.getCollection(collectionName);
  PlacementRequestImpl placementRequest = new PlacementRequestImpl(solrCollection,
      StreamSupport.stream(solrCollection.shards().spliterator(), false)
          .map(Shard::getShardName).collect(Collectors.toSet()),
      cluster.getLiveNodes(), 1, 0, 1);
  PlacementPlan pp = plugin.computePlacement(placementRequest, placementContext);
  // 2 shards, 1 NRT and 1 PULL replica each
  assertEquals(4, pp.getReplicaPlacements().size());
  for (ReplicaPlacement rp : pp.getReplicaPlacements()) {
    assertNotEquals("should not put any replicas on " + smallNode, smallNode, rp.getNode());
  }
}
/**
 * Tests that replicas of a collection declaring {@link AffinityPlacementConfig#WITH_COLLECTION_PROPERTY} are only
 * placed on nodes hosting replicas of the referenced (secondary) collection. It also tests that placement fails when
 * more replicas are requested than such nodes can take.
 */
@Test
public void testWithCollectionPlacement() throws Exception {
  int numNodes = 3;
  Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(numNodes);
  Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(secondaryCollectionName);
  collectionBuilder.initializeShardsReplicas(1, 2, 0, 0, clusterBuilder.getLiveNodeBuilders());
  clusterBuilder.addCollection(collectionBuilder);
  collectionBuilder = Builders.newCollectionBuilder(primaryCollectionName);
  collectionBuilder.initializeShardsReplicas(0, 0, 0, 0, clusterBuilder.getLiveNodeBuilders());
  collectionBuilder.addCustomProperty(AffinityPlacementConfig.WITH_COLLECTION_PROPERTY, secondaryCollectionName);
  clusterBuilder.addCollection(collectionBuilder);
  PlacementContext placementContext = clusterBuilder.buildPlacementContext();
  Cluster cluster = placementContext.getCluster();
  SolrCollection secondaryCollection = cluster.getCollection(secondaryCollectionName);
  SolrCollection primaryCollection = cluster.getCollection(primaryCollectionName);
  Set<Node> secondaryNodes = new HashSet<>();
  secondaryCollection.shards().forEach(s -> s.replicas().forEach(r -> secondaryNodes.add(r.getNode())));
  PlacementRequestImpl placementRequest = new PlacementRequestImpl(primaryCollection,
      Set.of("shard1", "shard2"), cluster.getLiveNodes(), 1, 0, 0);
  PlacementPlan pp = plugin.computePlacement(placementRequest, placementContext);
  assertEquals(2, pp.getReplicaPlacements().size());
  // verify that all placements are on nodes with the secondary replica
  pp.getReplicaPlacements().forEach(placement ->
      assertTrue("placement node " + placement.getNode() + " not in secondary=" + secondaryNodes,
          secondaryNodes.contains(placement.getNode())));

  // The secondary collection only has 2 replicas, so 3 replicas of shard1 presumably cannot all be placed on nodes
  // hosting a secondary replica
  PlacementRequestImpl tooManyReplicasRequest = new PlacementRequestImpl(primaryCollection,
      Set.of("shard1"), cluster.getLiveNodes(), 3, 0, 0);
  PlacementException pe = expectThrows(PlacementException.class,
      () -> plugin.computePlacement(tooManyReplicasRequest, placementContext));
  assertTrue("expected 'Not enough eligible nodes' failure, got: " + pe,
      pe.toString().contains("Not enough eligible nodes"));
}
/**
 * Verifies {@code verifyAllowedModification} for replica deletions on a collection that another
 * collection references through {@link AffinityPlacementConfig#WITH_COLLECTION_PROPERTY}.
 *
 * <p>Two live nodes host a secondary collection (1 shard, 4 replicas) and a primary collection
 * (2 shards, 2 replicas each) whose withCollection property names the secondary collection.
 * Deleting one secondary replica from a node must be allowed. Deleting two secondary replicas from
 * that same node must be rejected with a {@link PlacementException}. NOTE(review): presumably
 * because the node would keep primary replicas without any co-located secondary replica; the test
 * only checks that the exception is thrown.
 */
@Test
public void testWithCollectionModificationRejected() throws Exception {
  int numNodes = 2;
  Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(numNodes);
  Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(secondaryCollectionName);
  collectionBuilder.initializeShardsReplicas(1, 4, 0, 0, clusterBuilder.getLiveNodeBuilders());
  clusterBuilder.addCollection(collectionBuilder);
  collectionBuilder = Builders.newCollectionBuilder(primaryCollectionName);
  collectionBuilder.initializeShardsReplicas(2, 2, 0, 0, clusterBuilder.getLiveNodeBuilders());
  collectionBuilder.addCustomProperty(AffinityPlacementConfig.WITH_COLLECTION_PROPERTY, secondaryCollectionName);
  clusterBuilder.addCollection(collectionBuilder);
  PlacementContext placementContext = clusterBuilder.buildPlacementContext();
  Cluster cluster = placementContext.getCluster();
  SolrCollection secondaryCollection = cluster.getCollection(secondaryCollectionName);
  Node node = cluster.getLiveNodes().iterator().next();

  // Step 1: deleting a single secondary replica hosted on `node` must be allowed.
  Set<Replica> secondaryReplicas = new HashSet<>();
  secondaryCollection.shards().forEach(shard ->
      shard.replicas().forEach(replica -> {
        if (secondaryReplicas.size() < 1 && replica.getNode().equals(node)) {
          secondaryReplicas.add(replica);
        }
      }));
  assertEquals("no secondary replica found on " + node, 1, secondaryReplicas.size());
  DeleteReplicasRequest deleteReplicasRequest = ModificationRequestImpl.createDeleteReplicasRequest(secondaryCollection, secondaryReplicas);
  try {
    plugin.verifyAllowedModification(deleteReplicasRequest, placementContext);
  } catch (PlacementException pe) {
    // Keep the plugin's exception as the cause so its stack trace appears in the test report.
    throw new AssertionError("should have succeeded: " + pe, pe);
  }

  // Step 2: deleting two secondary replicas hosted on the same node must be rejected.
  secondaryCollection.shards().forEach(shard ->
      shard.replicas().forEach(replica -> {
        if (secondaryReplicas.size() < 2 && replica.getNode().equals(node)) {
          secondaryReplicas.add(replica);
        }
      }));
  // Without two replicas on the node, step 2 would just repeat step 1's request.
  assertEquals("expected two secondary replicas on " + node, 2, secondaryReplicas.size());
  deleteReplicasRequest = ModificationRequestImpl.createDeleteReplicasRequest(secondaryCollection, secondaryReplicas);
  try {
    plugin.verifyAllowedModification(deleteReplicasRequest, placementContext);
    fail("should have failed: " + deleteReplicasRequest);
  } catch (PlacementException expected) {
    // Rejection is the expected outcome; only the exception type is checked here.
  }
}
/**
 * Logs placement-computation timings while scaling one dimension at a time.
 *
 * <p>The three series are:
 * <ul>
 *   <li>the number of live nodes (100 shards, 40 NRT + 40 TLOG + 20 PULL replicas per shard);</li>
 *   <li>the number of shards (5000 nodes, same replica mix);</li>
 *   <li>the number of NRT replicas per shard (5000 nodes, 100 shards, no TLOG/PULL).</li>
 * </ul>
 * Every scenario is delegated to {@code runTestScalability}, which also asserts on the placements.
 */
@Test @Slow
public void testScalability() throws Exception {
  final int[] nodeCounts = {1000, 2000, 5000, 10000, 20000};
  final int[] shardCounts = {100, 200, 500, 1000, 2000};
  final int[] nrtCounts = {100, 200, 500, 1000, 2000};

  log.info("==== numNodes ====");
  for (int nodes : nodeCounts) {
    runTestScalability(nodes, 100, 40, 40, 20);
  }

  log.info("==== numShards ====");
  for (int shards : shardCounts) {
    runTestScalability(5000, shards, 40, 40, 20);
  }

  log.info("==== numReplicas ====");
  for (int nrt : nrtCounts) {
    runTestScalability(5000, 100, nrt, 0, 0);
  }
}
/**
 * Runs a single placement-scalability scenario and checks the computed placements.
 *
 * <p>Builds a cluster of {@code numNodes} live nodes (each with 0 cores and {@code numNodes} GB of
 * free disk) and a collection with {@code numShards} shards and no replicas. It then asks the
 * plugin to place the requested replicas for every shard and logs the elapsed time. Finally it
 * verifies:
 * <ul>
 *   <li>the total number of placements;</li>
 *   <li>that every used node holds between floor(total/nodes) and ceil(total/nodes) replicas
 *       (exactly one when there are no more replicas than nodes);</li>
 *   <li>that no node holds two replicas of the same shard;</li>
 *   <li>the per-shard replica count;</li>
 *   <li>the per-type replica counts.</li>
 * </ul>
 *
 * @param numNodes number of live nodes in the simulated cluster
 * @param numShards number of shards to place replicas for
 * @param nrtReplicas NRT replicas requested per shard
 * @param tlogReplicas TLOG replicas requested per shard
 * @param pullReplicas PULL replicas requested per shard
 */
private void runTestScalability(int numNodes, int numShards, int nrtReplicas, int tlogReplicas, int pullReplicas) throws Exception {
  String collectionName = "scaleCollection";
  Builders.ClusterBuilder clusterBuilder = Builders.newClusterBuilder().initializeLiveNodes(numNodes);
  // Iterate instead of calling get(i): the node builders are held in a LinkedList, where indexed
  // access is O(i), which made this setup loop quadratic in numNodes.
  for (Builders.NodeBuilder nodeBuilder : clusterBuilder.getLiveNodeBuilders()) {
    nodeBuilder.setCoreCount(0).setFreeDiskGB((double) numNodes);
  }
  Builders.CollectionBuilder collectionBuilder = Builders.newCollectionBuilder(collectionName);
  collectionBuilder.initializeShardsReplicas(numShards, 0, 0, 0, List.of());
  PlacementContext placementContext = clusterBuilder.buildPlacementContext();
  SolrCollection solrCollection = collectionBuilder.build();
  List<Node> liveNodes = clusterBuilder.buildLiveNodes();

  // Place replicas for all the shards of the (newly created since it has no replicas yet) collection
  PlacementRequestImpl placementRequest = new PlacementRequestImpl(solrCollection, solrCollection.getShardNames(),
      new HashSet<>(liveNodes), nrtReplicas, tlogReplicas, pullReplicas);

  long start = System.nanoTime();
  PlacementPlan pp = plugin.computePlacement(placementRequest, placementContext);
  long end = System.nanoTime();

  final int expectedPerShard = nrtReplicas + tlogReplicas + pullReplicas;
  final int totalReplicas = numShards * expectedPerShard;
  log.info("ComputePlacement: {} nodes, {} shards, {} total replicas, elapsed time {} ms.", numNodes, numShards, totalReplicas, TimeUnit.NANOSECONDS.toMillis(end - start)); //nowarn
  assertEquals("incorrect number of calculated placements", totalReplicas,
      pp.getReplicaPlacements().size());

  // check that replicas are correctly placed
  Map<Node, AtomicInteger> replicasPerNode = new HashMap<>();
  Map<Node, Set<String>> shardsPerNode = new HashMap<>();
  Map<String, AtomicInteger> replicasPerShard = new HashMap<>();
  Map<Replica.ReplicaType, AtomicInteger> replicasByType = new HashMap<>();
  for (ReplicaPlacement placement : pp.getReplicaPlacements()) {
    replicasPerNode.computeIfAbsent(placement.getNode(), n -> new AtomicInteger()).incrementAndGet();
    shardsPerNode.computeIfAbsent(placement.getNode(), n -> new HashSet<>()).add(placement.getShardName());
    replicasByType.computeIfAbsent(placement.getReplicaType(), t -> new AtomicInteger()).incrementAndGet();
    replicasPerShard.computeIfAbsent(placement.getShardName(), s -> new AtomicInteger()).incrementAndGet();
  }

  // Allowed per-node range: floor/ceil of total/nodes. Both bounds are 1 when there are no more
  // replicas than nodes. When total is a multiple of numNodes the bounds coincide, so every used
  // node must hold exactly total / numNodes replicas.
  final int minPerNode = totalReplicas > numNodes ? totalReplicas / numNodes : 1;
  final int maxPerNode = totalReplicas > numNodes ? (totalReplicas + numNodes - 1) / numNodes : 1;
  replicasPerNode.forEach((node, count) -> {
    int onNode = count.get();
    assertTrue("node " + node + " has " + onNode + " replicas, expected between "
            + minPerNode + " and " + maxPerNode,
        onNode >= minPerNode && onNode <= maxPerNode);
    // Each replica on a node must belong to a different shard.
    assertEquals("node " + node + " holds several replicas of the same shard",
        onNode, shardsPerNode.get(node).size());
  });
  replicasPerShard.forEach((shard, count) ->
      assertEquals("incorrect number of replicas for shard " + shard, expectedPerShard, count.get()));

  // Types that were not requested have no map entry, hence the zero default.
  assertEquals("incorrect number of NRT replicas", numShards * nrtReplicas,
      replicasByType.getOrDefault(Replica.ReplicaType.NRT, new AtomicInteger()).get());
  assertEquals("incorrect number of TLOG replicas", numShards * tlogReplicas,
      replicasByType.getOrDefault(Replica.ReplicaType.TLOG, new AtomicInteger()).get());
  assertEquals("incorrect number of PULL replicas", numShards * pullReplicas,
      replicasByType.getOrDefault(Replica.ReplicaType.PULL, new AtomicInteger()).get());
}
}