blob: 74e7c88b79d0341c344d1710c39e1c096247a2d4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.replicator.nrt;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.util.IOUtils;
/**
 * Handles copying one set of files, e.g. all files for a new NRT point, or files for pre-copying a merged segment.
 * This notifies the caller via {@link OnceDone} when the job finishes or fails.
 *
 * @lucene.experimental */
public abstract class CopyJob implements Comparable<CopyJob> {
  /** Source of {@link #ord} values, shared by all jobs. */
  private final static AtomicLong counter = new AtomicLong();

  /** Node the files are copied into; its {@code dir} also holds our temp files. */
  protected final ReplicaNode dest;

  /**
   * All files referenced by this job, keyed by file name. {@link #toCopy} holds the entries that
   * {@code dest.getFilesToCopy} reported as needing a copy.
   */
  protected final Map<String,FileMetaData> files;

  /** Unique, increasing id for this job, taken from a static counter at construction. */
  public final long ord = counter.incrementAndGet();

  /** True for an NRT sync, false for pre-copying a newly merged segment */
  public final boolean highPriority;

  /** Callback to invoke once this job is done. */
  public final OnceDone onceDone;

  /** {@link System#nanoTime()} captured when this job was constructed. */
  public final long startNS = System.nanoTime();

  /** Caller-supplied description of why this job was created. */
  public final String reason;

  /**
   * Files (name to metadata) that still need to be copied.
   * Entries taken over from a previous job are removed in {@link #transferAndCancel}.
   */
  protected final List<Map.Entry<String,FileMetaData>> toCopy;

  /**
   * Total bytes accounted to this job. Bumped here by the length of an in-progress file carried over in
   * {@link #transferAndCancel}; presumably otherwise maintained by subclasses.
   */
  protected long totBytes;

  /** Bytes copied so far; presumably maintained by subclasses (only read here, for logging). */
  protected long totBytesCopied;

  // The file we are currently copying, or null. On cancel it is closed and its partial temp file deleted:
  protected CopyOneFile current;

  // Set when we are cancelled; non-null means this job is cancelled:
  protected volatile Throwable exc;
  protected volatile String cancelReason;

  // Maps each fully copied file name to the temp file name it was copied into.
  // toString may concurrently access this:
  protected final Map<String,String> copiedFiles = new ConcurrentHashMap<>();

  /**
   * Creates a job that copies into {@code dest} whichever of {@code files} {@code dest} reports as needing a copy.
   *
   * <p>NOTE: if determining the files to copy fails, this invokes the overridable {@link #cancel} before any
   * subclass constructor has run.
   *
   * @throws CorruptIndexException if determining the files to copy fails; the original failure is the cause, and
   *     any failure while cancelling is attached to it as suppressed
   */
  protected CopyJob(String reason, Map<String,FileMetaData> files, ReplicaNode dest, boolean highPriority, OnceDone onceDone) throws IOException {
    this.reason = reason;
    this.files = files;
    this.dest = dest;
    this.highPriority = highPriority;
    this.onceDone = onceDone;

    // Exceptions in here are bad:
    try {
      this.toCopy = dest.getFilesToCopy(this.files);
    } catch (Throwable t) {
      // Don't let a failure during cleanup replace the real cause:
      cancelSuppressingFailure("exc during init", t);
      throw new CorruptIndexException("exception while checking local files", "n/a", t);
    }
  }

  /** Callback invoked by CopyJob once all files have (finally) finished copying */
  public interface OnceDone {
    public void run(CopyJob job) throws IOException;
  }

