blob: 9daa4f42ad11551c49987397a1f7f556e34451d3 [file] [log] [blame]
#!/usr/bin/env python
# coding=utf-8
# Copyright [2019] [Apache Software Foundation]
#
# Licensed under the Apache License, Version 2.0 (the 'License');
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
try:
import mock
except ImportError:
import unittest.mock as mock
from mock import call
from mock import ANY
from marvin_python_toolbox.management.engine import MarvinDryRun
from marvin_python_toolbox.management.engine import dryrun
from marvin_python_toolbox.management.engine import engine_httpserver
from marvin_python_toolbox.management.engine import _create_virtual_env
from marvin_python_toolbox.management.engine import _make_data_link
import os
class mocked_ctx(object):
obj = {'package_name': 'test_package', 'config': {'inidir': 'test_dir'}}
def mocked_sleep(value):
if value == 100:
raise KeyboardInterrupt()
class mocked_acquisitor():
def __init__(self, persistence_mode, is_remote_calling, default_root_path):
self.persistence_mode = persistence_mode
self.is_remote_calling = is_remote_calling
self.default_root_path = default_root_path
def execute(self, **kwargs):
print ('test')
@mock.patch('marvin_python_toolbox.management.engine.time.time')
@mock.patch('marvin_python_toolbox.management.engine.MarvinDryRun')
@mock.patch('marvin_python_toolbox.management.engine.sys.exit')
@mock.patch('marvin_python_toolbox.management.engine.os.system')
def test_dryrun(system_mocked, exit_mocked, MarvinDryRun_mocked, time_mocked):
params = '/tmp/params'
messages_file = '/tmp/messages'
feedback_file = '/tmp/feedback'
action = 'all'
spark_conf = '/opt/spark/conf'
time_mocked.return_value = 555
dryrun(ctx=mocked_ctx, action=action, params_file=params, messages_file=messages_file, feedback_file=feedback_file, initial_dataset=None,
dataset=None, model=None, metrics=None, response=False, spark_conf=spark_conf, profiling=None)
time_mocked.assert_called()
exit_mocked.assert_called_with("Stoping process!")
MarvinDryRun_mocked.assert_called_with(ctx=mocked_ctx, messages=[{}, {}], print_response=False)
MarvinDryRun_mocked.return_value.execute.assert_called_with(clazz='Feedback', dataset=None, initial_dataset=None, metrics=None, model=None,
params={}, profiling_enabled=None)
action = 'acquisitor'
dryrun(ctx=mocked_ctx, action=action, params_file=params, messages_file=messages_file, feedback_file=feedback_file, initial_dataset=None,
dataset=None, model=None, metrics=None, response=False, spark_conf=spark_conf, profiling=None)
time_mocked.assert_called()
MarvinDryRun_mocked.assert_called_with(ctx=mocked_ctx, messages=[{}, {}], print_response=False)
@mock.patch('marvin_python_toolbox.management.engine.json.dumps')
@mock.patch('marvin_python_toolbox.management.engine.dynamic_import')
def test_marvindryrun(import_mocked, dumps_mocked):
messages = ['/tmp/messages', '/tmp/feedback']
response = 'response'
clazz = 'PredictionPreparator'
import_mocked.return_value = mocked_acquisitor
test_dryrun = MarvinDryRun(ctx=mocked_ctx, messages=messages, print_response=response)
test_dryrun.execute(clazz=clazz, params=None, initial_dataset=None, dataset=None, model=None, metrics=None, profiling_enabled=True)
import_mocked.assert_called_with("{}.{}".format('test_package', 'PredictionPreparator'))
dumps_mocked.assert_called_with(None, indent=4, sort_keys=True)
clazz = 'Feedback'
test_dryrun.execute(clazz=clazz, params=None, initial_dataset=None, dataset=None, model=None, metrics=None, profiling_enabled=False)
import_mocked.assert_called_with("{}.{}".format('test_package', 'Feedback'))
clazz = 'Predictor'
test_dryrun.execute(clazz=clazz, params=None, initial_dataset=None, dataset=None, model=None, metrics=None, profiling_enabled=False)
import_mocked.assert_called_with("{}.{}".format('test_package', 'PredictionPreparator'))
clazz = 'test'
test_dryrun.execute(clazz=clazz, params=None, initial_dataset=None, dataset=None, model=None, metrics=None, profiling_enabled=True)
test_dryrun.execute(clazz=clazz, params=None, initial_dataset=None, dataset=None, model=None, metrics=None, profiling_enabled=False)
import_mocked.assert_called_with("{}.{}".format('test_package', 'test'))
response = False
clazz = 'PredictionPreparator'
MarvinDryRun(ctx=mocked_ctx, messages=messages, print_response=response)
test_dryrun = MarvinDryRun(ctx=mocked_ctx, messages=messages, print_response=response)
test_dryrun.execute(clazz=clazz, params=None, initial_dataset=None, dataset=None, model=None, metrics=None, profiling_enabled=False)
dumps_mocked.assert_called_with(None, indent=4, sort_keys=True)
@mock.patch('marvin_python_toolbox.management.engine.sys.exit')
@mock.patch('marvin_python_toolbox.management.engine.time.sleep')
@mock.patch('marvin_python_toolbox.management.engine.MarvinData')
@mock.patch('marvin_python_toolbox.management.engine.Config')
@mock.patch('marvin_python_toolbox.management.engine.subprocess.Popen')
def test_engine_httpserver(Popen_mocked, Config_mocked, MarvinData_mocked, sleep_mocked, exit_mocked):
sleep_mocked.side_effect = mocked_sleep
engine_httpserver(ctx=mocked_ctx, action='all', params_file='test_params', initial_dataset='test_id', dataset='test_d', model='test_m', metrics='test_me',
protocol='test_protocol', spark_conf='test_conf', http_host='test_host', http_port=9999, executor_path='test_executor',
max_workers=9, max_rpc_workers=99, extra_executor_parameters="-DXX=123")
expected_calls = []
expected_calls.append(call([
'marvin', 'engine-grpcserver',
'-a', 'all',
'-w', '9',
'-rw', '99',
ANY, ANY,
ANY, ANY,
ANY, ANY,
ANY, ANY,
ANY, ANY,
ANY, ANY]
))
expected_calls.append(call([
'java',
'-DmarvinConfig.engineHome=test_dir',
'-DmarvinConfig.ipAddress=test_host',
'-DmarvinConfig.port=9999',
'-DmarvinConfig.protocol=test_protocol',
'-DXX=123',
'-jar',
MarvinData_mocked.download_file('test_executor')]
))
Popen_mocked.assert_has_calls(expected_calls)
exit_mocked.assert_called_with(0)
@mock.patch('marvin_python_toolbox.management.engine.subprocess.Popen')
def test_create_virtual_env(Popen_mocked):
name = "my_project"
dest = "/tmp/xxx"
python = "python"
mockx = mock.MagicMock()
mockx.wait.return_value = 0
Popen_mocked.return_value = mockx
env_name = _create_virtual_env(name, dest, python)
commands = [
'bash',
'-c',
'. virtualenvwrapper.sh; mkvirtualenv -p {0} -a {1} {2};'.format(python, dest, env_name)
]
Popen_mocked.assert_called_with(commands, env=os.environ)
assert env_name == 'my-project-env'
@mock.patch('marvin_python_toolbox.management.engine.sys')
@mock.patch('marvin_python_toolbox.management.engine.subprocess.Popen')
def test_create_virtual_env_error(Popen_mocked, sys_mocked):
name = "my_project"
dest = "/tmp/xxx"
python = "python"
mockx = mock.MagicMock()
mockx.wait.return_value = 3
Popen_mocked.return_value = mockx
env_name = _create_virtual_env(name, dest, python)
commands = [
'bash',
'-c',
'. virtualenvwrapper.sh; mkvirtualenv -p {0} -a {1} {2};'.format(python, dest, env_name)
]
Popen_mocked.assert_called_with(commands, env=os.environ)
mockx.wait.assert_called_once()
# sys_mocked.exit.assert_called_once_with(1)
@mock.patch('marvin_python_toolbox.management.engine.os.symlink')
def test_make_data_link_call_symlink(mock_symlink):
os.environ['MARVIN_DATA_PATH'] = '/tmp/'
dest = '/tmp/'
_make_data_link(dest)
mock_symlink.assert_called_once_with('/tmp/', '/tmp/notebooks/data')