blob: f79f359a7bf720d60ca23d986d8a763ef1d577d4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.app.dag.impl;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdate;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventSchedule;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
public class TestDAGSchedulerNaturalOrderControlled {
/**
 * Requests tasks for v0, v2, v3 and v4 such that a vertex is only requested after all tasks of
 * its source vertices have been requested, and expects every batch of requests to be handed to
 * the event handler immediately.
 */
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testSimpleFlow() {
  EventHandler handler = mock(EventHandler.class);
  DAG mockDag = createMockDag();
  DAGSchedulerNaturalOrderControlled scheduler =
      new DAGSchedulerNaturalOrderControlled(mockDag, handler);

  final int vertexCount = 5;
  Vertex[] v = new Vertex[vertexCount];
  for (int idx = 0; idx < vertexCount; idx++) {
    v[idx] = mockDag.getVertex("vertex" + idx);
  }

  // Size of the first, partial batch submitted for v2 and v3.
  final int partialBatch = 3;
  final int v0Tasks = v[0].getTotalTasks();
  final int v2Tasks = v[2].getTotalTasks();
  final int v3Tasks = v[3].getTotalTasks();
  final int v4Tasks = v[4].getTotalTasks();
  final TezVertexID v0Id = v[0].getVertexId();
  final TezVertexID v2Id = v[2].getVertexId();
  final TezVertexID v3Id = v[3].getVertexId();
  final TezVertexID v4Id = v[4].getVertexId();

  // Every task of v0.
  for (int task = 0; task < v0Tasks; task++) {
    scheduler.scheduleTaskEx(createScheduleRequest(v0Id, task, 0));
  }
  verify(handler, times(v0Tasks)).handle(any());
  reset(handler);

  // First 3 tasks of v2.
  for (int task = 0; task < partialBatch; task++) {
    scheduler.scheduleTaskEx(createScheduleRequest(v2Id, task, 0));
  }
  verify(handler, times(partialBatch)).handle(any());
  reset(handler);

  // First 3 tasks of v3.
  for (int task = 0; task < partialBatch; task++) {
    scheduler.scheduleTaskEx(createScheduleRequest(v3Id, task, 0));
  }
  verify(handler, times(partialBatch)).handle(any());
  reset(handler);

  // The rest of v2.
  for (int task = partialBatch; task < v2Tasks; task++) {
    scheduler.scheduleTaskEx(createScheduleRequest(v2Id, task, 0));
  }
  verify(handler, times(v2Tasks - partialBatch)).handle(any());
  reset(handler);

  // The rest of v3.
  for (int task = partialBatch; task < v3Tasks; task++) {
    scheduler.scheduleTaskEx(createScheduleRequest(v3Id, task, 0));
  }
  verify(handler, times(v3Tasks - partialBatch)).handle(any());
  reset(handler);

  // Every task of v4.
  for (int task = 0; task < v4Tasks; task++) {
    scheduler.scheduleTaskEx(createScheduleRequest(v4Id, task, 0));
  }
  verify(handler, times(v4Tasks)).handle(any());
  reset(handler);
}
/**
 * Simulates v2 starting late (as if configured with slow-start): v4 is requested before any v2
 * task has been requested. Expects the v4 requests to be withheld until all of v2 has been
 * requested, and to be emitted after the last v2 requests.
 */
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testSourceRequestDelayed() {
  // ShuffleVertexHandler - slowstart simulation
  EventHandler handler = mock(EventHandler.class);
  DAG mockDag = createMockDag();
  DAGSchedulerNaturalOrderControlled scheduler =
      new DAGSchedulerNaturalOrderControlled(mockDag, handler);

  final int vertexCount = 5;
  Vertex[] v = new Vertex[vertexCount];
  for (int idx = 0; idx < vertexCount; idx++) {
    v[idx] = mockDag.getVertex("vertex" + idx);
  }

  // Number of v2 tasks requested in the first, partial batch.
  final int partialBatch = 3;
  final int v0Tasks = v[0].getTotalTasks();
  final int v2Tasks = v[2].getTotalTasks();
  final int v3Tasks = v[3].getTotalTasks();
  final int v4Tasks = v[4].getTotalTasks();
  final TezVertexID v0Id = v[0].getVertexId();
  final TezVertexID v2Id = v[2].getVertexId();
  final TezVertexID v3Id = v[3].getVertexId();
  final TezVertexID v4Id = v[4].getVertexId();

  // Every task of v0.
  for (int task = 0; task < v0Tasks; task++) {
    scheduler.scheduleTaskEx(createScheduleRequest(v0Id, task, 0));
  }
  verify(handler, times(v0Tasks)).handle(any());
  reset(handler);

  // v2 stays silent for now, as if configured with slow-start.
  // Every task of v3.
  for (int task = 0; task < v3Tasks; task++) {
    scheduler.scheduleTaskEx(createScheduleRequest(v3Id, task, 0));
  }
  verify(handler, times(v3Tasks)).handle(any());
  reset(handler);

  // Every task of v4. Nothing may be forwarded yet.
  for (int task = 0; task < v4Tasks; task++) {
    scheduler.scheduleTaskEx(createScheduleRequest(v4Id, task, 0));
  }
  verify(handler, never()).handle(any());
  reset(handler);

  // v2 finally starts requesting: first 3 tasks.
  for (int task = 0; task < partialBatch; task++) {
    scheduler.scheduleTaskEx(createScheduleRequest(v2Id, task, 0));
  }
  verify(handler, times(partialBatch)).handle(any());
  reset(handler);

  // The rest of v2.
  final int remainingV2 = v2Tasks - partialBatch;
  for (int task = partialBatch; task < v2Tasks; task++) {
    scheduler.scheduleTaskEx(createScheduleRequest(v2Id, task, 0));
  }

  // The remaining v2 requests plus all of the withheld v4 requests should be sent out.
  ArgumentCaptor<Event> sent = ArgumentCaptor.forClass(Event.class);
  verify(handler, times(remainingV2 + v4Tasks)).handle(sent.capture());

  // Check ordering: the v2 events come first, followed by the v4 events.
  int seen = 0;
  for (Event captured : sent.getAllValues()) {
    TaskAttemptEventSchedule scheduleEvent = (TaskAttemptEventSchedule) captured;
    int expectedVertex = seen < remainingV2 ? 2 : 4;
    assertEquals(expectedVertex, scheduleEvent.getVertexID().getId());
    seen++;
  }
  reset(handler);
}
/**
 * Shrinks the parallelism of v2 from 10 to 3 while v4's requests are pending. Expects that
 * requesting the 3 remaining v2 tasks sends out both those requests and all pending v4 requests.
 */
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testParallelismUpdated() {
  EventHandler handler = mock(EventHandler.class);
  DAG mockDag = createMockDag();
  DAGSchedulerNaturalOrderControlled scheduler =
      new DAGSchedulerNaturalOrderControlled(mockDag, handler);

  final int vertexCount = 5;
  Vertex[] v = new Vertex[vertexCount];
  for (int idx = 0; idx < vertexCount; idx++) {
    v[idx] = mockDag.getVertex("vertex" + idx);
  }

  final int v0Tasks = v[0].getTotalTasks();
  final int v3Tasks = v[3].getTotalTasks();
  final int v4Tasks = v[4].getTotalTasks();
  final TezVertexID v0Id = v[0].getVertexId();
  final TezVertexID v2Id = v[2].getVertexId();
  final TezVertexID v3Id = v[3].getVertexId();
  final TezVertexID v4Id = v[4].getVertexId();

  // Every task of v0.
  for (int task = 0; task < v0Tasks; task++) {
    scheduler.scheduleTaskEx(createScheduleRequest(v0Id, task, 0));
  }
  verify(handler, times(v0Tasks)).handle(any());
  reset(handler);

  // v2 starts out with 10 tasks; its parallelism is changed further down.
  assertEquals(10, v[2].getTotalTasks());

  // Every task of v3.
  for (int task = 0; task < v3Tasks; task++) {
    scheduler.scheduleTaskEx(createScheduleRequest(v3Id, task, 0));
  }
  verify(handler, times(v3Tasks)).handle(any());
  reset(handler);

  // Every task of v4. Nothing is expected to be forwarded at this point.
  for (int task = 0; task < v4Tasks; task++) {
    scheduler.scheduleTaskEx(createScheduleRequest(v4Id, task, 0));
  }
  verify(handler, never()).handle(any());
  reset(handler);

  // Lower the parallelism of v2 and read the new task count back from the mock.
  updateParallelismOnMockVertex(v[2], 3);
  final int reducedV2Tasks = v[2].getTotalTasks();
  assertEquals(3, reducedV2Tasks);

  // Every task of the resized v2.
  for (int task = 0; task < reducedV2Tasks; task++) {
    scheduler.scheduleTaskEx(createScheduleRequest(v2Id, task, 0));
  }
  verify(handler, times(reducedV2Tasks + v4Tasks)).handle(any());
  reset(handler);
}
/**
 * Requests all but one v0 task, then all of v2, then second attempts for the already requested
 * v0 tasks. Expects the v2 requests to stay withheld until the last v0 task is requested, while
 * the repeated v0 attempts are forwarded on their own.
 */
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testMultipleRequestsForSameTask() {
  EventHandler handler = mock(EventHandler.class);
  DAG mockDag = createMockDag();
  DAGSchedulerNaturalOrderControlled scheduler =
      new DAGSchedulerNaturalOrderControlled(mockDag, handler);

  final int vertexCount = 5;
  Vertex[] v = new Vertex[vertexCount];
  for (int idx = 0; idx < vertexCount; idx++) {
    v[idx] = mockDag.getVertex("vertex" + idx);
  }

  final int v0Tasks = v[0].getTotalTasks();
  final int v2Tasks = v[2].getTotalTasks();
  final int lastV0Task = v0Tasks - 1;
  final TezVertexID v0Id = v[0].getVertexId();
  final TezVertexID v2Id = v[2].getVertexId();

  // All v0 tasks except the last one, attempt 0.
  for (int task = 0; task < lastV0Task; task++) {
    scheduler.scheduleTaskEx(createScheduleRequest(v0Id, task, 0));
  }
  verify(handler, times(lastV0Task)).handle(any());
  reset(handler);

  // Every task of v2.
  for (int task = 0; task < v2Tasks; task++) {
    scheduler.scheduleTaskEx(createScheduleRequest(v2Id, task, 0));
  }
  // Nothing should be scheduled
  verify(handler, never()).handle(any());
  reset(handler);

  // A second attempt (attempt 1) for the same v0 tasks as before.
  for (int task = 0; task < lastV0Task; task++) {
    scheduler.scheduleTaskEx(createScheduleRequest(v0Id, task, 1));
  }
  // Only the v0 requests should have gone out.
  verify(handler, times(lastV0Task)).handle(any());
  reset(handler);

  // The last v0 task, as attempt 1.
  scheduler.scheduleTaskEx(createScheduleRequest(v0Id, lastV0Task, 1));
  // That single v0 request and all of v2 should have gone out.
  verify(handler, times(1 + v2Tasks)).handle(any());
}
// Test parallelism updated from -1
// Reduce parallelism
// Different attempts scheduled for a single task.
/**
 * Builds a mocked DAG of five mocked vertices, retrievable by name ("vertex0".."vertex4") and
 * by vertex id.
 *
 * <pre>
 *   v0      v1
 *    |       |
 *   v2      v3
 *     \     /
 *      \   /
 *       v4
 * </pre>
 *
 * v0 - Root
 * v1 - Root with 0 tasks.
 * v2 - can simulate AutoReduce. Parallelism goes down. Slow schedule.
 * v3 - can simulate ImmediateStart
 * v4 - Simulate one shuffle input, one broadcast input.
 */
private DAG createMockDag() {
  // Arguments: name, vertex id, total tasks, distance from root.
  Vertex v0 = createMockVertex("vertex0", 0, 10, 0);
  Vertex v1 = createMockVertex("vertex1", 1, 0, 0);
  Vertex v2 = createMockVertex("vertex2", 2, 10, 1);
  Vertex v3 = createMockVertex("vertex3", 3, 10, 1);
  Vertex v4 = createMockVertex("vertex4", 4, 10, 2);

  DAG mockDag = mock(DAG.class);
  for (Vertex vertex : new Vertex[] {v0, v1, v2, v3, v4}) {
    // Read name and id from the vertex mock up front; invoking another mock in the middle of a
    // doReturn(..).when(..) call would interfere with Mockito's stubbing state.
    String vertexName = vertex.getName();
    TezVertexID id = vertex.getVertexId();
    doReturn(vertex).when(mockDag).getVertex(vertexName);
    doReturn(vertex).when(mockDag).getVertex(id);
  }

  // Wire sources (inputs) and destinations (outputs) of each vertex.
  updateMockVertexWithConnections(
      v0, createConnectionMap((Vertex[]) null), createConnectionMap(v2));
  updateMockVertexWithConnections(
      v1, createConnectionMap((Vertex[]) null), createConnectionMap(v3));
  updateMockVertexWithConnections(v2, createConnectionMap(v0), createConnectionMap(v4));
  updateMockVertexWithConnections(v3, createConnectionMap(v1), createConnectionMap(v4));
  updateMockVertexWithConnections(
      v4, createConnectionMap(v2, v3), createConnectionMap((Vertex[]) null));
  return mockDag;
}
/**
 * Re-stubs the mocked vertex so that {@code getTotalTasks()} reports a new task count.
 *
 * @param vertex the mocked vertex to update
 * @param newParallelism the task count the mock should return from now on
 */
private void updateParallelismOnMockVertex(Vertex vertex, int newParallelism) {
  final Integer updatedTaskCount = newParallelism;
  doReturn(updatedTaskCount).when(vertex).getTotalTasks();
}
/**
 * Creates a mocked vertex with stubbed name, vertex id, task count, distance from root and log
 * identifier. Input and output connections are not stubbed here.
 *
 * @param name value returned by {@code getName()}
 * @param vertexIdInt numeric id used to build the {@link TezVertexID}
 * @param totalTasks value returned by {@code getTotalTasks()}
 * @param distanceFromRoot value returned by {@code getDistanceFromRoot()}
 * @return the stubbed vertex mock
 */
private Vertex createMockVertex(String name, int vertexIdInt, int totalTasks,
    int distanceFromRoot) {
  // All mock vertices share the same application and DAG id.
  TezVertexID id = TezVertexID.getInstance(
      TezDAGID.getInstance(ApplicationId.newInstance(1000, 1), 1), vertexIdInt);
  String logIdentifier = id + " [" + name + "]";

  Vertex mockVertex = mock(Vertex.class);
  doReturn(id).when(mockVertex).getVertexId();
  doReturn(name).when(mockVertex).getName();
  doReturn(logIdentifier).when(mockVertex).getLogIdentifier();
  doReturn(totalTasks).when(mockVertex).getTotalTasks();
  doReturn(distanceFromRoot).when(mockVertex).getDistanceFromRoot();
  return mockVertex;
}
/**
 * Builds a connection map that associates each given vertex with a freshly mocked edge.
 *
 * @param vertices the connected vertices; a {@code null} array yields an empty map
 * @return a new mutable map from vertex to mocked edge
 */
private Map<Vertex, Edge> createConnectionMap(Vertex... vertices) {
  Map<Vertex, Edge> connections = new HashMap<Vertex, Edge>();
  if (vertices == null) {
    return connections;
  }
  for (int i = 0; i < vertices.length; i++) {
    connections.put(vertices[i], mock(Edge.class));
  }
  return connections;
}
/**
 * Stubs the input and output vertex maps of a mocked vertex.
 *
 * @param mockVertex the mocked vertex to stub
 * @param sources map returned by {@code getInputVertices()}
 * @param destinations map returned by {@code getOutputVertices()}
 */
private void updateMockVertexWithConnections(Vertex mockVertex, Map<Vertex, Edge> sources,
    Map<Vertex, Edge> destinations) {
  doReturn(destinations).when(mockVertex).getOutputVertices();
  doReturn(sources).when(mockVertex).getInputVertices();
}
/**
 * Creates a mocked task attempt whose attempt id and vertex id are stubbed.
 *
 * @param vertexId vertex the attempt belongs to
 * @param taskIdInt numeric task id within the vertex
 * @param attemptIdInt numeric attempt id within the task
 * @return the stubbed task attempt mock
 */
private TaskAttempt createTaskAttempt(TezVertexID vertexId, int taskIdInt, int attemptIdInt) {
  TezTaskAttemptID attemptId =
      TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, taskIdInt), attemptIdInt);
  TaskAttempt mockAttempt = mock(TaskAttempt.class);
  doReturn(vertexId).when(mockAttempt).getVertexID();
  doReturn(attemptId).when(mockAttempt).getTaskAttemptID();
  return mockAttempt;
}
/**
 * Wraps a mocked task attempt in a {@code TA_SCHEDULE} scheduler update event.
 *
 * @param vertexId vertex the attempt belongs to
 * @param taskIdInt numeric task id within the vertex
 * @param attemptIdInt numeric attempt id within the task
 * @return the scheduler update event carrying the mocked attempt
 */
private DAGEventSchedulerUpdate createScheduleRequest(TezVertexID vertexId, int taskIdInt,
    int attemptIdInt) {
  return new DAGEventSchedulerUpdate(
      DAGEventSchedulerUpdate.UpdateType.TA_SCHEDULE,
      createTaskAttempt(vertexId, taskIdInt, attemptIdInt));
}
}