blob: 30361b32f23a2854b43edff6c5983011c72133de [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import junit.framework.Assert;
import org.apache.hadoop.mapred.TaskStatus.State;
import org.apache.hadoop.mapred.TaskStatus.Phase;
import org.apache.hadoop.mapreduce.TaskAttemptID;
/**
* An EventQueue that checks events against a list of expected events upon
* enqueueing. Also contains routines for creating expected HeartbeatEvents and
* all expected events related to running map or reduce tasks on a task tracker.
*/
class CheckedEventQueue extends SimulatorEventQueue {
/**
* expected list of events to be returned from all EventListener.accept()
* called at time t, t is the key if no events are generated an empty list
* needs to be put there
*
* IMPORTANT: this is NOT the events to be delivered at time t from the event
* queue, it is the list events to be inserted into the event queue at time t
*/
private SortedMap<Long, List<SimulatorEvent>> expectedEvents =
new TreeMap<Long, List<SimulatorEvent>>();
// current simulation time
private long now;
private long simulationStartTime;
/**
* We need the simulation start time so that we know the time of the first
* add().
*
* @param simulationStartTime
* Simulation start time.
*/
public CheckedEventQueue(long simulationStartTime) {
now = simulationStartTime;
this.simulationStartTime = simulationStartTime;
}
void check(SimulatorEvent event) {
for (Iterator<Map.Entry<Long, List<SimulatorEvent>>> it = expectedEvents.entrySet()
.iterator(); it.hasNext();) {
Map.Entry<Long, List<SimulatorEvent>> entry = it.next();
long insertTime = entry.getKey();
Assert.assertTrue(insertTime <= now);
if (insertTime < now) {
List<SimulatorEvent> events = entry.getValue();
if (!events.isEmpty()) {
Assert.fail("There are " + events.size() + " events at time "
+ insertTime + " before " + now + ". First event: "+events.get(0));
}
it.remove();
} else { // insertTime == now
break;
}
}
List<SimulatorEvent> expected = expectedEvents.get(now);
boolean found = false;
for (SimulatorEvent ee : expected) {
if (isSameEvent(ee, event)) {
expected.remove(ee);
found = true;
break;
}
}
Assert.assertTrue("Unexpected event to enqueue, now=" + now + ", event=" +
event + ", expecting=" + expected, found);
}
/**
* We intercept the main routine of the real EventQueue and check the new
* event returned by accept() against the expectedEvents table
*/
@Override
public boolean add(SimulatorEvent event) {
check(event);
return super.add(event);
}
@Override
public boolean addAll(Collection<? extends SimulatorEvent> events) {
for (SimulatorEvent event : events) {
check(event);
}
return super.addAll(events);
}
// We need to override get() to track the current simulation time
@Override
public SimulatorEvent get() {
SimulatorEvent ret = super.get();
if (ret != null) {
now = ret.getTimeStamp();
}
return ret;
}
/**
* Auxiliary function for populating the expectedEvents table If event is null
* then just marks that an accept happens at time 'when', and the list of new
* events is empty
*/
public void addExpected(long when, SimulatorEvent event) {
Assert.assertNotNull(event);
List<SimulatorEvent> expected = expectedEvents.get(when);
if (expected == null) {
expected = new ArrayList<SimulatorEvent>();
expectedEvents.put(when, expected);
}
expected.add(event);
}
public long getLastCheckTime() {
return expectedEvents.lastKey();
}
// there should be an empty expected event list left for the last
// time to check
public void checkMissingExpected() {
Assert.assertTrue(expectedEvents.size() <= 1);
for (List<SimulatorEvent> events : expectedEvents.values()) {
Assert.assertTrue(events.isEmpty());
}
}
// fills in the expected events corresponding to the execution of a map task
public void expectMapTask(SimulatorTaskTracker taskTracker,
TaskAttemptID taskId,
long mapStart, long mapRuntime) {
long mapDone = mapStart + mapRuntime;
org.apache.hadoop.mapred.TaskAttemptID taskIdOldApi =
org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId);
MapTaskStatus status = new MapTaskStatus(taskIdOldApi, 1.0f, 1,
State.SUCCEEDED, null, null, null, Phase.MAP, null);
status.setStartTime(mapStart);
status.setFinishTime(mapDone);
TaskAttemptCompletionEvent completionEvent =
new TaskAttemptCompletionEvent(taskTracker, status);
addExpected(mapStart, completionEvent);
}
// fills in the expected events corresponding to the execution of a reduce
// task
public void expectReduceTask(SimulatorTaskTracker taskTracker,
TaskAttemptID taskId, long mapDone,
long reduceRuntime) {
long reduceDone = mapDone + reduceRuntime;
org.apache.hadoop.mapred.TaskAttemptID taskIdOldApi =
org.apache.hadoop.mapred.TaskAttemptID.downgrade(taskId);
ReduceTaskStatus status = new ReduceTaskStatus(taskIdOldApi, 1.0f, 1,
State.SUCCEEDED, null, null, null, Phase.REDUCE, null);
status.setStartTime(mapDone);
status.setFinishTime(reduceDone);
TaskAttemptCompletionEvent completionEvent =
new TaskAttemptCompletionEvent(taskTracker, status);
addExpected(mapDone, completionEvent);
}
/**
* Fills in the events corresponding to the self heartbeats numAccepts is the
* number of times accept() will be called, it must be >= 1
*/
public void expectHeartbeats(SimulatorTaskTracker taskTracker,
int numAccepts, int heartbeatInterval) {
// initial heartbeat
addExpected(simulationStartTime,
new HeartbeatEvent(taskTracker, simulationStartTime));
long simulationTime = simulationStartTime;
for(int i=0; i<numAccepts; i++) {
long heartbeatTime = simulationTime + heartbeatInterval;
HeartbeatEvent he = new HeartbeatEvent(taskTracker, heartbeatTime);
addExpected(simulationTime, he);
simulationTime = heartbeatTime;
}
}
/**
* Returns true iff two events are the same. We did not use equals() because
* we may want to test for partial equality only, and we don't want to bother
* writing new hashCode()s either.
*/
protected boolean isSameEvent(SimulatorEvent event, SimulatorEvent otherEvent) {
// check for null reference
Assert.assertNotNull(event);
Assert.assertNotNull(otherEvent);
// type check
if (!event.getClass().equals(otherEvent.getClass())) {
return false;
}
// compare significant fields
if (event.listener != otherEvent.listener ||
event.timestamp != otherEvent.timestamp) {
return false;
}
if (event instanceof TaskAttemptCompletionEvent) {
TaskStatus s = ((TaskAttemptCompletionEvent)event).getStatus();
TaskStatus os = ((TaskAttemptCompletionEvent)otherEvent).getStatus();
if (!s.getTaskID().equals(os.getTaskID())) {
return false;
}
}
return true;
}
}