blob: e922fc636424d468b954fa7378ddb35da6265777 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.myriad.scheduler.fgs;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.inject.Inject;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.mesos.Protos;
import org.apache.myriad.configuration.NodeManagerConfiguration;
import org.apache.myriad.executor.ContainerTaskStatusRequest;
import org.apache.myriad.scheduler.MyriadDriver;
import org.apache.myriad.scheduler.ResourceUtils;
import org.apache.myriad.scheduler.SchedulerUtils;
import org.apache.myriad.scheduler.TaskUtils;
import org.apache.myriad.scheduler.yarn.interceptor.BaseInterceptor;
import org.apache.myriad.scheduler.yarn.interceptor.InterceptorRegistry;
import org.apache.myriad.state.SchedulerState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Manages the capacity exposed by NodeManager. It uses the offers available
* from Mesos to inflate the node capacity and lets ResourceManager make the
* scheduling decision. After the scheduling decision is done, there are 2 cases:
* <p/>
* 1. If ResourceManager did not use the expanded capacity, then the node's
* capacity is reverted back to original value and the offer is declined.
* 2. If ResourceManager ended up using the expanded capacity, then the node's
* capacity is updated accordingly and any unused capacity is returned back to
* Mesos.
*/
public class YarnNodeCapacityManager extends BaseInterceptor {
  private static final Logger LOGGER = LoggerFactory.getLogger(YarnNodeCapacityManager.class);
  private final AbstractYarnScheduler yarnScheduler;
  private final RMContext rmContext;
  private final MyriadDriver myriadDriver;
  private final OfferLifecycleManager offerLifecycleMgr;
  private final NodeStore nodeStore;
  private final SchedulerState state;
  // Serializes capacity updates so concurrent interceptor callbacks don't race
  // on RMNode's mutable total-capability Resource.
  private static final Lock yarnSchedulerLock = new ReentrantLock();
  private static final Resource ZERO_RESOURCE = Resource.newInstance(0, 0);
  private final TaskUtils taskUtils;

  /**
   * Creates the capacity manager and registers it with the interceptor
   * registry (when one is provided) so it receives scheduler callbacks.
   *
   * @param registry          interceptor registry; may be null (e.g. in tests)
   * @param yarnScheduler     the active YARN scheduler
   * @param rmContext         ResourceManager context used to dispatch events
   * @param myriadDriver      Mesos driver used to launch/kill tasks
   * @param offerLifecycleMgr tracks Mesos offers consumed per node
   * @param nodeStore         store of nodes eligible for fine-grained scaling
   * @param state             Myriad scheduler state
   * @param taskUtils         helpers for building Mesos task resources
   */
  @Inject
  public YarnNodeCapacityManager(InterceptorRegistry registry, AbstractYarnScheduler yarnScheduler, RMContext rmContext,
                                 MyriadDriver myriadDriver, OfferLifecycleManager offerLifecycleMgr, NodeStore nodeStore,
                                 SchedulerState state, TaskUtils taskUtils) {
    if (registry != null) {
      registry.register(this);
    }
    this.yarnScheduler = yarnScheduler;
    this.rmContext = rmContext;
    this.myriadDriver = myriadDriver;
    this.offerLifecycleMgr = offerLifecycleMgr;
    this.nodeStore = nodeStore;
    this.state = state;
    this.taskUtils = taskUtils;
  }

  /**
   * Restricts callbacks to nodes that are eligible for fine-grained scaling.
   */
  @Override
  public CallBackFilter getCallBackFilter() {
    return new CallBackFilter() {
      @Override
      public boolean allowCallBacksForNode(NodeId nodeManager) {
        return SchedulerUtils.isEligibleForFineGrainedScaling(nodeManager.getHost(), state);
      }
    };
  }

  /**
   * Maps a YARN container to its corresponding Mesos task id
   * ("yarn_" + container id).
   */
  private Protos.TaskID containerToTaskId(RMContainer container) {
    return Protos.TaskID.newBuilder().setValue("yarn_" + container.getContainerId()).build();
  }

  @Override
  public void beforeReleaseContainers(List<ContainerId> containerIds, SchedulerApplicationAttempt attempt) {
    //NOOP beforeCompletedContainer does this
  }

  /**
   * When a container is killed or released, kills the Mesos task backing it
   * and returns the container's resources by shrinking the node's capacity.
   */
  @Override
  public void beforeCompletedContainer(RMContainer rmContainer, ContainerStatus containerStatus, RMContainerEventType type) {
    if (type.equals(RMContainerEventType.KILL) || type.equals(RMContainerEventType.RELEASED)) {
      // Fix: the original message claimed "exit status" but logged the event
      // type, and misspelled "corresponding".
      LOGGER.info("{} completed due to {} event, killing corresponding mesos task.", rmContainer.getContainerId(), type);
      removeYarnTask(rmContainer);
    }
  }

