/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.gridmix;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.Formatter;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.rumen.JobStory;
/**
* Synthetic job generated from a trace description.
*/
abstract class GridmixJob implements Callable<Job>, Delayed {
public static final String JOBNAME = "GRIDMIX";
public static final String ORIGNAME = "gridmix.job.name.original";
public static final Log LOG = LogFactory.getLog(GridmixJob.class);
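  // Per-thread Formatter backed by a StringBuilder seeded with the
  // "GRIDMIX" prefix; used to build job names like GRIDMIX00012.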
private static final ThreadLocal<Formatter> nameFormat =
new ThreadLocal<Formatter>() {
@Override
protected Formatter initialValue() {
final StringBuilder sb = new StringBuilder(JOBNAME.length() + 5);
sb.append(JOBNAME);
return new Formatter(sb);
}
};
protected final int seq;
protected final Path outdir;
protected final Job job;
protected final JobStory jobdesc;
protected final UserGroupInformation ugi;
protected final long submissionTimeNanos;
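  // Splits registered for each generated job, keyed by job sequence
  // number; populated via pushDescription() and consumed (removed) by
  // pullDescription().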
private static final ConcurrentHashMap<Integer,List<InputSplit>> descCache =
new ConcurrentHashMap<Integer,List<InputSplit>>();
protected static final String GRIDMIX_JOB_SEQ = "gridmix.job.seq";
protected static final String GRIDMIX_USE_QUEUE_IN_TRACE =
"gridmix.job-submission.use-queue-in-trace";
protected static final String GRIDMIX_DEFAULT_QUEUE =
"gridmix.job-submission.default-queue";
private static void setJobQueue(Job job, String queue) {
if (queue != null) {
job.getConfiguration().set(MRJobConfig.QUEUE_NAME, queue);
}
}
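
  /**
   * Create a synthetic job modeled on the given trace description, to be
   * submitted as the given user. The job name encodes the sequence number,
   * and the original trace job id is recorded under {@link #ORIGNAME}. The
   * queue is taken from the trace when {@link #GRIDMIX_USE_QUEUE_IN_TRACE}
   * is set, otherwise from {@link #GRIDMIX_DEFAULT_QUEUE}.
   */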
public GridmixJob(final Configuration conf, long submissionMillis,
final JobStory jobdesc, Path outRoot, UserGroupInformation ugi,
final int seq) throws IOException {
this.ugi = ugi;
this.jobdesc = jobdesc;
this.seq = seq;
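    // Truncate the thread-local buffer back to the bare "GRIDMIX" prefix
    // before appending this job's five-digit sequence number.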
((StringBuilder)nameFormat.get().out()).setLength(JOBNAME.length());
try {
job = this.ugi.doAs(new PrivilegedExceptionAction<Job>() {
public Job run() throws IOException {
Job ret =
new Job(conf,
nameFormat.get().format("%05d", seq).toString());
ret.getConfiguration().setInt(GRIDMIX_JOB_SEQ, seq);
String jobId = null == jobdesc.getJobID()
? "<unknown>"
: jobdesc.getJobID().toString();
ret.getConfiguration().set(ORIGNAME, jobId);
if (conf.getBoolean(GRIDMIX_USE_QUEUE_IN_TRACE, false)) {
setJobQueue(ret, jobdesc.getQueueName());
} else {
setJobQueue(ret, conf.get(GRIDMIX_DEFAULT_QUEUE));
}
return ret;
}
});
} catch (InterruptedException e) {
throw new IOException(e);
}
submissionTimeNanos = TimeUnit.NANOSECONDS.convert(
submissionMillis, TimeUnit.MILLISECONDS);
outdir = new Path(outRoot, "" + seq);
}
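
  /**
   * Constructor for jobs with no corresponding trace entry; the trace
   * description, output directory, and sequence number are left unset.
   */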
protected GridmixJob(final Configuration conf, long submissionMillis,
final String name) throws IOException {
submissionTimeNanos = TimeUnit.NANOSECONDS.convert(
submissionMillis, TimeUnit.MILLISECONDS);
jobdesc = null;
outdir = null;
seq = -1;
ugi = UserGroupInformation.getCurrentUser();
try {
job = this.ugi.doAs(new PrivilegedExceptionAction<Job>() {
public Job run() throws IOException {
Job ret = new Job(conf, name);
ret.getConfiguration().setInt(GRIDMIX_JOB_SEQ, seq);
setJobQueue(ret, conf.get(GRIDMIX_DEFAULT_QUEUE));
return ret;
}
});
} catch (InterruptedException e) {
throw new IOException(e);
}
}
public UserGroupInformation getUgi() {
return ugi;
}
  @Override
  public String toString() {
return job.getJobName();
}
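
  /**
   * Delay remaining until the job's target submission time, measured
   * against System.nanoTime().
   */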
  @Override
  public long getDelay(TimeUnit unit) {
return unit.convert(submissionTimeNanos - System.nanoTime(),
TimeUnit.NANOSECONDS);
}
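
  /**
   * Order by target submission time; ties between GridmixJobs are broken
   * by sequence id, while other Delayed implementations are compared by
   * remaining delay.
   */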
@Override
public int compareTo(Delayed other) {
if (this == other) {
return 0;
}
if (other instanceof GridmixJob) {
final long otherNanos = ((GridmixJob)other).submissionTimeNanos;
if (otherNanos < submissionTimeNanos) {
return 1;
}
if (otherNanos > submissionTimeNanos) {
return -1;
}
      // Avoid overflow-prone subtraction when comparing sequence ids.
      return Integer.compare(id(), ((GridmixJob)other).id());
}
final long diff =
getDelay(TimeUnit.NANOSECONDS) - other.getDelay(TimeUnit.NANOSECONDS);
return 0 == diff ? 0 : (diff > 0 ? 1 : -1);
}
@Override
public boolean equals(Object other) {
if (this == other) {
return true;
}
// not possible unless job is cloned; all jobs should be unique
return other instanceof GridmixJob && id() == ((GridmixJob)other).id();
}
@Override
public int hashCode() {
return id();
}
int id() {
return seq;
}
Job getJob() {
return job;
}
JobStory getJobDesc() {
return jobdesc;
}
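
  /**
   * Register the splits generated for the job with the given sequence id;
   * at most one description may be registered per id.
   */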
static void pushDescription(int seq, List<InputSplit> splits) {
if (null != descCache.putIfAbsent(seq, splits)) {
throw new IllegalArgumentException("Description exists for id " + seq);
}
}
static List<InputSplit> pullDescription(JobContext jobCtxt) {
return pullDescription(GridmixJob.getJobSeqId(jobCtxt));
}
static List<InputSplit> pullDescription(int seq) {
return descCache.remove(seq);
}
static void clearAll() {
descCache.clear();
}
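
  /**
   * Hook for subclasses that generate input data; a no-op by default.
   */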
void buildSplits(FilePool inputDir) throws IOException {
}
static int getJobSeqId(JobContext job) {
    return job.getConfiguration().getInt(GRIDMIX_JOB_SEQ, -1);
}
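
  /**
   * Partitioner that routes each record to the partition recorded in its
   * {@link GridmixKey}.
   */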
public static class DraftPartitioner<V> extends Partitioner<GridmixKey,V> {
public int getPartition(GridmixKey key, V value, int numReduceTasks) {
return key.getPartition();
}
}
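
  /**
   * Grouping comparator that compares REDUCE_SPEC records on the type byte
   * alone, so they group together, while DATA records compare on the full
   * key.
   */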
public static class SpecGroupingComparator
implements RawComparator<GridmixKey> {
private final DataInputBuffer di = new DataInputBuffer();
private final byte[] reset = di.getData();
@Override
public int compare(GridmixKey g1, GridmixKey g2) {
final byte t1 = g1.getType();
final byte t2 = g2.getType();
if (t1 == GridmixKey.REDUCE_SPEC ||
t2 == GridmixKey.REDUCE_SPEC) {
return t1 - t2;
}
assert t1 == GridmixKey.DATA;
assert t2 == GridmixKey.DATA;
return g1.compareTo(g2);
}
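    // Raw form: the leading vint holds the offset of the type byte within
    // the serialized key; when both records are DATA, compare the bytes
    // preceding the type byte.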
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
try {
final int ret;
di.reset(b1, s1, l1);
final int x1 = WritableUtils.readVInt(di);
di.reset(b2, s2, l2);
final int x2 = WritableUtils.readVInt(di);
final int t1 = b1[s1 + x1];
final int t2 = b2[s2 + x2];
if (t1 == GridmixKey.REDUCE_SPEC ||
t2 == GridmixKey.REDUCE_SPEC) {
ret = t1 - t2;
} else {
assert t1 == GridmixKey.DATA;
assert t2 == GridmixKey.DATA;
ret =
WritableComparator.compareBytes(b1, s1, x1, b2, s2, x2);
}
di.reset(reset, 0, 0);
return ret;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
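
  /**
   * OutputFormat that ignores the key and fills each output record with
   * random bytes of the size recorded in the {@link GridmixRecord}.
   */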
static class RawBytesOutputFormat<K>
extends FileOutputFormat<K,GridmixRecord> {
@Override
public RecordWriter<K,GridmixRecord> getRecordWriter(
TaskAttemptContext job) throws IOException {
Path file = getDefaultWorkFile(job, "");
FileSystem fs = file.getFileSystem(job.getConfiguration());
final FSDataOutputStream fileOut = fs.create(file, false);
return new RecordWriter<K,GridmixRecord>() {
@Override
public void write(K ignored, GridmixRecord value)
throws IOException {
value.writeRandom(fileOut, value.getSize());
}
@Override
public void close(TaskAttemptContext ctxt) throws IOException {
fileOut.close();
}
};
}
}
}