blob: 5cd06c2c0efa2e770ca8d4f4704d6d013970fd42 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import com.amazonaws.services.s3.model.ObjectListing;
import com.amazonaws.services.s3.AmazonS3;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.s3a.S3AContract;
import org.junit.Assume;
import org.junit.Test;
import java.io.FileNotFoundException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.InconsistentAmazonS3Client.*;
/**
* Test S3Guard list consistency feature by injecting delayed listObjects()
* visibility via {@link InconsistentAmazonS3Client}.
*
* Tests here generally:
* 1. Use the inconsistency injection mentioned above.
* 2. Only run when S3Guard is enabled.
*/
public class ITestS3GuardListConsistency extends AbstractS3ATestBase {

  /**
   * Configures the filesystem under test to use
   * {@link InconsistentS3ClientFactory}. Every key containing
   * {@code DEFAULT_DELAY_KEY_SUBSTRING} gets delayed listing visibility,
   * with probability 1.0, for {@code DEFAULT_DELAY_KEY_MSEC} milliseconds.
   * @param conf configuration to update
   * @return the S3A contract bound to the updated configuration
   */
  @Override
  protected AbstractFSContract createContract(Configuration conf) {
    conf.setClass(S3_CLIENT_FACTORY_IMPL, InconsistentS3ClientFactory.class,
        S3ClientFactory.class);
    // Other configs would break test assumptions
    conf.set(FAIL_INJECT_INCONSISTENCY_KEY, DEFAULT_DELAY_KEY_SUBSTRING);
    conf.setFloat(FAIL_INJECT_INCONSISTENCY_PROBABILITY, 1.0f);
    conf.setLong(FAIL_INJECT_INCONSISTENCY_MSEC, DEFAULT_DELAY_KEY_MSEC);
    return new S3AContract(conf);
  }

  /**
   * Helper function for other test cases: performs a sequence of renames and
   * validates the aftermath. The test is skipped (via a JUnit assumption)
   * when no metadata store is configured.
   *
   * @param mkdirs directories to create before renaming, or {@code null} to
   *     skip creation. When directories are created, the injected
   *     inconsistency is cleared afterwards.
   * @param srcdirs source paths for the renames. {@code srcdirs[i]} is
   *     renamed to {@code dstdirs[i]}, in array order.
   * @param dstdirs destination paths for the renames; must have the same
   *     length as {@code srcdirs}
   * @param yesdirs paths that must exist post-rename (e.g. dstdirs and
   *     their children)
   * @param nodirs paths that must not exist post-rename (e.g. srcdirs and
   *     their children)
   * @throws Exception on any failure
   */
  private void doTestRenameSequence(Path[] mkdirs, Path[] srcdirs,
      Path[] dstdirs, Path[] yesdirs, Path[] nodirs) throws Exception {
    S3AFileSystem fs = getFileSystem();
    Assume.assumeTrue(fs.hasMetadataStore());
    if (mkdirs != null) {
      for (Path mkdir : mkdirs) {
        assertTrue("mkdirs failed: " + mkdir, fs.mkdirs(mkdir));
      }
      // Only the renames below should be exercised with fresh inconsistency.
      clearInconsistency(fs);
    }
    assertEquals("srcdirs and dstdirs must have equal length",
        srcdirs.length, dstdirs.length);
    for (int i = 0; i < srcdirs.length; i++) {
      assertTrue("Rename returned false: " + srcdirs[i] + " -> " + dstdirs[i],
          fs.rename(srcdirs[i], dstdirs[i]));
    }
    for (Path yesdir : yesdirs) {
      assertTrue("Path was supposed to exist: " + yesdir, fs.exists(yesdir));
    }
    for (Path nodir : nodirs) {
      assertFalse("Path is not supposed to exist: " + nodir, fs.exists(nodir));
    }
  }

