// blob: 73bc721e8466c4466c24205db456290b5d2cf2fb [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.data.management.copy;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Properties;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.joda.time.DateTimeZone;
import org.joda.time.LocalDateTime;
import org.joda.time.Period;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.joda.time.format.PeriodFormatter;
import org.joda.time.format.PeriodFormatterBuilder;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import org.apache.gobblin.configuration.ConfigurationKeys;
/**
 * A {@link RecursiveCopyableDataset} that restricts file listing to date-named sub-directories falling inside a
 * lookback window.
 *
 * <p>The window ends at the "current time" captured when the dataset is constructed and starts
 * {@link #LOOKBACK_TIME_KEY} earlier. For every step in that window (one minute, hour or day, depending on the
 * granularity of {@link #DATE_PATTERN_KEY}) the timestamp is formatted with the date pattern, appended to the
 * path being listed, and the parent class is asked for the files under the resulting path.
 *
 * <p>Configuration:
 * <ul>
 *   <li>{@link #DATE_PATTERN_KEY} (required) - Joda-Time pattern describing the date sub-directory layout.</li>
 *   <li>{@link #LOOKBACK_TIME_KEY} (required) - lookback period written as {@code <n>d<n>h<n>m}. It must not be
 *       finer-grained than the date pattern (for example a daily pattern only accepts {@code <n>d}).</li>
 *   <li>{@link #DATE_PATTERN_TIMEZONE_KEY} (optional) - time zone id used to compute the current time; defaults to
 *       {@link #DEFAULT_DATE_PATTERN_TIMEZONE}.</li>
 * </ul>
 */
public class TimeAwareRecursiveCopyableDataset extends RecursiveCopyableDataset {
  private static final String CONFIG_PREFIX = CopyConfiguration.COPY_PREFIX + ".recursive";
  public static final String DATE_PATTERN_KEY = CONFIG_PREFIX + ".date.pattern";
  public static final String LOOKBACK_TIME_KEY = CONFIG_PREFIX + ".lookback.time";
  public static final String DEFAULT_DATE_PATTERN_TIMEZONE = ConfigurationKeys.PST_TIMEZONE_NAME;
  public static final String DATE_PATTERN_TIMEZONE_KEY = CONFIG_PREFIX + ".datetime.timezone";

  private final String lookbackTime;
  private final String datePattern;
  private final Period lookbackPeriod;
  private final boolean isPatternDaily;
  private final boolean isPatternHourly;
  private final boolean isPatternMinutely;
  private final LocalDateTime currentTime;
  private final DatePattern pattern;

  /** Granularity of the configured date pattern; also the step size used when walking the lookback window. */
  enum DatePattern {
    MINUTELY, HOURLY, DAILY
  }

  /**
   * Reads the date pattern, lookback period and time zone from {@code properties} and validates that the lookback
   * period is expressed at a granularity compatible with the date pattern.
   *
   * @param fs file system passed through to {@link RecursiveCopyableDataset}
   * @param rootPath dataset root passed through to {@link RecursiveCopyableDataset}
   * @param properties job properties; must contain {@link #DATE_PATTERN_KEY} and {@link #LOOKBACK_TIME_KEY}
   * @param glob glob passed through to {@link RecursiveCopyableDataset}
   * @throws IllegalArgumentException if a required property is missing, the lookback string or time zone id
   *         cannot be parsed, or the lookback granularity is finer than the date pattern granularity
   */
  public TimeAwareRecursiveCopyableDataset(FileSystem fs, Path rootPath, Properties properties, Path glob) {
    super(fs, rootPath, properties, glob);

    this.lookbackTime = properties.getProperty(LOOKBACK_TIME_KEY);
    Preconditions.checkArgument(this.lookbackTime != null, "Missing required property %s", LOOKBACK_TIME_KEY);
    this.datePattern = properties.getProperty(DATE_PATTERN_KEY);
    Preconditions.checkArgument(this.datePattern != null, "Missing required property %s", DATE_PATTERN_KEY);

    PeriodFormatter periodFormatter = new PeriodFormatterBuilder()
        .appendDays().appendSuffix("d")
        .appendHours().appendSuffix("h")
        .appendMinutes().appendSuffix("m")
        .toFormatter();
    this.lookbackPeriod = periodFormatter.parsePeriod(this.lookbackTime);

    // Classify the pattern from finest to coarsest granularity; anything that is neither minutely nor hourly
    // is treated as daily.
    this.isPatternMinutely = isDatePatternMinutely(this.datePattern);
    this.isPatternHourly = !this.isPatternMinutely && isDatePatternHourly(this.datePattern);
    this.isPatternDaily = !this.isPatternMinutely && !this.isPatternHourly;

    // Resolve the zone from the configured *value*. Passing the key constant itself to DateTimeZone.forID is
    // never a valid zone id and made every explicitly configured time zone fail.
    String timeZoneId = properties.getProperty(DATE_PATTERN_TIMEZONE_KEY, DEFAULT_DATE_PATTERN_TIMEZONE);
    this.currentTime = LocalDateTime.now(DateTimeZone.forID(timeZoneId));

    if (this.isPatternDaily) {
      Preconditions.checkArgument(isLookbackTimeStringDaily(this.lookbackTime),
          "Expected day format for lookback time; found hourly or minutely format");
      this.pattern = DatePattern.DAILY;
    } else if (this.isPatternHourly) {
      Preconditions.checkArgument(isLookbackTimeStringHourly(this.lookbackTime),
          "Expected hourly format for lookback time; found minutely format");
      this.pattern = DatePattern.HOURLY;
    } else {
      // A minutely pattern accepts any lookback string the full d/h/m formatter above could parse.
      this.pattern = DatePattern.MINUTELY;
    }
  }

