| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.fs.s3a.s3guard; |
| |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.net.URI; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.Set; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.classification.InterfaceStability; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.s3a.S3AFileStatus; |
| import org.apache.hadoop.fs.s3a.S3AInstrumentation; |
| import org.apache.hadoop.fs.s3a.Tristate; |
| import org.apache.hadoop.util.ReflectionUtils; |
| |
| import static org.apache.hadoop.fs.s3a.Constants.S3_METADATA_STORE_IMPL; |
| import static org.apache.hadoop.fs.s3a.Statistic.S3GUARD_METADATASTORE_PUT_PATH_LATENCY; |
| import static org.apache.hadoop.fs.s3a.Statistic.S3GUARD_METADATASTORE_PUT_PATH_REQUEST; |
| import static org.apache.hadoop.fs.s3a.S3AUtils.createUploadFileStatus; |
| |
| /** |
| * Logic for integrating MetadataStore with S3A. |
| */ |
| @InterfaceAudience.Private |
| @InterfaceStability.Unstable |
| public final class S3Guard { |
| private static final Logger LOG = LoggerFactory.getLogger(S3Guard.class); |
| |
| @InterfaceAudience.Private |
| @InterfaceStability.Unstable |
| @VisibleForTesting |
| public static final String S3GUARD_DDB_CLIENT_FACTORY_IMPL = |
| "fs.s3a.s3guard.ddb.client.factory.impl"; |
| |
| static final Class<? extends DynamoDBClientFactory> |
| S3GUARD_DDB_CLIENT_FACTORY_IMPL_DEFAULT = |
| DynamoDBClientFactory.DefaultDynamoDBClientFactory.class; |
| private static final FileStatus[] EMPTY_LISTING = new FileStatus[0]; |
| |
| // Utility class. All static functions. |
| private S3Guard() { } |
| |
| /* Utility functions. */ |
| |
| /** |
| * Create a new instance of the configured MetadataStore. |
| * The returned MetadataStore will have been initialized via |
| * {@link MetadataStore#initialize(FileSystem)} by this function before |
| * returning it. Callers must clean up by calling |
| * {@link MetadataStore#close()} when done using the MetadataStore. |
| * |
| * @param fs FileSystem whose Configuration specifies which |
| * implementation to use. |
| * @return Reference to new MetadataStore. |
| * @throws IOException if the metadata store cannot be instantiated |
| */ |
| public static MetadataStore getMetadataStore(FileSystem fs) |
| throws IOException { |
| Preconditions.checkNotNull(fs); |
| Configuration conf = fs.getConf(); |
| Preconditions.checkNotNull(conf); |
| MetadataStore msInstance; |
| try { |
| Class<? extends MetadataStore> msClass = getMetadataStoreClass(conf); |
| msInstance = ReflectionUtils.newInstance(msClass, conf); |
| LOG.debug("Using {} metadata store for {} filesystem", |
| msClass.getSimpleName(), fs.getScheme()); |
| msInstance.initialize(fs); |
| return msInstance; |
| } catch (FileNotFoundException e) { |
| // Don't log this exception as it means the table doesn't exist yet; |
| // rely on callers to catch and treat specially |
| throw e; |
| } catch (RuntimeException | IOException e) { |
| String message = "Failed to instantiate metadata store " + |
| conf.get(S3_METADATA_STORE_IMPL) |
| + " defined in " + S3_METADATA_STORE_IMPL |
| + ": " + e; |
| LOG.error(message, e); |
| if (e instanceof IOException) { |
| throw e; |
| } else { |
| throw new IOException(message, e); |
| } |
| } |
| } |
| |
| static Class<? extends MetadataStore> getMetadataStoreClass( |
| Configuration conf) { |
| if (conf == null) { |
| return NullMetadataStore.class; |
| } |
| if (conf.get(S3_METADATA_STORE_IMPL) != null && LOG.isDebugEnabled()) { |
| LOG.debug("Metastore option source {}", |
| conf.getPropertySources(S3_METADATA_STORE_IMPL)); |
| } |
| |
| Class<? extends MetadataStore> aClass = conf.getClass( |
| S3_METADATA_STORE_IMPL, NullMetadataStore.class, |
| MetadataStore.class); |
| return aClass; |
| } |
| |
| |
| /** |
| * Helper function which puts a given S3AFileStatus into the MetadataStore and |
| * returns the same S3AFileStatus. Instrumentation monitors the put operation. |
| * @param ms MetadataStore to {@code put()} into. |
| * @param status status to store |
| * @param instrumentation instrumentation of the s3a file system |
| * @return The same status as passed in |
| * @throws IOException if metadata store update failed |
| */ |
| public static S3AFileStatus putAndReturn(MetadataStore ms, |
| S3AFileStatus status, |
| S3AInstrumentation instrumentation) throws IOException { |
| long startTimeNano = System.nanoTime(); |
| ms.put(new PathMetadata(status)); |
| instrumentation.addValueToQuantiles(S3GUARD_METADATASTORE_PUT_PATH_LATENCY, |
| (System.nanoTime() - startTimeNano)); |
| instrumentation.incrementCounter(S3GUARD_METADATASTORE_PUT_PATH_REQUEST, 1); |
| return status; |
| } |
| |
| /** |
| * Convert the data of a directory listing to an array of {@link FileStatus} |
| * entries. Tombstones are filtered out at this point. If the listing is null |
| * an empty array is returned. |
| * @param dirMeta directory listing -may be null |
| * @return a possibly-empty array of file status entries |
| */ |
| public static FileStatus[] dirMetaToStatuses(DirListingMetadata dirMeta) { |
| if (dirMeta == null) { |
| return EMPTY_LISTING; |
| } |
| |
| Collection<PathMetadata> listing = dirMeta.getListing(); |
| List<FileStatus> statuses = new ArrayList<>(); |
| |
| for (PathMetadata pm : listing) { |
| if (!pm.isDeleted()) { |
| statuses.add(pm.getFileStatus()); |
| } |
| } |
| |
| return statuses.toArray(new FileStatus[0]); |
| } |
| |
| /** |
| * Given directory listing metadata from both the backing store and the |
| * MetadataStore, merge the two sources of truth to create a consistent |
| * view of the current directory contents, which can be returned to clients. |
| * |
| * Also update the MetadataStore to reflect the resulting directory listing. |
| * |
| * @param ms MetadataStore to use. |
| * @param path path to directory |
| * @param backingStatuses Directory listing from the backing store. |
| * @param dirMeta Directory listing from MetadataStore. May be null. |
| * @param isAuthoritative State of authoritative mode |
| * @return Final result of directory listing. |
| * @throws IOException if metadata store update failed |
| */ |
| public static FileStatus[] dirListingUnion(MetadataStore ms, Path path, |
| List<FileStatus> backingStatuses, DirListingMetadata dirMeta, |
| boolean isAuthoritative) throws IOException { |
| |
| // Fast-path for NullMetadataStore |
| if (isNullMetadataStore(ms)) { |
| return backingStatuses.toArray(new FileStatus[backingStatuses.size()]); |
| } |
| |
| assertQualified(path); |
| |
| if (dirMeta == null) { |
| // The metadataStore had zero state for this directory |
| dirMeta = new DirListingMetadata(path, DirListingMetadata.EMPTY_DIR, |
| false); |
| } |
| |
| Set<Path> deleted = dirMeta.listTombstones(); |
| |
| // Since we treat the MetadataStore as a "fresher" or "consistent" view |
| // of metadata, we always use its metadata first. |
| |
| // Since the authoritative case is already handled outside this function, |
| // we will basically start with the set of directory entries in the |
| // DirListingMetadata, and add any that only exist in the backingStatuses. |
| |
| boolean changed = false; |
| for (FileStatus s : backingStatuses) { |
| if (deleted.contains(s.getPath())) { |
| continue; |
| } |
| |
| // Minor race condition here. Multiple threads could add to this |
| // mutable DirListingMetadata. Since it is backed by a |
| // ConcurrentHashMap, the last put() wins. |
| // More concerning is two threads racing on listStatus() and delete(). |
| // Any FileSystem has similar race conditions, but we could persist |
| // a stale entry longer. We could expose an atomic |
| // DirListingMetadata#putIfNotPresent() |
| boolean updated = dirMeta.put(s); |
| changed = changed || updated; |
| } |
| |
| if (changed && isAuthoritative) { |
| dirMeta.setAuthoritative(true); // This is the full directory contents |
| ms.put(dirMeta); |
| } |
| |
| return dirMetaToStatuses(dirMeta); |
| } |
| |
| /** |
| * Although NullMetadataStore does nothing, callers may wish to avoid work |
| * (fast path) when the NullMetadataStore is in use. |
| * @param ms The MetadataStore to test |
| * @return true iff the MetadataStore is the null, or no-op, implementation. |
| */ |
| public static boolean isNullMetadataStore(MetadataStore ms) { |
| return (ms instanceof NullMetadataStore); |
| } |
| |
| /** |
| * Update MetadataStore to reflect creation of the given directories. |
| * |
| * If an IOException is raised while trying to update the entry, this |
| * operation catches the exception and returns. |
| * @param ms MetadataStore to update. |
| * @param dirs null, or an ordered list of directories from leaf to root. |
| * E.g. if /a/ exists, and mkdirs(/a/b/c/d) is called, this |
| * list will contain [/a/b/c/d, /a/b/c, /a/b]. /a/b/c/d is |
| * an empty, dir, and the other dirs only contain their child |
| * dir. |
| * @param owner Hadoop user name. |
| * @param authoritative Whether to mark new directories as authoritative. |
| */ |
| public static void makeDirsOrdered(MetadataStore ms, List<Path> dirs, |
| String owner, boolean authoritative) { |
| if (dirs == null) { |
| return; |
| } |
| |
| /* We discussed atomicity of this implementation. |
| * The concern is that multiple clients could race to write different |
| * cached directories to the MetadataStore. Two solutions are proposed: |
| * 1. Move mkdirs() into MetadataStore interface and let implementations |
| * ensure they are atomic. |
| * 2. Specify that the semantics of MetadataStore#putListStatus() is |
| * always additive, That is, if MetadataStore has listStatus() state |
| * for /a/b that contains [/a/b/file0, /a/b/file1], and we then call |
| * putListStatus(/a/b -> [/a/b/file2, /a/b/file3], isAuthoritative=true), |
| * then we will end up with final state of |
| * [/a/b/file0, /a/b/file1, /a/b/file2, /a/b/file3], isAuthoritative = |
| * true |
| */ |
| FileStatus prevStatus = null; |
| |
| // Use new batched put to reduce round trips. |
| List<PathMetadata> pathMetas = new ArrayList<>(dirs.size()); |
| |
| try { |
| // Iterate from leaf to root |
| for (int i = 0; i < dirs.size(); i++) { |
| boolean isLeaf = (prevStatus == null); |
| Path f = dirs.get(i); |
| assertQualified(f); |
| FileStatus status = |
| createUploadFileStatus(f, true, 0, 0, owner); |
| |
| // We only need to put a DirListingMetadata if we are setting |
| // authoritative bit |
| DirListingMetadata dirMeta = null; |
| if (authoritative) { |
| Collection<PathMetadata> children; |
| if (isLeaf) { |
| children = DirListingMetadata.EMPTY_DIR; |
| } else { |
| children = new ArrayList<>(1); |
| children.add(new PathMetadata(prevStatus)); |
| } |
| dirMeta = new DirListingMetadata(f, children, authoritative); |
| ms.put(dirMeta); |
| } |
| |
| pathMetas.add(new PathMetadata(status)); |
| prevStatus = status; |
| } |
| |
| // Batched put |
| ms.put(pathMetas); |
| } catch (IOException ioe) { |
| LOG.error("MetadataStore#put() failure:", ioe); |
| } |
| } |
| |
| /** |
| * Helper function that records the move of directory paths, adding |
| * resulting metadata to the supplied lists. |
| * Does not store in MetadataStore. |
| * @param ms MetadataStore, used to make this a no-op, when it is |
| * NullMetadataStore. |
| * @param srcPaths stores the source path here |
| * @param dstMetas stores destination metadata here |
| * @param srcPath source path to store |
| * @param dstPath destination path to store |
| * @param owner file owner to use in created records |
| */ |
| public static void addMoveDir(MetadataStore ms, Collection<Path> srcPaths, |
| Collection<PathMetadata> dstMetas, Path srcPath, Path dstPath, |
| String owner) { |
| if (isNullMetadataStore(ms)) { |
| return; |
| } |
| assertQualified(srcPath, dstPath); |
| |
| FileStatus dstStatus = createUploadFileStatus(dstPath, true, 0, 0, owner); |
| addMoveStatus(srcPaths, dstMetas, srcPath, dstStatus); |
| } |
| |
| /** |
| * Like {@link #addMoveDir(MetadataStore, Collection, Collection, Path, |
| * Path, String)} (), but for files. |
| * @param ms MetadataStore, used to make this a no-op, when it is |
| * NullMetadataStore. |
| * @param srcPaths stores the source path here |
| * @param dstMetas stores destination metadata here |
| * @param srcPath source path to store |
| * @param dstPath destination path to store |
| * @param size length of file moved |
| * @param blockSize blocksize to associate with destination file |
| * @param owner file owner to use in created records |
| */ |
| public static void addMoveFile(MetadataStore ms, Collection<Path> srcPaths, |
| Collection<PathMetadata> dstMetas, Path srcPath, Path dstPath, |
| long size, long blockSize, String owner) { |
| if (isNullMetadataStore(ms)) { |
| return; |
| } |
| assertQualified(srcPath, dstPath); |
| FileStatus dstStatus = createUploadFileStatus(dstPath, false, |
| size, blockSize, owner); |
| addMoveStatus(srcPaths, dstMetas, srcPath, dstStatus); |
| } |
| |
| /** |
| * Helper method that records the move of all ancestors of a path. |
| * |
| * In S3A, an optimization is to delete unnecessary fake directory objects if |
| * the directory is non-empty. In that case, for a nested child to move, S3A |
| * is not listing and thus moving all its ancestors (up to source root). So we |
| * take care of those inferred directories of this path explicitly. |
| * |
| * As {@link #addMoveFile} and {@link #addMoveDir}, this method adds resulting |
| * metadata to the supplied lists. It does not store in MetadataStore. |
| * |
| * @param ms MetadataStore, no-op if it is NullMetadataStore |
| * @param srcPaths stores the source path here |
| * @param dstMetas stores destination metadata here |
| * @param srcRoot source root up to which (exclusive) should we add ancestors |
| * @param srcPath source path of the child to add ancestors |
| * @param dstPath destination path of the child to add ancestors |
| * @param owner Hadoop user name |
| */ |
| public static void addMoveAncestors(MetadataStore ms, |
| Collection<Path> srcPaths, Collection<PathMetadata> dstMetas, |
| Path srcRoot, Path srcPath, Path dstPath, String owner) { |
| if (isNullMetadataStore(ms)) { |
| return; |
| } |
| |
| assertQualified(srcRoot, srcPath, dstPath); |
| |
| if (srcPath.equals(srcRoot)) { |
| LOG.debug("Skip moving ancestors of source root directory {}", srcRoot); |
| return; |
| } |
| |
| Path parentSrc = srcPath.getParent(); |
| Path parentDst = dstPath.getParent(); |
| while (parentSrc != null |
| && !parentSrc.isRoot() |
| && !parentSrc.equals(srcRoot) |
| && !srcPaths.contains(parentSrc)) { |
| LOG.debug("Renaming non-listed parent {} to {}", parentSrc, parentDst); |
| S3Guard.addMoveDir(ms, srcPaths, dstMetas, parentSrc, parentDst, owner); |
| parentSrc = parentSrc.getParent(); |
| parentDst = parentDst.getParent(); |
| } |
| } |
| |
| public static void addAncestors(MetadataStore metadataStore, |
| Path qualifiedPath, String username) throws IOException { |
| Collection<PathMetadata> newDirs = new ArrayList<>(); |
| Path parent = qualifiedPath.getParent(); |
| while (!parent.isRoot()) { |
| PathMetadata directory = metadataStore.get(parent); |
| if (directory == null || directory.isDeleted()) { |
| FileStatus status = new FileStatus(0, true, 1, 0, 0, 0, null, username, |
| null, parent); |
| PathMetadata meta = new PathMetadata(status, Tristate.FALSE, false); |
| newDirs.add(meta); |
| } else { |
| break; |
| } |
| parent = parent.getParent(); |
| } |
| metadataStore.put(newDirs); |
| } |
| |
| private static void addMoveStatus(Collection<Path> srcPaths, |
| Collection<PathMetadata> dstMetas, |
| Path srcPath, |
| FileStatus dstStatus) { |
| srcPaths.add(srcPath); |
| dstMetas.add(new PathMetadata(dstStatus)); |
| } |
| |
| /** |
| * Assert that the path is qualified with a host and scheme. |
| * @param p path to check |
| * @throws NullPointerException if either argument does not hold |
| */ |
| public static void assertQualified(Path p) { |
| URI uri = p.toUri(); |
| // Paths must include bucket in case MetadataStore is shared between |
| // multiple S3AFileSystem instances |
| Preconditions.checkNotNull(uri.getHost(), "Null host in " + uri); |
| |
| // This should never fail, but is retained for completeness. |
| Preconditions.checkNotNull(uri.getScheme(), "Null scheme in " + uri); |
| } |
| |
| /** |
| * Assert that all paths are valid. |
| * @param paths path to check |
| * @throws NullPointerException if either argument does not hold |
| */ |
| public static void assertQualified(Path...paths) { |
| for (Path path : paths) { |
| assertQualified(path); |
| } |
| } |
| } |