/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.sling.distribution.journal.it;


import static java.lang.Boolean.getBoolean;
import static java.lang.String.format;
import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap;
import static org.apache.kafka.clients.admin.AdminClient.create;
import static org.apache.kafka.clients.producer.ProducerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.sling.testing.paxexam.SlingOptions.paxTinybundles;
import static org.apache.sling.testing.paxexam.SlingOptions.slingDistribution;
import static org.apache.sling.testing.paxexam.SlingOptions.slingQuickstartOak;
import static org.ops4j.pax.exam.Constants.START_LEVEL_SYSTEM_BUNDLES;
import static org.ops4j.pax.exam.CoreOptions.bootDelegationPackage;
import static org.ops4j.pax.exam.CoreOptions.bundle;
import static org.ops4j.pax.exam.CoreOptions.composite;
import static org.ops4j.pax.exam.CoreOptions.frameworkStartLevel;
import static org.ops4j.pax.exam.CoreOptions.keepCaches;
import static org.ops4j.pax.exam.CoreOptions.mavenBundle;
import static org.ops4j.pax.exam.CoreOptions.repository;
import static org.ops4j.pax.exam.CoreOptions.systemProperty;
import static org.ops4j.pax.exam.CoreOptions.url;
import static org.ops4j.pax.exam.CoreOptions.vmOption;
import static org.ops4j.pax.exam.CoreOptions.when;
import static org.ops4j.pax.exam.cm.ConfigurationAdminOptions.factoryConfiguration;
import static org.ops4j.pax.exam.cm.ConfigurationAdminOptions.newConfiguration;
import static org.osgi.util.converter.Converters.standardConverter;

import java.io.File;
import java.util.Map;

import javax.inject.Inject;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.kafka.KafkaClientProvider;
import org.apache.sling.distribution.journal.kafka.KafkaEndpoint;
import org.apache.sling.distribution.serialization.impl.vlt.VaultDistributionPackageBuilderFactory;
import org.apache.sling.testing.paxexam.SlingOptions;
import org.apache.sling.testing.paxexam.TestSupport;
import org.ops4j.pax.exam.ConfigurationManager;
import org.ops4j.pax.exam.Constants;
import org.ops4j.pax.exam.CoreOptions;
import org.ops4j.pax.exam.Option;
import org.ops4j.pax.exam.options.CompositeOption;
import org.ops4j.pax.exam.options.DefaultCompositeOption;
import org.ops4j.pax.exam.options.libraries.JUnitBundlesOption;
import org.osgi.framework.BundleContext;
import org.osgi.service.cm.ConfigurationAdmin;

import com.google.common.collect.ImmutableMap;

/**
 * Shared Pax Exam support for the journal distribution integration tests.
 * <p>
 * Provides the option sets used to provision a Sling quickstart with the journal
 * distribution bundles and the Kafka client, the OSGi configurations for author
 * and publish style instances, and static helpers to create the Kafka topics and
 * a {@link MessagingProvider} from the test process.
 */
public class DistributionTestSupport extends TestSupport {
    public static final String TOPIC_PACKAGE = "aemdistribution_package";
    public static final String TOPIC_DISCOVERY = "aemdistribution_discovery";
    public static final String TOPIC_COMMAND = "aemdistribution_command";
    public static final String TOPIC_STATUS = "aemdistribution_status";
    public static final String TOPIC_EVENT = "aemdistribution_event";

    /** HTTP port used unless {@link #withHttpPort(int)} is called. */
    private static final int DEFAULT_HTTP_PORT = 8181;

    /** Debug port used by {@link #remoteDebug()}. */
    private static final int DEFAULT_DEBUG_PORT = 5005;

    @Inject
    protected BundleContext bundleContext;

    @Inject
    protected ConfigurationAdmin configAdmin;

    private int httpPort = DEFAULT_HTTP_PORT;

    /**
     * Sets the HTTP port written into the {@code org.apache.felix.http} configuration
     * of the quickstart.
     *
     * @param httpPort the port to configure
     * @return this instance, to allow call chaining
     */
    public DistributionTestSupport withHttpPort(int httpPort) {
        this.httpPort = httpPort;
        return this;
    }

