blob: 024bac1bb713efafa1170da232d9f56a33062a44 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.rm;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.app.AMConstants;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
/**
 * Keeps the data structures to send container requests to RM.
 *
 * <p>The class tracks three things:
 * <ul>
 *   <li>a table of outstanding {@link ResourceRequest}s, indexed by priority,
 *       then resource name (host, rack or {@link #ANY}), then capability;</li>
 *   <li>the pending {@code ask} and {@code release} sets that are shipped to
 *       the ResourceManager by {@link #makeRemoteRequest()} and cleared once
 *       the call returns;</li>
 *   <li>per-host failure counts used to blacklist hosts.</li>
 * </ul>
 *
 * <p>No synchronization is performed in this class.
 * NOTE(review): callers are presumably expected to serialize access to an
 * instance - confirm against the concrete subclass.
 */
public abstract class RMContainerRequestor extends RMCommunicator {

  private static final Log LOG = LogFactory.getLog(RMContainerRequestor.class);

  /** Wildcard resource name used for the off-switch (any node) request. */
  static final String ANY = "*";

  /** Response id of the last allocate call; echoed back on the next one. */
  private int lastResponseID;

  /** Available resources as reported by the last allocate response. */
  private Resource availableResources;

  private final RecordFactory recordFactory =
      RecordFactoryProvider.getRecordFactory(null);

  // Outstanding requests, structured as:
  //   Key   -> Priority
  //   Value -> Map
  //     Key   -> ResourceName (e.g., hostname, rackname, *)
  //     Value -> Map
  //       Key   -> Resource Capability
  //       Value -> ResourceRequest
  private final Map<Priority, Map<String, Map<Resource, ResourceRequest>>>
      remoteRequestsTable =
          new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();

  /** Requests that were added or changed since the last allocate call. */
  private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>();

  /** Containers to be released on the next allocate call. */
  private final Set<ContainerId> release = new TreeSet<ContainerId>();

  private boolean nodeBlacklistingEnabled;
  private int maxTaskFailuresPerNode;

  /** Failure count per host, for hosts that are not (yet) blacklisted. */
  private final Map<String, Integer> nodeFailures =
      new HashMap<String, Integer>();

  /** Hosts for which no further host-level requests are made. */
  private final Set<String> blacklistedNodes = new HashSet<String>();

  public RMContainerRequestor(ClientService clientService, AppContext context) {
    super(clientService, context);
  }

  /**
   * Immutable snapshot of a {@link ContainerRequestEvent} together with the
   * priority at which the container is requested.
   */
  static class ContainerRequest {
    final TaskAttemptId attemptID;
    final Resource capability;
    final String[] hosts;
    final String[] racks;
    //final boolean earlierAttemptFailed;
    final Priority priority;

    /**
     * @param event the event to copy attempt id, capability, hosts and racks
     *          from
     * @param priority the priority to request the container at
     */
    public ContainerRequest(ContainerRequestEvent event, Priority priority) {
      this.attemptID = event.getAttemptID();
      this.capability = event.getCapability();
      this.hosts = event.getHosts();
      this.racks = event.getRacks();
      //this.earlierAttemptFailed = event.getEarlierAttemptFailed();
      this.priority = priority;
    }
  }

