# blob: 449e282dd0ff55d3ea591290444d1c54b2cc7826 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Operation contexts.
"""
import threading
import aria
from aria.utils import file
from . import common
class BaseOperationContext(common.BaseContext):
Base class for contexts used during operation creation and execution.
# NOTE(review): this copy of the class has lost its indentation and docstring
# quoting, and several statements are cut short (each one is flagged below).
# The code lines are left exactly as found; restore the missing pieces from the
# upstream file rather than guessing at them.

# Stores the task and actor IDs, creates per-thread storage, pops the
# 'destroy_session' (default False) and 'logger_level' (default None) options
# out of kwargs, and forwards the remaining kwargs to the parent initializer
# before registering the logger.
def __init__(self, task_id, actor_id, **kwargs):
self._task_id = task_id
self._actor_id = actor_id
self._thread_local = threading.local()
self._destroy_session = kwargs.pop('destroy_session', False)
logger_level = kwargs.pop('logger_level', None)
super(BaseOperationContext, self).__init__(**kwargs)
# NOTE(review): the first argument of this call is missing in this copy
# (the parenthesis is followed directly by a comma) -- TODO restore.
self._register_logger(, level=logger_level)
def __repr__(self):
# NOTE(review): truncated -- the string below ends in a line continuation with
# nothing after it, and the format() call on the return line is never closed.
details = u'function={task.function}; ' \
return u'{name}({0})'.format(details,
# NOTE(review): read elsewhere in this class as an attribute (self.task.plugin),
# so this is presumably decorated with @property upstream -- the decorator is
# not visible here; confirm.
def task(self):
The task in the model storage.
# SQLAlchemy prevents from accessing an object which was created on a different thread.
# So we retrieve the object from the storage if the current thread isn't the same as the
# original thread.
# The fetched task is kept on the threading.local object, so each thread does
# the lookup once and then reuses its own copy.
if not hasattr(self._thread_local, 'task'):
self._thread_local.task = self.model.task.get(self._task_id)
return self._thread_local.task
def plugin_workdir(self):
A work directory that is unique to the plugin and the service ID.
# Returns None when the task has no plugin.
if self.task.plugin is None:
return None
# NOTE(review): truncated -- only the first of the three format() arguments
# survives and the call is never closed. The module-level import of
# aria.utils.file is not used anywhere in the visible code, so a statement
# using it may also have been lost around here -- confirm against upstream.
plugin_workdir = u'{0}/plugins/{1}/{2}'.format(self._workdir,,
return plugin_workdir
# Builds a dict pairing this context's class with the values needed to
# construct it again; the model and resource storages contribute their own
# serialization_dict, or None when they are falsy.
def serialization_dict(self):
context_dict = {
'service_id': self._service_id,
'task_id': self._task_id,
'actor_id': self._actor_id,
'workdir': self._workdir,
'model_storage': self.model.serialization_dict if self.model else None,
'resource_storage': self.resource.serialization_dict if self.resource else None,
'execution_id': self._execution_id,
'logger_level': self.logger.level
# NOTE(review): the closing brace of context_dict is missing in this copy, as is
# the closing brace of the returned dict below.
return {
'context_cls': self.__class__,
'context': context_dict
# NOTE(review): the first parameter is `cls`, so this is presumably a
# @classmethod upstream -- the decorator is not visible here; confirm.
# When storage keyword dicts are given, they are expanded into
# aria.application_model_storage / aria.application_resource_storage calls.
def instantiate_from_dict(cls, model_storage=None, resource_storage=None, **kwargs):
if model_storage:
model_storage = aria.application_model_storage(**model_storage)
if resource_storage:
resource_storage = aria.application_resource_storage(**resource_storage)
# NOTE(review): truncated -- the remaining arguments and the closing
# parenthesis of this call are missing.
return cls(model_storage=model_storage,
def close(self):
# NOTE(review): the body of this `if` is missing in this copy, so what is
# released when _destroy_session is set cannot be determined from here.
if self._destroy_session:
class NodeOperationContext(BaseOperationContext):
    """
    Context for node operations.
    """

    @property
    def node(self):
        """
        The node of the current operation.

        Fetched from the model storage by the actor ID on each access.
        """
        return self.model.node.get(self._actor_id)

    @property
    def node_template(self):
        """
        The node template of the current operation.
        """
        # ``node`` is read as a plain attribute, which is why it is a property.
        return self.node.node_template
class RelationshipOperationContext(BaseOperationContext):
    """
    Context for relationship operations.
    """

    @property
    def relationship(self):
        """
        The relationship instance of the current operation.

        Fetched from the model storage by the actor ID on each access.
        """
        return self.model.relationship.get(self._actor_id)

    @property
    def source_node(self):
        """
        The relationship source node.
        """
        # ``relationship`` is read as a plain attribute, hence a property.
        return self.relationship.source_node

    @property
    def source_node_template(self):
        """
        The relationship source node template.
        """
        return self.source_node.node_template

    @property
    def target_node(self):
        """
        The relationship target node.
        """
        return self.relationship.target_node

    @property
    def target_node_template(self):
        """
        The relationship target node template.
        """
        return self.target_node.node_template