blob: 3ad16cdfeb1fbf561883ccd6ab5d2ab87eb58a9d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.tez.dag.app;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.commons.collections4.ListUtils;
import org.apache.tez.dag.app.dag.event.TaskAttemptEvent;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStatusUpdate;
import org.apache.tez.runtime.api.events.TaskStatusUpdateEvent;
import org.apache.tez.runtime.api.impl.EventType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.service.AbstractService;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.tez.common.ContainerContext;
import org.apache.tez.common.ContainerTask;
import org.apache.tez.common.TezConverterUtils;
import org.apache.tez.common.TezLocalResource;
import org.apache.tez.common.TezTaskUmbilicalProtocol;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.app.dag.DAG;
import org.apache.tez.dag.app.dag.Task;
import org.apache.tez.dag.app.dag.event.TaskAttemptEventStartedRemotely;
import org.apache.tez.dag.app.dag.event.VertexEventRouteEvent;
import org.apache.tez.dag.app.rm.container.AMContainerTask;
import org.apache.tez.dag.app.security.authorize.TezAMPolicyProvider;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.apache.tez.dag.records.TezVertexID;
import org.apache.tez.runtime.api.impl.TezEvent;
import org.apache.tez.runtime.api.impl.TezHeartbeatRequest;
import org.apache.tez.runtime.api.impl.TezHeartbeatResponse;
import org.apache.tez.common.security.JobTokenSecretManager;
import com.google.common.collect.Maps;
@SuppressWarnings("unchecked")
public class TaskAttemptListenerImpTezDag extends AbstractService implements
TezTaskUmbilicalProtocol, TaskAttemptListener {
/**
 * Task returned by {@code getTask} when the requesting container is unknown or no longer
 * registered; the accompanying log messages state that such a container will be killed.
 */
private static final ContainerTask TASK_FOR_INVALID_JVM = new ContainerTask(
null, true, null, null, false);
private static final Logger LOG = LoggerFactory
.getLogger(TaskAttemptListenerImpTezDag.class);
// Source of the event handler, current DAG, known containers, clock and application ACLs.
private final AppContext context;
// Notified via progressing()/pinged() whenever a task attempt communicates with the AM.
protected final TaskHeartbeatHandler taskHeartbeatHandler;
// Notified via pinged() whenever a container communicates with the AM.
protected final ContainerHeartbeatHandler containerHeartbeatHandler;
// Handed to the RPC server builder as its secret manager.
private final JobTokenSecretManager jobTokenSecretManager;
// Assigned in startRpcServer(); in local mode this is the local host with port 0.
private InetSocketAddress address;
// Only created when not running in local mode; remains null otherwise.
private Server server;
/**
 * Mutable bookkeeping kept for each registered container. The methods of the enclosing
 * class that read or write these fields do so while synchronized on the instance.
 */
static class ContainerInfo {
  /** Id of the last heartbeat request that was fully processed for this container. */
  long lastRequestId = 0;
  /** Response produced for {@link #lastRequestId}; re-sent when that request is repeated. */
  TezHeartbeatResponse lastReponse = null;
  /** Task currently assigned to the container, or null when none is assigned. */
  AMContainerTask amContainerTask = null;
  /** Whether the assigned task has already been handed out through getTask. */
  boolean taskPulled = false;

  ContainerInfo() {
  }
}
// Maps each registered task attempt to the id of the container it is assigned to
// (despite the name, the value is a ContainerId, not a ContainerInfo).
private ConcurrentMap<TezTaskAttemptID, ContainerId> attemptToInfoMap =
new ConcurrentHashMap<TezTaskAttemptID, ContainerId>();
// Per-container state for every container currently registered with this listener.
private ConcurrentHashMap<ContainerId, ContainerInfo> registeredContainers =
new ConcurrentHashMap<ContainerId, ContainerInfo>();
/**
 * Creates the listener service; the service name is this class's fully qualified name.
 *
 * @param context application context used for events, DAG lookup, containers and the clock
 * @param thh handler notified when a task attempt communicates with the AM
 * @param chh handler notified when a container communicates with the AM
 * @param jobTokenSecretManager secret manager installed on the RPC server
 */
public TaskAttemptListenerImpTezDag(AppContext context,
    TaskHeartbeatHandler thh, ContainerHeartbeatHandler chh,
    JobTokenSecretManager jobTokenSecretManager) {
  super(TaskAttemptListenerImpTezDag.class.getName());
  this.taskHeartbeatHandler = thh;
  this.containerHeartbeatHandler = chh;
  this.jobTokenSecretManager = jobTokenSecretManager;
  this.context = context;
}
/** Service start hook: delegates to {@link #startRpcServer()}. */
@Override
public void serviceStart() {
startRpcServer();
}
/**
 * Starts the umbilical RPC server unless the configuration enables Tez local mode.
 *
 * <p>In local mode no server is created and the advertised address is the local host with
 * port 0. Otherwise the server binds to 0.0.0.0 on a port chosen from the configured range,
 * optionally loads service ACLs, and the advertised address is derived from the server's
 * connect address.
 *
 * @throws TezUncheckedException if the server cannot be built or the local host cannot be
 *         resolved
 */
protected void startRpcServer() {
  final Configuration conf = getConfig();
  final boolean localMode = conf.getBoolean(TezConfiguration.TEZ_LOCAL_MODE,
      TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);

  if (localMode) {
    try {
      this.address = new InetSocketAddress(InetAddress.getLocalHost(), 0);
    } catch (UnknownHostException e) {
      throw new TezUncheckedException(e);
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Not starting TaskAttemptListener RPC in LocalMode");
    }
    return;
  }

  try {
    final int handlerCount = conf.getInt(
        TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT,
        TezConfiguration.TEZ_AM_TASK_LISTENER_THREAD_COUNT_DEFAULT);
    server = new RPC.Builder(conf)
        .setProtocol(TezTaskUmbilicalProtocol.class)
        .setBindAddress("0.0.0.0")
        .setPort(0)
        .setInstance(this)
        .setNumHandlers(handlerCount)
        .setPortRangeConfig(TezConfiguration.TEZ_AM_TASK_AM_PORT_RANGE)
        .setSecretManager(jobTokenSecretManager)
        .build();

    // Service-level authorization is only wired in when Hadoop security authorization is on.
    final boolean authorizationEnabled = conf.getBoolean(
        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, false);
    if (authorizationEnabled) {
      refreshServiceAcls(conf, new TezAMPolicyProvider());
    }

    server.start();
    final InetSocketAddress connectAddress = NetUtils.getConnectAddress(server);
    final String canonicalHost = connectAddress.getAddress().getCanonicalHostName();
    this.address = NetUtils.createSocketAddrForHost(canonicalHost, connectAddress.getPort());
    LOG.info("Instantiated TaskAttemptListener RPC at " + this.address);
  } catch (IOException e) {
    throw new TezUncheckedException(e);
  }
}
/**
 * Reloads the RPC server's service-level ACLs.
 * Dereferences {@code server}, so it must only be called after the server has been created.
 *
 * @param configuration configuration passed through to the server
 * @param policyProvider policy provider passed through to the server
 */
void refreshServiceAcls(Configuration configuration,
PolicyProvider policyProvider) {
this.server.refreshServiceAcl(configuration, policyProvider);
}
/** Service stop hook: delegates to {@link #stopRpcServer()}. */
@Override
public void serviceStop() {
stopRpcServer();
}
/** Stops the RPC server if one was created; does nothing otherwise (e.g. local mode). */
protected void stopRpcServer() {
  if (server == null) {
    return;
  }
  server.stop();
}
/**
 * @return the address assigned by {@link #startRpcServer()}, or null if that method has not
 *         run yet
 */
public InetSocketAddress getAddress() {
return address;
}
/**
 * Returns the protocol's {@code versionID} constant; both arguments are ignored.
 */
@Override
public long getProtocolVersion(String protocol, long clientVersion)
throws IOException {
return versionID;
}
/**
 * Computes the protocol signature for this server instance by delegating to
 * {@link ProtocolSignature#getProtocolSignature}.
 */
@Override
public ProtocolSignature getProtocolSignature(String protocol,
    long clientVersion, int clientMethodsHash) throws IOException {
  final ProtocolSignature signature = ProtocolSignature.getProtocolSignature(
      this, protocol, clientVersion, clientMethodsHash);
  return signature;
}
/**
 * Called by a container to fetch its next task.
 *
 * @param containerContext identifies the calling container
 * @return {@code TASK_FOR_INVALID_JVM} when the request is malformed or the container is not
 *         registered, {@code null} when the container is registered but has no task to pull,
 *         otherwise the task assigned to the container
 * @throws IOException if the task's local resources cannot be converted
 */
@Override
public ContainerTask getTask(ContainerContext containerContext)
    throws IOException {
  ContainerTask task;
  if (containerContext == null || containerContext.getContainerIdentifier() == null) {
    LOG.info("Invalid task request with an empty containerContext or containerId");
    task = TASK_FOR_INVALID_JVM;
  } else {
    final ContainerId containerId =
        ConverterUtils.toContainerId(containerContext.getContainerIdentifier());
    if (LOG.isDebugEnabled()) {
      LOG.debug("Container with id: " + containerId + " asked for a task");
    }
    if (registeredContainers.containsKey(containerId)) {
      pingContainerHeartbeatHandler(containerId);
      task = getContainerTask(containerId);
      if (task == TASK_FOR_INVALID_JVM) {
        // The container was unregistered between the containsKey check and the lookup.
        LOG.info("Container with id: " + containerId
            + " is valid, but no longer registered, and will be killed. Race condition.");
      } else if (task != null) {
        context.getEventHandler().handle(
            new TaskAttemptEventStartedRemotely(
                task.getTaskSpec().getTaskAttemptID(),
                containerId,
                context.getApplicationACLs()));
        LOG.info("Container with id: " + containerId + " given task: "
            + task.getTaskSpec().getTaskAttemptID());
      } else if (LOG.isDebugEnabled()) {
        LOG.debug("No task current assigned to Container with id: " + containerId);
      }
    } else {
      final boolean knownToApp = context.getAllContainers().get(containerId) != null;
      if (knownToApp) {
        LOG.info("Container with id: " + containerId
            + " is valid, but no longer registered, and will be killed");
      } else {
        LOG.info("Container with id: " + containerId
            + " is invalid and will be killed");
      }
      task = TASK_FOR_INVALID_JVM;
    }
  }
  if (LOG.isDebugEnabled()) {
    LOG.debug("getTask returning task: " + task);
  }
  return task;
}
/**
 * Answers a task attempt asking whether it may commit its output.
 *
 * <p>The request counts as a sign of life for both the attempt and its container. The
 * decision itself is delegated to the owning {@link Task#canCommit(TezTaskAttemptID)}, since
 * only the task, which manages all of its attempts, can decide. Attempts poll this method
 * repeatedly; this is a legacy of the centralized commit protocol handled by the JobTracker.
 *
 * @param taskAttemptId the attempt requesting permission to commit
 * @return whatever the owning task's {@code canCommit} returns for this attempt
 */
@Override
public boolean canCommit(TezTaskAttemptID taskAttemptId) throws IOException {
  LOG.info("Commit go/no-go request from " + taskAttemptId.toString());

  taskHeartbeatHandler.progressing(taskAttemptId);
  pingContainerHeartbeatHandler(taskAttemptId);

  final DAG currentDag = context.getCurrentDAG();
  final TezVertexID vertexId = taskAttemptId.getTaskID().getVertexID();
  final Task owningTask = currentDag.getVertex(vertexId).getTask(taskAttemptId.getTaskID());
  return owningTask.canCommit(taskAttemptId);
}
/**
 * Detaches a task attempt from its container.
 *
 * <p>Logs a warning and does nothing when the attempt has no known container or that
 * container is not registered. Otherwise clears the container's assigned task and removes
 * the attempt-to-container mapping while holding the container's lock.
 *
 * @param attemptId the attempt to unregister
 */
@Override
public void unregisterTaskAttempt(TezTaskAttemptID attemptId) {
  final ContainerId owningContainer = attemptToInfoMap.get(attemptId);
  if (owningContainer == null) {
    LOG.warn("Unregister task attempt: " + attemptId + " from unknown container");
    return;
  }

  final ContainerInfo info = registeredContainers.get(owningContainer);
  if (info == null) {
    LOG.warn("Unregister task attempt: " + attemptId +
        " from non-registered container: " + owningContainer);
    return;
  }

  synchronized (info) {
    info.amContainerTask = null;
    attemptToInfoMap.remove(attemptId);
  }
}
/**
 * Notification that a DAG has completed. Currently a no-op; the TODOs below record the
 * cleanup that is still outstanding.
 */
@Override
public void dagComplete(DAG dag) {
// TODO TEZ-2335. Clean up TaskHeartbeat handler structures.
// TODO TEZ-2345. Also clean up the attemptInfo map, so that any tasks which heartbeat are told to die.
// Container structures remain unchanged - since they could be re-used across restarts.
// This becomes more relevant when task kills without container kills are allowed.
// TODO TEZ-2336. Send a signal to containers indicating DAG completion.
}
/** Notification that a new DAG has been submitted. Currently a no-op. */
@Override
public void dagSubmitted() {
// Nothing to do right now. Indicates that a new DAG has been submitted and
// the context has updated information.
}
/**
 * Registers a running container so that it may pull tasks and send heartbeats.
 *
 * <p>Uses {@code putIfAbsent} so that a duplicate registration is rejected without
 * overwriting the state (assigned task, last heartbeat response) of the existing one.
 *
 * @param containerId the container to register
 * @throws TezUncheckedException if the container is already registered
 */
@Override
public void registerRunningContainer(ContainerId containerId) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("ContainerId: " + containerId
        + " registered with TaskAttemptListener");
  }
  ContainerInfo oldInfo = registeredContainers.putIfAbsent(containerId, new ContainerInfo());
  if (oldInfo != null) {
    throw new TezUncheckedException(
        "Multiple registrations for containerId: " + containerId);
  }
}
/**
 * Assigns a task attempt to a registered container so the container can pull it via getTask.
 *
 * <p>All validation happens before any state is changed: a rejected registration leaves
 * both the container's current assignment and the existing attempt-to-container mapping
 * untouched.
 *
 * @param amContainerTask the task (with its resources and credentials) to assign
 * @param containerId the container that should run the task
 * @throws TezUncheckedException if the container is not registered, already has a task
 *         assigned, or the attempt is already mapped to a container
 */