  /**
   * Collects the paths of every entry returned by
   * {@link S3AFileSystem#listStatus(Path)} for a directory.
   * @param fs filesystem to list
   * @param dir directory to list
   * @return the listed paths, in the order listStatus() returned them
   * @throws Exception on any failure
   */
  private static List<Path> listChildPaths(S3AFileSystem fs, Path dir)
      throws Exception {
    List<Path> children = new ArrayList<>();
    for (FileStatus status : fs.listStatus(dir)) {
      children.add(status.getPath());
    }
    return children;
  }

  /**
   * Tests that after renaming a directory, the original directory and its
   * contents are indeed missing and the corresponding new paths are visible.
   * @throws Exception on any failure
   */
  @Test
  public void testConsistentListAfterRename() throws Exception {
    Path[] mkdirs = {
        path("d1/f"),
        path("d1/f" + DEFAULT_DELAY_KEY_SUBSTRING)
    };
    Path[] srcdirs = {path("d1")};
    Path[] dstdirs = {path("d2")};
    Path[] yesdirs = {path("d2"), path("d2/f"),
        path("d2/f" + DEFAULT_DELAY_KEY_SUBSTRING)};
    Path[] nodirs = {path("d1"), path("d1/f"),
        path("d1/f" + DEFAULT_DELAY_KEY_SUBSTRING)};
    try {
      doTestRenameSequence(mkdirs, srcdirs, dstdirs, yesdirs, nodirs);
    } finally {
      // Clean up even when an assertion above fails.
      getFileSystem().delete(path("d1"), true);
      getFileSystem().delete(path("d2"), true);
    }
  }

  /**
   * Tests a circular sequence of renames to verify that overwriting recently
   * deleted files and reading recently created files from rename operations
   * works as expected.
   * @throws Exception on any failure
   */
  @Test
  public void testRollingRenames() throws Exception {
    Path[] dir0 = {path("rolling/1")};
    Path[] dir1 = {path("rolling/2")};
    Path[] dir2 = {path("rolling/3")};
    // These sets have to be in reverse order compared to the movement
    Path[] setA = {dir1[0], dir0[0]};
    Path[] setB = {dir2[0], dir1[0]};
    Path[] setC = {dir0[0], dir2[0]};
    for (int i = 0; i < 2; i++) {
      // Directories are only created on the first pass.
      Path[] firstSet = i == 0 ? setA : null;
      doTestRenameSequence(firstSet, setA, setB, setB, dir0);
      doTestRenameSequence(null, setB, setC, setC, dir1);
      doTestRenameSequence(null, setC, setA, setA, dir2);
    }
    S3AFileSystem fs = getFileSystem();
    assertFalse("Renaming deleted file should have failed",
        fs.rename(dir2[0], dir1[0]));
    assertTrue("Renaming over existing file should have succeeded",
        fs.rename(dir1[0], dir0[0]));
  }

  /**
   * Tests that deleted files immediately stop manifesting in list operations
   * even when the effect in S3 is delayed.
   * @throws Exception on any failure
   */
  @Test
  public void testConsistentListAfterDelete() throws Exception {
    S3AFileSystem fs = getFileSystem();
    // test will fail if NullMetadataStore (the default) is configured: skip it.
    Assume.assumeTrue(fs.hasMetadataStore());
    // Any S3 keys that contain DELAY_KEY_SUBSTRING will be delayed
    // in listObjects() results via InconsistentS3Client
    Path inconsistentPath =
        path("a/b/dir3-" + DEFAULT_DELAY_KEY_SUBSTRING);
    Path[] testDirs = {path("a/b/dir1"),
        path("a/b/dir2"),
        inconsistentPath};
    for (Path path : testDirs) {
      assertTrue(fs.mkdirs(path));
    }
    clearInconsistency(fs);
    for (Path path : testDirs) {
      assertTrue(fs.delete(path, false));
    }
    List<Path> list = listChildPaths(fs, path("a/b/"));
    assertFalse(list.contains(path("a/b/dir1")));
    assertFalse(list.contains(path("a/b/dir2")));
    // This should fail without S3Guard, and succeed with it.
    assertFalse(list.contains(inconsistentPath));
  }

