blob: d30d73fe7cf8d5228963b3e8b943406af653681a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.test;
import static org.junit.Assert.*;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.tez.client.TezClient;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSourceDescriptor;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeManagerPluginContext;
import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.InputInitializerDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexManagerPluginContext;
import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.dag.api.EdgeProperty.DataSourceType;
import org.apache.tez.dag.api.EdgeProperty.SchedulingType;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.apache.tez.dag.app.dag.impl.OneToOneEdgeManager;
import org.apache.tez.dag.app.dag.impl.RootInputVertexManager;
import org.apache.tez.dag.library.vertexmanager.InputReadyVertexManager;
import org.apache.tez.runtime.api.AbstractLogicalIOProcessor;
import org.apache.tez.runtime.api.AbstractLogicalInput;
import org.apache.tez.runtime.api.AbstractLogicalOutput;
import org.apache.tez.runtime.api.Event;
import org.apache.tez.runtime.api.InputContext;
import org.apache.tez.runtime.api.InputInitializer;
import org.apache.tez.runtime.api.InputInitializerContext;
import org.apache.tez.runtime.api.LogicalInput;
import org.apache.tez.runtime.api.LogicalOutput;
import org.apache.tez.runtime.api.OutputContext;
import org.apache.tez.runtime.api.ProcessorContext;
import org.apache.tez.runtime.api.Reader;
import org.apache.tez.runtime.api.Writer;
import org.apache.tez.runtime.api.events.DataMovementEvent;
import org.apache.tez.runtime.api.events.InputDataInformationEvent;
import org.apache.tez.runtime.api.events.InputInitializerEvent;
import org.apache.tez.runtime.api.events.InputReadErrorEvent;
import org.apache.tez.runtime.api.events.VertexManagerEvent;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.test.TestAMRecovery.DoNothingProcessor;
import org.apache.tez.test.dag.MultiAttemptDAG.NoOpInput;
import org.junit.Test;
import com.google.common.collect.Lists;
/**
 * Verifies that an exception raised inside user-supplied DAG components
 * (input, output, processor, vertex manager, edge manager, input initializer)
 * ends up in the diagnostics reported by {@link DAGStatus}. Each value of
 * {@link ExceptionLocation} names one place where the nested test components
 * throw on purpose.
 */
public class TestExceptionPropagation {
// Logger shared by the test methods and the nested plugin classes.
private static final Log LOG = LogFactory
.getLog(TestExceptionPropagation.class);
// Configuration handed to the Tez client; rebuilt by startSessionClient() and
// startNonSessionClient().
private static TezConfiguration tezConf;
// Base Hadoop configuration used to build the mini DFS cluster.
private static Configuration conf = new Configuration();
// Mini Tez cluster; only started by startMiniTezCluster().
private static MiniTezCluster miniTezCluster = null;
// Base directory handed to MiniDFSCluster via HDFS_MINIDFS_BASEDIR.
private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR
+ TestExceptionPropagation.class.getName() + "-tmpDir";
// Mini HDFS cluster backing the mini Tez cluster.
private static MiniDFSCluster dfsCluster = null;
// File system of dfsCluster; its URI becomes fs.defaultFS of the mini Tez cluster.
private static FileSystem remoteFs = null;
// Client created by startSessionClient() (session mode, local mode).
private static TezClient tezSession = null;
// Client created by startNonSessionClient() (non-session mode, mini cluster).
private static TezClient tezClient = null;
/**
 * Starts a three-datanode {@link MiniDFSCluster} rooted at TEST_ROOT_DIR and
 * then a {@link MiniTezCluster} whose default file system is that HDFS
 * instance. A failure to start HDFS is rethrown as a RuntimeException.
 */
private void startMiniTezCluster() {
  LOG.info("Starting mini clusters");

  // Step 1: HDFS.
  try {
    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, TEST_ROOT_DIR);
    MiniDFSCluster.Builder dfsBuilder =
        new MiniDFSCluster.Builder(conf).numDataNodes(3).format(true).racks(null);
    dfsCluster = dfsBuilder.build();
    remoteFs = dfsCluster.getFileSystem();
  } catch (IOException dfsFailure) {
    throw new RuntimeException("problem starting mini dfs cluster", dfsFailure);
  }

  // Step 2: Tez on top of the HDFS instance started above.
  String clusterName = TestExceptionPropagation.class.getName();
  miniTezCluster = new MiniTezCluster(clusterName, 1, 1, 1);

  Configuration clusterConf = new Configuration(conf);
  clusterConf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 4);
  String hdfsUri = remoteFs.getUri().toString();
  clusterConf.set("fs.defaultFS", hdfsUri); // use HDFS

  miniTezCluster.init(clusterConf);
  miniTezCluster.start();
}
/**
 * Stops the MiniTezCluster and then the MiniDFSCluster, if they were started.
 * Shutdown is best-effort: a failure while stopping one cluster is written to
 * the test log and does not prevent the other cluster from being stopped. The
 * static references are cleared afterwards so that no stale, already stopped
 * cluster is left behind (same convention as the client stop methods).
 */
