ARIA-414 Current events handler mechanism relies on sqlalchemy
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index 7f6612e..449e282 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -18,7 +18,6 @@
"""
import threading
-from contextlib import contextmanager
import aria
from aria.utils import file
@@ -106,12 +105,6 @@
self.model.log._session.remove()
self.model.log._engine.dispose()
- @property
- @contextmanager
- def persist_changes(self):
- yield
- self.model.task.update(self.task)
-
class NodeOperationContext(BaseOperationContext):
"""
diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py
index 5a323a6..c1868f8 100644
--- a/aria/orchestrator/context/workflow.py
+++ b/aria/orchestrator/context/workflow.py
@@ -93,12 +93,6 @@
}
)
- @property
- @contextmanager
- def persist_changes(self):
- yield
- self._model.execution.update(self.execution)
-
class _CurrentContext(threading.local):
"""
diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py
index 067d0c3..7cec6f4 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -28,126 +28,130 @@
@events.sent_task_signal.connect
def _task_sent(ctx, *args, **kwargs):
- with ctx.persist_changes:
- ctx.task.status = ctx.task.SENT
+ task = ctx.task
+ task.status = ctx.task.SENT
+ ctx.model.task.update(task)
@events.start_task_signal.connect
def _task_started(ctx, *args, **kwargs):
- with ctx.persist_changes:
- ctx.task.started_at = datetime.utcnow()
- ctx.task.status = ctx.task.STARTED
- _update_node_state_if_necessary(ctx, is_transitional=True)
+ task = ctx.task
+ ctx.task.started_at = datetime.utcnow()
+ ctx.task.status = ctx.task.STARTED
+ _update_node_state_if_necessary(ctx, is_transitional=True)
+ ctx.model.task.update(task)
@events.on_failure_task_signal.connect
def _task_failed(ctx, exception, *args, **kwargs):
- with ctx.persist_changes:
- should_retry = all([
- not isinstance(exception, exceptions.TaskAbortException),
- ctx.task.attempts_count < ctx.task.max_attempts or
- ctx.task.max_attempts == ctx.task.INFINITE_RETRIES,
- # ignore_failure check here means the task will not be retried and it will be marked
- # as failed. The engine will also look at ignore_failure so it won't fail the
- # workflow.
- not ctx.task.ignore_failure
- ])
- if should_retry:
- retry_interval = None
- if isinstance(exception, exceptions.TaskRetryException):
- retry_interval = exception.retry_interval
- if retry_interval is None:
- retry_interval = ctx.task.retry_interval
- ctx.task.status = ctx.task.RETRYING
- ctx.task.attempts_count += 1
- ctx.task.due_at = datetime.utcnow() + timedelta(seconds=retry_interval)
- else:
- ctx.task.ended_at = datetime.utcnow()
- ctx.task.status = ctx.task.FAILED
+ task = ctx.task
+ should_retry = all([
+ not isinstance(exception, exceptions.TaskAbortException),
+ task.attempts_count < task.max_attempts or
+ task.max_attempts == task.INFINITE_RETRIES,
+ # ignore_failure check here means the task will not be retried and it will be marked
+ # as failed. The engine will also look at ignore_failure so it won't fail the
+ # workflow.
+ not task.ignore_failure
+ ])
+ if should_retry:
+ retry_interval = None
+ if isinstance(exception, exceptions.TaskRetryException):
+ retry_interval = exception.retry_interval
+ if retry_interval is None:
+ retry_interval = ctx.task.retry_interval
+ task.status = task.RETRYING
+ task.attempts_count += 1
+ task.due_at = datetime.utcnow() + timedelta(seconds=retry_interval)
+ else:
+ task.ended_at = datetime.utcnow()
+ task.status = task.FAILED
+ ctx.model.task.update(task)
@events.on_success_task_signal.connect
def _task_succeeded(ctx, *args, **kwargs):
- with ctx.persist_changes:
- ctx.task.ended_at = datetime.utcnow()
- ctx.task.status = ctx.task.SUCCESS
- ctx.task.attempts_count += 1
+ task = ctx.task
+ ctx.task.ended_at = datetime.utcnow()
+ ctx.task.status = ctx.task.SUCCESS
+ ctx.task.attempts_count += 1
- _update_node_state_if_necessary(ctx)
+ _update_node_state_if_necessary(ctx)
+ ctx.model.task.update(task)
@events.start_workflow_signal.connect
def _workflow_started(workflow_context, *args, **kwargs):
- with workflow_context.persist_changes:
- execution = workflow_context.execution
- # the execution may already be in the process of cancelling
- if execution.status in (execution.CANCELLING, execution.CANCELLED):
- return
- execution.status = execution.STARTED
- execution.started_at = datetime.utcnow()
+ execution = workflow_context.execution
+ # the execution may already be in the process of cancelling
+ if execution.status in (execution.CANCELLING, execution.CANCELLED):
+ return
+ execution.status = execution.STARTED
+ execution.started_at = datetime.utcnow()
+ workflow_context.model.execution.update(execution)
@events.on_failure_workflow_signal.connect
def _workflow_failed(workflow_context, exception, *args, **kwargs):
- with workflow_context.persist_changes:
- execution = workflow_context.execution
- execution.error = str(exception)
- execution.status = execution.FAILED
- execution.ended_at = datetime.utcnow()
+ execution = workflow_context.execution
+ execution.error = str(exception)
+ execution.status = execution.FAILED
+ execution.ended_at = datetime.utcnow()
+ workflow_context.model.execution.update(execution)
@events.on_success_workflow_signal.connect
def _workflow_succeeded(workflow_context, *args, **kwargs):
- with workflow_context.persist_changes:
- execution = workflow_context.execution
- execution.status = execution.SUCCEEDED
- execution.ended_at = datetime.utcnow()
+ execution = workflow_context.execution
+ execution.status = execution.SUCCEEDED
+ execution.ended_at = datetime.utcnow()
+ workflow_context.model.execution.update(execution)
@events.on_cancelled_workflow_signal.connect
def _workflow_cancelled(workflow_context, *args, **kwargs):
- with workflow_context.persist_changes:
- execution = workflow_context.execution
- # _workflow_cancelling function may have called this function already
- if execution.status == execution.CANCELLED:
- return
- # the execution may have already been finished
- elif execution.status in (execution.SUCCEEDED, execution.FAILED):
- _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
- else:
- execution.status = execution.CANCELLED
- execution.ended_at = datetime.utcnow()
+ execution = workflow_context.execution
+ # _workflow_cancelling function may have called this function already
+ if execution.status == execution.CANCELLED:
+ return
+ # the execution may have already been finished
+ elif execution.status in (execution.SUCCEEDED, execution.FAILED):
+ _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
+ else:
+ execution.status = execution.CANCELLED
+ execution.ended_at = datetime.utcnow()
+ workflow_context.model.execution.update(execution)
@events.on_resume_workflow_signal.connect
def _workflow_resume(workflow_context, retry_failed=False, *args, **kwargs):
- with workflow_context.persist_changes:
- execution = workflow_context.execution
- execution.status = execution.PENDING
- # Any non ended task would be put back to pending state
- for task in execution.tasks:
- if not task.has_ended():
- task.status = task.PENDING
+ execution = workflow_context.execution
+ execution.status = execution.PENDING
+ # Any non ended task would be put back to pending state
+ for task in execution.tasks:
+ if not task.has_ended():
+ task.status = task.PENDING
- if retry_failed:
- for task in execution.tasks:
- if task.status == task.FAILED and not task.ignore_failure:
- task.attempts_count = 0
- task.status = task.PENDING
+ if retry_failed:
+ for task in execution.tasks:
+ if task.status == task.FAILED and not task.ignore_failure:
+ task.attempts_count = 0
+ task.status = task.PENDING
+ workflow_context.model.execution.update(execution)
@events.on_cancelling_workflow_signal.connect
def _workflow_cancelling(workflow_context, *args, **kwargs):
- with workflow_context.persist_changes:
- execution = workflow_context.execution
- if execution.status == execution.PENDING:
- return _workflow_cancelled(workflow_context=workflow_context)
- # the execution may have already been finished
- elif execution.status in (execution.SUCCEEDED, execution.FAILED):
- _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
- else:
- execution.status = execution.CANCELLING
+ execution = workflow_context.execution
+ if execution.status == execution.PENDING:
+ return _workflow_cancelled(workflow_context=workflow_context)
+ # the execution may have already been finished
+ elif execution.status in (execution.SUCCEEDED, execution.FAILED):
+ _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
+ else:
+ execution.status = execution.CANCELLING
+ workflow_context.model.execution.update(execution)
def _update_node_state_if_necessary(ctx, is_transitional=False):
diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py
index e7d03ea..d550b53 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -71,5 +71,6 @@
class StubTaskExecutor(BaseExecutor): # pylint: disable=abstract-method
def execute(self, ctx, *args, **kwargs):
- with ctx.persist_changes:
- ctx.task.status = ctx.task.SUCCESS
+ task = ctx.task
+ task.status = ctx.task.SUCCESS
+ ctx.model.task.update(task)
diff --git a/aria/orchestrator/workflows/executor/dry.py b/aria/orchestrator/workflows/executor/dry.py
index bdb0eaf..797cd0d 100644
--- a/aria/orchestrator/workflows/executor/dry.py
+++ b/aria/orchestrator/workflows/executor/dry.py
@@ -27,28 +27,29 @@
Dry task executor: prints task information without causing any side effects.
"""
def execute(self, ctx):
- with ctx.persist_changes:
- # updating the task manually instead of calling self._task_started(task),
- # to avoid any side effects raising that event might cause
- ctx.task.started_at = datetime.utcnow()
- ctx.task.status = ctx.task.STARTED
+ task = ctx.task
+ # updating the task manually instead of calling self._task_started(task),
+ # to avoid any side effects raising that event might cause
+ task.started_at = datetime.utcnow()
+ task.status = task.STARTED
- dry_msg = u'<dry> {name} {task.interface_name}.{task.operation_name} {suffix}'
- logger = ctx.logger.info if ctx.task.function else ctx.logger.debug
+ dry_msg = '<dry> {name} {task.interface_name}.{task.operation_name} {suffix}'
+ logger = ctx.logger.info if task.function else ctx.logger.debug
- if hasattr(ctx.task.actor, 'source_node'):
- name = u'{source_node.name}->{target_node.name}'.format(
- source_node=ctx.task.actor.source_node, target_node=ctx.task.actor.target_node)
- else:
- name = ctx.task.actor.name
+ if hasattr(task.actor, 'source_node'):
+ name = '{source_node.name}->{target_node.name}'.format(
+ source_node=task.actor.source_node, target_node=task.actor.target_node)
+ else:
+ name = task.actor.name
- if ctx.task.function:
- logger(dry_msg.format(name=name, task=ctx.task, suffix='started...'))
- logger(dry_msg.format(name=name, task=ctx.task, suffix='successful'))
- else:
- logger(dry_msg.format(name=name, task=ctx.task, suffix='has no implementation'))
+ if task.function:
+ logger(dry_msg.format(name=name, task=ctx.task, suffix='started...'))
+ logger(dry_msg.format(name=name, task=ctx.task, suffix='successful'))
+ else:
+ logger(dry_msg.format(name=name, task=ctx.task, suffix='has no implementation'))
- # updating the task manually instead of calling self._task_succeeded(task),
- # to avoid any side effects raising that event might cause
- ctx.task.ended_at = datetime.utcnow()
- ctx.task.status = ctx.task.SUCCESS
+ # updating the task manually instead of calling self._task_succeeded(task),
+ # to avoid any side effects raising that event might cause
+ task.ended_at = datetime.utcnow()
+ task.status = task.SUCCESS
+ ctx.model.task.update(task)
diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py
index 99d0b39..5c25fa2 100644
--- a/tests/orchestrator/workflows/executor/__init__.py
+++ b/tests/orchestrator/workflows/executor/__init__.py
@@ -25,10 +25,12 @@
INSTRUMENTATION_FIELDS = BaseContext.INSTRUMENTATION_FIELDS
- def __init__(self, storage, task_kwargs=None):
+ def __init__(self, storage=None, task_kwargs=None):
self.logger = logging.getLogger('mock_logger')
self._task_kwargs = task_kwargs or {}
- self._storage = storage
+ import mock
+ self._storage = storage or mock.MagicMock()
+ self._storage_kwargs = self._storage.serialization_dict if storage else None
self.task = MockTask(storage, **task_kwargs)
self.states = []
self.exception = None
@@ -38,7 +40,7 @@
return {
'context_cls': self.__class__,
'context': {
- 'storage_kwargs': self._storage.serialization_dict,
+ 'storage_kwargs': self._storage_kwargs,
'task_kwargs': self._task_kwargs
}
}
@@ -55,13 +57,11 @@
@classmethod
def instantiate_from_dict(cls, storage_kwargs=None, task_kwargs=None):
- return cls(storage=aria.application_model_storage(**(storage_kwargs or {})),
- task_kwargs=(task_kwargs or {}))
-
- @property
- @contextmanager
- def persist_changes(self):
- yield
+ if storage_kwargs:
+ return cls(storage=aria.application_model_storage(**(storage_kwargs or {})),
+ task_kwargs=task_kwargs)
+ else:
+ return cls(task_kwargs=task_kwargs)
class MockActor(object):
diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py
index 32a68e0..e537258 100644
--- a/tests/orchestrator/workflows/executor/test_executor.py
+++ b/tests/orchestrator/workflows/executor/test_executor.py
@@ -35,38 +35,40 @@
)
import tests
+from .. import helpers
from . import MockContext
+
def _get_function(func):
return '{module}.{func.__name__}'.format(module=__name__, func=func)
def execute_and_assert(executor, storage=None):
expected_value = 'value'
- successful_task = MockContext(
+ successful_ctx = MockContext(
storage, task_kwargs=dict(function=_get_function(mock_successful_task))
)
- failing_task = MockContext(
+ failing_ctx = MockContext(
storage, task_kwargs=dict(function=_get_function(mock_failing_task))
)
- task_with_inputs = MockContext(
+ task_with_inputs_ctx = MockContext(
storage,
task_kwargs=dict(function=_get_function(mock_task_with_input),
arguments={'input': models.Argument.wrap('input', 'value')})
)
- for task in [successful_task, failing_task, task_with_inputs]:
- executor.execute(task)
+ for ctx in [successful_ctx, failing_ctx, task_with_inputs_ctx]:
+ executor.execute(ctx)
@retrying.retry(stop_max_delay=10000, wait_fixed=100)
def assertion():
- assert successful_task.states == ['start', 'success']
- assert failing_task.states == ['start', 'failure']
- assert task_with_inputs.states == ['start', 'failure']
- assert isinstance(failing_task.exception, MockException)
- assert isinstance(task_with_inputs.exception, MockException)
- assert task_with_inputs.exception.message == expected_value
+ assert successful_ctx.states == ['start', 'success']
+ assert failing_ctx.states == ['start', 'failure']
+ assert task_with_inputs_ctx.states == ['start', 'failure']
+ assert isinstance(failing_ctx.exception, MockException)
+ assert isinstance(task_with_inputs_ctx.exception, MockException)
+ assert task_with_inputs_ctx.exception.message == expected_value
assertion()
@@ -130,20 +132,20 @@
@pytest.fixture(autouse=True)
def register_signals():
- def start_handler(task, *args, **kwargs):
- task.states.append('start')
+ def start_handler(ctx, *args, **kwargs):
+ ctx.states.append('start')
- def success_handler(task, *args, **kwargs):
- task.states.append('success')
+ def success_handler(ctx, *args, **kwargs):
+ ctx.states.append('success')
- def failure_handler(task, exception, *args, **kwargs):
- task.states.append('failure')
- task.exception = exception
-
- events.start_task_signal.connect(start_handler)
- events.on_success_task_signal.connect(success_handler)
- events.on_failure_task_signal.connect(failure_handler)
- yield
- events.start_task_signal.disconnect(start_handler)
- events.on_success_task_signal.disconnect(success_handler)
- events.on_failure_task_signal.disconnect(failure_handler)
+ def failure_handler(ctx, exception, *args, **kwargs):
+ ctx.states.append('failure')
+ ctx.exception = exception
+ with helpers.disconnect_event_handlers():
+ events.start_task_signal.connect(start_handler)
+ events.on_success_task_signal.connect(success_handler)
+ events.on_failure_task_signal.connect(failure_handler)
+ yield
+ events.start_task_signal.disconnect(start_handler)
+ events.on_success_task_signal.disconnect(success_handler)
+ events.on_failure_task_signal.disconnect(failure_handler)
diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py
index d801ffb..94c3585 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -33,11 +33,12 @@
import tests.storage
import tests.resources
-from tests.helpers import FilesystemDataHolder
-from tests.fixtures import ( # pylint: disable=unused-import
+from tests.fixtures import ( # pylint: disable=unused-import
plugins_dir,
plugin_manager,
)
+from tests.helpers import FilesystemDataHolder
+from ..helpers import disconnect_event_handlers
from . import MockContext
@@ -48,7 +49,6 @@
model,
task_kwargs=dict(function='mock_plugin1.operation', plugin_fk=mock_plugin.id)
)
-
executor.execute(ctx)
error = queue.get(timeout=60)
# tests/resources/plugins/mock-plugin1 is the plugin installed
@@ -85,7 +85,6 @@
model.argument.put(holder_path_argument)
model.argument.put(script_path_argument)
ctx = MockContext(
- model,
task_kwargs=dict(
function='{0}.{1}'.format(__name__, freezing_task.__name__),
arguments=dict(holder_path=holder_path_argument,
@@ -124,13 +123,15 @@
def handler(_, exception=None, **kwargs):
_queue.put(exception)
- events.on_success_task_signal.connect(handler)
- events.on_failure_task_signal.connect(handler)
- try:
- yield _queue
- finally:
- events.on_success_task_signal.disconnect(handler)
- events.on_failure_task_signal.disconnect(handler)
+ with disconnect_event_handlers():
+
+ events.on_success_task_signal.connect(handler)
+ events.on_failure_task_signal.connect(handler)
+ try:
+ yield _queue
+ finally:
+ events.on_success_task_signal.disconnect(handler)
+ events.on_failure_task_signal.disconnect(handler)
@pytest.fixture
diff --git a/tests/orchestrator/workflows/helpers.py b/tests/orchestrator/workflows/helpers.py
index 8e3f9b1..b1706d5 100644
--- a/tests/orchestrator/workflows/helpers.py
+++ b/tests/orchestrator/workflows/helpers.py
@@ -15,6 +15,9 @@
from contextlib import contextmanager
+from aria.orchestrator import events
+from aria.orchestrator.workflows.core import events_handler
+
@contextmanager
def events_collector(*signals):
@@ -35,3 +38,18 @@
finally:
for signal in signals:
signal.disconnect(handlers[signal])
+
+
+@contextmanager
+def disconnect_event_handlers():
+ # disconnect the system events handler
+ events.start_task_signal.disconnect(events_handler._task_started)
+ events.on_success_task_signal.disconnect(events_handler._task_succeeded)
+ events.on_failure_task_signal.disconnect(events_handler._task_failed)
+ try:
+ yield
+ finally:
+ # reconnect the system events handler
+ events.start_task_signal.connect(events_handler._task_started)
+ events.on_success_task_signal.connect(events_handler._task_succeeded)
+ events.on_failure_task_signal.connect(events_handler._task_failed)