/**
 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the
 * Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a
 * copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
 * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language
 * governing permissions and limitations under the License.
 */
package org.apache.crunch.util;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.tools.CopyListing;
import org.apache.hadoop.tools.CopyListingFileStatus;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.tools.DistCpOptions.FileAttribute;
import org.apache.hadoop.tools.SimpleCopyListing;
import org.apache.hadoop.tools.util.DistCpUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.Stack;

/**
 * A custom {@link CopyListing} implementation capable of dynamically renaming
 * the target paths according to a {@link #DISTCP_PATH_RENAMES configured set of values}.
 * <p>
 * A rename is looked up by the final name component of each listed source path. When a
 * match is found, only that final component is replaced in the listing key; the parent
 * portion of the path is left as-is.
 * </p>
 * <p>
 * Once https://issues.apache.org/jira/browse/HADOOP-16147 is available, this
 * class can be significantly simplified.
 * </p>
 */
public class CrunchRenameCopyListing extends SimpleCopyListing {
  /**
   * Comma-separated list of original-file:renamed-file path rename pairs.
   */
  public static final String DISTCP_PATH_RENAMES = "crunch.distcp.path.renames";

  private static final Logger LOG = LoggerFactory.getLogger(CrunchRenameCopyListing.class);

  /** Maps an original path name (final path component) to the name it should be listed under. */
  private final Map<String, String> pathRenames;

  /** Number of entries appended to the listing so far. */
  private long totalPaths = 0;
  /** Sum of the lengths of all non-directory entries appended to the listing so far. */
  private long totalBytesToCopy = 0;

  /**
   * Constructor, to initialize configuration.
   * <p>
   * The {@link #DISTCP_PATH_RENAMES} property is parsed into the rename table and then
   * removed from the supplied configuration.
   * </p>
   *
   * @param configuration The input configuration, with which the source/target FileSystems may be accessed.
   * @param credentials - Credentials object on which the FS delegation tokens are cached. If null
   * delegation token caching is skipped
   * @throws IllegalArgumentException if {@link #DISTCP_PATH_RENAMES} is not set, if an entry is not of the
   * form {@code original:renamed} with a non-empty original name, or if the same original name is
   * configured more than once
   */
  public CrunchRenameCopyListing(Configuration configuration, Credentials credentials) {
    super(configuration, credentials);

    String[] pathRenameConf = configuration.getStrings(DISTCP_PATH_RENAMES);
    if (pathRenameConf == null) {
      throw new IllegalArgumentException("Missing required configuration: " + DISTCP_PATH_RENAMES);
    }

    pathRenames = new HashMap<>();
    for (String pathRename : pathRenameConf) {
      String[] pathRenameParts = pathRename.split(":");
      // split() drops trailing empty strings, so "name:" already yields a single part; a leading
      // empty part (":name") has to be rejected explicitly.
      if (pathRenameParts.length != 2 || pathRenameParts[0].isEmpty()) {
        throw new IllegalArgumentException("Invalid path rename format: " + pathRename);
      }
      if (pathRenames.put(pathRenameParts[0], pathRenameParts[1]) != null) {
        throw new IllegalArgumentException("Invalid duplicate path rename: " + pathRenameParts[0]);
      }
    }
    LOG.info("Loaded {} path rename entries", pathRenames.size());

    // Clear out the rename configuration property, as it is no longer needed
    configuration.unset(DISTCP_PATH_RENAMES);
  }

