blob: 58795d707041a728768e771f3d7ef721b1e76cad [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tez.dag.app.rm;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.YarnRuntimeException;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.AMRMClient.StoredContainerRequest;
import org.apache.hadoop.yarn.client.AMRMClientAsync;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.RackResolver;
import org.apache.tez.dag.api.TezException;
import org.apache.tez.dag.app.rm.TaskScheduler.TaskSchedulerAppCallback.AppFinalStatus;
import com.google.common.annotations.VisibleForTesting;
/* TODO not yet updating cluster nodes on every allocate response
* from RMContainerRequestor
import org.apache.tez.dag.app.rm.node.AMNodeEventNodeCountUpdated;
if (clusterNmCount != lastClusterNmCount) {
LOG.info("Num cluster nodes changed from " + lastClusterNmCount + " to "
+ clusterNmCount);
eventHandler.handle(new AMNodeEventNodeCountUpdated(clusterNmCount));
}
*/
public class TaskScheduler extends AbstractService
implements AMRMClientAsync.CallbackHandler {
private static final Log LOG = LogFactory.getLog(TaskScheduler.class);
public interface TaskSchedulerAppCallback {
public class AppFinalStatus {
public final FinalApplicationStatus exitStatus;
public final String exitMessage;
public final String postCompletionTrackingUrl;
public AppFinalStatus(FinalApplicationStatus exitStatus,
String exitMessage,
String posCompletionTrackingUrl) {
this.exitStatus = exitStatus;
this.exitMessage = exitMessage;
this.postCompletionTrackingUrl = posCompletionTrackingUrl;
}
}
// upcall to app must be outside locks
public void taskAllocated(Object task,
Object appCookie,
Container container);
// this may end up being called for a task+container pair that the app
// has not heard about. this can happen because of a race between
// taskAllocated() upcall and deallocateTask() downcall
public void containerCompleted(Object taskLastAllocated,
ContainerStatus containerStatus);
public void nodesUpdated(List<NodeReport> updatedNodes);
public void appRebootRequested();
public void setApplicationRegistrationData(
Resource minContainerCapability,
Resource maxContainerCapability,
Map<ApplicationAccessType, String> appAcls
);
public void onError(Exception e);
public float getProgress();
public AppFinalStatus getFinalAppStatus();
}
final AMRMClientAsync<CookieContainerRequest> amRmClient;
final TaskSchedulerAppCallback appClient;
Map<Object, CookieContainerRequest> taskRequests =
new HashMap<Object, CookieContainerRequest>();
Map<Object, Container> taskAllocations =
new HashMap<Object, Container>();
Map<ContainerId, Object> containerAssigments =
new HashMap<ContainerId, Object>();
HashMap<ContainerId, Object> releasedContainers =
new HashMap<ContainerId, Object>();
final String appHostName;
final int appHostPort;
final String appTrackingUrl;
boolean isStopped = false;
class CRCookie {
Object task;
Object appCookie;
}
class CookieContainerRequest extends StoredContainerRequest {
CRCookie cookie;
public CookieContainerRequest(Resource capability, String[] hosts,
String[] racks, Priority priority, CRCookie cookie) {
super(capability, hosts, racks, priority);
this.cookie = cookie;
}
CRCookie getCookie() {
return cookie;
}
}
public TaskScheduler(ApplicationAttemptId id,
TaskSchedulerAppCallback appClient,
String appHostName,
int appHostPort,
String appTrackingUrl) {
super(TaskScheduler.class.getName());
this.appClient = appClient;
this.amRmClient =
new AMRMClientAsync<CookieContainerRequest>(id, 1000, this);
this.appHostName = appHostName;
this.appHostPort = appHostPort;
this.appTrackingUrl = appTrackingUrl;
}
@Private
@VisibleForTesting
TaskScheduler(ApplicationAttemptId id,
TaskSchedulerAppCallback appClient,
String appHostName,
int appHostPort,
String appTrackingUrl,
AMRMClientAsync<CookieContainerRequest> client) {
super(TaskScheduler.class.getName());
this.appClient = appClient;
this.amRmClient = client;
this.appHostName = appHostName;
this.appHostPort = appHostPort;
this.appTrackingUrl = appTrackingUrl;
}
public Resource getClusterAvailableResources() {
return amRmClient.getClusterAvailableResources();
}
public int getClusterNodeCount() {
return amRmClient.getClusterNodeCount();
}
// AbstractService methods
@Override
public synchronized void init(Configuration conf) {
super.init(conf);
amRmClient.init(conf);
}
@Override
public void start() {
try {
RegisterApplicationMasterResponse response = null;
synchronized (this) {
amRmClient.start();
super.start();
response = amRmClient.registerApplicationMaster(appHostName,
appHostPort,
appTrackingUrl);
}
// upcall to app outside locks
appClient.setApplicationRegistrationData(
response.getMinimumResourceCapability(),
response.getMaximumResourceCapability(),
response.getApplicationACLs());
} catch (YarnException e) {
LOG.error("Yarn Exception while registering", e);
throw new YarnRuntimeException(e);
} catch (IOException e) {
LOG.error("IO Exception while registering", e);
throw new YarnRuntimeException(e);
}
}
@Override
public void stop() {
// upcall to app outside of locks
AppFinalStatus status = appClient.getFinalAppStatus();
try {
// TODO TEZ-36 dont unregister automatically after reboot sent by RM
synchronized (this) {
isStopped = true;
amRmClient.unregisterApplicationMaster(status.exitStatus,
status.exitMessage,
status.postCompletionTrackingUrl);
}
// call client.stop() without lock client will attempt to stop the callback
// operation and at the same time the callback operation might be trying
// to get our lock.
amRmClient.stop();
super.stop();
} catch (YarnException e) {
LOG.error("Yarn Exception while unregistering ", e);
throw new TezException(e);
} catch (IOException e) {
LOG.error("IOException while unregistering ", e);
throw new TezException(e);
}
}
// AMRMClientAsync interface methods
@Override
public void onContainersCompleted(List<ContainerStatus> statuses) {
if(isStopped) {
return;
}
Map<Object, ContainerStatus> appContainerStatus =
new HashMap<Object, ContainerStatus>(statuses.size());
synchronized (this) {
for(ContainerStatus containerStatus : statuses) {
ContainerId completedId = containerStatus.getContainerId();
Object task = releasedContainers.remove(completedId);
if(task != null){
// TODO later we may want to check if exit code matched expectation
// e.g. successful container should not come back fail exit code after
// being released
// completion of a container we had released earlier
// an allocated container completed. notify app
LOG.info("Released container completed:" + completedId +
" last allocated to task: " + task);
appContainerStatus.put(task, containerStatus);
continue;
}
// not found in released containers. check currently allocated containers
// no need to release this container as the RM has already completed it
task = unAssignContainer(completedId, false);
if(task != null) {
// completion of a container we have allocated currently
// an allocated container completed. notify app
LOG.info("Allocated container completed:" + completedId +
" last allocated to task: " + task);
appContainerStatus.put(task, containerStatus);
continue;
}
// container neither allocated nor released
LOG.info("Ignoring unknown container: " + containerStatus.getContainerId());
}
}
// upcall to app must be outside locks
for(Entry<Object, ContainerStatus> entry : appContainerStatus.entrySet()) {
appClient.containerCompleted(entry.getKey(), entry.getValue());
}
}
@Override
public void onContainersAllocated(List<Container> containers) {
if(isStopped) {
return;
}
Map<CookieContainerRequest, Container> appContainers =
new HashMap<CookieContainerRequest, Container>(containers.size());
synchronized (this) {
for(Container container : containers) {
String location = container.getNodeId().getHost();
CookieContainerRequest assigned = getMatchingRequest(container, location);
if(assigned == null) {
location = RackResolver.resolve(location).getNetworkLocation();
assigned = getMatchingRequest(container, location);
}
if(assigned == null) {
location = ResourceRequest.ANY;
assigned = getMatchingRequest(container, location);
}
if(assigned == null) {
// not matched anything. release container
// Probably we cancelled a request and RM allocated that to us
// before RM heard of the cancellation
releaseContainer(container.getId(), null);
LOG.info("No RM requests matching container: " + container);
continue;
}
Object task = getTask(assigned);
assert task != null;
assignContainer(task, container, assigned);
appContainers.put(assigned, container);
LOG.info("Assigning container: " + container +
" for task: " + task +
" at locality: " + location);
}
}
// upcall to app must be outside locks
for (Entry<CookieContainerRequest, Container> entry :
appContainers.entrySet()) {
CookieContainerRequest assigned = entry.getKey();
appClient.taskAllocated(getTask(assigned), assigned.getCookie().appCookie,
entry.getValue());
}
}
@Override
public void onRebootRequest() {
if(isStopped) {
return;
}
// upcall to app must be outside locks
appClient.appRebootRequested();
}
@Override
public void onNodesUpdated(List<NodeReport> updatedNodes) {
if(isStopped) {
return;
}
// ignore bad nodes for now
// upcall to app must be outside locks
appClient.nodesUpdated(updatedNodes);
}
@Override
public float getProgress() {
if(isStopped) {
return 1;
}
return appClient.getProgress();
}
@Override
public void onError(Exception e) {
if(isStopped) {
return;
}
appClient.onError(e);
}
public synchronized void allocateTask(Object task,
Resource capability,
String[] hosts,
String[] racks,
Priority priority,
Object clientCookie) {
// TODO check for nulls etc
// TODO extra memory allocation
CRCookie cookie = new CRCookie();
cookie.task = task;
cookie.appCookie = clientCookie;
CookieContainerRequest request =
new CookieContainerRequest(capability,
hosts,
racks,
priority,
cookie);
addTaskRequest(task, request);
LOG.info("Allocation request for task: " + task +
" with request: " + request);
}
public synchronized Container deallocateTask(Object task) {
CookieContainerRequest request = removeTaskRequest(task);
if(request != null) {
// task not allocated yet
LOG.info("Deallocating task: " + task + " before allocation");
return null;
}
// task request not present. Look in allocations
Container container = unAssignContainer(task, true);
if(container != null) {
LOG.info("Deallocated task: " + task +
" from container: " + container.getId());
return container;
}
// task neither requested nor allocated.
LOG.info("Ignoring removal of unknown task: " + task);
return null;
}
public synchronized Object deallocateContainer(ContainerId containerId) {
Object task = unAssignContainer(containerId, true);
if(task != null) {
LOG.info("Deallocated container: " + containerId +
" from task: " + task);
return task;
}
LOG.info("Ignoring dealloction of unknown container: " + containerId);
return null;
}
private CookieContainerRequest getMatchingRequest(
Container container, String location) {
Priority priority = container.getPriority();
Resource capability = container.getResource();
CookieContainerRequest assigned = null;
List<? extends Collection<CookieContainerRequest>> requestsList =
amRmClient.getMatchingRequests(priority, location, capability);
if(requestsList.size() > 0) {
// pick first one
for(Collection<CookieContainerRequest> requests : requestsList) {
Iterator<CookieContainerRequest> iterator = requests.iterator();
if(iterator.hasNext()) {
assigned = requests.iterator().next();
}
}
}
return assigned;
}
private Object getTask(CookieContainerRequest request) {
return request.getCookie().task;
}
private void releaseContainer(ContainerId containerId, Object task) {
amRmClient.releaseAssignedContainer(containerId);
if(task != null) {
releasedContainers.put(containerId, task);
}
}
private void assignContainer(Object task,
Container container,
CookieContainerRequest assigned) {
CookieContainerRequest request = removeTaskRequest(task);
assert request != null;
//assert assigned.equals(request);
Container result = taskAllocations.put(task, container);
assert result == null;
containerAssigments.put(container.getId(), task);
}
private CookieContainerRequest removeTaskRequest(Object task) {
CookieContainerRequest request = taskRequests.remove(task);
if(request != null) {
// remove all references of the request from AMRMClient
amRmClient.removeContainerRequest(request);
}
return request;
}
private void addTaskRequest(Object task,
CookieContainerRequest request) {
// TODO TEZ-37 fix duplicate handling
taskRequests.put(task, request);
amRmClient.addContainerRequest(request);
}
private Container unAssignContainer(Object task, boolean releaseIfFound) {
Container container = taskAllocations.remove(task);
if(container == null) {
return null;
}
containerAssigments.remove(container.getId());
if(releaseIfFound) {
releaseContainer(container.getId(), task);
}
return container;
}
private Object unAssignContainer(ContainerId containerId,
boolean releaseIfFound) {
Object task = containerAssigments.remove(containerId);
if(task == null) {
return null;
}
taskAllocations.remove(task);
if(releaseIfFound) {
releaseContainer(containerId, task);
}
return task;
}
}