blob: ea3101a68d5335582ae318851266b9563bef3991 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.app.rm;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.AMResponse;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.BuilderUtils;
/**
* Keeps the data structures to send container requests to RM.
*/
public abstract class RMContainerRequestor extends RMCommunicator {
// Logger shared by all instances of this class.
private static final Log LOG = LogFactory.getLog(RMContainerRequestor.class);
// Wildcard resource name; every container request also adds an off-switch
// request under this name.
static final String ANY = "*";
// Response id taken from the most recent allocate response and passed along
// with the next allocate request.
private int lastResponseID;
// Available resources reported by the most recent allocate response; not
// assigned until the first response has been received.
private Resource availableResources;
// Factory used to create new ResourceRequest records.
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
// Table of outstanding requests, organised as three nested maps:
//   Priority
//     -> resource name (e.g., hostname, rackname, *)
//       -> Resource capability
//         -> ResourceRequest holding the container count
private final Map<Priority, Map<String, Map<Resource, ResourceRequest>>>
remoteRequestsTable =
new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();
// Requests that were added or changed since the last allocate call; copied
// into the next allocate request and then cleared.
private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>();
// Containers to give back; copied into the next allocate request and then
// cleared.
private final Set<ContainerId> release = new TreeSet<ContainerId>();
// Read from MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE in init().
private boolean nodeBlacklistingEnabled;
// Percentage of blacklisted nodes at or above which blacklisting is ignored;
// -1 turns the computation off.
private int blacklistDisablePercent;
// True while blacklisting is being ignored; updated by
// computeIgnoreBlacklisting() and consulted by isNodeBlacklisted().
private AtomicBoolean ignoreBlacklisting = new AtomicBoolean(false);
// Size of blacklistedNodes when computeIgnoreBlacklisting() last recomputed.
private int blacklistedNodeCount = 0;
// Node count from the allocate response before the most recent one.
private int lastClusterNmCount = 0;
// Node count from the most recent allocate response.
private int clusterNmCount = 0;
// Number of failures on one host at which the host gets blacklisted.
private int maxTaskFailuresPerNode;
// Failure count per host, kept only for hosts that are not yet blacklisted.
private final Map<String, Integer> nodeFailures = new HashMap<String, Integer>();
// Hosts that reached maxTaskFailuresPerNode failures.
private final Set<String> blacklistedNodes = Collections
.newSetFromMap(new ConcurrentHashMap<String, Boolean>());
/**
 * Creates the requestor. Both arguments are handed unchanged to the
 * {@link RMCommunicator} constructor.
 *
 * @param clientService passed through to the superclass
 * @param context passed through to the superclass
 */
public RMContainerRequestor(ClientService clientService, AppContext context) {
super(clientService, context);
}
/**
 * Describes one container wanted for a task attempt: the attempt, the
 * resource capability, the hosts and racks to ask on, and the priority.
 * All fields are assigned once in the constructor; the host and rack arrays
 * are stored as passed in, without copying.
 */
static class ContainerRequest {
  /** Task attempt the container is requested for. */
  final TaskAttemptId attemptID;
  /** Resource capability requested for the container. */
  final Resource capability;
  /** Host names to request the container on. */
  final String[] hosts;
  /** Rack names to request the container on. */
  final String[] racks;
  /** Priority under which the request is filed. */
  final Priority priority;

  /**
   * Builds a request from the attempt, capability, hosts and racks carried
   * by an event.
   *
   * @param event source of the attempt id, capability, hosts and racks
   * @param priority priority to file the request under
   */
  public ContainerRequest(ContainerRequestEvent event, Priority priority) {
    this(event.getAttemptID(), event.getCapability(), event.getHosts(),
        event.getRacks(), priority);
  }

  /**
   * Builds a request from its individual parts.
   *
   * @param attemptID task attempt the container is for
   * @param capability resource capability requested
   * @param hosts host names to request on (stored without copying)
   * @param racks rack names to request on (stored without copying)
   * @param priority priority to file the request under
   */
  public ContainerRequest(TaskAttemptId attemptID,
      Resource capability, String[] hosts, String[] racks,
      Priority priority) {
    this.attemptID = attemptID;
    this.capability = capability;
    this.hosts = hosts;
    this.racks = racks;
    this.priority = priority;
  }

