| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| from collections import namedtuple |
| |
| import requests |
| import pytest |
| |
| from aria.modeling import models |
| from aria.orchestrator import exceptions |
| from aria.orchestrator.execution_plugin import common |
| |
| |
| class TestDownloadScript(object): |
| |
| @pytest.fixture(autouse=True) |
| def patch_requests(self, mocker): |
| def _mock_requests_get(url): |
| response = namedtuple('Response', 'text status_code') |
| return response(url, self.status_code) |
| self.status_code = 200 |
| mocker.patch.object(requests, 'get', _mock_requests_get) |
| |
| def _test_url(self, url): |
| class Ctx(object): |
| task = models.Task |
| |
| script_path = url |
| result = common.download_script(Ctx, script_path) |
| with open(result) as f: |
| assert script_path == f.read() |
| assert result.endswith('-some_script.py') |
| |
| def test_http_url(self): |
| self._test_url('http://localhost/some_script.py') |
| |
| def test_https_url(self): |
| self._test_url('https://localhost/some_script.py') |
| |
| def test_url_status_code_404(self): |
| self.status_code = 404 |
| with pytest.raises(exceptions.TaskAbortException) as exc_ctx: |
| self.test_http_url() |
| exception = exc_ctx.value |
| assert 'status code: 404' in str(exception) |
| |
| def test_blueprint_resource(self): |
| test_script_path = 'my_script.py' |
| |
| class Ctx(object): |
| @staticmethod |
| def download_resource(destination, path): |
| assert path == test_script_path |
| return destination |
| result = common.download_script(Ctx, test_script_path) |
| assert result.endswith(test_script_path) |
| |
| |
| class TestCreateProcessConfig(object): |
| |
| def test_plain_command(self): |
| script_path = 'path' |
| process = common.create_process_config( |
| script_path=script_path, |
| process={}, |
| operation_kwargs={}) |
| assert process['command'] == script_path |
| |
| def test_command_with_args(self): |
| script_path = 'path' |
| process = {'args': [1, 2, 3]} |
| process = common.create_process_config( |
| script_path=script_path, |
| process=process, |
| operation_kwargs={}) |
| assert process['command'] == '{0} 1 2 3'.format(script_path) |
| |
| def test_command_prefix(self): |
| script_path = 'path' |
| command_prefix = 'prefix' |
| process = {'command_prefix': command_prefix} |
| process = common.create_process_config( |
| script_path=script_path, |
| process=process, |
| operation_kwargs={}) |
| assert process['command'] == '{0} {1}'.format(command_prefix, script_path) |
| |
| def test_command_with_args_and_prefix(self): |
| script_path = 'path' |
| command_prefix = 'prefix' |
| process = {'command_prefix': command_prefix, |
| 'args': [1, 2, 3]} |
| process = common.create_process_config( |
| script_path=script_path, |
| process=process, |
| operation_kwargs={}) |
| assert process['command'] == '{0} {1} 1 2 3'.format(command_prefix, script_path) |
| |
| def test_ctx_is_removed(self): |
| process = common.create_process_config( |
| script_path='', |
| process={}, |
| operation_kwargs={'ctx': 1}) |
| assert 'ctx' not in process['env'] |
| |
| def test_env_passed_explicitly(self): |
| env = {'one': '1', 'two': '2'} |
| process = common.create_process_config( |
| script_path='', |
| process={'env': env}, |
| operation_kwargs={}) |
| assert process['env'] == env |
| |
| def test_env_populated_from_operation_kwargs(self): |
| operation_kwargs = {'one': '1', 'two': '2'} |
| process = common.create_process_config( |
| script_path='', |
| process={}, |
| operation_kwargs=operation_kwargs) |
| assert process['env'] == operation_kwargs |
| |
| def test_env_merged_from_operation_kwargs_and_process(self): |
| operation_kwargs = {'one': '1', 'two': '2'} |
| env = {'three': '3', 'four': '4'} |
| process = common.create_process_config( |
| script_path='', |
| process={'env': env}, |
| operation_kwargs=operation_kwargs) |
| assert process['env'] == dict(operation_kwargs.items() + env.items()) |
| |
| def test_process_env_gets_precedence_over_operation_kwargs(self): |
| operation_kwargs = {'one': 'from_kwargs'} |
| env = {'one': 'from_env_process'} |
| process = common.create_process_config( |
| script_path='', |
| process={'env': env}, |
| operation_kwargs=operation_kwargs) |
| assert process['env'] == env |
| |
| def test_json_env_vars(self, mocker): |
| mocker.patch.object(common, 'is_windows', lambda: False) |
| operation_kwargs = {'a_dict': {'key': 'value'}, |
| 'a_list': ['a', 'b', 'c'], |
| 'a_tuple': (4, 5, 6), |
| 'a_bool': True} |
| process = common.create_process_config( |
| script_path='', |
| process={}, |
| operation_kwargs=operation_kwargs) |
| assert process['env'] == {'a_dict': '{"key": "value"}', |
| 'a_list': '["a", "b", "c"]', |
| 'a_tuple': '[4, 5, 6]', |
| 'a_bool': 'true'} |
| |
| def test_quote_json_env_vars(self): |
| operation_kwargs = {'one': []} |
| process = common.create_process_config( |
| script_path='', |
| process={}, |
| operation_kwargs=operation_kwargs, |
| quote_json_env_vars=True) |
| assert process['env']['one'] == "'[]'" |
| |
| def test_env_keys_converted_to_string_on_windows(self, mocker): |
| mocker.patch.object(common, 'is_windows', lambda: True) |
| env = {u'one': '1'} |
| process = common.create_process_config( |
| script_path='', |
| process={'env': env}, |
| operation_kwargs={}) |
| print type(process['env'].keys()[0]) |
| assert isinstance(process['env'].keys()[0], str) |
| |
| def test_env_values_quotes_are_escaped_on_windows(self, mocker): |
| mocker.patch.object(common, 'is_windows', lambda: True) |
| env = {'one': '"hello"'} |
| process = common.create_process_config( |
| script_path='', |
| process={'env': env}, |
| operation_kwargs={}) |
| assert process['env']['one'] == '\\"hello\\"' |