    /**
     * Deletes the directory returned by {@code workingDirectory()} and builds the base
     * configuration on top of it.
     *
     * @return the base options, see {@link #baseConfiguration(String)}
     */
    public Option baseConfiguration() {
        String workingDirectory = workingDirectory();
        FileUtil.deleteDir(new File(workingDirectory));
        return baseConfiguration(workingDirectory);
    }

    /**
     * Builds the options shared by all instances: the Sling quickstart, Kafka client,
     * journal distribution bundles, Sling distribution and the test libraries.
     * <p>
     * When the system property {@code isDebugEnabled} is {@code true} (for example
     * {@code mvn install -DisDebugEnabled=true}) the remote debug option is added as well.
     * <p>
     * NOTE(review): the Pax Exam working directory option is always taken from
     * {@code workingDirectory()}; {@code baseDirectory} is only handed to
     * {@link #slingQuickstart(String)}. Confirm this is intended when passing a
     * different directory.
     *
     * @param baseDirectory directory below which the quickstart instance and the shared
     *                      data store are placed
     * @return the composed options
     */
    public Option baseConfiguration(String baseDirectory) {
        // Patch versions of features provided by SlingOptions
        SlingOptions.versionResolver.setVersionFromProject("org.apache.sling", "org.apache.sling.commons.mime");
        SlingOptions.versionResolver.setVersionFromProject("org.apache.sling", "org.apache.sling.commons.metrics");
        SlingOptions.versionResolver.setVersionFromProject("org.apache.sling", "org.apache.sling.distribution.core");
        SlingOptions.versionResolver.setVersionFromProject("org.apache.sling", "org.apache.sling.distribution.journal");
        SlingOptions.versionResolver.setVersionFromProject("org.apache.sling", "org.apache.sling.distribution.journal.messages");
        SlingOptions.versionResolver.setVersionFromProject("org.apache.sling", "org.apache.sling.distribution.journal.kafka");
        SlingOptions.versionResolver.setVersionFromProject("io.dropwizard.metrics", "metrics-core");
        SlingOptions.versionResolver.setVersion("org.slf4j", "log4j-over-slf4j", "1.7.6");

        Option baseOptions = composite(
                failOnUnresolvedBundles(),
                keepCaches(),
                localMavenRepo(),
                //repository("https://repository.apache.org/snapshots/").id("apache-snapshots").allowSnapshots(),
                repository("https://repo1.maven.org/maven2").id("central"),
                CoreOptions.workingDirectory(workingDirectory()),
                mavenBundle().groupId("org.apache.sling").artifactId("org.apache.sling.testing.paxexam").versionAsInProject(),
                paxTinybundles(),
                SlingOptions.logback(),
                mavenBundle().groupId("org.slf4j").artifactId("log4j-over-slf4j").version(SlingOptions.versionResolver),
                mvn("com.google.code.gson", "gson"),

                // The base sling Quickstart
                slingQuickstart(baseDirectory),
                mavenBundle().groupId("org.apache.felix").artifactId("org.apache.felix.webconsole.plugins.ds").version(SlingOptions.versionResolver),
                mavenBundle().groupId("org.apache.sling").artifactId("org.apache.sling.commons.metrics").version(SlingOptions.versionResolver),
                mvn("org.apache.felix", "org.apache.felix.rootcause"),
                mvn("org.apache.felix", "org.apache.felix.systemready"),
                kafka(),

                // The bundle built (org.apache.sling.distribution.journal)
                mvn("org.apache.sling", "org.apache.sling.distribution.journal"),
                mvn("org.apache.sling", "org.apache.sling.distribution.journal.messages"),
                mvn("org.apache.sling", "org.apache.sling.distribution.journal.kafka"),

                // distribution bundles
                slingDistribution(),

                // testing
                //slingResourcePresence(), // see https://github.com/apache/sling-org-apache-sling-resource-presence
                jsoup(),
                myJunitBundles()

        );

        // Remote debugger on the forked JVM
        // Run with mvn install -DisDebugEnabled=true
        return getBoolean("isDebugEnabled")
                ? composite(remoteDebug(), baseOptions)
                : baseOptions;
    }

