blob: e6524c91961dc4a19fa25fc6a6323fdd1e234c3e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.s3a.commit;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.Statistic;
import org.apache.hadoop.fs.s3a.commit.magic.MagicCommitTracker;
import org.apache.hadoop.fs.s3a.impl.AbstractStoreOperation;
import org.apache.hadoop.fs.s3a.statistics.PutTrackerStatistics;
import static org.apache.hadoop.fs.s3a.commit.MagicCommitPaths.*;
/**
* Adds the code needed for S3A to support magic committers.
* It's pulled out to keep S3A FS class slightly less complex.
* This class can be instantiated even when magic commit is disabled;
* in this case:
* <ol>
* <li>{@link #isMagicCommitPath(Path)} will always return false.</li>
* <li>{@link #isUnderMagicPath(Path)} will always return false.</li>
* <li>{@link #createTracker(Path, String, PutTrackerStatistics)} will always
* return an instance of {@link PutTracker}.</li>
* </ol>
*
* <p>Important</p>: must not directly or indirectly import a class which
* uses any datatype in hadoop-mapreduce.
*/
public class MagicCommitIntegration extends AbstractStoreOperation {
private static final Logger LOG =
LoggerFactory.getLogger(MagicCommitIntegration.class);
private final S3AFileSystem owner;
private final boolean magicCommitEnabled;
/**
* Instantiate.
* @param owner owner class
* @param magicCommitEnabled is magic commit enabled.
*/
public MagicCommitIntegration(S3AFileSystem owner,
boolean magicCommitEnabled) {
super(owner.createStoreContext());
this.owner = owner;
this.magicCommitEnabled = magicCommitEnabled;
}
/**
* Given an (elements, key) pair, return the key of the final destination of
* the PUT, that is: where the final path is expected to go?
* @param elements path split to elements
* @param key key
* @return key for final put. If this is not a magic commit, the
* same as the key in.
*/
public String keyOfFinalDestination(List<String> elements, String key) {
if (isMagicCommitPath(elements)) {
return elementsToKey(finalDestination(elements));
} else {
return key;
}
}
/**
* Given a path and a key to that same path, create a tracker for it.
* This specific tracker will be chosen based on whether or not
* the path is a magic one.
* Auditing: the span used to invoke
* this method will be the one used to create the write operation helper
* for the commit tracker.
* @param path path of nominal write
* @param key key of path of nominal write
* @param trackerStatistics tracker statistics
* @return the tracker for this operation.
*/
public PutTracker createTracker(Path path, String key,
PutTrackerStatistics trackerStatistics) {
final List<String> elements = splitPathToElements(path);
PutTracker tracker;
if(isMagicFile(elements)) {
// path is of a magic file
if (isMagicCommitPath(elements)) {
final String destKey = keyOfFinalDestination(elements, key);
String pendingsetPath = key + CommitConstants.PENDING_SUFFIX;
getStoreContext().incrementStatistic(
Statistic.COMMITTER_MAGIC_FILES_CREATED);
tracker = new MagicCommitTracker(path,
getStoreContext().getBucket(),
key,
destKey,
pendingsetPath,
owner.getWriteOperationHelper(),
trackerStatistics);
LOG.debug("Created {}", tracker);
} else {
LOG.warn("File being created has a \"magic\" path, but the filesystem"
+ " has magic file support disabled: {}", path);
// downgrade to standard multipart tracking
tracker = new PutTracker(key);
}
} else {
// standard multipart tracking
tracker = new PutTracker(key);
}
return tracker;
}
/**
* This performs the calculation of the final destination of a set
* of elements.
*
* @param elements original (do not edit after this call)
* @return a list of elements, possibly empty
*/
private List<String> finalDestination(List<String> elements) {
return magicCommitEnabled ?
MagicCommitPaths.finalDestination(elements)
: elements;
}
/**
* Is magic commit enabled?
* @return true if magic commit is turned on.
*/
public boolean isMagicCommitEnabled() {
return magicCommitEnabled;
}
/**
* Predicate: is a path a magic commit path?
* @param path path to examine
* @return true if the path is or is under a magic directory
*/
public boolean isMagicCommitPath(Path path) {
return isMagicCommitPath(splitPathToElements(path));
}
/**
* Is this path a magic commit path in this filesystem?
* True if magic commit is enabled, the path is magic
* and the path is not actually a commit metadata file.
* @param elements element list
* @return true if writing path is to be uprated to a magic file write
*/
private boolean isMagicCommitPath(List<String> elements) {
return magicCommitEnabled && isMagicFile(elements);
}
/**
* Is the file a magic file: this predicate doesn't check
* for the FS actually having the magic bit being set.
* @param elements path elements
* @return true if the path is one a magic file write expects.
*/
private boolean isMagicFile(List<String> elements) {
return isMagicPath(elements) &&
!isCommitMetadataFile(elements);
}
/**
* Does this file contain all the commit metadata?
* @param elements path element list
* @return true if this file is one of the commit metadata files.
*/
private boolean isCommitMetadataFile(List<String> elements) {
String last = elements.get(elements.size() - 1);
return last.endsWith(CommitConstants.PENDING_SUFFIX)
|| last.endsWith(CommitConstants.PENDINGSET_SUFFIX);
}
/**
* Is this path in/under a magic path...regardless of file type.
* This is used to optimize create() operations.
* @param path path to check
* @return true if the path is one a magic file write expects.
*/
public boolean isUnderMagicPath(Path path) {
return magicCommitEnabled && isMagicPath(splitPathToElements(path));
}
}