/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.planner.sql.handlers;

import com.google.common.collect.Lists;
import org.apache.calcite.sql.SqlCharStringLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.commons.io.FileUtils;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.util.DrillFileUtils;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.FunctionValidationException;
import org.apache.drill.exec.exception.JarValidationException;
import org.apache.drill.exec.exception.VersionMismatchException;
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.expr.fn.registry.RemoteFunctionRegistry;
import org.apache.drill.exec.physical.PhysicalPlan;
import org.apache.drill.exec.planner.sql.DirectPlan;
import org.apache.drill.exec.planner.sql.parser.SqlCreateFunction;
import org.apache.drill.exec.proto.UserBitShared.Jar;
import org.apache.drill.exec.proto.UserBitShared.Registry;
import org.apache.drill.exec.store.sys.store.DataChangeVersion;
import org.apache.drill.exec.util.JarUtil;
import org.apache.drill.exec.work.foreman.ForemanSetupException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.UUID;

public class CreateFunctionHandler extends DefaultSqlHandler {
  private static Logger logger = LoggerFactory.getLogger(CreateFunctionHandler.class);

  public CreateFunctionHandler(SqlHandlerConfig config) {
    super(config);
  }

  /**
   * Registers UDFs dynamically. Process consists of several steps:
   * <ol>
   * <li>Registering jar in jar registry to ensure that several jars with the same name is not registered.</li>
   * <li>Binary and source jars validation and back up.</li>
   * <li>Validation against local function registry.</li>
   * <li>Validation against remote function registry.</li>
   * <li>Remote function registry update.</li>
   * <li>Copying of jars to registry area and clean up.</li>
   * </ol>
   *
   * UDFs registration is allowed only if dynamic UDFs support is enabled.
   *
   * @return - Single row indicating list of registered UDFs, or error message otherwise.
   */
  @Override
  public PhysicalPlan getPlan(SqlNode sqlNode) throws ForemanSetupException, IOException {
    if (!context.getOption(ExecConstants.DYNAMIC_UDF_SUPPORT_ENABLED).bool_val) {
      throw UserException.validationError()
          .message("Dynamic UDFs support is disabled.")
          .build(logger);
    }
    RemoteFunctionRegistry remoteRegistry = context.getRemoteFunctionRegistry();
    JarManager jarManager = new JarManager(sqlNode, remoteRegistry);

    boolean inProgress = false;
    try {
      final String action = remoteRegistry.addToJars(jarManager.getBinaryName(), RemoteFunctionRegistry.Action.REGISTRATION);
      if (!(inProgress = action == null)) {
        return DirectPlan.createDirectPlan(context, false,
            String.format("Jar with %s name is used. Action: %s", jarManager.getBinaryName(), action));
      }

      jarManager.initRemoteBackup();
      List<String> functions = validateAgainstLocalRegistry(jarManager, context.getFunctionRegistry());
      initRemoteRegistration(functions, jarManager, remoteRegistry);
      jarManager.deleteQuietlyFromStagingArea();

      return DirectPlan.createDirectPlan(context, true,
          String.format("The following UDFs in jar %s have been registered:\n%s", jarManager.getBinaryName(), functions));

    } catch (Exception e) {
      logger.error("Error during UDF registration", e);
      return DirectPlan.createDirectPlan(context, false, e.getMessage());
    } finally {
      if (inProgress) {
        remoteRegistry.removeFromJars(jarManager.getBinaryName());
      }
      jarManager.cleanUp();
    }
  }

  /**
   * Instantiates coping of binary to local file system
   * and validates functions from this jar against local function registry.
   *
   * @param jarManager helps coping binary to local file system
   * @param localFunctionRegistry instance of local function registry to instantiate local validation
   * @return list of validated function signatures
   * @throws IOException in case of problems during copying binary to local file system
   * @throws FunctionValidationException in case duplicated function was found
   */
  private List<String> validateAgainstLocalRegistry(JarManager jarManager,
                                                    FunctionImplementationRegistry localFunctionRegistry) throws IOException {
    Path localBinary = jarManager.copyBinaryToLocal();
    return localFunctionRegistry.validate(localBinary);
  }

