| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| # |
| |
| import textwrap |
| import unittest |
| |
| from mock import Mock, create_autospec, patch |
| |
| from apache.aurora.client.cli.context import AuroraCommandContext |
| from apache.aurora.client.hooks.hooked_api import HookedAuroraClientAPI |
| from apache.aurora.common.aurora_job_key import AuroraJobKey |
| from apache.aurora.common.cluster import Cluster |
| from apache.aurora.common.clusters import CLUSTERS, Clusters |
| |
| from ...api_util import SchedulerProxyApiSpec, SchedulerThriftApiSpec |
| |
| from gen.apache.aurora.api.constants import ACTIVE_STATES |
| from gen.apache.aurora.api.ttypes import ( |
| AssignedTask, |
| ExecutorConfig, |
| GetTierConfigResult, |
| JobKey, |
| JobUpdateSummary, |
| Resource, |
| Response, |
| ResponseCode, |
| ResponseDetail, |
| Result, |
| ScheduledTask, |
| ScheduleStatus, |
| ScheduleStatusResult, |
| StartJobUpdateResult, |
| TaskConfig, |
| TaskEvent, |
| TaskQuery, |
| TierConfig |
| ) |
| |
| |
def mock_verb_options(verb):
    """Build a mock options object for ``verb`` with declared defaults applied.

    The returned ``Mock`` is created with ``spec_set`` limited to the attribute
    names the verb's options define, so reading or assigning any other
    attribute raises ``AttributeError``.

    ``verb`` must provide ``get_options()`` returning option objects that expose
    ``name`` (e.g. ``'--batch-size'``) and a ``kwargs`` dict. An option's
    attribute name is ``kwargs['dest']`` when present, otherwise its name with
    leading dashes removed and remaining dashes turned into underscores. Options
    whose ``kwargs`` contain ``'default'`` are pre-populated with that value.
    """

    def opt_name(opt):
        # str.lstrip() takes a *set* of characters, not a prefix, so a single
        # '-' is enough to drop every leading dash ('--batch-size' -> 'batch_size').
        return opt.name.lstrip('-').replace('-', '_')

    def name_or_dest(opt):
        """Prefers 'dest' if available otherwise defaults to name."""
        if 'dest' in opt.kwargs:
            return opt.kwargs['dest']
        return opt_name(opt)

    # Query the verb once so the spec and the defaults come from the same list.
    verb_options = list(verb.get_options())

    options = Mock(spec_set=[name_or_dest(opt) for opt in verb_options])

    # Apply default values to options.
    for opt in verb_options:
        if 'default' in opt.kwargs:
            setattr(options, name_or_dest(opt), opt.kwargs['default'])
    return options
| |
| |
class FakeAuroraCommandContext(AuroraCommandContext):
    """Command context for tests that records output and serves a mocked API.

    Messages passed to print_out/print_err are collected in the ``out`` and
    ``err`` lists instead of being written anywhere, and get_api always hands
    back the same mock API object regardless of the cluster requested.
    """

    def __init__(self):
        super(FakeAuroraCommandContext, self).__init__()
        self.status = []
        self.fake_api = self.create_mock_api()
        # Canned query responses, consumed in the order they were added.
        self.task_result = []
        # Captured stdout / stderr lines.
        self.out = []
        self.err = []
        # Optional pre-built job config; see add_config / get_job_config.
        self.config = None

    def get_api(self, cluster):
        """Return the shared mock API; the cluster argument is ignored."""
        return self.fake_api

    @classmethod
    def create_mock_api(cls):
        """Build a mock client API wired to a mock scheduler proxy.

        Returns the mock API; the proxy is reachable as its ``scheduler_proxy``.
        """
        proxy = create_autospec(spec=SchedulerProxyApiSpec, instance=True)
        proxy.url = "http://something_or_other"
        proxy.scheduler_client.return_value = proxy
        api = create_autospec(spec=HookedAuroraClientAPI)
        api.scheduler_proxy = proxy
        return api

    def print_out(self, msg, indent=0):
        """Record msg, prefixed with ``indent`` spaces, as an output line."""
        self.out.append("%s%s" % (" " * indent, msg))

    def print_err(self, msg, indent=0):
        """Record msg, prefixed with ``indent`` spaces, as an error line."""
        self.err.append("%s%s" % (" " * indent, msg))

    def get_job_config(self, jobkey, config_file):
        """Return the config set via add_config, else defer to the real loader."""
        if self.config:
            return self.config
        return super(FakeAuroraCommandContext, self).get_job_config(jobkey, config_file)

    def get_out(self):
        """Return the list of captured output lines."""
        return self.out

    def get_out_str(self):
        """Return the captured output lines joined with newlines."""
        return '\n'.join(self.out)

    def get_err(self):
        """Return the list of captured error lines."""
        return self.err

    def add_expected_status_query_result(self, expected_result):
        """Queue expected_result and serve the queue from check_status as well."""
        self.add_task_result(expected_result)
        self.fake_api.check_status.side_effect = self.task_result

    def add_expected_query_result(self, expected_result, job_key=None):
        """Queue expected_result and serve the queue from query_no_configs.

        When job_key is given, build_query is also stubbed to return a query
        for that job restricted to ACTIVE_STATES.
        """
        self.add_task_result(expected_result)
        self.fake_api.query_no_configs.side_effect = self.task_result
        if not job_key:
            return
        self.fake_api.build_query.return_value = TaskQuery(
            jobKeys=[job_key.to_thrift()], statuses=ACTIVE_STATES)

    def add_task_result(self, expected_result):
        """Append expected_result to the queue of canned query responses."""
        # each call adds an expected query result, in order.
        self.task_result.append(expected_result)
        self.fake_api.scheduler_proxy.getTasksWithoutConfigs.side_effect = self.task_result

    def add_config(self, config):
        """Set the job config that get_job_config should return."""
        self.config = config
