blob: 58ea8a355c5f380766a6a34e30dfdfe4a21726e6 [file] [log] [blame]
import getpass
import os
import shutil
import tempfile
import unittest
from mysos.common.testing import Fake
from mysos.scheduler.scheduler import (
DEFAULT_TASK_CPUS,
DEFAULT_TASK_DISK,
DEFAULT_TASK_MEM,
INCOMPATIBLE_ROLE_OFFER_REFUSE_DURATION,
MysosScheduler)
from mysos.scheduler.launcher import create_resources
from mysos.scheduler.password import gen_encryption_key, PasswordBox
from mysos.scheduler.state import LocalStateProvider, MySQLCluster, Scheduler
from kazoo.handlers.threading import SequentialThreadingHandler
import mesos.interface.mesos_pb2 as mesos_pb2
from twitter.common import log
from twitter.common.metrics import RootMetrics
from twitter.common.quantity import Amount, Data, Time
from zake.fake_client import FakeClient
from zake.fake_storage import FakeStorage
import pytest
if 'MYSOS_DEBUG' in os.environ:
from twitter.common.log.options import LogOptions
LogOptions.set_stderr_log_level('google:DEBUG')
LogOptions.set_simple(True)
log.init('mysos_tests')
class FakeDriver(Fake): pass
class TestScheduler(unittest.TestCase):
def setUp(self):
self._driver = FakeDriver()
self._storage = FakeStorage(SequentialThreadingHandler())
self._zk_client = FakeClient(storage=self._storage)
self._zk_client.start()
self._framework_id = mesos_pb2.FrameworkID()
self._framework_id.value = "framework_id_0"
self._offer = mesos_pb2.Offer()
self._offer.id.value = "offer_id_0"
self._offer.framework_id.value = self._framework_id.value
self._offer.slave_id.value = "slave_id_0"
self._offer.hostname = "localhost"
resources = create_resources(
cpus=DEFAULT_TASK_CPUS * 3,
mem=DEFAULT_TASK_MEM * 3,
disk=DEFAULT_TASK_DISK * 3,
ports=set([10000, 10001, 10002]))
self._offer.resources.extend(resources)
self._framework_user = "framework_user"
self._zk_url = "zk://host/mysos/test"
self._cluster = MySQLCluster(
"cluster0", "user", "pass", 3, DEFAULT_TASK_CPUS, DEFAULT_TASK_MEM, DEFAULT_TASK_DISK)
self._tmpdir = tempfile.mkdtemp()
self._state_provider = LocalStateProvider(self._tmpdir)
framework_info = mesos_pb2.FrameworkInfo(
user=getpass.getuser(),
name="mysos",
checkpoint=False)
self._state = Scheduler(framework_info)
def tearDown(self):
shutil.rmtree(self._tmpdir, True) # Clean up after ourselves.
def test_scheduler_recovery(self):
scheduler_key = gen_encryption_key()
scheduler1 = MysosScheduler(
self._state,
self._state_provider,
self._framework_user,
"./executor.pex",
"cmd.sh",
self._zk_client,
self._zk_url,
Amount(5, Time.SECONDS),
"/etc/mysos/admin_keyfile.yml",
scheduler_key)
scheduler1.registered(self._driver, self._framework_id, object())
scheduler1.create_cluster("cluster1", "mysql_user", 3)
scheduler1.resourceOffers(self._driver, [self._offer])
# One task is launched for one offer.
assert len(scheduler1._launchers["cluster1"]._cluster.tasks) == 1
with pytest.raises(MysosScheduler.ClusterExists):
scheduler1.create_cluster("cluster1", "mysql_user", 3)
# FrameworkID should have been persisted.
self._state = self._state_provider.load_scheduler_state()
assert self._state.framework_info.id.value == self._framework_id.value
# Simulate restart.
scheduler2 = MysosScheduler(
self._state,
self._state_provider,
self._framework_user,
"./executor.pex",
"cmd.sh",
self._zk_client,
self._zk_url,
Amount(5, Time.SECONDS),
"/etc/mysos/admin_keyfile.yml",
scheduler_key)
# Scheduler always receives registered() with the same FrameworkID after failover.
scheduler2.registered(self._driver, self._framework_id, object())
assert len(scheduler2._launchers) == 1
assert scheduler2._launchers["cluster1"].cluster_name == "cluster1"
# Scheduler has recovered the cluster so it doesn't accept another of the same name.
with pytest.raises(MysosScheduler.ClusterExists):
scheduler2.create_cluster("cluster1", "mysql_user", 3)
def test_scheduler_recovery_failure_before_launch(self):
scheduler_key = gen_encryption_key()
scheduler1 = MysosScheduler(
self._state,
self._state_provider,
self._framework_user,
"./executor.pex",
"cmd.sh",
self._zk_client,
self._zk_url,
Amount(5, Time.SECONDS),
"/etc/mysos/admin_keyfile.yml",
scheduler_key)
scheduler1.registered(self._driver, self._framework_id, object())
_, password = scheduler1.create_cluster("cluster1", "mysql_user", 3)
# Simulate restart before the task is successfully launched.
scheduler2 = MysosScheduler(
self._state,
self._state_provider,
self._framework_user,
"./executor.pex",
"cmd.sh",
self._zk_client,
self._zk_url,
Amount(5, Time.SECONDS),
"/etc/mysos/admin_keyfile.yml",
scheduler_key)
assert len(scheduler2._launchers) == 0 # No launchers are recovered.
# Scheduler always receives registered() with the same FrameworkID after failover.
scheduler2.registered(self._driver, self._framework_id, object())
assert len(scheduler2._launchers) == 1
assert scheduler2._launchers["cluster1"].cluster_name == "cluster1"
password_box = PasswordBox(scheduler_key)
assert password_box.match(
password, scheduler2._launchers["cluster1"]._cluster.encrypted_password)
# Now offer the resources for this task.
scheduler2.resourceOffers(self._driver, [self._offer])
# One task is launched for the offer.
assert len(scheduler2._launchers["cluster1"]._cluster.active_tasks) == 1
# Scheduler has recovered the cluster so it doesn't accept another of the same name.
with pytest.raises(MysosScheduler.ClusterExists):
scheduler2.create_cluster("cluster1", "mysql_user", 3)
def test_incompatible_resource_role(self):
scheduler1 = MysosScheduler(
self._state,
self._state_provider,
self._framework_user,
"./executor.pex",
"cmd.sh",
self._zk_client,
self._zk_url,
Amount(5, Time.SECONDS),
"/etc/mysos/admin_keyfile.yml",
gen_encryption_key(),
framework_role='mysos') # Require 'mysos' but the resources are in '*'.
scheduler1.registered(self._driver, self._framework_id, object())
scheduler1.create_cluster("cluster1", "mysql_user", 3)
scheduler1.resourceOffers(self._driver, [self._offer])
assert "declineOffer" in self._driver.method_calls
assert len(self._driver.method_calls["declineOffer"]) == 1
# [0][0][1]: [First declineOffer call][The positional args][The first positional arg], which is
# a 'Filters' object.
assert (self._driver.method_calls["declineOffer"][0][0][1].refuse_seconds ==
INCOMPATIBLE_ROLE_OFFER_REFUSE_DURATION.as_(Time.SECONDS))
def test_scheduler_metrics(self):
scheduler_key = gen_encryption_key()
scheduler = MysosScheduler(
self._state,
self._state_provider,
self._framework_user,
"./executor.pex",
"cmd.sh",
self._zk_client,
self._zk_url,
Amount(5, Time.SECONDS),
"/etc/mysos/admin_keyfile.yml",
scheduler_key)
RootMetrics().register_observable('scheduler', scheduler)
scheduler.registered(self._driver, self._framework_id, object())
_, password = scheduler.create_cluster("cluster1", "mysql_user", 3)
sample = RootMetrics().sample()
assert sample['scheduler.cluster_count'] == 1
assert sample['scheduler.total_requested_mem_mb'] == DEFAULT_TASK_MEM.as_(Data.MB) * 3
assert sample['scheduler.total_requested_disk_mb'] == DEFAULT_TASK_DISK.as_(Data.MB) * 3
assert sample['scheduler.total_requested_cpus'] == DEFAULT_TASK_CPUS * 3
scheduler.delete_cluster("cluster1", password)
sample = RootMetrics().sample()
assert sample['scheduler.cluster_count'] == 0
assert sample['scheduler.total_requested_mem_mb'] == 0
assert sample['scheduler.total_requested_disk_mb'] == 0
assert sample['scheduler.total_requested_cpus'] == 0