blob: 44da97832eec4a704c5149c1a2f12422d11ea877 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
import com.amazonaws.AmazonWebServiceClient;
import com.amazonaws.ClientConfiguration;
import com.amazonaws.Protocol;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AnonymousAWSCredentials;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.PropertiesCredentials;
import com.amazonaws.http.conn.ssl.SdkTLSSocketFactory;
import com.amazonaws.regions.Region;
import com.amazonaws.regions.Regions;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.conn.ssl.DefaultHostnameVerifier;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.AbstractSessionFactoryProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.proxy.ProxyConfiguration;
import org.apache.nifi.proxy.ProxySpec;
import org.apache.nifi.ssl.SSLContextService;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
* Abstract base class for aws processors. This class uses aws credentials for creating aws clients
* @deprecated use {@link AbstractAWSCredentialsProviderProcessor} instead, which uses credentials providers for creating AWS clients
* @see AbstractAWSCredentialsProviderProcessor
public abstract class AbstractAWSProcessor<ClientType extends AmazonWebServiceClient> extends AbstractSessionFactoryProcessor {
// Relationship named "success".
public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success")
.description("FlowFiles are routed to success relationship").build();
// Relationship named "failure".
public static final Relationship REL_FAILURE = new Relationship.Builder().name("failure")
.description("FlowFiles are routed to failure relationship").build();
// Unmodifiable set containing exactly REL_SUCCESS and REL_FAILURE; getRelationships() in this class returns
// this same instance.
public static final Set<Relationship> relationships = Collections.unmodifiableSet(
new HashSet<>(Arrays.asList(REL_SUCCESS, REL_FAILURE)));
// Credential-related descriptors are aliases of the ones declared in CredentialPropertyDescriptors.
// customValidate(...) rejects setting only one of ACCESS_KEY/SECRET_KEY, and rejects combining either of them
// with CREDENTIALS_FILE; getCredentials(...) reads all three.
public static final PropertyDescriptor CREDENTIALS_FILE = CredentialPropertyDescriptors.CREDENTIALS_FILE;
public static final PropertyDescriptor ACCESS_KEY = CredentialPropertyDescriptors.ACCESS_KEY;
public static final PropertyDescriptor SECRET_KEY = CredentialPropertyDescriptors.SECRET_KEY;
// NOTE(review): the builder chains below appear truncated in this excerpt (no validators, requirement flags or
// build() calls are visible); the comments describe only how each descriptor is used elsewhere in this class.

// Component-level proxy host. customValidate(...) requires it to be set together with PROXY_HOST_PORT.
public static final PropertyDescriptor PROXY_HOST = new PropertyDescriptor.Builder()
.name("Proxy Host")
.description("Proxy host name or IP")
// Component-level proxy port; read as an Integer in createConfiguration(...).
public static final PropertyDescriptor PROXY_HOST_PORT = new PropertyDescriptor.Builder()
.name("Proxy Host Port")
.description("Proxy host port")
// Proxy user name. customValidate(...) requires PROXY_PASSWORD and PROXY_HOST to be set whenever this is set.
public static final PropertyDescriptor PROXY_USERNAME = new PropertyDescriptor.Builder()
.displayName("Proxy Username")
.description("Proxy username")
// Proxy password. customValidate(...) requires it to be set together with PROXY_USERNAME.
public static final PropertyDescriptor PROXY_PASSWORD = new PropertyDescriptor.Builder()
.displayName("Proxy Password")
.description("Proxy password")
// AWS region selection; initializeRegionAndEndpoint(...) resolves its value through Regions.fromName(...).
public static final PropertyDescriptor REGION = new PropertyDescriptor.Builder()
// Communications timeout; createConfiguration(...) reads it as a time period in milliseconds.
public static final PropertyDescriptor TIMEOUT = new PropertyDescriptor.Builder()
.name("Communications Timeout")
.defaultValue("30 secs")
// Optional SSL context service; createConfiguration(...) only consults it when the concrete processor lists
// this descriptor among its supported property descriptors.
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("SSL Context Service")
.description("Specifies an optional SSL Context Service that, if provided, will be used to create connections")
// Optional endpoint override; initializeRegionAndEndpoint(...) trims the evaluated value and ignores it when empty.
public static final PropertyDescriptor ENDPOINT_OVERRIDE = new PropertyDescriptor.Builder()
.name("Endpoint Override URL")
.description("Endpoint URL to use instead of the AWS default including scheme, host, port, and path. " +
"The AWS libraries select an endpoint URL based on the AWS region, but this property overrides " +
"the selected endpoint URL, allowing use with other S3-compatible endpoints.")
// AWS client instance; assigned in onScheduled(...) and exposed through getClient().
protected volatile ClientType client;
// Region resolved from the REGION property by initializeRegionAndEndpoint(...); that method sets it to null
// when REGION is not among the supported property descriptors.
protected volatile Region region;
/**
 * Host-name suffix that identifies an AWS PrivateLink (interface VPC endpoint) URL, e.g.
 * {@code https://vpce-***-***.sqs.{region}.vpce.amazonaws.com}.
 * <p>
 * The suffix must not be empty: every string "ends with" the empty string, which would send every endpoint
 * override through the VPCE branch and make the non-VPCE branch unreachable.
 */
private static final String VPCE_ENDPOINT_SUFFIX = ".vpce.amazonaws.com";
/**
 * Applied to an endpoint URL after {@link #VPCE_ENDPOINT_SUFFIX} has been removed. Group 1 is the last
 * dot-separated label of the remainder, which for a VPCE host name is the region.
 * <p>
 * The previous pattern contained {@code [vpce-]}, a character class that matches a single character rather than
 * the literal text "vpce-". It therefore only matched when the label just before the region happened to contain
 * one of those characters at the right position (e.g. "ec2") and failed for labels such as "sqs". This pattern
 * matches every input the previous one matched, with the same group 1, and also the inputs it wrongly rejected.
 */
private static final Pattern VPCE_ENDPOINT_PATTERN = Pattern.compile("^(?:.+\\.)?([a-z0-9-]+)$");
// If protocol is changed to be a property, ensure other uses are also changed
protected static final Protocol DEFAULT_PROTOCOL = Protocol.HTTPS;
// NOTE(review): no use of this constant is visible in this excerpt; presumably the client user agent — confirm.
protected static final String DEFAULT_USER_AGENT = "NiFi";
// Proxy specs accepted by this processor; the same array is used to build PROXY_CONFIGURATION_SERVICE and is
// passed to ProxyConfiguration.validateProxySpec(...) in customValidate(...).
private static final ProxySpec[] PROXY_SPECS = {ProxySpec.HTTP_AUTH};
public static final PropertyDescriptor PROXY_CONFIGURATION_SERVICE = ProxyConfiguration.createProxyConfigPropertyDescriptor(true, PROXY_SPECS);
public static AllowableValue createAllowableValue(final Regions region) {
return new AllowableValue(region.getName(), region.getDescription(), "AWS Region Code : " + region.getName());
// Returns the collected AllowableValue entries as an array, iterating over every constant of the Regions enum.
// NOTE(review): the body of the loop is not visible in this excerpt; presumably it adds
// createAllowableValue(region) to the list — confirm against the full file.
public static AllowableValue[] getAvailableRegions() {
final List<AllowableValue> values = new ArrayList<>();
for (final Regions region : Regions.values()) {
return values.toArray(new AllowableValue[0]);
// Returns the shared, unmodifiable `relationships` set (success and failure).
public Set<Relationship> getRelationships() {
return relationships;
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
final List<ValidationResult> problems = new ArrayList<>(super.customValidate(validationContext));
final boolean accessKeySet = validationContext.getProperty(ACCESS_KEY).isSet();
final boolean secretKeySet = validationContext.getProperty(SECRET_KEY).isSet();
if ((accessKeySet && !secretKeySet) || (secretKeySet && !accessKeySet)) {
problems.add(new ValidationResult.Builder().input("Access Key").valid(false).explanation("If setting Secret Key or Access Key, must set both").build());
final boolean credentialsFileSet = validationContext.getProperty(CREDENTIALS_FILE).isSet();
if ((secretKeySet || accessKeySet) && credentialsFileSet) {
problems.add(new ValidationResult.Builder().input("Access Key").valid(false).explanation("Cannot set both Credentials File and Secret Key/Access Key").build());
final boolean proxyHostSet = validationContext.getProperty(PROXY_HOST).isSet();
final boolean proxyPortSet = validationContext.getProperty(PROXY_HOST_PORT).isSet();
if ((proxyHostSet && !proxyPortSet) || (!proxyHostSet && proxyPortSet)) {
problems.add(new ValidationResult.Builder().subject("Proxy Host and Port").valid(false).explanation("If Proxy Host or Proxy Port is set, both must be set").build());
final boolean proxyUserSet = validationContext.getProperty(PROXY_USERNAME).isSet();
final boolean proxyPwdSet = validationContext.getProperty(PROXY_PASSWORD).isSet();
if ((proxyUserSet && !proxyPwdSet) || (!proxyUserSet && proxyPwdSet)) {
problems.add(new ValidationResult.Builder().subject("Proxy User and Password").valid(false).explanation("If Proxy Username or Proxy Password is set, both must be set").build());
if (proxyUserSet && !proxyHostSet) {
problems.add(new ValidationResult.Builder().subject("Proxy").valid(false).explanation("If Proxy username is set, proxy host must be set").build());
ProxyConfiguration.validateProxySpec(validationContext, problems, PROXY_SPECS);
return problems;
// Creates the AWS ClientConfiguration for this processor from the given context and returns it.
// NOTE(review): this excerpt appears to be missing statements. No use of commsTimeout, sdkTLSSocketFactory or
// the proxy* locals is visible, and the two trailing `if` statements have no visible body. Confirm against the
// full file before relying on the comments below.
protected ClientConfiguration createConfiguration(final ProcessContext context) {
final ClientConfiguration config = new ClientConfiguration();
// If this is changed to be a property, ensure other uses are also changed
// Communications Timeout property converted to whole milliseconds.
final int commsTimeout = context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).intValue();
// Only look up the SSL Context Service when the concrete processor actually exposes that property.
if(this.getSupportedPropertyDescriptors().contains(SSL_CONTEXT_SERVICE)) {
final SSLContextService sslContextService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
if (sslContextService != null) {
final SSLContext sslContext = sslContextService.createContext();
// NIFI-3788: Changed hostnameVerifier from null to DHV (BrowserCompatibleHostnameVerifier is deprecated)
SdkTLSSocketFactory sdkTLSSocketFactory = new SdkTLSSocketFactory(sslContext, new DefaultHostnameVerifier());
// The supplier below describes the component-level proxy properties: a new ProxyConfiguration when Proxy Host
// is set, otherwise DIRECT_CONFIGURATION.
// NOTE(review): presumably getConfiguration(...) only invokes it when no proxy configuration service
// applies — confirm.
final ProxyConfiguration proxyConfig = ProxyConfiguration.getConfiguration(context, () -> {
if (context.getProperty(PROXY_HOST).isSet()) {
final ProxyConfiguration componentProxyConfig = new ProxyConfiguration();
String proxyHost = context.getProperty(PROXY_HOST).evaluateAttributeExpressions().getValue();
Integer proxyPort = context.getProperty(PROXY_HOST_PORT).evaluateAttributeExpressions().asInteger();
String proxyUsername = context.getProperty(PROXY_USERNAME).evaluateAttributeExpressions().getValue();
String proxyPassword = context.getProperty(PROXY_PASSWORD).evaluateAttributeExpressions().getValue();
return componentProxyConfig;
return ProxyConfiguration.DIRECT_CONFIGURATION;
// Proxy handling is limited to HTTP proxies; the credential check is nested inside that case.
if (Proxy.Type.HTTP.equals(proxyConfig.getProxyType())) {
if (proxyConfig.hasCredential()) {
return config;
public void onScheduled(final ProcessContext context) {
this.client = createClient(context, getCredentials(context), createConfiguration(context));
* Allow optional override of onTrigger with the ProcessSessionFactory where required for AWS processors (e.g. ConsumeKinesisStream)
* @see AbstractProcessor
public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
final ProcessSession session = sessionFactory.createSession();
try {
onTrigger(context, session);
} catch (final Throwable t) {
throw t;
* Default to requiring the "standard" onTrigger with a single ProcessSession
// Per-trigger work of the concrete processor. The ProcessSessionFactory overload of onTrigger in this class
// calls this method with a session it created from the factory.
public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException;
// Resolves the `region` field from the REGION property and, when an endpoint override is configured,
// applies it to the client.
// NOTE(review): this excerpt appears to be missing statements. The `if (client != null) {` after the region
// lookup and the final `else` branch have no visible body. Confirm against the full file.
protected void initializeRegionAndEndpoint(ProcessContext context) {
// if the processor supports REGION, get the configured region.
if (getSupportedPropertyDescriptors().contains(REGION)) {
// Local `region` (the configured name) shadows the field of the same name inside this branch.
final String region = context.getProperty(REGION).getValue();
if (region != null) {
this.region = Region.getRegion(Regions.fromName(region));
if (client != null) {
} else {
this.region = null;
// if the endpoint override has been configured, set the endpoint.
// (per Amazon docs this should only be configured at client creation)
if (client != null && getSupportedPropertyDescriptors().contains(ENDPOINT_OVERRIDE)) {
// Null or blank values collapse to "" and are skipped by the isEmpty() check below.
final String urlstr = StringUtils.trimToEmpty(context.getProperty(ENDPOINT_OVERRIDE).evaluateAttributeExpressions().getValue());
if (!urlstr.isEmpty()) {
getLogger().info("Overriding endpoint with {}", urlstr);
if (urlstr.endsWith(VPCE_ENDPOINT_SUFFIX)) {
// handling vpce endpoints
// falling back to the configured region if the parse fails
// e.g. in case of https://vpce-***-***.sqs.{region}
// NOTE(review): this.region is dereferenced unconditionally here, yet the code above can leave it null
// (REGION not supported); looks like a possible NullPointerException — confirm.
String region = parseRegionForVPCE(urlstr, this.region.getName());
this.client.setEndpoint(urlstr, this.client.getServiceName(), region);
} else {
// handling non-vpce custom endpoints where the AWS library can parse the region out
// e.g. https://sqs.{region}.***.***.***.gov
Note to developer(s):
When setting an endpoint for an AWS Client i.e. client.setEndpoint(endpointUrl),
AWS Java SDK fails to parse the region correctly when the provided endpoint
is an AWS PrivateLink so this method does the job of parsing the region name and
returning it.
Refer NIFI-5456, NIFI-5893 & NIFI-8662
private String parseRegionForVPCE(String url, String configuredRegion) {
int index = url.length() - VPCE_ENDPOINT_SUFFIX.length();
Matcher matcher = VPCE_ENDPOINT_PATTERN.matcher(url.substring(0, index));
if (matcher.matches()) {
} else {
getLogger().info("Unable to get a match with the VPCE endpoint pattern; using the configured region: " + configuredRegion);
return configuredRegion;
* Create client from the arguments
* @param context process context
* @param credentials static aws credentials
* @param config aws client configuration
* @return ClientType aws client
* @deprecated use {@link AbstractAWSCredentialsProviderProcessor#createClient(ProcessContext, AWSCredentialsProvider, ClientConfiguration)}
// Factory hook implemented by concrete processors; onScheduled(...) calls it with the results of
// getCredentials(...) and createConfiguration(...) and stores the returned client.
protected abstract ClientType createClient(final ProcessContext context, final AWSCredentials credentials, final ClientConfiguration config);
// Returns the current value of the `client` field (null until a client has been assigned).
protected ClientType getClient() {
return client;
// Returns the current value of the `region` field; initializeRegionAndEndpoint(...) may set it to null.
protected Region getRegion() {
return region;
protected AWSCredentials getCredentials(final ProcessContext context) {
final String accessKey = context.getProperty(ACCESS_KEY).evaluateAttributeExpressions().getValue();
final String secretKey = context.getProperty(SECRET_KEY).evaluateAttributeExpressions().getValue();
final String credentialsFile = context.getProperty(CREDENTIALS_FILE).getValue();
if (credentialsFile != null) {
try {
return new PropertiesCredentials(new File(credentialsFile));
} catch (final IOException ioe) {
throw new ProcessException("Could not read Credentials File", ioe);
if (accessKey != null && secretKey != null) {
return new BasicAWSCredentials(accessKey, secretKey);
return new AnonymousAWSCredentials();
public void onShutdown() {
if ( getClient() != null ) {