| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.druid.indexer.path; |
| |
| import com.fasterxml.jackson.annotation.JsonProperty; |
| import org.apache.druid.indexer.HadoopDruidIndexerConfig; |
| import org.apache.druid.indexer.hadoop.FSSpideringIterator; |
| import org.apache.druid.java.util.common.granularity.Granularity; |
| import org.apache.druid.java.util.common.guava.Comparators; |
| import org.apache.druid.java.util.common.logger.Logger; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.mapreduce.InputFormat; |
| import org.apache.hadoop.mapreduce.Job; |
| import org.joda.time.DateTime; |
| import org.joda.time.Interval; |
| import org.joda.time.format.DateTimeFormat; |
| import org.joda.time.format.DateTimeFormatter; |
| |
| import java.io.IOException; |
| import java.util.Set; |
| import java.util.TreeSet; |
| import java.util.regex.Pattern; |
| |
| /** |
| */ |
| public class GranularityPathSpec implements PathSpec |
| { |
| private static final Logger log = new Logger(GranularityPathSpec.class); |
| |
| private String inputPath; |
| private String filePattern; |
| private Granularity dataGranularity; |
| private String pathFormat; |
| private Class<? extends InputFormat> inputFormat; |
| |
| @JsonProperty |
| public String getInputPath() |
| { |
| return inputPath; |
| } |
| |
| public void setInputPath(String inputPath) |
| { |
| this.inputPath = inputPath; |
| } |
| |
| @JsonProperty |
| public Class<? extends InputFormat> getInputFormat() |
| { |
| return inputFormat; |
| } |
| |
| public void setInputFormat(Class<? extends InputFormat> inputFormat) |
| { |
| this.inputFormat = inputFormat; |
| } |
| |
| @JsonProperty |
| public String getFilePattern() |
| { |
| return filePattern; |
| } |
| |
| public void setFilePattern(String filePattern) |
| { |
| this.filePattern = filePattern; |
| } |
| |
| @JsonProperty |
| public Granularity getDataGranularity() |
| { |
| return dataGranularity; |
| } |
| |
| public void setDataGranularity(Granularity dataGranularity) |
| { |
| this.dataGranularity = dataGranularity; |
| } |
| |
| @JsonProperty |
| public String getPathFormat() |
| { |
| return pathFormat; |
| } |
| |
| public void setPathFormat(String pathFormat) |
| { |
| this.pathFormat = pathFormat; |
| } |
| |
| @Override |
| public Job addInputPaths(HadoopDruidIndexerConfig config, Job job) throws IOException |
| { |
| final Set<Interval> intervals = new TreeSet<>(Comparators.intervalsByStartThenEnd()); |
| for (Interval inputInterval : config.getInputIntervals()) { |
| for (Interval interval : dataGranularity.getIterable(inputInterval)) { |
| intervals.add(trim(inputInterval, interval)); |
| } |
| } |
| |
| Path betaInput = new Path(inputPath); |
| FileSystem fs = betaInput.getFileSystem(job.getConfiguration()); |
| Set<String> paths = new TreeSet<>(); |
| Pattern fileMatcher = Pattern.compile(filePattern); |
| |
| DateTimeFormatter customFormatter = null; |
| if (pathFormat != null) { |
| customFormatter = DateTimeFormat.forPattern(pathFormat); |
| } |
| |
| for (Interval interval : intervals) { |
| DateTime t = interval.getStart(); |
| String intervalPath; |
| if (customFormatter != null) { |
| intervalPath = customFormatter.print(t); |
| } else { |
| intervalPath = dataGranularity.toPath(t); |
| } |
| |
| Path granularPath = new Path(betaInput, intervalPath); |
| log.info("Checking path[%s]", granularPath); |
| for (FileStatus status : FSSpideringIterator.spiderIterable(fs, granularPath)) { |
| final Path filePath = status.getPath(); |
| if (fileMatcher.matcher(filePath.toString()).matches()) { |
| paths.add(filePath.toString()); |
| } |
| } |
| } |
| |
| log.info("Appending path %s", paths); |
| StaticPathSpec.addToMultipleInputs(config, job, paths, inputFormat); |
| |
| return job; |
| } |
| |
| private Interval trim(Interval inputInterval, Interval interval) |
| { |
| long start = interval.getStartMillis(); |
| long end = interval.getEndMillis(); |
| |
| boolean makeNew = false; |
| if (start < inputInterval.getStartMillis()) { |
| start = inputInterval.getStartMillis(); |
| makeNew = true; |
| } |
| if (end > inputInterval.getEndMillis()) { |
| end = inputInterval.getEndMillis(); |
| makeNew = true; |
| } |
| return makeNew ? new Interval(start, end, interval.getChronology()) : interval; |
| } |
| |
| } |