blob: ac6ebde428ced1a55735669c2ea5de9d7a2e7a33 [file] [log] [blame]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.tests;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.EnumSet;
import java.util.List;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.tez.client.TezClient;
import org.apache.tez.common.TezUtils;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.client.DAGClient;
import org.apache.tez.dag.api.client.DAGStatus;
import org.apache.tez.dag.api.client.StatusGetOpts;
import org.apache.tez.dag.app.ErrorPluginConfiguration;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.launcher.TezTestServiceContainerLauncherWithErrors;
import org.apache.tez.dag.app.launcher.TezTestServiceNoOpContainerLauncher;
import org.apache.tez.dag.app.rm.TezTestServiceTaskSchedulerService;
import org.apache.tez.dag.app.rm.TezTestServiceTaskSchedulerServiceWithErrors;
import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorImpl;
import org.apache.tez.dag.app.taskcomm.TezTestServiceTaskCommunicatorWithErrors;
import org.apache.tez.examples.JoinValidateConfigured;
import org.apache.tez.serviceplugins.api.ContainerLauncherDescriptor;
import org.apache.tez.serviceplugins.api.ServicePluginErrorDefaults;
import org.apache.tez.serviceplugins.api.ServicePluginsDescriptor;
import org.apache.tez.serviceplugins.api.TaskCommunicatorDescriptor;
import org.apache.tez.serviceplugins.api.TaskSchedulerDescriptor;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestExternalTezServicesErrors {
private static final Logger LOG = LoggerFactory.getLogger(TestExternalTezServicesErrors.class);
private static final String EXT_PUSH_ENTITY_NAME = "ExtServiceTestPush";
private static final String EXT_THROW_ERROR_ENTITY_NAME = "ExtServiceTestThrowErrors";
private static final String EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME = "ExtServiceTestReportNonFatalErrors";
private static final String EXT_REPORT_FATAL_ERROR_ENTITY_NAME = "ExtServiceTestReportFatalErrors";
private static final String SUFFIX_LAUNCHER = "ContainerLauncher";
private static final String SUFFIX_TASKCOMM = "TaskCommunicator";
private static final String SUFFIX_SCHEDULER = "TaskScheduler";
private static ExternalTezServiceTestHelper extServiceTestHelper;
private static ServicePluginsDescriptor servicePluginsDescriptor;
private static final Path SRC_DATA_DIR = new Path("/tmp/" + TestExternalTezServicesErrors.class.getSimpleName());
private static final Path HASH_JOIN_EXPECTED_RESULT_PATH = new Path(SRC_DATA_DIR, "expectedOutputPath");
private static final Path HASH_JOIN_OUTPUT_PATH = new Path(SRC_DATA_DIR, "outPath");
private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_EXT_SERVICE_PUSH =
Vertex.VertexExecutionContext.create(
EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
// Throw error contexts
private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_LAUNCHER_THROW =
Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_THROW_ERROR_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_TASKCOMM_THROW =
Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME,
EXT_THROW_ERROR_ENTITY_NAME);
private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_SCHEDULER_THROW =
Vertex.VertexExecutionContext.create(EXT_THROW_ERROR_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
// Report-non-fatal contexts
private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_LAUNCHER_REPORT_NON_FATAL =
Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_TASKCOMM_REPORT_NON_FATAL =
Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME,
EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME);
private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_SCHEDULER_REPORT_NON_FATAL =
Vertex.VertexExecutionContext.create(EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
// Report fatal contexts
private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_LAUNCHER_REPORT_FATAL =
Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_REPORT_FATAL_ERROR_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_TASKCOMM_REPORT_FATAL =
Vertex.VertexExecutionContext.create(EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME,
EXT_REPORT_FATAL_ERROR_ENTITY_NAME);
private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_SCHEDULER_REPORT_FATAL =
Vertex.VertexExecutionContext.create(EXT_REPORT_FATAL_ERROR_ENTITY_NAME, EXT_PUSH_ENTITY_NAME, EXT_PUSH_ENTITY_NAME);
private static final Vertex.VertexExecutionContext EXECUTION_CONTEXT_DEFAULT = EXECUTION_CONTEXT_EXT_SERVICE_PUSH;
private static String TEST_ROOT_DIR = "target" + Path.SEPARATOR + TestExternalTezServicesErrors.class.getName()
+ "-tmpDir";
@BeforeClass
public static void setup() throws Exception {
extServiceTestHelper = new ExternalTezServiceTestHelper(TEST_ROOT_DIR);
UserPayload userPayload =
TezUtils.createUserPayloadFromConf(extServiceTestHelper.getConfForJobs());
UserPayload userPayloadThrowError =
ErrorPluginConfiguration.toUserPayload(ErrorPluginConfiguration.createThrowErrorConf());
UserPayload userPayloadReportFatalErrorLauncher = ErrorPluginConfiguration
.toUserPayload(ErrorPluginConfiguration.createReportFatalErrorConf(SUFFIX_LAUNCHER));
UserPayload userPayloadReportFatalErrorTaskComm = ErrorPluginConfiguration
.toUserPayload(ErrorPluginConfiguration.createReportFatalErrorConf(SUFFIX_TASKCOMM));
UserPayload userPayloadReportFatalErrorScheduler = ErrorPluginConfiguration
.toUserPayload(ErrorPluginConfiguration.createReportFatalErrorConf(SUFFIX_SCHEDULER));
UserPayload userPayloadReportNonFatalErrorLauncher = ErrorPluginConfiguration
.toUserPayload(ErrorPluginConfiguration.createReportNonFatalErrorConf(SUFFIX_LAUNCHER));
UserPayload userPayloadReportNonFatalErrorTaskComm = ErrorPluginConfiguration
.toUserPayload(ErrorPluginConfiguration.createReportNonFatalErrorConf(SUFFIX_TASKCOMM));
UserPayload userPayloadReportNonFatalErrorScheduler = ErrorPluginConfiguration
.toUserPayload(ErrorPluginConfiguration.createReportNonFatalErrorConf(SUFFIX_SCHEDULER));
TaskSchedulerDescriptor[] taskSchedulerDescriptors = new TaskSchedulerDescriptor[]{
TaskSchedulerDescriptor
.create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskSchedulerService.class.getName())
.setUserPayload(userPayload),
TaskSchedulerDescriptor.create(EXT_THROW_ERROR_ENTITY_NAME,
TezTestServiceTaskSchedulerServiceWithErrors.class.getName()).setUserPayload(
userPayloadThrowError),
TaskSchedulerDescriptor.create(EXT_REPORT_FATAL_ERROR_ENTITY_NAME,
TezTestServiceTaskSchedulerServiceWithErrors.class.getName()).setUserPayload(
userPayloadReportFatalErrorScheduler),
TaskSchedulerDescriptor.create(EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME,
TezTestServiceTaskSchedulerServiceWithErrors.class.getName()).setUserPayload(
userPayloadReportNonFatalErrorScheduler),
};
ContainerLauncherDescriptor[] containerLauncherDescriptors = new ContainerLauncherDescriptor[]{
ContainerLauncherDescriptor
.create(EXT_PUSH_ENTITY_NAME, TezTestServiceNoOpContainerLauncher.class.getName())
.setUserPayload(userPayload),
ContainerLauncherDescriptor.create(EXT_THROW_ERROR_ENTITY_NAME,
TezTestServiceContainerLauncherWithErrors.class.getName()).setUserPayload(userPayloadThrowError),
ContainerLauncherDescriptor.create(EXT_REPORT_FATAL_ERROR_ENTITY_NAME,
TezTestServiceContainerLauncherWithErrors.class.getName()).setUserPayload(userPayloadReportFatalErrorLauncher),
ContainerLauncherDescriptor.create(EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME,
TezTestServiceContainerLauncherWithErrors.class.getName()).setUserPayload(userPayloadReportNonFatalErrorLauncher)
};
TaskCommunicatorDescriptor[] taskCommunicatorDescriptors = new TaskCommunicatorDescriptor[]{
TaskCommunicatorDescriptor
.create(EXT_PUSH_ENTITY_NAME, TezTestServiceTaskCommunicatorImpl.class.getName())
.setUserPayload(userPayload),
TaskCommunicatorDescriptor.create(EXT_THROW_ERROR_ENTITY_NAME,
TezTestServiceTaskCommunicatorWithErrors.class.getName()).setUserPayload(userPayloadThrowError),
TaskCommunicatorDescriptor.create(EXT_REPORT_FATAL_ERROR_ENTITY_NAME,
TezTestServiceTaskCommunicatorWithErrors.class.getName()).setUserPayload(userPayloadReportFatalErrorTaskComm),
TaskCommunicatorDescriptor.create(EXT_REPORT_NON_FATAL_ERROR_ENTITY_NAME,
TezTestServiceTaskCommunicatorWithErrors.class.getName()).setUserPayload(userPayloadReportNonFatalErrorTaskComm)
};
servicePluginsDescriptor = ServicePluginsDescriptor.create(true, true,
taskSchedulerDescriptors, containerLauncherDescriptors, taskCommunicatorDescriptors);
extServiceTestHelper.setupSharedTezClient(servicePluginsDescriptor);
// Generate the join data set used for each run.
// Can a timeout be enforced here ?
Path dataPath1 = new Path(SRC_DATA_DIR, "inPath1");
Path dataPath2 = new Path(SRC_DATA_DIR, "inPath2");
extServiceTestHelper
.setupHashJoinData(SRC_DATA_DIR, dataPath1, dataPath2, HASH_JOIN_EXPECTED_RESULT_PATH, HASH_JOIN_OUTPUT_PATH);
extServiceTestHelper.shutdownSharedTezClient();
}
@AfterClass
public static void tearDown() throws IOException, TezException {
extServiceTestHelper.tearDownAll();
}
@Test(timeout = 90000)
public void testContainerLauncherThrowError() throws Exception {
testFatalError("_testContainerLauncherError_", EXECUTION_CONTEXT_LAUNCHER_THROW,
SUFFIX_LAUNCHER, Lists.newArrayList("Service Error",
DAGAppMasterEventType.CONTAINER_LAUNCHER_SERVICE_FATAL_ERROR.name()));
}
@Test(timeout = 90000)
public void testTaskCommunicatorThrowError() throws Exception {
testFatalError("_testContainerLauncherError_", EXECUTION_CONTEXT_TASKCOMM_THROW,
SUFFIX_TASKCOMM, Lists.newArrayList("Service Error",
DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR.name()));
}
@Test(timeout = 90000)
public void testTaskSchedulerThrowError() throws Exception {
testFatalError("_testContainerLauncherError_", EXECUTION_CONTEXT_SCHEDULER_THROW,
SUFFIX_SCHEDULER, Lists.newArrayList("Service Error",
DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR.name()));
}
@Test (timeout = 150000)
public void testNonFatalErrors() throws IOException, TezException, InterruptedException {
String methodName = "testNonFatalErrors";
TezConfiguration tezClientConf = new TezConfiguration(extServiceTestHelper.getConfForJobs());
TezClient tezClient = TezClient
.newBuilder(TestExternalTezServicesErrors.class.getSimpleName() + methodName + "_session",
tezClientConf)
.setIsSession(true).setServicePluginDescriptor(servicePluginsDescriptor).build();
try {
tezClient.start();
LOG.info("TezSessionStarted for " + methodName);
tezClient.waitTillReady();
LOG.info("TezSession ready for submission for " + methodName);
runAndVerifyForNonFatalErrors(tezClient, SUFFIX_LAUNCHER, EXECUTION_CONTEXT_LAUNCHER_REPORT_NON_FATAL);
runAndVerifyForNonFatalErrors(tezClient, SUFFIX_TASKCOMM, EXECUTION_CONTEXT_TASKCOMM_REPORT_NON_FATAL);
runAndVerifyForNonFatalErrors(tezClient, SUFFIX_SCHEDULER, EXECUTION_CONTEXT_SCHEDULER_REPORT_NON_FATAL);
} finally {
tezClient.stop();
}
}
@Test(timeout = 90000)
public void testContainerLauncherReportFatalError() throws Exception {
testFatalError("_testContainerLauncherReportFatalError_",
EXECUTION_CONTEXT_LAUNCHER_REPORT_FATAL, SUFFIX_LAUNCHER, Lists
.newArrayList(ErrorPluginConfiguration.REPORT_FATAL_ERROR_MESSAGE,
ServicePluginErrorDefaults.INCONSISTENT_STATE.name()));
}
@Test(timeout = 90000)
public void testTaskCommReportFatalError() throws Exception {
testFatalError("_testTaskCommReportFatalError_", EXECUTION_CONTEXT_TASKCOMM_REPORT_FATAL,
SUFFIX_TASKCOMM, Lists.newArrayList(ErrorPluginConfiguration.REPORT_FATAL_ERROR_MESSAGE,
ServicePluginErrorDefaults.INCONSISTENT_STATE.name()));
}
@Test(timeout = 90000)
public void testTaskSchedulerReportFatalError() throws Exception {
testFatalError("_testTaskSchedulerReportFatalError_",
EXECUTION_CONTEXT_SCHEDULER_REPORT_FATAL, SUFFIX_SCHEDULER,
Lists.newArrayList(ErrorPluginConfiguration.REPORT_FATAL_ERROR_MESSAGE,
ServicePluginErrorDefaults.INCONSISTENT_STATE.name()));
}
private void testFatalError(String methodName,
Vertex.VertexExecutionContext lhsExecutionContext,
String dagNameSuffix, List<String> expectedDiagMessages) throws
IOException, TezException, YarnException, InterruptedException {
TezConfiguration tezClientConf = new TezConfiguration(extServiceTestHelper.getConfForJobs());
TezClient tezClient = TezClient
.newBuilder(TestExternalTezServicesErrors.class.getSimpleName() + methodName + "_session",
tezClientConf)
.setIsSession(true).setServicePluginDescriptor(servicePluginsDescriptor).build();
ApplicationId appId= null;
try {
tezClient.start();
LOG.info("TezSessionStarted for " + methodName);
tezClient.waitTillReady();
LOG.info("TezSession ready for submission for " + methodName);
JoinValidateConfigured joinValidate =
new JoinValidateConfigured(EXECUTION_CONTEXT_DEFAULT, lhsExecutionContext,
EXECUTION_CONTEXT_EXT_SERVICE_PUSH,
EXECUTION_CONTEXT_EXT_SERVICE_PUSH, dagNameSuffix);
DAG dag = joinValidate
.createDag(new TezConfiguration(extServiceTestHelper.getConfForJobs()),
HASH_JOIN_EXPECTED_RESULT_PATH,
HASH_JOIN_OUTPUT_PATH, 3);
DAGClient dagClient = tezClient.submitDAG(dag);
DAGStatus dagStatus =
dagClient.waitForCompletionWithStatusUpdates(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
assertEquals(DAGStatus.State.ERROR, dagStatus.getState());
boolean foundDiag = false;
for (String diag : dagStatus.getDiagnostics()) {
foundDiag = checkDiag(diag, expectedDiagMessages);
if (foundDiag) {
break;
}
}
appId = tezClient.getAppMasterApplicationId();
assertTrue(foundDiag);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
tezClient.stop();
}
// Verify the state of the application.
if (appId != null) {
YarnClient yarnClient = YarnClient.createYarnClient();
try {
yarnClient.init(tezClientConf);
yarnClient.start();
ApplicationReport appReport = yarnClient.getApplicationReport(appId);
YarnApplicationState appState = appReport.getYarnApplicationState();
while (!EnumSet.of(YarnApplicationState.FINISHED, YarnApplicationState.FAILED,
YarnApplicationState.KILLED).contains(appState)) {
Thread.sleep(200L);
appReport = yarnClient.getApplicationReport(appId);
appState = appReport.getYarnApplicationState();
}
// TODO Workaround for YARN-4554. AppReport does not provide diagnostics - need to fetch them from ApplicationAttemptReport
ApplicationAttemptId appAttemptId = appReport.getCurrentApplicationAttemptId();
ApplicationAttemptReport appAttemptReport =
yarnClient.getApplicationAttemptReport(appAttemptId);
String diag = appAttemptReport.getDiagnostics();
assertEquals(FinalApplicationStatus.FAILED, appReport.getFinalApplicationStatus());
assertEquals(YarnApplicationState.FINISHED, appReport.getYarnApplicationState());
checkDiag(diag, expectedDiagMessages);
} finally {
yarnClient.stop();
}
}
}
private boolean checkDiag(String diag, List<String> expected) {
boolean found = true;
for (String exp : expected) {
if (diag.contains(exp)) {
found = true;
continue;
} else {
found = false;
break;
}
}
return found;
}
private void runAndVerifyForNonFatalErrors(TezClient tezClient, String componentName,
Vertex.VertexExecutionContext lhsContext) throws
TezException,
InterruptedException, IOException {
LOG.info("Running JoinValidate with componentName reportNonFatalException");
JoinValidateConfigured joinValidate =
new JoinValidateConfigured(EXECUTION_CONTEXT_DEFAULT, lhsContext,
EXECUTION_CONTEXT_EXT_SERVICE_PUSH,
EXECUTION_CONTEXT_EXT_SERVICE_PUSH, componentName);
DAG dag = joinValidate
.createDag(new TezConfiguration(extServiceTestHelper.getConfForJobs()),
HASH_JOIN_EXPECTED_RESULT_PATH,
HASH_JOIN_OUTPUT_PATH, 3);
DAGClient dagClient = tezClient.submitDAG(dag);
DAGStatus dagStatus =
dagClient.waitForCompletionWithStatusUpdates(Sets.newHashSet(StatusGetOpts.GET_COUNTERS));
assertEquals(DAGStatus.State.FAILED, dagStatus.getState());
boolean foundDiag = false;
for (String diag : dagStatus.getDiagnostics()) {
if (diag.contains(ErrorPluginConfiguration.REPORT_NONFATAL_ERROR_MESSAGE) &&
diag.contains(ServicePluginErrorDefaults.SERVICE_UNAVAILABLE.name())) {
foundDiag = true;
break;
}
}
assertTrue(foundDiag);
}
}