/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.atlas.repository.graphdb.janus;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import org.apache.atlas.ApplicationProperties;
import org.apache.atlas.AtlasException;
import org.apache.atlas.repository.graphdb.AtlasGraph;
import org.apache.atlas.repository.graphdb.GraphDatabase;
import org.apache.atlas.repository.graphdb.janus.serializer.BigDecimalSerializer;
import org.apache.atlas.repository.graphdb.janus.serializer.BigIntegerSerializer;
import org.apache.atlas.repository.graphdb.janus.serializer.TypeCategorySerializer;
import org.apache.atlas.typesystem.types.DataTypes.TypeCategory;
import org.apache.commons.configuration.Configuration;
import org.apache.tinkerpop.gremlin.structure.io.graphson.GraphSONMapper;
import org.janusgraph.core.JanusGraph;
import org.janusgraph.core.JanusGraphException;
import org.janusgraph.core.JanusGraphFactory;
import org.janusgraph.core.schema.JanusGraphManagement;
import org.janusgraph.diskstorage.StandardIndexProvider;
import org.janusgraph.diskstorage.StandardStoreManager;
import org.janusgraph.diskstorage.solr.Solr6Index;
import org.janusgraph.graphdb.database.serialize.attribute.SerializableSerializer;
import org.janusgraph.graphdb.tinkerpop.JanusGraphIoRegistry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

import static org.apache.atlas.ApplicationProperties.DEFAULT_INDEX_RECOVERY;
import static org.apache.atlas.ApplicationProperties.INDEX_RECOVERY_CONF;

/**
 * Default implementation for Graph Provider that doles out JanusGraph.
 */
public class AtlasJanusGraphDatabase implements GraphDatabase<AtlasJanusVertex, AtlasJanusEdge> {
    private static final Logger LOG      = LoggerFactory.getLogger(AtlasJanusGraphDatabase.class);

    private static final String OLDER_STORAGE_EXCEPTION = "Storage version is incompatible with current client";

    /**
     * Constant for the configuration property that indicates the prefix.
     */
    public static final String GRAPH_PREFIX               = "atlas.graph";
    public static final String INDEX_BACKEND_CONF         = "index.search.backend";
    public static final String SOLR_ZOOKEEPER_URL         = "atlas.graph.index.search.solr.zookeeper-url";
    public static final String SOLR_ZOOKEEPER_URLS        = "atlas.graph.index.search.solr.zookeeper-urls";
    public static final String INDEX_BACKEND_LUCENE       = "lucene";
    public static final String INDEX_BACKEND_ES           = "elasticsearch";
    public static final String GRAPH_TX_LOG_CONF          = "tx.log-tx";
    public static final String GRAPH_TX_LOG_VERBOSE_CONF  = "tx.recovery.verbose";
    public static final String SOLR_INDEX_TX_LOG_TTL_CONF = "write.ahead.log.ttl.in.hours";
    public static final String GRAPH_TX_LOG_TTL_CONF      = "log.tx.ttl";
    public static final long   DEFAULT_GRAPH_TX_LOG_TTL   = 72; //Hrs

    private static volatile AtlasJanusGraph atlasGraphInstance = null;
    private static volatile JanusGraph graphInstance;

    public AtlasJanusGraphDatabase() {
        //update registry
        GraphSONMapper.build().addRegistry(JanusGraphIoRegistry.getInstance()).create();
    }

    public static Configuration getConfiguration() throws AtlasException {
        Configuration configProperties = ApplicationProperties.get();

        if (isEmbeddedSolr()) { // AtlasJanusGraphIndexClient.performRequestHandlerAction() fails for embedded-solr; disable freetext until this issue is resolved
            startEmbeddedSolr();

            configProperties.setProperty(ApplicationProperties.ENABLE_FREETEXT_SEARCH_CONF, false);
        }

        configProperties.setProperty(SOLR_ZOOKEEPER_URLS, configProperties.getStringArray(SOLR_ZOOKEEPER_URL));

        Configuration janusConfig = ApplicationProperties.getSubsetConfiguration(configProperties, GRAPH_PREFIX);

        //add serializers for non-standard property value types that Atlas uses
        janusConfig.addProperty("attributes.custom.attribute1.attribute-class", TypeCategory.class.getName());
        janusConfig.addProperty("attributes.custom.attribute1.serializer-class", TypeCategorySerializer.class.getName());

        //not ideal, but avoids making large changes to Atlas
        janusConfig.addProperty("attributes.custom.attribute2.attribute-class", ArrayList.class.getName());
        janusConfig.addProperty("attributes.custom.attribute2.serializer-class", SerializableSerializer.class.getName());

        janusConfig.addProperty("attributes.custom.attribute3.attribute-class", BigInteger.class.getName());
        janusConfig.addProperty("attributes.custom.attribute3.serializer-class", BigIntegerSerializer.class.getName());

        janusConfig.addProperty("attributes.custom.attribute4.attribute-class", BigDecimal.class.getName());
        janusConfig.addProperty("attributes.custom.attribute4.serializer-class", BigDecimalSerializer.class.getName());

        return janusConfig;
    }

