blob: 94d6232da504a86aca558b3655b3dc881e6d981d [file] [log] [blame]
#!/usr/bin/env python
import os
import pickle
import re
import sys
import time
from optparse import OptionParser
import mesos
# Default resources to use for the command we execute.
DEFAULT_CPUS = 1
DEFAULT_MEM = 512
# The scheduler for mesos-submit, running on the machine that the user
# executed mesos-submit on, launches a single task in the cluster with
# the required amounts of CPUs and memory, and then waits for it to become
# the framework's scheduler using the scheduler failover mechanism.
# It then exits mesos-submit successfully, while the task goes on to run
# the user's command.
#
# Note that we pass our framework ID, master URL and command to the executor
# using the task's argument field.
#
# We currently don't recover if our task fails for some reason, but we
# do print its state transitions so the user can notice this.
class SubmitScheduler(mesos.Scheduler):
def __init__(self, options, master, command):
mesos.Scheduler.__init__(self)
if options.name != None:
self.framework_name = options.name
else:
self.framework_name = "mesos-submit " + command
self.cpus = options.cpus
self.mem = options.mem
self.master = master
self.command = command
self.task_launched = False
def getFrameworkName(self, driver):
return self.framework_name
def getExecutorInfo(self, driver):
frameworkDir = os.path.abspath(os.path.dirname(sys.argv[0]))
executorPath = os.path.join(frameworkDir, "executor")
return mesos.ExecutorInfo(executorPath, "")
def registered(self, driver, fid):
print "Registered with Mesos, FID = %s" % fid
self.fid = "" + fid
def resourceOffer(self, driver, oid, offers):
if self.task_launched:
# Since we already launched our task, we reject the offer
driver.replyToOffer(oid, [], {"timeout": "-1"})
else:
for offer in offers:
cpus = int(offer.params["cpus"])
mem = int(offer.params["mem"])
if cpus >= self.cpus and mem >= self.mem:
print "Accepting slot on slave %s (%s)" % (offer.slaveId, offer.host)
params = {"cpus": "%d" % self.cpus, "mem": "%d" % self.mem}
arg = [self.fid, self.framework_name, self.master, self.command]
task = mesos.TaskDescription(0, offer.slaveId, "task", params,
pickle.dumps(arg))
driver.replyToOffer(oid, [task], {"timeout": "1"})
self.task_launched = True
return
def statusUpdate(self, driver, update):
print "Task %d in state %d" % (update.taskId, update.state)
def error(self, driver, code, message):
if message == "Framework failover":
# Scheduler failover is currently reported by this error message;
# this is kind of a brittle way to detect it, but it's all we can do now.
print "In-cluster scheduler started; exiting mesos-submit"
else:
print "Error from Mesos: %s (error code: %d)" % (message, code)
driver.stop()
if __name__ == "__main__":
parser = OptionParser(usage="Usage: %prog [options] <master_url> <command>")
parser.add_option("-c","--cpus",
help="number of CPUs to request (default: 1)",
dest="cpus", type="int", default=DEFAULT_CPUS)
parser.add_option("-m","--mem",
help="MB of memory to request (default: 512)",
dest="mem", type="int", default=DEFAULT_MEM)
parser.add_option("-n","--name",
help="Framework name", dest="name", type="string")
(options,args) = parser.parse_args()
if len(args) < 2:
parser.error("At least two parameters are required.")
exit(2)
master = args[0]
command = " ".join(args[1:])
print "Connecting to mesos master %s" % master
sched = SubmitScheduler(options, master, command)
mesos.MesosSchedulerDriver(sched, master).run()