blob: 4c4e0140abb2170016d61ca5d4ea055c32c159f9 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ...utils.formatting import (json_dumps, yaml_dumps)
from ..loading import UriLocation
from ..presentation import PresenterNotFoundError
from .consumer import Consumer
# Module-level cache of presentations, keyed by canonical location. ``Read._present`` consults it
# (only when ``context.presentation.cache`` is set) and ``_Result.cache`` stores merged
# presentations in it.
PRESENTATION_CACHE = {}
# Module-level cache of canonical locations, keyed by ``(origin_canonical_location, location)``
# tuples. It is read and written by ``Read._create_loader`` to avoid asking the loader again.
CANONICAL_LOCATION_CACHE = {}
class Read(Consumer):
    """
    Reads the presentation, handling imports recursively.

    It works by consuming a data source via appropriate :class:`~aria.parser.loading.Loader`,
    :class:`~aria.parser.reading.Reader`, and :class:`~aria.parser.presentation.Presenter`
    instances.

    It supports agnostic raw data composition for presenters that have
    ``_get_import_locations``, ``_validate_import``, and ``_merge_import``.

    To improve performance, loaders are called asynchronously on separate threads.

    Note that parsing may internally trigger more than one loading/reading/presentation
    cycle, for example if the agnostic raw data has dependencies that must also be parsed.
    """

    def __init__(self, context):
        super(Read, self).__init__(context)
        # Presentations created by this consumer, keyed by canonical location
        self._cache = {}

    def consume(self):
        """
        Presents the main location and its imports, merges them, optionally stores the merged
        presentations in the global cache, and publishes the main presentation on the context.

        Does nothing more if there was no location to present (that issue is reported by
        :meth:`_present_all`).
        """
        # Present the main location and all imports recursively
        main_result, all_results = self._present_all()

        if main_result is None:
            # Nothing was presented, so there is nothing to merge, cache, or publish
            return

        # Merge presentations
        main_result.merge(all_results, self.context)

        # Cache merged presentations
        if self.context.presentation.cache:
            for result in all_results:
                result.cache()

        self.context.presentation.presenter = main_result.presentation
        if main_result.canonical_location is not None:
            self.context.presentation.location = main_result.canonical_location

    def dump(self):
        """
        Writes the presentation to the context: the raw data as YAML or JSON if the ``yaml`` or
        ``json`` argument switch is set (``yaml`` wins if both are), otherwise whatever the
        presenter's own ``_dump`` produces.
        """
        if self.context.has_arg_switch('yaml'):
            dumps = yaml_dumps
        elif self.context.has_arg_switch('json'):
            dumps = json_dumps
        else:
            self.context.presentation.presenter._dump(self.context)
            return

        indent = self.context.get_arg_value_int('indent', 2)
        raw = self.context.presentation.presenter._raw
        self.context.write(dumps(raw, indent=indent))

    def _handle_exception(self, e):
        """
        Silently drops :class:`_CancelPresentation`, which only signals a skipped presentation;
        every other exception is handed to the base class.
        """
        if isinstance(e, _CancelPresentation):
            return
        super(Read, self)._handle_exception(e)

    def _present_all(self):
        """
        Presents all locations, including all nested imports, from the main location. Uses a thread
        pool executor for best performance.

        The main presentation is returned separately for easier access.

        :returns: ``(main_result, all_results)``, where ``all_results`` is a list that starts with
                  ``main_result``; ``(None, [])`` if the context has no location
        """
        location = self.context.presentation.location

        if location is None:
            self.context.validation.report('Read consumer: missing location')
            # Always return a pair so that callers can safely unpack the result
            return None, []

        executor = self.context.presentation.create_executor()
        try:
            # This call may recursively submit tasks to the executor if there are imports
            main_result = self._present(location, None, None, executor)

            # Wait for all tasks to complete
            executor.drain()

            # Handle exceptions
            for e in executor.exceptions:
                self._handle_exception(e)

            all_results = executor.returns or []
        finally:
            executor.close()

        all_results.insert(0, main_result)

        return main_result, all_results

    def _present(self, location, origin_canonical_location, origin_presenter_class, executor):
        """
        Presents a single location. If the location has imports, those are submitted to the thread
        pool executor.

        Supports a presentation cache based on the canonical location as cache key.

        :param location: location to present
        :param origin_canonical_location: canonical location of the importing presentation, or
                                          ``None`` for the main location
        :param origin_presenter_class: presenter class of the importing presentation, used as a
                                       fallback if no presenter is found for the raw data
        :param executor: executor to which nested imports are submitted
        :returns: a :class:`_Result` wrapping the (possibly cached) presentation
        :raises _CancelPresentation: if the location is the same as its origin (self import)
        """
        # Link the context to this thread
        self.context.set_thread_local()

        # Canonicalize the location
        if self.context.reading.reader is None:
            loader, canonical_location = self._create_loader(location, origin_canonical_location)
        else:
            # If a reader is specified in the context then we skip loading
            loader = None
            canonical_location = location

        # Skip self imports
        # NOTE(review): for the main location the origin is None, so this also fires when the
        # loader could not provide a canonical location (None == None) -- confirm that is intended
        if canonical_location == origin_canonical_location:
            raise _CancelPresentation()

        if self.context.presentation.cache:
            # Is the presentation in the global cache?
            try:
                presentation = PRESENTATION_CACHE[canonical_location]
                return _Result(presentation, canonical_location, origin_canonical_location)
            except KeyError:
                pass

        try:
            # Is the presentation in the local cache?
            presentation = self._cache[canonical_location]
            return _Result(presentation, canonical_location, origin_canonical_location)
        except KeyError:
            pass

        # Create and cache new presentation
        presentation = self._create_presentation(canonical_location, loader, origin_presenter_class)
        self._cache[canonical_location] = presentation

        # Submit imports to executor
        if hasattr(presentation, '_get_import_locations'):
            import_locations = presentation._get_import_locations(self.context)
            if import_locations:
                for import_location in import_locations:
                    import_location = UriLocation(import_location)
                    executor.submit(self._present, import_location, canonical_location,
                                    presentation.__class__, executor)

        return _Result(presentation, canonical_location, origin_canonical_location)

    def _create_loader(self, location, origin_canonical_location):
        """
        Gets a loader for the location and resolves the location's canonical form.

        :returns: ``(loader, canonical_location)``; the canonical location is ``None`` if the
                  loader raises ``NotImplementedError`` when asked for it
        """
        loader = self.context.loading.loader_source.get_loader(self.context.loading, location,
                                                               origin_canonical_location)

        if origin_canonical_location is not None:
            # The cache key is a combination of the canonical location of the origin, which is
            # globally absolute and never changes, and our location, which might be relative to
            # the origin's location
            cache_key = (origin_canonical_location, location)
            try:
                canonical_location = CANONICAL_LOCATION_CACHE[cache_key]
                return loader, canonical_location
            except KeyError:
                pass
        else:
            # Without an origin there is no stable key, so the result is not cached
            cache_key = None

        try:
            canonical_location = loader.get_canonical_location()
        except NotImplementedError:
            canonical_location = None

        # Because retrieving the canonical location can be costly, we will try to cache it
        if cache_key is not None:
            CANONICAL_LOCATION_CACHE[cache_key] = canonical_location

        return loader, canonical_location

    def _create_presentation(self, canonical_location, loader, default_presenter_class):
        """
        Reads the raw data and wraps it in a presenter instance.

        The reader and presenter class set on the context take precedence; otherwise they are
        looked up from the context's reader and presenter sources.

        :param default_presenter_class: presenter class to fall back on if the presenter source
                                        finds none for the raw data; may be ``None``
        :raises PresenterNotFoundError: if no presenter class could be determined
        """
        # The reader we specified in the context will override
        reader = self.context.reading.reader

        if reader is None:
            # Read raw data from loader
            reader = self.context.reading.reader_source.get_reader(self.context.reading,
                                                                   canonical_location, loader)

        raw = reader.read()

        # Wrap raw data in presenter class
        if self.context.presentation.presenter_class is not None:
            # The presenter class we specified in the context will override
            presenter_class = self.context.presentation.presenter_class
        else:
            try:
                presenter_class = self.context.presentation.presenter_source.get_presenter(raw)
            except PresenterNotFoundError:
                if default_presenter_class is None:
                    raise
                presenter_class = default_presenter_class

        if presenter_class is None:
            raise PresenterNotFoundError(u'presenter not found: {0}'.format(canonical_location))

        presentation = presenter_class(raw=raw)

        if hasattr(presentation, '_link_locators'):
            presentation._link_locators()

        return presentation
