/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.tinkerpop.gremlin.console.jsr223;

import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.tinkerpop.gremlin.driver.Client;
import org.apache.tinkerpop.gremlin.driver.Cluster;
import org.apache.tinkerpop.gremlin.driver.RequestOptions;
import org.apache.tinkerpop.gremlin.driver.Result;
import org.apache.tinkerpop.gremlin.driver.ResultSet;
import org.apache.tinkerpop.gremlin.driver.Tokens;
import org.apache.tinkerpop.gremlin.driver.exception.ResponseException;
import org.apache.tinkerpop.gremlin.driver.message.ResponseStatusCode;
import org.apache.tinkerpop.gremlin.jsr223.console.GremlinShellEnvironment;
import org.apache.tinkerpop.gremlin.jsr223.console.RemoteAcceptor;
import org.apache.tinkerpop.gremlin.jsr223.console.RemoteException;
import org.apache.tinkerpop.gremlin.structure.util.ElementHelper;
import org.apache.tinkerpop.gremlin.util.Gremlin;

import javax.security.sasl.SaslException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;

/**
 * A {@link RemoteAcceptor} that takes input from the console and sends it to Gremlin Server over the standard
 * Java driver.
 * <p>
 * The acceptor holds the {@link Cluster} and {@link Client} established by {@link #connect(List)}, the request
 * timeout and alias settings established by {@link #configure(List)}, and uses them for every script passed to
 * {@link #submit(List)}.
 *
 * @author Stephen Mallette (http://stephen.genoprime.com)
 * @author Marko A. Rodriguez (http://markorodriguez.com)
 */
public class DriverRemoteAcceptor implements RemoteAcceptor {
    /** Timeout value meaning that no per-request timeout is sent with the request. */
    public static final int NO_TIMEOUT = 0;

    /** User agent attached to every request submitted by this acceptor. */
    public static final String USER_AGENT = "Gremlin Console/" + Gremlin.version();

    private Cluster currentCluster;
    private Client currentClient;
    private int timeout = NO_TIMEOUT;
    private Map<String,String> aliases = new HashMap<>();
    private Optional<String> session = Optional.empty();

    private static final String TOKEN_RESET = "reset";
    private static final String TOKEN_SHOW = "show";

    private static final String TOKEN_NONE = "none";
    private static final String TOKEN_TIMEOUT = "timeout";
    private static final String TOKEN_ALIAS = "alias";
    private static final String TOKEN_SESSION = "session";
    private static final String TOKEN_SESSION_MANAGED = "session-managed";
    private static final String TOKEN_HELP = "help";
    private static final List<String> POSSIBLE_TOKENS = Arrays.asList(TOKEN_TIMEOUT, TOKEN_ALIAS, TOKEN_HELP);

    private final GremlinShellEnvironment shellEnvironment;

    /**
     * Creates an acceptor bound to the given shell environment, which is used to read and write shell variables
     * and to print warnings returned by the server.
     */
    public DriverRemoteAcceptor(final GremlinShellEnvironment shellEnvironment) {
        this.shellEnvironment = shellEnvironment;
    }

    /**
     * Establishes the {@link Cluster} and {@link Client} used by subsequent calls to {@link #submit(List)}.
     *
     * @param args the first argument is either the path to a configuration file or the name of a shell variable
     *             holding a {@link Cluster}; an optional second argument of {@code session} or
     *             {@code session-managed} requests a sessioned client; when exactly three arguments are given the
     *             third is used as the session name, otherwise a random UUID is used
     * @return a message describing the configured cluster and, if present, the session
     * @throws RemoteException if no argument is supplied, if the first argument does not resolve to a
     *                         {@link Cluster} or if the connection cannot be initialized
     */
    @Override
    public Object connect(final List<String> args) throws RemoteException {
        if (args.isEmpty()) throw new RemoteException("Expects the location of a configuration file or variable name for a Cluster object as an argument");

        try {
            // resolve into a local first so that a cluster left over from an earlier connect is never reused
            // when the supplied argument does not resolve to anything
            final Cluster cluster = resolveCluster(args.get(0));
            if (null == cluster)
                throw new RemoteException("Expects the location of a configuration file or variable name for a Cluster object as an argument");
            this.currentCluster = cluster;

            final String mode = args.size() >= 2 ? args.get(1) : "";
            final boolean managed = TOKEN_SESSION_MANAGED.equals(mode);
            if (managed || TOKEN_SESSION.equals(mode)) {
                final String sessionName = args.size() == 3 ? args.get(2) : UUID.randomUUID().toString();
                session = Optional.of(sessionName);
                this.currentClient = this.currentCluster.connect(sessionName, managed);
            } else {
                // forget any session from an earlier connect so it is not reported for a sessionless client
                session = Optional.empty();
                this.currentClient = this.currentCluster.connect();
            }

            this.currentClient.init();
            return String.format("Configured %s", this.currentCluster) + getSessionStringSegment();
        } catch (final FileNotFoundException ignored) {
            // the dedicated message below is all the user needs, so the cause is intentionally dropped
            throw new RemoteException("The 'connect' option must be accompanied by a valid configuration file");
        } catch (final RemoteException re) {
            // already carries a user-facing message - do not wrap it a second time
            throw re;
        } catch (final Exception ex) {
            throw new RemoteException("Error during 'connect' - " + ex.getMessage(), ex);
        }
    }

