blob: e7b72a93f3378550849fd5d8a88bc7880f185600 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.test.Whitebox;
import org.apache.hadoop.util.StringUtils;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
import static org.apache.hadoop.fs.FileSystemTestHelper.*;
import java.io.*;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows;
import static org.apache.hadoop.test.PlatformAssumptions.assumeWindows;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.*;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import javax.annotation.Nonnull;
import static org.assertj.core.api.Assertions.assertThat;
/**
* This class tests the local file system via the FileSystem abstraction.
*/
public class TestLocalFileSystem {
  private static final File base =
      GenericTestUtils.getTestDir("work-dir/localfs");

  private static final String TEST_ROOT_DIR = base.getAbsolutePath();
  private final Path TEST_PATH = new Path(TEST_ROOT_DIR, "test-file");
  private Configuration conf;
  private LocalFileSystem fileSys;

  /**
   * standard test timeout: {@value}.
   */
  public static final int DEFAULT_TEST_TIMEOUT = 60 * 1000;

  /**
   * Set the timeout for every test.
   */
  @Rule
  public Timeout testTimeout = new Timeout(DEFAULT_TEST_TIMEOUT);

  /**
   * Recursively delete a path that must exist, asserting that it existed
   * beforehand and is gone afterwards.
   * @param fs filesystem to use
   * @param name path to delete
   * @throws IOException on any IO failure
   */
  private void cleanupFile(FileSystem fs, Path name) throws IOException {
    assertTrue(fs.exists(name));
    fs.delete(name, true);
    assertFalse(fs.exists(name));
  }

  /**
   * Bind {@link #fileSys} to a LocalFileSystem built from a Configuration
   * that does not load the default resources, then delete any leftover
   * test root directory.
   */
  @Before
  public void setup() throws IOException {
    conf = new Configuration(false);
    conf.set("fs.file.impl", LocalFileSystem.class.getName());
    fileSys = FileSystem.getLocal(conf);
    fileSys.delete(new Path(TEST_ROOT_DIR), true);
  }

  /**
   * Delete the test directory tree.
   * Write permission is restored first because
   * {@link #testReportChecksumFailure()} makes {@code base} read-only.
   */
  @After
  public void after() throws IOException {
    FileUtil.setWritable(base, true);
    FileUtil.fullyDelete(base);
    assertFalse(base.exists());
    // NOTE(review): presumably resets stat() usage in case a test changed it.
    RawLocalFileSystem.useStatIfAvailable();
  }

  /**
   * Test the capability of setting the working directory.
   */
  @Test
  public void testWorkingDirectory() throws IOException {
    Path origDir = fileSys.getWorkingDirectory();
    Path subdir = new Path(TEST_ROOT_DIR, "new");
    try {
      // make sure it doesn't already exist
      assertFalse(fileSys.exists(subdir));
      // make it and check for it
      assertTrue(fileSys.mkdirs(subdir));
      assertTrue(fileSys.isDirectory(subdir));

      fileSys.setWorkingDirectory(subdir);

      // create a directory and check for it
      Path dir1 = new Path("dir1");
      assertTrue(fileSys.mkdirs(dir1));
      assertTrue(fileSys.isDirectory(dir1));

      // delete the directory and make sure it went away
      fileSys.delete(dir1, true);
      assertFalse(fileSys.exists(dir1));

      // create files and manipulate them.
      Path file1 = new Path("file1");
      Path file2 = new Path("sub/file2");
      String contents = writeFile(fileSys, file1, 1);
      fileSys.copyFromLocalFile(file1, file2);
      assertTrue(fileSys.exists(file1));
      assertTrue(fileSys.isFile(file1));
      cleanupFile(fileSys, file2);
      fileSys.copyToLocalFile(file1, file2);
      cleanupFile(fileSys, file2);

      // try a rename
      fileSys.rename(file1, file2);
      assertFalse(fileSys.exists(file1));
      assertTrue(fileSys.exists(file2));
      fileSys.rename(file2, file1);

      // try reading a file; the stream is closed even if the assertion fails
      try (InputStream stm = fileSys.open(file1)) {
        byte[] buffer = new byte[3];
        int bytesRead = stm.read(buffer, 0, 3);
        assertEquals(contents, new String(buffer, 0, bytesRead));
      }
    } finally {
      fileSys.setWorkingDirectory(origDir);
    }
  }