private void stopTezMiniCluster() {
  if (miniTezCluster != null) {
    try {
      LOG.info("Stopping MiniTezCluster");
      miniTezCluster.stop();
    } catch (Exception e) {
      // Best effort: record the failure in the log and keep shutting down.
      LOG.warn("Failed to stop MiniTezCluster", e);
    }
  }
  miniTezCluster = null;

  if (dfsCluster != null) {
    try {
      LOG.info("Stopping DFSCluster");
      dfsCluster.shutdown();
    } catch (Exception e) {
      LOG.warn("Failed to stop DFSCluster", e);
    }
  }
  dfsCluster = null;
  // The file system belongs to the DFS cluster that was just shut down.
  remoteFs = null;
}
/**
 * Builds a fresh {@link TezConfiguration} for session mode running in Tez local
 * mode on the local file system, publishes it in {@code tezConf}, and creates
 * and starts the session client.
 *
 * @throws Exception if the client cannot be started
 */
private void startSessionClient() throws Exception {
  LOG.info("Starting session");

  TezConfiguration sessionConf = new TezConfiguration();
  tezConf = sessionConf;

  // Application master settings.
  sessionConf.setInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, 0);
  sessionConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
  sessionConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, 4);
  sessionConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, 500);
  sessionConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, " -Xmx256m");
  sessionConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, true);

  // for local mode
  sessionConf.setBoolean(TezConfiguration.TEZ_LOCAL_MODE, true);
  sessionConf.set("fs.defaultFS", "file:///");
  sessionConf.setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, true);

  tezSession = TezClient.create("TestExceptionPropagation", sessionConf);
  tezSession.start();
}
/**
 * Stops the session client if one was started and clears the static reference.
 * Stopping is best-effort: a failure is written to the test log (instead of
 * being printed to stderr) and is not propagated to the caller.
 */
private void stopSessionClient() {
  if (tezSession == null) {
    return;
  }
  try {
    LOG.info("Stopping Tez Session");
    tezSession.stop();
  } catch (Exception e) {
    LOG.warn("Failed to stop Tez session", e);
  } finally {
    // Always drop the reference, even when stop() failed.
    tezSession = null;
  }
}
/**
 * Builds a {@link TezConfiguration} on top of the mini Tez cluster's
 * configuration with session mode disabled, publishes it in {@code tezConf},
 * and creates and starts the non-session client.
 *
 * @throws Exception if the client cannot be started
 */
private void startNonSessionClient() throws Exception {
  LOG.info("Starting Client");

  TezConfiguration clientConf = new TezConfiguration(miniTezCluster.getConfig());
  tezConf = clientConf;

  // Application master settings.
  clientConf.setInt(TezConfiguration.DAG_RECOVERY_MAX_UNFLUSHED_EVENTS, 0);
  clientConf.setBoolean(TezConfiguration.TEZ_AM_NODE_BLACKLISTING_ENABLED, false);
  clientConf.setInt(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS, 4);
  clientConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, 500);
  clientConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS, " -Xmx256m");
  clientConf.setBoolean(TezConfiguration.TEZ_AM_SESSION_MODE, false);

  tezClient = TezClient.create("TestExceptionPropagation", clientConf);
  tezClient.start();
}
/**
 * Stops the non-session client if one was started and clears the static
 * reference. Stopping is best-effort: a failure is written to the test log
 * (instead of being printed to stderr) and is not propagated to the caller.
 */
private void stopNonSessionClient() {
  if (tezClient == null) {
    return;
  }
  try {
    LOG.info("Stopping Tez Client");
    tezClient.stop();
  } catch (Exception e) {
    LOG.warn("Failed to stop Tez client", e);
  } finally {
    // Always drop the reference, even when stop() failed.
    tezClient = null;
  }
}
/**
 * Verifies, in session mode, that the diagnostics in {@link DAGStatus} name the
 * place the exception was thrown from. One DAG is submitted per
 * {@link ExceptionLocation}; local mode is used for speed.
 *
 * For {@code PROCESSOR_COUNTER_EXCEEDED} the diagnostics are expected to contain
 * "Too many counters"; for every other location they are expected to contain
 * the location's name.
 *
 * @throws Exception if the session cannot be started or a DAG cannot be
 *         submitted or awaited
 */
@Test(timeout = 600000)
public void testExceptionPropagationSession() throws Exception {
  try {
    startSessionClient();
    for (ExceptionLocation exLocation : ExceptionLocation.values()) {
      LOG.info("Session mode, Test for Exception from:" + exLocation.name());
      DAG dag = createDAG(exLocation);
      DAGClient dagClient = tezSession.submitDAG(dag);
      DAGStatus dagStatus = dagClient.waitForCompletion();
      String diagnostics = StringUtils.join(dagStatus.getDiagnostics(), ",");
      LOG.info("Diagnostics:" + diagnostics);

      String expected;
      if (exLocation == ExceptionLocation.PROCESSOR_COUNTER_EXCEEDED) {
        expected = "Too many counters";
      } else {
        expected = exLocation.name();
      }
      // The loop covers every location, so the failure message has to say
      // which one broke and what was actually reported.
      assertTrue("Diagnostics for " + exLocation.name() + " should contain '"
          + expected + "' but were: " + diagnostics,
          diagnostics.contains(expected));
    }
  } finally {
    stopSessionClient();
  }
}
/**
 * Verifies, in non-session mode on the mini clusters, that the diagnostics in
 * {@link DAGStatus} name the place the exception was thrown from, and that they
 * match the diagnostics of the YARN {@link ApplicationReport}. Only
 * {@code EM_GetNumSourceTaskPhysicalOutputs} is exercised here.
 *
 * The YarnClient used for polling is stopped in the cleanup path, and the mini
 * clusters are stopped even if an earlier cleanup step fails or is interrupted.
 *
 * @throws Exception if the clusters or client cannot be started, or the DAG
 *         cannot be submitted or awaited
 */
