/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.kafka.connect;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.nifi.kafka.connect.validators.ConnectDirectoryExistsValidator;
import org.apache.nifi.kafka.connect.validators.ConnectHttpUrlValidator;
import org.apache.nifi.kafka.connect.validators.FlowSnapshotValidator;
import org.apache.nifi.stateless.bootstrap.StatelessBootstrap;
import org.apache.nifi.stateless.config.ExtensionClientDefinition;
import org.apache.nifi.stateless.config.ParameterOverride;
import org.apache.nifi.stateless.config.SslContextDefinition;
import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
import org.apache.nifi.stateless.flow.DataflowDefinition;
import org.apache.nifi.stateless.flow.StatelessDataflow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.jar.JarFile;
import java.util.jar.Manifest;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.apache.kafka.common.config.ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars;

public class StatelessKafkaConnectorUtil {
    private static final String UNKNOWN_VERSION = "<Unable to determine Stateless NiFi Kafka Connector Version>";
    private static final Logger logger = LoggerFactory.getLogger(StatelessKafkaConnectorUtil.class);
    private static final Lock unpackNarLock = new ReentrantLock();

    static final String NAR_DIRECTORY = "nar.directory";
    static final String EXTENSIONS_DIRECTORY = "extensions.directory";
    static final String WORKING_DIRECTORY = "working.directory";
    static final String FLOW_SNAPSHOT = "flow.snapshot";
    static final String KRB5_FILE = "krb5.file";
    static final String NEXUS_BASE_URL = "nexus.url";
    static final String DATAFLOW_TIMEOUT = "dataflow.timeout";
    static final String DATAFLOW_NAME = "name";

    static final String TRUSTSTORE_FILE = "security.truststore";
    static final String TRUSTSTORE_TYPE = "security.truststoreType";
    static final String TRUSTSTORE_PASSWORD = "security.truststorePasswd";
    static final String KEYSTORE_FILE = "security.keystore";
    static final String KEYSTORE_TYPE = "security.keystoreType";
    static final String KEYSTORE_PASSWORD = "security.keystorePasswd";
    static final String KEY_PASSWORD = "security.keyPasswd";
    static final String SENSITIVE_PROPS_KEY = "sensitive.props.key";

    static final String BOOTSTRAP_SNAPSHOT_URL = "nifi.stateless.flow.snapshot.url";
    static final String BOOTSTRAP_SNAPSHOT_FILE = "nifi.stateless.flow.snapshot.file";
    static final String BOOTSTRAP_SNAPSHOT_CONTENTS = "nifi.stateless.flow.snapshot.contents";
    static final String BOOTSTRAP_FLOW_NAME = "nifi.stateless.flow.name";

    static final String DEFAULT_KRB5_FILE = "/etc/krb5.conf";
    static final String DEFAULT_DATAFLOW_TIMEOUT = "60 sec";
    static final File DEFAULT_WORKING_DIRECTORY = new File("/tmp/nifi-stateless-working");
    static final File DEFAULT_EXTENSIONS_DIRECTORY = new File("/tmp/nifi-stateless-extensions");
    static final String DEFAULT_SENSITIVE_PROPS_KEY = "nifi-stateless";

    private static final Pattern STATELESS_BOOTSTRAP_FILE_PATTERN = Pattern.compile("nifi-stateless-bootstrap-(.*).jar");
    private static final Pattern PARAMETER_WITH_CONTEXT_PATTERN = Pattern.compile("parameter\\.(.*?):(.*)");
    private static final Pattern PARAMETER_WITHOUT_CONTEXT_PATTERN = Pattern.compile("parameter\\.(.*)");

