/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.logaggregation;

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
import java.nio.file.AccessDeniedException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.math3.util.Pair;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.HarFs;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogReader;
import com.google.common.annotations.VisibleForTesting;

public class LogCLIHelpers implements Configurable {

  public static final String PER_LOG_FILE_INFO_PATTERN =
      "%20s\t%20s" + System.getProperty("line.separator");
  public static final String CONTAINER_ON_NODE_PATTERN =
      "Container: %s on %s";

  private Configuration conf;

  @Private
  @VisibleForTesting
  public int dumpAContainersLogs(String appId, String containerId,
      String nodeId, String jobOwner) throws IOException {
    ContainerLogsRequest options = new ContainerLogsRequest();
    options.setAppId(ApplicationId.fromString(appId));
    options.setContainerId(containerId);
    options.setNodeId(nodeId);
    options.setAppOwner(jobOwner);
    Set<String> logs = new HashSet<String>();
    options.setLogTypes(logs);
    options.setBytes(Long.MAX_VALUE);
    return dumpAContainerLogsForLogType(options, false);
  }

  @Private
  @VisibleForTesting
  /**
   * Return the owner for a given AppId
   * @param remoteRootLogDir
   * @param appId
   * @param bestGuess
   * @param conf
   * @return the owner or null
   * @throws IOException
   */
  public static String getOwnerForAppIdOrNull(
      ApplicationId appId, String bestGuess,
      Configuration conf) throws IOException {
    Path remoteRootLogDir = new Path(conf.get(
        YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
        YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
    String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
    Path fullPath = LogAggregationUtils.getRemoteAppLogDir(remoteRootLogDir,
        appId, bestGuess, suffix);
    FileContext fc =
        FileContext.getFileContext(remoteRootLogDir.toUri(), conf);
    String pathAccess = fullPath.toString();
    try {
      if (fc.util().exists(fullPath)) {
        return bestGuess;
      }
      Path toMatch = LogAggregationUtils.
          getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix);
      pathAccess = toMatch.toString();
      FileStatus[] matching  = fc.util().globStatus(toMatch);
      if (matching == null || matching.length != 1) {
        return null;
      }
      //fetch the user from the full path /app-logs/user[/suffix]/app_id
      Path parent = matching[0].getPath().getParent();
      //skip the suffix too
      if (suffix != null && !StringUtils.isEmpty(suffix)) {
        parent = parent.getParent();
      }
      return parent.getName();
    } catch (AccessControlException | AccessDeniedException ex) {
      logDirNoAccessPermission(pathAccess, bestGuess, ex.getMessage());
      return null;
    }
  }

  @Private
  @VisibleForTesting
  public int dumpAContainerLogsForLogType(ContainerLogsRequest options)
      throws IOException {
    return dumpAContainerLogsForLogType(options, true);
  }

  @Private
  @VisibleForTesting
  public int dumpAContainerLogsForLogType(ContainerLogsRequest options,
      boolean outputFailure) throws IOException {
    ApplicationId applicationId = options.getAppId();
    String jobOwner = options.getAppOwner();
    String nodeId = options.getNodeId();
    String containerId = options.getContainerId();
    String localDir = options.getOutputLocalDir();
    List<String> logType = new ArrayList<String>(options.getLogTypes());
    RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
        applicationId, jobOwner);
    if (nodeFiles == null) {
      return -1;
    }
    boolean foundContainerLogs = false;
    while (nodeFiles.hasNext()) {
      FileStatus thisNodeFile = nodeFiles.next();
      String fileName = thisNodeFile.getPath().getName();
      if (fileName.equals(applicationId + ".har")) {
        Path p = new Path("har:///"
            + thisNodeFile.getPath().toUri().getRawPath());
        nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p);
        continue;
      }
      if (fileName.contains(LogAggregationUtils.getNodeString(nodeId))
          && !fileName.endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
        AggregatedLogFormat.LogReader reader = null;
        PrintStream out = createPrintStream(localDir, fileName, containerId);
        try {
          reader = new AggregatedLogFormat.LogReader(getConf(),
              thisNodeFile.getPath());
          if (getContainerLogsStream(containerId, reader) == null) {
            continue;
          }
          String containerString = String.format(CONTAINER_ON_NODE_PATTERN,
              containerId, thisNodeFile.getPath().getName());
          out.println(containerString);
          out.println(StringUtils.repeat("=", containerString.length()));
          // We have to re-create reader object to reset the stream index
          // after calling getContainerLogsStream which would move the stream
          // index to the end of the log file.
          reader =
              new AggregatedLogFormat.LogReader(getConf(),
                thisNodeFile.getPath());
          if (logType == null || logType.isEmpty()) {
            if (dumpAContainerLogs(containerId, reader, out,
                thisNodeFile.getModificationTime(), options.getBytes()) > -1) {
              foundContainerLogs = true;
            }
          } else {
            if (dumpAContainerLogsForALogType(containerId, reader, out,
                thisNodeFile.getModificationTime(), logType,
                options.getBytes()) > -1) {
              foundContainerLogs = true;
            }
          }
        } finally {
          if (reader != null) {
            reader.close();
          }
          closePrintStream(out);
        }
      }
    }
    if (!foundContainerLogs) {
      if (outputFailure) {
        containerLogNotFound(containerId);
      }
      return -1;
    }
    return 0;
  }

  @Private
  public int dumpAContainerLogsForLogTypeWithoutNodeId(
      ContainerLogsRequest options) throws IOException {
    ApplicationId applicationId = options.getAppId();
    String jobOwner = options.getAppOwner();
    String containerId = options.getContainerId();
    String localDir = options.getOutputLocalDir();
    List<String> logType = new ArrayList<String>(options.getLogTypes());
    RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
        applicationId, jobOwner);
    if (nodeFiles == null) {
      return -1;
    }
    boolean foundContainerLogs = false;
    while(nodeFiles.hasNext()) {
      FileStatus thisNodeFile = nodeFiles.next();
      if (!thisNodeFile.getPath().getName().endsWith(
          LogAggregationUtils.TMP_FILE_SUFFIX)) {
        AggregatedLogFormat.LogReader reader = null;
        PrintStream out = System.out;
        try {
          reader =
              new AggregatedLogFormat.LogReader(getConf(),
              thisNodeFile.getPath());
          if (getContainerLogsStream(containerId, reader) == null) {
            continue;
          }
          // We have to re-create reader object to reset the stream index
          // after calling getContainerLogsStream which would move the stream
          // index to the end of the log file.
          reader =
              new AggregatedLogFormat.LogReader(getConf(),
              thisNodeFile.getPath());
          out = createPrintStream(localDir, thisNodeFile.getPath().getName(),
              containerId);
          String containerString = String.format(CONTAINER_ON_NODE_PATTERN,
              containerId, thisNodeFile.getPath().getName());
          out.println(containerString);
          out.println(StringUtils.repeat("=", containerString.length()));
          if (logType == null || logType.isEmpty()) {
            if (dumpAContainerLogs(containerId, reader, out,
                thisNodeFile.getModificationTime(), options.getBytes()) > -1) {
              foundContainerLogs = true;
            }
          } else {
            if (dumpAContainerLogsForALogType(containerId, reader, out,
                thisNodeFile.getModificationTime(), logType,
                options.getBytes()) > -1) {
              foundContainerLogs = true;
            }
          }
        } finally {
          if (reader != null) {
            reader.close();
          }
          closePrintStream(out);
        }
      }
    }
    if (!foundContainerLogs) {
      containerLogNotFound(containerId);
      return -1;
    }
    return 0;
  }

  @Private
  public int dumpAContainerLogs(String containerIdStr,
      AggregatedLogFormat.LogReader reader, PrintStream out,
      long logUploadedTime, long bytes) throws IOException {
    DataInputStream valueStream = getContainerLogsStream(
        containerIdStr, reader);

    if (valueStream == null) {
      return -1;
    }

    boolean foundContainerLogs = false;
    while (true) {
      try {
        LogReader.readAContainerLogsForALogType(valueStream, out,
            logUploadedTime, bytes);
        foundContainerLogs = true;
      } catch (EOFException eof) {
        break;
      }
    }
    if (foundContainerLogs) {
      return 0;
    }
    return -1;
  }

  private DataInputStream getContainerLogsStream(String containerIdStr,
      AggregatedLogFormat.LogReader reader) throws IOException {
    DataInputStream valueStream;
    LogKey key = new LogKey();
    valueStream = reader.next(key);

    while (valueStream != null && !key.toString().equals(containerIdStr)) {
      // Next container
      key = new LogKey();
      valueStream = reader.next(key);
    }
    return valueStream;
  }

  @Private
  public int dumpAContainerLogsForALogType(String containerIdStr,
      AggregatedLogFormat.LogReader reader, PrintStream out,
      long logUploadedTime, List<String> logType, long bytes)
      throws IOException {
    DataInputStream valueStream = getContainerLogsStream(
        containerIdStr, reader);
    if (valueStream == null) {
      return -1;
    }

    boolean foundContainerLogs = false;
    while (true) {
      try {
        int result = LogReader.readContainerLogsForALogType(
            valueStream, out, logUploadedTime, logType, bytes);
        if (result == 0) {
          foundContainerLogs = true;
        }
      } catch (EOFException eof) {
        break;
      }
    }

    if (foundContainerLogs) {
      return 0;
    }
    return -1;
  }

  @Private
  public int dumpAllContainersLogs(ContainerLogsRequest options)
      throws IOException {
    ApplicationId appId = options.getAppId();
    String appOwner = options.getAppOwner();
    String localDir = options.getOutputLocalDir();
    List<String> logTypes = new ArrayList<String>(options.getLogTypes());
    RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
        appId, appOwner);
    if (nodeFiles == null) {
      return -1;
    }
    boolean foundAnyLogs = false;
    while (nodeFiles.hasNext()) {
      FileStatus thisNodeFile = nodeFiles.next();
      if (thisNodeFile.getPath().getName().equals(appId + ".har")) {
        Path p = new Path("har:///"
            + thisNodeFile.getPath().toUri().getRawPath());
        nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p);
        continue;
      }
      if (!thisNodeFile.getPath().getName()
          .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
        AggregatedLogFormat.LogReader reader =
            new AggregatedLogFormat.LogReader(getConf(),
                thisNodeFile.getPath());
        try {

          DataInputStream valueStream;
          LogKey key = new LogKey();
          valueStream = reader.next(key);

          while (valueStream != null) {
            PrintStream out = createPrintStream(localDir,
                thisNodeFile.getPath().getName(), key.toString());
            try {
              String containerString = String.format(
                  CONTAINER_ON_NODE_PATTERN, key,
                  thisNodeFile.getPath().getName());
              out.println(containerString);
              out.println(StringUtils.repeat("=", containerString.length()));
              while (true) {
                try {
                  if (logTypes == null || logTypes.isEmpty()) {
                    LogReader.readAContainerLogsForALogType(valueStream, out,
                        thisNodeFile.getModificationTime(),
                        options.getBytes());
                    foundAnyLogs = true;
                  } else {
                    int result = LogReader.readContainerLogsForALogType(
                        valueStream, out, thisNodeFile.getModificationTime(),
                        logTypes, options.getBytes());
                    if (result == 0) {
                      foundAnyLogs = true;
                    }
                  }
                } catch (EOFException eof) {
                  break;
                }
              }
            } finally {
              closePrintStream(out);
            }

            // Next container
            key = new LogKey();
            valueStream = reader.next(key);
          }
        } finally {
          reader.close();
        }
      }
    }
    if (!foundAnyLogs) {
      emptyLogDir(getRemoteAppLogDir(appId, appOwner).toString());
      return -1;
    }
    return 0;
  }

  @Private
  public int printAContainerLogMetadata(ContainerLogsRequest options,
      PrintStream out, PrintStream err)
      throws IOException {
    ApplicationId appId = options.getAppId();
    String appOwner = options.getAppOwner();
    String nodeId = options.getNodeId();
    String containerIdStr = options.getContainerId();
    boolean getAllContainers = (containerIdStr == null);
    String nodeIdStr = (nodeId == null) ? null
        : LogAggregationUtils.getNodeString(nodeId);
    RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
        appId, appOwner);
    if (nodeFiles == null) {
      return -1;
    }
    boolean foundAnyLogs = false;
    while (nodeFiles.hasNext()) {
      FileStatus thisNodeFile = nodeFiles.next();
      if (nodeIdStr != null) {
        if (!thisNodeFile.getPath().getName().contains(nodeIdStr)) {
          continue;
        }
      }
      if (!thisNodeFile.getPath().getName()
          .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
        AggregatedLogFormat.LogReader reader =
            new AggregatedLogFormat.LogReader(getConf(),
            thisNodeFile.getPath());
        try {
          DataInputStream valueStream;
          LogKey key = new LogKey();
          valueStream = reader.next(key);
          while (valueStream != null) {
            if (getAllContainers || (key.toString().equals(containerIdStr))) {
              String containerString = String.format(CONTAINER_ON_NODE_PATTERN,
                  key, thisNodeFile.getPath().getName());
              out.println(containerString);
              out.println("Log Upload Time:"
                  + thisNodeFile.getModificationTime());
              out.println(StringUtils.repeat("=", containerString.length()));
              out.printf(PER_LOG_FILE_INFO_PATTERN, "LogType", "LogLength");
              out.println(StringUtils.repeat("=", containerString.length()));
              while (true) {
                try {
                  Pair<String, String> logMeta =
                      LogReader.readContainerMetaDataAndSkipData(
                          valueStream, out);
                  out.printf(PER_LOG_FILE_INFO_PATTERN,
                      logMeta.getFirst(), logMeta.getSecond());
                } catch (EOFException eof) {
                  break;
                }
              }
              foundAnyLogs = true;
              if (!getAllContainers) {
                break;
              }
            }
            // Next container
            key = new LogKey();
            valueStream = reader.next(key);
          }
        } finally {
          reader.close();
        }
      }
    }
    if (!foundAnyLogs) {
      if (containerIdStr != null && nodeId != null) {
        err.println("The container " + containerIdStr + " couldn't be found "
            + "on the node specified: " + nodeId);
      } else if (nodeId != null) {
        err.println("Can not find log metadata for any containers on "
            + nodeId);
      } else if (containerIdStr != null) {
        err.println("Can not find log metadata for container: "
            + containerIdStr);
      }
      return -1;
    }
    return 0;
  }

  @Private
  public void printNodesList(ContainerLogsRequest options,
      PrintStream out, PrintStream err) throws IOException {
    ApplicationId appId = options.getAppId();
    String appOwner = options.getAppOwner();
    RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
        appId, appOwner);
    if (nodeFiles == null) {
      return;
    }
    boolean foundNode = false;
    StringBuilder sb = new StringBuilder();
    while (nodeFiles.hasNext()) {
      FileStatus thisNodeFile = nodeFiles.next();
      sb.append(thisNodeFile.getPath().getName() + "\n");
      foundNode = true;
    }
    if (!foundNode) {
      err.println("No nodes found that aggregated logs for "
          + "the application: " + appId);
    } else {
      out.println(sb.toString());
    }
  }

  @Private
  public void printContainersList(ContainerLogsRequest options,
      PrintStream out, PrintStream err) throws IOException {
    ApplicationId appId = options.getAppId();
    String appOwner = options.getAppOwner();
    String nodeId = options.getNodeId();
    String nodeIdStr = (nodeId == null) ? null
        : LogAggregationUtils.getNodeString(nodeId);
    RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
        appId, appOwner);
    if (nodeFiles == null) {
      return;
    }
    boolean foundAnyLogs = false;
    while (nodeFiles.hasNext()) {
      FileStatus thisNodeFile = nodeFiles.next();
      if (nodeIdStr != null) {
        if (!thisNodeFile.getPath().getName().contains(nodeIdStr)) {
          continue;
        }
      }
      if (!thisNodeFile.getPath().getName()
          .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
        AggregatedLogFormat.LogReader reader =
            new AggregatedLogFormat.LogReader(getConf(),
            thisNodeFile.getPath());
        try {
          DataInputStream valueStream;
          LogKey key = new LogKey();
          valueStream = reader.next(key);
          while (valueStream != null) {
            out.println(String.format(CONTAINER_ON_NODE_PATTERN, key,
                thisNodeFile.getPath().getName()));
            foundAnyLogs = true;
            // Next container
            key = new LogKey();
            valueStream = reader.next(key);
          }
        } finally {
          reader.close();
        }
      }
    }
    if (!foundAnyLogs) {
      if (nodeId != null) {
        err.println("Can not find information for any containers on "
            + nodeId);
      } else {
        err.println("Can not find any container information for "
            + "the application: " + appId);
      }
    }
  }

  private RemoteIterator<FileStatus> getRemoteNodeFileDir(ApplicationId appId,
      String appOwner) throws IOException {
    Path remoteAppLogDir = getRemoteAppLogDir(appId, appOwner);
    RemoteIterator<FileStatus> nodeFiles = null;
    try {
      Path qualifiedLogDir =
          FileContext.getFileContext(getConf()).makeQualified(remoteAppLogDir);
      nodeFiles = FileContext.getFileContext(qualifiedLogDir.toUri(),
          getConf()).listStatus(remoteAppLogDir);
    } catch (FileNotFoundException fnf) {
      logDirNotExist(remoteAppLogDir.toString());
    } catch (AccessControlException | AccessDeniedException ace) {
      logDirNoAccessPermission(remoteAppLogDir.toString(), appOwner,
        ace.getMessage());
    }
    return nodeFiles;
  }

  private Path getRemoteAppLogDir(ApplicationId appId, String appOwner) {
    Path remoteRootLogDir = new Path(getConf().get(
        YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
        YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
    String user = appOwner;
    String logDirSuffix = LogAggregationUtils
        .getRemoteNodeLogDirSuffix(getConf());
    // TODO Change this to get a list of files from the LAS.
    return LogAggregationUtils.getRemoteAppLogDir(
        remoteRootLogDir, appId, user, logDirSuffix);
  }

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
  }

  @Override
  public Configuration getConf() {
    return this.conf;
  }

  private static void containerLogNotFound(String containerId) {
    System.err.println("Logs for container " + containerId
        + " are not present in this log-file.");
  }

  private static void logDirNotExist(String remoteAppLogDir) {
    System.err.println(remoteAppLogDir + " does not exist.");
    System.err.println("Log aggregation has not completed or is not enabled.");
  }

  private static void emptyLogDir(String remoteAppLogDir) {
    System.err.println(remoteAppLogDir + " does not have any log files.");
  }

  private static void logDirNoAccessPermission(String remoteAppLogDir,
      String appOwner, String errorMessage) throws IOException {
    System.err.println("Guessed logs' owner is " + appOwner
        + " and current user "
        + UserGroupInformation.getCurrentUser().getUserName() + " does not "
        + "have permission to access " + remoteAppLogDir
        + ". Error message found: " + errorMessage);
  }

  @Private
  public PrintStream createPrintStream(String localDir, String nodeId,
      String containerId) throws IOException {
    PrintStream out = System.out;
    if(localDir != null && !localDir.isEmpty()) {
      Path nodePath = new Path(localDir, LogAggregationUtils
          .getNodeString(nodeId));
      Files.createDirectories(Paths.get(nodePath.toString()));
      Path containerLogPath = new Path(nodePath, containerId);
      out = new PrintStream(containerLogPath.toString(), "UTF-8");
    }
    return out;
  }

  public void closePrintStream(PrintStream out) {
    if (out != System.out) {
      IOUtils.closeQuietly(out);
    }
  }

  @Private
  public Set<String> listContainerLogs(ContainerLogsRequest options)
      throws IOException {
    Set<String> logTypes = new HashSet<String>();
    ApplicationId appId = options.getAppId();
    String appOwner = options.getAppOwner();
    String nodeId = options.getNodeId();
    String containerIdStr = options.getContainerId();
    boolean getAllContainers = (containerIdStr == null);
    String nodeIdStr = (nodeId == null) ? null
        : LogAggregationUtils.getNodeString(nodeId);
    RemoteIterator<FileStatus> nodeFiles = getRemoteNodeFileDir(
        appId, appOwner);
    if (nodeFiles == null) {
      return logTypes;
    }
    while (nodeFiles.hasNext()) {
      FileStatus thisNodeFile = nodeFiles.next();
      if (nodeIdStr != null) {
        if (!thisNodeFile.getPath().getName().contains(nodeIdStr)) {
          continue;
        }
      }
      if (!thisNodeFile.getPath().getName()
          .endsWith(LogAggregationUtils.TMP_FILE_SUFFIX)) {
        AggregatedLogFormat.LogReader reader =
            new AggregatedLogFormat.LogReader(getConf(),
            thisNodeFile.getPath());
        try {
          DataInputStream valueStream;
          LogKey key = new LogKey();
          valueStream = reader.next(key);
          while (valueStream != null) {
            if (getAllContainers || (key.toString().equals(containerIdStr))) {
              while (true) {
                try {
                  String logFile = LogReader.readContainerMetaDataAndSkipData(
                      valueStream, null).getFirst();
                  logTypes.add(logFile);
                } catch (EOFException eof) {
                  break;
                }
              }
              if (!getAllContainers) {
                break;
              }
            }
            // Next container
            key = new LogKey();
            valueStream = reader.next(key);
          }
        } finally {
          reader.close();
        }
      }
    }
    return logTypes;
  }
}
