/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.s3guard;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.s3a.Retries;
import org.apache.hadoop.fs.s3a.Retries.RetryTranslated;
import org.apache.hadoop.fs.s3a.S3AFileStatus;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.ReflectionUtils;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.Constants.DEFAULT_AUTHORITATIVE_PATH;
import static org.apache.hadoop.fs.s3a.S3AUtils.createUploadFileStatus;
import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.authoritativeEmptyDirectoryMarker;
import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_BAD_CONFIGURATION;
/**
* Logic for integrating MetadataStore with S3A.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class S3Guard {
private static final Logger LOG = LoggerFactory.getLogger(S3Guard.class);
@InterfaceAudience.Private
@InterfaceStability.Unstable
@VisibleForTesting
public static final String S3GUARD_DDB_CLIENT_FACTORY_IMPL =
"fs.s3a.s3guard.ddb.client.factory.impl";
static final Class<? extends DynamoDBClientFactory>
S3GUARD_DDB_CLIENT_FACTORY_IMPL_DEFAULT =
DynamoDBClientFactory.DefaultDynamoDBClientFactory.class;
private static final S3AFileStatus[] EMPTY_LISTING = new S3AFileStatus[0];
/**
* Hard-coded policy : {@value}.
* If false, when merging an S3 LIST with S3Guard in non-auth mode,
* only updated entries are added; new entries are left out.
* This policy choice reduces the amount of data stored in Dynamo,
* and hence the complexity of the merge in a non-auth listing.
*/
@VisibleForTesting
public static final boolean DIR_MERGE_UPDATES_ALL_RECORDS_NONAUTH = false;
// Utility class. All static functions.
private S3Guard() { }
/* Utility functions. */
/**
* Create a new instance of the configured MetadataStore.
* The returned MetadataStore will have been initialized via
* {@link MetadataStore#initialize(FileSystem, ITtlTimeProvider)}
* by this function before returning it. Callers must clean up by calling
* {@link MetadataStore#close()} when done using the MetadataStore.
*
* @param fs FileSystem whose Configuration specifies which
* implementation to use.
* @param ttlTimeProvider time provider to pass to the store's initialize() call.
* @return Reference to new MetadataStore.
* @throws IOException if the metadata store cannot be instantiated
*/
@Retries.OnceTranslated
public static MetadataStore getMetadataStore(FileSystem fs,
ITtlTimeProvider ttlTimeProvider)
throws IOException {
Preconditions.checkNotNull(fs);
Configuration conf = fs.getConf();
Preconditions.checkNotNull(conf);
MetadataStore msInstance;
try {
Class<? extends MetadataStore> msClass = getMetadataStoreClass(conf);
msInstance = ReflectionUtils.newInstance(msClass, conf);
LOG.debug("Using {} metadata store for {} filesystem",
msClass.getSimpleName(), fs.getScheme());
msInstance.initialize(fs, ttlTimeProvider);
return msInstance;
} catch (FileNotFoundException e) {
// Don't log this exception as it means the table doesn't exist yet;
// rely on callers to catch and treat specially
throw e;
} catch (RuntimeException | IOException e) {
String message = "Failed to instantiate metadata store " +
conf.get(S3_METADATA_STORE_IMPL)
+ " defined in " + S3_METADATA_STORE_IMPL
+ ": " + e;
LOG.error(message, e);
if (e instanceof IOException) {
throw e;
} else {
throw new IOException(message, e);
}
}
}
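// Illustrative sketch (comment only, not invoked anywhere): how a caller
// might obtain and release a MetadataStore through this factory method.
// The filesystem "fs" and time provider "ttlTimeProvider" are assumed to
// exist; per the javadoc above, the caller owns the store and must close it.
//
//   MetadataStore ms = getMetadataStore(fs, ttlTimeProvider);
//   try {
//     // ... use the store, e.g. via getWithTtl(...) ...
//   } finally {
//     IOUtils.cleanupWithLogger(LOG, ms);
//   }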
static Class<? extends MetadataStore> getMetadataStoreClass(
Configuration conf) {
if (conf == null) {
return NullMetadataStore.class;
}
if (conf.get(S3_METADATA_STORE_IMPL) != null && LOG.isDebugEnabled()) {
LOG.debug("Metastore option source {}",
(Object)conf.getPropertySources(S3_METADATA_STORE_IMPL));
}
Class<? extends MetadataStore> aClass = conf.getClass(
S3_METADATA_STORE_IMPL, NullMetadataStore.class,
MetadataStore.class);
return aClass;
}
/**
* We update the metastore for the specific case where the S3 value is the
* same as the S3Guard value, so as to place a more recent modtime in the
* store. If we did not, we would continue to probe S3 whenever we look up
* this object. We only do this if confident that the S3 status is the same
* as the one in the store (i.e. it is not an older version).
* @param metadataStore MetadataStore to {@code put()} into.
* @param pm current data
* @param s3AFileStatus status to store
* @param timeProvider Time provider to use when writing entries
* @return true if the entry was updated.
* @throws IOException if metadata store update failed
*/
@RetryTranslated
public static boolean refreshEntry(
MetadataStore metadataStore,
PathMetadata pm,
S3AFileStatus s3AFileStatus,
ITtlTimeProvider timeProvider) throws IOException {
// the modtime of the data is the same as/older than the s3guard value
// either an old object has been found, or the existing one was retrieved
// in both cases -return s3guard value
S3AFileStatus msStatus = pm.getFileStatus();
// first check: size
boolean sizeMatch = msStatus.getLen() == s3AFileStatus.getLen();
// etags are expected on all objects, but handle the situation
// that a third party store doesn't serve them.
String s3Etag = s3AFileStatus.getETag();
String pmEtag = msStatus.getETag();
boolean etagsMatch = s3Etag != null && s3Etag.equals(pmEtag);
// version ID: only in some stores, and will be missing in the metastore
// if the entry was created through a list operation.
String s3VersionId = s3AFileStatus.getVersionId();
String pmVersionId = msStatus.getVersionId();
boolean versionsMatchOrMissingInMetastore =
pmVersionId == null || pmVersionId.equals(s3VersionId);
if (sizeMatch && etagsMatch && versionsMatchOrMissingInMetastore) {
// update the store, return the new value
LOG.debug("Refreshing the metastore entry/timestamp");
putAndReturn(metadataStore, s3AFileStatus, timeProvider);
return true;
}
return false;
}
/**
* Helper function which puts a given S3AFileStatus into the MetadataStore and
* returns the same S3AFileStatus. Instrumentation monitors the put operation.
* @param ms MetadataStore to {@code put()} into.
* @param status status to store
* @param timeProvider Time provider to use when writing entries
* @return The same status as passed in
* @throws IOException if metadata store update failed
*/
@RetryTranslated
public static S3AFileStatus putAndReturn(MetadataStore ms,
S3AFileStatus status,
ITtlTimeProvider timeProvider) throws IOException {
return putAndReturn(ms, status, timeProvider, null);
}
/**
* Helper function which puts a given S3AFileStatus into the MetadataStore and
* returns the same S3AFileStatus. Instrumentation monitors the put operation.
* @param ms MetadataStore to {@code put()} into.
* @param status status to store
* @param timeProvider Time provider to use when writing entries
* @param operationState possibly-null metastore state tracker.
* @return The same status as passed in
* @throws IOException if metadata store update failed
*/
@RetryTranslated
public static S3AFileStatus putAndReturn(
final MetadataStore ms,
final S3AFileStatus status,
final ITtlTimeProvider timeProvider,
@Nullable final BulkOperationState operationState) throws IOException {
long startTimeNano = System.nanoTime();
try {
putWithTtl(ms, new PathMetadata(status), timeProvider, operationState);
} finally {
ms.getInstrumentation().entryAdded((System.nanoTime() - startTimeNano));
}
return status;
}
/**
* Creates an authoritative directory marker for the store.
* @param ms MetadataStore to {@code put()} into.
* @param status status to store
* @param timeProvider Time provider to use when writing entries
* @param operationState possibly-null metastore state tracker.
* @throws IOException if metadata store update failed
*/
@RetryTranslated
public static void putAuthDirectoryMarker(
final MetadataStore ms,
final S3AFileStatus status,
final ITtlTimeProvider timeProvider,
@Nullable final BulkOperationState operationState) throws IOException {
long startTimeNano = System.nanoTime();
try {
final PathMetadata fileMeta = authoritativeEmptyDirectoryMarker(status);
putWithTtl(ms, fileMeta, timeProvider, operationState);
} finally {
ms.getInstrumentation().directoryMarkedAuthoritative();
ms.getInstrumentation().entryAdded((System.nanoTime() - startTimeNano));
}
}
/**
* Initiate a bulk write and create an operation state for it.
* This may then be passed into put operations.
* @param metastore store
* @param operation the type of the operation.
* @param path path under which updates will be explicitly put.
* @return a store-specific state to pass into the put operations, or null
* @throws IOException failure
*/
public static BulkOperationState initiateBulkWrite(
@Nullable final MetadataStore metastore,
final BulkOperationState.OperationType operation,
final Path path) throws IOException {
Preconditions.checkArgument(
operation != BulkOperationState.OperationType.Rename,
"Rename operations cannot be started through initiateBulkWrite");
if (metastore == null || isNullMetadataStore(metastore)) {
return null;
} else {
return metastore.initiateBulkWrite(operation, path);
}
}
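// Illustrative sketch: the intended lifecycle of a bulk write, mirroring the
// pattern used in dirListingUnion() below. The store "ms", the path "parent",
// the "entries" collection and the time provider are placeholders; the
// returned state may be null and is always cleaned up afterwards.
//
//   BulkOperationState state = initiateBulkWrite(ms,
//       BulkOperationState.OperationType.Listing, parent);
//   try {
//     putWithTtl(ms, entries, timeProvider, state);
//   } finally {
//     IOUtils.cleanupWithLogger(LOG, state);
//   }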
/**
* Convert the data of a directory listing to an array of {@link FileStatus}
* entries. Tombstones are filtered out at this point. If the listing is null
* an empty array is returned.
* @param dirMeta directory listing -may be null
* @return a possibly-empty array of file status entries
*/
public static S3AFileStatus[] dirMetaToStatuses(DirListingMetadata dirMeta) {
if (dirMeta == null) {
return EMPTY_LISTING;
}
Collection<PathMetadata> listing = dirMeta.getListing();
List<FileStatus> statuses = new ArrayList<>();
for (PathMetadata pm : listing) {
if (!pm.isDeleted()) {
statuses.add(pm.getFileStatus());
}
}
return statuses.toArray(new S3AFileStatus[0]);
}
/**
* Given directory listing metadata from both the backing store and the
* MetadataStore, merge the two sources of truth to create a consistent
* view of the current directory contents, which can be returned to clients.
*
* Also update the MetadataStore to reflect the resulting directory listing.
*
* In the non-authoritative case: update a file's metadata if the mod_time
* in the listing is greater than what is currently in the MetadataStore.
*
* @param ms MetadataStore to use.
* @param path path to directory
* @param backingStatuses Directory listing from the backing store.
* @param dirMeta Directory listing from MetadataStore. May be null.
* @param isAuthoritative State of authoritative mode
* @param timeProvider Time provider to use when updating entries
* @param toStatusItr function to convert array of file status to
* RemoteIterator.
* @return Final result of directory listing.
* @throws IOException if metadata store update failed
*/
public static RemoteIterator<S3AFileStatus> dirListingUnion(
MetadataStore ms, Path path,
RemoteIterator<S3AFileStatus> backingStatuses,
DirListingMetadata dirMeta, boolean isAuthoritative,
ITtlTimeProvider timeProvider,
Function<S3AFileStatus[], RemoteIterator<S3AFileStatus>> toStatusItr)
throws IOException {
// Fast-path for NullMetadataStore
if (isNullMetadataStore(ms)) {
return backingStatuses;
}
assertQualified(path);
if (dirMeta == null) {
// The metadataStore had zero state for this directory
dirMeta = new DirListingMetadata(path, DirListingMetadata.EMPTY_DIR,
false);
}
// Since we treat the MetadataStore as a "fresher" or "consistent" view
// of metadata, we always use its metadata first.
// Since the authoritative case is already handled outside this function,
// we will basically start with the set of directory entries in the
// DirListingMetadata, and add any that only exist in the backingStatuses.
//
// We try to avoid writing any more child entries than need be to :-
// (a) save time and money.
// (b) avoid overwriting the authoritative bit of children (HADOOP-16746).
// For auth mode updates, we supply the full listing and a list of which
// child entries have not been changed; the store gets to optimize its
// update however it chooses.
//
// for non-auth-mode S3Guard, we just build a list of entries to add and
// submit them in a batch; this is more efficient than trickling out the
// updates one-by-one.
BulkOperationState operationState = ms.initiateBulkWrite(
BulkOperationState.OperationType.Listing,
path);
if (isAuthoritative) {
authoritativeUnion(ms, path, backingStatuses, dirMeta,
timeProvider, operationState);
} else {
nonAuthoritativeUnion(ms, path, backingStatuses, dirMeta,
timeProvider, operationState);
}
IOUtils.cleanupWithLogger(LOG, operationState);
return toStatusItr.apply(dirMetaToStatuses(dirMeta));
}
/**
* Perform the authoritative union operation.
* Here all updated/missing entries are added back; we take care
* not to overwrite unchanged entries as that will lose their
* isAuthoritative bit (HADOOP-16746).
* @param ms MetadataStore to use.
* @param path path to directory
* @param backingStatuses Directory listing from the backing store.
* @param dirMeta Directory listing from MetadataStore. May be null.
* @param timeProvider Time provider to use when updating entries
* @param operationState ongoing operation
* @throws IOException if metadata store update failed
*/
private static void authoritativeUnion(
final MetadataStore ms,
final Path path,
final RemoteIterator<S3AFileStatus> backingStatuses,
final DirListingMetadata dirMeta,
final ITtlTimeProvider timeProvider,
final BulkOperationState operationState) throws IOException {
// track all unchanged entries; used so the metastore can identify entries
// it doesn't need to update
List<Path> unchangedEntries = new ArrayList<>(dirMeta.getListing().size());
boolean changed = !dirMeta.isAuthoritative();
Set<Path> deleted = dirMeta.listTombstones();
final Map<Path, PathMetadata> dirMetaMap = dirMeta.getListing().stream()
.collect(Collectors.toMap(pm -> pm.getFileStatus().getPath(), pm -> pm));
while (backingStatuses.hasNext()) {
S3AFileStatus s = backingStatuses.next();
final Path statusPath = s.getPath();
if (deleted.contains(statusPath)) {
continue;
}
// this is built up to be whatever entry is to be added to the dirMeta
// collection
PathMetadata pathMetadata = dirMetaMap.get(statusPath);
if (pathMetadata == null) {
// there's no entry in the listing, so create one.
pathMetadata = new PathMetadata(s);
} else {
// no change -add the path to the list of unchangedEntries
unchangedEntries.add(statusPath);
}
// Minor race condition here. Multiple threads could add to this
// mutable DirListingMetadata. Since it is backed by a
// ConcurrentHashMap, the last put() wins.
// More concerning is two threads racing on listStatus() and delete().
// Any FileSystem has similar race conditions, but we could persist
// a stale entry longer. We could expose an atomic
// DirListingMetadata#putIfNotPresent()
changed |= dirMeta.put(pathMetadata);
}
if (changed) {
// in an authoritative update, we pass in the full list of entries,
// but do declare which have not changed to avoid needless and potentially
// destructive overwrites.
LOG.debug("Marking the directory {} as authoritative", path);
ms.getInstrumentation().directoryMarkedAuthoritative();
dirMeta.setAuthoritative(true); // This is the full directory contents
// write the updated dir entry and any changed children.
S3Guard.putWithTtl(ms, dirMeta, unchangedEntries, timeProvider, operationState);
}
}
/**
* Perform the non-authoritative union operation.
* @param ms MetadataStore to use.
* @param path path to directory
* @param backingStatuses Directory listing from the backing store.
* @param dirMeta Directory listing from MetadataStore. May be null.
* @param timeProvider Time provider to use when updating entries
* @param operationState ongoing operation
* @throws IOException if metadata store update failed
*/
private static void nonAuthoritativeUnion(
final MetadataStore ms,
final Path path,
final RemoteIterator<S3AFileStatus> backingStatuses,
final DirListingMetadata dirMeta,
final ITtlTimeProvider timeProvider,
final BulkOperationState operationState) throws IOException {
List<PathMetadata> entriesToAdd = new ArrayList<>();
Set<Path> deleted = dirMeta.listTombstones();
final Map<Path, PathMetadata> dirMetaMap = dirMeta.getListing().stream()
.collect(Collectors.toMap(pm -> pm.getFileStatus().getPath(), pm -> pm));
while (backingStatuses.hasNext()) {
S3AFileStatus s = backingStatuses.next();
final Path statusPath = s.getPath();
if (deleted.contains(statusPath)) {
continue;
}
// this is the record in dynamo
PathMetadata pathMetadata = dirMetaMap.get(statusPath);
// in non-auth listings, we compare the file status of the metastore
// list with those in the FS, and overwrite the MS entry if
// either of two conditions is met:
// - there is no entry in the metastore and
// DIR_MERGE_UPDATES_ALL_RECORDS_NONAUTH is compiled to true
// - there is an entry in the metastore and the FS entry is newer.
boolean shouldUpdate;
if (pathMetadata != null) {
// entry is in DDB; check modification time
shouldUpdate = s.getModificationTime() > (pathMetadata.getFileStatus())
.getModificationTime();
// create an updated record.
pathMetadata = new PathMetadata(s);
} else {
// entry is not present. Create for insertion into dirMeta
pathMetadata = new PathMetadata(s);
// use hard-coded policy about updating
shouldUpdate = DIR_MERGE_UPDATES_ALL_RECORDS_NONAUTH;
}
if (shouldUpdate) {
// we do want to update DDB and the listing with a new entry.
LOG.debug("Update ms with newer metadata of: {}", s);
// ensure it gets into the dirListing
// add to the list of entries to add later,
entriesToAdd.add(pathMetadata);
}
// add the entry to the union; no-op if it was already there.
dirMeta.put(pathMetadata);
}
if (!entriesToAdd.isEmpty()) {
// non-auth, just push out the updated entry list
LOG.debug("Adding {} entries under directory {}", entriesToAdd.size(), path);
putWithTtl(ms, entriesToAdd, timeProvider, operationState);
}
}
/**
* Although NullMetadataStore does nothing, callers may wish to avoid work
* (fast path) when the NullMetadataStore is in use.
* @param ms The MetadataStore to test
* @return true iff the MetadataStore is the null, or no-op, implementation.
*/
public static boolean isNullMetadataStore(MetadataStore ms) {
return (ms instanceof NullMetadataStore);
}
/**
* Update MetadataStore to reflect creation of the given directories.
*
* If an IOException is raised while trying to update the entry, this
* operation catches the exception, swallows it and returns.
*
* @deprecated this is no longer called by {@code S3AFilesystem.innerMkDirs}.
* See: HADOOP-15079 (January 2018).
* It is currently retained because of the discussion in the method body on
* atomicity, and in case we need to reinstate it or adapt the current
* process of directory marker creation.
* But it is not being tested and so may age with time...consider
* deleting it in future if it's clear there's no need for it.
* @param ms MetadataStore to update.
* @param dirs null, or an ordered list of directories from leaf to root.
* E.g. if /a/ exists, and mkdirs(/a/b/c/d) is called, this
* list will contain [/a/b/c/d, /a/b/c, /a/b]. /a/b/c/d is
* an empty dir, and the other dirs only contain their child
* dir.
* @param owner Hadoop user name.
* @param authoritative Whether to mark new directories as authoritative.
* @param timeProvider Time provider.
*/
@Deprecated
@Retries.OnceExceptionsSwallowed
public static void makeDirsOrdered(MetadataStore ms, List<Path> dirs,
String owner, boolean authoritative, ITtlTimeProvider timeProvider) {
if (dirs == null) {
return;
}
/* We discussed atomicity of this implementation.
* The concern is that multiple clients could race to write different
* cached directories to the MetadataStore. Two solutions are proposed:
* 1. Move mkdirs() into MetadataStore interface and let implementations
* ensure they are atomic.
* 2. Specify that the semantics of MetadataStore#putListStatus() are
* always additive. That is, if MetadataStore has listStatus() state
* for /a/b that contains [/a/b/file0, /a/b/file1], and we then call
* putListStatus(/a/b -> [/a/b/file2, /a/b/file3], isAuthoritative=true),
* then we will end up with final state of
* [/a/b/file0, /a/b/file1, /a/b/file2, /a/b/file3], isAuthoritative =
* true
*/
S3AFileStatus prevStatus = null;
// Use new batched put to reduce round trips.
List<PathMetadata> pathMetas = new ArrayList<>(dirs.size());
try {
// Iterate from leaf to root
for (int i = 0; i < dirs.size(); i++) {
boolean isLeaf = (prevStatus == null);
Path f = dirs.get(i);
assertQualified(f);
S3AFileStatus status =
createUploadFileStatus(f, true, 0, 0, owner, null, null);
// We only need to put a DirListingMetadata if we are setting
// authoritative bit
DirListingMetadata dirMeta = null;
if (authoritative) {
Collection<PathMetadata> children;
if (isLeaf) {
children = DirListingMetadata.EMPTY_DIR;
} else {
children = new ArrayList<>(1);
children.add(new PathMetadata(prevStatus));
}
dirMeta = new DirListingMetadata(f, children, authoritative);
S3Guard.putWithTtl(ms, dirMeta, Collections.emptyList(), timeProvider, null);
}
pathMetas.add(new PathMetadata(status));
prevStatus = status;
}
// Batched put
S3Guard.putWithTtl(ms, pathMetas, timeProvider, null);
} catch (IOException ioe) {
LOG.error("MetadataStore#put() failure:", ioe);
}
}
/**
* Helper function that records the move of directory paths, adding
* resulting metadata to the supplied lists.
* Does not store in MetadataStore.
* @param ms MetadataStore, used to make this a no-op, when it is
* NullMetadataStore.
* @param srcPaths stores the source path here
* @param dstMetas stores destination metadata here
* @param srcPath source path to store
* @param dstPath destination path to store
* @param owner file owner to use in created records
*/
public static void addMoveDir(MetadataStore ms, Collection<Path> srcPaths,
Collection<PathMetadata> dstMetas, Path srcPath, Path dstPath,
String owner) {
if (isNullMetadataStore(ms)) {
return;
}
assertQualified(srcPath, dstPath);
S3AFileStatus dstStatus = createUploadFileStatus(dstPath, true, 0,
0, owner, null, null);
addMoveStatus(srcPaths, dstMetas, srcPath, dstStatus);
}
/**
* Like {@link #addMoveDir(MetadataStore, Collection, Collection, Path,
* Path, String)}, but for files.
* @param ms MetadataStore, used to make this a no-op, when it is
* NullMetadataStore.
* @param srcPaths stores the source path here
* @param dstMetas stores destination metadata here
* @param srcPath source path to store
* @param dstPath destination path to store
* @param size length of file moved
* @param blockSize blocksize to associate with destination file
* @param owner file owner to use in created records
* @param eTag the s3 object eTag of file moved
* @param versionId the s3 object versionId of file moved
*/
public static void addMoveFile(MetadataStore ms, Collection<Path> srcPaths,
Collection<PathMetadata> dstMetas, Path srcPath, Path dstPath,
long size, long blockSize, String owner, String eTag, String versionId) {
if (isNullMetadataStore(ms)) {
return;
}
assertQualified(srcPath, dstPath);
S3AFileStatus dstStatus = createUploadFileStatus(dstPath, false,
size, blockSize, owner, eTag, versionId);
addMoveStatus(srcPaths, dstMetas, srcPath, dstStatus);
}
/**
* Helper method that records the move of all ancestors of a path.
*
* In S3A, an optimization is to delete unnecessary fake directory objects if
* the directory is non-empty. In that case, when moving a nested child, S3A
* does not list, and thus does not move, all its ancestors (up to the source
* root). So we take care of those inferred directories of this path
* explicitly.
*
* Like {@link #addMoveFile} and {@link #addMoveDir}, this method adds resulting
* metadata to the supplied lists. It does not update the MetadataStore.
*
* @param ms MetadataStore, no-op if it is NullMetadataStore
* @param srcPaths stores the source path here
* @param dstMetas stores destination metadata here
* @param srcRoot source root up to which (exclusive) ancestors should be added
* @param srcPath source path of the child to add ancestors
* @param dstPath destination path of the child to add ancestors
* @param owner Hadoop user name
*/
public static void addMoveAncestors(MetadataStore ms,
Collection<Path> srcPaths, Collection<PathMetadata> dstMetas,
Path srcRoot, Path srcPath, Path dstPath, String owner) {
if (isNullMetadataStore(ms)) {
return;
}
assertQualified(srcRoot, srcPath, dstPath);
if (srcPath.equals(srcRoot)) {
LOG.debug("Skip moving ancestors of source root directory {}", srcRoot);
return;
}
Path parentSrc = srcPath.getParent();
Path parentDst = dstPath.getParent();
while (parentSrc != null
&& !parentSrc.isRoot()
&& !parentSrc.equals(srcRoot)
&& !srcPaths.contains(parentSrc)) {
LOG.debug("Renaming non-listed parent {} to {}", parentSrc, parentDst);
S3Guard.addMoveDir(ms, srcPaths, dstMetas, parentSrc, parentDst, owner);
parentSrc = parentSrc.getParent();
parentDst = parentDst.getParent();
}
}
/**
* This adds all new ancestors of a path as directories.
* This forwards to
* {@link MetadataStore#addAncestors(Path, BulkOperationState)}.
* <p>
* Originally it implemented the logic to probe for and add ancestors,
* but with the addition of a store-specific bulk operation state
* it became unworkable.
*
* @param metadataStore store
* @param qualifiedPath path to update
* @param timeProvider time provider
* @param operationState (nullable) operational state for a bulk update
* @throws IOException failure
*/
@Retries.RetryTranslated
public static void addAncestors(
final MetadataStore metadataStore,
final Path qualifiedPath,
final ITtlTimeProvider timeProvider,
@Nullable final BulkOperationState operationState) throws IOException {
metadataStore.addAncestors(qualifiedPath, operationState);
}
/**
* Add the fact that a file was moved from a source path to a destination.
* @param srcPaths collection of source paths to update
* @param dstMetas collection of destination meta data entries to update.
* @param srcPath path of the source file.
* @param dstStatus status of the source file after it was copied.
*/
private static void addMoveStatus(Collection<Path> srcPaths,
Collection<PathMetadata> dstMetas,
Path srcPath,
S3AFileStatus dstStatus) {
srcPaths.add(srcPath);
dstMetas.add(new PathMetadata(dstStatus));
}
/**
* Assert that the path is qualified with a host and scheme.
* @param p path to check
* @throws NullPointerException if the path is missing a host or scheme
*/
public static void assertQualified(Path p) {
URI uri = p.toUri();
// Paths must include bucket in case MetadataStore is shared between
// multiple S3AFileSystem instances
Preconditions.checkNotNull(uri.getHost(), "Null host in " + uri);
// This should never fail, but is retained for completeness.
Preconditions.checkNotNull(uri.getScheme(), "Null scheme in " + uri);
}
/**
* Assert that all paths are valid.
* @param paths paths to check
* @throws NullPointerException if any path is not fully qualified
*/
public static void assertQualified(Path...paths) {
for (Path path : paths) {
assertQualified(path);
}
}
/**
* Runtime implementation for TTL Time Provider interface.
*/
public static class TtlTimeProvider implements ITtlTimeProvider {
private long authoritativeDirTtl;
public TtlTimeProvider(long authoritativeDirTtl) {
this.authoritativeDirTtl = authoritativeDirTtl;
}
public TtlTimeProvider(Configuration conf) {
this.authoritativeDirTtl =
conf.getTimeDuration(METADATASTORE_METADATA_TTL,
DEFAULT_METADATASTORE_METADATA_TTL, TimeUnit.MILLISECONDS);
}
@Override
public long getNow() {
return System.currentTimeMillis();
}
@Override public long getMetadataTtl() {
return authoritativeDirTtl;
}
@Override
public boolean equals(final Object o) {
if (this == o) { return true; }
if (o == null || getClass() != o.getClass()) { return false; }
final TtlTimeProvider that = (TtlTimeProvider) o;
return authoritativeDirTtl == that.authoritativeDirTtl;
}
@Override
public int hashCode() {
return Objects.hash(authoritativeDirTtl);
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(
"TtlTimeProvider{");
sb.append("authoritativeDirTtl=").append(authoritativeDirTtl);
sb.append(" millis}");
return sb.toString();
}
}
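// Illustrative sketch: creating a time provider from a configuration and
// using it in a TTL-aware lookup. "conf", "ms" and "path" are placeholders.
//
//   ITtlTimeProvider ttl = new TtlTimeProvider(conf);
//   PathMetadata entry = getWithTtl(ms, path, ttl,
//       false /* needEmptyDirectoryFlag */,
//       false /* allowAuthoritative */);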
/**
* Put a directory entry, setting the updated timestamp of the
* directory and its children.
* @param ms metastore
* @param dirMeta directory
* @param unchangedEntries list of unchanged entries from the listing
* @param timeProvider time provider; must not be null here
* @param operationState nullable state for a bulk update
* @throws IOException failure.
*/
public static void putWithTtl(MetadataStore ms,
DirListingMetadata dirMeta,
final List<Path> unchangedEntries,
final ITtlTimeProvider timeProvider,
@Nullable final BulkOperationState operationState)
throws IOException {
long now = timeProvider.getNow();
dirMeta.setLastUpdated(now);
dirMeta.getListing()
.forEach(pm -> pm.setLastUpdated(now));
ms.put(dirMeta, unchangedEntries, operationState);
}
/**
* Put an entry, using the time provider to set its timestamp.
* @param ms metastore
* @param fileMeta entry to write
* @param timeProvider nullable time provider
* @param operationState nullable state for a bulk update
* @throws IOException failure.
*/
public static void putWithTtl(MetadataStore ms, PathMetadata fileMeta,
@Nullable ITtlTimeProvider timeProvider,
@Nullable final BulkOperationState operationState) throws IOException {
if (timeProvider != null) {
fileMeta.setLastUpdated(timeProvider.getNow());
} else {
LOG.debug("timeProvider is null, put {} without setting last_updated",
fileMeta);
}
ms.put(fileMeta, operationState);
}
/**
* Put entries, using the time provider to set their timestamp.
* @param ms metastore
* @param fileMetas file metadata entries.
* @param timeProvider nullable time provider
* @param operationState nullable state for a bulk update
* @throws IOException failure.
*/
public static void putWithTtl(MetadataStore ms,
Collection<? extends PathMetadata> fileMetas,
@Nullable ITtlTimeProvider timeProvider,
@Nullable final BulkOperationState operationState)
throws IOException {
patchLastUpdated(fileMetas, timeProvider);
ms.put(fileMetas, operationState);
}
/**
* Patch any collection of metadata entries with the timestamp
* of a time provider.
* This <i>MUST</i> be used when creating new entries for directories.
* @param fileMetas file metadata entries.
* @param timeProvider nullable time provider
*/
static void patchLastUpdated(
final Collection<? extends PathMetadata> fileMetas,
@Nullable final ITtlTimeProvider timeProvider) {
if (timeProvider != null) {
final long now = timeProvider.getNow();
fileMetas.forEach(fileMeta -> fileMeta.setLastUpdated(now));
} else {
LOG.debug("timeProvider is null, put {} without setting last_updated",
fileMetas);
}
}
/**
* Get a path entry provided it is not considered expired.
* If the allowAuthoritative flag is true, return without
* checking for TTL expiry.
* @param ms metastore
* @param path path to look up.
* @param timeProvider nullable time provider
* @param needEmptyDirectoryFlag if true, implementation will
* return known state of directory emptiness.
* @param allowAuthoritative if this flag is true, the TTL won't apply to the
* metadata - so it will be returned regardless of its expiry.
* @return the metadata, or null if there was no entry.
* @throws IOException failure.
*/
public static PathMetadata getWithTtl(MetadataStore ms, Path path,
@Nullable ITtlTimeProvider timeProvider,
final boolean needEmptyDirectoryFlag,
final boolean allowAuthoritative) throws IOException {
final PathMetadata pathMetadata = ms.get(path, needEmptyDirectoryFlag);
// if timeProvider is null let's return with what the ms has
if (timeProvider == null) {
LOG.debug("timeProvider is null, returning pathMetadata as is");
return pathMetadata;
}
// authoritative mode is enabled for this directory, return what the ms has
if (allowAuthoritative) {
LOG.debug("allowAuthoritative is true, returning pathMetadata as is");
return pathMetadata;
}
long ttl = timeProvider.getMetadataTtl();
if (pathMetadata != null) {
// Special case: the path metadata's last updated is 0. This can happen
// e.g. with an old db using this implementation
if (pathMetadata.getLastUpdated() == 0) {
LOG.debug("PathMetadata TTL for {} is 0, so it will be returned as "
+ "not expired.", path);
return pathMetadata;
}
if (!pathMetadata.isExpired(ttl, timeProvider.getNow())) {
return pathMetadata;
} else {
LOG.debug("PathMetadata TTl for {} is expired in metadata store"
+ " -removing entry", path);
// delete the tombstone
ms.forgetMetadata(path);
return null;
}
}
return null;
}
/**
* List children; mark the result as non-auth if the TTL has expired.
* If the allowAuthoritative flag is true, return without filtering or
* checking for TTL expiry.
* If false: the expiry scan takes place; expired entries are removed from
* the listing, and expired tombstones are purged from the store.
* TODO: should we always purge tombstones? Even in auth?
* @param ms metastore
* @param path path to look up.
* @param timeProvider nullable time provider
* @param allowAuthoritative if this flag is true, the TTL won't apply to the
* metadata - so it will be returned regardless of its expiry.
* @return the listing of entries under a path, or null if there was no entry.
* @throws IOException failure.
*/
@RetryTranslated
public static DirListingMetadata listChildrenWithTtl(MetadataStore ms,
Path path,
@Nullable ITtlTimeProvider timeProvider,
boolean allowAuthoritative)
throws IOException {
DirListingMetadata dlm = ms.listChildren(path);
if (timeProvider == null) {
LOG.debug("timeProvider is null, returning DirListingMetadata as is");
return dlm;
}
if (allowAuthoritative) {
LOG.debug("allowAuthoritative is true, returning pathMetadata as is");
return dlm;
}
// filter expired entries
if (dlm != null) {
List<PathMetadata> expired = dlm.removeExpiredEntriesFromListing(
timeProvider.getMetadataTtl(),
timeProvider.getNow());
// now purge the tombstones
for (PathMetadata metadata : expired) {
if (metadata.isDeleted()) {
ms.forgetMetadata(metadata.getFileStatus().getPath());
}
}
}
return dlm;
}
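/**
* Get the authoritative paths of a filesystem, qualified
* against that filesystem.
* @param fs filesystem
* @return possibly-empty list of authoritative path prefixes for this FS.
*/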
public static Collection<String> getAuthoritativePaths(S3AFileSystem fs) {
return getAuthoritativePaths(
fs.getUri(),
fs.getConf(),
p -> fs.maybeAddTrailingSlash(fs.qualify(p).toString()));
}
/**
* Get the authoritative paths of a filesystem.
*
* @param uri FS URI
* @param conf configuration
* @param qualifyToDir a qualification operation
* @return list of qualified path strings valid for this FS.
*/
@VisibleForTesting
static Collection<String> getAuthoritativePaths(
final URI uri,
final Configuration conf,
final Function<Path, String> qualifyToDir) {
String[] rawAuthoritativePaths =
conf.getTrimmedStrings(AUTHORITATIVE_PATH, DEFAULT_AUTHORITATIVE_PATH);
Collection<String> authoritativePaths = new ArrayList<>();
if (rawAuthoritativePaths.length > 0) {
for (int i = 0; i < rawAuthoritativePaths.length; i++) {
Path path = new Path(rawAuthoritativePaths[i]);
URI pathURI = path.toUri();
if (pathURI.getAuthority() != null &&
!pathURI.getAuthority().equals(uri.getAuthority())) {
// skip on auth
continue;
}
if (pathURI.getScheme() != null &&
!pathURI.getScheme().equals(uri.getScheme())) {
// skip on scheme mismatch
continue;
}
authoritativePaths.add(qualifyToDir.apply(path));
}
}
return authoritativePaths;
}
/**
* Is the path for the given FS instance authoritative?
* @param p path
* @param fs filesystem
* @param authMetadataStore is the MS authoritative.
* @param authPaths possibly empty list of authoritative paths
* @return true iff the path is authoritative
*/
public static boolean allowAuthoritative(Path p, S3AFileSystem fs,
boolean authMetadataStore, Collection<String> authPaths) {
String haystack = fs.maybeAddTrailingSlash(fs.qualify(p).toString());
if (authMetadataStore) {
return true;
}
if (!authPaths.isEmpty()) {
for (String needle : authPaths) {
if (haystack.startsWith(needle)) {
return true;
}
}
}
return false;
}
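// Illustrative sketch: how these two lookups are typically combined when
// deciding whether results for a path may be treated as authoritative.
// "fs", "path" and "authoritativeMetadataStore" are placeholders.
//
//   Collection<String> authPaths = getAuthoritativePaths(fs);
//   boolean auth = allowAuthoritative(path, fs,
//       authoritativeMetadataStore, authPaths);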
/**
* Format string to use when warning that S3Guard is disabled.
*/
@VisibleForTesting
public static final String DISABLED_LOG_MSG =
"S3Guard is disabled on this bucket: %s";
/**
* Error string used in the exception raised on an unknown log level.
*/
public static final String UNKNOWN_WARN_LEVEL =
"Unknown " + S3GUARD_DISABLED_WARN_LEVEL + " value: ";
/**
* Warning levels to use when reporting S3Guard as disabled.
*/
public enum DisabledWarnLevel {
SILENT,
INFORM,
WARN,
FAIL
}
/**
* Log that S3Guard is disabled -optionally raise an exception.
* @param logger Log to log to
* @param warnLevelStr string value of warn action.
* @param bucket bucket to use in log/error messages
* @throws ExitUtil.ExitException if s3guard was disabled
* and the log level is "fail"
* @throws IllegalArgumentException unknown warning level.
*/
public static void logS3GuardDisabled(Logger logger, String warnLevelStr,
String bucket)
throws ExitUtil.ExitException, IllegalArgumentException {
final DisabledWarnLevel warnLevel;
try {
warnLevel = DisabledWarnLevel.valueOf(warnLevelStr.toUpperCase(Locale.US));
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(UNKNOWN_WARN_LEVEL + warnLevelStr, e);
}
String text = String.format(DISABLED_LOG_MSG, bucket);
switch (warnLevel) {
case SILENT:
logger.debug(text);
break;
case INFORM:
logger.info(text);
break;
case WARN:
logger.warn(text);
break;
case FAIL:
logger.error(text);
throw new ExitUtil.ExitException(EXIT_BAD_CONFIGURATION, text);
default:
throw new IllegalArgumentException(UNKNOWN_WARN_LEVEL + warnLevelStr);
}
}
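// Illustrative sketch: reporting that S3Guard is disabled on a bucket. The
// warn level string would normally be read from configuration under
// S3GUARD_DISABLED_WARN_LEVEL; the literal "inform" and the bucket name are
// placeholders.
//
//   logS3GuardDisabled(LOG, "inform", "example-bucket");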
}