    public static void addCommonConfigElements(final ConfigDef configDef) {
        configDef.define(NAR_DIRECTORY, ConfigDef.Type.STRING, null, new ConnectDirectoryExistsValidator(), ConfigDef.Importance.HIGH,
            "Specifies the directory that stores the NiFi Archives (NARs)");
        configDef.define(EXTENSIONS_DIRECTORY, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH,
            "Specifies the directory that stores the extensions that will be downloaded (if any) from the configured Extension Client");
        configDef.define(WORKING_DIRECTORY, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH,
            "Specifies the temporary working directory for expanding NiFi Archives (NARs)");
        configDef.define(FLOW_SNAPSHOT, ConfigDef.Type.STRING, null, new FlowSnapshotValidator(), ConfigDef.Importance.HIGH,
            "Specifies the dataflow to run. This may be a file containing the dataflow, a URL that points to a dataflow, or a String containing the entire dataflow as an escaped JSON.");
        configDef.define(DATAFLOW_NAME, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, nonEmptyStringWithoutControlChars(), ConfigDef.Importance.HIGH, "The name of the dataflow.");

        configDef.define(StatelessKafkaConnectorUtil.KRB5_FILE, ConfigDef.Type.STRING, StatelessKafkaConnectorUtil.DEFAULT_KRB5_FILE, ConfigDef.Importance.MEDIUM,
            "Specifies the krb5.conf file to use if connecting to Kerberos-enabled services");
        configDef.define(StatelessKafkaConnectorUtil.NEXUS_BASE_URL, ConfigDef.Type.STRING, null, new ConnectHttpUrlValidator(), ConfigDef.Importance.MEDIUM,
            "Specifies the Base URL of the Nexus instance to source extensions from");

        configDef.define(StatelessKafkaConnectorUtil.DATAFLOW_TIMEOUT, ConfigDef.Type.STRING, StatelessKafkaConnectorUtil.DEFAULT_DATAFLOW_TIMEOUT, ConfigDef.Importance.MEDIUM,
            "Specifies the amount of time to wait for the dataflow to finish processing input before considering the dataflow a failure");

        configDef.define(KEYSTORE_FILE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
            "Filename of the keystore that Stateless NiFi should use for connecting to NiFi Registry and for Site-to-Site communications.");
        configDef.define(KEYSTORE_TYPE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
            "The type of the Keystore file. Either JKS or PKCS12.");
        configDef.define(KEYSTORE_PASSWORD, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM,
            "The password for the keystore.");
        configDef.define(KEY_PASSWORD, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM,
            "The password for the key in the keystore. If not provided, the password is assumed to be the same as the keystore password.");
        configDef.define(TRUSTSTORE_FILE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
            "Filename of the truststore that Stateless NiFi should use for connecting to NiFi Registry and for Site-to-Site communications. If not specified, communications will occur only over " +
                "http, not https.");
        configDef.define(TRUSTSTORE_TYPE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
            "The type of the Truststore file. Either JKS or PKCS12.");
        configDef.define(TRUSTSTORE_PASSWORD, ConfigDef.Type.PASSWORD, null, ConfigDef.Importance.MEDIUM,
            "The password for the truststore.");
        configDef.define(SENSITIVE_PROPS_KEY, ConfigDef.Type.PASSWORD, DEFAULT_SENSITIVE_PROPS_KEY, ConfigDef.Importance.MEDIUM, "A key that components can use for encrypting and decrypting " +
            "sensitive values.");
    }

    public static String getVersion() {
        final File bootstrapJar = detectBootstrapJar();
        if (bootstrapJar == null) {
            return UNKNOWN_VERSION;
        }

        try (final JarFile jarFile = new JarFile(bootstrapJar)) {
            final Manifest manifest = jarFile.getManifest();
            if (manifest != null) {
                return manifest.getMainAttributes().getValue("Implementation-Version");
            }
        } catch (IOException e) {
            logger.warn("Could not determine Version of NiFi Stateless Kafka Connector", e);
            return UNKNOWN_VERSION;
        }

        return UNKNOWN_VERSION;
    }

