blob: 51895f4afd17a9b072c7c10152b88de9d0e71e6a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.tez.dag.app;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import com.google.common.annotations.VisibleForTesting;
import org.apache.tez.common.Preconditions;
import org.apache.commons.collections4.ListUtils;
import org.apache.hadoop.yarn.event.Event;
import org.apache.tez.Utils;
import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventSubmitted;
import org.apache.tez.runtime.api.TaskFailureType;
import org.apache.tez.runtime.api.events.TaskAttemptKilledEvent;
import org.apache.tez.serviceplugins.api.DagInfo;
import org.apache.tez.serviceplugins.api.ServicePluginError;
import org.apache.tez.serviceplugins.api.TaskCommunicator;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.api.UserPayload;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventType;
import org.apache.tez.serviceplugins.api.ContainerEndReason;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
import org.apache.tez.runtime.api.events.TaskAttemptFailedEvent;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
import org.apache.tez.runtime.api.impl.EventMetaData;
import org.apache.tez.runtime.api.impl.EventType;
import org.apache.tez.dag.api.event.VertexStateUpdate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.common.TezUtilsInternal;
import org.apache.tez.serviceplugins.api.TaskCommunicatorContext;
import org.apache.tez.serviceplugins.api.TaskAttemptEndReason;
import org.apache.tez.serviceplugins.api.TaskHeartbeatResponse;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.serviceplugins.api.TaskHeartbeatRequest;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptFailed;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventAttemptKilled;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventTezEventUpdate;
import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.app.rm.container.AMContainerTask;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.impl.TezEvent;
@SuppressWarnings("unchecked")
@InterfaceAudience.Private
public class TaskCommunicatorManager extends AbstractService implements
TaskCommunicatorManagerInterface {
private static final Logger LOG = LoggerFactory
.getLogger(TaskCommunicatorManager.class);
private final AppContext context;
private final TaskCommunicatorWrapper[] taskCommunicators;
private final TaskCommunicatorContext[] taskCommunicatorContexts;
protected final ServicePluginLifecycleAbstractService []taskCommunicatorServiceWrappers;
protected final TaskHeartbeatHandler taskHeartbeatHandler;
protected final ContainerHeartbeatHandler containerHeartbeatHandler;
private final TaskHeartbeatResponse RESPONSE_SHOULD_DIE = new TaskHeartbeatResponse(true, null, 0, 0);
private final ConcurrentMap<TezTaskAttemptID, ContainerId> registeredAttempts =
new ConcurrentHashMap<TezTaskAttemptID, ContainerId>();
private final ConcurrentMap<ContainerId, ContainerInfo> registeredContainers =
new ConcurrentHashMap<ContainerId, ContainerInfo>();
// Defined primarily to work around ConcurrentMaps not accepting null values
private static final class ContainerInfo {
TezTaskAttemptID taskAttemptId;
ContainerInfo(TezTaskAttemptID taskAttemptId) {
this.taskAttemptId = taskAttemptId;
}
}
private static final ContainerInfo NULL_CONTAINER_INFO = new ContainerInfo(null);
@VisibleForTesting
@InterfaceAudience.Private
/**
* Only for testing.
*/
public TaskCommunicatorManager(TaskCommunicator taskCommunicator, AppContext appContext,
TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh) {
super(TaskCommunicatorManager.class.getName());
this.context = appContext;
this.taskHeartbeatHandler = thh;
this.containerHeartbeatHandler = chh;
taskCommunicators =
new TaskCommunicatorWrapper[]{new TaskCommunicatorWrapper(taskCommunicator)};
taskCommunicatorContexts = new TaskCommunicatorContext[]{taskCommunicator.getContext()};
taskCommunicatorServiceWrappers = new ServicePluginLifecycleAbstractService[]{
new ServicePluginLifecycleAbstractService(taskCommunicator)};
}
public TaskCommunicatorManager(AppContext context,
TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh,
List<NamedEntityDescriptor> taskCommunicatorDescriptors) throws TezException {
super(TaskCommunicatorManager.class.getName());
this.context = context;
this.taskHeartbeatHandler = thh;
this.containerHeartbeatHandler = chh;
if (taskCommunicatorDescriptors == null || taskCommunicatorDescriptors.isEmpty()) {
throw new IllegalArgumentException("TaskCommunicators must be specified");
}
this.taskCommunicators = new TaskCommunicatorWrapper[taskCommunicatorDescriptors.size()];
this.taskCommunicatorContexts = new TaskCommunicatorContext[taskCommunicatorDescriptors.size()];
this.taskCommunicatorServiceWrappers = new ServicePluginLifecycleAbstractService[taskCommunicatorDescriptors.size()];
for (int i = 0 ; i < taskCommunicatorDescriptors.size() ; i++) {
UserPayload userPayload = taskCommunicatorDescriptors.get(i).getUserPayload();
taskCommunicatorContexts[i] = new TaskCommunicatorContextImpl(context, this, userPayload, i);
taskCommunicators[i] = new TaskCommunicatorWrapper(createTaskCommunicator(taskCommunicatorDescriptors.get(i), i));
taskCommunicatorServiceWrappers[i] =
new ServicePluginLifecycleAbstractService(taskCommunicators[i].getTaskCommunicator());
}
// TODO TEZ-2118 Start using taskCommunicator indices properly
}
@Override
public void serviceStart() {
// TODO Why is init tied to serviceStart
for (int i = 0 ; i < taskCommunicators.length ; i++) {
taskCommunicatorServiceWrappers[i].init(getConfig());
taskCommunicatorServiceWrappers[i].start();
}
}
@Override
public void serviceStop() {
for (int i = 0 ; i < taskCommunicators.length ; i++) {
taskCommunicatorServiceWrappers[i].stop();
}
}
@VisibleForTesting
TaskCommunicator createTaskCommunicator(NamedEntityDescriptor taskCommDescriptor,
int taskCommIndex) throws TezException {
if (taskCommDescriptor.getEntityName().equals(TezConstants.getTezYarnServicePluginName())) {
return createDefaultTaskCommunicator(taskCommunicatorContexts[taskCommIndex]);
} else if (taskCommDescriptor.getEntityName()
.equals(TezConstants.getTezUberServicePluginName())) {
return createUberTaskCommunicator(taskCommunicatorContexts[taskCommIndex]);
} else {
return createCustomTaskCommunicator(taskCommunicatorContexts[taskCommIndex],
taskCommDescriptor);
}
}
@VisibleForTesting
TaskCommunicator createDefaultTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) {
LOG.info("Creating Default Task Communicator");
return new TezTaskCommunicatorImpl(taskCommunicatorContext);
}
@VisibleForTesting
TaskCommunicator createUberTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext) {
LOG.info("Creating Default Local Task Communicator");
return new TezLocalTaskCommunicatorImpl(taskCommunicatorContext);
}
@VisibleForTesting
TaskCommunicator createCustomTaskCommunicator(TaskCommunicatorContext taskCommunicatorContext,
NamedEntityDescriptor taskCommDescriptor)
throws TezException {
LOG.info("Creating TaskCommunicator {}:{} " + taskCommDescriptor.getEntityName(),
taskCommDescriptor.getClassName());
Class<? extends TaskCommunicator> taskCommClazz =
(Class<? extends TaskCommunicator>) ReflectionUtils
.getClazz(taskCommDescriptor.getClassName());
try {
Constructor<? extends TaskCommunicator> ctor =
taskCommClazz.getConstructor(TaskCommunicatorContext.class);
return ctor.newInstance(taskCommunicatorContext);
} catch (NoSuchMethodException | InvocationTargetException | InstantiationException | IllegalAccessException e) {
throw new TezUncheckedException(e);
}
}
public TaskHeartbeatResponse heartbeat(TaskHeartbeatRequest request)
throws IOException, TezException {
ContainerId containerId = ConverterUtils.toContainerId(request
.getContainerIdentifier());
LOG.debug("Received heartbeat from container, request={}", request);
if (!registeredContainers.containsKey(containerId)) {
LOG.warn("Received task heartbeat from unknown container with id: " + containerId +
", asking it to die");
return RESPONSE_SHOULD_DIE;
}
// A heartbeat can come in anytime. The AM may have made a decision to kill a running task/container
// meanwhile. If the decision is processed through the pipeline before the heartbeat is processed,
// the heartbeat will be dropped. Otherwise the heartbeat will be processed - and the system
// know how to handle this - via FailedInputEvents for example (relevant only if the heartbeat has events).
// So - avoiding synchronization.
pingContainerHeartbeatHandler(containerId);
TaskAttemptEventInfo eventInfo = new TaskAttemptEventInfo(0, null, 0);
TezTaskAttemptID taskAttemptID = request.getTaskAttemptId();
if (taskAttemptID != null) {
ContainerId containerIdFromMap = registeredAttempts.get(taskAttemptID);
if (containerIdFromMap == null || !containerIdFromMap.equals(containerId)) {
// This can happen when a task heartbeats. Meanwhile the container is unregistered.
// The information will eventually make it through to the plugin via a corresponding unregister.
// There's a race in that case between the unregister making it through, and this method returning.
// TODO TEZ-2003 (post) TEZ-2666. An exception back is likely a better approach than sending a shouldDie = true,
// so that the plugin can handle the scenario. Alternately augment the response with error codes.
// Error codes would be better than exceptions.
LOG.info("Attempt: " + taskAttemptID + " is not recognized for heartbeats");
return RESPONSE_SHOULD_DIE;
}
List<TezEvent> inEvents = request.getEvents();
if (LOG.isDebugEnabled()) {
LOG.debug("Ping from " + taskAttemptID.toString() +
" events: " + (inEvents != null ? inEvents.size() : -1));
}
long currTime = context.getClock().getTime();
// taFinishedEvents - means the TaskAttemptFinishedEvent
// taGeneratedEvents - for recovery, means the events generated by this task attempt and is needed by its downstream vertices
// eventsForVertex - including all the taGeneratedEvents and other events such as INPUT_READ_ERROR_EVENT/INPUT_FAILED_EVENT
// taGeneratedEvents is routed both to TaskAttempt & Vertex. Route to Vertex is for performance consideration
// taFinishedEvents must be routed before taGeneratedEvents
List<TezEvent> taFinishedEvents = new ArrayList<TezEvent>();
List<TezEvent> taGeneratedEvents = new ArrayList<TezEvent>();
List<TezEvent> eventsForVertex = new ArrayList<TezEvent>();
TaskAttemptEventStatusUpdate taskAttemptEvent = null;
boolean readErrorReported = false;
for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
// for now, set the event time on the AM when it is received.
// this avoids any time disparity between machines.
tezEvent.setEventReceivedTime(currTime);
final EventType eventType = tezEvent.getEventType();
if (eventType == EventType.TASK_STATUS_UPDATE_EVENT) {
// send TA_STATUS_UPDATE before TA_DONE/TA_FAILED/TA_KILLED otherwise Status may be missed
taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID,
(TaskStatusUpdateEvent) tezEvent.getEvent());
} else if (eventType == EventType.TASK_ATTEMPT_COMPLETED_EVENT
|| eventType == EventType.TASK_ATTEMPT_FAILED_EVENT
|| eventType == EventType.TASK_ATTEMPT_KILLED_EVENT) {
taFinishedEvents.add(tezEvent);
} else {
if (eventType == EventType.INPUT_READ_ERROR_EVENT) {
readErrorReported = true;
}
if (eventType == EventType.DATA_MOVEMENT_EVENT
|| eventType == EventType.COMPOSITE_DATA_MOVEMENT_EVENT
|| eventType == EventType.ROOT_INPUT_INITIALIZER_EVENT
|| eventType == EventType.VERTEX_MANAGER_EVENT) {
taGeneratedEvents.add(tezEvent);
}
eventsForVertex.add(tezEvent);
}
}
if (taskAttemptEvent != null) {
taskAttemptEvent.setReadErrorReported(readErrorReported);
sendEvent(taskAttemptEvent);
}
// route taGeneratedEvents to TaskAttempt
if (!taGeneratedEvents.isEmpty()) {
sendEvent(new TaskAttemptEventTezEventUpdate(taskAttemptID, taGeneratedEvents));
}
// route events to TaskAttempt
Preconditions.checkArgument(taFinishedEvents.size() <= 1, "Multiple TaskAttemptFinishedEvent");
for (TezEvent e : taFinishedEvents) {
EventMetaData sourceMeta = e.getSourceInfo();
switch (e.getEventType()) {
case TASK_ATTEMPT_FAILED_EVENT:
case TASK_ATTEMPT_KILLED_EVENT:
TaskAttemptTerminationCause errCause = null;
switch (sourceMeta.getEventGenerator()) {
case INPUT:
errCause = TaskAttemptTerminationCause.INPUT_READ_ERROR;
break;
case PROCESSOR:
errCause = TaskAttemptTerminationCause.APPLICATION_ERROR;
break;
case OUTPUT:
errCause = TaskAttemptTerminationCause.OUTPUT_WRITE_ERROR;
break;
case SYSTEM:
errCause = TaskAttemptTerminationCause.FRAMEWORK_ERROR;
break;
default:
throw new TezUncheckedException("Unknown EventProducerConsumerType: " +
sourceMeta.getEventGenerator());
}
if (e.getEventType() == EventType.TASK_ATTEMPT_FAILED_EVENT) {
TaskAttemptFailedEvent taskFailedEvent = (TaskAttemptFailedEvent) e.getEvent();
sendEvent(
new TaskAttemptEventAttemptFailed(sourceMeta.getTaskAttemptID(),
TaskAttemptEventType.TA_FAILED, taskFailedEvent.getTaskFailureType(),
"Error: " + taskFailedEvent.getDiagnostics(),
errCause));
} else { // Killed
TaskAttemptKilledEvent taskKilledEvent = (TaskAttemptKilledEvent) e.getEvent();
sendEvent(
new TaskAttemptEventAttemptKilled(sourceMeta.getTaskAttemptID(),
"Error: " + taskKilledEvent.getDiagnostics(), errCause));
}
break;
case TASK_ATTEMPT_COMPLETED_EVENT:
sendEvent(
new TaskAttemptEvent(sourceMeta.getTaskAttemptID(), TaskAttemptEventType.TA_DONE));
break;
default:
throw new TezUncheckedException("Unhandled tez event type: "
+ e.getEventType());
}
}
if (!eventsForVertex.isEmpty()) {
TezVertexID vertexId = taskAttemptID.getVertexID();
sendEvent(
new VertexEventRouteEvent(vertexId, Collections.unmodifiableList(eventsForVertex)));
}
taskHeartbeatHandler.pinged(taskAttemptID);
eventInfo = context
.getCurrentDAG()
.getVertex(taskAttemptID.getVertexID())
.getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(), request.getPreRoutedStartIndex(),
request.getMaxEvents());
}
return new TaskHeartbeatResponse(false, eventInfo.getEvents(), eventInfo.getNextFromEventId(), eventInfo.getNextPreRoutedFromEventId());
}
public void taskAlive(TezTaskAttemptID taskAttemptId) {
taskHeartbeatHandler.pinged(taskAttemptId);
}
public void containerAlive(ContainerId containerId) {
pingContainerHeartbeatHandler(containerId);
}
public void taskSubmitted(TezTaskAttemptID taskAttemptId, ContainerId containerId) {
sendEvent(new TaskAttemptEventSubmitted(taskAttemptId, containerId));
pingContainerHeartbeatHandler(containerId);
}
public void taskStartedRemotely(TezTaskAttemptID taskAttemptID) {
sendEvent(new TaskAttemptEventStartedRemotely(taskAttemptID));
}
public void taskKilled(TezTaskAttemptID taskAttemptId, TaskAttemptEndReason taskAttemptEndReason,
String diagnostics) {
// Regular flow via TaskAttempt will take care of un-registering from the heartbeat handler,
// and messages from the scheduler will release the container.
// TODO TEZ-2003 (post) TEZ-2671 Maybe consider un-registering here itself, since the task is not active anymore,
// instead of waiting for the unregister to flow through the Container.
// Fix along the same lines as TEZ-2124 by introducing an explict context.
sendEvent(new TaskAttemptEventAttemptKilled(taskAttemptId,
diagnostics, TezUtilsInternal.fromTaskAttemptEndReason(
taskAttemptEndReason)));
}
public void taskFailed(TezTaskAttemptID taskAttemptId, TaskFailureType taskFailureType,
TaskAttemptEndReason taskAttemptEndReason,
String diagnostics) {
// Regular flow via TaskAttempt will take care of un-registering from the heartbeat handler,
// and messages from the scheduler will release the container.
// TODO TEZ-2003 (post) TEZ-2671 Maybe consider un-registering here itself, since the task is not active anymore,
// instead of waiting for the unregister to flow through the Container.
// Fix along the same lines as TEZ-2124 by introducing an explict context.
//TODO-3183. Allow the FailureType to be specified
sendEvent(new TaskAttemptEventAttemptFailed(taskAttemptId,
TaskAttemptEventType.TA_FAILED, taskFailureType, diagnostics, TezUtilsInternal.fromTaskAttemptEndReason(
taskAttemptEndReason)));
}
public void vertexStateUpdateNotificationReceived(VertexStateUpdate event, int taskCommIndex) {
try {
taskCommunicators[taskCommIndex].onVertexStateUpdated(event);
} catch (Exception e) {
String msg = "Error in TaskCommunicator when handling vertex state update notification"
+ ", communicator=" + Utils.getTaskCommIdentifierString(taskCommIndex, context)
+ ", vertexName=" + event.getVertexName()
+ ", vertexState=" + event.getVertexState();
LOG.error(msg, e);
sendEvent(
new DAGAppMasterEventUserServiceFatalError(
DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR,
msg, e));
}
}
/**
* Child checking whether it can commit.
* <p/>
* <br/>
* Repeatedly polls the ApplicationMaster whether it
* {@link Task#canCommit(TezTaskAttemptID)} This is * a legacy from the
* centralized commit protocol handling by the JobTracker.
*/
// @Override
public boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException {
// An attempt is asking if it can commit its output. This can be decided
// only by the task which is managing the multiple attempts. So redirect the
// request there.
taskHeartbeatHandler.progressing(taskAttemptId);
pingContainerHeartbeatHandler(taskAttemptId);
DAG job = context.getCurrentDAG();
Task task =
job.getVertex(taskAttemptId.getVertexID()).
getTask(taskAttemptId.getTaskID());
return task.canCommit(taskAttemptId);
}
// The TaskAttemptListener register / unregister methods in this class are not thread safe.
// The Tez framework should not invoke these methods from multiple threads.
@Override
public void dagComplete(DAG dag) {
// TODO TEZ-2335. Cleanup TaskHeartbeat handler structures.
// TODO TEZ-2345. Also cleanup attemptInfo map, so that any tasks which heartbeat are told to die.
// Container structures remain unchanged - since they could be re-used across restarts.
// This becomes more relevant when task kills without container kills are allowed.
// TODO TEZ-2336. Send a signal to containers indicating DAG completion.
// Inform all communicators of the dagCompletion.
for (int i = 0 ; i < taskCommunicators.length ; i++) {
try {
((TaskCommunicatorContextImpl) taskCommunicatorContexts[i]).dagCompleteStart(dag);
taskCommunicators[i].dagComplete(dag.getID().getId());
((TaskCommunicatorContextImpl) taskCommunicatorContexts[i]).dagCompleteEnd();
} catch (Exception e) {
String msg = "Error in TaskCommunicator when notifying for DAG completion"
+ ", communicator=" + Utils.getTaskCommIdentifierString(i, context);
LOG.error(msg, e);
sendEvent(
new DAGAppMasterEventUserServiceFatalError(
DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR,
msg, e));
}
}
}
@Override
public void dagSubmitted() {
// Nothing to do right now. Indicates that a new DAG has been submitted and
// the context has updated information.
}
@Override
public void registerRunningContainer(ContainerId containerId, int taskCommId) {
LOG.debug("ContainerId: {} registered with TaskAttemptListener", containerId);
ContainerInfo oldInfo = registeredContainers.put(containerId, NULL_CONTAINER_INFO);
if (oldInfo != null) {
throw new TezUncheckedException(
"Multiple registrations for containerId: " + containerId);
}
NodeId nodeId = context.getAllContainers().get(containerId).getContainer().getNodeId();
try {
taskCommunicators[taskCommId].registerRunningContainer(containerId, nodeId.getHost(),
nodeId.getPort());
} catch (Exception e) {
String msg = "Error in TaskCommunicator when registering running Container"
+ ", communicator=" + Utils.getTaskCommIdentifierString(taskCommId, context)
+ ", containerId=" + containerId
+ ", nodeId=" + nodeId;
LOG.error(msg, e);
sendEvent(
new DAGAppMasterEventUserServiceFatalError(
DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR,
msg, e));
}
}
@Override
public void unregisterRunningContainer(ContainerId containerId, int taskCommId, ContainerEndReason endReason, String diagnostics) {
LOG.debug("Unregistering Container from TaskAttemptListener: {}", containerId);
ContainerInfo containerInfo = registeredContainers.remove(containerId);
if (containerInfo.taskAttemptId != null) {
registeredAttempts.remove(containerInfo.taskAttemptId);
}
try {
taskCommunicators[taskCommId].registerContainerEnd(containerId, endReason, diagnostics);
} catch (Exception e) {
String msg = "Error in TaskCommunicator when unregistering Container"
+ ", communicator=" + Utils.getTaskCommIdentifierString(taskCommId, context)
+ ", containerId=" + containerId;
LOG.error(msg, e);
sendEvent(
new DAGAppMasterEventUserServiceFatalError(
DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR,
msg, e));
}
}
@Override
public void registerTaskAttempt(AMContainerTask amContainerTask,
ContainerId containerId, int taskCommId) {
ContainerInfo containerInfo = registeredContainers.get(containerId);
if (containerInfo == null) {
throw new TezUncheckedException("Registering task attempt: "
+ amContainerTask.getTask().getTaskAttemptID() + " to unknown container: " + containerId);
}
if (containerInfo.taskAttemptId != null) {
throw new TezUncheckedException("Registering task attempt: "
+ amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId
+ " with existing assignment to: " +
containerInfo.taskAttemptId);
}
// Explicitly putting in a new entry so that synchronization is not required on the existing element in the map.
registeredContainers.put(containerId, new ContainerInfo(amContainerTask.getTask().getTaskAttemptID()));
ContainerId containerIdFromMap = registeredAttempts.put(
amContainerTask.getTask().getTaskAttemptID(), containerId);
if (containerIdFromMap != null) {
throw new TezUncheckedException("Registering task attempt: "
+ amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId
+ " when already assigned to: " + containerIdFromMap);
}
try {
taskCommunicators[taskCommId].registerRunningTaskAttempt(containerId, amContainerTask.getTask(),
amContainerTask.getAdditionalResources(), amContainerTask.getCredentials(),
amContainerTask.haveCredentialsChanged(), amContainerTask.getPriority());
} catch (Exception e) {
String msg = "Error in TaskCommunicator when registering Task Attempt"
+ ", communicator=" + Utils.getTaskCommIdentifierString(taskCommId, context)
+ ", containerId=" + containerId
+ ", taskId=" + amContainerTask.getTask().getTaskAttemptID();
LOG.error(msg, e);
sendEvent(
new DAGAppMasterEventUserServiceFatalError(
DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR,
msg, e));
}
}
@Override
public void unregisterTaskAttempt(TezTaskAttemptID attemptId, int taskCommId, TaskAttemptEndReason endReason, String diagnostics) {
ContainerId containerId = registeredAttempts.remove(attemptId);
if (containerId == null) {
LOG.warn("Unregister task attempt: " + attemptId + " from unknown container");
return;
}
ContainerInfo containerInfo = registeredContainers.get(containerId);
if (containerInfo == null) {
LOG.warn("Unregister task attempt: " + attemptId +
" from non-registered container: " + containerId);
return;
}
// Explicitly putting in a new entry so that synchronization is not required on the existing element in the map.
registeredContainers.put(containerId, NULL_CONTAINER_INFO);
try {
taskCommunicators[taskCommId].unregisterRunningTaskAttempt(attemptId, endReason, diagnostics);
} catch (Exception e) {
String msg = "Error in TaskCommunicator when unregistering Task Attempt"
+ ", communicator=" + Utils.getTaskCommIdentifierString(taskCommId, context)
+ ", containerId=" + containerId
+ ", taskId=" + attemptId;
LOG.error(msg, e);
sendEvent(
new DAGAppMasterEventUserServiceFatalError(
DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR,
msg, e));
}
}
@Override
public TaskCommunicatorWrapper getTaskCommunicator(int taskCommIndex) {
return taskCommunicators[taskCommIndex];
}
@Override
public void reportError(int taskCommIndex, ServicePluginError servicePluginError,
String diagnostics,
DagInfo dagInfo) {
if (servicePluginError.getErrorType() == ServicePluginError.ErrorType.PERMANENT) {
String msg = "Fatal Error reported by TaskCommunicator"
+ ", communicator=" + Utils.getTaskCommIdentifierString(taskCommIndex, context)
+ ", servicePluginError=" + servicePluginError
+ ", diagnostics= " + (diagnostics == null ? "" : diagnostics);
LOG.error(msg + ", Diagnostics=" + diagnostics);
sendEvent(
new DAGAppMasterEventUserServiceFatalError(
DAGAppMasterEventType.TASK_COMMUNICATOR_SERVICE_FATAL_ERROR,
msg, null));
} else {
Utils
.processNonFatalServiceErrorReport(
Utils.getTaskCommIdentifierString(taskCommIndex, context), servicePluginError,
diagnostics,
dagInfo, context,
"TaskCommunicator");
}
}
private void pingContainerHeartbeatHandler(ContainerId containerId) {
containerHeartbeatHandler.pinged(containerId);
}
private void pingContainerHeartbeatHandler(TezTaskAttemptID taskAttemptId) {
ContainerId containerId = registeredAttempts.get(taskAttemptId);
if (containerId != null) {
containerHeartbeatHandler.pinged(containerId);
} else {
LOG.warn("Handling communication from attempt: " + taskAttemptId
+ ", ContainerId not known for this attempt");
}
}
@SuppressWarnings("unchecked")
private void sendEvent(Event<?> event) {
context.getEventHandler().handle(event);
}
@Override
public String getTaskCommunicatorClassName(int taskCommId) {
return taskCommunicators[taskCommId].getTaskCommunicator().getClass().getName();
}
@Override
public String getInProgressLogsUrl(int taskCommId, TezTaskAttemptID attemptID, NodeId containerNodeId) {
try {
return taskCommunicators[taskCommId].getInProgressLogsUrl(attemptID, containerNodeId);
} catch (Exception e) {
LOG.warn("Failed to retrieve InProgressLogsUrl from TaskCommunicator,"
+ ", communicator=" + Utils.getTaskCommIdentifierString(taskCommId, context)
+ ", attemptId=" + attemptID, e);
}
return null;
}
@Override
public String getCompletedLogsUrl(int taskCommId, TezTaskAttemptID attemptID, NodeId containerNodeId) {
try {
return taskCommunicators[taskCommId].getCompletedLogsUrl(attemptID, containerNodeId);
} catch (Exception e) {
LOG.warn("Failed to retrieve CompletedLogsUrl from TaskCommunicator,"
+ ", communicator=" + Utils.getTaskCommIdentifierString(taskCommId, context)
+ ", attemptId=" + attemptID, e);
}
return null;
}
@Override
public long getTotalUsedMemory() {
long totalUsedMemory = 0;
for (int i = 0; i < taskCommunicators.length; i++) {
totalUsedMemory += taskCommunicators[i].getTaskCommunicator().getTotalUsedMemory();
}
return totalUsedMemory;
}
}