@Test(timeout = 120000)
public void testExceptionPropagationNonSession() throws Exception {
  YarnClient yarnClient = null;
  try {
    startMiniTezCluster();
    startNonSessionClient();

    ExceptionLocation exLocation = ExceptionLocation.EM_GetNumSourceTaskPhysicalOutputs;
    LOG.info("NonSession mode, Test for Exception from:" + exLocation.name());
    DAG dag = createDAG(exLocation);
    DAGClient dagClient = tezClient.submitDAG(dag);
    DAGStatus dagStatus = dagClient.waitForCompletion();
    String diagnostics = StringUtils.join(dagStatus.getDiagnostics(), ",");
    LOG.info("Diagnostics:" + diagnostics);
    assertTrue("DAG diagnostics should contain '" + exLocation.name()
        + "' but were: " + diagnostics,
        diagnostics.contains(exLocation.name()));

    // wait for app complete (unregisterApplicationMaster is done)
    ApplicationId appId = tezClient.getAppMasterApplicationId();
    yarnClient = YarnClient.createYarnClient();
    yarnClient.init(tezConf);
    yarnClient.start();
    Set<YarnApplicationState> finalApplicationStates =
        EnumSet.of(YarnApplicationState.KILLED, YarnApplicationState.FAILED,
            YarnApplicationState.FINISHED);
    ApplicationReport appReport;
    do {
      appReport = yarnClient.getApplicationReport(appId);
      Thread.sleep(1000);
      LOG.info("FinalAppStatus:" + appReport.getFinalApplicationStatus());
      LOG.info("Diagnostics from appReport:" + appReport.getDiagnostics());
    } while (!finalApplicationStates.contains(appReport.getYarnApplicationState()));

    // wait for 1 second and call getApplicationReport again to ensure get the
    // diagnostics
    // TODO remove it after YARN-2560
    Thread.sleep(1000);
    appReport = yarnClient.getApplicationReport(appId);
    LOG.info("FinalAppStatus:" + appReport.getFinalApplicationStatus());
    LOG.info("Diagnostics from appReport:" + appReport.getDiagnostics());

    String appDiagnostics = appReport.getDiagnostics();
    assertNotNull("Application report carries no diagnostics", appDiagnostics);
    assertTrue("Application diagnostics should contain '" + exLocation.name()
        + "' but were: " + appDiagnostics,
        appDiagnostics.contains(exLocation.name()));
    // use "\n" as separator, because we also use it in Tez internally when
    // assembling the application diagnostics.
    assertEquals("DAG diagnostics and application diagnostics differ",
        StringUtils.join(dagStatus.getDiagnostics(), "\n").trim(),
        appDiagnostics.trim());
  } finally {
    try {
      if (yarnClient != null) {
        try {
          yarnClient.stop();
        } catch (Exception e) {
          // Best effort, consistent with the other stop helpers.
          LOG.warn("Failed to stop YarnClient", e);
        }
      }
      stopNonSessionClient();
      // NOTE(review): pause kept from the original code; presumably it lets the
      // application finish unregistering before the cluster goes away - confirm.
      Thread.sleep(10 * 1000);
    } finally {
      // Must run even if the pause above is interrupted.
      stopTezMiniCluster();
    }
  }
}
/**
 * Places in the test components from which an exception is thrown on purpose.
 * The name of the selected constant is shipped to the components as their user
 * payload and is the text the tests look for in the diagnostics. The session
 * test iterates over the constants in declaration order.
 */
public static enum ExceptionLocation {
  // Input
  INPUT_START,
  INPUT_GET_READER,
  INPUT_HANDLE_EVENTS,
  INPUT_CLOSE,
  INPUT_INITIALIZE,

  // Output (OUTPUT_HANDLE_EVENTS: not supported yet)
  OUTPUT_START,
  OUTPUT_GET_WRITER,
  OUTPUT_CLOSE,
  OUTPUT_INITIALIZE,

  // Processor (PROCESSOR_HANDLE_EVENTS: not supported yet)
  PROCESSOR_RUN_ERROR,
  PROCESSOR_CLOSE_ERROR,
  PROCESSOR_INITIALIZE_ERROR,
  PROCESSOR_RUN_EXCEPTION,
  PROCESSOR_CLOSE_EXCEPTION,
  PROCESSOR_INITIALIZE_EXCEPTION,
  PROCESSOR_COUNTER_EXCEEDED,

  // VM
  VM_INITIALIZE,
  VM_ON_ROOTVERTEX_INITIALIZE,
  VM_ON_SOURCETASK_COMPLETED,
  VM_ON_VERTEX_STARTED,
  VM_ON_VERTEXMANAGEREVENT_RECEIVED,

  // EdgeManager (EM_RouteInputSourceTaskFailedEventToDestination: not supported yet)
  EM_Initialize,
  EM_GetNumDestinationTaskPhysicalInputs,
  EM_GetNumSourceTaskPhysicalOutputs,
  EM_RouteDataMovementEventToDestination,
  EM_GetNumDestinationConsumerTasks,
  EM_RouteInputErrorEventToSource,

