blob: e0cdd98998ccd602a1f119c18acf3432ca6ed520 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.data.management.copy;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.joda.time.DateTimeZone;
import org.joda.time.LocalDateTime;
import org.joda.time.Period;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.PeriodFormatter;
import org.joda.time.format.PeriodFormatterBuilder;
import com.google.common.collect.Lists;
import org.apache.gobblin.configuration.ConfigurationKeys;
/**
 * A {@link RecursiveCopyableDataset} that only copies files found under date-partitioned directories whose date
 * falls within a lookback window ending at the current time.
 *
 * <p>The directory date layout is given by {@link #DATE_PATTERN_KEY} (a Joda-Time pattern whose "/"-separated
 * components map to directory levels, e.g. {@code yyyy/MM/dd/HH}). The window length is given by
 * {@link #LOOKBACK_TIME_KEY} as a period such as {@code 2d}, {@code 1d12h} or {@code 30m}. Units finer than the
 * date pattern's granularity are rejected. The current time is taken in the zone named by
 * {@link #DATE_PATTERN_TIMEZONE_KEY}, or {@link #DEFAULT_DATE_PATTERN_TIMEZONE} when unset.
 */
@Slf4j
public class TimeAwareRecursiveCopyableDataset extends RecursiveCopyableDataset {
  private static final String CONFIG_PREFIX = CopyConfiguration.COPY_PREFIX + ".recursive";
  public static final String DATE_PATTERN_KEY = CONFIG_PREFIX + ".date.pattern";
  public static final String LOOKBACK_TIME_KEY = CONFIG_PREFIX + ".lookback.time";
  public static final String DEFAULT_DATE_PATTERN_TIMEZONE = ConfigurationKeys.PST_TIMEZONE_NAME;
  public static final String DATE_PATTERN_TIMEZONE_KEY = CONFIG_PREFIX + ".datetime.timezone";

  // Lookback granularities, ordered coarse to fine; a formatter for a given level also accepts every coarser unit.
  private static final int GRANULARITY_DAYS = 1;
  private static final int GRANULARITY_HOURS = 2;
  private static final int GRANULARITY_MINUTES = 3;
  private static final int GRANULARITY_SECONDS = 4;

  private final String lookbackTime;
  private final String datePattern;
  private final Period lookbackPeriod;
  private final LocalDateTime currentTime;
  // Formatter for datePattern; Joda formatters are immutable, so one instance is reused for every listing.
  private final DateTimeFormatter dateFormatter;
  // Number of "/"-separated components in datePattern, i.e. how many directory levels a date spans.
  private final int datePatternDepth;

  /**
   * Creates the dataset and validates its time-window configuration.
   *
   * @throws IllegalArgumentException if the date pattern or lookback property is missing or malformed, if the
   *     lookback uses a unit finer than the date pattern supports, or if the configured timezone ID is unknown
   */
  public TimeAwareRecursiveCopyableDataset(FileSystem fs, Path rootPath, Properties properties, Path glob) {
    super(fs, rootPath, properties, glob);
    this.lookbackTime = getRequiredProperty(properties, LOOKBACK_TIME_KEY);
    this.datePattern = getRequiredProperty(properties, DATE_PATTERN_KEY);
    // Validate before parsing so a too-fine lookback is reported with the granularity-specific message.
    this.validateLookbackWithDatePatternFormat(this.datePattern, this.lookbackTime);
    this.lookbackPeriod = lookbackFormatterBuilder(GRANULARITY_SECONDS).toFormatter().parsePeriod(this.lookbackTime);
    this.dateFormatter = DateTimeFormat.forPattern(this.datePattern);
    // Hadoop paths always use "/" regardless of the local OS separator, so split the pattern on it.
    this.datePatternDepth = this.datePattern.split(Path.SEPARATOR).length;
    // Look up the configured zone ID (the property value, not its key), falling back to the default zone.
    this.currentTime = LocalDateTime.now(
        DateTimeZone.forID(properties.getProperty(DATE_PATTERN_TIMEZONE_KEY, DEFAULT_DATE_PATTERN_TIMEZONE)));
  }

  /**
   * Returns the value of {@code key}.
   *
   * @throws IllegalArgumentException if the property is not set
   */
  private static String getRequiredProperty(Properties properties, String key) {
    String value = properties.getProperty(key);
    if (value == null) {
      throw new IllegalArgumentException(String.format("Missing required property %s", key));
    }
    return value;
  }

