blob: f0e80e9ccdc744c307f0fdc1a62e80bb43f1b776 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p/>
* <p/>
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
package org.apache.myriad.scheduler.event.handlers;
import com.lmax.disruptor.EventHandler;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.inject.Inject;
import org.apache.commons.collections.CollectionUtils;
import org.apache.mesos.Protos;
import org.apache.mesos.Protos.Offer;
import org.apache.mesos.Protos.OfferID;
import org.apache.mesos.Protos.Resource;
import org.apache.mesos.Protos.TaskInfo;
import org.apache.mesos.Protos.Value;
import org.apache.mesos.SchedulerDriver;
import org.apache.myriad.scheduler.SchedulerUtils;
import org.apache.myriad.scheduler.ServiceResourceProfile;
import org.apache.myriad.scheduler.TaskConstraints;
import org.apache.myriad.scheduler.TaskConstraintsManager;
import org.apache.myriad.scheduler.TaskFactory;
import org.apache.myriad.scheduler.TaskUtils;
import org.apache.myriad.scheduler.constraints.Constraint;
import org.apache.myriad.scheduler.constraints.LikeConstraint;
import org.apache.myriad.scheduler.event.ResourceOffersEvent;
import org.apache.myriad.scheduler.fgs.OfferLifecycleManager;
import org.apache.myriad.state.NodeTask;
import org.apache.myriad.state.SchedulerState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* handles and logs resource offers events
public class ResourceOffersEventHandler implements EventHandler<ResourceOffersEvent> {
private static final Logger LOGGER = LoggerFactory.getLogger(ResourceOffersEventHandler.class);
private static final Lock driverOperationLock = new ReentrantLock();
private static final String RESOURCES_CPU_KEY = "cpus";
private static final String RESOURCES_MEM_KEY = "mem";
private static final String RESOURCES_PORTS_KEY = "ports";
private static final String RESOURCES_DISK_KEY = "disk";
private SchedulerState schedulerState;
private TaskUtils taskUtils;
private Map<String, TaskFactory> taskFactoryMap;
private OfferLifecycleManager offerLifecycleMgr;
private TaskConstraintsManager taskConstraintsManager;
public void onEvent(ResourceOffersEvent event, long sequence, boolean endOfBatch) throws Exception {
SchedulerDriver driver = event.getDriver();
List<Offer> offers = event.getOffers();
// Sometimes, we see that mesos sends resource offers before Myriad receives
// a notification for "framework registration". This is a simple defensive code
// to not process any offers unless Myriad receives a "framework registered" notification.
if (schedulerState.getFrameworkID() == null) {
LOGGER.warn("Received {} offers, but declining them since Framework ID is not yet set", offers.size());
for (Offer offer : offers) {
LOGGER.debug("Received offers {}", offers.size());
LOGGER.debug("Pending tasks: {}", this.schedulerState.getPendingTaskIds());
try {
for (Iterator<Offer> iterator = offers.iterator(); iterator.hasNext(); ) {
Offer offer =;
Set<NodeTask> nodeTasks = schedulerState.getNodeTasks(offer.getSlaveId());
for (NodeTask nodeTask : nodeTasks) {
// keep this in case SchedulerState gets out of sync. This should not happen with
// synchronizing addNodes method in SchedulerState
// but to keep it safe
final Set<Protos.TaskID> missingTasks = Sets.newHashSet();
Set<Protos.TaskID> pendingTasks = schedulerState.getPendingTaskIds();
if (CollectionUtils.isNotEmpty(pendingTasks)) {
for (Protos.TaskID pendingTaskId : pendingTasks) {
NodeTask taskToLaunch = schedulerState.getTask(pendingTaskId);
if (taskToLaunch == null) {
LOGGER.warn("Node task for TaskID: {} does not exist", pendingTaskId);
String taskPrefix = taskToLaunch.getTaskPrefix();
ServiceResourceProfile profile = taskToLaunch.getProfile();
Constraint constraint = taskToLaunch.getConstraint();
Set<NodeTask> launchedTasks = new HashSet<>();
if (matches(offer, taskToLaunch, constraint) && SchedulerUtils.isUniqueHostname(offer, taskToLaunch, launchedTasks)) {
try {
final TaskInfo task = taskFactoryMap.get(taskPrefix).createTask(offer, schedulerState.getFrameworkID().get(),
pendingTaskId, taskToLaunch);
List<OfferID> offerIds = new ArrayList<>();
List<TaskInfo> tasks = new ArrayList<>();
tasks.add(task);"Launching task: {} using offer: {}", task.getTaskId().getValue(), offer.getId());
LOGGER.debug("Launching task: {} with profile: {} using offer: {}", task, profile, offer);
driver.launchTasks(offerIds, tasks);
// For every NM Task that we launch, we currently
// need to backup the ExecutorInfo for that NM Task in the State Store.
// Without this, we will not be able to launch tasks corresponding to yarn
// containers. This is specially important in case the RM restarts.
schedulerState.addTask(pendingTaskId, taskToLaunch);
iterator.remove(); // remove the used offer from offers list
} catch (Throwable t) {
LOGGER.error("Exception thrown while trying to create a task for {}", taskPrefix, t);
for (Protos.TaskID taskId : missingTasks) {
for (Offer offer : offers) {
if (SchedulerUtils.isEligibleForFineGrainedScaling(offer.getHostname(), schedulerState)) {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Picking an offer from slave with hostname {} for fine grained scaling.", offer.getHostname());
} else {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Declining offer {} from slave {}.", offer, offer.getHostname());
} finally {
private boolean matches(Offer offer, NodeTask taskToLaunch, Constraint constraint) {
if (!meetsConstraint(offer, constraint)) {
return false;
Map<String, Object> results = new HashMap<String, Object>(5);
//Assign default values to avoid NPE
results.put(RESOURCES_CPU_KEY, Double.valueOf(0.0));
results.put(RESOURCES_MEM_KEY, Double.valueOf(0.0));
results.put(RESOURCES_DISK_KEY, Double.valueOf(0.0));
results.put(RESOURCES_PORTS_KEY, Integer.valueOf(0));
for (Resource resource : offer.getResourcesList()) {
if (resourceEvaluators.containsKey(resource.getName())) {
resourceEvaluators.get(resource.getName()).eval(resource, results);
} else {
LOGGER.warn("Ignoring unknown resource type: {}", resource.getName());
double cpus = (Double) results.get(RESOURCES_CPU_KEY);
double mem = (Double) results.get(RESOURCES_MEM_KEY);
int ports = (Integer) results.get(RESOURCES_PORTS_KEY);
checkResource(cpus <= 0, RESOURCES_CPU_KEY);
checkResource(mem <= 0, RESOURCES_MEM_KEY);
checkResource(ports <= 0, RESOURCES_PORTS_KEY);
return checkAggregates(offer, taskToLaunch, ports, cpus, mem);
private boolean checkAggregates(Offer offer, NodeTask taskToLaunch, int ports, double cpus, double mem) {
final ServiceResourceProfile profile = taskToLaunch.getProfile();
final String taskPrefix = taskToLaunch.getTaskPrefix();
final double aggrCpu = profile.getAggregateCpu() + profile.getExecutorCpu();
final double aggrMem = profile.getAggregateMemory() + profile.getExecutorMemory();
final TaskConstraints taskConstraints = taskConstraintsManager.getConstraints(taskPrefix);
if (aggrCpu <= cpus && aggrMem <= mem && taskConstraints.portsCount() <= ports) {
return true;
} else {
LOGGER.debug("Offer insufficient for task with, cpu: {}, memory: {}, ports: {}", aggrCpu, aggrMem, ports);
return false;
private boolean meetsConstraint(Offer offer, Constraint constraint) {
if (constraint != null) {
switch (constraint.getType()) {
case LIKE: {
LikeConstraint likeConstraint = (LikeConstraint) constraint;
if (likeConstraint.isConstraintOnHostName()) {
return likeConstraint.matchesHostName(offer.getHostname());
} else {
return likeConstraint.matchesSlaveAttributes(offer.getAttributesList());
return false;
return true;
private void checkResource(boolean fail, String resource) {
if (fail) {
LOGGER.debug("No " + resource + " resources present");
private static Double scalarToDouble(Resource resource, String id) {
Double value = new Double(0.0);
if (resource.getType().equals(Value.Type.SCALAR)) {
value = new Double(resource.getScalar().getValue());
} else {
LOGGER.error(id + " resource was not a scalar: {}", resource.getType().toString());
return value;
private interface EvalResources {
public void eval(Resource resource, Map<String, Object> results);
private static Map<String, EvalResources> resourceEvaluators;
static {
resourceEvaluators = new HashMap<String, EvalResources>(4);
resourceEvaluators.put(RESOURCES_CPU_KEY, new EvalResources() {
public void eval(Resource resource, Map<String, Object> results) {
results.put(RESOURCES_CPU_KEY, (Double) results.get(RESOURCES_CPU_KEY) + scalarToDouble(resource, RESOURCES_CPU_KEY));
resourceEvaluators.put(RESOURCES_MEM_KEY, new EvalResources() {
public void eval(Resource resource, Map<String, Object> results) {
results.put(RESOURCES_MEM_KEY, (Double) results.get(RESOURCES_MEM_KEY) + scalarToDouble(resource, RESOURCES_MEM_KEY));
resourceEvaluators.put(RESOURCES_DISK_KEY, new EvalResources() {
public void eval(Resource resource, Map<String, Object> results) {
resourceEvaluators.put(RESOURCES_PORTS_KEY, new EvalResources() {
public void eval(Resource resource, Map<String, Object> results) {
int ports = 0;
if (resource.getType().equals(Value.Type.RANGES)) {
Value.Ranges ranges = resource.getRanges();
for (Value.Range range : ranges.getRangeList()) {
if (range.getBegin() < range.getEnd()) {
ports += range.getEnd() - range.getBegin() + 1;
} else {
LOGGER.error("ports resource was not Ranges: {}", resource.getType().toString());
results.put(RESOURCES_PORTS_KEY, (Integer) results.get(RESOURCES_PORTS_KEY) + Integer.valueOf(ports));