blob: 593caaea24e1cd90ca6b79b6d7b5ebb3ded73e34 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.client.solrj.cloud.autoscaling;
import java.io.IOException;
import java.io.StringWriter;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiPredicate;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.cloud.DistribStateManager;
import org.apache.solr.client.solrj.cloud.SolrCloudManager;
import org.apache.solr.client.solrj.cloud.autoscaling.Suggester.Hint;
import org.apache.solr.client.solrj.impl.ClusterStateProvider;
import org.apache.solr.common.ConditionalMapWriter;
import org.apache.solr.common.IteratorWriter;
import org.apache.solr.common.MapWriter;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.ReplicaPosition;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.TimeSource;
import org.apache.solr.common.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.Type.improvement;
import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.Type.repair;
import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.Type.unresolved_violation;
import static org.apache.solr.client.solrj.cloud.autoscaling.Suggestion.Type.violation;
import static org.apache.solr.client.solrj.cloud.autoscaling.Variable.Type.FREEDISK;
import static org.apache.solr.common.ConditionalMapWriter.dedupeKeyPredicate;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.ADDREPLICA;
import static org.apache.solr.common.params.CollectionParams.CollectionAction.MOVEREPLICA;
import static org.apache.solr.common.params.CoreAdminParams.NODE;
import static org.apache.solr.common.util.Utils.handleExp;
import static org.apache.solr.common.util.Utils.time;
import static org.apache.solr.common.util.Utils.timeElapsed;
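/**
* Static helpers built on the autoscaling policy framework: computing replica placements for a collection,
* producing diagnostics and suggestions, and sharing {@link Policy.Session} instances between computations.
*
* @deprecated to be removed in Solr 9.0 (see SOLR-14656)
*/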
public class PolicyHelper {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String POLICY_MAPPING_KEY = "PolicyHelper.policyMapping";
/**
 * Looks up the thread-local holder of the per-call policy mapping in the object cache of the given
 * cloud manager, creating and caching an empty holder if none is registered yet.
 */
@SuppressWarnings({"unchecked"})
private static ThreadLocal<Map<String, String>> getPolicyMapping(SolrCloudManager cloudManager) {
  Object cached = cloudManager.getObjectCache()
      .computeIfAbsent(POLICY_MAPPING_KEY, key -> new ThreadLocal<>());
  return (ThreadLocal<Map<String, String>>) cached;
}
/**
 * Computes a node for every replica that should be added to the given shards of a collection.
 *
 * <p>A shared session is borrowed through {@link #getSession(SolrCloudManager)}, cloned, and one ADDREPLICA
 * suggestion is requested per shard, replica type and replica. On success the borrowed wrapper is updated
 * with the resulting session; in all cases the wrapper is returned and the thread-local policy mapping cleared.</p>
 *
 * @param collName              name of the collection the replicas belong to
 * @param autoScalingConfig     if not null, this config is served instead of the one of the cloud manager's
 *                              {@link DistribStateManager}
 * @param cloudManager          access to cluster state, node state and the object cache
 * @param optionalPolicyMapping collection name to policy name overrides applied while this call is computing;
 *                              may be null
 * @param shardNames            shards for which replicas are to be placed
 * @param nrtReplicas           number of NRT replicas per shard
 * @param tlogReplicas          number of TLOG replicas per shard
 * @param pullReplicas          number of PULL replicas per shard
 * @param nodesList             if not null, each node is passed to the suggester as a TARGET_NODE hint
 * @return one {@link ReplicaPosition} per placed replica
 * @throws SolrException with SERVER_ERROR if no session can be obtained, with BAD_REQUEST if the suggester
 *                       returns no operation for a replica
 */
public static List<ReplicaPosition> getReplicaLocations(String collName, AutoScalingConfig autoScalingConfig,
                                                        SolrCloudManager cloudManager,
                                                        Map<String, String> optionalPolicyMapping,
                                                        List<String> shardNames,
                                                        int nrtReplicas,
                                                        int tlogReplicas,
                                                        int pullReplicas,
                                                        List<String> nodesList) {
  List<ReplicaPosition> positions = new ArrayList<>();
  ThreadLocal<Map<String, String>> policyMapping = getPolicyMapping(cloudManager);
  ClusterStateProvider stateProvider = new DelegatingClusterStateProvider(cloudManager.getClusterStateProvider()) {
    @Override
    public String getPolicyNameByCollection(String coll) {
      // Read the thread-local once so that the containsKey check and the lookup are done on the same map.
      Map<String, String> mapping = policyMapping.get();
      return mapping != null && mapping.containsKey(coll) ?
          mapping.get(coll) :
          delegate.getPolicyNameByCollection(coll);
    }
  };
  SolrCloudManager delegatingManager = new DelegatingCloudManager(cloudManager) {
    @Override
    public ClusterStateProvider getClusterStateProvider() {
      return stateProvider;
    }

    @Override
    public DistribStateManager getDistribStateManager() {
      if (autoScalingConfig != null) {
        return new DelegatingDistribStateManager(null) {
          @Override
          public AutoScalingConfig getAutoScalingConfig() {
            return autoScalingConfig;
          }
        };
      } else {
        return super.getDistribStateManager();
      }
    }
  };
  policyMapping.set(optionalPolicyMapping);
  SessionWrapper sessionWrapper = null;
  try {
    try {
      SESSION_WRAPPPER_REF.set(sessionWrapper = getSession(delegatingManager));
    } catch (InterruptedException e) {
      // Restore the interrupt status so that callers further up can still observe the interruption.
      Thread.currentThread().interrupt();
      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "unable to get autoscaling policy session", e);
    } catch (Exception e) {
      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "unable to get autoscaling policy session", e);
    }
    Policy.Session origSession = sessionWrapper.session;
    // new session needs to be created to avoid side-effects from per-collection policies
    // TODO: refactor so cluster state cache is separate from storage of policies to avoid per cluster vs per collection interactions
    // Need a Session that has all previous history of the original session, NOT filtered by what's present or not in Zookeeper
    // (as does constructor Session(SolrCloudManager, Policy, Transaction)).
    Policy.Session newSession = origSession.cloneToNewSession(delegatingManager);

    // Per shard: 1.5 times the value the leader replica reports for FREEDISK.perReplicaValue,
    // later passed to the suggester as MINFREEDISK hint.
    Map<String, Double> diskSpaceReqd = new HashMap<>();
    try {
      DocCollection coll = cloudManager.getClusterStateProvider().getCollection(collName);
      if (coll != null) {
        for (String shardName : shardNames) {
          Replica ldr = coll.getLeader(shardName);
          if (ldr != null && cloudManager.getClusterStateProvider().getLiveNodes().contains(ldr.getNodeName())) {
            Map<String, Map<String, List<ReplicaInfo>>> details = cloudManager.getNodeStateProvider().getReplicaInfo(ldr.getNodeName(),
                Collections.singleton(FREEDISK.perReplicaValue));
            ReplicaInfo replicaInfo = details.getOrDefault(collName, emptyMap()).getOrDefault(shardName, singletonList(null)).get(0);
            if (replicaInfo != null) {
              Object idxSz = replicaInfo.getVariables().get(FREEDISK.perReplicaValue);
              if (idxSz != null) {
                diskSpaceReqd.put(shardName, 1.5 * (Double) Variable.Type.FREEDISK.validate(null, idxSz, false));
              }
            }
          }
        }
      }
    } catch (IOException e) {
      log.warn("Exception while reading disk free metric values for nodes to be used for collection: {}", collName, e);
    }

    Map<Replica.Type, Integer> typeVsCount = new EnumMap<>(Replica.Type.class);
    typeVsCount.put(Replica.Type.NRT, nrtReplicas);
    typeVsCount.put(Replica.Type.TLOG, tlogReplicas);
    typeVsCount.put(Replica.Type.PULL, pullReplicas);
    for (String shardName : shardNames) {
      int idx = 0;
      Double minFreeDisk = diskSpaceReqd.get(shardName);
      for (Map.Entry<Replica.Type, Integer> e : typeVsCount.entrySet()) {
        for (int i = 0; i < e.getValue(); i++) {
          Suggester suggester = newSession.getSuggester(ADDREPLICA)
              .hint(Hint.REPLICATYPE, e.getKey())
              .hint(Hint.COLL_SHARD, new Pair<>(collName, shardName));
          if (nodesList != null) {
            for (String nodeName : nodesList) {
              suggester = suggester.hint(Hint.TARGET_NODE, nodeName);
            }
          }
          if (minFreeDisk != null) {
            suggester.hint(Hint.MINFREEDISK, minFreeDisk);
          }
          @SuppressWarnings({"rawtypes"})
          SolrRequest op = suggester.getSuggestion();
          if (op == null) {
            String errorId = "AutoScaling.error.diagnostics." + System.nanoTime();
            Policy.Session sessionCopy = suggester.session;
            log.error("errorId : {} {}", errorId
                , handleExp(log, "", () -> Utils.writeJson(getDiagnostics(sessionCopy), new StringWriter(), true).toString())); // logOk
            // Only the clauses are serialized as JSON; the pointer to the logs is appended as plain text.
            throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, " No node can satisfy the rules " +
                Utils.toJSONString(Utils.getDeepCopy(newSession.expandedClauses, 4, true)) + " More details from logs in node : "
                + Utils.getMDCNode() + ", errorId : " + errorId);
          }
          // Each following suggestion is computed on top of the session resulting from the previous one.
          newSession = suggester.getSession();
          positions.add(new ReplicaPosition(shardName, ++idx, e.getKey(), op.getParams().get(NODE)));
        }
      }
    }
    // We're happy with the updated session based on the original one, so let's update what the wrapper would hand
    // to the next computation that wants a session.
    sessionWrapper.update(newSession);
  } finally {
    policyMapping.remove();
    // We mark the wrapper (and its session) as being available to others.
    if (sessionWrapper != null) {
      sessionWrapper.returnSession();
    }
  }
  return positions;
}
/**
* Age, in seconds, used by the session cache when computing the oldest acceptable last update time
* of a session that may still be reused.
*/
public static final int SESSION_EXPIRY = 180; // 3 minutes
/**
 * Creates a new session for the given policy and returns the diagnostics of that session.
 *
 * @see #getDiagnostics(Policy.Session)
 */
public static MapWriter getDiagnostics(Policy policy, SolrCloudManager cloudManager) {
  return getDiagnostics(policy.createSession(cloudManager));
}
/**
 * Returns a writer emitting the diagnostics of a session: the sorted nodes (captured when this method
 * is called) followed by the "liveNodes", "violations" and "config" entries (read when the writer runs).
 */
public static MapWriter getDiagnostics(Policy.Session session) {
  final List<Row> sortedRows = session.getSortedNodes();
  return writer -> {
    writeNodes(writer, sortedRows);
    writer
        .put("liveNodes", session.cloudManager.getClusterStateProvider().getLiveNodes())
        .put("violations", session.getViolations())
        .put("config", session.getPolicy());
  };
}
/**
 * Writes the given rows under the "sortedNodes" key. For every row the node name, its liveness and its
 * cells are written through a filter that drops repeated keys, null values and empty maps; the
 * "replicas" entry is written unfiltered.
 */
static void writeNodes(MapWriter.EntryWriter ew, List<Row> sorted) throws IOException {
  Set<CharSequence> seenKeys = new HashSet<>();
  BiPredicate<CharSequence, Object> notAnEmptyMap =
      (key, val) -> !(val instanceof Map && ((Map<?, ?>) val).isEmpty());
  BiPredicate<CharSequence, Object> accept = dedupeKeyPredicate(seenKeys)
      .and(ConditionalMapWriter.NON_NULL_VAL)
      .and(notAnEmptyMap);

  IteratorWriter nodesWriter = iw -> {
    for (Row row : sorted) {
      MapWriter rowWriter = rowEw -> {
        // Key de-duplication is per row, so forget the keys of the previous row.
        seenKeys.clear();
        rowEw.put("node", row.node, accept)
            .put("isLive", row.isLive, accept);
        for (Cell cell : row.getCells()) {
          rowEw.put(cell.name, cell.val, accept);
        }
        rowEw.put("replicas", row.collectionVsShardVsReplicas);
      };
      iw.add(rowWriter);
    }
  };
  ew.put("sortedNodes", nodesWriter);
}
/**
 * Computes suggestions with a maximum of 20 suggestions and a timeout of 10 seconds.
 *
 * @param params request parameters, may be null; see the five-argument overload
 */
public static List<Suggester.SuggestionInfo> getSuggestions(AutoScalingConfig autoScalingConf,
                                                            SolrCloudManager cloudManager, SolrParams params) {
  final int maxSuggestions = 20;
  final int timeoutInSecs = 10;
  return getSuggestions(autoScalingConf, cloudManager, maxSuggestions, timeoutInSecs, params);
}

/**
 * Computes suggestions without request parameters, with a maximum of 20 suggestions and a timeout of
 * 10 seconds.
 */
public static List<Suggester.SuggestionInfo> getSuggestions(AutoScalingConfig autoScalingConf,
                                                            SolrCloudManager cloudManager) {
  return getSuggestions(autoScalingConf, cloudManager, (SolrParams) null);
}
/**
 * Computes suggestions on a new session created from the policy of the given config.
 *
 * <p>The values of the "type" request parameter select which kinds of suggestions are computed
 * (violation, repair, improvement); without such values all kinds are computed, in that order.</p>
 *
 * @param max           stored as the maximum in the suggestion context
 * @param timeoutInSecs added to the current time of the cloud manager's time source to get the end time
 * @param params        request parameters, may be null
 */
public static List<Suggester.SuggestionInfo> getSuggestions(AutoScalingConfig autoScalingConf,
                                                            SolrCloudManager cloudManager, int max, int timeoutInSecs, SolrParams params) {
  Policy policy = autoScalingConf.getPolicy();
  Suggestion.Ctx ctx = new Suggestion.Ctx();
  ctx.endTime = cloudManager.getTimeSource().getTimeNs() + TimeUnit.SECONDS.toNanos(timeoutInSecs);
  ctx.max = max;
  ctx.session = policy.createSession(cloudManager);

  String[] requestedTypes = params == null ? null : params.getParams("type");
  List<String> types = requestedTypes == null ? Collections.emptyList() : Arrays.asList(requestedTypes);
  final boolean allTypes = types.isEmpty();

  if (allTypes || types.contains(violation.name())) {
    List<Violation> initialViolations = ctx.session.getViolations();
    for (Violation v : initialViolations) {
      v.getClause().getThirdTag().varType.getSuggestions(ctx.setViolation(v));
      ctx.violation = null;
    }
    // Violations still present in the session after the suggestions above are reported as unresolved.
    for (Violation current : ctx.session.getViolations()) {
      for (Violation old : initialViolations) {
        if (!ctx.needMore()) {
          return ctx.getSuggestions();
        }
        if (current.equals(old)) {
          //could not be resolved
          ctx.suggestions.add(new Suggester.SuggestionInfo(current, null, unresolved_violation));
          break;
        }
      }
    }
  }

  if ((allTypes || types.contains(repair.name())) && ctx.needMore()) {
    try {
      addMissingReplicas(cloudManager, ctx);
    } catch (IOException e) {
      log.error("Unable to fetch cluster state", e);
    }
  }

  if ((allTypes || types.contains(improvement.name())) && ctx.needMore()) {
    int remaining = ctx.max - ctx.getSuggestions().size();
    suggestOptimizations(ctx, Math.min(remaining, 10));
  }

  return ctx.getSuggestions();
}
/**
 * Walks all slices of all collections of the cluster state and, while the context still wants more
 * suggestions, adds repair suggestions for NRT, PULL and TLOG replicas (in that order) based on the
 * number of ACTIVE or RECOVERING replicas counted per type in the slice.
 */
private static void addMissingReplicas(SolrCloudManager cloudManager, Suggestion.Ctx ctx) throws IOException {
  cloudManager.getClusterStateProvider().getClusterState().forEachCollection(coll -> {
    coll.forEach(slice -> {
      if (!ctx.needMore()) {
        return;
      }
      ReplicaCount usableReplicas = new ReplicaCount();
      slice.forEach(replica -> {
        Replica.State state = replica.getState();
        if (state == Replica.State.ACTIVE || state == Replica.State.RECOVERING) {
          usableReplicas.increment(replica.getType());
        }
      });
      String shard = slice.getName();
      for (Replica.Type type : Arrays.asList(Replica.Type.NRT, Replica.Type.PULL, Replica.Type.TLOG)) {
        addMissingReplicas(usableReplicas, coll, shard, type, ctx);
      }
    });
  });
}
/**
 * Adds ADDREPLICA repair suggestions for one shard and replica type for as long as the delta reported
 * by the replica count against the expected count of the collection is negative, the context wants
 * more suggestions, and the context keeps returning a suggestion.
 */
@SuppressWarnings({"unchecked"})
private static void addMissingReplicas(ReplicaCount count, DocCollection coll, String shard, Replica.Type type, Suggestion.Ctx ctx) {
  int shortfall = count.delta(coll.getExpectedReplicaCount(type, 0), type);
  while (ctx.needMore() && shortfall < 0) {
    Suggester suggester = ctx.session.getSuggester(ADDREPLICA)
        .hint(Hint.REPLICATYPE, type)
        .hint(Hint.COLL_SHARD, new Pair<>(coll.getName(), shard));
    @SuppressWarnings({"rawtypes"})
    SolrRequest suggestion = ctx.addSuggestion(suggester, Suggestion.Type.repair);
    if (suggestion == null) {
      return;
    }
    shortfall++;
  }
}
/**
 * Adds up to {@code count} improvement suggestions: for every row of the session matrix, a MOVEREPLICA
 * suggestion is requested for each (collection, shard) hosted on that row, with the row's node as the
 * source node. Stops when the limit is reached, the context timed out or wants no more suggestions.
 */
private static void suggestOptimizations(Suggestion.Ctx ctx, int count) {
  final int limit = ctx.getSuggestions().size() + count;
  final List<Row> rows = ctx.session.matrix;
  for (int rowIdx = 0; rowIdx < rows.size(); rowIdx++) {
    if (ctx.getSuggestions().size() >= limit || ctx.hasTimedOut()) {
      break;
    }
    Row row = rows.get(rowIdx);

    // Collect the shards hosted on this row per collection, then replace each set by its sorted form.
    Map<String, Collection<String>> shardsByCollection = new HashMap<>();
    row.forEachReplica(ri ->
        shardsByCollection.computeIfAbsent(ri.getCollection(), c -> new HashSet<>()).add(ri.getShard()));
    for (Map.Entry<String, Collection<String>> entry : shardsByCollection.entrySet()) {
      entry.setValue(FreeDiskVariable.getSortedShards(Collections.singletonList(row), entry.getValue(), entry.getKey()));
    }

    for (Map.Entry<String, Collection<String>> entry : shardsByCollection.entrySet()) {
      if (!ctx.needMore()) {
        return;
      }
      if (ctx.getSuggestions().size() >= limit || ctx.hasTimedOut()) {
        break;
      }
      String collection = entry.getKey();
      for (String shard : entry.getValue()) {
        Suggester moveReplica = ctx.session.getSuggester(MOVEREPLICA)
            .hint(Hint.COLL_SHARD, new Pair<>(collection, shard))
            .hint(Hint.SRC_NODE, row.node);
        ctx.addSuggestion(moveReplica, Suggestion.Type.improvement);
        if (ctx.getSuggestions().size() >= limit) {
          break;
        }
      }
    }
  }
}
/**
 * Use this to dump the state of a system and to generate a testcase.
 * Nothing is computed unless TRACE logging is enabled; any failure while building the dump is rethrown
 * wrapped in a {@link RuntimeException}.
 */
public static void logState(SolrCloudManager cloudManager, Suggester suggester) {
  if (!log.isTraceEnabled()) {
    return;
  }
  try {
    if (log.isTraceEnabled()) {
      Policy policy = cloudManager.getDistribStateManager().getAutoScalingConfig().getPolicy();
      MapWriter state = loggingInfo(policy, cloudManager, suggester);
      log.trace("LOGSTATE: {}", Utils.writeJson(state, new StringWriter(), true));
    }
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}
/**
 * Returns a writer emitting a "diagnostics" entry computed from the policy and cloud manager and,
 * when a suggester is given, a "suggester" entry.
 */
static MapWriter loggingInfo(Policy policy, SolrCloudManager cloudManager, Suggester suggester) {
  return writer -> {
    writer.put("diagnostics", getDiagnostics(policy, cloudManager));
    if (suggester == null) {
      return;
    }
    writer.put("suggester", suggester);
  };
}
/**
* Usage state of a {@link SessionWrapper}. A wrapper starts as {@link #COMPUTING}, is switched to
* {@link #EXECUTING} when it is returned to its {@link SessionRef}, and back to {@link #COMPUTING}
* when it is handed out again for reuse.
*/
public enum Status {
/**
* A command is actively using and modifying the session to compute placements
*/
COMPUTING,
/**
* A command is not done yet processing its changes but no longer updates or even uses the session
*/
EXECUTING
}
/**
* This class stores sessions for sharing purposes. If a process requires a session to
* compute operations:
* <ol>
* <li>see if there is an available non expired session in the cache,</li>
* <li>if yes, borrow it.</li>
* <li>if no, create a new one and borrow it.</li>
* <li>after computing (update) operations are done, {@link #returnSession(SessionWrapper)} back to the cache so it's
* again available for borrowing.</li>
* <li>after all borrowers are done computing then executing with the session, {@link #release(SessionWrapper)} it,
* which removes it from the cache.</li>
* </ol>
*/
static class SessionRef {
/**
* Lock protecting access to {@link #sessionWrapperSet} and to {@link #creationsInProgress}
*/
private final Object lockObj = new Object();
/**
* Sessions currently in use in {@link Status#COMPUTING} or {@link Status#EXECUTING} states. As soon as all
* uses of a session are over, that session is removed from this set. Sessions not actively in use are NOT kept around.
*
* <p>Access should only be done under the protection of {@link #lockObj}</p>
*/
private Set<SessionWrapper> sessionWrapperSet = Collections.newSetFromMap(new IdentityHashMap<>());
/**
* Number of sessions currently being created but not yet present in {@link #sessionWrapperSet}.
*
* <p>Access should only be done under the protection of {@link #lockObj}</p>
*/
private int creationsInProgress = 0;
public SessionRef() {
}
// used only by tests
/** Tells, under the lock, whether no session is currently tracked. */
boolean isEmpty() {
  final boolean noSessions;
  synchronized (lockObj) {
    noSessions = sessionWrapperSet.isEmpty();
  }
  return noSessions;
}
/**
 * All operations suggested by the current session object are complete: the wrapper is removed from
 * the set of tracked sessions and nothing is cached. A warning is logged if the wrapper was not tracked.
 */
private void release(SessionWrapper sessionWrapper) {
  final boolean removed;
  synchronized (lockObj) {
    removed = sessionWrapperSet.remove(sessionWrapper);
  }
  if (!removed) {
    log.warn("released session {} not found in session set", sessionWrapper.getCreateTime());
    return;
  }
  if (log.isDebugEnabled()) {
    TimeSource timeSource = sessionWrapper.session.cloudManager.getTimeSource();
    long createTimeMs = TimeUnit.MILLISECONDS.convert(sessionWrapper.getCreateTime(), TimeUnit.NANOSECONDS);
    log.debug("final release, session {} lived a total of {}ms, ", sessionWrapper.getCreateTime(),
        timeElapsed(timeSource, createTimeMs, MILLISECONDS)); // logOk
  }
}
/**
 * Computing is over for this session and it may contain a new session with new state.
 * The wrapper is marked {@link Status#EXECUTING} so it can be used by others while the caller is
 * performing its operations, and one waiting thread (if any) is notified.
 */
private void returnSession(SessionWrapper sessionWrapper) {
  final boolean tracked;
  synchronized (lockObj) {
    sessionWrapper.status = Status.EXECUTING;
    tracked = sessionWrapperSet.contains(sessionWrapper);
    // wake up single thread waiting for a session return (ok if not woken up, wait is short)
    // Important to wake up a single one, otherwise of multiple waiting threads, all but one will immediately create new sessions
    lockObj.notify();
  }

  // Logging is done outside of the critical section
  if (!tracked) {
    log.warn("returning unknown session {} ", sessionWrapper.getCreateTime());
    return;
  }
  if (log.isDebugEnabled()) {
    log.debug("returnSession {}", sessionWrapper.getCreateTime());
  }
}
/**
 * <p>Method returning an available session that can be used for {@link Status#COMPUTING}, either from the
 * {@link #sessionWrapperSet} cache or by creating a new one. The status of the returned session is set to {@link Status#COMPUTING}.</p>
 *
 * <p>A cached session is only reused if it was last updated at most {@link #SESSION_EXPIRY} seconds ago and was
 * created for the current zk version of the autoscaling config.</p>
 *
 * Some waiting is done in two cases:
 * <ul>
 *   <li>A candidate session is present in {@link #sessionWrapperSet} but is still {@link Status#COMPUTING}, a random wait
 *   is observed to see if the session gets freed to save a session creation and allow session reuse,</li>
 *   <li>It is necessary to create a new session but there are already sessions in the process of being created, a
 *   random wait is observed (if no waiting already occurred waiting for a session to become free) before creation
 *   takes place, just in case one of the created sessions got used then {@link #returnSession(SessionWrapper)} in the meantime.</li>
 * </ul>
 *
 * The random wait prevents the "thundering herd" effect when all threads needing a session at the same time create a new
 * one even though some differentiated waits could have led to better reuse and less session creations.
 *
 * @param cloudManager provides the time source, the autoscaling config and is used to create new sessions
 * @param allowWait usually <code>true</code> except in tests that know there's no point in waiting because nothing
 *                  will happen...
 * @return a session wrapper in {@link Status#COMPUTING} state, never null
 */
public SessionWrapper get(SolrCloudManager cloudManager, boolean allowWait) throws IOException, InterruptedException {
  TimeSource timeSource = cloudManager.getTimeSource();
  // Computed in nanoseconds because it is compared with SessionWrapper#getLastUpdateTime(), which is in
  // nanoseconds. Mixing seconds and nanoseconds here would make the expiry check always pass.
  long oldestUpdateTimeNs = timeSource.getTimeNs() - TimeUnit.SECONDS.toNanos(SESSION_EXPIRY);
  int zkVersion = cloudManager.getDistribStateManager().getAutoScalingConfig().getZkVersion();

  synchronized (lockObj) {
    SessionWrapper sw = getAvailableSession(zkVersion, oldestUpdateTimeNs);

    // Best case scenario: an available session
    if (sw != null) {
      if (log.isDebugEnabled()) {
        log.debug("reusing session {}", sw.getCreateTime());
      }
      return sw;
    }

    // Wait for a while before deciding what to do if waiting could help...
    if ((creationsInProgress != 0 || hasCandidateSession(zkVersion, oldestUpdateTimeNs)) && allowWait) {
      // Either an existing session might be returned and become usable while we wait, or a session in the process of being
      // created might finish creation, be used then returned and become usable. So we wait.
      // wait 1 to 10 secs. Random to help spread wakeups.
      long waitForMs = (long) (Math.random() * 9 * 1000) + 1000;

      if (log.isDebugEnabled()) {
        log.debug("No sessions are available, all busy COMPUTING (or {} creations in progress). starting wait of {}ms",
            creationsInProgress, waitForMs);
      }
      long waitStart = time(timeSource, MILLISECONDS);
      try {
        lockObj.wait(waitForMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }

      if (log.isDebugEnabled()) {
        log.debug("out of waiting. wait of {}ms, actual time elapsed {}ms", waitForMs, timeElapsed(timeSource, waitStart, MILLISECONDS));
      }

      // We've waited, now we can either reuse immediately an available session, or immediately create a new one
      sw = getAvailableSession(zkVersion, oldestUpdateTimeNs);

      // Second best case scenario: an available session
      if (sw != null) {
        if (log.isDebugEnabled()) {
          log.debug("reusing session {} after wait", sw.getCreateTime());
        }
        return sw;
      }
    }

    // We're going to create a new Session OUTSIDE of the critical section because session creation can take quite some time
    creationsInProgress++;
  }

  SessionWrapper newSessionWrapper = null;
  try {
    if (log.isDebugEnabled()) {
      log.debug("Creating a new session");
    }
    Policy.Session session = cloudManager.getDistribStateManager().getAutoScalingConfig().getPolicy().createSession(cloudManager);
    newSessionWrapper = new SessionWrapper(session, this);
    if (log.isDebugEnabled()) {
      log.debug("New session created, {}", newSessionWrapper.getCreateTime());
    }
    return newSessionWrapper;
  } finally {
    // Always undo the in-progress count, even if session creation failed.
    synchronized (lockObj) {
      creationsInProgress--;

      if (newSessionWrapper != null) {
        // Session created successfully
        sessionWrapperSet.add(newSessionWrapper);
      }
    }
  }
}
/**
 * Picks a cached session that is not being computed on, is recent enough and matches the zk version,
 * marks it as {@link Status#COMPUTING} and returns it; returns null when the cache holds no such session
 * (i.e. all are still COMPUTING, are too old, have the wrong zk version, or the cache is empty).<p>
 * This method must be called while holding the monitor on {@link #lockObj}.
 */
private SessionWrapper getAvailableSession(int zkVersion, long oldestUpdateTimeNs) {
  for (SessionWrapper candidate : sessionWrapperSet) {
    if (candidate.status != Status.EXECUTING) {
      continue;
    }
    if (candidate.getLastUpdateTime() < oldestUpdateTimeNs) {
      continue;
    }
    if (candidate.zkVersion != zkVersion) {
      continue;
    }
    candidate.status = Status.COMPUTING;
    return candidate;
  }
  return null;
}
/**
 * Tells whether the cache holds a session that is recent enough and has the right zk version, whatever
 * its current status. This is required to know if there's any point in waiting for a session to be
 * returned or if a new session should better be created right away.
 */
private boolean hasCandidateSession(int zkVersion, long oldestUpdateTimeNs) {
  return sessionWrapperSet.stream()
      .anyMatch(sw -> sw.getLastUpdateTime() >= oldestUpdateTimeNs && sw.zkVersion == zkVersion);
}
}
/**
 * How to get a shared Policy Session
 * 1) call {@link #getSession(SolrCloudManager)}
 * 2) compute all suggestions
 * 3) call {@link SessionWrapper#returnSession(Policy.Session)}
 * 4) perform all suggestions
 * 5) call {@link SessionWrapper#release()}
 *
 * <p>This variant allows waiting for a session to become available.</p>
 */
public static SessionWrapper getSession(SolrCloudManager cloudManager) throws IOException, InterruptedException {
  final boolean allowWait = true;
  return getSession(cloudManager, allowWait);
}

/**
 * Gets a session from the {@link SessionRef} stored in the object cache of the cloud manager, registering
 * a new {@link SessionRef} there if none is present yet.
 */
static SessionWrapper getSession(SolrCloudManager cloudManager, boolean allowWait) throws IOException, InterruptedException {
  Object cached = cloudManager.getObjectCache()
      .computeIfAbsent(SessionRef.class.getName(), key -> new SessionRef());
  return ((SessionRef) cached).get(cloudManager, allowWait);
}
/**
 * Use this to get the last used session wrapper in this thread
 *
 * @param clear whether to unset the threadlocal or not
 * @return the wrapper currently stored in the thread-local, possibly null
 */
public static SessionWrapper getLastSessionWrapper(boolean clear) {
  if (!clear) {
    return SESSION_WRAPPPER_REF.get();
  }
  final SessionWrapper last = SESSION_WRAPPPER_REF.get();
  SESSION_WRAPPPER_REF.remove();
  return last;
}
// Per-thread reference to the session wrapper most recently obtained by getReplicaLocations on this thread;
// read (and optionally cleared) through getLastSessionWrapper.
static ThreadLocal<SessionWrapper> SESSION_WRAPPPER_REF = new ThreadLocal<>();
/**
 * Wraps a {@link Policy.Session} shared through a {@link SessionRef}, together with its usage status,
 * its creation and last update times and the number of commands that returned it but did not release it yet.
 */
public static class SessionWrapper {
  private final long createTime;
  // volatile: update() writes these two fields without holding the SessionRef lock while other threads
  // may read them (a plain long write is not even guaranteed to be atomic).
  private volatile long lastUpdateTime;
  private volatile Policy.Session session;
  public Status status;
  private final SessionRef ref;

  /**
   * Number of commands currently using the session in {@link Status#EXECUTING}. There is one <b>additional</b> command
   * using the session and updating it if {@link #status} is {@link Status#COMPUTING}
   */
  private final AtomicInteger refCount = new AtomicInteger();

  public final long zkVersion;

  /**
   * Nanoseconds (since/to some arbitrary time) when the session got created. Also used in logs (only in logs!) to identify the session.
   */
  public long getCreateTime() {
    return createTime;
  }

  /**
   * Nanoseconds (same time source as {@link #getCreateTime()}) of creation or of the last call to
   * {@link #update(Policy.Session)}, whichever happened last.
   */
  public long getLastUpdateTime() {
    return lastUpdateTime;
  }

  /**
   * Creates a wrapper in {@link Status#COMPUTING} state; the zk version is taken from the policy of the session.
   *
   * @param session the session to share
   * @param ref     the cache this wrapper is returned to and released from
   */
  public SessionWrapper(Policy.Session session, SessionRef ref) {
    createTime = session.cloudManager.getTimeSource().getTimeNs();
    lastUpdateTime = createTime;
    this.session = session;
    this.status = Status.COMPUTING; // Created for being used, so COMPUTING right away
    this.ref = ref;
    this.zkVersion = session.getPolicy().getZkVersion();
  }

  /** Returns the currently wrapped session. */
  public Policy.Session get() {
    return session;
  }

  /** Replaces the wrapped session and records the current time of its time source as last update time. */
  public void update(Policy.Session session) {
    this.lastUpdateTime = session.cloudManager.getTimeSource().getTimeNs();
    this.session = session;
  }

  public int getRefCount() {
    return refCount.get();
  }

  /**
   * return this for later use and update the session with the latest state
   * ensure that this is done after computing the suggestions
   */
  public void returnSession(Policy.Session session) {
    if (this.status != Status.COMPUTING) {
      log.warn("returning session {} not in state COMPUTING", this.getCreateTime());
    }
    this.update(session);
    this.returnSession();
  }

  /**
   * return this for later use without updating the internal Session for cases where it's easier to update separately
   */
  public void returnSession() {
    // One more command has stopped computing on the session and still has to release() it.
    refCount.incrementAndGet();
    ref.returnSession(this);
  }

  //all ops are executed now it can be destroyed
  /** Drops one use of the session; when no use is left the wrapper is released from its {@link SessionRef}. */
  public void release() {
    if (refCount.decrementAndGet() <= 0) {
      ref.release(this);
    }
  }
}
}