blob: bd7c8404886c50ef966e013d7c77911b6b365bb2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud.api.collections;
import static org.apache.solr.cloud.api.collections.CollectionHandlingUtils.addExceptionToNamedList;
import static org.apache.solr.cloud.api.collections.CollectionHandlingUtils.logFailedOperation;
import static org.apache.solr.common.cloud.ZkStateReader.COLLECTION_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.REPLICA_PROP;
import static org.apache.solr.common.cloud.ZkStateReader.SHARD_ID_PROP;
import static org.apache.solr.common.params.CommonAdminParams.ASYNC;
import static org.apache.solr.common.params.CommonParams.NAME;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.solr.client.solrj.response.RequestStatusState;
import org.apache.solr.cloud.ConfigSetApiLockFactory;
import org.apache.solr.cloud.ConfigSetCmds;
import org.apache.solr.cloud.DistributedApiAsyncTracker;
import org.apache.solr.cloud.DistributedMultiLock;
import org.apache.solr.cloud.OverseerSolrResponse;
import org.apache.solr.cloud.ZkDistributedCollectionLockFactory;
import org.apache.solr.cloud.ZkDistributedConfigSetLockFactory;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.params.CollectionParams;
import org.apache.solr.common.params.ConfigSetParams;
import org.apache.solr.common.params.CoreAdminParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.Pair;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.logging.MDCLoggingContext;
import org.apache.solr.response.SolrQueryResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Class for execution Collection API and Config Set API commands in a distributed way, without
* going through Overseer and {@link OverseerCollectionMessageHandler} or {@link
* org.apache.solr.cloud.OverseerConfigSetMessageHandler}.
*
* <p>This class is only called when Collection and Config Set API calls are configured to be
* distributed, which implies cluster state updates are distributed as well.
*/
public class DistributedCollectionConfigSetCommandRunner {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final String ZK_PATH_SEPARATOR = "/";
private static final String ZK_DISTRIBUTED_API_ROOT = "/distributedapi";
/** Zookeeper node below which the locking hierarchy is anchored */
private static final String ZK_COLLECTION_LOCKS = ZK_DISTRIBUTED_API_ROOT + "/collectionlocks";
private static final String ZK_CONFIG_SET_LOCKS = ZK_DISTRIBUTED_API_ROOT + "/configsetlocks";
/** Zookeeper node below which the async id (requestId) tracking is done (for async requests) */
private static final String ZK_ASYNC_ROOT = ZK_DISTRIBUTED_API_ROOT + "/async";
private final ExecutorService distributedCollectionApiExecutorService;
/**
* All Collection API commands are executed as if they are asynchronous to stick to the same
* behavior as the Overseer based Collection API execution. The difference between sync and async
* is twofold: 1. Non async execution does wait for a while to see if the command has completed
* and if so, returns success (and if not, returns failure) 2. There is no way to query the status
* of a non async command that is still running, or to kill it (short of shutting down the node on
* which it's running). This is not the best design, but at this stage that's what happens with
* Overseer based Collection API, so implementing the same.
*
* <p>The common aspects between sync and async is that actual command execution is given an
* infinite time to eventually complete (or fail). Again this is not great, but that's how it is
* for now. Likely some cleanup required later (once Overseer is actually remove? Like Solr 10 or
* 11?).
*/
private final ExecutorService commandsExecutor;
private final CoreContainer coreContainer;
private final CollApiCmds.CommandMap commandMapper;
private final CollectionCommandContext ccc;
private final DistributedApiAsyncTracker asyncTaskTracker;
private volatile boolean shuttingDown = false;
public DistributedCollectionConfigSetCommandRunner(CoreContainer coreContainer) {
this.coreContainer = coreContainer;
if (log.isInfoEnabled()) {
// Note is it hard to print a log when Collection API is handled by Overseer because Overseer
// is started regardless of how Collection API is handled, so it doesn't really know...
log.info(
"Creating DistributedCollectionConfigSetCommandRunner. Collection and ConfigSet APIs are running distributed (not Overseer based)");
}
// TODO we should look at how everything is getting closed when the node is shutdown. But it
// seems that CollectionsHandler (that creates instances of this class) is not really closed, so
// maybe it doesn't matter?
// With distributed Collection API execution, each node will have such an executor but given how
// thread pools work, threads will only be created if needed (including the corePoolSize
// threads).
distributedCollectionApiExecutorService =
new ExecutorUtil.MDCAwareThreadPoolExecutor(
5,
10,
0L,
TimeUnit.MILLISECONDS,
new SynchronousQueue<>(),
new SolrNamedThreadFactory("DistributedCollectionApiExecutorService"));
commandsExecutor =
new ExecutorUtil.MDCAwareThreadPoolExecutor(
5,
20,
0L,
TimeUnit.MILLISECONDS,
new SynchronousQueue<>(),
new SolrNamedThreadFactory("DistributedCollectionApiCommandExecutor"));
ccc =
new DistributedCollectionCommandContext(
this.coreContainer, this.distributedCollectionApiExecutorService);
commandMapper = new CollApiCmds.CommandMap(ccc);
asyncTaskTracker =
new DistributedApiAsyncTracker(ccc.getZkStateReader().getZkClient(), ZK_ASYNC_ROOT);
}
/** See {@link DistributedApiAsyncTracker#getAsyncTaskRequestStatus(String)} */
public Pair<RequestStatusState, OverseerSolrResponse> getAsyncTaskRequestStatus(String asyncId)
throws Exception {
return asyncTaskTracker.getAsyncTaskRequestStatus(asyncId);
}
/** See {@link DistributedApiAsyncTracker#deleteSingleAsyncId(String)} */
public boolean deleteSingleAsyncId(String asyncId) throws Exception {
return asyncTaskTracker.deleteSingleAsyncId(asyncId);
}
/** See {@link DistributedApiAsyncTracker#deleteAllAsyncIds()} */
public void deleteAllAsyncIds() throws Exception {
asyncTaskTracker.deleteAllAsyncIds();
}
/**
* When {@link org.apache.solr.handler.admin.CollectionsHandler#invokeAction} does not enqueue to
* overseer queue and instead calls this method, this method is expected to do the equivalent of
* what Overseer does in {@link
* org.apache.solr.cloud.OverseerConfigSetMessageHandler#processMessage}.
*
* <p>The steps leading to that call in the Overseer execution path are (and the equivalent is
* done here):
*
* <ul>
* <li>{@link org.apache.solr.cloud.OverseerTaskProcessor#run()} gets the message from the ZK
* queue, grabs the corresponding locks (write lock on the config set target of the API
* command and a read lock on the base config set if any - the case for config set creation)
* then executes the command using an executor service (it also checks the asyncId if any is
* specified but async calls are not supported for Config Set API calls).
* <li>In {@link org.apache.solr.cloud.OverseerTaskProcessor}.{@code Runner.run()} (run on an
* executor thread) a call is made to {@link
* org.apache.solr.cloud.OverseerConfigSetMessageHandler#processMessage} which does a few
* checks and calls the appropriate Config Set method.
* </ul>
*/
public void runConfigSetCommand(
SolrQueryResponse rsp,
ConfigSetParams.ConfigSetAction action,
Map<String, Object> result,
long timeoutMs)
throws Exception {
// We refuse new tasks, but will wait for already submitted ones (i.e. those that made it
// through this method earlier). See stopAndWaitForPendingTasksToComplete() below
if (shuttingDown) {
throw new SolrException(
SolrException.ErrorCode.CONFLICT,
"Solr is shutting down, no more Config Set API tasks may be executed");
}
// never null
String configSetName = (String) result.get(NAME);
// baseConfigSetName will be null if we're not creating a new config set
String baseConfigSetName =
ConfigSetCmds.getBaseConfigSetName(
action, (String) result.get(ConfigSetCmds.BASE_CONFIGSET));
if (log.isInfoEnabled()) {
log.info("Running Config Set API locally for " + action + " " + configSetName); // nowarn
}
ConfigSetCommandRunner commandRunner =
new ConfigSetCommandRunner(
new ZkNodeProps(result), action, configSetName, baseConfigSetName);
final Future<Void> taskFuture;
try {
taskFuture = commandsExecutor.submit(commandRunner);
} catch (RejectedExecutionException ree) {
throw new SolrException(
SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Too many executing commands", ree);
}
// Wait for a while... Just like Overseer based Config Set API (wait can timeout but actual
// command execution does not)
try {
taskFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
} catch (TimeoutException te) {
throw new SolrException(
SolrException.ErrorCode.SERVER_ERROR,
action + " " + configSetName + " timed out after " + timeoutMs + "ms");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SolrException(
SolrException.ErrorCode.SERVER_ERROR, action + " " + configSetName + " interrupted", e);
}
}
/**
* When {@link org.apache.solr.handler.admin.CollectionsHandler#invokeAction} does not enqueue to
* overseer queue and instead calls this method, this method is expected to do the equivalent of
* what Overseer does in {@link
* org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler#processMessage}.
*
* <p>The steps leading to that call in the Overseer execution path are (and the equivalent is
* done here):
*
* <ul>
* <li>{@link org.apache.solr.cloud.OverseerTaskProcessor#run()} gets the message from the ZK
* queue, grabs the corresponding lock (Collection API calls do locking to prevent non
* compatible concurrent modifications of a collection), marks the async id of the task as
* running then executes the command using an executor service
* <li>In {@link org.apache.solr.cloud.OverseerTaskProcessor}.{@code Runner.run()} (run on an
* executor thread) a call is made to {@link
* org.apache.solr.cloud.api.collections.OverseerCollectionMessageHandler#processMessage}
* which sets the logging context, calls {@link CollApiCmds.CollectionApiCommand#call}
* </ul>
*/
public OverseerSolrResponse runCollectionCommand(
ZkNodeProps message, CollectionParams.CollectionAction action, long timeoutMs) {
// We refuse new tasks, but will wait for already submitted ones (i.e. those that made it
// through this method earlier). See stopAndWaitForPendingTasksToComplete() below
if (shuttingDown) {
throw new SolrException(
SolrException.ErrorCode.CONFLICT,
"Solr is shutting down, no more Collection API tasks may be executed");
}
final String asyncId = message.getStr(ASYNC);
if (log.isInfoEnabled()) {
log.info(
"Running Collection API locally for " + action.name() + " asyncId=" + asyncId); // nowarn
}
// Following the call below returning true, we must eventually cancel or complete the task.
// Happens either in the CollectionCommandRunner below or in the catch when the runner would not
// execute.
if (!asyncTaskTracker.createNewAsyncJobTracker(asyncId)) {
throw new SolrException(
SolrException.ErrorCode.BAD_REQUEST,
"Task with the same requestid already exists. (" + asyncId + ")");
}
CollectionCommandRunner commandRunner = new CollectionCommandRunner(message, action, asyncId);
final Future<OverseerSolrResponse> taskFuture;
try {
taskFuture = commandsExecutor.submit(commandRunner);
} catch (RejectedExecutionException ree) {
// The command will not run, need to cancel the async ID so it can be reused on a subsequent
// attempt by the client
asyncTaskTracker.cancelAsyncId(asyncId);
throw new SolrException(
SolrException.ErrorCode.SERVICE_UNAVAILABLE, "Too many executing commands", ree);
}
if (asyncId == null) {
// Non async calls wait for a while in case the command completes. If they time out, there's
// no way to track the job progress (improvement suggestion: decorrelate having a task ID from
// the fact of waiting for the job to complete)
try {
return taskFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
} catch (TimeoutException te) {
throw new SolrException(
SolrException.ErrorCode.SERVER_ERROR, action + " timed out after " + timeoutMs + "ms");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, action + " interrupted", e);
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, action + " failed", e);
}
} else {
// Async calls do not wait for the command to finish but get instead back the async id (that
// they just sent...)
NamedList<Object> resp = new NamedList<>();
resp.add(CoreAdminParams.REQUESTID, asyncId);
return new OverseerSolrResponse(resp);
}
}
/**
* Best effort wait for termination of all tasks for a (short) while, then return. This method is
* called when the JVM shuts down so tasks that did not complete are expected to be stopped mid
* processing.
*/
public void stopAndWaitForPendingTasksToComplete() {
shuttingDown = true;
commandsExecutor.shutdown();
distributedCollectionApiExecutorService.shutdown();
// Duration we are willing to wait for threads to terminate, total.
final long TOTAL_WAIT_NS = 10L * 1000 * 1000 * 1000;
long start = System.nanoTime();
try {
commandsExecutor.awaitTermination(TOTAL_WAIT_NS, TimeUnit.NANOSECONDS);
long remaining = TOTAL_WAIT_NS - (System.nanoTime() - start);
if (remaining > 0L) {
distributedCollectionApiExecutorService.awaitTermination(remaining, TimeUnit.NANOSECONDS);
}
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
/**
* Collection name can be found in either of two message parameters (why??). Return it from where
* it's defined. (see also parameter {@code collectionNameParamName} of {@link
* org.apache.solr.cloud.DistributedClusterStateUpdater.MutatingCommand#MutatingCommand(CollectionParams.CollectionAction,
* String)})
*/
public static String getCollectionName(ZkNodeProps message) {
return message.containsKey(COLLECTION_PROP)
? message.getStr(COLLECTION_PROP)
: message.getStr(NAME);
}
/**
* All Collection API commands are executed in separate threads so that the command can run to
* completion even if the client request times out, or for async tasks, the client request returns
* and the command is executed (which really is very similar). This provides a behavior very
* similar to the one provided by Overseer based Collection API execution.
*/
private class CollectionCommandRunner implements Callable<OverseerSolrResponse> {
private final ZkNodeProps message;
private final CollectionParams.CollectionAction action;
private final String asyncId;
private CollectionCommandRunner(
ZkNodeProps message, CollectionParams.CollectionAction action, String asyncId) {
this.message = message;
this.action = action;
this.asyncId = asyncId;
}
/**
* This method does always free the Collection API lock it grabs and updates the tracking of the
* {@code asyncId} (initiated before this method got called) on all execution paths out of this
* method!
*/
@Override
public OverseerSolrResponse call() {
final String collName = getCollectionName(message);
final String shardId = message.getStr(SHARD_ID_PROP);
final String replicaName = message.getStr(REPLICA_PROP);
MDCLoggingContext.setCollection(collName);
MDCLoggingContext.setShard(shardId);
MDCLoggingContext.setReplica(replicaName);
NamedList<Object> results = new NamedList<>();
try {
// Create API lock for executing the command. This call is non blocking (not blocked on
// waiting for a lock to be acquired anyway, might be blocked on access to ZK etc) We create
// a new CollectionApiLockFactory using a new ZkDistributedCollectionLockFactory because
// earlier (in the constructor of this class) the ZkStateReader was not yet available, due
// to how CoreContainer is built in part in the constructor and in part in its load()
// method. And this class is built from the CoreContainer constructor... The cost of these
// creations is low, and these classes do not hold state but only serve as an interface to
// Zookeeper. Note that after this call, we MUST execute the lock.release(); in the finally
// below
DistributedMultiLock lock =
new CollectionApiLockFactory(
new ZkDistributedCollectionLockFactory(
ccc.getZkStateReader().getZkClient(), ZK_COLLECTION_LOCKS))
.createCollectionApiLock(action.lockLevel, collName, shardId, replicaName);
try {
log.debug(
"CollectionCommandRunner about to acquire lock for action {} lock level {}. {}/{}/{}",
action,
action.lockLevel,
collName,
shardId,
replicaName);
// Block this thread until all required locks are acquired.
lock.waitUntilAcquired();
// Got the lock so moving from submitted to running if we run for an async task (if
// asyncId is null the asyncTaskTracker calls do nothing).
asyncTaskTracker.setTaskRunning(asyncId);
log.debug(
"DistributedCollectionConfigSetCommandRunner.runCollectionCommand. Lock acquired. Calling: {}, {}",
action,
message);
CollApiCmds.CollectionApiCommand command = commandMapper.getActionCommand(action);
if (command != null) {
command.call(ccc.getSolrCloudManager().getClusterState(), message, results);
} else {
asyncTaskTracker.cancelAsyncId(asyncId);
// Seeing this is a bug, not bad user data
String message = "Bug: Unknown operation " + action;
log.error(message);
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, message);
}
} finally {
try {
// TODO If the Collection API command failed because the collection does not exist,
// we've just created some lock directory structure for a non existent collection...
// Maybe try to remove it here? No big deal for now as leftover nodes in the lock
// hierarchy do no harm, and there shouldn't be too many of those.
lock.release();
} catch (SolrException se) {
log.error(
"Error when releasing collection locks for operation " + action, se); // nowarn
}
}
} catch (Exception e) {
if (e instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
logFailedOperation(action, e, collName);
addExceptionToNamedList(action, e, results);
}
OverseerSolrResponse res = new OverseerSolrResponse(results);
// Following call marks success or failure depending on the contents of res
asyncTaskTracker.setTaskCompleted(asyncId, res);
return res;
}
}
/**
* All Config Set API commands are executed in separate threads so that the command can run to
* completion even if the client request times out.<br>
* This provides a behavior very similar to the one provided by Overseer based Config Set API
* execution.
*
* <p>The logic of this class is similar to (and simpler than) the one of {@link
* CollectionCommandRunner}, more detailed comments there.
*/
private class ConfigSetCommandRunner implements Callable<Void> {
private final ZkNodeProps message;
private final ConfigSetParams.ConfigSetAction action;
private final String configSetName;
private final String baseConfigSetName;
private ConfigSetCommandRunner(
ZkNodeProps message,
ConfigSetParams.ConfigSetAction action,
String configSetName,
String baseConfigSetName) {
this.message = message;
this.action = action;
this.configSetName = configSetName;
this.baseConfigSetName = baseConfigSetName;
}
/** This method does always free the Collection API lock it grabs. */
@Override
public Void call() throws IOException {
// After this call, we MUST execute the lock.release(); in the finally below
DistributedMultiLock lock =
new ConfigSetApiLockFactory(
new ZkDistributedConfigSetLockFactory(
ccc.getZkStateReader().getZkClient(), ZK_CONFIG_SET_LOCKS))
.createConfigSetApiLock(configSetName, baseConfigSetName);
try {
log.debug(
"ConfigSetCommandRunner about to acquire lock for action {} config set {} base config set {}",
action,
configSetName,
baseConfigSetName);
// Block this thread until all required locks are acquired.
lock.waitUntilAcquired();
log.debug("ConfigSetCommandRunner. Lock acquired. Calling: {}, {}", action, message);
switch (action) {
case CREATE:
ConfigSetCmds.createConfigSet(message, coreContainer);
break;
case DELETE:
ConfigSetCmds.deleteConfigSet(message, coreContainer);
break;
default:
throw new SolrException(
SolrException.ErrorCode.BAD_REQUEST, "Bug! Unknown Config Set action: " + action);
}
} finally {
lock.release();
}
return null;
}
}
}