blob: 8fa574c696921d4c441e5c0cafa72e02ab7ee702 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.app.dag.impl;
import static org.junit.Assert.*;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataMovementType;
import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataSourceType;
import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeSchedulingType;
import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType;
import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.TaskAttemptListener;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.app.dag.VertexTerminationCause;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventRecoverTask;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEvent;
import org.apache.tez.dag.app.dag.event.VertexEventManagerUserCodeError;
import org.apache.tez.dag.app.dag.event.VertexEventRecoverVertex;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.dag.impl.AMUserCodeException.Source;
import org.apache.tez.dag.app.dag.impl.TestVertexImpl.CountingOutputCommitter;
import org.apache.tez.dag.history.events.VertexRecoverableEventsGeneratedEvent;
import org.apache.tez.dag.history.events.VertexFinishedEvent;
import org.apache.tez.dag.history.events.VertexInitializedEvent;
import org.apache.tez.dag.history.events.VertexStartedEvent;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import com.google.common.collect.Lists;
/**
 * Tests how {@link VertexImpl} recovers its state from history events.
 *
 * <p>Every test uses the three-vertex DAG built by {@link #createDAGPlan()}. A test
 * first replays some subset of {@link VertexInitializedEvent},
 * {@link VertexStartedEvent}, {@link VertexFinishedEvent} and
 * {@link VertexRecoverableEventsGeneratedEvent} into a vertex through
 * {@code restoreFromEvent}. It then sends a {@link VertexEventRecoverVertex} carrying
 * the desired state. Finally it checks the resulting vertex state, task counters,
 * {@code T_RECOVER} task events, output committers and the effect on the downstream
 * vertex.
 */
public class TestVertexRecovery {

  private static final Log LOG = LogFactory.getLog(TestVertexRecovery.class);

  private DrainDispatcher dispatcher;

  private AppContext mockAppContext;
  private final ApplicationId appId = ApplicationId.newInstance(
      System.currentTimeMillis(), 1);
  private DAGImpl dag;
  private final TezDAGID dagId = TezDAGID.getInstance(appId, 1);
  private final String user = "user";

  // Timestamps written into every replayed VertexInitializedEvent.
  private final long initRequestedTime = 100L;
  private final long initedTime = initRequestedTime + 100L;

  /**
   * Builds the plan of the DAG shared by all tests:
   *
   * <pre>
   *   v1    v2
   *     \  /
   *      v3
   * </pre>
   *
   * <ul>
   *   <li>vertex1 has 1 task and vertex3 has 2 tasks. Each has a leaf output whose
   *       committer is a {@link CountingOutputCommitter}.</li>
   *   <li>vertex2 has 2 tasks and no outputs.</li>
   *   <li>Both edges (e1: vertex1 -&gt; vertex3, e2: vertex2 -&gt; vertex3) are
   *       SCATTER_GATHER / PERSISTED / SEQUENTIAL.</li>
   * </ul>
   */
  private DAGPlan createDAGPlan() {
    return DAGPlan.newBuilder()
        .setName("testverteximpl")
        .addVertex(
            VertexPlan.newBuilder()
                .setName("vertex1")
                .setType(PlanVertexType.NORMAL)
                .addTaskLocationHint(
                    PlanTaskLocationHint.newBuilder().addHost("host1")
                        .addRack("rack1").build())
                .setTaskConfig(
                    PlanTaskConfiguration.newBuilder().setNumTasks(1)
                        .setVirtualCores(4).setMemoryMb(1024)
                        .setJavaOpts("").setTaskModule("x1.y1").build())
                .addOutEdgeId("e1")
                .addOutputs(
                    DAGProtos.RootInputLeafOutputProto.newBuilder()
                        .setIODescriptor(
                            TezEntityDescriptorProto.newBuilder()
                                .setClassName("output").build())
                        .setName("outputx")
                        .setControllerDescriptor(
                            TezEntityDescriptorProto.newBuilder()
                                .setClassName(CountingOutputCommitter.class.getName())))
                .build())
        .addVertex(
            VertexPlan.newBuilder()
                .setName("vertex2")
                .setType(PlanVertexType.NORMAL)
                .addTaskLocationHint(
                    PlanTaskLocationHint.newBuilder().addHost("host2")
                        .addRack("rack2").build())
                .setTaskConfig(
                    PlanTaskConfiguration.newBuilder().setNumTasks(2)
                        .setVirtualCores(4).setMemoryMb(1024)
                        .setJavaOpts("").setTaskModule("x2.y2").build())
                .addOutEdgeId("e2")
                .build())
        .addVertex(
            VertexPlan.newBuilder()
                .setName("vertex3")
                .setType(PlanVertexType.NORMAL)
                .setProcessorDescriptor(
                    TezEntityDescriptorProto.newBuilder().setClassName("x3.y3"))
                .addTaskLocationHint(
                    PlanTaskLocationHint.newBuilder().addHost("host3")
                        .addRack("rack3").build())
                .setTaskConfig(
                    PlanTaskConfiguration.newBuilder().setNumTasks(2)
                        .setVirtualCores(4).setMemoryMb(1024)
                        .setJavaOpts("foo").setTaskModule("x3.y3").build())
                .addInEdgeId("e1")
                .addInEdgeId("e2")
                .addOutputs(
                    DAGProtos.RootInputLeafOutputProto.newBuilder()
                        .setIODescriptor(
                            TezEntityDescriptorProto.newBuilder()
                                .setClassName("output").build())
                        .setName("outputx")
                        .setControllerDescriptor(
                            TezEntityDescriptorProto.newBuilder()
                                .setClassName(CountingOutputCommitter.class.getName())))
                .build())
        .addEdge(
            EdgePlan.newBuilder()
                .setEdgeDestination(
                    TezEntityDescriptorProto.newBuilder().setClassName("i3_v1"))
                .setInputVertexName("vertex1")
                .setEdgeSource(
                    TezEntityDescriptorProto.newBuilder().setClassName("o1"))
                .setOutputVertexName("vertex3")
                .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
                .setId("e1")
                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
                .build())
        .addEdge(
            EdgePlan.newBuilder()
                .setEdgeDestination(
                    TezEntityDescriptorProto.newBuilder().setClassName("i3_v2"))
                .setInputVertexName("vertex2")
                .setEdgeSource(
                    TezEntityDescriptorProto.newBuilder().setClassName("o2"))
                .setOutputVertexName("vertex3")
                .setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
                .setId("e2")
                .setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
                .setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
                .build())
        .build();
  }