  /**
   * Test the Syncable interface on the raw local file system.
   * Bytes written before hflush()/hsync() must be readable through a
   * separately opened stream.
   * @throws IOException on any IO failure
   */
  @Test
  public void testSyncable() throws IOException {
    FileSystem fs = fileSys.getRawFileSystem();
    Path file = new Path(TEST_ROOT_DIR, "syncable");
    final int bytesWritten = 1;
    byte[] expectedBuf = new byte[] {'0', '1', '2', '3'};
    try (FSDataOutputStream out = fs.create(file)) {
      out.write(expectedBuf, 0, 1);
      out.hflush();
      verifyFile(fs, file, bytesWritten, expectedBuf);
      out.write(expectedBuf, bytesWritten, expectedBuf.length - bytesWritten);
      out.hsync();
      verifyFile(fs, file, expectedBuf.length, expectedBuf);
    }
  }

  /**
   * Open a file and assert that its first {@code bytesToVerify} bytes equal
   * the first {@code bytesToVerify} bytes of {@code expectedBytes}.
   * @param fs filesystem to read from
   * @param file file to read
   * @param bytesToVerify number of leading bytes to compare
   * @param expectedBytes expected content
   * @throws IOException on any IO failure, including a file that is too short
   */
  private void verifyFile(FileSystem fs, Path file, int bytesToVerify,
      byte[] expectedBytes) throws IOException {
    try (FSDataInputStream in = fs.open(file)) {
      byte[] readBuf = new byte[bytesToVerify];
      in.readFully(readBuf, 0, bytesToVerify);
      for (int i = 0; i < bytesToVerify; i++) {
        assertEquals(expectedBytes[i], readBuf[i]);
      }
    }
  }

  /**
   * Test FileUtil.copy() within the local filesystem, with and without
   * source deletion and overwrite.
   * Also covers copying a file into an existing directory, and the
   * failure when the target inside that directory is itself a directory.
   */
  @Test
  public void testCopy() throws IOException {
    Path src = new Path(TEST_ROOT_DIR, "dingo");
    Path dst = new Path(TEST_ROOT_DIR, "yak");
    writeFile(fileSys, src, 1);
    assertTrue(FileUtil.copy(fileSys, src, fileSys, dst, true, false, conf));
    assertTrue(!fileSys.exists(src) && fileSys.exists(dst));
    assertTrue(FileUtil.copy(fileSys, dst, fileSys, src, false, false, conf));
    assertTrue(fileSys.exists(src) && fileSys.exists(dst));
    assertTrue(FileUtil.copy(fileSys, src, fileSys, dst, true, true, conf));
    assertTrue(!fileSys.exists(src) && fileSys.exists(dst));
    fileSys.mkdirs(src);
    assertTrue(FileUtil.copy(fileSys, dst, fileSys, src, false, false, conf));
    Path tmp = new Path(src, dst.getName());
    assertTrue(fileSys.exists(tmp) && fileSys.exists(dst));
    assertTrue(FileUtil.copy(fileSys, dst, fileSys, src, false, true, conf));
    assertTrue(fileSys.delete(tmp, true));
    fileSys.mkdirs(tmp);
    try {
      FileUtil.copy(fileSys, dst, fileSys, src, true, true, conf);
      fail("Failed to detect existing dir");
    } catch (IOException e) {
      // Expected
    }
  }

  /**
   * The filesystem home directory must be the qualified
   * {@code user.home} system property.
   */
  @Test
  public void testHomeDirectory() throws IOException {
    Path home = fileSys.makeQualified(
        new Path(System.getProperty("user.home")));
    Path fsHome = fileSys.getHomeDirectory();
    assertEquals(home, fsHome);
  }

  /**
   * A path containing '%' must round-trip unchanged through getFileStatus().
   */
  @Test
  public void testPathEscapes() throws IOException {
    Path path = new Path(TEST_ROOT_DIR, "foo%bar");
    writeFile(fileSys, path, 1);
    FileStatus status = fileSys.getFileStatus(path);
    assertEquals(fileSys.makeQualified(path), status.getPath());
    cleanupFile(fileSys, path);
  }

  /**
   * Check file length reporting and mkdirs() error handling.
   * Lengths must match in FileStatus and ContentSummary; mkdirs() must
   * reject a path under a file and a null argument.
   */
  @Test
  public void testCreateFileAndMkdirs() throws IOException {
    Path test_dir = new Path(TEST_ROOT_DIR, "test_dir");
    Path test_file = new Path(test_dir, "file1");
    assertTrue(fileSys.mkdirs(test_dir));

    final int fileSize = new Random().nextInt(1 << 20) + 1;
    writeFile(fileSys, test_file, fileSize);

    {
      //check FileStatus and ContentSummary
      final FileStatus status = fileSys.getFileStatus(test_file);
      Assert.assertEquals(fileSize, status.getLen());
      final ContentSummary summary = fileSys.getContentSummary(test_dir);
      Assert.assertEquals(fileSize, summary.getLength());
    }

    // creating dir over a file
    Path bad_dir = new Path(test_file, "another_dir");

    try {
      fileSys.mkdirs(bad_dir);
      fail("Failed to detect existing file in path");
    } catch (ParentNotDirectoryException e) {
      // Expected
    }

    try {
      fileSys.mkdirs(null);
      fail("Failed to detect null in mkdir arg");
    } catch (IllegalArgumentException e) {
      // Expected
    }
  }

