blob: 9564f05de63f589aa805de3c40731f161ec425d2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.cloud.autoscaling;
import java.io.IOException;
import java.io.StringWriter;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.apache.solr.SolrTestCaseJ4;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrResponse;
import org.apache.solr.client.solrj.V2RequestSupport;
import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.DistributedQueueFactory;
import org.apache.solr.client.solrj.cloud.NodeStateProvider;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester.Hint;
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.client.solrj.impl.SolrClientNodeStateProvider;
import org.apache.solr.client.solrj.request.CollectionAdminRequest;
import org.apache.solr.cloud.api.collections.Assign;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ReplicaPosition;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.rule.ImplicitSnitch;
import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CollectionParams.CollectionAction;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.JsonTextWriter;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.ObjectCache;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.SolrJSONWriter;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.apache.solr.common.util.ValidatingJsonMap;
import org.apache.solr.response.JSONWriter;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.solr.client.solrj.cloud.autoscaling.Policy.CLUSTER_PREFERENCES;
import static org.apache.solr.client.solrj.cloud.autoscaling.TestPolicy2.loadFromResource;
import static org.apache.solr.client.solrj.cloud.autoscaling.Variable.Type.CORES;
import static org.apache.solr.client.solrj.cloud.autoscaling.Variable.Type.FREEDISK;
import static org.apache.solr.client.solrj.cloud.autoscaling.Variable.Type.REPLICA;
import static org.apache.solr.common.cloud.ZkStateReader.CLUSTER_STATE;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;
public class TestPolicy extends SolrTestCaseJ4 {
// Flag switched on by the constructor below. It is deliberately left non-final and
// package-visible, exactly as originally declared.
// NOTE(review): the code that reads this flag is outside this chunk -- confirm its meaning there.
boolean useNodeset;
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

/** Creates the test instance with {@code useNodeset} set to {@code true}. */
public TestPolicy() {
  this.useNodeset = true;
}
/**
 * Builds a {@link Suggester} from the {@code "suggester"} entry of {@code jsonObj}.
 *
 * <p>The entry must contain an {@code "action"} (a collection action name) and a {@code "hints"}
 * map; every hint key is resolved with {@link Hint#get} and its value is converted with the
 * hint's own {@code parse} before being applied to the suggester.
 *
 * @param cloudManager used to create a fresh policy session when {@code seed} is {@code null}
 * @param jsonObj parsed JSON holding the {@code "suggester"} description
 * @param seed optional suggester whose session is reused; may be {@code null}
 * @return the suggester with all hints applied
 */
@SuppressWarnings({"unchecked"})
static Suggester createSuggester(SolrCloudManager cloudManager,
    @SuppressWarnings({"rawtypes"})Map jsonObj, Suggester seed) throws IOException, InterruptedException {
  // Reuse the seed's session when one is given, otherwise open a new session from the
  // autoscaling config held by the cloud manager.
  final Policy.Session session = (seed != null)
      ? seed.session
      : cloudManager.getDistribStateManager().getAutoScalingConfig().getPolicy().createSession(cloudManager);

  @SuppressWarnings({"rawtypes"})
  Map suggesterSpec = (Map) jsonObj.get("suggester");
  String actionName = (String) suggesterSpec.get("action");
  Suggester result = session.getSuggester(CollectionParams.CollectionAction.get(actionName));

  @SuppressWarnings({"rawtypes"})
  Map hints = (Map) suggesterSpec.get("hints");
  for (Object rawEntry : hints.entrySet()) {
    Map.Entry<?, ?> entry = (Map.Entry<?, ?>) rawEntry;
    Hint hint = Hint.get(entry.getKey().toString());
    result.hint(hint, hint.parse(entry.getValue()));
  }
  return result;
}
/**
 * Creates a {@link SolrCloudManager} for the given parsed JSON description by delegating to
 * {@code cloudManagerWithData}.
 *
 * @param jsonObj parsed JSON handed unchanged to {@code cloudManagerWithData}
 * @return the cloud manager produced by {@code cloudManagerWithData}
 */
static SolrCloudManager createCloudManager(@SuppressWarnings({"rawtypes"})Map jsonObj) {
  SolrCloudManager manager = cloudManagerWithData(jsonObj);
  return manager;
}
// JSON serialization of the object loaded from the "testPolicy.json" test resource; evaluated
// once, when this class is initialized.
public static String clusterState = Utils.toJSONString(loadFromResource("testPolicy.json"));
/**
 * Collects the replicas hosted on {@code node} from a cluster-state map, grouped by collection
 * name and then by shard name.
 *
 * <p>The input is expected to be shaped as
 * {@code collection -> "shards" -> shard -> "replicas" -> replica -> properties}; a replica is
 * kept when its {@code "node_name"} property equals {@code node}.
 *
 * @param node node name to filter replicas by
 * @param clusterState parsed cluster state; it is deep-copied before being walked
 * @return collection name to (shard name to replicas on {@code node}); collections without a
 *     replica on the node are absent. The outer map preserves insertion order.
 */
public static Map<String, Map<String, List<ReplicaInfo>>> getReplicaDetails(String node,
    @SuppressWarnings({"rawtypes"})Map clusterState) {
  ValidatingJsonMap collections = ValidatingJsonMap.getDeepCopy(clusterState, 6, true);
  Map<String, Map<String, List<ReplicaInfo>>> result = new LinkedHashMap<>();

  for (Map.Entry<String, Object> collEntry : collections.entrySet()) {
    String collName = collEntry.getKey();
    ValidatingJsonMap coll = (ValidatingJsonMap) collEntry.getValue();

    for (Map.Entry<String, Object> shardEntry : coll.getMap("shards").entrySet()) {
      String shard = shardEntry.getKey();
      ValidatingJsonMap sh = (ValidatingJsonMap) shardEntry.getValue();

      for (Map.Entry<String, Object> replicaEntry : sh.getMap("replicas").entrySet()) {
        String replicaName = replicaEntry.getKey();
        ValidatingJsonMap r = (ValidatingJsonMap) replicaEntry.getValue();

        String nodeName = (String) r.get("node_name");
        if (!nodeName.equals(node)) {
          continue; // replica lives on another node
        }

        result.computeIfAbsent(collName, k -> new HashMap<>())
            .computeIfAbsent(shard, k -> new ArrayList<>())
            .add(new ReplicaInfo(replicaName, (String) r.get("core"), collName, shard,
                Replica.Type.get((String) r.get(ZkStateReader.REPLICA_TYPE)), node, r));
      }
    }
  }
  return result;
}
/**
 * Checks ADDREPLICA and MOVEREPLICA suggestions for {@code comments_coll/shard1} against the
 * cluster state stored in {@code testWithCollection.json}.
 *
 * <p>Node state is stubbed: each node reports its real replica count as {@code cores} and a
 * fixed disk value of 100. Two consecutive ADDREPLICA suggestions are expected to pick
 * {@code node1:80_} and {@code node2:80_} (in either order); a following MOVEREPLICA request on
 * the resulting session must yield no suggestion.
 */
public void testWithCollection() {
  @SuppressWarnings({"unchecked"})
  ClusterState clusterState = ClusterState.load(1,
      (Map) loadFromResource("testWithCollection.json"),
      ImmutableSet.of("node1:80_", "node2:80_", "node3:80_", "node4:80_", "node5:80_"), CLUSTER_STATE);
  // Serves the fixture cluster state instead of talking to a real cluster.
  DelegatingClusterStateProvider clusterStateProvider = new DelegatingClusterStateProvider(null) {
    @Override
    public ClusterState getClusterState() throws IOException {
      return clusterState;
    }
    @Override
    public Set<String> getLiveNodes() {
      return clusterState.getLiveNodes();
    }
  };
  // Node state stub: cores = number of replicas on the node, disk = 100, index size = 100.
  SolrClientNodeStateProvider solrClientNodeStateProvider = new SolrClientNodeStateProvider(null) {
    @Override
    protected Map<String, Object> fetchTagValues(String node, Collection<String> tags) {
      Map<String, Object> result = new HashMap<>();
      AtomicInteger cores = new AtomicInteger();
      forEachReplica(node, replicaInfo -> cores.incrementAndGet());
      if (tags.contains(ImplicitSnitch.CORES)) result.put(ImplicitSnitch.CORES, cores.get());
      if (tags.contains(ImplicitSnitch.DISK)) result.put(ImplicitSnitch.DISK, 100);
      return result;
    }
    @Override
    protected Map<String, Object> fetchReplicaMetrics(String solrNode, Map<String, Pair<String, ReplicaInfo>> metricsKeyVsTagReplica) {
      //e.g: solr.core.perReplicaDataColl.shard1.replica_n4:INDEX.sizeInBytes
      Map<String, Object> result = new HashMap<>();
      metricsKeyVsTagReplica.forEach((k, v) -> {
        if (k.endsWith(":INDEX.sizeInBytes")) result.put(k, 100);
      });
      return result;
    }
    @Override
    protected ClusterStateProvider getClusterStateProvider() {
      return clusterStateProvider;
    }
  };
  // Sanity check: the provider must expose a "withCollection" value for node1.
  @SuppressWarnings({"rawtypes"})
  Map m = solrClientNodeStateProvider.getNodeValues("node1:80_", ImmutableSet.of("cores", "withCollection"));
  assertNotNull(m.get("withCollection"));
  // The two cluster-policy clauses are separated by a comma and the array has no trailing
  // comma, so the literal does not depend on the parser tolerating misplaced commas.
  @SuppressWarnings({"rawtypes"})
  Map policies = (Map) Utils.fromJSONString("{" +
      " 'cluster-preferences': [" +
      " { 'minimize': 'cores'}," +
      " { 'maximize': 'freedisk', 'precision': 50}" +
      " ]," +
      " 'cluster-policy': [" +
      " { 'replica': 0, 'nodeRole': 'overseer'}," +
      " { 'replica': '<2', 'shard': '#EACH', 'node': '#ANY'}" +
      " ]" +
      "}");
  @SuppressWarnings({"unchecked"})
  AutoScalingConfig config = new AutoScalingConfig(policies);
  Policy policy = config.getPolicy();
  Policy.Session session = policy.createSession(new DelegatingCloudManager(null) {
    @Override
    public ClusterStateProvider getClusterStateProvider() {
      return clusterStateProvider;
    }
    @Override
    public NodeStateProvider getNodeStateProvider() {
      return solrClientNodeStateProvider;
    }
  });

  // First ADDREPLICA.
  Suggester suggester = session.getSuggester(CollectionAction.ADDREPLICA);
  suggester.hint(Hint.COLL_SHARD, new Pair<>("comments_coll", "shard1"));
  @SuppressWarnings({"rawtypes"})
  SolrRequest op = suggester.getSuggestion();
  assertNotNull(op);
  Set<String> nodes = new HashSet<>(2);
  nodes.add(op.getParams().get("node"));

  // Second ADDREPLICA, continuing from the session produced by the first suggestion.
  session = suggester.getSession();
  suggester = session.getSuggester(ADDREPLICA);
  suggester.hint(Hint.COLL_SHARD, new Pair<>("comments_coll", "shard1"));
  op = suggester.getSuggestion();
  assertNotNull(op);
  nodes.add(op.getParams().get("node"));
  assertEquals(2, nodes.size());
  assertTrue("node1:80_ should have been selected by add replica", nodes.contains("node1:80_"));
  assertTrue("node2:80_ should have been selected by add replica", nodes.contains("node2:80_"));

  // After both additions no MOVEREPLICA suggestion is expected.
  session = suggester.getSession();
  suggester = session.getSuggester(MOVEREPLICA);
  suggester.hint(Hint.COLL_SHARD, new Pair<>("comments_coll", "shard1"));
  op = suggester.getSuggestion();
  assertNull(op);
}
/**
 * Checks the suggestions computed by {@link PolicyHelper#getSuggestions} for the cluster state
 * stored in {@code testWithCollectionSuggestions.json}.
 *
 * <p>Node state is stubbed (cores = replica count on the node, disk = 100). Exactly two
 * suggestions are expected, each an {@code add-replica} POST to {@code /c/articles_coll/shards}
 * with a violation delta of 1.0, together targeting {@code node1:80_} and {@code node2:80_}.
 */
public void testWithCollectionSuggestions() {
  @SuppressWarnings({"unchecked"})
  ClusterState clusterState =
      ClusterState.load(1,
          (Map) loadFromResource("testWithCollectionSuggestions.json"),
          ImmutableSet.of("node1:80_", "node2:80_", "node3:80_", "node4:80_", "node5:80_"), CLUSTER_STATE);
  // Serves the fixture cluster state instead of talking to a real cluster.
  DelegatingClusterStateProvider clusterStateProvider = new DelegatingClusterStateProvider(null) {
    @Override
    public ClusterState getClusterState() throws IOException {
      return clusterState;
    }
    @Override
    public Set<String> getLiveNodes() {
      return clusterState.getLiveNodes();
    }
  };
  // Node state stub: cores = number of replicas on the node, disk = 100, index size = 100.
  SolrClientNodeStateProvider solrClientNodeStateProvider = new SolrClientNodeStateProvider(null) {
    @Override
    protected Map<String, Object> fetchTagValues(String node, Collection<String> tags) {
      Map<String, Object> result = new HashMap<>();
      AtomicInteger cores = new AtomicInteger();
      forEachReplica(node, replicaInfo -> cores.incrementAndGet());
      if (tags.contains(ImplicitSnitch.CORES)) result.put(ImplicitSnitch.CORES, cores.get());
      if (tags.contains(ImplicitSnitch.DISK)) result.put(ImplicitSnitch.DISK, 100);
      return result;
    }
    @Override
    protected Map<String, Object> fetchReplicaMetrics(String solrNode, Map<String, Pair<String, ReplicaInfo>> metricsKeyVsTagReplica) {
      //e.g: solr.core.perReplicaDataColl.shard1.replica_n4:INDEX.sizeInBytes
      Map<String, Object> result = new HashMap<>();
      metricsKeyVsTagReplica.forEach((k, v) -> {
        if (k.endsWith(":INDEX.sizeInBytes")) result.put(k, 100);
      });
      return result;
    }
    @Override
    protected ClusterStateProvider getClusterStateProvider() {
      return clusterStateProvider;
    }
  };
  // Sanity check: the provider must expose a "withCollection" value for node1.
  @SuppressWarnings({"rawtypes"})
  Map m = solrClientNodeStateProvider.getNodeValues("node1:80_", ImmutableSet.of("cores", "withCollection"));
  assertNotNull(m.get("withCollection"));
  // The two cluster-policy clauses are separated by a comma and the array has no trailing
  // comma, so the literal does not depend on the parser tolerating misplaced commas.
  @SuppressWarnings({"rawtypes"})
  Map policies = (Map) Utils.fromJSONString("{" +
      " 'cluster-preferences': [" +
      " { 'maximize': 'freedisk', 'precision': 50}," +
      " { 'minimize': 'cores'}" +
      " ]," +
      " 'cluster-policy': [" +
      " { 'replica': 0, 'nodeRole': 'overseer'}," +
      " { 'replica': '<2', 'shard': '#EACH', 'node': '#ANY'}" +
      " ]" +
      "}");
  @SuppressWarnings({"unchecked"})
  List<Suggester.SuggestionInfo> l = PolicyHelper.getSuggestions(new AutoScalingConfig(policies),
      new DelegatingCloudManager(null) {
        @Override
        public ClusterStateProvider getClusterStateProvider() {
          return clusterStateProvider;
        }
        @Override
        public NodeStateProvider getNodeStateProvider() {
          return solrClientNodeStateProvider;
        }
      });
  assertNotNull(l);
  assertEquals(2, l.size());
  // collect the set of nodes to which replicas are being added
  Set<String> nodes = new HashSet<>(2);
  // Both suggestions must have the same shape; only the target node differs.
  for (Suggester.SuggestionInfo suggestion : l) {
    assertEquals(1.0d, suggestion._get("violation/violation/delta", null));
    assertEquals("POST", suggestion._get("operation/method", null));
    assertEquals("/c/articles_coll/shards", suggestion._get("operation/path", null));
    assertNotNull(suggestion._get("operation/command/add-replica", null));
    nodes.add((String) suggestion._get("operation/command/add-replica/node", null));
  }
  assertEquals(2, nodes.size());
  assertTrue(nodes.contains("node1:80_"));
  assertTrue(nodes.contains("node2:80_"));
}
/**
 * Checks the mix of move and add suggestions computed by {@link PolicyHelper#getSuggestions}
 * for the cluster state stored in {@code testWithCollectionMoveVsAddSuggestions.json}.
 *
 * <p>Node state is stubbed (cores = replica count on the node, disk = 100). Three suggestions
 * are expected: one {@code add-replica} and two {@code move-replica} operations on
 * {@code articles_coll}, whose target nodes together are {@code node3:80_}, {@code node4:80_}
 * and {@code node5:80_}.
 */
public void testWithCollectionMoveVsAddSuggestions() throws IOException {
  @SuppressWarnings({"unchecked"})
  ClusterState clusterState = ClusterState.load(1,
      (Map) loadFromResource("testWithCollectionMoveVsAddSuggestions.json"),
      ImmutableSet.of("node1:80_", "node2:80_", "node3:80_", "node4:80_", "node5:80_", "node6:80_"),
      CLUSTER_STATE
  );
  // Serves the fixture cluster state instead of talking to a real cluster.
  DelegatingClusterStateProvider clusterStateProvider = new DelegatingClusterStateProvider(null) {
    @Override
    public ClusterState getClusterState() {
      return clusterState;
    }
    @Override
    public Set<String> getLiveNodes() {
      return clusterState.getLiveNodes();
    }
  };
  // Node state stub: cores = number of replicas on the node, disk = 100, index size = 100.
  SolrClientNodeStateProvider solrClientNodeStateProvider = new SolrClientNodeStateProvider(null) {
    @Override
    protected Map<String, Object> fetchTagValues(String node, Collection<String> tags) {
      Map<String, Object> result = new HashMap<>();
      AtomicInteger cores = new AtomicInteger();
      forEachReplica(node, replicaInfo -> cores.incrementAndGet());
      if (tags.contains(ImplicitSnitch.CORES)) result.put(ImplicitSnitch.CORES, cores.get());
      if (tags.contains(ImplicitSnitch.DISK)) result.put(ImplicitSnitch.DISK, 100);
      return result;
    }
    @Override
    protected Map<String, Object> fetchReplicaMetrics(String solrNode, Map<String, Pair<String, ReplicaInfo>> metricsKeyVsTagReplica) {
      //e.g: solr.core.perReplicaDataColl.shard1.replica_n4:INDEX.sizeInBytes
      Map<String, Object> result = new HashMap<>();
      metricsKeyVsTagReplica.forEach((k, v) -> {
        if (k.endsWith(":INDEX.sizeInBytes")) result.put(k, 100);
      });
      return result;
    }
    @Override
    protected ClusterStateProvider getClusterStateProvider() {
      return clusterStateProvider;
    }
  };
  // Sanity check: the provider must expose a "withCollection" value for node1.
  @SuppressWarnings({"rawtypes"})
  Map m = solrClientNodeStateProvider.getNodeValues("node1:80_", ImmutableSet.of("cores", "withCollection"));
  assertNotNull(m.get("withCollection"));
  // The two cluster-policy clauses are separated by a comma and the array has no trailing
  // comma, so the literal does not depend on the parser tolerating misplaced commas.
  @SuppressWarnings({"rawtypes"})
  Map policies = (Map) Utils.fromJSONString("{" +
      " 'cluster-preferences': [" +
      " { 'maximize': 'freedisk', 'precision': 50}," +
      " { 'minimize': 'cores'}" +
      " ]," +
      " 'cluster-policy': [" +
      " { 'replica': 0, 'nodeRole': 'overseer'}," +
      " { 'replica': '<2', 'shard': '#EACH', 'node': '#ANY'}" +
      " ]" +
      "}");
  @SuppressWarnings({"unchecked"})
  List<Suggester.SuggestionInfo> l = PolicyHelper.getSuggestions(new AutoScalingConfig(policies),
      new DelegatingCloudManager(null) {
        @Override
        public ClusterStateProvider getClusterStateProvider() {
          return clusterStateProvider;
        }
        @Override
        public NodeStateProvider getNodeStateProvider() {
          return solrClientNodeStateProvider;
        }
      });
  assertNotNull(l);
  assertEquals(3, l.size());

  // Classify every suggestion as an add or a move and record what it touches.
  int numMoves = 0, numAdds = 0;
  Set<String> addNodes = new HashSet<>();
  Set<String> targetNodes = new HashSet<>();
  Set<String> movedReplicas = new HashSet<>();
  for (Suggester.SuggestionInfo suggestionInfo : l) {
    assertEquals("POST", suggestionInfo._get("operation/method", null));
    if (suggestionInfo._get("operation/command/add-replica", null) != null) {
      numAdds++;
      assertEquals(1.0d, suggestionInfo._get("violation/violation/delta", null));
      assertEquals("/c/articles_coll/shards", suggestionInfo._get("operation/path", null));
      addNodes.add((String) suggestionInfo._get("operation/command/add-replica/node", null));
    } else if (suggestionInfo._get("operation/command/move-replica", null) != null) {
      numMoves++;
      assertEquals("/c/articles_coll", suggestionInfo._get("operation/path", null));
      targetNodes.add((String) suggestionInfo._get("operation/command/move-replica/targetNode", null));
      movedReplicas.add((String) suggestionInfo._get("operation/command/move-replica/replica", null));
    } else {
      fail("Unexpected operation type suggested for suggestion: " + suggestionInfo);
    }
  }
  // The counters were previously computed but never checked. These values are already implied
  // by the size assertions (3 suggestions, 1 add node, 2 move targets); stating them makes the
  // expected split explicit.
  assertEquals(1, numAdds);
  assertEquals(2, numMoves);
  assertEquals(2, targetNodes.size());
  assertEquals(1, addNodes.size());
  assertEquals(2, movedReplicas.size());
  Set<String> allTargetNodes = new HashSet<>(targetNodes);
  allTargetNodes.addAll(addNodes);
  assertEquals(3, allTargetNodes.size());
  assertTrue(allTargetNodes.contains("node3:80_"));
  assertTrue(allTargetNodes.contains("node4:80_"));
  assertTrue(allTargetNodes.contains("node5:80_"));
}
/**
 * Checks MOVEREPLICA suggestions for {@code comments_coll/shard1} with source node
 * {@code node1:80_}, against the cluster state stored in {@code testWithCollectionMoveReplica.json}.
 *
 * <p>The live-node set passed to the cluster state contains node2 to node5 only. Node state is
 * stubbed (cores = replica count on the node, disk = 100). The first suggestion must move a
 * replica to {@code node2:80_}; repeating the request on the resulting session must yield no
 * further suggestion.
 */
public void testWithCollectionMoveReplica() {
  @SuppressWarnings({"unchecked"})
  ClusterState clusterState = ClusterState.load(1,
      (Map) loadFromResource("testWithCollectionMoveReplica.json"),
      ImmutableSet.of("node2:80_", "node3:80_", "node4:80_", "node5:80_"), CLUSTER_STATE);
  // Serves the fixture cluster state instead of talking to a real cluster.
  DelegatingClusterStateProvider clusterStateProvider = new DelegatingClusterStateProvider(null) {
    @Override
    public ClusterState getClusterState() throws IOException {
      return clusterState;
    }
    @Override
    public Set<String> getLiveNodes() {
      return clusterState.getLiveNodes();
    }
  };
  // Node state stub: cores = number of replicas on the node, disk = 100, index size = 100.
  SolrClientNodeStateProvider solrClientNodeStateProvider = new SolrClientNodeStateProvider(null) {
    @Override
    protected Map<String, Object> fetchTagValues(String node, Collection<String> tags) {
      Map<String, Object> result = new HashMap<>();
      AtomicInteger cores = new AtomicInteger();
      forEachReplica(node, replicaInfo -> cores.incrementAndGet());
      if (tags.contains(ImplicitSnitch.CORES)) result.put(ImplicitSnitch.CORES, cores.get());
      if (tags.contains(ImplicitSnitch.DISK)) result.put(ImplicitSnitch.DISK, 100);
      return result;
    }
    @Override
    protected Map<String, Object> fetchReplicaMetrics(String solrNode, Map<String, Pair<String, ReplicaInfo>> metricsKeyVsTagReplica) {
      //e.g: solr.core.perReplicaDataColl.shard1.replica_n4:INDEX.sizeInBytes
      Map<String, Object> result = new HashMap<>();
      metricsKeyVsTagReplica.forEach((k, v) -> {
        if (k.endsWith(":INDEX.sizeInBytes")) result.put(k, 100);
      });
      return result;
    }
    @Override
    protected ClusterStateProvider getClusterStateProvider() {
      return clusterStateProvider;
    }
  };
  // Sanity check: the provider must expose a "withCollection" value for node1.
  @SuppressWarnings({"rawtypes"})
  Map m = solrClientNodeStateProvider.getNodeValues("node1:80_", ImmutableSet.of("cores", "withCollection"));
  assertNotNull(m.get("withCollection"));
  // The two cluster-policy clauses are separated by a comma and the array has no trailing
  // comma, so the literal does not depend on the parser tolerating misplaced commas.
  @SuppressWarnings({"rawtypes"})
  Map policies = (Map) Utils.fromJSONString("{" +
      " 'cluster-preferences': [" +
      " { 'minimize': 'cores'}," +
      " { 'maximize': 'freedisk', 'precision': 50}" +
      " ]," +
      " 'cluster-policy': [" +
      " { 'replica': 0, 'nodeRole': 'overseer'}," +
      " { 'replica': '<2', 'shard': '#EACH', 'node': '#ANY'}" +
      " ]" +
      "}");
  @SuppressWarnings({"unchecked"})
  AutoScalingConfig config = new AutoScalingConfig(policies);
  Policy policy = config.getPolicy();
  Policy.Session session = policy.createSession(new DelegatingCloudManager(null) {
    @Override
    public ClusterStateProvider getClusterStateProvider() {
      return clusterStateProvider;
    }
    @Override
    public NodeStateProvider getNodeStateProvider() {
      return solrClientNodeStateProvider;
    }
  });

  // First move off node1: node2 is the expected target.
  Suggester suggester = session.getSuggester(CollectionAction.MOVEREPLICA);
  suggester.hint(Hint.COLL_SHARD, new Pair<>("comments_coll", "shard1"));
  suggester.hint(Hint.SRC_NODE, "node1:80_");
  @SuppressWarnings({"rawtypes"})
  SolrRequest op = suggester.getSuggestion();
  assertNotNull(op);
  assertEquals("node2:80_ should have been selected by move replica", "node2:80_",
      op.getParams().get("targetNode"));

  // Repeating the same request on the updated session must not produce another move.
  session = suggester.getSession();
  suggester = session.getSuggester(MOVEREPLICA);
  suggester.hint(Hint.COLL_SHARD, new Pair<>("comments_coll", "shard1"));
  suggester.hint(Hint.SRC_NODE, "node1:80_");
  op = suggester.getSuggestion();
  assertNull(op);
}
/**
 * Exercises value validation ({@code Clause.validate}, via {@code expectError} for the failing
 * cases) and clause parsing ({@code Clause.create}) for a wide range of accepted and rejected
 * inputs: numeric bounds per variable, fractional/percentage/range replica values, computed
 * values such as {@code #EQUAL} and {@code #ALL}, array values, and {@code nodeset}/{@code put}.
 */
public void testValidate() {
  // --- single-value validation ---
  expectError("replica", -1, "must be greater than");
  expectError("replica", "hello", "not a valid number");
  assertEquals(1d, Clause.validate("replica", "1", true));
  assertEquals("c", Clause.validate("collection", "c", true));
  assertEquals("s", Clause.validate("shard", "s", true));
  assertEquals("overseer", Clause.validate("nodeRole", "overseer", true));
  expectError("nodeRole", "wrong", "must be one of");
  expectError("sysLoadAvg", "101", "must be less than ");
  expectError("sysLoadAvg", 101, "must be less than ");
  expectError("sysLoadAvg", "-1", "must be greater than");
  expectError("sysLoadAvg", -1, "must be greater than");
  assertEquals(12.46d, Clause.validate("sysLoadAvg", "12.46", true));
  assertEquals(12.46, Clause.validate("sysLoadAvg", 12.46d, true));
  expectError("ip_1", "300", "must be less than ");
  expectError("ip_1", 300, "must be less than ");
  expectError("ip_1", "-1", "must be greater than");
  expectError("ip_1", -1, "must be greater than");
  assertEquals(1L, Clause.validate("ip_1", "1", true));
  expectError("heapUsage", "-1", "must be greater than");
  expectError("heapUsage", -1, "must be greater than");
  assertEquals(69.9d, Clause.validate("heapUsage", "69.9", true));
  assertEquals(69.9d, Clause.validate("heapUsage", 69.9d, true));
  expectError("port", "70000", "must be less than ");
  expectError("port", 70000, "must be less than ");
  expectError("port", "0", "must be greater than");
  expectError("port", 0, "must be greater than");
  expectError("cores", "-1", "must be greater than");

  // --- replica operand: whole numbers keep the operand, fractions turn EQUAL into RANGE_EQUAL ---
  assertEquals(Operand.EQUAL, REPLICA.getOperand(Operand.EQUAL, "2.0", null));
  assertEquals(Operand.NOT_EQUAL, REPLICA.getOperand(Operand.NOT_EQUAL, "2.0", null));
  assertEquals(Operand.EQUAL, REPLICA.getOperand(Operand.EQUAL, "2", null));
  assertEquals(Operand.NOT_EQUAL, REPLICA.getOperand(Operand.NOT_EQUAL, "2", null));
  assertEquals(Operand.RANGE_EQUAL, REPLICA.getOperand(Operand.EQUAL, "2.1", null));
  assertEquals(Operand.RANGE_EQUAL, REPLICA.getOperand(Operand.EQUAL, "2.01", null));

  // --- fractional replica counts: 1.23 accepts 1 and 2 ---
  Clause clause = Clause.create("{replica: '1.23', node:'#ANY'}");
  assertTrue(clause.getReplica().isPass(2));
  assertTrue(clause.getReplica().isPass(1));
  assertFalse(clause.getReplica().isPass(0));
  assertFalse(clause.getReplica().isPass(3));
  clause = Clause.create("{replica: '<1.23', node:'#ANY'}");
  assertTrue(clause.getReplica().isPass(1));
  assertTrue(clause.getReplica().isPass(0));
  assertFalse(clause.getReplica().isPass(2));
  expectThrows(IllegalArgumentException.class,
      () -> Clause.create("{replica: '!1.23', node:'#ANY'}"));
  clause = Clause.create("{replica: 1.23, node:'#ANY'}");
  assertTrue(clause.getReplica().isPass(2));
  assertTrue(clause.getReplica().isPass(1));
  assertFalse(clause.getReplica().isPass(0));
  assertFalse(clause.getReplica().isPass(3));

  // --- percentage replica value, sealed with a computed value of 2.0 ---
  clause = Clause.create("{replica: '33%', node:'#ANY'}");
  assertEquals(Operand.RANGE_EQUAL, clause.getReplica().op);
  clause = clause.getSealedClause(condition -> {
    if (condition.name.equals("replica")) {
      return 2.0d;
    }
    throw new RuntimeException("");
  });
  assertTrue(clause.getReplica().isPass(2));

  // --- explicit range ---
  clause = Clause.create("{replica: '3 - 5', node:'#ANY'}");
  assertEquals(Operand.RANGE_EQUAL, clause.getReplica().getOperand());
  RangeVal range = (RangeVal) clause.getReplica().getValue();
  assertEquals(3.0, range.min);
  assertEquals(5.0, range.max);
  assertTrue(clause.replica.isPass(3));
  assertTrue(clause.replica.isPass(4));
  assertTrue(clause.replica.isPass(5));
  assertFalse(clause.replica.isPass(6));
  assertFalse(clause.replica.isPass(2));
  assertEquals(Double.valueOf(1.0), clause.replica.delta(6));
  assertEquals(Double.valueOf(-1.0), clause.replica.delta(2));
  assertEquals(Double.valueOf(0.0), clause.replica.delta(4));
  expectThrows(IllegalArgumentException.class,
      () -> Clause.create("{replica: '-33%', node:'#ANY'}"));
  expectThrows(IllegalArgumentException.class,
      () -> Clause.create("{replica: 'x%', node:'#ANY'}"));
  expectThrows(IllegalArgumentException.class,
      () -> Clause.create("{replica: '20%-33%', node:'#ANY'}"));

  // --- #EQUAL replica value ---
  clause = Clause.create("{replica: '#EQUAL', shard:'#EACH', node:'#ANY'}");
  assertEquals(Operand.RANGE_EQUAL, clause.replica.op);
  clause = Clause.create("{replica: '#EQUAL', node:'#ANY'}");
  assertEquals(Operand.RANGE_EQUAL, clause.replica.op);
  expectThrows(IllegalArgumentException.class,
      () -> Clause.create("{replica: '#EQUAL', node:'node_1'}"));

  // --- percentage on a tag; expected value comes first so failure messages read correctly ---
  clause = Clause.create("{replica : 0, freedisk:'<20%'}");
  assertEquals(ComputedType.PERCENT, clause.tag.computedType);
  assertEquals(Operand.LESS_THAN, clause.tag.op);
  expectThrows(IllegalArgumentException.class,
      () -> Clause.create("{replica : 0, INDEX.sizeInGB:'>300'}"));
  expectThrows(IllegalArgumentException.class,
      () -> Clause.create("{replica:'<3', shard: '#ANV', node:'#ANY'}"));
  expectThrows(IllegalArgumentException.class,
      () -> Clause.create("{replica:'<3', shard: '#EACH', node:'#E4CH'}"));
  // Any Exception type is accepted here, as before; only the message is pinned.
  Exception percentNotAllowed = expectThrows(Exception.class,
      () -> Clause.create("{replica:0, 'ip_1':'<30%'}"));
  assertTrue(percentNotAllowed.getMessage().contains("'%' is not allowed for variable : 'ip_1'"));

  // These two only need to parse without throwing.
  clause = Clause.create("{replica: '#ALL', freedisk:'>20%'}");
  clause = Clause.create("{replica: '#ALL', sysprop.zone :'west'}");

  // --- array values ---
  expectThrows(IllegalArgumentException.class,
      () -> Clause.create("{replica: [3,4] , freedisk:'>20'}"));
  clause = Clause.create("{replica: 3 , port:[8983, 7574]}");
  assertEquals(Operand.IN, clause.tag.op);
  expectThrows(IllegalArgumentException.class,
      () -> Clause.create("{replica: 3 , sysprop.zone :['east', ' ', 'west']}"));
  expectThrows(IllegalArgumentException.class,
      () -> Clause.create("{replica: 3 , sysprop.zone :[]}"));
  expectThrows(IllegalArgumentException.class,
      () -> Clause.create("{replica: 3 , sysprop.zone :['!east','west']}"));
  expectThrows(IllegalArgumentException.class,
      () -> Clause.create("{replica: '#ALL' , shard: '#EACH' , sysprop.zone:[east, west]}"));
  expectThrows(IllegalArgumentException.class,
      () -> Clause.create("{replica: '#ALL' , shard: '#EACH' , sysprop.zone:'#EACH'}"));
  clause = Clause.create("{replica: '#EQUAL' , shard: '#EACH' , sysprop.zone:[east, west]}");
  assertEquals(ComputedType.EQUAL, clause.replica.computedType);
  assertEquals(Operand.IN, clause.tag.op);
  expectThrows(IllegalArgumentException.class,
      () -> Clause.create("{replica: '#EQUAL' , shard: '#EACH' , sysprop.zone:[east]}"));

  // --- cores as the global tag ---
  clause = Clause.create("{cores: '#EQUAL' , node:'#ANY'}");
  assertEquals(ComputedType.EQUAL, clause.globalTag.computedType);
  expectThrows(IllegalArgumentException.class,
      () -> Clause.create("{cores: '#EQUAL' , node:'node1:80_'}"));
  clause = Clause.create("{cores: '#EQUAL' , node:['node1:80_' , 'node2:80_' , 'node3:80_']}");
  assertEquals(Operand.IN, clause.getTag().op);
  assertEquals(ComputedType.EQUAL, clause.getGlobalTag().computedType);
  clause = Clause.create("{cores: '3-5' , node:'#ANY'}");
  assertEquals(Operand.RANGE_EQUAL, clause.globalTag.op);
  assertEquals(3.0d, ((RangeVal) clause.globalTag.val).min.doubleValue(), 0.001);
  assertEquals(5.0d, ((RangeVal) clause.globalTag.val).max.doubleValue(), 0.001);
  clause = Clause.create("{cores: 1.66 , node:'#ANY'}");
  assertEquals(Operand.RANGE_EQUAL, clause.globalTag.op);
  assertEquals(1.0d, ((RangeVal) clause.globalTag.val).min.doubleValue(), 0.001);
  assertEquals(2.0d, ((RangeVal) clause.globalTag.val).max.doubleValue(), 0.001);
  assertEquals(1.66d, ((RangeVal) clause.globalTag.val).actual.doubleValue(), 0.001);
  expectThrows(IllegalArgumentException.class,
      () -> Clause.create("{cores:5, sysprop.zone : west}"));
  clause = Clause.create("{cores: '14%' , node:'#ANY'}");
  assertEquals(ComputedType.PERCENT, clause.getGlobalTag().computedType);
  clause = Clause.create("{cores: '14%' , node:['node1:80_', 'node2:80_', 'node3:80_']}");
  assertEquals(Operand.IN, clause.getTag().op);
  expectThrows(IllegalArgumentException.class,
      () -> Clause.create("{replica: '!14%' , node:'#ANY'}"));
  expectThrows(IllegalArgumentException.class,
      () -> Clause.create("{cores: '!14%' , node:['node1:80_', 'node2:80_', 'node3:80_']}"));
  expectThrows(IllegalArgumentException.class,
      () -> Clause.create("{cores: '!1.66' , node:'#ANY'}"));
  expectThrows(IllegalArgumentException.class,
      () -> Clause.create("{replica: '<14%' , node:'#ANY'}"));
  expectThrows(IllegalArgumentException.class,
      () -> Clause.create("{replica: '>14%' , node:'#ANY'}"));
  expectThrows(IllegalArgumentException.class,
      () -> Clause.create("{cores: '>14%' , node:'#ANY'}"));

  // --- nodeset ---
  clause = Clause.create("{replica:1, nodeset : {sysprop.zone : east}}");
  assertEquals(Variable.Type.SYSPROP, clause.tag.varType);
  clause = Clause.create("{replica:1, nodeset : ['node1:80_', 'node2:80_', 'node3:80_']}");
  assertEquals(Variable.Type.NODE, clause.tag.varType);
  assertEquals(Operand.IN, clause.tag.op);
  expectThrows(IllegalArgumentException.class,
      () -> Clause.create("{replica:1, nodeset : {sysprop.zone : '#EACH'}}"));
  expectThrows(IllegalArgumentException.class,
      () -> Clause.create("{replica:1, nodeset : {host : '#EACH'}}"));
  expectThrows(IllegalArgumentException.class,
      () -> Clause.create("{replica:1, node: n1, nodeset : {sysprop.zone : east}}"));
  IllegalArgumentException exp = expectThrows(IllegalArgumentException.class,
      () -> Clause.create("{replica:1, nodeset : {sysprop.zone : east , port: 8983 }}"));
  assertTrue(exp.getMessage().contains("nodeset must only have one and only one key"));

  // --- put ---
  clause = Clause.create("{'replica': '#ALL', 'nodeset': {'freedisk': '>700'}, 'strict': false}");
  assertEquals(Clause.Put.ON_ALL, clause.put);
  assertEquals(Operand.GREATER_THAN, clause.tag.op);
  clause = Clause.create("{'replica': '#ALL', put: on-each-node, 'nodeset': {sysprop.zone : east}}");
  assertEquals(Clause.Put.ON_EACH, clause.put);
  exp = expectThrows(IllegalArgumentException.class,
      () -> Clause.create("{'replica': '#ALL', put: on-Each, 'nodeset': {sysprop.zone : east}}"));
  assertTrue(exp.getMessage().contains("invalid value for put : on-Each"));

  // --- array-valued nodeset ---
  clause = Clause.create("{replica : '#EQUAL', nodeset : [{sysprop.zone : east}, {sysprop.zone : west}]}");
  assertTrue(((List<?>) clause.tag.val).get(0) instanceof Condition);
  assertTrue(Utils.fromJSONString(Utils.toJSONString(clause)) instanceof Map);
  exp = expectThrows(IllegalArgumentException.class,
      () -> Clause.create("{replica : '#EQUAL', nodeset : [{sysprop.zone : east}, {port : '8983'}]}"));
  assertTrue(exp.getMessage().contains("all element must have same key"));
  exp = expectThrows(IllegalArgumentException.class,
      () -> Clause.create("{replica : '#EQUAL', nodeset : [{sysprop.zone : east}, {sysprop.zone : '#EACH'}]}"));
  assertTrue(exp.getMessage().contains("computed value #EACH not allowed in nodeset"));
  exp = expectThrows(IllegalArgumentException.class,
      () -> Clause.create("{replica : '#EQUAL', nodeset : {sysprop.zone : east}}"));
  assertTrue(exp.getMessage().contains("'nodeset' must have an array value when 'replica': '#EQUAL` is used"));
  exp = expectThrows(IllegalArgumentException.class,
      () -> Clause.create("{replica : '#ALL', nodeset : [{sysprop.zone : east}, {sysprop.zone : west}]}"));
  assertTrue(exp.getMessage().contains("cannot use array value for nodeset if replica : '#EQUAL' is not used"));
  exp = expectThrows(IllegalArgumentException.class,
      () -> Clause.create("{replica : '50%', nodeset : [{sysprop.zone : east}, {sysprop.zone : west}]}"));
  assertTrue(exp.getMessage().contains("cannot use array value for nodeset if replica : '#EQUAL' is not used"));
  exp = expectThrows(IllegalArgumentException.class,
      () -> Clause.create("{replica : 3, nodeset : [{sysprop.zone : east}, {sysprop.zone : west}]}"));
  assertTrue(exp.getMessage().contains("cannot use array value for nodeset if replica : '#EQUAL' is not used"));
  exp = expectThrows(IllegalArgumentException.class,
      () -> Clause.create("{replica : '#EQUAL', put: on-each-node, nodeset : [{sysprop.zone : east}, {sysprop.zone : west}]}"));
  assertTrue(exp.getMessage().contains("cannot use put: 'on-each-node' with an array value in nodeset "));
}
/**
 * Checks the violations reported for the cluster policy
 * {@code { 'replica': '#EQUAL', 'node': '#ANY'}} on a fixed cluster state.
 *
 * <p>The cluster state declares five live nodes and six replicas of {@code coll1}; three of those
 * replicas (r1, r3, r5) are on {@code node1:80_} and none are on {@code node5:80_}. The test expects
 * exactly two violations: one for node1 (replica count delta of 1) and one for node5 (delta of -1).
 */
public void testEqualFunction() {
  String clusterStateStr = "{" +
      " 'coll1': {" +
      " 'router': {" +
      " 'name': 'compositeId'" +
      " }," +
      " 'shards': {" +
      " 'shard1': {" +
      " 'range': '80000000-ffffffff'," +
      " 'replicas': {" +
      " 'r1': {" +
      " 'core': 'r1'," +
      " 'base_url': 'http://10.0.0.4:8983/solr'," +
      " 'node_name': 'node1:80_'," +
      " 'state': 'active'," +
      " 'leader': 'true'" +
      " }," +
      " 'r2': {" +
      " 'core': 'r2'," +
      " 'base_url': 'http://10.0.0.4:7574/solr'," +
      " 'node_name': 'node2:80_'," +
      " 'state': 'active'" +
      " }" +
      " }" +
      " }," +
      " 'shard2': {" +
      " 'range': '0-7fffffff'," +
      " 'replicas': {" +
      " 'r3': {" +
      " 'core': 'r3'," +
      " 'base_url': 'http://10.0.0.4:8983/solr'," +
      " 'node_name': 'node1:80_'," +
      " 'state': 'active'," +
      " 'leader': 'true'" +
      " }," +
      " 'r4': {" +
      " 'core': 'r4'," +
      " 'base_url': 'http://10.0.0.4:8987/solr'," +
      " 'node_name': 'node4:80_'," +
      " 'state': 'active'" +
      " }," +
      " 'r6': {" +
      " 'core': 'r6'," +
      " 'base_url': 'http://10.0.0.4:8989/solr'," +
      " 'node_name': 'node3:80_'," +
      " 'state': 'active'" +
      " }," +
      " 'r5': {" +
      " 'core': 'r5'," +
      " 'base_url': 'http://10.0.0.4:8983/solr'," +
      " 'node_name': 'node1:80_'," +
      " 'state': 'active'" +
      " }" +
      " }" +
      " }" +
      " }" +
      " }" +
      "}";
  @SuppressWarnings({"unchecked"})
  ClusterState clusterState = ClusterState.load(1, clusterStateStr.getBytes(UTF_8),
      ImmutableSet.of("node1:80_", "node2:80_", "node3:80_", "node4:80_", "node5:80_"));
  // Cluster state provider that always answers from the fixed state built above.
  DelegatingClusterStateProvider clusterStateProvider = new DelegatingClusterStateProvider(null) {
    @Override
    public ClusterState getClusterState() throws IOException {
      return clusterState;
    }

    @Override
    public Set<String> getLiveNodes() {
      return clusterState.getLiveNodes();
    }
  };
  // Node state provider with the remote fetches replaced by canned values.
  SolrClientNodeStateProvider solrClientNodeStateProvider = new SolrClientNodeStateProvider(null) {
    @Override
    protected Map<String, Object> fetchTagValues(String node, Collection<String> tags) {
      Map<String, Object> result = new HashMap<>();
      // 'cores' is reported as the number of replicas visited by forEachReplica for this node.
      AtomicInteger cores = new AtomicInteger();
      forEachReplica(node, replicaInfo -> cores.incrementAndGet());
      if (tags.contains(ImplicitSnitch.CORES)) result.put(ImplicitSnitch.CORES, cores.get());
      if (tags.contains(ImplicitSnitch.DISK)) result.put(ImplicitSnitch.DISK, 100);
      return result;
    }

    @Override
    protected Map<String, Object> fetchReplicaMetrics(String node, Map<String, Pair<String, ReplicaInfo>> metricsKeyVsTagReplica) {
      //e.g: solr.core.perReplicaDataColl.shard1.replica_n4:INDEX.sizeInBytes
      Map<String, Object> result = new HashMap<>();
      metricsKeyVsTagReplica.forEach((k, v) -> {
        if (k.endsWith(":INDEX.sizeInBytes")) result.put(k, 100);
      });
      return result;
    }

    @Override
    protected ClusterStateProvider getClusterStateProvider() {
      return clusterStateProvider;
    }
  };
  @SuppressWarnings({"rawtypes"})
  Map policies = (Map) Utils.fromJSONString("{" +
      " 'cluster-preferences': [" +
      " { 'minimize': 'cores', 'precision': 50}" +
      " ]," +
      " 'cluster-policy': [" +
      " { 'replica': '#EQUAL', 'node': '#ANY'}," +
      " ]" +
      "}");
  @SuppressWarnings({"unchecked"})
  AutoScalingConfig config = new AutoScalingConfig(policies);
  Policy policy = config.getPolicy();
  Policy.Session session = policy.createSession(new DelegatingCloudManager(null) {
    @Override
    public ClusterStateProvider getClusterStateProvider() {
      return clusterStateProvider;
    }

    @Override
    public NodeStateProvider getNodeStateProvider() {
      return solrClientNodeStateProvider;
    }
  });
  List<Violation> violations = session.getViolations();
  assertEquals(2, violations.size());
  for (Violation violation : violations) {
    if (violation.node.equals("node1:80_")) {
      // node1 holds r1, r3 and r5: one replica more than the allowed range [1, 2].
      RangeVal val = (RangeVal) violation.getClause().replica.val;
      assertEquals(1.0d, val.min.doubleValue(), 0.01);
      assertEquals(2.0, val.max.doubleValue(), 0.01);
      assertEquals(1.2d, val.actual.doubleValue(), 0.01d);
      assertEquals(1, violation.replicaCountDelta.doubleValue(), 0.01);
      assertEquals(3, violation.getViolatingReplicas().size());
      Set<String> expected = ImmutableSet.of("r1", "r3", "r5");
      for (Violation.ReplicaInfoAndErr replicaInfoAndErr : violation.getViolatingReplicas()) {
        assertTrue(expected.contains(replicaInfoAndErr.replicaInfo.getCore()));
      }
    } else if (violation.node.equals("node5:80_")) {
      // node5 holds no replica at all, so it is one replica short.
      assertEquals(-1, violation.replicaCountDelta.doubleValue(), 0.01);
    } else {
      // Name the offending node so that a failure can be diagnosed from the report alone.
      fail("unexpected violation on node " + violation.node);
    }
  }
}
/**
 * Asserts that {@code Clause.validate(name, val, true)} throws an exception whose message
 * contains {@code msg}.
 *
 * @param name first argument handed to {@code Clause.validate}
 * @param val second argument handed to {@code Clause.validate}
 * @param msg text that must occur in the exception message
 */
private static void expectError(String name, Object val, String msg) {
  Exception e = expectThrows(Exception.class, () -> Clause.validate(name, val, true));
  // getMessage() may be null; treat that as a mismatch instead of failing with an NPE,
  // and report the actual message so the failure is self-explanatory.
  String actual = e.getMessage();
  assertTrue("expected exception containing " + msg + " but got: " + actual,
      actual != null && actual.contains(msg));
}
/**
 * Exercises {@code isPass} on the replica and tag conditions of clauses built with the
 * less-than, greater-than, negation and array operands.
 */
public void testOperands() {
  // replica strictly less than 2
  Clause lessThanTwo = Clause.create("{replica:'<2', node:'#ANY'}");
  assertFalse(lessThanTwo.replica.isPass(3));
  assertFalse(lessThanTwo.replica.isPass(2));
  assertTrue(lessThanTwo.replica.isPass(1));
  assertEquals("{\"replica\":\"<2.0\"}", lessThanTwo.replica.toString());

  // replica strictly greater than 2
  Clause greaterThanTwo = Clause.create("{replica:'>2', node:'#ANY'}");
  assertTrue(greaterThanTwo.replica.isPass(3));
  assertFalse(greaterThanTwo.replica.isPass(2));
  assertFalse(greaterThanTwo.replica.isPass(1));
  assertEquals("{\"replica\":\">2.0\"}", greaterThanTwo.replica.toString());

  // negated tag value; the upper-case spelling passes while the exact value does not
  Clause notOverseer = Clause.create("{replica:0, nodeRole:'!overseer'}");
  assertTrue(notOverseer.tag.isPass("OVERSEER"));
  assertFalse(notOverseer.tag.isPass("overseer"));

  // numeric tag comparisons accept both string and double inputs
  Clause loadBelow = Clause.create("{replica:0, sysLoadAvg:'<12.7'}");
  assertTrue(loadBelow.tag.isPass("12.6"));
  assertTrue(loadBelow.tag.isPass(12.6d));
  assertFalse(loadBelow.tag.isPass("12.9"));
  assertFalse(loadBelow.tag.isPass(12.9d));

  Clause loadAbove = Clause.create("{replica:0, sysLoadAvg:'>12.7'}");
  assertTrue(loadAbove.tag.isPass("12.8"));
  assertTrue(loadAbove.tag.isPass(12.8d));
  assertFalse(loadAbove.tag.isPass("12.6"));
  assertFalse(loadAbove.tag.isPass(12.6d));

  // a 'metrics:' tag keeps its value as a String but still compares numerically
  Clause metricAbove = Clause.create("{replica:0, 'metrics:x:y:z':'>12.7'}");
  assertTrue(metricAbove.tag.val instanceof String);
  assertTrue(metricAbove.tag.isPass("12.8"));
  assertTrue(metricAbove.tag.isPass(12.8d));
  assertFalse(metricAbove.tag.isPass("12.6"));
  assertFalse(metricAbove.tag.isPass(12.6d));

  // array value: only the listed zones pass
  Clause zoneInList = Clause.create("{replica: '<3', sysprop.zone : [east, west]}");
  assertTrue(zoneInList.tag.isPass("east"));
  assertTrue(zoneInList.tag.isPass("west"));
  assertFalse(zoneInList.tag.isPass("south"));
}
/**
 * Requests a MOVEREPLICA suggestion with a source node ({@code 127.0.0.1:65427_solr}) that still
 * has replica info but is absent from the live nodes, and expects the replica to be sent to
 * {@code 127.0.0.1:65434_solr}, the live node reporting zero cores.
 */
public void testNodeLost() {
  final String policyJson = "{" +
      " 'cluster-policy':[" +
      " {" +
      " 'cores':'<10'," +
      " 'node':'#ANY'}," +
      " {" +
      " 'replica':'<2'," +
      " 'shard':'#EACH'," +
      " 'node':'#ANY'}," +
      " {" +
      " 'nodeRole':'overseer'," +
      " 'replica':0}]," +
      " 'cluster-preferences':[" +
      " {" +
      " 'minimize':'cores'," +
      " 'precision':3}," +
      " {" +
      " 'maximize':'freedisk'," +
      " 'precision':100}]}";
  final String clusterData = " {'liveNodes':[" +
      " '127.0.0.1:65417_solr'," +
      " '127.0.0.1:65434_solr']," +
      " 'replicaInfo':{" +
      " '127.0.0.1:65427_solr':{'testNodeLost':{'shard1':[{'core_node2':{type: NRT}}]}}," +
      " '127.0.0.1:65417_solr':{'testNodeLost':{'shard1':[{'core_node1':{type: NRT}}]}}," +
      " '127.0.0.1:65434_solr':{}}," +
      " 'nodeValues':{" +
      " '127.0.0.1:65417_solr':{" +
      " 'node':'127.0.0.1:65417_solr'," +
      " 'cores':1," +
      " 'freedisk':884.7097854614258}," +
      " '127.0.0.1:65434_solr':{" +
      " 'node':'127.0.0.1:65434_solr'," +
      " 'cores':0," +
      " 'freedisk':884.7097854614258}}}";

  @SuppressWarnings({"unchecked"})
  Map<String, Object> policyMap = (Map<String, Object>) Utils.fromJSONString(policyJson);
  Policy.Session session = new Policy(policyMap).createSession(cloudManagerWithData(clusterData));

  @SuppressWarnings({"rawtypes"})
  SolrRequest moveRequest = session.getSuggester(MOVEREPLICA)
      .hint(Hint.SRC_NODE, "127.0.0.1:65427_solr")
      .getSuggestion();
  assertNotNull(moveRequest);
  assertEquals("127.0.0.1:65434_solr", moveRequest.getParams().get("targetNode"));
}
/**
 * Repeatedly asks for MOVEREPLICA suggestions away from {@code node1:80_} (which has no entry in
 * the node values used here) under three successive cluster policies, and checks the target node
 * and replica of each suggestion. Replica placement comes from the shared {@code clusterState}
 * fixture. When {@code useNodeset} is set, the overseer rule is written with the
 * {@code nodeset} syntax instead.
 */
@SuppressWarnings({"unchecked", "rawtypes"})
public void testNodeLostMultipleReplica() {
  String nodeValuesJson = " {" +
      " 'node4:80_':{" +
      " 'node':'10.0.0.4:8987_solr'," +
      " 'cores':1," +
      " 'freedisk':884.7097854614258}," +
      " 'node3:80_':{" +
      " 'node':'10.0.0.4:8989_solr'," +
      " 'cores':1," +
      " 'freedisk':884.7097854614258}," +
      " 'node2:80_':{" +
      " 'node':'10.0.0.4:7574_solr'," +
      " 'cores':1," +
      " 'freedisk':884.7097854614258}," +
      "}";
  SolrCloudManager cloudManager = getSolrCloudManager((Map<String, Map>) Utils.fromJSONString(nodeValuesJson), clusterState);

  // Policy 1: at most one replica of each shard per node.
  String policyJson;
  if (useNodeset) {
    policyJson = "{" +
        " 'cluster-preferences': [" +
        " { 'maximize': 'freedisk', 'precision': 50}," +
        " { 'minimize': 'cores', 'precision': 50}" +
        " ]," +
        " 'cluster-policy': [" +
        " { 'replica': 0, nodeset:{'nodeRole': 'overseer'}}" +
        " { 'replica': '<2', 'shard': '#EACH', 'node': '#ANY'}," +
        " ]" +
        "}";
  } else {
    policyJson = "{" +
        " 'cluster-preferences': [" +
        " { 'maximize': 'freedisk', 'precision': 50}," +
        " { 'minimize': 'cores', 'precision': 50}" +
        " ]," +
        " 'cluster-policy': [" +
        " { 'replica': 0, 'nodeRole': 'overseer'}" +
        " { 'replica': '<2', 'shard': '#EACH', 'node': '#ANY'}," +
        " ]" +
        "}";
  }
  Map policies = (Map) Utils.fromJSONString(policyJson);
  Policy policy = new AutoScalingConfig(policies).getPolicy();

  Policy.Session session;
  Suggester suggester = null;
  SolrRequest operation;
  boolean firstRound = true;
  // Two moves are possible (to node2, then node3); the third request yields nothing.
  for (String wantedTarget : new String[] {"node2:80_", "node3:80_", null}) {
    session = firstRound ? policy.createSession(cloudManager) : suggester.getSession();
    firstRound = false;
    suggester = session.getSuggester(MOVEREPLICA)
        .hint(Hint.SRC_NODE, "node1:80_");
    operation = suggester.getSuggestion();
    if (wantedTarget == null) {
      assertNull(operation);
    } else {
      assertNotNull(operation);
      assertEquals(wantedTarget, operation.getParams().get("targetNode"));
    }
  }

  // lets change the policy such that all replicas that were on node1:80_
  // can now fit on node2:80_
  if (useNodeset) {
    policyJson = "{" +
        " 'cluster-preferences': [" +
        " { 'maximize': 'freedisk', 'precision': 50}," +
        " { 'minimize': 'cores', 'precision': 50}" +
        " ]," +
        " 'cluster-policy': [" +
        " { 'replica': 0, nodeset: {'nodeRole': 'overseer'}}" +
        " { 'replica': '<3', 'shard': '#EACH', 'node': '#ANY'}," +
        " ]" +
        "}";
  } else {
    policyJson = "{" +
        " 'cluster-preferences': [" +
        " { 'maximize': 'freedisk', 'precision': 50}," +
        " { 'minimize': 'cores', 'precision': 50}" +
        " ]," +
        " 'cluster-policy': [" +
        " { 'replica': 0, 'nodeRole': 'overseer'}" +
        " { 'replica': '<3', 'shard': '#EACH', 'node': '#ANY'}," +
        " ]" +
        "}";
  }
  policies = (Map) Utils.fromJSONString(policyJson);
  policy = new AutoScalingConfig(policies).getPolicy();
  firstRound = true;
  for (String wantedReplica : new String[] {"r1", "r3", "r5", null}) {
    session = firstRound ? policy.createSession(cloudManager) : suggester.getSession();
    firstRound = false;
    suggester = session.getSuggester(MOVEREPLICA)
        .hint(Hint.SRC_NODE, "node1:80_");
    operation = suggester.getSuggestion();
    if (wantedReplica == null) {
      assertNull(operation);
    } else {
      assertNotNull(operation);
      assertEquals("node2:80_", operation.getParams().get("targetNode"));
      assertEquals(wantedReplica, operation.getParams().get("replica"));
    }
  }

  // now lets change the policy such that a node can have 2 shard2 replicas
  if (useNodeset) {
    policyJson = "{" +
        " 'cluster-preferences': [" +
        " { 'maximize': 'freedisk', 'precision': 50}," +
        " { 'minimize': 'cores', 'precision': 50}" +
        " ]," +
        " 'cluster-policy': [" +
        " { 'replica': 0, nodeset:{'nodeRole': 'overseer'}}" +
        " { 'replica': '<2', 'shard': 'shard1', 'node': '#ANY'}," +
        " { 'replica': '<3', 'shard': 'shard2', 'node': '#ANY'}," +
        " ]" +
        "}";
  } else {
    policyJson = "{" +
        " 'cluster-preferences': [" +
        " { 'maximize': 'freedisk', 'precision': 50}," +
        " { 'minimize': 'cores', 'precision': 50}" +
        " ]," +
        " 'cluster-policy': [" +
        " { 'replica': 0, 'nodeRole': 'overseer'}" +
        " { 'replica': '<2', 'shard': 'shard1', 'node': '#ANY'}," +
        " { 'replica': '<3', 'shard': 'shard2', 'node': '#ANY'}," +
        " ]" +
        "}";
  }
  policies = (Map) Utils.fromJSONString(policyJson);
  policy = new AutoScalingConfig(policies).getPolicy();
  final String[] wantedReplicas = {"r1", "r3", "r5"};
  final String[] wantedTargets = {"node3:80_", "node3:80_", "node2:80_"};
  for (int round = 0; round < wantedReplicas.length; round++) {
    session = (round == 0) ? policy.createSession(cloudManager) : suggester.getSession();
    suggester = session.getSuggester(MOVEREPLICA)
        .hint(Hint.SRC_NODE, "node1:80_");
    operation = suggester.getSuggestion();
    assertNotNull(operation);
    assertEquals(wantedTargets[round], operation.getParams().get("targetNode"));
    assertEquals(wantedReplicas[round], operation.getParams().get("replica"));
  }
}
/**
 * Parses {@code data} with {@code Utils.fromJSONString} and hands the resulting map to
 * {@link #cloudManagerWithData(Map)}.
 */
private static SolrCloudManager cloudManagerWithData(String data) {
  Object parsed = Utils.fromJSONString(data);
  return cloudManagerWithData((Map) parsed);
}
/**
 * Builds a {@link SolrCloudManager} backed entirely by the given map.
 *
 * <p>The map is read under the keys {@code replicaInfo} (node -> collection -> shard -> list of
 * single-entry maps {@code {replicaName: {props}}}), {@code liveNodes}, {@code nodeValues} and,
 * optionally, {@code autoscalingJson}. The replica lists under {@code replicaInfo} are rewritten
 * in place: every element is replaced by a {@link ReplicaInfo} built from it.
 *
 * @param m the backing data; its {@code replicaInfo} lists are modified by this call
 * @return a cloud manager answering from {@code m}
 */
@SuppressWarnings({"unchecked", "rawtypes"})
static SolrCloudManager cloudManagerWithData(Map m) {
  Map replicaInfo = (Map) m.get("replicaInfo");
  for (Object nodeObj : replicaInfo.entrySet()) {
    Map.Entry nodeEntry = (Map.Entry) nodeObj;
    Object node = nodeEntry.getKey();
    Map collections = (Map) nodeEntry.getValue();
    for (Object collObj : collections.entrySet()) {
      Map.Entry collEntry = (Map.Entry) collObj;
      Object coll = collEntry.getKey();
      Map shards = (Map) collEntry.getValue();
      for (Object shardObj : shards.entrySet()) {
        Map.Entry shardEntry = (Map.Entry) shardObj;
        Object shard = shardEntry.getKey();
        List replicas = (List) shardEntry.getValue();
        for (int idx = 0; idx < replicas.size(); idx++) {
          // Each element is a one-key map: the key is the replica name, the value its properties.
          Map wrapper = (Map) replicas.get(idx);
          String replicaName = wrapper.keySet().iterator().next().toString();
          Map props = (Map) wrapper.get(replicaName);
          Replica.Type replicaType = Replica.Type.get((String) props.get("type"));
          replicas.set(idx, new ReplicaInfo(replicaName, replicaName,
              coll.toString(), shard.toString(), replicaType, (String) node, props));
        }
      }
    }
  }

  final AutoScalingConfig asc;
  if (m.containsKey("autoscalingJson")) {
    asc = new AutoScalingConfig((Map<String, Object>) m.get("autoscalingJson"));
  } else {
    asc = null;
  }

  return new DelegatingCloudManager(null) {
    @Override
    public NodeStateProvider getNodeStateProvider() {
      return new DelegatingNodeStateProvider(null) {
        @Override
        public Map<String, Map<String, List<ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
          Map<String, Map<String, List<ReplicaInfo>>> found =
              (Map<String, Map<String, List<ReplicaInfo>>>) Utils.getObjectByPath(m, false, Arrays.asList("replicaInfo", node));
          if (found == null) {
            return new HashMap<>();
          }
          return found;
        }

        @Override
        public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
          Map<String, Object> found =
              (Map<String, Object>) Utils.getObjectByPath(m, false, Arrays.asList("nodeValues", node));
          if (found == null) {
            return new HashMap<>();
          }
          return found;
        }
      };
    }