    public static StatelessDataflow createDataflow(final Map<String, String> properties) {
        final StatelessEngineConfiguration engineConfiguration = createEngineConfiguration(properties);
        final String configuredFlowSnapshot = properties.get(FLOW_SNAPSHOT);

        final List<ParameterOverride> parameterOverrides = parseParameterOverrides(properties);
        final String dataflowName = properties.get(DATAFLOW_NAME);

        final DataflowDefinition<?> dataflowDefinition;
        final StatelessBootstrap bootstrap;
        try {
            final Map<String, String> dataflowDefinitionProperties = new HashMap<>();

            if (configuredFlowSnapshot.startsWith("http://") || configuredFlowSnapshot.startsWith("https://")) {
                logger.debug("Configured Flow Snapshot appears to be a URL. Will use {} property to configured Stateless NiFi", BOOTSTRAP_SNAPSHOT_URL);
                dataflowDefinitionProperties.put(BOOTSTRAP_SNAPSHOT_URL, configuredFlowSnapshot);
            } else if (configuredFlowSnapshot.trim().startsWith("{")) {
                logger.debug("Configured Flow Snapshot appears to be JSON. Will use {} property to configured Stateless NiFi", BOOTSTRAP_SNAPSHOT_CONTENTS);
                dataflowDefinitionProperties.put(BOOTSTRAP_SNAPSHOT_CONTENTS, configuredFlowSnapshot);
            } else {
                logger.debug("Configured Flow Snapshot appears to be a File. Will use {} property to configured Stateless NiFi", BOOTSTRAP_SNAPSHOT_FILE);
                final File flowSnapshotFile = new File(configuredFlowSnapshot);
                dataflowDefinitionProperties.put(BOOTSTRAP_SNAPSHOT_FILE, flowSnapshotFile.getAbsolutePath());
            }

            dataflowDefinitionProperties.put(BOOTSTRAP_FLOW_NAME, dataflowName);

            MDC.setContextMap(Collections.singletonMap("dataflow", dataflowName));

            // Use a Write Lock to ensure that only a single thread is calling StatelessBootstrap.bootstrap().
            // We do this because the bootstrap() method will expand all NAR files into the working directory.
            // If we have multiple Connector instances, or multiple tasks, we don't want several threads all
            // unpacking NARs at the same time, as it could potentially result in the working directory becoming corrupted.
            unpackNarLock.lock();
            try {
                bootstrap = StatelessBootstrap.bootstrap(engineConfiguration, StatelessNiFiSourceTask.class.getClassLoader());
            } finally {
                unpackNarLock.unlock();
            }

            dataflowDefinition = bootstrap.parseDataflowDefinition(dataflowDefinitionProperties, parameterOverrides);
            return bootstrap.createDataflow(dataflowDefinition);
        } catch (final Exception e) {
            throw new RuntimeException("Failed to bootstrap Stateless NiFi Engine", e);
        }
    }

    private static List<ParameterOverride> parseParameterOverrides(final Map<String, String> properties) {
        final List<ParameterOverride> parameterOverrides = new ArrayList<>();

        for (final Map.Entry<String, String> entry : properties.entrySet()) {
            final String parameterValue = entry.getValue();

            ParameterOverride parameterOverride = null;
            final Matcher matcher = PARAMETER_WITH_CONTEXT_PATTERN.matcher(entry.getKey());
            if (matcher.matches()) {
                final String contextName = matcher.group(1);
                final String parameterName = matcher.group(2);
                parameterOverride = new ParameterOverride(contextName, parameterName, parameterValue);
            } else {
                final Matcher noContextMatcher = PARAMETER_WITHOUT_CONTEXT_PATTERN.matcher(entry.getKey());
                if (noContextMatcher.matches()) {
                    final String parameterName = noContextMatcher.group(1);
                    parameterOverride = new ParameterOverride(parameterName, parameterValue);
                }
            }

            if (parameterOverride != null) {
                parameterOverrides.add(parameterOverride);
            }
        }

        return parameterOverrides;
    }

    public static Map<String, String> getLoggableProperties(final Map<String, String> properties) {
        final Map<String, String> loggable = new HashMap<>(properties);
        loggable.keySet().removeIf(key -> key.startsWith("parameter."));
        return loggable;
    }

    private static StatelessEngineConfiguration createEngineConfiguration(final Map<String, String> properties) {
        final File narDirectory;
        final String narDirectoryFilename = properties.get(NAR_DIRECTORY);
        if (narDirectoryFilename == null) {
            narDirectory = detectNarDirectory();
        } else {
            narDirectory = new File(narDirectoryFilename);
        }

        final String dataflowName = properties.get(DATAFLOW_NAME);

        final File baseWorkingDirectory;
        final String workingDirectoryFilename = properties.get(WORKING_DIRECTORY);
        if (workingDirectoryFilename == null) {
            baseWorkingDirectory = DEFAULT_WORKING_DIRECTORY;
        } else {
            baseWorkingDirectory = new File(workingDirectoryFilename);
        }
        final File workingDirectory = new File(baseWorkingDirectory, dataflowName);

        final File extensionsDirectory;
        final String extensionsDirectoryFilename = properties.get(EXTENSIONS_DIRECTORY);
        if (extensionsDirectoryFilename == null) {
            extensionsDirectory = DEFAULT_EXTENSIONS_DIRECTORY;
        } else {
            extensionsDirectory = new File(extensionsDirectoryFilename);
        }

        final SslContextDefinition sslContextDefinition = createSslContextDefinition(properties);

        final StatelessEngineConfiguration engineConfiguration = new StatelessEngineConfiguration() {
            @Override
            public File getWorkingDirectory() {
                return workingDirectory;
            }

            @Override
            public File getNarDirectory() {
                return narDirectory;
            }

            @Override
            public File getExtensionsDirectory() {
                return extensionsDirectory;
            }

            @Override
            public File getKrb5File() {
                return new File(properties.getOrDefault(KRB5_FILE, DEFAULT_KRB5_FILE));
            }

            @Override
            public Optional<File> getContentRepositoryDirectory() {
                return Optional.empty();
            }

            @Override
            public SslContextDefinition getSslContext() {
                return sslContextDefinition;
            }

            @Override
            public String getSensitivePropsKey() {
                return properties.getOrDefault(SENSITIVE_PROPS_KEY, DEFAULT_SENSITIVE_PROPS_KEY);
            }

            @Override
            public List<ExtensionClientDefinition> getExtensionClients() {
                final List<ExtensionClientDefinition> extensionClientDefinitions = new ArrayList<>();

                final String nexusBaseUrl = properties.get(NEXUS_BASE_URL);
                if (nexusBaseUrl != null) {
                    final ExtensionClientDefinition definition = new ExtensionClientDefinition();
                    definition.setUseSslContext(false);
                    definition.setExtensionClientType("nexus");
                    definition.setCommsTimeout("30 secs");
                    definition.setBaseUrl(nexusBaseUrl);
                    extensionClientDefinitions.add(definition);
                }

                return extensionClientDefinitions;
            }
        };

        return engineConfiguration;
    }

