blob: 099515c09d1dde385f2f9578e27701f42dd971d9 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest
from aria.orchestrator.workflows import api
from aria.orchestrator.workflows.core import engine, graph_compiler
from aria.orchestrator.workflows.executor import process
from aria.orchestrator import workflow, operation
import tests
from tests import mock
from tests import storage
TEST_FILE_CONTENT = 'CONTENT'
TEST_FILE_ENTRY_ID = 'entry'
TEST_FILE_NAME = 'test_file'
def test_serialize_operation_context(context, executor, tmpdir):
test_file = tmpdir.join(TEST_FILE_NAME)
test_file.write(TEST_FILE_CONTENT)
resource = context.resource
resource.service_template.upload(TEST_FILE_ENTRY_ID, str(test_file))
node = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
plugin = mock.models.create_plugin()
context.model.plugin.put(plugin)
interface = mock.models.create_interface(
node.service,
'test',
'op',
operation_kwargs=dict(function=_operation_mapping(),
plugin=plugin)
)
node.interfaces[interface.name] = interface
context.model.node.update(node)
graph = _mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter
graph_compiler.GraphCompiler(context, executor.__class__).compile(graph)
eng = engine.Engine(executor)
eng.execute(context)
@workflow
def _mock_workflow(ctx, graph):
node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
graph.add_tasks(api.task.OperationTask(node, interface_name='test', operation_name='op'))
return graph
@operation
def _mock_operation(ctx):
# We test several things in this operation
# ctx.task, ctx.node, etc... tell us that the model storage was properly re-created
# a correct ctx.task.function tells us we kept the correct task_id
assert ctx.task.function == _operation_mapping()
# a correct ctx.node.name tells us we kept the correct actor_id
assert ctx.node.name == mock.models.DEPENDENCY_NODE_NAME
# a correct ctx.name tells us we kept the correct name
assert ctx.name is not None
assert ctx.name == ctx.task.name
# a correct ctx.deployment.name tells us we kept the correct deployment_id
assert ctx.service.name == mock.models.SERVICE_NAME
# Here we test that the resource storage was properly re-created
test_file_content = ctx.resource.service_template.read(TEST_FILE_ENTRY_ID, TEST_FILE_NAME)
assert test_file_content == TEST_FILE_CONTENT
# a non empty plugin workdir tells us that we kept the correct base_workdir
assert ctx.plugin_workdir is not None
def _operation_mapping():
return '{name}.{func.__name__}'.format(name=__name__, func=_mock_operation)
@pytest.fixture
def executor():
result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
try:
yield result
finally:
result.close()
@pytest.fixture
def context(tmpdir):
result = mock.context.simple(
str(tmpdir),
context_kwargs=dict(workdir=str(tmpdir.join('workdir')))
)
yield result
storage.release_sqlite_storage(result.model)