  /**
   * Returns the attempt id, capability and priority in the form
   * {@code AttemptId[..]Capability[..]Priority[..]}; hosts and racks are
   * not included.
   */
  @Override
  public String toString() {
    StringBuilder sb = new StringBuilder();
    sb.append("AttemptId[").append(attemptID).append("]");
    sb.append("Capability[").append(capability).append("]");
    sb.append("Priority[").append(priority).append("]");
    return sb.toString();
  }
}
/**
 * Initializes the superclass and then reads the blacklisting settings from
 * the configuration: whether blacklisting is enabled, the per-node failure
 * limit, and the blacklisted-node percentage at which blacklisting is
 * ignored.
 *
 * @param conf configuration to read the settings from
 * @throws YarnException if the configured percentage is below -1 or
 *         above 100
 */
@Override
public void init(Configuration conf) {
  super.init(conf);

  this.nodeBlacklistingEnabled = conf.getBoolean(
      MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
  LOG.info("nodeBlacklistingEnabled:" + this.nodeBlacklistingEnabled);

  this.maxTaskFailuresPerNode = conf.getInt(
      MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 3);
  this.blacklistDisablePercent = conf.getInt(
      MRJobConfig.MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERECENT,
      MRJobConfig.DEFAULT_MR_AM_IGNORE_BLACKLISTING_BLACKLISTED_NODE_PERCENT);
  LOG.info("maxTaskFailuresPerNode is " + this.maxTaskFailuresPerNode);

  // Accept -1 (computation off) and any percentage from 0 to 100.
  final boolean percentInRange =
      this.blacklistDisablePercent >= -1
          && this.blacklistDisablePercent <= 100;
  if (!percentInRange) {
    throw new YarnException("Invalid blacklistDisablePercent: "
        + this.blacklistDisablePercent
        + ". Should be an integer between 0 and 100 or -1 to disabled");
  }
  LOG.info("blacklistDisablePercent is " + this.blacklistDisablePercent);
}
/**
 * Sends the pending asks and releases to the scheduler in one allocate
 * call, records the response id, available resources and cluster node
 * count from the reply, and then empties both pending sets.
 *
 * @return the AM response contained in the allocate response
 * @throws YarnRemoteException declared for failures of the allocate call
 */
protected AMResponse makeRemoteRequest() throws YarnRemoteException {
  AllocateRequest request = BuilderUtils.newAllocateRequest(
      applicationAttemptId,
      lastResponseID,
      super.getApplicationProgress(),
      new ArrayList<ResourceRequest>(ask),
      new ArrayList<ContainerId>(release));
  AllocateResponse reply = scheduler.allocate(request);
  AMResponse amResponse = reply.getAMResponse();

  lastResponseID = amResponse.getResponseId();
  availableResources = amResponse.getAvailableResources();
  // Keep the previous node count so a change can be detected later.
  lastClusterNmCount = clusterNmCount;
  clusterNmCount = reply.getNumClusterNodes();

  // Only log when something was actually sent.
  if (!ask.isEmpty() || !release.isEmpty()) {
    StringBuilder summary = new StringBuilder();
    summary.append("getResources() for ").append(applicationId).append(":");
    summary.append(" ask=").append(ask.size());
    summary.append(" release= ").append(release.size());
    summary.append(" newContainers=")
        .append(amResponse.getAllocatedContainers().size());
    summary.append(" finishedContainers=")
        .append(amResponse.getCompletedContainersStatuses().size());
    summary.append(" resourcelimit=").append(availableResources);
    summary.append(" knownNMs=").append(clusterNmCount);
    LOG.info(summary.toString());
  }

  ask.clear();
  release.clear();
  return amResponse;
}
/**
 * Recomputes whether node blacklisting should currently be ignored.
 *
 * <p>Does nothing when blacklisting is disabled or when
 * {@code blacklistDisablePercent} is -1. Otherwise the flag is recomputed
 * only if the number of blacklisted hosts or the cluster node count changed
 * since the previous computation. Blacklisting is ignored while the
 * percentage of blacklisted hosts is at or above
 * {@code blacklistDisablePercent}; each transition is logged once.
 *
 * <p>May be incorrect if there's multiple NodeManagers running on a single
 * host: the known node count is based on node managers, not hosts, while
 * blacklisting is currently based on hosts.
 */
protected void computeIgnoreBlacklisting() {
  if (!nodeBlacklistingEnabled) {
    return;
  }
  if (blacklistDisablePercent != -1
      && (blacklistedNodeCount != blacklistedNodes.size() ||
          clusterNmCount != lastClusterNmCount)) {
    blacklistedNodeCount = blacklistedNodes.size();
    if (clusterNmCount == 0) {
      LOG.info("KnownNode Count at 0. Not computing ignoreBlacklisting");
      return;
    }
    // Whole-number percentage, rounded down. Integer arithmetic is used on
    // purpose: the float form (float) n / total * 100 can land just below
    // an exact percentage (59 of 100 evaluates to 58), which would miss the
    // threshold. The snapshot taken above is used so that the computed
    // value always matches the count that is logged.
    int val = (int) ((long) blacklistedNodeCount * 100 / clusterNmCount);
    if (val >= blacklistDisablePercent) {
      if (ignoreBlacklisting.compareAndSet(false, true)) {
        LOG.info("Ignore blacklisting set to true. Known: " + clusterNmCount
            + ", Blacklisted: " + blacklistedNodeCount + ", " + val + "%");
      }
    } else {
      if (ignoreBlacklisting.compareAndSet(true, false)) {
        LOG.info("Ignore blacklisting set to false. Known: " + clusterNmCount
            + ", Blacklisted: " + blacklistedNodeCount + ", " + val + "%");
      }
    }
  }
}
/**
 * Records one more container failure on a host and blacklists the host
 * once its failure count reaches {@code maxTaskFailuresPerNode}.
 *
 * <p>On blacklisting, the host's requests are withdrawn at every priority:
 * a request still waiting in the ask set is simply dropped from it, while
 * for any other request a copy with a container count of zero is queued
 * instead. The host's entry is removed from the request table only when
 * all of its requests were still waiting in the ask set.
 *
 * @param hostName host on which a container failed
 */
protected void containerFailedOnHost(String hostName) {
  if (!nodeBlacklistingEnabled) {
    return;
  }
  if (blacklistedNodes.contains(hostName)) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Host " + hostName + " is already blacklisted.");
    }
    return; // nothing more to count for a blacklisted host
  }

