/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.tools;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Files;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.FilenameUtils;
import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.oozie.cli.CLIParser;
import org.apache.oozie.service.HadoopAccessorService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.WorkflowAppService;
import org.eclipse.jetty.util.ConcurrentHashSet;

public class OozieSharelibCLI {
    public static final String[] HELP_INFO = {
            "",
            "OozieSharelibCLI creates or upgrade sharelib for oozie",
    };
    public static final String HELP_CMD = "help";
    public static final String CREATE_CMD = "create";
    public static final String UPGRADE_CMD = "upgrade";
    public static final String LIB_OPT = "locallib";
    public static final String EXTRALIBS = "extralib";
    public static final String FS_OPT = "fs";
    public static final String CONCURRENCY_OPT = "concurrency";
    public static final String OOZIE_HOME = "oozie.home.dir";
    public static final String SHARE_LIB_PREFIX = "lib_";
    public static final String NEW_LINE = System.lineSeparator();
    public static final String EXTRALIBS_USAGE = "Extra sharelib resources. " +
            "This option requires a pair of sharelibname and coma-separated list of pathnames" +
            " in the following format:" + NEW_LINE +
            "\"sharelib_name=pathname[,pathname...]\"" + NEW_LINE +
            "Caveats:" + NEW_LINE +
            "* Each pathname is either a directory or a regular file (compressed files are not extracted prior to " +
            "the upload operation)." + NEW_LINE +
            "* Sharelibname shall be specified only once." + NEW_LINE + NEW_LINE +
            "* Do not upload multiple conflicting library versions for an extra sharelib directory as it may " +
            "cause runtime issues." + NEW_LINE +
            "This option can be present multiple times, in case of more than one sharelib" + NEW_LINE +
            "Example command:" + NEW_LINE + NEW_LINE +
            "$ oozie-setup.sh sharelib create -fs hdfs://localhost:9000 -locallib oozie-sharelib.tar.gz " +
            "-extralib share2=dir2,file2 -extralib share3=file3";
    public static final String EXTRALIBS_PATH_SEPARATOR = ",";
    public static final String EXTRALIBS_SHARELIB_KEY_VALUE_SEPARATOR = "=";

    public static final String DIRECTORY_PERMISSION = "755";
    public static final String FILE_PERMISSION = "544";

    private boolean used;

    public static void main(String[] args) throws Exception{
        System.exit(new OozieSharelibCLI().run(args));
    }

    public OozieSharelibCLI() {
        used = false;
    }

    protected Options createUpgradeOptions(String subCommand){
        Option sharelib = new Option(LIB_OPT, true, "Local share library directory");
        Option uri = new Option(FS_OPT, true, "URI of the fileSystem to " + subCommand + " oozie share library");
        Option concurrency = new Option(CONCURRENCY_OPT, true, "Number of threads to be used for copy operations. (default=1)");
        Options options = new Options();
        options.addOption(sharelib);
        options.addOption(uri);
        options.addOption(concurrency);
        Option addLibsOption = new Option(EXTRALIBS, true, EXTRALIBS_USAGE);
        options.addOption(addLibsOption);
        return options;
    }

