# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import sys
import time
import Queue
import subprocess

import pytest
import psutil
import retrying

import aria
from aria import operation
from aria.modeling import models
from aria.orchestrator import events
from aria.utils.plugin import create as create_plugin
from aria.orchestrator.workflows.executor import process

import tests.storage
import tests.resources
from tests.helpers import FilesystemDataHolder
from tests.fixtures import (  # pylint: disable=unused-import
    plugins_dir,
    plugin_manager,
)

from . import MockContext
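

# These tests exercise the subprocess-based task executor
# (aria.orchestrator.workflows.executor.process.ProcessExecutor): plugin execution,
# executor shutdown, and termination of running tasks.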
class TestProcessExecutor(object):

    def test_plugin_execution(self, executor, mock_plugin, model, queue):
        ctx = MockContext(
            model,
            task_kwargs=dict(function='mock_plugin1.operation', plugin_fk=mock_plugin.id)
        )
        executor.execute(ctx)
        error = queue.get(timeout=60)

        # tests/resources/plugins/mock-plugin1 is the plugin installed during this test's
        # setup. The mock_plugin1 module contains a single operation named "operation",
        # which calls an entry point defined in the plugin's setup.py; that entry point
        # simply prints 'mock-plugin-output' to stdout. The "operation" operation then
        # raises a RuntimeError with that subprocess output as the error message.
        # That is what we assert here: this test checks that both PYTHONPATH (needed to
        # import the operation module) and PATH (needed to find the entry point) are
        # properly updated in the subprocess in which the task runs.
        assert isinstance(error, RuntimeError)
        assert error.message == 'mock-plugin-output'
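
        # For reference, the plugin-side operation is roughly shaped like the sketch
        # below (illustrative only; the entry point name is hypothetical, see the actual
        # sources under tests/resources/plugins/mock-plugin1):
        #
        #     @operation
        #     def operation(**_):
        #         output = subprocess.check_output(['mock-plugin1-entry-point'])
        #         raise RuntimeError(output.strip())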

    def test_closed(self, executor, model):
        executor.close()
        with pytest.raises(RuntimeError) as exc_info:
            executor.execute(MockContext(model, task_kwargs=dict(function='some.function')))
        assert 'closed' in exc_info.value.message
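
    # Terminating a running task should also take down any extra processes the task
    # itself spawned; freezing_task (defined at module level below) records its child's
    # pid in the filesystem data holder so the test can verify this.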

    def test_process_termination(self, executor, model, fs_test_holder, tmpdir):
        freeze_script_path = str(tmpdir.join('freeze_script'))
        with open(freeze_script_path, 'w+b') as f:
            f.write(
                '''import time
while True:
    time.sleep(5)
'''
            )
        holder_path_argument = models.Argument.wrap('holder_path', fs_test_holder._path)
        script_path_argument = models.Argument.wrap('freezing_script_path', freeze_script_path)

        model.argument.put(holder_path_argument)
        model.argument.put(script_path_argument)
        ctx = MockContext(
            model,
            task_kwargs=dict(
                function='{0}.{1}'.format(__name__, freezing_task.__name__),
                arguments=dict(holder_path=holder_path_argument,
                               freezing_script_path=script_path_argument)),
        )

        executor.execute(ctx)

        @retrying.retry(retry_on_result=lambda r: r is False,
                        stop_max_delay=60000,
                        wait_fixed=500)
        def wait_for_extra_process_id():
            return fs_test_holder.get('subproc', False)

        task_pid = executor._tasks[ctx.task.id].proc.pid
        extra_process_pid = wait_for_extra_process_id()
        assert set([task_pid, extra_process_pid]).issubset(set(psutil.pids()))
        executor.terminate(ctx.task.id)

        # Give the processes a chance to terminate
        time.sleep(2)

        # All processes should be either zombies or non-existent
        pids = [task_pid, extra_process_pid]
        for pid in pids:
            if pid in psutil.pids():
                assert psutil.Process(pid).status() == psutil.STATUS_ZOMBIE
            else:
                # making the test more readable
                assert pid not in psutil.pids()
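

# Queue fixture: the task success/failure signal handlers push the task's exception
# (or None on success) onto the queue, letting tests block until a task finishes.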
@pytest.fixture
def queue():
    _queue = Queue.Queue()

    def handler(_, exception=None, **kwargs):
        _queue.put(exception)

    events.on_success_task_signal.connect(handler)
    events.on_failure_task_signal.connect(handler)
    try:
        yield _queue
    finally:
        events.on_success_task_signal.disconnect(handler)
        events.on_failure_task_signal.disconnect(handler)
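

# Filesystem-backed data holder shared between the test and the task subprocess;
# freezing_task uses it to report the pid of the extra process it spawns.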
@pytest.fixture
def fs_test_holder(tmpdir):
    dataholder_path = str(tmpdir.join('dataholder'))
    holder = FilesystemDataHolder(dataholder_path)
    return holder
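

# ProcessExecutor under test, wired to the test plugin manager and with the tests
# root directory placed on the subprocess python path.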
@pytest.fixture
def executor(plugin_manager):
    result = process.ProcessExecutor(plugin_manager=plugin_manager, python_path=[tests.ROOT_DIR])
    try:
        yield result
    finally:
        result.close()
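

# Builds the mock-plugin1 test resource into an installable plugin package and installs
# it through the plugin manager, so test_plugin_execution can invoke it.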
@pytest.fixture
def mock_plugin(plugin_manager, tmpdir):
    source = os.path.join(tests.resources.DIR, 'plugins', 'mock-plugin1')
    plugin_path = create_plugin(source=source, destination_dir=str(tmpdir))
    return plugin_manager.install(source=plugin_path)
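

# SQLite-backed model storage rooted in the test's tmpdir, released when the test ends.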
@pytest.fixture
def model(tmpdir):
    _storage = aria.application_model_storage(aria.storage.sql_mapi.SQLAlchemyModelAPI,
                                              initiator_kwargs=dict(base_dir=str(tmpdir)))
    yield _storage
    tests.storage.release_sqlite_storage(_storage)
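

# Operation used by test_process_termination: it launches a child process running the
# freeze script, reports the child's pid through the data holder, and then blocks
# forever so the executor has something to terminate.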
@operation
def freezing_task(holder_path, freezing_script_path, **_):
    holder = FilesystemDataHolder(holder_path)
    holder['subproc'] = subprocess.Popen([sys.executable, freezing_script_path]).pid
    while True:
        time.sleep(5)