  /**
   * Transfers whatever tmp files were already copied in this previous job and cancels the previous job.
   *
   * <p>Locks this job first, then {@code prevJob}.
   *
   * @throws IOException if the transfer fails; this job is then cancelled too, and the transfer failure is rethrown
   *     with any cancel failure attached as suppressed
   */
  public synchronized void transferAndCancel(CopyJob prevJob) throws IOException {
    synchronized(prevJob) {
      dest.message("CopyJob: now transfer prevJob " + prevJob);
      try {
        _transferAndCancel(prevJob);
      } catch (Throwable t) {
        dest.message("xfer: exc during transferAndCancel");
        // Keep t as the primary failure even if cancelling ourselves also fails:
        cancelSuppressingFailure("exc during transferAndCancel", t);
        throw IOUtils.rethrowAlways(t);
      }
    }
  }

  /**
   * Does the work of {@link #transferAndCancel}. Does nothing if {@code prevJob} was already cancelled.
   *
   * <p>Otherwise it marks {@code prevJob} cancelled and, for files this job also needs, adopts its fully copied temp
   * files and its in-progress file, removing those entries from {@link #toCopy}. It then deletes the temp files
   * of {@code prevJob} that were not adopted. The caller must hold the locks on both jobs.
   */
  private synchronized void _transferAndCancel(CopyJob prevJob) throws IOException {
    // Caller must already be sync'd on prevJob:
    assert Thread.holdsLock(prevJob);

    if (prevJob.exc != null) {
      // Already cancelled
      dest.message("xfer: prevJob was already cancelled; skip transfer");
      return;
    }

    // Cancel the previous job. We set exc directly rather than calling prevJob.cancel, which would delete all of
    // its temp files before we could carry any of them over:
    prevJob.exc = new Throwable();

    // Carry over already copied files that we also want to copy
    Iterator<Map.Entry<String,FileMetaData>> it = toCopy.iterator();
    long bytesAlreadyCopied = 0;

    // Iterate over all files we think we need to copy:
    while (it.hasNext()) {
      Map.Entry<String,FileMetaData> ent = it.next();
      String fileName = ent.getKey();
      String prevTmpFileName = prevJob.copiedFiles.get(fileName);
      if (prevTmpFileName != null) {
        // This fileName is common to both jobs, and the old job already finished copying it (to a temp file), so we keep it:
        long fileLength = ent.getValue().length;
        bytesAlreadyCopied += fileLength;
        dest.message("xfer: carry over already-copied file " + fileName + " (" + prevTmpFileName + ", " + fileLength + " bytes)");
        copiedFiles.put(fileName, prevTmpFileName);

        // So we don't try to delete it, below:
        prevJob.copiedFiles.remove(fileName);

        // So it's not in our copy list anymore:
        it.remove();
      } else if (prevJob.current != null && prevJob.current.name.equals(fileName)) {
        // This fileName is common to both jobs, and it's the file that the previous job was in the process of copying. In this case
        // we continue copying it from the previous job. This is important for cases where we are copying over a large file
        // because otherwise we could keep failing the NRT copy and restarting this file from the beginning and never catch up:
        dest.message("xfer: carry over in-progress file " + fileName + " (" + prevJob.current.tmpName + ") bytesCopied=" + prevJob.current.getBytesCopied() + " of " + prevJob.current.bytesToCopy);
        bytesAlreadyCopied += prevJob.current.getBytesCopied();

        assert current == null;

        // must set current first, before writing/read to c.in/out in case that hits an exception, so that we then close the temp
        // IndexOutput when cancelling ourselves:
        current = newCopyOneFile(prevJob.current);

        // Tell our new (primary) connection we'd like to copy this file first, but resuming from how many bytes we already copied last time:
        // We do this even if bytesToCopy == bytesCopied, because we still need to readLong() the checksum from the primary connection:
        assert prevJob.current.getBytesCopied() <= prevJob.current.bytesToCopy;

        // Detach it from prevJob so the cleanup below does not close/delete the file we now own:
        prevJob.current = null;

        totBytes += current.metaData.length;

        // So it's not in our copy list anymore:
        it.remove();
      } else {
        dest.message("xfer: file " + fileName + " will be fully copied");
      }
    }

    dest.message("xfer: " + bytesAlreadyCopied + " bytes already copied of " + totBytes);

    // Delete all temp files the old job wrote but we don't need:
    dest.message("xfer: now delete old temp files: " + prevJob.copiedFiles.values());
    IOUtils.deleteFilesIgnoringExceptions(dest.dir, prevJob.copiedFiles.values());

    if (prevJob.current != null) {
      IOUtils.closeWhileHandlingException(prevJob.current);
      if (Node.VERBOSE_FILES) {
        dest.message("remove partial file " + prevJob.current.tmpName);
      }
      try {
        dest.deleter.deleteNewFile(prevJob.current.tmpName);
      } finally {
        // Never leave a reference to an already-closed file behind, even if the delete fails:
        prevJob.current = null;
      }
    }
  }

