# blob: 6163c092bb2bbeab76f7e7b1c72e405eb58e4281 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import time
import fasteners
import pytest
from aria.orchestrator import events
from aria.orchestrator.workflows.exceptions import ExecutorException
from aria.orchestrator.workflows import api
from aria.orchestrator.workflows.executor import process
from aria.orchestrator import workflow, operation
import tests
from tests.orchestrator.context import execute as execute_workflow
from tests.orchestrator.workflows.helpers import events_collector
from tests import mock
from tests import storage
from tests import helpers
@pytest.fixture
def dataholder(tmpdir):
    """
    Provide a ``FilesystemDataHolder`` stored at ``<tmpdir>/dataholder``.
    """
    return helpers.FilesystemDataHolder(str(tmpdir.join('dataholder')))
def test_concurrent_modification_on_task_succeeded(context, executor, lock_files, dataholder):
    """
    Run the shared concurrent-update scenario with the operation that never raises.
    """
    _test(
        context=context,
        executor=executor,
        lock_files=lock_files,
        func=_test_task_succeeded,
        dataholder=dataholder,
        expected_failure=False,
    )
@operation
def _test_task_succeeded(ctx, lock_files, key, first_value, second_value, holder_path):
    """
    Operation that performs the concurrent update on ``ctx.node`` and then completes normally.
    """
    _concurrent_update(
        lock_files=lock_files,
        node=ctx.node,
        key=key,
        first_value=first_value,
        second_value=second_value,
        holder_path=holder_path,
    )
def test_concurrent_modification_on_task_failed(context, executor, lock_files, dataholder):
    """
    Run the shared concurrent-update scenario with the operation that raises in one invocation.
    """
    _test(
        context=context,
        executor=executor,
        lock_files=lock_files,
        func=_test_task_failed,
        dataholder=dataholder,
        expected_failure=True,
    )
@operation
def _test_task_failed(ctx, lock_files, key, first_value, second_value, holder_path):
    """
    Operation that performs the concurrent update on ``ctx.node`` and raises ``RuntimeError``
    when this invocation was not the one that acquired the first lock.
    """
    acquired_first_lock = _concurrent_update(
        lock_files, ctx.node, key, first_value, second_value, holder_path)
    if acquired_first_lock:
        return
    raise RuntimeError('MESSAGE')
def _test(context, executor, lock_files, func, dataholder, expected_failure):
    """
    Shared scenario: run ``func`` twice as operation tasks on the dependency node and verify
    the outcome.

    The operation is registered on the node's interface, a workflow with two identical
    operation tasks is executed, and afterwards the stored node attribute is compared with
    the value recorded in ``dataholder``.

    :param context: workflow context used to look up and update the node
    :param executor: executor handed to ``execute_workflow``
    :param lock_files: pair of lock file paths forwarded to the operation
    :param func: operation function, referenced by its dotted path in this module
    :param dataholder: holder whose ``path`` is forwarded to the operation and whose content
                       is inspected afterwards
    :param expected_failure: when true, at least one task failure event must have been collected
    """
    def _fetch_node(ctx):
        return ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)

    interface_name, operation_name = mock.operations.NODE_OPERATIONS_INSTALL[0]

    key = 'key'
    first_value = 'value1'
    second_value = 'value2'
    arguments = {
        'lock_files': lock_files,
        'key': key,
        'first_value': first_value,
        'second_value': second_value,
        'holder_path': dataholder.path
    }

    # Register the operation under test on the node's interface and persist the change.
    target_node = _fetch_node(context)
    function_path = '{0}.{1}'.format(__name__, func.__name__)
    node_interface = mock.models.create_interface(
        target_node.service,
        interface_name,
        operation_name,
        operation_kwargs={'function': function_path, 'arguments': arguments}
    )
    target_node.interfaces[node_interface.name] = node_interface
    context.model.node.update(target_node)

    @workflow
    def mock_workflow(graph, **_):
        # Two identical tasks, so that both invocations touch the same node.
        tasks = [
            api.task.OperationTask(
                target_node,
                interface_name=interface_name,
                operation_name=operation_name,
                arguments=arguments)
            for _unused in range(2)
        ]
        graph.add_tasks(*tasks)

    failure_signal = events.on_failure_task_signal
    with events_collector(failure_signal) as captured:
        try:
            execute_workflow(mock_workflow, context, executor)
        except ExecutorException:
            # The executor exception is ignored here; failures are checked through the
            # collected events below.
            pass

    # Re-read the node so that the asserted attributes are the stored ones.
    stored_attributes = _fetch_node(context).attributes
    assert dataholder['invocations'] == 2
    assert stored_attributes[key].value == dataholder[key]

    failure_events = captured.get(failure_signal, [])
    raised = [event['kwargs']['exception'] for event in failure_events]
    if expected_failure:
        assert raised
@pytest.fixture
def executor():
    """
    Yield a ``ProcessExecutor`` whose python path is ``tests.ROOT_DIR``; close it on teardown.
    """
    process_executor = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
    yield process_executor
    process_executor.close()
@pytest.fixture
def context(tmpdir):
    """
    Yield a simple mock context created from ``tmpdir``; release its sqlite storage on teardown.
    """
    workflow_context = mock.context.simple(str(tmpdir))
    yield workflow_context
    storage.release_sqlite_storage(workflow_context.model)
@pytest.fixture
def lock_files(tmpdir):
    """
    Provide a pair of lock file paths (as strings) located under ``tmpdir``.
    """
    file_names = ('first_lock_file', 'second_lock_file')
    return tuple(str(tmpdir.join(file_name)) for file_name in file_names)
def _concurrent_update(lock_files, node, key, first_value, second_value, holder_path):
    """
    Write ``key`` to the node attributes and to the filesystem data holder while coordinating
    with a second caller through two inter-process locks.

    The caller that obtains the first lock writes ``first_value``; the other caller writes
    ``second_value``. Both callers also increment the ``invocations`` counter in the holder.

    :param lock_files: sequence whose first two items are the lock file paths
    :param node: object whose ``attributes`` mapping receives the value under ``key``
    :param key: name under which the value is stored, both in ``node.attributes`` and in the
                holder
    :param first_value: value written by the caller that acquired the first lock
    :param second_value: value written by the caller that did not acquire the first lock
    :param holder_path: path handed to ``helpers.FilesystemDataHolder``
    :return: the result of the non-blocking acquire of the first lock (truthy for the caller
             that obtained it)
    """
    holder = helpers.FilesystemDataHolder(holder_path)
    locker1 = fasteners.InterProcessLock(lock_files[0])
    locker2 = fasteners.InterProcessLock(lock_files[1])

    first = locker1.acquire(blocking=False)

    if first:
        # Give chance for both processes to acquire locks: as long as the second lock can
        # still be taken here, nobody else is holding it yet, so hand it back and retry.
        while locker2.acquire(blocking=False):
            locker2.release()
            time.sleep(0.1)
    else:
        locker2.acquire()

    value = first_value if first else second_value
    node.attributes[key] = value
    # Store under the ``key`` parameter (previously a hard-coded 'key' literal) so the holder
    # always matches the attribute written above.
    holder[key] = value
    holder.setdefault('invocations', 0)
    holder['invocations'] += 1

    if first:
        locker1.release()
    else:
        # Wait until the first lock becomes available before giving up the second one.
        with locker1:
            locker2.release()

    return first