    @Override
    public ClusterStateProvider getClusterStateProvider() {
      return new DelegatingClusterStateProvider(null) {
        @Override
        public Set<String> getLiveNodes() {
          // A fresh copy on every call.
          return new HashSet<>((Collection<String>) m.get("liveNodes"));
        }

        @Override
        public ClusterState getClusterState() throws IOException {
          return ClusterState.load(0, new HashMap<>(), getLiveNodes(), CLUSTER_STATE);
        }
      };
    }

    @Override
    public DistribStateManager getDistribStateManager() {
      return new DelegatingDistribStateManager(null) {
        @Override
        public AutoScalingConfig getAutoScalingConfig() {
          return asc;
        }
      };
    }
  };
}
/**
 * Checks ADDREPLICA suggestions when the cluster policy ties replica types to the
 * {@code sysprop.fs} node property: PULL replicas are expected on the {@code slowdisk} node
 * (node1:80_) and TLOG replicas on the {@code ssd} node (node3:80_). A third TLOG request for
 * {@code newColl/shard2} is expected to yield no suggestion. When {@code useNodeset} is set,
 * the same rules are expressed with the {@code nodeset} syntax.
 */
@SuppressWarnings({"unchecked", "rawtypes"})
public void testPolicyWithReplicaType() {
  Map policies = (Map) Utils.fromJSONString("{" +
      " 'cluster-preferences': [" +
      " { 'maximize': 'freedisk', 'precision': 50}," +
      " { 'minimize': 'cores', 'precision': 50}" +
      " ]," +
      " 'cluster-policy': [" +
      " { 'replica': 0, 'nodeRole': 'overseer'}" +
      " { 'replica': '<2', 'shard': '#EACH', 'node': '#ANY'}," +
      " { 'replica': 0, 'shard': '#EACH', sysprop.fs : '!ssd', type : TLOG }" +
      " { 'replica': 0, 'shard': '#EACH', sysprop.fs : '!slowdisk' , type : PULL }" +
      " ]" +
      "}");
  if (useNodeset) {
    policies = (Map) Utils.fromJSONString("{" +
        " 'cluster-preferences': [" +
        " { 'maximize': 'freedisk', 'precision': 50}," +
        " { 'minimize': 'cores', 'precision': 50}" +
        " ]," +
        " 'cluster-policy': [" +
        " { 'replica': 0, nodeset : {'nodeRole': 'overseer'}}" +
        " { 'replica': '<2', 'shard': '#EACH', 'node': '#ANY'}," +
        " { 'replica': 0, 'shard': '#EACH', nodeset : { sysprop.fs : '!ssd'}, type : TLOG }" +
        " { 'replica': 0, 'shard': '#EACH', put:'on-each-node' nodeset : {sysprop.fs : '!slowdisk'} , type : PULL }" +
        " ]" +
        "}");
  }
  Map<String, Map> nodeValues = (Map<String, Map>) Utils.fromJSONString("{" +
      "\"node1:80_\":{cores:12, freedisk: 334, heapUsage:10480, rack: rack4, sysprop.fs: slowdisk}," +
      "\"node2:80_\":{cores:4, freedisk: 749, heapUsage:6873, rack: rack3, sysprop.fs: unknown }," +
      "\"node3:80_\":{cores:7, freedisk: 262, heapUsage:7834, rack: rack2, sysprop.fs : ssd}," +
      "\"node4:80_\":{cores:8, freedisk: 375, heapUsage:16900, nodeRole:overseer, rack: rack1, sysprop.fs: unknown}" +
      "}");
  Policy policy = new Policy(policies);

  // PULL replica for shard1: must land on the slowdisk node.
  Suggester suggester = policy.createSession(getSolrCloudManager(nodeValues, clusterState))
      .getSuggester(ADDREPLICA)
      .hint(Hint.COLL_SHARD, new Pair<>("newColl", "shard1"))
      .hint(Hint.REPLICATYPE, Replica.Type.PULL);
  SolrRequest op = suggester.getSuggestion();
  assertNotNull(op);
  assertEquals(Replica.Type.PULL.name(), op.getParams().get("type"));
  assertEquals("PULL type node must be in 'slowdisk' node", "node1:80_", op.getParams().get("node"));

  // PULL replica for shard2: same node again.
  suggester = suggester.getSession()
      .getSuggester(ADDREPLICA)
      .hint(Hint.COLL_SHARD, new Pair<>("newColl", "shard2"))
      .hint(Hint.REPLICATYPE, Replica.Type.PULL);
  op = suggester.getSuggestion();
  assertNotNull(op);
  assertEquals(Replica.Type.PULL.name(), op.getParams().get("type"));
  assertEquals("PULL type node must be in 'slowdisk' node", "node1:80_", op.getParams().get("node"));

  // TLOG replica for shard1: must land on the ssd node.
  suggester = suggester.getSession()
      .getSuggester(ADDREPLICA)
      .hint(Hint.COLL_SHARD, new Pair<>("newColl", "shard1"))
      .hint(Hint.REPLICATYPE, Replica.Type.TLOG);
  op = suggester.getSuggestion();
  assertNotNull(op);
  assertEquals(Replica.Type.TLOG.name(), op.getParams().get("type"));
  assertEquals("TLOG type node must be in 'ssd' node", "node3:80_", op.getParams().get("node"));

  // TLOG replica for shard2: ssd node again.
  suggester = suggester.getSession()
      .getSuggester(ADDREPLICA)
      .hint(Hint.COLL_SHARD, new Pair<>("newColl", "shard2"))
      .hint(Hint.REPLICATYPE, Replica.Type.TLOG);
  op = suggester.getSuggestion();
  assertNotNull(op);
  assertEquals(Replica.Type.TLOG.name(), op.getParams().get("type"));
  assertEquals("TLOG type node must be in 'ssd' node", "node3:80_", op.getParams().get("node"));

  // A second TLOG replica for shard2 has nowhere left to go.
  suggester = suggester.getSession()
      .getSuggester(ADDREPLICA)
      .hint(Hint.COLL_SHARD, new Pair<>("newColl", "shard2"))
      .hint(Hint.REPLICATYPE, Replica.Type.TLOG);
  op = suggester.getSuggestion();
  assertNull("No node should qualify for this", op);
}
/**
 * Asks three times for a MOVEREPLICA suggestion away from {@code node2:80_} with hints for
 * {@code collection1} and {@code collection2}, using the cluster state loaded from
 * {@code testMoveReplicasInMultipleCollections.json}. The first two suggestions are expected to
 * move a {@code collection2} replica ({@code core_node1} or {@code core_node3}) to
 * {@code node1:80_}; the third is expected to be {@code null}.
 */
