/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.rya.indexing.pcj.fluo.demo;

import static com.google.common.base.Preconditions.checkNotNull;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.ZooKeeperInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.fluo.api.client.FluoAdmin.AlreadyInitializedException;
import org.apache.fluo.api.client.FluoClient;
import org.apache.fluo.api.client.FluoFactory;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.api.config.ObserverSpecification;
import org.apache.fluo.api.mini.MiniFluo;
import org.apache.log4j.ConsoleAppender;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.apache.log4j.PatternLayout;
import org.apache.rya.accumulo.AccumuloRdfConfiguration;
import org.apache.rya.accumulo.AccumuloRyaDAO;
import org.apache.rya.accumulo.instance.AccumuloRyaInstanceDetailsRepository;
import org.apache.rya.api.RdfCloudTripleStoreConfiguration;
import org.apache.rya.api.instance.RyaDetails;
import org.apache.rya.api.instance.RyaDetails.EntityCentricIndexDetails;
import org.apache.rya.api.instance.RyaDetails.FreeTextIndexDetails;
import org.apache.rya.api.instance.RyaDetails.JoinSelectivityDetails;
import org.apache.rya.api.instance.RyaDetails.PCJIndexDetails;
import org.apache.rya.api.instance.RyaDetails.ProspectorDetails;
import org.apache.rya.api.instance.RyaDetails.TemporalIndexDetails;
import org.apache.rya.api.instance.RyaDetailsRepository;
import org.apache.rya.api.instance.RyaDetailsRepository.RyaDetailsRepositoryException;
import org.apache.rya.indexing.pcj.fluo.app.export.rya.RyaExportParameters;
import org.apache.rya.indexing.pcj.fluo.app.observers.FilterObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.JoinObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.QueryResultObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.StatementPatternObserver;
import org.apache.rya.indexing.pcj.fluo.app.observers.TripleObserver;
import org.apache.rya.indexing.pcj.fluo.demo.Demo.DemoExecutionException;
import org.apache.rya.rdftriplestore.RdfCloudTripleStore;
import org.apache.rya.rdftriplestore.RyaSailRepository;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.eclipse.rdf4j.repository.RepositoryException;

import com.google.common.base.Optional;
import com.google.common.io.Files;

/**
 * Runs {@link Demo}s that require Rya and Fluo.
 */
public class DemoDriver {
    private static final Logger log = Logger.getLogger(DemoDriver.class);

    private static final String RYA_TABLE_PREFIX = "demo_";

    public static final String USE_MOCK_INSTANCE = ".useMockInstance";
    public static final String CLOUDBASE_INSTANCE = "sc.cloudbase.instancename";
    public static final String CLOUDBASE_USER = "sc.cloudbase.username";
    public static final String CLOUDBASE_PASSWORD = "sc.cloudbase.password";

    // Rya data store and connections.
    private static MiniAccumuloCluster accumulo = null;
    private static Connector accumuloConn = null;
    private static RyaSailRepository ryaRepo = null;
    private static RepositoryConnection ryaConn = null;

    // Fluo data store and connections.
    private static MiniFluo fluo = null;
    private static FluoClient fluoClient = null;

    public static void main(final String[] args) {
        setupLogging();

        // Setup the resources required to run the demo.
        try {
            log.info("Initializing resources used by the demo...");
            setupResources();
        } catch (final DemoInitializationException e) {
            log.error("Could not initialize the demo's resources. Exiting.", e);
            System.exit(-1);
        }
        log.info("");

        // Run the demo.
        try {
            new FluoAndHistoricPcjsDemo().execute(accumulo, accumuloConn, RYA_TABLE_PREFIX, ryaRepo, ryaConn, fluo, fluoClient);
        } catch (final DemoExecutionException e) {
            log.error("An exception was thrown durring demo execution. The demo can not continue.", e);
        }

        // Tear down the demo environment.
        log.info("Shutting down the demo...");
        shutdownResources();
        log.info("Demo exiting.");
    }

