/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.gridmix;

import java.io.DataOutputStream;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Formatter;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.rumen.JobStory;

/**
 * Synthetic job generated from a trace description. Implemented as a
 * {@link Callable} that builds and returns the underlying {@link Job}, and
 * as a {@link Delayed} element so that submission can be scheduled at the
 * trace's translated submission time.
 */
abstract class GridmixJob implements Callable<Job>, Delayed {
  // Gridmix job names have the format GRIDMIX<zero-padded 6-digit sequence
  // number>, e.g. GRIDMIX000001
public static final String JOB_NAME_PREFIX = "GRIDMIX";
public static final Log LOG = LogFactory.getLog(GridmixJob.class);
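  // Per-thread Formatter backed by a reusable StringBuilder; the builder is
  // truncated back to the bare prefix before each use, so formatting "%06d"
  // appends only the zero-padded sequence number.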
private static final ThreadLocal<Formatter> nameFormat =
new ThreadLocal<Formatter>() {
@Override
protected Formatter initialValue() {
final StringBuilder sb =
new StringBuilder(JOB_NAME_PREFIX.length() + 6);
sb.append(JOB_NAME_PREFIX);
return new Formatter(sb);
}
};
protected final int seq;
protected final Path outdir;
protected final Job job;
protected final JobStory jobdesc;
protected final UserGroupInformation ugi;
protected final long submissionTimeNanos;
private static final ConcurrentHashMap<Integer,List<InputSplit>> descCache =
new ConcurrentHashMap<Integer,List<InputSplit>>();
protected static final String GRIDMIX_JOB_SEQ = "gridmix.job.seq";
protected static final String GRIDMIX_USE_QUEUE_IN_TRACE =
"gridmix.job-submission.use-queue-in-trace";
protected static final String GRIDMIX_DEFAULT_QUEUE =
"gridmix.job-submission.default-queue";
// configuration key to enable/disable High-Ram feature emulation
static final String GRIDMIX_HIGHRAM_EMULATION_ENABLE =
"gridmix.highram-emulation.enable";
private static void setJobQueue(Job job, String queue) {
if (queue != null) {
job.getConfiguration().set(MRJobConfig.QUEUE_NAME, queue);
}
}
public GridmixJob(final Configuration conf, long submissionMillis,
final JobStory jobdesc, Path outRoot, UserGroupInformation ugi,
final int seq) throws IOException {
this.ugi = ugi;
this.jobdesc = jobdesc;
this.seq = seq;
    // truncate the per-thread buffer back to the bare "GRIDMIX" prefix
    ((StringBuilder)nameFormat.get().out()).setLength(JOB_NAME_PREFIX.length());
try {
job = this.ugi.doAs(new PrivilegedExceptionAction<Job>() {
public Job run() throws IOException {
String jobId = null == jobdesc.getJobID()
? "<unknown>"
: jobdesc.getJobID().toString();
Job ret = new Job(conf,
nameFormat.get().format("%06d", seq).toString());
ret.getConfiguration().setInt(GRIDMIX_JOB_SEQ, seq);
ret.getConfiguration().set(Gridmix.ORIGINAL_JOB_ID, jobId);
ret.getConfiguration().set(Gridmix.ORIGINAL_JOB_NAME,
jobdesc.getName());
if (conf.getBoolean(GRIDMIX_USE_QUEUE_IN_TRACE, false)) {
setJobQueue(ret, jobdesc.getQueueName());
} else {
setJobQueue(ret, conf.get(GRIDMIX_DEFAULT_QUEUE));
}
// check if the job can emulate compression
if (canEmulateCompression()) {
// set the compression related configs if compression emulation is
// enabled
if (CompressionEmulationUtil.isCompressionEmulationEnabled(conf)) {
CompressionEmulationUtil.configureCompressionEmulation(
jobdesc.getJobConf(), ret.getConfiguration());
}
}
// configure high ram properties if enabled
if (conf.getBoolean(GRIDMIX_HIGHRAM_EMULATION_ENABLE, true)) {
configureHighRamProperties(jobdesc.getJobConf(),
ret.getConfiguration());
}
return ret;
}
});
} catch (InterruptedException e) {
throw new IOException(e);
}
submissionTimeNanos = TimeUnit.NANOSECONDS.convert(
submissionMillis, TimeUnit.MILLISECONDS);
outdir = new Path(outRoot, "" + seq);
}
  // Scales a job-level configuration parameter so that the ratio of the
  // job-level value to the cluster-level default is preserved in the
  // simulated run: values are scaled from the original cluster's
  // configuration to the simulated cluster's configuration for higher
  // emulation accuracy. This kind of scaling is chiefly useful for memory
  // parameters.
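  // A worked example with illustrative numbers: if the original job set its
  // map memory to 2048 MB on a cluster whose default was 1024 MB, the ratio
  // is 2.0; on a simulated cluster whose default is 1536 MB, the scaled job
  // value becomes (long)(2.0 * 1536) = 3072 MB.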
private static void scaleConfigParameter(Configuration sourceConf,
Configuration destConf, String clusterValueKey,
String jobValueKey, long defaultValue) {
long simulatedClusterDefaultValue =
destConf.getLong(clusterValueKey, defaultValue);
long originalClusterDefaultValue =
sourceConf.getLong(clusterValueKey, defaultValue);
long originalJobValue =
sourceConf.getLong(jobValueKey, defaultValue);
double scaleFactor = (double)originalJobValue/originalClusterDefaultValue;
long simulatedJobValue = (long)(scaleFactor * simulatedClusterDefaultValue);
if (LOG.isDebugEnabled()) {
LOG.debug("For the job configuration parameter '" + jobValueKey
+ "' and the cluster configuration parameter '"
+ clusterValueKey + "', the original job's configuration value"
+ " is scaled from '" + originalJobValue + "' to '"
+ simulatedJobValue + "' using the default (unit) value of "
+ "'" + originalClusterDefaultValue + "' for the original "
+ " cluster and '" + simulatedClusterDefaultValue + "' for the"
+ " simulated cluster.");
}
destConf.setLong(jobValueKey, simulatedJobValue);
}
  // Checks whether the scaled value of the original job's memory parameter
  // stays within the upper limit configured for the simulated cluster.
  // Returns true iff an enabled upper limit was found and checked.
@SuppressWarnings("deprecation")
private static boolean checkMemoryUpperLimits(String jobKey, String limitKey,
Configuration conf,
boolean convertLimitToMB) {
if (conf.get(limitKey) != null) {
long limit = conf.getLong(limitKey, JobConf.DISABLED_MEMORY_LIMIT);
      // check only if the max memory limit is enabled (non-negative)
if (limit >= 0) {
if (convertLimitToMB) {
limit /= (1024 * 1024); //Converting to MB
}
long scaledConfigValue =
conf.getLong(jobKey, JobConf.DISABLED_MEMORY_LIMIT);
        // fail fast if the scaled value exceeds the configured limit
if (scaledConfigValue > limit) {
throw new RuntimeException("Simulated job's configuration"
+ " parameter '" + jobKey + "' got scaled to a value '"
+ scaledConfigValue + "' which exceeds the upper limit of '"
+ limit + "' defined for the simulated cluster by the key '"
+ limitKey + "'. To disable High-Ram feature emulation, set '"
+ GRIDMIX_HIGHRAM_EMULATION_ENABLE + "' to 'false'.");
}
return true;
}
}
return false;
}
  // Ensures that parameter scaling does not push the job's value past the
  // cluster limits. The cluster-wide virtual-memory limit is expressed in
  // bytes and hence converted to MB before the comparison, while the plain
  // max-memory key is already in MB.
@SuppressWarnings("deprecation")
private static void validateTaskMemoryLimits(Configuration conf,
String jobKey, String clusterMaxKey) {
if (!checkMemoryUpperLimits(jobKey,
JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY, conf, true)) {
checkMemoryUpperLimits(jobKey, clusterMaxKey, conf, false);
}
}
  /**
   * Sets the high-ram job properties in the simulated job's configuration:
   * the per-map and per-reduce memory parameters are scaled to the
   * simulated cluster and validated against its limits.
   */
@SuppressWarnings("deprecation")
static void configureHighRamProperties(Configuration sourceConf,
Configuration destConf) {
// set the memory per map task
scaleConfigParameter(sourceConf, destConf,
MRConfig.MAPMEMORY_MB, MRJobConfig.MAP_MEMORY_MB,
JobConf.DISABLED_MEMORY_LIMIT);
// validate and fail early
validateTaskMemoryLimits(destConf, MRJobConfig.MAP_MEMORY_MB,
JTConfig.JT_MAX_MAPMEMORY_MB);
// set the memory per reduce task
scaleConfigParameter(sourceConf, destConf,
MRConfig.REDUCEMEMORY_MB, MRJobConfig.REDUCE_MEMORY_MB,
JobConf.DISABLED_MEMORY_LIMIT);
// validate and fail early
validateTaskMemoryLimits(destConf, MRJobConfig.REDUCE_MEMORY_MB,
JTConfig.JT_MAX_REDUCEMEMORY_MB);
}
/**
* Indicates whether this {@link GridmixJob} supports compression emulation.
*/
protected abstract boolean canEmulateCompression();
protected GridmixJob(final Configuration conf, long submissionMillis,
final String name) throws IOException {
submissionTimeNanos = TimeUnit.NANOSECONDS.convert(
submissionMillis, TimeUnit.MILLISECONDS);
jobdesc = null;
outdir = null;
seq = -1;
ugi = UserGroupInformation.getCurrentUser();
try {
job = this.ugi.doAs(new PrivilegedExceptionAction<Job>() {
public Job run() throws IOException {
Job ret = new Job(conf, name);
ret.getConfiguration().setInt(GRIDMIX_JOB_SEQ, seq);
setJobQueue(ret, conf.get(GRIDMIX_DEFAULT_QUEUE));
return ret;
}
});
} catch (InterruptedException e) {
throw new IOException(e);
}
}
public UserGroupInformation getUgi() {
return ugi;
}
  @Override
  public String toString() {
return job.getJobName();
}
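  // Remaining delay until this job's translated submission time; determines
  // ordering in the delay queue from which jobs are submitted.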
  @Override
  public long getDelay(TimeUnit unit) {
return unit.convert(submissionTimeNanos - System.nanoTime(),
TimeUnit.NANOSECONDS);
}
@Override
public int compareTo(Delayed other) {
if (this == other) {
return 0;
}
if (other instanceof GridmixJob) {
final long otherNanos = ((GridmixJob)other).submissionTimeNanos;
if (otherNanos < submissionTimeNanos) {
return 1;
}
if (otherNanos > submissionTimeNanos) {
return -1;
}
return id() - ((GridmixJob)other).id();
}
final long diff =
getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS);
return 0 == diff ? 0 : (diff > 0 ? 1 : -1);
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
    // equal ids are not possible unless a job is cloned; all jobs should be
    // unique
return other instanceof GridmixJob && id() == ((GridmixJob)other).id();
}
@Override
public int hashCode() {
return id();
}
int id() {
return seq;
}
Job getJob() {
return job;
}
JobStory getJobDesc() {
return jobdesc;
}
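  // Split descriptions are passed between Gridmix components through this
  // static cache, keyed by job sequence number; pullDescription() removes
  // the entry so each description is consumed at most once.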
static void pushDescription(int seq, List<InputSplit> splits) {
if (null != descCache.putIfAbsent(seq, splits)) {
throw new IllegalArgumentException("Description exists for id " + seq);
}
}
static List<InputSplit> pullDescription(JobContext jobCtxt) {
return pullDescription(GridmixJob.getJobSeqId(jobCtxt));
}
static List<InputSplit> pullDescription(int seq) {
return descCache.remove(seq);
}
static void clearAll() {
descCache.clear();
}
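  // Hook for subclasses that generate synthetic input; the default
  // implementation is a no-op.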
void buildSplits(FilePool inputDir) throws IOException {
}
static int getJobSeqId(JobContext job) {
    return job.getConfiguration().getInt(GRIDMIX_JOB_SEQ, -1);
}
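  // "Draft" partitioner: the target reduce partition is precomputed and
  // carried inside the GridmixKey, so the partitioner simply echoes it back.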
public static class DraftPartitioner<V> extends Partitioner<GridmixKey,V> {
public int getPartition(GridmixKey key, V value, int numReduceTasks) {
return key.getPartition();
}
}
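  // Grouping comparator that separates REDUCE_SPEC records from DATA
  // records: if either key is a REDUCE_SPEC, ordering falls back to the
  // type byte; otherwise keys are compared in full.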
public static class SpecGroupingComparator
implements RawComparator<GridmixKey> {
private final DataInputBuffer di = new DataInputBuffer();
    // the buffer's original (empty) backing array, used to reset it after
    // each raw comparison
    private final byte[] reset = di.getData();
@Override
public int compare(GridmixKey g1, GridmixKey g2) {
final byte t1 = g1.getType();
final byte t2 = g2.getType();
if (t1 == GridmixKey.REDUCE_SPEC ||
t2 == GridmixKey.REDUCE_SPEC) {
return t1 - t2;
}
assert t1 == GridmixKey.DATA;
assert t2 == GridmixKey.DATA;
return g1.compareTo(g2);
}
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
try {
        final int ret;
        // the leading vint in each serialized key gives the offset of the
        // type byte from the start of the record
        di.reset(b1, s1, l1);
        final int x1 = WritableUtils.readVInt(di);
        di.reset(b2, s2, l2);
        final int x2 = WritableUtils.readVInt(di);
        final int t1 = b1[s1 + x1];
        final int t2 = b2[s2 + x2];
if (t1 == GridmixKey.REDUCE_SPEC ||
t2 == GridmixKey.REDUCE_SPEC) {
ret = t1 - t2;
        } else {
          assert t1 == GridmixKey.DATA;
          assert t2 == GridmixKey.DATA;
          // DATA keys: compare the serialized key bytes preceding the type byte
          ret = WritableComparator.compareBytes(b1, s1, x1, b2, s2, x2);
        }
        // restore the empty backing array so the buffer does not pin the
        // caller's byte arrays between calls
        di.reset(reset, 0, 0);
return ret;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
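  // Writes raw GridmixRecord bytes to the task's default work file,
  // optionally through a compressed stream when compression emulation is
  // enabled.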
static class RawBytesOutputFormat<K>
extends FileOutputFormat<K,GridmixRecord> {
@Override
public RecordWriter<K,GridmixRecord> getRecordWriter(
TaskAttemptContext job) throws IOException {
Path file = getDefaultWorkFile(job, "");
      final DataOutputStream fileOut = new DataOutputStream(
          CompressionEmulationUtil.getPossiblyCompressedOutputStream(
              file, job.getConfiguration()));
return new RecordWriter<K,GridmixRecord>() {
@Override
public void write(K ignored, GridmixRecord value)
throws IOException {
// Let the Gridmix record fill itself.
value.write(fileOut);
}
@Override
public void close(TaskAttemptContext ctxt) throws IOException {
fileOut.close();
}
};
}
}
}