blob: 0e620acefa956a8c0ae7a9d67705a6500e89e99e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package datafu.hourglass.fs;
import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Collections;
import java.util.Date;
import java.util.LinkedList;
import java.util.List;
import java.util.TimeZone;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.avro.Schema;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.log4j.Logger;
/**
 * A collection of utility methods for dealing with files and paths.
 *
 * <p>
 * Two directory layouts are supported: flat dated paths named "yyyyMMdd" and nested dated paths
 * laid out as "yyyy/MM/dd". Dates are always interpreted in UTC.
 * </p>
 *
 * <p>
 * The static methods of this class create their own date formatter for every call, so they do not
 * share mutable parsing state with each other.
 * </p>
 */
public class PathUtils
{
  private static final Logger _log = Logger.getLogger(PathUtils.class);

  private static final String datedPathFormatString = "yyyyMMdd";
  private static final String nestedDatedPathFormatString = "yyyy/MM/dd";

  /**
   * The time zone (UTC) used when converting between dates and paths.
   */
  public final static TimeZone timeZone = TimeZone.getTimeZone("UTC");

  /**
   * Formatter for the "yyyyMMdd" layout in UTC.
   *
   * <p>
   * {@link SimpleDateFormat} is not synchronized; callers that use this shared instance from
   * several threads must synchronize externally. The methods of this class do not use it.
   * </p>
   */
  public static final SimpleDateFormat datedPathFormat = new SimpleDateFormat(datedPathFormatString);

  /**
   * Formatter for the "yyyy/MM/dd" layout in UTC.
   *
   * <p>
   * {@link SimpleDateFormat} is not synchronized; callers that use this shared instance from
   * several threads must synchronize externally. The methods of this class do not use it.
   * </p>
   */
  public static final SimpleDateFormat nestedDatedPathFormat = new SimpleDateFormat(nestedDatedPathFormatString);

  // Full path ending in an eight digit component, e.g. /data/output/20130115
  private static final Pattern timestampPathPattern = Pattern.compile(".+/(\\d{8})");

  // Full path ending in yyyy/MM/dd; group 1 is the root, group 2 the date portion.
  private static final Pattern dailyPathPattern = Pattern.compile("(.+)/(\\d{4}/\\d{2}/\\d{2})");

  // A single path component consisting of exactly eight digits.
  private static final Pattern datedNamePattern = Pattern.compile("\\d{8}");

  /**
   * Filters out paths starting with "." and "_".
   */
  public static final PathFilter nonHiddenPathFilter = new PathFilter() {
    @Override
    public boolean accept(Path path)
    {
      String s = path.getName();
      return !s.startsWith(".") && !s.startsWith("_");
    }
  };

  static
  {
    datedPathFormat.setTimeZone(timeZone);
    nestedDatedPathFormat.setTimeZone(timeZone);
  }

  /**
   * Creates a new UTC formatter for the given pattern. A fresh instance is created per call
   * because {@link SimpleDateFormat} must not be shared between threads without synchronization.
   */
  private static SimpleDateFormat newUtcFormat(String pattern)
  {
    SimpleDateFormat format = new SimpleDateFormat(pattern);
    format.setTimeZone(timeZone);
    return format;
  }

  /**
   * Rejects negative retention counts before anything is deleted.
   */
  private static void checkRetentionCount(int retentionCount)
  {
    if (retentionCount < 0)
    {
      throw new IllegalArgumentException("retentionCount must be non-negative: " + retentionCount);
    }
  }

  /**
   * Recursively deletes a path, logging a warning when the file system reports failure.
   */
  private static void deleteRecursively(FileSystem fs, Path path) throws IOException
  {
    if (!fs.delete(path,true))
    {
      _log.warn(String.format("Could not remove %s",path));
    }
  }

  /**
   * Delete all but the last N days of paths matching the "yyyyMMdd" format.
   *
   * @param fs the filesystem
   * @param path path to delete file from
   * @param retentionCount how many days to keep; must not be negative
   * @throws IOException IOException
   * @throws IllegalArgumentException if retentionCount is negative
   */
  public static void keepLatestDatedPaths(FileSystem fs, Path path, int retentionCount) throws IOException
  {
    checkRetentionCount(retentionCount);

    // findDatedPaths returns the paths sorted, so the head of the list is removed first.
    LinkedList<DatePath> outputs = new LinkedList<DatePath>(PathUtils.findDatedPaths(fs, path));
    while (outputs.size() > retentionCount)
    {
      DatePath toDelete = outputs.removeFirst();
      _log.info(String.format("Removing %s",toDelete.getPath()));
      deleteRecursively(fs, toDelete.getPath());
    }
  }

