/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.nifi.stateless.bootstrap;

import org.apache.nifi.bundle.Bundle;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.nar.NarClassLoaders;
import org.apache.nifi.nar.NarUnpacker;
import org.apache.nifi.nar.SystemBundle;
import org.apache.nifi.stateless.config.ParameterProvider;
import org.apache.nifi.stateless.config.StatelessConfigurationException;
import org.apache.nifi.stateless.engine.NarUnpackLock;
import org.apache.nifi.stateless.engine.StatelessEngineConfiguration;
import org.apache.nifi.stateless.flow.DataflowDefinition;
import org.apache.nifi.stateless.flow.DataflowDefinitionParser;
import org.apache.nifi.stateless.flow.StatelessDataflow;
import org.apache.nifi.stateless.flow.StatelessDataflowFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.ServiceLoader;
import java.util.function.Predicate;
import java.util.regex.Pattern;

/**
 * Bootstraps the stateless engine: unpacks the NARs found in the configured NAR directory, builds a
 * {@link URLClassLoader} over the bundled dependencies of the unpacked Stateless NAR, and then uses
 * {@link ServiceLoader} against that class loader to locate the components that parse and create dataflows.
 *
 * <p>Instances are obtained through one of the static {@code bootstrap} methods.</p>
 */
public class StatelessBootstrap {
    private static final Logger logger = LoggerFactory.getLogger(StatelessBootstrap.class);
    private static final Pattern STATELESS_NAR_PATTERN = Pattern.compile("nifi-stateless-nar-.*\\.nar-unpacked");
    private static final String NIFI_GROUP = "org.apache.nifi";
    private static final String NIFI_STATELESS_ARTIFACT_ID = "nifi-stateless-nar";
    private static final String NIFI_JETTY_ARTIFACT_ID = "nifi-jetty-bundle";
    private final ClassLoader statelessClassLoader;
    private final StatelessEngineConfiguration engineConfiguration;

    private StatelessBootstrap(final ClassLoader statelessClassLoader, final StatelessEngineConfiguration engineConfiguration) {
        this.statelessClassLoader = statelessClassLoader;
        this.engineConfiguration = engineConfiguration;
    }

    /**
     * Creates a dataflow from the given definition, using the single {@link StatelessDataflowFactory}
     * discoverable through the stateless class loader.
     *
     * @param dataflowDefinition the definition of the dataflow to create
     * @param parameterProvider the parameter provider handed to the factory
     * @param <T> the type parameter of the dataflow definition
     * @return the dataflow produced by the factory
     * @throws IOException if thrown by the factory
     * @throws StatelessConfigurationException if thrown by the factory
     * @throws IllegalStateException if zero or more than one factory implementation is found
     */
    public <T> StatelessDataflow createDataflow(final DataflowDefinition<T> dataflowDefinition, final ParameterProvider parameterProvider)
                throws IOException, StatelessConfigurationException {
        // A class literal cannot carry a type argument, so the lookup necessarily yields the raw factory type.
        // The suppression is scoped to this single declaration.
        @SuppressWarnings("unchecked")
        final StatelessDataflowFactory<T> dataflowFactory = getSingleInstance(statelessClassLoader, StatelessDataflowFactory.class);
        return dataflowFactory.createDataflow(engineConfiguration, dataflowDefinition, parameterProvider);
    }

    /**
     * Parses a dataflow definition from a file, using the single {@link DataflowDefinitionParser}
     * discoverable through the stateless class loader.
     *
     * @param flowDefinitionFile the file to hand to the parser
     * @return the parsed dataflow definition
     * @throws StatelessConfigurationException if thrown by the parser
     * @throws IOException if thrown by the parser
     * @throws IllegalStateException if zero or more than one parser implementation is found
     */
    public DataflowDefinition<?> parseDataflowDefinition(final File flowDefinitionFile) throws StatelessConfigurationException, IOException {
        final DataflowDefinitionParser dataflowDefinitionParser = getSingleInstance(statelessClassLoader, DataflowDefinitionParser.class);
        return dataflowDefinitionParser.parseFlowDefinition(flowDefinitionFile, engineConfiguration);
    }

