blob: 1cca9fe2bfdb1c1851cb475c540f0b9bc6531513 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.IntBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.IntFunction;
import java.util.zip.CRC32;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
import org.apache.hadoop.fs.impl.CombinedFileRange;
import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl;
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.util.Progressable;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_STANDARD_OPTIONS;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.impl.StoreImplementationUtils.isProbeForSyncable;
import static org.apache.hadoop.fs.VectoredReadUtils.sortRanges;
/****************************************************************
* Abstract Checksumed FileSystem.
* It provide a basic implementation of a Checksumed FileSystem,
* which creates a checksum file for each raw file.
* It generates & verifies checksums at the client side.
*
*****************************************************************/
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class ChecksumFileSystem extends FilterFileSystem {
private static final byte[] CHECKSUM_VERSION = new byte[] {'c', 'r', 'c', 0};
private int bytesPerChecksum = 512;
private boolean verifyChecksum = true;
private boolean writeChecksum = true;
public static double getApproxChkSumLength(long size) {
return ChecksumFSOutputSummer.CHKSUM_AS_FRACTION * size;
}
public ChecksumFileSystem(FileSystem fs) {
super(fs);
}
@Override
public void setConf(Configuration conf) {
super.setConf(conf);
if (conf != null) {
bytesPerChecksum = conf.getInt(LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_KEY,
LocalFileSystemConfigKeys.LOCAL_FS_BYTES_PER_CHECKSUM_DEFAULT);
Preconditions.checkState(bytesPerChecksum > 0,
"bytes per checksum should be positive but was %s",
bytesPerChecksum);
}
}
/**
* Set whether to verify checksum.
*/
@Override
public void setVerifyChecksum(boolean verifyChecksum) {
this.verifyChecksum = verifyChecksum;
}
@Override
public void setWriteChecksum(boolean writeChecksum) {
this.writeChecksum = writeChecksum;
}
/** get the raw file system */
@Override
public FileSystem getRawFileSystem() {
return fs;
}
/**
* Return the name of the checksum file associated with a file.
*
* @param file the file path.
* @return name of the checksum file associated with a file.
*/
public Path getChecksumFile(Path file) {
return new Path(file.getParent(), "." + file.getName() + ".crc");
}
/**
* Return true if file is a checksum file name.
*
* @param file the file path.
* @return if file is a checksum file true, not false.
*/
public static boolean isChecksumFile(Path file) {
String name = file.getName();
return name.startsWith(".") && name.endsWith(".crc");
}
/**
* Return the length of the checksum file given the size of the
* actual file.
*
* @param file the file path.
* @param fileSize file size.
* @return checksum length.
*/
public long getChecksumFileLength(Path file, long fileSize) {
return getChecksumLength(fileSize, getBytesPerSum());
}
/**
* Return the bytes Per Checksum.
*
* @return bytes per check sum.
*/
public int getBytesPerSum() {
return bytesPerChecksum;
}
private int getSumBufferSize(int bytesPerSum, int bufferSize) {
int defaultBufferSize = getConf().getInt(
LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY,
LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT);
int proportionalBufferSize = bufferSize / bytesPerSum;
return Math.max(bytesPerSum,
Math.max(proportionalBufferSize, defaultBufferSize));
}
/*******************************************************
* For open()'s FSInputStream
* It verifies that data matches checksums.
*******************************************************/
private static class ChecksumFSInputChecker extends FSInputChecker implements
IOStatisticsSource, StreamCapabilities {
private ChecksumFileSystem fs;
private FSDataInputStream datas;
private FSDataInputStream sums;
private static final int HEADER_LENGTH = 8;
private int bytesPerSum = 1;
public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file)
throws IOException {
this(fs, file, fs.getConf().getInt(
LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_KEY,
LocalFileSystemConfigKeys.LOCAL_FS_STREAM_BUFFER_SIZE_DEFAULT));
}
public ChecksumFSInputChecker(ChecksumFileSystem fs, Path file, int bufferSize)
throws IOException {
super( file, fs.getFileStatus(file).getReplication() );
this.datas = fs.getRawFileSystem().open(file, bufferSize);
this.fs = fs;
Path sumFile = fs.getChecksumFile(file);
try {
int sumBufferSize = fs.getSumBufferSize(fs.getBytesPerSum(), bufferSize);
sums = fs.getRawFileSystem().open(sumFile, sumBufferSize);
byte[] version = new byte[CHECKSUM_VERSION.length];
sums.readFully(version);
if (!Arrays.equals(version, CHECKSUM_VERSION))
throw new IOException("Not a checksum file: "+sumFile);
this.bytesPerSum = sums.readInt();
set(fs.verifyChecksum, DataChecksum.newCrc32(), bytesPerSum,
FSInputChecker.CHECKSUM_SIZE);
} catch (IOException e) {
// mincing the message is terrible, but java throws permission
// exceptions as FNF because that's all the method signatures allow!
if (!(e instanceof FileNotFoundException) ||
e.getMessage().endsWith(" (Permission denied)")) {
LOG.warn("Problem opening checksum file: "+ file +
". Ignoring exception: " , e);
}
set(fs.verifyChecksum, null, 1, 0);
}
}
private long getChecksumFilePos( long dataPos ) {
return HEADER_LENGTH + FSInputChecker.CHECKSUM_SIZE*(dataPos/bytesPerSum);
}
@Override
protected long getChunkPosition( long dataPos ) {
return dataPos/bytesPerSum*bytesPerSum;
}
@Override
public int available() throws IOException {
return datas.available() + super.available();
}
@Override
public int read(long position, byte[] b, int off, int len)
throws IOException {
// parameter check
validatePositionedReadArgs(position, b, off, len);
if (len == 0) {
return 0;
}
int nread;
try (ChecksumFSInputChecker checker =
new ChecksumFSInputChecker(fs, file)) {
checker.seek(position);
nread = checker.read(b, off, len);
}
return nread;
}
@Override
public void close() throws IOException {
datas.close();
if( sums != null ) {
sums.close();
}
set(fs.verifyChecksum, null, 1, 0);
}
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
long sumsPos = getChecksumFilePos(targetPos);
fs.reportChecksumFailure(file, datas, targetPos, sums, sumsPos);
boolean newDataSource = datas.seekToNewSource(targetPos);
return sums.seekToNewSource(sumsPos) || newDataSource;
}
@Override
protected int readChunk(long pos, byte[] buf, int offset, int len,
byte[] checksum) throws IOException {
boolean eof = false;
if (needChecksum()) {
assert checksum != null; // we have a checksum buffer
assert checksum.length % CHECKSUM_SIZE == 0; // it is sane length
assert len >= bytesPerSum; // we must read at least one chunk
final int checksumsToRead = Math.min(
len/bytesPerSum, // number of checksums based on len to read
checksum.length / CHECKSUM_SIZE); // size of checksum buffer
long checksumPos = getChecksumFilePos(pos);
if(checksumPos != sums.getPos()) {
sums.seek(checksumPos);
}
int sumLenRead = sums.read(checksum, 0, CHECKSUM_SIZE * checksumsToRead);
if (sumLenRead >= 0 && sumLenRead % CHECKSUM_SIZE != 0) {
throw new ChecksumException(
"Checksum file not a length multiple of checksum size " +
"in " + file + " at " + pos + " checksumpos: " + checksumPos +
" sumLenread: " + sumLenRead,
pos);
}
if (sumLenRead <= 0) { // we're at the end of the file
eof = true;
} else {
// Adjust amount of data to read based on how many checksum chunks we read
len = Math.min(len, bytesPerSum * (sumLenRead / CHECKSUM_SIZE));
}
}
if(pos != datas.getPos()) {
datas.seek(pos);
}
int nread = readFully(datas, buf, offset, len);
if (eof && nread > 0) {
throw new ChecksumException("Checksum error: "+file+" at "+pos, pos);
}
return nread;
}
/**
* Get the IO Statistics of the nested stream, falling back to
* null if the stream does not implement the interface
* {@link IOStatisticsSource}.
* @return an IOStatistics instance or null
*/
@Override
public IOStatistics getIOStatistics() {
return IOStatisticsSupport.retrieveIOStatistics(datas);
}
public static long findChecksumOffset(long dataOffset,
int bytesPerSum) {
return HEADER_LENGTH + (dataOffset/bytesPerSum) * FSInputChecker.CHECKSUM_SIZE;
}
/**
* Find the checksum ranges that correspond to the given data ranges.
* @param dataRanges the input data ranges, which are assumed to be sorted
* and non-overlapping
* @return a list of AsyncReaderUtils.CombinedFileRange that correspond to
* the checksum ranges
*/
public static List<CombinedFileRange> findChecksumRanges(
List<? extends FileRange> dataRanges,
int bytesPerSum,
int minSeek,
int maxSize) {
List<CombinedFileRange> result = new ArrayList<>();
CombinedFileRange currentCrc = null;
for(FileRange range: dataRanges) {
long crcOffset = findChecksumOffset(range.getOffset(), bytesPerSum);
long crcEnd = findChecksumOffset(range.getOffset() + range.getLength() +
bytesPerSum - 1, bytesPerSum);
if (currentCrc == null ||
!currentCrc.merge(crcOffset, crcEnd, range, minSeek, maxSize)) {
currentCrc = new CombinedFileRange(crcOffset, crcEnd, range);
result.add(currentCrc);
}
}
return result;
}
/**
* Check the data against the checksums.
* @param sumsBytes the checksum data
* @param sumsOffset where from the checksum file this buffer started
* @param data the file data
* @param dataOffset where the file data started (must be a multiple of
* bytesPerSum)
* @param bytesPerSum how many bytes per a checksum
* @param file the path of the filename
* @return the data buffer
* @throws CompletionException if the checksums don't match
*/
static ByteBuffer checkBytes(ByteBuffer sumsBytes,
long sumsOffset,
ByteBuffer data,
long dataOffset,
int bytesPerSum,
Path file) {
// determine how many bytes we need to skip at the start of the sums
int offset =
(int) (findChecksumOffset(dataOffset, bytesPerSum) - sumsOffset);
IntBuffer sums = sumsBytes.asIntBuffer();
sums.position(offset / FSInputChecker.CHECKSUM_SIZE);
ByteBuffer current = data.duplicate();
int numChunks = data.remaining() / bytesPerSum;
CRC32 crc = new CRC32();
// check each chunk to ensure they match
for(int c = 0; c < numChunks; ++c) {
// set the buffer position and the limit
current.limit((c + 1) * bytesPerSum);
current.position(c * bytesPerSum);
// compute the crc
crc.reset();
crc.update(current);
int expected = sums.get();
int calculated = (int) crc.getValue();
if (calculated != expected) {
// cast of c added to silence findbugs
long errPosn = dataOffset + (long) c * bytesPerSum;
throw new CompletionException(new ChecksumException(
"Checksum error: " + file + " at " + errPosn +
" exp: " + expected + " got: " + calculated, errPosn));
}
}
// if everything matches, we return the data
return data;
}
@Override
public void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {
// If the stream doesn't have checksums, just delegate.
VectoredReadUtils.validateVectoredReadRanges(ranges);
if (sums == null) {
datas.readVectored(ranges, allocate);
return;
}
int minSeek = minSeekForVectorReads();
int maxSize = maxReadSizeForVectorReads();
List<CombinedFileRange> dataRanges =
VectoredReadUtils.mergeSortedRanges(Arrays.asList(sortRanges(ranges)), bytesPerSum,
minSeek, maxReadSizeForVectorReads());
List<CombinedFileRange> checksumRanges = findChecksumRanges(dataRanges,
bytesPerSum, minSeek, maxSize);
sums.readVectored(checksumRanges, allocate);
datas.readVectored(dataRanges, allocate);
// Data read is correct. I have verified content of dataRanges.
// There is some bug below here as test (testVectoredReadMultipleRanges)
// is failing, should be
// somewhere while slicing the merged data into smaller user ranges.
// Spend some time figuring out but it is a complex code.
for(CombinedFileRange checksumRange: checksumRanges) {
for(FileRange dataRange: checksumRange.getUnderlying()) {
// when we have both the ranges, validate the checksum
CompletableFuture<ByteBuffer> result =
checksumRange.getData().thenCombineAsync(dataRange.getData(),
(sumBuffer, dataBuffer) ->
checkBytes(sumBuffer, checksumRange.getOffset(),
dataBuffer, dataRange.getOffset(), bytesPerSum, file));
// Now, slice the read data range to the user's ranges
for(FileRange original: ((CombinedFileRange) dataRange).getUnderlying()) {
original.setData(result.thenApply(
(b) -> VectoredReadUtils.sliceTo(b, dataRange.getOffset(), original)));
}
}
}
}
@Override
public boolean hasCapability(String capability) {
return datas.hasCapability(capability);
}
}
private static class FSDataBoundedInputStream extends FSDataInputStream {
private FileSystem fs;
private Path file;
private long fileLen = -1L;
FSDataBoundedInputStream(FileSystem fs, Path file, InputStream in) {
super(in);
this.fs = fs;
this.file = file;
}
@Override
public boolean markSupported() {
return false;
}
/* Return the file length */
private long getFileLength() throws IOException {
if( fileLen==-1L ) {
fileLen = fs.getContentSummary(file).getLength();
}
return fileLen;
}
/**
* Skips over and discards <code>n</code> bytes of data from the
* input stream.
*
*The <code>skip</code> method skips over some smaller number of bytes
* when reaching end of file before <code>n</code> bytes have been skipped.
* The actual number of bytes skipped is returned. If <code>n</code> is
* negative, no bytes are skipped.
*
* @param n the number of bytes to be skipped.
* @return the actual number of bytes skipped.
* @exception IOException if an I/O error occurs.
* ChecksumException if the chunk to skip to is corrupted
*/
@Override
public synchronized long skip(long n) throws IOException {
long curPos = getPos();
long fileLength = getFileLength();
if( n+curPos > fileLength ) {
n = fileLength - curPos;
}
return super.skip(n);
}
/**
* Seek to the given position in the stream.
* The next read() will be from that position.
*
* <p>This method does not allow seek past the end of the file.
* This produces IOException.
*
* @param pos the postion to seek to.
* @exception IOException if an I/O error occurs or seeks after EOF
* ChecksumException if the chunk to seek to is corrupted
*/
@Override
public synchronized void seek(long pos) throws IOException {
if (pos > getFileLength()) {
throw new EOFException("Cannot seek after EOF");
}
super.seek(pos);
}
}
/**
* Opens an FSDataInputStream at the indicated Path.
* @param f the file name to open
* @param bufferSize the size of the buffer to be used.
* @throws IOException if an I/O error occurs.
*/
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
FileSystem fs;
InputStream in;
if (verifyChecksum) {
fs = this;
in = new ChecksumFSInputChecker(this, f, bufferSize);
} else {
fs = getRawFileSystem();
in = fs.open(f, bufferSize);
}
return new FSDataBoundedInputStream(fs, f, in);
}
@Override
public FSDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException {
throw new UnsupportedOperationException("Append is not supported "
+ "by ChecksumFileSystem");
}
@Override
public boolean truncate(Path f, long newLength) throws IOException {
throw new UnsupportedOperationException("Truncate is not supported "
+ "by ChecksumFileSystem");
}
@Override
public void concat(final Path f, final Path[] psrcs) throws IOException {
throw new UnsupportedOperationException("Concat is not supported "
+ "by ChecksumFileSystem");
}
/**
* Calculated the length of the checksum file in bytes.
* @param size the length of the data file in bytes
* @param bytesPerSum the number of bytes in a checksum block
* @return the number of bytes in the checksum file
*/
public static long getChecksumLength(long size, int bytesPerSum) {
//the checksum length is equal to size passed divided by bytesPerSum +
//bytes written in the beginning of the checksum file.
return ((size + bytesPerSum - 1) / bytesPerSum) * FSInputChecker.CHECKSUM_SIZE +
ChecksumFSInputChecker.HEADER_LENGTH;
}
/** This class provides an output stream for a checksummed file.
* It generates checksums for data. */
private static class ChecksumFSOutputSummer extends FSOutputSummer
implements IOStatisticsSource, StreamCapabilities {
private FSDataOutputStream datas;
private FSDataOutputStream sums;
private static final float CHKSUM_AS_FRACTION = 0.01f;
private boolean isClosed = false;
ChecksumFSOutputSummer(ChecksumFileSystem fs,
Path file,
boolean overwrite,
int bufferSize,
short replication,
long blockSize,
Progressable progress,
FsPermission permission)
throws IOException {
super(DataChecksum.newDataChecksum(DataChecksum.Type.CRC32,
fs.getBytesPerSum()));
int bytesPerSum = fs.getBytesPerSum();
this.datas = fs.getRawFileSystem().create(file, permission, overwrite,
bufferSize, replication, blockSize,
progress);
int sumBufferSize = fs.getSumBufferSize(bytesPerSum, bufferSize);
this.sums = fs.getRawFileSystem().create(fs.getChecksumFile(file),
permission, true, sumBufferSize,
replication, blockSize, null);
sums.write(CHECKSUM_VERSION, 0, CHECKSUM_VERSION.length);
sums.writeInt(bytesPerSum);
}
@Override
public void close() throws IOException {
try {
flushBuffer();
sums.close();
datas.close();
} finally {
isClosed = true;
}
}
@Override
protected void writeChunk(byte[] b, int offset, int len, byte[] checksum,
int ckoff, int cklen)
throws IOException {
datas.write(b, offset, len);
sums.write(checksum, ckoff, cklen);
}
@Override
protected void checkClosed() throws IOException {
if (isClosed) {
throw new ClosedChannelException();
}
}
/**
* Get the IO Statistics of the nested stream, falling back to
* null if the stream does not implement the interface
* {@link IOStatisticsSource}.
* @return an IOStatistics instance or null
*/
@Override
public IOStatistics getIOStatistics() {
return IOStatisticsSupport.retrieveIOStatistics(datas);
}
/**
* Probe the inner stream for a capability.
* Syncable operations are rejected before being passed down.
* @param capability string to query the stream support for.
* @return true if a capability is known to be supported.
*/
@Override
public boolean hasCapability(final String capability) {
if (isProbeForSyncable(capability)) {
return false;
}
return datas.hasCapability(capability);
}
}
@Override
public FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
return create(f, permission, overwrite, true, bufferSize,
replication, blockSize, progress);
}
private FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite, boolean createParent, int bufferSize,
short replication, long blockSize,
Progressable progress) throws IOException {
Path parent = f.getParent();
if (parent != null) {
if (!createParent && !exists(parent)) {
throw new FileNotFoundException("Parent directory doesn't exist: "
+ parent);
} else if (!mkdirs(parent)) {
throw new IOException("Mkdirs failed to create " + parent
+ " (exists=" + exists(parent) + ", cwd=" + getWorkingDirectory()
+ ")");
}
}
final FSDataOutputStream out;
if (writeChecksum) {
out = new FSDataOutputStream(
new ChecksumFSOutputSummer(this, f, overwrite, bufferSize, replication,
blockSize, progress, permission), null);
} else {
out = fs.create(f, permission, overwrite, bufferSize, replication,
blockSize, progress);
// remove the checksum file since we aren't writing one
Path checkFile = getChecksumFile(f);
if (fs.exists(checkFile)) {
fs.delete(checkFile, true);
}
}
return out;
}
@Override
public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
boolean overwrite, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
return create(f, permission, overwrite, false, bufferSize, replication,
blockSize, progress);
}
@Override
public FSDataOutputStream create(final Path f,
final FsPermission permission,
final EnumSet<CreateFlag> flags,
final int bufferSize,
final short replication,
final long blockSize,
final Progressable progress,
final Options.ChecksumOpt checksumOpt) throws IOException {
return create(f, permission, flags.contains(CreateFlag.OVERWRITE),
bufferSize, replication, blockSize, progress);
}
@Override
public FSDataOutputStream createNonRecursive(final Path f,
final FsPermission permission,
final EnumSet<CreateFlag> flags,
final int bufferSize,
final short replication,
final long blockSize,
final Progressable progress) throws IOException {
return create(f, permission, flags.contains(CreateFlag.OVERWRITE),
false, bufferSize, replication,
blockSize, progress);
}
abstract class FsOperation {
boolean run(Path p) throws IOException {
boolean status = apply(p);
if (status) {
Path checkFile = getChecksumFile(p);
if (fs.exists(checkFile)) {
apply(checkFile);
}
}
return status;
}
abstract boolean apply(Path p) throws IOException;
}
@Override
public void setPermission(Path src, final FsPermission permission)
throws IOException {
new FsOperation(){
@Override
boolean apply(Path p) throws IOException {
fs.setPermission(p, permission);
return true;
}
}.run(src);
}
@Override
public void setOwner(Path src, final String username, final String groupname)
throws IOException {
new FsOperation(){
@Override
boolean apply(Path p) throws IOException {
fs.setOwner(p, username, groupname);
return true;
}
}.run(src);
}
@Override
public void setAcl(Path src, final List<AclEntry> aclSpec)
throws IOException {
new FsOperation(){
@Override
boolean apply(Path p) throws IOException {
fs.setAcl(p, aclSpec);
return true;
}
}.run(src);
}
@Override
public void modifyAclEntries(Path src, final List<AclEntry> aclSpec)
throws IOException {
new FsOperation(){
@Override
boolean apply(Path p) throws IOException {
fs.modifyAclEntries(p, aclSpec);
return true;
}
}.run(src);
}
@Override
public void removeAcl(Path src) throws IOException {
new FsOperation(){
@Override
boolean apply(Path p) throws IOException {
fs.removeAcl(p);
return true;
}
}.run(src);
}
@Override
public void removeAclEntries(Path src, final List<AclEntry> aclSpec)
throws IOException {
new FsOperation(){
@Override
boolean apply(Path p) throws IOException {
fs.removeAclEntries(p, aclSpec);
return true;
}
}.run(src);
}
@Override
public void removeDefaultAcl(Path src) throws IOException {
new FsOperation(){
@Override
boolean apply(Path p) throws IOException {
fs.removeDefaultAcl(p);
return true;
}
}.run(src);
}
/**
* Set replication for an existing file.
* Implement the abstract <tt>setReplication</tt> of <tt>FileSystem</tt>
* @param src file name
* @param replication new replication
* @throws IOException if an I/O error occurs.
* @return true if successful;
* false if file does not exist or is a directory
*/
@Override
public boolean setReplication(Path src, final short replication)
throws IOException {
return new FsOperation(){
@Override
boolean apply(Path p) throws IOException {
return fs.setReplication(p, replication);
}
}.run(src);
}
/**
* Rename files/dirs
*/
@Override
@SuppressWarnings("deprecation")
public boolean rename(Path src, Path dst) throws IOException {
if (fs.isDirectory(src)) {
return fs.rename(src, dst);
} else {
if (fs.isDirectory(dst)) {
dst = new Path(dst, src.getName());
}
boolean value = fs.rename(src, dst);
if (!value)
return false;
Path srcCheckFile = getChecksumFile(src);
Path dstCheckFile = getChecksumFile(dst);
if (fs.exists(srcCheckFile)) { //try to rename checksum
value = fs.rename(srcCheckFile, dstCheckFile);
} else if (fs.exists(dstCheckFile)) {
// no src checksum, so remove dst checksum
value = fs.delete(dstCheckFile, true);
}
return value;
}
}
/**
* Implement the delete(Path, boolean) in checksum
* file system.
*/
@Override
public boolean delete(Path f, boolean recursive) throws IOException{
FileStatus fstatus = null;
try {
fstatus = fs.getFileStatus(f);
} catch(FileNotFoundException e) {
return false;
}
if (fstatus.isDirectory()) {
//this works since the crcs are in the same
//directories and the files. so we just delete
//everything in the underlying filesystem
return fs.delete(f, recursive);
} else {
Path checkFile = getChecksumFile(f);
if (fs.exists(checkFile)) {
fs.delete(checkFile, true);
}
return fs.delete(f, true);
}
}
final private static PathFilter DEFAULT_FILTER = new PathFilter() {
@Override
public boolean accept(Path file) {
return !isChecksumFile(file);
}
};
/**
* List the statuses of the files/directories in the given path if the path is
* a directory.
*
* @param f
* given path
* @return the statuses of the files/directories in the given path
* @throws IOException if an I/O error occurs.
*/
@Override
public FileStatus[] listStatus(Path f) throws IOException {
return fs.listStatus(f, DEFAULT_FILTER);
}
@Override
public RemoteIterator<FileStatus> listStatusIterator(final Path p)
throws IOException {
// Not-using fs#listStatusIterator() since it includes crc files as well
return new DirListingIterator<>(p);
}
/**
* List the statuses of the files/directories in the given path if the path is
* a directory.
*
* @param f
* given path
* @return the statuses of the files/directories in the given patch
* @throws IOException if an I/O error occurs.
*/
@Override
public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
throws IOException {
return fs.listLocatedStatus(f, DEFAULT_FILTER);
}
@Override
public boolean mkdirs(Path f) throws IOException {
return fs.mkdirs(f);
}
@Override
public void copyFromLocalFile(boolean delSrc, Path src, Path dst)
throws IOException {
Configuration conf = getConf();
FileUtil.copy(getLocal(conf), src, this, dst, delSrc, conf);
}
/**
* The src file is under FS, and the dst is on the local disk.
* Copy it from FS control to the local dst name.
*/
@Override
public void copyToLocalFile(boolean delSrc, Path src, Path dst)
throws IOException {
Configuration conf = getConf();
FileUtil.copy(this, src, getLocal(conf), dst, delSrc, conf);
}
/**
* The src file is under FS, and the dst is on the local disk.
* Copy it from FS control to the local dst name.
* If src and dst are directories, the copyCrc parameter
* determines whether to copy CRC files.
* @param src src path.
* @param dst dst path.
* @param copyCrc copy csc flag.
* @throws IOException if an I/O error occurs.
*/
@SuppressWarnings("deprecation")
public void copyToLocalFile(Path src, Path dst, boolean copyCrc)
throws IOException {
if (!fs.isDirectory(src)) { // source is a file
fs.copyToLocalFile(src, dst);
FileSystem localFs = getLocal(getConf()).getRawFileSystem();
if (localFs.isDirectory(dst)) {
dst = new Path(dst, src.getName());
}
dst = getChecksumFile(dst);
if (localFs.exists(dst)) { //remove old local checksum file
localFs.delete(dst, true);
}
Path checksumFile = getChecksumFile(src);
if (copyCrc && fs.exists(checksumFile)) { //copy checksum file
fs.copyToLocalFile(checksumFile, dst);
}
} else {
FileStatus[] srcs = listStatus(src);
for (FileStatus srcFile : srcs) {
copyToLocalFile(srcFile.getPath(),
new Path(dst, srcFile.getPath().getName()), copyCrc);
}
}
}
@Override
public Path startLocalOutput(Path fsOutputFile, Path tmpLocalFile)
throws IOException {
return tmpLocalFile;
}
@Override
public void completeLocalOutput(Path fsOutputFile, Path tmpLocalFile)
throws IOException {
moveFromLocalFile(tmpLocalFile, fsOutputFile);
}
/**
* Report a checksum error to the file system.
* @param f the file name containing the error
* @param in the stream open on the file
* @param inPos the position of the beginning of the bad data in the file
* @param sums the stream open on the checksum file
* @param sumsPos the position of the beginning of the bad data in the checksum file
* @return if retry is necessary
*/
public boolean reportChecksumFailure(Path f, FSDataInputStream in,
long inPos, FSDataInputStream sums, long sumsPos) {
return false;
}
/**
* This is overridden to ensure that this class's
* {@link #openFileWithOptions}() method is called, and so ultimately
* its {@link #open(Path, int)}.
*
* {@inheritDoc}
*/
@Override
public FutureDataInputStreamBuilder openFile(final Path path)
throws IOException, UnsupportedOperationException {
return ((FutureDataInputStreamBuilderImpl)
createDataInputStreamBuilder(this, path)).getThisBuilder();
}
/**
* Open the file as a blocking call to {@link #open(Path, int)}.
*
* {@inheritDoc}
*/
@Override
protected CompletableFuture<FSDataInputStream> openFileWithOptions(
final Path path,
final OpenFileParameters parameters) throws IOException {
AbstractFSBuilderImpl.rejectUnknownMandatoryKeys(
parameters.getMandatoryKeys(),
FS_OPTION_OPENFILE_STANDARD_OPTIONS,
"for " + path);
return LambdaUtils.eval(
new CompletableFuture<>(),
() -> open(path, parameters.getBufferSize()));
}
/**
* This is overridden to ensure that this class's create() method is
* ultimately called.
*
* {@inheritDoc}
*/
public FSDataOutputStreamBuilder createFile(Path path) {
return createDataOutputStreamBuilder(this, path)
.create().overwrite(true);
}
/**
* This is overridden to ensure that this class's create() method is
* ultimately called.
*
* {@inheritDoc}
*/
public FSDataOutputStreamBuilder appendFile(Path path) {
return createDataOutputStreamBuilder(this, path).append();
}
/**
* Disable those operations which the checksummed FS blocks.
* {@inheritDoc}
*/
@Override
public boolean hasPathCapability(final Path path, final String capability)
throws IOException {
// query the superclass, which triggers argument validation.
final Path p = makeQualified(path);
switch (validatePathCapabilityArgs(p, capability)) {
case CommonPathCapabilities.FS_APPEND:
case CommonPathCapabilities.FS_CONCAT:
return false;
default:
return super.hasPathCapability(p, capability);
}
}
}