blob: 521caaafecec344afc6f74044e7d3be9ac0f7520 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.data.management.copy;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.joda.time.DateTimeZone;
import org.joda.time.LocalDateTime;
import org.joda.time.Period;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.PeriodFormatter;
import org.joda.time.format.PeriodFormatterBuilder;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.filters.HiddenFilter;
@Slf4j
public class TimeAwareRecursiveCopyableDatasetTest {
private FileSystem fs;
private Path baseDir1;
private Path baseDir2;
private Path baseDir3;
private Path baseDir4;
private static final String NUM_LOOKBACK_DAYS_STR = "2d";
private static final Integer NUM_LOOKBACK_DAYS = 2;
private static final String NUM_LOOKBACK_HOURS_STR = "4h";
private static final Integer NUM_LOOKBACK_HOURS = 4;
private static final Integer MAX_NUM_DAILY_DIRS = 4;
private static final Integer MAX_NUM_HOURLY_DIRS = 48;
private static final String NUM_LOOKBACK_DAYS_HOURS_STR = "1d1h";
private static final Integer NUM_DAYS_HOURS_DIRS = 25;
private static final String NUM_LOOKBACK_HOURS_MINS_STR = "1h1m";
@BeforeClass
public void setUp() throws IOException {
Assert.assertTrue(NUM_LOOKBACK_DAYS < MAX_NUM_DAILY_DIRS);
Assert.assertTrue(NUM_LOOKBACK_HOURS < MAX_NUM_HOURLY_DIRS);
this.fs = FileSystem.getLocal(new Configuration());
baseDir1 = new Path("/tmp/src/ds1/hourly");
if (fs.exists(baseDir1)) {
fs.delete(baseDir1, true);
}
fs.mkdirs(baseDir1);
baseDir2 = new Path("/tmp/src/ds1/daily");
if (fs.exists(baseDir2)) {
fs.delete(baseDir2, true);
}
fs.mkdirs(baseDir2);
baseDir3 = new Path("/tmp/src/ds2/daily");
if (fs.exists(baseDir3)) {
fs.delete(baseDir3, true);
}
fs.mkdirs(baseDir3);
baseDir4 = new Path("/tmp/src/ds3/daily");
if (fs.exists(baseDir4)) {
fs.delete(baseDir4, true);
}
fs.mkdirs(baseDir4);
PeriodFormatter formatter = new PeriodFormatterBuilder().appendDays().appendSuffix("d").appendHours().appendSuffix("h").toFormatter();
Period period = formatter.parsePeriod(NUM_LOOKBACK_DAYS_HOURS_STR);
}
@Test
public void testGetFilesAtPath() throws IOException {
String datePattern = "yyyy/MM/dd/HH";
DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
LocalDateTime endDate = LocalDateTime.now(DateTimeZone.forID(TimeAwareRecursiveCopyableDataset.DEFAULT_DATE_PATTERN_TIMEZONE));
Set<String> candidateFiles = new HashSet<>();
for (int i = 0; i < MAX_NUM_HOURLY_DIRS; i++) {
String startDate = endDate.minusHours(i).toString(formatter);
Path subDirPath = new Path(baseDir1, new Path(startDate));
fs.mkdirs(subDirPath);
Path filePath = new Path(subDirPath, i + ".avro");
fs.create(filePath);
if (i < (NUM_LOOKBACK_HOURS + 1)) {
candidateFiles.add(filePath.toString());
}
}
//Lookback time = "4h"
Properties properties = new Properties();
properties.setProperty(TimeAwareRecursiveCopyableDataset.LOOKBACK_TIME_KEY, NUM_LOOKBACK_HOURS_STR);
properties.setProperty(TimeAwareRecursiveCopyableDataset.DATE_PATTERN_KEY, "yyyy/MM/dd/HH");
PathFilter pathFilter = new HiddenFilter();
TimeAwareRecursiveCopyableDataset dataset = new TimeAwareRecursiveCopyableDataset(fs, baseDir1, properties,
new Path("/tmp/src/*/hourly"));
List<FileStatus> fileStatusList = dataset.getFilesAtPath(fs, baseDir1, pathFilter);
Assert.assertEquals(fileStatusList.size(), NUM_LOOKBACK_HOURS + 1);
for (FileStatus fileStatus: fileStatusList) {
Assert.assertTrue(candidateFiles.contains(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString()));
}
//Lookback time = "1d1h"
properties = new Properties();
properties.setProperty(TimeAwareRecursiveCopyableDataset.LOOKBACK_TIME_KEY, NUM_LOOKBACK_DAYS_HOURS_STR);
properties.setProperty(TimeAwareRecursiveCopyableDataset.DATE_PATTERN_KEY, "yyyy/MM/dd/HH");
dataset = new TimeAwareRecursiveCopyableDataset(fs, baseDir1, properties,
new Path("/tmp/src/*/hourly"));
fileStatusList = dataset.getFilesAtPath(fs, baseDir1, pathFilter);
candidateFiles = new HashSet<>();
datePattern = "yyyy/MM/dd/HH";
formatter = DateTimeFormat.forPattern(datePattern);
for (int i = 0; i < MAX_NUM_HOURLY_DIRS; i++) {
String startDate = endDate.minusHours(i).toString(formatter);
Path subDirPath = new Path(baseDir1, new Path(startDate));
Path filePath = new Path(subDirPath, i + ".avro");
if (i < NUM_DAYS_HOURS_DIRS + 1) {
candidateFiles.add(filePath.toString());
}
}
Assert.assertEquals(fileStatusList.size(), NUM_DAYS_HOURS_DIRS + 1);
for (FileStatus fileStatus: fileStatusList) {
Assert.assertTrue(candidateFiles.contains(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString()));
}
//Lookback time = "2d"
datePattern = "yyyy/MM/dd";
formatter = DateTimeFormat.forPattern(datePattern);
endDate = LocalDateTime.now(DateTimeZone.forID(TimeAwareRecursiveCopyableDataset.DEFAULT_DATE_PATTERN_TIMEZONE));
candidateFiles = new HashSet<>();
for (int i = 0; i < MAX_NUM_DAILY_DIRS; i++) {
String startDate = endDate.minusDays(i).toString(formatter);
Path subDirPath = new Path(baseDir2, new Path(startDate));
fs.mkdirs(subDirPath);
Path filePath = new Path(subDirPath, i + ".avro");
fs.create(filePath);
if (i < (NUM_LOOKBACK_DAYS + 1)) {
candidateFiles.add(filePath.toString());
}
}
properties = new Properties();
properties.setProperty(TimeAwareRecursiveCopyableDataset.LOOKBACK_TIME_KEY, NUM_LOOKBACK_DAYS_STR);
properties.setProperty(TimeAwareRecursiveCopyableDataset.DATE_PATTERN_KEY, "yyyy/MM/dd");
dataset = new TimeAwareRecursiveCopyableDataset(fs, baseDir2, properties,
new Path("/tmp/src/*/daily"));
fileStatusList = dataset.getFilesAtPath(fs, baseDir2, pathFilter);
Assert.assertEquals(fileStatusList.size(), NUM_LOOKBACK_DAYS + 1);
for (FileStatus fileStatus: fileStatusList) {
Assert.assertTrue(candidateFiles.contains(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString()));
}
// test ds of daily/yyyy-MM-dd-HH-mm
datePattern = "yyyy-MM-dd-HH-mm";
formatter = DateTimeFormat.forPattern(datePattern);
endDate = LocalDateTime.now(DateTimeZone.forID(TimeAwareRecursiveCopyableDataset.DEFAULT_DATE_PATTERN_TIMEZONE));
Random random = new Random();
candidateFiles = new HashSet<>();
for (int i = 0; i < MAX_NUM_DAILY_DIRS; i++) {
String startDate = endDate.minusDays(i).withMinuteOfHour(random.nextInt(60)).toString(formatter);
if (i == 0) {
// avoid future dates on minutes, so have consistency test result
startDate = endDate.minusHours(i).withMinuteOfHour(0).toString(formatter);
}
Path subDirPath = new Path(baseDir3, new Path(startDate));
fs.mkdirs(subDirPath);
Path filePath = new Path(subDirPath, i + ".avro");
fs.create(filePath);
if (i < (NUM_LOOKBACK_DAYS + 1)) {
candidateFiles.add(filePath.toString());
}
}
properties = new Properties();
properties.setProperty(TimeAwareRecursiveCopyableDataset.LOOKBACK_TIME_KEY, "2d1h");
properties.setProperty(TimeAwareRecursiveCopyableDataset.DATE_PATTERN_KEY, "yyyy-MM-dd-HH-mm");
dataset = new TimeAwareRecursiveCopyableDataset(fs, baseDir3, properties,
new Path("/tmp/src/ds2/daily"));
fileStatusList = dataset.getFilesAtPath(fs, baseDir3, pathFilter);
Assert.assertEquals(fileStatusList.size(), NUM_LOOKBACK_DAYS + 1);
for (FileStatus fileStatus: fileStatusList) {
Assert.assertTrue(candidateFiles.contains(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString()));
}
// test ds of daily/yyyy-MM-dd-HH-mm-ss
datePattern = "yyyy-MM-dd-HH-mm-ss";
formatter = DateTimeFormat.forPattern(datePattern);
endDate = LocalDateTime.now(DateTimeZone.forID(TimeAwareRecursiveCopyableDataset.DEFAULT_DATE_PATTERN_TIMEZONE));
candidateFiles = new HashSet<>();
for (int i = 0; i < MAX_NUM_DAILY_DIRS; i++) {
String startDate = endDate.minusDays(i).withMinuteOfHour(random.nextInt(60)).withSecondOfMinute(random.nextInt(60)).toString(formatter);
if (i == 0) {
// avoid future dates on minutes, so have consistency test result
startDate = endDate.minusHours(i).withMinuteOfHour(0).withSecondOfMinute(0).toString(formatter);
}
Path subDirPath = new Path(baseDir4, new Path(startDate));
fs.mkdirs(subDirPath);
Path filePath = new Path(subDirPath, i + ".avro");
fs.create(filePath);
if (i < (NUM_LOOKBACK_DAYS + 1)) {
candidateFiles.add(filePath.toString());
}
}
properties = new Properties();
properties.setProperty(TimeAwareRecursiveCopyableDataset.LOOKBACK_TIME_KEY, "2d1h");
properties.setProperty(TimeAwareRecursiveCopyableDataset.DATE_PATTERN_KEY, "yyyy-MM-dd-HH-mm-ss");
dataset = new TimeAwareRecursiveCopyableDataset(fs, baseDir4, properties,
new Path("/tmp/src/ds3/daily"));
fileStatusList = dataset.getFilesAtPath(fs, baseDir4, pathFilter);
Assert.assertEquals(fileStatusList.size(), NUM_LOOKBACK_DAYS + 1);
for (FileStatus fileStatus: fileStatusList) {
Assert.assertTrue(candidateFiles.contains(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()).toString()));
}
}
@Test (expectedExceptions = IllegalArgumentException.class)
public void testInstantiationError() {
//Daily directories, but look back time has days and hours. We should expect an assertion error.
Properties properties = new Properties();
properties.setProperty(TimeAwareRecursiveCopyableDataset.LOOKBACK_TIME_KEY, NUM_LOOKBACK_DAYS_HOURS_STR);
properties.setProperty(TimeAwareRecursiveCopyableDataset.DATE_PATTERN_KEY, "yyyy/MM/dd");
TimeAwareRecursiveCopyableDataset dataset = new TimeAwareRecursiveCopyableDataset(fs, baseDir2, properties,
new Path("/tmp/src/*/daily"));
// hourly directories, but look back time has hours and minutes. We should expect an assertion error.
properties = new Properties();
properties.setProperty(TimeAwareRecursiveCopyableDataset.LOOKBACK_TIME_KEY, NUM_LOOKBACK_HOURS_MINS_STR);
properties.setProperty(TimeAwareRecursiveCopyableDataset.DATE_PATTERN_KEY, "yyyy-MM-dd-HH");
dataset = new TimeAwareRecursiveCopyableDataset(fs, baseDir3, properties,
new Path("/tmp/src/ds2/daily"));
}
@AfterClass
public void clean() throws IOException {
//Delete tmp directories
this.fs.delete(baseDir1, true);
this.fs.delete(baseDir2, true);
this.fs.delete(baseDir3, true);
this.fs.delete(baseDir4, true);
}
}