  /** Test deleting a file, directory, and non-existent path. */
  @Test
  public void testBasicDelete() throws IOException {
    Path dir1 = new Path(TEST_ROOT_DIR, "dir1");
    Path file1 = new Path(TEST_ROOT_DIR, "file1");
    Path file2 = new Path(TEST_ROOT_DIR+"/dir1", "file2");
    Path file3 = new Path(TEST_ROOT_DIR, "does-not-exist");
    assertTrue(fileSys.mkdirs(dir1));
    writeFile(fileSys, file1, 1);
    writeFile(fileSys, file2, 1);
    assertFalse("Returned true deleting non-existant path",
        fileSys.delete(file3));
    assertTrue("Did not delete file", fileSys.delete(file1));
    assertTrue("Did not delete non-empty dir", fileSys.delete(dir1));
  }

  /**
   * Exactly one Statistics instance must be registered for the
   * "file" scheme.
   */
  @Test
  public void testStatistics() throws Exception {
    int fileSchemeCount = 0;
    for (Statistics stats : FileSystem.getAllStatistics()) {
      if (stats.getScheme().equals("file")) {
        fileSchemeCount++;
      }
    }
    assertEquals(1, fileSchemeCount);
  }

  /**
   * A BufferedFSInputStream wrapping a LocalFSFileInputStream must expose
   * a file descriptor.
   */
  @Test
  public void testHasFileDescriptor() throws IOException {
    Path path = new Path(TEST_ROOT_DIR, "test-file");
    writeFile(fileSys, path, 1);
    BufferedFSInputStream bis = null;
    try {
      bis = new BufferedFSInputStream(new RawLocalFileSystem()
          .new LocalFSFileInputStream(path), 1024);
      assertNotNull(bis.getFileDescriptor());
    } finally {
      IOUtils.cleanupWithLogger(null, bis);
    }
  }

  /**
   * listStatus() must handle a child name containing a colon (non-Windows).
   */
  @Test
  public void testListStatusWithColons() throws IOException {
    assumeNotWindows();
    File colonFile = new File(TEST_ROOT_DIR, "foo:bar");
    colonFile.mkdirs();
    FileStatus[] stats = fileSys.listStatus(new Path(TEST_ROOT_DIR));
    assertEquals("Unexpected number of stats", 1, stats.length);
    assertEquals("Bad path from stat", colonFile.getAbsolutePath(),
        stats[0].getPath().toUri().getPath());
  }

  /**
   * On Windows, listStatus() on a path without a drive letter must return
   * child paths that are consistent with the path that was listed.
   */
  @Test
  public void testListStatusReturnConsistentPathOnWindows() throws IOException {
    assumeWindows();
    String dirNoDriveSpec = TEST_ROOT_DIR;
    if (dirNoDriveSpec.charAt(1) == ':') {
      // strip the "X:" drive specification
      dirNoDriveSpec = dirNoDriveSpec.substring(2);
    }

    File file = new File(dirNoDriveSpec, "foo");
    file.mkdirs();
    FileStatus[] stats = fileSys.listStatus(new Path(dirNoDriveSpec));
    assertEquals("Unexpected number of stats", 1, stats.length);
    assertEquals("Bad path from stat", new Path(file.getPath()).toUri().getPath(),
        stats[0].getPath().toUri().getPath());
  }

