blob: dd1e9fb6d980878291d1f76d36f5c39a12f4b091 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from collections import namedtuple
import requests
import pytest
from aria.modeling import models
from aria.orchestrator import exceptions
from aria.orchestrator.execution_plugin import common
class TestDownloadScript(object):
@pytest.fixture(autouse=True)
def patch_requests(self, mocker):
def _mock_requests_get(url):
response = namedtuple('Response', 'text status_code')
return response(url, self.status_code)
self.status_code = 200
mocker.patch.object(requests, 'get', _mock_requests_get)
def _test_url(self, url):
class Ctx(object):
task = models.Task
script_path = url
result = common.download_script(Ctx, script_path)
with open(result) as f:
assert script_path == f.read()
assert result.endswith('-some_script.py')
def test_http_url(self):
self._test_url('http://localhost/some_script.py')
def test_https_url(self):
self._test_url('https://localhost/some_script.py')
def test_url_status_code_404(self):
self.status_code = 404
with pytest.raises(exceptions.TaskAbortException) as exc_ctx:
self.test_http_url()
exception = exc_ctx.value
assert 'status code: 404' in str(exception)
def test_blueprint_resource(self):
test_script_path = 'my_script.py'
class Ctx(object):
@staticmethod
def download_resource(destination, path):
assert path == test_script_path
return destination
result = common.download_script(Ctx, test_script_path)
assert result.endswith(test_script_path)
class TestCreateProcessConfig(object):
def test_plain_command(self):
script_path = 'path'
process = common.create_process_config(
script_path=script_path,
process={},
operation_kwargs={})
assert process['command'] == script_path
def test_command_with_args(self):
script_path = 'path'
process = {'args': [1, 2, 3]}
process = common.create_process_config(
script_path=script_path,
process=process,
operation_kwargs={})
assert process['command'] == '{0} 1 2 3'.format(script_path)
def test_command_prefix(self):
script_path = 'path'
command_prefix = 'prefix'
process = {'command_prefix': command_prefix}
process = common.create_process_config(
script_path=script_path,
process=process,
operation_kwargs={})
assert process['command'] == '{0} {1}'.format(command_prefix, script_path)
def test_command_with_args_and_prefix(self):
script_path = 'path'
command_prefix = 'prefix'
process = {'command_prefix': command_prefix,
'args': [1, 2, 3]}
process = common.create_process_config(
script_path=script_path,
process=process,
operation_kwargs={})
assert process['command'] == '{0} {1} 1 2 3'.format(command_prefix, script_path)
def test_ctx_is_removed(self):
process = common.create_process_config(
script_path='',
process={},
operation_kwargs={'ctx': 1})
assert 'ctx' not in process['env']
def test_env_passed_explicitly(self):
env = {'one': '1', 'two': '2'}
process = common.create_process_config(
script_path='',
process={'env': env},
operation_kwargs={})
assert process['env'] == env
def test_env_populated_from_operation_kwargs(self):
operation_kwargs = {'one': '1', 'two': '2'}
process = common.create_process_config(
script_path='',
process={},
operation_kwargs=operation_kwargs)
assert process['env'] == operation_kwargs
def test_env_merged_from_operation_kwargs_and_process(self):
operation_kwargs = {'one': '1', 'two': '2'}
env = {'three': '3', 'four': '4'}
process = common.create_process_config(
script_path='',
process={'env': env},
operation_kwargs=operation_kwargs)
assert process['env'] == dict(operation_kwargs.items() + env.items())
def test_process_env_gets_precedence_over_operation_kwargs(self):
operation_kwargs = {'one': 'from_kwargs'}
env = {'one': 'from_env_process'}
process = common.create_process_config(
script_path='',
process={'env': env},
operation_kwargs=operation_kwargs)
assert process['env'] == env
def test_json_env_vars(self, mocker):
mocker.patch.object(common, 'is_windows', lambda: False)
operation_kwargs = {'a_dict': {'key': 'value'},
'a_list': ['a', 'b', 'c'],
'a_tuple': (4, 5, 6),
'a_bool': True}
process = common.create_process_config(
script_path='',
process={},
operation_kwargs=operation_kwargs)
assert process['env'] == {'a_dict': '{"key": "value"}',
'a_list': '["a", "b", "c"]',
'a_tuple': '[4, 5, 6]',
'a_bool': 'true'}
def test_quote_json_env_vars(self):
operation_kwargs = {'one': []}
process = common.create_process_config(
script_path='',
process={},
operation_kwargs=operation_kwargs,
quote_json_env_vars=True)
assert process['env']['one'] == "'[]'"
def test_env_keys_converted_to_string_on_windows(self, mocker):
mocker.patch.object(common, 'is_windows', lambda: True)
env = {u'one': '1'}
process = common.create_process_config(
script_path='',
process={'env': env},
operation_kwargs={})
print type(process['env'].keys()[0])
assert isinstance(process['env'].keys()[0], str)
def test_env_values_quotes_are_escaped_on_windows(self, mocker):
mocker.patch.object(common, 'is_windows', lambda: True)
env = {'one': '"hello"'}
process = common.create_process_config(
script_path='',
process={'env': env},
operation_kwargs={})
assert process['env']['one'] == '\\"hello\\"'