ARIA-237 Support for resuming failed workflow executions

Add support for resuming failed workflow executions: an execution that
ended in the FAILED state can now be resumed, and the tasks that failed
in the previous run can be rerun.

Additional changes:
* When a task succeeds, its attempts_count is now incremented.
* Fixed the CLI options of the executions resume command: removed the
  inputs and task retry/attempt options and added a --retry-failed-tasks flag.
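
A short usage sketch of the new flag (illustrative only - the storage
objects, plugin manager and the id of the failed execution are assumed to
already exist; this mirrors how the updated CLI command and the tests below
construct the runner). The CLI equivalent is:
aria executions resume <execution-id> --retry-failed-tasks

    # Sketch only - resume a FAILED execution and rerun its failed tasks.
    from aria.orchestrator.workflow_runner import WorkflowRunner
    from aria.orchestrator.workflows.executor import thread

    executor = thread.ThreadExecutor()
    try:
        runner = WorkflowRunner(
            model_storage, resource_storage, plugin_manager,
            execution_id=failed_execution_id,  # id of the FAILED execution
            retry_failed_tasks=True,           # reset FAILED tasks to PENDING and rerun them
            executor=executor)
        runner.execute()
    finally:
        executor.close()
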
diff --git a/aria/cli/commands/executions.py b/aria/cli/commands/executions.py
index ea70af5..4783442 100644
--- a/aria/cli/commands/executions.py
+++ b/aria/cli/commands/executions.py
@@ -157,10 +157,8 @@
 @executions.command(name='resume',
                     short_help='Resume a stopped execution')
 @aria.argument('execution-id')
-@aria.options.inputs(help=helptexts.EXECUTION_INPUTS)
 @aria.options.dry_execution
-@aria.options.task_max_attempts()
-@aria.options.task_retry_interval()
+@aria.options.retry_failed_tasks
 @aria.options.mark_pattern()
 @aria.options.verbose()
 @aria.pass_model_storage
@@ -168,9 +166,8 @@
 @aria.pass_plugin_manager
 @aria.pass_logger
 def resume(execution_id,
+           retry_failed_tasks,
            dry,
-           task_max_attempts,
-           task_retry_interval,
            mark_pattern,
            model_storage,
            resource_storage,
@@ -194,8 +191,7 @@
     workflow_runner = \
         WorkflowRunner(
             model_storage, resource_storage, plugin_manager,
-            execution_id=execution_id, executor=executor,
-            task_max_attempts=task_max_attempts, task_retry_interval=task_retry_interval
+            execution_id=execution_id, retry_failed_tasks=retry_failed_tasks, executor=executor,
         )
 
     logger.info('Resuming {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else ''))
diff --git a/aria/cli/core/aria.py b/aria/cli/core/aria.py
index 515c06a..b84507c 100644
--- a/aria/cli/core/aria.py
+++ b/aria/cli/core/aria.py
@@ -310,6 +310,12 @@
             is_flag=True,
             help=helptexts.DRY_EXECUTION)
 
+        self.retry_failed_tasks = click.option(
+            '--retry-failed-tasks',
+            is_flag=True,
+            help=helptexts.RETRY_FAILED_TASK
+        )
+
         self.reset_config = click.option(
             '--reset-config',
             is_flag=True,
diff --git a/aria/cli/helptexts.py b/aria/cli/helptexts.py
index a5d41e8..5ab353a 100644
--- a/aria/cli/helptexts.py
+++ b/aria/cli/helptexts.py
@@ -46,6 +46,7 @@
     "How many times should a task be attempted in case of failures [default: {0}]"
 DRY_EXECUTION = "Execute a workflow dry run (prints operations information without causing side " \
                 "effects)"
+RETRY_FAILED_TASK = "Retry tasks that failed in the previous execution attempt"
 IGNORE_AVAILABLE_NODES = "Delete the service even if it has available nodes"
 SORT_BY = "Key for sorting the list"
 DESCENDING = "Sort list in descending order [default: False]"
diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py
index df2643e..4d4f0fe 100644
--- a/aria/modeling/orchestration.py
+++ b/aria/modeling/orchestration.py
@@ -65,7 +65,9 @@
         PENDING: (STARTED, CANCELLED),
         STARTED: END_STATES + (CANCELLING,),
         CANCELLING: END_STATES,
-        CANCELLED: PENDING
+        # Retrying
+        CANCELLED: PENDING,
+        FAILED: PENDING
     }
 
     # region one_to_many relationships
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index 47270c0..a85e7d3 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -38,7 +38,8 @@
 class WorkflowRunner(object):
 
     def __init__(self, model_storage, resource_storage, plugin_manager,
-                 execution_id=None, service_id=None, workflow_name=None, inputs=None, executor=None,
+                 execution_id=None, retry_failed_tasks=False,
+                 service_id=None, workflow_name=None, inputs=None, executor=None,
                  task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
                  task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL):
         """
@@ -62,6 +63,7 @@
                 "and service id with inputs")
 
         self._is_resume = execution_id is not None
+        self._retry_failed_tasks = retry_failed_tasks
 
         self._model_storage = model_storage
         self._resource_storage = resource_storage
@@ -116,7 +118,9 @@
         return self._model_storage.service.get(self._service_id)
 
     def execute(self):
-        self._engine.execute(ctx=self._workflow_context, resuming=self._is_resume)
+        self._engine.execute(ctx=self._workflow_context,
+                             resuming=self._is_resume,
+                             retry_failed=self._retry_failed_tasks)
 
     def cancel(self):
         self._engine.cancel_execution(ctx=self._workflow_context)
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index d9c77e9..0ec3cd8 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -41,14 +41,15 @@
         self._executors = executors.copy()
         self._executors.setdefault(StubTaskExecutor, StubTaskExecutor())
 
-    def execute(self, ctx, resuming=False):
+    def execute(self, ctx, resuming=False, retry_failed=False):
         """
         Executes the workflow.
         """
         if resuming:
-            events.on_resume_workflow_signal.send(ctx)
+            events.on_resume_workflow_signal.send(ctx, retry_failed=retry_failed)
 
         tasks_tracker = _TasksTracker(ctx)
+
         try:
             events.start_workflow_signal.send(ctx)
             while True:
@@ -124,8 +125,10 @@
 
 
 class _TasksTracker(object):
+
     def __init__(self, ctx):
         self._ctx = ctx
+
         self._tasks = ctx.execution.tasks
         self._executed_tasks = [task for task in self._tasks if task.has_ended()]
         self._executable_tasks = list(set(self._tasks) - set(self._executed_tasks))
@@ -155,7 +158,7 @@
     def executable_tasks(self):
         now = datetime.utcnow()
         # we need both lists since retrying task are in the executing task list.
-        for task in self._update_tasks(self._executing_tasks + self._executable_tasks):
+        for task in self._update_tasks(set(self._executing_tasks + self._executable_tasks)):
             if all([task.is_waiting(),
                     task.due_at <= now,
                     all(dependency in self._executed_tasks for dependency in task.dependencies)
diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py
index 37801de..219d2df 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -71,6 +71,7 @@
     with ctx.persist_changes:
         ctx.task.ended_at = datetime.utcnow()
         ctx.task.status = ctx.task.SUCCESS
+        ctx.task.attempts_count += 1
 
         _update_node_state_if_necessary(ctx)
 
@@ -119,7 +120,7 @@
 
 
 @events.on_resume_workflow_signal.connect
-def _workflow_resume(workflow_context, *args, **kwargs):
+def _workflow_resume(workflow_context, retry_failed=False, *args, **kwargs):
     with workflow_context.persist_changes:
         execution = workflow_context.execution
         execution.status = execution.PENDING
@@ -128,6 +129,12 @@
             if not task.has_ended():
                 task.status = task.PENDING
 
+        if retry_failed:
+            for task in execution.tasks:
+                if task.status == task.FAILED and not task.ignore_failure:
+                    task.attempts_count = 0
+                    task.status = task.PENDING
+
 
 @events.on_cancelling_workflow_signal.connect
 def _workflow_cancelling(workflow_context, *args, **kwargs):
diff --git a/tests/modeling/test_models.py b/tests/modeling/test_models.py
index e1167fc..25b4080 100644
--- a/tests/modeling/test_models.py
+++ b/tests/modeling/test_models.py
@@ -324,8 +324,7 @@
             Execution.STARTED: [Execution.PENDING],
             Execution.CANCELLING: [Execution.PENDING,
                                    Execution.STARTED],
-            Execution.FAILED: [Execution.PENDING,
-                               Execution.STARTED,
+            Execution.FAILED: [Execution.STARTED,
                                Execution.SUCCEEDED,
                                Execution.CANCELLED,
                                Execution.CANCELLING],
diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py
index a77d727..30176ae 100644
--- a/tests/orchestrator/test_workflow_runner.py
+++ b/tests/orchestrator/test_workflow_runner.py
@@ -51,7 +51,7 @@
     'is_resumed': Event(),
     'is_active': Event(),
     'execution_cancelled': Event(),
-    'execution_ended': Event()
+    'execution_failed': Event(),
 }
 
 
@@ -166,7 +166,8 @@
         assert engine_kwargs['ctx'].execution.workflow_name == 'test_workflow'
 
         mock_engine_execute.assert_called_once_with(ctx=workflow_runner._workflow_context,
-                                                    resuming=False)
+                                                    resuming=False,
+                                                    retry_failed=False)
 
 
 def test_cancel_execution(request):
@@ -358,10 +359,11 @@
     def test_resume_workflow(self, workflow_context, thread_executor):
         node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
         node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
-        self._create_interface(workflow_context, node, mock_resuming_task)
+        self._create_interface(workflow_context, node, mock_pass_first_task_only)
 
         wf_runner = self._create_initial_workflow_runner(
-            workflow_context, mock_parallel_workflow, thread_executor)
+            workflow_context, mock_parallel_tasks_workflow, thread_executor,
+            inputs={'number_of_tasks': 2})
 
         wf_thread = Thread(target=wf_runner.execute)
         wf_thread.daemon = True
@@ -369,6 +371,7 @@
 
         # Wait for the execution to start
         self._wait_for_active_and_cancel(wf_runner)
+        node = workflow_context.model.node.refresh(node)
 
         tasks = workflow_context.model.task.list(filters={'_stub_type': None})
         assert any(task.status == task.SUCCESS for task in tasks)
@@ -390,6 +393,7 @@
         new_wf_runner.execute()
 
         # Wait for it to finish and assert changes.
+        node = workflow_context.model.node.refresh(node)
         assert all(task.status == task.SUCCESS for task in tasks)
         assert node.attributes['invocations'].value == 3
         assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
@@ -400,13 +404,15 @@
         self._create_interface(workflow_context, node, mock_stuck_task)
 
         wf_runner = self._create_initial_workflow_runner(
-            workflow_context, mock_single_task_workflow, thread_executor)
+            workflow_context, mock_parallel_tasks_workflow, thread_executor,
+            inputs={'number_of_tasks': 1})
 
         wf_thread = Thread(target=wf_runner.execute)
         wf_thread.daemon = True
         wf_thread.start()
 
         self._wait_for_active_and_cancel(wf_runner)
+        node = workflow_context.model.node.refresh(node)
         task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
         assert node.attributes['invocations'].value == 1
         assert task.status == task.STARTED
@@ -430,6 +436,7 @@
             new_thread_executor.close()
 
         # Wait for it to finish and assert changes.
+        node = workflow_context.model.node.refresh(node)
         assert node.attributes['invocations'].value == 2
         assert task.status == task.SUCCESS
         assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
@@ -439,13 +446,15 @@
         node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
         self._create_interface(workflow_context, node, mock_failed_before_resuming)
 
-        wf_runner = self._create_initial_workflow_runner(
-            workflow_context, mock_single_task_workflow, thread_executor)
+        wf_runner = self._create_initial_workflow_runner(workflow_context,
+                                                         mock_parallel_tasks_workflow,
+                                                         thread_executor)
         wf_thread = Thread(target=wf_runner.execute)
         wf_thread.setDaemon(True)
         wf_thread.start()
 
         self._wait_for_active_and_cancel(wf_runner)
+        node = workflow_context.model.node.refresh(node)
 
         task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
         assert node.attributes['invocations'].value == 2
@@ -474,10 +483,112 @@
             new_thread_executor.close()
 
         # Wait for it to finish and assert changes.
+        node = workflow_context.model.node.refresh(node)
         assert node.attributes['invocations'].value == task.max_attempts - 1
         assert task.status == task.SUCCESS
         assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
 
+    def test_resume_failed_task_and_successful_task(self, workflow_context, thread_executor):
+        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+        self._create_interface(workflow_context, node, mock_pass_first_task_only)
+
+        wf_runner = self._create_initial_workflow_runner(
+            workflow_context,
+            mock_parallel_tasks_workflow,
+            thread_executor,
+            inputs={'retry_interval': 1, 'max_attempts': 2, 'number_of_tasks': 2}
+        )
+        wf_thread = Thread(target=wf_runner.execute)
+        wf_thread.setDaemon(True)
+        wf_thread.start()
+
+        if custom_events['execution_failed'].wait(60) is False:
+            raise TimeoutError("Execution did not end")
+
+        tasks = workflow_context.model.task.list(filters={'_stub_type': None})
+        node = workflow_context.model.node.refresh(node)
+        assert node.attributes['invocations'].value == 3
+        failed_task = [t for t in tasks if t.status == t.FAILED][0]
+
+        # The second task failed and exhausted both of its attempts
+        assert any(task.status == task.FAILED for task in tasks)
+        assert failed_task.attempts_count == 2
+        # The first task passed
+        assert any(task.status == task.SUCCESS for task in tasks)
+        assert wf_runner.execution.status == wf_runner.execution.FAILED
+
+        custom_events['is_resumed'].set()
+        new_thread_executor = thread.ThreadExecutor()
+        try:
+            new_wf_runner = WorkflowRunner(
+                service_id=wf_runner.service.id,
+                retry_failed_tasks=True,
+                inputs={},
+                model_storage=workflow_context.model,
+                resource_storage=workflow_context.resource,
+                plugin_manager=None,
+                execution_id=wf_runner.execution.id,
+                executor=new_thread_executor)
+
+            new_wf_runner.execute()
+        finally:
+            new_thread_executor.close()
+
+        # Wait for it to finish and assert changes.
+        node = workflow_context.model.node.refresh(node)
+        assert failed_task.attempts_count == 1
+        assert node.attributes['invocations'].value == 4
+        assert all(task.status == task.SUCCESS for task in tasks)
+        assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
+
+    def test_two_sequential_task_first_task_failed(self, workflow_context, thread_executor):
+        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+        self._create_interface(workflow_context, node, mock_fail_first_task_only)
+
+        wf_runner = self._create_initial_workflow_runner(
+            workflow_context,
+            mock_sequential_tasks_workflow,
+            thread_executor,
+            inputs={'retry_interval': 1, 'max_attempts': 1, 'number_of_tasks': 2}
+        )
+        wf_thread = Thread(target=wf_runner.execute)
+        wf_thread.setDaemon(True)
+        wf_thread.start()
+
+        if custom_events['execution_failed'].wait(60) is False:
+            raise TimeoutError("Execution did not end")
+
+        tasks = workflow_context.model.task.list(filters={'_stub_type': None})
+        node = workflow_context.model.node.refresh(node)
+        assert node.attributes['invocations'].value == 1
+        assert any(t.status == t.FAILED for t in tasks)
+        assert any(t.status == t.PENDING for t in tasks)
+
+        custom_events['is_resumed'].set()
+        new_thread_executor = thread.ThreadExecutor()
+        try:
+            new_wf_runner = WorkflowRunner(
+                service_id=wf_runner.service.id,
+                inputs={},
+                model_storage=workflow_context.model,
+                resource_storage=workflow_context.resource,
+                plugin_manager=None,
+                execution_id=wf_runner.execution.id,
+                executor=new_thread_executor)
+
+            new_wf_runner.execute()
+        finally:
+            new_thread_executor.close()
+
+        # Wait for it to finish and assert changes.
+        node = workflow_context.model.node.refresh(node)
+        assert node.attributes['invocations'].value == 2
+        assert any(t.status == t.SUCCESS for t in tasks)
+        assert any(t.status == t.FAILED for t in tasks)
+        assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
+
     @staticmethod
     @pytest.fixture
     def thread_executor():
@@ -524,51 +637,41 @@
         def execution_cancelled(*args, **kwargs):
             custom_events['execution_cancelled'].set()
 
-        def execution_ended(*args, **kwargs):
-            custom_events['execution_ended'].set()
+        def execution_failed(*args, **kwargs):
+            custom_events['execution_failed'].set()
 
         events.on_cancelled_workflow_signal.connect(execution_cancelled)
-        events.on_failure_workflow_signal.connect(execution_ended)
+        events.on_failure_workflow_signal.connect(execution_failed)
         yield
         events.on_cancelled_workflow_signal.disconnect(execution_cancelled)
-        events.on_failure_workflow_signal.disconnect(execution_ended)
+        events.on_failure_workflow_signal.disconnect(execution_failed)
         for event in custom_events.values():
             event.clear()
 
 
 @workflow
-def mock_parallel_workflow(ctx, graph):
+def mock_sequential_tasks_workflow(ctx, graph,
+                                   retry_interval=1, max_attempts=10, number_of_tasks=1):
     node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
-    graph.add_tasks(
-        api.task.OperationTask(
-            node, interface_name='aria.interfaces.lifecycle', operation_name='create'),
-        api.task.OperationTask(
-            node, interface_name='aria.interfaces.lifecycle', operation_name='create')
-    )
-
-
-@operation
-def mock_resuming_task(ctx):
-    ctx.node.attributes['invocations'] += 1
-
-    if ctx.node.attributes['invocations'] != 1:
-        custom_events['is_active'].set()
-        if not custom_events['is_resumed'].isSet():
-            # if resume was called, increase by one. o/w fail the execution - second task should
-            # fail as long it was not a part of resuming the workflow
-            raise FailingTask("wasn't resumed yet")
+    graph.sequence(*_create_tasks(node, retry_interval, max_attempts, number_of_tasks))
 
 
 @workflow
-def mock_single_task_workflow(ctx, graph):
+def mock_parallel_tasks_workflow(ctx, graph,
+                                 retry_interval=1, max_attempts=10, number_of_tasks=1):
     node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
-    graph.add_tasks(
+    graph.add_tasks(*_create_tasks(node, retry_interval, max_attempts, number_of_tasks))
+
+
+def _create_tasks(node, retry_interval, max_attempts, number_of_tasks):
+    return [
         api.task.OperationTask(node,
-                               interface_name='aria.interfaces.lifecycle',
-                               operation_name='create',
-                               retry_interval=1,
-                               max_attempts=10),
-    )
+                               'aria.interfaces.lifecycle',
+                               'create',
+                               retry_interval=retry_interval,
+                               max_attempts=max_attempts)
+        for _ in xrange(number_of_tasks)
+    ]
 
 
 @operation
@@ -600,3 +704,23 @@
         if not custom_events['is_active'].isSet():
             custom_events['is_active'].set()
         time.sleep(5)
+
+
+@operation
+def mock_pass_first_task_only(ctx):
+    ctx.node.attributes['invocations'] += 1
+
+    if ctx.node.attributes['invocations'] != 1:
+        custom_events['is_active'].set()
+        if not custom_events['is_resumed'].isSet():
+        # If the workflow was resumed, just count the invocation; otherwise fail the
+        # execution - the second task should fail as long as it was not part of a resume
+            raise FailingTask("wasn't resumed yet")
+
+
+@operation
+def mock_fail_first_task_only(ctx):
+    ctx.node.attributes['invocations'] += 1
+
+    if not custom_events['is_resumed'].isSet() and ctx.node.attributes['invocations'] == 1:
+        raise FailingTask("First task should fail")