blob: 91d19ecad1ec6af6ef2b7d7cdc2c5d4a21d2a894 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.contract;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
import org.junit.Test;
import org.junit.AssumptionViolatedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.FileNotFoundException;
import java.io.IOException;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertCapabilities;
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.getFileStatusEventually;
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeDataset;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsSourceToString;
/**
* Test creating files, overwrite options etc.
*/
public abstract class AbstractContractCreateTest extends
AbstractFSContractTestBase {
/** Logger bound to this contract test class. */
private static final Logger LOG =
LoggerFactory.getLogger(AbstractContractCreateTest.class);
/**
* How long to wait for a path to become visible.
* The value is handed as the timeout argument to
* {@code getFileStatusEventually()} by the tests in this class;
* presumably it is in milliseconds -- TODO confirm against that helper.
*/
public static final int CREATE_TIMEOUT = 15000;
/**
 * Resolve a test path whose name records which creation API is exercised,
 * so that the builder and non-builder runs of one test case operate on
 * different paths.
 *
 * @param filepath base name/path of the test file
 * @param useBuilder true if the file is going to be created through the
 *                   builder API
 * @return the path returned by {@code super.path()}; the name carries a
 *         {@code "-builder"} suffix if and only if {@code useBuilder} is true
 * @throws IOException if the superclass cannot resolve the path
 */
protected Path path(String filepath, boolean useBuilder) throws IOException {
  // The suffix labels the builder variant; the classic create() variant
  // keeps the plain name.
  return super.path(useBuilder ? filepath + "-builder" : filepath);
}
/**
 * Write a 256 byte dataset to a new file without overwrite, then verify the
 * file contents match what was written.
 *
 * @param useBuilder forwarded to {@code writeDataset()} and used when naming
 *                   the test path
 * @throws Throwable on any failure
 */
private void testCreateNewFile(boolean useBuilder) throws Throwable {
  describe("Foundational 'create a file' test, using builder API=" +
      useBuilder);
  final Path target = path("testCreateNewFile", useBuilder);
  final byte[] expected = dataset(256, 'a', 'z');
  final FileSystem fs = getFileSystem();
  writeDataset(fs, target, expected, expected.length, 1024 * 1024, false,
      useBuilder);
  ContractTestUtils.verifyFileContents(fs, target, expected);
}

/**
 * Run the create-new-file check with the builder flag set, then unset.
 *
 * @throws Throwable on any failure
 */
@Test
public void testCreateNewFile() throws Throwable {
  for (boolean useBuilder : new boolean[] {true, false}) {
    testCreateNewFile(useBuilder);
  }
}
/**
 * Create a file, then attempt to create it again with overwrite=false and
 * require the second attempt to be rejected.
 *
 * @param useBuilder forwarded to the second {@code writeDataset()} call and
 *                   used when naming the test path
 * @throws Throwable on any failure
 */
private void testCreateFileOverExistingFileNoOverwrite(boolean useBuilder)
    throws Throwable {
  describe("Verify overwriting an existing file fails, using builder API=" +
      useBuilder);
  final Path target =
      path("testCreateFileOverExistingFileNoOverwrite", useBuilder);

  // Seed the path; this first write does not pass the builder flag.
  final byte[] seed = dataset(256, 'a', 'z');
  writeDataset(getFileSystem(), target, seed, seed.length, 1024, false);

  // A second write of different data without overwrite must not succeed.
  final byte[] replacement = dataset(10 * 1024, 'A', 'Z');
  try {
    writeDataset(getFileSystem(), target, replacement, replacement.length,
        1024, false, useBuilder);
    fail("writing without overwrite unexpectedly succeeded");
  } catch (FileAlreadyExistsException alreadyExists) {
    // the exception the test is looking for
    handleExpectedException(alreadyExists);
  } catch (IOException other) {
    // any other IOException goes through the relaxed-exception handler
    handleRelaxedException(
        "Creating a file over a file with overwrite==false",
        "FileAlreadyExistsException",
        other);
  }
}

/**
 * Run the no-overwrite check with the builder flag unset, then set.
 *
 * @throws Throwable on any failure
 */
@Test
public void testCreateFileOverExistingFileNoOverwrite() throws Throwable {
  for (boolean useBuilder : new boolean[] {false, true}) {
    testCreateFileOverExistingFileNoOverwrite(useBuilder);
  }
}
/**
 * Create a file and verify it, then overwrite it with a longer, different
 * dataset and verify that the new contents are what is read back.
 *
 * @param useBuilder forwarded to {@code writeDataset()} and used when naming
 *                   the test path
 * @throws Throwable on any failure
 */
private void testOverwriteExistingFile(boolean useBuilder) throws Throwable {
  describe("Overwrite an existing file and verify the new data is there, " +
      "use builder API=" + useBuilder);
  final Path target = path("testOverwriteExistingFile", useBuilder);

  // first generation: 256 bytes, written without overwrite
  final byte[] firstGeneration = dataset(256, 'a', 'z');
  final FileSystem fs = getFileSystem();
  writeDataset(fs, target, firstGeneration, firstGeneration.length, 1024,
      false, useBuilder);
  ContractTestUtils.verifyFileContents(fs, target, firstGeneration);

  // second generation: 10 KB of different characters, overwrite enabled
  final byte[] secondGeneration = dataset(10 * 1024, 'A', 'Z');
  writeDataset(fs, target, secondGeneration, secondGeneration.length, 1024,
      true, useBuilder);
  ContractTestUtils.verifyFileContents(fs, target, secondGeneration);
}

/**
 * Overwrite test, run with the builder flag unset and then set.
 * This test catches some eventual consistency problems that blobstores
 * exhibit, as it implicitly verifies that updates are consistent; that is
 * why the two datasets differ in both length and content.
 *
 * @throws Throwable on any failure
 */
@Test
public void testOverwriteExistingFile() throws Throwable {
  for (boolean useBuilder : new boolean[] {false, true}) {
    testOverwriteExistingFile(useBuilder);
  }
}
/**
 * Create an empty directory and attempt to write a file over it with
 * overwrite=true; the directory must still be a directory afterwards.
 * Both invocations share the same directory path.
 *
 * @param useBuilder forwarded to {@code writeDataset()}
 * @throws Throwable on any failure
 */
private void testOverwriteEmptyDirectory(boolean useBuilder)
    throws Throwable {
  describe("verify trying to create a file over an empty dir fails, " +
      "use builder API=" + useBuilder);
  final Path dir = path("testOverwriteEmptyDirectory");
  mkdirs(dir);
  assertIsDirectory(dir);

  final byte[] payload = dataset(256, 'a', 'z');
  try {
    writeDataset(getFileSystem(), dir, payload, payload.length, 1024, true,
        useBuilder);
    // the write returned: check the directory first, then fail regardless
    assertIsDirectory(dir);
    fail("write of file over empty dir succeeded");
  } catch (FileAlreadyExistsException alreadyExists) {
    // the exception the test is looking for
    handleExpectedException(alreadyExists);
  } catch (IOException other) {
    handleRelaxedException(
        "overwriting a dir with a file ",
        "FileAlreadyExistsException",
        other);
  }
  // whichever exception path was taken, the directory must be intact
  assertIsDirectory(dir);
}

/**
 * Run the empty-directory overwrite check with the builder flag unset,
 * then set.
 *
 * @throws Throwable on any failure
 */
@Test
public void testOverwriteEmptyDirectory() throws Throwable {
  for (boolean useBuilder : new boolean[] {false, true}) {
    testOverwriteEmptyDirectory(useBuilder);
  }
}
/**
 * Create a directory containing one child file and attempt to write a file
 * over the directory with overwrite=true. Afterwards the directory and its
 * child must both still be present.
 * Both invocations share the same directory path.
 *
 * @param useBuilder forwarded to {@code writeDataset()}
 * @throws Throwable on any failure
 */
private void testOverwriteNonEmptyDirectory(boolean useBuilder)
    throws Throwable {
  describe("verify trying to create a file over a non-empty dir fails, " +
      "use builder API=" + useBuilder);
  final Path dir = path("testOverwriteNonEmptyDirectory");
  mkdirs(dir);
  try {
    assertIsDirectory(dir);
  } catch (AssertionError failure) {
    if (!isSupported(CREATE_OVERWRITES_DIRECTORY)) {
      // not a store which declares this behavior: a genuine failure
      throw failure;
    }
    // file/directory hack surfaces here: report as an unmet assumption
    throw new AssumptionViolatedException(failure.toString(), failure);
  }

  final Path childFile = new Path(dir, "child");
  writeTextFile(getFileSystem(), childFile, "child file", true);

  final byte[] payload = dataset(256, 'a', 'z');
  try {
    writeDataset(getFileSystem(), dir, payload, payload.length, 1024,
        true, useBuilder);
    // The write returned. If the path is now a file and the store declares
    // that create() may overwrite directories, downgrade to a skip so that
    // the outcome is still visible in test results.
    if (!getFileSystem().getFileStatus(dir).isDirectory()
        && isSupported(CREATE_OVERWRITES_DIRECTORY)) {
      skip("This Filesystem allows a file to overwrite a directory");
    }
    fail("write of file over dir succeeded");
  } catch (FileAlreadyExistsException alreadyExists) {
    // the exception the test is looking for
    handleExpectedException(alreadyExists);
  } catch (IOException other) {
    handleRelaxedException(
        "overwriting a dir with a file ",
        "FileAlreadyExistsException",
        other);
  }
  assertIsDirectory(dir);
  assertIsFile(childFile);
}

/**
 * Run the non-empty-directory overwrite check with the builder flag unset,
 * then set.
 *
 * @throws Throwable on any failure
 */
@Test
public void testOverwriteNonEmptyDirectory() throws Throwable {
  for (boolean useBuilder : new boolean[] {false, true}) {
    testOverwriteNonEmptyDirectory(useBuilder);
  }
}
/**
 * Open an output stream on a new path and probe for the path's existence
 * while the stream is still open and nothing has been written.
 * If the path is missing and the store declares delayed visibility, the
 * test is skipped; otherwise the missing path is an assertion failure.
 *
 * @throws Throwable on any failure
 */
@Test
public void testCreatedFileIsImmediatelyVisible() throws Throwable {
  describe("verify that a newly created file exists as soon as open returns");
  final Path target = path("testCreatedFileIsImmediatelyVisible");
  final FileSystem fs = getFileSystem();
  try (FSDataOutputStream out =
           fs.create(target, false, 4096, (short) 1, 1024)) {
    final boolean visible = fs.exists(target);
    if (!visible) {
      if (isSupported(CREATE_VISIBILITY_DELAYED)) {
        // For some file systems, downgrade to a skip so that the failure is
        // visible in test results.
        skip("This Filesystem delays visibility of newly created files");
      }
      assertPathExists("expected path to be visible before anything written",
          target);
    }
  }
}
/**
 * Write a single byte to a new file, flush the stream and probe for the
 * path's existence while the stream is still open.
 * If the path is missing and the store is declared to be a blobstore or to
 * have delayed visibility, the test is skipped; otherwise the missing path
 * is an assertion failure.
 *
 * @throws Throwable on any failure
 */
@Test
public void testCreatedFileIsVisibleOnFlush() throws Throwable {
  describe("verify that a newly created file exists once a flush has taken "
      + "place");
  final Path target = path("testCreatedFileIsVisibleOnFlush");
  final FileSystem fs = getFileSystem();
  try (FSDataOutputStream stream =
           fs.create(target, false, 4096, (short) 1, 1024)) {
    stream.write('a');
    stream.flush();
    final boolean visible = fs.exists(target);
    if (!visible) {
      final boolean delayedVisibilityDeclared = isSupported(IS_BLOBSTORE)
          || isSupported(CREATE_VISIBILITY_DELAYED);
      if (delayedVisibilityDeclared) {
        // object store or some file systems: downgrade to a skip so that the
        // failure is visible in test results
        skip("For object store or some file systems, newly created files are"
            + " not immediately visible");
      }
      assertPathExists("expected path to be visible before file closed",
          target);
    }
  }
}
/**
 * Write one byte to a new file, close the stream and then wait for the
 * file's status to become retrievable within {@link #CREATE_TIMEOUT}.
 *
 * @throws Throwable on any failure
 */
@Test
public void testCreatedFileIsEventuallyVisible() throws Throwable {
  describe("verify a written to file is visible after the stream is closed");
  final Path target = path("testCreatedFileIsEventuallyVisible");
  final FileSystem fs = getFileSystem();
  try (FSDataOutputStream stream =
           fs.create(target, false, 4096, (short) 1, 1024)) {
    stream.write(0x01);
    // Explicit close: the status probe below must run against a closed
    // stream. try-with-resources then closes the stream a second time.
    stream.close();
    getFileStatusEventually(fs, target, CREATE_TIMEOUT);
  }
}
/**
 * Check that the default block size reported for the "/" test path is
 * positive, then write a 256 byte file and require its block size values
 * to be at least 1.
 *
 * @throws Throwable on any failure
 */
@Test
public void testFileStatusBlocksizeNonEmptyFile() throws Throwable {
  describe("validate the block size of a filesystem and files within it");
  final FileSystem fs = getFileSystem();

  final long rootBlockSize = fs.getDefaultBlockSize(path("/"));
  assertTrue("Root block size is invalid " + rootBlockSize,
      rootBlockSize > 0);

  final Path target = path("testFileStatusBlocksizeNonEmptyFile");
  final byte[] payload = dataset(256, 'a', 'z');
  writeDataset(fs, target, payload, payload.length, 1024 * 1024, false);
  validateBlockSize(fs, target, 1);
}
/**
 * Touch an empty file and validate its block size values with a minimum
 * of 0, i.e. a zero block size is acceptable for an empty file.
 *
 * @throws Throwable on any failure
 */
@Test
public void testFileStatusBlocksizeEmptyFile() throws Throwable {
  describe("check that an empty file may return a 0-byte blocksize");
  final FileSystem fs = getFileSystem();
  final Path emptyFile = path("testFileStatusBlocksizeEmptyFile");
  touch(fs, emptyFile);
  validateBlockSize(fs, emptyFile, 0);
}
/**
 * Assert that both the block size in the file's status and the
 * filesystem's default block size for the path are at least
 * {@code minValue}. The status is fetched with
 * {@code getFileStatusEventually()} using {@link #CREATE_TIMEOUT}.
 *
 * @param fs filesystem to query
 * @param path file to examine
 * @param minValue smallest acceptable block size
 * @throws IOException failure to retrieve the status
 * @throws InterruptedException declared for the status retrieval call
 */
private void validateBlockSize(FileSystem fs, Path path, int minValue)
    throws IOException, InterruptedException {
  final FileStatus status = getFileStatusEventually(fs, path, CREATE_TIMEOUT);

  // check 1: the block size recorded in the file status
  final String statusText = status.toString();
  final String statusMessage = "File status block size too low: "
      + statusText + " min value: " + minValue;
  assertTrue(statusMessage, status.getBlockSize() >= minValue);

  // check 2: the default block size the filesystem reports for this path
  final long fsDefault = fs.getDefaultBlockSize(path);
  final String defaultMessage = "fs.getDefaultBlockSize(" + path + ") size "
      + fsDefault + " is below the minimum of " + minValue;
  assertTrue(defaultMessage, fsDefault >= minValue);
}
/**
 * Touch a file two levels below a new directory and verify that the
 * intermediate directories exist: each is a directory and lists exactly
 * one entry.
 *
 * @throws Throwable on any failure
 */
@Test
public void testCreateMakesParentDirs() throws Throwable {
  describe("check that after creating a file its parent directories exist");
  final FileSystem fs = getFileSystem();
  final Path top = path("testCreateCreatesAndPopulatesParents");
  final Path middle = new Path(top, "parent");
  final Path leaf = new Path(middle, "child");
  touch(fs, leaf);

  // the directory directly above the file
  final int middleEntries = fs.listStatus(middle).length;
  assertEquals("List status of parent should include the 1 child file",
      1, middleEntries);
  final boolean middleIsDir = fs.getFileStatus(middle).isDirectory();
  assertTrue("Parent directory does not appear to be a directory",
      middleIsDir);

  // the directory one level further up
  final int topEntries = fs.listStatus(top).length;
  assertEquals("List status of grandparent should include the 1 parent dir",
      1, topEntries);
  final boolean topIsDir = fs.getFileStatus(top).isDirectory();
  assertTrue("Grandparent directory does not appear to be a directory",
      topIsDir);
}
/**
 * Create a file at the method path and then attempt to create a file
 * directly beneath it. Skipped when the store declares that files may be
 * created under files.
 *
 * @throws Throwable on any failure
 */
@Test
public void testCreateFileUnderFile() throws Throwable {
  describe("Verify that it is forbidden to create file/file");
  if (isSupported(CREATE_FILE_UNDER_FILE_ALLOWED)) {
    // object store or some file systems: downgrade to a skip so that the
    // failure is visible in test results
    skip("This filesystem supports creating files under files");
  }
  final Path file = methodPath();
  final Path underFile = new Path(file, "parent");
  expectCreateUnderFileFails("creating a file under a file", file, underFile);
}
/**
 * Create a file at the method path and then attempt to create a file two
 * levels beneath it. Skipped when the store declares that files may be
 * created under files.
 *
 * @throws Throwable on any failure
 */
@Test
public void testCreateUnderFileSubdir() throws Throwable {
  describe("Verify that it is forbidden to create file/dir/file");
  if (isSupported(CREATE_FILE_UNDER_FILE_ALLOWED)) {
    // object store or some file systems: downgrade to a skip so that the
    // failure is visible in test results
    skip("This filesystem supports creating files under files");
  }
  final Path file = methodPath();
  final Path nested = new Path(new Path(file, "parent"), "child");
  expectCreateUnderFileFails(
      "creating a file under a subdirectory of a file", file, nested);
}
/**
 * Create a file at the method path and then attempt to create a directory
 * directly beneath it.
 *
 * @throws Throwable on any failure
 */
@Test
public void testMkdirUnderFile() throws Throwable {
  describe("Verify that it is forbidden to create file/dir");
  final Path file = methodPath();
  final Path underFile = new Path(file, "parent");
  expectMkdirsUnderFileFails("mkdirs() under a file", file, underFile);
}
/**
 * Create a file at the method path and then attempt to create a directory
 * two levels beneath it, first through
 * {@code expectMkdirsUnderFileFails()} and then once more directly.
 *
 * @throws Throwable on any failure
 */
@Test
public void testMkdirUnderFileSubdir() throws Throwable {
  describe("Verify that it is forbidden to create file/dir/dir");
  final Path file = methodPath();
  final Path nested = new Path(new Path(file, "parent"), "child");
  expectMkdirsUnderFileFails("mkdirs() file/dir", file, nested);

  // Repeat the mkdirs() directly against the same path.
  try {
    mkdirs(nested);
  } catch (FileAlreadyExistsException | ParentNotDirectoryException ex) {
    // either of these may be raised.
    handleExpectedException(ex);
  } catch (IOException other) {
    handleRelaxedException(
        "creating a file under a subdirectory of a file ",
        "FileAlreadyExistsException",
        other);
  }
}
/**
 * Create a file, then attempt to create a second file at a path beneath
 * it, handling the exceptions that attempt is expected to raise.
 * <p>
 * NOTE(review): if the second create returns normally, nothing here fails
 * the test; confirm whether that leniency is intended before tightening.
 *
 * @param action description of the operation, used when reporting a
 *               relaxed exception
 * @param file path of the file to create first
 * @param descendant path under {@code file} whose creation is attempted
 * @throws Exception failure
 */
protected void expectCreateUnderFileFails(String action,
    Path file, Path descendant)
    throws Exception {
  createFile(file);
  try {
    // the attempt under test
    createFile(descendant);
  } catch (FileAlreadyExistsException | ParentNotDirectoryException ex) {
    // either exception is an accepted outcome
    handleExpectedException(ex);
  } catch (IOException other) {
    handleRelaxedException(action, "ParentNotDirectoryException", other);
  }
}
/**
 * Create a file, then attempt to create a directory at a path beneath it,
 * handling the exceptions that attempt is expected to raise.
 * <p>
 * NOTE(review): there is no explicit {@code fail()} if {@code mkdirs()}
 * returns normally; whether {@code mkdirs()} itself asserts on its result
 * is not visible here -- confirm in the base class.
 *
 * @param action description of the operation, used when reporting a
 *               relaxed exception
 * @param file path of the file to create first
 * @param descendant path under {@code file} passed to {@code mkdirs()}
 * @throws Exception failure
 */
protected void expectMkdirsUnderFileFails(String action,
    Path file, Path descendant)
    throws Exception {
  createFile(file);
  try {
    // the attempt under test
    mkdirs(descendant);
  } catch (FileAlreadyExistsException | ParentNotDirectoryException ex) {
    // either exception is an accepted outcome
    handleExpectedException(ex);
  } catch (IOException other) {
    handleRelaxedException(action, "ParentNotDirectoryException", other);
  }
}
/**
 * Write a 256 byte dataset to the given path with overwrite enabled.
 *
 * @param path file to write
 * @throws IOException failure of the write
 */
private void createFile(Path path) throws IOException {
  final byte[] payload = dataset(256, 'a', 'z');
  writeDataset(getFileSystem(), path, payload, payload.length, 1024 * 1024,
      true);
}
/**
 * Look up the contract options for hflush, hsync and metadata update on
 * hsync, and validate the output stream's behavior against them.
 *
 * @throws Throwable on any failure
 */
@Test
public void testSyncable() throws Throwable {
  describe("test declared and actual Syncable behaviors");
  final FileSystem fs = getFileSystem();
  final boolean hflushDeclared = isSupported(SUPPORTS_HFLUSH);
  final boolean hsyncDeclared = isSupported(SUPPORTS_HSYNC);
  final boolean metadataOnHsyncDeclared =
      isSupported(METADATA_UPDATED_ON_HSYNC);

  // note the parameter order: sync comes before flush
  validateSyncableSemantics(fs, hsyncDeclared, hflushDeclared,
      metadataOnHsyncDeclared);
}
/**
* Validate the semantics of syncable.
* Creates a file at the method path, checks the stream's declared
* hflush/hsync capabilities against the expected values, writes one byte
* followed by hflush() and a second byte followed by hsync(), then
* inspects what a new reader of the path can see.
* @param fs filesystem
* @param supportsSync sync is present
* @param supportsFlush flush is present.
* @param metadataUpdatedOnHSync Is the metadata updated after an hsync?
* @throws IOException failure
*/
protected void validateSyncableSemantics(final FileSystem fs,
final boolean supportsSync,
final boolean supportsFlush,
final boolean metadataUpdatedOnHSync)
throws IOException {
Path path = methodPath();
LOG.info("Expecting files under {} to have supportsSync={}"
+ " and supportsFlush={}; metadataUpdatedOnHSync={}",
path, supportsSync, supportsFlush, metadataUpdatedOnHSync);
try (FSDataOutputStream out = fs.create(path, true)) {
LOG.info("Created output stream {}", out);
// probe stream for support for flush/sync, whose capabilities
// of supports/does not support must match what is expected
String[] hflushCapabilities = {
StreamCapabilities.HFLUSH
};
String[] hsyncCapabilities = {
StreamCapabilities.HSYNC
};
// presumably the second argument of assertCapabilities() lists
// capabilities which must be present and the third those which must
// be absent -- confirm in ContractTestUtils
if (supportsFlush) {
assertCapabilities(out, hflushCapabilities, null);
} else {
assertCapabilities(out, null, hflushCapabilities);
}
if (supportsSync) {
assertCapabilities(out, hsyncCapabilities, null);
} else {
assertCapabilities(out, null, hsyncCapabilities);
}
// write one byte, then hflush it
out.write('a');
try {
out.hflush();
if (!supportsFlush) {
// FSDataOutputStream silently downgrades to flush() here.
// This is not good, but if changed some applications
// break writing to some stores.
LOG.warn("FS doesn't support Syncable.hflush(),"
+ " but doesn't reject it either.");
}
} catch (UnsupportedOperationException e) {
// rejection is only a failure when hflush was declared as supported;
// otherwise the exception is deliberately ignored
if (supportsFlush) {
throw new AssertionError("hflush not supported", e);
}
}
// write a second byte, then hsync it.
out.write('b');
try {
out.hsync();
} catch (UnsupportedOperationException e) {
// rejection is only a failure when hsync was declared as supported;
// otherwise the exception is deliberately ignored
if (supportsSync) {
throw new AssertionError("HSync not supported", e);
}
}
if (supportsSync) {
// if sync really worked, data MUST be visible here
// first the metadata which MUST be present
final FileStatus st = fs.getFileStatus(path);
if (metadataUpdatedOnHSync) {
// not all stores reliably update it, HDFS/webHDFS in particular
assertEquals("Metadata not updated during write " + st,
2, st.getLen());
}
// there's no way to verify durability, but we can
// at least verify a new file input stream reads
// the data
try (FSDataInputStream in = fs.open(path)) {
// exactly the two bytes written above, then end of stream
assertEquals('a', in.read());
assertEquals('b', in.read());
assertEquals(-1, in.read());
LOG.info("Successfully read synced data on a new reader {}", in);
}
} else {
// no sync. Let's do a flush and see what happens.
out.flush();
// Now look at the filesystem.
try (FSDataInputStream in = fs.open(path)) {
// both outcomes of this read are only logged; neither fails the test
int c = in.read();
if (c == -1) {
// nothing was synced; sync and flush really aren't there.
LOG.info("sync and flush are declared unsupported"
+ " -flushed changes were not saved");
} else {
LOG.info("sync and flush are declared unsupported"
+ " - but the stream does offer some sync/flush semantics");
}
// close outside a finally as we do want to see any exception raised.
in.close();
} catch (FileNotFoundException e) {
// that's OK if it's an object store or some file systems that newly created files
// are not immediately visible, but not if its a real FS
if (!isSupported(IS_BLOBSTORE) && !isSupported(CREATE_VISIBILITY_DELAYED)) {
throw e;
} else {
LOG.warn(
"Output file was not created; this is an object store or "
+ "a file system with different visibility semantics");
}
}
}
// close the output stream
// (explicitly, so that the statistics below are collected after the
// close; try-with-resources will close the stream again on exit)
out.close();
final String stats = ioStatisticsSourceToString(out);
if (!stats.isEmpty()) {
LOG.info("IOStatistics {}", stats);
}
}
}
}