blob: 42d6f5cd9d2834c717b3c97167ac909720850f37 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.test;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.tez.client.TezClient;
import org.apache.tez.client.TezClientUtils;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtils;
import org.apache.tez.common.counters.DAGCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.app.RecoveryParser;
import org.apache.tez.dag.app.dag.impl.ImmediateStartVertexManager;
import org.apache.tez.dag.history.HistoryEvent;
import org.apache.tez.dag.history.HistoryEventType;
import org.apache.tez.dag.history.events.TaskAttemptFinishedEvent;
import org.apache.tez.dag.history.recovery.RecoveryService;
import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.library.processor.SimpleProcessor;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestAMRecovery {
private static final Log LOG = LogFactory.getLog(TestAMRecovery.class);
private static Configuration conf = new Configuration();
private static TezConfiguration tezConf;
private static int MAX_AM_ATTEMPT = 10;
private static MiniTezCluster miniTezCluster = null;
private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+ TestAMRecovery.class.getName() + "-tmpDir";
private static MiniDFSCluster dfsCluster = null;
private static TezClient tezSession = null;
private static FileSystem remoteFs = null;
private static String FAIL_ON_PARTIAL_FINISHED = "FAIL_ON_PARTIAL_COMPLETED";
private static String FAIL_ON_ATTEMPT = "FAIL_ON_ATTEMPT";
@BeforeClass
public static void beforeClass() throws Exception {
LOG.info("Starting mini clusters");
try {
conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
dfsCluster =
new MiniDFSCluster.Builder(conf).numDataNodes(3).format(true)
.racks(null).build();
remoteFs = dfsCluster.getFileSystem();
} catch (IOException io) {
throw new RuntimeException("problem starting mini dfs cluster", io);
}
if (miniTezCluster == null) {
miniTezCluster =
new MiniTezCluster(TestAMRecovery.class.getName(), 1, 1, 1);
Configuration miniTezconf = new Configuration(conf);
miniTezconf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, MAX_AM_ATTEMPT);
miniTezconf.set("fs.defaultFS", remoteFs.getUri().toString()); // use HDFS
miniTezCluster.init(miniTezconf);
miniTezCluster.start();
}
}
@AfterClass
public static void afterClass() throws InterruptedException {
if (tezSession != null) {
try {
LOG.info("Stopping Tez Session");
tezSession.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
if (miniTezCluster != null) {
try {
LOG.info("Stopping MiniTezCluster");
miniTezCluster.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
if (dfsCluster != null) {
try {
LOG.info("Stopping DFSCluster");
dfsCluster.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}
}
@Before
public void setup() throws Exception {
LOG.info("Starting session");
Path remoteStagingDir =
remoteFs.makeQualified(new Path(TEST_ROOT_DIR, String
.valueOf(new Random().nextInt(100000))));
TezClientUtils.ensureStagingDirExists(conf, remoteStagingDir);
tezConf = new TezConfiguration(miniTezCluster.getConfig());
tezConf.setInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, 0);
tezConf.set(TezConfiguration.TEZ_AM_LOG_LEVEL, "INFO");
tezConf.set(TezConfiguration.TEZ_AM_STAGING_DIR,
remoteStagingDir.toString());
tezConf
.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
tezConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, MAX_AM_ATTEMPT);
tezConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, 500);
tezConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, " -Xmx256m");
tezConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);
tezConf.setBoolean(
TezConfiguration.TEZ_AM_STAGING_SCRATCH_DATA_AUTO_DELETE, false);
tezConf.setBoolean(
RecoveryService.TEZ_AM_RECOVERY_HANDLE_REMAINING_EVENT_WHEN_STOPPED,
true);
tezSession = TezClient.create("TestDAGRecovery", tezConf);
tezSession.start();
}
@After
public void teardown() throws InterruptedException {
if (tezSession != null) {
try {
LOG.info("Stopping Tez Session");
tezSession.stop();
} catch (Exception e) {
e.printStackTrace();
}
}
tezSession = null;
}
/**
* Fine-grained recovery task-level, In a vertex (v1), task 0 is done task 1
* is running. History flush happens. AM dies. Once AM is recovered, task 0 is
* not re-run. Task 1 is re-run. (Broadcast)
*
* @throws Exception
*/
@Test(timeout = 120000)
public void testVertexPartiallyFinished_Broadcast() throws Exception {
DAG dag =
createDAG(ControlledInputReadyVertexManager.class,
DataMovementType.BROADCAST, true);
TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
assertEquals(5, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
assertEquals(1, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
// task_0 of v1 is finished in attempt 1, task_1 of v1 is not finished in
// attempt 1
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
assertEquals(0, findTaskAttemptFinishedEvent(historyEvents1, 0, 1).size());
// task_0 of v1 is finished in attempt 1 and not rerun, task_1 of v1 is
// finished in attempt 2
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents2, 0, 0).size());
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents2, 0, 1).size());
}
/**
* Fine-grained recovery task-level, In a vertex (v1), task 0 is done task 1
* is also done. History flush happens. AM dies. Once AM is recovered, task 0
* and Task 1 is not re-run. (Broadcast)
*
* @throws Exception
*/
@Test(timeout = 120000)
public void testVertexCompletelyFinished_Broadcast() throws Exception {
DAG dag =
createDAG(ControlledInputReadyVertexManager.class,
DataMovementType.BROADCAST, false);
TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
assertEquals(4, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
assertEquals(0, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
// task_0 of v1 is finished in attempt 1, task_1 of v1 is not finished in
// attempt 1
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 1).size());
// task_0 of v1 is finished in attempt 1 and not rerun, task_1 of v1 is
// finished in attempt 2
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents2, 0, 0).size());
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents2, 0, 1).size());
}
/**
* Fine-grained recovery task-level, In a vertex (v1), task 0 is done task 1
* is running. History flush happens. AM dies. Once AM is recovered, task 0 is
* not re-run. Task 1 is re-run. (ONE_TO_ONE)
*
* @throws Exception
*/
@Test(timeout = 120000)
public void testVertexPartialFinished_One2One() throws Exception {
DAG dag =
createDAG(ControlledInputReadyVertexManager.class,
DataMovementType.ONE_TO_ONE, true);
TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
assertEquals(5, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
assertEquals(1, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
// task_0 of v1 is finished in attempt 1, task_1 of v1 is not finished in
// attempt 1
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
assertEquals(0, findTaskAttemptFinishedEvent(historyEvents1, 0, 1).size());
// task_0 of v1 is finished in attempt 1 and not rerun, task_1 of v1 is
// finished in attempt 2
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents2, 0, 0).size());
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents2, 0, 1).size());
}
/**
* Fine-grained recovery task-level, In a vertex (v1), task 0 is done task 1
* is also done. History flush happens. AM dies. Once AM is recovered, task 0
* and Task 1 is not re-run. (ONE_TO_ONE)
*
* @throws Exception
*/
@Test(timeout = 120000)
public void testVertexCompletelyFinished_One2One() throws Exception {
DAG dag =
createDAG(ControlledInputReadyVertexManager.class,
DataMovementType.ONE_TO_ONE, false);
TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
assertEquals(4, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
assertEquals(0, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
// task_0 of v1 is finished in attempt 1, task_1 of v1 is not finished in
// attempt 1
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 1).size());
// task_0 of v1 is finished in attempt 1 and not rerun, task_1 of v1 is
// finished in attempt 2
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents2, 0, 0).size());
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents2, 0, 1).size());
}
/**
* Fine-grained recovery task-level, In a vertex (v1), task 0 is done task 1
* is running. History flush happens. AM dies. Once AM is recovered, task 0 is
* not re-run. Task 1 is re-run. (SCATTER_GATHER)
*
* @throws Exception
*/
@Test(timeout = 120000)
public void testVertexPartiallyFinished_ScatterGather() throws Exception {
DAG dag =
createDAG(ControlledShuffleVertexManager.class,
DataMovementType.SCATTER_GATHER, true);
TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
assertEquals(5, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
assertEquals(1, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
// task_0 of v1 is finished in attempt 1, task_1 of v1 is not finished in
// attempt 1
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
assertEquals(0, findTaskAttemptFinishedEvent(historyEvents1, 0, 1).size());
// task_0 of v1 is finished in attempt 1 and not rerun, task_1 of v1 is
// finished in attempt 2
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents2, 0, 0).size());
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents2, 0, 1).size());
}
/**
* Fine-grained recovery task-level, In a vertex (v1), task 0 is done task 1
* is also done. History flush happens. AM dies. Once AM is recovered, task 0
* and Task 1 is not re-run. (SCATTER_GATHER)
*
* @throws Exception
*/
@Test(timeout = 120000)
public void testVertexCompletelyFinished_ScatterGather() throws Exception {
DAG dag =
createDAG(ControlledShuffleVertexManager.class,
DataMovementType.SCATTER_GATHER, false);
TezCounters counters = runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
assertEquals(4, counters.findCounter(DAGCounter.TOTAL_LAUNCHED_TASKS).getValue());
assertEquals(0, counters.findCounter(DAGCounter.NUM_KILLED_TASKS).getValue());
assertEquals(4, counters.findCounter(DAGCounter.NUM_SUCCEEDED_TASKS).getValue());
assertEquals(2, counters.findCounter(TestCounter.Counter_1).getValue());
List<HistoryEvent> historyEvents1 = readRecoveryLog(1);
List<HistoryEvent> historyEvents2 = readRecoveryLog(2);
// task_0 of v1 is finished in attempt 1, task_1 of v1 is not finished in
// attempt 1
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 0).size());
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents1, 0, 1).size());
// task_0 of v1 is finished in attempt 1 and not rerun, task_1 of v1 is
// finished in attempt 2
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents2, 0, 0).size());
assertEquals(1, findTaskAttemptFinishedEvent(historyEvents2, 0, 1).size());
}
/**
* Set AM max attempt to high number. Kill many attempts. Last AM can still be
* recovered with latest AM history data.
*
* @throws Exception
*/
@Test(timeout = 600000)
public void testHighMaxAttempt() throws Exception {
Random rand = new Random();
tezConf.set(FAIL_ON_ATTEMPT, rand.nextInt(MAX_AM_ATTEMPT) + "");
LOG.info("Set FAIL_ON_ATTEMPT=" + tezConf.get(FAIL_ON_ATTEMPT));
DAG dag =
createDAG(FailOnAttemptVertexManager.class,
DataMovementType.SCATTER_GATHER, false);
runDAGAndVerify(dag, DAGStatus.State.SUCCEEDED);
}
TezCounters runDAGAndVerify(DAG dag, DAGStatus.State finalState) throws Exception {
tezSession.waitTillReady();
DAGClient dagClient = tezSession.submitDAG(dag);
DAGStatus dagStatus =
dagClient.waitForCompletionWithStatusUpdates(EnumSet
.of(StatusGetOpts.GET_COUNTERS));
Assert.assertEquals(finalState, dagStatus.getState());
return dagStatus.getDAGCounters();
}
/**
* v1 --> v2 <br>
* v2 has a customized VM which could control when to kill AM
*
* @param vertexManagerClass
* @param dmType
* @param failOnParitialCompleted
* @return
* @throws IOException
*/
private DAG createDAG(Class vertexManagerClass, DataMovementType dmType,
boolean failOnParitialCompleted) throws IOException {
if (failOnParitialCompleted) {
tezConf.set(FAIL_ON_PARTIAL_FINISHED, "true");
} else {
tezConf.set(FAIL_ON_PARTIAL_FINISHED, "false");
}
DAG dag = DAG.create("dag");
UserPayload payload = UserPayload.create(null);
Vertex v1 = Vertex.create("v1", MyProcessor.getProcDesc(), 2);
Vertex v2 = Vertex.create("v2", DoNothingProcessor.getProcDesc(), 2);
v2.setVertexManagerPlugin(VertexManagerPluginDescriptor.create(
vertexManagerClass.getName()).setUserPayload(
TezUtils.createUserPayloadFromConf(tezConf)));
dag.addVertex(v1).addVertex(v2);
dag.addEdge(Edge.create(v1, v2, EdgeProperty.create(dmType,
DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
TestOutput.getOutputDesc(payload), TestInput.getInputDesc(payload))));
return dag;
}
private List<TaskAttemptFinishedEvent> findTaskAttemptFinishedEvent(
List<HistoryEvent> historyEvents, int vertexId, int taskId) {
List<TaskAttemptFinishedEvent> resultEvents =
new ArrayList<TaskAttemptFinishedEvent>();
for (HistoryEvent historyEvent : historyEvents) {
if (historyEvent.getEventType() == HistoryEventType.TASK_ATTEMPT_FINISHED) {
TaskAttemptFinishedEvent taFinishedEvent =
(TaskAttemptFinishedEvent) historyEvent;
if (taFinishedEvent.getTaskAttemptID().getTaskID().getVertexID()
.getId() == vertexId
&& taFinishedEvent.getTaskAttemptID().getTaskID().getId() == taskId) {
resultEvents.add(taFinishedEvent);
}
}
}
return resultEvents;
}
private List<HistoryEvent> readRecoveryLog(int attemptNum) throws IOException {
ApplicationId appId = tezSession.getAppMasterApplicationId();
Path tezSystemStagingDir =
TezCommonUtils.getTezSystemStagingPath(tezConf, appId.toString());
Path recoveryDataDir =
TezCommonUtils.getRecoveryPath(tezSystemStagingDir, tezConf);
FileSystem fs = tezSystemStagingDir.getFileSystem(tezConf);
Path currentAttemptRecoveryDataDir =
TezCommonUtils.getAttemptRecoveryPath(recoveryDataDir, attemptNum);
Path recoveryFilePath =
new Path(currentAttemptRecoveryDataDir, appId.toString().replace(
"application", "dag")
+ "_1" + TezConstants.DAG_RECOVERY_RECOVER_FILE_SUFFIX);
return RecoveryParser.parseDAGRecoveryFile(fs.open(recoveryFilePath));
}
public static class ControlledInputReadyVertexManager extends
InputReadyVertexManager {
private Configuration conf;
public ControlledInputReadyVertexManager(VertexManagerPluginContext context) {
super(context);
}
@Override
public void initialize() {
super.initialize();
try {
conf =
TezUtils.createConfFromUserPayload(getContext().getUserPayload());
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
super.onSourceTaskCompleted(srcVertexName, taskId);
if (getContext().getDAGAttemptNumber() == 1) {
if (conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) {
if (taskId == 0) {
System.exit(-1);
}
} else {
if (taskId == 1) {
System.exit(-1);
}
}
}
}
@Override
public void onVertexStarted(Map<String, List<Integer>> completions) {
// sleep for 1 seconds to delay the running of task in v2.
// this could keep the case that task of v1 is partial finished or completely
// finished, and at the same time the task of v2 is not started
try {
Thread.sleep(1*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
super.onVertexStarted(completions);
}
}
public static class ControlledShuffleVertexManager extends
ShuffleVertexManager {
private Configuration conf;
public ControlledShuffleVertexManager(VertexManagerPluginContext context) {
super(context);
}
@Override
public void initialize() {
super.initialize();
try {
conf =
TezUtils.createConfFromUserPayload(getContext().getUserPayload());
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
int dagAttemptNumber = getContext().getDAGAttemptNumber();
super.onSourceTaskCompleted(srcVertexName, taskId);
if (dagAttemptNumber == 1) {
if (conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) {
if (taskId == 0) {
System.exit(-1);
}
} else {
if (taskId == 1) {
System.exit(-1);
}
}
}
}
@Override
public void onVertexStarted(Map<String, List<Integer>> completions) {
// sleep for 1 seconds to delay the running of task in v2.
// this could keep the case that task of v1 is partial finished or completely
// finished, and at the same time the task of v2 is not started
try {
Thread.sleep(1*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
super.onVertexStarted(completions);
}
}
public static class ControlledImmediateStartVertexManager extends
ImmediateStartVertexManager {
private Configuration conf;
public ControlledImmediateStartVertexManager(
VertexManagerPluginContext context) {
super(context);
}
@Override
public void initialize() {
super.initialize();
try {
conf =
TezUtils.createConfFromUserPayload(getContext().getUserPayload());
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
super.onSourceTaskCompleted(srcVertexName, taskId);
if (getContext().getDAGAttemptNumber() == 1) {
if (conf.getBoolean(FAIL_ON_PARTIAL_FINISHED, true)) {
if (taskId == 0) {
System.exit(-1);
}
} else {
if (taskId == 1) {
System.exit(-1);
}
}
}
}
@Override
public void onVertexStarted(Map<String, List<Integer>> completions) {
// sleep for 1 seconds to delay the running of task in v2.
// this could keep the case that task of v1 is partial finished or completely
// finished, and at the same time the task of v2 is not started
try {
Thread.sleep(1*1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
super.onVertexStarted(completions);
}
}
/**
* VM which could control fail on attempt less than a specified number
*
*/
public static class FailOnAttemptVertexManager extends ShuffleVertexManager {
private Configuration conf;
public FailOnAttemptVertexManager(VertexManagerPluginContext context) {
super(context);
}
@Override
public void initialize() {
super.initialize();
try {
conf =
TezUtils.createConfFromUserPayload(getContext().getUserPayload());
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void onSourceTaskCompleted(String srcVertexName, Integer taskId) {
int curAttempt = getContext().getDAGAttemptNumber();
super.onSourceTaskCompleted(srcVertexName, taskId);
int failOnAttempt = conf.getInt(FAIL_ON_ATTEMPT, 1);
LOG.info("failOnAttempt:" + failOnAttempt);
LOG.info("curAttempt:" + curAttempt);
if (curAttempt < failOnAttempt) {
System.exit(-1);
}
}
}
public static enum TestCounter {
Counter_1,
}
/**
* Do nothing if it is in task 0, sleep 3 seconds for other tasks. This enable
* us to kill AM in VM when some tasks are still running.
*
*/
public static class MyProcessor extends SimpleProcessor {
public MyProcessor(ProcessorContext context) {
super(context);
}
@Override
public void run() throws Exception {
getContext().getCounters().findCounter(TestCounter.Counter_1).increment(1);
if (getContext().getTaskIndex() == 0) {
// keep task_0 running for 1 seconds to wait for task_1 start running
Thread.sleep(1 * 1000);;
} else {
Thread.sleep(3 * 1000);
}
}
public static ProcessorDescriptor getProcDesc() {
return ProcessorDescriptor.create(MyProcessor.class.getName());
}
}
public static class DoNothingProcessor extends SimpleProcessor {
public DoNothingProcessor(ProcessorContext context) {
super(context);
}
@Override
public void run() throws Exception {
}
public static ProcessorDescriptor getProcDesc() {
return ProcessorDescriptor.create(DoNothingProcessor.class.getName());
}
}
}