blob: 6c25f0a245dcaffbc16486d98cd72314377415c7 [file] [log] [blame]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.runtime.task;
import static org.apache.tez.runtime.task.TaskExecutionTestHelpers.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Multimap;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.tez.common.TezExecutors;
import org.apache.tez.common.TezSharedExecutor;
import org.apache.tez.common.counters.CounterGroup;
import org.apache.tez.common.counters.FileSystemCounter;
import org.apache.tez.common.counters.TaskCounter;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezReflectionException;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.records.TezDAGID;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezTaskID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.hadoop.shim.DefaultHadoopShim;
import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
import org.apache.tez.runtime.api.ExecutionContext;
import org.apache.tez.runtime.api.TaskFailureType;
import org.apache.tez.runtime.api.ObjectRegistry;
import org.apache.tez.runtime.api.impl.ExecutionContextImpl;
import org.apache.tez.runtime.api.impl.InputSpec;
import org.apache.tez.runtime.api.impl.OutputSpec;
import org.apache.tez.runtime.api.impl.TaskSpec;
import org.apache.tez.runtime.common.resources.ScalingAllocator;
import org.apache.tez.runtime.internals.api.TaskReporterInterface;
import org.apache.tez.runtime.task.TaskExecutionTestHelpers.TestProcessor;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestTaskExecution2 {
private static final Logger LOG = LoggerFactory.getLogger(TestTaskExecution2.class);
private static final Configuration defaultConf = new Configuration();
private static final FileSystem localFs;
private static final Path workDir;
private static final String FAILURE_START_STRING = "Error while running task ( failure )";
private static final String KILL_START_STRING = "Error while running task ( kill )";
private static final ExecutorService taskExecutor = Executors.newFixedThreadPool(1);
static {
defaultConf.set("fs.defaultFS", "file:///");
defaultConf.set(TezConfiguration.TEZ_TASK_SCALE_MEMORY_ALLOCATOR_CLASS,
ScalingAllocator.class.getName());
try {
localFs = FileSystem.getLocal(defaultConf);
Path wd = new Path(System.getProperty("test.build.data", "/tmp"),
TestTaskExecution2.class.getSimpleName());
workDir = localFs.makeQualified(wd);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
@Before
public void reset() {
TestProcessor.reset();
}
@AfterClass
public static void shutdown() {
taskExecutor.shutdownNow();
}
@Test(timeout = 5000)
public void testSingleSuccessfulTask() throws IOException, InterruptedException, TezException,
ExecutionException {
ListeningExecutorService executor = null;
try {
ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
executor = MoreExecutors.listeningDecorator(rawExecutor);
ApplicationId appId = ApplicationId.newInstance(10000, 1);
TaskExecutionTestHelpers.TezTaskUmbilicalForTest
umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
TaskReporter taskReporter = createTaskReporter(appId, umbilical);
TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
TestProcessor.CONF_EMPTY);
// Setup the executor
Future<TaskRunner2Result> taskRunnerFuture = taskExecutor.submit(
new TaskRunnerCallable2ForTest(taskRunner));
// Signal the processor to go through
TestProcessor.signal();
TaskRunner2Result result = taskRunnerFuture.get();
verifyTaskRunnerResult(result, EndReason.SUCCESS, null, false, null);
assertNull(taskReporter.currentCallable);
umbilical.verifyTaskSuccessEvent();
assertFalse(TestProcessor.wasAborted());
} finally {
executor.shutdownNow();
}
}
@Test(timeout = 5000)
public void testMultipleSuccessfulTasks() throws IOException, InterruptedException, TezException,
ExecutionException {
ListeningExecutorService executor = null;
try {
ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
executor = MoreExecutors.listeningDecorator(rawExecutor);
ApplicationId appId = ApplicationId.newInstance(10000, 1);
TaskExecutionTestHelpers.TezTaskUmbilicalForTest
umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
TaskReporter taskReporter = createTaskReporter(appId, umbilical);
TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
TestProcessor.CONF_EMPTY, true);
LogicalIOProcessorRuntimeTask runtimeTask = taskRunner.task;
// Setup the executor
Future<TaskRunner2Result> taskRunnerFuture = taskExecutor.submit(
new TaskRunnerCallable2ForTest(taskRunner));
// Signal the processor to go through
TestProcessor.signal();
TaskRunner2Result result = taskRunnerFuture.get();
verifyTaskRunnerResult(result, EndReason.SUCCESS, null, false, null);
assertNull(taskReporter.currentCallable);
umbilical.verifyTaskSuccessEvent();
assertFalse(TestProcessor.wasAborted());
umbilical.resetTrackedEvents();
TezCounters tezCounters = runtimeTask.getCounters();
verifySysCounters(tezCounters, 5, 5);
taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
TestProcessor.CONF_EMPTY, false);
runtimeTask = taskRunner.task;
// Setup the executor
taskRunnerFuture = taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
// Signal the processor to go through
TestProcessor.signal();
result = taskRunnerFuture.get();
verifyTaskRunnerResult(result, EndReason.SUCCESS, null, false, null);
assertNull(taskReporter.currentCallable);
umbilical.verifyTaskSuccessEvent();
assertFalse(TestProcessor.wasAborted());
tezCounters = runtimeTask.getCounters();
verifySysCounters(tezCounters, -1, -1);
} finally {
executor.shutdownNow();
}
}
// test task failed due to exception in Processor
@Test(timeout = 5000)
public void testFailedTaskTezException() throws IOException, InterruptedException, TezException,
ExecutionException {
ListeningExecutorService executor = null;
try {
ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
executor = MoreExecutors.listeningDecorator(rawExecutor);
ApplicationId appId = ApplicationId.newInstance(10000, 1);
TaskExecutionTestHelpers.TezTaskUmbilicalForTest
umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
TaskReporter taskReporter = createTaskReporter(appId, umbilical);
TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
TestProcessor.CONF_THROW_TEZ_EXCEPTION);
// Setup the executor
Future<TaskRunner2Result> taskRunnerFuture =
taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
// Signal the processor to go through
TestProcessor.awaitStart();
TestProcessor.signal();
TaskRunner2Result result = taskRunnerFuture.get();
verifyTaskRunnerResult(result, EndReason.TASK_ERROR, createProcessorTezException(), false, TaskFailureType.NON_FATAL);
assertNull(taskReporter.currentCallable);
umbilical.verifyTaskFailedEvent(
FAILURE_START_STRING,
TezException.class.getName() + ": " + TezException.class.getSimpleName());
// Failure detected as a result of fall off from the run method. abort isn't required.
assertFalse(TestProcessor.wasAborted());
assertTrue(taskRunner.task.getCounters().countCounters() != 0);
} finally {
executor.shutdownNow();
}
}
// Test task failed due to Processor class not found
@Test(timeout = 5000)
public void testFailedTask2() throws IOException, InterruptedException, TezException,
ExecutionException {
ListeningExecutorService executor = null;
try {
ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
executor = MoreExecutors.listeningDecorator(rawExecutor);
ApplicationId appId = ApplicationId.newInstance(10000, 1);
TaskExecutionTestHelpers.TezTaskUmbilicalForTest
umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
TaskReporter taskReporter = createTaskReporter(appId, umbilical);
TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
"NotExitedProcessor", TestProcessor.CONF_EMPTY, false, true);
// Setup the executor
Future<TaskRunner2Result> taskRunnerFuture =
taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
TaskRunner2Result result = taskRunnerFuture.get();
verifyTaskRunnerResult(result, EndReason.TASK_ERROR,
new TezReflectionException("TezReflectionException"), false, TaskFailureType.NON_FATAL);
assertNull(taskReporter.currentCallable);
umbilical.verifyTaskFailedEvent(FAILURE_START_STRING,
":org.apache.tez.dag.api.TezReflectionException: "
+ "Unable to load class: NotExitedProcessor");
// Failure detected as a result of fall off from the run method. abort isn't required.
assertFalse(TestProcessor.wasAborted());
assertTrue(taskRunner.task.getCounters().countCounters() != 0);
} finally {
executor.shutdownNow();
}
}
// test task failed due to exception in Processor
@Test(timeout = 5000)
public void testFailedTaskIOException() throws IOException, InterruptedException, TezException,
ExecutionException {
ListeningExecutorService executor = null;
try {
ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
executor = MoreExecutors.listeningDecorator(rawExecutor);
ApplicationId appId = ApplicationId.newInstance(10000, 1);
TaskExecutionTestHelpers.TezTaskUmbilicalForTest
umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
TaskReporter taskReporter = createTaskReporter(appId, umbilical);
TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
TestProcessor.CONF_THROW_IO_EXCEPTION);
// Setup the executor
Future<TaskRunner2Result> taskRunnerFuture =
taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
// Signal the processor to go through
TestProcessor.awaitStart();
TestProcessor.signal();
TaskRunner2Result result = taskRunnerFuture.get();
verifyTaskRunnerResult(result, EndReason.TASK_ERROR, createProcessorIOException(), false, TaskFailureType.NON_FATAL);
assertNull(taskReporter.currentCallable);
umbilical.verifyTaskFailedEvent(
FAILURE_START_STRING,
IOException.class.getName() + ": " + IOException.class.getSimpleName());
// Failure detected as a result of fall off from the run method. abort isn't required.
assertFalse(TestProcessor.wasAborted());
assertTrue(taskRunner.task.getCounters().countCounters() != 0);
} finally {
executor.shutdownNow();
}
}
// test that makes sure errors aren't reported when the container is already failing
@Test(timeout = 5000)
public void testIgnoreErrorsDuringFailure() throws IOException, InterruptedException, TezException,
ExecutionException {
ListeningExecutorService executor = null;
try {
ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
executor = MoreExecutors.listeningDecorator(rawExecutor);
ApplicationId appId = ApplicationId.newInstance(10000, 1);
TaskExecutionTestHelpers.TezTaskUmbilicalForTest
umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
TaskReporter taskReporter = new TaskReporter(umbilical, 100, 1000, 100, new AtomicLong(0),
createContainerId(appId).toString()) {
@Override
protected boolean isShuttingDown() {
return true;
}
};
TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
TestProcessor.CONF_THROW_IO_EXCEPTION);
// Setup the executor
taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
// Signal the processor to go through
TestProcessor.awaitStart();
TestProcessor.signal();
umbilical.verifyNoCompletionEvents();
} finally {
executor.shutdownNow();
}
}
@Test(timeout = 5000)
public void testHeartbeatException() throws IOException, InterruptedException, TezException,
ExecutionException {
ListeningExecutorService executor = null;
try {
ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
executor = MoreExecutors.listeningDecorator(rawExecutor);
ApplicationId appId = ApplicationId.newInstance(10000, 1);
TaskExecutionTestHelpers.TezTaskUmbilicalForTest
umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
TaskReporter taskReporter = createTaskReporter(appId, umbilical);
TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
TestProcessor.CONF_EMPTY);
// Setup the executor
Future<TaskRunner2Result> taskRunnerFuture =
taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
// Signal the processor to go through
TestProcessor.awaitStart();
umbilical.signalThrowException();
umbilical.awaitRegisteredEvent();
// Not signaling an actual start to verify task interruption
TaskRunner2Result result = taskRunnerFuture.get();
verifyTaskRunnerResult(result, EndReason.COMMUNICATION_FAILURE,
new IOException("IOException"),
TaskExecutionTestHelpers.HEARTBEAT_EXCEPTION_STRING, false, TaskFailureType.NON_FATAL);
TestProcessor.awaitCompletion();
assertTrue(TestProcessor.wasInterrupted());
assertNull(taskReporter.currentCallable);
// No completion events since umbilical communication already failed.
umbilical.verifyNoCompletionEvents();
assertTrue(TestProcessor.wasAborted());
} finally {
executor.shutdownNow();
}
}
@Test(timeout = 5000)
public void testHeartbeatShouldDie() throws IOException, InterruptedException, TezException,
ExecutionException {
ListeningExecutorService executor = null;
try {
ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
executor = MoreExecutors.listeningDecorator(rawExecutor);
ApplicationId appId = ApplicationId.newInstance(10000, 1);
TaskExecutionTestHelpers.TezTaskUmbilicalForTest
umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
TaskReporter taskReporter = createTaskReporter(appId, umbilical);
TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
TestProcessor.CONF_EMPTY);
// Setup the executor
Future<TaskRunner2Result> taskRunnerFuture =
taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
// Signal the processor to go through
TestProcessor.awaitStart();
umbilical.signalSendShouldDie();
umbilical.awaitRegisteredEvent();
// Not signaling an actual start to verify task interruption
TaskRunner2Result result = taskRunnerFuture.get();
verifyTaskRunnerResult(result, EndReason.CONTAINER_STOP_REQUESTED, null, true, null);
TestProcessor.awaitCompletion();
assertTrue(TestProcessor.wasInterrupted());
assertNull(taskReporter.currentCallable);
// TODO Is this statement correct ?
// No completion events since shouldDie was requested by the AM, which should have killed the
// task.
umbilical.verifyNoCompletionEvents();
assertTrue(TestProcessor.wasAborted());
} finally {
executor.shutdownNow();
}
}
@Test(timeout = 5000)
public void testSignalDeprecatedFatalErrorAndLoop() throws IOException, InterruptedException, TezException,
ExecutionException {
ListeningExecutorService executor = null;
try {
ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
executor = MoreExecutors.listeningDecorator(rawExecutor);
ApplicationId appId = ApplicationId.newInstance(10000, 1);
TaskExecutionTestHelpers.TezTaskUmbilicalForTest
umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
TaskReporter taskReporter = createTaskReporter(appId, umbilical);
TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
TestProcessor.CONF_SIGNAL_DEPRECATEDFATAL_AND_LOOP);
// Setup the executor
Future<TaskRunner2Result> taskRunnerFuture =
taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
// Signal the processor to go through
TestProcessor.awaitStart();
TestProcessor.signal();
TestProcessor.awaitLoop();
// The fatal error should have caused an interrupt.
TaskRunner2Result result = taskRunnerFuture.get();
verifyTaskRunnerResult(result, EndReason.TASK_ERROR, createProcessorIOException(), false, TaskFailureType.NON_FATAL);
TestProcessor.awaitCompletion();
assertTrue(TestProcessor.wasInterrupted());
assertNull(taskReporter.currentCallable);
umbilical.verifyTaskFailedEvent(
FAILURE_START_STRING,
IOException.class.getName() + ": " + IOException.class.getSimpleName());
// Signal fatal error should cause the processor to fail.
assertTrue(TestProcessor.wasAborted());
} finally {
executor.shutdownNow();
}
}
@Test(timeout = 5000)
public void testSignalFatalAndThrow() throws IOException, InterruptedException, TezException,
ExecutionException {
ListeningExecutorService executor = null;
try {
ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
executor = MoreExecutors.listeningDecorator(rawExecutor);
ApplicationId appId = ApplicationId.newInstance(10000, 1);
TaskExecutionTestHelpers.TezTaskUmbilicalForTest
umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
TaskReporter taskReporter = createTaskReporter(appId, umbilical);
TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
TestProcessor.CONF_SIGNAL_FATAL_AND_THROW);
// Setup the executor
Future<TaskRunner2Result> taskRunnerFuture =
taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
// Signal the processor to go through
TestProcessor.awaitStart();
TestProcessor.signal();
TaskRunner2Result result = taskRunnerFuture.get();
verifyTaskRunnerResult(result, EndReason.TASK_ERROR, createProcessorIOException(), false, TaskFailureType.FATAL);
TestProcessor.awaitCompletion();
assertNull(taskReporter.currentCallable);
umbilical.verifyTaskFailedEvent(
FAILURE_START_STRING,
IOException.class.getName() + ": " + IOException.class.getSimpleName(), TaskFailureType.FATAL);
assertTrue(TestProcessor.wasAborted());
} finally {
executor.shutdownNow();
}
}
@Test(timeout = 5000)
public void testSignalNonFatalAndThrow() throws IOException, InterruptedException, TezException,
ExecutionException {
ListeningExecutorService executor = null;
try {
ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
executor = MoreExecutors.listeningDecorator(rawExecutor);
ApplicationId appId = ApplicationId.newInstance(10000, 1);
TaskExecutionTestHelpers.TezTaskUmbilicalForTest
umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
TaskReporter taskReporter = createTaskReporter(appId, umbilical);
TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
TestProcessor.CONF_SIGNAL_NON_FATAL_AND_THROW);
// Setup the executor
Future<TaskRunner2Result> taskRunnerFuture =
taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
// Signal the processor to go through
TestProcessor.awaitStart();
TestProcessor.signal();
TaskRunner2Result result = taskRunnerFuture.get();
verifyTaskRunnerResult(result, EndReason.TASK_ERROR, createProcessorIOException(), false, TaskFailureType.NON_FATAL);
TestProcessor.awaitCompletion();
assertNull(taskReporter.currentCallable);
umbilical.verifyTaskFailedEvent(
FAILURE_START_STRING,
IOException.class.getName() + ": " + IOException.class.getSimpleName(), TaskFailureType.NON_FATAL);
assertTrue(TestProcessor.wasAborted());
} finally {
executor.shutdownNow();
}
}
@Test(timeout = 5000)
public void testTaskSelfKill() throws IOException, InterruptedException, TezException,
ExecutionException {
ListeningExecutorService executor = null;
try {
ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
executor = MoreExecutors.listeningDecorator(rawExecutor);
ApplicationId appId = ApplicationId.newInstance(10000, 1);
TaskExecutionTestHelpers.TezTaskUmbilicalForTest
umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
TaskReporter taskReporter = createTaskReporter(appId, umbilical);
TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
TestProcessor.CONF_SELF_KILL_AND_COMPLETE);
// Setup the executor
Future<TaskRunner2Result> taskRunnerFuture =
taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
// Signal the processor to go through
TestProcessor.awaitStart();
TestProcessor.signal();
TaskRunner2Result result = taskRunnerFuture.get();
verifyTaskRunnerResult(result, EndReason.TASK_KILL_REQUEST, createProcessorIOException(), false,
null);
TestProcessor.awaitCompletion();
assertNull(taskReporter.currentCallable);
umbilical.verifyTaskKilledEvent(
KILL_START_STRING,
IOException.class.getName() + ": " + IOException.class.getSimpleName());
assertTrue(TestProcessor.wasAborted());
} finally {
executor.shutdownNow();
}
}
@Test(timeout = 5000)
public void testTaskKilled() throws IOException, InterruptedException, TezException,
ExecutionException {
ListeningExecutorService executor = null;
try {
ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
executor = MoreExecutors.listeningDecorator(rawExecutor);
ApplicationId appId = ApplicationId.newInstance(10000, 1);
TaskExecutionTestHelpers.TezTaskUmbilicalForTest
umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
TaskReporter taskReporter = createTaskReporter(appId, umbilical);
TezTaskRunner2 taskRunner = createTaskRunner(appId, umbilical, taskReporter, executor,
TestProcessor.CONF_EMPTY);
// Setup the executor
Future<TaskRunner2Result> taskRunnerFuture =
taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
// Signal the processor to go through
TestProcessor.awaitStart();
taskRunner.killTask();
TaskRunner2Result result = taskRunnerFuture.get();
verifyTaskRunnerResult(result, EndReason.KILL_REQUESTED, null, false, null);
TestProcessor.awaitCompletion();
assertTrue(TestProcessor.wasInterrupted());
assertNull(taskReporter.currentCallable);
// Kill events are not sent over the umbilical at the moment.
umbilical.verifyNoCompletionEvents();
} finally {
executor.shutdownNow();
}
}
@Test(timeout = 5000)
public void testKilledAfterComplete() throws IOException, InterruptedException, TezException,
ExecutionException {
ListeningExecutorService executor = null;
try {
ExecutorService rawExecutor = Executors.newFixedThreadPool(1);
executor = MoreExecutors.listeningDecorator(rawExecutor);
ApplicationId appId = ApplicationId.newInstance(10000, 1);
TaskExecutionTestHelpers.TezTaskUmbilicalForTest
umbilical = new TaskExecutionTestHelpers.TezTaskUmbilicalForTest();
TaskReporter taskReporter = createTaskReporter(appId, umbilical);
TezTaskRunner2ForTest taskRunner =
createTaskRunnerForTest(appId, umbilical, taskReporter, executor,
TestProcessor.CONF_EMPTY);
// Setup the executor
Future<TaskRunner2Result> taskRunnerFuture =
taskExecutor.submit(new TaskRunnerCallable2ForTest(taskRunner));
// Signal the processor to go through
TestProcessor.awaitStart();
TestProcessor.signal();
TestProcessor.awaitCompletion();
taskRunner.awaitCallableCompletion();
taskRunner.killTask();
TaskRunner2Result result = taskRunnerFuture.get();
verifyTaskRunnerResult(result, EndReason.SUCCESS, null, false, null);
assertFalse(TestProcessor.wasInterrupted());
assertNull(taskReporter.currentCallable);
umbilical.verifyTaskSuccessEvent();
} finally {
executor.shutdownNow();
}
}
private void verifySysCounters(TezCounters tezCounters, int minTaskCounterCount, int minFsCounterCount) {
Preconditions.checkArgument((minTaskCounterCount > 0 && minFsCounterCount > 0) ||
(minTaskCounterCount <= 0 && minFsCounterCount <= 0),
"Both targetCounter counts should be postitive or negative. A mix is not expected");
int numTaskCounters = 0;
int numFsCounters = 0;
for (CounterGroup counterGroup : tezCounters) {
if (counterGroup.getName().equals(TaskCounter.class.getName())) {
for (TezCounter ignored : counterGroup) {
numTaskCounters++;
}
} else if (counterGroup.getName().equals(FileSystemCounter.class.getName())) {
for (TezCounter ignored : counterGroup) {
numFsCounters++;
}
}
}
// If Target <=0, assert counter count is exactly 0
if (minTaskCounterCount <= 0) {
assertEquals(tezCounters.toString(), 0, numTaskCounters);
assertEquals(tezCounters.toString(), 0, numFsCounters);
} else {
assertTrue(numTaskCounters >= minTaskCounterCount);
assertTrue(numFsCounters >= minFsCounterCount);
}
}
private void verifyTaskRunnerResult(TaskRunner2Result taskRunner2Result,
EndReason expectedEndReason, Throwable expectedThrowable,
boolean wasShutdownRequested,
TaskFailureType taskFailureType) {
verifyTaskRunnerResult(taskRunner2Result, expectedEndReason, expectedThrowable, null,
wasShutdownRequested, taskFailureType);
}
private void verifyTaskRunnerResult(TaskRunner2Result taskRunner2Result,
EndReason expectedEndReason, Throwable expectedThrowable,
String expectedExceptionMessage,
boolean wasShutdownRequested,
TaskFailureType taskFailureType) {
assertEquals(expectedEndReason, taskRunner2Result.getEndReason());
if (expectedThrowable == null) {
assertNull(taskRunner2Result.getError());
} else {
assertNotNull(taskRunner2Result.getError());
Throwable cause = taskRunner2Result.getError();
LOG.info(cause.getClass().getName());
assertTrue(cause.getClass().isAssignableFrom(expectedThrowable.getClass()));
if (expectedExceptionMessage != null) {
assertTrue(cause.getMessage().contains(expectedExceptionMessage));
}
}
assertEquals(taskFailureType, taskRunner2Result.getTaskFailureType());
assertEquals(wasShutdownRequested, taskRunner2Result.isContainerShutdownRequested());
}
private static class TaskRunnerCallable2ForTest implements Callable<TaskRunner2Result> {
private final TezTaskRunner2 taskRunner;
public TaskRunnerCallable2ForTest(TezTaskRunner2 taskRunner) {
this.taskRunner = taskRunner;
}
@Override
public TaskRunner2Result call() throws Exception {
return taskRunner.run();
}
}
private TezTaskRunner2 createTaskRunner(ApplicationId appId,
TaskExecutionTestHelpers.TezTaskUmbilicalForTest umbilical,
TaskReporter taskReporter,
ListeningExecutorService executor, byte[] processorConf) throws
IOException {
return createTaskRunner(appId, umbilical, taskReporter, executor, processorConf, true);
}
private TezTaskRunner2 createTaskRunner(ApplicationId appId,
TaskExecutionTestHelpers.TezTaskUmbilicalForTest umbilical,
TaskReporter taskReporter,
ListeningExecutorService executor, byte[] processorConf,
boolean updateSysCounters)
throws IOException {
return createTaskRunner(appId, umbilical, taskReporter, executor, TestProcessor.class.getName(),
processorConf, false, updateSysCounters);
}
private TezTaskRunner2ForTest createTaskRunnerForTest(ApplicationId appId,
TaskExecutionTestHelpers.TezTaskUmbilicalForTest umbilical,
TaskReporter taskReporter,
ListeningExecutorService executor,
byte[] processorConf)
throws IOException {
return (TezTaskRunner2ForTest) createTaskRunner(appId, umbilical, taskReporter, executor,
TestProcessor.class.getName(),
processorConf, true, true);
}
private TezTaskRunner2 createTaskRunner(ApplicationId appId,
TaskExecutionTestHelpers.TezTaskUmbilicalForTest umbilical,
TaskReporter taskReporter,
ListeningExecutorService executor, String processorClass,
byte[] processorConf, boolean testRunner,
boolean updateSysCounters) throws
IOException {
TezConfiguration tezConf = new TezConfiguration(defaultConf);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
Path testDir = new Path(workDir, UUID.randomUUID().toString());
String[] localDirs = new String[]{testDir.toString()};
TezDAGID dagId = TezDAGID.getInstance(appId, 1);
TezVertexID vertexId = TezVertexID.getInstance(dagId, 1);
TezTaskID taskId = TezTaskID.getInstance(vertexId, 1);
TezTaskAttemptID taskAttemptId = TezTaskAttemptID.getInstance(taskId, 1);
ProcessorDescriptor processorDescriptor = ProcessorDescriptor.create(processorClass)
.setUserPayload(UserPayload.create(ByteBuffer.wrap(processorConf)));
TaskSpec taskSpec =
new TaskSpec(taskAttemptId, "dagName", "vertexName", -1, processorDescriptor,
new ArrayList<InputSpec>(), new ArrayList<OutputSpec>(), null, null);
TezExecutors sharedExecutor = new TezSharedExecutor(tezConf);
TezTaskRunner2 taskRunner;
if (testRunner) {
taskRunner = new TezTaskRunner2ForTest(tezConf, ugi, localDirs, taskSpec, 1,
new HashMap<String, ByteBuffer>(), new HashMap<String, String>(),
HashMultimap.<String, String>create(), taskReporter,
executor, null, "", new ExecutionContextImpl("localhost"),
Runtime.getRuntime().maxMemory(), updateSysCounters, sharedExecutor);
} else {
taskRunner = new TezTaskRunner2(tezConf, ugi, localDirs, taskSpec, 1,
new HashMap<String, ByteBuffer>(), new HashMap<String, String>(),
HashMultimap.<String, String>create(), taskReporter,
executor, null, "", new ExecutionContextImpl("localhost"),
Runtime.getRuntime().maxMemory(), updateSysCounters, new DefaultHadoopShim(),
sharedExecutor);
}
return taskRunner;
}
public static class TezTaskRunner2ForTest extends TezTaskRunner2 {
private final ReentrantLock testLock = new ReentrantLock();
private final Condition callableCompletionCondition = testLock.newCondition();
private final AtomicBoolean isCallableComplete = new AtomicBoolean(false);
public TezTaskRunner2ForTest(Configuration tezConf, UserGroupInformation ugi,
String[] localDirs,
TaskSpec taskSpec, int appAttemptNumber,
Map<String, ByteBuffer> serviceConsumerMetadata,
Map<String, String> serviceProviderEnvMap,
Multimap<String, String> startedInputsMap,
TaskReporterInterface taskReporter,
ListeningExecutorService executor,
ObjectRegistry objectRegistry,
String pid,
ExecutionContext executionContext,
long memAvailable,
boolean updateSysCounters,
TezExecutors sharedExecutor) throws IOException {
super(tezConf, ugi, localDirs, taskSpec, appAttemptNumber, serviceConsumerMetadata,
serviceProviderEnvMap, startedInputsMap, taskReporter, executor, objectRegistry, pid,
executionContext, memAvailable, updateSysCounters, new DefaultHadoopShim(),
sharedExecutor);
}
@Override
@VisibleForTesting
void processCallableResult(TaskRunner2Callable.TaskRunner2CallableResult executionResult) {
testLock.lock();
try {
super.processCallableResult(executionResult);
isCallableComplete.set(true);
callableCompletionCondition.signal();
} finally {
testLock.unlock();
}
}
void awaitCallableCompletion() throws InterruptedException {
testLock.lock();
try {
while (!isCallableComplete.get()) {
callableCompletionCondition.await();
}
} finally {
testLock.unlock();
}
}
}
}