  /**
   * Initializes the parent and reads the blacklisting settings:
   * {@code AMConstants.NODE_BLACKLISTING_ENABLE} (default {@code true}) and
   * {@code MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER} (default {@code 3}).
   *
   * @param conf the configuration to read the settings from
   */
  @Override
  public void init(Configuration conf) {
    super.init(conf);
    nodeBlacklistingEnabled =
        conf.getBoolean(AMConstants.NODE_BLACKLISTING_ENABLE, true);
    LOG.info("nodeBlacklistingEnabled:" + nodeBlacklistingEnabled);
    maxTaskFailuresPerNode =
        conf.getInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 3);
    LOG.info("maxTaskFailuresPerNode is " + maxTaskFailuresPerNode);
  }

  protected abstract void heartbeat() throws Exception;

  /**
   * Sends the pending asks and releases to the scheduler in a single
   * allocate call.
   *
   * <p>On return the response id and available resources are recorded and
   * both pending sets are cleared. If the allocate call throws, the pending
   * sets are left untouched.
   *
   * @return the {@link AMResponse} carried by the allocate response
   * @throws YarnRemoteException if the allocate call fails
   */
  protected AMResponse makeRemoteRequest() throws YarnRemoteException {
    AllocateRequest allocateRequest = recordFactory
        .newRecordInstance(AllocateRequest.class);
    allocateRequest.setApplicationAttemptId(applicationAttemptId);
    allocateRequest.setResponseId(lastResponseID);
    // Copies are handed over so that clearing the sets below cannot affect
    // the request object.
    allocateRequest.addAllAsks(new ArrayList<ResourceRequest>(ask));
    allocateRequest.addAllReleases(new ArrayList<ContainerId>(release));
    AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
    AMResponse response = allocateResponse.getAMResponse();
    lastResponseID = response.getResponseId();
    availableResources = response.getAvailableResources();

    LOG.info("getResources() for " + applicationId + ":" + " ask="
        + ask.size() + " release= " + release.size() + " newContainers="
        + response.getNewContainerCount() + " finishedContainers="
        + response.getFinishedContainerCount()
        + " resourcelimit=" + availableResources);

    ask.clear();
    release.clear();
    return response;
  }

  /**
   * Records a container failure on the given host and blacklists the host
   * once it has accumulated {@code maxTaskFailuresPerNode} failures.
   *
   * <p>Blacklisting drops every host-level request for that host from the
   * request table and from the pending asks. Rack-level and {@link #ANY}
   * requests are not adjusted. Does nothing when blacklisting is disabled or
   * the host is already blacklisted.
   *
   * @param hostName the host on which a container failed
   */
  protected void containerFailedOnHost(String hostName) {
    if (!nodeBlacklistingEnabled) {
      return;
    }
    if (isNodeBlacklisted(hostName)) {
      LOG.info("Host " + hostName + " is already blacklisted.");
      return; //already blacklisted
    }
    // The count is removed up front and only put back when the host stays
    // below the threshold, so blacklisted hosts keep no counter.
    Integer failures = nodeFailures.remove(hostName);
    failures = failures == null ? 0 : failures;
    failures++;
    LOG.info(failures + " failures on node " + hostName);
    if (failures >= maxTaskFailuresPerNode) {
      blacklistedNodes.add(hostName);
      LOG.info("Blacklisted host " + hostName);

      //remove all the requests corresponding to this hostname
      for (Map<String, Map<Resource, ResourceRequest>> remoteRequests
          : remoteRequestsTable.values()) {
        //remove from host
        Map<Resource, ResourceRequest> reqMap = remoteRequests.remove(hostName);
        if (reqMap != null) {
          for (ResourceRequest req : reqMap.values()) {
            ask.remove(req);
          }
        }
        //TODO: remove from rack
      }
    } else {
      nodeFailures.put(hostName, failures);
    }
  }

  /**
   * @return the available resources reported by the most recent allocate
   *         response, or {@code null} if no response has been received yet
   */
  protected Resource getAvailableResources() {
    return availableResources;
  }

  /**
   * Registers one more wanted container for every non-blacklisted host and
   * every rack named by the request, plus one for {@link #ANY}.
   *
   * @param req the container request to add
   */
  protected void addContainerReq(ContainerRequest req) {
    // Create resource requests
    for (String host : req.hosts) {
      // Data-local. Blacklisted hosts are skipped: their entries were dropped
      // when the host was blacklisted and must not be re-created.
      if (isNodeBlacklisted(host)) {
        LOG.info("Not requesting blacklisted host " + host);
        continue;
      }
      addResourceRequest(req.priority, host, req.capability);
    }

    // Rack-local
    for (String rack : req.racks) {
      addResourceRequest(req.priority, rack, req.capability);
    }

    // Off-switch
    addResourceRequest(req.priority, ANY, req.capability);
  }

  /**
   * Reverses {@link #addContainerReq(ContainerRequest)}: decrements the
   * wanted-container count for each host and rack of the request and for
   * {@link #ANY}. Resource names that are no longer present in the request
   * table (for example hosts blacklisted in the meantime) are ignored.
   *
   * @param req the container request to withdraw
   */
  protected void decContainerReq(ContainerRequest req) {
    // Update resource requests
    for (String hostName : req.hosts) {
      decResourceRequest(req.priority, hostName, req.capability);
    }

    for (String rack : req.racks) {
      decResourceRequest(req.priority, rack, req.capability);
    }

    decResourceRequest(req.priority, ANY, req.capability);
  }

  /** Tells whether the given host has been blacklisted. */
  private boolean isNodeBlacklisted(String hostName) {
    return blacklistedNodes.contains(hostName);
  }

  /**
   * Increments the number of containers wanted for the given priority,
   * resource name and capability, creating the table entries on demand, and
   * queues the request for the next allocate call.
   */
  private void addResourceRequest(Priority priority, String resourceName,
      Resource capability) {
    Map<String, Map<Resource, ResourceRequest>> remoteRequests =
        this.remoteRequestsTable.get(priority);
    if (remoteRequests == null) {
      remoteRequests = new HashMap<String, Map<Resource, ResourceRequest>>();
      this.remoteRequestsTable.put(priority, remoteRequests);
      LOG.info("Added priority=" + priority);
    }
    Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
    if (reqMap == null) {
      reqMap = new HashMap<Resource, ResourceRequest>();
      remoteRequests.put(resourceName, reqMap);
    }
    ResourceRequest remoteRequest = reqMap.get(capability);
    if (remoteRequest == null) {
      remoteRequest = recordFactory.newRecordInstance(ResourceRequest.class);
      remoteRequest.setPriority(priority);
      remoteRequest.setHostName(resourceName);
      remoteRequest.setCapability(capability);
      remoteRequest.setNumContainers(0);
      reqMap.put(capability, remoteRequest);
    }
    remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1);

    // Note this down for next interaction with ResourceManager
    ask.add(remoteRequest);
    LOG.info("addResourceRequest:" + " applicationId=" + applicationId.getId()
        + " priority=" + priority.getPriority() + " resourceName=" + resourceName
        + " numContainers=" + remoteRequest.getNumContainers() + " #asks="
        + ask.size());
  }

  /**
   * Decrements the number of containers wanted for the given priority,
   * resource name and capability. When the count reaches zero the entry, and
   * any table level left empty by its removal, is dropped and the request is
   * withdrawn from the pending asks; otherwise the updated request is queued
   * for the next allocate call.
   *
   * <p>If no matching entry exists the call is a no-op.
   */
  private void decResourceRequest(Priority priority, String resourceName,
      Resource capability) {
    Map<String, Map<Resource, ResourceRequest>> remoteRequests =
        this.remoteRequestsTable.get(priority);
    Map<Resource, ResourceRequest> reqMap =
        remoteRequests == null ? null : remoteRequests.get(resourceName);
    ResourceRequest remoteRequest =
        reqMap == null ? null : reqMap.get(capability);
    if (remoteRequest == null) {
      // Entries for a host are removed wholesale when the host is
      // blacklisted, and blacklisted hosts are never added, so a request
      // being withdrawn may name a resource that is not in the table.
      if (LOG.isDebugEnabled()) {
        LOG.debug("Not decrementing resource request as resourceName="
            + resourceName + " is not present in the request table");
      }
      return;
    }

    LOG.info("BEFORE decResourceRequest:" + " applicationId=" + applicationId.getId()
        + " priority=" + priority.getPriority() + " resourceName=" + resourceName
        + " numContainers=" + remoteRequest.getNumContainers() + " #asks="
        + ask.size());

    remoteRequest.setNumContainers(remoteRequest.getNumContainers() - 1);
    if (remoteRequest.getNumContainers() == 0) {
      reqMap.remove(capability);
      if (reqMap.size() == 0) {
        remoteRequests.remove(resourceName);
      }
      if (remoteRequests.size() == 0) {
        remoteRequestsTable.remove(priority);
      }
      //remove from ask if it may have
      ask.remove(remoteRequest);
    } else {
      // Queue the updated request; the add has no effect if the set already
      // holds it.
      ask.add(remoteRequest);
    }

    LOG.info("AFTER decResourceRequest:" + " applicationId="
        + applicationId.getId() + " priority=" + priority.getPriority()
        + " resourceName=" + resourceName + " numContainers="
        + remoteRequest.getNumContainers() + " #asks=" + ask.size());
  }

  /**
   * Queues a container for release on the next allocate call.
   *
   * @param containerId the container to release
   */
  protected void release(ContainerId containerId) {
    release.add(containerId);
  }
}