blob: a5efaa8f528f3a6f4d8851636099581657814667 [file] [log] [blame]
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from datetime import datetime
import os
import shutil
import tempfile
from trac.util.datefmt import FixedOffset, utc
from bhsearch.api import ASC, DESC, SCORE, SortInstruction
from bhsearch.query_parser import DefaultQueryParser
from bhsearch.tests import unittest
from bhsearch.tests.base import BaseBloodhoundSearchTest
from bhsearch.whoosh_backend import WhooshBackend, \
WhooshEmptyFacetErrorWorkaround
from whoosh import index, query, sorting
from whoosh.fields import ID, KEYWORD, TEXT, Schema
from whoosh.qparser import MultifieldParser, MultifieldPlugin, PhrasePlugin, \
QueryParser, WhitespacePlugin
class WhooshBackendTestCase(BaseBloodhoundSearchTest):
    """Tests for querying documents through ``WhooshBackend``.

    Covers retrieval, field selection, sorting, facets, filters, datetime
    round-tripping and highlighting against a freshly recreated index.
    """

    def setUp(self):
        """Create a backend with a recreated index plus a query parser."""
        super(WhooshBackendTestCase, self).setUp()
        self.whoosh_backend = WhooshBackend(self.env)
        self.whoosh_backend.recreate_index()
        self.parser = DefaultQueryParser(self.env)

    def test_can_retrieve_docs(self):
        """Added docs are returned in ascending ``id`` order."""
        self.whoosh_backend.add_doc(dict(id="1", type="ticket"))
        self.whoosh_backend.add_doc(dict(id="2", type="ticket"))
        result = self.whoosh_backend.query(
            query.Every(),
            sort=[SortInstruction("id", ASC)],
        )
        self.print_result(result)
        self.assertEqual(2, result.hits)
        docs = result.docs
        # NOTE(review): with an explicit sort the expected 'score' is a
        # string equal to the id -- presumably the sort key; confirm.
        self.assertEqual(
            {'id': u'1', 'type': u'ticket', 'unique_id': u'empty:ticket:1',
             'score': u'1'},
            docs[0])
        self.assertEqual(
            {'id': u'2', 'type': u'ticket', 'unique_id': u'empty:ticket:2',
             'score': u'2'},
            docs[1])

    def test_can_return_all_fields(self):
        """Without a ``fields`` argument every field plus score is returned."""
        self.whoosh_backend.add_doc(dict(id="1", type="ticket"))
        result = self.whoosh_backend.query(query.Every())
        self.print_result(result)
        docs = result.docs
        self.assertEqual(
            {'id': u'1', 'type': u'ticket', 'unique_id': u'empty:ticket:1',
             "score": 1.0},
            docs[0])

    def test_can_select_fields(self):
        """Only the fields named in ``fields`` appear in returned docs."""
        self.whoosh_backend.add_doc(dict(id="1", type="ticket"))
        result = self.whoosh_backend.query(query.Every(),
                                           fields=("id", "type"))
        self.print_result(result)
        docs = result.docs
        self.assertEqual(
            {'id': '1', 'type': 'ticket'},
            docs[0])

    def test_can_survive_after_restart(self):
        """A second backend instance sees docs added through the first."""
        self.whoosh_backend.add_doc(dict(id="1", type="ticket"))
        whoosh_backend2 = WhooshBackend(self.env)
        whoosh_backend2.add_doc(dict(id="2", type="ticket"))
        result = whoosh_backend2.query(query.Every())
        self.assertEqual(2, result.hits)

    def test_can_apply_multiple_sort_conditions_asc(self):
        """Docs sort by ``type`` ascending, then ``id`` ascending."""
        self.whoosh_backend.add_doc(dict(id="2", type="ticket2"))
        self.whoosh_backend.add_doc(dict(id="3", type="ticket1"))
        self.whoosh_backend.add_doc(dict(id="4", type="ticket3"))
        self.whoosh_backend.add_doc(dict(id="1", type="ticket1"))
        result = self.whoosh_backend.query(
            query.Every(),
            sort=[SortInstruction("type", ASC), SortInstruction("id", ASC)],
            fields=("id", "type"),
        )
        self.print_result(result)
        self.assertEqual([{'type': 'ticket1', 'id': '1'},
                          {'type': 'ticket1', 'id': '3'},
                          {'type': 'ticket2', 'id': '2'},
                          {'type': 'ticket3', 'id': '4'}],
                         result.docs)

    def test_can_apply_multiple_sort_conditions_desc(self):
        """Docs sort by ``type`` ascending, then ``id`` descending."""
        self.whoosh_backend.add_doc(dict(id="2", type="ticket2"))
        self.whoosh_backend.add_doc(dict(id="3", type="ticket1"))
        self.whoosh_backend.add_doc(dict(id="4", type="ticket3"))
        self.whoosh_backend.add_doc(dict(id="1", type="ticket1"))
        result = self.whoosh_backend.query(
            query.Every(),
            sort=[SortInstruction("type", ASC), SortInstruction("id", DESC)],
            fields=("id", "type"),
        )
        self.print_result(result)
        self.assertEqual([{'type': 'ticket1', 'id': '3'},
                          {'type': 'ticket1', 'id': '1'},
                          {'type': 'ticket2', 'id': '2'},
                          {'type': 'ticket3', 'id': '4'}],
                         result.docs)

    def test_can_sort_by_score_and_date(self):
        """Sorting by SCORE then ``time`` DESC puts the exact match first."""
        the_first_date = datetime(2012, 12, 1)
        the_second_date = datetime(2012, 12, 2)
        the_third_date = datetime(2012, 12, 3)
        exact_match_string = "texttofind"
        not_exact_match_string = "texttofind bla"
        self.whoosh_backend.add_doc(dict(
            id="1",
            type="ticket",
            summary=not_exact_match_string,
            time=the_first_date,
        ))
        self.whoosh_backend.add_doc(dict(
            id="2",
            type="ticket",
            summary=exact_match_string,
            time=the_second_date,
        ))
        self.whoosh_backend.add_doc(dict(
            id="3",
            type="ticket",
            summary=not_exact_match_string,
            time=the_third_date,
        ))
        # This document must not match the query at all.
        self.whoosh_backend.add_doc(dict(
            id="4",
            type="ticket",
            summary="some text out of search scope",
            time=the_third_date,
        ))
        parsed_query = self.parser.parse("summary:texttofind")
        result = self.whoosh_backend.query(
            parsed_query,
            sort=[
                SortInstruction(SCORE, ASC),
                SortInstruction("time", DESC),
            ],
        )
        self.print_result(result)
        self.assertEqual(3, result.hits)
        docs = result.docs
        # Must be found first: it has the highest score (exact match).
        self.assertEqual("2", docs[0]["id"])
        # Must be found second: equal score, later time (time is DESC).
        self.assertEqual("3", docs[1]["id"])
        # Must be found third: equal score, earliest time.
        self.assertEqual("1", docs[2]["id"])

    def test_can_do_facet_count(self):
        """Facet counts are reported per requested field."""
        self.whoosh_backend.add_doc(dict(id="1", type="ticket", product="A"))
        self.whoosh_backend.add_doc(dict(id="2", type="ticket", product="B"))
        self.whoosh_backend.add_doc(dict(id="3", type="wiki", product="A"))
        result = self.whoosh_backend.query(
            query.Every(),
            sort=[SortInstruction("type", ASC), SortInstruction("id", DESC)],
            fields=("id", "type"),
            facets=("type", "product"),
        )
        self.print_result(result)
        self.assertEqual(3, result.hits)
        facets = result.facets
        self.assertEqual({"ticket": 2, "wiki": 1}, facets["type"])
        self.assertEqual({"A": 2, "B": 1}, facets["product"])

    def test_can_do_facet_if_filed_missing_TODO(self):
        """A doc lacking ``status`` is counted under the ``None`` facet key."""
        self.whoosh_backend.add_doc(dict(id="1", type="ticket"))
        self.whoosh_backend.add_doc(dict(id="2", type="ticket", status="New"))
        result = self.whoosh_backend.query(
            query.Every(),
            facets=("type", "status"),
        )
        self.print_result(result)
        self.assertEqual(2, result.hits)
        facets = result.facets
        self.assertEqual({"ticket": 2}, facets["type"])
        self.assertEqual({None: 1, 'New': 1}, facets["status"])

    def test_can_return_empty_result(self):
        """Querying an empty index with sort and facets yields zero hits."""
        result = self.whoosh_backend.query(
            query.Every(),
            sort=[SortInstruction("type", ASC), SortInstruction("id", DESC)],
            fields=("id", "type"),
            facets=("type", "product"),
        )
        self.print_result(result)
        self.assertEqual(0, result.hits)

    def test_can_search_time_with_utc_tzinfo(self):
        """A UTC-aware datetime is returned equal to the stored value."""
        time = datetime(2012, 12, 13, 11, 8, 34, 711957,
                        tzinfo=FixedOffset(0, 'UTC'))
        self.whoosh_backend.add_doc(dict(id="1", type="ticket", time=time))
        result = self.whoosh_backend.query(query.Every())
        self.print_result(result)
        self.assertEqual(time, result.docs[0]["time"])

    def test_can_search_time_without_tzinfo(self):
        """A naive datetime comes back with ``utc`` as its tzinfo."""
        time = datetime(2012, 12, 13, 11, 8, 34, 711957, tzinfo=None)
        self.whoosh_backend.add_doc(dict(id="1", type="ticket", time=time))
        result = self.whoosh_backend.query(query.Every())
        self.print_result(result)
        self.assertEqual(time.replace(tzinfo=utc), result.docs[0]["time"])

    def test_can_search_time_with_non_utc_tzinfo(self):
        """A datetime in another timezone comes back converted to UTC."""
        # The value sits in datetime's ``minute`` position and the offset
        # is subtracted from it in the expectation, so both are minutes.
        # NOTE(review): relies on FixedOffset taking minutes -- confirm.
        minute = 8
        tz_offset_minutes = 1
        time = datetime(2012, 12, 13, 11, minute, 34, 711957,
                        tzinfo=FixedOffset(tz_offset_minutes,
                                           "just_one_timezone"))
        self.whoosh_backend.add_doc(dict(id="1", type="ticket", time=time))
        result = self.whoosh_backend.query(query.Every())
        self.print_result(result)
        self.assertEqual(
            datetime(2012, 12, 13, 11, minute - tz_offset_minutes, 34,
                     711957, tzinfo=utc),
            result.docs[0]["time"])

    def test_can_apply_filter_and_facet(self):
        """A ``Term`` filter restricts hits while facets are requested."""
        self.whoosh_backend.add_doc(dict(id="1", type="ticket"))
        self.whoosh_backend.add_doc(dict(id="2", type="wiki"))
        result = self.whoosh_backend.query(
            query.Every(),
            filter=query.Term("type", "ticket"),
            facets=["type"],
        )
        self.print_result(result)
        self.assertEqual(1, result.hits)
        self.assertEqual("ticket", result.docs[0]["type"])

    @unittest.skip("TODO clarify behavior on Whoosh mail list")
    def test_can_search_id_and_summary_TODO(self):
        """(Skipped) Query ``1`` over ``id`` and ``summary`` expects 2 hits."""
        # arrange
        self.insert_ticket("test x")
        self.insert_ticket("test 1")
        fieldboosts = dict(
            id=1,
            summary=1,
        )
        mfp = MultifieldPlugin(list(fieldboosts.keys()),)
        pins = [WhitespacePlugin,
                PhrasePlugin,
                mfp]
        parser = QueryParser(None, WhooshBackend.SCHEMA, plugins=pins)
        parsed_query = parser.parse("1")
        result = self.whoosh_backend.query(parsed_query)
        self.print_result(result)
        self.assertEqual(2, result.hits)

    def test_no_index_error_when_counting_facet_on_missing_field(self):
        """
        Whoosh 2.4.1 raises "IndexError: list index out of range"
        when search contains facets on field that is missing in at least one
        document in the index. The error manifests only when index contains
        more than one segment

        Introduced workaround should solve this problem.
        """
        # Add more tickets to make sure we have more than one segment in
        # the index.
        count = 20
        for i in range(count):
            self.insert_ticket("test %s" % i)
        result = self.whoosh_backend.query(
            query.Every(),
            facets=["milestone"],
        )
        self.assertEqual(count, result.hits)

    def test_can_query_missing_field_and_type(self):
        """``NOT (milestone:*)`` plus a type filter finds the bare ticket."""
        self.whoosh_backend.add_doc(dict(id="1", type="ticket"))
        self.whoosh_backend.add_doc(dict(id="2", type="ticket", milestone="A"))
        self.whoosh_backend.add_doc(dict(id="3", type="wiki"))
        parsed_filter = self.parser.parse_filters(
            ["NOT (milestone:*)", "type:ticket"])
        result = self.whoosh_backend.query(
            query.Every(),
            filter=parsed_filter,
        )
        self.print_result(result)
        self.assertEqual(1, result.hits)
        self.assertEqual("1", result.docs[0]["id"])

    def test_can_query_missing_field(self):
        """``NOT (milestone:*)`` finds the doc that has no milestone."""
        self.whoosh_backend.add_doc(dict(id="1", type="ticket"))
        self.whoosh_backend.add_doc(dict(id="2", type="ticket", milestone="A"))
        parsed_filter = self.parser.parse_filters(["NOT (milestone:*)"])
        result = self.whoosh_backend.query(
            query.Every(),
            filter=parsed_filter,
        )
        self.print_result(result)
        self.assertEqual(1, result.hits)
        self.assertEqual("1", result.docs[0]["id"])

    @unittest.skip("TODO clarify behavior on Whoosh mail list")
    def test_can_query_missing_field_and_type_with_no_results(self):
        """(Skipped) Same filters as above, asserted to yield zero hits."""
        self.whoosh_backend.add_doc(dict(id="1", type="ticket"))
        self.whoosh_backend.add_doc(dict(id="3", type="wiki"))
        parsed_filter = self.parser.parse_filters(
            ["NOT (milestone:*)", "type:ticket"])
        result = self.whoosh_backend.query(
            query.Every(),
            filter=parsed_filter,
        )
        self.print_result(result)
        self.assertEqual(0, result.hits)

    def test_can_highlight_given_terms(self):
        """The term is wrapped in ``<em>`` in ``content``; ``summary`` is
        an empty string."""
        term = 'search_term'
        text = "foo foo %s bar bar" % term
        self.whoosh_backend.add_doc(dict(id="1", type="ticket", content=text))
        self.whoosh_backend.add_doc(dict(id="3", type="wiki", content=text))
        search_query = self.parser.parse(term)
        result = self.whoosh_backend.query(
            search_query,
            highlight=True,
            highlight_fields=['content', 'summary'],
        )
        self.print_result(result)
        self.assertEqual(len(result.highlighting), 2)
        for highlight in result.highlighting:
            self.assertIn(self._highlighted(term), highlight['content'])
            self.assertEqual("", highlight['summary'])

    def test_that_highlighting_escapes_html(self):
        """Markup in the stored text is HTML-escaped around the ``<em>``."""
        term = 'search_term'
        text = "bla <a href=''>%s bar</a> bla" % term
        self.whoosh_backend.add_doc(dict(id="1", type="ticket", content=text))
        search_query = self.parser.parse(term)
        result = self.whoosh_backend.query(
            search_query,
            highlight=True,
            highlight_fields=['content'],
        )
        self.print_result(result)
        self.assertEqual(len(result.highlighting), 1)
        highlight = result.highlighting[0]
        self.assertEqual(
            "bla &lt;a href=''&gt;<em>search_term</em> bar&lt;/a&gt; bla",
            highlight['content'])

    def test_highlights_all_text_fields_by_default(self):
        """Without ``highlight_fields`` both ``content`` and ``summary``
        appear in each highlighting entry."""
        term = 'search_term'
        text = "foo foo %s bar bar" % term
        self.whoosh_backend.add_doc(dict(id="1", type="ticket", content=text))
        self.whoosh_backend.add_doc(dict(id="3", type="wiki", content=text))
        search_query = self.parser.parse(term)
        result = self.whoosh_backend.query(
            search_query,
            highlight=True,
        )
        self.print_result(result)
        self.assertEqual(len(result.highlighting), 2)
        for highlight in result.highlighting:
            self.assertIn('content', highlight)
            self.assertIn('summary', highlight)
            self.assertIn(self._highlighted(term), highlight['content'])

    def test_only_highlights_terms_in_fields_that_match_query(self):
        """An ``id:`` query highlights ``id`` only, although the same term
        is also stored in ``content`` or ``summary``."""
        term = 'search_term'
        self.whoosh_backend.add_doc(dict(id=term, type="wiki", content=term))
        self.whoosh_backend.add_doc(dict(id=term, type="ticket", summary=term))
        search_query = self.parser.parse('id:%s' % term)
        result = self.whoosh_backend.query(
            search_query,
            highlight=True,
            highlight_fields=["id", "content", "summary"],
        )
        self.print_result(result)
        self.assertEqual(len(result.highlighting), 2)
        for highlight in result.highlighting:
            self.assertIn(self._highlighted(term), highlight['id'])
            self.assertNotIn(self._highlighted(term), highlight['summary'])
            self.assertNotIn(self._highlighted(term), highlight['content'])

    def _highlighted(self, term):
        """Return ``term`` wrapped in the ``<em>`` tag the tests expect."""
        return '<em>%s</em>' % term