| |
| |
class AuroraClientCommandTest(unittest.TestCase):
    """Base TestCase bundling fixtures and factories for client command tests.

    Provides canned job identifiers, config templates, tier definitions and
    classmethod factories for thrift responses, tasks and mock API objects.
    Each test runs with ``webbrowser.open_new_tab`` patched out and with the
    global CLUSTERS replaced by TEST_CLUSTERS.
    """

    # Starting value of the fake clock; see fake_time.
    FAKE_TIME = 42131

    # Identifiers of the canonical test job.
    TEST_ROLE = 'bozo'
    TEST_ENV = 'test'
    TEST_JOB = 'hello'
    TEST_CLUSTER = 'west'
    TEST_JOBSPEC = 'west/bozo/test/hello'
    TEST_JOBKEY = AuroraJobKey('west', 'bozo', 'test', 'hello')

    TEST_CLUSTERS = Clusters([Cluster(
        name=TEST_CLUSTER,
        zk='zookeeper.example.com',
        scheduler_zk_path='/foo/bar',
        auth_mechanism='UNAUTHENTICATED')])

    # Tier definitions returned by get_mock_tier_configurations.
    PREFERRED_TIER = TierConfig(
        name='preferred',
        settings={'preemptible': 'false', 'revocable': 'false'}
    )

    PREEMPTIBLE_TIER = TierConfig(
        name='preemptible',
        settings={'preemptible': 'true', 'revocable': 'false'}
    )

    REVOCABLE_TIER = TierConfig(
        name='revocable',
        settings={'preemptible': 'true', 'revocable': 'true'}
    )

    # Service job template; filled in by get_test_config via %-substitution.
    CONFIG_BASE = """
HELLO_WORLD = Job(
  name = '%(job)s',
  role = '%(role)s',
  cluster = '%(cluster)s',
  environment = '%(env)s',
  instances = 20,
  %(inner)s
  update_config = UpdateConfig(
    batch_size = 1,
    watch_secs = 45,
    max_per_shard_failures = 2,
  ),
  task = Task(
    name = 'test',
    processes = [Process(name = 'hello_world', cmdline = 'echo {{thermos.ports[http]}}')],
    resources = Resources(cpu = 0.1, ram = 64 * MB, disk = 64 * MB),
  )
)
jobs = [HELLO_WORLD]
"""

    # Cron job template; filled in by get_test_config via %-substitution.
    CRON_CONFIG_BASE = """
HELLO_WORLD = Job(
  name = '%(job)s',
  role = '%(role)s',
  cluster = '%(cluster)s',
  environment = '%(env)s',
  cron_schedule = '*/5 * * * *',
  %(inner)s
  task = SimpleTask('test', 'echo test')
)
jobs = [HELLO_WORLD]
"""

    # Template that leaves several {{...}} bindings unresolved; filled in by
    # get_unbound_test_config.
    UNBOUND_CONFIG = textwrap.dedent("""\
        HELLO_WORLD = Job(
          name = '%(job)s',
          role = '%(role)s',
          cluster = '{{cluster_binding}}',
          environment = '%(env)s',
          instances = '{{instances_binding}}',
          update_config = UpdateConfig(
            batch_size = "{{TEST_BATCH}}",
            watch_secs = 45,
            max_per_shard_failures = 2,
          ),
          task = Task(
            name = 'test',
            processes = [Process(
              name = 'hello_world',
              cmdline = 'echo {{thermos.ports[http]}} {{flags_binding}}'
            )],
            resources = Resources(cpu = 0.1, ram = 64 * MB, disk = 64 * MB),
          )
        )
        jobs = [HELLO_WORLD]
    """)

    def setUp(self):
        """Patch webbrowser.open_new_tab for the duration of the test."""
        browser_patch = patch('webbrowser.open_new_tab')
        self.mock_webbrowser = browser_patch.start()
        self.addCleanup(browser_patch.stop)

    def run(self, result=None):
        """Run the test with the global CLUSTERS patched to TEST_CLUSTERS."""
        # Since CLUSTERS is a global value that evaluates code on import this is the best way to
        # ensure it does not pollute any tests.
        test_clusters = self.TEST_CLUSTERS._clusters.values()
        with CLUSTERS.patch(test_clusters):
            super(AuroraClientCommandTest, self).run(result)

    @classmethod
    def create_blank_response(cls, code, msg):
        """Return a Response with the given code and a single detail message."""
        detail = ResponseDetail(message=msg)
        return Response(responseCode=code, details=[detail])

    @classmethod
    def create_simple_success_response(cls):
        """Return an OK Response whose detail message is 'OK'."""
        return cls.create_blank_response(ResponseCode.OK, 'OK')

    @classmethod
    def create_error_response(cls):
        """Return an ERROR Response whose detail message is 'Whoops'."""
        return cls.create_blank_response(ResponseCode.ERROR, 'Whoops')

    @classmethod
    def create_mock_api(cls):
        """Build a mock API object with a mock scheduler proxy attached.

        Returns a (mock_api, mock_scheduler_proxy) pair.
        """
        # NOTE(review): this thrift-level mock is created but never attached or
        # returned; kept as-is to preserve the original behavior.
        thrift_scheduler = create_autospec(spec=SchedulerThriftApiSpec, instance=True)
        thrift_scheduler.url = "http://something_or_other"
        scheduler_client = create_autospec(spec=SchedulerProxyApiSpec, instance=True)
        scheduler_client.url = "http://something_or_other"
        api = create_autospec(spec=HookedAuroraClientAPI, instance=True)
        api.scheduler_proxy = scheduler_client
        return api, scheduler_client

    @classmethod
    def create_mock_api_factory(cls):
        """Create mocks for a test that patches the client API factory.

        Returns a (factory, mock_scheduler_proxy) pair, where calling the
        zero-argument factory yields the mock API.
        """
        api, scheduler_client = cls.create_mock_api()
        return (lambda: api), scheduler_client

    @classmethod
    def create_query_call_result(cls, task=None):
        """Return a successful status response holding ``task``.

        When no task is given the response holds 20 generated tasks, one per
        instance id 0..19.
        """
        response = cls.create_empty_task_result()
        tasks = response.result.scheduleStatusResult.tasks
        if task is not None:
            tasks.append(task)
        else:
            tasks.extend(cls.create_scheduled_task(i) for i in range(20))
        return response

    @classmethod
    def create_start_job_update_result(cls, code, msg, key, metadata):
        """Return a response carrying a StartJobUpdateResult for key/metadata."""
        response = cls.create_blank_response(code, msg)
        summary = JobUpdateSummary(metadata=metadata)
        update_result = StartJobUpdateResult(key=key, updateSummary=summary)
        response.result = Result(startJobUpdateResult=update_result)
        return response

    @classmethod
    def create_empty_task_result(cls):
        """Return a successful status response with an empty task list."""
        response = cls.create_simple_success_response()
        empty_status = ScheduleStatusResult(tasks=[])
        response.result = Result(scheduleStatusResult=empty_status)
        return response

    @classmethod
    def create_scheduled_task(cls, instance_id, status=ScheduleStatus.RUNNING,
                              task_id=None, initial_time=None):
        """Return a ScheduledTask for instance_id with one task event.

        A falsy task_id falls back to "Task<instance_id>" and a falsy
        initial_time falls back to 1000.
        """
        assigned = AssignedTask(
            instanceId=instance_id,
            taskId=task_id or "Task%s" % instance_id,
            slaveId="Slave%s" % instance_id,
            slaveHost="Slave%s" % instance_id,
            task=TaskConfig())
        events = [TaskEvent(timestamp=initial_time or 1000)]
        return ScheduledTask(status=status, assignedTask=assigned, taskEvents=events)

    @classmethod
    def create_task_config(cls, name):
        """Return a TaskConfig for job ``name`` under TEST_ROLE/TEST_ENV."""
        return TaskConfig(
            maxTaskFailures=1,
            executorConfig=ExecutorConfig(data='{"fake": "data"}'),
            metadata=[],
            job=JobKey(role=cls.TEST_ROLE, environment=cls.TEST_ENV, name=name),
            resources=frozenset([
                Resource(numCpus=2),
                Resource(ramMb=2),
                Resource(diskMb=2)]))

    @classmethod
    def create_scheduled_tasks(cls):
        """Return three RUNNING ScheduledTasks for jobs 'foo', 'bar' and 'baz'."""
        return [
            ScheduledTask(
                failureCount=0,
                assignedTask=AssignedTask(
                    taskId=1287391823,
                    slaveHost='slavehost',
                    task=cls.create_task_config(name),
                    instanceId=4237894,
                    assignedPorts={}),
                status=ScheduleStatus.RUNNING,
                taskEvents=[TaskEvent(
                    timestamp=28234726395,
                    status=ScheduleStatus.RUNNING,
                    message="Hi there")])
            for name in ['foo', 'bar', 'baz']
        ]

    @classmethod
    def setup_get_tasks_status_calls(cls, scheduler):
        """Stub scheduler.getTasksWithoutConfigs with the default query result."""
        scheduler.getTasksWithoutConfigs.return_value = cls.create_query_call_result()

    @classmethod
    def fake_time(cls, ignored):
        """Stand-in clock: advance FAKE_TIME by 2 and return the new value."""
        cls.FAKE_TIME = cls.FAKE_TIME + 2
        return cls.FAKE_TIME

    @classmethod
    def get_instance_spec(cls, instances_spec):
        """Create a job instance spec string: TEST_JOBSPEC/<instances_spec>."""
        return '%s/%s' % (cls.TEST_JOBSPEC, instances_spec)

    @classmethod
    def get_test_config(cls, base, cluster, role, env, job, inner=''):
        """Fill the ``base`` template with the given job coordinates."""
        return base % dict(job=job, role=role, env=env, cluster=cluster, inner=inner)

    @classmethod
    def get_unbound_test_config(cls, role=None, env=None, job=None):
        """Fill UNBOUND_CONFIG, using the TEST_* value for any falsy argument."""
        substitutions = {
            'job': job or cls.TEST_JOB,
            'role': role or cls.TEST_ROLE,
            'env': env or cls.TEST_ENV,
        }
        return cls.UNBOUND_CONFIG % substitutions

    @classmethod
    def get_valid_config(cls):
        """Return CONFIG_BASE filled in for the canonical test job."""
        return cls.get_test_config(
            cls.CONFIG_BASE, cls.TEST_CLUSTER, cls.TEST_ROLE, cls.TEST_ENV, cls.TEST_JOB)

    @classmethod
    def get_valid_cron_config(cls):
        """Return CRON_CONFIG_BASE filled in for the canonical test job."""
        return cls.get_test_config(
            cls.CRON_CONFIG_BASE, cls.TEST_CLUSTER, cls.TEST_ROLE, cls.TEST_ENV, cls.TEST_JOB)

    @classmethod
    def get_invalid_config(cls, bad_clause):
        """Return CONFIG_BASE for the test job with bad_clause spliced in."""
        return cls.get_test_config(
            cls.CONFIG_BASE, cls.TEST_CLUSTER, cls.TEST_ROLE, cls.TEST_ENV, cls.TEST_JOB,
            bad_clause)

    @classmethod
    def get_invalid_cron_config(cls, bad_clause):
        """Return CRON_CONFIG_BASE for the test job with bad_clause spliced in."""
        return cls.get_test_config(
            cls.CRON_CONFIG_BASE, cls.TEST_CLUSTER, cls.TEST_ROLE, cls.TEST_ENV, cls.TEST_JOB,
            bad_clause)

    @classmethod
    def assert_lock_message(cls, context):
        """Assert the context's error output has the tab-indented updating message."""
        assert any(
            line == "\t%s" % context.JOB_UPDATING_ERROR_MSG
            for line in context.get_err())

    @classmethod
    def get_mock_tier_configurations(cls):
        """Return an OK response listing the three test tiers.

        The preemptible tier is reported as the default.
        """
        tier_result = GetTierConfigResult(
            defaultTierName=cls.PREEMPTIBLE_TIER.name,
            tiers=frozenset([cls.PREFERRED_TIER, cls.PREEMPTIBLE_TIER, cls.REVOCABLE_TIER])
        )
        response = cls.create_simple_success_response()
        response.result = Result(getTierConfigResult=tier_result)
        return response
| |
| |
class IOMock(object):
    """In-memory sink that records every value written to it, in order."""

    def __init__(self):
        # Values passed to put(), oldest first.
        self.out = list()

    def put(self, s):
        """Record ``s`` as the next written value."""
        recorded = self.out
        recorded.append(s)

    def get(self):
        """Return the list of recorded values (the live list, not a copy)."""
        return self.out