| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| import contextlib |
| import getpass |
| import os |
| import signal |
| import subprocess |
| import tempfile |
| import threading |
| import time |
| from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer |
| from collections import defaultdict |
| |
| import mock |
| from mesos.interface import mesos_pb2 |
| from thrift.TSerialization import serialize |
| from twitter.common import log |
| from twitter.common.contextutil import temporary_dir |
| from twitter.common.dirutil import safe_mkdtemp, safe_rmtree |
| from twitter.common.exceptions import ExceptionalThread |
| from twitter.common.log.options import LogOptions |
| from twitter.common.quantity import Amount, Time |
| |
| from apache.aurora.config.schema.base import ( |
| MB, |
| HealthCheckConfig, |
| MesosJob, |
| MesosTaskInstance, |
| Process, |
| Resources, |
| Task |
| ) |
| from apache.aurora.executor.aurora_executor import AuroraExecutor, propagate_deadline |
| from apache.aurora.executor.common.executor_timeout import ExecutorTimeout |
| from apache.aurora.executor.common.health_checker import HealthCheckerProvider |
| from apache.aurora.executor.common.sandbox import DirectorySandbox, SandboxProvider |
| from apache.aurora.executor.common.status_checker import ChainedStatusChecker |
| from apache.aurora.executor.common.task_runner import TaskError |
| from apache.aurora.executor.status_manager import StatusManager |
| from apache.aurora.executor.thermos_task_runner import ( |
| DefaultThermosTaskRunnerProvider, |
| ThermosTaskRunner |
| ) |
| from apache.thermos.core.runner import TaskRunner |
| from apache.thermos.monitoring.monitor import TaskMonitor |
| |
| from gen.apache.aurora.api.constants import AURORA_EXECUTOR_NAME |
| from gen.apache.aurora.api.ttypes import AssignedTask, ExecutorConfig, JobKey, TaskConfig |
| |
# Opt-in debugging aid: when THERMOS_DEBUG is present in the environment (any
# value), raise the stderr log level to google:DEBUG, switch LogOptions to its
# "simple" mode and initialise the logger under the name 'executor_logger'.
if os.environ.get('THERMOS_DEBUG') is not None:
  LogOptions.set_stderr_log_level('google:DEBUG')
  LogOptions.set_simple(True)
  log.init('executor_logger')
| |
| |
class FastThermosExecutor(AuroraExecutor):
  """AuroraExecutor variant used by the tests with STOP_WAIT overridden to zero seconds."""
  # NOTE(review): presumably the delay AuroraExecutor observes before stopping the
  # driver; confirm against AuroraExecutor.STOP_WAIT.
  STOP_WAIT = Amount(0, Time.SECONDS)
| |
| |
class FastStatusManager(StatusManager):
  """StatusManager variant used by the tests with POLL_WAIT overridden to 10 milliseconds."""
  # NOTE(review): presumably the interval between status polls; confirm against
  # StatusManager.POLL_WAIT.
  POLL_WAIT = Amount(10, Time.MILLISECONDS)
| |
| |
class DefaultTestSandboxProvider(SandboxProvider):
  """Sandbox provider that gives every task a DirectorySandbox in a fresh temp dir."""

  def from_assigned_task(self, assigned_task, **kwargs):
    """Return a DirectorySandbox rooted at a newly created temporary directory.

    The assigned task itself is not consulted; any extra keyword arguments are
    forwarded unchanged to the DirectorySandbox constructor.
    """
    sandbox_root = safe_mkdtemp()
    return DirectorySandbox(sandbox_root, **kwargs)
| |
| |
class FailingStartingTaskRunner(ThermosTaskRunner):
  """ThermosTaskRunner whose start() always fails with a TaskError."""

  def start(self):
    """Raise TaskError unconditionally instead of starting anything."""
    failure = TaskError('I am an idiot!')
    raise failure
| |
| |
class FailingSandbox(DirectorySandbox):
  """DirectorySandbox whose create() always raises a caller-chosen exception type."""

  def __init__(self, root, exception_type, **kwargs):
    """Remember the exception class to raise from create().

    Extra keyword arguments are accepted but are not forwarded to the
    DirectorySandbox initializer; only ``root`` is passed on.
    """
    self._exception_type = exception_type
    super(FailingSandbox, self).__init__(root)

  def create(self):
    """Raise an instance of the configured exception type; never creates anything."""
    error_class = self._exception_type
    raise error_class('Could not create directory!')
