/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.gobblin.data.management.copy;

import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.Set;

import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.joda.time.DateTimeZone;
import org.joda.time.LocalDateTime;
import org.joda.time.Period;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.PeriodFormatter;
import org.joda.time.format.PeriodFormatterBuilder;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.filters.HiddenFilter;

@Slf4j
public class TimeAwareRecursiveCopyableDatasetTest {
  private FileSystem fs;
  private Path baseDir1;
  private Path baseDir2;
  private Path baseDir3;
  private Path baseDir4;

  private static final String NUM_LOOKBACK_DAYS_STR = "2d";
  private static final Integer NUM_LOOKBACK_DAYS = 2;
  private static final String NUM_LOOKBACK_HOURS_STR = "4h";
  private static final Integer NUM_LOOKBACK_HOURS = 4;
  private static final Integer MAX_NUM_DAILY_DIRS = 4;
  private static final Integer MAX_NUM_HOURLY_DIRS = 48;
  private static final String NUM_LOOKBACK_DAYS_HOURS_STR = "1d1h";
  private static final Integer NUM_DAYS_HOURS_DIRS = 25;
  private static final String NUM_LOOKBACK_HOURS_MINS_STR = "1h1m";

  @BeforeClass
  public void setUp() throws IOException {
    Assert.assertTrue(NUM_LOOKBACK_DAYS < MAX_NUM_DAILY_DIRS);
    Assert.assertTrue(NUM_LOOKBACK_HOURS < MAX_NUM_HOURLY_DIRS);

    this.fs = FileSystem.getLocal(new Configuration());

    baseDir1 = new Path("/tmp/src/ds1/hourly");
    if (fs.exists(baseDir1)) {
      fs.delete(baseDir1, true);
    }
    fs.mkdirs(baseDir1);

    baseDir2 = new Path("/tmp/src/ds1/daily");
    if (fs.exists(baseDir2)) {
      fs.delete(baseDir2, true);
    }
    fs.mkdirs(baseDir2);

    baseDir3 = new Path("/tmp/src/ds2/daily");
    if (fs.exists(baseDir3)) {
      fs.delete(baseDir3, true);
    }
    fs.mkdirs(baseDir3);

    baseDir4 = new Path("/tmp/src/ds3/daily");
    if (fs.exists(baseDir4)) {
      fs.delete(baseDir4, true);
    }
    fs.mkdirs(baseDir4);
    PeriodFormatter formatter = new PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").toFormatter();
    Period period = formatter.parsePeriod(NUM_LOOKBACK_DAYS_HOURS_STR);
  }

  @Test
  public void testGetFilesAtPath() throws IOException {
    String datePattern = "yyyy/MM/dd/HH";
    DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);

    LocalDateTime endDate = LocalDateTime.now(DateTimeZone.forID(TimeAwareRecursiveCopyableDataset.DEFAULT_DATE_PATTERN_TIMEZONE));

    Set<String> candidateFiles = new HashSet<>();
    for (int i = 0; i < MAX_NUM_HOURLY_DIRS; i++) {
      String startDate = endDate.minusHours(i).toString(formatter);
      Path subDirPath = new Path(baseDir1, new Path(startDate));
      fs.mkdirs(subDirPath);
      Path filePath = new Path(subDirPath, i + ".avro");
      fs.create(filePath);
      if (i < (NUM_LOOKBACK_HOURS + 1)) {
        candidateFiles.add(filePath.toString());
      }
    }

    //Lookback time = "4h"
    Properties properties = new Properties();
    properties.setProperty(TimeAwareRecursiveCopyableDataset.LOOKBACK_TIME_KEY, NUM_LOOKBACK_HOURS_STR);
    properties.setProperty(TimeAwareRecursiveCopyableDataset.DATE_PATTERN_KEY, "yyyy/MM/dd/HH");

    PathFilter pathFilter = new HiddenFilter();
    TimeAwareRecursiveCopyableDataset dataset = new TimeAwareRecursiveCopyableDataset(fs, baseDir1, properties,
        new Path("/tmp/src/*/hourly"));
    List<FileStatus> fileStatusList = dataset.getFilesAtPath(fs, baseDir1, pathFilter);

    Assert.assertEquals(fileStatusList.size(), NUM_LOOKBACK_HOURS + 1);