class _Result(object):
    """
    Outcome of a single :meth:`Read._present` call: the presentation plus the bookkeeping needed
    for merging imports and for caching.
    """

    def __init__(self, presentation, canonical_location, origin_canonical_location):
        self.presentation = presentation
        self.canonical_location = canonical_location
        # Canonical location of the result that imported this one
        self.origin_canonical_location = origin_canonical_location
        # Set by merge() once this result (or another one sharing its presentation) was merged
        self.merged = False

    def get_imports(self, results):
        """
        Returns the results whose origin is this result's canonical location, in their original
        order, keeping only the first result for each canonical location.
        """
        collected = []
        for candidate in results:
            if candidate.origin_canonical_location == self.canonical_location:
                already_there = any(kept.canonical_location == candidate.canonical_location
                                    for kept in collected)
                if not already_there:
                    collected.append(candidate)
        return collected

    def merge(self, results, context):
        """
        Merges this result's imports into its presentation, depth first.

        Each import is merged itself before it is validated (``_validate_import``) and merged
        (``_merge_import``) into this presentation; both hooks are optional on the presentation.
        A result is merged at most once.
        """
        if self.merged:
            # Already handled, possibly through another result sharing the presentation
            return
        self.merged = True

        # Flag every result that wraps the same presentation so it is not merged again
        for other in results:
            if other.presentation == self.presentation:
                other.merged = True

        for imported in self.get_imports(results):
            # The import must be complete before it can be merged into us
            imported.merge(results, context)

            target = self.presentation

            # An invalid import is skipped; _validate_import reports the issue itself
            if hasattr(target, '_validate_import') and \
                    not target._validate_import(context, imported.presentation):
                continue

            if hasattr(target, '_merge_import'):
                target._merge_import(imported.presentation)

    def cache(self):
        """
        Stores the presentation in the global cache under its canonical location; results that
        were not merged are not cached.
        """
        if self.merged:
            PRESENTATION_CACHE[self.canonical_location] = self.presentation
class _CancelPresentation(Exception):
    """
    Raised by :meth:`Read._present` to skip a location that is the same as its origin (a self
    import); :meth:`Read._handle_exception` ignores it.
    """