  Integer previousFailures = nodeFailures.remove(hostName);
  int failureCount =
      (previousFailures == null ? 0 : previousFailures.intValue()) + 1;
  LOG.info(failureCount + " failures on node " + hostName);

  if (failureCount < maxTaskFailuresPerNode) {
    // Below the limit: remember the new count and stop here.
    nodeFailures.put(hostName, failureCount);
    return;
  }

  blacklistedNodes.add(hostName);
  // Even if blacklisting is ignored, continue to remove the host from
  // the request table. The RM may have additional nodes it can allocate on.
  LOG.info("Blacklisted host " + hostName);

  // Withdraw every request filed against this host, priority by priority.
  for (Map<String, Map<Resource, ResourceRequest>> requestsAtPriority
      : remoteRequestsTable.values()) {
    Map<Resource, ResourceRequest> hostRequests =
        requestsAtPriority.get(hostName);
    if (hostRequests == null) {
      continue;
    }
    boolean allStillQueued = true;
    for (ResourceRequest pending : hostRequests.values()) {
      if (ask.remove(pending)) {
        continue;
      }
      allStillQueued = false;
      // The request was not waiting in the ask set, so it may already have
      // been sent to the RM. Queue a copy asking for zero containers on
      // the blacklisted host; it goes out on the next heartbeat.
      ResourceRequest cancellation = BuilderUtils.newResourceRequest(pending);
      cancellation.setNumContainers(0);
      ask.add(cancellation);
    }
    // Drop the host entry only if every request was still in the ask set.
    if (allStillQueued) {
      requestsAtPriority.remove(hostName);
    }
    // TODO handling of rack blacklisting
    // Removing from rack should be dependent on no. of failures within the
    // rack. Blacklisting a rack on the basis of a single node's
    // blacklisting may be overly aggressive.
    // Node failures could be co-related with other failures on the same
    // rack but we probably need a better approach at trying to decide how
    // and when to blacklist a rack
  }
}
/**
 * Returns the available resources stored from the most recent allocate
 * response.
 *
 * @return the stored value; {@code null} if no response has been
 *         processed yet
 */