  /**
   * Records every vertex event it receives, then hands the event directly to the
   * target {@link VertexImpl} looked up in {@link #dag}.
   */
  class VertexEventHanlder implements EventHandler<VertexEvent> {

    private final List<VertexEvent> events = new ArrayList<VertexEvent>();

    @Override
    public void handle(VertexEvent event) {
      events.add(event);
      ((VertexImpl) dag.getVertex(event.getVertexId())).handle(event);
    }

    /** Returns the live list of vertex events received so far, in arrival order. */
    public List<VertexEvent> getEvents() {
      return this.events;
    }
  }

  /**
   * Records every task event it receives, then hands the event directly to the
   * target {@link TaskImpl} looked up in {@link #dag}.
   */
  class TaskEventHandler implements EventHandler<TaskEvent> {

    private final List<TaskEvent> events = new ArrayList<TaskEvent>();

    @Override
    public void handle(TaskEvent event) {
      events.add(event);
      ((TaskImpl) dag.getVertex(event.getTaskID().getVertexID()).getTask(
          event.getTaskID())).handle(event);
    }

    /** Returns the live list of task events received so far, in arrival order. */
    public List<TaskEvent> getEvents() {
      return events;
    }
  }

  /**
   * Discards task-attempt events. The assertions in these tests only cover vertex
   * and task level state, so attempts are never driven forward.
   */
  class TaskAttemptEventHandler implements EventHandler<TaskAttemptEvent> {
    @Override
    public void handle(TaskAttemptEvent event) {
      // Intentionally ignored; see the class comment.
    }
  }

  private VertexEventHanlder vertexEventHandler;
  private TaskEventHandler taskEventHandler;

  /**
   * Sets up the test fixture:
   * <ul>
   *   <li>starts a {@link DrainDispatcher} with handlers for DAG (mocked), vertex,
   *       task and task-attempt events;</li>
   *   <li>builds {@link #dag} from {@link #createDAGPlan()};</li>
   *   <li>sends it {@code DAG_INIT}.</li>
   * </ul>
   */
  @Before
  public void setUp() throws IOException {
    dispatcher = new DrainDispatcher();
    dispatcher.register(DAGEventType.class, mock(EventHandler.class));
    vertexEventHandler = new VertexEventHanlder();
    dispatcher.register(VertexEventType.class, vertexEventHandler);
    taskEventHandler = new TaskEventHandler();
    dispatcher.register(TaskEventType.class, taskEventHandler);
    dispatcher.register(TaskAttemptEventType.class,
        new TaskAttemptEventHandler());
    dispatcher.init(new Configuration());
    dispatcher.start();

    mockAppContext = mock(AppContext.class, RETURNS_DEEP_STUBS);
    DAGPlan dagPlan = createDAGPlan();
    dag = new DAGImpl(dagId, new Configuration(), dagPlan,
        dispatcher.getEventHandler(), mock(TaskAttemptListener.class),
        new Credentials(), new SystemClock(), user,
        mock(TaskHeartbeatHandler.class), mockAppContext);
    when(mockAppContext.getCurrentDAG()).thenReturn(dag);
    dag.handle(new DAGEvent(dagId, DAGEventType.DAG_INIT));
    LOG.info("finish setUp");
  }