    for (FileStatus fileStatus: fileStatusList) {
      Assert.assertTrue(candidateFiles.contains(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString()));
    }

    //Lookback time = "1d1h"
    properties = new Properties();
    properties.setProperty(TimeAwareRecursiveCopyableDataset.LOOKBACK_TIME_KEY, NUM_LOOKBACK_DAYS_HOURS_STR);
    properties.setProperty(TimeAwareRecursiveCopyableDataset.DATE_PATTERN_KEY, "yyyy/MM/dd/HH");
    dataset = new TimeAwareRecursiveCopyableDataset(fs, baseDir1, properties,
        new Path("/tmp/src/*/hourly"));
    fileStatusList = dataset.getFilesAtPath(fs, baseDir1, pathFilter);
    candidateFiles = new HashSet<>();
    datePattern = "yyyy/MM/dd/HH";
    formatter = DateTimeFormat.forPattern(datePattern);

    for (int i = 0; i < MAX_NUM_HOURLY_DIRS; i++) {
      String startDate = endDate.minusHours(i).toString(formatter);
      Path subDirPath = new Path(baseDir1, new Path(startDate));
      Path filePath = new Path(subDirPath, i + ".avro");
      if (i < NUM_DAYS_HOURS_DIRS + 1) {
        candidateFiles.add(filePath.toString());
      }
    }

    Assert.assertEquals(fileStatusList.size(), NUM_DAYS_HOURS_DIRS + 1);
    for (FileStatus fileStatus: fileStatusList) {
      Assert.assertTrue(candidateFiles.contains(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString()));
    }

    //Lookback time = "2d"
    datePattern = "yyyy/MM/dd";
    formatter = DateTimeFormat.forPattern(datePattern);
    endDate = LocalDateTime.now(DateTimeZone.forID(TimeAwareRecursiveCopyableDataset.DEFAULT_DATE_PATTERN_TIMEZONE));

    candidateFiles = new HashSet<>();
    for (int i = 0; i < MAX_NUM_DAILY_DIRS; i++) {
      String startDate = endDate.minusDays(i).toString(formatter);
      Path subDirPath = new Path(baseDir2, new Path(startDate));
      fs.mkdirs(subDirPath);
      Path filePath = new Path(subDirPath, i + ".avro");
      fs.create(filePath);
      if (i < (NUM_LOOKBACK_DAYS + 1)) {
        candidateFiles.add(filePath.toString());
      }
    }

    properties = new Properties();
    properties.setProperty(TimeAwareRecursiveCopyableDataset.LOOKBACK_TIME_KEY, NUM_LOOKBACK_DAYS_STR);
    properties.setProperty(TimeAwareRecursiveCopyableDataset.DATE_PATTERN_KEY, "yyyy/MM/dd");

    dataset = new TimeAwareRecursiveCopyableDataset(fs, baseDir2, properties,
        new Path("/tmp/src/*/daily"));
    fileStatusList = dataset.getFilesAtPath(fs, baseDir2, pathFilter);

    Assert.assertEquals(fileStatusList.size(), NUM_LOOKBACK_DAYS + 1);
    for (FileStatus fileStatus: fileStatusList) {
      Assert.assertTrue(candidateFiles.contains(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString()));
    }

    // test ds of daily/yyyy-MM-dd-HH-mm
    datePattern = "yyyy-MM-dd-HH-mm";
    formatter = DateTimeFormat.forPattern(datePattern);
    endDate = LocalDateTime.now(DateTimeZone.forID(TimeAwareRecursiveCopyableDataset.DEFAULT_DATE_PATTERN_TIMEZONE));

    Random random = new Random();

    candidateFiles = new HashSet<>();
    for (int i = 0; i < MAX_NUM_DAILY_DIRS; i++) {
      String startDate = endDate.minusDays(i).withMinuteOfHour(random.nextInt(60)).toString(formatter);
      if (i == 0) {
        // avoid future dates on minutes, so have consistency test result
        startDate = endDate.minusHours(i).withMinuteOfHour(0).toString(formatter);
      }
      Path subDirPath = new Path(baseDir3, new Path(startDate));
      fs.mkdirs(subDirPath);
      Path filePath = new Path(subDirPath, i + ".avro");
      fs.create(filePath);
      if (i < (NUM_LOOKBACK_DAYS + 1)) {
        candidateFiles.add(filePath.toString());
      }
    }