class WhooshIndexCreationTests(BaseBloodhoundSearchTest):
    """Tests for how ``WhooshBackend`` reports and rebuilds its index."""

    def setUp(self):
        """Make sure the ``whoosh_index`` directory exists in the env."""
        super(WhooshIndexCreationTests, self).setUp()
        index_dir = os.path.join(self.env.path, 'whoosh_index')
        self.index_dir = index_dir
        if os.path.exists(index_dir):
            return
        os.mkdir(index_dir)

    def test_does_not_automatically_create_index(self):
        """A new backend has no index until ``recreate_index`` is called."""
        backend = WhooshBackend(self.env)

        # Before recreation: no index object, and it is reported outdated.
        self.assertIsNone(backend.index)
        outdated_before = backend.is_index_outdated()
        self.assertEqual(outdated_before, True)

        backend.recreate_index()

        # After recreation: up to date, and an index object is present.
        outdated_after = backend.is_index_outdated()
        self.assertEqual(outdated_after, False)
        self.assertIsNotNone(backend.index)

    def test_detects_that_index_needs_upgrade(self):
        """An index created with another schema is reported as outdated."""
        # Seed the directory with an index whose schema differs from the
        # one the backend uses.
        index.create_in(self.index_dir, schema=Schema(content=TEXT()))

        backend = WhooshBackend(self.env)
        outdated_before = backend.is_index_outdated()
        self.assertEqual(outdated_before, True)

        backend.recreate_index()
        outdated_after = backend.is_index_outdated()
        self.assertEqual(outdated_after, False)
