/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.asterix.external.util.aws.s3;
import static org.apache.asterix.common.exceptions.ErrorCode.INVALID_PARAM_VALUE_ALLOWED_VALUE;
import static org.apache.asterix.common.exceptions.ErrorCode.PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT;
import static org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_IF_PARAM_IS_PRESENT;
import static org.apache.asterix.common.exceptions.ErrorCode.S3_REGION_NOT_SUPPORTED;
import static org.apache.asterix.external.util.ExternalDataUtils.getPrefix;
import static org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
import static org.apache.asterix.external.util.aws.s3.S3Constants.*;
import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiPredicate;
import java.util.regex.Matcher;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.input.record.reader.abstracts.AbstractExternalInputStreamFactory;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.HDFSUtils;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hyracks.api.exceptions.IWarningCollector;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.api.exceptions.Warning;
import org.apache.hyracks.api.util.CleanupUtils;
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.AwsSessionCredentials;
import software.amazon.awssdk.auth.credentials.InstanceProfileCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Response;
import software.amazon.awssdk.services.s3.model.S3Exception;
import software.amazon.awssdk.services.s3.model.S3Object;
import software.amazon.awssdk.services.s3.model.S3Response;
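
/**
 * Utility methods for AWS S3 external data access: building the S3 client, configuring the Hadoop
 * job configuration for the s3a connector, validating external dataset properties, and listing the
 * objects of a bucket.
 */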
public class S3Utils {
private S3Utils() {
throw new AssertionError("do not instantiate");
}
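
    /**
     * Checks whether the provided S3 error code represents a retryable (transient) error, i.e. an
     * internal error or a slow-down response.
     */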
public static boolean isRetryableError(String errorCode) {
return errorCode.equals(ERROR_INTERNAL_ERROR) || errorCode.equals(ERROR_SLOW_DOWN);
}
/**
* Builds the S3 client using the provided configuration
*
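     * <p>
     * Illustrative usage sketch; the credential and region values below are placeholders, not values
     * taken from this codebase:
     *
     * <pre>{@code
     * Map<String, String> configuration = new HashMap<>();
     * configuration.put(ACCESS_KEY_ID_FIELD_NAME, "<access-key-id>");
     * configuration.put(SECRET_ACCESS_KEY_FIELD_NAME, "<secret-access-key>");
     * configuration.put(REGION_FIELD_NAME, "us-east-1");
     * S3Client s3Client = buildAwsS3Client(configuration);
     * }</pre>
     *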
* @param configuration properties
* @return S3 client
     * @throws CompilationException if the configuration is invalid or the client cannot be built
*/
public static S3Client buildAwsS3Client(Map<String, String> configuration) throws CompilationException {
// TODO(Hussain): Need to ensure that all required parameters are present in a previous step
String instanceProfile = configuration.get(INSTANCE_PROFILE_FIELD_NAME);
String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME);
String regionId = configuration.get(REGION_FIELD_NAME);
String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME);
S3ClientBuilder builder = S3Client.builder();
// Credentials
AwsCredentialsProvider credentialsProvider;
// nothing provided, anonymous authentication
if (instanceProfile == null && accessKeyId == null && secretAccessKey == null && sessionToken == null) {
credentialsProvider = AnonymousCredentialsProvider.create();
} else if (instanceProfile != null) {
// only "true" value is allowed
if (!instanceProfile.equalsIgnoreCase("true")) {
throw new CompilationException(INVALID_PARAM_VALUE_ALLOWED_VALUE, INSTANCE_PROFILE_FIELD_NAME, "true");
}
// no other authentication parameters are allowed
if (accessKeyId != null) {
throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, ACCESS_KEY_ID_FIELD_NAME,
INSTANCE_PROFILE_FIELD_NAME);
}
if (secretAccessKey != null) {
throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, SECRET_ACCESS_KEY_FIELD_NAME,
INSTANCE_PROFILE_FIELD_NAME);
}
if (sessionToken != null) {
throw new CompilationException(PARAM_NOT_ALLOWED_IF_PARAM_IS_PRESENT, SESSION_TOKEN_FIELD_NAME,
INSTANCE_PROFILE_FIELD_NAME);
}
credentialsProvider = InstanceProfileCredentialsProvider.create();
} else if (accessKeyId != null || secretAccessKey != null) {
// accessKeyId authentication
if (accessKeyId == null) {
throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCESS_KEY_ID_FIELD_NAME,
SECRET_ACCESS_KEY_FIELD_NAME);
}
if (secretAccessKey == null) {
throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, SECRET_ACCESS_KEY_FIELD_NAME,
ACCESS_KEY_ID_FIELD_NAME);
}
// use session token if provided
if (sessionToken != null) {
credentialsProvider = StaticCredentialsProvider
.create(AwsSessionCredentials.create(accessKeyId, secretAccessKey, sessionToken));
} else {
credentialsProvider =
StaticCredentialsProvider.create(AwsBasicCredentials.create(accessKeyId, secretAccessKey));
}
} else {
// if only session token is provided, accessKeyId is required
throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCESS_KEY_ID_FIELD_NAME,
SESSION_TOKEN_FIELD_NAME);
}
builder.credentialsProvider(credentialsProvider);
// Validate the region
List<Region> regions = S3Client.serviceMetadata().regions();
Optional<Region> selectedRegion = regions.stream().filter(region -> region.id().equals(regionId)).findFirst();
if (selectedRegion.isEmpty()) {
throw new CompilationException(S3_REGION_NOT_SUPPORTED, regionId);
}
builder.region(selectedRegion.get());
// Validate the service endpoint if present
if (serviceEndpoint != null) {
try {
URI uri = new URI(serviceEndpoint);
try {
builder.endpointOverride(uri);
} catch (NullPointerException ex) {
throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
}
} catch (URISyntaxException ex) {
throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR,
String.format("Invalid service endpoint %s", serviceEndpoint));
}
}
return builder.build();
}
/**
     * Configures the Hadoop job configuration for reading from AWS S3 via the hadoop-aws (s3a) connector
*
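     * <p>
     * Sketch of the expected call pattern (illustrative only; the JobConf instance and the configuration
     * map are assumed to be supplied by the caller):
     *
     * <pre>{@code
     * JobConf jobConf = new JobConf();
     * configureAwsS3HdfsJobConf(jobConf, configuration, numberOfPartitions);
     * }</pre>
     *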
     * @param conf Hadoop job configuration to set up
     * @param configuration properties
* @param numberOfPartitions number of partitions in the cluster
*/
public static void configureAwsS3HdfsJobConf(JobConf conf, Map<String, String> configuration,
int numberOfPartitions) {
String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
String sessionToken = configuration.get(SESSION_TOKEN_FIELD_NAME);
String serviceEndpoint = configuration.get(SERVICE_END_POINT_FIELD_NAME);
//Disable caching S3 FileSystem
HDFSUtils.disableHadoopFileSystemCache(conf, HADOOP_S3_PROTOCOL);
/*
* Authentication Methods:
* 1- Anonymous: no accessKeyId and no secretAccessKey
* 2- Temporary: has to provide accessKeyId, secretAccessKey and sessionToken
* 3- Private: has to provide accessKeyId and secretAccessKey
*/
if (accessKeyId == null) {
//Tells hadoop-aws it is an anonymous access
conf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_ANONYMOUS_ACCESS);
} else {
conf.set(HADOOP_ACCESS_KEY_ID, accessKeyId);
conf.set(HADOOP_SECRET_ACCESS_KEY, secretAccessKey);
if (sessionToken != null) {
conf.set(HADOOP_SESSION_TOKEN, sessionToken);
//Tells hadoop-aws it is a temporary access
conf.set(HADOOP_CREDENTIAL_PROVIDER_KEY, HADOOP_TEMP_ACCESS);
}
}
/*
         * Allow the S3 location to be specified in path-style form. This should always be true to match
         * the current way we access files in S3
*/
conf.set(HADOOP_PATH_STYLE_ACCESS, ExternalDataConstants.TRUE);
/*
* Set the size of S3 connection pool to be the number of partitions
*/
conf.set(HADOOP_S3_CONNECTION_POOL_SIZE, String.valueOf(numberOfPartitions));
if (serviceEndpoint != null) {
// Validation of the URL should be done at hadoop-aws level
conf.set(HADOOP_SERVICE_END_POINT, serviceEndpoint);
} else {
            // Region is ignored; buckets can be resolved via the central endpoint
conf.set(HADOOP_SERVICE_END_POINT, Constants.CENTRAL_ENDPOINT);
}
}
/**
* Validate external dataset properties
*
     * @param configuration properties
     * @param srcLoc source location
     * @param collector warning collector
* @throws CompilationException Compilation exception
*/
public static void validateProperties(Map<String, String> configuration, SourceLocation srcLoc,
IWarningCollector collector) throws CompilationException {
// check if the format property is present
if (configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, srcLoc, ExternalDataConstants.KEY_FORMAT);
}
// Both parameters should be passed, or neither should be passed (for anonymous/no auth)
String accessKeyId = configuration.get(ACCESS_KEY_ID_FIELD_NAME);
String secretAccessKey = configuration.get(SECRET_ACCESS_KEY_FIELD_NAME);
if (accessKeyId == null || secretAccessKey == null) {
// If one is passed, the other is required
if (accessKeyId != null) {
throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, SECRET_ACCESS_KEY_FIELD_NAME,
ACCESS_KEY_ID_FIELD_NAME);
} else if (secretAccessKey != null) {
throw new CompilationException(REQUIRED_PARAM_IF_PARAM_IS_PRESENT, ACCESS_KEY_ID_FIELD_NAME,
SECRET_ACCESS_KEY_FIELD_NAME);
}
}
validateIncludeExclude(configuration);
// Check if the bucket is present
S3Client s3Client = buildAwsS3Client(configuration);
S3Response response;
boolean useOldApi = false;
String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
String prefix = getPrefix(configuration);
try {
response = isBucketEmpty(s3Client, container, prefix, false);
} catch (S3Exception ex) {
// Method not implemented, try falling back to old API
try {
// For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
if (ex.awsErrorDetails().errorCode().equals(ERROR_METHOD_NOT_IMPLEMENTED)) {
useOldApi = true;
response = isBucketEmpty(s3Client, container, prefix, true);
} else {
throw ex;
}
} catch (SdkException ex2) {
                throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex2));
}
} catch (SdkException ex) {
throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
} finally {
if (s3Client != null) {
CleanupUtils.close(s3Client, null);
}
}
boolean isEmpty = useOldApi ? ((ListObjectsResponse) response).contents().isEmpty()
: ((ListObjectsV2Response) response).contents().isEmpty();
if (isEmpty && collector.shouldWarn()) {
Warning warning = Warning.of(srcLoc, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
collector.warn(warning);
}
        // The list request returns 200 only if the bucket exists; otherwise an exception is thrown. To be
        // thorough, also verify that the response was successful instead of relying on exceptions alone
if (!response.sdkHttpResponse().isSuccessful()) {
throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_CONTAINER_NOT_FOUND, container);
}
}
/**
* Checks for a single object in the specified bucket to determine if the bucket is empty or not.
*
* @param s3Client s3 client
* @param container the container name
* @param prefix Prefix to be used
* @param useOldApi flag whether to use the old API or not
     * @return the S3 response
*/
private static S3Response isBucketEmpty(S3Client s3Client, String container, String prefix, boolean useOldApi) {
S3Response response;
if (useOldApi) {
ListObjectsRequest.Builder listObjectsBuilder = ListObjectsRequest.builder();
listObjectsBuilder.prefix(prefix);
response = s3Client.listObjects(listObjectsBuilder.bucket(container).maxKeys(1).build());
} else {
ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder();
listObjectsBuilder.prefix(prefix);
response = s3Client.listObjectsV2(listObjectsBuilder.bucket(container).maxKeys(1).build());
}
return response;
}
/**
     * Returns the list of S3 objects matching the provided configuration.
     *
     * @param configuration properties
     * @param includeExcludeMatcher include/exclude matchers to apply
     * @param warningCollector warning collector
     * @return list of S3 objects (files only)
*/
public static List<S3Object> listS3Objects(Map<String, String> configuration,
AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher,
IWarningCollector warningCollector) throws CompilationException {
// Prepare to retrieve the objects
List<S3Object> filesOnly;
String container = configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
S3Client s3Client = buildAwsS3Client(configuration);
String prefix = getPrefix(configuration);
try {
filesOnly = listS3Objects(s3Client, container, prefix, includeExcludeMatcher);
} catch (S3Exception ex) {
// New API is not implemented, try falling back to old API
try {
// For error code, see https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html
if (ex.awsErrorDetails().errorCode().equals(ERROR_METHOD_NOT_IMPLEMENTED)) {
filesOnly = oldApiListS3Objects(s3Client, container, prefix, includeExcludeMatcher);
} else {
throw ex;
}
} catch (SdkException ex2) {
                throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex2));
}
} catch (SdkException ex) {
throw new CompilationException(ErrorCode.EXTERNAL_SOURCE_ERROR, getMessageOrToString(ex));
} finally {
if (s3Client != null) {
CleanupUtils.close(s3Client, null);
}
}
// Warn if no files are returned
if (filesOnly.isEmpty() && warningCollector.shouldWarn()) {
Warning warning = Warning.of(null, ErrorCode.EXTERNAL_SOURCE_CONFIGURATION_RETURNED_NO_FILES);
warningCollector.warn(warning);
}
return filesOnly;
}
/**
* Uses the latest API to retrieve the objects from the storage.
*
* @param s3Client S3 client
* @param container container name
* @param prefix definition prefix
     * @param includeExcludeMatcher include/exclude matchers to apply
     * @return list of S3 objects (files only)
*/
private static List<S3Object> listS3Objects(S3Client s3Client, String container, String prefix,
AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher) {
String newMarker = null;
List<S3Object> filesOnly = new ArrayList<>();
ListObjectsV2Response listObjectsResponse;
ListObjectsV2Request.Builder listObjectsBuilder = ListObjectsV2Request.builder().bucket(container);
listObjectsBuilder.prefix(prefix);
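        // ListObjectsV2 pages its results (up to 1000 keys per response), so keep requesting pages
        // until the response is no longer truncated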
while (true) {
// List the objects from the start, or from the last marker in case of truncated result
if (newMarker == null) {
listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.build());
} else {
listObjectsResponse = s3Client.listObjectsV2(listObjectsBuilder.continuationToken(newMarker).build());
}
// Collect the paths to files only
collectAndFilterFiles(listObjectsResponse.contents(), includeExcludeMatcher.getPredicate(),
includeExcludeMatcher.getMatchersList(), filesOnly);
            // Stop when the response is no longer truncated; otherwise, use its continuation token for the next request
if (!listObjectsResponse.isTruncated()) {
break;
} else {
newMarker = listObjectsResponse.nextContinuationToken();
}
}
return filesOnly;
}
/**
* Uses the old API (in case the new API is not implemented) to retrieve the objects from the storage
*
* @param s3Client S3 client
* @param container container name
* @param prefix definition prefix
     * @param includeExcludeMatcher include/exclude matchers to apply
     * @return list of S3 objects (files only)
*/
private static List<S3Object> oldApiListS3Objects(S3Client s3Client, String container, String prefix,
AbstractExternalInputStreamFactory.IncludeExcludeMatcher includeExcludeMatcher) {
String newMarker = null;
List<S3Object> filesOnly = new ArrayList<>();
ListObjectsResponse listObjectsResponse;
ListObjectsRequest.Builder listObjectsBuilder = ListObjectsRequest.builder().bucket(container);
listObjectsBuilder.prefix(prefix);
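        // The V1 ListObjects API pages with marker/nextMarker rather than continuation tokens, so keep
        // requesting pages until the response is no longer truncated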
while (true) {
// List the objects from the start, or from the last marker in case of truncated result
if (newMarker == null) {
listObjectsResponse = s3Client.listObjects(listObjectsBuilder.build());
} else {
listObjectsResponse = s3Client.listObjects(listObjectsBuilder.marker(newMarker).build());
}
// Collect the paths to files only
collectAndFilterFiles(listObjectsResponse.contents(), includeExcludeMatcher.getPredicate(),
includeExcludeMatcher.getMatchersList(), filesOnly);
            // Stop when the response is no longer truncated; otherwise, use its marker for the next request
if (!listObjectsResponse.isTruncated()) {
break;
} else {
newMarker = listObjectsResponse.nextMarker();
}
}
return filesOnly;
}
/**
     * AWS S3 returns all objects as paths, without differentiating between folders and files. A path is
     * considered a file if it does not end with a "/", which is the separator in a folder structure.
     *
     * @param s3Objects list of returned objects
     * @param predicate predicate for testing an object key against the include/exclude matchers
     * @param matchers include/exclude matchers to apply
     * @param filesOnly output list to which matching file objects are added
*/
private static void collectAndFilterFiles(List<S3Object> s3Objects, BiPredicate<List<Matcher>, String> predicate,
List<Matcher> matchers, List<S3Object> filesOnly) {
for (S3Object object : s3Objects) {
// skip folders
if (object.key().endsWith("/")) {
continue;
}
            // Add the file if it passes the include/exclude filters
if (predicate.test(matchers, object.key())) {
filesOnly.add(object);
}
}
}
}