  /**
   * Verify reportChecksumFailure() behaviour.
   * It must return false and move both the data file and its checksum
   * file into a single bad-files directory created under dir1.
   */
  @Test
  public void testReportChecksumFailure() throws IOException {
    base.mkdirs();
    assertTrue(base.exists() && base.isDirectory());

    final File dir1 = new File(base, "dir1");
    final File dir2 = new File(dir1, "dir2");
    dir2.mkdirs();
    assertTrue(dir2.exists() && FileUtil.canWrite(dir2));

    final String dataFileName = "corruptedData";
    final Path dataPath = new Path(new File(dir2, dataFileName).toURI());
    final Path checksumPath = fileSys.getChecksumFile(dataPath);
    try (FSDataOutputStream fsdos = fileSys.create(dataPath)) {
      fsdos.writeUTF("foo");
    }
    assertTrue(fileSys.pathToFile(dataPath).exists());
    final long dataFileLength = fileSys.getFileStatus(dataPath).getLen();
    assertTrue(dataFileLength > 0);

    // check that the checksum file is created and not empty:
    assertTrue(fileSys.pathToFile(checksumPath).exists());
    final long checksumFileLength = fileSys.getFileStatus(checksumPath).getLen();
    assertTrue(checksumFileLength > 0);

    // this is a hack to force the #reportChecksumFailure() method to stop
    // climbing up at the 'base' directory and use 'dir1/bad_files' as the
    // corrupted files storage:
    FileUtil.setWritable(base, false);

    FSDataInputStream dataFsdis = fileSys.open(dataPath);
    FSDataInputStream checksumFsdis = fileSys.open(checksumPath);

    boolean retryIsNecessary = fileSys.reportChecksumFailure(dataPath, dataFsdis, 0, checksumFsdis, 0);
    assertFalse(retryIsNecessary);

    // the data file should be moved:
    assertFalse(fileSys.pathToFile(dataPath).exists());
    // the checksum file should be moved:
    assertFalse(fileSys.pathToFile(checksumPath).exists());

    // check that the files exist in the new location where they were moved:
    final FileFilter notDir2 =
        pathname -> pathname != null && !pathname.getName().equals("dir2");
    File[] dir1files = dir1.listFiles(notDir2);
    assertNotNull(dir1files);
    assertEquals(1, dir1files.length);
    File badFilesDir = dir1files[0];

    File[] badFiles = badFilesDir.listFiles();
    assertNotNull(badFiles);
    assertEquals(2, badFiles.length);
    boolean dataFileFound = false;
    boolean checksumFileFound = false;
    for (File badFile: badFiles) {
      if (badFile.getName().startsWith(dataFileName)) {
        assertEquals(dataFileLength, badFile.length());
        dataFileFound = true;
      } else if (badFile.getName().contains(dataFileName + ".crc")) {
        assertEquals(checksumFileLength, badFile.length());
        checksumFileFound = true;
      }
    }
    assertTrue(dataFileFound);
    assertTrue(checksumFileFound);
  }

  /**
   * Assert the modification and access times reported for a path.
   * @param path path to stat
   * @param expectedModTime expected modification time
   * @param expectedAccTime expected access time
   * @throws IOException on any IO failure
   */
  private void checkTimesStatus(Path path,
      long expectedModTime, long expectedAccTime) throws IOException {
    FileStatus status = fileSys.getFileStatus(path);
    assertEquals(expectedModTime, status.getModificationTime());
    assertEquals(expectedAccTime, status.getAccessTime());
  }

  /**
   * Verify setTimes() updates the modification and access times.
   * Passing -1 for either time must leave that time unchanged.
   */
  @Test
  public void testSetTimes() throws Exception {
    Path path = new Path(TEST_ROOT_DIR, "set-times");
    writeFile(fileSys, path, 1);

    // test only to the nearest second, as the raw FS may not
    // support millisecond timestamps
    long newModTime = 12345000;
    long newAccTime = 23456000;

    FileStatus status = fileSys.getFileStatus(path);
    assertTrue("check we're actually changing something", newModTime != status.getModificationTime());
    assertTrue("check we're actually changing something", newAccTime != status.getAccessTime());

    fileSys.setTimes(path, newModTime, newAccTime);
    checkTimesStatus(path, newModTime, newAccTime);

    newModTime = 34567000;

    fileSys.setTimes(path, newModTime, -1);
    checkTimesStatus(path, newModTime, newAccTime);

    newAccTime = 45678000;

    fileSys.setTimes(path, -1, newAccTime);
    checkTimesStatus(path, newModTime, newAccTime);
  }

  /**
   * Regression test for HADOOP-9307: BufferedFSInputStream returning
   * wrong results after certain sequences of seeks and reads.
   */
  @Test
  public void testBufferedFSInputStream() throws IOException {
    // separate configuration, so the shared test 'conf' field is untouched
    Configuration rawConf = new Configuration();
    rawConf.setClass("fs.file.impl", RawLocalFileSystem.class, FileSystem.class);
    rawConf.setInt(IO_FILE_BUFFER_SIZE_KEY, 4096);

    byte[] buf = new byte[10 * 1024];
    new Random().nextBytes(buf);

    // Record the sequence of seeks and reads which trigger a failure.
    int[] seeks = new int[10];
    int[] reads = new int[10];

    // newInstance() returns a private instance: close it when done
    try (FileSystem fs = FileSystem.newInstance(rawConf)) {
      // Write random bytes to file
      try (FSDataOutputStream stream = fs.create(TEST_PATH)) {
        stream.write(buf);
      }

      Random r = new Random();
      try (FSDataInputStream stm = fs.open(TEST_PATH)) {
        for (int i = 0; i < 1000; i++) {
          int seekOff = r.nextInt(buf.length);
          int toRead = r.nextInt(Math.min(buf.length - seekOff, 32000));

          seeks[i % seeks.length] = seekOff;
          reads[i % reads.length] = toRead;
          verifyRead(stm, buf, seekOff, toRead);
        }
      } catch (AssertionError afe) {
        StringBuilder sb = new StringBuilder();
        sb.append("Sequence of actions:\n");
        for (int j = 0; j < seeks.length; j++) {
          sb.append("seek @ ").append(seeks[j]).append("  ")
              .append("read ").append(reads[j]).append("\n");
        }
        System.err.println(sb.toString());
        throw afe;
      }
    }
  }

