| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.fs; |
| |
| import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileSystem.Statistics; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.hadoop.test.GenericTestUtils; |
| import org.apache.hadoop.test.LambdaTestUtils; |
| import org.apache.hadoop.test.Whitebox; |
| import org.apache.hadoop.util.StringUtils; |
| |
| import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT; |
| import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY; |
| import static org.apache.hadoop.fs.FileSystemTestHelper.*; |
| |
| import java.io.*; |
| import java.net.URI; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.EnumSet; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.stream.Collectors; |
| |
| import static org.apache.hadoop.test.PlatformAssumptions.assumeNotWindows; |
| import static org.apache.hadoop.test.PlatformAssumptions.assumeWindows; |
| import static org.junit.Assert.assertEquals; |
| import static org.junit.Assert.assertFalse; |
| import static org.junit.Assert.assertNotNull; |
| import static org.junit.Assert.assertTrue; |
| import static org.junit.Assert.fail; |
| import static org.mockito.Mockito.*; |
| |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.Timeout; |
| |
| import javax.annotation.Nonnull; |
| |
| import static org.assertj.core.api.Assertions.assertThat; |
| |
| /** |
| * This class tests the local file system via the FileSystem abstraction. |
| */ |
| public class TestLocalFileSystem { |
| private static final File base = |
| GenericTestUtils.getTestDir("work-dir/localfs"); |
| |
| private static final String TEST_ROOT_DIR = base.getAbsolutePath(); |
| private final Path TEST_PATH = new Path(TEST_ROOT_DIR, "test-file"); |
| private Configuration conf; |
| private LocalFileSystem fileSys; |
| |
| /** |
| * standard test timeout: {@value}. |
| */ |
| public static final int DEFAULT_TEST_TIMEOUT = 60 * 1000; |
| |
| /** |
| * Set the timeout for every test. |
| */ |
| @Rule |
| public Timeout testTimeout = new Timeout(DEFAULT_TEST_TIMEOUT); |
| |
| private void cleanupFile(FileSystem fs, Path name) throws IOException { |
| assertTrue(fs.exists(name)); |
| fs.delete(name, true); |
| assertTrue(!fs.exists(name)); |
| } |
| |
| @Before |
| public void setup() throws IOException { |
| conf = new Configuration(false); |
| conf.set("fs.file.impl", LocalFileSystem.class.getName()); |
| fileSys = FileSystem.getLocal(conf); |
| fileSys.delete(new Path(TEST_ROOT_DIR), true); |
| } |
| |
| @After |
| public void after() throws IOException { |
| FileUtil.setWritable(base, true); |
| FileUtil.fullyDelete(base); |
| assertTrue(!base.exists()); |
| RawLocalFileSystem.useStatIfAvailable(); |
| } |
| |
| /** |
| * Test the capability of setting the working directory. |
| */ |
| @Test |
| public void testWorkingDirectory() throws IOException { |
| Path origDir = fileSys.getWorkingDirectory(); |
| Path subdir = new Path(TEST_ROOT_DIR, "new"); |
| try { |
| // make sure it doesn't already exist |
| assertTrue(!fileSys.exists(subdir)); |
| // make it and check for it |
| assertTrue(fileSys.mkdirs(subdir)); |
| assertTrue(fileSys.isDirectory(subdir)); |
| |
| fileSys.setWorkingDirectory(subdir); |
| |
| // create a directory and check for it |
| Path dir1 = new Path("dir1"); |
| assertTrue(fileSys.mkdirs(dir1)); |
| assertTrue(fileSys.isDirectory(dir1)); |
| |
| // delete the directory and make sure it went away |
| fileSys.delete(dir1, true); |
| assertTrue(!fileSys.exists(dir1)); |
| |
| // create files and manipulate them. |
| Path file1 = new Path("file1"); |
| Path file2 = new Path("sub/file2"); |
| String contents = writeFile(fileSys, file1, 1); |
| fileSys.copyFromLocalFile(file1, file2); |
| assertTrue(fileSys.exists(file1)); |
| assertTrue(fileSys.isFile(file1)); |
| cleanupFile(fileSys, file2); |
| fileSys.copyToLocalFile(file1, file2); |
| cleanupFile(fileSys, file2); |
| |
| // try a rename |
| fileSys.rename(file1, file2); |
| assertTrue(!fileSys.exists(file1)); |
| assertTrue(fileSys.exists(file2)); |
| fileSys.rename(file2, file1); |
| |
| // try reading a file |
| InputStream stm = fileSys.open(file1); |
| byte[] buffer = new byte[3]; |
| int bytesRead = stm.read(buffer, 0, 3); |
| assertEquals(contents, new String(buffer, 0, bytesRead)); |
| stm.close(); |
| } finally { |
| fileSys.setWorkingDirectory(origDir); |
| } |
| } |
| |
| /** |
| * test Syncable interface on raw local file system |
| * @throws IOException |
| */ |
| @Test |
| public void testSyncable() throws IOException { |
| FileSystem fs = fileSys.getRawFileSystem(); |
| Path file = new Path(TEST_ROOT_DIR, "syncable"); |
| FSDataOutputStream out = fs.create(file);; |
| final int bytesWritten = 1; |
| byte[] expectedBuf = new byte[] {'0', '1', '2', '3'}; |
| try { |
| out.write(expectedBuf, 0, 1); |
| out.hflush(); |
| verifyFile(fs, file, bytesWritten, expectedBuf); |
| out.write(expectedBuf, bytesWritten, expectedBuf.length-bytesWritten); |
| out.hsync(); |
| verifyFile(fs, file, expectedBuf.length, expectedBuf); |
| } finally { |
| out.close(); |
| } |
| } |
| |
| private void verifyFile(FileSystem fs, Path file, int bytesToVerify, |
| byte[] expectedBytes) throws IOException { |
| FSDataInputStream in = fs.open(file); |
| try { |
| byte[] readBuf = new byte[bytesToVerify]; |
| in.readFully(readBuf, 0, bytesToVerify); |
| for (int i=0; i<bytesToVerify; i++) { |
| assertEquals(expectedBytes[i], readBuf[i]); |
| } |
| } finally { |
| in.close(); |
| } |
| } |
| |
| @Test |
| public void testCopy() throws IOException { |
| Path src = new Path(TEST_ROOT_DIR, "dingo"); |
| Path dst = new Path(TEST_ROOT_DIR, "yak"); |
| writeFile(fileSys, src, 1); |
| assertTrue(FileUtil.copy(fileSys, src, fileSys, dst, true, false, conf)); |
| assertTrue(!fileSys.exists(src) && fileSys.exists(dst)); |
| assertTrue(FileUtil.copy(fileSys, dst, fileSys, src, false, false, conf)); |
| assertTrue(fileSys.exists(src) && fileSys.exists(dst)); |
| assertTrue(FileUtil.copy(fileSys, src, fileSys, dst, true, true, conf)); |
| assertTrue(!fileSys.exists(src) && fileSys.exists(dst)); |
| fileSys.mkdirs(src); |
| assertTrue(FileUtil.copy(fileSys, dst, fileSys, src, false, false, conf)); |
| Path tmp = new Path(src, dst.getName()); |
| assertTrue(fileSys.exists(tmp) && fileSys.exists(dst)); |
| assertTrue(FileUtil.copy(fileSys, dst, fileSys, src, false, true, conf)); |
| assertTrue(fileSys.delete(tmp, true)); |
| fileSys.mkdirs(tmp); |
| try { |
| FileUtil.copy(fileSys, dst, fileSys, src, true, true, conf); |
| fail("Failed to detect existing dir"); |
| } catch (IOException e) { |
| // Expected |
| } |
| } |
| |
| @Test |
| public void testHomeDirectory() throws IOException { |
| Path home = fileSys.makeQualified( |
| new Path(System.getProperty("user.home"))); |
| Path fsHome = fileSys.getHomeDirectory(); |
| assertEquals(home, fsHome); |
| } |
| |
| @Test |
| public void testPathEscapes() throws IOException { |
| Path path = new Path(TEST_ROOT_DIR, "foo%bar"); |
| writeFile(fileSys, path, 1); |
| FileStatus status = fileSys.getFileStatus(path); |
| assertEquals(fileSys.makeQualified(path), status.getPath()); |
| cleanupFile(fileSys, path); |
| } |
| |
| @Test |
| public void testCreateFileAndMkdirs() throws IOException { |
| Path test_dir = new Path(TEST_ROOT_DIR, "test_dir"); |
| Path test_file = new Path(test_dir, "file1"); |
| assertTrue(fileSys.mkdirs(test_dir)); |
| |
| final int fileSize = new Random().nextInt(1 << 20) + 1; |
| writeFile(fileSys, test_file, fileSize); |
| |
| { |
| //check FileStatus and ContentSummary |
| final FileStatus status = fileSys.getFileStatus(test_file); |
| Assert.assertEquals(fileSize, status.getLen()); |
| final ContentSummary summary = fileSys.getContentSummary(test_dir); |
| Assert.assertEquals(fileSize, summary.getLength()); |
| } |
| |
| // creating dir over a file |
| Path bad_dir = new Path(test_file, "another_dir"); |
| |
| try { |
| fileSys.mkdirs(bad_dir); |
| fail("Failed to detect existing file in path"); |
| } catch (ParentNotDirectoryException e) { |
| // Expected |
| } |
| |
| try { |
| fileSys.mkdirs(null); |
| fail("Failed to detect null in mkdir arg"); |
| } catch (IllegalArgumentException e) { |
| // Expected |
| } |
| } |
| |
| /** Test deleting a file, directory, and non-existent path */ |
| @Test |
| public void testBasicDelete() throws IOException { |
| Path dir1 = new Path(TEST_ROOT_DIR, "dir1"); |
| Path file1 = new Path(TEST_ROOT_DIR, "file1"); |
| Path file2 = new Path(TEST_ROOT_DIR+"/dir1", "file2"); |
| Path file3 = new Path(TEST_ROOT_DIR, "does-not-exist"); |
| assertTrue(fileSys.mkdirs(dir1)); |
| writeFile(fileSys, file1, 1); |
| writeFile(fileSys, file2, 1); |
| assertFalse("Returned true deleting non-existant path", |
| fileSys.delete(file3)); |
| assertTrue("Did not delete file", fileSys.delete(file1)); |
| assertTrue("Did not delete non-empty dir", fileSys.delete(dir1)); |
| } |
| |
| @Test |
| public void testStatistics() throws Exception { |
| int fileSchemeCount = 0; |
| for (Statistics stats : FileSystem.getAllStatistics()) { |
| if (stats.getScheme().equals("file")) { |
| fileSchemeCount++; |
| } |
| } |
| assertEquals(1, fileSchemeCount); |
| } |
| |
| @Test |
| public void testHasFileDescriptor() throws IOException { |
| Path path = new Path(TEST_ROOT_DIR, "test-file"); |
| writeFile(fileSys, path, 1); |
| BufferedFSInputStream bis = null; |
| try { |
| bis = new BufferedFSInputStream(new RawLocalFileSystem() |
| .new LocalFSFileInputStream(path), 1024); |
| assertNotNull(bis.getFileDescriptor()); |
| } finally { |
| IOUtils.cleanupWithLogger(null, bis); |
| } |
| } |
| |
| @Test |
| public void testListStatusWithColons() throws IOException { |
| assumeNotWindows(); |
| File colonFile = new File(TEST_ROOT_DIR, "foo:bar"); |
| colonFile.mkdirs(); |
| FileStatus[] stats = fileSys.listStatus(new Path(TEST_ROOT_DIR)); |
| assertEquals("Unexpected number of stats", 1, stats.length); |
| assertEquals("Bad path from stat", colonFile.getAbsolutePath(), |
| stats[0].getPath().toUri().getPath()); |
| } |
| |
| @Test |
| public void testListStatusReturnConsistentPathOnWindows() throws IOException { |
| assumeWindows(); |
| String dirNoDriveSpec = TEST_ROOT_DIR; |
| if (dirNoDriveSpec.charAt(1) == ':') |
| dirNoDriveSpec = dirNoDriveSpec.substring(2); |
| |
| File file = new File(dirNoDriveSpec, "foo"); |
| file.mkdirs(); |
| FileStatus[] stats = fileSys.listStatus(new Path(dirNoDriveSpec)); |
| assertEquals("Unexpected number of stats", 1, stats.length); |
| assertEquals("Bad path from stat", new Path(file.getPath()).toUri().getPath(), |
| stats[0].getPath().toUri().getPath()); |
| } |
| |
| @Test |
| public void testReportChecksumFailure() throws IOException { |
| base.mkdirs(); |
| assertTrue(base.exists() && base.isDirectory()); |
| |
| final File dir1 = new File(base, "dir1"); |
| final File dir2 = new File(dir1, "dir2"); |
| dir2.mkdirs(); |
| assertTrue(dir2.exists() && FileUtil.canWrite(dir2)); |
| |
| final String dataFileName = "corruptedData"; |
| final Path dataPath = new Path(new File(dir2, dataFileName).toURI()); |
| final Path checksumPath = fileSys.getChecksumFile(dataPath); |
| final FSDataOutputStream fsdos = fileSys.create(dataPath); |
| try { |
| fsdos.writeUTF("foo"); |
| } finally { |
| fsdos.close(); |
| } |
| assertTrue(fileSys.pathToFile(dataPath).exists()); |
| final long dataFileLength = fileSys.getFileStatus(dataPath).getLen(); |
| assertTrue(dataFileLength > 0); |
| |
| // check the the checksum file is created and not empty: |
| assertTrue(fileSys.pathToFile(checksumPath).exists()); |
| final long checksumFileLength = fileSys.getFileStatus(checksumPath).getLen(); |
| assertTrue(checksumFileLength > 0); |
| |
| // this is a hack to force the #reportChecksumFailure() method to stop |
| // climbing up at the 'base' directory and use 'dir1/bad_files' as the |
| // corrupted files storage: |
| FileUtil.setWritable(base, false); |
| |
| FSDataInputStream dataFsdis = fileSys.open(dataPath); |
| FSDataInputStream checksumFsdis = fileSys.open(checksumPath); |
| |
| boolean retryIsNecessary = fileSys.reportChecksumFailure(dataPath, dataFsdis, 0, checksumFsdis, 0); |
| assertTrue(!retryIsNecessary); |
| |
| // the data file should be moved: |
| assertTrue(!fileSys.pathToFile(dataPath).exists()); |
| // the checksum file should be moved: |
| assertTrue(!fileSys.pathToFile(checksumPath).exists()); |
| |
| // check that the files exist in the new location where they were moved: |
| File[] dir1files = dir1.listFiles(new FileFilter() { |
| @Override |
| public boolean accept(File pathname) { |
| return pathname != null && !pathname.getName().equals("dir2"); |
| } |
| }); |
| assertTrue(dir1files != null); |
| assertTrue(dir1files.length == 1); |
| File badFilesDir = dir1files[0]; |
| |
| File[] badFiles = badFilesDir.listFiles(); |
| assertTrue(badFiles != null); |
| assertTrue(badFiles.length == 2); |
| boolean dataFileFound = false; |
| boolean checksumFileFound = false; |
| for (File badFile: badFiles) { |
| if (badFile.getName().startsWith(dataFileName)) { |
| assertTrue(dataFileLength == badFile.length()); |
| dataFileFound = true; |
| } else if (badFile.getName().contains(dataFileName + ".crc")) { |
| assertTrue(checksumFileLength == badFile.length()); |
| checksumFileFound = true; |
| } |
| } |
| assertTrue(dataFileFound); |
| assertTrue(checksumFileFound); |
| } |
| |
| private void checkTimesStatus(Path path, |
| long expectedModTime, long expectedAccTime) throws IOException { |
| FileStatus status = fileSys.getFileStatus(path); |
| assertEquals(expectedModTime, status.getModificationTime()); |
| assertEquals(expectedAccTime, status.getAccessTime()); |
| } |
| |
| @Test |
| public void testSetTimes() throws Exception { |
| Path path = new Path(TEST_ROOT_DIR, "set-times"); |
| writeFile(fileSys, path, 1); |
| |
| // test only to the nearest second, as the raw FS may not |
| // support millisecond timestamps |
| long newModTime = 12345000; |
| long newAccTime = 23456000; |
| |
| FileStatus status = fileSys.getFileStatus(path); |
| assertTrue("check we're actually changing something", newModTime != status.getModificationTime()); |
| assertTrue("check we're actually changing something", newAccTime != status.getAccessTime()); |
| |
| fileSys.setTimes(path, newModTime, newAccTime); |
| checkTimesStatus(path, newModTime, newAccTime); |
| |
| newModTime = 34567000; |
| |
| fileSys.setTimes(path, newModTime, -1); |
| checkTimesStatus(path, newModTime, newAccTime); |
| |
| newAccTime = 45678000; |
| |
| fileSys.setTimes(path, -1, newAccTime); |
| checkTimesStatus(path, newModTime, newAccTime); |
| } |
| |
| /** |
| * Regression test for HADOOP-9307: BufferedFSInputStream returning |
| * wrong results after certain sequences of seeks and reads. |
| */ |
| @Test |
| public void testBufferedFSInputStream() throws IOException { |
| Configuration conf = new Configuration(); |
| conf.setClass("fs.file.impl", RawLocalFileSystem.class, FileSystem.class); |
| conf.setInt(CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, 4096); |
| FileSystem fs = FileSystem.newInstance(conf); |
| |
| byte[] buf = new byte[10*1024]; |
| new Random().nextBytes(buf); |
| |
| // Write random bytes to file |
| FSDataOutputStream stream = fs.create(TEST_PATH); |
| try { |
| stream.write(buf); |
| } finally { |
| stream.close(); |
| } |
| |
| Random r = new Random(); |
| |
| FSDataInputStream stm = fs.open(TEST_PATH); |
| // Record the sequence of seeks and reads which trigger a failure. |
| int seeks[] = new int[10]; |
| int reads[] = new int[10]; |
| try { |
| for (int i = 0; i < 1000; i++) { |
| int seekOff = r.nextInt(buf.length); |
| int toRead = r.nextInt(Math.min(buf.length - seekOff, 32000)); |
| |
| seeks[i % seeks.length] = seekOff; |
| reads[i % reads.length] = toRead; |
| verifyRead(stm, buf, seekOff, toRead); |
| |
| } |
| } catch (AssertionError afe) { |
| StringBuilder sb = new StringBuilder(); |
| sb.append("Sequence of actions:\n"); |
| for (int j = 0; j < seeks.length; j++) { |
| sb.append("seek @ ").append(seeks[j]).append(" ") |
| .append("read ").append(reads[j]).append("\n"); |
| } |
| System.err.println(sb.toString()); |
| throw afe; |
| } finally { |
| stm.close(); |
| } |
| } |
| |
| /** |
| * Tests a simple rename of a directory. |
| */ |
| @Test |
| public void testRenameDirectory() throws IOException { |
| Path src = new Path(TEST_ROOT_DIR, "dir1"); |
| Path dst = new Path(TEST_ROOT_DIR, "dir2"); |
| fileSys.delete(src, true); |
| fileSys.delete(dst, true); |
| assertTrue(fileSys.mkdirs(src)); |
| assertTrue(fileSys.rename(src, dst)); |
| assertTrue(fileSys.exists(dst)); |
| assertFalse(fileSys.exists(src)); |
| } |
| |
| /** |
| * Tests that renaming a directory replaces the destination if the destination |
| * is an existing empty directory. |
| * |
| * Before: |
| * /dir1 |
| * /file1 |
| * /file2 |
| * /dir2 |
| * |
| * After rename("/dir1", "/dir2"): |
| * /dir2 |
| * /file1 |
| * /file2 |
| */ |
| @Test |
| public void testRenameReplaceExistingEmptyDirectory() throws IOException { |
| Path src = new Path(TEST_ROOT_DIR, "dir1"); |
| Path dst = new Path(TEST_ROOT_DIR, "dir2"); |
| fileSys.delete(src, true); |
| fileSys.delete(dst, true); |
| assertTrue(fileSys.mkdirs(src)); |
| writeFile(fileSys, new Path(src, "file1"), 1); |
| writeFile(fileSys, new Path(src, "file2"), 1); |
| assertTrue(fileSys.mkdirs(dst)); |
| assertTrue(fileSys.rename(src, dst)); |
| assertTrue(fileSys.exists(dst)); |
| assertTrue(fileSys.exists(new Path(dst, "file1"))); |
| assertTrue(fileSys.exists(new Path(dst, "file2"))); |
| assertFalse(fileSys.exists(src)); |
| } |
| |
| /** |
| * Tests that renaming a directory to an existing directory that is not empty |
| * results in a full copy of source to destination. |
| * |
| * Before: |
| * /dir1 |
| * /dir2 |
| * /dir3 |
| * /file1 |
| * /file2 |
| * |
| * After rename("/dir1/dir2/dir3", "/dir1"): |
| * /dir1 |
| * /dir3 |
| * /file1 |
| * /file2 |
| */ |
| @Test |
| public void testRenameMoveToExistingNonEmptyDirectory() throws IOException { |
| Path src = new Path(TEST_ROOT_DIR, "dir1/dir2/dir3"); |
| Path dst = new Path(TEST_ROOT_DIR, "dir1"); |
| fileSys.delete(src, true); |
| fileSys.delete(dst, true); |
| assertTrue(fileSys.mkdirs(src)); |
| writeFile(fileSys, new Path(src, "file1"), 1); |
| writeFile(fileSys, new Path(src, "file2"), 1); |
| assertTrue(fileSys.exists(dst)); |
| assertTrue(fileSys.rename(src, dst)); |
| assertTrue(fileSys.exists(dst)); |
| assertTrue(fileSys.exists(new Path(dst, "dir3"))); |
| assertTrue(fileSys.exists(new Path(dst, "dir3/file1"))); |
| assertTrue(fileSys.exists(new Path(dst, "dir3/file2"))); |
| assertFalse(fileSys.exists(src)); |
| } |
| |
| private void verifyRead(FSDataInputStream stm, byte[] fileContents, |
| int seekOff, int toRead) throws IOException { |
| byte[] out = new byte[toRead]; |
| stm.seek(seekOff); |
| stm.readFully(out); |
| byte[] expected = Arrays.copyOfRange(fileContents, seekOff, seekOff+toRead); |
| if (!Arrays.equals(out, expected)) { |
| String s ="\nExpected: " + |
| StringUtils.byteToHexString(expected) + |
| "\ngot: " + |
| StringUtils.byteToHexString(out) + |
| "\noff=" + seekOff + " len=" + toRead; |
| fail(s); |
| } |
| } |
| |
| @Test |
| public void testStripFragmentFromPath() throws Exception { |
| FileSystem fs = FileSystem.getLocal(new Configuration()); |
| Path pathQualified = TEST_PATH.makeQualified(fs.getUri(), |
| fs.getWorkingDirectory()); |
| Path pathWithFragment = new Path( |
| new URI(pathQualified.toString() + "#glacier")); |
| // Create test file with fragment |
| FileSystemTestHelper.createFile(fs, pathWithFragment); |
| Path resolved = fs.resolvePath(pathWithFragment); |
| assertEquals("resolvePath did not strip fragment from Path", pathQualified, |
| resolved); |
| } |
| |
| @Test |
| public void testAppendSetsPosCorrectly() throws Exception { |
| FileSystem fs = fileSys.getRawFileSystem(); |
| Path file = new Path(TEST_ROOT_DIR, "test-append"); |
| |
| fs.delete(file, true); |
| FSDataOutputStream out = fs.create(file); |
| |
| try { |
| out.write("text1".getBytes()); |
| } finally { |
| out.close(); |
| } |
| |
| // Verify the position |
| out = fs.append(file); |
| try { |
| assertEquals(5, out.getPos()); |
| out.write("text2".getBytes()); |
| } finally { |
| out.close(); |
| } |
| |
| // Verify the content |
| FSDataInputStream in = fs.open(file); |
| try { |
| byte[] buf = new byte[in.available()]; |
| in.readFully(buf); |
| assertEquals("text1text2", new String(buf)); |
| } finally { |
| in.close(); |
| } |
| } |
| |
| @Test |
| public void testFileStatusPipeFile() throws Exception { |
| RawLocalFileSystem origFs = new RawLocalFileSystem(); |
| RawLocalFileSystem fs = spy(origFs); |
| Configuration conf = mock(Configuration.class); |
| fs.setConf(conf); |
| Whitebox.setInternalState(fs, "useDeprecatedFileStatus", false); |
| Path path = new Path("/foo"); |
| File pipe = mock(File.class); |
| when(pipe.isFile()).thenReturn(false); |
| when(pipe.isDirectory()).thenReturn(false); |
| when(pipe.exists()).thenReturn(true); |
| |
| FileStatus stat = mock(FileStatus.class); |
| doReturn(pipe).when(fs).pathToFile(path); |
| doReturn(stat).when(fs).getFileStatus(path); |
| FileStatus[] stats = fs.listStatus(path); |
| assertTrue(stats != null && stats.length == 1 && stats[0] == stat); |
| } |
| |
| @Test |
| public void testFSOutputStreamBuilder() throws Exception { |
| Path path = new Path(TEST_ROOT_DIR, "testBuilder"); |
| |
| try { |
| FSDataOutputStreamBuilder builder = |
| fileSys.createFile(path).recursive(); |
| FSDataOutputStream out = builder.build(); |
| String content = "Create with a generic type of createFile!"; |
| byte[] contentOrigin = content.getBytes("UTF8"); |
| out.write(contentOrigin); |
| out.close(); |
| |
| FSDataInputStream input = fileSys.open(path); |
| byte[] buffer = |
| new byte[(int) (fileSys.getFileStatus(path).getLen())]; |
| input.readFully(0, buffer); |
| input.close(); |
| Assert.assertArrayEquals("The data be read should equals with the " |
| + "data written.", contentOrigin, buffer); |
| } catch (IOException e) { |
| throw e; |
| } |
| |
| // Test value not being set for replication, block size, buffer size |
| // and permission |
| FSDataOutputStreamBuilder builder = |
| fileSys.createFile(path); |
| try (FSDataOutputStream stream = builder.build()) { |
| assertThat(builder.getBlockSize()) |
| .withFailMessage("Should be default block size") |
| .isEqualTo(fileSys.getDefaultBlockSize()); |
| assertThat(builder.getReplication()) |
| .withFailMessage("Should be default replication factor") |
| .isEqualTo(fileSys.getDefaultReplication()); |
| assertThat(builder.getBufferSize()) |
| .withFailMessage("Should be default buffer size") |
| .isEqualTo(fileSys.getConf().getInt(IO_FILE_BUFFER_SIZE_KEY, |
| IO_FILE_BUFFER_SIZE_DEFAULT)); |
| assertThat(builder.getPermission()) |
| .withFailMessage("Should be default permission") |
| .isEqualTo(FsPermission.getFileDefault()); |
| } |
| |
| // Test set 0 to replication, block size and buffer size |
| builder = fileSys.createFile(path); |
| builder.bufferSize(0).blockSize(0).replication((short) 0); |
| assertThat(builder.getBlockSize()) |
| .withFailMessage("Block size should be 0") |
| .isZero(); |
| assertThat(builder.getReplication()) |
| .withFailMessage("Replication factor should be 0") |
| .isZero(); |
| assertThat(builder.getBufferSize()) |
| .withFailMessage("Buffer size should be 0") |
| .isZero(); |
| } |
| |
| /** |
| * A builder to verify configuration keys are supported. |
| */ |
| private static class BuilderWithSupportedKeys |
| extends FSDataOutputStreamBuilder<FSDataOutputStream, |
| BuilderWithSupportedKeys> { |
| |
| private final Set<String> supportedKeys = new HashSet<>(); |
| |
| BuilderWithSupportedKeys(@Nonnull final Collection<String> supportedKeys, |
| @Nonnull FileSystem fileSystem, @Nonnull Path p) { |
| super(fileSystem, p); |
| this.supportedKeys.addAll(supportedKeys); |
| } |
| |
| @Override |
| public BuilderWithSupportedKeys getThisBuilder() { |
| return this; |
| } |
| |
| @Override |
| public FSDataOutputStream build() |
| throws IllegalArgumentException, IOException { |
| Set<String> unsupported = new HashSet<>(getMandatoryKeys()); |
| unsupported.removeAll(supportedKeys); |
| Preconditions.checkArgument(unsupported.isEmpty(), |
| "unsupported key found: " + supportedKeys); |
| return getFS().create( |
| getPath(), getPermission(), getFlags(), getBufferSize(), |
| getReplication(), getBlockSize(), getProgress(), getChecksumOpt()); |
| } |
| } |
| |
| @Test |
| public void testFSOutputStreamBuilderOptions() throws Exception { |
| Path path = new Path(TEST_ROOT_DIR, "testBuilderOpt"); |
| final List<String> supportedKeys = Arrays.asList("strM"); |
| |
| FSDataOutputStreamBuilder<?, ?> builder = |
| new BuilderWithSupportedKeys(supportedKeys, fileSys, path); |
| builder.opt("strKey", "value"); |
| builder.opt("intKey", 123); |
| builder.opt("strM", "ignored"); |
| // Over-write an optional value with a mandatory value. |
| builder.must("strM", "value"); |
| builder.must("unsupported", 12.34); |
| |
| assertEquals("Optional value should be overwrite by a mandatory value", |
| "value", builder.getOptions().get("strM")); |
| |
| Set<String> mandatoryKeys = builder.getMandatoryKeys(); |
| Set<String> expectedKeys = new HashSet<>(); |
| expectedKeys.add("strM"); |
| expectedKeys.add("unsupported"); |
| assertEquals(expectedKeys, mandatoryKeys); |
| assertEquals(2, mandatoryKeys.size()); |
| |
| LambdaTestUtils.intercept(IllegalArgumentException.class, |
| "unsupported key found", builder::build |
| ); |
| } |
| |
| private static final int CRC_SIZE = 12; |
| |
| private static final byte[] DATA = "1234567890".getBytes(); |
| |
| /** |
| * Get the statistics for the file schema. Contains assertions |
| * @return the statistics on all file:// IO. |
| */ |
| protected Statistics getFileStatistics() { |
| final List<Statistics> all = FileSystem.getAllStatistics(); |
| final List<Statistics> fileStats = all |
| .stream() |
| .filter(s -> s.getScheme().equals("file")) |
| .collect(Collectors.toList()); |
| assertEquals("Number of statistics counters for file://", |
| 1, fileStats.size()); |
| // this should be used for local and rawLocal, as they share the |
| // same schema (although their class is different) |
| return fileStats.get(0); |
| } |
| |
| /** |
| * Write the byte array {@link #DATA} to the given output stream. |
| * @param s stream to write to. |
| * @throws IOException failure to write/close the file |
| */ |
| private void writeData(FSDataOutputStream s) throws IOException { |
| s.write(DATA); |
| s.close(); |
| } |
| |
| /** |
| * Evaluate the closure while counting bytes written during |
| * its execution, and verify that the count included the CRC |
| * write as well as the data. |
| * After the operation, the file is deleted. |
| * @param operation operation for assertion method. |
| * @param path path to write |
| * @param callable expression evaluated |
| * @param delete should the file be deleted after? |
| */ |
| private void assertWritesCRC(String operation, Path path, |
| LambdaTestUtils.VoidCallable callable, boolean delete) throws Exception { |
| final Statistics stats = getFileStatistics(); |
| final long bytesOut0 = stats.getBytesWritten(); |
| try { |
| callable.call(); |
| assertEquals("Bytes written in " + operation + "; stats=" + stats, |
| CRC_SIZE + DATA.length, stats.getBytesWritten() - bytesOut0); |
| } finally { |
| if (delete) { |
| // clean up |
| try { |
| fileSys.delete(path, false); |
| } catch (IOException ignored) { |
| // ignore this cleanup failure |
| } |
| } |
| } |
| } |
| |
| /** |
| * Verify that File IO through the classic non-builder APIs generate |
| * statistics which imply that CRCs were read and written. |
| */ |
| @Test |
| public void testCRCwithClassicAPIs() throws Throwable { |
| final Path file = new Path(TEST_ROOT_DIR, "testByteCountersClassicAPIs"); |
| assertWritesCRC("create()", |
| file, |
| () -> writeData(fileSys.create(file, true)), |
| false); |
| |
| final Statistics stats = getFileStatistics(); |
| final long bytesRead0 = stats.getBytesRead(); |
| fileSys.open(file).close(); |
| final long bytesRead1 = stats.getBytesRead(); |
| assertEquals("Bytes read in open() call with stats " + stats, |
| CRC_SIZE, bytesRead1 - bytesRead0); |
| } |
| |
| /** |
| * create/7 to use write the CRC. |
| */ |
| @Test |
| public void testCRCwithCreate7() throws Throwable { |
| final Path file = new Path(TEST_ROOT_DIR, "testCRCwithCreate7"); |
| assertWritesCRC("create/7", |
| file, |
| () -> writeData( |
| fileSys.create(file, |
| FsPermission.getFileDefault(), |
| true, |
| 8192, |
| (short)1, |
| 16384, |
| null)), |
| true); |
| } |
| |
| /** |
| * Create with ChecksumOpt to create checksums. |
| * If the LocalFS ever interpreted the flag, this test may fail. |
| */ |
| @Test |
| public void testCRCwithCreateChecksumOpt() throws Throwable { |
| final Path file = new Path(TEST_ROOT_DIR, "testCRCwithCreateChecksumOpt"); |
| assertWritesCRC("create with checksum opt", |
| file, |
| () -> writeData( |
| fileSys.create(file, |
| FsPermission.getFileDefault(), |
| EnumSet.of(CreateFlag.CREATE), |
| 8192, |
| (short)1, |
| 16384, |
| null, |
| Options.ChecksumOpt.createDisabled())), |
| true); |
| } |
| |
| /** |
| * Create createNonRecursive/6. |
| */ |
| @Test |
| public void testCRCwithCreateNonRecursive6() throws Throwable { |
| fileSys.mkdirs(TEST_PATH); |
| final Path file = new Path(TEST_ROOT_DIR, |
| "testCRCwithCreateNonRecursive6"); |
| assertWritesCRC("create with checksum opt", |
| file, |
| () -> writeData( |
| fileSys.createNonRecursive(file, |
| FsPermission.getFileDefault(), |
| true, |
| 8192, |
| (short)1, |
| 16384, |
| null)), |
| true); |
| } |
| |
| /** |
| * Create createNonRecursive with CreateFlags. |
| */ |
| @Test |
| public void testCRCwithCreateNonRecursiveCreateFlags() throws Throwable { |
| fileSys.mkdirs(TEST_PATH); |
| final Path file = new Path(TEST_ROOT_DIR, |
| "testCRCwithCreateNonRecursiveCreateFlags"); |
| assertWritesCRC("create with checksum opt", |
| file, |
| () -> writeData( |
| fileSys.createNonRecursive(file, |
| FsPermission.getFileDefault(), |
| EnumSet.of(CreateFlag.CREATE), |
| 8192, |
| (short)1, |
| 16384, |
| null)), |
| true); |
| } |
| |
| |
| /** |
| * This relates to MAPREDUCE-7184, where the openFile() call's |
| * CRC count wasn't making into the statistics for the current thread. |
| * If the evaluation was in a separate thread you'd expect that, |
| * but if the completable future is in fact being synchronously completed |
| * it should not happen. |
| */ |
| @Test |
| public void testReadIncludesCRCwithBuilders() throws Throwable { |
| |
| final Path file = new Path(TEST_ROOT_DIR, |
| "testReadIncludesCRCwithBuilders"); |
| Statistics stats = getFileStatistics(); |
| // write the file using the builder API |
| assertWritesCRC("createFile()", |
| file, |
| () -> writeData( |
| fileSys.createFile(file) |
| .overwrite(true).recursive() |
| .build()), |
| false); |
| |
| // now read back the data, again with the builder API |
| final long bytesRead0 = stats.getBytesRead(); |
| fileSys.openFile(file).build().get().close(); |
| assertEquals("Bytes read in openFile() call with stats " + stats, |
| CRC_SIZE, stats.getBytesRead() - bytesRead0); |
| // now write with overwrite = true |
| assertWritesCRC("createFileNonRecursive()", |
| file, |
| () -> { |
| try (FSDataOutputStream s = fileSys.createFile(file) |
| .overwrite(true) |
| .build()) { |
| s.write(DATA); |
| } |
| }, |
| true); |
| } |
| |
| /** |
| * Write with the builder, using the normal recursive create |
| * with create flags containing the overwrite option. |
| */ |
| @Test |
| public void testWriteWithBuildersRecursive() throws Throwable { |
| |
| final Path file = new Path(TEST_ROOT_DIR, |
| "testWriteWithBuildersRecursive"); |
| Statistics stats = getFileStatistics(); |
| // write the file using the builder API |
| assertWritesCRC("createFile()", |
| file, |
| () -> writeData( |
| fileSys.createFile(file) |
| .overwrite(false) |
| .recursive() |
| .build()), |
| true); |
| } |
| } |