/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred.gridmix;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.tools.rumen.JobStory;
import org.apache.hadoop.tools.rumen.JobStoryProducer;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.Condition;
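/**
* {@link JobFactory} that drives Gridmix in STRESS mode: it tracks cluster
* load and submits jobs whenever the simulated JT is not overloaded.
* <p>
* A minimal usage sketch (illustrative only; the submitter, job producer,
* scratch directory and configuration are assumed to come from the
* surrounding Gridmix pipeline):
* <pre>{@code
*   CountDownLatch startFlag = new CountDownLatch(1);
*   StressJobFactory factory =
*       new StressJobFactory(submitter, jobProducer, scratch, conf, startFlag);
*   factory.start();        // starts the reader thread
*   startFlag.countDown();  // releases the submission pipeline
* }</pre>
*/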
public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
public static final Log LOG = LogFactory.getLog(StressJobFactory.class);
private LoadStatus loadStatus = new LoadStatus();
private List<Job> runningWaitingJobs;
private final Condition overloaded = this.lock.newCondition();
/**
* The minimum ratio between pending+running map tasks (aka. incomplete map
* tasks) and cluster map slot capacity for us to consider the cluster is
* overloaded. For running maps, we only count them partially. Namely, a 40%
* completed map is counted as 0.6 map tasks in our calculation.
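* For example, with a map slot capacity of 100, the cluster is considered
* overloaded once the weighted count of incomplete map tasks exceeds 200.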
*/
static final float OVERLOAD_MAPTASK_MAPSLOT_RATIO = 2.0f;
/**
* Creating a new instance does not start the thread.
*
* @param submitter Component to which deserialized jobs are passed
* @param jobProducer Stream of job traces with which to construct a
* {@link org.apache.hadoop.tools.rumen.ZombieJobProducer}
* @param scratch Directory into which to write output from simulated jobs
* @param conf Config passed to all jobs to be submitted
* @param startFlag Latch released from main to start pipeline
* @throws java.io.IOException
*/
public StressJobFactory(
JobSubmitter submitter, JobStoryProducer jobProducer, Path scratch,
Configuration conf, CountDownLatch startFlag) throws IOException {
super(
submitter, jobProducer, scratch, conf, startFlag);
//Set isOverloaded to true so that the job factory waits for at least the
//first set of ClusterStats, based on which it can decide how many jobs
//it has to submit.
this.loadStatus.isOverloaded = true;
}
public Thread createReaderThread() {
return new StressReaderThread("StressJobFactory");
}
/*
* Worker thread responsible for reading descriptions, assigning sequence
* numbers, and normalizing time.
*/
private class StressReaderThread extends Thread {
public StressReaderThread(String name) {
super(name);
}
/**
* STRESS: submits jobs in STRESS mode.
* <pre>
* while (JT is overloaded) {
*   wait();
* }
* </pre>
* When the JT is no longer overloaded, get the number of slots available
* and keep submitting jobs until the total number of map tasks across the
* submitted jobs is sufficient to load the JT again, i.e. until
* sum(maps per job) exceeds the number of slots to be backfilled.
*/
public void run() {
try {
startFlag.await();
if (Thread.currentThread().isInterrupted()) {
return;
}
LOG.info("START STRESS @ " + System.currentTimeMillis());
while (!Thread.currentThread().isInterrupted()) {
lock.lock();
try {
while (loadStatus.isOverloaded) {
//Wait while JT is overloaded.
try {
overloaded.await();
} catch (InterruptedException ie) {
return;
}
}
int noOfSlotsAvailable = loadStatus.numSlotsBackfill;
LOG.info("Number of slots to backfill: " + noOfSlotsAvailable);
for (int i = 0; i < noOfSlotsAvailable; i++) {
try {
final JobStory job = getNextJobFiltered();
if (null == job) {
return;
}
//TODO: Handle the case where a single map task takes more than one slot.
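//Count this job's map tasks against the slots still to be filled, so the
//total number of maps submitted in this round roughly matches
//noOfSlotsAvailable.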
i += job.getNumberMaps();
submitter.add(
new GridmixJob(
conf, 0L, job, scratch, sequence.getAndIncrement()));
} catch (IOException e) {
LOG.error("EXCEPTION in availableSlots", e);
error = e;
return;
}
}
} finally {
lock.unlock();
}
}
} catch (InterruptedException e) {
return;
} finally {
IOUtils.cleanup(null, jobProducer);
}
}
}
/**
* STRESS: once the StatsCollector delivers a notification, read the
* cluster metrics and update the current loadStatus with the new load
* status of the JT.
*
* @param item the latest cluster statistics
*/
@Override
public void update(Statistics.ClusterStats item) {
lock.lock();
try {
ClusterMetrics clusterMetrics = item.getStatus();
LoadStatus newStatus;
runningWaitingJobs = item.getRunningWaitingJobs();
newStatus = checkLoadAndGetSlotsToBackfill(clusterMetrics);
loadStatus.isOverloaded = newStatus.isOverloaded;
loadStatus.numSlotsBackfill = newStatus.numSlotsBackfill;
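//Wake up the reader thread so it can re-check whether the cluster is
//still overloaded.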
overloaded.signalAll();
} finally {
lock.unlock();
}
}
/**
* Uses a light-weight mechanism to determine the cluster load.
*
* @param clusterStatus current cluster metrics
* @return the new {@link LoadStatus}, indicating whether, from the job
* client's perspective, the cluster is overloaded, and how many slots can
* be backfilled.
*/
private LoadStatus checkLoadAndGetSlotsToBackfill(
ClusterMetrics clusterStatus) {
LoadStatus loadStatus = new LoadStatus();
// If there are more jobs than number of task trackers, we assume the
// cluster is overloaded. This is to bound the memory usage of the
// simulator job tracker, in situations where we have jobs with small
// number of map tasks and large number of reduce tasks.
if (runningWaitingJobs.size() >= clusterStatus.getTaskTrackerCount()) {
if (LOG.isDebugEnabled()) {
LOG.debug(
System.currentTimeMillis() + " Overloaded is " +
Boolean.TRUE.toString() + " #runningJobs >= taskTrackerCount (" +
runningWaitingJobs.size() + " >= " +
clusterStatus.getTaskTrackerCount() + " )\n");
}
loadStatus.isOverloaded = true;
loadStatus.numSlotsBackfill = 0;
return loadStatus;
}
float incompleteMapTasks = 0; // include pending & running map tasks.
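//Running maps are counted partially: a job whose maps are 40% complete
//contributes 0.6 of each of its map tasks to the total.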
for (Job job : runningWaitingJobs) {
try {
incompleteMapTasks += (1 - Math.min(
job.getStatus().getMapProgress(), 1.0)) *
((JobConf) job.getConfiguration()).getNumMapTasks();
} catch (IOException io) {
//There might be issues with this current job; try the others.
LOG.error("Error while calculating load", io);
continue;
} catch (InterruptedException ie) {
//There might be issues with this current job; try the others.
LOG.error("Error while calculating load", ie);
continue;
}
}
float overloadedThreshold =
OVERLOAD_MAPTASK_MAPSLOT_RATIO * clusterStatus.getMapSlotCapacity();
boolean overloaded = incompleteMapTasks > overloadedThreshold;
String relOp = (overloaded) ? ">" : "<=";
if (LOG.isDebugEnabled()) {
LOG.debug(
System.currentTimeMillis() + " Overloaded is " + Boolean.toString(
overloaded) + " incompleteMapTasks " + relOp + " " +
OVERLOAD_MAPTASK_MAPSLOT_RATIO + "*mapSlotCapacity" + " (" +
incompleteMapTasks + " " + relOp + " " +
OVERLOAD_MAPTASK_MAPSLOT_RATIO + "*" +
clusterStatus.getMapSlotCapacity() + ")");
}
if (overloaded) {
loadStatus.isOverloaded = true;
loadStatus.numSlotsBackfill = 0;
} else {
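//Not overloaded: backfill the gap between the overload threshold and the
//current number of incomplete map tasks, e.g. a threshold of 200 with
//150 incomplete maps leaves 50 slots to fill.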
loadStatus.isOverloaded = false;
loadStatus.numSlotsBackfill =
(int) (overloadedThreshold - incompleteMapTasks);
}
if (LOG.isDebugEnabled()) {
LOG.debug("Current load Status is " + loadStatus);
}
return loadStatus;
}
static class LoadStatus {
volatile boolean isOverloaded = false;
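//Number of map slots that can still be filled; -1 until the first
//ClusterStats update arrives.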
volatile int numSlotsBackfill = -1;
@Override
public String toString() {
return "isOverloaded: " + isOverloaded + ", numSlotsBackfill: " +
numSlotsBackfill;
}
}
/**
* Start the reader thread; the thread itself waits on the start latch
* before submitting any jobs.
*/
@Override
public void start() {
LOG.info("Starting STRESS submission");
this.rThread.start();
}
}