| |
| |
class FailingSandboxProvider(SandboxProvider):
  """Sandbox provider that hands out FailingSandbox instances."""

  def __init__(self, exception_type=DirectorySandbox.CreationError):
    """Store the exception class that the produced sandboxes raise from create()."""
    self._exception_type = exception_type

  def from_assigned_task(self, assigned_task, **kwargs):
    """Return a FailingSandbox rooted at a fresh temp dir; the task is not consulted."""
    sandbox_root = safe_mkdtemp()
    return FailingSandbox(sandbox_root, exception_type=self._exception_type, **kwargs)
| |
| |
class FileSystemImageTestSandboxProvider(SandboxProvider):
  """Sandbox provider whose sandboxes report themselves as filesystem images."""

  class FileSystemImageSandboxTest(DirectorySandbox):
    """DirectorySandbox with a no-op create() and is_filesystem_image fixed to True."""

    @property
    def is_filesystem_image(self):
      return True

    def create(self):
      # Intentionally does nothing: no directory is created by this sandbox.
      pass

  def from_assigned_task(self, assigned_task, **kwargs):
    """Return a FileSystemImageSandboxTest rooted at a fresh temp dir.

    Neither the assigned task nor the extra keyword arguments are used.
    """
    sandbox_root = safe_mkdtemp()
    return self.FileSystemImageSandboxTest(sandbox_root)
| |
| |
class SlowSandbox(DirectorySandbox):
  """DirectorySandbox whose create() blocks until the test releases it.

  Two events coordinate with the test: ``_init_start`` must be set by the test
  before create() proceeds, and ``_init_done`` is set by create() once the
  underlying sandbox has been created.
  """

  def __init__(self, *args, **kwargs):
    super(SlowSandbox, self).__init__(*args, **kwargs)
    self._mark_initialized(False)
    self._init_start = threading.Event()
    self._init_done = threading.Event()

  def _mark_initialized(self, flag):
    # Installs an instance-level is_initialized() callable returning the given flag.
    self.is_initialized = lambda: flag

  def create(self):
    """Wait for ``_init_start``, create the sandbox, then signal ``_init_done``."""
    self._init_start.wait()
    super(SlowSandbox, self).create()
    self._mark_initialized(True)
    self._init_done.set()
| |
| |
class SlowSandboxProvider(SandboxProvider):
  """Sandbox provider that hands out SlowSandbox instances."""

  def from_assigned_task(self, assigned_task, **kwargs):
    """Return a SlowSandbox rooted at a fresh temp dir, forwarding keyword arguments."""
    sandbox_root = safe_mkdtemp()
    return SlowSandbox(sandbox_root, **kwargs)
| |
| |
class ProxyDriver(object):
  """Recording stand-in for a driver object: remembers every call made on it.

  Any attribute that is not defined on the instance resolves to a callable that
  appends ``(args, kwargs)`` to ``method_calls[<attribute name>]``. ``stop`` is
  recorded the same way and additionally sets an event that ``wait_stopped``
  blocks on.
  """

  def __init__(self):
    # Maps method name -> list of (positional args tuple, keyword args dict).
    self.method_calls = defaultdict(list)
    self._stop_event = threading.Event()

  def __getattr__(self, attr):
    """Return a recorder for ``attr``; only invoked for attributes not found normally."""
    def enqueue_arguments(*args, **kw):
      self.method_calls[attr].append((args, kw))
    return enqueue_arguments

  def stop(self, *args, **kw):
    """Record the call under 'stop' and wake up anyone blocked in wait_stopped()."""
    self.method_calls['stop'].append((args, kw))
    self._stop_event.set()

  def wait_stopped(self, timeout=None):
    """Block until stop() has been called.

    :param timeout: optional maximum number of seconds to wait; ``None`` (the
                    default) waits indefinitely, as before.
    :return: whatever ``threading.Event.wait`` returns, i.e. True once stop()
             has been called and False if the timeout elapsed first.
    """
    return self._stop_event.wait(timeout)
| |
| |
def make_task(thermos_config, assigned_ports=None, **kw):
  """Build a mesos TaskInfo whose data payload is a serialized AssignedTask.

  The task id is the thermos task name with '-001' appended. The job key uses
  the current user as role, with environment 'env' and name 'name'.

  :param thermos_config: config object exposing ``task().name().get()`` and
                         ``json_dumps()``; its JSON becomes the executor config data.
  :param assigned_ports: mapping passed as ``AssignedTask.assignedPorts``;
                         defaults to a new empty dict.
  :param kw: extra keyword arguments forwarded to the AssignedTask constructor.
  :return: the populated ``mesos_pb2.TaskInfo``.
  """
  # Create the default per call: a ``{}`` default argument is evaluated once and
  # would be shared by every task built without explicit ports.
  if assigned_ports is None:
    assigned_ports = {}

  role = getpass.getuser()
  task_name = thermos_config.task().name().get()
  task_id = task_name + '-001'
  at = AssignedTask(
      taskId=task_id,
      task=TaskConfig(
          executorConfig=ExecutorConfig(
              name=AURORA_EXECUTOR_NAME,
              data=thermos_config.json_dumps()),
          job=JobKey(role=role, environment='env', name='name')),
      assignedPorts=assigned_ports,
      **kw)
  td = mesos_pb2.TaskInfo()
  td.task_id.value = task_id
  td.name = task_name
  td.data = serialize(at)
  return td
