Add cluster-count and requested-resource metrics to the scheduler and expose them at /vars.json.
diff --git a/mysos/scheduler/http.py b/mysos/scheduler/http.py
index c38ae1c..afe74a5 100644
--- a/mysos/scheduler/http.py
+++ b/mysos/scheduler/http.py
@@ -10,7 +10,7 @@
class MysosServer(HttpServer):
- def __init__(self, scheduler, asset_dir):
+ def __init__(self, scheduler, asset_dir, metric_sampler):
super(MysosServer, self).__init__()
self._scheduler = scheduler
self._asset_dir = asset_dir
@@ -20,6 +20,8 @@
self._clusters_template = Template(filename=os.path.join(self._template_dir, 'clusters.html'))
+ self._metric_sampler = metric_sampler
+
@route('/clusters/<clustername>', method=['POST'])
def create(self, clustername):
"""Create a db cluster."""
@@ -72,3 +74,7 @@
@route('/static/<filepath:path>', method=['GET'])
def serve_static(self, filepath):
return static_file(filepath, root=self._static_dir)
+
+ @route("/vars.json")
+ def serve_vars_json(self, var=None, value=None):
+ return self._metric_sampler.sample()
diff --git a/mysos/scheduler/launcher.py b/mysos/scheduler/launcher.py
index 6167546..da245a8 100644
--- a/mysos/scheduler/launcher.py
+++ b/mysos/scheduler/launcher.py
@@ -132,9 +132,15 @@
@property
def cluster_info(self):
with self._lock:
- ClusterInfo = namedtuple('ClusterInfo', ('name, user, num_nodes'))
+ ClusterInfo = namedtuple('ClusterInfo', [
+ 'name', 'user', 'num_nodes', 'total_cpus', 'total_mem_mb', 'total_disk_mb'])
return ClusterInfo(
- name=self._cluster.name, user=self._cluster.user, num_nodes=self._cluster.num_nodes)
+ name=self._cluster.name,
+ user=self._cluster.user,
+ num_nodes=self._cluster.num_nodes,
+ total_cpus=self._cluster.cpus * self._cluster.num_nodes,
+ total_mem_mb=self._cluster.mem.as_(Data.MB) * self._cluster.num_nodes,
+ total_disk_mb=self._cluster.disk.as_(Data.MB) * self._cluster.num_nodes)
def launch(self, offer):
"""
diff --git a/mysos/scheduler/mysos_scheduler.py b/mysos/scheduler/mysos_scheduler.py
index 7ff6a25..d1d6c1a 100644
--- a/mysos/scheduler/mysos_scheduler.py
+++ b/mysos/scheduler/mysos_scheduler.py
@@ -17,6 +17,7 @@
from twitter.common.exceptions import ExceptionalThread
from twitter.common.http import HttpServer
from twitter.common.log.options import LogOptions
+from twitter.common.metrics import MetricSampler, RootMetrics
from twitter.common.quantity import Time
from twitter.common.quantity.parse_simple import InvalidTime, parse_time
import yaml
@@ -275,6 +276,8 @@
executor_environ=options.executor_environ,
framework_role=options.framework_role)
+ RootMetrics().register_observable('scheduler', scheduler)
+
if fw_principal and fw_secret:
cred = Credential(principal=fw_principal, secret=fw_secret)
scheduler_driver = mesos.native.MesosSchedulerDriver(
@@ -290,8 +293,11 @@
scheduler_driver.start()
+ metric_sampler = MetricSampler(RootMetrics())
+ metric_sampler.start()
+
server = HttpServer()
- server.mount_routes(MysosServer(scheduler, web_assets_dir))
+ server.mount_routes(MysosServer(scheduler, web_assets_dir, metric_sampler))
et = ExceptionalThread(
target=server.run, args=('0.0.0.0', options.api_port, 'cherrypy'))
diff --git a/mysos/scheduler/scheduler.py b/mysos/scheduler/scheduler.py
index 8c1e36d..9add38a 100644
--- a/mysos/scheduler/scheduler.py
+++ b/mysos/scheduler/scheduler.py
@@ -17,6 +17,7 @@
import mesos.interface.mesos_pb2 as mesos_pb2
from twitter.common import log
from twitter.common.collections.orderedset import OrderedSet
+from twitter.common.metrics import AtomicGauge, MutatorGauge, Observable
from twitter.common.quantity import Amount, Data, Time
from twitter.common.quantity.parse_simple import InvalidData, parse_data
@@ -31,7 +32,7 @@
INCOMPATIBLE_ROLE_OFFER_REFUSE_DURATION = Amount(sys.maxint / 2, Time.NANOSECONDS)
-class MysosScheduler(mesos.interface.Scheduler):
+class MysosScheduler(mesos.interface.Scheduler, Observable):
class Error(Exception): pass
@@ -112,6 +113,17 @@
self.connected = threading.Event() # An event set when the scheduler is first connected to
# Mesos. The scheduler tolerates later disconnections.
+ self._cluster_count = self.metrics.register(AtomicGauge('cluster_count', 0))
+
+ # Total resources requested by the scheduler's clients. When a cluster is created its resources
+ # are added to the total; when it's deleted its resources are subtracted from the total.
+ # NOTE: These are 'requested' resources that are independent of resources offered by Mesos or
+ # allocated to or used by Mysos tasks running on the Mesos cluster.
+ self._total_requested_cpus = self.metrics.register(MutatorGauge('total_requested_cpus', 0.))
+ self._total_requested_mem_mb = self.metrics.register(MutatorGauge('total_requested_mem_mb', 0.))
+ self._total_requested_disk_mb = self.metrics.register(
+ MutatorGauge('total_requested_disk_mb', 0.))
+
# --- Public interface. ---
def create_cluster(self, cluster_name, cluster_user, num_nodes, size=None, backup_id=None):
"""
@@ -146,12 +158,20 @@
if not cluster_user:
raise self.InvalidUser('Invalid user name: %s' % cluster_user)
- if int(num_nodes) <= 0:
+ num_nodes = int(num_nodes)
+ if num_nodes <= 0:
raise ValueError("Invalid number of cluster nodes: %s" % num_nodes)
resources = parse_size(size)
log.info("Requested resources per instance for cluster %s: %s" % (resources, cluster_name))
+ self._total_requested_cpus.write(
+ self._total_requested_cpus.read() + resources['cpus'] * num_nodes)
+ self._total_requested_mem_mb.write(
+ self._total_requested_mem_mb.read() + resources['mem'].as_(Data.MB) * num_nodes)
+ self._total_requested_disk_mb.write(
+ self._total_requested_disk_mb.read() + resources['disk'].as_(Data.MB) * num_nodes)
+
self._state.clusters.add(cluster_name)
self._state_provider.dump_scheduler_state(self._state)
@@ -161,7 +181,7 @@
cluster_name,
cluster_user,
self._password_box.encrypt(password),
- int(num_nodes),
+ num_nodes,
cpus=resources['cpus'],
mem=resources['mem'],
disk=resources['disk'],
@@ -186,6 +206,8 @@
executor_environ=self._executor_environ,
framework_role=self._framework_role)
+ self._cluster_count.increment()
+
return get_cluster_path(self._discover_zk_url, cluster_name), password
def delete_cluster(self, cluster_name, password):
@@ -204,6 +226,14 @@
launcher.kill(password)
log.info("Attempted to kill cluster %s" % cluster_name)
+ self._cluster_count.decrement()
+ cluster_info = launcher.cluster_info
+ self._total_requested_cpus.write(self._total_requested_cpus.read() - cluster_info.total_cpus)
+ self._total_requested_mem_mb.write(
+ self._total_requested_mem_mb.read() - cluster_info.total_mem_mb)
+ self._total_requested_disk_mb.write(
+ self._total_requested_disk_mb.read() - cluster_info.total_disk_mb)
+
return get_cluster_path(self._discover_zk_url, cluster_name)
@property
@@ -289,6 +319,16 @@
self._executor_environ,
self._framework_role)
+ # Recover metrics from restored state.
+ self._cluster_count.increment()
+
+ cluster_info = self._launchers[cluster.name].cluster_info
+ self._total_requested_cpus.write(self._total_requested_cpus.read() + cluster_info.total_cpus)
+ self._total_requested_mem_mb.write(
+ self._total_requested_mem_mb.read() + cluster_info.total_mem_mb)
+ self._total_requested_disk_mb.write(
+ self._total_requested_disk_mb.read() + cluster_info.total_disk_mb)
+
log.info("Recovered %s clusters" % len(self._launchers))
@logged
@@ -409,7 +449,7 @@
try:
resources_ = json.loads(size)
resources = dict(
- cpus=resources_['cpus'],
+ cpus=float(resources_['cpus']),
mem=parse_data(resources_['mem']),
disk=parse_data(resources_['disk']))
except (TypeError, KeyError, ValueError, InvalidData):
diff --git a/setup.py b/setup.py
index ab01bf7..db81925 100644
--- a/setup.py
+++ b/setup.py
@@ -52,6 +52,7 @@
make_commons_requirement('http'),
make_commons_requirement('lang'),
make_commons_requirement('log'),
+ make_commons_requirement('metrics'),
make_commons_requirement('quantity'),
make_commons_requirement('zookeeper'),
],
diff --git a/tests/scheduler/test_http.py b/tests/scheduler/test_http.py
index 9c814a8..90cb678 100644
--- a/tests/scheduler/test_http.py
+++ b/tests/scheduler/test_http.py
@@ -9,6 +9,7 @@
import pytest
from webtest import AppError, TestApp
+from twitter.common.metrics import MetricSampler, RootMetrics
MYSOS_MODULE = 'mysos.scheduler'
@@ -44,7 +45,8 @@
def setUp(self):
self._scheduler = FakeScheduler()
- self._app = TestApp(MysosServer(self._scheduler, self.web_assets_dir).app)
+ self._app = TestApp(
+ MysosServer(self._scheduler, self.web_assets_dir, MetricSampler(RootMetrics())).app)
def test_create_cluster_successful(self):
response = ('test_cluster_url', 'passwordfortestcluster')
diff --git a/tests/scheduler/test_scheduler.py b/tests/scheduler/test_scheduler.py
index 0beb182..58ea8a3 100644
--- a/tests/scheduler/test_scheduler.py
+++ b/tests/scheduler/test_scheduler.py
@@ -18,7 +18,8 @@
from kazoo.handlers.threading import SequentialThreadingHandler
import mesos.interface.mesos_pb2 as mesos_pb2
from twitter.common import log
-from twitter.common.quantity import Amount, Time
+from twitter.common.metrics import RootMetrics
+from twitter.common.quantity import Amount, Data, Time
from zake.fake_client import FakeClient
from zake.fake_storage import FakeStorage
@@ -203,3 +204,37 @@
# a 'Filters' object.
assert (self._driver.method_calls["declineOffer"][0][0][1].refuse_seconds ==
INCOMPATIBLE_ROLE_OFFER_REFUSE_DURATION.as_(Time.SECONDS))
+
+ def test_scheduler_metrics(self):
+ scheduler_key = gen_encryption_key()
+
+ scheduler = MysosScheduler(
+ self._state,
+ self._state_provider,
+ self._framework_user,
+ "./executor.pex",
+ "cmd.sh",
+ self._zk_client,
+ self._zk_url,
+ Amount(5, Time.SECONDS),
+ "/etc/mysos/admin_keyfile.yml",
+ scheduler_key)
+
+ RootMetrics().register_observable('scheduler', scheduler)
+
+ scheduler.registered(self._driver, self._framework_id, object())
+ _, password = scheduler.create_cluster("cluster1", "mysql_user", 3)
+
+ sample = RootMetrics().sample()
+ assert sample['scheduler.cluster_count'] == 1
+ assert sample['scheduler.total_requested_mem_mb'] == DEFAULT_TASK_MEM.as_(Data.MB) * 3
+ assert sample['scheduler.total_requested_disk_mb'] == DEFAULT_TASK_DISK.as_(Data.MB) * 3
+ assert sample['scheduler.total_requested_cpus'] == DEFAULT_TASK_CPUS * 3
+
+ scheduler.delete_cluster("cluster1", password)
+
+ sample = RootMetrics().sample()
+ assert sample['scheduler.cluster_count'] == 0
+ assert sample['scheduler.total_requested_mem_mb'] == 0
+ assert sample['scheduler.total_requested_disk_mb'] == 0
+ assert sample['scheduler.total_requested_cpus'] == 0