class WhooshFunctionalityTestCase(unittest.TestCase):
    """Tests that pin down raw Whoosh behaviour the backend relies on.

    Each test builds its own index in a temporary directory and talks to
    Whoosh directly rather than through ``WhooshBackend``.
    """

    def setUp(self):
        """Create a temporary directory to hold the index."""
        self.index_dir = tempfile.mkdtemp('whoosh_index')

    def tearDown(self):
        """Delete the temporary index directory."""
        shutil.rmtree(self.index_dir)

    def test_groupedby_empty_field(self):
        """Grouping counts a doc without ``status`` under the ``None`` key."""
        schema = Schema(
            unique_id=ID(stored=True, unique=True),
            id=ID(stored=True),
            type=ID(stored=True),
            status=KEYWORD(stored=True),
            content=TEXT(stored=True),
        )
        ix = index.create_in(self.index_dir, schema=schema)
        with ix.writer() as w:
            w.add_document(unique_id=u"1", type=u"type1")
            w.add_document(unique_id=u"2", type=u"type2", status=u"New")
        facet_fields = (u"type", u"status")
        with ix.searcher() as s:
            r = s.search(
                query.Every(),
                groupedby=facet_fields,
                maptype=sorting.Count,
            )
            facets = self._load_facets(r)
        self.assertEqual(
            {'status': {None: 1, 'New': 1}, 'type': {'type1': 1, 'type2': 1}},
            facets)

    def test_out_of_range_on_empty_facets(self):
        """
        Whoosh raises exception IndexError: list index out of range
        when search contains facets on a field that is missing in at least
        one document in the index. The error manifests only when the index
        contains more than one segment.

        The problem is expected to be fixed in the next release.

        For the time being, whoosh-backend has to introduce a workaround in
        order to fix the problem. This unit-test is just a reminder to
        remove the workaround when the fixed version of Whoosh is applied.
        """
        schema = Schema(
            unique_id=ID(stored=True, unique=True),
            status=ID(stored=True),
        )
        ix = index.create_in(self.index_dir, schema=schema)

        def insert_docs():
            with ix.writer() as w:
                for i in range(10):
                    w.add_document(unique_id=unicode(i))

        # The problem occurs only when the index contains more than one
        # segment, hence two separate writer sessions.
        insert_docs()
        insert_docs()
        with ix.searcher() as s:
            with self.assertRaises(IndexError):
                s.search(
                    query.Every(),
                    # A single field name passed as a plain string.
                    groupedby=u"status",
                    maptype=sorting.Count,
                )

    def _load_facets(self, non_paged_results):
        """Collect the groups of every facet in ``non_paged_results``.

        Returns a dict mapping each name from ``facet_names()`` to the
        value of ``groups(name)``, or ``None`` when there are no facet
        names.
        """
        facet_names = non_paged_results.facet_names()
        if not facet_names:
            return None
        return dict((name, non_paged_results.groups(name))
                    for name in facet_names)

    def test_can_auto_commit(self):
        """Docs added inside ``with ix.writer()`` are searchable afterwards."""
        # pylint: disable=unused-argument
        schema = Schema(
            unique_id=ID(stored=True, unique=True),
            type=ID(stored=True),
        )
        ix = index.create_in(self.index_dir, schema=schema)
        with ix.writer() as w:
            w.add_document(unique_id=u"1", type=u"type1")
            w.add_document(unique_id=u"2", type=u"type2")
        with ix.searcher() as s:
            results = s.search(query.Every())
            self.assertEqual(2, len(results))

    def test_can_auto_cancel(self):
        """Docs added in a writer block that raises are not searchable."""
        schema = Schema(
            unique_id=ID(stored=True, unique=True),
            type=ID(stored=True),
        )
        ix = index.create_in(self.index_dir, schema=schema)
        try:
            with ix.writer() as w:
                w.add_document(unique_id=u"1", type=u"type1")
                w.add_document(unique_id=u"2", type=u"type2")
                raise Exception("some exception")
        except Exception:
            # Deliberately swallowed: the exception exists only to make
            # the writer's context manager exit on the error path.
            pass
        with ix.searcher() as s:
            results = s.search(query.Every())
            self.assertEqual(0, len(results))

    def test_handles_stop_words_in_queries(self):
        """Query ``with stop`` keeps both words as terms and finds the doc."""
        schema = WhooshBackend.SCHEMA
        ix = index.create_in(self.index_dir, schema=schema)
        with ix.writer() as w:
            w.add_document(content=u"A nice sentence with stop words.")
        with ix.searcher() as s:
            query_text = u"with stop"
            # field_names both ignore stop words
            q = MultifieldParser(['content', 'summary'],
                                 WhooshBackend.SCHEMA).parse(query_text)
            self.assertEqual(unicode(q.simplify(s)),
                             u'((content:with OR summary:with) AND '
                             u'(content:stop OR summary:stop))')
            self.assertEqual(len(s.search(q)), 1)
            # 'content' and 'id' ignores stop words
            q = MultifieldParser(['content', 'id'],
                                 WhooshBackend.SCHEMA).parse(query_text)
            self.assertEqual(unicode(q.simplify(s)),
                             u'((content:with OR id:with) AND '
                             u'(content:stop OR id:stop))')
            self.assertEqual(len(s.search(q)), 1)

    def test_can_filter_to_no_results(self):
        """A filter matching no document leaves an empty result set."""
        schema = Schema(
            id=ID(stored=True),
            filter=TEXT(stored=True),
        )
        ix = index.create_in(self.index_dir, schema=schema)
        with ix.writer() as w:
            w.add_document(id=u"1", filter=u"f1")
            w.add_document(id=u"2", filter=u"f2")
        with ix.searcher() as s:
            r = s.search(
                query.Every(),
                filter=QueryParser('', schema).parse(u"filter:other"),
            )
            self.assertEqual(len(r), 0)
