| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| """ |
| Provides the tasks to be entered into the task graph |
| """ |
| |
| from ... import context |
| from ....modeling import models |
| from ....modeling import utils as modeling_utils |
| from ....utils.uuid import generate_uuid |
| from .. import exceptions |
| |
| |
class BaseTask(object):
    """
    Common ancestor of every task that can be placed in the task graph.

    Each instance is bound to a workflow context and is given its own generated ID.
    """

    def __init__(self, ctx=None, **kwargs):
        """
        :param ctx: workflow context to bind this task to; when ``None`` the context is obtained
         from ``context.workflow.current.get()``
        :param kwargs: accepted for the benefit of subclasses; not used by this class
        """
        # The lookup of the current workflow context only happens when no context was supplied.
        self._workflow_context = ctx if ctx is not None else context.workflow.current.get()
        self._id = generate_uuid(variant='uuid')

    @property
    def workflow_context(self):
        """
        Workflow context this task is bound to.
        """
        return self._workflow_context

    @property
    def id(self):
        """
        ID generated for this task at construction time.
        """
        return self._id
| |
| |
class OperationTask(BaseTask):
    """
    Executes an operation.

    :ivar name: formatted name (includes actor type, actor name, and interface/operation names)
    :vartype name: basestring
    :ivar actor: node or relationship
    :vartype actor: :class:`~aria.modeling.models.Node` or
     :class:`~aria.modeling.models.Relationship`
    :ivar interface_name: interface name on actor
    :vartype interface_name: basestring
    :ivar operation_name: operation name on interface
    :vartype operation_name: basestring
    :ivar plugin: plugin (or None for default plugin)
    :vartype plugin: :class:`~aria.modeling.models.Plugin`
    :ivar function: path to Python function
    :vartype function: basestring
    :ivar arguments: arguments to send to Python function
    :vartype arguments: {:obj:`basestring`: :class:`~aria.modeling.models.Argument`}
    :ivar ignore_failure: whether to ignore failures
    :vartype ignore_failure: bool
    :ivar max_attempts: maximum number of attempts allowed in case of failure
    :vartype max_attempts: int
    :ivar retry_interval: interval between retries (in seconds)
    :vartype retry_interval: float
    """

    NAME_FORMAT = u'{interface}:{operation}@{type}:{name}'

    def __init__(self,
                 actor,
                 interface_name,
                 operation_name,
                 arguments=None,
                 ignore_failure=None,
                 max_attempts=None,
                 retry_interval=None):
        """
        :param actor: node or relationship
        :type actor: :class:`~aria.modeling.models.Node` or
         :class:`~aria.modeling.models.Relationship`
        :param interface_name: interface name on actor
        :type interface_name: basestring
        :param operation_name: operation name on interface
        :type operation_name: basestring
        :param arguments: override argument values
        :type arguments: {:obj:`basestring`: object}
        :param ignore_failure: override whether to ignore failures; ``None`` means use the
         workflow context's default
        :type ignore_failure: bool
        :param max_attempts: override maximum number of attempts allowed in case of failure;
         ``None`` (or any other falsy value) means use the workflow context's default
        :type max_attempts: int
        :param retry_interval: override interval between retries (in seconds); ``None`` means use
         the workflow context's default, while an explicit ``0`` is honored
        :type retry_interval: float
        :raises ~aria.orchestrator.workflows.exceptions.OperationNotFoundException: if
         ``interface_name`` and ``operation_name`` do not refer to an operation on the actor
        :raises ~aria.orchestrator.workflows.exceptions.TaskCreationException: if the actor is
         neither a node nor a relationship
        """

        actor_type = type(actor).__name__.lower()

        # Creating OperationTask directly should raise an error when there is no
        # interface/operation.
        if not has_operation(actor, interface_name, operation_name):
            raise exceptions.OperationNotFoundException(
                u'Could not find operation "{operation_name}" on interface '
                u'"{interface_name}" for {actor_type} "{actor.name}"'.format(
                    operation_name=operation_name,
                    interface_name=interface_name,
                    actor_type=actor_type,
                    actor=actor)
            )

        super(OperationTask, self).__init__()

        self.name = OperationTask.NAME_FORMAT.format(type=actor_type,
                                                     name=actor.name,
                                                     interface=interface_name,
                                                     operation=operation_name)
        self.actor = actor
        self.interface_name = interface_name
        self.operation_name = operation_name

        # Overrides fall back to the defaults carried by the workflow context. ``ignore_failure``
        # and ``retry_interval`` are compared against ``None`` so that the explicit values
        # ``False`` and ``0`` (retry without waiting) are not mistaken for "not provided".
        self.ignore_failure = \
            self.workflow_context._task_ignore_failure if ignore_failure is None else ignore_failure
        # Falsy values (``None`` or ``0``) fall back to the workflow default here.
        self.max_attempts = max_attempts or self.workflow_context._task_max_attempts
        self.retry_interval = \
            self.workflow_context._task_retry_interval if retry_interval is None else retry_interval

        operation = self.actor.interfaces[self.interface_name].operations[self.operation_name]
        self.plugin = operation.plugin
        self.function = operation.function
        self.arguments = modeling_utils.merge_parameter_values(arguments, operation.arguments)

        actor = self.actor
        if hasattr(actor, '_wrapped'):
            # Unwrap instrumented model
            actor = actor._wrapped

        # The kind of actor decides which operation context class is used for this task.
        if isinstance(actor, models.Node):
            self._context_cls = context.operation.NodeOperationContext
        elif isinstance(actor, models.Relationship):
            self._context_cls = context.operation.RelationshipOperationContext
        else:
            raise exceptions.TaskCreationException(u'Could not create valid context for '
                                                   u'{actor.__class__}'.format(actor=actor))

    def __repr__(self):
        """
        The formatted task name (see :attr:`NAME_FORMAT`).
        """
        return self.name