  // II
  II_Initialize,
  II_HandleInputInitializerEvents,
  II_OnVertexStateUpdated
}
/**
 * Builds a DAG with two vertices (v1 --> v2). The name of {@code exLocation} is
 * set as user payload on the Input/Output/Processor/VertexManagerPlugin/
 * EdgeManager/InputInitializer test components so that they know where to
 * throw.
 *
 * For {@code EM_*} locations the edge uses {@link CustomEdgeManager}; for all
 * other locations it is a ONE_TO_ONE edge and v2 gets
 * {@link InputReadyVertexManagerWithException}.
 *
 * @param exLocation the place from which the exception should be thrown
 * @return the DAG named {@code "dag_" + exLocation.name()}
 * @throws IOException if the vertex manager payload for v2 cannot be created
 */
private DAG createDAG(ExceptionLocation exLocation) throws IOException {
  String locationName = exLocation.name();
  DAG dag = DAG.create("dag_" + locationName);
  UserPayload payload = UserPayload.create(ByteBuffer.wrap(locationName.getBytes()));

  // v1: throwing processor, throwing root input with its initializer, throwing VM.
  Vertex v1 = Vertex.create("v1", ProcessorWithException.getProcDesc(payload), 1);
  InputDescriptor rootInputDesc = InputWithException.getInputDesc(payload);
  InputInitializerDescriptor rootInitializerDesc =
      InputInitializerWithException.getIIDesc(payload);
  DataSourceDescriptor v1Source =
      DataSourceDescriptor.create(rootInputDesc, rootInitializerDesc, null);
  v1.addDataSource("input", v1Source);
  v1.setVertexManagerPlugin(RootInputVertexManagerWithException.getVMDesc(payload));

  // v2: no-op processor and input, throwing input initializer.
  Vertex v2 = Vertex.create("v2", DoNothingProcessor.getProcDesc(), 1);
  DataSourceDescriptor v2Source = DataSourceDescriptor.create(
      InputDescriptor.create(NoOpInput.class.getName()),
      InputInitializerWithException2.getIIDesc(payload), null);
  v2.addDataSource("input2", v2Source);

  dag.addVertex(v1).addVertex(v2);

  // Edge v1 --> v2: custom edge manager for EM_* locations, ONE_TO_ONE otherwise.
  EdgeProperty edgeProperty;
  if (locationName.startsWith("EM_")) {
    EdgeManagerPluginDescriptor edgeManagerDesc =
        EdgeManagerPluginDescriptor.create(CustomEdgeManager.class.getName())
            .setUserPayload(payload);
    edgeProperty = EdgeProperty.create(edgeManagerDesc,
        DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
        OutputWithException.getOutputDesc(payload),
        InputWithException.getInputDesc(payload));
  } else {
    // set Customized VertexManager here, it can't been used for CustomEdge
    v2.setVertexManagerPlugin(InputReadyVertexManagerWithException.getVMDesc(exLocation));
    edgeProperty = EdgeProperty.create(DataMovementType.ONE_TO_ONE,
        DataSourceType.PERSISTED, SchedulingType.SEQUENTIAL,
        OutputWithException.getOutputDesc(payload),
        InputWithException.getInputDesc(payload));
  }
  dag.addEdge(Edge.create(v1, v2, edgeProperty));
  return dag;
}
/**
 * InputInitializer of vertex1's data source "input". It decodes the exception
 * location from its user payload but never throws; it only emits a single
 * {@link InputDataInformationEvent}.
 */
public static class InputInitializerWithException extends InputInitializer {
  private ExceptionLocation exLocation;

  public InputInitializerWithException(
      InputInitializerContext initializerContext) {
    super(initializerContext);
    byte[] rawLocation = getContext().getUserPayload().deepCopyAsArray();
    this.exLocation = ExceptionLocation.valueOf(new String(rawLocation));
  }

  /** Builds the descriptor for this initializer carrying the given payload. */
  public static InputInitializerDescriptor getIIDesc(UserPayload payload) {
    String initializerClass = InputInitializerWithException.class.getName();
    return InputInitializerDescriptor.create(initializerClass).setUserPayload(payload);
  }

  /** Returns a mutable list holding one information event for index 0 with a null payload. */
  @Override
  public List<Event> initialize() throws Exception {
    Event rootEvent = InputDataInformationEvent.createWithObjectPayload(0, null);
    List<Event> result = new ArrayList<Event>();
    result.add(rootEvent);
    return result;
  }

  /** Incoming initializer events are ignored. */
  @Override
  public void handleInputInitializerEvent(List<InputInitializerEvent> events)
      throws Exception {
  }
}
/**
 * InputInitializer of vertex2's data source "input2". Depending on the
 * exception location decoded from the user payload it throws from
 * initialize(), handleInputInitializerEvent() or onVertexStateUpdated().
 */
public static class InputInitializerWithException2 extends InputInitializer {
  private ExceptionLocation exLocation;
  // Monitor initialize() blocks on; nothing in this class ever notifies it.
  private Object condition = new Object();

  public InputInitializerWithException2(
      InputInitializerContext initializerContext) {
    super(initializerContext);
    byte[] rawLocation = getContext().getUserPayload().deepCopyAsArray();
    this.exLocation = ExceptionLocation.valueOf(new String(rawLocation));
  }

