blob: 7d5f40cc1f794c1043995d6580daa250049548d4 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Operation contexts.
"""
import threading
from contextlib import contextmanager
import aria
from aria.utils import file
from . import common
class BaseOperationContext(common.BaseContext):
"""
Base class for contexts used during operation creation and execution.
"""
def __init__(self, task_id, actor_id, **kwargs):
self._task_id = task_id
self._actor_id = actor_id
self._thread_local = threading.local()
self._destroy_session = kwargs.pop('destroy_session', False)
logger_level = kwargs.pop('logger_level', None)
super(BaseOperationContext, self).__init__(**kwargs)
self._register_logger(task_id=self.task.id, level=logger_level)
def __repr__(self):
details = 'function={task.function}; ' \
'operation_arguments={task.arguments}'\
.format(task=self.task)
return '{name}({0})'.format(details, name=self.name)
@property
def task(self):
"""
The task in the model storage
:return: Task model
"""
# SQLAlchemy prevents from accessing an object which was created on a different thread.
# So we retrieve the object from the storage if the current thread isn't the same as the
# original thread.
if not hasattr(self._thread_local, 'task'):
self._thread_local.task = self.model.task.get(self._task_id)
return self._thread_local.task
@property
def plugin_workdir(self):
"""
A work directory that is unique to the plugin and the deployment id
"""
if self.task.plugin is None:
return None
plugin_workdir = '{0}/plugins/{1}/{2}'.format(self._workdir,
self.service.id,
self.task.plugin.name)
file.makedirs(plugin_workdir)
return plugin_workdir
@property
def serialization_dict(self):
context_dict = {
'name': self.name,
'service_id': self._service_id,
'task_id': self._task_id,
'actor_id': self._actor_id,
'workdir': self._workdir,
'model_storage': self.model.serialization_dict if self.model else None,
'resource_storage': self.resource.serialization_dict if self.resource else None,
'execution_id': self._execution_id,
'logger_level': self.logger.level
}
return {
'context_cls': self.__class__,
'context': context_dict
}
@classmethod
def instantiate_from_dict(cls, model_storage=None, resource_storage=None, **kwargs):
if model_storage:
model_storage = aria.application_model_storage(**model_storage)
if resource_storage:
resource_storage = aria.application_resource_storage(**resource_storage)
return cls(model_storage=model_storage,
resource_storage=resource_storage,
destroy_session=True,
**kwargs)
def close(self):
if self._destroy_session:
self.model.log._session.remove()
self.model.log._engine.dispose()
@property
@contextmanager
def persist_changes(self):
yield
self.model.task.update(self.task)
class NodeOperationContext(BaseOperationContext):
"""
Context for node operations.
"""
@property
def node_template(self):
"""
the node of the current operation
:return:
"""
return self.node.node_template
@property
def node(self):
"""
The node instance of the current operation
:return:
"""
return self.model.node.get(self._actor_id)
class RelationshipOperationContext(BaseOperationContext):
"""
Context for relationship operations.
"""
@property
def source_node_template(self):
"""
The source node
:return:
"""
return self.source_node.node_template
@property
def source_node(self):
"""
The source node instance
:return:
"""
return self.relationship.source_node
@property
def target_node_template(self):
"""
The target node
:return:
"""
return self.target_node.node_template
@property
def target_node(self):
"""
The target node instance
:return:
"""
return self.relationship.target_node
@property
def relationship(self):
"""
The relationship instance of the current operation
:return:
"""
return self.model.relationship.get(self._actor_id)