blob: ec8e73fa5d0d24912137fbf255e075b9c788218e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.app.rm;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.TaskLocationHint.TaskBasedLocationAffinity;
import org.apache.tez.dag.api.client.DAGClientServer;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.DAGAppMaster;
import org.apache.tez.dag.app.DAGAppMasterState;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEvent;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.dag.event.DAGEventSchedulerUpdateTAAssigned;
import org.apache.tez.dag.app.rm.TaskSchedulerService.TaskSchedulerAppCallback;
import org.apache.tez.dag.app.rm.container.AMContainer;
import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA;
import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchRequest;
import org.apache.tez.dag.app.rm.container.AMContainerEventStopRequest;
import org.apache.tez.dag.app.rm.container.AMContainerEventTASucceeded;
import org.apache.tez.dag.app.rm.container.AMContainerState;
import org.apache.tez.dag.app.rm.container.ContainerSignatureMatcher;
import org.apache.tez.dag.app.rm.node.AMNodeEventContainerAllocated;
import org.apache.tez.dag.app.rm.node.AMNodeEventNodeCountUpdated;
import org.apache.tez.dag.app.rm.node.AMNodeEventStateChanged;
import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptEnded;
import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptSucceeded;
import com.google.common.base.Preconditions;
/**
 * Bridges the DAG application master and the {@link TaskSchedulerService}.
 *
 * <p>Scheduler-bound {@link AMSchedulerEvent}s are queued by {@link #handle(AMSchedulerEvent)}
 * and processed one at a time on the "TaskSchedulerEventHandlerThread" started in
 * {@link #serviceStart()}. The class also implements {@link TaskSchedulerAppCallback}: upcalls
 * from the scheduler (allocations, container completions, node updates, errors, ...) are
 * translated into container, node, DAG and app-master events and sent to the
 * {@code eventHandler} supplied at construction.
 */