    /**
     * Parses a dataflow definition from a map of properties, using the single {@link DataflowDefinitionParser}
     * discoverable through the stateless class loader.
     *
     * @param flowDefinitionProperties the properties to hand to the parser
     * @return the parsed dataflow definition
     * @throws StatelessConfigurationException if thrown by the parser
     * @throws IOException if thrown by the parser
     * @throws IllegalStateException if zero or more than one parser implementation is found
     */
    public DataflowDefinition<?> parseDataflowDefinition(final Map<String, String> flowDefinitionProperties) throws StatelessConfigurationException, IOException {
        final DataflowDefinitionParser dataflowDefinitionParser = getSingleInstance(statelessClassLoader, DataflowDefinitionParser.class);
        return dataflowDefinitionParser.parseFlowDefinition(flowDefinitionProperties, engineConfiguration);
    }

    /**
     * Bootstraps using the system class loader as the parent of the stateless class loader.
     *
     * @param engineConfiguration the engine configuration supplying the NAR and working directories
     * @return a bootstrap instance bound to the newly created stateless class loader
     * @throws IOException if the working directory or the unpacked Stateless NAR cannot be prepared or read
     * @see #bootstrap(StatelessEngineConfiguration, ClassLoader)
     */
    public static StatelessBootstrap bootstrap(final StatelessEngineConfiguration engineConfiguration) throws IOException {
        return bootstrap(engineConfiguration, ClassLoader.getSystemClassLoader());
    }

    /**
     * Unpacks the NARs from the configured NAR directory into the working directory and creates a class loader
     * over every file in the unpacked Stateless NAR's {@code NAR-INF/bundled-dependencies} directory.
     *
     * <p>Side effect: the context class loader of the calling thread is set to the newly created class loader.</p>
     *
     * @param engineConfiguration the engine configuration supplying the NAR and working directories
     * @param rootClassLoader the parent of the stateless class loader
     * @return a bootstrap instance bound to the newly created stateless class loader
     * @throws IOException if the working directory cannot be created or is not a directory, if the unpacked
     *             Stateless NAR cannot be located unambiguously, or if its bundled dependencies cannot be listed
     */
    public static StatelessBootstrap bootstrap(final StatelessEngineConfiguration engineConfiguration, final ClassLoader rootClassLoader) throws IOException {
        final File narDirectory = engineConfiguration.getNarDirectory();
        final File workingDirectory = engineConfiguration.getWorkingDirectory();

        // Ensure working directory exists, creating it if necessary
        if (!workingDirectory.isDirectory()) {
            if (workingDirectory.exists()) {
                throw new IOException("Working Directory " + workingDirectory + " exists but is not a directory");
            }

            // mkdirs() also returns false when another thread or process created the directory in the meantime,
            // so re-check before treating a false result as a failure.
            if (!workingDirectory.mkdirs() && !workingDirectory.isDirectory()) {
                throw new IOException("Working Directory " + workingDirectory + " does not exist and could not be created");
            }
        }

        final Bundle systemBundle = SystemBundle.create(narDirectory.getAbsolutePath(), ClassLoader.getSystemClassLoader());
        final File frameworkWorkingDir = new File(workingDirectory, "nifi-framework");
        final File extensionsWorkingDir = new File(workingDirectory, "extensions");
        final List<Path> narDirectories = Collections.singletonList(narDirectory.toPath());

        // Unpack NARs. The filter accepts every bundle coordinate.
        final long unpackStartNanos = System.nanoTime();
        final Predicate<BundleCoordinate> narFilter = coordinate -> true;
        NarUnpackLock.lock();
        try {
            NarUnpacker.unpackNars(systemBundle, frameworkWorkingDir, extensionsWorkingDir, null, narDirectories, false, NarClassLoaders.FRAMEWORK_NAR_ID, false, false, narFilter);
        } finally {
            NarUnpackLock.unlock();
        }
        // nanoTime is monotonic, so the elapsed value is not skewed by wall-clock adjustments
        final long unpackMillis = (System.nanoTime() - unpackStartNanos) / 1_000_000L;
        logger.info("Unpacked NAR files in {} millis", unpackMillis);

        final File statelessNarWorkingDir = locateStatelessNarWorkingDirectory(extensionsWorkingDir);
        final File statelessNarInf = new File(statelessNarWorkingDir, "NAR-INF");
        final File statelessNarDependencies = new File(statelessNarInf, "bundled-dependencies");
        final File[] statelessNarContents = statelessNarDependencies.listFiles();
        if (statelessNarContents == null || statelessNarContents.length == 0) {
            throw new IOException("Could not access contents of Stateless NAR dependencies at " + statelessNarDependencies);
        }

        // Every entry of bundled-dependencies becomes one URL on the stateless class loader's search path
        final URL[] urls = new URL[statelessNarContents.length];
        for (int i = 0; i < statelessNarContents.length; i++) {
            urls[i] = statelessNarContents[i].toURI().toURL();
        }

        final URLClassLoader statelessClassLoader = new URLClassLoader(urls, rootClassLoader);
        Thread.currentThread().setContextClassLoader(statelessClassLoader);
        return new StatelessBootstrap(statelessClassLoader, engineConfiguration);
    }