    /**
     * Applies a configuration option to this acceptor.
     * <ul>
     *     <li>{@code help} - returns the usage string</li>
     *     <li>{@code timeout [<ms>|none]} - sets or disables the request timeout</li>
     *     <li>{@code alias [reset|show|<alias> <actual> ...]} - clears, shows or replaces the alias settings</li>
     * </ul>
     *
     * @param args the option name followed by the arguments of that option
     * @return a status message or, for the alias option, the current alias map
     * @throws RemoteException if the option is unknown or its arguments are invalid
     */
    @Override
    public Object configure(final List<String> args) throws RemoteException {
        final String option = args.isEmpty() ? "" : args.get(0);
        if (!POSSIBLE_TOKENS.contains(option))
            throw new RemoteException(String.format("The 'config' option expects one of ['%s'] as an argument", String.join(",", POSSIBLE_TOKENS)));

        final List<String> arguments = args.subList(1, args.size());

        if (option.equals(TOKEN_HELP)) {
            return ":remote config [timeout [<ms>|none]|alias [reset|show|<alias> <actual>]|help]";
        } else if (option.equals(TOKEN_TIMEOUT)) {
            final String errorMessage = "The timeout option expects a positive integer representing milliseconds or 'none' as an argument";
            if (arguments.size() != 1) throw new RemoteException(errorMessage);

            // 'none' disables the timeout, anything else must parse as an int
            final String value = arguments.get(0);
            final int requested;
            try {
                requested = TOKEN_NONE.equals(value) ? NO_TIMEOUT : Integer.parseInt(value);
            } catch (NumberFormatException ignored) {
                // the usage message already describes the problem fully
                throw new RemoteException(errorMessage);
            }

            // validate before touching the field so that a rejected value never becomes the active timeout
            if (requested < NO_TIMEOUT) throw new RemoteException("The value for the timeout cannot be less than " + NO_TIMEOUT);

            timeout = requested;
            return timeout == NO_TIMEOUT ? "Remote timeout is disabled" : "Set remote timeout to " + timeout + "ms";
        } else if (option.equals(TOKEN_ALIAS)) {
            if (arguments.size() == 1 && arguments.get(0).equals(TOKEN_RESET)) {
                aliases.clear();
                return "Aliases cleared";
            }

            if (arguments.size() == 1 && arguments.get(0).equals(TOKEN_SHOW)) {
                return aliases;
            }

            if (arguments.size() % 2 != 0)
                throw new RemoteException("Arguments to alias must be 'reset' to clear any existing alias settings or key/value alias/binding pairs");

            // the supplied pairs replace all existing aliases rather than being merged into them
            final Map<String,Object> aliasMap = ElementHelper.asMap(arguments.toArray());
            aliases.clear();
            aliasMap.forEach((k,v) -> aliases.put(k, v.toString()));
            return aliases;
        }

        return this.toString();
    }

    /**
     * Submits a script to the server. The arguments are joined with a single space to form the script and, if the
     * result starts with {@code @}, the script is read from the shell variable of that name instead (see
     * {@link #getScript(String, GremlinShellEnvironment)}). The list of results is stored in the shell variable
     * named by {@code RESULT}.
     *
     * @param args the pieces of the script to submit
     * @return an iterator over the objects held by the returned results
     * @throws RemoteException if the request fails, with a message tailored to the kind of failure
     */
    @Override
    public Object submit(final List<String> args) throws RemoteException {
        final String line = getScript(String.join(" ", args), this.shellEnvironment);

        try {
            final List<Result> resultSet = send(line);
            this.shellEnvironment.setVariable(RESULT, resultSet);
            return resultSet.stream().map(Result::getObject).iterator();
        } catch (SaslException sasl) {
            throw new RemoteException("Security error - check username/password and related settings", sasl);
        } catch (Exception ex) {
            final Optional<ResponseException> inner = findResponseException(ex);
            if (inner.isPresent()) {
                // the server answered with an error - report it along with the remote stack trace if available
                final ResponseException responseException = inner.get();
                final ResponseStatusCode code = responseException.getResponseStatusCode();
                if (code == ResponseStatusCode.SERVER_ERROR_TIMEOUT) {
                    throw new RemoteException(String.format("%s - try increasing the timeout with the :remote command", responseException.getMessage()));
                } else if (code == ResponseStatusCode.SERVER_ERROR_SERIALIZATION) {
                    throw new RemoteException(String.format(
                            "Server could not serialize the result requested. Server error - %s. Note that the class must be serializable by the client and server for proper operation.", responseException.getMessage()),
                            responseException.getRemoteStackTrace().orElse(null));
                } else {
                    throw new RemoteException(responseException.getMessage(), responseException.getRemoteStackTrace().orElse(null));
                }
            } else if (ex.getCause() != null) {
                // no server response in the chain - report the root of the failure
                final Throwable rootCause = ExceptionUtils.getRootCause(ex);
                if (rootCause instanceof TimeoutException)
                    throw new RemoteException("Host did not respond in a timely fashion - check the server status and submit again.");
                else
                    throw new RemoteException(rootCause.getMessage());
            } else {
                throw new RemoteException(ex.getMessage());
            }
        }
    }

