| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import os |
| import time |
| import Queue |
| import subprocess |
| |
| import pytest |
| import psutil |
| import retrying |
| |
| import aria |
| from aria import operation |
| from aria.modeling import models |
| from aria.orchestrator import events |
| from aria.utils.plugin import create as create_plugin |
| from aria.orchestrator.workflows.executor import process |
| |
| import tests.storage |
| import tests.resources |
| from tests.helpers import FilesystemDataHolder |
| from tests.fixtures import ( # pylint: disable=unused-import |
| plugins_dir, |
| plugin_manager, |
| ) |
| from . import MockContext |
| |
| |
| class TestProcessExecutor(object): |
| |
| def test_plugin_execution(self, executor, mock_plugin, model): |
| ctx = MockContext( |
| model, |
| task_kwargs=dict(function='mock_plugin1.operation', plugin_fk=mock_plugin.id) |
| ) |
| |
| queue = Queue.Queue() |
| |
| def handler(_, exception=None, **kwargs): |
| queue.put(exception) |
| |
| events.on_success_task_signal.connect(handler) |
| events.on_failure_task_signal.connect(handler) |
| try: |
| executor.execute(ctx) |
| error = queue.get(timeout=60) |
| # tests/resources/plugins/mock-plugin1 is the plugin installed |
| # during this tests setup. The module mock_plugin1 contains a single |
| # operation named "operation" which calls an entry point defined in the plugin's |
| # setup.py. This entry points simply prints 'mock-plugin-output' to stdout. |
| # The "operation" operation that called this subprocess, then raises a RuntimeError |
| # with that subprocess output as the error message. |
| # This is what we assert here. This tests checks that both the PYTHONPATH (operation) |
| # and PATH (entry point) are properly updated in the subprocess in which the task is |
| # running. |
| assert isinstance(error, RuntimeError) |
| assert error.message == 'mock-plugin-output' |
| finally: |
| events.on_success_task_signal.disconnect(handler) |
| events.on_failure_task_signal.disconnect(handler) |
| |
| def test_closed(self, executor, model): |
| executor.close() |
| with pytest.raises(RuntimeError) as exc_info: |
| executor.execute(MockContext(model, task_kwargs=dict(function='some.function'))) |
| assert 'closed' in exc_info.value.message |
| |
| @pytest.mark.skipif(os.name == 'nt', reason='uses bash script') |
| def test_process_termination(self, executor, model, fs_test_holder): |
| argument = models.Argument.wrap('holder_path', fs_test_holder._path) |
| model.argument.put(argument) |
| ctx = MockContext( |
| model, |
| task_kwargs=dict( |
| function='{0}.{1}'.format(__name__, freezing_task.__name__), |
| arguments=dict(holder_path=argument)), |
| ) |
| |
| executor.execute(ctx) |
| |
| @retrying.retry(retry_on_result=lambda r: r is False, stop_max_delay=60000, wait_fixed=500) |
| def wait_for_extra_process_id(): |
| return fs_test_holder.get('subproc', False) |
| |
| pids = [executor._tasks[ctx.task.id].proc.pid, wait_for_extra_process_id()] |
| assert any(p.pid == pid for p in psutil.process_iter() for pid in pids) |
| executor.terminate(ctx.task.id) |
| |
| # Give a chance to the processes to terminate |
| time.sleep(10) # windows might require more time |
| assert not any(p.pid == pid and p.status() != psutil.STATUS_ZOMBIE |
| for p in psutil.process_iter() |
| for pid in pids) |
| |
| |
| @pytest.fixture |
| def fs_test_holder(tmpdir): |
| dataholder_path = str(tmpdir.join('dataholder')) |
| holder = FilesystemDataHolder(dataholder_path) |
| return holder |
| |
| |
| @pytest.fixture |
| def executor(plugin_manager): |
| result = process.ProcessExecutor( |
| plugin_manager=plugin_manager, |
| python_path=[tests.ROOT_DIR]) |
| yield result |
| result.close() |
| |
| |
| @pytest.fixture |
| def mock_plugin(plugin_manager, tmpdir): |
| source = os.path.join(tests.resources.DIR, 'plugins', 'mock-plugin1') |
| plugin_path = create_plugin(source=source, destination_dir=str(tmpdir)) |
| return plugin_manager.install(source=plugin_path) |
| |
| |
| @pytest.fixture |
| def model(tmpdir): |
| _storage = aria.application_model_storage(aria.storage.sql_mapi.SQLAlchemyModelAPI, |
| initiator_kwargs=dict(base_dir=str(tmpdir))) |
| yield _storage |
| tests.storage.release_sqlite_storage(_storage) |
| |
| |
| @operation |
| def freezing_task(holder_path, **_): |
| holder = FilesystemDataHolder(holder_path) |
| holder['subproc'] = subprocess.Popen('while true; do sleep 5; done', shell=True).pid |
| while True: |
| time.sleep(5) |