    /**
     * JUnit related bundles: the reduced test system options from
     * {@link #defaultTestSystemOptions()}, the JUnit bundles, the JUnit invoker,
     * mockito-all, hamcrest and awaitility.
     *
     * @return the composed options
     */
    public static CompositeOption myJunitBundles() {
        return new DefaultCompositeOption(
                composite(defaultTestSystemOptions()),
                new JUnitBundlesOption(),
                systemProperty("pax.exam.invoker").value("junit"),
                mvn("org.mockito", "mockito-all"),
                mvn("org.apache.servicemix.bundles", "org.apache.servicemix.bundles.hamcrest"),
                mvn("org.awaitility", "awaitility"),
                bundle("link:classpath:META-INF/links/org.ops4j.pax.exam.invoker.junit.link"));
    }

    /**
     * Dependencies for mockito 2
     *
     * @return objenesis, byte-buddy, byte-buddy-agent and mockito-core in the project versions
     */
    public static Option mockito2() {
        return composite(
                mvn("org.objenesis", "objenesis"),
                mvn("net.bytebuddy", "byte-buddy"),
                mvn("net.bytebuddy", "byte-buddy-agent"),
                mvn("org.mockito", "mockito-core")
                );
    }

    /**
     * Standard test system options with only the atinject bundle removed, as it
     * collides with the one provided in Sling.
     *
     * @return the system bundle options, all started at {@code START_LEVEL_SYSTEM_BUNDLES}
     */
    private static Option[] defaultTestSystemOptions() {
        ConfigurationManager cm = new ConfigurationManager();
        String logging = cm.getProperty(Constants.EXAM_LOGGING_KEY,
            Constants.EXAM_LOGGING_PAX_LOGGING);

        return new Option[] {
            bootDelegationPackage("sun.*"),
            frameworkStartLevel(Constants.START_LEVEL_TEST_BUNDLE),
            url("link:classpath:META-INF/links/org.ops4j.pax.exam.link").startLevel(
                START_LEVEL_SYSTEM_BUNDLES),
            url("link:classpath:META-INF/links/org.ops4j.pax.exam.inject.link").startLevel(
                START_LEVEL_SYSTEM_BUNDLES),
            url("link:classpath:META-INF/links/org.ops4j.pax.extender.service.link").startLevel(
                START_LEVEL_SYSTEM_BUNDLES),
            url("link:classpath:META-INF/links/org.osgi.compendium.link").startLevel(
                START_LEVEL_SYSTEM_BUNDLES),

            // The pax logging api is only added when pax logging is the configured exam logging
            when(logging.equals(Constants.EXAM_LOGGING_PAX_LOGGING)).useOptions(
                url("link:classpath:META-INF/links/org.ops4j.pax.logging.api.link").startLevel(
                    START_LEVEL_SYSTEM_BUNDLES)),

            url("link:classpath:META-INF/links/org.ops4j.base.link").startLevel(
                START_LEVEL_SYSTEM_BUNDLES),
            url("link:classpath:META-INF/links/org.ops4j.pax.swissbox.core.link").startLevel(
                START_LEVEL_SYSTEM_BUNDLES),
            url("link:classpath:META-INF/links/org.ops4j.pax.swissbox.extender.link").startLevel(
                START_LEVEL_SYSTEM_BUNDLES),
            url("link:classpath:META-INF/links/org.ops4j.pax.swissbox.framework.link").startLevel(
                START_LEVEL_SYSTEM_BUNDLES),
            url("link:classpath:META-INF/links/org.ops4j.pax.swissbox.lifecycle.link").startLevel(
                START_LEVEL_SYSTEM_BUNDLES),
            url("link:classpath:META-INF/links/org.ops4j.pax.swissbox.tracker.link").startLevel(
                START_LEVEL_SYSTEM_BUNDLES),
            // The link META-INF/links/org.apache.geronimo.specs.atinject.link is left out
            // on purpose: it collides with the inject bundle provided by Sling.
        };
    }