    @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "False positive")
    public synchronized int run(String[] args) throws Exception{
        if (used) {
            throw new IllegalStateException("CLI instance already used");
        }

        used = true;

        CLIParser parser = new CLIParser("oozie-setup.sh", HELP_INFO);
        String oozieHome = System.getProperty(OOZIE_HOME);
        parser.addCommand(HELP_CMD, "", "display usage for all commands or specified command", new Options(), false);
        parser.addCommand(CREATE_CMD, "", "create a new timestamped version of oozie sharelib",
                createUpgradeOptions(CREATE_CMD), false);
        parser.addCommand(UPGRADE_CMD, "",
                "[deprecated][use command \"create\" to create new version]   upgrade oozie sharelib \n",
                createUpgradeOptions(UPGRADE_CMD), false);

        try {
            final CLIParser.Command command = parser.parse(args);
            String sharelibAction = command.getName();

            if (sharelibAction.equals(HELP_CMD)){
                parser.showHelp(command.getCommandLine());
                return 0;
            }

            if (!command.getCommandLine().hasOption(FS_OPT)){
                throw new Exception("-fs option must be specified");
            }

            int threadPoolSize = Integer.valueOf(command.getCommandLine().getOptionValue(CONCURRENCY_OPT, "1"));
            File srcFile = null;

            //Check whether user provided locallib
            if (command.getCommandLine().hasOption(LIB_OPT)){
                srcFile = new File(command.getCommandLine().getOptionValue(LIB_OPT));
            }
            else {
                //Since user did not provide locallib, find the default one under oozie home dir
                Collection<File> files =
                        FileUtils.listFiles(new File(oozieHome), new WildcardFileFilter("oozie-sharelib*.tar.gz"), null);

                if (files.size() > 1){
                    throw new IOException("more than one sharelib tar found at " + oozieHome);
                }

                if (files.isEmpty()){
                    throw new IOException("default sharelib tar not found in oozie home dir: " + oozieHome);
                }

                srcFile = files.iterator().next();
            }

            Map<String, String> extraLibs = new HashMap<>();
            if (command.getCommandLine().hasOption(EXTRALIBS)) {
                String[] param = command.getCommandLine().getOptionValues(EXTRALIBS);
                extraLibs = getExtraLibs(param);
            }

           File temp = Files.createTempDirectory("oozie").toFile();
            temp.deleteOnExit();

            //Check whether the lib is a tar file or folder
            if (!srcFile.isDirectory()){
                FileUtil.unTar(srcFile, temp);
                srcFile = new File(temp.toString() + "/share/lib");
            }
            else {
                //Get the lib directory since it's a folder
                srcFile = new File(srcFile, "lib");
            }

            String hdfsUri = command.getCommandLine().getOptionValue(FS_OPT);
            Path srcPath = new Path(srcFile.toString());

            Services services = new Services();
            services.getConf().set(Services.CONF_SERVICE_CLASSES,
                "org.apache.oozie.service.LiteWorkflowAppService, org.apache.oozie.service.HadoopAccessorService");
            services.getConf().set(Services.CONF_SERVICE_EXT_CLASSES, "");
            services.init();
            WorkflowAppService lwas = services.get(WorkflowAppService.class);
            HadoopAccessorService has = services.get(HadoopAccessorService.class);
            Path dstPath = lwas.getSystemLibPath();

            URI uri = new Path(hdfsUri).toUri();
            Configuration fsConf = has.createConfiguration(uri.getAuthority());
            FileSystem fs = FileSystem.get(uri, fsConf);

            if (!fs.exists(dstPath)) {
                fs.mkdirs(dstPath);
            }
            ECPolicyDisabler.tryDisableECPolicyForPath(fs, dstPath);

            if (sharelibAction.equals(CREATE_CMD) || sharelibAction.equals(UPGRADE_CMD)){
                dstPath= new Path(dstPath.toString() +  Path.SEPARATOR +  SHARE_LIB_PREFIX + getTimestampDirectory()  );
            }

            System.out.println("the destination path for sharelib is: " + dstPath);

            checkIfSourceFilesExist(srcFile);
            copyToSharelib(threadPoolSize, srcFile, srcPath, dstPath, fs);
            copyExtraLibs(threadPoolSize, extraLibs, dstPath, fs);

            if (sharelibAction.equals(CREATE_CMD) || sharelibAction.equals(UPGRADE_CMD)) {
                applySharelibPermission(fs, dstPath);
            }

            services.destroy();
            FileUtils.deleteDirectory(temp);

            return 0;
        }
        catch (ParseException ex) {
            System.err.println("Invalid sub-command: " + ex.getMessage());
            System.err.println();
            System.err.println(parser.shortHelp());
            return 1;
        }
        catch (NumberFormatException ex) {
            logError("Invalid configuration value: ", ex);
            return 1;
        }
        catch (Exception ex) {
            logError(ex.getMessage(), ex);
            return 1;
        }
    }

    @VisibleForTesting
    static Map<String,String> getExtraLibs(String[] param) {
        Map<String, String> extraLibs = new HashMap<>();

        for (String lib : param) {
            String[] addLibParts = lib.split(EXTRALIBS_SHARELIB_KEY_VALUE_SEPARATOR);
            if (addLibParts.length != 2) {
                printExtraSharelibUsage();
                throw new IllegalArgumentException(String
                        .format("Argument of extralibs '%s' is in a wrong format. Exiting.", param));
            }
            String sharelibName = addLibParts[0];
            String sharelibPaths = addLibParts[1];
            if (extraLibs.containsKey(sharelibName)) {
                printExtraSharelibUsage();
                throw new IllegalArgumentException(String
                        .format("Extra sharelib, '%s', has been specified multiple times. " + "Exiting.", param));
            }
            extraLibs.put(sharelibName, sharelibPaths);
        }
        return extraLibs;
    }

    private static void printExtraSharelibUsage() {
        System.err.println(EXTRALIBS_USAGE);
    }


    @VisibleForTesting
    @SuppressFBWarnings(value = "PATH_TRAVERSAL_IN", justification = "FilenameUtils is used to filter user input. JDK8+ is used.")
    void copyExtraLibs(int threadPoolSize, Map<String, String> extraLibs, Path dstPath, FileSystem fs) throws IOException {
        for (Map.Entry<String, String> sharelib : extraLibs.entrySet()) {
            Path libDestPath = new Path(dstPath.toString() + Path.SEPARATOR + sharelib.getKey());
            for (String libPath : sharelib.getValue().split(EXTRALIBS_PATH_SEPARATOR)) {
                File srcFile = new File(FilenameUtils.getFullPath(libPath) + FilenameUtils.getName(libPath));
                Path srcPath = new Path(FilenameUtils.getFullPath(libPath) + FilenameUtils.getName(libPath));
                checkIfSourceFilesExist(srcFile);
                copyToSharelib(threadPoolSize, srcFile, srcPath, libDestPath, fs);
            }
        }
    }

    @VisibleForTesting
    protected void copyToSharelib(int threadPoolSize, File srcFile, Path srcPath, Path dstPath, FileSystem fs) throws IOException {
        if (threadPoolSize > 1) {
            long fsLimitsMinBlockSize = fs.getConf()
                    .getLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_DEFAULT);
            long bytesPerChecksum = fs.getConf()
                    .getLong(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT);
            new ConcurrentCopyFromLocal(threadPoolSize, fsLimitsMinBlockSize, bytesPerChecksum)
                    .concurrentCopyFromLocal(fs, srcFile, dstPath);

        } else {
            fs.copyFromLocalFile(false, srcPath, dstPath);
        }
    }

    @VisibleForTesting
    protected void checkIfSourceFilesExist(File srcFile) throws IOException {
        if (!srcFile.exists()){
            throw new IOException(srcFile + " cannot be found");
        }
    }


    private static void logError(String errorMessage, Throwable ex) {
        System.err.println();
        System.err.println("Error: " + errorMessage);
        System.err.println();
        System.err.println("Stack trace for the error was (for debug purposes):");
        System.err.println("--------------------------------------");
        ex.printStackTrace(System.err);
        System.err.println("--------------------------------------");
        System.err.println();
    }

    public String getTimestampDirectory() {
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss");
        Date date = new Date();
        return dateFormat.format(date).toString();
    }

    @VisibleForTesting
    static final class CopyTaskConfiguration {
        private final FileSystem fs;
        private final File srcFile;
        private final Path dstPath;

        CopyTaskConfiguration(FileSystem fs, File srcFile, Path dstPath) {
            this.fs = fs;
            this.srcFile = srcFile;
            this.dstPath = dstPath;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }

            CopyTaskConfiguration that = (CopyTaskConfiguration) o;
            if (!srcFile.equals(that.srcFile)) {
                return false;
            }
            return dstPath.equals(that.dstPath);
        }

        @Override
        public int hashCode() {
            int result = srcFile.hashCode();
            result = 31 * result + dstPath.hashCode();
            return result;
        }

    }

    @VisibleForTesting
    static final class BlockSizeCalculator {

        protected static long getValidBlockSize (long fileLenght, long fsLimitsMinBlockSize, long bytesPerChecksum) {
            if (fsLimitsMinBlockSize > fileLenght) {
                return fsLimitsMinBlockSize;
            }
            // bytesPerChecksum must divide block size
            if (fileLenght % bytesPerChecksum == 0) {
                return fileLenght;
            }
            long ratio = fileLenght/bytesPerChecksum;
            return (ratio + 1) * bytesPerChecksum;
        }
    }

    @VisibleForTesting
    static final class CopyTaskCallable implements Callable<CopyTaskConfiguration> {

        private final short REPLICATION_FACTOR;
        private final FileSystem fileSystem;
        private final File file;
        private final Path destinationPath;
        private final Path targetName;
        private final long blockSize;

        private final Set<CopyTaskConfiguration> failedCopyTasks;

        CopyTaskCallable(CopyTaskConfiguration copyTask, File file, Path trgName, long blockSize,
                                 Set<CopyTaskConfiguration> failedCopyTasks) {
            Objects.requireNonNull(copyTask, "copyTask cannot be null");
            Objects.requireNonNull(file, "file cannot be null");
            Objects.requireNonNull(trgName, "trgName cannot be null");
            Objects.requireNonNull(failedCopyTasks, "failedCopyTask cannot be null");
            Objects.requireNonNull(copyTask.dstPath, "copyTask.dstPath cannot be null");
            Objects.requireNonNull(copyTask.fs, "copyTask.fs cannot be null");
            this.file = file;
            this.destinationPath = copyTask.dstPath;
            this.failedCopyTasks = failedCopyTasks;
            this.fileSystem = copyTask.fs;
            this.REPLICATION_FACTOR = (short) this.fileSystem.getConf().getInt(
                    DFSConfigKeys.DFS_REPLICATION_KEY, DFSConfigKeys.DFS_REPLICATION_DEFAULT);
            this.blockSize = blockSize;
            this.targetName = trgName;
        }

        @Override
        public CopyTaskConfiguration call() throws Exception {
            CopyTaskConfiguration cp = new CopyTaskConfiguration(fileSystem, file, targetName);
            failedCopyTasks.add(cp);
            final Path destinationFilePath = new Path(destinationPath + File.separator +  file.getName());
            final boolean overwrite = true;
            final int bufferSize = CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
            try (FSDataOutputStream out = fileSystem
                    .create(destinationFilePath, overwrite, bufferSize, REPLICATION_FACTOR, blockSize)) {
                Files.copy(file.toPath(), out);
            }
            return cp;
        }
    }

    private void applySharelibPermission(FileSystem fs, Path dstPath) throws IOException {
        for(FileStatus stat: fs.listStatus(dstPath)) {
            if(stat.isDirectory()) {
                applyDirectoryPermission(fs, stat);
                applySharelibPermission(fs, stat.getPath());
            } else {
                applyFilePermission(fs, stat);
            }
        }
    }

    private void applyDirectoryPermission(FileSystem fs, FileStatus stat) throws IOException {
        fs.setPermission(stat.getPath(), new FsPermission(DIRECTORY_PERMISSION));
    }


    private void applyFilePermission(FileSystem fs, FileStatus stat) throws IOException {
        fs.setPermission(stat.getPath(), new FsPermission(FILE_PERMISSION));
    }

    @VisibleForTesting
    static final class ConcurrentCopyFromLocal {

        private static final int DEFAULT_RETRY_COUNT = 5;
        private static final int STARTING_RETRY_DELAY_IN_MS = 1000;
        private int retryCount;
        private int retryDelayInMs;
        private long fsLimitsMinBlockSize;
        private long bytesPerChecksum;

        private final int threadPoolSize;
        private final ExecutorService threadPool;
        private final Set<CopyTaskConfiguration> failedCopyTasks = new ConcurrentHashSet<>();

        public ConcurrentCopyFromLocal(int threadPoolSize, long fsLimitsMinBlockSize, long bytesPerChecksum) {
            Preconditions.checkArgument(threadPoolSize > 0, "Thread Pool size must be greater than 0");
            Preconditions.checkArgument(fsLimitsMinBlockSize > 0, "Minimun block size must be greater than 0");
            Preconditions.checkArgument(bytesPerChecksum > 0, "Bytes per checksum must be greater than 0");
            this.bytesPerChecksum = bytesPerChecksum;
            this.fsLimitsMinBlockSize = fsLimitsMinBlockSize;
            this.threadPoolSize = threadPoolSize;
            this.threadPool = Executors.newFixedThreadPool(threadPoolSize);
            this.retryCount = DEFAULT_RETRY_COUNT;
            this.retryDelayInMs = STARTING_RETRY_DELAY_IN_MS;
        }

        @VisibleForTesting
        void concurrentCopyFromLocal(FileSystem fs, File srcFile, Path dstPath) throws IOException {
            List<Future<CopyTaskConfiguration>> futures = Collections.emptyList();
            CopyTaskConfiguration copyTask = new CopyTaskConfiguration(fs, srcFile, dstPath);
            try {
                futures = copyFolderRecursively(copyTask);
                System.out.println("Running " + futures.size() + " copy tasks on " + threadPoolSize + " threads");
            } finally {
                checkCopyResults(futures);
                System.out.println("Copy tasks are done");
                threadPool.shutdown();
            }
        }

        private List<Future<CopyTaskConfiguration>> copyFolderRecursively(final CopyTaskConfiguration copyTask) {
            List<Future<CopyTaskConfiguration>> taskList = new ArrayList<>();
            File[] fileList = copyTask.srcFile.listFiles();
            if (fileList != null) {
                for (final File file : fileList) {
                    final Path trgName = new Path(copyTask.dstPath, file.getName());
                    if (file.isDirectory()) {
                        taskList.addAll(copyFolderRecursively(
                                new CopyTaskConfiguration(copyTask.fs, file, trgName)));
                    } else {
                        final long blockSize = BlockSizeCalculator
                                .getValidBlockSize(file.length(), fsLimitsMinBlockSize, bytesPerChecksum);
                        taskList.add(threadPool
                                .submit(new CopyTaskCallable(copyTask, file, trgName, blockSize, failedCopyTasks)));
                    }
                }
            }
            return taskList;
        }

        private void checkCopyResults(final List<Future<CopyTaskConfiguration>> futures)
                throws IOException {
            boolean exceptionOccurred = false;
            for (Future<CopyTaskConfiguration> future : futures) {
                CopyTaskConfiguration cp;
                try {
                    cp = future.get();
                    if (cp != null) {
                        failedCopyTasks.remove(cp);
                    }
                } catch (CancellationException ce) {
                    exceptionOccurred = true;
                    logError("Copy task was cancelled", ce);
                } catch (ExecutionException ee) {
                    exceptionOccurred = true;
                    logError("Copy task failed with exception", ee.getCause());
                } catch (InterruptedException ie) {
                    exceptionOccurred = true;
                    Thread.currentThread().interrupt();
                }
            }
            if (exceptionOccurred) {
                System.err.println("At least one copy task failed with exception. Retrying failed copy tasks.");
                retryFailedCopyTasks();

                if (!failedCopyTasks.isEmpty() && retryCount == 0) {
                    throw new IOException("At least one copy task failed with exception");
                }
            }
        }

        private void retryFailedCopyTasks() throws IOException {

            while (retryCount > 0 && !failedCopyTasks.isEmpty()) {
                try {
                    System.err.println("Waiting " + retryDelayInMs + " ms before retrying failed copy tasks.");
                    Thread.sleep(retryDelayInMs);
                    retryDelayInMs = retryDelayInMs * 2;
                } catch (InterruptedException e) {
                    System.err.println(e.getMessage());
                }

                for (CopyTaskConfiguration cp : failedCopyTasks) {
                    System.err.println("Retrying to copy " + cp.srcFile + " to " + cp.dstPath);
                    try {
                        copyFromLocalFile(cp);
                        failedCopyTasks.remove(cp);
                    }
                    catch (IOException e) {
                        System.err.printf("Copying [%s] to [%s] failed with exception: [%s]%n. Proceed to next file.%n"
                                ,cp.srcFile, cp.dstPath, e.getMessage());
                    }
                }

                --retryCount;
            }

            if (!failedCopyTasks.isEmpty() && retryCount == 0) {
                throw new IOException("Could not install Oozie ShareLib properly.");
            }
        }

        private void copyFromLocalFile(CopyTaskConfiguration cp) throws IOException{
            final FileSystem fs = cp.fs;
            fs.delete(cp.dstPath, false);
            fs.copyFromLocalFile(false, new Path(cp.srcFile.toURI()), cp.dstPath);
        }
    }
}
