/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */
package org.apache.rocketmq.connect.runtime.controller.isolation;

import io.openmessaging.connector.api.component.Transform;
import io.openmessaging.connector.api.component.connector.Connector;
import io.openmessaging.connector.api.component.task.Task;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;

import io.openmessaging.connector.api.component.task.sink.SinkConnector;
import io.openmessaging.connector.api.component.task.source.SourceConnector;
import io.openmessaging.connector.api.data.RecordConverter;
import io.openmessaging.connector.api.errors.ConnectException;
import org.apache.rocketmq.connect.runtime.common.ConnectKeyValue;
import org.apache.rocketmq.connect.runtime.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Plugin {

    public enum ClassLoaderUsage {
        CURRENT_CLASSLOADER,
        PLUGINS
    }

    private static final Logger log = LoggerFactory.getLogger(Plugin.class);
    private final DelegatingClassLoader delegatingLoader;

    public Plugin(List<String> pluginLocations) {
        delegatingLoader = newDelegatingClassLoader(pluginLocations);
        delegatingLoader.initLoaders();
    }

    public void initLoaders(){
        delegatingLoader.initLoaders();
    }

    protected DelegatingClassLoader newDelegatingClassLoader(final List<String> paths) {
        return AccessController.doPrivileged(
                (PrivilegedAction<DelegatingClassLoader>) () -> new DelegatingClassLoader(paths)
        );
    }

    public DelegatingClassLoader delegatingLoader() {
        return delegatingLoader;
    }
    public Set<PluginWrapper<SinkConnector>> sinkConnectors() {
        return delegatingLoader.sinkConnectors();
    }

    public Set<PluginWrapper<SourceConnector>> sourceConnectors() {
        return delegatingLoader.sourceConnectors();
    }

    public Set<PluginWrapper<RecordConverter>> converters() {
        return delegatingLoader.converters();
    }

    public Set<PluginWrapper<Transform<?>>> transformations() {
        return delegatingLoader.transformations();
    }

    public ClassLoader currentThreadLoader() {
        return Thread.currentThread().getContextClassLoader();
    }

    public static ClassLoader compareAndSwapLoaders(ClassLoader loader) {
        ClassLoader current = Thread.currentThread().getContextClassLoader();
        if (!current.equals(loader)) {
            Thread.currentThread().setContextClassLoader(loader);
        }
        return current;
    }

    protected static <U> Class<? extends U> pluginClass(
            DelegatingClassLoader loader,
            String classOrAlias,
            Class<U> pluginClass
    ) throws ClassNotFoundException {
        Class<?> klass = loader.loadClass(classOrAlias, false);
        if (pluginClass.isAssignableFrom(klass)) {
            return (Class<? extends U>) klass;
        }

        throw new ClassNotFoundException(
                "Requested class: "
                        + classOrAlias
                        + " does not extend " + pluginClass.getSimpleName()
        );
    }

    public Connector newConnector(String connectorClassOrAlias) {
        Class<? extends Connector> klass = connectorClass(connectorClassOrAlias);
        return newPlugin(klass);
    }

    public Class<? extends Connector> connectorClass(String connectorClassOrAlias) {
        Class<? extends Connector> klass;
        try {
            klass = pluginClass(delegatingLoader, connectorClassOrAlias, Connector.class);
        } catch (ClassNotFoundException e) {
            List<PluginWrapper<? extends Connector>> matches = new ArrayList<>();
            Set<PluginWrapper<Connector>> connectors = delegatingLoader.connectors();
            for (PluginWrapper<? extends Connector> plugin : connectors) {
                Class<?> pluginClass = plugin.pluginClass();
                String simpleName = pluginClass.getSimpleName();
                if (simpleName.equals(connectorClassOrAlias)
                        || simpleName.equals(connectorClassOrAlias + "Connector")) {
                    matches.add(plugin);
                }
            }
            if (matches.isEmpty()) {
                throw new ConnectException(
                        "Failed to find any class that implements Connector and which name matches "
                                + connectorClassOrAlias
                                + ", available connectors are: "
                                + Utils.join(connectors, ", ")
                );
            }

            // conflict connector
            if (matches.size() > 1) {
                throw new ConnectException(
                        "More than one connector matches alias "
                                + connectorClassOrAlias
                                + ". Please use full package and class name instead. Classes found: "
                                + Utils.join(connectors, ", ")
                );
            }

            PluginWrapper<? extends Connector> entry = matches.get(0);
            klass = entry.pluginClass();
        }
        return klass;
    }

    public Task newTask(Class<? extends Task> taskClass) {
        return newPlugin(taskClass);
    }

    public RecordConverter newConverter(ConnectKeyValue config, String classPropertyName, ClassLoaderUsage classLoaderUsage) {
        if (!config.containsKey(classPropertyName)) {
            return null;
        }
        Class<? extends RecordConverter> klass = null;
        switch (classLoaderUsage) {
            case CURRENT_CLASSLOADER:
                klass = pluginClassFromConfig(config, classPropertyName, RecordConverter.class, delegatingLoader.converters());
                break;
            case PLUGINS:
                String converterClassOrAlias = Utils.getClass(config,classPropertyName).getName();
                try {
                    klass = pluginClass(delegatingLoader, converterClassOrAlias, RecordConverter.class);
                } catch (ClassNotFoundException e) {
                    throw new ConnectException(
                            "Failed to find any class that implements Converter and which name matches "
                                    + converterClassOrAlias + ", available converters are: "
                                    + pluginNames(delegatingLoader.converters())
                    );
                }
                break;
        }
        if (klass == null) {
            throw new ConnectException("Unable to initialize the Converter specified in '" + classPropertyName + "'");
        }

        // Configure the Converter using only the old configuration mechanism ...
        String configPrefix = classPropertyName + ".";
        Map<String, String> converterConfig = config.originalsWithPrefix(configPrefix);
        log.debug("Configuring the converter with configuration keys:{}{}", System.lineSeparator(), converterConfig.keySet());

        RecordConverter plugin;
        ClassLoader savedLoader = compareAndSwapLoaders(klass.getClassLoader());
        try {
            plugin = newPlugin(klass);
            plugin.configure(converterConfig);
        } finally {
            compareAndSwapLoaders(savedLoader);
        }
        return plugin;
    }


    public <T> List<T> newPlugins(List<String> klassNames, ConnectKeyValue config, Class<T> pluginKlass) {
        List<T> plugins = new ArrayList<>();
        if (klassNames != null) {
            for (String klassName : klassNames) {
                plugins.add(newPlugin(klassName, config, pluginKlass));
            }
        }
        return plugins;
    }

    protected static <T> T newPlugin(Class<T> klass) {
        ClassLoader savedLoader = compareAndSwapLoaders(klass.getClassLoader());
        try {
            return Utils.newInstance(klass);
        } catch (Throwable t) {
            throw new ConnectException("Instantiation error", t);
        } finally {
            compareAndSwapLoaders(savedLoader);
        }
    }


    public <T> T newPlugin(String klassName, ConnectKeyValue config, Class<T> pluginKlass) {
        T plugin;
        Class<? extends T> klass;
        try {
            klass = pluginClass(delegatingLoader, klassName, pluginKlass);
        } catch (ClassNotFoundException e) {
            String msg = String.format("Failed to find any class that implements %s and which "
                    + "name matches %s", pluginKlass, klassName);
            throw new ConnectException(msg);
        }
        ClassLoader savedLoader = compareAndSwapLoaders(klass.getClassLoader());
        try {
            plugin = newPlugin(klass);
        } finally {
            compareAndSwapLoaders(savedLoader);
        }
        return plugin;
    }

    private static <T> String pluginNames(Collection<PluginWrapper<T>> plugins) {
        return Utils.join(plugins, ", ");
    }


    protected <U> Class<? extends U> pluginClassFromConfig(
            ConnectKeyValue config,
            String propertyName,
            Class<U> pluginClass,
            Collection<PluginWrapper<U>> plugins
    ) {
        Class<?> klass = Utils.getClass(config, propertyName);
        if (pluginClass.isAssignableFrom(klass)) {
            return (Class<? extends U>) klass;
        }
        throw new ConnectException(
                "Failed to find any class that implements " + pluginClass.getSimpleName()
                        + " for the config "
                        + propertyName + ", available classes are: "
                        + pluginNames(plugins)
        );
    }

}
