ARIA-237 Support for resuming failed workflow executions
Add support for resuming failed workflow executions. When resuming, it is now
possible to rerun the tasks that failed (CLI flag `--retry-failed-tasks`).
Additional changes:
* When a task succeeds, its attempts_count is now incremented.
* Fixed an issue with the CLI usage of resumable workflows.
diff --git a/aria/cli/commands/executions.py b/aria/cli/commands/executions.py
index ea70af5..4783442 100644
--- a/aria/cli/commands/executions.py
+++ b/aria/cli/commands/executions.py
@@ -157,10 +157,8 @@
@executions.command(name='resume',
short_help='Resume a stopped execution')
@aria.argument('execution-id')
-@aria.options.inputs(help=helptexts.EXECUTION_INPUTS)
@aria.options.dry_execution
-@aria.options.task_max_attempts()
-@aria.options.task_retry_interval()
+@aria.options.retry_failed_tasks
@aria.options.mark_pattern()
@aria.options.verbose()
@aria.pass_model_storage
@@ -168,9 +166,8 @@
@aria.pass_plugin_manager
@aria.pass_logger
def resume(execution_id,
+ retry_failed_tasks,
dry,
- task_max_attempts,
- task_retry_interval,
mark_pattern,
model_storage,
resource_storage,
@@ -194,8 +191,7 @@
workflow_runner = \
WorkflowRunner(
model_storage, resource_storage, plugin_manager,
- execution_id=execution_id, executor=executor,
- task_max_attempts=task_max_attempts, task_retry_interval=task_retry_interval
+ execution_id=execution_id, retry_failed_tasks=retry_failed_tasks, executor=executor,
)
logger.info('Resuming {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else ''))
diff --git a/aria/cli/core/aria.py b/aria/cli/core/aria.py
index 515c06a..b84507c 100644
--- a/aria/cli/core/aria.py
+++ b/aria/cli/core/aria.py
@@ -310,6 +310,12 @@
is_flag=True,
help=helptexts.DRY_EXECUTION)
+ self.retry_failed_tasks = click.option(
+ '--retry-failed-tasks',
+ is_flag=True,
+ help=helptexts.RETRY_FAILED_TASK
+ )
+
self.reset_config = click.option(
'--reset-config',
is_flag=True,
diff --git a/aria/cli/helptexts.py b/aria/cli/helptexts.py
index a5d41e8..5ab353a 100644
--- a/aria/cli/helptexts.py
+++ b/aria/cli/helptexts.py
@@ -46,6 +46,7 @@
"How many times should a task be attempted in case of failures [default: {0}]"
DRY_EXECUTION = "Execute a workflow dry run (prints operations information without causing side " \
"effects)"
+RETRY_FAILED_TASK = "Retry tasks that failed in the previous execution attempt"
IGNORE_AVAILABLE_NODES = "Delete the service even if it has available nodes"
SORT_BY = "Key for sorting the list"
DESCENDING = "Sort list in descending order [default: False]"
diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py
index df2643e..4d4f0fe 100644
--- a/aria/modeling/orchestration.py
+++ b/aria/modeling/orchestration.py
@@ -65,7 +65,9 @@
PENDING: (STARTED, CANCELLED),
STARTED: END_STATES + (CANCELLING,),
CANCELLING: END_STATES,
- CANCELLED: PENDING
+ # Retrying: a cancelled or failed execution may go back to pending when resumed
+ CANCELLED: PENDING,
+ FAILED: PENDING
}
# region one_to_many relationships
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index 47270c0..a85e7d3 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -38,7 +38,8 @@
class WorkflowRunner(object):
def __init__(self, model_storage, resource_storage, plugin_manager,
- execution_id=None, service_id=None, workflow_name=None, inputs=None, executor=None,
+ execution_id=None, retry_failed_tasks=False,
+ service_id=None, workflow_name=None, inputs=None, executor=None,
task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL):
"""
@@ -62,6 +63,7 @@
"and service id with inputs")
self._is_resume = execution_id is not None
+ self._retry_failed_tasks = retry_failed_tasks
self._model_storage = model_storage
self._resource_storage = resource_storage
@@ -116,7 +118,9 @@
return self._model_storage.service.get(self._service_id)
def execute(self):
- self._engine.execute(ctx=self._workflow_context, resuming=self._is_resume)
+ self._engine.execute(ctx=self._workflow_context,
+ resuming=self._is_resume,
+ retry_failed=self._retry_failed_tasks)
def cancel(self):
self._engine.cancel_execution(ctx=self._workflow_context)
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index d9c77e9..0ec3cd8 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -41,14 +41,15 @@
self._executors = executors.copy()
self._executors.setdefault(StubTaskExecutor, StubTaskExecutor())
- def execute(self, ctx, resuming=False):
+ def execute(self, ctx, resuming=False, retry_failed=False):
"""
Executes the workflow.
"""
if resuming:
- events.on_resume_workflow_signal.send(ctx)
+ events.on_resume_workflow_signal.send(ctx, retry_failed=retry_failed)
tasks_tracker = _TasksTracker(ctx)
+
try:
events.start_workflow_signal.send(ctx)
while True:
@@ -124,8 +125,10 @@
class _TasksTracker(object):
+
def __init__(self, ctx):
self._ctx = ctx
+
self._tasks = ctx.execution.tasks
self._executed_tasks = [task for task in self._tasks if task.has_ended()]
self._executable_tasks = list(set(self._tasks) - set(self._executed_tasks))
@@ -155,7 +158,7 @@
def executable_tasks(self):
now = datetime.utcnow()
# we need both lists since retrying task are in the executing task list.
- for task in self._update_tasks(self._executing_tasks + self._executable_tasks):
+ for task in self._update_tasks(set(self._executing_tasks + self._executable_tasks)):
if all([task.is_waiting(),
task.due_at <= now,
all(dependency in self._executed_tasks for dependency in task.dependencies)
diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py
index 37801de..219d2df 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -71,6 +71,7 @@
with ctx.persist_changes:
ctx.task.ended_at = datetime.utcnow()
ctx.task.status = ctx.task.SUCCESS
+ ctx.task.attempts_count += 1
_update_node_state_if_necessary(ctx)
@@ -119,7 +120,7 @@
@events.on_resume_workflow_signal.connect
-def _workflow_resume(workflow_context, *args, **kwargs):
+def _workflow_resume(workflow_context, retry_failed=False, *args, **kwargs):
with workflow_context.persist_changes:
execution = workflow_context.execution
execution.status = execution.PENDING
@@ -128,6 +129,13 @@
if not task.has_ended():
task.status = task.PENDING
+ if retry_failed:
+ for task in execution.tasks:
+ if task.status == task.FAILED and not task.ignore_failure:
+ task.attempts_count = 0
+ task.status = task.PENDING
+
+
@events.on_cancelling_workflow_signal.connect
def _workflow_cancelling(workflow_context, *args, **kwargs):
diff --git a/tests/modeling/test_models.py b/tests/modeling/test_models.py
index e1167fc..25b4080 100644
--- a/tests/modeling/test_models.py
+++ b/tests/modeling/test_models.py
@@ -324,8 +324,7 @@
Execution.STARTED: [Execution.PENDING],
Execution.CANCELLING: [Execution.PENDING,
Execution.STARTED],
- Execution.FAILED: [Execution.PENDING,
- Execution.STARTED,
+ Execution.FAILED: [Execution.STARTED,
Execution.SUCCEEDED,
Execution.CANCELLED,
Execution.CANCELLING],
diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py
index a77d727..30176ae 100644
--- a/tests/orchestrator/test_workflow_runner.py
+++ b/tests/orchestrator/test_workflow_runner.py
@@ -51,7 +51,7 @@
'is_resumed': Event(),
'is_active': Event(),
'execution_cancelled': Event(),
- 'execution_ended': Event()
+ 'execution_failed': Event(),
}
@@ -166,7 +166,8 @@
assert engine_kwargs['ctx'].execution.workflow_name == 'test_workflow'
mock_engine_execute.assert_called_once_with(ctx=workflow_runner._workflow_context,
- resuming=False)
+ resuming=False,
+ retry_failed=False)
def test_cancel_execution(request):
@@ -358,10 +359,11 @@
def test_resume_workflow(self, workflow_context, thread_executor):
node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
- self._create_interface(workflow_context, node, mock_resuming_task)
+ self._create_interface(workflow_context, node, mock_pass_first_task_only)
wf_runner = self._create_initial_workflow_runner(
- workflow_context, mock_parallel_workflow, thread_executor)
+ workflow_context, mock_parallel_tasks_workflow, thread_executor,
+ inputs={'number_of_tasks': 2})
wf_thread = Thread(target=wf_runner.execute)
wf_thread.daemon = True
@@ -369,6 +371,7 @@
# Wait for the execution to start
self._wait_for_active_and_cancel(wf_runner)
+ node = workflow_context.model.node.refresh(node)
tasks = workflow_context.model.task.list(filters={'_stub_type': None})
assert any(task.status == task.SUCCESS for task in tasks)
@@ -390,6 +393,7 @@
new_wf_runner.execute()
# Wait for it to finish and assert changes.
+ node = workflow_context.model.node.refresh(node)
assert all(task.status == task.SUCCESS for task in tasks)
assert node.attributes['invocations'].value == 3
assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
@@ -400,13 +404,15 @@
self._create_interface(workflow_context, node, mock_stuck_task)
wf_runner = self._create_initial_workflow_runner(
- workflow_context, mock_single_task_workflow, thread_executor)
+ workflow_context, mock_parallel_tasks_workflow, thread_executor,
+ inputs={'number_of_tasks': 1})
wf_thread = Thread(target=wf_runner.execute)
wf_thread.daemon = True
wf_thread.start()
self._wait_for_active_and_cancel(wf_runner)
+ node = workflow_context.model.node.refresh(node)
task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
assert node.attributes['invocations'].value == 1
assert task.status == task.STARTED
@@ -430,6 +436,7 @@
new_thread_executor.close()
# Wait for it to finish and assert changes.
+ node = workflow_context.model.node.refresh(node)
assert node.attributes['invocations'].value == 2
assert task.status == task.SUCCESS
assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
@@ -439,13 +446,15 @@
node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
self._create_interface(workflow_context, node, mock_failed_before_resuming)
- wf_runner = self._create_initial_workflow_runner(
- workflow_context, mock_single_task_workflow, thread_executor)
+ wf_runner = self._create_initial_workflow_runner(workflow_context,
+ mock_parallel_tasks_workflow,
+ thread_executor)
wf_thread = Thread(target=wf_runner.execute)
wf_thread.setDaemon(True)
wf_thread.start()
self._wait_for_active_and_cancel(wf_runner)
+ node = workflow_context.model.node.refresh(node)
task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
assert node.attributes['invocations'].value == 2
@@ -474,10 +483,114 @@
new_thread_executor.close()
# Wait for it to finish and assert changes.
+ node = workflow_context.model.node.refresh(node)
assert node.attributes['invocations'].value == task.max_attempts - 1
assert task.status == task.SUCCESS
assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
+ def test_resume_failed_task_and_successful_task(self, workflow_context, thread_executor):
+ """Resuming with ``retry_failed_tasks=True`` reruns only the task that failed."""
+ node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+ node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+ self._create_interface(workflow_context, node, mock_pass_first_task_only)
+ wf_runner = self._create_initial_workflow_runner(
+ workflow_context,
+ mock_parallel_tasks_workflow,
+ thread_executor,
+ inputs={'retry_interval': 1, 'max_attempts': 2, 'number_of_tasks': 2}
+ )
+ wf_thread = Thread(target=wf_runner.execute)
+ wf_thread.setDaemon(True)
+ wf_thread.start()
+
+ if custom_events['execution_failed'].wait(60) is False:
+ raise TimeoutError("Execution did not end")
+
+ tasks = workflow_context.model.task.list(filters={'_stub_type': None})
+ node = workflow_context.model.node.refresh(node)
+ assert node.attributes['invocations'].value == 3
+ failed_task = [t for t in tasks if t.status == t.FAILED][0]
+
+ # One task fails after using up both of its attempts (max_attempts=2)
+ assert any(task.status == task.FAILED for task in tasks)
+ assert failed_task.attempts_count == 2
+ # The other task passes (1 passing + 2 failing invocations == 3 in total)
+ assert any(task.status == task.SUCCESS for task in tasks)
+ assert wf_runner.execution.status == wf_runner.execution.FAILED
+
+ custom_events['is_resumed'].set()
+ new_thread_executor = thread.ThreadExecutor()
+ try:
+ new_wf_runner = WorkflowRunner(
+ service_id=wf_runner.service.id,
+ retry_failed_tasks=True,
+ inputs={},
+ model_storage=workflow_context.model,
+ resource_storage=workflow_context.resource,
+ plugin_manager=None,
+ execution_id=wf_runner.execution.id,
+ executor=new_thread_executor)
+
+ new_wf_runner.execute()
+ finally:
+ new_thread_executor.close()
+
+ # The failed task had its attempts reset and was retried once, successfully.
+ node = workflow_context.model.node.refresh(node)
+ assert failed_task.attempts_count == 1
+ assert node.attributes['invocations'].value == 4
+ assert all(task.status == task.SUCCESS for task in tasks)
+ assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
+
+ def test_two_sequential_task_first_task_failed(self, workflow_context, thread_executor):
+ node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+ node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+ self._create_interface(workflow_context, node, mock_fail_first_task_only)
+
+ wf_runner = self._create_initial_workflow_runner(
+ workflow_context,
+ mock_sequential_tasks_workflow,
+ thread_executor,
+ inputs={'retry_interval': 1, 'max_attempts': 1, 'number_of_tasks': 2}  # one attempt per task
+ )
+ wf_thread = Thread(target=wf_runner.execute)
+ wf_thread.setDaemon(True)
+ wf_thread.start()
+
+ if custom_events['execution_failed'].wait(60) is False:
+ raise TimeoutError("Execution did not end")
+
+ tasks = workflow_context.model.task.list(filters={'_stub_type': None})
+ node = workflow_context.model.node.refresh(node)
+ assert node.attributes['invocations'].value == 1  # only the first task was invoked
+ assert any(t.status == t.FAILED for t in tasks)
+ assert any(t.status == t.PENDING for t in tasks)  # the second task never started
+
+ custom_events['is_resumed'].set()
+ new_thread_executor = thread.ThreadExecutor()
+ try:
+ new_wf_runner = WorkflowRunner(  # note: retry_failed_tasks is not passed here
+ service_id=wf_runner.service.id,
+ inputs={},
+ model_storage=workflow_context.model,
+ resource_storage=workflow_context.resource,
+ plugin_manager=None,
+ execution_id=wf_runner.execution.id,
+ executor=new_thread_executor)
+
+ new_wf_runner.execute()
+ finally:
+ new_thread_executor.close()
+
+ # The failed task is not retried on resume; only the pending task is executed.
+ node = workflow_context.model.node.refresh(node)
+ assert node.attributes['invocations'].value == 2  # exactly one more invocation
+ assert any(t.status == t.SUCCESS for t in tasks)
+ assert any(t.status == t.FAILED for t in tasks)
+ assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
+
+
+
@staticmethod
@pytest.fixture
def thread_executor():
@@ -524,51 +637,42 @@
def execution_cancelled(*args, **kwargs):
custom_events['execution_cancelled'].set()
- def execution_ended(*args, **kwargs):
- custom_events['execution_ended'].set()
+ def execution_failed(*args, **kwargs):
+ custom_events['execution_failed'].set()
events.on_cancelled_workflow_signal.connect(execution_cancelled)
- events.on_failure_workflow_signal.connect(execution_ended)
+ events.on_failure_workflow_signal.connect(execution_failed)
yield
events.on_cancelled_workflow_signal.disconnect(execution_cancelled)
- events.on_failure_workflow_signal.disconnect(execution_ended)
+ events.on_failure_workflow_signal.disconnect(execution_failed)
for event in custom_events.values():
event.clear()
@workflow
-def mock_parallel_workflow(ctx, graph):
+def mock_sequential_tasks_workflow(ctx, graph,
+ retry_interval=1, max_attempts=10, number_of_tasks=1):
node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
- graph.add_tasks(
- api.task.OperationTask(
- node, interface_name='aria.interfaces.lifecycle', operation_name='create'),
- api.task.OperationTask(
- node, interface_name='aria.interfaces.lifecycle', operation_name='create')
- )
-
-
-@operation
-def mock_resuming_task(ctx):
- ctx.node.attributes['invocations'] += 1
-
- if ctx.node.attributes['invocations'] != 1:
- custom_events['is_active'].set()
- if not custom_events['is_resumed'].isSet():
- # if resume was called, increase by one. o/w fail the execution - second task should
- # fail as long it was not a part of resuming the workflow
- raise FailingTask("wasn't resumed yet")
+ graph.sequence(*_create_tasks(node, retry_interval, max_attempts, number_of_tasks))
@workflow
-def mock_single_task_workflow(ctx, graph):
+def mock_parallel_tasks_workflow(ctx, graph,
+ retry_interval=1, max_attempts=10, number_of_tasks=1):
node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
- graph.add_tasks(
+ graph.add_tasks(*_create_tasks(node, retry_interval, max_attempts, number_of_tasks))
+
+
+def _create_tasks(node, retry_interval, max_attempts, number_of_tasks):
+ return [
api.task.OperationTask(node,
- interface_name='aria.interfaces.lifecycle',
- operation_name='create',
- retry_interval=1,
- max_attempts=10),
- )
+ 'aria.interfaces.lifecycle',
+ 'create',
+ retry_interval=retry_interval,
+ max_attempts=max_attempts)
+ for _ in xrange(number_of_tasks)
+ ]
+
@operation
@@ -600,3 +704,23 @@
if not custom_events['is_active'].isSet():
custom_events['is_active'].set()
time.sleep(5)
+
+
+@operation
+def mock_pass_first_task_only(ctx):
+ ctx.node.attributes['invocations'] += 1  # count every invocation on the node
+
+ if ctx.node.attributes['invocations'] != 1:
+ custom_events['is_active'].set()
+ if not custom_events['is_resumed'].isSet():
+ # Only the first invocation passes before the workflow is resumed; any later
+ # invocation fails until the test sets the 'is_resumed' event.
+ raise FailingTask("wasn't resumed yet")
+
+
+@operation
+def mock_fail_first_task_only(ctx):
+ ctx.node.attributes['invocations'] += 1  # count every invocation on the node
+
+ if not custom_events['is_resumed'].isSet() and ctx.node.attributes['invocations'] == 1:
+ raise FailingTask("First task should fail")  # only the very first invocation, before resume