  /**
   * Tests a simple rename of a directory.
   */
  @Test
  public void testRenameDirectory() throws IOException {
    Path src = new Path(TEST_ROOT_DIR, "dir1");
    Path dst = new Path(TEST_ROOT_DIR, "dir2");
    fileSys.delete(src, true);
    fileSys.delete(dst, true);
    assertTrue(fileSys.mkdirs(src));
    assertTrue(fileSys.rename(src, dst));
    assertTrue(fileSys.exists(dst));
    assertFalse(fileSys.exists(src));
  }

  /**
   * Tests that renaming a directory replaces the destination if the destination
   * is an existing empty directory.
   *
   * Before:
   *   /dir1
   *     /file1
   *     /file2
   *   /dir2
   *
   * After rename("/dir1", "/dir2"):
   *   /dir2
   *     /file1
   *     /file2
   */
  @Test
  public void testRenameReplaceExistingEmptyDirectory() throws IOException {
    Path src = new Path(TEST_ROOT_DIR, "dir1");
    Path dst = new Path(TEST_ROOT_DIR, "dir2");
    fileSys.delete(src, true);
    fileSys.delete(dst, true);
    assertTrue(fileSys.mkdirs(src));
    writeFile(fileSys, new Path(src, "file1"), 1);
    writeFile(fileSys, new Path(src, "file2"), 1);
    assertTrue(fileSys.mkdirs(dst));
    assertTrue(fileSys.rename(src, dst));
    assertTrue(fileSys.exists(dst));
    assertTrue(fileSys.exists(new Path(dst, "file1")));
    assertTrue(fileSys.exists(new Path(dst, "file2")));
    assertFalse(fileSys.exists(src));
  }

  /**
   * Tests that renaming a directory to an existing directory that is not empty
   * results in a full copy of source to destination.
   *
   * Before:
   *   /dir1
   *     /dir2
   *       /dir3
   *         /file1
   *         /file2
   *
   * After rename("/dir1/dir2/dir3", "/dir1"):
   *   /dir1
   *     /dir3
   *       /file1
   *       /file2
   */
  @Test
  public void testRenameMoveToExistingNonEmptyDirectory() throws IOException {
    Path src = new Path(TEST_ROOT_DIR, "dir1/dir2/dir3");
    Path dst = new Path(TEST_ROOT_DIR, "dir1");
    fileSys.delete(src, true);
    fileSys.delete(dst, true);
    assertTrue(fileSys.mkdirs(src));
    writeFile(fileSys, new Path(src, "file1"), 1);
    writeFile(fileSys, new Path(src, "file2"), 1);
    assertTrue(fileSys.exists(dst));
    assertTrue(fileSys.rename(src, dst));
    assertTrue(fileSys.exists(dst));
    assertTrue(fileSys.exists(new Path(dst, "dir3")));
    assertTrue(fileSys.exists(new Path(dst, "dir3/file1")));
    assertTrue(fileSys.exists(new Path(dst, "dir3/file2")));
    assertFalse(fileSys.exists(src));
  }

  /**
   * Seek to {@code seekOff}, read {@code toRead} bytes fully, and fail with
   * a hex dump of expected vs actual bytes if they differ from the
   * corresponding slice of {@code fileContents}.
   */
  private void verifyRead(FSDataInputStream stm, byte[] fileContents,
      int seekOff, int toRead) throws IOException {
    byte[] out = new byte[toRead];
    stm.seek(seekOff);
    stm.readFully(out);
    byte[] expected = Arrays.copyOfRange(fileContents, seekOff, seekOff+toRead);
    if (!Arrays.equals(out, expected)) {
      String s ="\nExpected: " +
          StringUtils.byteToHexString(expected) +
          "\ngot:      " +
          StringUtils.byteToHexString(out) +
          "\noff=" + seekOff + " len=" + toRead;
      fail(s);
    }
  }

  /**
   * resolvePath() must strip a URI fragment from the path it is given.
   */
  @Test
  public void testStripFragmentFromPath() throws Exception {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path pathQualified = TEST_PATH.makeQualified(fs.getUri(),
        fs.getWorkingDirectory());
    Path pathWithFragment = new Path(
        new URI(pathQualified.toString() + "#glacier"));
    // Create test file with fragment
    FileSystemTestHelper.createFile(fs, pathWithFragment);
    Path resolved = fs.resolvePath(pathWithFragment);
    assertEquals("resolvePath did not strip fragment from Path", pathQualified,
        resolved);
  }