    private static void setupLogging() {
        // Turn off all the loggers and customize how they write to the console.
        final Logger rootLogger = LogManager.getRootLogger();
        rootLogger.setLevel(Level.OFF);
        final ConsoleAppender ca = (ConsoleAppender) rootLogger.getAppender("stdout");
        ca.setLayout(new PatternLayout("%-5p - %m%n"));


        // Turn the loggers used by the demo back on.
        log.setLevel(Level.INFO);
    }

    /**
     * Indicates a problem while initializing the demo's resources prevented it from starting.
     */
    private static final class DemoInitializationException extends Exception {
        private static final long serialVersionUID = 1L;

        public DemoInitializationException(final String message, final Exception cause) {
            super(message, cause);
        }
    }

    private static void setupResources() throws DemoInitializationException {
        try{
            // Initialize the Mini Accumulo that will be used to store Triples and get a connection to it.
            log.debug("Starting up the Mini Accumulo Cluster used by Rya.");
            accumulo = startMiniAccumulo();

            // Setup the Rya library to use the Mini Accumulo.
            log.debug("Starting up the Rya Repository.");
            ryaRepo = setupRya(accumulo);
            ryaConn = ryaRepo.getConnection();

            // Initialize the Mini Fluo that will be used to store created queries.
            log.debug("Starting up the Mini Fluo instance.");
            fluo = startMiniFluo();
            fluoClient = FluoFactory.newClient( fluo.getClientConfiguration() );
        } catch(final Exception e) {
            throw new DemoInitializationException("Could not run the demo because of a problem while initializing the mini resources.", e);
        }
    }

    private static void shutdownResources() {
        if(ryaConn != null) {
            try {
                log.debug("Shutting down Rya Connection.");
                ryaConn.close();
            } catch(final Exception e) {
                log.error("Could not shut down the Rya Connection.", e);
            }
        }

        if(ryaRepo != null) {
            try {
                log.debug("Shutting down Rya Repo.");
                ryaRepo.shutDown();
            } catch(final Exception e) {
                log.error("Could not shut down the Rya Repo.", e);
            }
        }

        if(accumulo != null) {
            try {
                log.debug("Shutting down the Mini Accumulo being used as a Rya store.");
                accumulo.stop();
            } catch(final Exception e) {
                log.error("Could not shut down the Mini Accumulo.", e);
            }
        }

        if(fluoClient != null) {
            try {
                log.debug("Shutting down Fluo Client.");
                fluoClient.close();
            } catch(final Exception e) {
                log.error("Could not shut down the Fluo Client.", e);
            }
        }

        if(fluo != null) {
            try {
                log.debug("Shutting down Mini Fluo.");
                fluo.close();
            } catch (final Exception e) {
                log.error("Could not shut down the Mini Fluo.", e);
            }
        }
    }

    /**
     * Setup a Mini Accumulo cluster that uses a temporary directory to store its data.
     *
     * @return A Mini Accumulo cluster.
     */
    private static MiniAccumuloCluster startMiniAccumulo() throws IOException, InterruptedException, AccumuloException, AccumuloSecurityException {
        final File miniDataDir = Files.createTempDir();

        // Setup and start the Mini Accumulo.
        final MiniAccumuloCluster accumulo = new MiniAccumuloCluster(miniDataDir, "password");
        accumulo.start();

        // Store a connector to the Mini Accumulo.
        final Instance instance = new ZooKeeperInstance(accumulo.getInstanceName(), accumulo.getZooKeepers());
        accumuloConn = instance.getConnector("root", new PasswordToken("password"));

        return accumulo;
    }