@Override
public void registerTaskAttempt(AMContainerTask amContainerTask,
    ContainerId containerId) {
  ContainerInfo containerInfo = registeredContainers.get(containerId);
  if (containerInfo == null) {
    throw new TezUncheckedException("Registering task attempt: "
        + amContainerTask.getTask().getTaskAttemptID() + " to unknown container: " + containerId);
  }
  synchronized (containerInfo) {
    if (containerInfo.amContainerTask != null) {
      throw new TezUncheckedException("Registering task attempt: "
          + amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId
          + " with existing assignment to: " + containerInfo.amContainerTask.getTask().getTaskAttemptID());
    }
    // putIfAbsent keeps an existing mapping intact when the attempt is already assigned.
    ContainerId containerIdFromMap =
        attemptToInfoMap.putIfAbsent(amContainerTask.getTask().getTaskAttemptID(), containerId);
    if (containerIdFromMap != null) {
      throw new TezUncheckedException("Registering task attempt: "
          + amContainerTask.getTask().getTaskAttemptID() + " to container: " + containerId
          + " when already assigned to: " + containerIdFromMap);
    }
    // Only record the assignment once every check has passed.
    containerInfo.amContainerTask = amContainerTask;
    containerInfo.taskPulled = false;
  }
}
/**
 * Removes a container from the set of registered containers.
 *
 * <p>If the container still has a task attempt assigned, the attempt's mapping to this
 * container is removed as well. Without this, a later unregisterTaskAttempt would find the
 * container missing and return early, leaving the mapping behind permanently.
 * Unregistering a container that is not registered is a no-op.
 *
 * @param containerId the container to unregister
 */