  /**
   * append() must open the stream positioned at the end of the existing
   * data, and appended bytes must follow the original content.
   */
  @Test
  public void testAppendSetsPosCorrectly() throws Exception {
    FileSystem fs = fileSys.getRawFileSystem();
    Path file = new Path(TEST_ROOT_DIR, "test-append");

    fs.delete(file, true);
    try (FSDataOutputStream out = fs.create(file)) {
      out.write("text1".getBytes());
    }

    // Verify the position
    try (FSDataOutputStream out = fs.append(file)) {
      assertEquals(5, out.getPos());
      out.write("text2".getBytes());
    }

    // Verify the content
    try (FSDataInputStream in = fs.open(file)) {
      byte[] buf = new byte[in.available()];
      in.readFully(buf);
      assertEquals("text1text2", new String(buf));
    }
  }

  /**
   * listStatus() on a path that exists but is neither a file nor a directory
   * (e.g. a named pipe) must return exactly the status produced by
   * getFileStatus() for that path.
   */
  @Test
  public void testFileStatusPipeFile() throws Exception {
    RawLocalFileSystem origFs = new RawLocalFileSystem();
    RawLocalFileSystem fs = spy(origFs);
    Configuration conf = mock(Configuration.class);
    fs.setConf(conf);
    Whitebox.setInternalState(fs, "useDeprecatedFileStatus", false);
    Path path = new Path("/foo");
    File pipe = mock(File.class);
    when(pipe.isFile()).thenReturn(false);
    when(pipe.isDirectory()).thenReturn(false);
    when(pipe.exists()).thenReturn(true);

    FileStatus stat = mock(FileStatus.class);
    doReturn(pipe).when(fs).pathToFile(path);
    doReturn(stat).when(fs).getFileStatus(path);
    FileStatus[] stats = fs.listStatus(path);
    assertTrue(stats != null && stats.length == 1 && stats[0] == stat);
  }

  /**
   * Test the createFile() builder.
   * Data written through it must read back intact. An unconfigured
   * builder must report the default block size, replication, buffer
   * size and permission, and zero values must be settable.
   */
  @Test
  public void testFSOutputStreamBuilder() throws Exception {
    Path path = new Path(TEST_ROOT_DIR, "testBuilder");
    {
      FSDataOutputStreamBuilder recursiveBuilder =
          fileSys.createFile(path).recursive();
      String content = "Create with a generic type of createFile!";
      byte[] contentOrigin = content.getBytes("UTF8");
      try (FSDataOutputStream out = recursiveBuilder.build()) {
        out.write(contentOrigin);
      }

      byte[] buffer;
      try (FSDataInputStream input = fileSys.open(path)) {
        buffer = new byte[(int) (fileSys.getFileStatus(path).getLen())];
        input.readFully(0, buffer);
      }
      Assert.assertArrayEquals("The data be read should equals with the "
          + "data written.", contentOrigin, buffer);
    }

    // Test value not being set for replication, block size, buffer size
    // and permission
    FSDataOutputStreamBuilder builder =
        fileSys.createFile(path);
    try (FSDataOutputStream stream = builder.build()) {
      assertThat(builder.getBlockSize())
          .withFailMessage("Should be default block size")
          .isEqualTo(fileSys.getDefaultBlockSize());
      assertThat(builder.getReplication())
          .withFailMessage("Should be default replication factor")
          .isEqualTo(fileSys.getDefaultReplication());
      assertThat(builder.getBufferSize())
          .withFailMessage("Should be default buffer size")
          .isEqualTo(fileSys.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY,
              IO_FILE_BUFFER_SIZE_DEFAULT));
      assertThat(builder.getPermission())
          .withFailMessage("Should be default permission")
          .isEqualTo(FsPermission.getFileDefault());
    }

