/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.solr.handler;
import static org.apache.solr.handler.ReplicationHandler.ALIAS;
import static org.apache.solr.handler.ReplicationHandler.CHECKSUM;
import static org.apache.solr.handler.ReplicationHandler.CMD_DETAILS;
import static org.apache.solr.handler.ReplicationHandler.CMD_GET_FILE;
import static org.apache.solr.handler.ReplicationHandler.CMD_GET_FILE_LIST;
import static org.apache.solr.handler.ReplicationHandler.CMD_INDEX_VERSION;
import static org.apache.solr.handler.ReplicationHandler.COMMAND;
import static org.apache.solr.handler.ReplicationHandler.COMPRESSION;
import static org.apache.solr.handler.ReplicationHandler.CONF_FILES;
import static org.apache.solr.handler.ReplicationHandler.CONF_FILE_SHORT;
import static org.apache.solr.handler.ReplicationHandler.EXTERNAL;
import static org.apache.solr.handler.ReplicationHandler.FILE;
import static org.apache.solr.handler.ReplicationHandler.FILE_STREAM;
import static org.apache.solr.handler.ReplicationHandler.GENERATION;
import static org.apache.solr.handler.ReplicationHandler.INTERNAL;
import static org.apache.solr.handler.ReplicationHandler.MASTER_URL;
import static org.apache.solr.handler.ReplicationHandler.NAME;
import static org.apache.solr.handler.ReplicationHandler.OFFSET;
import static org.apache.solr.handler.ReplicationHandler.SIZE;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
import java.util.zip.InflaterInputStream;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.HttpClient;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IndexInput;
import org.apache.lucene.store.IndexOutput;
import org.apache.solr.client.solrj.SolrServerException;
import org.apache.solr.client.solrj.impl.HttpClientUtil;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.request.QueryRequest;
import org.apache.solr.common.SolrException;
import org.apache.solr.common.params.CommonParams;
import org.apache.solr.common.params.ModifiableSolrParams;
import org.apache.solr.common.params.SolrParams;
import org.apache.solr.common.util.ExecutorUtil;
import org.apache.solr.common.util.FastInputStream;
import org.apache.solr.common.util.NamedList;
import org.apache.solr.core.DirectoryFactory;
import org.apache.solr.core.DirectoryFactory.DirContext;
import org.apache.solr.core.IndexDeletionPolicyWrapper;
import org.apache.solr.core.SolrCore;
import org.apache.solr.handler.ReplicationHandler.FileInfo;
import org.apache.solr.request.LocalSolrQueryRequest;
import org.apache.solr.request.SolrQueryRequest;
import org.apache.solr.search.SolrIndexSearcher;
import org.apache.solr.update.CommitUpdateCommand;
import org.apache.solr.util.DefaultSolrThreadFactory;
import org.apache.solr.util.FileUtils;
import org.apache.solr.util.PropertiesInputStream;
import org.apache.solr.util.PropertiesOutputStream;
import org.apache.solr.util.RefCounted;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>Provides functionality for downloading changed index files as well as config files, and a
* timer for scheduling fetches from the master.</p>
*
*
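* A hypothetical wiring sketch (in practice the ReplicationHandler constructs the
* SnapPuller from the slave section of its init args in solrconfig.xml; the URL and
* values below are illustrative only):
* <pre>
* NamedList args = new NamedList();
* args.add("masterUrl", "http://master-host:8983/solr/core1");
* args.add("pollInterval", "00:00:60"); // HH:mm:ss, see readInterval()
* SnapPuller puller = new SnapPuller(args, replicationHandler, core);
* </pre>
*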
* @since solr 1.4
*/
public class SnapPuller {
public static final String INDEX_PROPERTIES = "index.properties";
private static final Logger LOG = LoggerFactory.getLogger(SnapPuller.class.getName());
private final String masterUrl;
private final ReplicationHandler replicationHandler;
private final Integer pollInterval;
private String pollIntervalStr;
private ScheduledExecutorService executorService;
private volatile long executorStartTime;
private volatile long replicationStartTime;
private final SolrCore solrCore;
private volatile List<Map<String, Object>> filesToDownload;
private volatile List<Map<String, Object>> confFilesToDownload;
private volatile List<Map<String, Object>> filesDownloaded;
private volatile List<Map<String, Object>> confFilesDownloaded;
private volatile Map<String, Object> currentFile;
private volatile DirectoryFileFetcher dirFileFetcher;
private volatile LocalFsFileFetcher localFileFetcher;
private volatile ExecutorService fsyncService;
private volatile boolean stop = false;
private boolean useInternal = false;
private boolean useExternal = false;
/**
* Flag used to disable the timer task for polling
*/
private AtomicBoolean pollDisabled = new AtomicBoolean(false);
// HttpClient shared by all cores (used if timeout is not specified for a core)
private static HttpClient client;
// HttpClient for this instance if connectionTimeout or readTimeout has been specified
private final HttpClient myHttpClient;
private static synchronized HttpClient createHttpClient(String connTimeout, String readTimeout, String httpBasicAuthUser, String httpBasicAuthPassword, boolean useCompression) {
if (connTimeout == null && readTimeout == null && client != null) return client;
final ModifiableSolrParams httpClientParams = new ModifiableSolrParams();
httpClientParams.set(HttpClientUtil.PROP_CONNECTION_TIMEOUT, connTimeout != null ? connTimeout : "5000");
httpClientParams.set(HttpClientUtil.PROP_SO_TIMEOUT, readTimeout != null ? readTimeout : "20000");
httpClientParams.set(HttpClientUtil.PROP_BASIC_AUTH_USER, httpBasicAuthUser);
httpClientParams.set(HttpClientUtil.PROP_BASIC_AUTH_PASS, httpBasicAuthPassword);
httpClientParams.set(HttpClientUtil.PROP_ALLOW_COMPRESSION, useCompression);
// Keeping a very high number so that if you have a large number of cores
// no requests are kept waiting for an idle connection.
httpClientParams.set(HttpClientUtil.PROP_MAX_CONNECTIONS, 10000);
httpClientParams.set(HttpClientUtil.PROP_MAX_CONNECTIONS_PER_HOST, 10000);
HttpClient httpClient = HttpClientUtil.createClient(httpClientParams);
if (client == null && connTimeout == null && readTimeout == null) client = httpClient;
return httpClient;
}
public SnapPuller(final NamedList initArgs, final ReplicationHandler handler, final SolrCore sc) {
solrCore = sc;
final SolrParams params = SolrParams.toSolrParams(initArgs);
String masterUrl = (String) initArgs.get(MASTER_URL);
if (masterUrl == null)
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"'masterUrl' is required for a slave");
if (masterUrl.endsWith("/replication")) {
masterUrl = masterUrl.substring(0, masterUrl.length()-12);
LOG.warn("'masterUrl' must be specified without the /replication suffix");
}
this.masterUrl = masterUrl;
this.replicationHandler = handler;
pollIntervalStr = (String) initArgs.get(POLL_INTERVAL);
pollInterval = readInterval(pollIntervalStr);
String compress = (String) initArgs.get(COMPRESSION);
useInternal = INTERNAL.equals(compress);
useExternal = EXTERNAL.equals(compress);
String connTimeout = (String) initArgs.get(HttpClientUtil.PROP_CONNECTION_TIMEOUT);
String readTimeout = (String) initArgs.get(HttpClientUtil.PROP_SO_TIMEOUT);
String httpBasicAuthUser = (String) initArgs.get(HttpClientUtil.PROP_BASIC_AUTH_USER);
String httpBasicAuthPassword = (String) initArgs.get(HttpClientUtil.PROP_BASIC_AUTH_PASS);
myHttpClient = createHttpClient(connTimeout, readTimeout, httpBasicAuthUser, httpBasicAuthPassword, useExternal);
if (pollInterval != null && pollInterval > 0) {
startExecutorService();
} else {
LOG.info(" No value set for 'pollInterval'. Timer Task not started.");
}
}
private void startExecutorService() {
Runnable task = new Runnable() {
@Override
public void run() {
if (pollDisabled.get()) {
LOG.info("Poll disabled");
return;
}
try {
LOG.debug("Polling for index modifications");
executorStartTime = System.currentTimeMillis();
replicationHandler.doFetch(null, false);
} catch (Exception e) {
LOG.error("Exception in fetching index", e);
}
}
};
executorService = Executors.newSingleThreadScheduledExecutor(
new DefaultSolrThreadFactory("snapPuller"));
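// schedule the first poll at the next epoch-aligned multiple of pollInterval,
// then repeat every pollInterval milliseconds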
long initialDelay = pollInterval - (System.currentTimeMillis() % pollInterval);
executorService.scheduleAtFixedRate(task, initialDelay, pollInterval, TimeUnit.MILLISECONDS);
LOG.info("Poll Scheduled at an interval of " + pollInterval + "ms");
}
/**
* Gets the latest commit version and generation from the master
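* The response is expected to carry the index version and generation as longs, under the
* CMD_INDEX_VERSION and GENERATION keys respectively.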
*/
@SuppressWarnings("unchecked")
NamedList getLatestVersion() throws IOException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(COMMAND, CMD_INDEX_VERSION);
params.set(CommonParams.WT, "javabin");
params.set(CommonParams.QT, "/replication");
QueryRequest req = new QueryRequest(params);
HttpSolrServer server = new HttpSolrServer(masterUrl, myHttpClient); //XXX modify to use shardhandler
server.setSoTimeout(60000);
server.setConnectionTimeout(15000);
try {
return server.request(req);
} catch (SolrServerException e) {
throw new IOException(e);
}
}
/**
* Fetches the list of files in a given index commit point and updates internal list of files to download.
*/
private void fetchFileList(long gen) throws IOException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(COMMAND, CMD_GET_FILE_LIST);
params.set(GENERATION, String.valueOf(gen));
params.set(CommonParams.WT, "javabin");
params.set(CommonParams.QT, "/replication");
QueryRequest req = new QueryRequest(params);
HttpSolrServer server = new HttpSolrServer(masterUrl, myHttpClient); //XXX modify to use shardhandler
server.setSoTimeout(60000);
server.setConnectionTimeout(15000);
try {
NamedList response = server.request(req);
List<Map<String, Object>> files = (List<Map<String,Object>>) response.get(CMD_GET_FILE_LIST);
if (files != null)
filesToDownload = Collections.synchronizedList(files);
else {
filesToDownload = Collections.emptyList();
LOG.error("No files to download for index generation: "+ gen);
}
files = (List<Map<String,Object>>) response.get(CONF_FILES);
if (files != null)
confFilesToDownload = Collections.synchronizedList(files);
} catch (SolrServerException e) {
throw new IOException(e);
}
}
private boolean successfulInstall = false;
/**
* This command downloads all the necessary files from the master to install an index commit point. Only changed files are
* downloaded. It also downloads the conf files (if they are modified).
*
* @param core the SolrCore
* @param forceReplication force a replication in all cases
* @return true on success, false if slave is already in sync
* @throws IOException if an exception occurs
*/
boolean fetchLatestIndex(final SolrCore core, boolean forceReplication) throws IOException, InterruptedException {
successfulInstall = false;
replicationStartTime = System.currentTimeMillis();
Directory tmpIndexDir = null;
String tmpIndex = null;
Directory indexDir = null;
String indexDirPath = null;
boolean deleteTmpIdxDir = true;
try {
//get the current 'replicatable' index version from the master
NamedList response = null;
try {
response = getLatestVersion();
} catch (Exception e) {
LOG.error("Master at: " + masterUrl + " is not available. Index fetch failed. Exception: " + e.getMessage());
return false;
}
long latestVersion = (Long) response.get(CMD_INDEX_VERSION);
long latestGeneration = (Long) response.get(GENERATION);
IndexCommit commit;
RefCounted<SolrIndexSearcher> searcherRefCounted = null;
try {
searcherRefCounted = core.getNewestSearcher(false);
if (searcherRefCounted == null) {
SolrException.log(LOG, "No open searcher found - fetch aborted");
return false;
}
commit = searcherRefCounted.get().getIndexReader().getIndexCommit();
} finally {
if (searcherRefCounted != null)
searcherRefCounted.decref();
}
if (latestVersion == 0L) {
if (forceReplication && commit.getGeneration() != 0) {
// since we won't get the files for an empty index,
// we just clear ours and commit
RefCounted<IndexWriter> iw = core.getUpdateHandler().getSolrCoreState().getIndexWriter(core);
try {
iw.get().deleteAll();
} finally {
iw.decref();
}
SolrQueryRequest req = new LocalSolrQueryRequest(core,
new ModifiableSolrParams());
core.getUpdateHandler().commit(new CommitUpdateCommand(req, false));
}
//there is nothing to be replicated
successfulInstall = true;
return true;
}
if (!forceReplication && IndexDeletionPolicyWrapper.getCommitTimestamp(commit) == latestVersion) {
//master and slave are already in sync just return
LOG.info("Slave in sync with master.");
successfulInstall = true;
return true;
}
LOG.info("Master's generation: " + latestGeneration);
LOG.info("Slave's generation: " + commit.getGeneration());
LOG.info("Starting replication process");
// get the list of files first
fetchFileList(latestGeneration);
// this can happen if the commit point is deleted before we fetch the file list.
if(filesToDownload.isEmpty()) return false;
LOG.info("Number of files in latest index in master: " + filesToDownload.size());
// Create the sync service
fsyncService = Executors.newSingleThreadExecutor(new DefaultSolrThreadFactory("fsyncService"));
// use a synchronized list because the list is read by other threads (to show details)
filesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());
// if the slave's commit is as new as or newer than the master's (by timestamp or generation),
// the indexes have diverged and cannot be synced incrementally: a new index
// directory is created and all the files are copied
boolean isFullCopyNeeded = IndexDeletionPolicyWrapper
.getCommitTimestamp(commit) >= latestVersion
|| commit.getGeneration() >= latestGeneration || forceReplication;
String tmpIdxDirName = "index." + new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT).format(new Date());
tmpIndex = createTempindexDir(core, tmpIdxDirName);
tmpIndexDir = core.getDirectoryFactory().get(tmpIndex, DirContext.DEFAULT, core.getSolrConfig().indexConfig.lockType);
// the current index dir
indexDirPath = core.getIndexDir();
indexDir = core.getDirectoryFactory().get(indexDirPath, DirContext.DEFAULT, core.getSolrConfig().indexConfig.lockType);
try {
if (isIndexStale(indexDir)) {
isFullCopyNeeded = true;
}
LOG.info("Starting download to " + tmpIndexDir + " fullCopy=" + isFullCopyNeeded);
successfulInstall = false;
downloadIndexFiles(isFullCopyNeeded, tmpIndexDir, latestGeneration);
LOG.info("Total time taken for download : " + ((System.currentTimeMillis() - replicationStartTime) / 1000) + " secs");
Collection<Map<String, Object>> modifiedConfFiles = getModifiedConfFiles(confFilesToDownload);
if (!modifiedConfFiles.isEmpty()) {
downloadConfFiles(confFilesToDownload, latestGeneration);
if (isFullCopyNeeded) {
successfulInstall = modifyIndexProps(tmpIdxDirName);
deleteTmpIdxDir = false;
} else {
successfulInstall = moveIndexFiles(tmpIndexDir, indexDir);
}
if (successfulInstall) {
if (isFullCopyNeeded) {
// let the system know we are changing dir's and the old one
// may be closed
if (indexDir != null) {
LOG.info("removing old index directory " + indexDir);
core.getDirectoryFactory().doneWithDirectory(indexDir);
core.getDirectoryFactory().remove(indexDir);
}
}
LOG.info("Configuration files are modified, core will be reloaded");
logReplicationTimeAndConfFiles(modifiedConfFiles, successfulInstall); // record replication time and conf files in replication.properties
reloadCore();
}
} else {
terminateAndWaitFsyncService();
if (isFullCopyNeeded) {
successfulInstall = modifyIndexProps(tmpIdxDirName);
deleteTmpIdxDir = false;
} else {
successfulInstall = moveIndexFiles(tmpIndexDir, indexDir);
}
if (successfulInstall) {
logReplicationTimeAndConfFiles(modifiedConfFiles, successfulInstall);
}
}
if (successfulInstall) {
if (isFullCopyNeeded) {
// let the system know we are changing dir's and the old one
// may be closed
if (indexDir != null) {
LOG.info("removing old index directory " + indexDir);
core.getDirectoryFactory().doneWithDirectory(indexDir);
core.getDirectoryFactory().remove(indexDir);
}
}
openNewWriterAndSearcher(isFullCopyNeeded);
}
replicationStartTime = 0;
return successfulInstall;
} catch (ReplicationHandlerException e) {
LOG.error("User aborted Replication");
return false;
} catch (SolrException e) {
throw e;
} catch (InterruptedException e) {
throw new InterruptedException("Index fetch interrupted");
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR, "Index fetch failed : ", e);
}
} finally {
try {
if (!successfulInstall) {
logReplicationTimeAndConfFiles(null, successfulInstall);
}
filesToDownload = filesDownloaded = confFilesDownloaded = confFilesToDownload = null;
replicationStartTime = 0;
dirFileFetcher = null;
localFileFetcher = null;
if (fsyncService != null && !fsyncService.isShutdown()) fsyncService
.shutdownNow();
fsyncService = null;
stop = false;
fsyncException = null;
} finally {
if (deleteTmpIdxDir && tmpIndexDir != null) {
try {
core.getDirectoryFactory().doneWithDirectory(tmpIndexDir);
core.getDirectoryFactory().remove(tmpIndexDir);
} catch (IOException e) {
SolrException.log(LOG, "Error removing directory " + tmpIndexDir, e);
}
}
if (tmpIndexDir != null) {
core.getDirectoryFactory().release(tmpIndexDir);
}
if (indexDir != null) {
core.getDirectoryFactory().release(indexDir);
}
}
}
}
private volatile Exception fsyncException;
/**
* Terminate the fsync service and wait for all its tasks to complete; this is a no-op if the
* service has already terminated. Any fsync failure is rethrown.
*/
private void terminateAndWaitFsyncService() throws Exception {
if (fsyncService.isTerminated()) return;
fsyncService.shutdown();
// allow a generous wait (1 hour) for the outstanding fsyncs
fsyncService.awaitTermination(3600, TimeUnit.SECONDS);
// if any fsync failed, throw that exception back
Exception fsyncExceptionCopy = fsyncException;
if (fsyncExceptionCopy != null) throw fsyncExceptionCopy;
}
/**
* Helper method to record the last replication's details so that we can show them on the statistics page across
* restarts.
* @throws IOException on IO error
*/
private void logReplicationTimeAndConfFiles(Collection<Map<String, Object>> modifiedConfFiles, boolean successfulInstall) throws IOException {
List<String> confFiles = new ArrayList<String>();
if (modifiedConfFiles != null && !modifiedConfFiles.isEmpty())
for (Map<String, Object> map1 : modifiedConfFiles)
confFiles.add((String) map1.get(NAME));
Properties props = replicationHandler.loadReplicationProperties();
long replicationTime = System.currentTimeMillis();
long replicationTimeTaken = (replicationTime - getReplicationStartTime()) / 1000;
Directory dir = null;
try {
dir = solrCore.getDirectoryFactory().get(solrCore.getDataDir(), DirContext.META_DATA, solrCore.getSolrConfig().indexConfig.lockType);
int indexCount = 1, confFilesCount = 1;
if (props.containsKey(TIMES_INDEX_REPLICATED)) {
indexCount = Integer.valueOf(props.getProperty(TIMES_INDEX_REPLICATED)) + 1;
}
StringBuffer sb = readToStringBuffer(replicationTime, props.getProperty(INDEX_REPLICATED_AT_LIST));
props.setProperty(INDEX_REPLICATED_AT_LIST, sb.toString());
props.setProperty(INDEX_REPLICATED_AT, String.valueOf(replicationTime));
props.setProperty(PREVIOUS_CYCLE_TIME_TAKEN, String.valueOf(replicationTimeTaken));
props.setProperty(TIMES_INDEX_REPLICATED, String.valueOf(indexCount));
if (modifiedConfFiles != null && !modifiedConfFiles.isEmpty()) {
props.setProperty(CONF_FILES_REPLICATED, confFiles.toString());
props.setProperty(CONF_FILES_REPLICATED_AT, String.valueOf(replicationTime));
if (props.containsKey(TIMES_CONFIG_REPLICATED)) {
confFilesCount = Integer.valueOf(props.getProperty(TIMES_CONFIG_REPLICATED)) + 1;
}
props.setProperty(TIMES_CONFIG_REPLICATED, String.valueOf(confFilesCount));
}
props.setProperty(LAST_CYCLE_BYTES_DOWNLOADED, String.valueOf(getTotalBytesDownloaded(this)));
if (!successfulInstall) {
int numFailures = 1;
if (props.containsKey(TIMES_FAILED)) {
numFailures = Integer.valueOf(props.getProperty(TIMES_FAILED)) + 1;
}
props.setProperty(TIMES_FAILED, String.valueOf(numFailures));
props.setProperty(REPLICATION_FAILED_AT, String.valueOf(replicationTime));
sb = readToStringBuffer(replicationTime, props.getProperty(REPLICATION_FAILED_AT_LIST));
props.setProperty(REPLICATION_FAILED_AT_LIST, sb.toString());
}
final IndexOutput out = dir.createOutput(REPLICATION_PROPERTIES, DirectoryFactory.IOCONTEXT_NO_CACHE);
OutputStream outFile = new PropertiesOutputStream(out);
try {
props.store(outFile, "Replication details");
dir.sync(Collections.singleton(REPLICATION_PROPERTIES));
} finally {
IOUtils.closeQuietly(outFile);
}
} catch (Exception e) {
LOG.warn("Exception while updating statistics", e);
} finally {
if (dir != null) {
solrCore.getDirectoryFactory().release(dir);
}
}
}
static long getTotalBytesDownloaded(SnapPuller snappuller) {
long bytesDownloaded = 0;
//sum the sizes of the index files downloaded so far
for (Map<String, Object> file : snappuller.getFilesDownloaded()) {
bytesDownloaded += (Long) file.get(SIZE);
}
//sum the sizes of the conf files downloaded so far
for (Map<String, Object> file : snappuller.getConfFilesDownloaded()) {
bytesDownloaded += (Long) file.get(SIZE);
}
//get size from current file being downloaded
Map<String, Object> currentFile = snappuller.getCurrentFile();
if (currentFile != null) {
if (currentFile.containsKey("bytesDownloaded")) {
bytesDownloaded += (Long) currentFile.get("bytesDownloaded");
}
}
return bytesDownloaded;
}
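/**
* Prepends replicationTime to the comma-separated list in str, keeping at most the ten
* most recent entries; e.g. readToStringBuffer(5, "4,3,2,1") yields "5,4,3,2,1".
*/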
private StringBuffer readToStringBuffer(long replicationTime, String str) {
StringBuffer sb = new StringBuffer();
List<String> l = new ArrayList<String>();
if (str != null && str.length() != 0) {
String[] ss = str.split(",");
for (int i = 0; i < ss.length; i++) {
l.add(ss[i]);
}
}
sb.append(replicationTime);
// append at most the nine most recent previous entries (ten total with the new one)
for (int i = 0; i < l.size() && i < 9; i++) {
sb.append(",").append(l.get(i));
}
return sb;
}
private void openNewWriterAndSearcher(boolean isFullCopyNeeded) throws IOException {
SolrQueryRequest req = new LocalSolrQueryRequest(solrCore,
new ModifiableSolrParams());
// reboot the writer on the new index and get a new searcher
solrCore.getUpdateHandler().newIndexWriter(isFullCopyNeeded);
RefCounted<SolrIndexSearcher> searcher = null;
IndexCommit commitPoint;
try {
Future[] waitSearcher = new Future[1];
searcher = solrCore.getSearcher(true, true, waitSearcher, true);
if (waitSearcher[0] != null) {
try {
waitSearcher[0].get();
} catch (InterruptedException e) {
SolrException.log(LOG, e);
} catch (ExecutionException e) {
SolrException.log(LOG, e);
}
}
commitPoint = searcher.get().getIndexReader().getIndexCommit();
} finally {
req.close();
if (searcher != null) {
searcher.decref();
}
}
// update the commit point in replication handler
replicationHandler.indexCommitPoint = commitPoint;
}
/**
* All the files are copied to a temp dir first
*/
private String createTempindexDir(SolrCore core, String tmpIdxDirName) {
// TODO: there should probably be a DirectoryFactory#concatPath(parent, name)
// or something
String tmpIdxDir = core.getDataDir() + tmpIdxDirName;
return tmpIdxDir;
}
private void reloadCore() {
new Thread() {
@Override
public void run() {
try {
solrCore.getCoreDescriptor().getCoreContainer().reload(solrCore.getName());
} catch (Exception e) {
LOG.error("Could not reload core ", e);
}
}
}.start();
}
private void downloadConfFiles(List<Map<String, Object>> confFilesToDownload, long latestGeneration) throws Exception {
LOG.info("Starting download of configuration files from master: " + confFilesToDownload);
confFilesDownloaded = Collections.synchronizedList(new ArrayList<Map<String, Object>>());
File tmpconfDir = new File(solrCore.getResourceLoader().getConfigDir(), "conf." + getDateAsStr(new Date()));
try {
boolean status = tmpconfDir.mkdirs();
if (!status) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Failed to create temporary config folder: " + tmpconfDir.getName());
}
for (Map<String, Object> file : confFilesToDownload) {
String saveAs = (String) (file.get(ALIAS) == null ? file.get(NAME) : file.get(ALIAS));
localFileFetcher = new LocalFsFileFetcher(tmpconfDir, file, saveAs, true, latestGeneration);
currentFile = file;
localFileFetcher.fetchFile();
confFilesDownloaded.add(new HashMap<String, Object>(file));
}
// this is called before copying the files to the original conf dir,
// so that an exception here does not corrupt the original files
terminateAndWaitFsyncService();
copyTmpConfFiles2Conf(tmpconfDir);
} finally {
delTree(tmpconfDir);
}
}
/**
* Download the index files. If a new index is needed, download all the files.
*
* @param downloadCompleteIndex whether to download a complete fresh copy of the index
* @param tmpIndexDir the directory to which the files are downloaded
* @param latestGeneration the generation number of the commit being fetched
*/
private void downloadIndexFiles(boolean downloadCompleteIndex,
Directory tmpIndexDir, long latestGeneration) throws Exception {
String indexDir = solrCore.getIndexDir();
// the index dir exists at this point; open it so we can skip files that are already present
Directory dir = solrCore.getDirectoryFactory().get(indexDir, DirContext.DEFAULT, solrCore.getSolrConfig().indexConfig.lockType);
try {
for (Map<String,Object> file : filesToDownload) {
if (!dir.fileExists((String) file.get(NAME)) || downloadCompleteIndex) {
dirFileFetcher = new DirectoryFileFetcher(tmpIndexDir, file,
(String) file.get(NAME), false, latestGeneration);
currentFile = file;
dirFileFetcher.fetchFile();
filesDownloaded.add(new HashMap<String,Object>(file));
} else {
LOG.info("Skipping download for " + file.get(NAME) + " because it already exists");
}
}
} finally {
solrCore.getDirectoryFactory().release(dir);
}
}
/**
* All files that master and slave have in common must be the same size; otherwise we assume
* they are not compatible (stale).
*
* @return true if the index is stale and a fresh copy must be downloaded, false otherwise.
* @throws IOException if low level io error
*/
private boolean isIndexStale(Directory dir) throws IOException {
for (Map<String, Object> file : filesToDownload) {
if (dir.fileExists((String) file.get(NAME))
&& dir.fileLength((String) file.get(NAME)) != (Long) file.get(SIZE)) {
// file exists and size is different, therefore we must assume
// corrupted index
return true;
}
}
return false;
}
/**
* Move a file via DirectoryFactory#move(). If the move fails, it is considered a failure.
*/
private boolean moveAFile(Directory tmpIdxDir, Directory indexDir, String fname, List<String> copiedfiles) {
boolean success = false;
try {
if (indexDir.fileExists(fname)) {
LOG.info("Skipping move file - it already exists:" + fname);
return true;
}
} catch (IOException e) {
SolrException.log(LOG, "could not check if a file exists", e);
return false;
}
try {
solrCore.getDirectoryFactory().move(tmpIdxDir, indexDir, fname, DirectoryFactory.IOCONTEXT_NO_CACHE);
success = true;
} catch (IOException e) {
SolrException.log(LOG, "Could not move file", e);
}
return success;
}
/**
* Copy all index files from the temp index dir to the actual index. The segments_N file is copied last.
*/
private boolean moveIndexFiles(Directory tmpIdxDir, Directory indexDir) {
String segmentsFile = null;
List<String> movedfiles = new ArrayList<String>();
for (Map<String, Object> f : filesDownloaded) {
String fname = (String) f.get(NAME);
// the segments file must be moved last, or else a failure
// part-way through would leave the index corrupted
if (fname.startsWith("segments_")) {
segmentsFile = fname;
continue;
}
if (!moveAFile(tmpIdxDir, indexDir, fname, movedfiles)) return false;
movedfiles.add(fname);
}
//move the segments file last
if (segmentsFile != null) {
if (!moveAFile(tmpIdxDir, indexDir, segmentsFile, movedfiles)) return false;
}
return true;
}
/**
* Recursively build the list of files under the given directory
*/
private List<File> makeTmpConfDirFileList(File dir, List<File> fileList) {
File[] files = dir.listFiles();
for (File file : files) {
if (file.isFile()) {
fileList.add(file);
} else if (file.isDirectory()) {
fileList = makeTmpConfDirFileList(file, fileList);
}
}
return fileList;
}
/**
* The conf files are copied from the tmp dir to the conf dir. A backup of each old file is maintained
*/
private void copyTmpConfFiles2Conf(File tmpconfDir) {
boolean status = false;
File confDir = new File(solrCore.getResourceLoader().getConfigDir());
for (File file : makeTmpConfDirFileList(tmpconfDir, new ArrayList<File>())) {
File oldFile = new File(confDir, file.getPath().substring(tmpconfDir.getPath().length(), file.getPath().length()));
if (!oldFile.getParentFile().exists()) {
status = oldFile.getParentFile().mkdirs();
if (!status) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Unable to mkdirs: " + oldFile.getParentFile());
}
}
if (oldFile.exists()) {
File backupFile = new File(oldFile.getPath() + "." + getDateAsStr(new Date(oldFile.lastModified())));
if (!backupFile.getParentFile().exists()) {
status = backupFile.getParentFile().mkdirs();
if (!status) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Unable to mkdirs: " + backupFile.getParentFile());
}
}
status = oldFile.renameTo(backupFile);
if (!status) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Unable to rename: " + oldFile + " to: " + backupFile);
}
}
status = file.renameTo(oldFile);
if (!status) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Unable to rename: " + file + " to: " + oldFile);
}
}
}
private String getDateAsStr(Date d) {
return new SimpleDateFormat(SnapShooter.DATE_FMT, Locale.ROOT).format(d);
}
/**
* Writes index.properties in the data dir so the core loads its index from the newly
* installed directory (used after a full copy).
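* The stored file holds a single entry, e.g. index=index.20130104090000000 (the suffix
* follows SnapShooter.DATE_FMT; the value shown here is illustrative).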
*/
private boolean modifyIndexProps(String tmpIdxDirName) {
LOG.info("New index installed. Updating index properties... index="+tmpIdxDirName);
Properties p = new Properties();
Directory dir = null;
try {
dir = solrCore.getDirectoryFactory().get(solrCore.getDataDir(), DirContext.META_DATA, solrCore.getSolrConfig().indexConfig.lockType);
if (dir.fileExists(SnapPuller.INDEX_PROPERTIES)){
final IndexInput input = dir.openInput(SnapPuller.INDEX_PROPERTIES, DirectoryFactory.IOCONTEXT_NO_CACHE);
final InputStream is = new PropertiesInputStream(input);
try {
p.load(is);
} catch (Exception e) {
LOG.error("Unable to load " + SnapPuller.INDEX_PROPERTIES, e);
} finally {
IOUtils.closeQuietly(is);
}
}
try {
dir.deleteFile(SnapPuller.INDEX_PROPERTIES);
} catch (IOException e) {
// no problem
}
final IndexOutput out = dir.createOutput(SnapPuller.INDEX_PROPERTIES, DirectoryFactory.IOCONTEXT_NO_CACHE);
p.put("index", tmpIdxDirName);
OutputStream os = null;
try {
os = new PropertiesOutputStream(out);
p.store(os, SnapPuller.INDEX_PROPERTIES);
dir.sync(Collections.singleton(INDEX_PROPERTIES));
} catch (Exception e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Unable to write " + SnapPuller.INDEX_PROPERTIES, e);
} finally {
IOUtils.closeQuietly(os);
}
return true;
} catch (IOException e1) {
throw new RuntimeException(e1);
} finally {
if (dir != null) {
try {
solrCore.getDirectoryFactory().release(dir);
} catch (IOException e) {
SolrException.log(LOG, "", e);
}
}
}
}
private final Map<String, FileInfo> confFileInfoCache = new HashMap<String, FileInfo>();
/**
* The local conf files are compared with the conf files on the master. If they are the same
* (by checksum) they are not copied.
*
* @param confFilesToDownload The list of files obtained from master
*
* @return a list of configuration files which have changed on the master and need to be downloaded.
*/
private Collection<Map<String, Object>> getModifiedConfFiles(List<Map<String, Object>> confFilesToDownload) {
if (confFilesToDownload == null || confFilesToDownload.isEmpty())
return Collections.EMPTY_LIST;
//build a map with alias/name as the key
Map<String, Map<String, Object>> nameVsFile = new HashMap<String, Map<String, Object>>();
NamedList names = new NamedList();
for (Map<String, Object> map : confFilesToDownload) {
//if alias is present that is the name the file may have in the slave
String name = (String) (map.get(ALIAS) == null ? map.get(NAME) : map.get(ALIAS));
nameVsFile.put(name, map);
names.add(name, null);
}
//get the details of the local conf files with the same alias/name
List<Map<String, Object>> localFilesInfo = replicationHandler.getConfFileInfoFromCache(names, confFileInfoCache);
//compare checksums to see which files have actually changed on the master
for (Map<String, Object> fileInfo : localFilesInfo) {
String name = (String) fileInfo.get(NAME);
Map<String, Object> m = nameVsFile.get(name);
if (m == null) continue; // the file is not even present locally (so must be downloaded)
if (m.get(CHECKSUM).equals(fileInfo.get(CHECKSUM))) {
nameVsFile.remove(name); //checksums are same so the file need not be downloaded
}
}
return nameVsFile.isEmpty() ? Collections.EMPTY_LIST : nameVsFile.values();
}
static boolean delTree(File dir) {
boolean isSuccess = true;
File contents[] = dir.listFiles();
if (contents != null) {
for (File file : contents) {
if (file.isDirectory()) {
boolean success = delTree(file);
if (!success) {
LOG.warn("Unable to delete directory : " + file);
isSuccess = false;
}
} else {
boolean success = file.delete();
if (!success) {
LOG.warn("Unable to delete file : " + file);
isSuccess = false;
return false;
}
}
}
}
return isSuccess && dir.delete();
}
/**
* Disable periodic polling
*/
void disablePoll() {
pollDisabled.set(true);
LOG.info("inside disable poll, value of pollDisabled = " + pollDisabled);
}
/**
* Enable periodic polling
*/
void enablePoll() {
pollDisabled.set(false);
LOG.info("inside enable poll, value of pollDisabled = " + pollDisabled);
}
/**
* Stops the ongoing pull
*/
void abortPull() {
stop = true;
}
long getReplicationStartTime() {
return replicationStartTime;
}
List<Map<String, Object>> getConfFilesToDownload() {
//grab a local reference first because the field can be set to null by another thread
List<Map<String, Object>> tmp = confFilesToDownload;
//return a copy, or else the iterator may fail if the underlying list changes
return tmp == null ? Collections.EMPTY_LIST : new ArrayList<Map<String, Object>>(tmp);
}
List<Map<String, Object>> getConfFilesDownloaded() {
//grab a local reference first because the field can be set to null by another thread
List<Map<String, Object>> tmp = confFilesDownloaded;
// NOTE: it's safe to make a copy of a SynchronizedCollection(ArrayList)
return tmp == null ? Collections.EMPTY_LIST : new ArrayList<Map<String, Object>>(tmp);
}
List<Map<String, Object>> getFilesToDownload() {
//grab a local reference first because the field can be set to null by another thread
List<Map<String, Object>> tmp = filesToDownload;
return tmp == null ? Collections.EMPTY_LIST : new ArrayList<Map<String, Object>>(tmp);
}
List<Map<String, Object>> getFilesDownloaded() {
List<Map<String, Object>> tmp = filesDownloaded;
return tmp == null ? Collections.EMPTY_LIST : new ArrayList<Map<String, Object>>(tmp);
}
// TODO: currently does not reflect conf files
Map<String, Object> getCurrentFile() {
Map<String, Object> tmp = currentFile;
DirectoryFileFetcher tmpFileFetcher = dirFileFetcher;
if (tmp == null)
return null;
tmp = new HashMap<String, Object>(tmp);
if (tmpFileFetcher != null)
tmp.put("bytesDownloaded", tmpFileFetcher.bytesDownloaded);
return tmp;
}
boolean isPollingDisabled() {
return pollDisabled.get();
}
Long getNextScheduledExecTime() {
Long nextTime = null;
if (executorStartTime > 0)
nextTime = executorStartTime + pollInterval;
return nextTime;
}
private static class ReplicationHandlerException extends InterruptedException {
public ReplicationHandlerException(String message) {
super(message);
}
}
/**
* This class acts as a client for ReplicationHandler's file streaming and understands the wt=filestream protocol.
*
* @see org.apache.solr.handler.ReplicationHandler.DirectoryFileStream
*/
private class DirectoryFileFetcher {
boolean includeChecksum = true;
Directory copy2Dir;
String fileName;
String saveAs;
long size;
long bytesDownloaded = 0;
byte[] buf = new byte[1024 * 1024];
Checksum checksum;
int errorCount = 0;
private boolean isConf;
private boolean aborted = false;
private Long indexGen;
private IndexOutput outStream;
DirectoryFileFetcher(Directory tmpIndexDir, Map<String, Object> fileDetails, String saveAs,
boolean isConf, long latestGen) throws IOException {
this.copy2Dir = tmpIndexDir;
this.fileName = (String) fileDetails.get(NAME);
this.size = (Long) fileDetails.get(SIZE);
this.isConf = isConf;
this.saveAs = saveAs;
indexGen = latestGen;
outStream = copy2Dir.createOutput(saveAs, DirectoryFactory.IOCONTEXT_NO_CACHE);
if (includeChecksum)
checksum = new Adler32();
}
/**
* The main method, which downloads the file
*/
void fetchFile() throws Exception {
try {
while (true) {
final FastInputStream is = getStream();
int result;
try {
//fetch packets one by one in a single request
result = fetchPackets(is);
if (result == 0 || result == NO_CONTENT) {
return;
}
//if there was an error, loop and retry, resuming from the point where the download broke
} finally {
IOUtils.closeQuietly(is);
}
}
} finally {
cleanup();
//if cleanup succeeds, the file was downloaded fully; schedule an fsync
fsyncService.submit(new Runnable(){
@Override
public void run() {
try {
copy2Dir.sync(Collections.singleton(saveAs));
} catch (IOException e) {
fsyncException = e;
}
}
});
}
}
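/**
* Reads packets off the stream, one per loop iteration. Each packet, as sent by the
* master, is a 4-byte big-endian payload length, then (when checksums are enabled) an
* 8-byte Adler32 checksum of the payload, then the payload bytes themselves. A length
* of zero or less signals that the server had no content for the file.
*/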
private int fetchPackets(FastInputStream fis) throws Exception {
byte[] intbytes = new byte[4];
byte[] longbytes = new byte[8];
try {
while (true) {
if (stop) {
stop = false;
aborted = true;
throw new ReplicationHandlerException("User aborted replication");
}
long checkSumServer = -1;
fis.readFully(intbytes);
//read the size of the packet
int packetSize = readInt(intbytes);
if (packetSize <= 0) {
LOG.warn("No content recieved for file: " + currentFile);
return NO_CONTENT;
}
if (buf.length < packetSize)
buf = new byte[packetSize];
if (checksum != null) {
//read the checksum
fis.readFully(longbytes);
checkSumServer = readLong(longbytes);
}
//then read the packet of bytes
fis.readFully(buf, 0, packetSize);
//compare the checksum as sent from the master
if (includeChecksum) {
checksum.reset();
checksum.update(buf, 0, packetSize);
long checkSumClient = checksum.getValue();
if (checkSumClient != checkSumServer) {
LOG.error("Checksum not matched between client and server for: " + currentFile);
//if checksum is wrong it is a problem return for retry
return 1;
}
}
//if everything is fine, write down the packet to the file
writeBytes(packetSize);
bytesDownloaded += packetSize;
if (bytesDownloaded >= size)
return 0;
//errorcount is always set to zero after a successful packet
errorCount = 0;
}
} catch (ReplicationHandlerException e) {
throw e;
} catch (Exception e) {
LOG.warn("Error in fetching packets ", e);
//on any failure, increment the error count
errorCount++;
//if the same packet fails more than MAX_RETRIES times, give up and rethrow
if (errorCount > MAX_RETRIES) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Fetch failed for file:" + fileName, e);
}
return ERR;
}
}
protected void writeBytes(int packetSize) throws IOException {
outStream.writeBytes(buf, 0, packetSize);
}
/**
* The web container flushes data only after its buffer fills, so everything must be read
* with readFully(), otherwise the read fails. Read everything as bytes, then extract an integer.
*/
private int readInt(byte[] b) {
return (((b[0] & 0xff) << 24) | ((b[1] & 0xff) << 16)
| ((b[2] & 0xff) << 8) | (b[3] & 0xff));
}
/**
* Same as above but to read longs from a byte array
*/
private long readLong(byte[] b) {
return (((long) (b[0] & 0xff)) << 56) | (((long) (b[1] & 0xff)) << 48)
| (((long) (b[2] & 0xff)) << 40) | (((long) (b[3] & 0xff)) << 32)
| (((long) (b[4] & 0xff)) << 24) | ((b[5] & 0xff) << 16)
| ((b[6] & 0xff) << 8) | ((b[7] & 0xff));
}
/**
* cleanup everything
*/
private void cleanup() {
try {
outStream.close();
} catch (Exception e) {
LOG.error("Error closing the file stream: " + this.saveAs, e);
}
if (bytesDownloaded != size) {
//if the download is not complete then
//delete the file being downloaded
try {
copy2Dir.deleteFile(saveAs);
} catch (Exception e) {
LOG.error("Error deleting file in cleanup" + e.getMessage());
}
//if the failure was due to a user abort, return normally; otherwise throw
if (!aborted)
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Unable to download " + fileName + " completely. Downloaded "
+ bytesDownloaded + "!=" + size);
}
}
/**
* Open a new stream using HttpClient
*/
FastInputStream getStream() throws IOException {
HttpSolrServer s = new HttpSolrServer(masterUrl, myHttpClient, null); //XXX use shardhandler
s.setSoTimeout(60000);
s.setConnectionTimeout(15000);
ModifiableSolrParams params = new ModifiableSolrParams();
//the request uses command=filecontent
params.set(COMMAND, CMD_GET_FILE);
params.set(GENERATION, Long.toString(indexGen));
params.set(CommonParams.QT, "/replication");
//add the generation to download. This is used to reserve the commit point on the master
if (isConf) {
//set cf instead of file for config file
params.set(CONF_FILE_SHORT, fileName);
} else {
params.set(FILE, fileName);
}
if (useInternal) {
params.set(COMPRESSION, "true");
}
//use checksum
if (this.includeChecksum) {
params.set(CHECKSUM, true);
}
//wt=filestream is a custom protocol
params.set(CommonParams.WT, FILE_STREAM);
// if a previous attempt failed, this is a retry: offset=<bytesDownloaded> makes the
// server resume the stream from that offset
if (bytesDownloaded > 0) {
params.set(OFFSET, Long.toString(bytesDownloaded));
}
NamedList response;
InputStream is = null;
try {
QueryRequest req = new QueryRequest(params);
response = s.request(req);
is = (InputStream) response.get("stream");
if(useInternal) {
is = new InflaterInputStream(is);
}
return new FastInputStream(is);
} catch (Throwable t) {
//close stream on error
IOUtils.closeQuietly(is);
throw new IOException("Could not download file '" + fileName + "'", t);
}
}
}
/**
* This class acts as a client for ReplicationHandler's file streaming and understands the wt=filestream protocol.
*
* @see org.apache.solr.handler.ReplicationHandler.LocalFsFileStream
*/
private class LocalFsFileFetcher {
boolean includeChecksum = true;
private File copy2Dir;
String fileName;
String saveAs;
long size;
long bytesDownloaded = 0;
FileChannel fileChannel;
private FileOutputStream fileOutputStream;
byte[] buf = new byte[1024 * 1024];
Checksum checksum;
File file;
int errorCount = 0;
private boolean isConf;
private boolean aborted = false;
private Long indexGen;
// TODO: could do more code sharing with DirectoryFileFetcher
LocalFsFileFetcher(File dir, Map<String, Object> fileDetails, String saveAs,
boolean isConf, long latestGen) throws IOException {
this.copy2Dir = dir;
this.fileName = (String) fileDetails.get(NAME);
this.size = (Long) fileDetails.get(SIZE);
this.isConf = isConf;
this.saveAs = saveAs;
indexGen = latestGen;
this.file = new File(copy2Dir, saveAs);
File parentDir = this.file.getParentFile();
if( ! parentDir.exists() ){
if ( ! parentDir.mkdirs() ) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Failed to create (sub)directory for file: " + saveAs);
}
}
this.fileOutputStream = new FileOutputStream(file);
this.fileChannel = this.fileOutputStream.getChannel();
if (includeChecksum)
checksum = new Adler32();
}
/**
* The main method which downloads file
*/
void fetchFile() throws Exception {
try {
while (true) {
final FastInputStream is = getStream();
int result;
try {
//fetch packets one by one in a single request
result = fetchPackets(is);
if (result == 0 || result == NO_CONTENT) {
return;
}
//if there was an error, loop and retry, resuming from the point where the download broke
} finally {
IOUtils.closeQuietly(is);
}
}
} finally {
cleanup();
//if cleanup succeeds, the file was downloaded fully; schedule an fsync
fsyncService.submit(new Runnable(){
@Override
public void run() {
try {
FileUtils.sync(file);
} catch (IOException e) {
fsyncException = e;
}
}
});
}
}
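/**
* Wire handling is identical to DirectoryFileFetcher#fetchPackets: a 4-byte big-endian
* payload length, an optional 8-byte Adler32 checksum, then the payload; a length of
* zero or less signals no content.
*/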
private int fetchPackets(FastInputStream fis) throws Exception {
byte[] intbytes = new byte[4];
byte[] longbytes = new byte[8];
try {
while (true) {
if (stop) {
stop = false;
aborted = true;
throw new ReplicationHandlerException("User aborted replication");
}
long checkSumServer = -1;
fis.readFully(intbytes);
//read the size of the packet
int packetSize = readInt(intbytes);
if (packetSize <= 0) {
LOG.warn("No content recieved for file: " + currentFile);
return NO_CONTENT;
}
if (buf.length < packetSize)
buf = new byte[packetSize];
if (checksum != null) {
//read the checksum
fis.readFully(longbytes);
checkSumServer = readLong(longbytes);
}
//then read the packet of bytes
fis.readFully(buf, 0, packetSize);
//compare the checksum as sent from the master
if (includeChecksum) {
checksum.reset();
checksum.update(buf, 0, packetSize);
long checkSumClient = checksum.getValue();
if (checkSumClient != checkSumServer) {
LOG.error("Checksum not matched between client and server for: " + currentFile);
//if checksum is wrong it is a problem return for retry
return 1;
}
}
//if everything is fine, write down the packet to the file
fileChannel.write(ByteBuffer.wrap(buf, 0, packetSize));
bytesDownloaded += packetSize;
if (bytesDownloaded >= size)
return 0;
//errorcount is always set to zero after a successful packet
errorCount = 0;
}
} catch (ReplicationHandlerException e) {
throw e;
} catch (Exception e) {
LOG.warn("Error in fetching packets ", e);
//on any failure, increment the error count
errorCount++;
//if the same packet fails more than MAX_RETRIES times, give up and rethrow
if (errorCount > MAX_RETRIES) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Fetch failed for file:" + fileName, e);
}
return ERR;
}
}
/**
* The web container flushes data only after its buffer fills, so everything must be read
* with readFully(), otherwise the read fails. Read everything as bytes, then extract an integer.
*/
private int readInt(byte[] b) {
return (((b[0] & 0xff) << 24) | ((b[1] & 0xff) << 16)
| ((b[2] & 0xff) << 8) | (b[3] & 0xff));
}
/**
* Same as above but to read longs from a byte array
*/
private long readLong(byte[] b) {
return (((long) (b[0] & 0xff)) << 56) | (((long) (b[1] & 0xff)) << 48)
| (((long) (b[2] & 0xff)) << 40) | (((long) (b[3] & 0xff)) << 32)
| (((long) (b[4] & 0xff)) << 24) | ((b[5] & 0xff) << 16)
| ((b[6] & 0xff) << 8) | ((b[7] & 0xff));
}
/**
* cleanup everything
*/
private void cleanup() {
try {
//close the FileOutputStream (which also closes the Channel)
fileOutputStream.close();
} catch (Exception e) {
LOG.error("Error closing the file stream: " + this.saveAs, e);
}
if (bytesDownloaded != size) {
//if the download is not complete then
//delete the file being downloaded
try {
file.delete();
} catch (Exception e) {
LOG.error("Error deleting file in cleanup" + e.getMessage());
}
//if the failure was due to a user abort, return normally; otherwise throw
if (!aborted)
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
"Unable to download " + fileName + " completely. Downloaded "
+ bytesDownloaded + "!=" + size);
}
}
/**
* Open a new stream using HttpClient
*/
FastInputStream getStream() throws IOException {
HttpSolrServer s = new HttpSolrServer(masterUrl, myHttpClient, null); //XXX use shardhandler
s.setSoTimeout(60000);
s.setConnectionTimeout(15000);
ModifiableSolrParams params = new ModifiableSolrParams();
//the request uses command=filecontent
params.set(COMMAND, CMD_GET_FILE);
params.set(GENERATION, Long.toString(indexGen));
params.set(CommonParams.QT, "/replication");
//add the generation to download. This is used to reserve the commit point on the master
if (isConf) {
//set cf instead of file for config file
params.set(CONF_FILE_SHORT, fileName);
} else {
params.set(FILE, fileName);
}
if (useInternal) {
params.set(COMPRESSION, "true");
}
//use checksum
if (this.includeChecksum) {
params.set(CHECKSUM, true);
}
//wt=filestream is a custom protocol
params.set(CommonParams.WT, FILE_STREAM);
// if a previous attempt failed, this is a retry: offset=<bytesDownloaded> makes the
// server resume the stream from that offset
if (bytesDownloaded > 0) {
params.set(OFFSET, Long.toString(bytesDownloaded));
}
NamedList response;
InputStream is = null;
try {
QueryRequest req = new QueryRequest(params);
response = s.request(req);
is = (InputStream) response.get("stream");
if(useInternal) {
is = new InflaterInputStream(is);
}
return new FastInputStream(is);
} catch (Throwable t) {
//close stream on error
IOUtils.closeQuietly(is);
throw new IOException("Could not download file '" + fileName + "'", t);
}
}
}
NamedList getDetails() throws IOException, SolrServerException {
ModifiableSolrParams params = new ModifiableSolrParams();
params.set(COMMAND, CMD_DETAILS);
params.set("slave", false);
params.set(CommonParams.QT, "/replication");
HttpSolrServer server = new HttpSolrServer(masterUrl, myHttpClient); //XXX use shardhandler
server.setSoTimeout(60000);
server.setConnectionTimeout(15000);
QueryRequest request = new QueryRequest(params);
return server.request(request);
}
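/**
* Parses an interval of the form 'HH:mm:ss' (any field may be left empty) into
* milliseconds, e.g. readInterval("00:00:20") returns 20000 and readInterval("01:30:00")
* returns 5400000; returns null when interval is null.
*/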
static Integer readInterval(String interval) {
if (interval == null)
return null;
int result = 0;
Matcher m = INTERVAL_PATTERN.matcher(interval.trim());
if (m.find()) {
String hr = m.group(1);
String min = m.group(2);
String sec = m.group(3);
result = 0;
try {
if (sec != null && sec.length() > 0)
result += Integer.parseInt(sec);
if (min != null && min.length() > 0)
result += (60 * Integer.parseInt(min));
if (hr != null && hr.length() > 0)
result += (60 * 60 * Integer.parseInt(hr));
result *= 1000;
} catch (NumberFormatException e) {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
INTERVAL_ERR_MSG);
}
} else {
throw new SolrException(SolrException.ErrorCode.SERVER_ERROR,
INTERVAL_ERR_MSG);
}
return result;
}
public void destroy() {
try {
if (executorService != null) executorService.shutdown();
} catch (Throwable e) {
SolrException.log(LOG, e);
}
try {
abortPull();
} catch (Throwable e) {
SolrException.log(LOG, e);
}
try {
if (executorService != null) ExecutorUtil
.shutdownNowAndAwaitTermination(executorService);
} catch (Throwable e) {
SolrException.log(LOG, e);
}
}
String getMasterUrl() {
return masterUrl;
}
String getPollInterval() {
return pollIntervalStr;
}
private static final int MAX_RETRIES = 5;
private static final int NO_CONTENT = 1;
private static final int ERR = 2;
public static final String REPLICATION_PROPERTIES = "replication.properties";
public static final String POLL_INTERVAL = "pollInterval";
public static final String INTERVAL_ERR_MSG = "The " + POLL_INTERVAL + " must be in this format 'HH:mm:ss'";
private static final Pattern INTERVAL_PATTERN = Pattern.compile("(\\d*?):(\\d*?):(\\d*)");
static final String INDEX_REPLICATED_AT = "indexReplicatedAt";
static final String TIMES_INDEX_REPLICATED = "timesIndexReplicated";
static final String CONF_FILES_REPLICATED = "confFilesReplicated";
static final String CONF_FILES_REPLICATED_AT = "confFilesReplicatedAt";
static final String TIMES_CONFIG_REPLICATED = "timesConfigReplicated";
static final String LAST_CYCLE_BYTES_DOWNLOADED = "lastCycleBytesDownloaded";
static final String TIMES_FAILED = "timesFailed";
static final String REPLICATION_FAILED_AT = "replicationFailedAt";
static final String PREVIOUS_CYCLE_TIME_TAKEN = "previousCycleTimeInSeconds";
static final String INDEX_REPLICATED_AT_LIST = "indexReplicatedAtList";
static final String REPLICATION_FAILED_AT_LIST = "replicationFailedAtList";
}