@Override
public void unregisterRunningContainer(ContainerId containerId) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Unregistering Container from TaskAttemptListener: "
        + containerId);
  }
  ContainerInfo containerInfo = registeredContainers.remove(containerId);
  if (containerInfo == null) {
    return;
  }
  synchronized (containerInfo) {
    if (containerInfo.amContainerTask != null) {
      // Two-argument remove: only drop the mapping if it still points at this container.
      attemptToInfoMap.remove(
          containerInfo.amContainerTask.getTask().getTaskAttemptID(), containerId);
    }
  }
}
/** Tells the container heartbeat handler that the given container has been heard from. */
private void pingContainerHeartbeatHandler(ContainerId containerId) {
containerHeartbeatHandler.pinged(containerId);
}
/**
 * Tells the container heartbeat handler that the container running the given attempt has
 * been heard from. Logs a warning when no container is mapped to the attempt.
 */
private void pingContainerHeartbeatHandler(TezTaskAttemptID taskAttemptId) {
  final ContainerId owningContainer = attemptToInfoMap.get(taskAttemptId);
  if (owningContainer == null) {
    LOG.warn("Handling communication from attempt: " + taskAttemptId
        + ", ContainerId not known for this attempt");
    return;
  }
  containerHeartbeatHandler.pinged(owningContainer);
}
/**
 * Processes a heartbeat from a container and, if it is running a task attempt, exchanges
 * events with that attempt.
 *
 * <p>Behaviour, in order:
 * <ul>
 * <li>An unregistered container receives a response flagged with should-die.</li>
 * <li>A request whose id equals the last processed id receives the previously stored
 * response again.</li>
 * <li>When the request names a current task attempt, the attempt must be mapped to this
 * container and the request id must be exactly one greater than the last processed id.
 * Incoming events are then routed: the status update goes directly to the task attempt,
 * all other events go to the vertex. Finally, pending events for the attempt are returned
 * in the response.</li>
 * </ul>
 *
 * @param request the heartbeat request sent by the container
 * @return the response for this request, or the stored response for a repeated request
 * @throws TezException if the attempt is not mapped to the calling container or the request
 *         id is out of sequence
 */
