blob: f93213b57aea61852605191665f0191f309aba9f [file] [log] [blame]
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.stateless.config;
import com.fasterxml.jackson.databind.ObjectMapper;
import okhttp3.Call;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.Response;
import okhttp3.ResponseBody;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.registry.client.NiFiRegistryException;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedFlowSnapshotMetadata;
import org.apache.nifi.stateless.core.RegistryUtil;
import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
import org.apache.nifi.stateless.flow.DataflowDefinition;
import org.apache.nifi.stateless.flow.DataflowDefinitionParser;
import org.apache.nifi.stateless.flow.StandardDataflowDefinition;
import org.apache.nifi.stateless.flow.TransactionThresholds;
import org.apache.nifi.stateless.parameter.EnvironmentVariableParameterProvider;
import org.apache.nifi.stateless.parameter.ParameterOverrideProvider;
import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
 * {@link DataflowDefinitionParser} implementation that builds a dataflow definition from
 * {@code name=value} properties, supplied either as a properties file or as an already-parsed Map.
 * The properties identify where the flow snapshot comes from (a local file, a URL, inline contents,
 * or a NiFi Registry) and may additionally declare Parameter Contexts, Parameter Providers,
 * Reporting Tasks, failure port names and transaction thresholds.
 */
public class PropertiesFileFlowDefinitionParser implements DataflowDefinitionParser {
private static final Logger logger = LoggerFactory.getLogger(PropertiesFileFlowDefinitionParser.class);

// Shared JSON mapper used when parsing a flow snapshot downloaded from a URL.
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

// Splits a line at the first '=' that is not escaped with a backslash: group 1 is the property name, group 2 is the value.
private static final Pattern PROPERTY_LINE_PATTERN = Pattern.compile("(.*?)(?<!\\\\)=(.*)");

// parameter context pattern starts with "nifi.stateless.parameters." followed by the name of a parameter context.
// After the name of the parameter context, it may or may not have a ".<parameter name>" component, then an equals (=) and a value.
private static final Pattern PARAMETER_CONTEXT_PATTERN = Pattern.compile("\\Qnifi.stateless.parameters.\\E(.*?)(\\..*)?");

// Group 1 is the reporting task key, group 2 is the property name relative to that key.
private static final Pattern REPORTING_TASK_PATTERN = Pattern.compile("\\Qnifi.stateless.reporting.task.\\E(.*?)\\.(.*)");

private static final String PARAMETER_PROVIDER_PREFIX = "nifi.stateless.parameter.provider.";

// Group 1 is the parameter provider key, group 2 is the property name relative to that key.
private static final Pattern PARAMETER_PROVIDER_PATTERN = Pattern.compile("\\Q" + PARAMETER_PROVIDER_PREFIX + "\\E(.*?)\\.(.*)");

// Any property value of the form env{...} can be used to reference an environment variable. For example, env{ABC} references the ABC environment variable.
private static final Pattern ENV_VARIABLE_PATTERN = Pattern.compile("env\\{(.*)}");

// Property names/keys
private static final String PROPERTIES_PREFIX = "properties.";
private static final String FAILURE_PORTS_KEY = "nifi.stateless.failure.port.names";
private static final String REGISTRY_URL_KEY = "nifi.stateless.registry.url";
private static final String BUCKET_ID_KEY = "nifi.stateless.flow.bucketId";
// An empty key can never match a configured property, which made the registry lookup unreachable.
private static final String FLOW_ID_KEY = "nifi.stateless.flow.id";
private static final String FLOW_VERSION_KEY = "nifi.stateless.flow.version";
private static final String FLOW_SNAPSHOT_FILE_KEY = "nifi.stateless.flow.snapshot.file";
private static final String FLOW_SNAPSHOT_URL_KEY = "nifi.stateless.flow.snapshot.url";
private static final String FLOW_SNAPSHOT_CONTENTS_KEY = "nifi.stateless.flow.snapshot.contents";
private static final String FLOW_SNAPSHOT_URL_USE_SSLCONTEXT_KEY = "nifi.stateless.flow.snapshot.url.use.ssl.context";
// An empty key meant a configured flow name was always ignored in favor of the root group name.
private static final String FLOW_NAME = "nifi.stateless.flow.name";
private static final String TRANSACTION_THRESHOLD_FLOWFILES = "nifi.stateless.transaction.thresholds.flowfiles";
private static final String TRANSACTION_THRESHOLD_DATA_SIZE = "nifi.stateless.transaction.thresholds.bytes";
private static final String TRANSACTION_THRESHOLD_TIME = "nifi.stateless.transaction.thresholds.time";
public DataflowDefinition<VersionedFlowSnapshot> parseFlowDefinition(final File propertiesFile, final StatelessEngineConfiguration engineConfig, final List<ParameterOverride> parameterOverrides)
throws IOException, StatelessConfigurationException {
final Map<String, String> properties = readPropertyValues(propertiesFile);
return parseFlowDefinition(properties, engineConfig, parameterOverrides);
/**
 * Parses a dataflow definition from already-loaded properties. From the visible code, this collects
 * the failure port names, fetches the flow snapshot (using the SSL context of the engine configuration),
 * and reads the Parameter Contexts, Reporting Tasks, Parameter Providers and transaction thresholds.
 * The flow name is taken from the flow-name property and defaults to the name of the snapshot's root group.
 *
 * @param properties the flow definition properties, keyed by property name
 * @param engineConfig the engine configuration; its SSL context is used when fetching the flow
 * @param parameterOverrides parameter overrides used to build the default override provider
 * @return the parsed dataflow definition
 * @throws IOException if the flow cannot be read
 * @throws StatelessConfigurationException if the configuration is not valid
 */
public DataflowDefinition<VersionedFlowSnapshot> parseFlowDefinition(final Map<String, String> properties, final StatelessEngineConfiguration engineConfig,
final List<ParameterOverride> parameterOverrides) throws IOException, StatelessConfigurationException {
// A common problem is users accidentally including whitespace at the beginning or end of property values.
// We can't just blindly trim the white space because it may be relevant. For example, there may be a Parameter
// that has a value of a literal space ( ), such as a character for a delimiter, and we don't want to remove
// that and result in an empty string. So we will log a warning that it may be a problem.
// NOTE(review): no call to warnOnWhitespace(properties) is visible after this comment - presumably lost; confirm.
final Set<String> failurePortNames = getFailurePortNames(properties);
final VersionedFlowSnapshot flowSnapshot = fetchVersionedFlowSnapshot(properties, engineConfig.getSslContext());
final List<ParameterContextDefinition> parameterContextDefinitions = getParameterContexts(properties);
final List<ReportingTaskDefinition> reportingTaskDefinitions = getReportingTasks(properties);
final List<ParameterProviderDefinition> parameterProviderDefinitions = getParameterProviders(properties, parameterOverrides);
final TransactionThresholds transactionThresholds = getTransactionThresholds(properties);
// The flow name falls back to the root group's name when the flow-name property is not set.
final String rootGroupName = flowSnapshot.getFlowContents().getName();
final String flowName = properties.getOrDefault(FLOW_NAME, rootGroupName);
// NOTE(review): the builder call chain and the closing brace of this method are missing in this view;
// presumably the values gathered above are passed to the builder - confirm against the full file.
return new StandardDataflowDefinition.Builder()
/**
 * Collects the Reporting Task definitions declared in the given properties. Only property names matching
 * REPORTING_TASK_PATTERN are considered; definitions are grouped by reporting task key and returned in the
 * order in which each key was first encountered.
 *
 * @param properties the flow definition properties
 * @return the Reporting Task definitions, one per distinct reporting task key
 */
private List<ReportingTaskDefinition> getReportingTasks(final Map<String, String> properties) {
// LinkedHashMap keeps the definitions in first-seen order of their keys.
final Map<String, ReportingTaskDefinition> reportingTaskDefinitions = new LinkedHashMap<>();
for (final String propertyName : properties.keySet()) {
final Matcher matcher = REPORTING_TASK_PATTERN.matcher(propertyName);
// NOTE(review): the body of this guard is missing in this view; presumably it skips (continue) non-matching names - confirm.
if (!matcher.matches()) {
// For a property name like:
// nifi.stateless.reporting.task.abc.name
// We consider 'abc' the <reporting task key> and 'name' the <relative property name>
// NOTE(review): right-hand side missing in this view; presumably capture group 1 of the matcher - confirm.
final String reportingTaskKey =;
final ReportingTaskDefinition definition = reportingTaskDefinitions.computeIfAbsent(reportingTaskKey, key -> new ReportingTaskDefinition());
// NOTE(review): right-hand side missing in this view; presumably capture group 2 of the matcher - confirm.
final String relativePropertyName =;
final String propertyValue = properties.get(propertyName);
if (relativePropertyName.startsWith("properties.")) {
// "properties." is 11 characters long, so a length below 12 means no property name follows the prefix.
if (relativePropertyName.length() < 12) {
logger.warn("Encountered unexpected property <" + propertyName + "> in flow definition. This property will be ignored.");
// Strip the 11-character "properties." prefix to obtain the Reporting Task's own property name.
final String reportingTaskPropertyName = relativePropertyName.substring(11);
definition.getPropertyValues().put(reportingTaskPropertyName, propertyValue);
} else {
// NOTE(review): the case bodies (and any default label before the warning) are missing in this view;
// presumably each case stores the value on the definition - confirm against the full file.
switch (relativePropertyName) {
case "name":
case "type":
case "frequency":
case "bundle":
logger.warn("Encountered unexpected property <" + propertyName + "> in flow definition. This property will be ignored.");
return new ArrayList<>(reportingTaskDefinitions.values());
/**
 * Collects the Parameter Provider definitions. Two default providers (a parameter override provider and an
 * environment variable provider) are always registered first; further providers are read from property names
 * matching PARAMETER_PROVIDER_PATTERN and grouped by provider key. Afterwards every definition is validated.
 *
 * @param properties the flow definition properties
 * @param parameterOverrides the overrides used to build the default override provider
 * @return the Parameter Provider definitions in registration order
 * @throws IllegalArgumentException if any provider has no type
 */
private List<ParameterProviderDefinition> getParameterProviders(final Map<String, String> properties, final List<ParameterOverride> parameterOverrides) {
final Map<String, ParameterProviderDefinition> parameterProviderDefinitions = new LinkedHashMap<>();
// The built-in providers are registered before any user-configured ones.
parameterProviderDefinitions.put("Default Parameter Override Provider", createParameterOverrideProvider(parameterOverrides));
parameterProviderDefinitions.put("Default Environment Variable Provider", createEnvironmentVariableProvider());
for (final String propertyName : properties.keySet()) {
final Matcher matcher = PARAMETER_PROVIDER_PATTERN.matcher(propertyName);
// NOTE(review): the body of this guard is missing in this view; presumably it skips (continue) non-matching names - confirm.
if (!matcher.matches()) {
// For a property name like:
// nifi.stateless.parameter.provider.abc.name
// We consider 'abc' the <parameter provider key> and 'name' the <relative property name>
// NOTE(review): right-hand side missing in this view; presumably capture group 1 of the matcher - confirm.
final String parameterProviderKey =;
final ParameterProviderDefinition definition = parameterProviderDefinitions.computeIfAbsent(parameterProviderKey, key -> new ParameterProviderDefinition());
// NOTE(review): right-hand side missing in this view; presumably capture group 2 of the matcher - confirm.
final String relativePropertyName =;
final String propertyValue = properties.get(propertyName);
if (relativePropertyName.startsWith(PROPERTIES_PREFIX)) {
// Nothing follows the "properties." prefix, so there is no provider property name to set.
if (relativePropertyName.length() <= PROPERTIES_PREFIX.length()) {
logger.warn("Encountered unexpected property <" + propertyName + "> in flow definition. This property will be ignored.");
final String providerPropertyName = relativePropertyName.substring(PROPERTIES_PREFIX.length());
definition.getPropertyValues().put(providerPropertyName, propertyValue);
} else {
// NOTE(review): the case bodies (and any default label before the warning) are missing in this view;
// presumably each case stores the value on the definition - confirm against the full file.
switch (relativePropertyName) {
case "name":
case "type":
case "bundle":
logger.warn("Encountered unexpected property <" + propertyName + "> in flow definition. This property will be ignored.");
// Validate that all providers have the required necessary information
for (final Map.Entry<String, ParameterProviderDefinition> entry : parameterProviderDefinitions.entrySet()) {
final String providerKey = entry.getKey();
final ParameterProviderDefinition definition = entry.getValue();
if (definition.getName() == null) {
// NOTE(review): the statement that applies the default name announced by this message is not visible here - confirm.
logger.warn("Parameter Provider identified in Properties with key <" + providerKey + "> was not provided a name. Will default name to <" + providerKey + ">");
// A provider without a type cannot be used, so this is treated as a configuration error.
if (definition.getType() == null) {
throw new IllegalArgumentException("Parameter Provider <" + definition.getName() + "> does not have a Type set. This must be set by adding a property named " +
PARAMETER_PROVIDER_PREFIX + providerKey + ".type");
return new ArrayList<>(parameterProviderDefinitions.values());
private ParameterProviderDefinition createEnvironmentVariableProvider() {
final ParameterProviderDefinition overrideProvider = new ParameterProviderDefinition();
overrideProvider.setName("Environment Variable Parameter Provider");
return overrideProvider;
private ParameterProviderDefinition createParameterOverrideProvider(final List<ParameterOverride> parameterOverrides) {
final ParameterProviderDefinition overrideProvider = new ParameterProviderDefinition();
overrideProvider.setName("Parameter Override Provider");
final Map<String, String> propertyValues = new LinkedHashMap<>();
for (final ParameterOverride override : parameterOverrides) {
final String contextName = override.getContextName();
final String parameterName = override.getParameterName();
final String propertyName = contextName == null ? parameterName : contextName + ":" + parameterName;
propertyValues.put(propertyName, override.getParameterValue());
return overrideProvider;
/**
 * Collects the Parameter Context definitions declared in the given properties. Only property names matching
 * PARAMETER_CONTEXT_PATTERN are considered; definitions are grouped by parameter context key and returned in
 * the order in which each key was first encountered.
 *
 * @param properties the flow definition properties
 * @return the Parameter Context definitions, one per distinct parameter context key
 */
private List<ParameterContextDefinition> getParameterContexts(final Map<String, String> properties) {
final Map<String, ParameterContextDefinition> contextDefinitions = new LinkedHashMap<>();
for (final String propertyName : properties.keySet()) {
final Matcher matcher = PARAMETER_CONTEXT_PATTERN.matcher(propertyName);
// NOTE(review): the body of this guard is missing in this view; presumably it skips (continue) non-matching names - confirm.
if (!matcher.matches()) {
// NOTE(review): right-hand side missing in this view; presumably capture group 1 (the context key) - confirm.
final String parameterContextKey =;
final ParameterContextDefinition paramContext = contextDefinitions.computeIfAbsent(parameterContextKey, key -> new ParameterContextDefinition());
// NOTE(review): right-hand side missing in this view; presumably capture group 2, which is optional in the
// pattern and therefore null when the property name has no ".<parameter name>" part - confirm.
String parameterName =;
final String value = properties.get(propertyName);
if (parameterName == null) {
// If no parameter name is given, then the value is the name of the Parameter context itself.
// NOTE(review): the statement that stores the context name, and the branch structure that follows, are
// missing in this view - confirm against the full file.
// Ensure that the name is set
if (paramContext.getName() == null) {
// If properties file contains a parameter with no name, just ignore it. However, the parameter name currently has
// a . at the beginning because capturing group 2 captures .<parameter name> so remove the .
if (parameterName.equals(".")) {
} else {
parameterName = parameterName.substring(1);
// The context's parameter list is created on first use.
List<ParameterDefinition> parameterDefinitions = paramContext.getParameters();
if (parameterDefinitions == null) {
parameterDefinitions = new ArrayList<>();
// NOTE(review): the statements that populate this parameter and add it to the context are missing in this view - confirm.
final ParameterDefinition parameter = new ParameterDefinition();
return new ArrayList<>(contextDefinitions.values());
private TransactionThresholds getTransactionThresholds(final Map<String, String> properties) {
final Long flowfileThreshold = getLongProperty(properties, TRANSACTION_THRESHOLD_FLOWFILES);
final Double dataSizeThreshold = getDataSizeProperty(properties, TRANSACTION_THRESHOLD_DATA_SIZE, DataUnit.B);
final Double timeThreshold = getTimePeriodProperty(properties, TRANSACTION_THRESHOLD_TIME, TimeUnit.NANOSECONDS);
final OptionalLong maxFlowFiles = flowfileThreshold == null ? OptionalLong.empty() : OptionalLong.of(flowfileThreshold);
final OptionalLong maxBytes = dataSizeThreshold == null ? OptionalLong.empty() : OptionalLong.of(dataSizeThreshold.longValue());
final OptionalLong maxNanos = timeThreshold == null ? OptionalLong.empty() : OptionalLong.of(timeThreshold.longValue());
return new TransactionThresholds() {
public OptionalLong getMaxFlowFiles() {
return maxFlowFiles;
public OptionalLong getMaxContentSize(final DataUnit dataUnit) {
return maxBytes.isPresent() ? OptionalLong.of((long) dataUnit.convert(maxBytes.getAsLong(), DataUnit.B)) : OptionalLong.empty();
public OptionalLong getMaxTime(final TimeUnit timeUnit) {
return maxNanos.isPresent() ? OptionalLong.of(timeUnit.convert(maxNanos.getAsLong(), TimeUnit.NANOSECONDS)) : OptionalLong.empty();
private String getTrimmedProperty(final Map<String, String> properties, final String propertyName) {
final String propertyValue = properties.get(propertyName);
return (propertyValue == null || propertyValue.trim().isEmpty()) ? null : propertyValue.trim();
private Long getLongProperty(final Map<String, String> properties, final String propertyName) {
final String propertyValue = getTrimmedProperty(properties, propertyName);
try {
return propertyValue == null ? null : Long.parseLong(propertyValue);
} catch (final NumberFormatException nfe) {
throw new IllegalArgumentException("Configured property <" + propertyName + "> has a value that is not a valid 64-bit integer");
private Double getDataSizeProperty(final Map<String, String> properties, final String propertyName, final DataUnit dataUnit) {
final String propertyValue = getTrimmedProperty(properties, propertyName);
try {
return propertyValue == null ? null : DataUnit.parseDataSize(propertyValue, dataUnit);
} catch (final Exception e) {
throw new IllegalArgumentException("Configured property <" + propertyName + "> has a value that is not a valid data size");
private Double getTimePeriodProperty(final Map<String, String> properties, final String propertyName, final TimeUnit timeUnit) {
final String propertyValue = getTrimmedProperty(properties, propertyName);
try {
return propertyValue == null ? null : FormatUtils.getPreciseTimeDuration(propertyValue, timeUnit);
} catch (final Exception e) {
throw new IllegalArgumentException("Configured property <" + propertyName + "> has a value that is not a valid time period");
private Set<String> getFailurePortNames(final Map<String, String> properties) {
final Set<String> failurePortNames = new HashSet<>();
for (final String portName : properties.getOrDefault(FAILURE_PORTS_KEY, "").split(",")) {
return failurePortNames;
private void warnOnWhitespace(final Map<String, String> properties) {
properties.forEach((key, value) -> {
if (value == null) {
if (!key.trim().equals(key)) {
logger.warn("Found property with name <{}>. This property name contains leading or trailing white space, which may not be intended.", key);
if (!value.trim().equals(value) && !value.trim().isEmpty()) {
// If value consists only of white space, don't worry about it. But if the value consists of non-white-space characters but has leading or trailing white space
// it may be cause for concern.
logger.warn("Found property with name <{}> and value <{}>. This property value contains leading or trailing white space, which may not be intended.", key, value);
* Read the given properties file into a Map&lt;String, String&gt;. We are parsing this ourselves, rather than using Properties.load()
* because Properties.load() does not allow for property names to contain spaces unless they are espcaed using a backslash (\).
* This makes sense for many use cases, but for our use case, the a very common use case will be for the properties file to contain Parameters
* whose names have spaces. Expecting users to manually escape that will be very frustrating and error-prone.
* Another option would be to avoid using Properties and instead use JSON, YAML, or the like. However, these have their downsides, as well.
* JSON is easy to read but can be frustrating to write correctly when writing it manually. YAML will likely be easier, but it is less well known
* than either properties or JSON.
* The final consideration is that we want to remain consistent with the configuration format. We don't want to take in a properties file for the
* engine configuration and a JSON/YAML/XML/etc. configuration for the flow configuration. And since NiFi is already based on properties file and
* that seems to be the easiest to manually read/write, that's what we are going with here.
* @param propertiesFile the properties file to parse
* @return a Map of Property Name to Property Values.
* @throws IOException if unable to read from the file.
private Map<String, String> readPropertyValues(final File propertiesFile) throws IOException {
final Map<String, String> properties = new LinkedHashMap<>();
try (final InputStream in = new FileInputStream(propertiesFile);
final Reader inReader = new InputStreamReader(in);
final BufferedReader reader = new BufferedReader(inReader)) {
String line;
while ((line = reader.readLine()) != null) {
final String trimmed = line.trim();
if (trimmed.startsWith("#") || trimmed.startsWith("!")) {
// Line is a comment
if (trimmed.isEmpty()) {
// Empty line.
final Matcher matcher = PROPERTY_LINE_PATTERN.matcher(line);
if (!matcher.matches()) {
// Invalid line.
logger.warn("Encountered line in properties file {} that is not a comment, not blank, and does not adhere to the format of <propertyName>=<propertyValue>: {}",
propertiesFile.getAbsolutePath(), line);
final String propertyName =;
final String propertyValue = substituteEnvironmentVariables(;
properties.put(propertyName, propertyValue);
return properties;
private String substituteEnvironmentVariables(final String propertyValue) {
final Matcher matcher = ENV_VARIABLE_PATTERN.matcher(propertyValue);
if (!matcher.matches()) {
return propertyValue;
final String envVariable =;
final String envValue = System.getenv(envVariable);
return envValue == null ? "" : envValue;
private VersionedFlowSnapshot fetchVersionedFlowSnapshot(final Map<String, String> properties, final SslContextDefinition sslContextDefinition)
throws IOException, StatelessConfigurationException {
final String flowSnapshotFilename = properties.get(FLOW_SNAPSHOT_FILE_KEY);
if (flowSnapshotFilename != null && !flowSnapshotFilename.trim().isEmpty()) {
final File flowSnapshotFile = new File(flowSnapshotFilename.trim());
try {
return readVersionedFlowSnapshot(flowSnapshotFile);
} catch (final Exception e) {
throw new IOException("Configuration indicates that the flow to run is located at " + flowSnapshotFilename
+ " but failed to load dataflow from that location", e);
final String flowSnapshotUrl = properties.get(FLOW_SNAPSHOT_URL_KEY);
if (flowSnapshotUrl != null && !flowSnapshotUrl.trim().isEmpty()) {
final String useSslPropertyValue = properties.get(FLOW_SNAPSHOT_URL_USE_SSLCONTEXT_KEY);
final boolean useSsl = Boolean.parseBoolean(useSslPropertyValue);
try {
return fetchFlowFromUrl(flowSnapshotUrl, useSsl ? sslContextDefinition : null);
} catch (final Exception e) {
throw new StatelessConfigurationException("Could not fetch flow from URL", e);
final String flowContents = properties.get(FLOW_SNAPSHOT_CONTENTS_KEY);
if (flowContents != null && !flowContents.trim().isEmpty()) {
final byte[] flowContentsBytes = flowContents.getBytes(StandardCharsets.UTF_8);
try (final InputStream in = new ByteArrayInputStream(flowContentsBytes)) {
return readVersionedFlowSnapshot(in);
} catch (final Exception e) {
throw new IOException("Configuration includes escaped JSON contents but failed to parse the dataflow", e);
// Try downloading flow from registry
final String registryUrl = properties.get(REGISTRY_URL_KEY);
final String bucketId = properties.get(BUCKET_ID_KEY);
final String flowId = properties.get(FLOW_ID_KEY);
final String flowVersionValue = properties.get(FLOW_VERSION_KEY);
final Integer flowVersion;
try {
flowVersion = isEmpty(flowVersionValue) ? null : Integer.parseInt(flowVersionValue);
} catch (final NumberFormatException nfe) {
throw new StatelessConfigurationException("The " + FLOW_VERSION_KEY + " property was expected to contain a number but had a value of " + flowVersionValue);
if (isEmpty(registryUrl) || isEmpty(bucketId) || isEmpty(flowId)) {
throw new IllegalArgumentException("Configuration does not provide the filename of the flow to run; a URL to fetch it from; the dataflow contents; " +
"or the registryUrl, bucketId, and flowId.");
try {
final SSLContext sslContext = SslConfigurationUtil.createSslContext(sslContextDefinition);
return fetchFlowFromRegistry(registryUrl, bucketId, flowId, flowVersion, sslContext);
} catch (final NiFiRegistryException e) {
throw new StatelessConfigurationException("Could not fetch flow from Registry", e);
private boolean isEmpty(final String value) {
return value == null || value.trim().isEmpty();
private VersionedFlowSnapshot fetchFlowFromUrl(final String url, final SslContextDefinition sslContextDefinition) throws IOException {
final OkHttpClient.Builder clientBuilder = new OkHttpClient.Builder()
.callTimeout(30, TimeUnit.SECONDS);
if (sslContextDefinition != null) {
final TlsConfiguration tlsConfiguration = SslConfigurationUtil.createTlsConfiguration(sslContextDefinition);
try {
final X509TrustManager trustManager = SslContextFactory.getX509TrustManager(tlsConfiguration);
final SSLContext sslContext = SslContextFactory.createSslContext(tlsConfiguration);
clientBuilder.sslSocketFactory(sslContext.getSocketFactory(), trustManager);
} catch (final TlsException e) {
throw new IllegalArgumentException("TLS Configuration Failed: Check SSL Context Properties", e);
final OkHttpClient client =;
final Request getRequest = new Request.Builder()
final Call call = client.newCall(getRequest);
try (final Response response = call.execute()) {
final ResponseBody responseBody = response.body();
if (!response.isSuccessful()) {
final String responseText = responseBody == null ? "<No Message Received from Server>" : responseBody.string();
throw new IOException("Failed to download flow from URL " + url + ": Response was " + response.code() + ": " + responseText);
if (responseBody == null) {
throw new IOException("Failed to download flow from URL " + url + ": Received successful response code " + response.code() + " but no Response body");
try {
final VersionedFlowSnapshot snapshot = OBJECT_MAPPER.readValue(responseBody.bytes(), VersionedFlowSnapshot.class);
return snapshot;
} catch (final Exception e) {
throw new IOException("Downloaded flow from " + url + " but failed to parse the contents as a Versioned Flow. Please verify that the correct URL was provided.", e);
private VersionedFlowSnapshot fetchFlowFromRegistry(final String registryUrl, final String bucketId, final String flowId, final Integer flowVersion,
final SSLContext sslContext) throws IOException, NiFiRegistryException {"Fetching flow from NiFi Registry at {}", registryUrl);
final long start = System.currentTimeMillis();
final RegistryUtil registryUtil = new RegistryUtil(registryUrl, sslContext);
final VersionedFlowSnapshot snapshot = registryUtil.getFlowByID(bucketId, flowId, flowVersion == null ? -1 : flowVersion);
final long millis = System.currentTimeMillis() - start;"Successfully fetched flow from NiFi Registry in {} millis", millis);
return snapshot;
private VersionedFlowSnapshot readVersionedFlowSnapshot(final File snapshotFile) throws IOException {
try (final InputStream fis = new FileInputStream(snapshotFile)) {
return readVersionedFlowSnapshot(fis);
/**
 * Parses a flow snapshot from the given stream as JSON. The stream is not closed by this method.
 *
 * @param in the stream to read the flow snapshot from
 * @return the parsed flow snapshot
 * @throws IOException if the stream cannot be read or its contents cannot be parsed
 */
private VersionedFlowSnapshot readVersionedFlowSnapshot(final InputStream in) throws IOException {
// NOTE(review): a fresh, default-configured ObjectMapper is created on every call although the class also holds
// a shared OBJECT_MAPPER; any mapper configuration that may have followed is not visible in this view - confirm.
final ObjectMapper objectMapper = new ObjectMapper();
final VersionedFlowSnapshot snapshot = objectMapper.readValue(in, VersionedFlowSnapshot.class);
// NOTE(review): this metadata object is never populated or attached to the snapshot in the visible code;
// presumably the statements doing so were lost - confirm against the full file.
final VersionedFlowSnapshotMetadata metadata = new VersionedFlowSnapshotMetadata();
return snapshot;