// blob: bf1be81bfcc5be09a254f8d30302c6ac1b858c04 [file] [log] [blame]
package org.apache.oodt.cas.resource.scheduler;
import java.net.URL;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.oodt.cas.resource.batchmgr.Batchmgr;
import org.apache.oodt.cas.resource.jobqueue.JobQueue;
import org.apache.oodt.cas.resource.monitor.Monitor;
import org.apache.oodt.cas.resource.structs.JobInstance;
import org.apache.oodt.cas.resource.structs.JobSpec;
import org.apache.oodt.cas.resource.structs.ResourceNode;
import org.apache.oodt.cas.resource.structs.SparkInstance;
import org.apache.oodt.cas.resource.structs.StreamingInstance;
import org.apache.oodt.cas.resource.structs.exceptions.JobInputException;
import org.apache.oodt.cas.resource.structs.exceptions.JobQueueException;
import org.apache.oodt.cas.resource.structs.exceptions.SchedulerException;
import org.apache.oodt.cas.resource.util.GenericResourceManagerObjectFactory;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.StreamingContext;
/**
 * A scheduler that runs Spark jobs on a Spark cluster.
 *
 * <p>On construction a single {@link SparkContext} and a single
 * {@link StreamingContext} are created and then handed to every job this
 * scheduler executes. {@link #run()} polls the job queue and passes each
 * Spark-capable job to {@link #schedule(JobSpec)}, which executes it
 * synchronously on the calling thread.
 *
 * @author starchmd
 *
 */
public class SparkScheduler implements Scheduler {
    private static final Logger LOG = Logger.getLogger(SparkScheduler.class.getName());

    /** System property that names the Spark master to connect to. */
    private static final String SPARK_MASTER_PROPERTY = "resource.runner.spark.host";
    /** Spark master used when {@link #SPARK_MASTER_PROPERTY} is not set. */
    private static final String DEFAULT_SPARK_MASTER = "local";
    /** Application name registered with Spark. */
    private static final String SPARK_APP_NAME = "OODT Spark Job";
    /** Jar registered with the Spark configuration (path relative to the working directory). */
    private static final String RESOURCE_JAR = "../lib/cas-resource-0.8-SNAPSHOT.jar";
    /** Batch duration of the shared streaming context, in milliseconds. */
    private static final long STREAMING_BATCH_MILLIS = 10000L;
    /** Pause between polls of an empty job queue, in milliseconds. */
    private static final long EMPTY_QUEUE_SLEEP_MILLIS = 1000L;

    /** Spark context shared by every job run through this scheduler. */
    SparkContext sc;
    /** Streaming context (built on {@link #sc}) handed to streaming jobs. */
    StreamingContext ssc;
    /** Queue that jobs are pulled from. */
    JobQueue queue;

    /**
     * Builds the Spark configuration and contexts and remembers the job queue.
     *
     * <p>The Spark master is read from the {@code resource.runner.spark.host}
     * system property and defaults to {@code local}.
     *
     * @param queue queue that {@link #run()} pulls jobs from
     */
    public SparkScheduler(JobQueue queue) {
        SparkConf conf = new SparkConf();
        conf.setMaster(System.getProperty(SPARK_MASTER_PROPERTY, DEFAULT_SPARK_MASTER));
        conf.setAppName(SPARK_APP_NAME);
        conf.setJars(new String[] {RESOURCE_JAR});
        sc = new SparkContext(conf);
        ssc = new StreamingContext(sc, new Duration(STREAMING_BATCH_MILLIS));
        this.queue = queue;
    }

