blob: 1a4659593fc704c0a4c84c3af956e19794afb1e4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.app;
import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.SystemUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.tez.common.counters.AggregateFrameworkCounter;
import org.apache.tez.common.counters.AggregateTezCounterDelegate;
import org.apache.tez.common.counters.CounterGroup;
import org.apache.tez.common.counters.DAGCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeManagerPlugin;
import org.apache.tez.dag.api.EdgeManagerPluginContext;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.DataSinkDescriptor;
import org.apache.tez.dag.api.GroupInputEdge;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputCommitterDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexGroup;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.VertexStatus;
import org.apache.tez.dag.api.client.VertexStatus.State;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.app.MockDAGAppMaster.CountersDelegate;
import org.apache.tez.dag.app.MockDAGAppMaster.ContainerDelegate;
import org.apache.tez.dag.app.MockDAGAppMaster.EventsDelegate;
import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher;
import org.apache.tez.dag.app.MockDAGAppMaster.MockContainerLauncher.ContainerData;
import org.apache.tez.dag.app.MockDAGAppMaster.StatisticsDelegate;
import org.apache.tez.dag.app.dag.DAGState;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError;
import org.apache.tez.dag.app.dag.impl.DAGImpl;
import org.apache.tez.dag.app.dag.impl.TaskImpl;
import org.apache.tez.dag.app.dag.impl.VertexImpl;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.VertexStatistics;
import org.apache.tez.runtime.api.OutputCommitter;
import org.apache.tez.runtime.api.OutputCommitterContext;
import org.apache.tez.runtime.api.events.CompositeDataMovementEvent;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.CompositeRoutedDataMovementEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.IOStatistics;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.api.impl.TaskStatistics;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
import org.apache.tez.serviceplugins.api.ContainerLaunchRequest;
import org.apache.tez.serviceplugins.api.ContainerStopRequest;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
public class TestMockDAGAppMaster {
private static final Log LOG = LogFactory.getLog(TestMockDAGAppMaster.class);
static Configuration defaultConf;
static FileSystem localFs;
static {
try {
defaultConf = new Configuration(false);
defaultConf.set("fs.defaultFS", "file:///");
defaultConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
localFs = FileSystem.getLocal(defaultConf);
String stagingDir = "target" + Path.SEPARATOR + TestMockDAGAppMaster.class.getName() + "-tmpDir";
defaultConf.set(TezConfiguration.TEZ_AM_STAGING_DIR, stagingDir);
} catch (IOException e) {
throw new RuntimeException("init failure", e);
}
}
static class TestEventsDelegate implements EventsDelegate {
@Override
public void getEvents(TaskSpec taskSpec, List<TezEvent> events, long time) {
for (OutputSpec output : taskSpec.getOutputs()) {
if (output.getPhysicalEdgeCount() == 1) {
events.add(new TezEvent(DataMovementEvent.create(0, 0, 0, null), new EventMetaData(
EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(), output
.getDestinationVertexName(), taskSpec.getTaskAttemptID()), time));
} else {
events.add(new TezEvent(CompositeDataMovementEvent.create(0,
output.getPhysicalEdgeCount(), null), new EventMetaData(
EventProducerConsumerType.OUTPUT, taskSpec.getVertexName(), output
.getDestinationVertexName(), taskSpec.getTaskAttemptID()), time));
}
}
}
}
@Test (timeout = 5000)
public void testLocalResourceSetup() throws Exception {
TezConfiguration tezconf = new TezConfiguration(defaultConf);
MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
tezClient.start();
MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
mockLauncher.startScheduling(false);
Map<String, LocalResource> lrDAG = Maps.newHashMap();
String lrName1 = "LR1";
lrDAG.put(lrName1, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test"),
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
Map<String, LocalResource> lrVertex = Maps.newHashMap();
String lrName2 = "LR2";
lrVertex.put(lrName2, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test1"),
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
DAG dag = DAG.create("testLocalResourceSetup").addTaskLocalFiles(lrDAG);
Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5).addTaskLocalFiles(lrVertex);
dag.addVertex(vA);
DAGClient dagClient = tezClient.submitDAG(dag);
mockLauncher.waitTillContainersLaunched();
ContainerData cData = mockLauncher.getContainers().values().iterator().next();
ContainerLaunchContext launchContext = cData.launchContext;
Map<String, LocalResource> taskLR = launchContext.getLocalResources();
// verify tasks are launched with both DAG and task resources.
Assert.assertTrue(taskLR.containsKey(lrName1));
Assert.assertTrue(taskLR.containsKey(lrName2));
mockLauncher.startScheduling(true);
dagClient.waitForCompletion();
Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
tezClient.stop();
}
@Test (timeout = 5000)
public void testInternalPreemption() throws Exception {
TezConfiguration tezconf = new TezConfiguration(defaultConf);
MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
tezClient.start();
MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
mockLauncher.startScheduling(false);
// there is only 1 task whose first attempt will be preempted
DAG dag = DAG.create("testInternalPreemption");
Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 1);
dag.addVertex(vA);
DAGClient dagClient = tezClient.submitDAG(dag);
mockLauncher.waitTillContainersLaunched();
ContainerData cData = mockLauncher.getContainers().values().iterator().next();
DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
mockApp.getTaskSchedulerManager().preemptContainer(0, cData.cId);
mockLauncher.startScheduling(true);
dagClient.waitForCompletion();
Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
TezVertexID vertexId = TezVertexID.getInstance(dagImpl.getID(), 0);
TezTaskAttemptID killedTaId = TezTaskAttemptID.getInstance(TezTaskID.getInstance(vertexId, 0), 0);
TaskAttempt killedTa = dagImpl.getVertex(vA.getName()).getTask(0).getAttempt(killedTaId);
//Refer to TEZ-3950
Assert.assertTrue(killedTa.getState().equals(TaskAttemptState.KILLED) || killedTa.getState().equals(TaskAttemptState.FAILED));
tezClient.stop();
}
@Test (timeout = 5000)
public void testBasicEvents() throws Exception {
TezConfiguration tezconf = new TezConfiguration(defaultConf);
MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
tezClient.start();
MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
mockLauncher.startScheduling(false);
mockApp.eventsDelegate = new TestEventsDelegate();
DAG dag = DAG.create("testBasicEvents");
Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 2);
Vertex vB = Vertex.create("B", ProcessorDescriptor.create("Proc.class"), 2);
Vertex vC = Vertex.create("C", ProcessorDescriptor.create("Proc.class"), 2);
Vertex vD = Vertex.create("D", ProcessorDescriptor.create("Proc.class"), 2);
dag.addVertex(vA)
.addVertex(vB)
.addVertex(vC)
.addVertex(vD)
.addEdge(
Edge.create(vA, vB, EdgeProperty.create(DataMovementType.BROADCAST,
DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
OutputDescriptor.create("Out"), InputDescriptor.create("In"))))
.addEdge(
Edge.create(vA, vC, EdgeProperty.create(DataMovementType.SCATTER_GATHER,
DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
OutputDescriptor.create("Out"), InputDescriptor.create("In"))))
.addEdge(
Edge.create(vA, vD, EdgeProperty.create(DataMovementType.ONE_TO_ONE,
DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
OutputDescriptor.create("Out"), InputDescriptor.create("In"))));
DAGClient dagClient = tezClient.submitDAG(dag);
mockLauncher.waitTillContainersLaunched();
DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
mockLauncher.startScheduling(true);
dagClient.waitForCompletion();
Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
VertexImpl vImpl = (VertexImpl) dagImpl.getVertex(vB.getName());
TaskImpl tImpl = (TaskImpl) vImpl.getTask(1);
TezTaskAttemptID taId = TezTaskAttemptID.getInstance(tImpl.getTaskId(), 0);
List<TezEvent> tEvents = vImpl.getTaskAttemptTezEvents(taId, 0, 0, 1000).getEvents();
Assert.assertEquals(2, tEvents.size()); // 2 from vA
Assert.assertEquals(vA.getName(), tEvents.get(0).getDestinationInfo().getEdgeVertexName());
Assert.assertEquals(0, ((DataMovementEvent)tEvents.get(0).getEvent()).getSourceIndex());
Assert.assertEquals(vA.getName(), tEvents.get(1).getDestinationInfo().getEdgeVertexName());
Assert.assertEquals(0, ((DataMovementEvent)tEvents.get(1).getEvent()).getSourceIndex());
int targetIndex1 = ((DataMovementEvent)tEvents.get(0).getEvent()).getTargetIndex();
int targetIndex2 = ((DataMovementEvent)tEvents.get(1).getEvent()).getTargetIndex();
// order of vA task completion can change order of events
Assert.assertTrue("t1: " + targetIndex1 + " t2: " + targetIndex2,
(targetIndex1 == 0 && targetIndex2 == 1) || (targetIndex1 == 1 && targetIndex2 == 0));
vImpl = (VertexImpl) dagImpl.getVertex(vC.getName());
tImpl = (TaskImpl) vImpl.getTask(1);
taId = TezTaskAttemptID.getInstance(tImpl.getTaskId(), 0);
tEvents = vImpl.getTaskAttemptTezEvents(taId, 0, 0, 1000).getEvents();
Assert.assertEquals(2, tEvents.size()); // 2 from vA
Assert.assertEquals(vA.getName(), tEvents.get(0).getDestinationInfo().getEdgeVertexName());
Assert.assertEquals(1, ((CompositeRoutedDataMovementEvent)tEvents.get(0).getEvent()).getSourceIndex());
Assert.assertEquals(vA.getName(), tEvents.get(1).getDestinationInfo().getEdgeVertexName());
Assert.assertEquals(1, ((CompositeRoutedDataMovementEvent)tEvents.get(1).getEvent()).getSourceIndex());
targetIndex1 = ((CompositeRoutedDataMovementEvent)tEvents.get(0).getEvent()).getTargetIndex();
targetIndex2 = ((CompositeRoutedDataMovementEvent)tEvents.get(1).getEvent()).getTargetIndex();
// order of vA task completion can change order of events
Assert.assertTrue("t1: " + targetIndex1 + " t2: " + targetIndex2,
(targetIndex1 == 0 && targetIndex2 == 1) || (targetIndex1 == 1 && targetIndex2 == 0));
vImpl = (VertexImpl) dagImpl.getVertex(vD.getName());
tImpl = (TaskImpl) vImpl.getTask(1);
taId = TezTaskAttemptID.getInstance(tImpl.getTaskId(), 0);
tEvents = vImpl.getTaskAttemptTezEvents(taId, 0, 0, 1000).getEvents();
Assert.assertEquals(1, tEvents.size()); // 1 from vA
Assert.assertEquals(vA.getName(), tEvents.get(0).getDestinationInfo().getEdgeVertexName());
Assert.assertEquals(0, ((DataMovementEvent)tEvents.get(0).getEvent()).getTargetIndex());
Assert.assertEquals(0, ((DataMovementEvent)tEvents.get(0).getEvent()).getSourceIndex());
tezClient.stop();
}
public static class LegacyEdgeTestEdgeManager extends EdgeManagerPlugin {
List<Integer> destinationInputIndices =
Collections.unmodifiableList(Collections.singletonList(0));
public LegacyEdgeTestEdgeManager(EdgeManagerPluginContext context) {
super(context);
}
@Override
public void initialize() throws Exception {
}
@Override
public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) throws Exception {
return 1;
}
@Override
public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) throws Exception {
return 1;
}
@Override
public void routeDataMovementEventToDestination(DataMovementEvent event,
int sourceTaskIndex, int sourceOutputIndex,
Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
destinationTaskAndInputIndices.put(sourceTaskIndex, destinationInputIndices);
}
@Override
public void routeInputSourceTaskFailedEventToDestination(int sourceTaskIndex,
Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
destinationTaskAndInputIndices.put(sourceTaskIndex, destinationInputIndices);
}
@Override
public int routeInputErrorEventToSource(InputReadErrorEvent event,
int destinationTaskIndex, int destinationFailedInputIndex) {
return destinationTaskIndex;
}
@Override
public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
return 1;
}
}
@Test (timeout = 100000)
public void testMixedEdgeRouting() throws Exception {
TezConfiguration tezconf = new TezConfiguration(defaultConf);
MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
tezClient.start();
MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
mockLauncher.startScheduling(false);
mockApp.eventsDelegate = new TestEventsDelegate();
DAG dag = DAG.create("testMixedEdgeRouting");
Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 1);
Vertex vB = Vertex.create("B", ProcessorDescriptor.create("Proc.class"), 1);
Vertex vC = Vertex.create("C", ProcessorDescriptor.create("Proc.class"), 1);
Vertex vD = Vertex.create("D", ProcessorDescriptor.create("Proc.class"), 1);
Vertex vE = Vertex.create("E", ProcessorDescriptor.create("Proc.class"), 1);
dag.addVertex(vA)
.addVertex(vB)
.addVertex(vC)
.addVertex(vD)
.addVertex(vE)
.addEdge(
Edge.create(vA, vC, EdgeProperty.create(DataMovementType.SCATTER_GATHER,
DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
OutputDescriptor.create("Out"), InputDescriptor.create("In"))))
.addEdge(
Edge.create(vB, vC, EdgeProperty.create(DataMovementType.SCATTER_GATHER,
DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
OutputDescriptor.create("Out"), InputDescriptor.create("In"))))
.addEdge(
Edge.create(vA, vD, EdgeProperty.create(DataMovementType.SCATTER_GATHER,
DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
OutputDescriptor.create("Out"), InputDescriptor.create("In"))))
.addEdge(
Edge.create(vB, vD, EdgeProperty.create(
EdgeManagerPluginDescriptor.create(LegacyEdgeTestEdgeManager.class.getName()),
DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
OutputDescriptor.create("Out"), InputDescriptor.create("In"))))
.addEdge(
Edge.create(vB, vE, EdgeProperty.create(
EdgeManagerPluginDescriptor.create(LegacyEdgeTestEdgeManager.class.getName()),
DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
OutputDescriptor.create("Out"), InputDescriptor.create("In"))));
DAGClient dagClient = tezClient.submitDAG(dag);
mockLauncher.waitTillContainersLaunched();
DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
mockLauncher.startScheduling(true);
dagClient.waitForCompletion();
Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
// vC uses on demand routing and its task does not provide events
VertexImpl vImpl = (VertexImpl) dagImpl.getVertex(vC.getName());
TaskImpl tImpl = (TaskImpl) vImpl.getTask(0);
TezTaskAttemptID taId = TezTaskAttemptID.getInstance(tImpl.getTaskId(), 0);
Assert.assertEquals(0, tImpl.getTaskAttemptTezEvents(taId, 0, 1000).size());
// vD is mixed mode and only 1 out of 2 edges does legacy routing with task providing events
vImpl = (VertexImpl) dagImpl.getVertex(vD.getName());
tImpl = (TaskImpl) vImpl.getTask(0);
taId = TezTaskAttemptID.getInstance(tImpl.getTaskId(), 0);
Assert.assertEquals(1, tImpl.getTaskAttemptTezEvents(taId, 0, 1000).size());
// vE has single legacy edge and does not use on demand routing and its task provides events
vImpl = (VertexImpl) dagImpl.getVertex(vE.getName());
tImpl = (TaskImpl) vImpl.getTask(0);
taId = TezTaskAttemptID.getInstance(tImpl.getTaskId(), 0);
Assert.assertEquals(1, tImpl.getTaskAttemptTezEvents(taId, 0, 1000).size());
tezClient.stop();
}
@Test (timeout = 100000)
public void testConcurrencyLimit() throws Exception {
// the test relies on local mode behavior of launching a new container per task.
// so task concurrency == container concurrency
TezConfiguration tezconf = new TezConfiguration(defaultConf);
final int concurrencyLimit = 5;
MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null,
null, false, false, concurrencyLimit*4, 1000);
tezClient.start();
MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
mockLauncher.startScheduling(false);
final AtomicInteger concurrency = new AtomicInteger(0);
final AtomicBoolean exceededConcurrency = new AtomicBoolean(false);
mockApp.containerDelegate = new ContainerDelegate() {
@Override
public void stop(ContainerStopRequest event) {
concurrency.decrementAndGet();
}
@Override
public void launch(ContainerLaunchRequest event) {
int maxConc = concurrency.incrementAndGet();
if (maxConc > concurrencyLimit) {
exceededConcurrency.set(true);
}
System.out.println("Launched: " + maxConc);
}
};
DAG dag = DAG.create("testConcurrencyLimit");
Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 20).setConf(
TezConfiguration.TEZ_AM_VERTEX_MAX_TASK_CONCURRENCY, String.valueOf(concurrencyLimit));
dag.addVertex(vA);
mockLauncher.startScheduling(true);
DAGClient dagClient = tezClient.submitDAG(dag);
dagClient.waitForCompletion();
Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
Assert.assertFalse(exceededConcurrency.get());
tezClient.stop();
}
@Test
public void testCountersAggregation() throws Exception {
TezConfiguration tezconf = new TezConfiguration(defaultConf);
MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null,
null, false, false);
tezClient.start();
final String vAName = "A";
final String vBName = "B";
final String procCounterName = "Proc";
final String globalCounterName = "Global";
DAG dag = DAG.create("testCountersAggregation");
Vertex vA = Vertex.create(vAName, ProcessorDescriptor.create("Proc.class"), 10);
Vertex vB = Vertex.create(vBName, ProcessorDescriptor.create("Proc.class"), 1);
dag.addVertex(vA)
.addVertex(vB)
.addEdge(
Edge.create(vA, vB, EdgeProperty.create(DataMovementType.SCATTER_GATHER,
DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
OutputDescriptor.create("Out"), InputDescriptor.create("In"))));
TezCounters temp = new TezCounters();
temp.findCounter(new String(globalCounterName), new String(globalCounterName)).increment(1);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutput out = new DataOutputStream(bos);
temp.write(out);
final byte[] payload = bos.toByteArray();
MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
mockLauncher.startScheduling(false);
mockApp.countersDelegate = new CountersDelegate() {
int counterValue = 0;
@Override
public TezCounters getCounters(TaskSpec taskSpec) {
String vName = taskSpec.getVertexName();
TezCounters counters = new TezCounters();
final DataInputByteBuffer in = new DataInputByteBuffer();
in.reset(ByteBuffer.wrap(payload));
try {
// this ensures that the serde code path is covered.
// the internal merges of counters covers the constructor code path.
counters.readFields(in);
} catch (IOException e) {
Assert.fail(e.getMessage());
}
counters.findCounter(vName, procCounterName).setValue(++counterValue);
for (OutputSpec output : taskSpec.getOutputs()) {
counters.findCounter(vName, output.getDestinationVertexName()).setValue(++counterValue);
}
for (InputSpec input : taskSpec.getInputs()) {
counters.findCounter(vName, input.getSourceVertexName()).setValue(++counterValue);
}
return counters;
}
};
mockApp.doSleep = false;
DAGClient dagClient = tezClient.submitDAG(dag);
mockLauncher.waitTillContainersLaunched();
DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
mockLauncher.startScheduling(true);
DAGStatus status = dagClient.waitForCompletion();
Assert.assertEquals(DAGStatus.State.SUCCEEDED, status.getState());
TezCounters counters = dagImpl.getAllCounters();
// verify processor counters
VertexImpl vAImpl = (VertexImpl) dagImpl.getVertex(vAName);
VertexImpl vBImpl = (VertexImpl) dagImpl.getVertex(vBName);
TezCounters vACounters = vAImpl.getAllCounters();
TezCounters vBCounters = vBImpl.getAllCounters();
Assert.assertEquals(19, ((AggregateTezCounterDelegate)vACounters.findCounter(vAName, procCounterName)).getMax());
Assert.assertEquals(1, ((AggregateTezCounterDelegate)vACounters.findCounter(vAName, procCounterName)).getMin());
Assert.assertEquals(20, ((AggregateTezCounterDelegate)vACounters.findCounter(vAName, vBName)).getMax());
Assert.assertEquals(2, ((AggregateTezCounterDelegate)vACounters.findCounter(vAName, vBName)).getMin());
Assert.assertEquals(21, ((AggregateTezCounterDelegate)vBCounters.findCounter(vBName, procCounterName)).getMin());
Assert.assertEquals(21, ((AggregateTezCounterDelegate)vBCounters.findCounter(vBName, procCounterName)).getMax());
Assert.assertEquals(22, ((AggregateTezCounterDelegate)vBCounters.findCounter(vBName, vAName)).getMin());
Assert.assertEquals(22, ((AggregateTezCounterDelegate)vBCounters.findCounter(vBName, vAName)).getMax());
tezClient.stop();
}
@Test (timeout = 10000)
public void testBasicCounters() throws Exception {
TezConfiguration tezconf = new TezConfiguration(defaultConf);
MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null,
null, false, false);
tezClient.start();
final String vAName = "A";
final String vBName = "B";
final String procCounterName = "Proc";
final String globalCounterName = "Global";
DAG dag = DAG.create("testBasicCounters");
Vertex vA = Vertex.create(vAName, ProcessorDescriptor.create("Proc.class"), 10);
Vertex vB = Vertex.create(vBName, ProcessorDescriptor.create("Proc.class"), 1);
dag.addVertex(vA)
.addVertex(vB)
.addEdge(
Edge.create(vA, vB, EdgeProperty.create(DataMovementType.SCATTER_GATHER,
DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
OutputDescriptor.create("Out"), InputDescriptor.create("In"))));
TezCounters temp = new TezCounters();
temp.findCounter(new String(globalCounterName), new String(globalCounterName)).increment(1);
ByteArrayOutputStream bos = new ByteArrayOutputStream();
DataOutput out = new DataOutputStream(bos);
temp.write(out);
final byte[] payload = bos.toByteArray();
MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
mockLauncher.startScheduling(false);
mockApp.countersDelegate = new CountersDelegate() {
@Override
public TezCounters getCounters(TaskSpec taskSpec) {
String vName = taskSpec.getVertexName();
TezCounters counters = new TezCounters();
final DataInputByteBuffer in = new DataInputByteBuffer();
in.reset(ByteBuffer.wrap(payload));
try {
// this ensures that the serde code path is covered.
// the internal merges of counters covers the constructor code path.
counters.readFields(in);
} catch (IOException e) {
Assert.fail(e.getMessage());
}
counters.findCounter(vName, procCounterName).increment(1);
for (OutputSpec output : taskSpec.getOutputs()) {
counters.findCounter(vName, output.getDestinationVertexName()).increment(1);
}
for (InputSpec input : taskSpec.getInputs()) {
counters.findCounter(vName, input.getSourceVertexName()).increment(1);
}
return counters;
}
};
mockApp.doSleep = false;
DAGClient dagClient = tezClient.submitDAG(dag);
mockLauncher.waitTillContainersLaunched();
DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
mockLauncher.startScheduling(true);
DAGStatus status = dagClient.waitForCompletion();
Assert.assertEquals(DAGStatus.State.SUCCEEDED, status.getState());
TezCounters counters = dagImpl.getAllCounters();
String osName = System.getProperty("os.name").toLowerCase(Locale.ENGLISH);
if (SystemUtils.IS_OS_LINUX) {
Assert.assertTrue(counters.findCounter(DAGCounter.AM_CPU_MILLISECONDS).getValue() > 0);
}
// verify processor counters
Assert.assertEquals(10, counters.findCounter(vAName, procCounterName).getValue());
Assert.assertEquals(1, counters.findCounter(vBName, procCounterName).getValue());
// verify edge counters
Assert.assertEquals(10, counters.findCounter(vAName, vBName).getValue());
Assert.assertEquals(1, counters.findCounter(vBName, vAName).getValue());
// verify global counters
Assert.assertEquals(11, counters.findCounter(globalCounterName, globalCounterName).getValue());
VertexImpl vAImpl = (VertexImpl) dagImpl.getVertex(vAName);
VertexImpl vBImpl = (VertexImpl) dagImpl.getVertex(vBName);
TezCounters vACounters = vAImpl.getAllCounters();
TezCounters vBCounters = vBImpl.getAllCounters();
String vACounterName = vACounters.findCounter(globalCounterName, globalCounterName).getName();
String vBCounterName = vBCounters.findCounter(globalCounterName, globalCounterName).getName();
if (vACounterName != vBCounterName) {
Assert.fail("String counter name objects dont match despite interning.");
}
CounterGroup vaGroup = vACounters.getGroup(globalCounterName);
String vaGrouName = vaGroup.getName();
CounterGroup vBGroup = vBCounters.getGroup(globalCounterName);
String vBGrouName = vBGroup.getName();
if (vaGrouName != vBGrouName) {
Assert.fail("String group name objects dont match despite interning.");
}
tezClient.stop();
}
@Test (timeout = 10000)
public void testBasicStatistics() throws Exception {
TezConfiguration tezconf = new TezConfiguration(defaultConf);
MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null,
null, false, false);
tezClient.start();
final String vAName = "A";
final String vBName = "B";
final String sourceName = "In";
final String sinkName = "Out";
DAG dag = DAG.create("testBasisStatistics");
Vertex vA = Vertex.create(vAName, ProcessorDescriptor.create("Proc.class"), 3);
Vertex vB = Vertex.create(vBName, ProcessorDescriptor.create("Proc.class"), 2);
vA.addDataSource(sourceName,
DataSourceDescriptor.create(InputDescriptor.create("In"), null, null));
vB.addDataSink(sinkName, DataSinkDescriptor.create(OutputDescriptor.create("Out"), null, null));
dag.addVertex(vA)
.addVertex(vB)
.addEdge(
Edge.create(vA, vB, EdgeProperty.create(DataMovementType.SCATTER_GATHER,
DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
OutputDescriptor.create("Out"), InputDescriptor.create("In"))));
IOStatistics ioStats = new IOStatistics();
ioStats.setDataSize(1);
ioStats.setItemsProcessed(1);
TaskStatistics vAStats = new TaskStatistics();
vAStats.addIO(vBName, ioStats);
vAStats.addIO(sourceName, ioStats);
TaskStatistics vBStats = new TaskStatistics();
vBStats.addIO(vAName, ioStats);
vBStats.addIO(sinkName, ioStats);
ByteArrayOutputStream bosA = new ByteArrayOutputStream();
DataOutput outA = new DataOutputStream(bosA);
vAStats.write(outA);
final byte[] payloadA = bosA.toByteArray();
ByteArrayOutputStream bosB = new ByteArrayOutputStream();
DataOutput outB = new DataOutputStream(bosB);
vBStats.write(outB);
final byte[] payloadB = bosB.toByteArray();
MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
mockLauncher.startScheduling(false);
mockApp.statsDelegate = new StatisticsDelegate() {
@Override
public TaskStatistics getStatistics(TaskSpec taskSpec) {
byte[] payload = payloadA;
TaskStatistics stats = new TaskStatistics();
if (taskSpec.getVertexName().equals(vBName)) {
payload = payloadB;
}
final DataInputByteBuffer in = new DataInputByteBuffer();
in.reset(ByteBuffer.wrap(payload));
try {
// this ensures that the serde code path is covered.
stats.readFields(in);
} catch (IOException e) {
Assert.fail(e.getMessage());
}
return stats;
}
};
mockApp.doSleep = false;
DAGClient dagClient = tezClient.submitDAG(dag);
mockLauncher.waitTillContainersLaunched();
DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
mockLauncher.startScheduling(true);
DAGStatus status = dagClient.waitForCompletion();
Assert.assertEquals(DAGStatus.State.SUCCEEDED, status.getState());
// verify that the values have been correct aggregated
for (org.apache.tez.dag.app.dag.Vertex v : dagImpl.getVertices().values()) {
VertexStatistics vStats = v.getStatistics();
if (v.getName().equals(vAName)) {
Assert.assertEquals(3, vStats.getOutputStatistics(vBName).getDataSize());
Assert.assertEquals(3, vStats.getInputStatistics(sourceName).getDataSize());
Assert.assertEquals(3, vStats.getOutputStatistics(vBName).getItemsProcessed());
Assert.assertEquals(3, vStats.getInputStatistics(sourceName).getItemsProcessed());
} else {
Assert.assertEquals(2, vStats.getInputStatistics(vAName).getDataSize());
Assert.assertEquals(2, vStats.getOutputStatistics(sinkName).getDataSize());
Assert.assertEquals(2, vStats.getInputStatistics(vAName).getItemsProcessed());
Assert.assertEquals(2, vStats.getOutputStatistics(sinkName).getItemsProcessed());
}
}
tezClient.stop();
}
private void checkMemory(String name, MockDAGAppMaster mockApp) {
long mb = 1024*1024;
//Getting the runtime reference from system
Runtime runtime = Runtime.getRuntime();
System.out.println("##### Heap utilization statistics [MB] for " + name);
runtime.gc();
//Print used memory
System.out.println("##### Used Memory:"
+ (runtime.totalMemory() - runtime.freeMemory()) / mb);
//Print free memory
System.out.println("##### Free Memory:"
+ runtime.freeMemory() / mb);
//Print total available memory
System.out.println("##### Total Memory:" + runtime.totalMemory() / mb);
//Print Maximum available memory
System.out.println("##### Max Memory:" + runtime.maxMemory() / mb);
}
@Ignore
@Test (timeout = 60000)
public void testBasicCounterMemory() throws Exception {
Logger.getRootLogger().setLevel(Level.WARN);
TezConfiguration tezconf = new TezConfiguration(defaultConf);
MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null,
null, false, false);
tezClient.start();
final String vAName = "A";
DAG dag = DAG.create("testBasicCounterMemory");
Vertex vA = Vertex.create(vAName, ProcessorDescriptor.create("Proc.class"), 10000);
dag.addVertex(vA);
MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
mockLauncher.startScheduling(false);
mockApp.countersDelegate = new CountersDelegate() {
@Override
public TezCounters getCounters(TaskSpec taskSpec) {
TezCounters counters = new TezCounters();
final String longName = "abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz";
final String shortName = "abcdefghijklmnopqrstuvwxyz";
for (int i=0; i<6; ++i) {
for (int j=0; j<15; ++j) {
counters.findCounter((i + longName), (i + (shortName))).increment(1);
}
}
return counters;
}
};
mockApp.doSleep = false;
DAGClient dagClient = tezClient.submitDAG(dag);
mockLauncher.waitTillContainersLaunched();
DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
mockLauncher.startScheduling(true);
DAGStatus status = dagClient.waitForCompletion();
Assert.assertEquals(DAGStatus.State.SUCCEEDED, status.getState());
TezCounters counters = dagImpl.getAllCounters();
Assert.assertNotNull(counters);
checkMemory(dag.getName(), mockApp);
tezClient.stop();
}
@Ignore
@Test (timeout = 60000)
public void testTaskEventsProcessingSpeed() throws Exception {
Logger.getRootLogger().setLevel(Level.WARN);
TezConfiguration tezconf = new TezConfiguration(defaultConf);
tezconf.setBoolean(TezConfiguration.TEZ_AM_USE_CONCURRENT_DISPATCHER, true);
MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null,
null, false, false, 30, 1000);
tezClient.start();
final String vAName = "A";
DAG dag = DAG.create("testTaskEventsProcessingSpeed");
Vertex vA = Vertex.create(vAName, ProcessorDescriptor.create("Proc.class"), 50000);
dag.addVertex(vA);
MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
mockApp.doSleep = false;
DAGClient dagClient = tezClient.submitDAG(dag);
DAGStatus status = dagClient.waitForCompletion();
Assert.assertEquals(DAGStatus.State.SUCCEEDED, status.getState());
tezClient.stop();
}
@Ignore
@Test (timeout = 60000)
public void testBasicStatisticsMemory() throws Exception {
Logger.getRootLogger().setLevel(Level.WARN);
TezConfiguration tezconf = new TezConfiguration(defaultConf);
MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null,
null, false, false);
tezClient.start();
final String vAName = "abcdefghijklmnopqrstuvwxyz";
int numTasks = 10000;
int numSources = 10;
IOStatistics ioStats = new IOStatistics();
ioStats.setDataSize(1);
ioStats.setItemsProcessed(1);
TaskStatistics vAStats = new TaskStatistics();
DAG dag = DAG.create("testBasicStatisticsMemory");
Vertex vA = Vertex.create(vAName, ProcessorDescriptor.create("Proc.class"), numTasks);
for (int i=0; i<numSources; ++i) {
final String sourceName = i + vAName;
vA.addDataSource(sourceName,
DataSourceDescriptor.create(InputDescriptor.create(sourceName), null, null));
vAStats.addIO(sourceName, ioStats);
}
dag.addVertex(vA);
ByteArrayOutputStream bosA = new ByteArrayOutputStream();
DataOutput outA = new DataOutputStream(bosA);
vAStats.write(outA);
final byte[] payloadA = bosA.toByteArray();
MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
mockLauncher.startScheduling(false);
mockApp.statsDelegate = new StatisticsDelegate() {
@Override
public TaskStatistics getStatistics(TaskSpec taskSpec) {
byte[] payload = payloadA;
TaskStatistics stats = new TaskStatistics();
final DataInputByteBuffer in = new DataInputByteBuffer();
in.reset(ByteBuffer.wrap(payload));
try {
// this ensures that the serde code path is covered.
stats.readFields(in);
} catch (IOException e) {
Assert.fail(e.getMessage());
}
return stats;
}
};
mockApp.doSleep = false;
DAGClient dagClient = tezClient.submitDAG(dag);
mockLauncher.waitTillContainersLaunched();
DAGImpl dagImpl = (DAGImpl) mockApp.getContext().getCurrentDAG();
mockLauncher.startScheduling(true);
DAGStatus status = dagClient.waitForCompletion();
Assert.assertEquals(DAGStatus.State.SUCCEEDED, status.getState());
Assert.assertEquals(numTasks,
dagImpl.getVertex(vAName).getStatistics().getInputStatistics(0+vAName).getDataSize());
Assert.assertEquals(numTasks,
dagImpl.getVertex(vAName).getStatistics().getInputStatistics(0+vAName).getItemsProcessed());
checkMemory(dag.getName(), mockApp);
tezClient.stop();
}
@Test (timeout = 10000)
public void testMultipleSubmissions() throws Exception {
Map<String, LocalResource> lrDAG = Maps.newHashMap();
String lrName1 = "LR1";
lrDAG.put(lrName1, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test"),
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
Map<String, LocalResource> lrVertex = Maps.newHashMap();
String lrName2 = "LR2";
lrVertex.put(lrName2, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test1"),
LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1));
DAG dag = DAG.create("test").addTaskLocalFiles(lrDAG);
Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5).addTaskLocalFiles(lrVertex);
dag.addVertex(vA);
TezConfiguration tezconf = new TezConfiguration(defaultConf);
MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
tezClient.start();
DAGClient dagClient = tezClient.submitDAG(dag);
dagClient.waitForCompletion();
Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
tezClient.stop();
// submit the same DAG again to verify it can be done.
tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
tezClient.start();
dagClient = tezClient.submitDAG(dag);
dagClient.waitForCompletion();
Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
tezClient.stop();
}
@Test (timeout = 10000)
public void testSchedulerErrorHandling() throws Exception {
TezConfiguration tezconf = new TezConfiguration(defaultConf);
MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
tezClient.start();
MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
mockLauncher.startScheduling(false);
DAG dag = DAG.create("testSchedulerErrorHandling");
Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5);
dag.addVertex(vA);
tezClient.submitDAG(dag);
mockLauncher.waitTillContainersLaunched();
mockApp.handle(new DAGAppMasterEventSchedulingServiceError(
org.apache.hadoop.util.StringUtils.stringifyException(new RuntimeException("Mock error"))));
while(!mockApp.getShutdownHandler().wasShutdownInvoked()) {
Thread.sleep(100);
}
Assert.assertEquals(DAGState.RUNNING, mockApp.getContext().getCurrentDAG().getState());
}
@Test (timeout = 10000)
public void testInitFailed() throws Exception {
TezConfiguration tezconf = new TezConfiguration(defaultConf);
MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true,
null, null, null, new AtomicBoolean(false), true, false);
try {
tezClient.start();
} catch (Exception e) {
e.printStackTrace();
Assert.assertEquals("FailInit", e.getCause().getCause().getMessage());
MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
// will timeout if DAGAppMasterShutdownHook is not invoked
mockApp.waitForServiceToStop(Integer.MAX_VALUE);
}
}
@Test (timeout = 10000)
public void testStartFailed() {
TezConfiguration tezconf = new TezConfiguration(defaultConf);
MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true,
null, null, null, new AtomicBoolean(false), false, true);
try {
tezClient.start();
} catch (Exception e) {
e.printStackTrace();
Assert.assertEquals("FailStart", e.getCause().getCause().getMessage());
MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
// will timeout if DAGAppMasterShutdownHook is not invoked
mockApp.waitForServiceToStop(Integer.MAX_VALUE);
}
}
private OutputCommitterDescriptor createOutputCommitterDesc(boolean failOnCommit) {
OutputCommitterDescriptor outputCommitterDesc =
OutputCommitterDescriptor.create(FailingOutputCommitter.class.getName());
UserPayload payload = UserPayload.create(
ByteBuffer.wrap(new FailingOutputCommitter.FailingOutputCommitterConfig(failOnCommit).toUserPayload()));
outputCommitterDesc.setUserPayload(payload);
return outputCommitterDesc;
}
private DAG createDAG(String dagName, boolean uv12CommitFail, boolean v3CommitFail) {
DAG dag = DAG.create(dagName);
Vertex v1 = Vertex.create("v1", ProcessorDescriptor.create("Proc"), 1);
Vertex v2 = Vertex.create("v2", ProcessorDescriptor.create("Proc"), 1);
Vertex v3 = Vertex.create("v3", ProcessorDescriptor.create("Proc"), 1);
VertexGroup uv12 = dag.createVertexGroup("uv12", v1, v2);
DataSinkDescriptor uv12DataSink = DataSinkDescriptor.create(
OutputDescriptor.create("dummy output"), createOutputCommitterDesc(uv12CommitFail), null);
uv12.addDataSink("uv12Out", uv12DataSink);
DataSinkDescriptor v3DataSink = DataSinkDescriptor.create(
OutputDescriptor.create("dummy output"), createOutputCommitterDesc(v3CommitFail), null);
v3.addDataSink("v3Out", v3DataSink);
GroupInputEdge e1 = GroupInputEdge.create(uv12, v3, EdgeProperty.create(
DataMovementType.SCATTER_GATHER, DataSourceType.PERSISTED,
SchedulingType.SEQUENTIAL,
OutputDescriptor.create("dummy output class"),
InputDescriptor.create("dummy input class")), InputDescriptor
.create("merge.class"));
dag.addVertex(v1)
.addVertex(v2)
.addVertex(v3)
.addEdge(e1);
return dag;
}
@Test (timeout = 60000)
public void testCommitOutputOnDAGSuccess() throws Exception {
TezConfiguration tezconf = new TezConfiguration(defaultConf);
MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
tezClient.start();
// both committers succeed
DAG dag1 = createDAG("testDAGBothCommitsSucceed", false, false);
DAGClient dagClient = tezClient.submitDAG(dag1);
dagClient.waitForCompletion();
Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
// vertexGroupCommiter fail (uv12)
DAG dag2 = createDAG("testDAGVertexGroupCommitFail", true, false);
dagClient = tezClient.submitDAG(dag2);
dagClient.waitForCompletion();
Assert.assertEquals(DAGStatus.State.FAILED, dagClient.getDAGStatus(null).getState());
LOG.info(dagClient.getDAGStatus(null).getDiagnostics());
Assert.assertTrue(StringUtils.join(dagClient.getDAGStatus(null).getDiagnostics(),",")
.contains("fail output committer:uv12Out"));
Assert.assertEquals(VertexStatus.State.SUCCEEDED, dagClient.getVertexStatus("v1", null).getState());
Assert.assertEquals(VertexStatus.State.SUCCEEDED, dagClient.getVertexStatus("v2", null).getState());
Assert.assertEquals(VertexStatus.State.SUCCEEDED, dagClient.getVertexStatus("v3", null).getState());
// vertex commit fail (v3)
DAG dag3 = createDAG("testDAGVertexCommitFail", false, true);
dagClient = tezClient.submitDAG(dag3);
dagClient.waitForCompletion();
LOG.info(dagClient.getDAGStatus(null).getDiagnostics());
Assert.assertEquals(DAGStatus.State.FAILED, dagClient.getDAGStatus(null).getState());
Assert.assertTrue(StringUtils.join(dagClient.getDAGStatus(null).getDiagnostics(),",")
.contains("fail output committer:v3Out"));
Assert.assertEquals(VertexStatus.State.SUCCEEDED, dagClient.getVertexStatus("v1", null).getState());
Assert.assertEquals(VertexStatus.State.SUCCEEDED, dagClient.getVertexStatus("v2", null).getState());
Assert.assertEquals(VertexStatus.State.SUCCEEDED, dagClient.getVertexStatus("v3", null).getState());
// both committers fail
DAG dag4 = createDAG("testDAGBothCommitsFail", true, true);
dagClient = tezClient.submitDAG(dag4);
dagClient.waitForCompletion();
LOG.info(dagClient.getDAGStatus(null).getDiagnostics());
Assert.assertEquals(DAGStatus.State.FAILED, dagClient.getDAGStatus(null).getState());
String diag = StringUtils.join(dagClient.getDAGStatus(null).getDiagnostics(),",");
Assert.assertTrue(diag.contains("fail output committer:uv12Out") ||
diag.contains("fail output committer:v3Out"));
Assert.assertEquals(VertexStatus.State.SUCCEEDED, dagClient.getVertexStatus("v1", null).getState());
Assert.assertEquals(VertexStatus.State.SUCCEEDED, dagClient.getVertexStatus("v2", null).getState());
Assert.assertEquals(VertexStatus.State.SUCCEEDED, dagClient.getVertexStatus("v3", null).getState());
tezClient.stop();
}
@Test (timeout = 60000)
public void testCommitOutputOnVertexSuccess() throws Exception {
TezConfiguration tezconf = new TezConfiguration(defaultConf);
tezconf.setBoolean(TezConfiguration.TEZ_AM_COMMIT_ALL_OUTPUTS_ON_DAG_SUCCESS, false);
MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
tezClient.start();
// both committers succeed
DAG dag1 = createDAG("testDAGBothCommitsSucceed", false, false);
DAGClient dagClient = tezClient.submitDAG(dag1);
dagClient.waitForCompletion();
Assert.assertEquals(DAGStatus.State.SUCCEEDED, dagClient.getDAGStatus(null).getState());
// vertexGroupCommiter fail (uv12)
DAG dag2 = createDAG("testDAGVertexGroupCommitFail", true, false);
dagClient = tezClient.submitDAG(dag2);
dagClient.waitForCompletion();
Assert.assertEquals(DAGStatus.State.FAILED, dagClient.getDAGStatus(null).getState());
LOG.info(dagClient.getDAGStatus(null).getDiagnostics());
Assert.assertTrue(StringUtils.join(dagClient.getDAGStatus(null).getDiagnostics(),",")
.contains("fail output committer:uv12Out"));
Assert.assertEquals(VertexStatus.State.SUCCEEDED, dagClient.getVertexStatus("v1", null).getState());
Assert.assertEquals(VertexStatus.State.SUCCEEDED, dagClient.getVertexStatus("v2", null).getState());
VertexStatus.State v3State = dagClient.getVertexStatus("v3", null).getState();
// v3 either succeeded (commit completed before uv12 commit fails)
// or killed ( uv12 commit fail when v3 is in running/committing)
if (v3State.equals(VertexStatus.State.SUCCEEDED)) {
LOG.info("v3 is succeeded");
} else {
Assert.assertEquals(VertexStatus.State.KILLED, v3State);
}
// vertex commit fail (v3)
DAG dag3 = createDAG("testDAGVertexCommitFail", false, true);
dagClient = tezClient.submitDAG(dag3);
dagClient.waitForCompletion();
LOG.info(dagClient.getDAGStatus(null).getDiagnostics());
Assert.assertEquals(DAGStatus.State.FAILED, dagClient.getDAGStatus(null).getState());
Assert.assertTrue(StringUtils.join(dagClient.getDAGStatus(null).getDiagnostics(),",")
.contains("Commit failed"));
Assert.assertEquals(VertexStatus.State.SUCCEEDED, dagClient.getVertexStatus("v1", null).getState());
Assert.assertEquals(VertexStatus.State.SUCCEEDED, dagClient.getVertexStatus("v2", null).getState());
Assert.assertEquals(VertexStatus.State.FAILED, dagClient.getVertexStatus("v3", null).getState());
Assert.assertTrue(StringUtils.join(dagClient.getVertexStatus("v3", null).getDiagnostics(),",")
.contains("fail output committer:v3Out"));
// both committers fail
DAG dag4 = createDAG("testDAGBothCommitsFail", true, true);
dagClient = tezClient.submitDAG(dag4);
dagClient.waitForCompletion();
Assert.assertEquals(DAGStatus.State.FAILED, dagClient.getDAGStatus(null).getState());
LOG.info(dagClient.getDAGStatus(null).getDiagnostics());
Assert.assertEquals(DAGStatus.State.FAILED, dagClient.getDAGStatus(null).getState());
String diag = StringUtils.join(dagClient.getDAGStatus(null).getDiagnostics(),",");
Assert.assertTrue(diag.contains("fail output committer:uv12Out") ||
diag.contains("fail output committer:v3Out"));
Assert.assertEquals(VertexStatus.State.SUCCEEDED, dagClient.getVertexStatus("v1", null).getState());
Assert.assertEquals(VertexStatus.State.SUCCEEDED, dagClient.getVertexStatus("v2", null).getState());
v3State = dagClient.getVertexStatus("v3", null).getState();
// v3 either failed (commit of v3 fail before uv12 commit)
// or killed ( uv12 commit fail before commit of v3)
if (v3State.equals(VertexStatus.State.FAILED)) {
LOG.info("v3 is failed");
Assert.assertTrue(StringUtils.join(dagClient.getVertexStatus("v3", null).getDiagnostics(),",")
.contains("fail output committer:v3Out"));
} else {
Assert.assertEquals(VertexStatus.State.KILLED, v3State);
}
tezClient.stop();
}
public static class FailingOutputCommitter extends OutputCommitter {
boolean failOnCommit = false;
public FailingOutputCommitter(OutputCommitterContext committerContext) {
super(committerContext);
}
@Override
public void initialize() throws Exception {
FailingOutputCommitterConfig config = new
FailingOutputCommitterConfig();
config.fromUserPayload(
getContext().getUserPayload().deepCopyAsArray());
failOnCommit = config.failOnCommit;
}
@Override
public void setupOutput() throws Exception {
}
@Override
public void commitOutput() throws Exception {
if (failOnCommit) {
throw new Exception("fail output committer:" + getContext().getOutputName());
}
}
@Override
public void abortOutput(State finalState) throws Exception {
}
public static class FailingOutputCommitterConfig {
boolean failOnCommit;
public FailingOutputCommitterConfig() {
this(false);
}
public FailingOutputCommitterConfig(boolean failOnCommit) {
this.failOnCommit = failOnCommit;
}
public byte[] toUserPayload() {
return Ints.toByteArray((failOnCommit ? 1 : 0));
}
public void fromUserPayload(byte[] userPayload) {
int failInt = Ints.fromByteArray(userPayload);
if (failInt == 0) {
failOnCommit = false;
} else {
failOnCommit = true;
}
}
}
}
@Test(timeout = 5000)
public void testDAGFinishedRecoveryError() throws Exception {
TezConfiguration tezconf = new TezConfiguration(defaultConf);
MockTezClient tezClient = new MockTezClient("testMockAM", tezconf, true, null, null, null, null);
tezClient.start();
MockDAGAppMaster mockApp = tezClient.getLocalClient().getMockApp();
mockApp.recoveryFatalError = true;
MockContainerLauncher mockLauncher = mockApp.getContainerLauncher();
mockLauncher.startScheduling(true);
DAG dag = DAG.create("test");
Vertex vA = Vertex.create("A", ProcessorDescriptor.create("Proc.class"), 5);
dag.addVertex(vA);
DAGClient dagClient = tezClient.submitDAG(dag);
dagClient.waitForCompletion();
while(!mockApp.getShutdownHandler().wasShutdownInvoked()) {
Thread.sleep(100);
}
Assert.assertEquals(DAGState.SUCCEEDED, mockApp.getContext().getCurrentDAG().getState());
Assert.assertEquals(DAGAppMasterState.FAILED, mockApp.getState());
Assert.assertTrue(StringUtils.join(mockApp.getDiagnostics(),",")
.contains("Recovery had a fatal error, shutting down session after" +
" DAG completion"));
}
}