/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.azurebfs.services;
import java.io.Closeable;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLEncoder;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import org.apache.hadoop.security.ssl.DelegatingSSLSocketFactory;
import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidUriException;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.SASTokenProviderException;
import org.apache.hadoop.fs.azurebfs.extensions.ExtensionHelper;
import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
import org.apache.hadoop.fs.azurebfs.AbfsConfiguration;
import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider;
import org.apache.hadoop.fs.azurebfs.utils.DateTimeUtils;
import org.apache.hadoop.io.IOUtils;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.*;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_DELETE_CONSIDERED_IDEMPOTENT;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.HTTPS_SCHEME;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.*;
import static org.apache.hadoop.fs.azurebfs.constants.HttpQueryParams.*;
/**
 * AbfsClient sends the ABFS REST requests (filesystem, create, read, append,
 * flush, rename, delete, ACL and property operations) to the store and
 * returns the responses wrapped as {@link AbfsRestOperation} instances.
 */
public class AbfsClient implements Closeable {
public static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class);
private final URL baseUrl;
private final SharedKeyCredentials sharedKeyCredentials;
private final String xMsVersion = "2019-12-12";
private final ExponentialRetryPolicy retryPolicy;
private final String filesystem;
private final AbfsConfiguration abfsConfiguration;
private final String userAgent;
private final AbfsPerfTracker abfsPerfTracker;
private final String accountName;
private final AuthType authType;
private AccessTokenProvider tokenProvider;
private SASTokenProvider sasTokenProvider;
private final AbfsCounters abfsCounters;
private AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final AbfsClientContext abfsClientContext) {
this.baseUrl = baseUrl;
this.sharedKeyCredentials = sharedKeyCredentials;
String baseUrlString = baseUrl.toString();
this.filesystem = baseUrlString.substring(baseUrlString.lastIndexOf(FORWARD_SLASH) + 1);
this.abfsConfiguration = abfsConfiguration;
this.retryPolicy = abfsClientContext.getExponentialRetryPolicy();
this.accountName = abfsConfiguration.getAccountName().substring(0, abfsConfiguration.getAccountName().indexOf(AbfsHttpConstants.DOT));
this.authType = abfsConfiguration.getAuthType(accountName);
String sslProviderName = null;
if (this.baseUrl.toString().startsWith(HTTPS_SCHEME)) {
try {
LOG.trace("Initializing DelegatingSSLSocketFactory with {} SSL "
+ "Channel Mode", this.abfsConfiguration.getPreferredSSLFactoryOption());
DelegatingSSLSocketFactory.initializeDefaultFactory(this.abfsConfiguration.getPreferredSSLFactoryOption());
sslProviderName = DelegatingSSLSocketFactory.getDefaultFactory().getProviderName();
} catch (IOException e) {
// Suppress the exception. Failure to initialize DelegatingSSLSocketFactory would only have a performance impact.
LOG.trace("NonCritFailure: DelegatingSSLSocketFactory Init failed : "
+ "{}", e.getMessage());
}
}
this.userAgent = initializeUserAgent(abfsConfiguration, sslProviderName);
this.abfsPerfTracker = abfsClientContext.getAbfsPerfTracker();
this.abfsCounters = abfsClientContext.getAbfsCounters();
}
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final AccessTokenProvider tokenProvider,
final AbfsClientContext abfsClientContext) {
this(baseUrl, sharedKeyCredentials, abfsConfiguration, abfsClientContext);
this.tokenProvider = tokenProvider;
}
public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredentials,
final AbfsConfiguration abfsConfiguration,
final SASTokenProvider sasTokenProvider,
final AbfsClientContext abfsClientContext) {
this(baseUrl, sharedKeyCredentials, abfsConfiguration, abfsClientContext);
this.sasTokenProvider = sasTokenProvider;
}
@Override
public void close() throws IOException {
if (tokenProvider instanceof Closeable) {
IOUtils.cleanupWithLogger(LOG, (Closeable) tokenProvider);
}
}
public String getFileSystem() {
return filesystem;
}
protected AbfsPerfTracker getAbfsPerfTracker() {
return abfsPerfTracker;
}
ExponentialRetryPolicy getRetryPolicy() {
return retryPolicy;
}
SharedKeyCredentials getSharedKeyCredentials() {
return sharedKeyCredentials;
}
List<AbfsHttpHeader> createDefaultHeaders() {
final List<AbfsHttpHeader> requestHeaders = new ArrayList<AbfsHttpHeader>();
requestHeaders.add(new AbfsHttpHeader(X_MS_VERSION, xMsVersion));
requestHeaders.add(new AbfsHttpHeader(ACCEPT, APPLICATION_JSON
+ COMMA + SINGLE_WHITE_SPACE + APPLICATION_OCTET_STREAM));
requestHeaders.add(new AbfsHttpHeader(ACCEPT_CHARSET,
UTF_8));
requestHeaders.add(new AbfsHttpHeader(CONTENT_TYPE, EMPTY_STRING));
requestHeaders.add(new AbfsHttpHeader(USER_AGENT, userAgent));
return requestHeaders;
}
AbfsUriQueryBuilder createDefaultUriQueryBuilder() {
final AbfsUriQueryBuilder abfsUriQueryBuilder = new AbfsUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_TIMEOUT, DEFAULT_TIMEOUT);
return abfsUriQueryBuilder;
}
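/**
 * Creates the filesystem (container) that this client is bound to by issuing
 * a PUT with {@code resource=filesystem}. A minimal usage sketch (the client
 * instance below is assumed to be already constructed):
 * <pre>{@code
 *   AbfsRestOperation op = client.createFilesystem();
 *   int status = op.getResult().getStatusCode();
 * }</pre>
 * @return executed REST operation containing the response from the server.
 * @throws AzureBlobFileSystemException if the REST operation fails.
 */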
public AbfsRestOperation createFilesystem() throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
final AbfsUriQueryBuilder abfsUriQueryBuilder = new AbfsUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM);
final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.CreateFileSystem,
this,
HTTP_METHOD_PUT,
url,
requestHeaders);
op.execute();
return op;
}
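/**
 * Sets user-defined properties on the filesystem via the x-ms-properties header.
 * @param properties the properties string to set in the x-ms-properties header.
 * @return executed REST operation containing the response from the server.
 * @throws AzureBlobFileSystemException if the REST operation fails.
 */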
public AbfsRestOperation setFilesystemProperties(final String properties) throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
// JDK7 does not support PATCH, so to work around this we use
// PUT and specify the real method in the X-Http-Method-Override header.
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
HTTP_METHOD_PATCH));
requestHeaders.add(new AbfsHttpHeader(X_MS_PROPERTIES,
properties));
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM);
final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.SetFileSystemProperties,
this,
HTTP_METHOD_PUT,
url,
requestHeaders);
op.execute();
return op;
}
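/**
 * Lists the paths under the given directory.
 * @param relativePath directory to list, relative to the filesystem root.
 * @param recursive whether to list everything under the directory recursively.
 * @param listMaxResults maximum number of entries to return in one response.
 * @param continuation continuation token from a previous listing, or null.
 * @return executed REST operation containing the listing response.
 * @throws AzureBlobFileSystemException if the REST operation fails.
 */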
public AbfsRestOperation listPath(final String relativePath, final boolean recursive, final int listMaxResults,
final String continuation) throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM);
abfsUriQueryBuilder.addQuery(QUERY_PARAM_DIRECTORY, getDirectoryQueryParameter(relativePath));
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RECURSIVE, String.valueOf(recursive));
abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation);
abfsUriQueryBuilder.addQuery(QUERY_PARAM_MAXRESULTS, String.valueOf(listMaxResults));
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN, String.valueOf(abfsConfiguration.isUpnUsed()));
appendSASTokenToQuery(relativePath, SASTokenProvider.LIST_OPERATION, abfsUriQueryBuilder);
final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.ListPaths,
this,
HTTP_METHOD_GET,
url,
requestHeaders);
op.execute();
return op;
}
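/**
 * Retrieves the properties of the filesystem via a HEAD request.
 * @return executed REST operation whose response headers carry the properties.
 * @throws AzureBlobFileSystemException if the REST operation fails.
 */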
public AbfsRestOperation getFilesystemProperties() throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM);
final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.GetFileSystemProperties,
this,
HTTP_METHOD_HEAD,
url,
requestHeaders);
op.execute();
return op;
}
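/**
 * Deletes the filesystem that this client is bound to.
 * @return executed REST operation containing the response from the server.
 * @throws AzureBlobFileSystemException if the REST operation fails.
 */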
public AbfsRestOperation deleteFilesystem() throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, FILESYSTEM);
final URL url = createRequestUrl(abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.DeleteFileSystem,
this,
HTTP_METHOD_DELETE,
url,
requestHeaders);
op.execute();
return op;
}
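/**
 * Creates a file or directory at the given path.
 * @param path path to create.
 * @param isFile true to create a file, false to create a directory.
 * @param overwrite whether an existing path may be overwritten; when false an
 *                  If-None-Match: * precondition is sent.
 * @param permission permission to apply to the new path, or null/empty for the default.
 * @param umask umask to apply, or null/empty for the default.
 * @param isAppendBlob true to create the file as an append blob.
 * @param eTag if non-empty, only succeed when the existing path matches this ETag.
 * @return executed REST operation containing the response from the server.
 * @throws AzureBlobFileSystemException if the REST operation fails.
 */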
public AbfsRestOperation createPath(final String path, final boolean isFile, final boolean overwrite,
final String permission, final String umask,
final boolean isAppendBlob, final String eTag) throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
if (!overwrite) {
requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, AbfsHttpConstants.STAR));
}
if (permission != null && !permission.isEmpty()) {
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS, permission));
}
if (umask != null && !umask.isEmpty()) {
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_UMASK, umask));
}
if (eTag != null && !eTag.isEmpty()) {
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_MATCH, eTag));
}
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RESOURCE, isFile ? FILE : DIRECTORY);
if (isAppendBlob) {
abfsUriQueryBuilder.addQuery(QUERY_PARAM_BLOBTYPE, APPEND_BLOB_TYPE);
}
String operation = isFile
? SASTokenProvider.CREATE_FILE_OPERATION
: SASTokenProvider.CREATE_DIRECTORY_OPERATION;
appendSASTokenToQuery(path, operation, abfsUriQueryBuilder);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.CreatePath,
this,
HTTP_METHOD_PUT,
url,
requestHeaders);
op.execute();
return op;
}
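/**
 * Renames (moves) a path within the filesystem. If the request fails after a
 * retry, {@link #renameIdempotencyCheckOp} is consulted to decide whether an
 * earlier attempt already succeeded.
 * @param source source path to rename.
 * @param destination destination path.
 * @param continuation continuation token from a previous rename, or null.
 * @return executed REST operation containing the response from the server.
 * @throws AzureBlobFileSystemException if the REST operation fails.
 */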
public AbfsRestOperation renamePath(String source, final String destination, final String continuation)
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
String encodedRenameSource = urlEncode(FORWARD_SLASH + this.getFileSystem() + source);
if (authType == AuthType.SAS) {
final AbfsUriQueryBuilder srcQueryBuilder = new AbfsUriQueryBuilder();
appendSASTokenToQuery(source, SASTokenProvider.RENAME_SOURCE_OPERATION, srcQueryBuilder);
encodedRenameSource += srcQueryBuilder.toString();
}
LOG.trace("Rename source queryparam added {}", encodedRenameSource);
requestHeaders.add(new AbfsHttpHeader(X_MS_RENAME_SOURCE, encodedRenameSource));
requestHeaders.add(new AbfsHttpHeader(IF_NONE_MATCH, STAR));
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation);
appendSASTokenToQuery(destination, SASTokenProvider.RENAME_DESTINATION_OPERATION, abfsUriQueryBuilder);
final URL url = createRequestUrl(destination, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.RenamePath,
this,
HTTP_METHOD_PUT,
url,
requestHeaders);
Instant renameRequestStartTime = Instant.now();
try {
op.execute();
} catch (AzureBlobFileSystemException e) {
final AbfsRestOperation idempotencyOp = renameIdempotencyCheckOp(
renameRequestStartTime, op, destination);
if (idempotencyOp.getResult().getStatusCode()
== op.getResult().getStatusCode()) {
// idempotency did not return different result
// throw back the exception
throw e;
} else {
return idempotencyOp;
}
}
return op;
}
/**
 * Check whether the rename request failed after a retry and whether an
 * earlier rename request might already have succeeded at the back-end.
 *
 * If a parallel rename is in progress from any other store interface, the
 * logic here will attribute that rename to the retried request from this
 * ABFS filesystem instance. That should be a corner case, hence the
 * last-modified-time (LMT) check is considered acceptable.
 * @param renameRequestStartTime start time of the rename request
 * @param op Rename request REST operation response
 * @param destination rename destination path
 * @return REST operation response post idempotency check
 * @throws AzureBlobFileSystemException if GetFileStatus hits any exception
 */
public AbfsRestOperation renameIdempotencyCheckOp(
final Instant renameRequestStartTime,
final AbfsRestOperation op,
final String destination) throws AzureBlobFileSystemException {
if ((op.isARetriedRequest())
&& (op.getResult().getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND)) {
// Server has returned HTTP 404, which means rename source no longer
// exists. Check on destination status and if it has a recent LMT timestamp.
// If yes, return success, else fall back to original rename request failure response.
try {
final AbfsRestOperation destStatusOp = getPathStatus(destination,
false);
if (destStatusOp.getResult().getStatusCode()
== HttpURLConnection.HTTP_OK) {
String lmt = destStatusOp.getResult().getResponseHeader(
HttpHeaderConfigurations.LAST_MODIFIED);
if (DateTimeUtils.isRecentlyModified(lmt, renameRequestStartTime)) {
return destStatusOp;
}
}
} catch (AzureBlobFileSystemException e) {
// GetFileStatus on the destination failed, return original op
return op;
}
}
return op;
}
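/**
 * Appends a range of bytes from the given buffer at the specified position
 * of the file.
 * @param path file to append to.
 * @param position offset in the file at which the data is appended.
 * @param buffer buffer holding the data.
 * @param offset start offset of the data within the buffer.
 * @param length number of bytes to append.
 * @param cachedSasToken previously acquired SAS token to reuse, or null.
 * @param isAppendBlob true if the target file is an append blob.
 * @return executed REST operation containing the response from the server.
 * @throws AzureBlobFileSystemException if the REST operation fails.
 */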
public AbfsRestOperation append(final String path, final long position, final byte[] buffer, final int offset,
final int length, final String cachedSasToken, final boolean isAppendBlob) throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
// JDK7 does not support PATCH, so to work around this we use
// PUT and specify the real method in the X-Http-Method-Override header.
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
HTTP_METHOD_PATCH));
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, APPEND_ACTION);
abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(position));
// AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance
String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION,
abfsUriQueryBuilder, cachedSasToken);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.Append,
this,
HTTP_METHOD_PUT,
url,
requestHeaders, buffer, offset, length, sasTokenForReuse);
try {
op.execute();
} catch (AzureBlobFileSystemException e) {
if (isAppendBlob && appendSuccessCheckOp(op, path, (position + length))) {
final AbfsRestOperation successOp = new AbfsRestOperation(
AbfsRestOperationType.Append,
this,
HTTP_METHOD_PUT,
url,
requestHeaders, buffer, offset, length, sasTokenForReuse);
successOp.hardSetResult(HttpURLConnection.HTTP_OK);
return successOp;
}
throw e;
}
return op;
}
// For an append blob it is possible that the append succeeded at the
// back-end even though the request failed. A retry would then fail with
// InvalidQueryParameterValue (as the current offset would be unacceptable).
// Hence, when retrying an append-blob append, we treat the call as
// successful if the file length already covers the appended range.
public boolean appendSuccessCheckOp(AbfsRestOperation op, final String path,
final long length) throws AzureBlobFileSystemException {
if ((op.isARetriedRequest())
&& (op.getResult().getStatusCode() == HttpURLConnection.HTTP_BAD_REQUEST)) {
final AbfsRestOperation destStatusOp = getPathStatus(path, false);
if (destStatusOp.getResult().getStatusCode() == HttpURLConnection.HTTP_OK) {
String fileLength = destStatusOp.getResult().getResponseHeader(
HttpHeaderConfigurations.CONTENT_LENGTH);
if (length <= Long.parseLong(fileLength)) {
return true;
}
}
}
return false;
}
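/**
 * Flushes previously appended data to the file, making it visible to readers.
 * @param path file to flush.
 * @param position length of the file once the data up to this offset is flushed.
 * @param retainUncommittedData whether uncommitted data beyond the flush
 *                              position should be retained.
 * @param isClose whether this flush corresponds to closing the stream.
 * @param cachedSasToken previously acquired SAS token to reuse, or null.
 * @return executed REST operation containing the response from the server.
 * @throws AzureBlobFileSystemException if the REST operation fails.
 */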
public AbfsRestOperation flush(final String path, final long position, boolean retainUncommittedData,
boolean isClose, final String cachedSasToken)
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
// JDK7 does not support PATCH, so to work around this we use
// PUT and specify the real method in the X-Http-Method-Override header.
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
HTTP_METHOD_PATCH));
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, FLUSH_ACTION);
abfsUriQueryBuilder.addQuery(QUERY_PARAM_POSITION, Long.toString(position));
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RETAIN_UNCOMMITTED_DATA, String.valueOf(retainUncommittedData));
abfsUriQueryBuilder.addQuery(QUERY_PARAM_CLOSE, String.valueOf(isClose));
// AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance
String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.WRITE_OPERATION,
abfsUriQueryBuilder, cachedSasToken);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.Flush,
this,
HTTP_METHOD_PUT,
url,
requestHeaders, sasTokenForReuse);
op.execute();
return op;
}
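/**
 * Sets user-defined properties on the given path via the x-ms-properties header.
 * @param path path to update.
 * @param properties the properties string to set in the x-ms-properties header.
 * @return executed REST operation containing the response from the server.
 * @throws AzureBlobFileSystemException if the REST operation fails.
 */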
public AbfsRestOperation setPathProperties(final String path, final String properties)
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
// JDK7 does not support PATCH, so to work around this we use
// PUT and specify the real method in the X-Http-Method-Override header.
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
HTTP_METHOD_PATCH));
requestHeaders.add(new AbfsHttpHeader(X_MS_PROPERTIES, properties));
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, SET_PROPERTIES_ACTION);
appendSASTokenToQuery(path, SASTokenProvider.SET_PROPERTIES_OPERATION, abfsUriQueryBuilder);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.SetPathProperties,
this,
HTTP_METHOD_PUT,
url,
requestHeaders);
op.execute();
return op;
}
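/**
 * Retrieves the status of a path via a HEAD request.
 * @param path path to query.
 * @param includeProperties when true, user-defined properties are requested
 *                          (this requires read permission); when false only a
 *                          getStatus action is performed, which requires
 *                          traversal (execute) permission only.
 * @return executed REST operation whose response headers carry the status.
 * @throws AzureBlobFileSystemException if the REST operation fails.
 */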
public AbfsRestOperation getPathStatus(final String path, final boolean includeProperties) throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
String operation = SASTokenProvider.GET_PROPERTIES_OPERATION;
if (!includeProperties) {
// The default action (operation) is implicitly to get properties and this action requires read permission
// because it reads user defined properties. If the action is getStatus or getAclStatus, then
// only traversal (execute) permission is required.
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.GET_STATUS);
operation = SASTokenProvider.GET_STATUS_OPERATION;
}
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN, String.valueOf(abfsConfiguration.isUpnUsed()));
appendSASTokenToQuery(path, operation, abfsUriQueryBuilder);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.GetPathStatus,
this,
HTTP_METHOD_HEAD,
url,
requestHeaders);
op.execute();
return op;
}
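/**
 * Reads a range of bytes from the file into the supplied buffer.
 * @param path file to read from.
 * @param position offset in the file at which to start reading.
 * @param buffer destination buffer.
 * @param bufferOffset offset in the buffer at which the data is stored.
 * @param bufferLength number of bytes to read.
 * @param eTag expected ETag of the file, sent as an If-Match precondition.
 * @param cachedSasToken previously acquired SAS token to reuse, or null.
 * @return executed REST operation containing the response from the server.
 * @throws AzureBlobFileSystemException if the REST operation fails.
 */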
public AbfsRestOperation read(final String path, final long position, final byte[] buffer, final int bufferOffset,
final int bufferLength, final String eTag, String cachedSasToken) throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
requestHeaders.add(new AbfsHttpHeader(RANGE,
String.format("bytes=%d-%d", position, position + bufferLength - 1)));
requestHeaders.add(new AbfsHttpHeader(IF_MATCH, eTag));
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
// AbfsInputStream/AbfsOutputStream reuse SAS tokens for better performance
String sasTokenForReuse = appendSASTokenToQuery(path, SASTokenProvider.READ_OPERATION,
abfsUriQueryBuilder, cachedSasToken);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.ReadFile,
this,
HTTP_METHOD_GET,
url,
requestHeaders,
buffer,
bufferOffset,
bufferLength, sasTokenForReuse);
op.execute();
return op;
}
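/**
 * Deletes the given path. If the request fails after a retry,
 * {@link #deleteIdempotencyCheckOp} decides whether the failure can be
 * treated as a success.
 * @param path path to delete.
 * @param recursive whether to delete the path and everything under it.
 * @param continuation continuation token from a previous delete, or null.
 * @return executed REST operation containing the response from the server.
 * @throws AzureBlobFileSystemException if the REST operation fails.
 */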
public AbfsRestOperation deletePath(final String path, final boolean recursive, final String continuation)
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_RECURSIVE, String.valueOf(recursive));
abfsUriQueryBuilder.addQuery(QUERY_PARAM_CONTINUATION, continuation);
String operation = recursive ? SASTokenProvider.DELETE_RECURSIVE_OPERATION : SASTokenProvider.DELETE_OPERATION;
appendSASTokenToQuery(path, operation, abfsUriQueryBuilder);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.DeletePath,
this,
HTTP_METHOD_DELETE,
url,
requestHeaders);
try {
op.execute();
} catch (AzureBlobFileSystemException e) {
final AbfsRestOperation idempotencyOp = deleteIdempotencyCheckOp(op);
if (idempotencyOp.getResult().getStatusCode()
== op.getResult().getStatusCode()) {
// idempotency did not return different result
// throw back the exception
throw e;
} else {
return idempotencyOp;
}
}
return op;
}
/**
 * Check whether the delete request failed after a retry and whether the
 * failure qualifies to be treated as a success, assuming idempotency.
 *
 * In the scenarios below a delete could be incorrectly deduced as a
 * success after a request retry:
 * 1. The target did not exist in the first place and the initial delete
 *    request had to be retried.
 * 2. A parallel delete was issued from another store interface rather than
 *    from this filesystem instance.
 * These are corner cases, and returning a success at this stage usually
 * helps the job to continue.
 * @param op Delete request REST operation response
 * @return REST operation response post idempotency check
 */
public AbfsRestOperation deleteIdempotencyCheckOp(final AbfsRestOperation op) {
if ((op.isARetriedRequest())
&& (op.getResult().getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND)
&& DEFAULT_DELETE_CONSIDERED_IDEMPOTENT) {
// Server has returned HTTP 404, which means path no longer
// exists. Assuming delete result to be idempotent, return success.
final AbfsRestOperation successOp = new AbfsRestOperation(
AbfsRestOperationType.DeletePath,
this,
HTTP_METHOD_DELETE,
op.getUrl(),
op.getRequestHeaders());
successOp.hardSetResult(HttpURLConnection.HTTP_OK);
return successOp;
}
return op;
}
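/**
 * Sets the owner and/or owning group of the given path.
 * @param path path to update.
 * @param owner new owner, or null/empty to leave the owner unchanged.
 * @param group new owning group, or null/empty to leave the group unchanged.
 * @return executed REST operation containing the response from the server.
 * @throws AzureBlobFileSystemException if the REST operation fails.
 */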
public AbfsRestOperation setOwner(final String path, final String owner, final String group)
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
// JDK7 does not support PATCH, so to work around this we use
// PUT and specify the real method in the X-Http-Method-Override header.
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
HTTP_METHOD_PATCH));
if (owner != null && !owner.isEmpty()) {
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_OWNER, owner));
}
if (group != null && !group.isEmpty()) {
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_GROUP, group));
}
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.SET_ACCESS_CONTROL);
appendSASTokenToQuery(path, SASTokenProvider.SET_OWNER_OPERATION, abfsUriQueryBuilder);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.SetOwner,
this,
AbfsHttpConstants.HTTP_METHOD_PUT,
url,
requestHeaders);
op.execute();
return op;
}
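/**
 * Sets the permission of the given path.
 * @param path path to update.
 * @param permission permission to set in the x-ms-permissions header.
 * @return executed REST operation containing the response from the server.
 * @throws AzureBlobFileSystemException if the REST operation fails.
 */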
public AbfsRestOperation setPermission(final String path, final String permission)
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
// JDK7 does not support PATCH, so to work around this we use
// PUT and specify the real method in the X-Http-Method-Override header.
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
HTTP_METHOD_PATCH));
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_PERMISSIONS, permission));
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.SET_ACCESS_CONTROL);
appendSASTokenToQuery(path, SASTokenProvider.SET_PERMISSION_OPERATION, abfsUriQueryBuilder);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.SetPermissions,
this,
AbfsHttpConstants.HTTP_METHOD_PUT,
url,
requestHeaders);
op.execute();
return op;
}
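/**
 * Sets the access control list of the given path, without an ETag precondition.
 * @param path path to update.
 * @param aclSpecString ACL specification to set in the x-ms-acl header.
 * @return executed REST operation containing the response from the server.
 * @throws AzureBlobFileSystemException if the REST operation fails.
 */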
public AbfsRestOperation setAcl(final String path, final String aclSpecString) throws AzureBlobFileSystemException {
return setAcl(path, aclSpecString, AbfsHttpConstants.EMPTY_STRING);
}
public AbfsRestOperation setAcl(final String path, final String aclSpecString, final String eTag)
throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
// JDK7 does not support PATCH, so to work around this we use
// PUT and specify the real method in the X-Http-Method-Override header.
requestHeaders.add(new AbfsHttpHeader(X_HTTP_METHOD_OVERRIDE,
HTTP_METHOD_PATCH));
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.X_MS_ACL, aclSpecString));
if (eTag != null && !eTag.isEmpty()) {
requestHeaders.add(new AbfsHttpHeader(HttpHeaderConfigurations.IF_MATCH, eTag));
}
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.SET_ACCESS_CONTROL);
appendSASTokenToQuery(path, SASTokenProvider.SET_ACL_OPERATION, abfsUriQueryBuilder);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.SetAcl,
this,
AbfsHttpConstants.HTTP_METHOD_PUT,
url,
requestHeaders);
op.execute();
return op;
}
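/**
 * Retrieves the ACL status of the given path, using the configured UPN setting.
 * @param path path to query.
 * @return executed REST operation whose response headers carry the ACL status.
 * @throws AzureBlobFileSystemException if the REST operation fails.
 */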
public AbfsRestOperation getAclStatus(final String path) throws AzureBlobFileSystemException {
return getAclStatus(path, abfsConfiguration.isUpnUsed());
}
public AbfsRestOperation getAclStatus(final String path, final boolean useUPN) throws AzureBlobFileSystemException {
final List<AbfsHttpHeader> requestHeaders = createDefaultHeaders();
final AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_ACTION, AbfsHttpConstants.GET_ACCESS_CONTROL);
abfsUriQueryBuilder.addQuery(HttpQueryParams.QUERY_PARAM_UPN, String.valueOf(useUPN));
appendSASTokenToQuery(path, SASTokenProvider.GET_ACL_OPERATION, abfsUriQueryBuilder);
final URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
final AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.GetAcl,
this,
AbfsHttpConstants.HTTP_METHOD_HEAD,
url,
requestHeaders);
op.execute();
return op;
}
/**
* Talks to the server to check whether the permission specified in
* the rwx parameter is present for the path specified in the path parameter.
*
* @param path Path for which access check needs to be performed
* @param rwx The permission to be checked on the path
* @return The {@link AbfsRestOperation} object for the operation
* @throws AzureBlobFileSystemException in case of bad requests
*/
public AbfsRestOperation checkAccess(String path, String rwx)
throws AzureBlobFileSystemException {
AbfsUriQueryBuilder abfsUriQueryBuilder = createDefaultUriQueryBuilder();
abfsUriQueryBuilder.addQuery(QUERY_PARAM_ACTION, CHECK_ACCESS);
abfsUriQueryBuilder.addQuery(QUERY_FS_ACTION, rwx);
appendSASTokenToQuery(path, SASTokenProvider.CHECK_ACCESS_OPERATION, abfsUriQueryBuilder);
URL url = createRequestUrl(path, abfsUriQueryBuilder.toString());
AbfsRestOperation op = new AbfsRestOperation(
AbfsRestOperationType.CheckAccess, this,
AbfsHttpConstants.HTTP_METHOD_HEAD, url, createDefaultHeaders());
op.execute();
return op;
}
/**
* Get the directory query parameter used by the List Paths REST API and used
* as the path in the continuation token. If the input path is null or the
* root path "/", an empty string is returned. If the input path begins with '/',
* the return value is the substring beginning at offset 1. Otherwise, the
* input path is returned.
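* For example, {@code getDirectoryQueryParameter("/dir/sub")} returns
* {@code "dir/sub"}, and {@code getDirectoryQueryParameter("/")} returns
* the empty string.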
* @param path the path to be listed.
* @return the value of the directory query parameter
*/
public static String getDirectoryQueryParameter(final String path) {
String directory = path;
if (Strings.isNullOrEmpty(directory)) {
directory = AbfsHttpConstants.EMPTY_STRING;
} else if (directory.charAt(0) == '/') {
directory = directory.substring(1);
}
return directory;
}
/**
 * If configured for SAS AuthType, appends the SAS token to the query builder.
 * @param path operation path
 * @param operation SAS token operation
 * @param queryBuilder the query builder to append the SAS token to
 * @return sasToken - returned for optional re-use.
 * @throws SASTokenProviderException if a SAS token cannot be acquired
 */
private String appendSASTokenToQuery(String path, String operation, AbfsUriQueryBuilder queryBuilder) throws SASTokenProviderException {
return appendSASTokenToQuery(path, operation, queryBuilder, null);
}
/**
 * If configured for SAS AuthType, appends the SAS token to the query builder.
 * @param path operation path
 * @param operation SAS token operation
 * @param queryBuilder the query builder to append the SAS token to
 * @param cachedSasToken - previously acquired SAS token to be reused.
 * @return sasToken - returned for optional re-use.
 * @throws SASTokenProviderException if a SAS token cannot be acquired
 */
private String appendSASTokenToQuery(String path,
String operation,
AbfsUriQueryBuilder queryBuilder,
String cachedSasToken)
throws SASTokenProviderException {
String sasToken = null;
if (this.authType == AuthType.SAS) {
try {
LOG.trace("Fetch SAS token for {} on {}", operation, path);
if (cachedSasToken == null) {
sasToken = sasTokenProvider.getSASToken(this.accountName,
this.filesystem, path, operation);
if ((sasToken == null) || sasToken.isEmpty()) {
throw new UnsupportedOperationException("SASToken received is empty or null");
}
} else {
sasToken = cachedSasToken;
LOG.trace("Using cached SAS token.");
}
queryBuilder.setSASToken(sasToken);
LOG.trace("SAS token fetch complete for {} on {}", operation, path);
} catch (Exception ex) {
throw new SASTokenProviderException(String.format("Failed to acquire a SAS token for %s on %s due to %s",
operation,
path,
ex.toString()));
}
}
return sasToken;
}
private URL createRequestUrl(final String query) throws AzureBlobFileSystemException {
return createRequestUrl(EMPTY_STRING, query);
}
@VisibleForTesting
protected URL createRequestUrl(final String path, final String query)
throws AzureBlobFileSystemException {
final String base = baseUrl.toString();
String encodedPath = path;
try {
encodedPath = urlEncode(path);
} catch (AzureBlobFileSystemException ex) {
LOG.debug("Unexpected error.", ex);
throw new InvalidUriException(path);
}
final StringBuilder sb = new StringBuilder();
sb.append(base);
sb.append(encodedPath);
sb.append(query);
final URL url;
try {
url = new URL(sb.toString());
} catch (MalformedURLException ex) {
throw new InvalidUriException(sb.toString());
}
return url;
}
public static String urlEncode(final String value) throws AzureBlobFileSystemException {
String encodedString;
try {
encodedString = URLEncoder.encode(value, UTF_8)
.replace(PLUS, PLUS_ENCODE)
.replace(FORWARD_SLASH_ENCODE, FORWARD_SLASH);
} catch (UnsupportedEncodingException ex) {
throw new InvalidUriException(value);
}
return encodedString;
}
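/**
 * Returns the OAuth access token as a "Bearer" authorization value, or null
 * when no token provider is configured (for example with SharedKey or SAS auth).
 * @return "Bearer " prefixed access token, or null.
 * @throws IOException if the token cannot be obtained.
 */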
public synchronized String getAccessToken() throws IOException {
if (tokenProvider != null) {
return "Bearer " + tokenProvider.getToken().getAccessToken();
} else {
return null;
}
}
public AuthType getAuthType() {
return authType;
}
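/**
 * Builds the User-Agent value from the APN/client version, JRE and OS
 * details, the SSL provider, any token-provider suffix, the cluster
 * name/type and the configured custom prefix.
 * @param abfsConfiguration configuration supplying cluster and custom prefix values.
 * @param sslProviderName SSL provider name, or null when not initialized.
 * @return the composed User-Agent value.
 */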
@VisibleForTesting
String initializeUserAgent(final AbfsConfiguration abfsConfiguration,
final String sslProviderName) {
StringBuilder sb = new StringBuilder();
sb.append(APN_VERSION);
sb.append(SINGLE_WHITE_SPACE);
sb.append(CLIENT_VERSION);
sb.append(SINGLE_WHITE_SPACE);
sb.append("(");
sb.append(System.getProperty(JAVA_VENDOR)
.replaceAll(SINGLE_WHITE_SPACE, EMPTY_STRING));
sb.append(SINGLE_WHITE_SPACE);
sb.append("JavaJRE");
sb.append(SINGLE_WHITE_SPACE);
sb.append(System.getProperty(JAVA_VERSION));
sb.append(SEMICOLON);
sb.append(SINGLE_WHITE_SPACE);
sb.append(System.getProperty(OS_NAME)
.replaceAll(SINGLE_WHITE_SPACE, EMPTY_STRING));
sb.append(SINGLE_WHITE_SPACE);
sb.append(System.getProperty(OS_VERSION));
sb.append(FORWARD_SLASH);
sb.append(System.getProperty(OS_ARCH));
sb.append(SEMICOLON);
appendIfNotEmpty(sb, sslProviderName, true);
appendIfNotEmpty(sb,
ExtensionHelper.getUserAgentSuffix(tokenProvider, EMPTY_STRING), true);
sb.append(SINGLE_WHITE_SPACE);
sb.append(abfsConfiguration.getClusterName());
sb.append(FORWARD_SLASH);
sb.append(abfsConfiguration.getClusterType());
sb.append(")");
appendIfNotEmpty(sb, abfsConfiguration.getCustomUserAgentPrefix(), false);
return String.format(Locale.ROOT, sb.toString());
}
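/**
 * Appends the given value to the builder, preceded by a space and optionally
 * followed by a semicolon. Despite the parameter name, the value is treated
 * as a literal string, not a regular expression.
 */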
private void appendIfNotEmpty(StringBuilder sb, String regEx,
boolean shouldAppendSemiColon) {
if (regEx == null || regEx.trim().isEmpty()) {
return;
}
sb.append(SINGLE_WHITE_SPACE);
sb.append(regEx);
if (shouldAppendSemiColon) {
sb.append(SEMICOLON);
}
}
@VisibleForTesting
URL getBaseUrl() {
return baseUrl;
}
@VisibleForTesting
public SASTokenProvider getSasTokenProvider() {
return this.sasTokenProvider;
}
/**
* Getter for abfsCounters from AbfsClient.
* @return AbfsCounters instance.
*/
protected AbfsCounters getAbfsCounters() {
return abfsCounters;
}
}