#!/usr/bin/env python
import mesos
import os
import sys
import time
import httplib
import Queue
import threading
import re
import socket
import torquelib
import logging
import logging.handlers
from optparse import OptionParser
from subprocess import Popen, PIPE
from socket import gethostname
PBS_SERVER_FILE = "/var/spool/torque/server_name"
EVENT_LOG_FILE = "log_fw_utilization.txt"
LOG_FILE = "log.txt"

# Number of seconds torque waits before looping through the queue to try to
# match resources to jobs. The default is 10 minutes (i.e. 600), but we want
# it to be low so jobs will run as soon as the framework has acquired enough
# resources.
SCHEDULER_ITERATION = 2

SAFE_ALLOCATION = {"cpus": 48, "mem": 128}  # just set statically for now, 128MB
MIN_SLOT_SIZE = {"cpus": "1", "mem": 1024}  # 1GB
MIN_SLOTS_HELD = 0  # keep at least this many slots even if none are needed
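
# Two log outputs: EVENT_LOG_FILE records timestamped counts of the compute
# nodes this framework currently holds (see the eventlog.info calls below),
# while LOG_FILE collects debug output from the driver and monitor loggers.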
eventlog = logging.getLogger("event_logger")
eventlog.setLevel(logging.DEBUG)
fh = logging.FileHandler(EVENT_LOG_FILE, 'w')  # create handler
fh.setFormatter(logging.Formatter("%(asctime)s %(message)s"))
eventlog.addHandler(fh)

# Something special about this file makes logging not work normally.
# I think it might be swig? The StreamHandler prints at DEBUG level
# even though I setLevel to INFO, so it is left unattached below.
ch = logging.StreamHandler()
ch.setLevel(logging.INFO)

fh = logging.FileHandler(LOG_FILE, "w")
fh.setLevel(logging.DEBUG)
driverlog = logging.getLogger("driver_logger")
driverlog.setLevel(logging.DEBUG)
driverlog.addHandler(fh)
#driverlog.addHandler(ch)
monitorlog = logging.getLogger("monitor_logger")
monitorlog.setLevel(logging.DEBUG)
monitorlog.addHandler(fh)
#monitorlog.addHandler(ch)

class MyScheduler(mesos.Scheduler):
  def __init__(self, ip):
    mesos.Scheduler.__init__(self)
    self.lock = threading.RLock()
    self.id = 0
    self.ip = ip
    self.servers = {}
    self.overloaded = False
    self.numToRegister = MIN_SLOTS_HELD

  def getExecutorInfo(self, driver):
    execPath = os.path.join(os.getcwd(), "start_pbs_mom.sh")
    initArg = self.ip  # tell executor which node the pbs_server is running on
    driverlog.info("in getExecutorInfo, setting execPath = " + execPath + " and initArg = " + initArg)
    return mesos.ExecutorInfo(execPath, initArg)

  def registered(self, driver, fid):
    driverlog.info("Mesos torque framework registered with frameworkID %s" % fid)

  def resourceOffer(self, driver, oid, slave_offers):
    self.driver = driver
    driverlog.debug("Got slot offer %d" % oid)
    self.lock.acquire()
    driverlog.debug("resourceOffer() acquired lock")
    tasks = []
    for offer in slave_offers:
      # If we haven't registered this node yet, accept the slot and register it with pbs_server.
      # TODO: check to see if the slot is big enough.
      if self.numToRegister <= 0:
        driverlog.debug("Rejecting slot, no need for more slaves")
        continue
      if offer.host in self.servers.values():
        driverlog.debug("Rejecting slot, already registered node " + offer.host)
        continue
      if len(self.servers) >= SAFE_ALLOCATION["cpus"]:
        driverlog.debug("Rejecting slot, already at safe allocation (i.e. %d CPUs)" % SAFE_ALLOCATION["cpus"])
        continue
      driverlog.info("Need %d more nodes, so accepting slot, setting up params for it..." % self.numToRegister)
      params = {"cpus": "1", "mem": "1024"}
      td = mesos.TaskDescription(
          self.id, offer.slaveId, "task %d" % self.id, params, "")
      tasks.append(td)
      self.servers[self.id] = offer.host
      self.regComputeNode(offer.host)
      self.numToRegister -= 1
      self.id += 1
      driverlog.info("writing logfile")
      eventlog.info("%d %d" % (time.time(), len(self.servers)))
      driverlog.info("done writing logfile")
      driverlog.info("self.id now set to " + str(self.id))
    #print "---"
    driver.replyToOffer(oid, tasks, {"timeout": "1"})
    self.lock.release()
    driverlog.debug("resourceOffer() finished, released lock\n\n")

  def statusUpdate(self, driver, status):
    driverlog.info("got status update from TID %s, state is: %s, data is: %s"
                   % (status.taskId, status.state, status.data))

  def regComputeNode(self, new_node):
    driverlog.info("registering new compute node, " + new_node + ", with pbs_server")
    driverlog.info("checking to see if node is registered with server already")
    #nodes = Popen("qmgr -c 'list node'", shell=True, stdout=PIPE).stdout
    nodes = Popen("pbsnodes", shell=True, stdout=PIPE).stdout
    driverlog.info("output of pbsnodes command is: ")
    for line in nodes:
      driverlog.info(line)
      if line.find(new_node) != -1:
        driverlog.info("Warn: tried to register node that's already registered, skipping")
        return
    # add node to server
    driverlog.info("registering node with command: qmgr -c create node " + new_node)
    qmgr_add = Popen("qmgr -c \"create node " + new_node + "\"", shell=True, stdout=PIPE).stdout
    driverlog.info("output of qmgr:")
    for line in qmgr_add:
      driverlog.info(line)

  def unregComputeNode(self, node_name):
    # remove node from the pbs_server
    monitorlog.info("removing node from pbs_server: qmgr -c delete node " + node_name)
    qmgr_del = Popen('qmgr -c "delete node ' + node_name + '"', shell=True, stdout=PIPE).stdout
    for line in qmgr_del:
      monitorlog.info(line)

  # Unregister up to numNodes idle compute nodes, keeping at least MIN_SLOTS_HELD.
  def unregNNodes(self, numNodes):
    monitorlog.debug("unregNNodes called with arg %d" % numNodes)
    if numNodes > len(self.servers) - MIN_SLOTS_HELD:
      monitorlog.debug("... however, only unregistering %d nodes, keeping %d alive"
                       % (len(self.servers) - MIN_SLOTS_HELD, MIN_SLOTS_HELD))
    toKill = min(numNodes, len(self.servers) - MIN_SLOTS_HELD)
    monitorlog.debug("getting and filtering list of nodes using torquelib")
    noJobs = lambda x: x.state != "job-exclusive"
    inactiveNodes = map(lambda x: x.name, filter(noJobs, torquelib.getNodes()))
    monitorlog.debug("victim pool of inactive nodes:")
    for inode in inactiveNodes:
      monitorlog.debug(inode)
    for tid, hostname in self.servers.items():
      if len(self.servers) > MIN_SLOTS_HELD and toKill > 0 and hostname in inactiveNodes:
        monitorlog.info("We still have to kill %d of the %d compute nodes which master is tracking" % (toKill, len(self.servers)))
        monitorlog.info("unregistering node " + str(hostname))
        self.unregComputeNode(hostname)
        self.servers.pop(tid)
        eventlog.info("%d %d" % (time.time(), len(self.servers)))
        toKill = toKill - 1
        monitorlog.info("killing corresponding task with tid %d" % tid)
        self.driver.killTask(tid)
    if toKill > 0:
      monitorlog.warn("Done killing. We were supposed to kill %d nodes, but only found and killed %d free nodes" % (numNodes, numNodes - toKill))

  def getFrameworkName(self, driver):
    return "Mesos Torque Framework"
def monitor(sched):
  while True:
    time.sleep(1)
    monitorlog.debug("monitor thread acquiring lock")
    sched.lock.acquire()
    monitorlog.debug("computing num nodes needed to satisfy eligible jobs in queue")
    needed = 0
    jobs = torquelib.getActiveJobs()
    monitorlog.debug("retrieved jobs in queue, count: %d" % len(jobs))
    for j in jobs:
      # WARNING: this check should only be used if torque is using a FIFO queue
      #if needed + j.needsnodes <= SAFE_ALLOCATION:
      monitorlog.debug("job resource list is: " + str(j.resourceList))
      needed += int(j.resourceList["nodect"])
    monitorlog.debug("number of nodes needed by jobs in queue: %d" % needed)
    numToRelease = len(sched.servers) - needed
    monitorlog.debug("number of nodes to release is %d - %d" % (len(sched.servers), needed))
    if numToRelease > 0:
      sched.unregNNodes(numToRelease)
      sched.numToRegister = 0
    else:
      monitorlog.debug("monitor updating sched.numToRegister from %d to %d" % (sched.numToRegister, -numToRelease))
      sched.numToRegister = -numToRelease
    sched.lock.release()
    monitorlog.debug("monitor thread releasing lock")
    monitorlog.debug("\n")

if __name__ == "__main__":
  parser = OptionParser(usage="Usage: %prog mesos_master")
  (options, args) = parser.parse_args()
  if len(args) < 1:
    print >> sys.stderr, "At least one parameter required."
    print >> sys.stderr, "Use --help to show usage."
    sys.exit(2)

  fqdn = socket.getfqdn()
  ip = socket.gethostbyname(gethostname())

  #monitorlog.info("running killall pbs_server")
  #Popen("killall pbs_server", shell=True)
  #time.sleep(1)
  #monitorlog.info("writing $(TORQUECFG)/server_name file with fqdn of pbs_server: " + fqdn)
  #Popen("touch %s" % PBS_SERVER_FILE, shell=True)
  #FILE = open(PBS_SERVER_FILE,'w')
  #FILE.write(fqdn)
  #FILE.close()

  monitorlog.info("starting pbs_server")
  #Popen("/etc/init.d/pbs_server start", shell=True)
  Popen("pbs_server", shell=True)
  #time.sleep(2)
  # monitorlog.info("running command: qmgr -c \"set queue batch resources_available.nodes=%s\"" % SAFE_ALLOCATION["cpus"])
  # Popen("qmgr -c \"set queue batch resources_available.nodect=%s\"" % SAFE_ALLOCATION["cpus"], shell=True)
  # Popen("qmgr -c \"set server resources_available.nodect=%s\"" % SAFE_ALLOCATION["cpus"], shell=True)
  # #these lines might not be necessary since we hacked the torque fifo scheduler
  # Popen("qmgr -c \"set queue batch resources_max.nodect=%s\"" % SAFE_ALLOCATION["cpus"], shell=True)
  # Popen("qmgr -c \"set server resources_max.nodect=%s\"" % SAFE_ALLOCATION["cpus"], shell=True)
  # Popen("qmgr -c \"set server scheduler_iteration=%s\"" % SCHEDULER_ITERATION, shell=True)
  # outp = Popen("qmgr -c \"l queue batch\"", shell=True, stdout=PIPE).stdout
  # for l in outp:
  #   monitorlog.info(l)
  # monitorlog.info("RE-killing pbs_server for resources_available setting to take effect")
  # #Popen("/etc/init.d/pbs_server start", shell=True)
  # Popen("qterm", shell=True)
  # time.sleep(1)
  # monitorlog.info("RE-starting pbs_server for resources_available setting to take effect")
  #Popen("pbs_server", shell=True)
  # monitorlog.debug("qmgr list queue settings: ")
  # output = Popen("qmgr -c 'l q batch'", shell=True, stdout=PIPE).stdout
  # for line in output:
  #   monitorlog.debug(line)
  # monitorlog.info("running killall pbs_sched")
  # Popen("killall pbs_sched", shell=True)
  # #time.sleep(2)

  monitorlog.info("starting pbs_scheduler")
  #Popen("/etc/init.d/pbs_sched start", shell=True)
  Popen("pbs_sched", shell=True)

  #ip = Popen("hostname -i", shell=True, stdout=PIPE).stdout.readline().rstrip() #linux
  #ip = Popen("ifconfig en1 | awk '/inet / { print $2 }'", shell=True, stdout=PIPE).stdout.readline().rstrip() # os x
  monitorlog.info("Remembering IP address of scheduler (" + ip + "), and fqdn: " + fqdn)
  monitorlog.info("Connecting to mesos master %s" % args[0])

  sched = MyScheduler(fqdn)
  threading.Thread(target=monitor, args=[sched]).start()
  mesos.MesosSchedulerDriver(sched, args[0]).run()
  monitorlog.info("Finished!")