blob: a9136d65cdea34ca6c6d75680b29b7c3862b3407 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.yarn.ams.ApplicationMasterServiceContext;
import org.apache.hadoop.yarn.ams.ApplicationMasterServiceProcessor;
import org.apache.hadoop.yarn.ams.ApplicationMasterServiceUtils;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords
.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.api.DistributedSchedulingAMProtocol;
import org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.DistributedSchedulingAllocateResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterDistributedSchedulingAMResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.YarnRPC;
import org.apache.hadoop.yarn.proto.ApplicationMasterProtocol.ApplicationMasterProtocolService;
import org.apache.hadoop.yarn.server.api.protocolrecords.RemoteNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.NodeQueueLoadMonitor;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.distributed.QueueLimitCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerAllocator;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
/**
* The OpportunisticContainerAllocatorAMService is started instead of the
* ApplicationMasterService if opportunistic scheduling is enabled for the YARN
* cluster (either centralized or distributed opportunistic scheduling).
*
* It extends the functionality of the ApplicationMasterService by servicing
* clients (AMs and AMRMProxy request interceptors) that understand the
* DistributedSchedulingProtocol.
*/
public class OpportunisticContainerAllocatorAMService
extends ApplicationMasterService implements DistributedSchedulingAMProtocol,
EventHandler<SchedulerEvent> {
private static final Log LOG =
LogFactory.getLog(OpportunisticContainerAllocatorAMService.class);
private final NodeQueueLoadMonitor nodeMonitor;
private final OpportunisticContainerAllocator oppContainerAllocator;
private final int k;
private final long cacheRefreshInterval;
private volatile List<RemoteNode> cachedNodes;
private volatile long lastCacheUpdateTime;
class OpportunisticAMSProcessor implements
ApplicationMasterServiceProcessor {
private ApplicationMasterServiceContext context;
private ApplicationMasterServiceProcessor nextProcessor;
private YarnScheduler getScheduler() {
return ((RMContext)context).getScheduler();
}
@Override
public void init(ApplicationMasterServiceContext amsContext,
ApplicationMasterServiceProcessor next) {
this.context = amsContext;
// The AMSProcessingChain guarantees that 'next' is not null.
this.nextProcessor = next;
}
@Override
public void registerApplicationMaster(
ApplicationAttemptId applicationAttemptId,
RegisterApplicationMasterRequest request,
RegisterApplicationMasterResponse response)
throws IOException, YarnException {
final SchedulerApplicationAttempt appAttempt = ((AbstractYarnScheduler)
getScheduler()).getApplicationAttempt(applicationAttemptId);
if (appAttempt.getOpportunisticContainerContext() == null) {
OpportunisticContainerContext opCtx =
new OpportunisticContainerContext();
opCtx.setContainerIdGenerator(new OpportunisticContainerAllocator
.ContainerIdGenerator() {
@Override
public long generateContainerId() {
return appAttempt.getAppSchedulingInfo().getNewContainerId();
}
});
int tokenExpiryInterval = getConfig()
.getInt(YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS,
YarnConfiguration.
DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS);
opCtx.updateAllocationParams(
getScheduler().getMinimumResourceCapability(),
getScheduler().getMaximumResourceCapability(),
getScheduler().getMinimumResourceCapability(),
tokenExpiryInterval);
appAttempt.setOpportunisticContainerContext(opCtx);
}
nextProcessor.registerApplicationMaster(
applicationAttemptId, request, response);
}
@Override
public void allocate(ApplicationAttemptId appAttemptId,
AllocateRequest request, AllocateResponse response)
throws YarnException {
// Partition requests to GUARANTEED and OPPORTUNISTIC.
OpportunisticContainerAllocator.PartitionedResourceRequests
partitionedAsks =
oppContainerAllocator.partitionAskList(request.getAskList());
// Allocate OPPORTUNISTIC containers.
SchedulerApplicationAttempt appAttempt =
((AbstractYarnScheduler)rmContext.getScheduler())
.getApplicationAttempt(appAttemptId);
OpportunisticContainerContext oppCtx =
appAttempt.getOpportunisticContainerContext();
oppCtx.updateNodeList(getLeastLoadedNodes());
List<Container> oppContainers =
oppContainerAllocator.allocateContainers(
request.getResourceBlacklistRequest(),
partitionedAsks.getOpportunistic(), appAttemptId, oppCtx,
ResourceManager.getClusterTimeStamp(), appAttempt.getUser());
// Create RMContainers and update the NMTokens.
if (!oppContainers.isEmpty()) {
handleNewContainers(oppContainers, false);
appAttempt.updateNMTokens(oppContainers);
ApplicationMasterServiceUtils.addToAllocatedContainers(
response, oppContainers);
}
// Allocate GUARANTEED containers.
request.setAskList(partitionedAsks.getGuaranteed());
nextProcessor.allocate(appAttemptId, request, response);
}
@Override
public void finishApplicationMaster(
ApplicationAttemptId applicationAttemptId,
FinishApplicationMasterRequest request,
FinishApplicationMasterResponse response) {
nextProcessor.finishApplicationMaster(applicationAttemptId,
request, response);
}
}
public OpportunisticContainerAllocatorAMService(RMContext rmContext,
YarnScheduler scheduler) {
super(OpportunisticContainerAllocatorAMService.class.getName(),
rmContext, scheduler);
this.oppContainerAllocator = new OpportunisticContainerAllocator(
rmContext.getContainerTokenSecretManager());
this.k = rmContext.getYarnConfiguration().getInt(
YarnConfiguration.OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED,
YarnConfiguration.DEFAULT_OPP_CONTAINER_ALLOCATION_NODES_NUMBER_USED);
long nodeSortInterval = rmContext.getYarnConfiguration().getLong(
YarnConfiguration.NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS,
YarnConfiguration.
DEFAULT_NM_CONTAINER_QUEUING_SORTING_NODES_INTERVAL_MS);
this.cacheRefreshInterval = nodeSortInterval;
this.lastCacheUpdateTime = System.currentTimeMillis();
NodeQueueLoadMonitor.LoadComparator comparator =
NodeQueueLoadMonitor.LoadComparator.valueOf(
rmContext.getYarnConfiguration().get(
YarnConfiguration.NM_CONTAINER_QUEUING_LOAD_COMPARATOR,
YarnConfiguration.
DEFAULT_NM_CONTAINER_QUEUING_LOAD_COMPARATOR));
NodeQueueLoadMonitor topKSelector =
new NodeQueueLoadMonitor(nodeSortInterval, comparator);
float sigma = rmContext.getYarnConfiguration()
.getFloat(YarnConfiguration.NM_CONTAINER_QUEUING_LIMIT_STDEV,
YarnConfiguration.DEFAULT_NM_CONTAINER_QUEUING_LIMIT_STDEV);
int limitMin, limitMax;
if (comparator == NodeQueueLoadMonitor.LoadComparator.QUEUE_LENGTH) {
limitMin = rmContext.getYarnConfiguration()
.getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH,
YarnConfiguration.
DEFAULT_NM_CONTAINER_QUEUING_MIN_QUEUE_LENGTH);
limitMax = rmContext.getYarnConfiguration()
.getInt(YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH,
YarnConfiguration.
DEFAULT_NM_CONTAINER_QUEUING_MAX_QUEUE_LENGTH);
} else {
limitMin = rmContext.getYarnConfiguration()
.getInt(
YarnConfiguration.NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS,
YarnConfiguration.
DEFAULT_NM_CONTAINER_QUEUING_MIN_QUEUE_WAIT_TIME_MS);
limitMax = rmContext.getYarnConfiguration()
.getInt(
YarnConfiguration.NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS,
YarnConfiguration.
DEFAULT_NM_CONTAINER_QUEUING_MAX_QUEUE_WAIT_TIME_MS);
}
topKSelector.initThresholdCalculator(sigma, limitMin, limitMax);
this.nodeMonitor = topKSelector;
}
@Override
public Server getServer(YarnRPC rpc, Configuration serverConf,
InetSocketAddress addr, AMRMTokenSecretManager secretManager) {
if (YarnConfiguration.isDistSchedulingEnabled(serverConf)) {
Server server = rpc.getServer(DistributedSchedulingAMProtocol.class, this,
addr, serverConf, secretManager,
serverConf.getInt(YarnConfiguration.RM_SCHEDULER_CLIENT_THREAD_COUNT,
YarnConfiguration.DEFAULT_RM_SCHEDULER_CLIENT_THREAD_COUNT));
// To support application running on NMs that DO NOT support
// Dist Scheduling... The server multiplexes both the
// ApplicationMasterProtocol as well as the DistributedSchedulingProtocol
((RPC.Server) server).addProtocol(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
ApplicationMasterProtocolPB.class,
ApplicationMasterProtocolService.newReflectiveBlockingService(
new ApplicationMasterProtocolPBServiceImpl(this)));
return server;
}
return super.getServer(rpc, serverConf, addr, secretManager);
}
@Override
protected List<ApplicationMasterServiceProcessor> getProcessorList(
Configuration conf) {
List<ApplicationMasterServiceProcessor> retVal =
super.getProcessorList(conf);
retVal.add(new OpportunisticAMSProcessor());
return retVal;
}
@Override
public RegisterDistributedSchedulingAMResponse
registerApplicationMasterForDistributedScheduling(
RegisterApplicationMasterRequest request) throws YarnException,
IOException {
RegisterApplicationMasterResponse response =
registerApplicationMaster(request);
RegisterDistributedSchedulingAMResponse dsResp = recordFactory
.newRecordInstance(RegisterDistributedSchedulingAMResponse.class);
dsResp.setRegisterResponse(response);
dsResp.setMinContainerResource(
rmContext.getScheduler().getMinimumResourceCapability());
dsResp.setMaxContainerResource(
rmContext.getScheduler().getMaximumResourceCapability());
dsResp.setContainerTokenExpiryInterval(
getConfig().getInt(
YarnConfiguration.RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS));
dsResp.setContainerIdStart(
this.rmContext.getEpoch() << ResourceManager.EPOCH_BIT_SHIFT);
// Set nodes to be used for scheduling
dsResp.setNodesForScheduling(getLeastLoadedNodes());
return dsResp;
}
@Override
public DistributedSchedulingAllocateResponse allocateForDistributedScheduling(
DistributedSchedulingAllocateRequest request)
throws YarnException, IOException {
List<Container> distAllocContainers = request.getAllocatedContainers();
handleNewContainers(distAllocContainers, true);
AllocateResponse response = allocate(request.getAllocateRequest());
DistributedSchedulingAllocateResponse dsResp = recordFactory
.newRecordInstance(DistributedSchedulingAllocateResponse.class);
dsResp.setAllocateResponse(response);
dsResp.setNodesForScheduling(getLeastLoadedNodes());
return dsResp;
}
private void handleNewContainers(List<Container> allocContainers,
boolean isRemotelyAllocated) {
for (Container container : allocContainers) {
// Create RMContainer
RMContainer rmContainer =
SchedulerUtils.createOpportunisticRmContainer(
rmContext, container, isRemotelyAllocated);
rmContainer.handle(
new RMContainerEvent(container.getId(),
RMContainerEventType.ACQUIRED));
}
}
@Override
public void handle(SchedulerEvent event) {
switch (event.getType()) {
case NODE_ADDED:
if (!(event instanceof NodeAddedSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent) event;
nodeMonitor.addNode(nodeAddedEvent.getContainerReports(),
nodeAddedEvent.getAddedRMNode());
break;
case NODE_REMOVED:
if (!(event instanceof NodeRemovedSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
NodeRemovedSchedulerEvent nodeRemovedEvent =
(NodeRemovedSchedulerEvent) event;
nodeMonitor.removeNode(nodeRemovedEvent.getRemovedRMNode());
break;
case NODE_UPDATE:
if (!(event instanceof NodeUpdateSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)
event;
nodeMonitor.updateNode(nodeUpdatedEvent.getRMNode());
break;
case NODE_RESOURCE_UPDATE:
if (!(event instanceof NodeResourceUpdateSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
NodeResourceUpdateSchedulerEvent nodeResourceUpdatedEvent =
(NodeResourceUpdateSchedulerEvent) event;
nodeMonitor.updateNodeResource(nodeResourceUpdatedEvent.getRMNode(),
nodeResourceUpdatedEvent.getResourceOption());
break;
// <-- IGNORED EVENTS : START -->
case APP_ADDED:
break;
case APP_REMOVED:
break;
case APP_ATTEMPT_ADDED:
break;
case APP_ATTEMPT_REMOVED:
break;
case CONTAINER_EXPIRED:
break;
case NODE_LABELS_UPDATE:
break;
case RELEASE_CONTAINER:
break;
// <-- IGNORED EVENTS : END -->
default:
LOG.error("Unknown event arrived at" +
"OpportunisticContainerAllocatorAMService: " + event.toString());
}
}
public QueueLimitCalculator getNodeManagerQueueLimitCalculator() {
return nodeMonitor.getThresholdCalculator();
}
private synchronized List<RemoteNode> getLeastLoadedNodes() {
long currTime = System.currentTimeMillis();
if ((currTime - lastCacheUpdateTime > cacheRefreshInterval)
|| (cachedNodes == null)) {
cachedNodes = convertToRemoteNodes(
this.nodeMonitor.selectLeastLoadedNodes(this.k));
if (cachedNodes.size() > 0) {
lastCacheUpdateTime = currTime;
}
}
return cachedNodes;
}
private List<RemoteNode> convertToRemoteNodes(List<NodeId> nodeIds) {
ArrayList<RemoteNode> retNodes = new ArrayList<>();
for (NodeId nId : nodeIds) {
RemoteNode remoteNode = convertToRemoteNode(nId);
if (null != remoteNode) {
retNodes.add(remoteNode);
}
}
return retNodes;
}
private RemoteNode convertToRemoteNode(NodeId nodeId) {
SchedulerNode node =
((AbstractYarnScheduler) rmContext.getScheduler()).getNode(nodeId);
if (node != null) {
RemoteNode rNode = RemoteNode.newInstance(nodeId, node.getHttpAddress());
rNode.setRackName(node.getRackName());
return rNode;
}
return null;
}
private static ApplicationAttemptId getAppAttemptId() throws YarnException {
AMRMTokenIdentifier amrmTokenIdentifier =
YarnServerSecurityUtils.authorizeRequest();
ApplicationAttemptId applicationAttemptId =
amrmTokenIdentifier.getApplicationAttemptId();
return applicationAttemptId;
}
}