    /**
     * Quickstart option using {@code <baseDirectory>/instance} as instance directory,
     * {@code <baseDirectory>/shareddatastore} as data store directory and the HTTP port
     * configured on this instance.
     *
     * @param baseDirectory parent directory of the instance and data store directories
     * @return the quickstart options
     */
    protected Option slingQuickstart(String baseDirectory) {
        final String workingDirectory = String.format("%s/instance", baseDirectory);
        final String dataStoreDirectory = String.format("%s/shareddatastore", baseDirectory);
        System.out.println(String.format("Quickstart with workingDirectory %s, port %s", workingDirectory, httpPort));
        return slingQuickstartOakSharedBlobstore(workingDirectory, dataStoreDirectory, httpPort);
    }

    /**
     * OSGI configurations targeted to author and publish instances, using an empty
     * journal endpoint.
     *
     * @return the composed configuration options
     */
    public static Option defaultOsgiConfigs() {
        return defaultOsgiConfigs("");
    }

    /**
     * OSGI configurations targeted to author and publish instances
     *
     * @param journalEndpoint NOTE(review): currently not used by any of the configurations
     *                        created here; kept for callers that pass it
     * @return the composed configuration options
     */
    protected static Option defaultOsgiConfigs(String journalEndpoint) {
        return composite(
                newConfiguration("org.apache.sling.jcr.resource.internal.JcrSystemUserValidator")
                    .put("allow.only.system.user", false).asOption(),

                // For production the users would be: replication-service,content-writer-service
                factoryConfiguration("org.apache.sling.serviceusermapping.impl.ServiceUserMapperImpl.amended")
                        .put("user.mapping", new String[]{"org.apache.sling.distribution.journal:bookkeeper=admin","org.apache.sling.distribution.journal:importer=admin"})
                        .asOption(),

                factoryConfiguration(VaultDistributionPackageBuilderFactory.class.getName())
                        .put("name", "journal")
                        .put("type", "inmemory")
                        .put("useBinaryReferences", "true")
                        .put("aclHandling", "IGNORE")
                        .put("package.filters", new String[]{"/home/users|-.*/.tokens", "/home/users|-.*/rep:cache"})
                        .put("property.filters", new String[]{"/|-^.*/cq:lastReplicated|-^.*/cq:lastReplicatedBy|-^.*/cq:lastReplicationAction"})
                        .asOption(),

                newConfiguration("org.apache.sling.distribution.journal.kafka.KafkaClientProvider")
                    .asOption(),

                newConfiguration("org.apache.sling.distribution.component.impl.DistributionComponentFactoryMap")
                        .put("mapping.agent", new String[]{//
                                "pub:org.apache.sling.distribution.journal.impl.publisher.DistributionPublisherFactory"})
                        .asOption(),

                newConfiguration("org.apache.sling.distribution.journal.impl.shared.JournalAvailableChecker")
                        .put("scheduler.period", 1L)
                        .asOption()

        );

    }

    /**
     * OSGI configurations targeted to the author instances only
     *
     * @return a publisher agent named {@code agent1} plus the resource providers for
     *         agents, exporters and importers
     */
    public static Option authorOsgiConfigs() {
        return composite(
                factoryConfiguration("org.apache.sling.distribution.journal.impl.publisher.DistributionPublisherFactory")
                        .put("name", "agent1")
                        .put("packageBuilder.target", "(name=journal)")
                        .asOption(),
                factoryConfiguration("org.apache.sling.distribution.resources.impl.DistributionServiceResourceProviderFactory")
                    .put("kind", "agent")
                    .put("provider.roots", "/libs/sling/distribution/services/agents")
                    .asOption(),
                 factoryConfiguration("org.apache.sling.distribution.resources.impl.DistributionServiceResourceProviderFactory")
                    .put("kind", "exporter")
                    .put("provider.roots", "/libs/sling/distribution/services/exporters")
                    .asOption(),
                 factoryConfiguration("org.apache.sling.distribution.resources.impl.DistributionServiceResourceProviderFactory")
                    .put("kind", "importer")
                    .put("provider.roots", "/libs/sling/distribution/services/importers")
                    .asOption()
        );

    }