    properties = new Properties();
    properties.setProperty(TimeAwareRecursiveCopyableDataset.LOOKBACK_TIME_KEY, "2d1h");
    properties.setProperty(TimeAwareRecursiveCopyableDataset.DATE_PATTERN_KEY, "yyyy-MM-dd-HH-mm");

    dataset = new TimeAwareRecursiveCopyableDataset(fs, baseDir3, properties,
        new Path("/tmp/src/ds2/daily"));

    fileStatusList = dataset.getFilesAtPath(fs, baseDir3, pathFilter);

    Assert.assertEquals(fileStatusList.size(), NUM_LOOKBACK_DAYS + 1);
    for (FileStatus fileStatus: fileStatusList) {
      Assert.assertTrue(candidateFiles.contains(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString()));
    }

    // test ds of daily/yyyy-MM-dd-HH-mm-ss
    datePattern = "yyyy-MM-dd-HH-mm-ss";
    formatter = DateTimeFormat.forPattern(datePattern);
    endDate = LocalDateTime.now(DateTimeZone.forID(TimeAwareRecursiveCopyableDataset.DEFAULT_DATE_PATTERN_TIMEZONE));

    candidateFiles = new HashSet<>();
    for (int i = 0; i < MAX_NUM_DAILY_DIRS; i++) {
      String startDate = endDate.minusDays(i).withMinuteOfHour(random.nextInt(60)).withSecondOfMinute(random.nextInt(60)).toString(formatter);
      if (i == 0) {
        // avoid future dates on minutes, so have consistency test result
        startDate = endDate.minusHours(i).withMinuteOfHour(0).withSecondOfMinute(0).toString(formatter);
      }
      Path subDirPath = new Path(baseDir4, new Path(startDate));
      fs.mkdirs(subDirPath);
      Path filePath = new Path(subDirPath, i + ".avro");
      fs.create(filePath);
      if (i < (NUM_LOOKBACK_DAYS + 1)) {
        candidateFiles.add(filePath.toString());
      }
    }

    properties = new Properties();
    properties.setProperty(TimeAwareRecursiveCopyableDataset.LOOKBACK_TIME_KEY, "2d1h");
    properties.setProperty(TimeAwareRecursiveCopyableDataset.DATE_PATTERN_KEY, "yyyy-MM-dd-HH-mm-ss");

    dataset = new TimeAwareRecursiveCopyableDataset(fs, baseDir4, properties,
        new Path("/tmp/src/ds3/daily"));

    fileStatusList = dataset.getFilesAtPath(fs, baseDir4, pathFilter);

    Assert.assertEquals(fileStatusList.size(), NUM_LOOKBACK_DAYS + 1);
    for (FileStatus fileStatus: fileStatusList) {
      Assert.assertTrue(candidateFiles.contains(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString()));
    }
  }

  @Test (expectedExceptions = IllegalArgumentException.class)
  public void testInstantiationError() {
    //Daily directories, but look back time has days and hours. We should expect an assertion error.
    Properties properties = new Properties();
    properties.setProperty(TimeAwareRecursiveCopyableDataset.LOOKBACK_TIME_KEY, NUM_LOOKBACK_DAYS_HOURS_STR);
    properties.setProperty(TimeAwareRecursiveCopyableDataset.DATE_PATTERN_KEY, "yyyy/MM/dd");

    TimeAwareRecursiveCopyableDataset dataset = new TimeAwareRecursiveCopyableDataset(fs, baseDir2, properties,
        new Path("/tmp/src/*/daily"));

    // hourly directories, but look back time has hours and minutes. We should expect an assertion error.
    properties = new Properties();
    properties.setProperty(TimeAwareRecursiveCopyableDataset.LOOKBACK_TIME_KEY, NUM_LOOKBACK_HOURS_MINS_STR);
    properties.setProperty(TimeAwareRecursiveCopyableDataset.DATE_PATTERN_KEY, "yyyy-MM-dd-HH");

    dataset = new TimeAwareRecursiveCopyableDataset(fs, baseDir3, properties,
        new Path("/tmp/src/ds2/daily"));

  }

  @AfterClass
  public void clean() throws IOException {
    //Delete tmp directories
    this.fs.delete(baseDir1, true);
    this.fs.delete(baseDir2, true);
    this.fs.delete(baseDir3, true);
    this.fs.delete(baseDir4, true);
  }
}