  /**
   * Validates jar and its functions against remote jars.
   * First checks if there is no duplicate by jar name and then looks for duplicates among functions.
   *
   * @param remoteJars list of remote jars to validate against
   * @param jarName jar name to be validated
   * @param functions list of functions present in jar to be validated
   * @throws JarValidationException in case of jar with the same name was found
   * @throws FunctionValidationException in case duplicated function was found
   */
  private void validateAgainstRemoteRegistry(List<Jar> remoteJars, String jarName, List<String> functions) {
    for (Jar remoteJar : remoteJars) {
      if (remoteJar.getName().equals(jarName)) {
        throw new JarValidationException(String.format("Jar with %s name has been already registered", jarName));
      }
      for (String remoteFunction : remoteJar.getFunctionSignatureList()) {
        for (String func : functions) {
          if (remoteFunction.equals(func)) {
            throw new FunctionValidationException(
                String.format("Found duplicated function in %s: %s", remoteJar.getName(), remoteFunction));
          }
        }
      }
    }
  }

  /**
   * Instantiates remote registration. First gets remote function registry with version.
   * Version is used to ensure that we update the same registry we validated against.
   * Then validates against list of remote jars.
   * If validation is successful, first copies jars to registry area and starts updating remote function registry.
   * If during update {@link VersionMismatchException} was detected,
   * attempts to repeat remote registration process till retry attempts exceeds the limit.
   * If retry attempts number hits 0, throws exception that failed to update remote function registry.
   * In case of any error, if jars have been already copied to registry area, they will be deleted.
   *
   * @param functions list of functions present in jar
   * @param jarManager helper class for copying jars to registry area
   * @param remoteRegistry remote function registry
   * @throws IOException in case of problems with copying jars to registry area
   */
  private void initRemoteRegistration(List<String> functions,
                  JarManager jarManager,
                  RemoteFunctionRegistry remoteRegistry) throws IOException {
    int retryAttempts = remoteRegistry.getRetryAttempts();
    boolean copyJars = true;
    try {
      while (retryAttempts >= 0) {
        DataChangeVersion version = new DataChangeVersion();
        List<Jar> remoteJars = remoteRegistry.getRegistry(version).getJarList();
        validateAgainstRemoteRegistry(remoteJars, jarManager.getBinaryName(), functions);
        if (copyJars) {
          jarManager.copyToRegistryArea();
          copyJars = false;
        }
        List<Jar> jars = Lists.newArrayList(remoteJars);
        jars.add(Jar.newBuilder().setName(jarManager.getBinaryName()).addAllFunctionSignature(functions).build());
        Registry updatedRegistry = Registry.newBuilder().addAllJar(jars).build();
        try {
          remoteRegistry.updateRegistry(updatedRegistry, version);
          return;
        } catch (VersionMismatchException ex) {
          logger.debug("Failed to update function registry during registration, version mismatch was detected.", ex);
          retryAttempts--;
        }
      }
      throw new DrillRuntimeException("Failed to update remote function registry. Exceeded retry attempts limit.");
    } catch (Exception e) {
      if (!copyJars) {
        jarManager.deleteQuietlyFromRegistryArea();
      }
      throw e;
    }
  }

  /**
   * Inner helper class that encapsulates logic for working with jars.
   * During initialization it creates path to staging jar, local and remote temporary jars, registry jars.
   * Is responsible for validation, copying and deletion actions.
   */
  private class JarManager {

    private final String binaryName;
    private final FileSystem fs;

    private final Path remoteTmpDir;
    private final Path localTmpDir;

    private final Path stagingBinary;
    private final Path stagingSource;

    private final Path tmpRemoteBinary;
    private final Path tmpRemoteSource;

    private final Path registryBinary;
    private final Path registrySource;