  /**
   * Tests that a rename immediately after files in the source directory are
   * deleted results in exactly the correct set of destination files and none
   * of the source files.
   * @throws Exception on any failure
   */
  @Test
  public void testConsistentRenameAfterDelete() throws Exception {
    S3AFileSystem fs = getFileSystem();
    // test will fail if NullMetadataStore (the default) is configured: skip it.
    Assume.assumeTrue(fs.hasMetadataStore());
    // Any S3 keys that contain DELAY_KEY_SUBSTRING will be delayed
    // in listObjects() results via InconsistentS3Client
    Path inconsistentPath =
        path("a/b/dir3-" + DEFAULT_DELAY_KEY_SUBSTRING);
    Path[] testDirs = {path("a/b/dir1"),
        path("a/b/dir2"),
        inconsistentPath};
    for (Path path : testDirs) {
      assertTrue(fs.mkdirs(path));
    }
    clearInconsistency(fs);
    assertTrue(fs.delete(testDirs[1], false));
    assertTrue(fs.delete(testDirs[2], false));
    Path srcRoot = path("a");
    Path dstRoot = path("a3");
    // Fail fast here rather than with a confusing listing failure below.
    assertTrue("Rename returned false: " + srcRoot + " -> " + dstRoot,
        fs.rename(srcRoot, dstRoot));
    List<Path> list = listChildPaths(fs, path("a3/b"));
    assertTrue(list.contains(path("a3/b/dir1")));
    assertFalse(list.contains(path("a3/b/dir2")));
    // This should fail without S3Guard, and succeed with it.
    assertFalse(list.contains(path("a3/b/dir3-" +
        DEFAULT_DELAY_KEY_SUBSTRING)));
    try {
      fs.listFilesAndEmptyDirectories(srcRoot, true);
      fail("Recently renamed dir should not be visible");
    } catch (FileNotFoundException e) {
      // expected
    }
  }

  /**
   * Tests that directories created with mkdirs() are all returned by
   * listStatus() on their parent, including one whose key is subject to
   * delayed listing visibility.
   * @throws Exception on any failure
   */
  @Test
  public void testConsistentListStatusAfterPut() throws Exception {
    S3AFileSystem fs = getFileSystem();
    // This test will fail if NullMetadataStore (the default) is configured:
    // skip it.
    Assume.assumeTrue(fs.hasMetadataStore());
    // Any S3 keys that contain DELAY_KEY_SUBSTRING will be delayed
    // in listObjects() results via InconsistentS3Client
    Path inconsistentPath =
        path("a/b/dir3-" + DEFAULT_DELAY_KEY_SUBSTRING);
    Path[] testDirs = {path("a/b/dir1"),
        path("a/b/dir2"),
        inconsistentPath};
    for (Path path : testDirs) {
      assertTrue(fs.mkdirs(path));
    }
    List<Path> list = listChildPaths(fs, path("a/b/"));
    assertTrue(list.contains(path("a/b/dir1")));
    assertTrue(list.contains(path("a/b/dir2")));
    // This should fail without S3Guard, and succeed with it.
    assertTrue(list.contains(inconsistentPath));
  }

  /**
   * Similar to {@link #testConsistentListStatusAfterPut()}, this tests that the
   * FS listLocatedStatus() call will return a consistent list.
   * @throws Exception on any failure
   */
  @Test
  public void testConsistentListLocatedStatusAfterPut() throws Exception {
    final S3AFileSystem fs = getFileSystem();
    // This test will fail if NullMetadataStore (the default) is configured:
    // skip it.
    Assume.assumeTrue(fs.hasMetadataStore());
    String rootDir = "doTestConsistentListLocatedStatusAfterPut";
    assertTrue(fs.mkdirs(path(rootDir)));
    final int[] numOfPaths = {0, 1, 5};
    for (int normalPathNum : numOfPaths) {
      for (int delayedPathNum : new int[] {0, 2}) {
        LOG.info("Testing with normalPathNum={}, delayedPathNum={}",
            normalPathNum, delayedPathNum);
        doTestConsistentListLocatedStatusAfterPut(fs, rootDir, normalPathNum,
            delayedPathNum);
      }
    }
  }

