[GOBBLIN-2005] Use correct root path when finding relative paths on destination side (#3882)
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/UnixTimestampRecursiveCopyableDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/UnixTimestampRecursiveCopyableDataset.java
index cd4f188..366249b 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/UnixTimestampRecursiveCopyableDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/UnixTimestampRecursiveCopyableDataset.java
@@ -41,6 +41,8 @@
import com.google.common.collect.Lists;
+import lombok.AllArgsConstructor;
+
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.filters.AndPathFilter;
@@ -88,22 +90,23 @@
* based on {@link #timestampPattern} and filters out the paths that are out the date range
*
*/
+ @AllArgsConstructor
class TimestampPathFilter implements PathFilter {
+ private final Path path;
@Override
public boolean accept(Path path) {
LocalDate endDate = currentTime.toLocalDate();
LocalDate startDate = endDate.minus(lookbackPeriod);
- Path relativePath = PathUtils.relativizePath(PathUtils.getPathWithoutSchemeAndAuthority(path), datasetRoot());
+ Path relativePath = PathUtils.relativizePath(PathUtils.getPathWithoutSchemeAndAuthority(path), this.path);
Matcher matcher = timestampPattern.matcher(relativePath.toString());
if (!matcher.matches()) {
return false;
}
Long timestamp = Long.parseLong(matcher.group(1));
LocalDate dateOfTimestamp = new LocalDateTime(timestamp, dateTimeZone).toLocalDate();
- return !(dateOfTimestamp == null || dateOfTimestamp.isAfter(endDate) || dateOfTimestamp.isEqual(startDate)
- || dateOfTimestamp.isBefore(startDate));
+ return !(dateOfTimestamp.isAfter(endDate) || dateOfTimestamp.isEqual(startDate) || dateOfTimestamp.isBefore(startDate));
}
}
@@ -111,8 +114,8 @@
protected List<FileStatus> getFilesAtPath(FileSystem fs, Path path, PathFilter fileFilter)
throws IOException {
- // Filter files by lookback period (fileNames >= startDate and fileNames <= endDate)
- PathFilter andPathFilter = new AndPathFilter(fileFilter, new TimestampPathFilter());
+ // Filter files by lookback period (fileNames >= startDate and fileNames < endDate)
+ PathFilter andPathFilter = new AndPathFilter(fileFilter, new TimestampPathFilter(path));
List<FileStatus> files = super.getFilesAtPath(fs, path, andPathFilter);
if (VersionSelectionPolicy.ALL == versionSelectionPolicy) {
@@ -122,7 +125,7 @@
Map<Pair<String, LocalDate>, TreeMap<Long, List<FileStatus>>> pathTimestampFilesMap = new HashMap<>();
// Now select files per day based on version selection policy
for (FileStatus fileStatus : files) {
- String relativePath = PathUtils.relativizePath(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()), datasetRoot()).toString();
+ String relativePath = PathUtils.relativizePath(PathUtils.getPathWithoutSchemeAndAuthority(fileStatus.getPath()), path).toString();
Matcher matcher = timestampPattern.matcher(relativePath);
if (!matcher.matches()) {
continue;