blob: 06f671602147cba58deab23bbd2e809360ac773b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.cloud;
import java.lang.invoke.MethodHandles;
import org.apache.lucene.index.IndexCommit;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.SolrConfig;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.IndexFetcher;
import org.apache.solr.handler.ReplicationHandler;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.update.SolrIndexWriter;
import org.apache.solr.update.UpdateLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Manages a dedicated polling {@link ReplicationHandler} that fetches index changes from the shard
 * leader for a single core, looked up by name in a {@link CoreContainer}.
 */
public class ReplicateFromLeader {
  private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  private final CoreContainer cc;
  private final String coreName;

  /** The polling handler created by {@link #startReplication(boolean)}; null until then. */
  private volatile ReplicationHandler replicationProcess;

  /**
   * Commit command version last seen by this instance: read from the latest commit when
   * replication starts and updated after every transaction log switch.
   */
  private volatile long lastVersion = 0;

  /**
   * @param cc container used to look up the core each time replication is started
   * @param coreName name of the core that should replicate from its leader
   */
  public ReplicateFromLeader(CoreContainer cc, String coreName) {
    this.cc = cc;
    this.coreName = coreName;
  }

  /**
   * Start a replication handler thread that will periodically pull indices from the shard leader
   *
   * <p>This is separate from the ReplicationHandler that listens at /replication, used for recovery
   * and leader actions. It is simpler to discard the entire polling ReplicationHandler rather than
   * worrying about disabling polling and correctly setting all of the leader bits if we need to reset.
   *
   * <p>TODO: It may be cleaner to extract the polling logic and use that directly instead of
   * creating what might be a fairly heavyweight instance here.
   *
   * <p>If the core cannot be found and the container is shut down, this method returns silently.
   *
   * @param switchTransactionLog if true, ReplicationHandler will rotate the transaction log once
   *     the replication is done
   * @throws SolrException if the core cannot be found while the container is still running
   */
  public void startReplication(boolean switchTransactionLog) {
    try (SolrCore core = cc.getCore(coreName)) {
      if (core == null) {
        if (cc.isShutDown()) {
          return;
        }
        throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "SolrCore not found:" + coreName + " in "
            + CloudUtil.getLoadedCoreNamesAsString(cc));
      }

      String pollIntervalStr = determinePollInterval(core.getSolrConfig().getUpdateHandlerInfo());
      log.info("Will start replication from leader with poll interval: {}", pollIntervalStr );

      // don't commit on leader version zero for PULL replicas as PULL should only get its index state from leader;
      // for every other replica type the flag simply follows switchTransactionLog
      boolean skipCommitOnLeaderVersionZero = switchTransactionLog || isPullReplica(core);

      NamedList<Object> followerConfig = new NamedList<>();
      followerConfig.add("fetchFromLeader", Boolean.TRUE);
      followerConfig.add(ReplicationHandler.SKIP_COMMIT_ON_LEADER_VERSION_ZERO, skipCommitOnLeaderVersionZero);
      followerConfig.add("pollInterval", pollIntervalStr);

      NamedList<Object> replicationConfig = new NamedList<>();
      replicationConfig.add("follower", followerConfig);

      // Remember the version we already have so the first successful poll that brings nothing
      // new does not trigger a transaction log switch.
      String lastCommitVersion = getCommitVersion(core);
      if (lastCommitVersion != null) {
        lastVersion = Long.parseLong(lastCommitVersion);
      }

      // Publish the handler before init/inform so stopReplication() can shut it down even if
      // initialization fails part way through.
      ReplicationHandler handler = new ReplicationHandler();
      replicationProcess = handler;

      if (switchTransactionLog) {
        handler.setPollListener((solrCore, fetchResult) -> {
          if (fetchResult != IndexFetcher.IndexFetchResult.INDEX_FETCH_SUCCESS) {
            return;
          }
          // Use the core handed to the listener rather than the one captured from the enclosing
          // try-with-resources: that reference is released as soon as startReplication returns.
          String commitVersion = getCommitVersion(solrCore);
          if (commitVersion == null) {
            return;
          }
          long fetchedVersion = Long.parseLong(commitVersion);
          if (fetchedVersion == lastVersion) {
            return; // nothing new was fetched since the last switch
          }
          UpdateLog updateLog = solrCore.getUpdateHandler().getUpdateLog();
          SolrQueryRequest req = new LocalSolrQueryRequest(solrCore, new ModifiableSolrParams());
          CommitUpdateCommand cuc = new CommitUpdateCommand(req, false);
          cuc.setVersion(fetchedVersion);
          updateLog.commitAndSwitchToNewTlog(cuc);
          lastVersion = fetchedVersion;
        });
      }

      handler.init(replicationConfig);
      handler.inform(core);
    }
  }

  /**
   * Reads the commit command version stored in the user data of the core's latest index commit.
   *
   * @param solrCore core whose deletion policy supplies the latest commit
   * @return the stored version string, or null if none is recorded or the commit data cannot be
   *     read (in which case a warning is logged)
   */
  public static String getCommitVersion(SolrCore solrCore) {
    IndexCommit commit = solrCore.getDeletionPolicy().getLatestCommit();
    try {
      return commit.getUserData().get(SolrIndexWriter.COMMIT_COMMAND_VERSION);
    } catch (Exception e) {
      log.warn("Cannot get commit command version from index commit point ",e);
      return null;
    }
  }

  /**
   * Chooses the poll interval: half of the hard auto-commit time if configured, otherwise half of
   * the soft auto-commit time if configured, otherwise a fixed default (shorter when the
   * {@code jetty.testMode} system property is set).
   */
  private static String determinePollInterval(SolrConfig.UpdateHandlerInfo uinfo) {
    if (uinfo.autoCommmitMaxTime != -1) {
      return toPollIntervalStr(uinfo.autoCommmitMaxTime / 2);
    }
    if (uinfo.autoSoftCommmitMaxTime != -1) {
      return toPollIntervalStr(uinfo.autoSoftCommmitMaxTime / 2);
    }
    return System.getProperty("jetty.testMode") != null ? "00:00:01" : "00:00:03";
  }

  /** Returns true if the cluster state lists this core's replica with type PULL. */
  private boolean isPullReplica(SolrCore core) {
    CloudDescriptor cloudDescriptor = core.getCoreDescriptor().getCloudDescriptor();
    if (cloudDescriptor == null) {
      return false;
    }
    Replica replica =
        cc.getZkController().getZkStateReader().getCollection(cloudDescriptor.getCollectionName())
            .getSlice(cloudDescriptor.getShardId()).getReplica(cloudDescriptor.getCoreNodeName());
    return replica != null && replica.getType() == Replica.Type.PULL;
  }

  /**
   * Formats a duration in milliseconds as {@code h:m:s} (fields are not zero-padded).
   *
   * <p>The result is never shorter than one second. Without the clamp, any value below 1000 ms
   * (e.g. half of a 1.5 s auto-commit time) would truncate to {@code 0:0:0}.
   * NOTE(review): a zero pollInterval is believed to make ReplicationHandler skip scheduling the
   * poll task entirely, which would leave the replica never fetching - confirm against
   * ReplicationHandler's pollInterval handling.
   */
  private static String toPollIntervalStr(int ms) {
    int totalSec = Math.max(1, ms / 1000);
    int hour = totalSec / 3600;
    int min = (totalSec % 3600) / 60;
    int sec = totalSec % 60;
    return hour + ":" + min + ":" + sec;
  }

  /** Shuts down the polling handler, if one has been started. */
  public void stopReplication() {
    // Snapshot the volatile field so the null check and the call see the same instance.
    ReplicationHandler handler = replicationProcess;
    if (handler != null) {
      handler.shutdown();
    }
  }
}