| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.fs.azurebfs; |
| |
| import java.io.File; |
| import java.io.FileNotFoundException; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.net.HttpURLConnection; |
| import java.net.URI; |
| import java.net.URISyntaxException; |
| import java.util.List; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.EnumSet; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.Future; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.Preconditions; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.commons.lang3.ArrayUtils; |
| import org.apache.hadoop.fs.azurebfs.services.AbfsClient; |
| import org.apache.hadoop.fs.azurebfs.services.AbfsClientThrottlingIntercept; |
| import org.apache.hadoop.classification.InterfaceStability; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.BlockLocation; |
| import org.apache.hadoop.fs.CreateFlag; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FSDataOutputStream; |
| import org.apache.hadoop.fs.FileAlreadyExistsException; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.fs.PathIOException; |
| import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; |
| import org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations; |
| import org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes; |
| import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; |
| import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; |
| import org.apache.hadoop.fs.azurebfs.contracts.exceptions.FileSystemOperationUnhandledException; |
| import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriAuthorityException; |
| import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException; |
| import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; |
| import org.apache.hadoop.fs.azurebfs.extensions.AbfsAuthorizationException; |
| import org.apache.hadoop.fs.azurebfs.extensions.AbfsAuthorizer; |
| import org.apache.hadoop.fs.azurebfs.security.AbfsDelegationTokenManager; |
| import org.apache.hadoop.fs.permission.AclEntry; |
| import org.apache.hadoop.fs.permission.AclStatus; |
| import org.apache.hadoop.fs.permission.FsAction; |
| import org.apache.hadoop.fs.permission.FsPermission; |
| import org.apache.hadoop.security.AccessControlException; |
| import org.apache.hadoop.security.token.Token; |
| import org.apache.hadoop.security.UserGroupInformation; |
| import org.apache.hadoop.util.Progressable; |
| |
| /** |
| * A {@link org.apache.hadoop.fs.FileSystem} for reading and writing files stored on <a |
| * href="http://store.azure.com/">Windows Azure</a> |
| */ |
| @InterfaceStability.Evolving |
| public class AzureBlobFileSystem extends FileSystem { |
| public static final Logger LOG = LoggerFactory.getLogger(AzureBlobFileSystem.class); |
| private URI uri; |
| private Path workingDir; |
| private AzureBlobFileSystemStore abfsStore; |
| private boolean isClosed; |
| |
| private boolean delegationTokenEnabled = false; |
| private AbfsDelegationTokenManager delegationTokenManager; |
| private AbfsAuthorizer authorizer; |
| |
| @Override |
| public void initialize(URI uri, Configuration configuration) |
| throws IOException { |
| uri = ensureAuthority(uri, configuration); |
| super.initialize(uri, configuration); |
| setConf(configuration); |
| |
| LOG.debug("Initializing AzureBlobFileSystem for {}", uri); |
| |
| this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority()); |
| this.abfsStore = new AzureBlobFileSystemStore(uri, this.isSecureScheme(), configuration); |
| final AbfsConfiguration abfsConfiguration = abfsStore.getAbfsConfiguration(); |
| |
| this.setWorkingDirectory(this.getHomeDirectory()); |
| |
| if (abfsConfiguration.getCreateRemoteFileSystemDuringInitialization()) { |
| if (this.tryGetFileStatus(new Path(AbfsHttpConstants.ROOT_PATH)) == null) { |
| try { |
| this.createFileSystem(); |
| } catch (AzureBlobFileSystemException ex) { |
| checkException(null, ex, AzureServiceErrorCode.FILE_SYSTEM_ALREADY_EXISTS); |
| } |
| } |
| } |
| |
| if (UserGroupInformation.isSecurityEnabled()) { |
| this.delegationTokenEnabled = abfsConfiguration.isDelegationTokenManagerEnabled(); |
| |
| if (this.delegationTokenEnabled) { |
| LOG.debug("Initializing DelegationTokenManager for {}", uri); |
| this.delegationTokenManager = abfsConfiguration.getDelegationTokenManager(); |
| } |
| } |
| |
| AbfsClientThrottlingIntercept.initializeSingleton(abfsConfiguration.isAutoThrottlingEnabled()); |
| |
| // Initialize ABFS authorizer |
| // |
| this.authorizer = abfsConfiguration.getAbfsAuthorizer(); |
| } |
| |
| @Override |
| public String toString() { |
| final StringBuilder sb = new StringBuilder( |
| "AzureBlobFileSystem{"); |
| sb.append("uri=").append(uri); |
| sb.append(", user='").append(abfsStore.getUser()).append('\''); |
| sb.append(", primaryUserGroup='").append(abfsStore.getPrimaryGroup()).append('\''); |
| sb.append('}'); |
| return sb.toString(); |
| } |
| |
| public boolean isSecureScheme() { |
| return false; |
| } |
| |
| @Override |
| public URI getUri() { |
| return this.uri; |
| } |
| |
| @Override |
| public FSDataInputStream open(final Path path, final int bufferSize) throws IOException { |
| LOG.debug("AzureBlobFileSystem.open path: {} bufferSize: {}", path, bufferSize); |
| |
| Path qualifiedPath = makeQualified(path); |
| performAbfsAuthCheck(FsAction.READ, qualifiedPath); |
| |
| try { |
| InputStream inputStream = abfsStore.openFileForRead(qualifiedPath, statistics); |
| return new FSDataInputStream(inputStream); |
| } catch(AzureBlobFileSystemException ex) { |
| checkException(path, ex); |
| return null; |
| } |
| } |
| |
| @Override |
| public FSDataOutputStream create(final Path f, final FsPermission permission, final boolean overwrite, final int bufferSize, |
| final short replication, final long blockSize, final Progressable progress) throws IOException { |
| LOG.debug("AzureBlobFileSystem.create path: {} permission: {} overwrite: {} bufferSize: {}", |
| f, |
| permission, |
| overwrite, |
| blockSize); |
| |
| trailingPeriodCheck(f); |
| |
| Path qualifiedPath = makeQualified(f); |
| performAbfsAuthCheck(FsAction.WRITE, qualifiedPath); |
| |
| try { |
| OutputStream outputStream = abfsStore.createFile(qualifiedPath, overwrite, |
| permission == null ? FsPermission.getFileDefault() : permission, FsPermission.getUMask(getConf())); |
| return new FSDataOutputStream(outputStream, statistics); |
| } catch(AzureBlobFileSystemException ex) { |
| checkException(f, ex); |
| return null; |
| } |
| } |
| |
| @Override |
| @SuppressWarnings("deprecation") |
| public FSDataOutputStream createNonRecursive(final Path f, final FsPermission permission, |
| final boolean overwrite, final int bufferSize, final short replication, final long blockSize, |
| final Progressable progress) throws IOException { |
| |
| final Path parent = f.getParent(); |
| final FileStatus parentFileStatus = tryGetFileStatus(parent); |
| |
| if (parentFileStatus == null) { |
| throw new FileNotFoundException("Cannot create file " |
| + f.getName() + " because parent folder does not exist."); |
| } |
| |
| return create(f, permission, overwrite, bufferSize, replication, blockSize, progress); |
| } |
| |
| @Override |
| @SuppressWarnings("deprecation") |
| public FSDataOutputStream createNonRecursive(final Path f, final FsPermission permission, |
| final EnumSet<CreateFlag> flags, final int bufferSize, final short replication, final long blockSize, |
| final Progressable progress) throws IOException { |
| |
| // Check if file should be appended or overwritten. Assume that the file |
| // is overwritten on if the CREATE and OVERWRITE create flags are set. |
| final EnumSet<CreateFlag> createflags = |
| EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE); |
| final boolean overwrite = flags.containsAll(createflags); |
| |
| // Delegate the create non-recursive call. |
| return this.createNonRecursive(f, permission, overwrite, |
| bufferSize, replication, blockSize, progress); |
| } |
| |
| @Override |
| @SuppressWarnings("deprecation") |
| public FSDataOutputStream createNonRecursive(final Path f, |
| final boolean overwrite, final int bufferSize, final short replication, final long blockSize, |
| final Progressable progress) throws IOException { |
| return this.createNonRecursive(f, FsPermission.getFileDefault(), |
| overwrite, bufferSize, replication, blockSize, progress); |
| } |
| |
| @Override |
| public FSDataOutputStream append(final Path f, final int bufferSize, final Progressable progress) throws IOException { |
| LOG.debug( |
| "AzureBlobFileSystem.append path: {} bufferSize: {}", |
| f.toString(), |
| bufferSize); |
| |
| Path qualifiedPath = makeQualified(f); |
| performAbfsAuthCheck(FsAction.WRITE, qualifiedPath); |
| |
| try { |
| OutputStream outputStream = abfsStore.openFileForWrite(qualifiedPath, false); |
| return new FSDataOutputStream(outputStream, statistics); |
| } catch(AzureBlobFileSystemException ex) { |
| checkException(f, ex); |
| return null; |
| } |
| } |
| |
| public boolean rename(final Path src, final Path dst) throws IOException { |
| LOG.debug( |
| "AzureBlobFileSystem.rename src: {} dst: {}", src.toString(), dst.toString()); |
| |
| trailingPeriodCheck(dst); |
| |
| Path parentFolder = src.getParent(); |
| if (parentFolder == null) { |
| return false; |
| } |
| Path qualifiedSrcPath = makeQualified(src); |
| Path qualifiedDstPath = makeQualified(dst); |
| |
| // rename under same folder; |
| if(makeQualified(parentFolder).equals(qualifiedDstPath)) { |
| return tryGetFileStatus(qualifiedSrcPath) != null; |
| } |
| |
| FileStatus dstFileStatus = null; |
| if (qualifiedSrcPath.equals(qualifiedDstPath)) { |
| // rename to itself |
| // - if it doesn't exist, return false |
| // - if it is file, return true |
| // - if it is dir, return false. |
| dstFileStatus = tryGetFileStatus(qualifiedDstPath); |
| if (dstFileStatus == null) { |
| return false; |
| } |
| return dstFileStatus.isDirectory() ? false : true; |
| } |
| |
| // Non-HNS account need to check dst status on driver side. |
| if (!abfsStore.getIsNamespaceEnabled() && dstFileStatus == null) { |
| dstFileStatus = tryGetFileStatus(qualifiedDstPath); |
| } |
| |
| try { |
| String sourceFileName = src.getName(); |
| Path adjustedDst = dst; |
| |
| if (dstFileStatus != null) { |
| if (!dstFileStatus.isDirectory()) { |
| return qualifiedSrcPath.equals(qualifiedDstPath); |
| } |
| adjustedDst = new Path(dst, sourceFileName); |
| } |
| |
| qualifiedDstPath = makeQualified(adjustedDst); |
| performAbfsAuthCheck(FsAction.READ_WRITE, qualifiedSrcPath, qualifiedDstPath); |
| |
| abfsStore.rename(qualifiedSrcPath, qualifiedDstPath); |
| return true; |
| } catch(AzureBlobFileSystemException ex) { |
| checkException( |
| src, |
| ex, |
| AzureServiceErrorCode.PATH_ALREADY_EXISTS, |
| AzureServiceErrorCode.INVALID_RENAME_SOURCE_PATH, |
| AzureServiceErrorCode.SOURCE_PATH_NOT_FOUND, |
| AzureServiceErrorCode.INVALID_SOURCE_OR_DESTINATION_RESOURCE_TYPE, |
| AzureServiceErrorCode.RENAME_DESTINATION_PARENT_PATH_NOT_FOUND, |
| AzureServiceErrorCode.INTERNAL_OPERATION_ABORT); |
| return false; |
| } |
| |
| } |
| |
| @Override |
| public boolean delete(final Path f, final boolean recursive) throws IOException { |
| LOG.debug( |
| "AzureBlobFileSystem.delete path: {} recursive: {}", f.toString(), recursive); |
| |
| Path qualifiedPath = makeQualified(f); |
| performAbfsAuthCheck(FsAction.WRITE, qualifiedPath); |
| |
| if (f.isRoot()) { |
| if (!recursive) { |
| return false; |
| } |
| |
| return deleteRoot(); |
| } |
| |
| try { |
| abfsStore.delete(qualifiedPath, recursive); |
| return true; |
| } catch (AzureBlobFileSystemException ex) { |
| checkException(f, ex, AzureServiceErrorCode.PATH_NOT_FOUND); |
| return false; |
| } |
| |
| } |
| |
| @Override |
| public FileStatus[] listStatus(final Path f) throws IOException { |
| LOG.debug( |
| "AzureBlobFileSystem.listStatus path: {}", f.toString()); |
| |
| Path qualifiedPath = makeQualified(f); |
| performAbfsAuthCheck(FsAction.READ, qualifiedPath); |
| |
| try { |
| FileStatus[] result = abfsStore.listStatus(qualifiedPath); |
| return result; |
| } catch (AzureBlobFileSystemException ex) { |
| checkException(f, ex); |
| return null; |
| } |
| } |
| |
| /** |
| * Performs a check for (.) until root in the path to throw an exception. |
| * The purpose is to differentiate between dir/dir1 and dir/dir1. |
| * Without the exception the behavior seen is dir1. will appear |
| * to be present without it's actual creation as dir/dir1 and dir/dir1. are |
| * treated as identical. |
| * @param path the path to be checked for trailing period (.) |
| * @throws IllegalArgumentException if the path has a trailing period (.) |
| */ |
| private void trailingPeriodCheck(Path path) throws IllegalArgumentException { |
| while (!path.isRoot()){ |
| String pathToString = path.toString(); |
| if (pathToString.length() != 0) { |
| if (pathToString.charAt(pathToString.length() - 1) == '.') { |
| throw new IllegalArgumentException( |
| "ABFS does not allow files or directories to end with a dot."); |
| } |
| path = path.getParent(); |
| } |
| else { |
| break; |
| } |
| } |
| } |
| |
| @Override |
| public boolean mkdirs(final Path f, final FsPermission permission) throws IOException { |
| LOG.debug( |
| "AzureBlobFileSystem.mkdirs path: {} permissions: {}", f, permission); |
| |
| trailingPeriodCheck(f); |
| |
| final Path parentFolder = f.getParent(); |
| if (parentFolder == null) { |
| // Cannot create root |
| return true; |
| } |
| |
| Path qualifiedPath = makeQualified(f); |
| performAbfsAuthCheck(FsAction.WRITE, qualifiedPath); |
| |
| try { |
| abfsStore.createDirectory(qualifiedPath, permission == null ? FsPermission.getDirDefault() : permission, |
| FsPermission.getUMask(getConf())); |
| return true; |
| } catch (AzureBlobFileSystemException ex) { |
| checkException(f, ex, AzureServiceErrorCode.PATH_ALREADY_EXISTS); |
| return true; |
| } |
| } |
| |
| @Override |
| public synchronized void close() throws IOException { |
| if (isClosed) { |
| return; |
| } |
| |
| super.close(); |
| LOG.debug("AzureBlobFileSystem.close"); |
| this.isClosed = true; |
| } |
| |
| @Override |
| public FileStatus getFileStatus(final Path f) throws IOException { |
| LOG.debug("AzureBlobFileSystem.getFileStatus path: {}", f); |
| |
| Path qualifiedPath = makeQualified(f); |
| performAbfsAuthCheck(FsAction.READ, qualifiedPath); |
| |
| try { |
| return abfsStore.getFileStatus(qualifiedPath); |
| } catch(AzureBlobFileSystemException ex) { |
| checkException(f, ex); |
| return null; |
| } |
| } |
| |
| /** |
| * Qualify a path to one which uses this FileSystem and, if relative, |
| * made absolute. |
| * @param path to qualify. |
| * @return this path if it contains a scheme and authority and is absolute, or |
| * a new path that includes a path and authority and is fully qualified |
| * @see Path#makeQualified(URI, Path) |
| * @throws IllegalArgumentException if the path has a schema/URI different |
| * from this FileSystem. |
| */ |
| @Override |
| public Path makeQualified(Path path) { |
| // To support format: abfs://{dfs.nameservices}/file/path, |
| // path need to be first converted to URI, then get the raw path string, |
| // during which {dfs.nameservices} will be omitted. |
| if (path != null) { |
| String uriPath = path.toUri().getPath(); |
| path = uriPath.isEmpty() ? path : new Path(uriPath); |
| } |
| return super.makeQualified(path); |
| } |
| |
| |
| @Override |
| public Path getWorkingDirectory() { |
| return this.workingDir; |
| } |
| |
| @Override |
| public void setWorkingDirectory(final Path newDir) { |
| if (newDir.isAbsolute()) { |
| this.workingDir = newDir; |
| } else { |
| this.workingDir = new Path(workingDir, newDir); |
| } |
| } |
| |
| @Override |
| public String getScheme() { |
| return FileSystemUriSchemes.ABFS_SCHEME; |
| } |
| |
| @Override |
| public Path getHomeDirectory() { |
| return makeQualified(new Path( |
| FileSystemConfigurations.USER_HOME_DIRECTORY_PREFIX |
| + "/" + abfsStore.getUser())); |
| } |
| |
| /** |
| * Return an array containing hostnames, offset and size of |
| * portions of the given file. For ABFS we'll just lie and give |
| * fake hosts to make sure we get many splits in MR jobs. |
| */ |
| @Override |
| public BlockLocation[] getFileBlockLocations(FileStatus file, |
| long start, long len) { |
| if (file == null) { |
| return null; |
| } |
| |
| if ((start < 0) || (len < 0)) { |
| throw new IllegalArgumentException("Invalid start or len parameter"); |
| } |
| |
| if (file.getLen() < start) { |
| return new BlockLocation[0]; |
| } |
| final String blobLocationHost = abfsStore.getAbfsConfiguration().getAzureBlockLocationHost(); |
| |
| final String[] name = { blobLocationHost }; |
| final String[] host = { blobLocationHost }; |
| long blockSize = file.getBlockSize(); |
| if (blockSize <= 0) { |
| throw new IllegalArgumentException( |
| "The block size for the given file is not a positive number: " |
| + blockSize); |
| } |
| int numberOfLocations = (int) (len / blockSize) |
| + ((len % blockSize == 0) ? 0 : 1); |
| BlockLocation[] locations = new BlockLocation[numberOfLocations]; |
| for (int i = 0; i < locations.length; i++) { |
| long currentOffset = start + (i * blockSize); |
| long currentLength = Math.min(blockSize, start + len - currentOffset); |
| locations[i] = new BlockLocation(name, host, currentOffset, currentLength); |
| } |
| |
| return locations; |
| } |
| |
| @Override |
| protected void finalize() throws Throwable { |
| LOG.debug("finalize() called."); |
| close(); |
| super.finalize(); |
| } |
| |
| /** |
| * Get the username of the FS. |
| * @return the short name of the user who instantiated the FS |
| */ |
| public String getOwnerUser() { |
| return abfsStore.getUser(); |
| } |
| |
| /** |
| * Get the group name of the owner of the FS. |
| * @return primary group name |
| */ |
| public String getOwnerUserPrimaryGroup() { |
| return abfsStore.getPrimaryGroup(); |
| } |
| |
| private boolean deleteRoot() throws IOException { |
| LOG.debug("Deleting root content"); |
| |
| final ExecutorService executorService = Executors.newFixedThreadPool(10); |
| |
| try { |
| final FileStatus[] ls = listStatus(makeQualified(new Path(File.separator))); |
| final ArrayList<Future> deleteTasks = new ArrayList<>(); |
| for (final FileStatus fs : ls) { |
| final Future deleteTask = executorService.submit(new Callable<Void>() { |
| @Override |
| public Void call() throws Exception { |
| delete(fs.getPath(), fs.isDirectory()); |
| return null; |
| } |
| }); |
| deleteTasks.add(deleteTask); |
| } |
| |
| for (final Future deleteTask : deleteTasks) { |
| execute("deleteRoot", new Callable<Void>() { |
| @Override |
| public Void call() throws Exception { |
| deleteTask.get(); |
| return null; |
| } |
| }); |
| } |
| } |
| finally { |
| executorService.shutdownNow(); |
| } |
| |
| return true; |
| } |
| |
| /** |
| * Set owner of a path (i.e. a file or a directory). |
| * The parameters owner and group cannot both be null. |
| * |
| * @param path The path |
| * @param owner If it is null, the original username remains unchanged. |
| * @param group If it is null, the original groupname remains unchanged. |
| */ |
| @Override |
| public void setOwner(final Path path, final String owner, final String group) |
| throws IOException { |
| LOG.debug( |
| "AzureBlobFileSystem.setOwner path: {}", path); |
| if (!getIsNamespaceEnabled()) { |
| super.setOwner(path, owner, group); |
| return; |
| } |
| |
| if ((owner == null || owner.isEmpty()) && (group == null || group.isEmpty())) { |
| throw new IllegalArgumentException("A valid owner or group must be specified."); |
| } |
| |
| Path qualifiedPath = makeQualified(path); |
| performAbfsAuthCheck(FsAction.WRITE, qualifiedPath); |
| |
| try { |
| abfsStore.setOwner(qualifiedPath, |
| owner, |
| group); |
| } catch (AzureBlobFileSystemException ex) { |
| checkException(path, ex); |
| } |
| } |
| |
| /** |
| * Set permission of a path. |
| * |
| * @param path The path |
| * @param permission Access permission |
| */ |
| @Override |
| public void setPermission(final Path path, final FsPermission permission) |
| throws IOException { |
| LOG.debug("AzureBlobFileSystem.setPermission path: {}", path); |
| if (!getIsNamespaceEnabled()) { |
| super.setPermission(path, permission); |
| return; |
| } |
| |
| if (permission == null) { |
| throw new IllegalArgumentException("The permission can't be null"); |
| } |
| |
| Path qualifiedPath = makeQualified(path); |
| performAbfsAuthCheck(FsAction.WRITE, qualifiedPath); |
| |
| try { |
| abfsStore.setPermission(qualifiedPath, |
| permission); |
| } catch (AzureBlobFileSystemException ex) { |
| checkException(path, ex); |
| } |
| } |
| |
| /** |
| * Modifies ACL entries of files and directories. This method can add new ACL |
| * entries or modify the permissions on existing ACL entries. All existing |
| * ACL entries that are not specified in this call are retained without |
| * changes. (Modifications are merged into the current ACL.) |
| * |
| * @param path Path to modify |
| * @param aclSpec List of AbfsAclEntry describing modifications |
| * @throws IOException if an ACL could not be modified |
| */ |
| @Override |
| public void modifyAclEntries(final Path path, final List<AclEntry> aclSpec) |
| throws IOException { |
| LOG.debug("AzureBlobFileSystem.modifyAclEntries path: {}", path.toString()); |
| |
| if (!getIsNamespaceEnabled()) { |
| throw new UnsupportedOperationException( |
| "modifyAclEntries is only supported by storage accounts with the " |
| + "hierarchical namespace enabled."); |
| } |
| |
| if (aclSpec == null || aclSpec.isEmpty()) { |
| throw new IllegalArgumentException("The value of the aclSpec parameter is invalid."); |
| } |
| |
| Path qualifiedPath = makeQualified(path); |
| performAbfsAuthCheck(FsAction.WRITE, qualifiedPath); |
| |
| try { |
| abfsStore.modifyAclEntries(qualifiedPath, |
| aclSpec); |
| } catch (AzureBlobFileSystemException ex) { |
| checkException(path, ex); |
| } |
| } |
| |
| /** |
| * Removes ACL entries from files and directories. Other ACL entries are |
| * retained. |
| * |
| * @param path Path to modify |
| * @param aclSpec List of AclEntry describing entries to remove |
| * @throws IOException if an ACL could not be modified |
| */ |
| @Override |
| public void removeAclEntries(final Path path, final List<AclEntry> aclSpec) |
| throws IOException { |
| LOG.debug("AzureBlobFileSystem.removeAclEntries path: {}", path); |
| |
| if (!getIsNamespaceEnabled()) { |
| throw new UnsupportedOperationException( |
| "removeAclEntries is only supported by storage accounts with the " |
| + "hierarchical namespace enabled."); |
| } |
| |
| if (aclSpec == null || aclSpec.isEmpty()) { |
| throw new IllegalArgumentException("The aclSpec argument is invalid."); |
| } |
| |
| Path qualifiedPath = makeQualified(path); |
| performAbfsAuthCheck(FsAction.WRITE, qualifiedPath); |
| |
| try { |
| abfsStore.removeAclEntries(qualifiedPath, aclSpec); |
| } catch (AzureBlobFileSystemException ex) { |
| checkException(path, ex); |
| } |
| } |
| |
| /** |
| * Removes all default ACL entries from files and directories. |
| * |
| * @param path Path to modify |
| * @throws IOException if an ACL could not be modified |
| */ |
| @Override |
| public void removeDefaultAcl(final Path path) throws IOException { |
| LOG.debug("AzureBlobFileSystem.removeDefaultAcl path: {}", path); |
| |
| if (!getIsNamespaceEnabled()) { |
| throw new UnsupportedOperationException( |
| "removeDefaultAcl is only supported by storage accounts with the " |
| + "hierarchical namespace enabled."); |
| } |
| |
| Path qualifiedPath = makeQualified(path); |
| performAbfsAuthCheck(FsAction.WRITE, qualifiedPath); |
| |
| try { |
| abfsStore.removeDefaultAcl(qualifiedPath); |
| } catch (AzureBlobFileSystemException ex) { |
| checkException(path, ex); |
| } |
| } |
| |
| /** |
| * Removes all but the base ACL entries of files and directories. The entries |
| * for user, group, and others are retained for compatibility with permission |
| * bits. |
| * |
| * @param path Path to modify |
| * @throws IOException if an ACL could not be removed |
| */ |
| @Override |
| public void removeAcl(final Path path) throws IOException { |
| LOG.debug("AzureBlobFileSystem.removeAcl path: {}", path); |
| |
| if (!getIsNamespaceEnabled()) { |
| throw new UnsupportedOperationException( |
| "removeAcl is only supported by storage accounts with the " |
| + "hierarchical namespace enabled."); |
| } |
| |
| Path qualifiedPath = makeQualified(path); |
| performAbfsAuthCheck(FsAction.WRITE, qualifiedPath); |
| |
| try { |
| abfsStore.removeAcl(qualifiedPath); |
| } catch (AzureBlobFileSystemException ex) { |
| checkException(path, ex); |
| } |
| } |
| |
| /** |
| * Fully replaces ACL of files and directories, discarding all existing |
| * entries. |
| * |
| * @param path Path to modify |
| * @param aclSpec List of AclEntry describing modifications, must include |
| * entries for user, group, and others for compatibility with |
| * permission bits. |
| * @throws IOException if an ACL could not be modified |
| */ |
| @Override |
| public void setAcl(final Path path, final List<AclEntry> aclSpec) |
| throws IOException { |
| LOG.debug("AzureBlobFileSystem.setAcl path: {}", path); |
| |
| if (!getIsNamespaceEnabled()) { |
| throw new UnsupportedOperationException( |
| "setAcl is only supported by storage accounts with the hierarchical " |
| + "namespace enabled."); |
| } |
| |
| if (aclSpec == null || aclSpec.size() == 0) { |
| throw new IllegalArgumentException("The aclSpec argument is invalid."); |
| } |
| |
| Path qualifiedPath = makeQualified(path); |
| performAbfsAuthCheck(FsAction.WRITE, qualifiedPath); |
| |
| try { |
| abfsStore.setAcl(qualifiedPath, aclSpec); |
| } catch (AzureBlobFileSystemException ex) { |
| checkException(path, ex); |
| } |
| } |
| |
| /** |
| * Gets the ACL of a file or directory. |
| * |
| * @param path Path to get |
| * @return AbfsAclStatus describing the ACL of the file or directory |
| * @throws IOException if an ACL could not be read |
| */ |
| @Override |
| public AclStatus getAclStatus(final Path path) throws IOException { |
| LOG.debug("AzureBlobFileSystem.getAclStatus path: {}", path.toString()); |
| |
| if (!getIsNamespaceEnabled()) { |
| throw new UnsupportedOperationException( |
| "getAclStatus is only supported by storage account with the " |
| + "hierarchical namespace enabled."); |
| } |
| |
| Path qualifiedPath = makeQualified(path); |
| performAbfsAuthCheck(FsAction.READ, qualifiedPath); |
| |
| try { |
| return abfsStore.getAclStatus(qualifiedPath); |
| } catch (AzureBlobFileSystemException ex) { |
| checkException(path, ex); |
| return null; |
| } |
| } |
| |
| /** |
| * Checks if the user can access a path. The mode specifies which access |
| * checks to perform. If the requested permissions are granted, then the |
| * method returns normally. If access is denied, then the method throws an |
| * {@link AccessControlException}. |
| * |
| * @param path Path to check |
| * @param mode type of access to check |
| * @throws AccessControlException if access is denied |
| * @throws java.io.FileNotFoundException if the path does not exist |
| * @throws IOException see specific implementation |
| */ |
| @Override |
| public void access(final Path path, final FsAction mode) throws IOException { |
| LOG.debug("AzureBlobFileSystem.access path : {}, mode : {}", path, mode); |
| Path qualifiedPath = makeQualified(path); |
| try { |
| this.abfsStore.access(qualifiedPath, mode); |
| } catch (AzureBlobFileSystemException ex) { |
| checkCheckAccessException(path, ex); |
| } |
| } |
| |
| private FileStatus tryGetFileStatus(final Path f) { |
| try { |
| return getFileStatus(f); |
| } catch (IOException ex) { |
| LOG.debug("File not found {}", f); |
| return null; |
| } |
| } |
| |
| private boolean fileSystemExists() throws IOException { |
| LOG.debug( |
| "AzureBlobFileSystem.fileSystemExists uri: {}", uri); |
| try { |
| abfsStore.getFilesystemProperties(); |
| } catch (AzureBlobFileSystemException ex) { |
| try { |
| checkException(null, ex); |
| // Because HEAD request won't contain message body, |
| // there is not way to get the storage error code |
| // workaround here is to check its status code. |
| } catch (FileNotFoundException e) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| private void createFileSystem() throws IOException { |
| LOG.debug( |
| "AzureBlobFileSystem.createFileSystem uri: {}", uri); |
| try { |
| abfsStore.createFilesystem(); |
| } catch (AzureBlobFileSystemException ex) { |
| checkException(null, ex); |
| } |
| } |
| |
| private URI ensureAuthority(URI uri, final Configuration conf) { |
| |
| Preconditions.checkNotNull(uri, "uri"); |
| |
| if (uri.getAuthority() == null) { |
| final URI defaultUri = FileSystem.getDefaultUri(conf); |
| |
| if (defaultUri != null && isAbfsScheme(defaultUri.getScheme())) { |
| try { |
| // Reconstruct the URI with the authority from the default URI. |
| uri = new URI( |
| uri.getScheme(), |
| defaultUri.getAuthority(), |
| uri.getPath(), |
| uri.getQuery(), |
| uri.getFragment()); |
| } catch (URISyntaxException e) { |
| // This should never happen. |
| throw new IllegalArgumentException(new InvalidUriException(uri.toString())); |
| } |
| } |
| } |
| |
| if (uri.getAuthority() == null) { |
| throw new IllegalArgumentException(new InvalidUriAuthorityException(uri.toString())); |
| } |
| |
| return uri; |
| } |
| |
| private boolean isAbfsScheme(final String scheme) { |
| if (scheme == null) { |
| return false; |
| } |
| |
| if (scheme.equals(FileSystemUriSchemes.ABFS_SCHEME) |
| || scheme.equals(FileSystemUriSchemes.ABFS_SECURE_SCHEME)) { |
| return true; |
| } |
| |
| return false; |
| } |
| |
| @VisibleForTesting |
| <T> FileSystemOperation<T> execute( |
| final String scopeDescription, |
| final Callable<T> callableFileOperation) throws IOException { |
| return execute(scopeDescription, callableFileOperation, null); |
| } |
| |
| @VisibleForTesting |
| <T> FileSystemOperation<T> execute( |
| final String scopeDescription, |
| final Callable<T> callableFileOperation, |
| T defaultResultValue) throws IOException { |
| |
| try { |
| final T executionResult = callableFileOperation.call(); |
| return new FileSystemOperation<>(executionResult, null); |
| } catch (AbfsRestOperationException abfsRestOperationException) { |
| return new FileSystemOperation<>(defaultResultValue, abfsRestOperationException); |
| } catch (AzureBlobFileSystemException azureBlobFileSystemException) { |
| throw new IOException(azureBlobFileSystemException); |
| } catch (Exception exception) { |
| if (exception instanceof ExecutionException) { |
| exception = (Exception) getRootCause(exception); |
| } |
| final FileSystemOperationUnhandledException fileSystemOperationUnhandledException |
| = new FileSystemOperationUnhandledException(exception); |
| throw new IOException(fileSystemOperationUnhandledException); |
| } |
| } |
| |
| private void checkCheckAccessException(final Path path, |
| final AzureBlobFileSystemException exception) throws IOException { |
| if (exception instanceof AbfsRestOperationException) { |
| AbfsRestOperationException ere = (AbfsRestOperationException) exception; |
| if (ere.getStatusCode() == HttpURLConnection.HTTP_FORBIDDEN) { |
| throw (IOException) new AccessControlException(ere.getMessage()) |
| .initCause(exception); |
| } |
| } |
| checkException(path, exception); |
| } |
| |
| /** |
| * Given a path and exception, choose which IOException subclass |
| * to create. |
| * Will return if and only iff the error code is in the list of allowed |
| * error codes. |
| * @param path path of operation triggering exception; may be null |
| * @param exception the exception caught |
| * @param allowedErrorCodesList varargs list of error codes. |
| * @throws IOException if the exception error code is not on the allowed list. |
| */ |
| private void checkException(final Path path, |
| final AzureBlobFileSystemException exception, |
| final AzureServiceErrorCode... allowedErrorCodesList) throws IOException { |
| if (exception instanceof AbfsRestOperationException) { |
| AbfsRestOperationException ere = (AbfsRestOperationException) exception; |
| |
| if (ArrayUtils.contains(allowedErrorCodesList, ere.getErrorCode())) { |
| return; |
| } |
| int statusCode = ere.getStatusCode(); |
| |
| //AbfsRestOperationException.getMessage() contains full error info including path/uri. |
| if (statusCode == HttpURLConnection.HTTP_NOT_FOUND) { |
| throw (IOException) new FileNotFoundException(ere.getMessage()) |
| .initCause(exception); |
| } else if (statusCode == HttpURLConnection.HTTP_CONFLICT) { |
| throw (IOException) new FileAlreadyExistsException(ere.getMessage()) |
| .initCause(exception); |
| } else { |
| throw ere; |
| } |
| } else { |
| if (path == null) { |
| throw exception; |
| } |
| // record info of path |
| throw new PathIOException(path.toString(), exception); |
| } |
| } |
| |
| /** |
| * Gets the root cause of a provided {@link Throwable}. If there is no cause for the |
| * {@link Throwable} provided into this function, the original {@link Throwable} is returned. |
| * |
| * @param throwable starting {@link Throwable} |
| * @return root cause {@link Throwable} |
| */ |
| private Throwable getRootCause(Throwable throwable) { |
| if (throwable == null) { |
| throw new IllegalArgumentException("throwable can not be null"); |
| } |
| |
| Throwable result = throwable; |
| while (result.getCause() != null) { |
| result = result.getCause(); |
| } |
| |
| return result; |
| } |
| |
| /** |
| * Get a delegation token from remote service endpoint if |
| * 'fs.azure.enable.kerberos.support' is set to 'true', and |
| * 'fs.azure.enable.delegation.token' is set to 'true'. |
| * @param renewer the account name that is allowed to renew the token. |
| * @return delegation token |
| * @throws IOException thrown when getting the current user. |
| */ |
| @Override |
| public synchronized Token<?> getDelegationToken(final String renewer) throws IOException { |
| return this.delegationTokenEnabled ? this.delegationTokenManager.getDelegationToken(renewer) |
| : super.getDelegationToken(renewer); |
| } |
| |
| @VisibleForTesting |
| FileSystem.Statistics getFsStatistics() { |
| return this.statistics; |
| } |
| |
| @VisibleForTesting |
| static class FileSystemOperation<T> { |
| private final T result; |
| private final AbfsRestOperationException exception; |
| |
| FileSystemOperation(final T result, final AbfsRestOperationException exception) { |
| this.result = result; |
| this.exception = exception; |
| } |
| |
| public boolean failed() { |
| return this.exception != null; |
| } |
| } |
| |
| @VisibleForTesting |
| AzureBlobFileSystemStore getAbfsStore() { |
| return abfsStore; |
| } |
| |
| @VisibleForTesting |
| AbfsClient getAbfsClient() { |
| return abfsStore.getClient(); |
| } |
| |
| @VisibleForTesting |
| boolean getIsNamespaceEnabled() throws AzureBlobFileSystemException { |
| return abfsStore.getIsNamespaceEnabled(); |
| } |
| |
| /** |
| * Use ABFS authorizer to check if user is authorized to perform specific |
| * {@link FsAction} on specified {@link Path}s. |
| * |
| * @param action The {@link FsAction} being requested on the provided {@link Path}s. |
| * @param paths The absolute paths of the storage being accessed. |
| * @throws AbfsAuthorizationException on authorization failure. |
| * @throws IOException network problems or similar. |
| * @throws IllegalArgumentException if the required parameters are not provided. |
| */ |
| private void performAbfsAuthCheck(FsAction action, Path... paths) |
| throws AbfsAuthorizationException, IOException { |
| if (authorizer == null) { |
| LOG.debug("ABFS authorizer is not initialized. No authorization check will be performed."); |
| } else { |
| Preconditions.checkArgument(paths.length > 0, "no paths supplied for authorization check"); |
| |
| LOG.debug("Auth check for action: {} on paths: {}", action.toString(), Arrays.toString(paths)); |
| if (!authorizer.isAuthorized(action, paths)) { |
| throw new AbfsAuthorizationException( |
| "User is not authorized for action " + action.toString() |
| + " on paths: " + Arrays.toString(paths)); |
| } |
| } |
| } |
| } |