blob: eaa363bbf19b94bd19f99b57babfc92487d6f763 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.s3guard;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3AUtils;
import org.apache.hadoop.fs.s3a.Tristate;
import org.apache.hadoop.service.launcher.LauncherExitCodes;
import org.apache.hadoop.test.LambdaTestUtils;
import org.apache.hadoop.util.ExitUtil;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_METADATASTORE_METADATA_TTL;
import static org.apache.hadoop.fs.s3a.Constants.METADATASTORE_METADATA_TTL;
import static org.apache.hadoop.fs.s3a.Listing.toProvidedFileStatusIterator;
import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.dirMetaToStatuses;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
* Tests for the {@link S3Guard} utility class.
*/
public class TestS3Guard extends Assert {
public static final String MS_FILE_1 = "s3a://bucket/dir/ms-file1";
public static final String MS_FILE_2 = "s3a://bucket/dir/ms-file2";
public static final String S3_FILE_3 = "s3a://bucket/dir/s3-file3";
public static final String S3_DIR_4 = "s3a://bucket/dir/s3-dir4";
public static final Path DIR_PATH = new Path("s3a://bucket/dir");
private MetadataStore ms;
private ITtlTimeProvider timeProvider;
@Before
public void setUp() throws Exception {
final Configuration conf = new Configuration(false);
ms = new LocalMetadataStore();
ms.initialize(conf, new S3Guard.TtlTimeProvider(conf));
timeProvider = new S3Guard.TtlTimeProvider(
DEFAULT_METADATASTORE_METADATA_TTL);
}
@After
public void tearDown() throws Exception {
if (ms != null) {
ms.destroy();
}
}
/**
* Basic test to ensure results from S3 and MetadataStore are merged
* correctly.
*/
@Test
public void testDirListingUnionNonauth() throws Exception {
// Two files in metadata store listing
PathMetadata m1 = makePathMeta(MS_FILE_1, false);
PathMetadata m2 = makePathMeta(MS_FILE_2, false);
DirListingMetadata dirMeta = new DirListingMetadata(DIR_PATH,
Arrays.asList(m1, m2), false);
// Two other entries in s3
final S3AFileStatus s1Status = makeFileStatus(S3_FILE_3, false);
final S3AFileStatus s2Status = makeFileStatus(S3_DIR_4, true);
List<S3AFileStatus> s3Listing = Arrays.asList(
s1Status,
s2Status);
RemoteIterator<S3AFileStatus> storeItr = toProvidedFileStatusIterator(
s3Listing.toArray(new S3AFileStatus[0]));
RemoteIterator<S3AFileStatus> resultItr = S3Guard.dirListingUnion(
ms, DIR_PATH, storeItr, dirMeta, false,
timeProvider, s3AFileStatuses ->
toProvidedFileStatusIterator(dirMetaToStatuses(dirMeta)));
S3AFileStatus[] result = S3AUtils.iteratorToStatuses(
resultItr, new HashSet<>());
assertEquals("listing length", 4, result.length);
assertContainsPaths(result, MS_FILE_1, MS_FILE_2, S3_FILE_3, S3_DIR_4);
// check the MS doesn't contain the s3 entries as nonauth
// unions should block them
assertNoRecord(ms, S3_FILE_3);
assertNoRecord(ms, S3_DIR_4);
// for entries which do exist, when updated in S3, the metastore is updated
S3AFileStatus f1Status2 = new S3AFileStatus(
200, System.currentTimeMillis(), new Path(MS_FILE_1),
1, null, "tag2", "ver2");
S3AFileStatus[] f1Statuses = new S3AFileStatus[1];
f1Statuses[0] = f1Status2;
RemoteIterator<S3AFileStatus> itr = toProvidedFileStatusIterator(
f1Statuses);
FileStatus[] result2 = S3AUtils.iteratorToStatuses(
S3Guard.dirListingUnion(
ms, DIR_PATH, itr, dirMeta,
false, timeProvider,
s3AFileStatuses ->
toProvidedFileStatusIterator(
dirMetaToStatuses(dirMeta))),
new HashSet<>());
// the listing returns the new status
Assertions.assertThat(find(result2, MS_FILE_1))
.describedAs("Entry in listing results for %s", MS_FILE_1)
.isSameAs(f1Status2);
// as does a query of the MS
final PathMetadata updatedMD = verifyRecord(ms, MS_FILE_1);
Assertions.assertThat(updatedMD.getFileStatus())
.describedAs("Entry in metastore for %s: %s", MS_FILE_1, updatedMD)
.isEqualTo(f1Status2);
}
/**
* Auth mode unions are different.
*/
@Test
public void testDirListingUnionAuth() throws Exception {
// Two files in metadata store listing
PathMetadata m1 = makePathMeta(MS_FILE_1, false);
PathMetadata m2 = makePathMeta(MS_FILE_2, false);
DirListingMetadata dirMeta = new DirListingMetadata(DIR_PATH,
Arrays.asList(m1, m2), true);
// Two other entries in s3
S3AFileStatus s1Status = makeFileStatus(S3_FILE_3, false);
S3AFileStatus s2Status = makeFileStatus(S3_DIR_4, true);
List<S3AFileStatus> s3Listing = Arrays.asList(
s1Status,
s2Status);
ITtlTimeProvider timeProvider = new S3Guard.TtlTimeProvider(
DEFAULT_METADATASTORE_METADATA_TTL);
RemoteIterator<S3AFileStatus> storeItr = toProvidedFileStatusIterator(
s3Listing.toArray(new S3AFileStatus[0]));
RemoteIterator<S3AFileStatus> resultItr = S3Guard
.dirListingUnion(ms, DIR_PATH, storeItr, dirMeta,
true, timeProvider,
s3AFileStatuses ->
toProvidedFileStatusIterator(
dirMetaToStatuses(dirMeta)));
S3AFileStatus[] result = S3AUtils.iteratorToStatuses(
resultItr, new HashSet<>());
assertEquals("listing length", 4, result.length);
assertContainsPaths(result, MS_FILE_1, MS_FILE_2, S3_FILE_3, S3_DIR_4);
// now verify an auth scan added the records
PathMetadata file3Meta = verifyRecord(ms, S3_FILE_3);
PathMetadata dir4Meta = verifyRecord(ms, S3_DIR_4);
// we can't check auth flag handling because local FS doesn't have one
// so do just check the dir status still all good.
Assertions.assertThat(dir4Meta)
.describedAs("Metastore entry for dir %s", dir4Meta)
.matches(m -> m.getFileStatus().isDirectory());
DirListingMetadata dirMeta2 = new DirListingMetadata(DIR_PATH,
Arrays.asList(m1, m2, file3Meta, dir4Meta), true);
// now s1 status is updated on S3
S3AFileStatus s1Status2 = new S3AFileStatus(
200, System.currentTimeMillis(), new Path(S3_FILE_3),
1, null, "tag2", "ver2");
S3AFileStatus[] f1Statuses = new S3AFileStatus[1];
f1Statuses[0] = s1Status2;
RemoteIterator<S3AFileStatus> itr =
toProvidedFileStatusIterator(f1Statuses);
FileStatus[] result2 = S3AUtils.iteratorToStatuses(
S3Guard.dirListingUnion(ms, DIR_PATH, itr, dirMeta,
true, timeProvider,
s3AFileStatuses ->
toProvidedFileStatusIterator(
dirMetaToStatuses(dirMeta))),
new HashSet<>());
// but the result of the listing contains the old entry
// because auth mode doesn't pick up changes in S3 which
// didn't go through s3guard
Assertions.assertThat(find(result2, S3_FILE_3))
.describedAs("Entry in listing results for %s", S3_FILE_3)
.isSameAs(file3Meta.getFileStatus());
}
/**
* Assert there is no record in the store.
* @param ms metastore
* @param path path
* @throws IOException IOError
*/
private void assertNoRecord(MetadataStore ms, String path)
throws IOException {
Assertions.assertThat(lookup(ms, path))
.describedAs("Metastore entry for %s", path)
.isNull();
}
/**
* Assert there is arecord in the store, then return it.
* @param ms metastore
* @param path path
* @return the record.
* @throws IOException IO Error
*/
private PathMetadata verifyRecord(MetadataStore ms, String path)
throws IOException {
final PathMetadata md = lookup(ms, path);
Assertions.assertThat(md)
.describedAs("Metastore entry for %s", path)
.isNotNull();
return md;
}
/**
* Look up a record.
* @param ms store
* @param path path
* @return the record or null
* @throws IOException IO Error
*/
private PathMetadata lookup(final MetadataStore ms, final String path)
throws IOException {
return ms.get(new Path(path));
}
@Test
public void testPutWithTtlDirListingMeta() throws Exception {
// arrange
DirListingMetadata dlm = new DirListingMetadata(new Path("/"), null,
false);
MetadataStore ms = spy(MetadataStore.class);
ITtlTimeProvider timeProvider =
mock(ITtlTimeProvider.class);
when(timeProvider.getNow()).thenReturn(100L);
// act
S3Guard.putWithTtl(ms, dlm, Collections.emptyList(), timeProvider, null);
// assert
assertEquals("last update in " + dlm, 100L, dlm.getLastUpdated());
verify(timeProvider, times(1)).getNow();
verify(ms, times(1)).put(dlm, Collections.emptyList(), null);
}
@Test
public void testPutWithTtlFileMeta() throws Exception {
// arrange
S3AFileStatus fileStatus = mock(S3AFileStatus.class);
when(fileStatus.getPath()).thenReturn(new Path("/"));
PathMetadata pm = new PathMetadata(fileStatus);
MetadataStore ms = spy(MetadataStore.class);
ITtlTimeProvider timeProvider =
mock(ITtlTimeProvider.class);
when(timeProvider.getNow()).thenReturn(100L);
// act
S3Guard.putWithTtl(ms, pm, timeProvider, null);
// assert
assertEquals("last update in " + pm, 100L, pm.getLastUpdated());
verify(timeProvider, times(1)).getNow();
verify(ms, times(1)).put(pm, null);
}
@Test
public void testPutWithTtlCollection() throws Exception {
// arrange
S3AFileStatus fileStatus = mock(S3AFileStatus.class);
when(fileStatus.getPath()).thenReturn(new Path("/"));
Collection<PathMetadata> pmCollection = new ArrayList<>();
for (int i = 0; i < 10; i++) {
pmCollection.add(new PathMetadata(fileStatus));
}
MetadataStore ms = spy(MetadataStore.class);
ITtlTimeProvider timeProvider =
mock(ITtlTimeProvider.class);
when(timeProvider.getNow()).thenReturn(100L);
// act
S3Guard.putWithTtl(ms, pmCollection, timeProvider, null);
// assert
pmCollection.forEach(
pm -> assertEquals(100L, pm.getLastUpdated())
);
verify(timeProvider, times(1)).getNow();
verify(ms, times(1)).put(pmCollection, null);
}
@Test
public void testGetWithTtlExpired() throws Exception {
// arrange
S3AFileStatus fileStatus = mock(S3AFileStatus.class);
Path path = new Path("/file");
when(fileStatus.getPath()).thenReturn(path);
PathMetadata pm = new PathMetadata(fileStatus);
pm.setLastUpdated(100L);
MetadataStore ms = mock(MetadataStore.class);
when(ms.get(path, false)).thenReturn(pm);
ITtlTimeProvider timeProvider =
mock(ITtlTimeProvider.class);
when(timeProvider.getNow()).thenReturn(101L);
when(timeProvider.getMetadataTtl()).thenReturn(1L);
// act
final PathMetadata pmExpired = S3Guard.getWithTtl(ms, path, timeProvider,
false, false);
// assert
assertNull(pmExpired);
}
@Test
public void testGetWithTtlNotExpired() throws Exception {
// arrange
S3AFileStatus fileStatus = mock(S3AFileStatus.class);
Path path = new Path("/file");
when(fileStatus.getPath()).thenReturn(path);
PathMetadata pm = new PathMetadata(fileStatus);
pm.setLastUpdated(100L);
MetadataStore ms = mock(MetadataStore.class);
when(ms.get(path, false)).thenReturn(pm);
ITtlTimeProvider timeProvider =
mock(ITtlTimeProvider.class);
when(timeProvider.getNow()).thenReturn(101L);
when(timeProvider.getMetadataTtl()).thenReturn(2L);
// act
final PathMetadata pmNotExpired =
S3Guard.getWithTtl(ms, path, timeProvider, false, false);
// assert
assertNotNull(pmNotExpired);
}
@Test
public void testGetWithZeroLastUpdatedNotExpired() throws Exception {
// arrange
S3AFileStatus fileStatus = mock(S3AFileStatus.class);
Path path = new Path("/file");
when(fileStatus.getPath()).thenReturn(path);
PathMetadata pm = new PathMetadata(fileStatus);
// we set 0 this time as the last updated: can happen eg. when we use an
// old dynamo table
pm.setLastUpdated(0L);
MetadataStore ms = mock(MetadataStore.class);
when(ms.get(path, false)).thenReturn(pm);
ITtlTimeProvider timeProvider =
mock(ITtlTimeProvider.class);
when(timeProvider.getNow()).thenReturn(101L);
when(timeProvider.getMetadataTtl()).thenReturn(2L);
// act
final PathMetadata pmExpired = S3Guard.getWithTtl(ms, path, timeProvider,
false, false);
// assert
assertNotNull(pmExpired);
}
/**
* Makes sure that all uses of TTL timeouts use a consistent time unit.
* @throws Throwable failure
*/
@Test
public void testTTLConstruction() throws Throwable {
// first one
ITtlTimeProvider timeProviderExplicit = new S3Guard.TtlTimeProvider(
DEFAULT_METADATASTORE_METADATA_TTL);
// mirror the FS construction,
// from a config guaranteed to be empty (i.e. the code defval)
Configuration conf = new Configuration(false);
long millitime = conf.getTimeDuration(METADATASTORE_METADATA_TTL,
DEFAULT_METADATASTORE_METADATA_TTL, TimeUnit.MILLISECONDS);
assertEquals(15 * 60_000, millitime);
S3Guard.TtlTimeProvider fsConstruction = new S3Guard.TtlTimeProvider(
millitime);
assertEquals("explicit vs fs construction", timeProviderExplicit,
fsConstruction);
assertEquals("first and second constructor", timeProviderExplicit,
new S3Guard.TtlTimeProvider(conf));
// set the conf to a time without unit
conf.setLong(METADATASTORE_METADATA_TTL,
DEFAULT_METADATASTORE_METADATA_TTL);
assertEquals("first and second time set through long", timeProviderExplicit,
new S3Guard.TtlTimeProvider(conf));
double timeInSeconds = DEFAULT_METADATASTORE_METADATA_TTL / 1000;
double timeInMinutes = timeInSeconds / 60;
String timeStr = String.format("%dm", (int) timeInMinutes);
assertEquals(":wrong time in minutes from " + timeInMinutes,
"15m", timeStr);
conf.set(METADATASTORE_METADATA_TTL, timeStr);
assertEquals("Time in millis as string from "
+ conf.get(METADATASTORE_METADATA_TTL),
timeProviderExplicit,
new S3Guard.TtlTimeProvider(conf));
}
@Test
public void testLogS3GuardDisabled() throws Exception {
final Logger localLogger = LoggerFactory.getLogger(
TestS3Guard.class.toString() + UUID.randomUUID());
S3Guard.logS3GuardDisabled(localLogger,
S3Guard.DisabledWarnLevel.SILENT.toString(), "bucket");
S3Guard.logS3GuardDisabled(localLogger,
S3Guard.DisabledWarnLevel.INFORM.toString(), "bucket");
S3Guard.logS3GuardDisabled(localLogger,
S3Guard.DisabledWarnLevel.WARN.toString(), "bucket");
// Test that lowercase setting is accepted
S3Guard.logS3GuardDisabled(localLogger,
S3Guard.DisabledWarnLevel.WARN.toString()
.toLowerCase(Locale.US), "bucket");
ExitUtil.ExitException ex = LambdaTestUtils.intercept(
ExitUtil.ExitException.class,
String.format(S3Guard.DISABLED_LOG_MSG, "bucket"),
() -> S3Guard.logS3GuardDisabled(
localLogger, S3Guard.DisabledWarnLevel.FAIL.toString(), "bucket"));
if (ex.getExitCode() != LauncherExitCodes.EXIT_BAD_CONFIGURATION) {
throw ex;
}
LambdaTestUtils.intercept(IllegalArgumentException.class,
S3Guard.UNKNOWN_WARN_LEVEL,
() -> S3Guard.logS3GuardDisabled(
localLogger, "FOO_BAR_LEVEL", "bucket"));
}
void assertContainsPaths(FileStatus[] statuses, String...pathStr) {
for (String s :pathStr) {
assertContainsPath(statuses, s);
}
}
void assertContainsPath(FileStatus[] statuses, String pathStr) {
find(statuses, pathStr);
}
/**
* Look up an entry or raise an assertion
* @param statuses list of statuses
* @param pathStr path to search
* @return the entry if found
*/
private FileStatus find(FileStatus[] statuses, String pathStr) {
for (FileStatus s : statuses) {
if (s.getPath().toString().equals(pathStr)) {
return s;
}
}
// no match, fail meaningfully
Assertions.assertThat(statuses)
.anyMatch(s -> s.getPath().toString().equals(pathStr));
return null;
}
private PathMetadata makePathMeta(String pathStr, boolean isDir) {
return new PathMetadata(makeFileStatus(pathStr, isDir));
}
private S3AFileStatus makeFileStatus(String pathStr, boolean isDir) {
Path p = new Path(pathStr);
S3AFileStatus fileStatus;
if (isDir) {
fileStatus = new S3AFileStatus(Tristate.UNKNOWN, p, null);
} else {
fileStatus = new S3AFileStatus(
100, System.currentTimeMillis(), p, 1, null, null, null);
}
return fileStatus;
}
}