blob: aa6babea4d24b284db619e9204fefb5ae38fbc7f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package org.apache.hadoop.fs.adl;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.microsoft.azure.datalake.store.ADLStoreClient;
import com.microsoft.azure.datalake.store.ADLStoreOptions;
import com.microsoft.azure.datalake.store.DirectoryEntry;
import com.microsoft.azure.datalake.store.IfExists;
import com.microsoft.azure.datalake.store.LatencyTracker;
import com.microsoft.azure.datalake.store.UserGroupRepresentation;
import com.microsoft.azure.datalake.store.oauth2.AccessTokenProvider;
import com.microsoft.azure.datalake.store.oauth2.ClientCredsTokenProvider;
import com.microsoft.azure.datalake.store.oauth2.DeviceCodeTokenProvider;
import com.microsoft.azure.datalake.store.oauth2.MsiTokenProvider;
import com.microsoft.azure.datalake.store.oauth2.RefreshTokenBasedTokenProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.ContentSummary.Builder;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.adl.oauth2.AzureADTokenProvider;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.ProviderUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.VersionInfo;
import static org.apache.hadoop.fs.adl.AdlConfKeys.*;
/**
* A FileSystem to access Azure Data Lake Store.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class AdlFileSystem extends FileSystem {
private static final Logger LOG =
LoggerFactory.getLogger(AdlFileSystem.class);
public static final String SCHEME = "adl";
static final int DEFAULT_PORT = 443;
private URI uri;
private String userName;
private boolean overrideOwner;
private ADLStoreClient adlClient;
private Path workingDirectory;
private boolean aclBitStatus;
private UserGroupRepresentation oidOrUpn;
// retained for tests
private AccessTokenProvider tokenProvider;
private AzureADTokenProvider azureTokenProvider;
static {
AdlConfKeys.addDeprecatedKeys();
}
@Override
public String getScheme() {
return SCHEME;
}
public URI getUri() {
return uri;
}
@Override
public int getDefaultPort() {
return DEFAULT_PORT;
}
@Override
public boolean supportsSymlinks() {
return false;
}
/**
* Called after a new FileSystem instance is constructed.
*
* @param storeUri a uri whose authority section names the host, port,
* etc. for this FileSystem
* @param originalConf the configuration to use for the FS. The account-
* specific options are patched over the base ones
* before any use is made of the config.
*/
@Override
public void initialize(URI storeUri, Configuration originalConf)
throws IOException {
String hostname = storeUri.getHost();
String accountName = getAccountNameFromFQDN(hostname);
Configuration conf = propagateAccountOptions(originalConf, accountName);
super.initialize(storeUri, conf);
this.setConf(conf);
this.uri = URI
.create(storeUri.getScheme() + "://" + storeUri.getAuthority());
try {
userName = UserGroupInformation.getCurrentUser().getShortUserName();
} catch (IOException e) {
userName = "hadoop";
LOG.warn("Got exception when getting Hadoop user name."
+ " Set the user name to '" + userName + "'.", e);
}
this.setWorkingDirectory(getHomeDirectory());
overrideOwner = getConf().getBoolean(ADL_DEBUG_OVERRIDE_LOCAL_USER_AS_OWNER,
ADL_DEBUG_SET_LOCAL_USER_AS_OWNER_DEFAULT);
aclBitStatus = conf.getBoolean(ADL_SUPPORT_ACL_BIT_IN_FSPERMISSION,
ADL_SUPPORT_ACL_BIT_IN_FSPERMISSION_DEFAULT);
String accountFQDN = null;
String mountPoint = null;
if (!hostname.contains(".") && !hostname.equalsIgnoreCase(
"localhost")) { // this is a symbolic name. Resolve it.
String hostNameProperty = "dfs.adls." + hostname + ".hostname";
String mountPointProperty = "dfs.adls." + hostname + ".mountpoint";
accountFQDN = getNonEmptyVal(conf, hostNameProperty);
mountPoint = getNonEmptyVal(conf, mountPointProperty);
} else {
accountFQDN = hostname;
}
if (storeUri.getPort() > 0) {
accountFQDN = accountFQDN + ":" + storeUri.getPort();
}
adlClient = ADLStoreClient
.createClient(accountFQDN, getAccessTokenProvider(conf));
ADLStoreOptions options = new ADLStoreOptions();
options.enableThrowingRemoteExceptions();
if (getTransportScheme().equalsIgnoreCase(INSECURE_TRANSPORT_SCHEME)) {
options.setInsecureTransport();
}
if (mountPoint != null) {
options.setFilePathPrefix(mountPoint);
}
String clusterName = conf.get(ADL_EVENTS_TRACKING_CLUSTERNAME, "UNKNOWN");
String clusterType = conf.get(ADL_EVENTS_TRACKING_CLUSTERTYPE, "UNKNOWN");
String clientVersion = ADL_HADOOP_CLIENT_NAME + (StringUtils
.isEmpty(VersionInfo.getVersion().trim()) ?
ADL_HADOOP_CLIENT_VERSION.trim() :
VersionInfo.getVersion().trim());
options.setUserAgentSuffix(clientVersion + "/" +
VersionInfo.getVersion().trim() + "/" + clusterName + "/"
+ clusterType);
adlClient.setOptions(options);
boolean trackLatency = conf
.getBoolean(LATENCY_TRACKER_KEY, LATENCY_TRACKER_DEFAULT);
if (!trackLatency) {
LatencyTracker.disable();
}
boolean enableUPN = conf.getBoolean(ADL_ENABLEUPN_FOR_OWNERGROUP_KEY,
ADL_ENABLEUPN_FOR_OWNERGROUP_DEFAULT);
oidOrUpn = enableUPN ? UserGroupRepresentation.UPN :
UserGroupRepresentation.OID;
}
/**
* This method is provided for convenience for derived classes to define
* custom {@link AzureADTokenProvider} instance.
*
* In order to ensure secure hadoop infrastructure and user context for which
* respective {@link AdlFileSystem} instance is initialized,
* Loading {@link AzureADTokenProvider} is not sufficient.
*
* The order of loading {@link AzureADTokenProvider} is to first invoke
* {@link #getCustomAccessTokenProvider(Configuration)}, If method return null
* which means no implementation provided by derived classes, then
* configuration object is loaded to retrieve token configuration as specified
* is documentation.
*
* Custom token management takes the higher precedence during initialization.
*
* @param conf Configuration object
* @return null if the no custom {@link AzureADTokenProvider} token management
* is specified.
* @throws IOException if failed to initialize token provider.
*/
protected synchronized AzureADTokenProvider getCustomAccessTokenProvider(
Configuration conf) throws IOException {
String className = getNonEmptyVal(conf, AZURE_AD_TOKEN_PROVIDER_CLASS_KEY);
Class<? extends AzureADTokenProvider> azureADTokenProviderClass =
conf.getClass(AZURE_AD_TOKEN_PROVIDER_CLASS_KEY, null,
AzureADTokenProvider.class);
if (azureADTokenProviderClass == null) {
throw new IllegalArgumentException(
"Configuration " + className + " " + "not defined/accessible.");
}
azureTokenProvider = ReflectionUtils
.newInstance(azureADTokenProviderClass, conf);
if (azureTokenProvider == null) {
throw new IllegalArgumentException("Failed to initialize " + className);
}
azureTokenProvider.initialize(conf);
return azureTokenProvider;
}
private AccessTokenProvider getAccessTokenProvider(Configuration config)
throws IOException {
Configuration conf = ProviderUtils.excludeIncompatibleCredentialProviders(
config, AdlFileSystem.class);
TokenProviderType type = conf.getEnum(
AdlConfKeys.AZURE_AD_TOKEN_PROVIDER_TYPE_KEY,
AdlConfKeys.AZURE_AD_TOKEN_PROVIDER_TYPE_DEFAULT);
switch (type) {
case RefreshToken:
tokenProvider = getConfRefreshTokenBasedTokenProvider(conf);
break;
case ClientCredential:
tokenProvider = getConfCredentialBasedTokenProvider(conf);
break;
case MSI:
tokenProvider = getMsiBasedTokenProvider(conf);
break;
case DeviceCode:
tokenProvider = getDeviceCodeTokenProvider(conf);
break;
case Custom:
default:
AzureADTokenProvider azureADTokenProvider = getCustomAccessTokenProvider(
conf);
tokenProvider = new SdkTokenProviderAdapter(azureADTokenProvider);
break;
}
return tokenProvider;
}
private AccessTokenProvider getConfCredentialBasedTokenProvider(
Configuration conf) throws IOException {
String clientId = getPasswordString(conf, AZURE_AD_CLIENT_ID_KEY);
String refreshUrl = getPasswordString(conf, AZURE_AD_REFRESH_URL_KEY);
String clientSecret = getPasswordString(conf, AZURE_AD_CLIENT_SECRET_KEY);
return new ClientCredsTokenProvider(refreshUrl, clientId, clientSecret);
}
private AccessTokenProvider getConfRefreshTokenBasedTokenProvider(
Configuration conf) throws IOException {
String clientId = getPasswordString(conf, AZURE_AD_CLIENT_ID_KEY);
String refreshToken = getPasswordString(conf, AZURE_AD_REFRESH_TOKEN_KEY);
return new RefreshTokenBasedTokenProvider(clientId, refreshToken);
}
private AccessTokenProvider getMsiBasedTokenProvider(
Configuration conf) throws IOException {
return new MsiTokenProvider(conf.getInt(MSI_PORT, -1));
}
private AccessTokenProvider getDeviceCodeTokenProvider(
Configuration conf) throws IOException {
String clientAppId = getNonEmptyVal(conf, DEVICE_CODE_CLIENT_APP_ID);
return new DeviceCodeTokenProvider(clientAppId);
}
@VisibleForTesting
AccessTokenProvider getTokenProvider() {
return tokenProvider;
}
@VisibleForTesting
AzureADTokenProvider getAzureTokenProvider() {
return azureTokenProvider;
}
/**
* Constructing home directory locally is fine as long as Hadoop
* local user name and ADL user name relationship story is not fully baked
* yet.
*
* @return Hadoop local user home directory.
*/
@Override
public Path getHomeDirectory() {
return makeQualified(new Path("/user/" + userName));
}
/**
* Create call semantic is handled differently in case of ADL. Create
* semantics is translated to Create/Append
* semantics.
* 1. No dedicated connection to server.
* 2. Buffering is locally done, Once buffer is full or flush is invoked on
* the by the caller. All the pending
* data is pushed to ADL as APPEND operation code.
* 3. On close - Additional call is send to server to close the stream, and
* release lock from the stream.
*
* Necessity of Create/Append semantics is
* 1. ADL backend server does not allow idle connection for longer duration
* . In case of slow writer scenario,
* observed connection timeout/Connection reset causing occasional job
* failures.
* 2. Performance boost to jobs which are slow writer, avoided network latency
* 3. ADL equally better performing with multiple of 4MB chunk as append
* calls.
*
* @param f File path
* @param permission Access permission for the newly created file
* @param overwrite Remove existing file and recreate new one if true
* otherwise throw error if file exist
* @param bufferSize Buffer size, ADL backend does not honour
* @param replication Replication count, ADL backend does not honour
* @param blockSize Block size, ADL backend does not honour
* @param progress Progress indicator
* @return FSDataOutputStream OutputStream on which application can push
* stream of bytes
* @throws IOException when system error, internal server error or user error
*/
@Override
public FSDataOutputStream create(Path f, FsPermission permission,
boolean overwrite, int bufferSize, short replication, long blockSize,
Progressable progress) throws IOException {
statistics.incrementWriteOps(1);
IfExists overwriteRule = overwrite ? IfExists.OVERWRITE : IfExists.FAIL;
return new FSDataOutputStream(new AdlFsOutputStream(adlClient
.createFile(toRelativeFilePath(f), overwriteRule,
Integer.toOctalString(applyUMask(permission).toShort()), true),
getConf()), this.statistics);
}
/**
* Opens an FSDataOutputStream at the indicated Path with write-progress
* reporting. Same as create(), except fails if parent directory doesn't
* already exist.
*
* @param f the file name to open
* @param permission Access permission for the newly created file
* @param flags {@link CreateFlag}s to use for this stream.
* @param bufferSize the size of the buffer to be used. ADL backend does
* not honour
* @param replication required block replication for the file. ADL backend
* does not honour
* @param blockSize Block size, ADL backend does not honour
* @param progress Progress indicator
* @throws IOException when system error, internal server error or user error
* @see #setPermission(Path, FsPermission)
* @deprecated API only for 0.20-append
*/
@Override
public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
EnumSet<CreateFlag> flags, int bufferSize, short replication,
long blockSize, Progressable progress) throws IOException {
statistics.incrementWriteOps(1);
IfExists overwriteRule = IfExists.FAIL;
for (CreateFlag flag : flags) {
if (flag == CreateFlag.OVERWRITE) {
overwriteRule = IfExists.OVERWRITE;
break;
}
}
return new FSDataOutputStream(new AdlFsOutputStream(adlClient
.createFile(toRelativeFilePath(f), overwriteRule,
Integer.toOctalString(applyUMask(permission).toShort()), false),
getConf()), this.statistics);
}
/**
* Append to an existing file (optional operation).
*
* @param f the existing file to be appended.
* @param bufferSize the size of the buffer to be used. ADL backend does
* not honour
* @param progress Progress indicator
* @throws IOException when system error, internal server error or user error
*/
@Override
public FSDataOutputStream append(Path f, int bufferSize,
Progressable progress) throws IOException {
statistics.incrementWriteOps(1);
return new FSDataOutputStream(
new AdlFsOutputStream(adlClient.getAppendStream(toRelativeFilePath(f)),
getConf()), this.statistics);
}
/**
* Azure data lake does not support user configuration for data replication
* hence not leaving system to query on
* azure data lake.
*
* Stub implementation
*
* @param p Not honoured
* @param replication Not honoured
* @return True hard coded since ADL file system does not support
* replication configuration
* @throws IOException No exception would not thrown in this case however
* aligning with parent api definition.
*/
@Override
public boolean setReplication(final Path p, final short replication)
throws IOException {
statistics.incrementWriteOps(1);
return true;
}
/**
* Open call semantic is handled differently in case of ADL. Instead of
* network stream is returned to the user,
* Overridden FsInputStream is returned.
*
* @param f File path
* @param buffersize Buffer size, Not honoured
* @return FSDataInputStream InputStream on which application can read
* stream of bytes
* @throws IOException when system error, internal server error or user error
*/
@Override
public FSDataInputStream open(final Path f, final int buffersize)
throws IOException {
statistics.incrementReadOps(1);
return new FSDataInputStream(
new AdlFsInputStream(adlClient.getReadStream(toRelativeFilePath(f)),
statistics, getConf()));
}
/**
* Return a file status object that represents the path.
*
* @param f The path we want information from
* @return a FileStatus object
* @throws IOException when the path does not exist or any other error;
* IOException see specific implementation
*/
@Override
public FileStatus getFileStatus(final Path f) throws IOException {
statistics.incrementReadOps(1);
DirectoryEntry entry =
adlClient.getDirectoryEntry(toRelativeFilePath(f), oidOrUpn);
return toFileStatus(entry, f);
}
/**
* List the statuses of the files/directories in the given path if the path is
* a directory.
*
* @param f given path
* @return the statuses of the files/directories in the given patch
* @throws IOException when the path does not exist or any other error;
* IOException see specific implementation
*/
@Override
public FileStatus[] listStatus(final Path f) throws IOException {
statistics.incrementReadOps(1);
List<DirectoryEntry> entries =
adlClient.enumerateDirectory(toRelativeFilePath(f), oidOrUpn);
return toFileStatuses(entries, f);
}
/**
* Renames Path src to Path dst. Can take place on local fs
* or remote DFS.
*
* ADLS support POSIX standard for rename operation.
*
* @param src path to be renamed
* @param dst new path after rename
* @return true if rename is successful
* @throws IOException on failure
*/
@Override
public boolean rename(final Path src, final Path dst) throws IOException {
statistics.incrementWriteOps(1);
if (toRelativeFilePath(src).equals("/")) {
return false;
}
return adlClient.rename(toRelativeFilePath(src), toRelativeFilePath(dst));
}
@Override
@Deprecated
public void rename(final Path src, final Path dst,
final Options.Rename... options) throws IOException {
statistics.incrementWriteOps(1);
boolean overwrite = false;
for (Rename renameOption : options) {
if (renameOption == Rename.OVERWRITE) {
overwrite = true;
break;
}
}
adlClient
.rename(toRelativeFilePath(src), toRelativeFilePath(dst), overwrite);
}
/**
* Concat existing files together.
*
* @param trg the path to the target destination.
* @param srcs the paths to the sources to use for the concatenation.
* @throws IOException when system error, internal server error or user error
*/
@Override
public void concat(final Path trg, final Path[] srcs) throws IOException {
statistics.incrementWriteOps(1);
List<String> sourcesList = new ArrayList<String>();
for (Path entry : srcs) {
sourcesList.add(toRelativeFilePath(entry));
}
adlClient.concatenateFiles(toRelativeFilePath(trg), sourcesList);
}
/**
* Delete a file.
*
* @param path the path to delete.
* @param recursive if path is a directory and set to
* true, the directory is deleted else throws an exception.
* In case of a file the recursive can be set to either
* true or false.
* @return true if delete is successful else false.
* @throws IOException when system error, internal server error or user error
*/
@Override
public boolean delete(final Path path, final boolean recursive)
throws IOException {
statistics.incrementWriteOps(1);
String relativePath = toRelativeFilePath(path);
// Delete on root directory not supported.
if (relativePath.equals("/")) {
// This is important check after recent commit
// HADOOP-12977 and HADOOP-13716 validates on root for
// 1. if root is empty and non recursive delete then return false.
// 2. if root is non empty and non recursive delete then throw exception.
if (!recursive
&& adlClient.enumerateDirectory(toRelativeFilePath(path), 1).size()
> 0) {
throw new IOException("Delete on root is not supported.");
}
return false;
}
return recursive ?
adlClient.deleteRecursive(relativePath) :
adlClient.delete(relativePath);
}
/**
* Make the given file and all non-existent parents into
* directories. Has the semantics of Unix 'mkdir -p'.
* Existence of the directory hierarchy is not an error.
*
* @param path path to create
* @param permission to apply to path
*/
@Override
public boolean mkdirs(final Path path, final FsPermission permission)
throws IOException {
statistics.incrementWriteOps(1);
return adlClient.createDirectory(toRelativeFilePath(path),
Integer.toOctalString(applyUMask(permission).toShort()));
}
private FileStatus[] toFileStatuses(final List<DirectoryEntry> entries,
final Path parent) {
FileStatus[] fileStatuses = new FileStatus[entries.size()];
int index = 0;
for (DirectoryEntry entry : entries) {
FileStatus status = toFileStatus(entry, parent);
if (!(entry.name == null || entry.name == "")) {
status.setPath(
new Path(parent.makeQualified(uri, workingDirectory), entry.name));
}
fileStatuses[index++] = status;
}
return fileStatuses;
}
private FsPermission applyUMask(FsPermission permission) {
if (permission == null) {
permission = FsPermission.getDefault();
}
return permission.applyUMask(FsPermission.getUMask(getConf()));
}
private FileStatus toFileStatus(final DirectoryEntry entry, final Path f) {
Path p = makeQualified(f);
boolean aclBit = aclBitStatus ? entry.aclBit : false;
if (overrideOwner) {
return new AdlFileStatus(entry, p, userName, "hdfs", aclBit);
}
return new AdlFileStatus(entry, p, aclBit);
}
/**
* Set owner of a path (i.e. a file or a directory).
* The parameters owner and group cannot both be null.
*
* @param path The path
* @param owner If it is null, the original username remains unchanged.
* @param group If it is null, the original groupname remains unchanged.
*/
@Override
public void setOwner(final Path path, final String owner, final String group)
throws IOException {
statistics.incrementWriteOps(1);
adlClient.setOwner(toRelativeFilePath(path), owner, group);
}
/**
* Set permission of a path.
*
* @param path The path
* @param permission Access permission
*/
@Override
public void setPermission(final Path path, final FsPermission permission)
throws IOException {
statistics.incrementWriteOps(1);
adlClient.setPermission(toRelativeFilePath(path),
Integer.toOctalString(permission.toShort()));
}
/**
* Modifies ACL entries of files and directories. This method can add new ACL
* entries or modify the permissions on existing ACL entries. All existing
* ACL entries that are not specified in this call are retained without
* changes. (Modifications are merged into the current ACL.)
*
* @param path Path to modify
* @param aclSpec List of AclEntry describing modifications
* @throws IOException if an ACL could not be modified
*/
@Override
public void modifyAclEntries(final Path path, final List<AclEntry> aclSpec)
throws IOException {
statistics.incrementWriteOps(1);
List<com.microsoft.azure.datalake.store.acl.AclEntry> msAclEntries = new
ArrayList<com.microsoft.azure.datalake.store.acl.AclEntry>();
for (AclEntry aclEntry : aclSpec) {
msAclEntries.add(com.microsoft.azure.datalake.store.acl.AclEntry
.parseAclEntry(aclEntry.toString()));
}
adlClient.modifyAclEntries(toRelativeFilePath(path), msAclEntries);
}
/**
* Removes ACL entries from files and directories. Other ACL entries are
* retained.
*
* @param path Path to modify
* @param aclSpec List of AclEntry describing entries to remove
* @throws IOException if an ACL could not be modified
*/
@Override
public void removeAclEntries(final Path path, final List<AclEntry> aclSpec)
throws IOException {
statistics.incrementWriteOps(1);
List<com.microsoft.azure.datalake.store.acl.AclEntry> msAclEntries = new
ArrayList<com.microsoft.azure.datalake.store.acl.AclEntry>();
for (AclEntry aclEntry : aclSpec) {
msAclEntries.add(com.microsoft.azure.datalake.store.acl.AclEntry
.parseAclEntry(aclEntry.toString(), true));
}
adlClient.removeAclEntries(toRelativeFilePath(path), msAclEntries);
}
/**
* Removes all default ACL entries from files and directories.
*
* @param path Path to modify
* @throws IOException if an ACL could not be modified
*/
@Override
public void removeDefaultAcl(final Path path) throws IOException {
statistics.incrementWriteOps(1);
adlClient.removeDefaultAcls(toRelativeFilePath(path));
}
/**
* Removes all but the base ACL entries of files and directories. The entries
* for user, group, and others are retained for compatibility with permission
* bits.
*
* @param path Path to modify
* @throws IOException if an ACL could not be removed
*/
@Override
public void removeAcl(final Path path) throws IOException {
statistics.incrementWriteOps(1);
adlClient.removeAllAcls(toRelativeFilePath(path));
}
/**
* Fully replaces ACL of files and directories, discarding all existing
* entries.
*
* @param path Path to modify
* @param aclSpec List of AclEntry describing modifications, must include
* entries for user, group, and others for compatibility with
* permission bits.
* @throws IOException if an ACL could not be modified
*/
@Override
public void setAcl(final Path path, final List<AclEntry> aclSpec)
throws IOException {
statistics.incrementWriteOps(1);
List<com.microsoft.azure.datalake.store.acl.AclEntry> msAclEntries = new
ArrayList<com.microsoft.azure.datalake.store.acl.AclEntry>();
for (AclEntry aclEntry : aclSpec) {
msAclEntries.add(com.microsoft.azure.datalake.store.acl.AclEntry
.parseAclEntry(aclEntry.toString()));
}
adlClient.setAcl(toRelativeFilePath(path), msAclEntries);
}
/**
* Gets the ACL of a file or directory.
*
* @param path Path to get
* @return AclStatus describing the ACL of the file or directory
* @throws IOException if an ACL could not be read
*/
@Override
public AclStatus getAclStatus(final Path path) throws IOException {
statistics.incrementReadOps(1);
com.microsoft.azure.datalake.store.acl.AclStatus adlStatus =
adlClient.getAclStatus(toRelativeFilePath(path), oidOrUpn);
AclStatus.Builder aclStatusBuilder = new AclStatus.Builder();
aclStatusBuilder.owner(adlStatus.owner);
aclStatusBuilder.group(adlStatus.group);
aclStatusBuilder.setPermission(
new FsPermission(Short.valueOf(adlStatus.octalPermissions, 8)));
aclStatusBuilder.stickyBit(adlStatus.stickyBit);
String aclListString = com.microsoft.azure.datalake.store.acl.AclEntry
.aclListToString(adlStatus.aclSpec);
List<AclEntry> aclEntries = AclEntry.parseAclSpec(aclListString, true);
aclStatusBuilder.addEntries(aclEntries);
return aclStatusBuilder.build();
}
/**
* Checks if the user can access a path. The mode specifies which access
* checks to perform. If the requested permissions are granted, then the
* method returns normally. If access is denied, then the method throws an
* {@link AccessControlException}.
*
* @param path Path to check
* @param mode type of access to check
* @throws AccessControlException if access is denied
* @throws java.io.FileNotFoundException if the path does not exist
* @throws IOException see specific implementation
*/
@Override
public void access(final Path path, FsAction mode) throws IOException {
statistics.incrementReadOps(1);
if (!adlClient.checkAccess(toRelativeFilePath(path), mode.SYMBOL)) {
throw new AccessControlException("Access Denied : " + path.toString());
}
}
/**
* Return the {@link ContentSummary} of a given {@link Path}.
*
* @param f path to use
*/
@Override
public ContentSummary getContentSummary(Path f) throws IOException {
statistics.incrementReadOps(1);
com.microsoft.azure.datalake.store.ContentSummary msSummary = adlClient
.getContentSummary(toRelativeFilePath(f));
return new Builder().length(msSummary.length)
.directoryCount(msSummary.directoryCount).fileCount(msSummary.fileCount)
.spaceConsumed(msSummary.spaceConsumed).build();
}
@VisibleForTesting
protected String getTransportScheme() {
return SECURE_TRANSPORT_SCHEME;
}
@VisibleForTesting
String toRelativeFilePath(Path path) {
return path.makeQualified(uri, workingDirectory).toUri().getPath();
}
/**
* Get the current working directory for the given file system.
*
* @return the directory pathname
*/
@Override
public Path getWorkingDirectory() {
return workingDirectory;
}
/**
* Set the current working directory for the given file system. All relative
* paths will be resolved relative to it.
*
* @param dir Working directory path.
*/
@Override
public void setWorkingDirectory(final Path dir) {
if (dir == null) {
throw new InvalidPathException("Working directory cannot be set to NULL");
}
/**
* Do not validate the scheme and URI of the passsed parameter. When Adls
* runs as additional file system, working directory set has the default
* file system scheme and uri.
*
* Found a problem during PIG execution in
* https://github.com/apache/pig/blob/branch-0
* .15/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer
* /PigInputFormat.java#L235
* However similar problem would be present in other application so
* defaulting to build working directory using relative path only.
*/
this.workingDirectory = this.makeAbsolute(dir);
}
/**
* Return the number of bytes that large input files should be optimally
* be split into to minimize i/o time.
*
* @deprecated use {@link #getDefaultBlockSize(Path)} instead
*/
@Deprecated
public long getDefaultBlockSize() {
return ADL_BLOCK_SIZE;
}
/**
* Return the number of bytes that large input files should be optimally
* be split into to minimize i/o time. The given path will be used to
* locate the actual filesystem. The full path does not have to exist.
*
* @param f path of file
* @return the default block size for the path's filesystem
*/
public long getDefaultBlockSize(Path f) {
return getDefaultBlockSize();
}
/**
* Get the block size.
* @param f the filename
* @return the number of bytes in a block
*/
/**
* @deprecated Use getFileStatus() instead
*/
@Deprecated
public long getBlockSize(Path f) throws IOException {
return ADL_BLOCK_SIZE;
}
/**
* Get replication.
*
* @param src file name
* @return file replication
* @deprecated Use getFileStatus() instead
*/
@Deprecated
public short getReplication(Path src) {
return ADL_REPLICATION_FACTOR;
}
private Path makeAbsolute(Path path) {
return path.isAbsolute() ? path : new Path(this.workingDirectory, path);
}
private static String getNonEmptyVal(Configuration conf, String key) {
String value = conf.get(key);
if (StringUtils.isEmpty(value)) {
throw new IllegalArgumentException(
"No value for " + key + " found in conf file.");
}
return value;
}
/**
* A wrapper of {@link Configuration#getPassword(String)}. It returns
* <code>String</code> instead of <code>char[]</code>.
*
* @param conf the configuration
* @param key the property key
* @return the password string
* @throws IOException if the password was not found
*/
private static String getPasswordString(Configuration conf, String key)
throws IOException {
char[] passchars = conf.getPassword(key);
if (passchars == null) {
throw new IOException("Password " + key + " not found");
}
return new String(passchars);
}
@VisibleForTesting
public void setUserGroupRepresentationAsUPN(boolean enableUPN) {
oidOrUpn = enableUPN ? UserGroupRepresentation.UPN :
UserGroupRepresentation.OID;
}
/**
* Gets ADL account name from ADL FQDN.
* @param accountFQDN ADL account fqdn
* @return ADL account name
*/
public static String getAccountNameFromFQDN(String accountFQDN) {
return accountFQDN.contains(".")
? accountFQDN.substring(0, accountFQDN.indexOf("."))
: accountFQDN;
}
/**
* Propagates account-specific settings into generic ADL configuration keys.
* This is done by propagating the values of the form
* {@code fs.adl.account.${account_name}.key} to
* {@code fs.adl.key}, for all values of "key"
*
* The source of the updated property is set to the key name of the account
* property, to aid in diagnostics of where things came from.
*
* Returns a new configuration. Why the clone?
* You can use the same conf for different filesystems, and the original
* values are not updated.
*
*
* @param source Source Configuration object
* @param accountName account name. Must not be empty
* @return a (potentially) patched clone of the original
*/
public static Configuration propagateAccountOptions(Configuration source,
String accountName) {
Preconditions.checkArgument(StringUtils.isNotEmpty(accountName),
"accountName");
final String accountPrefix = AZURE_AD_ACCOUNT_PREFIX + accountName +'.';
LOG.debug("Propagating entries under {}", accountPrefix);
final Configuration dest = new Configuration(source);
for (Map.Entry<String, String> entry : source) {
final String key = entry.getKey();
// get the (unexpanded) value.
final String value = entry.getValue();
if (!key.startsWith(accountPrefix) || accountPrefix.equals(key)) {
continue;
}
// there's a account prefix, so strip it
final String stripped = key.substring(accountPrefix.length());
// propagate the value, building a new origin field.
// to track overwrites, the generic key is overwritten even if
// already matches the new one.
String origin = "[" + StringUtils.join(
source.getPropertySources(key), ", ") +"]";
final String generic = AZURE_AD_PREFIX + stripped;
LOG.debug("Updating {} from {}", generic, origin);
dest.set(generic, value, key + " via " + origin);
}
return dest;
}
}