| |
| |
# Shared fixtures. BASE_MTI and BASE_TASK are templates that the definitions
# below specialise by calling them with additional fields.
BASE_MTI = MesosTaskInstance(instance=0, role=getpass.getuser())
BASE_TASK = Task(resources=Resources(cpu=1.0, ram=16 * MB, disk=32 * MB))

# make_task() derives task ids as '<task name>-001', hence this id for HELLO_WORLD.
HELLO_WORLD_TASK_ID = 'hello_world-001'
# The process name embeds the {{thermos.task_id}} template; the tests expect it
# to show up as 'hello_world_hello_world-001' in the runner state.
HELLO_WORLD = BASE_TASK(
    name='hello_world',
    processes=[Process(name='hello_world_{{thermos.task_id}}', cmdline='echo hello world')])
HELLO_WORLD_MTI = BASE_MTI(task=HELLO_WORLD)

# Tasks that run `sleep` for 60 and 2 seconds respectively.
SLEEP60 = BASE_TASK(processes=[Process(name='sleep60', cmdline='sleep 60')])
SLEEP2 = BASE_TASK(processes=[Process(name='sleep2', cmdline='sleep 2')])
SLEEP60_MTI = BASE_MTI(task=SLEEP60)

# Job-level template; tests attach a task (and optionally a health check config) to it.
MESOS_JOB = MesosJob(
  name='does_not_matter',
  instances=1,
  role=getpass.getuser(),
)
| |
| |
def thermos_runner_path(build=True):
  """Return the path of the thermos_runner pex, building it with pants on first use.

  The path is cached on the function object itself (``thermos_runner_path.value``)
  so the build runs at most once per process.

  :param build: when False, never build; return the cached path, or None if no
                build has happened yet.
  :return: path to ``thermos_runner.pex`` (or None, see ``build``).
  :raises AssertionError: if the pants build exits with a non-zero status.
  """
  if not build:
    return getattr(thermos_runner_path, 'value', None)

  if not hasattr(thermos_runner_path, 'value'):
    pex_dir = safe_mkdtemp()
    # Run the build as a plain statement rather than inside ``assert``: assert
    # statements are stripped under ``python -O``, which would silently skip the
    # build and cache a path to a pex that was never produced.
    returncode = subprocess.call(["./pants", "--pants-distdir=%s" % pex_dir, "binary",
        "src/main/python/apache/thermos/runner:thermos_runner"])
    if returncode != 0:
      raise AssertionError('pants failed to build thermos_runner (exit status %d)' % returncode)
    thermos_runner_path.value = os.path.join(pex_dir, 'thermos_runner.pex')
  return thermos_runner_path.value
| |
| |
def make_provider(checkpoint_root, runner_class=ThermosTaskRunner, mesos_containerizer_path=None):
  """Create a DefaultThermosTaskRunnerProvider wired to the test-built runner pex.

  :param checkpoint_root: passed through as the provider's checkpoint root.
  :param runner_class: task runner class the provider should use.
  :param mesos_containerizer_path: passed through unchanged (None by default).
  :return: the configured DefaultThermosTaskRunnerProvider.
  """
  # thermos_runner_path() is called with its default, so it builds the pex on first use.
  provider_options = dict(
      pex_location=thermos_runner_path(),
      checkpoint_root=checkpoint_root,
      task_runner_class=runner_class,
      mesos_containerizer_path=mesos_containerizer_path)
  return DefaultThermosTaskRunnerProvider(**provider_options)
