/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.autoscaling.sim;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import com.google.common.util.concurrent.AtomicDouble;
import org.apache.commons.math3.stat.descriptive.SummaryStatistics;
import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.autoscaling.AutoScalingConfig;
import org.apache.solr.client.solrj.cloud.autoscaling.PolicyHelper;
import org.apache.solr.client.solrj.cloud.autoscaling.ReplicaInfo;
import org.apache.solr.client.solrj.cloud.autoscaling.TriggerEventType;
import org.apache.solr.client.solrj.cloud.autoscaling.Variable;
import org.apache.solr.client.solrj.cloud.autoscaling.Variable.Type;
import org.apache.solr.client.solrj.cloud.autoscaling.VersionedData;
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.client.solrj.response.UpdateResponse;
import org.apache.solr.cloud.ActionThrottle;
import org.apache.solr.cloud.CloudUtil;
import org.apache.solr.cloud.Overseer;
import org.apache.solr.cloud.api.collections.AddReplicaCmd;
import org.apache.solr.cloud.api.collections.Assign;
import org.apache.solr.cloud.api.collections.CreateCollectionCmd;
import org.apache.solr.cloud.api.collections.CreateShardCmd;
import org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler;
import org.apache.solr.cloud.api.collections.SplitShardCmd;
import org.apache.solr.cloud.overseer.ClusterStateMutator;
import org.apache.solr.cloud.overseer.CollectionMutator;
import org.apache.solr.cloud.overseer.ZkWriteCommand;
import org.apache.solr.common.SolrDocumentList;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrInputDocument;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.DocRouter;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ReplicaPosition;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.rule.ImplicitSnitch;
import org.apache.solr.common.params.CollectionAdminParams;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.CommonAdminParams;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Utils;
import org.apache.solr.core.SolrInfoBean;
import org.apache.solr.metrics.SolrMetricManager;
import org.apache.solr.update.SolrIndexSplitter;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.MAX_SHARDS_PER_NODE;
import static org.apache.solr.common.cloud.ZkStateReader.NRT_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.PULL_REPLICAS;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICATION_FACTOR;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.TLOG_REPLICAS;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MODIFYCOLLECTION;
import static org.apache.solr.common.params.CommonParams.NAME;
/**
* Simulated {@link ClusterStateProvider}.
* <p>The following behaviors are supported:</p>
* <ul>
* <li>using autoscaling policy for replica placements</li>
* <li>maintaining and up-to-date list of /live_nodes and nodeAdded / nodeLost markers</li>
* <li>running a simulated leader election on collection changes (with throttling), when needed</li>
* <li>maintaining an up-to-date /clusterstate.json (single file format), which also tracks replica states,
* leader election changes, replica property changes, etc. Note: this file is only written,
* but never read by the framework!</li>
* <li>maintaining an up-to-date /clusterprops.json. Note: this file is only written, but never read by the
* framework!</li>
* </ul>
*
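 * <p>A minimal usage sketch. The {@link SimCloudManager} factory and accessor shown here
 * are assumptions borrowed from the simulation test framework, not guaranteed by this class:</p>
 * <pre>{@code
 * SimCloudManager cloudManager = SimCloudManager.createCluster(5, TimeSource.get("simTime:50"));
 * SimClusterStateProvider stateProvider = cloudManager.getSimClusterStateProvider();
 * stateProvider.simSetClusterState(initialState);   // seed from an existing ClusterState
 * String nodeId = stateProvider.simGetRandomNode(); // pick one of the live nodes
 * }</pre>
 *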
* @deprecated to be removed in Solr 9.0 (see SOLR-14656)
*/
public class SimClusterStateProvider implements ClusterStateProvider {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final long DEFAULT_DOC_SIZE_BYTES = 2048;
private static final String BUFFERED_UPDATES = "__buffered_updates__";
private final LiveNodesSet liveNodes;
private final SimDistribStateManager stateManager;
private final SimCloudManager cloudManager;
private final Map<String, List<ReplicaInfo>> nodeReplicaMap = new ConcurrentHashMap<>();
private final Map<String, Map<String, List<ReplicaInfo>>> colShardReplicaMap = new ConcurrentHashMap<>();
private final Map<String, Object> clusterProperties = new ConcurrentHashMap<>();
private final Map<String, Map<String, Object>> collProperties = new ConcurrentHashMap<>();
private final Map<String, Map<String, Map<String, Object>>> sliceProperties = new ConcurrentHashMap<>();
private final ReentrantLock lock = new ReentrantLock();
private final Map<String, Map<String, ActionThrottle>> leaderThrottles = new ConcurrentHashMap<>();
// default map of: operation -> delay
private final Map<String, Long> defaultOpDelays = new ConcurrentHashMap<>();
// per-collection map of: collection -> op -> delay
private final Map<String, Map<String, Long>> opDelays = new ConcurrentHashMap<>();
private volatile int clusterStateVersion = 0;
private volatile String overseerLeader = null;
private volatile Map<String, Object> lastSavedProperties = null;
private final AtomicReference<Map<String, DocCollection>> collectionsStatesRef = new AtomicReference<>();
private final Random bulkUpdateRandom = new Random(0);
private transient boolean closed;
/**
 * The instance needs to be initialized using the <code>sim*</code> methods in order
 * to ensure proper behavior; otherwise it will behave as a cluster with zero replicas.
*/
public SimClusterStateProvider(LiveNodesSet liveNodes, SimCloudManager cloudManager) throws Exception {
this.liveNodes = liveNodes;
for (String nodeId : liveNodes.get()) {
createEphemeralLiveNode(nodeId);
}
this.cloudManager = cloudManager;
this.stateManager = cloudManager.getSimDistribStateManager();
// names are CollectionAction operation names, delays are in ms (simulated time)
defaultOpDelays.put(CollectionParams.CollectionAction.MOVEREPLICA.name(), 5000L);
defaultOpDelays.put(CollectionParams.CollectionAction.DELETEREPLICA.name(), 5000L);
defaultOpDelays.put(CollectionParams.CollectionAction.ADDREPLICA.name(), 500L);
defaultOpDelays.put(CollectionParams.CollectionAction.SPLITSHARD.name(), 5000L);
defaultOpDelays.put(CollectionParams.CollectionAction.CREATESHARD.name(), 5000L);
defaultOpDelays.put(CollectionParams.CollectionAction.DELETESHARD.name(), 5000L);
defaultOpDelays.put(CollectionParams.CollectionAction.CREATE.name(), 500L);
defaultOpDelays.put(CollectionParams.CollectionAction.DELETE.name(), 5000L);
}
// ============== SIMULATOR SETUP METHODS ====================
public void copyFrom(ClusterStateProvider other) throws Exception {
ClusterState state = other.getClusterState();
simSetClusterState(state);
clusterProperties.clear();
clusterProperties.putAll(other.getClusterProperties());
}
/**
   * Initialize from an existing cluster state.
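   * <p>For example, to mirror the state of another provider (sketch):</p>
   * <pre>{@code
   * stateProvider.simSetClusterState(otherProvider.getClusterState());
   * }</pre>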
* @param initialState initial cluster state
*/
public void simSetClusterState(ClusterState initialState) throws Exception {
lock.lockInterruptibly();
try {
collProperties.clear();
colShardReplicaMap.clear();
sliceProperties.clear();
nodeReplicaMap.clear();
liveNodes.clear();
for (String nodeId : stateManager.listData(ZkStateReader.LIVE_NODES_ZKNODE)) {
if (stateManager.hasData(ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeId)) {
stateManager.removeData(ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeId, -1);
}
if (stateManager.hasData(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + nodeId)) {
stateManager.removeData(ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + nodeId, -1);
}
if (stateManager.hasData(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + nodeId)) {
stateManager.removeData(ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + nodeId, -1);
}
}
liveNodes.addAll(initialState.getLiveNodes());
for (String nodeId : liveNodes.get()) {
createEphemeralLiveNode(nodeId);
}
initialState.forEachCollection(dc -> {
collProperties.computeIfAbsent(dc.getName(), name -> new ConcurrentHashMap<>()).putAll(dc.getProperties());
opDelays.computeIfAbsent(dc.getName(), o -> new HashMap<String,Long>()).putAll(defaultOpDelays);
dc.getSlices().forEach(s -> {
sliceProperties.computeIfAbsent(dc.getName(), name -> new ConcurrentHashMap<>())
.computeIfAbsent(s.getName(), o -> new HashMap<String,Object>()).putAll(s.getProperties());
Replica leader = s.getLeader();
s.getReplicas().forEach(r -> {
Map<String, Object> props = new HashMap<>(r.getProperties());
if (leader != null && r.getName().equals(leader.getName())) {
props.put("leader", "true");
}
ReplicaInfo ri = new ReplicaInfo(r.getName(), r.getCoreName(), dc.getName(), s.getName(), r.getType(), r.getNodeName(), props);
if (leader != null && r.getName().equals(leader.getName())) {
ri.getVariables().put("leader", "true");
}
if (liveNodes.get().contains(r.getNodeName())) {
nodeReplicaMap.computeIfAbsent(r.getNodeName(), o -> Collections.synchronizedList(new ArrayList<ReplicaInfo>())).add(ri);
colShardReplicaMap.computeIfAbsent(ri.getCollection(), name -> new ConcurrentHashMap<>())
.computeIfAbsent(ri.getShard(), shard -> new ArrayList<>()).add(ri);
} else {
log.warn("- dropping replica because its node {} is not live: {}", r.getNodeName(), r);
}
});
});
});
collectionsStatesRef.set(null);
} finally {
lock.unlock();
}
}
/**
* Reset the leader election throttles.
*/
public void simResetLeaderThrottles() {
leaderThrottles.clear();
}
private ActionThrottle getThrottle(String collection, String shard) {
return leaderThrottles.computeIfAbsent(collection, coll -> new ConcurrentHashMap<>())
.computeIfAbsent(shard, s -> new ActionThrottle("leader", 5000, cloudManager.getTimeSource()));
}
/**
   * Get a random node id.
* @return one of the live nodes
*/
public String simGetRandomNode() {
return simGetRandomNode(cloudManager.getRandom());
}
/**
   * Get a random node id.
   * @param random the random number generator to use
* @return one of the live nodes
*/
public String simGetRandomNode(Random random) {
if (liveNodes.isEmpty()) {
return null;
}
List<String> nodes = new ArrayList<>(liveNodes.get());
return nodes.get(random.nextInt(nodes.size()));
}
// todo: maybe hook up DistribStateManager /clusterstate.json watchers?
private ReplicaInfo getReplicaInfo(Replica r) {
final List<ReplicaInfo> list = nodeReplicaMap.computeIfAbsent
(r.getNodeName(), o -> Collections.synchronizedList(new ArrayList<ReplicaInfo>()));
synchronized (list) {
for (ReplicaInfo ri : list) {
if (r.getCoreName().equals(ri.getCore()) && r.getName().equals(ri.getName())) {
return ri;
}
}
}
return null;
}
/**
* Add a new node to the cluster.
* @param nodeId unique node id
*/
public void simAddNode(String nodeId) throws Exception {
ensureNotClosed();
if (liveNodes.contains(nodeId)) {
throw new Exception("Node " + nodeId + " already exists");
}
createEphemeralLiveNode(nodeId);
nodeReplicaMap.computeIfAbsent(nodeId, o -> Collections.synchronizedList(new ArrayList<ReplicaInfo>()));
liveNodes.add(nodeId);
updateOverseerLeader();
}
/**
   * Remove a node from the cluster. This is equivalent to a node being lost.
* All replicas that were assigned to this node are marked as DOWN.
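   * <p>A sketch of simulating node loss and later recovery:</p>
   * <pre>{@code
   * String nodeId = stateProvider.simGetRandomNode();
   * stateProvider.simRemoveNode(nodeId);  // replicas on this node are marked DOWN
   * stateProvider.simRestoreNode(nodeId); // replicas recover and become ACTIVE again
   * }</pre>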
* @param nodeId node id
* @return true if a node existed and was removed
*/
public boolean simRemoveNode(String nodeId) throws Exception {
ensureNotClosed();
lock.lockInterruptibly();
try {
Set<String> collections = new HashSet<>();
// mark every replica on that node as down
boolean res = liveNodes.remove(nodeId);
setReplicaStates(nodeId, Replica.State.DOWN, collections);
if (!collections.isEmpty()) {
collectionsStatesRef.set(null);
}
// remove ephemeral nodes
stateManager.getRoot().removeEphemeralChildren(nodeId);
// create a nodeLost marker if needed
AutoScalingConfig cfg = stateManager.getAutoScalingConfig(null);
if (cfg.hasTriggerForEvents(TriggerEventType.NODELOST)) {
String path = ZkStateReader.SOLR_AUTOSCALING_NODE_LOST_PATH + "/" + nodeId;
byte[] json = Utils.toJSON(Collections.singletonMap("timestamp", cloudManager.getTimeSource().getEpochTimeNs()));
stateManager.makePath(path,
json, CreateMode.PERSISTENT, false);
log.debug(" -- created marker: {}", path);
}
updateOverseerLeader();
if (!collections.isEmpty()) {
simRunLeaderElection(collections, true);
}
return res;
} finally {
lock.unlock();
}
}
/**
* Remove all replica information related to dead nodes.
*/
public void simRemoveDeadNodes() throws Exception {
lock.lockInterruptibly();
try {
      Set<String> myNodes = new HashSet<>(nodeReplicaMap.keySet());
      myNodes.removeAll(liveNodes.get());
      // drop the replica records that belonged to nodes which are no longer live
      myNodes.forEach(n -> {
        List<ReplicaInfo> replicas = nodeReplicaMap.remove(n);
        if (replicas != null) {
          synchronized (replicas) {
            replicas.forEach(ri -> colShardReplicaMap
                .getOrDefault(ri.getCollection(), Collections.emptyMap())
                .getOrDefault(ri.getShard(), Collections.emptyList())
                .remove(ri));
          }
        }
      });
      collectionsStatesRef.set(null);
} finally {
lock.unlock();
}
}
private synchronized void updateOverseerLeader() throws Exception {
if (overseerLeader != null && liveNodes.contains(overseerLeader)) {
return;
}
String path = Overseer.OVERSEER_ELECT + "/leader";
if (liveNodes.isEmpty()) {
overseerLeader = null;
// remove it from ZK
try {
cloudManager.getDistribStateManager().removeData(path, -1);
} catch (NoSuchElementException e) {
// ignore
}
return;
}
// pick first
overseerLeader = liveNodes.iterator().next();
log.debug("--- new Overseer leader: {}", overseerLeader);
// record it in ZK
Map<String, Object> id = new HashMap<>();
id.put("id", cloudManager.getTimeSource().getTimeNs() +
"-" + overseerLeader + "-n_0000000000");
try {
cloudManager.getDistribStateManager().makePath(path, Utils.toJSON(id), CreateMode.EPHEMERAL, false);
} catch (Exception e) {
log.warn("Exception saving overseer leader id", e);
}
}
public synchronized String simGetOverseerLeader() {
return overseerLeader;
}
// this method needs to be called under a lock
private void setReplicaStates(String nodeId, Replica.State state, Set<String> changedCollections) {
List<ReplicaInfo> replicas = nodeReplicaMap.computeIfAbsent(nodeId, o -> Collections.synchronizedList(new ArrayList<ReplicaInfo>()));
synchronized (replicas) {
replicas.forEach(r -> {
r.getVariables().put(ZkStateReader.STATE_PROP, state.toString());
if (state != Replica.State.ACTIVE) {
r.getVariables().remove(ZkStateReader.LEADER_PROP);
}
changedCollections.add(r.getCollection());
});
}
}
// this method needs to be called under a lock
private void createEphemeralLiveNode(String nodeId) throws Exception {
DistribStateManager mgr = stateManager.withEphemeralId(nodeId);
mgr.makePath(ZkStateReader.LIVE_NODES_ZKNODE + "/" + nodeId, null, CreateMode.EPHEMERAL, true);
AutoScalingConfig cfg = stateManager.getAutoScalingConfig(null);
if (cfg.hasTriggerForEvents(TriggerEventType.NODEADDED)) {
byte[] json = Utils.toJSON(Collections.singletonMap("timestamp", cloudManager.getTimeSource().getEpochTimeNs()));
String path = ZkStateReader.SOLR_AUTOSCALING_NODE_ADDED_PATH + "/" + nodeId;
log.debug("-- creating marker: {}", path);
mgr.makePath(path, json, CreateMode.EPHEMERAL, true);
}
}
/**
* Restore a previously removed node. This also simulates a short replica recovery state.
* @param nodeId node id to restore
* @return true when this operation restored any replicas, false otherwise (empty node).
*/
public boolean simRestoreNode(String nodeId) throws Exception {
liveNodes.add(nodeId);
createEphemeralLiveNode(nodeId);
Set<String> collections = new HashSet<>();
lock.lockInterruptibly();
try {
setReplicaStates(nodeId, Replica.State.RECOVERING, collections);
} finally {
lock.unlock();
}
cloudManager.getTimeSource().sleep(1000);
lock.lockInterruptibly();
try {
setReplicaStates(nodeId, Replica.State.ACTIVE, collections);
if (!collections.isEmpty()) {
collectionsStatesRef.set(null);
simRunLeaderElection(collections, true);
return true;
} else {
return false;
}
} finally {
lock.unlock();
}
}
/**
   * Add a new replica. Note that if any details of a replica (node, coreNodeName, SolrCore name, etc.)
* are missing they will be filled in using the policy framework.
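   * <p>A sketch of the message, using the same keys this class uses elsewhere
   * (values are hypothetical):</p>
   * <pre>{@code
   * ZkNodeProps message = new ZkNodeProps(
   *     ZkStateReader.COLLECTION_PROP, "myCollection",
   *     ZkStateReader.SHARD_ID_PROP, "shard1",
   *     ZkStateReader.NRT_REPLICAS, "1");
   * NamedList<Object> results = new NamedList<>();
   * stateProvider.simAddReplica(message, results);
   * }</pre>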
* @param message replica details
* @param results result of the operation
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public void simAddReplica(ZkNodeProps message, NamedList results) throws Exception {
if (message.getStr(CommonAdminParams.ASYNC) != null) {
results.add(CoreAdminParams.REQUESTID, message.getStr(CommonAdminParams.ASYNC));
}
ClusterState clusterState = getClusterState();
DocCollection coll = clusterState.getCollection(message.getStr(ZkStateReader.COLLECTION_PROP));
AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
Replica.Type replicaType = Replica.Type.valueOf(message.getStr(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT.name()).toUpperCase(Locale.ROOT));
EnumMap<Replica.Type, Integer> replicaTypesVsCount = new EnumMap<>(Replica.Type.class);
replicaTypesVsCount.put(Replica.Type.NRT, message.getInt(NRT_REPLICAS, replicaType == Replica.Type.NRT ? 1 : 0));
replicaTypesVsCount.put(Replica.Type.TLOG, message.getInt(TLOG_REPLICAS, replicaType == Replica.Type.TLOG ? 1 : 0));
replicaTypesVsCount.put(Replica.Type.PULL, message.getInt(PULL_REPLICAS, replicaType == Replica.Type.PULL ? 1 : 0));
int totalReplicas = 0;
for (Map.Entry<Replica.Type, Integer> entry : replicaTypesVsCount.entrySet()) {
totalReplicas += entry.getValue();
}
if (totalReplicas > 1) {
if (message.getStr(CoreAdminParams.NAME) != null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Cannot create " + totalReplicas + " replicas if 'name' parameter is specified");
}
if (message.getStr(CoreAdminParams.CORE_NODE_NAME) != null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Cannot create " + totalReplicas + " replicas if 'coreNodeName' parameter is specified");
}
}
List<ReplicaPosition> replicaPositions = AddReplicaCmd.buildReplicaPositions(cloudManager, clusterState, coll.getName(), message, replicaTypesVsCount, sessionWrapper);
for (ReplicaPosition replicaPosition : replicaPositions) {
AddReplicaCmd.CreateReplica createReplica = AddReplicaCmd.assignReplicaDetails(cloudManager, clusterState, message, replicaPosition);
if (message.getStr(CoreAdminParams.CORE_NODE_NAME) == null) {
createReplica.coreNodeName = Assign.assignCoreNodeName(stateManager, coll);
}
ReplicaInfo ri = new ReplicaInfo(
createReplica.coreNodeName,
createReplica.coreName,
createReplica.collectionName,
createReplica.sliceName,
createReplica.replicaType,
createReplica.node,
message.getProperties()
);
simAddReplica(ri.getNode(), ri, true);
}
if (sessionWrapper.get() != null) {
sessionWrapper.get().release();
}
results.add("success", "");
}
/**
* Add a replica. Note that all details of the replica must be present here, including
* node, coreNodeName and SolrCore name.
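   * <p>Example (sketch; the constructor argument order follows the usage elsewhere in
   * this class, and the node/core names are hypothetical):</p>
   * <pre>{@code
   * Map<String, Object> props = new HashMap<>();
   * props.put(ZkStateReader.NODE_NAME_PROP, "node1:8983_solr");
   * props.put(ZkStateReader.REPLICA_TYPE, Replica.Type.NRT.toString());
   * ReplicaInfo ri = new ReplicaInfo("core_node10", "myCollection_shard1_replica_n10",
   *     "myCollection", "shard1", Replica.Type.NRT, "node1:8983_solr", props);
   * stateProvider.simAddReplica("node1:8983_solr", ri, true);
   * }</pre>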
* @param nodeId node id where the replica will be added
* @param replicaInfo replica info
* @param runLeaderElection if true then run a leader election after adding the replica.
*/
public void simAddReplica(String nodeId, ReplicaInfo replicaInfo, boolean runLeaderElection) throws Exception {
ensureNotClosed();
lock.lockInterruptibly();
try {
// make sure SolrCore name is unique across cluster and coreNodeName within collection
for (Map.Entry<String, List<ReplicaInfo>> e : nodeReplicaMap.entrySet()) {
final List<ReplicaInfo> replicas = e.getValue();
synchronized (replicas) {
for (ReplicaInfo ri : replicas) {
if (ri.getCore().equals(replicaInfo.getCore())) {
throw new Exception("Duplicate SolrCore name for existing=" + ri + " on node " + e.getKey() + " and new=" + replicaInfo);
}
if (ri.getName().equals(replicaInfo.getName()) && ri.getCollection().equals(replicaInfo.getCollection())) {
throw new Exception("Duplicate coreNode name for existing=" + ri + " on node " + e.getKey() + " and new=" + replicaInfo);
}
}
}
}
if (!liveNodes.contains(nodeId)) {
throw new Exception("Target node " + nodeId + " is not live: " + liveNodes);
}
// verify info
if (replicaInfo.getCore() == null) {
throw new Exception("Missing core: " + replicaInfo);
}
// XXX replica info is not supposed to have this as a variable
replicaInfo.getVariables().remove(ZkStateReader.SHARD_ID_PROP);
if (replicaInfo.getName() == null) {
throw new Exception("Missing name: " + replicaInfo);
}
if (replicaInfo.getNode() == null) {
throw new Exception("Missing node: " + replicaInfo);
}
if (!replicaInfo.getNode().equals(nodeId)) {
throw new Exception("Wrong node (not " + nodeId + "): " + replicaInfo);
}
opDelay(replicaInfo.getCollection(), CollectionParams.CollectionAction.ADDREPLICA.name());
// mark replica as active
replicaInfo.getVariables().put(ZkStateReader.STATE_PROP, Replica.State.ACTIVE.toString());
// add a property expected in Policy calculations, if missing
if (replicaInfo.getVariable(Type.CORE_IDX.metricsAttribute) == null) {
replicaInfo.getVariables().put(Type.CORE_IDX.metricsAttribute, new AtomicLong(SimCloudManager.DEFAULT_IDX_SIZE_BYTES));
replicaInfo.getVariables().put(Variable.coreidxsize,
new AtomicDouble((Double)Type.CORE_IDX.convertVal(SimCloudManager.DEFAULT_IDX_SIZE_BYTES)));
}
nodeReplicaMap.computeIfAbsent(nodeId, o -> Collections.synchronizedList(new ArrayList<ReplicaInfo>())).add(replicaInfo);
colShardReplicaMap.computeIfAbsent(replicaInfo.getCollection(), c -> new ConcurrentHashMap<>())
.computeIfAbsent(replicaInfo.getShard(), s -> new ArrayList<>())
.add(replicaInfo);
Map<String, Object> values = cloudManager.getSimNodeStateProvider().simGetAllNodeValues()
.computeIfAbsent(nodeId, id -> new ConcurrentHashMap<>(SimCloudManager.createNodeValues(id)));
// update the number of cores and freedisk in node values
Number cores = (Number)values.get(ImplicitSnitch.CORES);
if (cores == null) {
cores = 0;
}
cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.CORES, cores.intValue() + 1);
Number disk = (Number)values.get(ImplicitSnitch.DISK);
if (disk == null) {
throw new Exception("Missing '" + ImplicitSnitch.DISK + "' in node metrics for node " + nodeId);
//disk = SimCloudManager.DEFAULT_FREE_DISK;
}
long replicaSize = ((Number)replicaInfo.getVariable(Type.CORE_IDX.metricsAttribute)).longValue();
Number replicaSizeGB = (Number)Type.CORE_IDX.convertVal(replicaSize);
cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.DISK, disk.doubleValue() - replicaSizeGB.doubleValue());
// fake metrics
String registry = SolrMetricManager.getRegistryName(SolrInfoBean.Group.core, replicaInfo.getCollection(),
replicaInfo.getShard(),
Utils.parseMetricsReplicaName(replicaInfo.getCollection(), replicaInfo.getCore()));
cloudManager.getMetricManager().registry(registry).counter("UPDATE./update.requests");
cloudManager.getMetricManager().registry(registry).counter("QUERY./select.requests");
cloudManager.getMetricManager().registerGauge(null, registry,
() -> replicaSize, "", true, Type.CORE_IDX.metricsAttribute);
// at this point nuke our cached DocCollection state
collectionsStatesRef.set(null);
log.trace("-- simAddReplica {}", replicaInfo);
if (runLeaderElection) {
simRunLeaderElection(replicaInfo.getCollection(), replicaInfo.getShard(), true);
}
} finally {
lock.unlock();
}
}
  /**
   * Remove a replica.
   * @param nodeId node id
   * @param collection collection name
   * @param coreNodeName coreNodeName
   */
public void simRemoveReplica(String nodeId, String collection, String coreNodeName) throws Exception {
ensureNotClosed();
lock.lockInterruptibly();
try {
final List<ReplicaInfo> replicas = nodeReplicaMap.computeIfAbsent
(nodeId, o -> Collections.synchronizedList(new ArrayList<ReplicaInfo>()));
synchronized (replicas) {
for (int i = 0; i < replicas.size(); i++) {
if (collection.equals(replicas.get(i).getCollection()) && coreNodeName.equals(replicas.get(i).getName())) {
ReplicaInfo ri = replicas.remove(i);
colShardReplicaMap.computeIfAbsent(ri.getCollection(), c -> new ConcurrentHashMap<>())
.computeIfAbsent(ri.getShard(), s -> new ArrayList<>())
.remove(ri);
collectionsStatesRef.set(null);
opDelay(ri.getCollection(), CollectionParams.CollectionAction.DELETEREPLICA.name());
// update the number of cores in node values, if node is live
if (liveNodes.contains(nodeId)) {
Number cores = (Number)cloudManager.getSimNodeStateProvider().simGetNodeValue(nodeId, ImplicitSnitch.CORES);
if (cores == null || cores.intValue() == 0) {
throw new Exception("Unexpected value of 'cores' (" + cores + ") on node: " + nodeId);
}
cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.CORES, cores.intValue() - 1);
Number disk = (Number)cloudManager.getSimNodeStateProvider().simGetNodeValue(nodeId, ImplicitSnitch.DISK);
if (disk == null || disk.doubleValue() == 0.0) {
throw new Exception("Unexpected value of 'freedisk' (" + disk + ") on node: " + nodeId);
}
if (ri.getVariable(Type.CORE_IDX.metricsAttribute) == null) {
throw new RuntimeException("Missing replica size: " + ri);
}
long replicaSize = ((Number)ri.getVariable(Type.CORE_IDX.metricsAttribute)).longValue();
Number replicaSizeGB = (Number)Type.CORE_IDX.convertVal(replicaSize);
cloudManager.getSimNodeStateProvider().simSetNodeValue(nodeId, ImplicitSnitch.DISK, disk.doubleValue() + replicaSizeGB.doubleValue());
}
log.trace("-- simRemoveReplica {}", ri);
simRunLeaderElection(ri.getCollection(), ri.getShard(), true);
return;
}
}
}
throw new Exception("Replica " + coreNodeName + " not found on node " + nodeId);
} finally {
lock.unlock();
}
}
/**
* Save clusterstate.json to {@link DistribStateManager}.
* @return saved state
*/
private ClusterState saveClusterState(ClusterState state) throws IOException {
ensureNotClosed();
byte[] data = Utils.toJSON(state);
try {
VersionedData oldData = stateManager.getData(ZkStateReader.CLUSTER_STATE);
int version = oldData != null ? oldData.getVersion() : 0;
assert clusterStateVersion == version : "local clusterStateVersion out of sync";
stateManager.setData(ZkStateReader.CLUSTER_STATE, data, version);
log.debug("** saved cluster state version {}", version);
clusterStateVersion++;
} catch (Exception e) {
throw new IOException(e);
}
return state;
}
/**
* Delay an operation by a configured amount.
* @param collection collection name
* @param op operation name.
*/
private void opDelay(String collection, String op) throws InterruptedException {
Map<String, Long> delays = opDelays.get(collection);
if (delays == null || delays.isEmpty() || !delays.containsKey(op)) {
return;
}
cloudManager.getTimeSource().sleep(delays.get(op));
}
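  /**
   * Set per-collection operation delays. New delays are merged with the existing ones;
   * a <code>null</code> value removes the delay for that operation. Example (sketch):
   * <pre>{@code
   * Map<String, Long> delays = new HashMap<>();
   * delays.put(CollectionParams.CollectionAction.SPLITSHARD.name(), 10000L); // 10s simulated time
   * delays.put(CollectionParams.CollectionAction.ADDREPLICA.name(), null);  // remove this delay
   * stateProvider.simSetOpDelays("myCollection", delays);
   * }</pre>
   * @param collection collection name
   * @param delays map of operation name to delay in ms (simulated time); null values remove entries
   */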
public void simSetOpDelays(String collection, Map<String, Long> delays) {
Map<String, Long> currentDelays = opDelays.getOrDefault(collection, Collections.emptyMap());
Map<String, Long> newDelays = new HashMap<>(currentDelays);
delays.forEach((k, v) -> {
if (v == null) {
newDelays.remove(k);
} else {
newDelays.put(k, v);
}
});
opDelays.put(collection, newDelays);
}
/**
* Simulate running a shard leader election. This operation is a no-op if a leader already exists.
   * If a new leader is elected, the cluster state is saved.
* @param collections list of affected collections
* @param saveClusterState if true then save cluster state regardless of changes.
*/
private void simRunLeaderElection(Collection<String> collections, boolean saveClusterState) throws Exception {
ensureNotClosed();
if (saveClusterState) {
lock.lockInterruptibly();
try {
collectionsStatesRef.set(null);
} finally {
lock.unlock();
}
}
ClusterState state = getClusterState();
state.forEachCollection(dc -> {
if (!collections.contains(dc.getName())) {
return;
}
dc.getSlices().forEach(s -> {
if (log.isTraceEnabled()) {
log.trace("-- submit leader election for {} / {}", dc.getName(), s.getName());
}
cloudManager.submit(() -> {
simRunLeaderElection(dc.getName(), s.getName(), saveClusterState);
return true;
});
});
});
}
private void simRunLeaderElection(final String collection, final String slice,
final boolean saveState) throws Exception {
log.trace("Attempting leader election ({} / {})", collection, slice);
final AtomicBoolean stateChanged = new AtomicBoolean(Boolean.FALSE);
lock.lockInterruptibly();
try {
final ClusterState state = getClusterState();
final DocCollection col = state.getCollectionOrNull(collection);
if (null == col) {
log.trace("-- collection does not exist (anymore), skipping leader election ({} / {})",
collection, slice);
return;
}
final Slice s = col.getSlice(slice);
if (null == s) {
log.trace("-- slice does not exist, skipping leader election ({} / {})",
collection, slice);
return;
}
if (s.getState() == Slice.State.INACTIVE) {
if (log.isTraceEnabled()) {
log.trace("-- slice state is {}, skipping leader election ({} / {})",
s.getState(), collection, slice);
}
return;
}
if (s.getReplicas().isEmpty()) {
log.trace("-- no replicas, skipping leader election ({} / {})", collection, slice);
return;
}
final Replica leader = s.getLeader();
if (null != leader && liveNodes.contains(leader.getNodeName())) {
log.trace("-- already has livenode leader, skipping leader election {} / {}",
collection, slice);
return;
}
if (s.getState() != Slice.State.ACTIVE) {
if (log.isTraceEnabled()) {
log.trace("-- slice state is {}, but I will run leader election anyway ({} / {})",
s.getState(), collection, slice);
}
}
log.debug("Running leader election ({} / {})", collection, slice);
ActionThrottle lt = getThrottle(collection, s.getName());
synchronized (lt) {
// collect all active and live
List<ReplicaInfo> active = new ArrayList<>();
AtomicBoolean alreadyHasLeader = new AtomicBoolean(false);
s.getReplicas().forEach(r -> {
// find our ReplicaInfo for this replica
ReplicaInfo ri = getReplicaInfo(r);
if (ri == null) {
throw new IllegalStateException("-- could not find ReplicaInfo for replica " + r);
}
synchronized (ri) {
if (r.isActive(liveNodes.get())) {
if (ri.getVariables().get(ZkStateReader.LEADER_PROP) != null) {
if (log.isTraceEnabled()) {
log.trace("-- found existing leader {} / {}: {}, {}", collection, s.getName(), ri, r);
}
alreadyHasLeader.set(true);
return;
} else {
active.add(ri);
}
} else { // if it's on a node that is not live mark it down
if (log.isTraceEnabled()) {
log.trace("-- replica not active on live nodes: {}, {}", liveNodes.get(), r);
}
if (!liveNodes.contains(r.getNodeName())) {
ri.getVariables().put(ZkStateReader.STATE_PROP, Replica.State.DOWN.toString());
ri.getVariables().remove(ZkStateReader.LEADER_PROP);
stateChanged.set(true);
}
}
}
});
if (alreadyHasLeader.get()) {
if (log.isTraceEnabled()) {
log.trace("-- already has leader {} / {}: {}", collection, s.getName(), s);
}
return;
}
if (active.isEmpty()) {
if (log.isWarnEnabled()) {
log.warn("Can't find any active replicas for {} / {}: {}", collection, s.getName(), s);
}
if (log.isDebugEnabled()) {
log.debug("-- liveNodes: {}", liveNodes.get());
}
return;
}
// pick first active one
ReplicaInfo ri = null;
for (ReplicaInfo a : active) {
if (!a.getType().equals(Replica.Type.PULL)) {
ri = a;
break;
}
}
if (ri == null) {
log.warn("-- can't find any suitable replica type for {} / {}: {}", collection, s.getName(), s);
return;
}
// now mark the leader election throttle
lt.minimumWaitBetweenActions();
lt.markAttemptingAction();
synchronized (ri) {
ri.getVariables().put(ZkStateReader.LEADER_PROP, "true");
}
if (log.isDebugEnabled()) {
log.debug("-- elected new leader for {} / {} (currentVersion={}): {}", collection,
s.getName(), clusterStateVersion, ri);
}
stateChanged.set(true);
}
} finally {
if (stateChanged.get() || saveState) {
collectionsStatesRef.set(null);
}
lock.unlock();
}
}
/**
   * Create a new collection. This operation uses the policy framework for node and replica assignments.
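   * <p>A sketch of a minimal properties map (keys from the Collections API; values hypothetical):</p>
   * <pre>{@code
   * ZkNodeProps props = new ZkNodeProps(
   *     CommonParams.NAME, "myCollection",
   *     ZkStateReader.NUM_SHARDS_PROP, "2",
   *     ZkStateReader.REPLICATION_FACTOR, "2");
   * NamedList<Object> results = new NamedList<>();
   * stateProvider.simCreateCollection(props, results);
   * }</pre>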
* @param props collection details
* @param results results of the operation.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public void simCreateCollection(ZkNodeProps props, NamedList results) throws Exception {
ensureNotClosed();
if (props.getStr(CommonAdminParams.ASYNC) != null) {
results.add(CoreAdminParams.REQUESTID, props.getStr(CommonAdminParams.ASYNC));
}
boolean waitForFinalState = props.getBool(CommonAdminParams.WAIT_FOR_FINAL_STATE, false);
final String collectionName = props.getStr(NAME);
log.debug("-- simCreateCollection {}, currentVersion={}", collectionName, clusterStateVersion);
String router = props.getStr("router.name", DocRouter.DEFAULT_NAME);
// fail fast if parameters are wrong or incomplete
List<String> shardNames = CreateCollectionCmd.populateShardNames(props, router);
int maxShardsPerNode = props.getInt(MAX_SHARDS_PER_NODE, 1);
if (maxShardsPerNode == -1) maxShardsPerNode = Integer.MAX_VALUE;
CreateCollectionCmd.checkReplicaTypes(props);
// always force getting fresh state
lock.lockInterruptibly();
try {
collectionsStatesRef.set(null);
} finally {
lock.unlock();
}
final ClusterState clusterState = getClusterState();
String withCollection = props.getStr(CollectionAdminParams.WITH_COLLECTION);
String wcShard = null;
if (withCollection != null) {
if (!clusterState.hasCollection(withCollection)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The 'withCollection' does not exist: " + withCollection);
} else {
DocCollection collection = clusterState.getCollection(withCollection);
if (collection.getActiveSlices().size() > 1) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "The `withCollection` must have only one shard, found: " + collection.getActiveSlices().size());
}
wcShard = collection.getActiveSlices().iterator().next().getName();
}
}
final String withCollectionShard = wcShard;
ZkWriteCommand cmd = ZkWriteCommand.noop();
lock.lockInterruptibly();
try {
cmd = new ClusterStateMutator(cloudManager).createCollection(clusterState, props);
if (cmd.noop) {
log.warn("Collection {} already exists. exit", collectionName);
log.debug("-- collection: {}, clusterState: {}", collectionName, clusterState);
results.add("success", "no-op");
return;
}
// add collection props
DocCollection coll = cmd.collection;
collProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>()).putAll(coll.getProperties());
colShardReplicaMap.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>());
// add slice props
coll.getSlices().forEach(s -> {
Map<String, Object> sliceProps = sliceProperties.computeIfAbsent(coll.getName(), c -> new ConcurrentHashMap<>())
.computeIfAbsent(s.getName(), slice -> new ConcurrentHashMap<>());
s.getProperties().forEach((k, v) -> {
if (k != null && v != null) {
sliceProps.put(k, v);
}
});
colShardReplicaMap.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
.computeIfAbsent(s.getName(), sh -> new ArrayList<>());
});
// modify the `withCollection` and store this new collection's name with it
if (withCollection != null) {
ZkNodeProps message = new ZkNodeProps(
Overseer.QUEUE_OPERATION, MODIFYCOLLECTION.toString(),
ZkStateReader.COLLECTION_PROP, withCollection,
CollectionAdminParams.COLOCATED_WITH, collectionName);
        cmd = new CollectionMutator(cloudManager).modifyCollection(clusterState, message);
}
// force recreation of collection states
collectionsStatesRef.set(null);
} finally {
lock.unlock();
}
opDelays.computeIfAbsent(collectionName, c -> new HashMap<>()).putAll(defaultOpDelays);
opDelay(collectionName, CollectionParams.CollectionAction.CREATE.name());
AtomicReference<PolicyHelper.SessionWrapper> sessionWrapper = new AtomicReference<>();
List<ReplicaPosition> replicaPositions = CreateCollectionCmd.buildReplicaPositions(cloudManager, getClusterState(), cmd.collection, props,
shardNames, sessionWrapper);
if (sessionWrapper.get() != null) {
sessionWrapper.get().release();
}
// calculate expected number of positions
int numTlogReplicas = props.getInt(TLOG_REPLICAS, 0);
    int numNrtReplicas = props.getInt(NRT_REPLICAS, props.getInt(REPLICATION_FACTOR, numTlogReplicas > 0 ? 0 : 1));
int numPullReplicas = props.getInt(PULL_REPLICAS, 0);
int totalReplicas = shardNames.size() * (numNrtReplicas + numPullReplicas + numTlogReplicas);
if (totalReplicas != replicaPositions.size()) {
throw new RuntimeException("unexpected number of replica positions: expected " + totalReplicas + " but got " + replicaPositions.size());
}
final CountDownLatch finalStateLatch = new CountDownLatch(replicaPositions.size());
AtomicInteger replicaNum = new AtomicInteger(1);
replicaPositions.forEach(pos -> {
if (withCollection != null) {
// check that we have a replica of `withCollection` on this node and if not, create one
DocCollection collection = clusterState.getCollection(withCollection);
List<Replica> replicas = collection.getReplicas(pos.node);
if (replicas == null || replicas.isEmpty()) {
Map<String, Object> replicaProps = new HashMap<>();
replicaProps.put(ZkStateReader.NODE_NAME_PROP, pos.node);
replicaProps.put(ZkStateReader.REPLICA_TYPE, pos.type.toString());
String coreName = String.format(Locale.ROOT, "%s_%s_replica_%s%s", withCollection, withCollectionShard, pos.type.name().substring(0,1).toLowerCase(Locale.ROOT),
collection.getReplicas().size() + 1);
try {
replicaProps.put(ZkStateReader.CORE_NAME_PROP, coreName);
replicaProps.put("SEARCHER.searcher.deletedDocs", new AtomicLong(0));
replicaProps.put("SEARCHER.searcher.numDocs", new AtomicLong(0));
replicaProps.put("SEARCHER.searcher.maxDoc", new AtomicLong(0));
ReplicaInfo ri = new ReplicaInfo("core_node" + Assign.incAndGetId(stateManager, withCollection, 0),
coreName, withCollection, withCollectionShard, pos.type, pos.node, replicaProps);
cloudManager.submit(() -> {
simAddReplica(pos.node, ri, false);
// do not count down the latch here
return true;
});
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
Map<String, Object> replicaProps = new HashMap<>();
replicaProps.put(ZkStateReader.NODE_NAME_PROP, pos.node);
replicaProps.put(ZkStateReader.REPLICA_TYPE, pos.type.toString());
String coreName = String.format(Locale.ROOT, "%s_%s_replica_%s%s", collectionName, pos.shard, pos.type.name().substring(0,1).toLowerCase(Locale.ROOT),
replicaNum.getAndIncrement());
try {
replicaProps.put(ZkStateReader.CORE_NAME_PROP, coreName);
replicaProps.put("SEARCHER.searcher.deletedDocs", new AtomicLong(0));
replicaProps.put("SEARCHER.searcher.numDocs", new AtomicLong(0));
replicaProps.put("SEARCHER.searcher.maxDoc", new AtomicLong(0));
ReplicaInfo ri = new ReplicaInfo("core_node" + Assign.incAndGetId(stateManager, collectionName, 0),
coreName, collectionName, pos.shard, pos.type, pos.node, replicaProps);
cloudManager.submit(() -> {
simAddReplica(pos.node, ri, true);
finalStateLatch.countDown();
return true;
});
} catch (Exception e) {
throw new RuntimeException(e);
}
});
// force recreation of collection states
lock.lockInterruptibly();
try {
collectionsStatesRef.set(null);
} finally {
lock.unlock();
}
//simRunLeaderElection(Collections.singleton(collectionName), true);
if (waitForFinalState) {
boolean finished = finalStateLatch.await(cloudManager.getTimeSource().convertDelay(TimeUnit.SECONDS, 60, TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS);
if (!finished) {
results.add("failure", "Timeout waiting for all replicas to become active.");
return;
}
}
results.add("success", "");
log.debug("-- finished createCollection {}, currentVersion={}", collectionName, clusterStateVersion);
}
/**
   * Delete a collection.
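   * <p>Example (sketch):</p>
   * <pre>{@code
   * stateProvider.simDeleteCollection("myCollection", null, new NamedList<Object>());
   * }</pre>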
* @param collection collection name
* @param async async id
* @param results results of the operation
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public void simDeleteCollection(String collection, String async, NamedList results) throws Exception {
ensureNotClosed();
if (async != null) {
results.add(CoreAdminParams.REQUESTID, async);
}
lock.lockInterruptibly();
try {
collProperties.remove(collection);
sliceProperties.remove(collection);
leaderThrottles.remove(collection);
colShardReplicaMap.remove(collection);
SplitShardCmd.unlockForSplit(cloudManager, collection, null);
opDelay(collection, CollectionParams.CollectionAction.DELETE.name());
opDelays.remove(collection);
nodeReplicaMap.forEach((n, replicas) -> {
synchronized (replicas) {
for (Iterator<ReplicaInfo> it = replicas.iterator(); it.hasNext(); ) {
ReplicaInfo ri = it.next();
if (ri.getCollection().equals(collection)) {
it.remove();
// update the number of cores in node values
Number cores = (Number) cloudManager.getSimNodeStateProvider().simGetNodeValue(n, "cores");
if (cores != null) { // node is still up
if (cores.intValue() == 0) {
throw new RuntimeException("Unexpected value of 'cores' (" + cores + ") on node: " + n);
}
try {
cloudManager.getSimNodeStateProvider().simSetNodeValue(n, "cores", cores.intValue() - 1);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("interrupted");
}
}
}
}
}
});
collectionsStatesRef.set(null);
results.add("success", "");
} catch (Exception e) {
log.warn("Exception", e);
} finally {
lock.unlock();
}
}
/**
* Remove all collections.
*/
public void simDeleteAllCollections() throws Exception {
lock.lockInterruptibly();
try {
collectionsStatesRef.set(null);
collProperties.clear();
sliceProperties.clear();
leaderThrottles.clear();
nodeReplicaMap.clear();
colShardReplicaMap.clear();
cloudManager.getSimNodeStateProvider().simGetAllNodeValues().forEach((n, values) -> {
values.put(ImplicitSnitch.CORES, 0);
values.put(ImplicitSnitch.DISK, SimCloudManager.DEFAULT_FREE_DISK);
values.put(Variable.Type.TOTALDISK.tagName, SimCloudManager.DEFAULT_TOTAL_DISK);
values.put(ImplicitSnitch.SYSLOADAVG, 1.0);
values.put(ImplicitSnitch.HEAPUSAGE, 123450000);
});
cloudManager.getDistribStateManager().removeRecursively(ZkStateReader.COLLECTIONS_ZKNODE, true, false);
} finally {
lock.unlock();
}
}
private static final Set<String> NO_COPY_PROPS = new HashSet<>(Arrays.asList(
ZkStateReader.CORE_NODE_NAME_PROP,
ZkStateReader.NODE_NAME_PROP,
ZkStateReader.BASE_URL_PROP,
ZkStateReader.CORE_NAME_PROP
));
/**
   * Move a replica. This uses an algorithm similar to the <code>moveNormalReplica(...)</code> method of {@link org.apache.solr.cloud.api.collections.MoveReplicaCmd}.
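   * <p>A sketch of the message (keys as read by this method; values hypothetical):</p>
   * <pre>{@code
   * ZkNodeProps message = new ZkNodeProps(
   *     ZkStateReader.COLLECTION_PROP, "myCollection",
   *     ZkStateReader.REPLICA_PROP, "core_node3",
   *     "targetNode", "node2:8983_solr");
   * stateProvider.simMoveReplica(message, new NamedList<Object>());
   * }</pre>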
* @param message operation details
* @param results operation results.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public void simMoveReplica(ZkNodeProps message, NamedList results) throws Exception {
ensureNotClosed();
if (message.getStr(CommonAdminParams.ASYNC) != null) {
results.add(CoreAdminParams.REQUESTID, message.getStr(CommonAdminParams.ASYNC));
}
String collection = message.getStr(COLLECTION_PROP);
String targetNode = message.getStr("targetNode");
ClusterState clusterState = getClusterState();
DocCollection coll = clusterState.getCollection(collection);
if (coll == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection: " + collection + " does not exist");
}
String replicaName = message.getStr(REPLICA_PROP);
Replica replica = coll.getReplica(replicaName);
if (replica == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST,
"Collection: " + collection + " replica: " + replicaName + " does not exist");
}
Slice slice = null;
for (Slice s : coll.getSlices()) {
if (s.getReplicas().contains(replica)) {
slice = s;
}
}
if (slice == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Replica has no 'slice' property! : " + replica);
}
opDelay(collection, CollectionParams.CollectionAction.MOVEREPLICA.name());
ReplicaInfo ri = getReplicaInfo(replica);
if (ri != null) {
if (ri.getVariable(Type.CORE_IDX.tagName) != null) {
// simulate very large replicas - add additional delay of 5s / GB
long sizeInGB = ((Number)ri.getVariable(Type.CORE_IDX.tagName)).longValue();
long opDelay = opDelays.getOrDefault(ri.getCollection(), Collections.emptyMap())
.getOrDefault(CollectionParams.CollectionAction.MOVEREPLICA.name(), defaultOpDelays.get(CollectionParams.CollectionAction.MOVEREPLICA.name()));
opDelay = TimeUnit.MILLISECONDS.toSeconds(opDelay);
if (sizeInGB > opDelay) {
// add 5s per each GB above the threshold
cloudManager.getTimeSource().sleep(TimeUnit.SECONDS.toMillis(sizeInGB - opDelay) * 5);
}
}
}
// TODO: for now simulate moveNormalReplica sequence, where we first add new replica and then delete the old one
String newSolrCoreName = Assign.buildSolrCoreName(stateManager, coll, slice.getName(), replica.getType());
String coreNodeName = Assign.assignCoreNodeName(stateManager, coll);
// copy other properties
Map<String, Object> props = replica.getProperties().entrySet().stream()
.filter(e -> !NO_COPY_PROPS.contains(e.getKey()))
.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue()));
ReplicaInfo newReplica = new ReplicaInfo(coreNodeName, newSolrCoreName, collection, slice.getName(), replica.getType(), targetNode, props);
log.debug("-- new replica: {}", newReplica);
// xxx should run leader election here already?
simAddReplica(targetNode, newReplica, false);
// this will trigger leader election
simRemoveReplica(replica.getNodeName(), collection, replica.getName());
results.add("success", "");
}
/**
   * Create a new shard. This uses an algorithm similar to {@link CreateShardCmd}.
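   * <p>A sketch of the message (values hypothetical; a collection with the implicit router is assumed):</p>
   * <pre>{@code
   * ZkNodeProps message = new ZkNodeProps(
   *     ZkStateReader.COLLECTION_PROP, "myCollection",
   *     ZkStateReader.SHARD_ID_PROP, "shard3",
   *     ZkStateReader.NRT_REPLICAS, "2");
   * stateProvider.simCreateShard(message, new NamedList<Object>());
   * }</pre>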
* @param message operation details
* @param results operation results
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public void simCreateShard(ZkNodeProps message, NamedList results) throws Exception {
ensureNotClosed();
if (message.getStr(CommonAdminParams.ASYNC) != null) {
results.add(CoreAdminParams.REQUESTID, message.getStr(CommonAdminParams.ASYNC));
}
String collectionName = message.getStr(COLLECTION_PROP);
String sliceName = message.getStr(SHARD_ID_PROP);
ClusterState clusterState = getClusterState();
lock.lockInterruptibly();
try {
ZkWriteCommand cmd = new CollectionMutator(cloudManager).createShard(clusterState, message);
if (cmd.noop) {
results.add("success", "no-op");
return;
}
opDelay(collectionName, CollectionParams.CollectionAction.CREATESHARD.name());
// copy shard properties -- our equivalent of creating an empty shard in cluster state
DocCollection collection = cmd.collection;
Slice slice = collection.getSlice(sliceName);
Map<String, Object> props = sliceProperties.computeIfAbsent(collection.getName(), c -> new ConcurrentHashMap<>())
.computeIfAbsent(sliceName, s -> new ConcurrentHashMap<>());
props.clear();
slice.getProperties().entrySet().stream()
.filter(e -> !e.getKey().equals("range"))
.filter(e -> !e.getKey().equals("replicas"))
.forEach(e -> props.put(e.getKey(), e.getValue()));
      // create new replicas
EnumMap<Replica.Type, Integer> replicaTypesVsCount = new EnumMap<>(Replica.Type.class);
int numNrtReplicas = message.getInt(NRT_REPLICAS, message.getInt(REPLICATION_FACTOR, collection.getInt(NRT_REPLICAS, collection.getInt(REPLICATION_FACTOR, 1))));
      int numTlogReplicas = message.getInt(TLOG_REPLICAS, collection.getInt(TLOG_REPLICAS, 0));
      int numPullReplicas = message.getInt(PULL_REPLICAS, collection.getInt(PULL_REPLICAS, 0));
replicaTypesVsCount.put(Replica.Type.NRT, numNrtReplicas);
replicaTypesVsCount.put(Replica.Type.TLOG, numTlogReplicas);
replicaTypesVsCount.put(Replica.Type.PULL, numPullReplicas);
ZkNodeProps addReplicasProps = new ZkNodeProps(
COLLECTION_PROP, collectionName,
SHARD_ID_PROP, sliceName,
ZkStateReader.NRT_REPLICAS, String.valueOf(replicaTypesVsCount.get(Replica.Type.NRT)),
ZkStateReader.TLOG_REPLICAS, String.valueOf(replicaTypesVsCount.get(Replica.Type.TLOG)),
ZkStateReader.PULL_REPLICAS, String.valueOf(replicaTypesVsCount.get(Replica.Type.PULL)),
OverseerCollectionMessageHandler.CREATE_NODE_SET, message.getStr(OverseerCollectionMessageHandler.CREATE_NODE_SET)
);
try {
// this also takes care of leader election
simAddReplica(addReplicasProps, results);
} catch (Exception e) {
throw new RuntimeException(e);
}
collProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>());
results.add("success", "");
} finally {
lock.unlock();
}
}
/**
   * Split a shard. This uses an algorithm similar to {@link SplitShardCmd}, including simulating its
   * quirks, and it leaves the original parent slice in place.
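   * <p>A sketch of the message (keys as read by this method; values hypothetical):</p>
   * <pre>{@code
   * ZkNodeProps message = new ZkNodeProps(
   *     ZkStateReader.COLLECTION_PROP, "myCollection",
   *     ZkStateReader.SHARD_ID_PROP, "shard1",
   *     CommonAdminParams.SPLIT_METHOD, SolrIndexSplitter.SplitMethod.REWRITE.toLower());
   * stateProvider.simSplitShard(message, new NamedList<Object>());
   * }</pre>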
* @param message operation details
* @param results operation results.
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public void simSplitShard(ZkNodeProps message, NamedList results) throws Exception {
ensureNotClosed();
if (message.getStr(CommonAdminParams.ASYNC) != null) {
results.add(CoreAdminParams.REQUESTID, message.getStr(CommonAdminParams.ASYNC));
}
String collectionName = message.getStr(COLLECTION_PROP);
AtomicReference<String> sliceName = new AtomicReference<>();
sliceName.set(message.getStr(SHARD_ID_PROP));
String splitKey = message.getStr("split.key");
String methodStr = message.getStr(CommonAdminParams.SPLIT_METHOD, SolrIndexSplitter.SplitMethod.REWRITE.toLower());
SolrIndexSplitter.SplitMethod splitMethod = SolrIndexSplitter.SplitMethod.get(methodStr);
if (splitMethod == null) {
      throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown value '" + methodStr +
          "' for parameter " + CommonAdminParams.SPLIT_METHOD);
}
ClusterState clusterState = getClusterState();
DocCollection collection = clusterState.getCollection(collectionName);
Slice parentSlice = SplitShardCmd.getParentSlice(clusterState, collectionName, sliceName, splitKey);
Replica leader = parentSlice.getLeader();
// XXX leader election may not have happened yet - should we require it?
if (leader == null) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Shard " + collectionName +
" / " + sliceName.get() + " has no leader and can't be split");
}
SplitShardCmd.checkDiskSpace(collectionName, sliceName.get(), leader, splitMethod, cloudManager);
SplitShardCmd.lockForSplit(cloudManager, collectionName, sliceName.get());
// start counting buffered updates
Map<String, Object> props = sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
.computeIfAbsent(sliceName.get(), ss -> new ConcurrentHashMap<>());
if (props.containsKey(BUFFERED_UPDATES)) {
SplitShardCmd.unlockForSplit(cloudManager, collectionName, sliceName.get());
throw new Exception("--- SOLR-12729: Overlapping splitShard commands for " + collectionName + "/" + sliceName.get());
}
props.put(BUFFERED_UPDATES, new AtomicLong());
List<DocRouter.Range> subRanges = new ArrayList<>();
List<String> subSlices = new ArrayList<>();
List<String> subShardNames = new ArrayList<>();
opDelay(collectionName, CollectionParams.CollectionAction.SPLITSHARD.name());
SplitShardCmd.fillRanges(cloudManager, message, collection, parentSlice, subRanges, subSlices, subShardNames, true);
// add replicas for new subShards
int repFactor = parentSlice.getReplicas().size();
Assign.AssignRequest assignRequest = new Assign.AssignRequestBuilder()
.forCollection(collectionName)
.forShard(subSlices)
.assignNrtReplicas(repFactor)
.assignTlogReplicas(0)
.assignPullReplicas(0)
.onNodes(new ArrayList<>(clusterState.getLiveNodes()))
.build();
Assign.AssignStrategyFactory assignStrategyFactory = new Assign.AssignStrategyFactory(cloudManager);
Assign.AssignStrategy assignStrategy = assignStrategyFactory.create(clusterState, collection);
// reproduce the bug
List<ReplicaPosition> replicaPositions = assignStrategy.assign(cloudManager, assignRequest);
PolicyHelper.SessionWrapper sessionWrapper = PolicyHelper.getLastSessionWrapper(true);
if (sessionWrapper != null) sessionWrapper.release();
// adjust numDocs / deletedDocs / maxDoc
String numDocsStr = String.valueOf(getReplicaInfo(leader).getVariable("SEARCHER.searcher.numDocs", "0"));
long numDocs = Long.parseLong(numDocsStr);
long newNumDocs = numDocs / subSlices.size();
long remainderDocs = numDocs % subSlices.size();
long newIndexSize = SimCloudManager.DEFAULT_IDX_SIZE_BYTES + newNumDocs * DEFAULT_DOC_SIZE_BYTES;
long remainderIndexSize = SimCloudManager.DEFAULT_IDX_SIZE_BYTES + remainderDocs * DEFAULT_DOC_SIZE_BYTES;
String remainderSlice = null;
// add slice props
for (int i = 0; i < subRanges.size(); i++) {
String subSlice = subSlices.get(i);
DocRouter.Range range = subRanges.get(i);
Map<String, Object> sliceProps = sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
.computeIfAbsent(subSlice, ss -> new ConcurrentHashMap<>());
sliceProps.put(Slice.RANGE, range);
sliceProps.put(Slice.PARENT, sliceName.get());
sliceProps.put(ZkStateReader.STATE_PROP, Slice.State.CONSTRUCTION.toString());
sliceProps.put(ZkStateReader.STATE_TIMESTAMP_PROP, String.valueOf(cloudManager.getTimeSource().getEpochTimeNs()));
}
// add replicas
for (ReplicaPosition replicaPosition : replicaPositions) {
String subSliceName = replicaPosition.shard;
String subShardNodeName = replicaPosition.node;
// String solrCoreName = collectionName + "_" + subSliceName + "_replica_n" + (replicaPosition.index);
String solrCoreName = Assign.buildSolrCoreName(collectionName, subSliceName, replicaPosition.type, Assign.incAndGetId(stateManager, collectionName, 0));
Map<String, Object> replicaProps = new HashMap<>();
replicaProps.put(ZkStateReader.SHARD_ID_PROP, replicaPosition.shard);
replicaProps.put(ZkStateReader.NODE_NAME_PROP, replicaPosition.node);
replicaProps.put(ZkStateReader.REPLICA_TYPE, replicaPosition.type.toString());
replicaProps.put(ZkStateReader.BASE_URL_PROP, Utils.getBaseUrlForNodeName(subShardNodeName, "http"));
long replicasNumDocs = newNumDocs;
long replicasIndexSize = newIndexSize;
if (remainderSlice == null) {
remainderSlice = subSliceName;
}
if (remainderSlice.equals(subSliceName)) { // only add to one sub slice
replicasNumDocs += remainderDocs;
replicasIndexSize += remainderIndexSize;
}
replicaProps.put("SEARCHER.searcher.numDocs", new AtomicLong(replicasNumDocs));
replicaProps.put("SEARCHER.searcher.maxDoc", new AtomicLong(replicasNumDocs));
replicaProps.put("SEARCHER.searcher.deletedDocs", new AtomicLong(0));
replicaProps.put(Type.CORE_IDX.metricsAttribute, new AtomicLong(replicasIndexSize));
replicaProps.put(Variable.coreidxsize, new AtomicDouble((Double)Type.CORE_IDX.convertVal(replicasIndexSize)));
ReplicaInfo ri = new ReplicaInfo("core_node" + Assign.incAndGetId(stateManager, collectionName, 0),
solrCoreName, collectionName, replicaPosition.shard, replicaPosition.type, subShardNodeName, replicaProps);
simAddReplica(replicaPosition.node, ri, false);
}
simRunLeaderElection(Collections.singleton(collectionName), true);
// delay it once again to better simulate replica recoveries
//opDelay(collectionName, CollectionParams.CollectionAction.SPLITSHARD.name());
boolean success = false;
try {
CloudUtil.waitForState(cloudManager, collectionName, 30, TimeUnit.SECONDS, (liveNodes, state) -> {
for (String subSlice : subSlices) {
Slice s = state.getSlice(subSlice);
if (s.getLeader() == null) {
log.debug("** no leader in {} / {}", collectionName, s);
return false;
}
if (s.getReplicas().size() < repFactor) {
if (log.isDebugEnabled()) {
log.debug("** expected {} repFactor but there are {} replicas", repFactor, s.getReplicas().size());
}
return false;
}
}
return true;
});
success = true;
} finally {
if (!success) {
Map<String, Object> sProps = sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
.computeIfAbsent(sliceName.get(), s -> new ConcurrentHashMap<>());
sProps.remove(BUFFERED_UPDATES);
SplitShardCmd.unlockForSplit(cloudManager, collectionName, sliceName.get());
}
}
// mark the new slices as active and the old slice as inactive
if (log.isTraceEnabled()) {
log.trace("-- switching slice states after split shard: collection={}, parent={}, subSlices={}", collectionName,
sliceName.get(), subSlices);
}
lock.lockInterruptibly();
try {
Map<String, Object> sProps = sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
.computeIfAbsent(sliceName.get(), s -> new ConcurrentHashMap<>());
sProps.put(ZkStateReader.STATE_PROP, Slice.State.INACTIVE.toString());
sProps.put(ZkStateReader.STATE_TIMESTAMP_PROP, String.valueOf(cloudManager.getTimeSource().getEpochTimeNs()));
AtomicLong bufferedUpdates = (AtomicLong)sProps.remove(BUFFERED_UPDATES);
if (bufferedUpdates.get() > 0) {
// apply buffered updates
long perShard = bufferedUpdates.get() / subSlices.size();
long remainder = bufferedUpdates.get() % subSlices.size();
if (log.isDebugEnabled()) {
log.debug("-- applying {} buffered docs from {} / {}, perShard={}, remainder={}", bufferedUpdates.get(),
collectionName, parentSlice.getName(), perShard, remainder);
}
for (int i = 0; i < subSlices.size(); i++) {
String sub = subSlices.get(i);
long numUpdates = perShard;
if (i == 0) {
numUpdates += remainder;
}
simSetShardValue(collectionName, sub, "SEARCHER.searcher.numDocs", numUpdates, true, false);
simSetShardValue(collectionName, sub, "SEARCHER.searcher.maxDoc", numUpdates, true, false);
}
}
// XXX also mark replicas as down? currently SplitShardCmd doesn't do this
for (String s : subSlices) {
Map<String, Object> sliceProps = sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>())
.computeIfAbsent(s, ss -> new ConcurrentHashMap<>());
sliceProps.put(ZkStateReader.STATE_PROP, Slice.State.ACTIVE.toString());
sliceProps.put(ZkStateReader.STATE_TIMESTAMP_PROP, String.valueOf(cloudManager.getTimeSource().getEpochTimeNs()));
}
// invalidate cached state
collectionsStatesRef.set(null);
} finally {
SplitShardCmd.unlockForSplit(cloudManager, collectionName, sliceName.get());
lock.unlock();
}
results.add("success", "");
}
/**
* Delete a shard. This uses an algorithm similar to {@link org.apache.solr.cloud.api.collections.DeleteShardCmd}
* @param message operation details
* @param results operation results
*/
@SuppressWarnings({"unchecked", "rawtypes"})
public void simDeleteShard(ZkNodeProps message, NamedList results) throws Exception {
ensureNotClosed();
if (message.getStr(CommonAdminParams.ASYNC) != null) {
results.add(CoreAdminParams.REQUESTID, message.getStr(CommonAdminParams.ASYNC));
}
String collectionName = message.getStr(COLLECTION_PROP);
String sliceName = message.getStr(SHARD_ID_PROP);
ClusterState clusterState = getClusterState();
DocCollection collection = clusterState.getCollection(collectionName);
if (collection == null) {
throw new Exception("Collection " + collectionName + " doesn't exist");
}
Slice slice = collection.getSlice(sliceName);
if (slice == null) {
throw new Exception(" Collection " + collectionName + " slice " + sliceName + " doesn't exist.");
}
opDelay(collectionName, CollectionParams.CollectionAction.DELETESHARD.name());
lock.lockInterruptibly();
try {
sliceProperties.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>()).remove(sliceName);
colShardReplicaMap.computeIfAbsent(collectionName, c -> new ConcurrentHashMap<>()).remove(sliceName);
nodeReplicaMap.forEach((n, replicas) -> {
synchronized (replicas) {
Iterator<ReplicaInfo> it = replicas.iterator();
while (it.hasNext()) {
ReplicaInfo ri = it.next();
if (ri.getCollection().equals(collectionName) && ri.getShard().equals(sliceName)) {
it.remove();
}
}
}
});
collectionsStatesRef.set(null);
results.add("success", "");
} catch (Exception e) {
results.add("failure", e.toString());
} finally {
lock.unlock();
}
}
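// A minimal usage sketch (hypothetical collection/shard names; COLLECTION_PROP and
// SHARD_ID_PROP are the statically imported keys used above):
//
//   ZkNodeProps m = new ZkNodeProps(COLLECTION_PROP, "testCollection", SHARD_ID_PROP, "shard1");
//   simDeleteShard(m, new NamedList<>());   // on success, results contains ("success", "")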
@SuppressWarnings({"rawtypes"})
public void createSystemCollection() throws IOException {
try {
synchronized (this) {
if (colShardReplicaMap.containsKey(CollectionAdminParams.SYSTEM_COLL)) {
return;
}
}
String repFactor = String.valueOf(Math.min(3, liveNodes.size()));
ZkNodeProps props = new ZkNodeProps(
NAME, CollectionAdminParams.SYSTEM_COLL,
REPLICATION_FACTOR, repFactor,
OverseerCollectionMessageHandler.NUM_SLICES, "1",
CommonAdminParams.WAIT_FOR_FINAL_STATE, "true");
simCreateCollection(props, new NamedList());
CloudUtil.waitForState(cloudManager, CollectionAdminParams.SYSTEM_COLL, 120, TimeUnit.SECONDS,
CloudUtil.clusterShape(1, Integer.parseInt(repFactor), false, true));
} catch (Exception e) {
throw new IOException(e);
}
}
/**
* Simulate an update by modifying replica metrics.
* The following core metrics are updated:
* <ul>
* <li><code>SEARCHER.searcher.numDocs</code> - increased by added docs, decreased by deleteById and deleteByQuery</li>
* <li><code>SEARCHER.searcher.deletedDocs</code> - increased by deleteById and deleteByQuery by up to <code>numDocs</code></li>
* <li><code>SEARCHER.searcher.maxDoc</code> - always increased by the number of added docs.</li>
* </ul>
* <p>IMPORTANT limitations:</p>
* <ul>
* <li>document replacements are always counted as new docs</li>
* <li>delete by ID always succeeds (unless numDocs == 0)</li>
* <li>deleteByQuery is not supported unless the query is <code>*:*</code></li>
* </ul>
* @param req update request. This request MUST have the <code>collection</code> param set.
* @return {@link UpdateResponse}
* @throws SolrException on errors, such as nonexistent collection or unsupported deleteByQuery
*/
public UpdateResponse simUpdate(UpdateRequest req) throws SolrException, InterruptedException, IOException {
ensureNotClosed();
String collection = req.getCollection();
if (collection == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection not set");
}
ensureSystemCollection(collection);
DocCollection coll = getClusterState().getCollection(collection);
DocRouter router = coll.getRouter();
List<String> deletes = req.getDeleteById();
Map<String, AtomicLong> freediskDeltaPerNode = new HashMap<>();
if (deletes != null && !deletes.isEmpty()) {
Map<String, AtomicLong> deletesPerShard = new HashMap<>();
Map<String, Number> indexSizePerShard = new HashMap<>();
for (String id : deletes) {
Slice s = router.getTargetSlice(id, null, null, req.getParams(), coll);
Replica leader = s.getLeader();
if (leader == null) {
throw new IOException("-- no leader in " + s);
}
cloudManager.getMetricManager().registry(createRegistryName(collection, s.getName(), leader)).counter("UPDATE./update.requests").inc();
ReplicaInfo ri = getReplicaInfo(leader);
Number numDocs = (Number)ri.getVariable("SEARCHER.searcher.numDocs");
if (numDocs == null || numDocs.intValue() <= 0) {
if (log.isDebugEnabled()) {
log.debug("-- attempting to delete nonexistent doc {} from {}", id, s.getLeader());
}
continue;
}
// this is somewhat wrong - we should wait until buffered updates are applied
// but this way the freedisk changes are much easier to track
s.getReplicas().forEach(r ->
freediskDeltaPerNode.computeIfAbsent(r.getNodeName(), node -> new AtomicLong(0))
.addAndGet(DEFAULT_DOC_SIZE_BYTES));
AtomicLong bufferedUpdates = (AtomicLong)sliceProperties.get(collection).get(s.getName()).get(BUFFERED_UPDATES);
if (bufferedUpdates != null) {
if (bufferedUpdates.get() > 0) {
bufferedUpdates.decrementAndGet();
} else {
if (log.isDebugEnabled()) {
log.debug("-- attempting to delete nonexistent buffered doc {} from {}", id, s.getLeader());
}
}
continue;
}
deletesPerShard.computeIfAbsent(s.getName(), slice -> new AtomicLong(0)).incrementAndGet();
Number indexSize = (Number)ri.getVariable(Type.CORE_IDX.metricsAttribute);
if (indexSize != null) {
indexSizePerShard.put(s.getName(), indexSize);
}
}
if (!deletesPerShard.isEmpty()) {
lock.lockInterruptibly();
try {
for (Map.Entry<String, AtomicLong> entry : deletesPerShard.entrySet()) {
String shard = entry.getKey();
simSetShardValue(collection, shard, "SEARCHER.searcher.deletedDocs", entry.getValue().get(), true, false);
simSetShardValue(collection, shard, "SEARCHER.searcher.numDocs", -entry.getValue().get(), true, false);
Number indexSize = indexSizePerShard.get(shard);
long delSize = DEFAULT_DOC_SIZE_BYTES * entry.getValue().get();
if (indexSize != null) {
indexSize = indexSize.longValue() - delSize;
if (indexSize.longValue() < SimCloudManager.DEFAULT_IDX_SIZE_BYTES) {
indexSize = SimCloudManager.DEFAULT_IDX_SIZE_BYTES;
}
simSetShardValue(collection, shard, Type.CORE_IDX.metricsAttribute,
new AtomicLong(indexSize.longValue()), false, false);
simSetShardValue(collection, shard, Variable.coreidxsize,
new AtomicDouble((Double)Type.CORE_IDX.convertVal(indexSize)), false, false);
} else {
throw new Exception("unexpected indexSize for collection=" + collection + ", shard=" + shard + ": " + indexSize);
}
}
} catch (Exception e) {
throw new IOException(e);
} finally {
lock.unlock();
}
}
}
deletes = req.getDeleteQuery();
if (deletes != null && !deletes.isEmpty()) {
for (String q : deletes) {
if (!"*:*".equals(q)) {
throw new UnsupportedOperationException("Only '*:*' query is supported in deleteByQuery");
}
//log.debug("-- req delByQ {}", collection);
for (Slice s : coll.getSlices()) {
Replica leader = s.getLeader();
if (leader == null) {
throw new IOException("-- no leader in " + s);
}
cloudManager.getMetricManager().registry(createRegistryName(collection, s.getName(), leader)).counter("UPDATE./update.requests").inc();
ReplicaInfo ri = getReplicaInfo(leader);
Number numDocs = (Number)ri.getVariable("SEARCHER.searcher.numDocs");
if (numDocs == null || numDocs.intValue() == 0) {
continue;
}
lock.lockInterruptibly();
try {
Number indexSize = (Number)ri.getVariable(Type.CORE_IDX.metricsAttribute);
if (indexSize != null) {
long delta = indexSize.longValue() < SimCloudManager.DEFAULT_IDX_SIZE_BYTES ? 0 :
indexSize.longValue() - SimCloudManager.DEFAULT_IDX_SIZE_BYTES;
s.getReplicas().forEach(r ->
freediskDeltaPerNode.computeIfAbsent(r.getNodeName(), node -> new AtomicLong(0))
.addAndGet(delta));
} else {
throw new RuntimeException("Missing index size in " + ri);
}
simSetShardValue(collection, s.getName(), "SEARCHER.searcher.deletedDocs", new AtomicLong(numDocs.longValue()), false, false);
simSetShardValue(collection, s.getName(), "SEARCHER.searcher.numDocs", new AtomicLong(0), false, false);
simSetShardValue(collection, s.getName(), Type.CORE_IDX.metricsAttribute,
new AtomicLong(SimCloudManager.DEFAULT_IDX_SIZE_BYTES), false, false);
simSetShardValue(collection, s.getName(), Variable.coreidxsize,
new AtomicDouble((Double)Type.CORE_IDX.convertVal(SimCloudManager.DEFAULT_IDX_SIZE_BYTES)), false, false);
} catch (Exception e) {
throw new IOException(e);
} finally {
lock.unlock();
}
}
}
}
List<SolrInputDocument> docs = req.getDocuments();
int docCount = 0;
Iterator<SolrInputDocument> it = null;
if (docs != null) {
docCount = docs.size();
} else {
it = req.getDocIterator();
if (it != null) {
while (it.hasNext()) {
it.next();
docCount++;
}
}
}
if (docCount > 0) {
//log.debug("-- req update {}/{}", collection, docCount);
// this approach to updating counters and metrics drastically increases performance
// of bulk updates, because simSetShardValue is relatively costly
Map<String, AtomicLong> docUpdates = new HashMap<>();
Map<String, Map<String, AtomicLong>> metricUpdates = new HashMap<>();
// XXX don't add more than 2bln docs in one request
boolean modified = false;
lock.lockInterruptibly();
try {
coll = getClusterState().getCollection(collection);
Slice[] slices = coll.getActiveSlicesArr();
if (slices.length == 0) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection without slices");
}
int[] perSlice = new int[slices.length];
if (it != null) {
// BULK UPDATE: simulate random doc assignment without actually calling DocRouter,
// which adds significant overhead
int totalAdded = 0;
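// each active slice owns (range.max - range.min) of the full 32-bit hash ring
// (0x100000000L == 2^32), so give it a share of docCount proportional to its range width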
for (int i = 0; i < slices.length; i++) {
Slice s = slices[i];
long count = (long) docCount * ((long) s.getRange().max - (long) s.getRange().min) / 0x100000000L;
perSlice[i] = (int) count;
totalAdded += perSlice[i];
}
// loss of precision due to integer math
int diff = docCount - totalAdded;
if (diff > 0) {
// spread the remainder more or less equally
int perRemain = diff / slices.length;
int remainder = diff % slices.length;
int remainderSlice = slices.length > 1 ? bulkUpdateRandom.nextInt(slices.length) : 0;
for (int i = 0; i < slices.length; i++) {
perSlice[i] += perRemain;
if (i == remainderSlice) {
perSlice[i] += remainder;
}
}
}
for (int i = 0; i < slices.length; i++) {
Slice s = slices[i];
Replica leader = s.getLeader();
if (leader == null) {
throw new IOException("-- no leader in " + s);
}
metricUpdates.computeIfAbsent(s.getName(), sh -> new HashMap<>())
.computeIfAbsent(leader.getCoreName(), cn -> new AtomicLong())
.addAndGet(perSlice[i]);
modified = true;
long perSliceCount = perSlice[i];
s.getReplicas().forEach(r ->
freediskDeltaPerNode.computeIfAbsent(r.getNodeName(), node -> new AtomicLong(0))
.addAndGet(-perSliceCount * DEFAULT_DOC_SIZE_BYTES));
AtomicLong bufferedUpdates = (AtomicLong)sliceProperties.get(collection).get(s.getName()).get(BUFFERED_UPDATES);
if (bufferedUpdates != null) {
bufferedUpdates.addAndGet(perSlice[i]);
continue;
}
docUpdates.computeIfAbsent(s.getName(), sh -> new AtomicLong())
.addAndGet(perSlice[i]);
}
} else {
// SMALL UPDATE: use exact assignment via DocRouter
for (SolrInputDocument doc : docs) {
String id = (String) doc.getFieldValue("id");
if (id == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Document without id: " + doc);
}
Slice s = coll.getRouter().getTargetSlice(id, doc, null, null, coll);
Replica leader = s.getLeader();
if (leader == null) {
throw new IOException("-- no leader in " + s);
}
metricUpdates.computeIfAbsent(s.getName(), sh -> new HashMap<>())
.computeIfAbsent(leader.getCoreName(), cn -> new AtomicLong())
.incrementAndGet();
modified = true;
s.getReplicas().forEach(r ->
freediskDeltaPerNode.computeIfAbsent(r.getNodeName(), node -> new AtomicLong())
.addAndGet(-DEFAULT_DOC_SIZE_BYTES));
AtomicLong bufferedUpdates = (AtomicLong)sliceProperties.get(collection).get(s.getName()).get(BUFFERED_UPDATES);
if (bufferedUpdates != null) {
bufferedUpdates.incrementAndGet();
continue;
}
docUpdates.computeIfAbsent(s.getName(), sh -> new AtomicLong())
.incrementAndGet();
}
}
if (modified) {
docUpdates.forEach((sh, count) -> {
try {
simSetShardValue(collection, sh, "SEARCHER.searcher.numDocs", count.get(), true, false);
simSetShardValue(collection, sh, "SEARCHER.searcher.maxDoc", count.get(), true, false);
simSetShardValue(collection, sh, "UPDATE./update.requests", count.get(), true, false);
// for each new document increase the size by DEFAULT_DOC_SIZE_BYTES
simSetShardValue(collection, sh, Type.CORE_IDX.metricsAttribute,
DEFAULT_DOC_SIZE_BYTES * count.get(), true, false);
simSetShardValue(collection, sh, Variable.coreidxsize,
Type.CORE_IDX.convertVal(DEFAULT_DOC_SIZE_BYTES * count.get()), true, false);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
metricUpdates.forEach((sh, cores) -> {
cores.forEach((core, count) -> {
String registry = SolrMetricManager.getRegistryName(SolrInfoBean.Group.core, collection, sh,
Utils.parseMetricsReplicaName(collection, core));
cloudManager.getMetricManager().registry(registry).counter("UPDATE./update.requests").inc(count.get());
});
});
}
} finally {
lock.unlock();
}
}
if (!freediskDeltaPerNode.isEmpty()) {
SimNodeStateProvider nodeStateProvider = cloudManager.getSimNodeStateProvider();
freediskDeltaPerNode.forEach((node, delta) -> {
if (delta.get() == 0) {
return;
}
try {
// this method does its own locking to prevent races
nodeStateProvider.simUpdateNodeValue(node, Type.FREEDISK.tagName, val -> {
if (val == null) {
throw new RuntimeException("no freedisk for node " + node);
}
double freedisk = ((Number) val).doubleValue();
double deltaGB = (Double) Type.FREEDISK.convertVal(delta.get());
freedisk += deltaGB;
if (freedisk < 0) {
log.warn("-- freedisk={} - ran out of disk space on node {}", freedisk, node);
freedisk = 0;
}
return freedisk;
});
} catch (Exception e) {
throw new RuntimeException(e);
}
});
}
SolrParams params = req.getParams();
if (params != null && (params.getBool(UpdateParams.OPTIMIZE, false) || params.getBool(UpdateParams.EXPUNGE_DELETES, false))) {
lock.lockInterruptibly();
try {
coll.getSlices().forEach(s -> {
Replica leader = s.getLeader();
ReplicaInfo ri = getReplicaInfo(leader);
Number numDocs = (Number)ri.getVariable("SEARCHER.searcher.numDocs");
if (numDocs == null || numDocs.intValue() == 0) {
numDocs = 0;
}
try {
simSetShardValue(ri.getCollection(), ri.getShard(), "SEARCHER.searcher.maxDoc", numDocs, false, false);
simSetShardValue(ri.getCollection(), ri.getShard(), "SEARCHER.searcher.deletedDocs", 0, false, false);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
} finally {
lock.unlock();
}
}
return new UpdateResponse();
}
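// A minimal usage sketch (hypothetical collection name; per the javadoc above, the
// request MUST carry the "collection" param, and the collection must already exist):
//
//   UpdateRequest ureq = new UpdateRequest();
//   ureq.setParam("collection", "testCollection");
//   SolrInputDocument doc = new SolrInputDocument();
//   doc.addField("id", "id-1");
//   ureq.add(doc);
//   simUpdate(ureq);   // increases numDocs/maxDoc on the target shard and lowers freedisk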
public QueryResponse simQuery(QueryRequest req) throws SolrException, InterruptedException, IOException {
ensureNotClosed();
String collection = req.getCollection();
if (collection == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection not set");
}
ensureSystemCollection(collection);
if (!colShardReplicaMap.containsKey(collection)) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection does not exist");
}
String query = req.getParams().get(CommonParams.Q);
if (query == null || !query.equals("*:*")) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Only '*:*' query is supported");
}
ClusterState clusterState = getClusterState();
DocCollection coll = clusterState.getCollection(collection);
AtomicLong count = new AtomicLong();
for (Slice s : coll.getActiveSlicesArr()) {
Replica r = s.getLeader();
if (r == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, collection + "/" + s.getName() + " has no leader");
}
ReplicaInfo ri = getReplicaInfo(r);
Number numDocs = (Number)ri.getVariable("SEARCHER.searcher.numDocs", 0L);
count.addAndGet(numDocs.longValue());
AtomicLong bufferedUpdates = (AtomicLong)sliceProperties.get(collection).get(s.getName()).get(BUFFERED_UPDATES);
if (bufferedUpdates != null) {
count.addAndGet(bufferedUpdates.get());
}
}
QueryResponse rsp = new QueryResponse();
NamedList<Object> values = new NamedList<>();
values.add("responseHeader", new NamedList<>());
SolrDocumentList docs = new SolrDocumentList();
docs.setNumFound(count.get());
values.add("response", docs);
rsp.setResponse(values);
return rsp;
}
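// A minimal usage sketch (hypothetical collection name; only q=*:* is supported):
//
//   ModifiableSolrParams qp = new ModifiableSolrParams();
//   qp.set(CommonParams.Q, "*:*");
//   qp.set("collection", "testCollection");
//   long numFound = simQuery(new QueryRequest(qp)).getResults().getNumFound();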
private void ensureSystemCollection(String collection) throws InterruptedException, IOException {
if (!simListCollections().contains(collection)) {
if (CollectionAdminParams.SYSTEM_COLL.equals(collection)) {
// auto-create
createSystemCollection();
} else {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection '" + collection + "' doesn't exist");
}
}
}
private static String createRegistryName(String collection, String shard, Replica r) {
return SolrMetricManager.getRegistryName(SolrInfoBean.Group.core, collection, shard,
Utils.parseMetricsReplicaName(collection, r.getCoreName()));
}
/**
* Saves cluster properties to clusterprops.json.
* @return current properties
*/
@SuppressWarnings({"unchecked"})
private synchronized Map<String, Object> saveClusterProperties() throws Exception {
if (lastSavedProperties != null && lastSavedProperties.equals(clusterProperties)) {
return lastSavedProperties;
}
byte[] data = Utils.toJSON(clusterProperties);
VersionedData oldData = stateManager.getData(ZkStateReader.CLUSTER_PROPS);
int version = oldData != null ? oldData.getVersion() : -1;
stateManager.setData(ZkStateReader.CLUSTER_PROPS, data, version);
lastSavedProperties = new ConcurrentHashMap<>((Map)Utils.fromJSON(data));
return lastSavedProperties;
}
/**
* Set all cluster properties. This also updates the clusterprops.json data in
* {@link DistribStateManager}
* @param properties properties to set
*/
public void simSetClusterProperties(Map<String, Object> properties) throws Exception {
lock.lockInterruptibly();
try {
clusterProperties.clear();
if (properties != null) {
this.clusterProperties.putAll(properties);
}
saveClusterProperties();
} finally {
lock.unlock();
}
}
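// A minimal usage sketch (hypothetical property name/value):
//
//   Map<String, Object> props = new HashMap<>();
//   props.put("urlScheme", "http");
//   simSetClusterProperties(props);   // replaces all properties and persists clusterprops.json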
/**
* Set a cluster property. This also updates the clusterprops.json data in
* {@link DistribStateManager}
* @param key property name
* @param value property value
*/
public void simSetClusterProperty(String key, Object value) throws Exception {
lock.lockInterruptibly();
try {
if (value != null) {
clusterProperties.put(key, value);
} else {
clusterProperties.remove(key);
}
saveClusterProperties();
} finally {
lock.unlock();
}
}
/**
* Set collection properties.
* @param coll collection name
* @param properties properties
*/
public void simSetCollectionProperties(String coll, Map<String, Object> properties) throws Exception {
lock.lockInterruptibly();
try {
if (properties == null) {
collProperties.remove(coll);
} else {
Map<String, Object> props = collProperties.computeIfAbsent(coll, c -> new HashMap<>());
props.clear();
props.putAll(properties);
}
collectionsStatesRef.set(null);
} finally {
lock.unlock();
}
}
/**
* Set collection property.
* @param coll collection name
* @param key property name
* @param value property value
*/
public void simSetCollectionProperty(String coll, String key, String value) throws Exception {
lock.lockInterruptibly();
try {
final Map<String, Object> props = collProperties.computeIfAbsent(coll, c -> new HashMap<>());
if (value == null) {
props.remove(key);
} else {
props.put(key, value);
}
collectionsStatesRef.set(null);
} finally {
lock.unlock();
}
}
/**
* Set slice properties.
* @param coll collection name
* @param slice slice name
* @param properties slice properties
*/
public void simSetSliceProperties(String coll, String slice, Map<String, Object> properties) throws Exception {
lock.lockInterruptibly();
try {
final Map<String, Object> sliceProps = sliceProperties.computeIfAbsent
(coll, c -> new HashMap<>()).computeIfAbsent(slice, s -> new HashMap<>());
sliceProps.clear();
if (properties != null) {
sliceProps.putAll(properties);
}
collectionsStatesRef.set(null);
} finally {
lock.unlock();
}
}
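// A minimal usage sketch for the property setters above (hypothetical names/values;
// the "policy" key is the one read back by getPolicyNameByCollection below):
//
//   simSetCollectionProperty("testCollection", "policy", "myPolicy");
//   Map<String, Object> sliceProps = new HashMap<>();
//   sliceProps.put(ZkStateReader.STATE_PROP, Slice.State.ACTIVE.toString());
//   simSetSliceProperties("testCollection", "shard1", sliceProps);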
/**
* Set per-collection value (e.g. a metric). This value will be applied to each replica.
* @param collection collection name
* @param key property name
* @param value property value
*/
public void simSetCollectionValue(String collection, String key, Object value) throws Exception {
simSetCollectionValue(collection, key, value, false, false);
}
/**
* Set per-collection value (e.g. a metric). This value will be applied to each replica.
* @param collection collection name
* @param key property name
* @param value property value
* @param delta if true then treat the numeric value as a delta to add to the existing value
* (or set the value to delta if missing)
* @param divide if the value is a {@link Number} and this param is true, then the value will be evenly
* divided by the number of replicas.
*/
public void simSetCollectionValue(String collection, String key, Object value, boolean delta, boolean divide) throws Exception {
simSetShardValue(collection, null, key, value, delta, divide);
}
/**
* Set per-shard value (e.g. a metric). This value will be applied to each replica in the selected shard.
* @param collection collection name
* @param shard shard name. If null then all shards will be affected.
* @param key property name
* @param value property value
*/
public void simSetShardValue(String collection, String shard, String key, Object value) throws Exception {
simSetShardValue(collection, shard, key, value, false, false);
}
/**
* Set per-shard value (e.g. a metric). This value will be applied to each replica in the selected shard.
* @param collection collection name
* @param shard shard name. If null then all shards will be affected.
* @param key property name
* @param value property value
* @param delta if true then treat the numeric value as a delta to add to the existing value
* (or set the value to delta if missing)
* @param divide if the value is a {@link Number} and this is true, then the value will be evenly
* divided by the number of replicas.
*/
public void simSetShardValue(String collection, String shard, String key, Object value, boolean delta, boolean divide) throws Exception {
final List<ReplicaInfo> infos;
if (shard == null) {
infos = new ArrayList<>();
colShardReplicaMap.computeIfAbsent(collection, c -> new ConcurrentHashMap<>())
.forEach((sh, replicas) -> infos.addAll(replicas));
} else {
infos = colShardReplicaMap.computeIfAbsent(collection, c -> new ConcurrentHashMap<>())
.computeIfAbsent(shard, s -> new ArrayList<>());
}
if (infos.isEmpty()) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Collection " + collection + " doesn't exist (shard=" + shard + ").");
}
if (divide && value != null && (value instanceof Number)) {
if ((value instanceof Long) || (value instanceof Integer)) {
value = ((Number) value).longValue() / infos.size();
} else {
value = ((Number) value).doubleValue() / infos.size();
}
}
for (ReplicaInfo r : infos) {
synchronized (r) {
if (value == null) {
r.getVariables().remove(key);
} else {
if (delta) {
Object prevValue = r.getVariables().get(key);
if (prevValue != null) {
if ((prevValue instanceof Number) && (value instanceof Number)) {
if (((prevValue instanceof Long) || (prevValue instanceof Integer) ||
(prevValue instanceof AtomicLong) || (prevValue instanceof AtomicInteger)) &&
((value instanceof Long) || (value instanceof Integer))) {
long newValue = ((Number)prevValue).longValue() + ((Number)value).longValue();
// minimize object allocations
if (prevValue instanceof AtomicLong) {
((AtomicLong)prevValue).set(newValue);
} else if (prevValue instanceof AtomicInteger) {
((AtomicInteger)prevValue).set(((Number)prevValue).intValue() + ((Number)value).intValue());
} else {
r.getVariables().put(key, newValue);
}
} else {
double newValue = ((Number)prevValue).doubleValue() + ((Number)value).doubleValue();
if (prevValue instanceof AtomicDouble) {
((AtomicDouble)prevValue).set(newValue);
} else {
r.getVariables().put(key, newValue);
}
}
} else {
throw new UnsupportedOperationException("delta cannot be applied to non-numeric values: " + prevValue + " and " + value);
}
} else {
if (value instanceof Integer) {
r.getVariables().put(key, new AtomicInteger((Integer)value));
} else if (value instanceof Long) {
r.getVariables().put(key, new AtomicLong((Long)value));
} else if (value instanceof Double) {
r.getVariables().put(key, new AtomicDouble((Double)value));
} else {
r.getVariables().put(key, value);
}
}
} else {
if (value instanceof Integer) {
r.getVariables().put(key, new AtomicInteger((Integer)value));
} else if (value instanceof Long) {
r.getVariables().put(key, new AtomicLong((Long)value));
} else if (value instanceof Double) {
r.getVariables().put(key, new AtomicDouble((Double)value));
} else {
r.getVariables().put(key, value);
}
}
}
}
}
}
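// A minimal usage sketch (hypothetical collection/shard/metric values):
//
//   // delta=true: add 100 to the existing per-replica value (or set it to 100 if missing)
//   simSetShardValue("testCollection", "shard1", "SEARCHER.searcher.numDocs", 100L, true, false);
//   // divide=true: spread a total of 1000 evenly across the shard's replicas
//   simSetShardValue("testCollection", "shard1", "QUERY./select.requests", 1000L, false, true);
//   // shard == null applies the value to every replica in every shard of the collection
//   simSetCollectionValue("testCollection", "SEARCHER.searcher.maxDoc", 0L, false, false);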
public void simSetReplicaValues(String node, Map<String, Map<String, List<ReplicaInfo>>> source, boolean overwrite) {
List<ReplicaInfo> infos = nodeReplicaMap.get(node);
if (infos == null) {
throw new RuntimeException("Node not present: " + node);
}
// core_node_name is not unique across collections
Map<String, Map<String, ReplicaInfo>> infoMap = new HashMap<>();
infos.forEach(ri -> infoMap.computeIfAbsent(ri.getCollection(), o -> new HashMap<>()).put(ri.getName(), ri));
source.forEach((coll, shards) -> shards.forEach((shard, replicas) -> replicas.forEach(r -> {
ReplicaInfo target = infoMap.getOrDefault(coll, Collections.emptyMap()).get(r.getName());
if (target == null) {
throw new RuntimeException("Unable to find simulated replica of " + r);
}
r.getVariables().forEach((k, v) -> {
if (target.getVariables().containsKey(k)) {
if (overwrite) {
target.getVariables().put(k, v);
}
} else {
target.getVariables().put(k, v);
}
});
})));
}
/**
* Return all replica infos for a node.
* @param node node id
* @return copy of the list of replicas on that node, or empty list if none
*/
public List<ReplicaInfo> simGetReplicaInfos(String node) {
final List<ReplicaInfo> replicas = nodeReplicaMap.computeIfAbsent
(node, o -> Collections.synchronizedList(new ArrayList<ReplicaInfo>()));
// make a defensive copy to avoid ConcurrentModificationException
return Arrays.asList(replicas.toArray(new ReplicaInfo[replicas.size()]));
}
public List<ReplicaInfo> simGetReplicaInfos(String collection, String shard) {
List<ReplicaInfo> replicas = colShardReplicaMap.computeIfAbsent(collection, c -> new ConcurrentHashMap<>())
.computeIfAbsent(shard, s -> new ArrayList<>());
// computeIfAbsent with a non-null mapping function never returns null here, so no null check is needed;
// make a defensive copy to avoid ConcurrentModificationException
return Arrays.asList(replicas.toArray(new ReplicaInfo[replicas.size()]));
}
public ReplicaInfo simGetReplicaInfo(String collection, String coreNode) {
Map<String, List<ReplicaInfo>> shardsReplicas = colShardReplicaMap.computeIfAbsent(collection, c -> new ConcurrentHashMap<>());
for (List<ReplicaInfo> replicas : shardsReplicas.values()) {
for (ReplicaInfo ri : replicas) {
if (ri.getName().equals(coreNode)) {
return ri;
}
}
}
return null;
}
/**
* List collections.
* @return list of existing collections.
*/
public List<String> simListCollections() throws InterruptedException {
return new ArrayList<>(colShardReplicaMap.keySet());
}
public Map<String, Map<String, Object>> simGetCollectionStats() throws IOException, InterruptedException {
lock.lockInterruptibly();
try {
final Map<String, Map<String, Object>> stats = new TreeMap<>();
collectionsStatesRef.set(null);
ClusterState state = getClusterState();
state.forEachCollection(coll -> {
Map<String, Object> perColl = new LinkedHashMap<>();
stats.put(coll.getName(), perColl);
perColl.put("shardsTotal", coll.getSlices().size());
Map<String, AtomicInteger> shardState = new TreeMap<>();
int noLeader = 0;
SummaryStatistics docs = new SummaryStatistics();
SummaryStatistics bytes = new SummaryStatistics();
SummaryStatistics inactiveDocs = new SummaryStatistics();
SummaryStatistics inactiveBytes = new SummaryStatistics();
long deletedDocs = 0;
long bufferedDocs = 0;
int totalReplicas = 0;
int activeReplicas = 0;
for (Slice s : coll.getSlices()) {
shardState.computeIfAbsent(s.getState().toString(), st -> new AtomicInteger())
.incrementAndGet();
totalReplicas += s.getReplicas().size();
if (s.getState() != Slice.State.ACTIVE) {
if (!s.getReplicas().isEmpty()) {
ReplicaInfo ri = getReplicaInfo(s.getReplicas().iterator().next());
if (ri != null) {
Number numDocs = (Number)ri.getVariable("SEARCHER.searcher.numDocs");
Number numBytes = (Number)ri.getVariable(Type.CORE_IDX.metricsAttribute);
if (numDocs != null) {
inactiveDocs.addValue(numDocs.doubleValue());
}
if (numBytes != null) {
inactiveBytes.addValue(numBytes.doubleValue());
}
}
}
continue;
}
AtomicLong buffered = (AtomicLong)sliceProperties.get(coll.getName()).get(s.getName()).get(BUFFERED_UPDATES);
if (buffered != null) {
bufferedDocs += buffered.get();
}
for (Replica r : s.getReplicas()) {
if (r.getState() == Replica.State.ACTIVE) {
activeReplicas++;
}
}
Replica leader = s.getLeader();
if (leader == null) {
noLeader++;
if (!s.getReplicas().isEmpty()) {
leader = s.getReplicas().iterator().next();
}
}
ReplicaInfo ri = null;
if (leader != null) {
ri = getReplicaInfo(leader);
if (ri == null) {
log.warn("Unknown ReplicaInfo for {}", leader);
}
}
if (ri != null) {
Number numDocs = (Number)ri.getVariable("SEARCHER.searcher.numDocs");
Number delDocs = (Number)ri.getVariable("SEARCHER.searcher.deletedDocs");
Number numBytes = (Number)ri.getVariable(Type.CORE_IDX.metricsAttribute);
if (numDocs != null) {
docs.addValue(numDocs.doubleValue());
}
if (delDocs != null) {
deletedDocs += delDocs.longValue();
}
if (numBytes != null) {
bytes.addValue(numBytes.doubleValue());
}
}
}
perColl.put("shardsState", shardState);
perColl.put(" shardsWithoutLeader", noLeader);
perColl.put("totalReplicas", totalReplicas);
perColl.put(" activeReplicas", activeReplicas);
perColl.put(" inactiveReplicas", totalReplicas - activeReplicas);
long totalDocs = (long)docs.getSum() + bufferedDocs;
perColl.put("totalActiveDocs", String.format(Locale.ROOT, "%,d", totalDocs));
perColl.put(" bufferedDocs", String.format(Locale.ROOT, "%,d", bufferedDocs));
perColl.put(" maxActiveSliceDocs", String.format(Locale.ROOT, "%,d", (long)docs.getMax()));
perColl.put(" minActiveSliceDocs", String.format(Locale.ROOT, "%,d", (long)docs.getMin()));
perColl.put(" avgActiveSliceDocs", String.format(Locale.ROOT, "%,.0f", docs.getMean()));
perColl.put("totalInactiveDocs", String.format(Locale.ROOT, "%,d", (long)inactiveDocs.getSum()));
perColl.put(" maxInactiveSliceDocs", String.format(Locale.ROOT, "%,d", (long)inactiveDocs.getMax()));
perColl.put(" minInactiveSliceDocs", String.format(Locale.ROOT, "%,d", (long)inactiveDocs.getMin()));
perColl.put(" avgInactiveSliceDocs", String.format(Locale.ROOT, "%,.0f", inactiveDocs.getMean()));
perColl.put("totalActiveBytes", String.format(Locale.ROOT, "%,d", (long)bytes.getSum()));
perColl.put(" maxActiveSliceBytes", String.format(Locale.ROOT, "%,d", (long)bytes.getMax()));
perColl.put(" minActiveSliceBytes", String.format(Locale.ROOT, "%,d", (long)bytes.getMin()));
perColl.put(" avgActiveSliceBytes", String.format(Locale.ROOT, "%,.0f", bytes.getMean()));
perColl.put("totalInactiveBytes", String.format(Locale.ROOT, "%,d", (long)inactiveBytes.getSum()));
perColl.put(" maxInactiveSliceBytes", String.format(Locale.ROOT, "%,d", (long)inactiveBytes.getMax()));
perColl.put(" minInactiveSliceBytes", String.format(Locale.ROOT, "%,d", (long)inactiveBytes.getMin()));
perColl.put(" avgInactiveSliceBytes", String.format(Locale.ROOT, "%,.0f", inactiveBytes.getMean()));
perColl.put("totalActiveDeletedDocs", String.format(Locale.ROOT, "%,d", deletedDocs));
});
return stats;
} finally {
lock.unlock();
}
}
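// A minimal usage sketch: dump the stats computed above
//
//   simGetCollectionStats().forEach((coll, stats) ->
//       stats.forEach((k, v) -> log.info("{} {} = {}", coll, k, v)));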
// interface methods
@Override
public ClusterState.CollectionRef getState(String collection) {
try {
return getClusterState().getCollectionRef(collection);
} catch (IOException e) {
return null;
}
}
@Override
public Set<String> getLiveNodes() {
return liveNodes.get();
}
@Override
public List<String> resolveAlias(String alias) {
throw new UnsupportedOperationException("resolveAlias not implemented");
}
@Override
public Map<String, String> getAliasProperties(String alias) {
throw new UnsupportedOperationException("getAliasProperties not implemented");
}
@Override
public ClusterState getClusterState() throws IOException {
ensureNotClosed();
try {
lock.lockInterruptibly();
try {
Map<String, DocCollection> states = getCollectionStates();
ClusterState state = new ClusterState(clusterStateVersion, liveNodes.get(), states);
return state;
} finally {
lock.unlock();
}
} catch (InterruptedException e) {
throw new IOException(e);
}
}
// this method uses a simple cache in collectionsStatesRef. Operations that modify
// cluster state should always reset this cache so that the changes become visible
private Map<String, DocCollection> getCollectionStates() throws IOException, InterruptedException {
lock.lockInterruptibly();
try {
Map<String, DocCollection> collectionStates = collectionsStatesRef.get();
if (collectionStates != null) {
return collectionStates;
}
log.debug("** creating new collection states, currentVersion={}", clusterStateVersion);
Map<String, Map<String, Map<String, Replica>>> collMap = new HashMap<>();
nodeReplicaMap.forEach((n, replicas) -> {
synchronized (replicas) {
replicas.forEach(ri -> {
Map<String, Object> props;
synchronized (ri) {
props = new HashMap<>(ri.getVariables());
}
props.put(ZkStateReader.NODE_NAME_PROP, n);
props.put(ZkStateReader.CORE_NAME_PROP, ri.getCore());
props.put(ZkStateReader.REPLICA_TYPE, ri.getType().toString());
props.put(ZkStateReader.STATE_PROP, ri.getState().toString());
Replica r = new Replica(ri.getName(), props, ri.getCollection(), ri.getShard());
collMap.computeIfAbsent(ri.getCollection(), c -> new HashMap<>())
.computeIfAbsent(ri.getShard(), s -> new HashMap<>())
.put(ri.getName(), r);
});
}
});
// add empty slices
sliceProperties.forEach((c, perSliceProps) -> {
perSliceProps.forEach((slice, props) -> {
collMap.computeIfAbsent(c, co -> new ConcurrentHashMap<>()).computeIfAbsent(slice, s -> new ConcurrentHashMap<>());
});
});
// add empty collections
collProperties.keySet().forEach(c -> {
collMap.computeIfAbsent(c, co -> new ConcurrentHashMap<>());
});
Map<String, DocCollection> res = new HashMap<>();
collMap.forEach((coll, shards) -> {
Map<String, Slice> slices = new HashMap<>();
shards.forEach((s, replicas) -> {
Map<String, Object> sliceProps = sliceProperties.computeIfAbsent(coll, c -> new ConcurrentHashMap<>()).computeIfAbsent(s, sl -> new ConcurrentHashMap<>());
Slice slice = new Slice(s, replicas, sliceProps, coll);
slices.put(s, slice);
});
@SuppressWarnings({"unchecked"})
Map<String, Object> collProps = collProperties.computeIfAbsent(coll, c -> new ConcurrentHashMap<>());
@SuppressWarnings({"unchecked"})
Map<String, Object> routerProp = (Map<String, Object>) collProps.getOrDefault(DocCollection.DOC_ROUTER, Collections.singletonMap("name", DocRouter.DEFAULT_NAME));
DocRouter router = DocRouter.getDocRouter((String)routerProp.getOrDefault("name", DocRouter.DEFAULT_NAME));
DocCollection dc = new DocCollection(coll, slices, collProps, router, clusterStateVersion, ZkStateReader.CLUSTER_STATE);
res.put(coll, dc);
});
saveClusterState(new ClusterState(clusterStateVersion, liveNodes.get(), res));
collectionsStatesRef.set(res);
return res;
} finally {
lock.unlock();
}
}
@Override
public Map<String, Object> getClusterProperties() {
return clusterProperties;
}
@Override
public String getPolicyNameByCollection(String coll) {
Map<String, Object> props = collProperties.computeIfAbsent(coll, c -> new HashMap<>());
return (String)props.get("policy");
}
@Override
public void connect() {
}
@Override
public void close() throws IOException {
closed = true;
}
@Override
public boolean isClosed() {
return closed;
}
private void ensureNotClosed() throws IOException {
if (closed) {
throw new IOException("already closed");
}
}
}