blob: 47284195a3a5d809a3fa531403f981a13ca2245f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.file.AccessDeniedException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.EnumSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Nullable;
import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.SdkBaseException;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.Headers;
import com.amazonaws.services.s3.model.CannedAccessControlList;
import com.amazonaws.services.s3.model.CopyObjectRequest;
import com.amazonaws.services.s3.model.DeleteObjectsRequest;
import com.amazonaws.services.s3.model.DeleteObjectsResult;
import com.amazonaws.services.s3.model.GetObjectMetadataRequest;
import com.amazonaws.services.s3.model.GetObjectRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest;
import com.amazonaws.services.s3.model.InitiateMultipartUploadResult;
import com.amazonaws.services.s3.model.ListMultipartUploadsRequest;
import com.amazonaws.services.s3.model.ListObjectsRequest;
import com.amazonaws.services.s3.model.ListObjectsV2Request;
import com.amazonaws.services.s3.model.MultiObjectDeleteException;
import com.amazonaws.services.s3.model.MultipartUpload;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.s3.model.StorageClass;
import com.amazonaws.services.s3.model.UploadPartRequest;
import com.amazonaws.services.s3.model.UploadPartResult;
import com.amazonaws.services.s3.transfer.Copy;
import com.amazonaws.services.s3.transfer.TransferManager;
import com.amazonaws.services.s3.transfer.TransferManagerConfiguration;
import com.amazonaws.services.s3.transfer.Upload;
import com.amazonaws.services.s3.transfer.model.CopyResult;
import com.amazonaws.services.s3.transfer.model.UploadResult;
import com.amazonaws.event.ProgressListener;
import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonPathCapabilities;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSDataOutputStreamBuilder;
import org.apache.hadoop.fs.Globber;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.s3a.audit.AuditSpanS3A;
import org.apache.hadoop.fs.s3a.auth.SignerManager;
import org.apache.hadoop.fs.s3a.auth.delegation.DelegationOperations;
import org.apache.hadoop.fs.s3a.auth.delegation.DelegationTokenProvider;
import org.apache.hadoop.fs.s3a.impl.BulkDeleteRetryHandler;
import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
import org.apache.hadoop.fs.s3a.impl.ContextAccessors;
import org.apache.hadoop.fs.s3a.impl.CopyFromLocalOperation;
import org.apache.hadoop.fs.s3a.impl.CopyOutcome;
import org.apache.hadoop.fs.s3a.impl.CreateFileBuilder;
import org.apache.hadoop.fs.s3a.impl.DeleteOperation;
import org.apache.hadoop.fs.s3a.impl.DirectoryPolicy;
import org.apache.hadoop.fs.s3a.impl.DirectoryPolicyImpl;
import org.apache.hadoop.fs.s3a.impl.GetContentSummaryOperation;
import org.apache.hadoop.fs.s3a.impl.HeaderProcessing;
import org.apache.hadoop.fs.s3a.impl.InternalConstants;
import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks;
import org.apache.hadoop.fs.s3a.impl.MkdirOperation;
import org.apache.hadoop.fs.s3a.impl.OpenFileSupport;
import org.apache.hadoop.fs.s3a.impl.OperationCallbacks;
import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
import org.apache.hadoop.fs.s3a.impl.RenameOperation;
import org.apache.hadoop.fs.s3a.impl.RequestFactoryImpl;
import org.apache.hadoop.fs.s3a.impl.S3AMultipartUploaderBuilder;
import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum;
import org.apache.hadoop.fs.s3a.impl.StoreContext;
import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder;
import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream;
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperations;
import org.apache.hadoop.fs.s3a.tools.MarkerToolOperationsImpl;
import org.apache.hadoop.fs.statistics.DurationTracker;
import org.apache.hadoop.fs.statistics.DurationTrackerFactory;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.fs.store.audit.AuditEntryPoint;
import org.apache.hadoop.fs.store.audit.ActiveThreadSpanSource;
import org.apache.hadoop.fs.store.audit.AuditSpan;
import org.apache.hadoop.fs.store.audit.AuditSpanSource;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.DelegationTokenIssuer;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.DurationInfo;
import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.GlobalStorageStatistics;
import org.apache.hadoop.fs.InvalidRequestException;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.s3a.api.RequestFactory;
import org.apache.hadoop.fs.s3a.audit.AuditManagerS3A;
import org.apache.hadoop.fs.s3a.audit.AuditIntegration;
import org.apache.hadoop.fs.s3a.audit.OperationAuditor;
import org.apache.hadoop.fs.s3a.auth.RoleModel;
import org.apache.hadoop.fs.s3a.auth.delegation.AWSPolicyProvider;
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
import org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens;
import org.apache.hadoop.fs.s3a.auth.delegation.AbstractS3ATokenIdentifier;
import org.apache.hadoop.fs.s3a.commit.CommitConstants;
import org.apache.hadoop.fs.s3a.commit.PutTracker;
import org.apache.hadoop.fs.s3a.commit.MagicCommitIntegration;
import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
import org.apache.hadoop.fs.s3a.select.SelectBinding;
import org.apache.hadoop.fs.s3a.select.SelectConstants;
import org.apache.hadoop.fs.s3a.s3guard.S3Guard;
import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics;
import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.s3a.statistics.S3AStatisticsContext;
import org.apache.hadoop.fs.s3a.statistics.impl.BondedS3AStatisticsContext;
import org.apache.hadoop.fs.s3native.S3xLoginHelper;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.fs.store.EtagChecksum;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.BlockingThreadPoolExecutorService;
import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.SemaphoredDelegatingExecutor;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.util.functional.CallableRaisingIOE;
import static java.util.Objects.requireNonNull;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IOSTATISTICS_LOGGING_LEVEL_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.s3a.Constants.*;
import static org.apache.hadoop.fs.s3a.Invoker.*;
import static org.apache.hadoop.fs.s3a.Listing.toLocatedFileStatusIterator;
import static org.apache.hadoop.fs.s3a.S3AUtils.*;
import static org.apache.hadoop.fs.s3a.Statistic.*;
import static org.apache.hadoop.fs.s3a.audit.S3AAuditConstants.INITIALIZE_SPAN;
import static org.apache.hadoop.fs.s3a.auth.RolePolicies.STATEMENT_ALLOW_SSE_KMS_RW;
import static org.apache.hadoop.fs.s3a.auth.RolePolicies.allowS3Operations;
import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.TokenIssuingPolicy.NoTokensAvailable;
import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.hasDelegationTokenBinding;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS;
import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS;
import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_NO_OVERWRITE;
import static org.apache.hadoop.fs.s3a.impl.CreateFileBuilder.OPTIONS_CREATE_FILE_OVERWRITE;
import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isObjectNotFound;
import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isUnknownBucket;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.AP_INACCESSIBLE;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.AP_REQUIRED_EXCEPTION;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.ARN_BUCKET_OPTION;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DELETE_CONSIDERED_IDEMPOTENT;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_403;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404;
import static org.apache.hadoop.fs.s3a.impl.InternalConstants.UPLOAD_PART_COUNT_LIMIT;
import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.fixBucketRegion;
import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.logDnsLookup;
import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.checkNoS3Guard;
import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.logIOStatisticsAtLevel;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_CONTINUE_LIST_REQUEST;
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.OBJECT_LIST_REQUEST;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.pairedTrackerFactory;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfSupplier;
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
import static org.apache.hadoop.util.Preconditions.checkArgument;
import static org.apache.hadoop.util.functional.RemoteIterators.typeCastingRemoteIterator;
/**
* The core S3A Filesystem implementation.
*
* This subclass is marked as private as code should not be creating it
* directly; use {@link FileSystem#get(Configuration)} and variants to
* create one.
*
* If cast to {@code S3AFileSystem}, extra methods and features may be accessed.
* Consider those private and unstable.
*
* Because it prints some of the state of the instrumentation,
* the output of {@link #toString()} must also be considered unstable.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class S3AFileSystem extends FileSystem implements StreamCapabilities,
AWSPolicyProvider, DelegationTokenProvider, IOStatisticsSource,
AuditSpanSource<AuditSpanS3A>, ActiveThreadSpanSource<AuditSpanS3A> {
/**
* Default blocksize as used in blocksize and FS status queries.
*/
public static final int DEFAULT_BLOCKSIZE = 32 * 1024 * 1024;
private URI uri;
private Path workingDir;
private String username;
private AmazonS3 s3;
// initial callback policy is fail-once; it's there just to assist
// some mock tests and other codepaths trying to call the low level
// APIs on an uninitialized filesystem.
private Invoker invoker = new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL,
Invoker.LOG_EVENT);
private final Retried onRetry = this::operationRetried;
/**
* Represents bucket name for all S3 operations. If per bucket override for
* {@link InternalConstants#ARN_BUCKET_OPTION} property is set, then the bucket is updated to
* point to the configured Arn.
*/
private String bucket;
private int maxKeys;
private Listing listing;
private long partSize;
private boolean enableMultiObjectsDelete;
private TransferManager transfers;
private ExecutorService boundedThreadPool;
private ThreadPoolExecutor unboundedThreadPool;
// S3 reads are prefetched asynchronously using this future pool.
private ExecutorServiceFuturePool futurePool;
// If true, the prefetching input stream is used for reads.
private boolean prefetchEnabled;
// Size in bytes of a single prefetch block.
private int prefetchBlockSize;
// Size of prefetch queue (in number of blocks).
private int prefetchBlockCount;
private int executorCapacity;
private long multiPartThreshold;
public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class);
private static final Logger PROGRESS =
LoggerFactory.getLogger("org.apache.hadoop.fs.s3a.S3AFileSystem.Progress");
private LocalDirAllocator directoryAllocator;
private CannedAccessControlList cannedACL;
/**
* This must never be null; until initialized it just declares that there
* is no encryption.
*/
private EncryptionSecrets encryptionSecrets = new EncryptionSecrets();
/** The core instrumentation. */
private S3AInstrumentation instrumentation;
/** Accessors to statistics for this FS. */
private S3AStatisticsContext statisticsContext;
/** Storage Statistics Bonded to the instrumentation. */
private S3AStorageStatistics storageStatistics;
/**
* Default input policy; may be overridden in
* {@code openFile()}.
*/
private S3AInputPolicy inputPolicy;
/** Vectored IO context. */
private VectoredIOContext vectoredIOContext;
private long readAhead;
private ChangeDetectionPolicy changeDetectionPolicy;
private final AtomicBoolean closed = new AtomicBoolean(false);
private volatile boolean isClosed = false;
private Collection<String> allowAuthoritativePaths;
/** Delegation token integration; non-empty when DT support is enabled. */
private Optional<S3ADelegationTokens> delegationTokens = Optional.empty();
/** Principal who created the FS; recorded during initialization. */
private UserGroupInformation owner;
private String blockOutputBuffer;
private S3ADataBlocks.BlockFactory blockFactory;
private int blockOutputActiveBlocks;
private boolean useListV1;
private MagicCommitIntegration committerIntegration;
private AWSCredentialProviderList credentials;
private SignerManager signerManager;
/**
* Page size for deletions.
*/
private int pageSize;
private final ListingOperationCallbacks listingOperationCallbacks =
new ListingOperationCallbacksImpl();
/**
* Helper for the openFile() method.
*/
private OpenFileSupport openFileHelper;
/**
* Directory policy.
*/
private DirectoryPolicy directoryPolicy;
/**
* Context accessors for re-use.
*/
private final ContextAccessors contextAccessors = new ContextAccessorsImpl();
/**
* Factory for AWS requests.
*/
private RequestFactory requestFactory;
/**
* Audit manager (service lifecycle).
* Creates the audit service and manages the binding of different audit spans
* to different threads.
* Initially this is a no-op manager; once the service is initialized it will
* be replaced with a configured one.
*/
private AuditManagerS3A auditManager =
AuditIntegration.stubAuditManager();
/**
* Is this S3A FS instance using S3 client side encryption?
*/
private boolean isCSEEnabled;
/**
* Bucket AccessPoint.
*/
private ArnResource accessPoint;
/** Add any deprecated keys. */
@SuppressWarnings("deprecation")
private static void addDeprecatedKeys() {
Configuration.DeprecationDelta[] deltas = {
new Configuration.DeprecationDelta(
FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS,
FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS),
new Configuration.DeprecationDelta(
SERVER_SIDE_ENCRYPTION_ALGORITHM,
S3_ENCRYPTION_ALGORITHM),
new Configuration.DeprecationDelta(
SERVER_SIDE_ENCRYPTION_KEY,
S3_ENCRYPTION_KEY)
};
if (deltas.length > 0) {
Configuration.addDeprecations(deltas);
Configuration.reloadExistingConfigurations();
}
}
static {
addDeprecatedKeys();
}
/** Called after a new FileSystem instance is constructed.
* @param name a uri whose authority section names the host, port, etc.
* for this FileSystem
* @param originalConf the configuration to use for the FS. The
* bucket-specific options are patched over the base ones before any use is
* made of the config.
*/
public void initialize(URI name, Configuration originalConf)
throws IOException {
// get the host; this is guaranteed to be non-null, non-empty
bucket = name.getHost();
AuditSpan span = null;
try {
LOG.debug("Initializing S3AFileSystem for {}", bucket);
// clone the configuration into one with propagated bucket options
Configuration conf = propagateBucketOptions(originalConf, bucket);
// HADOOP-17894. remove references to s3a stores in JCEKS credentials.
conf = ProviderUtils.excludeIncompatibleCredentialProviders(
conf, S3AFileSystem.class);
String arn = String.format(ARN_BUCKET_OPTION, bucket);
String configuredArn = conf.getTrimmed(arn, "");
if (!configuredArn.isEmpty()) {
accessPoint = ArnResource.accessPointFromArn(configuredArn);
LOG.info("Using AccessPoint ARN \"{}\" for bucket {}", configuredArn, bucket);
bucket = accessPoint.getFullArn();
} else if (conf.getBoolean(AWS_S3_ACCESSPOINT_REQUIRED, false)) {
LOG.warn("Access Point usage is required because \"{}\" is enabled," +
" but not configured for the bucket: {}", AWS_S3_ACCESSPOINT_REQUIRED, bucket);
throw new PathIOException(bucket, AP_REQUIRED_EXCEPTION);
}
// fix up the classloader of the configuration to be whatever
// classloader loaded this filesystem.
// See: HADOOP-17372
conf.setClassLoader(this.getClass().getClassLoader());
// patch the Hadoop security providers
patchSecurityCredentialProviders(conf);
// look for delegation token support early.
boolean delegationTokensEnabled = hasDelegationTokenBinding(conf);
if (delegationTokensEnabled) {
LOG.debug("Using delegation tokens");
}
// set the URI, this will do any fixup of the URI to remove secrets,
// canonicalize.
setUri(name, delegationTokensEnabled);
super.initialize(uri, conf);
setConf(conf);
// look for encryption data
// DT Bindings may override this
setEncryptionSecrets(
buildEncryptionSecrets(bucket, conf));
invoker = new Invoker(new S3ARetryPolicy(getConf()), onRetry);
instrumentation = new S3AInstrumentation(uri);
initializeStatisticsBinding();
// If CSE-KMS method is set then CSE is enabled.
isCSEEnabled = S3AEncryptionMethods.CSE_KMS.getMethod()
.equals(getS3EncryptionAlgorithm().getMethod());
LOG.debug("Client Side Encryption enabled: {}", isCSEEnabled);
setCSEGauge();
// Username is the current user at the time the FS was instantiated.
owner = UserGroupInformation.getCurrentUser();
username = owner.getShortUserName();
workingDir = new Path("/user", username)
.makeQualified(this.uri, this.getWorkingDirectory());
maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1);
partSize = getMultipartSizeProperty(conf,
MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE);
multiPartThreshold = getMultipartSizeProperty(conf,
MIN_MULTIPART_THRESHOLD, DEFAULT_MIN_MULTIPART_THRESHOLD);
//check but do not store the block size
longBytesOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1);
enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
this.prefetchEnabled = conf.getBoolean(PREFETCH_ENABLED_KEY, PREFETCH_ENABLED_DEFAULT);
this.prefetchBlockSize = intOption(
conf, PREFETCH_BLOCK_SIZE_KEY, PREFETCH_BLOCK_DEFAULT_SIZE, PREFETCH_BLOCK_DEFAULT_SIZE);
this.prefetchBlockCount =
intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1);
initThreadPools(conf);
int listVersion = conf.getInt(LIST_VERSION, DEFAULT_LIST_VERSION);
if (listVersion < 1 || listVersion > 2) {
LOG.warn("Configured fs.s3a.list.version {} is invalid, forcing " +
"version 2", listVersion);
}
useListV1 = (listVersion == 1);
if (accessPoint != null && useListV1) {
LOG.warn("V1 list configured in fs.s3a.list.version. This is not supported in by" +
" access points. Upgrading to V2");
useListV1 = false;
}
signerManager = new SignerManager(bucket, this, conf, owner);
signerManager.initCustomSigners();
// start auditing
initializeAuditService();
// create the requestFactory.
// requires the audit manager to be initialized.
requestFactory = createRequestFactory();
// create an initial span for all other operations.
span = createSpan(INITIALIZE_SPAN, bucket, null);
// creates the AWS client, including overriding auth chain if
// the FS came with a DT
// this may do some patching of the configuration (e.g. setting
// the encryption algorithms)
bindAWSClient(name, delegationTokensEnabled);
initTransferManager();
// This initiates a probe against S3 for the bucket existing.
doBucketProbing();
inputPolicy = S3AInputPolicy.getPolicy(
conf.getTrimmed(INPUT_FADVISE,
Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_DEFAULT),
S3AInputPolicy.Normal);
LOG.debug("Input fadvise policy = {}", inputPolicy);
changeDetectionPolicy = ChangeDetectionPolicy.getPolicy(conf);
LOG.debug("Change detection policy = {}", changeDetectionPolicy);
boolean magicCommitterEnabled = conf.getBoolean(
CommitConstants.MAGIC_COMMITTER_ENABLED,
CommitConstants.DEFAULT_MAGIC_COMMITTER_ENABLED);
LOG.debug("Filesystem support for magic committers {} enabled",
magicCommitterEnabled ? "is" : "is not");
committerIntegration = new MagicCommitIntegration(
this, magicCommitterEnabled);
boolean blockUploadEnabled = conf.getBoolean(FAST_UPLOAD, true);
if (!blockUploadEnabled) {
LOG.warn("The \"slow\" output stream is no longer supported");
}
blockOutputBuffer = conf.getTrimmed(FAST_UPLOAD_BUFFER,
DEFAULT_FAST_UPLOAD_BUFFER);
partSize = ensureOutputParameterInRange(MULTIPART_SIZE, partSize);
blockFactory = S3ADataBlocks.createFactory(this, blockOutputBuffer);
blockOutputActiveBlocks = intOption(conf,
FAST_UPLOAD_ACTIVE_BLOCKS, DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS, 1);
// If CSE is enabled, do multipart uploads serially.
if (isCSEEnabled) {
blockOutputActiveBlocks = 1;
}
LOG.debug("Using S3ABlockOutputStream with buffer = {}; block={};" +
" queue limit={}",
blockOutputBuffer, partSize, blockOutputActiveBlocks);
// verify there's no S3Guard in the store config.
checkNoS3Guard(this.getUri(), getConf());
allowAuthoritativePaths = S3Guard.getAuthoritativePaths(this);
// directory policy, which may look at authoritative paths
directoryPolicy = DirectoryPolicyImpl.getDirectoryPolicy(conf,
this::allowAuthoritative);
LOG.debug("Directory marker retention policy is {}", directoryPolicy);
initMultipartUploads(conf);
pageSize = intOption(getConf(), BULK_DELETE_PAGE_SIZE,
BULK_DELETE_PAGE_SIZE_DEFAULT, 0);
checkArgument(pageSize <= InternalConstants.MAX_ENTRIES_TO_DELETE,
"page size out of range: %s", pageSize);
listing = new Listing(listingOperationCallbacks, createStoreContext());
// now the open file logic
openFileHelper = new OpenFileSupport(
changeDetectionPolicy,
longBytesOption(conf, READAHEAD_RANGE,
DEFAULT_READAHEAD_RANGE, 0),
username,
intOption(conf, IO_FILE_BUFFER_SIZE_KEY,
IO_FILE_BUFFER_SIZE_DEFAULT, 0),
longBytesOption(conf, ASYNC_DRAIN_THRESHOLD,
DEFAULT_ASYNC_DRAIN_THRESHOLD, 0),
inputPolicy);
vectoredIOContext = populateVectoredIOContext(conf);
} catch (AmazonClientException e) {
// amazon client exception: stop all services then throw the translation
cleanupWithLogger(LOG, span);
stopAllServices();
if (this.futurePool != null) {
this.futurePool = null;
}
throw translateException("initializing ", new Path(name), e);
} catch (IOException | RuntimeException e) {
// other exceptions: stop the services.
cleanupWithLogger(LOG, span);
stopAllServices();
if (this.futurePool != null) {
this.futurePool = null;
}
throw e;
}
}
/**
* Populates the configurations related to vectored IO operation
* in the context which has to passed down to input streams.
* @param conf configuration object.
* @return VectoredIOContext.
*/
private VectoredIOContext populateVectoredIOContext(Configuration conf) {
final int minSeekVectored = (int) longBytesOption(conf, AWS_S3_VECTOR_READS_MIN_SEEK_SIZE,
DEFAULT_AWS_S3_VECTOR_READS_MIN_SEEK_SIZE, 0);
final int maxReadSizeVectored = (int) longBytesOption(conf, AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE,
DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE, 0);
return new VectoredIOContext()
.setMinSeekForVectoredReads(minSeekVectored)
.setMaxReadSizeForVectoredReads(maxReadSizeVectored)
.build();
}
/**
* Set the client side encryption gauge to 0 or 1, indicating if CSE is
* enabled through the gauge or not.
*/
private void setCSEGauge() {
IOStatisticsStore ioStatisticsStore =
(IOStatisticsStore) getIOStatistics();
if (isCSEEnabled) {
ioStatisticsStore
.setGauge(CLIENT_SIDE_ENCRYPTION_ENABLED.getSymbol(), 1L);
} else {
ioStatisticsStore
.setGauge(CLIENT_SIDE_ENCRYPTION_ENABLED.getSymbol(), 0L);
}
}
/**
* Test bucket existence in S3.
* When the value of {@link Constants#S3A_BUCKET_PROBE} is set to 0,
* bucket existence check is not done to improve performance of
* S3AFileSystem initialization. When set to 1 or 2, bucket existence check
* will be performed which is potentially slow.
* If 3 or higher: warn and use the v2 check.
* Also logging DNS address of the s3 endpoint if the bucket probe value is
* greater than 0 else skipping it for increased performance.
* @throws UnknownStoreException the bucket is absent
* @throws IOException any other problem talking to S3
*/
@Retries.RetryTranslated
private void doBucketProbing() throws IOException {
int bucketProbe = getConf()
.getInt(S3A_BUCKET_PROBE, S3A_BUCKET_PROBE_DEFAULT);
Preconditions.checkArgument(bucketProbe >= 0,
"Value of " + S3A_BUCKET_PROBE + " should be >= 0");
switch (bucketProbe) {
case 0:
LOG.debug("skipping check for bucket existence");
break;
case 1:
logDnsLookup(getConf());
verifyBucketExists();
break;
case 2:
logDnsLookup(getConf());
verifyBucketExistsV2();
break;
default:
// we have no idea what this is, assume it is from a later release.
LOG.warn("Unknown bucket probe option {}: {}; falling back to check #2",
S3A_BUCKET_PROBE, bucketProbe);
verifyBucketExistsV2();
break;
}
}
/**
* Initialize the statistics binding.
* This is done by creating an {@code IntegratedS3AStatisticsContext}
* with callbacks to get the FS's instrumentation and FileSystem.statistics
* field; the latter may change after {@link #initialize(URI, Configuration)},
* so needs to be dynamically adapted.
* Protected so that (mock) subclasses can replace it with a
* different statistics binding, if desired.
*/
protected void initializeStatisticsBinding() {
storageStatistics = createStorageStatistics(
requireNonNull(getIOStatistics()));
statisticsContext = new BondedS3AStatisticsContext(
new BondedS3AStatisticsContext.S3AFSStatisticsSource() {
@Override
public S3AInstrumentation getInstrumentation() {
return S3AFileSystem.this.getInstrumentation();
}
@Override
public Statistics getInstanceStatistics() {
return S3AFileSystem.this.statistics;
}
});
}
/**
* Initialize the thread pool.
* This must be re-invoked after replacing the S3Client during test
* runs.
* @param conf configuration.
*/
private void initThreadPools(Configuration conf) {
final String name = "s3a-transfer-" + getBucket();
int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS);
if (maxThreads < 2) {
LOG.warn(MAX_THREADS + " must be at least 2: forcing to 2.");
maxThreads = 2;
}
int totalTasks = intOption(conf,
MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS, 1);
long keepAliveTime = longOption(conf, KEEPALIVE_TIME,
DEFAULT_KEEPALIVE_TIME, 0);
int numPrefetchThreads = this.prefetchEnabled ? this.prefetchBlockCount : 0;
boundedThreadPool = BlockingThreadPoolExecutorService.newInstance(
maxThreads,
maxThreads + totalTasks + numPrefetchThreads,
keepAliveTime, TimeUnit.SECONDS,
name + "-bounded");
unboundedThreadPool = new ThreadPoolExecutor(
maxThreads, Integer.MAX_VALUE,
keepAliveTime, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
BlockingThreadPoolExecutorService.newDaemonThreadFactory(
name + "-unbounded"));
unboundedThreadPool.allowCoreThreadTimeOut(true);
executorCapacity = intOption(conf,
EXECUTOR_CAPACITY, DEFAULT_EXECUTOR_CAPACITY, 1);
if (this.prefetchEnabled) {
this.futurePool = new ExecutorServiceFuturePool(boundedThreadPool);
}
}
/**
* Create the storage statistics or bind to an existing one.
* @param ioStatistics IOStatistics to build the storage statistics from.
* @return a storage statistics instance; expected to be that of the FS.
*/
protected static S3AStorageStatistics createStorageStatistics(
final IOStatistics ioStatistics) {
return (S3AStorageStatistics)
GlobalStorageStatistics.INSTANCE
.put(S3AStorageStatistics.NAME,
() -> new S3AStorageStatistics(ioStatistics));
}
/**
* Verify that the bucket exists. This does not check permissions,
* not even read access.
* Retry policy: retrying, translated.
* @throws UnknownStoreException the bucket is absent
* @throws IOException any other problem talking to S3
*/
@Retries.RetryTranslated
protected void verifyBucketExists()
throws UnknownStoreException, IOException {
if (!invoker.retry("doesBucketExist", bucket, true,
trackDurationOfOperation(getDurationTrackerFactory(),
STORE_EXISTS_PROBE.getSymbol(),
() -> s3.doesBucketExist(bucket)))) {
throw new UnknownStoreException("s3a://" + bucket + "/", " Bucket does "
+ "not exist");
}
}
/**
* Verify that the bucket exists. This will correctly throw an exception
* when credentials are invalid.
* Retry policy: retrying, translated.
* @throws UnknownStoreException the bucket is absent
* @throws IOException any other problem talking to S3
*/
@Retries.RetryTranslated
protected void verifyBucketExistsV2()
throws UnknownStoreException, IOException {
if (!invoker.retry("doesBucketExistV2", bucket, true,
trackDurationOfOperation(getDurationTrackerFactory(),
STORE_EXISTS_PROBE.getSymbol(),
() -> {
// Bug in SDK always returns `true` for AccessPoint ARNs with `doesBucketExistV2()`
// expanding implementation to use ARNs and buckets correctly
try {
s3.getBucketAcl(bucket);
} catch (AmazonServiceException ex) {
int statusCode = ex.getStatusCode();
if (statusCode == SC_404 ||
(statusCode == SC_403 && ex.getMessage().contains(AP_INACCESSIBLE))) {
return false;
}
}
return true;
}))) {
throw new UnknownStoreException("s3a://" + bucket + "/", " Bucket does "
+ "not exist");
}
}
/**
* Get S3A Instrumentation. For test purposes.
* @return this instance's instrumentation.
*/
@VisibleForTesting
public S3AInstrumentation getInstrumentation() {
return instrumentation;
}
/**
* Get FS Statistic for this S3AFS instance.
*
* @return FS statistic instance.
*/
@VisibleForTesting
public FileSystem.Statistics getFsStatistics() {
return statistics;
}
/**
* Get current listing instance.
* @return this instance's listing.
*/
public Listing getListing() {
return listing;
}
/**
* Set up the client bindings.
* If delegation tokens are enabled, the FS first looks for a DT
* ahead of any other bindings;.
* If there is a DT it uses that to do the auth
* and switches to the DT authenticator automatically (and exclusively)
* @param name URI of the FS
* @param dtEnabled are delegation tokens enabled?
* @throws IOException failure.
*/
private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
Configuration conf = getConf();
credentials = null;
String uaSuffix = "";
if (dtEnabled) {
// Delegation support.
// Create and start the DT integration.
// Then look for an existing DT for this bucket, switch to authenticating
// with it if so.
LOG.debug("Using delegation tokens");
S3ADelegationTokens tokens = new S3ADelegationTokens();
this.delegationTokens = Optional.of(tokens);
tokens.bindToFileSystem(getCanonicalUri(),
createStoreContext(),
createDelegationOperations());
tokens.init(conf);
tokens.start();
// switch to the DT provider and bypass all other configured
// providers.
if (tokens.isBoundToDT()) {
// A DT was retrieved.
LOG.debug("Using existing delegation token");
// and use the encryption settings from that client, whatever they were
} else {
LOG.debug("No delegation token for this instance");
}
// Get new credential chain
credentials = tokens.getCredentialProviders();
// and any encryption secrets which came from a DT
tokens.getEncryptionSecrets()
.ifPresent(this::setEncryptionSecrets);
// and update the UA field with any diagnostics provided by
// the DT binding.
uaSuffix = tokens.getUserAgentField();
} else {
// DT support is disabled, so create the normal credential chain
credentials = createAWSCredentialProviderSet(name, conf);
}
LOG.debug("Using credential provider {}", credentials);
Class<? extends S3ClientFactory> s3ClientFactoryClass = conf.getClass(
S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL,
S3ClientFactory.class);
String endpoint = accessPoint == null
? conf.getTrimmed(ENDPOINT, DEFAULT_ENDPOINT)
: accessPoint.getEndpoint();
S3ClientFactory.S3ClientCreationParameters parameters = null;
parameters = new S3ClientFactory.S3ClientCreationParameters()
.withCredentialSet(credentials)
.withPathUri(name)
.withEndpoint(endpoint)
.withMetrics(statisticsContext.newStatisticsFromAwsSdk())
.withPathStyleAccess(conf.getBoolean(PATH_STYLE_ACCESS, false))
.withUserAgentSuffix(uaSuffix)
.withRequesterPays(conf.getBoolean(ALLOW_REQUESTER_PAYS, DEFAULT_ALLOW_REQUESTER_PAYS))
.withRequestHandlers(auditManager.createRequestHandlers());
s3 = ReflectionUtils.newInstance(s3ClientFactoryClass, conf)
.createS3Client(getUri(),
parameters);
}
/**
* Initialize and launch the audit manager and service.
* As this takes the FS IOStatistics store, it must be invoked
* after instrumentation is initialized.
* @throws IOException failure to instantiate/initialize.
*/
protected void initializeAuditService() throws IOException {
auditManager = AuditIntegration.createAndStartAuditManager(
getConf(),
instrumentation.createMetricsUpdatingStore());
}
/**
* The audit manager.
* @return the audit manager
*/
@InterfaceAudience.Private
public AuditManagerS3A getAuditManager() {
return auditManager;
}
/**
* Get the auditor; valid once initialized.
* @return the auditor.
*/
@InterfaceAudience.Private
public OperationAuditor getAuditor() {
return getAuditManager().getAuditor();
}
/**
* Get the active audit span.
* @return the span.
*/
@InterfaceAudience.Private
@Override
public AuditSpanS3A getActiveAuditSpan() {
return getAuditManager().getActiveAuditSpan();
}
/**
* Get the audit span source; allows for components like the committers
* to have a source of spans without being hard coded to the FS only.
* @return the source of spans -base implementation is this instance.
*/
@InterfaceAudience.Private
public AuditSpanSource getAuditSpanSource() {
return this;
}
/**
* Start an operation; this informs the audit service of the event
* and then sets it as the active span.
* @param operation operation name.
* @param path1 first path of operation
* @param path2 second path of operation
* @return a span for the audit
* @throws IOException failure
*/
public AuditSpanS3A createSpan(String operation,
@Nullable String path1,
@Nullable String path2)
throws IOException {
return getAuditManager().createSpan(operation, path1, path2);
}
/**
* Build the request factory.
* MUST be called after reading encryption secrets from settings/
* delegation token.
* Protected, in case test/mock classes want to implement their
* own variants.
* @return request factory.
*/
protected RequestFactory createRequestFactory() {
long partCountLimit = longOption(getConf(),
UPLOAD_PART_COUNT_LIMIT,
DEFAULT_UPLOAD_PART_COUNT_LIMIT,
1);
if (partCountLimit != DEFAULT_UPLOAD_PART_COUNT_LIMIT) {
LOG.warn("Configuration property {} shouldn't be overridden by client",
UPLOAD_PART_COUNT_LIMIT);
}
// ACLs; this is passed to the
// request factory.
initCannedAcls(getConf());
// Any encoding type
String contentEncoding = getConf().getTrimmed(CONTENT_ENCODING, null);
String storageClassConf = getConf()
.getTrimmed(STORAGE_CLASS, "")
.toUpperCase(Locale.US);
StorageClass storageClass;
try {
storageClass = StorageClass.fromValue(storageClassConf);
} catch (IllegalArgumentException e) {
LOG.warn("Unknown storage class property {}: {}; falling back to default storage class",
STORAGE_CLASS, storageClassConf);
storageClass = null;
}
return RequestFactoryImpl.builder()
.withBucket(requireNonNull(bucket))
.withCannedACL(getCannedACL())
.withEncryptionSecrets(requireNonNull(encryptionSecrets))
.withMultipartPartCountLimit(partCountLimit)
.withRequestPreparer(getAuditManager()::requestCreated)
.withContentEncoding(contentEncoding)
.withStorageClass(storageClass)
.build();
}
/**
* Get the request factory which uses this store's audit span.
* @return the request factory.
*/
@VisibleForTesting
public RequestFactory getRequestFactory() {
return requestFactory;
}
/**
* Implementation of all operations used by delegation tokens.
*/
private class DelegationOperationsImpl implements DelegationOperations {
@Override
public List<RoleModel.Statement> listAWSPolicyRules(final Set<AccessLevel> access) {
return S3AFileSystem.this.listAWSPolicyRules(access);
}
}
/**
* Create an instance of the delegation operations.
* @return callbacks for DT support.
*/
@VisibleForTesting
public DelegationOperations createDelegationOperations() {
return new DelegationOperationsImpl();
}
/**
* Set the encryption secrets for requests.
* @param secrets secrets
*/
protected void setEncryptionSecrets(final EncryptionSecrets secrets) {
this.encryptionSecrets = secrets;
if (requestFactory != null) {
requestFactory.setEncryptionSecrets(secrets);
}
}
/**
* Get the encryption secrets.
* This potentially sensitive information and must be treated with care.
* @return the current encryption secrets.
*/
public EncryptionSecrets getEncryptionSecrets() {
return encryptionSecrets;
}
private void initTransferManager() {
TransferManagerConfiguration transferConfiguration =
new TransferManagerConfiguration();
transferConfiguration.setMinimumUploadPartSize(partSize);
transferConfiguration.setMultipartUploadThreshold(multiPartThreshold);
transferConfiguration.setMultipartCopyPartSize(partSize);
transferConfiguration.setMultipartCopyThreshold(multiPartThreshold);
transfers = new TransferManager(s3, unboundedThreadPool);
transfers.setConfiguration(transferConfiguration);
}
private void initCannedAcls(Configuration conf) {
String cannedACLName = conf.get(CANNED_ACL, DEFAULT_CANNED_ACL);
if (!cannedACLName.isEmpty()) {
cannedACL = CannedAccessControlList.valueOf(cannedACLName);
} else {
cannedACL = null;
}
}
@Retries.RetryTranslated
private void initMultipartUploads(Configuration conf) throws IOException {
boolean purgeExistingMultipart = conf.getBoolean(PURGE_EXISTING_MULTIPART,
DEFAULT_PURGE_EXISTING_MULTIPART);
long purgeExistingMultipartAge = longOption(conf,
PURGE_EXISTING_MULTIPART_AGE, DEFAULT_PURGE_EXISTING_MULTIPART_AGE, 0);
if (purgeExistingMultipart) {
try {
abortOutstandingMultipartUploads(purgeExistingMultipartAge);
} catch (AccessDeniedException e) {
instrumentation.errorIgnored();
LOG.debug("Failed to purge multipart uploads against {}," +
" FS may be read only", bucket);
}
}
}
/**
* Abort all outstanding MPUs older than a given age.
* @param seconds time in seconds
* @throws IOException on any failure, other than 403 "permission denied"
*/
@Retries.RetryTranslated
public void abortOutstandingMultipartUploads(long seconds)
throws IOException {
Preconditions.checkArgument(seconds >= 0);
Date purgeBefore =
new Date(new Date().getTime() - seconds * 1000);
LOG.debug("Purging outstanding multipart uploads older than {}",
purgeBefore);
invoker.retry("Purging multipart uploads", bucket, true,
() -> transfers.abortMultipartUploads(bucket, purgeBefore));
}
/**
* Return the protocol scheme for the FileSystem.
*
* @return "s3a"
*/
@Override
public String getScheme() {
return "s3a";
}
/**
* Returns a URI whose scheme and authority identify this FileSystem.
*/
@Override
public URI getUri() {
return uri;
}
/**
* Set the URI field through {@link S3xLoginHelper} and
* optionally {@link #canonicalizeUri(URI)}
* Exported for testing.
* @param fsUri filesystem URI.
* @param canonicalize true if the URI should be canonicalized.
*/
@VisibleForTesting
protected void setUri(URI fsUri, boolean canonicalize) {
URI u = S3xLoginHelper.buildFSURI(fsUri);
this.uri = canonicalize ? u : canonicalizeUri(u);
}
/**
* Get the canonical URI.
* @return the canonical URI of this FS.
*/
public URI getCanonicalUri() {
return uri;
}
@VisibleForTesting
@Override
public int getDefaultPort() {
return 0;
}
/**
* Returns the S3 client used by this filesystem.
* This is for internal use within the S3A code itself.
* @return AmazonS3Client
*/
AmazonS3 getAmazonS3Client() {
return s3;
}
/**
* Returns the S3 client used by this filesystem.
* <i>Warning: this must only be used for testing, as it bypasses core
* S3A operations. </i>
* @param reason a justification for requesting access.
* @return AmazonS3Client
*/
@VisibleForTesting
public AmazonS3 getAmazonS3ClientForTesting(String reason) {
LOG.warn("Access to S3A client requested, reason {}", reason);
return s3;
}
/**
* Set the client -used in mocking tests to force in a different client.
* @param client client.
*/
protected void setAmazonS3Client(AmazonS3 client) {
Preconditions.checkNotNull(client, "client");
LOG.debug("Setting S3 client to {}", client);
s3 = client;
// Need to use a new TransferManager that uses the new client.
// Also, using a new TransferManager requires a new threadpool as the old
// TransferManager will shut the thread pool down when it is garbage
// collected.
initThreadPools(getConf());
initTransferManager();
}
/**
* Get the region of a bucket.
* Invoked from StoreContext; consider an entry point.
* @return the region in which a bucket is located
* @throws AccessDeniedException if the caller lacks permission.
* @throws IOException on any failure.
*/
@Retries.RetryTranslated
@InterfaceAudience.LimitedPrivate("diagnostics")
public String getBucketLocation() throws IOException {
return getBucketLocation(bucket);
}
/**
* Get the region of a bucket; fixing up the region so it can be used
* in the builders of other AWS clients.
* Requires the caller to have the AWS role permission
* {@code s3:GetBucketLocation}.
* Retry policy: retrying, translated.
* @param bucketName the name of the bucket
* @return the region in which a bucket is located
* @throws AccessDeniedException if the caller lacks permission.
* @throws IOException on any failure.
*/
@VisibleForTesting
@AuditEntryPoint
@Retries.RetryTranslated
public String getBucketLocation(String bucketName) throws IOException {
final String region = trackDurationAndSpan(
STORE_EXISTS_PROBE, bucketName, null, () ->
invoker.retry("getBucketLocation()", bucketName, true, () ->
// If accessPoint then region is known from Arn
accessPoint != null
? accessPoint.getRegion()
: s3.getBucketLocation(bucketName)));
return fixBucketRegion(region);
}
/**
* Get the input policy for this FS instance.
* @return the input policy
*/
@InterfaceStability.Unstable
public S3AInputPolicy getInputPolicy() {
return inputPolicy;
}
/**
* Get the change detection policy for this FS instance.
* Only public to allow access in tests in other packages.
* @return the change detection policy
*/
@VisibleForTesting
public ChangeDetectionPolicy getChangeDetectionPolicy() {
return changeDetectionPolicy;
}
/**
* Get the encryption algorithm of this endpoint.
* @return the encryption algorithm.
*/
public S3AEncryptionMethods getS3EncryptionAlgorithm() {
return encryptionSecrets.getEncryptionMethod();
}
/**
* Demand create the directory allocator, then create a temporary file.
* This does not mark the file for deletion when a process exits.
* {@link LocalDirAllocator#createTmpFileForWrite(String, long, Configuration)}.
* @param pathStr prefix for the temporary file
* @param size the size of the file that is going to be written
* @param conf the Configuration object
* @return a unique temporary file
* @throws IOException IO problems
*/
File createTmpFileForWrite(String pathStr, long size,
Configuration conf) throws IOException {
if (directoryAllocator == null) {
synchronized (this) {
String bufferDir = conf.get(BUFFER_DIR) != null
? BUFFER_DIR : HADOOP_TMP_DIR;
directoryAllocator = new LocalDirAllocator(bufferDir);
}
}
Path path = directoryAllocator.getLocalPathForWrite(pathStr,
size, conf);
File dir = new File(path.getParent().toUri().getPath());
String prefix = path.getName();
// create a temp file on this directory
return File.createTempFile(prefix, null, dir);
}
/**
* Get the bucket of this filesystem.
* @return the bucket
*/
public String getBucket() {
return bucket;
}
/**
* Set the bucket.
* @param bucket the bucket
*/
@VisibleForTesting
protected void setBucket(String bucket) {
this.bucket = bucket;
}
/**
* Get the canned ACL of this FS.
* @return an ACL, if any
*/
CannedAccessControlList getCannedACL() {
return cannedACL;
}
/**
* Change the input policy for this FS.
* This is now a no-op, retained in case some application
* or external test invokes it.
*
* @deprecated use openFile() options
* @param inputPolicy new policy
*/
@InterfaceStability.Unstable
@Deprecated
public void setInputPolicy(S3AInputPolicy inputPolicy) {
LOG.warn("setInputPolicy is no longer supported");
}
/**
* Turns a path (relative or otherwise) into an S3 key.
*
* @param path input path, may be relative to the working dir
* @return a key excluding the leading "/", or, if it is the root path, ""
*/
@VisibleForTesting
public String pathToKey(Path path) {
if (!path.isAbsolute()) {
path = new Path(workingDir, path);
}
if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) {
return "";
}
return path.toUri().getPath().substring(1);
}
/**
* Turns a path (relative or otherwise) into an S3 key, adding a trailing
* "/" if the path is not the root <i>and</i> does not already have a "/"
* at the end.
*
* @param key s3 key or ""
* @return the with a trailing "/", or, if it is the root key, "",
*/
@InterfaceAudience.Private
public String maybeAddTrailingSlash(String key) {
return S3AUtils.maybeAddTrailingSlash(key);
}
/**
* Convert a path back to a key.
* @param key input key
* @return the path from this key
*/
Path keyToPath(String key) {
return new Path("/" + key);
}
/**
* Convert a key to a fully qualified path.
* This includes fixing up the URI so that if it ends with a trailing slash,
* that is corrected, similar to {@code Path.normalizePath()}.
* @param key input key
* @return the fully qualified path including URI scheme and bucket name.
*/
public Path keyToQualifiedPath(String key) {
return qualify(keyToPath(key));
}
@Override
public Path makeQualified(final Path path) {
Path q = super.makeQualified(path);
if (!q.isRoot()) {
String urlString = q.toUri().toString();
if (urlString.endsWith(Path.SEPARATOR)) {
// this is a path which needs root stripping off to avoid
// confusion, See HADOOP-15430
LOG.debug("Stripping trailing '/' from {}", q);
// deal with an empty "/" at the end by mapping to the parent and
// creating a new path from it
q = new Path(urlString.substring(0, urlString.length() - 1));
}
}
if (!q.isRoot() && q.getName().isEmpty()) {
q = q.getParent();
}
return q;
}
/**
* Qualify a path.
* This includes fixing up the URI so that if it ends with a trailing slash,
* that is corrected, similar to {@code Path.normalizePath()}.
* @param path path to qualify
* @return a qualified path.
*/
public Path qualify(Path path) {
return makeQualified(path);
}
/**
* Check that a Path belongs to this FileSystem.
* Unlike the superclass, this version does not look at authority,
* only hostnames.
* @param path to check
* @throws IllegalArgumentException if there is an FS mismatch
*/
@Override
public void checkPath(Path path) {
S3xLoginHelper.checkPath(getConf(), getUri(), path, getDefaultPort());
}
/**
* Override the base canonicalization logic and relay to
* {@link S3xLoginHelper#canonicalizeUri(URI, int)}.
* This allows for the option of changing this logic for better DT handling.
* @param rawUri raw URI.
* @return the canonical URI to use in delegation tokens and file context.
*/
@Override
protected URI canonicalizeUri(URI rawUri) {
return S3xLoginHelper.canonicalizeUri(rawUri, getDefaultPort());
}
/**
* Opens an FSDataInputStream at the indicated Path.
* @param f the file name to open
* @param bufferSize the size of the buffer to be used.
*/
@Retries.RetryTranslated
public FSDataInputStream open(Path f, int bufferSize)
throws IOException {
return executeOpen(qualify(f),
openFileHelper.openSimpleFile(bufferSize));
}
/**
* Opens an FSDataInputStream at the indicated Path.
* The {@code fileInformation} parameter controls how the file
* is opened, whether it is normal vs. an S3 select call,
* can a HEAD be skipped, etc.
* @param path the file to open
* @param fileInformation information about the file to open
* @throws IOException IO failure.
*/
@AuditEntryPoint
@Retries.RetryTranslated
private FSDataInputStream executeOpen(
final Path path,
final OpenFileSupport.OpenFileInformation fileInformation)
throws IOException {
// create the input stream statistics before opening
// the file so that the time to prepare to open the file is included.
S3AInputStreamStatistics inputStreamStats =
statisticsContext.newInputStreamStatistics();
// this span is passed into the stream.
final AuditSpan auditSpan = entryPoint(INVOCATION_OPEN, path);
final S3AFileStatus fileStatus =
trackDuration(inputStreamStats,
ACTION_FILE_OPENED.getSymbol(), () ->
extractOrFetchSimpleFileStatus(path, fileInformation));
S3AReadOpContext readContext = createReadContext(
fileStatus,
auditSpan);
fileInformation.applyOptions(readContext);
LOG.debug("Opening '{}'", readContext);
if (this.prefetchEnabled) {
return new FSDataInputStream(
new S3APrefetchingInputStream(
readContext.build(),
createObjectAttributes(path, fileStatus),
createInputStreamCallbacks(auditSpan),
inputStreamStats));
} else {
return new FSDataInputStream(
new S3AInputStream(
readContext.build(),
createObjectAttributes(path, fileStatus),
createInputStreamCallbacks(auditSpan),
inputStreamStats,
unboundedThreadPool));
}
}
/**
* Override point: create the callbacks for S3AInputStream.
* @return an implementation of the InputStreamCallbacks,
*/
private S3AInputStream.InputStreamCallbacks createInputStreamCallbacks(
final AuditSpan auditSpan) {
return new InputStreamCallbacksImpl(auditSpan);
}
/**
* Operations needed by S3AInputStream to read data.
*/
private final class InputStreamCallbacksImpl implements
S3AInputStream.InputStreamCallbacks {
/**
* Audit span to activate before each call.
*/
private final AuditSpan auditSpan;
/**
* Create.
* @param auditSpan Audit span to activate before each call.
*/
private InputStreamCallbacksImpl(final AuditSpan auditSpan) {
this.auditSpan = requireNonNull(auditSpan);
}
/**
* Closes the audit span.
*/
@Override
public void close() {
auditSpan.close();
}
@Override
public GetObjectRequest newGetRequest(final String key) {
// active the audit span used for the operation
try (AuditSpan span = auditSpan.activate()) {
return getRequestFactory().newGetObjectRequest(key);
}
}
@Override
public S3Object getObject(GetObjectRequest request) {
// active the audit span used for the operation
try (AuditSpan span = auditSpan.activate()) {
return s3.getObject(request);
}
}
@Override
public <T> CompletableFuture<T> submit(final CallableRaisingIOE<T> operation) {
CompletableFuture<T> result = new CompletableFuture<>();
unboundedThreadPool.submit(() ->
LambdaUtils.eval(result, () -> {
try (AuditSpan span = auditSpan.activate()) {
return operation.apply();
}
}));
return result;
}
}
/**
* Create the read context for reading from the referenced file,
* using FS state as well as the status.
* @param fileStatus file status.
* @param auditSpan audit span.
* @return a context for read and select operations.
*/
@VisibleForTesting
protected S3AReadOpContext createReadContext(
final FileStatus fileStatus,
final AuditSpan auditSpan) {
final S3AReadOpContext roc = new S3AReadOpContext(
fileStatus.getPath(),
invoker,
statistics,
statisticsContext,
fileStatus,
vectoredIOContext,
IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator(),
futurePool,
prefetchBlockSize,
prefetchBlockCount)
.withAuditSpan(auditSpan);
openFileHelper.applyDefaultOptions(roc);
return roc.build();
}
/**
* Create the attributes of an object for subsequent use.
* @param f path path of the request.
* @param eTag the eTag of the S3 object
* @param versionId S3 object version ID
* @param len length of the file
* @return attributes to use when building the query.
*/
private S3ObjectAttributes createObjectAttributes(
final Path f,
final String eTag,
final String versionId,
final long len) {
return new S3ObjectAttributes(bucket,
f,
pathToKey(f),
getS3EncryptionAlgorithm(),
encryptionSecrets.getEncryptionKey(),
eTag,
versionId,
len);
}
/**
* Create the attributes of an object for subsequent use.
* @param path path -this is used over the file status path.
* @param fileStatus file status to build from.
* @return attributes to use when building the query.
*/
private S3ObjectAttributes createObjectAttributes(
final Path path,
final S3AFileStatus fileStatus) {
return createObjectAttributes(
path,
fileStatus.getEtag(),
fileStatus.getVersionId(),
fileStatus.getLen());
}
/**
* Create an FSDataOutputStream at the indicated Path with write-progress
* reporting.
* Retry policy: retrying, translated on the getFileStatus() probe.
* No data is uploaded to S3 in this call, so retry issues related to that.
* @param f the file name to open
* @param permission the permission to set.
* @param overwrite if a file with this name already exists, then if true,
* the file will be overwritten, and if false an error will be thrown.
* @param bufferSize the size of the buffer to be used.
* @param replication required block replication for the file.
* @param blockSize the requested block size.
* @param progress the progress reporter.
* @throws IOException in the event of IO related errors.
* @see #setPermission(Path, FsPermission)
*/
@Override
@AuditEntryPoint
@SuppressWarnings("IOResourceOpenedButNotSafelyClosed")
public FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
final Path path = qualify(f);
// the span will be picked up inside the output stream
return trackDurationAndSpan(INVOCATION_CREATE, path, () ->
innerCreateFile(path,
progress,
getActiveAuditSpan(),
overwrite
? OPTIONS_CREATE_FILE_OVERWRITE
: OPTIONS_CREATE_FILE_NO_OVERWRITE));
}
/**
* Create an FSDataOutputStream at the indicated Path with write-progress
* reporting; in the active span.
* Retry policy: retrying, translated on the getFileStatus() probe.
* No data is uploaded to S3 in this call, so no retry issues related to that.
* The "performance" flag disables safety checks for the path being a file,
* parent directory existing, and doesn't attempt to delete
* dir markers, irrespective of FS settings.
* If true, this method call does no IO at all.
* @param path the file name to open
* @param progress the progress reporter.
* @param auditSpan audit span
* @param options options for the file
* @throws IOException in the event of IO related errors.
*/
@SuppressWarnings("IOResourceOpenedButNotSafelyClosed")
@Retries.RetryTranslated
private FSDataOutputStream innerCreateFile(
final Path path,
final Progressable progress,
final AuditSpan auditSpan,
final CreateFileBuilder.CreateFileOptions options) throws IOException {
auditSpan.activate();
String key = pathToKey(path);
EnumSet<CreateFlag> flags = options.getFlags();
boolean overwrite = flags.contains(CreateFlag.OVERWRITE);
boolean performance = options.isPerformance();
boolean skipProbes = performance || isUnderMagicCommitPath(path);
if (skipProbes) {
LOG.debug("Skipping existence/overwrite checks");
} else {
try {
// get the status or throw an FNFE.
// when overwriting, there is no need to look for any existing file,
// just a directory (for safety)
FileStatus status = innerGetFileStatus(path, false,
overwrite
? StatusProbeEnum.DIRECTORIES
: StatusProbeEnum.ALL);
// if the thread reaches here, there is something at the path
if (status.isDirectory()) {
// path references a directory: automatic error
throw new FileAlreadyExistsException(path + " is a directory");
}
if (!overwrite) {
// path references a file and overwrite is disabled
throw new FileAlreadyExistsException(path + " already exists");
}
LOG.debug("Overwriting file {}", path);
} catch (FileNotFoundException e) {
// this means there is nothing at the path; all good.
}
}
instrumentation.fileCreated();
final BlockOutputStreamStatistics outputStreamStatistics
= statisticsContext.newOutputStreamStatistics();
PutTracker putTracker =
committerIntegration.createTracker(path, key, outputStreamStatistics);
String destKey = putTracker.getDestKey();
// put options are derived from the path and the
// option builder.
boolean keep = performance || keepDirectoryMarkers(path);
final PutObjectOptions putOptions =
new PutObjectOptions(keep, null, options.getHeaders());
final S3ABlockOutputStream.BlockOutputStreamBuilder builder =
S3ABlockOutputStream.builder()
.withKey(destKey)
.withBlockFactory(blockFactory)
.withBlockSize(partSize)
.withStatistics(outputStreamStatistics)
.withProgress(progress)
.withPutTracker(putTracker)
.withWriteOperations(
createWriteOperationHelper(auditSpan))
.withExecutorService(
new SemaphoredDelegatingExecutor(
boundedThreadPool,
blockOutputActiveBlocks,
true,
outputStreamStatistics))
.withDowngradeSyncableExceptions(
getConf().getBoolean(
DOWNGRADE_SYNCABLE_EXCEPTIONS,
DOWNGRADE_SYNCABLE_EXCEPTIONS_DEFAULT))
.withCSEEnabled(isCSEEnabled)
.withPutOptions(putOptions)
.withIOStatisticsAggregator(
IOStatisticsContext.getCurrentIOStatisticsContext().getAggregator());
return new FSDataOutputStream(
new S3ABlockOutputStream(builder),
null);
}
/**
* Create a Write Operation Helper with the current active span.
* All operations made through this helper will activate the
* span before execution.
*
* This class permits other low-level operations against the store.
* It is unstable and
* only intended for code with intimate knowledge of the object store.
* If using this, be prepared for changes even on minor point releases.
* @return a new helper.
*/
@InterfaceAudience.Private
public WriteOperationHelper getWriteOperationHelper() {
return createWriteOperationHelper(getActiveAuditSpan());
}
/**
* Create a Write Operation Helper with the given span.
* All operations made through this helper will activate the
* span before execution.
* @param auditSpan audit span
* @return a new helper.
*/
@InterfaceAudience.Private
public WriteOperationHelper createWriteOperationHelper(AuditSpan auditSpan) {
return new WriteOperationHelper(this,
getConf(),
statisticsContext,
getAuditSpanSource(),
auditSpan);
}
/**
* Create instance of an FSDataOutputStreamBuilder for
* creating a file at the given path.
* @param path path to create
* @return a builder.
* @throws UncheckedIOException for problems creating the audit span
*/
@Override
@AuditEntryPoint
public FSDataOutputStreamBuilder createFile(final Path path) {
try {
final Path qualified = qualify(path);
final AuditSpan span = entryPoint(INVOCATION_CREATE_FILE,
pathToKey(qualified),
null);
return new CreateFileBuilder(this,
qualified,
new CreateFileBuilderCallbacksImpl(INVOCATION_CREATE_FILE, span))
.create()
.overwrite(true);
} catch (IOException e) {
// catch any IOEs raised in span creation and convert to
// an UncheckedIOException
throw new UncheckedIOException(e);
}
}
/**
* Callback for create file operations.
*/
private final class CreateFileBuilderCallbacksImpl implements
CreateFileBuilder.CreateFileBuilderCallbacks {
private final Statistic statistic;
/** span for operations. */
private final AuditSpan span;
private CreateFileBuilderCallbacksImpl(
final Statistic statistic,
final AuditSpan span) {
this.statistic = statistic;
this.span = span;
}
@Override
public FSDataOutputStream createFileFromBuilder(
final Path path,
final Progressable progress,
final CreateFileBuilder.CreateFileOptions options) throws IOException {
// the span will be picked up inside the output stream
return trackDuration(getDurationTrackerFactory(), statistic.getSymbol(), () ->
innerCreateFile(path, progress, span, options));
}
}
/**
* {@inheritDoc}
* The S3A implementations downgrades to the recursive creation, to avoid
* any race conditions with parent entries "disappearing".
*/
@Override
@AuditEntryPoint
public FSDataOutputStream createNonRecursive(Path p,
FsPermission permission,
EnumSet<CreateFlag> flags,
int bufferSize,
short replication,
long blockSize,
Progressable progress) throws IOException {
final Path path = makeQualified(p);
// span is created and passed in to the callbacks.
final AuditSpan span = entryPoint(INVOCATION_CREATE_NON_RECURSIVE,
pathToKey(path),
null);
// uses the CreateFileBuilder, filling it in with the relevant arguments.
final CreateFileBuilder builder = new CreateFileBuilder(this,
path,
new CreateFileBuilderCallbacksImpl(INVOCATION_CREATE_NON_RECURSIVE, span))
.create()
.withFlags(flags)
.blockSize(blockSize)
.bufferSize(bufferSize);
if (progress != null) {
builder.progress(progress);
}
return builder.build();
}
/**
* Append to an existing file (optional operation).
* @param f the existing file to be appended.
* @param bufferSize the size of the buffer to be used.
* @param progress for reporting progress if it is not null.
* @throws IOException indicating that append is not supported.
*/
public FSDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException {
throw new UnsupportedOperationException("Append is not supported "
+ "by S3AFileSystem");
}
/**
* Renames Path src to Path dst. Can take place on local fs
* or remote DFS.
*
* Warning: S3 does not support renames. This method does a copy which can
* take S3 some time to execute with large files and directories. Since
* there is no Progressable passed in, this can time out jobs.
*
* Note: This implementation differs with other S3 drivers. Specifically:
* <pre>
* Fails if src is a file and dst is a directory.
* Fails if src is a directory and dst is a file.
* Fails if the parent of dst does not exist or is a file.
* Fails if dst is a directory that is not empty.
* </pre>
*
* @param src path to be renamed
* @param dst new path after rename
* @throws IOException on IO failure
* @return true if rename is successful
*/
@AuditEntryPoint
@Retries.RetryTranslated
public boolean rename(Path src, Path dst) throws IOException {
try {
long bytesCopied = trackDurationAndSpan(
INVOCATION_RENAME, src.toString(), dst.toString(), () ->
innerRename(src, dst));
LOG.debug("Copied {} bytes", bytesCopied);
return true;
} catch (AmazonClientException e) {
throw translateException("rename(" + src +", " + dst + ")", src, e);
} catch (RenameFailedException e) {
LOG.info("{}", e.getMessage());
LOG.debug("rename failure", e);
return e.getExitCode();
}
}
/**
* Validate the rename parameters and status of the filesystem;
* returns the source and any destination File Status.
* @param src qualified path to be renamed
* @param dst qualified path after rename
* @return the source and (possibly null) destination status entries.
* @throws RenameFailedException if some criteria for a state changing
* rename was not met. This means work didn't happen; it's not something
* which is reported upstream to the FileSystem APIs, for which the semantics
* of "false" are pretty vague.
* @throws FileNotFoundException there's no source file.
* @throws IOException on IO failure.
*/
@Retries.RetryTranslated
private Pair<S3AFileStatus, S3AFileStatus> initiateRename(
final Path src,
final Path dst) throws IOException {
String srcKey = pathToKey(src);
String dstKey = pathToKey(dst);
if (srcKey.isEmpty()) {
throw new RenameFailedException(src, dst, "source is root directory");
}
if (dstKey.isEmpty()) {
throw new RenameFailedException(src, dst, "dest is root directory");
}
// get the source file status; this raises a FNFE if there is no source
// file.
S3AFileStatus srcStatus = innerGetFileStatus(src, true,
StatusProbeEnum.ALL);
if (srcKey.equals(dstKey)) {
LOG.debug("rename: src and dest refer to the same file or directory: {}",
dst);
throw new RenameFailedException(src, dst,
"source and dest refer to the same file or directory")
.withExitCode(srcStatus.isFile());
}
S3AFileStatus dstStatus = null;
try {
dstStatus = innerGetFileStatus(dst, true, StatusProbeEnum.ALL);
// if there is no destination entry, an exception is raised.
// hence this code sequence can assume that there is something
// at the end of the path; the only detail being what it is and
// whether or not it can be the destination of the rename.
if (srcStatus.isDirectory()) {
if (dstStatus.isFile()) {
throw new FileAlreadyExistsException(
"Failed to rename " + src + " to " + dst
+"; source is a directory and dest is a file");
} else if (dstStatus.isEmptyDirectory() != Tristate.TRUE) {
throw new RenameFailedException(src, dst,
"Destination is a non-empty directory")
.withExitCode(false);
}
// at this point the destination is an empty directory
} else {
// source is a file. The destination must be a directory,
// empty or not
if (dstStatus.isFile()) {
throw new FileAlreadyExistsException(
"Failed to rename " + src + " to " + dst
+ "; destination file exists");
}
}
} catch (FileNotFoundException e) {
LOG.debug("rename: destination path {} not found", dst);
// Parent must exist
Path parent = dst.getParent();
if (!pathToKey(parent).isEmpty()
&& !parent.equals(src.getParent())) {
try {
// make sure parent isn't a file.
// don't look for parent being a dir as there is a risk
// of a race between dest dir cleanup and rename in different
// threads.
S3AFileStatus dstParentStatus = innerGetFileStatus(parent,
false, StatusProbeEnum.FILE);
// if this doesn't raise an exception then
// the parent is a file or a dir.
if (!dstParentStatus.isDirectory()) {
throw new RenameFailedException(src, dst,
"destination parent is not a directory");
}
} catch (FileNotFoundException expected) {
// nothing was found. Don't worry about it;
// expect rename to implicitly create the parent dir
}
}
}
return Pair.of(srcStatus, dstStatus);
}
/**
* The inner rename operation. See {@link #rename(Path, Path)} for
* the description of the operation.
* This operation throws an exception on any failure which needs to be
* reported and downgraded to a failure.
* Retries: retry translated, assuming all operations it is called do
* so. For safely, consider catch and handle AmazonClientException
* because this is such a complex method there's a risk it could surface.
* @param source path to be renamed
* @param dest new path after rename
* @throws RenameFailedException if some criteria for a state changing
* rename was not met. This means work didn't happen; it's not something
* which is reported upstream to the FileSystem APIs, for which the semantics
* of "false" are pretty vague.
* @return the number of bytes copied.
* @throws FileNotFoundException there's no source file.
* @throws IOException on IO failure.
* @throws AmazonClientException on failures inside the AWS SDK
*/
@Retries.RetryMixed
private long innerRename(Path source, Path dest)
throws RenameFailedException, FileNotFoundException, IOException,
AmazonClientException {
Path src = qualify(source);
Path dst = qualify(dest);
LOG.debug("Rename path {} to {}", src, dst);
String srcKey = pathToKey(src);
String dstKey = pathToKey(dst);
Pair<S3AFileStatus, S3AFileStatus> p = initiateRename(src, dst);
// Initiate the rename.
// this will call back into this class via the rename callbacks
RenameOperation renameOperation = new RenameOperation(
createStoreContext(),
src, srcKey, p.getLeft(),
dst, dstKey, p.getRight(),
new OperationCallbacksImpl(),
pageSize);
return renameOperation.execute();
}
@Override public Token<? extends TokenIdentifier> getFsDelegationToken()
throws IOException {
return getDelegationToken(null);
}
/**
* The callbacks made by the rename and delete operations.
* This separation allows the operation to be factored out and
* still avoid knowledge of the S3AFilesystem implementation.
* The Audit span active at the time of creation is cached and activated
* before every call.
*/
private final class OperationCallbacksImpl implements OperationCallbacks {
/** Audit Span at time of creation. */
private final AuditSpan auditSpan;
private OperationCallbacksImpl() {
auditSpan = getActiveAuditSpan();
}
@Override
public S3ObjectAttributes createObjectAttributes(final Path path,
final String eTag,
final String versionId,
final long len) {
return S3AFileSystem.this.createObjectAttributes(path, eTag, versionId,
len);
}
@Override
public S3ObjectAttributes createObjectAttributes(
final S3AFileStatus fileStatus) {
return S3AFileSystem.this.createObjectAttributes(
fileStatus.getPath(),
fileStatus);
}
@Override
public S3AReadOpContext createReadContext(final FileStatus fileStatus) {
return S3AFileSystem.this.createReadContext(fileStatus,
auditSpan);
}
@Override
@Retries.RetryTranslated
public void deleteObjectAtPath(final Path path,
final String key,
final boolean isFile)
throws IOException {
auditSpan.activate();
once("delete", path.toString(), () ->
S3AFileSystem.this.deleteObjectAtPath(path, key, isFile));
}
@Override
@Retries.RetryTranslated
public RemoteIterator<S3ALocatedFileStatus> listFilesAndDirectoryMarkers(
final Path path,
final S3AFileStatus status,
final boolean includeSelf) throws IOException {
auditSpan.activate();
return innerListFiles(
path,
true,
includeSelf
? Listing.ACCEPT_ALL_BUT_S3N
: new Listing.AcceptAllButSelfAndS3nDirs(path),
status
);
}
@Override
public CopyResult copyFile(final String srcKey,
final String destKey,
final S3ObjectAttributes srcAttributes,
final S3AReadOpContext readContext) throws IOException {
auditSpan.activate();
return S3AFileSystem.this.copyFile(srcKey, destKey,
srcAttributes.getLen(), srcAttributes, readContext);
}
@Override
public void removeKeys(
final List<DeleteObjectsRequest.KeyVersion> keysToDelete,
final boolean deleteFakeDir)
throws MultiObjectDeleteException, AmazonClientException, IOException {
auditSpan.activate();
S3AFileSystem.this.removeKeys(keysToDelete, deleteFakeDir);
}
@Override
public void finishRename(final Path sourceRenamed, final Path destCreated)
throws IOException {
auditSpan.activate();
Path destParent = destCreated.getParent();
if (!sourceRenamed.getParent().equals(destParent)) {
LOG.debug("source & dest parents are different; fix up dir markers");
if (!keepDirectoryMarkers(destParent)) {
deleteUnnecessaryFakeDirectories(destParent);
}
maybeCreateFakeParentDirectory(sourceRenamed);
}
}
@Override
@Retries.RetryTranslated
public RemoteIterator<S3AFileStatus> listObjects(
final Path path,
final String key)
throws IOException {
return once("listObjects", key, () ->
listing.createFileStatusListingIterator(path,
createListObjectsRequest(key, null),
ACCEPT_ALL,
Listing.ACCEPT_ALL_BUT_S3N,
auditSpan));
}
}
/**
* Callbacks from {@link Listing}.
* Auditing: the listing object is long-lived; the audit span
* for a single listing is passed in from the listing
* method calls and then down to the callbacks.
*/
protected class ListingOperationCallbacksImpl implements
ListingOperationCallbacks {
@Override
public CompletableFuture<S3ListResult> listObjectsAsync(
S3ListRequest request,
DurationTrackerFactory trackerFactory,
AuditSpan span) {
return submit(unboundedThreadPool, span, () ->
listObjects(request,
pairedTrackerFactory(trackerFactory,
getDurationTrackerFactory())));
}
@Override
@Retries.RetryRaw
public CompletableFuture<S3ListResult> continueListObjectsAsync(
S3ListRequest request,
S3ListResult prevResult,
DurationTrackerFactory trackerFactory,
AuditSpan span) {
return submit(unboundedThreadPool, span,
() -> continueListObjects(request, prevResult,
pairedTrackerFactory(trackerFactory,
getDurationTrackerFactory())));
}
@Override
public S3ALocatedFileStatus toLocatedFileStatus(
S3AFileStatus status)
throws IOException {
return S3AFileSystem.this.toLocatedFileStatus(status);
}
@Override
public S3ListRequest createListObjectsRequest(
String key,
String delimiter,
AuditSpan span) {
span.activate();
return S3AFileSystem.this.createListObjectsRequest(key, delimiter);
}
@Override
public long getDefaultBlockSize(Path path) {
return S3AFileSystem.this.getDefaultBlockSize(path);
}
@Override
public int getMaxKeys() {
return S3AFileSystem.this.getMaxKeys();
}
}
/**
* Low-level call to get at the object metadata.
* This method is used in some external applications and so
* must be viewed as a public entry point.
* Auditing: An audit entry point.
* @param path path to the object. This will be qualified.
* @return metadata
* @throws IOException IO and object access problems.
*/
@VisibleForTesting
@AuditEntryPoint
@InterfaceAudience.LimitedPrivate("utilities")
@Retries.RetryTranslated
@InterfaceStability.Evolving
public ObjectMetadata getObjectMetadata(Path path) throws IOException {
return trackDurationAndSpan(INVOCATION_GET_FILE_STATUS, path, () ->
getObjectMetadata(makeQualified(path), null, invoker,
"getObjectMetadata"));
}
/**
* Low-level call to get at the object metadata.
* @param path path to the object
* @param changeTracker the change tracker to detect version inconsistencies
* @param changeInvoker the invoker providing the retry policy
* @param operation the operation being performed (e.g. "read" or "copy")
* @return metadata
* @throws IOException IO and object access problems.
*/
@Retries.RetryTranslated
private ObjectMetadata getObjectMetadata(Path path,
ChangeTracker changeTracker, Invoker changeInvoker, String operation)
throws IOException {
String key = pathToKey(path);
return once(operation, path.toString(), () ->
// HEAD against the object
getObjectMetadata(
key, changeTracker, changeInvoker, operation));
}
/**
* Entry point to an operation.
* Increments the statistic; verifies the FS is active.
* @param operation The operation being invoked
* @param path first path of operation
* @return a span for the audit
* @throws IOException failure of audit service
*/
protected AuditSpan entryPoint(Statistic operation,
Path path) throws IOException {
return entryPoint(operation,
(path != null ? pathToKey(path): null),
null);
}
/**
* Entry point to an operation.
* Increments the statistic; verifies the FS is active.
* @param operation The operation being invoked
* @param path1 first path of operation
* @param path2 second path of operation
* @return a span for the audit
* @throws IOException failure of audit service
*/
protected AuditSpan entryPoint(Statistic operation,
@Nullable String path1,
@Nullable String path2) throws IOException {
checkNotClosed();
incrementStatistic(operation);
return createSpan(operation.getSymbol(),
path1, path2);
}
/**
* Given an IOException raising callable/lambda expression,
* execute it and update the relevant statistic within a span
* of the same statistic.
* @param statistic statistic key
* @param path first path for span (nullable)
* @param path2 second path for span
* @param input input callable.
* @param <B> return type.
* @return the result of the operation.
* @throws IOException if raised in the callable
*/
private <B> B trackDurationAndSpan(
Statistic statistic, String path, String path2,
CallableRaisingIOE<B> input) throws IOException {
checkNotClosed();
try (AuditSpan span = createSpan(statistic.getSymbol(),
path, path2)) {
return trackDuration(getDurationTrackerFactory(),
statistic.getSymbol(), input);
}
}
/**
* Overloaded version of {@code trackDurationAndSpan()}.
* Takes a single nullable path as the path param,
* @param statistic statistic key
* @param path path for span (nullable)
* @param input input callable.
* @param <B> return type.
* @return the result of the operation.
* @throws IOException if raised in the callable
*/
private <B> B trackDurationAndSpan(
Statistic statistic,
@Nullable Path path,
CallableRaisingIOE<B> input) throws IOException {
return trackDurationAndSpan(statistic,
path != null ? pathToKey(path): null,
null, input);
}
/**
* Increment a statistic by 1.
* This increments both the instrumentation and storage statistics.
* @param statistic The operation to increment
*/
protected void incrementStatistic(Statistic statistic) {
incrementStatistic(statistic, 1);
}
/**
* Increment a statistic by a specific value.
* This increments both the instrumentation and storage statistics.
* @param statistic The operation to increment
* @param count the count to increment
*/
protected void incrementStatistic(Statistic statistic, long count) {
statisticsContext.incrementCounter(statistic, count);
}
/**
* Decrement a gauge by a specific value.
* @param statistic The operation to decrement
* @param count the count to decrement
*/
protected void decrementGauge(Statistic statistic, long count) {
statisticsContext.decrementGauge(statistic, count);
}
/**
* Increment a gauge by a specific value.
* @param statistic The operation to increment
* @param count the count to increment
*/
protected void incrementGauge(Statistic statistic, long count) {
statisticsContext.incrementGauge(statistic, count);
}
/**
* Callback when an operation was retried.
* Increments the statistics of ignored errors or throttled requests,
* depending up on the exception class.
* @param ex exception.
*/
public void operationRetried(Exception ex) {
if (isThrottleException(ex)) {
LOG.debug("Request throttled");
incrementStatistic(STORE_IO_THROTTLED);
statisticsContext.addValueToQuantiles(STORE_IO_THROTTLE_RATE, 1);
} else {
incrementStatistic(STORE_IO_RETRY);
incrementStatistic(IGNORED_ERRORS);
}
}
/**
* Callback from {@link Invoker} when an operation is retried.
* @param text text of the operation
* @param ex exception
* @param retries number of retries
* @param idempotent is the method idempotent
*/
public void operationRetried(
String text,
Exception ex,
int retries,
boolean idempotent) {
operationRetried(ex);
}
/**
* Get the storage statistics of this filesystem.
* @return the storage statistics
*/
@Override
public S3AStorageStatistics getStorageStatistics() {
return storageStatistics;
}
/**
* Get the instrumentation's IOStatistics.
* @return statistics
*/
@Override
public IOStatistics getIOStatistics() {
return instrumentation != null
? instrumentation.getIOStatistics()
: null;
}
/**
* Get the factory for duration tracking.
* @return a factory from the instrumentation.
*/
protected DurationTrackerFactory getDurationTrackerFactory() {
return instrumentation != null ?
instrumentation.getDurationTrackerFactory()
: null;
}
/**
* Request object metadata; increments counters in the process.
* Retry policy: retry untranslated.
* This method is used in some external applications and so
* must be viewed as a public entry point.
* Auditing: this call does NOT initiate a new AuditSpan; the expectation
* is that there is already an active span.
* @param key key
* @return the metadata
* @throws IOException if the retry invocation raises one (it shouldn't).
*/
@Retries.RetryRaw
@VisibleForTesting
@InterfaceAudience.LimitedPrivate("external utilities")
ObjectMetadata getObjectMetadata(String key) throws IOException {
return getObjectMetadata(key, null, invoker, "getObjectMetadata");
}
/**
* Request object metadata; increments counters in the process.
* Retry policy: retry untranslated.
* Uses changeTracker to detect an unexpected file version (eTag or versionId)
* @param key key
* @param changeTracker the change tracker to detect unexpected object version
* @param changeInvoker the invoker providing the retry policy
* @param operation the operation (e.g. "read" or "copy") triggering this call
* @return the metadata
* @throws IOException if the retry invocation raises one (it shouldn't).
* @throws RemoteFileChangedException if an unexpected version is detected
*/
@Retries.RetryRaw
protected ObjectMetadata getObjectMetadata(String key,
ChangeTracker changeTracker,
Invoker changeInvoker,
String operation) throws IOException {
ObjectMetadata meta = changeInvoker.retryUntranslated("GET " + key, true,
() -> {
GetObjectMetadataRequest request
= getRequestFactory().newGetObjectMetadataRequest(key);
incrementStatistic(OBJECT_METADATA_REQUESTS);
DurationTracker duration = getDurationTrackerFactory()
.trackDuration(ACTION_HTTP_HEAD_REQUEST.getSymbol());
try {
LOG.debug("HEAD {} with change tracker {}", key, changeTracker);
if (changeTracker != null) {
changeTracker.maybeApplyConstraint(request);
}
ObjectMetadata objectMetadata = s3.getObjectMetadata(request);
if (changeTracker != null) {
changeTracker.processMetadata(objectMetadata, operation);
}
return objectMetadata;
} catch(AmazonServiceException ase) {
if (!isObjectNotFound(ase)) {
// file not found is not considered a failure of the call,
// so only switch the duration tracker to update failure
// metrics on other exception outcomes.
duration.failed();
}
throw ase;
} finally {
// update the tracker.
duration.close();
}
});
incrementReadOperations();
return meta;
}
/**
* Initiate a {@code listObjects} operation, incrementing metrics
* in the process.
*
* Retry policy: retry untranslated.
* @param request request to initiate
* @param trackerFactory duration tracking
* @return the results
* @throws IOException if the retry invocation raises one (it shouldn't).
*/
@Retries.RetryRaw
protected S3ListResult listObjects(S3ListRequest request,
@Nullable final DurationTrackerFactory trackerFactory)
throws IOException {
incrementReadOperations();
LOG.debug("LIST {}", request);
validateListArguments(request);
try(DurationInfo ignored =
new DurationInfo(LOG, false, "LIST")) {
return invoker.retryUntranslated(
request.toString(),
true,
trackDurationOfOperation(trackerFactory,
OBJECT_LIST_REQUEST,
() -> {
if (useListV1) {
return S3ListResult.v1(s3.listObjects(request.getV1()));
} else {
return S3ListResult.v2(s3.listObjectsV2(request.getV2()));
}
}));
}
}
/**
* Validate the list arguments with this bucket's settings.
* @param request the request to validate
*/
private void validateListArguments(S3ListRequest request) {
if (useListV1) {
Preconditions.checkArgument(request.isV1());
} else {
Preconditions.checkArgument(!request.isV1());
}
}
/**
* List the next set of objects.
* Retry policy: retry untranslated.
* @param request last list objects request to continue
* @param prevResult last paged result to continue from
* @param trackerFactory duration tracking
* @return the next result object
* @throws IOException none, just there for retryUntranslated.
*/
@Retries.RetryRaw
protected S3ListResult continueListObjects(S3ListRequest request,
S3ListResult prevResult,
final DurationTrackerFactory trackerFactory) throws IOException {
incrementReadOperations();
validateListArguments(request);
try(DurationInfo ignored =
new DurationInfo(LOG, false, "LIST (continued)")) {
return invoker.retryUntranslated(
request.toString(),
true,
trackDurationOfOperation(
trackerFactory,
OBJECT_CONTINUE_LIST_REQUEST,
() -> {
if (useListV1) {
return S3ListResult.v1(
s3.listNextBatchOfObjects(
getRequestFactory()
.newListNextBatchOfObjectsRequest(
prevResult.getV1())));
} else {
request.getV2().setContinuationToken(prevResult.getV2()
.getNextContinuationToken());
return S3ListResult.v2(s3.listObjectsV2(request.getV2()));
}
}));
}
}
/**
* Increment read operations.
*/
public void incrementReadOperations() {
statistics.incrementReadOps(1);
}
/**
* Increment the write operation counter.
* This is somewhat inaccurate, as it appears to be invoked more
* often than needed in progress callbacks.
*/
public void incrementWriteOperations() {
statistics.incrementWriteOps(1);
}
/**
* Delete an object.
* Increments the {@code OBJECT_DELETE_REQUESTS} and write
* operation statistics.
* This call does <i>not</i> create any mock parent entries.
*
* Retry policy: retry untranslated; delete considered idempotent.
* @param key key to blob to delete.
* @throws AmazonClientException problems working with S3
* @throws InvalidRequestException if the request was rejected due to
* a mistaken attempt to delete the root directory.
*/
@VisibleForTesting
@Retries.RetryRaw
protected void deleteObject(String key)
throws AmazonClientException, IOException {
blockRootDelete(key);
incrementWriteOperations();
try (DurationInfo ignored =
new DurationInfo(LOG, false,
"deleting %s", key)) {
invoker.retryUntranslated(String.format("Delete %s:/%s", bucket, key),
DELETE_CONSIDERED_IDEMPOTENT,
()-> {
incrementStatistic(OBJECT_DELETE_OBJECTS);
trackDurationOfInvocation(getDurationTrackerFactory(),
OBJECT_DELETE_REQUEST.getSymbol(),
() -> s3.deleteObject(getRequestFactory()
.newDeleteObjectRequest(key)));
return null;
});
}
}
/**
* Delete an object.
* This call does <i>not</i> create any mock parent entries.
* Retry policy: retry untranslated; delete considered idempotent.
* @param f path path to delete
* @param key key of entry
* @param isFile is the path a file (used for instrumentation only)
* @throws AmazonClientException problems working with S3
* @throws IOException from invoker signature only -should not be raised.
*/
@Retries.RetryRaw
void deleteObjectAtPath(Path f,
String key,
boolean isFile)
throws AmazonClientException, IOException {
if (isFile) {
instrumentation.fileDeleted(1);
} else {
instrumentation.directoryDeleted();
}
deleteObject(key);
}
/**
* Reject any request to delete an object where the key is root.
* @param key key to validate
* @throws InvalidRequestException if the request was rejected due to
* a mistaken attempt to delete the root directory.
*/
private void blockRootDelete(String key) throws InvalidRequestException {
if (key.isEmpty() || "/".equals(key)) {
throw new InvalidRequestException("Bucket "+ bucket
+" cannot be deleted");
}
}
/**
* Perform a bulk object delete operation against S3.
* Increments the {@code OBJECT_DELETE_REQUESTS} and write
* operation statistics
* <p></p>
* {@code OBJECT_DELETE_OBJECTS} is updated with the actual number
* of objects deleted in the request.
* <p></p>
* Retry policy: retry untranslated; delete considered idempotent.
* If the request is throttled, this is logged in the throttle statistics,
* with the counter set to the number of keys, rather than the number
* of invocations of the delete operation.
* This is because S3 considers each key as one mutating operation on
* the store when updating its load counters on a specific partition
* of an S3 bucket.
* If only the request was measured, this operation would under-report.
* @param deleteRequest keys to delete on the s3-backend
* @return the AWS response
* @throws MultiObjectDeleteException one or more of the keys could not
* be deleted.
* @throws AmazonClientException amazon-layer failure.
*/
@Retries.RetryRaw
private DeleteObjectsResult deleteObjects(DeleteObjectsRequest deleteRequest)
throws MultiObjectDeleteException, AmazonClientException, IOException {
incrementWriteOperations();
BulkDeleteRetryHandler retryHandler =
new BulkDeleteRetryHandler(createStoreContext());
int keyCount = deleteRequest.getKeys().size();
try(DurationInfo ignored =
new DurationInfo(LOG, false, "DELETE %d keys",
keyCount)) {
return invoker.retryUntranslated("delete",
DELETE_CONSIDERED_IDEMPOTENT,
(text, e, r, i) -> {
// handle the failure
retryHandler.bulkDeleteRetried(deleteRequest, e);
},
// duration is tracked in the bulk delete counters
trackDurationOfOperation(getDurationTrackerFactory(),
OBJECT_BULK_DELETE_REQUEST.getSymbol(), () -> {
incrementStatistic(OBJECT_DELETE_OBJECTS, keyCount);
return s3.deleteObjects(deleteRequest);
}));
} catch (MultiObjectDeleteException e) {
// one or more of the keys could not be deleted.
// log and rethrow
List<MultiObjectDeleteException.DeleteError> errors = e.getErrors();
LOG.debug("Partial failure of delete, {} errors", errors.size(), e);
for (MultiObjectDeleteException.DeleteError error : errors) {
LOG.debug("{}: \"{}\" - {}",
error.getKey(), error.getCode(), error.getMessage());
}
throw e;
}
}
/**
* Create a putObject request.
* Adds the ACL and metadata
* @param key key of object
* @param metadata metadata header
* @param srcfile source file
* @return the request
*/
public PutObjectRequest newPutObjectRequest(String key,
ObjectMetadata metadata, File srcfile) {
return requestFactory.newPutObjectRequest(key, metadata, null, srcfile);
}
/**
* Create a new object metadata instance.
* Any standard metadata headers are added here, for example:
* encryption.
*
* @param length length of data to set in header.
* @return a new metadata instance
*/
public ObjectMetadata newObjectMetadata(long length) {
return requestFactory.newObjectMetadata(length);
}
/**
* Start a transfer-manager managed async PUT of an object,
* incrementing the put requests and put bytes
* counters.
* It does not update the other counters,
* as existing code does that as progress callbacks come in.
* Byte length is calculated from the file length, or, if there is no
* file, from the content length of the header.
* Because the operation is async, any stream supplied in the request
* must reference data (files, buffers) which stay valid until the upload
* completes.
* Retry policy: N/A: the transfer manager is performing the upload.
* Auditing: must be inside an audit span.
* @param putObjectRequest the request
* @return the upload initiated
*/
@Retries.OnceRaw
public UploadInfo putObject(PutObjectRequest putObjectRequest) {
long len = getPutRequestLength(putObjectRequest);
LOG.debug("PUT {} bytes to {} via transfer manager ",
len, putObjectRequest.getKey());
incrementPutStartStatistics(len);
Upload upload = transfers.upload(putObjectRequest);
return new UploadInfo(upload, len);
}
/**
* PUT an object directly (i.e. not via the transfer manager).
* Byte length is calculated from the file length, or, if there is no
* file, from the content length of the header.
*
* Retry Policy: none.
* Auditing: must be inside an audit span.
* <i>Important: this call will close any input stream in the request.</i>
* @param putObjectRequest the request
* @param putOptions put object options
* @return the upload initiated
* @throws AmazonClientException on problems
*/
@VisibleForTesting
@Retries.OnceRaw("For PUT; post-PUT actions are RetryExceptionsSwallowed")
PutObjectResult putObjectDirect(PutObjectRequest putObjectRequest,
PutObjectOptions putOptions)
throws AmazonClientException {
long len = getPutRequestLength(putObjectRequest);
LOG.debug("PUT {} bytes to {}", len, putObjectRequest.getKey());
incrementPutStartStatistics(len);
try {
PutObjectResult result = trackDurationOfSupplier(
getDurationTrackerFactory(),
OBJECT_PUT_REQUESTS.getSymbol(), () ->
s3.putObject(putObjectRequest));
incrementPutCompletedStatistics(true, len);
// apply any post-write actions.
finishedWrite(putObjectRequest.getKey(), len,
result.getETag(), result.getVersionId(),
putOptions);
return result;
} catch (SdkBaseException e) {
incrementPutCompletedStatistics(false, len);
throw e;
}
}
/**
* Get the length of the PUT, verifying that the length is known.
* @param putObjectRequest a request bound to a file or a stream.
* @return the request length
* @throws IllegalArgumentException if the length is negative
*/
private long getPutRequestLength(PutObjectRequest putObjectRequest) {
long len;
if (putObjectRequest.getFile() != null) {
len = putObjectRequest.getFile().length();
} else {
len = putObjectRequest.getMetadata().getContentLength();
}
Preconditions.checkState(len >= 0, "Cannot PUT object of unknown length");
return len;
}
/**
* Upload part of a multi-partition file.
* Increments the write and put counters.
* <i>Important: this call does not close any input stream in the request.</i>
*
* Retry Policy: none.
* @param request request
* @return the result of the operation.
* @throws AmazonClientException on problems
*/
@Retries.OnceRaw
UploadPartResult uploadPart(UploadPartRequest request)
throws AmazonClientException {
long len = request.getPartSize();
incrementPutStartStatistics(len);
try {
UploadPartResult uploadPartResult = s3.uploadPart(request);
incrementPutCompletedStatistics(true, len);
return uploadPartResult;
} catch (AmazonClientException e) {
incrementPutCompletedStatistics(false, len);
throw e;
}
}
/**
* At the start of a put/multipart upload operation, update the
* relevant counters.
*
* @param bytes bytes in the request.
*/
public void incrementPutStartStatistics(long bytes) {
LOG.debug("PUT start {} bytes", bytes);
incrementWriteOperations();
incrementGauge(OBJECT_PUT_REQUESTS_ACTIVE, 1);
if (bytes > 0) {
incrementGauge(OBJECT_PUT_BYTES_PENDING, bytes);
}
}
/**
* At the end of a put/multipart upload operation, update the
* relevant counters and gauges.
*
* @param success did the operation succeed?
* @param bytes bytes in the request.
*/
public void incrementPutCompletedStatistics(boolean success, long bytes) {
LOG.debug("PUT completed success={}; {} bytes", success, bytes);
if (bytes > 0) {
incrementStatistic(OBJECT_PUT_BYTES, bytes);
decrementGauge(OBJECT_PUT_BYTES_PENDING, bytes);
}
incrementStatistic(OBJECT_PUT_REQUESTS_COMPLETED);
decrementGauge(OBJECT_PUT_REQUESTS_ACTIVE, 1);
}
/**
* Callback for use in progress callbacks from put/multipart upload events.
* Increments those statistics which are expected to be updated during
* the ongoing upload operation.
* @param key key to file that is being written (for logging)
* @param bytes bytes successfully uploaded.
*/
public void incrementPutProgressStatistics(String key, long bytes) {
PROGRESS.debug("PUT {}: {} bytes", key, bytes);
incrementWriteOperations();
if (bytes > 0) {
statistics.incrementBytesWritten(bytes);
}
}
/**
* Delete a list of keys on a s3-backend.
* Retry policy: retry untranslated; delete considered idempotent.
* @param keysToDelete collection of keys to delete on the s3-backend.
* if empty, no request is made of the object store.
* @param deleteFakeDir indicates whether this is for deleting fake dirs
* @throws InvalidRequestException if the request was rejected due to
* a mistaken attempt to delete the root directory.
* @throws MultiObjectDeleteException one or more of the keys could not
* be deleted in a multiple object delete operation.
* The number of rejected objects will be added to the metric
* {@link Statistic#FILES_DELETE_REJECTED}.
* @throws AmazonClientException other amazon-layer failure.
*/
@Retries.RetryRaw
private void removeKeysS3(
List<DeleteObjectsRequest.KeyVersion> keysToDelete,
boolean deleteFakeDir)
throws MultiObjectDeleteException, AmazonClientException,
IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Initiating delete operation for {} objects",
keysToDelete.size());
for (DeleteObjectsRequest.KeyVersion key : keysToDelete) {
LOG.debug(" {} {}", key.getKey(),
key.getVersion() != null ? key.getVersion() : "");
}
}
if (keysToDelete.isEmpty()) {
// exit fast if there are no keys to delete
return;
}
for (DeleteObjectsRequest.KeyVersion keyVersion : keysToDelete) {
blockRootDelete(keyVersion.getKey());
}
try {
if (enableMultiObjectsDelete) {
if (keysToDelete.size() <= pageSize) {
deleteObjects(getRequestFactory()
.newBulkDeleteRequest(keysToDelete));
} else {
// Multi object deletion of more than 1000 keys is not supported
// by s3. So we are paging the keys by page size.
LOG.debug("Partitioning the keys to delete as it is more than " +
"page size. Number of keys: {}, Page size: {}",
keysToDelete.size(), pageSize);
for (List<DeleteObjectsRequest.KeyVersion> batchOfKeysToDelete :
Lists.partition(keysToDelete, pageSize)) {
deleteObjects(getRequestFactory()
.newBulkDeleteRequest(batchOfKeysToDelete));
}
}
} else {
for (DeleteObjectsRequest.KeyVersion keyVersion : keysToDelete) {
deleteObject(keyVersion.getKey());
}
}
} catch (MultiObjectDeleteException ex) {
// partial delete.
// Update the stats with the count of the actual number of successful
// deletions.
int rejected = ex.getErrors().size();
noteDeleted(keysToDelete.size() - rejected, deleteFakeDir);
incrementStatistic(FILES_DELETE_REJECTED, rejected);
throw ex;
}
noteDeleted(keysToDelete.size(), deleteFakeDir);
}
/**
* Note the deletion of files or fake directories deleted.
* @param count count of keys deleted.
* @param deleteFakeDir are the deletions fake directories?
*/
private void noteDeleted(final int count, final boolean deleteFakeDir) {
if (!deleteFakeDir) {
instrumentation.fileDeleted(count);
} else {
instrumentation.fakeDirsDeleted(count);
}
}
/**
* Invoke {@link #removeKeysS3(List, boolean)}.
* If a {@code MultiObjectDeleteException} is raised, the
* relevant statistics are updated.
*
* @param keysToDelete collection of keys to delete on the s3-backend.
* if empty, no request is made of the object store.
* @param deleteFakeDir indicates whether this is for deleting fake dirs
* @throws InvalidRequestException if the request was rejected due to
* a mistaken attempt to delete the root directory.
* @throws MultiObjectDeleteException one or more of the keys could not
* be deleted in a multiple object delete operation.
* @throws AmazonClientException amazon-layer failure.
* @throws IOException other IO Exception.
*/
@VisibleForTesting
@Retries.RetryRaw
public void removeKeys(
final List<DeleteObjectsRequest.KeyVersion> keysToDelete,
final boolean deleteFakeDir)
throws MultiObjectDeleteException, AmazonClientException,
IOException {
try (DurationInfo ignored = new DurationInfo(LOG, false,
"Deleting %d keys", keysToDelete.size())) {
removeKeysS3(keysToDelete, deleteFakeDir);
}
}
/**
* Delete a Path. This operation is at least {@code O(files)}, with
* added overheads to enumerate the path. It is also not atomic.
*
* @param f the path to delete.
* @param recursive if path is a directory and set to
* true, the directory is deleted else throws an exception. In
* case of a file the recursive can be set to either true or false.
* @return true if the path existed and then was deleted; false if there
* was no path in the first place, or the corner cases of root path deletion
* have surfaced.
* @throws IOException due to inability to delete a directory or file.
*/
@Override
@Retries.RetryTranslated
@AuditEntryPoint
public boolean delete(Path f, boolean recursive) throws IOException {
checkNotClosed();
final Path path = qualify(f);
// span covers delete, getFileStatus, fake directory operations.
try (AuditSpan span = createSpan(INVOCATION_DELETE.getSymbol(),
path.toString(), null)) {
boolean outcome = trackDuration(getDurationTrackerFactory(),
INVOCATION_DELETE.getSymbol(),
new DeleteOperation(
createStoreContext(),
innerGetFileStatus(path, true, StatusProbeEnum.ALL),
recursive,
new OperationCallbacksImpl(),
pageSize));
if (outcome) {
try {
maybeCreateFakeParentDirectory(path);
} catch (AccessDeniedException e) {
LOG.warn("Cannot create directory marker at {}: {}",
f.getParent(), e.toString());
LOG.debug("Failed to create fake dir above {}", path, e);
}
}
return outcome;
} catch (FileNotFoundException e) {
LOG.debug("Couldn't delete {} - does not exist: {}", path, e.toString());
instrumentation.errorIgnored();
return false;
} catch (AmazonClientException e) {
throw translateException("delete", path, e);
}
}
/**
* Create a fake directory if required.
* That is: it is not the root path and the path does not exist.
* Retry policy: retrying; untranslated.
* @param f path to create
* @throws IOException IO problem
*/
@Retries.RetryTranslated
private void createFakeDirectoryIfNecessary(Path f)
throws IOException, AmazonClientException {
String key = pathToKey(f);
// we only make the LIST call; the codepaths to get here should not
// be reached if there is an empty dir marker -and if they do, it
// is mostly harmless to create a new one.
if (!key.isEmpty() && !s3Exists(f, StatusProbeEnum.DIRECTORIES)) {
LOG.debug("Creating new fake directory at {}", f);
createFakeDirectory(key, putOptionsForPath(f));
}
}
/**
* Create a fake parent directory if required.
* That is: it parent is not the root path and does not yet exist.
* @param path whose parent is created if needed.
* @throws IOException IO problem
*/
@Retries.RetryTranslated
@VisibleForTesting
protected void maybeCreateFakeParentDirectory(Path path)
throws IOException, AmazonClientException {
Path parent = path.getParent();
if (parent != null && !parent.isRoot() && !isUnderMagicCommitPath(parent)) {
createFakeDirectoryIfNecessary(parent);
}
}
/**
* Override subclass such that we benefit for async listing done
* in {@code S3AFileSystem}. See {@code Listing#ObjectListingIterator}.
* {@inheritDoc}
*
*/
@Override
@AuditEntryPoint
public RemoteIterator<FileStatus> listStatusIterator(Path p)
throws FileNotFoundException, IOException {
Path path = qualify(p);
return typeCastingRemoteIterator(trackDurationAndSpan(
INVOCATION_LIST_STATUS, path, () ->
once("listStatus", path.toString(), () ->
innerListStatus(p))));
}
/**
* List the statuses of the files/directories in the given path if the path is
* a directory.
*
* @param f given path
* @return the statuses of the files/directories in the given patch
* @throws FileNotFoundException when the path does not exist;
* IOException see specific implementation
*/
@Override
@AuditEntryPoint
public FileStatus[] listStatus(Path f) throws FileNotFoundException,
IOException {
Path path = qualify(f);
return trackDurationAndSpan(INVOCATION_LIST_STATUS, path, () ->
once("listStatus", path.toString(),
() -> iteratorToStatuses(innerListStatus(path))));
}
/**
* List the statuses of the files/directories in the given path if the path is
* a directory. The returned iterator is within the current active span.
*
* Auditing: This method MUST be called within a span.
* The span is attached to the iterator. All further S3 calls
* made by the iterator will be within the span.
* @param f qualified path
* @return the statuses of the files/directories in the given patch
* @throws FileNotFoundException when the path does not exist;
* @throws IOException due to an IO problem.
* @throws AmazonClientException on failures inside the AWS SDK
*/
private RemoteIterator<S3AFileStatus> innerListStatus(Path f)
throws FileNotFoundException,
IOException, AmazonClientException {
Path path = qualify(f);
LOG.debug("List status for path: {}", path);
final RemoteIterator<S3AFileStatus> statusIt = listing
.getFileStatusesAssumingNonEmptyDir(path, getActiveAuditSpan());
if (!statusIt.hasNext()) {
// We may have an empty dir, or may have file or may have nothing.
// So we call innerGetFileStatus to get the status, this may throw
// FileNotFoundException if we have nothing.
// So We are guaranteed to have either a dir marker or a file.
final S3AFileStatus fileStatus = innerGetFileStatus(path, false,
StatusProbeEnum.ALL);
// If it is a file return directly.
if (fileStatus.isFile()) {
LOG.debug("Adding: rd (not a dir): {}", path);
S3AFileStatus[] stats = new S3AFileStatus[1];
stats[0] = fileStatus;
return listing.createProvidedFileStatusIterator(
stats,
ACCEPT_ALL,
Listing.ACCEPT_ALL_BUT_S3N);
}
}
// Here we have a directory which may or may not be empty.
return statusIt;
}
/**
* Is a path to be considered as authoritative?
* is a store with the supplied path under
* one of the paths declared as authoritative.
* @param path path
* @return true if the path is auth
*/
public boolean allowAuthoritative(final Path path) {
return S3Guard.allowAuthoritative(path, this,
allowAuthoritativePaths);
}
/**
* Create a {@code ListObjectsRequest} request against this bucket,
* with the maximum keys returned in a query set by {@link #maxKeys}.
* @param key key for request
* @param delimiter any delimiter
* @return the request
*/
@VisibleForTesting
public S3ListRequest createListObjectsRequest(String key,
String delimiter) {
return createListObjectsRequest(key, delimiter, maxKeys);
}
/**
* Create the List objects request appropriate for the
* active list request option.
* @param key key for request
* @param delimiter any delimiter
* @param limit limit of keys
* @return the request
*/
private S3ListRequest createListObjectsRequest(String key,
String delimiter, int limit) {
if (!useListV1) {
ListObjectsV2Request request =
getRequestFactory().newListObjectsV2Request(
key, delimiter, limit);
return S3ListRequest.v2(request);
} else {
ListObjectsRequest request =
getRequestFactory().newListObjectsV1Request(
key, delimiter, limit);
return S3ListRequest.v1(request);
}
}
/**
* Set the current working directory for the given file system. All relative
* paths will be resolved relative to it.
*
* @param newDir the current working directory.
*/
public void setWorkingDirectory(Path newDir) {
workingDir = makeQualified(newDir);
}
/**
* Get the current working directory for the given file system.
* @return the directory pathname
*/
public Path getWorkingDirectory() {
return workingDir;
}
/**
* Get the username of the FS.
* @return the short name of the user who instantiated the FS
*/
public String getUsername() {
return username;
}
/**
* Get the owner of this FS: who created it?
* @return the owner of the FS.
*/
public UserGroupInformation getOwner() {
return owner;
}
/**
*
* Make the given path and all non-existent parents into
* directories. Has the semantics of Unix {@code 'mkdir -p'}.
* Existence of the directory hierarchy is not an error.
* Parent elements are scanned to see if any are a file,
* <i>except under __magic</i> paths.
* There the FS assumes that the destination directory creation
* did that scan and that paths in job/task attempts are all
* "well formed"
* @param p path to create
* @param permission to apply to path
* @return true if a directory was created or already existed
* @throws FileAlreadyExistsException there is a file at the path specified
* or is discovered on one of its ancestors.
* @throws IOException other IO problems
*/
@Override
@AuditEntryPoint
public boolean mkdirs(Path p, FsPermission permission) throws IOException,
FileAlreadyExistsException {
Path path = qualify(p);
return trackDurationAndSpan(
INVOCATION_MKDIRS, path,
new MkdirOperation(
createStoreContext(),
path,
createMkdirOperationCallbacks(),
isMagicCommitPath(path)));
}
/**
* Override point: create the callbacks for Mkdir.
* This does not create a new span; caller must be in one.
* @return an implementation of the MkdirCallbacks,
*/
@VisibleForTesting
public MkdirOperation.MkdirCallbacks createMkdirOperationCallbacks() {
return new MkdirOperationCallbacksImpl();
}
/**
* Callbacks from the {@link MkdirOperation}.
*/
protected class MkdirOperationCallbacksImpl implements
MkdirOperation.MkdirCallbacks {
@Override
public S3AFileStatus probePathStatus(final Path path,
final Set<StatusProbeEnum> probes) throws IOException {
return S3AFileSystem.this.innerGetFileStatus(path, false, probes);
}
@Override
public void createFakeDirectory(final Path dir, final boolean keepMarkers)
throws IOException {
S3AFileSystem.this.createFakeDirectory(
pathToKey(dir),
keepMarkers
? PutObjectOptions.keepingDirs()
: putOptionsForPath(dir));
}
}
/**
* This is a very slow operation against object storage.
* Execute it as a single span with whatever optimizations
* have been implemented.
* {@inheritDoc}
*/
@Override
@Retries.RetryTranslated
@AuditEntryPoint
public ContentSummary getContentSummary(final Path f) throws IOException {
final Path path = qualify(f);
return trackDurationAndSpan(
INVOCATION_GET_CONTENT_SUMMARY, path,
new GetContentSummaryOperation(
createStoreContext(),
path,
createGetContentSummaryCallbacks()));
}
/**
* Override point: create the callbacks for getContentSummary.
* This does not create a new span; caller must be in one.
* @return an implementation of the GetContentSummaryCallbacksImpl
*/
protected GetContentSummaryOperation.GetContentSummaryCallbacks
createGetContentSummaryCallbacks() {
return new GetContentSummaryCallbacksImpl();
}
/**
* Callbacks from the {@link GetContentSummaryOperation}.
*/
protected class GetContentSummaryCallbacksImpl implements
GetContentSummaryOperation.GetContentSummaryCallbacks {
@Override
public S3AFileStatus probePathStatus(final Path path,
final Set<StatusProbeEnum> probes) throws IOException {
return S3AFileSystem.this.innerGetFileStatus(path, false, probes);
}
@Override
public RemoteIterator<S3ALocatedFileStatus> listFilesIterator(final Path path,
final boolean recursive) throws IOException {
return S3AFileSystem.this.innerListFiles(path, recursive, Listing.ACCEPT_ALL_BUT_S3N, null);
}
}
/**
* Soft check of access by forwarding to the audit manager
* and so on to the auditor.
* {@inheritDoc}
*/
@Override
@AuditEntryPoint
public void access(final Path f, final FsAction mode)
throws AccessControlException, FileNotFoundException, IOException {
Path path = qualify(f);
LOG.debug("check access mode {} for {}", path, mode);
trackDurationAndSpan(
INVOCATION_ACCESS, path, () -> {
final S3AFileStatus stat = innerGetFileStatus(path, false,
StatusProbeEnum.ALL);
if (!getAuditManager().checkAccess(path, stat, mode)) {
incrementStatistic(AUDIT_ACCESS_CHECK_FAILURE);
throw new AccessControlException(String.format(
"Permission denied: user=%s, path=\"%s\":%s:%s:%s%s",
getOwner().getUserName(),
stat.getPath(),
stat.getOwner(), stat.getGroup(),
stat.isDirectory() ? "d" : "-", mode));
}
// simply for the API binding.
return true;
});
}
/**
* Return a file status object that represents the path.
* @param f The path we want information from
* @return a FileStatus object
* @throws FileNotFoundException when the path does not exist
* @throws IOException on other problems.
*/
@Override
@AuditEntryPoint
@Retries.RetryTranslated
public FileStatus getFileStatus(final Path f) throws IOException {
Path path = qualify(f);
return trackDurationAndSpan(
INVOCATION_GET_FILE_STATUS, path, () ->
innerGetFileStatus(path, false, StatusProbeEnum.ALL));
}
/**
* Get the status of a file or directory.
* Internal version of {@link #getFileStatus(Path)}.
* @param f The path we want information from
* @param needEmptyDirectoryFlag if true, implementation will calculate
* a TRUE or FALSE value for {@link S3AFileStatus#isEmptyDirectory()}
* @param probes probes to make.
* @return a S3AFileStatus object
* @throws FileNotFoundException when the path does not exist
* @throws IOException on other problems.
*/
@VisibleForTesting
@Retries.RetryTranslated
S3AFileStatus innerGetFileStatus(final Path f,
final boolean needEmptyDirectoryFlag,
final Set<StatusProbeEnum> probes) throws IOException {
final Path path = qualify(f);
String key = pathToKey(path);
LOG.debug("Getting path status for {} ({}); needEmptyDirectory={}",
path, key, needEmptyDirectoryFlag);
return s3GetFileStatus(path,
key,
probes,
needEmptyDirectoryFlag);
}
/**
* Probe store for file status with control of which probes are issued..
* Used to implement {@link #innerGetFileStatus(Path, boolean, Set)},
* and for direct management of empty directory blobs.
*
* Checks made, in order:
* <ol>
* <li>
* Head: look for an object at the given key, provided that
* the key doesn't end in "/"
* </li>
* <li>
* DirMarker/List: issue a LIST on the key (with / if needed), require one
* entry to be found for the path to be considered a non-empty directory.
* </li>
* </ol>
*
* Notes:
* <ul>
* <li>
* Objects ending in / are treated as directory markers,
* irrespective of length.
* </li>
* <li>
* The HEAD requests require the permissions to read an object,
* including (we believe) the ability to decrypt the file.
* At the very least, for SSE-C markers, you need the same key on
* the client for the HEAD to work.
* </li>
* <li>
* The List probe needs list permission.
* </li>
* </ul>
*
* Retry policy: retry translated.
* @param path Qualified path
* @param key Key string for the path
* @param probes probes to make
* @param needEmptyDirectoryFlag if true, implementation will calculate
* a TRUE or FALSE value for {@link S3AFileStatus#isEmptyDirectory()}
* @return Status
* @throws FileNotFoundException the supplied probes failed.
* @throws IOException on other problems.
*/
@VisibleForTesting
@Retries.RetryTranslated
S3AFileStatus s3GetFileStatus(final Path path,
final String key,
final Set<StatusProbeEnum> probes,
final boolean needEmptyDirectoryFlag) throws IOException {
LOG.debug("S3GetFileStatus {}", path);
// either you aren't looking for the directory flag, or you are,
// and if you are, the probe list must contain list.
Preconditions.checkArgument(!needEmptyDirectoryFlag
|| probes.contains(StatusProbeEnum.List),
"s3GetFileStatus(%s) wants to know if a directory is empty but"
+ " does not request a list probe", path);
if (key.isEmpty() && !needEmptyDirectoryFlag) {
return new S3AFileStatus(Tristate.UNKNOWN, path, username);
}
if (!key.isEmpty() && !key.endsWith("/")
&& probes.contains(StatusProbeEnum.Head)) {
try {
// look for the simple file
ObjectMetadata meta = getObjectMetadata(key);
LOG.debug("Found exact file: normal file {}", key);
long contentLength = meta.getContentLength();
// check if CSE is enabled, then strip padded length.
if (isCSEEnabled
&& meta.getUserMetaDataOf(Headers.CRYPTO_CEK_ALGORITHM) != null
&& contentLength >= CSE_PADDING_LENGTH) {
contentLength -= CSE_PADDING_LENGTH;
}
return new S3AFileStatus(contentLength,
dateToLong(meta.getLastModified()),
path,
getDefaultBlockSize(path),
username,
meta.getETag(),
meta.getVersionId());
} catch (AmazonServiceException e) {
// if the response is a 404 error, it just means that there is
// no file at that path...the remaining checks will be needed.
// But: an empty bucket is also a 404, so check for that
// and fail.
if (e.getStatusCode() != SC_404 || isUnknownBucket(e)) {
throw translateException("getFileStatus", path, e);
}
} catch (AmazonClientException e) {
throw translateException("getFileStatus", path, e);
}
}
// execute the list
if (probes.contains(StatusProbeEnum.List)) {
try {
// this will find a marker dir / as well as an entry.
// When making a simple "is this a dir check" all is good.
// but when looking for an empty dir, we need to verify there are no
// children, so ask for two entries, so as to find
// a child
String dirKey = maybeAddTrailingSlash(key);
// list size is dir marker + at least one entry
final int listSize = 2;
S3ListRequest request = createListObjectsRequest(dirKey, "/",
listSize);
// execute the request
S3ListResult listResult = listObjects(request,
getDurationTrackerFactory());
if (listResult.hasPrefixesOrObjects()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Found path as directory (with /)");
listResult.logAtDebug(LOG);
}
// At least one entry has been found.
// If looking for an empty directory, the marker must exist but no
// children.
// So the listing must contain the marker entry only.
if (needEmptyDirectoryFlag
&& listResult.representsEmptyDirectory(dirKey)) {
return new S3AFileStatus(Tristate.TRUE, path, username);
}
// either an empty directory is not needed, or the
// listing does not meet the requirements.
return new S3AFileStatus(Tristate.FALSE, path, username);
} else if (key.isEmpty()) {
LOG.debug("Found root directory");
return new S3AFileStatus(Tristate.TRUE, path, username);
}
} catch (AmazonServiceException e) {
if (e.getStatusCode() != SC_404 || isUnknownBucket(e)) {
throw translateException("getFileStatus", path, e);
}
} catch (AmazonClientException e) {
throw translateException("getFileStatus", path, e);
}
}
LOG.debug("Not Found: {}", path);
throw new FileNotFoundException("No such file or directory: " + path);
}
/**
* Probe S3 for a file or dir existing, with the given probe set.
* Retry policy: retrying; translated.
* @param path qualified path to look for
* @param probes probes to make
* @return true if path exists in S3
* @throws IOException IO failure
*/
@Retries.RetryTranslated
private boolean s3Exists(final Path path, final Set<StatusProbeEnum> probes)
throws IOException {
String key = pathToKey(path);
try {
s3GetFileStatus(path, key, probes, false);
return true;
} catch (FileNotFoundException e) {
return false;
}
}
/**
* The src file is on the local disk. Add it to FS at
* the given dst name.
*
* This version doesn't need to create a temporary file to calculate the md5.
* Sadly this doesn't seem to be used by the shell cp :(
*
* delSrc indicates if the source should be removed
* @param delSrc whether to delete the src
* @param overwrite whether to overwrite an existing file
* @param src path
* @param dst path
* @throws IOException IO problem
* @throws FileAlreadyExistsException the destination file exists and
* overwrite==false
* @throws AmazonClientException failure in the AWS SDK
*/
@Override
@AuditEntryPoint
public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
Path dst) throws IOException {
checkNotClosed();
LOG.debug("Copying local file from {} to {}", src, dst);
trackDurationAndSpan(INVOCATION_COPY_FROM_LOCAL_FILE, dst,
() -> new CopyFromLocalOperation(
createStoreContext(),
src,
dst,
delSrc,
overwrite,
createCopyFromLocalCallbacks()).execute());
}
protected CopyFromLocalOperation.CopyFromLocalOperationCallbacks
createCopyFromLocalCallbacks() throws IOException {
LocalFileSystem local = getLocal(getConf());
return new CopyFromLocalCallbacksImpl(local);
}
protected final class CopyFromLocalCallbacksImpl implements
CopyFromLocalOperation.CopyFromLocalOperationCallbacks {
private final LocalFileSystem local;
private CopyFromLocalCallbacksImpl(LocalFileSystem local) {
this.local = local;
}
@Override
public RemoteIterator<LocatedFileStatus> listLocalStatusIterator(
final Path path) throws IOException {
return local.listLocatedStatus(path);
}
@Override
public File pathToLocalFile(Path path) {
return local.pathToFile(path);
}
@Override
public boolean deleteLocal(Path path, boolean recursive) throws IOException {
return local.delete(path, recursive);
}
@Override
public void copyLocalFileFromTo(File file, Path from, Path to) throws IOException {
trackDurationAndSpan(
OBJECT_PUT_REQUESTS,
to,
() -> {
final String key = pathToKey(to);
final ObjectMetadata om = newObjectMetadata(file.length());
Progressable progress = null;
PutObjectRequest putObjectRequest = newPutObjectRequest(key, om, file);
S3AFileSystem.this.invoker.retry(
"putObject(" + "" + ")", to.toString(),
true,
() -> executePut(putObjectRequest, progress, putOptionsForPath(to)));
return null;
});
}
@Override
public FileStatus getFileStatus(Path f) throws IOException {
return S3AFileSystem.this.getFileStatus(f);
}
@Override
public boolean createEmptyDir(Path path, StoreContext storeContext)
throws IOException {
return trackDuration(getDurationTrackerFactory(),
INVOCATION_MKDIRS.getSymbol(),
new MkdirOperation(
storeContext,
path,
createMkdirOperationCallbacks(), false));
}
}
/**
* Execute a PUT via the transfer manager, blocking for completion.
* If the waiting for completion is interrupted, the upload will be
* aborted before an {@code InterruptedIOException} is thrown.
* @param putObjectRequest request
* @param progress optional progress callback
* @param putOptions put object options
* @return the upload result
* @throws InterruptedIOException if the blocking was interrupted.
*/
@Retries.OnceRaw("For PUT; post-PUT actions are RetrySwallowed")
UploadResult executePut(
final PutObjectRequest putObjectRequest,
final Progressable progress,
final PutObjectOptions putOptions)
throws InterruptedIOException {
String key = putObjectRequest.getKey();
long len = getPutRequestLength(putObjectRequest);
UploadInfo info = putObject(putObjectRequest);
Upload upload = info.getUpload();
ProgressableProgressListener listener = new ProgressableProgressListener(
this, key, upload, progress);
upload.addProgressListener(listener);
UploadResult result = waitForUploadCompletion(key, info);
listener.uploadCompleted();
// post-write actions
finishedWrite(key, len,
result.getETag(), result.getVersionId(), putOptions);
return result;
}
/**
* Wait for an upload to complete.
* If the waiting for completion is interrupted, the upload will be
* aborted before an {@code InterruptedIOException} is thrown.
* If the upload (or its result collection) failed, this is where
* the failure is raised as an AWS exception.
* Calls {@link #incrementPutCompletedStatistics(boolean, long)}
* to update the statistics.
* @param key destination key
* @param uploadInfo upload to wait for
* @return the upload result
* @throws InterruptedIOException if the blocking was interrupted.
*/
@Retries.OnceRaw
UploadResult waitForUploadCompletion(String key, UploadInfo uploadInfo)
throws InterruptedIOException {
Upload upload = uploadInfo.getUpload();
try {
UploadResult result = upload.waitForUploadResult();
incrementPutCompletedStatistics(true, uploadInfo.getLength());
return result;
} catch (InterruptedException e) {
LOG.info("Interrupted: aborting upload");
incrementPutCompletedStatistics(false, uploadInfo.getLength());
upload.abort();
throw (InterruptedIOException)
new InterruptedIOException("Interrupted in PUT to "
+ keyToQualifiedPath(key))
.initCause(e);
}
}
/**
* Close the filesystem. This shuts down all transfers.
* @throws IOException IO problem
*/
@Override
public void close() throws IOException {
if (closed.getAndSet(true)) {
// already closed
return;
}
isClosed = true;
LOG.debug("Filesystem {} is closed", uri);
if (getConf() != null) {
String iostatisticsLoggingLevel =
getConf().getTrimmed(IOSTATISTICS_LOGGING_LEVEL,
IOSTATISTICS_LOGGING_LEVEL_DEFAULT);
logIOStatisticsAtLevel(LOG, iostatisticsLoggingLevel, getIOStatistics());
}
try {
super.close();
} finally {
stopAllServices();
}
// Log IOStatistics at debug.
if (LOG.isDebugEnabled()) {
// robust extract and convert to string
LOG.debug("Statistics for {}: {}", uri,
IOStatisticsLogging.ioStatisticsToPrettyString(getIOStatistics()));
}
}
/**
* Stop all services.
* This is invoked in close() and during failures of initialize()
* -make sure that all operations here are robust to failures in
* both the expected state of this FS and of failures while being stopped.
*/
protected synchronized void stopAllServices() {
// shutting down the transfer manager also shuts
// down the S3 client it is bonded to.
if (transfers != null) {
try {
transfers.shutdownNow(true);
} catch (RuntimeException e) {
// catch and swallow for resilience.
LOG.debug("When shutting down", e);
}
transfers = null;
}
// At this point the S3A client is shut down,
// now the executor pools are closed
HadoopExecutors.shutdown(boundedThreadPool, LOG,
THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
boundedThreadPool = null;
HadoopExecutors.shutdown(unboundedThreadPool, LOG,
THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
unboundedThreadPool = null;
// other services are shutdown.
cleanupWithLogger(LOG,
instrumentation,
delegationTokens.orElse(null),
signerManager,
auditManager);
closeAutocloseables(LOG, credentials);
delegationTokens = Optional.empty();
signerManager = null;
credentials = null;
}
/**
* Verify that the input stream is open. Non blocking; this gives
* the last state of the volatile {@link #closed} field.
* @throws IOException if the connection is closed.
*/
private void checkNotClosed() throws IOException {
if (isClosed) {
throw new IOException(uri + ": " + E_FS_CLOSED);
}
}
/**
* Get the delegation token support for this filesystem;
* not null iff delegation support is enabled.
* @return the token support, or an empty option.
*/
@VisibleForTesting
public Optional<S3ADelegationTokens> getDelegationTokens() {
return delegationTokens;
}
/**
* Return a service name iff delegation tokens are enabled and the
* token binding is issuing delegation tokens.
* @return the canonical service name or null
*/
@Override
public String getCanonicalServiceName() {
// this could all be done in map statements, but it'd be harder to
// understand and maintain.
// Essentially: no DTs, no canonical service name.
if (!delegationTokens.isPresent()) {
return null;
}
// DTs present: ask the binding if it is willing to
// serve tokens (or fail noisily).
S3ADelegationTokens dt = delegationTokens.get();
return dt.getTokenIssuingPolicy() != NoTokensAvailable
? dt.getCanonicalServiceName()
: null;
}
/**
* Get a delegation token if the FS is set up for them.
* If the user already has a token, it is returned,
* <i>even if it has expired</i>.
* @param renewer the account name that is allowed to renew the token.
* @return the delegation token or null
* @throws IOException IO failure
*/
@Override
@AuditEntryPoint
public Token<AbstractS3ATokenIdentifier> getDelegationToken(String renewer)
throws IOException {
checkNotClosed();
LOG.debug("Delegation token requested");
if (delegationTokens.isPresent()) {
return trackDurationAndSpan(
INVOCATION_GET_DELEGATION_TOKEN, null, () ->
delegationTokens.get().getBoundOrNewDT(
encryptionSecrets,
(renewer != null ? new Text(renewer) : new Text())));
} else {
// Delegation token support is not set up
LOG.debug("Token support is not enabled");
return null;
}
}
/**
* Ask any DT plugin for any extra token issuers.
* These do not get told of the encryption secrets and can
* return any type of token.
* This allows DT plugins to issue extra tokens for
* ancillary services.
*/
@Override
public DelegationTokenIssuer[] getAdditionalTokenIssuers()
throws IOException {
checkNotClosed();
if (delegationTokens.isPresent()) {
return delegationTokens.get().getAdditionalTokenIssuers();
} else {
// Delegation token support is not set up
LOG.debug("Token support is not enabled");
return null;
}
}
/**
* Build the AWS policy for restricted access to the resources needed
* by this bucket.
* if needed, and KMS operations.
* @param access access level desired.
* @return a policy for use in roles
*/
@Override
@InterfaceAudience.Private
public List<RoleModel.Statement> listAWSPolicyRules(
final Set<AccessLevel> access) {
if (access.isEmpty()) {
return Collections.emptyList();
}
List<RoleModel.Statement> statements = new ArrayList<>(
allowS3Operations(bucket,
access.contains(AccessLevel.WRITE)
|| access.contains(AccessLevel.ADMIN)));
// no attempt is made to qualify KMS access; there's no
// way to predict read keys, and not worried about granting
// too much encryption access.
statements.add(STATEMENT_ALLOW_SSE_KMS_RW);
return statements;
}
/**
* Copy a single object in the bucket via a COPY operation.
* There's no update of metadata, directory markers, etc.
* Callers must implement.
* @param srcKey source object path
* @param dstKey destination object path
* @param size object size
* @param srcAttributes S3 attributes of the source object
* @param readContext the read context
* @return the result of the copy
* @throws InterruptedIOException the operation was interrupted
* @throws IOException Other IO problems
*/
@Retries.RetryTranslated
private CopyResult copyFile(String srcKey, String dstKey, long size,
S3ObjectAttributes srcAttributes, S3AReadOpContext readContext)
throws IOException, InterruptedIOException {
LOG.debug("copyFile {} -> {} ", srcKey, dstKey);
ProgressListener progressListener = progressEvent -> {
switch (progressEvent.getEventType()) {
case TRANSFER_PART_COMPLETED_EVENT:
incrementWriteOperations();
break;
default:
break;
}
};
ChangeTracker changeTracker = new ChangeTracker(
keyToQualifiedPath(srcKey).toString(),
changeDetectionPolicy,
readContext.getS3AStatisticsContext()
.newInputStreamStatistics()
.getChangeTrackerStatistics(),
srcAttributes);
String action = "copyFile(" + srcKey + ", " + dstKey + ")";
Invoker readInvoker = readContext.getReadInvoker();
ObjectMetadata srcom;
try {
srcom = once(action, srcKey,
() ->
getObjectMetadata(srcKey, changeTracker, readInvoker, "copy"));
} catch (FileNotFoundException e) {
// if rename fails at this point it means that the expected file was not
// found.
// This means the File was deleted since LIST enumerated it.
LOG.debug("getObjectMetadata({}) failed to find an expected file",
srcKey, e);
// We create an exception, but the text depends on the S3Guard state
throw new RemoteFileChangedException(
keyToQualifiedPath(srcKey).toString(),
action,
RemoteFileChangedException.FILE_NOT_FOUND_SINGLE_ATTEMPT,
e);
}
return readInvoker.retry(
action, srcKey,
true,
() -> {
CopyObjectRequest copyObjectRequest =
getRequestFactory().newCopyObjectRequest(srcKey, dstKey, srcom);
changeTracker.maybeApplyConstraint(copyObjectRequest);
incrementStatistic(OBJECT_COPY_REQUESTS);
Copy copy = transfers.copy(copyObjectRequest,
getAuditManager().createStateChangeListener());
copy.addProgressListener(progressListener);
CopyOutcome copyOutcome = CopyOutcome.waitForCopy(copy);
InterruptedException interruptedException =
copyOutcome.getInterruptedException();
if (interruptedException != null) {
// copy interrupted: convert to an IOException.
throw (IOException)new InterruptedIOException(
"Interrupted copying " + srcKey
+ " to " + dstKey + ", cancelling")
.initCause(interruptedException);
}
SdkBaseException awsException = copyOutcome.getAwsException();
if (awsException != null) {
changeTracker.processException(awsException, "copy");
throw awsException;
}
CopyResult result = copyOutcome.getCopyResult();
changeTracker.processResponse(result);
incrementWriteOperations();
instrumentation.filesCopied(1, size);
return result;
});
}
/**
* Initiate a multipart upload from the preconfigured request.
* Retry policy: none + untranslated.
* @param request request to initiate
* @return the result of the call
* @throws AmazonClientException on failures inside the AWS SDK
* @throws IOException Other IO problems
*/
@Retries.OnceRaw
InitiateMultipartUploadResult initiateMultipartUpload(
InitiateMultipartUploadRequest request) throws IOException {
LOG.debug("Initiate multipart upload to {}", request.getKey());
return trackDurationOfSupplier(getDurationTrackerFactory(),
OBJECT_MULTIPART_UPLOAD_INITIATED.getSymbol(),
() -> getAmazonS3Client().initiateMultipartUpload(request));
}
/**
* Perform post-write actions.
* <p>
* This operation MUST be called after any PUT/multipart PUT completes
* successfully.
* <p>
* The actions include calling
* {@link #deleteUnnecessaryFakeDirectories(Path)}
* if directory markers are not being retained.
* @param key key written to
* @param length total length of file written
* @param eTag eTag of the written object
* @param versionId S3 object versionId of the written object
* @param putOptions put object options
*/
@InterfaceAudience.Private
@Retries.RetryExceptionsSwallowed
void finishedWrite(
String key,
long length,
String eTag,
String versionId,
PutObjectOptions putOptions) {
LOG.debug("Finished write to {}, len {}. etag {}, version {}",
key, length, eTag, versionId);
Preconditions.checkArgument(length >= 0, "content length is negative");
if (!putOptions.isKeepMarkers()) {
Path p = keyToQualifiedPath(key);
deleteUnnecessaryFakeDirectories(p.getParent());
}
}
/**
* Should we keep directory markers under the path being created
* by mkdir/file creation/rename?
* This is done if marker retention is enabled for the path,
* or it is under a magic path where we are saving IOPs
* knowing that all committers are on the same code version and
* therefore marker aware.
* @param path path to probe
* @return true if the markers MAY be retained,
* false if they MUST be deleted
*/
private boolean keepDirectoryMarkers(Path path) {
return directoryPolicy.keepDirectoryMarkers(path)
|| isUnderMagicCommitPath(path);
}
/**
* Should we keep directory markers under the path being created
* by mkdir/file creation/rename?
* See {@link #keepDirectoryMarkers(Path)} for the policy.
*
* @param path path to probe
* @return the options to use with the put request
*/
private PutObjectOptions putOptionsForPath(Path path) {
return keepDirectoryMarkers(path)
? PutObjectOptions.keepingDirs()
: PutObjectOptions.deletingDirs();
}
/**
* Delete mock parent directories which are no longer needed.
* Retry policy: retrying; exceptions swallowed.
* @param path path
*
*/
@Retries.RetryExceptionsSwallowed
private void deleteUnnecessaryFakeDirectories(Path path) {
List<DeleteObjectsRequest.KeyVersion> keysToRemove = new ArrayList<>();
while (!path.isRoot()) {
String key = pathToKey(path);
key = (key.endsWith("/")) ? key : (key + "/");
LOG.trace("To delete unnecessary fake directory {} for {}", key, path);
keysToRemove.add(new DeleteObjectsRequest.KeyVersion(key));
path = path.getParent();
}
try {
removeKeys(keysToRemove, true);
} catch(AmazonClientException | IOException e) {
instrumentation.errorIgnored();
if (LOG.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
for(DeleteObjectsRequest.KeyVersion kv : keysToRemove) {
sb.append(kv.getKey()).append(",");
}
LOG.debug("While deleting keys {} ", sb.toString(), e);
}
}
}
/**
* Create a fake directory, always ending in "/".
* Retry policy: retrying; translated.
* @param objectName name of directory object.
* @param putOptions put object options
* @throws IOException IO failure
*/
@Retries.RetryTranslated
private void createFakeDirectory(final String objectName,
final PutObjectOptions putOptions)
throws IOException {
createEmptyObject(objectName, putOptions);
}
/**
* Used to create an empty file that represents an empty directory.
* The policy for deleting parent dirs depends on the path, dir
* status and the putOptions value.
* Retry policy: retrying; translated.
* @param objectName object to create
* @param putOptions put object options
* @throws IOException IO failure
*/
@Retries.RetryTranslated
private void createEmptyObject(final String objectName, PutObjectOptions putOptions)
throws IOException {
invoker.retry("PUT 0-byte object ", objectName,
true, () ->
putObjectDirect(getRequestFactory()
.newDirectoryMarkerRequest(objectName), putOptions));
incrementPutProgressStatistics(objectName, 0);
instrumentation.directoryCreated();
}
/**
* Return the number of bytes that large input files should be optimally
* be split into to minimize I/O time.
*/
public long getDefaultBlockSize() {
return getConf().getLongBytes(FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE);
}
/**
* Get the directory marker policy of this filesystem.
* @return the marker policy.
*/
public DirectoryPolicy getDirectoryMarkerPolicy() {
return directoryPolicy;
}
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(
"S3AFileSystem{");
sb.append("uri=").append(uri);
sb.append(", workingDir=").append(workingDir);
sb.append(", partSize=").append(partSize);
sb.append(", enableMultiObjectsDelete=").append(enableMultiObjectsDelete);
sb.append(", maxKeys=").append(maxKeys);
if (cannedACL != null) {
sb.append(", cannedACL=").append(cannedACL);
}
if (openFileHelper != null) {
sb.append(", ").append(openFileHelper);
}
if (getConf() != null) {
sb.append(", blockSize=").append(getDefaultBlockSize());
}
sb.append(", multiPartThreshold=").append(multiPartThreshold);
if (getS3EncryptionAlgorithm() != null) {
sb.append(", s3EncryptionAlgorithm='")
.append(getS3EncryptionAlgorithm())
.append('\'');
}
if (blockFactory != null) {
sb.append(", blockFactory=").append(blockFactory);
}
sb.append(", auditManager=").append(auditManager);
sb.append(", authoritativePath=").append(allowAuthoritativePaths);
sb.append(", useListV1=").append(useListV1);
if (committerIntegration != null) {
sb.append(", magicCommitter=").append(isMagicCommitEnabled());
}
sb.append(", boundedExecutor=").append(boundedThreadPool);
sb.append(", unboundedExecutor=").append(unboundedThreadPool);
sb.append(", credentials=").append(credentials);
sb.append(", delegation tokens=")
.append(delegationTokens.map(Objects::toString).orElse("disabled"));
sb.append(", ").append(directoryPolicy);
// if logging at debug, toString returns the entire IOStatistics set.
if (getInstrumentation() != null) {
sb.append(", instrumentation {")
.append(getInstrumentation().toString())
.append("}");
}
sb.append(", ClientSideEncryption=").append(isCSEEnabled);
if (accessPoint != null) {
sb.append(", arnForBucket=").append(accessPoint.getFullArn());
}
sb.append('}');
return sb.toString();
}
/**
* Get the partition size for multipart operations.
* @return the value as set during initialization
*/
public long getPartitionSize() {
return partSize;
}
/**
* Get the threshold for multipart files.
* @return the value as set during initialization
*/
public long getMultiPartThreshold() {
return multiPartThreshold;
}
/**
* Get the maximum key count.
* @return a value, valid after initialization
*/
int getMaxKeys() {
return maxKeys;
}
/**
* Is magic commit enabled?
* @return true if magic commit support is turned on.
*/
public boolean isMagicCommitEnabled() {
return committerIntegration.isMagicCommitEnabled();
}
/**
* Predicate: is a path a magic commit path?
* True if magic commit is enabled and the path qualifies as special,
* and is not a a .pending or .pendingset file,
* @param path path to examine
* @return true if writing a file to the path triggers a "magic" write.
*/
public boolean isMagicCommitPath(Path path) {
return committerIntegration.isMagicCommitPath(path);
}
/**
* Predicate: is a path under a magic commit path?
* True if magic commit is enabled and the path is under __magic,
* irrespective of file type.
* @param path path to examine
* @return true if the path is in a magic dir and the FS has magic writes enabled.
*/
private boolean isUnderMagicCommitPath(Path path) {
return committerIntegration.isUnderMagicPath(path);
}
/**
* Increments the statistic {@link Statistic#INVOCATION_GLOB_STATUS}.
* Override superclass so as to disable symlink resolution as symlinks
* are not supported by S3A.
* {@inheritDoc}
*/
@Override
public FileStatus[] globStatus(Path pathPattern) throws IOException {
return globStatus(pathPattern, ACCEPT_ALL);
}
/**
* Increments the statistic {@link Statistic#INVOCATION_GLOB_STATUS}.
* Override superclass so as to disable symlink resolution as symlinks
* are not supported by S3A.
*
* Although an AuditEntryPoint, the globber itself will talk do
* the filesystem through the filesystem API, so its operations will
* all appear part of separate operations.
* {@inheritDoc}
*/
@Override
@AuditEntryPoint
public FileStatus[] globStatus(
final Path pathPattern,
final PathFilter filter)
throws IOException {
return trackDurationAndSpan(
INVOCATION_GLOB_STATUS, pathPattern, () ->
Globber.createGlobber(this)
.withPathPattern(pathPattern)
.withPathFiltern(filter)
.withResolveSymlinks(false)
.build()
.glob());
}
/**
* Override superclass so as to add statistic collection.
* {@inheritDoc}
*/
@Override
@AuditEntryPoint
public boolean exists(Path f) throws IOException {
final Path path = qualify(f);
try {
trackDurationAndSpan(
INVOCATION_EXISTS, path, () ->
innerGetFileStatus(path, false, StatusProbeEnum.ALL));
return true;
} catch (FileNotFoundException e) {
return false;
}
}
/**
* Optimized probe for a path referencing a dir.
* Even though it is optimized to a single HEAD, applications
* should not over-use this method...it is all too common.
* {@inheritDoc}
*/
@Override
@AuditEntryPoint
@SuppressWarnings("deprecation")
public boolean isDirectory(Path f) throws IOException {
final Path path = qualify(f);
try {
return trackDurationAndSpan(
INVOCATION_IS_DIRECTORY, path, () ->
innerGetFileStatus(path, false, StatusProbeEnum.DIRECTORIES)
.isDirectory());
} catch (FileNotFoundException e) {
// not found or it is a file.
return false;
}
}
/**
* Optimized probe for a path referencing a file.
* Even though it is optimized to a single HEAD, applications
* should not over-use this method...it is all too common.
* {@inheritDoc}
*/
@Override
@AuditEntryPoint
@SuppressWarnings("deprecation")
public boolean isFile(Path f) throws IOException {
final Path path = qualify(f);
try {
return trackDurationAndSpan(INVOCATION_IS_FILE, path, () ->
innerGetFileStatus(path, false, StatusProbeEnum.HEAD_ONLY)
.isFile());
} catch (FileNotFoundException e) {
// not found or it is a dir.
return false;
}
}
/**
* When enabled, get the etag of a object at the path via HEAD request and
* return it as a checksum object.
* <ol>
* <li>If a tag has not changed, consider the object unchanged.</li>
* <li>Two tags being different does not imply the data is different.</li>
* </ol>
* Different S3 implementations may offer different guarantees.
*
* This check is (currently) only made if
* {@link Constants#ETAG_CHECKSUM_ENABLED} is set; turning it on
* has caused problems with Distcp (HADOOP-15273).
*
* @param f The file path
* @param length The length of the file range for checksum calculation
* @return The EtagChecksum or null if checksums are not enabled or supported.
* @throws IOException IO failure
* @see <a href="http://docs.aws.amazon.com/AmazonS3/latest/API/RESTCommonResponseHeaders.html">Common Response Headers</a>
*/
@Override
@Retries.RetryTranslated
@AuditEntryPoint
public EtagChecksum getFileChecksum(Path f, final long length)
throws IOException {
Preconditions.checkArgument(length >= 0);
final Path path = qualify(f);
if (getConf().getBoolean(ETAG_CHECKSUM_ENABLED,
ETAG_CHECKSUM_ENABLED_DEFAULT)) {
return trackDurationAndSpan(INVOCATION_GET_FILE_CHECKSUM, path, () -> {
LOG.debug("getFileChecksum({})", path);
ObjectMetadata headers = getObjectMetadata(path, null,
invoker,
"getFileChecksum are");
String eTag = headers.getETag();
return eTag != null ? new EtagChecksum(eTag) : null;
});
} else {
// disabled
return null;
}
}
/**
* Get header processing support.
* @return a new header processing instance.
*/
private HeaderProcessing getHeaderProcessing() {
return new HeaderProcessing(createStoreContext(),
createHeaderProcessingCallbacks());
}
@Override
@AuditEntryPoint
public byte[] getXAttr(final Path path, final String name)
throws IOException {
checkNotClosed();
try (AuditSpan span = createSpan(
INVOCATION_XATTR_GET_NAMED.getSymbol(),
path.toString(), null)) {
return getHeaderProcessing().getXAttr(path, name);
}
}
@Override
@AuditEntryPoint
public Map<String, byte[]> getXAttrs(final Path path) throws IOException {
checkNotClosed();
try (AuditSpan span = createSpan(
INVOCATION_XATTR_GET_MAP.getSymbol(),
path.toString(), null)) {
return getHeaderProcessing().getXAttrs(path);
}
}
@Override
@AuditEntryPoint
public Map<String, byte[]> getXAttrs(final Path path,
final List<String> names)
throws IOException {
checkNotClosed();
try (AuditSpan span = createSpan(
INVOCATION_XATTR_GET_NAMED_MAP.getSymbol(),
path.toString(), null)) {
return getHeaderProcessing().getXAttrs(path, names);
}
}
@Override
@AuditEntryPoint
public List<String> listXAttrs(final Path path) throws IOException {
checkNotClosed();
try (AuditSpan span = createSpan(
INVOCATION_OP_XATTR_LIST.getSymbol(),
path.toString(), null)) {
return getHeaderProcessing().listXAttrs(path);
}
}
/**
* Create the callbacks.
* @return An implementation of the header processing
* callbacks.
*/
protected HeaderProcessing.HeaderProcessingCallbacks
createHeaderProcessingCallbacks() {
return new HeaderProcessingCallbacksImpl();
}
/**
* Operations needed for Header Processing.
*/
protected final class HeaderProcessingCallbacksImpl implements
HeaderProcessing.HeaderProcessingCallbacks {
@Override
public ObjectMetadata getObjectMetadata(final String key)
throws IOException {
return once("getObjectMetadata", key, () ->
S3AFileSystem.this.getObjectMetadata(key));
}
}
/**
* {@inheritDoc}.
*
* This implementation is optimized for S3, which can do a bulk listing
* off all entries under a path in one single operation. Thus there is
* no need to recursively walk the directory tree.
*
* Instead a {@link ListObjectsRequest} is created requesting a (windowed)
* listing of all entries under the given path. This is used to construct
* an {@code ObjectListingIterator} instance, iteratively returning the
* sequence of lists of elements under the path. This is then iterated
* over in a {@code FileStatusListingIterator}, which generates
* {@link S3AFileStatus} instances, one per listing entry.
* These are then translated into {@link LocatedFileStatus} instances.
*
* This is essentially a nested and wrapped set of iterators, with some
* generator classes.
* @param f a path
* @param recursive if the subdirectories need to be traversed recursively
*
* @return an iterator that traverses statuses of the files/directories
* in the given path
* @throws FileNotFoundException if {@code path} does not exist
* @throws IOException if any I/O error occurred
*/
@Override
@Retries.RetryTranslated
@AuditEntryPoint
public RemoteIterator<LocatedFileStatus> listFiles(Path f,
boolean recursive) throws FileNotFoundException, IOException {
final Path path = qualify(f);
return toLocatedFileStatusIterator(
trackDurationAndSpan(INVOCATION_LIST_FILES, path, () ->
innerListFiles(path, recursive,
new Listing.AcceptFilesOnly(path), null)));
}
/**
* Recursive List of files and empty directories.
* @param f path to list from
* @param recursive recursive?
* @return an iterator.
* @throws IOException failure
*/
@InterfaceAudience.Private
@Retries.RetryTranslated
@AuditEntryPoint
public RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories(
Path f, boolean recursive) throws IOException {
final Path path = qualify(f);
return trackDurationAndSpan(INVOCATION_LIST_FILES, path, () ->
innerListFiles(path, recursive,
Listing.ACCEPT_ALL_BUT_S3N,
null));
}
/**
* List files under the path.
* <ol>
* <li>
* The optional {@code status} parameter will be used to skip the
* initial getFileStatus call.
* </li>
* </ol>
*
* @param f path
* @param recursive recursive listing?
* @param acceptor file status filter
* @param status optional status of path to list.
* @return an iterator over the listing.
* @throws IOException failure
*/
@Retries.RetryTranslated
private RemoteIterator<S3ALocatedFileStatus> innerListFiles(
final Path f,
final boolean recursive,
final Listing.FileStatusAcceptor acceptor,
final S3AFileStatus status) throws IOException {
Path path = qualify(f);
LOG.debug("listFiles({}, {})", path, recursive);
try {
// if a status was given and it is a file.
if (status != null && status.isFile()) {
// simple case: File
LOG.debug("Path is a file: {}", path);
return listing.createSingleStatusIterator(
toLocatedFileStatus(status));
}
// Assuming the path to be a directory
// do a bulk operation.
RemoteIterator<S3ALocatedFileStatus> listFilesAssumingDir =
listing.getListFilesAssumingDir(path,
recursive,
acceptor,
getActiveAuditSpan());
// If there are no list entries present, we
// fallback to file existence check as the path
// can be a file or empty directory.
if (!listFilesAssumingDir.hasNext()) {
// If file status was already passed, reuse it.
final S3AFileStatus fileStatus = status != null
? status
: innerGetFileStatus(path, false, StatusProbeEnum.ALL);
if (fileStatus.isFile()) {
return listing.createSingleStatusIterator(
toLocatedFileStatus(fileStatus));
}
}
// If we have reached here, it means either there are files
// in this directory or it is empty.
return listFilesAssumingDir;
} catch (AmazonClientException e) {
throw translateException("listFiles", path, e);
}
}
/**
* Override superclass so as to add statistic collection.
* {@inheritDoc}
*/
@Override
public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f)
throws FileNotFoundException, IOException {
return listLocatedStatus(f, ACCEPT_ALL);
}
/**
* {@inheritDoc}.
*
* S3 Optimized directory listing. The initial operation performs the
* first bulk listing; extra listings will take place
* when all the current set of results are used up.
* @param f a path
* @param filter a path filter
* @return an iterator that traverses statuses of the files/directories
* in the given path
* @throws FileNotFoundException if {@code path} does not exist
* @throws IOException if any I/O error occurred
*/
@Override
@Retries.OnceTranslated("s3guard not retrying")
@AuditEntryPoint
public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f,
final PathFilter filter)
throws FileNotFoundException, IOException {
Path path = qualify(f);
AuditSpan span = entryPoint(INVOCATION_LIST_LOCATED_STATUS, path);
LOG.debug("listLocatedStatus({}, {}", path, filter);
RemoteIterator<? extends LocatedFileStatus> iterator =
once("listLocatedStatus", path.toString(),
() -> {
// Assuming the path to be a directory,
// trigger a list call directly.
final RemoteIterator<S3ALocatedFileStatus>
locatedFileStatusIteratorForDir =
listing.getLocatedFileStatusIteratorForDir(path, filter,
span);
// If no listing is present then path might be a file.
if (!locatedFileStatusIteratorForDir.hasNext()) {
final S3AFileStatus fileStatus =
innerGetFileStatus(path, false, StatusProbeEnum.ALL);
if (fileStatus.isFile()) {
// simple case: File
LOG.debug("Path is a file");
return listing.createSingleStatusIterator(
filter.accept(path)
? toLocatedFileStatus(fileStatus)
: null);
}
}
// Either empty or non-empty directory.
return locatedFileStatusIteratorForDir;
});
return toLocatedFileStatusIterator(iterator);
}
/**
* Build a {@link S3ALocatedFileStatus} from a {@link FileStatus} instance.
* @param status file status
* @return a located status with block locations set up from this FS.
* @throws IOException IO Problems.
*/
S3ALocatedFileStatus toLocatedFileStatus(S3AFileStatus status)
throws IOException {
return new S3ALocatedFileStatus(status,
status.isFile() ?
getFileBlockLocations(status, 0, status.getLen())
: null);
}
/**
* List any pending multipart uploads whose keys begin with prefix, using
* an iterator that can handle an unlimited number of entries.
* See {@link #listMultipartUploads(String)} for a non-iterator version of
* this.
*
* @param prefix optional key prefix to search
* @return Iterator over multipart uploads.
* @throws IOException on failure
*/
@InterfaceAudience.Private
@Retries.RetryTranslated
@AuditEntryPoint
public MultipartUtils.UploadIterator listUploads(@Nullable String prefix)
throws IOException {
// span is picked up retained in the listing.
return trackDurationAndSpan(MULTIPART_UPLOAD_LIST, prefix, null, () ->
MultipartUtils.listMultipartUploads(
createStoreContext(),
s3, prefix, maxKeys
));
}
/**
* Listing all multipart uploads; limited to the first few hundred.
* See {@link #listUploads(String)} for an iterator-based version that does
* not limit the number of entries returned.
* Retry policy: retry, translated.
* @return a listing of multipart uploads.
* @param prefix prefix to scan for, "" for none
* @throws IOException IO failure, including any uprated AmazonClientException
*/
@InterfaceAudience.Private
@Retries.RetryTranslated
public List<MultipartUpload> listMultipartUploads(String prefix)
throws IOException {
// add a trailing / if needed.
if (prefix != null && !prefix.isEmpty() && !prefix.endsWith("/")) {
prefix = prefix + "/";
}
String p = prefix;
return invoker.retry("listMultipartUploads", p, true, () -> {
ListMultipartUploadsRequest request = getRequestFactory()
.newListMultipartUploadsRequest(p);
return s3.listMultipartUploads(request).getMultipartUploads();
});
}
/**
* Abort a multipart upload.
* Retry policy: none.
* @param destKey destination key
* @param uploadId Upload ID
*/
@Retries.OnceRaw
void abortMultipartUpload(String destKey, String uploadId) {
LOG.info("Aborting multipart upload {} to {}", uploadId, destKey);
getAmazonS3Client().abortMultipartUpload(
getRequestFactory().newAbortMultipartUploadRequest(
destKey,
uploadId));
}
/**
* Abort a multipart upload.
* Retry policy: none.
* @param upload the listed upload to abort.
*/
@Retries.OnceRaw
void abortMultipartUpload(MultipartUpload upload) {
String destKey;
String uploadId;
destKey = upload.getKey();
uploadId = upload.getUploadId();
if (LOG.isInfoEnabled()) {
DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
LOG.debug("Aborting multipart upload {} to {} initiated by {} on {}",
uploadId, destKey, upload.getInitiator(),
df.format(upload.getInitiated()));
}
getAmazonS3Client().abortMultipartUpload(
getRequestFactory().newAbortMultipartUploadRequest(
destKey,
uploadId));
}
/**
* Create a new instance of the committer statistics.
* @return a new committer statistics instance
*/
public CommitterStatistics newCommitterStatistics() {
return statisticsContext.newCommitterStatistics();
}
@SuppressWarnings("deprecation")
@Override
public boolean hasPathCapability(final Path path, final String capability)
throws IOException {
final Path p = makeQualified(path);
String cap = validatePathCapabilityArgs(p, capability);
switch (cap) {
case CommitConstants.STORE_CAPABILITY_MAGIC_COMMITTER:
case CommitConstants.STORE_CAPABILITY_MAGIC_COMMITTER_OLD:
// capability depends on FS configuration
return isMagicCommitEnabled();
case SelectConstants.S3_SELECT_CAPABILITY:
// select is only supported if enabled and client side encryption is
// disabled.
return !isCSEEnabled && SelectBinding.isSelectEnabled(getConf());
case CommonPathCapabilities.FS_CHECKSUMS:
// capability depends on FS configuration
return getConf().getBoolean(ETAG_CHECKSUM_ENABLED,
ETAG_CHECKSUM_ENABLED_DEFAULT);
case CommonPathCapabilities.ABORTABLE_STREAM:
return true;
case CommonPathCapabilities.FS_MULTIPART_UPLOADER:
// client side encryption doesn't support multipart uploader.
return !isCSEEnabled;
// this client is safe to use with buckets
// containing directory markers anywhere in
// the hierarchy
case STORE_CAPABILITY_DIRECTORY_MARKER_AWARE:
return true;
// etags are avaialable in listings, but they
// are not consistent across renames.
// therefore, only availability is declared
case CommonPathCapabilities.ETAGS_AVAILABLE:
return true;
/*
* Marker policy capabilities are handed off.
*/
case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_KEEP:
case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_DELETE:
case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_AUTHORITATIVE:
return getDirectoryMarkerPolicy().hasPathCapability(path, cap);
// keep for a magic path or if the policy retains it
case STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_KEEP:
return keepDirectoryMarkers(path);
// delete is the opposite of keep
case STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE:
return !keepDirectoryMarkers(path);
// create file options
case FS_S3A_CREATE_PERFORMANCE:
case FS_S3A_CREATE_HEADER:
return true;
default:
return super.hasPathCapability(p, cap);
}
}
/**
* Return the capabilities of this filesystem instance.
*
* This has been supplanted by {@link #hasPathCapability(Path, String)}.
* @param capability string to query the stream support for.
* @return whether the FS instance has the capability.
*/
@Deprecated
@Override
public boolean hasCapability(String capability) {
try {
return hasPathCapability(new Path("/"), capability);
} catch (IOException ex) {
// should never happen, so log and downgrade.
LOG.debug("Ignoring exception on hasCapability({}})", capability, ex);
return false;
}
}
/**
* Get a shared copy of the AWS credentials, with its reference
* counter updated.
* Caller is required to call {@code close()} on this after
* they have finished using it.
* @param purpose what is this for? This is initially for logging
* @return a reference to shared credentials.
*/
public AWSCredentialProviderList shareCredentials(final String purpose) {
LOG.debug("Sharing credentials for: {}", purpose);
return credentials.share();
}
/**
* This is a proof of concept of a select API.
* @param source path to source data
* @param options request configuration from the builder.
* @param fileInformation any passed in information.
* @return the stream of the results
* @throws IOException IO failure
*/
@Retries.RetryTranslated
@AuditEntryPoint
private FSDataInputStream select(final Path source,
final Configuration options,
final OpenFileSupport.OpenFileInformation fileInformation)
throws IOException {
requireSelectSupport(source);
final AuditSpan auditSpan = entryPoint(OBJECT_SELECT_REQUESTS, source);
final Path path = makeQualified(source);
String expression = fileInformation.getSql();
final S3AFileStatus fileStatus = extractOrFetchSimpleFileStatus(path,
fileInformation);
// readahead range can be dynamically set
S3ObjectAttributes objectAttributes = createObjectAttributes(
path, fileStatus);
ChangeDetectionPolicy changePolicy = fileInformation.getChangePolicy();
S3AReadOpContext readContext = createReadContext(
fileStatus,
auditSpan);
fileInformation.applyOptions(readContext);
if (changePolicy.getSource() != ChangeDetectionPolicy.Source.None
&& fileStatus.getEtag() != null) {
// if there is change detection, and the status includes at least an
// etag,
// check that the object metadata lines up with what is expected
// based on the object attributes (which may contain an eTag or
// versionId).
// This is because the select API doesn't offer this.
// (note: this is trouble for version checking as cannot force the old
// version in the final read; nor can we check the etag match)
ChangeTracker changeTracker =
new ChangeTracker(uri.toString(),
changePolicy,
readContext.getS3AStatisticsContext()
.newInputStreamStatistics()
.getChangeTrackerStatistics(),
objectAttributes);
// will retry internally if wrong version detected
Invoker readInvoker = readContext.getReadInvoker();
getObjectMetadata(path, changeTracker, readInvoker, "select");
}
// instantiate S3 Select support using the current span
// as the active span for operations.
SelectBinding selectBinding = new SelectBinding(
createWriteOperationHelper(auditSpan));
// build and execute the request
return selectBinding.select(
readContext,
expression,
options,
objectAttributes);
}
/**
* Verify the FS supports S3 Select.
* @param source source file.
* @throws UnsupportedOperationException if not.
*/
private void requireSelectSupport(final Path source) throws
UnsupportedOperationException {
if (!isCSEEnabled && !SelectBinding.isSelectEnabled(getConf())) {
throw new UnsupportedOperationException(
SelectConstants.SELECT_UNSUPPORTED);
}
}
/**
* Get the file status of the source file.
* If in the fileInformation parameter return that
* if not found, issue a HEAD request, looking for a
* file only.
* @param path path of the file to open
* @param fileInformation information on the file to open
* @return a file status
* @throws FileNotFoundException if a HEAD request found no file
* @throws IOException IO failure
*/
private S3AFileStatus extractOrFetchSimpleFileStatus(
final Path path,
final OpenFileSupport.OpenFileInformation fileInformation)
throws IOException {
S3AFileStatus fileStatus = fileInformation.getStatus();
if (fileStatus == null) {
// we check here for the passed in status
// being a directory
fileStatus = innerGetFileStatus(path, false,
StatusProbeEnum.HEAD_ONLY);
}
if (fileStatus.isDirectory()) {
throw new FileNotFoundException(path.toString() + " is a directory");
}
return fileStatus;
}
/**
* Initiate the open() or select() operation.
* This is invoked from both the FileSystem and FileContext APIs.
* It's declared as an audit entry point but the span creation is pushed
* down into the open/select methods it ultimately calls.
* @param rawPath path to the file
* @param parameters open file parameters from the builder.
* @return a future which will evaluate to the opened/selected file.
* @throws IOException failure to resolve the link.
* @throws PathIOException operation is a select request but S3 select is
* disabled
* @throws IllegalArgumentException unknown mandatory key
*/
@Override
@Retries.RetryTranslated
@AuditEntryPoint
public CompletableFuture<FSDataInputStream> openFileWithOptions(
final Path rawPath,
final OpenFileParameters parameters) throws IOException {
final Path path = qualify(rawPath);
OpenFileSupport.OpenFileInformation fileInformation =
openFileHelper.prepareToOpenFile(
path,
parameters,
getDefaultBlockSize());
CompletableFuture<FSDataInputStream> result = new CompletableFuture<>();
if (!fileInformation.isS3Select()) {
// normal path.
unboundedThreadPool.submit(() ->
LambdaUtils.eval(result,
() -> executeOpen(path, fileInformation)));
} else {
// it is a select statement.
// fail fast if the operation is not available
requireSelectSupport(path);
// submit the query
unboundedThreadPool.submit(() ->
LambdaUtils.eval(result,
() -> select(path, parameters.getOptions(), fileInformation)));
}
return result;
}
@Override
@AuditEntryPoint
public S3AMultipartUploaderBuilder createMultipartUploader(
final Path basePath)
throws IOException {
if(isCSEEnabled) {
throw new UnsupportedOperationException("Multi-part uploader not "
+ "supported for Client side encryption.");
}
final Path path = makeQualified(basePath);
try (AuditSpan span = entryPoint(MULTIPART_UPLOAD_INSTANTIATED, path)) {
StoreContext ctx = createStoreContext();
return new S3AMultipartUploaderBuilder(this,
createWriteOperationHelper(span),
ctx,
path,
statisticsContext.createMultipartUploaderStatistics());
}
}
/**
* Build an immutable store context.
* If called while the FS is being initialized,
* some of the context will be incomplete.
* new store context instances should be created as appropriate.
* @return the store context of this FS.
*/
@InterfaceAudience.Private
public StoreContext createStoreContext() {
return new StoreContextBuilder().setFsURI(getUri())
.setBucket(getBucket())
.setConfiguration(getConf())
.setUsername(getUsername())
.setOwner(owner)
.setExecutor(boundedThreadPool)
.setExecutorCapacity(executorCapacity)
.setInvoker(invoker)
.setInstrumentation(statisticsContext)
.setStorageStatistics(getStorageStatistics())
.setInputPolicy(getInputPolicy())
.setChangeDetectionPolicy(changeDetectionPolicy)
.setMultiObjectDeleteEnabled(enableMultiObjectsDelete)
.setUseListV1(useListV1)
.setContextAccessors(new ContextAccessorsImpl())
.setAuditor(getAuditor())
.setEnableCSE(isCSEEnabled)
.build();
}
/**
* Create a marker tools operations binding for this store.
* Auditing:
* @param target target path
* @return callbacks for operations.
* @throws IOException if raised during span creation
*/
@AuditEntryPoint
@InterfaceAudience.Private
public MarkerToolOperations createMarkerToolOperations(final String target)
throws IOException {
createSpan("marker-tool-scan", target,
null);
return new MarkerToolOperationsImpl(new OperationCallbacksImpl());
}
/**
* This is purely for testing, as it force initializes all static
* initializers. See HADOOP-17385 for details.
*/
@InterfaceAudience.Private
public static void initializeClass() {
LOG.debug("Initialize S3A class");
}
/**
* The implementation of context accessors.
*/
private class ContextAccessorsImpl implements ContextAccessors {
@Override
public Path keyToPath(final String key) {
return keyToQualifiedPath(key);
}
@Override
public String pathToKey(final Path path) {
return S3AFileSystem.this.pathToKey(path);
}
@Override
public File createTempFile(final String prefix, final long size)
throws IOException {
return createTmpFileForWrite(prefix, size, getConf());
}
@Override
public String getBucketLocation() throws IOException {
return S3AFileSystem.this.getBucketLocation();
}
@Override
public Path makeQualified(final Path path) {
return S3AFileSystem.this.makeQualified(path);
}
@Override
public AuditSpan getActiveAuditSpan() {
return S3AFileSystem.this.getActiveAuditSpan();
}
@Override
public RequestFactory getRequestFactory() {
return S3AFileSystem.this.getRequestFactory();
}
}
/**
* a method to know if Client side encryption is enabled or not.
* @return a boolean stating if CSE is enabled.
*/
public boolean isCSEEnabled() {
return isCSEEnabled;
}
}