/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.s3guard;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.TimeUnit;
import com.amazonaws.AmazonClientException;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.document.BatchWriteItemOutcome;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.ItemCollection;
import com.amazonaws.services.dynamodbv2.document.PrimaryKey;
import com.amazonaws.services.dynamodbv2.document.PutItemOutcome;
import com.amazonaws.services.dynamodbv2.document.QueryOutcome;
import com.amazonaws.services.dynamodbv2.document.ScanOutcome;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.dynamodbv2.document.TableWriteItems;
import com.amazonaws.services.dynamodbv2.document.spec.GetItemSpec;
import com.amazonaws.services.dynamodbv2.document.spec.QuerySpec;
import com.amazonaws.services.dynamodbv2.document.utils.ValueMap;
import com.amazonaws.services.dynamodbv2.model.CreateTableRequest;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput;
import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughputDescription;
import com.amazonaws.services.dynamodbv2.model.ResourceInUseException;
import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException;
import com.amazonaws.services.dynamodbv2.model.TableDescription;
import com.amazonaws.services.dynamodbv2.model.WriteRequest;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3AInstrumentation;
import org.apache.hadoop.fs.s3a.Tristate;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.S3AUtils.translateException;
import static org.apache.hadoop.fs.s3a.s3guard.PathMetadataDynamoDBTranslation.*;
import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.*;
/**
* DynamoDBMetadataStore is a {@link MetadataStore} that persists
* file system metadata to DynamoDB.
*
* The current implementation uses a schema consisting of a single table. The
* name of the table can be configured by config key
* {@link org.apache.hadoop.fs.s3a.Constants#S3GUARD_DDB_TABLE_NAME_KEY}.
* By default, it matches the name of the S3 bucket. Each item in the table
* represents a single directory or file. Its path is split into separate table
* attributes:
* <ul>
* <li> parent (absolute path of the parent, with bucket name inserted as
* first path component). </li>
* <li> child (path of that specific child, relative to parent). </li>
* <li> optional boolean attribute tracking whether the path is a directory.
* Absence or a false value indicates the path is a file. </li>
* <li> optional long attribute revealing modification time of file.
* This attribute is meaningful only to file items.</li>
* <li> optional long attribute revealing file length.
* This attribute is meaningful only to file items.</li>
* <li> optional long attribute revealing block size of the file.
* This attribute is meaningful only to file items.</li>
* </ul>
*
* The DynamoDB partition key is the parent, and the range key is the child.
*
* To allow multiple buckets to share the same DynamoDB table, the bucket
* name is treated as the root directory.
*
* For example, assume the consistent store contains metadata representing this
* file system structure:
*
* <pre>
* s3a://bucket/dir1
* |-- dir2
* | |-- file1
* | `-- file2
* `-- dir3
* |-- dir4
* | `-- file3
* |-- dir5
* | `-- file4
* `-- dir6
* </pre>
*
* This is persisted to a single DynamoDB table as:
*
* <pre>
* =========================================================================
* | parent | child | is_dir | mod_time | len | ... |
* =========================================================================
* | /bucket | dir1 | true | | | |
* | /bucket/dir1 | dir2 | true | | | |
* | /bucket/dir1 | dir3 | true | | | |
* | /bucket/dir1/dir2 | file1 | | 100 | 111 | |
* | /bucket/dir1/dir2 | file2 | | 200 | 222 | |
* | /bucket/dir1/dir3 | dir4 | true | | | |
* | /bucket/dir1/dir3 | dir5 | true | | | |
* | /bucket/dir1/dir3/dir4 | file3 | | 300 | 333 | |
* | /bucket/dir1/dir3/dir5 | file4 | | 400 | 444 | |
* | /bucket/dir1/dir3 | dir6 | true | | | |
* =========================================================================
* </pre>
*
* This choice of schema is efficient for read access patterns.
* {@link #get(Path)} can be served from a single item lookup.
* {@link #listChildren(Path)} can be served from a query against all rows
* matching the parent (the partition key) and the returned list is guaranteed
* to be sorted by child (the range key). Tracking whether or not a path is a
* directory helps prevent unnecessary queries during traversal of an entire
* sub-tree.
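*
* For example, {@code get(s3a://bucket/dir1/dir2/file1)} is served by a
* single item lookup with parent {@code /bucket/dir1/dir2} and child
* {@code file1}.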
*
* Some mutating operations, notably {@link #deleteSubtree(Path)} and
* {@link #move(Collection, Collection)}, are less efficient with this schema.
* They require mutating multiple items in the DynamoDB table.
*
* By default, DynamoDB access is performed within the same AWS region as
* the S3 bucket that hosts the S3A instance. During initialization, it checks
* the location of the S3 bucket and creates a DynamoDB client connected to the
* same region. The region may also be set explicitly by setting the config
* parameter {@code fs.s3a.s3guard.ddb.region} to the corresponding region.
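*
* For example, to pin the store to a region in {@code core-site.xml}
* (the region value shown is purely illustrative):
* <pre>
*   &lt;property&gt;
*     &lt;name&gt;fs.s3a.s3guard.ddb.region&lt;/name&gt;
*     &lt;value&gt;us-west-2&lt;/value&gt;
*   &lt;/property&gt;
* </pre>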
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class DynamoDBMetadataStore implements MetadataStore {
public static final Logger LOG = LoggerFactory.getLogger(
DynamoDBMetadataStore.class);
/** parent/child name to use in the version marker. */
public static final String VERSION_MARKER = "../VERSION";
/** Current version number. */
public static final int VERSION = 100;
/** Error: version marker not found in table. */
public static final String E_NO_VERSION_MARKER
= "S3Guard table lacks version marker.";
/** Error: version mismatch. */
public static final String E_INCOMPATIBLE_VERSION
= "Database table is from an incompatible S3Guard version.";
/** Initial delay for retries when batched operations get throttled by
* DynamoDB. Value is {@value} msec. */
public static final long MIN_RETRY_SLEEP_MSEC = 100;
@VisibleForTesting
static final String DESCRIPTION
= "S3Guard metadata store in DynamoDB";
@VisibleForTesting
static final String READ_CAPACITY = "read-capacity";
@VisibleForTesting
static final String WRITE_CAPACITY = "write-capacity";
@VisibleForTesting
static final String STATUS = "status";
@VisibleForTesting
static final String TABLE = "table";
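/** Value map used by queries to filter out tombstoned (deleted) items. */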
private static ValueMap deleteTrackingValueMap =
new ValueMap().withBoolean(":false", false);
private DynamoDB dynamoDB;
private String region;
private Table table;
private String tableName;
private Configuration conf;
private String username;
private RetryPolicy dataAccessRetryPolicy;
private S3AInstrumentation.S3GuardInstrumentation instrumentation;
/**
* A utility function to create a DynamoDB instance.
* @param conf the file system configuration
* @param s3Region region of the associated S3 bucket (if any).
* @return DynamoDB instance.
* @throws IOException I/O error.
*/
private static DynamoDB createDynamoDB(Configuration conf, String s3Region)
throws IOException {
Preconditions.checkNotNull(conf);
final Class<? extends DynamoDBClientFactory> cls = conf.getClass(
S3GUARD_DDB_CLIENT_FACTORY_IMPL,
S3GUARD_DDB_CLIENT_FACTORY_IMPL_DEFAULT,
DynamoDBClientFactory.class);
LOG.debug("Creating DynamoDB client {} with S3 region {}", cls, s3Region);
final AmazonDynamoDB dynamoDBClient = ReflectionUtils.newInstance(cls, conf)
.createDynamoDBClient(s3Region);
return new DynamoDB(dynamoDBClient);
}
@Override
public void initialize(FileSystem fs) throws IOException {
Preconditions.checkArgument(fs instanceof S3AFileSystem,
"DynamoDBMetadataStore only supports S3A filesystem.");
final S3AFileSystem s3afs = (S3AFileSystem) fs;
instrumentation = s3afs.getInstrumentation().getS3GuardInstrumentation();
final String bucket = s3afs.getBucket();
String confRegion = s3afs.getConf().getTrimmed(S3GUARD_DDB_REGION_KEY);
if (!StringUtils.isEmpty(confRegion)) {
region = confRegion;
LOG.debug("Overriding S3 region with configured DynamoDB region: {}",
region);
} else {
region = s3afs.getBucketLocation();
LOG.debug("Inferring DynamoDB region from S3 bucket: {}", region);
}
username = s3afs.getUsername();
conf = s3afs.getConf();
dynamoDB = createDynamoDB(conf, region);
// use the bucket as the DynamoDB table name if not specified in config
tableName = conf.getTrimmed(S3GUARD_DDB_TABLE_NAME_KEY, bucket);
setMaxRetries(conf);
initTable();
instrumentation.initialized();
}
/**
* Performs one-time initialization of the metadata store via configuration.
*
* This initialization depends on the configuration object to get AWS
* credentials, the DynamoDBClientFactory implementation class, DynamoDB
* endpoints, DynamoDB table names, etc. After initialization, this metadata
* store does not explicitly relate to any S3 bucket, which may not even exist.
*
* This is used to operate the metadata store directly, beyond the scope of
* the S3AFileSystem integration, e.g. from command line tools.
* Generally, callers should use {@link #initialize(FileSystem)}
* with an initialized {@code S3AFileSystem} instance.
*
* Without a filesystem to act as a reference point, the configuration itself
* must declare the table name and region in the
* {@link Constants#S3GUARD_DDB_TABLE_NAME_KEY} and
* {@link Constants#S3GUARD_DDB_REGION_KEY} keys respectively.
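*
* A minimal sketch of such standalone use (the table name and region values
* are illustrative):
* <pre>
*   Configuration conf = new Configuration();
*   conf.set(S3GUARD_DDB_TABLE_NAME_KEY, "my-ddb-table");
*   conf.set(S3GUARD_DDB_REGION_KEY, "us-west-2");
*   MetadataStore ms = new DynamoDBMetadataStore();
*   ms.initialize(conf);
* </pre>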
*
* @see #initialize(FileSystem)
* @throws IOException if there is an error
* @throws IllegalArgumentException if the configuration is incomplete
*/
@Override
public void initialize(Configuration config) throws IOException {
conf = config;
// use the bucket as the DynamoDB table name if not specified in config
tableName = conf.getTrimmed(S3GUARD_DDB_TABLE_NAME_KEY);
Preconditions.checkArgument(!StringUtils.isEmpty(tableName),
"No DynamoDB table name configured");
region = conf.getTrimmed(S3GUARD_DDB_REGION_KEY);
Preconditions.checkArgument(!StringUtils.isEmpty(region),
"No DynamoDB region configured");
dynamoDB = createDynamoDB(conf, region);
username = UserGroupInformation.getCurrentUser().getShortUserName();
setMaxRetries(conf);
initTable();
}
/**
* Set retry policy. This is driven by the value of
* {@link Constants#S3GUARD_DDB_MAX_RETRIES} with an exponential backoff
* between each attempt of {@link #MIN_RETRY_SLEEP_MSEC} milliseconds.
* @param config configuration to read the retry count from
*/
private void setMaxRetries(Configuration config) {
int maxRetries = config.getInt(S3GUARD_DDB_MAX_RETRIES,
S3GUARD_DDB_MAX_RETRIES_DEFAULT);
dataAccessRetryPolicy = RetryPolicies
.exponentialBackoffRetry(maxRetries, MIN_RETRY_SLEEP_MSEC,
TimeUnit.MILLISECONDS);
}
@Override
public void delete(Path path) throws IOException {
innerDelete(path, true);
}
@Override
public void forgetMetadata(Path path) throws IOException {
innerDelete(path, false);
}
/**
* Inner delete operation; the action taken depends on the {@code tombstone}
* flag. No tombstone: delete the entry. Tombstone: create a tombstone entry.
* There is no check as to whether the entry exists in the table first.
* @param path path to delete
* @param tombstone flag to create a tombstone marker
* @throws IOException I/O error.
*/
private void innerDelete(Path path, boolean tombstone)
throws IOException {
path = checkPath(path);
LOG.debug("Deleting from table {} in region {}: {}",
tableName, region, path);
// deleting a nonexistent item consumes 1 write capacity unit; skip it
if (path.isRoot()) {
LOG.debug("Skip deleting root directory as it does not exist in table");
return;
}
try {
if (tombstone) {
Item item = PathMetadataDynamoDBTranslation.pathMetadataToItem(
PathMetadata.tombstone(path));
table.putItem(item);
} else {
table.deleteItem(pathToKey(path));
}
} catch (AmazonClientException e) {
throw translateException("delete", path, e);
}
}
@Override
public void deleteSubtree(Path path) throws IOException {
path = checkPath(path);
LOG.debug("Deleting subtree from table {} in region {}: {}",
tableName, region, path);
final PathMetadata meta = get(path);
if (meta == null || meta.isDeleted()) {
LOG.debug("Subtree path {} does not exist; this will be a no-op", path);
return;
}
for (DescendantsIterator desc = new DescendantsIterator(this, meta);
desc.hasNext();) {
innerDelete(desc.next().getPath(), true);
}
}
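/**
* Get an item from the table with a strongly consistent read, so that the
* result reflects all previously completed writes.
*/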
private Item getConsistentItem(PrimaryKey key) {
final GetItemSpec spec = new GetItemSpec()
.withPrimaryKey(key)
.withConsistentRead(true); // strictly consistent read
return table.getItem(spec);
}
@Override
public PathMetadata get(Path path) throws IOException {
return get(path, false);
}
@Override
public PathMetadata get(Path path, boolean wantEmptyDirectoryFlag)
throws IOException {
path = checkPath(path);
LOG.debug("Get from table {} in region {}: {}", tableName, region, path);
try {
final PathMetadata meta;
if (path.isRoot()) {
// Root does not persist in the table
meta = new PathMetadata(makeDirStatus(username, path));
} else {
final Item item = getConsistentItem(pathToKey(path));
meta = itemToPathMetadata(item, username);
LOG.debug("Get from table {} in region {} returning for {}: {}",
tableName, region, path, meta);
}
if (wantEmptyDirectoryFlag && meta != null) {
final FileStatus status = meta.getFileStatus();
// for a directory, query its direct children to determine the isEmpty bit
if (status.isDirectory()) {
final QuerySpec spec = new QuerySpec()
.withHashKey(pathToParentKeyAttribute(path))
.withConsistentRead(true)
.withFilterExpression(IS_DELETED + " = :false")
.withValueMap(deleteTrackingValueMap);
final ItemCollection<QueryOutcome> items = table.query(spec);
boolean hasChildren = items.iterator().hasNext();
// When this class has support for authoritative
// (fully-cached) directory listings, we may also be able to answer
// TRUE here. Until then, we don't know if we have full listing or
// not, thus the UNKNOWN here:
meta.setIsEmptyDirectory(
hasChildren ? Tristate.FALSE : Tristate.UNKNOWN);
}
}
return meta;
} catch (AmazonClientException e) {
throw translateException("get", path, e);
}
}
/**
* Make a FileStatus object for a directory at the given path. The FileStatus
* only contains what S3A needs, and omits the modification time, since S3A's
* own implementation returns the current system time.
* @param owner username of owner
* @param path path to dir
* @return new FileStatus
*/
private FileStatus makeDirStatus(String owner, Path path) {
return new FileStatus(0, true, 1, 0, 0, 0, null,
owner, null, path);
}
@Override
public DirListingMetadata listChildren(Path path) throws IOException {
path = checkPath(path);
LOG.debug("Listing table {} in region {}: {}", tableName, region, path);
// find the children in the table
try {
final QuerySpec spec = new QuerySpec()
.withHashKey(pathToParentKeyAttribute(path))
.withConsistentRead(true); // strictly consistent read
final ItemCollection<QueryOutcome> items = table.query(spec);
final List<PathMetadata> metas = new ArrayList<>();
for (Item item : items) {
PathMetadata meta = itemToPathMetadata(item, username);
metas.add(meta);
}
LOG.trace("Listing table {} in region {} for {} returning {}",
tableName, region, path, metas);
return (metas.isEmpty() && get(path) == null)
? null
: new DirListingMetadata(path, metas, false);
} catch (AmazonClientException e) {
// failure, including the path not being present
throw translateException("listChildren", path, e);
}
}
// Build the complete set of entries to put: the given paths plus any
// ancestor directories missing from the collection.
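// Example: given only a file entry for /bucket/dir1/dir2/file1, this also
// generates directory entries for /bucket/dir1 and /bucket/dir1/dir2.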
Collection<PathMetadata> completeAncestry(
Collection<PathMetadata> pathsToCreate) {
// Key on path to allow fast lookup
Map<Path, PathMetadata> ancestry = new HashMap<>();
for (PathMetadata meta : pathsToCreate) {
Preconditions.checkArgument(meta != null);
Path path = meta.getFileStatus().getPath();
if (path.isRoot()) {
break;
}
ancestry.put(path, meta);
Path parent = path.getParent();
while (!parent.isRoot() && !ancestry.containsKey(parent)) {
LOG.debug("auto-create ancestor path {} for child path {}",
parent, path);
final FileStatus status = makeDirStatus(parent, username);
ancestry.put(parent, new PathMetadata(status, Tristate.FALSE, false));
parent = parent.getParent();
}
}
return ancestry.values();
}
@Override
public void move(Collection<Path> pathsToDelete,
Collection<PathMetadata> pathsToCreate) throws IOException {
if (pathsToDelete == null && pathsToCreate == null) {
return;
}
LOG.debug("Moving paths of table {} in region {}: {} paths to delete and {}"
+ " paths to create", tableName, region,
pathsToDelete == null ? 0 : pathsToDelete.size(),
pathsToCreate == null ? 0 : pathsToCreate.size());
LOG.trace("move: pathsToDelete = {}, pathsToCreate = {}", pathsToDelete,
pathsToCreate);
// This implementation assumes that if a path exists, all of its ancestors
// also exist in the table. The following code maintains that invariant by
// also putting the ancestor directories of the paths to create, even when
// they are not explicitly listed.
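// For example, a move creating /bucket/x/y/file also creates directory
// entries for /bucket/x and /bucket/x/y if they are missing.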
Collection<PathMetadata> newItems = new ArrayList<>();
if (pathsToCreate != null) {
newItems.addAll(completeAncestry(pathsToCreate));
}
if (pathsToDelete != null) {
for (Path meta : pathsToDelete) {
newItems.add(PathMetadata.tombstone(meta));
}
}
try {
processBatchWriteRequest(null, pathMetadataToItem(newItems));
} catch (AmazonClientException e) {
throw translateException("move", (String) null, e);
}
}
/**
* Helper method to issue a batch write request to DynamoDB.
*
* Callers of this method should catch the {@link AmazonClientException} and
* translate it for better error reporting and easier debugging.
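*
* DynamoDB limits each BatchWriteItem call to
* {@code S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT} (25) items; larger requests
* are split across multiple calls, and any unprocessed items reported in
* the outcome are retried with exponential backoff.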
* @param keysToDelete primary keys to be deleted; can be null
* @param itemsToPut new items to be put; can be null
*/
private void processBatchWriteRequest(PrimaryKey[] keysToDelete,
Item[] itemsToPut) throws IOException {
final int totalToDelete = (keysToDelete == null ? 0 : keysToDelete.length);
final int totalToPut = (itemsToPut == null ? 0 : itemsToPut.length);
int count = 0;
while (count < totalToDelete + totalToPut) {
final TableWriteItems writeItems = new TableWriteItems(tableName);
int numToDelete = 0;
if (keysToDelete != null
&& count < totalToDelete) {
numToDelete = Math.min(S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT,
totalToDelete - count);
writeItems.withPrimaryKeysToDelete(
Arrays.copyOfRange(keysToDelete, count, count + numToDelete));
count += numToDelete;
}
if (numToDelete < S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT
&& itemsToPut != null
&& count < totalToDelete + totalToPut) {
final int numToPut = Math.min(
S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT - numToDelete,
totalToDelete + totalToPut - count);
final int index = count - totalToDelete;
writeItems.withItemsToPut(
Arrays.copyOfRange(itemsToPut, index, index + numToPut));
count += numToPut;
}
BatchWriteItemOutcome res = dynamoDB.batchWriteItem(writeItems);
// Check for unprocessed keys in case of exceeding provisioned throughput
Map<String, List<WriteRequest>> unprocessed = res.getUnprocessedItems();
int retryCount = 0;
while (unprocessed.size() > 0) {
retryBackoff(retryCount++);
res = dynamoDB.batchWriteItemUnprocessed(unprocessed);
unprocessed = res.getUnprocessedItems();
}
}
}
/**
* Put the current thread to sleep to implement exponential backoff
* depending on retryCount. If max retries are exceeded, throws an
* exception instead.
* @param retryCount number of retries so far
* @throws IOException when max retryCount is exceeded.
*/
private void retryBackoff(int retryCount) throws IOException {
try {
// Our RetryPolicy ignores everything but retryCount here.
RetryPolicy.RetryAction action = dataAccessRetryPolicy.shouldRetry(null,
retryCount, 0, true);
if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
throw new IOException(
String.format("Max retries exceeded (%d) for DynamoDB",
retryCount));
} else {
LOG.debug("Sleeping {} msec before next retry", action.delayMillis);
Thread.sleep(action.delayMillis);
}
} catch (Exception e) {
throw new IOException("Unexpected exception", e);
}
}
@Override
public void put(PathMetadata meta) throws IOException {
// For a deeply nested path, this method will automatically create the
// full ancestry and save the respective items in the DynamoDB table.
// Thus, after a put operation, we maintain the invariant that if a path
// exists, all of its ancestors also exist in the table.
// For performance, we generate the full list of paths to put and save
// them with a single batch write request.
LOG.debug("Saving to table {} in region {}: {}", tableName, region, meta);
Collection<PathMetadata> wrapper = new ArrayList<>(1);
wrapper.add(meta);
put(wrapper);
}
@Override
public void put(Collection<PathMetadata> metas) throws IOException {
LOG.debug("Saving batch to table {} in region {}", tableName, region);
processBatchWriteRequest(null, pathMetadataToItem(completeAncestry(metas)));
}
/**
* Helper method to collect the given metadata plus any of its ancestors
* that are not already present in the table.
* @param meta metadata for the path being put
* @return metadata for the path plus any missing ancestor directories
* @throws IOException I/O error querying the table
*/
private Collection<PathMetadata> fullPathsToPut(PathMetadata meta)
throws IOException {
checkPathMetadata(meta);
final Collection<PathMetadata> metasToPut = new ArrayList<>();
// root path is not persisted
if (!meta.getFileStatus().getPath().isRoot()) {
metasToPut.add(meta);
}
// put all its ancestors if not present; as an optimization we stop at the
// first ancestor that already exists
Path path = meta.getFileStatus().getPath().getParent();
while (path != null && !path.isRoot()) {
final Item item = getConsistentItem(pathToKey(path));
if (!itemExists(item)) {
final FileStatus status = makeDirStatus(path, username);
metasToPut.add(new PathMetadata(status, Tristate.FALSE, false));
path = path.getParent();
} else {
break;
}
}
return metasToPut;
}
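/** An item exists if it is present in the table and is not a tombstone. */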
private boolean itemExists(Item item) {
if (item == null) {
return false;
}
if (item.hasAttribute(IS_DELETED) &&
item.getBoolean(IS_DELETED)) {
return false;
}
return true;
}
/** Create a directory FileStatus using current system time as mod time. */
static FileStatus makeDirStatus(Path f, String owner) {
return new FileStatus(0, true, 1, 0, System.currentTimeMillis(), 0,
null, owner, owner, f);
}
@Override
public void put(DirListingMetadata meta) throws IOException {
LOG.debug("Saving to table {} in region {}: {}", tableName, region, meta);
// directory path
PathMetadata p = new PathMetadata(makeDirStatus(meta.getPath(), username),
meta.isEmpty(), false);
// First add any missing ancestors...
final Collection<PathMetadata> metasToPut = fullPathsToPut(p);
// next add all children of the directory
metasToPut.addAll(meta.getListing());
try {
processBatchWriteRequest(null, pathMetadataToItem(metasToPut));
} catch (AmazonClientException e) {
throw translateException("put", (String) null, e);
}
}
@Override
public synchronized void close() {
if (instrumentation != null) {
instrumentation.storeClosed();
}
if (dynamoDB != null) {
LOG.debug("Shutting down {}", this);
dynamoDB.shutdown();
dynamoDB = null;
}
}
@Override
public void destroy() throws IOException {
if (table == null) {
LOG.info("In destroy(): no table to delete");
return;
}
LOG.info("Deleting DynamoDB table {} in region {}", tableName, region);
Preconditions.checkNotNull(dynamoDB, "Not connected to DynamoDB");
try {
table.delete();
table.waitForDelete();
} catch (ResourceNotFoundException rnfe) {
LOG.info("ResourceNotFoundException while deleting DynamoDB table {} in "
+ "region {}. This may indicate that the table does not exist, "
+ "or has been deleted by another concurrent thread or process.",
tableName, region);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
LOG.warn("Interrupted while waiting for DynamoDB table {} being deleted",
tableName, ie);
throw new InterruptedIOException("Table " + tableName
+ " in region " + region + " has not been deleted");
} catch (AmazonClientException e) {
throw translateException("destroy", (String) null, e);
}
}
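/**
* Scan the table for file entries whose mod_time is older than the given
* threshold, projecting only the key attributes needed to delete them.
*/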
private ItemCollection<ScanOutcome> expiredFiles(long modTime) {
String filterExpression = "mod_time < :mod_time";
String projectionExpression = "parent,child";
ValueMap map = new ValueMap().withLong(":mod_time", modTime);
return table.scan(filterExpression, projectionExpression, null, map);
}
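/**
* Prune expired file entries in batches, sleeping between batches (see
* {@link Constants#S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_KEY}) to limit the
* load on the table.
*/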
@Override
public void prune(long modTime) throws IOException {
int itemCount = 0;
try {
Collection<Path> deletionBatch =
new ArrayList<>(S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT);
int delay = conf.getInt(S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_KEY,
S3GUARD_DDB_BACKGROUND_SLEEP_MSEC_DEFAULT);
for (Item item : expiredFiles(modTime)) {
PathMetadata md = PathMetadataDynamoDBTranslation
.itemToPathMetadata(item, username);
Path path = md.getFileStatus().getPath();
deletionBatch.add(path);
itemCount++;
if (deletionBatch.size() == S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT) {
Thread.sleep(delay);
processBatchWriteRequest(pathToKey(deletionBatch), null);
deletionBatch.clear();
}
}
if (deletionBatch.size() > 0) {
Thread.sleep(delay);
processBatchWriteRequest(pathToKey(deletionBatch), null);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new InterruptedIOException("Pruning was interrupted");
}
LOG.info("Finished pruning {} items in batches of {}", itemCount,
S3GUARD_DDB_BATCH_WRITE_REQUEST_LIMIT);
}
@Override
public String toString() {
return getClass().getSimpleName() + '{'
+ "region=" + region
+ ", tableName=" + tableName
+ '}';
}
/**
* Create a table if it does not exist and wait for it to become active.
*
* If a table with the intended name already exists, then it uses that table.
* Otherwise, it will automatically create the table if the config
* {@link org.apache.hadoop.fs.s3a.Constants#S3GUARD_DDB_TABLE_CREATE_KEY} is
* enabled. The DynamoDB table creation API is asynchronous; this method
* waits for the table to become active after sending the creation request,
* so overall this method is synchronous, and the table is guaranteed to
* exist after it returns successfully.
*
* @throws IOException if the table does not exist and auto-creation is
* disabled; if the table is being deleted; or on any other I/O failure.
*/
@VisibleForTesting
void initTable() throws IOException {
table = dynamoDB.getTable(tableName);
try {
try {
LOG.debug("Binding to table {}", tableName);
TableDescription description = table.describe();
LOG.debug("Table state: {}", description);
final String status = description.getTableStatus();
switch (status) {
case "CREATING":
LOG.debug("Table {} in region {} is being created/updated. This may"
+ " indicate that the table is being operated by another "
+ "concurrent thread or process. Waiting for active...",
tableName, region);
waitForTableActive(table);
break;
case "DELETING":
throw new FileNotFoundException("DynamoDB table "
+ "'" + tableName + "' is being "
+ "deleted in region " + region);
case "UPDATING":
// table being updated; it can still be used.
LOG.debug("Table is being updated.");
break;
case "ACTIVE":
break;
default:
throw new IOException("Unknown DynamoDB table status " + status
+ ": tableName='" + tableName + "', region=" + region);
}
final Item versionMarker = getVersionMarkerItem();
verifyVersionCompatibility(tableName, versionMarker);
Long created = extractCreationTimeFromMarker(versionMarker);
LOG.debug("Using existing DynamoDB table {} in region {} created {}",
tableName, region, (created != null) ? new Date(created) : null);
} catch (ResourceNotFoundException rnfe) {
if (conf.getBoolean(S3GUARD_DDB_TABLE_CREATE_KEY, false)) {
final ProvisionedThroughput capacity = new ProvisionedThroughput(
conf.getLong(S3GUARD_DDB_TABLE_CAPACITY_READ_KEY,
S3GUARD_DDB_TABLE_CAPACITY_READ_DEFAULT),
conf.getLong(S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY,
S3GUARD_DDB_TABLE_CAPACITY_WRITE_DEFAULT));
createTable(capacity);
} else {
throw (FileNotFoundException)new FileNotFoundException(
"DynamoDB table '" + tableName + "' does not "
+ "exist in region " + region + "; auto-creation is turned off")
.initCause(rnfe);
}
}
} catch (AmazonClientException e) {
throw translateException("initTable", (String) null, e);
}
}
/**
* Get the version mark item in the existing DynamoDB table.
*
* As the version marker item may be created by another concurrent thread or
* process, we retry a limited number of times before giving up.
* @return the version marker item, or null if none was found within the
* retry limit
* @throws IOException if the retry machinery raises an unexpected exception
*/
private Item getVersionMarkerItem() throws IOException {
final PrimaryKey versionMarkerKey =
createVersionMarkerPrimaryKey(VERSION_MARKER);
int retryCount = 0;
Item versionMarker = table.getItem(versionMarkerKey);
while (versionMarker == null) {
try {
RetryPolicy.RetryAction action = dataAccessRetryPolicy.shouldRetry(null,
retryCount, 0, true);
if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
break;
} else {
LOG.debug("Sleeping {} ms before next retry", action.delayMillis);
Thread.sleep(action.delayMillis);
}
} catch (Exception e) {
throw new IOException("initTable: Unexpected exception", e);
}
retryCount++;
versionMarker = table.getItem(versionMarkerKey);
}
return versionMarker;
}
/**
* Verify that a table version is compatible with this S3Guard client.
* @param tableName name of the table (for error messages)
* @param versionMarker the version marker retrieved from the table
* @throws IOException on any incompatibility
*/
@VisibleForTesting
static void verifyVersionCompatibility(String tableName,
Item versionMarker) throws IOException {
if (versionMarker == null) {
LOG.warn("Table {} contains no version marker", tableName);
throw new IOException(E_NO_VERSION_MARKER
+ " Table: " + tableName);
} else {
final int version = extractVersionFromMarker(versionMarker);
if (VERSION != version) {
// version mismatch. Unless/until there is support for
// upgrading versions, treat this as an incompatible change
// and fail.
throw new IOException(E_INCOMPATIBLE_VERSION
+ " Table "+ tableName
+ " Expected version " + VERSION + " actual " + version);
}
}
}
/**
* Wait for the table to become active.
* @param t table to block on.
* @throws IOException IO problems
* @throws InterruptedIOException if the wait was interrupted
*/
private void waitForTableActive(Table t) throws IOException {
try {
t.waitForActive();
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for table {} in region {} active",
tableName, region, e);
Thread.currentThread().interrupt();
throw (IOException) new InterruptedIOException("DynamoDB table '"
+ tableName + "' is not active yet in region " + region).initCause(e);
}
}
/**
* Create a table, wait for it to become active, then add the version
* marker.
* @param capacity capacity to provision
* @throws IOException on any failure.
* @throws InterruptedIOException if the wait was interrupted
*/
private void createTable(ProvisionedThroughput capacity) throws IOException {
try {
LOG.info("Creating non-existent DynamoDB table {} in region {}",
tableName, region);
table = dynamoDB.createTable(new CreateTableRequest()
.withTableName(tableName)
.withKeySchema(keySchema())
.withAttributeDefinitions(attributeDefinitions())
.withProvisionedThroughput(capacity));
LOG.debug("Awaiting table becoming active");
} catch (ResourceInUseException e) {
LOG.warn("ResourceInUseException while creating DynamoDB table {} "
+ "in region {}. This may indicate that the table was "
+ "created by another concurrent thread or process.",
tableName, region);
}
waitForTableActive(table);
final Item marker = createVersionMarker(VERSION_MARKER, VERSION,
System.currentTimeMillis());
putItem(marker);
}
/**
* PUT a single item to the table.
* @param item item to put
* @return the outcome.
*/
PutItemOutcome putItem(Item item) {
LOG.debug("Putting item {}", item);
return table.putItem(item);
}
/**
* Provision the table with the given read and write capacity units.
* @param readCapacity read capacity in units
* @param writeCapacity write capacity in units
* @throws IOException on a failure to update the table
*/
void provisionTable(Long readCapacity, Long writeCapacity)
throws IOException {
final ProvisionedThroughput toProvision = new ProvisionedThroughput()
.withReadCapacityUnits(readCapacity)
.withWriteCapacityUnits(writeCapacity);
try {
final ProvisionedThroughputDescription p =
table.updateTable(toProvision).getProvisionedThroughput();
LOG.info("Provision table {} in region {}: readCapacityUnits={}, "
+ "writeCapacityUnits={}",
tableName, region, p.getReadCapacityUnits(),
p.getWriteCapacityUnits());
} catch (AmazonClientException e) {
throw translateException("provisionTable", (String) null, e);
}
}
Table getTable() {
return table;
}
String getRegion() {
return region;
}
@VisibleForTesting
DynamoDB getDynamoDB() {
return dynamoDB;
}
/**
* Validates a path object; it must be absolute and contain a host
* (bucket) component.
* @param path path to validate
* @return the validated path
*/
private Path checkPath(Path path) {
Preconditions.checkNotNull(path);
Preconditions.checkArgument(path.isAbsolute(), "Path %s is not absolute",
path);
URI uri = path.toUri();
Preconditions.checkNotNull(uri.getScheme(), "Path %s missing scheme", path);
Preconditions.checkArgument(uri.getScheme().equals(Constants.FS_S3A),
"Path %s scheme must be %s", path, Constants.FS_S3A);
Preconditions.checkArgument(!StringUtils.isEmpty(uri.getHost()), "Path %s" +
" is missing bucket.", path);
return path;
}
/**
* Validates a path meta-data object.
*/
private static void checkPathMetadata(PathMetadata meta) {
Preconditions.checkNotNull(meta);
Preconditions.checkNotNull(meta.getFileStatus());
Preconditions.checkNotNull(meta.getFileStatus().getPath());
}
@Override
public Map<String, String> getDiagnostics() throws IOException {
Map<String, String> map = new TreeMap<>();
if (table != null) {
TableDescription desc = getTableDescription(true);
map.put("name", desc.getTableName());
map.put(STATUS, desc.getTableStatus());
map.put("ARN", desc.getTableArn());
map.put("size", desc.getTableSizeBytes().toString());
map.put(TABLE, desc.toString());
ProvisionedThroughputDescription throughput
= desc.getProvisionedThroughput();
map.put(READ_CAPACITY, throughput.getReadCapacityUnits().toString());
map.put(WRITE_CAPACITY, throughput.getWriteCapacityUnits().toString());
} else {
map.put("name", "DynamoDB Metadata Store");
map.put(TABLE, "none");
map.put(STATUS, "undefined");
}
map.put("description", DESCRIPTION);
map.put("region", region);
if (dataAccessRetryPolicy != null) {
map.put("retryPolicy", dataAccessRetryPolicy.toString());
}
return map;
}
private TableDescription getTableDescription(boolean forceUpdate) {
TableDescription desc = table.getDescription();
if (desc == null || forceUpdate) {
desc = table.describe();
}
return desc;
}
@Override
public void updateParameters(Map<String, String> parameters)
throws IOException {
Preconditions.checkNotNull(table, "Not initialized");
TableDescription desc = getTableDescription(true);
ProvisionedThroughputDescription current
= desc.getProvisionedThroughput();
long currentRead = current.getReadCapacityUnits();
long newRead = getLongParam(parameters,
S3GUARD_DDB_TABLE_CAPACITY_READ_KEY,
currentRead);
long currentWrite = current.getWriteCapacityUnits();
long newWrite = getLongParam(parameters,
S3GUARD_DDB_TABLE_CAPACITY_WRITE_KEY,
currentWrite);
ProvisionedThroughput throughput = new ProvisionedThroughput()
.withReadCapacityUnits(newRead)
.withWriteCapacityUnits(newWrite);
if (newRead != currentRead || newWrite != currentWrite) {
LOG.info("Current table capacity is read: {}, write: {}",
currentRead, currentWrite);
LOG.info("Changing capacity of table to read: {}, write: {}",
newRead, newWrite);
table.updateTable(throughput);
} else {
LOG.info("Table capacity unchanged at read: {}, write: {}",
newRead, newWrite);
}
}
private long getLongParam(Map<String, String> parameters,
String key,
long defVal) {
String k = parameters.get(key);
if (k != null) {
return Long.parseLong(k);
} else {
return defVal;
}
}
}