/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.tez.dag.history.logging.proto;

import java.io.IOException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.yarn.util.Clock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;

/**
 * Class to create proto reader and writer for a date partitioned directory structure.
 *
 * @param <T> The proto message type.
 */
public class DatePartitionedLogger<T extends MessageLite> {
  private static final Logger LOG = LoggerFactory.getLogger(DatePartitionedLogger.class);
  // Everyone has permission to write, but with sticky set so that delete is restricted.
  // This is required, since the path is same for all users and everyone writes into it.
  private static final FsPermission DIR_PERMISSION = FsPermission.createImmutable((short)01777);

  // Since the directories have broad permissions restrict the file read access.
  private static final FsPermission FILE_UMASK = FsPermission.createImmutable((short)0066);

  private final Parser<T> parser;
  private final Path basePath;
  private final Configuration conf;
  private final Clock clock;

  public DatePartitionedLogger(Parser<T> parser, Path baseDir, Configuration conf, Clock clock)
      throws IOException {
    this.conf = new Configuration(conf);
    this.clock = clock;
    this.parser = parser;
    createDirIfNotExists(baseDir);
    this.basePath = baseDir.getFileSystem(conf).resolvePath(baseDir);
    FsPermission.setUMask(this.conf, FILE_UMASK);
  }

  private void createDirIfNotExists(Path path) throws IOException {
    FileSystem fileSystem = path.getFileSystem(conf);
    try {
      if (!fileSystem.exists(path)) {
        fileSystem.mkdirs(path);
        fileSystem.setPermission(path, DIR_PERMISSION);
      }
    } catch (IOException e) {
      // Ignore this exception, if there is a problem it'll fail when trying to read or write.
      LOG.warn("Error while trying to set permission: ", e);
    }
  }

  /**
   * Creates a writer for the given fileName, with date as today.
   */
  public ProtoMessageWriter<T> getWriter(String fileName) throws IOException {
    Path filePath = getPathForDate(getNow().toLocalDate(), fileName);
    return new ProtoMessageWriter<>(conf, filePath, parser);
  }

  /**
   * Creates a reader for the given filePath, no validation is done.
   */
  public ProtoMessageReader<T> getReader(Path filePath) throws IOException {
    return new ProtoMessageReader<>(conf, filePath, parser);
  }

  /**
   * Create a path for the given date and fileName. This can be used to create a reader.
   */
  public Path getPathForDate(LocalDate date, String fileName) throws IOException {
    Path path = new Path(basePath, getDirForDate(date));
    createDirIfNotExists(path);
    return new Path(path, fileName);
  }

  public Path getPathForSubdir(String dirName, String fileName) {
    return new Path(new Path(basePath, dirName), fileName);
  }

  /**
   * Extract the date from the directory name, this should be a directory created by this class.
   */
  public LocalDate getDateFromDir(String dirName) {
    if (!dirName.startsWith("date=")) {
      throw new IllegalArgumentException("Invalid directory: "+ dirName);
    }
    return LocalDate.parse(dirName.substring(5), DateTimeFormatter.ISO_LOCAL_DATE);
  }

  /**
   * Returns the directory name for a given date.
   */
  public String getDirForDate(LocalDate date) {
    return "date=" + DateTimeFormatter.ISO_LOCAL_DATE.format(date);
  }

  /**
   * Find next available directory, after the given directory.
   */
  public String getNextDirectory(String currentDir) throws IOException {
    // Fast check, if the next day directory exists return it.
    String nextDate = getDirForDate(getDateFromDir(currentDir).plusDays(1));
    FileSystem fileSystem = basePath.getFileSystem(conf);
    if (fileSystem.exists(new Path(basePath, nextDate))) {
      return nextDate;
    }
    // Have to scan the directory to find min date greater than currentDir.
    String dirName = null;
    RemoteIterator<FileStatus> iter = fileSystem.listStatusIterator(basePath);
    while (iter.hasNext()) {
      FileStatus status = iter.next();
      String name = status.getPath().getName();
      // String comparison is good enough, since its of form date=yyyy-MM-dd
      if (name.compareTo(currentDir) > 0 && (dirName == null || name.compareTo(dirName) < 0)) {
        dirName = name;
      }
    }
    return dirName;
  }

  /**
   * Returns new or changed files in the given directory. The offsets are used to find
   * changed files.
   */
  public List<FileStatus> scanForChangedFiles(String subDir, Map<String, Long> currentOffsets)
      throws IOException {
    Path dirPath = new Path(basePath, subDir);
    FileSystem fileSystem = basePath.getFileSystem(conf);
    List<FileStatus> newFiles = new ArrayList<>();
    if (!fileSystem.exists(dirPath)) {
      return newFiles;
    }
    RemoteIterator<FileStatus> iter = fileSystem.listStatusIterator(dirPath);
    while (iter.hasNext()) {
      FileStatus status = iter.next();
      String fileName = status.getPath().getName();
      Long offset = currentOffsets.get(fileName);
      // If the offset was never added or offset < fileSize.
      if (offset == null || offset < status.getLen()) {
        newFiles.add(status);
      }
    }
    return newFiles;
  }

  /**
   * Returns the current time, using the underlying clock in UTC time.
   */
  public LocalDateTime getNow() {
    // Use UTC date to ensure reader date is same on all timezones.
    return LocalDateTime.ofEpochSecond(clock.getTime() / 1000, 0, ZoneOffset.UTC);
  }

  public Configuration getConfig() {
    return conf;
  }
}