  /**
   * Helper method to implement the tests of consistent listLocatedStatus().
   * @param fs The S3 file system from contract
   * @param rootDir test-relative name of the directory to create paths under
   * @param normalPathNum number of paths listed directly from S3 without delay
   * @param delayedPathNum number of paths listed with delay
   * @throws Exception on any failure
   */
  private void doTestConsistentListLocatedStatusAfterPut(S3AFileSystem fs,
      String rootDir, int normalPathNum, int delayedPathNum) throws Exception {
    final List<Path> testDirs = new ArrayList<>(normalPathNum + delayedPathNum);
    int index = 0;
    for (; index < normalPathNum; index++) {
      testDirs.add(path(rootDir + "/dir-" + index));
    }
    for (; index < normalPathNum + delayedPathNum; index++) {
      // Any S3 keys that contain DELAY_KEY_SUBSTRING will be delayed
      // in listObjects() results via InconsistentS3Client
      testDirs.add(path(rootDir + "/dir-" + index +
          DEFAULT_DELAY_KEY_SUBSTRING));
    }
    for (Path path : testDirs) {
      // delete the old test path (if any) so that when we call mkdirs() later,
      // the directories to delay will be tracked via putObject() request.
      fs.delete(path, true);
      assertTrue(fs.mkdirs(path));
    }
    // this should return the union data from S3 and MetadataStore
    final RemoteIterator<LocatedFileStatus> statusIterator =
        fs.listLocatedStatus(path(rootDir + "/"));
    List<Path> list = new ArrayList<>();
    while (statusIterator.hasNext()) {
      list.add(statusIterator.next().getPath());
    }
    // This should fail without S3Guard, and succeed with it because part of the
    // children under test path are delaying visibility
    for (Path path : testDirs) {
      assertTrue("listLocatedStatus should list " + path, list.contains(path));
    }
  }

  /**
   * Tests that the S3AFS listFiles() call will return a consistent file list.
   * @throws Exception on any failure
   */
  @Test
  public void testConsistentListFiles() throws Exception {
    final S3AFileSystem fs = getFileSystem();
    // This test will fail if NullMetadataStore (the default) is configured:
    // skip it.
    Assume.assumeTrue(fs.hasMetadataStore());
    final int[] numOfPaths = {0, 2};
    for (int dirNum : numOfPaths) {
      for (int normalFile : numOfPaths) {
        for (int delayedFile : new int[] {0, 1}) {
          for (boolean recursive : new boolean[] {true, false}) {
            doTestListFiles(fs, dirNum, normalFile, delayedFile, recursive);
          }
        }
      }
    }
  }

