package org.apache.solr.cloud;
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.store.Directory;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.impl.HttpSolrServer.HttpUriRequestResponse;
import org.apache.solr.client.solrj.request.AbstractUpdateRequest;
import org.apache.solr.client.solrj.request.CoreAdminRequest.WaitForState;
import org.apache.solr.client.solrj.request.UpdateRequest;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.ClosableThread;
import org.apache.solr.common.cloud.Slice;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.cloud.ZkStateReader;
import org.apache.solr.common.cloud.ZooKeeperException;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.UpdateParams;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.CoreDescriptor;
import org.apache.solr.core.DirectoryFactory.DirContext;
import org.apache.solr.core.RequestHandlers.LazyRequestHandlerWrapper;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.ReplicationHandler;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.request.SolrRequestHandler;
import org.apache.solr.request.SolrRequestInfo;
import org.apache.solr.response.SolrQueryResponse;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.PeerSync;
import org.apache.solr.update.UpdateLog;
import org.apache.solr.update.UpdateLog.RecoveryInfo;
import org.apache.solr.update.processor.DistributedUpdateProcessor;
import org.apache.solr.util.RefCounted;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
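/**
* Background thread that brings a core back into sync with its shard leader after
* it has missed updates. It publishes the RECOVERING state, asks the leader to
* acknowledge it, then attempts a lightweight PeerSync of recent versions; if that
* fails, it falls back to full index replication from the leader followed by a
* replay of updates buffered during the copy. Failed attempts are retried with
* exponential backoff, up to MAX_RETRIES.
*/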
public class RecoveryStrategy extends Thread implements ClosableThread {
private static final int MAX_RETRIES = 500;
private static final int INTERRUPTED = MAX_RETRIES + 1;
private static final int STARTING_RECOVERY_DELAY = 1000;
private static final String REPLICATION_HANDLER = "/replication";
private static final Logger log = LoggerFactory.getLogger(RecoveryStrategy.class);
public interface RecoveryListener {
void recovered();
void failed();
}
private volatile boolean close = false;
private RecoveryListener recoveryListener;
private ZkController zkController;
private String baseUrl;
private String coreZkNodeName;
private ZkStateReader zkStateReader;
private volatile String coreName;
private int retries;
private boolean recoveringAfterStartup;
private CoreContainer cc;
private volatile HttpUriRequest prevSendPreRecoveryHttpUriRequest;
public RecoveryStrategy(CoreContainer cc, CoreDescriptor cd, RecoveryListener recoveryListener) {
this.cc = cc;
this.coreName = cd.getName();
this.recoveryListener = recoveryListener;
setName("RecoveryThread");
zkController = cc.getZkController();
zkStateReader = zkController.getZkStateReader();
baseUrl = zkController.getBaseUrl();
coreZkNodeName = cd.getCloudDescriptor().getCoreNodeName();
}
public void setRecoveringAfterStartup(boolean recoveringAfterStartup) {
this.recoveringAfterStartup = recoveringAfterStartup;
}
// make sure any threads stop retrying
@Override
public void close() {
close = true;
// abort any in-flight prep-recovery request; a null check beats catching the NPE
HttpUriRequest req = prevSendPreRecoveryHttpUriRequest;
if (req != null) {
req.abort();
}
log.warn("Stopping recovery for core={} coreNodeName={}", coreName, coreZkNodeName);
}
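/**
* Gives up on recovery: publishes RECOVERY_FAILED to ZooKeeper, closes this
* thread, and notifies the listener.
*/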
private void recoveryFailed(final SolrCore core,
final ZkController zkController, final String baseUrl,
final String shardZkNodeName, final CoreDescriptor cd) throws KeeperException, InterruptedException {
SolrException.log(log, "Recovery failed - I give up. core=" + coreName);
try {
zkController.publish(cd, ZkStateReader.RECOVERY_FAILED);
} finally {
close();
recoveryListener.failed();
}
}
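/**
* Copies a full index from the leader. A hard commit is first sent to the leader
* so its index reflects everything it has accepted, then the local /replication
* handler is invoked directly so the fetch runs synchronously on this thread.
*/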
private void replicate(String nodeName, SolrCore core, ZkNodeProps leaderprops)
throws SolrServerException, IOException {
ZkCoreNodeProps leaderCNodeProps = new ZkCoreNodeProps(leaderprops);
String leaderUrl = leaderCNodeProps.getCoreUrl();
log.info("Attempting to replicate from " + leaderUrl + ". core=" + coreName);
// send commit
commitOnLeader(leaderUrl);
// use the replication handler directly, so we can run the fetch synchronously rather than async
SolrRequestHandler handler = core.getRequestHandler(REPLICATION_HANDLER);
if (handler instanceof LazyRequestHandlerWrapper) {
handler = ((LazyRequestHandlerWrapper) handler).getWrappedHandler();
}
// instanceof also covers the null case, and avoids a ClassCastException if a
// different handler type is registered at /replication
if (!(handler instanceof ReplicationHandler)) {
throw new SolrException(ErrorCode.SERVICE_UNAVAILABLE,
"Skipping recovery, no " + REPLICATION_HANDLER + " handler found");
}
ReplicationHandler replicationHandler = (ReplicationHandler) handler;
ModifiableSolrParams solrParams = new ModifiableSolrParams();
solrParams.set(ReplicationHandler.MASTER_URL, leaderUrl);
if (isClosed()) retries = INTERRUPTED;
boolean success = replicationHandler.doFetch(solrParams, false);
if (!success) {
throw new SolrException(ErrorCode.SERVER_ERROR,
"Replication for recovery failed.");
}
// solrcloud_debug
if (log.isDebugEnabled()) {
try {
RefCounted<SolrIndexSearcher> searchHolder = core
.getNewestSearcher(false);
SolrIndexSearcher searcher = searchHolder.get();
Directory dir = core.getDirectoryFactory().get(core.getIndexDir(), DirContext.META_DATA, null);
try {
log.debug(core.getCoreDescriptor().getCoreContainer()
.getZkController().getNodeName()
+ " replicated "
+ searcher.search(new MatchAllDocsQuery(), 1).totalHits
+ " from "
+ leaderUrl
+ " gen:"
+ core.getDeletionPolicy().getLatestCommit().getGeneration()
+ " data:" + core.getDataDir()
+ " index:" + core.getIndexDir()
+ " newIndex:" + core.getNewIndexDir()
+ " files:" + Arrays.asList(dir.listAll()));
} finally {
core.getDirectoryFactory().release(dir);
searchHolder.decref();
}
} catch (Exception e) {
throw new SolrException(ErrorCode.SERVER_ERROR, null, e);
}
}
}
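/**
* Sends a hard commit (openSearcher=false) to the leader so that all updates it
* has accepted are flushed to its index before we replicate from it.
*/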
private void commitOnLeader(String leaderUrl) throws SolrServerException,
IOException {
HttpSolrServer server = new HttpSolrServer(leaderUrl);
try {
server.setConnectionTimeout(30000);
UpdateRequest ureq = new UpdateRequest();
ureq.setParams(new ModifiableSolrParams());
ureq.getParams().set(DistributedUpdateProcessor.COMMIT_END_POINT, true);
ureq.getParams().set(UpdateParams.OPEN_SEARCHER, false);
ureq.setAction(AbstractUpdateRequest.ACTION.COMMIT, false, true).process(
server);
} finally {
server.shutdown();
}
}
@Override
public void run() {
// set request info for logging
try (SolrCore core = cc.getCore(coreName)) {
if (core == null) {
SolrException.log(log, "SolrCore not found - cannot recover:" + coreName);
return;
}
SolrQueryRequest req = new LocalSolrQueryRequest(core, new ModifiableSolrParams());
SolrQueryResponse rsp = new SolrQueryResponse();
SolrRequestInfo.setRequestInfo(new SolrRequestInfo(req, rsp));
log.info("Starting recovery process. core=" + coreName + " recoveringAfterStartup=" + recoveringAfterStartup);
try {
doRecovery(core);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
SolrException.log(log, "", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR, "",
e);
} catch (Exception e) {
log.error("", e);
throw new ZooKeeperException(SolrException.ErrorCode.SERVER_ERROR,
"", e);
}
} finally {
SolrRequestInfo.clearRequestInfo();
}
}
// TODO: perhaps make this grab a new core each time through the loop to handle core reloads?
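/**
* Main recovery routine. In outline:
* 1) collect recent versions from the update log (or the versions recorded at
* startup when recovering after a restart);
* 2) locate the leader, publish RECOVERING, and send a prep-recovery command;
* 3) on the first pass, try PeerSync; on success, commit and publish ACTIVE;
* 4) otherwise buffer updates, replicate the full index from the leader, replay
* the buffered updates, and publish ACTIVE;
* 5) on failure, retry with exponential backoff up to MAX_RETRIES.
*/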
public void doRecovery(SolrCore core) throws KeeperException, InterruptedException {
boolean replayed = false;
boolean successfulRecovery = false;
UpdateLog ulog = core.getUpdateHandler().getUpdateLog();
if (ulog == null) {
SolrException.log(log, "No UpdateLog found - cannot recover. core=" + coreName);
recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
core.getCoreDescriptor());
return;
}
boolean firstTime = true;
List<Long> recentVersions;
UpdateLog.RecentUpdates recentUpdates = null;
try {
recentUpdates = ulog.getRecentUpdates();
recentVersions = recentUpdates.getVersions(ulog.numRecordsToKeep);
} catch (Exception e) {
SolrException.log(log, "Corrupt tlog - ignoring. core=" + coreName, e);
recentVersions = new ArrayList<>(0);
} finally {
if (recentUpdates != null) {
recentUpdates.close();
}
}
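// When recovering after startup, compare the versions recorded as the core
// started against what is in the update log now, to log how many updates
// arrived while we were initializing.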
List<Long> startingVersions = ulog.getStartingVersions();
if (startingVersions != null && recoveringAfterStartup) {
try {
int oldIdx = 0; // index of the start of the old list in the current list
long firstStartingVersion = startingVersions.size() > 0 ? startingVersions.get(0) : 0;
for (; oldIdx < recentVersions.size(); oldIdx++) {
if (recentVersions.get(oldIdx) == firstStartingVersion) break;
}
if (oldIdx > 0) {
log.info("####### Found new versions added after startup: num="
+ oldIdx);
log.info("###### currentVersions=" + recentVersions);
}
log.info("###### startupVersions=" + startingVersions);
} catch (Exception e) {
SolrException.log(log, "Error getting recent versions. core=" + coreName, e);
recentVersions = new ArrayList<>(0);
}
}
if (recoveringAfterStartup) {
// if we're recovering after startup (i.e. we have been down), then we need to know what the last versions were
// when we went down. We may have received updates since then.
recentVersions = startingVersions;
try {
if ((ulog.getStartingOperation() & UpdateLog.FLAG_GAP) != 0) {
// The last operation at the time of startup had the GAP flag set;
// this means we were previously doing a full index replication that
// probably did not complete, while buffering updates in the meantime.
log.info("Looks like a previous replication recovery did not complete - skipping peer sync. core="
+ coreName);
firstTime = false; // skip peersync
}
} catch (Exception e) {
SolrException.log(log, "Error trying to get ulog starting operation. core="
+ coreName, e);
firstTime = false; // skip peersync
}
}
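// Retry loop: each pass locates the current shard leader, publishes our
// RECOVERING state, asks the leader to wait for that state, then attempts
// PeerSync (first pass only) followed by full replication if needed.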
while (!successfulRecovery && !isInterrupted() && !isClosed()) { // don't use interruption, or it will close channels
try {
CloudDescriptor cloudDesc = core.getCoreDescriptor()
.getCloudDescriptor();
ZkNodeProps leaderprops = zkStateReader.getLeaderRetry(
cloudDesc.getCollectionName(), cloudDesc.getShardId());
final String leaderBaseUrl = leaderprops.getStr(ZkStateReader.BASE_URL_PROP);
final String leaderCoreName = leaderprops.getStr(ZkStateReader.CORE_NAME_PROP);
String leaderUrl = ZkCoreNodeProps.getCoreUrl(leaderBaseUrl, leaderCoreName);
String ourUrl = ZkCoreNodeProps.getCoreUrl(baseUrl, coreName);
boolean isLeader = leaderUrl.equals(ourUrl);
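// Sanity check: if the cluster state's leader URL is ours but we do not
// locally believe we are the leader, ZK state is stale; fail this attempt and retry.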
if (isLeader && !cloudDesc.isLeader()) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Cloud state still says we are leader.");
}
if (cloudDesc.isLeader()) {
// we are now the leader - no one else must have been suitable
log.warn("We have not yet recovered - but we are now the leader! core=" + coreName);
log.info("Finished recovery process. core=" + coreName);
zkController.publish(core.getCoreDescriptor(), ZkStateReader.ACTIVE);
return;
}
log.info("Publishing state of core "+core.getName()+" as recovering, leader is "+leaderUrl+" and I am "+ourUrl);
zkController.publish(core.getCoreDescriptor(), ZkStateReader.RECOVERING);
final Slice slice = zkStateReader.getClusterState().getSlice(cloudDesc.getCollectionName(), cloudDesc.getShardId());
// abort any prep-recovery request left over from a previous attempt
HttpUriRequest prevReq = prevSendPreRecoveryHttpUriRequest;
if (prevReq != null) {
prevReq.abort();
}
if (isClosed()) {
log.info("Recovery was cancelled");
break;
}
sendPrepRecoveryCmd(leaderBaseUrl, leaderCoreName, slice);
if (isClosed()) {
log.info("Recovery was cancelled");
break;
}
// wait a bit so that any updates that started on the leader before it saw
// our recovering state are sure to have finished
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// first thing we just try to sync
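// PeerSync exchanges recent version lists with the leader and fetches only the
// missed updates; it can succeed only if this replica is close enough to the
// leader (on the order of ulog.numRecordsToKeep updates behind).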
if (firstTime) {
firstTime = false; // only try sync the first time through the loop
log.info("Attempting to PeerSync from " + leaderUrl + " core=" + coreName + " - recoveringAfterStartup="+recoveringAfterStartup);
PeerSync peerSync = new PeerSync(core,
Collections.singletonList(leaderUrl), ulog.numRecordsToKeep, false, false);
peerSync.setStartingVersions(recentVersions);
boolean syncSuccess = peerSync.sync();
if (syncSuccess) {
SolrQueryRequest req = new LocalSolrQueryRequest(core,
new ModifiableSolrParams());
// force open a new searcher
core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
log.info("PeerSync Recovery was successful - registering as Active. core=" + coreName);
// solrcloud_debug
if (log.isDebugEnabled()) {
try {
RefCounted<SolrIndexSearcher> searchHolder = core
.getNewestSearcher(false);
SolrIndexSearcher searcher = searchHolder.get();
try {
log.debug(core.getCoreDescriptor()
.getCoreContainer().getZkController().getNodeName()
+ " synched "
+ searcher.search(new MatchAllDocsQuery(), 1).totalHits);
} finally {
searchHolder.decref();
}
} catch (Exception e) {
throw new SolrException(ErrorCode.SERVER_ERROR, null, e);
}
}
// sync success - register as active and return
zkController.publish(core.getCoreDescriptor(),
ZkStateReader.ACTIVE);
successfulRecovery = true;
close = true;
return;
}
log.info("PeerSync Recovery was not successful - trying replication. core=" + coreName);
}
if (isClosed()) {
log.info("Recovery was cancelled");
break;
}
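// Fall back to full replication: buffer incoming updates in the transaction log
// while the index is copied from the leader, then replay them afterwards.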
log.info("Starting Replication Recovery. core=" + coreName);
log.info("Begin buffering updates. core=" + coreName);
ulog.bufferUpdates();
replayed = false;
try {
replicate(zkController.getNodeName(), core, leaderprops);
if (isClosed()) {
log.info("Recovery was cancelled");
break;
}
replay(core);
replayed = true;
if (isClosed()) {
log.info("Recovery was cancelled");
break;
}
log.info("Replication Recovery was successful - registering as Active. core=" + coreName);
// if there are pending recovery requests, don't advertise as active
zkController.publish(core.getCoreDescriptor(), ZkStateReader.ACTIVE);
close = true;
successfulRecovery = true;
recoveryListener.recovered();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Recovery was interrupted", e);
retries = INTERRUPTED;
} catch (Exception e) {
SolrException.log(log, "Error while trying to recover", e);
} finally {
if (!replayed) {
try {
ulog.dropBufferedUpdates();
} catch (Exception e) {
SolrException.log(log, "", e);
}
}
}
} catch (Exception e) {
SolrException.log(log, "Error while trying to recover. core=" + coreName, e);
}
if (!successfulRecovery) {
// let's pause for a moment and then try again...
// TODO: we don't want to retry for some problems?
// Or do a fall off retry...
try {
log.error("Recovery failed - trying again... (" + retries + ") core=" + coreName);
if (isClosed()) {
retries = INTERRUPTED;
}
retries++;
if (retries >= MAX_RETRIES) {
if (retries >= INTERRUPTED) {
SolrException.log(log, "Recovery failed - interrupted. core="
+ coreName);
try {
recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
core.getCoreDescriptor());
} catch (Exception e) {
SolrException.log(log,
"Could not publish that recovery failed", e);
}
} else {
SolrException.log(log,
"Recovery failed - max retries exceeded (" + retries + "). core=" + coreName);
try {
recoveryFailed(core, zkController, baseUrl, coreZkNodeName,
core.getCoreDescriptor());
} catch (Exception e) {
SolrException.log(log,
"Could not publish that recovery failed", e);
}
}
break;
}
} catch (Exception e) {
SolrException.log(log, "core=" + coreName, e);
}
try {
// exponential backoff: 2^retries seconds, capped at 600 seconds (ten minutes),
// slept in one-second slices so a close() is noticed promptly
double loopCount = Math.min(Math.pow(2, retries), 600);
log.info("Wait {} seconds before trying to recover again ({})", loopCount, retries);
for (int i = 0; i < loopCount; i++) {
if (isClosed()) break; // check if someone closed us
Thread.sleep(STARTING_RECOVERY_DELAY);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Recovery was interrupted. core=" + coreName, e);
retries = INTERRUPTED;
}
}
}
log.info("Finished recovery process. core=" + coreName);
}
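/**
* Applies any updates that were buffered while the index was being copied from
* the leader, blocking until the replay completes.
*/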
private Future<RecoveryInfo> replay(SolrCore core)
throws InterruptedException, ExecutionException {
Future<RecoveryInfo> future = core.getUpdateHandler().getUpdateLog().applyBufferedUpdates();
if (future == null) {
// no replay needed
log.info("No replay needed. core=" + coreName);
} else {
log.info("Replaying buffered documents. core=" + coreName);
// wait for replay
RecoveryInfo report = future.get();
if (report.failed) {
SolrException.log(log, "Replay failed");
throw new SolrException(ErrorCode.SERVER_ERROR, "Replay failed");
}
}
// solrcloud_debug
if (log.isDebugEnabled()) {
try {
RefCounted<SolrIndexSearcher> searchHolder = core
.getNewestSearcher(false);
SolrIndexSearcher searcher = searchHolder.get();
try {
log.debug(core.getCoreDescriptor().getCoreContainer()
.getZkController().getNodeName()
+ " replayed "
+ searcher.search(new MatchAllDocsQuery(), 1).totalHits);
} finally {
searchHolder.decref();
}
} catch (Exception e) {
throw new SolrException(ErrorCode.SERVER_ERROR, null, e);
}
}
return future;
}
@Override
public boolean isClosed() {
return close;
}
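/**
* Asks the leader to wait until it sees this replica as live and in the
* RECOVERING state before we sync, so that updates the leader forwards from
* that point on are guaranteed to reach this replica.
*/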
private void sendPrepRecoveryCmd(String leaderBaseUrl, String leaderCoreName, Slice slice)
throws SolrServerException, IOException, InterruptedException, ExecutionException {
HttpSolrServer server = new HttpSolrServer(leaderBaseUrl);
try {
server.setConnectionTimeout(30000);
WaitForState prepCmd = new WaitForState();
prepCmd.setCoreName(leaderCoreName);
prepCmd.setNodeName(zkController.getNodeName());
prepCmd.setCoreNodeName(coreZkNodeName);
prepCmd.setState(ZkStateReader.RECOVERING);
prepCmd.setCheckLive(true);
prepCmd.setOnlyIfLeader(true);
if (!Slice.CONSTRUCTION.equals(slice.getState()) && !Slice.RECOVERY.equals(slice.getState())) {
prepCmd.setOnlyIfLeaderActive(true);
}
HttpUriRequestResponse mrr = server.httpUriRequest(prepCmd);
prevSendPreRecoveryHttpUriRequest = mrr.httpUriRequest;
log.info("Sending prep recovery command to {}; {}", leaderBaseUrl, prepCmd.toString());
mrr.future.get();
} finally {
server.shutdown();
}
}
}