public class TaskSchedulerEventHandler extends AbstractService
    implements TaskSchedulerAppCallback, EventHandler<AMSchedulerEvent> {

  static final Log LOG = LogFactory.getLog(TaskSchedulerEventHandler.class);

  protected final AppContext appContext;
  @SuppressWarnings("rawtypes")
  private final EventHandler eventHandler;
  // volatile: assigned in serviceStart() but also read by setShouldUnregisterFlag(), which may
  // run on another thread. Paired with the volatile read/write of shouldUnregisterFlag, this
  // guarantees at least one of the two sides observes the other.
  protected volatile TaskSchedulerService taskScheduler;
  private DAGAppMaster dagAppMaster;
  private Map<ApplicationAccessType, String> appAcls = null;
  private Thread eventHandlingThread;
  private volatile boolean stopEventHandling;
  // Has a signal (SIGTERM etc) been issued?
  protected volatile boolean isSignalled = false;
  final DAGClientServer clientService;
  private final ContainerSignatureMatcher containerSignatureMatcher;
  // volatile: updated from the unsynchronized getProgress() callback and read through the
  // public getNumClusterNodes() accessor.
  private volatile int cachedNodeCount = -1;
  private final AtomicBoolean shouldUnregisterFlag = new AtomicBoolean(false);
  BlockingQueue<AMSchedulerEvent> eventQueue = new LinkedBlockingQueue<AMSchedulerEvent>();

  /**
   * @param appContext application context used to look up containers, nodes and the current DAG
   * @param clientService client service whose bind address is handed to the task scheduler
   * @param eventHandler sink for every event this handler emits
   * @param containerSignatureMatcher matcher passed through to the created task scheduler
   */
  @SuppressWarnings("rawtypes")
  public TaskSchedulerEventHandler(AppContext appContext,
      DAGClientServer clientService, EventHandler eventHandler,
      ContainerSignatureMatcher containerSignatureMatcher) {
    super(TaskSchedulerEventHandler.class.getName());
    this.appContext = appContext;
    this.eventHandler = eventHandler;
    this.clientService = clientService;
    this.containerSignatureMatcher = containerSignatureMatcher;
  }

  /**
   * Returns the ACLs recorded by {@link #setApplicationRegistrationData}; {@code null} until then.
   */
  public Map<ApplicationAccessType, String> getApplicationAcls() {
    return appAcls;
  }

  /** Records whether a signal (e.g. SIGTERM) has been issued to this AM. */
  public void setSignalled(boolean isSignalled) {
    this.isSignalled = isSignalled;
    LOG.info("TaskScheduler notified that isSignalled was : " + isSignalled);
  }

  /** Returns the last node count seen in {@link #getProgress()}, or -1 if none seen yet. */
  public int getNumClusterNodes() {
    return cachedNodeCount;
  }

  /** Delegates to the task scheduler; only valid after {@link #serviceStart()}. */
  public Resource getAvailableResources() {
    return taskScheduler.getAvailableResources();
  }

  /** Delegates to the task scheduler; only valid after {@link #serviceStart()}. */
  public Resource getTotalResources() {
    return taskScheduler.getTotalResources();
  }

  /**
   * Dispatches one dequeued scheduler event to its handler.
   *
   * @throws TezUncheckedException if an S_TA_ENDED event carries a state other than
   *     FAILED, KILLED or SUCCEEDED
   */
  public synchronized void handleEvent(AMSchedulerEvent sEvent) {
    LOG.info("Processing the event " + sEvent.toString());
    switch (sEvent.getType()) {
    case S_TA_LAUNCH_REQUEST:
      handleTaLaunchRequest((AMSchedulerEventTALaunchRequest) sEvent);
      break;
    case S_TA_ENDED: // TaskAttempt considered complete.
      AMSchedulerEventTAEnded event = (AMSchedulerEventTAEnded) sEvent;
      switch (event.getState()) {
      case FAILED:
      case KILLED:
        handleTAUnsuccessfulEnd(event);
        break;
      case SUCCEEDED:
        handleTASucceeded(event);
        break;
      default:
        throw new TezUncheckedException("Unexpected TA_ENDED state: " + event.getState());
      }
      break;
    case S_CONTAINER_DEALLOCATE:
      handleContainerDeallocate((AMSchedulerEventDeallocateContainer) sEvent);
      break;
    case S_NODE_UNBLACKLISTED:
      // fall through
    case S_NODE_BLACKLISTED:
      handleNodeBlacklistUpdate((AMSchedulerEventNodeBlacklistUpdate) sEvent);
      break;
    case S_NODE_UNHEALTHY:
      break;
    case S_NODE_HEALTHY:
      // Consider changing this to work like BLACKLISTING.
      break;
    default:
      break;
    }
  }

  /**
   * Enqueues an event for asynchronous processing on the event-handling thread.
   *
   * @throws TezUncheckedException if interrupted while enqueueing; the thread's interrupt
   *     status is restored before throwing
   */
  @Override
  public void handle(AMSchedulerEvent event) {
    int qSize = eventQueue.size();
    if (qSize != 0 && qSize % 1000 == 0) {
      LOG.info("Size of event-queue in TaskSchedulerEventHandler is " + qSize);
    }
    int remCapacity = eventQueue.remainingCapacity();
    if (remCapacity < 1000) {
      LOG.warn("Very low remaining capacity in the event-queue "
          + "of TaskSchedulerEventHandler: " + remCapacity);
    }
    try {
      eventQueue.put(event);
    } catch (InterruptedException e) {
      // Restore the interrupt flag so code further up the stack can still observe it.
      Thread.currentThread().interrupt();
      throw new TezUncheckedException(e);
    }
  }

  /** Forwards an event to the handler supplied at construction. */
  @SuppressWarnings("unchecked")
  private void sendEvent(Event<?> event) {
    eventHandler.handle(event);
  }

  /** Applies a blacklist / unblacklist request to the task scheduler. */
  private void handleNodeBlacklistUpdate(AMSchedulerEventNodeBlacklistUpdate event) {
    if (event.getType() == AMSchedulerEventType.S_NODE_BLACKLISTED) {
      taskScheduler.blacklistNode(event.getNodeId());
    } else if (event.getType() == AMSchedulerEventType.S_NODE_UNBLACKLISTED) {
      taskScheduler.unblacklistNode(event.getNodeId());
    } else {
      throw new TezUncheckedException("Invalid event type: " + event.getType());
    }
  }

  /** Releases a container in the scheduler and asks the container to stop. */
  private void handleContainerDeallocate(AMSchedulerEventDeallocateContainer event) {
    ContainerId containerId = event.getContainerId();
    // TODO what happens to the task that was connected to this container?
    // current assumption is that it will eventually call handleTaStopRequest
    taskScheduler.deallocateContainer(containerId);
    // TODO does this container need to be stopped via C_STOP_REQUEST
    sendEvent(new AMContainerEventStopRequest(containerId));
  }

  /**
   * Handles a FAILED or KILLED attempt: deallocates it in the scheduler and, if the attempt
   * recorded a container, stops that container and informs its node.
   */
  private void handleTAUnsuccessfulEnd(AMSchedulerEventTAEnded event) {
    TaskAttempt attempt = event.getAttempt();
    boolean wasContainerAllocated = taskScheduler.deallocateTask(attempt, false);
    // use stored value of container id in case the scheduler has removed this
    // assignment because the task has been deallocated earlier.
    // retroactive case
    ContainerId attemptContainerId = attempt.getAssignedContainerID();

    if (!wasContainerAllocated) {
      LOG.info("Task: " + attempt.getID()
          + " has no container assignment in the scheduler");
      if (attemptContainerId != null) {
        LOG.error("No container allocated to task: " + attempt.getID()
            + " according to scheduler. Task reported container id: "
            + attemptContainerId);
      }
    }

    if (attemptContainerId != null) {
      // TODO either ways send the necessary events
      // Ask the container to stop.
      sendEvent(new AMContainerEventStopRequest(attemptContainerId));
      // Inform the Node - the task has asked to be STOPPED / has already stopped.
      sendEvent(new AMNodeEventTaskAttemptEnded(appContext.getAllContainers()
          .get(attemptContainerId).getContainer().getNodeId(), attemptContainerId,
          attempt.getID(), event.getState() == TaskAttemptState.FAILED));
    }
  }

  /**
   * Handles a SUCCEEDED attempt: notifies the used container and its node (when a container
   * id is present), then deallocates the attempt in the scheduler.
   */
  private void handleTASucceeded(AMSchedulerEventTAEnded event) {
    TaskAttempt attempt = event.getAttempt();
    ContainerId usedContainerId = event.getUsedContainerId();

    // Guarded because the event may carry no container id.
    if (usedContainerId != null) {
      sendEvent(new AMContainerEventTASucceeded(usedContainerId, event.getAttemptID()));
      sendEvent(new AMNodeEventTaskAttemptSucceeded(appContext.getAllContainers()
          .get(usedContainerId).getContainer().getNodeId(), usedContainerId,
          event.getAttemptID()));
    }

    boolean wasContainerAllocated = taskScheduler.deallocateTask(attempt, true);
    if (!wasContainerAllocated) {
      LOG.error("De-allocated successful task: " + attempt.getID()
          + ", but TaskScheduler reported no container assigned to task");
    }
  }

  /**
   * Requests a container for an attempt. A task-based affinity hint pins the request to the
   * container of the affinitized task's successful attempt when one exists; otherwise the
   * hint's hosts/racks (if any) are used.
   */
  private void handleTaLaunchRequest(AMSchedulerEventTALaunchRequest event) {
    TaskAttempt taskAttempt = event.getTaskAttempt();
    TaskLocationHint locationHint = event.getLocationHint();
    String[] hosts = null;
    String[] racks = null;
    if (locationHint != null) {
      TaskBasedLocationAffinity taskAffinity = locationHint.getAffinitizedTask();
      if (taskAffinity != null) {
        Vertex vertex = appContext.getCurrentDAG().getVertex(taskAffinity.getVertexName());
        Preconditions.checkNotNull(vertex, "Invalid vertex in task based affinity " + taskAffinity
            + " for attempt: " + taskAttempt.getID());
        int taskIndex = taskAffinity.getTaskIndex();
        Preconditions.checkState(taskIndex >= 0 && taskIndex < vertex.getTotalTasks(),
            "Invalid taskIndex in task based affinity " + taskAffinity
            + " for attempt: " + taskAttempt.getID());
        TaskAttempt affinityAttempt = vertex.getTask(taskIndex).getSuccessfulAttempt();
        if (affinityAttempt != null) {
          Preconditions.checkNotNull(affinityAttempt.getAssignedContainerID(),
              affinityAttempt.getID());
          taskScheduler.allocateTask(taskAttempt,
              event.getCapability(),
              affinityAttempt.getAssignedContainerID(),
              Priority.newInstance(event.getPriority()),
              event.getContainerContext(),
              event);
          return;
        }
        LOG.info("Attempt: " + taskAttempt.getID() + " has task based affinity to " + taskAffinity
            + " but no locality information exists for it. Ignoring hint.");
        // fall through with null hosts/racks
      } else {
        hosts = (locationHint.getHosts() != null)
            ? locationHint.getHosts().toArray(new String[locationHint.getHosts().size()])
            : null;
        racks = (locationHint.getRacks() != null)
            ? locationHint.getRacks().toArray(new String[locationHint.getRacks().size()])
            : null;
      }
    }

    taskScheduler.allocateTask(taskAttempt,
        event.getCapability(),
        hosts,
        racks,
        Priority.newInstance(event.getPriority()),
        event.getContainerContext(),
        event);
  }

  /**
   * Creates the scheduler implementation: {@link LocalTaskSchedulerService} when
   * {@link TezConfiguration#TEZ_LOCAL_MODE} is set, {@link YarnTaskSchedulerService} otherwise.
   */
  protected TaskSchedulerService createTaskScheduler(String host, int port,
      String trackingUrl, AppContext appContext) {
    boolean isLocal = getConfig().getBoolean(TezConfiguration.TEZ_LOCAL_MODE,
        TezConfiguration.TEZ_LOCAL_MODE_DEFAULT);
    if (isLocal) {
      return new LocalTaskSchedulerService(this, this.containerSignatureMatcher,
          host, port, trackingUrl, appContext);
    } else {
      return new YarnTaskSchedulerService(this, this.containerSignatureMatcher,
          host, port, trackingUrl, appContext);
    }
  }

  /**
   * Creates, initializes and starts the task scheduler, then starts the thread that drains
   * {@link #eventQueue}. Any Throwable from event handling sends an INTERNAL_ERROR app-master
   * event and ends the thread.
   */
  @Override
  public synchronized void serviceStart() {
    InetSocketAddress serviceAddr = clientService.getBindAddress();
    dagAppMaster = appContext.getAppMaster();
    taskScheduler = createTaskScheduler(serviceAddr.getHostName(),
        serviceAddr.getPort(), "", appContext);
    taskScheduler.init(getConfig());
    taskScheduler.start();
    if (shouldUnregisterFlag.get()) {
      // Flag may have been set earlier when task scheduler was not initialized
      taskScheduler.setShouldUnregister();
    }

    this.eventHandlingThread = new Thread("TaskSchedulerEventHandlerThread") {
      @Override
      public void run() {
        AMSchedulerEvent event;
        while (!stopEventHandling && !Thread.currentThread().isInterrupted()) {
          try {
            if (TaskSchedulerEventHandler.this.eventQueue.peek() == null) {
              notifyForTest();
            }
            event = TaskSchedulerEventHandler.this.eventQueue.take();
          } catch (InterruptedException e) {
            // serviceStop() interrupts this thread; only warn on unexpected interrupts.
            if (!stopEventHandling) {
              LOG.warn("Continuing after interrupt : ", e);
            }
            continue;
          }

          try {
            handleEvent(event);
          } catch (Throwable t) {
            LOG.error("Error in handling event type " + event.getType()
                + " to the TaskScheduler", t);
            // Kill the AM.
            sendEvent(new DAGAppMasterEvent(DAGAppMasterEventType.INTERNAL_ERROR));
            return;
          } finally {
            notifyForTest();
          }
        }
      }
    };
    this.eventHandlingThread.start();
  }

  /** Test hook invoked when the queue is empty and after each handled event; no-op here. */
  protected void notifyForTest() {
  }

  /** Stops the event-handling thread and the task scheduler, if they were started. */
  @Override
  public void serviceStop() {
    synchronized (this) {
      this.stopEventHandling = true;
      if (eventHandlingThread != null) {
        eventHandlingThread.interrupt();
      }
    }
    if (taskScheduler != null) {
      ((AbstractService) taskScheduler).stop();
    }
  }

  // TaskSchedulerAppCallback methods

  /**
   * Scheduler upcall: a container was allocated to {@code task}. {@code appCookie} is the
   * launch-request event that handleTaLaunchRequest passed to allocateTask.
   */
  @Override
  public synchronized void taskAllocated(Object task,
      Object appCookie,
      Container container) {
    ContainerId containerId = container.getId();
    if (appContext.getAllContainers().addContainerIfNew(container)) {
      appContext.getNodeTracker().nodeSeen(container.getNodeId());
      sendEvent(new AMNodeEventContainerAllocated(container.getNodeId(), container.getId()));
    }

    AMSchedulerEventTALaunchRequest event = (AMSchedulerEventTALaunchRequest) appCookie;
    TaskAttempt taskAttempt = event.getTaskAttempt();
    // TODO - perhaps check if the task still needs this container
    // because the deallocateTask downcall may have raced with the
    // taskAllocated() upcall
    assert task.equals(taskAttempt);

    // Only a container still in ALLOCATED state needs to be launched; otherwise it is reused.
    if (appContext.getAllContainers().get(containerId).getState()
        == AMContainerState.ALLOCATED) {
      sendEvent(new AMContainerEventLaunchRequest(containerId, taskAttempt.getVertexID(),
          event.getContainerContext()));
    }
    sendEvent(new DAGEventSchedulerUpdateTAAssigned(taskAttempt, container));
    sendEvent(new AMContainerEventAssignTA(containerId, taskAttempt.getID(),
        event.getRemoteTaskSpec(), event.getContainerContext().getLocalResources(),
        event.getContainerContext().getCredentials()));
  }

  /**
   * Scheduler upcall: a container finished. Sends a completion event with a message derived
   * from the exit status plus any diagnostics; ignored for containers unknown to the AM.
   */
  @Override
  public synchronized void containerCompleted(Object task, ContainerStatus containerStatus) {
    // Inform the Containers about completion.
    AMContainer amContainer = appContext.getAllContainers().get(containerStatus.getContainerId());
    if (amContainer == null) {
      return;
    }
    int exitStatus = containerStatus.getExitStatus();
    String message;
    if (exitStatus == ContainerExitStatus.PREEMPTED) {
      message = "Container preempted externally. ";
    } else if (exitStatus == ContainerExitStatus.DISKS_FAILED) {
      message = "Container disk failed. ";
    } else {
      message = "Container failed. ";
    }
    if (containerStatus.getDiagnostics() != null) {
      message += containerStatus.getDiagnostics();
    }
    sendEvent(new AMContainerEventCompleted(amContainer.getContainerId(), exitStatus, message));
  }

  /** Scheduler upcall: a known container is being released, so ask it to stop. */
  @Override
  public synchronized void containerBeingReleased(ContainerId containerId) {
    AMContainer amContainer = appContext.getAllContainers().get(containerId);
    if (amContainer != null) {
      sendEvent(new AMContainerEventStopRequest(containerId));
    }
  }

  /** Scheduler upcall: forwards each node report as a node state-change event. */
  @Override
  public synchronized void nodesUpdated(List<NodeReport> updatedNodes) {
    for (NodeReport nr : updatedNodes) {
      // Scheduler will find out from the node, if at all.
      // Relying on the RM to not allocate containers on an unhealthy node.
      sendEvent(new AMNodeEventStateChanged(nr));
    }
  }

  /** Scheduler upcall: the scheduler asked the app to shut down; sends AM_REBOOT. */
  @Override
  public synchronized void appShutdownRequested() {
    // This can happen if the RM has been restarted. If it is in that state,
    // this application must clean itself up.
    LOG.info("App shutdown requested by scheduler");
    sendEvent(new DAGAppMasterEvent(DAGAppMasterEventType.AM_REBOOT));
  }

  /** Scheduler upcall: stores registration data (max capability, ACLs, client secret key). */
  @Override
  public synchronized void setApplicationRegistrationData(
      Resource maxContainerCapability,
      Map<ApplicationAccessType, String> appAcls,
      ByteBuffer clientAMSecretKey) {
    this.appContext.getClusterInfo().setMaxContainerCapability(maxContainerCapability);
    this.appAcls = appAcls;
    this.clientService.setClientAMSecretKey(clientAMSecretKey);
  }

  // Not synchronized to avoid deadlocks from TaskScheduler callbacks.
  // TaskScheduler uses a separate thread for its callbacks. Since this method
  // returns a value which is required, the TaskScheduler waits for the call to
  // complete, which can lead to a deadlock if called from within a TSEH lock.
  /**
   * Returns the final status derived from the app master's state, with the app master's
   * diagnostics joined by newlines (or "App not yet initialized" before start).
   */
  @Override
  public AppFinalStatus getFinalAppStatus() {
    FinalApplicationStatus finishState;
    StringBuilder sb = new StringBuilder();
    DAGAppMaster appMaster = dagAppMaster;
    if (appMaster == null) {
      finishState = FinalApplicationStatus.UNDEFINED;
      sb.append("App not yet initialized");
    } else {
      finishState = toFinalApplicationStatus(appMaster.getState());
      List<String> diagnostics = appMaster.getDiagnostics();
      if (diagnostics != null) {
        for (String s : diagnostics) {
          sb.append(s).append("\n");
        }
      }
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Setting job diagnostics to " + sb.toString());
    }

    // TODO: history URL is not populated yet.
    String historyUrl = "";
    return new AppFinalStatus(finishState, sb.toString(), historyUrl);
  }

  /**
   * Maps the app master's state to a {@link FinalApplicationStatus}. A RUNNING app master
   * that has been signalled is reported as KILLED.
   */
  private FinalApplicationStatus toFinalApplicationStatus(DAGAppMasterState appMasterState) {
    if (appMasterState == DAGAppMasterState.SUCCEEDED) {
      return FinalApplicationStatus.SUCCEEDED;
    }
    if (appMasterState == DAGAppMasterState.KILLED
        || (appMasterState == DAGAppMasterState.RUNNING && isSignalled)) {
      return FinalApplicationStatus.KILLED;
    }
    if (appMasterState == DAGAppMasterState.FAILED
        || appMasterState == DAGAppMasterState.ERROR) {
      return FinalApplicationStatus.FAILED;
    }
    return FinalApplicationStatus.UNDEFINED;
  }

  // Not synchronized to avoid deadlocks from TaskScheduler callbacks.
  // TaskScheduler uses a separate thread for its callbacks. Since this method
  // returns a value which is required, the TaskScheduler waits for the call to
  // complete, which can lead to a deadlock if called from within a TSEH lock.
  /**
   * Returns the app master's progress. As a side effect, refreshes the cached cluster node
   * count and emits a node-count-updated event when it changed.
   */
  @Override
  public float getProgress() {
    // at this point allocate has been called and so node count must be available
    // may change after YARN-1722
    int nodeCount = taskScheduler.getClusterNodeCount();
    if (nodeCount != cachedNodeCount) {
      cachedNodeCount = nodeCount;
      sendEvent(new AMNodeEventNodeCountUpdated(nodeCount));
    }
    return dagAppMaster.getProgress();
  }

  /** Scheduler upcall: reports a scheduling-service error to the app master. */
  @Override
  public void onError(Throwable t) {
    LOG.info("Error reported by scheduler", t);
    sendEvent(new DAGAppMasterEventSchedulingServiceError(t));
  }

  /** Tells the scheduler to reset locality matching for all held containers. */
  public void dagCompleted() {
    taskScheduler.resetMatchLocalityForAllHeldContainers();
  }

  /** Deallocates a container and reports it to the AM as completed with PREEMPTED status. */
  @Override
  public void preemptContainer(ContainerId containerId) {
    taskScheduler.deallocateContainer(containerId);
    // Inform the Containers about completion.
    sendEvent(new AMContainerEventCompleted(containerId,
        ContainerExitStatus.PREEMPTED, "Container preempted internally"));
  }

  /**
   * Records that the scheduler should unregister from the RM. If the scheduler does not
   * exist yet, serviceStart() applies the flag once it is created.
   */
  public void setShouldUnregisterFlag() {
    LOG.info("TaskScheduler notified that it should unregister from RM");
    this.shouldUnregisterFlag.set(true);
    TaskSchedulerService scheduler = this.taskScheduler;
    if (scheduler != null) {
      scheduler.setShouldUnregister();
    }
  }

  /** Delegates to the task scheduler; only valid after {@link #serviceStart()}. */
  public boolean hasUnregistered() {
    return this.taskScheduler.hasUnregistered();
  }
}