| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.sdk.io.aws.options; |
| |
| import com.amazonaws.ClientConfiguration; |
| import com.amazonaws.auth.AWSCredentialsProvider; |
| import com.amazonaws.auth.AWSStaticCredentialsProvider; |
| import com.amazonaws.auth.BasicAWSCredentials; |
| import com.amazonaws.auth.ClasspathPropertiesFileCredentialsProvider; |
| import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; |
| import com.amazonaws.auth.EC2ContainerCredentialsProviderWrapper; |
| import com.amazonaws.auth.EnvironmentVariableCredentialsProvider; |
| import com.amazonaws.auth.PropertiesFileCredentialsProvider; |
| import com.amazonaws.auth.SystemPropertiesCredentialsProvider; |
| import com.amazonaws.auth.profile.ProfileCredentialsProvider; |
| import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams; |
| import com.amazonaws.services.s3.model.SSECustomerKey; |
| import com.fasterxml.jackson.annotation.JsonAutoDetect; |
| import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility; |
| import com.fasterxml.jackson.annotation.JsonProperty; |
| import com.fasterxml.jackson.annotation.JsonTypeInfo; |
| import com.fasterxml.jackson.core.JsonGenerator; |
| import com.fasterxml.jackson.core.JsonParser; |
| import com.fasterxml.jackson.core.type.TypeReference; |
| import com.fasterxml.jackson.databind.DeserializationContext; |
| import com.fasterxml.jackson.databind.JsonDeserializer; |
| import com.fasterxml.jackson.databind.JsonSerializer; |
| import com.fasterxml.jackson.databind.Module; |
| import com.fasterxml.jackson.databind.SerializerProvider; |
| import com.fasterxml.jackson.databind.annotation.JsonDeserialize; |
| import com.fasterxml.jackson.databind.annotation.JsonSerialize; |
| import com.fasterxml.jackson.databind.jsontype.TypeDeserializer; |
| import com.fasterxml.jackson.databind.jsontype.TypeSerializer; |
| import com.fasterxml.jackson.databind.module.SimpleModule; |
| import com.google.auto.service.AutoService; |
| import java.io.IOException; |
| import java.lang.reflect.Field; |
| import java.util.Map; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; |
| |
| /** |
| * A Jackson {@link Module} that registers a {@link JsonSerializer} and {@link JsonDeserializer} for |
| * {@link AWSCredentialsProvider} and some subclasses. The serialized form is a JSON map. |
| * |
| * <p>It also adds serializers for S3 encryption objects {@link SSECustomerKey} and {@link |
| * SSEAwsKeyManagementParams}. |
| */ |
| @AutoService(Module.class) |
| public class AwsModule extends SimpleModule { |
| |
| private static final String AWS_ACCESS_KEY_ID = "awsAccessKeyId"; |
| private static final String AWS_SECRET_KEY = "awsSecretKey"; |
| private static final String CREDENTIALS_FILE_PATH = "credentialsFilePath"; |
| |
| public AwsModule() { |
| super("AwsModule"); |
| setMixInAnnotation(AWSCredentialsProvider.class, AWSCredentialsProviderMixin.class); |
| setMixInAnnotation(SSECustomerKey.class, SSECustomerKeyMixin.class); |
| setMixInAnnotation(SSEAwsKeyManagementParams.class, SSEAwsKeyManagementParamsMixin.class); |
| setMixInAnnotation(ClientConfiguration.class, ClientConfigurationMixin.class); |
| } |
| |
| /** A mixin to add Jackson annotations to {@link AWSCredentialsProvider}. */ |
| @JsonDeserialize(using = AWSCredentialsProviderDeserializer.class) |
| @JsonSerialize(using = AWSCredentialsProviderSerializer.class) |
| @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY) |
| private static class AWSCredentialsProviderMixin {} |
| |
| private static class AWSCredentialsProviderDeserializer |
| extends JsonDeserializer<AWSCredentialsProvider> { |
| |
| @Override |
| public AWSCredentialsProvider deserialize(JsonParser jsonParser, DeserializationContext context) |
| throws IOException { |
| return context.readValue(jsonParser, AWSCredentialsProvider.class); |
| } |
| |
| @Override |
| public AWSCredentialsProvider deserializeWithType( |
| JsonParser jsonParser, DeserializationContext context, TypeDeserializer typeDeserializer) |
| throws IOException { |
| Map<String, String> asMap = |
| jsonParser.readValueAs(new TypeReference<Map<String, String>>() {}); |
| |
| String typeNameKey = typeDeserializer.getPropertyName(); |
| String typeName = asMap.get(typeNameKey); |
| if (typeName == null) { |
| throw new IOException( |
| String.format("AWS credentials provider type name key '%s' not found", typeNameKey)); |
| } |
| |
| if (typeName.equals(AWSStaticCredentialsProvider.class.getSimpleName())) { |
| return new AWSStaticCredentialsProvider( |
| new BasicAWSCredentials(asMap.get(AWS_ACCESS_KEY_ID), asMap.get(AWS_SECRET_KEY))); |
| } else if (typeName.equals(PropertiesFileCredentialsProvider.class.getSimpleName())) { |
| return new PropertiesFileCredentialsProvider(asMap.get(CREDENTIALS_FILE_PATH)); |
| } else if (typeName.equals( |
| ClasspathPropertiesFileCredentialsProvider.class.getSimpleName())) { |
| return new ClasspathPropertiesFileCredentialsProvider(asMap.get(CREDENTIALS_FILE_PATH)); |
| } else if (typeName.equals(DefaultAWSCredentialsProviderChain.class.getSimpleName())) { |
| return new DefaultAWSCredentialsProviderChain(); |
| } else if (typeName.equals(EnvironmentVariableCredentialsProvider.class.getSimpleName())) { |
| return new EnvironmentVariableCredentialsProvider(); |
| } else if (typeName.equals(SystemPropertiesCredentialsProvider.class.getSimpleName())) { |
| return new SystemPropertiesCredentialsProvider(); |
| } else if (typeName.equals(ProfileCredentialsProvider.class.getSimpleName())) { |
| return new ProfileCredentialsProvider(); |
| } else if (typeName.equals(EC2ContainerCredentialsProviderWrapper.class.getSimpleName())) { |
| return new EC2ContainerCredentialsProviderWrapper(); |
| } else { |
| throw new IOException( |
| String.format("AWS credential provider type '%s' is not supported", typeName)); |
| } |
| } |
| } |
| |
| private static class AWSCredentialsProviderSerializer |
| extends JsonSerializer<AWSCredentialsProvider> { |
| // These providers are singletons, so don't require any serialization, other than type. |
| private static final ImmutableSet<Object> SINGLETON_CREDENTIAL_PROVIDERS = |
| ImmutableSet.of( |
| DefaultAWSCredentialsProviderChain.class, |
| EnvironmentVariableCredentialsProvider.class, |
| SystemPropertiesCredentialsProvider.class, |
| ProfileCredentialsProvider.class, |
| EC2ContainerCredentialsProviderWrapper.class); |
| |
| @Override |
| public void serialize( |
| AWSCredentialsProvider credentialsProvider, |
| JsonGenerator jsonGenerator, |
| SerializerProvider serializers) |
| throws IOException { |
| serializers.defaultSerializeValue(credentialsProvider, jsonGenerator); |
| } |
| |
| @Override |
| public void serializeWithType( |
| AWSCredentialsProvider credentialsProvider, |
| JsonGenerator jsonGenerator, |
| SerializerProvider serializers, |
| TypeSerializer typeSerializer) |
| throws IOException { |
| typeSerializer.writeTypePrefixForObject(credentialsProvider, jsonGenerator); |
| |
| if (credentialsProvider.getClass().equals(AWSStaticCredentialsProvider.class)) { |
| jsonGenerator.writeStringField( |
| AWS_ACCESS_KEY_ID, credentialsProvider.getCredentials().getAWSAccessKeyId()); |
| jsonGenerator.writeStringField( |
| AWS_SECRET_KEY, credentialsProvider.getCredentials().getAWSSecretKey()); |
| |
| } else if (credentialsProvider.getClass().equals(PropertiesFileCredentialsProvider.class)) { |
| try { |
| PropertiesFileCredentialsProvider specificProvider = |
| (PropertiesFileCredentialsProvider) credentialsProvider; |
| Field field = |
| PropertiesFileCredentialsProvider.class.getDeclaredField(CREDENTIALS_FILE_PATH); |
| field.setAccessible(true); |
| String credentialsFilePath = (String) field.get(specificProvider); |
| jsonGenerator.writeStringField(CREDENTIALS_FILE_PATH, credentialsFilePath); |
| } catch (NoSuchFieldException | IllegalAccessException e) { |
| throw new IOException("failed to access private field with reflection", e); |
| } |
| |
| } else if (credentialsProvider |
| .getClass() |
| .equals(ClasspathPropertiesFileCredentialsProvider.class)) { |
| try { |
| ClasspathPropertiesFileCredentialsProvider specificProvider = |
| (ClasspathPropertiesFileCredentialsProvider) credentialsProvider; |
| Field field = |
| ClasspathPropertiesFileCredentialsProvider.class.getDeclaredField( |
| CREDENTIALS_FILE_PATH); |
| field.setAccessible(true); |
| String credentialsFilePath = (String) field.get(specificProvider); |
| jsonGenerator.writeStringField(CREDENTIALS_FILE_PATH, credentialsFilePath); |
| } catch (NoSuchFieldException | IllegalAccessException e) { |
| throw new IOException("failed to access private field with reflection", e); |
| } |
| |
| } else if (!SINGLETON_CREDENTIAL_PROVIDERS.contains(credentialsProvider.getClass())) { |
| throw new IllegalArgumentException( |
| "Unsupported AWS credentials provider type " + credentialsProvider.getClass()); |
| } |
| typeSerializer.writeTypeSuffixForObject(credentialsProvider, jsonGenerator); |
| } |
| } |
| |
| /** A mixin to add Jackson annotations to {@link SSECustomerKey}. */ |
| @JsonDeserialize(using = SSECustomerKeyDeserializer.class) |
| private static class SSECustomerKeyMixin {} |
| |
| private static class SSECustomerKeyDeserializer extends JsonDeserializer<SSECustomerKey> { |
| @Override |
| public SSECustomerKey deserialize(JsonParser parser, DeserializationContext context) |
| throws IOException { |
| Map<String, String> asMap = parser.readValueAs(new TypeReference<Map<String, String>>() {}); |
| |
| final String key = asMap.getOrDefault("key", null); |
| final String algorithm = asMap.getOrDefault("algorithm", null); |
| final String md5 = asMap.getOrDefault("md5", null); |
| SSECustomerKey sseCustomerKey = new SSECustomerKey(key); |
| if (algorithm != null) { |
| sseCustomerKey.setAlgorithm(algorithm); |
| } |
| if (md5 != null) { |
| sseCustomerKey.setMd5(md5); |
| } |
| return sseCustomerKey; |
| } |
| } |
| |
| /** A mixin to add Jackson annotations to {@link SSEAwsKeyManagementParams}. */ |
| @JsonDeserialize(using = SSEAwsKeyManagementParamsDeserializer.class) |
| private static class SSEAwsKeyManagementParamsMixin {} |
| |
| private static class SSEAwsKeyManagementParamsDeserializer |
| extends JsonDeserializer<SSEAwsKeyManagementParams> { |
| @Override |
| public SSEAwsKeyManagementParams deserialize(JsonParser parser, DeserializationContext context) |
| throws IOException { |
| Map<String, String> asMap = parser.readValueAs(new TypeReference<Map<String, String>>() {}); |
| final String awsKmsKeyId = asMap.getOrDefault("awsKmsKeyId", null); |
| return new SSEAwsKeyManagementParams(awsKmsKeyId); |
| } |
| } |
| |
| @JsonAutoDetect( |
| fieldVisibility = Visibility.NONE, |
| getterVisibility = Visibility.NONE, |
| setterVisibility = Visibility.NONE) |
| interface ClientConfigurationMixin { |
| @JsonProperty |
| String getProxyHost(); |
| |
| @JsonProperty |
| Integer getProxyPort(); |
| |
| @JsonProperty |
| String getProxyUsername(); |
| |
| @JsonProperty |
| String getProxyPassword(); |
| } |
| } |