    /**
     * Closes the current client and then the current cluster, if they were established. The cluster is closed
     * even when closing the client fails.
     */
    @Override
    public void close() throws IOException {
        try {
            if (this.currentClient != null) this.currentClient.close();
        } finally {
            if (this.currentCluster != null) this.currentCluster.close();
        }
    }

    /**
     * Gets the configured request timeout in milliseconds where {@link #NO_TIMEOUT} means that none is set.
     */
    public int getTimeout() {
        return timeout;
    }

    /**
     * Sends the script with the configured aliases, timeout and user agent, waits for all results and prints any
     * warnings found in the status attributes of the response to the error stream of the shell.
     *
     * @throws SaslException         if one is found anywhere in the exception chain of a failure
     * @throws IllegalStateException wrapping any other failure
     */
    private List<Result> send(final String gremlin) throws SaslException {
        try {
            final RequestOptions.Builder options = RequestOptions.build();
            aliases.forEach(options::addAlias);
            if (timeout > NO_TIMEOUT)
                options.timeout(timeout);

            options.userAgent(USER_AGENT);

            final ResultSet rs = this.currentClient.submit(gremlin, options.create());
            final List<Result> results = rs.all().get();
            final Map<String, Object> statusAttributes = rs.statusAttributes().getNow(null);

            // Check for and print warnings
            if (null != statusAttributes && statusAttributes.containsKey(Tokens.STATUS_ATTRIBUTE_WARNINGS)) {
                final Object warningAttributeObject = statusAttributes.get(Tokens.STATUS_ATTRIBUTE_WARNINGS);
                if (warningAttributeObject instanceof List) {
                    for (Object warningListItem : (List<?>) warningAttributeObject)
                        shellEnvironment.errPrintln(String.valueOf(warningListItem));
                } else {
                    shellEnvironment.errPrintln(String.valueOf(warningAttributeObject));
                }
            }

            return results;
        } catch (Exception e) {
            // the interruption is reported as a failure below, so restore the flag for code further up the stack
            if (e instanceof InterruptedException)
                Thread.currentThread().interrupt();

            // handle security error as-is and unwrapped
            final Optional<Throwable> throwable = Stream.of(ExceptionUtils.getThrowables(e)).filter(t -> t instanceof SaslException).findFirst();
            if (throwable.isPresent())
                throw (SaslException) throwable.get();

            throw new IllegalStateException(e.getMessage(), e);
        }
    }

    @Override
    public boolean allowRemoteConsole() {
        return true;
    }

    @Override
    public String toString() {
        return "Gremlin Server - [" + this.currentCluster + "]" + getSessionStringSegment();
    }

    /**
     * Resolves the first argument of {@link #connect(List)} to a {@link Cluster}: an existing file is opened as a
     * cluster configuration, otherwise the argument is treated as the name of a shell variable.
     *
     * @return the cluster or {@code null} if the argument is neither a file nor a variable holding a cluster
     */
    private Cluster resolveCluster(final String fileOrVar) throws Exception {
        if (new File(fileOrVar).isFile())
            return Cluster.open(fileOrVar);

        final Object candidate = shellEnvironment.getVariable(fileOrVar);
        return candidate instanceof Cluster ? (Cluster) candidate : null;
    }

    /**
     * Walks the cause chain starting at the given exception and returns the first {@link ResponseException} found.
     */
    private Optional<ResponseException> findResponseException(final Throwable ex) {
        for (Throwable current = ex; current != null; current = current.getCause()) {
            if (current instanceof ResponseException)
                return Optional.of((ResponseException) current);
        }

        return Optional.empty();
    }

    /**
     * Formats the session name for display or returns an empty string when there is no session.
     */
    private String getSessionStringSegment() {
        return session.map(name -> String.format("-[%s]", name)).orElse("");
    }

    /**
     * Retrieve a script as defined in the shell context.  This allows for multi-line scripts to be submitted.
     * A submission starting with {@code @} is replaced by the string form of the shell variable named by the
     * remainder of the submission; anything else is returned unchanged.
     */
    public static String getScript(final String submittedScript, final GremlinShellEnvironment shellEnvironment) {
        return submittedScript.startsWith("@") ? shellEnvironment.getVariable(submittedScript.substring(1)).toString() : submittedScript;
    }
}
