blob: fb32caf025026b5a21b6d16fae3ce823e13d51df [file] [log] [blame]
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import unittest
import mock
from mock import create_autospec
from apache.aurora.client.api.job_monitor import JobMonitor
from apache.aurora.common.aurora_job_key import AuroraJobKey
from ...api_util import SchedulerProxyApiSpec
from gen.apache.aurora.api.ttypes import (
AssignedTask,
JobKey,
Response,
ResponseCode,
ResponseDetail,
Result,
ScheduledTask,
ScheduleStatus,
ScheduleStatusResult,
TaskEvent,
TaskQuery
)
class FakeEvent(object):
def __init__(self):
self._is_set = False
def wait(self, seconds):
pass
def is_set(self):
return self._is_set
def set(self):
self._is_set = True
class JobMonitorTest(unittest.TestCase):
def setUp(self):
self._scheduler = create_autospec(spec=SchedulerProxyApiSpec, instance=True)
self._job_key = AuroraJobKey('cl', 'johndoe', 'test', 'test_job')
self._event = FakeEvent()
def create_task(self, status, id):
return ScheduledTask(
assignedTask=AssignedTask(
instanceId=id,
taskId=id),
status=status,
taskEvents=[TaskEvent(
status=status,
timestamp=10)]
)
def mock_get_tasks(self, tasks, response_code=None):
response_code = ResponseCode.OK if response_code is None else response_code
resp = Response(responseCode=response_code, details=[ResponseDetail(message='test')])
resp.result = Result(scheduleStatusResult=ScheduleStatusResult(tasks=tasks))
self._scheduler.getTasksWithoutConfigs.return_value = resp
def expect_task_status(self, once=False, instances=None):
query = TaskQuery(jobKeys=[
JobKey(role=self._job_key.role, environment=self._job_key.env, name=self._job_key.name)])
if instances is not None:
query.instanceIds = frozenset([int(s) for s in instances])
if once:
self._scheduler.getTasksWithoutConfigs.assert_called_once_with(query, retry=False)
else:
self._scheduler.getTasksWithoutConfigs.assert_called_with(query, retry=False)
def test_wait_until_state(self):
self.mock_get_tasks([
self.create_task(ScheduleStatus.RUNNING, '1'),
self.create_task(ScheduleStatus.RUNNING, '2'),
self.create_task(ScheduleStatus.FAILED, '3'),
])
monitor = JobMonitor(self._scheduler, self._job_key)
assert monitor.wait_until(monitor.running_or_finished)
self.expect_task_status(once=True)
def test_empty_job_succeeds(self):
self.mock_get_tasks([])
monitor = JobMonitor(self._scheduler, self._job_key)
assert monitor.wait_until(monitor.running_or_finished)
self.expect_task_status(once=True)
def test_wait_with_instances(self):
self.mock_get_tasks([
self.create_task(ScheduleStatus.FAILED, '2'),
])
monitor = JobMonitor(self._scheduler, self._job_key)
assert monitor.wait_until(monitor.terminal, instances=[2])
self.expect_task_status(once=True, instances=[2])
def test_wait_until_timeout(self):
self.mock_get_tasks([
self.create_task(ScheduleStatus.RUNNING, '1'),
self.create_task(ScheduleStatus.RUNNING, '2'),
self.create_task(ScheduleStatus.RUNNING, '3'),
])
monitor = JobMonitor(self._scheduler, self._job_key, terminating_event=self._event)
assert not monitor.wait_until(monitor.terminal, with_timeout=True)
self.expect_task_status()
def test_terminated_exits_immediately(self):
self._event.set()
monitor = JobMonitor(self._scheduler, self._job_key, terminating_event=self._event)
assert monitor.wait_until(monitor.terminal)
def test_terminate(self):
mock_event = mock.Mock()
monitor = JobMonitor(self._scheduler, self._job_key, terminating_event=mock_event)
monitor.terminate()
mock_event.set.assert_called_once_with()