    /**
     * Tells whether the given coordinate is in the {@code org.apache.nifi} group and identifies either the
     * Jetty bundle or the Stateless NAR.
     *
     * <p>Not currently referenced within this class; the NAR filter used by {@code bootstrap} accepts every coordinate.</p>
     *
     * @param coordinate the bundle coordinate to test
     * @return {@code true} if both the group and the artifact id match, {@code false} otherwise
     */
    private static boolean isRequiredForBootstrap(final BundleCoordinate coordinate) {
        final String group = coordinate.getGroup();
        if (!NIFI_GROUP.equals(group)) {
            return false;
        }

        final String artifactId = coordinate.getId();
        return NIFI_JETTY_ARTIFACT_ID.equals(artifactId) || NIFI_STATELESS_ARTIFACT_ID.equals(artifactId);
    }

    /**
     * Finds the single entry of the given directory whose name matches {@code nifi-stateless-nar-.*\.nar-unpacked}.
     *
     * @param workingDirectory the directory whose immediate children are examined
     * @return the one matching entry
     * @throws IOException if the directory cannot be listed, or if zero or more than one entry matches
     */
    private static File locateStatelessNarWorkingDirectory(final File workingDirectory) throws IOException {
        final File[] files = workingDirectory.listFiles();
        if (files == null) {
            throw new IOException("Could not read contents of working directory " + workingDirectory);
        }

        final List<File> matching = new ArrayList<>();
        for (final File file : files) {
            final String filename = file.getName();
            if (STATELESS_NAR_PATTERN.matcher(filename).matches()) {
                matching.add(file);
            }
        }

        if (matching.isEmpty()) {
            throw new IOException("Could not find NiFi Stateless NAR in working directory " + workingDirectory);
        }
        if (matching.size() > 1) {
            throw new IOException("Found multiple NiFi Stateless NARs in working directory " + workingDirectory + ": " + matching);
        }

        return matching.get(0);
    }

    /**
     * Loads the one and only implementation of the given service type visible to the given class loader.
     *
     * @param classLoader the class loader passed to {@link ServiceLoader#load(Class, ClassLoader)}
     * @param type the service type to look up
     * @param <T> the service type
     * @return the single implementation found
     * @throws IllegalStateException if no implementation, or more than one implementation, is found
     */
    private static <T> T getSingleInstance(final ClassLoader classLoader, final Class<T> type) {
        final ServiceLoader<T> serviceLoader = ServiceLoader.load(type, classLoader);

        T instance = null;
        for (final T object : serviceLoader) {
            if (instance == null) {
                instance = object;
            } else {
                throw new IllegalStateException("Found multiple implementations of " + type);
            }
        }

        if (instance == null) {
            throw new IllegalStateException("Could not find any implementations of " + type);
        }

        return instance;
    }
}
