blob: 7e148810b470aeb6ef0daa8d23214604dc36d924 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.replicator.nrt;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import org.apache.lucene.util.IOUtils;
/**
 * Handles one set of files that need copying, either because we have a new NRT point, or we are
 * pre-copying merged files for merge warming.
 *
 * <p>{@link #start()} writes the full list of requested file names to the connection up front;
 * {@link #visit()} then reads the files back incrementally, and {@link #finish()} renames the
 * resulting temporary files to their real names.
 */
class SimpleCopyJob extends CopyJob {

  /** Connection the file requests are written to and the file bytes are read from. */
  final Connection c;

  /** Scratch buffer handed to every {@link CopyOneFile} created by {@link #visit()}. */
  final byte[] copyBuffer = new byte[65536];

  /** Copy state supplied at construction; returned unchanged by {@link #getCopyState()}. */
  final CopyState copyState;

  /** Iterator over {@code toCopy}; {@code null} until {@link #start()} has been called. */
  private Iterator<Map.Entry<String, FileMetaData>> iter;

  /**
   * Creates a new job; nothing is sent over the connection until {@link #start()} is called.
   *
   * @param reason forwarded to the {@link CopyJob} constructor
   * @param c connection used to request and receive the files
   * @param copyState state returned later by {@link #getCopyState()}
   * @param dest replica node the files are copied into
   * @param files file name to metadata map, forwarded to the {@link CopyJob} constructor
   * @param highPriority whether this job sorts ahead of non-high-priority jobs
   * @param onceDone forwarded to the {@link CopyJob} constructor
   */
  public SimpleCopyJob(
      String reason,
      Connection c,
      CopyState copyState,
      SimpleReplicaNode dest,
      Map<String, FileMetaData> files,
      boolean highPriority,
      OnceDone onceDone)
      throws IOException {
    super(reason, files, dest, highPriority, onceDone);
    dest.message("create SimpleCopyJob o" + ord);
    this.c = c;
    this.copyState = copyState;
  }

  /**
   * Sends the names (and starting offsets) of all files to copy over the connection, then shuts
   * down the output side of the socket.
   *
   * <p>On any failure the job is cancelled and a {@link NodeCommunicationException} wrapping the
   * cause is thrown.
   *
   * @throws IllegalStateException if this job was already started
   */
  @Override
  public synchronized void start() throws IOException {
    if (iter != null) {
      throw new IllegalStateException("already started");
    }
    iter = toCopy.iterator();

    // Send all file names / offsets up front to avoid ping-pong latency:
    try {

      // This means we resumed an already in-progress copy; we do this one first:
      if (current != null) {
        c.out.writeByte((byte) 0);
        c.out.writeString(current.name);
        c.out.writeVLong(current.getBytesCopied());
        totBytes += current.metaData.length;
      }

      // Each requested file is announced by a 0 byte, its name, and the offset to start from:
      for (Map.Entry<String, FileMetaData> ent : toCopy) {
        String fileName = ent.getKey();
        FileMetaData metaData = ent.getValue();
        totBytes += metaData.length;
        c.out.writeByte((byte) 0);
        c.out.writeString(fileName);
        c.out.writeVLong(0);
      }

      // A 1 byte terminates the request list:
      c.out.writeByte((byte) 1);
      c.flush();
      c.s.shutdownOutput();

      if (current != null) {
        // Do this only at the end, after sending all requested files, so we don't deadlock due to
        // socket buffering waiting for primary to send us this length:
        long len = c.in.readVLong();
        if (len != current.metaData.length) {
          throw new IllegalStateException("file " + current.name + ": meta data says length=" + current.metaData.length + " but c.in says " + len);
        }
      }

      dest.message("SimpleCopyJob.init: done start files count=" + toCopy.size() + " totBytes=" + totBytes);

    } catch (Throwable t) {
      cancel("exc during start", t);
      throw new NodeCommunicationException("exc during start", t);
    }
  }

  /** Returns the number of bytes accounted for so far by fully copied files. */
  @Override
  public long getTotalBytesCopied() {
    return totBytesCopied;
  }

  /** Returns a new set holding the names of all entries in {@code toCopy}. */
  @Override
  public Set<String> getFileNamesToCopy() {
    Set<String> fileNames = new HashSet<>();
    for (Map.Entry<String, FileMetaData> ent : toCopy) {
      fileNames.add(ent.getKey());
    }
    return fileNames;
  }

  /** Returns the key set of {@code files}; this is a view of the map, not a copy. */
  @Override
  public Set<String> getFileNames() {
    return files.keySet();
  }

  /** Higher priority and then "first come first serve" order. */
  @Override
  public int compareTo(CopyJob _other) {
    SimpleCopyJob other = (SimpleCopyJob) _other;
    if (highPriority != other.highPriority) {
      return highPriority ? -1 : 1;
    }
    // Same priority: the job with the smaller ord sorts first.
    return Long.compare(ord, other.ord);
  }