    private static SslContextDefinition createSslContextDefinition(final Map<String, String> properties) {
        final String truststoreFile = properties.get(TRUSTSTORE_FILE);
        if (truststoreFile == null || truststoreFile.trim().isEmpty()) {
            return null;
        }

        final SslContextDefinition sslContextDefinition;
        sslContextDefinition = new SslContextDefinition();
        sslContextDefinition.setTruststoreFile(truststoreFile);
        sslContextDefinition.setTruststorePass(properties.get(TRUSTSTORE_PASSWORD));
        sslContextDefinition.setTruststoreType(properties.get(TRUSTSTORE_TYPE));

        final String keystoreFile = properties.get(KEYSTORE_FILE);
        if (keystoreFile != null && !keystoreFile.trim().isEmpty()) {
            sslContextDefinition.setKeystoreFile(keystoreFile);
            sslContextDefinition.setKeystoreType(properties.get(KEYSTORE_TYPE));

            final String keystorePass = properties.get(KEYSTORE_PASSWORD);
            sslContextDefinition.setKeystorePass(keystorePass);

            final String explicitKeyPass = properties.get(KEY_PASSWORD);
            final String keyPass = (explicitKeyPass == null || explicitKeyPass.trim().isEmpty()) ? keystorePass : explicitKeyPass;
            sslContextDefinition.setKeyPass(keyPass);
        }

        return sslContextDefinition;
    }

    private static URLClassLoader getConnectClassLoader() {
        final ClassLoader classLoader = StatelessKafkaConnectorUtil.class.getClassLoader();
        if (!(classLoader instanceof URLClassLoader)) {
            throw new IllegalStateException("No configuration value was set for the " + NAR_DIRECTORY + " configuration property, and was unable to determine the NAR directory automatically");
        }

        return (URLClassLoader) classLoader;
    }

    private static File detectBootstrapJar() {
        final URLClassLoader urlClassLoader = getConnectClassLoader();
        for (final URL url : urlClassLoader.getURLs()) {
            final String artifactFilename = url.getFile();
            if (artifactFilename == null) {
                continue;
            }

            final File artifactFile = new File(artifactFilename);
            if (STATELESS_BOOTSTRAP_FILE_PATTERN.matcher(artifactFile.getName()).matches()) {
                return artifactFile;
            }
        }

        return null;
    }

    private static File detectNarDirectory() {
        final File bootstrapJar = detectBootstrapJar();
        if (bootstrapJar == null) {
            final URLClassLoader urlClassLoader = getConnectClassLoader();
            logger.error("ClassLoader that loaded Stateless Kafka Connector did not contain nifi-stateless-bootstrap. URLs that were present: {}", Arrays.asList(urlClassLoader.getURLs()));
            throw new IllegalStateException("No configuration value was set for the " + NAR_DIRECTORY + " configuration property, and was unable to determine the NAR directory automatically");
        }

        final File narDirectory = bootstrapJar.getParentFile();
        logger.info("Detected NAR Directory to be {}", narDirectory.getAbsolutePath());
        return narDirectory;
    }
}