@Override
public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request)
    throws IOException, TezException {
  ContainerId containerId = ConverterUtils.toContainerId(request
      .getContainerIdentifier());
  long requestId = request.getRequestId();
  if (LOG.isDebugEnabled()) {
    LOG.debug("Received heartbeat from container"
        + ", request=" + request);
  }
  ContainerInfo containerInfo = registeredContainers.get(containerId);
  if (containerInfo == null) {
    LOG.warn("Received task heartbeat from unknown container with id: " + containerId +
        ", asking it to die");
    TezHeartbeatResponse response = new TezHeartbeatResponse();
    response.setLastRequestId(requestId);
    response.setShouldDie();
    return response;
  }
  synchronized (containerInfo) {
    pingContainerHeartbeatHandler(containerId);
    if (containerInfo.lastRequestId == requestId) {
      // NOTE(review): a fresh ContainerInfo has lastRequestId == 0 and no stored response,
      // so a first request carrying id 0 would get null back - confirm ids start above 0.
      LOG.warn("Old sequenceId received: " + requestId
          + ", Re-sending last response to client");
      return containerInfo.lastReponse;
    }
    TezHeartbeatResponse response = new TezHeartbeatResponse();
    response.setLastRequestId(requestId);
    TezTaskAttemptID taskAttemptID = request.getCurrentTaskAttemptID();
    if (taskAttemptID != null) {
      ContainerId containerIdFromMap = attemptToInfoMap.get(taskAttemptID);
      if (containerIdFromMap == null || !containerIdFromMap.equals(containerId)) {
        throw new TezException("Attempt " + taskAttemptID
            + " is not recognized for heartbeat");
      }
      if (containerInfo.lastRequestId + 1 != requestId) {
        // Parenthesised so the expected id is computed arithmetically; without the
        // parentheses string concatenation would append the character "1" instead.
        throw new TezException("Container " + containerId
            + " has invalid request id. Expected: "
            + (containerInfo.lastRequestId + 1)
            + " and actual: " + requestId);
      }
      List<TezEvent> inEvents = request.getEvents();
      if (LOG.isDebugEnabled()) {
        LOG.debug("Ping from " + taskAttemptID.toString() +
            " events: " + (inEvents != null ? inEvents.size() : -1));
      }
      long currTime = context.getClock().getTime();
      List<TezEvent> otherEvents = new ArrayList<TezEvent>();
      // route TASK_STATUS_UPDATE_EVENT directly to TaskAttempt and route other events
      // (DATA_MOVEMENT_EVENT, TASK_ATTEMPT_COMPLETED_EVENT, TASK_ATTEMPT_FAILED_EVENT)
      // to VertexImpl to ensure the events ordering
      // 1. DataMovementEvent is logged as RecoveryEvent before TaskAttemptFinishedEvent
      // 2. TaskStatusEvent is handled before TaskAttemptFinishedEvent
      TaskAttemptEventStatusUpdate taskAttemptEvent = null;
      boolean readErrorReported = false;
      for (TezEvent tezEvent : ListUtils.emptyIfNull(inEvents)) {
        // for now, set the event time on the AM when it is received.
        // this avoids any time disparity between machines.
        tezEvent.setEventReceivedTime(currTime);
        final EventType eventType = tezEvent.getEventType();
        // send TA_STATUS_UPDATE before TA_DONE/TA_FAILED/TA_KILLED otherwise Status may be missed
        if (eventType == EventType.TASK_STATUS_UPDATE_EVENT) {
          // If several status updates arrive in one heartbeat, only the last one is kept.
          taskAttemptEvent = new TaskAttemptEventStatusUpdate(taskAttemptID,
              (TaskStatusUpdateEvent) tezEvent.getEvent());
        } else {
          if (eventType == EventType.INPUT_READ_ERROR_EVENT) {
            readErrorReported = true;
          }
          otherEvents.add(tezEvent);
        }
      }
      if (taskAttemptEvent != null) {
        taskAttemptEvent.setReadErrorReported(readErrorReported);
        context.getEventHandler().handle(taskAttemptEvent);
      }
      if (!otherEvents.isEmpty()) {
        TezVertexID vertexId = taskAttemptID.getTaskID().getVertexID();
        context.getEventHandler().handle(
            new VertexEventRouteEvent(vertexId, Collections.unmodifiableList(otherEvents)));
      }
      taskHeartbeatHandler.pinged(taskAttemptID);
      TaskAttemptEventInfo eventInfo = context
          .getCurrentDAG()
          .getVertex(taskAttemptID.getTaskID().getVertexID())
          .getTaskAttemptTezEvents(taskAttemptID, request.getStartIndex(),
              request.getPreRoutedStartIndex(), request.getMaxEvents());
      response.setEvents(eventInfo.getEvents());
      response.setNextFromEventId(eventInfo.getNextFromEventId());
      response.setNextPreRoutedEventId(eventInfo.getNextPreRoutedFromEventId());
    }
    // Remember this exchange so a retransmitted request can be answered identically.
    containerInfo.lastRequestId = requestId;
    containerInfo.lastReponse = response;
    return response;
  }
}
/**
 * Converts a map of YARN local resources into the equivalent map of Tez local resources,
 * keeping the same keys.
 *
 * @param ylrs YARN resources keyed by name; may be null
 * @return a new mutable map; empty when {@code ylrs} is null or empty
 * @throws IOException wrapping the {@link URISyntaxException} raised when a resource cannot
 *         be converted
 */
