import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiPredicate;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.common.ConditionalMapWriter;
import org.apache.solr.common.IteratorWriter;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static;
import static;
import static;
import static;
import static;
import static org.apache.solr.common.ConditionalMapWriter.dedupeKeyPredicate;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;
import static org.apache.solr.common.params.CoreAdminParams.NODE;
import static org.apache.solr.common.util.Utils.handleExp;
import static org.apache.solr.common.util.Utils.time;
import static org.apache.solr.common.util.Utils.timeElapsed;
/**
 * @deprecated to be removed in Solr 9.0 (see SOLR-14656)
 */
public class PolicyHelper {
// Class-wide SLF4J logger, resolved from the enclosing class through MethodHandles.
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
// Key under which getPolicyMapping stores its ThreadLocal in the cloud manager's object cache.
private static final String POLICY_MAPPING_KEY = "PolicyHelper.policyMapping";
private static ThreadLocal<Map<String, String>> getPolicyMapping(SolrCloudManager cloudManager) {
return (ThreadLocal<Map<String, String>>) cloudManager.getObjectCache()
.computeIfAbsent(POLICY_MAPPING_KEY, k -> new ThreadLocal<>());
// Computes a position (target node) for every replica that has to be created for the given shards of
// collName, using a policy session obtained through getSession and cloned for this computation.
//
// Parameters, as used by the visible code:
//   collName              - collection the replicas belong to
//   autoScalingConfig     - when non-null, served by the delegating DistribStateManager instead of the real one
//   cloudManager          - source of cluster state, node state and the shared session
//   optionalPolicyMapping - collection name to policy name overrides, read by getPolicyNameByCollection
//   shardNames            - shards that need replicas
//   nrtReplicas, tlogReplicas, pullReplicas - number of replicas of each type to place per shard
//   nodesList             - optional candidate nodes, each added as a TARGET_NODE hint; may be null
// Returns the list of computed ReplicaPosition objects.
// Throws SolrException(SERVER_ERROR) when no session can be obtained and SolrException(BAD_REQUEST) when
// the suggester yields no operation for a replica.
//
// NOTE(review): several statements of this method appear to be missing or truncated in this view (flagged
// inline); confirm against the complete file before changing any logic.
public static List<ReplicaPosition> getReplicaLocations(String collName, AutoScalingConfig autoScalingConfig,
SolrCloudManager cloudManager,
Map<String, String> optionalPolicyMapping,
List<String> shardNames,
int nrtReplicas,
int tlogReplicas,
int pullReplicas,
List<String> nodesList) {
List<ReplicaPosition> positions = new ArrayList<>();
ThreadLocal<Map<String, String>> policyMapping = getPolicyMapping(cloudManager);
// State provider that lets a per-call mapping override the policy name of a collection.
ClusterStateProvider stateProvider = new DelegatingClusterStateProvider(cloudManager.getClusterStateProvider()) {
public String getPolicyNameByCollection(String coll) {
// NOTE(review): the guard consults the thread-local policyMapping while the value is read from
// optionalPolicyMapping; confirm both always refer to the same map, otherwise optionalPolicyMapping
// may be null or lack the key here.
return policyMapping.get() != null && policyMapping.get().containsKey(coll) ?
optionalPolicyMapping.get(coll) :
// NOTE(review): the else-branch of the conditional expression above is not visible in this view.
// Cloud manager that exposes the state provider above and, when autoScalingConfig is given, that config.
SolrCloudManager delegatingManager = new DelegatingCloudManager(cloudManager) {
public ClusterStateProvider getClusterStateProvider() {
return stateProvider;
public DistribStateManager getDistribStateManager() {
if (autoScalingConfig != null) {
// Constructed with a null delegate; only getAutoScalingConfig is overridden in the visible code.
return new DelegatingDistribStateManager(null) {
public AutoScalingConfig getAutoScalingConfig() {
return autoScalingConfig;
} else {
return super.getDistribStateManager();
SessionWrapper sessionWrapper = null;
try {
try {
// The wrapper is also stored in the thread-local so getLastSessionWrapper can hand it out later.
SESSION_WRAPPPER_REF.set(sessionWrapper = getSession(delegatingManager));
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "unable to get autoscaling policy session", e);
Policy.Session origSession = sessionWrapper.session;
// new session needs to be created to avoid side-effects from per-collection policies
// TODO: refactor so cluster state cache is separate from storage of policies to avoid per cluster vs per collection interactions
// Need a Session that has all previous history of the original session, NOT filtered by what's present or not in Zookeeper
// (as does constructor Session(SolrCloudManager, Policy, Transaction)).
Policy.Session newSession = origSession.cloneToNewSession(delegatingManager);
// Per shard: disk space to require on a target node, derived from the leader replica's reported value.
Map<String, Double> diskSpaceReqd = new HashMap<>();
try {
DocCollection coll = cloudManager.getClusterStateProvider().getCollection(collName);
if (coll != null) {
for (String shardName : shardNames) {
Replica ldr = coll.getLeader(shardName);
// Only a leader hosted on a live node is asked for its metrics.
if (ldr != null && cloudManager.getClusterStateProvider().getLiveNodes().contains(ldr.getNodeName())) {
// NOTE(review): the argument list of getReplicaInfo is truncated in this view.
Map<String, Map<String, List<ReplicaInfo>>> details = cloudManager.getNodeStateProvider().getReplicaInfo(ldr.getNodeName(),
// singletonList(null) is the default so that get(0) yields null when nothing is reported for the shard.
ReplicaInfo replicaInfo = details.getOrDefault(collName, emptyMap()).getOrDefault(shardName, singletonList(null)).get(0);
if (replicaInfo != null) {
Object idxSz = replicaInfo.getVariables().get(FREEDISK.perReplicaValue);
if (idxSz != null) {
// Require 1.5 times the validated value as free disk.
diskSpaceReqd.put(shardName, 1.5 * (Double) Variable.Type.FREEDISK.validate(null, idxSz, false));
} catch (IOException e) {
// Best effort: a failure to read the metrics is only logged and placement continues.
log.warn("Exception while reading disk free metric values for nodes to be used for collection: {}", collName, e);
Map<Replica.Type, Integer> typeVsCount = new EnumMap<>(Replica.Type.class);
typeVsCount.put(Replica.Type.NRT, nrtReplicas);
typeVsCount.put(Replica.Type.TLOG, tlogReplicas);
typeVsCount.put(Replica.Type.PULL, pullReplicas);
for (String shardName : shardNames) {
// Running index of the replica within the shard, shared across replica types.
int idx = 0;
for (Map.Entry<Replica.Type, Integer> e : typeVsCount.entrySet()) {
for (int i = 0; i < e.getValue(); i++) {
Suggester suggester = newSession.getSuggester(ADDREPLICA)
.hint(Hint.REPLICATYPE, e.getKey())
.hint(Hint.COLL_SHARD, new Pair<>(collName, shardName));
if (nodesList != null) {
for (String nodeName : nodesList) {
suggester = suggester.hint(Hint.TARGET_NODE, nodeName);
if (diskSpaceReqd.get(shardName) != null) {
suggester.hint(Hint.MINFREEDISK, diskSpaceReqd.get(shardName));
SolrRequest op = suggester.getSuggestion();
if (op == null) {
// No placement found: log the diagnostics under a unique id and report that id to the caller.
String errorId = "AutoScaling.error.diagnostics." + System.nanoTime();
Policy.Session sessionCopy = suggester.session;
log.error("errorId : {} {}", errorId
, handleExp(log, "", () -> Utils.writeJson(getDiagnostics(sessionCopy), new StringWriter(), true).toString())); // logOk
// NOTE(review): toJSONString is closed only after errorId, so the deep copy concatenated with the trailing
// text is what gets serialized; presumably only the deep copy was meant to be serialized - confirm.
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, " No node can satisfy the rules " +
Utils.toJSONString(Utils.getDeepCopy(newSession.expandedClauses, 4, true) + " More details from logs in node : "
+ Utils.getMDCNode() + ", errorId : " + errorId));
// Continue with the session that already contains the operation just suggested.
newSession = suggester.getSession();
positions.add(new ReplicaPosition(shardName, ++idx, e.getKey(), op.getParams().get(NODE)));
// We're happy with the updated session based on the original one, so let's update what the wrapper would hand
// to the next computation that wants a session.
// NOTE(review): the statements the comments above and below describe (handing the session back to the
// wrapper) are not visible in this view.
} finally {
// We mark the wrapper (and its session) as being available to others.
if (sessionWrapper != null) {
return positions;
// Maximum age of a cached session, expressed in seconds (180 s = 3 minutes).
public static final int SESSION_EXPIRY = 180; // 3 minutes
public static MapWriter getDiagnostics(Policy policy, SolrCloudManager cloudManager) {
Policy.Session session = policy.createSession(cloudManager);
return getDiagnostics(session);
public static MapWriter getDiagnostics(Policy.Session session) {
List<Row> sorted = session.getSortedNodes();
return ew -> {
writeNodes(ew, sorted);
ew.put("liveNodes", session.cloudManager.getClusterStateProvider().getLiveNodes())
.put("violations", session.getViolations())
.put("config", session.getPolicy());
// Writes the "sortedNodes" entry: an iterator with one map per row, holding the row's "node" and "isLive"
// values, one entry per cell and the row's "replicas".
// The predicate p guards the node, isLive and per-cell entries: it combines dedupeKeyPredicate(alreadyWritten)
// (presumably skipping keys that were already written - confirm in ConditionalMapWriter) with a check that
// rejects empty Map values. The "replicas" entry is written without the predicate.
static void writeNodes(MapWriter.EntryWriter ew, List<Row> sorted) throws IOException {
Set<CharSequence> alreadyWritten = new HashSet<>();
BiPredicate<CharSequence, Object> p = dedupeKeyPredicate(alreadyWritten)
.and((s, o) -> !(o instanceof Map) || !((Map) o).isEmpty());
ew.put("sortedNodes", (IteratorWriter) iw -> {
for (Row row : sorted) {
iw.add((MapWriter) ew1 -> {
ew1.put("node", row.node, p).
put("isLive", row.isLive, p);
for (Cell cell : row.getCells())
// NOTE(review): the key argument of this put appears to have been lost in this view.
ew1.put(, cell.val, p);
ew1.put("replicas", row.collectionVsShardVsReplicas);
public static List<Suggester.SuggestionInfo> getSuggestions(AutoScalingConfig autoScalingConf,
SolrCloudManager cloudManager, SolrParams params) {
return getSuggestions(autoScalingConf, cloudManager, 20, 10, params);
public static List<Suggester.SuggestionInfo> getSuggestions(AutoScalingConfig autoScalingConf,
SolrCloudManager cloudManager) {
return getSuggestions(autoScalingConf, cloudManager, 20, 10, null);
// Computes suggestions for the policy of autoScalingConf on a session newly created against cloudManager.
//   max           - stored in ctx.max
//   timeoutInSecs - converted to nanoseconds and added to the current TimeSource time to form ctx.endTime
//   params        - optional; its "type" values select which of the three sections below run, and an
//                   absent or empty selection runs all of them
// Returns ctx.getSuggestions().
// NOTE(review): the type tested by each of the three "types.contains(" conditions is truncated in this
// view, so which type name enables which section cannot be stated from here.
public static List<Suggester.SuggestionInfo> getSuggestions(AutoScalingConfig autoScalingConf,
SolrCloudManager cloudManager, int max, int timeoutInSecs, SolrParams params) {
Policy policy = autoScalingConf.getPolicy();
Suggestion.Ctx ctx = new Suggestion.Ctx();
ctx.endTime = cloudManager.getTimeSource().getTimeNs() + TimeUnit.SECONDS.toNanos(timeoutInSecs);
ctx.max = max;
ctx.session = policy.createSession(cloudManager);
String[] t = params == null ? null : params.getParams("type");
List<String> types = t == null? Collections.emptyList(): Arrays.asList(t);
// Section 1: works on the violations of the session.
if(types.isEmpty() || types.contains( {
List<Violation> violations = ctx.session.getViolations();
for (Violation violation : violations) {
// NOTE(review): only the reset of ctx.violation is visible in this loop; the statement that processes
// each violation appears to be missing from this view.
ctx.violation = null;
// Violations that are still reported by the session and equal one of the initial violations are
// recorded as unresolved.
for (Violation current : ctx.session.getViolations()) {
for (Violation old : violations) {
if (!ctx.needMore()) return ctx.getSuggestions();
if (current.equals(old)) {
//could not be resolved
ctx.suggestions.add(new Suggester.SuggestionInfo(current, null, unresolved_violation));
// Section 2: suggestions for missing replicas; a failure to read cluster state is logged, not rethrown.
if(types.isEmpty() || types.contains( {
if (ctx.needMore()) {
try {
addMissingReplicas(cloudManager, ctx);
} catch (IOException e) {
log.error("Unable to fetch cluster state", e);
// Section 3: optimizations, limited to the remaining budget and to 10 at most.
if(types.isEmpty() || types.contains( {
if (ctx.needMore()) {
suggestOptimizations(ctx, Math.min(ctx.max - ctx.getSuggestions().size(), 10));
return ctx.getSuggestions();
// Walks every slice of every collection in the cluster state and, while ctx still needs suggestions,
// calls the per-type overload for NRT, PULL and TLOG replicas of that slice.
// Throws IOException as declared; presumably raised while reading the cluster state - confirm.
private static void addMissingReplicas(SolrCloudManager cloudManager, Suggestion.Ctx ctx) throws IOException {
cloudManager.getClusterStateProvider().getClusterState().forEachCollection(coll -> coll.forEach(slice -> {
if (!ctx.needMore()) return;
ReplicaCount replicaCount = new ReplicaCount();
slice.forEach(replica -> {
// Only ACTIVE and RECOVERING replicas are considered.
// NOTE(review): the statement guarded by this condition (presumably the update of replicaCount) is not
// visible in this view.
if (replica.getState() == Replica.State.ACTIVE || replica.getState() == Replica.State.RECOVERING) {
addMissingReplicas(replicaCount, coll, slice.getName(), Replica.Type.NRT, ctx);
addMissingReplicas(replicaCount, coll, slice.getName(), Replica.Type.PULL, ctx);
addMissingReplicas(replicaCount, coll, slice.getName(), Replica.Type.TLOG, ctx);
// Adds suggestions to ctx for one replica type of one shard while delta is negative and ctx still needs
// suggestions; stops as soon as ctx.addSuggestion returns null.
// NOTE(review): the computation of delta and the arguments of ctx.addSuggestion are truncated in this
// view, and no statement changing delta inside the loop is visible; confirm against the complete file.
private static void addMissingReplicas(ReplicaCount count, DocCollection coll, String shard, Replica.Type type, Suggestion.Ctx ctx) {
int delta =, 0), type);
for (; ; ) {
if (!ctx.needMore()) return;
if (delta >= 0) break;
SolrRequest suggestion = ctx.addSuggestion(
.hint(Hint.REPLICATYPE, type)
.hint(Hint.COLL_SHARD, new Pair(coll.getName(), shard)),;
if (suggestion == null) return;
private static void suggestOptimizations(Suggestion.Ctx ctx, int count) {
int maxTotalSuggestions = ctx.getSuggestions().size() + count;
List<Row> matrix = ctx.session.matrix;
if (matrix.isEmpty()) return;
for (int i = 0; i < matrix.size(); i++) {
if (ctx.getSuggestions().size() >= maxTotalSuggestions || ctx.hasTimedOut()) break;
Row row = matrix.get(i);
Map<String, Collection<String>> collVsShards = new HashMap<>();
row.forEachReplica(ri -> collVsShards.computeIfAbsent(ri.getCollection(), s -> new HashSet<>()).add(ri.getShard()));
for (Map.Entry<String, Collection<String>> e : collVsShards.entrySet()) {
e.setValue(FreeDiskVariable.getSortedShards(Collections.singletonList(row), e.getValue(), e.getKey()));
for (Map.Entry<String, Collection<String>> e : collVsShards.entrySet()) {
if (!ctx.needMore()) return;
if (ctx.getSuggestions().size() >= maxTotalSuggestions || ctx.hasTimedOut()) break;
for (String shard : e.getValue()) {
Suggester suggester = ctx.session.getSuggester(MOVEREPLICA)
.hint(Hint.COLL_SHARD, new Pair<>(e.getKey(), shard))
.hint(Hint.SRC_NODE, row.node);
ctx.addSuggestion(suggester, Suggestion.Type.improvement);
if (ctx.getSuggestions().size() >= maxTotalSuggestions) break;
* Use this to dump the state of a system and to generate a testcase
public static void logState(SolrCloudManager cloudManager, Suggester suggester) {
if (log.isTraceEnabled()) {
try {
if (log.isTraceEnabled()) {
log.trace("LOGSTATE: {}",
Utils.writeJson(loggingInfo(cloudManager.getDistribStateManager().getAutoScalingConfig().getPolicy(), cloudManager, suggester),
new StringWriter(), true));
} catch (Exception e) {
throw new RuntimeException(e);
// Builds the writer used by logState: a "diagnostics" entry and, when suggester is non-null, a
// "suggester" entry.
// NOTE(review): the getDiagnostics call is truncated in this view.
static MapWriter loggingInfo(Policy policy, SolrCloudManager cloudManager, Suggester suggester) {
return ew -> {
ew.put("diagnostics", getDiagnostics(policy,
if (suggester != null) {
ew.put("suggester", suggester);
// States of a shared session wrapper. The constants themselves are not visible in this view; other code
// in this file refers to Status.COMPUTING and Status.EXECUTING.
public enum Status {
/** A command is actively using and modifying the session to compute placements. */
/** A command is not done yet processing its changes but no longer updates or even uses the session. */
/**
 * This class stores sessions for sharing purposes. If a process requires a session to
 * compute operations:
 * <ol>
 * <li>see if there is an available non-expired session in the cache,</li>
 * <li>if yes, borrow it,</li>
 * <li>if no, create a new one and borrow it,</li>
 * <li>after computing (update) operations are done, {@link #returnSession(SessionWrapper)} it back to the cache so it's
 * again available for borrowing,</li>
 * <li>after all borrowers are done computing then executing with the session, {@link #release(SessionWrapper)} it,
 * which removes it from the cache.</li>
 * </ol>
 */
static class SessionRef {
/**
 * Lock protecting access to {@link #sessionWrapperSet} and to {@link #creationsInProgress}.
 */
private final Object lockObj = new Object();
/**
 * Sessions currently in use in {@link Status#COMPUTING} or {@link Status#EXECUTING} states. As soon as all
 * uses of a session are over, that session is removed from this set. Sessions not actively in use are NOT kept around.
 * <p>Access should only be done under the protection of {@link #lockObj}.</p>
 */
private Set<SessionWrapper> sessionWrapperSet = Collections.newSetFromMap(new IdentityHashMap<>());
/**
 * Number of sessions currently being created but not yet present in {@link #sessionWrapperSet}.
 * <p>Access should only be done under the protection of {@link #lockObj}.</p>
 */
private int creationsInProgress = 0;
// No-argument constructor; no statements of its body are visible in this view.
public SessionRef() {
// used only by tests
boolean isEmpty() {
synchronized (lockObj) {
return sessionWrapperSet.isEmpty();
/**
 * All operations suggested by the current session object
 * are complete. Do not even cache anything.
 */
// Removes sessionWrapper from sessionWrapperSet under lockObj, then logs: a warning when the wrapper was
// not in the set, otherwise (at debug level) the time the session lived.
// NOTE(review): the final debug statement is truncated in this view.
private void release(SessionWrapper sessionWrapper) {
boolean present;
synchronized (lockObj) {
// Set.remove reports whether the wrapper was actually tracked.
present = sessionWrapperSet.remove(sessionWrapper);
if (!present) {
log.warn("released session {} not found in session set", sessionWrapper.getCreateTime());
} else {
if (log.isDebugEnabled()) {
TimeSource timeSource = sessionWrapper.session.cloudManager.getTimeSource();
log.debug("final release, session {} lived a total of {}ms, ", sessionWrapper.getCreateTime(),
timeElapsed(timeSource, TimeUnit.MILLISECONDS.convert(sessionWrapper.getCreateTime(),
/**
 * Computing is over for this session and it may contain a new session with new state.
 * The session can be used by others while the caller is performing operations.
 */
// Switches sessionWrapper to Status.EXECUTING and checks, under lockObj, whether it is a tracked session;
// a tracked session is logged at debug level, an unknown one is logged as a warning.
// NOTE(review): the comments below describe waking up a single waiting thread, but no such call is
// visible in this view.
private void returnSession(SessionWrapper sessionWrapper) {
boolean present;
synchronized (lockObj) {
sessionWrapper.status = Status.EXECUTING;
present = sessionWrapperSet.contains(sessionWrapper);
// wake up single thread waiting for a session return (ok if not woken up, wait is short)
// Important to wake up a single one, otherwise of multiple waiting threads, all but one will immediately create new sessions
// Logging
if (present) {
if (log.isDebugEnabled()) {
log.debug("returnSession {}", sessionWrapper.getCreateTime());
} else {
log.warn("returning unknown session {} ", sessionWrapper.getCreateTime());
* <p>Method returning an available session that can be used for {@link Status#COMPUTING}, either from the
* {@link #sessionWrapperSet} cache or by creating a new one. The status of the returned session is set to {@link Status#COMPUTING}.</p>
* Some waiting is done in two cases:
* <ul>
* <li>A candidate session is present in {@link #sessionWrapperSet} but is still {@link Status#COMPUTING}, a random wait
* is observed to see if the session gets freed to save a session creation and allow session reuse,</li>
* <li>It is necessary to create a new session but there are already sessions in the process of being created, a
* random wait is observed (if no waiting already occurred waiting for a session to become free) before creation
* takes place, just in case one of the created sessions got used then {@link #returnSession(SessionWrapper)} in the meantime.</li>
* </ul>
* The random wait prevents the "thundering herd" effect when all threads needing a session at the same time create a new
* one even though some differentiated waits could have led to better reuse and less session creations.
* @param allowWait usually <code>true</code> except in tests that know there's no point in waiting because nothing
* will happen...
public SessionWrapper get(SolrCloudManager cloudManager, boolean allowWait) throws IOException, InterruptedException {
TimeSource timeSource = cloudManager.getTimeSource();
long oldestUpdateTimeNs = TimeUnit.SECONDS.convert(timeSource.getTimeNs(), TimeUnit.NANOSECONDS) - SESSION_EXPIRY;
int zkVersion = cloudManager.getDistribStateManager().getAutoScalingConfig().getZkVersion();
synchronized (lockObj) {
SessionWrapper sw = getAvailableSession(zkVersion, oldestUpdateTimeNs);
// Best case scenario: an available session
if (sw != null) {
if (log.isDebugEnabled()) {
log.debug("reusing session {}", sw.getCreateTime());
return sw;
// Wait for a while before deciding what to do if waiting could help...
if ((creationsInProgress != 0 || hasCandidateSession(zkVersion, oldestUpdateTimeNs)) && allowWait) {
// Either an existing session might be returned and become usable while we wait, or a session in the process of being
// created might finish creation, be used then returned and become usable. So we wait.
// wait 1 to 10 secs. Random to help spread wakeups.
long waitForMs = (long) (Math.random() * 9 * 1000) + 1000;
if (log.isDebugEnabled()) {
log.debug("No sessions are available, all busy COMPUTING (or {} creations in progress). starting wait of {}ms",
creationsInProgress, waitForMs);
long waitStart = time(timeSource, MILLISECONDS);
try {
} catch (InterruptedException e) {
if (log.isDebugEnabled()) {
log.debug("out of waiting. wait of {}ms, actual time elapsed {}ms", waitForMs, timeElapsed(timeSource, waitStart, MILLISECONDS));
// We've waited, now we can either reuse immediately an available session, or immediately create a new one
sw = getAvailableSession(zkVersion, oldestUpdateTimeNs);
// Second best case scenario: an available session
if (sw != null) {
if (log.isDebugEnabled()) {
log.debug("reusing session {} after wait", sw.getCreateTime());
return sw;
// We're going to create a new Session OUTSIDE of the critical section because session creation can take quite some time
SessionWrapper newSessionWrapper = null;
try {
if (log.isDebugEnabled()) {
log.debug("Creating a new session");
Policy.Session session = cloudManager.getDistribStateManager().getAutoScalingConfig().getPolicy().createSession(cloudManager);
newSessionWrapper = new SessionWrapper(session, this);
if (log.isDebugEnabled()) {
log.debug("New session created, {}", newSessionWrapper.getCreateTime());
return newSessionWrapper;
} finally {
synchronized (lockObj) {
if (newSessionWrapper != null) {
// Session created successfully
* Returns an available session from the cache (the best one once cache strategies are defined), or null if no session
* from the cache is available (i.e. all are still COMPUTING, are too old, wrong zk version or the cache is empty).<p>
* This method must be called while holding the monitor on {@link #lockObj}.<p>
* The method updates the session status to computing.
private SessionWrapper getAvailableSession(int zkVersion, long oldestUpdateTimeNs) {
for (SessionWrapper sw : sessionWrapperSet) {
if (sw.status == Status.EXECUTING && sw.getLastUpdateTime() >= oldestUpdateTimeNs && sw.zkVersion == zkVersion) {
sw.status = Status.COMPUTING;
return sw;
return null;
* Returns true if there's a session in the cache that could be returned (if it was free). This is required to
* know if there's any point in waiting or if a new session should better be created right away.
private boolean hasCandidateSession(int zkVersion, long oldestUpdateTimeNs) {
for (SessionWrapper sw : sessionWrapperSet) {
if (sw.getLastUpdateTime() >= oldestUpdateTimeNs && sw.zkVersion == zkVersion) {
return true;
return false;
* How to get a shared Policy Session
* 1) call {@link #getSession(SolrCloudManager)}
* 2) compute all suggestions
* 3) call {@link SessionWrapper#returnSession(Policy.Session)}
* 4) perform all suggestions
* 5) call {@link SessionWrapper#release()}
public static SessionWrapper getSession(SolrCloudManager cloudManager) throws IOException, InterruptedException {
return getSession(cloudManager, true);
static SessionWrapper getSession(SolrCloudManager cloudManager, boolean allowWait) throws IOException, InterruptedException {
SessionRef sessionRef = (SessionRef) cloudManager.getObjectCache().computeIfAbsent(SessionRef.class.getName(), s -> new SessionRef());
return sessionRef.get(cloudManager, allowWait);
* Use this to get the last used session wrapper in this thread
* @param clear whether to unset the threadlocal or not
public static SessionWrapper getLastSessionWrapper(boolean clear) {
SessionWrapper wrapper = SESSION_WRAPPPER_REF.get();
if (clear) SESSION_WRAPPPER_REF.remove();
return wrapper;
// Per-thread holder of the SessionWrapper most recently obtained by getReplicaLocations; read (and
// optionally cleared) through getLastSessionWrapper.
static ThreadLocal<SessionWrapper> SESSION_WRAPPPER_REF = new ThreadLocal<>();
public static class SessionWrapper {
// TimeSource time, in nanoseconds, read when this wrapper was constructed.
private final long createTime;
// Time of the last update(Policy.Session) call; equals createTime until the first update.
private long lastUpdateTime;
// The session currently held by this wrapper.
private Policy.Session session;
// Current state; set to COMPUTING by the constructor and changed by SessionRef.
public Status status;
// The SessionRef passed to the constructor.
private final SessionRef ref;
/**
 * Number of commands currently using the session in {@link Status#EXECUTING}. There is one <b>additional</b> command
 * using the session and updating it if {@link #status} is {@link Status#COMPUTING}.
 */
private final AtomicInteger refCount = new AtomicInteger();
// zkVersion of the session's policy, captured by the constructor; SessionRef compares it with the
// zkVersion of the current autoscaling config when looking for a reusable session.
public final long zkVersion;
* Nanoseconds (since/to some arbitrary time) when the session got created. Also used in logs (only in logs!) to identify the session.
public long getCreateTime() {
return createTime;
public long getLastUpdateTime() {
return lastUpdateTime;
public SessionWrapper(Policy.Session session, SessionRef ref) {
createTime = session.cloudManager.getTimeSource().getTimeNs();
lastUpdateTime = createTime;
this.session = session;
this.status = Status.COMPUTING; // Created for being used, so COMPUTING right away
this.ref = ref;
this.zkVersion = session.getPolicy().getZkVersion();
public Policy.Session get() {
return session;
public void update(Policy.Session session) {
// JMM multithreaded access issue on lastUpdateTime.
this.lastUpdateTime = session.cloudManager.getTimeSource().getTimeNs();
this.session = session;
public int getRefCount() {
return refCount.get();
/**
 * Return this wrapper for later use and update the session with the latest state.
 * Ensure that this is done after computing the suggestions.
 */
// Logs a warning when the wrapper is handed back while not in Status.COMPUTING.
// NOTE(review): no statement using the session argument or handing the wrapper back to its SessionRef
// is visible in this view; confirm against the complete file.
public void returnSession(Policy.Session session) {
if (this.status != Status.COMPUTING) {
log.warn("returning session {} not in state COMPUTING", this.getCreateTime());
/**
 * Return this wrapper for later use without updating the internal Session, for cases where it's easier to update it separately.
 */
// NOTE(review): the body of this overload is not visible in this view.
public void returnSession() {
//all ops are executed now it can be destroyed
public void release() {
if (refCount.decrementAndGet() <= 0) {