public void testMoveReplicasInMultipleCollections() throws IOException {
  @SuppressWarnings({"unchecked", "rawtypes"})
  Map<String, Map> nodeValues = (Map<String, Map>) Utils.fromJSONString("{" +
      "\"node1:80_\":{cores:2}," +
      "\"node3:80_\":{cores:4}" +
      "}");
  Policy policy = new Policy(new HashMap<>());
  @SuppressWarnings({"unchecked"})
  Map<String, Object> clusterStateMap =
      (Map<String, Object>) loadFromResource("testMoveReplicasInMultipleCollections.json");
  Policy.Session session = policy.createSession(getSolrCloudManager(nodeValues, clusterStateMap));

  // Rounds one and two carry identical expectations.
  for (int round = 0; round < 2; round++) {
    Suggester suggester = session.getSuggester(MOVEREPLICA)
        .hint(Hint.COLL, "collection1")
        .hint(Hint.COLL, "collection2")
        .hint(Suggester.Hint.SRC_NODE, "node2:80_");
    @SuppressWarnings({"rawtypes"})
    SolrRequest op = suggester.getSuggestion();
    assertNotNull(op);
    assertEquals("collection2", op.getParams().get("collection"));
    assertEquals("node1:80_", op.getParams().get("targetNode"));
    String movedReplica = op.getParams().get("replica");
    assertTrue(movedReplica.equals("core_node3") || movedReplica.equals("core_node1"));
    // Continue from the session that already contains this move.
    session = suggester.getSession();
  }

  // Nothing is left to move in the third round.
  Suggester lastSuggester = session.getSuggester(MOVEREPLICA)
      .hint(Hint.COLL, "collection1")
      .hint(Hint.COLL, "collection2")
      .hint(Suggester.Hint.SRC_NODE, "node2:80_");
  assertNull(lastSuggester.getSuggestion());
}
/**
 * Drains ADDREPLICA suggestions for two collections ({@code newColl}, {@code newColl2}) hinted
 * together, first for PULL replicas of {@code shard1} and then for TLOG replicas of
 * {@code shard2}, and checks how many suggestions each collection receives and on which node.
 * When {@code useNodeset} is set, the policy is written with the {@code nodeset} syntax.
 */