  /**
   * Logs a summary of the copy and renames every copied temporary file to its real name, then
   * clears the record of copied files.
   */
  @Override
  public void finish() throws IOException {
    dest.message(String.format(Locale.ROOT,
                               "top: file copy done; took %.1f msec to copy %d bytes; now rename %d tmp files",
                               (System.nanoTime() - startNS)/1000000.0,
                               totBytesCopied,
                               copiedFiles.size()));

    // NOTE: if any of the files we copied overwrote a file in the current commit point, we
    // (ReplicaNode) removed the commit point up front so that the commit is not corrupt. This way
    // if we hit exc here, or if we crash here, we won't leave a corrupt commit in the index:
    for (Map.Entry<String, String> ent : copiedFiles.entrySet()) {
      String tmpFileName = ent.getValue();
      String fileName = ent.getKey();

      if (Node.VERBOSE_FILES) {
        dest.message("rename file " + tmpFileName + " to " + fileName);
      }

      // NOTE: if this throws exception, then some files have been moved to their true names, and
      // others are leftover .tmp files. I don't think heroic exception handling is necessary (no
      // harm will come, except some leftover files), nor warranted here (would make the code more
      // complex, for the exceptional cases when something is wrong w/ your IO system):
      dest.dir.rename(tmpFileName, fileName);
    }

    copiedFiles.clear();
  }

  /**
   * Do an iota of work; returns true if all copying is done.
   *
   * <p>Also returns true if the job was cancelled (i.e. {@code exc} is set). When no file is in
   * progress, the next file's length is read from the connection and checked against its metadata
   * before the copy of that file begins.
   *
   * @throws IllegalStateException if the length read from the connection does not match the
   *     file's metadata
   */
  synchronized boolean visit() throws IOException {
    if (exc != null) {
      // We were externally cancelled:
      return true;
    }

    if (current == null) {
      if (iter.hasNext() == false) {
        // Nothing left to copy:
        c.close();
        return true;
      }

      Map.Entry<String, FileMetaData> next = iter.next();
      FileMetaData metaData = next.getValue();
      String fileName = next.getKey();
      long len = c.in.readVLong();
      if (len != metaData.length) {
        throw new IllegalStateException("file " + fileName + ": meta data says length=" + metaData.length + " but c.in says " + len);
      }
      current = new CopyOneFile(c.in, dest, fileName, metaData, copyBuffer);
    }

    if (current.visit()) {
      // This file is done copying
      copiedFiles.put(current.name, current.tmpName);
      totBytesCopied += current.getBytesCopied();
      assert totBytesCopied <= totBytes: "totBytesCopied=" + totBytesCopied + " totBytes=" + totBytes;
      current = null;
    }

    // Not done yet: the caller must call visit() again, which returns true once nothing is left.
    return false;
  }

  /** Creates a {@link CopyOneFile} from {@code prev} that reads from this job's connection. */
  @Override
  protected CopyOneFile newCopyOneFile(CopyOneFile prev) {
    return new CopyOneFile(prev, c.in);
  }

  /**
   * Delegates to {@link CopyJob#transferAndCancel(CopyJob)} and then always closes the previous
   * job's connection, suppressing any exception from the close.
   */
  @Override
  public synchronized void transferAndCancel(CopyJob prevJob) throws IOException {
    try {
      super.transferAndCancel(prevJob);
    } finally {
      IOUtils.closeWhileHandlingException(((SimpleCopyJob) prevJob).c);
    }
  }

  /**
   * Delegates to {@link CopyJob#cancel(String, Throwable)} and then always closes this job's
   * connection, suppressing any exception from the close.
   */
  @Override
  public synchronized void cancel(String reason, Throwable exc) throws IOException {
    try {
      super.cancel(reason, exc);
    } finally {
      IOUtils.closeWhileHandlingException(c);
    }
  }

  /** Returns true if {@code exc} has been set on this job. */
  @Override
  public boolean getFailed() {
    return exc != null;
  }

  @Override
  public String toString() {
    return "SimpleCopyJob(ord=" + ord + " " + reason + " highPriority=" + highPriority + " files count=" + files.size() + " bytesCopied=" + totBytesCopied + " (of " + totBytes + ") filesCopied=" + copiedFiles.size() + ")";
  }

  /**
   * Calls {@link #visit()} repeatedly until it reports completion.
   *
   * @throws RuntimeException if the job failed, wrapping the recorded cause
   */
  @Override
  public void runBlocking() throws IOException {
    while (visit() == false) {
      // Keep going: each call does one more increment of copying.
    }

    if (getFailed()) {
      throw new RuntimeException("copy failed: " + cancelReason, exc);
    }
  }

  /** Returns the copy state supplied at construction. */
  @Override
  public CopyState getCopyState() {
    return copyState;
  }

  /**
   * Returns true if any file this job still has in {@code toCopy} is also in the other job's
   * {@code toCopy}.
   *
   * <p>NOTE(review): this holds this job's monitor and then takes {@code other}'s; two threads
   * calling {@code a.conflicts(b)} and {@code b.conflicts(a)} at the same time could deadlock.
   * Confirm that callers serialize these calls.
   */
  @Override
  public synchronized boolean conflicts(CopyJob _other) {
    Set<String> filesToCopy = getFileNamesToCopy();

    SimpleCopyJob other = (SimpleCopyJob) _other;
    synchronized (other) {
      for (Map.Entry<String, FileMetaData> ent : other.toCopy) {
        if (filesToCopy.contains(ent.getKey())) {
          return true;
        }
      }
    }

    return false;
  }
}