#!/usr/bin/env python
import mesos.interface
import mesos.native
from mesos.interface import mesos_pb2
import os
import sys
import time
import re
import threading
from optparse import OptionParser
from subprocess import *
def mpiexec():
print "We've launched all our MPDs; waiting for them to come up"
while countMPDs() <= TOTAL_MPDS:
print "...waiting on MPD(s)..."
print "Got %d mpd(s), running mpiexec" % TOTAL_MPDS
print "Running mpiexec"
call([MPICH2PATH + 'mpiexec', '-1', '-n', str(TOTAL_MPDS)] + MPI_PROGRAM)
except OSError,e:
print >> sys.stderr, "Error executing mpiexec"
print >> sys.stderr, e
print "mpiexec completed, calling mpdallexit %s" % MPD_PID
# Ring/slave mpd daemons will be killed on executor's shutdown() if
# framework scheduler fails to call 'mpdallexit'.
call([MPICH2PATH + 'mpdallexit', MPD_PID])
class MPIScheduler(mesos.interface.Scheduler):
def __init__(self, options, ip, port):
self.mpdsLaunched = 0
self.mpdsFinished = 0
self.ip = ip
self.port = port
self.options = options
self.startedExec = False
def registered(self, driver, fid, masterInfo):
print "Mesos MPI scheduler and mpd running at %s:%s" % (self.ip, self.port)
print "Registered with framework ID %s" % fid.value
def resourceOffers(self, driver, offers):
print "Got %d resource offers" % len(offers)
for offer in offers:
print "Considering resource offer %s from %s" % (, offer.hostname)
if self.mpdsLaunched == TOTAL_MPDS:
print "Declining permanently because we have already launched enough tasks"
cpus = 0
mem = 0
tasks = []
for resource in offer.resources:
if == "cpus":
cpus = resource.scalar.value
elif == "mem":
mem = resource.scalar.value
if cpus < CPUS or mem < MEM:
print "Declining offer due to too few resources"
tid = self.mpdsLaunched
self.mpdsLaunched += 1
print "Accepting offer on %s to start mpd %d" % (offer.hostname, tid)
task = mesos_pb2.TaskInfo()
task.task_id.value = str(tid)
task.slave_id.value = offer.slave_id.value = "task %d " % tid
cpus = task.resources.add() = "cpus"
cpus.type = mesos_pb2.Value.SCALAR
cpus.scalar.value = CPUS
mem = task.resources.add() = "mem"
mem.type = mesos_pb2.Value.SCALAR
mem.scalar.value = MEM
task.command.value = "%smpd --noconsole --ncpus=%d --host=%s --port=%s" % (MPICH2PATH, CPUS, self.ip, self.port)
print "Replying to offer: launching mpd %d on host %s" % (tid, offer.hostname)
driver.launchTasks(, tasks)
if not self.startedExec and self.mpdsLaunched == TOTAL_MPDS:
threading.Thread(target = mpiexec).start()
self.startedExec = True
def statusUpdate(self, driver, update):
print "Task %s in state %s" % (update.task_id.value, update.state)
if (update.state == mesos_pb2.TASK_FAILED or
update.state == mesos_pb2.TASK_KILLED or
update.state == mesos_pb2.TASK_LOST):
print "A task finished unexpectedly, calling mpdexit on %s" % MPD_PID
call([MPICH2PATH + "mpdexit", MPD_PID])
if (update.state == mesos_pb2.TASK_FINISHED):
self.mpdsFinished += 1
if self.mpdsFinished == TOTAL_MPDS:
print "All tasks done, all mpd's closed, exiting"
def countMPDs():
mpdtraceproc = Popen(MPICH2PATH + "mpdtrace -l", shell=True, stdout=PIPE)
mpdtraceline = mpdtraceproc.communicate()[0]
return mpdtraceline.count("\n")
except OSError,e:
print >>sys.stderr, "Error starting mpd or mpdtrace"
print >>sys.stderr, e
def parseIpPort(s):
ba ="([^_]*)_([0-9]*)", s)
ip =
port =
return (ip, port)
if __name__ == "__main__":
parser = OptionParser(usage="Usage: %prog [options] mesos_master mpi_program")
parser.add_option("-n", "--num",
help="number of mpd's to allocate (default 1)",
dest="num", type="int", default=1)
parser.add_option("-c", "--cpus",
help="number of cpus per mpd (default 1)",
dest="cpus", type="int", default=1)
help="number of MB of memory per mpd (default 1GB)",
dest="mem", type="int", default=1024)
help="framework name", dest="name", type="string")
help="path to look for MPICH2 binaries (mpd, mpiexec, etc.)",
dest="path", type="string", default="")
help="alt. interface hostname for what mpd is running on (for scheduler)",
dest="ifhn_master", type="string")
# Add options to configure cpus and mem.
(options,args) = parser.parse_args()
if len(args) < 2:
print >> sys.stderr, "At least two parameters required."
print >> sys.stderr, "Use --help to show usage."
TOTAL_MPDS = options.num
CPUS = options.cpus
MEM = options.mem
MPI_PROGRAM = args[1:]
# Give options.path a trailing '/', if it doesn't have one already.
MPICH2PATH = os.path.join(options.path, "")
print "Connecting to Mesos master %s" % args[0]
mpd_cmd = MPICH2PATH + "mpd"
mpdtrace_cmd = MPICH2PATH + "mpdtrace -l"
if options.ifhn_master is not None:
call([mpd_cmd, "--daemon", "--ifhn=" + options.ifhn_master])
call([mpd_cmd, "--daemon"])
mpdtraceproc = Popen(mpdtrace_cmd, shell=True, stdout=PIPE)
mpdtraceout = mpdtraceproc.communicate()[0]
except OSError,e:
print >> sys.stderr, "Error starting mpd or mpdtrace"
print >> sys.stderr, e
(ip,port) = parseIpPort(mpdtraceout)
MPD_PID = mpdtraceout.split(" ")[0]
print "MPD_PID is %s" % MPD_PID
scheduler = MPIScheduler(options, ip, port)
framework = mesos_pb2.FrameworkInfo()
framework.user = ""
if is not None: =
else: = "MPI: %s" % MPI_PROGRAM[0]
driver = mesos.native.MesosSchedulerDriver(
sys.exit(0 if == mesos_pb2.DRIVER_STOPPED else 1)