public void testMultipleCollections() {
  final String policyJson;
  if (useNodeset) {
    policyJson = "{" +
        " 'cluster-preferences': [" +
        " { 'maximize': 'freedisk', 'precision': 50}," +
        " { 'minimize': 'cores', 'precision': 1}" +
        " ]," +
        " 'cluster-policy': [" +
        " { 'replica': 0, nodeset : {'nodeRole': 'overseer'}}" +
        " { 'replica': '<2', 'shard': '#EACH', 'node': '#ANY', 'collection':'newColl'}," +
        " { 'replica': '<2', 'shard': '#EACH', 'node': '#ANY', 'collection':'newColl2', type : PULL}," +
        " { 'replica': '<3', 'shard': '#EACH', 'node': '#ANY', 'collection':'newColl2'}," +
        " { 'replica': 0, 'shard': '#EACH', put: on-each-node , nodeset:{ sysprop.fs : '!ssd'}, type : TLOG }" +
        " { 'replica': 0, 'shard': '#EACH', put: on-each-node ,nodeset : {sysprop.fs : '!slowdisk'} , type : PULL }" +
        " ]" +
        "}";
  } else {
    policyJson = "{" +
        " 'cluster-preferences': [" +
        " { 'maximize': 'freedisk', 'precision': 50}," +
        " { 'minimize': 'cores', 'precision': 1}" +
        " ]," +
        " 'cluster-policy': [" +
        " { 'replica': 0, 'nodeRole': 'overseer'}" +
        " { 'replica': '<2', 'shard': '#EACH', 'node': '#ANY', 'collection':'newColl'}," +
        " { 'replica': '<2', 'shard': '#EACH', 'node': '#ANY', 'collection':'newColl2', type : PULL}," +
        " { 'replica': '<3', 'shard': '#EACH', 'node': '#ANY', 'collection':'newColl2'}," +
        " { 'replica': 0, 'shard': '#EACH', sysprop.fs : '!ssd', type : TLOG }" +
        " { 'replica': 0, 'shard': '#EACH', sysprop.fs : '!slowdisk' , type : PULL }" +
        " ]" +
        "}";
  }
  @SuppressWarnings({"rawtypes"})
  Map policies = (Map) Utils.fromJSONString(policyJson);
  @SuppressWarnings({"unchecked", "rawtypes"})
  Map<String, Map> nodeValues = (Map<String, Map>) Utils.fromJSONString("{" +
      "\"node1:80_\":{cores:12, freedisk: 334, heapUsage:10480, rack: rack4, sysprop.fs: slowdisk}," +
      "\"node2:80_\":{cores:4, freedisk: 749, heapUsage:6873, rack: rack3, sysprop.fs: unknown}," +
      "\"node3:80_\":{cores:7, freedisk: 262, heapUsage:7834, rack: rack2, sysprop.fs : ssd}," +
      "\"node4:80_\":{cores:8, freedisk: 375, heapUsage:16900, nodeRole:overseer, rack: rack1, sysprop.fs: unknown}" +
      "}");
  @SuppressWarnings({"unchecked"})
  Policy policy = new Policy(policies);

  // Phase 1: PULL replicas of shard1 in either collection.
  Suggester suggester = policy.createSession(getSolrCloudManager(nodeValues, clusterState))
      .getSuggester(ADDREPLICA)
      .hint(Hint.REPLICATYPE, Replica.Type.PULL)
      .hint(Hint.COLL_SHARD, new Pair<>("newColl", "shard1"))
      .hint(Hint.COLL_SHARD, new Pair<>("newColl2", "shard1"));
  int total = 0;
  int forNewColl = 0;
  int forNewColl2 = 0;
  @SuppressWarnings({"rawtypes"})
  SolrRequest op = suggester.getSuggestion();
  while (op != null) {
    total++;
    assertEquals(Replica.Type.PULL.name(), op.getParams().get("type"));
    String collection = op.getParams().get("collection");
    assertTrue("Collection for replica is not as expected " + collection, collection.equals("newColl") || collection.equals("newColl2"));
    if (collection.equals("newColl")) {
      forNewColl++;
    } else {
      forNewColl2++;
    }
    assertEquals("PULL type node must be in 'slowdisk' node, countOp : " + total, "node1:80_", op.getParams().get("node"));
    suggester = suggester.getSession().getSuggester(ADDREPLICA)
        .hint(Hint.REPLICATYPE, Replica.Type.PULL)
        .hint(Hint.COLL_SHARD, new Pair<>("newColl", "shard1"))
        .hint(Hint.COLL_SHARD, new Pair<>("newColl2", "shard1"));
    op = suggester.getSuggestion();
  }
  assertEquals(2, total);
  assertEquals(1, forNewColl);
  assertEquals(1, forNewColl2);

  // Phase 2: TLOG replicas of shard2 in either collection, continuing from the same session.
  total = 0;
  forNewColl = 0;
  forNewColl2 = 0;
  suggester = suggester.getSession()
      .getSuggester(ADDREPLICA)
      .hint(Hint.COLL_SHARD, new Pair<>("newColl", "shard2"))
      .hint(Hint.COLL_SHARD, new Pair<>("newColl2", "shard2"))
      .hint(Hint.REPLICATYPE, Replica.Type.TLOG);
  op = suggester.getSuggestion();
  while (op != null) {
    total++;
    assertEquals(Replica.Type.TLOG.name(), op.getParams().get("type"));
    String collection = op.getParams().get("collection");
    assertTrue("Collection for replica is not as expected " + collection, collection.equals("newColl") || collection.equals("newColl2"));
    if (collection.equals("newColl")) {
      forNewColl++;
    } else {
      forNewColl2++;
    }
    assertEquals("TLOG type node must be in 'ssd' node", "node3:80_", op.getParams().get("node"));
    suggester = suggester.getSession()
        .getSuggester(ADDREPLICA)
        .hint(Hint.COLL_SHARD, new Pair<>("newColl", "shard2"))
        .hint(Hint.COLL_SHARD, new Pair<>("newColl2", "shard2"))
        .hint(Hint.REPLICATYPE, Replica.Type.TLOG);
    op = suggester.getSuggestion();
  }
  assertEquals(3, total);
  assertEquals(1, forNewColl);
  assertEquals(2, forNewColl2);
}
/**
 * Verifies that {@code Row.addReplica} returns a row with the extra replica while leaving the
 * row it was called on unchanged: after two chained additions to {@code c1/s1}, the first
 * result still has one replica and the second has two {@link ReplicaInfo} entries.
 */
public void testRow() {
  // Single-node cluster ("nodex") hosting one replica of c1/s0.
  Policy.Session session = new Policy().createSession(new DelegatingCloudManager(null) {
    @Override
    public ClusterStateProvider getClusterStateProvider() {
      return new DelegatingClusterStateProvider(null) {
        @Override
        public Set<String> getLiveNodes() {
          return Collections.singleton("nodex");
        }

        @Override
        public String getPolicyNameByCollection(String coll) {
          return null;
        }
      };
    }

    @Override
    public NodeStateProvider getNodeStateProvider() {
      return new DelegatingNodeStateProvider(null) {
        @Override
        public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
          return Utils.makeMap("node", "nodex", "cores", 1);
        }

        @Override
        public Map<String, Map<String, List<ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
          // Parse a skeleton and then swap the placeholder element for a real ReplicaInfo.
          @SuppressWarnings({"unchecked"})
          Map<String, Map<String, List<ReplicaInfo>>> replicasByCollection =
              (Map<String, Map<String, List<ReplicaInfo>>>) Utils.fromJSONString("{c1: {s0:[{}]}}");
          ReplicaInfo existing = new ReplicaInfo("r0", "c1.s0", "c1", "s0", Replica.Type.NRT, "nodex", new HashMap<>());
          Utils.setObjectByPath(replicasByCollection, "c1/s0[0]", existing);
          return replicasByCollection;
        }
      };
    }
  });

  Row baseRow = session.getNode("nodex");
  Row afterFirstAdd = baseRow.addReplica("c1", "s1", Replica.Type.NRT);
  Row afterSecondAdd = afterFirstAdd.addReplica("c1", "s1", Replica.Type.NRT);

  // The intermediate row must not have been touched by the second addition.
  assertEquals(1, afterFirstAdd.collectionVsShardVsReplicas.get("c1").get("s1").size());
  assertEquals(2, afterSecondAdd.collectionVsShardVsReplicas.get("c1").get("s1").size());
  assertTrue(afterSecondAdd.collectionVsShardVsReplicas.get("c1").get("s1").get(0) instanceof ReplicaInfo);
  assertTrue(afterSecondAdd.collectionVsShardVsReplicas.get("c1").get("s1").get(1) instanceof ReplicaInfo);
}
/**
 * Merges the collection policy {@code policy1} with the cluster policy via
 * {@code Policy.mergePolicies}, sorts the result and checks that four clauses remain in the
 * expected order. When {@code useNodeset} is set, the tag-based rules are written with the
 * {@code nodeset} syntax instead.
 */
public void testMerge() {
  @SuppressWarnings({"rawtypes"})
  Map map = (Map) Utils.fromJSONString("{" +
      " 'cluster-preferences': [" +
      " { 'maximize': 'freedisk', 'precision': 50}," +
      " { 'minimize': 'cores', 'precision': 50}" +
      " ]," +
      " 'cluster-policy': [" +
      " { 'replica': 0, 'nodeRole': 'overseer'}," +
      " { 'replica': '<2', 'shard': '#EACH', 'node': '#ANY'}" +
      " ]," +
      " 'policies': {" +
      " 'policy1': [" +
      " { 'replica': '1', 'sysprop.fs': 'ssd', 'shard': '#EACH'}," +
      " { 'replica': '<2', 'shard': '#ANY', 'node': '#ANY'}," +
      " { 'replica': '<2', 'shard': '#EACH', 'sysprop.rack': 'rack1'}" +
      " ]" +
      " }" +
      "}");
  if (useNodeset) {
    map = (Map) Utils.fromJSONString("{" +
        " 'cluster-preferences': [" +
        " { 'maximize': 'freedisk', 'precision': 50}," +
        " { 'minimize': 'cores', 'precision': 50}" +
        " ]," +
        " 'cluster-policy': [" +
        " { 'replica': 0, nodeset: {'nodeRole': 'overseer'}}," +
        " { 'replica': '<2', 'shard': '#EACH', 'node': '#ANY'}" +
        " ]," +
        " 'policies': {" +
        " 'policy1': [" +
        " { 'replica': '1', nodeset:{ 'sysprop.fs': 'ssd'}, 'shard': '#EACH'}," +
        " { 'replica': '<2', 'shard': '#ANY', 'node': '#ANY'}," +
        " { 'replica': '<2', 'shard': '#EACH',nodeset:{ 'sysprop.rack': 'rack1'}}" +
        " ]" +
        " }" +
        "}");
  }
  @SuppressWarnings({"unchecked"})
  Policy policy = new Policy(map);
  List<Clause> clauses = Policy.mergePolicies("mycoll", policy.getPolicies().get("policy1"), policy.getClusterPolicy());
  Collections.sort(clauses);
  // Five clauses go in (three from policy1, two from the cluster policy); four are expected out.
  // Expected value first, so that a failure reports "expected:<4> but was:<n>".
  assertEquals(4, clauses.size());
  assertEquals("1", String.valueOf(clauses.get(0).original.get("replica")));
  assertEquals("0", String.valueOf(clauses.get(1).original.get("replica")));
  assertEquals("#ANY", clauses.get(3).original.get("shard"));
  assertEquals("rack1", clauses.get(2).tag.val);
  assertEquals("overseer", clauses.get(1).tag.val);
}
/**
 * Sorts the clauses of a four-rule cluster policy and checks that the {@code sysprop.rack}
 * clause comes first and the {@code nodeRole} clause second. When {@code useNodeset} is set,
 * the rack rule is written with the {@code nodeset} syntax.
 */
public void testConditionsSort() {
  final String rules;
  if (useNodeset) {
    rules = "{" +
        " 'cluster-policy':[" +
        " { 'nodeRole':'overseer', replica: 0, 'strict':false}," +
        " { 'replica':'<1', 'node':'node3:80_', 'shard':'#EACH'}," +
        " { 'replica':'<2', 'node':'#ANY', 'shard':'#EACH'}," +
        " { 'replica':1, nodeset: {'sysprop.rack':'rack1'}}]" +
        " }";
  } else {
    rules = "{" +
        " 'cluster-policy':[" +
        " { 'nodeRole':'overseer', replica: 0, 'strict':false}," +
        " { 'replica':'<1', 'node':'node3:80_', 'shard':'#EACH'}," +
        " { 'replica':'<2', 'node':'#ANY', 'shard':'#EACH'}," +
        " { 'replica':1, 'sysprop.rack':'rack1'}]" +
        " }";
  }
  @SuppressWarnings({"unchecked"})
  Map<String, Object> parsedRules = (Map<String, Object>) Utils.fromJSONString(rules);
  Policy policy = new Policy(parsedRules);

  // Sort a copy so the policy's own clause list is left alone.
  List<Clause> sorted = new ArrayList<>(policy.getClusterPolicy());
  Collections.sort(sorted);
  assertEquals("nodeRole", sorted.get(1).tag.getName());
  assertEquals("sysprop.rack", sorted.get(0).tag.getName());
}
/**
 * End-to-end check of a three-rule cluster policy against the shared {@code clusterState}:
 * verifies the node sort order, the three reported violations, the node chosen for an
 * ADDREPLICA suggestion, and that a MOVEREPLICA suggestion honours a target-node hint once an
 * empty node ({@code node5:80_}) is added. When {@code useNodeset} is set, the overseer rule is
 * written with the {@code nodeset} syntax.
 */
@SuppressWarnings({"unchecked", "rawtypes"})
public void testRules() {
  final String rules;
  if (useNodeset) {
    rules = "{" +
        "cluster-policy:[" +
        "{nodeset:{nodeRole:'overseer'},replica : 0 , strict:false}," +
        "{replica:'<1',node:'node3:80_'}," +
        "{replica:'<2',node:'#ANY', shard:'#EACH'}]," +
        " cluster-preferences:[" +
        "{minimize:cores , precision:2}," +
        "{maximize:freedisk, precision:50}, " +
        "{minimize:heapUsage, precision:1000}]}";
  } else {
    rules = "{" +
        "cluster-policy:[" +
        "{nodeRole:'overseer',replica : 0 , strict:false}," +
        "{replica:'<1',node:'node3:80_'}," +
        "{replica:'<2',node:'#ANY', shard:'#EACH'}]," +
        " cluster-preferences:[" +
        "{minimize:cores , precision:2}," +
        "{maximize:freedisk, precision:50}, " +
        "{minimize:heapUsage, precision:1000}]}";
  }
  Map<String, Map> nodeValues = (Map<String, Map>) Utils.fromJSONString("{" +
      "\"node1:80_\":{cores:12, freedisk: 334, heapUsage:10480}," +
      "\"node2:80_\":{cores:4, freedisk: 749, heapUsage:6873}," +
      "\"node3:80_\":{cores:7, freedisk: 262, heapUsage:7834}," +
      "\"node4:80_\":{cores:8, freedisk: 375, heapUsage:16900, nodeRole:overseer}" +
      "}");
  Policy policy = new Policy((Map<String, Object>) Utils.fromJSONString(rules));
  Policy.Session session = policy.createSession(getSolrCloudManager(nodeValues, clusterState));

  // Node ordering produced by the cluster preferences.
  List<Row> sortedRows = session.getSortedNodes();
  assertEquals("node1:80_", sortedRows.get(0).node);
  assertEquals("node3:80_", sortedRows.get(1).node);
  assertEquals("node4:80_", sortedRows.get(2).node);
  assertEquals("node2:80_", sortedRows.get(3).node);

  // One violation per rule is expected.
  List<Violation> violations = session.getViolations();
  assertEquals(3, violations.size());
  assertTrue(violations.stream().anyMatch(v -> "node3:80_".equals(v.getClause().tag.getValue())));
  assertTrue(violations.stream().anyMatch(v -> "nodeRole".equals(v.getClause().tag.getName())));
  assertTrue(violations.stream().anyMatch(v -> (v.getClause().replica.getOperand() == Operand.LESS_THAN && "node".equals(v.getClause().tag.getName()))));

  Suggester addSuggester = session.getSuggester(ADDREPLICA)
      .hint(Hint.COLL_SHARD, new Pair<>("gettingstarted", "r1"));
  SolrParams addParams = addSuggester.getSuggestion().getParams();
  assertEquals("node2:80_", addParams.get("node"));

  // Same cluster plus an empty node5; a move targeted at node5 must be honoured.
  nodeValues = (Map<String, Map>) Utils.fromJSONString("{" +
      "\"node1:80_\":{cores:12, freedisk: 334, heapUsage:10480}," +
      "\"node2:80_\":{cores:4, freedisk: 749, heapUsage:6873}," +
      "\"node3:80_\":{cores:7, freedisk: 262, heapUsage:7834}," +
      "\"node5:80_\":{cores:0, freedisk: 895, heapUsage:17834}," +
      "\"node4:80_\":{cores:8, freedisk: 375, heapUsage:16900, nodeRole:overseer}" +
      "}");
  session = policy.createSession(getSolrCloudManager(nodeValues, clusterState));
  SolrRequest moveRequest = session.getSuggester(MOVEREPLICA)
      .hint(Hint.TARGET_NODE, "node5:80_")
      .getSuggestion();
  assertNotNull(moveRequest);
  assertEquals("node5:80_", moveRequest.getParams().get("targetNode"));
}
/**
 * Exercises the {@link PolicyHelper} session cache kept in the cloud manager's object cache.
 * <p>
 * Part one: after a placement computation a {@link PolicyHelper.SessionRef} must be present in the
 * object cache, the last session wrapper must be in {@code EXECUTING} state, and releasing it must
 * leave the reference empty.
 * <p>
 * Part two: a second thread requests a session while the main thread still holds one. The test
 * asserts that the second request completes only after the first session was returned, that both
 * wrappers report the same creation time, and that the reference becomes empty only once both
 * wrappers are released.
 */
