blob: 5fbc27e8693ba75f8657c80df1c07ddba424ef3c [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.dataloads.bulk;
import org.apache.commons.cli.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import java.util.Date;
import java.util.concurrent.TimeUnit;
/**
 * Command-line utility that deletes files matching a glob on a Hadoop-compatible
 * filesystem when their modification time falls inside a window of
 * {@code numDays} days ending at a given start date.
 *
 * <p>The window is half-open: a file is selected when
 * {@code firstTimeMillis <= modificationTime < lastTimeMillis}, where
 * {@code lastTimeMillis} is the start date exactly as supplied to the constructor and
 * {@code firstTimeMillis} is {@code numDays} days earlier. Directories are never selected.
 */
public class HDFSDataPruner {

    private static final Logger LOG = LoggerFactory.getLogger(HDFSDataPruner.class);

    /** Start date normalized to midnight; only used for the "must be before today" check. */
    private Date startDate;
    /** Inclusive lower bound (epoch millis) of the modification-time window. */
    private long firstTimeMillis;
    /** Exclusive upper bound (epoch millis) of the modification-time window. */
    private long lastTimeMillis;
    /** Glob pattern selecting candidate files. */
    private Path globPath;
    /** Filesystem the prune operates on. */
    protected FileSystem fileSystem;

    private HDFSDataPruner() {
    }

    /**
     * Creates a pruner for the given window, glob and filesystem.
     *
     * @param startDate end of the window (exclusive); its time of day is used as supplied
     * @param numDays   length of the window in days, counted back from {@code startDate}
     * @param fsUri     value for {@code fs.defaultFS}, e.g. {@code hdfs://host:8020} or {@code file:///}
     * @param globPath  glob pattern of the files to consider
     * @throws IOException if the filesystem cannot be obtained
     */
    HDFSDataPruner(Date startDate, Integer numDays, String fsUri, String globPath) throws IOException {
        this.startDate = dateAtMidnight(startDate);
        // The window is anchored on the date as supplied, not on the normalized copy above.
        this.lastTimeMillis = startDate.getTime();
        this.firstTimeMillis = lastTimeMillis - TimeUnit.DAYS.toMillis(numDays);
        this.globPath = new Path(globPath);

        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", fsUri);
        this.fileSystem = FileSystem.get(conf);
    }

    /**
     * Command-line entry point. Parses the options, runs the prune and logs the number of
     * files removed. Exits with status 0 after printing help and -1 on a usage error or
     * any failure.
     *
     * <pre>
     * Example
     * start=$(date -d '30 days ago' +%m/%d/%Y)
     * yarn jar Metron-DataLoads-0.1BETA.jar org.apache.metron.dataloads.bulk.HDFSDataPruner -f hdfs://ec2-52-36-25-217.us-west-2.compute.amazonaws.com:8020 -g '/apps/metron/enrichment/indexed/bro_doc/*enrichment-*' -s $(date -d '30 days ago' +%m/%d/%Y) -n 1;
     * echo ${start}
     * </pre>
     *
     * @param argv command-line arguments: {@code -s}, {@code -f}, {@code -n} and {@code -g} are required
     */
    public static void main(String... argv) throws IOException, java.text.ParseException, ClassNotFoundException, InterruptedException {
        Options options = new Options();
        // Help lives in its own Options so "-h" can be detected before the required options are enforced.
        Options help = new Options();

        {
            Option o = new Option("h", "help", false, "This screen");
            o.setRequired(false);
            help.addOption(o);
        }
        {
            Option o = new Option("s", "start-date", true, "Starting Date (MM/DD/YYYY)");
            o.setArgName("START_DATE");
            o.setRequired(true);
            options.addOption(o);
        }
        {
            Option o = new Option("f", "filesystem", true, "Filesystem uri - e.g. hdfs://host:8020 or file:///");
            o.setArgName("FILESYSTEM");
            o.setRequired(true);
            options.addOption(o);
        }
        {
            Option o = new Option("n", "numdays", true, "Number of days back to purge");
            o.setArgName("NUMDAYS");
            o.setRequired(true);
            options.addOption(o);
        }
        {
            Option o = new Option("g", "glob-string", true, "Glob filemask for files to delete - e.g. /apps/metron/enrichment/bro_doc/file-*");
            o.setArgName("GLOBSTRING");
            o.setRequired(true);
            options.addOption(o);
        }

        try {
            CommandLineParser parser = new PosixParser();
            CommandLine cmd = null;

            try {
                // First pass: stop at the first unknown option so only "-h" is recognized here.
                cmd = parser.parse(help, argv, true);
                if (cmd.getOptions().length > 0) {
                    final HelpFormatter usageFormatter = new HelpFormatter();
                    usageFormatter.printHelp("HDFSDataPruner", null, options, null, true);
                    System.exit(0);
                }
                cmd = parser.parse(options, argv);
            } catch (ParseException pe) {
                final HelpFormatter usageFormatter = new HelpFormatter();
                usageFormatter.printHelp("HDFSDataPruner", null, options, null, true);
                System.exit(-1);
            }

            String start = cmd.getOptionValue("s");
            SimpleDateFormat startDateFormat = new SimpleDateFormat("MM/dd/yyyy");
            // Strict parsing: reject out-of-range values such as "13/45/2016" instead of
            // silently rolling them over to a different date before deleting data.
            startDateFormat.setLenient(false);
            Date startDate = startDateFormat.parse(start);
            String fileSystemUri = cmd.getOptionValue("f");
            Integer numDays = Integer.parseInt(cmd.getOptionValue("n"));
            String globString = cmd.getOptionValue("g");

            if (LOG.isDebugEnabled()) {
                LOG.debug("Running prune with args: " + startDate + " " + numDays + " " + fileSystemUri + " " + globString);
            }

            HDFSDataPruner pruner = new HDFSDataPruner(startDate, numDays, fileSystemUri, globString);
            LOG.info("Pruned {} files from {}", pruner.prune(), fileSystemUri + globString);
        } catch (Exception e) {
            // Top-level boundary: report through the logger and signal failure to the caller.
            LOG.error("HDFS data prune failed", e);
            System.exit(-1);
        }
    }

    /**
     * Deletes every non-directory file matching the glob whose modification time lies
     * inside the configured window.
     *
     * @return the number of files that were actually deleted
     * @throws IOException      if globbing or deleting fails
     * @throws RuntimeException if the start date (at midnight) is not before today (at midnight)
     */
    public Long prune() throws IOException {
        Date today = dateAtMidnight(new Date());
        if (!today.after(startDate)) {
            throw new RuntimeException("Prune Start Date must be prior to today");
        }

        FileStatus[] filesToDelete = fileSystem.globStatus(globPath, new HDFSDataPruner.DateFileFilter(this));
        if (filesToDelete == null) {
            // globStatus yields null for a non-glob path that does not exist: nothing to prune.
            return 0L;
        }

        long filesPruned = 0;
        for (FileStatus fileStatus : filesToDelete) {
            Path path = fileStatus.getPath();
            LOG.debug("Deleting File: {}", path);
            // Only count files the filesystem reports as deleted.
            if (fileSystem.delete(path, false)) {
                filesPruned++;
            } else {
                LOG.warn("Unable to delete file: {}", path);
            }
        }
        return filesPruned;
    }

    /**
     * Returns the given date truncated to 00:00:00.000 in the default time zone.
     *
     * @param date the date to truncate
     * @return a new {@link Date} at midnight of the same calendar day
     */
    private Date dateAtMidnight(Date date) {
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(date);
        // HOUR_OF_DAY (24h clock) is required: Calendar.HOUR is the 12h field and would
        // leave afternoon times at 12:00 noon rather than midnight.
        calendar.set(Calendar.HOUR_OF_DAY, 0);
        calendar.set(Calendar.MINUTE, 0);
        calendar.set(Calendar.SECOND, 0);
        calendar.set(Calendar.MILLISECOND, 0);
        return calendar.getTime();
    }

    /**
     * Path filter that accepts non-directory paths whose modification time lies inside
     * the pruner's window.
     */
    class DateFileFilter extends Configured implements PathFilter {

        HDFSDataPruner pruner;
        /** When true, an {@link IOException} is rethrown wrapped in a RuntimeException; otherwise the path is rejected. */
        Boolean failOnError = false;

        DateFileFilter(HDFSDataPruner pruner) {
            this.pruner = pruner;
        }

        DateFileFilter(HDFSDataPruner pruner, Boolean failOnError) {
            this(pruner);
            this.failOnError = failOnError;
        }

        /**
         * @param path candidate path produced by the glob
         * @return true if the path is not a directory and its modification time is within
         *         {@code [firstTimeMillis, lastTimeMillis)}; false otherwise, including on an
         *         I/O error when {@code failOnError} is false
         * @throws RuntimeException wrapping the {@link IOException} when {@code failOnError} is true
         */
        @Override
        public boolean accept(Path path) {
            try {
                LOG.debug("ACCEPT - working with file: {}", path);

                if (pruner.fileSystem.isDirectory(path)) {
                    return false;
                }

                FileStatus file = pruner.fileSystem.getFileStatus(path);
                long fileModificationTime = file.getModificationTime();
                return fileModificationTime >= pruner.firstTimeMillis
                        && fileModificationTime < pruner.lastTimeMillis;
            } catch (IOException e) {
                LOG.error("IOException", e);
                if (failOnError) {
                    throw new RuntimeException(e);
                }
                return false;
            }
        }
    }
}