blob: 4f3e28b38ac83bbb309a1f3243f939509f93ffe1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler;
import java.io.Closeable;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.stream.Collectors;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.cloud.ZkShardTerms;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.apache.solr.core.CloseHook;
import org.apache.solr.core.PluginBag;
import org.apache.solr.core.SolrCore;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestHandler;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.update.CdcrUpdateLog;
import org.apache.solr.update.SolrCoreState;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.VersionInfo;
import org.apache.solr.update.processor.DistributedUpdateProcessor;
import org.apache.solr.util.plugin.SolrCoreAware;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.handler.admin.CoreAdminHandler.COMPLETED;
import static org.apache.solr.handler.admin.CoreAdminHandler.FAILED;
import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE;
import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE_MESSAGE;
import static org.apache.solr.handler.admin.CoreAdminHandler.RESPONSE_STATUS;
import static org.apache.solr.handler.admin.CoreAdminHandler.RUNNING;
/**
* <p>
* This request handler implements the CDCR API and is responsible for the execution of the
* {@link CdcrReplicator} threads.
* </p>
* <p>
* It relies on three classes, {@link org.apache.solr.handler.CdcrLeaderStateManager},
* {@link org.apache.solr.handler.CdcrBufferStateManager} and {@link org.apache.solr.handler.CdcrProcessStateManager}
* to synchronise the state of the CDCR across all the nodes.
* </p>
* <p>
* The CDCR process can be either {@link org.apache.solr.handler.CdcrParams.ProcessState#STOPPED} or {@link org.apache.solr.handler.CdcrParams.ProcessState#STARTED} by using the
* actions {@link org.apache.solr.handler.CdcrParams.CdcrAction#STOP} and {@link org.apache.solr.handler.CdcrParams.CdcrAction#START} respectively. If a node is leader and the process
* state is {@link org.apache.solr.handler.CdcrParams.ProcessState#STARTED}, the {@link CdcrReplicatorManager} will
* start the {@link CdcrReplicator} threads. If a node becomes non-leader or if the process state becomes
* {@link org.apache.solr.handler.CdcrParams.ProcessState#STOPPED}, the {@link CdcrReplicator} threads are stopped.
* </p>
* <p>
* The CDCR can be switched to a "buffering" mode, in which the update log will never delete old transaction log
* files. Such a mode can be enabled or disabled using the action {@link org.apache.solr.handler.CdcrParams.CdcrAction#ENABLEBUFFER} and
* {@link org.apache.solr.handler.CdcrParams.CdcrAction#DISABLEBUFFER} respectively.
* </p>
* <p>
* Known limitations: The source and target clusters must have the same topology. Replication between clusters
* with a different number of shards will likely result in an inconsistent index.
* </p>
* @deprecated since 8.6
*/
@Deprecated
public class CdcrRequestHandler extends RequestHandlerBase implements SolrCoreAware {
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private SolrCore core;
private String collection;
private String shard;
private String path;
private SolrParams updateLogSynchronizerConfiguration;
private SolrParams replicatorConfiguration;
private SolrParams bufferConfiguration;
private Map<String, List<SolrParams>> replicasConfiguration;
private CdcrProcessStateManager processStateManager;
private CdcrBufferStateManager bufferStateManager;
private CdcrReplicatorManager replicatorManager;
private CdcrLeaderStateManager leaderStateManager;
private CdcrUpdateLogSynchronizer updateLogSynchronizer;
private CdcrBufferManager bufferManager;
/**
 * Reads the handler configuration from the init arguments: the optional update log
 * synchronizer, replicator and buffer sections, and every replica section, grouped by the
 * value of its source collection parameter.
 *
 * @param args the init arguments of the handler; when {@code null}, only the base class
 *             initialisation and the deprecation warning take place
 */
@Override
public void init(@SuppressWarnings({"rawtypes"})NamedList args) {
  super.init(args);
  log.warn("CDCR (in its current form) is deprecated as of 8.6 and shall be removed in 9.0. See SOLR-14022 for details.");
  if (args == null) {
    return;
  }

  // Configuration of the Update Log Synchronizer (instanceof already rejects null)
  Object synchronizerSection = args.get(CdcrParams.UPDATE_LOG_SYNCHRONIZER_PARAM);
  if (synchronizerSection instanceof NamedList) {
    updateLogSynchronizerConfiguration = ((NamedList) synchronizerSection).toSolrParams();
  }

  // Configuration of the Replicator
  Object replicatorSection = args.get(CdcrParams.REPLICATOR_PARAM);
  if (replicatorSection instanceof NamedList) {
    replicatorConfiguration = ((NamedList) replicatorSection).toSolrParams();
  }

  // Configuration of the Buffer
  Object bufferSection = args.get(CdcrParams.BUFFER_PARAM);
  if (bufferSection instanceof NamedList) {
    bufferConfiguration = ((NamedList) bufferSection).toSolrParams();
  }

  // Configuration of the Replicas: one list of parameter sets per source collection
  replicasConfiguration = new HashMap<>();
  @SuppressWarnings({"rawtypes"})
  List replicaSections = args.getAll(CdcrParams.REPLICA_PARAM);
  for (Object replicaSection : replicaSections) {
    if (!(replicaSection instanceof NamedList)) {
      continue;
    }
    SolrParams replicaParams = ((NamedList) replicaSection).toSolrParams();
    String sourceCollection = replicaParams.get(CdcrParams.SOURCE_COLLECTION_PARAM);
    replicasConfiguration.computeIfAbsent(sourceCollection, key -> new ArrayList<>()).add(replicaParams);
  }
}
/**
 * Entry point of the CDCR API: resolves the {@code action} request parameter and delegates to
 * the matching {@code handle*} method, then disables HTTP caching on the response.
 *
 * @param req the incoming request, carrying the action parameter
 * @param rsp the response populated by the selected action handler
 * @throws SolrException with {@code BAD_REQUEST} when the action parameter is missing or is
 *                       not resolved by {@code CdcrParams.CdcrAction.get}
 * @throws Exception     whatever the selected action handler throws
 */
@Override
public void handleRequestBody(SolrQueryRequest req, SolrQueryResponse rsp) throws Exception {
  // Pick the action
  final String actionName = req.getParams().get(CommonParams.ACTION);
  final CdcrParams.CdcrAction action =
      actionName == null ? null : CdcrParams.CdcrAction.get(actionName);
  if (action == null) {
    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Unknown action: " + actionName);
  }

  switch (action) {
    case START:
      handleStartAction(req, rsp);
      break;
    case STOP:
      handleStopAction(req, rsp);
      break;
    case STATUS:
      handleStatusAction(req, rsp);
      break;
    case COLLECTIONCHECKPOINT:
      handleCollectionCheckpointAction(req, rsp);
      break;
    case SHARDCHECKPOINT:
      handleShardCheckpointAction(req, rsp);
      break;
    case ENABLEBUFFER:
      handleEnableBufferAction(req, rsp);
      break;
    case DISABLEBUFFER:
      handleDisableBufferAction(req, rsp);
      break;
    case LASTPROCESSEDVERSION:
      handleLastProcessedVersionAction(req, rsp);
      break;
    case QUEUES:
      handleQueuesAction(req, rsp);
      break;
    case OPS:
      handleOpsAction(req, rsp);
      break;
    case ERRORS:
      handleErrorsAction(req, rsp);
      break;
    case BOOTSTRAP:
      handleBootstrapAction(req, rsp);
      break;
    case BOOTSTRAP_STATUS:
      handleBootstrapStatus(req, rsp);
      break;
    case CANCEL_BOOTSTRAP:
      handleCancelBootstrap(req, rsp);
      break;
    default:
      // only reachable for an action constant that has no case above
      throw new RuntimeException("Unknown action: " + action);
  }

  rsp.setHttpCaching(false);
}
/**
 * Binds the handler to its core: validates the environment, locates the path the handler is
 * registered under, creates the CDCR state managers and registers the close hook.
 *
 * @param core the core this handler belongs to
 * @throws SolrException with {@code SERVER_ERROR} when the core container is not ZooKeeper
 *                       aware, when the update log is not a {@link CdcrUpdateLog}, or when the
 *                       handler is not registered under a path starting with {@code '/'}
 */
@Override
public void inform(SolrCore core) {
  this.core = core;

  // Make sure that the core is ZKAware. This check must come before the cloud descriptor is
  // dereferenced below: the descriptor is only expected to exist in SolrCloud mode, so checking
  // afterwards would surface a NullPointerException instead of this explicit error.
  if (!core.getCoreContainer().isZooKeeperAware()) {
    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
        "Solr instance is not running in SolrCloud mode.");
  }

