/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.zeppelin.interpreter.remote;

import java.util.*;

import org.apache.thrift.TException;
import org.apache.zeppelin.display.AngularObject;
import org.apache.zeppelin.display.AngularObjectRegistry;
import org.apache.zeppelin.display.GUI;
import org.apache.zeppelin.helium.ApplicationEventListener;
import org.apache.zeppelin.display.Input;
import org.apache.zeppelin.interpreter.*;
import org.apache.zeppelin.interpreter.InterpreterResult.Type;
import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResultMessage;
import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client;
import org.apache.zeppelin.scheduler.Scheduler;
import org.apache.zeppelin.scheduler.SchedulerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;

/**
 * Proxy for Interpreter instance that runs on separate process
 */
public class RemoteInterpreter extends Interpreter {
  private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreter.class);

  private final RemoteInterpreterProcessListener remoteInterpreterProcessListener;
  private final ApplicationEventListener applicationEventListener;
  private Gson gson = new Gson();
  private String interpreterRunner;
  private String interpreterPath;
  private String localRepoPath;
  private String className;
  private String sessionKey;
  private FormType formType;
  private boolean initialized;
  private Map<String, String> env;
  private int connectTimeout;
  private int maxPoolSize;
  private String host;
  private int port;
  private String portRange;
  private String userName;
  private Boolean isUserImpersonate;
  private int outputLimit = Constants.ZEPPELIN_INTERPRETER_OUTPUT_LIMIT;

  /**
   * Remote interpreter and manage interpreter process
   */
  public RemoteInterpreter(Properties property, String sessionKey, String className,
      String interpreterRunner, String interpreterPath, String localRepoPath, int connectTimeout,
      int maxPoolSize, RemoteInterpreterProcessListener remoteInterpreterProcessListener,
      ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
      int outputLimit, String portRange) {
    super(property);
    this.sessionKey = sessionKey;
    this.className = className;
    initialized = false;
    this.interpreterRunner = interpreterRunner;
    this.interpreterPath = interpreterPath;
    this.localRepoPath = localRepoPath;
    env = getEnvFromInterpreterProperty(property);
    this.connectTimeout = connectTimeout;
    this.maxPoolSize = maxPoolSize;
    this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
    this.applicationEventListener = appListener;
    this.userName = userName;
    this.isUserImpersonate = isUserImpersonate;
    this.outputLimit = outputLimit;
    this.portRange = portRange;
  }


  /**
   * Connect to existing process
   */
  public RemoteInterpreter(Properties property, String sessionKey, String className, String host,
      int port, String localRepoPath, int connectTimeout, int maxPoolSize,
      RemoteInterpreterProcessListener remoteInterpreterProcessListener,
      ApplicationEventListener appListener, String userName, Boolean isUserImpersonate,
      int outputLimit) {
    super(property);
    this.sessionKey = sessionKey;
    this.className = className;
    initialized = false;
    this.host = host;
    this.port = port;
    this.localRepoPath = localRepoPath;
    this.connectTimeout = connectTimeout;
    this.maxPoolSize = maxPoolSize;
    this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
    this.applicationEventListener = appListener;
    this.userName = userName;
    this.isUserImpersonate = isUserImpersonate;
    this.outputLimit = outputLimit;
  }


  // VisibleForTesting
  public RemoteInterpreter(Properties property, String sessionKey, String className,
      String interpreterRunner, String interpreterPath, String localRepoPath,
      Map<String, String> env, int connectTimeout,
      RemoteInterpreterProcessListener remoteInterpreterProcessListener,
      ApplicationEventListener appListener, String userName, Boolean isUserImpersonate) {
    super(property);
    this.className = className;
    this.sessionKey = sessionKey;
    this.interpreterRunner = interpreterRunner;
    this.interpreterPath = interpreterPath;
    this.localRepoPath = localRepoPath;
    env.putAll(getEnvFromInterpreterProperty(property));
    this.env = env;
    this.connectTimeout = connectTimeout;
    this.maxPoolSize = 10;
    this.remoteInterpreterProcessListener = remoteInterpreterProcessListener;
    this.applicationEventListener = appListener;
    this.userName = userName;
    this.isUserImpersonate = isUserImpersonate;
  }

  private Map<String, String> getEnvFromInterpreterProperty(Properties property) {
    Map<String, String> env = new HashMap<>();
    for (Object key : property.keySet()) {
      if (isEnvString((String) key)) {
        env.put((String) key, property.getProperty((String) key));
      }
    }
    return env;
  }

  static boolean isEnvString(String key) {
    if (key == null || key.length() == 0) {
      return false;
    }

    return key.matches("^[A-Z_0-9]*");
  }

  @Override
  public String getClassName() {
    return className;
  }

  private boolean connectToExistingProcess() {
    return host != null && port > 0;
  }

  public RemoteInterpreterProcess getInterpreterProcess() {
    InterpreterGroup intpGroup = getInterpreterGroup();
    if (intpGroup == null) {
      return null;
    }

    synchronized (intpGroup) {
      if (intpGroup.getRemoteInterpreterProcess() == null) {
        RemoteInterpreterProcess remoteProcess;
        if (connectToExistingProcess()) {
          remoteProcess = new RemoteInterpreterRunningProcess(
              connectTimeout,
              remoteInterpreterProcessListener,
              applicationEventListener,
              host,
              port);
        } else {
          // create new remote process
          remoteProcess = new RemoteInterpreterManagedProcess(
              interpreterRunner, interpreterPath, localRepoPath, portRange, env, connectTimeout,
              remoteInterpreterProcessListener, applicationEventListener);
        }

        intpGroup.setRemoteInterpreterProcess(remoteProcess);
      }

      return intpGroup.getRemoteInterpreterProcess();
    }
  }

  public synchronized void init() {
    if (initialized == true) {
      return;
    }

    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();

    final InterpreterGroup interpreterGroup = getInterpreterGroup();

    interpreterProcess.setMaxPoolSize(
        Math.max(this.maxPoolSize, interpreterProcess.getMaxPoolSize()));
    String groupId = interpreterGroup.getId();

    synchronized (interpreterProcess) {
      Client client = null;
      try {
        client = interpreterProcess.getClient();
      } catch (Exception e1) {
        throw new InterpreterException(e1);
      }

      boolean broken = false;
      try {
        logger.info("Create remote interpreter {}", getClassName());
        if (localRepoPath != null) {
          property.put("zeppelin.interpreter.localRepo", localRepoPath);
        }

        property.put("zeppelin.interpreter.output.limit", Integer.toString(outputLimit));
        client.createInterpreter(groupId, sessionKey,
            getClassName(), (Map) property, userName);
        // Push angular object loaded from JSON file to remote interpreter
        if (!interpreterGroup.isAngularRegistryPushed()) {
          pushAngularObjectRegistryToRemote(client);
          interpreterGroup.setAngularRegistryPushed(true);
        }

      } catch (TException e) {
        logger.error("Failed to create interpreter: {}", getClassName());
        throw new InterpreterException(e);
      } finally {
        // TODO(jongyoul): Fixed it when not all of interpreter in same interpreter group are broken
        interpreterProcess.releaseClient(client, broken);
      }
    }
    initialized = true;
  }


  @Override
  public void open() {
    InterpreterGroup interpreterGroup = getInterpreterGroup();

    synchronized (interpreterGroup) {
      // initialize all interpreters in this interpreter group
      List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
      // TODO(jl): this open method is called by LazyOpenInterpreter.open(). It, however,
      // initializes all of interpreters with same sessionKey. But LazyOpenInterpreter assumes if it
      // doesn't call open method, it's not open. It causes problem while running intp.close()
      // In case of Spark, this method initializes all of interpreters and init() method increases
      // reference count of RemoteInterpreterProcess. But while closing this interpreter group, all
      // other interpreters doesn't do anything because those LazyInterpreters aren't open.
      // But for now, we have to initialise all of interpreters for some reasons.
      // See Interpreter.getInterpreterInTheSameSessionByClassName(String)
      RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
      if (!initialized) {
        // reference per session
        interpreterProcess.reference(interpreterGroup, userName, isUserImpersonate);
      }
      for (Interpreter intp : new ArrayList<>(interpreters)) {
        Interpreter p = intp;
        while (p instanceof WrappedInterpreter) {
          p = ((WrappedInterpreter) p).getInnerInterpreter();
        }
        try {
          ((RemoteInterpreter) p).init();
        } catch (InterpreterException e) {
          logger.error("Failed to initialize interpreter: {}. Remove it from interpreterGroup",
              p.getClassName());
          interpreters.remove(p);
        }
      }
    }
  }

  @Override
  public void close() {
    InterpreterGroup interpreterGroup = getInterpreterGroup();
    synchronized (interpreterGroup) {
      // close all interpreters in this session
      List<Interpreter> interpreters = interpreterGroup.get(sessionKey);
      // TODO(jl): this open method is called by LazyOpenInterpreter.open(). It, however,
      // initializes all of interpreters with same sessionKey. But LazyOpenInterpreter assumes if it
      // doesn't call open method, it's not open. It causes problem while running intp.close()
      // In case of Spark, this method initializes all of interpreters and init() method increases
      // reference count of RemoteInterpreterProcess. But while closing this interpreter group, all
      // other interpreters doesn't do anything because those LazyInterpreters aren't open.
      // But for now, we have to initialise all of interpreters for some reasons.
      // See Interpreter.getInterpreterInTheSameSessionByClassName(String)
      if (initialized) {
        // dereference per session
        getInterpreterProcess().dereference();
      }
      for (Interpreter intp : new ArrayList<>(interpreters)) {
        Interpreter p = intp;
        while (p instanceof WrappedInterpreter) {
          p = ((WrappedInterpreter) p).getInnerInterpreter();
        }
        try {
          ((RemoteInterpreter) p).closeInterpreter();
        } catch (InterpreterException e) {
          logger.error("Failed to initialize interpreter: {}. Remove it from interpreterGroup",
              p.getClassName());
          interpreters.remove(p);
        }
      }
    }
  }

  public void closeInterpreter() {
    if (this.initialized == false) {
      return;
    }
    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
    Client client = null;
    boolean broken = false;
    try {
      client = interpreterProcess.getClient();
      if (client != null) {
        client.close(sessionKey, className);
      }
    } catch (TException e) {
      broken = true;
      throw new InterpreterException(e);
    } catch (Exception e1) {
      throw new InterpreterException(e1);
    } finally {
      if (client != null) {
        interpreterProcess.releaseClient(client, broken);
      }
      this.initialized = false;
    }
  }

  @Override
  public InterpreterResult interpret(String st, InterpreterContext context) {
    if (logger.isDebugEnabled()) {
      logger.debug("st:\n{}", st);
    }

    FormType form = getFormType();
    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
    Client client = null;
    try {
      client = interpreterProcess.getClient();
    } catch (Exception e1) {
      throw new InterpreterException(e1);
    }

    InterpreterContextRunnerPool interpreterContextRunnerPool = interpreterProcess
        .getInterpreterContextRunnerPool();

    List<InterpreterContextRunner> runners = context.getRunners();
    if (runners != null && runners.size() != 0) {
      // assume all runners in this InterpreterContext have the same note id
      String noteId = runners.get(0).getNoteId();

      interpreterContextRunnerPool.clear(noteId);
      interpreterContextRunnerPool.addAll(noteId, runners);
    }

    boolean broken = false;
    try {

      final GUI currentGUI = context.getGui();
      RemoteInterpreterResult remoteResult = client.interpret(
          sessionKey, className, st, convert(context));

      Map<String, Object> remoteConfig = (Map<String, Object>) gson.fromJson(
          remoteResult.getConfig(), new TypeToken<Map<String, Object>>() {
          }.getType());
      context.getConfig().clear();
      context.getConfig().putAll(remoteConfig);

      if (form == FormType.NATIVE) {
        GUI remoteGui = gson.fromJson(remoteResult.getGui(), GUI.class);
        currentGUI.clear();
        currentGUI.setParams(remoteGui.getParams());
        currentGUI.setForms(remoteGui.getForms());
      } else if (form == FormType.SIMPLE) {
        final Map<String, Input> currentForms = currentGUI.getForms();
        final Map<String, Object> currentParams = currentGUI.getParams();
        final GUI remoteGUI = gson.fromJson(remoteResult.getGui(), GUI.class);
        final Map<String, Input> remoteForms = remoteGUI.getForms();
        final Map<String, Object> remoteParams = remoteGUI.getParams();
        currentForms.putAll(remoteForms);
        currentParams.putAll(remoteParams);
      }

      InterpreterResult result = convert(remoteResult);
      return result;
    } catch (TException e) {
      broken = true;
      throw new InterpreterException(e);
    } finally {
      interpreterProcess.releaseClient(client, broken);
    }
  }

  @Override
  public void cancel(InterpreterContext context) {
    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
    Client client = null;
    try {
      client = interpreterProcess.getClient();
    } catch (Exception e1) {
      throw new InterpreterException(e1);
    }

    boolean broken = false;
    try {
      client.cancel(sessionKey, className, convert(context));
    } catch (TException e) {
      broken = true;
      throw new InterpreterException(e);
    } finally {
      interpreterProcess.releaseClient(client, broken);
    }
  }

  @Override
  public FormType getFormType() {
    open();

    if (formType != null) {
      return formType;
    }

    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
    Client client = null;
    try {
      client = interpreterProcess.getClient();
    } catch (Exception e1) {
      throw new InterpreterException(e1);
    }

    boolean broken = false;
    try {
      formType = FormType.valueOf(client.getFormType(sessionKey, className));
      return formType;
    } catch (TException e) {
      broken = true;
      throw new InterpreterException(e);
    } finally {
      interpreterProcess.releaseClient(client, broken);
    }
  }

  @Override
  public int getProgress(InterpreterContext context) {
    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
    if (interpreterProcess == null || !interpreterProcess.isRunning()) {
      return 0;
    }

    Client client = null;
    try {
      client = interpreterProcess.getClient();
    } catch (Exception e1) {
      throw new InterpreterException(e1);
    }

    boolean broken = false;
    try {
      return client.getProgress(sessionKey, className, convert(context));
    } catch (TException e) {
      broken = true;
      throw new InterpreterException(e);
    } finally {
      interpreterProcess.releaseClient(client, broken);
    }
  }


  @Override
  public List<InterpreterCompletion> completion(String buf, int cursor) {
    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
    Client client = null;
    try {
      client = interpreterProcess.getClient();
    } catch (Exception e1) {
      throw new InterpreterException(e1);
    }

    boolean broken = false;
    try {
      List completion = client.completion(sessionKey, className, buf, cursor);
      return completion;
    } catch (TException e) {
      broken = true;
      throw new InterpreterException(e);
    } finally {
      interpreterProcess.releaseClient(client, broken);
    }
  }

  @Override
  public Scheduler getScheduler() {
    int maxConcurrency = maxPoolSize;
    RemoteInterpreterProcess interpreterProcess = getInterpreterProcess();
    if (interpreterProcess == null) {
      return null;
    } else {
      return SchedulerFactory.singleton().createOrGetRemoteScheduler(
          RemoteInterpreter.class.getName() + sessionKey + interpreterProcess.hashCode(),
          sessionKey, interpreterProcess, maxConcurrency);
    }
  }

  private String getInterpreterGroupKey(InterpreterGroup interpreterGroup) {
    return interpreterGroup.getId();
  }

  private RemoteInterpreterContext convert(InterpreterContext ic) {
    return new RemoteInterpreterContext(ic.getNoteId(), ic.getParagraphId(), ic.getReplName(),
        ic.getParagraphTitle(), ic.getParagraphText(), gson.toJson(ic.getAuthenticationInfo()),
        gson.toJson(ic.getConfig()), gson.toJson(ic.getGui()), gson.toJson(ic.getRunners()));
  }

  private InterpreterResult convert(RemoteInterpreterResult result) {
    InterpreterResult r = new InterpreterResult(
        InterpreterResult.Code.valueOf(result.getCode()));

    for (RemoteInterpreterResultMessage m : result.getMsg()) {
      r.add(InterpreterResult.Type.valueOf(m.getType()), m.getData());
    }

    return r;
  }

  /**
   * Push local angular object registry to
   * remote interpreter. This method should be
   * call ONLY inside the init() method
   */
  void pushAngularObjectRegistryToRemote(Client client) throws TException {
    final AngularObjectRegistry angularObjectRegistry = this.getInterpreterGroup()
        .getAngularObjectRegistry();

    if (angularObjectRegistry != null && angularObjectRegistry.getRegistry() != null) {
      final Map<String, Map<String, AngularObject>> registry = angularObjectRegistry
          .getRegistry();

      logger.info("Push local angular object registry from ZeppelinServer to" +
          " remote interpreter group {}", this.getInterpreterGroup().getId());

      final java.lang.reflect.Type registryType = new TypeToken<Map<String,
          Map<String, AngularObject>>>() {
      }.getType();

      Gson gson = new Gson();
      client.angularRegistryPush(gson.toJson(registry, registryType));
    }
  }

  public Map<String, String> getEnv() {
    return env;
  }

  public void setEnv(Map<String, String> env) {
    this.env = env;
  }

  public void addEnv(Map<String, String> env) {
    if (this.env == null) {
      this.env = new HashMap<>();
    }
    this.env.putAll(env);
  }

  //Only for test
  public String getInterpreterRunner() {
    return interpreterRunner;
  }
}