  /** Builds the descriptor for this initializer carrying the given payload. */
  public static InputInitializerDescriptor getIIDesc(UserPayload payload) {
    String initializerClass = InputInitializerWithException2.class.getName();
    return InputInitializerDescriptor.create(initializerClass).setUserPayload(payload);
  }

  /**
   * Throws for II_Initialize. For the two callback locations it blocks so that
   * the initializer is still running when the callback fires; for
   * II_OnVertexStateUpdated it first registers for state updates of "v1".
   */
  @Override
  public List<Event> initialize() throws Exception {
    if (exLocation == ExceptionLocation.II_Initialize) {
      throw new Exception(exLocation.name());
    }

    boolean watchesV1 = exLocation == ExceptionLocation.II_OnVertexStateUpdated;
    if (watchesV1) {
      getContext().registerForVertexStateUpdates("v1", null);
    }

    boolean awaitsCallback = watchesV1
        || exLocation == ExceptionLocation.II_HandleInputInitializerEvents;
    if (awaitsCallback) {
      // wait for handleInputInitializerEvent() and onVertexStateUpdated() is invoked
      synchronized (condition) {
        condition.wait();
      }
    }
    return null;
  }

  /** Throws a RuntimeException when the location is II_HandleInputInitializerEvents. */
  @Override
  public void handleInputInitializerEvent(List<InputInitializerEvent> events)
      throws Exception {
    boolean shouldFail = exLocation == ExceptionLocation.II_HandleInputInitializerEvents;
    if (shouldFail) {
      throw new RuntimeException(exLocation.name());
    }
  }

  /** Throws when the location is II_OnVertexStateUpdated, otherwise delegates to the superclass. */
  @Override
  public void onVertexStateUpdated(VertexStateUpdate stateUpdate)
      throws Exception {
    boolean shouldFail = exLocation == ExceptionLocation.II_OnVertexStateUpdated;
    if (shouldFail) {
      throw new Exception(exLocation.name());
    }
    super.onVertexStateUpdated(stateUpdate);
  }
}
/**
 * Logical input used both as vertex1's root input ("input") and as the
 * destination input of the v1 --> v2 edge. It throws from the lifecycle method
 * selected by the exception location decoded from the user payload.
 */
public static class InputWithException extends AbstractLogicalInput {
  private ExceptionLocation exLocation;
  // Monitor getReader() blocks on; nothing in this class ever notifies it.
  private Object condition = new Object();

  public InputWithException(InputContext inputContext, int numPhysicalInputs) {
    super(inputContext, numPhysicalInputs);
    byte[] rawLocation = getContext().getUserPayload().deepCopyAsArray();
    this.exLocation = ExceptionLocation.valueOf(new String(rawLocation));
  }

  /** Builds the descriptor for this input carrying the given payload. */
  public static InputDescriptor getInputDesc(UserPayload payload) {
    String inputClass = InputWithException.class.getName();
    return InputDescriptor.create(inputClass).setUserPayload(payload);
  }

  /**
   * Throws for INPUT_INITIALIZE. When the source vertex is "v1" and the
   * location is one of the two edge-manager routing locations, returns a
   * single read-error event; otherwise returns null.
   */
  @Override
  public List<Event> initialize() throws Exception {
    getContext().requestInitialMemory(0L, null); // mandatory call
    if (exLocation == ExceptionLocation.INPUT_INITIALIZE) {
      throw new Exception(exLocation.name());
    }
    if (!getContext().getSourceVertexName().equals("v1")) {
      return null;
    }
    boolean needsReadError =
        exLocation == ExceptionLocation.EM_RouteInputErrorEventToSource
            || exLocation == ExceptionLocation.EM_GetNumDestinationConsumerTasks;
    if (!needsReadError) {
      return null;
    }
    Event errorEvent = InputReadErrorEvent.create("read error", 0, 0);
    return Lists.newArrayList(errorEvent);
  }

  /** Throws when the location is INPUT_START. */
  @Override
  public void start() throws Exception {
    boolean shouldFail = exLocation == ExceptionLocation.INPUT_START;
    if (shouldFail) {
      throw new Exception(exLocation.name());
    }
  }

  /** Blocks for INPUT_HANDLE_EVENTS, throws for INPUT_GET_READER, returns null otherwise. */
  @Override
  public Reader getReader() throws Exception {
    if (exLocation == ExceptionLocation.INPUT_HANDLE_EVENTS) {
      // wait for exception thrown from handleEvents. Otherwise,
      // processor may exit before the exception from handleEvents is
      // caught.
      synchronized (condition) {
        condition.wait();
      }
    }
    if (exLocation == ExceptionLocation.INPUT_GET_READER) {
      throw new Exception(exLocation.name());
    }
    return null;
  }

  /** Throws when the location is INPUT_HANDLE_EVENTS. */
  @Override
  public void handleEvents(List<Event> inputEvents) throws Exception {
    boolean shouldFail = exLocation == ExceptionLocation.INPUT_HANDLE_EVENTS;
    if (shouldFail) {
      throw new Exception(exLocation.name());
    }
  }

  /** Throws when the location is INPUT_CLOSE, otherwise returns null. */
  @Override
  public List<Event> close() throws Exception {
    boolean shouldFail = exLocation == ExceptionLocation.INPUT_CLOSE;
    if (shouldFail) {
      throw new Exception(exLocation.name());
    }
    return null;
  }
}
/**
 * Logical output of vertex1 on the v1 --> v2 edge. It throws from the lifecycle
 * method selected by the exception location decoded from the user payload, and
 * for some locations emits from close() the event that triggers the failure
 * elsewhere.
 */
