blob: e6a74321f15392730f81eb0ba4db1a0dbf623099 [file] [log] [blame]
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.runtime.task;
import java.lang.reflect.UndeclaredThrowableException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.tez.common.CallableWithNdc;
import org.apache.tez.common.TezCommonUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
import org.apache.tez.runtime.api.impl.TezUmbilical;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class is responsible for running a {@link LogicalIOProcessorRuntimeTask}.
* It does not worry about reporting errors, heartbeats etc.
*
* Returns success / interrupt / failure status via its return value.
*
* It's the responsibility of the invoker to handle whatever exceptions may be generated by this.
*/
public class TaskRunner2Callable extends CallableWithNdc<TaskRunner2Callable.TaskRunner2CallableResult> {

  private static final Logger LOG = LoggerFactory.getLogger(TaskRunner2Callable.class);

  private final LogicalIOProcessorRuntimeTask task;
  private final UserGroupInformation ugi;
  /** Set by {@link #abortTask()}; checked before each phase of the task lifecycle. */
  private final AtomicBoolean stopRequested = new AtomicBoolean(false);
  /** Guarantees {@link #interruptTask()} acts at most once. */
  private final AtomicBoolean interruptAttempted = new AtomicBoolean(false);
  /** Thread executing {@link #callInternal()}; null until the callable has started. */
  private volatile Thread ownThread;

  /**
   * Protocol to send the events.
   */
  private final TezUmbilical tezUmbilical;

  public TaskRunner2Callable(LogicalIOProcessorRuntimeTask task,
      final UserGroupInformation ugi, final TezUmbilical umbilical) {
    this.task = task;
    this.ugi = ugi;
    this.tezUmbilical = umbilical;
  }

  /**
   * Runs the task lifecycle (initialize, run, close) as {@code ugi}, then cleans the task up.
   *
   * <p>If a stop is requested before this method starts, it returns immediately without
   * initializing or cleaning up the task. If a stop request or interrupt is observed between
   * lifecycle phases, the remaining phases are skipped. Any throwable raised by the lifecycle is
   * captured in the returned result instead of being thrown.
   *
   * @return the outcome; its {@code error} is null unless the lifecycle threw
   * @throws Exception if counter bookkeeping in the failure path, or the cleanup itself, throws
   */
  @Override
  public TaskRunner2CallableResult callInternal() throws Exception {
    ownThread = Thread.currentThread();
    if (stopRequested.get()) {
      // task.initialize() has not been called by this callable yet, so cleanup is skipped.
      return new TaskRunner2CallableResult(null);
    }
    try {
      return ugi.doAs(new PrivilegedExceptionAction<TaskRunner2CallableResult>() {
        @Override
        public TaskRunner2CallableResult run() throws Exception {
          return runTaskLifecycle();
        }
      });
    } catch (Throwable t) {
      task.setFrameworkCounters();
      return new TaskRunner2CallableResult(unwrapUndeclared(t));
    } finally {
      // If a stop was requested. Make sure the interrupt status is set during the cleanup.
      // One drawback of not communicating out from here is that task complete messages will only
      // be sent out after cleanup is complete.
      // For a successful task, however, this should be almost no delay since close has already happened.
      maybeFixInterruptStatus();
      LOG.info("Cleaning up task {}, stopRequested={}", task.getTaskAttemptID(), stopRequested.get());
      try {
        logIOStatistics();
      } finally {
        // Cleanup must still happen even if the purely diagnostic statistics logging fails.
        task.getOutputContexts().forEach(outputContext ->
            outputContext.trapEvents(new TezTrapEventHandler(outputContext, tezUmbilical)));
        task.cleanup();
      }
    }
  }

