blob: 4def43f201a5542147c45a56f46b709ba8f6df78 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.runtime.task;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.log4j.Logger;
import org.apache.tez.common.TezTaskUmbilicalProtocol;
import org.apache.tez.common.counters.TezCounters;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask;
import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent;
import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
 * Responsible for communication between tasks running in a Container and the ApplicationMaster
 * (AM). Sends heartbeats - both regular and out-of-band (OOB) - to the AM in order to deliver
 * events generated by the task and to retrieve events specific to this task.
 */
public class TaskReporter {
private static final Logger LOG = Logger.getLogger(TaskReporter.class);
// RPC handle to the AM; used for heartbeats and for canCommit queries.
private final TezTaskUmbilicalProtocol umbilical;
// Time to wait between regular heartbeats, in milliseconds.
private final long pollInterval;
// Minimum interval between counter updates sent to the AM. It is compared against
// (elapsed regular heartbeats * pollInterval), so it is in the same unit as pollInterval.
private final long sendCounterInterval;
// Passed to every heartbeat request as the maximum number of events to fetch.
private final int maxEventsToGet;
// Source of heartbeat request ids; supplied by the creator of this reporter.
private final AtomicLong requestCounter;
// Container identifier included in every heartbeat request.
private final String containerIdStr;
// Executor backed by a single daemon thread ("TaskHeartbeatThread") that runs the heartbeat loop.
private final ListeningExecutorService heartbeatExecutor;
// Heartbeat loop of the currently registered task; null while no task is registered.
@VisibleForTesting
HeartbeatCallable currentCallable;
/**
 * Creates a reporter that talks to the AM over the given umbilical. No heartbeats are sent
 * until a task is registered via {@link #registerTask}.
 *
 * @param umbilical RPC handle to the AM
 * @param amPollInterval time to wait between regular heartbeats, in milliseconds
 * @param sendCounterInterval minimum interval between counter updates, in the same unit as
 *          amPollInterval
 * @param maxEventsToGet maximum number of events to request per heartbeat
 * @param requestCounter source of heartbeat request ids
 * @param containerIdStr identifier of the container, sent with every heartbeat
 */
public TaskReporter(TezTaskUmbilicalProtocol umbilical, long amPollInterval,
    long sendCounterInterval, int maxEventsToGet, AtomicLong requestCounter, String containerIdStr) {
  this.umbilical = umbilical;
  this.containerIdStr = containerIdStr;
  this.requestCounter = requestCounter;
  this.maxEventsToGet = maxEventsToGet;
  this.pollInterval = amPollInterval;
  this.sendCounterInterval = sendCounterInterval;
  // A single daemon thread runs the heartbeat loop; the listening decorator lets callers attach
  // completion callbacks to the submitted loop.
  this.heartbeatExecutor = MoreExecutors.listeningDecorator(
      Executors.newFixedThreadPool(1,
          new ThreadFactoryBuilder().setDaemon(true).setNameFormat("TaskHeartbeatThread").build()));
}
/**
 * Registers a task to be tracked. A heartbeat loop is started for the task on the heartbeat
 * thread; it sends out the task's events and fetches events destined for it.
 *
 * @param task the task to heartbeat for
 * @param errorReporter notified when the heartbeat loop fails, or when the AM asks the task to
 *          die
 */
public synchronized void registerTask(LogicalIOProcessorRuntimeTask task,
    ErrorReporter errorReporter) {
  HeartbeatCallable callable = new HeartbeatCallable(task, umbilical, pollInterval,
      sendCounterInterval, maxEventsToGet, requestCounter, containerIdStr);
  currentCallable = callable;
  // Submit the loop, then hook up the callback that relays its outcome to the error reporter.
  Futures.addCallback(heartbeatExecutor.submit(callable), new HeartbeatCallback(errorReporter));
}
/**
 * Stops tracking the currently registered task and wakes up its heartbeat loop if it is waiting
 * between heartbeats. This method should always be invoked before setting up heartbeats for
 * another task running in the same container.
 *
 * <p>Calling this when no task is registered (for example, a second time in a row) is a no-op.
 *
 * @param taskAttemptID the attempt being unregistered; not used to select the task, since only
 *          one task is tracked at a time
 */
public synchronized void unregisterTask(TezTaskAttemptID taskAttemptID) {
  HeartbeatCallable callable = currentCallable;
  if (callable == null) {
    // Nothing is registered; dereferencing here used to throw a NullPointerException.
    return;
  }
  callable.markComplete();
  currentCallable = null;
}
/**
 * Shuts down the heartbeat executor by calling shutdownNow() on it. Per the ExecutorService
 * contract this attempts to stop a heartbeat loop that is still running.
 */
public void shutdown() {
heartbeatExecutor.shutdownNow();
}
@VisibleForTesting
static class HeartbeatCallable implements Callable<Boolean> {
// Used with the poll interval (milliseconds) to pick the heartbeat number at which counters are
// first logged, i.e. roughly 5 seconds after the loop starts.
private static final int LOG_COUNTER_START_INTERVAL = 5000; // 5 seconds
// Multiplier applied to the next counter-logging heartbeat number after each log.
private static final float LOG_COUNTER_BACKOFF = 1.3f;
// The task this loop heartbeats for.
private final LogicalIOProcessorRuntimeTask task;
// Metadata attached to the status / completion events generated by this class.
private EventMetaData updateEventMetadata;
private final TezTaskUmbilicalProtocol umbilical;
// Time to wait between regular heartbeats, in milliseconds.
private final long pollInterval;
// Minimum interval between counter updates, in the same unit as pollInterval.
private final long sendCounterInterval;
private final int maxEventsToGet;
private final String containerIdStr;
private final AtomicLong requestCounter;
// Events waiting to go out with the next heartbeat.
private LinkedBlockingQueue<TezEvent> eventsToSend = new LinkedBlockingQueue<TezEvent>();
// lock / condition implement the timed wait between heartbeats; markComplete() signals the
// condition to cut the wait short.
private final ReentrantLock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
/*
 * Keeps track of regular timed heartbeats. Is primarily used as a timing mechanism to send /
 * log counters.
 */
private int nonOobHeartbeatCounter = 0;
// Value of nonOobHeartbeatCounter at which counters are next written to the debug log.
private int nextHeartbeatNumToLog = 0;
/*
 * Tracks the last non-OOB heartbeat number at which counters were sent to the AM.
 */
private int prevCounterSendHeartbeatNum = 0;
public HeartbeatCallable(LogicalIOProcessorRuntimeTask task,
TezTaskUmbilicalProtocol umbilical, long amPollInterval, long sendCounterInterval,
int maxEventsToGet, AtomicLong requestCounter, String containerIdStr) {
this.pollInterval = amPollInterval;
this.sendCounterInterval = sendCounterInterval;
this.maxEventsToGet = maxEventsToGet;
this.requestCounter = requestCounter;
this.containerIdStr = containerIdStr;
this.task = task;
this.umbilical = umbilical;
this.updateEventMetadata = new EventMetaData(EventProducerConsumerType.SYSTEM,
task.getVertexName(), "", task.getTaskAttemptID());
nextHeartbeatNumToLog = (Math.max(1,
(int) (LOG_COUNTER_START_INTERVAL / (amPollInterval == 0 ? 0.000001f
: (float) amPollInterval))));
}
/**
 * Runs the heartbeat loop until the task is done, has had a fatal error, or the AM asks it to
 * die.
 *
 * @return false if the AM requested that the task die; true if the loop ended because the task
 *         finished or failed
 * @throws Exception if a heartbeat fails or the wait between heartbeats is interrupted
 */
@Override
public Boolean call() throws Exception {
  // Heartbeat only for active tasks. Errors, etc will be reported directly.
  while (!(task.isTaskDone() || task.hadFatalError())) {
    ResponseWrapper response = heartbeat(null);
    if (response.shouldDie) {
      // AM sent a shouldDie=true
      LOG.info("Asked to die via task heartbeat");
      return false;
    }
    if (response.numEvents >= maxEventsToGet) {
      // A full batch came back, so more events are likely waiting: heartbeat again right away.
      // This counts as an OOB heartbeat.
      continue;
    }
    // Wait before sending another heartbeat.
    lock.lock();
    try {
      // await returns true when woken before the timeout (e.g. by markComplete()) and false
      // when the full poll interval elapsed; only the latter counts as a regular heartbeat.
      boolean wokenBeforeTimeout = condition.await(pollInterval, TimeUnit.MILLISECONDS);
      if (!wokenBeforeTimeout) {
        nonOobHeartbeatCounter++;
      }
    } finally {
      lock.unlock();
    }
  }
  int unsentEventCount = eventsToSend.size();
  if (unsentEventCount > 0) {
    LOG.warn("Exiting TaskReporter thread with pending queue size=" + unsentEventCount);
  }
  return true;
}
/**
 * Sends a single heartbeat to the AM and processes the response.
 *
 * The request carries every event queued in eventsToSend (including eventsArg) and, while the
 * task is still active, a status update event. Events returned by the AM are handed to the task
 * unless the task has completed or failed in the meantime.
 *
 * @param eventsArg
 *          additional events to send with this heartbeat; may be null
 * @return a wrapper with shouldDie=true if the AM asked the task to die; otherwise shouldDie=false
 *         and the number of events that were routed to the task
 * @throws IOException
 *           indicates an RPC communication failure.
 * @throws TezException
 *           indicates an exception somewhere in the AM, or a response whose request id does not
 *           match the request that was just sent.
 */
private synchronized ResponseWrapper heartbeat(Collection<TezEvent> eventsArg) throws IOException,
TezException {
if (eventsArg != null) {
eventsToSend.addAll(eventsArg);
}
TezEvent updateEvent = null;
// Take everything that is currently queued; these events go out with this request.
List<TezEvent> events = new ArrayList<TezEvent>();
eventsToSend.drainTo(events);
// A status update is only added while the task is still active.
if (!task.isTaskDone() && !task.hadFatalError()) {
// Counters stay null (progress only) unless the send interval has elapsed.
TezCounters counters = null;
/*
 * Increasing the heartbeat interval can delay the delivery of events. Sending just updated
 * records would save CPU in DAG AM, but certain counters are updated very frequently. Until
 * real time decisions are made based on these counters, it can be sent once per second.
 */
// Elapsed time is estimated as (regular heartbeats since the last send) * pollInterval.
// Not completely accurate, since OOB heartbeats could go out.
if ((nonOobHeartbeatCounter - prevCounterSendHeartbeatNum) * pollInterval >= sendCounterInterval) {
counters = task.getCounters();
prevCounterSendHeartbeatNum = nonOobHeartbeatCounter;
}
updateEvent = new TezEvent(new TaskStatusUpdateEvent(counters, task.getProgress()),
updateEventMetadata);
events.add(updateEvent);
}
long requestId = requestCounter.incrementAndGet();
TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, containerIdStr,
task.getTaskAttemptID(), task.getEventCounter(), maxEventsToGet);
if (LOG.isDebugEnabled()) {
LOG.debug("Sending heartbeat to AM, request=" + request);
}
maybeLogCounters();
TezHeartbeatResponse response = umbilical.heartbeat(request);
if (LOG.isDebugEnabled()) {
LOG.debug("Received heartbeat response from AM, response=" + response);
}
if (response.shouldDie()) {
LOG.info("Received should die response from AM");
return new ResponseWrapper(true, 1);
}
// The AM is expected to echo back the id of the request it is responding to.
if (response.getLastRequestId() != requestId) {
throw new TezException("AM and Task out of sync" + ", responseReqId="
+ response.getLastRequestId() + ", expectedReqId=" + requestId);
}
// The same umbilical is used by multiple tasks. Problematic in the case where multiple tasks
// are running using the same umbilical.
int numEventsReceived = 0;
// The task state is re-checked here because it may have changed while the RPC was in flight.
if (task.isTaskDone() || task.hadFatalError()) {
if (response.getEvents() != null && !response.getEvents().isEmpty()) {
LOG.warn("Current task already complete, Ignoring all event in"
+ " heartbeat response, eventCount=" + response.getEvents().size());
}
} else {
if (response.getEvents() != null && !response.getEvents().isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Routing events from heartbeat response to task" + ", currentTaskAttemptId="
+ task.getTaskAttemptID() + ", eventCount=" + response.getEvents().size());
}
// This should ideally happen in a separate thread
numEventsReceived = response.getEvents().size();
task.handleEvents(response.getEvents());
}
}
return new ResponseWrapper(false, numEventsReceived);
}
/**
 * Signals the condition that call() waits on between heartbeats, so that a waiting heartbeat
 * loop wakes up immediately instead of sleeping for the rest of the poll interval.
 */