class WhooshEmptyFacetErrorWorkaroundTestCase(BaseBloodhoundSearchTest):
    """Tests for ``WhooshEmptyFacetErrorWorkaround``.

    Checks the marker stored in otherwise empty fields and the rewriting
    of query filters done by ``query_pre_process``.
    """

    def setUp(self):
        """Create a backend with a recreated index, parser and workaround."""
        super(WhooshEmptyFacetErrorWorkaroundTestCase, self).setUp()
        self.whoosh_backend = WhooshBackend(self.env)
        self.whoosh_backend.recreate_index()
        self.parser = DefaultQueryParser(self.env)
        self.empty_facet_workaround = WhooshEmptyFacetErrorWorkaround(self.env)

    def tearDown(self):
        """Remove the environment directory and reset the database."""
        shutil.rmtree(self.env.path)
        self.env.reset_db()

    def test_set_should_not_be_empty_fields(self):
        """An inserted ticket's component, status and milestone hold
        ``NULL_MARKER``."""
        self.insert_ticket("test x")
        result = self.whoosh_backend.query(query.Every())
        self.print_result(result)
        doc = result.docs[0]
        null_marker = WhooshEmptyFacetErrorWorkaround.NULL_MARKER
        self.assertEqual(null_marker, doc["component"])
        self.assertEqual(null_marker, doc["status"])
        self.assertEqual(null_marker, doc["milestone"])

    def test_can_fix_query_filter(self):
        """``NOT (milestone:*)`` is rewritten to ``milestone:empty``."""
        parsed_filter = self.parser.parse_filters(
            ["type:ticket", "NOT (milestone:*)"])
        query_parameters = dict(filter=parsed_filter)
        # query_pre_process updates the parameters dict in place; the
        # rewritten filter is read back from it below.
        self.empty_facet_workaround.query_pre_process(
            query_parameters)
        result_filter = query_parameters["filter"]
        self.assertEqual('(type:ticket AND milestone:empty)',
                         str(result_filter))

    def test_does_interfere_query_filter_if_not_needed(self):
        """A filter with a concrete milestone value is left as it was.

        Despite the method name, the assertion checks that the workaround
        does NOT alter the filter.
        """
        parsed_filter = self.parser.parse_filters(
            ["type:ticket", "milestone:aaa"])
        query_parameters = dict(filter=parsed_filter)
        self.empty_facet_workaround.query_pre_process(
            query_parameters)
        result_filter = query_parameters["filter"]
        self.assertEqual('(type:ticket AND milestone:aaa)',
                         str(result_filter))
def suite():
    """Return a suite with the ``test*`` methods of every case in this file.

    ``WhooshIndexCreationTests`` is included as well, so runners that
    collect tests through ``suite()`` no longer skip the index-creation
    tests.
    """
    test_suite = unittest.TestSuite()
    # Listed in the order the classes are defined in this module.
    for test_case in (
            WhooshBackendTestCase,
            WhooshIndexCreationTests,
            WhooshFunctionalityTestCase,
            WhooshEmptyFacetErrorWorkaroundTestCase,
    ):
        test_suite.addTest(unittest.makeSuite(test_case, 'test'))
    return test_suite
# When the file is executed directly, hand control to unittest's
# command-line runner.
if __name__ == '__main__':
    unittest.main()