/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.app.rm;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.tez.Utils;
import org.apache.tez.dag.api.NamedEntityDescriptor;
import org.apache.tez.dag.api.TezConstants;
import org.apache.tez.dag.app.ServicePluginLifecycleAbstractService;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventUserServiceFatalError;
import org.apache.tez.serviceplugins.api.DagInfo;
import org.apache.tez.serviceplugins.api.ServicePluginError;
import org.apache.tez.serviceplugins.api.TaskScheduler;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext;
import org.apache.tez.serviceplugins.api.TaskSchedulerContext.AppFinalStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.common.ReflectionUtils;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.api.TezUncheckedException;
import org.apache.tez.dag.api.TaskLocationHint;
import org.apache.tez.dag.api.TaskLocationHint.TaskBasedLocationAffinity;
import org.apache.tez.dag.api.client.DAGClientServer;
import org.apache.tez.dag.api.oldrecords.TaskAttemptState;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.DAGAppMaster;
import org.apache.tez.dag.app.DAGAppMasterState;
import org.apache.tez.dag.app.dag.TaskAttempt;
import org.apache.tez.dag.app.dag.Vertex;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEvent;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventSchedulingServiceError;
import org.apache.tez.dag.app.dag.event.DAGAppMasterEventType;
import org.apache.tez.dag.app.rm.container.AMContainer;
import org.apache.tez.dag.app.rm.container.AMContainerEventAssignTA;
import org.apache.tez.dag.app.rm.container.AMContainerEventCompleted;
import org.apache.tez.dag.app.rm.container.AMContainerEventLaunchRequest;
import org.apache.tez.dag.app.rm.container.AMContainerEventStopRequest;
import org.apache.tez.dag.app.rm.container.AMContainerEventTASucceeded;
import org.apache.tez.dag.app.rm.container.AMContainerState;
import org.apache.tez.common.ContainerSignatureMatcher;
import org.apache.tez.dag.app.rm.node.AMNodeEventContainerAllocated;
import org.apache.tez.dag.app.rm.node.AMNodeEventNodeCountUpdated;
import org.apache.tez.dag.app.rm.node.AMNodeEventStateChanged;
import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptEnded;
import org.apache.tez.dag.app.rm.node.AMNodeEventTaskAttemptSucceeded;
import org.apache.tez.dag.app.web.WebUIService;
import org.apache.tez.dag.records.TaskAttemptTerminationCause;
import org.apache.tez.hadoop.shim.HadoopShim;
import org.apache.tez.hadoop.shim.HadoopShimsLoader;
import com.google.common.base.Preconditions;
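/**
 * Routes scheduling-related events between the DAGAppMaster and the configured
 * {@link TaskScheduler} plugins. Incoming {@link AMSchedulerEvent}s are queued and processed on a
 * dedicated event-handling thread, while scheduler callbacks (task allocated, container completed,
 * node updates, etc.) are translated back into AM, container and node events.
 */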
public class TaskSchedulerManager extends AbstractService implements
EventHandler<AMSchedulerEvent> {
static final Logger LOG = LoggerFactory.getLogger(TaskSchedulerManager.class);
static final String APPLICATION_ID_PLACEHOLDER = "__APPLICATION_ID__";
static final String HISTORY_URL_BASE = "__HISTORY_URL_BASE__";
protected final AppContext appContext;
@SuppressWarnings("rawtypes")
private final EventHandler eventHandler;
private final String historyUrl;
private DAGAppMaster dagAppMaster;
private Map<ApplicationAccessType, String> appAcls = null;
private Thread eventHandlingThread;
private volatile boolean stopEventHandling;
// Has a signal (SIGTERM etc) been issued?
protected volatile boolean isSignalled = false;
final DAGClientServer clientService;
private final ContainerSignatureMatcher containerSignatureMatcher;
private int cachedNodeCount = -1;
private AtomicBoolean shouldUnregisterFlag =
new AtomicBoolean(false);
private final WebUIService webUI;
private final NamedEntityDescriptor[] taskSchedulerDescriptors;
protected final TaskSchedulerWrapper[] taskSchedulers;
protected final ServicePluginLifecycleAbstractService []taskSchedulerServiceWrappers;
// Single executor service shared by all Schedulers for context callbacks
@VisibleForTesting
final ExecutorService appCallbackExecutor;
private final boolean isLocalMode;
  // If not running in local-only mode, the YARN task scheduler will always run to take care of
  // registration with YARN and heartbeats to YARN.
  // Splitting registration and heartbeats is not straightforward due to the taskScheduler being
  // tied to a ContainerRequestType.
  // Custom AppIds are used to avoid container conflicts if there are multiple sources.
private final long SCHEDULER_APP_ID_BASE = 111101111;
private final long SCHEDULER_APP_ID_INCREMENT = 111111111;
private final HadoopShim hadoopShim;
BlockingQueue<AMSchedulerEvent> eventQueue
= new LinkedBlockingQueue<AMSchedulerEvent>();
// Not tracking container / task to schedulerId. Instead relying on everything flowing through
// the system and being propagated back via events.
  /**
   * For testing only.
   */
  @VisibleForTesting
  @InterfaceAudience.Private
public TaskSchedulerManager(TaskScheduler taskScheduler, AppContext appContext,
ContainerSignatureMatcher containerSignatureMatcher,
DAGClientServer clientService, ExecutorService appCallbackExecutor) {
super(TaskSchedulerManager.class.getName());
this.appContext = appContext;
this.containerSignatureMatcher = containerSignatureMatcher;
this.clientService = clientService;
this.eventHandler = appContext.getEventHandler();
this.appCallbackExecutor = appCallbackExecutor;
this.taskSchedulers = new TaskSchedulerWrapper[]{new TaskSchedulerWrapper(taskScheduler)};
this.taskSchedulerServiceWrappers = new ServicePluginLifecycleAbstractService[]{
new ServicePluginLifecycleAbstractService<>(taskScheduler)};
this.taskSchedulerDescriptors = null;
this.webUI = null;
this.historyUrl = null;
this.isLocalMode = false;
this.hadoopShim = new HadoopShimsLoader(appContext.getAMConf()).getHadoopShim();
}
  /**
   * Creates a TaskSchedulerManager for the configured task scheduler plugins.
   *
   * @param appContext the AppContext of the running DAGAppMaster
   * @param clientService the DAGClientServer, used to obtain the client bind address
   * @param eventHandler the central dispatcher to which generated events are forwarded
   * @param containerSignatureMatcher matcher used to decide whether containers can be reused
   * @param webUI the WebUIService, or null if the web UI is disabled
   * @param schedulerDescriptors the list of scheduler descriptors. Tez internal schedulers will not
   *        have the class names populated. The list must contain at least one descriptor; the
   *        YarnTaskScheduler is the default source.
   * @param isLocalMode whether the AM is running in local mode
   * @param hadoopShim the HadoopShim matching the running Hadoop version
   */
@SuppressWarnings("rawtypes")
public TaskSchedulerManager(AppContext appContext,
DAGClientServer clientService, EventHandler eventHandler,
ContainerSignatureMatcher containerSignatureMatcher,
WebUIService webUI,
List<NamedEntityDescriptor> schedulerDescriptors,
boolean isLocalMode,
HadoopShim hadoopShim) {
super(TaskSchedulerManager.class.getName());
Preconditions.checkArgument(schedulerDescriptors != null && !schedulerDescriptors.isEmpty(),
"TaskSchedulerDescriptors must be specified");
this.appContext = appContext;
this.eventHandler = eventHandler;
this.clientService = clientService;
this.containerSignatureMatcher = containerSignatureMatcher;
this.webUI = webUI;
this.historyUrl = getHistoryUrl();
this.isLocalMode = isLocalMode;
this.hadoopShim = hadoopShim;
this.appCallbackExecutor = createAppCallbackExecutorService();
if (this.webUI != null) {
this.webUI.setHistoryUrl(this.historyUrl);
}
this.taskSchedulerDescriptors = schedulerDescriptors.toArray(new NamedEntityDescriptor[schedulerDescriptors.size()]);
taskSchedulers = new TaskSchedulerWrapper[this.taskSchedulerDescriptors.length];
taskSchedulerServiceWrappers = new ServicePluginLifecycleAbstractService[this.taskSchedulerDescriptors.length];
}
public Map<ApplicationAccessType, String> getApplicationAcls() {
return appAcls;
}
public void setSignalled(boolean isSignalled) {
this.isSignalled = isSignalled;
LOG.info("TaskScheduler notified that iSignalled was : " + isSignalled);
}
public int getNumClusterNodes() {
return cachedNodeCount;
}
public Resource getAvailableResources(int schedulerId) {
try {
return taskSchedulers[schedulerId].getAvailableResources();
} catch (Exception e) {
String msg = "Error in TaskScheduler while getting available resources"
+ ", schedule=" + Utils.getTaskSchedulerIdentifierString(schedulerId, appContext);
LOG.error(msg, e);
sendEvent(
new DAGAppMasterEventUserServiceFatalError(
DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
msg, e));
throw new RuntimeException(e);
}
}
public Resource getTotalResources(int schedulerId) {
try {
return taskSchedulers[schedulerId].getTotalResources();
} catch (Exception e) {
String msg = "Error in TaskScheduler while getting total resources"
+ ", scheduler=" + Utils.getTaskSchedulerIdentifierString(schedulerId, appContext);
LOG.error(msg, e);
sendEvent(
new DAGAppMasterEventUserServiceFatalError(
DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
msg, e));
throw new RuntimeException(e);
}
}
private ExecutorService createAppCallbackExecutorService() {
return Executors.newSingleThreadExecutor(
new ThreadFactoryBuilder().setNameFormat("TaskSchedulerAppCallbackExecutor #%d")
.setDaemon(true)
.build());
}
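  /**
   * Dispatches a single {@link AMSchedulerEvent} to the appropriate handler. Invoked from the
   * internal event-handling thread; synchronized so that scheduler callbacks and event handling
   * do not interleave.
   */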
public synchronized void handleEvent(AMSchedulerEvent sEvent) {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing the event " + sEvent.toString());
}
switch (sEvent.getType()) {
case S_TA_LAUNCH_REQUEST:
handleTaLaunchRequest((AMSchedulerEventTALaunchRequest) sEvent);
break;
case S_TA_ENDED: // TaskAttempt considered complete.
AMSchedulerEventTAEnded event = (AMSchedulerEventTAEnded)sEvent;
switch(event.getState()) {
case FAILED:
case KILLED:
handleTAUnsuccessfulEnd(event);
break;
case SUCCEEDED:
handleTASucceeded(event);
break;
default:
throw new TezUncheckedException("Unexpected TA_ENDED state: " + event.getState());
}
break;
case S_CONTAINER_DEALLOCATE:
handleContainerDeallocate((AMSchedulerEventDeallocateContainer)sEvent);
break;
case S_NODE_UNBLACKLISTED:
// fall through
case S_NODE_BLACKLISTED:
handleNodeBlacklistUpdate((AMSchedulerEventNodeBlacklistUpdate)sEvent);
break;
case S_NODE_UNHEALTHY:
break;
case S_NODE_HEALTHY:
// Consider changing this to work like BLACKLISTING.
break;
default:
break;
}
}
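  /**
   * Queues an event for asynchronous processing by the event-handling thread, logging a warning
   * when the queue is close to capacity.
   */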
@Override
public void handle(AMSchedulerEvent event) {
int qSize = eventQueue.size();
if (qSize != 0 && qSize % 1000 == 0) {
LOG.info("Size of event-queue in RMContainerAllocator is " + qSize);
}
int remCapacity = eventQueue.remainingCapacity();
if (remCapacity < 1000) {
LOG.warn("Very low remaining capacity in the event-queue "
+ "of RMContainerAllocator: " + remCapacity);
}
try {
eventQueue.put(event);
} catch (InterruptedException e) {
throw new TezUncheckedException(e);
}
}
@SuppressWarnings("unchecked")
private void sendEvent(Event<?> event) {
eventHandler.handle(event);
}
private void handleNodeBlacklistUpdate(AMSchedulerEventNodeBlacklistUpdate event) {
boolean invalidEventType = false;
try {
if (event.getType() == AMSchedulerEventType.S_NODE_BLACKLISTED) {
taskSchedulers[event.getSchedulerId()].blacklistNode(event.getNodeId());
} else if (event.getType() == AMSchedulerEventType.S_NODE_UNBLACKLISTED) {
taskSchedulers[event.getSchedulerId()].unblacklistNode(event.getNodeId());
} else {
invalidEventType = true;
}
} catch (Exception e) {
String msg = "Error in TaskScheduler for handling node blacklisting"
+ ", eventType=" + event.getType()
+ ", scheduler=" + Utils.getTaskSchedulerIdentifierString(event.getSchedulerId(), appContext);
LOG.error(msg, e);
sendEvent(
new DAGAppMasterEventUserServiceFatalError(
DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
msg, e));
return;
}
if (invalidEventType) {
throw new TezUncheckedException("Invalid event type: " + event.getType());
}
}
private void handleContainerDeallocate(
AMSchedulerEventDeallocateContainer event) {
ContainerId containerId = event.getContainerId();
    // TODO what happens to the task that was connected to this container?
    // Current assumption is that it will eventually call handleTaStopRequest.
try {
taskSchedulers[event.getSchedulerId()].deallocateContainer(containerId);
} catch (Exception e) {
String msg = "Error in TaskScheduler for handling Container De-allocation"
+ ", eventType=" + event.getType()
+ ", scheduler=" + Utils.getTaskSchedulerIdentifierString(event.getSchedulerId(), appContext)
+ ", containerId=" + containerId;
LOG.error(msg, e);
sendEvent(
new DAGAppMasterEventUserServiceFatalError(
DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
msg, e));
return;
}
// TODO does this container need to be stopped via C_STOP_REQUEST
sendEvent(new AMContainerEventStopRequest(containerId));
}
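  // Handles FAILED / KILLED task attempts: deallocates the attempt from its scheduler and, if a
  // container had been assigned, asks that container to stop and notifies its node that the
  // attempt has ended.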
private void handleTAUnsuccessfulEnd(AMSchedulerEventTAEnded event) {
TaskAttempt attempt = event.getAttempt();
// Propagate state and failure cause (if any) when informing the scheduler about the de-allocation.
boolean wasContainerAllocated = false;
try {
wasContainerAllocated = taskSchedulers[event.getSchedulerId()]
.deallocateTask(attempt, false, event.getTaskAttemptEndReason(), event.getDiagnostics());
} catch (Exception e) {
String msg = "Error in TaskScheduler for handling Task De-allocation"
+ ", eventType=" + event.getType()
+ ", scheduler=" + Utils.getTaskSchedulerIdentifierString(event.getSchedulerId(), appContext)
+ ", taskAttemptId=" + attempt.getID();
LOG.error(msg, e);
sendEvent(
new DAGAppMasterEventUserServiceFatalError(
DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
msg, e));
return;
}
// use stored value of container id in case the scheduler has removed this
// assignment because the task has been deallocated earlier.
// retroactive case
ContainerId attemptContainerId = attempt.getAssignedContainerID();
if(!wasContainerAllocated) {
LOG.info("Task: " + attempt.getID() +
" has no container assignment in the scheduler");
if (attemptContainerId != null) {
LOG.error("No container allocated to task: " + attempt.getID()
+ " according to scheduler. Task reported container id: "
+ attemptContainerId);
}
}
if (attemptContainerId != null) {
      // TODO either way, send the necessary events
// Ask the container to stop.
sendEvent(new AMContainerEventStopRequest(attemptContainerId));
// Inform the Node - the task has asked to be STOPPED / has already
// stopped.
// AMNodeImpl blacklisting logic does not account for KILLED attempts.
sendEvent(new AMNodeEventTaskAttemptEnded(appContext.getAllContainers().
get(attemptContainerId).getContainer().getNodeId(), event.getSchedulerId(),
attemptContainerId,
attempt.getID(), event.getState() == TaskAttemptState.FAILED));
}
}
private void handleTASucceeded(AMSchedulerEventTAEnded event) {
TaskAttempt attempt = event.getAttempt();
ContainerId usedContainerId = event.getUsedContainerId();
// This could be null if a task fails / is killed before a container is
// assigned to it.
if (event.getUsedContainerId() != null) {
sendEvent(new AMContainerEventTASucceeded(usedContainerId,
event.getAttemptID()));
sendEvent(new AMNodeEventTaskAttemptSucceeded(appContext.getAllContainers().
get(usedContainerId).getContainer().getNodeId(), event.getSchedulerId(), usedContainerId,
event.getAttemptID()));
}
boolean wasContainerAllocated = false;
try {
wasContainerAllocated = taskSchedulers[event.getSchedulerId()].deallocateTask(attempt,
true, null, event.getDiagnostics());
} catch (Exception e) {
String msg = "Error in TaskScheduler for handling Task De-allocation"
+ ", eventType=" + event.getType()
+ ", scheduler=" + Utils.getTaskSchedulerIdentifierString(event.getSchedulerId(), appContext)
+ ", taskAttemptId=" + attempt.getID();
LOG.error(msg, e);
sendEvent(
new DAGAppMasterEventUserServiceFatalError(
DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
msg, e));
return;
}
if (!wasContainerAllocated) {
LOG.error("De-allocated successful task: " + attempt.getID()
+ ", but TaskScheduler reported no container assigned to task");
}
}
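  // Allocation requests are routed based on the location hint: if the hint carries a task-based
  // affinity and the affinitized task already has a successful attempt, allocation is requested
  // against that attempt's container; otherwise plain host / rack hints (possibly null) are used.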
private void handleTaLaunchRequest(AMSchedulerEventTALaunchRequest event) {
TaskAttempt taskAttempt = event.getTaskAttempt();
TaskLocationHint locationHint = event.getLocationHint();
String hosts[] = null;
String racks[] = null;
if (locationHint != null) {
TaskBasedLocationAffinity taskAffinity = locationHint.getAffinitizedTask();
if (taskAffinity != null) {
Vertex vertex = appContext.getCurrentDAG().getVertex(taskAffinity.getVertexName());
Preconditions.checkNotNull(vertex, "Invalid vertex in task based affinity " + taskAffinity
+ " for attempt: " + taskAttempt.getID());
int taskIndex = taskAffinity.getTaskIndex();
Preconditions.checkState(taskIndex >=0 && taskIndex < vertex.getTotalTasks(),
"Invalid taskIndex in task based affinity " + taskAffinity
+ " for attempt: " + taskAttempt.getID());
TaskAttempt affinityAttempt = vertex.getTask(taskIndex).getSuccessfulAttempt();
if (affinityAttempt != null) {
Preconditions.checkNotNull(affinityAttempt.getAssignedContainerID(), affinityAttempt.getID());
try {
taskSchedulers[event.getSchedulerId()].allocateTask(taskAttempt,
event.getCapability(),
affinityAttempt.getAssignedContainerID(),
Priority.newInstance(event.getPriority()),
event.getContainerContext(),
event);
} catch (Exception e) {
String msg = "Error in TaskScheduler for handling Task Allocation"
+ ", eventType=" + event.getType()
+ ", scheduler=" + Utils.getTaskSchedulerIdentifierString(event.getSchedulerId(), appContext)
+ ", taskAttemptId=" + taskAttempt.getID();
LOG.error(msg, e);
sendEvent(
new DAGAppMasterEventUserServiceFatalError(
DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
msg, e));
}
return;
}
LOG.info("No attempt for task affinity to " + taskAffinity + " for attempt "
+ taskAttempt.getID() + " Ignoring.");
// fall through with null hosts/racks
} else {
hosts = (locationHint.getHosts() != null) ? locationHint
.getHosts().toArray(
new String[locationHint.getHosts().size()]) : null;
racks = (locationHint.getRacks() != null) ? locationHint.getRacks()
.toArray(new String[locationHint.getRacks().size()]) : null;
}
}
try {
taskSchedulers[event.getSchedulerId()].allocateTask(taskAttempt,
event.getCapability(),
hosts,
racks,
Priority.newInstance(event.getPriority()),
event.getContainerContext(),
event);
} catch (Exception e) {
String msg = "Error in TaskScheduler for handling Task Allocation"
+ ", eventType=" + event.getType()
+ ", scheduler=" + Utils.getTaskSchedulerIdentifierString(event.getSchedulerId(), appContext)
+ ", taskAttemptId=" + taskAttempt.getID();
LOG.error(msg, e);
sendEvent(
new DAGAppMasterEventUserServiceFatalError(
DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
msg, e));
}
}
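  // Scheduler instances are selected by plugin name: the Tez YARN plugin name maps to
  // YarnTaskSchedulerService, the Tez "uber" (local) plugin name maps to
  // LocalTaskSchedulerService, and anything else is loaded reflectively as a custom plugin.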
@VisibleForTesting
TaskScheduler createTaskScheduler(String host, int port, String trackingUrl,
AppContext appContext,
NamedEntityDescriptor taskSchedulerDescriptor,
long customAppIdIdentifier,
int schedulerId) throws TezException {
TaskSchedulerContext rawContext =
new TaskSchedulerContextImpl(this, appContext, schedulerId, trackingUrl,
customAppIdIdentifier, host, port, taskSchedulerDescriptor.getUserPayload());
TaskSchedulerContext wrappedContext = wrapTaskSchedulerContext(rawContext);
String schedulerName = taskSchedulerDescriptor.getEntityName();
if (schedulerName.equals(TezConstants.getTezYarnServicePluginName())) {
return createYarnTaskScheduler(wrappedContext, schedulerId);
} else if (schedulerName.equals(TezConstants.getTezUberServicePluginName())) {
return createUberTaskScheduler(wrappedContext, schedulerId);
} else {
return createCustomTaskScheduler(wrappedContext, taskSchedulerDescriptor, schedulerId);
}
}
@VisibleForTesting
TaskSchedulerContext wrapTaskSchedulerContext(TaskSchedulerContext rawContext) {
return new TaskSchedulerContextImplWrapper(rawContext, appCallbackExecutor);
}
@VisibleForTesting
TaskScheduler createYarnTaskScheduler(TaskSchedulerContext taskSchedulerContext,
int schedulerId) {
LOG.info("Creating TaskScheduler: YarnTaskSchedulerService");
return new YarnTaskSchedulerService(taskSchedulerContext);
}
@VisibleForTesting
TaskScheduler createUberTaskScheduler(TaskSchedulerContext taskSchedulerContext,
int schedulerId) {
LOG.info("Creating TaskScheduler: Local TaskScheduler with clusterIdentifier={}",
taskSchedulerContext.getCustomClusterIdentifier());
return new LocalTaskSchedulerService(taskSchedulerContext);
}
@SuppressWarnings("unchecked")
TaskScheduler createCustomTaskScheduler(TaskSchedulerContext taskSchedulerContext,
NamedEntityDescriptor taskSchedulerDescriptor,
int schedulerId) throws TezException {
LOG.info("Creating custom TaskScheduler {}:{} with clusterIdentifier={}", taskSchedulerDescriptor.getEntityName(),
taskSchedulerDescriptor.getClassName(), taskSchedulerContext.getCustomClusterIdentifier());
return ReflectionUtils.createClazzInstance(taskSchedulerDescriptor.getClassName(),
new Class[]{TaskSchedulerContext.class},
new Object[]{taskSchedulerContext});
}
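  // Each scheduler gets a cluster identifier for the container/app IDs it generates. The YARN
  // scheduler (and the uber scheduler in local mode) reuses the real application's cluster
  // timestamp; other schedulers get a synthetic identifier so that container IDs from different
  // sources do not conflict. For illustration only: with the constants above, the first custom
  // scheduler would get 111101111, the second 222212222 (111101111 + 111111111), the third
  // 333323333, and so on.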
@VisibleForTesting
protected void instantiateSchedulers(String host, int port, String trackingUrl,
AppContext appContext) throws TezException {
// Iterate over the list and create all the taskSchedulers
int j = 0;
for (int i = 0; i < taskSchedulerDescriptors.length; i++) {
long customAppIdIdentifier;
if ((isLocalMode && taskSchedulerDescriptors[i].getEntityName()
.equals(TezConstants.getTezUberServicePluginName()) ||
taskSchedulerDescriptors[i].getEntityName()
.equals(TezConstants.getTezYarnServicePluginName()))) {
// Use the provided appId instead of constructing one for containers.
customAppIdIdentifier = appContext.getApplicationID().getClusterTimestamp();
} else {
customAppIdIdentifier = SCHEDULER_APP_ID_BASE + (j++ * SCHEDULER_APP_ID_INCREMENT);
}
taskSchedulers[i] = new TaskSchedulerWrapper(createTaskScheduler(host, port,
trackingUrl, appContext, taskSchedulerDescriptors[i], customAppIdIdentifier, i));
taskSchedulerServiceWrappers[i] =
new ServicePluginLifecycleAbstractService<>(taskSchedulers[i].getTaskScheduler());
}
}
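  // Start-up sequence: instantiate the configured schedulers, init/start each service wrapper
  // (re-applying the unregister flag if it was set before the schedulers existed), then start the
  // event-handling thread that drains the event queue.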
@Override
public synchronized void serviceStart() throws Exception {
InetSocketAddress serviceAddr = clientService.getBindAddress();
dagAppMaster = appContext.getAppMaster();
    // If the web service is enabled then set the tracking url, else disable it (value = "").
    // The actual url set on the RM web UI will be the proxy url set by WebAppProxyServlet, which
    // always tries to connect to the AM and proxy the response. Hence it won't work if the
    // webUIService is not enabled.
String trackingUrl = (webUI != null) ? webUI.getTrackingURL() : "";
instantiateSchedulers(serviceAddr.getHostName(), serviceAddr.getPort(), trackingUrl, appContext);
for (int i = 0 ; i < taskSchedulers.length ; i++) {
taskSchedulerServiceWrappers[i].init(getConfig());
taskSchedulerServiceWrappers[i].start();
if (shouldUnregisterFlag.get()) {
// Flag may have been set earlier when task scheduler was not initialized
// External services could need to talk to some other entity.
taskSchedulers[i].setShouldUnregister();
}
}
this.eventHandlingThread = new Thread("TaskSchedulerEventHandlerThread") {
@Override
public void run() {
AMSchedulerEvent event;
while (!stopEventHandling && !Thread.currentThread().isInterrupted()) {
try {
if (TaskSchedulerManager.this.eventQueue.peek() == null) {
notifyForTest();
}
event = TaskSchedulerManager.this.eventQueue.take();
} catch (InterruptedException e) {
if(!stopEventHandling) {
LOG.warn("Continuing after interrupt : ", e);
}
continue;
}
try {
handleEvent(event);
} catch (Throwable t) {
LOG.error("Error in handling event type " + event.getType()
+ " to the TaskScheduler", t);
// Kill the AM.
sendEvent(new DAGAppMasterEvent(DAGAppMasterEventType.INTERNAL_ERROR));
return;
} finally {
notifyForTest();
}
}
}
};
this.eventHandlingThread.start();
}
protected void notifyForTest() {
}
public void initiateStop() {
for (int i = 0 ; i < taskSchedulers.length ; i++) {
try {
taskSchedulers[i].getTaskScheduler().initiateStop();
} catch (Exception e) {
        // Ignore for now, since scheduler stop is invoked anyway on shutdown
LOG.error("Failed to do a clean initiateStop for Scheduler: "
+ Utils.getTaskSchedulerIdentifierString(i, appContext), e);
}
}
}
@Override
public void serviceStop() throws InterruptedException {
synchronized(this) {
this.stopEventHandling = true;
if (eventHandlingThread != null)
eventHandlingThread.interrupt();
}
for (int i = 0 ; i < taskSchedulers.length ; i++) {
if (taskSchedulers[i] != null) {
taskSchedulerServiceWrappers[i].stop();
}
}
LOG.info("Shutting down AppCallbackExecutor");
appCallbackExecutor.shutdownNow();
    appCallbackExecutor.awaitTermination(1000L, TimeUnit.MILLISECONDS);
}
// TaskSchedulerAppCallback methods with schedulerId, where relevant
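  /**
   * Callback from a scheduler when a container has been allocated for a task. Registers the
   * container and its node if they have not been seen before, requests a container launch if the
   * container is still in the ALLOCATED state, and finally assigns the task attempt to it.
   */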
public synchronized void taskAllocated(int schedulerId, Object task,
Object appCookie,
Container container) {
AMSchedulerEventTALaunchRequest event =
(AMSchedulerEventTALaunchRequest) appCookie;
ContainerId containerId = container.getId();
if (appContext.getAllContainers()
.addContainerIfNew(container, schedulerId, event.getLauncherId(),
event.getTaskCommId())) {
appContext.getNodeTracker().nodeSeen(container.getNodeId(), schedulerId);
sendEvent(new AMNodeEventContainerAllocated(container
.getNodeId(), schedulerId, container.getId()));
}
TaskAttempt taskAttempt = event.getTaskAttempt();
// TODO - perhaps check if the task still needs this container
// because the deallocateTask downcall may have raced with the
// taskAllocated() upcall
assert task.equals(taskAttempt);
if (appContext.getAllContainers().get(containerId).getState() == AMContainerState.ALLOCATED) {
sendEvent(new AMContainerEventLaunchRequest(containerId, taskAttempt.getVertexID(),
event.getContainerContext(), event.getLauncherId(), event.getTaskCommId()));
}
sendEvent(new AMContainerEventAssignTA(containerId, taskAttempt.getID(),
event.getRemoteTaskSpec(), event.getContainerContext().getLocalResources(), event
.getContainerContext().getCredentials(), event.getPriority()));
}
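  // Callback from a scheduler when a container has completed. The YARN exit status is mapped to a
  // TaskAttemptTerminationCause (external preemption, disk failure, generic exit) before the
  // AMContainer is notified.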
public synchronized void containerCompleted(int schedulerId, Object task, ContainerStatus containerStatus) {
// SchedulerId isn't used here since no node updates are sent out
// Inform the Containers about completion.
AMContainer amContainer = appContext.getAllContainers().get(containerStatus.getContainerId());
if (amContainer != null) {
String message = "Container completed. ";
TaskAttemptTerminationCause errCause = TaskAttemptTerminationCause.CONTAINER_EXITED;
int exitStatus = containerStatus.getExitStatus();
if (exitStatus == ContainerExitStatus.PREEMPTED) {
message = "Container preempted externally. ";
errCause = TaskAttemptTerminationCause.EXTERNAL_PREEMPTION;
} else if (exitStatus == ContainerExitStatus.DISKS_FAILED) {
message = "Container disk failed. ";
errCause = TaskAttemptTerminationCause.NODE_DISK_ERROR;
} else if (exitStatus != ContainerExitStatus.SUCCESS){
message = "Container failed, exitCode=" + exitStatus + ". ";
}
if (containerStatus.getDiagnostics() != null) {
message += containerStatus.getDiagnostics();
}
sendEvent(new AMContainerEventCompleted(amContainer.getContainerId(), exitStatus, message, errCause));
}
}
public synchronized void containerBeingReleased(int schedulerId, ContainerId containerId) {
// SchedulerId isn't used here since no node updates are sent out
AMContainer amContainer = appContext.getAllContainers().get(containerId);
if (amContainer != null) {
sendEvent(new AMContainerEventStopRequest(containerId));
}
}
@SuppressWarnings("unchecked")
public synchronized void nodesUpdated(int schedulerId, List<NodeReport> updatedNodes) {
for (NodeReport nr : updatedNodes) {
// Scheduler will find out from the node, if at all.
// Relying on the RM to not allocate containers on an unhealthy node.
eventHandler.handle(new AMNodeEventStateChanged(nr, schedulerId));
}
}
public synchronized void appShutdownRequested(int schedulerId) {
// This can happen if the RM has been restarted. If it is in that state,
// this application must clean itself up.
LOG.info("App shutdown requested by scheduler {}", schedulerId);
sendEvent(new DAGAppMasterEvent(DAGAppMasterEventType.AM_REBOOT));
}
public synchronized void setApplicationRegistrationData(
int schedulerId,
Resource maxContainerCapability,
Map<ApplicationAccessType, String> appAcls,
ByteBuffer clientAMSecretKey) {
this.appContext.getClusterInfo().setMaxContainerCapability(
maxContainerCapability);
this.appAcls = appAcls;
this.clientService.setClientAMSecretKey(clientAMSecretKey);
}
  // Not synchronized to avoid deadlocks from TaskScheduler callbacks.
  // TaskScheduler uses a separate thread for its callbacks. Since this method
  // returns a value which is required, the TaskScheduler waits for the call to
  // complete, which can lead to a deadlock if called from within a TSEH lock.
public AppFinalStatus getFinalAppStatus() {
FinalApplicationStatus finishState = FinalApplicationStatus.UNDEFINED;
StringBuffer sb = new StringBuffer();
if (dagAppMaster == null) {
finishState = FinalApplicationStatus.UNDEFINED;
sb.append("App not yet initialized");
} else {
DAGAppMasterState appMasterState = dagAppMaster.getState();
if (appMasterState == DAGAppMasterState.SUCCEEDED) {
finishState = FinalApplicationStatus.SUCCEEDED;
} else if (appMasterState == DAGAppMasterState.KILLED
|| (appMasterState == DAGAppMasterState.RUNNING && isSignalled)) {
finishState = FinalApplicationStatus.KILLED;
} else if (appMasterState == DAGAppMasterState.FAILED
|| appMasterState == DAGAppMasterState.ERROR) {
finishState = FinalApplicationStatus.FAILED;
} else {
finishState = FinalApplicationStatus.UNDEFINED;
}
finishState = hadoopShim.applyFinalApplicationStatusCorrection(finishState,
dagAppMaster.isSession(), appMasterState == DAGAppMasterState.ERROR);
List<String> diagnostics = dagAppMaster.getDiagnostics();
if(diagnostics != null) {
for (String s : diagnostics) {
sb.append(s).append("\n");
}
}
}
if(LOG.isDebugEnabled()) {
LOG.debug("Setting job diagnostics to " + sb.toString());
}
    // If the history url is set, use it; if historyUrl is "" the RM UI disables the
    // history link.
return new AppFinalStatus(finishState, sb.toString(), historyUrl);
}
  // Not synchronized to avoid deadlocks from TaskScheduler callbacks.
  // TaskScheduler uses a separate thread for its callbacks. Since this method
  // returns a value which is required, the TaskScheduler waits for the call to
  // complete, which can lead to a deadlock if called from within a TSEH lock.
public float getProgress(int schedulerId) {
    // At this point allocate has been called, so the node count must be available.
    // May change after YARN-1722.
    // This is a heartbeat from the scheduler into the app, and is being used to piggy-back
    // node updates from the cluster.
    // Doubles as a mechanism to update node counts periodically. Hence schedulerId is required.
    // TODO Handle this in TEZ-2124. Need a way to know which scheduler is calling in.
int nodeCount = 0;
try {
nodeCount = taskSchedulers[0].getClusterNodeCount();
} catch (Exception e) {
String msg = "Error in TaskScheduler while getting node count"
+ ", scheduler=" + Utils.getTaskSchedulerIdentifierString(schedulerId, appContext);
LOG.error(msg, e);
sendEvent(
new DAGAppMasterEventUserServiceFatalError(
DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
msg, e));
throw new RuntimeException(e);
}
if (nodeCount != cachedNodeCount) {
cachedNodeCount = nodeCount;
sendEvent(new AMNodeEventNodeCountUpdated(cachedNodeCount, schedulerId));
}
return dagAppMaster.getProgress();
}
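  // Errors reported by schedulers are handled in three ways: a RESOURCEMANAGER_ERROR from the
  // default YARN scheduler triggers a scheduling-service error on the DAGAppMaster, any PERMANENT
  // plugin error is treated as fatal for the user service, and everything else is reported as a
  // non-fatal service error scoped to the current DAG.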
public void reportError(int taskSchedulerIndex, ServicePluginError servicePluginError,
String diagnostics,
DagInfo dagInfo) {
if (servicePluginError == YarnTaskSchedulerServiceError.RESOURCEMANAGER_ERROR) {
LOG.info("Error reported by scheduler {} - {}",
Utils.getTaskSchedulerIdentifierString(taskSchedulerIndex, appContext) + ": " +
diagnostics);
if (taskSchedulerDescriptors[taskSchedulerIndex].getClassName()
.equals(YarnTaskSchedulerService.class.getName())) {
LOG.warn(
"Reporting a SchedulerServiceError to the DAGAppMaster since the error" +
" was reported by the default YARN Task Scheduler");
sendEvent(new DAGAppMasterEventSchedulingServiceError(diagnostics));
}
} else if (servicePluginError.getErrorType() == ServicePluginError.ErrorType.PERMANENT) {
String msg = "Fatal error reported by TaskScheduler"
+ ", scheduler=" + Utils.getTaskSchedulerIdentifierString(taskSchedulerIndex, appContext)
+ ", servicePluginError=" + servicePluginError
+ ", diagnostics= " + (diagnostics == null ? "" : diagnostics);
LOG.error(msg);
sendEvent(
new DAGAppMasterEventUserServiceFatalError(
DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
msg, null));
} else {
Utils.processNonFatalServiceErrorReport(
Utils.getTaskSchedulerIdentifierString(taskSchedulerIndex, appContext),
servicePluginError, diagnostics, dagInfo,
appContext, "TaskScheduler");
}
}
public void dagCompleted() {
for (int i = 0 ; i < taskSchedulers.length ; i++) {
try {
taskSchedulers[i].dagComplete();
} catch (Exception e) {
String msg = "Error in TaskScheduler when notified for Dag Completion"
+ ", scheduler=" + Utils.getTaskSchedulerIdentifierString(i, appContext);
LOG.error(msg, e);
sendEvent(
new DAGAppMasterEventUserServiceFatalError(
DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
msg, e));
}
}
}
public void dagSubmitted() {
// Nothing to do right now. Indicates that a new DAG has been submitted and
// the context has updated information.
}
public void preemptContainer(int schedulerId, ContainerId containerId) {
// TODO Why is this making a call back into the scheduler, when the call is originating from there.
// An AMContainer instance should already exist if an attempt is being made to preempt it
AMContainer amContainer = appContext.getAllContainers().get(containerId);
try {
taskSchedulers[amContainer.getTaskSchedulerIdentifier()].deallocateContainer(containerId);
} catch (Exception e) {
String msg = "Error in TaskScheduler when preempting container"
+ ", scheduler=" + Utils.getTaskSchedulerIdentifierString(amContainer.getTaskSchedulerIdentifier(), appContext)
+ ", containerId=" + containerId;
LOG.error(msg, e);
sendEvent(
new DAGAppMasterEventUserServiceFatalError(
DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
msg, e));
}
// Inform the Containers about completion.
sendEvent(new AMContainerEventCompleted(containerId, ContainerExitStatus.INVALID,
"Container preempted internally", TaskAttemptTerminationCause.INTERNAL_PREEMPTION));
}
public void setShouldUnregisterFlag() {
LOG.info("TaskScheduler notified that it should unregister from RM");
this.shouldUnregisterFlag.set(true);
for (int i = 0 ; i < taskSchedulers.length ; i++) {
if (this.taskSchedulers[i] != null) {
try {
this.taskSchedulers[i].setShouldUnregister();
} catch (Exception e) {
String msg = "Error in TaskScheduler when setting Unregister Flag"
+ ", scheduler=" + Utils.getTaskSchedulerIdentifierString(i, appContext);
LOG.error(msg, e);
sendEvent(
new DAGAppMasterEventUserServiceFatalError(
DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
msg, e));
}
}
}
}
public ContainerSignatureMatcher getContainerSignatureMatcher() {
return containerSignatureMatcher;
}
public boolean hasUnregistered() {
boolean result = true;
for (int i = 0 ; i < taskSchedulers.length ; i++) {
      // Any exception from this API is treated as a fatal service error,
      // since there is no clear route to recover from a failed unregister check.
try {
result = result & this.taskSchedulers[i].hasUnregistered();
} catch (Exception e) {
String msg = "Error in TaskScheduler when checking if a scheduler has unregistered"
+ ", scheduler=" + Utils.getTaskSchedulerIdentifierString(i, appContext);
LOG.error(msg, e);
sendEvent(
new DAGAppMasterEventUserServiceFatalError(
DAGAppMasterEventType.TASK_SCHEDULER_SERVICE_FATAL_ERROR,
msg, e));
}
      if (!result) {
        return result;
      }
}
return result;
}
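  /**
   * Builds the history URL advertised to YARN by substituting the application id and the
   * configured history URL base into the configured template, collapsing duplicate slashes and
   * defaulting to an http:// scheme. Returns an empty string when either the template or the base
   * is unset.
   *
   * For illustration only (values are hypothetical): a template of
   * "__HISTORY_URL_BASE__/#/tez-app/__APPLICATION_ID__" with a base of
   * "http://ui.example.com:9999/tez-ui/" and application id "application_1452000000000_0001"
   * resolves to "http://ui.example.com:9999/tez-ui/#/tez-app/application_1452000000000_0001".
   */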
@VisibleForTesting
public String getHistoryUrl() {
Configuration config = this.appContext.getAMConf();
String historyUrl = "";
String historyUrlTemplate = config.get(TezConfiguration.TEZ_AM_TEZ_UI_HISTORY_URL_TEMPLATE,
TezConfiguration.TEZ_AM_TEZ_UI_HISTORY_URL_TEMPLATE_DEFAULT);
String historyUrlBase = config.get(TezConfiguration.TEZ_HISTORY_URL_BASE, "");
if (!historyUrlTemplate.isEmpty() &&
!historyUrlBase.isEmpty()) {
      // Replace the placeholders, while tolerating extra or missing "/" in the input. Replace all
      // runs of consecutive "/" with a single "/" (except for the "http(s)://" case).
historyUrl = historyUrlTemplate
.replaceAll(APPLICATION_ID_PLACEHOLDER, appContext.getApplicationID().toString())
.replaceAll(HISTORY_URL_BASE, historyUrlBase)
.replaceAll("([^:])/{2,}", "$1/");
// make sure we have a valid scheme
if (!historyUrl.startsWith("http")) {
historyUrl = "http://" + historyUrl;
}
}
return historyUrl;
}
public String getTaskSchedulerClassName(int taskSchedulerIndex) {
return taskSchedulers[taskSchedulerIndex].getTaskScheduler().getClass().getName();
}
}