blob: 5e854eb476161a8433e6de27b2fe10b1bbc53fc2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.core;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.logging.MDCLoggingContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.invoke.MethodHandles;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
class SolrCores {
private static Object modifyLock = new Object(); // for locking around manipulating any of the core maps.
private final Map<String, SolrCore> cores = new LinkedHashMap<>(); // For "permanent" cores
// These descriptors, once loaded, will _not_ be unloaded, i.e. they are not "transient".
private final Map<String, CoreDescriptor> residentDescriptors = new LinkedHashMap<>();
private final CoreContainer container;
private Set<String> currentlyLoadingCores = Collections.newSetFromMap(new ConcurrentHashMap<String,Boolean>());
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
// This map will hold objects that are being currently operated on. The core (value) may be null in the case of
// initial load. The rule is, never to any operation on a core that is currently being operated upon.
private static final Set<String> pendingCoreOps = new HashSet<>();
// Due to the fact that closes happen potentially whenever anything is _added_ to the transient core list, we need
// to essentially queue them up to be handled via pendingCoreOps.
private static final List<SolrCore> pendingCloses = new ArrayList<>();
private TransientSolrCoreCacheFactory transientSolrCoreCacheFactory;
SolrCores(CoreContainer container) {
this.container = container;
}
protected void addCoreDescriptor(CoreDescriptor p) {
synchronized (modifyLock) {
if (p.isTransient()) {
getTransientCacheHandler().addTransientDescriptor(p.getName(), p);
} else {
residentDescriptors.put(p.getName(), p);
}
}
}
protected void removeCoreDescriptor(CoreDescriptor p) {
synchronized (modifyLock) {
if (p.isTransient()) {
getTransientCacheHandler().removeTransientDescriptor(p.getName());
} else {
residentDescriptors.remove(p.getName());
}
}
}
public void load(SolrResourceLoader loader) {
synchronized (modifyLock) {
transientSolrCoreCacheFactory = TransientSolrCoreCacheFactory.newInstance(loader, container);
}
}
// We are shutting down. You can't hold the lock on the various lists of cores while they shut down, so we need to
// make a temporary copy of the names and shut them down outside the lock.
protected void close() {
waitForLoadingCoresToFinish(30*1000);
Collection<SolrCore> coreList = new ArrayList<>();
// Release transient core cache.
synchronized (modifyLock) {
if (transientSolrCoreCacheFactory != null) {
getTransientCacheHandler().close();
}
}
// It might be possible for one of the cores to move from one list to another while we're closing them. So
// loop through the lists until they're all empty. In particular, the core could have moved from the transient
// list to the pendingCloses list.
do {
coreList.clear();
synchronized (modifyLock) {
// make a copy of the cores then clear the map so the core isn't handed out to a request again
coreList.addAll(cores.values());
cores.clear();
if (transientSolrCoreCacheFactory != null) {
coreList.addAll(getTransientCacheHandler().prepareForShutdown());
}
coreList.addAll(pendingCloses);
pendingCloses.clear();
}
ExecutorService coreCloseExecutor = ExecutorUtil.newMDCAwareFixedThreadPool(Integer.MAX_VALUE,
new SolrNamedThreadFactory("coreCloseExecutor"));
try {
for (SolrCore core : coreList) {
coreCloseExecutor.submit(() -> {
MDCLoggingContext.setCore(core);
try {
core.close();
} catch (Throwable e) {
SolrException.log(log, "Error shutting down core", e);
if (e instanceof Error) {
throw (Error) e;
}
} finally {
MDCLoggingContext.clear();
}
return core;
});
}
} finally {
ExecutorUtil.shutdownAndAwaitTermination(coreCloseExecutor);
}
} while (coreList.size() > 0);
}
// Returns the old core if there was a core of the same name.
//WARNING! This should be the _only_ place you put anything into the list of transient cores!
protected SolrCore putCore(CoreDescriptor cd, SolrCore core) {
synchronized (modifyLock) {
addCoreDescriptor(cd); // cd must always be registered if we register a core
if (cd.isTransient()) {
return getTransientCacheHandler().addCore(cd.getName(), core);
} else {
return cores.put(cd.getName(), core);
}
}
}
/**
* @return A list of "permanent" cores, i.e. cores that may not be swapped out and are currently loaded.
*
* A core may be non-transient but still lazily loaded. If it is "permanent" and lazy-load _and_
* not yet loaded it will _not_ be returned by this call.
*
* This list is a new copy, it can be modified by the caller (e.g. it can be sorted).
*
* Note: This is one of the places where SolrCloud is incompatible with Transient Cores. This call is used in
* cancelRecoveries, transient cores don't participate.
*/
List<SolrCore> getCores() {
synchronized (modifyLock) {
return new ArrayList<>(cores.values());
}
}
/**
* Gets the cores that are currently loaded, i.e. cores that have
* 1> loadOnStartup=true and are either not-transient or, if transient, have been loaded and have not been aged out
* 2> loadOnStartup=false and have been loaded but either non-transient or have not been aged out.
*
* Put another way, this will not return any names of cores that are lazily loaded but have not been called for yet
* or are transient and either not loaded or have been swapped out.
*
* @return An unsorted list. This list is a new copy, it can be modified by the caller (e.g. it can be sorted).
*/
List<String> getLoadedCoreNames() {
synchronized (modifyLock) {
return distinctSetsUnion(cores.keySet(), getTransientCacheHandler().getLoadedCoreNames());
}
}
/**
* Gets a collection of all cores names, loaded and unloaded.
* For efficiency, prefer to check {@link #getCoreDescriptor(String)} != null instead of {@link #getAllCoreNames()}.contains(String)
*
* @return An unsorted list. This list is a new copy, it can be modified by the caller (e.g. it can be sorted).
*/
public List<String> getAllCoreNames() {
synchronized (modifyLock) {
return distinctSetsUnion(residentDescriptors.keySet(), getTransientCacheHandler().getAllCoreNames());
}
}
/**
* Makes the union of two distinct sets.
*
* @return An unsorted list. This list is a new copy, it can be modified by the caller (e.g. it can be sorted).
*/
private static <T> List<T> distinctSetsUnion(Set<T> set1, Set<T> set2) {
assert areSetsDistinct(set1, set2);
List<T> union = new ArrayList<>(set1.size() + set2.size());
union.addAll(set1);
union.addAll(set2);
return union;
}
/**
* Indicates whether two sets are distinct (intersection is empty).
*/
private static <T> boolean areSetsDistinct(Set<T> set1, Set<T> set2) {
return set1.stream().noneMatch(set2::contains);
}
/**
* Gets the number of currently loaded permanent (non transient) cores.
* Faster equivalent for {@link #getCores()}.size().
*/
int getNumLoadedPermanentCores() {
synchronized (modifyLock) {
return cores.size();
}
}
/**
* Gets the number of currently loaded transient cores.
*/
int getNumLoadedTransientCores() {
synchronized (modifyLock) {
return getTransientCacheHandler().getLoadedCoreNames().size();
}
}
/**
* Gets the number of unloaded cores, including permanent and transient cores.
*/
int getNumUnloadedCores() {
synchronized (modifyLock) {
assert areSetsDistinct(residentDescriptors.keySet(), getTransientCacheHandler().getAllCoreNames());
return getTransientCacheHandler().getAllCoreNames().size() - getTransientCacheHandler().getLoadedCoreNames().size()
+ residentDescriptors.size() - cores.size();
}
}
/**
* Gets the total number of cores, including permanent and transient cores, loaded and unloaded cores.
* Faster equivalent for {@link #getAllCoreNames()}.size().
*/
public int getNumAllCores() {
synchronized (modifyLock) {
assert areSetsDistinct(residentDescriptors.keySet(), getTransientCacheHandler().getAllCoreNames());
return residentDescriptors.size() + getTransientCacheHandler().getAllCoreNames().size();
}
}
SolrCore getCore(String name) {
synchronized (modifyLock) {
return cores.get(name);
}
}
protected void swap(String n0, String n1) {
synchronized (modifyLock) {
SolrCore c0 = cores.get(n0);
SolrCore c1 = cores.get(n1);
if (c0 == null) { // Might be an unloaded transient core
c0 = container.getCore(n0);
if (c0 == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No such core: " + n0);
}
}
if (c1 == null) { // Might be an unloaded transient core
c1 = container.getCore(n1);
if (c1 == null) {
throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "No such core: " + n1);
}
}
// When we swap the cores, we also need to swap the associated core descriptors. Note, this changes the
// name of the coreDescriptor by virtue of the c-tor
CoreDescriptor cd1 = c1.getCoreDescriptor();
addCoreDescriptor(new CoreDescriptor(n1, c0.getCoreDescriptor()));
addCoreDescriptor(new CoreDescriptor(n0, cd1));
cores.put(n0, c1);
cores.put(n1, c0);
c0.setName(n1);
c1.setName(n0);
container.getMetricManager().swapRegistries(
c0.getCoreMetricManager().getRegistryName(),
c1.getCoreMetricManager().getRegistryName());
}
}
protected SolrCore remove(String name) {
synchronized (modifyLock) {
SolrCore ret = cores.remove(name);
// It could have been a newly-created core. It could have been a transient core. The newly-created cores
// in particular should be checked. It could have been a dynamic core.
if (ret == null) {
ret = getTransientCacheHandler().removeCore(name);
}
return ret;
}
}
SolrCore getCoreFromAnyList(String name, boolean incRefCount) {
return getCoreFromAnyList(name, incRefCount, null);
}
/* If you don't increment the reference count, someone could close the core before you use it. */
SolrCore getCoreFromAnyList(String name, boolean incRefCount, UUID coreId) {
synchronized (modifyLock) {
SolrCore core = cores.get(name);
if (core == null) {
core = getTransientCacheHandler().getCore(name);
}
if(core != null && coreId != null && coreId != core.uniqueId) return null;
if (core != null && incRefCount) {
core.open();
}
return core;
}
}
// See SOLR-5366 for why the UNLOAD command needs to know whether a core is actually loaded or not, it might have
// to close the core. However, there's a race condition. If the core happens to be in the pending "to close" queue,
// we should NOT close it in unload core.
protected boolean isLoadedNotPendingClose(String name) {
// Just all be synchronized
synchronized (modifyLock) {
if (cores.containsKey(name)) {
return true;
}
if (getTransientCacheHandler().containsCore(name)) {
// Check pending
for (SolrCore core : pendingCloses) {
if (core.getName().equals(name)) {
return false;
}
}
return true;
}
}
return false;
}
protected boolean isLoaded(String name) {
synchronized (modifyLock) {
return cores.containsKey(name) || getTransientCacheHandler().containsCore(name);
}
}
protected CoreDescriptor getUnloadedCoreDescriptor(String cname) {
synchronized (modifyLock) {
CoreDescriptor desc = residentDescriptors.get(cname);
if (desc == null) {
desc = getTransientCacheHandler().getTransientDescriptor(cname);
if (desc == null) {
return null;
}
}
return new CoreDescriptor(cname, desc);
}
}
// Wait here until any pending operations (load, unload or reload) are completed on this core.
protected SolrCore waitAddPendingCoreOps(String name) {
// Keep multiple threads from operating on a core at one time.
synchronized (modifyLock) {
boolean pending;
do { // Are we currently doing anything to this core? Loading, unloading, reloading?
pending = pendingCoreOps.contains(name); // wait for the core to be done being operated upon
if (! pending) { // Linear list, but shouldn't be too long
for (SolrCore core : pendingCloses) {
if (core.getName().equals(name)) {
pending = true;
break;
}
}
}
if (container.isShutDown()) return null; // Just stop already.
if (pending) {
try {
modifyLock.wait();
} catch (InterruptedException e) {
return null; // Seems best not to do anything at all if the thread is interrupted
}
}
} while (pending);
// We _really_ need to do this within the synchronized block!
if (! container.isShutDown()) {
if (! pendingCoreOps.add(name)) {
log.warn("Replaced an entry in pendingCoreOps {}, we should not be doing this", name);
}
return getCoreFromAnyList(name, false); // we might have been _unloading_ the core, so return the core if it was loaded.
}
}
return null;
}
// We should always be removing the first thing in the list with our name! The idea here is to NOT do anything n
// any core while some other operation is working on that core.
protected void removeFromPendingOps(String name) {
synchronized (modifyLock) {
if (! pendingCoreOps.remove(name)) {
log.warn("Tried to remove core {} from pendingCoreOps and it wasn't there. ", name);
}
modifyLock.notifyAll();
}
}
protected Object getModifyLock() {
return modifyLock;
}
// Be a little careful. We don't want to either open or close a core unless it's _not_ being opened or closed by
// another thread. So within this lock we'll walk along the list of pending closes until we find something NOT in
// the list of threads currently being loaded or reloaded. The "usual" case will probably return the very first
// one anyway..
protected SolrCore getCoreToClose() {
synchronized (modifyLock) {
for (SolrCore core : pendingCloses) {
if (! pendingCoreOps.contains(core.getName())) {
pendingCoreOps.add(core.getName());
pendingCloses.remove(core);
return core;
}
}
}
return null;
}
/**
* Return the CoreDescriptor corresponding to a given core name.
* Blocks if the SolrCore is still loading until it is ready.
* @param coreName the name of the core
* @return the CoreDescriptor
*/
public CoreDescriptor getCoreDescriptor(String coreName) {
synchronized (modifyLock) {
CoreDescriptor coreDescriptor = residentDescriptors.get(coreName);
if (coreDescriptor != null) {
return coreDescriptor;
}
return getTransientCacheHandler().getTransientDescriptor(coreName);
}
}
/**
* Get the CoreDescriptors for every {@link SolrCore} managed here (permanent and transient, loaded and unloaded).
*
* @return An unordered list copy. This list can be modified by the caller (e.g. sorted).
*/
public List<CoreDescriptor> getCoreDescriptors() {
synchronized (modifyLock) {
Collection<CoreDescriptor> transientCoreDescriptors = getTransientCacheHandler().getTransientDescriptors();
List<CoreDescriptor> coreDescriptors = new ArrayList<>(residentDescriptors.size() + transientCoreDescriptors.size());
coreDescriptors.addAll(residentDescriptors.values());
coreDescriptors.addAll(transientCoreDescriptors);
return coreDescriptors;
}
}
// cores marked as loading will block on getCore
public void markCoreAsLoading(CoreDescriptor cd) {
synchronized (modifyLock) {
currentlyLoadingCores.add(cd.getName());
}
}
//cores marked as loading will block on getCore
public void markCoreAsNotLoading(CoreDescriptor cd) {
synchronized (modifyLock) {
currentlyLoadingCores.remove(cd.getName());
}
}
// returns when no cores are marked as loading
public void waitForLoadingCoresToFinish(long timeoutMs) {
long time = System.nanoTime();
long timeout = time + TimeUnit.NANOSECONDS.convert(timeoutMs, TimeUnit.MILLISECONDS);
synchronized (modifyLock) {
while (!currentlyLoadingCores.isEmpty()) {
try {
modifyLock.wait(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (System.nanoTime() >= timeout) {
log.warn("Timed out waiting for SolrCores to finish loading.");
break;
}
}
}
}
// returns when core is finished loading, throws exception if no such core loading or loaded
public void waitForLoadingCoreToFinish(String core, long timeoutMs) {
long time = System.nanoTime();
long timeout = time + TimeUnit.NANOSECONDS.convert(timeoutMs, TimeUnit.MILLISECONDS);
synchronized (modifyLock) {
while (isCoreLoading(core)) {
try {
modifyLock.wait(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (System.nanoTime() >= timeout) {
log.warn("Timed out waiting for SolrCore, {}, to finish loading.", core);
break;
}
}
}
}
public boolean isCoreLoading(String name) {
return currentlyLoadingCores.contains(name);
}
// Let transient cache implementation tell us when it ages out a core
public void queueCoreToClose(SolrCore coreToClose) {
synchronized (modifyLock) {
pendingCloses.add(coreToClose); // Essentially just queue this core up for closing.
modifyLock.notifyAll(); // Wakes up closer thread too
}
}
/**
* @return the cache holding the transient cores; never null.
*/
public TransientSolrCoreCache getTransientCacheHandler() {
synchronized (modifyLock) {
if (transientSolrCoreCacheFactory == null) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, getClass().getName() + " not loaded; call load() before using it");
}
return transientSolrCoreCacheFactory.getTransientSolrCoreCache();
}
}
}