    /**
     * Polls the job queue until the running thread is interrupted.
     *
     * <p>Each dequeued job whose instance class is assignable to
     * {@link SparkInstance} is passed to {@link #schedule(JobSpec)}; any other
     * job is logged and dropped. Failures for a single job are logged and the
     * loop moves on to the next one.
     *
     * @see java.lang.Runnable#run()
     */
    @Override
    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                if (queue.isEmpty()) {
                    // Back off instead of spinning on an empty queue.
                    Thread.sleep(EMPTY_QUEUE_SLEEP_MILLIS);
                    continue;
                }
                JobSpec spec = queue.getNextJob();
                Class<?> clazz = Class.forName(spec.getJob().getJobInstanceClassName());
                // Type-check on the Class object; constructing a throw-away
                // instance just to test its type would run arbitrary job code.
                if (!SparkInstance.class.isAssignableFrom(clazz)) {
                    LOG.log(Level.WARNING, "Non-Spark job found (" + spec.getJob().getId() + ") ignoring.");
                    continue;
                }
                this.schedule(spec);
            } catch (InterruptedException e) {
                // Restore the flag so the owner of this thread can observe it, then stop.
                Thread.currentThread().interrupt();
                LOG.log(Level.INFO, "Spark scheduler interrupted, stopping.");
                return;
            } catch (SchedulerException e) {
                LOG.log(Level.WARNING, "Scheduler exception detected: " + e.getMessage(), e);
            } catch (JobQueueException e) {
                LOG.log(Level.WARNING, "Could not get next job from job-queue.", e);
            } catch (ClassNotFoundException e) {
                LOG.log(Level.WARNING, "Class not found: " + e.getMessage(), e);
            }
        }
    }

    /**
     * Instantiates the job named by {@code spec}, injects the shared Spark
     * context (and the streaming context for a {@link StreamingInstance}) and
     * executes it with the spec's input on the calling thread.
     *
     * @param spec job specification to execute
     * @return {@code true} once the job's {@code execute} call has returned
     *         without throwing
     * @throws SchedulerException if no {@link SparkInstance} could be obtained
     *         for the job's instance class, or if the job rejects its input
     * @see org.apache.oodt.cas.resource.scheduler.Scheduler#schedule(org.apache.oodt.cas.resource.structs.JobSpec)
     */
    @Override
    public boolean schedule(JobSpec spec) throws SchedulerException {
        try {
            String className = spec.getJob().getJobInstanceClassName();
            JobInstance instance = GenericResourceManagerObjectFactory.getJobInstanceFromClassName(className);
            // Covers both a missing (null) instance and a non-Spark job, so callers
            // get the declared checked exception rather than an NPE/ClassCastException.
            if (!(instance instanceof SparkInstance)) {
                throw new SchedulerException(new IllegalStateException(
                        "Job instance class [" + className + "] did not yield a SparkInstance"));
            }
            SparkInstance sparkInstance = (SparkInstance) instance;
            LOG.log(Level.INFO, "Setting SparkContext");
            sparkInstance.setSparkContext(this.sc);
            // Handle spark streaming
            if (sparkInstance instanceof StreamingInstance) {
                LOG.log(Level.INFO, "Found streaming instance, setting StreamingContext");
                ((StreamingInstance) sparkInstance).setStreamingContext(this.ssc);
            }
            sparkInstance.execute(spec.getIn());
        } catch (JobInputException e) {
            LOG.log(Level.WARNING, "Job input exception detected.", e);
            throw new SchedulerException(e);
        }
        return true;
    }

    /**
     * Not implemented by this scheduler.
     *
     * @param spec ignored
     * @return always {@code null}
     * @see org.apache.oodt.cas.resource.scheduler.Scheduler#nodeAvailable(org.apache.oodt.cas.resource.structs.JobSpec)
     */
    @Override
    public ResourceNode nodeAvailable(JobSpec spec) throws SchedulerException {
        return null;
    }

    /**
     * This scheduler holds no monitor.
     *
     * @return always {@code null}
     * @see org.apache.oodt.cas.resource.scheduler.Scheduler#getMonitor()
     */
    @Override
    public Monitor getMonitor() {
        return null;
    }

    /**
     * This scheduler holds no batch manager.
     *
     * @return always {@code null}
     * @see org.apache.oodt.cas.resource.scheduler.Scheduler#getBatchmgr()
     */
    @Override
    public Batchmgr getBatchmgr() {
        return null;
    }

    /**
     * @return the job queue supplied at construction
     * @see org.apache.oodt.cas.resource.scheduler.Scheduler#getJobQueue()
     */
    @Override
    public JobQueue getJobQueue() {
        return queue;
    }

    /**
     * This scheduler holds no queue manager.
     *
     * @return always {@code null}
     * @see org.apache.oodt.cas.resource.scheduler.Scheduler#getQueueManager()
     */
    @Override
    public QueueManager getQueueManager() {
        return null;
    }
}