| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.examples.pi; |
| |
| import java.io.DataInput; |
| import java.io.DataOutput; |
| import java.io.File; |
| import java.io.IOException; |
| import java.io.PrintWriter; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.concurrent.Callable; |
| |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.conf.Configured; |
| import org.apache.hadoop.examples.pi.math.Summation; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.io.IntWritable; |
| import org.apache.hadoop.io.NullWritable; |
| import org.apache.hadoop.io.Writable; |
| import org.apache.hadoop.mapred.JobTracker; |
| import org.apache.hadoop.mapreduce.Cluster; |
| import org.apache.hadoop.mapreduce.ClusterMetrics; |
| import org.apache.hadoop.mapreduce.InputFormat; |
| import org.apache.hadoop.mapreduce.InputSplit; |
| import org.apache.hadoop.mapreduce.Job; |
| import org.apache.hadoop.mapreduce.JobContext; |
| import org.apache.hadoop.mapreduce.MRJobConfig; |
| import org.apache.hadoop.mapreduce.Mapper; |
| import org.apache.hadoop.mapreduce.Partitioner; |
| import org.apache.hadoop.mapreduce.RecordReader; |
| import org.apache.hadoop.mapreduce.Reducer; |
| import org.apache.hadoop.mapreduce.TaskAttemptContext; |
| import org.apache.hadoop.mapreduce.TaskInputOutputContext; |
| import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; |
| import org.apache.hadoop.util.Tool; |
| import org.apache.hadoop.util.ToolRunner; |
| |
| /** |
| * The main class for computing sums using map/reduce jobs. |
| * A sum is partitioned into jobs. |
| * A job may be executed on the map-side or on the reduce-side. |
 * A map-side job has multiple maps and zero reducers.
 * A reduce-side job has one map and multiple reducers.
 * Depending on the cluster status at runtime,
 * a mix-type job may be executed on either side.
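 * <p>
 * Command-line usage, as accepted by {@link #run(String[])}:
 * {@code DistSum <name> <sigma> <nThreads> <nJobs> <type> <nPart> <remoteDir> <localDir>}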
| */ |
| public final class DistSum extends Configured implements Tool { |
| private static final Log LOG = LogFactory.getLog(DistSum.class); |
| |
| private static final String NAME = DistSum.class.getSimpleName(); |
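  /** The configuration property for the number of parts per job. */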
| private static final String N_PARTS = "mapreduce.pi." + NAME + ".nParts"; |
| ///////////////////////////////////////////////////////////////////////////// |
| /** DistSum job parameters */ |
| static class Parameters { |
| static final int COUNT = 6; |
| static final String LIST = "<nThreads> <nJobs> <type> <nPart> <remoteDir> <localDir>"; |
| static final String DESCRIPTION = |
| "\n <nThreads> The number of working threads." |
| + "\n <nJobs> The number of jobs per sum." |
| + "\n <type> 'm' for map side job, 'r' for reduce side job, 'x' for mix type." |
| + "\n <nPart> The number of parts per job." |
| + "\n <remoteDir> Remote directory for submitting jobs." |
| + "\n <localDir> Local directory for storing output files."; |
| |
| /** Number of worker threads */ |
| final int nThreads; |
| /** Number of jobs */ |
| final int nJobs; |
| /** Number of parts per job */ |
| final int nParts; |
| /** The machine used in the computation */ |
| final Machine machine; |
| /** The remote job directory */ |
| final String remoteDir; |
| /** The local output directory */ |
| final File localDir; |
| |
| private Parameters(Machine machine, int nThreads, int nJobs, int nParts, |
| String remoteDir, File localDir) { |
| this.machine = machine; |
| this.nThreads = nThreads; |
| this.nJobs = nJobs; |
| this.nParts = nParts; |
| this.remoteDir = remoteDir; |
| this.localDir = localDir; |
| } |
| |
    /** {@inheritDoc} */
    @Override
    public String toString() {
| return "\nnThreads = " + nThreads |
| + "\nnJobs = " + nJobs |
| + "\nnParts = " + nParts + " (" + machine + ")" |
| + "\nremoteDir = " + remoteDir |
| + "\nlocalDir = " + localDir; |
| } |
| |
| /** Parse parameters */ |
| static Parameters parse(String[] args, int i) { |
| if (args.length - i < COUNT) |
| throw new IllegalArgumentException("args.length - i < COUNT = " |
| + COUNT + ", args.length=" |
| + args.length + ", i=" + i + ", args=" + Arrays.asList(args)); |
| |
| final int nThreads = Integer.parseInt(args[i++]); |
| final int nJobs = Integer.parseInt(args[i++]); |
| final String type = args[i++]; |
| final int nParts = Integer.parseInt(args[i++]); |
| final String remoteDir = args[i++]; |
| final File localDir = new File(args[i++]); |
| |
| if (!"m".equals(type) && !"r".equals(type) && !"x".equals(type)) { |
| throw new IllegalArgumentException("type=" + type + " is not equal to m, r or x"); |
| } else if (nParts <= 0) { |
| throw new IllegalArgumentException("nParts = " + nParts + " <= 0"); |
| } else if (nJobs <= 0) { |
| throw new IllegalArgumentException("nJobs = " + nJobs + " <= 0"); |
| } else if (nThreads <= 0) { |
| throw new IllegalArgumentException("nThreads = " + nThreads + " <= 0"); |
| } |
| Util.checkDirectory(localDir); |
| |
| return new Parameters("m".equals(type)? MapSide.INSTANCE |
| : "r".equals(type)? ReduceSide.INSTANCE: MixMachine.INSTANCE, |
| nThreads, nJobs, nParts, remoteDir, localDir); |
| } |
| } |
| ///////////////////////////////////////////////////////////////////////////// |
| /** Abstract machine for job execution. */ |
| public static abstract class Machine { |
| /** Initialize a job */ |
| abstract void init(Job job) throws IOException; |
| |
    /** {@inheritDoc} */
    @Override
    public String toString() {return getClass().getSimpleName();}
| |
    /** Compute the given summation and write the result to the task output context. */
| static void compute(Summation sigma, |
| TaskInputOutputContext<?, ?, NullWritable, TaskResult> context |
| ) throws IOException, InterruptedException { |
| String s; |
| LOG.info(s = "sigma=" + sigma); |
| context.setStatus(s); |
| |
| final long start = System.currentTimeMillis(); |
| sigma.compute(); |
| final long duration = System.currentTimeMillis() - start; |
| final TaskResult result = new TaskResult(sigma, duration); |
| |
| LOG.info(s = "result=" + result); |
| context.setStatus(s); |
| context.write(NullWritable.get(), result); |
| } |
| |
| /** Split for the summations */ |
| public static final class SummationSplit extends InputSplit implements Writable, Container<Summation> { |
| private final static String[] EMPTY = {}; |
| |
| private Summation sigma; |
| |
| public SummationSplit() {} |
| private SummationSplit(Summation sigma) {this.sigma = sigma;} |
| /** {@inheritDoc} */ |
| @Override |
| public Summation getElement() {return sigma;} |
| /** {@inheritDoc} */ |
| @Override |
| public long getLength() {return 1;} |
| /** {@inheritDoc} */ |
| @Override |
| public String[] getLocations() {return EMPTY;} |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void readFields(DataInput in) throws IOException { |
| sigma = SummationWritable.read(in); |
| } |
| /** {@inheritDoc} */ |
| @Override |
| public void write(DataOutput out) throws IOException { |
| new SummationWritable(sigma).write(out); |
| } |
| } |
| |
| /** An abstract InputFormat for the jobs */ |
| public static abstract class AbstractInputFormat extends InputFormat<NullWritable, SummationWritable> { |
| /** Specify how to read the records */ |
| @Override |
| public final RecordReader<NullWritable, SummationWritable> createRecordReader( |
| InputSplit generic, TaskAttemptContext context) { |
| final SummationSplit split = (SummationSplit)generic; |
| |
| //return a record reader |
| return new RecordReader<NullWritable, SummationWritable>() { |
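        // each split carries a single summation, so this reader produces exactly one key-value pair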
| boolean done = false; |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void initialize(InputSplit split, TaskAttemptContext context) {} |
| /** {@inheritDoc} */ |
| @Override |
| public boolean nextKeyValue() {return !done ? done = true : false;} |
| /** {@inheritDoc} */ |
| @Override |
| public NullWritable getCurrentKey() {return NullWritable.get();} |
| /** {@inheritDoc} */ |
| @Override |
| public SummationWritable getCurrentValue() {return new SummationWritable(split.getElement());} |
| /** {@inheritDoc} */ |
| @Override |
| public float getProgress() {return done? 1f: 0f;} |
| /** {@inheritDoc} */ |
| @Override |
| public void close() {} |
| }; |
| } |
| } |
| } |
| ///////////////////////////////////////////////////////////////////////////// |
| /** |
| * A machine which does computation on the map side. |
| */ |
| public static class MapSide extends Machine { |
| private static final MapSide INSTANCE = new MapSide(); |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void init(Job job) { |
| // setup mapper |
| job.setMapperClass(SummingMapper.class); |
| job.setMapOutputKeyClass(NullWritable.class); |
| job.setMapOutputValueClass(TaskResult.class); |
| |
| // zero reducer |
| job.setNumReduceTasks(0); |
| |
| // setup input |
| job.setInputFormatClass(PartitionInputFormat.class); |
| } |
| |
| /** An InputFormat which partitions a summation */ |
| public static class PartitionInputFormat extends AbstractInputFormat { |
      /** Partitions the summation into parts and then returns them as splits */
| @Override |
| public List<InputSplit> getSplits(JobContext context) { |
| //read sigma from conf |
| final Configuration conf = context.getConfiguration(); |
| final Summation sigma = SummationWritable.read(DistSum.class, conf); |
| final int nParts = conf.getInt(N_PARTS, 0); |
| |
| //create splits |
| final List<InputSplit> splits = new ArrayList<InputSplit>(nParts); |
| final Summation[] parts = sigma.partition(nParts); |
| for(int i = 0; i < parts.length; ++i) { |
| splits.add(new SummationSplit(parts[i])); |
| //LOG.info("parts[" + i + "] = " + parts[i]); |
| } |
| return splits; |
| } |
| } |
| |
| /** A mapper which computes sums */ |
| public static class SummingMapper extends |
| Mapper<NullWritable, SummationWritable, NullWritable, TaskResult> { |
| @Override |
| protected void map(NullWritable nw, SummationWritable sigma, final Context context |
| ) throws IOException, InterruptedException { |
| compute(sigma.getElement(), context); |
| } |
| } |
| } |
| ///////////////////////////////////////////////////////////////////////////// |
| /** |
| * A machine which does computation on the reduce side. |
| */ |
| public static class ReduceSide extends Machine { |
| private static final ReduceSide INSTANCE = new ReduceSide(); |
| |
| /** {@inheritDoc} */ |
| @Override |
| public void init(Job job) { |
| // setup mapper |
| job.setMapperClass(PartitionMapper.class); |
| job.setMapOutputKeyClass(IntWritable.class); |
| job.setMapOutputValueClass(SummationWritable.class); |
| |
| // setup partitioner |
| job.setPartitionerClass(IndexPartitioner.class); |
| |
| // setup reducer |
| job.setReducerClass(SummingReducer.class); |
| job.setOutputKeyClass(NullWritable.class); |
| job.setOutputValueClass(TaskResult.class); |
| final Configuration conf = job.getConfiguration(); |
| final int nParts = conf.getInt(N_PARTS, 1); |
| job.setNumReduceTasks(nParts); |
| |
| // setup input |
| job.setInputFormatClass(SummationInputFormat.class); |
| } |
| |
| /** An InputFormat which returns a single summation. */ |
| public static class SummationInputFormat extends AbstractInputFormat { |
      /** @return a list containing a single split of the summation */
| @Override |
| public List<InputSplit> getSplits(JobContext context) { |
| //read sigma from conf |
| final Configuration conf = context.getConfiguration(); |
| final Summation sigma = SummationWritable.read(DistSum.class, conf); |
| |
| //create splits |
| final List<InputSplit> splits = new ArrayList<InputSplit>(1); |
| splits.add(new SummationSplit(sigma)); |
| return splits; |
| } |
| } |
| |
| /** A Mapper which partitions a summation */ |
| public static class PartitionMapper extends |
| Mapper<NullWritable, SummationWritable, IntWritable, SummationWritable> { |
| /** Partitions sigma into parts */ |
| @Override |
| protected void map(NullWritable nw, SummationWritable sigma, final Context context |
| ) throws IOException, InterruptedException { |
| final Configuration conf = context.getConfiguration(); |
| final int nParts = conf.getInt(N_PARTS, 0); |
| final Summation[] parts = sigma.getElement().partition(nParts); |
| for(int i = 0; i < parts.length; ++i) { |
| context.write(new IntWritable(i), new SummationWritable(parts[i])); |
| LOG.info("parts[" + i + "] = " + parts[i]); |
| } |
| } |
| } |
| |
| /** Use the index for partitioning. */ |
| public static class IndexPartitioner extends Partitioner<IntWritable, SummationWritable> { |
| /** Return the index as the partition. */ |
| @Override |
| public int getPartition(IntWritable index, SummationWritable value, int numPartitions) { |
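        // PartitionMapper emits one index per part and the job is configured with nParts reducers,
        // so the index can be used directly as the partition number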
| return index.get(); |
| } |
| } |
| |
| /** A Reducer which computes sums */ |
| public static class SummingReducer extends |
| Reducer<IntWritable, SummationWritable, NullWritable, TaskResult> { |
| @Override |
| protected void reduce(IntWritable index, Iterable<SummationWritable> sums, |
| Context context) throws IOException, InterruptedException { |
| LOG.info("index=" + index); |
| for(SummationWritable sigma : sums) |
| compute(sigma.getElement(), context); |
| } |
| } |
| } |
| ///////////////////////////////////////////////////////////////////////////// |
| /** |
   * A machine which chooses a Machine at runtime according to the cluster status
| */ |
| public static class MixMachine extends Machine { |
| private static final MixMachine INSTANCE = new MixMachine(); |
| |
| private Cluster cluster; |
| |
| /** {@inheritDoc} */ |
| @Override |
| public synchronized void init(Job job) throws IOException { |
| final Configuration conf = job.getConfiguration(); |
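      // connect to the cluster lazily, on the first job initialization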
| if (cluster == null) |
| cluster = new Cluster(JobTracker.getAddress(conf), conf); |
| chooseMachine(conf).init(job); |
| } |
| |
| /** |
     * Choose a Machine at runtime according to the cluster status.
| */ |
| private Machine chooseMachine(Configuration conf) throws IOException { |
| final int parts = conf.getInt(N_PARTS, Integer.MAX_VALUE); |
| try { |
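        // poll the cluster status every 2 seconds until enough free map or reduce slots are available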
| for(;; Thread.sleep(2000)) { |
| //get cluster status |
| final ClusterMetrics status = cluster.getClusterStatus(); |
| final int m = |
| status.getMapSlotCapacity() - status.getOccupiedMapSlots(); |
| final int r = |
| status.getReduceSlotCapacity() - status.getOccupiedReduceSlots(); |
| if (m >= parts || r >= parts) { |
| //favor ReduceSide machine |
| final Machine value = r >= parts? |
| ReduceSide.INSTANCE: MapSide.INSTANCE; |
| Util.out.println(" " + this + " is " + value + " (m=" + m + ", r=" + r + ")"); |
| return value; |
| } |
| } |
| } catch (InterruptedException e) { |
| throw new IOException(e); |
| } |
| } |
| |
| } |
| ///////////////////////////////////////////////////////////////////////////// |
| private final Util.Timer timer = new Util.Timer(true); |
| private Parameters parameters; |
| |
| /** Get Parameters */ |
| Parameters getParameters() {return parameters;} |
| /** Set Parameters */ |
| void setParameters(Parameters p) {parameters = p;} |
| |
| /** Create a job */ |
| private Job createJob(String name, Summation sigma) throws IOException { |
| final Job job = new Job(getConf(), parameters.remoteDir + "/" + name); |
| final Configuration jobconf = job.getConfiguration(); |
| job.setJarByClass(DistSum.class); |
| jobconf.setInt(N_PARTS, parameters.nParts); |
| SummationWritable.write(sigma, DistSum.class, jobconf); |
| |
| // disable task timeout |
| jobconf.setLong(MRJobConfig.TASK_TIMEOUT, 0); |
| // do not use speculative execution |
| jobconf.setBoolean(MRJobConfig.MAP_SPECULATIVE, false); |
| jobconf.setBoolean(MRJobConfig.REDUCE_SPECULATIVE, false); |
| |
| return job; |
| } |
| |
| /** Start a job to compute sigma */ |
| private void compute(final String name, Summation sigma) throws IOException { |
| if (sigma.getValue() != null) |
| throw new IOException("sigma.getValue() != null, sigma=" + sigma); |
| |
| //setup remote directory |
| final FileSystem fs = FileSystem.get(getConf()); |
| final Path dir = fs.makeQualified(new Path(parameters.remoteDir, name)); |
| if (!Util.createNonexistingDirectory(fs, dir)) |
| return; |
| |
| //setup a job |
| final Job job = createJob(name, sigma); |
| final Path outdir = new Path(dir, "out"); |
| FileOutputFormat.setOutputPath(job, outdir); |
| |
| //start a map/reduce job |
| final String startmessage = "steps/parts = " |
| + sigma.E.getSteps() + "/" + parameters.nParts |
| + " = " + Util.long2string(sigma.E.getSteps()/parameters.nParts); |
| Util.runJob(name, job, parameters.machine, startmessage, timer); |
| final List<TaskResult> results = Util.readJobOutputs(fs, outdir); |
| Util.writeResults(name, results, fs, parameters.remoteDir); |
| fs.delete(dir, true); |
| |
| //combine results |
| final List<TaskResult> combined = Util.combine(results); |
| final PrintWriter out = Util.createWriter(parameters.localDir, name); |
| try { |
| for(TaskResult r : combined) { |
| final String s = taskResult2string(name, r); |
| out.println(s); |
| out.flush(); |
| Util.out.println(s); |
| } |
| } finally { |
| out.close(); |
| } |
| if (combined.size() == 1) { |
| final Summation s = combined.get(0).getElement(); |
| if (sigma.contains(s) && s.contains(sigma)) |
| sigma.setValue(s.getValue()); |
| } |
| } |
| |
| /** Convert a TaskResult to a String */ |
| public static String taskResult2string(String name, TaskResult result) { |
| return NAME + " " + name + "> " + result; |
| } |
| |
| /** Convert a String to a (String, TaskResult) pair */ |
| public static Map.Entry<String, TaskResult> string2TaskResult(final String s) { |
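    // Parse lines of the form "<NAME> <name>> <result>" as written by taskResult2string(..);
    // return null for lines which do not start with NAME.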
| int j = s.indexOf(NAME); |
| if (j == 0) { |
| int i = j + NAME.length() + 1; |
| j = s.indexOf("> ", i); |
| final String key = s.substring(i, j); |
| final TaskResult value = TaskResult.valueOf(s.substring(j + 2)); |
| return new Map.Entry<String, TaskResult>(){ |
| @Override |
| public String getKey() {return key;} |
| @Override |
| public TaskResult getValue() {return value;} |
| @Override |
| public TaskResult setValue(TaskResult value) { |
| throw new UnsupportedOperationException(); |
| } |
| }; |
| } |
| return null; |
| } |
| |
| /** Callable computation */ |
| class Computation implements Callable<Computation> { |
| private final int index; |
| private final String name; |
| private final Summation sigma; |
| |
| Computation(int index, String name, Summation sigma) { |
| this.index = index; |
| this.name = name; |
| this.sigma = sigma; |
| } |
| |
| /** @return The job name */ |
| String getJobName() {return String.format("%s.job%03d", name, index);} |
| |
| /** {@inheritDoc} */ |
| @Override |
| public String toString() {return getJobName() + sigma;} |
| |
| /** Start the computation */ |
| @Override |
| public Computation call() { |
| if (sigma.getValue() == null) |
| try { |
| compute(getJobName(), sigma); |
| } catch(Exception e) { |
| Util.out.println("ERROR: Got an exception from " + getJobName()); |
| e.printStackTrace(Util.out); |
| } |
| return this; |
| } |
| } |
| |
| /** Partition sigma and execute the computations. */ |
| private Summation execute(String name, Summation sigma) { |
| final Summation[] summations = sigma.partition(parameters.nJobs); |
| final List<Computation> computations = new ArrayList<Computation>(); |
| for(int i = 0; i < summations.length; i++) |
| computations.add(new Computation(i, name, summations[i])); |
| try { |
| Util.execute(parameters.nThreads, computations); |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| |
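    // combine the executed summations; return the single combined summation if they merge into one, otherwise null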
| final List<Summation> combined = Util.combine(Arrays.asList(summations)); |
| return combined.size() == 1? combined.get(0): null; |
| } |
| |
| /** {@inheritDoc} */ |
| @Override |
| public int run(String[] args) throws Exception { |
| //parse arguments |
| if (args.length != Parameters.COUNT + 2) |
| return Util.printUsage(args, getClass().getName() |
| + " <name> <sigma> " + Parameters.LIST |
| + "\n <name> The name." |
| + "\n <sigma> The summation." |
| + Parameters.DESCRIPTION); |
| |
| int i = 0; |
| final String name = args[i++]; |
| final Summation sigma = Summation.valueOf(args[i++]); |
| setParameters(DistSum.Parameters.parse(args, i)); |
| |
| Util.out.println(); |
| Util.out.println("name = " + name); |
| Util.out.println("sigma = " + sigma); |
| Util.out.println(parameters); |
| Util.out.println(); |
| |
| //run jobs |
| final Summation result = execute(name, sigma); |
| if (result.equals(sigma)) { |
| sigma.setValue(result.getValue()); |
| timer.tick("\n\nDONE\n\nsigma=" + sigma); |
| return 0; |
| } else { |
| timer.tick("\n\nDONE WITH ERROR\n\nresult=" + result); |
| return 1; |
| } |
| } |
| |
| /** main */ |
| public static void main(String[] args) throws Exception { |
| System.exit(ToolRunner.run(null, new DistSum(), args)); |
| } |
| } |