/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app;

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.v2.api.records.Counters;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.api.records.TaskReport;
import org.apache.hadoop.mapreduce.v2.api.records.TaskState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptStatusUpdateEvent.TaskAttemptStatus;
import org.apache.hadoop.mapreduce.v2.app.speculate.DefaultSpeculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.ExponentiallySmoothedTaskRuntimeEstimator;
import org.apache.hadoop.mapreduce.v2.app.speculate.LegacyTaskRuntimeEstimator;
import org.apache.hadoop.mapreduce.v2.app.speculate.Speculator;
import org.apache.hadoop.mapreduce.v2.app.speculate.SpeculatorEvent;
import org.apache.hadoop.mapreduce.v2.app.speculate.TaskRuntimeEstimator;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.SystemClock;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.service.CompositeService;
import org.junit.Assert;
import org.junit.Test;
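
/**
 * Exercises the MapReduce speculative-execution runtime estimators against a
 * fully mocked job.  A {@link MockClock} drives simulated time, the mock task
 * and attempt classes below report synthetic progress, and a real
 * {@link DefaultSpeculator} decides when to launch speculative attempts.
 * Each test asserts how many speculations end up beating their original
 * attempts for a given {@link TaskRuntimeEstimator}.
 */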
public class TestRuntimeEstimators {
private static int INITIAL_NUMBER_FREE_SLOTS = 600;
private static int MAP_SLOT_REQUIREMENT = 3;
// this must be at least as large as the map slot requirement
private static int REDUCE_SLOT_REQUIREMENT = 4;
private static int MAP_TASKS = 200;
private static int REDUCE_TASKS = 150;
MockClock clock;
Job myJob;
AppContext myAppContext;
private static final Log LOG = LogFactory.getLog(TestRuntimeEstimators.class);
private final AtomicInteger slotsInUse = new AtomicInteger(0);
AsyncDispatcher dispatcher;
DefaultSpeculator speculator;
TaskRuntimeEstimator estimator;
// This is a huge kluge.  The real implementations have a decent approach to
// tracking completed tasks; here we just bump shared counters by hand.
private final AtomicInteger completedMaps = new AtomicInteger(0);
private final AtomicInteger completedReduces = new AtomicInteger(0);
private final AtomicInteger successfulSpeculations
= new AtomicInteger(0);
private final AtomicLong taskTimeSavedBySpeculation
= new AtomicLong(0L);
private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
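// Runs one full simulated job against the supplied estimator: wire up the
// dispatcher and a DefaultSpeculator, start attempts as free slots permit,
// feed attempt status updates to the speculator on every simulated tick, and
// finally assert the number of successful speculations observed.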
private void coreTestEstimator(TaskRuntimeEstimator testedEstimator,
    int expectedSpeculations) {
estimator = testedEstimator;
clock = new MockClock();
dispatcher = new AsyncDispatcher();
myJob = null;
slotsInUse.set(0);
completedMaps.set(0);
completedReduces.set(0);
successfulSpeculations.set(0);
taskTimeSavedBySpeculation.set(0);
clock.advanceTime(1000);
Configuration conf = new Configuration();
myAppContext = new MyAppContext(MAP_TASKS, REDUCE_TASKS);
myJob = myAppContext.getAllJobs().values().iterator().next();
estimator.contextualize(conf, myAppContext);
speculator = new DefaultSpeculator(conf, myAppContext, estimator, clock);
dispatcher.register(Speculator.EventType.class, speculator);
dispatcher.register(TaskEventType.class, new SpeculationRequestEventHandler());
dispatcher.init(conf);
dispatcher.start();
speculator.init(conf);
speculator.start();
// Now that the plumbing is hooked up, we do the following until all tasks
// are finished:
//   1: If we have spare capacity, start as many NEW map attempts as we can,
//      then as many NEW reduce attempts as we can.  Reduce attempts can
//      start while map tasks are still running, once every map attempt has
//      been started and free slots remain (maps need MAP_SLOT_REQUIREMENT
//      slots each, reduces need REDUCE_SLOT_REQUIREMENT).
//   2: Send a status-update speculation event for every attempt that is
//      already running.  Note that the speculator may start new attempts in
//      response.
// First, count the tasks that are not yet done.
int undoneMaps = MAP_TASKS;
int undoneReduces = REDUCE_TASKS;
// build a task sequence where all the maps precede any of the reduces
List<Task> allTasksSequence = new LinkedList<Task>();
allTasksSequence.addAll(myJob.getTasks(TaskType.MAP).values());
allTasksSequence.addAll(myJob.getTasks(TaskType.REDUCE).values());
while (undoneMaps + undoneReduces > 0) {
undoneMaps = 0; undoneReduces = 0;
// start every attempt that is still NEW, as long as there are enough free slots
for (Task task : allTasksSequence) {
if (!task.isFinished()) {
if (task.getType() == TaskType.MAP) {
++undoneMaps;
} else {
++undoneReduces;
}
}
for (TaskAttempt attempt : task.getAttempts().values()) {
if (attempt.getState() == TaskAttemptState.NEW
&& INITIAL_NUMBER_FREE_SLOTS - slotsInUse.get()
>= taskTypeSlots(task.getType())) {
MyTaskAttemptImpl attemptImpl = (MyTaskAttemptImpl)attempt;
SpeculatorEvent event
= new SpeculatorEvent(attempt.getID(), false, clock.getTime());
speculator.handle(event);
attemptImpl.startUp();
} else {
// If a task attempt is in progress we should send the news to
// the Speculator.
TaskAttemptStatus status = new TaskAttemptStatus();
status.id = attempt.getID();
status.progress = attempt.getProgress();
status.stateString = attempt.getState().name();
status.taskState = attempt.getState();
SpeculatorEvent event = new SpeculatorEvent(status, clock.getTime());
speculator.handle(event);
}
}
}
long startTime = System.currentTimeMillis();
// drain the speculator event queue
while (!speculator.eventQueueEmpty()) {
Thread.yield();
if (System.currentTimeMillis() > startTime + 130000) {
return;
}
}
clock.advanceTime(1000L);
if (clock.getTime() % 10000L == 0L) {
speculator.scanForSpeculations();
}
}
Assert.assertEquals("We got the wrong number of successful speculations.",
expectedSpeculations, successfulSpeculations.get());
}
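// Both tests drive the same simulated job; they differ only in which
// estimator the DefaultSpeculator consults, and each expects three
// speculations to beat their original attempts.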
@Test
public void testLegacyEstimator() throws Exception {
TaskRuntimeEstimator specificEstimator = new LegacyTaskRuntimeEstimator();
coreTestEstimator(specificEstimator, 3);
}
@Test
public void testExponentialEstimator() throws Exception {
TaskRuntimeEstimator specificEstimator
= new ExponentiallySmoothedTaskRuntimeEstimator();
coreTestEstimator(specificEstimator, 3);
}
int taskTypeSlots(TaskType type) {
return type == TaskType.MAP ? MAP_SLOT_REQUIREMENT : REDUCE_SLOT_REQUIREMENT;
}
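// Handles T_ADD_SPEC_ATTEMPT events issued by the speculator by adding a
// new attempt to the targeted task.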
class SpeculationRequestEventHandler implements EventHandler<TaskEvent> {
@Override
public void handle(TaskEvent event) {
TaskId taskID = event.getTaskID();
Task task = myJob.getTask(taskID);
Assert.assertEquals("Wrong event type",
    TaskEventType.T_ADD_SPEC_ATTEMPT, event.getType());
System.out.println("SpeculationRequestEventHandler.handle adds a speculation task for " + taskID);
addAttempt(task);
}
}
void addAttempt(Task task) {
MyTaskImpl myTask = (MyTaskImpl) task;
myTask.addAttempt();
}
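// A minimal Task: it tracks its attempts, reports the best attempt's
// progress, and is finished as soon as any attempt has SUCCEEDED.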
class MyTaskImpl implements Task {
private final TaskId taskID;
private final Map<TaskAttemptId, TaskAttempt> attempts
= new ConcurrentHashMap<TaskAttemptId, TaskAttempt>(4);
MyTaskImpl(JobId jobID, int index, TaskType type) {
taskID = recordFactory.newRecordInstance(TaskId.class);
taskID.setId(index);
taskID.setTaskType(type);
taskID.setJobId(jobID);
}
void addAttempt() {
TaskAttempt taskAttempt
= new MyTaskAttemptImpl(taskID, attempts.size(), clock);
TaskAttemptId taskAttemptID = taskAttempt.getID();
attempts.put(taskAttemptID, taskAttempt);
System.out.println("TLTRE.MyTaskImpl.addAttempt " + getID());
SpeculatorEvent event = new SpeculatorEvent(taskID, +1);
dispatcher.getEventHandler().handle(event);
}
@Override
public TaskId getID() {
return taskID;
}
@Override
public TaskReport getReport() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public Counters getCounters() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public float getProgress() {
float result = 0.0F;
for (TaskAttempt attempt : attempts.values()) {
result = Math.max(result, attempt.getProgress());
}
return result;
}
@Override
public TaskType getType() {
return taskID.getTaskType();
}
@Override
public Map<TaskAttemptId, TaskAttempt> getAttempts() {
Map<TaskAttemptId, TaskAttempt> result
= new HashMap<TaskAttemptId, TaskAttempt>(attempts.size());
result.putAll(attempts);
return result;
}
@Override
public TaskAttempt getAttempt(TaskAttemptId attemptID) {
return attempts.get(attemptID);
}
@Override
public boolean isFinished() {
for (TaskAttempt attempt : attempts.values()) {
if (attempt.getState() == TaskAttemptState.SUCCEEDED) {
return true;
}
}
return false;
}
@Override
public boolean canCommit(TaskAttemptId taskAttemptID) {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public TaskState getState() {
throw new UnsupportedOperationException("Not supported yet.");
}
}
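// A minimal Job: it builds the requested numbers of map and reduce tasks
// and gives each of them one initial attempt.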
class MyJobImpl implements Job {
private final JobId jobID;
private final Map<TaskId, Task> allTasks = new HashMap<TaskId, Task>();
private final Map<TaskId, Task> mapTasks = new HashMap<TaskId, Task>();
private final Map<TaskId, Task> reduceTasks = new HashMap<TaskId, Task>();
MyJobImpl(JobId jobID, int numMaps, int numReduces) {
this.jobID = jobID;
for (int i = 0; i < numMaps; ++i) {
Task newTask = new MyTaskImpl(jobID, i, TaskType.MAP);
mapTasks.put(newTask.getID(), newTask);
allTasks.put(newTask.getID(), newTask);
}
for (int i = 0; i < numReduces; ++i) {
Task newTask = new MyTaskImpl(jobID, i, TaskType.REDUCE);
reduceTasks.put(newTask.getID(), newTask);
allTasks.put(newTask.getID(), newTask);
}
// give every task an attempt
for (Task task : allTasks.values()) {
MyTaskImpl myTaskImpl = (MyTaskImpl) task;
myTaskImpl.addAttempt();
}
}
@Override
public JobId getID() {
return jobID;
}
@Override
public JobState getState() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public JobReport getReport() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public Counters getCounters() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public Map<TaskId, Task> getTasks() {
return allTasks;
}
@Override
public Map<TaskId, Task> getTasks(TaskType taskType) {
return taskType == TaskType.MAP ? mapTasks : reduceTasks;
}
@Override
public Task getTask(TaskId taskID) {
return allTasks.get(taskID);
}
@Override
public List<String> getDiagnostics() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public int getCompletedMaps() {
return completedMaps.get();
}
@Override
public int getCompletedReduces() {
return completedReduces.get();
}
@Override
public TaskAttemptCompletionEvent[]
getTaskAttemptCompletionEvents(int fromEventId, int maxEvents) {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public String getName() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public int getTotalMaps() {
return mapTasks.size();
}
@Override
public int getTotalReduces() {
return reduceTasks.size();
}
@Override
public boolean isUber() {
return false;
}
@Override
public boolean checkAccess(UserGroupInformation callerUGI,
JobACL jobOperation) {
return true;
}
}
/*
 * We follow the pattern of the real XxxImpl classes.  We create a job and
 * initialize it with a full suite of tasks, each of which has one attempt in
 * the NEW state.  Attempts transition only from NEW to RUNNING to SUCCEEDED.
 */
class MyTaskAttemptImpl implements TaskAttempt {
private final TaskAttemptId myAttemptID;
long startMockTime = Long.MIN_VALUE;
long shuffleCompletedTime = Long.MAX_VALUE;
TaskAttemptState overridingState = TaskAttemptState.NEW;
MyTaskAttemptImpl(TaskId taskID, int index, Clock clock) {
myAttemptID = recordFactory.newRecordInstance(TaskAttemptId.class);
myAttemptID.setId(index);
myAttemptID.setTaskId(taskID);
}
void startUp() {
startMockTime = clock.getTime();
overridingState = null;
slotsInUse.addAndGet(taskTypeSlots(myAttemptID.getTaskId().getTaskType()));
System.out.println("TLTRE.MyTaskAttemptImpl.startUp starting " + getID());
SpeculatorEvent event = new SpeculatorEvent(getID().getTaskId(), -1);
dispatcher.getEventHandler().handle(event);
}
@Override
public TaskAttemptId getID() {
return myAttemptID;
}
@Override
public TaskAttemptReport getReport() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public List<String> getDiagnostics() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public Counters getCounters() {
throw new UnsupportedOperationException("Not supported yet.");
}
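// Nominal runtime in seconds for this attempt, keyed off the task index:
// 200s by default, 150s or 250s for some indexes, and 600s for the first
// attempt of every fortieth task -- the stragglers that speculation is
// meant to beat.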
private float getCodeRuntime() {
int taskIndex = myAttemptID.getTaskId().getId();
int attemptIndex = myAttemptID.getId();
float result = 200.0F;
switch (taskIndex % 4) {
case 0:
if (taskIndex % 40 == 0 && attemptIndex == 0) {
result = 600.0F;
break;
}
break;
case 2:
break;
case 1:
result = 150.0F;
break;
case 3:
result = 250.0F;
break;
}
return result;
}
private float getMapProgress() {
float runtime = getCodeRuntime();
return Math.min
((float) (clock.getTime() - startMockTime) / (runtime * 1000.0F), 1.0F);
}
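// Reduce progress: the first half tracks the fraction of maps that have
// finished (the shuffle); the second half advances with simulated time
// once all maps are done.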
private float getReduceProgress() {
Job job = myAppContext.getJob(myAttemptID.getTaskId().getJobId());
float runtime = getCodeRuntime();
Collection<Task> allMapTasks = job.getTasks(TaskType.MAP).values();
int numberMaps = allMapTasks.size();
int numberDoneMaps = 0;
for (Task mapTask : allMapTasks) {
if (mapTask.isFinished()) {
++numberDoneMaps;
}
}
if (numberMaps == numberDoneMaps) {
shuffleCompletedTime = Math.min(shuffleCompletedTime, clock.getTime());
return Math.min
((float) (clock.getTime() - shuffleCompletedTime)
/ (runtime * 2000.0F) + 0.5F,
1.0F);
} else {
return ((float) numberDoneMaps) / numberMaps * 0.5F;
}
}
// progress is derived from the mock clock and the synthetic runtime schedule
@Override
public float getProgress() {
if (overridingState == TaskAttemptState.NEW) {
return 0.0F;
}
return myAttemptID.getTaskId().getTaskType() == TaskType.MAP ? getMapProgress() : getReduceProgress();
}
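// Derives the attempt state from progress.  On the transition to SUCCEEDED
// it also releases slots, bumps the completed-task counters, records
// whether this attempt was a winning speculation, and kills the rival
// attempt if so.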
@Override
public TaskAttemptState getState() {
if (overridingState != null) {
return overridingState;
}
TaskAttemptState result
= getProgress() < 1.0F ? TaskAttemptState.RUNNING : TaskAttemptState.SUCCEEDED;
if (result == TaskAttemptState.SUCCEEDED) {
overridingState = TaskAttemptState.SUCCEEDED;
System.out.println("MyTaskAttemptImpl.getState() -- attempt " + myAttemptID + " finished.");
slotsInUse.addAndGet(- taskTypeSlots(myAttemptID.getTaskId().getTaskType()));
(myAttemptID.getTaskId().getTaskType() == TaskType.MAP
? completedMaps : completedReduces).getAndIncrement();
// check for a spectacularly successful speculation
TaskId taskID = myAttemptID.getTaskId();
Task task = myJob.getTask(taskID);
for (TaskAttempt otherAttempt : task.getAttempts().values()) {
if (otherAttempt != this
&& otherAttempt.getState() == TaskAttemptState.RUNNING) {
// we had two instances running. Try to determine how much
// we might have saved by speculation
if (getID().getId() > otherAttempt.getID().getId()) {
// the speculation won
successfulSpeculations.getAndIncrement();
float hisProgress = otherAttempt.getProgress();
long hisStartTime = ((MyTaskAttemptImpl)otherAttempt).startMockTime;
System.out.println("TLTRE:A speculation finished at time "
+ clock.getTime()
+ ". The stalled attempt is at " + (hisProgress * 100.0)
+ "% progress, and it started at "
+ hisStartTime + ", which is "
+ (clock.getTime() - hisStartTime) + " ago.");
long originalTaskEndEstimate
= (hisStartTime
+ estimator.estimatedRuntime(otherAttempt.getID()));
System.out.println(
"TLTRE: We would have expected the original attempt to take "
+ estimator.estimatedRuntime(otherAttempt.getID())
+ ", finishing at " + originalTaskEndEstimate);
long estimatedSavings = originalTaskEndEstimate - clock.getTime();
taskTimeSavedBySpeculation.addAndGet(estimatedSavings);
System.out.println("TLTRE: The task is " + task.getID());
slotsInUse.addAndGet(- taskTypeSlots(myAttemptID.getTaskId().getTaskType()));
((MyTaskAttemptImpl)otherAttempt).overridingState
= TaskAttemptState.KILLED;
} else {
System.out.println(
"TLTRE: The normal attempt beat the speculation in "
+ task.getID());
}
}
}
}
return result;
}
@Override
public boolean isFinished() {
return getProgress() == 1.0F;
}
@Override
public ContainerId getAssignedContainerID() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public String getNodeHttpAddress() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public long getLaunchTime() {
return startMockTime;
}
@Override
public long getFinishTime() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public String getAssignedContainerMgrAddress() {
throw new UnsupportedOperationException("Not supported yet.");
}
}
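// A settable clock so the test can advance simulated time deterministically.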
static class MockClock implements Clock {
private long currentTime = 0;
public long getTime() {
return currentTime;
}
void setMeasuredTime(long newTime) {
currentTime = newTime;
}
void advanceTime(long increment) {
currentTime += increment;
}
}
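// A minimal CompositeService shell around the clock; it is not exercised
// directly by these estimator tests.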
class MyAppMaster extends CompositeService {
final Clock clock;
public MyAppMaster(Clock clock) {
super(MyAppMaster.class.getName());
if (clock == null) {
clock = new SystemClock();
}
this.clock = clock;
LOG.info("Created MyAppMaster");
}
}
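// A minimal AppContext exposing exactly one hand-built job plus the shared
// dispatcher and mock clock.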
class MyAppContext implements AppContext {
// I'll be making Avro objects by hand. Please don't do that very often.
private final ApplicationAttemptId myAppAttemptID;
private final ApplicationId myApplicationID;
private final JobId myJobID;
private final Map<JobId, Job> allJobs;
MyAppContext(int numberMaps, int numberReduces) {
myApplicationID = recordFactory.newRecordInstance(ApplicationId.class);
myApplicationID.setClusterTimestamp(clock.getTime());
myApplicationID.setId(1);
myAppAttemptID = recordFactory
.newRecordInstance(ApplicationAttemptId.class);
myAppAttemptID.setApplicationId(myApplicationID);
myAppAttemptID.setAttemptId(0);
myJobID = recordFactory.newRecordInstance(JobId.class);
myJobID.setAppId(myApplicationID);
Job myJob
= new MyJobImpl(myJobID, numberMaps, numberReduces);
allJobs = Collections.singletonMap(myJobID, myJob);
}
@Override
public ApplicationAttemptId getApplicationAttemptId() {
return myAppAttemptID;
}
@Override
public ApplicationId getApplicationID() {
return myApplicationID;
}
@Override
public Job getJob(JobId jobID) {
return allJobs.get(jobID);
}
@Override
public Map<JobId, Job> getAllJobs() {
return allJobs;
}
@Override
public EventHandler getEventHandler() {
return dispatcher.getEventHandler();
}
@Override
public CharSequence getUser() {
throw new UnsupportedOperationException("Not supported yet.");
}
@Override
public Clock getClock() {
return clock;
}
@Override
public String getApplicationName() {
return null;
}
@Override
public long getStartTime() {
return 0;
}
}
}