  collection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
  shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();

  // Make sure that the core is using the CdcrUpdateLog implementation
  if (!(core.getUpdateHandler().getUpdateLog() instanceof CdcrUpdateLog)) {
    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
        "Solr instance is not configured with the cdcr update log.");
  }

  // Find the registered path of the handler
  path = null;
  PluginBag<SolrRequestHandler> handlers = core.getRequestHandlers();
  for (Map.Entry<String, PluginBag.PluginHolder<SolrRequestHandler>> entry : handlers.getRegistry().entrySet()) {
    if (handlers.isLoaded(entry.getKey()) && entry.getValue().get() == this) {
      path = entry.getKey();
      break;
    }
  }
  if (path == null) {
    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
        "The CdcrRequestHandler is not registered with the current core.");
  }
  if (!path.startsWith("/")) {
    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
        "The CdcrRequestHandler needs to be registered to a path. Typically this is '/cdcr'");
  }

  // Initialisation phase
  // If the Solr cloud is being initialised, each CDCR node will start up in its default state, i.e., STOPPED
  // and non-leader. The leader state will be updated later, when all the Solr cores have been loaded.
  // If the Solr cloud has already been initialised, and the core is reloaded (i.e., because a node died or a new node
  // is added to the cluster), the CDCR node will synchronise its state with the global CDCR state that is stored
  // in zookeeper.

  // Initialise the buffer state manager
  bufferStateManager = new CdcrBufferStateManager(core, bufferConfiguration);
  // Initialise the process state manager
  processStateManager = new CdcrProcessStateManager(core);
  // Initialise the leader state manager
  leaderStateManager = new CdcrLeaderStateManager(core);

  // Initialise the replicator states manager
  replicatorManager = new CdcrReplicatorManager(core, path, replicatorConfiguration, replicasConfiguration);
  replicatorManager.setProcessStateManager(processStateManager);
  replicatorManager.setLeaderStateManager(leaderStateManager);
  // we need to inform it of a state event since the process and leader state
  // may have been synchronised during the initialisation
  replicatorManager.stateUpdate();

  // Initialise the update log synchronizer
  updateLogSynchronizer = new CdcrUpdateLogSynchronizer(core, path, updateLogSynchronizerConfiguration);
  updateLogSynchronizer.setLeaderStateManager(leaderStateManager);
  // we need to inform it of a state event since the leader state
  // may have been synchronised during the initialisation
  updateLogSynchronizer.stateUpdate();

  // Initialise the buffer manager
  bufferManager = new CdcrBufferManager(core);
  bufferManager.setLeaderStateManager(leaderStateManager);
  bufferManager.setBufferStateManager(bufferStateManager);
  // we need to inform it of a state event since the leader state
  // may have been synchronised during the initialisation
  bufferManager.stateUpdate();

  // register the close hook
  this.registerCloseHook(core);
}
/**
 * Registers a close hook on the given core so that the CDCR components created by this
 * handler are shut down just before the core closes.
 *
 * @param core the core to attach the hook to
 */
