| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| |
| import pytest |
| |
| from aria.orchestrator import context |
| from aria.orchestrator.workflows import api |
| |
| from tests import mock, storage |
| |
| |
| @pytest.fixture |
| def ctx(tmpdir): |
| """ |
| Create the following graph in storage: |
| dependency_node <------ dependent_node |
| :return: |
| """ |
| simple_context = mock.context.simple(str(tmpdir), inmemory=False) |
| simple_context.model.execution.put(mock.models.create_execution(simple_context.service)) |
| yield simple_context |
| storage.release_sqlite_storage(simple_context.model) |
| |
| |
| class TestOperationTask(object): |
| |
| def test_node_operation_task_creation(self, ctx): |
| interface_name = 'test_interface' |
| operation_name = 'create' |
| |
| plugin = mock.models.create_plugin('test_plugin', '0.1') |
| ctx.model.node.update(plugin) |
| |
| arguments = {'test_input': True} |
| |
| interface = mock.models.create_interface( |
| ctx.service, |
| interface_name, |
| operation_name, |
| operation_kwargs=dict(plugin=plugin, |
| function='op_path', |
| arguments=arguments),) |
| |
| node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME) |
| node.interfaces[interface_name] = interface |
| ctx.model.node.update(node) |
| max_attempts = 10 |
| retry_interval = 10 |
| ignore_failure = True |
| |
| with context.workflow.current.push(ctx): |
| api_task = api.task.OperationTask( |
| node, |
| interface_name=interface_name, |
| operation_name=operation_name, |
| arguments=arguments, |
| max_attempts=max_attempts, |
| retry_interval=retry_interval, |
| ignore_failure=ignore_failure) |
| |
| assert api_task.name == api.task.OperationTask.NAME_FORMAT.format( |
| type='node', |
| name=node.name, |
| interface=interface_name, |
| operation=operation_name |
| ) |
| assert api_task.function == 'op_path' |
| assert api_task.actor == node |
| assert api_task.arguments['test_input'].value is True |
| assert api_task.retry_interval == retry_interval |
| assert api_task.max_attempts == max_attempts |
| assert api_task.ignore_failure == ignore_failure |
| assert api_task.plugin.name == 'test_plugin' |
| |
| def test_source_relationship_operation_task_creation(self, ctx): |
| interface_name = 'test_interface' |
| operation_name = 'preconfigure' |
| |
| plugin = mock.models.create_plugin('test_plugin', '0.1') |
| ctx.model.plugin.update(plugin) |
| |
| arguments = {'test_input': True} |
| |
| interface = mock.models.create_interface( |
| ctx.service, |
| interface_name, |
| operation_name, |
| operation_kwargs=dict(plugin=plugin, |
| function='op_path', |
| arguments=arguments) |
| ) |
| |
| relationship = ctx.model.relationship.list()[0] |
| relationship.interfaces[interface.name] = interface |
| max_attempts = 10 |
| retry_interval = 10 |
| |
| with context.workflow.current.push(ctx): |
| api_task = api.task.OperationTask( |
| relationship, |
| interface_name=interface_name, |
| operation_name=operation_name, |
| arguments=arguments, |
| max_attempts=max_attempts, |
| retry_interval=retry_interval) |
| |
| assert api_task.name == api.task.OperationTask.NAME_FORMAT.format( |
| type='relationship', |
| name=relationship.name, |
| interface=interface_name, |
| operation=operation_name |
| ) |
| assert api_task.function == 'op_path' |
| assert api_task.actor == relationship |
| assert api_task.arguments['test_input'].value is True |
| assert api_task.retry_interval == retry_interval |
| assert api_task.max_attempts == max_attempts |
| assert api_task.plugin.name == 'test_plugin' |
| |
| def test_target_relationship_operation_task_creation(self, ctx): |
| interface_name = 'test_interface' |
| operation_name = 'preconfigure' |
| |
| plugin = mock.models.create_plugin('test_plugin', '0.1') |
| ctx.model.node.update(plugin) |
| |
| arguments = {'test_input': True} |
| |
| interface = mock.models.create_interface( |
| ctx.service, |
| interface_name, |
| operation_name, |
| operation_kwargs=dict(plugin=plugin, |
| function='op_path', |
| arguments=arguments) |
| ) |
| |
| relationship = ctx.model.relationship.list()[0] |
| relationship.interfaces[interface.name] = interface |
| max_attempts = 10 |
| retry_interval = 10 |
| |
| with context.workflow.current.push(ctx): |
| api_task = api.task.OperationTask( |
| relationship, |
| interface_name=interface_name, |
| operation_name=operation_name, |
| arguments=arguments, |
| max_attempts=max_attempts, |
| retry_interval=retry_interval) |
| |
| assert api_task.name == api.task.OperationTask.NAME_FORMAT.format( |
| type='relationship', |
| name=relationship.name, |
| interface=interface_name, |
| operation=operation_name |
| ) |
| assert api_task.function == 'op_path' |
| assert api_task.actor == relationship |
| assert api_task.arguments['test_input'].value is True |
| assert api_task.retry_interval == retry_interval |
| assert api_task.max_attempts == max_attempts |
| assert api_task.plugin.name == 'test_plugin' |
| |
| def test_operation_task_default_values(self, ctx): |
| interface_name = 'test_interface' |
| operation_name = 'create' |
| |
| plugin = mock.models.create_plugin('package', '0.1') |
| ctx.model.node.update(plugin) |
| |
| dependency_node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) |
| |
| interface = mock.models.create_interface( |
| ctx.service, |
| interface_name, |
| operation_name, |
| operation_kwargs=dict(plugin=plugin, |
| function='op_path')) |
| dependency_node.interfaces[interface_name] = interface |
| |
| with context.workflow.current.push(ctx): |
| task = api.task.OperationTask( |
| dependency_node, |
| interface_name=interface_name, |
| operation_name=operation_name) |
| |
| assert task.arguments == {} |
| assert task.retry_interval == ctx._task_retry_interval |
| assert task.max_attempts == ctx._task_max_attempts |
| assert task.ignore_failure == ctx._task_ignore_failure |
| assert task.plugin is plugin |
| |
| |
| class TestWorkflowTask(object): |
| |
| def test_workflow_task_creation(self, ctx): |
| |
| workspace = {} |
| |
| mock_class = type('mock_class', (object,), {'test_attribute': True}) |
| |
| def sub_workflow(**kwargs): |
| workspace.update(kwargs) |
| return mock_class |
| |
| with context.workflow.current.push(ctx): |
| workflow_task = api.task.WorkflowTask(sub_workflow, kwarg='workflow_kwarg') |
| assert workflow_task.graph is mock_class |
| assert workflow_task.test_attribute is True |