blob: 706855735beb1ae1a5dd70f1944968c06dca3916 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
ARIA modeling orchestration module
"""
# pylint: disable=no-self-argument, no-member, abstract-method
from datetime import datetime
from sqlalchemy import (
Column,
Integer,
Text,
DateTime,
Boolean,
Enum,
String,
Float,
orm,
PickleType)
from sqlalchemy.ext.declarative import declared_attr
from ..orchestrator.exceptions import (TaskAbortException, TaskRetryException)
from . import mixins
from . import (
relationship,
types as modeling_types
)
class ExecutionBase(mixins.ModelMixin):
"""
Workflow execution.
"""
__tablename__ = 'execution'
__private_fields__ = ('service_fk',
'service_template')
SUCCEEDED = 'succeeded'
FAILED = 'failed'
CANCELLED = 'cancelled'
PENDING = 'pending'
STARTED = 'started'
CANCELLING = 'cancelling'
STATES = (SUCCEEDED, FAILED, CANCELLED, PENDING, STARTED, CANCELLING)
END_STATES = (SUCCEEDED, FAILED, CANCELLED)
VALID_TRANSITIONS = {
PENDING: (STARTED, CANCELLED),
STARTED: END_STATES + (CANCELLING,),
CANCELLING: END_STATES,
CANCELLED: PENDING
}
# region one_to_many relationships
@declared_attr
def inputs(cls):
"""
Execution parameters.
:type: {:obj:`basestring`: :class:`Input`}
"""
return relationship.one_to_many(cls, 'input', dict_key='name')
@declared_attr
def tasks(cls):
"""
Tasks.
:type: [:class:`Task`]
"""
return relationship.one_to_many(cls, 'task')
@declared_attr
def logs(cls):
"""
Log messages for the execution (including log messages for its tasks).
:type: [:class:`Log`]
"""
return relationship.one_to_many(cls, 'log')
# endregion
# region many_to_one relationships
@declared_attr
def service(cls):
"""
Associated service.
:type: :class:`Service`
"""
return relationship.many_to_one(cls, 'service')
# endregion
# region association proxies
@declared_attr
def service_name(cls):
return relationship.association_proxy('service', cls.name_column_name())
@declared_attr
def service_template(cls):
return relationship.association_proxy('service', 'service_template')
@declared_attr
def service_template_name(cls):
return relationship.association_proxy('service', 'service_template_name')
# endregion
# region foreign keys
@declared_attr
def service_fk(cls):
return relationship.foreign_key('service')
# endregion
created_at = Column(DateTime, index=True, doc="""
Creation timestamp.
:type: :class:`~datetime.datetime`
""")
started_at = Column(DateTime, nullable=True, index=True, doc="""
Started timestamp.
:type: :class:`~datetime.datetime`
""")
ended_at = Column(DateTime, nullable=True, index=True, doc="""
Ended timestamp.
:type: :class:`~datetime.datetime`
""")
error = Column(Text, nullable=True, doc="""
Error message.
:type: :obj:`basestring`
""")
status = Column(Enum(*STATES, name='execution_status'), default=PENDING, doc="""
Status.
:type: :obj:`basestring`
""")
workflow_name = Column(Text, doc="""
Workflow name.
:type: :obj:`basestring`
""")
@orm.validates('status')
def validate_status(self, key, value):
"""Validation function that verifies execution status transitions are OK"""
try:
current_status = getattr(self, key)
except AttributeError:
return
valid_transitions = self.VALID_TRANSITIONS.get(current_status, [])
if all([current_status is not None,
current_status != value,
value not in valid_transitions]):
raise ValueError('Cannot change execution status from {current} to {new}'.format(
current=current_status,
new=value))
return value
def has_ended(self):
return self.status in self.END_STATES
def is_active(self):
return not self.has_ended() and self.status != self.PENDING
def __str__(self):
return '<{0} id=`{1}` (status={2})>'.format(
self.__class__.__name__,
getattr(self, self.name_column_name()),
self.status
)
class TaskBase(mixins.ModelMixin):
"""
Represents the smallest unit of stateful execution in ARIA. The task state includes inputs,
outputs, as well as an atomic status, ensuring that the task can only be running once at any
given time.
The Python :attr:`function` is usually provided by an associated :class:`Plugin`. The
:attr:`arguments` of the function should be set according to the specific signature of the
function.
Tasks may be "one shot" or may be configured to run repeatedly in the case of failure.
Tasks are often based on :class:`Operation`, and thus act on either a :class:`Node` or a
:class:`Relationship`, however this is not required.
"""
__tablename__ = 'task'
__private_fields__ = ('dependency_operation_task_fk', 'dependency_stub_task_fk', 'node_fk',
'relationship_fk', 'plugin_fk', 'execution_fk')
START_WORKFLOW = 'start_workflow'
END_WORKFLOW = 'end_workflow'
START_SUBWROFKLOW = 'start_subworkflow'
END_SUBWORKFLOW = 'end_subworkflow'
STUB = 'stub'
CONDITIONAL = 'conditional'
STUB_TYPES = (
START_WORKFLOW,
START_SUBWROFKLOW,
END_WORKFLOW,
END_SUBWORKFLOW,
STUB,
CONDITIONAL,
)
PENDING = 'pending'
RETRYING = 'retrying'
SENT = 'sent'
STARTED = 'started'
SUCCESS = 'success'
FAILED = 'failed'
STATES = (
PENDING,
RETRYING,
SENT,
STARTED,
SUCCESS,
FAILED,
)
INFINITE_RETRIES = -1
# region one_to_many relationships
@declared_attr
def logs(cls):
"""
Log messages.
:type: [:class:`Log`]
"""
return relationship.one_to_many(cls, 'log')
@declared_attr
def arguments(cls):
"""
Arguments sent to the Python :attr:`function``.
:type: {:obj:`basestring`: :class:`Argument`}
"""
return relationship.one_to_many(cls, 'argument', dict_key='name')
# endregion
# region many_one relationships
@declared_attr
def execution(cls):
"""
Containing execution.
:type: :class:`Execution`
"""
return relationship.many_to_one(cls, 'execution')
@declared_attr
def node(cls):
"""
Node actor (can be ``None``).
:type: :class:`Node`
"""
return relationship.many_to_one(cls, 'node')
@declared_attr
def relationship(cls):
"""
Relationship actor (can be ``None``).
:type: :class:`Relationship`
"""
return relationship.many_to_one(cls, 'relationship')
@declared_attr
def plugin(cls):
"""
Associated plugin.
:type: :class:`Plugin`
"""
return relationship.many_to_one(cls, 'plugin')
# endregion
# region association proxies
@declared_attr
def node_name(cls):
return relationship.association_proxy('node', cls.name_column_name())
@declared_attr
def relationship_name(cls):
return relationship.association_proxy('relationship', cls.name_column_name())
@declared_attr
def execution_name(cls):
return relationship.association_proxy('execution', cls.name_column_name())
# endregion
# region foreign keys
@declared_attr
def execution_fk(cls):
return relationship.foreign_key('execution', nullable=True)
@declared_attr
def node_fk(cls):
return relationship.foreign_key('node', nullable=True)
@declared_attr
def relationship_fk(cls):
return relationship.foreign_key('relationship', nullable=True)
@declared_attr
def plugin_fk(cls):
return relationship.foreign_key('plugin', nullable=True)
# endregion
status = Column(Enum(*STATES, name='status'), default=PENDING, doc="""
Current atomic status ('pending', 'retrying', 'sent', 'started', 'success', 'failed').
:type: :obj:`basestring`
""")
due_at = Column(DateTime, nullable=False, index=True, default=datetime.utcnow(), doc="""
Timestamp to start the task.
:type: :class:`~datetime.datetime`
""")
started_at = Column(DateTime, default=None, doc="""
Started timestamp.
:type: :class:`~datetime.datetime`
""")
ended_at = Column(DateTime, default=None, doc="""
Ended timestamp.
:type: :class:`~datetime.datetime`
""")
attempts_count = Column(Integer, default=1, doc="""
How many attempts occurred.
:type: :class:`~datetime.datetime`
""")
function = Column(String, doc="""
Full path to Python function.
:type: :obj:`basestring`
""")
max_attempts = Column(Integer, default=1, doc="""
Maximum number of attempts allowed in case of task failure.
:type: :obj:`int`
""")
retry_interval = Column(Float, default=0, doc="""
Interval between task retry attemps (in seconds).
:type: :obj:`float`
""")
ignore_failure = Column(Boolean, default=False, doc="""
Set to ``True`` to ignore failures.
:type: :obj:`bool`
""")
interface_name = Column(String, doc="""
Name of interface on node or relationship.
:type: :obj:`basestring`
""")
operation_name = Column(String, doc="""
Name of operation in interface on node or relationship.
:type: :obj:`basestring`
""")
_api_id = Column(String)
_executor = Column(PickleType)
_context_cls = Column(PickleType)
_stub_type = Column(Enum(*STUB_TYPES))
@property
def actor(self):
"""
Actor of the task (node or relationship).
"""
return self.node or self.relationship
@orm.validates('max_attempts')
def validate_max_attempts(self, _, value): # pylint: disable=no-self-use
"""
Validates that max attempts is either -1 or a positive number.
"""
if value < 1 and value != TaskBase.INFINITE_RETRIES:
raise ValueError('Max attempts can be either -1 (infinite) or any positive number. '
'Got {value}'.format(value=value))
return value
@staticmethod
def abort(message=None):
raise TaskAbortException(message)
@staticmethod
def retry(message=None, retry_interval=None):
raise TaskRetryException(message, retry_interval=retry_interval)
@declared_attr
def dependencies(cls):
return relationship.many_to_many(cls, self=True)
def has_ended(self):
return self.status in (self.SUCCESS, self.FAILED)
def is_waiting(self):
if self._stub_type:
return not self.has_ended()
else:
return self.status in (self.PENDING, self.RETRYING)
@classmethod
def from_api_task(cls, api_task, executor, **kwargs):
instantiation_kwargs = {}
if hasattr(api_task.actor, 'outbound_relationships'):
instantiation_kwargs['node'] = api_task.actor
elif hasattr(api_task.actor, 'source_node'):
instantiation_kwargs['relationship'] = api_task.actor
else:
raise RuntimeError('No operation context could be created for {actor.model_cls}'
.format(actor=api_task.actor))
instantiation_kwargs.update(
{
'name': api_task.name,
'status': cls.PENDING,
'max_attempts': api_task.max_attempts,
'retry_interval': api_task.retry_interval,
'ignore_failure': api_task.ignore_failure,
'execution': api_task._workflow_context.execution,
'interface_name': api_task.interface_name,
'operation_name': api_task.operation_name,
# Only non-stub tasks have these fields
'plugin': api_task.plugin,
'function': api_task.function,
'arguments': api_task.arguments,
'_context_cls': api_task._context_cls,
'_executor': executor,
}
)
instantiation_kwargs.update(**kwargs)
return cls(**instantiation_kwargs)
class LogBase(mixins.ModelMixin):
"""
Single log message.
"""
__tablename__ = 'log'
__private_fields__ = ('execution_fk',
'task_fk')
# region many_to_one relationships
@declared_attr
def execution(cls):
"""
Containing execution.
:type: :class:`Execution`
"""
return relationship.many_to_one(cls, 'execution')
@declared_attr
def task(cls):
"""
Containing task (can be ``None``).
:type: :class:`Task`
"""
return relationship.many_to_one(cls, 'task')
# endregion
# region foreign keys
@declared_attr
def execution_fk(cls):
return relationship.foreign_key('execution')
@declared_attr
def task_fk(cls):
return relationship.foreign_key('task', nullable=True)
# endregion
level = Column(String, doc="""
Log level.
:type: :obj:`basestring`
""")
msg = Column(String, doc="""
Log message.
:type: :obj:`basestring`
""")
created_at = Column(DateTime, index=True, doc="""
Creation timestamp.
:type: :class:`~datetime.datetime`
""")
traceback = Column(Text, doc="""
Error traceback in case of failure.
:type: :class:`~datetime.datetime`
""")
def __str__(self):
return self.msg
def __repr__(self):
name = (self.task.actor if self.task else self.execution).name
return '{name}: {self.msg}'.format(name=name, self=self)
class PluginBase(mixins.ModelMixin):
"""
Installed plugin.
Plugins are usually packaged as `wagons <https://github.com/cloudify-cosmo/wagon>`__, which
are archives of one or more `wheels <https://packaging.python.org/distributing/#wheels>`__.
Most of these fields are indeed extracted from the installed wagon's metadata.
"""
__tablename__ = 'plugin'
# region one_to_many relationships
@declared_attr
def tasks(cls):
"""
Associated Tasks.
:type: [:class:`Task`]
"""
return relationship.one_to_many(cls, 'task')
# endregion
archive_name = Column(Text, nullable=False, index=True, doc="""
Filename (not the full path) of the wagon's archive, often with a ``.wgn`` extension.
:type: :obj:`basestring`
""")
distribution = Column(Text, doc="""
Name of the operating system on which the wagon was installed (e.g. ``ubuntu``).
:type: :obj:`basestring`
""")
distribution_release = Column(Text, doc="""
Release of the operating system on which the wagon was installed (e.g. ``trusty``).
:type: :obj:`basestring`
""")
distribution_version = Column(Text, doc="""
Version of the operating system on which the wagon was installed (e.g. ``14.04``).
:type: :obj:`basestring`
""")
package_name = Column(Text, nullable=False, index=True, doc="""
Primary Python package name used when the wagon was installed, which is one of the wheels in the
wagon (e.g. ``cloudify-script-plugin``).
:type: :obj:`basestring`
""")
package_source = Column(Text, doc="""
Full install string for the primary Python package name used when the wagon was installed (e.g.
``cloudify-script-plugin==1.2``).
:type: :obj:`basestring`
""")
package_version = Column(Text, doc="""
Version for the primary Python package name used when the wagon was installed (e.g. ``1.2``).
:type: :obj:`basestring`
""")
supported_platform = Column(Text, doc="""
If the wheels are *all* pure Python then this would be "any", otherwise it would be the
installed platform name (e.g. ``linux_x86_64``).
:type: :obj:`basestring`
""")
supported_py_versions = Column(modeling_types.StrictList(basestring), doc="""
Python versions supported by all the wheels (e.g. ``["py26", "py27"]``)
:type: [:obj:`basestring`]
""")
wheels = Column(modeling_types.StrictList(basestring), nullable=False, doc="""
Filenames of the wheels archived in the wagon, often with a ``.whl`` extension.
:type: [:obj:`basestring`]
""")
uploaded_at = Column(DateTime, nullable=False, index=True, doc="""
Timestamp for when the wagon was installed.
:type: :class:`~datetime.datetime`
""")
class ArgumentBase(mixins.ParameterMixin):
"""
Python function argument parameter.
"""
__tablename__ = 'argument'
# region many_to_one relationships
@declared_attr
def task(cls):
"""
Containing task (can be ``None``);
:type: :class:`Task`
"""
return relationship.many_to_one(cls, 'task')
@declared_attr
def operation(cls):
"""
Containing operation (can be ``None``);
:type: :class:`Operation`
"""
return relationship.many_to_one(cls, 'operation')
# endregion
# region foreign keys
@declared_attr
def task_fk(cls):
return relationship.foreign_key('task', nullable=True)
@declared_attr
def operation_fk(cls):
return relationship.foreign_key('operation', nullable=True)
# endregion