    /**
     * OSGI configuration targeted to the publish instances only, using the agent name
     * {@code subscriber-agent1}.
     *
     * @return the composed configuration options
     */
    protected static Option publishOsgiConfigs() {
        return publishOsgiConfigs("subscriber-agent1");
    }

    /**
     * OSGI configuration targeted to the publish instances only
     *
     * @param agentName name of the subscriber agent
     * @return the configuration for an editable subscriber without staging precondition
     */
    public static Option publishOsgiConfigs(String agentName) {
        return publishOsgiConfigs(agentName, true, false);
    }

    /**
     * OSGI configuration targeted to the publish instances only
     *
     * @param agentName           name of the subscriber agent
     * @param editable            value of the subscriber's {@code editable} property
     * @param stagingPrecondition when {@code true} the precondition target is
     *                            {@code (name=staging)}, otherwise {@code (name=default)}
     * @return the agent resource provider and the subscriber configuration
     */
    protected static Option publishOsgiConfigs(String agentName, boolean editable, boolean stagingPrecondition) {
        return composite(
                factoryConfiguration("org.apache.sling.distribution.resources.impl.DistributionServiceResourceProviderFactory")
                        .put("kind", "agent")
                        .put("provider.roots", "/libs/sling/distribution/services/agents")
                        .asOption(),

                factoryConfiguration("org.apache.sling.distribution.journal.impl.subscriber.DistributionSubscriberFactory")
                        .put("name", agentName)
                        .put("agentNames", new String[]{"agent1"})
                        .put("packageBuilder.target", "(name=journal)")
                        .put("precondition.target",  stagingPrecondition ? "(name=staging)" : "(name=default)")
                        .put("editable", editable)
                        .put("announceDelay", "500")
                        .asOption());
    }


    /**
     * Remote debug option on the default port 5005.
     *
     * @return the VM option enabling the debugger
     */
    protected static Option remoteDebug() {
        return remoteDebug(DEFAULT_DEBUG_PORT);
    }

    /**
     * VM option that starts a JDWP socket server (not suspended) in the forked JVM.
     *
     * @param debugPort port the debugger listens on
     * @return the VM option enabling the debugger
     */
    protected static Option remoteDebug(int debugPort) {
        System.out.println(String.format("Remote debugger on port: %s", debugPort));
        // The address must follow the requested port; it used to be fixed to 5005.
        // Plain concatenation keeps the digits locale independent.
        return vmOption("-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=" + debugPort);
    }

    /**
     * Option for a quickstart running an Oak repository with a nodestore and a shared data store.
     *
     * @param workingDirectory   instance directory; sling home is {@code <workingDirectory>/sling}
     * @param dataStoreDirectory path configured on the file data store
     * @param httpPort           port configured on {@code org.apache.felix.http}
     * @return the composed options
     */
    private static Option slingQuickstartOakSharedBlobstore(String workingDirectory, String dataStoreDirectory, int httpPort) {
        String slingHome = String.format("%s/sling", workingDirectory);
        String repositoryHome = String.format("%s/repository", slingHome);
        String localIndexDir = String.format("%s/index", repositoryHome);
        return composite(

                slingQuickstartOak(),

                mavenBundle()
                        .groupId("org.apache.jackrabbit")
                        .artifactId("oak-lucene")
                        .version(SlingOptions.versionResolver),

                mavenBundle()
                        .groupId("org.apache.jackrabbit")
                        .artifactId("oak-segment-tar")
                        .version(SlingOptions.versionResolver),

                mavenBundle()
                        .groupId("org.apache.sling")
                        .artifactId("org.apache.sling.jcr.oak.server")
                        .version(SlingOptions.versionResolver),

                newConfiguration("org.apache.felix.http")
                        .put("org.osgi.service.http.port", httpPort).asOption(),

                newConfiguration("org.apache.jackrabbit.oak.segment.SegmentNodeStoreService")
                        .put("customBlobStore", true)
                        .put("repository.home", repositoryHome)
                        .put("name", "NodeStore with custom blob store").asOption(),

                newConfiguration("org.apache.jackrabbit.oak.plugins.blob.datastore.FileDataStore")
                        .put("path", dataStoreDirectory)
                        .put("minRecordLength", 16384).asOption(),

                newConfiguration("org.apache.jackrabbit.oak.plugins.index.lucene.LuceneIndexProviderService")
                        .put("localIndexDir", localIndexDir).asOption()
        );
    }