public static class OutputWithException extends AbstractLogicalOutput {
  private ExceptionLocation exLocation;

  public OutputWithException(OutputContext outputContext,
      int numPhysicalOutputs) {
    super(outputContext, numPhysicalOutputs);
    byte[] rawLocation = getContext().getUserPayload().deepCopyAsArray();
    this.exLocation = ExceptionLocation.valueOf(new String(rawLocation));
  }

  /** Builds the descriptor for this output carrying the given payload. */
  public static OutputDescriptor getOutputDesc(UserPayload payload) {
    String outputClass = OutputWithException.class.getName();
    return OutputDescriptor.create(outputClass).setUserPayload(payload);
  }

  /** Wraps one event in a fresh mutable list. */
  private static List<Event> singleEvent(Event event) {
    List<Event> events = new ArrayList<Event>();
    events.add(event);
    return events;
  }

  /** Throws a RuntimeException when the location is OUTPUT_INITIALIZE, otherwise returns null. */
  @Override
  public List<Event> initialize() throws Exception {
    getContext().requestInitialMemory(0L, null); // mandatory call
    boolean shouldFail = exLocation == ExceptionLocation.OUTPUT_INITIALIZE;
    if (shouldFail) {
      throw new RuntimeException(exLocation.name());
    }
    return null;
  }

  /** Throws when the location is OUTPUT_START. */
  @Override
  public void start() throws Exception {
    boolean shouldFail = exLocation == ExceptionLocation.OUTPUT_START;
    if (shouldFail) {
      throw new Exception(exLocation.name());
    }
  }

  /** Throws when the location is OUTPUT_GET_WRITER, otherwise returns null. */
  @Override
  public Writer getWriter() throws Exception {
    boolean shouldFail = exLocation == ExceptionLocation.OUTPUT_GET_WRITER;
    if (shouldFail) {
      throw new Exception(exLocation.name());
    }
    return null;
  }

  /** Output events are ignored. */
  @Override
  public void handleEvents(List<Event> outputEvents) {
  }

  /**
   * Throws a RuntimeException for OUTPUT_CLOSE. For three other locations
   * returns the single event that drives the failure on the receiving side;
   * returns null for everything else.
   */
  @Override
  public List<Event> close() throws Exception {
    switch (exLocation) {
      case OUTPUT_CLOSE:
        throw new RuntimeException(exLocation.name());
      case VM_ON_VERTEXMANAGEREVENT_RECEIVED:
        // send VertexManagerEvent to v2
        return singleEvent(
            VertexManagerEvent.create("v2", ByteBuffer.wrap(new byte[0])));
      case EM_RouteDataMovementEventToDestination:
        // send DataMovementEvent to v2
        return singleEvent(
            DataMovementEvent.create(0, ByteBuffer.wrap(new byte[0])));
      case II_HandleInputInitializerEvents:
        // send InputInitializerEvent to the InputInitializer of v2
        return singleEvent(
            InputInitializerEvent.create("v2", "input2", ByteBuffer.wrap(new byte[0])));
      default:
        return null;
    }
  }
}
/**
 * Processor of vertex1. It drives the test input and output, and throws an
 * Error or Exception from initialize(), run() or close() depending on the
 * exception location decoded from the user payload. For
 * PROCESSOR_COUNTER_EXCEEDED it creates one counter more than
 * TEZ_COUNTERS_MAX_DEFAULT.
 */
public static class ProcessorWithException extends AbstractLogicalIOProcessor {
  private ExceptionLocation exLocation;

  public ProcessorWithException(ProcessorContext context) {
    super(context);
    byte[] rawLocation = getContext().getUserPayload().deepCopyAsArray();
    this.exLocation = ExceptionLocation.valueOf(new String(rawLocation));
  }

  /** Builds the descriptor for this processor carrying the given payload. */
  public static ProcessorDescriptor getProcDesc(UserPayload payload) {
    String processorClass = ProcessorWithException.class.getName();
    return ProcessorDescriptor.create(processorClass).setUserPayload(payload);
  }

  /** Throws an Error or Exception for the two PROCESSOR_INITIALIZE_* locations. */
  @Override
  public void initialize() throws Exception {
    switch (exLocation) {
      case PROCESSOR_INITIALIZE_ERROR:
        throw new Error(exLocation.name());
      case PROCESSOR_INITIALIZE_EXCEPTION:
        throw new Exception(exLocation.name());
      default:
        break;
    }
  }

  /**
   * Starts the input named "input" and the output named "v2", fetches their
   * reader and writer, sleeps three seconds and then fails in the way the
   * exception location asks for.
   */
  @Override
  public void run(Map<String, LogicalInput> inputs,
      Map<String, LogicalOutput> outputs) throws Exception {
    InputWithException rootInput = (InputWithException) inputs.get("input");
    rootInput.start();
    rootInput.getReader();

    OutputWithException edgeOutput = (OutputWithException) outputs.get("v2");
    edgeOutput.start();
    edgeOutput.getWriter();

    Thread.sleep(3 * 1000);

    switch (exLocation) {
      case PROCESSOR_RUN_ERROR:
        throw new Error(exLocation.name());
      case PROCESSOR_RUN_EXCEPTION:
        throw new Exception(exLocation.name());
      case PROCESSOR_COUNTER_EXCEEDED:
        // simulate the counter limitation exceeded
        int counterCount = TezConfiguration.TEZ_COUNTERS_MAX_DEFAULT + 1;
        for (int index = 0; index < counterCount; index++) {
          getContext().getCounters().findCounter("mycounter", "counter_" + index).increment(1);
        }
        break;
      default:
        break;
    }
  }

