blob: 9a3292e8b83691251071b76b267ced74b258cbd5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.app.dag.impl;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyInt;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang.StringUtils;
import org.apache.tez.common.DrainDispatcher;
import org.apache.tez.common.counters.Limits;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.app.dag.event.DAGEventTerminateDag;
import org.apache.tez.hadoop.shim.DefaultHadoopShim;
import org.apache.tez.hadoop.shim.HadoopShim;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.apache.tez.common.MockDNSToSwitchMapping;
import org.apache.tez.dag.api.DataSinkDescriptor;
import org.apache.tez.dag.api.EdgeManagerPlugin;
import org.apache.tez.dag.api.EdgeManagerPluginContext;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeManagerPluginOnDemand;
import org.apache.tez.dag.api.GroupInputEdge;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputCommitterDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.DAGStatusBuilder;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.api.oldrecords.TaskState;
import org.apache.tez.dag.api.records.DAGProtos;
import org.apache.tez.dag.api.records.DAGProtos.ConfigurationProto;
import org.apache.tez.dag.api.records.DAGProtos.DAGPlan;
import org.apache.tez.dag.api.records.DAGProtos.EdgePlan;
import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataMovementType;
import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeDataSourceType;
import org.apache.tez.dag.api.records.DAGProtos.PlanEdgeSchedulingType;
import org.apache.tez.dag.api.records.DAGProtos.PlanKeyValuePair;
import org.apache.tez.dag.api.records.DAGProtos.PlanTaskConfiguration;
import org.apache.tez.dag.api.records.DAGProtos.PlanTaskLocationHint;
import org.apache.tez.dag.api.records.DAGProtos.PlanVertexType;
import org.apache.tez.dag.api.records.DAGProtos.RootInputLeafOutputProto;
import org.apache.tez.dag.api.records.DAGProtos.TezEntityDescriptorProto;
import org.apache.tez.dag.api.records.DAGProtos.VertexPlan;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.ClusterInfo;
import org.apache.tez.dag.app.TaskCommunicatorManagerInterface;
import org.apache.tez.dag.app.TaskHeartbeatHandler;
import org.apache.tez.dag.app.dag.DAGScheduler;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.DAGTerminationCause;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.TestStateChangeNotifier.StateChangeNotifierForTest;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.VertexState;
import org.apache.tez.dag.app.dag.VertexTerminationCause;
import org.apache.tez.dag.app.dag.event.CallableEvent;
import org.apache.tez.dag.app.dag.event.CallableEventType;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventDAGFinished;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.dag.event.DAGEvent;
import org.apache.tez.dag.app.dag.event.DAGEventStartDag;
import org.apache.tez.dag.app.dag.event.DAGEventCommitCompleted;
import org.apache.tez.dag.app.dag.event.DAGEventType;
import org.apache.tez.dag.app.dag.event.DAGEventVertexCompleted;
import org.apache.tez.dag.app.dag.event.DAGEventVertexReRunning;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.dag.app.dag.event.TaskEvent;
import org.apache.tez.dag.app.dag.event.TaskEventType;
import org.apache.tez.dag.app.dag.event.VertexEvent;
import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.app.dag.event.VertexEventTaskCompleted;
import org.apache.tez.dag.app.dag.event.VertexEventTaskReschedule;
import org.apache.tez.dag.app.dag.event.VertexEventType;
import org.apache.tez.dag.app.dag.impl.TestVertexImpl.CountingOutputCommitter;
import org.apache.tez.dag.app.rm.AMSchedulerEvent;
import org.apache.tez.dag.app.rm.AMSchedulerEventType;
import org.apache.tez.common.security.ACLManager;
import org.apache.tez.dag.history.HistoryEventHandler;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.OutputCommitterContext;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputFailedEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.state.StateMachineTez;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.protobuf.ByteString;
public class TestDAGImpl {
private static final Logger LOG = LoggerFactory.getLogger(TestDAGImpl.class);
private DAGPlan dagPlan;
private TezDAGID dagId;
private static Configuration conf;
private DrainDispatcher dispatcher;
private ListeningExecutorService execService;
private Credentials fsTokens;
private AppContext appContext;
private ACLManager aclManager;
private ApplicationAttemptId appAttemptId;
private DAGImpl dag;
private TaskEventDispatcher taskEventDispatcher;
private VertexEventDispatcher vertexEventDispatcher;
private DagEventDispatcher dagEventDispatcher;
private TaskCommunicatorManagerInterface taskCommunicatorManagerInterface;
private TaskHeartbeatHandler thh;
private Clock clock = new SystemClock();
private DAGFinishEventHandler dagFinishEventHandler;
private AppContext mrrAppContext;
private DAGPlan mrrDagPlan;
private DAGImpl mrrDag;
private TezDAGID mrrDagId;
private AppContext groupAppContext;
private DAGPlan groupDagPlan;
private DAGImpl groupDag;
private TezDAGID groupDagId;
private DAGPlan dagPlanWithCustomEdge;
private DAGImpl dagWithCustomEdge;
private TezDAGID dagWithCustomEdgeId;
private AppContext dagWithCustomEdgeAppContext;
private HistoryEventHandler historyEventHandler;
private TaskAttemptEventDispatcher taskAttemptEventDispatcher;
private ClusterInfo clusterInfo = new ClusterInfo(Resource.newInstance(8192,10));
private HadoopShim defaultShim = new DefaultHadoopShim();
static {
Limits.reset();
Configuration conf = new Configuration(false);
conf.setInt(TezConfiguration.TEZ_COUNTERS_MAX, 100);
conf.setInt(TezConfiguration.TEZ_COUNTERS_MAX_GROUPS, 100);
Limits.setConfiguration(conf);
}
private DAGImpl chooseDAG(TezDAGID curDAGId) {
if (curDAGId.equals(dagId)) {
return dag;
} else if (curDAGId.equals(mrrDagId)) {
return mrrDag;
} else if (curDAGId.equals(groupDagId)) {
return groupDag;
} else if (curDAGId.equals(dagWithCustomEdgeId)) {
return dagWithCustomEdge;
} else {
throw new RuntimeException("Invalid event, unknown dag"
+ ", dagId=" + curDAGId);
}
}
private class DagEventDispatcher implements EventHandler<DAGEvent> {
@Override
public void handle(DAGEvent event) {
DAGImpl dag = chooseDAG(event.getDAGID());
dag.handle(event);
}
}
private class TaskEventDispatcher implements EventHandler<TaskEvent> {
@SuppressWarnings("unchecked")
@Override
public void handle(TaskEvent event) {
TezDAGID id = event.getDAGID();
DAGImpl handler = chooseDAG(id);
Vertex vertex = handler.getVertex(event.getVertexID());
Task task = vertex.getTask(event.getTaskID());
((EventHandler<TaskEvent>)task).handle(event);
}
}
private class TaskAttemptEventDispatcher implements EventHandler<TaskAttemptEvent> {
@Override
public void handle(TaskAttemptEvent event) {
// Ignore
}
}
@SuppressWarnings("unchecked")
private class TaskAttemptEventDisptacher2 implements EventHandler<TaskAttemptEvent> {
@Override
public void handle(TaskAttemptEvent event) {
TezDAGID id = event.getDAGID();
DAGImpl handler = chooseDAG(id);
Vertex vertex = handler.getVertex(event.getVertexID());
Task task = vertex.getTask(event.getTaskID());
TaskAttempt ta = task.getAttempt(event.getTaskAttemptID());
((EventHandler<TaskAttemptEvent>)ta).handle(event);
}
}
private class VertexEventDispatcher
implements EventHandler<VertexEvent> {
@SuppressWarnings("unchecked")
@Override
public void handle(VertexEvent event) {
TezDAGID id = event.getDAGID();
DAGImpl handler = chooseDAG(id);
Vertex vertex = handler.getVertex(event.getVertexID());
((EventHandler<VertexEvent>) vertex).handle(event);
}
}
private class DAGFinishEventHandler
implements EventHandler<DAGAppMasterEventDAGFinished> {
public int dagFinishEvents = 0;
@Override
public void handle(DAGAppMasterEventDAGFinished event) {
++dagFinishEvents;
}
}
private DAGPlan createTestMRRDAGPlan() {
LOG.info("Setting up MRR dag plan");
DAGPlan dag = DAGPlan.newBuilder()
.setName("testverteximpl")
.addVertex(
VertexPlan.newBuilder()
.setName("vertex1")
.setType(PlanVertexType.NORMAL)
.addTaskLocationHint(
PlanTaskLocationHint.newBuilder()
.addHost("host1")
.addRack("rack1")
.build()
)
.setTaskConfig(
PlanTaskConfiguration.newBuilder()
.setNumTasks(1)
.setVirtualCores(4)
.setMemoryMb(1024)
.setJavaOpts("")
.setTaskModule("x1.y1")
.build()
)
.addOutputs(
DAGProtos.RootInputLeafOutputProto.newBuilder()
.setIODescriptor(
TezEntityDescriptorProto.newBuilder().setClassName("output1").build()
)
.setName("output1")
.setControllerDescriptor(
TezEntityDescriptorProto.newBuilder().setClassName(
CountingOutputCommitter.class.getName()))
)
.addOutEdgeId("e1")
.build()
)
.addVertex(
VertexPlan.newBuilder()
.setName("vertex2")
.setType(PlanVertexType.NORMAL)
.addTaskLocationHint(
PlanTaskLocationHint.newBuilder()
.addHost("host2")
.addRack("rack2")
.build()
)
.setTaskConfig(
PlanTaskConfiguration.newBuilder()
.setNumTasks(1)
.setVirtualCores(4)
.setMemoryMb(1024)
.setJavaOpts("")
.setTaskModule("x2.y2")
.build()
)
.addOutputs(
DAGProtos.RootInputLeafOutputProto.newBuilder()
.setIODescriptor(
TezEntityDescriptorProto.newBuilder().setClassName("output2").build()
)
.setName("output2")
.setControllerDescriptor(
TezEntityDescriptorProto.newBuilder().setClassName(
CountingOutputCommitter.class.getName()))
)
.addInEdgeId("e1")
.addOutEdgeId("e2")
.build()
)
.addVertex(
VertexPlan.newBuilder()
.setName("vertex3")
.setType(PlanVertexType.NORMAL)
.setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("x3.y3"))
.addTaskLocationHint(
PlanTaskLocationHint.newBuilder()
.addHost("host3")
.addRack("rack3")
.build()
)
.setTaskConfig(
PlanTaskConfiguration.newBuilder()
.setNumTasks(1)
.setVirtualCores(4)
.setMemoryMb(1024)
.setJavaOpts("foo")
.setTaskModule("x3.y3")
.build()
)
.addOutputs(
DAGProtos.RootInputLeafOutputProto.newBuilder()
.setIODescriptor(
TezEntityDescriptorProto.newBuilder().setClassName("output3").build()
)
.setName("output3")
.setControllerDescriptor(
TezEntityDescriptorProto.newBuilder().setClassName(
CountingOutputCommitter.class.getName()))
)
.addInEdgeId("e2")
.build()
)
.addEdge(
EdgePlan.newBuilder()
.setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("i2"))
.setInputVertexName("vertex1")
.setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o1"))
.setOutputVertexName("vertex2")
.setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
.setId("e1")
.setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
.setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
.build()
)
.addEdge(
EdgePlan.newBuilder()
.setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("i3"))
.setInputVertexName("vertex2")
.setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
.setOutputVertexName("vertex3")
.setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
.setId("e2")
.setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
.setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
.build()
)
.build();
return dag;
}
public static class TotalCountingOutputCommitter extends CountingOutputCommitter {
static int totalCommitCounter = 0;
public TotalCountingOutputCommitter(OutputCommitterContext context) {
super(context);
}
@Override
public void commitOutput() throws IOException {
++totalCommitCounter;
super.commitOutput();
}
}
// Create a plan with 3 vertices: A, B, C. Group(A,B)->C
static DAGPlan createGroupDAGPlan() {
LOG.info("Setting up group dag plan");
int dummyTaskCount = 1;
Resource dummyTaskResource = Resource.newInstance(1, 1);
org.apache.tez.dag.api.Vertex v1 = org.apache.tez.dag.api.Vertex.create("vertex1",
ProcessorDescriptor.create("Processor"),
dummyTaskCount, dummyTaskResource);
org.apache.tez.dag.api.Vertex v2 = org.apache.tez.dag.api.Vertex.create("vertex2",
ProcessorDescriptor.create("Processor"),
dummyTaskCount, dummyTaskResource);
org.apache.tez.dag.api.Vertex v3 = org.apache.tez.dag.api.Vertex.create("vertex3",
ProcessorDescriptor.create("Processor"),
dummyTaskCount, dummyTaskResource);
DAG dag = DAG.create("testDag");
String groupName1 = "uv12";
OutputCommitterDescriptor ocd = OutputCommitterDescriptor.create(
TotalCountingOutputCommitter.class.getName());
org.apache.tez.dag.api.VertexGroup uv12 = dag.createVertexGroup(groupName1, v1, v2);
OutputDescriptor outDesc = OutputDescriptor.create("output.class");
uv12.addDataSink("uvOut", DataSinkDescriptor.create(outDesc, ocd, null));
v3.addDataSink("uvOut", DataSinkDescriptor.create(outDesc, ocd, null));
GroupInputEdge e1 = GroupInputEdge.create(uv12, v3,
EdgeProperty.create(DataMovementType.SCATTER_GATHER,
DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
OutputDescriptor.create("dummy output class"),
InputDescriptor.create("dummy input class")),
InputDescriptor.create("merge.class"));
dag.addVertex(v1);
dag.addVertex(v2);
dag.addVertex(v3);
dag.addEdge(e1);
return dag.createDag(conf, null, null, null, true);
}
public static DAGPlan createTestDAGPlan() {
LOG.info("Setting up dag plan");
DAGPlan dag = DAGPlan.newBuilder()
.setName("testverteximpl")
.setDagConf(ConfigurationProto.newBuilder()
.addConfKeyValues(PlanKeyValuePair.newBuilder()
.setKey(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS)
.setValue(3 + "")))
.addVertex(
VertexPlan.newBuilder()
.setName("vertex1")
.setType(PlanVertexType.NORMAL)
.addTaskLocationHint(
PlanTaskLocationHint.newBuilder()
.addHost("host1")
.addRack("rack1")
.build()
)
.setTaskConfig(
PlanTaskConfiguration.newBuilder()
.setNumTasks(1)
.setVirtualCores(4)
.setMemoryMb(1024)
.setJavaOpts("")
.setTaskModule("x1.y1")
.build()
)
.setVertexConf(ConfigurationProto.newBuilder()
.addConfKeyValues(PlanKeyValuePair.newBuilder()
.setKey(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS)
.setValue(2+"")))
.addOutEdgeId("e1")
.build()
)
.addVertex(
VertexPlan.newBuilder()
.setName("vertex2")
.setType(PlanVertexType.NORMAL)
.addTaskLocationHint(
PlanTaskLocationHint.newBuilder()
.addHost("host2")
.addRack("rack2")
.build()
)
.setTaskConfig(
PlanTaskConfiguration.newBuilder()
.setNumTasks(2)
.setVirtualCores(4)
.setMemoryMb(1024)
.setJavaOpts("")
.setTaskModule("x2.y2")
.build()
)
.addOutEdgeId("e2")
.build()
)
.addVertex(
VertexPlan.newBuilder()
.setName("vertex3")
.setType(PlanVertexType.NORMAL)
.setProcessorDescriptor(TezEntityDescriptorProto.newBuilder().setClassName("x3.y3"))
.addTaskLocationHint(
PlanTaskLocationHint.newBuilder()
.addHost("host3")
.addRack("rack3")
.build()
)
.setTaskConfig(
PlanTaskConfiguration.newBuilder()
.setNumTasks(2)
.setVirtualCores(4)
.setMemoryMb(1024)
.setJavaOpts("foo")
.setTaskModule("x3.y3")
.build()
)
.addInEdgeId("e1")
.addInEdgeId("e2")
.addOutEdgeId("e3")
.addOutEdgeId("e4")
.build()
)
.addVertex(
VertexPlan.newBuilder()
.setName("vertex4")
.setType(PlanVertexType.NORMAL)
.addTaskLocationHint(
PlanTaskLocationHint.newBuilder()
.addHost("host4")
.addRack("rack4")
.build()
)
.setTaskConfig(
PlanTaskConfiguration.newBuilder()
.setNumTasks(2)
.setVirtualCores(4)
.setMemoryMb(1024)
.setJavaOpts("")
.setTaskModule("x4.y4")
.build()
)
.addInEdgeId("e3")
.addOutEdgeId("e5")
.build()
)
.addVertex(
VertexPlan.newBuilder()
.setName("vertex5")
.setType(PlanVertexType.NORMAL)
.addTaskLocationHint(
PlanTaskLocationHint.newBuilder()
.addHost("host5")
.addRack("rack5")
.build()
)
.setTaskConfig(
PlanTaskConfiguration.newBuilder()
.setNumTasks(2)
.setVirtualCores(4)
.setMemoryMb(1024)
.setJavaOpts("")
.setTaskModule("x5.y5")
.build()
)
.addInEdgeId("e4")
.addOutEdgeId("e6")
.build()
)
.addVertex(
VertexPlan.newBuilder()
.setName("vertex6")
.setType(PlanVertexType.NORMAL)
.addTaskLocationHint(
PlanTaskLocationHint.newBuilder()
.addHost("host6")
.addRack("rack6")
.build()
)
.setTaskConfig(
PlanTaskConfiguration.newBuilder()
.setNumTasks(2)
.setVirtualCores(4)
.setMemoryMb(1024)
.setJavaOpts("")
.setTaskModule("x6.y6")
.build()
)
.addInEdgeId("e5")
.addInEdgeId("e6")
.build()
)
.addEdge(
EdgePlan.newBuilder()
.setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("i3_v1"))
.setInputVertexName("vertex1")
.setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o1"))
.setOutputVertexName("vertex3")
.setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
.setId("e1")
.setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
.setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
.build()
)
.addEdge(
EdgePlan.newBuilder()
.setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("i3_v2"))
.setInputVertexName("vertex2")
.setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o2"))
.setOutputVertexName("vertex3")
.setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
.setId("e2")
.setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
.setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
.build()
)
.addEdge(
EdgePlan.newBuilder()
.setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("i4_v3"))
.setInputVertexName("vertex3")
.setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o3_v4"))
.setOutputVertexName("vertex4")
.setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
.setId("e3")
.setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
.setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
.build()
)
.addEdge(
EdgePlan.newBuilder()
.setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("i5_v3"))
.setInputVertexName("vertex3")
.setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o3_v5"))
.setOutputVertexName("vertex5")
.setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
.setId("e4")
.setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
.setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
.build()
)
.addEdge(
EdgePlan.newBuilder()
.setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("i6_v4"))
.setInputVertexName("vertex4")
.setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o4"))
.setOutputVertexName("vertex6")
.setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
.setId("e5")
.setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
.setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
.build()
)
.addEdge(
EdgePlan.newBuilder()
.setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("i6_v5"))
.setInputVertexName("vertex5")
.setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o5"))
.setOutputVertexName("vertex6")
.setDataMovementType(PlanEdgeDataMovementType.SCATTER_GATHER)
.setId("e6")
.setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
.setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
.build()
)
.build();
return dag;
}
// v1 -> v2
private DAGPlan createDAGWithCustomEdge(ExceptionLocation exLocation, boolean useLegacy) {
LOG.info("Setting up custome edge dag plan " + exLocation + " " + useLegacy);
DAGPlan dag = DAGPlan.newBuilder()
.setName("testverteximpl")
.addVertex(
VertexPlan.newBuilder()
.setName("vertex1")
.setType(PlanVertexType.NORMAL)
.addTaskLocationHint(
PlanTaskLocationHint.newBuilder()
.addHost("host1")
.addRack("rack1")
.build()
)
.setTaskConfig(
PlanTaskConfiguration.newBuilder()
.setNumTasks(1)
.setVirtualCores(4)
.setMemoryMb(1024)
.setJavaOpts("")
.setTaskModule("x1.y1")
.build()
)
.addOutEdgeId("e1")
.build()
)
.addVertex(
VertexPlan.newBuilder()
.setName("vertex2")
.setType(PlanVertexType.NORMAL)
.setProcessorDescriptor(
TezEntityDescriptorProto.newBuilder().setClassName("x2.y2"))
.addTaskLocationHint(
PlanTaskLocationHint.newBuilder()
.addHost("host2")
.addRack("rack2")
.build()
)
.setTaskConfig(
PlanTaskConfiguration.newBuilder()
.setNumTasks(2)
.setVirtualCores(4)
.setMemoryMb(1024)
.setJavaOpts("foo")
.setTaskModule("x2.y2")
.build()
)
.addInEdgeId("e1")
.build()
)
.addEdge(
EdgePlan.newBuilder()
.setEdgeManager(TezEntityDescriptorProto.newBuilder()
.setClassName(useLegacy ? CustomizedEdgeManagerLegacy.class.getName() :
CustomizedEdgeManager.class.getName())
.setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder()
.setUserPayload(ByteString.copyFromUtf8(exLocation.name())))
)
.setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v2"))
.setInputVertexName("vertex1")
.setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o1"))
.setOutputVertexName("vertex2")
.setDataMovementType(PlanEdgeDataMovementType.CUSTOM)
.setId("e1")
.setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
.setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
.build()
)
.build();
return dag;
}
// v1 -> v2
private DAGPlan createDAGWithNonExistEdgeManager() {
LOG.info("Setting up dag plan with non-exist edgemanager");
DAGPlan dag = DAGPlan.newBuilder()
.setName("testverteximpl")
.addVertex(
VertexPlan.newBuilder()
.setName("vertex1")
.setType(PlanVertexType.NORMAL)
.addTaskLocationHint(
PlanTaskLocationHint.newBuilder()
.addHost("host1")
.addRack("rack1")
.build()
)
.setTaskConfig(
PlanTaskConfiguration.newBuilder()
.setNumTasks(1)
.setVirtualCores(4)
.setMemoryMb(1024)
.setJavaOpts("")
.setTaskModule("x1.y1")
.build()
)
.addOutEdgeId("e1")
.build()
)
.addVertex(
VertexPlan.newBuilder()
.setName("vertex2")
.setType(PlanVertexType.NORMAL)
.setProcessorDescriptor(
TezEntityDescriptorProto.newBuilder().setClassName("x2.y2"))
.addTaskLocationHint(
PlanTaskLocationHint.newBuilder()
.addHost("host2")
.addRack("rack2")
.build()
)
.setTaskConfig(
PlanTaskConfiguration.newBuilder()
.setNumTasks(2)
.setVirtualCores(4)
.setMemoryMb(1024)
.setJavaOpts("foo")
.setTaskModule("x2.y2")
.build()
)
.addInEdgeId("e1")
.build()
)
.addEdge(
EdgePlan.newBuilder()
.setEdgeManager(TezEntityDescriptorProto.newBuilder()
.setClassName("non-exist-edge-manager")
)
.setEdgeDestination(TezEntityDescriptorProto.newBuilder().setClassName("v1_v2"))
.setInputVertexName("vertex1")
.setEdgeSource(TezEntityDescriptorProto.newBuilder().setClassName("o1"))
.setOutputVertexName("vertex2")
.setDataMovementType(PlanEdgeDataMovementType.CUSTOM)
.setId("e1")
.setDataSourceType(PlanEdgeDataSourceType.PERSISTED)
.setSchedulingType(PlanEdgeSchedulingType.SEQUENTIAL)
.build()
)
.build();
return dag;
}
@BeforeClass
public static void beforeClass() {
MockDNSToSwitchMapping.initializeMockRackResolver();
}
@SuppressWarnings({ "unchecked", "rawtypes" })
@Before
public void setup() {
conf = new Configuration();
conf.setBoolean(TezConfiguration.TEZ_AM_CONTAINER_REUSE_ENABLED, false);
appAttemptId = ApplicationAttemptId.newInstance(
ApplicationId.newInstance(100, 1), 1);
dagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 1);
Assert.assertNotNull(dagId);
dagPlan = createTestDAGPlan();
dispatcher = new DrainDispatcher();
fsTokens = new Credentials();
appContext = mock(AppContext.class);
execService = mock(ListeningExecutorService.class);
final ListenableFuture<Void> mockFuture = mock(ListenableFuture.class);
when(appContext.getHadoopShim()).thenReturn(defaultShim);
when(appContext.getApplicationID()).thenReturn(appAttemptId.getApplicationId());
doAnswer(new Answer() {
public ListenableFuture<Void> answer(InvocationOnMock invocation) {
Object[] args = invocation.getArguments();
CallableEvent e = (CallableEvent) args[0];
dispatcher.getEventHandler().handle(e);
return mockFuture;
}})
.when(execService).submit((Callable<Void>) any());
doReturn(execService).when(appContext).getExecService();
historyEventHandler = mock(HistoryEventHandler.class);
aclManager = new ACLManager("amUser");
doReturn(conf).when(appContext).getAMConf();
doReturn(appAttemptId).when(appContext).getApplicationAttemptId();
doReturn(appAttemptId.getApplicationId()).when(appContext).getApplicationID();
doReturn(dagId).when(appContext).getCurrentDAGID();
doReturn(historyEventHandler).when(appContext).getHistoryHandler();
doReturn(aclManager).when(appContext).getAMACLManager();
doReturn(defaultShim).when(appContext).getHadoopShim();
dag = new DAGImpl(dagId, conf, dagPlan,
dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
fsTokens, clock, "user", thh, appContext);
dag.entityUpdateTracker = new StateChangeNotifierForTest(dag);
doReturn(dag).when(appContext).getCurrentDAG();
doReturn(clusterInfo).when(appContext).getClusterInfo();
mrrAppContext = mock(AppContext.class);
doReturn(aclManager).when(mrrAppContext).getAMACLManager();
doReturn(execService).when(mrrAppContext).getExecService();
doReturn(defaultShim).when(mrrAppContext).getHadoopShim();
mrrDagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 2);
mrrDagPlan = createTestMRRDAGPlan();
mrrDag = new DAGImpl(mrrDagId, conf, mrrDagPlan,
dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
fsTokens, clock, "user", thh,
mrrAppContext);
mrrDag.entityUpdateTracker = new StateChangeNotifierForTest(mrrDag);
doReturn(conf).when(mrrAppContext).getAMConf();
doReturn(mrrDag).when(mrrAppContext).getCurrentDAG();
doReturn(appAttemptId).when(mrrAppContext).getApplicationAttemptId();
doReturn(appAttemptId.getApplicationId()).when(mrrAppContext).getApplicationID();
doReturn(historyEventHandler).when(mrrAppContext).getHistoryHandler();
doReturn(clusterInfo).when(mrrAppContext).getClusterInfo();
groupAppContext = mock(AppContext.class);
doReturn(aclManager).when(groupAppContext).getAMACLManager();
doReturn(execService).when(groupAppContext).getExecService();
doReturn(defaultShim).when(groupAppContext).getHadoopShim();
groupDagId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 3);
groupDagPlan = createGroupDAGPlan();
groupDag = new DAGImpl(groupDagId, conf, groupDagPlan,
dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
fsTokens, clock, "user", thh,
groupAppContext);
groupDag.entityUpdateTracker = new StateChangeNotifierForTest(groupDag);
doReturn(conf).when(groupAppContext).getAMConf();
doReturn(groupDag).when(groupAppContext).getCurrentDAG();
doReturn(appAttemptId).when(groupAppContext).getApplicationAttemptId();
doReturn(appAttemptId.getApplicationId())
.when(groupAppContext).getApplicationID();
doReturn(historyEventHandler).when(groupAppContext).getHistoryHandler();
doReturn(clusterInfo).when(groupAppContext).getClusterInfo();
// reset totalCommitCounter to 0
TotalCountingOutputCommitter.totalCommitCounter = 0;
dispatcher.register(CallableEventType.class, new CallableEventDispatcher());
taskEventDispatcher = new TaskEventDispatcher();
dispatcher.register(TaskEventType.class, taskEventDispatcher);
taskAttemptEventDispatcher = new TaskAttemptEventDispatcher();
dispatcher.register(TaskAttemptEventType.class, taskAttemptEventDispatcher);
vertexEventDispatcher = new VertexEventDispatcher();
dispatcher.register(VertexEventType.class, vertexEventDispatcher);
dagEventDispatcher = new DagEventDispatcher();
dispatcher.register(DAGEventType.class, dagEventDispatcher);
dagFinishEventHandler = new DAGFinishEventHandler();
dispatcher.register(DAGAppMasterEventType.class, dagFinishEventHandler);
dispatcher.init(conf);
dispatcher.start();
}
@After
public void teardown() {
dispatcher.await();
dispatcher.stop();
execService.shutdownNow();
dagPlan = null;
if (dag != null) {
dag.entityUpdateTracker.stop();
}
if (mrrDag != null) {
mrrDag.entityUpdateTracker.stop();
}
if (groupDag != null) {
groupDag.entityUpdateTracker.stop();
}
if (dagWithCustomEdge != null) {
dagWithCustomEdge.entityUpdateTracker.stop();
}
dag = null;
mrrDag = null;
groupDag = null;
dagWithCustomEdge = null;
}
private class AMSchedulerEventHandler implements EventHandler<AMSchedulerEvent> {
@Override
public void handle(AMSchedulerEvent event) {
// do nothing
}
}
private void setupDAGWithCustomEdge(ExceptionLocation exLocation) {
setupDAGWithCustomEdge(exLocation, false);
}
private void setupDAGWithCustomEdge(ExceptionLocation exLocation, boolean useLegacy) {
dagWithCustomEdgeId = TezDAGID.getInstance(appAttemptId.getApplicationId(), 4);
dagPlanWithCustomEdge = createDAGWithCustomEdge(exLocation, useLegacy);
dagWithCustomEdgeAppContext = mock(AppContext.class);
doReturn(aclManager).when(dagWithCustomEdgeAppContext).getAMACLManager();
when(dagWithCustomEdgeAppContext.getHadoopShim()).thenReturn(defaultShim);
dagWithCustomEdge = new DAGImpl(dagWithCustomEdgeId, conf, dagPlanWithCustomEdge,
dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
fsTokens, clock, "user", thh, dagWithCustomEdgeAppContext);
dagWithCustomEdge.entityUpdateTracker = new StateChangeNotifierForTest(dagWithCustomEdge);
doReturn(conf).when(dagWithCustomEdgeAppContext).getAMConf();
doReturn(execService).when(dagWithCustomEdgeAppContext).getExecService();
doReturn(dagWithCustomEdge).when(dagWithCustomEdgeAppContext).getCurrentDAG();
doReturn(appAttemptId).when(dagWithCustomEdgeAppContext).getApplicationAttemptId();
doReturn(appAttemptId.getApplicationId()).when(dagWithCustomEdgeAppContext).getApplicationID();
doReturn(historyEventHandler).when(dagWithCustomEdgeAppContext).getHistoryHandler();
doReturn(clusterInfo).when(dagWithCustomEdgeAppContext).getClusterInfo();
dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDisptacher2());
dispatcher.register(AMSchedulerEventType.class, new AMSchedulerEventHandler());
when(dagWithCustomEdgeAppContext.getContainerLauncherName(anyInt())).thenReturn(
TezConstants.getTezYarnServicePluginName());
}
private void initDAG(DAGImpl impl) {
impl.handle(
new DAGEvent(impl.getID(), DAGEventType.DAG_INIT));
Assert.assertEquals(DAGState.INITED, impl.getState());
}
@SuppressWarnings("unchecked")
private void startDAG(DAGImpl impl) {
dispatcher.getEventHandler().handle(
new DAGEventStartDag(impl.getID(), null));
dispatcher.await();
Assert.assertEquals(DAGState.RUNNING, impl.getState());
}
@Test(timeout = 5000)
public void testDAGInit() {
initDAG(dag);
Assert.assertEquals(6, dag.getTotalVertices());
}
@Test(timeout = 5000)
public void testDAGInitFailed() {
setupDAGWithCustomEdge(ExceptionLocation.Initialize);
dagWithCustomEdge.handle(
new DAGEvent(dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
Assert.assertEquals(DAGState.FAILED, dagWithCustomEdge.getState());
// START event is followed after INIT event
dagWithCustomEdge.handle(new DAGEvent(dagWithCustomEdge.getID(), DAGEventType.DAG_START));
dispatcher.await();
Assert.assertEquals(DAGState.FAILED, dagWithCustomEdge.getState());
}
@Test(timeout = 5000)
public void testDAGInitFailedDuetoInvalidResource() {
// cluster maxContainerCapability is less than the vertex resource request
ClusterInfo clusterInfo = new ClusterInfo(Resource.newInstance(512,10));
doReturn(clusterInfo).when(appContext).getClusterInfo();
dag.handle(
new DAGEvent(dag.getID(), DAGEventType.DAG_INIT));
dispatcher.await();
Assert.assertEquals(DAGState.FAILED, dag.getState());
Assert.assertEquals(DAGTerminationCause.INIT_FAILURE, dag.getTerminationCause());
Assert.assertTrue(StringUtils.join(dag.getDiagnostics(), ",")
.contains("Vertex's TaskResource is beyond the cluster container capability"));
}
@Test(timeout = 5000)
public void testDAGStart() {
initDAG(dag);
startDAG(dag);
dispatcher.await();
for (int i = 0 ; i < 6; ++i ) {
TezVertexID vId = TezVertexID.getInstance(dagId, i);
Vertex v = dag.getVertex(vId);
Assert.assertEquals(VertexState.RUNNING, v.getState());
if (i < 2) {
Assert.assertEquals(0, v.getDistanceFromRoot());
} else if (i == 2) {
Assert.assertEquals(1, v.getDistanceFromRoot());
} else if ( i > 2 && i < 5) {
Assert.assertEquals(2, v.getDistanceFromRoot());
} else if (i == 5) {
Assert.assertEquals(3, v.getDistanceFromRoot());
}
}
for (int i = 0 ; i < 6; ++i ) {
TezVertexID vId = TezVertexID.getInstance(dagId, i);
LOG.info("Distance from root: v" + i + ":"
+ dag.getVertex(vId).getDistanceFromRoot());
}
}
@Test(timeout = 5000)
public void testNonExistEdgeManagerPlugin() {
dagPlan = createDAGWithNonExistEdgeManager();
dag = new DAGImpl(dagId, conf, dagPlan,
dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
fsTokens, clock, "user", thh, appContext);
dag.entityUpdateTracker = new StateChangeNotifierForTest(dag);
doReturn(dag).when(appContext).getCurrentDAG();
dag.handle(new DAGEvent(dagId, DAGEventType.DAG_INIT));
Assert.assertEquals(DAGState.FAILED, dag.getState());
Assert.assertEquals(DAGTerminationCause.INIT_FAILURE, dag.getTerminationCause());
Assert.assertTrue(StringUtils.join(dag.getDiagnostics(), "")
.contains("java.lang.ClassNotFoundException: non-exist-edge-manager"));
}
@Test (timeout = 5000)
public void testNonExistDAGScheduler() {
conf.set(TezConfiguration.TEZ_AM_DAG_SCHEDULER_CLASS, "non-exist-dag-scheduler");
dag = new DAGImpl(dagId, conf, dagPlan,
dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
fsTokens, clock, "user", thh, appContext);
dag.entityUpdateTracker = new StateChangeNotifierForTest(dag);
doReturn(dag).when(appContext).getCurrentDAG();
dag.handle(new DAGEvent(dag.getID(), DAGEventType.DAG_INIT));
Assert.assertEquals(DAGState.FAILED, dag.getState());
Assert.assertEquals(DAGState.FAILED, dag.getState());
Assert.assertEquals(DAGTerminationCause.INIT_FAILURE, dag.getTerminationCause());
Assert.assertTrue(StringUtils.join(dag.getDiagnostics(), "")
.contains("java.lang.ClassNotFoundException: non-exist-dag-scheduler"));
}
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testVertexCompletion() {
initDAG(dag);
Assert.assertTrue(0.0f == dag.getCompletedTaskProgress());
startDAG(dag);
Assert.assertTrue(0.0f == dag.getCompletedTaskProgress());
dispatcher.await();
TezVertexID vId = TezVertexID.getInstance(dagId, 1);
Vertex v = dag.getVertex(vId);
dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(
TezTaskID.getInstance(vId, 0), TaskState.SUCCEEDED));
dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(
TezTaskID.getInstance(vId, 1), TaskState.SUCCEEDED));
dispatcher.await();
Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
Assert.assertEquals(1, dag.getSuccessfulVertices());
// 2 tasks completed, total plan has 11 vertices
Assert.assertEquals((float) 2 / 11,
dag.getCompletedTaskProgress(), 0.05);
}
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testEdgeManager_GetNumDestinationTaskPhysicalInputs() {
setupDAGWithCustomEdge(ExceptionLocation.GetNumDestinationTaskPhysicalInputs);
dispatcher.getEventHandler().handle(
new DAGEvent(dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
dispatcher.getEventHandler().handle(new DAGEventStartDag(dagWithCustomEdge.getID(),
null));
dispatcher.await();
VertexImpl v2 = (VertexImpl)dagWithCustomEdge.getVertex("vertex2");
String diag = StringUtils.join(v2.getDiagnostics(), ",");
Assert.assertTrue(diag.contains(ExceptionLocation.GetNumDestinationTaskPhysicalInputs.name()));
}
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testEdgeManager_GetNumSourceTaskPhysicalOutputs() {
setupDAGWithCustomEdge(ExceptionLocation.GetNumSourceTaskPhysicalOutputs);
dispatcher.getEventHandler().handle(
new DAGEvent(dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
dispatcher.getEventHandler().handle(new DAGEventStartDag(dagWithCustomEdge.getID(),
null));
dispatcher.await();
// After TEZ-1711, all task attempts of v1 fail which result in task fail, and finally
// dag failed.
Assert.assertEquals(DAGState.FAILED, dagWithCustomEdge.getState());
VertexImpl v1 = (VertexImpl)dagWithCustomEdge.getVertex("vertex1");
String diag = StringUtils.join(v1.getDiagnostics(), ",");
Assert.assertTrue(diag.contains(ExceptionLocation.GetNumSourceTaskPhysicalOutputs.name()));
}
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testEdgeManager_RouteDataMovementEventToDestination() {
setupDAGWithCustomEdge(ExceptionLocation.RouteDataMovementEventToDestination);
dispatcher.getEventHandler().handle(
new DAGEvent(dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
dispatcher.getEventHandler().handle(new DAGEventStartDag(dagWithCustomEdge.getID(),
null));
dispatcher.await();
Assert.assertEquals(DAGState.RUNNING, dagWithCustomEdge.getState());
VertexImpl v1 = (VertexImpl)dagWithCustomEdge.getVertex("vertex1");
VertexImpl v2 = (VertexImpl)dagWithCustomEdge.getVertex("vertex2");
dispatcher.await();
Task t1= v2.getTask(0);
TaskAttemptImpl ta1= (TaskAttemptImpl)t1.getAttempt(TezTaskAttemptID.getInstance(t1.getTaskID(), 0));
DataMovementEvent daEvent = DataMovementEvent.create(ByteBuffer.wrap(new byte[0]));
TezEvent tezEvent = new TezEvent(daEvent,
new EventMetaData(EventProducerConsumerType.INPUT, "vertex1", "vertex2", ta1.getTaskAttemptID()));
dispatcher.getEventHandler().handle(new VertexEventRouteEvent(v2.getVertexId(), Lists.newArrayList(tezEvent)));
dispatcher.await();
v2.getTaskAttemptTezEvents(ta1.getTaskAttemptID(), 0, 0, 1000);
dispatcher.await();
Assert.assertEquals(VertexState.FAILED, v2.getState());
Assert.assertEquals(VertexState.KILLED, v1.getState());
String diag = StringUtils.join(v2.getDiagnostics(), ",");
Assert.assertTrue(diag.contains(ExceptionLocation.RouteDataMovementEventToDestination.name()));
}
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testEdgeManager_RouteDataMovementEventToDestinationWithLegacyRouting() {
// Remove after legacy routing is removed
setupDAGWithCustomEdge(ExceptionLocation.RouteDataMovementEventToDestination, true);
dispatcher.getEventHandler().handle(
new DAGEvent(dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
dispatcher.getEventHandler().handle(new DAGEventStartDag(dagWithCustomEdge.getID(),
null));
dispatcher.await();
Assert.assertEquals(DAGState.RUNNING, dagWithCustomEdge.getState());
VertexImpl v1 = (VertexImpl)dagWithCustomEdge.getVertex("vertex1");
VertexImpl v2 = (VertexImpl)dagWithCustomEdge.getVertex("vertex2");
dispatcher.await();
Task t1= v2.getTask(0);
TaskAttemptImpl ta1= (TaskAttemptImpl)t1.getAttempt(TezTaskAttemptID.getInstance(t1.getTaskID(), 0));
DataMovementEvent daEvent = DataMovementEvent.create(ByteBuffer.wrap(new byte[0]));
TezEvent tezEvent = new TezEvent(daEvent,
new EventMetaData(EventProducerConsumerType.INPUT, "vertex1", "vertex2", ta1.getTaskAttemptID()));
dispatcher.getEventHandler().handle(
new VertexEventRouteEvent(v2.getVertexId(), Lists.newArrayList(tezEvent)));
dispatcher.await();
Assert.assertEquals(VertexState.FAILED, v2.getState());
Assert.assertEquals(VertexState.KILLED, v1.getState());
String diag = StringUtils.join(v2.getDiagnostics(), ",");
Assert.assertTrue(diag.contains(ExceptionLocation.RouteDataMovementEventToDestination.name()));
}
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testEdgeManager_RouteInputSourceTaskFailedEventToDestinationLegacyRouting() {
// Remove after legacy routing is removed
setupDAGWithCustomEdge(ExceptionLocation.RouteInputSourceTaskFailedEventToDestination, true);
dispatcher.getEventHandler().handle(
new DAGEvent(dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
dispatcher.getEventHandler().handle(new DAGEventStartDag(dagWithCustomEdge.getID(),
null));
dispatcher.await();
Assert.assertEquals(DAGState.RUNNING, dagWithCustomEdge.getState());
VertexImpl v1 = (VertexImpl)dagWithCustomEdge.getVertex("vertex1");
VertexImpl v2 = (VertexImpl)dagWithCustomEdge.getVertex("vertex2");
dispatcher.await();
Task t1= v2.getTask(0);
TaskAttemptImpl ta1= (TaskAttemptImpl)t1.getAttempt(TezTaskAttemptID.getInstance(t1.getTaskID(), 0));
InputFailedEvent ifEvent = InputFailedEvent.create(0, 1);
TezEvent tezEvent = new TezEvent(ifEvent,
new EventMetaData(EventProducerConsumerType.INPUT,"vertex1", "vertex2", ta1.getTaskAttemptID()));
dispatcher.getEventHandler().handle(new VertexEventRouteEvent(v2.getVertexId(), Lists.newArrayList(tezEvent)));
dispatcher.await();
v2.getTaskAttemptTezEvents(ta1.getTaskAttemptID(), 0, 0, 1000);
dispatcher.await();
Assert.assertEquals(VertexState.FAILED, v2.getState());
Assert.assertEquals(VertexState.KILLED, v1.getState());
String diag = StringUtils.join(v2.getDiagnostics(), ",");
Assert.assertTrue(diag.contains(ExceptionLocation.RouteInputSourceTaskFailedEventToDestination.name()));
}
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testEdgeManager_GetNumDestinationConsumerTasks() {
setupDAGWithCustomEdge(ExceptionLocation.GetNumDestinationConsumerTasks);
dispatcher.getEventHandler().handle(
new DAGEvent(dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
dispatcher.getEventHandler().handle(new DAGEventStartDag(dagWithCustomEdge.getID(),
null));
dispatcher.await();
Assert.assertEquals(DAGState.RUNNING, dagWithCustomEdge.getState());
VertexImpl v1 = (VertexImpl)dagWithCustomEdge.getVertex("vertex1");
VertexImpl v2 = (VertexImpl)dagWithCustomEdge.getVertex("vertex2");
dispatcher.await();
Task t1= v2.getTask(0);
TaskAttemptImpl ta1= (TaskAttemptImpl)t1.getAttempt(TezTaskAttemptID.getInstance(t1.getTaskID(), 0));
InputReadErrorEvent ireEvent = InputReadErrorEvent.create("", 0, 0);
TezEvent tezEvent = new TezEvent(ireEvent,
new EventMetaData(EventProducerConsumerType.INPUT,"vertex2", "vertex1", ta1.getTaskAttemptID()));
dispatcher.getEventHandler().handle(
new VertexEventRouteEvent(v2.getVertexId(), Lists.newArrayList(tezEvent)));
dispatcher.await();
//
Assert.assertEquals(VertexState.FAILED, v2.getState());
Assert.assertEquals(VertexState.KILLED, v1.getState());
String diag = StringUtils.join(v2.getDiagnostics(), ",");
Assert.assertTrue(diag.contains(ExceptionLocation.GetNumDestinationConsumerTasks.name()));
}
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testEdgeManager_RouteInputErrorEventToSource() {
setupDAGWithCustomEdge(ExceptionLocation.RouteInputErrorEventToSource);
dispatcher.getEventHandler().handle(
new DAGEvent(dagWithCustomEdge.getID(), DAGEventType.DAG_INIT));
dispatcher.getEventHandler().handle(new DAGEventStartDag(dagWithCustomEdge.getID(),
null));
dispatcher.await();
Assert.assertEquals(DAGState.RUNNING, dagWithCustomEdge.getState());
VertexImpl v1 = (VertexImpl)dagWithCustomEdge.getVertex("vertex1");
VertexImpl v2 = (VertexImpl)dagWithCustomEdge.getVertex("vertex2");
dispatcher.await();
Task t1= v2.getTask(0);
TaskAttemptImpl ta1= (TaskAttemptImpl)t1.getAttempt(TezTaskAttemptID.getInstance(t1.getTaskID(), 0));
InputReadErrorEvent ireEvent = InputReadErrorEvent.create("", 0, 0);
TezEvent tezEvent = new TezEvent(ireEvent,
new EventMetaData(EventProducerConsumerType.INPUT,"vertex2", "vertex1", ta1.getTaskAttemptID()));
dispatcher.getEventHandler().handle(new VertexEventRouteEvent(v2.getVertexId(), Lists.newArrayList(tezEvent)));
dispatcher.await();
//
Assert.assertEquals(VertexState.FAILED, v2.getState());
Assert.assertEquals(VertexState.KILLED, v1.getState());
String diag = StringUtils.join(v2.getDiagnostics(), ",");
Assert.assertTrue(diag.contains(ExceptionLocation.RouteInputErrorEventToSource.name()));
}
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testGroupDAGCompletionWithCommitSuccess() {
// should have only 2 commits. 1 vertex3 commit and 1 group commit.
initDAG(groupDag);
startDAG(groupDag);
dispatcher.await();
for (int i=0; i<3; ++i) {
Vertex v = groupDag.getVertex("vertex"+(i+1));
dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(
TezTaskID.getInstance(v.getVertexId(), 0), TaskState.SUCCEEDED));
dispatcher.await();
Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
Assert.assertEquals(i+1, groupDag.getSuccessfulVertices());
}
Assert.assertEquals(3, groupDag.getSuccessfulVertices());
Assert.assertTrue(1.0f == groupDag.getCompletedTaskProgress());
Assert.assertEquals(DAGState.SUCCEEDED, groupDag.getState());
Assert.assertEquals(2, TotalCountingOutputCommitter.totalCommitCounter);
}
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testGroupDAGWithVertexReRunning() {
groupDag.getConf().setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false);
initDAG(groupDag);
startDAG(groupDag);
dispatcher.await();
Vertex v1 = groupDag.getVertex("vertex1");
Vertex v2 = groupDag.getVertex("vertex2");
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(v1.getVertexId(), VertexState.SUCCEEDED));
dispatcher.getEventHandler().handle(new DAGEventVertexReRunning(v1.getVertexId()));
dispatcher.getEventHandler().handle(
new DAGEventVertexCompleted(v2.getVertexId(), VertexState.SUCCEEDED));
dispatcher.await();
// commit should not happen due to vertex-rerunning
Assert.assertEquals(0, TotalCountingOutputCommitter.totalCommitCounter);
dispatcher.getEventHandler().handle(
new DAGEventVertexCompleted(v1.getVertexId(), VertexState.SUCCEEDED));
dispatcher.await();
// commit happen
Assert.assertEquals(1, TotalCountingOutputCommitter.totalCommitCounter);
Assert.assertEquals(2, groupDag.getSuccessfulVertices());
}
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testGroupDAGWithVertexReRunningAfterCommit() {
groupDag.getConf().setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false);
initDAG(groupDag);
startDAG(groupDag);
dispatcher.await();
Vertex v1 = groupDag.getVertex("vertex1");
Vertex v2 = groupDag.getVertex("vertex2");
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(v1.getVertexId(), VertexState.SUCCEEDED));
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(v2.getVertexId(), VertexState.SUCCEEDED));
dispatcher.await();
// vertex group commit happens
Assert.assertEquals(1, TotalCountingOutputCommitter.totalCommitCounter);
// dag failed when vertex re-run happens after vertex group commit is done.
dispatcher.getEventHandler().handle(new DAGEventVertexReRunning(v1.getVertexId()));
dispatcher.await();
Assert.assertEquals(DAGState.FAILED, groupDag.getState());
Assert.assertEquals(DAGTerminationCause.VERTEX_RERUN_AFTER_COMMIT, groupDag.getTerminationCause());
}
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testDAGCompletionWithCommitSuccess() {
// all vertices completed -> DAG completion and commit
initDAG(mrrDag);
dispatcher.await();
startDAG(mrrDag);
dispatcher.await();
for (int i=0; i<2; ++i) {
Vertex v = mrrDag.getVertex("vertex"+(i+1));
dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(
TezTaskID.getInstance(v.getVertexId(), 0), TaskState.SUCCEEDED));
dispatcher.await();
Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
Assert.assertEquals(i+1, mrrDag.getSuccessfulVertices());
}
// no commit yet
for (Vertex v : mrrDag.vertices.values()) {
for (OutputCommitter c : v.getOutputCommitters().values()) {
CountingOutputCommitter committer= (CountingOutputCommitter) c;
Assert.assertEquals(0, committer.abortCounter);
Assert.assertEquals(0, committer.commitCounter);
Assert.assertEquals(1, committer.initCounter);
Assert.assertEquals(1, committer.setupCounter);
}
}
// dag completion and commit
Vertex v = mrrDag.getVertex("vertex3");
dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(
TezTaskID.getInstance(v.getVertexId(), 0), TaskState.SUCCEEDED));
dispatcher.await();
Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
Assert.assertEquals(3, mrrDag.getSuccessfulVertices());
Assert.assertEquals(DAGState.SUCCEEDED, mrrDag.getState());
for (Vertex vertex : mrrDag.vertices.values()) {
for (OutputCommitter c : vertex.getOutputCommitters().values()) {
CountingOutputCommitter committer= (CountingOutputCommitter) c;
Assert.assertEquals(0, committer.abortCounter);
Assert.assertEquals(1, committer.commitCounter);
Assert.assertEquals(1, committer.initCounter);
Assert.assertEquals(1, committer.setupCounter);
}
}
}
@SuppressWarnings("unchecked")
@Test(timeout=5000)
public void testDAGCompletionWithCommitFailure() throws IOException {
// all vertices completed -> DAG completion and commit
initDAG(mrrDag);
dispatcher.await();
// committer for bad vertex will throw exception
Vertex badVertex = mrrDag.getVertex("vertex3");
List<RootInputLeafOutputProto> outputs =
new ArrayList<RootInputLeafOutputProto>();
outputs.add(RootInputLeafOutputProto.newBuilder()
.setControllerDescriptor(
TezEntityDescriptorProto
.newBuilder()
.setClassName(CountingOutputCommitter.class.getName())
.setTezUserPayload(DAGProtos.TezUserPayloadProto.newBuilder()
.setUserPayload(
ByteString
.copyFrom(new CountingOutputCommitter.CountingOutputCommitterConfig(
true, false, false).toUserPayload())).build()))
.setName("output3")
.setIODescriptor(
TezEntityDescriptorProto.newBuilder().setClassName("output.class")
)
.build());
badVertex.setAdditionalOutputs(outputs);
startDAG(mrrDag);
dispatcher.await();
for (int i=0; i<2; ++i) {
Vertex v = mrrDag.getVertex("vertex"+(i+1));
dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(
TezTaskID.getInstance(v.getVertexId(), 0), TaskState.SUCCEEDED));
dispatcher.await();
Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
Assert.assertEquals(i+1, mrrDag.getSuccessfulVertices());
}
// no commit yet
for (Vertex v : mrrDag.vertices.values()) {
for (OutputCommitter c : v.getOutputCommitters().values()) {
CountingOutputCommitter committer= (CountingOutputCommitter) c;
Assert.assertEquals(0, committer.abortCounter);
Assert.assertEquals(0, committer.commitCounter);
Assert.assertEquals(1, committer.initCounter);
Assert.assertEquals(1, committer.setupCounter);
}
}
// dag completion and commit. Exception causes all outputs to be aborted
Vertex v = mrrDag.getVertex("vertex3");
dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(
TezTaskID.getInstance(v.getVertexId(), 0), TaskState.SUCCEEDED));
dispatcher.await();
Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
Assert.assertEquals(3, mrrDag.getSuccessfulVertices());
Assert.assertEquals(DAGState.FAILED, mrrDag.getState());
Assert.assertEquals(DAGTerminationCause.COMMIT_FAILURE, mrrDag.getTerminationCause());
for (Vertex vertex : mrrDag.vertices.values()) {
for (OutputCommitter c : vertex.getOutputCommitters().values()) {
CountingOutputCommitter committer= (CountingOutputCommitter) c;
Assert.assertEquals(1, committer.abortCounter);
Assert.assertEquals(1, committer.initCounter);
Assert.assertEquals(1, committer.setupCounter);
}
}
}
@SuppressWarnings("unchecked")
@Test(timeout=5000)
public void testDAGErrorAbortAllOutputs() {
// error on a vertex -> dag error -> all outputs aborted.
initDAG(mrrDag);
dispatcher.await();
startDAG(mrrDag);
dispatcher.await();
for (int i=0; i<2; ++i) {
Vertex v = mrrDag.getVertex("vertex"+(i+1));
dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(
TezTaskID.getInstance(v.getVertexId(), 0), TaskState.SUCCEEDED));
dispatcher.await();
Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
Assert.assertEquals(i+1, mrrDag.getSuccessfulVertices());
}
// no commit yet
for (Vertex v : mrrDag.vertices.values()) {
for (OutputCommitter c : v.getOutputCommitters().values()) {
CountingOutputCommitter committer= (CountingOutputCommitter) c;
Assert.assertEquals(0, committer.abortCounter);
Assert.assertEquals(0, committer.commitCounter);
Assert.assertEquals(1, committer.initCounter);
Assert.assertEquals(1, committer.setupCounter);
}
}
// vertex error -> dag error -> abort all outputs
Vertex v = mrrDag.getVertex("vertex3");
dispatcher.getEventHandler().handle(new VertexEvent(
v.getVertexId(), VertexEventType.V_INTERNAL_ERROR));
dispatcher.await();
Assert.assertEquals(VertexState.ERROR, v.getState());
Assert.assertEquals(DAGState.ERROR, mrrDag.getState());
for (Vertex vertex : mrrDag.vertices.values()) {
for (OutputCommitter c : vertex.getOutputCommitters().values()) {
CountingOutputCommitter committer= (CountingOutputCommitter) c;
Assert.assertEquals(1, committer.abortCounter);
Assert.assertEquals(0, committer.commitCounter);
Assert.assertEquals(1, committer.initCounter);
Assert.assertEquals(1, committer.setupCounter);
}
}
}
@SuppressWarnings("unchecked")
@Test(timeout=5000)
public void testDAGErrorAbortNonSuccessfulOutputs() {
// vertex success -> vertex output commit. failed dag aborts only non-successful vertices
mrrDag.getConf().setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false);
initDAG(mrrDag);
dispatcher.await();
startDAG(mrrDag);
dispatcher.await();
for (int i=0; i<2; ++i) {
Vertex v = mrrDag.getVertex("vertex"+(i+1));
dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(
TezTaskID.getInstance(v.getVertexId(), 0), TaskState.SUCCEEDED));
dispatcher.await();
Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
Assert.assertEquals(i+1, mrrDag.getSuccessfulVertices());
for (OutputCommitter c : v.getOutputCommitters().values()) {
CountingOutputCommitter committer= (CountingOutputCommitter) c;
Assert.assertEquals(0, committer.abortCounter);
Assert.assertEquals(1, committer.commitCounter);
Assert.assertEquals(1, committer.initCounter);
Assert.assertEquals(1, committer.setupCounter);
}
}
// error on vertex -> dag error
Vertex errorVertex = mrrDag.getVertex("vertex3");
dispatcher.getEventHandler().handle(new VertexEvent(
errorVertex.getVertexId(), VertexEventType.V_INTERNAL_ERROR));
dispatcher.await();
Assert.assertEquals(VertexState.ERROR, errorVertex.getState());
dispatcher.await();
Assert.assertEquals(DAGState.ERROR, mrrDag.getState());
for (Vertex vertex : mrrDag.vertices.values()) {
for (OutputCommitter c : vertex.getOutputCommitters().values()) {
CountingOutputCommitter committer= (CountingOutputCommitter) c;
if (vertex == errorVertex) {
Assert.assertEquals(1, committer.abortCounter);
Assert.assertEquals(0, committer.commitCounter);
Assert.assertEquals(1, committer.initCounter);
Assert.assertEquals(1, committer.setupCounter);
} else {
// abort operation should take no side effort on the successful commit
Assert.assertEquals(1, committer.abortCounter);
Assert.assertEquals(1, committer.commitCounter);
Assert.assertEquals(1, committer.initCounter);
Assert.assertEquals(1, committer.setupCounter);
}
}
}
}
@SuppressWarnings("unchecked")
@Test(timeout=5000)
public void testVertexReRunning() {
initDAG(dag);
dag.dagScheduler = mock(DAGScheduler.class);
startDAG(dag);
dispatcher.await();
TezVertexID vId = TezVertexID.getInstance(dagId, 1);
Vertex v = dag.getVertex(vId);
dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(
TezTaskID.getInstance(vId, 0), TaskState.SUCCEEDED));
dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(
TezTaskID.getInstance(vId, 1), TaskState.SUCCEEDED));
dispatcher.await();
Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
Assert.assertEquals(1, dag.getSuccessfulVertices());
Assert.assertEquals(1, dag.numCompletedVertices);
dispatcher.getEventHandler().handle(
new VertexEventTaskReschedule(TezTaskID.getInstance(vId, 0)));
dispatcher.await();
Assert.assertEquals(VertexState.RUNNING, v.getState());
Assert.assertEquals(0, dag.getSuccessfulVertices());
Assert.assertEquals(0, dag.numCompletedVertices);
dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(
TezTaskID.getInstance(vId, 0), TaskState.SUCCEEDED));
dispatcher.await();
Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
Assert.assertEquals(1, dag.getSuccessfulVertices());
Assert.assertEquals(1, dag.numCompletedVertices);
}
@SuppressWarnings("unchecked")
public void testKillStartedDAG() {
initDAG(dag);
startDAG(dag);
dispatcher.await();
dispatcher.getEventHandler().handle(new DAGEventTerminateDag(dagId, DAGTerminationCause.DAG_KILL, null));
dispatcher.await();
Assert.assertEquals(DAGState.KILLED, dag.getState());
for (int i = 0 ; i < 6; ++i ) {
TezVertexID vId = TezVertexID.getInstance(dagId, i);
Vertex v = dag.getVertex(vId);
Assert.assertEquals(VertexState.KILLED, v.getState());
}
}
@Test(timeout = 5000)
public void testKillRunningDAG() {
_testTerminateRunningDAG(DAGTerminationCause.DAG_KILL);
}
@Test(timeout = 5000)
public void testServiceErrorRunningDAG() {
_testTerminateRunningDAG(DAGTerminationCause.SERVICE_PLUGIN_ERROR);
}
@SuppressWarnings("unchecked")
private void _testTerminateRunningDAG(DAGTerminationCause terminationCause) {
initDAG(dag);
startDAG(dag);
dispatcher.await();
TezVertexID vId1 = TezVertexID.getInstance(dagId, 1);
Vertex v1 = dag.getVertex(vId1);
((EventHandler<VertexEvent>) v1).handle(new VertexEventTaskCompleted(
TezTaskID.getInstance(vId1, 0), TaskState.SUCCEEDED));
TezVertexID vId0 = TezVertexID.getInstance(dagId, 0);
Vertex v0 = dag.getVertex(vId0);
((EventHandler<VertexEvent>) v0).handle(new VertexEventTaskCompleted(
TezTaskID.getInstance(vId0, 0), TaskState.SUCCEEDED));
dispatcher.await();
Assert.assertEquals(VertexState.SUCCEEDED, v0.getState());
Assert.assertEquals(VertexState.RUNNING, v1.getState());
dispatcher.getEventHandler().handle(new DAGEventTerminateDag(dagId, terminationCause, null));
dispatcher.await();
Assert.assertEquals(DAGState.TERMINATING, dag.getState());
Assert.assertEquals(VertexState.SUCCEEDED, v0.getState());
Assert.assertEquals(VertexState.TERMINATING, v1.getState());
for (int i = 2 ; i < 6; ++i ) {
TezVertexID vId = TezVertexID.getInstance(dagId, i);
Vertex v = dag.getVertex(vId);
Assert.assertEquals(VertexState.KILLED, v.getState());
}
Assert.assertEquals(1, dag.getSuccessfulVertices());
}
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testInvalidEvent() {
dispatcher.getEventHandler().handle(
new DAGEventStartDag(dagId, null));
dispatcher.await();
Assert.assertEquals(DAGState.ERROR, dag.getState());
}
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
@Ignore // Duplicate completions from a vertex would be a bug. Invalid test.
public void testVertexSuccessfulCompletionUpdates() {
initDAG(dag);
startDAG(dag);
dispatcher.await();
for (int i = 0; i < 6; ++i) {
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, 0), VertexState.SUCCEEDED));
}
dispatcher.await();
Assert.assertEquals(DAGState.RUNNING, dag.getState());
Assert.assertEquals(1, dag.getSuccessfulVertices());
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, 1), VertexState.SUCCEEDED));
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, 2), VertexState.SUCCEEDED));
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, 3), VertexState.SUCCEEDED));
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, 4), VertexState.SUCCEEDED));
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, 5), VertexState.SUCCEEDED));
dispatcher.await();
Assert.assertEquals(DAGState.SUCCEEDED, dag.getState());
Assert.assertEquals(6, dag.getSuccessfulVertices());
}
@SuppressWarnings("unchecked")
@Test(timeout = 10000)
public void testGetDAGStatusWithWait() throws TezException {
initDAG(dag);
startDAG(dag);
dispatcher.await();
// All vertices except one succeed
for (int i = 0; i < dag.getVertices().size() - 1; ++i) {
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, i), VertexState.SUCCEEDED));
}
dispatcher.await();
Assert.assertEquals(DAGState.RUNNING, dag.getState());
Assert.assertEquals(5, dag.getSuccessfulVertices());
long dagStatusStartTime = System.currentTimeMillis();
DAGStatusBuilder dagStatus = dag.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class), 2000l);
long dagStatusEndTime = System.currentTimeMillis();
long diff = dagStatusEndTime - dagStatusStartTime;
Assert.assertTrue(diff >= 0 && diff < 2500);
Assert.assertEquals(DAGStatusBuilder.State.RUNNING, dagStatus.getState());
}
@SuppressWarnings("unchecked")
@Test(timeout = 20000)
public void testGetDAGStatusReturnOnDagSucceeded() throws InterruptedException, TezException {
runTestGetDAGStatusReturnOnDagFinished(DAGStatus.State.SUCCEEDED);
}
@SuppressWarnings("unchecked")
@Test(timeout = 20000)
public void testGetDAGStatusReturnOnDagFailed() throws InterruptedException, TezException {
runTestGetDAGStatusReturnOnDagFinished(DAGStatus.State.FAILED);
}
@SuppressWarnings("unchecked")
@Test(timeout = 20000)
public void testGetDAGStatusReturnOnDagKilled() throws InterruptedException, TezException {
runTestGetDAGStatusReturnOnDagFinished(DAGStatus.State.KILLED);
}
@SuppressWarnings("unchecked")
@Test(timeout = 20000)
public void testGetDAGStatusReturnOnDagError() throws InterruptedException, TezException {
runTestGetDAGStatusReturnOnDagFinished(DAGStatus.State.ERROR);
}
@SuppressWarnings("unchecked")
public void runTestGetDAGStatusReturnOnDagFinished(DAGStatusBuilder.State testState) throws TezException, InterruptedException {
initDAG(dag);
startDAG(dag);
dispatcher.await();
// All vertices except one succeed
for (int i = 0; i < dag.getVertices().size() - 1; ++i) {
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, 0), VertexState.SUCCEEDED));
}
dispatcher.await();
Assert.assertEquals(DAGState.RUNNING, dag.getState());
Assert.assertEquals(5, dag.getSuccessfulVertices());
// Verify that dagStatus is running state
Assert.assertEquals(DAGStatus.State.RUNNING, dag.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class),
10000L).getState());
ReentrantLock lock = new ReentrantLock();
Condition startCondition = lock.newCondition();
Condition endCondition = lock.newCondition();
DagStatusCheckRunnable statusCheckRunnable =
new DagStatusCheckRunnable(lock, startCondition, endCondition);
Thread t1 = new Thread(statusCheckRunnable);
t1.start();
lock.lock();
try {
while (!statusCheckRunnable.started.get()) {
startCondition.await();
}
} finally {
lock.unlock();
}
// Sleep for 2 seconds. Then mark the last vertex is successful.
Thread.sleep(2000l);
if (testState == DAGStatus.State.SUCCEEDED) {
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, 5), VertexState.SUCCEEDED));
} else if (testState == DAGStatus.State.FAILED) {
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, 5), VertexState.FAILED));
} else if (testState == DAGStatus.State.KILLED) {
dispatcher.getEventHandler().handle(new DAGEventTerminateDag(dagId, DAGTerminationCause.DAG_KILL, null));
} else if (testState == DAGStatus.State.ERROR) {
dispatcher.getEventHandler().handle(new DAGEventStartDag(dagId, new LinkedList<URL>()));
} else {
throw new UnsupportedOperationException("Unsupported state for test: " + testState);
}
dispatcher.await();
// Wait for the dag status to return
lock.lock();
try {
while (!statusCheckRunnable.ended.get()) {
endCondition.await();
}
} finally {
lock.unlock();
}
long diff = statusCheckRunnable.dagStatusEndTime - statusCheckRunnable.dagStatusStartTime;
Assert.assertNotNull(statusCheckRunnable.dagStatus);
Assert.assertTrue("Status: " + statusCheckRunnable.dagStatus.getState()
+ ", Diff:" + diff, diff >= 0 && diff < 3500);
Assert.assertEquals(testState, statusCheckRunnable.dagStatus.getState());
t1.join();
}
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testVertexFailureHandling() {
initDAG(dag);
startDAG(dag);
dispatcher.await();
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, 0), VertexState.SUCCEEDED));
dispatcher.await();
Assert.assertEquals(DAGState.RUNNING, dag.getState());
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, 1), VertexState.SUCCEEDED));
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, 2), VertexState.FAILED));
dispatcher.await();
Assert.assertEquals(DAGState.FAILED, dag.getState());
Assert.assertEquals(2, dag.getSuccessfulVertices());
// Expect running vertices to be killed on first failure
for (int i = 3; i < 6; ++i) {
TezVertexID vId = TezVertexID.getInstance(dagId, i);
Vertex v = dag.getVertex(vId);
Assert.assertEquals(VertexState.KILLED, v.getState());
}
}
@Test(timeout = 5000)
public void testDAGKill() {
_testDAGTerminate(DAGTerminationCause.DAG_KILL);
}
@Test(timeout = 5000)
public void testDAGServiceError() {
_testDAGTerminate(DAGTerminationCause.SERVICE_PLUGIN_ERROR);
}
// Couple of vertices succeed. DAG_KILLED processed, which causes the rest of the vertices to be
// marked as KILLED.
@SuppressWarnings("unchecked")
private void _testDAGTerminate(DAGTerminationCause terminationCause) {
initDAG(dag);
startDAG(dag);
dispatcher.await();
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, 0), VertexState.SUCCEEDED));
dispatcher.await();
Assert.assertEquals(DAGState.RUNNING, dag.getState());
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, 1), VertexState.SUCCEEDED));
dispatcher.getEventHandler().handle(new DAGEventTerminateDag(dagId, terminationCause, null));
dispatcher.await();
Assert.assertEquals(terminationCause.getFinishedState(), dag.getState());
Assert.assertEquals(terminationCause, dag.getTerminationCause());
Assert.assertEquals(2, dag.getSuccessfulVertices());
int killedCount = 0;
for (Map.Entry<TezVertexID, Vertex> vEntry : dag.getVertices().entrySet()) {
if (vEntry.getValue().getState() == VertexState.KILLED) {
killedCount++;
}
}
Assert.assertEquals(4, killedCount);
for (Vertex v : dag.getVertices().values()) {
Assert.assertEquals(VertexTerminationCause.DAG_TERMINATED, v.getTerminationCause());
}
Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents);
}
@Test (timeout = 5000L)
@SuppressWarnings("unchecked")
public void testDAGHang() throws Exception {
conf.setBoolean(
TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS,
false);
dag = spy(new DAGImpl(dagId, conf, dagPlan,
dispatcher.getEventHandler(), taskCommunicatorManagerInterface,
fsTokens, clock, "user", thh, appContext));
StateMachineTez<DAGState, DAGEventType, DAGEvent, DAGImpl> spyStateMachine =
spy(new StateMachineTez<DAGState, DAGEventType, DAGEvent, DAGImpl>(
dag.stateMachineFactory.make(dag), dag));
when(dag.getStateMachine()).thenReturn(spyStateMachine);
dag.entityUpdateTracker = new StateChangeNotifierForTest(dag);
doReturn(dag).when(appContext).getCurrentDAG();
DAGImpl.OutputKey outputKey = mock(DAGImpl.OutputKey.class);
ListenableFuture future = mock(ListenableFuture.class);
dag.commitFutures.put(outputKey, future);
initDAG(dag);
startDAG(dag);
dispatcher.await();
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, 0), VertexState.SUCCEEDED));
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, 1), VertexState.SUCCEEDED));
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, 2), VertexState.SUCCEEDED));
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, 3), VertexState.SUCCEEDED));
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, 4), VertexState.SUCCEEDED));
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, 5), VertexState.SUCCEEDED));
dispatcher.await();
Assert.assertEquals(DAGState.COMMITTING, dag.getState());
DAGEventCommitCompleted dagEvent = new DAGEventCommitCompleted(
dagId, outputKey, false , new RuntimeException("test"));
doThrow(new RuntimeException("test")).when(
dag).logJobHistoryUnsuccesfulEvent(any(), any());
dag.handle(dagEvent);
dispatcher.await();
Assert.assertTrue("DAG did not terminate!", dag.getInternalState() == DAGState.FAILED);
}
@Test(timeout = 5000)
public void testDAGKillVertexSuccessAfterTerminated() {
_testDAGKillVertexSuccessAfterTerminated(DAGTerminationCause.DAG_KILL);
}
@Test(timeout = 5000)
public void testDAGServiceErrorVertexSuccessAfterTerminated() {
_testDAGKillVertexSuccessAfterTerminated(DAGTerminationCause.SERVICE_PLUGIN_ERROR);
}
// Vertices succeed after a DAG kill has been processed. Should be ignored.
@SuppressWarnings("unchecked")
private void _testDAGKillVertexSuccessAfterTerminated(DAGTerminationCause terminationCause) {
initDAG(dag);
startDAG(dag);
dispatcher.await();
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, 0), VertexState.SUCCEEDED));
dispatcher.await();
Assert.assertEquals(DAGState.RUNNING, dag.getState());
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, 1), VertexState.SUCCEEDED));
dispatcher.getEventHandler().handle(new DAGEventTerminateDag(dagId, terminationCause, null));
dispatcher.await();
Assert.assertEquals(terminationCause.getFinishedState(), dag.getState());
// Vertex SUCCESS gets processed after the DAG has reached the KILLED state. Should be ignored.
for (int i = 2; i < 6; ++i) {
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, i), VertexState.SUCCEEDED));
}
dispatcher.await();
int killedCount = 0;
for (Map.Entry<TezVertexID, Vertex> vEntry : dag.getVertices().entrySet()) {
if (vEntry.getValue().getState() == VertexState.KILLED) {
killedCount++;
}
}
Assert.assertEquals(4, killedCount);
Assert.assertEquals(terminationCause, dag.getTerminationCause());
Assert.assertEquals(2, dag.getSuccessfulVertices());
for (Vertex v : dag.getVertices().values()) {
Assert.assertEquals(VertexTerminationCause.DAG_TERMINATED, v.getTerminationCause());
}
Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents);
}
@Test(timeout = 5000)
public void testDAGKillPending() {
_testDAGKillPending(DAGTerminationCause.DAG_KILL);
}
@Test(timeout = 5000)
public void testDAGServiceErrorPending() {
_testDAGKillPending(DAGTerminationCause.SERVICE_PLUGIN_ERROR);
}
// Vertex KILLED after a DAG_KILLED is issued. Termination reason should be DAG_KILLED
@SuppressWarnings("unchecked")
private void _testDAGKillPending(DAGTerminationCause terminationCause) {
initDAG(dag);
startDAG(dag);
dispatcher.await();
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, 0), VertexState.SUCCEEDED));
dispatcher.await();
Assert.assertEquals(DAGState.RUNNING, dag.getState());
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, 1), VertexState.SUCCEEDED));
for (int i = 2; i < 5; ++i) {
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, i), VertexState.SUCCEEDED));
}
dispatcher.await();
dispatcher.getEventHandler().handle(new DAGEventTerminateDag(dagId, terminationCause, null));
dispatcher.await();
Assert.assertEquals(terminationCause.getFinishedState(), dag.getState());
dispatcher.getEventHandler().handle(new DAGEventVertexCompleted(
TezVertexID.getInstance(dagId, 5), VertexState.KILLED));
dispatcher.await();
Assert.assertEquals(terminationCause.getFinishedState(), dag.getState());
Assert.assertEquals(5, dag.getSuccessfulVertices());
Assert.assertEquals(dag.getVertex(TezVertexID.getInstance(dagId, 5)).getTerminationCause(),
VertexTerminationCause.DAG_TERMINATED);
Assert.assertEquals(1, dagFinishEventHandler.dagFinishEvents);
}
@Test(timeout = 5000)
public void testConfiguration() throws AMUserCodeException {
initDAG(dag);
// dag override the default configuration
Assert.assertEquals(3, dag.getConf().getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS,
TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT));
Vertex v1 = dag.getVertex("vertex1");
Vertex v2 = dag.getVertex("vertex2");
// v1 override the dagConfiguration
Assert.assertEquals(2, v1.getConf().getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS,
TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT));
// v2 inherit the configuration from dag
Assert.assertEquals(3, v2.getConf().getInt(TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS,
TezConfiguration.TEZ_AM_TASK_MAX_FAILED_ATTEMPTS_DEFAULT));
}
public static enum ExceptionLocation {
Initialize,
GetNumDestinationTaskPhysicalInputs,
GetNumSourceTaskPhysicalOutputs,
RouteDataMovementEventToDestination,
RouteInputSourceTaskFailedEventToDestination,
GetNumDestinationConsumerTasks,
RouteInputErrorEventToSource
}
public static class CustomizedEdgeManagerLegacy extends EdgeManagerPlugin {
private ExceptionLocation exLocation;
public static EdgeManagerPluginDescriptor getUserPayload(ExceptionLocation exLocation) {
return EdgeManagerPluginDescriptor.create(CustomizedEdgeManager.class.getName())
.setUserPayload(UserPayload.create(ByteBuffer.wrap(exLocation.name().getBytes())));
}
public CustomizedEdgeManagerLegacy(EdgeManagerPluginContext context) {
super(context);
this.exLocation = ExceptionLocation.valueOf(
new String(context.getUserPayload().deepCopyAsArray()));
}
@Override
public void initialize() throws Exception {
if (exLocation == ExceptionLocation.Initialize) {
throw new Exception(exLocation.name());
}
}
@Override
public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex)
throws Exception {
if (exLocation == ExceptionLocation.GetNumDestinationTaskPhysicalInputs) {
throw new Exception(exLocation.name());
}
return 0;
}
@Override
public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex)
throws Exception {
if (exLocation == ExceptionLocation.GetNumSourceTaskPhysicalOutputs) {
throw new Exception(exLocation.name());
}
return 0;
}
@Override
public void routeDataMovementEventToDestination(DataMovementEvent event,
int sourceTaskIndex, int sourceOutputIndex,
Map<Integer, List<Integer>> destinationTaskAndInputIndices)
throws Exception {
if (exLocation == ExceptionLocation.RouteDataMovementEventToDestination) {
throw new Exception(exLocation.name());
}
}
@Override
public void routeInputSourceTaskFailedEventToDestination(
int sourceTaskIndex,
Map<Integer, List<Integer>> destinationTaskAndInputIndices)
throws Exception {
if (exLocation == ExceptionLocation.RouteInputSourceTaskFailedEventToDestination) {
throw new Exception(exLocation.name());
}
}
@Override
public int getNumDestinationConsumerTasks(int sourceTaskIndex)
throws Exception {
if (exLocation == ExceptionLocation.GetNumDestinationConsumerTasks) {
throw new Exception(exLocation.name());
}
return 0;
}
@Override
public int routeInputErrorEventToSource(InputReadErrorEvent event,
int destinationTaskIndex, int destinationFailedInputIndex)
throws Exception {
if (exLocation == ExceptionLocation.RouteInputErrorEventToSource) {
throw new Exception(exLocation.name());
}
return 0;
}
}
public static class CustomizedEdgeManager extends EdgeManagerPluginOnDemand {
private ExceptionLocation exLocation;
public static EdgeManagerPluginDescriptor getUserPayload(ExceptionLocation exLocation) {
return EdgeManagerPluginDescriptor.create(CustomizedEdgeManager.class.getName())
.setUserPayload(UserPayload.create(ByteBuffer.wrap(exLocation.name().getBytes())));
}
public CustomizedEdgeManager(EdgeManagerPluginContext context) {
super(context);
this.exLocation = ExceptionLocation.valueOf(
new String(context.getUserPayload().deepCopyAsArray()));
}
@Override
public void initialize() throws Exception {
if (exLocation == ExceptionLocation.Initialize) {
throw new Exception(exLocation.name());
}
}
@Override
public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex)
throws Exception {
if (exLocation == ExceptionLocation.GetNumDestinationTaskPhysicalInputs) {
throw new Exception(exLocation.name());
}
return 0;
}
@Override
public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex)
throws Exception {
if (exLocation == ExceptionLocation.GetNumSourceTaskPhysicalOutputs) {
throw new Exception(exLocation.name());
}
return 0;
}
@Override
public int getNumDestinationConsumerTasks(int sourceTaskIndex)
throws Exception {
if (exLocation == ExceptionLocation.GetNumDestinationConsumerTasks) {
throw new Exception(exLocation.name());
}
return 0;
}
@Override
public int routeInputErrorEventToSource(int destinationTaskIndex,
int destinationFailedInputIndex) throws Exception {
if (exLocation == ExceptionLocation.RouteInputErrorEventToSource) {
throw new Exception(exLocation.name());
}
return 0;
}
@Override
public EventRouteMetadata routeDataMovementEventToDestination(int sourceTaskIndex,
int sourceOutputIndex, int destinationTaskIndex) throws Exception {
if (exLocation == ExceptionLocation.RouteDataMovementEventToDestination) {
throw new Exception(exLocation.name());
}
return null;
}
@Override
public CompositeEventRouteMetadata routeCompositeDataMovementEventToDestination(
int sourceTaskIndex, int destinationTaskIndex)
throws Exception {
if (exLocation == ExceptionLocation.RouteDataMovementEventToDestination) {
throw new Exception(exLocation.name());
}
return null;
}
@Override
public EventRouteMetadata routeInputSourceTaskFailedEventToDestination(
int sourceTaskIndex, int destinationTaskIndex) throws Exception {
if (exLocation == ExceptionLocation.RouteInputSourceTaskFailedEventToDestination) {
throw new Exception(exLocation.name());
}
return null;
}
@Override
public void prepareForRouting() throws Exception {
}
}
// Specificially for testGetDAGStatusReturnOnDagSuccess
private class DagStatusCheckRunnable implements Runnable {
private volatile DAGStatusBuilder dagStatus;
private volatile long dagStatusStartTime = -1;
private volatile long dagStatusEndTime = -1;
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean ended = new AtomicBoolean(false);
private final ReentrantLock lock;
private final Condition startCondition;
private final Condition endCondition;
public DagStatusCheckRunnable(ReentrantLock lock,
Condition startCondition,
Condition endCondition) {
this.lock = lock;
this.startCondition = startCondition;
this.endCondition = endCondition;
}
@Override
public void run() {
started.set(true);
lock.lock();
try {
startCondition.signal();
} finally {
lock.unlock();
}
try {
dagStatusStartTime = System.currentTimeMillis();
dagStatus = dag.getDAGStatus(EnumSet.noneOf(StatusGetOpts.class), 10000l);
dagStatusEndTime = System.currentTimeMillis();
} catch (TezException e) {
}
lock.lock();
ended.set(true);
try {
endCondition.signal();
} finally {
lock.unlock();
}
}
}
@SuppressWarnings("unchecked")
@Test(timeout = 5000)
public void testCounterLimits() {
initDAG(mrrDag);
dispatcher.await();
startDAG(mrrDag);
dispatcher.await();
for (int i=0; i<3; ++i) {
Vertex v = mrrDag.getVertex("vertex"+(i+1));
TezCounters ctrs = new TezCounters();
for (int j = 0; j < 50; ++j) {
ctrs.findCounter("g", "c" + i + "_" + j).increment(1);
}
((VertexImpl) v).setCounters(ctrs);
dispatcher.getEventHandler().handle(new VertexEventTaskCompleted(
TezTaskID.getInstance(v.getVertexId(), 0), TaskState.SUCCEEDED));
dispatcher.await();
Assert.assertEquals(VertexState.SUCCEEDED, v.getState());
Assert.assertEquals(i+1, mrrDag.getSuccessfulVertices());
}
Assert.assertEquals(3, mrrDag.getSuccessfulVertices());
Assert.assertEquals(DAGState.FAILED, mrrDag.getState());
Assert.assertTrue("Diagnostics should contain counter limits error message",
StringUtils.join(mrrDag.getDiagnostics(), ",").contains("Counters limit exceeded"));
}
}