public void markComplete() {
// Notify to clear pending events, if any.
lock.lock();
try {
condition.signal();
} finally {
lock.unlock();
}
}
/**
 * Writes the task's counters to the debug log when the regular heartbeat count reaches the next
 * scheduled logging point, then pushes that point further out by LOG_COUNTER_BACKOFF. Does
 * nothing unless debug logging is enabled.
 */
private void maybeLogCounters() {
  if (!LOG.isDebugEnabled()) {
    return;
  }
  if (nonOobHeartbeatCounter == nextHeartbeatNumToLog) {
    LOG.debug("Counters: " + task.getCounters().toShortString());
    int backedOff = (int) (nextHeartbeatNumToLog * LOG_COUNTER_BACKOFF);
    // The int cast truncates, so for 1, 2 and 3 the scaled value equals the current one
    // (e.g. (int) (2 * 1.3f) == 2). Without forcing progress the logging point would never
    // move and counters would stop being logged once the heartbeat count passed it.
    nextHeartbeatNumToLog = Math.max(backedOff, nextHeartbeatNumToLog + 1);
  }
}
/**
 * Sends out final events for task success: a status update carrying the task's counters and
 * progress, followed by a task-attempt-completed event.
 *
 * @param taskAttemptID the attempt that succeeded; not used by this method
 * @return false if the AM responded that the task should die, true otherwise
 * @throws IOException
 *           indicates an RPC communication failure.
 * @throws TezException
 *           indicates an exception somewhere in the AM.
 */
private boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException {
  List<TezEvent> finalEvents = new ArrayList<TezEvent>();
  finalEvents.add(new TezEvent(
      new TaskStatusUpdateEvent(task.getCounters(), task.getProgress()), updateEventMetadata));
  finalEvents.add(new TezEvent(new TaskAttemptCompletedEvent(), updateEventMetadata));
  ResponseWrapper response = heartbeat(finalEvents);
  return !response.shouldDie;
}
/**
 * Sends out final events for task failure: a best-effort status update carrying the task's
 * counters and progress, followed by a task-attempt-failed event with the diagnostics.
 *
 * @param taskAttemptID the attempt that failed; not used by this method
 * @param t the cause of the failure; may be null, in which case no stack trace is appended
 * @param diagnostics a description of the failure; may be null. The stack trace of t, when
 *          present, is appended after a ':' separator
 * @param srcMeta metadata identifying the source of the failure; when null, the metadata of
 *          this reporter is used
 * @return false if the AM responded that the task should die, true otherwise
 * @throws IOException
 *           indicates an RPC communication failure.
 * @throws TezException
 *           indicates an exception somewhere in the AM.
 */
private boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics,
    EventMetaData srcMeta) throws IOException, TezException {
  List<TezEvent> tezEvents = new ArrayList<TezEvent>();
  try {
    TezEvent statusUpdateEvent = new TezEvent(new TaskStatusUpdateEvent(task.getCounters(),
        task.getProgress()), updateEventMetadata);
    tezEvents.add(statusUpdateEvent);
  } catch (Exception e) {
    // Counter may exceed limitation. The failure event below must still go out, so only log;
    // include the exception so the reason is not lost.
    LOG.warn("Error when get constructing TaskStatusUpdateEvent", e);
  }
  if (t != null) {
    String stackTrace = ExceptionUtils.getStackTrace(t);
    diagnostics = (diagnostics == null) ? stackTrace : diagnostics + ":" + stackTrace;
  } else if (diagnostics == null) {
    // Neither a message nor a cause was supplied. Asking for the stack trace of a null
    // Throwable would throw a NullPointerException and the failure would never be reported.
    diagnostics = "";
  }
  TezEvent taskAttemptFailedEvent = new TezEvent(new TaskAttemptFailedEvent(diagnostics),
      srcMeta == null ? updateEventMetadata : srcMeta);
  tezEvents.add(taskAttemptFailedEvent);
  return !heartbeat(tezEvents).shouldDie;
}
/**
 * Queues events to be sent to the AM with the next heartbeat. A null or empty collection is
 * ignored.
 *
 * @param taskAttemptID the attempt the events belong to; not used by this method
 * @param events the events to queue; may be null
 */
