blob: 8c073134cf95bee4f3489331824854aa197606f1 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import logging
import pymongo
from ming import Session
from ming.orm.base import state
from ming.orm.ormsession import ThreadLocalORMSession, SessionExtension
from contextlib import contextmanager
from allura.lib.utils import chunked_list
from allura.tasks import index_tasks
log = logging.getLogger(__name__)
class ArtifactSessionExtension(SessionExtension):
    """
    Session extension that records which objects a flush touches and, once
    the flush is done, updates artifact references / shortlinks and posts
    the corresponding search-index tasks.
    """

    def __init__(self, session):
        SessionExtension.__init__(self, session)
        # Snapshots taken in before_flush() and consumed in after_flush().
        self.objects_added = []
        self.objects_modified = []
        self.objects_deleted = []

    def before_flush(self, obj=None):
        """
        Record the objects that are about to be flushed.

        :param obj: when ``None``, the whole unit of work is snapshotted
            (``uow.new``, ``uow.dirty`` and ``uow.deleted``).  Otherwise only
            ``obj`` is recorded, in the list matching its current state; the
            other two lists are left as they were.
        """
        if obj is None:
            uow = self.session.uow
            self.objects_added = list(uow.new)
            self.objects_modified = list(uow.dirty)
            self.objects_deleted = list(uow.deleted)
        else:  # pragma no cover
            st = state(obj)
            if st.status == st.new:
                self.objects_added = [obj]
            elif st.status == st.dirty:
                self.objects_modified = [obj]
            elif st.status == st.deleted:
                self.objects_deleted = [obj]

    def after_flush(self, obj=None):
        """
        Update artifact references, and add/update this artifact to solr.

        Skipped entirely when the session has a truthy
        ``disable_artifact_index`` attribute.  The recorded object lists are
        emptied afterwards.
        """
        if not getattr(self.session, 'disable_artifact_index', False):
            from pylons import app_globals as g
            from .index import ArtifactReference, Shortlink
            from .session import main_orm_session
            # Ensure artifact references & shortlinks exist for new objects
            changed = self.objects_added + self.objects_modified
            arefs = []
            try:
                arefs = [ArtifactReference.from_artifact(o) for o in changed]
                for artifact in changed:
                    Shortlink.from_artifact(artifact)
                # Flush shortlinks
                main_orm_session.flush()
            except Exception:
                # Deliberately best-effort: log and carry on so that the
                # index update and events below still happen.
                log.exception(
                    "Failed to update artifact references. Is this a borked project migration?")
            self.update_index(self.objects_deleted, arefs)
            for artifact in self.objects_added:
                g.zarkov_event('create', extra=artifact.index_id())
            for artifact in self.objects_modified:
                g.zarkov_event('modify', extra=artifact.index_id())
            for artifact in self.objects_deleted:
                g.zarkov_event('delete', extra=artifact.index_id())
        self.objects_added = []
        self.objects_modified = []
        self.objects_deleted = []

    def update_index(self, objects_deleted, arefs):
        """
        Post the delete and add indexing tasks for one flush.

        :param objects_deleted: objects removed in the flush; their
            ``index_id()`` values are posted to ``del_artifacts``.
        :param arefs: artifact references whose ``_id`` values are posted to
            ``add_artifacts``.
        """
        # Post delete and add indexing operations
        if objects_deleted:
            index_tasks.del_artifacts.post(
                [obj.index_id() for obj in objects_deleted])
        add_task = None
        if arefs:
            add_task = index_tasks.add_artifacts.post(
                [aref._id for aref in arefs])
        # Extra diagnostics, emitted only while running the tracker
        # bulk_edit task.  This is best-effort: any ordinary failure here
        # (e.g. no request available) must not break the flush.
        try:
            debug_log = logging.getLogger('allura.debug7047')
            from tg import request
            task = request.environ.get('task')
            if task and task.task_name == 'forgetracker.tasks.bulk_edit':
                debug_log.debug('session: %s %s',
                                self.session.impl.db, self.session)
                debug_log.debug('arefs: %s', arefs)
                debug_log.debug('objects_added: %s',
                                [o._id for o in self.objects_added])
                debug_log.debug('objects_modified: %s',
                                [o._id for o in self.objects_modified])
                debug_log.debug('objects_deleted: %s',
                                [o._id for o in self.objects_deleted])
                debug_log.debug('add_artifacts task: %s', add_task)
        except Exception:
            # Narrower than a bare ``except:`` so that KeyboardInterrupt and
            # SystemExit are no longer swallowed.
            pass