  /**
   * Writes one listing entry per source path to be copied, descending into non-empty directories.
   * <p>
   * The writer is always closed by this method. A failure to close after a successful build is
   * propagated; a failure to close while another exception is already in flight is only logged.
   * </p>
   *
   * @param fileListWriter the sequence file writer receiving the listing entries
   * @param options the DistCp options supplying the source paths, target path and preserve flags
   * @throws IOException if a file system operation or a write to the listing fails
   */
  @Override
  public void doBuildListing(SequenceFile.Writer fileListWriter, DistCpOptions options) throws IOException {
    boolean closedCleanly = false;
    try {
      // These flags do not depend on the individual source path, so read them once.
      final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
      final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR);
      final boolean preserveRawXAttrs = options.shouldPreserveRawXattrs();

      for (Path sourcePath : options.getSourcePaths()) {
        FileSystem sourceFS = sourcePath.getFileSystem(getConf());
        Path qualifiedPath = makeQualified(sourcePath);

        FileStatus rootStatus = sourceFS.getFileStatus(qualifiedPath);
        Path sourcePathRoot = computeSourceRootPath(rootStatus, options);

        FileStatus[] sourceFiles = sourceFS.listStatus(qualifiedPath);
        boolean explore = (sourceFiles != null && sourceFiles.length > 0);
        if (!explore || rootStatus.isDirectory()) {
          CopyListingFileStatus rootCopyListingStatus = DistCpUtils.toCopyListingFileStatus(sourceFS, rootStatus,
              preserveAcls, preserveXAttrs, preserveRawXAttrs);
          writeToFileListingRoot(fileListWriter, rootCopyListingStatus, sourcePathRoot, options);
        }
        if (explore) {
          for (FileStatus sourceStatus : sourceFiles) {
            LOG.debug("Recording source-path: {} for copy.", sourceStatus.getPath());
            // Below the root, ACLs and extended attributes are only requested for directories.
            final boolean isDirectory = sourceStatus.isDirectory();
            CopyListingFileStatus sourceCopyListingStatus = DistCpUtils.toCopyListingFileStatus(sourceFS, sourceStatus,
                preserveAcls && isDirectory, preserveXAttrs && isDirectory, preserveRawXAttrs && isDirectory);
            writeToFileListing(fileListWriter, sourceCopyListingStatus, sourcePathRoot, options);

            if (isDirectoryAndNotEmpty(sourceFS, sourceStatus)) {
              LOG.debug("Traversing non-empty source dir: {}", sourceStatus.getPath());
              traverseNonEmptyDirectory(fileListWriter, sourceStatus, sourcePathRoot, options);
            }
          }
        }
      }
      fileListWriter.close();
      closedCleanly = true;
    } finally {
      // Best-effort close on the failure path, so the original exception is not masked.
      if (!closedCleanly && fileListWriter != null) {
        try {
          fileListWriter.close();
        } catch (IOException e) {
          LOG.debug("Exception in closing {}", fileListWriter, e);
        }
      }
    }
  }

  /**
   * Determines the path against which listing keys for the given source are made relative.
   *
   * @param sourceStatus the status of one of the configured source paths
   * @param options the DistCp options supplying the target path and the sync/overwrite flags
   * @return either the source path itself or its parent, depending on whether the source is a
   * single file, whether the target exists, and whether sync or overwrite was requested
   * @throws IOException if the target file system cannot be accessed
   */
  private Path computeSourceRootPath(FileStatus sourceStatus, DistCpOptions options) throws IOException {
    Path target = options.getTargetPath();
    FileSystem targetFS = target.getFileSystem(getConf());
    final boolean targetPathExists = options.getTargetPathExists();
    final boolean singleSource = options.getSourcePaths().size() == 1;

    boolean solitaryFile = singleSource && !sourceStatus.isDirectory();
    if (solitaryFile) {
      if (targetFS.isFile(target) || !targetPathExists) {
        return sourceStatus.getPath();
      }
      return sourceStatus.getPath().getParent();
    }

    boolean specialHandling =
        (singleSource && !targetPathExists) || options.shouldSyncFolder() || options.shouldOverwrite();

    return specialHandling && sourceStatus.isDirectory() ? sourceStatus.getPath() : sourceStatus.getPath().getParent();
  }

  /**
   * Qualifies the given path with the URI and working directory of its own file system.
   *
   * @param path the path to qualify
   * @return the qualified path
   * @throws IOException if the file system for the path cannot be obtained
   */
  private Path makeQualified(Path path) throws IOException {
    final FileSystem fs = path.getFileSystem(getConf());
    return path.makeQualified(fs.getUri(), fs.getWorkingDirectory());
  }

  /**
   * Checks whether the given status is a directory with at least one child.
   * The children are only listed when the status is a directory.
   */
  private static boolean isDirectoryAndNotEmpty(FileSystem fileSystem, FileStatus fileStatus) throws IOException {
    return fileStatus.isDirectory() && getChildren(fileSystem, fileStatus).length > 0;
  }

  /** Lists the direct children of the given parent. */
  private static FileStatus[] getChildren(FileSystem fileSystem, FileStatus parent) throws IOException {
    return fileSystem.listStatus(parent.getPath());
  }

  /**
   * Iteratively walks the tree below {@code sourceStatus} and writes a listing entry for every descendant.
   *
   * @param fileListWriter the sequence file writer receiving the listing entries
   * @param sourceStatus the directory whose descendants should be listed
   * @param sourcePathRoot the root against which listing keys are made relative
   * @param options the DistCp options supplying the preserve flags
   * @throws IOException if a file system operation or a write to the listing fails
   */
  private void traverseNonEmptyDirectory(SequenceFile.Writer fileListWriter, FileStatus sourceStatus, Path sourcePathRoot,
      DistCpOptions options) throws IOException {
    FileSystem sourceFS = sourcePathRoot.getFileSystem(getConf());
    final boolean preserveAcls = options.shouldPreserve(FileAttribute.ACL);
    final boolean preserveXAttrs = options.shouldPreserve(FileAttribute.XATTR);
    final boolean preserveRawXattrs = options.shouldPreserveRawXattrs();

    // Explicit LIFO work list of directories still to be expanded (avoids recursion).
    Deque<FileStatus> pendingDirs = new ArrayDeque<>();
    pendingDirs.push(sourceStatus);

    while (!pendingDirs.isEmpty()) {
      for (FileStatus child : getChildren(sourceFS, pendingDirs.pop())) {
        LOG.debug("Recording source-path: {} for copy.", child.getPath());
        final boolean isDirectory = child.isDirectory();
        CopyListingFileStatus childCopyListingStatus = DistCpUtils.toCopyListingFileStatus(sourceFS, child,
            preserveAcls && isDirectory, preserveXAttrs && isDirectory, preserveRawXattrs && isDirectory);
        writeToFileListing(fileListWriter, childCopyListingStatus, sourcePathRoot, options);
        if (isDirectoryAndNotEmpty(sourceFS, child)) {
          LOG.debug("Traversing non-empty source dir: {}", child.getPath());
          pendingDirs.push(child);
        }
      }
    }
  }

  /**
   * Writes the listing entry for a source root, unless it is a directory equal to
   * {@code sourcePathRoot} and sync or overwrite was requested, in which case it is skipped.
   *
   * @param fileListWriter the sequence file writer receiving the listing entries
   * @param fileStatus the copy listing status of the source root
   * @param sourcePathRoot the root against which listing keys are made relative
   * @param options the DistCp options supplying the sync/overwrite flags
   * @throws IOException if the write to the listing fails
   */
  private void writeToFileListingRoot(SequenceFile.Writer fileListWriter, CopyListingFileStatus fileStatus, Path sourcePathRoot,
      DistCpOptions options) throws IOException {
    boolean syncOrOverwrite = options.shouldSyncFolder() || options.shouldOverwrite();
    if (fileStatus.getPath().equals(sourcePathRoot) && fileStatus.isDirectory() && syncOrOverwrite) {
      // Skip the root-paths when syncOrOverwrite
      LOG.debug("Skip {}", fileStatus.getPath());
      return;
    }
    writeToFileListing(fileListWriter, fileStatus, sourcePathRoot, options);
  }

  /**
   * Appends a single entry to the listing, provided {@code shouldCopy} accepts its path, and
   * updates the path and byte counters accordingly.
   *
   * @param fileListWriter the sequence file writer receiving the listing entry
   * @param fileStatus the copy listing status to record
   * @param sourcePathRoot the root against which the listing key is made relative
   * @param options the DistCp options (not consulted by this method)
   * @throws IOException if the write to the listing fails
   */
  private void writeToFileListing(SequenceFile.Writer fileListWriter, CopyListingFileStatus fileStatus, Path sourcePathRoot,
      DistCpOptions options) throws IOException {
    // Guard kept deliberately: computing the relative path is only worthwhile when debug is enabled.
    if (LOG.isDebugEnabled()) {
      LOG.debug("REL PATH: {}, FULL PATH: {}",
          DistCpUtils.getRelativePath(sourcePathRoot, fileStatus.getPath()), fileStatus.getPath());
    }

    if (!shouldCopy(fileStatus.getPath())) {
      return;
    }

    fileListWriter.append(getFileListingKey(sourcePathRoot, fileStatus),
        getFileListingValue(fileStatus));
    fileListWriter.sync();

    // Directories contribute to the path count but not to the byte count.
    if (!fileStatus.isDirectory()) {
      totalBytesToCopy += fileStatus.getLen();
    }
    totalPaths++;
  }

  /**
   * Returns the key for an entry in the copy listing sequence file.
   * <p>
   * If the final name component of the entry's path has a configured rename that differs from
   * the current name, the key is computed from the path with that component replaced.
   * </p>
   *
   * @param sourcePathRoot the root source path for determining the relative target path
   * @param fileStatus the copy listing file status
   * @return the key for the sequence file entry
   */
  protected Text getFileListingKey(Path sourcePathRoot, CopyListingFileStatus fileStatus) {
    Path fileStatusPath = fileStatus.getPath();
    String pathName = fileStatusPath.getName();
    String renamedPathName = pathRenames.get(pathName);

    if (renamedPathName != null && !pathName.equals(renamedPathName)) {
      LOG.info("Applying dynamic rename of {} to {}", pathName, renamedPathName);
      fileStatusPath = new Path(fileStatusPath.getParent(), renamedPathName);
    }
    return new Text(DistCpUtils.getRelativePath(sourcePathRoot, fileStatusPath));
  }

  /**
   * Returns the value for an entry in the copy listing sequence file.
   *
   * @param fileStatus the copy listing file status
   * @return the value for the sequence file entry; this implementation returns {@code fileStatus} unchanged
   */
  protected CopyListingFileStatus getFileListingValue(CopyListingFileStatus fileStatus) {
    return fileStatus;
  }

  /**
   * @return the total length of all non-directory entries written to the listing so far
   */
  @Override
  protected long getBytesToCopy() {
    return totalBytesToCopy;
  }

  /**
   * @return the number of entries written to the listing so far
   */
  @Override
  protected long getNumberOfPaths() {
    return totalPaths;
  }
}