    // Test set 0 to replication, block size and buffer size
    builder = fileSys.createFile(path);
    builder.bufferSize(0).blockSize(0).replication((short) 0);
    assertThat(builder.getBlockSize())
        .withFailMessage("Block size should be 0")
        .isZero();
    assertThat(builder.getReplication())
        .withFailMessage("Replication factor should be 0")
        .isZero();
    assertThat(builder.getBufferSize())
        .withFailMessage("Buffer size should be 0")
        .isZero();
  }

  /**
   * A builder to verify configuration keys are supported.
   * {@link #build()} rejects any mandatory key not in the supported set.
   */
  private static class BuilderWithSupportedKeys
      extends FSDataOutputStreamBuilder<FSDataOutputStream,
      BuilderWithSupportedKeys> {

    private final Set<String> supportedKeys = new HashSet<>();

    BuilderWithSupportedKeys(@Nonnull final Collection<String> supportedKeys,
        @Nonnull FileSystem fileSystem, @Nonnull Path p) {
      super(fileSystem, p);
      this.supportedKeys.addAll(supportedKeys);
    }

    @Override
    public BuilderWithSupportedKeys getThisBuilder() {
      return this;
    }

    /**
     * Create the file, after checking that every mandatory key is supported.
     * @throws IllegalArgumentException listing the unsupported mandatory keys
     * @throws IOException on failure to create the file
     */
    @Override
    public FSDataOutputStream build()
        throws IllegalArgumentException, IOException {
      Set<String> unsupported = new HashSet<>(getMandatoryKeys());
      unsupported.removeAll(supportedKeys);
      // report the rejected keys, not the supported ones
      Preconditions.checkArgument(unsupported.isEmpty(),
          "unsupported key found: " + unsupported);
      return getFS().create(
          getPath(), getPermission(), getFlags(), getBufferSize(),
          getReplication(), getBlockSize(), getProgress(), getChecksumOpt());
    }
  }

  /**
   * Optional and mandatory builder options.
   * A must() overrides an earlier opt() for the same key, and build()
   * rejects an unsupported mandatory key.
   */
  @Test
  public void testFSOutputStreamBuilderOptions() throws Exception {
    Path path = new Path(TEST_ROOT_DIR, "testBuilderOpt");
    final List<String> supportedKeys = Arrays.asList("strM");

    FSDataOutputStreamBuilder<?, ?> builder =
        new BuilderWithSupportedKeys(supportedKeys, fileSys, path);
    builder.opt("strKey", "value");
    builder.opt("intKey", 123);
    builder.opt("strM", "ignored");
    // Over-write an optional value with a mandatory value.
    builder.must("strM", "value");
    builder.must("unsupported", 12.34);

    assertEquals("Optional value should be overwrite by a mandatory value",
        "value", builder.getOptions().get("strM"));

    Set<String> mandatoryKeys = builder.getMandatoryKeys();
    Set<String> expectedKeys = new HashSet<>();
    expectedKeys.add("strM");
    expectedKeys.add("unsupported");
    assertEquals(expectedKeys, mandatoryKeys);
    assertEquals(2, mandatoryKeys.size());

    LambdaTestUtils.intercept(IllegalArgumentException.class,
        "unsupported key found", builder::build
    );
  }

  private static final int CRC_SIZE = 12;

  private static final byte[] DATA = "1234567890".getBytes();

  /**
   * Get the statistics for the file schema. Contains assertions
   * @return the statistics on all file:// IO.
   */
  protected Statistics getFileStatistics() {
    final List<Statistics> all = FileSystem.getAllStatistics();
    final List<Statistics> fileStats = all
        .stream()
        .filter(s -> s.getScheme().equals("file"))
        .collect(Collectors.toList());
    assertEquals("Number of statistics counters for file://",
        1, fileStats.size());
    // this should be used for local and rawLocal, as they share the
    // same schema (although their class is different)
    return fileStats.get(0);
  }

  /**
   * Write the byte array {@link #DATA} to the given output stream.
   * @param s stream to write to.
   * @throws IOException failure to write/close the file
   */
  private void writeData(FSDataOutputStream s) throws IOException {
    s.write(DATA);
    s.close();
  }

  /**
   * Evaluate the closure while counting bytes written during
   * its execution, and verify that the count included the CRC
   * write as well as the data.
   * After the operation, the file is deleted if {@code delete} is true.
   * @param operation operation for assertion method.
   * @param path path to write
   * @param callable expression evaluated
   * @param delete should the file be deleted after?
   */
  private void assertWritesCRC(String operation, Path path,
      LambdaTestUtils.VoidCallable callable, boolean delete) throws Exception {
    final Statistics stats = getFileStatistics();
    final long bytesOut0 = stats.getBytesWritten();
    try {
      callable.call();
      assertEquals("Bytes written in " + operation + "; stats=" + stats,
          CRC_SIZE + DATA.length, stats.getBytesWritten() - bytesOut0);
    } finally {
      if (delete) {
        // clean up
        try {
          fileSys.delete(path, false);
        } catch (IOException ignored) {
          // ignore this cleanup failure
        }
      }
    }
  }

  /**
   * Verify that File IO through the classic non-builder APIs generate
   * statistics which imply that CRCs were read and written.
   */
  @Test
  public void testCRCwithClassicAPIs() throws Throwable {
    final Path file = new Path(TEST_ROOT_DIR, "testByteCountersClassicAPIs");
    assertWritesCRC("create()",
        file,
        () -> writeData(fileSys.create(file, true)),
        false);

    final Statistics stats = getFileStatistics();
    final long bytesRead0 = stats.getBytesRead();
    fileSys.open(file).close();
    final long bytesRead1 = stats.getBytesRead();
    assertEquals("Bytes read in open() call with stats " + stats,
        CRC_SIZE, bytesRead1 - bytesRead0);
  }

  /**
   * create/7 to use write the CRC.
   */
  @Test
  public void testCRCwithCreate7() throws Throwable {
    final Path file = new Path(TEST_ROOT_DIR, "testCRCwithCreate7");
    assertWritesCRC("create/7",
        file,
        () -> writeData(
            fileSys.create(file,
                FsPermission.getFileDefault(),
                true,
                8192,
                (short)1,
                16384,
                null)),
        true);
  }

  /**
   * Create with ChecksumOpt to create checksums.
   * If the LocalFS ever interpreted the flag, this test may fail.
   */
  @Test
  public void testCRCwithCreateChecksumOpt() throws Throwable {
    final Path file = new Path(TEST_ROOT_DIR, "testCRCwithCreateChecksumOpt");
    assertWritesCRC("create with checksum opt",
        file,
        () -> writeData(
            fileSys.create(file,
                FsPermission.getFileDefault(),
                EnumSet.of(CreateFlag.CREATE),
                8192,
                (short)1,
                16384,
                null,
                Options.ChecksumOpt.createDisabled())),
        true);
  }

  /**
   * Create createNonRecursive/6.
   */
  @Test
  public void testCRCwithCreateNonRecursive6() throws Throwable {
    // creates TEST_ROOT_DIR (parent of TEST_PATH), which the non-recursive
    // create below presumably requires to exist
    fileSys.mkdirs(TEST_PATH);
    final Path file = new Path(TEST_ROOT_DIR,
        "testCRCwithCreateNonRecursive6");
    assertWritesCRC("createNonRecursive/6",
        file,
        () -> writeData(
            fileSys.createNonRecursive(file,
                FsPermission.getFileDefault(),
                true,
                8192,
                (short)1,
                16384,
                null)),
        true);
  }

  /**
   * Create createNonRecursive with CreateFlags.
   */
  @Test
  public void testCRCwithCreateNonRecursiveCreateFlags() throws Throwable {
    // creates TEST_ROOT_DIR (parent of TEST_PATH), which the non-recursive
    // create below presumably requires to exist
    fileSys.mkdirs(TEST_PATH);
    final Path file = new Path(TEST_ROOT_DIR,
        "testCRCwithCreateNonRecursiveCreateFlags");
    assertWritesCRC("createNonRecursive with CreateFlags",
        file,
        () -> writeData(
            fileSys.createNonRecursive(file,
                FsPermission.getFileDefault(),
                EnumSet.of(CreateFlag.CREATE),
                8192,
                (short)1,
                16384,
                null)),
        true);
  }

  /**
   * This relates to MAPREDUCE-7184, where the openFile() call's
   * CRC count wasn't making into the statistics for the current thread.
   * If the evaluation was in a separate thread you'd expect that,
   * but if the completable future is in fact being synchronously completed
   * it should not happen.
   */
  @Test
  public void testReadIncludesCRCwithBuilders() throws Throwable {

    final Path file = new Path(TEST_ROOT_DIR,
        "testReadIncludesCRCwithBuilders");
    Statistics stats = getFileStatistics();
    // write the file using the builder API
    assertWritesCRC("createFile()",
        file,
        () -> writeData(
            fileSys.createFile(file)
                .overwrite(true).recursive()
                .build()),
        false);

    // now read back the data, again with the builder API
    final long bytesRead0 = stats.getBytesRead();
    fileSys.openFile(file).build().get().close();
    assertEquals("Bytes read in openFile() call with stats " + stats,
        CRC_SIZE, stats.getBytesRead() - bytesRead0);
    // now write with overwrite = true
    assertWritesCRC("createFileNonRecursive()",
        file,
        () -> {
          try (FSDataOutputStream s = fileSys.createFile(file)
              .overwrite(true)
              .build()) {
            s.write(DATA);
          }
        },
        true);
  }

  /**
   * Write with the builder, using the normal recursive create
   * with overwrite disabled.
   */
  @Test
  public void testWriteWithBuildersRecursive() throws Throwable {

    final Path file = new Path(TEST_ROOT_DIR,
        "testWriteWithBuildersRecursive");
    // write the file using the builder API
    assertWritesCRC("createFile()",
        file,
        () -> writeData(
            fileSys.createFile(file)
                .overwrite(false)
                .recursive()
                .build()),
        true);
  }
}