@Test
public void testSessionCaching() throws IOException, InterruptedException {
  String autoScalingjson = " '{cluster-policy':[" +
      " { 'cores':'<10', 'node':'#ANY'}," +
      " { 'replica':'<2', 'shard':'#EACH', 'node':'#ANY'}," +
      " { 'nodeRole':'overseer','replica':0}]," +
      " 'cluster-preferences':[{'minimize':'cores'}]}";
  // The policy instance is not used further below; only its construction is exercised here.
  @SuppressWarnings({"unchecked"})
  Policy policy = new Policy((Map<String, Object>) Utils.fromJSONString(autoScalingjson));
  String nodeValues = " {" +
      " 'node4:80_':{" +
      " 'node':'10.0.0.4:8987_solr'," +
      " 'cores':1," +
      " 'freedisk':884.7097854614258}," +
      " 'node3:80_':{" +
      " 'node':'10.0.0.4:8989_solr'," +
      " 'cores':1," +
      " 'freedisk':884.7097854614258}," +
      " 'node2:80_':{" +
      " 'node':'10.0.0.4:7574_solr'," +
      " 'cores':1," +
      " 'freedisk':884.7097854614258}," +
      "}";
  @SuppressWarnings({"rawtypes"})
  Map policies = (Map) Utils.fromJSONString("{" +
      " 'cluster-preferences': [" +
      " { 'maximize': 'freedisk', 'precision': 50}," +
      " { 'minimize': 'cores', 'precision': 50}" +
      " ]," +
      " 'cluster-policy': [" +
      " { 'replica': 0, 'nodeRole': 'overseer'}" +
      " { 'replica': '<2', 'shard': '#EACH', 'node': '#ANY'}," +
      " ]" +
      "}");
  @SuppressWarnings({"unchecked"})
  AutoScalingConfig config = new AutoScalingConfig(policies);
  @SuppressWarnings({"unchecked", "rawtypes"})
  final SolrCloudManager solrCloudManager = new DelegatingCloudManager(getSolrCloudManager((Map<String, Map>) Utils.fromJSONString(nodeValues),
      clusterState)) {
    @Override
    public DistribStateManager getDistribStateManager() {
      return delegatingDistribStateManager(config);
    }
  };

  // Only the side effect matters here (a session ends up in the object cache); the computed
  // positions themselves are not inspected.
  PolicyHelper.getReplicaLocations("c", config, solrCloudManager, null,
      Arrays.asList("s1", "s2"), 1, 0, 0,
      null);
  PolicyHelper.SessionRef sessionRef = (PolicyHelper.SessionRef) solrCloudManager.getObjectCache().get(PolicyHelper.SessionRef.class.getName());
  assertNotNull(sessionRef);
  PolicyHelper.SessionWrapper sessionWrapper = PolicyHelper.getLastSessionWrapper(true);
  Policy.Session session = sessionWrapper.get();
  assertNotNull(session);
  // Identity, not equality: the session must carry the very policy instance held by the config.
  assertSame(config.getPolicy(), session.getPolicy());
  assertEquals(PolicyHelper.Status.EXECUTING, sessionWrapper.status);
  sessionWrapper.release();
  assertTrue(sessionRef.isEmpty());

  PolicyHelper.SessionWrapper s1 = PolicyHelper.getSession(solrCloudManager);
  // Single-element array so the lambda below can hand its result back to this thread.
  PolicyHelper.SessionWrapper[] s2 = new PolicyHelper.SessionWrapper[1];
  AtomicLong secondTime = new AtomicLong();
  Thread thread = new Thread(() -> {
    try {
      s2[0] = PolicyHelper.getSession(solrCloudManager);
      secondTime.set(System.nanoTime());
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  });
  thread.start();
  // Give the second thread time to issue its request before the first session is returned.
  Thread.sleep(50);
  long beforeReturn = System.nanoTime();
  s1.returnSession(s1.get());
  assertEquals(1, s1.getRefCount());
  thread.join();
  assertNotNull(s2[0]);
  // The second request must have completed only after the first session was returned.
  assertTrue(secondTime.get() > beforeReturn);
  assertTrue(s1.getCreateTime() == s2[0].getCreateTime());
  s2[0].returnSession(s2[0].get());
  assertEquals(2, s1.getRefCount());
  s2[0].release();
  assertFalse(sessionRef.isEmpty());
  s1.release();
  assertTrue(sessionRef.isEmpty());
}
/**
 * Checks that several sessions can coexist in the cache and that a returned session is handed
 * out again: two sessions are requested (the second without waiting), each one is returned and
 * then re-obtained, and finally the {@link PolicyHelper.SessionRef} is expected to be empty only
 * after the last outstanding release.
 */
@Test
public void testMultiSessionsCache() throws IOException, InterruptedException {
@SuppressWarnings({"rawtypes", "unchecked"})
Map<String, Map> nodeValues = (Map<String, Map>) Utils.fromJSONString(" {" +
" 'node1:80_':{ 'node':'10.0.0.4:8987_solr', 'cores':1 }," +
" 'node2:80_':{ 'node':'10.0.0.4:8989_solr', 'cores':1 }," +
" 'node3:80_':{ 'node':'10.0.0.4:7574_solr', 'cores':1 }" +
"}");
@SuppressWarnings({"rawtypes"})
Map policies = (Map) Utils.fromJSONString("{ 'cluster-preferences': [{ 'minimize': 'cores', 'precision': 1}]}");
@SuppressWarnings({"unchecked"})
AutoScalingConfig config = new AutoScalingConfig(policies);
// Cloud manager whose DistribStateManager serves the config built above.
final SolrCloudManager solrCloudManager = new DelegatingCloudManager(getSolrCloudManager(nodeValues, clusterState)) {
@Override
public DistribStateManager getDistribStateManager() {
return delegatingDistribStateManager(config);
}
};
PolicyHelper.SessionWrapper s1 = PolicyHelper.getSession(solrCloudManager);
// Must skip the wait time otherwise test takes a few seconds to run (and s1 is not returned now anyway so no point waiting).
PolicyHelper.SessionWrapper s2 = PolicyHelper.getSession(solrCloudManager, false);
// Got two sessions, they are different
assertNotSame(s1, s2);
// Done COMPUTING with first session, it can be reused
s1.returnSession(s1.get());
PolicyHelper.SessionWrapper s3 = PolicyHelper.getSession(solrCloudManager);
// First session indeed reused when a new session is requested
assertSame(s3, s1);
// Done COMPUTING with second session, it can be reused
s2.returnSession(s2.get());
PolicyHelper.SessionWrapper s4 = PolicyHelper.getSession(solrCloudManager);
// Second session indeed reused when a new session is requested
assertSame(s4, s2);
// s4 and s2 are the same wrapper (asserted above), so it is released once per acquisition.
s4.returnSession(s4.get());
s4.release();
s2.release();
// s3 is the same wrapper as s1 (asserted above); this is the first of its two releases.
s3.returnSession(s3.get());
s3.release();
PolicyHelper.SessionRef sessionRef = (PolicyHelper.SessionRef) solrCloudManager.getObjectCache().get(PolicyHelper.SessionRef.class.getName());
// First session not yet released so is still in the cache
assertFalse(sessionRef.isEmpty());
s1.release();
assertTrue(sessionRef.isEmpty());
}
/**
 * Verify number of sessions allocated when parallel session requests arrive is reasonable.
 * Test takes about 3 seconds to run.
 * <p>
 * Exceptions raised inside the worker threads are collected and reported in the failure message
 * instead of being dropped, so a failing run shows the cause rather than only a thread-count mismatch.
 */
@Test
@Slow
public void testMultiThreadedSessionsCache() throws IOException, InterruptedException {
  @SuppressWarnings({"rawtypes", "unchecked"})
  Map<String, Map> nodeValues = (Map<String, Map>) Utils.fromJSONString(" {" +
      " 'node1:80_':{ 'node':'10.0.0.4:8987_solr', 'cores':1 }," +
      " 'node2:80_':{ 'node':'10.0.0.4:8989_solr', 'cores':1 }," +
      " 'node3:80_':{ 'node':'10.0.0.4:7574_solr', 'cores':1 }" +
      "}");
  @SuppressWarnings({"rawtypes"})
  Map policies = (Map) Utils.fromJSONString("{ 'cluster-preferences': [{ 'minimize': 'cores', 'precision': 1}]}");
  @SuppressWarnings({"unchecked"})
  AutoScalingConfig config = new AutoScalingConfig(policies);
  final SolrCloudManager solrCloudManager = new DelegatingCloudManager(getSolrCloudManager(nodeValues, clusterState)) {
    @Override
    public DistribStateManager getDistribStateManager() {
      return delegatingDistribStateManager(config);
    }
  };
  final Set<PolicyHelper.SessionWrapper> seenSessions = Sets.newHashSet();
  final AtomicInteger completedThreads = new AtomicInteger(0);
  // Failures from worker threads; synchronized because many threads may append concurrently.
  final List<Exception> workerFailures = Collections.synchronizedList(new ArrayList<>());
  final int COUNT_THREADS = 100;
  Thread[] threads = new Thread[COUNT_THREADS];
  for (int i = 0; i < COUNT_THREADS; i++) {
    threads[i] = new Thread(() -> {
      try {
        // This thread requests a session, computes using it for 25ms then returns it, executes for 1000ms more,
        // releases the session and finishes.
        PolicyHelper.SessionWrapper session = PolicyHelper.getSession(solrCloudManager);
        synchronized (seenSessions) {
          seenSessions.add(session);
        }
        Thread.sleep(25);
        session.returnSession(session.get());
        Thread.sleep(1000);
        session.release();
        completedThreads.incrementAndGet();
      } catch (InterruptedException e) {
        // Keep the interrupt visible to whoever owns this thread and remember the failure.
        Thread.currentThread().interrupt();
        workerFailures.add(e);
      } catch (IOException e) {
        workerFailures.add(e);
      }
    });
    threads[i].start();
  }
  for (int i = 0; i < COUNT_THREADS; i++) {
    threads[i].join(12000);
  }
  assertTrue("Worker threads failed: " + workerFailures, workerFailures.isEmpty());
  assertEquals(COUNT_THREADS, completedThreads.get());
  // The value asserted below is somewhat arbitrary. Running locally usually uses up to 5 sessions, so hopefully 30 is
  // safe. Idea is to verify we do not allocate a high number of sessions even if many concurrent session
  // requests arrive at the same time. The session computing time is short on purpose. If it were long, it would be
  // expected for more sessions to be needed.
  // Note we joined with all the threads having updated seenSessions so no need to synchronize ("All actions in a thread
  // happen before any other thread successfully returns from a join() on that thread" - JSR-133)
  assertTrue("Too many (>=30) sessions created: " + seenSessions.size(), seenSessions.size() < 30);
  PolicyHelper.SessionRef sessionRef = (PolicyHelper.SessionRef) solrCloudManager.getObjectCache().get(PolicyHelper.SessionRef.class.getName());
  assertTrue(sessionRef.isEmpty());
}
/**
 * Builds a {@link DistribStateManager} stub (wrapping a {@code null} delegate) whose only
 * overridden operation is {@code getAutoScalingConfig()}, which answers with the given config.
 *
 * @param config the autoscaling configuration the stub should report
 * @return the stub state manager
 */
private DistribStateManager delegatingDistribStateManager(AutoScalingConfig config) {
  final DistribStateManager configOnlyManager = new DelegatingDistribStateManager(null) {
    @Override
    public AutoScalingConfig getAutoScalingConfig() {
      return config;
    }
  };
  return configOnlyManager;
}
/**
 * Uses a cluster policy containing a negated tag value ({@code 'sysprop.fs': '!ssd'} with
 * {@code replica: 0}) and asks three times in a row for an ADDREPLICA suggestion for
 * {@code newColl/shard1}; every suggestion is expected to target {@code node3:80_}.
 */
public void testNegativeConditions() {
  String policyJson = "{" +
      " 'cluster-policy':[" +
      " {'replica':'<4','shard':'#EACH','node':'#ANY'}," +
      " { 'replica': 0, 'sysprop.fs': '!ssd', 'shard': '#EACH'}," +//negative greedy condition
      " {'nodeRole':'overseer','replica':'0'}]," +
      " 'cluster-preferences':[" +
      " {'minimize':'cores', 'precision':3}," +
      " {'maximize':'freedisk','precision':100}]}";
  @SuppressWarnings({"unchecked", "rawtypes"})
  Map<String, Map> nodeTags = (Map<String, Map>) Utils.fromJSONString("{" +
      "\"node1:80_\":{cores:12, freedisk: 334, heapUsage:10480, rack: rack4, sysprop.fs: slowdisk}," +
      "\"node2:80_\":{cores:4, freedisk: 749, heapUsage:6873, rack: rack3, sysprop.fs: slowdisk}," +
      "\"node3:80_\":{cores:7, freedisk: 262, heapUsage:7834, rack: rack2, sysprop.fs : ssd}," +
      "\"node4:80_\":{cores:8, freedisk: 375, heapUsage:16900, nodeRole:overseer, rack: rack1, sysprop.fs: slowdisk}" +
      "}");
  @SuppressWarnings({"unchecked"})
  Policy policy = new Policy((Map<String, Object>) Utils.fromJSONString(policyJson));
  Policy.Session session = policy.createSession(getSolrCloudManager(nodeTags, clusterState));

  int round = 0;
  while (round < 3) {
    Suggester suggester = session.getSuggester(ADDREPLICA);
    @SuppressWarnings({"rawtypes"})
    SolrRequest suggestion = suggester
        .hint(Hint.COLL_SHARD, new Pair<>("newColl", "shard1"))
        .getSuggestion();
    assertNotNull(suggestion);
    assertEquals("node3:80_", suggestion.getParams().get("node"));
    // Continue from the session the suggester ended up with.
    session = suggester.getSession();
    round++;
  }
}
/**
 * Requests three consecutive ADDREPLICA suggestions for {@code newColl/shard1} under a policy
 * with a clause asking for two replicas on nodes tagged {@code sysprop.fs = ssd}. The first two
 * suggestions are expected on {@code node3:80_} (the only node carrying that tag in the fixture)
 * and the third on {@code node2:80_}. When {@code useNodeset} is set, the same rules are written
 * with the {@code nodeset} syntax.
 */
public void testGreedyConditions() {
  final String autoscaleJson;
  if (useNodeset) {
    autoscaleJson = "{" +
        " 'cluster-policy':[" +
        " {'cores':'<10','node':'#ANY'}," +
        " {'replica':'<3','shard':'#EACH','node':'#ANY'}," +
        " { 'replica': 2, nodeset: {'sysprop.fs': 'ssd'}, 'shard': '#EACH'}," +//greedy condition
        " {nodeset:{'nodeRole':'overseer'},'replica':'0'}]," +
        " 'cluster-preferences':[" +
        " {'minimize':'cores', 'precision':3}," +
        " {'maximize':'freedisk','precision':100}]}";
  } else {
    autoscaleJson = "{" +
        " 'cluster-policy':[" +
        " {'cores':'<10','node':'#ANY'}," +
        " {'replica':'<3','shard':'#EACH','node':'#ANY'}," +
        " { 'replica': 2, 'sysprop.fs': 'ssd', 'shard': '#EACH'}," +//greedy condition
        " {'nodeRole':'overseer','replica':'0'}]," +
        " 'cluster-preferences':[" +
        " {'minimize':'cores', 'precision':3}," +
        " {'maximize':'freedisk','precision':100}]}";
  }
  @SuppressWarnings({"unchecked", "rawtypes"})
  Map<String, Map> nodeTags = (Map<String, Map>) Utils.fromJSONString("{" +
      "\"node1:80_\":{cores:12, freedisk: 334, heapUsage:10480, rack: rack4}," +
      "\"node2:80_\":{cores:4, freedisk: 749, heapUsage:6873, rack: rack3}," +
      "\"node3:80_\":{cores:7, freedisk: 262, heapUsage:7834, rack: rack2, sysprop.fs : ssd}," +
      "\"node4:80_\":{cores:8, freedisk: 375, heapUsage:16900, nodeRole:overseer, rack: rack1}" +
      "}");
  @SuppressWarnings({"unchecked"})
  Policy policy = new Policy((Map<String, Object>) Utils.fromJSONString(autoscaleJson));
  Policy.Session session = policy.createSession(getSolrCloudManager(nodeTags, clusterState));

  // First replica.
  Suggester firstSuggester = session.getSuggester(ADDREPLICA);
  @SuppressWarnings({"rawtypes"})
  SolrRequest firstOp = firstSuggester
      .hint(Hint.COLL_SHARD, new Pair<>("newColl", "shard1"))
      .getSuggestion();
  assertNotNull(firstOp);
  assertEquals("node3:80_", firstOp.getParams().get("node"));

  // Second replica, computed on top of the session left by the first suggestion.
  Suggester secondSuggester = firstSuggester
      .getSession()
      .getSuggester(ADDREPLICA)
      .hint(Hint.COLL_SHARD, new Pair<>("newColl", "shard1"));
  @SuppressWarnings({"rawtypes"})
  SolrRequest secondOp = secondSuggester.getSuggestion();
  assertNotNull(secondOp);
  assertEquals("node3:80_", secondOp.getParams().get("node"));

  // Third replica, computed on top of the session left by the second suggestion.
  Suggester thirdSuggester = secondSuggester
      .getSession()
      .getSuggester(ADDREPLICA)
      .hint(Hint.COLL_SHARD, new Pair<>("newColl", "shard1"));
  @SuppressWarnings({"rawtypes"})
  SolrRequest thirdOp = thirdSuggester.getSuggestion();
  assertNotNull(thirdOp);
  assertEquals("node2:80_", thirdOp.getParams().get("node"));
}
/**
 * Builds a two-node fixture in which both replicas of {@code compute_plan_action_test/shard1}
 * live on {@code 127.0.0.1:60089_solr}, then asks for a MOVEREPLICA suggestion targeting the
 * other node and expects a non-null operation. When {@code useNodeset} is set, the overseer
 * rule is written with the {@code nodeset} syntax.
 */
@SuppressWarnings({"unchecked", "rawtypes"})
public void testMoveReplica() {
  final String autoscaleJson;
  if (useNodeset) {
    autoscaleJson = "{" +
        " 'cluster-policy':[" +
        " {'cores':'<10','node':'#ANY'}," +
        " {'replica':'<3','shard':'#EACH','node':'#ANY'}," +
        " {nodeset: {'nodeRole':'overseer'},'replica':'0'}]," +
        " 'cluster-preferences':[" +
        " {'minimize':'cores', 'precision':3}," +
        " {'maximize':'freedisk','precision':100}]}";
  } else {
    autoscaleJson = "{" +
        " 'cluster-policy':[" +
        " {'cores':'<10','node':'#ANY'}," +
        " {'replica':'<3','shard':'#EACH','node':'#ANY'}," +
        " {'nodeRole':'overseer','replica':'0'}]," +
        " 'cluster-preferences':[" +
        " {'minimize':'cores', 'precision':3}," +
        " {'maximize':'freedisk','precision':100}]}";
  }

  // node -> collection -> shard -> replicas; the parsed placeholders for shard1 are replaced below
  // with real ReplicaInfo objects.
  Map replicasByNode = (Map) Utils.fromJSONString("{ '127.0.0.1:60099_solr':{}," +
      " '127.0.0.1:60089_solr':{'compute_plan_action_test':{'shard1':[" +
      " {'core_node1':{}}," +
      " {'core_node2':{}}]}}}");
  Map shardsOfCollection = (Map) Utils.getObjectByPath(replicasByNode, false, "127.0.0.1:60089_solr/compute_plan_action_test");
  List<ReplicaInfo> shard1Replicas = Arrays.asList(
      new ReplicaInfo("core_node1", "core_node1", "compute_plan_action_test", "shard1", Replica.Type.NRT, "127.0.0.1:60089_solr", Collections.emptyMap()),
      new ReplicaInfo("core_node2", "core_node2", "compute_plan_action_test", "shard1", Replica.Type.NRT, "127.0.0.1:60089_solr", Collections.emptyMap()));
  shardsOfCollection.put("shard1", shard1Replicas);

  @SuppressWarnings({"unchecked", "rawtypes"})
  Map<String, Map<String, Object>> tagsByNode = (Map) Utils.fromJSONString("{" +
      " '127.0.0.1:60099_solr':{" +
      " 'cores':0," +
      " 'freedisk':918005641216}," +
      " '127.0.0.1:60089_solr':{" +
      " 'cores':2," +
      " 'freedisk':918005641216}}");
  @SuppressWarnings({"unchecked"})
  Policy policy = new Policy((Map<String, Object>) Utils.fromJSONString(autoscaleJson));

  // Cloud manager answering live nodes, node tags and replica info straight from the maps above.
  SolrCloudManager fixtureManager = new DelegatingCloudManager(null) {
    @Override
    @SuppressWarnings({"unchecked"})
    public ClusterStateProvider getClusterStateProvider() {
      return new DelegatingClusterStateProvider(null) {
        @Override
        public Set<String> getLiveNodes() {
          return replicasByNode.keySet();
        }
      };
    }

    @Override
    public NodeStateProvider getNodeStateProvider() {
      return new DelegatingNodeStateProvider(null) {
        @Override
        public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
          return tagsByNode.get(node);
        }

        @Override
        @SuppressWarnings({"unchecked"})
        public Map<String, Map<String, List<ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
          return (Map<String, Map<String, List<ReplicaInfo>>>) replicasByNode.get(node);
        }
      };
    }
  };
  Policy.Session session = policy.createSession(fixtureManager);

  Suggester suggester = session.getSuggester(MOVEREPLICA)
      .hint(Hint.TARGET_NODE, "127.0.0.1:60099_solr");
  @SuppressWarnings({"rawtypes"})
  SolrRequest moveOp = suggester.getSuggestion();
  assertNotNull("expect a non null operation", moveOp);
}
/**
 * Applies the named policy {@code p1} (every collection is mapped to it by the stubbed cluster
 * state provider) and expects the ADDREPLICA suggestion for {@code newColl/s1} to land on
 * {@code node2:80_}. When {@code useNodeset} is set, the rules are written with the
 * {@code nodeset} syntax.
 */
