| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.hadoop.fs.azurebfs.services; |
| |
| import java.io.IOException; |
| import java.net.HttpURLConnection; |
| import java.net.URL; |
| import java.util.List; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; |
| import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; |
| import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; |
| import org.apache.hadoop.fs.azurebfs.contracts.exceptions.InvalidAbfsRestOperationException; |
| import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; |
| |
| /** |
| * The AbfsRestOperation for Rest AbfsClient. |
| */ |
| public class AbfsRestOperation { |
| // The type of the REST operation (Append, ReadFile, etc) |
| private final AbfsRestOperationType operationType; |
| // Blob FS client, which has the credentials, retry policy, and logs. |
| private final AbfsClient client; |
| // the HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE) |
| private final String method; |
| // full URL including query parameters |
| private final URL url; |
| // all the custom HTTP request headers provided by the caller |
| private final List<AbfsHttpHeader> requestHeaders; |
| |
| // This is a simple operation class, where all the upload methods have a |
| // request body and all the download methods have a response body. |
| private final boolean hasRequestBody; |
| |
| private static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class); |
| |
| // For uploads, this is the request entity body. For downloads, |
| // this will hold the response entity body. |
| private byte[] buffer; |
| private int bufferOffset; |
| private int bufferLength; |
| |
| private AbfsHttpOperation result; |
| |
| public AbfsHttpOperation getResult() { |
| return result; |
| } |
| |
| /** |
| * Initializes a new REST operation. |
| * |
| * @param client The Blob FS client. |
| * @param method The HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE). |
| * @param url The full URL including query string parameters. |
| * @param requestHeaders The HTTP request headers. |
| */ |
| AbfsRestOperation(final AbfsRestOperationType operationType, |
| final AbfsClient client, |
| final String method, |
| final URL url, |
| final List<AbfsHttpHeader> requestHeaders) { |
| this.operationType = operationType; |
| this.client = client; |
| this.method = method; |
| this.url = url; |
| this.requestHeaders = requestHeaders; |
| this.hasRequestBody = (AbfsHttpConstants.HTTP_METHOD_PUT.equals(method) |
| || AbfsHttpConstants.HTTP_METHOD_PATCH.equals(method)); |
| } |
| |
| /** |
| * Initializes a new REST operation. |
| * |
| * @param operationType The type of the REST operation (Append, ReadFile, etc). |
| * @param client The Blob FS client. |
| * @param method The HTTP method (PUT, PATCH, POST, GET, HEAD, or DELETE). |
| * @param url The full URL including query string parameters. |
| * @param requestHeaders The HTTP request headers. |
| * @param buffer For uploads, this is the request entity body. For downloads, |
| * this will hold the response entity body. |
| * @param bufferOffset An offset into the buffer where the data beings. |
| * @param bufferLength The length of the data in the buffer. |
| */ |
| AbfsRestOperation(AbfsRestOperationType operationType, |
| AbfsClient client, |
| String method, |
| URL url, |
| List<AbfsHttpHeader> requestHeaders, |
| byte[] buffer, |
| int bufferOffset, |
| int bufferLength) { |
| this(operationType, client, method, url, requestHeaders); |
| this.buffer = buffer; |
| this.bufferOffset = bufferOffset; |
| this.bufferLength = bufferLength; |
| } |
| |
| /** |
| * Executes the REST operation with retry, by issuing one or more |
| * HTTP operations. |
| */ |
| void execute() throws AzureBlobFileSystemException { |
| int retryCount = 0; |
| while (!executeHttpOperation(retryCount++)) { |
| try { |
| Thread.sleep(client.getRetryPolicy().getRetryInterval(retryCount)); |
| } catch (InterruptedException ex) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| |
| if (result.getStatusCode() >= HttpURLConnection.HTTP_BAD_REQUEST) { |
| throw new AbfsRestOperationException(result.getStatusCode(), result.getStorageErrorCode(), |
| result.getStorageErrorMessage(), null, result); |
| } |
| } |
| |
| /** |
| * Executes a single HTTP operation to complete the REST operation. If it |
| * fails, there may be a retry. The retryCount is incremented with each |
| * attempt. |
| */ |
| private boolean executeHttpOperation(final int retryCount) throws AzureBlobFileSystemException { |
| AbfsHttpOperation httpOperation = null; |
| try { |
| // initialize the HTTP request and open the connection |
| httpOperation = new AbfsHttpOperation(url, method, requestHeaders); |
| |
| // sign the HTTP request |
| if (client.getAccessToken() == null) { |
| // sign the HTTP request |
| client.getSharedKeyCredentials().signRequest( |
| httpOperation.getConnection(), |
| hasRequestBody ? bufferLength : 0); |
| } else { |
| httpOperation.getConnection().setRequestProperty(HttpHeaderConfigurations.AUTHORIZATION, |
| client.getAccessToken()); |
| } |
| |
| AbfsClientThrottlingIntercept.sendingRequest(operationType); |
| |
| if (hasRequestBody) { |
| // HttpUrlConnection requires |
| httpOperation.sendRequest(buffer, bufferOffset, bufferLength); |
| } |
| |
| httpOperation.processResponse(buffer, bufferOffset, bufferLength); |
| } catch (IOException ex) { |
| if (LOG.isDebugEnabled()) { |
| if (httpOperation != null) { |
| LOG.debug("HttpRequestFailure: " + httpOperation.toString(), ex); |
| } else { |
| LOG.debug("HttpRequestFailure: " + method + "," + url, ex); |
| } |
| } |
| if (!client.getRetryPolicy().shouldRetry(retryCount, -1)) { |
| throw new InvalidAbfsRestOperationException(ex); |
| } |
| return false; |
| } finally { |
| AbfsClientThrottlingIntercept.updateMetrics(operationType, httpOperation); |
| } |
| |
| LOG.debug("HttpRequest: " + httpOperation.toString()); |
| |
| if (client.getRetryPolicy().shouldRetry(retryCount, httpOperation.getStatusCode())) { |
| return false; |
| } |
| |
| result = httpOperation; |
| |
| return true; |
| } |
| } |