class BatchIndexer(ArtifactSessionExtension):
    """
    Tracks needed search index operations over the life of a
    :class:`ming.odm.session.ThreadLocalODMSession` session, and performs them
    in a batch when :meth:`flush` is called.
    """
    # Pending operations are stored on the class (not the instance), so
    # they accumulate across every instance until flush() resets them.
    to_delete = set()
    to_add = set()

    def __init__(self, session):
        ArtifactSessionExtension.__init__(self, session)

    def update_index(self, objects_deleted, arefs_added):
        """
        Caches adds and deletes for handling later.  Called after each flush of
        the parent session.

        :param objects_deleted: :class:`allura.model.artifact.Artifact`
            instances that were deleted in the flush.
        :param arefs_added: :class:`allura.model.artifact.ArtifactReference`
            instances for all ``Artifact`` instances that were added or
            modified in the flush.
        """
        from .index import ArtifactReference
        del_index_ids = [obj.index_id() for obj in objects_deleted]
        deleted_aref_ids = [
            aref._id for aref in
            ArtifactReference.query.find(dict(_id={'$in': del_index_ids}))]
        cls = self.__class__
        # Anything deleted in this flush no longer needs a pending add.
        cls.to_add -= set(deleted_aref_ids)
        cls.to_delete |= set(del_index_ids)
        cls.to_add |= set(aref._id for aref in arefs_added)

    @classmethod
    def flush(cls):
        """
        Creates indexing tasks for cached adds and deletes, and resets the
        caches.

        .. warning:: This method is NOT called automatically when the parent
           session is flushed. It MUST be called explicitly.
        """
        # Post in chunks to avoid overflowing the max BSON document
        # size when the Monq task is created:
        # cls.to_delete - contains solr index ids which can easily be over
        #                 100 bytes. Here we allow for 160 bytes avg, plus
        #                 room for other document overhead.
        # cls.to_add - contains BSON ObjectIds, which are 12 bytes each, so
        #              we can easily put 1m in a doc with room left over.
        if cls.to_delete:
            for chunk in chunked_list(list(cls.to_delete), 100 * 1000):
                cls._post(index_tasks.del_artifacts, chunk)
        if cls.to_add:
            for chunk in chunked_list(list(cls.to_add), 1000 * 1000):
                cls._post(index_tasks.add_artifacts, chunk)
        cls.to_delete = set()
        cls.to_add = set()

    @classmethod
    def _post(cls, task_func, chunk):
        """
        Post task, recursively splitting and re-posting if the resulting
        mongo document is too large.

        :param task_func: task object whose ``post`` method is called with
            the chunk.
        :param chunk: list of ids to post.
        :raises pymongo.errors.InvalidDocument: for any error other than
            "document too large", or when a chunk of at most one item is
            still too large (splitting cannot help any further).
        """
        try:
            task_func.post(chunk)
        except pymongo.errors.InvalidDocument as e:
            # there are many types of InvalidDocument, only recurse if its
            # expected to help
            too_large = str(e).startswith('BSON document too large')
            # A chunk of 0 or 1 items cannot be made smaller.  Halving it
            # would re-post the very same chunk and recurse without end.
            if not too_large or len(chunk) <= 1:
                raise
            middle = len(chunk) // 2
            cls._post(task_func, chunk[:middle])
            cls._post(task_func, chunk[middle:])
@contextmanager
def substitute_extensions(session, extensions=None):
    """
    Temporarily replace the extensions on a
    :class:`ming.odm.session.ThreadLocalODMSession` session.

    :param session: session whose ``_kwargs['extensions']`` entry is swapped
        for the duration of the ``with`` block.
    :param extensions: extensions to use inside the block; ``None`` (or any
        falsy value) means no extensions.

    Yields the same ``session`` object.  The original extensions are put
    back when the block exits, including when it exits with an exception.
    """
    original_exts = session._kwargs.get('extensions', [])

    def _set_exts(exts):
        # Flush and close first, then swap the extensions entry.
        session.flush()
        session.close()
        session._kwargs['extensions'] = exts

    _set_exts(extensions or [])
    try:
        yield session
    finally:
        # Without the ``finally`` an exception raised inside the ``with``
        # block would leave the substituted extensions in place for good.
        _set_exts(original_exts)
# Document-level sessions, looked up by their names.
main_doc_session = Session.by_name('main')
project_doc_session = Session.by_name('project')
task_doc_session = Session.by_name('task')
# Thread-local ORM sessions, one wrapping each document session above.
main_orm_session = ThreadLocalORMSession(main_doc_session)
project_orm_session = ThreadLocalORMSession(project_doc_session)
task_orm_session = ThreadLocalORMSession(task_doc_session)
# ORM session over the 'project' document session; the only session here
# that is configured with ArtifactSessionExtension.
artifact_orm_session = ThreadLocalORMSession(
    doc_session=project_doc_session,
    extensions=[ArtifactSessionExtension])
# ORM session over the 'main' document session with an explicitly empty
# extension list.
repository_orm_session = ThreadLocalORMSession(
    doc_session=main_doc_session,
    extensions=[])