private void registerCloseHook(SolrCore core) {
  final CloseHook shutdownHook = new CloseHook() {
    @Override
    public void preClose(SolrCore closingCore) {
      log.info("Solr core is being closed - shutting down CDCR handler @ {}:{}", collection, shard);
      shutdownCdcrComponents();
    }

    @Override
    public void postClose(SolrCore closedCore) {
      // nothing to do after the core has been closed
    }
  };
  core.addCloseHook(shutdownHook);
}

/**
 * Shuts down the update log synchronizer, the replicator manager and the buffer, process and
 * leader state managers, in that order.
 */
private void shutdownCdcrComponents() {
  updateLogSynchronizer.shutdown();
  replicatorManager.shutdown();
  bufferStateManager.shutdown();
  processStateManager.shutdown();
  leaderStateManager.shutdown();
}
/**
 * <p>
 * Handles the START action: when the process state is currently STOPPED it is switched to
 * STARTED and the process state manager is asked to synchronize it. The current status is
 * added to the response in every case.
 * </p>
 * <p>
 * The process state manager is expected to notify the replicator states manager of the
 * change of state.
 * </p>
 */
private void handleStartAction(SolrQueryRequest req, SolrQueryResponse rsp) {
  final boolean currentlyStopped =
      CdcrParams.ProcessState.STOPPED == processStateManager.getState();
  if (currentlyStopped) {
    processStateManager.setState(CdcrParams.ProcessState.STARTED);
    processStateManager.synchronize();
  }

  final String statusKey = CdcrParams.CdcrAction.STATUS.toLower();
  rsp.add(statusKey, getStatus());
}
/**
 * Handles the STOP action: when the process state is currently STARTED it is switched to
 * STOPPED and the process state manager is asked to synchronize it. The current status is
 * added to the response in every case.
 */
private void handleStopAction(SolrQueryRequest req, SolrQueryResponse rsp) {
  final boolean currentlyStarted =
      CdcrParams.ProcessState.STARTED == processStateManager.getState();
  if (currentlyStarted) {
    processStateManager.setState(CdcrParams.ProcessState.STOPPED);
    processStateManager.synchronize();
  }

  final String statusKey = CdcrParams.CdcrAction.STATUS.toLower();
  rsp.add(statusKey, getStatus());
}
/**
 * Handles the STATUS action: adds the current process and buffer states to the response
 * without modifying them.
 */
private void handleStatusAction(SolrQueryRequest req, SolrQueryResponse rsp) {
  final String statusKey = CdcrParams.CdcrAction.STATUS.toLower();
  rsp.add(statusKey, getStatus());
}
/**
 * Builds the status section shared by several actions.
 *
 * @return a list with two entries, the lower-cased process state followed by the lower-cased
 *         buffer state, each keyed by the parameter name of its state enum
 */
@SuppressWarnings({"unchecked", "rawtypes"})
private NamedList getStatus() {
  final NamedList statusEntries = new NamedList();

  final String processStateKey = CdcrParams.ProcessState.getParam();
  statusEntries.add(processStateKey, processStateManager.getState().toLower());

  final String bufferStateKey = CdcrParams.BufferState.getParam();
  statusEntries.add(bufferStateKey, bufferStateManager.getState().toLower());

  return statusEntries;
}
/**
 * This action is generally executed on the target cluster in order to retrieve the latest update checkpoint.
 * This checkpoint is used on the source cluster to setup the
 * {@link org.apache.solr.update.CdcrUpdateLog.CdcrLogReader} of a shard leader. <br/>
 * This method will execute in parallel one
 * {@link org.apache.solr.handler.CdcrParams.CdcrAction#SHARDCHECKPOINT} request per shard leader. It will
 * then pick the lowest version number as checkpoint. Picking the lowest amongst all shards will ensure that we do not
 * pick a checkpoint that is ahead of the source cluster. This can occur when other shard leaders are sending new
 * updates to the target cluster while we are currently instantiating the
 * {@link org.apache.solr.update.CdcrUpdateLog.CdcrLogReader}.
 * This solution only works in scenarios where the topology of the source and target clusters are identical.
 *
 * @throws SolrException with {@code SERVER_ERROR} when the collection of this core is not found
 *                       in the cluster state, when the thread is interrupted, or when one of the
 *                       shard checkpoint requests fails
 */
