blob: 50e622d1e4a96db76c3f433a0d16dab07110cbf8 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.tools;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Stack;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.InvalidInputException;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileRecordReader;
import org.apache.hadoop.mapreduce.JobSubmissionFiles;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
/**
* A Map-reduce program to recursively change files properties
* such as owner, group and permission.
*/
public class DistCh extends DistTool {
static final String NAME = "distch";
static final String JOB_DIR_LABEL = NAME + ".job.dir";
static final String OP_LIST_LABEL = NAME + ".op.list";
static final String OP_COUNT_LABEL = NAME + ".op.count";
static final String USAGE = "java " + DistCh.class.getName()
+ " [OPTIONS] <path:owner:group:permission>+ "
+ "\n\nThe values of owner, group and permission can be empty."
+ "\nPermission is a octal number."
+ "\n\nOPTIONS:"
+ "\n-f <urilist_uri> Use list at <urilist_uri> as src list"
+ "\n-i Ignore failures"
+ "\n-log <logdir> Write logs to <logdir>"
;
private static final long OP_PER_MAP = 1000;
private static final int MAX_MAPS_PER_NODE = 20;
private static final int SYNC_FILE_MAX = 10;
static enum Counter { SUCCEED, FAIL }
static enum Option {
IGNORE_FAILURES("-i", NAME + ".ignore.failures");
final String cmd, propertyname;
private Option(String cmd, String propertyname) {
this.cmd = cmd;
this.propertyname = propertyname;
}
}
DistCh(Configuration conf) {
super(createJobConf(conf));
}
private static JobConf createJobConf(Configuration conf) {
JobConf jobconf = new JobConf(conf, DistCh.class);
jobconf.setJobName(NAME);
jobconf.setMapSpeculativeExecution(false);
jobconf.setInputFormat(ChangeInputFormat.class);
jobconf.setOutputKeyClass(Text.class);
jobconf.setOutputValueClass(Text.class);
jobconf.setMapperClass(ChangeFilesMapper.class);
jobconf.setNumReduceTasks(0);
return jobconf;
}
/** File operations. */
static class FileOperation implements Writable {
private Path src;
private String owner;
private String group;
private FsPermission permission;
FileOperation() {}
FileOperation(Path src, FileOperation that) {
this.src = src;
this.owner = that.owner;
this.group = that.group;
this.permission = that.permission;
checkState();
}
/**
* path:owner:group:permission
* e.g.
* /user/foo:foo:bar:700
*/
FileOperation(String line) {
try {
String[] t = line.split(":", 4);
for(int i = 0; i < t.length; i++) {
if ("".equals(t[i])) {
t[i] = null;
}
}
src = new Path(t[0]);
owner = t[1];
group = t[2];
permission = t[3] == null? null:
new FsPermission(Short.parseShort(t[3], 8));
checkState();
}
catch(Exception e) {
throw (IllegalArgumentException)new IllegalArgumentException(
"line=" + line).initCause(e);
}
}
private void checkState() throws IllegalStateException {
if (owner == null && group == null && permission == null) {
throw new IllegalStateException(
"owner == null && group == null && permission == null");
}
}
static final FsPermission FILE_UMASK
= FsPermission.createImmutable((short)0111);
private boolean isDifferent(FileStatus original) {
if (owner != null && !owner.equals(original.getOwner())) {
return true;
}
if (group != null && !group.equals(original.getGroup())) {
return true;
}
if (permission != null) {
FsPermission orig = original.getPermission();
return original.isDir()? !permission.equals(orig):
!permission.applyUMask(FILE_UMASK).equals(orig);
}
return false;
}
void run(Configuration conf) throws IOException {
FileSystem fs = src.getFileSystem(conf);
if (permission != null) {
fs.setPermission(src, permission);
}
if (owner != null || group != null) {
fs.setOwner(src, owner, group);
}
}
/** {@inheritDoc} */
public void readFields(DataInput in) throws IOException {
this.src = new Path(Text.readString(in));
owner = DistTool.readString(in);
group = DistTool.readString(in);
permission = in.readBoolean()? FsPermission.read(in): null;
}
/** {@inheritDoc} */
public void write(DataOutput out) throws IOException {
Text.writeString(out, src.toString());
DistTool.writeString(out, owner);
DistTool.writeString(out, group);
boolean b = permission != null;
out.writeBoolean(b);
if (b) {permission.write(out);}
}
/** {@inheritDoc} */
public String toString() {
return src + ":" + owner + ":" + group + ":" + permission;
}
}
/** Responsible for generating splits of the src file list. */
static class ChangeInputFormat implements InputFormat<Text, FileOperation> {
/** Do nothing. */
public void validateInput(JobConf job) {}
/**
* Produce splits such that each is no greater than the quotient of the
* total size and the number of splits requested.
* @param job The handle to the JobConf object
* @param numSplits Number of splits requested
*/
public InputSplit[] getSplits(JobConf job, int numSplits
) throws IOException {
final int srcCount = job.getInt(OP_COUNT_LABEL, -1);
final int targetcount = srcCount / numSplits;
String srclist = job.get(OP_LIST_LABEL, "");
if (srcCount < 0 || "".equals(srclist)) {
throw new RuntimeException("Invalid metadata: #files(" + srcCount +
") listuri(" + srclist + ")");
}
Path srcs = new Path(srclist);
FileSystem fs = srcs.getFileSystem(job);
List<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
Text key = new Text();
FileOperation value = new FileOperation();
SequenceFile.Reader in = null;
long prev = 0L;
int count = 0; //count src
try {
for(in = new SequenceFile.Reader(fs, srcs, job); in.next(key, value); ) {
long curr = in.getPosition();
long delta = curr - prev;
if (++count > targetcount) {
count = 0;
splits.add(new FileSplit(srcs, prev, delta, (String[])null));
prev = curr;
}
}
}
finally {
in.close();
}
long remaining = fs.getFileStatus(srcs).getLen() - prev;
if (remaining != 0) {
splits.add(new FileSplit(srcs, prev, remaining, (String[])null));
}
LOG.info("numSplits=" + numSplits + ", splits.size()=" + splits.size());
return splits.toArray(new FileSplit[splits.size()]);
}
/** {@inheritDoc} */
public RecordReader<Text, FileOperation> getRecordReader(InputSplit split,
JobConf job, Reporter reporter) throws IOException {
return new SequenceFileRecordReader<Text, FileOperation>(job,
(FileSplit)split);
}
}
/** The mapper for changing files. */
static class ChangeFilesMapper
implements Mapper<Text, FileOperation, WritableComparable<?>, Text> {
private JobConf jobconf;
private boolean ignoreFailures;
private int failcount = 0;
private int succeedcount = 0;
private String getCountString() {
return "Succeeded: " + succeedcount + " Failed: " + failcount;
}
/** {@inheritDoc} */
public void configure(JobConf job) {
this.jobconf = job;
ignoreFailures=job.getBoolean(Option.IGNORE_FAILURES.propertyname,false);
}
/** Run a FileOperation */
public void map(Text key, FileOperation value,
OutputCollector<WritableComparable<?>, Text> out, Reporter reporter
) throws IOException {
try {
value.run(jobconf);
++succeedcount;
reporter.incrCounter(Counter.SUCCEED, 1);
} catch (IOException e) {
++failcount;
reporter.incrCounter(Counter.FAIL, 1);
String s = "FAIL: " + value + ", " + StringUtils.stringifyException(e);
out.collect(null, new Text(s));
LOG.info(s);
} finally {
reporter.setStatus(getCountString());
}
}
/** {@inheritDoc} */
public void close() throws IOException {
if (failcount == 0 || ignoreFailures) {
return;
}
throw new IOException(getCountString());
}
}
private static void check(Configuration conf, List<FileOperation> ops
) throws InvalidInputException {
List<Path> srcs = new ArrayList<Path>();
for(FileOperation op : ops) {
srcs.add(op.src);
}
DistTool.checkSource(conf, srcs);
}
private static List<FileOperation> fetchList(Configuration conf, Path inputfile
) throws IOException {
List<FileOperation> result = new ArrayList<FileOperation>();
for(String line : readFile(conf, inputfile)) {
result.add(new FileOperation(line));
}
return result;
}
/** This is the main driver for recursively changing files properties. */
public int run(String[] args) throws Exception {
List<FileOperation> ops = new ArrayList<FileOperation>();
Path logpath = null;
boolean isIgnoreFailures = false;
try {
for (int idx = 0; idx < args.length; idx++) {
if ("-f".equals(args[idx])) {
if (++idx == args.length) {
System.out.println("urilist_uri not specified");
System.out.println(USAGE);
return -1;
}
ops.addAll(fetchList(jobconf, new Path(args[idx])));
} else if (Option.IGNORE_FAILURES.cmd.equals(args[idx])) {
isIgnoreFailures = true;
} else if ("-log".equals(args[idx])) {
if (++idx == args.length) {
System.out.println("logdir not specified");
System.out.println(USAGE);
return -1;
}
logpath = new Path(args[idx]);
} else if ('-' == args[idx].codePointAt(0)) {
System.out.println("Invalid switch " + args[idx]);
System.out.println(USAGE);
ToolRunner.printGenericCommandUsage(System.out);
return -1;
} else {
ops.add(new FileOperation(args[idx]));
}
}
// mandatory command-line parameters
if (ops.isEmpty()) {
throw new IllegalStateException("Operation is empty");
}
LOG.info("ops=" + ops);
LOG.info("isIgnoreFailures=" + isIgnoreFailures);
jobconf.setBoolean(Option.IGNORE_FAILURES.propertyname, isIgnoreFailures);
check(jobconf, ops);
try {
if (setup(ops, logpath)) {
JobClient.runJob(jobconf);
}
} finally {
try {
if (logpath == null) {
//delete log directory
final Path logdir = FileOutputFormat.getOutputPath(jobconf);
if (logdir != null) {
logdir.getFileSystem(jobconf).delete(logdir, true);
}
}
}
finally {
//delete job directory
final String jobdir = jobconf.get(JOB_DIR_LABEL);
if (jobdir != null) {
final Path jobpath = new Path(jobdir);
jobpath.getFileSystem(jobconf).delete(jobpath, true);
}
}
}
} catch(DuplicationException e) {
LOG.error("Input error:", e);
return DuplicationException.ERROR_CODE;
} catch(Exception e) {
LOG.error(NAME + " failed: ", e);
System.out.println(USAGE);
ToolRunner.printGenericCommandUsage(System.out);
return -1;
}
return 0;
}
/** Calculate how many maps to run. */
private static int getMapCount(int srcCount, int numNodes) {
int numMaps = (int)(srcCount / OP_PER_MAP);
numMaps = Math.min(numMaps, numNodes * MAX_MAPS_PER_NODE);
return Math.max(numMaps, 1);
}
private boolean setup(List<FileOperation> ops, Path log) throws IOException {
final String randomId = getRandomId();
JobClient jClient = new JobClient(jobconf);
Path stagingArea;
try {
stagingArea = JobSubmissionFiles.getStagingDir(
jClient, jobconf);
} catch (InterruptedException e) {
throw new IOException(e);
}
Path jobdir = new Path(stagingArea + NAME + "_" + randomId);
FsPermission mapredSysPerms =
new FsPermission(JobSubmissionFiles.JOB_DIR_PERMISSION);
FileSystem.mkdirs(jClient.getFs(), jobdir, mapredSysPerms);
LOG.info(JOB_DIR_LABEL + "=" + jobdir);
if (log == null) {
log = new Path(jobdir, "_logs");
}
FileOutputFormat.setOutputPath(jobconf, log);
LOG.info("log=" + log);
//create operation list
FileSystem fs = jobdir.getFileSystem(jobconf);
Path opList = new Path(jobdir, "_" + OP_LIST_LABEL);
jobconf.set(OP_LIST_LABEL, opList.toString());
int opCount = 0, synCount = 0;
SequenceFile.Writer opWriter = null;
try {
opWriter = SequenceFile.createWriter(fs, jobconf, opList, Text.class,
FileOperation.class, SequenceFile.CompressionType.NONE);
for(FileOperation op : ops) {
FileStatus srcstat = fs.getFileStatus(op.src);
if (srcstat.isDir() && op.isDifferent(srcstat)) {
++opCount;
opWriter.append(new Text(op.src.toString()), op);
}
Stack<Path> pathstack = new Stack<Path>();
for(pathstack.push(op.src); !pathstack.empty(); ) {
for(FileStatus stat : fs.listStatus(pathstack.pop())) {
if (stat.isDir()) {
pathstack.push(stat.getPath());
}
if (op.isDifferent(stat)) {
++opCount;
if (++synCount > SYNC_FILE_MAX) {
opWriter.sync();
synCount = 0;
}
Path f = stat.getPath();
opWriter.append(new Text(f.toString()), new FileOperation(f, op));
}
}
}
}
} finally {
opWriter.close();
}
checkDuplication(fs, opList, new Path(jobdir, "_sorted"), jobconf);
jobconf.setInt(OP_COUNT_LABEL, opCount);
LOG.info(OP_COUNT_LABEL + "=" + opCount);
jobconf.setNumMapTasks(getMapCount(opCount,
new JobClient(jobconf).getClusterStatus().getTaskTrackers()));
return opCount != 0;
}
private static void checkDuplication(FileSystem fs, Path file, Path sorted,
Configuration conf) throws IOException {
SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs,
new Text.Comparator(), Text.class, FileOperation.class, conf);
sorter.sort(file, sorted);
SequenceFile.Reader in = null;
try {
in = new SequenceFile.Reader(fs, sorted, conf);
FileOperation curop = new FileOperation();
Text prevsrc = null, cursrc = new Text();
for(; in.next(cursrc, curop); ) {
if (prevsrc != null && cursrc.equals(prevsrc)) {
throw new DuplicationException(
"Invalid input, there are duplicated files in the sources: "
+ prevsrc + ", " + cursrc);
}
prevsrc = cursrc;
cursrc = new Text();
curop = new FileOperation();
}
}
finally {
in.close();
}
}
public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(new DistCh(new Configuration()), args));
}
}