  /**
   * Executes initialize, run and close in order, bailing out between phases if a stop was
   * requested or the current thread was interrupted. Invoked from within {@code ugi.doAs}.
   */
  private TaskRunner2CallableResult runTaskLifecycle() throws Exception {
    if (isStopOrInterruptPending()) {
      return new TaskRunner2CallableResult(null);
    }

    LOG.info("Initializing task, taskAttemptId={}", task.getTaskAttemptID());
    TezUtilsInternal.setHadoopCallerContext(task.getHadoopShim(), task.getTaskAttemptID());
    TezCommonUtils.logCredentials(LOG, ugi.getCredentials(), "taskInit");
    // Reset the current IOStatisticsContext so the statistics logged during cleanup reflect this
    // attempt (NOTE(review): relies on the context being thread-scoped — confirm in Hadoop docs).
    IOStatisticsContext.getCurrentIOStatisticsContext().reset();
    task.initialize();

    if (isStopOrInterruptPending()) {
      LOG.info("Stopped before running the processor taskAttemptId={}",
          task.getTaskAttemptID());
      task.setFrameworkCounters();
      return new TaskRunner2CallableResult(null);
    }
    LOG.info("Running task, taskAttemptId={}", task.getTaskAttemptID());
    task.run();

    if (isStopOrInterruptPending()) {
      LOG.info("Stopped before closing the processor, taskAttemptId={}", task.getTaskAttemptID());
      task.setFrameworkCounters();
      return new TaskRunner2CallableResult(null);
    }
    LOG.info("Closing task, taskAttemptId={}", task.getTaskAttemptID());
    task.close();
    task.setFrameworkCounters();

    LOG.info("Task completed, taskAttemptId={}, askedToStop={}", task.getTaskAttemptID(),
        stopRequested.get());
    return new TaskRunner2CallableResult(null);
  }

  /** Returns true if a stop was requested or the current thread's interrupt flag is set. */
  private boolean isStopOrInterruptPending() {
    return stopRequested.get() || Thread.currentThread().isInterrupted();
  }

  /**
   * Strips an {@link UndeclaredThrowableException} wrapper around a failure raised through
   * {@code ugi.doAs}. The wrapper itself is kept when it has no cause, so a failure is never
   * turned into a null (i.e. successful) result.
   */
  private static Throwable unwrapUndeclared(Throwable t) {
    if (t instanceof UndeclaredThrowableException && t.getCause() != null) {
      return t.getCause();
    }
    return t;
  }

  /** Logs the statistics of the current IOStatisticsContext, if the rendered text is non-empty. */
  private void logIOStatistics() {
    String ioStats = IOStatisticsLogging.ioStatisticsToPrettyString(
        IOStatisticsContext.getCurrentIOStatisticsContext().getIOStatistics());
    if (StringUtils.isNotEmpty(ioStats)) {
      LOG.info("TaskAttemptId={}, {}", task.getTaskAttemptID(), ioStats);
    }
  }

  /** Re-asserts the interrupt flag on the current thread when a stop has been requested. */
  private void maybeFixInterruptStatus() {
    if (stopRequested.get() && !Thread.currentThread().isInterrupted()) {
      Thread.currentThread().interrupt();
    }
  }

  /**
   * Requests that the task stop. Only the first invocation calls {@code task.abortTask()};
   * later invocations have no effect.
   */
  public void abortTask() {
    if (!stopRequested.getAndSet(true)) {
      task.abortTask();
    }
  }

  /**
   * Interrupts the thread running {@link #callInternal()}, at most once over the lifetime of this
   * callable. No interrupt is sent if the callable has not started yet or the task reports that
   * it is done; the one allowed attempt is consumed either way.
   */
  public void interruptTask() {
    if (!interruptAttempted.getAndSet(true)) {
      Thread runner = ownThread;
      // Send an interrupt only if the task is not done.
      if (runner != null && task != null && !task.isTaskDone()) {
        runner.interrupt();
      }
    }
  }

  /** Outcome of a {@link TaskRunner2Callable} invocation. */
  public static class TaskRunner2CallableResult {
    /** The throwable the task lifecycle failed with, or null if it completed without throwing. */
    final Throwable error;

    public TaskRunner2CallableResult(Throwable error) {
      this.error = error;
    }
  }

  /**
   * Delegates to {@code task.addAndGetTezCounter(name)}.
   *
   * @param name the counter name passed through to the task
   * @return whatever the underlying task returns
   */
  public TezCounters addAndGetTezCounter(final String name) {
    return task.addAndGetTezCounter(name);
  }
}