  /**
   * Iterates from {@code startDate} towards {@code endDate} in steps of one minute, hour or day. {@code endDate}
   * itself is returned only if a step lands exactly on it; no value after {@code endDate} is returned.
   *
   * TODO: Replace it with {@link org.apache.gobblin.time.TimeIterator} as {@link LocalDateTime} will not adjust time
   * to a given time zone
   */
  public static class DateRangeIterator implements Iterator<LocalDateTime> {
    /** Next value to hand out; advanced by one step on every call to {@link #next()}. */
    private LocalDateTime startDate;
    private final LocalDateTime endDate;
    private final DatePattern datePattern;

    /**
     * @param startDate first value returned by the iterator
     * @param endDate inclusive upper bound of the iteration
     * @param datePattern step size to advance by
     */
    public DateRangeIterator(LocalDateTime startDate, LocalDateTime endDate, DatePattern datePattern) {
      this.startDate = startDate;
      this.endDate = endDate;
      this.datePattern = datePattern;
    }

    @Override
    public boolean hasNext() {
      return !startDate.isAfter(endDate);
    }

    /**
     * @return the current position, after which the position is advanced by one step
     * @throws NoSuchElementException if the position is already past the end date
     */
    @Override
    public LocalDateTime next() {
      if (!hasNext()) {
        throw new NoSuchElementException("Date range exhausted at " + endDate);
      }
      LocalDateTime dateTime = startDate;
      switch (datePattern) {
        case MINUTELY:
          startDate = startDate.plusMinutes(1);
          break;
        case HOURLY:
          startDate = startDate.plusHours(1);
          break;
        case DAILY:
          startDate = startDate.plusDays(1);
          break;
        default:
          // Guards against a future enum constant silently producing a non-advancing (infinite) iterator.
          throw new IllegalStateException("Unsupported date pattern: " + datePattern);
      }
      return dateTime;
    }

    @Override
    public void remove() {
      throw new UnsupportedOperationException();
    }
  }

  /**
   * Detects whether {@code datePattern} has at least hourly resolution by formatting two reference timestamps that
   * differ only in the hour of day and checking whether the rendered strings differ.
   */
  private static boolean isDatePatternHourly(String datePattern) {
    DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
    LocalDateTime refDateTime = new LocalDateTime(2017, 1, 1, 10, 0, 0);
    String refDateTimeString = refDateTime.toString(formatter);
    String refDateTimeStringAtStartOfDay = refDateTime.withHourOfDay(0).toString(formatter);
    return !refDateTimeString.equals(refDateTimeStringAtStartOfDay);
  }

  /**
   * Detects whether {@code datePattern} has at least minute resolution by formatting two reference timestamps that
   * differ only in the minute of hour and checking whether the rendered strings differ.
   */
  private static boolean isDatePatternMinutely(String datePattern) {
    DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
    LocalDateTime refDateTime = new LocalDateTime(2017, 1, 1, 10, 59, 0);
    String refDateTimeString = refDateTime.toString(formatter);
    String refDateTimeStringAtStartOfHour = refDateTime.withMinuteOfHour(0).toString(formatter);
    return !refDateTimeString.equals(refDateTimeStringAtStartOfHour);
  }

  /** Returns true if {@code lookbackTime} can be parsed by a days-only ({@code <n>d}) period formatter. */
  private static boolean isLookbackTimeStringDaily(String lookbackTime) {
    PeriodFormatter periodFormatter = new PeriodFormatterBuilder().appendDays().appendSuffix("d").toFormatter();
    return canParse(periodFormatter, lookbackTime);
  }

  /** Returns true if {@code lookbackTime} can be parsed by a days-and-hours ({@code <n>d<n>h}) period formatter. */
  private static boolean isLookbackTimeStringHourly(String lookbackTime) {
    PeriodFormatter periodFormatter = new PeriodFormatterBuilder()
        .appendDays().appendSuffix("d")
        .appendHours().appendSuffix("h")
        .toFormatter();
    return canParse(periodFormatter, lookbackTime);
  }

  /** Returns true if {@code formatter} parses {@code text} without rejecting it. */
  private static boolean canParse(PeriodFormatter formatter, String text) {
    try {
      formatter.parsePeriod(text);
      return true;
    } catch (IllegalArgumentException rejected) {
      // parsePeriod reports unparseable text via IllegalArgumentException; that is the "no" answer here.
      return false;
    }
  }

  /**
   * Lists files under every date sub-directory of {@code path} inside the lookback window.
   *
   * <p>The window runs from {@code currentTime - lookbackPeriod} up to {@code currentTime}, stepped at the
   * granularity of the configured date pattern. Each step is formatted with the date pattern, resolved against
   * {@code path}, and listed via the parent implementation; the results are concatenated in chronological order.
   *
   * @param fs file system to list from
   * @param path base path under which the date sub-directories are resolved
   * @param fileFilter filter forwarded to the parent listing
   * @return the files returned by the parent listing for every date sub-directory in the window
   * @throws IOException if the parent listing fails
   */
  @Override
  protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path, PathFilter fileFilter) throws IOException {
    DateTimeFormatter formatter = DateTimeFormat.forPattern(datePattern);
    LocalDateTime endDate = currentTime;
    LocalDateTime startDate = endDate.minus(this.lookbackPeriod);
    Iterator<LocalDateTime> dateRangeIterator = new DateRangeIterator(startDate, endDate, pattern);
    List<FileStatus> fileStatuses = Lists.newArrayList();
    while (dateRangeIterator.hasNext()) {
      Path pathWithDateTime = new Path(path, dateRangeIterator.next().toString(formatter));
      fileStatuses.addAll(super.getFilesAtPath(fs, pathWithDateTime, fileFilter));
    }
    return fileStatuses;
  }
}