blob: e62075ecea9fda9d1e6eac5b4f26e7a239c5a6de [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.update;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.List;
import java.util.Set;
import com.codahale.metrics.Counter;
import com.codahale.metrics.Timer;
import org.apache.http.client.HttpClient;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.response.QueryResponse;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.SolrCore;
import org.apache.solr.core.SolrInfoBean;
import org.apache.solr.metrics.SolrMetricManager;
import org.apache.solr.metrics.SolrMetricProducer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.common.params.CommonParams.DISTRIB;
import static org.apache.solr.update.PeerSync.MissedUpdatesRequest;
import static org.apache.solr.update.PeerSync.absComparator;
import static org.apache.solr.update.PeerSync.percentile;
public class PeerSyncWithLeader implements SolrMetricProducer {
// Class logger, resolved from the enclosing class through MethodHandles.
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
// Sampled once when the instance is created; a later change of the log level is not picked up.
private boolean debug = log.isDebugEnabled();
// URL of the leader: target of the HTTP client and part of most log messages.
private String leaderUrl;
// Number of recent versions requested from the local update log and from the leader.
private int nUpdates;
// Update handler of the core; msg() reads the core name and container through it.
private UpdateHandler uhandler;
// Update log obtained from uhandler; sync() fails immediately when it is null.
private UpdateLog ulog;
// Client used for every request to the leader; closed in the finally block of sync().
private HttpSolrClient clientToLeader;
// False only when the system property solr.disableFingerprint is exactly "true".
private boolean doFingerprint;
// Core being synced; used for fingerprint computation and metric registration.
private SolrCore core;
// Applies the collected updates in handleUpdates().
private PeerSync.Updater updater;
// Created in doSync() and used by buildMissedUpdatesRequest().
private MissedUpdatesFinder missedUpdatesFinder;
// Versions of buffered updates captured in sync(); looked up again in handleUpdates().
private Set<Long> bufferedUpdates;
// metrics
// Times the part of sync() that runs after the "already in sync" shortcut.
private Timer syncTime;
// Incremented on every failed sync attempt.
private Counter syncErrors;
// Incremented when sync() returns early because the fingerprints already match.
private Counter syncSkipped;
/**
 * Creates a syncer for the given core against the given leader.
 *
 * <p>Fingerprint checking is enabled unless the system property
 * {@code solr.disableFingerprint} is exactly {@code "true"}. The instance registers itself
 * as a metric producer on the core's metric manager under the REPLICATION category.
 *
 * @param core the local core to bring in sync
 * @param leaderUrl URL of the leader to sync with
 * @param nUpdates number of recent versions to consider
 */
public PeerSyncWithLeader(SolrCore core, String leaderUrl, int nUpdates) {
  // plain configuration
  this.core = core;
  this.leaderUrl = leaderUrl;
  this.nUpdates = nUpdates;
  doFingerprint = !"true".equals(System.getProperty("solr.disableFingerprint"));

  // update infrastructure of the core
  uhandler = core.getUpdateHandler();
  ulog = uhandler.getUpdateLog();

  // client to the leader, built on the http client of the update shard handler
  HttpClient sharedHttpClient = core.getCoreContainer().getUpdateShardHandler().getDefaultHttpClient();
  clientToLeader = new HttpSolrClient.Builder(leaderUrl)
      .withHttpClient(sharedHttpClient)
      .build();

  // msg() reads uhandler, so this has to come after uhandler has been assigned
  updater = new PeerSync.Updater(msg(), core);

  core.getCoreMetricManager().registerMetricProducer(SolrInfoBean.Category.REPLICATION.toString(), this);
}
/** Last path element passed to the metric manager for every metric of this class. */
public static final String METRIC_SCOPE = "peerSync";

/**
 * Creates the timer and the two counters used by this class.
 *
 * <p>All three are requested from {@code manager} in the given {@code registry} with the
 * path {@code (scope, METRIC_SCOPE)}; the {@code tag} argument is not used.
 */
@Override
public void initializeMetrics(SolrMetricManager manager, String registry, String tag, String scope) {
  this.syncTime = manager.timer(null, registry, "time", scope, METRIC_SCOPE);
  this.syncErrors = manager.counter(null, registry, "errors", scope, METRIC_SCOPE);
  this.syncSkipped = manager.counter(null, registry, "skipped", scope, METRIC_SCOPE);
}
// Prefix for PeerSync log messages: carries the core name (for correlation) and the
// base URL reported by the ZkController, or an empty URL when there is no ZkController.
private String msg() {
  final ZkController zk = uhandler.core.getCoreContainer().getZkController();
  final String baseUrl = (zk == null) ? "" : zk.getBaseUrl();
  return "PeerSync: core=" + uhandler.core.getName() + " url=" + baseUrl + " ";
}
/**
 * Sync with leader.
 *
 * <p>Fails immediately when there is no update log or when {@code startingVersions} is
 * empty. Otherwise, if fingerprinting is enabled and the leader's fingerprint already
 * matches ours, the sync is skipped and reported as a success. In the remaining case our
 * recent versions are merged with {@code startingVersions} and the missed updates are
 * fetched from the leader.
 *
 * <p>Side effects: {@code startingVersions} is sorted in place, and once the initial checks
 * have passed the client to the leader is closed before this method returns.
 *
 * @param startingVersions : recent versions on startup
 * @return result of PeerSync with leader
 */
public PeerSync.PeerSyncResult sync(List<Long> startingVersions){
  if (ulog == null) {
    syncErrors.inc();
    return PeerSync.PeerSyncResult.failure();
  }
  if (startingVersions.isEmpty()) {
    log.warn("no frame of reference to tell if we've missed updates");
    syncErrors.inc();
    return PeerSync.PeerSyncResult.failure();
  }
  Timer.Context timerContext = null;
  try {
    if (log.isInfoEnabled()) {
      log.info("{} START leader={} nUpdates={}", msg(), leaderUrl, nUpdates);
    }
    if (debug) {
      log.debug("{} startingVersions={} {}", msg(), startingVersions.size(), startingVersions);
    }
    // check if we already in sync to begin with
    if (doFingerprint && alreadyInSync()) {
      syncSkipped.inc();
      return PeerSync.PeerSyncResult.success();
    }
    // measure only when actual sync is performed
    timerContext = syncTime.time();
    List<Long> ourUpdates;
    try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
      ourUpdates = recentUpdates.getVersions(nUpdates);
      bufferedUpdates = recentUpdates.getBufferUpdates();
    }
    if (ourUpdates.isEmpty()) {
      // Nothing local to overlap with startingVersions. Without this check the
      // get(size - 1) below would throw instead of reporting a failed sync.
      log.warn("{} no recent updates found in our update log - unable to verify overlap with startingVersions", msg());
      syncErrors.inc();
      return PeerSync.PeerSyncResult.failure();
    }
    ourUpdates.sort(absComparator);
    startingVersions.sort(absComparator);
    long ourLowThreshold = percentile(startingVersions, 0.8f);
    long ourHighThreshold = percentile(startingVersions, 0.2f);
    // now make sure that the starting updates overlap our updates
    // there shouldn't be reorders, so any overlap will do.
    long smallestNewUpdate = Math.abs(ourUpdates.get(ourUpdates.size() - 1));
    if (Math.abs(startingVersions.get(0)) < smallestNewUpdate) {
      log.warn("{} too many updates received since start - startingUpdates no longer overlaps with our currentUpdates", msg());
      syncErrors.inc();
      return PeerSync.PeerSyncResult.failure();
    }
    // let's merge the lists: add the starting versions that are older than everything we currently hold
    for (Long ver : startingVersions) {
      if (Math.abs(ver) < smallestNewUpdate) {
        ourUpdates.add(ver);
      }
    }
    boolean success = doSync(ourUpdates, ourLowThreshold, ourHighThreshold);
    if (log.isInfoEnabled()) {
      log.info("{} DONE. sync {}", msg(), (success ? "succeeded" : "failed"));
    }
    if (!success) {
      syncErrors.inc();
    }
    return success ? PeerSync.PeerSyncResult.success() : PeerSync.PeerSyncResult.failure();
  } finally {
    // timerContext stays null on the early exits taken before the timer was started
    if (timerContext != null) {
      timerContext.close();
    }
    try {
      clientToLeader.close();
    } catch (IOException e) {
      log.warn("{} unable to close client to leader", msg(), e);
    }
  }
}
// Performs the actual sync: fetches the leader's recent versions (and fingerprint), works out
// which updates we miss, requests and applies them, and finally verifies the fingerprint
// when fingerprinting is enabled. Returns true only if we end up in sync.
private boolean doSync(List<Long> ourUpdates, long ourLowThreshold, long ourHighThreshold) {
  // The leader's versions are fetched after our own buffered updates were captured; per the
  // original author's note this makes the leader's versions a superset of bufferedUpdates.
  final NamedList<Object> leaderRsp = getVersions();
  final IndexFingerprint leaderFingerprint = getFingerprint(leaderRsp);

  if (doFingerprint) {
    if (leaderFingerprint == null) {
      log.warn("Could not get fingerprint from the leader");
      return false;
    }
    log.info("Leader fingerprint {}", leaderFingerprint);
  }

  missedUpdatesFinder = new MissedUpdatesFinder(ourUpdates, msg(), nUpdates, ourLowThreshold);
  final MissedUpdatesRequest toRequest = buildMissedUpdatesRequest(leaderRsp);

  if (toRequest == MissedUpdatesRequest.ALREADY_IN_SYNC) {
    return true;
  }
  if (toRequest == MissedUpdatesRequest.UNABLE_TO_SYNC) {
    return false;
  }

  final NamedList<Object> updatesRsp = requestUpdates(toRequest);
  if (!handleUpdates(updatesRsp, toRequest.totalRequestedUpdates, leaderFingerprint)) {
    return false;
  }

  // with fingerprinting enabled the applied updates must reproduce the leader's fingerprint
  return !doFingerprint || compareFingerprint(leaderFingerprint);
}
// Turns the leader's "versions" response into a request for the updates we miss.
// Returns UNABLE_TO_SYNC when the leader reports no versions. When the finder has nothing to
// request, the result is UNABLE_TO_SYNC with fingerprinting enabled and ALREADY_IN_SYNC
// otherwise.
private MissedUpdatesRequest buildMissedUpdatesRequest(NamedList<Object> rsp) {
  // the last N versions as reported by the leader
  @SuppressWarnings({"unchecked"})
  List<Long> leaderVersions = (List<Long>)rsp.get("versions");

  if (log.isInfoEnabled()) {
    log.info("{} Received {} versions from {}", msg(), leaderVersions.size(), leaderUrl);
  }
  if (leaderVersions.isEmpty()) {
    return MissedUpdatesRequest.UNABLE_TO_SYNC;
  }

  MissedUpdatesRequest found = missedUpdatesFinder.find(leaderVersions, leaderUrl);
  if (found != MissedUpdatesRequest.EMPTY) {
    return found;
  }
  return doFingerprint ? MissedUpdatesRequest.UNABLE_TO_SYNC : MissedUpdatesRequest.ALREADY_IN_SYNC;
}
// Asks the leader's /get handler for the updates described by the given request
// (non-distributed, onlyIfActive=false, skipDbq=true) and returns the raw response.
private NamedList<Object> requestUpdates(MissedUpdatesRequest missedUpdatesRequest) {
  final ModifiableSolrParams getUpdatesParams = new ModifiableSolrParams();
  getUpdatesParams.set("qt", "/get");
  getUpdatesParams.set(DISTRIB, false);
  getUpdatesParams.set("getUpdates", missedUpdatesRequest.versionsAndRanges);
  getUpdatesParams.set("onlyIfActive", false);
  getUpdatesParams.set("skipDbq", true);

  if (log.isInfoEnabled()) {
    log.info("{} Requesting updates from {} n={} versions={}", msg(), leaderUrl
        , missedUpdatesRequest.totalRequestedUpdates, missedUpdatesRequest.versionsAndRanges);
  }

  return request(getUpdatesParams, "Failed on getting missed updates from the leader");
}
// Applies the updates returned by the leader together with our own buffered updates.
//
// rsp                 response of the getUpdates request; its "updates" entry is used
// numRequestedUpdates number of updates that were asked for
// leaderFingerprint   fingerprint of the leader, or null when none is available
//
// Returns false when the response carries no updates entry, when fewer updates than
// requested came back, or when applying the updates fails; true otherwise.
private boolean handleUpdates(NamedList<Object> rsp, long numRequestedUpdates, IndexFingerprint leaderFingerprint) {
  // missed updates from leader, it does not contains updates from bufferedUpdates
  @SuppressWarnings({"unchecked"})
  List<Object> updates = (List<Object>)rsp.get("updates");
  if (updates == null) {
    // report a failed sync instead of failing with a NullPointerException below
    log.error("{} Response from {} did not contain any updates", msg(), leaderUrl);
    return false;
  }
  if (updates.size() < numRequestedUpdates) {
    log.error("{} Requested {} updated from {} but retrieved {}", msg(), numRequestedUpdates, leaderUrl, updates.size());
    return false;
  }
  // by apply buffering update, replica will have fingerprint equals to leader.
  try (UpdateLog.RecentUpdates recentUpdates = ulog.getRecentUpdates()) {
    for (Long bufferUpdate : bufferedUpdates) {
      // updater will sort updates before apply
      updates.add(recentUpdates.lookup(bufferUpdate));
    }
  }
  // Leader will compute its fingerprint, then retrieve its recent updates versions.
  // There are a case that some updates (gap) get into recent versions but do not exist in index (fingerprint).
  // If the gap do not contains DBQ or DBI, it is safe to use leaderFingerprint.maxVersionEncountered as a cut point.
  // TODO leader should do fingerprint and retrieve recent updates version in atomic
  if (leaderFingerprint != null) {
    final long leaderMaxVersion = leaderFingerprint.getMaxVersionEncountered();
    // each update entry is a list: element 0 holds the operation flags, element 1 the version
    boolean existDBIOrDBQInTheGap = updates.stream().anyMatch(e -> {
      @SuppressWarnings({"unchecked"})
      List<Object> u = (List<Object>) e;
      long version = (Long) u.get(1);
      int oper = (Integer)u.get(0) & UpdateLog.OPERATION_MASK;
      // only DBI or DBQ in the gap (above) will satisfy this predicate
      return version > leaderMaxVersion && (oper == UpdateLog.DELETE || oper == UpdateLog.DELETE_BY_QUERY);
    });
    if (!existDBIOrDBQInTheGap) {
      // it is safe to use leaderFingerprint.maxVersionEncountered as cut point now.
      updates.removeIf(e -> {
        @SuppressWarnings({"unchecked"})
        List<Object> u = (List<Object>) e;
        long version = (Long) u.get(1);
        return version > leaderMaxVersion;
      });
    }
  }
  try {
    updater.applyUpdates(updates, leaderUrl);
  } catch (Exception e) {
    // the failure is still reported as "sync failed", but the reason is no longer lost
    log.warn("{} Failed to apply updates from {}", msg(), leaderUrl, e);
    return false;
  }
  return true;
}
// Sends the given params to the leader as a POST query request and returns the response.
//
// params  request parameters
// onFail  message of the SolrException thrown when the request fails
//
// Throws SolrException (SERVER_ERROR) when the response carries an exception or when the
// request fails with a SolrServerException or IOException; the underlying exception is
// attached as the cause so that the reason of the failure is not lost.
private NamedList<Object> request(ModifiableSolrParams params, String onFail) {
  try {
    QueryResponse rsp = new QueryRequest(params, SolrRequest.METHOD.POST).process(clientToLeader);
    Exception exception = rsp.getException();
    if (exception != null) {
      throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, onFail, exception);
    }
    return rsp.getResponse();
  } catch (SolrServerException | IOException e) {
    throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, onFail, e);
  }
}
// Asks the leader's /get handler (non-distributed) for its nUpdates most recent versions;
// the "fingerprint" parameter carries the value of doFingerprint.
private NamedList<Object> getVersions() {
  final ModifiableSolrParams versionsParams = new ModifiableSolrParams();
  versionsParams.set("qt", "/get");
  versionsParams.set(DISTRIB, false);
  versionsParams.set("getVersions", nUpdates);
  versionsParams.set("fingerprint", doFingerprint);

  return request(versionsParams, "Failed to get recent versions from leader");
}
// Requests the leader's fingerprint (getFingerprint=Long.MAX_VALUE, non-distributed) and
// reports whether it equals our own fingerprint.
private boolean alreadyInSync() {
  final ModifiableSolrParams fingerprintParams = new ModifiableSolrParams();
  fingerprintParams.set("qt", "/get");
  fingerprintParams.set(DISTRIB, false);
  fingerprintParams.set("getFingerprint", String.valueOf(Long.MAX_VALUE));

  final NamedList<Object> leaderRsp = request(fingerprintParams, "Failed to get fingerprint from leader");
  return compareFingerprint(getFingerprint(leaderRsp));
}
// Extracts the "fingerprint" entry of a response and converts it to an IndexFingerprint.
// Returns null when the response itself or its fingerprint entry is null.
private IndexFingerprint getFingerprint(NamedList<Object> rsp) {
  if (rsp == null) {
    return null;
  }
  final Object raw = rsp.get("fingerprint");
  return (raw == null) ? null : IndexFingerprint.fromObject(raw);
}
// Compares the leader's fingerprint with the fingerprint of our own core.
//
// Returns true only when both fingerprints compare as equal. Returns false when the leader's
// fingerprint is null, when the fingerprints differ, or when our fingerprint cannot be
// computed because of an IOException (the exception is logged with the warning).
private boolean compareFingerprint(IndexFingerprint leaderFingerprint) {
  if (leaderFingerprint == null) {
    // this class only talks to the leader, so that is the node that failed to answer
    log.warn("Leader did not return a fingerprint - possibly an older Solr version or exception");
    return false;
  }
  try {
    IndexFingerprint ourFingerprint = IndexFingerprint.getFingerprint(core, Long.MAX_VALUE);
    int cmp = IndexFingerprint.compare(leaderFingerprint, ourFingerprint);
    log.info("Fingerprint comparison result: {}" , cmp);
    if (cmp != 0) {
      log.info("Leader fingerprint: {}, Our fingerprint: {}", leaderFingerprint , ourFingerprint);
    }
    return cmp == 0; // currently, we only check for equality...
  } catch (IOException e) {
    log.warn("Could not confirm if we are already in sync. Continue with PeerSync", e);
  }
  return false;
}
/**
 * Helper class that compares our updates with the leader's versions to find the updates
 * that we missed.
 */