private void handleCollectionCheckpointAction(SolrQueryRequest req, SolrQueryResponse rsp)
    throws IOException, SolrServerException {
  ZkController zkController = core.getCoreContainer().getZkController();
  try {
    zkController.getZkStateReader().forceUpdateCollection(collection);
  } catch (Exception e) {
    if (e instanceof InterruptedException) {
      // the broad catch must not hide an interruption from the caller
      Thread.currentThread().interrupt();
    }
    // best effort: fall back to the cluster state that is currently known
    log.warn("Error when updating cluster state", e);
  }

  ClusterState cstate = zkController.getClusterState();
  DocCollection docCollection = cstate.getCollectionOrNull(collection);
  if (docCollection == null) {
    // fail with an explicit error rather than a NullPointerException when iterating the shards
    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
        "Collection '" + collection + "' not found in the cluster state, unable to compute a checkpoint");
  }
  Collection<Slice> shards = docCollection.getActiveSlices();

  ExecutorService parallelExecutor = ExecutorUtil.newMDCAwareCachedThreadPool(new SolrNamedThreadFactory("parallelCdcrExecutor"));

  // stays at Long.MAX_VALUE when there is no active shard to query
  long checkpoint = Long.MAX_VALUE;
  try {
    List<Callable<Long>> callables = new ArrayList<>();
    for (Slice shard : shards) {
      ZkNodeProps leaderProps = zkController.getZkStateReader().getLeaderRetry(collection, shard.getName());
      ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(leaderProps);
      callables.add(new SliceCheckpointCallable(nodeProps.getCoreUrl(), path));
    }

    for (final Future<Long> future : parallelExecutor.invokeAll(callables)) {
      long version = future.get();
      if (version < checkpoint) { // we must take the lowest checkpoint from all the shards
        checkpoint = version;
      }
    }
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
        "Error while requesting shard's checkpoints", e);
  } catch (ExecutionException e) {
    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
        "Error while requesting shard's checkpoints", e);
  } finally {
    parallelExecutor.shutdown();
  }

  rsp.add(CdcrParams.CHECKPOINT, checkpoint);
}
/**
 * Retrieve the version number of the latest entry of the {@link org.apache.solr.update.UpdateLog}.
 * <p>
 * The checkpoint added to the response is the greater of the maximum version found in the recent
 * updates and the maximum version found in the index; a value of {@code 0} is reported as
 * {@code -1}.
 * </p>
 *
 * @throws SolrException with {@code BAD_REQUEST} when this replica is not the leader, or with
 *                       {@code SERVER_ERROR} when the maximum version cannot be read
 */
private void handleShardCheckpointAction(SolrQueryRequest req, SolrQueryResponse rsp) {
  if (!leaderStateManager.amILeader()) {
    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Action '" + CdcrParams.CdcrAction.SHARDCHECKPOINT +
        "' sent to non-leader replica");
  }

  UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
  VersionInfo versionInfo = ulog.getVersionInfo();
  try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
    long maxVersionFromRecent = recentUpdates.getMaxRecentVersion();
    long maxVersionFromIndex = versionInfo.getMaxVersionFromIndex(req.getSearcher());
    log.info("Found maxVersionFromRecent {} maxVersionFromIndex {}", maxVersionFromRecent, maxVersionFromIndex);
    // there is no race with ongoing bootstrap because we don't expect any updates to come from the source
    long maxVersion = Math.max(maxVersionFromIndex, maxVersionFromRecent);
    if (maxVersion == 0L) {
      maxVersion = -1;
    }
    rsp.add(CdcrParams.CHECKPOINT, maxVersion);
  } catch (IOException e) {
    // keep the original failure as the cause so that it is not lost when reporting the error
    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Action '" + CdcrParams.CdcrAction.SHARDCHECKPOINT +
        "' could not read max version", e);
  }
}
/**
 * Handles the ENABLEBUFFER action: when the buffer state is currently DISABLED it is switched
 * to ENABLED and the buffer state manager is asked to synchronize it. The current status is
 * added to the response in every case.
 */
private void handleEnableBufferAction(SolrQueryRequest req, SolrQueryResponse rsp) {
  final boolean currentlyDisabled =
      CdcrParams.BufferState.DISABLED == bufferStateManager.getState();
  if (currentlyDisabled) {
    bufferStateManager.setState(CdcrParams.BufferState.ENABLED);
    bufferStateManager.synchronize();
  }

  final String statusKey = CdcrParams.CdcrAction.STATUS.toLower();
  rsp.add(statusKey, getStatus());
}
/**
 * Handles the DISABLEBUFFER action: when the buffer state is currently ENABLED it is switched
 * to DISABLED and the buffer state manager is asked to synchronize it. The current status is
 * added to the response in every case.
 */
private void handleDisableBufferAction(SolrQueryRequest req, SolrQueryResponse rsp) {
  final boolean currentlyEnabled =
      CdcrParams.BufferState.ENABLED == bufferStateManager.getState();
  if (currentlyEnabled) {
    bufferStateManager.setState(CdcrParams.BufferState.DISABLED);
    bufferStateManager.synchronize();
  }

  final String statusKey = CdcrParams.CdcrAction.STATUS.toLower();
  rsp.add(statusKey, getStatus());
}
/**
 * <p>
 * We have to take care of four cases:
 * <ul>
 * <li>Replication &amp; Buffering</li>
 * <li>Replication &amp; No Buffering</li>
 * <li>No Replication &amp; Buffering</li>
 * <li>No Replication &amp; No Buffering</li>
 * </ul>
 * In the first three cases, at least one log reader should have been initialised. We should take the lowest
 * last processed version across all the initialised readers. In the last case, there isn't a log reader
 * initialised. We should instantiate one and get the version of the first entries.
 * </p>
 *
 * @throws SolrException with {@code BAD_REQUEST} when this replica is not the leader, or with
 *                       {@code SERVER_ERROR} when the temporary log reader of the fourth case
 *                       fails or the thread is interrupted
 */
