blob: 2dc76eb42ae2d7c9b9470727f037d9f56c05e70a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.kafka.connect;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.nifi.kafka.connect.validators.ConnectDirectoryExistsValidator;
import org.apache.nifi.kafka.connect.validators.ConnectHttpUrlValidator;
import org.apache.nifi.kafka.connect.validators.FlowSnapshotValidator;
import org.apache.nifi.stateless.bootstrap.StatelessBootstrap;
import org.apache.nifi.stateless.config.ExtensionClientDefinition;
import org.apache.nifi.stateless.config.ParameterOverride;
import org.apache.nifi.stateless.config.SslContextDefinition;
import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
import org.apache.nifi.stateless.flow.DataflowDefinition;
import org.apache.nifi.stateless.flow.StatelessDataflow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import java.io.File;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.jar.JarFile;
import java.util.jar.Manifest;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import static org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars;
public class StatelessKafkaConnectorUtil {
// Value returned by getVersion() when the version cannot be read from the bootstrap jar's manifest.
private static final String UNKNOWN_VERSION = "<Unable to determine Stateless NiFi Kafka Connector Version>";
private static final Logger logger = LoggerFactory.getLogger(StatelessKafkaConnectorUtil.class);
// Held by createDataflow() around the call to StatelessBootstrap.bootstrap() so that only one thread at a time runs it.
private static final Lock unpackNarLock = new ReentrantLock();

// Names of the connector configuration properties. Each of these is registered, together with its
// description, in addCommonConfigElements().
static final String NAR_DIRECTORY = "nar.directory";
static final String EXTENSIONS_DIRECTORY = "extensions.directory";
static final String WORKING_DIRECTORY = "working.directory";
static final String FLOW_SNAPSHOT = "flow.snapshot";
static final String KRB5_FILE = "krb5.file";
static final String NEXUS_BASE_URL = "nexus.url";
static final String DATAFLOW_TIMEOUT = "dataflow.timeout";
static final String DATAFLOW_NAME = "name";
static final String TRUSTSTORE_FILE = "security.truststore";
static final String TRUSTSTORE_TYPE = "security.truststoreType";
static final String TRUSTSTORE_PASSWORD = "security.truststorePasswd";
static final String KEYSTORE_FILE = "security.keystore";
static final String KEYSTORE_TYPE = "security.keystoreType";
static final String KEYSTORE_PASSWORD = "security.keystorePasswd";
static final String KEY_PASSWORD = "security.keyPasswd";
static final String SENSITIVE_PROPS_KEY = "sensitive.props.key";

// Keys of the properties map that createDataflow() hands to the Stateless bootstrap when parsing the
// dataflow definition. Exactly one of the three snapshot keys is populated, depending on whether the
// configured flow snapshot looks like a URL, a JSON document, or a file name.
static final String BOOTSTRAP_SNAPSHOT_URL = "nifi.stateless.flow.snapshot.url";
static final String BOOTSTRAP_SNAPSHOT_FILE = "nifi.stateless.flow.snapshot.file";
static final String BOOTSTRAP_SNAPSHOT_CONTENTS = "nifi.stateless.flow.snapshot.contents";
static final String BOOTSTRAP_FLOW_NAME = "nifi.stateless.flow.name";

// Fallback values used when the corresponding connector property is not configured.
// DEFAULT_WORKING_DIRECTORY is only the base directory: createEngineConfiguration() appends a
// sub-directory named after the dataflow.
static final String DEFAULT_KRB5_FILE = "/etc/krb5.conf";
static final String DEFAULT_DATAFLOW_TIMEOUT = "60 sec";
static final File DEFAULT_WORKING_DIRECTORY = new File("/tmp/nifi-stateless-working");
static final File DEFAULT_EXTENSIONS_DIRECTORY = new File("/tmp/nifi-stateless-extensions");
static final String DEFAULT_SENSITIVE_PROPS_KEY = "nifi-stateless";
// Matches the file name of the Stateless NiFi bootstrap jar ("nifi-stateless-bootstrap-<anything>.jar").
// The dot before "jar" is escaped so that only a literal ".jar" extension is accepted; unescaped it would
// match any character (for example "nifi-stateless-bootstrap-fatjar").
private static final Pattern STATELESS_BOOTSTRAP_FILE_PATTERN = Pattern.compile("nifi-stateless-bootstrap-(.*)\\.jar");
// Matches property names of the form "parameter.<context>:<name>". Group 1 (matched reluctantly, i.e. up to
// the first colon) is the Parameter Context name and group 2 is the parameter name.
private static final Pattern PARAMETER_WITH_CONTEXT_PATTERN = Pattern.compile("parameter\\.(.*?):(.*)");
// Matches property names of the form "parameter.<name>"; group 1 is the parameter name.
private static final Pattern PARAMETER_WITHOUT_CONTEXT_PATTERN = Pattern.compile("parameter\\.(.*)");
/**
 * Registers on the given {@link ConfigDef} the configuration properties that are shared by the
 * Stateless NiFi connectors: directories and dataflow selection, runtime environment settings,
 * and keystore/truststore settings.
 *
 * @param configDef the configuration definition to add the properties to
 */
public static void addCommonConfigElements(final ConfigDef configDef) {
    // Directories and the dataflow to run.
    configDef
        .define(NAR_DIRECTORY, ConfigDef.Type.STRING, null, new ConnectDirectoryExistsValidator(), ConfigDef.Importance.HIGH,
            "Specifies the directory that stores the NiFi Archives (NARs)")
        .define(EXTENSIONS_DIRECTORY, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH,
            "Specifies the directory that stores the extensions that will be downloaded (if any) from the configured Extension Client")
        .define(WORKING_DIRECTORY, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH,
            "Specifies the temporary working directory for expanding NiFi Archives (NARs)")
        .define(FLOW_SNAPSHOT, ConfigDef.Type.STRING, null, new FlowSnapshotValidator(), ConfigDef.Importance.HIGH,
            "Specifies the dataflow to run. This may be a file containing the dataflow, a URL that points to a dataflow, or a String containing the entire dataflow as an escaped JSON.")
        .define(DATAFLOW_NAME, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, nonEmptyStringWithoutControlChars(), ConfigDef.Importance.HIGH,
            "The name of the dataflow.");

    // Runtime environment: Kerberos, extension source, processing timeout.
    configDef
        .define(KRB5_FILE, ConfigDef.Type.STRING, DEFAULT_KRB5_FILE, ConfigDef.Importance.MEDIUM,
            "Specifies the krb5.conf file to use if connecting to Kerberos-enabled services")
        .define(NEXUS_BASE_URL, ConfigDef.Type.STRING, null, new ConnectHttpUrlValidator(), ConfigDef.Importance.MEDIUM,
            "Specifies the Base URL of the Nexus instance to source extensions from")
        .define(DATAFLOW_TIMEOUT, ConfigDef.Type.STRING, DEFAULT_DATAFLOW_TIMEOUT, ConfigDef.Importance.MEDIUM,
            "Specifies the amount of time to wait for the dataflow to finish processing input before considering the dataflow a failure");

    // Keystore, truststore and the key used for sensitive values.
    configDef
        .define(KEYSTORE_FILE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
            "Filename of the keystore that Stateless NiFi should use for connecting to NiFi Registry and for Site-to-Site communications.")
        .define(KEYSTORE_TYPE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
            "The type of the Keystore file. Either JKS or PKCS12.")
        .define(KEYSTORE_PASSWORD, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM,
            "The password for the keystore.")
        .define(KEY_PASSWORD, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM,
            "The password for the key in the keystore. If not provided, the password is assumed to be the same as the keystore password.")
        .define(TRUSTSTORE_FILE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
            "Filename of the truststore that Stateless NiFi should use for connecting to NiFi Registry and for Site-to-Site communications. "
                + "If not specified, communications will occur only over http, not https.")
        .define(TRUSTSTORE_TYPE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
            "The type of the Truststore file. Either JKS or PKCS12.")
        .define(TRUSTSTORE_PASSWORD, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM,
            "The password for the truststore.")
        .define(SENSITIVE_PROPS_KEY, ConfigDef.Type.PASSWORD, DEFAULT_SENSITIVE_PROPS_KEY, ConfigDef.Importance.MEDIUM,
            "A key that components can use for encrypting and decrypting sensitive values.");
}
/**
 * Determines the connector version by reading the {@code Implementation-Version} attribute from the
 * manifest of the Stateless NiFi bootstrap jar found on the connector's class loader.
 *
 * @return the version string, or a fixed "unable to determine" placeholder when the bootstrap jar cannot
 *         be located, cannot be read, has no manifest, or its manifest carries no (non-blank)
 *         {@code Implementation-Version} attribute; never {@code null}
 */
public static String getVersion() {
    final File bootstrapJar = detectBootstrapJar();
    if (bootstrapJar == null) {
        return UNKNOWN_VERSION;
    }

    try (final JarFile jarFile = new JarFile(bootstrapJar)) {
        final Manifest manifest = jarFile.getManifest();
        if (manifest == null) {
            return UNKNOWN_VERSION;
        }

        // Attributes.getValue() returns null when the attribute is absent; fall back to the placeholder
        // rather than handing a null version to the caller.
        final String version = manifest.getMainAttributes().getValue("Implementation-Version");
        if (version == null || version.trim().isEmpty()) {
            return UNKNOWN_VERSION;
        }
        return version;
    } catch (final IOException e) {
        logger.warn("Could not determine Version of NiFi Stateless Kafka Connector", e);
        return UNKNOWN_VERSION;
    }
}
/**
 * Bootstraps the Stateless NiFi engine and creates the dataflow described by the connector properties.
 * <p>
 * The configured flow snapshot is interpreted as a URL when it starts with {@code http://} or
 * {@code https://}, as inline JSON when (after trimming) it starts with <code>{</code>, and as a file name
 * otherwise. Properties whose names start with {@code parameter.} are passed along as parameter overrides.
 *
 * @param properties the connector configuration
 * @return the created dataflow
 * @throws RuntimeException wrapping any exception raised while bootstrapping the engine or creating the dataflow
 */
public static StatelessDataflow createDataflow(final Map<String, String> properties) {
    final StatelessEngineConfiguration engineConfiguration = createEngineConfiguration(properties);
    final String flowSnapshot = properties.get(FLOW_SNAPSHOT);
    final List<ParameterOverride> parameterOverrides = parseParameterOverrides(properties);
    final String dataflowName = properties.get(DATAFLOW_NAME);

    try {
        final Map<String, String> definitionProperties = new HashMap<>();

        final boolean looksLikeUrl = flowSnapshot.startsWith("http://") || flowSnapshot.startsWith("https://");
        if (looksLikeUrl) {
            logger.debug("Configured Flow Snapshot appears to be a URL. Will use {} property to configured Stateless NiFi", BOOTSTRAP_SNAPSHOT_URL);
            definitionProperties.put(BOOTSTRAP_SNAPSHOT_URL, flowSnapshot);
        } else if (flowSnapshot.trim().startsWith("{")) {
            logger.debug("Configured Flow Snapshot appears to be JSON. Will use {} property to configured Stateless NiFi", BOOTSTRAP_SNAPSHOT_CONTENTS);
            definitionProperties.put(BOOTSTRAP_SNAPSHOT_CONTENTS, flowSnapshot);
        } else {
            logger.debug("Configured Flow Snapshot appears to be a File. Will use {} property to configured Stateless NiFi", BOOTSTRAP_SNAPSHOT_FILE);
            definitionProperties.put(BOOTSTRAP_SNAPSHOT_FILE, new File(flowSnapshot).getAbsolutePath());
        }
        definitionProperties.put(BOOTSTRAP_FLOW_NAME, dataflowName);

        MDC.setContextMap(Collections.singletonMap("dataflow", dataflowName));

        // Serialize calls to StatelessBootstrap.bootstrap(): it expands the NAR files into the working
        // directory, and several Connector instances or tasks unpacking at the same time could leave
        // that directory corrupted.
        final StatelessBootstrap bootstrap;
        unpackNarLock.lock();
        try {
            bootstrap = StatelessBootstrap.bootstrap(engineConfiguration, StatelessNiFiSourceTask.class.getClassLoader());
        } finally {
            unpackNarLock.unlock();
        }

        final DataflowDefinition<?> dataflowDefinition = bootstrap.parseDataflowDefinition(definitionProperties, parameterOverrides);
        return bootstrap.createDataflow(dataflowDefinition);
    } catch (final Exception e) {
        throw new RuntimeException("Failed to bootstrap Stateless NiFi Engine", e);
    }
}
/**
 * Collects the parameter overrides contained in the connector properties.
 * <p>
 * A property named {@code parameter.<context>:<name>} overrides the named parameter in the named
 * Parameter Context; a property named {@code parameter.<name>} (no colon) overrides the parameter
 * without naming a context. All other properties are ignored.
 *
 * @param properties the connector configuration
 * @return the overrides found, in the iteration order of the properties; empty if there are none
 */
private static List<ParameterOverride> parseParameterOverrides(final Map<String, String> properties) {
    final List<ParameterOverride> overrides = new ArrayList<>();

    for (final Map.Entry<String, String> property : properties.entrySet()) {
        final String value = property.getValue();
        final String key = property.getKey();

        // The context-qualified form must be tried first: the context-less pattern would match it as well.
        final Matcher withContext = PARAMETER_WITH_CONTEXT_PATTERN.matcher(key);
        if (withContext.matches()) {
            overrides.add(new ParameterOverride(withContext.group(1), withContext.group(2), value));
            continue;
        }

        final Matcher withoutContext = PARAMETER_WITHOUT_CONTEXT_PATTERN.matcher(key);
        if (withoutContext.matches()) {
            overrides.add(new ParameterOverride(withoutContext.group(1), value));
        }
    }

    return overrides;
}
/**
 * Returns a copy of the given properties that is safe to write to a log.
 * <p>
 * Removed from the copy are all parameter overrides (keys starting with {@code parameter.}), whose
 * values may be sensitive, as well as credential properties: any key containing {@code passw}
 * (case-insensitive; this covers the keystore, key and truststore password properties) and the
 * {@code sensitive.props.key} property. The supplied map is not modified.
 *
 * @param properties the connector configuration
 * @return a new, mutable map without the sensitive entries
 */
public static Map<String, String> getLoggableProperties(final Map<String, String> properties) {
    final Map<String, String> loggable = new HashMap<>(properties);
    loggable.keySet().removeIf(key -> isSensitiveKey(key));
    return loggable;
}

// Decides whether the value stored under the given property name must be kept out of the logs.
private static boolean isSensitiveKey(final String key) {
    if (key == null) {
        return false;
    }
    if (key.startsWith("parameter.")) {
        return true;
    }

    final String normalized = key.toLowerCase(Locale.ROOT);
    return normalized.contains("passw") || normalized.equals("sensitive.props.key");
}
/**
 * Builds the Stateless engine configuration from the connector properties.
 * <p>
 * The NAR directory is taken from the configuration or, when not configured, detected from the location
 * of the bootstrap jar. The working directory is a sub-directory, named after the dataflow, of the
 * configured (or default) base working directory. The extensions directory falls back to its default
 * when not configured. The krb5 file, sensitive properties key and Nexus extension client are read from
 * the given properties each time the corresponding getter is invoked.
 *
 * @param properties the connector configuration
 * @return the engine configuration
 * @throws IllegalStateException if the NAR directory is not configured and cannot be detected
 */
private static StatelessEngineConfiguration createEngineConfiguration(final Map<String, String> properties) {
    final String configuredNarDirectory = properties.get(NAR_DIRECTORY);
    final File narDirectory = (configuredNarDirectory == null) ? detectNarDirectory() : new File(configuredNarDirectory);

    final String dataflowName = properties.get(DATAFLOW_NAME);
    final String configuredWorkingDirectory = properties.get(WORKING_DIRECTORY);
    final File workingDirectoryBase = (configuredWorkingDirectory == null) ? DEFAULT_WORKING_DIRECTORY : new File(configuredWorkingDirectory);
    final File workingDirectory = new File(workingDirectoryBase, dataflowName);

    final String configuredExtensionsDirectory = properties.get(EXTENSIONS_DIRECTORY);
    final File extensionsDirectory = (configuredExtensionsDirectory == null) ? DEFAULT_EXTENSIONS_DIRECTORY : new File(configuredExtensionsDirectory);

    final SslContextDefinition sslContextDefinition = createSslContextDefinition(properties);

    return new StatelessEngineConfiguration() {
        @Override
        public File getWorkingDirectory() {
            return workingDirectory;
        }

        @Override
        public File getNarDirectory() {
            return narDirectory;
        }

        @Override
        public File getExtensionsDirectory() {
            return extensionsDirectory;
        }

        @Override
        public File getKrb5File() {
            final String krb5Filename = properties.getOrDefault(KRB5_FILE, DEFAULT_KRB5_FILE);
            return new File(krb5Filename);
        }

        @Override
        public Optional<File> getContentRepositoryDirectory() {
            return Optional.empty();
        }

        @Override
        public SslContextDefinition getSslContext() {
            return sslContextDefinition;
        }

        @Override
        public String getSensitivePropsKey() {
            return properties.getOrDefault(SENSITIVE_PROPS_KEY, DEFAULT_SENSITIVE_PROPS_KEY);
        }

        @Override
        public List<ExtensionClientDefinition> getExtensionClients() {
            final List<ExtensionClientDefinition> clients = new ArrayList<>();

            final String nexusBaseUrl = properties.get(NEXUS_BASE_URL);
            if (nexusBaseUrl == null) {
                return clients;
            }

            // A single Nexus-backed extension client, configured not to use the SSL context.
            final ExtensionClientDefinition nexusClient = new ExtensionClientDefinition();
            nexusClient.setUseSslContext(false);
            nexusClient.setExtensionClientType("nexus");
            nexusClient.setCommsTimeout("30 secs");
            nexusClient.setBaseUrl(nexusBaseUrl);
            clients.add(nexusClient);
            return clients;
        }
    };
}
/**
 * Builds the SSL context definition from the connector properties.
 * <p>
 * Without a (non-blank) truststore file no SSL context is defined at all. Keystore settings are applied
 * only when a (non-blank) keystore file is configured; in that case the key password defaults to the
 * keystore password when no explicit (non-blank) key password is given.
 *
 * @param properties the connector configuration
 * @return the SSL context definition, or {@code null} if no truststore file is configured
 */
private static SslContextDefinition createSslContextDefinition(final Map<String, String> properties) {
    final String truststoreFile = properties.get(TRUSTSTORE_FILE);
    if (!hasText(truststoreFile)) {
        return null;
    }

    final SslContextDefinition definition = new SslContextDefinition();
    definition.setTruststoreFile(truststoreFile);
    definition.setTruststorePass(properties.get(TRUSTSTORE_PASSWORD));
    definition.setTruststoreType(properties.get(TRUSTSTORE_TYPE));

    final String keystoreFile = properties.get(KEYSTORE_FILE);
    if (!hasText(keystoreFile)) {
        return definition;
    }

    definition.setKeystoreFile(keystoreFile);
    definition.setKeystoreType(properties.get(KEYSTORE_TYPE));

    final String keystorePassword = properties.get(KEYSTORE_PASSWORD);
    definition.setKeystorePass(keystorePassword);

    final String configuredKeyPassword = properties.get(KEY_PASSWORD);
    if (hasText(configuredKeyPassword)) {
        definition.setKeyPass(configuredKeyPassword);
    } else {
        definition.setKeyPass(keystorePassword);
    }

    return definition;
}

// True when the value is neither null nor empty after trimming.
private static boolean hasText(final String value) {
    return value != null && !value.trim().isEmpty();
}
/**
 * Returns the class loader that loaded this class, which must be a {@link URLClassLoader} so that the
 * jars it was built from can be inspected.
 *
 * @return the class loader of this class
 * @throws IllegalStateException if this class was loaded by a class loader that is not a {@link URLClassLoader}
 */
private static URLClassLoader getConnectClassLoader() {
    final ClassLoader loader = StatelessKafkaConnectorUtil.class.getClassLoader();
    if (loader instanceof URLClassLoader) {
        return (URLClassLoader) loader;
    }

    throw new IllegalStateException("No configuration value was set for the " + NAR_DIRECTORY + " configuration property, and was unable to determine the NAR directory automatically");
}
/**
 * Searches the URLs of the connector's class loader for the Stateless NiFi bootstrap jar.
 *
 * @return the first entry whose file name matches the bootstrap jar pattern, or {@code null} if none does
 * @throws IllegalStateException if the connector's class loader is not a {@link URLClassLoader}
 */
private static File detectBootstrapJar() {
    final URLClassLoader urlClassLoader = getConnectClassLoader();
    for (final URL url : urlClassLoader.getURLs()) {
        final File artifactFile = urlToFile(url);
        if (artifactFile == null) {
            continue;
        }

        if (STATELESS_BOOTSTRAP_FILE_PATTERN.matcher(artifactFile.getName()).matches()) {
            return artifactFile;
        }
    }

    return null;
}

// Converts a class loader URL to a File. URL.getFile() returns the path still percent-encoded (a space
// appears as "%20"), so file: URLs are converted through their URI, which decodes the path. The raw path
// is used as a fallback for URLs that cannot be converted that way.
private static File urlToFile(final URL url) {
    if (url == null) {
        return null;
    }

    if ("file".equalsIgnoreCase(url.getProtocol())) {
        try {
            return new File(url.toURI());
        } catch (final URISyntaxException | IllegalArgumentException e) {
            // Not a well-formed, hierarchical file URI (e.g. it contains unescaped characters or an
            // authority component): fall through and use the raw path.
        }
    }

    final String rawPath = url.getFile();
    return rawPath == null ? null : new File(rawPath);
}
/**
 * Determines the NAR directory when none is configured: it is taken to be the directory that contains
 * the Stateless NiFi bootstrap jar found on the connector's class loader.
 *
 * @return the directory containing the bootstrap jar
 * @throws IllegalStateException if the bootstrap jar cannot be found on the class loader
 */
private static File detectNarDirectory() {
    final File bootstrapJar = detectBootstrapJar();
    if (bootstrapJar != null) {
        final File narDirectory = bootstrapJar.getParentFile();
        logger.info("Detected NAR Directory to be {}", narDirectory.getAbsolutePath());
        return narDirectory;
    }

    // Log what the class loader did contain to help diagnose the missing bootstrap jar.
    final URLClassLoader connectClassLoader = getConnectClassLoader();
    logger.error("ClassLoader that loaded Stateless Kafka Connector did not contain nifi-stateless-bootstrap. URLs that were present: {}", Arrays.asList(connectClassLoader.getURLs()));
    throw new IllegalStateException("No configuration value was set for the " + NAR_DIRECTORY + " configuration property, and was unable to determine the NAR directory automatically");
}
}