blob: 144d28d4a5399969eb9037ddf0b106f31329a360 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.app.dag.impl;
import static org.apache.tez.dag.app.dag.impl.RootInputVertexManager.TEZ_ROOT_INPUT_VERTEX_MANAGER_ENABLE_SLOW_START;
import static org.apache.tez.dag.app.dag.impl.RootInputVertexManager.TEZ_ROOT_INPUT_VERTEX_MANAGER_MAX_SRC_FRACTION;
import static org.apache.tez.dag.app.dag.impl.RootInputVertexManager.TEZ_ROOT_INPUT_VERTEX_MANAGER_MIN_SRC_FRACTION;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyList;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.event.VertexState;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.TaskAttemptIdentifier;
import org.apache.tez.runtime.api.TaskIdentifier;
import org.apache.tez.runtime.api.VertexIdentifier;
import org.apache.tez.runtime.api.VertexStatistics;
import org.apache.tez.runtime.api.events.InputConfigureVertexTasksEvent;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestRootInputVertexManager {
List<TaskAttemptIdentifier> emptyCompletions = null;
@Test(timeout = 5000)
public void testEventsFromMultipleInputs() throws IOException {
VertexManagerPluginContext context = mock(VertexManagerPluginContext.class);
TezConfiguration conf = new TezConfiguration();
UserPayload vertexPayload = TezUtils.createUserPayloadFromConf(conf);
doReturn("vertex1").when(context).getVertexName();
doReturn(1).when(context).getVertexNumTasks(eq("vertex1"));
doReturn(vertexPayload).when(context).getUserPayload();
RootInputVertexManager rootInputVertexManager = new RootInputVertexManager(context);
rootInputVertexManager.initialize();
InputDescriptor id1 = mock(InputDescriptor.class);
List<Event> events1 = new LinkedList<Event>();
InputDataInformationEvent diEvent11 = InputDataInformationEvent.createWithSerializedPayload(0,
null);
events1.add(diEvent11);
rootInputVertexManager.onRootVertexInitialized("input1", id1, events1);
// All good so far, single input only.
InputDescriptor id2 = mock(InputDescriptor.class);
List<Event> events2 = new LinkedList<Event>();
InputDataInformationEvent diEvent21 = InputDataInformationEvent.createWithSerializedPayload(0,
null);
events2.add(diEvent21);
try {
// Should fail due to second input
rootInputVertexManager.onRootVertexInitialized("input2", id2, events2);
fail("Expecting failure in case of multiple inputs attempting to send events");
} catch (IllegalStateException e) {
assertTrue(e.getMessage().startsWith(
"RootInputVertexManager cannot configure multiple inputs. Use a custom VertexManager"));
}
}
@Test(timeout = 5000)
public void testConfigureFromMultipleInputs() throws IOException {
VertexManagerPluginContext context = mock(VertexManagerPluginContext.class);
TezConfiguration conf = new TezConfiguration();
UserPayload vertexPayload = TezUtils.createUserPayloadFromConf(conf);
doReturn("vertex1").when(context).getVertexName();
doReturn(-1).when(context).getVertexNumTasks(eq("vertex1"));
doReturn(vertexPayload).when(context).getUserPayload();
RootInputVertexManager rootInputVertexManager = new RootInputVertexManager(context);
rootInputVertexManager.initialize();
InputDescriptor id1 = mock(InputDescriptor.class);
List<Event> events1 = new LinkedList<Event>();
InputConfigureVertexTasksEvent diEvent11 = InputConfigureVertexTasksEvent.create(1, null,
null);
events1.add(diEvent11);
rootInputVertexManager.onRootVertexInitialized("input1", id1, events1);
// All good so far, single input only.
InputDescriptor id2 = mock(InputDescriptor.class);
List<Event> events2 = new LinkedList<Event>();
InputConfigureVertexTasksEvent diEvent21 = InputConfigureVertexTasksEvent.create(1, null,
null);
events2.add(diEvent21);
try {
// Should fail due to second input
rootInputVertexManager.onRootVertexInitialized("input2", id2, events2);
fail("Expecting failure in case of multiple inputs attempting to send events");
} catch (IllegalStateException e) {
assertTrue(e.getMessage().startsWith(
"RootInputVertexManager cannot configure multiple inputs. Use a custom VertexManager"));
}
}
@Test(timeout = 5000)
public void testRootInputVertexManagerSlowStart() {
Configuration conf = new Configuration();
RootInputVertexManager manager = null;
HashMap<String, EdgeProperty> mockInputVertices =
new HashMap<String, EdgeProperty>();
String mockSrcVertexId1 = "Vertex1";
EdgeProperty eProp1 = EdgeProperty.create(
EdgeProperty.DataMovementType.BROADCAST,
EdgeProperty.DataSourceType.PERSISTED,
EdgeProperty.SchedulingType.SEQUENTIAL,
OutputDescriptor.create("out"),
InputDescriptor.create("in"));
String mockSrcVertexId2 = "Vertex2";
EdgeProperty eProp2 = EdgeProperty.create(
EdgeProperty.DataMovementType.BROADCAST,
EdgeProperty.DataSourceType.PERSISTED,
EdgeProperty.SchedulingType.SEQUENTIAL,
OutputDescriptor.create("out"),
InputDescriptor.create("in"));
String mockManagedVertexId = "Vertex3";
VertexManagerPluginContext mockContext =
mock(VertexManagerPluginContext.class);
when(mockContext.getVertexStatistics(any()))
.thenReturn(mock(VertexStatistics.class));
when(mockContext.getInputVertexEdgeProperties())
.thenReturn(mockInputVertices);
when(mockContext.getVertexName()).thenReturn(mockManagedVertexId);
when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(3);
mockInputVertices.put(mockSrcVertexId1, eProp1);
mockInputVertices.put(mockSrcVertexId2, eProp2);
// check initialization
manager = createRootInputVertexManager(conf, mockContext, 0.1f, 0.1f);
final List<Integer> scheduledTasks = Lists.newLinkedList();
doAnswer(new ScheduledTasksAnswer(scheduledTasks)).when(
mockContext).scheduleTasks(anyList());
// source vertices have 0 tasks. immediate start of all managed tasks
when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(0);
when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(0);
manager.onVertexStarted(emptyCompletions);
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
Assert.assertEquals(manager.pendingTasks.size(), 0);
Assert.assertEquals(scheduledTasks.size(), 3); // all tasks scheduled
when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2);
when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2);
try {
// source vertex have some tasks. min < 0.
manager = createRootInputVertexManager(conf, mockContext, -0.1f, 0.0f);
Assert.assertTrue(false); // should not come here
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().contains(
"Invalid values for slowStartMinFraction"));
}
try {
// source vertex have some tasks. max > 1.
manager = createRootInputVertexManager(conf, mockContext, 0.0f, 95.0f);
Assert.assertTrue(false); // should not come here
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().contains(
"Invalid values for slowStartMinFraction"));
}
try {
// source vertex have some tasks. min > max
manager = createRootInputVertexManager(conf, mockContext, 0.5f, 0.3f);
Assert.assertTrue(false); // should not come here
} catch (IllegalArgumentException e) {
Assert.assertTrue(e.getMessage().contains(
"Invalid values for slowStartMinFraction"));
}
// source vertex have some tasks. min > default and max undefined
int numTasks = 20;
when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(numTasks);
when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(numTasks);
scheduledTasks.clear();
manager = createRootInputVertexManager(conf, mockContext, 0.975f, null);
manager.onVertexStarted(emptyCompletions);
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
Assert.assertEquals(3, manager.pendingTasks.size());
Assert.assertEquals(numTasks*2, manager.totalNumSourceTasks);
Assert.assertEquals(0, manager.numSourceTasksCompleted);
float completedTasksThreshold = 0.975f * numTasks;
// Finish all tasks before exceeding the threshold
for (String mockSrcVertex : new String[] { mockSrcVertexId1,
mockSrcVertexId2 }) {
for (int i = 0; i < mockContext.getVertexNumTasks(mockSrcVertex); ++i) {
// complete 0th tasks outside the loop
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(
mockSrcVertex, i+1));
if ((i + 2) >= completedTasksThreshold) {
// stop before completing more than min/max source tasks
break;
}
}
}
// Since we haven't exceeded the threshold, all tasks are still pending
Assert.assertEquals(manager.totalTasksToSchedule,
manager.pendingTasks.size());
Assert.assertEquals(0, scheduledTasks.size()); // no tasks scheduled
// Cross the threshold min/max threshold to schedule all tasks
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(
mockSrcVertexId1, 0));
Assert.assertEquals(3, manager.pendingTasks.size());
Assert.assertEquals(0, scheduledTasks.size());
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(
mockSrcVertexId2, 0));
Assert.assertEquals(0, manager.pendingTasks.size());
// all tasks scheduled
Assert.assertEquals(manager.totalTasksToSchedule, scheduledTasks.size());
// reset vertices for next test
when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(2);
when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(2);
// source vertex have some tasks. min, max, 0
manager = createRootInputVertexManager(conf, mockContext, 0.0f, 0.0f);
manager.onVertexStarted(emptyCompletions);
Assert.assertEquals(manager.totalTasksToSchedule, 3);
Assert.assertEquals(manager.numSourceTasksCompleted, 0);
// all source vertices need to be configured
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
Assert.assertEquals(manager.totalNumSourceTasks, 4);
Assert.assertEquals(manager.pendingTasks.size(), 0);
Assert.assertEquals(scheduledTasks.size(), 3); // all tasks scheduled
// min, max > 0 and min, max
manager = createRootInputVertexManager(conf, mockContext, 0.25f, 0.25f);
manager.onVertexStarted(emptyCompletions);
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
Assert.assertEquals(manager.pendingTasks.size(), 3); // no tasks scheduled
Assert.assertEquals(manager.totalNumSourceTasks, 4);
// task completion from non-bipartite stage does nothing
Assert.assertEquals(manager.pendingTasks.size(), 3); // no tasks scheduled
Assert.assertEquals(manager.totalNumSourceTasks, 4);
Assert.assertEquals(manager.numSourceTasksCompleted, 0);
// task completion on only 1 SG edge does nothing
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(
mockSrcVertexId2, 0));
Assert.assertEquals(manager.pendingTasks.size(), 3); // no tasks scheduled
Assert.assertEquals(manager.totalNumSourceTasks, 4);
Assert.assertEquals(manager.numSourceTasksCompleted, 1);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(
mockSrcVertexId1, 0));
Assert.assertEquals(manager.pendingTasks.size(), 0);
Assert.assertEquals(scheduledTasks.size(), 3); // all tasks scheduled
Assert.assertEquals(manager.numSourceTasksCompleted, 2);
// min, max > 0 and min, max, absolute max 1.0
manager = createRootInputVertexManager(conf, mockContext, 1.0f, 1.0f);
manager.onVertexStarted(emptyCompletions);
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
Assert.assertEquals(manager.pendingTasks.size(), 3); // no tasks scheduled
Assert.assertEquals(manager.totalNumSourceTasks, 4);
// task completion from non-bipartite stage does nothing
Assert.assertEquals(manager.pendingTasks.size(), 3); // no tasks scheduled
Assert.assertEquals(manager.totalNumSourceTasks, 4);
Assert.assertEquals(manager.numSourceTasksCompleted, 0);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(
mockSrcVertexId1, 0));
Assert.assertEquals(manager.pendingTasks.size(), 3);
Assert.assertEquals(manager.numSourceTasksCompleted, 1);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(
mockSrcVertexId1, 1));
Assert.assertEquals(manager.pendingTasks.size(), 3);
Assert.assertEquals(manager.numSourceTasksCompleted, 2);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(
mockSrcVertexId2, 0));
Assert.assertEquals(manager.pendingTasks.size(), 3);
Assert.assertEquals(manager.numSourceTasksCompleted, 3);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(
mockSrcVertexId2, 1));
Assert.assertEquals(manager.pendingTasks.size(), 0);
Assert.assertEquals(scheduledTasks.size(), 3); // all tasks scheduled
Assert.assertEquals(manager.numSourceTasksCompleted, 4);
// min, max > 0 and min, max
manager = createRootInputVertexManager(conf, mockContext, 1.0f, 1.0f);
manager.onVertexStarted(emptyCompletions);
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
Assert.assertEquals(manager.pendingTasks.size(), 3); // no tasks scheduled
Assert.assertEquals(manager.totalNumSourceTasks, 4);
// task completion from non-bipartite stage does nothing
Assert.assertEquals(manager.pendingTasks.size(), 3); // no tasks scheduled
Assert.assertEquals(manager.totalNumSourceTasks, 4);
Assert.assertEquals(manager.numSourceTasksCompleted, 0);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(
mockSrcVertexId1, 0));
Assert.assertEquals(manager.pendingTasks.size(), 3);
Assert.assertEquals(manager.numSourceTasksCompleted, 1);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(
mockSrcVertexId1, 1));
Assert.assertEquals(manager.pendingTasks.size(), 3);
Assert.assertEquals(manager.numSourceTasksCompleted, 2);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(
mockSrcVertexId2, 0));
Assert.assertEquals(manager.pendingTasks.size(), 3);
Assert.assertEquals(manager.numSourceTasksCompleted, 3);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(
mockSrcVertexId2, 1));
Assert.assertEquals(manager.pendingTasks.size(), 0);
Assert.assertEquals(scheduledTasks.size(), 3); // all tasks scheduled
Assert.assertEquals(manager.numSourceTasksCompleted, 4);
// reset vertices for next test
when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(4);
when(mockContext.getVertexNumTasks(mockSrcVertexId2)).thenReturn(4);
// min, max > and min < max
manager = createRootInputVertexManager(conf, mockContext, 0.25f, 0.75f);
manager.onVertexStarted(emptyCompletions);
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
Assert.assertEquals(manager.pendingTasks.size(), 3); // no tasks scheduled
Assert.assertEquals(manager.totalNumSourceTasks, 8);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(
mockSrcVertexId1, 0));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(
mockSrcVertexId2, 1));
Assert.assertEquals(manager.pendingTasks.size(), 3);
Assert.assertEquals(manager.numSourceTasksCompleted, 2);
// completion of same task again should not get counted
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(
mockSrcVertexId2, 1));
Assert.assertEquals(manager.pendingTasks.size(), 3);
Assert.assertEquals(manager.numSourceTasksCompleted, 2);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(
mockSrcVertexId1, 1));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(
mockSrcVertexId2, 0));
Assert.assertEquals(manager.pendingTasks.size(), 1);
Assert.assertEquals(scheduledTasks.size(), 2); // 2 task scheduled
Assert.assertEquals(manager.numSourceTasksCompleted, 4);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(
mockSrcVertexId1, 2));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(
mockSrcVertexId2, 2));
Assert.assertEquals(manager.pendingTasks.size(), 0);
Assert.assertEquals(scheduledTasks.size(), 1); // 1 tasks scheduled
Assert.assertEquals(manager.numSourceTasksCompleted, 6);
scheduledTasks.clear();
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(
mockSrcVertexId1, 3)); // we are done. no action
Assert.assertEquals(manager.pendingTasks.size(), 0);
Assert.assertEquals(scheduledTasks.size(), 0); // no task scheduled
Assert.assertEquals(manager.numSourceTasksCompleted, 7);
// min, max > and min < max
manager = createRootInputVertexManager(conf, mockContext, 0.25f, 1.0f);
manager.onVertexStarted(emptyCompletions);
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
Assert.assertEquals(manager.pendingTasks.size(), 3); // no tasks scheduled
Assert.assertEquals(manager.totalNumSourceTasks, 8);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(
mockSrcVertexId1, 0));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(
mockSrcVertexId2, 1));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(
mockSrcVertexId1, 1));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(
mockSrcVertexId2, 0));
Assert.assertEquals(manager.pendingTasks.size(), 2);
Assert.assertEquals(scheduledTasks.size(), 1); // 1 task scheduled
Assert.assertEquals(manager.numSourceTasksCompleted, 4);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(
mockSrcVertexId1, 2));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(
mockSrcVertexId2, 2));
Assert.assertEquals(manager.pendingTasks.size(), 1);
Assert.assertEquals(scheduledTasks.size(), 1); // 1 task scheduled
Assert.assertEquals(manager.numSourceTasksCompleted, 6);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(
mockSrcVertexId1, 3));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(
mockSrcVertexId2, 3));
Assert.assertEquals(manager.pendingTasks.size(), 0);
Assert.assertEquals(scheduledTasks.size(), 1); // no task scheduled
Assert.assertEquals(manager.numSourceTasksCompleted, 8);
// if there is single task to schedule, it should be schedule when src
// completed fraction is more than min slow start fraction
scheduledTasks.clear();
when(mockContext.getVertexNumTasks(mockManagedVertexId)).thenReturn(1);
manager = createRootInputVertexManager(conf, mockContext, 0.25f, 0.75f);
manager.onVertexStarted(emptyCompletions);
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId1,
VertexState.CONFIGURED));
manager.onVertexStateUpdated(new VertexStateUpdate(mockSrcVertexId2,
VertexState.CONFIGURED));
Assert.assertEquals(manager.pendingTasks.size(), 1); // no tasks scheduled
Assert.assertEquals(manager.totalNumSourceTasks, 8);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(
mockSrcVertexId1, 0));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(
mockSrcVertexId2, 1));
Assert.assertEquals(manager.pendingTasks.size(), 1);
Assert.assertEquals(scheduledTasks.size(), 0); // no task scheduled
Assert.assertEquals(manager.numSourceTasksCompleted, 2);
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(
mockSrcVertexId1, 1));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(
mockSrcVertexId2, 0));
Assert.assertEquals(manager.pendingTasks.size(), 0);
Assert.assertEquals(scheduledTasks.size(), 1); // 1 task scheduled
Assert.assertEquals(manager.numSourceTasksCompleted, 4);
scheduledTasks.clear();
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(
mockSrcVertexId1, 2));
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(
mockSrcVertexId2, 2));
Assert.assertEquals(manager.pendingTasks.size(), 0);
Assert.assertEquals(scheduledTasks.size(), 0); // no task scheduled
Assert.assertEquals(manager.numSourceTasksCompleted, 6);
scheduledTasks.clear();
manager.onSourceTaskCompleted(createTaskAttemptIdentifier(
mockSrcVertexId1, 3)); // we are done. no action
Assert.assertEquals(manager.pendingTasks.size(), 0);
Assert.assertEquals(scheduledTasks.size(), 0); // no task scheduled
Assert.assertEquals(manager.numSourceTasksCompleted, 7);
}
@Test
public void testTezDrainCompletionsOnVertexStart() throws IOException {
Configuration conf = new Configuration();
RootInputVertexManager manager = null;
HashMap<String, EdgeProperty> mockInputVertices =
new HashMap<String, EdgeProperty>();
String mockSrcVertexId1 = "Vertex1";
EdgeProperty eProp1 = EdgeProperty.create(
EdgeProperty.DataMovementType.BROADCAST,
EdgeProperty.DataSourceType.PERSISTED,
EdgeProperty.SchedulingType.SEQUENTIAL,
OutputDescriptor.create("out"),
InputDescriptor.create("in"));
VertexManagerPluginContext mockContext =
mock(VertexManagerPluginContext.class);
when(mockContext.getVertexStatistics(any()))
.thenReturn(mock(VertexStatistics.class));
when(mockContext.getInputVertexEdgeProperties())
.thenReturn(mockInputVertices);
when(mockContext.getVertexNumTasks(mockSrcVertexId1)).thenReturn(3);
mockInputVertices.put(mockSrcVertexId1, eProp1);
// check initialization
manager = createRootInputVertexManager(conf, mockContext, 0.1f, 0.1f);
Assert.assertEquals(0, manager.numSourceTasksCompleted);
manager.onVertexStarted(Collections.singletonList(
createTaskAttemptIdentifier(mockSrcVertexId1, 0)));
Assert.assertEquals(1, manager.numSourceTasksCompleted);
}
static RootInputVertexManager createRootInputVertexManager(
Configuration conf, VertexManagerPluginContext context, Float min,
Float max) {
if (min != null) {
conf.setFloat(
TEZ_ROOT_INPUT_VERTEX_MANAGER_MIN_SRC_FRACTION,
min);
} else {
conf.unset(
TEZ_ROOT_INPUT_VERTEX_MANAGER_MIN_SRC_FRACTION);
}
if (max != null) {
conf.setFloat(
TEZ_ROOT_INPUT_VERTEX_MANAGER_MAX_SRC_FRACTION,
max);
} else {
conf.unset(TEZ_ROOT_INPUT_VERTEX_MANAGER_MAX_SRC_FRACTION);
}
if(max != null || min != null) {
conf.setBoolean(TEZ_ROOT_INPUT_VERTEX_MANAGER_ENABLE_SLOW_START,
true);
}
UserPayload payload;
try {
payload = TezUtils.createUserPayloadFromConf(conf);
} catch (IOException e) {
throw new RuntimeException(e);
}
when(context.getUserPayload()).thenReturn(payload);
RootInputVertexManager manager = new RootInputVertexManager(context);
manager.initialize();
return manager;
}
protected static class ScheduledTasksAnswer implements Answer<Object> {
private List<Integer> scheduledTasks;
public ScheduledTasksAnswer(List<Integer> scheduledTasks) {
this.scheduledTasks = scheduledTasks;
}
@Override
public Object answer(InvocationOnMock invocation) throws IOException {
Object[] args = invocation.getArguments();
scheduledTasks.clear();
@SuppressWarnings("unchecked")
List<VertexManagerPluginContext.ScheduleTaskRequest> tasks =
(List<VertexManagerPluginContext.ScheduleTaskRequest>)args[0];
for (VertexManagerPluginContext.ScheduleTaskRequest task : tasks) {
scheduledTasks.add(task.getTaskIndex());
}
return null;
}
}
public static TaskAttemptIdentifier createTaskAttemptIdentifier(String vName,
int tId) {
VertexIdentifier mockVertex = mock(VertexIdentifier.class);
when(mockVertex.getName()).thenReturn(vName);
TaskIdentifier mockTask = mock(TaskIdentifier.class);
when(mockTask.getIdentifier()).thenReturn(tId);
when(mockTask.getVertexIdentifier()).thenReturn(mockVertex);
TaskAttemptIdentifier mockAttempt = mock(TaskAttemptIdentifier.class);
when(mockAttempt.getIdentifier()).thenReturn(0);
when(mockAttempt.getTaskIdentifier()).thenReturn(mockTask);
return mockAttempt;
}
}