private Map<String, TezLocalResource> convertLocalResourceMap(Map<String, LocalResource> ylrs)
    throws IOException {
  final Map<String, TezLocalResource> converted = Maps.newHashMap();
  if (ylrs == null) {
    return converted;
  }
  for (Entry<String, LocalResource> yarnEntry : ylrs.entrySet()) {
    final TezLocalResource tezResource;
    try {
      tezResource = TezConverterUtils.convertYarnLocalResourceToTez(yarnEntry.getValue());
    } catch (URISyntaxException e) {
      throw new IOException(e);
    }
    converted.put(yarnEntry.getKey(), tezResource);
  }
  return converted;
}
/**
 * Hands out the task currently assigned to a container, at most once per assignment.
 *
 * <p>The task is marked as pulled only after the {@link ContainerTask} has been built
 * successfully. If converting the task's resources fails, the assignment therefore stays
 * available instead of being marked as delivered without ever reaching the container.
 *
 * @param containerId the container asking for work
 * @return {@code TASK_FOR_INVALID_JVM} if the container is no longer registered,
 *         {@code null} if no task is assigned or the assigned task was already pulled,
 *         otherwise the newly built task
 * @throws IOException if the task's additional resources cannot be converted
 */
private ContainerTask getContainerTask(ContainerId containerId) throws IOException {
  ContainerInfo containerInfo = registeredContainers.get(containerId);
  if (containerInfo == null) {
    // This can happen if an unregisterTask comes in after we've done the initial checks for
    // registered containers. (Race between getTask from the container, and a potential
    // STOP_CONTAINER from somewhere within the AM)
    // Implies that an un-registration has taken place and the container needs to be asked to die.
    LOG.info("Container with id: " + containerId
        + " is valid, but no longer registered, and will be killed");
    return TASK_FOR_INVALID_JVM;
  }
  synchronized (containerInfo) {
    AMContainerTask amContainerTask = containerInfo.amContainerTask;
    if (amContainerTask == null || containerInfo.taskPulled) {
      return null;
    }
    ContainerTask containerTask = new ContainerTask(amContainerTask.getTask(), false,
        convertLocalResourceMap(amContainerTask.getAdditionalResources()),
        amContainerTask.getCredentials(), amContainerTask.haveCredentialsChanged());
    // Flip the flag last so a failed conversion above does not consume the assignment.
    containerInfo.taskPulled = true;
    return containerTask;
  }
}
}