  /** Processor events are ignored. */
  @Override
  public void handleEvents(List<Event> processorEvents) {
  }

  /** Throws an Error or Exception for the two PROCESSOR_CLOSE_* locations. */
  @Override
  public void close() throws Exception {
    switch (exLocation) {
      case PROCESSOR_CLOSE_ERROR:
        throw new Error(exLocation.name());
      case PROCESSOR_CLOSE_EXCEPTION:
        throw new Exception(exLocation.name());
      default:
        break;
    }
  }
}
/**
 * VertexManager of vertex1. It behaves like {@link RootInputVertexManager} but
 * throws a RuntimeException from the callback selected by the exception
 * location, which is decoded from the user payload in initialize().
 */
public static class RootInputVertexManagerWithException extends RootInputVertexManager {
  // Stays null until initialize() has run.
  private ExceptionLocation exLocation;

  public RootInputVertexManagerWithException(VertexManagerPluginContext context) {
    super(context);
  }

  /** Builds the descriptor for this vertex manager carrying the given payload. */
  public static VertexManagerPluginDescriptor getVMDesc(UserPayload payload) {
    String managerClass = RootInputVertexManagerWithException.class.getName();
    return VertexManagerPluginDescriptor.create(managerClass).setUserPayload(payload);
  }

  /** Throws a RuntimeException named after the location when it equals {@code target}. */
  private void failIfLocationIs(ExceptionLocation target) {
    if (this.exLocation == target) {
      throw new RuntimeException(this.exLocation.name());
    }
  }

  /** Initializes the superclass, decodes the location and throws for VM_INITIALIZE. */
  @Override
  public void initialize() {
    super.initialize();
    byte[] rawLocation = getContext().getUserPayload().deepCopyAsArray();
    this.exLocation = ExceptionLocation.valueOf(new String(rawLocation));
    failIfLocationIs(ExceptionLocation.VM_INITIALIZE);
  }

  /** Throws for VM_ON_ROOTVERTEX_INITIALIZE, otherwise delegates to the superclass. */
  @Override
  public void onRootVertexInitialized(String inputName,
      InputDescriptor inputDescriptor, List<Event> events) {
    failIfLocationIs(ExceptionLocation.VM_ON_ROOTVERTEX_INITIALIZE);
    super.onRootVertexInitialized(inputName, inputDescriptor, events);
  }

  /** Throws for VM_ON_VERTEX_STARTED, otherwise delegates to the superclass. */
  @Override
  public void onVertexStarted(Map<String, List<Integer>> completions) {
    failIfLocationIs(ExceptionLocation.VM_ON_VERTEX_STARTED);
    super.onVertexStarted(completions);
  }
}
/**
 * VertexManager of vertex2 (used only when the edge is not a custom edge). It
 * behaves like {@link InputReadyVertexManager} but throws a RuntimeException
 * from the callback selected by the exception location. The location is read in
 * initialize() from a Configuration serialized into the user payload.
 */
public static class InputReadyVertexManagerWithException extends InputReadyVertexManager {
  // Configuration key under which getVMDesc() stores the location name.
  private static final String EXCEPTION_LOCATION_KEY = "Test.ExceptionLocation";
  // Message of the superclass initialization failure that this test tolerates.
  private static final String TOLERATED_INIT_FAILURE =
      "Atleast 1 bipartite source should exist";

  // Stays null until initialize() has run.
  private ExceptionLocation exLocation;

  public InputReadyVertexManagerWithException(VertexManagerPluginContext context) {
    super(context);
  }

  /**
   * Initializes the superclass, tolerating only the known "no bipartite source"
   * failure, then decodes the exception location from the user payload.
   *
   * @throws TezUncheckedException if the superclass fails for any other reason
   *         or the payload cannot be read
   */
  @Override
  public void initialize() {
    try {
      super.initialize();
    } catch (TezUncheckedException e) {
      // workaround for testing
      // Constant-first comparison: an exception without a message must be
      // rethrown as-is rather than replaced by a NullPointerException.
      if (!TOLERATED_INIT_FAILURE.equals(e.getMessage())) {
        throw e;
      }
    }
    try {
      Configuration payloadConf =
          TezUtils.createConfFromUserPayload(getContext().getUserPayload());
      this.exLocation = ExceptionLocation.valueOf(payloadConf.get(EXCEPTION_LOCATION_KEY));
    } catch (IOException e) {
      throw new TezUncheckedException(e);
    }
  }

  /** Throws for VM_ON_SOURCETASK_COMPLETED, otherwise delegates to the superclass. */
  @Override
  public void onSourceTaskCompleted(String srcVertexName, Integer attemptId) {
    if (this.exLocation == ExceptionLocation.VM_ON_SOURCETASK_COMPLETED) {
      throw new RuntimeException(this.exLocation.name());
    }
    super.onSourceTaskCompleted(srcVertexName, attemptId);
  }

