blob: 3ed81e1f2cd3981ae164fac1c337237f6f33cc2d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.sls.synthetic;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.math3.distribution.LogNormalDistribution;
import org.apache.commons.math3.random.JDKRandomGenerator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TaskStatus.State;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.tools.rumen.*;
import org.apache.hadoop.tools.rumen.Pre21JobHistoryConstants.Values;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.hadoop.mapreduce.MRJobConfig.QUEUE_NAME;
/**
* Generates random task data for a synthetic job.
*/
public class SynthJob implements JobStory {
@SuppressWarnings("StaticVariableName")
private static Log LOG = LogFactory.getLog(SynthJob.class);
private final Configuration conf;
private final int id;
@SuppressWarnings("ConstantName")
private static final AtomicInteger sequence = new AtomicInteger(0);
private final String name;
private final String queueName;
private final SynthJobClass jobClass;
// job timing
private final long submitTime;
private final long duration;
private final long deadline;
private final int numMapTasks;
private final int numRedTasks;
private final long mapMaxMemory;
private final long reduceMaxMemory;
private final long mapMaxVcores;
private final long reduceMaxVcores;
private final long[] mapRuntime;
private final float[] reduceRuntime;
private long totMapRuntime;
private long totRedRuntime;
public SynthJob(JDKRandomGenerator rand, Configuration conf,
SynthJobClass jobClass, long actualSubmissionTime) {
this.conf = conf;
this.jobClass = jobClass;
this.duration = MILLISECONDS.convert(jobClass.getDur(), SECONDS);
this.numMapTasks = jobClass.getMtasks();
this.numRedTasks = jobClass.getRtasks();
// sample memory distributions, correct for sub-minAlloc sizes
long tempMapMaxMemory = jobClass.getMapMaxMemory();
this.mapMaxMemory = tempMapMaxMemory < MRJobConfig.DEFAULT_MAP_MEMORY_MB
? MRJobConfig.DEFAULT_MAP_MEMORY_MB : tempMapMaxMemory;
long tempReduceMaxMemory = jobClass.getReduceMaxMemory();
this.reduceMaxMemory =
tempReduceMaxMemory < MRJobConfig.DEFAULT_REDUCE_MEMORY_MB
? MRJobConfig.DEFAULT_REDUCE_MEMORY_MB : tempReduceMaxMemory;
// sample vcores distributions, correct for sub-minAlloc sizes
long tempMapMaxVCores = jobClass.getMapMaxVcores();
this.mapMaxVcores = tempMapMaxVCores < MRJobConfig.DEFAULT_MAP_CPU_VCORES
? MRJobConfig.DEFAULT_MAP_CPU_VCORES : tempMapMaxVCores;
long tempReduceMaxVcores = jobClass.getReduceMaxVcores();
this.reduceMaxVcores =
tempReduceMaxVcores < MRJobConfig.DEFAULT_REDUCE_CPU_VCORES
? MRJobConfig.DEFAULT_REDUCE_CPU_VCORES : tempReduceMaxVcores;
if (numMapTasks > 0) {
conf.setLong(MRJobConfig.MAP_MEMORY_MB, this.mapMaxMemory);
conf.set(MRJobConfig.MAP_JAVA_OPTS,
"-Xmx" + (this.mapMaxMemory - 100) + "m");
}
if (numRedTasks > 0) {
conf.setLong(MRJobConfig.REDUCE_MEMORY_MB, this.reduceMaxMemory);
conf.set(MRJobConfig.REDUCE_JAVA_OPTS,
"-Xmx" + (this.reduceMaxMemory - 100) + "m");
}
boolean hasDeadline =
(rand.nextDouble() <= jobClass.jobClass.chance_of_reservation);
LogNormalDistribution deadlineFactor =
SynthUtils.getLogNormalDist(rand, jobClass.jobClass.deadline_factor_avg,
jobClass.jobClass.deadline_factor_stddev);
double deadlineFactorSample =
(deadlineFactor != null) ? deadlineFactor.sample() : -1;
this.queueName = jobClass.workload.getQueueName();
this.submitTime = MILLISECONDS.convert(actualSubmissionTime, SECONDS);
this.deadline =
hasDeadline ? MILLISECONDS.convert(actualSubmissionTime, SECONDS)
+ (long) Math.ceil(deadlineFactorSample * duration) : -1;
conf.set(QUEUE_NAME, queueName);
// name and initialize job randomness
final long seed = rand.nextLong();
rand.setSeed(seed);
id = sequence.getAndIncrement();
name = String.format(jobClass.getClassName() + "_%06d", id);
LOG.debug(name + " (" + seed + ")");
LOG.info("JOB TIMING`: job: " + name + " submission:" + submitTime
+ " deadline:" + deadline + " duration:" + duration
+ " deadline-submission: " + (deadline - submitTime));
// generate map and reduce runtimes
mapRuntime = new long[numMapTasks];
for (int i = 0; i < numMapTasks; i++) {
mapRuntime[i] = jobClass.getMapTimeSample();
totMapRuntime += mapRuntime[i];
}
reduceRuntime = new float[numRedTasks];
for (int i = 0; i < numRedTasks; i++) {
reduceRuntime[i] = jobClass.getReduceTimeSample();
totRedRuntime += (long) Math.ceil(reduceRuntime[i]);
}
}
public boolean hasDeadline() {
return deadline > 0;
}
@Override
public String getName() {
return name;
}
@Override
public String getUser() {
return jobClass.getUserName();
}
@Override
public JobID getJobID() {
return new JobID("job_mock_" + name, id);
}
@Override
public Values getOutcome() {
return Values.SUCCESS;
}
@Override
public long getSubmissionTime() {
return submitTime;
}
@Override
public int getNumberMaps() {
return numMapTasks;
}
@Override
public int getNumberReduces() {
return numRedTasks;
}
@Override
public TaskInfo getTaskInfo(TaskType taskType, int taskNumber) {
switch (taskType) {
case MAP:
return new TaskInfo(-1, -1, -1, -1, mapMaxMemory, mapMaxVcores);
case REDUCE:
return new TaskInfo(-1, -1, -1, -1, reduceMaxMemory, reduceMaxVcores);
default:
throw new IllegalArgumentException("Not interested");
}
}
@Override
public InputSplit[] getInputSplits() {
throw new UnsupportedOperationException();
}
@Override
public TaskAttemptInfo getTaskAttemptInfo(TaskType taskType, int taskNumber,
int taskAttemptNumber) {
switch (taskType) {
case MAP:
return new MapTaskAttemptInfo(State.SUCCEEDED,
getTaskInfo(taskType, taskNumber), mapRuntime[taskNumber], null);
case REDUCE:
// We assume uniform split between pull/sort/reduce
// aligned with naive progress reporting assumptions
return new ReduceTaskAttemptInfo(State.SUCCEEDED,
getTaskInfo(taskType, taskNumber),
(long) Math.round((reduceRuntime[taskNumber] / 3)),
(long) Math.round((reduceRuntime[taskNumber] / 3)),
(long) Math.round((reduceRuntime[taskNumber] / 3)), null);
default:
break;
}
throw new UnsupportedOperationException();
}
@Override
public TaskAttemptInfo getMapTaskAttemptInfoAdjusted(int taskNumber,
int taskAttemptNumber, int locality) {
throw new UnsupportedOperationException();
}
@Override
public org.apache.hadoop.mapred.JobConf getJobConf() {
return new JobConf(conf);
}
@Override
public String getQueueName() {
return queueName;
}
@Override
public String toString() {
return "SynthJob [\n" + " workload=" + jobClass.getWorkload().getId()
+ "\n" + " jobClass="
+ jobClass.getWorkload().getClassList().indexOf(jobClass) + "\n"
+ " conf=" + conf + ",\n" + " id=" + id + ",\n" + " name=" + name
+ ",\n" + " mapRuntime=" + Arrays.toString(mapRuntime) + ",\n"
+ " reduceRuntime=" + Arrays.toString(reduceRuntime) + ",\n"
+ " submitTime=" + submitTime + ",\n" + " numMapTasks=" + numMapTasks
+ ",\n" + " numRedTasks=" + numRedTasks + ",\n" + " mapMaxMemory="
+ mapMaxMemory + ",\n" + " reduceMaxMemory=" + reduceMaxMemory + ",\n"
+ " queueName=" + queueName + "\n" + "]";
}
public SynthJobClass getJobClass() {
return jobClass;
}
public long getTotalSlotTime() {
return totMapRuntime + totRedRuntime;
}
public long getDuration() {
return duration;
}
public long getDeadline() {
return deadline;
}
@Override
public boolean equals(Object other) {
if (!(other instanceof SynthJob)) {
return false;
}
SynthJob o = (SynthJob) other;
return Arrays.equals(mapRuntime, o.mapRuntime)
&& Arrays.equals(reduceRuntime, o.reduceRuntime)
&& submitTime == o.submitTime && numMapTasks == o.numMapTasks
&& numRedTasks == o.numRedTasks && mapMaxMemory == o.mapMaxMemory
&& reduceMaxMemory == o.reduceMaxMemory
&& mapMaxVcores == o.mapMaxVcores
&& reduceMaxVcores == o.reduceMaxVcores && queueName.equals(o.queueName)
&& jobClass.equals(o.jobClass) && totMapRuntime == o.totMapRuntime
&& totRedRuntime == o.totRedRuntime;
}
@Override
public int hashCode() {
// could have a bad distr; investigate if a relevant use case exists
return jobClass.hashCode() * (int) submitTime;
}
}