| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tez.runtime.task; |
| |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.List; |
| import java.util.concurrent.Callable; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicLong; |
| import java.util.concurrent.locks.Condition; |
| import java.util.concurrent.locks.ReentrantLock; |
| |
| import org.apache.commons.lang.exception.ExceptionUtils; |
| import org.apache.log4j.Logger; |
| import org.apache.tez.common.TezTaskUmbilicalProtocol; |
| import org.apache.tez.common.counters.TezCounters; |
| import org.apache.tez.dag.api.TezException; |
| import org.apache.tez.dag.records.TezTaskAttemptID; |
| import org.apache.tez.runtime.LogicalIOProcessorRuntimeTask; |
| import org.apache.tez.runtime.api.events.TaskAttemptCompletedEvent; |
| import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent; |
| import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent; |
| import org.apache.tez.runtime.api.impl.EventMetaData; |
| import org.apache.tez.runtime.api.impl.TezEvent; |
| import org.apache.tez.runtime.api.impl.TezHeartbeatRequest; |
| import org.apache.tez.runtime.api.impl.TezHeartbeatResponse; |
| import org.apache.tez.runtime.api.impl.EventMetaData.EventProducerConsumerType; |
| |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.collect.Lists; |
| import com.google.common.util.concurrent.FutureCallback; |
| import com.google.common.util.concurrent.Futures; |
| import com.google.common.util.concurrent.ListenableFuture; |
| import com.google.common.util.concurrent.ListeningExecutorService; |
| import com.google.common.util.concurrent.MoreExecutors; |
| import com.google.common.util.concurrent.ThreadFactoryBuilder; |
| |
| /** |
| * Responsible for communication between tasks running in a Container and the ApplicationMaster. |
| * Takes care of sending heartbeats (regular and OOB) to the AM - to send generated events, and to |
| * retrieve events specific to this task. |
| * |
| */ |
| public class TaskReporter { |
| |
| private static final Logger LOG = Logger.getLogger(TaskReporter.class); |
| |
| private final TezTaskUmbilicalProtocol umbilical; |
| private final long pollInterval; |
| private final long sendCounterInterval; |
| private final int maxEventsToGet; |
| private final AtomicLong requestCounter; |
| private final String containerIdStr; |
| |
| private final ListeningExecutorService heartbeatExecutor; |
| |
| @VisibleForTesting |
| HeartbeatCallable currentCallable; |
| |
| public TaskReporter(TezTaskUmbilicalProtocol umbilical, long amPollInterval, |
| long sendCounterInterval, int maxEventsToGet, AtomicLong requestCounter, String containerIdStr) { |
| this.umbilical = umbilical; |
| this.pollInterval = amPollInterval; |
| this.sendCounterInterval = sendCounterInterval; |
| this.maxEventsToGet = maxEventsToGet; |
| this.requestCounter = requestCounter; |
| this.containerIdStr = containerIdStr; |
| ExecutorService executor = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() |
| .setDaemon(true).setNameFormat("TaskHeartbeatThread").build()); |
| heartbeatExecutor = MoreExecutors.listeningDecorator(executor); |
| } |
| |
| /** |
| * Register a task to be tracked. Heartbeats will be sent out for this task to fetch events, etc. |
| */ |
| public synchronized void registerTask(LogicalIOProcessorRuntimeTask task, |
| ErrorReporter errorReporter) { |
| currentCallable = new HeartbeatCallable(task, umbilical, pollInterval, sendCounterInterval, |
| maxEventsToGet, requestCounter, containerIdStr); |
| ListenableFuture<Boolean> future = heartbeatExecutor.submit(currentCallable); |
| Futures.addCallback(future, new HeartbeatCallback(errorReporter)); |
| } |
| |
| /** |
| * This method should always be invoked before setting up heartbeats for another task running in |
| * the same container. |
| */ |
| public synchronized void unregisterTask(TezTaskAttemptID taskAttemptID) { |
| currentCallable.markComplete(); |
| currentCallable = null; |
| } |
| |
| public void shutdown() { |
| heartbeatExecutor.shutdownNow(); |
| } |
| |
| @VisibleForTesting |
| static class HeartbeatCallable implements Callable<Boolean> { |
| |
| private static final int LOG_COUNTER_START_INTERVAL = 5000; // 5 seconds |
| private static final float LOG_COUNTER_BACKOFF = 1.3f; |
| |
| private final LogicalIOProcessorRuntimeTask task; |
| private EventMetaData updateEventMetadata; |
| |
| private final TezTaskUmbilicalProtocol umbilical; |
| |
| private final long pollInterval; |
| private final long sendCounterInterval; |
| private final int maxEventsToGet; |
| private final String containerIdStr; |
| |
| private final AtomicLong requestCounter; |
| |
| private LinkedBlockingQueue<TezEvent> eventsToSend = new LinkedBlockingQueue<TezEvent>(); |
| |
| private final ReentrantLock lock = new ReentrantLock(); |
| private final Condition condition = lock.newCondition(); |
| |
| /* |
| * Keeps track of regular timed heartbeats. Is primarily used as a timing mechanism to send / |
| * log counters. |
| */ |
| private int nonOobHeartbeatCounter = 0; |
| private int nextHeartbeatNumToLog = 0; |
| /* |
| * Tracks the last non-OOB heartbeat number at which counters were sent to the AM. |
| */ |
| private int prevCounterSendHeartbeatNum = 0; |
| |
| public HeartbeatCallable(LogicalIOProcessorRuntimeTask task, |
| TezTaskUmbilicalProtocol umbilical, long amPollInterval, long sendCounterInterval, |
| int maxEventsToGet, AtomicLong requestCounter, String containerIdStr) { |
| |
| this.pollInterval = amPollInterval; |
| this.sendCounterInterval = sendCounterInterval; |
| this.maxEventsToGet = maxEventsToGet; |
| this.requestCounter = requestCounter; |
| this.containerIdStr = containerIdStr; |
| |
| this.task = task; |
| this.umbilical = umbilical; |
| this.updateEventMetadata = new EventMetaData(EventProducerConsumerType.SYSTEM, |
| task.getVertexName(), "", task.getTaskAttemptID()); |
| |
| nextHeartbeatNumToLog = (Math.max(1, |
| (int) (LOG_COUNTER_START_INTERVAL / (amPollInterval == 0 ? 0.000001f |
| : (float) amPollInterval)))); |
| } |
| |
| @Override |
| public Boolean call() throws Exception { |
| // Heartbeat only for active tasks. Errors, etc will be reported directly. |
| while (!task.isTaskDone() && !task.hadFatalError()) { |
| ResponseWrapper response = heartbeat(null); |
| |
| if (response.shouldDie) { |
| // AM sent a shouldDie=true |
| LOG.info("Asked to die via task heartbeat"); |
| return false; |
| } else { |
| if (response.numEvents < maxEventsToGet) { |
| // Wait before sending another heartbeat. Otherwise consider as an OOB heartbeat |
| lock.lock(); |
| try { |
| boolean interrupted = condition.await(pollInterval, TimeUnit.MILLISECONDS); |
| if (!interrupted) { |
| nonOobHeartbeatCounter++; |
| } |
| } finally { |
| lock.unlock(); |
| } |
| } |
| } |
| } |
| int pendingEventCount = eventsToSend.size(); |
| if (pendingEventCount > 0) { |
| LOG.warn("Exiting TaskReporter thread with pending queue size=" + pendingEventCount); |
| } |
| return true; |
| } |
| |
| /** |
| * @param eventsArg |
| * @return |
| * @throws IOException |
| * indicates an RPC communication failure. |
| * @throws TezException |
| * indicates an exception somewhere in the AM. |
| */ |
| private synchronized ResponseWrapper heartbeat(Collection<TezEvent> eventsArg) throws IOException, |
| TezException { |
| |
| if (eventsArg != null) { |
| eventsToSend.addAll(eventsArg); |
| } |
| |
| TezEvent updateEvent = null; |
| List<TezEvent> events = new ArrayList<TezEvent>(); |
| eventsToSend.drainTo(events); |
| |
| if (!task.isTaskDone() && !task.hadFatalError()) { |
| TezCounters counters = null; |
| /** |
| * Increasing the heartbeat interval can delay the delivery of events. Sending just updated |
| * records would save CPU in DAG AM, but certain counters are updated very frequently. Until |
| * real time decisions are made based on these counters, it can be sent once per second. |
| */ |
| // Not completely accurate, since OOB heartbeats could go out. |
| if ((nonOobHeartbeatCounter - prevCounterSendHeartbeatNum) * pollInterval >= sendCounterInterval) { |
| counters = task.getCounters(); |
| prevCounterSendHeartbeatNum = nonOobHeartbeatCounter; |
| } |
| updateEvent = new TezEvent(new TaskStatusUpdateEvent(counters, task.getProgress()), |
| updateEventMetadata); |
| events.add(updateEvent); |
| } |
| |
| long requestId = requestCounter.incrementAndGet(); |
| TezHeartbeatRequest request = new TezHeartbeatRequest(requestId, events, containerIdStr, |
| task.getTaskAttemptID(), task.getEventCounter(), maxEventsToGet); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Sending heartbeat to AM, request=" + request); |
| } |
| |
| maybeLogCounters(); |
| |
| TezHeartbeatResponse response = umbilical.heartbeat(request); |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Received heartbeat response from AM, response=" + response); |
| } |
| |
| if (response.shouldDie()) { |
| LOG.info("Received should die response from AM"); |
| return new ResponseWrapper(true, 1); |
| } |
| if (response.getLastRequestId() != requestId) { |
| throw new TezException("AM and Task out of sync" + ", responseReqId=" |
| + response.getLastRequestId() + ", expectedReqId=" + requestId); |
| } |
| |
| // The same umbilical is used by multiple tasks. Problematic in the case where multiple tasks |
| // are running using the same umbilical. |
| int numEventsReceived = 0; |
| if (task.isTaskDone() || task.hadFatalError()) { |
| if (response.getEvents() != null && !response.getEvents().isEmpty()) { |
| LOG.warn("Current task already complete, Ignoring all event in" |
| + " heartbeat response, eventCount=" + response.getEvents().size()); |
| } |
| } else { |
| if (response.getEvents() != null && !response.getEvents().isEmpty()) { |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Routing events from heartbeat response to task" + ", currentTaskAttemptId=" |
| + task.getTaskAttemptID() + ", eventCount=" + response.getEvents().size()); |
| } |
| // This should ideally happen in a separate thread |
| numEventsReceived = response.getEvents().size(); |
| task.handleEvents(response.getEvents()); |
| } |
| } |
| return new ResponseWrapper(false, numEventsReceived); |
| } |
| |
| public void markComplete() { |
| // Notify to clear pending events, if any. |
| lock.lock(); |
| try { |
| condition.signal(); |
| } finally { |
| lock.unlock(); |
| } |
| } |
| |
| private void maybeLogCounters() { |
| if (LOG.isDebugEnabled()) { |
| if (nonOobHeartbeatCounter == nextHeartbeatNumToLog) { |
| LOG.debug("Counters: " + task.getCounters().toShortString()); |
| nextHeartbeatNumToLog = (int) (nextHeartbeatNumToLog * (LOG_COUNTER_BACKOFF)); |
| } |
| } |
| } |
| |
| /** |
| * Sends out final events for task success. |
| * @param taskAttemptID |
| * @return |
| * @throws IOException |
| * indicates an RPC communication failure. |
| * @throws TezException |
| * indicates an exception somewhere in the AM. |
| */ |
| private boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException { |
| TezEvent statusUpdateEvent = new TezEvent(new TaskStatusUpdateEvent(task.getCounters(), |
| task.getProgress()), updateEventMetadata); |
| TezEvent taskCompletedEvent = new TezEvent(new TaskAttemptCompletedEvent(), |
| updateEventMetadata); |
| return !heartbeat(Lists.newArrayList(statusUpdateEvent, taskCompletedEvent)).shouldDie; |
| } |
| |
| /** |
| * Sends out final events for task failure. |
| * @param taskAttemptID |
| * @param t |
| * @param diagnostics |
| * @param srcMeta |
| * @return |
| * @throws IOException |
| * indicates an RPC communication failure. |
| * @throws TezException |
| * indicates an exception somewhere in the AM. |
| */ |
| private boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics, |
| EventMetaData srcMeta) throws IOException, TezException { |
| List<TezEvent> tezEvents = new ArrayList<TezEvent>(); |
| try { |
| TezEvent statusUpdateEvent = new TezEvent(new TaskStatusUpdateEvent(task.getCounters(), |
| task.getProgress()), updateEventMetadata); |
| tezEvents.add(statusUpdateEvent); |
| } catch (Exception e) { |
| // Counter may exceed limitation |
| LOG.warn("Error when get constructing TaskStatusUpdateEvent"); |
| } |
| if (diagnostics == null) { |
| diagnostics = ExceptionUtils.getStackTrace(t); |
| } else { |
| diagnostics = diagnostics + ":" + ExceptionUtils.getStackTrace(t); |
| } |
| TezEvent taskAttemptFailedEvent = new TezEvent(new TaskAttemptFailedEvent(diagnostics), |
| srcMeta == null ? updateEventMetadata : srcMeta); |
| tezEvents.add(taskAttemptFailedEvent); |
| return !heartbeat(tezEvents).shouldDie; |
| } |
| |
| private void addEvents(TezTaskAttemptID taskAttemptID, Collection<TezEvent> events) { |
| if (events != null && !events.isEmpty()) { |
| eventsToSend.addAll(events); |
| } |
| } |
| } |
| |
| private static class HeartbeatCallback implements FutureCallback<Boolean> { |
| |
| private final ErrorReporter errorReporter; |
| |
| HeartbeatCallback(ErrorReporter errorReporter) { |
| this.errorReporter = errorReporter; |
| } |
| |
| @Override |
| public void onSuccess(Boolean result) { |
| if (result == false) { |
| errorReporter.shutdownRequested(); |
| } |
| } |
| |
| @Override |
| public void onFailure(Throwable t) { |
| errorReporter.reportError(t); |
| } |
| } |
| |
| public synchronized boolean taskSucceeded(TezTaskAttemptID taskAttemptID) throws IOException, TezException { |
| return currentCallable.taskSucceeded(taskAttemptID); |
| } |
| |
| public synchronized boolean taskFailed(TezTaskAttemptID taskAttemptID, Throwable t, String diagnostics, |
| EventMetaData srcMeta) throws IOException, TezException { |
| return currentCallable.taskFailed(taskAttemptID, t, diagnostics, srcMeta); |
| } |
| |
| public synchronized void addEvents(TezTaskAttemptID taskAttemptID, Collection<TezEvent> events) { |
| currentCallable.addEvents(taskAttemptID, events); |
| } |
| |
| public boolean canCommit(TezTaskAttemptID taskAttemptID) throws IOException { |
| return umbilical.canCommit(taskAttemptID); |
| } |
| |
| private static final class ResponseWrapper { |
| boolean shouldDie; |
| int numEvents; |
| |
| private ResponseWrapper(boolean shouldDie, int numEvents) { |
| this.shouldDie = shouldDie; |
| this.numEvents = numEvents; |
| } |
| } |
| } |