| |
| |
class StubTask(BaseTask):
    """
    Empty task: adds nothing on top of :class:`BaseTask`, so it carries only an ID and a workflow
    context.
    """
    pass
| |
| |
class WorkflowTask(BaseTask):
    """
    Executes a complete workflow.

    The sub-workflow function is called at construction time and the graph it returns is kept on
    the task. Attributes that are not found on the task itself are looked up on that graph.
    """

    def __init__(self, workflow_func, **kwargs):
        """
        :param workflow_func: function to run; it is called with ``kwargs`` plus a ``ctx`` keyword
         holding this task's workflow context, and its return value becomes :attr:`graph`
        :param kwargs: kwargs that would be passed to the workflow_func (a ``ctx`` entry, if
         present, is also used as this task's workflow context)
        """
        super(WorkflowTask, self).__init__(**kwargs)
        # Always hand the resolved context to the sub-workflow, even when the caller supplied none.
        kwargs['ctx'] = self.workflow_context
        self._graph = workflow_func(**kwargs)

    @property
    def graph(self):
        """
        Graph constructed by the sub workflow.
        """
        return self._graph

    def __getattr__(self, item):
        """
        Delegates lookups of attributes missing on the task to the sub-workflow graph.

        :raises AttributeError: if neither the task nor its graph has the attribute
        """
        if item == '_graph':
            # ``_graph`` itself is missing (instance created without ``__init__``, e.g. while
            # being copied or unpickled, or ``__init__`` has not assigned it yet). Reading
            # ``self._graph`` below would re-enter this method and recurse without bound.
            raise AttributeError(
                u"'{0}' object has no attribute '_graph'".format(type(self).__name__))
        try:
            return getattr(self._graph, item)
        except AttributeError:
            # Re-run the regular lookup so that the usual AttributeError is raised for the task.
            return super(WorkflowTask, self).__getattribute__(item)
| |
| |
def create_task(actor, interface_name, operation_name, **kwargs):
    """
    Safely builds an :class:`OperationTask`: a missing interface or operation yields ``None``
    instead of an exception.

    :param actor: actor for this task
    :param interface_name: name of the interface
    :param operation_name: name of the operation
    :param kwargs: any additional kwargs to be passed to the OperationTask
    :return: OperationTask or None (if the interface/operation does not exist)
    """
    task_kwargs = dict(kwargs, interface_name=interface_name, operation_name=operation_name)
    try:
        task = OperationTask(actor, **task_kwargs)
    except exceptions.OperationNotFoundException:
        task = None
    return task
| |
| |
def create_relationships_tasks(
        node, interface_name, source_operation_name=None, target_operation_name=None, **kwargs):
    """
    Creates the relationship tasks (source and target) for every outbound relationship of a node.

    :param node: source node; its ``outbound_relationships`` are iterated
    :param interface_name: name of the interface
    :param source_operation_name: name of the source operation (skipped when empty)
    :param target_operation_name: name of the target operation (skipped when empty)
    :param kwargs: any additional kwargs forwarded to :func:`create_relationship_tasks`
    :return: one list of tasks per outbound relationship, in iteration order
    """
    return [
        create_relationship_tasks(
            relationship,
            interface_name,
            source_operation_name=source_operation_name,
            target_operation_name=target_operation_name,
            **kwargs)
        for relationship in node.outbound_relationships
    ]
| |
| |
def create_relationship_tasks(relationship, interface_name, source_operation_name=None,
                              target_operation_name=None, **kwargs):
    """
    Creates the tasks (source first, then target) of a single relationship.

    :param relationship: relationship instance itself
    :param interface_name: name of the interface
    :param source_operation_name: name of the source operation (skipped when empty)
    :param target_operation_name: name of the target operation (skipped when empty)
    :param kwargs: any additional kwargs forwarded to :func:`create_task`
    :return: list of the tasks that could be created; operations that were not found are left out
    """
    requested_names = (source_operation_name, target_operation_name)
    created = [
        create_task(
            relationship,
            interface_name=interface_name,
            operation_name=name,
            **kwargs)
        for name in requested_names
        if name
    ]
    # create_task returns None for a missing interface/operation; drop those entries.
    return [task for task in created if task]
| |
| |
def has_operation(actor, interface_name, operation_name):
    """
    Checks whether an actor declares an operation on one of its interfaces.

    The result is meant to be used for its truth value and is not necessarily a ``bool``.

    :param actor: object exposing an ``interfaces`` mapping
    :param interface_name: name of the interface to look up on the actor
    :param operation_name: name of the operation to look up on the interface
    :return: the operation if it is found; ``False`` if the interface exists but lacks the
     operation; the falsy interface lookup result (``None`` when missing) otherwise
    """
    interface = actor.interfaces.get(interface_name, None)
    if not interface:
        return interface
    return interface.operations.get(operation_name, False)