blob: 9267ab403b8c3a7819218ad6040a2a908d8a8867 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.s3guard;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.Tristate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
/**
* This is a local, in-memory, implementation of MetadataStore.
* This is <i>not</i> a coherent cache across processes. It is only
* locally-coherent.
*
* The purpose of this is for unit and integration testing.
* It could also be used to accelerate local-only operations where only one
* process is operating on a given object store, or multiple processes are
* accessing a read-only storage bucket.
*
 * This MetadataStore does not enforce filesystem rules such as disallowing
 * non-recursive removal of non-empty directories. It is assumed the caller
 * has already performed these sorts of checks.
*/
public class LocalMetadataStore implements MetadataStore {
public static final Logger LOG = LoggerFactory.getLogger(MetadataStore.class);
// TODO HADOOP-13649: use time instead of capacity for eviction.
public static final int DEFAULT_MAX_RECORDS = 128;
/**
* Maximum number of records.
*/
public static final String CONF_MAX_RECORDS =
"fs.metadatastore.local.max_records";
/** Contains directories and files. */
private LruHashMap<Path, PathMetadata> fileHash;
/** Contains directory listings. */
private LruHashMap<Path, DirListingMetadata> dirHash;
private FileSystem fs;
/* Null iff this FS does not have an associated URI host. */
private String uriHost;
@Override
public void initialize(FileSystem fileSystem) throws IOException {
Preconditions.checkNotNull(fileSystem);
fs = fileSystem;
URI fsURI = fs.getUri();
uriHost = fsURI.getHost();
if (uriHost != null && uriHost.equals("")) {
uriHost = null;
}
initialize(fs.getConf());
}
@Override
public void initialize(Configuration conf) throws IOException {
Preconditions.checkNotNull(conf);
int maxRecords = conf.getInt(CONF_MAX_RECORDS, DEFAULT_MAX_RECORDS);
if (maxRecords < 4) {
maxRecords = 4;
}
// Start w/ less than max capacity. Space / time trade off.
fileHash = new LruHashMap<>(maxRecords/2, maxRecords);
dirHash = new LruHashMap<>(maxRecords/4, maxRecords);
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(
"LocalMetadataStore{");
sb.append("uriHost='").append(uriHost).append('\'');
sb.append('}');
return sb.toString();
}
@Override
public void delete(Path p) throws IOException {
doDelete(p, false, true);
}
@Override
public void forgetMetadata(Path p) throws IOException {
doDelete(p, false, false);
}
@Override
public void deleteSubtree(Path path) throws IOException {
doDelete(path, true, true);
}
private synchronized void doDelete(Path p, boolean recursive, boolean
tombstone) {
Path path = standardize(p);
// Delete entry from file cache, then from cached parent directory, if any
deleteHashEntries(path, tombstone);
if (recursive) {
// Remove all entries that have this dir as path prefix.
deleteHashByAncestor(path, dirHash, tombstone);
deleteHashByAncestor(path, fileHash, tombstone);
}
}
@Override
public synchronized PathMetadata get(Path p) throws IOException {
return get(p, false);
}
@Override
public PathMetadata get(Path p, boolean wantEmptyDirectoryFlag)
throws IOException {
Path path = standardize(p);
synchronized (this) {
PathMetadata m = fileHash.mruGet(path);
if (wantEmptyDirectoryFlag && m != null &&
m.getFileStatus().isDirectory()) {
m.setIsEmptyDirectory(isEmptyDirectory(p));
}
if (LOG.isDebugEnabled()) {
LOG.debug("get({}) -> {}", path, m == null ? "null" : m.prettyPrint());
}
return m;
}
}
/**
* Determine if directory is empty.
* Call with lock held.
* @param p a Path, already filtered through standardize()
* @return TRUE / FALSE if known empty / not-empty, UNKNOWN otherwise.
*/
private Tristate isEmptyDirectory(Path p) {
DirListingMetadata dirMeta = dirHash.get(p);
return dirMeta.withoutTombstones().isEmpty();
}
@Override
public synchronized DirListingMetadata listChildren(Path p) throws
IOException {
Path path = standardize(p);
DirListingMetadata listing = dirHash.mruGet(path);
if (LOG.isDebugEnabled()) {
LOG.debug("listChildren({}) -> {}", path,
listing == null ? "null" : listing.prettyPrint());
}
// Make a copy so callers can mutate without affecting our state
return listing == null ? null : new DirListingMetadata(listing);
}
@Override
public void move(Collection<Path> pathsToDelete,
Collection<PathMetadata> pathsToCreate) throws IOException {
Preconditions.checkNotNull(pathsToDelete, "pathsToDelete is null");
Preconditions.checkNotNull(pathsToCreate, "pathsToCreate is null");
Preconditions.checkArgument(pathsToDelete.size() == pathsToCreate.size(),
"Must supply same number of paths to delete/create.");
// I feel dirty for using reentrant lock. :-|
synchronized (this) {
// 1. Delete pathsToDelete
for (Path meta : pathsToDelete) {
LOG.debug("move: deleting metadata {}", meta);
delete(meta);
}
// 2. Create new destination path metadata
for (PathMetadata meta : pathsToCreate) {
LOG.debug("move: adding metadata {}", meta);
put(meta);
}
// 3. We now know full contents of all dirs in destination subtree
for (PathMetadata meta : pathsToCreate) {
FileStatus status = meta.getFileStatus();
if (status == null || status.isDirectory()) {
continue;
}
DirListingMetadata dir = listChildren(status.getPath());
if (dir != null) { // could be evicted already
dir.setAuthoritative(true);
}
}
}
}
@Override
public void put(PathMetadata meta) throws IOException {
Preconditions.checkNotNull(meta);
FileStatus status = meta.getFileStatus();
Path path = standardize(status.getPath());
synchronized (this) {
/* Add entry for this file. */
if (LOG.isDebugEnabled()) {
LOG.debug("put {} -> {}", path, meta.prettyPrint());
}
fileHash.put(path, meta);
/* Directory case:
* We also make sure we have an entry in the dirHash, so subsequent
* listStatus(path) at least see the directory.
*
* If we had a boolean flag argument "isNew", we would know whether this
* is an existing directory the client discovered via getFileStatus(),
* or if it is a newly-created directory. In the latter case, we would
* be able to mark the directory as authoritative (fully-cached),
* saving round trips to underlying store for subsequent listStatus()
*/
if (status.isDirectory()) {
DirListingMetadata dir = dirHash.mruGet(path);
if (dir == null) {
dirHash.put(path, new DirListingMetadata(path, DirListingMetadata
.EMPTY_DIR, false));
}
}
/* Update cached parent dir. */
Path parentPath = path.getParent();
if (parentPath != null) {
DirListingMetadata parent = dirHash.mruGet(parentPath);
if (parent == null) {
/* Track this new file's listing in parent. Parent is not
* authoritative, since there may be other items in it we don't know
* about. */
parent = new DirListingMetadata(parentPath,
DirListingMetadata.EMPTY_DIR, false);
dirHash.put(parentPath, parent);
}
parent.put(status);
}
}
}
@Override
public synchronized void put(DirListingMetadata meta) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("put dirMeta {}", meta.prettyPrint());
}
dirHash.put(standardize(meta.getPath()), meta);
}
public synchronized void put(Collection<PathMetadata> metas) throws
IOException {
for (PathMetadata meta : metas) {
put(meta);
}
}
@Override
public void close() throws IOException {
}
@Override
public void destroy() throws IOException {
if (dirHash != null) {
dirHash.clear();
}
}
@Override
public synchronized void prune(long modTime) throws IOException {
Iterator<Map.Entry<Path, PathMetadata>> files =
fileHash.entrySet().iterator();
while (files.hasNext()) {
Map.Entry<Path, PathMetadata> entry = files.next();
if (expired(entry.getValue().getFileStatus(), modTime)) {
files.remove();
}
}
Iterator<Map.Entry<Path, DirListingMetadata>> dirs =
dirHash.entrySet().iterator();
while (dirs.hasNext()) {
Map.Entry<Path, DirListingMetadata> entry = dirs.next();
Path path = entry.getKey();
DirListingMetadata metadata = entry.getValue();
Collection<PathMetadata> oldChildren = metadata.getListing();
Collection<PathMetadata> newChildren = new LinkedList<>();
for (PathMetadata child : oldChildren) {
FileStatus status = child.getFileStatus();
if (!expired(status, modTime)) {
newChildren.add(child);
}
}
if (newChildren.size() != oldChildren.size()) {
dirHash.put(path, new DirListingMetadata(path, newChildren, false));
if (!path.isRoot()) {
DirListingMetadata parent = dirHash.get(path.getParent());
if (parent != null) {
parent.setAuthoritative(false);
}
}
}
}
}
private boolean expired(FileStatus status, long expiry) {
// Note: S3 doesn't track modification time on directories, so for
// consistency with the DynamoDB implementation we ignore that here
return status.getModificationTime() < expiry && !status.isDirectory();
}
@VisibleForTesting
static <T> void deleteHashByAncestor(Path ancestor, Map<Path, T> hash,
boolean tombstone) {
for (Iterator<Map.Entry<Path, T>> it = hash.entrySet().iterator();
it.hasNext();) {
Map.Entry<Path, T> entry = it.next();
Path f = entry.getKey();
T meta = entry.getValue();
if (isAncestorOf(ancestor, f)) {
if (tombstone) {
if (meta instanceof PathMetadata) {
entry.setValue((T) PathMetadata.tombstone(f));
} else if (meta instanceof DirListingMetadata) {
it.remove();
} else {
throw new IllegalStateException("Unknown type in hash");
}
} else {
it.remove();
}
}
}
}
/**
* @return true iff 'ancestor' is ancestor dir in path 'f'.
* All paths here are absolute. Dir does not count as its own ancestor.
*/
private static boolean isAncestorOf(Path ancestor, Path f) {
String aStr = ancestor.toString();
if (!ancestor.isRoot()) {
aStr += "/";
}
String fStr = f.toString();
return (fStr.startsWith(aStr));
}
/**
* Update fileHash and dirHash to reflect deletion of file 'f'. Call with
* lock held.
*/
private void deleteHashEntries(Path path, boolean tombstone) {
// Remove target file/dir
LOG.debug("delete file entry for {}", path);
if (tombstone) {
fileHash.put(path, PathMetadata.tombstone(path));
} else {
fileHash.remove(path);
}
// Update this and parent dir listing, if any
/* If this path is a dir, remove its listing */
LOG.debug("removing listing of {}", path);
dirHash.remove(path);
/* Remove this path from parent's dir listing */
Path parent = path.getParent();
if (parent != null) {
DirListingMetadata dir = dirHash.get(parent);
if (dir != null) {
LOG.debug("removing parent's entry for {} ", path);
if (tombstone) {
dir.markDeleted(path);
} else {
dir.remove(path);
}
}
}
}
/**
* Return a "standardized" version of a path so we always have a consistent
* hash value. Also asserts the path is absolute, and contains host
* component.
* @param p input Path
* @return standardized version of Path, suitable for hash key
*/
private Path standardize(Path p) {
Preconditions.checkArgument(p.isAbsolute(), "Path must be absolute");
URI uri = p.toUri();
if (uriHost != null) {
Preconditions.checkArgument(StringUtils.isNotEmpty(uri.getHost()));
}
return p;
}
@Override
public Map<String, String> getDiagnostics() throws IOException {
Map<String, String> map = new HashMap<>();
map.put("name", "local://metadata");
map.put("uriHost", uriHost);
map.put("description", "Local in-VM metadata store for testing");
return map;
}
@Override
public void updateParameters(Map<String, String> parameters)
throws IOException {
}
}