  /**
   * Kills the Mesos task for the given container and decrements the hosting
   * node's advertised capacity by the container's resources.
   */
  private void removeYarnTask(RMContainer rmContainer) {
    if (rmContainer != null && rmContainer.getContainer() != null) {
      Protos.TaskID taskId = containerToTaskId(rmContainer);
      //TODO (darinj) Reliable messaging
      state.makeTaskKillable(taskId);
      myriadDriver.kill(taskId);
      String hostname = rmContainer.getContainer().getNodeId().getHost();
      Node node = nodeStore.getNode(hostname);
      if (node != null) {
        RMNode rmNode = node.getNode().getRMNode();
        Resource resource = rmContainer.getContainer().getResource();
        decrementNodeCapacity(rmNode, resource);
        // Fix: the format string had 3 placeholders but 4 arguments, so the
        // exit status was silently dropped; also log the container id (which
        // matches the "yarn_" task-id prefix) rather than Container.toString().
        LOGGER.info("Removed task yarn_{} with exit status {} freeing {} cpu and {} mem.",
            rmContainer.getContainer().getId(), rmContainer.getContainerExitStatus(),
            resource.getVirtualCores(), resource.getMemory());
      } else {
        LOGGER.warn(hostname + " not found");
      }
    }
  }

  /**
   * Reacts to scheduler events: snapshots newly added nodes into the node
   * store, and on node heartbeats reconciles container allocations made from
   * Mesos offers (see {@link #handleContainerAllocation(RMNode)}).
   */
  @Override
  public void afterSchedulerEventHandled(SchedulerEvent event) {
    switch (event.getType()) {
      case NODE_ADDED:
        if (!(event instanceof NodeAddedSchedulerEvent)) {
          LOGGER.error("{} not an instance of {}", event.getClass().getName(), NodeAddedSchedulerEvent.class.getName());
          return;
        }
        NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent) event;
        NodeId nodeId = nodeAddedEvent.getAddedRMNode().getNodeID();
        String host = nodeId.getHost();
        SchedulerNode node = yarnScheduler.getSchedulerNode(nodeId);
        nodeStore.add(node);
        LOGGER.info("afterSchedulerEventHandled: NM registration from node {}", host);
        break;
      case NODE_UPDATE:
        if (!(event instanceof NodeUpdateSchedulerEvent)) {
          LOGGER.error("{} not an instance of {}", event.getClass().getName(), NodeUpdateSchedulerEvent.class.getName());
          return;
        }
        RMNode rmNode = ((NodeUpdateSchedulerEvent) event).getRMNode();
        handleContainerAllocation(rmNode);
        break;
      default:
        break;
    }
  }

  /**
   * Checks if any containers were allocated in the current scheduler run and
   * launches the corresponding Mesos tasks. It also updates the node
   * capacity depending on what portion of the consumed offers were actually
   * used.
   */
  @VisibleForTesting
  protected void handleContainerAllocation(RMNode rmNode) {
    String host = rmNode.getNodeID().getHost();
    ConsumedOffer consumedOffer = offerLifecycleMgr.drainConsumedOffer(host);
    if (consumedOffer == null) {
      LOGGER.debug("No offer consumed for {}", host);
      return;
    }
    Node node = nodeStore.getNode(host);
    Set<RMContainer> containersBeforeSched = node.getContainerSnapshot();
    Set<RMContainer> containersAfterSched = new HashSet<>(node.getNode().getRunningContainers());
    // Containers that appeared during this scheduler run were allocated
    // against the inflated (offer-backed) capacity.
    Set<RMContainer> containersAllocatedByMesosOffer = (containersBeforeSched == null) ? containersAfterSched : Sets.difference(
        containersAfterSched, containersBeforeSched);
    if (containersAllocatedByMesosOffer.isEmpty()) {
      // Nothing used the offers: decline them all and revert the capacity.
      LOGGER.debug("No containers allocated using Mesos offers for host: {}", host);
      for (Protos.Offer offer : consumedOffer.getOffers()) {
        offerLifecycleMgr.declineOffer(offer);
      }
      decrementNodeCapacity(rmNode, OfferUtils.getYarnResourcesFromMesosOffers(consumedOffer.getOffers()));
    } else {
      LOGGER.debug("Containers allocated using Mesos offers for host: {} count: {}", host, containersAllocatedByMesosOffer.size());
      // Identify the Mesos tasks that need to be launched
      List<Protos.TaskInfo> tasks = Lists.newArrayList();
      Resource resUsed = Resource.newInstance(0, 0);
      for (RMContainer newContainer : containersAllocatedByMesosOffer) {
        tasks.add(getTaskInfoForContainer(newContainer, consumedOffer, node));
        resUsed = Resources.add(resUsed, newContainer.getAllocatedResource());
      }
      // Reduce node capacity to account for unused offers
      Resource resOffered = OfferUtils.getYarnResourcesFromMesosOffers(consumedOffer.getOffers());
      Resource resUnused = Resources.subtract(resOffered, resUsed);
      decrementNodeCapacity(rmNode, resUnused);
      myriadDriver.getDriver().launchTasks(consumedOffer.getOfferIds(), tasks);
    }
    // No need to hold on to the snapshot anymore
    node.removeContainerSnapshot();
  }

  /** Grows the node's advertised capacity by {@code addedCapacity}. */
  public void incrementNodeCapacity(RMNode rmNode, Resource addedCapacity) {
    setNodeCapacity(rmNode, Resources.add(rmNode.getTotalCapability(), addedCapacity));
  }

  /** Shrinks the node's advertised capacity by {@code removedCapacity}. */
  public void decrementNodeCapacity(RMNode rmNode, Resource removedCapacity) {
    setNodeCapacity(rmNode, Resources.subtract(rmNode.getTotalCapability(), removedCapacity));
  }

  /**
   * 1. Updates {@link RMNode#getTotalCapability()} with newCapacity.
   * 2. Sends out a {@link NodeResourceUpdateSchedulerEvent} that's handled by YARN's scheduler.
   * The scheduler updates the corresponding {@link SchedulerNode} with the newCapacity.
   * <p/>
   * Negative components of {@code newCapacity} are clamped to zero before the
   * update is applied.
   *
   * @param rmNode      node whose capacity is being changed
   * @param newCapacity desired total capability for the node
   */
  @SuppressWarnings("unchecked")
  public void setNodeCapacity(RMNode rmNode, Resource newCapacity) {
    //NOOP prevent YARN warning changing to same size
    if ((Resources.equals(rmNode.getTotalCapability(), newCapacity))) {
      return;
    }
    if (yarnScheduler.getSchedulerNode(rmNode.getNodeID()) == null) {
      LOGGER.info("Yarn Scheduler doesn't have node {}, probably UNHEALTHY", rmNode.getNodeID());
      return;
    }
    yarnSchedulerLock.lock();
    try {
      if (newCapacity.getMemory() < 0 || newCapacity.getVirtualCores() < 0) {
        // Clamp each component at zero; a negative capacity is never valid.
        Resource zeroed = ResourceUtils.componentwiseMax(ZERO_RESOURCE, newCapacity);
        rmNode.getTotalCapability().setMemory(zeroed.getMemory());
        rmNode.getTotalCapability().setVirtualCores(zeroed.getVirtualCores());
        LOGGER.warn("Asked to set Node {} to a value less than zero! Had {}, setting to {}.",
            rmNode.getHttpAddress(), rmNode.getTotalCapability().toString(), zeroed.toString());
      } else {
        rmNode.getTotalCapability().setMemory(newCapacity.getMemory());
        rmNode.getTotalCapability().setVirtualCores(newCapacity.getVirtualCores());
        if (LOGGER.isInfoEnabled()) {
          LOGGER.info("Setting capacity for node {} to {}", rmNode.getHostName(), newCapacity);
        }
      }
      // updates the scheduler with the new capacity for the NM.
      // the event is handled by the scheduler asynchronously
      rmContext.getDispatcher().getEventHandler().handle(new NodeResourceUpdateSchedulerEvent(rmNode, ResourceOption.newInstance(
          rmNode.getTotalCapability(), RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT)));
    } finally {
      yarnSchedulerLock.unlock();
    }
  }

  /**
   * Builds the Mesos TaskInfo that backs a newly allocated YARN container,
   * sized to the container's cpu/memory and bound to the NM's executor.
   */
  private Protos.TaskInfo getTaskInfoForContainer(RMContainer rmContainer, ConsumedOffer consumedOffer, Node node) {
    Protos.Offer offer = consumedOffer.getOffers().get(0);
    Container container = rmContainer.getContainer();
    Protos.TaskID taskId = Protos.TaskID.newBuilder().setValue(
        ContainerTaskStatusRequest.YARN_CONTAINER_TASK_ID_PREFIX + container.getId().toString()).build();
    // TODO (sdaingade) Remove ExecutorInfo from the Node object
    // as this is now cached in the NodeTask object in scheduler state.
    Protos.ExecutorInfo executorInfo = node.getExecInfo();
    if (executorInfo == null) {
      executorInfo = Protos.ExecutorInfo.newBuilder(state.getNodeTask(offer.getSlaveId(), NodeManagerConfiguration.NM_TASK_PREFIX)
          .getExecutorInfo()).setFrameworkId(offer.getFrameworkId()).build();
      node.setExecInfo(executorInfo);
    }
    return Protos.TaskInfo.newBuilder()
        .setName("task_" + taskId.getValue()).setTaskId(taskId)
        .setSlaveId(offer.getSlaveId())
        .addAllResources(taskUtils.getScalarResource(offer, "cpus", (double) container.getResource().getVirtualCores(), 0.0))
        .addAllResources(taskUtils.getScalarResource(offer, "mem", (double) container.getResource().getMemory(), 0.0))
        .setExecutor(executorInfo)
        .build();
  }
}