  /**
   * Helper method to implement the tests of consistent listFiles().
   *
   * The file structure has dirNum subdirectories, and each directory (including
   * the test base directory itself) has normalFileNum normal files and
   * delayedFileNum delayed files.
   *
   * @param fs The S3 file system from contract
   * @param dirNum number of subdirectories
   * @param normalFileNum number of files in each directory listed without delay
   * @param delayedFileNum number of files in each directory listed with delay
   * @param recursive listFiles recursively if true
   * @throws Exception if any unexpected error
   */
  private void doTestListFiles(S3AFileSystem fs, int dirNum, int normalFileNum,
      int delayedFileNum, boolean recursive) throws Exception {
    describe("Testing dirNum=%d, normalFile=%d, delayedFile=%d, "
        + "recursive=%s", dirNum, normalFileNum, delayedFileNum, recursive);
    final Path baseTestDir = path("doTestListFiles-" + dirNum + "-"
        + normalFileNum + "-" + delayedFileNum + "-" + recursive);
    // delete the old test path (if any) so that when we call mkdirs() later,
    // the subdirectories to delay will be tracked via putObject() request.
    fs.delete(baseTestDir, true);
    // make subdirectories (if any)
    final List<Path> testDirs = new ArrayList<>(dirNum + 1);
    assertTrue(fs.mkdirs(baseTestDir));
    testDirs.add(baseTestDir);
    for (int i = 0; i < dirNum; i++) {
      final Path subdir = new Path(baseTestDir, "dir-" + i);
      assertTrue(fs.mkdirs(subdir));
      testDirs.add(subdir);
    }
    final Collection<String> fileNames
        = new ArrayList<>(normalFileNum + delayedFileNum);
    int index = 0;
    for (; index < normalFileNum; index++) {
      fileNames.add("file-" + index);
    }
    for (; index < normalFileNum + delayedFileNum; index++) {
      // Any S3 keys that contain DELAY_KEY_SUBSTRING will be delayed
      // in listObjects() results via InconsistentS3Client
      fileNames.add("file-" + index + "-" + DEFAULT_DELAY_KEY_SUBSTRING);
    }
    // Total number of files written across all test directories.
    int totalFilesWritten = 0;
    // create files under each test directory
    for (Path dir : testDirs) {
      for (String fileName : fileNames) {
        writeTextFile(fs, new Path(dir, fileName), "I, " + fileName, false);
        totalFilesWritten++;
      }
    }
    // this should return the union data from S3 and MetadataStore
    final RemoteIterator<LocatedFileStatus> statusIterator
        = fs.listFiles(baseTestDir, recursive);
    final Collection<Path> listedFiles = new HashSet<>();
    while (statusIterator.hasNext()) {
      final FileStatus status = statusIterator.next();
      assertTrue("FileStatus " + status + " is not a file!", status.isFile());
      listedFiles.add(status.getPath());
    }
    LOG.info("S3AFileSystem::listFiles('{}', {}) -> {}",
        baseTestDir, recursive, listedFiles);
    // This should fail without S3Guard, and succeed with it because part of the
    // files to list are delaying visibility
    if (!recursive) {
      // in this case only the top level files are listed
      verifyFileIsListed(listedFiles, baseTestDir, fileNames);
      assertEquals("Unexpected number of files returned by listFiles() call",
          normalFileNum + delayedFileNum, listedFiles.size());
    } else {
      for (Path dir : testDirs) {
        verifyFileIsListed(listedFiles, dir, fileNames);
      }
      assertEquals("Unexpected number of files returned by listFiles() call",
          totalFilesWritten,
          listedFiles.size());
    }
  }

  /**
   * Asserts that every named file under a directory appears in a listing.
   * @param listedFiles paths returned by the listing
   * @param currentDir directory the file names are relative to
   * @param fileNames names of the files that must have been listed
   */
  private static void verifyFileIsListed(Collection<Path> listedFiles,
      Path currentDir, Collection<String> fileNames) {
    for (String fileName : fileNames) {
      final Path file = new Path(currentDir, fileName);
      assertTrue(file + " should have been listed", listedFiles.contains(file));
    }
  }

  /**
   * Simulates a commit-by-rename sequence. The test writes one file into a
   * task subdirectory of a directory whose name contains the delay substring.
   * It then renames every entry of the task directory into that parent
   * directory and verifies that exactly one file, with the original name, is
   * listed there.
   * @throws Throwable on any failure
   */
  @Test
  public void testCommitByRenameOperations() throws Throwable {
    S3AFileSystem fs = getFileSystem();
    Assume.assumeTrue(fs.hasMetadataStore());
    Path work = path("test-commit-by-rename-" + DEFAULT_DELAY_KEY_SUBSTRING);
    Path task00 = new Path(work, "task00");
    assertTrue("mkdirs failed: " + task00, fs.mkdirs(task00));
    String name = "part-00";
    try (FSDataOutputStream out =
        fs.create(new Path(task00, name), false)) {
      out.writeChars("hello");
    }
    for (FileStatus stat : fs.listStatus(task00)) {
      assertTrue("Rename returned false: " + stat.getPath() + " -> " + work,
          fs.rename(stat.getPath(), work));
    }
    List<FileStatus> files = new ArrayList<>(2);
    for (FileStatus stat : fs.listStatus(work)) {
      if (stat.isFile()) {
        files.add(stat);
      }
    }
    assertFalse("renamed file " + name + " not found in " + work,
        files.isEmpty());
    assertEquals("more files found than expected in " + work
        + " " + ls(work), 1, files.size());
    FileStatus status = files.get(0);
    assertEquals("Wrong filename in " + status,
        name, status.getPath().getName());
  }