private void handleLastProcessedVersionAction(SolrQueryRequest req, SolrQueryResponse rsp) {
  String collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
  String shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();

  if (!leaderStateManager.amILeader()) {
    log.warn("Action {} sent to non-leader replica @ {}:{}", CdcrParams.CdcrAction.LASTPROCESSEDVERSION, collectionName, shard);
    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Action " + CdcrParams.CdcrAction.LASTPROCESSEDVERSION +
        " sent to non-leader replica");
  }

  // take care of the first three cases
  // first check the log readers from the replicator states
  long lastProcessedVersion = Long.MAX_VALUE;
  for (CdcrReplicatorState state : replicatorManager.getReplicatorStates()) {
    // Read the reader exactly once: a second getLogReader() call could observe a different
    // (possibly null) reader if the replicator state is updated concurrently.
    CdcrUpdateLog.CdcrLogReader stateLogReader = state.getLogReader();
    if (stateLogReader != null) {
      lastProcessedVersion = Math.min(lastProcessedVersion, stateLogReader.getLastVersion());
    }
  }

  // next check the log reader of the buffer
  CdcrUpdateLog.CdcrLogReader bufferLogReader = ((CdcrUpdateLog) core.getUpdateHandler().getUpdateLog()).getBufferToggle();
  if (bufferLogReader != null) {
    lastProcessedVersion = Math.min(lastProcessedVersion, bufferLogReader.getLastVersion());
  }

  // the fourth case: no cdc replication, no buffering: all readers were null
  if (processStateManager.getState().equals(CdcrParams.ProcessState.STOPPED) &&
      bufferStateManager.getState().equals(CdcrParams.BufferState.DISABLED)) {
    CdcrUpdateLog.CdcrLogReader logReader = ((CdcrUpdateLog) core.getUpdateHandler().getUpdateLog()).newLogReader();
    try {
      // let the reader initialize lastVersion
      logReader.next();
      lastProcessedVersion = Math.min(lastProcessedVersion, logReader.getLastVersion());
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
          "Error while fetching the last processed version", e);
    } catch (IOException e) {
      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
          "Error while fetching the last processed version", e);
    } finally {
      logReader.close();
    }
  }

  log.debug("Returning the lowest last processed version {} @ {}:{}", lastProcessedVersion, collectionName, shard);
  rsp.add(CdcrParams.LAST_PROCESSED_VERSION, lastProcessedVersion);
}
/**
 * Handles the QUEUES action. For every replicator state the response reports the queue size
 * ({@code -1} when the log reader of that state is not initialised) and the timestamp of the
 * last processed operation, grouped by ZooKeeper host and then by target collection. The
 * total size and number of the update logs and the started/stopped state of the update log
 * synchronizer are added as well.
 */
@SuppressWarnings({"unchecked", "rawtypes"})
private void handleQueuesAction(SolrQueryRequest req, SolrQueryResponse rsp) {
  final NamedList queuesByHost = new NamedList();

  for (CdcrReplicatorState replicatorState : replicatorManager.getReplicatorStates()) {
    final NamedList stats = new NamedList();

    final CdcrUpdateLog.CdcrLogReader reader = replicatorState.getLogReader();
    if (reader != null) {
      stats.add(CdcrParams.QUEUE_SIZE, reader.getNumberOfRemainingRecords());
    } else {
      String collectionName = req.getCore().getCoreDescriptor().getCloudDescriptor().getCollectionName();
      String shard = req.getCore().getCoreDescriptor().getCloudDescriptor().getShardId();
      log.warn("The log reader for target collection {} is not initialised @ {}:{}",
          replicatorState.getTargetCollection(), collectionName, shard);
      stats.add(CdcrParams.QUEUE_SIZE, -1L);
    }
    stats.add(CdcrParams.LAST_TIMESTAMP, replicatorState.getTimestampOfLastProcessedOperation());

    // one nested list per ZooKeeper host, created on first use
    NamedList hostEntries = (NamedList) queuesByHost.get(replicatorState.getZkHost());
    if (hostEntries == null) {
      hostEntries = new NamedList();
      queuesByHost.add(replicatorState.getZkHost(), hostEntries);
    }
    hostEntries.add(replicatorState.getTargetCollection(), stats);
  }
  rsp.add(CdcrParams.QUEUES, queuesByHost);

  final UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
  rsp.add(CdcrParams.TLOG_TOTAL_SIZE, ulog.getTotalLogsSize());
  rsp.add(CdcrParams.TLOG_TOTAL_COUNT, ulog.getTotalLogsNumber());

  final CdcrParams.ProcessState synchronizerState = updateLogSynchronizer.isStarted()
      ? CdcrParams.ProcessState.STARTED
      : CdcrParams.ProcessState.STOPPED;
  rsp.add(CdcrParams.UPDATE_LOG_SYNCHRONIZER, synchronizerState.toLower());
}
/**
 * Handles the OPS action. For every replicator state the response reports the overall, add and
 * delete operations per second read from its benchmark timer, grouped by ZooKeeper host and
 * then by target collection.
 */