  /**
   * Delete all but the last N days of paths matching the "yyyy/MM/dd" format.
   *
   * @param fs the filesystem
   * @param path path to delete files from
   * @param retentionCount how many days to keep; must not be negative
   * @throws IOException IOException
   * @throws IllegalArgumentException if retentionCount is negative
   */
  public static void keepLatestNestedDatedPaths(FileSystem fs, Path path, int retentionCount) throws IOException
  {
    checkRetentionCount(retentionCount);

    List<DatePath> outputPaths = PathUtils.findNestedDatedPaths(fs, path);
    if (outputPaths.size() <= retentionCount)
    {
      return;
    }

    // findNestedDatedPaths does not sort its result, so order the paths before trimming the head.
    Collections.sort(outputPaths);
    _log.info("Applying retention range policy");
    for (DatePath output : outputPaths.subList(0, outputPaths.size() - retentionCount))
    {
      _log.info(String.format("Removing %s because it is outside retention range",output.getPath()));
      deleteRecursively(fs, output.getPath());
    }
  }

  /**
   * List all paths matching the "yyyy/MM/dd" format under a given path. The result is returned in
   * the order reported by the file system; it is not sorted.
   *
   * @param fs file system
   * @param input path to search under
   * @return paths paths matching pattern; empty if there are none
   * @throws IOException IOException
   */
  public static List<DatePath> findNestedDatedPaths(FileSystem fs, Path input) throws IOException
  {
    List<DatePath> inputDates = new ArrayList<DatePath>();

    FileStatus[] pathsStatus = fs.globStatus(new Path(input,"*/*/*"), nonHiddenPathFilter);
    if (pathsStatus == null)
    {
      return inputDates;
    }

    SimpleDateFormat format = newUtcFormat(nestedDatedPathFormatString);
    for (FileStatus pathStatus : pathsStatus)
    {
      Matcher matcher = dailyPathPattern.matcher(pathStatus.getPath().toString());
      if (!matcher.matches())
      {
        continue;
      }

      try
      {
        Date date = format.parse(matcher.group(2));
        inputDates.add(new DatePath(date, pathStatus.getPath()));
      }
      catch (ParseException ignored)
      {
        // Not a usable date; this path is simply not part of the result.
      }
    }

    return inputDates;
  }

  /**
   * List all paths matching the "yyyyMMdd" format under a given path. Only children whose name
   * consists of exactly eight digits are considered, so names such as "20130115_backup" are
   * not treated as dated paths.
   *
   * @param fs file system
   * @param path path to search under
   * @return paths paths matching pattern, sorted; empty if there are none
   * @throws IOException IOException
   */
  public static List<DatePath> findDatedPaths(FileSystem fs, Path path) throws IOException
  {
    List<DatePath> outputs = new ArrayList<DatePath>();

    FileStatus[] outputPaths = fs.globStatus(new Path(path, "*"), nonHiddenPathFilter);
    if (outputPaths == null)
    {
      return outputs;
    }

    SimpleDateFormat format = newUtcFormat(datedPathFormatString);
    for (FileStatus outputPath : outputPaths)
    {
      String name = outputPath.getPath().getName();

      // DateFormat.parse(String) accepts trailing text, so the whole name is validated first.
      if (!datedNamePattern.matcher(name).matches())
      {
        continue;
      }

      try
      {
        outputs.add(new DatePath(format.parse(name),outputPath.getPath()));
      }
      catch (ParseException ignored)
      {
        // Not a usable date; this path is simply not part of the result.
      }
    }

    Collections.sort(outputs);
    return outputs;
  }