    /**
     * Format a Mini Accumulo to be a Rya repository.
     *
     * @param accumulo - The Mini Accumulo cluster Rya will sit on top of. (not null)
     * @return The Rya repository sitting on top of the Mini Accumulo.
     */
    private static RyaSailRepository setupRya(final MiniAccumuloCluster accumulo) throws AccumuloException, AccumuloSecurityException, RepositoryException, AlreadyInitializedException, RyaDetailsRepositoryException {
        checkNotNull(accumulo);

        // Setup the Rya Repository that will be used to create Repository Connections.
        final RdfCloudTripleStore ryaStore = new RdfCloudTripleStore();
        final AccumuloRyaDAO crdfdao = new AccumuloRyaDAO();
        crdfdao.setConnector(accumuloConn);

        // Setup Rya configuration values.
        final String ryaInstanceName = "demo_";

        final AccumuloRdfConfiguration conf = new AccumuloRdfConfiguration();
        conf.setTablePrefix("demo_");
        conf.setDisplayQueryPlan(true);

        conf.setBoolean(USE_MOCK_INSTANCE, true);
        conf.set(RdfCloudTripleStoreConfiguration.CONF_TBL_PREFIX, RYA_TABLE_PREFIX);
        conf.set(CLOUDBASE_USER, "root");
        conf.set(CLOUDBASE_PASSWORD, "password");
        conf.set(CLOUDBASE_INSTANCE, accumulo.getInstanceName());

        crdfdao.setConf(conf);
        ryaStore.setRyaDAO(crdfdao);

        final RyaSailRepository ryaRepo = new RyaSailRepository(ryaStore);
        ryaRepo.initialize();

        // Create Rya Details for the instance name.
        final RyaDetailsRepository detailsRepo = new AccumuloRyaInstanceDetailsRepository(accumuloConn, ryaInstanceName);

        final RyaDetails details = RyaDetails.builder()
                .setRyaInstanceName(ryaInstanceName)
                .setRyaVersion("0.0.0.0")
                .setFreeTextDetails( new FreeTextIndexDetails(true) )
                .setEntityCentricIndexDetails( new EntityCentricIndexDetails(true) )
              //RYA-215                .setGeoIndexDetails( new GeoIndexDetails(true) )
                .setTemporalIndexDetails( new TemporalIndexDetails(true) )
                .setPCJIndexDetails(
                        PCJIndexDetails.builder()
                            .setEnabled(true) )
                .setJoinSelectivityDetails( new JoinSelectivityDetails( Optional.absent() ) )
                .setProspectorDetails( new ProspectorDetails( Optional.absent() ))
                .build();

        detailsRepo.initialize(details);

        return ryaRepo;
    }

    /**
     * Setup a Mini Fluo cluster that uses a temporary directory to store its data.ll
     *
     * @return A Mini Fluo cluster.
     */
    private static MiniFluo startMiniFluo() {
        final File miniDataDir = Files.createTempDir();

        // Setup the observers that will be used by the Fluo PCJ Application.
        final List<ObserverSpecification> observers = new ArrayList<>();
        observers.add(new ObserverSpecification(TripleObserver.class.getName()));
        observers.add(new ObserverSpecification(StatementPatternObserver.class.getName()));
        observers.add(new ObserverSpecification(JoinObserver.class.getName()));
        observers.add(new ObserverSpecification(FilterObserver.class.getName()));

        // Provide export parameters child test classes may provide to the export observer.
        final HashMap<String, String> exportParams = new HashMap<>();
        final RyaExportParameters ryaParams = new RyaExportParameters(exportParams);
        ryaParams.setUseRyaBindingSetExporter(true);
        ryaParams.setAccumuloInstanceName(accumulo.getInstanceName());
        ryaParams.setZookeeperServers(accumulo.getZooKeepers());
        ryaParams.setExporterUsername("root");
        ryaParams.setExporterPassword("password");
        ryaParams.setRyaInstanceName("demo_");

        final ObserverSpecification exportObserverConfig = new ObserverSpecification(QueryResultObserver.class.getName(), exportParams);
        observers.add(exportObserverConfig);

        // Configure how the mini fluo will run.
        final FluoConfiguration config = new FluoConfiguration();
        config.setApplicationName("IntegrationTests");
        config.setMiniDataDir(miniDataDir.getAbsolutePath());
        config.addObservers(observers);

        final MiniFluo miniFluo = FluoFactory.newMiniFluo(config);
        return miniFluo;
    }
}
