| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import pytest |
| |
| from aria.orchestrator.workflows import api |
| from aria.orchestrator.workflows.core import engine, graph_compiler |
| from aria.orchestrator.workflows.executor import process |
| from aria.orchestrator import workflow, operation |
| import tests |
| from tests import mock |
| from tests import storage |
| |
| |
| TEST_FILE_CONTENT = 'CONTENT' |
| TEST_FILE_ENTRY_ID = 'entry' |
| TEST_FILE_NAME = 'test_file' |
| |
| |
| def test_serialize_operation_context(context, executor, tmpdir): |
| test_file = tmpdir.join(TEST_FILE_NAME) |
| test_file.write(TEST_FILE_CONTENT) |
| resource = context.resource |
| resource.service_template.upload(TEST_FILE_ENTRY_ID, str(test_file)) |
| |
| node = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) |
| plugin = mock.models.create_plugin() |
| context.model.plugin.put(plugin) |
| interface = mock.models.create_interface( |
| node.service, |
| 'test', |
| 'op', |
| operation_kwargs=dict(function=_operation_mapping(), |
| plugin=plugin) |
| ) |
| node.interfaces[interface.name] = interface |
| context.model.node.update(node) |
| |
| graph = _mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter |
| graph_compiler.GraphCompiler(context, executor.__class__).compile(graph) |
| eng = engine.Engine(executor) |
| eng.execute(context) |
| |
| |
| @workflow |
| def _mock_workflow(ctx, graph): |
| node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) |
| graph.add_tasks(api.task.OperationTask(node, interface_name='test', operation_name='op')) |
| return graph |
| |
| |
| @operation |
| def _mock_operation(ctx): |
| # We test several things in this operation |
| # ctx.task, ctx.node, etc... tell us that the model storage was properly re-created |
| # a correct ctx.task.function tells us we kept the correct task_id |
| assert ctx.task.function == _operation_mapping() |
| # a correct ctx.node.name tells us we kept the correct actor_id |
| assert ctx.node.name == mock.models.DEPENDENCY_NODE_NAME |
| # a correct ctx.name tells us we kept the correct name |
| assert ctx.name is not None |
| assert ctx.name == ctx.task.name |
| # a correct ctx.deployment.name tells us we kept the correct deployment_id |
| assert ctx.service.name == mock.models.SERVICE_NAME |
| # Here we test that the resource storage was properly re-created |
| test_file_content = ctx.resource.service_template.read(TEST_FILE_ENTRY_ID, TEST_FILE_NAME) |
| assert test_file_content == TEST_FILE_CONTENT |
| # a non empty plugin workdir tells us that we kept the correct base_workdir |
| assert ctx.plugin_workdir is not None |
| |
| |
| def _operation_mapping(): |
| return '{name}.{func.__name__}'.format(name=__name__, func=_mock_operation) |
| |
| |
| @pytest.fixture |
| def executor(): |
| result = process.ProcessExecutor(python_path=[tests.ROOT_DIR]) |
| try: |
| yield result |
| finally: |
| result.close() |
| |
| |
| @pytest.fixture |
| def context(tmpdir): |
| result = mock.context.simple( |
| str(tmpdir), |
| context_kwargs=dict(workdir=str(tmpdir.join('workdir'))) |
| ) |
| |
| yield result |
| storage.release_sqlite_storage(result.model) |