public static class MissedUpdatesFinder extends PeerSync.MissedUpdatesFinderBase {
  // first element of ourUpdates as handed to the constructor; find() compares its absolute
  // value with the leader's last (after sorting) version
  private final long ourHighest;
  // prefix put in front of every log message of this finder
  private final String logPrefix;
  // upper bound for the number of updates that may be requested
  private final long nUpdates;

  /**
   * @param ourUpdates our own versions; must not be empty because the first element is read
   * @param logPrefix prefix for log messages
   * @param nUpdates maximum number of updates that may be requested
   * @param ourLowThreshold passed on to the base class
   */
  MissedUpdatesFinder(List<Long> ourUpdates, String logPrefix, long nUpdates,
                      long ourLowThreshold) {
    super(ourUpdates, ourLowThreshold);
    this.logPrefix = logPrefix;
    this.ourHighest = ourUpdates.get(0);
    this.nUpdates = nUpdates;
  }

  /**
   * Works out which of the leader's versions have to be requested.
   *
   * @param leaderVersions versions reported by the leader; sorted in place, must not be empty
   * @param updateFrom where the versions came from, used for logging only
   * @return the request for the missed updates, or
   *         {@link MissedUpdatesRequest#UNABLE_TO_SYNC} when our versions are too old or
   *         more than {@code nUpdates} updates would have to be requested
   */
  public MissedUpdatesRequest find(List<Long> leaderVersions, Object updateFrom) {
    leaderVersions.sort(absComparator);
    // arguments follow the format string: first the source, then the versions
    log.debug("{} sorted versions from {} = {}", logPrefix, updateFrom, leaderVersions);
    long leaderLowest = leaderVersions.get(leaderVersions.size() - 1);
    if (Math.abs(ourHighest) < Math.abs(leaderLowest)) {
      log.info("{} Our versions are too old comparing to leader, ourHighest={} otherLowest={}", logPrefix, ourHighest, leaderLowest);
      return MissedUpdatesRequest.UNABLE_TO_SYNC;
    }
    // we don't have to check the case we ahead of the leader.
    // (maybe we are the old leader and we contain some updates that no one have)
    // In that case, we will fail on compute fingerprint with the current leader and start segments replication
    boolean completeList = leaderVersions.size() < nUpdates;
    MissedUpdatesRequest updatesRequest = handleVersionsWithRanges(leaderVersions, completeList);
    if (updatesRequest.totalRequestedUpdates > nUpdates) {
      log.info("{} PeerSync will fail because number of missed updates is more than:{}", logPrefix, nUpdates);
      return MissedUpdatesRequest.UNABLE_TO_SYNC;
    }
    if (updatesRequest == MissedUpdatesRequest.EMPTY) {
      log.info("{} No additional versions requested", logPrefix);
    }
    return updatesRequest;
  }
}
}