  /**
   * Stops the dispatcher started in {@link #setUp()}. Without this, every test
   * leaves a started dispatcher behind.
   */
  @After
  public void tearDown() {
    if (dispatcher != null) {
      dispatcher.stop();
    }
  }

  /**
   * vertex1(New) -> StartRecoveryTransition(SUCCEEDED)
   */
  @Test
  public void testRecovery_Desired_SUCCEEDED() {
    VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
    VertexState recoveredState = vertex1.restoreFromEvent(new VertexInitializedEvent(vertex1.getVertexId(),
        "vertex1", initRequestedTime, initedTime, vertex1.getTotalTasks(), "", null));
    assertEquals(VertexState.INITED, recoveredState);

    vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
        VertexState.SUCCEEDED));
    dispatcher.await();
    assertEquals(VertexState.SUCCEEDED, vertex1.getState());
    assertEquals(vertex1.numTasks, vertex1.succeededTaskCount);
    assertEquals(vertex1.numTasks, vertex1.completedTaskCount);
    // recover its task
    assertTaskRecoveredEventSent(vertex1);

    // vertex3 is still NEW: when the desired state is a completed state, each
    // vertex recovers by itself and does not depend on its parent.
    VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
    assertEquals(VertexState.NEW, vertex3.getState());
    // no VertexEvent is passed to the downstream vertex
    assertEquals(0, vertexEventHandler.getEvents().size());
  }

  /**
   * vertex1(New) -> StartRecoveryTransition(FAILED)
   */
  @Test
  public void testRecovery_Desired_FAILED() {
    VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
    VertexState recoveredState = vertex1.restoreFromEvent(new VertexInitializedEvent(vertex1.getVertexId(),
        "vertex1", initRequestedTime, initedTime, vertex1.getTotalTasks(), "", null));
    assertEquals(VertexState.INITED, recoveredState);

    vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
        VertexState.FAILED));
    dispatcher.await();
    assertEquals(VertexState.FAILED, vertex1.getState());
    assertEquals(vertex1.numTasks, vertex1.failedTaskCount);
    assertEquals(0, vertex1.completedTaskCount);
    // recover its task
    assertTaskRecoveredEventSent(vertex1);

    // vertex3 is still NEW: when the desired state is a completed state, each
    // vertex recovers by itself and does not depend on its parent.
    VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
    assertEquals(VertexState.NEW, vertex3.getState());
    // no VertexEvent is passed to the downstream vertex
    assertEquals(0, vertexEventHandler.getEvents().size());
  }

  /**
   * vertex1(New) -> StartRecoveryTransition(KILLED)
   */
  @Test
  public void testRecovery_Desired_KILLED() {
    VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
    VertexState recoveredState = vertex1.restoreFromEvent(new VertexInitializedEvent(vertex1.getVertexId(),
        "vertex1", initRequestedTime, initedTime, vertex1.getTotalTasks(), "", null));
    assertEquals(VertexState.INITED, recoveredState);

    vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
        VertexState.KILLED));
    dispatcher.await();
    assertEquals(VertexState.KILLED, vertex1.getState());
    assertEquals(vertex1.numTasks, vertex1.killedTaskCount);
    assertEquals(0, vertex1.completedTaskCount);
    // recover its task
    assertTaskRecoveredEventSent(vertex1);

    // vertex3 is still NEW: when the desired state is a completed state, each
    // vertex recovers by itself and does not depend on its parent.
    VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
    assertEquals(VertexState.NEW, vertex3.getState());
    // no VertexEvent is passed to the downstream vertex
    assertEquals(0, vertexEventHandler.getEvents().size());
  }

  /**
   * vertex1(New) -> StartRecoveryTransition(ERROR)
   */
  @Test
  public void testRecovery_Desired_ERROR() {
    VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
    VertexState recoveredState = vertex1.restoreFromEvent(new VertexInitializedEvent(vertex1.getVertexId(),
        "vertex1", initRequestedTime, initedTime, vertex1.getTotalTasks(), "", null));
    assertEquals(VertexState.INITED, recoveredState);

    vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
        VertexState.ERROR));
    dispatcher.await();
    assertEquals(VertexState.ERROR, vertex1.getState());
    assertEquals(vertex1.numTasks, vertex1.failedTaskCount);
    assertEquals(0, vertex1.completedTaskCount);
    // recover its task
    assertTaskRecoveredEventSent(vertex1);

    // vertex3 is still NEW: when the desired state is a completed state, each
    // vertex recovers by itself and does not depend on its parent.
    VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
    assertEquals(VertexState.NEW, vertex3.getState());
    // no VertexEvent is passed to the downstream vertex
    assertEquals(0, vertexEventHandler.getEvents().size());
  }

  /**
   * Creates an {@link InputDataInformationEvent} with an empty payload, wrapped in a
   * {@link TezEvent} whose metadata names "vertex1" as an INPUT producer. Tests
   * replay it through a {@link VertexRecoverableEventsGeneratedEvent}.
   */
  private TezEvent createTezEvent() {
    return new TezEvent(InputDataInformationEvent.createWithSerializedPayload(0, ByteBuffer.allocate(0)),
        new EventMetaData(EventProducerConsumerType.INPUT, "vertex1", null,
            null));
  }

  /**
   * vertex1(New) -> restoreFromDataMovementEvent -> StartRecoveryTransition
   */
  @Test
  public void testRecovery_New_Desired_RUNNING() {
    VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
    VertexState recoveredState =
        vertex1.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(
            vertex1.getVertexId(), Lists.newArrayList(createTezEvent())));
    assertEquals(VertexState.NEW, recoveredState);
    assertEquals(1, vertex1.recoveredEvents.size());

    vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
        VertexState.RUNNING));
    dispatcher.await();

    // InputDataInformationEvent is removed
    assertEquals(0, vertex1.recoveredEvents.size());
    // V_INIT and V_START are sent
    assertEquals(VertexState.RUNNING, vertex1.getState());
    // verify OutputCommitter is initialized
    assertOutputCommitters(vertex1);

    VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
    // wait for recovery of vertex2
    assertEquals(VertexState.RECOVERING, vertex3.getState());
    assertEquals(1, vertex3.numRecoveredSourceVertices);
    assertEquals(1, vertex3.numInitedSourceVertices);
    assertEquals(1, vertex3.numStartedSourceVertices);
    assertEquals(1, vertex3.getDistanceFromRoot());
  }

  /**
   * Asserts that one {@code T_RECOVER} task event was dispatched for every task of
   * {@code vertex}. Each such event is cast to {@link TaskEventRecoverTask}.
   */
  private void assertTaskRecoveredEventSent(VertexImpl vertex) {
    int sentNum = 0;
    for (TaskEvent event : taskEventHandler.getEvents()) {
      if (event.getType() == TaskEventType.T_RECOVER) {
        TaskEventRecoverTask recoverEvent = (TaskEventRecoverTask) event;
        if (recoverEvent.getTaskID().getVertexID().equals(vertex.getVertexId())) {
          sentNum++;
        }
      }
    }
    assertEquals("expect " + vertex.getTotalTasks()
        + " TaskEventRecoverTask sent for vertex:" + vertex.getVertexId()
        + " but actually sent " + sentNum, vertex.getTotalTasks(), sentNum);
  }

  /**
   * Asserts that {@code vertex} has output committers, and that each one (a
   * {@link CountingOutputCommitter}) was initialized and set up exactly once and
   * neither committed nor aborted.
   */
  private void assertOutputCommitters(VertexImpl vertex) {
    assertNotNull(vertex.getOutputCommitters());
    for (OutputCommitter c : vertex.getOutputCommitters().values()) {
      CountingOutputCommitter committer = (CountingOutputCommitter) c;
      assertEquals(0, committer.abortCounter);
      assertEquals(0, committer.commitCounter);
      assertEquals(1, committer.initCounter);
      assertEquals(1, committer.setupCounter);
    }
  }

  /**
   * Replays a {@link VertexInitializedEvent} into {@code vertex}, using the vertex's
   * own name and task count and the shared {@link #initRequestedTime} /
   * {@link #initedTime} timestamps. It then asserts that the vertex:
   * <ul>
   *   <li>reports INITED;</li>
   *   <li>has created all of its tasks;</li>
   *   <li>has restored both timestamps.</li>
   * </ul>
   */
  private void restoreFromInitializedEvent(VertexImpl vertex) {
    VertexState recoveredState =
        vertex.restoreFromEvent(new VertexInitializedEvent(vertex.getVertexId(),
            vertex.getName(), initRequestedTime, initedTime,
            vertex.getTotalTasks(), "", null));
    assertEquals(VertexState.INITED, recoveredState);
    assertEquals(vertex.getTotalTasks(), vertex.getTasks().size());
    assertEquals(initRequestedTime, vertex.initTimeRequested);
    assertEquals(initedTime, vertex.initedTime);
  }

  /**
   * restoreFromVertexInitializedEvent -> StartRecoveryTransition
   */
  @Test
  public void testRecovery_Inited_Desired_RUNNING() {
    VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
    restoreFromInitializedEvent(vertex1);

    VertexState recoveredState =
        vertex1.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(
            vertex1.getVertexId(), Lists.newArrayList(createTezEvent())));
    assertEquals(VertexState.INITED, recoveredState);

    vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
        VertexState.RUNNING));
    dispatcher.await();

    // InputDataInformationEvent is removed
    assertEquals(0, vertex1.recoveredEvents.size());
    assertEquals(VertexState.RUNNING, vertex1.getState());
    // task recovered event is sent
    assertTaskRecoveredEventSent(vertex1);
    // verify OutputCommitter is initialized
    assertOutputCommitters(vertex1);

    VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
    // wait for recovery of vertex2
    assertEquals(VertexState.RECOVERING, vertex3.getState());
    assertEquals(1, vertex3.numRecoveredSourceVertices);
    assertEquals(1, vertex3.numInitedSourceVertices);
    assertEquals(1, vertex3.numStartedSourceVertices);
    assertEquals(1, vertex3.getDistanceFromRoot());
  }

  /**
   * restoreFromVertexInitializedEvent -> restoreFromVertexStartedEvent ->
   * StartRecoveryTransition
   */
  @Test
  public void testRecovery_Started_Desired_RUNNING() {
    VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
    restoreFromInitializedEvent(vertex1);

    long startTimeRequested = initedTime + 100L;
    long startedTime = startTimeRequested + 100L;
    VertexState recoveredState =
        vertex1.restoreFromEvent(new VertexStartedEvent(vertex1.getVertexId(),
            startTimeRequested, startedTime));
    assertEquals(VertexState.RUNNING, recoveredState);
    assertEquals(startTimeRequested, vertex1.startTimeRequested);
    assertEquals(startedTime, vertex1.startedTime);

    recoveredState =
        vertex1.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(
            vertex1.getVertexId(), Lists.newArrayList(createTezEvent())));
    assertEquals(VertexState.RUNNING, recoveredState);
    assertEquals(1, vertex1.recoveredEvents.size());

    vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
        VertexState.RUNNING));
    dispatcher.await();

    // InputDataInformationEvent is removed
    assertEquals(0, vertex1.recoveredEvents.size());
    assertEquals(VertexState.RUNNING, vertex1.getState());
    // task recovered event is sent
    assertTaskRecoveredEventSent(vertex1);
    // verify OutputCommitter is initialized
    assertOutputCommitters(vertex1);

    VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
    // wait for recovery of vertex2
    assertEquals(VertexState.RECOVERING, vertex3.getState());
    assertEquals(1, vertex3.numRecoveredSourceVertices);
    assertEquals(1, vertex3.numInitedSourceVertices);
    assertEquals(1, vertex3.numStartedSourceVertices);
    assertEquals(1, vertex3.getDistanceFromRoot());
  }

  /**
   * restoreFromVertexInitializedEvent -> restoreFromVertexStartedEvent ->
   * restoreFromVertexFinishedEvent -> StartRecoveryTransition
   */
  @Test
  public void testRecovery_Finished_Desired_RUNNING() {
    // v1: restore from VertexInitializedEvent
    VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
    restoreFromInitializedEvent(vertex1);

    // v1: restore from VertexStartedEvent
    long startRequestedTime = initedTime + 100L;
    long startTime = startRequestedTime + 100L;
    VertexState recoveredState =
        vertex1.restoreFromEvent(new VertexStartedEvent(vertex1.getVertexId(),
            startRequestedTime, startTime));
    assertEquals(VertexState.RUNNING, recoveredState);

    // v1: restore from VertexFinishedEvent
    long finishTime = startTime + 100L;
    recoveredState =
        vertex1.restoreFromEvent(new VertexFinishedEvent(vertex1.getVertexId(),
            "vertex1", initRequestedTime, initedTime, startRequestedTime,
            startTime, finishTime, VertexState.SUCCEEDED, "",
            new TezCounters(), new VertexStats(), null));
    assertEquals(finishTime, vertex1.finishTime);
    assertEquals(VertexState.SUCCEEDED, recoveredState);
    assertFalse(vertex1.recoveryCommitInProgress);

    vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
        VertexState.RUNNING));
    dispatcher.await();

    // no recovered events remain (none were replayed into vertex1 here)
    assertEquals(0, vertex1.recoveredEvents.size());
    assertEquals(VertexState.RUNNING, vertex1.getState());
    // task recovered event is sent
    assertTaskRecoveredEventSent(vertex1);
    // verify OutputCommitter is initialized
    assertOutputCommitters(vertex1);

    VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
    // wait for recovery of vertex2
    assertEquals(VertexState.RECOVERING, vertex3.getState());
    assertEquals(1, vertex3.numRecoveredSourceVertices);
    assertEquals(1, vertex3.numInitedSourceVertices);
    assertEquals(1, vertex3.numStartedSourceVertices);
    assertEquals(1, vertex3.getDistanceFromRoot());
  }

  /**
   * vertex1 (New) -> StartRecoveryTransition<br>
   * vertex2 (New) -> StartRecoveryTransition<br>
   * vertex3 (New) -> RecoverTransition
   */
  @Test
  public void testRecovery_RecoveringFromNew() {
    VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
    vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
        VertexState.RUNNING));
    dispatcher.await();
    assertEquals(VertexState.RUNNING, vertex1.getState());
    assertEquals(1, vertex1.getTasks().size());
    // verify OutputCommitter is initialized
    assertOutputCommitters(vertex1);

    VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
    VertexState recoveredState =
        vertex3.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(
            vertex3.getVertexId(), Lists.newArrayList(createTezEvent())));
    assertEquals(VertexState.NEW, recoveredState);
    assertEquals(1, vertex3.recoveredEvents.size());
    // wait for recovery of vertex2
    assertEquals(VertexState.RECOVERING, vertex3.getState());
    assertEquals(1, vertex3.numRecoveredSourceVertices);
    assertEquals(1, vertex3.numInitedSourceVertices);
    assertEquals(1, vertex3.numStartedSourceVertices);
    assertEquals(1, vertex3.getDistanceFromRoot());

    VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2");
    vertex2.handle(new VertexEventRecoverVertex(vertex2.getVertexId(),
        VertexState.RUNNING));
    dispatcher.await();
    assertEquals(VertexState.RUNNING, vertex2.getState());
    // no OutputCommitter for vertex2
    assertNull(vertex2.getOutputCommitters());

    // v3 goes to RUNNING because v1 and v2 have both started
    assertEquals(VertexState.RUNNING, vertex3.getState());
    assertEquals(2, vertex3.numRecoveredSourceVertices);
    assertEquals(2, vertex3.numInitedSourceVertices);
    assertEquals(2, vertex3.numStartedSourceVertices);
    assertEquals(1, vertex3.getDistanceFromRoot());
    // RootInputDataInformation is removed
    assertEquals(0, vertex3.recoveredEvents.size());
    // verify OutputCommitter is initialized
    assertOutputCommitters(vertex3);
  }

  /**
   * A VertexManager user-code error is delivered to vertex3 while it is RECOVERING
   * (waiting for vertex2). After vertex2 is recovered, vertex3 must end up FAILED
   * with {@link VertexTerminationCause#AM_USERCODE_FAILURE}.
   */
  @Test
  public void testRecovery_VertexManagerErrorOnRecovery() {
    VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
    restoreFromInitializedEvent(vertex1);
    vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
        VertexState.RUNNING));
    dispatcher.await();
    assertEquals(VertexState.RUNNING, vertex1.getState());
    assertEquals(vertex1.getTotalTasks(), vertex1.getTasks().size());
    // verify OutputCommitter is initialized
    assertOutputCommitters(vertex1);

    VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
    VertexState recoveredState =
        vertex3.restoreFromEvent(new VertexInitializedEvent(vertex3
            .getVertexId(), "vertex3", initRequestedTime, initedTime, 0, "",
            null));
    assertEquals(VertexState.INITED, recoveredState);
    // wait for recovery of vertex2
    assertEquals(VertexState.RECOVERING, vertex3.getState());

    vertex3.handle(new VertexEventManagerUserCodeError(vertex3.getVertexId(),
        new AMUserCodeException(Source.VertexManager, new TezUncheckedException("test"))));
    assertEquals(1, vertex3.numRecoveredSourceVertices);
    assertEquals(1, vertex3.numInitedSourceVertices);
    assertEquals(1, vertex3.numStartedSourceVertices);
    assertEquals(1, vertex3.getDistanceFromRoot());

    VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2");
    restoreFromInitializedEvent(vertex2);
    vertex2.handle(new VertexEventRecoverVertex(vertex2.getVertexId(),
        VertexState.RUNNING));
    dispatcher.await();
    assertEquals(VertexState.RUNNING, vertex2.getState());

    // v3 FAILED due to user code error
    assertEquals(VertexState.FAILED, vertex3.getState());
    Assert.assertEquals(VertexTerminationCause.AM_USERCODE_FAILURE, vertex3.getTerminationCause());
    assertEquals(2, vertex3.numRecoveredSourceVertices);
  }

  /**
   * vertex1 (New) -> restoreFromInitialized -> StartRecoveryTransition<br>
   * vertex2 (New) -> restoreFromInitialized -> StartRecoveryTransition<br>
   * vertex3 (New) -> restoreFromVertexInitedEvent -> RecoverTransition<br>
   */
  @Test
  public void testRecovery_RecoveringFromInited() {
    VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
    restoreFromInitializedEvent(vertex1);
    vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
        VertexState.RUNNING));
    dispatcher.await();
    assertEquals(VertexState.RUNNING, vertex1.getState());
    assertEquals(vertex1.getTotalTasks(), vertex1.getTasks().size());
    // verify OutputCommitter is initialized
    assertOutputCommitters(vertex1);

    VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
    VertexState recoveredState =
        vertex3.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(
            vertex3.getVertexId(), Lists.newArrayList(createTezEvent())));
    assertEquals(VertexState.NEW, recoveredState);
    assertEquals(1, vertex3.recoveredEvents.size());
    recoveredState =
        vertex3.restoreFromEvent(new VertexInitializedEvent(vertex3
            .getVertexId(), "vertex3", initRequestedTime, initedTime, 2, "",
            null));
    assertEquals(VertexState.INITED, recoveredState);
    // wait for recovery of vertex2
    assertEquals(VertexState.RECOVERING, vertex3.getState());
    assertEquals(1, vertex3.numRecoveredSourceVertices);
    assertEquals(1, vertex3.numInitedSourceVertices);
    assertEquals(1, vertex3.numStartedSourceVertices);
    assertEquals(1, vertex3.getDistanceFromRoot());

    VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2");
    restoreFromInitializedEvent(vertex2);
    vertex2.handle(new VertexEventRecoverVertex(vertex2.getVertexId(),
        VertexState.RUNNING));
    dispatcher.await();
    assertEquals(VertexState.RUNNING, vertex2.getState());

    // v3 goes to RUNNING because v1 and v2 have both started
    assertEquals(VertexState.RUNNING, vertex3.getState());
    assertEquals(2, vertex3.numRecoveredSourceVertices);
    // numInitedSourceVertices is not meaningful here, but that doesn't matter
    // because v3 was already initialized
    assertEquals(2, vertex3.numInitedSourceVertices);
    assertEquals(2, vertex3.numStartedSourceVertices);
    assertEquals(1, vertex3.getDistanceFromRoot());
    // RootInputDataInformation is removed
    assertEquals(0, vertex3.recoveredEvents.size());
    // verify OutputCommitter is initialized
    assertOutputCommitters(vertex3);
    // expected T_RECOVER counts: 1 for vertex1, 2 for vertex2, 2 for vertex3
    assertTaskRecoveredEventSent(vertex1);
    assertTaskRecoveredEventSent(vertex2);
    assertTaskRecoveredEventSent(vertex3);
  }

  /**
   * vertex1 (New) -> restoreFromInitialized -> restoreFromVertexStarted ->
   * StartRecoveryTransition <br>
   * vertex2 (New) -> restoreFromInitialized -> restoreFromVertexStarted -> StartRecoveryTransition <br>
   * vertex3 (New) -> restoreFromInitialized -> restoreFromVertexStarted -> RecoverTransition
   */
  @Test
  public void testRecovery_RecoveringFromRunning() {
    VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
    restoreFromInitializedEvent(vertex1);
    VertexState recoveredState = vertex1.restoreFromEvent(new VertexStartedEvent(vertex1.getVertexId(),
        initRequestedTime + 100L, initRequestedTime + 200L));
    assertEquals(VertexState.RUNNING, recoveredState);

    vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
        VertexState.RUNNING));
    dispatcher.await();
    assertEquals(VertexState.RUNNING, vertex1.getState());
    assertEquals(1, vertex1.getTasks().size());
    // verify OutputCommitter is initialized
    assertOutputCommitters(vertex1);

    VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
    recoveredState =
        vertex3.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(
            vertex3.getVertexId(), Lists.newArrayList(createTezEvent())));
    assertEquals(VertexState.NEW, recoveredState);
    assertEquals(1, vertex3.recoveredEvents.size());
    recoveredState =
        vertex3.restoreFromEvent(new VertexInitializedEvent(vertex3
            .getVertexId(), "vertex3", initRequestedTime, initedTime, vertex3.getTotalTasks(), "",
            null));
    assertEquals(VertexState.INITED, recoveredState);
    recoveredState =
        vertex3.restoreFromEvent(new VertexStartedEvent(vertex3.getVertexId(),
            initRequestedTime + 100L, initRequestedTime + 200L));
    assertEquals(VertexState.RUNNING, recoveredState);
    // wait for recovery of vertex2
    assertEquals(VertexState.RECOVERING, vertex3.getState());
    assertEquals(1, vertex3.numRecoveredSourceVertices);
    assertEquals(1, vertex3.numInitedSourceVertices);
    assertEquals(1, vertex3.numStartedSourceVertices);
    assertEquals(1, vertex3.getDistanceFromRoot());

    VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2");
    recoveredState = vertex2.restoreFromEvent(new VertexInitializedEvent(vertex2.getVertexId(),
        "vertex2", initRequestedTime, initedTime, vertex2.getTotalTasks(), "", null));
    assertEquals(VertexState.INITED, recoveredState);
    recoveredState = vertex2.restoreFromEvent(new VertexStartedEvent(vertex2.getVertexId(),
        initRequestedTime + 100L, initRequestedTime + 200L));
    assertEquals(VertexState.RUNNING, recoveredState);
    vertex2.handle(new VertexEventRecoverVertex(vertex2.getVertexId(),
        VertexState.RUNNING));
    dispatcher.await();
    assertEquals(VertexState.RUNNING, vertex2.getState());

    // v3 goes to RUNNING because v1 and v2 have both started
    assertEquals(VertexState.RUNNING, vertex3.getState());
    assertEquals(2, vertex3.numRecoveredSourceVertices);
    assertEquals(2, vertex3.numInitedSourceVertices);
    assertEquals(2, vertex3.numStartedSourceVertices);
    assertEquals(1, vertex3.getDistanceFromRoot());
    // RootInputDataInformation is removed
    assertEquals(0, vertex3.recoveredEvents.size());
    // verify OutputCommitter is initialized
    assertOutputCommitters(vertex3);

    assertTaskRecoveredEventSent(vertex1);
    assertTaskRecoveredEventSent(vertex2);
    assertTaskRecoveredEventSent(vertex3);
  }

  /**
   * vertex1 (New) -> restoreFromInitialized -> restoreFromVertexStarted ->
   * restoreFromVertexFinished -> StartRecoveryTransition<br>
   * vertex2 (New) -> restoreFromInitialized -> restoreFromVertexStarted ->
   * StartRecoveryTransition<br>
   * vertex3 (New) -> restoreFromInitialized -> restoreFromVertexStarted -> RecoverTransition
   */
  @Test
  public void testRecovery_RecoveringFromSUCCEEDED() {
    VertexImpl vertex1 = (VertexImpl) dag.getVertex("vertex1");
    restoreFromInitializedEvent(vertex1);
    VertexState recoveredState = vertex1.restoreFromEvent(new VertexStartedEvent(vertex1.getVertexId(),
        initRequestedTime + 100L, initRequestedTime + 200L));
    assertEquals(VertexState.RUNNING, recoveredState);

    recoveredState = vertex1.restoreFromEvent(new VertexFinishedEvent(vertex1.getVertexId(),
        "vertex1", initRequestedTime, initedTime, initRequestedTime + 300L,
        initRequestedTime + 400L, initRequestedTime + 500L,
        VertexState.SUCCEEDED, "", new TezCounters(), new VertexStats(), null));
    assertEquals(VertexState.SUCCEEDED, recoveredState);

    vertex1.handle(new VertexEventRecoverVertex(vertex1.getVertexId(),
        VertexState.RUNNING));
    dispatcher.await();
    assertEquals(VertexState.RUNNING, vertex1.getState());
    assertEquals(1, vertex1.getTasks().size());
    // verify OutputCommitter is initialized
    assertOutputCommitters(vertex1);

    VertexImpl vertex3 = (VertexImpl) dag.getVertex("vertex3");
    recoveredState =
        vertex3.restoreFromEvent(new VertexRecoverableEventsGeneratedEvent(
            vertex3.getVertexId(), Lists.newArrayList(createTezEvent())));
    assertEquals(VertexState.NEW, recoveredState);
    assertEquals(1, vertex3.recoveredEvents.size());
    restoreFromInitializedEvent(vertex3);
    recoveredState =
        vertex3.restoreFromEvent(new VertexStartedEvent(vertex3.getVertexId(),
            initRequestedTime + 100L, initRequestedTime + 200L));
    assertEquals(VertexState.RUNNING, recoveredState);
    // wait for recovery of vertex2
    assertEquals(VertexState.RECOVERING, vertex3.getState());
    assertEquals(1, vertex3.numRecoveredSourceVertices);
    assertEquals(1, vertex3.numInitedSourceVertices);
    assertEquals(1, vertex3.numStartedSourceVertices);
    assertEquals(1, vertex3.getDistanceFromRoot());

    VertexImpl vertex2 = (VertexImpl) dag.getVertex("vertex2");
    recoveredState = vertex2.restoreFromEvent(new VertexInitializedEvent(vertex2.getVertexId(),
        "vertex2", initRequestedTime, initedTime, vertex2.getTotalTasks(), "", null));
    assertEquals(VertexState.INITED, recoveredState);
    recoveredState = vertex2.restoreFromEvent(new VertexStartedEvent(vertex2.getVertexId(),
        initRequestedTime + 100L, initRequestedTime + 200L));
    assertEquals(VertexState.RUNNING, recoveredState);
    vertex2.handle(new VertexEventRecoverVertex(vertex2.getVertexId(),
        VertexState.RUNNING));
    dispatcher.await();
    assertEquals(VertexState.RUNNING, vertex2.getState());

    // v3 goes to RUNNING because v1 and v2 have both started
    assertEquals(VertexState.RUNNING, vertex3.getState());
    assertEquals(2, vertex3.numRecoveredSourceVertices);
    assertEquals(2, vertex3.numInitedSourceVertices);
    assertEquals(2, vertex3.numStartedSourceVertices);
    assertEquals(1, vertex3.getDistanceFromRoot());
    // RootInputDataInformation is removed
    assertEquals(0, vertex3.recoveredEvents.size());
    // verify OutputCommitter is initialized
    assertOutputCommitters(vertex3);

    assertTaskRecoveredEventSent(vertex1);
    assertTaskRecoveredEventSent(vertex2);
    assertTaskRecoveredEventSent(vertex3);
  }
}