| |
| |
def make_executor(
    proxy_driver,
    checkpoint_root,
    task,
    ports=None,
    fast_status=False,
    runner_class=ThermosTaskRunner,
    status_providers=None,
    assert_task_is_running=True,
    stop_timeout_in_secs=120):
  """Launch ``task`` on a FastThermosExecutor and wait for its first two status updates.

  :param proxy_driver: ProxyDriver that records the executor's driver calls.
  :param checkpoint_root: checkpoint directory handed to the runner provider.
  :param task: thermos config passed to make_task().
  :param ports: assigned ports for the task; defaults to a new empty dict.
  :param fast_status: use FastStatusManager instead of StatusManager.
  :param runner_class: task runner class for the provider.
  :param status_providers: status providers for the executor; defaults to a new
                           list holding one HealthCheckerProvider.
  :param assert_task_is_running: when True, require the second update to be
                                 TASK_RUNNING and wait for the TaskRunner to
                                 become available; when False the returned
                                 runner is None.
  :param stop_timeout_in_secs: forwarded to the executor.
  :return: ``(runner, executor)``.
  """
  # Build the defaults per call. Mutable default arguments are evaluated once at
  # definition time, so every call would otherwise share the same dict and the
  # same HealthCheckerProvider instance.
  if ports is None:
    ports = {}
  if status_providers is None:
    status_providers = [HealthCheckerProvider()]

  status_manager_class = FastStatusManager if fast_status else StatusManager
  runner_provider = make_provider(checkpoint_root, runner_class)
  te = FastThermosExecutor(
      runner_provider=runner_provider,
      status_manager_class=status_manager_class,
      sandbox_provider=DefaultTestSandboxProvider(),
      status_providers=status_providers,
      stop_timeout_in_secs=stop_timeout_in_secs
  )

  ExecutorTimeout(te.launched, proxy_driver, timeout=Amount(100, Time.MILLISECONDS)).start()
  task_description = make_task(task, assigned_ports=ports, instanceId=0)
  te.launchTask(proxy_driver, task_description)

  te.status_manager_started.wait()

  # Poll until the executor has reported two status updates.
  while len(proxy_driver.method_calls['sendStatusUpdate']) < 2:
    time.sleep(0.1)

  # make sure startup was kosher
  updates = proxy_driver.method_calls['sendStatusUpdate']
  assert len(updates) == 2
  # Each recorded call is (args, kwargs); the status object is the first positional argument.
  status_updates = [arg_tuple[0][0] for arg_tuple in updates]
  assert status_updates[0].state == mesos_pb2.TASK_STARTING

  runner = None
  if assert_task_is_running:
    assert status_updates[1].state == mesos_pb2.TASK_RUNNING
    # wait for the runner to bind to a task
    while True:
      runner = TaskRunner.get(task_description.task_id.value, checkpoint_root)
      if runner:
        break
      time.sleep(0.1)

  assert te.launched.is_set()
  return runner, te
| |
| |
class UnhealthyHandler(BaseHTTPRequestHandler):
  """HTTP handler that answers every GET with status 200 and the body 'not ok'."""

  def do_GET(self):
    body = 'not ok'
    self.send_response(200)
    self.end_headers()
    self.wfile.write(body)
| |
| |
class HealthyHandler(BaseHTTPRequestHandler):
  """HTTP handler that answers every GET with status 200 and the body 'ok'."""

  def do_GET(self):
    body = 'ok'
    self.send_response(200)
    self.end_headers()
    self.wfile.write(body)
| |
| |
class SignalServer(ExceptionalThread):
  """Daemon thread that serves HTTP requests with the given handler class.

  Used as a context manager: entering starts the thread and yields the port the
  server is bound to; exiting sets a stop flag that the serving loop checks
  before handling each request.
  """

  def __init__(self, handler):
    # Binding to port 0 leaves the port choice to the OS; __enter__ reports it.
    self._server = HTTPServer(('', 0), handler)
    super(SignalServer, self).__init__()
    self.daemon = True
    self._stop_event = threading.Event()

  def run(self):
    """Handle one request at a time until the stop flag is observed."""
    while True:
      if self._stop_event.is_set():
        break
      self._server.handle_request()

  def __enter__(self):
    self.start()
    bound_port = self._server.server_port
    return bound_port

  def __exit__(self, exc_type, exc_val, traceback):
    # Only flags the loop to finish; exceptions from the with-body are not suppressed.
    self._stop_event.set()