protected Resource getAvailableResources() {
  return this.availableResources;
}
/**
 * Files the resource requests for one container: one per host that is not
 * currently blacklisted, one per rack, and one off-switch request under
 * {@link #ANY}.
 *
 * @param req container request to register
 */
protected void addContainerReq(ContainerRequest req) {
  final String[] hosts = req.hosts;
  final String[] racks = req.racks;

  // Data-local: skip hosts that are currently treated as blacklisted.
  for (int i = 0; i < hosts.length; i++) {
    if (isNodeBlacklisted(hosts[i])) {
      continue;
    }
    addResourceRequest(req.priority, hosts[i], req.capability);
  }

  // Rack-local: racks are never filtered for now.
  for (int i = 0; i < racks.length; i++) {
    addResourceRequest(req.priority, racks[i], req.capability);
  }

  // Off-switch
  addResourceRequest(req.priority, ANY, req.capability);
}
/**
 * Decrements the resource requests belonging to one container request:
 * every host, every rack, and finally the off-switch request under
 * {@link #ANY}.
 *
 * @param req container request whose resource requests are decremented
 */
protected void decContainerReq(ContainerRequest req) {
  final String[] hosts = req.hosts;
  final String[] racks = req.racks;

  for (int i = 0; i < hosts.length; i++) {
    decResourceRequest(req.priority, hosts[i], req.capability);
  }
  for (int i = 0; i < racks.length; i++) {
    decResourceRequest(req.priority, racks[i], req.capability);
  }
  decResourceRequest(req.priority, ANY, req.capability);
}
/**
 * Raises by one the container count requested for a priority, resource
 * name and capability, creating the table entries on first use, and puts
 * the request into the ask set.
 *
 * @param priority priority of the request
 * @param resourceName host name, rack name or {@link #ANY}
 * @param capability resource capability requested
 */
private void addResourceRequest(Priority priority, String resourceName,
    Resource capability) {
  // Level 1: priority -> requests by resource name.
  Map<String, Map<Resource, ResourceRequest>> byResourceName =
      remoteRequestsTable.get(priority);
  if (byResourceName == null) {
    byResourceName = new HashMap<String, Map<Resource, ResourceRequest>>();
    remoteRequestsTable.put(priority, byResourceName);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Added priority=" + priority);
    }
  }

  // Level 2: resource name -> requests by capability.
  Map<Resource, ResourceRequest> byCapability =
      byResourceName.get(resourceName);
  if (byCapability == null) {
    byCapability = new HashMap<Resource, ResourceRequest>();
    byResourceName.put(resourceName, byCapability);
  }

  // Level 3: capability -> request; a new one starts at zero containers.
  ResourceRequest request = byCapability.get(capability);
  if (request == null) {
    request = recordFactory.newRecordInstance(ResourceRequest.class);
    request.setPriority(priority);
    request.setHostName(resourceName);
    request.setCapability(capability);
    request.setNumContainers(0);
    byCapability.put(capability, request);
  }
  int raisedCount = request.getNumContainers() + 1;
  request.setNumContainers(raisedCount);

  // Note this down for next interaction with ResourceManager
  ask.add(request);

  if (LOG.isDebugEnabled()) {
    StringBuilder msg = new StringBuilder("addResourceRequest:");
    msg.append(" applicationId=").append(applicationId.getId());
    msg.append(" priority=").append(priority.getPriority());
    msg.append(" resourceName=").append(resourceName);
    msg.append(" numContainers=").append(request.getNumContainers());
    msg.append(" #asks=").append(ask.size());
    LOG.debug(msg.toString());
  }
}
/**
 * Lowers by one the container count requested for a priority, resource
 * name and capability.
 *
 * <p>If the table holds no matching request (at the priority, resource
 * name or capability level) the call is a no-op apart from a debug
 * message. When the count reaches zero the request is removed from the
 * table, emptied parent maps are pruned, and the request is removed from
 * the ask set; otherwise the request is added to the ask set.
 *
 * @param priority priority of the request
 * @param resourceName host name, rack name or {@link #ANY}
 * @param capability resource capability of the request
 */
