/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.indexer.path;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexer.HadoopDruidIndexerConfig;
import org.apache.druid.indexer.hadoop.FSSpideringIterator;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import java.io.IOException;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;

/**
 * A {@link PathSpec} that walks a time-partitioned directory tree, adding to the
 * Hadoop job one directory of input files per {@link Granularity} bucket that
 * falls within the configured input intervals. Files whose full paths match
 * {@code filePattern} are included; directory names come from
 * {@code Granularity.toPath} unless an explicit Joda-time {@code pathFormat}
 * is given.
 *
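 * <p>A minimal sketch of the JSON inputSpec a Hadoop ingestion spec might use to
 * select this path spec (the field values below are illustrative assumptions,
 * not taken from this file):
 * <pre>
 *   "inputSpec" : {
 *     "type" : "granularity",
 *     "dataGranularity" : "hour",
 *     "inputPath" : "hdfs:///base/path",
 *     "filePattern" : ".*\\.gz"
 *   }
 * </pre>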
 */
public class GranularityPathSpec implements PathSpec
{
  private static final Logger log = new Logger(GranularityPathSpec.class);

  private String inputPath;
  private String filePattern;
  private Granularity dataGranularity;
  private String pathFormat;
  private Class<? extends InputFormat> inputFormat;

  @JsonProperty
  public String getInputPath()
  {
    return inputPath;
  }

  public void setInputPath(String inputPath)
  {
    this.inputPath = inputPath;
  }

  @JsonProperty
  public Class<? extends InputFormat> getInputFormat()
  {
    return inputFormat;
  }

  public void setInputFormat(Class<? extends InputFormat> inputFormat)
  {
    this.inputFormat = inputFormat;
  }

  @JsonProperty
  public String getFilePattern()
  {
    return filePattern;
  }

  public void setFilePattern(String filePattern)
  {
    this.filePattern = filePattern;
  }

  @JsonProperty
  public Granularity getDataGranularity()
  {
    return dataGranularity;
  }

  public void setDataGranularity(Granularity dataGranularity)
  {
    this.dataGranularity = dataGranularity;
  }

  @JsonProperty
  public String getPathFormat()
  {
    return pathFormat;
  }

  public void setPathFormat(String pathFormat)
  {
    this.pathFormat = pathFormat;
  }

  @Override
  public Job addInputPaths(HadoopDruidIndexerConfig config, Job job) throws IOException
  {
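    // Split each requested input interval into granularity-sized buckets,
    // trimming each bucket to the requested interval so the job never scans
    // time ranges outside what the config asked for.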
    final Set<Interval> intervals = new TreeSet<>(Comparators.intervalsByStartThenEnd());
    for (Interval inputInterval : config.getInputIntervals()) {
      for (Interval interval : dataGranularity.getIterable(inputInterval)) {
        intervals.add(trim(inputInterval, interval));
      }
    }

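    // Resolve the filesystem backing the base input path and precompile the
    // file-name regex; pathFormat, when set, overrides the granularity's
    // default directory layout.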
    Path betaInput = new Path(inputPath);
    FileSystem fs = betaInput.getFileSystem(job.getConfiguration());
    Set<String> paths = new TreeSet<>();
    Pattern fileMatcher = Pattern.compile(filePattern);

    DateTimeFormatter customFormatter = null;
    if (pathFormat != null) {
      customFormatter = DateTimeFormat.forPattern(pathFormat);
    }

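    // For each time bucket, derive its directory under the base path and
    // recursively collect every file whose full path matches the pattern.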
    for (Interval interval : intervals) {
      DateTime t = interval.getStart();
      String intervalPath;
      if (customFormatter != null) {
        intervalPath = customFormatter.print(t);
      } else {
        intervalPath = dataGranularity.toPath(t);
      }

      Path granularPath = new Path(betaInput, intervalPath);
      log.info("Checking path[%s]", granularPath);
      for (FileStatus status : FSSpideringIterator.spiderIterable(fs, granularPath)) {
        final Path filePath = status.getPath();
        if (fileMatcher.matcher(filePath.toString()).matches()) {
          paths.add(filePath.toString());
        }
      }
    }

log.info("Appending path %s", paths);

    StaticPathSpec.addToMultipleInputs(config, job, paths, inputFormat);

    return job;
  }
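
  // Clips a granularity bucket to the bounds of the requested input interval,
  // allocating a new Interval only when clipping actually changes an endpoint.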
  private Interval trim(Interval inputInterval, Interval interval)
  {
    long start = interval.getStartMillis();
    long end = interval.getEndMillis();

    boolean makeNew = false;
    if (start < inputInterval.getStartMillis()) {
      start = inputInterval.getStartMillis();
      makeNew = true;
    }
    if (end > inputInterval.getEndMillis()) {
      end = inputInterval.getEndMillis();
      makeNew = true;
    }
    return makeNew ? new Interval(start, end, interval.getChronology()) : interval;
  }
}