/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.raid;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.Date;
import java.text.SimpleDateFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileRecordReader;
import org.apache.hadoop.raid.RaidNode.Statistics;
import org.apache.hadoop.raid.protocol.PolicyInfo;
import org.apache.hadoop.util.StringUtils;
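/**
* Runs raiding as a distributed MapReduce job: the files to be raided are
* written to a control file, which is split among map tasks that each invoke
* RaidNode.doRaid() on their share of the files.
*/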
public class DistRaid {
protected static final Log LOG = LogFactory.getLog(DistRaid.class);
static final String NAME = "distRaid";
static final String JOB_DIR_LABEL = NAME + ".job.dir";
static final String OP_LIST_LABEL = NAME + ".op.list";
static final String OP_COUNT_LABEL = NAME + ".op.count";
static final int OP_LIST_BLOCK_SIZE = 32 * 1024 * 1024; // block size of control file
static final short OP_LIST_REPLICATION = 10; // replication factor of control file
private static final long OP_PER_MAP = 100;
private static final int MAX_MAPS_PER_NODE = 20;
private static final int SYNC_FILE_MAX = 10;
private static final SimpleDateFormat dateForm = new SimpleDateFormat("yyyy-MM-dd HH:mm");
private static String jobName = NAME;
static enum Counter {
FILES_SUCCEEDED, FILES_FAILED, PROCESSED_BLOCKS, PROCESSED_SIZE, META_BLOCKS, META_SIZE
}
protected JobConf jobconf;
/** Set the configuration, wrapping it in a {@link JobConf} if necessary. */
public void setConf(Configuration conf) {
if (jobconf != conf) {
jobconf = conf instanceof JobConf ? (JobConf) conf : new JobConf(conf);
}
}
/** Return the current job configuration. */
public JobConf getConf() {
return jobconf;
}
public DistRaid(Configuration conf) {
setConf(createJobConf(conf));
}
private static final Random RANDOM = new Random();
protected static String getRandomId() {
return Integer.toString(RANDOM.nextInt(Integer.MAX_VALUE), 36);
}
/**
* Helper class that pairs a raid policy with its list of source paths.
*/
public static class RaidPolicyPathPair {
public PolicyInfo policy;
public List<FileStatus> srcPaths;
RaidPolicyPathPair(PolicyInfo policy, List<FileStatus> srcPaths) {
this.policy = policy;
this.srcPaths = srcPaths;
}
}
List<RaidPolicyPathPair> raidPolicyPathPairList = new ArrayList<RaidPolicyPathPair>();
/** Responsible for generating splits of the src file list. */
static class DistRaidInputFormat implements InputFormat<Text, PolicyInfo> {
/** Do nothing. */
public void validateInput(JobConf job) {
}
/**
* Produce splits such that each contains approximately the quotient of the
* total number of entries and the number of splits requested.
*
* @param job
* The handle to the JobConf object
* @param numSplits
* Number of splits requested
*/
public InputSplit[] getSplits(JobConf job, int numSplits)
throws IOException {
final int srcCount = job.getInt(OP_COUNT_LABEL, -1);
final int targetcount = srcCount / numSplits;
String srclist = job.get(OP_LIST_LABEL, "");
if (srcCount < 0 || "".equals(srclist)) {
throw new RuntimeException("Invalid metadata: #files(" + srcCount
+ ") listuri(" + srclist + ")");
}
Path srcs = new Path(srclist);
FileSystem fs = srcs.getFileSystem(job);
List<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
Text key = new Text();
PolicyInfo value = new PolicyInfo();
SequenceFile.Reader in = null;
long prev = 0L;
int count = 0; // count src
try {
for (in = new SequenceFile.Reader(fs, srcs, job); in.next(key, value);) {
long curr = in.getPosition();
long delta = curr - prev;
if (++count > targetcount) {
count = 0;
splits.add(new FileSplit(srcs, prev, delta, (String[]) null));
prev = curr;
}
}
} finally {
if (in != null) { // reader creation may have failed, leaving in null
in.close();
}
}
long remaining = fs.getFileStatus(srcs).getLen() - prev;
if (remaining != 0) {
splits.add(new FileSplit(srcs, prev, remaining, (String[]) null));
}
LOG.info("jobname= " + jobName + " numSplits=" + numSplits +
", splits.size()=" + splits.size());
return splits.toArray(new FileSplit[splits.size()]);
}
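// A sketch of the split arithmetic above (illustrative numbers, not from
// the source): srcCount = 1000 entries and numSplits = 10 give
// targetcount = 100, so a split boundary is cut after roughly every 100
// entries read from the control file; any bytes left after the last
// boundary form one final split, so the job may end up with slightly more
// splits than requested.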
/** {@inheritDoc} */
public RecordReader<Text, PolicyInfo> getRecordReader(InputSplit split,
JobConf job, Reporter reporter) throws IOException {
return new SequenceFileRecordReader<Text, PolicyInfo>(job,
(FileSplit) split);
}
}
/** The mapper for raiding files. */
static class DistRaidMapper implements
Mapper<Text, PolicyInfo, WritableComparable, Text> {
private JobConf jobconf;
private boolean ignoreFailures;
private int failcount = 0;
private int succeedcount = 0;
private Statistics st = null;
private String getCountString() {
return "Succeeded: " + succeedcount + " Failed: " + failcount;
}
/** {@inheritDoc} */
public void configure(JobConf job) {
this.jobconf = job;
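// Failures are always fatal for now: close() throws if any file failed.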
ignoreFailures = false;
st = new Statistics();
}
/** Raid the file named by the key according to the given policy. */
public void map(Text key, PolicyInfo policy,
OutputCollector<WritableComparable, Text> out, Reporter reporter)
throws IOException {
try {
LOG.info("Raiding file=" + key.toString() + " policy=" + policy);
Path p = new Path(key.toString());
FileStatus fs = p.getFileSystem(jobconf).getFileStatus(p);
st.clear();
RaidNode.doRaid(jobconf, policy, fs, st, reporter);
++succeedcount;
reporter.incrCounter(Counter.PROCESSED_BLOCKS, st.numProcessedBlocks);
reporter.incrCounter(Counter.PROCESSED_SIZE, st.processedSize);
reporter.incrCounter(Counter.META_BLOCKS, st.numMetaBlocks);
reporter.incrCounter(Counter.META_SIZE, st.metaSize);
reporter.incrCounter(Counter.FILES_SUCCEEDED, 1);
} catch (IOException e) {
++failcount;
reporter.incrCounter(Counter.FILES_FAILED, 1);
String s = "FAIL: " + policy + ", " + key + " "
+ StringUtils.stringifyException(e);
out.collect(null, new Text(s));
LOG.info(s);
} finally {
reporter.setStatus(getCountString());
}
}
/** {@inheritDoc} */
public void close() throws IOException {
if (failcount == 0 || ignoreFailures) {
return;
}
throw new IOException(getCountString());
}
}
/**
* Create a new job configuration based on the configuration passed in.
*
* @param conf the base configuration
* @return a JobConf set up for a map-only distRaid job
*/
private static JobConf createJobConf(Configuration conf) {
JobConf jobconf = new JobConf(conf, DistRaid.class);
jobName = NAME + " " + dateForm.format(new Date(RaidNode.now()));
jobconf.setJobName(jobName);
jobconf.setMapSpeculativeExecution(false);
jobconf.setJarByClass(DistRaid.class);
jobconf.setInputFormat(DistRaidInputFormat.class);
jobconf.setOutputKeyClass(Text.class);
jobconf.setOutputValueClass(Text.class);
jobconf.setMapperClass(DistRaidMapper.class);
jobconf.setNumReduceTasks(0);
return jobconf;
}
/** Add paths to be raided. */
public void addRaidPaths(PolicyInfo info, List<FileStatus> paths) {
raidPolicyPathPairList.add(new RaidPolicyPathPair(info, paths));
}
/** Calculate how many maps to run. */
private static int getMapCount(int srcCount, int numNodes) {
int numMaps = (int) (srcCount / OP_PER_MAP);
numMaps = Math.min(numMaps, numNodes * MAX_MAPS_PER_NODE);
return Math.max(numMaps, 1);
}
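// Worked example (illustrative numbers, not from the source): 2,500 files
// on a 10-tracker cluster gives min(2500 / 100, 10 * 20) = 25 maps, while
// a 40-file backlog gives max(0, 1) = 1 map.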
/** Invokes a MapReduce job to raid files in parallel. */
public void doDistRaid() throws IOException {
if (raidPolicyPathPairList.size() == 0) {
LOG.info("DistRaid has no paths to raid.");
return;
}
try {
if (setup()) {
JobClient.runJob(jobconf);
}
} finally {
// delete job directory
final String jobdir = jobconf.get(JOB_DIR_LABEL);
if (jobdir != null) {
final Path jobpath = new Path(jobdir);
jobpath.getFileSystem(jobconf).delete(jobpath, true);
}
}
raidPolicyPathPairList.clear();
}
/**
* Set up the control file that lists the files to be raided.
*
* @return true if there is at least one file to raid
* @throws IOException
*/
private boolean setup() throws IOException {
final String randomId = getRandomId();
JobClient jClient = new JobClient(jobconf);
Path jobdir = new Path(jClient.getSystemDir(), NAME + "_" + randomId);
LOG.info(JOB_DIR_LABEL + "=" + jobdir);
jobconf.set(JOB_DIR_LABEL, jobdir.toString());
Path log = new Path(jobdir, "_logs");
// The control file should have small size blocks. This helps
// in spreading out the load from mappers that will be spawned.
jobconf.setInt("dfs.blocks.size", OP_LIST_BLOCK_SIZE);
FileOutputFormat.setOutputPath(jobconf, log);
LOG.info("log=" + log);
// create operation list
FileSystem fs = jobdir.getFileSystem(jobconf);
Path opList = new Path(jobdir, "_" + OP_LIST_LABEL);
jobconf.set(OP_LIST_LABEL, opList.toString());
int opCount = 0, synCount = 0;
SequenceFile.Writer opWriter = null;
try {
opWriter = SequenceFile.createWriter(fs, jobconf, opList, Text.class,
PolicyInfo.class, SequenceFile.CompressionType.NONE);
for (RaidPolicyPathPair p : raidPolicyPathPairList) {
for (FileStatus st : p.srcPaths) {
opWriter.append(new Text(st.getPath().toString()), p.policy);
opCount++;
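// Add a sync marker periodically so the control file can be split
// at record boundaries by DistRaidInputFormat.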
if (++synCount > SYNC_FILE_MAX) {
opWriter.sync();
synCount = 0;
}
}
}
} finally {
if (opWriter != null) {
opWriter.close();
// increase replication for the control file
fs.setReplication(opList, OP_LIST_REPLICATION);
}
}
raidPolicyPathPairList.clear();
jobconf.setInt(OP_COUNT_LABEL, opCount);
LOG.info("Number of files=" + opCount);
jobconf.setNumMapTasks(getMapCount(opCount,
jClient.getClusterStatus().getTaskTrackers()));
LOG.info("jobName= " + jobName + " numMapTasks=" + jobconf.getNumMapTasks());
return opCount != 0;
}
}
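/**
* A minimal usage sketch (not part of the original class): how a driver such
* as RaidNode might submit a distRaid job. The source path below is a
* hypothetical placeholder, and the PolicyInfo is left empty; a real driver
* would populate it from the configured raid policies.
*/
class DistRaidExample {
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
List<FileStatus> srcPaths = new ArrayList<FileStatus>();
// Hypothetical path; in practice paths are selected per policy.
srcPaths.add(fs.getFileStatus(new Path("/user/example/data")));
PolicyInfo policy = new PolicyInfo(); // policy attributes elided in this sketch
DistRaid distRaid = new DistRaid(conf);
distRaid.addRaidPaths(policy, srcPaths);
distRaid.doDistRaid(); // writes the op list, sizes the job, and runs it
}
}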