@SuppressWarnings({"unchecked", "rawtypes"})
private void handleOpsAction(SolrQueryRequest req, SolrQueryResponse rsp) {
  final NamedList opsByHost = new NamedList();

  for (CdcrReplicatorState replicatorState : replicatorManager.getReplicatorStates()) {
    final NamedList rates = new NamedList();
    rates.add(CdcrParams.COUNTER_ALL, replicatorState.getBenchmarkTimer().getOperationsPerSecond());
    rates.add(CdcrParams.COUNTER_ADDS, replicatorState.getBenchmarkTimer().getAddsPerSecond());
    rates.add(CdcrParams.COUNTER_DELETES, replicatorState.getBenchmarkTimer().getDeletesPerSecond());

    // one nested list per ZooKeeper host, created on first use
    NamedList hostEntries = (NamedList) opsByHost.get(replicatorState.getZkHost());
    if (hostEntries == null) {
      hostEntries = new NamedList();
      opsByHost.add(replicatorState.getZkHost(), hostEntries);
    }
    hostEntries.add(replicatorState.getTargetCollection(), rates);
  }

  rsp.add(CdcrParams.OPERATIONS_PER_SECOND, opsByHost);
}
/**
 * Handles the ERRORS action. For every replicator state the response reports the number of
 * consecutive errors, the error counts of the BAD_REQUEST and INTERNAL types, and the last
 * errors (first array element used as key, second as value), grouped by ZooKeeper host and
 * then by target collection.
 */
@SuppressWarnings({"unchecked", "rawtypes"})
private void handleErrorsAction(SolrQueryRequest req, SolrQueryResponse rsp) {
  final NamedList errorsByHost = new NamedList();

  for (CdcrReplicatorState replicatorState : replicatorManager.getReplicatorStates()) {
    final NamedList errorStats = new NamedList();
    errorStats.add(CdcrParams.CONSECUTIVE_ERRORS, replicatorState.getConsecutiveErrors());
    errorStats.add(CdcrReplicatorState.ErrorType.BAD_REQUEST.toLower(),
        replicatorState.getErrorCount(CdcrReplicatorState.ErrorType.BAD_REQUEST));
    errorStats.add(CdcrReplicatorState.ErrorType.INTERNAL.toLower(),
        replicatorState.getErrorCount(CdcrReplicatorState.ErrorType.INTERNAL));

    final NamedList recentErrors = new NamedList();
    for (String[] errorEntry : replicatorState.getLastErrors()) {
      recentErrors.add(errorEntry[0], errorEntry[1]);
    }
    errorStats.add(CdcrParams.LAST, recentErrors);

    // one nested list per ZooKeeper host, created on first use
    NamedList hostEntries = (NamedList) errorsByHost.get(replicatorState.getZkHost());
    if (hostEntries == null) {
      hostEntries = new NamedList();
      errorsByHost.add(replicatorState.getZkHost(), hostEntries);
    }
    hostEntries.add(replicatorState.getTargetCollection(), errorStats);
  }

  rsp.add(CdcrParams.ERRORS, errorsByHost);
}
/**
 * Handles the BOOTSTRAP action: submits a background task to the update executor that, while
 * holding the recovery lock, runs a {@link BootstrapCallable} on the recovery executor and
 * waits for it to finish. The request thread reports {@code submitted} and then waits up to
 * ten seconds on a latch, so that a subsequent BOOTSTRAP_STATUS request gives a correct
 * response.
 * <p>
 * The background task releases the latch on every path: once the bootstrap has been flagged
 * as running, when the recovery lock could not be acquired (in which case the current
 * bootstrap is cancelled), and when the node turns out not to be the leader any more.
 * </p>
 *
 * @throws SolrException        with {@code BAD_REQUEST} when this replica is not the leader
 * @throws InterruptedException when the request thread is interrupted while waiting on the latch
 */
private void handleBootstrapAction(SolrQueryRequest req, SolrQueryResponse rsp) throws IOException, InterruptedException, SolrServerException {
  String collectionName = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
  String shard = core.getCoreDescriptor().getCloudDescriptor().getShardId();
  if (!leaderStateManager.amILeader()) {
    log.warn("Action {} sent to non-leader replica @ {}:{}", CdcrParams.CdcrAction.BOOTSTRAP, collectionName, shard);
    throw new SolrException(SolrException.ErrorCode.BAD_REQUEST, "Action " + CdcrParams.CdcrAction.BOOTSTRAP +
        " sent to non-leader replica");
  }
  CountDownLatch latch = new CountDownLatch(1); // latch to make sure BOOTSTRAP_STATUS gives correct response

  Runnable runnable = () -> {
    Lock recoveryLock = req.getCore().getSolrCoreState().getRecoveryLock();
    boolean locked = recoveryLock.tryLock();
    SolrCoreState coreState = core.getSolrCoreState();
    try {
      if (!locked) {
        // the recovery lock is held elsewhere: cancel the current bootstrap instead of starting one
        handleCancelBootstrap(req, rsp);
      } else if (leaderStateManager.amILeader()) {
        coreState.setCdcrBootstrapRunning(true);
        latch.countDown(); // free the latch as current bootstrap is executing
        String leaderUrl = ReplicationHandler.getObjectWithBackwardCompatibility(req.getParams(), ReplicationHandler.LEADER_URL, ReplicationHandler.LEGACY_LEADER_URL, null);
        BootstrapCallable bootstrapCallable = new BootstrapCallable(leaderUrl, core);
        coreState.setCdcrBootstrapCallable(bootstrapCallable);
        Future<Boolean> bootstrapFuture = core.getCoreContainer().getUpdateShardHandler().getRecoveryExecutor()
            .submit(bootstrapCallable);
        coreState.setCdcrBootstrapFuture(bootstrapFuture);
        try {
          bootstrapFuture.get();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          log.warn("Bootstrap was interrupted", e);
        } catch (ExecutionException e) {
          log.error("Bootstrap operation failed", e);
        }
      } else {
        log.error("Action {} sent to non-leader replica @ {}:{}. Aborting bootstrap.", CdcrParams.CdcrAction.BOOTSTRAP, collectionName, shard);
      }
    } finally {
      if (locked) {
        coreState.setCdcrBootstrapRunning(false);
        recoveryLock.unlock();
      }
      // Release the waiting request thread on every path, including the one where the lock was
      // acquired but no bootstrap was started; counting down an already released latch has no
      // effect.
      latch.countDown();
    }
  };

  try {
    core.getCoreContainer().getUpdateShardHandler().getUpdateExecutor().submit(runnable);
    rsp.add(RESPONSE_STATUS, "submitted");
    latch.await(10000, TimeUnit.MILLISECONDS); // put the latch for current bootstrap command
  } catch (RejectedExecutionException ree) {
    // no problem, we're probably shutting down
    rsp.add(RESPONSE_STATUS, "failed");
  }
}
/**
 * Handles the CANCEL_BOOTSTRAP action: closes the bootstrap callable currently stored in the
 * core state and reports the status {@code cancelled}.
 */
