/**
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
 * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
 * and limitations under the License.
 */

package org.apache.storm.scheduler.utils;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.Comparator;
import java.util.Map;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.utils.URIBuilder;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.apache.storm.DaemonConfig;
import org.apache.storm.utils.ServerConfigUtils;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.yaml.snakeyaml.Yaml;
import org.yaml.snakeyaml.constructor.SafeConstructor;

/**
 * A dynamic loader that can load scheduler configurations for user resource guarantees from Artifactory (an artifact repository manager).
 */
public class ArtifactoryConfigLoader implements IConfigLoader {
    protected static final String LOCAL_ARTIFACT_DIR = "scheduler_artifacts";
    static final String cacheFilename = "latest.yaml";
    private static final String DEFAULT_ARTIFACTORY_BASE_DIRECTORY = "/artifactory";
    private static final int DEFAULT_POLLTIME_SECS = 600;
    private static final int DEFAULT_TIMEOUT_SECS = 10;
    private static final String ARTIFACTORY_SCHEME_PREFIX = "artifactory+";

    private static final Logger LOG = LoggerFactory.getLogger(ArtifactoryConfigLoader.class);

    private Map<String, Object> conf;
    private int artifactoryPollTimeSecs = DEFAULT_POLLTIME_SECS;
    private boolean cacheInitialized = false;
    // Location of the file in the artifactory archive.  Also used to name file in cache.
    private String localCacheDir;
    private String baseDirectory = DEFAULT_ARTIFACTORY_BASE_DIRECTORY;
    private int lastReturnedTime = 0;
    private int timeoutSeconds = DEFAULT_TIMEOUT_SECS;
    private Map<String, Object> lastReturnedValue;
    private URI targetUri = null;
    private JSONParser jsonParser;
    private String scheme;

    public ArtifactoryConfigLoader(Map<String, Object> conf) {
        this.conf = conf;
        Integer thisTimeout = (Integer) conf.get(DaemonConfig.SCHEDULER_CONFIG_LOADER_TIMEOUT_SECS);
        if (thisTimeout != null) {
            timeoutSeconds = thisTimeout;
        }
        Integer thisPollTime = (Integer) conf.get(DaemonConfig.SCHEDULER_CONFIG_LOADER_POLLTIME_SECS);
        if (thisPollTime != null) {
            artifactoryPollTimeSecs = thisPollTime;
        }
        String thisBase = (String) conf.get(DaemonConfig.SCHEDULER_CONFIG_LOADER_ARTIFACTORY_BASE_DIRECTORY);
        if (thisBase != null) {
            baseDirectory = thisBase;
        }
        String uriString = (String) conf.get(DaemonConfig.SCHEDULER_CONFIG_LOADER_URI);
        if (uriString == null) {
            LOG.error("No URI defined in {} configuration.", DaemonConfig.SCHEDULER_CONFIG_LOADER_URI);
        } else {
            try {
                targetUri = new URI(uriString);
                scheme = targetUri.getScheme().substring(ARTIFACTORY_SCHEME_PREFIX.length());
            } catch (URISyntaxException e) {
                LOG.error("Failed to parse uri={}", uriString);
            }
        }
        jsonParser = new JSONParser();
    }

    /**
     * Load the configs associated with the configKey from the targetURI.
     * @param configKey The key from which we want to get the scheduler config.
     * @return The scheduler configuration if exists; null otherwise.
     */
    @Override
    public Map<String, Object> load(String configKey) {
        if (targetUri == null) {
            return null;
        }

        // Check for new file every so often
        int currentTimeSecs = Time.currentTimeSecs();
        if (lastReturnedValue != null && ((currentTimeSecs - lastReturnedTime) < artifactoryPollTimeSecs)) {
            LOG.debug("currentTimeSecs: {}; lastReturnedTime {}; artifactoryPollTimeSecs: {}. Returning our last map.",
                      currentTimeSecs, lastReturnedTime, artifactoryPollTimeSecs);
            return (Map<String, Object>) lastReturnedValue.get(configKey);
        }

        try {
            Map<String, Object> raw = loadFromUri(targetUri);
            if (raw != null) {
                return (Map<String, Object>) raw.get(configKey);
            }
        } catch (Exception e) {
            LOG.error("Failed to load from uri {}", targetUri);
        }
        return null;
    }