private void addEvents(TezTaskAttemptID taskAttemptID, Collection<TezEvent> events) {
  if (events == null || events.isEmpty()) {
    return;
  }
  eventsToSend.addAll(events);
}
}
/**
 * Relays the outcome of a heartbeat loop to an {@link ErrorReporter}: a loop that returns false
 * becomes a shutdown request, and a loop that throws is reported as an error.
 */
private static class HeartbeatCallback implements FutureCallback<Boolean> {
  private final ErrorReporter errorReporter;

  HeartbeatCallback(ErrorReporter errorReporter) {
    this.errorReporter = errorReporter;
  }

  @Override
  public void onSuccess(Boolean result) {
    // The heartbeat loop returns false only when the AM asked the task to die.
    boolean endedNormally = result;
    if (endedNormally) {
      return;
    }
    errorReporter.shutdownRequested();
  }

  @Override
  public void onFailure(Throwable t) {
    errorReporter.reportError(t);
  }
}
/**
 * Reports successful completion of the currently registered task by delegating to its
 * heartbeat callable, which sends the final events to the AM.
 *
 * NOTE(review): currentCallable is dereferenced without a null check, so this appears to
 * require that a task is registered - confirm against callers.
 *
 * @param taskAttemptID the attempt that succeeded
 * @return false if the AM responded that the task should die, true otherwise
 * @throws IOException indicates an RPC communication failure.
 * @throws TezException indicates an exception somewhere in the AM.
 */
