blob: 8a3b87e8f69efdd9b23d28abf758ca74d473ed04 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime
from aria.modeling import models
from aria.orchestrator import decorators
from aria.orchestrator.workflows.builtin.workflows import (
NORMATIVE_STANDARD_INTERFACE,
NORMATIVE_CREATE,
NORMATIVE_START,
NORMATIVE_STOP,
NORMATIVE_DELETE,
NORMATIVE_CONFIGURE,
NORMATIVE_CONFIGURE_INTERFACE,
NORMATIVE_PRE_CONFIGURE_SOURCE,
NORMATIVE_PRE_CONFIGURE_TARGET,
NORMATIVE_POST_CONFIGURE_SOURCE,
NORMATIVE_POST_CONFIGURE_TARGET,
NORMATIVE_ADD_SOURCE,
NORMATIVE_ADD_TARGET,
NORMATIVE_REMOVE_TARGET,
NORMATIVE_REMOVE_SOURCE
)
SERVICE_TEMPLATE_NAME = 'test_service_template'
SERVICE_NAME = 'test_service1'
NODE_TEMPLATE_NAME = 'test_node_template'
NODE_NAME = 'test_node1'
WORKFLOW_NAME = 'test_workflow'
TASK_RETRY_INTERVAL = 1
TASK_MAX_ATTEMPTS = 1
DEPENDENCY_NODE_TEMPLATE_NAME = 'dependency_node_template'
DEPENDENCY_NODE_NAME = 'dependency_node'
DEPENDENT_NODE_TEMPLATE_NAME = 'dependent_node_template'
DEPENDENT_NODE_NAME = 'dependent_node'
def create_service_template(name=SERVICE_TEMPLATE_NAME, description=None, inputs=None):
now = datetime.now()
inputs = inputs or {}
return models.ServiceTemplate(
name=name,
description=description,
inputs=inputs,
created_at=now,
updated_at=now,
main_file_name='main_file_name',
node_types=models.Type(variant='node', name='test_node_type'),
group_types=models.Type(variant='group', name='test_group_type'),
policy_types=models.Type(variant='policy', name='test_policy_type'),
relationship_types=models.Type(variant='relationship', name='test_relationship_type'),
capability_types=models.Type(variant='capability', name='test_capability_type'),
artifact_types=models.Type(variant='artifact', name='test_artifact_type'),
interface_types=models.Type(variant='interface', name='test_interface_type')
)
def create_service(service_template, name=SERVICE_NAME, inputs=None):
now = datetime.utcnow()
inputs = inputs or {}
return models.Service(
name=name,
inputs=inputs,
service_template=service_template,
description='',
created_at=now,
updated_at=now,
)
def create_service_with_dependencies(include_execution=False,
include_input=False,
include_output=False,
include_node=False):
service_template = create_service_template()
service = create_service(service_template=service_template)
if include_execution:
execution = create_execution(service=service, status=models.Execution.STARTED)
service.executions = [execution]
execution.id = '1'
if include_input:
input = create_input(name='input1', value='value1')
service.inputs = {'input1': input}
if include_output:
output = create_output(name='output1', value='value1')
service.outputs = {'output1': output}
if include_node:
node_template = create_node_template(service_template=service_template)
node = create_node(node_template, service, state=models.Node.STARTED)
node.id = '1'
return service
def create_node_template_with_dependencies(include_node=False, include_property=False):
service_template = create_service_template()
node_template = create_node_template(service_template=service_template)
if include_node:
service = create_service(service_template=service_template)
create_node(dependency_node_template=node_template, service=service)
if include_property:
node_template.properties = {'prop1': create_property(name='prop1', value='value1')}
return node_template
def create_node_with_dependencies(include_attribute=False):
node_template = create_node_template_with_dependencies()
node_template.service_template.services[0] = create_service(node_template.service_template)
node = create_node(node_template, node_template.service_template.services[0])
if include_attribute:
node.attributes['attribute1'] = models.Attribute.wrap('attribute1', 'value1') # pylint: disable=unsubscriptable-object
return node
def create_node_template(service_template,
name=NODE_TEMPLATE_NAME,
type=models.Type(variant='node', name='test_node_type'),
capability_templates=None,
requirement_templates=None,
interface_templates=None):
capability_templates = capability_templates or {}
requirement_templates = requirement_templates or []
interface_templates = interface_templates or {}
node_template = models.NodeTemplate(
name=name,
type=type,
capability_templates=capability_templates,
requirement_templates=requirement_templates,
interface_templates=interface_templates,
service_template=service_template)
service_template.node_templates[node_template.name] = node_template
return node_template
def create_dependency_node_template(service_template, name=DEPENDENCY_NODE_TEMPLATE_NAME):
node_type = service_template.node_types.get_descendant('test_node_type')
capability_type = service_template.capability_types.get_descendant('test_capability_type')
capability_template = models.CapabilityTemplate(
name='capability',
type=capability_type
)
return create_node_template(
service_template=service_template,
name=name,
type=node_type,
capability_templates=_dictify(capability_template)
)
def create_dependent_node_template(
service_template, dependency_node_template, name=DEPENDENT_NODE_TEMPLATE_NAME):
the_type = service_template.node_types.get_descendant('test_node_type')
requirement_template = models.RequirementTemplate(
name='requirement',
target_node_template=dependency_node_template
)
return create_node_template(
service_template=service_template,
name=name,
type=the_type,
interface_templates=_dictify(get_standard_interface_template(service_template)),
requirement_templates=[requirement_template],
)
def create_node(dependency_node_template, service, name=NODE_NAME, state=models.Node.INITIAL):
node = models.Node(
name=name,
type=dependency_node_template.type,
version=None,
node_template=dependency_node_template,
state=state,
service=service,
interfaces=get_standard_interface(service),
)
service.nodes[node.name] = node
return node
def create_relationship(source, target):
return models.Relationship(
source_node=source,
target_node=target,
interfaces=get_configure_interfaces(service=source.service),
)
def create_interface_template(service_template, interface_name, operation_name,
operation_kwargs=None, interface_kwargs=None):
the_type = service_template.interface_types.get_descendant('test_interface_type')
operation_template = models.OperationTemplate(
name=operation_name,
**(operation_kwargs or {})
)
return models.InterfaceTemplate(
type=the_type,
operation_templates=_dictify(operation_template),
name=interface_name,
**(interface_kwargs or {})
)
def create_operation(operation_name, operation_kwargs=None):
if operation_kwargs and operation_kwargs.get('arguments'):
operation_kwargs['arguments'] = dict(
(argument_name, models.Argument.wrap(argument_name, argument_value))
for argument_name, argument_value in operation_kwargs['arguments'].iteritems()
if argument_value is not None)
return models.Operation(
name=operation_name,
**(operation_kwargs or {})
)
def create_interface(service, interface_name, operation_name, operation_kwargs=None,
interface_kwargs=None):
the_type = service.service_template.interface_types.get_descendant('test_interface_type')
operation = create_operation(operation_name, operation_kwargs)
return models.Interface(
type=the_type,
operations=_dictify(operation),
name=interface_name,
**(interface_kwargs or {})
)
def create_execution(service, status=models.Execution.PENDING):
return models.Execution(
service=service,
status=status,
workflow_name=WORKFLOW_NAME,
created_at=datetime.utcnow(),
started_at=datetime.utcnow(),
inputs={}
)
def create_plugin(name='test_plugin', package_version='0.1'):
return models.Plugin(
name=name,
archive_name='archive_name',
distribution='distribution',
distribution_release='dist_release',
distribution_version='dist_version',
package_name='package',
package_source='source',
package_version=package_version,
supported_platform='any',
supported_py_versions=['python27'],
uploaded_at=datetime.now(),
wheels=[],
)
def create_plugin_specification(name='test_plugin', version='0.1'):
return models.PluginSpecification(
name=name,
version=version
)
def _create_parameter(name, value, model_cls):
return model_cls.wrap(name, value)
def create_property(name, value):
return _create_parameter(name, value, model_cls=models.Property)
def create_input(name, value):
return _create_parameter(name, value, model_cls=models.Input)
def create_output(name, value):
return _create_parameter(name, value, model_cls=models.Output)
def _dictify(item):
return dict(((item.name, item),))
def get_standard_interface_template(service_template):
the_type = service_template.interface_types.get_descendant('test_interface_type')
op_templates = dict(
(op_name, models.OperationTemplate(
name=op_name, implementation='{0}.{1}'.format(__file__, mock_operation.__name__)))
for op_name in (NORMATIVE_CREATE, NORMATIVE_CONFIGURE, NORMATIVE_START,
NORMATIVE_STOP, NORMATIVE_DELETE)
)
return models.InterfaceTemplate(name=NORMATIVE_STANDARD_INTERFACE,
operation_templates=op_templates,
type=the_type)
def get_standard_interface(service):
the_type = service.service_template.interface_types.get_descendant('test_interface_type')
ops = dict(
(op_name, models.Operation(
name=op_name, implementation='{0}.{1}'.format(__file__, mock_operation.__name__)))
for op_name in (NORMATIVE_CREATE, NORMATIVE_CONFIGURE, NORMATIVE_START,
NORMATIVE_STOP, NORMATIVE_DELETE)
)
return {
NORMATIVE_STANDARD_INTERFACE:
models.Interface(name=NORMATIVE_STANDARD_INTERFACE, operations=ops, type=the_type)
}
def get_configure_interfaces(service):
the_type = service.service_template.interface_types.get_descendant('test_interface_type')
operations = dict(
(op_name, models.Operation(
name=op_name, implementation='{0}.{1}'.format(__file__, mock_operation.__name__)))
for op_name in (NORMATIVE_PRE_CONFIGURE_SOURCE,
NORMATIVE_POST_CONFIGURE_SOURCE,
NORMATIVE_ADD_SOURCE,
NORMATIVE_REMOVE_SOURCE,
NORMATIVE_PRE_CONFIGURE_TARGET,
NORMATIVE_POST_CONFIGURE_TARGET,
NORMATIVE_ADD_TARGET,
NORMATIVE_REMOVE_TARGET)
)
interface = {
NORMATIVE_CONFIGURE_INTERFACE: models.Interface(
name=NORMATIVE_CONFIGURE_INTERFACE, operations=operations, type=the_type)
}
return interface
@decorators.operation
def mock_operation(*args, **kwargs):
pass