    static {
        addHBase2Support();

        addSolr6Index();
    }

    private static void addHBase2Support() {
        try {
            Field field = StandardStoreManager.class.getDeclaredField("ALL_MANAGER_CLASSES");
            field.setAccessible(true);

            Field modifiersField = Field.class.getDeclaredField("modifiers");
            modifiersField.setAccessible(true);
            modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);

            Map<String, String> customMap = new HashMap<>(StandardStoreManager.getAllManagerClasses());
            customMap.put("hbase2", org.janusgraph.diskstorage.hbase2.HBaseStoreManager.class.getName());
            ImmutableMap<String, String> immap = ImmutableMap.copyOf(customMap);
            field.set(null, immap);

            LOG.debug("Injected HBase2 support - {}", org.janusgraph.diskstorage.hbase2.HBaseStoreManager.class.getName());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private static void addSolr6Index() {
        try {
            Field field = StandardIndexProvider.class.getDeclaredField("ALL_MANAGER_CLASSES");
            field.setAccessible(true);

            Field modifiersField = Field.class.getDeclaredField("modifiers");
            modifiersField.setAccessible(true);
            modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);

            Map<String, String> customMap = new HashMap<>(StandardIndexProvider.getAllProviderClasses());
            customMap.put("solr", Solr6Index.class.getName());
            ImmutableMap<String, String> immap = ImmutableMap.copyOf(customMap);
            field.set(null, immap);

            LOG.debug("Injected solr6 index - {}", Solr6Index.class.getName());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static JanusGraph getGraphInstance() {
        if (graphInstance == null) {
            synchronized (AtlasJanusGraphDatabase.class) {
                if (graphInstance == null) {
                    Configuration config;
                    try {
                        config = getConfiguration();
                    } catch (AtlasException e) {
                        throw new RuntimeException(e);
                    }

                    configureTxLogBasedIndexRecovery();

                    graphInstance      = initJanusGraph(config);
                    atlasGraphInstance = new AtlasJanusGraph();

                    validateIndexBackend(config);

                }
            }
        }
        return graphInstance;
    }

    @VisibleForTesting
    static JanusGraph initJanusGraph(Configuration config) {
        try {
            return JanusGraphFactory.open(config);
        } catch (JanusGraphException e) {
            LOG.warn("JanusGraphException: {}", e.getMessage());
            if (e.getMessage().startsWith(OLDER_STORAGE_EXCEPTION)) {
                LOG.info("Newer client is being used with older janus storage version. Setting allow-upgrade=true and reattempting connection");
                config.addProperty("graph.allow-upgrade", true);
                return JanusGraphFactory.open(config);
            } else {
                throw new RuntimeException(e);
            }
        }
    }

    public static void configureTxLogBasedIndexRecovery() {
        try {
            boolean  recoveryEnabled = ApplicationProperties.get().getBoolean(INDEX_RECOVERY_CONF, DEFAULT_INDEX_RECOVERY);
            long     ttl             = ApplicationProperties.get().getLong(SOLR_INDEX_TX_LOG_TTL_CONF, DEFAULT_GRAPH_TX_LOG_TTL);
            Duration txLogTtlSecs    = Duration.ofSeconds(Duration.ofHours(ttl).getSeconds());

            Map<String, Object> properties = new HashMap<String, Object>() {{
                put(GRAPH_TX_LOG_CONF, recoveryEnabled);
                put(GRAPH_TX_LOG_VERBOSE_CONF, recoveryEnabled);
                put(GRAPH_TX_LOG_TTL_CONF, txLogTtlSecs);
            }};

            updateGlobalConfiguration(properties);

            LOG.info("Tx Log-based Index Recovery: {}!", recoveryEnabled ? "Enabled" : "Disabled");
        } catch (Exception e) {
            LOG.error("Error: Failed!", e);
        }
    }

    private static void updateGlobalConfiguration(Map<String, Object> map) {
        JanusGraph           graph            = null;
        JanusGraphManagement managementSystem = null;

        try {
            graph            = initJanusGraph(getConfiguration());
            managementSystem = graph.openManagement();

            for (Map.Entry<String, Object> entry : map.entrySet()) {
                managementSystem.set(entry.getKey(), entry.getValue());
            }

            LOG.info("Global properties updated!: {}", map);
        } catch (Exception ex) {
            LOG.error("Error updating global configuration: {}", map, ex);
        } finally {
            if (managementSystem != null) {
                managementSystem.commit();
            }

            if (graph != null) {
                graph.close();
            }
        }
    }

    public static JanusGraph getBulkLoadingGraphInstance() {
        try {
            Configuration cfg = getConfiguration();
            cfg.setProperty("storage.batch-loading", true);
            return JanusGraphFactory.open(cfg);
        } catch (IllegalArgumentException ex) {
            LOG.error("getBulkLoadingGraphInstance: Failed!", ex);
        } catch (AtlasException ex) {
            LOG.error("getBulkLoadingGraphInstance: Failed!", ex);
        }

        return null;
    }

    public static void unload() {
        synchronized (AtlasJanusGraphDatabase.class) {

            if (graphInstance == null) {
                return;
            }
            graphInstance.tx().commit();
            graphInstance.close();
            graphInstance = null;
        }
    }

    static void validateIndexBackend(Configuration config) {
        String configuredIndexBackend = config.getString(INDEX_BACKEND_CONF);

        JanusGraphManagement managementSystem = getGraphInstance().openManagement();
        String currentIndexBackend = managementSystem.get(INDEX_BACKEND_CONF);
        managementSystem.commit();

        if (!configuredIndexBackend.equals(currentIndexBackend)) {
            throw new RuntimeException("Configured Index Backend " + configuredIndexBackend
                    + " differs from earlier configured Index Backend " + currentIndexBackend + ". Aborting!");
        }

    }

    @Override
    public boolean isGraphLoaded() {
        return graphInstance != null;
    }

    @Override
    public void initializeTestGraph() {
        //nothing to do

    }

    @Override
    public void cleanup() {
        JanusGraph g = getGraphInstance();
        try {
            if(g != null) {
                g.close();
            }
        } catch (Throwable t) {
            LOG.warn("Could not close test JanusGraph", t);
            t.printStackTrace();
        }

        try {
            if(g != null) {
                JanusGraphFactory.drop(g);
            }
        } catch (Throwable t) {
            LOG.warn("Could not clear test JanusGraph", t);
            t.printStackTrace();
        }

        if (isEmbeddedSolr()) {
            try {
                stopEmbeddedSolr();
            } catch (Throwable t) {
                LOG.warn("Could not stop local solr server", t);
            }
        }
    }

    @Override
    public AtlasGraph<AtlasJanusVertex, AtlasJanusEdge> getGraph() {
        getGraphInstance();
        return atlasGraphInstance;
    }

    @Override
    public AtlasGraph<AtlasJanusVertex, AtlasJanusEdge> getGraphBulkLoading() {
        return new AtlasJanusGraph(getBulkLoadingGraphInstance());
    }

    private static void startEmbeddedSolr() throws AtlasException {
        LOG.info("==> startEmbeddedSolr()");

        try {
            Class<?> localSolrRunnerClz = Class.forName("org.apache.atlas.runner.LocalSolrRunner");
            Method   startMethod        = localSolrRunnerClz.getMethod("start");

            startMethod.invoke(null);
        } catch (Exception excp) {
            LOG.error("startEmbeddedSolr(): failed", excp);

            throw new AtlasException("startEmbeddedSolr(): failed", excp);
        }

        LOG.info("<== startEmbeddedSolr()");
    }

    private static void stopEmbeddedSolr() throws AtlasException {
        LOG.info("==> stopEmbeddedSolr()");

        try {
            Class<?> localSolrRunnerClz = Class.forName("org.apache.atlas.runner.LocalSolrRunner");
            Method   stopMethod         = localSolrRunnerClz.getMethod("stop");

            stopMethod.invoke(null);
        } catch (Exception excp) {
            LOG.error("stopEmbeddedSolr(): failed", excp);

            throw new AtlasException("stopEmbeddedSolr(): failed", excp);
        }

        LOG.info("<== stopEmbeddedSolr()");
    }

    public static boolean isEmbeddedSolr() {
        boolean ret = false;

        try {
            Configuration conf     = ApplicationProperties.get();
            Object        property = conf.getProperty("atlas.graph.index.search.solr.embedded");

            if (property != null && property instanceof String) {
                ret = Boolean.valueOf((String) property);
            }
        } catch (AtlasException ignored) { }

        return ret;
    }
}