public void testOtherTag() {
  final String rules;
  if (useNodeset) {
    rules = "{" +
        "'cluster-preferences':[" +
        "{'minimize':'cores','precision':2}," +
        "{'maximize':'freedisk','precision':50}," +
        "{'minimize':'heapUsage','precision':1000}" +
        "]," +
        "'cluster-policy':[" +
        "{replica:0, nodeset:{'nodeRole':'overseer'},'strict':false}," +
        "{'replica':'<1','node':'node3:80_'}," +
        "{'replica':'<2','node':'#ANY','shard':'#EACH'}" +
        "]," +
        "'policies':{" +
        "'p1':[" +
        "{replica:0, nodeset:{'nodeRole':'overseer'},'strict':false}," +
        "{'replica':'<1','node':'node3:80_'}," +
        "{'replica':'<2','node':'#ANY','shard':'#EACH'}," +
        "{'replica':'<3','shard':'#EACH', nodeset : { 'sysprop.rack':[rack1, rack2, rack3, rack4]}}" +
        "]" +
        "}" +
        "}";
  } else {
    rules = "{" +
        "'cluster-preferences':[" +
        "{'minimize':'cores','precision':2}," +
        "{'maximize':'freedisk','precision':50}," +
        "{'minimize':'heapUsage','precision':1000}" +
        "]," +
        "'cluster-policy':[" +
        "{replica:0, 'nodeRole':'overseer','strict':false}," +
        "{'replica':'<1','node':'node3:80_'}," +
        "{'replica':'<2','node':'#ANY','shard':'#EACH'}" +
        "]," +
        "'policies':{" +
        "'p1':[" +
        "{replica:0, 'nodeRole':'overseer','strict':false}," +
        "{'replica':'<1','node':'node3:80_'}," +
        "{'replica':'<2','node':'#ANY','shard':'#EACH'}," +
        "{'replica':'<3','shard':'#EACH','sysprop.rack':'#EACH'}" +
        "]" +
        "}" +
        "}";
  }
  @SuppressWarnings({"unchecked", "rawtypes"})
  Map<String, Map> nodeTags = (Map<String, Map>) Utils.fromJSONString("{" +
      "\"node1:80_\":{cores:12, freedisk: 334, heapUsage:10480, rack: rack4}," +
      "\"node2:80_\":{cores:4, freedisk: 749, heapUsage:6873, rack: rack3}," +
      "\"node3:80_\":{cores:7, freedisk: 262, heapUsage:7834, rack: rack2}," +
      "\"node4:80_\":{cores:8, freedisk: 375, heapUsage:16900, nodeRole:overseer, sysprop.rack: rack1}" +
      "}");
  @SuppressWarnings({"unchecked"})
  Policy policy = new Policy((Map<String, Object>) Utils.fromJSONString(rules));
  SolrCloudManager baseManager = getSolrCloudManager(nodeTags, clusterState);

  // Forwards node and live-node lookups to baseManager, but reports "p1" as the policy of any collection.
  SolrCloudManager policyAwareManager = new DelegatingCloudManager(null) {
    @Override
    public ClusterStateProvider getClusterStateProvider() {
      return new DelegatingClusterStateProvider(null) {
        @Override
        public String getPolicyNameByCollection(String coll) {
          return "p1";
        }

        @Override
        public Set<String> getLiveNodes() {
          return baseManager.getClusterStateProvider().getLiveNodes();
        }
      };
    }

    @Override
    public NodeStateProvider getNodeStateProvider() {
      return new DelegatingNodeStateProvider(null) {
        @Override
        public Map<String, Map<String, List<ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
          return baseManager.getNodeStateProvider().getReplicaInfo(node, keys);
        }

        @Override
        public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
          return baseManager.getNodeStateProvider().getNodeValues(node, tags);
        }
      };
    }
  };

  Policy.Session session = policy.createSession(policyAwareManager);
  Suggester suggester = session
      .getSuggester(ADDREPLICA)
      .hint(Hint.COLL_SHARD, new Pair<>("newColl", "s1"));
  CollectionAdminRequest.AddReplica addReplica = (CollectionAdminRequest.AddReplica) suggester.getSuggestion();
  assertNotNull(addReplica);
  assertEquals("node2:80_", addReplica.getNode());
}
/**
 * Convenience overload: parses {@code clusterS} as JSON and hands the resulting map, together
 * with {@code nodeValues}, to the map-based {@code getSolrCloudManager} overload.
 *
 * @param nodeValues per-node tag values, keyed by node name
 * @param clusterS   cluster state as a JSON string
 * @return the cloud manager built by the map-based overload
 */
@SuppressWarnings({"rawtypes"})
static SolrCloudManager getSolrCloudManager(final Map<String, Map> nodeValues, String clusterS) {
  Map parsedClusterState = (Map) Utils.fromJSONString(clusterS);
  return getSolrCloudManager(nodeValues, parsedClusterState);
}
/**
 * Creates an in-memory {@link SolrCloudManager} for tests.
 * <ul>
 *   <li>live nodes are the keys of {@code nodeValues};</li>
 *   <li>node values are looked up, tag by tag, in {@code nodeValues};</li>
 *   <li>replica info comes from {@code getReplicaDetails(node, clusterS)};</li>
 *   <li>the state manager, queue factory and {@code request} answer {@code null}, and
 *       {@code httpRequest} answers an empty byte array.</li>
 * </ul>
 *
 * @param nodeValues per-node tag values, keyed by node name
 * @param clusterS   parsed cluster state passed through to {@code getReplicaDetails}
 * @return the stub cloud manager
 */
private static SolrCloudManager getSolrCloudManager(@SuppressWarnings({"rawtypes"})final Map<String, Map> nodeValues,
                                                    @SuppressWarnings({"rawtypes"})Map clusterS) {
  return new SolrCloudManager() {
    ObjectCache objectCache = new ObjectCache();

    @Override
    public NodeStateProvider getNodeStateProvider() {
      return new DelegatingNodeStateProvider(null) {
        @Override
        public Map<String, Map<String, List<ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
          return getReplicaDetails(node, clusterS);
        }

        @Override
        public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
          // One entry per requested tag, in request order; tags the node lacks map to null.
          Map<String, Object> valuesByTag = new LinkedHashMap<>();
          for (String tag : tags) {
            valuesByTag.put(tag, nodeValues.get(node).get(tag));
          }
          return valuesByTag;
        }
      };
    }

    @Override
    public ClusterStateProvider getClusterStateProvider() {
      return new DelegatingClusterStateProvider(null) {
        @Override
        public Set<String> getLiveNodes() {
          return nodeValues.keySet();
        }
      };
    }

    @Override
    public ObjectCache getObjectCache() {
      return objectCache;
    }

    @Override
    public TimeSource getTimeSource() {
      return TimeSource.NANO_TIME;
    }

    @Override
    public DistribStateManager getDistribStateManager() {
      return null;
    }

    @Override
    public DistributedQueueFactory getDistributedQueueFactory() {
      return null;
    }

    @Override
    public SolrResponse request(@SuppressWarnings({"rawtypes"})SolrRequest req) {
      return null;
    }

    @Override
    public byte[] httpRequest(String url, SolrRequest.METHOD method, Map<String, String> headers, String payload, int timeout, boolean followRedirects) {
      return new byte[0];
    }

    @Override
    public void close() {
    }
  };
}
/**
 * Computes replica locations for a new two-shard collection bound to policy {@code c1}
 * (one replica per shard on {@code port: 50096}) and expects every returned position to be on
 * {@code 127.0.0.1:50096_solr}.
 */
public void testEmptyClusterState() {
  String autoScaleJson = " {'policies':{'c1':[{" +
      " 'replica':1," +
      " 'shard':'#EACH'," +
      " 'port':'50096'}]}}";
  @SuppressWarnings({"unchecked", "rawtypes"})
  Map<String, Map> nodeValues = (Map<String, Map>) Utils.fromJSONString("{" +
      " '127.0.0.1:50097_solr':{" +
      " 'cores':0," +
      " 'port':'50097'}," +
      " '127.0.0.1:50096_solr':{" +
      " 'cores':0," +
      " 'port':'50096'}}");

  SolrCloudManager cloudManager = new DelegatingCloudManager(null) {
    @Override
    public NodeStateProvider getNodeStateProvider() {
      return new DelegatingNodeStateProvider(null) {
        @Override
        public Map<String, Map<String, List<ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
          return getReplicaDetails(node, (Map)Utils.fromJSONString(clusterState));
        }

        @Override
        public Map<String, Object> getNodeValues(String node, Collection<String> keys) {
          Map<String, Object> valuesByKey = new LinkedHashMap<>();
          for (String key : keys) {
            valuesByKey.put(key, nodeValues.get(node).get(key));
          }
          return valuesByKey;
        }
      };
    }

    @Override
    public ClusterStateProvider getClusterStateProvider() {
      return new DelegatingClusterStateProvider(null) {
        @Override
        public Set<String> getLiveNodes() {
          Set<String> liveNodes = new HashSet<>(Arrays.asList("127.0.0.1:50097_solr", "127.0.0.1:50096_solr"));
          return liveNodes;
        }
      };
    }
  };

  @SuppressWarnings({"unchecked"})
  AutoScalingConfig autoScalingConfig = new AutoScalingConfig((Map<String, Object>) Utils.fromJSONString(autoScaleJson));
  List<ReplicaPosition> positions = PolicyHelper.getReplicaLocations(
      "newColl", autoScalingConfig,
      cloudManager, Collections.singletonMap("newColl", "c1"), Arrays.asList("shard1", "shard2"), 1, 0, 0, null);
  assertTrue(positions.stream().allMatch(position -> position.node.equals("127.0.0.1:50096_solr")));
}
/**
 * Computes locations for a two-shard collection with three replicas per shard under
 * {@code policy1} and expects every returned position to be on one of {@code node1:80_},
 * {@code node2:80_} or {@code node3:80_} (i.e. never on {@code node4:80_}, the node tagged
 * {@code nodeRole: overseer} in the fixture). When {@code useNodeset} is set, the rules are
 * written with the {@code nodeset} syntax.
 */
public void testMultiReplicaPlacement() {
  final String autoScaleJson;
  if (useNodeset) {
    autoScaleJson = "{" +
        " cluster-preferences: [" +
        " { maximize : freedisk , precision: 50}," +
        " { minimize : cores, precision: 2}" +
        " ]," +
        " cluster-policy: [" +
        " { replica : '0' , nodeset: {nodeRole: overseer}}," +
        " { replica: '<2', shard: '#ANY', node: '#ANY'" +
        " }" +
        " ]," +
        " policies: {" +
        " policy1: [" +
        " { replica: '<2', shard: '#EACH', node: '#ANY'}," +
        " { replica: '<2', shard: '#EACH', nodeset:{ sysprop.rack: rack1}}" +
        " ]" +
        " }" +
        "}";
  } else {
    autoScaleJson = "{" +
        " cluster-preferences: [" +
        " { maximize : freedisk , precision: 50}," +
        " { minimize : cores, precision: 2}" +
        " ]," +
        " cluster-policy: [" +
        " { replica : '0' , nodeRole: overseer}," +
        " { replica: '<2', shard: '#ANY', node: '#ANY'" +
        " }" +
        " ]," +
        " policies: {" +
        " policy1: [" +
        " { replica: '<2', shard: '#EACH', node: '#ANY'}," +
        " { replica: '<2', shard: '#EACH', sysprop.rack: rack1}" +
        " ]" +
        " }" +
        "}";
  }
  @SuppressWarnings({"unchecked", "rawtypes"})
  Map<String, Map> nodeValues = (Map<String, Map>) Utils.fromJSONString("{" +
      "\"node1:80_\":{cores:12, freedisk: 334, heap:10480, sysprop.rack:rack3}," +
      "\"node2:80_\":{cores:4, freedisk: 749, heap:6873, sysprop.fs : ssd, sysprop.rack:rack1}," +
      "\"node3:80_\":{cores:7, freedisk: 262, heap:7834, sysprop.rack:rack4}," +
      "\"node4:80_\":{cores:0, freedisk: 900, heap:16900, nodeRole:overseer, sysprop.rack:rack2}" +
      "}");

  SolrCloudManager cloudManager = new DelegatingCloudManager(null) {
    @Override
    public ClusterStateProvider getClusterStateProvider() {
      return new DelegatingClusterStateProvider(null) {
        @Override
        public Set<String> getLiveNodes() {
          Set<String> liveNodes = new HashSet<>(Arrays.asList("node1:80_", "node2:80_", "node3:80_", "node4:80_"));
          return liveNodes;
        }
      };
    }

    @Override
    public NodeStateProvider getNodeStateProvider() {
      return new DelegatingNodeStateProvider(null) {
        @Override
        public Map<String, Map<String, List<ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
          return getReplicaDetails(node, (Map)Utils.fromJSONString(clusterState));
        }

        @Override
        public Map<String, Object> getNodeValues(String node, Collection<String> keys) {
          Map<String, Object> valuesByKey = new LinkedHashMap<>();
          for (String key : keys) {
            valuesByKey.put(key, nodeValues.get(node).get(key));
          }
          return valuesByKey;
        }
      };
    }
  };

  @SuppressWarnings({"unchecked"})
  AutoScalingConfig autoScalingConfig = new AutoScalingConfig((Map<String, Object>) Utils.fromJSONString(autoScaleJson));
  List<ReplicaPosition> positions = PolicyHelper.getReplicaLocations(
      "newColl", autoScalingConfig,
      cloudManager, Collections.singletonMap("newColl", "policy1"), Arrays.asList("shard1", "shard2"), 3, 0, 0, null);

  // Nodes on which a replica may legitimately be placed.
  List<String> acceptableNodes = ImmutableList.of("node2:80_", "node1:80_", "node3:80_");
  assertTrue(positions.stream().allMatch(position -> acceptableNodes.contains(position.node)));
}
/**
 * Runs three chained MOVEREPLICA suggestions against the {@code testMoveReplicaSuggester.json}
 * fixture: the first one (target {@code 10.0.0.6:7574_solr}) must yield an operation, a second
 * one with the same target must yield none, and a forced one targeting
 * {@code 10.0.0.6:8983_solr} must yield none either.
 */
public void testMoveReplicaSuggester() {
  String autoScalingjson = " '{cluster-policy':[" +
      "{'cores':'<10', 'node':'#ANY'}," +
      "{'replica':'<2', 'shard':'#EACH','node':'#ANY'}]," +
      "'cluster-preferences':[{'minimize':'cores'}]}";
  @SuppressWarnings({"unchecked"})
  Policy policy = new Policy((Map<String, Object>) Utils.fromJSONString(autoScalingjson));
  Policy.Session session = policy.createSession(cloudManagerWithData((Map) loadFromResource("testMoveReplicaSuggester.json")));

  // Step 1: a move towards 7574 is available.
  Suggester firstSuggester = session.getSuggester(MOVEREPLICA)
      .hint(Hint.TARGET_NODE, "10.0.0.6:7574_solr");
  assertNotNull(firstSuggester.getSuggestion());

  // Step 2: asking again on the resulting session gives nothing.
  Suggester secondSuggester = firstSuggester.getSession()
      .getSuggester(MOVEREPLICA)
      .hint(Hint.TARGET_NODE, "10.0.0.6:7574_solr");
  assertNull(secondSuggester.getSuggestion());

  // Step 3: even a forced operation towards 8983 gives nothing.
  Suggester forcedSuggester = secondSuggester.getSession()
      .getSuggester(MOVEREPLICA)
      .forceOperation(true)
      .hint(Hint.TARGET_NODE, "10.0.0.6:8983_solr");
  assertNull(forcedSuggester.getSuggestion());
}
/**
 * Loads the {@code testComputePlanAfterNodeAdded.json} fixture and expects a MOVEREPLICA
 * suggestion targeting {@code 127.0.0.1:51147_solr} to be produced. When {@code useNodeset}
 * is set, the overseer rule is written with the {@code nodeset} syntax.
 */
public void testComputePlanAfterNodeAdded() {
  final String autoScalingjson;
  if (useNodeset) {
    autoScalingjson = "cluster-preferences:[" +
        " {minimize : cores}," +
        " {'maximize':freedisk , precision:100}], " +
        " cluster-policy:[{cores:'<10',node:'#ANY'}," +
        " {replica:'<2', shard:'#EACH',node:'#ANY'}," +
        " {nodeset:{ nodeRole:overseer},replica:0}]}";
  } else {
    autoScalingjson = "cluster-preferences:[" +
        " {minimize : cores}," +
        " {'maximize':freedisk , precision:100}], " +
        " cluster-policy:[{cores:'<10',node:'#ANY'}," +
        " {replica:'<2', shard:'#EACH',node:'#ANY'}," +
        " { nodeRole:overseer,replica:0}]}";
  }
  @SuppressWarnings({"unchecked"})
  Policy policy = new Policy((Map<String, Object>) Utils.fromJSONString(autoScalingjson));
  SolrCloudManager fixtureManager = cloudManagerWithData((Map) loadFromResource("testComputePlanAfterNodeAdded.json"));
  Policy.Session session = policy.createSession(fixtureManager);

  Suggester moveSuggester = session.getSuggester(CollectionParams.CollectionAction.MOVEREPLICA)
      .hint(Hint.TARGET_NODE, "127.0.0.1:51147_solr");
  @SuppressWarnings({"rawtypes"})
  SolrRequest moveOp = moveSuggester.getSuggestion();
  log.info("{}", moveOp);
  assertNotNull("operation expected ", moveOp);
}
/**
 * Asks {@link PolicyHelper#getSuggestions} for suggestions on the
 * {@code testReplicaCountSuggestions.json} fixture and inspects the first one: it must be a
 * {@code move-replica} POST to {@code /c/mycoll1} that moves {@code core_node1} to
 * {@code 10.0.0.6:7574_solr}, with a violation delta of 1.0. When {@code useNodeset} is set,
 * the overseer rule is written with the {@code nodeset} syntax.
 */
public void testReplicaCountSuggestions() {
  final String autoScalingjson;
  if (useNodeset) {
    autoScalingjson = " { cluster-policy:[" +
        " { cores :'<10', node :'#ANY'}," +
        " { replica :'<2', node:'#ANY'}," +
        " { nodeset:{nodeRole : overseer}, replica :0}]," +
        " cluster-preferences :[{ minimize : cores }]}";
  } else {
    autoScalingjson = " { cluster-policy:[" +
        " { cores :'<10', node :'#ANY'}," +
        " { replica :'<2', node:'#ANY'}," +
        " { nodeRole : overseer, replica :0}]," +
        " cluster-preferences :[{ minimize : cores }]}";
  }
  @SuppressWarnings({"unchecked"})
  AutoScalingConfig autoScalingConfig = new AutoScalingConfig((Map<String, Object>) Utils.fromJSONString(autoScalingjson));
  List<Suggester.SuggestionInfo> suggestions = PolicyHelper.getSuggestions(autoScalingConfig,
      cloudManagerWithData((Map) loadFromResource("testReplicaCountSuggestions.json")));
  assertFalse(suggestions.isEmpty());

  Suggester.SuggestionInfo first = suggestions.get(0);
  assertEquals(1.0d, first._get( "violation/violation/delta",null));
  assertEquals("POST", first._get("operation/method",null));
  assertEquals("/c/mycoll1", first._get( "operation/path",null));
  assertNotNull(first._get("operation/command/move-replica", null));
  assertEquals("10.0.0.6:7574_solr", first._get( "operation/command/move-replica/targetNode",null));
  /*
   * one of the two cores on 10.0.0.6:8983_solr should move to 10.0.0.6:7574_solr and
   * (everything else being equal) core_node1 is chosen ahead of core_node2 based on its name
   */
  assertEquals("core_node1", first._get("operation/command/move-replica/replica", null));
}
/**
 * Checks violation computation for percentage-based replica rules against the cluster
 * snapshots stored in {@code testReplicaPercentage.json} (read here as a list of maps, of
 * which elements 0, 1 and 2 are used).
 */
@SuppressWarnings({"unchecked", "rawtypes"})
public void testReplicaPercentage() {
List<Map> l = (List<Map>) loadFromResource("testReplicaPercentage.json");
String autoScalingjson = " { cluster-policy:[" +
" { replica :'51%', shard:'#EACH', node:'#ANY'}]," +
" cluster-preferences :[{ minimize : cores }]}";
AutoScalingConfig autoScalingConfig = new AutoScalingConfig((Map<String, Object>) Utils.fromJSONString(autoScalingjson));
// Snapshot 0: two violations expected, with opposite replica-count deltas on the two nodes.
Policy.Session session = autoScalingConfig.getPolicy().createSession(cloudManagerWithData(l.get(0)));
List<Violation> violations = session.getViolations();
assertEquals(2, violations.size());
for (Violation violation : violations) {
if (violation.node.equals("10.0.0.6:8983_solr")) {
assertEquals(1.0d, violation.replicaCountDelta, 0.01);
assertEquals(1.53d, ((RangeVal) violation.getClause().getReplica().val).actual);
} else if (violation.node.equals("10.0.0.6:7574_solr")) {
assertEquals(-1.0d, violation.replicaCountDelta, 0.01);
}
}
// Snapshot 1, passed as a deep copy: no violations expected.
session = autoScalingConfig.getPolicy().createSession(cloudManagerWithData(Utils.getDeepCopy(l.get(1), 6)));
violations = session.getViolations();
assertEquals(0, violations.size());
// Same rule as above (only the whitespace inside the JSON differs), re-parsed into a fresh
// config and evaluated against snapshot 1 itself rather than a copy.
autoScalingjson = " { cluster-policy:[" +
" { replica :'51%', shard: '#EACH' , node:'#ANY'}]," +
" cluster-preferences :[{ minimize : cores }]}";
autoScalingConfig = new AutoScalingConfig((Map<String, Object>) Utils.fromJSONString(autoScalingjson));
session = autoScalingConfig.getPolicy().createSession(cloudManagerWithData(l.get(1)));
violations = session.getViolations();
assertEquals(0, violations.size());
// Per-replica-type rules (50% for TLOG and 50% for PULL) against snapshot 2: two violations expected.
autoScalingjson = " { cluster-policy:[" +
" { replica :'50%',node:'#ANY' , type: TLOG } ,{ replica :'50%',node:'#ANY' , type: PULL } ]," +
" cluster-preferences :[{ minimize : cores }]}";
autoScalingConfig = new AutoScalingConfig((Map<String, Object>) Utils.fromJSONString(autoScalingjson));
session = autoScalingConfig.getPolicy().createSession(cloudManagerWithData(l.get(2)));
violations = session.getViolations();
assertEquals(2, violations.size());
}
/**
 * Adds twelve replicas of {@code percentColl/shard1} one at a time under a policy that asks for
 * 33% of the replicas in {@code sysprop.az = east} and 67% in {@code sysprop.az = west}.
 * <p>
 * Every placement with {@code i % 3 == 1} is expected on {@code 10.0.0.6:8983_solr} and every
 * other one on {@code 10.0.0.6:7574_solr}, giving 4 and 8 replicas respectively. Closing the
 * transaction must report no violations, and the final session matrix must list the 7574 node
 * first with 8 replicas and the 8983 node second with 4.
 * <p>
 * The sequence of placements made so far is included in assertion messages to make a failing
 * run easier to diagnose.
 */
