blob: cdd35001c5ec99b4ae40a549208a0592b087d274 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
import pymongo
from ming import Session
from ming.orm.base import state
from ming.orm.ormsession import ThreadLocalORMSession, SessionExtension
from contextlib import contextmanager
from allura.lib.utils import chunked_list
from allura.tasks import index_tasks
log = logging.getLogger(__name__)
class ArtifactSessionExtension(SessionExtension):
def __init__(self, session):
SessionExtension.__init__(self, session)
self.objects_added = []
self.objects_modified = []
self.objects_deleted = []
def before_flush(self, obj=None):
if obj is None:
self.objects_added = list(
self.objects_modified = list(self.session.uow.dirty)
self.objects_deleted = list(self.session.uow.deleted)
else: # pragma no cover
st = state(obj)
if st.status ==
self.objects_added = [obj]
elif st.status == st.dirty:
self.objects_modified = [obj]
elif st.status == st.deleted:
self.objects_deleted = [obj]
def after_flush(self, obj=None):
"Update artifact references, and add/update this artifact to solr"
if not getattr(self.session, 'disable_artifact_index', False):
from pylons import app_globals as g
from .index import ArtifactReference, Shortlink
from .session import main_orm_session
# Ensure artifact references & shortlinks exist for new objects
arefs = []
arefs = [
for obj in self.objects_added + self.objects_modified]
for obj in self.objects_added + self.objects_modified:
# Flush shortlinks
except Exception:
"Failed to update artifact references. Is this a borked project migration?")
self.update_index(self.objects_deleted, arefs)
for obj in self.objects_added:
g.zarkov_event('create', extra=obj.index_id())
for obj in self.objects_modified:
g.zarkov_event('modify', extra=obj.index_id())
for obj in self.objects_deleted:
g.zarkov_event('delete', extra=obj.index_id())
self.objects_added = []
self.objects_modified = []
self.objects_deleted = []
def update_index(self, objects_deleted, arefs):
# Post delete and add indexing operations
if objects_deleted:
[obj.index_id() for obj in objects_deleted])
if arefs:[aref._id for aref in arefs])
class BatchIndexer(ArtifactSessionExtension):
Tracks needed search index operations over the life of a
:class:`ming.odm.session.ThreadLocalODMSession` session, and performs them
in a batch when :meth:`flush` is called.
to_delete = set()
to_add = set()
def __init__(self, session):
ArtifactSessionExtension.__init__(self, session)
def update_index(self, objects_deleted, arefs_added):
Caches adds and deletes for handling later. Called after each flush of
the parent session.
:param objects_deleted: :class:`allura.model.artifact.Artifact`
instances that were deleted in the flush.
:param arefs_added: :class:`allura.model.artifact.ArtifactReference`
instances for all ``Artifact`` instances that were added or modified
in the flush.
from .index import ArtifactReference
del_index_ids = [obj.index_id() for obj in objects_deleted]
deleted_aref_ids = [aref._id for aref in
ArtifactReference.query.find(dict(_id={'$in': del_index_ids}))]
cls = self.__class__
cls.to_add -= set(deleted_aref_ids)
cls.to_delete |= set(del_index_ids)
cls.to_add |= set([aref._id for aref in arefs_added])
def flush(cls):
Creates indexing tasks for cached adds and deletes, and resets the
.. warning:: This method is NOT called automatically when the parent
session is flushed. It MUST be called explicitly.
# Post in chunks to avoid overflowing the max BSON document
# size when the Monq task is created:
# cls.to_delete - contains solr index ids which can easily be over
# 100 bytes. Here we allow for 160 bytes avg, plus
# room for other document overhead.
# cls.to_add - contains BSON ObjectIds, which are 12 bytes each, so
# we can easily put 1m in a doc with room left over.
if cls.to_delete:
for chunk in chunked_list(list(cls.to_delete), 100 * 1000):
cls._post(index_tasks.del_artifacts, chunk)
if cls.to_add:
for chunk in chunked_list(list(cls.to_add), 1000 * 1000):
cls._post(index_tasks.add_artifacts, chunk)
cls.to_delete = set()
cls.to_add = set()
def _post(cls, task_func, chunk):
Post task, recursively splitting and re-posting if the resulting
mongo document is too large.
except pymongo.errors.InvalidDocument as e:
# there are many types of InvalidDocument, only recurse if its
# expected to help
if str(e).startswith('BSON document too large'):
cls._post(task_func, chunk[:len(chunk) // 2])
cls._post(task_func, chunk[len(chunk) // 2:])
def substitute_extensions(session, extensions=None):
Temporarily replace the extensions on a
:class:`ming.odm.session.ThreadLocalODMSession` session.
original_exts = session._kwargs.get('extensions', [])
def _set_exts(exts):
session._kwargs['extensions'] = exts
_set_exts(extensions or [])
yield session
main_doc_session = Session.by_name('main')
project_doc_session = Session.by_name('project')
task_doc_session = Session.by_name('task')
main_orm_session = ThreadLocalORMSession(main_doc_session)
project_orm_session = ThreadLocalORMSession(project_doc_session)
task_orm_session = ThreadLocalORMSession(task_doc_session)
artifact_orm_session = ThreadLocalORMSession(
repository_orm_session = ThreadLocalORMSession(