    JarManager(SqlNode sqlNode, RemoteFunctionRegistry remoteRegistry) throws ForemanSetupException {
      SqlCreateFunction node = unwrap(sqlNode, SqlCreateFunction.class);
      this.binaryName = ((SqlCharStringLiteral) node.getJar()).toValue();
      String sourceName = JarUtil.getSourceName(binaryName);

      this.stagingBinary = new Path(remoteRegistry.getStagingArea(), binaryName);
      this.stagingSource = new Path(remoteRegistry.getStagingArea(), sourceName);

      this.remoteTmpDir = new Path(remoteRegistry.getTmpArea(), UUID.randomUUID().toString());
      this.tmpRemoteBinary = new Path(remoteTmpDir, binaryName);
      this.tmpRemoteSource = new Path(remoteTmpDir, sourceName);

      this.registryBinary = new Path(remoteRegistry.getRegistryArea(), binaryName);
      this.registrySource = new Path(remoteRegistry.getRegistryArea(), sourceName);

      this.localTmpDir = new Path(DrillFileUtils.createTempDir().toURI());
      this.fs = remoteRegistry.getFs();
    }

    /**
     * @return binary jar name
     */
    String getBinaryName() {
      return binaryName;
    }

    /**
     * Validates that both binary and source jar are present in staging area,
     * it is expected that binary and source have standard naming convention.
     * Backs up both jars to unique folder in remote temporary area.
     *
     * @throws IOException in case of binary or source absence or problems during copying jars
     */
    void initRemoteBackup() throws IOException {
      checkPathExistence(stagingBinary);
      checkPathExistence(stagingSource);
      fs.mkdirs(remoteTmpDir);
      FileUtil.copy(fs, stagingBinary, fs, tmpRemoteBinary, false, true, fs.getConf());
      FileUtil.copy(fs, stagingSource, fs, tmpRemoteSource, false, true, fs.getConf());
    }

    /**
     * Copies binary jar to unique folder on local file system.
     * Source jar is not needed for local validation.
     *
     * @return path to local binary jar
     * @throws IOException in case of problems during copying binary jar
     */
    Path copyBinaryToLocal() throws IOException {
      Path localBinary = new Path(localTmpDir, binaryName);
      fs.copyToLocalFile(tmpRemoteBinary, localBinary);
      return localBinary;
    }

    /**
     * Copies binary and source jars to registry area,
     * in case of {@link IOException} removes copied jar(-s) from registry area
     *
     * @throws IOException is re-thrown in case of problems during copying process
     */
    void copyToRegistryArea() throws IOException {
      FileUtil.copy(fs, tmpRemoteBinary, fs, registryBinary, false, true, fs.getConf());
      try {
        FileUtil.copy(fs, tmpRemoteSource, fs, registrySource, false, true, fs.getConf());
      } catch (IOException e) {
        deleteQuietly(registryBinary, false);
        throw new IOException(e);
      }
    }

    /**
     * Deletes binary and sources jars from staging area, in case of problems, logs warning and proceeds.
     */
    void deleteQuietlyFromStagingArea() {
      deleteQuietly(stagingBinary, false);
      deleteQuietly(stagingSource, false);
    }

    /**
     * Deletes binary and sources jars from registry area, in case of problems, logs warning and proceeds.
     */
    void deleteQuietlyFromRegistryArea() {
      deleteQuietly(registryBinary, false);
      deleteQuietly(registrySource, false);
    }

    /**
     * Removes quietly remote and local unique folders in temporary directories.
     */
    void cleanUp() {
      FileUtils.deleteQuietly(new File(localTmpDir.toUri()));
      deleteQuietly(remoteTmpDir, true);
    }

    /**
     * Checks if passed path exists on predefined file system.
     *
     * @param path path to be checked
     * @throws IOException if path does not exist
     */
    private void checkPathExistence(Path path) throws IOException {
      if (!fs.exists(path)) {
        throw new IOException(String.format("File %s does not exist on file system %s",
            path.toUri().getPath(), fs.getUri()));
      }
    }

    /**
     * Deletes quietly file or directory, in case of errors, logs warning and proceeds.
     *
     * @param path path to file or directory
     * @param isDirectory set to true if we need to delete a directory
     */
    private void deleteQuietly(Path path, boolean isDirectory) {
      try {
        fs.delete(path, isDirectory);
      } catch (IOException e) {
        logger.warn(String.format("Error during deletion [%s]", path.toUri().getPath()), e);
      }
    }
  }
}