@SuppressWarnings({"unchecked"})
public void testReplicaZonesPercentage() {
  String autoScalingjson = " { cluster-policy:[" +
      " { replica :'33%', shard: '#EACH', sysprop.az : east}," +
      " { replica :'67%', shard: '#EACH', sysprop.az : west}" +
      " ]," +
      " cluster-preferences :[{ minimize : cores }]}";
  String COLL_NAME = "percentColl";
  // Node names as they are counted and asserted below.
  final String eastNode = "10.0.0.6:8983_solr";
  final String westNode = "10.0.0.6:7574_solr";
  AutoScalingConfig autoScalingConfig = new AutoScalingConfig((Map<String, Object>) Utils.fromJSONString(autoScalingjson));
  Policy.Transaction txn = new Policy.Transaction(autoScalingConfig.getPolicy());
  txn.open(cloudManagerWithData((Map<String, Object>) loadFromResource("testReplicaZonesPercentage.json")));
  // Placement history, used only for diagnostics in assertion messages.
  List<String> nodes = new ArrayList<>();
  int westCount = 0, eastCount = 0;
  for (int i = 0; i < 12; i++) {
    @SuppressWarnings({"rawtypes"})
    SolrRequest suggestion = txn.getCurrentSession()
        .getSuggester(ADDREPLICA)
        .hint(Hint.COLL_SHARD, new Pair<>(COLL_NAME, "shard1"))
        .getSuggestion();
    assertNotNull("No suggestion for replica #" + i + "; placements so far: " + nodes, suggestion);
    String node = suggestion.getParams().get("node");
    nodes.add(node);
    if (eastNode.equals(node)) {
      eastCount++;
    }
    if (westNode.equals(node)) {
      westCount++;
    }
    String expectedNode = (i % 3 == 1) ? eastNode : westNode;
    assertEquals("Unexpected node for replica #" + i + "; placements so far: " + nodes, expectedNode, node);
  }
  assertEquals("placements: " + nodes, 8, westCount);
  assertEquals("placements: " + nodes, 4, eastCount);
  List<Violation> violations = txn.close();
  assertTrue(violations.isEmpty());
  Policy.Session latestSession = txn.getCurrentSession();
  assertEquals(westNode, latestSession.matrix.get(0).node);
  AtomicInteger count = new AtomicInteger();
  latestSession.matrix.get(0).forEachReplica(replicaInfo -> count.incrementAndGet());
  assertEquals(8, count.get());
  assertEquals(eastNode, latestSession.matrix.get(1).node);
  count.set(0);
  latestSession.matrix.get(1).forEachReplica(replicaInfo -> count.incrementAndGet());
  assertEquals(4, count.get());
}
/**
 * Requests two consecutive ADDREPLICA suggestions for {@code mycoll2/shard1} on the
 * {@code testFreeDiskDeviation.json} fixture and checks their V2 request form: both must be an
 * {@code add-replica} command on {@code /c/mycoll2/shards}, the first targeting
 * {@code node1:80_} and the second {@code node2:80_}.
 */
@SuppressWarnings({"unchecked"})
public void testFreeDiskDeviation() {
@SuppressWarnings({"rawtypes"})
Map map = (Map) loadFromResource("testFreeDiskDeviation.json");
// By default the autoscaling config comes from the "config" entry of the resource.
AutoScalingConfig cfg = new AutoScalingConfig((Map<String, Object>) map.get("config"));
if(useNodeset){
// With useNodeset the resource's config is replaced by an inline one that uses the nodeset syntax.
cfg = new AutoScalingConfig((Map<String, Object>) Utils.fromJSONString("{" +
" 'cluster-policy': [{'replica':'<2', 'shard':'#EACH', 'node':'#ANY'}," +
" {'replica': '#ALL', 'nodeset': {'freedisk': '>700'}, 'strict': false}]" +
" }"));
}
SolrCloudManager scm = cloudManagerWithData(map);
Suggester suggester = cfg.getPolicy()
.createSession(scm)
.getSuggester(ADDREPLICA);
// First suggestion, switched to its V2 form so it can be inspected by path.
MapWriter v2Request = (MapWriter) ((V2RequestSupport) suggester
.hint(Hint.COLL_SHARD, new Pair<>("mycoll2", "shard1"))
.getSuggestion()
.setUseV2(true))
.getV2Request();
assertEquals("/c/mycoll2/shards", v2Request._get("path",null));
assertEquals("add-replica", v2Request._get("command[0]/key",null));
assertEquals("node1:80_", v2Request._get("command/add-replica/node",null));
// Second suggestion, computed from the session of the first suggester.
suggester = suggester.getSession()
.getSuggester(ADDREPLICA);
v2Request = (MapWriter) ((V2RequestSupport) suggester
.hint(Hint.COLL_SHARD, new Pair<>("mycoll2", "shard1"))
.getSuggestion()
.setUseV2(true))
.getV2Request();
assertEquals("/c/mycoll2/shards", v2Request._get("path",null));
assertEquals("add-replica", v2Request._get("command[0]/key",null));
assertEquals("node2:80_", v2Request._get("command/add-replica/node",null));
}
/**
 * Checks violations and move-replica suggestions for two free-disk policies evaluated against the
 * cluster data in {@code testFreeDiskSuggestions.json}: first a rule with {@code replica: 0} for
 * {@code freedisk < 1000}, then a rule with {@code replica: #ALL} for {@code freedisk > 1000}.
 * Each policy is written in nodeset syntax when {@code useNodeset} is set.
 */
@SuppressWarnings({"unchecked"})
public void testFreeDiskSuggestions() {
  final String replicaPath = "operation/command/move-replica/replica";
  final String targetNodePath = "operation/command/move-replica/targetNode";
  // Both policies are expected to produce these three moves, in this order, all onto node1:80_.
  final String[] expectedReplicas = {"r4", "r3", "r2"};

  String policyJson = useNodeset
      ? " { cluster-policy:[" +
        " { replica :'0', put:on-each-node , nodeset:{ freedisk:'<1000'}}," +
        " { replica :0, put : on-each-node , nodeset : {nodeRole : overseer}}]," +
        " cluster-preferences :[{ minimize : cores, precision : 2 }]}"
      : " { cluster-policy:[" +
        " { replica :'0', freedisk:'<1000'}," +
        " { nodeRole : overseer, replica :0}]," +
        " cluster-preferences :[{ minimize : cores, precision : 2 }]}";
  AutoScalingConfig config = new AutoScalingConfig((Map<String, Object>) Utils.fromJSONString(policyJson));
  List<Violation> violations = config.getPolicy()
      .createSession(cloudManagerWithData((Map) loadFromResource("testFreeDiskSuggestions.json")))
      .getViolations();
  assertEquals(1, violations.size());
  Violation violation = violations.get(0);
  assertEquals(4, violation.getViolatingReplicas().size());
  assertEquals(4, violation.replicaCountDelta, 0.1);
  for (Violation.ReplicaInfoAndErr violating : violation.getViolatingReplicas()) {
    assertEquals(500d, violating.delta, 0.1);
  }
  List<Suggester.SuggestionInfo> suggestions = PolicyHelper.getSuggestions(config,
      cloudManagerWithData((Map) loadFromResource("testFreeDiskSuggestions.json")));
  assertEquals(3, suggestions.size());
  for (int i = 0; i < expectedReplicas.length; i++) {
    assertEquals(expectedReplicas[i], suggestions.get(i)._get(replicaPath, null));
    assertEquals("node1:80_", suggestions.get(i)._get(targetNodePath, null));
  }

  // Second policy: the same data, but with the rule expressed as '#ALL' on freedisk > 1000.
  policyJson = useNodeset
      ? " { cluster-policy:[" +
        " { replica :'#ALL', nodeset:{ freedisk:'>1000'}}," +
        " { replica :0 , put: on-each-node , nodeset : {nodeRole : overseer}}]," +
        " cluster-preferences :[{ minimize : cores, precision : 2 }]}"
      : " { cluster-policy:[" +
        " { replica :'#ALL', freedisk:'>1000'}," +
        " { nodeRole : overseer, replica :0}]," +
        " cluster-preferences :[{ minimize : cores, precision : 2 }]}";
  config = new AutoScalingConfig((Map<String, Object>) Utils.fromJSONString(policyJson));
  violations = config.getPolicy()
      .createSession(cloudManagerWithData((Map) loadFromResource("testFreeDiskSuggestions.json")))
      .getViolations();
  assertEquals(1, violations.size());
  violation = violations.get(0);
  assertEquals(-4, violation.replicaCountDelta, 0.1);
  assertEquals(0, violation.getViolatingReplicas().size());
  suggestions = PolicyHelper.getSuggestions(config,
      cloudManagerWithData((Map) loadFromResource("testFreeDiskSuggestions.json")));
  assertEquals(3, suggestions.size());
  for (int i = 0; i < expectedReplicas.length; i++) {
    assertEquals(expectedReplicas[i], suggestions.get(i)._get(replicaPath, null));
    assertEquals("node1:80_", suggestions.get(i)._get(targetNodePath, null));
  }
}
/**
 * Applies a {@code cores < 3} per-node policy to the cluster data in
 * {@code testCoresSuggestions.json} and checks the reported violation delta and that both
 * resulting suggestions are POSTs to {@code /c/mycoll1} moving a replica to
 * {@code 10.0.0.6:7574_solr}.
 */
public void testCoresSuggestions() {
  String policyJson = " { cluster-policy:[" +
      " { cores :'<3', node :'#ANY'}]," +
      " cluster-preferences :[{ minimize : cores }]}";
  @SuppressWarnings({"unchecked"})
  AutoScalingConfig config = new AutoScalingConfig((Map<String, Object>) Utils.fromJSONString(policyJson));

  List<Violation> violations = config.getPolicy()
      .createSession(cloudManagerWithData((Map) loadFromResource("testCoresSuggestions.json")))
      .getViolations();
  assertFalse(violations.isEmpty());
  assertEquals(2L, violations.get(0).replicaCountDelta.longValue());

  List<Suggester.SuggestionInfo> suggestions = PolicyHelper.getSuggestions(config,
      cloudManagerWithData((Map) loadFromResource("testCoresSuggestions.json")));
  assertEquals(2, suggestions.size());
  suggestions.forEach(suggestion -> {
    assertEquals("10.0.0.6:7574_solr", suggestion._get("operation/command/move-replica/targetNode", null));
    assertEquals("POST", suggestion._get("operation/method", null));
    assertEquals("/c/mycoll1", suggestion._get("operation/path", null));
  });
}
/**
 * Evaluates a {@code sysprop.fs : 'ssd'} replica rule (in plain or nodeset syntax, depending on
 * {@code useNodeset}) against {@code testSyspropSuggestions1.json} and expects two violations
 * and two suggestions.
 */
public void testSyspropSuggestions1() {
  // Only the single cluster-policy rule differs between the two syntaxes.
  String policyRule = useNodeset
      ? " { 'replica': '1', shard:'#EACH', nodeset:{ sysprop.fs : 'ssd'}}"
      : " { 'replica': '1', shard:'#EACH', sysprop.fs : 'ssd'}";
  String policyJson = "{" +
      " 'cluster-preferences': [" +
      " { 'maximize': 'freedisk', 'precision': 50}," +
      " { 'minimize': 'cores', 'precision': 3}" +
      " ]," +
      " 'cluster-policy': [" +
      policyRule +
      " ]" +
      "}";
  @SuppressWarnings({"unchecked"})
  AutoScalingConfig config = new AutoScalingConfig((Map<String, Object>) Utils.fromJSONString(policyJson));

  List<Violation> violations = config.getPolicy()
      .createSession(cloudManagerWithData((Map) loadFromResource("testSyspropSuggestions1.json")))
      .getViolations();
  assertEquals("expected 2 violations", 2, violations.size());

  List<Suggester.SuggestionInfo> suggestions = PolicyHelper.getSuggestions(config,
      cloudManagerWithData((Map) loadFromResource("testSyspropSuggestions1.json")));
  assertEquals(2, suggestions.size());
  // NOTE(review): the looked-up value is discarded, so this loop asserts nothing. The path also
  // lacks the "command" segment used by the other tests in this file
  // ("operation/command/move-replica/targetNode") - confirm the intended expectation.
  suggestions.forEach(suggestion -> suggestion._get("operation/move-replica/targetNode", null));
}
/**
 * Evaluates a {@code port : '8983'} rule with {@code replica: 0} per shard (in plain or nodeset
 * syntax, depending on {@code useNodeset}) against {@code testPortSuggestions.json}. Expects two
 * violations and four suggestions, each of which operates on the path {@code /c/c1}.
 */
public void testPortSuggestions() {
  String autoScalingjson = "{" +
      " 'cluster-preferences': [" +
      " { 'maximize': 'freedisk', 'precision': 50}," +
      " { 'minimize': 'cores', 'precision': 3}" +
      " ]," +
      " 'cluster-policy': [" +
      " { 'replica': 0, shard:'#EACH', port : '8983'}" +
      " ]" +
      "}";
  if (useNodeset) {
    autoScalingjson = "{" +
        " 'cluster-preferences': [" +
        " { 'maximize': 'freedisk', 'precision': 50}," +
        " { 'minimize': 'cores', 'precision': 3}" +
        " ]," +
        " 'cluster-policy': [" +
        " { 'replica': 0, shard:'#EACH', nodeset :{ port : '8983'}}" +
        " ]" +
        "}";
  }
  @SuppressWarnings({"unchecked"})
  AutoScalingConfig cfg = new AutoScalingConfig((Map<String, Object>) Utils.fromJSONString(autoScalingjson));
  List<Violation> violations = cfg.getPolicy().createSession(cloudManagerWithData((Map) loadFromResource("testPortSuggestions.json"))).getViolations();
  assertEquals(2, violations.size());
  List<Suggester.SuggestionInfo> suggestions = PolicyHelper.getSuggestions(cfg, cloudManagerWithData((Map) loadFromResource("testPortSuggestions.json")));
  assertEquals(4, suggestions.size());
  for (Suggester.SuggestionInfo suggestionInfo : suggestions) {
    // Expected value goes first so that a failure reports "expected </c/c1> but was <...>".
    assertEquals("/c/c1", suggestionInfo.operation.getPath());
  }
}
/**
 * Requests an ADDREPLICA suggestion for {@code coll1/shard1} twice against the cluster data in
 * {@code testDiskSpaceHint.json}: once with a {@code MINFREEDISK} hint of 150 and once without.
 * The expected target node differs between the two requests.
 */
public void testDiskSpaceHint() {
  // NOTE(review): this literal has no opening brace and no comma between the two top-level keys;
  // presumably the JSON parser behind Utils.fromJSONString tolerates that - confirm.
  String policyJson = "cluster-preferences:[" +
      " {minimize : cores}]" +
      " cluster-policy:[{cores:'<10',node:'#ANY'}," +
      " {replica:'<2', shard:'#EACH',node:'#ANY'}," +
      " { nodeRole:overseer,replica:0}]}";
  @SuppressWarnings({"unchecked"})
  Policy policy = new Policy((Map<String, Object>) Utils.fromJSONString(policyJson));
  Policy.Session session = policy.createSession(
      cloudManagerWithData((Map) loadFromResource("testDiskSpaceHint.json")));

  // With a minimum free-disk hint.
  Suggester hintedSuggester = session.getSuggester(CollectionAction.ADDREPLICA)
      .hint(Hint.COLL_SHARD, new Pair<>("coll1", "shard1"))
      .hint(Hint.MINFREEDISK, 150);
  CollectionAdminRequest.AddReplica hintedOp = (CollectionAdminRequest.AddReplica) hintedSuggester.getSuggestion();
  assertEquals("127.0.0.1:51078_solr", hintedOp.getNode());

  // Without the free-disk hint, on the same session.
  Suggester plainSuggester = session.getSuggester(CollectionAction.ADDREPLICA)
      .hint(Hint.COLL_SHARD, new Pair<>("coll1", "shard1"));
  CollectionAdminRequest.AddReplica plainOp = (CollectionAdminRequest.AddReplica) plainSuggester.getSuggestion();
  assertEquals("127.0.0.1:51147_solr", plainOp.getNode());
}
/**
 * Asks {@code PolicyHelper.getReplicaLocations} to place replicas of shard1 and shard2 of
 * "newColl" on a four-node stub cluster and expects every returned position to be on
 * {@code node3:80_}.
 * <p>
 * The stub cluster is defined inline: per-node values (cores, freedisk, heap, ...) come from a
 * JSON literal, node1 and node2 each report one existing replica with a
 * {@code FREEDISK.perReplicaValue} of 200, and the leaders of shard1/shard2 are reported on
 * node1/node2 respectively.
 */
public void testDiskSpaceReqd() {
// Policy under test: no replicas on the overseer node; prefer nodes with fewer cores.
String autoScaleJson = "{" +
" cluster-preferences: [" +
" { minimize : cores, precision: 2}" +
" ]," +
" cluster-policy: [" +
" { replica : '0' , nodeRole: overseer}" +
" ]" +
"}";
// Per-node attribute values served by the stub NodeStateProvider below.
@SuppressWarnings({"unchecked", "rawtypes"})
Map<String, Map> nodeValues = (Map<String, Map>) Utils.fromJSONString("{" +
"\"node1:80_\":{cores:12, freedisk: 334, heap:10480, sysprop.rack:rack3}," +
"\"node2:80_\":{cores:4, freedisk: 262, heap:6873, sysprop.fs : ssd, sysprop.rack:rack1}," +
"\"node3:80_\":{cores:7, freedisk: 749, heap:7834, sysprop.rack:rack4}," +
"\"node4:80_\":{cores:0, freedisk: 900, heap:16900, nodeRole:overseer, sysprop.rack:rack2}" +
"}");
// Both delegating wrappers are constructed with a null delegate, so only the methods
// overridden here are backed by test data.
SolrCloudManager cloudManager = new DelegatingCloudManager(null) {
@Override
public NodeStateProvider getNodeStateProvider() {
return new DelegatingNodeStateProvider(null) {
@Override
public Map<String, Object> getNodeValues(String node, Collection<String> keys) {
// Return exactly the requested keys, looked up in the inline nodeValues table.
Map<String, Object> result = new LinkedHashMap<>();
keys.stream().forEach(s -> result.put(s, nodeValues.get(node).get(s)));
return result;
}
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Map<String, Map<String, List<ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
// node1 hosts the shard1 replica and node2 the shard2 replica; each carries a
// FREEDISK.perReplicaValue of 200. All other nodes report no replicas.
if (node.equals("node1:80_")) {
Map m = Utils.makeMap("newColl",
Utils.makeMap("shard1", Collections.singletonList(new ReplicaInfo("r1", "shard1",
new Replica("r1", Utils.makeMap(ZkStateReader.NODE_NAME_PROP, "node1:80_", ZkStateReader.CORE_NAME_PROP, "core1"), "newColl", "shard1"),
Utils.makeMap(FREEDISK.perReplicaValue, 200)))));
return m;
} else if (node.equals("node2:80_")) {
Map m = Utils.makeMap("newColl",
Utils.makeMap("shard2", Collections.singletonList(new ReplicaInfo("r1", "shard2",
new Replica("r1", Utils.makeMap(ZkStateReader.NODE_NAME_PROP, "node2:80_", ZkStateReader.CORE_NAME_PROP, "core2"),"newColl", "shard2"),
Utils.makeMap(FREEDISK.perReplicaValue, 200)))));
return m;
}
return new HashMap<>();
}
};
}
@Override
public ClusterStateProvider getClusterStateProvider() {
return new DelegatingClusterStateProvider(null) {
@Override
public Set<String> getLiveNodes() {
return new HashSet<>(Arrays.asList("node1:80_", "node2:80_", "node3:80_", "node4:80_"));
}
@Override
public DocCollection getCollection(String name) {
// An otherwise empty collection that only knows the leaders of shard1 and shard2.
return new DocCollection(name, Collections.emptyMap(), Collections.emptyMap(), DocRouter.DEFAULT) {
@Override
public Replica getLeader(String sliceName) {
if (sliceName.equals("shard1"))
return new Replica("r1", Utils.makeMap(ZkStateReader.NODE_NAME_PROP, "node1:80_", ZkStateReader.CORE_NAME_PROP, "core1"), name, "shard1");
if (sliceName.equals("shard2"))
return new Replica("r2", Utils.makeMap(ZkStateReader.NODE_NAME_PROP, "node2:80_", ZkStateReader.CORE_NAME_PROP, "core2"),name, "shard2");
return null;
}
};
}
};
}
};
// NOTE(review): the numeric arguments (1, 0, 0) are presumably the per-type replica counts to
// place for each shard - confirm against PolicyHelper.getReplicaLocations.
@SuppressWarnings({"unchecked"})
List<ReplicaPosition> locations = PolicyHelper.getReplicaLocations(
"newColl", new AutoScalingConfig((Map<String, Object>) Utils.fromJSONString(autoScaleJson)),
cloudManager, null, Arrays.asList("shard1", "shard2"), 1, 0, 0, null);
// Every position must be on node3; presumably it is the only non-overseer node with enough
// free disk for a replica of the reported size - TODO confirm.
assertTrue(locations.stream().allMatch(it -> "node3:80_".equals(it.node)));
}
/**
 * Sorts four candidate replicas with {@code MoveReplicaSuggester.leaderLast} and checks that the
 * two replicas created with {@code leader=false} (r2, r3) come before the two created with
 * {@code leader="true"} (r1, r4).
 */
public void testMoveReplicaLeaderlast() {
  List<Pair<ReplicaInfo, Row>> candidates = new ArrayList<>();
  Map<String, Object> replicaProps = Utils.makeMap(
      "leader", "true",
      ZkStateReader.NODE_NAME_PROP, "node1:80_",
      ZkStateReader.REPLICA_TYPE, Replica.Type.NRT.toString(),
      ZkStateReader.CORE_NAME_PROP, "core1");

  // r1: leader, built from the shared property map.
  Replica replica = new Replica("r1", replicaProps, "c1", "s1");
  ReplicaInfo info = new ReplicaInfo(replica.collection, replica.slice, replica, new HashMap<>());
  candidates.add(new Pair<>(info, null));

  // r4: leader, built directly as a ReplicaInfo.
  info = new ReplicaInfo("r4", "c1_s2_r1", "c1", "s2", Replica.Type.NRT, "n1", Collections.singletonMap("leader", "true"));
  candidates.add(new Pair<>(info, null));

  // From here on the shared property map describes a non-leader; r2 and r3 are built from it.
  replicaProps.put("leader", false);
  replica = new Replica("r2", replicaProps, "c1", "s1");
  info = new ReplicaInfo(replica.collection, replica.slice, replica, new HashMap<>());
  candidates.add(new Pair<>(info, null));
  replica = new Replica("r3", replicaProps, "c1", "s1");
  info = new ReplicaInfo(replica.collection, replica.slice, replica, new HashMap<>());
  candidates.add(new Pair<>(info, null));

  Collections.sort(candidates, MoveReplicaSuggester.leaderLast);

  String[] expectedOrder = {"r2", "r3", "r1", "r4"};
  for (int i = 0; i < expectedOrder.length; i++) {
    assertEquals(expectedOrder[i], candidates.get(i).first().getName());
  }
}
/**
 * Using the state in {@code testScheduledTriggerFailure.json}, repeatedly asks for a MoveReplica
 * suggestion (at most 10 times), obtaining a new suggester from {@code createSuggester} after
 * each one. No suggestion is expected at all.
 */
