blob: 40393fe2a50d25e42803a02b8a389487f60ea7de [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.gridmix;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintStream;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
 * Driver class for the Gridmix3 benchmark. Gridmix accepts a timestamped
 * stream (trace) of job/task descriptions. For each job in the trace, the
 * client will submit a corresponding, synthetic job to the target cluster at
 * the rate in the original trace. The intent is to provide a benchmark that
 * can be configured and extended to closely match the measured resource
 * profile of actual, production loads.
 */
public class Gridmix extends Configured implements Tool {

  public static final Log LOG = LogFactory.getLog(Gridmix.class);

  /**
   * Output (scratch) directory for submitted jobs. Relative paths are
   * resolved against the path provided as input and absolute paths remain
   * independent of it. The default is "gridmix".
   */
  public static final String GRIDMIX_OUT_DIR = "gridmix.output.directory";

  /**
   * Number of submitting threads at the client and upper bound for
   * in-memory split data. Submitting threads precompute InputSplits for
   * submitted jobs. This limits the number of splits held in memory waiting
   * for submission and also permits parallel computation of split data.
   * The default is one more than the number of available processors.
   */
  public static final String GRIDMIX_SUB_THR = "gridmix.client.submit.threads";

  /**
   * The depth of the queue of job descriptions. Before splits are computed,
   * a queue of pending descriptions is stored in memory. This parameter
   * limits the depth of that queue. The default is 5.
   */
  public static final String GRIDMIX_QUE_DEP =
      "gridmix.client.pending.queue.depth";

  /**
   * Multiplier to accelerate or decelerate job submission. As a crude means of
   * sizing a job trace to a cluster, the time separating two jobs is
   * multiplied by this factor.
   */
  public static final String GRIDMIX_SUB_MUL = "gridmix.submit.multiplier";

  // Submit data structures. They are assigned by the thread calling run()
  // after the shutdown hook is registered and are read by the hook thread,
  // so they are volatile to make those writes visible to the hook.
  private volatile JobFactory factory;
  private volatile JobSubmitter submitter;
  private volatile JobMonitor monitor;

  // Shutdown hook
  private final Shutdown sdh = new Shutdown();

  /**
   * Write random bytes at the path provided. A {@link GenerateData} job is
   * handed to the submitter and this method blocks until that job finishes.
   * @param genbytes number of bytes of test data to generate
   * @param ioPath path at which the data is written
   * @throws IOException if the generation job does not succeed
   * @throws InterruptedException if interrupted while waiting
   * @see org.apache.hadoop.mapred.gridmix.GenerateData
   */
  protected void writeInputData(long genbytes, Path ioPath)
      throws IOException, InterruptedException {
    final Configuration conf = getConf();
    final GridmixJob genData = new GenerateData(conf, ioPath, genbytes);
    submitter.add(genData);
    LOG.info("Generating " + StringUtils.humanReadableInt(genbytes) +
        " of test data...");
    // TODO add listeners, use for job dependencies
    // NOTE(review): this fixed delay presumably gives the submitter time to
    // submit the job before waitForCompletion is called on it; confirm.
    TimeUnit.SECONDS.sleep(10);
    try {
      genData.getJob().waitForCompletion(false);
    } catch (ClassNotFoundException e) {
      throw new IOException("Internal error", e);
    }
    if (!genData.getJob().isSuccessful()) {
      throw new IOException("Data generation failed!");
    }
    LOG.info("Done.");
  }

  /**
   * Open the trace source.
   * @param in "-" for standard input, otherwise a path on its file system
   * @return an open stream over the trace data
   */
  protected InputStream createInputStream(String in) throws IOException {
    if ("-".equals(in)) {
      return System.in;
    }
    final Path pin = new Path(in);
    return pin.getFileSystem(getConf()).open(pin);
  }

  /**
   * Create each component in the pipeline and start it.
   * @param conf Configuration data; supplies the submitter thread count
   *             ({@link #GRIDMIX_SUB_THR}) and queue depth
   *             ({@link #GRIDMIX_QUE_DEP})
   * @param traceIn Either a Path to the trace data or "-" for
   *                stdin
   * @param ioPath Path from which input data is read
   * @param scratchDir Path into which job output is written
   * @param startFlag Semaphore for starting job trace pipeline
   */
  private void startThreads(Configuration conf, String traceIn, Path ioPath,
      Path scratchDir, CountDownLatch startFlag) throws IOException {
    monitor = createJobMonitor();
    submitter = createJobSubmitter(monitor,
        conf.getInt(GRIDMIX_SUB_THR,
          Runtime.getRuntime().availableProcessors() + 1),
        conf.getInt(GRIDMIX_QUE_DEP, 5),
        new FilePool(conf, ioPath));
    factory = createJobFactory(submitter, traceIn, scratchDir, conf, startFlag);
    monitor.start();
    submitter.start();
    factory.start();
  }

  /** Create the job monitor; subclasses may override. */
  protected JobMonitor createJobMonitor() throws IOException {
    return new JobMonitor();
  }

  /** Create the job submitter feeding {@code monitor}; subclasses may override. */
  protected JobSubmitter createJobSubmitter(JobMonitor monitor, int threads,
      int queueDepth, FilePool pool) throws IOException {
    return new JobSubmitter(monitor, threads, queueDepth, pool);
  }

  /**
   * Create the job factory reading the trace named by {@code traceIn};
   * subclasses may override. The opened trace stream is handed to the
   * factory and is not closed by {@link #run}
   * (NOTE(review): presumably JobFactory closes it; confirm).
   */
  protected JobFactory createJobFactory(JobSubmitter submitter, String traceIn,
      Path scratchDir, Configuration conf, CountDownLatch startFlag)
      throws IOException {
    return new JobFactory(submitter, createInputStream(traceIn), scratchDir,
        conf, startFlag);
  }

  /**
   * Parse arguments, optionally generate input data, then replay the trace
   * and wait for every submitted job to finish.
   * @param argv {@code [-generate <size>] <iopath> <trace>}
   * @return 0 on success; 1 on a usage error or if pipeline startup failed
   * @throws IOException if the trace could not be processed
   */
  public int run(String[] argv) throws IOException, InterruptedException {
    if (argv.length < 2) {
      printUsage(System.err);
      return 1;
    }
    long genbytes = 0;
    String traceIn = null;
    Path ioPath = null;
    try {
      int i = 0;
      if ("-generate".equals(argv[i])) {
        // size accepts binary prefixes, e.g. "100m"
        genbytes = StringUtils.TraditionalBinaryPrefix.string2long(argv[++i]);
        ++i;
      }
      ioPath = new Path(argv[i++]);
      traceIn = argv[i++];
      if (i != argv.length) {
        printUsage(System.err);
        return 1;
      }
    } catch (Exception e) {
      // missing arguments or a malformed size
      printUsage(System.err);
      return 1;
    }
    final Configuration conf = getConf();
    Path scratchDir = new Path(ioPath, conf.get(GRIDMIX_OUT_DIR, "gridmix"));
    // add shutdown hook for SIGINT, etc.
    Runtime.getRuntime().addShutdownHook(sdh);
    CountDownLatch startFlag = new CountDownLatch(1);
    boolean startupFailed = false;
    try {
      // Create, start job submission threads
      startThreads(conf, traceIn, ioPath, scratchDir, startFlag);
      // Write input data if specified
      if (genbytes > 0) {
        writeInputData(genbytes, ioPath);
      }
      // scan input dir contents
      submitter.refreshFilePool();
    } catch (Throwable e) {
      LOG.error("Startup failed", e);
      startupFailed = true;
      if (factory != null) {
        factory.abort(); // abort pipeline
      }
    } finally {
      // signal for factory to start; sets start time
      startFlag.countDown();
    }
    if (factory != null) {
      // wait for input exhaustion
      factory.join(Long.MAX_VALUE);
      final Throwable badTraceException = factory.error();
      if (null != badTraceException) {
        LOG.error("Error in trace", badTraceException);
        throw new IOException("Error in trace", badTraceException);
      }
      // wait for pending tasks to be submitted
      submitter.shutdown();
      submitter.join(Long.MAX_VALUE);
      // wait for running tasks to complete
      monitor.shutdown();
      monitor.join(Long.MAX_VALUE);
    }
    // A failed startup must not be reported to the caller as success.
    return startupFailed ? 1 : 0;
  }

  /**
   * Handles orderly shutdown by requesting that each component in the
   * pipeline abort its progress, waiting for each to exit and killing
   * any jobs still running on the cluster.
   */
  class Shutdown extends Thread {

    static final long FAC_SLEEP = 1000;
    static final long SUB_SLEEP = 4000;
    static final long MON_SLEEP = 15000;

    /**
     * Abort {@code component} (if present) and wait up to {@code maxwait}
     * milliseconds for it to exit.
     */
    private void killComponent(Component<?> component, long maxwait) {
      if (component == null) {
        return;
      }
      component.abort();
      try {
        component.join(maxwait);
      } catch (InterruptedException e) {
        LOG.warn("Interrupted waiting for " + component);
      }
    }

    @Override
    public void run() {
      LOG.info("Exiting...");
      try {
        killComponent(factory, FAC_SLEEP);   // read no more tasks
        killComponent(submitter, SUB_SLEEP); // submit no more tasks
        killComponent(monitor, MON_SLEEP);   // process remaining jobs here
      } finally {
        // Delegated to a helper: a 'return' directly inside this finally
        // block would discard any exception thrown above.
        killRemainingJobs();
      }
    }

    /**
     * Kill every job the monitor still tracks that has not completed; jobs
     * that have completed are reported to the monitor as successes or
     * failures instead.
     */
    private void killRemainingJobs() {
      final JobMonitor mon = monitor;
      if (mon == null) {
        return;
      }
      List<Job> remainingJobs = mon.getRemainingJobs();
      if (remainingJobs.isEmpty()) {
        return;
      }
      LOG.info("Killing running jobs...");
      for (Job job : remainingJobs) {
        try {
          if (!job.isComplete()) {
            job.killJob();
            LOG.info("Killed " + job.getJobName() + " (" + job.getID() + ")");
          } else if (job.isSuccessful()) {
            mon.onSuccess(job);
          } else {
            mon.onFailure(job);
          }
        } catch (IOException e) {
          LOG.warn("Failure killing " + job.getJobName(), e);
        } catch (Exception e) {
          LOG.error("Unexpected exception", e);
        }
      }
      LOG.info("Done.");
    }
  }

  /**
   * Command-line entry point. Always terminates the JVM with the tool's exit
   * status, or -1 if the tool threw.
   */
  public static void main(String[] argv) throws Exception {
    int res = -1;
    try {
      res = ToolRunner.run(new Configuration(), new Gridmix(), argv);
    } catch (Throwable t) {
      // System.exit in the finally block would otherwise discard this
      // exception without any report.
      LOG.error("Gridmix failed", t);
    } finally {
      System.exit(res);
    }
  }

  /** Print command-line usage and the recognized configuration keys. */
  protected void printUsage(PrintStream out) {
    ToolRunner.printGenericCommandUsage(out);
    out.println("Usage: gridmix [-generate <MiB>] <iopath> <trace>");
    out.println(" e.g. gridmix -generate 100m foo -");
    out.println("Configuration parameters:");
    out.printf(" %-40s : Output directory\n", GRIDMIX_OUT_DIR);
    out.printf(" %-40s : Submitting threads\n", GRIDMIX_SUB_THR);
    out.printf(" %-40s : Queued job desc\n", GRIDMIX_QUE_DEP);
    out.printf(" %-40s : Key fraction of rec\n",
        AvgRecordFactory.GRIDMIX_KEY_FRC);
  }

  /**
   * Components in the pipeline must support the following operations for
   * orderly startup and shutdown.
   */
  interface Component<T> {

    /**
     * Accept an item into this component from an upstream component. If
     * shutdown or abort have been called, this may fail, depending on the
     * semantics for the component.
     */
    void add(T item) throws InterruptedException;

    /**
     * Attempt to start the service.
     */
    void start();

    /**
     * Wait until the service completes. It is assumed that either a
     * {@link #shutdown} or {@link #abort} has been requested.
     */
    void join(long millis) throws InterruptedException;

    /**
     * Shut down gracefully, finishing all pending work. Reject new requests.
     */
    void shutdown();

    /**
     * Shut down immediately, aborting any work in progress and discarding
     * all pending work. It is legal to store pending work for another
     * thread to process.
     */
    void abort();
  }
}