    /**
     * Protected so we can override this in unit tests.
     *
     * @param api null if we are trying to download artifact, otherwise a string to call REST api,
     *        e.g. "/api/storage"
     * @param artifact location of artifact
     * @param host Artifactory hostname
     * @param port Artifactory port
     * @return null on failure or the response string if return code is in 200 range
     */
    protected String doGet(String api, String artifact, String host, Integer port) {
        URIBuilder builder = new URIBuilder().setScheme(scheme).setHost(host).setPort(port);

        String path = null;
        if (api != null) {
            path = baseDirectory + "/" + api + "/" + artifact;
        } else {
            path = baseDirectory + "/" + artifact;
        }

        // Get rid of multiple '/' in url
        path = path.replaceAll("/[/]+", "/");
        builder.setPath(path);

        RequestConfig requestConfig = RequestConfig.custom().setConnectTimeout(timeoutSeconds * 1000).build();
        HttpClient httpclient = HttpClientBuilder.create().setDefaultRequestConfig(requestConfig).build();

        String returnValue;
        try {
            LOG.debug("About to issue a GET to {}", builder);
            HttpGet httpget = new HttpGet(builder.build());
            String responseBody;
            responseBody = httpclient.execute(httpget, GetStringResponseHandler.getInstance());
            returnValue = responseBody;
        } catch (Exception e) {
            LOG.error("Received exception while connecting to Artifactory", e);
            returnValue = null;
        }

        LOG.debug("Returning {}", returnValue);
        return returnValue;
    }

    private JSONObject getArtifactMetadata(String location, String host, Integer port) {
        String metadataStr = null;

        metadataStr = doGet("/api/storage", location, host, port);

        if (metadataStr == null) {
            return null;
        }

        JSONObject returnValue;
        try {
            returnValue = (JSONObject) jsonParser.parse(metadataStr);
        } catch (ParseException e) {
            LOG.error("Could not parse JSON string {}", metadataStr, e);
            return null;
        }

        return returnValue;
    }

    private String loadMostRecentArtifact(String location, String host, Integer port) {
        // Is this a directory or is it a file?
        JSONObject json = getArtifactMetadata(location, host, port);
        if (json == null) {
            LOG.error("got null metadata");
            return null;
        }
        String downloadUri = (String) json.get("downloadUri");

        // This means we are pointing at a file.
        if (downloadUri != null) {
            // Then get it and return the file as string.
            String returnValue = doGet(null, location, host, port);
            saveInArtifactoryCache(returnValue);
            return returnValue;
        }

        // This should mean that we were pointed at a directory.
        // Find the most recent child and load that.
        JSONArray msg = (JSONArray) json.get("children");
        if (msg == null || msg.size() == 0) {
            LOG.error("Expected directory children not present");
            return null;
        }
        JSONObject newest = (JSONObject) Collections.max(msg, new DirEntryCompare());
        if (newest == null) {
            LOG.error("Failed to find most recent artifact uri in {}", location);
            return null;
        }

        String uri = (String) newest.get("uri");
        if (uri == null) {
            LOG.error("Expected directory uri not present");
            return null;
        }
        String returnValue = doGet(null, location + uri, host, port);
        saveInArtifactoryCache(returnValue);
        return returnValue;
    }

    private void updateLastReturned(Map ret) {
        lastReturnedTime = Time.currentTimeSecs();
        lastReturnedValue = ret;
    }

    private Map<String, Object> loadFromFile(File file) {
        Map<String, Object> ret = null;

        try {
            ret = (Map<String, Object>) Utils.readYamlFile(file.getCanonicalPath());
        } catch (IOException e) {
            LOG.error("Filed to load from file. Exception: {}", e.getMessage());
        }

        if (ret != null) {
            try {
                LOG.debug("returning a new map from file {}", file.getCanonicalPath());
            } catch (java.io.IOException e) {
                LOG.debug("Could not get PATH from file object in debug print. Ignoring");
            }
            return ret;
        }

        return null;
    }