public synchronized boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException {
return currentCallable.taskSucceeded(taskAttemptID);
}
/**
 * Reports failure of the currently registered task by delegating to its heartbeat callable,
 * which sends the final events to the AM.
 *
 * NOTE(review): currentCallable is dereferenced without a null check, so this appears to
 * require that a task is registered - confirm against callers.
 *
 * @param taskAttemptID the attempt that failed
 * @param t the cause of the failure
 * @param diagnostics a description of the failure
 * @param srcMeta metadata identifying the source of the failure
 * @return false if the AM responded that the task should die, true otherwise
 * @throws IOException indicates an RPC communication failure.
 * @throws TezException indicates an exception somewhere in the AM.
 */
public synchronized boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics,
EventMetaData srcMeta) throws IOException, TezException {
return currentCallable.taskFailed(taskAttemptID, t, diagnostics, srcMeta);
}
/**
 * Queues events for the currently registered task by delegating to its heartbeat callable;
 * queued events are sent to the AM with a subsequent heartbeat.
 *
 * NOTE(review): currentCallable is dereferenced without a null check, so this appears to
 * require that a task is registered - confirm against callers.
 *
 * @param taskAttemptID the attempt the events belong to
 * @param events the events to queue
 */
public synchronized void addEvents(TezTaskAttemptID taskAttemptID, Collection<TezEvent> events) {
currentCallable.addEvents(taskAttemptID, events);
}
/**
 * Forwards a canCommit query for the given attempt to the AM over the umbilical. Unlike the
 * other reporting methods this one is not synchronized and does not go through the heartbeat
 * callable.
 *
 * @param taskAttemptID the attempt asking for permission to commit
 * @return the answer returned by the umbilical
 * @throws IOException if the umbilical call fails
 */
public boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException {
return umbilical.canCommit(taskAttemptID);
}
/**
 * Immutable summary of a single heartbeat exchange with the AM.
 */
private static final class ResponseWrapper {
  /** True when the AM asked the task to die. */
  final boolean shouldDie;
  /** Number of events reported for this heartbeat. */
  final int numEvents;

  private ResponseWrapper(boolean shouldDie, int numEvents) {
    this.shouldDie = shouldDie;
    this.numEvents = numEvents;
  }
}
}