#!/usr/bin/env python
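# A simple Mesos framework that runs an Apache web farm behind haproxy.
# The scheduler launches one Apache server per accepted resource offer and
# rewrites/reloads the haproxy configuration whenever the set of running
# servers changes; a monitor thread polls haproxy's statistics page and asks
# the scheduler to scale the farm up or down.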
import mesos
import os
import sys
import time
import httplib
import Queue
import threading
from optparse import OptionParser
from subprocess import *
from socket import gethostname
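# Scaling knobs: keep at least MIN_SERVERS Apache instances running; the
# monitor thread requests a scale-up when the haproxy stats column it watches
# reaches START_THRESHOLD, and considers killing an idle server once it falls
# to KILL_THRESHOLD (see monitor() below).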
MIN_SERVERS = 1
START_THRESHOLD = 25
KILL_THRESHOLD = 5
HAPROXY_EXE = "/root/haproxy-1.3.20/haproxy" #EC2
#HAPROXY_EXE = "/scratch/haproxy-1.3.25/haproxy" #Rcluster
class ApacheWebFWScheduler(mesos.Scheduler):
  def __init__(self):
    mesos.Scheduler.__init__(self)
    self.lock = threading.RLock()
    self.id = 0                # next task id to hand out
    self.haproxy = -1          # Popen handle of the running haproxy, or -1
    self.reconfigs = 0         # number of config files written so far
    self.servers = {}          # task id -> hostname of each Apache server
    self.overloaded = False    # set by the monitor thread to request growth
  def registered(self, driver, fid):
    print "Mesos haproxy+apache scheduler registered as framework #%s" % fid
    self.driver = driver
  def getFrameworkName(self, driver):
    return "haproxy+apache"
  def getExecutorInfo(self, driver):
    execPath = os.path.join(os.getcwd(), "startapache.sh")
    return mesos.ExecutorInfo(execPath, "")
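  # Write a fresh haproxy config file (the template followed by one "server"
  # line per Apache instance) and launch haproxy on it; when an old haproxy
  # is already running, "-sf <pid>" asks the new process to take over its
  # listeners and let the old one finish gracefully.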
  def reconfigure(self):
    print "reconfiguring haproxy"
    name = "/tmp/haproxy.conf.%d" % self.reconfigs
    with open(name, 'w') as config:
      with open('haproxy.config.template', 'r') as template:
        for line in template:
          config.write(line)
      for id, host in self.servers.iteritems():
        config.write(" ")
        config.write("server %d %s:80 check\n" % (id, host))
    cmd = []
    if self.haproxy != -1:
      cmd = [HAPROXY_EXE,
             "-f",
             name,
             "-sf",
             str(self.haproxy.pid)]
    else:
      cmd = [HAPROXY_EXE,
             "-f",
             name]
    self.haproxy = Popen(cmd, shell = False)
    self.reconfigs += 1
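  # Offer handling: launch at most one Apache server per host, require 1 CPU
  # and 1024 MB per task, and only grow the farm when the monitor thread has
  # marked it as overloaded (or when no server has been launched yet).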
  def resourceOffer(self, driver, oid, slave_offers):
    print "Got resource offer %s with %s slots." % (oid, len(slave_offers))
    self.lock.acquire()
    tasks = []
    for offer in slave_offers:
      if offer.host in self.servers.values():
        print "Rejecting slot on host " + offer.host + " because we've launched a server on that machine already."
        #print "self.servers currently looks like: " + str(self.servers)
      elif not self.overloaded and len(self.servers) > 0:
        print "Rejecting slot because we've launched enough tasks."
      elif int(offer.params['mem']) < 1024:
        print "Rejecting offer because it doesn't contain enough memory (it has " + offer.params['mem'] + " MB and we need 1024 MB)."
      elif int(offer.params['cpus']) < 1:
        print "Rejecting offer because it doesn't contain enough CPUs."
      else:
        print "Offer is for " + offer.params['cpus'] + " CPUs and " + offer.params["mem"] + " MB on host " + offer.host
        params = {"cpus": "1", "mem": "1024"}
        td = mesos.TaskDescription(self.id, offer.slaveId, "server %s" % self.id, params, "")
        print "Accepting task, id=" + str(self.id) + ", params: " + params['cpus'] + " CPUs and " + params['mem'] + " MB, on node " + offer.host
        tasks.append(td)
        self.servers[self.id] = offer.host
        self.id += 1
        self.overloaded = False
    driver.replyToOffer(oid, tasks, {"timeout": "1"})
    #driver.replyToOffer(oid, tasks, {})
    print "done with resourceOffer()"
    self.lock.release()
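  # Status updates: a task that reports RUNNING is added to the haproxy
  # rotation; a task that finishes, fails, is killed or is lost is dropped
  # from the server map and haproxy is reconfigured without it.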
  def statusUpdate(self, driver, status):
    print "received status update from taskID " + str(status.taskId) + ", with state: " + str(status.state)
    reconfigured = False
    self.lock.acquire()
    if status.taskId in self.servers.keys():
      if status.state == mesos.TASK_STARTING:
        # Nothing to reconfigure yet; the server joins the haproxy rotation
        # once it reports RUNNING.
        print "Task " + str(status.taskId) + " reported that it is STARTING."
      elif status.state == mesos.TASK_RUNNING:
        print "Task " + str(status.taskId) + " reported that it is RUNNING, reconfiguring haproxy to include it in webfarm now."
        self.reconfigure()
        reconfigured = True
      elif status.state == mesos.TASK_FINISHED:
        del self.servers[status.taskId]
        print "Task " + str(status.taskId) + " reported FINISHED (state " + str(status.state) + ")."
        self.reconfigure()
        reconfigured = True
      elif status.state == mesos.TASK_FAILED:
        print "Task " + str(status.taskId) + " reported that it FAILED!"
        del self.servers[status.taskId]
        self.reconfigure()
        reconfigured = True
      elif status.state == mesos.TASK_KILLED:
        print "Task " + str(status.taskId) + " reported that it was KILLED!"
        del self.servers[status.taskId]
        self.reconfigure()
        reconfigured = True
      elif status.state == mesos.TASK_LOST:
        print "Task " + str(status.taskId) + " reported that it was LOST!"
        del self.servers[status.taskId]
        self.reconfigure()
        reconfigured = True
    self.lock.release()
    if reconfigured:
      pass  #driver.reviveOffers()
    print "done in statusUpdate"
  def scaleUp(self):
    print "SCALING UP"
    self.lock.acquire()
    self.overloaded = True
    self.lock.release()
  def scaleDown(self, id):
    print "SCALING DOWN (removing server %d)" % id
    kill = False
    self.lock.acquire()
    if self.overloaded:
      self.overloaded = False
    else:
      kill = True
    self.lock.release()
    if kill:
      self.driver.killTask(id)
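# Monitor thread: polls haproxy's CSV statistics page once a second (the
# stats URL is hard-coded below) and asks the scheduler to scale up when the
# watched load column reaches START_THRESHOLD, or to kill an idle server when
# load falls to KILL_THRESHOLD and more than MIN_SERVERS are running.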
def monitor(sched):
print "in MONITOR()"
while True:
time.sleep(1)
print "done sleeping"
try:
conn = httplib.HTTPConnection("r3.millennium.berkeley.edu:9001")
print "done creating connection"
conn.request("GET", "/stats;csv")
print "done with request()"
res = conn.getresponse()
print "testing response status"
if (res.status != 200):
print "response != 200"
continue
else:
print "got some stats"
data = res.read()
lines = data.split('\n')[2:-2]
data = data.split('\n')
data = data[1].split(',')
if int(data[33]) >= START_THRESHOLD:
sched.scaleUp()
elif int(data[4]) <= KILL_THRESHOLD:
minload, minid = (sys.maxint, 0)
for l in lines:
cols = l.split(',')
id = int(cols[1])
load = int(cols[4])
if load < minload:
minload = load
minid = id
if len(lines) > MIN_SERVERS and minload == 0:
sched.scaleDown(minid)
conn.close()
except Exception, e:
print "exception in monitor()"
continue
print "done in MONITOR()"
if __name__ == "__main__":
  parser = OptionParser(usage = "Usage: %prog mesos_master")
  (options, args) = parser.parse_args()
  if len(args) < 1:
    print >> sys.stderr, "At least one parameter required."
    print >> sys.stderr, "Use --help to show usage."
    exit(2)
  print "sched = ApacheWebFWScheduler()"
  sched = ApacheWebFWScheduler()
  print "Connecting to mesos master %s" % args[0]
  driver = mesos.MesosSchedulerDriver(sched, args[0])
  threading.Thread(target = monitor, args=[sched]).start()
  driver.run()
  print "Scheduler finished!"