    private Map<String, Object> getLatestFromCache() {
        String localFileName = localCacheDir + File.separator + cacheFilename;
        return loadFromFile(new File(localFileName));
    }

    private void saveInArtifactoryCache(String yamlData) {
        if (yamlData == null) {
            LOG.warn("Will not save null data into the artifactory cache");
            return;
        }

        String localFileName = localCacheDir + File.separator + cacheFilename;

        File cacheFile = new File(localFileName);
        try (FileOutputStream fos = new FileOutputStream(cacheFile)) {
            fos.write(yamlData.getBytes());
            fos.flush();
        } catch (IOException e) {
            LOG.error("Received exception when writing file {}.  Attempting delete", localFileName, e);
            try {
                cacheFile.delete();
            } catch (Exception deleteException) {
                LOG.error("Received exception when deleting file {}.", localFileName, deleteException);
            }
        }
    }

    private void makeArtifactoryCache(String location) throws IOException {
        // First make the cache dir
        String localDirName = ServerConfigUtils.masterLocalDir(conf) + File.separator + LOCAL_ARTIFACT_DIR;
        File dir = new File(localDirName);
        if (!dir.exists()) {
            dir.mkdirs();
        }

        localCacheDir = localDirName + File.separator + location.replaceAll(File.separator, "_");
        dir = new File(localCacheDir);
        if (!dir.exists()) {
            dir.mkdir();
        }
        cacheInitialized = true;
    }

    private Map<String, Object> loadFromUri(URI uri) throws IOException {
        String host = uri.getHost();
        Integer port = uri.getPort();
        String location = uri.getPath();
        if (location.toLowerCase().startsWith(baseDirectory.toLowerCase())) {
            location = location.substring(baseDirectory.length());
        }

        if (!cacheInitialized) {
            makeArtifactoryCache(location);
        }

        // Get the most recent artifact as a String, and then parse the yaml
        String yamlConfig = loadMostRecentArtifact(location, host, port);

        // If we failed to get anything from Artifactory try to get it from our local cache
        if (yamlConfig == null) {
            Map<String, Object> ret = getLatestFromCache();
            updateLastReturned(ret);
            return ret;
        }

        // Now parse it and return the map.
        Yaml yaml = new Yaml(new SafeConstructor());
        Map ret = null;
        try {
            ret = (Map) yaml.load(yamlConfig);
        } catch (Exception e) {
            LOG.error("Could not parse yaml.");
            return null;
        }

        if (ret != null) {
            LOG.debug("returning a new map from Artifactory");
            updateLastReturned(ret);
            return ret;
        }

        return null;
    }

    /**
     * A private class used to check the response coming back from httpclient.
     */
    private static class GetStringResponseHandler implements ResponseHandler<String> {
        private static GetStringResponseHandler singleton = null;

        /**
         * Get instance.
         * @return a singleton httpclient GET response handler
         */
        public static GetStringResponseHandler getInstance() {
            if (singleton == null) {
                singleton = new GetStringResponseHandler();
            }
            return singleton;
        }

        /**
         * Handle response.
         * @param response The http response to verify.
         * @return null on failure or the response string if return code is in 200 range
         */
        @Override
        public String handleResponse(final HttpResponse response) throws IOException {
            int status = response.getStatusLine().getStatusCode();
            HttpEntity entity = response.getEntity();
            String entityString = (entity != null ? EntityUtils.toString(entity) : null);
            if (status >= 200 && status < 300) {
                return entityString;
            } else {
                LOG.error("Got unexpected response code {}; entity: {}", status, entityString);
                return null;
            }
        }
    }

    private class DirEntryCompare implements Comparator<JSONObject> {

        @Override
        public int compare(JSONObject o1, JSONObject o2) {
            return ((String) o1.get("uri")).compareTo((String) o2.get("uri"));
        }
    }
}