private void handleCancelBootstrap(SolrQueryRequest req, SolrQueryResponse rsp) {
  final SolrCoreState coreState = core.getSolrCoreState();
  final BootstrapCallable currentBootstrap = (BootstrapCallable) coreState.getCdcrBootstrapCallable();
  // presumably a no-op when no bootstrap was ever registered (null callable)
  IOUtils.closeQuietly(currentBootstrap);
  rsp.add(RESPONSE_STATUS, "cancelled");
}
/**
 * Handles the BOOTSTRAP_STATUS action and reports one of the following statuses:
 * <ul>
 * <li>running: the core state flags a bootstrap as running, or the stored future is not done yet</li>
 * <li>{@code notfound}: no bootstrap future is stored in the core state</li>
 * <li>{@code cancelled}: the future was cancelled or the bootstrap callable was closed</li>
 * <li>completed or failed: the future is done, depending on its result or failure</li>
 * </ul>
 *
 * @throws SolrException with {@code SERVER_ERROR} when the thread is interrupted while reading
 *                       the result of the bootstrap
 */
private void handleBootstrapStatus(SolrQueryRequest req, SolrQueryResponse rsp) throws IOException, SolrServerException {
  SolrCoreState coreState = core.getSolrCoreState();
  if (coreState.getCdcrBootstrapRunning()) {
    rsp.add(RESPONSE_STATUS, RUNNING);
    return;
  }

  Future<Boolean> future = coreState.getCdcrBootstrapFuture();
  BootstrapCallable callable = (BootstrapCallable) coreState.getCdcrBootstrapCallable();
  if (future == null) {
    rsp.add(RESPONSE_STATUS, "notfound");
    rsp.add(RESPONSE_MESSAGE, "No bootstrap found in running, completed or failed states");
  } else if (future.isCancelled() || (callable != null && callable.isClosed())) {
    rsp.add(RESPONSE_STATUS, "cancelled");
  } else if (future.isDone()) {
    // could be a normal termination or an exception
    try {
      Boolean result = future.get();
      if (Boolean.TRUE.equals(result)) {
        rsp.add(RESPONSE_STATUS, COMPLETED);
      } else {
        rsp.add(RESPONSE_STATUS, FAILED);
      }
    } catch (InterruptedException e) {
      // not expected since the future is already done, but never swallow an interruption:
      // restore the flag and report the problem instead of returning a response without status
      Thread.currentThread().interrupt();
      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
          "Interrupted while fetching the bootstrap status", e);
    } catch (ExecutionException e) {
      rsp.add(RESPONSE_STATUS, FAILED);
      rsp.add(RESPONSE, e);
    } catch (CancellationException ce) {
      rsp.add(RESPONSE_STATUS, FAILED);
      rsp.add(RESPONSE_MESSAGE, "Bootstrap was cancelled");
    }
  } else {
    rsp.add(RESPONSE_STATUS, RUNNING);
  }
}
/**
 * Task that bootstraps the core from a leader: it buffers incoming updates, triggers a commit on
 * the leader, fetches the index through the {@link ReplicationHandler}, and then either applies
 * or drops the buffered updates. Closing the callable marks it as closed and aborts the fetch.
 */
static class BootstrapCallable implements Callable<Boolean>, Closeable {
  private final String leaderUrl;
  private final SolrCore core;
  // written by close(), read by call() and isClosed(), possibly from different threads
  private volatile boolean closed = false;

  BootstrapCallable(String leaderUrl, SolrCore core) {
    this.leaderUrl = leaderUrl;
    this.core = core;
  }

  /**
   * Marks this bootstrap as closed and aborts the fetch of the replication handler, when one is
   * registered under {@code ReplicationHandler.PATH}.
   */
  @Override
  public void close() throws IOException {
    closed = true;
    SolrRequestHandler handler = core.getRequestHandler(ReplicationHandler.PATH);
    ReplicationHandler replicationHandler = (ReplicationHandler) handler;
    // same guard as in call(): without a replication handler there is no fetch to abort
    if (replicationHandler != null) {
      replicationHandler.abortFetch();
    }
  }

  /**
   * @return {@code true} once {@link #close()} has been called
   */
  public boolean isClosed() {
    return closed;
  }

