| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| # pylint: skip-file |
| |
| """ |
| Built-in heal workflow. |
| """ |
| |
| from aria import workflow |
| |
| from .workflows import (install_node, uninstall_node) |
| from ..api import task |
| |
| |
| @workflow |
| def heal(ctx, graph, node_id): |
| """ |
| Built-in heal workflow.. |
| |
| :param ctx: workflow context |
| :param graph: graph which will describe the workflow. |
| :param node_id: ID of the node to heal |
| :return: |
| """ |
| failing_node = ctx.model.node.get(node_id) |
| host_node = ctx.model.node.get(failing_node.host.id) |
| failed_node_subgraph = _get_contained_subgraph(ctx, host_node) |
| failed_node_ids = list(n.id for n in failed_node_subgraph) |
| |
| targeted_nodes = [node for node in ctx.nodes |
| if node.id not in failed_node_ids] |
| |
| uninstall_subgraph = task.WorkflowTask( |
| heal_uninstall, |
| failing_nodes=failed_node_subgraph, |
| targeted_nodes=targeted_nodes |
| ) |
| |
| install_subgraph = task.WorkflowTask( |
| heal_install, |
| failing_nodes=failed_node_subgraph, |
| targeted_nodes=targeted_nodes) |
| |
| graph.sequence(uninstall_subgraph, install_subgraph) |
| |
| |
| @workflow(suffix_template='{failing_nodes}') |
| def heal_uninstall(ctx, graph, failing_nodes, targeted_nodes): |
| """ |
| Uninstall phase of the heal mechanism. |
| |
| :param ctx: workflow context |
| :param graph: task graph to edit |
| :param failing_nodes: failing nodes to heal |
| :param targeted_nodes: targets of the relationships where the failing node are |
| """ |
| node_sub_workflows = {} |
| |
| # Create install stub workflow for each unaffected node |
| for node in targeted_nodes: |
| node_stub = task.StubTask() |
| node_sub_workflows[node.id] = node_stub |
| graph.add_tasks(node_stub) |
| |
| # create install sub workflow for every node |
| for node in failing_nodes: |
| node_sub_workflow = task.WorkflowTask(uninstall_node, |
| node=node) |
| node_sub_workflows[node.id] = node_sub_workflow |
| graph.add_tasks(node_sub_workflow) |
| |
| # create dependencies between the node sub workflow |
| for node in failing_nodes: |
| node_sub_workflow = node_sub_workflows[node.id] |
| for relationship in reversed(node.outbound_relationships): |
| graph.add_dependency( |
| node_sub_workflows[relationship.target_node.id], |
| node_sub_workflow) |
| |
| # Add operations for intact nodes depending on a node belonging to nodes |
| for node in targeted_nodes: |
| node_sub_workflow = node_sub_workflows[node.id] |
| |
| for relationship in reversed(node.outbound_relationships): |
| |
| target_node = \ |
| ctx.model.node.get(relationship.target_node.id) |
| target_node_subgraph = node_sub_workflows[target_node.id] |
| graph.add_dependency(target_node_subgraph, node_sub_workflow) |
| |
| if target_node in failing_nodes: |
| dependency = task.create_relationship_tasks( |
| relationship=relationship, |
| operation_name='aria.interfaces.relationship_lifecycle.unlink') |
| graph.add_tasks(*dependency) |
| graph.add_dependency(node_sub_workflow, dependency) |
| |
| |
| @workflow(suffix_template='{failing_nodes}') |
| def heal_install(ctx, graph, failing_nodes, targeted_nodes): |
| """ |
| Install phase of the heal mechanism. |
| |
| :param ctx: workflow context |
| :param graph: task graph to edit. |
| :param failing_nodes: failing nodes to heal |
| :param targeted_nodes: targets of the relationships where the failing node are |
| """ |
| node_sub_workflows = {} |
| |
| # Create install sub workflow for each unaffected |
| for node in targeted_nodes: |
| node_stub = task.StubTask() |
| node_sub_workflows[node.id] = node_stub |
| graph.add_tasks(node_stub) |
| |
| # create install sub workflow for every node |
| for node in failing_nodes: |
| node_sub_workflow = task.WorkflowTask(install_node, |
| node=node) |
| node_sub_workflows[node.id] = node_sub_workflow |
| graph.add_tasks(node_sub_workflow) |
| |
| # create dependencies between the node sub workflow |
| for node in failing_nodes: |
| node_sub_workflow = node_sub_workflows[node.id] |
| if node.outbound_relationships: |
| dependencies = \ |
| [node_sub_workflows[relationship.target_node.id] |
| for relationship in node.outbound_relationships] |
| graph.add_dependency(node_sub_workflow, dependencies) |
| |
| # Add operations for intact nodes depending on a node |
| # belonging to nodes |
| for node in targeted_nodes: |
| node_sub_workflow = node_sub_workflows[node.id] |
| |
| for relationship in node.outbound_relationships: |
| target_node = ctx.model.node.get( |
| relationship.target_node.id) |
| target_node_subworkflow = node_sub_workflows[target_node.id] |
| graph.add_dependency(node_sub_workflow, target_node_subworkflow) |
| |
| if target_node in failing_nodes: |
| dependent = task.create_relationship_tasks( |
| relationship=relationship, |
| operation_name='aria.interfaces.relationship_lifecycle.establish') |
| graph.add_tasks(*dependent) |
| graph.add_dependency(dependent, node_sub_workflow) |
| |
| |
| def _get_contained_subgraph(context, host_node): |
| contained_instances = [node |
| for node in context.nodes |
| if node.host_fk == host_node.id and |
| node.host_fk != node.id] |
| result = [host_node] |
| |
| if not contained_instances: |
| return result |
| |
| result.extend(contained_instances) |
| for node in contained_instances: |
| result.extend(_get_contained_subgraph(context, node)) |
| |
| return set(result) |