public void testScheduledTriggerFailure() throws Exception {
  @SuppressWarnings({"rawtypes"})
  Map testState = (Map) loadFromResource("testScheduledTriggerFailure.json");
  SolrCloudManager cloudManager = createCloudManager(testState);
  Suggester suggester = createSuggester(cloudManager, testState, null);
  int suggestedMoves = 0;
  for (; suggestedMoves < 10; suggestedMoves++) {
    CollectionAdminRequest.MoveReplica move = (CollectionAdminRequest.MoveReplica) suggester.getSuggestion();
    if (move == null) {
      // No further suggestion available; the counter keeps the number seen so far.
      break;
    }
    if (log.isInfoEnabled()) {
      log.info("OP:{}", move.getParams());
    }
    suggester = createSuggester(cloudManager, testState, suggester);
  }
  assertEquals(0, suggestedMoves);
}
/**
 * Using the state in {@code testUtilizeNodeFailure.json}, repeatedly asks for a MoveReplica
 * suggestion (at most 100 times), obtaining a new suggester from {@code createSuggester} after
 * each one. No suggestion is expected at all.
 */
public void testUtilizeNodeFailure() throws Exception {
  @SuppressWarnings({"rawtypes"})
  Map testState = (Map) loadFromResource("testUtilizeNodeFailure.json");
  SolrCloudManager cloudManager = createCloudManager(testState);
  Suggester suggester = createSuggester(cloudManager, testState, null);
  int suggestedMoves = 0;
  for (; suggestedMoves < 100; suggestedMoves++) {
    CollectionAdminRequest.MoveReplica move = (CollectionAdminRequest.MoveReplica) suggester.getSuggestion();
    if (move == null) {
      // No further suggestion available; the counter keeps the number seen so far.
      break;
    }
    if (log.isInfoEnabled()) {
      log.info("OP:{}", move.getParams());
    }
    suggester = createSuggester(cloudManager, testState, suggester);
  }
  assertEquals("count = " + suggestedMoves, 0, suggestedMoves);
}
/**
 * Using the state in {@code testUtilizeNodeFailure2.json}, repeatedly asks for a MoveReplica
 * suggestion (at most 100 times), obtaining a new suggester from {@code createSuggester} after
 * each one. Exactly one suggestion is expected.
 */
public void testUtilizeNodeFailure2() throws Exception {
  @SuppressWarnings({"rawtypes"})
  Map testState = (Map) loadFromResource("testUtilizeNodeFailure2.json");
  SolrCloudManager cloudManager = createCloudManager(testState);
  Suggester suggester = createSuggester(cloudManager, testState, null);
  int suggestedMoves = 0;
  for (; suggestedMoves < 100; suggestedMoves++) {
    CollectionAdminRequest.MoveReplica move = (CollectionAdminRequest.MoveReplica) suggester.getSuggestion();
    if (move == null) {
      // No further suggestion available; the counter keeps the number seen so far.
      break;
    }
    if (log.isInfoEnabled()) {
      log.info("OP:{}", move.getParams());
    }
    suggester = createSuggester(cloudManager, testState, suggester);
  }
  assertEquals("count = " + suggestedMoves, 1, suggestedMoves);
}
//SOLR-12358
/**
 * Builds rows from the node descriptions in {@code testSortError.json}, sorts them with
 * {@code Policy.setApproxValuesAndSortNodes} and checks that all rows that are not live end up
 * before all live rows.
 */
@SuppressWarnings({"unchecked", "rawtypes"})
public void testSortError() {
  Policy policy = new Policy((Map<String, Object>) Utils.fromJSONString("{cluster-preferences: [{minimize : cores, precision:1}, " +
      "{maximize : freedisk, precision: 50}, " +
      "{minimize: sysLoadAvg}]}"));
  List nodeDescriptions = (List) loadFromResource("testSortError.json");
  // One cell per variable, in this column order.
  List<Variable.Type> columns = Arrays.asList(
      CORES, Variable.Type.FREEDISK, Variable.Type.SYSLOADAVG, Variable.Type.NODE);

  List<Row> rows = new ArrayList<>();
  for (Object description : nodeDescriptions) {
    Map nodeData = (Map) description;
    List attributes = (List) nodeData.get("attributes");
    Cell[] cells = new Cell[columns.size()];
    for (int col = 0; col < cells.length; col++) {
      Variable.Type type = columns.get(col);
      // "attributes" is a list of maps; any map that carries this variable's tag sets the cell
      // (a later match overwrites an earlier one).
      for (Object attribute : attributes) {
        Object rawValue = ((Map) attribute).get(type.tagName);
        if (rawValue == null) {
          continue;
        }
        Object validated = type.validate(type.tagName, rawValue, false);
        cells[col] = new Cell(col, type.tagName, validated, validated, type, null);
      }
    }
    rows.add(new Row((String) nodeData.get("node"), cells, false,
        new HashMap<>(),
        (Boolean) nodeData.get("isLive"), null, new HashMap(), new HashMap()));
  }

  int deadNodes = (int) rows.stream().filter(row -> !row.isLive).count();
  Policy.setApproxValuesAndSortNodes(policy.getClusterPreferences(), rows);

  // After sorting, the first deadNodes rows must be the non-live ones.
  for (int i = 0; i < rows.size(); i++) {
    if (i < deadNodes) {
      assertFalse(rows.get(i).isLive);
    } else {
      assertTrue(rows.get(i).isLive);
    }
  }
}
/**
 * Serializes the violations computed for {@code testViolationOutput.json} to JSON, using either
 * {@code SolrJSONWriter} or {@code JSONWriter} (chosen at random), parses the output back and
 * checks that {@code violations[0]/violation/replica/NRT} is 2.
 *
 * @throws IOException if writing the JSON output fails
 */
public void testViolationOutput() throws IOException {
  String autoScalingjson = "{" +
      " 'cluster-preferences': [" +
      " { 'maximize': 'freedisk', 'precision': 50}," +
      " { 'minimize': 'cores', 'precision': 3}" +
      " ]," +
      " 'cluster-policy': [" +
      " { 'replica': 0, shard:'#EACH', port : '8983'}" +
      " ]" +
      "}";
  @SuppressWarnings({"unchecked"})
  AutoScalingConfig cfg = new AutoScalingConfig((Map<String, Object>) Utils.fromJSONString(autoScalingjson));
  List<Violation> violations = cfg.getPolicy().createSession(cloudManagerWithData((Map) loadFromResource("testViolationOutput.json"))).getViolations();
  StringWriter writer = new StringWriter();
  NamedList<Object> val = new NamedList<>();
  val.add("violations", violations);
  // Exercise both writer implementations across runs.
  if (random().nextBoolean()) {
    new SolrJSONWriter(writer)
        .writeObj(val)
        .close();
  } else {
    JSONWriter.write(writer, true, JsonTextWriter.JSON_NL_MAP, val);
  }
  Object root = Utils.fromJSONString(writer.toString());
  // Upper-case 'L' suffix: a lower-case 'l' is easily misread as the digit 1.
  assertEquals(2L,
      Utils.getObjectByPath(root, true, "violations[0]/violation/replica/NRT"));
}
/**
 * Checks percentage-based free-disk rules against {@code testFreediskPercentage.json}: first
 * {@code replica: 0} for {@code freedisk < 30%}, then {@code replica: #ALL} for
 * {@code freedisk > 30%}. Each is expected to yield a single violation.
 */
@SuppressWarnings({"unchecked"})
public void testFreediskPercentage() {
  // The two configurations differ only in the single cluster-policy rule.
  final String beforeRule = "{" +
      " 'cluster-preferences': [" +
      " { 'maximize': 'freedisk', 'precision': 50}," +
      " { 'minimize': 'cores', 'precision': 3}" +
      " ]," +
      " 'cluster-policy': [";
  final String afterRule = " ]" +
      "}";

  String policyJson = beforeRule + " { 'replica': 0, freedisk : '<30%'}" + afterRule;
  AutoScalingConfig config = new AutoScalingConfig((Map<String, Object>) Utils.fromJSONString(policyJson));
  List<Violation> violations = config.getPolicy()
      .createSession(cloudManagerWithData((Map) loadFromResource("testFreediskPercentage.json")))
      .getViolations();
  assertEquals(1, violations.size());
  assertEquals(4, violations.get(0).getViolatingReplicas().size());
  for (Violation.ReplicaInfoAndErr violating : violations.get(0).getViolatingReplicas()) {
    assertEquals(10.0d, violating.delta.doubleValue(), 0.1);
  }

  policyJson = beforeRule + " { 'replica':'#ALL' , freedisk : '>30%'}" + afterRule;
  config = new AutoScalingConfig((Map<String, Object>) Utils.fromJSONString(policyJson));
  violations = config.getPolicy()
      .createSession(cloudManagerWithData((Map) loadFromResource("testFreediskPercentage.json")))
      .getViolations();
  assertEquals(1, violations.size());
  assertEquals(-4d, violations.get(0).replicaCountDelta, 0.01);
  for (Violation.ReplicaInfoAndErr violating : violations.get(0).getViolatingReplicas()) {
    assertEquals(10.0d, violating.delta.doubleValue(), 0.1);
  }
}
/**
 * Fills in replica properties that the test data may omit. Every replica found under
 * {@code "clusterstate"} and under {@code "replicaInfo"} that has no {@code "node_name"} gets
 * {@code "node1:80_"}, and every one that has no {@code "core"} gets {@code "core_" + coreNode}.
 * Existing values are left untouched. The maps are modified in place.
 *
 * @param testData test fixture containing both a {@code "clusterstate"} map
 *                 (collection -&gt; {@code "shards"} -&gt; shard -&gt; {@code "replicas"} -&gt; coreNode -&gt; props)
 *                 and a {@code "replicaInfo"} map
 *                 (node -&gt; collection -&gt; shard -&gt; list of single-entry maps coreNode -&gt; props)
 */
@SuppressWarnings({"unchecked", "rawtypes"})
public static void fixRequiredProps(Map<String, Object> testData) {
  Map<String, Object> clusterState = (Map<String, Object>) testData.get("clusterstate");
  for (Object collectionState : clusterState.values()) {
    Map<String, Object> shards = (Map<String, Object>) ((Map<String, Object>) collectionState).get("shards");
    for (Object shardState : shards.values()) {
      Map<String, Object> replicas = (Map<String, Object>) ((Map<String, Object>) shardState).get("replicas");
      for (Map.Entry<String, Object> replicaEntry : replicas.entrySet()) {
        putDefaultReplicaProps((Map) replicaEntry.getValue(), replicaEntry.getKey());
      }
    }
  }

  Map<String, Object> replicaInfo = (Map<String, Object>) testData.get("replicaInfo");
  for (Object collectionsOnNode : replicaInfo.values()) {
    for (Object shardsOfCollection : ((Map) collectionsOnNode).values()) {
      for (Object replicasOfShard : ((Map) shardsOfCollection).values()) {
        for (Object element : (List) replicasOfShard) {
          // Each list element wraps the replica properties under its core node name; the first
          // (only the first) key is used.
          Map wrapper = (Map) element;
          String coreNode = wrapper.keySet().iterator().next().toString();
          putDefaultReplicaProps((Map) wrapper.get(coreNode), coreNode);
        }
      }
    }
  }
}

/** Adds the default {@code node_name} and {@code core} entries to {@code replicaProps} when the keys are absent. */
@SuppressWarnings({"unchecked", "rawtypes"})
private static void putDefaultReplicaProps(Map replicaProps, String coreNode) {
  if (!replicaProps.containsKey("node_name")) {
    replicaProps.put("node_name", "node1:80_");
  }
  if (!replicaProps.containsKey("core")) {
    replicaProps.put("core", "core_" + coreNode);
  }
}
/**
 * Assigns one NRT replica for shard "s1" of collection "c1" using the cluster described in
 * {@code testAutoscalingPreferencesUsedWithNoPolicy.json} and expects it to be placed on
 * {@code node3:8985_}.
 * <p>
 * The cloud manager is a stub: the autoscaling config comes from the resource's
 * {@code "autoscalingJson"} entry (or is empty when that entry is absent), and cluster state,
 * live nodes, node values and replica info are all served from the loaded resource map.
 */
@SuppressWarnings({"unchecked", "rawtypes"})
public void testAutoscalingPreferencesUsedWithNoPolicy() throws IOException, InterruptedException {
Map<String, Object> m = (Map<String, Object>) loadFromResource("testAutoscalingPreferencesUsedWithNoPolicy.json");
// Fill in "node_name" / "core" on replicas that lack them, before the data is consumed.
fixRequiredProps(m);
Map clusterState = (Map) m.remove("clusterstate");
Map replicaInfo = (Map) m.get("replicaInfo");
// Replace, in place, each raw {name: props} map in the replicaInfo lists with a ReplicaInfo
// object so that the stub NodeStateProvider below can hand the lists out directly.
replicaInfo.forEach((node, val) -> {
Map m1 = (Map) val;
m1.forEach((coll, val2) -> {
Map m2 = (Map) val2;
m2.forEach((shard, val3) -> {
List l3 = (List) val3;
for (int i = 0; i < l3.size(); i++) {
Object o = l3.get(i);
Map m3 = (Map) o;
// The single key of the wrapper map is used as the replica's name.
String name = m3.keySet().iterator().next().toString();
m3 = (Map) m3.get(name);
Replica.Type type = Replica.Type.get((String) m3.get("type"));
l3.set(i, new ReplicaInfo(name, name
, coll.toString(), shard.toString(), type, (String) node, m3));
}
});
});
});
@SuppressWarnings({"unchecked"})
AutoScalingConfig asc = m.containsKey("autoscalingJson") ? new AutoScalingConfig((Map<String, Object>) m.get("autoscalingJson")) : new AutoScalingConfig(Collections.emptyMap());
// All delegating wrappers are constructed with a null delegate, so only the methods
// overridden here are backed by test data.
DelegatingCloudManager cloudManager = new DelegatingCloudManager(null) {
@Override
public DistribStateManager getDistribStateManager() {
return new DelegatingDistribStateManager(null) {
@Override
public AutoScalingConfig getAutoScalingConfig() {
return asc;
}
};
}
@Override
public ClusterStateProvider getClusterStateProvider() {
return new DelegatingClusterStateProvider(null) {
@Override
@SuppressWarnings({"unchecked"})
public Set<String> getLiveNodes() {
return new HashSet<>((Collection<String>) m.get("liveNodes"));
}
@Override
public ClusterState getClusterState() throws IOException {
return ClusterState.load(0, clusterState, getLiveNodes(), ZkStateReader.getCollectionPath("c1"));
}
@Override
public Map<String, Object> getClusterProperties() {
// Cluster default: legacy replica assignment is switched off.
return Collections.singletonMap("defaults", Collections.singletonMap("cluster", Collections.singletonMap(CollectionAdminParams.USE_LEGACY_REPLICA_ASSIGNMENT, false)));
}
};
}
@Override
public NodeStateProvider getNodeStateProvider() {
return new DelegatingNodeStateProvider(null) {
@Override
public Map<String, Object> getNodeValues(String node, Collection<String> tags) {
// Returns the node's whole "nodeValues" entry; the requested tags are not used to filter it.
@SuppressWarnings({"unchecked"})
Map<String, Object> result = (Map<String, Object>) Utils.getObjectByPath(m, false, Arrays.asList("nodeValues", node));
return result == null ? new HashMap<>() : result;
}
@Override
public Map<String, Map<String, List<ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
// Serves the lists converted to ReplicaInfo objects at the top of this test.
@SuppressWarnings({"unchecked"})
Map<String, Map<String, List<ReplicaInfo>>> result = (Map<String, Map<String, List<ReplicaInfo>>>) Utils.getObjectByPath(m, false, Arrays.asList("replicaInfo", node));
return result == null ? new HashMap<>() : result;
}
};
}
};
// Request a single NRT replica for shard s1 of collection c1.
Assign.AssignRequest assignRequest = new Assign.AssignRequestBuilder()
.forCollection("c1")
.forShard(Collections.singletonList("s1"))
.assignNrtReplicas(1)
.build();
Assign.AssignStrategyFactory assignStrategyFactory = new Assign.AssignStrategyFactory(cloudManager);
ClusterState state = cloudManager.getClusterStateProvider().getClusterState();
DocCollection collection = state.getCollection("c1");
Assign.AssignStrategy assignStrategy = assignStrategyFactory.create(state, collection);
List<ReplicaPosition> replicaPositions = assignStrategy.assign(cloudManager, assignRequest);
assertEquals(1, replicaPositions.size());
ReplicaPosition replicaPosition = replicaPositions.get(0);
assertEquals("node3:8985_", replicaPosition.node); // only node3:8985_ has enough space to handle the new replica
assertEquals("s1", replicaPosition.shard); // sanity check
}
/**
 * Assigns one NRT replica to each of three shards of {@code test_empty_collection}, using the
 * cluster state and replica info from {@code testEmptyCollection.json}, two live nodes and an
 * empty autoscaling config. Expects the resulting positions to cover three distinct shards and
 * two distinct nodes.
 */
public void testPolicyForEmptyCollection() throws IOException, InterruptedException {
  @SuppressWarnings({"rawtypes"})
  Map m = (Map) loadFromResource("testEmptyCollection.json");
  @SuppressWarnings({"unchecked", "rawtypes"})
  Map clusterStateMap = (Map) m.remove("clusterstate");
  @SuppressWarnings({"unchecked", "rawtypes"})
  Map replicaInfoMap = (Map) m.remove("replicaInfo");
  @SuppressWarnings({"unchecked"})
  ClusterState clusterState = ClusterState.load(1, clusterStateMap, ImmutableSet.of("node1:80_", "node2:80_"), CLUSTER_STATE);
  List<String> shards = Arrays.asList("shard1", "shard2", "shard3");
  Assign.AssignRequest assignRequest = new Assign.AssignRequestBuilder()
      .forCollection("test_empty_collection")
      .forShard(shards)
      .assignNrtReplicas(1)
      .build();
  // All delegating wrappers are constructed with a null delegate, so only the methods
  // overridden here are backed by test data.
  DelegatingCloudManager cloudManager = new DelegatingCloudManager(null) {
    @Override
    public ClusterStateProvider getClusterStateProvider() {
      return new DelegatingClusterStateProvider(null) {
        @Override
        public ClusterState getClusterState() {
          return clusterState;
        }

        @Override
        public Set<String> getLiveNodes() {
          return clusterState.getLiveNodes();
        }
      };
    }

    @Override
    public DistribStateManager getDistribStateManager() {
      return new DelegatingDistribStateManager(null) {
        @Override
        public AutoScalingConfig getAutoScalingConfig() {
          return new AutoScalingConfig(new HashMap<>());
        }
      };
    }

    @Override
    public NodeStateProvider getNodeStateProvider() {
      return new DelegatingNodeStateProvider(null) {
        @Override
        public Map<String, Object> getNodeValues(String node, Collection<String> keys) {
          // No per-node values are supplied in this test.
          return Collections.emptyMap();
        }

        @Override
        @SuppressWarnings({"unchecked"})
        public Map<String, Map<String, List<ReplicaInfo>>> getReplicaInfo(String node, Collection<String> keys) {
          // The same replica info map from the resource is returned for every node.
          return replicaInfoMap;
        }
      };
    }
  };
  Assign.AssignStrategyFactory assignStrategyFactory = new Assign.AssignStrategyFactory(cloudManager);
  ClusterState state = cloudManager.getClusterStateProvider().getClusterState();
  DocCollection collection = state.getCollection("test_empty_collection");
  Assign.AssignStrategy assignStrategy = assignStrategyFactory.create(state, collection);
  List<ReplicaPosition> replicaPositions = assignStrategy.assign(cloudManager, assignRequest);
  assertEquals(2, replicaPositions.stream().map((rp) -> rp.node).distinct().count());
  assertEquals(3, replicaPositions.stream().map((rp) -> rp.shard).distinct().count());
}
/**
 * Tests that an empty policy should not persist implicitly added keys to MapWriter
 * <p>
 * The reason behind doing this is to ensure that implicitly added cluster preferences do not ever
 * go to ZooKeeper so that we can decide whether to enable autoscaling policy framework or not.
 *
 * @throws IOException if {@code Policy.writeMap} fails
 * @see Assign#usePolicyFramework(DocCollection, SolrCloudManager)
 */
public void testPolicyMapWriterWithEmptyPreferences() throws IOException {
  @SuppressWarnings({"rawtypes"})
  List<Map> defaultPreferences = Policy.DEFAULT_PREFERENCES
      .stream().map(preference -> preference.getOriginal()).collect(Collectors.toList());

  // Records the name of every top-level key a policy writes; shared by both checks below.
  Set<String> writtenKeys = new HashSet<>();
  MapWriter.EntryWriter keyRecorder = new MapWriter.EntryWriter() {
    @Override
    public MapWriter.EntryWriter put(CharSequence k, Object v) throws IOException {
      writtenKeys.add(k.toString());
      return this;
    }
  };

  // first we create a completely empty policy
  Policy policy = new Policy();
  // sanity check that the default cluster preferences were added implicitly
  assertNotNull(policy.getClusterPreferences());
  // and they were the same as the default preferences
  assertEquals(defaultPreferences.size(), policy.getClusterPreferences().size());
  policy.writeMap(keyRecorder);
  // but those implicitly added cluster preferences are never written by MapWriter
  assertEquals(0, writtenKeys.size());

  // reset
  writtenKeys.clear();

  // now we create a policy that only has cluster preferences which happen to be the same as the default
  // preferences
  policy = new Policy(Utils.makeMap(CLUSTER_PREFERENCES, defaultPreferences));
  // sanity checks
  assertNotNull(policy.getClusterPreferences());
  assertEquals(defaultPreferences.size(), policy.getClusterPreferences().size());
  policy.writeMap(keyRecorder);
  // since the user explicitly added those preferences, they should be written by MapWriter
  assertEquals(1, writtenKeys.size());
  assertTrue(writtenKeys.contains(CLUSTER_PREFERENCES));
}
}