| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.fs.s3a.commit.impl; |
| |
| import java.io.File; |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.UUID; |
| import java.util.stream.Collectors; |
| import java.util.stream.IntStream; |
| |
| import javax.annotation.Nullable; |
| |
| import com.amazonaws.services.s3.model.MultipartUpload; |
| import com.amazonaws.services.s3.model.PartETag; |
| import com.amazonaws.services.s3.model.UploadPartRequest; |
| import com.amazonaws.services.s3.model.UploadPartResult; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.commons.lang3.tuple.Pair; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.LocatedFileStatus; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.PathFilter; |
| import org.apache.hadoop.fs.PathIOException; |
| import org.apache.hadoop.fs.RemoteIterator; |
| import org.apache.hadoop.fs.s3a.S3AFileSystem; |
| import org.apache.hadoop.fs.s3a.S3AUtils; |
| import org.apache.hadoop.fs.s3a.WriteOperations; |
| import org.apache.hadoop.fs.s3a.commit.CommitConstants; |
| import org.apache.hadoop.fs.s3a.commit.PathCommitException; |
| import org.apache.hadoop.fs.s3a.commit.files.PendingSet; |
| import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit; |
| import org.apache.hadoop.fs.s3a.commit.files.SuccessData; |
| import org.apache.hadoop.fs.s3a.impl.AbstractStoreOperation; |
| import org.apache.hadoop.fs.s3a.impl.HeaderProcessing; |
| import org.apache.hadoop.fs.s3a.impl.InternalConstants; |
| import org.apache.hadoop.fs.s3a.impl.PutObjectOptions; |
| import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics; |
| import org.apache.hadoop.fs.statistics.DurationTracker; |
| import org.apache.hadoop.fs.statistics.IOStatistics; |
| import org.apache.hadoop.fs.statistics.IOStatisticsSource; |
| import org.apache.hadoop.fs.statistics.IOStatisticsContext; |
| import org.apache.hadoop.mapreduce.JobContext; |
| import org.apache.hadoop.util.DurationInfo; |
| import org.apache.hadoop.util.Preconditions; |
| import org.apache.hadoop.util.Progressable; |
| import org.apache.hadoop.util.functional.TaskPool; |
| |
| import static java.util.Objects.requireNonNull; |
| import static org.apache.hadoop.fs.s3a.S3AUtils.listAndFilter; |
| import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_COMMIT_JOB; |
| import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_MATERIALIZE_FILE; |
| import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_LOAD_SINGLE_PENDING_FILE; |
| import static org.apache.hadoop.fs.s3a.Statistic.COMMITTER_STAGE_FILE_UPLOAD; |
| import static org.apache.hadoop.fs.s3a.commit.CommitConstants.XA_MAGIC_MARKER; |
| import static org.apache.hadoop.fs.s3a.commit.CommitConstants._SUCCESS; |
| import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration; |
| import static org.apache.hadoop.util.functional.RemoteIterators.cleanupRemoteIterator; |
| |
| /** |
| * The implementation of the various actions a committer needs. |
| * This doesn't implement the protocol/binding to a specific execution engine, |
| * just the operations needed to build one. |
| * |
| * When invoking FS operations, it assumes that the underlying FS is |
| * handling retries and exception translation: it does not attempt to |
| * duplicate that work. |
| * |
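| * <p> |
| * A minimal usage sketch (illustrative; assumes an existing |
| * {@link S3AFileSystem} {@code fs} and a {@link SinglePendingCommit} |
| * {@code commit} loaded from a {@code .pending} file): |
| * <pre>{@code |
| *   CommitOperations ops = new CommitOperations(fs); |
| *   ops.commitOrFail(commit);  // completes the upload or rethrows |
| * }</pre> |
| * |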
| */ |
| public class CommitOperations extends AbstractStoreOperation |
| implements IOStatisticsSource { |
| private static final Logger LOG = LoggerFactory.getLogger( |
| CommitOperations.class); |
| |
| /** |
| * Destination filesystem. |
| */ |
| private final S3AFileSystem fs; |
| |
| /** Statistics. */ |
| private final CommitterStatistics statistics; |
| |
| /** |
| * Write operations for the destination fs. |
| */ |
| private final WriteOperations writeOperations; |
| |
| /** |
| * Filter to find all {@code .pendingset} files. |
| */ |
| public static final PathFilter PENDINGSET_FILTER = |
| path -> path.toString().endsWith(CommitConstants.PENDINGSET_SUFFIX); |
| |
| /** |
| * Filter to find all {@code .pending} files. |
| */ |
| public static final PathFilter PENDING_FILTER = |
| path -> path.toString().endsWith(CommitConstants.PENDING_SUFFIX); |
| |
| /** |
| * Instantiate. |
| * @param fs FS to bind to |
| * @throws IOException failure to bind. |
| */ |
| public CommitOperations(S3AFileSystem fs) throws IOException { |
| this(requireNonNull(fs), fs.newCommitterStatistics(), "/"); |
| } |
| |
| /** |
| * Instantiate. This creates a new audit span for |
| * the commit operations. |
| * @param fs FS to bind to |
| * @param committerStatistics committer statistics |
| * @param outputPath destination of work. |
| * @throws IOException failure to bind. |
| */ |
| public CommitOperations(S3AFileSystem fs, |
| CommitterStatistics committerStatistics, |
| String outputPath) throws IOException { |
| super(requireNonNull(fs).createStoreContext()); |
| this.fs = fs; |
| statistics = requireNonNull(committerStatistics); |
| // create a span |
| writeOperations = fs.createWriteOperationHelper( |
| fs.getAuditSpanSource().createSpan( |
| COMMITTER_COMMIT_JOB.getSymbol(), |
| outputPath, null)); |
| } |
| |
| /** |
| * Convert an ordered list of etag strings to a list of numbered |
| * {@code PartETag} instances. |
| * @param tagIds ordered list of etags |
| * @return a new list of part ETags, numbered from 1 |
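| * <p> |
| * For example, {@code toPartEtags(Arrays.asList("a", "b"))} returns |
| * part ETags (1, "a") and (2, "b"). |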
| */ |
| public static List<PartETag> toPartEtags(List<String> tagIds) { |
| return IntStream.range(0, tagIds.size()) |
| .mapToObj(i -> new PartETag(i + 1, tagIds.get(i))) |
| .collect(Collectors.toList()); |
| } |
| |
| @Override |
| public String toString() { |
| return "CommitOperations{" + fs.getUri() + '}'; |
| } |
| |
| /** @return statistics. */ |
| protected CommitterStatistics getStatistics() { |
| return statistics; |
| } |
| |
| @Override |
| public IOStatistics getIOStatistics() { |
| return statistics.getIOStatistics(); |
| } |
| |
| /** |
| * Commit the operation, throwing an exception on any failure. |
| * @param commit commit to execute |
| * @throws IOException on a failure |
| */ |
| public void commitOrFail( |
| final SinglePendingCommit commit) throws IOException { |
| commit(commit, commit.getFilename()).maybeRethrow(); |
| } |
| |
| /** |
| * Commit a single pending commit; exceptions are caught |
| * and converted to an outcome. |
| * @param commit entry to commit |
| * @param origin origin path/string for outcome text |
| * @return the outcome |
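| * <p> |
| * A sketch of typical use (names illustrative): |
| * <pre>{@code |
| *   MaybeIOE outcome = operations.commit(commit, commit.getFilename()); |
| *   if (outcome.hasException()) { |
| *     LOG.warn("Commit failed", outcome.getException()); |
| *   } |
| * }</pre> |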
| */ |
| public MaybeIOE commit( |
| final SinglePendingCommit commit, |
| final String origin) { |
| LOG.debug("Committing single commit {}", commit); |
| MaybeIOE outcome; |
| String destKey = "unknown destination"; |
| try (DurationInfo d = new DurationInfo(LOG, |
| "Committing file %s size %s", |
| commit.getDestinationKey(), |
| commit.getLength())) { |
| |
| commit.validate(); |
| destKey = commit.getDestinationKey(); |
| long l = trackDuration(statistics, COMMITTER_MATERIALIZE_FILE.getSymbol(), |
| () -> innerCommit(commit)); |
| LOG.debug("Successful commit of file length {}", l); |
| outcome = MaybeIOE.NONE; |
| statistics.commitCompleted(commit.getLength()); |
| } catch (IOException e) { |
| String msg = String.format("Failed to commit upload against %s: %s", |
| destKey, e); |
| LOG.warn(msg, e); |
| outcome = new MaybeIOE(e); |
| statistics.commitFailed(); |
| } catch (Exception e) { |
| String msg = String.format("Failed to commit upload against %s," + |
| " described in %s: %s", destKey, origin, e); |
| LOG.warn(msg, e); |
| outcome = new MaybeIOE(new PathCommitException(origin, msg, e)); |
| statistics.commitFailed(); |
| } |
| return outcome; |
| } |
| |
| /** |
| * Inner commit operation. |
| * @param commit entry to commit |
| * @return bytes committed. |
| * @throws IOException failure |
| */ |
| private long innerCommit( |
| final SinglePendingCommit commit) throws IOException { |
| // finalize the commit |
| writeOperations.commitUpload( |
| commit.getDestinationKey(), |
| commit.getUploadId(), |
| toPartEtags(commit.getEtags()), |
| commit.getLength()); |
| return commit.getLength(); |
| } |
| |
| /** |
| * Locate all files with the pending suffix under a directory. |
| * @param pendingDir directory |
| * @param recursive recursive listing? |
| * @return iterator of all located entries |
| * @throws IOException if there is a problem listing the path. |
| */ |
| public RemoteIterator<LocatedFileStatus> locateAllSinglePendingCommits( |
| Path pendingDir, |
| boolean recursive) throws IOException { |
| return listAndFilter(fs, pendingDir, recursive, PENDING_FILTER); |
| } |
| |
| /** |
| * Load all single pending commits in the directory, using the |
| * outer submitter. |
| * All load failures are logged and then added to the list of files |
| * which could not be loaded. |
| * |
| * @param pendingDir directory containing commits |
| * @param recursive do a recursive scan? |
| * @param commitContext commit context |
| * |
| * @return tuple of loaded entries and those pending files which |
| * could not be loaded/validated. |
| * |
| * @throws IOException on a failure to list the files. |
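| * <p> |
| * A sketch of use (names illustrative): |
| * <pre>{@code |
| *   Pair<PendingSet, List<Pair<LocatedFileStatus, IOException>>> loaded = |
| *       operations.loadSinglePendingCommits(pendingDir, true, commitContext); |
| *   for (SinglePendingCommit commit : loaded.getLeft().getCommits()) { |
| *     operations.commitOrFail(commit); |
| *   } |
| * }</pre> |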
| */ |
| public Pair<PendingSet, |
| List<Pair<LocatedFileStatus, IOException>>> |
| loadSinglePendingCommits(Path pendingDir, |
| boolean recursive, |
| CommitContext commitContext) |
| throws IOException { |
| |
| PendingSet commits = new PendingSet(); |
| List<SinglePendingCommit> pendingFiles = Collections.synchronizedList( |
| new ArrayList<>(1)); |
| List<Pair<LocatedFileStatus, IOException>> failures = Collections.synchronizedList( |
| new ArrayList<>(1)); |
| |
| TaskPool.foreach(locateAllSinglePendingCommits(pendingDir, recursive)) |
| // .stopOnFailure() |
| .suppressExceptions(false) |
| .executeWith(commitContext.getOuterSubmitter()) |
| .run(status -> { |
| Path path = status.getPath(); |
| try { |
| // load the file |
| SinglePendingCommit singleCommit = trackDuration(statistics, |
| COMMITTER_LOAD_SINGLE_PENDING_FILE.getSymbol(), () -> |
| SinglePendingCommit.load(fs, |
| path, |
| status, |
| commitContext.getSinglePendingFileSerializer())); |
| // aggregate stats |
| commits.getIOStatistics() |
| .aggregate(singleCommit.getIOStatistics()); |
| // then clear so they aren't marshalled again. |
| singleCommit.getIOStatistics().clear(); |
| pendingFiles.add(singleCommit); |
| } catch (IOException e) { |
| LOG.warn("Failed to load commit file {}", path, e); |
| failures.add(Pair.of(status, e)); |
| } |
| }); |
| commits.setCommits(pendingFiles); |
| return Pair.of(commits, failures); |
| } |
| |
| /** |
| * Convert any exception to an IOE, if needed. |
| * @param key key to use in a path exception |
| * @param ex exception |
| * @return an IOE, either the passed in value or a new one wrapping the other |
| * exception. |
| */ |
| public IOException makeIOE(String key, Exception ex) { |
| return ex instanceof IOException |
| ? (IOException) ex |
| : new PathCommitException(key, ex.toString(), ex); |
| } |
| |
| /** |
| * Abort the multipart commit supplied. This is the lower level operation |
| * which doesn't generate an outcome, instead raising an exception. |
| * @param commit pending commit to abort |
| * @throws FileNotFoundException if the upload ID is unknown |
| * @throws IOException on any failure |
| */ |
| public void abortSingleCommit(SinglePendingCommit commit) |
| throws IOException { |
| String destKey = commit.getDestinationKey(); |
| String origin = commit.getFilename() != null |
| ? (" defined in " + commit.getFilename()) |
| : ""; |
| String uploadId = commit.getUploadId(); |
| LOG.info("Aborting commit ID {} to object {}{}", uploadId, destKey, origin); |
| abortMultipartCommit(destKey, uploadId); |
| } |
| |
| /** |
| * Create an {@code AbortMultipartUpload} request and issue it to S3, |
| * incrementing statistics afterwards. |
| * @param destKey destination key |
| * @param uploadId upload to cancel |
| * @throws FileNotFoundException if the upload ID is unknown |
| * @throws IOException on any failure |
| */ |
| public void abortMultipartCommit(String destKey, String uploadId) |
| throws IOException { |
| try (DurationInfo d = new DurationInfo(LOG, |
| "Aborting commit ID %s to path %s", uploadId, destKey)) { |
| writeOperations.abortMultipartCommit(destKey, uploadId); |
| } finally { |
| statistics.commitAborted(); |
| } |
| } |
| |
| /** |
| * Enumerate all pending files in a dir/tree and abort them. |
| * @param pendingDir directory of pending operations |
| * @param commitContext commit context |
| * @param recursive recurse? |
| * @return the outcome of all the abort operations |
| * @throws IOException if there is a problem listing the path. |
| */ |
| public MaybeIOE abortAllSinglePendingCommits(Path pendingDir, |
| CommitContext commitContext, |
| boolean recursive) |
| throws IOException { |
| Preconditions.checkArgument(pendingDir != null, "null pendingDir"); |
| LOG.debug("Aborting all pending commit filess under {}" |
| + " (recursive={}", pendingDir, recursive); |
| RemoteIterator<LocatedFileStatus> pendingFiles; |
| try { |
| pendingFiles = ls(pendingDir, recursive); |
| } catch (FileNotFoundException fnfe) { |
| LOG.info("No directory to abort {}", pendingDir); |
| return MaybeIOE.NONE; |
| } |
| MaybeIOE outcome = MaybeIOE.NONE; |
| if (!pendingFiles.hasNext()) { |
| LOG.debug("No files to abort under {}", pendingDir); |
| } |
| while (pendingFiles.hasNext()) { |
| final LocatedFileStatus status = pendingFiles.next(); |
| Path pendingFile = status.getPath(); |
| if (pendingFile.getName().endsWith(CommitConstants.PENDING_SUFFIX)) { |
| try { |
| abortSingleCommit(SinglePendingCommit.load(fs, |
| pendingFile, |
| status, |
| commitContext.getSinglePendingFileSerializer())); |
| } catch (FileNotFoundException e) { |
| LOG.debug("listed file already deleted: {}", pendingFile); |
| } catch (IOException | IllegalArgumentException e) { |
| if (MaybeIOE.NONE.equals(outcome)) { |
| outcome = new MaybeIOE(makeIOE(pendingFile.toString(), e)); |
| } |
| } finally { |
| // quietly try to delete the pending file |
| S3AUtils.deleteQuietly(fs, pendingFile, false); |
| } |
| } |
| } |
| cleanupRemoteIterator(pendingFiles); |
| return outcome; |
| } |
| |
| /** |
| * List files. |
| * @param path path |
| * @param recursive recursive listing? |
| * @return iterator |
| * @throws IOException failure |
| */ |
| protected RemoteIterator<LocatedFileStatus> ls(Path path, boolean recursive) |
| throws IOException { |
| return fs.listFiles(path, recursive); |
| } |
| |
| /** |
| * List all pending uploads to the destination FS under a path. |
| * @param dest destination path |
| * @return A list of the pending uploads to any directory under that path. |
| * @throws IOException IO failure |
| */ |
| public List<MultipartUpload> listPendingUploadsUnderPath(Path dest) |
| throws IOException { |
| return writeOperations.listMultipartUploads(fs.pathToKey(dest)); |
| } |
| |
| /** |
| * Abort all pending uploads to the destination FS under a path. |
| * @param dest destination path |
| * @return a count of the number of uploads aborted. |
| * @throws IOException IO failure |
| */ |
| public int abortPendingUploadsUnderPath(Path dest) throws IOException { |
| return writeOperations.abortMultipartUploadsUnderPath(fs.pathToKey(dest)); |
| } |
| |
| /** |
| * Delete any existing {@code _SUCCESS} file. |
| * @param outputPath output directory |
| * @throws IOException IO problem |
| */ |
| public void deleteSuccessMarker(Path outputPath) throws IOException { |
| fs.delete(new Path(outputPath, _SUCCESS), false); |
| } |
| |
| /** |
| * Save the success data to the {@code _SUCCESS} file. |
| * @param outputPath output directory |
| * @param successData success data to save. |
| * @param addMetrics should the FS metrics be added? |
| * @throws IOException IO problem |
| */ |
| public void createSuccessMarker(Path outputPath, |
| SuccessData successData, |
| boolean addMetrics) |
| throws IOException { |
| Preconditions.checkArgument(outputPath != null, "null outputPath"); |
| |
| if (addMetrics) { |
| addFileSystemStatistics(successData.getMetrics()); |
| } |
| |
| // now write |
| Path markerPath = new Path(outputPath, _SUCCESS); |
| LOG.debug("Touching success marker for job {}: {}", markerPath, |
| successData); |
| try (DurationInfo ignored = new DurationInfo(LOG, |
| "Writing success file %s", markerPath)) { |
| successData.save(fs, markerPath, SuccessData.serializer()); |
| } |
| } |
| |
| /** |
| * Revert a pending commit by deleting the destination. |
| * @param commit pending commit |
| * @throws IOException failure |
| */ |
| public void revertCommit(SinglePendingCommit commit) throws IOException { |
| LOG.info("Revert {}", commit); |
| try { |
| writeOperations.revertCommit(commit.getDestinationKey()); |
| } finally { |
| statistics.commitReverted(); |
| } |
| } |
| |
| /** |
| * Upload all the data in the local file, returning the information |
| * needed to commit the work. |
| * @param localFile local file (must be a file, not a directory) |
| * @param destPath destination path |
| * @param partition partition/subdir; recorded only in the commit's |
| * description text |
| * @param uploadPartSize size of each upload part |
| * @param progress progress callback |
| * @return a pending upload entry |
| * @throws IOException failure |
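| * <p> |
| * A sketch of staging and committing a local file (illustrative values): |
| * <pre>{@code |
| *   SinglePendingCommit pending = operations.uploadFileToPendingCommit( |
| *       new File("/tmp/part-0000"), |
| *       new Path("s3a://bucket/output/part-0000"), |
| *       null,              // no partition |
| *       5 * 1024 * 1024,   // 5 MB parts |
| *       () -> { });        // no-op progress callback |
| *   operations.commitOrFail(pending); |
| * }</pre> |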
| */ |
| public SinglePendingCommit uploadFileToPendingCommit(File localFile, |
| Path destPath, |
| String partition, |
| long uploadPartSize, |
| Progressable progress) |
| throws IOException { |
| |
| LOG.debug("Initiating multipart upload from {} to {}", |
| localFile, destPath); |
| Preconditions.checkArgument(destPath != null); |
| if (!localFile.isFile()) { |
| throw new FileNotFoundException("Not a file: " + localFile); |
| } |
| String destURI = destPath.toUri().toString(); |
| String destKey = fs.pathToKey(destPath); |
| String uploadId = null; |
| |
| // flag to indicate to the finally clause that the operation |
| // failed. it is cleared as the last action in the try block. |
| boolean threw = true; |
| final DurationTracker tracker = statistics.trackDuration( |
| COMMITTER_STAGE_FILE_UPLOAD.getSymbol()); |
| try (DurationInfo d = new DurationInfo(LOG, |
| "Upload staged file from %s to %s", |
| localFile.getAbsolutePath(), |
| destPath)) { |
| |
| statistics.commitCreated(); |
| uploadId = writeOperations.initiateMultiPartUpload(destKey, |
| PutObjectOptions.keepingDirs()); |
| long length = localFile.length(); |
| |
| SinglePendingCommit commitData = new SinglePendingCommit(); |
| commitData.setDestinationKey(destKey); |
| commitData.setBucket(fs.getBucket()); |
| commitData.touch(System.currentTimeMillis()); |
| commitData.setUploadId(uploadId); |
| commitData.setUri(destURI); |
| commitData.setText(partition != null ? "partition: " + partition : ""); |
| commitData.setLength(length); |
| |
| long offset = 0; |
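| // ceiling division: e.g. a 10 MB file with 4 MB parts |
| // needs 3 parts (4 MB + 4 MB + 2 MB). |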
| long numParts = (length / uploadPartSize + |
| ((length % uploadPartSize) > 0 ? 1 : 0)); |
| // always write one part, even if it is just an empty one |
| if (numParts == 0) { |
| numParts = 1; |
| } |
| if (numParts > InternalConstants.DEFAULT_UPLOAD_PART_COUNT_LIMIT) { |
| // fail if the file is too big. |
| // it would be possible to be clever here and recalculate the part size, |
| // but this is not currently done. |
| throw new PathIOException(destPath.toString(), |
| String.format("File to upload (size %d)" |
| + " is too big to be uploaded in parts of size %d", |
| length, uploadPartSize)); |
| } |
| |
| List<PartETag> parts = new ArrayList<>((int) numParts); |
| |
| LOG.debug("File size is {}, number of parts to upload = {}", |
| length, numParts); |
| for (int partNumber = 1; partNumber <= numParts; partNumber += 1) { |
| progress.progress(); |
| long size = Math.min(length - offset, uploadPartSize); |
| UploadPartRequest part; |
| part = writeOperations.newUploadPartRequest( |
| destKey, |
| uploadId, |
| partNumber, |
| (int) size, |
| null, |
| localFile, |
| offset); |
| part.setLastPart(partNumber == numParts); |
| UploadPartResult partResult = writeOperations.uploadPart(part); |
| offset += uploadPartSize; |
| parts.add(partResult.getPartETag()); |
| } |
| |
| commitData.bindCommitData(parts); |
| statistics.commitUploaded(length); |
| // clear the threw flag. |
| threw = false; |
| return commitData; |
| } finally { |
| if (threw && uploadId != null) { |
| try { |
| abortMultipartCommit(destKey, uploadId); |
| } catch (IOException e) { |
| LOG.error("Failed to abort upload {} to {}", uploadId, destKey, e); |
| } |
| } |
| if (threw) { |
| tracker.failed(); |
| } |
| // close tracker and so report statistics of success/failure |
| tracker.close(); |
| } |
| } |
| |
| /** |
| * Add the filesystem statistics to the map; overwriting anything |
| * with the same name. |
| * @param dest destination map |
| */ |
| public void addFileSystemStatistics(Map<String, Long> dest) { |
| dest.putAll(fs.getInstrumentation().toMap()); |
| } |
| |
| /** |
| * Note that a task has completed. |
| * @param success success flag |
| */ |
| public void taskCompleted(boolean success) { |
| statistics.taskCompleted(success); |
| } |
| |
| /** |
| * Note that a job has completed. |
| * @param success success flag |
| */ |
| public void jobCompleted(boolean success) { |
| statistics.jobCompleted(success); |
| } |
| |
| /** |
| * Create a commit context for a job or task. |
| * |
| * @param context job context |
| * @param path path for all work. |
| * @param committerThreads thread pool size |
| * @param ioStatisticsContext IOStatistics context of current thread |
| * @return the commit context to pass in. |
| * @throws IOException failure. |
| */ |
| public CommitContext createCommitContext( |
| JobContext context, |
| Path path, |
| int committerThreads, |
| IOStatisticsContext ioStatisticsContext) throws IOException { |
| return new CommitContext(this, context, |
| committerThreads, |
| ioStatisticsContext); |
| } |
| |
| /** |
| * Create a stub commit context for tests. |
| * There's no job context. |
| * @param path path for all work. |
| * @param jobId job ID; if null a random UUID is generated. |
| * @param committerThreads number of committer threads. |
| * @return the commit context to pass in. |
| * @throws IOException failure. |
| */ |
| public CommitContext createCommitContextForTesting( |
| Path path, @Nullable String jobId, int committerThreads) throws IOException { |
| final String id = jobId != null |
| ? jobId |
| : UUID.randomUUID().toString(); |
| |
| return new CommitContext(this, |
| getStoreContext().getConfiguration(), |
| id, |
| committerThreads, |
| IOStatisticsContext.getCurrentIOStatisticsContext()); |
| } |
| |
| /** |
| * Get the magic file length of a file. |
| * If the FS doesn't support the XAttr API, the attribute is missing, |
| * or the value cannot be parsed as a long, then Optional.empty() |
| * is returned. |
| * Static for easier testability. |
| * @param fs filesystem |
| * @param path path |
| * @return either a length or an empty Optional. |
| * @throws IOException on error |
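| * <p> |
| * A usage sketch (illustrative): |
| * <pre>{@code |
| *   Optional<Long> len = CommitOperations.extractMagicFileLength(fs, path); |
| *   len.ifPresent(l -> LOG.info("Magic file length of {}: {}", path, l)); |
| * }</pre> |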
| */ |
| public static Optional<Long> extractMagicFileLength(FileSystem fs, Path path) |
| throws IOException { |
| byte[] bytes; |
| try { |
| bytes = fs.getXAttr(path, XA_MAGIC_MARKER); |
| } catch (UnsupportedOperationException e) { |
| // FS doesn't support xattr. |
| LOG.debug("Filesystem {} doesn't support XAttr API", fs); |
| return Optional.empty(); |
| } |
| return HeaderProcessing.extractXAttrLongValue(bytes); |
| } |
| |
| /** |
| * A holder for a possible IOException; the call {@link #maybeRethrow()} |
| * will throw any exception passed into the constructor, and is a no-op |
| * if none was. |
| * |
| * Why isn't a Java 8 Optional used here? The main benefit would be that |
| * {@link #maybeRethrow()} could be done as a map(), but because Java |
| * lambdas may not throw checked exceptions, the following code is invalid |
| * <pre> |
| * exception.map((e) -> {throw e;}) |
| * </pre> |
| * As a result, the code to work with exceptions would be almost as convoluted |
| * as the original. |
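| * <p> |
| * Typical use (illustrative): |
| * <pre>{@code |
| *   MaybeIOE outcome = MaybeIOE.of(caughtException); |
| *   outcome.maybeRethrow();  // no-op when caughtException was null |
| * }</pre> |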
| */ |
| public static class MaybeIOE { |
| private final IOException exception; |
| |
| public static final MaybeIOE NONE = new MaybeIOE(null); |
| |
| /** |
| * Construct with an exception. |
| * @param exception exception |
| */ |
| public MaybeIOE(IOException exception) { |
| this.exception = exception; |
| } |
| |
| /** |
| * Get any exception. |
| * @return the exception. |
| */ |
| public IOException getException() { |
| return exception; |
| } |
| |
| /** |
| * Is there an exception in this class? |
| * @return true if there is an exception |
| */ |
| public boolean hasException() { |
| return exception != null; |
| } |
| |
| /** |
| * Rethrow any exception. |
| * @throws IOException the exception field, if non-null. |
| */ |
| public void maybeRethrow() throws IOException { |
| if (exception != null) { |
| throw exception; |
| } |
| } |
| |
| @Override |
| public String toString() { |
| final StringBuilder sb = new StringBuilder("MaybeIOE{"); |
| sb.append(hasException() ? exception : ""); |
| sb.append('}'); |
| return sb.toString(); |
| } |
| |
| /** |
| * Get an instance based on the exception: either a value |
| * or a reference to {@link #NONE}. |
| * @param ex exception |
| * @return an instance. |
| */ |
| public static MaybeIOE of(IOException ex) { |
| return ex != null ? new MaybeIOE(ex) : NONE; |
| } |
| } |
| |
| } |