blob: b666db0490b9ce3572c0418c8c8649c33796ac24 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.replicator;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.regex.Matcher;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexCommit;
import org.apache.lucene.index.IndexFileNames;
import org.apache.lucene.index.IndexNotFoundException;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.replicator.ReplicationClient.ReplicationHandler;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.util.IOUtils;
import org.apache.lucene.util.InfoStream;
/**
* A {@link ReplicationHandler} for replication of an index. Implements
* {@link #revisionReady} by copying the files pointed by the client resolver to
* the index {@link Directory} and then touches the index with
* {@link IndexWriter} to make sure any unused files are deleted.
* <p>
* <b>NOTE:</b> this handler assumes that {@link IndexWriter} is not opened by
* another process on the index directory. In fact, opening an
* {@link IndexWriter} on the same directory to which files are copied can lead
* to undefined behavior, where some or all the files will be deleted, override
* other files or simply create a mess. When you replicate an index, it is best
* if the index is never modified by {@link IndexWriter}, except the one that is
* open on the source index, from which you replicate.
* <p>
* This handler notifies the application via a provided {@link Callable} when an
* updated index commit was made available for it.
*
* @lucene.experimental
*/
public class IndexReplicationHandler implements ReplicationHandler {
/**
* The component used to log messages to the {@link InfoStream#getDefault()
* default} {@link InfoStream}.
*/
public static final String INFO_STREAM_COMPONENT = "IndexReplicationHandler";
private final Directory indexDir;
private final Callable<Boolean> callback;
private volatile Map<String,List<RevisionFile>> currentRevisionFiles;
private volatile String currentVersion;
private volatile InfoStream infoStream = InfoStream.getDefault();
/**
* Returns the last {@link IndexCommit} found in the {@link Directory}, or
* {@code null} if there are no commits.
*/
public static IndexCommit getLastCommit(Directory dir) throws IOException {
try {
if (DirectoryReader.indexExists(dir)) {
List<IndexCommit> commits = DirectoryReader.listCommits(dir);
// listCommits guarantees that we get at least one commit back, or
// IndexNotFoundException which we handle below
return commits.get(commits.size() - 1);
}
} catch (IndexNotFoundException e) {
// ignore the exception and return null
}
return null;
}
/**
* Verifies that the last file is segments_N and fails otherwise. It also
* removes and returns the file from the list, because it needs to be handled
* last, after all files. This is important in order to guarantee that if a
* reader sees the new segments_N, all other segment files are already on
* stable storage.
* <p>
* The reason why the code fails instead of putting segments_N file last is
* that this indicates an error in the Revision implementation.
*/
public static String getSegmentsFile(List<String> files, boolean allowEmpty) {
if (files.isEmpty()) {
if (allowEmpty) {
return null;
} else {
throw new IllegalStateException("empty list of files not allowed");
}
}
String segmentsFile = files.remove(files.size() - 1);
if (!segmentsFile.startsWith(IndexFileNames.SEGMENTS) || segmentsFile.equals(IndexFileNames.OLD_SEGMENTS_GEN)) {
throw new IllegalStateException("last file to copy+sync must be segments_N but got " + segmentsFile
+ "; check your Revision implementation!");
}
return segmentsFile;
}
/**
* Cleanup the index directory by deleting all given files. Called when file
* copy or sync failed.
*/
public static void cleanupFilesOnFailure(Directory dir, List<String> files) {
for (String file : files) {
// suppress any exception because if we're here, it means copy
// failed, and we must cleanup after ourselves.
IOUtils.deleteFilesIgnoringExceptions(dir, file);
}
}
/**
* Cleans up the index directory from old index files. This method uses the
* last commit found by {@link #getLastCommit(Directory)}. If it matches the
* expected segmentsFile, then all files not referenced by this commit point
* are deleted.
* <p>
* <b>NOTE:</b> this method does a best effort attempt to clean the index
* directory. It suppresses any exceptions that occur, as this can be retried
* the next time.
*/
public static void cleanupOldIndexFiles(Directory dir, String segmentsFile, InfoStream infoStream) {
try {
IndexCommit commit = getLastCommit(dir);
// commit == null means weird IO errors occurred, ignore them
// if there were any IO errors reading the expected commit point (i.e.
// segments files mismatch), then ignore that commit either.
if (commit != null && commit.getSegmentsFileName().equals(segmentsFile)) {
Set<String> commitFiles = new HashSet<>(commit.getFileNames());
Matcher matcher = IndexFileNames.CODEC_FILE_PATTERN.matcher("");
for (String file : dir.listAll()) {
if (!commitFiles.contains(file)
&& (matcher.reset(file).matches() || file.startsWith(IndexFileNames.SEGMENTS))) {
// suppress exceptions, it's just a best effort
IOUtils.deleteFilesIgnoringExceptions(dir, file);
}
}
}
} catch (Throwable t) {
// ignore any errors that happen during this state and only log it. this
// cleanup will have a chance to succeed the next time we get a new
// revision.
if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) {
infoStream.message(INFO_STREAM_COMPONENT, "cleanupOldIndexFiles(): failed on error " + t.getMessage());
}
}
}
/**
* Copies the files from the source directory to the target one, if they are
* not the same.
*/
public static void copyFiles(Directory source, Directory target, List<String> files) throws IOException {
if (!source.equals(target)) {
for (String file : files) {
target.copyFrom(source, file, file, IOContext.READONCE);
}
}
}
/**
* Constructor with the given index directory and callback to notify when the
* indexes were updated.
*/
public IndexReplicationHandler(Directory indexDir, Callable<Boolean> callback) throws IOException {
this.callback = callback;
this.indexDir = indexDir;
currentRevisionFiles = null;
currentVersion = null;
if (DirectoryReader.indexExists(indexDir)) {
final List<IndexCommit> commits = DirectoryReader.listCommits(indexDir);
final IndexCommit commit = commits.get(commits.size() - 1);
currentRevisionFiles = IndexRevision.revisionFiles(commit);
currentVersion = IndexRevision.revisionVersion(commit);
final InfoStream infoStream = InfoStream.getDefault();
if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) {
infoStream.message(INFO_STREAM_COMPONENT, "constructor(): currentVersion=" + currentVersion
+ " currentRevisionFiles=" + currentRevisionFiles);
infoStream.message(INFO_STREAM_COMPONENT, "constructor(): commit=" + commit);
}
}
}
@Override
public String currentVersion() {
return currentVersion;
}
@Override
public Map<String,List<RevisionFile>> currentRevisionFiles() {
return currentRevisionFiles;
}
@Override
public void revisionReady(String version, Map<String,List<RevisionFile>> revisionFiles,
Map<String,List<String>> copiedFiles, Map<String,Directory> sourceDirectory) throws IOException {
if (revisionFiles.size() > 1) {
throw new IllegalArgumentException("this handler handles only a single source; got " + revisionFiles.keySet());
}
Directory clientDir = sourceDirectory.values().iterator().next();
List<String> files = copiedFiles.values().iterator().next();
String segmentsFile = getSegmentsFile(files, false);
String pendingSegmentsFile = "pending_" + segmentsFile;
boolean success = false;
try {
// copy files from the client to index directory
copyFiles(clientDir, indexDir, files);
// fsync all copied files (except segmentsFile)
indexDir.sync(files);
// now copy and fsync segmentsFile as pending, then rename (simulating lucene commit)
indexDir.copyFrom(clientDir, segmentsFile, pendingSegmentsFile, IOContext.READONCE);
indexDir.sync(Collections.singletonList(pendingSegmentsFile));
indexDir.rename(pendingSegmentsFile, segmentsFile);
indexDir.syncMetaData();
success = true;
} finally {
if (!success) {
files.add(segmentsFile); // add it back so it gets deleted too
files.add(pendingSegmentsFile);
cleanupFilesOnFailure(indexDir, files);
}
}
// all files have been successfully copied + sync'd. update the handler's state
currentRevisionFiles = revisionFiles;
currentVersion = version;
if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) {
infoStream.message(INFO_STREAM_COMPONENT, "revisionReady(): currentVersion=" + currentVersion
+ " currentRevisionFiles=" + currentRevisionFiles);
}
// Cleanup the index directory from old and unused index files.
// NOTE: we don't use IndexWriter.deleteUnusedFiles here since it may have
// side-effects, e.g. if it hits sudden IO errors while opening the index
// (and can end up deleting the entire index). It is not our job to protect
// against those errors, app will probably hit them elsewhere.
cleanupOldIndexFiles(indexDir, segmentsFile, infoStream);
// successfully updated the index, notify the callback that the index is
// ready.
if (callback != null) {
try {
callback.call();
} catch (Exception e) {
throw new IOException(e);
}
}
}
/** Sets the {@link InfoStream} to use for logging messages. */
public void setInfoStream(InfoStream infoStream) {
if (infoStream == null) {
infoStream = InfoStream.NO_OUTPUT;
}
this.infoStream = infoStream;
}
}