  /**
   * Called by {@link #transferAndCancel} to take over {@code current}, the file a previous job was partway through
   * copying. The returned instance becomes this job's {@link #current}.
   */
  protected abstract CopyOneFile newCopyOneFile(CopyOneFile current);

  /** Begin copying files */
  public abstract void start() throws IOException;

  /** Use current thread (blocking) to do all copying and then return once done, or throw exception on failure */
  public abstract void runBlocking() throws Exception;

  /**
   * Cancels this job; it is a no-op if the job was already cancelled.
   *
   * <p>It records {@code exc} (or a new placeholder {@link Throwable} if null) and {@code reason}. It then deletes the
   * temp files of all fully copied files, and closes the in-progress file (if any) and deletes its partial temp file.
   */
  public void cancel(String reason, Throwable exc) throws IOException {
    if (this.exc != null) {
      // Already cancelled
      return;
    }

    dest.message(String.format(Locale.ROOT, "top: cancel after copying %s; exc=%s:\n files=%s\n copiedFiles=%s",
                               Node.bytesToString(totBytesCopied),
                               exc,
                               files == null ? "null" : files.keySet(), copiedFiles.keySet()));

    if (exc == null) {
      exc = new Throwable();
    }

    this.exc = exc;
    this.cancelReason = reason;

    // Delete all temp files we wrote:
    IOUtils.deleteFilesIgnoringExceptions(dest.dir, copiedFiles.values());

    if (current != null) {
      IOUtils.closeWhileHandlingException(current);
      if (Node.VERBOSE_FILES) {
        dest.message("remove partial file " + current.tmpName);
      }
      try {
        dest.deleter.deleteNewFile(current.tmpName);
      } finally {
        // Never leave a reference to an already-closed file behind, even if the delete fails:
        current = null;
      }
    }
  }

  /**
   * Cancels this job because of {@code cause}. Any failure thrown by {@link #cancel} itself is added to
   * {@code cause} as a suppressed exception instead of propagating, so callers can rethrow {@code cause}.
   */
  private void cancelSuppressingFailure(String reason, Throwable cause) {
    try {
      cancel(reason, cause);
    } catch (Throwable cancelFailure) {
      // addSuppressed rejects self-suppression:
      if (cancelFailure != cause) {
        cause.addSuppressed(cancelFailure);
      }
    }
  }

  /** Return true if this job is trying to copy any of the same files as the other job */
  public abstract boolean conflicts(CopyJob other);

  /** Renames all copied (tmp) files to their true file names */
  public abstract void finish() throws IOException;

  /** Presumably returns true if this job failed or was cancelled; defined by subclasses. */
  public abstract boolean getFailed();

  /** Returns only those file names (a subset of {@link #getFileNames}) that need to be copied */
  public abstract Set<String> getFileNamesToCopy();

  /** Returns all file names referenced in this copy job */
  public abstract Set<String> getFileNames();

  /** Returns the {@link CopyState} associated with this job; defined by subclasses. */
  public abstract CopyState getCopyState();

  /** Returns the total number of bytes copied by this job so far; defined by subclasses. */
  public abstract long getTotalBytesCopied();
}