blob: fa6f1fc69d17c923e9cc990bb7f73aa3f9c04767 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.sls.synthetic;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.math3.distribution.AbstractRealDistribution;
import org.apache.commons.math3.random.JDKRandomGenerator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.rumen.JobStory;
import org.apache.hadoop.tools.rumen.JobStoryProducer;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.sls.appmaster.MRAMSimulator;
import org.codehaus.jackson.annotate.JsonCreator;
import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import javax.xml.bind.annotation.XmlRootElement;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import static org.codehaus.jackson.JsonParser.Feature.INTERN_FIELD_NAMES;
import static org.codehaus.jackson.map.DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES;
/**
 * This is a JobStoryProducer that operates from a distribution of different
 * workloads. The .json input file is used to determine the weight of each
 * workload, the job sizes, the number of maps/reducers and their duration,
 * as well as the temporal distribution of submissions. For each parameter we
 * control avg and stdev, and generate values via normal or log-normal
 * distributions.
 */
public class SynthTraceJobProducer implements JobStoryProducer {
@SuppressWarnings("StaticVariableName")
private static final Log LOG = LogFactory.getLog(SynthTraceJobProducer.class);
// Configuration used to open the trace file and to construct SynthJobs.
private final Configuration conf;
// Number of jobs still to be produced; decremented by getNextJob().
private final AtomicInteger numJobs;
// Parsed representation of the .json trace file.
private final Trace trace;
// Seed for all random draws, taken from the trace for reproducibility.
private final long seed;
// Sum of all workload weights (reported by toString()).
private int totalWeight;
// Jobs pre-generated at construction time, ordered by submission time.
private final Queue<StoryParams> listStoryParams;
// Single random generator shared across the whole trace generation.
private final JDKRandomGenerator rand;
// Configuration key holding the path of the synthetic trace file.
public static final String SLS_SYNTHETIC_TRACE_FILE =
    "sls.synthetic" + ".trace_file";
// Default priorities assigned to map/reduce tasks converted from the
// legacy JSON format (see validateJobDef).
private final static int DEFAULT_MAPPER_PRIORITY = 20;
private final static int DEFAULT_REDUCER_PRIORITY = 10;
/**
 * Builds a producer whose trace file location is read from the
 * configuration key {@link #SLS_SYNTHETIC_TRACE_FILE}.
 *
 * @param conf configuration holding the trace file path
 * @throws IOException if the trace file cannot be opened or parsed
 */
public SynthTraceJobProducer(Configuration conf) throws IOException {
  this(conf, new Path(conf.get(SLS_SYNTHETIC_TRACE_FILE)));
}
/**
 * Builds a producer from an explicit trace file path: parses the JSON
 * trace, seeds the random generator from the trace, and pre-generates
 * the whole story (one StoryParams per job, sorted by submission time).
 *
 * @param conf configuration used to access the file system
 * @param path location of the .json trace file
 * @throws IOException if the trace file cannot be opened or parsed
 */
public SynthTraceJobProducer(Configuration conf, Path path)
    throws IOException {
  LOG.info("SynthTraceJobProducer");
  this.conf = conf;
  this.rand = new JDKRandomGenerator();
  ObjectMapper mapper = new ObjectMapper();
  // Interning field names speeds up parsing of repetitive trace files.
  mapper.configure(INTERN_FIELD_NAMES, true);
  // Be lenient with unknown fields so older/newer trace formats still parse.
  mapper.configure(FAIL_ON_UNKNOWN_PROPERTIES, false);
  FileSystem ifs = path.getFileSystem(conf);
  // try-with-resources guarantees the trace stream is closed even if JSON
  // parsing throws (the previous code leaked the stream).
  try (FSDataInputStream fileIn = ifs.open(path)) {
    this.trace = mapper.readValue(fileIn, Trace.class);
  }
  // Initialize the random generator with the seed carried by the trace
  this.seed = trace.rand_seed;
  this.rand.setSeed(seed);
  // Initialize the trace (propagates rand, caches weight lists)
  this.trace.init(rand);
  this.numJobs = new AtomicInteger(trace.num_jobs);
  for (Double w : trace.workload_weights) {
    totalWeight += w;
  }
  // Initialize our story parameters
  listStoryParams = createStory();
  LOG.info("Generated " + listStoryParams.size() + " deadlines for "
      + this.numJobs.get() + " jobs");
}
// StoryParams holds the minimum amount of information needed to completely
// specify a job run: job definition, start time, and target queue. Creating
// all of these up front lets us order the jobs by start time before any
// SynthJob is materialized.
static class StoryParams {
  // When the job is submitted
  private long actualSubmissionTime;
  // Where the job is submitted
  private String queue;
  // What the job looks like
  private JobDefinition jobDef;
  StoryParams(long submissionTime, String queueName, JobDefinition definition) {
    this.actualSubmissionTime = submissionTime;
    this.queue = queueName;
    this.jobDef = definition;
  }
}
/**
 * Pre-generates every job of the trace and returns them in a priority
 * queue sorted by submission time.
 *
 * @return queue of job parameters ordered by actualSubmissionTime
 */
private Queue<StoryParams> createStory() {
  // create priority queue to keep start-time sorted; Math.max guards
  // against num_jobs == 0, which PriorityQueue rejects as a capacity
  Queue<StoryParams> storyQueue =
      new PriorityQueue<>(Math.max(1, this.numJobs.get()),
          new Comparator<StoryParams>() {
            @Override
            public int compare(StoryParams o1, StoryParams o2) {
              // Long.compare is overflow-safe; the previous
              // Math.toIntExact(o1 - o2) could throw ArithmeticException
              // when the difference exceeded the int range.
              return Long.compare(o1.actualSubmissionTime,
                  o2.actualSubmissionTime);
            }
          });
  for (int i = 0; i < numJobs.get(); i++) {
    // Generate a workload (weighted pick among the configured workloads)
    Workload wl = trace.generateWorkload();
    // Save all the parameters needed to completely define a job
    long actualSubmissionTime = wl.generateSubmissionTime();
    String queue = wl.queue_name;
    JobDefinition job = wl.generateJobDefinition();
    storyQueue.add(new StoryParams(actualSubmissionTime, queue, job));
  }
  return storyQueue;
}
/**
 * Hands out the next job of the story, in submission-time order, or
 * {@code null} once all {@code num_jobs} jobs have been produced.
 */
@Override
public JobStory getNextJob() throws IOException {
  // Stop once the configured number of jobs has been handed out.
  if (numJobs.decrementAndGet() < 0) {
    return null;
  }
  StoryParams next = listStoryParams.poll();
  return new SynthJob(rand, conf, next.jobDef, next.queue,
      next.actualSubmissionTime);
}
// No-op: this producer keeps no open handles as fields, so there is
// nothing to release here.
@Override
public void close(){
}
/** Human-readable summary of the producer configuration and workloads. */
@Override
public String toString() {
  StringBuilder sb = new StringBuilder("SynthTraceJobProducer [ conf=");
  sb.append(conf).append(", numJobs=").append(numJobs)
      .append(", r=").append(rand)
      .append(", totalWeight=").append(totalWeight)
      .append(", workloads=").append(trace.workloads)
      .append("]");
  return sb.toString();
}
/**
 * @return the total number of jobs declared in the trace (not the number
 *         of jobs still remaining to be produced)
 */
public int getNumJobs() {
  return trace.num_jobs;
}
// Helper to parse and maintain backwards compatibility with
// syn json formats.
// Old-format traces have no "tasks" list; instead they carry flat
// map/reduce fields (mtasks_avg, rtime_stddev, ...). This converts such a
// definition in place into the new form: a "mapreduce" job with exactly
// one map TaskDefinition and one reduce TaskDefinition.
private static void validateJobDef(JobDefinition jobDef){
  if(jobDef.tasks == null) {
    LOG.info("Detected old JobDefinition format. Converting.");
    try {
      jobDef.tasks = new ArrayList<>();
      jobDef.type = "mapreduce";
      // Lift the flat scalar fields into Sample objects
      jobDef.deadline_factor = new Sample(jobDef.deadline_factor_avg,
          jobDef.deadline_factor_stddev);
      jobDef.duration = new Sample(jobDef.dur_avg,
          jobDef.dur_stddev);
      jobDef.reservation = new Sample(jobDef.chance_of_reservation);
      // Build the map task from the m* legacy fields
      TaskDefinition map = new TaskDefinition();
      map.type = MRAMSimulator.MAP_TYPE;
      map.count = new Sample(jobDef.mtasks_avg, jobDef.mtasks_stddev);
      map.time = new Sample(jobDef.mtime_avg, jobDef.mtime_stddev);
      map.max_memory = new Sample((double) jobDef.map_max_memory_avg,
          jobDef.map_max_memory_stddev);
      map.max_vcores = new Sample((double) jobDef.map_max_vcores_avg,
          jobDef.map_max_vcores_stddev);
      map.priority = DEFAULT_MAPPER_PRIORITY;
      map.executionType = jobDef.map_execution_type;
      jobDef.tasks.add(map);
      // Build the reduce task from the r* legacy fields
      TaskDefinition reduce = new TaskDefinition();
      reduce.type = MRAMSimulator.REDUCE_TYPE;
      reduce.count = new Sample(jobDef.rtasks_avg, jobDef.rtasks_stddev);
      reduce.time = new Sample(jobDef.rtime_avg, jobDef.rtime_stddev);
      reduce.max_memory = new Sample((double) jobDef.reduce_max_memory_avg,
          jobDef.reduce_max_memory_stddev);
      reduce.max_vcores = new Sample((double) jobDef.reduce_max_vcores_avg,
          jobDef.reduce_max_vcores_stddev);
      reduce.priority = DEFAULT_REDUCER_PRIORITY;
      reduce.executionType = jobDef.reduce_execution_type;
      jobDef.tasks.add(reduce);
    } catch (JsonMappingException e) {
      // Best-effort conversion: an invalid legacy definition is logged,
      // not fatal (the Sample constructors throw on inconsistent input).
      LOG.warn("Error converting old JobDefinition format", e);
    }
  }
}
/**
 * @return the random seed carried by the trace file
 */
public long getSeed() {
  return seed;
}
/**
 * @return the configured nodes-per-rack, clamped to at least 1
 */
public int getNodesPerRack() {
  // Identical to the original ternary: a value < 1 is replaced by 1.
  return Math.max(1, trace.nodes_per_rack);
}
/**
 * @return the number of nodes declared in the trace
 */
public int getNumNodes() {
  return trace.num_nodes;
}
/**
 * Class used to parse a trace configuration file. Top-level container of
 * the .json trace: cluster size, number of jobs, seed, and workloads.
 */
@SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" })
@XmlRootElement
public static class Trace {
  @JsonProperty("description")
  String description;
  @JsonProperty("num_nodes")
  int num_nodes;
  @JsonProperty("nodes_per_rack")
  int nodes_per_rack;
  @JsonProperty("num_jobs")
  int num_jobs;
  // in sec (selects a portion of time_distribution)
  @JsonProperty("rand_seed")
  long rand_seed;
  @JsonProperty("workloads")
  List<Workload> workloads;
  // Cached per-workload weights, filled in by init().
  List<Double> workload_weights;
  JDKRandomGenerator rand;
  /**
   * Wires the shared random generator into every workload and caches the
   * workload weights used for the weighted pick in generateWorkload().
   */
  public void init(JDKRandomGenerator random) {
    this.rand = random;
    // Pass rand forward to every workload
    for (Workload workload : workloads) {
      workload.init(rand);
    }
    // Cache the workload weights
    workload_weights = new ArrayList<>();
    for (Workload workload : workloads) {
      workload_weights.add(workload.workload_weight);
    }
  }
  /** Weighted random pick of one of the configured workloads. */
  Workload generateWorkload() {
    int pick = SynthUtils.getWeighted(workload_weights, rand);
    return workloads.get(pick);
  }
}
/**
 * Class used to parse a workload from file. A workload maps to one queue
 * and carries its own job classes and a time distribution of submissions.
 */
@SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" })
public static class Workload {
  @JsonProperty("workload_name")
  String workload_name;
  // used to change probability this workload is picked for each job
  @JsonProperty("workload_weight")
  double workload_weight;
  @JsonProperty("queue_name")
  String queue_name;
  @JsonProperty("job_classes")
  List<JobDefinition> job_classes;
  @JsonProperty("time_distribution")
  List<TimeSample> time_distribution;
  JDKRandomGenerator rand;
  // Per-job-class weights, cached from job_classes by init().
  List<Double> job_weights;
  // Per-time-bucket weights, cached from time_distribution by init().
  List<Double> time_weights;
  /**
   * Validates the job classes (converting legacy format when needed),
   * propagates the random generator, and caches the weight lists.
   *
   * @param random the trace-wide random generator
   */
  public void init(JDKRandomGenerator random){
    this.rand = random;
    // Validate and pass rand forward
    for(JobDefinition def : job_classes){
      validateJobDef(def);
      def.init(rand);
    }
    // Initialize job weights (the duplicated allocation of this list in
    // the previous version was removed)
    job_weights = new ArrayList<>();
    for(JobDefinition j : job_classes){
      job_weights.add(j.class_weight);
    }
    // Initialize time weights
    time_weights = new ArrayList<>();
    for(TimeSample ts : time_distribution){
      time_weights.add(ts.weight);
    }
  }
  /**
   * Draws a submission time: picks a time "bucket" according to the
   * time weights, then picks uniformly within the bucket.
   *
   * @return a submission time in seconds
   */
  public long generateSubmissionTime(){
    int index = SynthUtils.getWeighted(time_weights, rand);
    // Retrieve the lower and upper bounds for this time "bucket"
    int start = time_distribution.get(index).time;
    // Get the beginning of the next time sample (if it exists); for the
    // last bucket the range collapses to zero and start is returned
    index = (index+1)<time_distribution.size() ? index+1 : index;
    int end = time_distribution.get(index).time;
    int range = end-start;
    // Within this time "bucket", uniformly pick a time if our
    // range is non-zero, otherwise just use the start time of the bucket
    return start + (range>0 ? rand.nextInt(range) : 0);
  }
  /** Weighted random pick of one of this workload's job classes. */
  public JobDefinition generateJobDefinition(){
    return job_classes.get(SynthUtils.getWeighted(job_weights, rand));
  }
  @Override
  public String toString(){
    return "\nWorkload " + workload_name + ", weight: " + workload_weight
        + ", queue: " + queue_name + " "
        + job_classes.toString().replace("\n", "\n\t");
  }
}
/**
 * Class used to parse a job class from file. New-format traces populate
 * the Sample fields and the "tasks" list directly; old-format traces use
 * the flat legacy fields below, which validateJobDef() converts into the
 * new form before init() is called.
 */
@SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" })
public static class JobDefinition {
  @JsonProperty("class_name")
  String class_name;
  @JsonProperty("user_name")
  String user_name;
  // used to change probability this class is chosen
  @JsonProperty("class_weight")
  double class_weight;
  // am type to launch
  @JsonProperty("type")
  String type;
  @JsonProperty("deadline_factor")
  Sample deadline_factor;
  @JsonProperty("duration")
  Sample duration;
  @JsonProperty("reservation")
  Sample reservation;
  // One entry per container type (e.g. map/reduce) of the job
  @JsonProperty("tasks")
  List<TaskDefinition> tasks;
  @JsonProperty("params")
  Map<String, String> params;
  // Old JSON fields for backwards compatibility
  // reservation related params
  @JsonProperty("chance_of_reservation")
  double chance_of_reservation;
  @JsonProperty("deadline_factor_avg")
  double deadline_factor_avg;
  @JsonProperty("deadline_factor_stddev")
  double deadline_factor_stddev;
  // durations in sec
  @JsonProperty("dur_avg")
  double dur_avg;
  @JsonProperty("dur_stddev")
  double dur_stddev;
  @JsonProperty("mtime_avg")
  double mtime_avg;
  @JsonProperty("mtime_stddev")
  double mtime_stddev;
  @JsonProperty("rtime_avg")
  double rtime_avg;
  @JsonProperty("rtime_stddev")
  double rtime_stddev;
  // number of tasks
  @JsonProperty("mtasks_avg")
  double mtasks_avg;
  @JsonProperty("mtasks_stddev")
  double mtasks_stddev;
  @JsonProperty("rtasks_avg")
  double rtasks_avg;
  @JsonProperty("rtasks_stddev")
  double rtasks_stddev;
  // memory in MB
  @JsonProperty("map_max_memory_avg")
  long map_max_memory_avg;
  @JsonProperty("map_max_memory_stddev")
  double map_max_memory_stddev;
  @JsonProperty("reduce_max_memory_avg")
  long reduce_max_memory_avg;
  @JsonProperty("reduce_max_memory_stddev")
  double reduce_max_memory_stddev;
  // vcores
  @JsonProperty("map_max_vcores_avg")
  long map_max_vcores_avg;
  @JsonProperty("map_max_vcores_stddev")
  double map_max_vcores_stddev;
  @JsonProperty("reduce_max_vcores_avg")
  long reduce_max_vcores_avg;
  @JsonProperty("reduce_max_vcores_stddev")
  double reduce_max_vcores_stddev;
  //container execution type
  @JsonProperty("map_execution_type")
  String map_execution_type = ExecutionType.GUARANTEED.name();
  @JsonProperty("reduce_execution_type")
  String reduce_execution_type = ExecutionType.GUARANTEED.name();
  // Propagates the random generator into every Sample of the definition.
  // NOTE(review): assumes deadline_factor/duration/reservation and tasks
  // are non-null — guaranteed for legacy traces by validateJobDef, and
  // presumably by well-formed new-format traces; verify against inputs.
  public void init(JDKRandomGenerator rand){
    deadline_factor.init(rand);
    duration.init(rand);
    reservation.init(rand);
    for(TaskDefinition t : tasks){
      t.count.init(rand);
      t.time.init(rand);
      t.max_memory.init(rand);
      t.max_vcores.init(rand);
    }
  }
  @Override
  public String toString(){
    return "\nJobDefinition " + class_name + ", weight: " + class_weight
        + ", type: " + type + " "
        + tasks.toString().replace("\n", "\n\t");
  }
}
/**
 * A task representing a type of container - e.g. "map" in mapreduce.
 * All resource/size fields are Samples, so each concrete task instance
 * can be drawn from a constant, a distribution, or a discrete set.
 */
@SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" })
public static class TaskDefinition {
  // Task type label (e.g. map or reduce)
  @JsonProperty("type")
  String type;
  // Number of tasks of this type per job
  @JsonProperty("count")
  Sample count;
  // Task duration
  @JsonProperty("time")
  Sample time;
  // Maximum memory per container
  @JsonProperty("max_memory")
  Sample max_memory;
  // Maximum vcores per container
  @JsonProperty("max_vcores")
  Sample max_vcores;
  @JsonProperty("priority")
  int priority;
  // Container execution type; defaults to GUARANTEED
  @JsonProperty("execution_type")
  String executionType = ExecutionType.GUARANTEED.name();
  @Override
  public String toString(){
    return "\nTaskDefinition " + type
        + " Count[" + count + "] Time[" + time + "] Memory[" + max_memory
        + "] Vcores[" + max_vcores + "] Priority[" + priority
        + "] ExecutionType[" + executionType + "]";
  }
}
/**
 * Class used to parse value sample information. A Sample operates in one
 * of three mutually-exclusive modes, chosen at construction time from
 * which JSON fields are present: a constant value, a random distribution
 * (normal or log-normal), or a weighted discrete choice.
 */
@SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" })
public static class Sample {
  // Distribution assumed when "dist" is absent in DIST mode.
  private static final Dist DEFAULT_DIST = Dist.LOGNORM;
  // Constant value (CONST) or distribution mean (DIST); 0 in DISC mode.
  private final double val;
  // Standard deviation (DIST mode); 0 in the other modes.
  private final double std;
  // Which distribution to draw from (DIST mode); null otherwise.
  private final Dist dist;
  // Built by init() in DIST mode.
  private AbstractRealDistribution dist_instance;
  // Candidate values (DISC mode); null otherwise.
  private final List<String> discrete;
  // Weights matching "discrete" (DISC mode); null otherwise.
  private final List<Double> weights;
  private final Mode mode;
  // Set exactly once by init(); sampling before init() is an error.
  private JDKRandomGenerator rand;
  private enum Mode{
    CONST,
    DIST,
    DISC
  }
  private enum Dist{
    LOGNORM,
    NORM
  }
  // Convenience constructor for a constant sample.
  public Sample(Double val) throws JsonMappingException{
    this(val, null);
  }
  // Convenience constructor for a distribution sample (default dist).
  public Sample(Double val, Double std) throws JsonMappingException{
    this(val, std, null, null, null);
  }
  /**
   * JSON entry point. Determines the mode from which fields are set and
   * rejects any inconsistent combination with JsonMappingException.
   */
  @JsonCreator
  public Sample(@JsonProperty("val") Double val,
      @JsonProperty("std") Double std, @JsonProperty("dist") String dist,
      @JsonProperty("discrete") List<String> discrete,
      @JsonProperty("weights") List<Double> weights)
      throws JsonMappingException{
    // Different Modes
    // - Constant: val must be specified, all else null. Sampling will
    // return val.
    // - Distribution: val, std specified, dist optional (defaults to
    // LogNormal). Sampling will sample from the appropriate distribution
    // - Discrete: discrete must be set to a list of strings or numbers,
    // weights optional (defaults to uniform)
    if(val!=null){
      if(std==null){
        // Constant
        if(dist!=null || discrete!=null || weights!=null){
          throw new JsonMappingException("Instantiation of " + Sample.class
              + " failed");
        }
        mode = Mode.CONST;
        this.val = val;
        this.std = 0;
        this.dist = null;
        this.discrete = null;
        this.weights = null;
      } else {
        // Distribution
        if(discrete!=null || weights != null){
          throw new JsonMappingException("Instantiation of " + Sample.class
              + " failed");
        }
        mode = Mode.DIST;
        this.val = val;
        this.std = std;
        // Dist.valueOf requires the exact enum name (LOGNORM or NORM)
        this.dist = dist!=null ? Dist.valueOf(dist) : DEFAULT_DIST;
        this.discrete = null;
        this.weights = null;
      }
    } else {
      // Discrete
      if(discrete==null){
        throw new JsonMappingException("Instantiation of " + Sample.class
            + " failed");
      }
      mode = Mode.DISC;
      this.val = 0;
      this.std = 0;
      this.dist = null;
      this.discrete = discrete;
      // Missing weights default to a uniform choice
      if(weights == null){
        weights = new ArrayList<>(Collections.nCopies(
            discrete.size(), 1.0));
      }
      if(weights.size() != discrete.size()){
        throw new JsonMappingException("Instantiation of " + Sample.class
            + " failed");
      }
      this.weights = weights;
    }
  }
  /**
   * One-time wiring of the random generator; in DIST mode also builds
   * the concrete distribution instance. Must be called before sampling.
   */
  public void init(JDKRandomGenerator random){
    if(this.rand != null){
      throw new YarnRuntimeException("init called twice");
    }
    this.rand = random;
    if(mode == Mode.DIST){
      switch(this.dist){
      case LOGNORM:
        this.dist_instance = SynthUtils.getLogNormalDist(rand, val, std);
        return;
      case NORM:
        this.dist_instance = SynthUtils.getNormalDist(rand, val, std);
        return;
      default:
        throw new YarnRuntimeException("Unknown distribution " + dist.name());
      }
    }
  }
  // Samples and narrows to int (throws if the long value overflows int).
  public int getInt(){
    return Math.toIntExact(getLong());
  }
  // Samples and rounds to the nearest long.
  public long getLong(){
    return Math.round(getDouble());
  }
  // Samples and parses the string form as a double; in DISC mode this
  // requires the discrete values to be numeric strings.
  public double getDouble(){
    return Double.parseDouble(getString());
  }
  // Draws one sample according to the mode; init() must have been called.
  public String getString(){
    if(this.rand == null){
      throw new YarnRuntimeException("getValue called without init");
    }
    switch(mode){
    case CONST:
      return Double.toString(val);
    case DIST:
      return Double.toString(dist_instance.sample());
    case DISC:
      return this.discrete.get(SynthUtils.getWeighted(this.weights, rand));
    default:
      throw new YarnRuntimeException("Unknown sampling mode " + mode.name());
    }
  }
  @Override
  public String toString(){
    switch(mode){
    case CONST:
      return "value: " + Double.toString(val);
    case DIST:
      return "value: " + this.val + " std: " + this.std + " dist: "
          + this.dist.name();
    case DISC:
      return "discrete: " + this.discrete + ", weights: " + this.weights;
    default:
      throw new YarnRuntimeException("Unknown sampling mode " + mode.name());
    }
  }
}
/**
 * This is used to define time-varying probability of a job start-time
 * (e.g., to simulate daily patterns). Each entry marks the start of a
 * time "bucket"; the next entry's time is the bucket's upper bound
 * (see Workload.generateSubmissionTime).
 */
@SuppressWarnings({ "membername", "checkstyle:visibilitymodifier" })
public static class TimeSample {
  // Bucket start, in sec
  @JsonProperty("time")
  int time;
  // Relative probability that a job is submitted within this bucket
  @JsonProperty("weight")
  double weight;
}
}