| |
| |
class TestThermosExecutor(object):
  """Tests that drive an executor against a recording ProxyDriver.

  Each test inspects ``proxy_driver.method_calls['sendStatusUpdate']``; every
  entry is ``(args, kwargs)`` and the status object is ``args[0]``.
  """

  PANTS_BUILT = False
  LOG_DIR = None

  @classmethod
  def setup_class(cls):
    """Route DEBUG-level disk logging for the class into a fresh temporary directory."""
    cls.LOG_DIR = tempfile.mkdtemp()
    LogOptions.set_log_dir(cls.LOG_DIR)
    LogOptions.set_disk_log_level('DEBUG')
    log.init('executor_logger')

  @classmethod
  def teardown_class(cls):
    """Delete the log dir and built runner, or keep and report them if THERMOS_DEBUG is set."""
    # build=False: only look up a runner some test already built; never build one here.
    thermos_path = thermos_runner_path(build=False)
    if 'THERMOS_DEBUG' not in os.environ:
      safe_rmtree(cls.LOG_DIR)
      if thermos_path:
        safe_rmtree(os.path.dirname(thermos_path))
    else:
      print('Saving executor logs in %s' % cls.LOG_DIR)
      if thermos_path:
        print('Saved thermos executor at %s' % thermos_path)

  def test_basic(self):
    """hello_world runs to completion: STARTING, RUNNING, FINISHED."""
    proxy_driver = ProxyDriver()

    with temporary_dir() as tempdir:
      te = AuroraExecutor(
          runner_provider=make_provider(tempdir),
          sandbox_provider=DefaultTestSandboxProvider(),
          status_providers=[HealthCheckerProvider()])
      te.launchTask(proxy_driver, make_task(HELLO_WORLD_MTI))
      te.terminated.wait()
      # Read the runner state while the checkpoint directory still exists.
      tm = TaskMonitor(tempdir, task_id=HELLO_WORLD_TASK_ID)
      runner_state = tm.get_state()

    assert 'hello_world_hello_world-001' in runner_state.processes, (
      'Could not find processes, got: %s' % ' '.join(runner_state.processes))

    updates = proxy_driver.method_calls['sendStatusUpdate']
    assert len(updates) == 3
    status_updates = [arg_tuple[0][0] for arg_tuple in updates]
    assert status_updates[0].state == mesos_pb2.TASK_STARTING
    assert status_updates[1].state == mesos_pb2.TASK_RUNNING
    assert status_updates[2].state == mesos_pb2.TASK_FINISHED

  def test_basic_as_job(self):
    """Same as test_basic, but the task is wrapped in a MesosJob config."""
    proxy_driver = ProxyDriver()

    with temporary_dir() as tempdir:
      te = AuroraExecutor(
          runner_provider=make_provider(tempdir),
          sandbox_provider=DefaultTestSandboxProvider(),
          status_providers=[HealthCheckerProvider()])
      te.launchTask(proxy_driver, make_task(MESOS_JOB(task=HELLO_WORLD), instanceId=0))
      te.runner_started.wait()
      # Poll until the executor has created its status manager.
      while te._status_manager is None:
        time.sleep(0.1)
      te.terminated.wait()
      tm = TaskMonitor(tempdir, task_id=HELLO_WORLD_TASK_ID)
      runner_state = tm.get_state()

    assert 'hello_world_hello_world-001' in runner_state.processes, (
      'Could not find processes, got: %s' % ' '.join(runner_state.processes))
    updates = proxy_driver.method_calls['sendStatusUpdate']
    assert len(updates) == 3
    status_updates = [arg_tuple[0][0] for arg_tuple in updates]
    assert status_updates[0].state == mesos_pb2.TASK_STARTING
    assert status_updates[1].state == mesos_pb2.TASK_RUNNING
    assert status_updates[2].state == mesos_pb2.TASK_FINISHED

  def test_runner_disappears(self):
    """SIGKILLing the runner subprocess ends the task with TASK_KILLED."""
    proxy_driver = ProxyDriver()

    with temporary_dir() as checkpoint_root:
      _, executor = make_executor(proxy_driver, checkpoint_root, SLEEP60_MTI, fast_status=True)
      # Wait until the runner subprocess exists and has a pid to signal.
      while executor._runner is None or executor._runner._popen is None or (
          executor._runner._popen.pid is None):
        time.sleep(0.1)
      log.info('test_thermos_executor killing runner.pid %s' % executor._runner._popen.pid)
      os.kill(executor._runner._popen.pid, signal.SIGKILL)
      executor.terminated.wait()

    updates = proxy_driver.method_calls['sendStatusUpdate']
    assert len(updates) == 3
    assert updates[-1][0][0].state == mesos_pb2.TASK_KILLED

  def test_task_killed(self):
    """Killing the task through the TaskRunner ends it with TASK_KILLED."""
    proxy_driver = ProxyDriver()

    with temporary_dir() as checkpoint_root:
      runner, executor = make_executor(proxy_driver, checkpoint_root, SLEEP60_MTI)
      runner.kill(force=True, preemption_wait=Amount(1, Time.SECONDS))
      executor.terminated.wait()

    updates = proxy_driver.method_calls['sendStatusUpdate']
    assert len(updates) == 3
    assert updates[-1][0][0].state == mesos_pb2.TASK_KILLED

  def test_killTask(self):  # noqa
    """killTask stops the runner via propagate_deadline with the configured stop timeout."""
    proxy_driver = ProxyDriver()

    class ProvidedThermosRunnerMatcher(object):
      """Matcher that ensures a bound method 'stop' from 'ProvidedThermosTaskRunner' is called."""

      def __eq__(self, other):
        return (type(other.im_self).__name__ == 'ProvidedThermosTaskRunner'
            and other.__name__ == 'stop')

    with contextlib.nested(
        temporary_dir(),
        mock.patch('apache.aurora.executor.aurora_executor.propagate_deadline',
                   wraps=propagate_deadline)) as (checkpoint_root, mock_propagate_deadline):

      _, executor = make_executor(
          proxy_driver,
          checkpoint_root,
          SLEEP60_MTI,
          stop_timeout_in_secs=123)
      # send two, expect at most one delivered
      executor.killTask(proxy_driver, mesos_pb2.TaskID(value='sleep60-001'))
      executor.killTask(proxy_driver, mesos_pb2.TaskID(value='sleep60-001'))
      executor.terminated.wait()

      updates = proxy_driver.method_calls['sendStatusUpdate']

      mock_propagate_deadline.assert_called_with(  # Ensure 'stop' is called with custom timeout.
        ProvidedThermosRunnerMatcher(),
        timeout=Amount(123, Time.SECONDS))
      assert len(updates) == 3
      assert updates[-1][0][0].state == mesos_pb2.TASK_KILLED

  def test_shutdown(self):
    """Executor shutdown ends a running task with TASK_KILLED."""
    proxy_driver = ProxyDriver()

    with temporary_dir() as checkpoint_root:
      _, executor = make_executor(proxy_driver, checkpoint_root, SLEEP60_MTI)
      executor.shutdown(proxy_driver)
      executor.terminated.wait()

    updates = proxy_driver.method_calls['sendStatusUpdate']
    assert len(updates) == 3
    assert updates[-1][0][0].state == mesos_pb2.TASK_KILLED

  def test_shutdown_order(self):
    """On shutdown the status checker is stopped before the task runner."""
    proxy_driver = ProxyDriver()

    with contextlib.nested(
        temporary_dir(),
        mock.patch.object(ChainedStatusChecker, 'stop'),
        mock.patch.object(ThermosTaskRunner, 'stop')) as (
            checkpoint_root,
            status_check_stop,
            runner_stop):

      # Attaching both mocks to one parent records their calls in a single
      # ordered list, which is what lets the relative order be asserted.
      parent = mock.MagicMock()
      parent.attach_mock(status_check_stop, 'status_check_stop')
      parent.attach_mock(runner_stop, 'runner_stop')

      _, executor = make_executor(proxy_driver,
                                  checkpoint_root,
                                  SLEEP60_MTI)
      executor.shutdown(proxy_driver)
      executor.terminated.wait()

      parent.assert_has_calls(
        [mock.call.status_check_stop(), mock.call.runner_stop()],
        any_order=False)

  def test_task_health_failed(self):
    """A health endpoint answering 'not ok' ends the task with TASK_FAILED after two updates."""
    proxy_driver = ProxyDriver()
    with SignalServer(UnhealthyHandler) as port:
      with temporary_dir() as checkpoint_root:
        health_check_config = HealthCheckConfig(initial_interval_secs=0.1, interval_secs=0.1)
        _, executor = make_executor(
            proxy_driver,
            checkpoint_root,
            MESOS_JOB(task=SLEEP60, health_check_config=health_check_config),
            ports={'health': port},
            fast_status=True,
            status_providers=(HealthCheckerProvider(),),
            assert_task_is_running=False)
        executor.terminated.wait()

    updates = proxy_driver.method_calls['sendStatusUpdate']
    assert len(updates) == 2
    assert updates[-1][0][0].state == mesos_pb2.TASK_FAILED

  def test_task_health_ok(self):
    """A health endpoint answering 'ok' lets the 2-second sleep task reach TASK_FINISHED."""
    proxy_driver = ProxyDriver()
    with SignalServer(HealthyHandler) as port:
      with temporary_dir() as checkpoint_root:
        health_check_config = HealthCheckConfig(initial_interval_secs=0.1, interval_secs=0.1)
        _, executor = make_executor(proxy_driver,
                                    checkpoint_root,
                                    MESOS_JOB(task=SLEEP2, health_check_config=health_check_config),
                                    ports={'health': port},
                                    fast_status=True,
                                    status_providers=(HealthCheckerProvider(),))
        executor.terminated.wait()

    updates = proxy_driver.method_calls['sendStatusUpdate']
    assert len(updates) == 3
    assert updates[-1][0][0].state == mesos_pb2.TASK_FINISHED

  def test_failing_runner_start(self):
    """A runner whose start() raises TaskError produces a final TASK_FAILED."""
    proxy_driver = ProxyDriver()

    with temporary_dir() as td:
      runner_provider = make_provider(td, FailingStartingTaskRunner)
      te = FastThermosExecutor(
          runner_provider=runner_provider,
          sandbox_provider=DefaultTestSandboxProvider())
      te.launchTask(proxy_driver, make_task(HELLO_WORLD_MTI))
      proxy_driver.wait_stopped()

    updates = proxy_driver.method_calls['sendStatusUpdate']
    assert updates[-1][0][0].state == mesos_pb2.TASK_FAILED

  def test_failing_runner_initialize(self):
    """A sandbox raising DirectorySandbox.CreationError produces a final TASK_FAILED."""
    proxy_driver = ProxyDriver()

    with temporary_dir() as td:
      te = FastThermosExecutor(
          runner_provider=make_provider(td),
          sandbox_provider=FailingSandboxProvider())
      te.launchTask(proxy_driver, make_task(HELLO_WORLD_MTI))
      proxy_driver.wait_stopped()

    updates = proxy_driver.method_calls['sendStatusUpdate']
    assert updates[-1][0][0].state == mesos_pb2.TASK_FAILED

  def test_failing_runner_initialize_unknown_exception(self):
    """A sandbox raising a plain Exception also produces a final TASK_FAILED."""
    proxy_driver = ProxyDriver()

    with temporary_dir() as td:
      te = FastThermosExecutor(
          runner_provider=make_provider(td),
          sandbox_provider=FailingSandboxProvider(exception_type=Exception))
      te.launchTask(proxy_driver, make_task(HELLO_WORLD_MTI))
      proxy_driver.wait_stopped()

    updates = proxy_driver.method_calls['sendStatusUpdate']
    assert updates[-1][0][0].state == mesos_pb2.TASK_FAILED

  def test_slow_runner_initialize(self):
    """A sandbox that never finishes creating within 1ms yields TASK_FAILED after two updates."""
    proxy_driver = ProxyDriver()

    task = make_task(HELLO_WORLD_MTI)

    with temporary_dir() as td:
      te = FastThermosExecutor(
          runner_provider=make_provider(td),
          sandbox_provider=SlowSandboxProvider())
      te.SANDBOX_INITIALIZATION_TIMEOUT = Amount(1, Time.MILLISECONDS)
      te.launchTask(proxy_driver, task)
      proxy_driver.wait_stopped()

      updates = proxy_driver.method_calls['sendStatusUpdate']
      assert len(updates) == 2
      assert updates[-1][0][0].state == mesos_pb2.TASK_FAILED

      # Release the SlowSandbox.create() call that is still blocked on this event.
      te._sandbox._init_start.set()

  def test_killTask_during_runner_initialize(self):  # noqa
    """killTask sent while the sandbox is still being created yields TASK_KILLED."""
    proxy_driver = ProxyDriver()

    task = make_task(HELLO_WORLD_MTI)

    with temporary_dir() as td:
      te = FastThermosExecutor(
          runner_provider=make_provider(td),
          sandbox_provider=SlowSandboxProvider())
      te.launchTask(proxy_driver, task)
      te.sandbox_initialized.wait()
      te.killTask(proxy_driver, mesos_pb2.TaskID(value=task.task_id.value))
      assert te.runner_aborted.is_set()
      assert not te.sandbox_created.is_set()

      # we've simulated a "slow" initialization by blocking it until the killTask was sent - so now,
      # trigger the initialization to complete
      te._sandbox._init_start.set()

      # however, wait on the runner to definitely finish its initialization before continuing
      # (otherwise, this function races ahead too fast)
      te._sandbox._init_done.wait()
      te.sandbox_created.wait()
      assert te.sandbox_initialized.is_set()
      assert te.sandbox_created.is_set()

      proxy_driver.wait_stopped()

    updates = proxy_driver.method_calls['sendStatusUpdate']
    assert len(updates) == 2
    assert updates[-1][0][0].state == mesos_pb2.TASK_KILLED

  def test_launchTask_deserialization_fail(self):  # noqa
    """Executor config data that cannot be parsed yields STARTING followed by FAILED."""
    proxy_driver = ProxyDriver()

    role = getpass.getuser()
    task_info = mesos_pb2.TaskInfo()
    task_info.name = task_info.task_id.value = 'broken'
    # 'garbage' stands in for the JSON config that make_task() would normally provide.
    task_info.data = serialize(
        AssignedTask(
            task=TaskConfig(
                job=JobKey(role=role, environment='env', name='name'),
                executorConfig=ExecutorConfig(name=AURORA_EXECUTOR_NAME, data='garbage'))))

    te = FastThermosExecutor(
        runner_provider=make_provider(safe_mkdtemp()),
        sandbox_provider=DefaultTestSandboxProvider())
    te.launchTask(proxy_driver, task_info)
    proxy_driver.wait_stopped()

    updates = proxy_driver.method_calls['sendStatusUpdate']
    assert len(updates) == 2
    assert updates[0][0][0].state == mesos_pb2.TASK_STARTING
    assert updates[1][0][0].state == mesos_pb2.TASK_FAILED

  def test_filesystem_image_assign_no_containerizer(self):
    """A filesystem-image sandbox with no containerizer path yields STARTING then FAILED."""
    proxy_driver = ProxyDriver()

    with temporary_dir() as tempdir:
      te = FastThermosExecutor(
          runner_provider=make_provider(tempdir, mesos_containerizer_path=None),
          sandbox_provider=FileSystemImageTestSandboxProvider())
      te.launchTask(proxy_driver, make_task(HELLO_WORLD_MTI))

      te.SANDBOX_INITIALIZATION_TIMEOUT = Amount(1, Time.MILLISECONDS)
      te.START_TIMEOUT = Amount(10, Time.MILLISECONDS)
      te.STOP_TIMEOUT = Amount(10, Time.MILLISECONDS)

      proxy_driver.wait_stopped()

    updates = proxy_driver.method_calls['sendStatusUpdate']
    assert len(updates) == 2
    assert updates[0][0][0].state == mesos_pb2.TASK_STARTING
    assert updates[1][0][0].state == mesos_pb2.TASK_FAILED

  def test_filesystem_image_assign_missing_containerizer(self):
    """A containerizer path that does not exist yields STARTING then FAILED."""
    proxy_driver = ProxyDriver()

    with temporary_dir() as tempdir:
      te = FastThermosExecutor(
          runner_provider=make_provider(tempdir, mesos_containerizer_path='/doesnotexist'),
          sandbox_provider=FileSystemImageTestSandboxProvider(), stop_timeout_in_secs=1)
      te.launchTask(proxy_driver, make_task(HELLO_WORLD_MTI))

      te.SANDBOX_INITIALIZATION_TIMEOUT = Amount(1, Time.MILLISECONDS)
      te.START_TIMEOUT = Amount(10, Time.MILLISECONDS)

      proxy_driver.wait_stopped()

    updates = proxy_driver.method_calls['sendStatusUpdate']
    assert len(updates) == 2
    assert updates[0][0][0].state == mesos_pb2.TASK_STARTING
    assert updates[1][0][0].state == mesos_pb2.TASK_FAILED

  def test_filesystem_image_containerizer_not_executable(self):
    """A containerizer path pointing at a plain, freshly created file yields STARTING then FAILED."""
    proxy_driver = ProxyDriver()

    with temporary_dir() as tempdir:

      # Named containerizer_path rather than `tempfile` so the imported tempfile
      # module (used by setup_class) is not shadowed inside this test.
      containerizer_path = os.path.join(tempdir, 'fake-containierizer')
      # Equivalent of `touch`: create the file if missing and update its timestamps.
      with open(containerizer_path, 'a'):
        os.utime(containerizer_path, None)

      te = FastThermosExecutor(
          runner_provider=make_provider(tempdir, mesos_containerizer_path=containerizer_path),
          sandbox_provider=FileSystemImageTestSandboxProvider(), stop_timeout_in_secs=1)

      te.SANDBOX_INITIALIZATION_TIMEOUT = Amount(1, Time.MILLISECONDS)
      te.START_TIMEOUT = Amount(10, Time.MILLISECONDS)

      te.launchTask(proxy_driver, make_task(HELLO_WORLD_MTI))

      proxy_driver.wait_stopped()

    updates = proxy_driver.method_calls['sendStatusUpdate']
    assert len(updates) == 2
    assert updates[0][0][0].state == mesos_pb2.TASK_STARTING
    assert updates[1][0][0].state == mesos_pb2.TASK_FAILED
| |
| |
def test_waiting_executor():
  """An executor that is never given a task must still end with the driver being stopped.

  No launchTask call is made; the test only returns once stop() has been called
  on the proxy driver, which is expected to come from the ExecutorTimeout
  started here with a 100ms timeout.
  """
  proxy_driver = ProxyDriver()
  launch_deadline = Amount(100, Time.MILLISECONDS)
  with temporary_dir() as checkpoint_root:
    executor = AuroraExecutor(
        runner_provider=make_provider(checkpoint_root),
        sandbox_provider=DefaultTestSandboxProvider())
    watchdog = ExecutorTimeout(executor.launched, proxy_driver, timeout=launch_deadline)
    watchdog.start()
    proxy_driver.wait_stopped()