/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler;
import com.google.common.base.Strings;
import org.apache.lucene.codecs.CodecUtil;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.FilterDirectory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.solr.client.solrj.SolrRequest;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.Http2SolrClient;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.InputStreamResponseParser;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.client.solrj.util.AsyncListener;
import org.apache.solr.client.solrj.util.Cancellable;
import org.apache.solr.cloud.CloudDescriptor;
import org.apache.solr.cloud.ZkController;
import org.apache.solr.common.AlreadyClosedException;
import org.apache.solr.common.ParWork;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.SolrException.ErrorCode;
import org.apache.solr.common.cloud.Replica;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.util.FastInputStream;
import org.apache.solr.common.util.IOUtils;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.common.util.SuppressForbidden;
import org.apache.solr.core.CoreContainer;
import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.core.DirectoryFactory.DirContext;
import org.apache.solr.core.IndexDeletionPolicyWrapper;
import org.apache.solr.core.SolrCore;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.util.FileUtils;
import org.apache.solr.util.PropertiesOutputStream;
import org.apache.solr.util.RTimer;
import org.apache.solr.util.RefCounted;
import org.apache.solr.util.TestInjection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.solr.common.params.CommonParams.JAVABIN;
import static org.apache.solr.common.params.CommonParams.NAME;
import static org.apache.solr.handler.ReplicationHandler.ALIAS;
import static org.apache.solr.handler.ReplicationHandler.CHECKSUM;
import static org.apache.solr.handler.ReplicationHandler.CMD_DETAILS;
import static org.apache.solr.handler.ReplicationHandler.CMD_GET_FILE;
import static org.apache.solr.handler.ReplicationHandler.CMD_GET_FILE_LIST;
import static org.apache.solr.handler.ReplicationHandler.CMD_INDEX_VERSION;
import static org.apache.solr.handler.ReplicationHandler.COMMAND;
import static org.apache.solr.handler.ReplicationHandler.COMPRESSION;
import static org.apache.solr.handler.ReplicationHandler.CONF_FILES;
import static org.apache.solr.handler.ReplicationHandler.CONF_FILE_SHORT;
import static org.apache.solr.handler.ReplicationHandler.EXTERNAL;
import static org.apache.solr.handler.ReplicationHandler.FETCH_FROM_LEADER;
import static org.apache.solr.handler.ReplicationHandler.FILE;
import static org.apache.solr.handler.ReplicationHandler.FILE_STREAM;
import static org.apache.solr.handler.ReplicationHandler.FileInfo;
import static org.apache.solr.handler.ReplicationHandler.GENERATION;
import static org.apache.solr.handler.ReplicationHandler.INTERNAL;
import static org.apache.solr.handler.ReplicationHandler.MASTER_URL;
import static org.apache.solr.handler.ReplicationHandler.OFFSET;
import static org.apache.solr.handler.ReplicationHandler.SIZE;
import static org.apache.solr.handler.ReplicationHandler.SKIP_COMMIT_ON_MASTER_VERSION_ZERO;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.UncheckedIOException;
import java.io.Writer;
import java.lang.invoke.MethodHandles;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.FileStore;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.BooleanSupplier;
import java.util.function.Function;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
import java.util.zip.InflaterInputStream;
/**
* <p> Provides functionality for downloading changed index files as well as config files from the master, along with a
* timer for scheduling periodic fetches. </p>
*
*
* @since solr 1.4
*/
public class IndexFetcher {
private static final int _10K = 10000;
public static final String INDEX_PROPERTIES = "index.properties";
private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private String masterUrl;
final ReplicationHandler replicationHandler;
private volatile long replicationStartTimeStamp;
private RTimer replicationTimer;
private final SolrCore solrCore;
private volatile List<Map<String, Object>> filesToDownload;
private volatile List<Map<String, Object>> confFilesToDownload;
private volatile List<Map<String, Object>> filesDownloaded;
private volatile List<Map<String, Object>> confFilesDownloaded;
private volatile Map<String, Object> currentFile;
private volatile DirectoryFileFetcher dirFileFetcher;
private volatile LocalFsFileFetcher localFileFetcher;
private volatile ExecutorService fsyncService;
private volatile Future<?> fsyncServiceFuture;
private volatile boolean stop = false;
private volatile boolean abort = false;
private boolean useInternalCompression = false;
private boolean useExternalCompression = false;
final boolean fetchFromLeader;
private final Http2SolrClient solrClient;
private Integer connTimeout;
private Integer soTimeout;
private boolean skipCommitOnMasterVersionZero = false;
private boolean clearLocalIndexFirst = false;
private static final String INTERRUPT_RESPONSE_MESSAGE = "Interrupted while waiting for modify lock";
private final Map<String,Cancellable> fileFetchRequests = new ConcurrentHashMap<>();
public static class IndexFetchResult {
private final String message;
private final boolean successful;
private final Throwable exception;
public static final String FAILED_BY_INTERRUPT_MESSAGE = "Fetching index failed by interrupt";
public static final String FAILED_BY_EXCEPTION_MESSAGE = "Fetching index failed by exception";
/** pre-defined results */
public static final IndexFetchResult ALREADY_IN_SYNC = new IndexFetchResult("Local index commit is already in sync with peer", true, null);
public static final IndexFetchResult INDEX_FETCH_FAILURE = new IndexFetchResult("Fetching latest index failed", false, null);
public static final IndexFetchResult INDEX_FETCH_SUCCESS = new IndexFetchResult("Fetching latest index succeeded", true, null);
public static final IndexFetchResult LOCK_OBTAIN_FAILED = new IndexFetchResult("Obtaining SnapPuller lock failed", false, null);
public static final IndexFetchResult CONTAINER_IS_SHUTTING_DOWN = new IndexFetchResult("I was asked to replicate but CoreContainer is shutting down", false, null);
public static final IndexFetchResult MASTER_VERSION_ZERO = new IndexFetchResult("Index on peer is empty and has never been committed", true, null);
public static final IndexFetchResult NO_INDEX_COMMIT_EXIST = new IndexFetchResult("No IndexCommit in local index", false, null);
public static final IndexFetchResult PEER_INDEX_COMMIT_DELETED = new IndexFetchResult("No files to download because IndexCommit in peer was deleted", false, null);
public static final IndexFetchResult LOCAL_ACTIVITY_DURING_REPLICATION = new IndexFetchResult("Local index modification during replication", false, null);
public static final IndexFetchResult EXPECTING_NON_LEADER = new IndexFetchResult("Replicating from leader but I'm the shard leader", false, null);
public static final IndexFetchResult LEADER_IS_NOT_ACTIVE = new IndexFetchResult("Replicating from leader but leader is not active", false, null);
IndexFetchResult(String message, boolean successful, Throwable exception) {
this.message = message;
this.successful = successful;
this.exception = exception;
}
/**
* @return exception thrown if failed by exception or interrupt, otherwise null
*/
public Throwable getException() {
return this.exception;
}
/**
* @return true if index fetch was successful, false otherwise
*/
public boolean getSuccessful() {
return this.successful;
}
public String getMessage() {
return this.message;
}
}
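/*
* Illustrative sketch (not part of this class): how a caller might consume an
* IndexFetchResult. The "fetcher" variable here is hypothetical.
*
* IndexFetchResult result = fetcher.fetchLatestIndex(false);
* if (!result.getSuccessful()) {
* Throwable cause = result.getException(); // null unless failed by exception or interrupt
* log.warn("Replication failed: {}", result.getMessage(), cause);
* }
*/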
// private static HttpClient createHttpClient(SolrCore core, String httpBasicAuthUser, String httpBasicAuthPassword, boolean useCompression) {
// final ModifiableSolrParams httpClientParams = new ModifiableSolrParams();
// httpClientParams.set(HttpClientUtil.PROP_BASIC_AUTH_USER, httpBasicAuthUser);
// httpClientParams.set(HttpClientUtil.PROP_BASIC_AUTH_PASS, httpBasicAuthPassword);
// httpClientParams.set(HttpClientUtil.PROP_ALLOW_COMPRESSION, useCompression);
//
// return HttpClientUtil.createClient(httpClientParams, core.getCoreContainer().getUpdateShardHandler().getDefaultConnectionManager(), true);
// }
public IndexFetcher(@SuppressWarnings({"rawtypes"})final NamedList initArgs, final ReplicationHandler handler, final SolrCore sc) {
solrCore = sc;
Object fetchFromLeader = initArgs.get(FETCH_FROM_LEADER);
if (fetchFromLeader instanceof Boolean) {
this.fetchFromLeader = (boolean) fetchFromLeader;
} else {
this.fetchFromLeader = false;
}
Object skipCommitOnMasterVersionZero = initArgs.get(SKIP_COMMIT_ON_MASTER_VERSION_ZERO);
if (skipCommitOnMasterVersionZero instanceof Boolean) {
this.skipCommitOnMasterVersionZero = (boolean) skipCommitOnMasterVersionZero;
}
String masterUrl = (String) initArgs.get(MASTER_URL);
if (masterUrl == null && !this.fetchFromLeader)
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"'masterUrl' is required for a slave");
if (masterUrl != null && masterUrl.endsWith(ReplicationHandler.PATH)) {
masterUrl = masterUrl.substring(0, masterUrl.length() - ReplicationHandler.PATH.length());
log.warn("'masterUrl' must be specified without the {} suffix", ReplicationHandler.PATH);
}
this.masterUrl = masterUrl;
this.replicationHandler = handler;
String compress = (String) initArgs.get(COMPRESSION);
useInternalCompression = INTERNAL.equals(compress);
useExternalCompression = EXTERNAL.equals(compress);
connTimeout = getParameter(initArgs, HttpClientUtil.PROP_CONNECTION_TIMEOUT, 30000, null);
// allow a master override for tests - you specify this in the /replication slave section of solrconfig and some
// tests don't want to define this
soTimeout = Integer.getInteger("solr.indexfetcher.sotimeout", -1);
if (soTimeout == -1) {
soTimeout = getParameter(initArgs, HttpClientUtil.PROP_SO_TIMEOUT, 120000, null);
}
String httpBasicAuthUser = (String) initArgs.get(HttpClientUtil.PROP_BASIC_AUTH_USER);
String httpBasicAuthPassword = (String) initArgs.get(HttpClientUtil.PROP_BASIC_AUTH_PASS);
// MRM TODO:
solrClient = sc.getCoreContainer().getUpdateShardHandler().getTheSharedHttpClient();
// createHttpClient(solrCore, httpBasicAuthUser, httpBasicAuthPassword, useExternalCompression);
}
@SuppressWarnings({"unchecked"})
protected <T> T getParameter(@SuppressWarnings({"rawtypes"})NamedList initArgs, String configKey, T defaultValue, StringBuilder sb) {
T toReturn = defaultValue;
if (initArgs != null) {
T temp = (T) initArgs.get(configKey);
toReturn = (temp != null) ? temp : defaultValue;
}
if (sb != null && toReturn != null) sb.append(configKey).append(" : ").append(toReturn).append(",");
return toReturn;
}
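/*
* Usage sketch (hedged): reading an optional init arg with a default. Assuming
* initArgs contains <int name="connTimeout">5000</int>, the call returns 5000;
* if the key is absent, the supplied default is returned instead.
*
* StringBuilder used = new StringBuilder();
* Integer timeout = getParameter(initArgs, "connTimeout", 30000, used);
* // "used" now contains "connTimeout : 5000," for diagnostics
*/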
/**
* Gets the latest commit version and generation from the master
*/
@SuppressWarnings({"unchecked", "rawtypes"})
NamedList getLatestVersion() throws IOException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(COMMAND, CMD_INDEX_VERSION);
params.set(CommonParams.WT, JAVABIN);
params.set(CommonParams.QT, ReplicationHandler.PATH);
QueryRequest req = new QueryRequest(params);
// TODO modify to use shardhandler
try {
req.setBasePath(masterUrl);
return solrClient.request(req);
} catch (SolrServerException e) {
throw new SolrException(ErrorCode.SERVER_ERROR, e.getMessage(), e);
}
}
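/*
* Illustrative consumption of the response (this mirrors fetchLatestIndex below);
* the keys are those the master's ReplicationHandler returns for CMD_INDEX_VERSION:
*
* NamedList response = getLatestVersion();
* long latestVersion = (Long) response.get(CMD_INDEX_VERSION);
* long latestGeneration = (Long) response.get(GENERATION);
*/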
/**
* Fetches the list of files in a given index commit point and updates internal list of files to download.
*/
@SuppressWarnings({"unchecked"})
private void fetchFileList(long gen) throws IOException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(COMMAND, CMD_GET_FILE_LIST);
params.set(GENERATION, String.valueOf(gen));
params.set(CommonParams.WT, JAVABIN);
params.set(CommonParams.QT, ReplicationHandler.PATH);
QueryRequest req = new QueryRequest(params);
// TODO modify to use shardhandler
try {
req.setBasePath(masterUrl);
NamedList response = solrClient.request(req);
List<Map<String,Object>> files = (List<Map<String,Object>>) response.get(CMD_GET_FILE_LIST);
if (files != null) filesToDownload = Collections.synchronizedList(files);
else {
filesToDownload = Collections.emptyList();
log.error("No files to download for index generation: {}", gen);
}
files = (List<Map<String,Object>>) response.get(CONF_FILES);
if (files != null) confFilesToDownload = Collections.synchronizedList(files);
} catch (SolrServerException e) {
throw new IOException(e);
}
}
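/*
* Illustrative shape of a single filesToDownload entry (keys as used throughout
* this class; the values are made up):
*
* { "name" -> "_0.cfs", "size" -> 1234L, "checksum" -> 987654321L }
*
* The checksum entry may be absent, in which case
* filesToAlwaysDownloadIfNoChecksums decides whether a download is forced.
*/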
IndexFetchResult fetchLatestIndex(boolean forceReplication) throws IOException, InterruptedException {
return fetchLatestIndex(forceReplication, false);
}
/**
* This command downloads all the necessary files from the master to install an index commit point. Only changed files are
* downloaded. It also downloads the conf files (if they are modified).
*
* @param forceReplication force a replication in all cases
* @param forceCoreReload force a core reload in all cases
* @return the result of the fetch: success, failure, or already in sync
* @throws IOException if an exception occurs
*/
IndexFetchResult fetchLatestIndex(boolean forceReplication, boolean forceCoreReload) throws IOException, InterruptedException {
stop = false;
abort = false;
this.clearLocalIndexFirst = false;
boolean cleanupDone = false;
boolean successfulInstall = false;
markReplicationStart();
Directory tmpIndexDir = null;
String tmpIndexDirPath;
Directory indexDir = null;
String indexDirPath;
boolean deleteTmpIdxDir = true;
File tmpTlogDir = null;
if (!solrCore.getSolrCoreState().getLastReplicateIndexSuccess()) {
// if the last replication was not a success, we force a full replication
// when we are a bit more confident we may want to try a partial replication
// if the error is connection related or something, but we have to be careful
forceReplication = true;
log.info("Last replication failed, so I'll force replication");
}
try {
if (fetchFromLeader) {
assert !solrCore.isClosed(): "Replication should be stopped before closing the core";
Replica replica = null;
try {
replica = getLeaderReplica();
} catch (TimeoutException e) {
// leader lookup timed out; replica stays null and is handled just below
}
if (replica == null) {
log.warn("Leader is not available. Index fetch failed due to not finding leader: {}", masterUrl);
return IndexFetchResult.EXPECTING_NON_LEADER;
}
CloudDescriptor cd = solrCore.getCoreDescriptor().getCloudDescriptor();
if (solrCore.getCoreDescriptor().getName().equals(replica.getName())) {
return IndexFetchResult.EXPECTING_NON_LEADER;
}
if (replica.getState() != Replica.State.ACTIVE) {
if (log.isInfoEnabled()) {
log.info("Replica {} is leader but it's state is {}, skipping replication", replica.getName(), replica.getState());
}
return IndexFetchResult.LEADER_IS_NOT_ACTIVE;
}
if (!solrCore.getCoreContainer().getZkController().getZkStateReader().isNodeLive(replica.getNodeName())) {
if (log.isInfoEnabled()) {
log.info("Replica {} is leader but it's not hosted on a live node, skipping replication", replica.getName());
}
return IndexFetchResult.LEADER_IS_NOT_ACTIVE;
}
if (!replica.getCoreUrl().equals(masterUrl)) {
masterUrl = replica.getCoreUrl();
log.info("Updated masterUrl to {}", masterUrl);
// TODO: Do we need to set forceReplication = true?
} else {
log.debug("masterUrl didn't change");
}
}
//get the current 'replicatable' index version from the master
@SuppressWarnings({"rawtypes"})
NamedList response;
try {
response = getLatestVersion();
} catch (Exception e) {
final String errorMsg = e.toString();
if (!Strings.isNullOrEmpty(errorMsg) && errorMsg.contains(INTERRUPT_RESPONSE_MESSAGE)) {
log.warn("Master at: {} is not available. Index fetch failed by interrupt. Exception: {}", masterUrl, errorMsg);
return new IndexFetchResult(IndexFetchResult.FAILED_BY_INTERRUPT_MESSAGE, false, e);
} else {
log.warn("Master at: {} is not available. Index fetch failed by exception: {}", masterUrl, errorMsg);
return new IndexFetchResult(IndexFetchResult.FAILED_BY_EXCEPTION_MESSAGE, false, e);
}
}
long latestVersion = (Long) response.get(CMD_INDEX_VERSION);
long latestGeneration = (Long) response.get(GENERATION);
log.info("Master's generation: {}", latestGeneration);
log.info("Master's version: {}", latestVersion);
// TODO: make sure that getLatestCommit only returns commit points for the main index (i.e. no side-car indexes)
IndexCommit commit = solrCore.getDeletionPolicy().getLatestCommit();
if (commit == null) {
// Presumably the IndexWriter hasn't been opened yet, and hence the deletion policy hasn't been updated with commit points
RefCounted<SolrIndexSearcher> searcherRefCounted = null;
try {
searcherRefCounted = solrCore.getNewestSearcher(false);
if (searcherRefCounted == null) {
log.warn("No open searcher found - fetch aborted");
return IndexFetchResult.NO_INDEX_COMMIT_EXIST;
}
commit = searcherRefCounted.get().getIndexReader().getIndexCommit();
} finally {
if (searcherRefCounted != null)
searcherRefCounted.decref();
}
}
long slaveVersion = IndexDeletionPolicyWrapper.getCommitTimestamp(commit);
if (log.isInfoEnabled()) {
log.info("Slave's generation: {}", commit.getGeneration());
log.info("Slave's version: {}", slaveVersion); // logOK
}
if (latestVersion == 0L) {
if (commit.getGeneration() > 1 || slaveVersion > 0) {
// since we won't get the files for an empty index,
// we just clear ours and commit
log.info("New index in Master. Deleting mine...");
RefCounted<IndexWriter> iw = solrCore.getUpdateHandler().getSolrCoreState().getIndexWriter(solrCore);
try {
iw.get().deleteAll();
} finally {
iw.decref();
}
assert TestInjection.injectDelayBeforeSlaveCommitRefresh();
if (skipCommitOnMasterVersionZero) {
openNewSearcherAndUpdateCommitPoint();
} else {
SolrQueryRequest req = new LocalSolrQueryRequest(solrCore, new ModifiableSolrParams());
solrCore.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
}
}
//there is nothing to be replicated
successfulInstall = true;
log.debug("Nothing to replicate, master's version is 0");
return IndexFetchResult.MASTER_VERSION_ZERO;
}
// TODO: Should we be comparing timestamps (across machines) here?
if (!forceReplication && IndexDeletionPolicyWrapper.getCommitTimestamp(commit) == latestVersion) {
//master and slave are already in sync just return
log.info("Slave in sync with master.");
successfulInstall = true;
return IndexFetchResult.ALREADY_IN_SYNC;
}
log.info("Starting replication process");
// get the list of files first
fetchFileList(latestGeneration);
// this can happen if the commit point is deleted before we fetch the file list.
if (filesToDownload.isEmpty()) {
return IndexFetchResult.PEER_INDEX_COMMIT_DELETED;
}
if (log.isInfoEnabled()) {
log.info("Number of files in latest index in master: {}", filesToDownload.size());
}
// Create the sync service
fsyncService = ParWork.getExecutorService(4);
// use a synchronized list because the list is read by other threads (to show details)
filesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());
// if the master's generation is older than the slave's, the indexes are not compatible for an incremental copy;
// a new index directory must be created and all the files copied
boolean isFullCopyNeeded = IndexDeletionPolicyWrapper
.getCommitTimestamp(commit) >= latestVersion
|| commit.getGeneration() >= latestGeneration || forceReplication;
String timestamp = new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT).format(new Date());
String tmpIdxDirName = "index." + timestamp;
tmpIndexDirPath = solrCore.getDataDir() + tmpIdxDirName;
tmpIndexDir = solrCore.getDirectoryFactory().get(tmpIndexDirPath, DirContext.DEFAULT, solrCore.getSolrConfig().indexConfig.lockType);
// current index dir...
indexDirPath = solrCore.getIndexDir();
indexDir = solrCore.getDirectoryFactory().get(indexDirPath, DirContext.DEFAULT, solrCore.getSolrConfig().indexConfig.lockType);
try {
// We will compare all the index files from the master vs the index files on disk to see if there is a mismatch
// in the metadata. If there is a mismatch for the same index file then we download the entire index
// (except when differential copy is applicable) again.
if (!isFullCopyNeeded && isIndexStale(indexDir)) {
isFullCopyNeeded = true;
}
if (!isFullCopyNeeded && !fetchFromLeader) {
// a searcher might be using some flushed but not committed segments
// because of soft commits (which open a searcher on IW's data)
// so we need to close the existing searcher on the last commit
// and wait until we are able to clean up all unused lucene files
if (solrCore.getCoreContainer().isZooKeeperAware()) {
solrCore.closeSearcher();
}
// rollback and reopen index writer and wait until all unused files
// are successfully deleted
solrCore.getUpdateHandler().newIndexWriter(true);
RefCounted<IndexWriter> writer = solrCore.getUpdateHandler().getSolrCoreState().getIndexWriter(null);
try {
IndexWriter indexWriter = writer.get();
int c = 0;
indexWriter.deleteUnusedFiles();
while (hasUnusedFiles(indexDir, commit)) {
indexWriter.deleteUnusedFiles();
log.info("Sleeping for 250ms to wait for unused lucene index files to be delete-able");
Thread.sleep(250);
c++;
if (c >= 120) {
log.warn("IndexFetcher unable to cleanup unused lucene index files so we must do a full copy instead");
isFullCopyNeeded = true;
break;
}
}
if (c > 0) {
log.info("IndexFetcher slept for {}ms for unused lucene index files to be delete-able", c * 250);
}
} finally {
writer.decref();
}
}
boolean reloadCore = false;
try {
// we have to be careful and do this after we know isFullCopyNeeded won't be flipped
if (!isFullCopyNeeded) {
solrCore.getUpdateHandler().getSolrCoreState().closeIndexWriter(solrCore, true);
}
log.info("Starting download (fullCopy={}) to {}", isFullCopyNeeded, tmpIndexDir);
successfulInstall = true;
boolean downloadFailed = false;
long bytesDownloaded = 0;
try {
bytesDownloaded = downloadIndexFiles(isFullCopyNeeded, indexDir, tmpIndexDir, indexDirPath, tmpIndexDirPath, latestGeneration);
} catch (CheckSumFailException e) {
downloadFailed = true;
successfulInstall = false;
}
final long timeTakenSeconds = getReplicationTimeElapsed();
final Long bytesDownloadedPerSecond = (timeTakenSeconds != 0 ? Long.valueOf(bytesDownloaded / timeTakenSeconds) : null);
log.info("Total time taken for download (fullCopy={},bytesDownloaded={}) : {} secs ({} bytes/sec) to {}",
isFullCopyNeeded, bytesDownloaded, timeTakenSeconds, bytesDownloadedPerSecond, tmpIndexDir);
Collection<Map<String,Object>> modifiedConfFiles = getModifiedConfFiles(confFilesToDownload);
if (!modifiedConfFiles.isEmpty() && !downloadFailed) {
reloadCore = true;
downloadConfFiles(confFilesToDownload, latestGeneration);
if (isFullCopyNeeded && successfulInstall) {
successfulInstall = solrCore.modifyIndexProps(tmpIdxDirName);
if (successfulInstall) deleteTmpIdxDir = false;
} else {
terminateAndWaitFsyncService();
successfulInstall = moveIndexFiles(tmpIndexDir, indexDir);
}
if (successfulInstall) {
if (isFullCopyNeeded) {
// let the system know we are changing dirs and the old one
// may be closed
if (indexDir != null) {
if (!this.clearLocalIndexFirst) {//it was closed earlier
solrCore.getDirectoryFactory().doneWithDirectory(indexDir);
}
// Cleanup all index files not associated with any *named* snapshot.
solrCore.deleteNonSnapshotIndexFiles(indexDirPath);
}
}
log.info("Configuration files are modified, core will be reloaded");
logReplicationTimeAndConfFiles(modifiedConfFiles,
successfulInstall);// write to a file time of replication and
// conf files.
}
} else if (!downloadFailed) {
if (isFullCopyNeeded && successfulInstall) {
terminateAndWaitFsyncService();
successfulInstall = solrCore.modifyIndexProps(tmpIdxDirName);
if (!successfulInstall) {
log.error("Modify index props failed");
}
if (successfulInstall) deleteTmpIdxDir = false;
} else if (successfulInstall) {
terminateAndWaitFsyncService();
successfulInstall = moveIndexFiles(tmpIndexDir, indexDir);
if (!successfulInstall) {
log.error("Move index files failed");
throw new SolrException(ErrorCode.SERVER_ERROR, "Move index files failed");
}
}
if (successfulInstall) {
logReplicationTimeAndConfFiles(modifiedConfFiles,
successfulInstall);
}
}
} finally {
solrCore.searchEnabled = true;
solrCore.indexEnabled = true;
if (!isFullCopyNeeded) {
solrCore.getUpdateHandler().getSolrCoreState().openIndexWriter(solrCore);
}
}
// we must reload the core after we open the IW back up
if (successfulInstall && (reloadCore || forceCoreReload) && !isFullCopyNeeded) {
if (log.isInfoEnabled()) {
log.info("Reloading SolrCore {}", solrCore.getName());
}
reloadCore();
}
if (successfulInstall) {
if (isFullCopyNeeded) {
// let the system know we are changing dirs and the old one
// may be closed
if (indexDir != null) {
log.info("removing old index directory {}", indexDir);
solrCore.getDirectoryFactory().doneWithDirectory(indexDir);
try {
solrCore.getDirectoryFactory().remove(indexDir);
} catch (IllegalArgumentException e) {
if (log.isDebugEnabled()) {
log.debug("Error removing directory in IndexFetcher", e);
}
// could already be removed
}
}
}
if (isFullCopyNeeded) {
solrCore.getUpdateHandler().newIndexWriter(isFullCopyNeeded);
}
openNewSearcherAndUpdateCommitPoint();
}
if (!isFullCopyNeeded && !forceReplication && !successfulInstall && !abort) {
// we try with a full copy of the index
log.warn(
"Replication attempt was not successful - trying a full index replication reloadCore={}",
reloadCore);
successfulInstall = fetchLatestIndex(true, reloadCore).getSuccessful();
}
return successfulInstall ? IndexFetchResult.INDEX_FETCH_SUCCESS : IndexFetchResult.INDEX_FETCH_FAILURE;
} catch (ReplicationHandlerException e) {
log.error("User aborted Replication");
return new IndexFetchResult(IndexFetchResult.FAILED_BY_EXCEPTION_MESSAGE, false, e);
} catch (SolrException e) {
throw e;
} catch (InterruptedException e) {
throw new InterruptedException("Index fetch interrupted");
} catch (Exception e) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Index fetch failed : ", e);
}
} finally {
cleanup(solrCore, tmpIndexDir, indexDir, deleteTmpIdxDir, tmpTlogDir, successfulInstall);
}
}
private Replica getLeaderReplica() throws InterruptedException, TimeoutException {
ZkController zkController = solrCore.getCoreContainer().getZkController();
CloudDescriptor cd = solrCore.getCoreDescriptor().getCloudDescriptor();
Replica leaderReplica = zkController.getZkStateReader().getLeaderRetry(
cd.getCollectionName(), cd.getShardId(), 5000);
return leaderReplica;
}
private void cleanup(final SolrCore core, Directory tmpIndexDir,
Directory indexDir, boolean deleteTmpIdxDir, File tmpTlogDir, boolean successfulInstall) throws IOException {
try {
if (!successfulInstall) {
try {
logReplicationTimeAndConfFiles(null, successfulInstall);
} catch (Exception e) {
// this can happen on shutdown, a fetch may be running in a thread after DirectoryFactory is closed
log.warn("Could not log failed replication details", e);
}
} else {
if (core.getCoreContainer().isZooKeeperAware()) {
// we only track replication success in SolrCloud mode
core.getUpdateHandler().getSolrCoreState().setLastReplicateIndexSuccess(successfulInstall);
}
}
} finally {
filesToDownload = filesDownloaded = confFilesDownloaded = confFilesToDownload = null;
markReplicationStop();
dirFileFetcher = null;
localFileFetcher = null;
if (fsyncService != null && !fsyncService.isShutdown()) fsyncService.shutdown();
fsyncService = null;
fsyncServiceFuture = null;
fsyncException = null;
// order below is important
try {
if (tmpIndexDir != null && deleteTmpIdxDir) {
core.getDirectoryFactory().doneWithDirectory(tmpIndexDir);
core.getDirectoryFactory().remove(tmpIndexDir);
}
} catch (Exception e) {
SolrException.log(log, e);
} finally {
try {
if (tmpIndexDir != null) core.getDirectoryFactory().release(tmpIndexDir);
} catch (Exception e) {
SolrException.log(log, e);
}
try {
if (indexDir != null) {
try {
core.getDirectoryFactory().release(indexDir);
} catch (IllegalArgumentException e) {
if (log.isDebugEnabled()) log.debug("Error releasing directory in IndexFetcher", e);
// could already be removed
}
}
} catch (Exception e) {
SolrException.log(log, e);
}
try {
if (tmpTlogDir != null) delTree(tmpTlogDir);
} catch (Exception e) {
SolrException.log(log, e);
}
}
}
}
private boolean hasUnusedFiles(Directory indexDir, IndexCommit commit) throws IOException {
String segmentsFileName = commit.getSegmentsFileName();
SegmentInfos infos = SegmentInfos.readCommit(indexDir, segmentsFileName);
Set<String> currentFiles = new HashSet<>(infos.files(true));
String[] allFiles = indexDir.listAll();
for (String file : allFiles) {
if (!file.equals(segmentsFileName) && !currentFiles.contains(file) && !file.endsWith(".lock")) {
log.info("Found unused file: {}", file);
return true;
}
}
return false;
}
private volatile Exception fsyncException;
/**
* Terminate the fsync service and wait for all its tasks to complete. If any fsync failed, that exception is rethrown.
*/
private void terminateAndWaitFsyncService() throws Exception {
fsyncService.shutdown();
// give a long wait say 1 hr
fsyncService.awaitTermination(3600, TimeUnit.SECONDS);
// if any fsync failed, throw that exception back
Exception fsyncExceptionCopy = fsyncException;
if (fsyncExceptionCopy != null) throw fsyncExceptionCopy;
}
/**
* Helper method to record the last replication's details so that we can show them on the statistics page across
* restarts.
* @throws IOException on IO error
*/
@SuppressForbidden(reason = "Need currentTimeMillis for debugging/stats")
private void logReplicationTimeAndConfFiles(Collection<Map<String, Object>> modifiedConfFiles, boolean successfulInstall) throws IOException {
List<String> confFiles = new ArrayList<>();
if (modifiedConfFiles != null && !modifiedConfFiles.isEmpty())
for (Map<String, Object> map1 : modifiedConfFiles)
confFiles.add((String) map1.get(NAME));
Properties props = replicationHandler.loadReplicationProperties();
long replicationTime = System.currentTimeMillis();
long replicationTimeTaken = getReplicationTimeElapsed();
Directory dir = null;
try {
dir = solrCore.getDirectoryFactory().get(solrCore.getDataDir(), DirContext.META_DATA, solrCore.getSolrConfig().indexConfig.lockType);
int indexCount = 1, confFilesCount = 1;
if (props.containsKey(TIMES_INDEX_REPLICATED)) {
indexCount = Integer.parseInt(props.getProperty(TIMES_INDEX_REPLICATED)) + 1;
}
StringBuilder sb = readToStringBuilder(replicationTime, props.getProperty(INDEX_REPLICATED_AT_LIST));
props.setProperty(INDEX_REPLICATED_AT_LIST, sb.toString());
props.setProperty(INDEX_REPLICATED_AT, String.valueOf(replicationTime));
props.setProperty(PREVIOUS_CYCLE_TIME_TAKEN, String.valueOf(replicationTimeTaken));
props.setProperty(TIMES_INDEX_REPLICATED, String.valueOf(indexCount));
if (clearLocalIndexFirst) {
props.setProperty(CLEARED_LOCAL_IDX, "true");
}
if (modifiedConfFiles != null && !modifiedConfFiles.isEmpty()) {
props.setProperty(CONF_FILES_REPLICATED, confFiles.toString());
props.setProperty(CONF_FILES_REPLICATED_AT, String.valueOf(replicationTime));
if (props.containsKey(TIMES_CONFIG_REPLICATED)) {
confFilesCount = Integer.parseInt(props.getProperty(TIMES_CONFIG_REPLICATED)) + 1;
}
props.setProperty(TIMES_CONFIG_REPLICATED, String.valueOf(confFilesCount));
}
props.setProperty(LAST_CYCLE_BYTES_DOWNLOADED, String.valueOf(getTotalBytesDownloaded()));
if (!successfulInstall) {
int numFailures = 1;
if (props.containsKey(TIMES_FAILED)) {
numFailures = Integer.parseInt(props.getProperty(TIMES_FAILED)) + 1;
}
props.setProperty(TIMES_FAILED, String.valueOf(numFailures));
props.setProperty(REPLICATION_FAILED_AT, String.valueOf(replicationTime));
sb = readToStringBuilder(replicationTime, props.getProperty(REPLICATION_FAILED_AT_LIST));
props.setProperty(REPLICATION_FAILED_AT_LIST, sb.toString());
}
String tmpFileName = REPLICATION_PROPERTIES + "." + System.nanoTime();
final IndexOutput out = dir.createOutput(tmpFileName, DirectoryFactory.IOCONTEXT_NO_CACHE);
Writer outFile = null;
try {
outFile = new OutputStreamWriter(new PropertiesOutputStream(out), StandardCharsets.UTF_8);
props.store(outFile, "Replication details");
dir.sync(Collections.singleton(tmpFileName));
} finally {
IOUtils.closeQuietly(outFile);
IOUtils.closeQuietly(out);
}
solrCore.getDirectoryFactory().renameWithOverwrite(dir, tmpFileName, REPLICATION_PROPERTIES);
} catch (Throwable e) {
ParWork.propagateInterrupt(e);
log.warn("Exception while updating statistics", e);
if (e instanceof Error) {
throw e;
}
} finally {
if (dir != null) {
solrCore.getDirectoryFactory().release(dir);
}
}
}
long getTotalBytesDownloaded() {
long bytesDownloaded = 0;
//get size from list of files to download
for (Map<String, Object> file : getFilesDownloaded()) {
bytesDownloaded += (Long) file.get(SIZE);
}
//get size from list of conf files to download
for (Map<String, Object> file : getConfFilesDownloaded()) {
bytesDownloaded += (Long) file.get(SIZE);
}
//get size from current file being downloaded
Map<String, Object> currentFile = getCurrentFile();
if (currentFile != null) {
if (currentFile.containsKey("bytesDownloaded")) {
bytesDownloaded += (Long) currentFile.get("bytesDownloaded");
}
}
return bytesDownloaded;
}
private StringBuilder readToStringBuilder(long replicationTime, String str) {
StringBuilder sb = new StringBuilder();
List<String> l = new ArrayList<>();
if (str != null && str.length() != 0) {
String[] ss = str.split(",");
Collections.addAll(l, ss);
}
sb.append(replicationTime);
// keep the new timestamp plus at most 9 previous entries (10 total)
for (int i = 0; i < l.size() && i < 9; i++) {
sb.append(",").append(l.get(i));
}
return sb;
}
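/*
* Worked example: given str = "100,99,98" and replicationTime = 101, the
* returned builder contains "101,100,99,98" -- the new timestamp followed by at
* most 9 previous entries, capping the stored history at 10 values.
*/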
private void openNewSearcherAndUpdateCommitPoint() throws IOException {
RefCounted<SolrIndexSearcher> searcher = null;
IndexCommit commitPoint;
// must get the latest solrCore object because the one we have might be closed because of a reload
// todo stop keeping solrCore around
try (SolrCore core = solrCore.getCoreContainer().getCore(solrCore.getName())) {
// testing
// @SuppressWarnings({"rawtypes"}) SolrQueryRequest req = new LocalSolrQueryRequest(core, new ModifiableSolrParams());
// core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
Future[] waitSearcher = new Future[1];
searcher = core.getSearcher(true, true, waitSearcher, true);
if (waitSearcher[0] != null) {
try {
waitSearcher[0].get();
} catch (InterruptedException | ExecutionException e) {
ParWork.propagateInterrupt(e);
throw new SolrException(ErrorCode.SERVER_ERROR, e);
}
}
commitPoint = searcher.get().getIndexReader().getIndexCommit();
} finally{
if (searcher != null) {
searcher.decref();
}
}
// update the commit point in replication handler
replicationHandler.indexCommitPoint = commitPoint;
}
private void reloadCore() {
solrCore.getCoreContainer().reload(solrCore.getName());
}
private void downloadConfFiles(List<Map<String, Object>> confFilesToDownload, long latestGeneration) throws Exception {
log.info("Starting download of configuration files from master: {}", confFilesToDownload);
confFilesDownloaded = Collections.synchronizedList(new ArrayList<>());
File tmpconfDir = new File(solrCore.getResourceLoader().getConfigDir(), "conf." + getDateAsStr(new Date()));
try {
boolean status = tmpconfDir.mkdirs();
if (!status) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Failed to create temporary config folder: " + tmpconfDir.getName());
}
try (ParWork work = new ParWork(this, false)) {
for (Map<String,Object> file : confFilesToDownload) {
work.collect("fetchConfigFile", () -> {
try {
String saveAs = (String) (file.get(ALIAS) == null ? file.get(NAME) : file.get(ALIAS));
localFileFetcher = new LocalFsFileFetcher(tmpconfDir, file, saveAs, CONF_FILE_SHORT, latestGeneration);
currentFile = file;
localFileFetcher.fetchFile();
confFilesDownloaded.add(new HashMap<>(file));
if (stop) {
log.info("Skipping conf file copying due to abort");
throw new AlreadyClosedException();
}
} catch (Exception e) {
log.error("", e);
} finally {
fileFetchRequests.remove(file.get(NAME));
}
});
}
}
// this is called before copying the files to the original conf dir
// so that an exception here does not corrupt the original files.
terminateAndWaitFsyncService();
copyTmpConfFiles2Conf(tmpconfDir);
} finally {
delTree(tmpconfDir);
}
}
/**
* Download the index files. If a new index is needed, download all the files.
*
* @param downloadCompleteIndex whether a complete, fresh copy of the index is being downloaded
* @param indexDir the indexDir to be merged to
* @param tmpIndexDir the directory to which files need to be downloaded to
* @param indexDirPath the path of indexDir
* @param tmpIndexDirPath the path of tmpIndexDir
* @param latestGeneration the generation number
*
* @return number of bytes downloaded
*/
private long downloadIndexFiles(boolean downloadCompleteIndex, Directory indexDir, Directory tmpIndexDir,
String indexDirPath, String tmpIndexDirPath, long latestGeneration)
throws Exception {
if (!abort) {
stop = false;
} else {
throw new ReplicationHandlerException("User aborted replication");
}
if (log.isDebugEnabled()) {
log.debug("Download files to dir: {}", Arrays.asList(indexDir.listAll()));
}
LongAdder bytesDownloaded = new LongAdder();
LongAdder bytesSkippedCopying = new LongAdder();
boolean doDifferentialCopy =
(indexDir instanceof FSDirectory || (indexDir instanceof FilterDirectory && FilterDirectory.unwrap(indexDir) instanceof FSDirectory)) && (tmpIndexDir instanceof FSDirectory || (
tmpIndexDir instanceof FilterDirectory && FilterDirectory.unwrap(tmpIndexDir) instanceof FSDirectory));
long totalSpaceRequired = 0;
synchronized (filesToDownload) {
for (Map<String,Object> file : filesToDownload) {
long size = (Long) file.get(SIZE);
totalSpaceRequired += size;
}
}
if (log.isInfoEnabled()) {
log.info("tmpIndexDir_type : {} , {}", tmpIndexDir.getClass(), FilterDirectory.unwrap(tmpIndexDir));
}
long usableSpace = usableDiskSpaceProvider.apply(tmpIndexDirPath);
long atsr = getApproxTotalSpaceReqd(totalSpaceRequired);
if (atsr > usableSpace) {
log.warn("WARNING: clearing disk space ahead of time to avoid running out of space, could cause problems with current SolrCore approxTotalSpaceReqd{}, usableSpace={}", atsr, usableSpace);
deleteFilesInAdvance(indexDir, indexDirPath, totalSpaceRequired, usableSpace);
}
if (log.isDebugEnabled()) {
log.debug("Files to download {}", filesToDownload);
}
try {
// MRM TODO: test parallel for file download (not enabled)
try (ParWork parWork = new ParWork(this, true)) {
synchronized (filesToDownload) {
for (Map<String,Object> file : filesToDownload) {
String filename = (String) file.get(NAME);
long size = (Long) file.get(SIZE);
Long serverChecksum = (Long) file.get(CHECKSUM);
CompareResult compareResult = compareFile(indexDir, filename, size, null, masterUrl, "filesToDownload");
if (compareResult.checkSummed && !compareResult.equal && !downloadCompleteIndex && !doDifferentialCopy) {
stop = true;
log.error("Checksum failed, start over with full copy replication");
throw new CheckSumFailException();
}
boolean alwaysDownload = false;
if (serverChecksum == null) {
alwaysDownload = filesToAlwaysDownloadIfNoChecksums(filename, size, compareResult);
}
boolean finalDoDifferentialCopy = doDifferentialCopy;
// parWork.collect("IndexFetcher", () -> {
if (log.isDebugEnabled()) {
log.debug("Downloading file={} size={} checksum={} alwaysDownload={}", filename, size, file.get(CHECKSUM), alwaysDownload);
}
if (!compareResult.equal || downloadCompleteIndex || alwaysDownload) {
File localFile = new File(indexDirPath, filename);
if (downloadCompleteIndex && finalDoDifferentialCopy && compareResult.equal && compareResult.checkSummed && localFile.exists()) {
if (log.isInfoEnabled()) {
log.info("Don't need to download this file. Local file's path is: {}, checksum is: {}", localFile.getAbsolutePath(), file.get(CHECKSUM));
}
// A hard link here should survive the eventual directory move, and should be more space efficient as
// compared to a file copy. TODO: Maybe we could do a move safely here?
Directory baseFromDir = getBaseDir(indexDir);
Directory baseToDir = getBaseDir(tmpIndexDir);
if (baseFromDir instanceof FSDirectory && baseToDir instanceof FSDirectory) {
Files.createLink(new File(tmpIndexDirPath, filename).toPath(), localFile.toPath());
} else {
bytesDownloaded.add(localFile.length());
boolean success = moveAFile(indexDir, tmpIndexDir, filename);
if (!success) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Move directory failed file=" + filename + " " + indexDir + " to " + tmpIndexDirPath);
}
}
} else {
try {
dirFileFetcher = new DirectoryFileFetcher(tmpIndexDir, file, (String) file.get(NAME), FILE, latestGeneration);
currentFile = file;
dirFileFetcher.fetchFile();
bytesDownloaded.add(dirFileFetcher.getBytesDownloaded());
} catch (CheckSumFailException e) {
throw e;
} catch (Exception e) {
log.error("Problem downloading file {}", file, e);
throw e;
} finally {
fileFetchRequests.remove(file.get(NAME));
}
}
if (stop) {
throw new AlreadyClosedException();
}
if (log.isDebugEnabled()) log.debug("Downloaded {}", tmpIndexDir, file.get(NAME));
filesDownloaded.add(Collections.unmodifiableMap(file));
} else {
if (log.isDebugEnabled()) {
log.debug("Skipping download for {} because it already exists", file.get(NAME));
}
}
}
}
}
} finally {
fileFetchRequests.clear();
}
log.info("Bytes downloaded: {}, Bytes skipped downloading: {}", bytesDownloaded, bytesSkippedCopying);
return bytesDownloaded.sum();
}
//only for testing purposes. do not use this anywhere else
//-----------START----------------------
static BooleanSupplier testWait = () -> true;
static Function<String, Long> usableDiskSpaceProvider = dir -> getUsableSpace(dir);
//------------ END---------------------
private static Long getUsableSpace(String dir) {
try {
File file = new File(dir);
if (!file.exists()) {
file = file.getParentFile();
if (!file.exists()) {// this is not a disk directory, so just pretend that there is enough space
return Long.MAX_VALUE;
}
}
FileStore fileStore = Files.getFileStore(file.toPath());
return fileStore.getUsableSpace();
} catch (IOException e) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Could not free disk space", e);
}
}
private long getApproxTotalSpaceReqd(long totalSpaceRequired) {
long approxTotalSpaceReqd = (long) (totalSpaceRequired * 1.05);// add 5% extra for safety
approxTotalSpaceReqd += (100 * 1024 * 1024); //we should have an extra of 100MB free after everything is downloaded
return approxTotalSpaceReqd;
}
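/*
* Worked example: for a 1 GB download, totalSpaceRequired = 1_073_741_824, so
* approxTotalSpaceReqd = (long) (1_073_741_824 * 1.05) + 100 * 1024 * 1024
* = 1_127_428_915 + 104_857_600 = 1_232_286_515 bytes (about 1.15 GB).
*/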
// note: parameter order matches the call site in downloadIndexFiles, which passes (totalSpaceRequired, usableSpace)
private void deleteFilesInAdvance(Directory indexDir, String indexDirPath, long totalSpaceRequired, long usableDiskSpace) throws IOException {
long actualSpaceReqd = totalSpaceRequired;
List<String> filesTobeDeleted = new ArrayList<>();
long clearedSpace = 0;
//go through each file to check if this needs to be deleted
for (String f : indexDir.listAll()) {
for (Map<String, Object> fileInfo : filesToDownload) {
if (f.equals(fileInfo.get(NAME))) {
String filename = (String) fileInfo.get(NAME);
long size = (Long) fileInfo.get(SIZE);
CompareResult compareResult = compareFile(indexDir, filename, size, (Long) fileInfo.get(CHECKSUM), masterUrl, "deleteFilesInAdvance");
if (!compareResult.equal || filesToAlwaysDownloadIfNoChecksums(f, size, compareResult)) {
filesTobeDeleted.add(f);
clearedSpace += size;
} else {
/*this file will not be downloaded*/
actualSpaceReqd -= size;
}
}
}
}
if (usableDiskSpace > getApproxTotalSpaceReqd(actualSpaceReqd)) {
// after considering the files actually available locally we really don't need to do any delete
return;
}
log.info("This disk does not have enough space to download the index from leader/master. So cleaning up the local index. "
+ " This may lead to loss of data/or node if index replication fails in between");
//now we should disable searchers and index writers because this core will not have all the required files
this.clearLocalIndexFirst = true;
this.solrCore.searchEnabled = false;
this.solrCore.indexEnabled = false;
solrCore.getDirectoryFactory().doneWithDirectory(indexDir);
solrCore.deleteNonSnapshotIndexFiles(indexDirPath);
this.solrCore.closeSearcher();
assert testWait.getAsBoolean();
solrCore.getUpdateHandler().getSolrCoreState().closeIndexWriter(this.solrCore, false);
for (String f : filesTobeDeleted) {
try {
indexDir.deleteFile(f);
} catch (FileNotFoundException | NoSuchFileException e) {
//no problem , it was deleted by someone else
}
}
}
static boolean filesToAlwaysDownloadIfNoChecksums(String filename,
long size, CompareResult compareResult) {
// without checksums to compare, we always download .si, .liv, segments_N,
// and any very small files
return !compareResult.checkSummed && (filename.endsWith(".si") || filename.endsWith(".liv")
|| filename.startsWith("segments_") || size < _10K);
}
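/*
* Examples (applying the rule above when compareResult.checkSummed == false):
* "_3.si" -> always download (any size)
* "segments_5" -> always download (any size)
* "_3.fdt" of size 5_000 -> download (below the 10k threshold)
* "_3.fdt" of size 50_000 -> not forced; the length comparison decides
*/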
protected static class CompareResult {
boolean equal = false;
boolean checkSummed = false;
}
protected static CompareResult compareFile(Directory indexDir, String filename, Long backupIndexFileLen, Long backupIndexFileChecksum, String masterUrl, String context) {
CompareResult compareResult = new CompareResult();
try {
try (final IndexInput indexInput = indexDir.openInput(filename, IOContext.READONCE)) {
long indexFileLen = indexInput.length();
long indexFileChecksum = 0;
if (backupIndexFileChecksum != null) {
try {
indexFileChecksum = CodecUtil.retrieveChecksum(indexInput);
compareResult.checkSummed = true;
} catch (CorruptIndexException e) {
log.warn("Could not retrieve checksum from file.", e.getMessage());
compareResult.equal = false;
return compareResult;
} catch (Exception e) {
log.warn("Could not retrieve checksum from file.", e);
compareResult.equal = false;
}
}
if (!compareResult.checkSummed) {
log.info("we don't have checksums to compare");
if (indexFileLen == backupIndexFileLen) {
compareResult.equal = true;
return compareResult;
} else {
log.info(
"No checksum; file length comparison did not match. File={} Context={}. Expected length is {} and actual length is {} from={}", filename, context, backupIndexFileLen, indexFileLen, masterUrl);
compareResult.equal = false;
return compareResult;
}
}
// we have checksums to compare
if (indexFileLen == backupIndexFileLen && indexFileChecksum == backupIndexFileChecksum) {
compareResult.equal = true;
return compareResult;
} else {
log.info("Compare File {} did not match. expected checksum is {} and actual is checksum {}. " +
"expected length is {} and actual length is {}"
, filename, backupIndexFileChecksum, indexFileChecksum,
backupIndexFileLen, indexFileLen);
compareResult.equal = false;
return compareResult;
}
}
} catch (NoSuchFileException | FileNotFoundException e) {
compareResult.equal = false;
return compareResult;
} catch (IOException e) {
log.error("Could not read file {}. Downloading it again", filename, e);
compareResult.equal = false;
return compareResult;
}
}
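/*
* Usage sketch (the values are made up): compare a local file against the length
* and checksum reported by the master before deciding to download it.
*
* CompareResult r = compareFile(indexDir, "_0.cfs", 1024L, 987654321L, masterUrl, "example");
* if (!r.equal) {
* // lengths/checksums differ, or the file is unreadable: download it again
* }
*/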
/** Returns true if the file exists (can be opened), false
* if it cannot be opened, and (unlike Java's
* File.exists) throws IOException if there's some
* unexpected error. */
private static boolean slowFileExists(Directory dir, String fileName) throws IOException {
try (IndexInput input = dir.openInput(fileName, IOContext.READONCE)) {
return true;
} catch (NoSuchFileException | FileNotFoundException e) {
return false;
}
}
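/*
* Usage sketch: unlike File#exists, a file that is present but unreadable
* surfaces as an IOException here rather than "false".
*
* if (slowFileExists(indexDir, "segments_5")) { ... } // opened successfully
* // NoSuchFileException/FileNotFoundException -> returns false
* // any other IOException (e.g. permissions) propagates to the caller
*/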
/**
* All the files which are common between master and slave must have the same size and the same checksum; otherwise we
* assume they are not compatible (stale).
*
* @return true if the index is stale and we need to download a fresh copy, false otherwise.
* @throws IOException on low-level IO error
*/
private boolean isIndexStale(Directory dir) throws IOException {
for (Map<String, Object> file : filesToDownload) {
String filename = (String) file.get(NAME);
Long length = (Long) file.get(SIZE);
Long checksum = (Long) file.get(CHECKSUM);
if (slowFileExists(dir, filename)) {
if (checksum != null) {
if (!(compareFile(dir, filename, length, checksum, masterUrl, "isIndexStale").equal)) {
// file exists and size or checksum is different, therefore we must download it again
log.info("Index is stale using checksums");
return true;
}
} else {
if (length != dir.fileLength(filename)) {
log.warn("File {} did not match on stale index check. expected length is {} and actual length is {}",
filename, length, dir.fileLength(filename));
log.info("Index is stale using file lengths");
return true;
}
}
}
}
log.info("Index is not stale");
return false;
}
// special hack to work with FilterDirectory
protected Directory getBaseDir(Directory dir) {
Directory baseDir = dir;
while (baseDir instanceof FilterDirectory) {
baseDir = ((FilterDirectory) baseDir).getDelegate();
}
return baseDir;
}
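/*
* Example: an NRTCachingDirectory wrapping an FSDirectory is a FilterDirectory,
* so getBaseDir walks the chain and returns the underlying FSDirectory -- which
* is what the hard-link path in downloadIndexFiles checks for.
*/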
/**
* Move a single file from the tmp index dir into the index dir via the DirectoryFactory. If the move fails, or the
* file already exists in the destination, the replication attempt is considered a failure.
*/
private boolean moveAFile(Directory tmpIdxDir, Directory indexDir, String fname) {
boolean success = false;
try {
if (log.isDebugEnabled()) {
log.debug("Moving file: {} size={}", fname, tmpIdxDir.fileLength(fname));
}
if (slowFileExists(indexDir, fname)) {
log.warn("Cannot complete replication attempt because file already exists: {}", fname);
// we fail - we downloaded the files we need, if we can't move one in, we can't
// count on the correct index
return false;
}
} catch (IOException e) {
SolrException.log(log, "could not check if a file exists", e);
return false;
}
try {
solrCore.getDirectoryFactory().move(tmpIdxDir, indexDir, fname, DirectoryFactory.IOCONTEXT_NO_CACHE);
success = true;
} catch (IOException e) {
SolrException.log(log, "Could not move file", e);
}
return success;
}
/**
* Copy all index files from the temp index dir to the actual index. The segments_N file is copied last.
*/
private boolean moveIndexFiles(Directory tmpIdxDir, Directory indexDir) {
if (log.isDebugEnabled()) {
try {
if (log.isInfoEnabled()) {
log.info("From dir files: {}", Arrays.asList(tmpIdxDir.listAll()));
log.info("To dir files: {}", Arrays.asList(indexDir.listAll())); //logOk
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
String segmentsFile = null;
synchronized (filesToDownload) {
for (Map<String,Object> f : filesDownloaded) {
String fname = (String) f.get(NAME);
// the segments file must be copied last, or else a failure
// part-way through would leave the index corrupted
if (fname.startsWith("segments_")) {
segmentsFile = fname;
continue;
}
if (!moveAFile(tmpIdxDir, indexDir, fname)) return false;
}
//copy the segments file last
if (segmentsFile != null) {
if (!moveAFile(tmpIdxDir, indexDir, segmentsFile)) return false;
}
}
return true;
}
/**
* Recursively collect all regular files under the given directory.
*/
private List<File> makeTmpConfDirFileList(File dir, List<File> fileList) {
File[] files = dir.listFiles();
for (File file : files) {
if (file.isFile()) {
fileList.add(file);
} else if (file.isDirectory()) {
fileList = makeTmpConfDirFileList(file, fileList);
}
}
return fileList;
}
/**
* The conf files are copied from the tmp dir to the conf dir. A backup of any overwritten file is maintained.
*/
private void copyTmpConfFiles2Conf(File tmpconfDir) {
boolean status = false;
File confDir = new File(solrCore.getResourceLoader().getConfigDir());
for (File file : makeTmpConfDirFileList(tmpconfDir, new ArrayList<>())) {
File oldFile = new File(confDir, file.getPath().substring(tmpconfDir.getPath().length(), file.getPath().length()));
if (!oldFile.getParentFile().exists()) {
status = oldFile.getParentFile().mkdirs();
if (!status) {
throw new SolrException(ErrorCode.SERVER_ERROR,
"Unable to mkdirs: " + oldFile.getParentFile());
}
}
if (oldFile.exists()) {
File backupFile = new File(oldFile.getPath() + "." + getDateAsStr(new Date(oldFile.lastModified())));
if (!backupFile.getParentFile().exists()) {
status = backupFile.getParentFile().mkdirs();
if (!status) {
throw new SolrException(ErrorCode.SERVER_ERROR,
"Unable to mkdirs: " + backupFile.getParentFile());
}
}
status = oldFile.renameTo(backupFile);
if (!status) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Unable to rename: " + oldFile + " to: " + backupFile);
}
}
status = file.renameTo(oldFile);
if (!status) {
throw new SolrException(ErrorCode.SERVER_ERROR,
"Unable to rename: " + file + " to: " + oldFile);
}
}
}
/**
* The tlog files are moved from the tmp dir to the tlog dir as an atomic filesystem operation.
* A backup of the old directory is maintained. If the directory move fails, an attempt is made to restore the
* original tlog directory.
*/
private boolean copyTmpTlogFiles2Tlog(File tmpTlogDir) {
Path tlogDir = FileSystems.getDefault().getPath(solrCore.getUpdateHandler().getUpdateLog().getLogDir());
Path backupTlogDir = FileSystems.getDefault().getPath(tlogDir.getParent().toAbsolutePath().toString(), tmpTlogDir.getName());
try {
Files.move(tlogDir, backupTlogDir, StandardCopyOption.ATOMIC_MOVE);
} catch (IOException e) {
SolrException.log(log, "Unable to rename: " + tlogDir + " to: " + backupTlogDir, e);
return false;
}
Path src = FileSystems.getDefault().getPath(backupTlogDir.toAbsolutePath().toString(), tmpTlogDir.getName());
try {
Files.move(src, tlogDir, StandardCopyOption.ATOMIC_MOVE);
} catch (IOException e) {
SolrException.log(log, "Unable to rename: " + src + " to: " + tlogDir, e);
// In case of error, try to revert back the original tlog directory
try {
Files.move(backupTlogDir, tlogDir, StandardCopyOption.ATOMIC_MOVE);
} catch (IOException e2) {
// bad, we were not able to revert back the original tlog directory
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Unable to rename: " + backupTlogDir + " to: " + tlogDir);
}
return false;
}
return true;
}
private String getDateAsStr(Date d) {
return new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT).format(d);
}
private final Map<String, FileInfo> confFileInfoCache = new HashMap<>();
/**
* The local conf files are compared with the conf files on the master. If they are the same (by checksum), they are not copied.
*
* @param confFilesToDownload The list of files obtained from master
*
* @return a list of configuration files which have changed on the master and need to be downloaded.
*/
@SuppressWarnings({"unchecked"})
private Collection<Map<String, Object>> getModifiedConfFiles(List<Map<String, Object>> confFilesToDownload) {
if (confFilesToDownload == null || confFilesToDownload.isEmpty())
return Collections.EMPTY_LIST;
//build a map with alias/name as the key
@SuppressWarnings({"rawtypes"})
Map<String, Map<String, Object>> nameVsFile = new HashMap<>();
@SuppressWarnings({"rawtypes"})
NamedList names = new NamedList();
for (Map<String, Object> map : confFilesToDownload) {
//if an alias is present, that is the name the file may have on the slave
String name = (String) (map.get(ALIAS) == null ? map.get(NAME) : map.get(ALIAS));
nameVsFile.put(name, map);
names.add(name, null);
}
//get the details of the local conf files with the same alias/name
List<Map<String, Object>> localFilesInfo = replicationHandler.getConfFileInfoFromCache(names, confFileInfoCache);
//compare their checksums to see if the files have changed
for (Map<String, Object> fileInfo : localFilesInfo) {
String name = (String) fileInfo.get(NAME);
Map<String, Object> m = nameVsFile.get(name);
if (m == null) continue; // the file is not even present locally (so must be downloaded)
if (m.get(CHECKSUM).equals(fileInfo.get(CHECKSUM))) {
nameVsFile.remove(name); //checksums are same so the file need not be downloaded
}
}
return nameVsFile.isEmpty() ? Collections.EMPTY_LIST : nameVsFile.values();
}
/**
* Behaves like File.delete() but surfaces why deletion failed, since File.delete() has some
* surprising behavior. It returns null on success, rethrows any SecurityException, and otherwise
* returns the Throwable preventing deletion (instead of returning false), for additional information.
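* <p>Illustrative usage (variable names hypothetical):
* <pre>{@code
* Throwable cause = delete(tmpFile);
* if (cause != null) {
*   log.warn("Could not delete {}", tmpFile, cause);
* }
* }</pre>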
*/
static Throwable delete(File file) {
try {
Files.delete(file.toPath());
return null;
} catch (SecurityException e) {
throw e;
} catch (Throwable other) {
return other;
}
}
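/**
* Recursively deletes {@code dir}, looping while the directory still exists so that files removed
* concurrently are tolerated; returns false only if an IOException prevents deletion.
*/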
static boolean delTree(File dir) {
while (Files.exists(dir.toPath())) {
try {
Files.walk(dir.toPath()).sorted(Comparator.reverseOrder()).forEach(new CoreContainer.FileConsumer());
} catch (NoSuchFileException | UncheckedIOException e) {
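// a file vanished mid-walk (e.g. deleted concurrently); loop and re-check whether the dir is gone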
} catch (IOException e) {
log.warn("Unable to delete directory : {}", dir, e);
return false;
}
}
return true;
}
/**
* Stops the ongoing fetch
*/
void abortFetch() {
stop = true;
abort = true;
fileFetchRequests.forEach((s, cancellable) -> {
if (cancellable != null) {
cancellable.cancel();
}
});
fileFetchRequests.clear();
}
@SuppressForbidden(reason = "Need currentTimeMillis for debugging/stats")
private void markReplicationStart() {
replicationTimer = new RTimer();
replicationStartTimeStamp = System.currentTimeMillis();
}
private void markReplicationStop() {
replicationStartTimeStamp = 0;
replicationTimer = null;
}
Date getReplicationStartTimeStamp() {
return new Date(replicationStartTimeStamp);
}
long getReplicationTimeElapsed() {
long timeElapsed = 0;
if (replicationStartTimeStamp > 0)
timeElapsed = TimeUnit.SECONDS.convert((long) replicationTimer.getTime(), TimeUnit.MILLISECONDS);
return timeElapsed;
}
@SuppressWarnings({"unchecked"})
List<Map<String, Object>> getConfFilesToDownload() {
//take a local reference first because the field can become null later
List<Map<String, Object>> tmp = confFilesToDownload;
//return a copy, or else the caller's iterator may fail
return tmp == null ? Collections.EMPTY_LIST : new ArrayList<>(tmp);
}
@SuppressWarnings({"unchecked"})
List<Map<String, Object>> getConfFilesDownloaded() {
//take a local reference first because the field can become null later
List<Map<String, Object>> tmp = confFilesDownloaded;
// NOTE: it's safe to make a copy of a SynchronizedCollection(ArrayList)
return tmp == null ? Collections.EMPTY_LIST : new ArrayList<>(tmp);
}
@SuppressWarnings({"unchecked"})
List<Map<String, Object>> getFilesToDownload() {
//take a local reference first because the field can become null later
List<Map<String, Object>> tmp = filesToDownload;
return tmp == null ? Collections.EMPTY_LIST : new ArrayList<>(tmp);
}
@SuppressWarnings({"unchecked"})
List<Map<String, Object>> getFilesDownloaded() {
List<Map<String, Object>> tmp = filesDownloaded;
return tmp == null ? Collections.EMPTY_LIST : new ArrayList<>(tmp);
}
// TODO: currently does not reflect conf files
Map<String, Object> getCurrentFile() {
Map<String, Object> tmp = currentFile;
DirectoryFileFetcher tmpFileFetcher = dirFileFetcher;
if (tmp == null)
return null;
tmp = new HashMap<>(tmp);
if (tmpFileFetcher != null)
tmp.put("bytesDownloaded", tmpFileFetcher.getBytesDownloaded());
return tmp;
}
private static class ReplicationHandlerException extends InterruptedException {
public ReplicationHandlerException(String message) {
super(message);
}
}
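/**
* Minimal abstraction over the destination of a downloaded file: packets are appended via
* {@link #write}, then the file is synced and closed, or deleted if the download did not complete.
*/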
private interface FileInterface {
void sync() throws IOException;
void write(byte[] buf, int packetSize) throws IOException;
void close() throws Exception;
void delete() throws Exception;
}
/**
* This class acts as a client for ReplicationHandler.FileStream and understands the wt=filestream protocol.
*
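* <p>On the wire, each packet is laid out as read by {@code fetchPackets}:
* <pre>
* int packetSize | long checksum (only when checksums are requested) | packetSize bytes of data
* </pre>
*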
* @see org.apache.solr.handler.ReplicationHandler
*/
private class FileFetcher {
private final FileInterface file;
protected boolean includeChecksum = false;
private final String fileName;
private final String saveAs;
private final String solrParamOutput;
private final Long indexGen;
private final long size;
private long bytesDownloaded = 0;
private byte[] buf;
private final Checksum checksum;
private int errorCount = 0;
FileFetcher(FileInterface file, Map<String, Object> fileDetails, String saveAs,
String solrParamOutput, long latestGen) throws IOException {
this.file = file;
this.fileName = (String) fileDetails.get(NAME);
this.size = (Long) fileDetails.get(SIZE);
buf = new byte[(int)Math.min(this.size, ReplicationHandler.PACKET_SZ)];
this.solrParamOutput = solrParamOutput;
this.saveAs = saveAs;
indexGen = latestGen;
if (includeChecksum) {
checksum = new Adler32();
} else {
checksum = null;
}
}
public long getBytesDownloaded() {
return bytesDownloaded;
}
/**
* The main method, which downloads the file.
*/
public void fetchFile() throws Exception {
log.info("fetch file {} from {}", file, masterUrl);
bytesDownloaded = 0;
try {
fetch();
} catch(CheckSumFailException e) {
throw e;
} catch(Exception e) {
SolrException.log(IndexFetcher.log, "Error fetching file", e);
throw e;
}
}
private void fetch() throws Exception {
try {
while (!stop && !abort) {
final FastInputStream is = getStream();
int result;
try {
//fetch the next packet; each request resumes from the current offset
result = fetchPackets(is);
if (result == 0) {
return;
}
//if there was an error, loop again and resume from the point where the download broke
} finally {
if (is != null) {
while (is.read() != -1) {
}
}
IOUtils.closeQuietly(is); // stream is close shield protected
}
}
} catch (Exception e) {
log.error("Problem fetching file", e);
throw e;
} finally {
cleanup(null);
//if cleanup succeeded, the file was downloaded fully; fsync it in the background
fsyncServiceFuture = fsyncService.submit(() -> {
try {
file.close();
} catch (Exception e) {
fsyncException = e;
}
});
}
}
private int fetchPackets(FastInputStream fis) throws Exception {
try {
while (true) {
if (abort) {
throw new ReplicationHandlerException("User aborted replication");
}
if (stop) {
throw new ReplicationHandlerException("Index fetch stopped");
}
long checkSumServer = -1;
//read the size of the packet
int packetSize = fis.readInt();
if (packetSize <= 0) {
log.warn("No content received for file: {}", fileName);
return NO_CONTENT;
}
//TODO consider recoding the remaining logic to not use/need buf[]; instead use the internal buffer of fis
if (buf.length < packetSize) {
//This shouldn't happen since sender should use PACKET_SZ and we init the buf based on that too
buf = new byte[packetSize];
}
if (checksum != null) {
//read the checksum
checkSumServer = fis.readLong();
}
//then read the packet of bytes
fis.readFully(buf, 0, packetSize);
//compare the checksum as sent from the master
if (includeChecksum) {
checksum.reset();
checksum.update(buf, 0, packetSize);
long checkSumClient = checksum.getValue();
if (checkSumClient != checkSumServer) {
log.error("Checksum not matched between client and server for file: {} {} {}", fileName, checkSumClient, checkSumServer);
//a checksum mismatch is a hard failure; there is no retry in this case
stop = true;
throw new CheckSumFailException();
}
}
//if everything is fine, write down the packet to the file
file.write(buf, packetSize);
bytesDownloaded += packetSize;
log.info("Fetched and wrote {} bytes of file={} from replica={}", bytesDownloaded, fileName, masterUrl);
//errorCount is always set to zero after a successful packet
errorCount = 0;
if (bytesDownloaded >= size) {
return 0;
} else {
return 1;
}
}
} catch (CheckSumFailException e) {
throw e;
} catch (ReplicationHandlerException e) {
log.error("Exception fetching files", e);
throw e;
} catch (Exception e) {
log.warn("Error in fetching file: {} (downloaded {} of {} bytes)",
fileName, bytesDownloaded, size, e);
//for any failure, increment the error count
errorCount++;
//if the same packet fails more than MAX_RETRIES times, give up
if (errorCount > MAX_RETRIES) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Failed to fetch file: " + fileName +
" (downloaded " + bytesDownloaded + " of " + size + " bytes" +
", error count: " + errorCount + " > " + MAX_RETRIES + ")", e);
}
return ERR;
}
}
/**
* The web container flushes data only after its buffer fills up, so everything has to be read with
* readFully(); otherwise it fails. So read everything as bytes and then extract an integer out of it.
*/
private int readInt(byte[] b) {
return (((b[0] & 0xff) << 24) | ((b[1] & 0xff) << 16)
| ((b[2] & 0xff) << 8) | (b[3] & 0xff));
}
/**
* Same as above but to read longs from a byte array
*/
private long readLong(byte[] b) {
return (((long) (b[0] & 0xff)) << 56) | (((long) (b[1] & 0xff)) << 48)
| (((long) (b[2] & 0xff)) << 40) | (((long) (b[3] & 0xff)) << 32)
| (((long) (b[4] & 0xff)) << 24) | ((b[5] & 0xff) << 16)
| ((b[6] & 0xff) << 8) | ((b[7] & 0xff));
}
/**
* Closes the file and, if the download was incomplete, deletes it.
* @param ex the exception that triggered cleanup, or null if the download is being finalized normally
*/
private void cleanup(Exception ex) {
try {
file.close();
} catch (Exception e) {
log.error("Error closing file: {}", this.saveAs, e);
}
if (bytesDownloaded != size) {
log.warn("bytesDownloaded != size bytesDownloaded={} size={}", bytesDownloaded, size);
//if the download is not complete then
//delete the file being downloaded
try {
file.delete();
} catch (Exception e) {
log.error("Error deleting file: {}", this.saveAs, e);
}
//if a causing exception was passed in, attach this as suppressed; otherwise throw
SolrException exp = new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Unable to download " + fileName + " completely. Downloaded " + bytesDownloaded + "!=" + size);
if (ex != null) {
ex.addSuppressed(exp);
} else {
throw exp;
}
}
}
/**
* Open a new stream using HttpClient
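* <p>The resulting request is roughly (parameter values illustrative):
* {@code command=filecontent&generation=1234&qt=/replication&file=_0.cfs&wt=filestream&offset=0}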
*/
private FastInputStream getStream() throws IOException {
ModifiableSolrParams params = new ModifiableSolrParams();
//the command is filecontent
params.set(COMMAND, CMD_GET_FILE);
params.set(GENERATION, Long.toString(indexGen));
params.set(CommonParams.QT, ReplicationHandler.PATH);
//the name of the file to download; the generation set above reserves the commit point on the master
params.set(solrParamOutput, fileName);
if (useInternalCompression) {
params.set(COMPRESSION, "true");
}
//use checksum
if (this.includeChecksum) {
params.set(CHECKSUM, true);
}
//wt=filestream is a custom protocol
params.set(CommonParams.WT, FILE_STREAM);
// If there was a failure, this is a retry; offset=<bytesDownloaded> ensures that
// the server resumes from that offset
if (bytesDownloaded > 0) {
params.set(OFFSET, Long.toString(bytesDownloaded));
}
@SuppressWarnings({"rawtypes"})
NamedList response;
InputStream is = null;
try {
QueryRequest req = new QueryRequest(params);
req.setBasePath(masterUrl);
req.setMethod(SolrRequest.METHOD.POST);
req.setResponseParser(new InputStreamResponseParser(FILE_STREAM));
Cancellable resp = solrClient.asyncRequestRaw(req, null, new AsyncListener<>() {
@Override
public void onSuccess(InputStream is) {
}
@Override
public void onFailure(Throwable throwable, int code) {
log.error("Exception fetching file", throwable);
}
});
fileFetchRequests.put(fileName, resp);
is = resp.getStream();
if (is == null) {
throw new SolrException(ErrorCode.SERVER_ERROR, "Did not find inputstream in response");
}
if (useInternalCompression) {
is = new InflaterInputStream(is);
}
return new FastInputStream(is);
} catch (Exception e) {
//close the stream on error; is may still be null if the request itself failed
if (is != null) {
try {
while (is.read() != -1) {
}
} catch (Exception e1) {
// quietly
}
IOUtils.closeQuietly(is); // stream is close shield protected
}
throw new IOException("Could not download file '" + fileName + "'", e);
}
}
}
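/**
* A {@link FileInterface} that writes the downloaded file into a Lucene {@link Directory}
* (used for index files).
*/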
private static class DirectoryFile implements FileInterface {
private final String saveAs;
private Directory copy2Dir;
private IndexOutput outStream;
DirectoryFile(Directory tmpIndexDir, String saveAs) throws IOException {
this.saveAs = saveAs;
this.copy2Dir = tmpIndexDir;
outStream = copy2Dir.createOutput(this.saveAs, DirectoryFactory.IOCONTEXT_NO_CACHE);
}
public void sync() throws IOException {
copy2Dir.sync(Collections.singleton(saveAs));
}
public void write(byte[] buf, int packetSize) throws IOException {
outStream.writeBytes(buf, 0, packetSize);
}
public void close() throws Exception {
outStream.close();
}
public void delete() throws Exception {
copy2Dir.deleteFile(saveAs);
}
}
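/**
* A {@link FileFetcher} that saves the downloaded file into a Lucene {@link Directory}.
*/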
public class DirectoryFileFetcher extends FileFetcher {
DirectoryFileFetcher(Directory tmpIndexDir, Map<String, Object> fileDetails, String saveAs,
String solrParamOutput, long latestGen) throws IOException {
super(new DirectoryFile(tmpIndexDir, saveAs), fileDetails, saveAs, solrParamOutput, latestGen);
}
}
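/**
* A {@link FileInterface} backed by a plain file on the local filesystem (used for conf and tlog
* files).
*/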
private static class LocalFsFile implements FileInterface {
private File copy2Dir;
FileChannel fileChannel;
private FileOutputStream fileOutputStream;
File file;
LocalFsFile(File dir, String saveAs) throws IOException {
this.copy2Dir = dir;
this.file = new File(copy2Dir, saveAs);
File parentDir = this.file.getParentFile();
if (!parentDir.exists()) {
if (!parentDir.mkdirs()) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Failed to create (sub)directory for file: " + saveAs);
}
}
this.fileOutputStream = new FileOutputStream(file);
this.fileChannel = this.fileOutputStream.getChannel();
}
public void sync() throws IOException {
FileUtils.sync(file);
}
public void write(byte[] buf, int packetSize) throws IOException {
fileChannel.write(ByteBuffer.wrap(buf, 0, packetSize));
}
public void close() throws Exception {
//close the FileOutputStream (which also closes the Channel)
fileOutputStream.close();
}
public void delete() throws Exception {
Files.delete(file.toPath());
}
}
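/**
* A {@link FileFetcher} that saves the downloaded file to a plain file on the local filesystem.
*/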
public class LocalFsFileFetcher extends FileFetcher {
LocalFsFileFetcher(File dir, Map<String, Object> fileDetails, String saveAs,
String solrParamOutput, long latestGen) throws IOException {
super(new LocalFsFile(dir, saveAs), fileDetails, saveAs, solrParamOutput, latestGen);
}
}
@SuppressWarnings({"rawtypes"})
NamedList getDetails() throws IOException, SolrServerException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(COMMAND, CMD_DETAILS);
params.set("slave", false);
params.set(CommonParams.QT, ReplicationHandler.PATH);
// TODO use shardhandler
QueryRequest request = new QueryRequest(params);
request.setBasePath(masterUrl);
return solrClient.request(request);
}
public void destroy() {
abortFetch();
}
String getMasterUrl() {
return masterUrl;
}
private static final int MAX_RETRIES = 2;
private static final int NO_CONTENT = 0;
private static final int ERR = 2;
public static final String REPLICATION_PROPERTIES = "replication.properties";
static final String INDEX_REPLICATED_AT = "indexReplicatedAt";
static final String TIMES_INDEX_REPLICATED = "timesIndexReplicated";
static final String CLEARED_LOCAL_IDX = "clearedLocalIndexFirst";
static final String CONF_FILES_REPLICATED = "confFilesReplicated";
static final String CONF_FILES_REPLICATED_AT = "confFilesReplicatedAt";
static final String TIMES_CONFIG_REPLICATED = "timesConfigReplicated";
static final String LAST_CYCLE_BYTES_DOWNLOADED = "lastCycleBytesDownloaded";
static final String TIMES_FAILED = "timesFailed";
static final String REPLICATION_FAILED_AT = "replicationFailedAt";
static final String PREVIOUS_CYCLE_TIME_TAKEN = "previousCycleTimeInSeconds";
static final String INDEX_REPLICATED_AT_LIST = "indexReplicatedAtList";
static final String REPLICATION_FAILED_AT_LIST = "replicationFailedAtList";
}