blob: 256927ce6a70093af5c47ec387e806e3cfe1a69e [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Built-in operation execution Workflow.
"""
from ... import workflow
from ..api import task
@workflow
def execute_operation(
ctx,
graph,
interface_name,
operation_name,
operation_kwargs,
run_by_dependency_order,
type_names,
node_template_ids,
node_ids,
**kwargs):
"""
Built-in operation execution Workflow.
:param workflow_context: workflow context
:param graph: graph which will describe the workflow
:param operation: operation name to execute
:param operation_kwargs:
:param run_by_dependency_order:
:param type_names:
:param node_template_ids:
:param node_ids:
:param kwargs:
:return:
"""
subgraphs = {}
# filtering node instances
filtered_nodes = list(_filter_nodes(
context=ctx,
node_template_ids=node_template_ids,
node_ids=node_ids,
type_names=type_names))
if run_by_dependency_order:
filtered_node_ids = set(node_instance.id for node_instance in filtered_nodes)
for node in ctx.nodes:
if node.id not in filtered_node_ids:
subgraphs[node.id] = ctx.task_graph(
name=u'execute_operation_stub_{0}'.format(node.id))
# registering actual tasks to sequences
for node in filtered_nodes:
graph.add_tasks(
task.OperationTask(
node,
interface_name=interface_name,
operation_name=operation_name,
arguments=operation_kwargs
)
)
for _, node_sub_workflow in subgraphs.items():
graph.add_tasks(node_sub_workflow)
# adding tasks dependencies if required
if run_by_dependency_order:
for node in ctx.nodes:
for relationship in node.relationships:
graph.add_dependency(
source_task=subgraphs[node.id], after=[subgraphs[relationship.target_id]])
def _filter_nodes(context, node_template_ids=(), node_ids=(), type_names=()):
def _is_node_template_by_id(node_template_id):
return not node_template_ids or node_template_id in node_template_ids
def _is_node_by_id(node_id):
return not node_ids or node_id in node_ids
def _is_node_by_type(node_type):
return not node_type.name in type_names
for node in context.nodes:
if all((_is_node_template_by_id(node.node_template.id),
_is_node_by_id(node.id),
_is_node_by_type(node.node_template.type))):
yield node