blob: df1725f9915919133e33fb31753c90bfd55ae714 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Running workflows.
"""
import os
import sys
from datetime import datetime
from . import exceptions
from .context.workflow import WorkflowContext
from .workflows import builtin
from .workflows.core import engine, graph_compiler
from .workflows.executor.process import ProcessExecutor
from ..modeling import models
from ..modeling import utils as modeling_utils
from ..utils.imports import import_fullname
DEFAULT_TASK_MAX_ATTEMPTS = 30
DEFAULT_TASK_RETRY_INTERVAL = 30


class WorkflowRunner(object):
    """
    Manages a single workflow execution on a given service.

    Either starts a new execution (``workflow_name`` + ``service_id``) or resumes
    an existing one (``execution_id``).
    """

    def __init__(self, model_storage, resource_storage, plugin_manager,
                 execution_id=None, service_id=None, workflow_name=None, inputs=None,
                 executor=None,
                 task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
                 task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL):
        """
        Manages a single workflow execution on a given service.

        :param model_storage: model storage API ("MAPI")
        :param resource_storage: resource storage API ("RAPI")
        :param plugin_manager: plugin manager
        :param execution_id: ID of an existing execution to resume; when given,
         ``workflow_name``/``service_id``/``inputs`` are derived from the stored
         execution instead
        :param service_id: service ID
        :param workflow_name: workflow name
        :param inputs: key-value dict of inputs for the execution
        :param executor: executor for tasks; defaults to a
         :class:`~aria.orchestrator.workflows.executor.process.ProcessExecutor` instance
        :param task_max_attempts: maximum attempts of repeating each failing task
        :param task_retry_interval: retry interval between retry attempts of a failing task
        :raises exceptions.InvalidWorkflowRunnerParams: if neither an execution ID nor
         a workflow name + service ID combination was provided
        """
        if not (execution_id or (workflow_name and service_id)):
            # BUG FIX: the exception object used to be created but never raised,
            # silently letting invalid parameter combinations through.
            raise exceptions.InvalidWorkflowRunnerParams(
                "Either provide execution id in order to resume a workflow or workflow name "
                "and service id with inputs")

        self._is_resume = execution_id is not None
        self._model_storage = model_storage
        self._resource_storage = resource_storage

        # the IDs are stored rather than the models themselves, so this module could be used
        # by several threads without raising errors on model objects shared between threads
        if self._is_resume:
            self._execution_id = execution_id
            self._service_id = self.execution.service.id
            self._workflow_name = model_storage.execution.get(self._execution_id).workflow_name
        else:
            self._service_id = service_id
            self._workflow_name = workflow_name
            self._validate_workflow_exists_for_service()
            self._execution_id = self._create_execution_model(inputs).id

        self._workflow_context = WorkflowContext(
            name=self.__class__.__name__,
            model_storage=self._model_storage,
            resource_storage=resource_storage,
            # BUG FIX: use the resolved service ID rather than the raw argument;
            # the ``service_id`` argument is None on the resume path, where the ID
            # is derived from the stored execution model above.
            service_id=self._service_id,
            execution_id=self._execution_id,
            workflow_name=self._workflow_name,
            task_max_attempts=task_max_attempts,
            task_retry_interval=task_retry_interval)

        # Set default executor and kwargs
        executor = executor or ProcessExecutor(plugin_manager=plugin_manager)

        # transforming the execution inputs to dict, to pass them to the workflow function
        execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.values())

        if not self._is_resume:
            # A fresh run needs its task graph built and compiled; a resumed run
            # reuses whatever the engine previously persisted.
            workflow_fn = self._get_workflow_fn()
            self._tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict)
            compiler = graph_compiler.GraphCompiler(self._workflow_context, executor.__class__)
            compiler.compile(self._tasks_graph)

        self._engine = engine.Engine(executors={executor.__class__: executor})

    @property
    def execution_id(self):
        """ID of the execution this runner manages."""
        return self._execution_id

    @property
    def execution(self):
        """Execution model (fetched fresh from model storage on each access)."""
        return self._model_storage.execution.get(self.execution_id)

    @property
    def service(self):
        """Service model (fetched fresh from model storage on each access)."""
        return self._model_storage.service.get(self._service_id)

    def execute(self):
        """Run (or resume) the workflow on the engine."""
        self._engine.execute(ctx=self._workflow_context, resuming=self._is_resume)

    def cancel(self):
        """Request cancellation of the running execution."""
        self._engine.cancel_execution(ctx=self._workflow_context)

    def _create_execution_model(self, inputs):
        """
        Create, validate and persist a new execution model.

        :param inputs: key-value dict of user-supplied inputs, merged with the
         workflow's declared inputs
        :return: the persisted :class:`~aria.modeling.models.Execution` instance
        """
        execution = models.Execution(
            created_at=datetime.utcnow(),
            service=self.service,
            workflow_name=self._workflow_name,
            inputs={})

        if self._workflow_name in builtin.BUILTIN_WORKFLOWS:
            workflow_inputs = dict()  # built-in workflows don't have any inputs
        else:
            workflow_inputs = self.service.workflows[self._workflow_name].inputs

        execution.inputs = modeling_utils.merge_parameter_values(inputs,
                                                                 workflow_inputs,
                                                                 model_cls=models.Input)
        # TODO: these two following calls should execute atomically
        self._validate_no_active_executions(execution)
        self._model_storage.execution.put(execution)
        return execution

    def _validate_workflow_exists_for_service(self):
        """Raise ``UndeclaredWorkflowError`` if the workflow is neither declared
        on the service nor a built-in workflow."""
        if self._workflow_name not in self.service.workflows and \
                self._workflow_name not in builtin.BUILTIN_WORKFLOWS:
            raise exceptions.UndeclaredWorkflowError(
                'No workflow policy {0} declared in service {1}'
                .format(self._workflow_name, self.service.name))

    def _validate_no_active_executions(self, execution):
        """Raise ``ActiveExecutionsError`` if the service already has an active
        execution (only one execution may run per service at a time)."""
        active_executions = [e for e in self.service.executions if e.is_active()]
        if active_executions:
            raise exceptions.ActiveExecutionsError(
                "Can't start execution; Service {0} has an active execution with ID {1}"
                .format(self.service.name, active_executions[0].id))

    def _get_workflow_fn(self):
        """
        Resolve the workflow's Python function.

        Built-in workflows are imported from the built-in workflows package;
        custom workflows are imported from the service template's resources
        (whose path is appended to ``sys.path`` as a side effect).

        :return: the workflow function
        :raises exceptions.WorkflowImplementationNotFoundError: if the custom
         workflow function could not be imported
        """
        if self._workflow_name in builtin.BUILTIN_WORKFLOWS:
            return import_fullname('{0}.{1}'.format(builtin.BUILTIN_WORKFLOWS_PATH_PREFIX,
                                                    self._workflow_name))

        workflow = self.service.workflows[self._workflow_name]

        # TODO: Custom workflow support needs improvement, currently this code uses internal
        # knowledge of the resource storage; Instead, workflows should probably be loaded
        # in a similar manner to operation plugins. Also consider passing to import_fullname
        # as paths instead of appending to sys path.
        service_template_resources_path = os.path.join(
            self._resource_storage.service_template.base_path,
            str(self.service.service_template.id))
        sys.path.append(service_template_resources_path)

        try:
            workflow_fn = import_fullname(workflow.function)
        except ImportError:
            raise exceptions.WorkflowImplementationNotFoundError(
                'Could not find workflow {0} function at {1}'.format(
                    self._workflow_name, workflow.function))

        return workflow_fn