blob: 7db296b00360ae5b7620c3d4a29c6a1e70fe50af [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.lucene.replicator.nrt;
import java.io.Closeable;
import java.io.IOException;
import java.util.Locale;
import org.apache.lucene.store.DataInput;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexOutput;
/** Copies one file from an incoming DataInput to a dest filename in a local Directory */
public class CopyOneFile implements Closeable {
private final DataInput in;
private final IndexOutput out;
private final ReplicaNode dest;
public final String name;
public final String tmpName;
public final FileMetaData metaData;
public final long bytesToCopy;
private final long copyStartNS;
private final byte[] buffer;
private long bytesCopied;
public CopyOneFile(DataInput in, ReplicaNode dest, String name, FileMetaData metaData, byte[] buffer) throws IOException {
this.in = in;
this.name = name;
this.dest = dest;
this.buffer = buffer;
// TODO: pass correct IOCtx, e.g. seg total size
out = dest.createTempOutput(name, "copy", IOContext.DEFAULT);
tmpName = out.getName();
// last 8 bytes are checksum, which we write ourselves after copying all bytes and confirming checksum:
bytesToCopy = metaData.length - Long.BYTES;
if (Node.VERBOSE_FILES) {
dest.message("file " + name + ": start copying to tmp file " + tmpName + " length=" + (8+bytesToCopy));
}
copyStartNS = System.nanoTime();
this.metaData = metaData;
dest.startCopyFile(name);
}
/** Transfers this file copy to another input, continuing where the first one left off */
public CopyOneFile(CopyOneFile other, DataInput in) {
this.in = in;
this.dest = other.dest;
this.name = other.name;
this.out = other.out;
this.tmpName = other.tmpName;
this.metaData = other.metaData;
this.bytesCopied = other.bytesCopied;
this.bytesToCopy = other.bytesToCopy;
this.copyStartNS = other.copyStartNS;
this.buffer = other.buffer;
}
public void close() throws IOException {
out.close();
dest.finishCopyFile(name);
}
/** Copy another chunk of bytes, returning true once the copy is done */
public boolean visit() throws IOException {
// Copy up to 640 KB per visit:
for(int i=0;i<10;i++) {
long bytesLeft = bytesToCopy - bytesCopied;
if (bytesLeft == 0) {
long checksum = out.getChecksum();
if (checksum != metaData.checksum) {
// Bits flipped during copy!
dest.message("file " + tmpName + ": checksum mismatch after copy (bits flipped during network copy?) after-copy checksum=" + checksum + " vs expected=" + metaData.checksum + "; cancel job");
throw new IOException("file " + name + ": checksum mismatch after file copy");
}
// Paranoia: make sure the primary node is not smoking crack, by somehow sending us an already corrupted file whose checksum (in its
// footer) disagrees with reality:
long actualChecksumIn = in.readLong();
if (actualChecksumIn != checksum) {
dest.message("file " + tmpName + ": checksum claimed by primary disagrees with the file's footer: claimed checksum=" + checksum + " vs actual=" + actualChecksumIn);
throw new IOException("file " + name + ": checksum mismatch after file copy");
}
out.writeLong(checksum);
bytesCopied += Long.BYTES;
close();
if (Node.VERBOSE_FILES) {
dest.message(String.format(Locale.ROOT, "file %s: done copying [%s, %.3fms]",
name,
Node.bytesToString(metaData.length),
(System.nanoTime() - copyStartNS)/1000000.0));
}
return true;
}
int toCopy = (int) Math.min(bytesLeft, buffer.length);
in.readBytes(buffer, 0, toCopy);
out.writeBytes(buffer, 0, toCopy);
// TODO: rsync will fsync a range of the file; maybe we should do that here for large files in case we crash/killed
bytesCopied += toCopy;
}
return false;
}
public long getBytesCopied() {
return bytesCopied;
}
}