    /**
     * Kafka client bundle together with the jackson bundles provisioned alongside it.
     *
     * @return the composed options
     */
    private static Option kafka() {
        return composite(
                mvn("com.fasterxml.jackson.core", "jackson-core"),
                mvn("com.fasterxml.jackson.core", "jackson-annotations"),
                mvn("com.fasterxml.jackson.core", "jackson-databind"),
                mvn("org.apache.servicemix.bundles", "org.apache.servicemix.bundles.kafka-clients")
        );
    }

    /**
     * Maven bundle option whose version is taken from the project.
     *
     * @param groupId    maven group id
     * @param artifactId maven artifact id
     * @return the bundle option
     */
    public static Option mvn(String groupId, String artifactId) {
        return mavenBundle().groupId(groupId).artifactId(artifactId).versionAsInProject();
    }

    /**
     * The jsoup bundle in the project version.
     *
     * @return the bundle option
     */
    private static Option jsoup() {
        return mvn("org.jsoup", "jsoup");
    }

    /**
     * Creates a Kafka topic with one partition and replication factor one and waits
     * for the creation to complete.
     *
     * @param topicName name of the topic to create
     * @throws RuntimeException if the topic could not be created or the calling thread
     *                          was interrupted while waiting; in the latter case the
     *                          interrupt status of the thread is restored
     */
    public static void createTopic(String topicName) {
        NewTopic newTopic = new NewTopic(topicName, 1, (short) 1);
        try (AdminClient admin = create(singletonMap(BOOTSTRAP_SERVERS_CONFIG,
                standardConverter().convert(kafkaProperties()).to(KafkaEndpoint.class).kafkaBootstrapServers()))) {
            CreateTopicsResult result = admin.createTopics(singletonList(newTopic));
            result.values().get(topicName).get();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to the caller instead of silently consuming it
            Thread.currentThread().interrupt();
            throw new RuntimeException(format("Failed to create topic %s", topicName), e);
        } catch (Exception e) {
            throw new RuntimeException(format("Failed to create topic %s", topicName), e);
        }
    }

    /**
     * Creates the discovery, package, command, status and event topics.
     *
     * @throws RuntimeException if any of the topics could not be created
     */
    public static void createTopics() {
        createTopic(TOPIC_DISCOVERY);
        createTopic(TOPIC_PACKAGE);
        createTopic(TOPIC_COMMAND);
        createTopic(TOPIC_STATUS);
        createTopic(TOPIC_EVENT);
    }

    /**
     * Creates a {@link KafkaClientProvider} and activates it with the test Kafka properties.
     *
     * @return the activated provider
     */
    public static MessagingProvider createProvider() {
        KafkaClientProvider provider = new KafkaClientProvider();
        provider.activate(standardConverter().convert(kafkaProperties()).to(KafkaEndpoint.class));
        return provider;
    }

    /**
     * Properties converted into a {@link KafkaEndpoint} for the test side Kafka clients.
     * Settings not listed here (for example the bootstrap servers) presumably fall back
     * to the defaults declared on {@link KafkaEndpoint} - verify there.
     *
     * @return an immutable map of endpoint properties
     */
    private static Map<String,String> kafkaProperties() {
         return ImmutableMap.of(
                "kafkaDefaultApiTimeout", "5000",
                "kafkaConnectTimeout", "32000");
    }

}