private void decResourceRequest(Priority priority, String resourceName,
    Resource capability) {
  Map<String, Map<Resource, ResourceRequest>> remoteRequests =
      this.remoteRequestsTable.get(priority);
  Map<Resource, ResourceRequest> reqMap =
      remoteRequests == null ? null : remoteRequests.get(resourceName);
  ResourceRequest remoteRequest =
      reqMap == null ? null : reqMap.get(capability);
  if (remoteRequest == null) {
    // as we modify the resource requests by filtering out blacklisted hosts
    // when they are added, the entry may be missing when being decremented.
    // This also covers a host that still has requests for a different
    // capability, and a priority with no table at all.
    if (LOG.isDebugEnabled()) {
      LOG.debug("Not decrementing resource as " + resourceName
          + " is not present in request table");
    }
    return;
  }
  if (LOG.isDebugEnabled()) {
    LOG.debug("BEFORE decResourceRequest:" + " applicationId="
        + applicationId.getId() + " priority=" + priority.getPriority()
        + " resourceName=" + resourceName + " numContainers="
        + remoteRequest.getNumContainers() + " #asks=" + ask.size());
  }
  remoteRequest.setNumContainers(remoteRequest.getNumContainers() - 1);
  if (remoteRequest.getNumContainers() == 0) {
    // Nothing left to request: prune the entry and any emptied parents.
    reqMap.remove(capability);
    if (reqMap.size() == 0) {
      remoteRequests.remove(resourceName);
    }
    if (remoteRequests.size() == 0) {
      remoteRequestsTable.remove(priority);
    }
    // remove from ask in case it is still waiting there
    ask.remove(remoteRequest);
  } else {
    // make sure the updated request is in the pending ask set
    ask.add(remoteRequest);
  }
  if (LOG.isDebugEnabled()) {
    // Logged at debug level to match the guard and the BEFORE message.
    LOG.debug("AFTER decResourceRequest:" + " applicationId="
        + applicationId.getId() + " priority=" + priority.getPriority()
        + " resourceName=" + resourceName + " numContainers="
        + remoteRequest.getNumContainers() + " #asks=" + ask.size());
  }
}
/**
 * Adds a container to the set of containers to release; the set is sent
 * with the next allocate request.
 *
 * @param containerId container to release
 */
protected void release(ContainerId containerId) {
  this.release.add(containerId);
}
/**
 * Tells whether a host should currently be treated as blacklisted.
 *
 * @param hostname host to check
 * @return {@code true} only if blacklisting is enabled, is not currently
 *         being ignored, and the host is in the blacklisted set
 */
protected boolean isNodeBlacklisted(String hostname) {
  return nodeBlacklistingEnabled
      && !ignoreBlacklisting.get()
      && blacklistedNodes.contains(hostname);
}
/**
 * Builds a copy of a container request whose host list leaves out every
 * host currently treated as blacklisted. Attempt id, capability, racks and
 * priority are carried over unchanged.
 *
 * @param orig request to filter
 * @return a new request containing only the hosts that are not blacklisted
 */
protected ContainerRequest getFilteredContainerRequest(ContainerRequest orig) {
  final String[] candidates = orig.hosts;
  final ArrayList<String> allowed = new ArrayList<String>();
  for (int i = 0; i < candidates.length; i++) {
    if (isNodeBlacklisted(candidates[i])) {
      continue;
    }
    allowed.add(candidates[i]);
  }
  String[] allowedHosts = allowed.toArray(new String[allowed.size()]);
  return new ContainerRequest(orig.attemptID, orig.capability,
      allowedHosts, orig.racks, orig.priority);
}
}