  /**
   * Checks raw listings made through {@link InconsistentAmazonS3Client} just
   * before and just after a recursive delete of a tree. For both delimited
   * (non-recursive) and undelimited (recursive) listings, the number of object
   * summaries and common prefixes must be the same before and after. Every key
   * in the tree contains {@code DEFAULT_DELAY_KEY_SUBSTRING}, because the
   * root's name contains it.
   * @throws Throwable on any failure
   */
  @Test
  public void testInconsistentS3ClientDeletes() throws Throwable {
    S3AFileSystem fs = getFileSystem();
    Path root = path("testInconsistentClient" + DEFAULT_DELAY_KEY_SUBSTRING);
    for (int i = 0; i < 3; i++) {
      fs.mkdirs(new Path(root, "dir" + i));
      touch(fs, new Path(root, "file" + i));
      for (int j = 0; j < 3; j++) {
        touch(fs, new Path(new Path(root, "dir" + i), "file" + i + "-" + j));
      }
    }
    clearInconsistency(fs);
    AmazonS3 client = fs.getAmazonS3Client();
    String key = fs.pathToKey(root) + "/";
    ObjectListing preDeleteDelimited = client.listObjects(
        fs.createListObjectsRequest(key, "/"));
    ObjectListing preDeleteUndelimited = client.listObjects(
        fs.createListObjectsRequest(key, null));
    fs.delete(root, true);
    ObjectListing postDeleteDelimited = client.listObjects(
        fs.createListObjectsRequest(key, "/"));
    ObjectListing postDeleteUndelimited = client.listObjects(
        fs.createListObjectsRequest(key, null));
    assertEquals("InconsistentAmazonS3Client added back objects incorrectly " +
            "in a non-recursive listing",
        preDeleteDelimited.getObjectSummaries().size(),
        postDeleteDelimited.getObjectSummaries().size()
    );
    assertEquals("InconsistentAmazonS3Client added back prefixes incorrectly " +
            "in a non-recursive listing",
        preDeleteDelimited.getCommonPrefixes().size(),
        postDeleteDelimited.getCommonPrefixes().size()
    );
    assertEquals("InconsistentAmazonS3Client added back objects incorrectly " +
            "in a recursive listing",
        preDeleteUndelimited.getObjectSummaries().size(),
        postDeleteUndelimited.getObjectSummaries().size()
    );
    assertEquals("InconsistentAmazonS3Client added back prefixes incorrectly " +
            "in a recursive listing",
        preDeleteUndelimited.getCommonPrefixes().size(),
        postDeleteUndelimited.getCommonPrefixes().size()
    );
  }

  /**
   * Calls {@code clearInconsistency()} on the filesystem's S3 client, which
   * must be an {@link InconsistentAmazonS3Client}. The exact effect depends on
   * that class; presumably it discards pending delayed-visibility state.
   * @param fs filesystem whose S3 client is reset
   * @throws Exception if the client cannot be cast or reset
   */
  private static void clearInconsistency(S3AFileSystem fs) throws Exception {
    AmazonS3 s3 = fs.getAmazonS3Client();
    InconsistentAmazonS3Client ic = InconsistentAmazonS3Client.castFrom(s3);
    ic.clearInconsistency();
  }
}