  /**
   * Gets the schema from a given Avro data file.
   *
   * @param fs the filesystem
   * @param path path to get schema from
   * @return The schema read from the data file's metadata.
   * @throws IOException IOException
   */
  public static Schema getSchemaFromFile(FileSystem fs, Path path) throws IOException
  {
    FSDataInputStream dataInputStream = fs.open(path);
    boolean wrapped = false;
    try
    {
      DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>();
      DataFileStream<GenericRecord> dataFileStream = new DataFileStream<GenericRecord>(dataInputStream, reader);
      wrapped = true;
      try
      {
        return dataFileStream.getSchema();
      }
      finally
      {
        dataFileStream.close();
      }
    }
    finally
    {
      if (!wrapped)
      {
        // The DataFileStream was never constructed (e.g. the file has no valid Avro header),
        // so nothing else will close the raw stream.
        try
        {
          dataInputStream.close();
        }
        catch (IOException e)
        {
          // Keep the original failure as the exception seen by the caller.
          _log.warn(String.format("Could not close %s",path), e);
        }
      }
    }
  }

  /**
   * Gets the schema for the first Avro file under the given path.
   *
   * @param fs the filesystem
   * @param path path to fetch schema for
   * @return Avro schema
   * @throws IOException if no non-hidden file is listed under the path, or the file cannot be read
   */
  public static Schema getSchemaFromPath(FileSystem fs, Path path) throws IOException
  {
    FileStatus[] files = fs.listStatus(path, nonHiddenPathFilter);
    if (files == null || files.length == 0)
    {
      throw new IOException("No files found under " + path + " to read a schema from");
    }
    return getSchemaFromFile(fs,files[0].getPath());
  }

  /**
   * Sums the size of all files listed under a given path. Only the direct, non-hidden children of
   * the path are counted; the listing is not recursive.
   *
   * @param fs file system
   * @param path path to count bytes for
   * @return total bytes under path; zero when the file system returns no listing
   * @throws IOException IOException
   */
  public static long countBytes(FileSystem fs, Path path) throws IOException
  {
    FileStatus[] files = fs.listStatus(path, nonHiddenPathFilter);
    if (files == null)
    {
      // NOTE(review): some FileSystem implementations appear to return null rather than throw
      // for a missing path - confirm against the Hadoop version in use.
      return 0L;
    }

    long totalForPath = 0L;
    for (FileStatus file : files)
    {
      totalForPath += file.getLen();
    }
    return totalForPath;
  }

  /**
   * Gets the date for a path in the "yyyyMMdd" format.
   *
   * @param path path to check
   * @return date for path
   * @throws RuntimeException if the last component of the path is not an eight digit date
   */
  public static Date getDateForDatedPath(Path path)
  {
    Matcher matcher = timestampPathPattern.matcher(path.toString());
    if (!matcher.matches())
    {
      throw new RuntimeException("Unexpected input filename: " + path);
    }

    try
    {
      return newUtcFormat(datedPathFormatString).parse(matcher.group(1));
    }
    catch (ParseException e)
    {
      throw new RuntimeException("Unexpected input filename: " + path, e);
    }
  }

  /**
   * Gets the date for a path in the "yyyy/MM/dd" format.
   *
   * @param path path to check
   * @return date
   * @throws RuntimeException if the path does not end in a "yyyy/MM/dd" portion
   */
  public static Date getDateForNestedDatedPath(Path path)
  {
    Matcher matcher = dailyPathPattern.matcher(path.toString());
    if (!matcher.matches())
    {
      throw new RuntimeException("Unexpected input filename: " + path);
    }

    try
    {
      return newUtcFormat(nestedDatedPathFormatString).parse(matcher.group(2));
    }
    catch (ParseException e)
    {
      throw new RuntimeException("Unexpected input filename: " + path, e);
    }
  }

  /**
   * Gets the root path for a path in the "yyyy/MM/dd" format. This is part of the path preceding the
   * "yyyy/MM/dd" portion.
   *
   * @param path in "yyyy/MM/dd" format
   * @return root of input path
   * @throws RuntimeException if the path does not end in a "yyyy/MM/dd" portion
   */
  public static Path getNestedPathRoot(Path path)
  {
    Matcher matcher = dailyPathPattern.matcher(path.toString());
    if (!matcher.matches())
    {
      throw new RuntimeException("Unexpected input filename: " + path);
    }
    return new Path(matcher.group(1));
  }
}