  /** Throws for VM_ON_VERTEXMANAGEREVENT_RECEIVED, otherwise delegates to the superclass. */
  @Override
  public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
    if (this.exLocation == ExceptionLocation.VM_ON_VERTEXMANAGEREVENT_RECEIVED) {
      throw new RuntimeException(this.exLocation.name());
    }
    super.onVertexManagerEventReceived(vmEvent);
  }

  /**
   * Builds the descriptor for this vertex manager. The location name is stored
   * in a Configuration that is serialized into the user payload.
   *
   * @param exLocation the place from which the exception should be thrown
   * @return the descriptor carrying the serialized configuration
   * @throws IOException if the configuration cannot be serialized
   */
  public static VertexManagerPluginDescriptor getVMDesc(ExceptionLocation exLocation) throws IOException {
    Configuration conf = new Configuration();
    conf.set(EXCEPTION_LOCATION_KEY, exLocation.name());
    UserPayload payload = TezUtils.createUserPayloadFromConf(conf);
    return VertexManagerPluginDescriptor.create(InputReadyVertexManagerWithException.class.getName())
        .setUserPayload(payload);
  }
}
/**
 * EdgeManager for the edge linking vertex1 and vertex2, used for the
 * {@code EM_*} exception locations. It behaves like {@link OneToOneEdgeManager}
 * but throws a RuntimeException from the method selected by the exception
 * location decoded from the user payload.
 */
public static class CustomEdgeManager extends OneToOneEdgeManager {
  // Message of the superclass initialization failure that this test tolerates.
  private static final String TOLERATED_INIT_FAILURE =
      "Atleast 1 bipartite source should exist";

  private ExceptionLocation exLocation;

  public CustomEdgeManager(EdgeManagerPluginContext context) {
    super(context);
    this.exLocation =
        ExceptionLocation.valueOf(new String(context.getUserPayload()
            .deepCopyAsArray()));
  }

  /** Throws a RuntimeException named after the location when it equals {@code target}. */
  private void failIfLocationIs(ExceptionLocation target) {
    if (exLocation == target) {
      throw new RuntimeException(exLocation.name());
    }
  }

  /**
   * Throws for EM_Initialize; otherwise initializes the superclass, tolerating
   * only the known "no bipartite source" failure.
   *
   * @throws TezUncheckedException if the superclass fails for any other reason
   */
  @Override
  public void initialize() {
    failIfLocationIs(ExceptionLocation.EM_Initialize);
    try {
      super.initialize();
    } catch (TezUncheckedException e) {
      // workaround for testing
      // Constant-first comparison: an exception without a message must be
      // rethrown as-is rather than replaced by a NullPointerException.
      if (!TOLERATED_INIT_FAILURE.equals(e.getMessage())) {
        throw e;
      }
    }
  }

  /** Throws for EM_GetNumDestinationConsumerTasks, otherwise delegates to the superclass. */
  @Override
  public int getNumDestinationConsumerTasks(int sourceTaskIndex) {
    failIfLocationIs(ExceptionLocation.EM_GetNumDestinationConsumerTasks);
    return super.getNumDestinationConsumerTasks(sourceTaskIndex);
  }

  /** Throws for EM_GetNumSourceTaskPhysicalOutputs, otherwise logs the location and delegates. */
  @Override
  public int getNumSourceTaskPhysicalOutputs(int sourceTaskIndex) {
    failIfLocationIs(ExceptionLocation.EM_GetNumSourceTaskPhysicalOutputs);
    LOG.info("ExLocation:" + exLocation);
    return super.getNumSourceTaskPhysicalOutputs(sourceTaskIndex);
  }

  /** Throws for EM_GetNumDestinationTaskPhysicalInputs, otherwise delegates to the superclass. */
  @Override
  public int getNumDestinationTaskPhysicalInputs(int destinationTaskIndex) {
    failIfLocationIs(ExceptionLocation.EM_GetNumDestinationTaskPhysicalInputs);
    return super.getNumDestinationTaskPhysicalInputs(destinationTaskIndex);
  }

  /** Throws for EM_RouteDataMovementEventToDestination, otherwise delegates to the superclass. */
  @Override
  public void routeDataMovementEventToDestination(DataMovementEvent event,
      int sourceTaskIndex, int sourceOutputIndex,
      Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
    failIfLocationIs(ExceptionLocation.EM_RouteDataMovementEventToDestination);
    super.routeDataMovementEventToDestination(event, sourceTaskIndex,
        sourceOutputIndex, destinationTaskAndInputIndices);
  }

  /** Throws for EM_RouteInputErrorEventToSource, otherwise delegates to the superclass. */
  @Override
  public int routeInputErrorEventToSource(InputReadErrorEvent event,
      int destinationTaskIndex, int destinationFailedInputIndex) {
    failIfLocationIs(ExceptionLocation.EM_RouteInputErrorEventToSource);
    return super.routeInputErrorEventToSource(event, destinationTaskIndex,
        destinationFailedInputIndex);
  }

  /** No exception location exists for this method; it only delegates to the superclass. */
  @Override
  public void routeInputSourceTaskFailedEventToDestination(
      int sourceTaskIndex,
      Map<Integer, List<Integer>> destinationTaskAndInputIndices) {
    super.routeInputSourceTaskFailedEventToDestination(sourceTaskIndex,
        destinationTaskAndInputIndices);
  }
}
}