  /**
   * Checks that {@code lookbackTime} only uses units at least as coarse as the finest unit rendered by
   * {@code datePattern}. Patterns coarser than daily are not checked.
   *
   * @throws IllegalArgumentException if the lookback is not parseable at the pattern's granularity
   */
  void validateLookbackWithDatePatternFormat(String datePattern, String lookbackTime) {
    DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
    LocalDateTime refDateTime = new LocalDateTime(2017, 1, 31, 10, 59, 59);
    String refDateTimeString = refDateTime.toString(formatter);
    // A unit is rendered by the pattern iff resetting it in the reference time changes the formatted output.
    if (!refDateTime.withSecondOfMinute(0).toString(formatter).equals(refDateTimeString)) {
      checkLookbackFormat(GRANULARITY_SECONDS, lookbackTime, "daily or hourly or minutely or secondly");
    } else if (!refDateTime.withMinuteOfHour(0).toString(formatter).equals(refDateTimeString)) {
      checkLookbackFormat(GRANULARITY_MINUTES, lookbackTime, "daily or hourly or minutely");
    } else if (!refDateTime.withHourOfDay(0).toString(formatter).equals(refDateTimeString)) {
      checkLookbackFormat(GRANULARITY_HOURS, lookbackTime, "daily or hourly");
    } else if (!refDateTime.withDayOfMonth(1).toString(formatter).equals(refDateTimeString)) {
      checkLookbackFormat(GRANULARITY_DAYS, lookbackTime, "daily");
    }
  }

  /** Throws if {@code lookbackTime} does not parse with units down to {@code finestGranularity}. */
  private static void checkLookbackFormat(int finestGranularity, String lookbackTime, String allowedFormats) {
    if (!lookbackTimeMatchesFormat(lookbackFormatterBuilder(finestGranularity), lookbackTime)) {
      throw new IllegalArgumentException(
          String.format("Expected lookback time to be in %s format, check %s", allowedFormats, LOOKBACK_TIME_KEY));
    }
  }

  /** Builds a period parser for strings like {@code 1d2h3m4s}, limited to units down to {@code finestGranularity}. */
  private static PeriodFormatterBuilder lookbackFormatterBuilder(int finestGranularity) {
    PeriodFormatterBuilder builder = new PeriodFormatterBuilder().appendDays().appendSuffix("d");
    if (finestGranularity >= GRANULARITY_HOURS) {
      builder.appendHours().appendSuffix("h");
    }
    if (finestGranularity >= GRANULARITY_MINUTES) {
      builder.appendMinutes().appendSuffix("m");
    }
    if (finestGranularity >= GRANULARITY_SECONDS) {
      builder.appendSeconds().appendSuffix("s");
    }
    return builder;
  }

  /** Returns whether {@code lookbackTime} parses with the formatter built by {@code formatterBuilder}. */
  private static boolean lookbackTimeMatchesFormat(PeriodFormatterBuilder formatterBuilder, String lookbackTime) {
    try {
      formatterBuilder.toFormatter().parsePeriod(lookbackTime);
    } catch (IllegalArgumentException e) {
      return false;
    }
    return true;
  }

  /**
   * Lists files under date directories below {@code path} whose date lies within
   * [now - lookback, now], inclusive on both ends.
   */
  @Override
  protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path, PathFilter fileFilter) throws IOException {
    LocalDateTime endDate = this.currentTime;
    // Truncate the start date to the pattern's finest unit by formatting and re-parsing it.
    LocalDateTime startDate =
        this.dateFormatter.parseLocalDateTime(endDate.minus(this.lookbackPeriod).toString(this.dateFormatter));
    return recursivelyGetFilesAtDatePath(fs, path, "", fileFilter, 1, startDate, endDate, this.dateFormatter);
  }

  /**
   * Walks the date directory hierarchy top-down, one date component per level. At the deepest level the
   * accumulated relative path is parsed as a date and, if it lies in range, the files below it are collected.
   * Entries that do not parse as a date are skipped with a warning.
   */
  private List<FileStatus> recursivelyGetFilesAtDatePath(FileSystem fs, Path path, String traversedDatePath,
      PathFilter fileFilter, int level, LocalDateTime startDate, LocalDateTime endDate, DateTimeFormatter formatter)
      throws IOException {
    List<FileStatus> fileStatuses = Lists.newArrayList();
    boolean atLowestLevel = level == this.datePatternDepth;
    for (FileStatus status : fs.listStatus(path)) {
      String name = status.getPath().getName();
      String datePath = traversedDatePath.isEmpty() ? name : new Path(traversedDatePath, name).toString();
      if (!atLowestLevel) {
        // The pattern spans more levels (e.g. yyyy/MM/dd/HH): keep building the date path downward.
        fileStatuses.addAll(recursivelyGetFilesAtDatePath(fs, new Path(path, name), datePath, fileFilter, level + 1,
            startDate, endDate, formatter));
        continue;
      }
      LocalDateTime folderDate;
      try {
        folderDate = formatter.parseLocalDateTime(datePath);
      } catch (IllegalArgumentException e) {
        log.warn("Folder at path {} is not convertible to format {}. Please confirm that argument {} is valid",
            datePath, this.datePattern, DATE_PATTERN_KEY);
        continue;
      }
      // The listing call stays outside the try so its own failures are not mistaken for date-format problems.
      if (!folderDate.isBefore(startDate) && !folderDate.isAfter(endDate)) {
        fileStatuses.addAll(super.getFilesAtPath(fs, status.getPath(), fileFilter));
      }
    }
    return fileStatuses;
  }
}