blob: 2ab5b5308b23be138f03cdc79d612855799c40ea [file] [log] [blame]
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import contextlib
from mock import call, patch
from twitter.common.contextutil import temporary_file
from apache.aurora.client.base import get_job_page
from apache.aurora.client.cli import (
EXIT_API_ERROR,
EXIT_COMMAND_FAILURE,
EXIT_INVALID_CONFIGURATION,
EXIT_OK
)
from apache.aurora.client.cli.client import AuroraCommandLine
from apache.aurora.config import AuroraConfig
from .util import AuroraClientCommandTest, FakeAuroraCommandContext
from gen.apache.aurora.api.ttypes import GetJobsResult, JobConfiguration, JobKey, Result
class TestCronNoun(AuroraClientCommandTest):
def test_successful_schedule(self):
mock_context = FakeAuroraCommandContext()
with contextlib.nested(
patch('apache.aurora.client.cli.cron.CronNoun.create_context', return_value=mock_context)):
api = mock_context.get_api('west')
api.schedule_cron.return_value = self.create_simple_success_response()
api.get_tier_configs.return_value = self.get_mock_tier_configurations()
with temporary_file() as fp:
fp.write(self.get_valid_cron_config())
fp.flush()
cmd = AuroraCommandLine()
cmd.execute(['cron', 'schedule', self.TEST_JOBSPEC, fp.name])
# Now check that the right API calls got made.
# Check that create_job was called exactly once, with an AuroraConfig parameter.
assert api.schedule_cron.call_count == 1
assert isinstance(api.schedule_cron.call_args[0][0], AuroraConfig)
# The last text printed out to the user should contain a url to the job
assert get_job_page(api, self.TEST_JOBKEY) in mock_context.out[-1]
def test_schedule_failed(self):
mock_context = FakeAuroraCommandContext()
with patch('apache.aurora.client.cli.cron.CronNoun.create_context', return_value=mock_context):
api = mock_context.get_api('west')
api.schedule_cron.return_value = self.create_error_response()
api.get_tier_configs.return_value = self.get_mock_tier_configurations()
with temporary_file() as fp:
fp.write(self.get_valid_cron_config())
fp.flush()
cmd = AuroraCommandLine()
result = cmd.execute(['cron', 'schedule', 'west/bozo/test/hello', fp.name])
assert result == EXIT_API_ERROR
# Now check that the right API calls got made.
# Check that create_job was called exactly once, with an AuroraConfig parameter.
assert api.schedule_cron.call_count == 1
assert isinstance(api.schedule_cron.call_args[0][0], AuroraConfig)
def test_schedule_failed_non_cron(self):
mock_context = FakeAuroraCommandContext()
api = mock_context.get_api('west')
api.get_tier_configs.return_value = self.get_mock_tier_configurations()
with patch('apache.aurora.client.cli.cron.CronNoun.create_context', return_value=mock_context):
with temporary_file() as fp:
fp.write(self.get_valid_config())
fp.flush()
cmd = AuroraCommandLine()
result = cmd.execute(['cron', 'schedule', 'west/bozo/test/hello', fp.name])
assert result == EXIT_COMMAND_FAILURE
def test_schedule_cron_failed_invalid_config(self):
mock_context = FakeAuroraCommandContext()
with patch('apache.aurora.client.cli.cron.CronNoun.create_context', return_value=mock_context):
with temporary_file() as fp:
fp.write(self.get_invalid_cron_config('invalid_clause=oops'))
fp.flush()
cmd = AuroraCommandLine()
result = cmd.execute(['cron', 'schedule', 'west/bozo/test/hello', fp.name])
assert result == EXIT_INVALID_CONFIGURATION
# Now check that the right API calls got made.
# Check that create_job was not called.
api = mock_context.get_api('west')
assert api.schedule_cron.call_count == 0
def test_schedule_cron_deep_api(self):
mock_context = FakeAuroraCommandContext()
with patch('apache.aurora.client.cli.cron.CronNoun.create_context', return_value=mock_context):
api = mock_context.get_api("west")
api.schedule_cron.return_value = self.create_simple_success_response()
api.get_tier_configs.return_value = self.get_mock_tier_configurations()
with temporary_file() as fp:
fp.write(self.get_valid_cron_config())
fp.flush()
cmd = AuroraCommandLine()
result = cmd.execute(['cron', 'schedule', 'west/bozo/test/hello', fp.name])
assert result == EXIT_OK
assert api.schedule_cron.call_count == 1
config = api.schedule_cron.call_args[0][0]
assert config.role() == "bozo"
assert config.environment() == "test"
assert config.name() == "hello"
def test_deschedule_cron_deep_api(self):
mock_context = FakeAuroraCommandContext()
with patch('apache.aurora.client.cli.cron.CronNoun.create_context', return_value=mock_context):
api = mock_context.get_api("west")
api.deschedule_cron.return_value = self.create_simple_success_response()
cmd = AuroraCommandLine()
result = cmd.execute(['cron', 'deschedule', self.TEST_JOBSPEC])
assert result == EXIT_OK
api.deschedule_cron.assert_called_once_with(self.TEST_JOBKEY)
def test_start_cron(self):
mock_context = FakeAuroraCommandContext()
with patch('apache.aurora.client.cli.cron.CronNoun.create_context', return_value=mock_context):
api = mock_context.get_api("west")
api.start_cronjob.return_value = self.create_simple_success_response()
cmd = AuroraCommandLine()
result = cmd.execute(['cron', 'start', 'west/bozo/test/hello'])
assert result == EXIT_OK
api.start_cronjob.assert_called_once_with(self.TEST_JOBKEY, config=None)
def test_start_cron_open_browser(self):
mock_context = FakeAuroraCommandContext()
with patch('apache.aurora.client.cli.cron.CronNoun.create_context', return_value=mock_context):
api = mock_context.get_api("west")
api.start_cronjob.return_value = self.create_simple_success_response()
cmd = AuroraCommandLine()
result = cmd.execute(['cron', 'start', self.TEST_JOBSPEC, '--open-browser'])
assert result == EXIT_OK
api.start_cronjob.assert_called_once_with(self.TEST_JOBKEY, config=None)
assert self.mock_webbrowser.mock_calls == [
call("http://something_or_other/scheduler/bozo/test/hello")
]
@classmethod
def _create_getjobs_response(cls):
response = cls.create_simple_success_response()
response.result = Result(getJobsResult=GetJobsResult(configs=[
JobConfiguration(
cronSchedule='* * * * *',
key=JobKey(role='bozo', environment='test', name='hello'))
]))
return response
def test_cron_status(self):
mock_context = FakeAuroraCommandContext()
with patch('apache.aurora.client.cli.cron.CronNoun.create_context', return_value=mock_context):
api = mock_context.get_api("west")
api.get_jobs.return_value = self._create_getjobs_response()
cmd = AuroraCommandLine()
result = cmd.execute(['cron', 'show', 'west/bozo/test/hello'])
assert result == EXIT_OK
api.get_jobs.assert_called_once_with("bozo")
assert mock_context.get_out_str() == "west/bozo/test/hello\t * * * * *"
def test_cron_status_multiple_jobs(self):
mock_context = FakeAuroraCommandContext()
with patch('apache.aurora.client.cli.cron.CronNoun.create_context', return_value=mock_context):
api = mock_context.get_api("west")
response = self.create_simple_success_response()
response.result = Result(getJobsResult=GetJobsResult(configs=[
JobConfiguration(
key=JobKey(role='bozo', environment='test', name='hello'),
cronSchedule='* * * * *'),
JobConfiguration(
key=JobKey(role='bozo', environment='test', name='hello2'),
cronSchedule='* * * * *')
]))
api.get_jobs.return_value = response
cmd = AuroraCommandLine()
result = cmd.execute(['cron', 'show', 'west/bozo/test/hello'])
assert result == EXIT_OK
api.get_jobs.assert_called_once_with("bozo")
assert mock_context.get_out_str() == "west/bozo/test/hello\t * * * * *"