| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.fs.s3a; |
| |
| import java.io.File; |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.InterruptedIOException; |
| import java.net.URI; |
| import java.nio.file.AccessDeniedException; |
| import java.text.DateFormat; |
| import java.text.SimpleDateFormat; |
| import java.time.Instant; |
| import java.time.OffsetDateTime; |
| import java.time.ZoneOffset; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.Date; |
| import java.util.EnumSet; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.Objects; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.ThreadPoolExecutor; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import javax.annotation.Nullable; |
| |
| import com.amazonaws.AmazonClientException; |
| import com.amazonaws.AmazonServiceException; |
| import com.amazonaws.SdkBaseException; |
| import com.amazonaws.services.s3.AmazonS3; |
| import com.amazonaws.services.s3.model.AbortMultipartUploadRequest; |
| import com.amazonaws.services.s3.model.CannedAccessControlList; |
| import com.amazonaws.services.s3.model.CopyObjectRequest; |
| import com.amazonaws.services.s3.model.DeleteObjectsRequest; |
| import com.amazonaws.services.s3.model.DeleteObjectsResult; |
| import com.amazonaws.services.s3.model.GetObjectMetadataRequest; |
| import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; |
| import com.amazonaws.services.s3.model.InitiateMultipartUploadResult; |
| import com.amazonaws.services.s3.model.ListMultipartUploadsRequest; |
| import com.amazonaws.services.s3.model.ListObjectsRequest; |
| import com.amazonaws.services.s3.model.ListObjectsV2Request; |
| import com.amazonaws.services.s3.model.MultiObjectDeleteException; |
| import com.amazonaws.services.s3.model.MultipartUpload; |
| import com.amazonaws.services.s3.model.ObjectMetadata; |
| import com.amazonaws.services.s3.model.PutObjectRequest; |
| import com.amazonaws.services.s3.model.PutObjectResult; |
| |
| import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams; |
| import com.amazonaws.services.s3.model.SSECustomerKey; |
| import com.amazonaws.services.s3.model.UploadPartRequest; |
| import com.amazonaws.services.s3.model.UploadPartResult; |
| import com.amazonaws.services.s3.transfer.Copy; |
| import com.amazonaws.services.s3.transfer.TransferManager; |
| import com.amazonaws.services.s3.transfer.TransferManagerConfiguration; |
| import com.amazonaws.services.s3.transfer.Upload; |
| import com.amazonaws.services.s3.transfer.model.CopyResult; |
| import com.amazonaws.services.s3.transfer.model.UploadResult; |
| import com.amazonaws.event.ProgressListener; |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import com.google.common.util.concurrent.ListeningExecutorService; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.commons.lang3.tuple.Pair; |
| import org.apache.commons.lang3.tuple.Triple; |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.classification.InterfaceStability; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.CommonPathCapabilities; |
| import org.apache.hadoop.fs.CreateFlag; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.Globber; |
| import org.apache.hadoop.fs.impl.OpenFileParameters; |
| import org.apache.hadoop.fs.s3a.auth.SignerManager; |
| import org.apache.hadoop.fs.s3a.auth.delegation.DelegationOperations; |
| import org.apache.hadoop.fs.s3a.auth.delegation.DelegationTokenProvider; |
| import org.apache.hadoop.fs.s3a.impl.BulkDeleteRetryHandler; |
| import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy; |
| import org.apache.hadoop.fs.s3a.impl.ContextAccessors; |
| import org.apache.hadoop.fs.s3a.impl.CopyOutcome; |
| import org.apache.hadoop.fs.s3a.impl.DeleteOperation; |
| import org.apache.hadoop.fs.s3a.impl.DirectoryPolicy; |
| import org.apache.hadoop.fs.s3a.impl.DirectoryPolicyImpl; |
| import org.apache.hadoop.fs.s3a.impl.InternalConstants; |
| import org.apache.hadoop.fs.s3a.impl.ListingOperationCallbacks; |
| import org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport; |
| import org.apache.hadoop.fs.s3a.impl.OperationCallbacks; |
| import org.apache.hadoop.fs.s3a.impl.RenameOperation; |
| import org.apache.hadoop.fs.s3a.impl.S3AMultipartUploaderBuilder; |
| import org.apache.hadoop.fs.s3a.impl.StatusProbeEnum; |
| import org.apache.hadoop.fs.s3a.impl.StoreContext; |
| import org.apache.hadoop.fs.s3a.impl.StoreContextBuilder; |
| import org.apache.hadoop.fs.s3a.impl.statistics.S3AMultipartUploaderStatisticsImpl; |
| import org.apache.hadoop.fs.s3a.s3guard.BulkOperationState; |
| import org.apache.hadoop.fs.s3a.select.InternalSelectConstants; |
| import org.apache.hadoop.fs.s3a.tools.MarkerToolOperations; |
| import org.apache.hadoop.fs.s3a.tools.MarkerToolOperationsImpl; |
| import org.apache.hadoop.io.IOUtils; |
| import org.apache.hadoop.io.Text; |
| import org.apache.hadoop.security.token.DelegationTokenIssuer; |
| import org.apache.hadoop.security.token.TokenIdentifier; |
| import org.apache.hadoop.util.DurationInfo; |
| import org.apache.hadoop.util.LambdaUtils; |
| import org.apache.hadoop.fs.FileAlreadyExistsException; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.GlobalStorageStatistics; |
| import org.apache.hadoop.fs.InvalidRequestException; |
| import org.apache.hadoop.fs.LocalDirAllocator; |
| import org.apache.hadoop.fs.LocalFileSystem; |
| import org.apache.hadoop.fs.LocatedFileStatus; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.PathFilter; |
| import org.apache.hadoop.fs.PathIOException; |
| import org.apache.hadoop.fs.RemoteIterator; |
| import org.apache.hadoop.fs.StreamCapabilities; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.fs.s3a.auth.RoleModel; |
| import org.apache.hadoop.fs.s3a.auth.delegation.AWSPolicyProvider; |
| import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecretOperations; |
| import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets; |
| import org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens; |
| import org.apache.hadoop.fs.s3a.auth.delegation.AbstractS3ATokenIdentifier; |
| import org.apache.hadoop.fs.s3a.commit.CommitConstants; |
| import org.apache.hadoop.fs.s3a.commit.PutTracker; |
| import org.apache.hadoop.fs.s3a.commit.MagicCommitIntegration; |
| import org.apache.hadoop.fs.s3a.impl.ChangeTracker; |
| import org.apache.hadoop.fs.s3a.select.SelectBinding; |
| import org.apache.hadoop.fs.s3a.select.SelectConstants; |
| import org.apache.hadoop.fs.s3a.s3guard.DirListingMetadata; |
| import org.apache.hadoop.fs.s3a.s3guard.MetadataStore; |
| import org.apache.hadoop.fs.s3a.s3guard.PathMetadata; |
| import org.apache.hadoop.fs.s3a.s3guard.S3Guard; |
| import org.apache.hadoop.fs.s3a.s3guard.ITtlTimeProvider; |
| import org.apache.hadoop.fs.s3native.S3xLoginHelper; |
| import org.apache.hadoop.io.retry.RetryPolicies; |
| import org.apache.hadoop.fs.store.EtagChecksum; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.util.BlockingThreadPoolExecutorService; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.util.Progressable; |
| import org.apache.hadoop.util.ReflectionUtils; |
| import org.apache.hadoop.util.SemaphoredDelegatingExecutor; |
| import org.apache.hadoop.util.concurrent.HadoopExecutors; |
| |
| import static org.apache.hadoop.fs.impl.AbstractFSBuilderImpl.rejectUnknownMandatoryKeys; |
| import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs; |
| import static org.apache.hadoop.fs.s3a.Constants.*; |
| import static org.apache.hadoop.fs.s3a.Invoker.*; |
| import static org.apache.hadoop.fs.s3a.S3AUtils.*; |
| import static org.apache.hadoop.fs.s3a.Statistic.*; |
| import static org.apache.commons.lang3.StringUtils.isNotEmpty; |
| import static org.apache.hadoop.fs.s3a.auth.RolePolicies.STATEMENT_ALLOW_SSE_KMS_RW; |
| import static org.apache.hadoop.fs.s3a.auth.RolePolicies.allowS3Operations; |
| import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.TokenIssuingPolicy.NoTokensAvailable; |
| import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.hasDelegationTokenBinding; |
| import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit; |
| import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletionIgnoringExceptions; |
| import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isUnknownBucket; |
| import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404; |
| import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.fixBucketRegion; |
| import static org.apache.hadoop.fs.s3a.impl.NetworkBinding.logDnsLookup; |
| import static org.apache.hadoop.fs.s3a.s3guard.S3Guard.dirMetaToStatuses; |
| import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; |
| |
| /** |
| * The core S3A Filesystem implementation. |
| * |
| * This subclass is marked as private as code should not be creating it |
| * directly; use {@link FileSystem#get(Configuration)} and variants to |
| * create one. |
| * |
| * If cast to {@code S3AFileSystem}, extra methods and features may be accessed. |
| * Consider those private and unstable. |
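| * |
| * A minimal sketch of obtaining an instance (the bucket name is |
| * illustrative): |
| * <pre>{@code |
| * Configuration conf = new Configuration(); |
| * FileSystem fs = FileSystem.get(URI.create("s3a://example-bucket/"), conf); |
| * }</pre> |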
| * |
| * Because it prints some of the state of the instrumentation, |
| * the output of {@link #toString()} must also be considered unstable. |
| */ |
| @InterfaceAudience.Private |
| @InterfaceStability.Evolving |
| public class S3AFileSystem extends FileSystem implements StreamCapabilities, |
| AWSPolicyProvider, DelegationTokenProvider { |
| /** |
| * Default blocksize as used in blocksize and FS status queries. |
| */ |
| public static final int DEFAULT_BLOCKSIZE = 32 * 1024 * 1024; |
| |
| /** |
| * This declares delete as idempotent. |
| * This is an "interesting" topic in past Hadoop FS work. |
| * Essentially: with a single caller, DELETE is idempotent, |
| * but in a shared filesystem it is very much not so. |
| * Here, on the basis that this is not a filesystem with consistency |
| * guarantees, declaring delete retryable means a retried delete may |
| * remove files created between the attempts. |
| */ |
| public static final boolean DELETE_CONSIDERED_IDEMPOTENT = true; |
| |
| private URI uri; |
| private Path workingDir; |
| private String username; |
| private AmazonS3 s3; |
| // initial callback policy is fail-once; it's there just to assist |
| // some mock tests and other codepaths trying to call the low level |
| // APIs on an uninitialized filesystem. |
| private Invoker invoker = new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL, |
| Invoker.LOG_EVENT); |
| // Only used for very specific code paths which behave differently for |
| // S3Guard. Retries FileNotFound, so be careful if you use this. |
| private Invoker s3guardInvoker = new Invoker(RetryPolicies.TRY_ONCE_THEN_FAIL, |
| Invoker.LOG_EVENT); |
| private final Retried onRetry = this::operationRetried; |
| private String bucket; |
| private int maxKeys; |
| private Listing listing; |
| private long partSize; |
| private boolean enableMultiObjectsDelete; |
| private TransferManager transfers; |
| private ListeningExecutorService boundedThreadPool; |
| private ThreadPoolExecutor unboundedThreadPool; |
| private int executorCapacity; |
| private long multiPartThreshold; |
| public static final Logger LOG = LoggerFactory.getLogger(S3AFileSystem.class); |
| private static final Logger PROGRESS = |
| LoggerFactory.getLogger("org.apache.hadoop.fs.s3a.S3AFileSystem.Progress"); |
| private LocalDirAllocator directoryAllocator; |
| private CannedAccessControlList cannedACL; |
| private boolean failOnMetadataWriteError; |
| |
| /** |
| * This must never be null; until initialized it just declares that there |
| * is no encryption. |
| */ |
| private EncryptionSecrets encryptionSecrets = new EncryptionSecrets(); |
| private S3AInstrumentation instrumentation; |
| private final S3AStorageStatistics storageStatistics = |
| createStorageStatistics(); |
| private long readAhead; |
| private S3AInputPolicy inputPolicy; |
| private ChangeDetectionPolicy changeDetectionPolicy; |
| private final AtomicBoolean closed = new AtomicBoolean(false); |
| private volatile boolean isClosed = false; |
| private MetadataStore metadataStore; |
| private boolean allowAuthoritativeMetadataStore; |
| private Collection<String> allowAuthoritativePaths; |
| |
| /** Delegation token integration; non-empty when DT support is enabled. */ |
| private Optional<S3ADelegationTokens> delegationTokens = Optional.empty(); |
| |
| /** Principal who created the FS; recorded during initialization. */ |
| private UserGroupInformation owner; |
| |
| private String blockOutputBuffer; |
| private S3ADataBlocks.BlockFactory blockFactory; |
| private int blockOutputActiveBlocks; |
| private WriteOperationHelper writeHelper; |
| private SelectBinding selectBinding; |
| private boolean useListV1; |
| private MagicCommitIntegration committerIntegration; |
| |
| private AWSCredentialProviderList credentials; |
| private SignerManager signerManager; |
| |
| private ITtlTimeProvider ttlTimeProvider; |
| |
| /** |
| * Page size for deletions. |
| */ |
| private int pageSize; |
| |
| /** |
| * Specific operations used by rename and delete operations. |
| */ |
| private final S3AFileSystem.OperationCallbacksImpl |
| operationCallbacks = new OperationCallbacksImpl(); |
| |
| private final ListingOperationCallbacks listingOperationCallbacks = |
| new ListingOperationCallbacksImpl(); |
| /** |
| * Directory policy. |
| */ |
| private DirectoryPolicy directoryPolicy; |
| |
| /** |
| * Context accessors for re-use. |
| */ |
| private final ContextAccessors contextAccessors = new ContextAccessorsImpl(); |
| |
| /** Add any deprecated keys. */ |
| @SuppressWarnings("deprecation") |
| private static void addDeprecatedKeys() { |
| // this is retained as a placeholder for when new deprecated keys |
| // need to be added. |
| Configuration.DeprecationDelta[] deltas = { |
| }; |
| |
| if (deltas.length > 0) { |
| Configuration.addDeprecations(deltas); |
| Configuration.reloadExistingConfigurations(); |
| } |
| } |
| |
| static { |
| addDeprecatedKeys(); |
| } |
| |
| /** Called after a new FileSystem instance is constructed. |
| * @param name a uri whose authority section names the host, port, etc. |
| * for this FileSystem |
| * @param originalConf the configuration to use for the FS. The |
| * bucket-specific options are patched over the base ones before any use is |
| * made of the config. |
| */ |
| public void initialize(URI name, Configuration originalConf) |
| throws IOException { |
| // get the host; this is guaranteed to be non-null, non-empty |
| bucket = name.getHost(); |
| try { |
| LOG.debug("Initializing S3AFileSystem for {}", bucket); |
| // clone the configuration into one with propagated bucket options |
| Configuration conf = propagateBucketOptions(originalConf, bucket); |
| // patch the Hadoop security providers |
| patchSecurityCredentialProviders(conf); |
| // look for delegation token support early. |
| boolean delegationTokensEnabled = hasDelegationTokenBinding(conf); |
| if (delegationTokensEnabled) { |
| LOG.debug("Using delegation tokens"); |
| } |
| // set the URI, this will do any fixup of the URI to remove secrets, |
| // canonicalize. |
| setUri(name, delegationTokensEnabled); |
| super.initialize(uri, conf); |
| setConf(conf); |
| |
| // look for encryption data |
| // DT Bindings may override this |
| setEncryptionSecrets(new EncryptionSecrets( |
| getEncryptionAlgorithm(bucket, conf), |
| getServerSideEncryptionKey(bucket, getConf()))); |
| |
| invoker = new Invoker(new S3ARetryPolicy(getConf()), onRetry); |
| instrumentation = new S3AInstrumentation(uri); |
| |
| // Username is the current user at the time the FS was instantiated. |
| owner = UserGroupInformation.getCurrentUser(); |
| username = owner.getShortUserName(); |
| workingDir = new Path("/user", username) |
| .makeQualified(this.uri, this.getWorkingDirectory()); |
| |
| s3guardInvoker = new Invoker(new S3GuardExistsRetryPolicy(getConf()), |
| onRetry); |
| writeHelper = new WriteOperationHelper(this, getConf()); |
| |
| failOnMetadataWriteError = conf.getBoolean(FAIL_ON_METADATA_WRITE_ERROR, |
| FAIL_ON_METADATA_WRITE_ERROR_DEFAULT); |
| |
| maxKeys = intOption(conf, MAX_PAGING_KEYS, DEFAULT_MAX_PAGING_KEYS, 1); |
| partSize = getMultipartSizeProperty(conf, |
| MULTIPART_SIZE, DEFAULT_MULTIPART_SIZE); |
| multiPartThreshold = getMultipartSizeProperty(conf, |
| MIN_MULTIPART_THRESHOLD, DEFAULT_MIN_MULTIPART_THRESHOLD); |
| |
| //check but do not store the block size |
| longBytesOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1); |
| enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true); |
| |
| readAhead = longBytesOption(conf, READAHEAD_RANGE, |
| DEFAULT_READAHEAD_RANGE, 0); |
| |
| initThreadPools(conf); |
| |
| int listVersion = conf.getInt(LIST_VERSION, DEFAULT_LIST_VERSION); |
| if (listVersion < 1 || listVersion > 2) { |
| LOG.warn("Configured fs.s3a.list.version {} is invalid, forcing " + |
| "version 2", listVersion); |
| } |
| useListV1 = (listVersion == 1); |
| |
| signerManager = new SignerManager(bucket, this, conf, owner); |
| signerManager.initCustomSigners(); |
| |
| // creates the AWS client, including overriding auth chain if |
| // the FS came with a DT |
| // this may do some patching of the configuration (e.g. setting |
| // the encryption algorithms) |
| bindAWSClient(name, delegationTokensEnabled); |
| |
| initTransferManager(); |
| |
| initCannedAcls(conf); |
| |
| // This initiates a probe against S3 for the bucket existing. |
| doBucketProbing(); |
| |
| inputPolicy = S3AInputPolicy.getPolicy( |
| conf.getTrimmed(INPUT_FADVISE, INPUT_FADV_NORMAL)); |
| LOG.debug("Input fadvise policy = {}", inputPolicy); |
| changeDetectionPolicy = ChangeDetectionPolicy.getPolicy(conf); |
| LOG.debug("Change detection policy = {}", changeDetectionPolicy); |
| boolean magicCommitterEnabled = conf.getBoolean( |
| CommitConstants.MAGIC_COMMITTER_ENABLED, |
| CommitConstants.DEFAULT_MAGIC_COMMITTER_ENABLED); |
| LOG.debug("Filesystem support for magic committers {} enabled", |
| magicCommitterEnabled ? "is" : "is not"); |
| committerIntegration = new MagicCommitIntegration( |
| this, magicCommitterEnabled); |
| |
| // instantiate S3 Select support |
| selectBinding = new SelectBinding(writeHelper); |
| |
| boolean blockUploadEnabled = conf.getBoolean(FAST_UPLOAD, true); |
| |
| if (!blockUploadEnabled) { |
| LOG.warn("The \"slow\" output stream is no longer supported"); |
| } |
| blockOutputBuffer = conf.getTrimmed(FAST_UPLOAD_BUFFER, |
| DEFAULT_FAST_UPLOAD_BUFFER); |
| partSize = ensureOutputParameterInRange(MULTIPART_SIZE, partSize); |
| blockFactory = S3ADataBlocks.createFactory(this, blockOutputBuffer); |
| blockOutputActiveBlocks = intOption(conf, |
| FAST_UPLOAD_ACTIVE_BLOCKS, DEFAULT_FAST_UPLOAD_ACTIVE_BLOCKS, 1); |
| LOG.debug("Using S3ABlockOutputStream with buffer = {}; block={};" + |
| " queue limit={}", |
| blockOutputBuffer, partSize, blockOutputActiveBlocks); |
| long authDirTtl = conf.getTimeDuration(METADATASTORE_METADATA_TTL, |
| DEFAULT_METADATASTORE_METADATA_TTL, TimeUnit.MILLISECONDS); |
| ttlTimeProvider = new S3Guard.TtlTimeProvider(authDirTtl); |
| |
| setMetadataStore(S3Guard.getMetadataStore(this, ttlTimeProvider)); |
| allowAuthoritativeMetadataStore = conf.getBoolean(METADATASTORE_AUTHORITATIVE, |
| DEFAULT_METADATASTORE_AUTHORITATIVE); |
| allowAuthoritativePaths = S3Guard.getAuthoritativePaths(this); |
| |
| if (hasMetadataStore()) { |
| LOG.debug("Using metadata store {}, authoritative store={}, authoritative path={}", |
| getMetadataStore(), allowAuthoritativeMetadataStore, allowAuthoritativePaths); |
| } |
| |
| // log that S3Guard is disabled, at the warning level set in the config |
| if (!hasMetadataStore()) { |
| String warnLevel = conf.getTrimmed(S3GUARD_DISABLED_WARN_LEVEL, |
| DEFAULT_S3GUARD_DISABLED_WARN_LEVEL); |
| S3Guard.logS3GuardDisabled(LOG, warnLevel, bucket); |
| } |
| // directory policy, which may look at authoritative paths |
| directoryPolicy = DirectoryPolicyImpl.getDirectoryPolicy(conf, |
| this::allowAuthoritative); |
| LOG.debug("Directory marker retention policy is {}", directoryPolicy); |
| |
| initMultipartUploads(conf); |
| |
| pageSize = intOption(getConf(), BULK_DELETE_PAGE_SIZE, |
| BULK_DELETE_PAGE_SIZE_DEFAULT, 0); |
| listing = new Listing(listingOperationCallbacks, createStoreContext()); |
| } catch (AmazonClientException e) { |
| // amazon client exception: stop all services then throw the translation |
| stopAllServices(); |
| throw translateException("initializing ", new Path(name), e); |
| } catch (IOException | RuntimeException e) { |
| // other exceptions: stop the services. |
| stopAllServices(); |
| throw e; |
| } |
| |
| } |
| |
| /** |
| * Test bucket existence in S3. |
| * When the value of {@link Constants#S3A_BUCKET_PROBE} is set to 0, |
| * the bucket existence check is skipped to improve the performance of |
| * S3AFileSystem initialization. When set to 1 or 2, a bucket existence |
| * check is performed, which is potentially slow. |
| * If 3 or higher: warn and use the v2 check. |
| * The DNS address of the S3 endpoint is also logged if the bucket probe |
| * value is greater than 0; otherwise the lookup is skipped for performance. |
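| * For example, to skip the probe entirely (a sketch; the key name is the |
| * value of {@link Constants#S3A_BUCKET_PROBE}): |
| * <pre>{@code |
| * <property> |
| * <name>fs.s3a.bucket.probe</name> |
| * <value>0</value> |
| * </property> |
| * }</pre> |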
| * @throws UnknownStoreException the bucket is absent |
| * @throws IOException any other problem talking to S3 |
| */ |
| @Retries.RetryTranslated |
| private void doBucketProbing() throws IOException { |
| int bucketProbe = getConf() |
| .getInt(S3A_BUCKET_PROBE, S3A_BUCKET_PROBE_DEFAULT); |
| Preconditions.checkArgument(bucketProbe >= 0, |
| "Value of " + S3A_BUCKET_PROBE + " should be >= 0"); |
| switch (bucketProbe) { |
| case 0: |
| LOG.debug("skipping check for bucket existence"); |
| break; |
| case 1: |
| logDnsLookup(getConf()); |
| verifyBucketExists(); |
| break; |
| case 2: |
| logDnsLookup(getConf()); |
| verifyBucketExistsV2(); |
| break; |
| default: |
| // we have no idea what this is, assume it is from a later release. |
| LOG.warn("Unknown bucket probe option {}: {}; falling back to check #2", |
| S3A_BUCKET_PROBE, bucketProbe); |
| verifyBucketExistsV2(); |
| break; |
| } |
| } |
| |
| /** |
| * Initialize the thread pool. |
| * This must be re-invoked after replacing the S3Client during test |
| * runs. |
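| * Two pools are created: a bounded blocking pool ("s3a-transfer-shared") |
| * sized from "fs.s3a.threads.max", and an unbounded on-demand pool |
| * ("s3a-transfer-unbounded") which is handed to the transfer manager. |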
| * @param conf configuration. |
| */ |
| private void initThreadPools(Configuration conf) { |
| int maxThreads = conf.getInt(MAX_THREADS, DEFAULT_MAX_THREADS); |
| if (maxThreads < 2) { |
| LOG.warn(MAX_THREADS + " must be at least 2: forcing to 2."); |
| maxThreads = 2; |
| } |
| int totalTasks = intOption(conf, |
| MAX_TOTAL_TASKS, DEFAULT_MAX_TOTAL_TASKS, 1); |
| long keepAliveTime = longOption(conf, KEEPALIVE_TIME, |
| DEFAULT_KEEPALIVE_TIME, 0); |
| boundedThreadPool = BlockingThreadPoolExecutorService.newInstance( |
| maxThreads, |
| maxThreads + totalTasks, |
| keepAliveTime, TimeUnit.SECONDS, |
| "s3a-transfer-shared"); |
| unboundedThreadPool = new ThreadPoolExecutor( |
| maxThreads, Integer.MAX_VALUE, |
| keepAliveTime, TimeUnit.SECONDS, |
| new LinkedBlockingQueue<>(), |
| BlockingThreadPoolExecutorService.newDaemonThreadFactory( |
| "s3a-transfer-unbounded")); |
| unboundedThreadPool.allowCoreThreadTimeOut(true); |
| executorCapacity = intOption(conf, |
| EXECUTOR_CAPACITY, DEFAULT_EXECUTOR_CAPACITY, 1); |
| } |
| |
| /** |
| * Create the storage statistics or bind to an existing one. |
| * @return a storage statistics instance. |
| */ |
| protected static S3AStorageStatistics createStorageStatistics() { |
| return (S3AStorageStatistics) |
| GlobalStorageStatistics.INSTANCE |
| .put(S3AStorageStatistics.NAME, |
| () -> new S3AStorageStatistics()); |
| } |
| |
| /** |
| * Verify that the bucket exists. This does not check permissions, |
| * not even read access. |
| * Retry policy: retrying, translated. |
| * @throws UnknownStoreException the bucket is absent |
| * @throws IOException any other problem talking to S3 |
| */ |
| @Retries.RetryTranslated |
| protected void verifyBucketExists() |
| throws UnknownStoreException, IOException { |
| if (!invoker.retry("doesBucketExist", bucket, true, |
| () -> s3.doesBucketExist(bucket))) { |
| throw new UnknownStoreException("Bucket " + bucket + " does not exist"); |
| } |
| } |
| |
| /** |
| * Verify that the bucket exists. This will correctly throw an exception |
| * when credentials are invalid. |
| * Retry policy: retrying, translated. |
| * @throws UnknownStoreException the bucket is absent |
| * @throws IOException any other problem talking to S3 |
| */ |
| @Retries.RetryTranslated |
| protected void verifyBucketExistsV2() |
| throws UnknownStoreException, IOException { |
| if (!invoker.retry("doesBucketExistV2", bucket, true, |
| () -> s3.doesBucketExistV2(bucket))) { |
| throw new UnknownStoreException("Bucket " + bucket + " does not exist"); |
| } |
| } |
| |
| /** |
| * Get S3A Instrumentation. For test purposes. |
| * @return this instance's instrumentation. |
| */ |
| public S3AInstrumentation getInstrumentation() { |
| return instrumentation; |
| } |
| |
| /** |
| * Get current listing instance. |
| * @return this instance's listing. |
| */ |
| public Listing getListing() { |
| return listing; |
| } |
| |
| /** |
| * Set up the client bindings. |
| * If delegation tokens are enabled, the FS first looks for a DT |
| * ahead of any other bindings. |
| * If there is a DT, it uses it for authentication |
| * and switches to the DT authenticator automatically (and exclusively). |
| * @param name URI of the FS |
| * @param dtEnabled are delegation tokens enabled? |
| * @throws IOException failure. |
| */ |
| private void bindAWSClient(URI name, boolean dtEnabled) throws IOException { |
| Configuration conf = getConf(); |
| credentials = null; |
| String uaSuffix = ""; |
| |
| if (dtEnabled) { |
| // Delegation support. |
| // Create and start the DT integration. |
| // Then look for an existing DT for this bucket, switch to authenticating |
| // with it if so. |
| |
| LOG.debug("Using delegation tokens"); |
| S3ADelegationTokens tokens = new S3ADelegationTokens(); |
| this.delegationTokens = Optional.of(tokens); |
| tokens.bindToFileSystem(getCanonicalUri(), |
| createStoreContext(), |
| createDelegationOperations()); |
| tokens.init(conf); |
| tokens.start(); |
| // switch to the DT provider and bypass all other configured |
| // providers. |
| if (tokens.isBoundToDT()) { |
| // A DT was retrieved. |
| LOG.debug("Using existing delegation token"); |
| // and use the encryption settings from that client, whatever they were |
| } else { |
| LOG.debug("No delegation token for this instance"); |
| } |
| // Get new credential chain |
| credentials = tokens.getCredentialProviders(); |
| // and any encryption secrets which came from a DT |
| tokens.getEncryptionSecrets() |
| .ifPresent(this::setEncryptionSecrets); |
| // and update the UA field with any diagnostics provided by |
| // the DT binding. |
| uaSuffix = tokens.getUserAgentField(); |
| } else { |
| // DT support is disabled, so create the normal credential chain |
| credentials = createAWSCredentialProviderSet(name, conf); |
| } |
| LOG.debug("Using credential provider {}", credentials); |
| Class<? extends S3ClientFactory> s3ClientFactoryClass = conf.getClass( |
| S3_CLIENT_FACTORY_IMPL, DEFAULT_S3_CLIENT_FACTORY_IMPL, |
| S3ClientFactory.class); |
| |
| s3 = ReflectionUtils.newInstance(s3ClientFactoryClass, conf) |
| .createS3Client(getUri(), bucket, credentials, uaSuffix); |
| } |
| |
| /** |
| * Implementation of all operations used by delegation tokens. |
| */ |
| private class DelegationOperationsImpl implements DelegationOperations { |
| |
| @Override |
| public List<RoleModel.Statement> listAWSPolicyRules(final Set<AccessLevel> access) { |
| return S3AFileSystem.this.listAWSPolicyRules(access); |
| } |
| } |
| |
| /** |
| * Create an instance of the delegation operations. |
| * @return callbacks for DT support. |
| */ |
| @VisibleForTesting |
| public DelegationOperations createDelegationOperations() { |
| return new DelegationOperationsImpl(); |
| } |
| |
| /** |
| * Set the encryption secrets for requests. |
| * @param secrets secrets |
| */ |
| protected void setEncryptionSecrets(final EncryptionSecrets secrets) { |
| this.encryptionSecrets = secrets; |
| } |
| |
| /** |
| * Get the encryption secrets. |
| * This is potentially sensitive information and must be treated with care. |
| * @return the current encryption secrets. |
| */ |
| public EncryptionSecrets getEncryptionSecrets() { |
| return encryptionSecrets; |
| } |
| |
| private void initTransferManager() { |
| TransferManagerConfiguration transferConfiguration = |
| new TransferManagerConfiguration(); |
| transferConfiguration.setMinimumUploadPartSize(partSize); |
| transferConfiguration.setMultipartUploadThreshold(multiPartThreshold); |
| transferConfiguration.setMultipartCopyPartSize(partSize); |
| transferConfiguration.setMultipartCopyThreshold(multiPartThreshold); |
| |
| transfers = new TransferManager(s3, unboundedThreadPool); |
| transfers.setConfiguration(transferConfiguration); |
| } |
| |
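| /** |
| * Initialize the canned ACL from the configuration. |
| * A non-empty value must match the name of a |
| * {@link CannedAccessControlList} enum constant, e.g. "PublicRead"; |
| * an empty value means no canned ACL is applied. |
| * @param conf configuration to probe for the ACL option. |
| */ |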
| private void initCannedAcls(Configuration conf) { |
| String cannedACLName = conf.get(CANNED_ACL, DEFAULT_CANNED_ACL); |
| if (!cannedACLName.isEmpty()) { |
| cannedACL = CannedAccessControlList.valueOf(cannedACLName); |
| } else { |
| cannedACL = null; |
| } |
| } |
| |
| @Retries.RetryTranslated |
| private void initMultipartUploads(Configuration conf) throws IOException { |
| boolean purgeExistingMultipart = conf.getBoolean(PURGE_EXISTING_MULTIPART, |
| DEFAULT_PURGE_EXISTING_MULTIPART); |
| long purgeExistingMultipartAge = longOption(conf, |
| PURGE_EXISTING_MULTIPART_AGE, DEFAULT_PURGE_EXISTING_MULTIPART_AGE, 0); |
| |
| if (purgeExistingMultipart) { |
| try { |
| abortOutstandingMultipartUploads(purgeExistingMultipartAge); |
| } catch (AccessDeniedException e) { |
| instrumentation.errorIgnored(); |
| LOG.debug("Failed to purge multipart uploads against {}," + |
| " FS may be read only", bucket); |
| } |
| } |
| } |
| |
| /** |
| * Abort all outstanding MPUs older than a given age. |
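| * For example, {@code abortOutstandingMultipartUploads(0)} aborts every |
| * upload initiated before the moment of invocation. |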
| * @param seconds time in seconds |
| * @throws IOException on any failure, other than 403 "permission denied" |
| */ |
| @Retries.RetryTranslated |
| public void abortOutstandingMultipartUploads(long seconds) |
| throws IOException { |
| Preconditions.checkArgument(seconds >= 0); |
| Date purgeBefore = |
| new Date(new Date().getTime() - seconds * 1000); |
| LOG.debug("Purging outstanding multipart uploads older than {}", |
| purgeBefore); |
| invoker.retry("Purging multipart uploads", bucket, true, |
| () -> transfers.abortMultipartUploads(bucket, purgeBefore)); |
| } |
| |
| /** |
| * Return the protocol scheme for the FileSystem. |
| * |
| * @return "s3a" |
| */ |
| @Override |
| public String getScheme() { |
| return "s3a"; |
| } |
| |
| /** |
| * Returns a URI whose scheme and authority identify this FileSystem. |
| */ |
| @Override |
| public URI getUri() { |
| return uri; |
| } |
| |
| /** |
| * Set the URI field through {@link S3xLoginHelper} and |
| * optionally {@link #canonicalizeUri(URI)}. |
| * Exported for testing. |
| * @param fsUri filesystem URI. |
| * @param canonicalize true to use the URI built by the login helper |
| * unchanged; false to pass it through {@link #canonicalizeUri(URI)}. |
| */ |
| @VisibleForTesting |
| protected void setUri(URI fsUri, boolean canonicalize) { |
| URI u = S3xLoginHelper.buildFSURI(fsUri); |
| this.uri = canonicalize ? u : canonicalizeUri(u); |
| } |
| |
| /** |
| * Get the canonical URI. |
| * @return the canonical URI of this FS. |
| */ |
| public URI getCanonicalUri() { |
| return uri; |
| } |
| |
| @VisibleForTesting |
| @Override |
| public int getDefaultPort() { |
| return 0; |
| } |
| |
| /** |
| * Returns the S3 client used by this filesystem. |
| * This is for internal use within the S3A code itself. |
| * @return the AmazonS3 client |
| */ |
| AmazonS3 getAmazonS3Client() { |
| return s3; |
| } |
| |
| /** |
| * Returns the S3 client used by this filesystem. |
| * <i>Warning: this must only be used for testing, as it bypasses core |
| * S3A operations. </i> |
| * @param reason a justification for requesting access. |
| * @return the AmazonS3 client |
| */ |
| @VisibleForTesting |
| public AmazonS3 getAmazonS3ClientForTesting(String reason) { |
| LOG.warn("Access to S3A client requested, reason {}", reason); |
| return s3; |
| } |
| |
| /** |
| * Set the client; used in mocking tests to force in a different client. |
| * @param client client. |
| */ |
| protected void setAmazonS3Client(AmazonS3 client) { |
| Preconditions.checkNotNull(client, "client"); |
| LOG.debug("Setting S3 client to {}", client); |
| s3 = client; |
| |
| // Need to use a new TransferManager that uses the new client. |
| // Also, using a new TransferManager requires a new threadpool as the old |
| // TransferManager will shut the thread pool down when it is garbage |
| // collected. |
| initThreadPools(getConf()); |
| initTransferManager(); |
| } |
| |
| /** |
| * Get the region of a bucket. |
| * @return the region in which a bucket is located |
| * @throws AccessDeniedException if the caller lacks permission. |
| * @throws IOException on any failure. |
| */ |
| @Retries.RetryTranslated |
| public String getBucketLocation() throws IOException { |
| return getBucketLocation(bucket); |
| } |
| |
| /** |
| * Get the region of a bucket; fixing up the region so it can be used |
| * in the builders of other AWS clients. |
| * Requires the caller to have the AWS role permission |
| * {@code s3:GetBucketLocation}. |
| * Retry policy: retrying, translated. |
| * @param bucketName the name of the bucket |
| * @return the region in which a bucket is located |
| * @throws AccessDeniedException if the caller lacks permission. |
| * @throws IOException on any failure. |
| */ |
| @VisibleForTesting |
| @Retries.RetryTranslated |
| public String getBucketLocation(String bucketName) throws IOException { |
| final String region = invoker.retry("getBucketLocation()", bucketName, true, |
| () -> s3.getBucketLocation(bucketName)); |
| return fixBucketRegion(region); |
| } |
| |
| /** |
| * Returns the read ahead range value used by this filesystem. |
| * @return the readahead range |
| */ |
| @VisibleForTesting |
| long getReadAheadRange() { |
| return readAhead; |
| } |
| |
| /** |
| * Get the input policy for this FS instance. |
| * @return the input policy |
| */ |
| @InterfaceStability.Unstable |
| public S3AInputPolicy getInputPolicy() { |
| return inputPolicy; |
| } |
| |
| /** |
| * Get the change detection policy for this FS instance. |
| * Only public to allow access in tests in other packages. |
| * @return the change detection policy |
| */ |
| @VisibleForTesting |
| public ChangeDetectionPolicy getChangeDetectionPolicy() { |
| return changeDetectionPolicy; |
| } |
| |
| /** |
| * Get the encryption algorithm of this endpoint. |
| * @return the encryption algorithm. |
| */ |
| public S3AEncryptionMethods getServerSideEncryptionAlgorithm() { |
| return encryptionSecrets.getEncryptionMethod(); |
| } |
| |
| /** |
| * Demand create the directory allocator, then create a temporary file. |
| * This does not mark the file for deletion when a process exits. |
| * {@link LocalDirAllocator#createTmpFileForWrite(String, long, Configuration)}. |
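| * The buffer directory is taken from "fs.s3a.buffer.dir" when set, |
| * falling back to "hadoop.tmp.dir" otherwise. |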
| * @param pathStr prefix for the temporary file |
| * @param size the size of the file that is going to be written |
| * @param conf the Configuration object |
| * @return a unique temporary file |
| * @throws IOException IO problems |
| */ |
| File createTmpFileForWrite(String pathStr, long size, |
| Configuration conf) throws IOException { |
| if (directoryAllocator == null) { |
| synchronized (this) { |
| // re-check under the lock so only one allocator is ever created |
| if (directoryAllocator == null) { |
| String bufferDir = conf.get(BUFFER_DIR) != null |
| ? BUFFER_DIR : HADOOP_TMP_DIR; |
| directoryAllocator = new LocalDirAllocator(bufferDir); |
| } |
| } |
| } |
| Path path = directoryAllocator.getLocalPathForWrite(pathStr, |
| size, conf); |
| File dir = new File(path.getParent().toUri().getPath()); |
| String prefix = path.getName(); |
| // create a temp file on this directory |
| return File.createTempFile(prefix, null, dir); |
| } |
| |
| /** |
| * Get the bucket of this filesystem. |
| * @return the bucket |
| */ |
| public String getBucket() { |
| return bucket; |
| } |
| |
| /** |
| * Set the bucket. |
| * @param bucket the bucket |
| */ |
| @VisibleForTesting |
| protected void setBucket(String bucket) { |
| this.bucket = bucket; |
| } |
| |
| /** |
| * Get the canned ACL of this FS. |
| * @return an ACL, if any |
| */ |
| CannedAccessControlList getCannedACL() { |
| return cannedACL; |
| } |
| |
| /** |
| * Change the input policy for this FS. |
| * @param inputPolicy new policy |
| */ |
| @InterfaceStability.Unstable |
| public void setInputPolicy(S3AInputPolicy inputPolicy) { |
| Objects.requireNonNull(inputPolicy, "Null inputStrategy"); |
| LOG.debug("Setting input strategy: {}", inputPolicy); |
| this.inputPolicy = inputPolicy; |
| } |
| |
| /** |
| * Turns a path (relative or otherwise) into an S3 key. |
| * |
| * @param path input path, may be relative to the working dir |
| * @return a key excluding the leading "/", or, if it is the root path, "" |
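| * For example, {@code s3a://bucket/dir/file} maps to "dir/file", and |
| * the root path {@code s3a://bucket/} maps to "". |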
| */ |
| @VisibleForTesting |
| public String pathToKey(Path path) { |
| if (!path.isAbsolute()) { |
| path = new Path(workingDir, path); |
| } |
| |
| if (path.toUri().getScheme() != null && path.toUri().getPath().isEmpty()) { |
| return ""; |
| } |
| |
| return path.toUri().getPath().substring(1); |
| } |
| |
| /** |
| * Turns an S3 key into a directory key, adding a trailing |
| * "/" if the key is not the root <i>and</i> does not already have a "/" |
| * at the end. |
| * |
| * @param key s3 key or "" |
| * @return the key with a trailing "/", or, if it is the root key, "". |
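| * For example, "data" becomes "data/", while "data/" and "" are |
| * returned unchanged. |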
| */ |
| @InterfaceAudience.Private |
| public String maybeAddTrailingSlash(String key) { |
| return S3AUtils.maybeAddTrailingSlash(key); |
| } |
| |
| /** |
| * Convert a key back to a path. |
| * @param key input key |
| * @return the path from this key |
| */ |
| Path keyToPath(String key) { |
| return new Path("/" + key); |
| } |
| |
| /** |
| * Convert a key to a fully qualified path. |
| * This includes fixing up the URI so that if it ends with a trailing slash, |
| * that is corrected, similar to {@code Path.normalizePath()}. |
| * @param key input key |
| * @return the fully qualified path including URI scheme and bucket name. |
| */ |
| public Path keyToQualifiedPath(String key) { |
| return qualify(keyToPath(key)); |
| } |
| |
| @Override |
| public Path makeQualified(final Path path) { |
| Path q = super.makeQualified(path); |
| if (!q.isRoot()) { |
| String urlString = q.toUri().toString(); |
| if (urlString.endsWith(Path.SEPARATOR)) { |
| // this is a path which needs root stripping off to avoid |
| // confusion, See HADOOP-15430 |
| LOG.debug("Stripping trailing '/' from {}", q); |
| // deal with an empty "/" at the end by mapping to the parent and |
| // creating a new path from it |
| q = new Path(urlString.substring(0, urlString.length() - 1)); |
| } |
| } |
| if (!q.isRoot() && q.getName().isEmpty()) { |
| q = q.getParent(); |
| } |
| return q; |
| } |
| |
| /** |
| * Qualify a path. |
| * This includes fixing up the URI so that if it ends with a trailing slash, |
| * that is corrected, similar to {@code Path.normalizePath()}. |
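| * For example, {@code s3a://bucket/dir/} is qualified to |
| * {@code s3a://bucket/dir}. |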
| * @param path path to qualify |
| * @return a qualified path. |
| */ |
| public Path qualify(Path path) { |
| return makeQualified(path); |
| } |
| |
| /** |
| * Check that a Path belongs to this FileSystem. |
| * Unlike the superclass, this version does not look at authority, |
| * only hostnames. |
| * @param path to check |
| * @throws IllegalArgumentException if there is an FS mismatch |
| */ |
| @Override |
| public void checkPath(Path path) { |
| S3xLoginHelper.checkPath(getConf(), getUri(), path, getDefaultPort()); |
| } |
| |
| /** |
| * Override the base canonicalization logic and relay to |
| * {@link S3xLoginHelper#canonicalizeUri(URI, int)}. |
| * This allows for the option of changing this logic for better DT handling. |
| * @param rawUri raw URI. |
| * @return the canonical URI to use in delegation tokens and file context. |
| */ |
| @Override |
| protected URI canonicalizeUri(URI rawUri) { |
| return S3xLoginHelper.canonicalizeUri(rawUri, getDefaultPort()); |
| } |
| |
| /** |
| * Opens an FSDataInputStream at the indicated Path. |
| * @param f the file name to open |
| * @param bufferSize the size of the buffer to be used. |
| */ |
| @Retries.RetryTranslated |
| public FSDataInputStream open(Path f, int bufferSize) |
| throws IOException { |
| return open(f, Optional.empty(), Optional.empty()); |
| } |
| |
| /** |
| * Opens an FSDataInputStream at the indicated Path. |
| * If providedStatus contains an S3AFileStatus reference, it is used, |
| * so a HEAD request to the store is avoided. |
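| * <p> |
| * A sketch of how these options arrive through the public builder API |
| * (option names assumed to be those defined in {@code Constants}; |
| * exception handling elided): |
| * <pre>{@code |
| * FSDataInputStream in = fs.openFile(path) |
| * .opt("fs.s3a.experimental.input.fadvise", "random") |
| * .opt("fs.s3a.readahead.range", 524288) |
| * .build() |
| * .get(); |
| * }</pre> |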
| * |
| * @param file the file to open |
| * @param options configuration options if opened with the builder API. |
| * @param providedStatus optional file status. |
| * @throws IOException IO failure. |
| */ |
| @Retries.RetryTranslated |
| private FSDataInputStream open( |
| final Path file, |
| final Optional<Configuration> options, |
| final Optional<S3AFileStatus> providedStatus) |
| throws IOException { |
| |
| entryPoint(INVOCATION_OPEN); |
| final Path path = qualify(file); |
| S3AFileStatus fileStatus = extractOrFetchSimpleFileStatus(path, |
| providedStatus); |
| |
| S3AReadOpContext readContext; |
| if (options.isPresent()) { |
| Configuration o = options.get(); |
| // normal path. Open the file with the chosen seek policy, if different |
| // from the normal one. |
| // and readahead. |
| S3AInputPolicy policy = S3AInputPolicy.getPolicy( |
| o.get(INPUT_FADVISE, inputPolicy.toString())); |
| long readAheadRange2 = o.getLong(READAHEAD_RANGE, readAhead); |
| // TODO support change detection policy from options? |
| readContext = createReadContext( |
| fileStatus, |
| policy, |
| changeDetectionPolicy, |
| readAheadRange2); |
| } else { |
| readContext = createReadContext( |
| fileStatus, |
| inputPolicy, |
| changeDetectionPolicy, |
| readAhead); |
| } |
| LOG.debug("Opening '{}'", readContext); |
| |
| return new FSDataInputStream( |
| new S3AInputStream( |
| readContext, |
| createObjectAttributes(fileStatus), |
| s3)); |
| } |
| |
| /** |
| * Create the read context for reading from the referenced file, |
| * using FS state as well as the status. |
| * @param fileStatus file status. |
| * @param seekPolicy input policy for this operation |
| * @param changePolicy change detection policy for this operation |
| * @param readAheadRange readahead value. |
| * @return a context for read and select operations. |
| */ |
| private S3AReadOpContext createReadContext( |
| final FileStatus fileStatus, |
| final S3AInputPolicy seekPolicy, |
| final ChangeDetectionPolicy changePolicy, |
| final long readAheadRange) { |
| return new S3AReadOpContext(fileStatus.getPath(), |
| hasMetadataStore(), |
| invoker, |
| s3guardInvoker, |
| statistics, |
| instrumentation, |
| fileStatus, |
| seekPolicy, |
| changePolicy, |
| readAheadRange); |
| } |
| |
| /** |
| * Create the attributes of an object for subsequent use. |
| * @param f path of the request. |
| * @param eTag the eTag of the S3 object |
| * @param versionId S3 object version ID |
| * @param len length of the file |
| * @return attributes to use when building the query. |
| */ |
| private S3ObjectAttributes createObjectAttributes( |
| final Path f, |
| final String eTag, |
| final String versionId, |
| final long len) { |
| return new S3ObjectAttributes(bucket, |
| f, |
| pathToKey(f), |
| getServerSideEncryptionAlgorithm(), |
| encryptionSecrets.getEncryptionKey(), |
| eTag, |
| versionId, |
| len); |
| } |
| |
| /** |
| * Create the attributes of an object for subsequent use. |
| * @param fileStatus file status to build from. |
| * @return attributes to use when building the query. |
| */ |
| private S3ObjectAttributes createObjectAttributes( |
| final S3AFileStatus fileStatus) { |
| return createObjectAttributes( |
| fileStatus.getPath(), |
| fileStatus.getETag(), |
| fileStatus.getVersionId(), |
| fileStatus.getLen()); |
| } |
| |
| /** |
| * Create an FSDataOutputStream at the indicated Path with write-progress |
| * reporting. |
| * Retry policy: retrying, translated on the getFileStatus() probe. |
| * No data is uploaded to S3 in this call, so the retry logic only covers |
| * the existence probes. |
| * @param f the file name to open |
| * @param permission the permission to set. |
| * @param overwrite if a file with this name already exists, then if true, |
| * the file will be overwritten, and if false an error will be thrown. |
| * @param bufferSize the size of the buffer to be used. |
| * @param replication required block replication for the file. |
| * @param blockSize the requested block size. |
| * @param progress the progress reporter. |
| * @throws IOException in the event of IO related errors. |
| * @see #setPermission(Path, FsPermission) |
| */ |
| @Override |
| @SuppressWarnings("IOResourceOpenedButNotSafelyClosed") |
| public FSDataOutputStream create(Path f, FsPermission permission, |
| boolean overwrite, int bufferSize, short replication, long blockSize, |
| Progressable progress) throws IOException { |
| entryPoint(INVOCATION_CREATE); |
| final Path path = qualify(f); |
| String key = pathToKey(path); |
| FileStatus status = null; |
| try { |
| // get the status or throw an FNFE. |
| // when overwriting, there is no need to look for any existing file, |
| // and attempting to do so can poison the load balancers with 404 |
| // entries. |
| status = innerGetFileStatus(path, false, |
| overwrite |
| ? StatusProbeEnum.DIRECTORIES |
| : StatusProbeEnum.ALL); |
| |
| // if the thread reaches here, there is something at the path |
| if (status.isDirectory()) { |
| // path references a directory: automatic error |
| throw new FileAlreadyExistsException(path + " is a directory"); |
| } |
| if (!overwrite) { |
| // path references a file and overwrite is disabled |
| throw new FileAlreadyExistsException(path + " already exists"); |
| } |
| LOG.debug("Overwriting file {}", path); |
| } catch (FileNotFoundException e) { |
| // the file was not found, so creation can proceed |
| } |
| instrumentation.fileCreated(); |
| PutTracker putTracker = |
| committerIntegration.createTracker(path, key); |
| String destKey = putTracker.getDestKey(); |
| return new FSDataOutputStream( |
| new S3ABlockOutputStream(this, |
| destKey, |
| new SemaphoredDelegatingExecutor(boundedThreadPool, |
| blockOutputActiveBlocks, true), |
| progress, |
| partSize, |
| blockFactory, |
| instrumentation.newOutputStreamStatistics(statistics), |
| getWriteOperationHelper(), |
| putTracker), |
| null); |
| } |
| |
| /** |
| * Get a {@code WriteOperationHelper} instance. |
| * |
| * This class permits other low-level operations against the store. |
| * It is unstable and |
| * only intended for code with intimate knowledge of the object store. |
| * If using this, be prepared for changes even on minor point releases. |
| * @return a new helper. |
| */ |
| @InterfaceAudience.Private |
| public WriteOperationHelper getWriteOperationHelper() { |
| return writeHelper; |
| } |
| |
| /** |
| * {@inheritDoc} |
| * @throws FileNotFoundException if the parent directory is not present -or |
| * is not a directory. |
| */ |
| @Override |
| public FSDataOutputStream createNonRecursive(Path p, |
| FsPermission permission, |
| EnumSet<CreateFlag> flags, |
| int bufferSize, |
| short replication, |
| long blockSize, |
| Progressable progress) throws IOException { |
| entryPoint(INVOCATION_CREATE_NON_RECURSIVE); |
| final Path path = makeQualified(p); |
| Path parent = path.getParent(); |
| // expect this to raise an exception if there is no parent dir |
| if (parent != null && !parent.isRoot()) { |
| S3AFileStatus status; |
| try { |
| // optimize for the directory existing: Call list first |
| status = innerGetFileStatus(parent, false, |
| StatusProbeEnum.DIRECTORIES); |
| } catch (FileNotFoundException e) { |
| // no dir, fall back to looking for a file |
| // (failure condition if true) |
| status = innerGetFileStatus(parent, false, |
| StatusProbeEnum.HEAD_ONLY); |
| } |
| if (!status.isDirectory()) { |
| throw new FileAlreadyExistsException("Not a directory: " + parent); |
| } |
| } |
| return create(path, permission, |
| flags.contains(CreateFlag.OVERWRITE), bufferSize, |
| replication, blockSize, progress); |
| } |
| |
| /** |
| * Append to an existing file (optional operation). |
| * @param f the existing file to be appended. |
| * @param bufferSize the size of the buffer to be used. |
| * @param progress for reporting progress if it is not null. |
| * @throws UnsupportedOperationException always: append is not supported. |
| */ |
| public FSDataOutputStream append(Path f, int bufferSize, |
| Progressable progress) throws IOException { |
| throw new UnsupportedOperationException("Append is not supported " |
| + "by S3AFileSystem"); |
| } |
| |
| |
| /** |
| * Renames Path src to Path dst. Can take place on local fs |
| * or remote DFS. |
| * |
| * Warning: S3 does not support renames. This method does a copy which can |
| * take S3 some time to execute with large files and directories. Since |
| * there is no Progressable passed in, this can time out jobs. |
| * |
| * Note: This implementation differs from other S3 drivers. Specifically: |
| * <pre> |
| * Fails if src is a file and dst is a directory. |
| * Fails if src is a directory and dst is a file. |
| * Fails if the parent of dst does not exist or is a file. |
| * Fails if dst is a directory that is not empty. |
| * </pre> |
| * |
| * @param src path to be renamed |
| * @param dst new path after rename |
| * @throws IOException on IO failure |
| * @return true if rename is successful |
| */ |
| @Retries.RetryTranslated |
| public boolean rename(Path src, Path dst) throws IOException { |
| try (DurationInfo ignored = new DurationInfo(LOG, false, |
| "rename(%s, %s", src, dst)) { |
| long bytesCopied = innerRename(src, dst); |
| LOG.debug("Copied {} bytes", bytesCopied); |
| return true; |
| } catch (AmazonClientException e) { |
| throw translateException("rename(" + src + ", " + dst + ")", src, e); |
| } catch (RenameFailedException e) { |
| LOG.info("{}", e.getMessage()); |
| LOG.debug("rename failure", e); |
| return e.getExitCode(); |
| } catch (FileNotFoundException e) { |
| LOG.debug(e.toString()); |
| return false; |
| } |
| } |
| |
| /** |
| * Validate the rename parameters and status of the filesystem; |
| * returns the source and any destination File Status. |
| * @param src qualified path to be renamed |
| * @param dst qualified path after rename |
| * @return the source and (possibly null) destination status entries. |
| * @throws RenameFailedException if some criteria for a state-changing |
| * rename were not met. This means work didn't happen; it's not something |
| * which is reported upstream to the FileSystem APIs, for which the semantics |
| * of "false" are pretty vague. |
| * @throws FileNotFoundException there's no source file. |
| * @throws IOException on IO failure. |
| */ |
| @Retries.RetryTranslated |
| private Pair<S3AFileStatus, S3AFileStatus> initiateRename( |
| final Path src, |
| final Path dst) throws IOException { |
| String srcKey = pathToKey(src); |
| String dstKey = pathToKey(dst); |
| |
| if (srcKey.isEmpty()) { |
| throw new RenameFailedException(src, dst, "source is root directory"); |
| } |
| if (dstKey.isEmpty()) { |
| throw new RenameFailedException(src, dst, "dest is root directory"); |
| } |
| |
| // get the source file status; this raises a FNFE if there is no source |
| // file. |
| S3AFileStatus srcStatus = innerGetFileStatus(src, true, |
| StatusProbeEnum.ALL); |
| |
| if (srcKey.equals(dstKey)) { |
| LOG.debug("rename: src and dest refer to the same file or directory: {}", |
| dst); |
| throw new RenameFailedException(src, dst, |
| "source and dest refer to the same file or directory") |
| .withExitCode(srcStatus.isFile()); |
| } |
| |
| S3AFileStatus dstStatus = null; |
| try { |
| dstStatus = innerGetFileStatus(dst, true, StatusProbeEnum.ALL); |
| // if there is no destination entry, an exception is raised. |
| // hence this code sequence can assume that there is something |
| // at the end of the path; the only detail being what it is and |
| // whether or not it can be the destination of the rename. |
| if (srcStatus.isDirectory()) { |
| if (dstStatus.isFile()) { |
| throw new RenameFailedException(src, dst, |
| "source is a directory and dest is a file") |
| .withExitCode(srcStatus.isFile()); |
| } else if (dstStatus.isEmptyDirectory() != Tristate.TRUE) { |
| throw new RenameFailedException(src, dst, |
| "Destination is a non-empty directory") |
| .withExitCode(false); |
| } |
| // at this point the destination is an empty directory |
| } else { |
| // source is a file. The destination must be a directory, |
| // empty or not |
| if (dstStatus.isFile()) { |
| throw new RenameFailedException(src, dst, |
| "Cannot rename onto an existing file") |
| .withExitCode(false); |
| } |
| } |
| |
| } catch (FileNotFoundException e) { |
| LOG.debug("rename: destination path {} not found", dst); |
| // Parent must exist |
| Path parent = dst.getParent(); |
| if (!pathToKey(parent).isEmpty() |
| && !parent.equals(src.getParent())) { |
| try { |
| // only look against S3 for directories; saves |
| // a HEAD request on all normal operations. |
| S3AFileStatus dstParentStatus = innerGetFileStatus(parent, |
| false, StatusProbeEnum.DIRECTORIES); |
| if (!dstParentStatus.isDirectory()) { |
| throw new RenameFailedException(src, dst, |
| "destination parent is not a directory"); |
| } |
| } catch (FileNotFoundException e2) { |
| throw new RenameFailedException(src, dst, |
| "destination has no parent "); |
| } |
| } |
| } |
| return Pair.of(srcStatus, dstStatus); |
| } |
| |
| /** |
| * The inner rename operation. See {@link #rename(Path, Path)} for |
| * the description of the operation. |
| * This operation throws an exception on any failure which needs to be |
| * reported and downgraded to a failure. |
| * Retries: retry translated, assuming all operations it calls do |
| * so. For safety, consider catching and handling AmazonClientException, |
| * because this is such a complex method that there is a risk one could |
| * surface untranslated. |
| * @param source path to be renamed |
| * @param dest new path after rename |
| * @throws RenameFailedException if some criteria for a state-changing |
| * rename were not met. This means work didn't happen; it's not something |
| * which is reported upstream to the FileSystem APIs, for which the semantics |
| * of "false" are pretty vague. |
| * @return the number of bytes copied. |
| * @throws FileNotFoundException there's no source file. |
| * @throws IOException on IO failure. |
| * @throws AmazonClientException on failures inside the AWS SDK |
| */ |
| @Retries.RetryMixed |
| private long innerRename(Path source, Path dest) |
| throws RenameFailedException, FileNotFoundException, IOException, |
| AmazonClientException { |
| Path src = qualify(source); |
| Path dst = qualify(dest); |
| |
| LOG.debug("Rename path {} to {}", src, dst); |
| entryPoint(INVOCATION_RENAME); |
| |
| String srcKey = pathToKey(src); |
| String dstKey = pathToKey(dst); |
| |
| Pair<S3AFileStatus, S3AFileStatus> p = initiateRename(src, dst); |
| |
| // Initiate the rename. |
| // this will call back into this class via the rename callbacks |
| // and interact directly with any metastore. |
| RenameOperation renameOperation = new RenameOperation( |
| createStoreContext(), |
| src, srcKey, p.getLeft(), |
| dst, dstKey, p.getRight(), |
| operationCallbacks, |
| pageSize); |
| return renameOperation.execute(); |
| } |
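| 
| /* A sketch of the downgrade the public rename(Path, Path) is expected
| * to perform around innerRename(): RenameFailedException carries the
| * boolean outcome for the loose FileSystem.rename() contract, while SDK
| * failures are translated into IOExceptions. Illustrative only; the
| * authoritative handling lives in rename(Path, Path) itself.
| *
| *   try {
| *     innerRename(src, dst);
| *     return true;
| *   } catch (RenameFailedException e) {
| *     LOG.debug(e.getMessage());
| *     return e.getExitCode();
| *   } catch (AmazonClientException e) {
| *     throw translateException("rename(" + src + ", " + dst + ")", src, e);
| *   }
| */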
| |
| @Override
| public Token<? extends TokenIdentifier> getFsDelegationToken()
| throws IOException { |
| return getDelegationToken(null); |
| } |
| |
| /** |
| * The callbacks made by the rename and delete operations. |
| * This separation allows the operation to be factored out and |
| * still avoid knowledge of the S3AFileSystem implementation.
| */ |
| private class OperationCallbacksImpl implements OperationCallbacks { |
| |
| @Override |
| public S3ObjectAttributes createObjectAttributes(final Path path, |
| final String eTag, |
| final String versionId, |
| final long len) { |
| return S3AFileSystem.this.createObjectAttributes(path, eTag, versionId, |
| len); |
| } |
| |
| @Override |
| public S3ObjectAttributes createObjectAttributes( |
| final S3AFileStatus fileStatus) { |
| return S3AFileSystem.this.createObjectAttributes(fileStatus); |
| } |
| |
| @Override |
| public S3AReadOpContext createReadContext(final FileStatus fileStatus) { |
| return S3AFileSystem.this.createReadContext(fileStatus, |
| inputPolicy, |
| changeDetectionPolicy, readAhead); |
| } |
| |
| @Override |
| @Retries.RetryTranslated |
| public void deleteObjectAtPath(final Path path, |
| final String key, |
| final boolean isFile, |
| final BulkOperationState operationState) |
| throws IOException { |
| once("delete", path.toString(), () -> |
| S3AFileSystem.this.deleteObjectAtPath(path, key, isFile, |
| operationState)); |
| } |
| |
| @Override |
| @Retries.RetryTranslated |
| public RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories( |
| final Path path, |
| final S3AFileStatus status, |
| final boolean collectTombstones, |
| final boolean includeSelf) throws IOException { |
| return innerListFiles( |
| path, |
| true, |
| includeSelf |
| ? Listing.ACCEPT_ALL_BUT_S3N |
| : new Listing.AcceptAllButSelfAndS3nDirs(path), |
| status, |
| collectTombstones, |
| true); |
| } |
| |
| @Override |
| public CopyResult copyFile(final String srcKey, |
| final String destKey, |
| final S3ObjectAttributes srcAttributes, |
| final S3AReadOpContext readContext) throws IOException { |
| return S3AFileSystem.this.copyFile(srcKey, destKey, |
| srcAttributes.getLen(), srcAttributes, readContext); |
| } |
| |
| @Override |
| public DeleteObjectsResult removeKeys( |
| final List<DeleteObjectsRequest.KeyVersion> keysToDelete, |
| final boolean deleteFakeDir, |
| final List<Path> undeletedObjectsOnFailure, |
| final BulkOperationState operationState, |
| final boolean quiet) |
| throws MultiObjectDeleteException, AmazonClientException, IOException { |
| return S3AFileSystem.this.removeKeys(keysToDelete, deleteFakeDir, |
| undeletedObjectsOnFailure, operationState, quiet); |
| } |
| |
| @Override |
| public void finishRename(final Path sourceRenamed, final Path destCreated) |
| throws IOException { |
| Path destParent = destCreated.getParent(); |
| if (!sourceRenamed.getParent().equals(destParent)) { |
| LOG.debug("source & dest parents are different; fix up dir markers"); |
| if (!keepDirectoryMarkers(destParent)) { |
| deleteUnnecessaryFakeDirectories(destParent, null); |
| } |
| maybeCreateFakeParentDirectory(sourceRenamed); |
| } |
| } |
| |
| @Override |
| public boolean allowAuthoritative(final Path p) { |
| return S3AFileSystem.this.allowAuthoritative(p); |
| } |
| |
| @Override |
| @Retries.RetryTranslated |
| public RemoteIterator<S3AFileStatus> listObjects( |
| final Path path, |
| final String key) |
| throws IOException { |
| return once("listObjects", key, () -> |
| listing.createFileStatusListingIterator(path, |
| createListObjectsRequest(key, null), |
| ACCEPT_ALL, |
| Listing.ACCEPT_ALL_BUT_S3N, |
| null)); |
| } |
| } |
| |
| protected class ListingOperationCallbacksImpl implements |
| ListingOperationCallbacks { |
| |
| @Override |
| @Retries.RetryRaw |
| public CompletableFuture<S3ListResult> listObjectsAsync( |
| S3ListRequest request) |
| throws IOException { |
| return submit(unboundedThreadPool, |
| () -> listObjects(request)); |
| } |
| |
| @Override |
| @Retries.RetryRaw |
| public CompletableFuture<S3ListResult> continueListObjectsAsync( |
| S3ListRequest request, |
| S3ListResult prevResult) |
| throws IOException { |
| return submit(unboundedThreadPool, |
| () -> continueListObjects(request, prevResult)); |
| } |
| |
| @Override |
| public S3ALocatedFileStatus toLocatedFileStatus( |
| S3AFileStatus status) |
| throws IOException { |
| return S3AFileSystem.this.toLocatedFileStatus(status); |
| } |
| |
| @Override |
| public S3ListRequest createListObjectsRequest( |
| String key, |
| String delimiter) { |
| return S3AFileSystem.this.createListObjectsRequest(key, delimiter); |
| } |
| |
| @Override |
| public long getDefaultBlockSize(Path path) { |
| return S3AFileSystem.this.getDefaultBlockSize(path); |
| } |
| |
| @Override |
| public int getMaxKeys() { |
| return S3AFileSystem.this.getMaxKeys(); |
| } |
| |
| @Override |
| public ITtlTimeProvider getUpdatedTtlTimeProvider() { |
| return S3AFileSystem.this.ttlTimeProvider; |
| } |
| |
| @Override |
| public boolean allowAuthoritative(final Path p) { |
| return S3AFileSystem.this.allowAuthoritative(p); |
| } |
| } |
| |
| /** |
| * Low-level call to get at the object metadata. |
| * @param path path to the object |
| * @return metadata |
| * @throws IOException IO and object access problems. |
| */ |
| @VisibleForTesting |
| @Retries.RetryTranslated |
| public ObjectMetadata getObjectMetadata(Path path) throws IOException { |
| return getObjectMetadata(path, null, invoker, null); |
| } |
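| 
| /* A minimal usage sketch, assuming an initialized S3AFileSystem "fs"
| * and an illustrative object path; the returned ObjectMetadata exposes
| * the raw HEAD fields:
| *
| *   ObjectMetadata md = fs.getObjectMetadata(
| *       new Path("s3a://bucket/data/file.csv"));
| *   long length = md.getContentLength();
| *   String etag = md.getETag();
| *   Date modified = md.getLastModified();
| */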
| |
| /** |
| * Low-level call to get at the object metadata. |
| * @param path path to the object |
| * @param changeTracker the change tracker to detect version inconsistencies |
| * @param changeInvoker the invoker providing the retry policy |
| * @param operation the operation being performed (e.g. "read" or "copy") |
| * @return metadata |
| * @throws IOException IO and object access problems. |
| */ |
| @VisibleForTesting |
| @Retries.RetryTranslated |
| public ObjectMetadata getObjectMetadata(Path path, |
| ChangeTracker changeTracker, Invoker changeInvoker, String operation) |
| throws IOException { |
| checkNotClosed(); |
| return once("getObjectMetadata", path.toString(), |
| () -> |
| // this always does a full HEAD to the object |
| getObjectMetadata( |
| pathToKey(path), changeTracker, changeInvoker, operation)); |
| } |
| |
| /** |
| * Get all the headers of the object at a path, if the object exists.
| * @param path path to probe |
| * @return an immutable map of object headers. |
| * @throws IOException failure of the query |
| */ |
| @Retries.RetryTranslated |
| public Map<String, Object> getObjectHeaders(Path path) throws IOException { |
| LOG.debug("getObjectHeaders({})", path); |
| checkNotClosed(); |
| incrementReadOperations(); |
| return getObjectMetadata(path).getRawMetadata(); |
| } |
| |
| /** |
| * Does this Filesystem have a metadata store? |
| * @return true iff the FS has been instantiated with a metadata store |
| */ |
| public boolean hasMetadataStore() { |
| return !S3Guard.isNullMetadataStore(metadataStore); |
| } |
| |
| /** |
| * Does the filesystem have an authoritative metadata store? |
| * @return true if there is a metadata store and the authoritative flag |
| * is set for this filesystem. |
| */ |
| @VisibleForTesting |
| public boolean hasAuthoritativeMetadataStore() { |
| return hasMetadataStore() && allowAuthoritativeMetadataStore; |
| } |
| |
| /** |
| * Get the metadata store. |
| * This will always be non-null, but may be bound to the |
| * {@code NullMetadataStore}. |
| * @return the metadata store of this FS instance |
| */ |
| @VisibleForTesting |
| public MetadataStore getMetadataStore() { |
| return metadataStore; |
| } |
| |
| /** For testing only. See ITestS3GuardEmptyDirs. */ |
| @VisibleForTesting |
| void setMetadataStore(MetadataStore ms) { |
| Preconditions.checkNotNull(ms); |
| metadataStore = ms; |
| } |
| |
| /** |
| * Entry point to an operation. |
| * Increments the statistic; verifies the FS is active. |
| * @param operation The operation to increment |
| * @throws IOException if the filesystem is closed.
| */ |
| protected void entryPoint(Statistic operation) throws IOException { |
| checkNotClosed(); |
| incrementStatistic(operation); |
| } |
| |
| /** |
| * Increment a statistic by 1. |
| * This increments both the instrumentation and storage statistics. |
| * @param statistic The operation to increment |
| */ |
| protected void incrementStatistic(Statistic statistic) { |
| incrementStatistic(statistic, 1); |
| } |
| |
| /** |
| * Increment a statistic by a specific value. |
| * This increments both the instrumentation and storage statistics. |
| * @param statistic The operation to increment |
| * @param count the count to increment |
| */ |
| protected void incrementStatistic(Statistic statistic, long count) { |
| instrumentation.incrementCounter(statistic, count); |
| storageStatistics.incrementCounter(statistic, count); |
| } |
| |
| /** |
| * Decrement a gauge by a specific value. |
| * @param statistic The operation to decrement |
| * @param count the count to decrement |
| */ |
| protected void decrementGauge(Statistic statistic, long count) { |
| instrumentation.decrementGauge(statistic, count); |
| } |
| |
| /** |
| * Increment a gauge by a specific value. |
| * @param statistic The operation to increment |
| * @param count the count to increment |
| */ |
| protected void incrementGauge(Statistic statistic, long count) { |
| instrumentation.incrementGauge(statistic, count); |
| } |
| |
| /** |
| * Callback when an operation was retried. |
| * Increments the statistics of ignored errors or throttled requests, |
| * depending up on the exception class. |
| * @param ex exception. |
| */ |
| public void operationRetried(Exception ex) { |
| if (isThrottleException(ex)) { |
| operationThrottled(false); |
| } else { |
| incrementStatistic(IGNORED_ERRORS); |
| } |
| } |
| |
| /** |
| * Callback from {@link Invoker} when an operation is retried. |
| * @param text text of the operation |
| * @param ex exception |
| * @param retries number of retries |
| * @param idempotent is the method idempotent |
| */ |
| public void operationRetried( |
| String text, |
| Exception ex, |
| int retries, |
| boolean idempotent) { |
| operationRetried(ex); |
| } |
| |
| /** |
| * Callback from {@link Invoker} when an operation against a metastore |
| * is retried. |
| * Always increments the {@link Statistic#S3GUARD_METADATASTORE_RETRY} |
| * statistic/counter; |
| * if it is a throttling exception will update the associated |
| * throttled metrics/statistics. |
| * |
| * @param ex exception |
| * @param retries number of retries |
| * @param idempotent is the method idempotent |
| */ |
| public void metastoreOperationRetried(Exception ex, |
| int retries, |
| boolean idempotent) { |
| incrementStatistic(S3GUARD_METADATASTORE_RETRY); |
| if (isThrottleException(ex)) { |
| operationThrottled(true); |
| } else { |
| incrementStatistic(IGNORED_ERRORS); |
| } |
| } |
| |
| /** |
| * Note that an operation was throttled; this will update
| * specific counters/metrics. |
| * @param metastore was the throttling observed in the S3Guard metastore? |
| */ |
| private void operationThrottled(boolean metastore) { |
| LOG.debug("Request throttled on {}", metastore ? "S3": "DynamoDB"); |
| if (metastore) { |
| incrementStatistic(S3GUARD_METADATASTORE_THROTTLED); |
| instrumentation.addValueToQuantiles(S3GUARD_METADATASTORE_THROTTLE_RATE, |
| 1); |
| } else { |
| incrementStatistic(STORE_IO_THROTTLED); |
| instrumentation.addValueToQuantiles(STORE_IO_THROTTLE_RATE, 1); |
| } |
| } |
| |
| /** |
| * Get the storage statistics of this filesystem. |
| * @return the storage statistics |
| */ |
| @Override |
| public S3AStorageStatistics getStorageStatistics() { |
| return storageStatistics; |
| } |
| |
| /** |
| * Request object metadata; increments counters in the process. |
| * Retry policy: retry untranslated. |
| * @param key key |
| * @return the metadata |
| * @throws IOException if the retry invocation raises one (it shouldn't). |
| */ |
| @Retries.RetryRaw |
| @VisibleForTesting |
| ObjectMetadata getObjectMetadata(String key) throws IOException { |
| return getObjectMetadata(key, null, invoker, null);
| } |
| |
| /** |
| * Request object metadata; increments counters in the process. |
| * Retry policy: retry untranslated. |
| * Uses changeTracker to detect an unexpected file version (eTag or versionId) |
| * @param key key |
| * @param changeTracker the change tracker to detect unexpected object version |
| * @param changeInvoker the invoker providing the retry policy |
| * @param operation the operation (e.g. "read" or "copy") triggering this call |
| * @return the metadata |
| * @throws IOException if the retry invocation raises one (it shouldn't). |
| * @throws RemoteFileChangedException if an unexpected version is detected |
| */ |
| @Retries.RetryRaw |
| protected ObjectMetadata getObjectMetadata(String key, |
| ChangeTracker changeTracker, |
| Invoker changeInvoker, |
| String operation) throws IOException { |
| GetObjectMetadataRequest request = |
| new GetObjectMetadataRequest(bucket, key); |
| // if SSE-C is enabled, the customer key must be set on the request
| // for the object metadata HEAD to succeed
| generateSSECustomerKey().ifPresent(request::setSSECustomerKey); |
| ObjectMetadata meta = changeInvoker.retryUntranslated("GET " + key, true, |
| () -> { |
| incrementStatistic(OBJECT_METADATA_REQUESTS); |
| LOG.debug("HEAD {} with change tracker {}", key, changeTracker); |
| if (changeTracker != null) { |
| changeTracker.maybeApplyConstraint(request); |
| } |
| ObjectMetadata objectMetadata = s3.getObjectMetadata(request); |
| if (changeTracker != null) { |
| changeTracker.processMetadata(objectMetadata, operation); |
| } |
| return objectMetadata; |
| }); |
| incrementReadOperations(); |
| return meta; |
| } |
| |
| /** |
| * Initiate a {@code listObjects} operation, incrementing metrics |
| * in the process. |
| * |
| * Retry policy: retry untranslated. |
| * @param request request to initiate |
| * @return the results |
| * @throws IOException if the retry invocation raises one (it shouldn't). |
| */ |
| @Retries.RetryRaw |
| protected S3ListResult listObjects(S3ListRequest request) throws IOException { |
| incrementReadOperations(); |
| incrementStatistic(OBJECT_LIST_REQUESTS); |
| LOG.debug("LIST {}", request); |
| validateListArguments(request); |
| try (DurationInfo ignored =
| new DurationInfo(LOG, false, "LIST")) { |
| return invoker.retryUntranslated( |
| request.toString(), |
| true, |
| () -> { |
| if (useListV1) { |
| return S3ListResult.v1(s3.listObjects(request.getV1())); |
| } else { |
| return S3ListResult.v2(s3.listObjectsV2(request.getV2())); |
| } |
| }); |
| } |
| } |
| |
| /** |
| * Validate the list arguments with this bucket's settings. |
| * @param request the request to validate |
| */ |
| private void validateListArguments(S3ListRequest request) { |
| if (useListV1) { |
| Preconditions.checkArgument(request.isV1()); |
| } else { |
| Preconditions.checkArgument(!request.isV1()); |
| } |
| } |
| |
| /** |
| * List the next set of objects. |
| * Retry policy: retry untranslated. |
| * @param request last list objects request to continue |
| * @param prevResult last paged result to continue from |
| * @return the next result object |
| * @throws IOException none, just there for retryUntranslated. |
| */ |
| @Retries.RetryRaw |
| protected S3ListResult continueListObjects(S3ListRequest request, |
| S3ListResult prevResult) throws IOException { |
| incrementReadOperations(); |
| validateListArguments(request); |
| try (DurationInfo ignored =
| new DurationInfo(LOG, false, "LIST (continued)")) { |
| return invoker.retryUntranslated( |
| request.toString(), |
| true, |
| () -> { |
| incrementStatistic(OBJECT_CONTINUE_LIST_REQUESTS); |
| if (useListV1) { |
| return S3ListResult.v1( |
| s3.listNextBatchOfObjects(prevResult.getV1())); |
| } else { |
| request.getV2().setContinuationToken(prevResult.getV2() |
| .getNextContinuationToken()); |
| return S3ListResult.v2(s3.listObjectsV2(request.getV2())); |
| } |
| }); |
| } |
| } |
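| 
| /* A sketch of paging through a prefix with listObjects() and
| * continueListObjects(), assuming the result type exposes isTruncated()
| * as the underlying AWS listings do; the prefix, delimiter and the
| * process() consumer are illustrative:
| *
| *   S3ListRequest request = createListObjectsRequest("logs/", "/");
| *   S3ListResult result = listObjects(request);
| *   process(result);
| *   while (result.isTruncated()) {
| *     result = continueListObjects(request, result);
| *     process(result);
| *   }
| */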
| |
| /** |
| * Increment read operations. |
| */ |
| public void incrementReadOperations() { |
| statistics.incrementReadOps(1); |
| } |
| |
| /** |
| * Increment the write operation counter. |
| * This is somewhat inaccurate, as it appears to be invoked more |
| * often than needed in progress callbacks. |
| */ |
| public void incrementWriteOperations() { |
| statistics.incrementWriteOps(1); |
| } |
| |
| /** |
| * Delete an object. This is the low-level internal call which |
| * <i>does not</i> update the metastore. |
| * Increments the {@code OBJECT_DELETE_REQUESTS} and write |
| * operation statistics. |
| * This call does <i>not</i> create any mock parent entries. |
| * |
| * Retry policy: retry untranslated; delete considered idempotent. |
| * @param key key to blob to delete. |
| * @throws AmazonClientException problems working with S3 |
| * @throws InvalidRequestException if the request was rejected due to |
| * a mistaken attempt to delete the root directory. |
| */ |
| @VisibleForTesting |
| @Retries.RetryRaw |
| protected void deleteObject(String key) |
| throws AmazonClientException, IOException { |
| blockRootDelete(key); |
| incrementWriteOperations(); |
| try (DurationInfo ignored = |
| new DurationInfo(LOG, false, |
| "deleting %s", key)) { |
| invoker.retryUntranslated(String.format("Delete %s:/%s", bucket, key), |
| DELETE_CONSIDERED_IDEMPOTENT, |
| () -> {
| incrementStatistic(OBJECT_DELETE_REQUESTS); |
| s3.deleteObject(bucket, key); |
| return null; |
| }); |
| } |
| } |
| |
| /** |
| * Delete an object, also updating the metastore. |
| * This call does <i>not</i> create any mock parent entries. |
| * Retry policy: retry mixed; the S3 delete is retried untranslated
| * (and considered idempotent), the metastore update is retried translated.
| * @param f path to delete
| * @param key key of entry |
| * @param isFile is the path a file (used for instrumentation only) |
| * @param operationState (nullable) operational state for a bulk update |
| * @throws AmazonClientException problems working with S3 |
| * @throws IOException IO failure in the metastore |
| */ |
| @Retries.RetryMixed |
| void deleteObjectAtPath(Path f, |
| String key, |
| boolean isFile, |
| @Nullable final BulkOperationState operationState) |
| throws AmazonClientException, IOException { |
| if (isFile) { |
| instrumentation.fileDeleted(1); |
| } else { |
| instrumentation.directoryDeleted(); |
| } |
| deleteObject(key); |
| metadataStore.delete(f, operationState); |
| } |
| |
| /** |
| * Reject any request to delete an object where the key is root. |
| * @param key key to validate |
| * @throws InvalidRequestException if the request was rejected due to |
| * a mistaken attempt to delete the root directory. |
| */ |
| private void blockRootDelete(String key) throws InvalidRequestException { |
| if (key.isEmpty() || "/".equals(key)) { |
| throw new InvalidRequestException("Bucket "+ bucket |
| +" cannot be deleted"); |
| } |
| } |
| |
| /** |
| * Perform a bulk object delete operation. |
| * Increments the {@code OBJECT_DELETE_REQUESTS} and write |
| * operation statistics. |
| * Retry policy: retry untranslated; delete considered idempotent. |
| * If the request is throttled, this is logged in the throttle statistics, |
| * with the counter set to the number of keys, rather than the number |
| * of invocations of the delete operation. |
| * This is because S3 considers each key as one mutating operation on |
| * the store when updating its load counters on a specific partition |
| * of an S3 bucket. |
| * If only the request was measured, this operation would under-report. |
| * @param deleteRequest keys to delete on the s3-backend |
| * @return the AWS response |
| * @throws MultiObjectDeleteException one or more of the keys could not |
| * be deleted. |
| * @throws AmazonClientException amazon-layer failure. |
| */ |
| @Retries.RetryRaw |
| private DeleteObjectsResult deleteObjects(DeleteObjectsRequest deleteRequest) |
| throws MultiObjectDeleteException, AmazonClientException, IOException { |
| incrementWriteOperations(); |
| BulkDeleteRetryHandler retryHandler = |
| new BulkDeleteRetryHandler(createStoreContext()); |
| try (DurationInfo ignored =
| new DurationInfo(LOG, false, "DELETE %d keys", |
| deleteRequest.getKeys().size())) { |
| return invoker.retryUntranslated("delete", |
| DELETE_CONSIDERED_IDEMPOTENT, |
| (text, e, r, i) -> { |
| // handle the failure |
| retryHandler.bulkDeleteRetried(deleteRequest, e); |
| }, |
| () -> { |
| incrementStatistic(OBJECT_DELETE_REQUESTS, 1); |
| return s3.deleteObjects(deleteRequest); |
| }); |
| } catch (MultiObjectDeleteException e) { |
| // one or more of the keys could not be deleted. |
| // log and rethrow |
| List<MultiObjectDeleteException.DeleteError> errors = e.getErrors(); |
| LOG.debug("Partial failure of delete, {} errors", errors.size(), e); |
| for (MultiObjectDeleteException.DeleteError error : errors) { |
| LOG.debug("{}: \"{}\" - {}", |
| error.getKey(), error.getCode(), error.getMessage()); |
| } |
| throw e; |
| } |
| } |
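| 
| /* A sketch of handling the partial-failure contract of deleteObjects():
| * a MultiObjectDeleteException still reports per-key errors, so a caller
| * can separate deleted keys from rejected ones. The keyVersions list is
| * illustrative:
| *
| *   DeleteObjectsRequest request = new DeleteObjectsRequest(bucket)
| *       .withKeys(keyVersions)
| *       .withQuiet(true);
| *   try {
| *     deleteObjects(request);
| *   } catch (MultiObjectDeleteException e) {
| *     for (MultiObjectDeleteException.DeleteError err : e.getErrors()) {
| *       LOG.warn("Could not delete {}: {}", err.getKey(), err.getCode());
| *     }
| *   }
| */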
| |
| /** |
| * Create a putObject request. |
| * Adds the ACL and metadata.
| * @param key key of object |
| * @param metadata metadata header |
| * @param srcfile source file |
| * @return the request |
| */ |
| public PutObjectRequest newPutObjectRequest(String key, |
| ObjectMetadata metadata, File srcfile) { |
| Preconditions.checkNotNull(srcfile); |
| PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, |
| srcfile); |
| setOptionalPutRequestParameters(putObjectRequest); |
| putObjectRequest.setCannedAcl(cannedACL); |
| putObjectRequest.setMetadata(metadata); |
| return putObjectRequest; |
| } |
| |
| /** |
| * Create a {@link PutObjectRequest} request. |
| * The metadata is assumed to have been configured with the size of the |
| * operation. |
| * @param key key of object |
| * @param metadata metadata header |
| * @param inputStream source data. |
| * @return the request |
| */ |
| PutObjectRequest newPutObjectRequest(String key, |
| ObjectMetadata metadata, |
| InputStream inputStream) { |
| Preconditions.checkNotNull(inputStream); |
| Preconditions.checkArgument(isNotEmpty(key), "Null/empty key"); |
| PutObjectRequest putObjectRequest = new PutObjectRequest(bucket, key, |
| inputStream, metadata); |
| setOptionalPutRequestParameters(putObjectRequest); |
| putObjectRequest.setCannedAcl(cannedACL); |
| return putObjectRequest; |
| } |
| |
| /** |
| * Create a new object metadata instance. |
| * Any standard metadata headers are added here, for example: |
| * encryption. |
| * @return a new metadata instance |
| */ |
| public ObjectMetadata newObjectMetadata() { |
| final ObjectMetadata om = new ObjectMetadata(); |
| setOptionalObjectMetadata(om); |
| return om; |
| } |
| |
| /** |
| * Create a new object metadata instance. |
| * Any standard metadata headers are added here, for example: |
| * encryption. |
| * |
| * @param length length of data to set in header. |
| * @return a new metadata instance |
| */ |
| public ObjectMetadata newObjectMetadata(long length) { |
| final ObjectMetadata om = newObjectMetadata(); |
| if (length >= 0) { |
| om.setContentLength(length); |
| } |
| return om; |
| } |
| |
| /** |
| * Start a transfer-manager managed async PUT of an object, |
| * incrementing the put requests and put bytes |
| * counters. |
| * It does not update the other counters, |
| * as existing code does that as progress callbacks come in. |
| * Byte length is calculated from the file length, or, if there is no |
| * file, from the content length of the header. |
| * Because the operation is async, any stream supplied in the request |
| * must reference data (files, buffers) which stay valid until the upload |
| * completes. |
| * Retry policy: N/A: the transfer manager is performing the upload. |
| * @param putObjectRequest the request |
| * @return the upload initiated |
| */ |
| @Retries.OnceRaw |
| public UploadInfo putObject(PutObjectRequest putObjectRequest) { |
| long len = getPutRequestLength(putObjectRequest); |
| LOG.debug("PUT {} bytes to {} via transfer manager ", |
| len, putObjectRequest.getKey()); |
| incrementPutStartStatistics(len); |
| Upload upload = transfers.upload(putObjectRequest); |
| return new UploadInfo(upload, len); |
| } |
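| 
| /* A sketch of the async PUT flow built from the helpers above, assuming
| * a local File "src" and an illustrative destination key; blocking on
| * the upload is optional and shown only for completeness:
| *
| *   ObjectMetadata om = newObjectMetadata(src.length());
| *   PutObjectRequest req = newPutObjectRequest("dest/key", om, src);
| *   UploadInfo info = putObject(req);
| *   info.getUpload().waitForUploadResult();  // may throw on failure
| */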
| |
| /** |
| * PUT an object directly (i.e. not via the transfer manager). |
| * Byte length is calculated from the file length, or, if there is no |
| * file, from the content length of the header. |
| * |
| * Retry Policy: none. |
| * <i>Important: this call will close any input stream in the request.</i> |
| * @param putObjectRequest the request |
| * @return the upload initiated |
| * @throws AmazonClientException on problems |
| * @throws MetadataPersistenceException if metadata about the write could |
| * not be saved to the metadata store and |
| * fs.s3a.metadatastore.fail.on.write.error=true |
| */ |
| @VisibleForTesting |
| @Retries.OnceRaw("For PUT; post-PUT actions are RetryTranslated") |
| PutObjectResult putObjectDirect(PutObjectRequest putObjectRequest) |
| throws AmazonClientException, MetadataPersistenceException { |
| long len = getPutRequestLength(putObjectRequest); |
| LOG.debug("PUT {} bytes to {}", len, putObjectRequest.getKey()); |
| incrementPutStartStatistics(len); |
| try { |
| PutObjectResult result = s3.putObject(putObjectRequest); |
| incrementPutCompletedStatistics(true, len); |
| // update metadata |
| finishedWrite(putObjectRequest.getKey(), len, |
| result.getETag(), result.getVersionId(), null); |
| return result; |
| } catch (AmazonClientException e) { |
| incrementPutCompletedStatistics(false, len); |
| throw e; |
| } |
| } |
| |
| /** |
| * Get the length of the PUT, verifying that the length is known. |
| * @param putObjectRequest a request bound to a file or a stream. |
| * @return the request length |
| * @throws IllegalStateException if the length is negative
| */ |
| private long getPutRequestLength(PutObjectRequest putObjectRequest) { |
| long len; |
| if (putObjectRequest.getFile() != null) { |
| len = putObjectRequest.getFile().length(); |
| } else { |
| len = putObjectRequest.getMetadata().getContentLength(); |
| } |
| Preconditions.checkState(len >= 0, "Cannot PUT object of unknown length"); |
| return len; |
| } |
| |
| /** |
| * Upload part of a multi-partition file. |
| * Increments the write and put counters. |
| * <i>Important: this call does not close any input stream in the request.</i> |
| * |
| * Retry Policy: none. |
| * @param request request |
| * @return the result of the operation. |
| * @throws AmazonClientException on problems |
| */ |
| @Retries.OnceRaw |
| UploadPartResult uploadPart(UploadPartRequest request) |
| throws AmazonClientException { |
| long len = request.getPartSize(); |
| incrementPutStartStatistics(len); |
| try { |
| setOptionalUploadPartRequestParameters(request); |
| UploadPartResult uploadPartResult = s3.uploadPart(request); |
| incrementPutCompletedStatistics(true, len); |
| return uploadPartResult; |
| } catch (AmazonClientException e) { |
| incrementPutCompletedStatistics(false, len); |
| throw e; |
| } |
| } |
| |
| /** |
| * At the start of a put/multipart upload operation, update the |
| * relevant counters. |
| * |
| * @param bytes bytes in the request. |
| */ |
| public void incrementPutStartStatistics(long bytes) { |
| LOG.debug("PUT start {} bytes", bytes); |
| incrementWriteOperations(); |
| incrementStatistic(OBJECT_PUT_REQUESTS); |
| incrementGauge(OBJECT_PUT_REQUESTS_ACTIVE, 1); |
| if (bytes > 0) { |
| incrementGauge(OBJECT_PUT_BYTES_PENDING, bytes); |
| } |
| } |
| |
| /** |
| * At the end of a put/multipart upload operation, update the |
| * relevant counters and gauges. |
| * |
| * @param success did the operation succeed? |
| * @param bytes bytes in the request. |
| */ |
| public void incrementPutCompletedStatistics(boolean success, long bytes) { |
| LOG.debug("PUT completed success={}; {} bytes", success, bytes); |
| incrementWriteOperations(); |
| if (bytes > 0) { |
| incrementStatistic(OBJECT_PUT_BYTES, bytes); |
| decrementGauge(OBJECT_PUT_BYTES_PENDING, bytes); |
| } |
| incrementStatistic(OBJECT_PUT_REQUESTS_COMPLETED); |
| decrementGauge(OBJECT_PUT_REQUESTS_ACTIVE, 1); |
| } |
| |
| /** |
| * Callback for use in progress callbacks from put/multipart upload events. |
| * Increments those statistics which are expected to be updated during |
| * the ongoing upload operation. |
| * @param key key to file that is being written (for logging) |
| * @param bytes bytes successfully uploaded. |
| */ |
| public void incrementPutProgressStatistics(String key, long bytes) { |
| PROGRESS.debug("PUT {}: {} bytes", key, bytes); |
| incrementWriteOperations(); |
| if (bytes > 0) { |
| statistics.incrementBytesWritten(bytes); |
| } |
| } |
| |
| /** |
| * Delete a list of keys on an s3-backend.
| * This does <i>not</i> update the metastore. |
| * Retry policy: retry untranslated; delete considered idempotent. |
| * @param keysToDelete collection of keys to delete on the s3-backend. |
| * if empty, no request is made of the object store. |
| * @param deleteFakeDir indicates whether this is for deleting fake dirs |
| * @param quiet should a bulk query be quiet, or should its result list |
| * all deleted keys? |
| * @return the deletion result if a multi object delete was invoked
| * and it returned without a failure; null otherwise.
| * @throws InvalidRequestException if the request was rejected due to |
| * a mistaken attempt to delete the root directory. |
| * @throws MultiObjectDeleteException one or more of the keys could not |
| * be deleted in a multiple object delete operation. |
| * The number of rejected objects will be added to the metric |
| * {@link Statistic#FILES_DELETE_REJECTED}. |
| * @throws AmazonClientException other amazon-layer failure. |
| */ |
| @Retries.RetryRaw |
| private DeleteObjectsResult removeKeysS3( |
| List<DeleteObjectsRequest.KeyVersion> keysToDelete, |
| boolean deleteFakeDir, |
| boolean quiet) |
| throws MultiObjectDeleteException, AmazonClientException, |
| IOException { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Initiating delete operation for {} objects", |
| keysToDelete.size()); |
| for (DeleteObjectsRequest.KeyVersion key : keysToDelete) { |
| LOG.debug(" {} {}", key.getKey(), |
| key.getVersion() != null ? key.getVersion() : ""); |
| } |
| } |
| DeleteObjectsResult result = null; |
| if (keysToDelete.isEmpty()) { |
| // exit fast if there are no keys to delete |
| return result; |
| } |
| for (DeleteObjectsRequest.KeyVersion keyVersion : keysToDelete) { |
| blockRootDelete(keyVersion.getKey()); |
| } |
| try { |
| if (enableMultiObjectsDelete) { |
| result = deleteObjects( |
| new DeleteObjectsRequest(bucket) |
| .withKeys(keysToDelete) |
| .withQuiet(quiet)); |
| } else { |
| for (DeleteObjectsRequest.KeyVersion keyVersion : keysToDelete) { |
| deleteObject(keyVersion.getKey()); |
| } |
| } |
| } catch (MultiObjectDeleteException ex) { |
| // partial delete. |
| // Update the stats with the count of the actual number of successful |
| // deletions. |
| int rejected = ex.getErrors().size(); |
| noteDeleted(keysToDelete.size() - rejected, deleteFakeDir); |
| incrementStatistic(FILES_DELETE_REJECTED, rejected); |
| throw ex; |
| } |
| noteDeleted(keysToDelete.size(), deleteFakeDir); |
| return result; |
| } |
| |
| /** |
| * Note the deletion of files or fake directories.
| * @param count count of keys deleted. |
| * @param deleteFakeDir are the deletions fake directories? |
| */ |
| private void noteDeleted(final int count, final boolean deleteFakeDir) { |
| if (!deleteFakeDir) { |
| instrumentation.fileDeleted(count); |
| } else { |
| instrumentation.fakeDirsDeleted(count); |
| } |
| } |
| |
| /** |
| * Invoke {@link #removeKeysS3(List, boolean, boolean)} with handling of |
| * {@code MultiObjectDeleteException}. |
| * |
| * @param keysToDelete collection of keys to delete on the s3-backend. |
| * if empty, no request is made of the object store. |
| * @param deleteFakeDir indicates whether this is for deleting fake dirs |
| * @param operationState (nullable) operational state for a bulk update |
| * @throws InvalidRequestException if the request was rejected due to |
| * a mistaken attempt to delete the root directory. |
| * @throws MultiObjectDeleteException one or more of the keys could not |
| * be deleted in a multiple object delete operation. |
| * @throws AmazonClientException amazon-layer failure. |
| * @throws IOException other IO Exception. |
| */ |
| @VisibleForTesting |
| @Retries.RetryMixed |
| public void removeKeys( |
| final List<DeleteObjectsRequest.KeyVersion> keysToDelete, |
| final boolean deleteFakeDir, |
| final BulkOperationState operationState) |
| throws MultiObjectDeleteException, AmazonClientException, |
| IOException { |
| removeKeys(keysToDelete, deleteFakeDir, new ArrayList<>(), operationState, |
| true); |
| } |
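| 
| /* A sketch of a bulk delete through this entry point; the operationState
| * parameter is nullable, so a caller with no bulk update in progress can
| * pass null. The keys are illustrative:
| *
| *   List<DeleteObjectsRequest.KeyVersion> keys = new ArrayList<>();
| *   keys.add(new DeleteObjectsRequest.KeyVersion("tmp/part-0000"));
| *   keys.add(new DeleteObjectsRequest.KeyVersion("tmp/part-0001"));
| *   removeKeys(keys, false, null);
| */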
| |
| /** |
| * Invoke {@link #removeKeysS3(List, boolean, boolean)} with handling of |
| * {@code MultiObjectDeleteException} before the exception is rethrown. |
| * Specifically: |
| * <ol> |
| * <li>Failure and !deleteFakeDir: S3Guard is updated with all |
| * deleted entries</li> |
| * <li>Failure where deleteFakeDir == true: do nothing with S3Guard</li> |
| * <li>Success: do nothing with S3Guard</li> |
| * </ol> |
| * @param keysToDelete collection of keys to delete on the s3-backend. |
| * if empty, no request is made of the object store. |
| * @param deleteFakeDir indicates whether this is for deleting fake dirs. |
| * @param undeletedObjectsOnFailure List which will be built up of all |
| * files that were not deleted. This happens even as an exception |
| * is raised. |
| * @param operationState (nullable) operational state for a bulk update |
| * @param quiet should a bulk query be quiet, or should its result list |
| * all deleted keys |
| * @return the deletion result if a multi object delete was invoked |
| * and it returned without a failure, else null. |
| * @throws InvalidRequestException if the request was rejected due to |
| * a mistaken attempt to delete the root directory. |
| * @throws MultiObjectDeleteException one or more of the keys could not |
| * be deleted in a multiple object delete operation. |
| * @throws AmazonClientException amazon-layer failure. |
| * @throws IOException other IO Exception. |
| */ |
| @Retries.RetryMixed |
| DeleteObjectsResult removeKeys( |
| final List<DeleteObjectsRequest.KeyVersion> keysToDelete, |
| final boolean deleteFakeDir, |
| final List<Path> undeletedObjectsOnFailure, |
| final BulkOperationState operationState, |
| final boolean quiet) |
| throws MultiObjectDeleteException, AmazonClientException, IOException { |
| undeletedObjectsOnFailure.clear(); |
| try (DurationInfo ignored = new DurationInfo(LOG, false, |
| "Deleting %d keys", keysToDelete.size())) { |
| return removeKeysS3(keysToDelete, deleteFakeDir, quiet); |
| } catch (MultiObjectDeleteException ex) { |
| LOG.debug("Partial delete failure"); |
| // what to do if an IOE was raised? Given an exception was being |
| // raised anyway, and the failures are logged, do nothing. |
| if (!deleteFakeDir) { |
| // when deleting fake directories we don't want to delete metastore |
| // entries so we only process these failures on "real" deletes. |
| Triple<List<Path>, List<Path>, List<Pair<Path, IOException>>> results = |
| new MultiObjectDeleteSupport(createStoreContext(), operationState) |
| .processDeleteFailure(ex, keysToDelete); |
| undeletedObjectsOnFailure.addAll(results.getMiddle()); |
| } |
| throw ex; |
| } catch (AmazonClientException | IOException ex) { |
| List<Path> paths = new MultiObjectDeleteSupport( |
| createStoreContext(), |
| operationState) |
| .processDeleteFailureGenericException(ex, keysToDelete); |
| // other failures. Assume nothing was deleted |
| undeletedObjectsOnFailure.addAll(paths); |
| throw ex; |
| } |
| } |
| |
| /** |
| * Delete a Path. This operation is at least {@code O(files)}, with |
| * added overheads to enumerate the path. It is also not atomic. |
| * |
| * @param f the path to delete. |
| * @param recursive if path is a directory and set to
| * true, the directory is deleted, else an exception is thrown. In
| * the case of a file, recursive may be either true or false.
| * @return true if the path existed and then was deleted; false if there |
| * was no path in the first place, or the corner cases of root path deletion |
| * have surfaced. |
| * @throws IOException due to inability to delete a directory or file. |
| */ |
| @Retries.RetryTranslated |
| public boolean delete(Path f, boolean recursive) throws IOException { |
| try { |
| entryPoint(INVOCATION_DELETE); |
| DeleteOperation deleteOperation = new DeleteOperation( |
| createStoreContext(), |
| innerGetFileStatus(f, true, StatusProbeEnum.ALL), |
| recursive, |
| operationCallbacks, |
| pageSize); |
| boolean outcome = deleteOperation.execute(); |
| if (outcome) { |
| try { |
| maybeCreateFakeParentDirectory(f); |
| } catch (AccessDeniedException e) { |
| LOG.warn("Cannot create directory marker at {}: {}", |
| f.getParent(), e.toString()); |
| LOG.debug("Failed to create fake dir above {}", f, e); |
| } |
| } |
| return outcome; |
| } catch (FileNotFoundException e) { |
| LOG.debug("Couldn't delete {} - does not exist: {}", f, e.toString()); |
| instrumentation.errorIgnored(); |
| return false; |
| } catch (AmazonClientException e) { |
| throw translateException("delete", f, e); |
| } |
| } |
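| 
| /* A minimal usage sketch, assuming an initialized S3AFileSystem "fs"
| * and an illustrative directory tree:
| *
| *   boolean deleted = fs.delete(new Path("s3a://bucket/tmp/job1"), true);
| *   // false means the path did not exist, or a root-deletion corner
| *   // case surfaced.
| */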
| |
| /** |
| * Create a fake directory if required. |
| * That is: it is not the root path and the path does not exist. |
| * Retry policy: retrying; translated.
| * @param f path to create |
| * @throws IOException IO problem |
| */ |
| @Retries.RetryTranslated |
| private void createFakeDirectoryIfNecessary(Path f) |
| throws IOException, AmazonClientException { |
| String key = pathToKey(f); |
| // we only make the LIST call; the codepaths to get here should not
| // be reached if there is an empty dir marker; if they are, it
| // is mostly harmless to create a new one.
| if (!key.isEmpty() && !s3Exists(f, StatusProbeEnum.DIRECTORIES)) { |
| LOG.debug("Creating new fake directory at {}", f); |
| createFakeDirectory(key); |
| } |
| } |
| |
| /** |
| * Create a fake parent directory if required. |
| * That is: its parent is not the root path and does not yet exist.
| * @param path whose parent is created if needed. |
| * @throws IOException IO problem |
| */ |
| @Retries.RetryTranslated |
| void maybeCreateFakeParentDirectory(Path path) |
| throws IOException, AmazonClientException { |
| Path parent = path.getParent(); |
| if (parent != null && !parent.isRoot()) { |
| createFakeDirectoryIfNecessary(parent); |
| } |
| } |
| |
| /** |
| * List the statuses of the files/directories in the given path if the path is |
| * a directory. |
| * |
| * @param f given path |
| * @return the statuses of the files/directories in the given path
| * @throws FileNotFoundException when the path does not exist
| * @throws IOException see specific implementation
| */ |
| public FileStatus[] listStatus(Path f) throws FileNotFoundException, |
| IOException { |
| return once("listStatus", |
| f.toString(), |
| () -> iteratorToStatuses(innerListStatus(f), new HashSet<>())); |
| } |
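| 
| /* A minimal usage sketch, assuming an initialized S3AFileSystem "fs"
| * and an illustrative directory:
| *
| *   for (FileStatus st : fs.listStatus(new Path("s3a://bucket/dir"))) {
| *     LOG.info("{}: {} bytes", st.getPath(), st.getLen());
| *   }
| */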
| |
| /** |
| * List the statuses of the files/directories in the given path if the path is |
| * a directory. |
| * |
| * @param f given path |
| * @return the statuses of the files/directories in the given path
| * @throws FileNotFoundException when the path does not exist.
| * @throws IOException due to an IO problem. |
| * @throws AmazonClientException on failures inside the AWS SDK |
| */ |
| private RemoteIterator<S3AFileStatus> innerListStatus(Path f) |
| throws FileNotFoundException, |
| IOException, AmazonClientException { |
| Path path = qualify(f); |
| LOG.debug("List status for path: {}", path); |
| entryPoint(INVOCATION_LIST_STATUS); |
| |
| Triple<RemoteIterator<S3AFileStatus>, DirListingMetadata, Boolean> |
| statusesAssumingNonEmptyDir = listing |
| .getFileStatusesAssumingNonEmptyDir(path); |
| |
| if (!statusesAssumingNonEmptyDir.getLeft().hasNext() && |
| statusesAssumingNonEmptyDir.getRight()) { |
| // We are sure that this is an empty directory in auth mode. |
| return statusesAssumingNonEmptyDir.getLeft(); |
| } else if (!statusesAssumingNonEmptyDir.getLeft().hasNext()) { |
| // We may have an empty dir, a file, or nothing at all.
| // So we call innerGetFileStatus to get the status; this may throw
| // FileNotFoundException if we have nothing.
| // Thus we are guaranteed to have either a dir marker or a file.
| final S3AFileStatus fileStatus = innerGetFileStatus(path, false, |
| StatusProbeEnum.ALL); |
| // If it is a file return directly. |
| if (fileStatus.isFile()) { |
| LOG.debug("Adding: rd (not a dir): {}", path); |
| S3AFileStatus[] stats = new S3AFileStatus[1]; |
| stats[0] = fileStatus; |
| return listing.createProvidedFileStatusIterator( |
| stats, |
| ACCEPT_ALL, |
| Listing.ACCEPT_ALL_BUT_S3N); |
| } |
| } |
| // Here we have a directory which may or may not be empty. |
| // So we update the metastore and return. |
| return S3Guard.dirListingUnion( |
| metadataStore, |
| path, |
| statusesAssumingNonEmptyDir.getLeft(), |
| statusesAssumingNonEmptyDir.getMiddle(), |
| allowAuthoritative(path), |
| ttlTimeProvider, p -> |
| listing.createProvidedFileStatusIterator( |
| dirMetaToStatuses(statusesAssumingNonEmptyDir.getMiddle()), |
| ACCEPT_ALL, |
| Listing.ACCEPT_ALL_BUT_S3N)); |
| } |
| |
| /** |
| * Is a path to be considered as authoritative? |
| * True iff there is an authoritative metastore or if there |
| * is a non-auth store with the supplied path under |
| * one of the paths declared as authoritative. |
| * @param path path |
| * @return true if the path is auth |
| */ |
| public boolean allowAuthoritative(final Path path) { |
| return S3Guard.allowAuthoritative(path, this, |
| allowAuthoritativeMetadataStore, allowAuthoritativePaths); |
| } |
| |
| /** |
| * Create a {@code ListObjectsRequest} request against this bucket, |
| * with the maximum keys returned in a query set by {@link #maxKeys}. |
| * @param key key for request |
| * @param delimiter any delimiter |
| * @return the request |
| */ |
| @VisibleForTesting |
| public S3ListRequest createListObjectsRequest(String key, |
| String delimiter) { |
| return createListObjectsRequest(key, delimiter, null); |
| } |
| |
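| /**
| * Create a {@code ListObjectsRequest} request against this bucket,
| * with an optional override of the page size.
| * @param key key for request
| * @param delimiter any delimiter
| * @param overrideMaxKeys if non-null, overrides {@link #maxKeys}
| * for this request
| * @return the request
| */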
| private S3ListRequest createListObjectsRequest(String key, |
| String delimiter, Integer overrideMaxKeys) { |
| if (!useListV1) { |
| ListObjectsV2Request request = |
| new ListObjectsV2Request().withBucketName(bucket) |
| .withMaxKeys(maxKeys) |
| .withPrefix(key); |
| if (delimiter != null) { |
| request.setDelimiter(delimiter); |
| } |
| if (overrideMaxKeys != null) { |
| request.setMaxKeys(overrideMaxKeys); |
| } |
| return S3ListRequest.v2(request); |
| } else { |
| ListObjectsRequest request = new ListObjectsRequest(); |
| request.setBucketName(bucket); |
| request.setMaxKeys(maxKeys); |
| request.setPrefix(key); |
| if (delimiter != null) { |
| request.setDelimiter(delimiter); |
| } |
| if (overrideMaxKeys != null) { |
| request.setMaxKeys(overrideMaxKeys); |
| } |
| return S3ListRequest.v1(request); |
| } |
| } |
| |
| /** |
| * Set the current working directory for the given file system. All relative |
| * paths will be resolved relative to it. |
| * |
| * @param newDir the current working directory. |
| */ |
| public void setWorkingDirectory(Path newDir) { |
| workingDir = makeQualified(newDir); |
| } |
| |
| /** |
| * Get the current working directory for the given file system. |
| * @return the directory pathname |
| */ |
| public Path getWorkingDirectory() { |
| return workingDir; |
| } |
| |
| /** |
| * Get the username of the FS. |
| * @return the short name of the user who instantiated the FS |
| */ |
| public String getUsername() { |
| return username; |
| } |
| |
| /** |
| * Get the owner of this FS: who created it? |
| * @return the owner of the FS. |
| */ |
| public UserGroupInformation getOwner() { |
| return owner; |
| } |
| |
| /** |
| * |
| * Make the given path and all non-existent parents into |
| * directories. Has the semantics of Unix {@code 'mkdir -p'}. |
| * Existence of the directory hierarchy is not an error. |
| * @param path path to create |
| * @param permission to apply to f |
| * @return true if a directory was created or already existed |
| * @throws FileAlreadyExistsException there is a file at the path specified |
| * @throws IOException other IO problems |
| */ |
| // TODO: If we have created an empty file at /foo/bar and we then call |
| // mkdirs for /foo/bar/baz/roo what happens to the empty file /foo/bar/? |
| public boolean mkdirs(Path path, FsPermission permission) throws IOException, |
| FileAlreadyExistsException { |
| try { |
| entryPoint(INVOCATION_MKDIRS); |
| return innerMkdirs(path, permission); |
| } catch (AmazonClientException e) { |
| throw translateException("mkdirs", path, e); |
| } |
| } |
| |
| /** |
| * |
| * Make the given path and all non-existent parents into |
| * directories. |
| * See {@link #mkdirs(Path, FsPermission)} |
| * @param p path to create |
| * @param permission to apply to f |
| * @return true if a directory was created or already existed |
| * @throws FileAlreadyExistsException there is a file at the path specified |
| * @throws IOException other IO problems |
| * @throws AmazonClientException on failures inside the AWS SDK |
| */ |
| private boolean innerMkdirs(Path p, FsPermission permission) |
| throws IOException, FileAlreadyExistsException, AmazonClientException { |
| Path f = qualify(p); |
| LOG.debug("Making directory: {}", f); |
| if (p.isRoot()) { |
| // fast exit for root. |
| return true; |
| } |
| FileStatus fileStatus; |
| |
| try { |
| fileStatus = innerGetFileStatus(f, false, |
| StatusProbeEnum.ALL); |
| |
| if (fileStatus.isDirectory()) { |
| return true; |
| } else { |
| throw new FileAlreadyExistsException("Path is a file: " + f); |
| } |
| } catch (FileNotFoundException e) { |
| // Walk path to root, ensuring closest ancestor is a directory, not file |
| Path fPart = f.getParent(); |
| while (fPart != null && !fPart.isRoot()) { |
| try { |
| fileStatus = getFileStatus(fPart); |
| if (fileStatus.isDirectory()) { |
| break; |
| } |
| if (fileStatus.isFile()) { |
| throw new FileAlreadyExistsException(String.format( |
| "Can't make directory for path '%s' since it is a file.", |
| fPart)); |
| } |
| } catch (FileNotFoundException fnfe) { |
| instrumentation.errorIgnored(); |
| } |
| fPart = fPart.getParent(); |
| } |
| String key = pathToKey(f); |
| // this will create the marker file, delete the parent entries |
| // and update S3Guard |
| createFakeDirectory(key); |
| return true; |
| } |
| } |
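| 
| /* A minimal usage sketch of the public entry point, assuming an
| * initialized S3AFileSystem "fs"; the permission argument is accepted
| * but not persisted by S3A:
| *
| *   fs.mkdirs(new Path("s3a://bucket/a/b/c"),
| *       FsPermission.getDirDefault());
| */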
| |
| /** |
| * Return a file status object that represents the path. |
| * @param f The path we want information from |
| * @return a FileStatus object |
| * @throws FileNotFoundException when the path does not exist |
| * @throws IOException on other problems. |
| */ |
| @Retries.RetryTranslated |
| public FileStatus getFileStatus(final Path f) throws IOException { |
| entryPoint(INVOCATION_GET_FILE_STATUS); |
| return innerGetFileStatus(f, false, StatusProbeEnum.ALL); |
| } |
| |
| /** |
| * Get the status of a file or directory, first through S3Guard and then |
| * through S3. |
| * The S3 probes can leave 404 responses cached in the S3 load balancers;
| * if a check is only needed for a directory, declaring this saves time
| * and avoids caching a 404 for the object.
| * When only probing for directories, if an entry for a file is found in |
| * S3Guard it is returned, but checks for updated values are skipped. |
| * Internal version of {@link #getFileStatus(Path)}. |
| * @param f The path we want information from |
| * @param needEmptyDirectoryFlag if true, implementation will calculate |
| * a TRUE or FALSE value for {@link S3AFileStatus#isEmptyDirectory()} |
| * @param probes probes to make. |
| * @return a S3AFileStatus object |
| * @throws FileNotFoundException when the path does not exist |
| * @throws IOException on other problems. |
| */ |
| @VisibleForTesting |
| @Retries.RetryTranslated |
| S3AFileStatus innerGetFileStatus(final Path f, |
| final boolean needEmptyDirectoryFlag, |
| final Set<StatusProbeEnum> probes) throws IOException { |
| final Path path = qualify(f); |
| String key = pathToKey(path); |
| LOG.debug("Getting path status for {} ({}); needEmptyDirectory={}", |
| path, key, needEmptyDirectoryFlag); |
| |
| boolean allowAuthoritative = allowAuthoritative(path); |
| // Check MetadataStore, if any. |
| PathMetadata pm = null; |
| if (hasMetadataStore()) { |
| pm = S3Guard.getWithTtl(metadataStore, path, ttlTimeProvider, |
| needEmptyDirectoryFlag, allowAuthoritative); |
| } |
| Set<Path> tombstones = Collections.emptySet(); |
| if (pm != null) { |
| S3AFileStatus msStatus = pm.getFileStatus(); |
| if (pm.isDeleted()) { |
| OffsetDateTime deletedAt = OffsetDateTime.ofInstant( |
| Instant.ofEpochMilli(msStatus.getModificationTime()), |
| ZoneOffset.UTC); |
| throw new FileNotFoundException("Path " + path + " is recorded as " + |
| "deleted by S3Guard at " + deletedAt); |
| } |
| |
| // If the metastore is not authoritative, check S3 for any recent
| // modification: compare the modTime to check whether the metadata
| // is up to date. Skip going to S3 if the entry checked is a
| // directory, since two directory entries leave nothing to compare.
| |
| if (!msStatus.isDirectory() && |
| !allowAuthoritative && |
| probes.contains(StatusProbeEnum.Head)) { |
| // a file has been found in a non-auth path and the caller has not said |
| // they only care about directories |
| LOG.debug("Metadata for {} found in the non-auth metastore.", path); |
| // If the timestamp of the pm is close to "now", we don't need to
| // bother with a check of S3; that means either the status modtime
| // is close to now, or pm.getLastUpdated() is.
| |
| // get the time in which a status modtime is considered valid |
| // in a non-auth metastore |
| long validTime = |
| ttlTimeProvider.getNow() - ttlTimeProvider.getMetadataTtl(); |
| final long msModTime = msStatus.getModificationTime(); |
| |
| if (msModTime < validTime) { |
| LOG.debug("Metastore entry of {} is out of date, probing S3", path); |
| try { |
| S3AFileStatus s3AFileStatus = s3GetFileStatus(path, |
| key, |
| probes, |
| tombstones, |
| needEmptyDirectoryFlag); |
| // if the new status is more current than that in the metastore, |
| // it means S3 has changed and the store needs updating |
| final long s3ModTime = s3AFileStatus.getModificationTime(); |
| |
| if (s3ModTime > msModTime) { |
| // there's new data in S3 |
| LOG.debug("S3Guard metadata for {} is outdated;" |
| + " s3modtime={}; msModTime={} updating metastore", |
| path, s3ModTime, msModTime); |
| // add to S3Guard |
| S3Guard.putAndReturn(metadataStore, s3AFileStatus, |
| ttlTimeProvider); |
| } else { |
| // The modtime of the data is the same as, or older than, the
| // S3Guard value: either an old object has been found, or the
| // existing one was retrieved. In both cases, refresh the S3Guard
| // entry so the record's TTL is updated.
| S3Guard.refreshEntry(metadataStore, pm, s3AFileStatus,
| ttlTimeProvider);
| }
| // return the value
| // note that the checks for empty dir status below can be skipped
| // because the call to s3GetFileStatus includes those checks
| return s3AFileStatus; |
| } catch (FileNotFoundException fne) { |
| // the attempt to refresh the record failed because there was |
| // no entry. Either it is a new file not visible, or it |
| // has been deleted (and therefore S3Guard is out of sync with S3) |
| LOG.warn("Failed to find file {}. Either it is not yet visible, or " |
| + "it has been deleted.", path); |
| } |
| } |
| } |
| |
| if (needEmptyDirectoryFlag && msStatus.isDirectory()) { |
| // the caller needs to know if a directory is empty, |
| // and that this is a directory. |
| if (pm.isEmptyDirectory() != Tristate.UNKNOWN) { |
| // We have a definitive true / false from MetadataStore, we are done. |
| return msStatus; |
| } else { |
| // execute a S3Guard listChildren command to list tombstones under the |
| // path. |
| // This list will be used in the forthcoming s3GetFileStatus call. |
| DirListingMetadata children = |
| S3Guard.listChildrenWithTtl(metadataStore, path, ttlTimeProvider, |
| allowAuthoritative); |
| if (children != null) { |
| tombstones = children.listTombstones(); |
| } |
| LOG.debug("MetadataStore doesn't know if {} is empty, using S3.", |
| path); |
| } |
| } else { |
| // Either this is not a directory, or we don't care if it is empty |
| return msStatus; |
| } |
| |
| // now issue the S3 getFileStatus call. |
| try { |
| S3AFileStatus s3FileStatus = s3GetFileStatus(path, |
| key, |
| probes, |
| tombstones, |
| true); |
| // entry was found, so save in S3Guard and return the final value. |
| return S3Guard.putAndReturn(metadataStore, s3FileStatus, |
| ttlTimeProvider); |
| } catch (FileNotFoundException e) { |
| // If the metadata store has no children for it and it's not listed in |
| // S3 yet, we'll conclude that it is an empty directory |
| return S3AFileStatus.fromFileStatus(msStatus, Tristate.TRUE, |
| null, null); |
| } |
| } else { |
| // there was no entry in S3Guard |
| // retrieve the data and update the metadata store in the process. |
| return S3Guard.putAndReturn(metadataStore, |
| s3GetFileStatus(path, |
| key, |
| probes, |
| tombstones, |
| needEmptyDirectoryFlag), |
| ttlTimeProvider); |
| } |
| } |
| |
| /** |
| * Raw {@code getFileStatus} that talks direct to S3. |
| * Used to implement {@link #innerGetFileStatus(Path, boolean, Set)}, |
| * and for direct management of empty directory blobs. |
| * |
| * Checks made, in order: |
| * <ol> |
| * <li> |
| * Head: look for an object at the given key, provided that |
| * the key doesn't end in "/" |
| * </li> |
| * <li> |
| * DirMarker: look for the directory marker: the key with a trailing "/"
| * appended if not already present.
| * If an object was found with size 0 bytes, a directory status entry |
| * is returned which declares that the directory is empty. |
| * </li> |
| * <li> |
| * List: issue a LIST on the key (with / if needed), require one |
| * entry to be found for the path to be considered a non-empty directory. |
| * </li> |
| * </ol> |
| * |
| * Notes: |
| * <ul> |
| * <li> |
| * Objects ending in / which are not 0-bytes long are not treated as |
| * directory markers, but instead as files. |
| * </li> |
| * <li> |
| * There are ongoing discussions about whether a dir marker
| * should be interpreted as an empty dir.
| * </li> |
| * <li> |
| * The HEAD requests require the permissions to read an object, |
| * including (we believe) the ability to decrypt the file. |
| * At the very least, for SSE-C markers, you need the same key on |
| * the client for the HEAD to work. |
| * </li> |
| * <li> |
| * The List probe needs list permission; it is also more prone to |
| * inconsistency, even on newly created files. |
| * </li> |
| * </ul> |
| * |
| * Retry policy: retry translated. |
| * @param path Qualified path |
| * @param key Key string for the path |
| * @param probes probes to make |
| * @param tombstones tombstones to filter |
| * @param needEmptyDirectoryFlag if true, implementation will calculate |
| * a TRUE or FALSE value for {@link S3AFileStatus#isEmptyDirectory()} |
| * @return Status |
| * @throws FileNotFoundException the supplied probes failed. |
| * @throws IOException on other problems. |
| */ |
| @VisibleForTesting |
| @Retries.RetryTranslated |
| S3AFileStatus s3GetFileStatus(final Path path, |
| final String key, |
| final Set<StatusProbeEnum> probes, |
| @Nullable final Set<Path> tombstones, |
| final boolean needEmptyDirectoryFlag) throws IOException { |
| LOG.debug("S3GetFileStatus {}", path); |
| // either you aren't looking for the directory flag, or you are, |
| // and if you are, the probe list must contain list. |
| Preconditions.checkArgument(!needEmptyDirectoryFlag |
| || probes.contains(StatusProbeEnum.List), |
| "s3GetFileStatus(%s) wants to know if a directory is empty but" |
| + " does not request a list probe", path); |
| |
| if (!key.isEmpty() && !key.endsWith("/") |
| && probes.contains(StatusProbeEnum.Head)) { |
| try { |
| // look for the simple file |
| ObjectMetadata meta = getObjectMetadata(key); |
| LOG.debug("Found exact file: normal file {}", key); |
| return new S3AFileStatus(meta.getContentLength(), |
| dateToLong(meta.getLastModified()), |
| path, |
| getDefaultBlockSize(path), |
| username, |
| meta.getETag(), |
| meta.getVersionId()); |
| } catch (AmazonServiceException e) { |
| // if the response is a 404 error, it just means that there is |
| // no file at that path...the remaining checks will be needed. |
| if (e.getStatusCode() != SC_404 || isUnknownBucket(e)) { |
| throw translateException("getFileStatus", path, e); |
| } |
| } catch (AmazonClientException e) { |
| throw translateException("getFileStatus", path, e); |
| } |
| } |
| |
| // execute the list |
| if (probes.contains(StatusProbeEnum.List)) { |
| try { |
| // This will find a directory marker ("/") as well as any entry. |
| // For a simple "is this a dir?" check that is enough, but when |
| // looking for an empty dir we must verify there are no children, |
| // so ask for two entries in order to detect a child. |
| String dirKey = maybeAddTrailingSlash(key); |
| // list size is dir marker + at least one non-tombstone entry. |
| // There's a corner case: more tombstones than fit in a single |
| // page of listing. We assume that if a caller has been deleting |
| // that many files, the AWS listing will have purged some of them |
| // by the time of listing, so the response will include some |
| // entries which have not been deleted. |
| |
| int listSize; |
| if (tombstones == null) { |
| // no tombstones so look for a marker and at least one child. |
| listSize = 2; |
| } else { |
| // build a listing > tombstones. If the caller has many thousands |
| // of tombstones this won't work properly, which is why pruning |
| // of expired tombstones matters. |
| listSize = Math.min(2 + tombstones.size(), Math.max(2, maxKeys)); |
| } |
| S3ListRequest request = createListObjectsRequest(dirKey, "/", |
| listSize); |
| // execute the request |
| S3ListResult listResult = listObjects(request); |
| |
| |
| if (listResult.hasPrefixesOrObjects(contextAccessors, tombstones)) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Found path as directory (with /)"); |
| listResult.logAtDebug(LOG); |
| } |
| // At least one entry has been found. |
| // If looking for an empty directory, the marker must exist but no |
| // children. |
| // So the listing must contain the marker entry only. |
| if (needEmptyDirectoryFlag |
| && listResult.representsEmptyDirectory( |
| contextAccessors, dirKey, tombstones)) { |
| return new S3AFileStatus(Tristate.TRUE, path, username); |
| } |
| // either an empty directory is not needed, or the |
| // listing does not meet the requirements. |
| return new S3AFileStatus(Tristate.FALSE, path, username); |
| } else if (key.isEmpty()) { |
| LOG.debug("Found root directory"); |
| return new S3AFileStatus(Tristate.TRUE, path, username); |
| } |
| } catch (AmazonServiceException e) { |
| if (e.getStatusCode() != SC_404 || isUnknownBucket(e)) { |
| throw translateException("getFileStatus", path, e); |
| } |
| } catch (AmazonClientException e) { |
| throw translateException("getFileStatus", path, e); |
| } |
| } |
| |
| LOG.debug("Not Found: {}", path); |
| throw new FileNotFoundException("No such file or directory: " + path); |
| } |
| |
| /** |
| * Raw version of {@link FileSystem#exists(Path)} which uses S3 only: |
| * S3Guard MetadataStore, if any, will be skipped. |
| * Retry policy: retrying; translated. |
| * @param path qualified path to look for |
| * @param probes probes to make |
| * @return true if path exists in S3 |
| * @throws IOException IO failure |
| */ |
| @Retries.RetryTranslated |
| private boolean s3Exists(final Path path, final Set<StatusProbeEnum> probes) |
| throws IOException { |
| String key = pathToKey(path); |
| try { |
| s3GetFileStatus(path, key, probes, null, false); |
| return true; |
| } catch (FileNotFoundException e) { |
| return false; |
| } |
| } |
| |
| /** |
| * The src file is on the local disk. Add it to FS at |
| * the given dst name. |
| * |
| * <i>HADOOP-15932:</i> the optimized |
| * {@link #innerCopyFromLocalFile(boolean, boolean, Path, Path)} |
| * implementation is currently unwired; this method delegates to the |
| * superclass implementation. |
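| * <p> |
| * For example (paths illustrative only): |
| * <pre>{@code |
| * fs.copyFromLocalFile(false, true, |
| *     new Path("file:///tmp/data.csv"), |
| *     new Path("s3a://bucket/data.csv")); |
| * }</pre> |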
| * @param delSrc whether to delete the src |
| * @param overwrite whether to overwrite an existing file |
| * @param src path |
| * @param dst path |
| * @throws IOException IO problem |
| * @throws FileAlreadyExistsException the destination file exists and |
| * overwrite==false |
| * @throws AmazonClientException failure in the AWS SDK |
| */ |
| @Override |
| public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, |
| Path dst) throws IOException { |
| entryPoint(INVOCATION_COPY_FROM_LOCAL_FILE); |
| LOG.debug("Copying local file from {} to {}", src, dst); |
| // innerCopyFromLocalFile(delSrc, overwrite, src, dst); |
| super.copyFromLocalFile(delSrc, overwrite, src, dst); |
| } |
| |
| /** |
| * The src file is on the local disk. Add it to FS at |
| * the given dst name. |
| * |
| * This version doesn't need to create a temporary file to calculate the md5. |
| * Sadly this doesn't seem to be used by the shell cp :( |
| * |
| * <i>HADOOP-15932:</i> this method has been unwired from |
| * {@link #copyFromLocalFile(boolean, boolean, Path, Path)} until |
| * it is extended to list and copy whole directories. |
| * delSrc indicates if the source should be removed |
| * @param delSrc whether to delete the src |
| * @param overwrite whether to overwrite an existing file |
| * @param src Source path: must be on local filesystem |
| * @param dst path |
| * @throws IOException IO problem |
| * @throws FileAlreadyExistsException the destination file exists and |
| * overwrite==false, or if the destination is a directory. |
| * @throws FileNotFoundException if the source file does not exist |
| * @throws AmazonClientException failure in the AWS SDK |
| * @throws IllegalArgumentException if the source path is not on the local FS |
| */ |
| @Retries.RetryTranslated |
| private void innerCopyFromLocalFile(boolean delSrc, boolean overwrite, |
| Path src, Path dst) |
| throws IOException, FileAlreadyExistsException, AmazonClientException { |
| entryPoint(INVOCATION_COPY_FROM_LOCAL_FILE); |
| LOG.debug("Copying local file from {} to {}", src, dst); |
| |
| // Since we have a local file, we don't need to stream into a temporary file |
| LocalFileSystem local = getLocal(getConf()); |
| File srcfile = local.pathToFile(src); |
| if (!srcfile.exists()) { |
| throw new FileNotFoundException("No file: " + src); |
| } |
| if (!srcfile.isFile()) { |
| throw new FileNotFoundException("Not a file: " + src); |
| } |
| |
| try { |
| FileStatus status = getFileStatus(dst); |
| if (!status.isFile()) { |
| throw new FileAlreadyExistsException(dst + " exists and is not a file"); |
| } |
| if (!overwrite) { |
| throw new FileAlreadyExistsException(dst + " already exists"); |
| } |
| } catch (FileNotFoundException e) { |
| // no destination, all is well |
| } |
| final String key = pathToKey(dst); |
| final ObjectMetadata om = newObjectMetadata(srcfile.length()); |
| Progressable progress = null; |
| PutObjectRequest putObjectRequest = newPutObjectRequest(key, om, srcfile); |
| invoker.retry("copyFromLocalFile(" + src + ")", dst.toString(), true, |
| () -> executePut(putObjectRequest, progress)); |
| if (delSrc) { |
| local.delete(src, false); |
| } |
| } |
| |
| /** |
| * Execute a PUT via the transfer manager, blocking for completion, |
| * updating the metastore afterwards. |
| * If the waiting for completion is interrupted, the upload will be |
| * aborted before an {@code InterruptedIOException} is thrown. |
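| * <p> |
| * A sketch of a caller, assuming {@code key} and a local |
| * {@code file} are available (illustrative only): |
| * <pre>{@code |
| * PutObjectRequest put = fs.newPutObjectRequest(key, |
| *     fs.newObjectMetadata(file.length()), file); |
| * UploadResult result = fs.executePut(put, null); |
| * }</pre> |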
| * @param putObjectRequest request |
| * @param progress optional progress callback |
| * @return the upload result |
| * @throws InterruptedIOException if the blocking was interrupted. |
| * @throws MetadataPersistenceException if metadata about the write could |
| * not be saved to the metadata store and |
| * fs.s3a.metadatastore.fail.on.write.error=true |
| */ |
| @Retries.OnceRaw("For PUT; post-PUT actions are RetryTranslated") |
| UploadResult executePut(PutObjectRequest putObjectRequest, |
| Progressable progress) |
| throws InterruptedIOException, MetadataPersistenceException { |
| String key = putObjectRequest.getKey(); |
| UploadInfo info = putObject(putObjectRequest); |
| Upload upload = info.getUpload(); |
| ProgressableProgressListener listener = new ProgressableProgressListener( |
| this, key, upload, progress); |
| upload.addProgressListener(listener); |
| UploadResult result = waitForUploadCompletion(key, info); |
| listener.uploadCompleted(); |
| // post-write actions |
| finishedWrite(key, info.getLength(), |
| result.getETag(), result.getVersionId(), null); |
| return result; |
| } |
| |
| /** |
| * Wait for an upload to complete. |
| * If the waiting for completion is interrupted, the upload will be |
| * aborted before an {@code InterruptedIOException} is thrown. |
| * If the upload (or its result collection) failed, this is where |
| * the failure is raised as an AWS exception |
| * @param key destination key |
| * @param uploadInfo upload to wait for |
| * @return the upload result |
| * @throws InterruptedIOException if the blocking was interrupted. |
| */ |
| @Retries.OnceRaw |
| UploadResult waitForUploadCompletion(String key, UploadInfo uploadInfo) |
| throws InterruptedIOException { |
| Upload upload = uploadInfo.getUpload(); |
| try { |
| UploadResult result = upload.waitForUploadResult(); |
| incrementPutCompletedStatistics(true, uploadInfo.getLength()); |
| return result; |
| } catch (InterruptedException e) { |
| LOG.info("Interrupted: aborting upload"); |
| incrementPutCompletedStatistics(false, uploadInfo.getLength()); |
| upload.abort(); |
| throw (InterruptedIOException) |
| new InterruptedIOException("Interrupted in PUT to " |
| + keyToQualifiedPath(key)) |
| .initCause(e); |
| } |
| } |
| |
| /** |
| * Close the filesystem. This shuts down all transfers. |
| * @throws IOException IO problem |
| */ |
| @Override |
| public void close() throws IOException { |
| if (closed.getAndSet(true)) { |
| // already closed |
| return; |
| } |
| LOG.debug("Filesystem {} is closed", uri); |
| try { |
| super.close(); |
| } finally { |
| stopAllServices(); |
| } |
| } |
| |
| /** |
| * Stop all services. |
| * This is invoked in close() and during failures of initialize(); |
| * make sure that all operations here are robust both to failures in |
| * the expected state of this FS and to failures while being stopped. |
| */ |
| protected synchronized void stopAllServices() { |
| if (transfers != null) { |
| try { |
| transfers.shutdownNow(true); |
| } catch (RuntimeException e) { |
| // catch and swallow for resilience. |
| LOG.debug("When shutting down", e); |
| } |
| transfers = null; |
| } |
| HadoopExecutors.shutdown(boundedThreadPool, LOG, |
| THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS); |
| boundedThreadPool = null; |
| HadoopExecutors.shutdown(unboundedThreadPool, LOG, |
| THREAD_POOL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS); |
| unboundedThreadPool = null; |
| cleanupWithLogger(LOG, |
| metadataStore, |
| instrumentation, |
| delegationTokens.orElse(null), |
| signerManager); |
| closeAutocloseables(LOG, credentials); |
| delegationTokens = Optional.empty(); |
| signerManager = null; |
| credentials = null; |
| } |
| |
| /** |
| * Verify that the filesystem is open. Non blocking; this gives |
| * the last state of the {@link #closed} flag. |
| * @throws IOException if the filesystem is closed. |
| */ |
| private void checkNotClosed() throws IOException { |
| if (closed.get()) { |
| throw new IOException(uri + ": " + E_FS_CLOSED); |
| } |
| } |
| |
| /** |
| * Get the delegation token support for this filesystem; |
| * non-empty iff delegation support is enabled. |
| * @return the token support, or an empty Optional. |
| */ |
| @VisibleForTesting |
| public Optional<S3ADelegationTokens> getDelegationTokens() { |
| return delegationTokens; |
| } |
| |
| /** |
| * Return a service name iff delegation tokens are enabled and the |
| * token binding is issuing delegation tokens. |
| * @return the canonical service name or null |
| */ |
| @Override |
| public String getCanonicalServiceName() { |
| // this could all be done in map statements, but it'd be harder to |
| // understand and maintain. |
| // Essentially: no DTs, no canonical service name. |
| if (!delegationTokens.isPresent()) { |
| return null; |
| } |
| // DTs present: ask the binding if it is willing to |
| // serve tokens (or fail noisily). |
| S3ADelegationTokens dt = delegationTokens.get(); |
| return dt.getTokenIssuingPolicy() != NoTokensAvailable |
| ? dt.getCanonicalServiceName() |
| : null; |
| } |
| |
| /** |
| * Get a delegation token if the FS is set up for them. |
| * If the user already has a token, it is returned, |
| * <i>even if it has expired</i>. |
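| * <p> |
| * For example (renewer name illustrative only): |
| * <pre>{@code |
| * Token<AbstractS3ATokenIdentifier> token = |
| *     fs.getDelegationToken("yarn"); |
| * }</pre> |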
| * @param renewer the account name that is allowed to renew the token. |
| * @return the delegation token or null |
| * @throws IOException IO failure |
| */ |
| @Override |
| public Token<AbstractS3ATokenIdentifier> getDelegationToken(String renewer) |
| throws IOException { |
| entryPoint(Statistic.INVOCATION_GET_DELEGATION_TOKEN); |
| LOG.debug("Delegation token requested"); |
| if (delegationTokens.isPresent()) { |
| return delegationTokens.get().getBoundOrNewDT(encryptionSecrets, |
| (renewer != null ? new Text(renewer) : new Text())); |
| } else { |
| // Delegation token support is not set up |
| LOG.debug("Token support is not enabled"); |
| return null; |
| } |
| } |
| |
| /** |
| * Ask any DT plugin for any extra token issuers. |
| * These do not get told of the encryption secrets and can |
| * return any type of token. |
| * This allows DT plugins to issue extra tokens for |
| * ancillary services. |
| */ |
| @Override |
| public DelegationTokenIssuer[] getAdditionalTokenIssuers() |
| throws IOException { |
| if (delegationTokens.isPresent()) { |
| return delegationTokens.get().getAdditionalTokenIssuers(); |
| } else { |
| // Delegation token support is not set up |
| LOG.debug("Token support is not enabled"); |
| return null; |
| } |
| } |
| |
| /** |
| * Build the AWS policy for restricted access to the resources needed |
| * by this bucket. |
| * The policy generated includes S3 access, S3Guard access |
| * if needed, and KMS operations. |
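| * <p> |
| * A sketch of generating a read-only policy (illustrative only): |
| * <pre>{@code |
| * List<RoleModel.Statement> statements = |
| *     fs.listAWSPolicyRules(EnumSet.of(AccessLevel.READ)); |
| * }</pre> |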
| * @param access access level desired. |
| * @return a policy for use in roles |
| */ |
| @Override |
| public List<RoleModel.Statement> listAWSPolicyRules( |
| final Set<AccessLevel> access) { |
| if (access.isEmpty()) { |
| return Collections.emptyList(); |
| } |
| List<RoleModel.Statement> statements = new ArrayList<>( |
| allowS3Operations(bucket, |
| access.contains(AccessLevel.WRITE) |
| || access.contains(AccessLevel.ADMIN))); |
| |
| // no attempt is made to qualify KMS access; there's no |
| // way to predict the read keys, and we are not worried about |
| // granting too much encryption access. |
| statements.add(STATEMENT_ALLOW_SSE_KMS_RW); |
| |
| // add any metastore policies |
| if (metadataStore instanceof AWSPolicyProvider) { |
| statements.addAll( |
| ((AWSPolicyProvider) metadataStore).listAWSPolicyRules(access)); |
| } |
| return statements; |
| } |
| |
| /** |
| * Copy a single object in the bucket via a COPY operation. |
| * There's no update of metadata, directory markers, etc: |
| * callers must implement these operations themselves. |
| * @param srcKey source object path |
| * @param dstKey destination object path |
| * @param size object size |
| * @param srcAttributes S3 attributes of the source object |
| * @param readContext the read context |
| * @return the result of the copy |
| * @throws InterruptedIOException the operation was interrupted |
| * @throws IOException Other IO problems |
| */ |
| @Retries.RetryTranslated |
| private CopyResult copyFile(String srcKey, String dstKey, long size, |
| S3ObjectAttributes srcAttributes, S3AReadOpContext readContext) |
| throws IOException, InterruptedIOException { |
| LOG.debug("copyFile {} -> {} ", srcKey, dstKey); |
| |
| ProgressListener progressListener = progressEvent -> { |
| switch (progressEvent.getEventType()) { |
| case TRANSFER_PART_COMPLETED_EVENT: |
| incrementWriteOperations(); |
| break; |
| default: |
| break; |
| } |
| }; |
| |
| ChangeTracker changeTracker = new ChangeTracker( |
| keyToQualifiedPath(srcKey).toString(), |
| changeDetectionPolicy, |
| readContext.instrumentation.newInputStreamStatistics() |
| .getVersionMismatchCounter(), |
| srcAttributes); |
| |
| String action = "copyFile(" + srcKey + ", " + dstKey + ")"; |
| Invoker readInvoker = readContext.getReadInvoker(); |
| |
| ObjectMetadata srcom; |
| try { |
| srcom = once(action, srcKey, |
| () -> |
| getObjectMetadata(srcKey, changeTracker, readInvoker, "copy")); |
| } catch (FileNotFoundException e) { |
| // if rename fails at this point it means that the expected file was not |
| // found. |
| // The cause is believed to always be one of |
| // - File was deleted since the LIST/S3Guard metastore.list() call knew of it. |
| // - S3Guard is asking for a specific version and it's been removed by |
| // lifecycle rules. |
| // - there's a 404 cached in the S3 load balancers. |
| LOG.debug("getObjectMetadata({}) failed to find an expected file", |
| srcKey, e); |
| // We create an exception, but the text depends on the S3Guard state |
| String message = hasMetadataStore() |
| ? RemoteFileChangedException.FILE_NEVER_FOUND |
| : RemoteFileChangedException.FILE_NOT_FOUND_SINGLE_ATTEMPT; |
| throw new RemoteFileChangedException( |
| keyToQualifiedPath(srcKey).toString(), |
| action, |
| message, |
| e); |
| } |
| ObjectMetadata dstom = cloneObjectMetadata(srcom); |
| setOptionalObjectMetadata(dstom); |
| |
| return readInvoker.retry( |
| action, srcKey, |
| true, |
| () -> { |
| CopyObjectRequest copyObjectRequest = |
| new CopyObjectRequest(bucket, srcKey, bucket, dstKey); |
| changeTracker.maybeApplyConstraint(copyObjectRequest); |
| |
| setOptionalCopyObjectRequestParameters(srcom, copyObjectRequest); |
| copyObjectRequest.setCannedAccessControlList(cannedACL); |
| copyObjectRequest.setNewObjectMetadata(dstom); |
| Optional.ofNullable(srcom.getStorageClass()) |
| .ifPresent(copyObjectRequest::setStorageClass); |
| incrementStatistic(OBJECT_COPY_REQUESTS); |
| Copy copy = transfers.copy(copyObjectRequest); |
| copy.addProgressListener(progressListener); |
| CopyOutcome copyOutcome = CopyOutcome.waitForCopy(copy); |
| InterruptedException interruptedException = |
| copyOutcome.getInterruptedException(); |
| if (interruptedException != null) { |
| // copy interrupted: convert to an IOException. |
| throw (IOException)new InterruptedIOException( |
| "Interrupted copying " + srcKey |
| + " to " + dstKey + ", cancelling") |
| .initCause(interruptedException); |
| } |
| SdkBaseException awsException = copyOutcome.getAwsException(); |
| if (awsException != null) { |
| changeTracker.processException(awsException, "copy"); |
| throw awsException; |
| } |
| CopyResult result = copyOutcome.getCopyResult(); |
| changeTracker.processResponse(result); |
| incrementWriteOperations(); |
| instrumentation.filesCopied(1, size); |
| return result; |
| }); |
| } |
| |
| /** |
| * Propagate encryption parameters from source file if set else use the |
| * current filesystem encryption settings. |
| * @param srcom source object meta. |
| * @param copyObjectRequest copy object request body. |
| */ |
| private void setOptionalCopyObjectRequestParameters( |
| ObjectMetadata srcom, |
| CopyObjectRequest copyObjectRequest) { |
| String sourceKMSId = srcom.getSSEAwsKmsKeyId(); |
| if (isNotEmpty(sourceKMSId)) { |
| // source KMS ID is propagated |
| LOG.debug("Propagating SSE-KMS settings from source {}", |
| sourceKMSId); |
| copyObjectRequest.setSSEAwsKeyManagementParams( |
| new SSEAwsKeyManagementParams(sourceKMSId)); |
| } |
| switch (getServerSideEncryptionAlgorithm()) { |
| // Overriding with client encryption settings. |
| case SSE_C: |
| generateSSECustomerKey().ifPresent(customerKey -> { |
| copyObjectRequest.setSourceSSECustomerKey(customerKey); |
| copyObjectRequest.setDestinationSSECustomerKey(customerKey); |
| }); |
| break; |
| case SSE_KMS: |
| generateSSEAwsKeyParams().ifPresent( |
| copyObjectRequest::setSSEAwsKeyManagementParams); |
| break; |
| default: |
| } |
| } |
| |
| /** |
| * Set the optional parameters when initiating the request (encryption, |
| * headers, storage, etc). |
| * @param request request to patch. |
| */ |
| protected void setOptionalMultipartUploadRequestParameters( |
| InitiateMultipartUploadRequest request) { |
| generateSSEAwsKeyParams().ifPresent(request::setSSEAwsKeyManagementParams); |
| generateSSECustomerKey().ifPresent(request::setSSECustomerKey); |
| } |
| |
| /** |
| * Sets server side encryption parameters to the part upload |
| * request when encryption is enabled. |
| * @param request upload part request |
| */ |
| protected void setOptionalUploadPartRequestParameters( |
| UploadPartRequest request) { |
| generateSSECustomerKey().ifPresent(request::setSSECustomerKey); |
| } |
| |
| /** |
| * Initiate a multipart upload from the preconfigured request. |
| * Retry policy: none + untranslated. |
| * @param request request to initiate |
| * @return the result of the call |
| * @throws AmazonClientException on failures inside the AWS SDK |
| * @throws IOException Other IO problems |
| */ |
| @Retries.OnceRaw |
| InitiateMultipartUploadResult initiateMultipartUpload( |
| InitiateMultipartUploadRequest request) throws IOException { |
| LOG.debug("Initiate multipart upload to {}", request.getKey()); |
| incrementStatistic(OBJECT_MULTIPART_UPLOAD_INITIATED); |
| return getAmazonS3Client().initiateMultipartUpload(request); |
| } |
| |
| private void setOptionalPutRequestParameters(PutObjectRequest request) { |
| generateSSEAwsKeyParams().ifPresent(request::setSSEAwsKeyManagementParams); |
| generateSSECustomerKey().ifPresent(request::setSSECustomerKey); |
| } |
| |
| private void setOptionalObjectMetadata(ObjectMetadata metadata) { |
| final S3AEncryptionMethods algorithm |
| = getServerSideEncryptionAlgorithm(); |
| if (S3AEncryptionMethods.SSE_S3.equals(algorithm)) { |
| metadata.setSSEAlgorithm(algorithm.getMethod()); |
| } |
| } |
| |
| /** |
| * Create the AWS SDK structure used to configure SSE, |
| * if the encryption secrets contain the information/settings for this. |
| * @return an optional set of KMS Key settings |
| */ |
| private Optional<SSEAwsKeyManagementParams> generateSSEAwsKeyParams() { |
| return EncryptionSecretOperations.createSSEAwsKeyManagementParams( |
| encryptionSecrets); |
| } |
| |
| /** |
| * Create the SSE-C structure for the AWS SDK, if the encryption secrets |
| * contain the information/settings for this. |
| * This will contain a secret extracted from the bucket/configuration. |
| * @return an optional customer key. |
| */ |
| private Optional<SSECustomerKey> generateSSECustomerKey() { |
| return EncryptionSecretOperations.createSSECustomerKey( |
| encryptionSecrets); |
| } |
| |
| /** |
| * Perform post-write actions. |
| * <p></p> |
| * This operation MUST be called after any PUT/multipart PUT completes |
| * successfully. |
| * <p></p> |
| * The actions include: |
| * <ol> |
| * <li> |
| * Calling |
| * {@link #deleteUnnecessaryFakeDirectories(Path, BulkOperationState)} |
| * if directory markers are not being retained. |
| * </li> |
| * <li> |
| * Updating any metadata store with details on the newly created |
| * object. |
| * </li> |
| * </ol> |
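| * <p> |
| * A sketch of a typical call after a successful single PUT, |
| * assuming {@code result} holds the PUT response |
| * (illustrative only): |
| * <pre>{@code |
| * fs.finishedWrite(key, length, result.getETag(), |
| *     result.getVersionId(), null); |
| * }</pre> |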
| * @param key key written to |
| * @param length total length of file written |
| * @param eTag eTag of the written object |
| * @param versionId S3 object versionId of the written object |
| * @param operationState state of any ongoing bulk operation. |
| * @throws MetadataPersistenceException if metadata about the write could |
| * not be saved to the metadata store and |
| * fs.s3a.metadatastore.fail.on.write.error=true |
| */ |
| @InterfaceAudience.Private |
| @Retries.RetryTranslated("Except if failOnMetadataWriteError=false, in which" |
| + " case RetryExceptionsSwallowed") |
| void finishedWrite(String key, long length, String eTag, String versionId, |
| @Nullable final BulkOperationState operationState) |
| throws MetadataPersistenceException { |
| LOG.debug("Finished write to {}, len {}. etag {}, version {}", |
| key, length, eTag, versionId); |
| Path p = keyToQualifiedPath(key); |
| Preconditions.checkArgument(length >= 0, "content length is negative"); |
| final boolean isDir = objectRepresentsDirectory(key, length); |
| // kick off an async delete |
| CompletableFuture<?> deletion; |
| if (!keepDirectoryMarkers(p)) { |
| deletion = submit( |
| unboundedThreadPool, |
| () -> { |
| deleteUnnecessaryFakeDirectories( |
| p.getParent(), |
| operationState); |
| return null; |
| }); |
| } else { |
| deletion = null; |
| } |
| // this is only set if there is a metastore to update and the |
| // operationState parameter passed in was null. |
| BulkOperationState stateToClose = null; |
| |
| // See note about failure semantics in S3Guard documentation |
| try { |
| if (hasMetadataStore()) { |
| BulkOperationState activeState = operationState; |
| if (activeState == null) { |
| // create an operation state if there was none, so that the |
| // information gleaned from addAncestors is preserved into the |
| // subsequent put. |
| stateToClose = S3Guard.initiateBulkWrite(metadataStore, |
| isDir |
| ? BulkOperationState.OperationType.Mkdir |
| : BulkOperationState.OperationType.Put, |
| keyToPath(key)); |
| activeState = stateToClose; |
| } |
| S3Guard.addAncestors(metadataStore, p, ttlTimeProvider, activeState); |
| S3AFileStatus status = createUploadFileStatus(p, |
| isDir, length, |
| getDefaultBlockSize(p), username, eTag, versionId); |
| boolean authoritative = false; |
| if (isDir) { |
| // this is a directory marker so put it as such. |
| status.setIsEmptyDirectory(Tristate.TRUE); |
| // and maybe mark as auth |
| authoritative = allowAuthoritative(p); |
| } |
| if (!authoritative) { |
| // for files and non-auth directories |
| S3Guard.putAndReturn(metadataStore, status, |
| ttlTimeProvider, |
| activeState); |
| } else { |
| // authoritative directory |
| S3Guard.putAuthDirectoryMarker(metadataStore, status, |
| ttlTimeProvider, |
| activeState); |
| } |
| } |
| // and catch up with any delete operation. |
| waitForCompletionIgnoringExceptions(deletion); |
| } catch (IOException e) { |
| if (failOnMetadataWriteError) { |
| throw new MetadataPersistenceException(p.toString(), e); |
| } else { |
| LOG.error("S3Guard: Error updating MetadataStore for write to {}", |
| p, e); |
| } |
| instrumentation.errorIgnored(); |
| } finally { |
| // if a new operation state was created, close it. |
| IOUtils.cleanupWithLogger(LOG, stateToClose); |
| } |
| } |
| |
| /** |
| * Should we keep directory markers under the path being created |
| * by mkdir/file creation/rename? |
| * @param path path to probe |
| * @return true if the markers MAY be retained, |
| * false if they MUST be deleted |
| */ |
| private boolean keepDirectoryMarkers(Path path) { |
| return directoryPolicy.keepDirectoryMarkers(path); |
| } |
| |
| /** |
| * Delete mock parent directories which are no longer needed. |
| * Retry policy: retrying; exceptions swallowed. |
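| * For example: after a file is written to {@code a/b/c}, the fake |
| * directory markers {@code a/b/} and {@code a/} are no longer needed |
| * and are queued for removal in a single bulk delete. |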
| * @param path path |
| * @param operationState (nullable) operational state for a bulk update |
| */ |
| @Retries.RetryExceptionsSwallowed |
| private void deleteUnnecessaryFakeDirectories(Path path, |
| final BulkOperationState operationState) { |
| List<DeleteObjectsRequest.KeyVersion> keysToRemove = new ArrayList<>(); |
| while (!path.isRoot()) { |
| String key = pathToKey(path); |
| key = (key.endsWith("/")) ? key : (key + "/"); |
| LOG.trace("To delete unnecessary fake directory {} for {}", key, path); |
| keysToRemove.add(new DeleteObjectsRequest.KeyVersion(key)); |
| path = path.getParent(); |
| } |
| try { |
| removeKeys(keysToRemove, true, operationState); |
| } catch(AmazonClientException | IOException e) { |
| instrumentation.errorIgnored(); |
| if (LOG.isDebugEnabled()) { |
| StringBuilder sb = new StringBuilder(); |
| for(DeleteObjectsRequest.KeyVersion kv : keysToRemove) { |
| sb.append(kv.getKey()).append(","); |
| } |
| LOG.debug("While deleting keys {} ", sb.toString(), e); |
| } |
| } |
| } |
| |
| /** |
| * Create a fake directory, always ending in "/". |
| * Retry policy: retrying; translated. |
| * @param objectName name of directory object. |
| * @throws IOException IO failure |
| */ |
| @Retries.RetryTranslated |
| private void createFakeDirectory(final String objectName) |
| throws IOException { |
| if (!objectName.endsWith("/")) { |
| createEmptyObject(objectName + "/"); |
| } else { |
| createEmptyObject(objectName); |
| } |
| } |
| |
| /** |
| * Used to create an empty file that represents an empty directory. |
| * Retry policy: retrying; translated. |
| * @param objectName object to create |
| * @throws IOException IO failure |
| */ |
| @Retries.RetryTranslated |
| private void createEmptyObject(final String objectName) |
| throws IOException { |
| final InputStream im = new InputStream() { |
| @Override |
| public int read() throws IOException { |
| return -1; |
| } |
| }; |
| |
| PutObjectRequest putObjectRequest = newPutObjectRequest(objectName, |
| newObjectMetadata(0L), |
| im); |
| invoker.retry("PUT 0-byte object ", objectName, |
| true, |
| () -> putObjectDirect(putObjectRequest)); |
| incrementPutProgressStatistics(objectName, 0); |
| instrumentation.directoryCreated(); |
| } |
| |
| /** |
| * Creates a copy of the passed {@link ObjectMetadata}. |
| * Does so without using the {@link ObjectMetadata#clone()} method, |
| * to avoid copying unnecessary headers. |
| * @param source the {@link ObjectMetadata} to copy |
| * @return a copy of {@link ObjectMetadata} with only relevant attributes |
| */ |
| private ObjectMetadata cloneObjectMetadata(ObjectMetadata source) { |
| // This approach may be too brittle, especially if |
| // in future there are new attributes added to ObjectMetadata |
| // that we do not explicitly set here. |
| ObjectMetadata ret = newObjectMetadata(source.getContentLength()); |
| |
| // Possibly null attributes |
| // Allowing nulls to pass breaks it during later use |
| if (source.getCacheControl() != null) { |
| ret.setCacheControl(source.getCacheControl()); |
| } |
| if (source.getContentDisposition() != null) { |
| ret.setContentDisposition(source.getContentDisposition()); |
| } |
| if (source.getContentEncoding() != null) { |
| ret.setContentEncoding(source.getContentEncoding()); |
| } |
| if (source.getContentMD5() != null) { |
| ret.setContentMD5(source.getContentMD5()); |
| } |
| if (source.getContentType() != null) { |
| ret.setContentType(source.getContentType()); |
| } |
| if (source.getExpirationTime() != null) { |
| ret.setExpirationTime(source.getExpirationTime()); |
| } |
| if (source.getExpirationTimeRuleId() != null) { |
| ret.setExpirationTimeRuleId(source.getExpirationTimeRuleId()); |
| } |
| if (source.getHttpExpiresDate() != null) { |
| ret.setHttpExpiresDate(source.getHttpExpiresDate()); |
| } |
| if (source.getLastModified() != null) { |
| ret.setLastModified(source.getLastModified()); |
| } |
| if (source.getOngoingRestore() != null) { |
| ret.setOngoingRestore(source.getOngoingRestore()); |
| } |
| if (source.getRestoreExpirationTime() != null) { |
| ret.setRestoreExpirationTime(source.getRestoreExpirationTime()); |
| } |
| if (source.getSSEAlgorithm() != null) { |
| ret.setSSEAlgorithm(source.getSSEAlgorithm()); |
| } |
| if (source.getSSECustomerAlgorithm() != null) { |
| ret.setSSECustomerAlgorithm(source.getSSECustomerAlgorithm()); |
| } |
| if (source.getSSECustomerKeyMd5() != null) { |
| ret.setSSECustomerKeyMd5(source.getSSECustomerKeyMd5()); |
| } |
| |
| for (Map.Entry<String, String> e : source.getUserMetadata().entrySet()) { |
| ret.addUserMetadata(e.getKey(), e.getValue()); |
| } |
| return ret; |
| } |
| |
| /** |
| * Return the number of bytes that large input files should |
| * optimally be split into to minimize I/O time. |
| * @deprecated use {@link #getDefaultBlockSize(Path)} instead |
| */ |
| @Deprecated |
| public long getDefaultBlockSize() { |
| return getConf().getLongBytes(FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE); |
| } |
| |
| /** |
| * Get the directory marker policy of this filesystem. |
| * @return the marker policy. |
| */ |
| public DirectoryPolicy getDirectoryMarkerPolicy() { |
| return directoryPolicy; |
| } |
| |
| @Override |
| public String toString() { |
| final StringBuilder sb = new StringBuilder( |
| "S3AFileSystem{"); |
| sb.append("uri=").append(uri); |
| sb.append(", workingDir=").append(workingDir); |
| sb.append(", inputPolicy=").append(inputPolicy); |
| sb.append(", partSize=").append(partSize); |
| sb.append(", enableMultiObjectsDelete=").append(enableMultiObjectsDelete); |
| sb.append(", maxKeys=").append(maxKeys); |
| if (cannedACL != null) { |
| sb.append(", cannedACL=").append(cannedACL.toString()); |
| } |
| sb.append(", readAhead=").append(readAhead); |
| if (getConf() != null) { |
| sb.append(", blockSize=").append(getDefaultBlockSize()); |
| } |
| sb.append(", multiPartThreshold=").append(multiPartThreshold); |
| if (getServerSideEncryptionAlgorithm() != null) { |
| sb.append(", serverSideEncryptionAlgorithm='") |
| .append(getServerSideEncryptionAlgorithm()) |
| .append('\''); |
| } |
| if (blockFactory != null) { |
| sb.append(", blockFactory=").append(blockFactory); |
| } |
| sb.append(", metastore=").append(metadataStore); |
| sb.append(", authoritativeStore=").append(allowAuthoritativeMetadataStore); |
| sb.append(", authoritativePath=").append(allowAuthoritativePaths); |
| sb.append(", useListV1=").append(useListV1); |
| if (committerIntegration != null) { |
| sb.append(", magicCommitter=").append(isMagicCommitEnabled()); |
| } |
| sb.append(", boundedExecutor=").append(boundedThreadPool); |
| sb.append(", unboundedExecutor=").append(unboundedThreadPool); |
| sb.append(", credentials=").append(credentials); |
| sb.append(", delegation tokens=") |
| .append(delegationTokens.map(Objects::toString).orElse("disabled")); |
| sb.append(", ").append(directoryPolicy); |
| sb.append(", statistics {") |
| .append(statistics) |
| .append("}"); |
| if (instrumentation != null) { |
| sb.append(", metrics {") |
| .append(instrumentation.dump("{", "=", "} ", true)) |
| .append("}"); |
| } |
| sb.append('}'); |
| return sb.toString(); |
| } |
| |
| /** |
| * Get the partition size for multipart operations. |
| * @return the value as set during initialization |
| */ |
| public long getPartitionSize() { |
| return partSize; |
| } |
| |
| /** |
| * Get the threshold for multipart files. |
| * @return the value as set during initialization |
| */ |
| public long getMultiPartThreshold() { |
| return multiPartThreshold; |
| } |
| |
| /** |
| * Get the maximum key count. |
| * @return a value, valid after initialization |
| */ |
| int getMaxKeys() { |
| return maxKeys; |
| } |
| |
| /** |
| * Is magic commit enabled? |
| * @return true if magic commit support is turned on. |
| */ |
| public boolean isMagicCommitEnabled() { |
| return committerIntegration.isMagicCommitEnabled(); |
| } |
| |
| /** |
| * Predicate: is a path a magic commit path? |
| * True if magic commit is enabled and the path qualifies as special. |
| * @param path path to examine |
| * @return true if the path is or is under a magic directory |
| */ |
| public boolean isMagicCommitPath(Path path) { |
| return committerIntegration.isMagicCommitPath(path); |
| } |
| |
| /** |
| * Increments the statistic {@link Statistic#INVOCATION_GLOB_STATUS}. |
| * Override superclass so as to disable symlink resolution as symlinks |
| * are not supported by S3A. |
| * {@inheritDoc} |
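| * <p> |
| * For example (pattern illustrative only): |
| * <pre>{@code |
| * FileStatus[] logs = fs.globStatus( |
| *     new Path("s3a://bucket/logs/2020-*/*.gz")); |
| * }</pre> |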
| */ |
| @Override |
| public FileStatus[] globStatus(Path pathPattern) throws IOException { |
| return globStatus(pathPattern, ACCEPT_ALL); |
| } |
| |
| /** |
| * Increments the statistic {@link Statistic#INVOCATION_GLOB_STATUS}. |
| * Override superclass so as to disable symlink resolution as symlinks |
| * are not supported by S3A. |
| * {@inheritDoc} |
| */ |
| @Override |
| public FileStatus[] globStatus( |
| final Path pathPattern, |
| final PathFilter filter) |
| throws IOException { |
| entryPoint(INVOCATION_GLOB_STATUS); |
| return Globber.createGlobber(this) |
| .withPathPattern(pathPattern) |
| .withPathFilter(filter) |
| .withResolveSymlinks(false) |
| .build() |
| .glob(); |
| } |
| |
| /** |
| * Override superclass so as to add statistic collection. |
| * {@inheritDoc} |
| */ |
| @Override |
| public boolean exists(Path f) throws IOException { |
| entryPoint(INVOCATION_EXISTS); |
| return super.exists(f); |
| } |
| |
| /** |
| * Optimized probe for a path referencing a dir. |
| * Even though it is optimized to a single HEAD, applications |
| * should not over-use this method; such over-use is all too common. |
| * {@inheritDoc} |
| */ |
| @Override |
| @SuppressWarnings("deprecation") |
| public boolean isDirectory(Path f) throws IOException { |
| entryPoint(INVOCATION_IS_DIRECTORY); |
| try { |
| return innerGetFileStatus(f, false, StatusProbeEnum.DIRECTORIES) |
| .isDirectory(); |
| } catch (FileNotFoundException e) { |
| // not found or it is a file. |
| return false; |
| } |
| } |
| |
| /** |
| * Optimized probe for a path referencing a file. |
| * Even though it is optimized to a single HEAD, applications |
| * should not over-use this method; such over-use is all too common. |
| * {@inheritDoc} |
| */ |
| @Override |
| @SuppressWarnings("deprecation") |
| public boolean isFile(Path f) throws IOException { |
| entryPoint(INVOCATION_IS_FILE); |
| try { |
| return innerGetFileStatus(f, false, StatusProbeEnum.HEAD_ONLY) |
| .isFile(); |
| } catch (FileNotFoundException e) { |
| // not found or it is a dir. |
| return false; |
| } |
| } |
| |
| /** |
| * When enabled, get the etag of an object at the path via a HEAD |
| * request, and return it as a checksum object. |
| * <ol> |
| * <li>If a tag has not changed, consider the object unchanged.</li> |
| * <li>Two tags being different does not imply the data is different.</li> |
| * </ol> |
| * Different S3 implementations may offer different guarantees. |
| * |
| * This check is (currently) only made if |
| * {@link Constants#ETAG_CHECKSUM_ENABLED} is set; turning it on |
| * has caused problems with Distcp (HADOOP-15273). |
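| * <p> |
| * A sketch of use, assuming etag checksums have been enabled |
| * (illustrative only): |
| * <pre>{@code |
| * EtagChecksum checksum = fs.getFileChecksum(path, |
| *     fs.getFileStatus(path).getLen()); |
| * }</pre> |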
| * |
| * @param f The file path |
| * @param length The length of the file range for checksum calculation |
| * @return The EtagChecksum or null if checksums are not enabled or supported. |
| * @throws IOException IO failure |
| * @see <a href="http://docs.aws.amazon.com/AmazonS3/latest/API/RESTCommonResponseHeaders.html">Common Response Headers</a> |
| */ |
| @Override |
| @Retries.RetryTranslated |
| public EtagChecksum getFileChecksum(Path f, final long length) |
| throws IOException { |
| Preconditions.checkArgument(length >= 0); |
| entryPoint(INVOCATION_GET_FILE_CHECKSUM); |
| |
| if (getConf().getBoolean(ETAG_CHECKSUM_ENABLED, |
| ETAG_CHECKSUM_ENABLED_DEFAULT)) { |
| Path path = qualify(f); |
| LOG.debug("getFileChecksum({})", path); |
| ObjectMetadata headers = getObjectMetadata(path); |
| String eTag = headers.getETag(); |
| return eTag != null ? new EtagChecksum(eTag) : null; |
| } else { |
| // disabled |
| return null; |
| } |
| } |
| |
| /** |
| * {@inheritDoc}. |
| * |
| * This implementation is optimized for S3, which can do a bulk listing |
| * of all entries under a path in a single operation. Thus there is |
| * no need to recursively walk the directory tree. |
| * |
| * Instead a {@link ListObjectsRequest} is created requesting a (windowed) |
| * listing of all entries under the given path. This is used to construct |
| * an {@code ObjectListingIterator} instance, iteratively returning the |
| * sequence of lists of elements under the path. This is then iterated |
| * over in a {@code FileStatusListingIterator}, which generates |
| * {@link S3AFileStatus} instances, one per listing entry. |
| * These are then translated into {@link LocatedFileStatus} instances. |
| * |
| * This is essentially a nested and wrapped set of iterators, with some |
| * generator classes. |
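| * <p> |
| * A sketch of a recursive scan, assuming {@code dir} exists |
| * (illustrative only): |
| * <pre>{@code |
| * RemoteIterator<LocatedFileStatus> it = fs.listFiles(dir, true); |
| * while (it.hasNext()) { |
| *   LocatedFileStatus status = it.next(); |
| *   // process the status |
| * } |
| * }</pre> |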
| * @param f a path |
| * @param recursive if the subdirectories need to be traversed recursively |
| * |
| * @return an iterator that traverses statuses of the files/directories |
| * in the given path |
| * @throws FileNotFoundException if {@code path} does not exist |
| * @throws IOException if any I/O error occurred |
| */ |
| @Override |
| @Retries.RetryTranslated |
| public RemoteIterator<LocatedFileStatus> listFiles(Path f, |
| boolean recursive) throws FileNotFoundException, IOException { |
| return toLocatedFileStatusIterator(innerListFiles(f, recursive, |
| new Listing.AcceptFilesOnly(qualify(f)), null, true, false)); |
| } |
| |
| private static RemoteIterator<LocatedFileStatus> toLocatedFileStatusIterator( |
| RemoteIterator<? extends LocatedFileStatus> iterator) { |
| return new RemoteIterator<LocatedFileStatus>() { |
| @Override |
| public boolean hasNext() throws IOException { |
| return iterator.hasNext(); |
| } |
| |
| @Override |
| public LocatedFileStatus next() throws IOException { |
| return iterator.next(); |
| } |
| }; |
| } |
| |
| /** |
| * Recursive List of files and empty directories. |
| * @param f path to list from |
| * @return an iterator. |
| * @throws IOException failure |
| */ |
| @Retries.RetryTranslated |
| public RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectories( |
| Path f, boolean recursive) throws IOException { |
| return innerListFiles(f, recursive, Listing.ACCEPT_ALL_BUT_S3N, |
| null, true, false); |
| } |
| |
| /** |
| * Recursive List of files and empty directories, force metadatastore |
| * to act like it is non-authoritative. |
| * @param f path to list from |
| * @param recursive recursive listing? |
| * @return an iterator. |
| * @throws IOException failure |
| */ |
| @Retries.RetryTranslated |
| public RemoteIterator<S3ALocatedFileStatus> listFilesAndEmptyDirectoriesForceNonAuth( |
| Path f, boolean recursive) throws IOException { |
| return innerListFiles(f, recursive, Listing.ACCEPT_ALL_BUT_S3N, |
| null, true, true); |
| } |
| |
| /** |
| * List files under the path. |
| * <ol> |
| * <li> |
| * If the path is authoritative on the client, |
| * only S3Guard will be queried. |
| * </li> |
| * <li> |
| * Otherwise, the S3Guard values are returned first, then the S3 |
| * entries will be retrieved and returned if not already listed.</li> |
| * <li> |
| * when {@code collectTombstones} is true, S3Guard tombstones will |
| * be used to filter out deleted files. |
| * They MUST be used for normal listings; it is only for |
| * deletion and low-level operations that they MAY be bypassed. |
| * </li> |
| * <li> |
| * The optional {@code status} parameter will be used to skip the |
| * initial getFileStatus call. |
| * </li> |
| * </ol> |
| * |
| * In case of recursive listing, if any of the directories reachable from |
| * the path are not authoritative on the client, this method will query S3 |
| * for all the directories in the listing in addition to returning S3Guard |
| * entries. |
| * |
| * @param f path |
| * @param recursive recursive listing? |
| * @param acceptor file status filter |
| * @param status optional status of path to list. |
| * @param collectTombstones should tombstones be collected from S3Guard? |
| * @param forceNonAuthoritativeMS forces metadata store to act like non |
| * authoritative. This is useful when |
| * listFiles output is used by import tool. |
| * @return an iterator over the listing. |
| * @throws IOException failure |
| */ |
| @Retries.RetryTranslated |
| private RemoteIterator<S3ALocatedFileStatus> innerListFiles( |
| final Path f, |
| final boolean recursive, |
| final Listing.FileStatusAcceptor acceptor, |
| final S3AFileStatus status, |
| final boolean collectTombstones, |
| final boolean forceNonAuthoritativeMS) throws IOException { |
| entryPoint(INVOCATION_LIST_FILES); |
| Path path = qualify(f); |
| LOG.debug("listFiles({}, {})", path, recursive); |
| try { |
| // if a status was given and it is a file. |
| if (status != null && status.isFile()) { |
| // simple case: File |
| LOG.debug("Path is a file: {}", path); |
| return new Listing.SingleStatusRemoteIterator( |
| toLocatedFileStatus(status)); |
| } |
| // Assuming the path to be a directory |
| // do a bulk operation. |
| RemoteIterator<S3ALocatedFileStatus> listFilesAssumingDir = |
| listing.getListFilesAssumingDir(path, |
| recursive, |
| acceptor, |
| collectTombstones, |
| forceNonAuthoritativeMS); |
| // If there are no list entries present, we |
| // fall back to a file existence check, as the path |
| // can be a file or an empty directory. |
| if (!listFilesAssumingDir.hasNext()) { |
| // If file status was already passed, reuse it. |
| final S3AFileStatus fileStatus = status != null |
| ? status |
| : (S3AFileStatus) getFileStatus(path); |
| if (fileStatus.isFile()) { |
| return new Listing.SingleStatusRemoteIterator( |
| toLocatedFileStatus(fileStatus)); |
| } |
| } |
| // If we have reached here, it means either there are files |
| // in this directory or it is empty. |
| return listFilesAssumingDir; |
| } catch (AmazonClientException e) { |
| throw translateException("listFiles", path, e); |
| } |
| } |
| |
| /** |
| * Override superclass so as to add statistic collection. |
| * {@inheritDoc} |
| */ |
| @Override |
| public RemoteIterator<LocatedFileStatus> listLocatedStatus(Path f) |
| throws FileNotFoundException, IOException { |
| return listLocatedStatus(f, ACCEPT_ALL); |
| } |
| |
| /** |
| * {@inheritDoc}. |
| * |
| * S3 Optimized directory listing. The initial operation performs the |
| * first bulk listing; extra listings will take place |
| * when all the current set of results are used up. |
| * @param f a path |
| * @param filter a path filter |
| * @return an iterator that traverses statuses of the files/directories |
| * in the given path |
| * @throws FileNotFoundException if {@code path} does not exist |
| * @throws IOException if any I/O error occurred |
| */ |
| @Override |
| @Retries.OnceTranslated("s3guard not retrying") |
| public RemoteIterator<LocatedFileStatus> listLocatedStatus(final Path f, |
| final PathFilter filter) |
| throws FileNotFoundException, IOException { |
| entryPoint(INVOCATION_LIST_LOCATED_STATUS); |
| Path path = qualify(f); |
| LOG.debug("listLocatedStatus({}, {})", path, filter); |
| RemoteIterator<? extends LocatedFileStatus> iterator = |
| once("listLocatedStatus", path.toString(), |
| () -> { |
| // Assuming the path to be a directory, |
| // trigger a list call directly. |
| final RemoteIterator<S3ALocatedFileStatus> |
| locatedFileStatusIteratorForDir = |
| listing.getLocatedFileStatusIteratorForDir(path, filter); |
| |
| // If no listing is present then path might be a file. |
| if (!locatedFileStatusIteratorForDir.hasNext()) { |
| final S3AFileStatus fileStatus = |
| (S3AFileStatus) getFileStatus(path); |
| if (fileStatus.isFile()) { |
| // simple case: File |
| LOG.debug("Path is a file"); |
| return new Listing.SingleStatusRemoteIterator( |
| filter.accept(path) |
| ? toLocatedFileStatus(fileStatus) |
| : null); |
| } |
| } |
| // Either empty or non-empty directory. |
| return locatedFileStatusIteratorForDir; |
| }); |
| return toLocatedFileStatusIterator(iterator); |
| } |
| |
| /** |
| * Generate list located status for a directory. |
| * Also performing tombstone reconciliation for guarded directories. |
| * @param dir directory to check. |
| * @param filter a path filter. |
| * @return an iterator that traverses statuses of the given dir. |
| * @throws IOException in case of failure. |
| */ |
| private RemoteIterator<S3ALocatedFileStatus> getLocatedFileStatusIteratorForDir( |
| Path dir, PathFilter filter) throws IOException { |
| final String key = maybeAddTrailingSlash(pathToKey(dir)); |
| final Listing.FileStatusAcceptor acceptor = |
| new Listing.AcceptAllButSelfAndS3nDirs(dir); |
| boolean allowAuthoritative = allowAuthoritative(dir); |
| DirListingMetadata meta = |
| S3Guard.listChildrenWithTtl(metadataStore, dir, |
| ttlTimeProvider, allowAuthoritative); |
| Set<Path> tombstones = meta != null |
| ? meta.listTombstones() |
| : null; |
| final RemoteIterator<S3AFileStatus> cachedFileStatusIterator = |
| listing.createProvidedFileStatusIterator( |
| dirMetaToStatuses(meta), filter, acceptor); |
| return (allowAuthoritative && meta != null |
| && meta.isAuthoritative()) |
| ? listing.createLocatedFileStatusIterator( |
| cachedFileStatusIterator) |
| : listing.createTombstoneReconcilingIterator( |
| listing.createLocatedFileStatusIterator( |
| listing.createFileStatusListingIterator(dir, |
| createListObjectsRequest(key, "/"), |
| filter, |
| acceptor, |
| cachedFileStatusIterator)), |
| tombstones); |
| } |
| |
| /** |
| * Build a {@link S3ALocatedFileStatus} from a {@link FileStatus} instance. |
| * @param status file status |
| * @return a located status with block locations set up from this FS. |
| * @throws IOException IO Problems. |
| */ |
| S3ALocatedFileStatus toLocatedFileStatus(S3AFileStatus status) |
| throws IOException { |
| return new S3ALocatedFileStatus(status, |
| status.isFile() ? |
| getFileBlockLocations(status, 0, status.getLen()) |
| : null); |
| } |
| |
| /** |
| * List any pending multipart uploads whose keys begin with prefix, using |
| * an iterator that can handle an unlimited number of entries. |
| * See {@link #listMultipartUploads(String)} for a non-iterator version of |
| * this. |
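| * <p> |
| * A sketch of iterating through pending uploads (prefix illustrative |
| * only): |
| * <pre>{@code |
| * MultipartUtils.UploadIterator it = fs.listUploads("tmp/"); |
| * while (it.hasNext()) { |
| *   MultipartUpload upload = it.next(); |
| *   // examine or abort the upload |
| * } |
| * }</pre> |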
| * |
| * @param prefix optional key prefix to search |
| * @return Iterator over multipart uploads. |
| * @throws IOException on failure |
| */ |
| public MultipartUtils.UploadIterator listUploads(@Nullable String prefix) |
| throws IOException { |
| return MultipartUtils.listMultipartUploads(s3, invoker, bucket, maxKeys, |
| prefix); |
| } |
| |
| /** |
| * Listing all multipart uploads; limited to the first few hundred. |
| * See {@link #listUploads(String)} for an iterator-based version that does |
| * not limit the number of entries returned. |
| * Retry policy: retry, translated. |
| * @return a listing of multipart uploads. |
| * @param prefix prefix to scan for, "" for none |
| * @throws IOException IO failure, including any uprated AmazonClientException |
| */ |
| @InterfaceAudience.Private |
| @Retries.RetryTranslated |
| public List<MultipartUpload> listMultipartUploads(String prefix) |
| throws IOException { |
| ListMultipartUploadsRequest request = new ListMultipartUploadsRequest( |
| bucket); |
| if (!prefix.isEmpty()) { |
| if (!prefix.endsWith("/")) { |
| prefix = prefix + "/"; |
| } |
| request.setPrefix(prefix); |
| } |
| |
| return invoker.retry("listMultipartUploads", prefix, true, |
| () -> s3.listMultipartUploads(request).getMultipartUploads()); |
| } |
| |
| /** |
| * Abort a multipart upload. |
| * Retry policy: none. |
| * @param destKey destination key |
| * @param uploadId Upload ID |
| */ |
| @Retries.OnceRaw |
| void abortMultipartUpload(String destKey, String uploadId) { |
| LOG.debug("Aborting multipart upload {} to {}", uploadId, destKey); |
| getAmazonS3Client().abortMultipartUpload( |
| new AbortMultipartUploadRequest(getBucket(), |
| destKey, |
| uploadId)); |
| } |
| |
| /** |
| * Abort a multipart upload. |
| * Retry policy: none. |
| * @param upload the listed upload to abort. |
| */ |
| @Retries.OnceRaw |
| void abortMultipartUpload(MultipartUpload upload) { |
| String destKey = upload.getKey(); |
| String uploadId = upload.getUploadId(); |
| if (LOG.isDebugEnabled()) { |
| DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); |
| LOG.debug("Aborting multipart upload {} to {} initiated by {} on {}", |
| uploadId, destKey, upload.getInitiator(), |
| df.format(upload.getInitiated())); |
| } |
| getAmazonS3Client().abortMultipartUpload( |
| new AbortMultipartUploadRequest(getBucket(), |
| destKey, |
| uploadId)); |
| } |
| |
| /** |
| * Create a new instance of the committer statistics. |
| * @return a new committer statistics instance |
| */ |
| public S3AInstrumentation.CommitterStatistics newCommitterStatistics() { |
| return instrumentation.newCommitterStatistics(); |
| } |
| |
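| /** |
| * {@inheritDoc} |
| * <p> |
| * A sketch of probing for the magic committer capability |
| * (illustrative only): |
| * <pre>{@code |
| * boolean magic = fs.hasPathCapability(path, |
| *     CommitConstants.STORE_CAPABILITY_MAGIC_COMMITTER); |
| * }</pre> |
| */ |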
| @SuppressWarnings("deprecation") |
| @Override |
| public boolean hasPathCapability(final Path path, final String capability) |
| throws IOException { |
| final Path p = makeQualified(path); |
| String cap = validatePathCapabilityArgs(p, capability); |
| switch (cap) { |
| |
| case CommitConstants.STORE_CAPABILITY_MAGIC_COMMITTER: |
| case CommitConstants.STORE_CAPABILITY_MAGIC_COMMITTER_OLD: |
| // capability depends on FS configuration |
| return isMagicCommitEnabled(); |
| |
| case SelectConstants.S3_SELECT_CAPABILITY: |
| // select is only supported if enabled |
| return selectBinding.isEnabled(); |
| |
| case CommonPathCapabilities.FS_CHECKSUMS: |
| // capability depends on FS configuration |
| return getConf().getBoolean(ETAG_CHECKSUM_ENABLED, |
| ETAG_CHECKSUM_ENABLED_DEFAULT); |
| |
| case CommonPathCapabilities.FS_MULTIPART_UPLOADER: |
| return true; |
| |
| // this client is safe to use with buckets |
| // containing directory markers anywhere in |
| // the hierarchy |
| case STORE_CAPABILITY_DIRECTORY_MARKER_AWARE: |
| return true; |
| |
| /* |
| * Marker policy capabilities are handed off. |
| */ |
| case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_KEEP: |
| case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_DELETE: |
| case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_AUTHORITATIVE: |
| case STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_KEEP: |
| case STORE_CAPABILITY_DIRECTORY_MARKER_ACTION_DELETE: |
| return getDirectoryMarkerPolicy().hasPathCapability(path, cap); |
| |
| default: |
| return super.hasPathCapability(p, cap); |
| } |
| } |
| |
| /** |
| * Return the capabilities of this filesystem instance. |
| * |
| * This has been supplanted by {@link #hasPathCapability(Path, String)}. |
| * @param capability string to query the stream support for. |
| * @return whether the FS instance has the capability. |
| */ |
| @Deprecated |
| @Override |
| public boolean hasCapability(String capability) { |
| try { |
| return hasPathCapability(new Path("/"), capability); |
| } catch (IOException ex) { |
| // should never happen, so log and downgrade. |
| LOG.debug("Ignoring exception on hasCapability({})", capability, ex); |
| return false; |
| } |
| } |
| |
| /** |
| * Get a shared copy of the AWS credentials, with its reference |
| * counter updated. |
| * Caller is required to call {@code close()} on this after |
| * they have finished using it. |
| * @param purpose what is this for? This is initially for logging |
| * @return a reference to shared credentials. |
| */ |
| public AWSCredentialProviderList shareCredentials(final String purpose) { |
| LOG.debug("Sharing credentials for: {}", purpose); |
| return credentials.share(); |
| } |
| |
| @VisibleForTesting |
| public ITtlTimeProvider getTtlTimeProvider() { |
| return ttlTimeProvider; |
| } |
| |
| @VisibleForTesting |
| protected void setTtlTimeProvider(ITtlTimeProvider ttlTimeProvider) { |
| this.ttlTimeProvider = ttlTimeProvider; |
| metadataStore.setTtlTimeProvider(ttlTimeProvider); |
| } |
| |
| /** |
| * This is a proof of concept of a select API. |
| * Once a proper factory mechanism for opening files is added to the |
| * FileSystem APIs, this will be deleted <i>without any warning</i>. |
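| * <p> |
| * A sketch of reaching this through the public builder API, assuming |
| * S3 Select is enabled and {@code SelectConstants.SELECT_SQL} is the |
| * option key (illustrative only): |
| * <pre>{@code |
| * FSDataInputStream in = fs.openFile(path) |
| *     .must(SelectConstants.SELECT_SQL, "SELECT * FROM S3OBJECT s") |
| *     .build() |
| *     .get(); |
| * }</pre> |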
| * @param source path to source data |
| * @param expression select expression |
| * @param options request configuration from the builder. |
| * @param providedStatus any passed in status |
| * @return the stream of the results |
| * @throws IOException IO failure |
| */ |
| @Retries.RetryTranslated |
| private FSDataInputStream select(final Path source, |
| final String expression, |
| final Configuration options, |
| final Optional<S3AFileStatus> providedStatus) |
| throws IOException { |
| entryPoint(OBJECT_SELECT_REQUESTS); |
| requireSelectSupport(source); |
| final Path path = makeQualified(source); |
| final S3AFileStatus fileStatus = extractOrFetchSimpleFileStatus(path, |
| providedStatus); |
| |
| // readahead range can be dynamically set |
| long ra = options.getLong(READAHEAD_RANGE, readAhead); |
| S3ObjectAttributes objectAttributes = createObjectAttributes(fileStatus); |
| S3AReadOpContext readContext = createReadContext(fileStatus, inputPolicy, |
| changeDetectionPolicy, ra); |
| |
| if (changeDetectionPolicy.getSource() != ChangeDetectionPolicy.Source.None |
| && fileStatus.getETag() != null) { |
| // If there is change detection and the status includes at least an etag, |
| // check that the object metadata lines up with what is expected from the |
| // object attributes (which may contain an etag or versionId), because |
| // the select API does not offer this check itself. |
| // (Note: this is trouble for version checking, as we can neither force |
| // the old version in the final read nor check for an etag match.) |
| ChangeTracker changeTracker = |
| new ChangeTracker(uri.toString(), |
| changeDetectionPolicy, |
| readContext.instrumentation.newInputStreamStatistics() |
| .getVersionMismatchCounter(), |
| objectAttributes); |
| |
| // will retry internally if wrong version detected |
| Invoker readInvoker = readContext.getReadInvoker(); |
| getObjectMetadata(path, changeTracker, readInvoker, "select"); |
| } |
| |
| // build and execute the request |
| return selectBinding.select( |
| readContext, |
| expression, |
| options, |
| generateSSECustomerKey(), |
| objectAttributes); |
| } |
| |
| /** |
| * Verify the FS supports S3 Select. |
| * @param source source file. |
| * @throws UnsupportedOperationException if not. |
| */ |
| private void requireSelectSupport(final Path source) throws |
| UnsupportedOperationException { |
| if (!selectBinding.isEnabled()) { |
| throw new UnsupportedOperationException( |
| SelectConstants.SELECT_UNSUPPORTED); |
| } |
| } |
| |
| /** |
| * Extract the status from the optional parameter, querying |
| * S3Guard/S3 if it is absent. |
| * @param path path of the status |
| * @param optStatus optional status |
| * @return a file status |
| * @throws FileNotFoundException if there is no normal file at that path |
| * @throws IOException IO failure |
| */ |
| private S3AFileStatus extractOrFetchSimpleFileStatus( |
| final Path path, final Optional<S3AFileStatus> optStatus) |
| throws IOException { |
| S3AFileStatus fileStatus; |
| if (optStatus.isPresent()) { |
| fileStatus = optStatus.get(); |
| } else { |
| // This looks at S3Guard and may get any type of status back; |
| // if it falls back to S3 it issues a HEAD request only. |
| // Therefore, if there is no S3Guard and the path is a directory, |
| // this will raise a FileNotFoundException. |
| fileStatus = innerGetFileStatus(path, false, |
| StatusProbeEnum.HEAD_ONLY); |
| } |
| // Check whether the status (passed in or from S3Guard) is a directory. |
| if (fileStatus.isDirectory()) { |
| throw new FileNotFoundException(path.toString() + " is a directory"); |
| } |
| return fileStatus; |
| } |
| |
| /** |
| * Initiate the open or select operation. |
| * This is invoked from both the FileSystem and FileContext APIs. |
| * @param rawPath path to the file |
| * @param parameters open file parameters from the builder. |
| * @return a future which will evaluate to the opened/selected file. |
| * @throws IOException failure to resolve the link. |
| * @throws PathIOException operation is a select request but S3 select is |
| * disabled |
| * @throws IllegalArgumentException unknown mandatory key |
| */ |
| @Override |
| @Retries.RetryTranslated |
| public CompletableFuture<FSDataInputStream> openFileWithOptions( |
| final Path rawPath, |
| final OpenFileParameters parameters) throws IOException { |
| final Path path = qualify(rawPath); |
| Configuration options = parameters.getOptions(); |
| Set<String> mandatoryKeys = parameters.getMandatoryKeys(); |
| String sql = options.get(SelectConstants.SELECT_SQL, null); |
| boolean isSelect = sql != null; |
| // choice of keys depends on open type |
| if (isSelect) { |
| rejectUnknownMandatoryKeys( |
| mandatoryKeys, |
| InternalSelectConstants.SELECT_OPTIONS, |
| "for " + path + " in S3 Select operation"); |
| } else { |
| rejectUnknownMandatoryKeys( |
| mandatoryKeys, |
| InternalConstants.STANDARD_OPENFILE_KEYS, |
| "for " + path + " in non-select file I/O"); |
| } |
| FileStatus providedStatus = parameters.getStatus(); |
| S3AFileStatus fileStatus; |
| if (providedStatus != null) { |
| Preconditions.checkArgument(path.equals(providedStatus.getPath()), |
| "FileStatus parameter is not for the path %s: %s", |
| path, providedStatus); |
| if (providedStatus instanceof S3AFileStatus) { |
| // can use this status to skip our own probes, |
| // including etag and version. |
| LOG.debug("File was opened with a supplied S3AFileStatus;" |
| + " skipping getFileStatus call in open() operation: {}", |
| providedStatus); |
| fileStatus = (S3AFileStatus) providedStatus; |
| } else if (providedStatus instanceof S3ALocatedFileStatus) { |
| LOG.debug("File was opened with a supplied S3ALocatedFileStatus;" |
| + " skipping getFileStatus call in open() operation: {}", |
| providedStatus); |
| fileStatus = ((S3ALocatedFileStatus) providedStatus).toS3AFileStatus(); |
| } else { |
| LOG.debug("Ignoring file status {}", providedStatus); |
| fileStatus = null; |
| } |
| } else { |
| fileStatus = null; |
| } |
| Optional<S3AFileStatus> ost = Optional.ofNullable(fileStatus); |
| CompletableFuture<FSDataInputStream> result = new CompletableFuture<>(); |
| if (!isSelect) { |
| // normal path. |
| unboundedThreadPool.submit(() -> |
| LambdaUtils.eval(result, |
| () -> open(path, Optional.of(options), ost))); |
| } else { |
| // it is a select statement. |
| // fail fast if the operation is not available |
| requireSelectSupport(path); |
| // submit the query |
| unboundedThreadPool.submit(() -> |
| LambdaUtils.eval(result, |
| () -> select(path, sql, options, ost))); |
| } |
| return result; |
| } |
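| |
| /* |
| * A sketch (illustrative, not part of the production code) of the |
| * public route into this method via FileSystem.openFile(); the path |
| * and SQL statement are assumptions for illustration. |
| * |
| * CompletableFuture<FSDataInputStream> f = fs.openFile(path) |
| *     .must(SelectConstants.SELECT_SQL, "SELECT * FROM S3OBJECT s") |
| *     .build(); |
| * // FutureIOSupport.awaitFuture() unwraps execution failures |
| * // into IOExceptions |
| * FSDataInputStream in = FutureIOSupport.awaitFuture(f); |
| */ |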
| |
| @Override |
| public S3AMultipartUploaderBuilder createMultipartUploader( |
| final Path basePath) |
| throws IOException { |
| StoreContext ctx = createStoreContext(); |
| return new S3AMultipartUploaderBuilder(this, |
| getWriteOperationHelper(), |
| ctx, |
| basePath, |
| new S3AMultipartUploaderStatisticsImpl(ctx::incrementStatistic)); |
| } |
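| |
| /* |
| * A sketch (illustrative, not part of the production code) of an |
| * assumed caller driving the asynchronous MultipartUploader API; |
| * "fs" and "dest" are assumptions for illustration. |
| * |
| * MultipartUploader uploader = fs.createMultipartUploader(dest).build(); |
| * UploadHandle handle = uploader.startUpload(dest).get(); |
| * // ...upload each part with putPart(), then complete() or abort()... |
| */ |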
| |
| /** |
| * Build an immutable store context. |
| * If called while the FS is being initialized, |
| * some of the context will be incomplete. |
| * New store context instances should be created as appropriate. |
| * @return the store context of this FS. |
| */ |
| @InterfaceAudience.Private |
| public StoreContext createStoreContext() { |
| return new StoreContextBuilder().setFsURI(getUri()) |
| .setBucket(getBucket()) |
| .setConfiguration(getConf()) |
| .setUsername(getUsername()) |
| .setOwner(owner) |
| .setExecutor(boundedThreadPool) |
| .setExecutorCapacity(executorCapacity) |
| .setInvoker(invoker) |
| .setInstrumentation(getInstrumentation()) |
| .setStorageStatistics(getStorageStatistics()) |
| .setInputPolicy(getInputPolicy()) |
| .setChangeDetectionPolicy(changeDetectionPolicy) |
| .setMultiObjectDeleteEnabled(enableMultiObjectsDelete) |
| .setMetadataStore(metadataStore) |
| .setUseListV1(useListV1) |
| .setContextAccessors(new ContextAccessorsImpl()) |
| .setTimeProvider(getTtlTimeProvider()) |
| .build(); |
| } |
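| |
| /* |
| * A sketch (illustrative, not part of the production code): internal |
| * components are expected to work against the StoreContext rather than |
| * the whole filesystem; "fs" and "path" are assumptions. |
| * |
| * StoreContext ctx = fs.createStoreContext(); |
| * String key = ctx.pathToKey(path); |
| */ |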
| |
| /** |
| * Create a marker tools operations binding for this store. |
| * @return callbacks for operations. |
| */ |
| @InterfaceAudience.Private |
| public MarkerToolOperations createMarkerToolOperations() { |
| return new MarkerToolOperationsImpl(operationCallbacks); |
| } |
| |
| /** |
| * The implementation of context accessors. |
| */ |
| private class ContextAccessorsImpl implements ContextAccessors { |
| |
| @Override |
| public Path keyToPath(final String key) { |
| return keyToQualifiedPath(key); |
| } |
| |
| @Override |
| public String pathToKey(final Path path) { |
| return S3AFileSystem.this.pathToKey(path); |
| } |
| |
| @Override |
| public File createTempFile(final String prefix, final long size) |
| throws IOException { |
| return createTmpFileForWrite(prefix, size, getConf()); |
| } |
| |
| @Override |
| public String getBucketLocation() throws IOException { |
| return S3AFileSystem.this.getBucketLocation(); |
| } |
| |
| @Override |
| public Path makeQualified(final Path path) { |
| return S3AFileSystem.this.makeQualified(path); |
| } |
| |
| } |
| } |