/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;

import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.ListObjectsV2Result;
import com.amazonaws.services.s3.model.S3ObjectSummary;
import com.google.common.collect.Lists;
import org.assertj.core.api.Assertions;
import org.junit.Assume;
import org.junit.Test;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.contract.AbstractFSContract;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.contract.s3a.S3AContract;

import static org.apache.hadoop.fs.contract.ContractTestUtils.touch;
import static org.apache.hadoop.fs.contract.ContractTestUtils.writeTextFile;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.FailureInjectionPolicy.*;
import static org.apache.hadoop.fs.s3a.InconsistentAmazonS3Client.*;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
/**
* Test S3Guard list consistency feature by injecting delayed listObjects()
* visibility via {@link InconsistentAmazonS3Client}.
*
* Tests here generally:
* 1. Use the inconsistency injection mentioned above.
* 2. Only run when S3Guard is enabled.
*/
public class ITestS3GuardListConsistency extends AbstractS3ATestBase {
private Invoker invoker;
@Override
public void setup() throws Exception {
super.setup();
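// retry invoker used by listObjectsV2() for raw S3 calls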
invoker = new Invoker(new S3ARetryPolicy(getConfiguration()),
Invoker.NO_OP
);
Assume.assumeTrue("No metadata store in test filesystem",
getFileSystem().hasMetadataStore());
}
@Override
public void teardown() throws Exception {
clearInconsistency(getFileSystem());
super.teardown();
}
@Override
protected AbstractFSContract createContract(Configuration conf) {
conf.setClass(S3_CLIENT_FACTORY_IMPL, InconsistentS3ClientFactory.class,
S3ClientFactory.class);
// Other configs would break test assumptions
conf.set(FAIL_INJECT_INCONSISTENCY_KEY, DEFAULT_DELAY_KEY_SUBSTRING);
conf.setFloat(FAIL_INJECT_INCONSISTENCY_PROBABILITY, 1.0f);
// this is a long value to guarantee that the inconsistency holds
// even over long-haul connections, and in the debugger too.
conf.setLong(FAIL_INJECT_INCONSISTENCY_MSEC, 60_000L);
return new S3AContract(conf);
}
/**
* Helper function for other test cases: does a single rename operation and
* validates the aftermath.
* @param mkdirs Directories to create
* @param srcdirs Source paths for rename operation
* @param dstdirs Destination paths for rename operation
* @param yesdirs Files that must exist post-rename (e.g. srcdirs children)
* @param nodirs Files that must not exist post-rename (e.g. dstdirs children)
* @throws Exception
*/
private void doTestRenameSequence(Path[] mkdirs, Path[] srcdirs,
Path[] dstdirs, Path[] yesdirs, Path[] nodirs) throws Exception {
S3AFileSystem fs = getFileSystem();
if (mkdirs != null) {
for (Path mkdir : mkdirs) {
assertTrue(fs.mkdirs(mkdir));
}
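// reset the injected inconsistency so every directory just created
// is visible before the renames run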
clearInconsistency(fs);
}
assertEquals("srcdirs and dstdirs must have equal length",
srcdirs.length, dstdirs.length);
for (int i = 0; i < srcdirs.length; i++) {
assertTrue("Rename returned false: " + srcdirs[i] + " -> " + dstdirs[i],
fs.rename(srcdirs[i], dstdirs[i]));
}
for (Path yesdir : yesdirs) {
assertTrue("Path was supposed to exist: " + yesdir, fs.exists(yesdir));
}
for (Path nodir : nodirs) {
assertFalse("Path is not supposed to exist: " + nodir, fs.exists(nodir));
}
}
/**
* Delete an array of paths; log exceptions.
* @param paths paths to delete
*/
private void deletePathsQuietly(Path... paths) {
for (Path dir : paths) {
try {
getFileSystem().delete(dir, true);
} catch (IOException e) {
LOG.info("Failed to delete {}: {}", dir, e.toString());
LOG.debug("Delete failure:, e");
}
}
}
/**
* Tests that after renaming a directory, the original directory and its
* contents are indeed missing and the corresponding new paths are visible.
* @throws Exception
*/
@Test
public void testConsistentListAfterRename() throws Exception {
Path d1f = path("d1/f");
Path d1f2 = path("d1/f-" + DEFAULT_DELAY_KEY_SUBSTRING);
Path[] mkdirs = {d1f, d1f2};
Path d1 = path("d1");
Path[] srcdirs = {d1};
Path d2 = path("d2");
Path[] dstdirs = {d2};
Path d2f2 = path("d2/f-" + DEFAULT_DELAY_KEY_SUBSTRING);
Path[] yesdirs = {d2, path("d2/f"), d2f2};
Path[] nodirs = {d1, d1f, d1f2};
try {
doTestRenameSequence(mkdirs, srcdirs, dstdirs, yesdirs, nodirs);
} finally {
clearInconsistency(getFileSystem());
deletePathsQuietly(d1, d2, d1f, d1f2, d2f2);
}
}
/**
* Tests a circular sequence of renames to verify that overwriting recently
* deleted files and reading recently created files from rename operations
* works as expected.
* @throws Exception
*/
@Test
public void testRollingRenames() throws Exception {
Path[] dir0 = {path("rolling/1")};
Path[] dir1 = {path("rolling/2")};
Path[] dir2 = {path("rolling/3")};
// These sets have to be in reverse order compared to the movement
Path[] setA = {dir1[0], dir0[0]};
Path[] setB = {dir2[0], dir1[0]};
Path[] setC = {dir0[0], dir2[0]};
try {
for (int i = 0; i < 2; i++) {
Path[] firstSet = i == 0 ? setA : null;
doTestRenameSequence(firstSet, setA, setB, setB, dir0);
doTestRenameSequence(null, setB, setC, setC, dir1);
doTestRenameSequence(null, setC, setA, setA, dir2);
}
S3AFileSystem fs = getFileSystem();
assertFalse("Renaming deleted file should have failed",
fs.rename(dir2[0], dir1[0]));
assertTrue("Renaming over existing file should have succeeded",
fs.rename(dir1[0], dir0[0]));
} finally {
clearInconsistency(getFileSystem());
deletePathsQuietly(dir0[0], dir1[0], dir2[0]);
}
}
/**
* Tests that deleted files immediately stop manifesting in list operations
* even when the effect in S3 is delayed.
* @throws Exception
*/
@Test
public void testConsistentListAfterDelete() throws Exception {
S3AFileSystem fs = getFileSystem();
// Any S3 keys that contain DELAY_KEY_SUBSTRING will be delayed
// in listObjects() results via InconsistentS3Client
Path inconsistentPath =
path("a/b/dir3-" + DEFAULT_DELAY_KEY_SUBSTRING);
Path dir1 = path("a/b/dir1");
Path dir2 = path("a/b/dir2");
Path[] testDirs = {
dir1,
dir2,
inconsistentPath};
for (Path path : testDirs) {
assertTrue("Can't create directory: " + path, fs.mkdirs(path));
}
clearInconsistency(fs);
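// delete the paths; the delayed key will linger in raw S3 listings,
// so only the metadata store can keep it out of listStatus()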
for (Path path : testDirs) {
assertTrue("Can't delete path: " + path, fs.delete(path, false));
}
FileStatus[] paths = fs.listStatus(path("a/b/"));
List<Path> list = new ArrayList<>();
for (FileStatus fileState : paths) {
list.add(fileState.getPath());
}
Assertions.assertThat(list)
.describedAs("Expected deleted files to be excluded")
.doesNotContain(dir1)
.doesNotContain(dir2)
.doesNotContain(inconsistentPath);
}
/**
* Tests that rename immediately after files in the source directory are
* deleted results in exactly the correct set of destination files and none
* of the source files.
* @throws Exception
*/
@Test
public void testConsistentRenameAfterDelete() throws Exception {
S3AFileSystem fs = getFileSystem();
// Any S3 keys that contain DELAY_KEY_SUBSTRING will be delayed
// in listObjects() results via InconsistentS3Client
Path inconsistentPath =
path("a/b/dir3-" + DEFAULT_DELAY_KEY_SUBSTRING);
Path[] testDirs = {path("a/b/dir1"),
path("a/b/dir2"),
inconsistentPath};
for (Path path : testDirs) {
assertTrue(fs.mkdirs(path));
}
clearInconsistency(fs);
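// delete dir2 and the delayed path; the rename below must not resurrect them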
assertTrue(fs.delete(testDirs[1], false));
assertTrue(fs.delete(testDirs[2], false));
ContractTestUtils.rename(fs, path("a"), path("a3"));
ContractTestUtils.assertPathsDoNotExist(fs,
"Source paths shouldn't exist post rename operation",
testDirs[0], testDirs[1], testDirs[2]);
FileStatus[] paths = fs.listStatus(path("a3/b"));
List<Path> list = new ArrayList<>();
for (FileStatus fileState : paths) {
list.add(fileState.getPath());
}
Assertions.assertThat(list)
.contains(path("a3/b/dir1"))
.doesNotContain(path("a3/b/dir2"))
.doesNotContain(path("a3/b/dir3-" +
DEFAULT_DELAY_KEY_SUBSTRING));
intercept(FileNotFoundException.class, "",
"Recently renamed dir should not be visible",
() -> S3AUtils.mapLocatedFiles(
fs.listFilesAndEmptyDirectories(path("a"), true),
FileStatus::getPath));
}
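/**
* Tests that listStatus() immediately after directory creation returns all
* the new paths, including the one whose S3 visibility is delayed.
*/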
@Test
public void testConsistentListStatusAfterPut() throws Exception {
S3AFileSystem fs = getFileSystem();
// Any S3 keys that contain DELAY_KEY_SUBSTRING will be delayed
// in listObjects() results via InconsistentS3Client
Path inconsistentPath =
path("a/b/dir3-" + DEFAULT_DELAY_KEY_SUBSTRING);
Path[] testDirs = {path("a/b/dir1"),
path("a/b/dir2"),
inconsistentPath};
for (Path path : testDirs) {
assertTrue(fs.mkdirs(path));
}
FileStatus[] paths = fs.listStatus(path("a/b/"));
List<Path> list = new ArrayList<>();
for (FileStatus fileState : paths) {
list.add(fileState.getPath());
}
Assertions.assertThat(list)
.contains(path("a/b/dir1"))
.contains(path("a/b/dir2"))
.contains(inconsistentPath);
}
/**
* Similar to {@link #testConsistentListStatusAfterPut()}, this tests that
* the FS listLocatedStatus() call returns a consistent list.
*/
@Test
public void testConsistentListLocatedStatusAfterPut() throws Exception {
final S3AFileSystem fs = getFileSystem();
String rootDir = "doTestConsistentListLocatedStatusAfterPut";
fs.mkdirs(path(rootDir));
final int[] numOfPaths = {0, 1, 5};
for (int normalPathNum : numOfPaths) {
for (int delayedPathNum : new int[] {0, 2}) {
LOG.info("Testing with normalPathNum={}, delayedPathNum={}",
normalPathNum, delayedPathNum);
doTestConsistentListLocatedStatusAfterPut(fs, rootDir, normalPathNum,
delayedPathNum);
}
}
}
/**
* Helper method to implement the tests of consistent listLocatedStatus().
* @param fs the S3A filesystem from the contract
* @param rootDir root directory under which the test paths are created
* @param normalPathNum number of paths visible in S3 listings without delay
* @param delayedPathNum number of paths whose listing visibility is delayed
* @throws Exception
*/
private void doTestConsistentListLocatedStatusAfterPut(S3AFileSystem fs,
String rootDir, int normalPathNum, int delayedPathNum) throws Exception {
final List<Path> testDirs = new ArrayList<>(normalPathNum + delayedPathNum);
int index = 0;
for (; index < normalPathNum; index++) {
testDirs.add(path(rootDir + "/dir-" +
index));
}
for (; index < normalPathNum + delayedPathNum; index++) {
// Any S3 keys that contain DELAY_KEY_SUBSTRING will be delayed
// in listObjects() results via InconsistentS3Client
testDirs.add(path(rootDir + "/dir-" + index +
DEFAULT_DELAY_KEY_SUBSTRING));
}
for (Path path : testDirs) {
// delete the old test path (if any) so that when we call mkdirs() later,
// the to-be-delayed directories will be tracked via the putObject() request.
fs.delete(path, true);
assertTrue(fs.mkdirs(path));
}
// this should return the union data from S3 and MetadataStore
final RemoteIterator<LocatedFileStatus> statusIterator =
fs.listLocatedStatus(path(rootDir + "/"));
List<Path> list = new ArrayList<>();
while (statusIterator.hasNext()) {
list.add(statusIterator.next().getPath());
}
// This should fail without S3Guard, and succeed with it because part of the
// children under test path are delaying visibility
for (Path path : testDirs) {
assertTrue("listLocatedStatus should list " + path, list.contains(path));
}
}
/**
* Tests that the S3AFS listFiles() call returns a consistent file list.
*/
@Test
public void testConsistentListFiles() throws Exception {
final S3AFileSystem fs = getFileSystem();
final int[] numOfPaths = {0, 2};
for (int dirNum : numOfPaths) {
for (int normalFile : numOfPaths) {
for (int delayedFile : new int[] {0, 1}) {
for (boolean recursive : new boolean[] {true, false}) {
doTestListFiles(fs, dirNum, normalFile, delayedFile, recursive);
}
}
}
}
}
/**
* Helper method to implement the tests of consistent listFiles().
*
* The file structure has dirNum subdirectories, and each directory (including
* the test base directory itself) has normalFileNum normal files and
* delayedFileNum delayed files.
*
* @param fs the S3A filesystem from the contract
* @param dirNum number of subdirectories
* @param normalFileNum number of files per directory listed without delay
* @param delayedFileNum number of files per directory whose listing is delayed
* @param recursive listFiles recursively if true
* @throws Exception if any unexpected error
*/
private void doTestListFiles(S3AFileSystem fs, int dirNum, int normalFileNum,
int delayedFileNum, boolean recursive) throws Exception {
describe("Testing dirNum=%d, normalFile=%d, delayedFile=%d, "
+ "recursive=%s", dirNum, normalFileNum, delayedFileNum, recursive);
final Path baseTestDir = path("doTestListFiles-" + dirNum + "-"
+ normalFileNum + "-" + delayedFileNum + "-" + recursive);
// delete the old test path (if any) so that when we call mkdirs() later,
// the to-be-delayed subdirectories will be tracked via the putObject() request.
fs.delete(baseTestDir, true);
// make subdirectories (if any)
final List<Path> testDirs = new ArrayList<>(dirNum + 1);
assertTrue(fs.mkdirs(baseTestDir));
testDirs.add(baseTestDir);
for (int i = 0; i < dirNum; i++) {
final Path subdir = path(baseTestDir + "/dir-" + i);
assertTrue(fs.mkdirs(subdir));
testDirs.add(subdir);
}
final Collection<String> fileNames
= new ArrayList<>(normalFileNum + delayedFileNum);
int index = 0;
for (; index < normalFileNum; index++) {
fileNames.add("file-" + index);
}
for (; index < normalFileNum + delayedFileNum; index++) {
// Any S3 keys that contain DELAY_KEY_SUBSTRING will be delayed
// in listObjects() results via InconsistentS3Client
fileNames.add("file-" + index + "-" + DEFAULT_DELAY_KEY_SUBSTRING);
}
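// running total of files created; every directory ends up non-empty,
// so this equals the expected recursive listFiles() count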
int filesAndEmptyDirectories = 0;
// create files under each test directory
for (Path dir : testDirs) {
for (String fileName : fileNames) {
writeTextFile(fs, new Path(dir, fileName), "I, " + fileName, false);
filesAndEmptyDirectories++;
}
}
// this should return the union data from S3 and MetadataStore
final RemoteIterator<LocatedFileStatus> statusIterator
= fs.listFiles(baseTestDir, recursive);
final Collection<Path> listedFiles = new HashSet<>();
while (statusIterator.hasNext()) {
final FileStatus status = statusIterator.next();
assertTrue("FileStatus " + status + " is not a file!", status.isFile());
listedFiles.add(status.getPath());
}
LOG.info("S3AFileSystem::listFiles('{}', {}) -> {}",
baseTestDir, recursive, listedFiles);
// This should fail without S3Guard, and succeed with it because part of the
// files to list are delaying visibility
if (!recursive) {
// in this case only the top level files are listed
verifyFileIsListed(listedFiles, baseTestDir, fileNames);
assertEquals("Unexpected number of files returned by listFiles() call",
normalFileNum + delayedFileNum, listedFiles.size());
} else {
for (Path dir : testDirs) {
verifyFileIsListed(listedFiles, dir, fileNames);
}
assertEquals("Unexpected number of files returned by listFiles() call",
filesAndEmptyDirectories,
listedFiles.size());
}
}
private static void verifyFileIsListed(Collection<Path> listedFiles,
Path currentDir, Collection<String> fileNames) {
for (String fileName : fileNames) {
final Path file = new Path(currentDir, fileName);
assertTrue(file + " should have been listed", listedFiles.contains(file));
}
}
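/**
* Simulates a commit-by-rename workflow under injected inconsistency:
* a file written under a task attempt directory is renamed into the
* job directory, which must then contain exactly that one file.
*/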
@Test
public void testCommitByRenameOperations() throws Throwable {
S3AFileSystem fs = getFileSystem();
Path work = path("test-commit-by-rename-" + DEFAULT_DELAY_KEY_SUBSTRING);
Path task00 = new Path(work, "task00");
fs.mkdirs(task00);
String name = "part-00";
try (FSDataOutputStream out =
fs.create(new Path(task00, name), false)) {
out.writeChars("hello");
}
for (FileStatus stat : fs.listStatus(task00)) {
fs.rename(stat.getPath(), work);
}
List<FileStatus> files = new ArrayList<>(2);
for (FileStatus stat : fs.listStatus(work)) {
if (stat.isFile()) {
files.add(stat);
}
}
assertFalse("renamed file " + name + " not found in " + work,
files.isEmpty());
assertEquals("more files found than expected in " + work
+ " " + ls(work), 1, files.size());
FileStatus status = files.get(0);
assertEquals("Wrong filename in " + status,
name, status.getPath().getName());
}
@Test
public void testInconsistentS3ClientDeletes() throws Throwable {
describe("Verify that delete adds tombstones which block entries"
+ " returned in (inconsistent) listings");
// Test only implemented for v2 S3 list API
assumeV2ListAPI();
S3AFileSystem fs = getFileSystem();
Path root = path("testInconsistentS3ClientDeletes-"
+ DEFAULT_DELAY_KEY_SUBSTRING);
for (int i = 0; i < 3; i++) {
fs.mkdirs(new Path(root, "dir-" + i));
touch(fs, new Path(root, "file-" + i));
for (int j = 0; j < 3; j++) {
touch(fs, new Path(new Path(root, "dir-" + i), "file-" + i + "-" + j));
}
}
clearInconsistency(fs);
String key = fs.pathToKey(root) + "/";
LOG.info("Listing objects before executing delete()");
ListObjectsV2Result preDeleteDelimited = listObjectsV2(fs, key, "/");
ListObjectsV2Result preDeleteUndelimited = listObjectsV2(fs, key, null);
LOG.info("Deleting the directory {}", root);
fs.delete(root, true);
LOG.info("Delete completed; listing results which must exclude deleted"
+ " paths");
ListObjectsV2Result postDeleteDelimited = listObjectsV2(fs, key, "/");
boolean stripTombstones = false;
assertObjectSummariesEqual(
"InconsistentAmazonS3Client added back objects incorrectly " +
"in a non-recursive listing",
preDeleteDelimited, postDeleteDelimited,
stripTombstones);
assertListSizeEqual("InconsistentAmazonS3Client added back prefixes incorrectly " +
"in a non-recursive listing",
preDeleteDelimited.getCommonPrefixes(),
postDeleteDelimited.getCommonPrefixes());
LOG.info("Executing Deep listing");
ListObjectsV2Result postDeleteUndelimited = listObjectsV2(fs, key, null);
assertObjectSummariesEqual("InconsistentAmazonS3Client added back objects"
+ " incorrectly in a recursive listing",
preDeleteUndelimited, postDeleteUndelimited,
stripTombstones);
assertListSizeEqual("InconsistentAmazonS3Client added back prefixes incorrectly " +
"in a recursive listing",
preDeleteUndelimited.getCommonPrefixes(),
postDeleteUndelimited.getCommonPrefixes()
);
}
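/**
* Assert that two object listings contain the same summaries, after
* converting them to key strings via {@link #stringify(List, boolean)}.
* @param message text for the assertion
* @param expected expected listing
* @param actual actual listing
* @param stripTombstones whether to drop keys ending in "/"
*/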
private void assertObjectSummariesEqual(final String message,
final ListObjectsV2Result expected,
final ListObjectsV2Result actual,
final boolean stripTombstones) {
assertCollectionsEqual(
message,
stringify(expected.getObjectSummaries(), stripTombstones),
stringify(actual.getObjectSummaries(), stripTombstones));
}
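/**
* Render the object summaries of a listing as a list of keys, optionally
* dropping directory markers (keys ending in "/").
* @param objects object summaries
* @param stripTombstones whether to drop keys ending in "/"
* @return the keys as strings
*/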
List<String> stringify(List<S3ObjectSummary> objects,
boolean stripTombstones) {
return objects.stream()
.filter(s -> !stripTombstones || !(s.getKey().endsWith("/")))
.map(s -> s.getKey())
.collect(Collectors.toList());
}
/**
* Require the v2 S3 list API.
*/
protected void assumeV2ListAPI() {
Assume.assumeTrue(getConfiguration()
.getInt(LIST_VERSION, DEFAULT_LIST_VERSION) == 2);
}
/**
* Verify that delayed S3 listings don't stop the FS from deleting
* a directory tree. This has not always been the case; this test
* verifies the fix and prevents regression.
*/
@Test
public void testDeleteUsesS3Guard() throws Throwable {
describe("Verify that delete() uses S3Guard to get a consistent"
+ " listing of its directory structure");
assumeV2ListAPI();
S3AFileSystem fs = getFileSystem();
Path root = path(
"testDeleteUsesS3Guard-" + DEFAULT_DELAY_KEY_SUBSTRING);
for (int i = 0; i < 3; i++) {
Path path = new Path(root, "file-" + i);
touch(fs, path);
}
// we now expect the listing to miss these
String key = fs.pathToKey(root) + "/";
// verify that the inconsistent listing does not show these
LOG.info("Listing objects before executing delete()");
List<Path> preDeletePaths = objectsToPaths(listObjectsV2(fs, key, null));
Assertions.assertThat(preDeletePaths)
.isEmpty();
// do the delete
fs.delete(root, true);
// now go through every file and verify that it is not there.
// if you comment out the delete above and run this test case,
// the assertion will fail; this is how the validity of the assertions
// was verified.
clearInconsistency(fs);
List<Path> postDeletePaths =
objectsToPaths(listObjectsV2(fs, key, null));
Assertions.assertThat(postDeletePaths)
.isEmpty();
}
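/**
* Map a raw object listing to the qualified paths of its summaries.
* @param r listing result
* @return the paths of all object summaries
*/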
private List<Path> objectsToPaths(ListObjectsV2Result r) {
S3AFileSystem fs = getFileSystem();
return r.getObjectSummaries().stream()
.map(s -> fs.keyToQualifiedPath(s.getKey()))
.collect(Collectors.toList());
}
/**
* Tests that the file's eTag and versionId are preserved in recursive
* listings.
*/
@Test
public void testListingReturnsVersionMetadata() throws Throwable {
S3AFileSystem fs = getFileSystem();
// write simple file
Path parent = path(getMethodName());
Path file = new Path(parent, "file1");
try (FSDataOutputStream outputStream = fs.create(file)) {
outputStream.writeChars("hello");
}
// get individual file status
FileStatus[] fileStatuses = fs.listStatus(file);
assertEquals(1, fileStatuses.length);
S3AFileStatus status = (S3AFileStatus) fileStatuses[0];
String eTag = status.getETag();
assertNotNull("Etag in " + eTag, eTag);
String versionId = status.getVersionId();
// get status through recursive directory listing
RemoteIterator<LocatedFileStatus> filesIterator = fs.listFiles(
parent, true);
List<LocatedFileStatus> files = Lists.newArrayList();
while (filesIterator.hasNext()) {
files.add(filesIterator.next());
}
Assertions.assertThat(files)
.hasSize(1);
// ensure eTag and versionId are preserved in directory listing
S3ALocatedFileStatus locatedFileStatus =
(S3ALocatedFileStatus) files.get(0);
assertEquals("etag of " + locatedFileStatus,
eTag, locatedFileStatus.getETag());
assertEquals("versionID of " + locatedFileStatus,
versionId, locatedFileStatus.getVersionId());
}
/**
* Assert that the two collections match using
* object equality of the elements within.
* @param message text for the assertion
* @param expected expected list
* @param actual actual list
* @param <T> type of list
*/
private <T> void assertCollectionsEqual(String message,
Collection<T> expected,
Collection<T> actual) {
Assertions.assertThat(actual)
.describedAs(message)
.containsExactlyInAnyOrderElementsOf(expected);
}
/**
* Assert that the two list sizes match; failure message includes the lists.
* @param message text for the assertion
* @param expected expected list
* @param actual actual list
* @param <T> type of list
*/
private <T> void assertListSizeEqual(String message,
List<T> expected,
List<T> actual) {
String leftContents = expected.stream()
.map(n -> n.toString())
.collect(Collectors.joining("\n"));
String rightContents = actual.stream()
.map(n -> n.toString())
.collect(Collectors.joining("\n"));
String summary = "\nExpected:" + leftContents
+ "\n-----------\n"
+ "Actual:" + rightContents
+ "\n-----------\n";
if (expected.size() != actual.size()) {
LOG.error(message + summary);
}
assertEquals(message + summary, expected.size(), actual.size());
}
/**
* Retrying v2 list directly through the s3 client.
* @param fs filesystem
* @param key key to list under
* @param delimiter any delimiter
* @return the listing
* @throws IOException on error
*/
@Retries.RetryRaw
private ListObjectsV2Result listObjectsV2(S3AFileSystem fs,
String key, String delimiter) throws IOException {
ListObjectsV2Request k = fs.createListObjectsRequest(key, delimiter)
.getV2();
return invoker.retryUntranslated("list", true,
() -> {
return fs.getAmazonS3ClientForTesting("list").listObjectsV2(k);
});
}
}