blob: de0a867161363c6fa5f2c075452a849db2f6684a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oodt.cas.resource.scheduler;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.LinkedList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.lang.NotImplementedException;
import org.apache.mesos.Protos.ExecutorID;
import org.apache.mesos.Protos.ExecutorInfo;
import org.apache.mesos.Protos.FrameworkID;
import org.apache.mesos.Protos.MasterInfo;
import org.apache.mesos.Protos.Offer;
import org.apache.mesos.Protos.OfferID;
import org.apache.mesos.Protos.Resource;
import org.apache.mesos.Protos.SlaveID;
import org.apache.mesos.Protos.Status;
import org.apache.mesos.Protos.TaskID;
import org.apache.mesos.Protos.TaskInfo;
import org.apache.mesos.Protos.TaskStatus;
import org.apache.mesos.Protos.Value;
import org.apache.mesos.Scheduler;
import org.apache.mesos.SchedulerDriver;
import org.apache.oodt.cas.resource.batchmgr.Batchmgr;
import org.apache.oodt.cas.resource.batchmgr.MesosBatchManager;
import org.apache.oodt.cas.resource.jobqueue.JobQueue;
import org.apache.oodt.cas.resource.monitor.Monitor;
import org.apache.oodt.cas.resource.structs.JobSpec;
import org.apache.oodt.cas.resource.structs.ResourceNode;
import org.apache.oodt.cas.resource.structs.exceptions.JobQueueException;
import org.apache.oodt.cas.resource.structs.exceptions.MesosFrameworkException;
import org.apache.oodt.cas.resource.structs.exceptions.MonitorException;
import org.apache.oodt.cas.resource.structs.exceptions.SchedulerException;
import org.apache.oodt.cas.resource.util.MesosUtilities;
/**
* @author starchmd
* @version $Revision$
*
* A scheduler for part of the mesos frame work.
*/
public class ResourceMesosScheduler implements Scheduler, org.apache.oodt.cas.resource.scheduler.Scheduler {
SchedulerDriver driver;
MesosBatchManager batch;
ExecutorInfo executor;
JobQueue queue;
Monitor mon;
//Logger
private static final Logger LOG = Logger.getLogger(ResourceMesosScheduler.class.getName());
/**
* Construct the scheduler
* @param batch - batch manager (must be MesosBatchManager)
* @param executor - Mesos ExecutorInfo
* @param queue Job Queue used
* @param mon - monitor used.
*/
public ResourceMesosScheduler(MesosBatchManager batch,ExecutorInfo executor, JobQueue queue, Monitor mon) {
this.batch = batch;
this.executor = executor;
this.queue = queue;
this.mon = mon;
LOG.log(Level.INFO,"Creating the resource-mesos scheduler.");
}
/* (non-Javadoc)
* @see org.apache.mesos.Scheduler#disconnected(org.apache.mesos.SchedulerDriver)
*/
@Override
public void disconnected(SchedulerDriver schedDriver) {
//TODO: Pause scheduler until master comes back online.
}
/* (non-Javadoc)
* @see org.apache.mesos.Scheduler#error(org.apache.mesos.SchedulerDriver, java.lang.String)
*/
@Override
public void error(SchedulerDriver schedDriver, String error) {
LOG.log(Level.SEVERE,"Mesos issued an error: "+error);
//TODO: kill something here.
}
/* (non-Javadoc)
* @see org.apache.mesos.Scheduler#executorLost(org.apache.mesos.SchedulerDriver, org.apache.mesos.Protos.ExecutorID, org.apache.mesos.Protos.SlaveID, int)
*/
@Override
public void executorLost(SchedulerDriver schedDriver, ExecutorID executor,SlaveID slave, int status) {
//Tasks will have a "task lost" message automatically q.e.d no action necessary.
//TODO: do we need to restart?
LOG.log(Level.SEVERE,"Mesos executor "+executor+" on slave "+slave+" died with status "+status);
}
/* (non-Javadoc)
* @see org.apache.mesos.Scheduler#frameworkMessage(org.apache.mesos.SchedulerDriver, org.apache.mesos.Protos.ExecutorID, org.apache.mesos.Protos.SlaveID, byte[])
*/
@Override
public void frameworkMessage(SchedulerDriver schedDriver, ExecutorID executor,
SlaveID slave, byte[] bytes) {
try {
LOG.log(Level.INFO,"Mesos framework executor"+executor+" on slave "+slave+" issued message: "+
new String(bytes,"ascii"));
} catch (UnsupportedEncodingException e) {
LOG.log(Level.WARNING,"Mesos framework message missed due to bad encoding: ascii. "+e.getMessage());
}
}
/* (non-Javadoc)
* @see org.apache.mesos.Scheduler#offerRescinded(org.apache.mesos.SchedulerDriver, org.apache.mesos.Protos.OfferID)
*/
@Override
public void offerRescinded(SchedulerDriver schedDriver, OfferID offer) {
//TODO: take away resources from batch manager...or stand in.
//Unneeded?
}
/* (non-Javadoc)
* @see org.apache.mesos.Scheduler#registered(org.apache.mesos.SchedulerDriver, org.apache.mesos.Protos.FrameworkID, org.apache.mesos.Protos.MasterInfo)
*/
@Override
public void registered(SchedulerDriver schedDriver, FrameworkID framework,
MasterInfo masterInfo) {
LOG.log(Level.INFO,"Mesos framework registered: "+framework.getValue()+" with master: "+masterInfo.getId());
}
/* (non-Javadoc)
* @see org.apache.mesos.Scheduler#reregistered(org.apache.mesos.SchedulerDriver, org.apache.mesos.Protos.MasterInfo)
*/
@Override
public void reregistered(SchedulerDriver schedDriver, MasterInfo masterInfo) {
LOG.log(Level.INFO,"Mesos framework re-registered with: "+masterInfo.getId());
//TODO: call start, we are registered.
}
/* (non-Javadoc)
* @see org.apache.mesos.Scheduler#resourceOffers(org.apache.mesos.SchedulerDriver, java.util.List)
*/
@Override
public void resourceOffers(SchedulerDriver driver, List<Offer> offers) {
LOG.log(Level.INFO,"Offered mesos resources: "+offers.size()+" offers.");
//Log, if possible the offers
if (LOG.isLoggable(Level.FINER)) {
for (Offer offer : offers) {
try {
this.mon.addNode(new ResourceNode(offer.getSlaveId().getValue(),new URL("http://"+offer.getHostname()),-1));
} catch (MalformedURLException e) {
LOG.log(Level.WARNING,"Cannot add node to monitor (bad url). Giving up: "+e.getMessage());
} catch ( MonitorException e) {
LOG.log(Level.WARNING,"Cannot add node to monitor (unkn). Giving up: "+e.getMessage());
}
LOG.log(Level.FINER,"Offer ("+offer.getId().getValue()+"): "+offer.getHostname()+ "(Slave: "+
offer.getSlaveId().getValue()+") "+MesosUtilities.getResourceMessage(offer.getResourcesList()));
}
}
List<JobSet> assignments = this.getJobAssignmentsJobs(offers);
List<OfferID> used = new LinkedList<OfferID>();
for (JobSet assignment : assignments) {
//Launch tasks requires lists
List<OfferID> ids = new LinkedList<OfferID>();
List<TaskInfo> tasks = new LinkedList<TaskInfo>();
tasks.add(assignment.task);
used.add(assignment.offer.getId());
ids.add(assignment.offer.getId());
//Register locally and launch on mesos
batch.registerExecutedJob(assignment.job.getJob().getId(), assignment.task.getTaskId());
Status status = driver.launchTasks(ids,tasks); //Assumed one to one mapping
if (status != Status.DRIVER_RUNNING)
throw new MesosFrameworkException("Driver stopped: "+status.toString());
}
for (Offer offer : offers) {
if (!used.contains(offer.getId())) {
LOG.log(Level.INFO,"Rejecting Offer: "+offer.getId().getValue());
driver.declineOffer(offer.getId());
}
}
}
/**
* Builds a TaskInfo from the given jobspec
* @param job - JobSpec to TaskInfo-ify
* @param offer - offer add extra data (SlaveId)
* @return TaskInfo fully formed
*/
private TaskInfo getTaskInfo(JobSpec job,Offer offer) {
TaskID taskId = TaskID.newBuilder().setValue(job.getJob().getId()).build();
TaskInfo info = TaskInfo.newBuilder().setName("task " + taskId.getValue())
.setTaskId(taskId)
.setSlaveId(offer.getSlaveId())
.addResources(Resource.newBuilder()
.setName("cpus")
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder().setValue(job.getJob().getLoadValue()*1.0)))
.addResources(Resource.newBuilder()
.setName("mem")
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder().setValue(job.getJob().getLoadValue()*1024.0)))
.setExecutor(ExecutorInfo.newBuilder(executor)).setData(MesosUtilities.jobSpecToByteString(job)).build();
return info;
}
/**
* Checks all offers against jobs in order, assigning jobs to offers until each offer is full,
* or all jobs are gone.
* @param offers - offers to assign jobs to.
* @return List of <JobSpec,TaskInfo,Offer> tuples (assigned to each other).
*/
private List<JobSet> getJobAssignmentsJobs(List<Offer> offers) {
List<JobSet> list = new LinkedList<JobSet>();
for (Offer offer : offers)
{
double cpus = 0.0, mem = 0.0;
//Get the resources offered from this offer
for (Resource resc : offer.getResourcesList()) {
if (resc.getName().equals("cpus"))
cpus += resc.getScalar().getValue();
if (resc.getName().equals("mem"))
mem += resc.getScalar().getValue();
}
//Search for enough jobs to fill the offer
for (int i = 0;i < queue.getSize();i++)
{
try {
JobSpec job = queue.getNextJob();
double load = job.getJob().getLoadValue();
//Check if enough resources
if (cpus < load || mem < load*1024)
{
queue.requeueJob(job);
continue;
}
cpus -= load;
mem -= 1024*load;
JobSet tmp = new JobSet(job,getTaskInfo(job,offer),offer);
list.add(tmp);
//Not enough left, optimise and stop looking for jobs
if (cpus < 0.5 || mem <= 512.0)
break;
} catch (JobQueueException e) {throw new RuntimeException(e);}
}
//Optimization: break when no jobs
if (queue.getSize() == 0)
break;
}
return list;
}
/* (non-Javadoc)
* @see org.apache.mesos.Scheduler#slaveLost(org.apache.mesos.SchedulerDriver, org.apache.mesos.Protos.SlaveID)
*/
@Override
public void slaveLost(SchedulerDriver schedDriver, SlaveID slave) {
LOG.log(Level.WARNING,"Mesos slave "+slave+" lost, reissuing jobs.");
//TODO: reregister jobs
}
/* (non-Javadoc)
* @see org.apache.mesos.Scheduler#statusUpdate(org.apache.mesos.SchedulerDriver, org.apache.mesos.Protos.TaskStatus)
*/
@Override
public void statusUpdate(SchedulerDriver schedDriver, TaskStatus taskStatus) {
//TODO: deliver messages, some rerun, some finish.
LOG.log(Level.INFO,"Status update: "+taskStatus.getMessage());
}
@Override
public void run() {
LOG.log(Level.INFO,"Attempting to run framework. Nothing to do.");
LOG.log(Level.FINEST, "Paradigm shift enabled.");
LOG.log(Level.FINEST, "Spin and poll surplanted by event based execution.");
LOG.log(Level.FINEST, "Mesos-OODT Fusion complete.");
//Don't run anything
return;
}
@Override
public boolean schedule(JobSpec spec) throws SchedulerException {
throw new NotImplementedException("Schedule is not called when using mesos.");
}
@Override
public ResourceNode nodeAvailable(JobSpec spec) throws SchedulerException {
return null;
}
@Override
public Monitor getMonitor() {
return mon;
}
@Override
public Batchmgr getBatchmgr() {
return batch;
}
@Override
public JobQueue getJobQueue() {
// TODO Auto-generated method stub
return queue;
}
@Override
public QueueManager getQueueManager() {
// TODO Auto-generated method stub
return null;
}
//Job set used internally to simplify data transmission
private class JobSet {
public JobSpec job;
public TaskInfo task;
public Offer offer;
//Build a job set
public JobSet(JobSpec job, TaskInfo task, Offer offer) {
this.job = job;
this.task = task;
this.offer = offer;
}
}
}