  /**
   * Runs the bootstrap.
   *
   * @return whether the index fetch was successful
   * @throws SolrException when no replication handler is registered or when the replay of the
   *                       buffered updates failed
   * @throws Exception     any other failure of the commit, the fetch or the replay
   */
  @Override
  public Boolean call() throws Exception {
    boolean success = false;
    UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
    // we start buffering updates as a safeguard however we do not expect
    // to receive any updates from the source during bootstrap
    ulog.bufferUpdates();
    try {
      commitOnLeader(leaderUrl);
      // use rep handler directly, so we can do this sync rather than async
      SolrRequestHandler handler = core.getRequestHandler(ReplicationHandler.PATH);
      ReplicationHandler replicationHandler = (ReplicationHandler) handler;

      if (replicationHandler == null) {
        throw new SolrException(SolrException.ErrorCode.SERVICE_UNAVAILABLE,
            "Skipping recovery, no " + ReplicationHandler.PATH + " handler found");
      }

      ModifiableSolrParams solrParams = new ModifiableSolrParams();
      solrParams.set(ReplicationHandler.LEGACY_LEADER_URL, leaderUrl);
      // we do not want the raw tlog files from the source
      solrParams.set(ReplicationHandler.TLOG_FILES, false);

      success = replicationHandler.doFetch(solrParams, false).getSuccessful();

      Future<UpdateLog.RecoveryInfo> future = ulog.applyBufferedUpdates();
      if (future == null) {
        // no replay needed
        log.info("No replay needed.");
      } else {
        log.info("Replaying buffered documents.");
        // wait for replay
        UpdateLog.RecoveryInfo report = future.get();
        if (report.failed) {
          SolrException.log(log, "Replay failed");
          throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Replay failed");
        }
      }
      if (success) {
        // make sure the term of this core is higher than the term of every other replica of the shard
        ZkController zkController = core.getCoreContainer().getZkController();
        String collectionName = core.getCoreDescriptor().getCollectionName();
        ClusterState clusterState = zkController.getZkStateReader().getClusterState();
        DocCollection collection = clusterState.getCollection(collectionName);
        Slice slice = collection.getSlice(core.getCoreDescriptor().getCloudDescriptor().getShardId());
        ZkShardTerms terms = zkController.getShardTerms(collectionName, slice.getName());
        String coreNodeName = core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName();
        Set<String> allExceptLeader = slice.getReplicas().stream().filter(replica -> !replica.getName().equals(coreNodeName)).map(Replica::getName).collect(Collectors.toSet());
        terms.ensureTermsIsHigher(coreNodeName, allExceptLeader);
      }
      return success;
    } finally {
      if (closed || !success) {
        // we cannot apply the buffer in this case because it will introduce newer versions in the
        // update log and then the source cluster will get those versions via collectioncheckpoint
        // causing the versions in between to be completely missed
        boolean dropped = ulog.dropBufferedUpdates();
        assert dropped;
      }
    }
  }

  /**
   * Sends a commit, flagged as commit end point and without opening a new searcher, to the core
   * at the given URL.
   */
  private void commitOnLeader(String leaderUrl) throws SolrServerException,
      IOException {
    try (HttpSolrClient client = new HttpSolrClient.Builder(leaderUrl)
        .withConnectionTimeout(30000)
        .build()) {
      UpdateRequest ureq = new UpdateRequest();
      ureq.setParams(new ModifiableSolrParams());
      ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
      ureq.getParams().set(UpdateParams.OPEN_SEARCHER, false);
      ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true).process(
          client);
    }
  }
}
/**
 * @return a short, fixed description of this request handler
 */
@Override
public String getDescription() {
return "Manage Cross Data Center Replication";
}
/**
 * @return {@code Category.REPLICATION}, the category this handler is reported under
 */
@Override
public Category getCategory() {
return Category.REPLICATION;
}
/**
 * A {@link Callable} executing a single
 * {@link org.apache.solr.handler.CdcrParams.CdcrAction#SHARDCHECKPOINT} action against one core
 * and returning the checkpoint found in its response.
 */
private static final class SliceCheckpointCallable implements Callable<Long> {

  /** URL of the core the request is sent to. */
  final String baseUrl;
  /** Path of the CDCR request handler on that core. */
  final String cdcrPath;

  SliceCheckpointCallable(final String baseUrl, final String cdcrPath) {
    this.baseUrl = baseUrl;
    this.cdcrPath = cdcrPath;
  }

  /**
   * @return the value stored under {@code CdcrParams.CHECKPOINT} in the response
   * @throws Exception when the request fails
   */
  @Override
  public Long call() throws Exception {
    final HttpSolrClient.Builder clientBuilder = new HttpSolrClient.Builder(baseUrl)
        .withConnectionTimeout(15000)
        .withSocketTimeout(60000);

    try (HttpSolrClient client = clientBuilder.build()) {
      final ModifiableSolrParams query = new ModifiableSolrParams();
      query.set(CommonParams.ACTION, CdcrParams.CdcrAction.SHARDCHECKPOINT.toString());

      @SuppressWarnings({"rawtypes"})
      final SolrRequest checkpointRequest = new QueryRequest(query);
      checkpointRequest.setPath(cdcrPath);

      @SuppressWarnings({"rawtypes"})
      final NamedList reply = client.request(checkpointRequest);
      final Object checkpoint = reply.get(CdcrParams.CHECKPOINT);
      return (Long) checkpoint;
    }
  }
}
}