blob: 52590ee254e527a1b38f734db07737778a2312ff [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler;
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpSolrClient;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.cloud.ClusterState;
import org.apache.solr.common.cloud.DocCollection;
import org.apache.solr.common.cloud.ZkCoreNodeProps;
import org.apache.solr.common.cloud.ZkNodeProps;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.SolrCore;
import org.apache.solr.update.CdcrUpdateLog;
import org.apache.solr.common.util.SolrNamedThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>
* Synchronize periodically the update log of non-leader nodes with their leaders.
* </p>
* <p>
* Non-leader nodes must always buffer updates in case of leader failures. They have to periodically
* synchronize their update logs with their leader to remove old transaction logs that will never be used anymore.
* This is performed by a background thread that is scheduled with a fixed delay. The background thread is sending
* the action {@link org.apache.solr.handler.CdcrParams.CdcrAction#LASTPROCESSEDVERSION} to the leader to retrieve
* the lowest last version number processed. This version is then used to move forward the buffer log reader.
* </p>
*/
class CdcrUpdateLogSynchronizer implements CdcrStateManager.CdcrStateObserver {
private CdcrLeaderStateManager leaderStateManager;
private ScheduledExecutorService scheduler;
private final SolrCore core;
private final String collection;
private final String shardId;
private final String path;
private int timeSchedule = DEFAULT_TIME_SCHEDULE;
private static final int DEFAULT_TIME_SCHEDULE = 60000; // by default, every minute
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
CdcrUpdateLogSynchronizer(SolrCore core, String path, SolrParams updateLogSynchonizerConfiguration) {
this.core = core;
this.path = path;
this.collection = core.getCoreDescriptor().getCloudDescriptor().getCollectionName();
this.shardId = core.getCoreDescriptor().getCloudDescriptor().getShardId();
if (updateLogSynchonizerConfiguration != null) {
this.timeSchedule = updateLogSynchonizerConfiguration.getInt(CdcrParams.SCHEDULE_PARAM, DEFAULT_TIME_SCHEDULE);
}
}
void setLeaderStateManager(final CdcrLeaderStateManager leaderStateManager) {
this.leaderStateManager = leaderStateManager;
this.leaderStateManager.register(this);
}
@Override
public void stateUpdate() {
// If I am not the leader, I need to synchronise periodically my update log with my leader.
if (!leaderStateManager.amILeader()) {
scheduler = Executors.newSingleThreadScheduledExecutor(new SolrNamedThreadFactory("cdcr-update-log-synchronizer"));
scheduler.scheduleWithFixedDelay(new UpdateLogSynchronisation(), 0, timeSchedule, TimeUnit.MILLISECONDS);
return;
}
this.shutdown();
}
boolean isStarted() {
return scheduler != null;
}
void shutdown() {
if (scheduler != null) {
// interrupts are often dangerous in Lucene / Solr code, but the
// test for this will leak threads without
scheduler.shutdownNow();
scheduler = null;
}
}
private class UpdateLogSynchronisation implements Runnable {
private String getLeaderUrl() {
ZkController zkController = core.getCoreContainer().getZkController();
ClusterState cstate = zkController.getClusterState();
DocCollection docCollection = cstate.getCollection(collection);
ZkNodeProps leaderProps = docCollection.getLeader(shardId);
if (leaderProps == null) { // we might not have a leader yet, returns null
return null;
}
ZkCoreNodeProps nodeProps = new ZkCoreNodeProps(leaderProps);
return nodeProps.getCoreUrl();
}
@Override
public void run() {
try {
String leaderUrl = getLeaderUrl();
if (leaderUrl == null) { // we might not have a leader yet, stop and try again later
return;
}
HttpSolrClient server = new HttpSolrClient.Builder(leaderUrl)
.withConnectionTimeout(15000)
.withSocketTimeout(60000)
.build();
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(CommonParams.ACTION, CdcrParams.CdcrAction.LASTPROCESSEDVERSION.toString());
@SuppressWarnings({"rawtypes"})
SolrRequest request = new QueryRequest(params);
request.setPath(path);
long lastVersion;
try {
@SuppressWarnings({"rawtypes"})
NamedList response = server.request(request);
lastVersion = (Long) response.get(CdcrParams.LAST_PROCESSED_VERSION);
if (log.isDebugEnabled()) {
log.debug("My leader {} says its last processed _version_ number is: {}. I am {}", leaderUrl, lastVersion,
core.getCoreDescriptor().getCloudDescriptor().getCoreNodeName());
}
} catch (IOException | SolrServerException e) {
log.warn("Couldn't get last processed version from leader {}: ", leaderUrl, e);
return;
} finally {
try {
server.close();
} catch (IOException ioe) {
log.warn("Caught exception trying to close client to {}: ", leaderUrl, ioe);
}
}
// if we received -1, it means that the log reader on the leader has not yet started to read log entries
// do nothing
if (lastVersion == -1) {
return;
}
try {
CdcrUpdateLog ulog = (CdcrUpdateLog) core.getUpdateHandler().getUpdateLog();
if (ulog.isBuffering()) {
log.debug("Advancing replica buffering tlog reader to {} @ {}:{}", lastVersion, collection, shardId);
ulog.getBufferToggle().seek(lastVersion);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Couldn't advance replica buffering tlog reader to {} (to remove old tlogs): ", lastVersion, e);
} catch (IOException e) {
log.warn("Couldn't advance replica buffering tlog reader to {} (to remove old tlogs): ", lastVersion, e);
}
} catch (Throwable e) {
log.warn("Caught unexpected exception", e);
throw e;
}
}
}
}