| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.fs.s3a; |
| |
| import java.io.IOException; |
| import java.io.OutputStream; |
| import java.time.Duration; |
| import java.time.Instant; |
| import java.util.ArrayList; |
| import java.util.List; |
| import java.util.Locale; |
| import java.util.StringJoiner; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import com.amazonaws.SdkBaseException; |
| import com.amazonaws.event.ProgressEvent; |
| import com.amazonaws.event.ProgressEventType; |
| import com.amazonaws.event.ProgressListener; |
| import com.amazonaws.services.s3.model.PartETag; |
| import com.amazonaws.services.s3.model.PutObjectRequest; |
| import com.amazonaws.services.s3.model.PutObjectResult; |
| import com.amazonaws.services.s3.model.UploadPartRequest; |
| |
| import org.apache.hadoop.fs.s3a.impl.PutObjectOptions; |
| import org.apache.hadoop.fs.statistics.IOStatisticsAggregator; |
| import org.apache.hadoop.util.Preconditions; |
| import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.Futures; |
| import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture; |
| import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListeningExecutorService; |
| import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.MoreExecutors; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hadoop.classification.InterfaceAudience; |
| import org.apache.hadoop.classification.InterfaceStability; |
| import org.apache.hadoop.fs.Abortable; |
| import org.apache.hadoop.fs.PathIOException; |
| import org.apache.hadoop.fs.StreamCapabilities; |
| import org.apache.hadoop.fs.Syncable; |
| import org.apache.hadoop.fs.s3a.commit.CommitConstants; |
| import org.apache.hadoop.fs.s3a.commit.PutTracker; |
| import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics; |
| import org.apache.hadoop.fs.statistics.DurationTracker; |
| import org.apache.hadoop.fs.statistics.IOStatistics; |
| import org.apache.hadoop.fs.statistics.IOStatisticsLogging; |
| import org.apache.hadoop.fs.statistics.IOStatisticsSource; |
| import org.apache.hadoop.fs.store.LogExactlyOnce; |
| import org.apache.hadoop.util.Progressable; |
| |
| import static java.util.Objects.requireNonNull; |
| import static org.apache.hadoop.fs.s3a.S3AUtils.*; |
| import static org.apache.hadoop.fs.s3a.Statistic.*; |
| import static org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext.EMPTY_BLOCK_OUTPUT_STREAM_STATISTICS; |
| import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration; |
| import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfInvocation; |
| import static org.apache.hadoop.io.IOUtils.cleanupWithLogger; |
| |
| /** |
| * Upload files/parts directly via different buffering mechanisms: |
| * including memory and disk. |
| * |
| * If the stream is closed and no multipart upload has started, then the data |
| * is instead uploaded in a single PUT operation. |
| * |
| * Unstable: statistics and error handling might evolve. |
| * |
| * Syncable is declared as supported so the calls can be |
| * explicitly rejected. |
| */ |
| @InterfaceAudience.Private |
| @InterfaceStability.Unstable |
| class S3ABlockOutputStream extends OutputStream implements |
| StreamCapabilities, IOStatisticsSource, Syncable, Abortable { |
| |
| private static final Logger LOG = |
| LoggerFactory.getLogger(S3ABlockOutputStream.class); |
| |
| private static final String E_NOT_SYNCABLE = |
| "S3A streams are not Syncable. See HADOOP-17597."; |
| |
| /** Object being uploaded. */ |
| private final String key; |
| |
| /** Size of all blocks. */ |
| private final int blockSize; |
| |
| /** IO Statistics. */ |
| private final IOStatistics iostatistics; |
| |
| /** |
| * The options this instance was created with. |
| */ |
| private final BlockOutputStreamBuilder builder; |
| |
| /** Total bytes for uploads submitted so far. */ |
| private long bytesSubmitted; |
| |
| /** Callback for progress. */ |
| private final ProgressListener progressListener; |
| private final ListeningExecutorService executorService; |
| |
| /** |
| * Factory for blocks. |
| */ |
| private final S3ADataBlocks.BlockFactory blockFactory; |
| |
| /** Preallocated byte buffer for writing single characters. */ |
| private final byte[] singleCharWrite = new byte[1]; |
| |
| /** Multipart upload details; null means none started. */ |
| private MultiPartUpload multiPartUpload; |
| |
| /** Closed flag. */ |
| private final AtomicBoolean closed = new AtomicBoolean(false); |
| |
| /** Current data block. Null means none currently active */ |
| private S3ADataBlocks.DataBlock activeBlock; |
| |
| /** Count of blocks uploaded. */ |
| private long blockCount = 0; |
| |
| /** Statistics to build up. */ |
| private final BlockOutputStreamStatistics statistics; |
| |
| /** |
| * Write operation helper; encapsulation of the filesystem operations. |
| * This contains the audit span for the operation, and activates/deactivates |
| * it within calls. |
| */ |
| private final WriteOperations writeOperationHelper; |
| |
| /** |
| * Track multipart put operation. |
| */ |
| private final PutTracker putTracker; |
| |
| /** Should Syncable calls be downgraded? */ |
| private final boolean downgradeSyncableExceptions; |
| |
| /** |
| * Downgraded syncable API calls are only logged at warn |
| * once across the entire process. |
| */ |
| private static final LogExactlyOnce WARN_ON_SYNCABLE = |
| new LogExactlyOnce(LOG); |
| |
| /** is client side encryption enabled? */ |
| private final boolean isCSEEnabled; |
| |
| /** Thread level IOStatistics Aggregator. */ |
| private final IOStatisticsAggregator threadIOStatisticsAggregator; |
| |
/**
 * An S3A output stream which uploads partitions in a separate pool of
 * threads; different {@link S3ADataBlocks.BlockFactory}
 * instances can control where data is buffered.
 * <p>
 * The first block is created eagerly, which guarantees that an
 * open + close sequence writes a 0-byte entry. If the put tracker asks
 * for it, the multipart upload is initiated here as well.
 * <p>
 * If that setup fails, the block which was created is closed before the
 * exception is rethrown: the caller never receives a stream instance, so
 * nothing else would release the buffer.
 * @param builder builder carrying all the options of the stream;
 * it is validated before use.
 * @throws IOException on any problem
 */
S3ABlockOutputStream(BlockOutputStreamBuilder builder)
    throws IOException {
  builder.validate();
  this.builder = builder;
  this.key = builder.key;
  this.blockFactory = builder.blockFactory;
  this.blockSize = (int) builder.blockSize;
  this.statistics = builder.statistics;
  // the statistics are dereferenced straight away, so they must be non-null;
  // NOTE(review): presumably enforced by builder.validate() - confirm.
  this.iostatistics = statistics.getIOStatistics();
  this.writeOperationHelper = builder.writeOperations;
  this.putTracker = builder.putTracker;
  this.executorService = MoreExecutors.listeningDecorator(
      builder.executorService);
  this.multiPartUpload = null;
  final Progressable progress = builder.progress;
  this.progressListener = (progress instanceof ProgressListener) ?
      (ProgressListener) progress
      : new ProgressableListener(progress);
  downgradeSyncableExceptions = builder.downgradeSyncableExceptions;
  // set every remaining field before any instance method is invoked.
  this.isCSEEnabled = builder.isCSEEnabled;
  this.threadIOStatisticsAggregator = builder.ioStatisticsAggregator;
  try {
    // create that first block. This guarantees that an open + close sequence
    // writes a 0-byte entry.
    createBlockIfNeeded();
    LOG.debug("Initialized S3ABlockOutputStream for {}" +
        " output to {}", key, activeBlock);
    if (putTracker.initialize()) {
      LOG.debug("Put tracker requests multipart upload");
      initMultipartUpload();
    }
  } catch (IOException | RuntimeException e) {
    // construction failed: release the block, as no close() call
    // will ever be made on this instance. A null block is skipped.
    cleanupWithLogger(LOG, activeBlock);
    throw e;
  }
}
| |
| /** |
| * Demand create a destination block. |
| * @return the active block; null if there isn't one. |
| * @throws IOException on any failure to create |
| */ |
| private synchronized S3ADataBlocks.DataBlock createBlockIfNeeded() |
| throws IOException { |
| if (activeBlock == null) { |
| blockCount++; |
| if (blockCount>= Constants.MAX_MULTIPART_COUNT) { |
| LOG.error("Number of partitions in stream exceeds limit for S3: " |
| + Constants.MAX_MULTIPART_COUNT + " write may fail."); |
| } |
| activeBlock = blockFactory.create(blockCount, this.blockSize, statistics); |
| } |
| return activeBlock; |
| } |
| |
| /** |
| * Synchronized accessor to the active block. |
| * @return the active block; null if there isn't one. |
| */ |
| private synchronized S3ADataBlocks.DataBlock getActiveBlock() { |
| return activeBlock; |
| } |
| |
| /** |
| * Predicate to query whether or not there is an active block. |
| * @return true if there is an active block. |
| */ |
| private synchronized boolean hasActiveBlock() { |
| return activeBlock != null; |
| } |
| |
| /** |
| * Clear the active block. |
| */ |
| private void clearActiveBlock() { |
| if (activeBlock != null) { |
| LOG.debug("Clearing active block"); |
| } |
| synchronized (this) { |
| activeBlock = null; |
| } |
| } |
| |
| /** |
| * Check for the filesystem being open. |
| * @throws IOException if the filesystem is closed. |
| */ |
| void checkOpen() throws IOException { |
| if (closed.get()) { |
| throw new IOException("Filesystem " + writeOperationHelper + " closed"); |
| } |
| } |
| |
| /** |
| * The flush operation does not trigger an upload; that awaits |
| * the next block being full. What it does do is call {@code flush() } |
| * on the current block, leaving it to choose how to react. |
| * @throws IOException Any IO problem. |
| */ |
| @Override |
| public synchronized void flush() throws IOException { |
| try { |
| checkOpen(); |
| } catch (IOException e) { |
| LOG.warn("Stream closed: " + e.getMessage()); |
| return; |
| } |
| S3ADataBlocks.DataBlock dataBlock = getActiveBlock(); |
| if (dataBlock != null) { |
| dataBlock.flush(); |
| } |
| } |
| |
| /** |
| * Writes a byte to the destination. If this causes the buffer to reach |
| * its limit, the actual upload is submitted to the threadpool. |
| * @param b the int of which the lowest byte is written |
| * @throws IOException on any problem |
| */ |
| @Override |
| public synchronized void write(int b) throws IOException { |
| singleCharWrite[0] = (byte)b; |
| write(singleCharWrite, 0, 1); |
| } |
| |
| /** |
| * Writes a range of bytes from to the memory buffer. If this causes the |
| * buffer to reach its limit, the actual upload is submitted to the |
| * threadpool and the remainder of the array is written to memory |
| * (recursively). |
| * @param source byte array containing |
| * @param offset offset in array where to start |
| * @param len number of bytes to be written |
| * @throws IOException on any problem |
| */ |
| @Override |
| public synchronized void write(byte[] source, int offset, int len) |
| throws IOException { |
| |
| S3ADataBlocks.validateWriteArgs(source, offset, len); |
| checkOpen(); |
| if (len == 0) { |
| return; |
| } |
| statistics.writeBytes(len); |
| S3ADataBlocks.DataBlock block = createBlockIfNeeded(); |
| int written = block.write(source, offset, len); |
| int remainingCapacity = block.remainingCapacity(); |
| if (written < len) { |
| // not everything was written —the block has run out |
| // of capacity |
| // Trigger an upload then process the remainder. |
| LOG.debug("writing more data than block has capacity -triggering upload"); |
| uploadCurrentBlock(false); |
| // tail recursion is mildly expensive, but given buffer sizes must be MB. |
| // it's unlikely to recurse very deeply. |
| this.write(source, offset + written, len - written); |
| } else { |
| if (remainingCapacity == 0 && !isCSEEnabled) { |
| // the whole buffer is done, trigger an upload |
| uploadCurrentBlock(false); |
| } |
| } |
| } |
| |
| /** |
| * Start an asynchronous upload of the current block. |
| * |
| * @param isLast true, if part being uploaded is last and client side |
| * encryption is enabled. |
| * @throws IOException Problems opening the destination for upload, |
| * initializing the upload, or if a previous operation |
| * has failed. |
| */ |
| @Retries.RetryTranslated |
| private synchronized void uploadCurrentBlock(boolean isLast) |
| throws IOException { |
| Preconditions.checkState(hasActiveBlock(), "No active block"); |
| LOG.debug("Writing block # {}", blockCount); |
| initMultipartUpload(); |
| try { |
| multiPartUpload.uploadBlockAsync(getActiveBlock(), isLast); |
| bytesSubmitted += getActiveBlock().dataSize(); |
| } finally { |
| // set the block to null, so the next write will create a new block. |
| clearActiveBlock(); |
| } |
| } |
| |
| /** |
| * Init multipart upload. Assumption: this is called from |
| * a synchronized block. |
| * Note that this makes a blocking HTTPS request to the far end, so |
| * can take time and potentially fail. |
| * @throws IOException failure to initialize the upload |
| */ |
| @Retries.RetryTranslated |
| private void initMultipartUpload() throws IOException { |
| if (multiPartUpload == null) { |
| LOG.debug("Initiating Multipart upload"); |
| multiPartUpload = new MultiPartUpload(key); |
| } |
| } |
| |
| /** |
| * Close the stream. |
| * |
| * This will not return until the upload is complete |
| * or the attempt to perform the upload has failed. |
| * Exceptions raised in this method are indicative that the write has |
| * failed and data is at risk of being lost. |
| * @throws IOException on any failure. |
| */ |
| @Override |
| public void close() throws IOException { |
| if (closed.getAndSet(true)) { |
| // already closed |
| LOG.debug("Ignoring close() as stream is already closed"); |
| return; |
| } |
| S3ADataBlocks.DataBlock block = getActiveBlock(); |
| boolean hasBlock = hasActiveBlock(); |
| LOG.debug("{}: Closing block #{}: current block= {}", |
| this, |
| blockCount, |
| hasBlock ? block : "(none)"); |
| long bytes = 0; |
| try { |
| if (multiPartUpload == null) { |
| if (hasBlock) { |
| // no uploads of data have taken place, put the single block up. |
| // This must happen even if there is no data, so that 0 byte files |
| // are created. |
| bytes = putObject(); |
| bytesSubmitted = bytes; |
| } |
| } else { |
| // there's an MPU in progress'; |
| // IF there is more data to upload, or no data has yet been uploaded, |
| // PUT the final block |
| if (hasBlock && |
| (block.hasData() || multiPartUpload.getPartsSubmitted() == 0)) { |
| // send last part and set the value of isLastPart to true. |
| // Necessary to set this "true" in case of client side encryption. |
| uploadCurrentBlock(true); |
| } |
| // wait for the partial uploads to finish |
| final List<PartETag> partETags = |
| multiPartUpload.waitForAllPartUploads(); |
| bytes = bytesSubmitted; |
| |
| // then complete the operation |
| if (putTracker.aboutToComplete(multiPartUpload.getUploadId(), |
| partETags, |
| bytes, |
| iostatistics)) { |
| multiPartUpload.complete(partETags); |
| } else { |
| LOG.info("File {} will be visible when the job is committed", key); |
| } |
| } |
| if (!putTracker.outputImmediatelyVisible()) { |
| // track the number of bytes uploaded as commit operations. |
| statistics.commitUploaded(bytes); |
| } |
| LOG.debug("Upload complete to {} by {}", key, writeOperationHelper); |
| } catch (IOException ioe) { |
| // the operation failed. |
| // if this happened during a multipart upload, abort the |
| // operation, so as to not leave (billable) data |
| // pending on the bucket |
| maybeAbortMultipart(); |
| writeOperationHelper.writeFailed(ioe); |
| throw ioe; |
| } finally { |
| cleanupOnClose(); |
| } |
| // Note end of write. This does not change the state of the remote FS. |
| writeOperationHelper.writeSuccessful(bytes); |
| } |
| |
| /** |
| * Final operations in close/abort of stream. |
| * Shuts down block factory, closes any active block, |
| * and pushes out statistics. |
| */ |
| private synchronized void cleanupOnClose() { |
| cleanupWithLogger(LOG, getActiveBlock(), blockFactory); |
| mergeThreadIOStatistics(statistics.getIOStatistics()); |
| LOG.debug("Statistics: {}", statistics); |
| cleanupWithLogger(LOG, statistics); |
| clearActiveBlock(); |
| } |
| |
| /** |
| * Merging the current thread's IOStatistics with the current IOStatistics |
| * context. |
| * |
| * @param streamStatistics Stream statistics to be merged into thread |
| * statistics aggregator. |
| */ |
| private void mergeThreadIOStatistics(IOStatistics streamStatistics) { |
| getThreadIOStatistics().aggregate(streamStatistics); |
| } |
| |
| /** |
| * Best effort abort of the multipart upload; sets |
| * the field to null afterwards. |
| * @return any exception caught during the operation. |
| */ |
| private synchronized IOException maybeAbortMultipart() { |
| if (multiPartUpload != null) { |
| final IOException ioe = multiPartUpload.abort(); |
| multiPartUpload = null; |
| return ioe; |
| } else { |
| return null; |
| } |
| } |
| |
| /** |
| * Abort any active uploads, enter closed state. |
| * @return the outcome |
| */ |
| @Override |
| public AbortableResult abort() { |
| if (closed.getAndSet(true)) { |
| // already closed |
| LOG.debug("Ignoring abort() as stream is already closed"); |
| return new AbortableResultImpl(true, null); |
| } |
| try (DurationTracker d = |
| statistics.trackDuration(INVOCATION_ABORT.getSymbol())) { |
| return new AbortableResultImpl(false, maybeAbortMultipart()); |
| } finally { |
| cleanupOnClose(); |
| } |
| } |
| |
| /** |
| * Abortable result. |
| */ |
| private static final class AbortableResultImpl implements AbortableResult { |
| |
| /** |
| * Had the stream already been closed/aborted? |
| */ |
| private final boolean alreadyClosed; |
| |
| /** |
| * Was any exception raised during non-essential |
| * cleanup actions (i.e. MPU abort)? |
| */ |
| private final IOException anyCleanupException; |
| |
| /** |
| * Constructor. |
| * @param alreadyClosed Had the stream already been closed/aborted? |
| * @param anyCleanupException Was any exception raised during cleanup? |
| */ |
| private AbortableResultImpl(final boolean alreadyClosed, |
| final IOException anyCleanupException) { |
| this.alreadyClosed = alreadyClosed; |
| this.anyCleanupException = anyCleanupException; |
| } |
| |
| @Override |
| public boolean alreadyClosed() { |
| return alreadyClosed; |
| } |
| |
| @Override |
| public IOException anyCleanupException() { |
| return anyCleanupException; |
| } |
| |
| @Override |
| public String toString() { |
| return new StringJoiner(", ", |
| AbortableResultImpl.class.getSimpleName() + "[", "]") |
| .add("alreadyClosed=" + alreadyClosed) |
| .add("anyCleanupException=" + anyCleanupException) |
| .toString(); |
| } |
| } |
| |
| /** |
| * Upload the current block as a single PUT request; if the buffer |
| * is empty a 0-byte PUT will be invoked, as it is needed to create an |
| * entry at the far end. |
| * @throws IOException any problem. |
| * @return number of bytes uploaded. If thread was interrupted while |
| * waiting for upload to complete, returns zero with interrupted flag set |
| * on this thread. |
| */ |
| private int putObject() throws IOException { |
| LOG.debug("Executing regular upload for {}", writeOperationHelper); |
| |
| final S3ADataBlocks.DataBlock block = getActiveBlock(); |
| int size = block.dataSize(); |
| final S3ADataBlocks.BlockUploadData uploadData = block.startUpload(); |
| final PutObjectRequest putObjectRequest = uploadData.hasFile() ? |
| writeOperationHelper.createPutObjectRequest( |
| key, |
| uploadData.getFile(), |
| builder.putOptions) |
| : writeOperationHelper.createPutObjectRequest( |
| key, |
| uploadData.getUploadStream(), |
| size, |
| builder.putOptions); |
| BlockUploadProgress callback = |
| new BlockUploadProgress( |
| block, progressListener, now()); |
| putObjectRequest.setGeneralProgressListener(callback); |
| statistics.blockUploadQueued(size); |
| ListenableFuture<PutObjectResult> putObjectResult = |
| executorService.submit(() -> { |
| try { |
| // the putObject call automatically closes the input |
| // stream afterwards. |
| return writeOperationHelper.putObject(putObjectRequest, builder.putOptions); |
| } finally { |
| cleanupWithLogger(LOG, uploadData, block); |
| } |
| }); |
| clearActiveBlock(); |
| //wait for completion |
| try { |
| putObjectResult.get(); |
| return size; |
| } catch (InterruptedException ie) { |
| LOG.warn("Interrupted object upload", ie); |
| Thread.currentThread().interrupt(); |
| return 0; |
| } catch (ExecutionException ee) { |
| throw extractException("regular upload", key, ee); |
| } |
| } |
| |
| @Override |
| public String toString() { |
| final StringBuilder sb = new StringBuilder( |
| "S3ABlockOutputStream{"); |
| sb.append(writeOperationHelper.toString()); |
| sb.append(", blockSize=").append(blockSize); |
| // unsynced access; risks consistency in exchange for no risk of deadlock. |
| S3ADataBlocks.DataBlock block = activeBlock; |
| if (block != null) { |
| sb.append(", activeBlock=").append(block); |
| } |
| sb.append(" Statistics=") |
| .append(IOStatisticsLogging.ioStatisticsSourceToString(this)); |
| sb.append('}'); |
| return sb.toString(); |
| } |
| |
| private void incrementWriteOperations() { |
| writeOperationHelper.incrementWriteOperations(); |
| } |
| |
| /** |
| * Current time in milliseconds. |
| * @return time |
| */ |
| private Instant now() { |
| return Instant.now(); |
| } |
| |
| /** |
| * Get the statistics for this stream. |
| * @return stream statistics |
| */ |
| BlockOutputStreamStatistics getStatistics() { |
| return statistics; |
| } |
| |
| /** |
| * Return the stream capabilities. |
| * This stream always returns false when queried about hflush and hsync. |
| * If asked about {@link CommitConstants#STREAM_CAPABILITY_MAGIC_OUTPUT} |
| * it will return true iff this is an active "magic" output stream. |
| * @param capability string to query the stream support for. |
| * @return true if the capability is supported by this instance. |
| */ |
| @SuppressWarnings("deprecation") |
| @Override |
| public boolean hasCapability(String capability) { |
| switch (capability.toLowerCase(Locale.ENGLISH)) { |
| |
| // does the output stream have delayed visibility |
| case CommitConstants.STREAM_CAPABILITY_MAGIC_OUTPUT: |
| case CommitConstants.STREAM_CAPABILITY_MAGIC_OUTPUT_OLD: |
| return !putTracker.outputImmediatelyVisible(); |
| |
| // The flush/sync options are absolutely not supported |
| case StreamCapabilities.HFLUSH: |
| case StreamCapabilities.HSYNC: |
| return false; |
| |
| // yes, we do statistics. |
| case StreamCapabilities.IOSTATISTICS: |
| return true; |
| |
| // S3A supports abort. |
| case StreamCapabilities.ABORTABLE_STREAM: |
| return true; |
| |
| // IOStatistics context support for thread-level IOStatistics. |
| case StreamCapabilities.IOSTATISTICS_CONTEXT: |
| return true; |
| |
| default: |
| return false; |
| } |
| } |
| |
| @Override |
| public void hflush() throws IOException { |
| statistics.hflushInvoked(); |
| handleSyncableInvocation(); |
| } |
| |
| @Override |
| public void hsync() throws IOException { |
| statistics.hsyncInvoked(); |
| handleSyncableInvocation(); |
| } |
| |
| /** |
| * Shared processing of Syncable operation reporting/downgrade. |
| */ |
| private void handleSyncableInvocation() { |
| final UnsupportedOperationException ex |
| = new UnsupportedOperationException(E_NOT_SYNCABLE); |
| if (!downgradeSyncableExceptions) { |
| throw ex; |
| } |
| // downgrading. |
| WARN_ON_SYNCABLE.warn("Application invoked the Syncable API against" |
| + " stream writing to {}. This is Unsupported", |
| key); |
| // and log at debug |
| LOG.debug("Downgrading Syncable call", ex); |
| } |
| |
| @Override |
| public IOStatistics getIOStatistics() { |
| return iostatistics; |
| } |
| |
| /** |
| * Get the IOStatistics aggregator passed in the builder. |
| * @return an aggregator |
| */ |
| protected IOStatisticsAggregator getThreadIOStatistics() { |
| return threadIOStatisticsAggregator; |
| } |
| |
| /** |
| * Multiple partition upload. |
| */ |
| private class MultiPartUpload { |
| private final String uploadId; |
| private final List<ListenableFuture<PartETag>> partETagsFutures; |
| private int partsSubmitted; |
| private int partsUploaded; |
| private long bytesSubmitted; |
| |
| /** |
| * Any IOException raised during block upload. |
| * if non-null, then close() MUST NOT complete |
| * the file upload. |
| */ |
| private IOException blockUploadFailure; |
| |
| /** |
| * Constructor. |
| * Initiates the MPU request against S3. |
| * @param key upload destination |
| * @throws IOException failure |
| */ |
| |
| @Retries.RetryTranslated |
| MultiPartUpload(String key) throws IOException { |
| this.uploadId = trackDuration(statistics, |
| OBJECT_MULTIPART_UPLOAD_INITIATED.getSymbol(), |
| () -> writeOperationHelper.initiateMultiPartUpload( |
| key, |
| builder.putOptions)); |
| |
| this.partETagsFutures = new ArrayList<>(2); |
| LOG.debug("Initiated multi-part upload for {} with " + |
| "id '{}'", writeOperationHelper, uploadId); |
| } |
| |
| /** |
| * Get a count of parts submitted. |
| * @return the number of parts submitted; will always be >= the |
| * value of {@link #getPartsUploaded()} |
| */ |
| public int getPartsSubmitted() { |
| return partsSubmitted; |
| } |
| |
| /** |
| * Count of parts actually uploaded. |
| * @return the count of successfully completed part uploads. |
| */ |
| public int getPartsUploaded() { |
| return partsUploaded; |
| } |
| |
| /** |
| * Get the upload ID; will be null after construction completes. |
| * @return the upload ID |
| */ |
| public String getUploadId() { |
| return uploadId; |
| } |
| |
| /** |
| * Get the count of bytes submitted. |
| * @return the current upload size. |
| */ |
| public long getBytesSubmitted() { |
| return bytesSubmitted; |
| } |
| |
| /** |
| * A block upload has failed. |
| * Recorded it if there has been no previous failure. |
| * @param e error |
| */ |
| public void noteUploadFailure(final IOException e) { |
| if (blockUploadFailure == null) { |
| blockUploadFailure = e; |
| } |
| } |
| |
| /** |
| * If there is a block upload failure -throw it. |
| * @throws IOException if one has already been caught. |
| */ |
| public void maybeRethrowUploadFailure() throws IOException { |
| if (blockUploadFailure != null) { |
| throw blockUploadFailure; |
| } |
| } |
| |
| /** |
| * Upload a block of data. |
| * This will take the block |
| * @param block block to upload |
| * @throws IOException upload failure |
| * @throws PathIOException if too many blocks were written |
| */ |
| private void uploadBlockAsync(final S3ADataBlocks.DataBlock block, |
| Boolean isLast) |
| throws IOException { |
| LOG.debug("Queueing upload of {} for upload {}", block, uploadId); |
| Preconditions.checkNotNull(uploadId, "Null uploadId"); |
| maybeRethrowUploadFailure(); |
| partsSubmitted++; |
| final int size = block.dataSize(); |
| bytesSubmitted += size; |
| final int currentPartNumber = partETagsFutures.size() + 1; |
| final UploadPartRequest request; |
| final S3ADataBlocks.BlockUploadData uploadData; |
| try { |
| uploadData = block.startUpload(); |
| request = writeOperationHelper.newUploadPartRequest( |
| key, |
| uploadId, |
| currentPartNumber, |
| size, |
| uploadData.getUploadStream(), |
| uploadData.getFile(), |
| 0L); |
| request.setLastPart(isLast); |
| } catch (SdkBaseException aws) { |
| // catch and translate |
| IOException e = translateException("upload", key, aws); |
| // failure to start the upload. |
| noteUploadFailure(e); |
| throw e; |
| } catch (IOException e) { |
| // failure to start the upload. |
| noteUploadFailure(e); |
| throw e; |
| } |
| BlockUploadProgress callback = |
| new BlockUploadProgress( |
| block, progressListener, now()); |
| request.setGeneralProgressListener(callback); |
| statistics.blockUploadQueued(block.dataSize()); |
| ListenableFuture<PartETag> partETagFuture = |
| executorService.submit(() -> { |
| // this is the queued upload operation |
| // do the upload |
| try { |
| LOG.debug("Uploading part {} for id '{}'", |
| currentPartNumber, uploadId); |
| PartETag partETag = writeOperationHelper.uploadPart(request) |
| .getPartETag(); |
| LOG.debug("Completed upload of {} to part {}", |
| block, partETag.getETag()); |
| LOG.debug("Stream statistics of {}", statistics); |
| partsUploaded++; |
| return partETag; |
| } catch (IOException e) { |
| // save immediately. |
| noteUploadFailure(e); |
| throw e; |
| } finally { |
| // close the stream and block |
| cleanupWithLogger(LOG, uploadData, block); |
| } |
| }); |
| partETagsFutures.add(partETagFuture); |
| } |
| |
| /** |
| * Block awaiting all outstanding uploads to complete. |
| * @return list of results |
| * @throws IOException IO Problems |
| */ |
| private List<PartETag> waitForAllPartUploads() throws IOException { |
| LOG.debug("Waiting for {} uploads to complete", partETagsFutures.size()); |
| try { |
| return Futures.allAsList(partETagsFutures).get(); |
| } catch (InterruptedException ie) { |
| LOG.warn("Interrupted partUpload", ie); |
| Thread.currentThread().interrupt(); |
| return null; |
| } catch (ExecutionException ee) { |
| //there is no way of recovering so abort |
| //cancel all partUploads |
| LOG.debug("While waiting for upload completion", ee); |
| //abort multipartupload |
| this.abort(); |
| throw extractException("Multi-part upload with id '" + uploadId |
| + "' to " + key, key, ee); |
| } |
| } |
| |
| /** |
| * Cancel all active uploads. |
| */ |
| private void cancelAllActiveFutures() { |
| LOG.debug("Cancelling futures"); |
| for (ListenableFuture<PartETag> future : partETagsFutures) { |
| future.cancel(true); |
| } |
| } |
| |
| /** |
| * This completes a multipart upload. |
| * Sometimes it fails; here retries are handled to avoid losing all data |
| * on a transient failure. |
| * @param partETags list of partial uploads |
| * @throws IOException on any problem |
| */ |
| private void complete(List<PartETag> partETags) |
| throws IOException { |
| maybeRethrowUploadFailure(); |
| AtomicInteger errorCount = new AtomicInteger(0); |
| try { |
| trackDurationOfInvocation(statistics, |
| MULTIPART_UPLOAD_COMPLETED.getSymbol(), () -> { |
| writeOperationHelper.completeMPUwithRetries(key, |
| uploadId, |
| partETags, |
| bytesSubmitted, |
| errorCount, |
| builder.putOptions); |
| }); |
| } finally { |
| statistics.exceptionInMultipartComplete(errorCount.get()); |
| } |
| } |
| |
| /** |
| * Abort a multi-part upload. Retries are not attempted on failures. |
| * IOExceptions are caught; this is expected to be run as a cleanup process. |
| * @return any caught exception. |
| */ |
| private IOException abort() { |
| LOG.debug("Aborting upload"); |
| try { |
| trackDurationOfInvocation(statistics, |
| OBJECT_MULTIPART_UPLOAD_ABORTED.getSymbol(), () -> { |
| cancelAllActiveFutures(); |
| writeOperationHelper.abortMultipartUpload(key, uploadId, |
| false, null); |
| }); |
| return null; |
| } catch (IOException e) { |
| // this point is only reached if the operation failed more than |
| // the allowed retry count |
| LOG.warn("Unable to abort multipart upload," |
| + " you may need to purge uploaded parts", e); |
| statistics.exceptionInMultipartAbort(); |
| return e; |
| } |
| } |
| } |
| |
| /** |
| * The upload progress listener registered for events returned |
| * during the upload of a single block. |
| * It updates statistics and handles the end of the upload. |
| * Transfer failures are logged at WARN. |
| */ |
| private final class BlockUploadProgress implements ProgressListener { |
| private final S3ADataBlocks.DataBlock block; |
| private final ProgressListener nextListener; |
| private final Instant transferQueueTime; |
| private Instant transferStartTime; |
| |
| /** |
| * Track the progress of a single block upload. |
| * @param block block to monitor |
| * @param nextListener optional next progress listener |
| * @param transferQueueTime time the block was transferred |
| * into the queue |
| */ |
| private BlockUploadProgress(S3ADataBlocks.DataBlock block, |
| ProgressListener nextListener, |
| Instant transferQueueTime) { |
| this.block = block; |
| this.transferQueueTime = transferQueueTime; |
| this.nextListener = nextListener; |
| } |
| |
| @Override |
| public void progressChanged(ProgressEvent progressEvent) { |
| ProgressEventType eventType = progressEvent.getEventType(); |
| long bytesTransferred = progressEvent.getBytesTransferred(); |
| |
| int size = block.dataSize(); |
| switch (eventType) { |
| |
| case REQUEST_BYTE_TRANSFER_EVENT: |
| // bytes uploaded |
| statistics.bytesTransferred(bytesTransferred); |
| break; |
| |
| case TRANSFER_PART_STARTED_EVENT: |
| transferStartTime = now(); |
| statistics.blockUploadStarted( |
| Duration.between(transferQueueTime, transferStartTime), |
| size); |
| incrementWriteOperations(); |
| break; |
| |
| case TRANSFER_PART_COMPLETED_EVENT: |
| statistics.blockUploadCompleted( |
| Duration.between(transferStartTime, now()), |
| size); |
| break; |
| |
| case TRANSFER_PART_FAILED_EVENT: |
| statistics.blockUploadFailed( |
| Duration.between(transferStartTime, now()), |
| size); |
| LOG.warn("Transfer failure of block {}", block); |
| break; |
| |
| default: |
| // nothing |
| } |
| |
| if (nextListener != null) { |
| nextListener.progressChanged(progressEvent); |
| } |
| } |
| } |
| |
| /** |
| * Bridge from AWS {@code ProgressListener} to Hadoop {@link Progressable}. |
| */ |
| private static class ProgressableListener implements ProgressListener { |
| private final Progressable progress; |
| |
| ProgressableListener(Progressable progress) { |
| this.progress = progress; |
| } |
| |
| public void progressChanged(ProgressEvent progressEvent) { |
| if (progress != null) { |
| progress.progress(); |
| } |
| } |
| } |
| |
| /** |
| * Create a builder. |
| * @return |
| */ |
| public static BlockOutputStreamBuilder builder() { |
| return new BlockOutputStreamBuilder(); |
| } |
| |
| /** |
| * Builder class for constructing an output stream. |
| */ |
| public static final class BlockOutputStreamBuilder { |
| |
| /** S3 object to work on. */ |
| private String key; |
| |
| /** The executor service to use to schedule work. */ |
| private ExecutorService executorService; |
| |
| /** |
| * Report progress in order to prevent timeouts. |
| * this object implements {@code ProgressListener} then it will be |
| * directly wired up to the AWS client, so receive detailed progress |
| * information. |
| */ |
| private Progressable progress; |
| |
| /** The size of a single block. */ |
| private long blockSize; |
| |
| /** The factory for creating stream destinations. */ |
| private S3ADataBlocks.BlockFactory blockFactory; |
| |
| /** The output statistics for the stream. */ |
| private BlockOutputStreamStatistics statistics = |
| EMPTY_BLOCK_OUTPUT_STREAM_STATISTICS; |
| |
| /** Operations to write data. */ |
| private WriteOperations writeOperations; |
| |
| /** put tracking for commit support. */ |
| private PutTracker putTracker; |
| |
| /** Should Syncable calls be downgraded? */ |
| private boolean downgradeSyncableExceptions; |
| |
| /** is Client side Encryption enabled? */ |
| private boolean isCSEEnabled; |
| |
| /** |
| * Put object options. |
| */ |
| private PutObjectOptions putOptions; |
| |
| /** |
| * thread-level IOStatistics Aggregator. |
| */ |
| private IOStatisticsAggregator ioStatisticsAggregator; |
| |
| private BlockOutputStreamBuilder() { |
| } |
| |
| /** |
| * Validate the arguments. |
| */ |
| public void validate() { |
| requireNonNull(key, "null key"); |
| requireNonNull(executorService, "null executorService"); |
| requireNonNull(blockFactory, "null blockFactory"); |
| requireNonNull(statistics, "null statistics"); |
| requireNonNull(writeOperations, "null writeOperationHelper"); |
| requireNonNull(putTracker, "null putTracker"); |
| requireNonNull(putOptions, "null putOptions"); |
| Preconditions.checkArgument(blockSize >= Constants.MULTIPART_MIN_SIZE, |
| "Block size is too small: %s", blockSize); |
| requireNonNull(ioStatisticsAggregator, "null ioStatisticsAggregator"); |
| } |
| |
| /** |
| * Set builder value. |
| * @param value new value |
| * @return the builder |
| */ |
| public BlockOutputStreamBuilder withKey( |
| final String value) { |
| key = value; |
| return this; |
| } |
| |
| /** |
| * Set builder value. |
| * @param value new value |
| * @return the builder |
| */ |
| public BlockOutputStreamBuilder withExecutorService( |
| final ExecutorService value) { |
| executorService = value; |
| return this; |
| } |
| |
| /** |
| * Set builder value. |
| * @param value new value |
| * @return the builder |
| */ |
| public BlockOutputStreamBuilder withProgress( |
| final Progressable value) { |
| progress = value; |
| return this; |
| } |
| |
| /** |
| * Set builder value. |
| * @param value new value |
| * @return the builder |
| */ |
| public BlockOutputStreamBuilder withBlockSize( |
| final long value) { |
| blockSize = value; |
| return this; |
| } |
| |
| /** |
| * Set builder value. |
| * @param value new value |
| * @return the builder |
| */ |
| public BlockOutputStreamBuilder withBlockFactory( |
| final S3ADataBlocks.BlockFactory value) { |
| blockFactory = value; |
| return this; |
| } |
| |
| /** |
| * Set builder value. |
| * @param value new value |
| * @return the builder |
| */ |
| public BlockOutputStreamBuilder withStatistics( |
| final BlockOutputStreamStatistics value) { |
| statistics = value; |
| return this; |
| } |
| |
| /** |
| * Set builder value. |
| * @param value new value |
| * @return the builder |
| */ |
| public BlockOutputStreamBuilder withWriteOperations( |
| final WriteOperationHelper value) { |
| writeOperations = value; |
| return this; |
| } |
| |
| /** |
| * Set builder value. |
| * @param value new value |
| * @return the builder |
| */ |
| public BlockOutputStreamBuilder withPutTracker( |
| final PutTracker value) { |
| putTracker = value; |
| return this; |
| } |
| |
| /** |
| * Set builder value. |
| * @param value new value |
| * @return the builder |
| */ |
| public BlockOutputStreamBuilder withDowngradeSyncableExceptions( |
| final boolean value) { |
| downgradeSyncableExceptions = value; |
| return this; |
| } |
| |
| /** |
| * Set builder value. |
| * @param value new value |
| * @return the builder |
| */ |
| public BlockOutputStreamBuilder withCSEEnabled(boolean value) { |
| isCSEEnabled = value; |
| return this; |
| } |
| |
| /** |
| * Set builder value. |
| * @param value new value |
| * @return the builder |
| */ |
| public BlockOutputStreamBuilder withPutOptions( |
| final PutObjectOptions value) { |
| putOptions = value; |
| return this; |
| } |
| |
| /** |
| * Set builder value. |
| * |
| * @param value new value |
| * @return the builder |
| */ |
| public BlockOutputStreamBuilder withIOStatisticsAggregator( |
| final IOStatisticsAggregator value) { |
| ioStatisticsAggregator = value; |
| return this; |
| } |
| } |
| } |