/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce;
import java.io.IOException;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.examples.Sort;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.TaskReport;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.streaming.StreamJob;
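/**
* Driver for the GridMix benchmark mix: builds a configurable set of
* representative MapReduce jobs (streaming sort, Java sort, combiner,
* web-data scan, web-data sort, and a three-stage "monster query"
* chain), runs them through a single JobControl, and prints per-job
* counters and task timing statistics when the mix finishes.
*/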
public class GridMixRunner {
private static final int NUM_OF_LARGE_JOBS_PER_CLASS = 0;
private static final int NUM_OF_MEDIUM_JOBS_PER_CLASS = 0;
private static final int NUM_OF_SMALL_JOBS_PER_CLASS = 0;
private static final int NUM_OF_REDUCERS_FOR_SMALL_JOB = 15;
private static final int NUM_OF_REDUCERS_FOR_MEDIUM_JOB = 170;
private static final int NUM_OF_REDUCERS_FOR_LARGE_JOB = 370;
private static final String GRID_MIX_DATA = "/gridmix/data";
private static final String VARCOMPSEQ =
GRID_MIX_DATA + "/WebSimulationBlockCompressed";
private static final String FIXCOMPSEQ =
GRID_MIX_DATA + "/MonsterQueryBlockCompressed";
private static final String VARINFLTEXT =
GRID_MIX_DATA + "/SortUncompressed";
private static final String GRIDMIXCONFIG = "gridmix_config.xml";
private static final Configuration config = initConfig();
private static final FileSystem fs = initFs();
private final JobControl gridmix;
private int numOfJobs = 0;
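/**
* Job size classes. Each size picks a glob subset of the generated
* input data and carries default job and reducer counts; both can be
* overridden per job type in the config file.
*/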
private enum Size {
SMALL("small", // name
"/{part-*-00000,part-*-00001,part-*-00002}", // default input subset
NUM_OF_SMALL_JOBS_PER_CLASS, // defuault num jobs
NUM_OF_REDUCERS_FOR_SMALL_JOB), // default num reducers
MEDIUM("medium", // name
"/{part-*-000*0, part-*-000*1, part-*-000*2}", // default input subset
NUM_OF_MEDIUM_JOBS_PER_CLASS, // defuault num jobs
NUM_OF_REDUCERS_FOR_MEDIUM_JOB), // default num reducers
LARGE("large", // name
"", // default input subset
NUM_OF_LARGE_JOBS_PER_CLASS, // defuault num jobs
NUM_OF_REDUCERS_FOR_LARGE_JOB); // default num reducers
private final String str;
private final String path;
private final int numJobs;
private final int numReducers;
Size(String str, String path, int numJobs, int numReducers) {
this.str = str;
this.path = path;
this.numJobs = numJobs;
this.numReducers = numReducers;
}
public String defaultPath(String base) {
return base + path;
}
public int defaultNumJobs() {
return numJobs;
}
public int defaultNumReducers() {
return numReducers;
}
@Override
public String toString() {
return str;
}
}
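/**
* The job types in the mix. Each constant knows how to build its own
* flavor of job and register it with the given JobControl: streaming
* jobs go through StreamJob, the Java sort is built directly, and the
* rest go through GenericMRLoadJobCreator or CombinerJobCreator.
*/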
private enum GridMixJob {
STREAMSORT("streamSort") {
public void addJob(int numReducers, boolean mapoutputCompressed,
boolean outputCompressed, Size size, JobControl gridmix) {
final String prop = String.format("streamSort.%sJobs.inputFiles", size);
final String indir =
getInputDirsFor(prop, size.defaultPath(VARINFLTEXT));
final String outdir = addTSSuffix("perf-out/stream-out-dir-" + size);
StringBuffer sb = new StringBuffer();
sb.append("-input ").append(indir).append(" ");
sb.append("-output ").append(outdir).append(" ");
sb.append("-mapper cat ");
sb.append("-reducer cat ");
sb.append("-numReduceTasks ").append(numReducers);
String[] args = sb.toString().split(" ");
clearDir(outdir);
try {
Configuration conf = StreamJob.createJob(args);
conf.setBoolean(FileOutputFormat.COMPRESS, outputCompressed);
conf.setBoolean(JobContext.MAP_OUTPUT_COMPRESS, mapoutputCompressed);
Job job = new Job(conf, "GridmixStreamingSorter." + size);
ControlledJob cjob = new ControlledJob(job, null);
gridmix.addJob(cjob);
} catch (Exception ex) {
ex.printStackTrace();
}
}
},
JAVASORT("javaSort") {
public void addJob(int numReducers, boolean mapoutputCompressed,
boolean outputCompressed, Size size, JobControl gridmix) {
final String prop = String.format("javaSort.%sJobs.inputFiles", size);
final String indir = getInputDirsFor(prop,
size.defaultPath(VARINFLTEXT));
final String outdir = addTSSuffix("perf-out/sort-out-dir-" + size);
clearDir(outdir);
try {
Configuration conf = new Configuration();
conf.setBoolean(FileOutputFormat.COMPRESS, outputCompressed);
conf.setBoolean(JobContext.MAP_OUTPUT_COMPRESS, mapoutputCompressed);
Job job = new Job(conf);
job.setJarByClass(Sort.class);
job.setJobName("GridmixJavaSorter." + size);
job.setMapperClass(Mapper.class);
job.setReducerClass(Reducer.class);
job.setNumReduceTasks(numReducers);
job.setInputFormatClass(KeyValueTextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
job.setOutputKeyClass(org.apache.hadoop.io.Text.class);
job.setOutputValueClass(org.apache.hadoop.io.Text.class);
FileInputFormat.addInputPaths(job, indir);
FileOutputFormat.setOutputPath(job, new Path(outdir));
ControlledJob cjob = new ControlledJob(job, null);
gridmix.addJob(cjob);
} catch (Exception ex) {
ex.printStackTrace();
}
}
},
WEBDATASCAN("webdataScan") {
public void addJob(int numReducers, boolean mapoutputCompressed,
boolean outputCompressed, Size size, JobControl gridmix) {
final String prop = String.format("webdataScan.%sJobs.inputFiles", size);
final String indir = getInputDirsFor(prop, size.defaultPath(VARCOMPSEQ));
final String outdir = addTSSuffix("perf-out/webdata-scan-out-dir-"
+ size);
StringBuffer sb = new StringBuffer();
sb.append("-keepmap 0.2 ");
sb.append("-keepred 5 ");
sb.append("-inFormat");
sb.append(" org.apache.hadoop.mapreduce." +
"lib.input.SequenceFileInputFormat ");
sb.append("-outFormat");
sb.append(" org.apache.hadoop.mapreduce." +
"lib.output.SequenceFileOutputFormat ");
sb.append("-outKey org.apache.hadoop.io.Text ");
sb.append("-outValue org.apache.hadoop.io.Text ");
sb.append("-indir ").append(indir).append(" ");
sb.append("-outdir ").append(outdir).append(" ");
sb.append("-r ").append(numReducers);
String[] args = sb.toString().split(" ");
clearDir(outdir);
try {
Job job = GenericMRLoadJobCreator.createJob(
args, mapoutputCompressed, outputCompressed);
job.setJobName("GridmixWebdatascan." + size);
ControlledJob cjob = new ControlledJob(job, null);
gridmix.addJob(cjob);
} catch (Exception ex) {
ex.printStackTrace();
}
}
},
COMBINER("combiner") {
public void addJob(int numReducers, boolean mapoutputCompressed,
boolean outputCompressed, Size size, JobControl gridmix) {
final String prop = String.format("combiner.%sJobs.inputFiles", size);
final String indir = getInputDirsFor(prop, size.defaultPath(VARCOMPSEQ));
final String outdir = addTSSuffix("perf-out/combiner-out-dir-" + size);
StringBuffer sb = new StringBuffer();
sb.append("-r ").append(numReducers).append(" ");
sb.append("-indir ").append(indir).append(" ");
sb.append("-outdir ").append(outdir);
sb.append("-mapoutputCompressed ");
sb.append(mapoutputCompressed).append(" ");
sb.append("-outputCompressed ").append(outputCompressed);
String[] args = sb.toString().split(" ");
clearDir(outdir);
try {
Job job = CombinerJobCreator.createJob(args);
job.setJobName("GridmixCombinerJob." + size);
ControlledJob cjob = new ControlledJob(job, null);
gridmix.addJob(cjob);
} catch (Exception ex) {
ex.printStackTrace();
}
}
},
MONSTERQUERY("monsterQuery") {
public void addJob(int numReducers, boolean mapoutputCompressed,
boolean outputCompressed, Size size, JobControl gridmix) {
final String prop =
String.format("monsterQuery.%sJobs.inputFiles", size);
final String indir = getInputDirsFor(prop, size.defaultPath(FIXCOMPSEQ));
final String outdir = addTSSuffix("perf-out/mq-out-dir-" + size);
int iter = 3;
try {
ControlledJob pjob = null;
ControlledJob cjob = null;
for (int i = 0; i < iter; i++) {
String outdirfull = outdir + "." + i;
String indirfull = (0 == i) ? indir : outdir + "." + (i - 1);
Path outfile = new Path(outdirfull);
StringBuffer sb = new StringBuffer();
sb.append("-keepmap 10 ");
sb.append("-keepred 40 ");
sb.append("-inFormat");
sb.append(" org.apache.hadoop.mapreduce." +
"lib.input.SequenceFileInputFormat ");
sb.append("-outFormat");
sb.append(" org.apache.hadoop.mapreduce." +
"lib.output.SequenceFileOutputFormat ");
sb.append("-outKey org.apache.hadoop.io.Text ");
sb.append("-outValue org.apache.hadoop.io.Text ");
sb.append("-indir ").append(indirfull).append(" ");
sb.append("-outdir ").append(outdirfull).append(" ");
sb.append("-r ").append(numReducers);
String[] args = sb.toString().split(" ");
try {
fs.delete(outfile, true);
} catch (IOException ex) {
System.out.println(ex.toString());
}
Job job = GenericMRLoadJobCreator.createJob(
args, mapoutputCompressed, outputCompressed);
job.setJobName("GridmixMonsterQuery." + size);
cjob = new ControlledJob(job, null);
if (pjob != null) {
cjob.addDependingJob(pjob);
}
gridmix.addJob(cjob);
pjob = cjob;
}
} catch (Exception e) {
e.printStackTrace();
}
}
},
WEBDATASORT("webdataSort") {
public void addJob(int numReducers, boolean mapoutputCompressed,
boolean outputCompressed, Size size, JobControl gridmix) {
final String prop = String.format("webdataSort.%sJobs.inputFiles", size);
final String indir = getInputDirsFor(prop, size.defaultPath(VARCOMPSEQ));
final String outdir =
addTSSuffix("perf-out/webdata-sort-out-dir-" + size);
StringBuffer sb = new StringBuffer();
sb.append("-keepmap 100 ");
sb.append("-keepred 100 ");
sb.append("-inFormat org.apache.hadoop.mapreduce." +
"lib.input.SequenceFileInputFormat ");
sb.append("-outFormat org.apache.hadoop.mapreduce." +
"lib.output.SequenceFileOutputFormat ");
sb.append("-outKey org.apache.hadoop.io.Text ");
sb.append("-outValue org.apache.hadoop.io.Text ");
sb.append("-indir ").append(indir).append(" ");
sb.append("-outdir ").append(outdir).append(" ");
sb.append("-r ").append(numReducers);
String[] args = sb.toString().split(" ");
clearDir(outdir);
try {
Job job = GenericMRLoadJobCreator.createJob(
args, mapoutputCompressed, outputCompressed);
job.setJobName("GridmixWebdataSort." + size);
ControlledJob cjob = new ControlledJob(job, null);
gridmix.addJob(cjob);
} catch (Exception ex) {
ex.printStackTrace();
}
}
};
private final String name;
GridMixJob(String name) {
this.name = name;
}
public String getName() {
return name;
}
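/**
* Builds one job of this type with the given reducer count and
* compression flags, wraps it in a ControlledJob, and registers it
* with the JobControl. Implementations log exceptions rather than
* rethrow, so a single bad job does not abort the whole mix.
*/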
public abstract void addJob(int numReducers, boolean mapComp,
boolean outComp, Size size, JobControl gridmix);
}
public GridMixRunner() throws IOException {
gridmix = new JobControl("GridMix");
if (null == config || null == fs) {
throw new IOException("Bad configuration. Cannot continue.");
}
}
private static FileSystem initFs() {
try {
return FileSystem.get(config);
} catch (Exception e) {
System.out.println("fs initation error: " + e.getMessage());
}
return null;
}
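/*
* Configuration comes from gridmix_config.xml, located via the
* GRIDMIXCONFIG environment variable or the working directory. Keys
* follow the "<jobType>.<size>Jobs.*" pattern consumed by addJobs and
* getInputDirsFor below. An illustrative snippet (the values are
* made-up examples, not recommended settings):
*
* <property>
* <name>streamSort.smallJobs.numOfJobs</name>
* <value>8</value>
* </property>
* <property>
* <name>streamSort.smallJobs.numOfReduces</name>
* <value>15</value>
* </property>
* <property>
* <name>streamSort.smallJobs.inputFiles</name>
* <value>/gridmix/data/SortUncompressed/part-00000</value>
* </property>
*/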
private static Configuration initConfig() {
Configuration conf = new Configuration();
String configFile = System.getenv("GRIDMIXCONFIG");
if (configFile == null) {
String configDir = System.getProperty("user.dir");
if (configDir == null) {
configDir = ".";
}
configFile = configDir + "/" + GRIDMIXCONFIG;
}
try {
Path fileResource = new Path(configFile);
conf.addResource(fileResource);
} catch (Exception e) {
System.err.println("Error reading config file " + configFile + ":" +
e.getMessage());
return null;
}
return conf;
}
private static int[] getInts(Configuration conf, String name, int defaultV) {
String[] vals = conf.getStrings(name, String.valueOf(defaultV));
int[] results = new int[vals.length];
for (int i = 0; i < vals.length; ++i) {
results[i] = Integer.parseInt(vals[i]);
}
return results;
}
private static String getInputDirsFor(String jobType, String defaultIndir) {
String[] inputFile = config.getStrings(jobType, defaultIndir);
StringBuffer indirBuffer = new StringBuffer();
for (int i = 0; i < inputFile.length; i++) {
indirBuffer.append(inputFile[i]).append(",");
}
return indirBuffer.substring(0, indirBuffer.length() - 1);
}
private static void clearDir(String dir) {
try {
Path outfile = new Path(dir);
fs.delete(outfile, true);
} catch (IOException ex) {
System.out.println("delete file error:");
ex.printStackTrace();
}
}
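/**
* Decides whether the job at 0-based position 'index' of 'total' jobs
* should have a feature enabled so that roughly 'selected' of them are
* picked at even spacing. For example, total=10 and selected=3 give
* step=3 and effectiveTotal=9, so indices 0, 3 and 6 are selected.
*/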
private boolean select(int total, int selected, int index) {
if (selected <= 0 || selected >= total) {
return selected > 0;
}
int step = total / selected;
int effectiveTotal = total - total % selected;
return (index <= effectiveTotal - 1 && (index % step == 0));
}
private static String addTSSuffix(String s) {
Date date = Calendar.getInstance().getTime();
String ts = String.valueOf(date.getTime());
return s + "_" + ts;
}
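/**
* Adds all configured jobs of one type and size class. With the prefix
* "<jobType>.<size>Jobs", it reads the parallel lists .numOfJobs and
* .numOfReduces, plus .numOfMapoutputCompressed and
* .numOfOutputCompressed, which give how many of the jobs should have
* the respective compression enabled.
*/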
private void addJobs(GridMixJob job, Size size) throws IOException {
final String prefix = String.format("%s.%sJobs", job.getName(), size);
int[] numJobs = getInts(config, prefix + ".numOfJobs",
size.defaultNumJobs());
int[] numReduces = getInts(config, prefix + ".numOfReduces",
size.defaultNumReducers());
if (numJobs.length != numReduces.length) {
throw new IOException("Configuration error: " +
prefix + ".numOfJobs must match " +
prefix + ".numOfReduces");
}
int numMapoutputCompressed = config.getInt(
prefix + ".numOfMapoutputCompressed", 0);
int numOutputCompressed = config.getInt(
prefix + ".numOfOutputCompressed", size.defaultNumJobs());
int totalJobs = 0;
for (int nJob : numJobs) {
totalJobs += nJob;
}
int currentIndex = 0;
for (int i = 0; i < numJobs.length; ++i) {
for (int j = 0; j < numJobs[i]; ++j) {
boolean mapoutputComp =
select(totalJobs, numMapoutputCompressed, currentIndex);
boolean outputComp =
select(totalJobs, numOutputCompressed, currentIndex);
job.addJob(numReduces[i], mapoutputComp, outputComp, size, gridmix);
++numOfJobs;
++currentIndex;
}
}
}
private void addAllJobs(GridMixJob job) throws IOException {
for (Size size : EnumSet.allOf(Size.class)) {
addJobs(job, size);
}
}
public void addJobs() throws IOException {
for (GridMixJob jobtype : EnumSet.allOf(GridMixJob.class)) {
addAllJobs(jobtype);
}
System.out.println("total " +
gridmix.getWaitingJobList().size() + " jobs");
}
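/**
* Order statistics (min, max, median) and the mean over a sample of
* task execution times, in milliseconds.
*/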
class SimpleStats {
long minValue;
long maxValue;
long averageValue;
long medianValue;
int n;
SimpleStats(long[] data) {
Arrays.sort(data);
n = data.length;
if (n == 0) {
return; // leave all stats at zero for an empty sample
}
minValue = data[0];
maxValue = data[n - 1];
medianValue = data[n / 2];
long total = 0;
for (int i = 0; i < n; i++) {
total += data[i];
}
averageValue = total / n;
}
}
class TaskExecutionStats {
TreeMap<String, SimpleStats> theStats;
TaskExecutionStats() {
theStats = new TreeMap<String, SimpleStats>();
}
void computeStats(String name, long[] data) {
theStats.put(name, new SimpleStats(data));
}
}
private TreeMap<String, String> getStatForJob(ControlledJob cjob) {
TreeMap<String, String> retv = new TreeMap<String, String>();
JobID mapreduceID = cjob.getMapredJobID();
Job job = cjob.getJob();
String jobName = job.getJobName();
retv.put("JobId", mapreduceID.toString());
retv.put("JobName", jobName);
TaskExecutionStats theTaskExecutionStats = new TaskExecutionStats();
try {
Counters jobCounters = job.getCounters();
Iterator<CounterGroup> groups = jobCounters.iterator();
while (groups.hasNext()) {
CounterGroup g = groups.next();
String gn = g.getName();
Iterator<Counter> cs = g.iterator();
while (cs.hasNext()) {
Counter c = cs.next();
String n = c.getName();
long v = c.getValue();
retv.put(mapreduceID + "." + jobName + "." + gn + "." + n, "" + v);
}
}
JobClient jc = new JobClient(job.getConfiguration());
// downgrade() converts safely to the old-API JobID; a plain cast can
// fail at runtime if the instance is the new-API base class
org.apache.hadoop.mapred.JobID oldJobID =
org.apache.hadoop.mapred.JobID.downgrade(mapreduceID);
TaskReport[] maps = jc.getMapTaskReports(oldJobID);
TaskReport[] reduces = jc.getReduceTaskReports(oldJobID);
retv.put(mapreduceID + "." + jobName + "." + "numOfMapTasks", ""
+ maps.length);
retv.put(mapreduceID + "." + jobName + "." + "numOfReduceTasks", ""
+ reduces.length);
long[] mapExecutionTimes = new long[maps.length];
long[] reduceExecutionTimes = new long[reduces.length];
Date date = Calendar.getInstance().getTime();
long startTime = date.getTime();
long finishTime = 0;
for (int j = 0; j < maps.length; j++) {
TaskReport map = maps[j];
long thisStartTime = map.getStartTime();
long thisFinishTime = map.getFinishTime();
if (thisStartTime > 0 && thisFinishTime > 0) {
mapExecutionTimes[j] = thisFinishTime - thisStartTime;
}
if (startTime > thisStartTime) {
startTime = thisStartTime;
}
if (finishTime < thisFinishTime) {
finishTime = thisFinishTime;
}
}
theTaskExecutionStats.computeStats("mapExecutionTimeStats",
mapExecutionTimes);
retv.put(mapreduceID + "." + jobName + "." + "mapStartTime", ""
+ startTime);
retv.put(mapreduceID + "." + jobName + "." + "mapEndTime", ""
+ finishTime);
for (int j = 0; j < reduces.length; j++) {
TaskReport reduce = reduces[j];
long thisStartTime = reduce.getStartTime();
long thisFinishTime = reduce.getFinishTime();
if (thisStartTime > 0 && thisFinishTime > 0) {
reduceExecutionTimes[j] = thisFinishTime - thisStartTime;
}
if (startTime > thisStartTime) {
startTime = thisStartTime;
}
if (finishTime < thisFinishTime) {
finishTime = thisFinishTime;
}
}
theTaskExecutionStats.computeStats("reduceExecutionTimeStats",
reduceExecutionTimes);
retv.put(mapreduceID + "." + jobName + "." + "reduceStartTime", ""
+ startTime);
retv.put(mapreduceID + "." + jobName + "." + "reduceEndTime", ""
+ finishTime);
if (cjob.getJobState() == ControlledJob.State.SUCCESS) {
retv.put(mapreduceID + "." + "jobStatus", "successful");
} else if (cjob.getJobState() == ControlledJob.State.FAILED) {
retv.put(mapreduceID + "." + jobName + "." + "jobStatus", "failed");
} else {
retv.put(mapreduceID + "." + jobName + "." + "jobStatus", "unknown");
}
Iterator<Entry<String, SimpleStats>> entries =
theTaskExecutionStats.theStats.entrySet().iterator();
while (entries.hasNext()) {
Entry<String, SimpleStats> e = entries.next();
SimpleStats v = e.getValue();
retv.put(mapreduceID + "." + jobName + "." + e.getKey() + "." + "min",
"" + v.minValue);
retv.put(mapreduceID + "." + jobName + "." + e.getKey() + "." + "max",
"" + v.maxValue);
retv.put(mapreduceID + "." + jobName + "." + e.getKey() + "."
+ "medium", "" + v.mediumValue);
retv.put(mapreduceID + "." + jobName + "." + e.getKey() + "." + "avg",
"" + v.averageValue);
retv.put(mapreduceID + "." + jobName + "." + e.getKey() + "."
+ "numOfItems", "" + v.n);
}
} catch (Exception e) {
e.printStackTrace();
}
return retv;
}
private void printJobStat(TreeMap<String, String> stat) {
Iterator<Entry<String, String>> entries = stat.entrySet().iterator();
while (entries.hasNext()) {
Entry<String, String> e = entries.next();
System.out.println(e.getKey() + "\t" + e.getValue());
}
}
private void printStatsForJobs(List<ControlledJob> jobs) {
for (int i = 0; i < jobs.size(); i++) {
printJobStat(getStatForJob(jobs.get(i)));
}
}
public void run() {
Thread theGridmixRunner = new Thread(gridmix);
theGridmixRunner.start();
long startTime = System.currentTimeMillis();
while (!gridmix.allFinished()) {
System.out.println("Jobs in waiting state: "
+ gridmix.getWaitingJobList().size());
System.out.println("Jobs in ready state: "
+ gridmix.getReadyJobsList().size());
System.out.println("Jobs in running state: "
+ gridmix.getRunningJobList().size());
System.out.println("Jobs in success state: "
+ gridmix.getSuccessfulJobList().size());
System.out.println("Jobs in failed state: "
+ gridmix.getFailedJobList().size());
System.out.println("\n");
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
// ignore the interrupt and re-poll the job states
}
}
long endTime = System.currentTimeMillis();
List<ControlledJob> fail = gridmix.getFailedJobList();
List<ControlledJob> succeed = gridmix.getSuccessfulJobList();
int numOfSuccessfulJob = succeed.size();
if (numOfSuccessfulJob > 0) {
System.out.println(numOfSuccessfulJob + " jobs succeeded");
printStatsForJobs(succeed);
}
int numOfFailedjob = fail.size();
if (numOfFailedjob > 0) {
System.out.println("------------------------------- ");
System.out.println(numOfFailedjob + " jobs failed");
printStatsForJobs(fail);
}
System.out.println("GridMix results:");
System.out.println("Total num of Jobs: " + numOfJobs);
System.out.println("ExecutionTime: " + ((endTime-startTime) / 1000));
gridmix.stop();
}
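/**
* Entry point. Assumes the GridMix input data (under /gridmix/data)
* has already been generated and that gridmix_config.xml is readable;
* builds the job mix, runs it to completion, and prints statistics.
*/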
public static void main(String[] argv) throws Exception {
GridMixRunner gridmixRunner = new GridMixRunner();
gridmixRunner.addJobs();
gridmixRunner.run();
}
}