use std::collections::HashMap;
use std::io;
use std::marker::PhantomData;
use std::ops::Range;
use rustc_hash::FxHashMap;
use super::stacker::Addr;
use crate::fastfield::MultiValuedFastFieldWriter;
use crate::fieldnorm::FieldNormReaders;
use crate::indexer::doc_id_mapping::DocIdMapping;
use crate::postings::recorder::{BufferLender, Recorder};
use crate::postings::{
FieldSerializer, IndexingContext, InvertedIndexSerializer, PerFieldPostingsWriter,
UnorderedTermId,
};
use crate::schema::{Field, FieldType, Schema, Term};
use crate::termdict::TermOrdinal;
use crate::tokenizer::{Token, TokenStream, MAX_TOKEN_LEN};
use crate::DocId;
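/// Gap inserted between the end position of one value and the start position
/// of the next value of a multi-valued field.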
const POSITION_GAP: u32 = 1;
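/// Partitions the term offsets, sorted by term bytes (and therefore grouped
/// by field, since a serialized term starts with its field id), into one
/// contiguous range per field.
///
/// A schematic sketch of the expected behavior (fields and tuples are
/// illustrative, not actual `Term` values):
///
/// ```text
/// terms:  [(f1, ..), (f1, ..), (f2, ..)]
/// result: [(f1, 0..2), (f2, 2..3)]
/// ```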
fn make_field_partition(
term_offsets: &[(Term<&[u8]>, Addr, UnorderedTermId)],
) -> Vec<(Field, Range<usize>)> {
let term_offsets_it = term_offsets
.iter()
.map(|(term, _, _)| term.field())
.enumerate();
let mut prev_field_opt = None;
let mut fields = vec![];
let mut offsets = vec![];
for (offset, field) in term_offsets_it {
if Some(field) != prev_field_opt {
prev_field_opt = Some(field);
fields.push(field);
offsets.push(offset);
}
}
offsets.push(term_offsets.len());
let mut field_offsets = vec![];
for i in 0..fields.len() {
field_offsets.push((fields[i], offsets[i]..offsets[i + 1]));
}
field_offsets
}
/// Serializes the inverted index.
/// It pushes all terms, one field at a time, towards the
/// postings serializer.
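///
/// Returns, for each text/facet field, the mapping from the unordered term
/// ids assigned during indexing to the ordinals of the terms in the sorted
/// term dictionary. A minimal sketch of how the returned mapping may be
/// consumed (`field` and `unordered_term_id` are hypothetical values):
///
/// ```ignore
/// let mappings =
///     serialize_postings(ctx, writers, fieldnorm_readers, None, &schema, &mut serializer)?;
/// let term_ord: TermOrdinal = mappings[&field][&unordered_term_id];
/// ```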
pub(crate) fn serialize_postings(
ctx: IndexingContext,
per_field_postings_writers: &PerFieldPostingsWriter,
fieldnorm_readers: FieldNormReaders,
doc_id_map: Option<&DocIdMapping>,
schema: &Schema,
serializer: &mut InvertedIndexSerializer,
) -> crate::Result<HashMap<Field, FxHashMap<UnorderedTermId, TermOrdinal>>> {
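// Collect every (term, addr, unordered term id) triple from the term hash
// map and sort the triples by term bytes. Since a serialized term starts
// with its field id, this also groups the terms by field.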
let mut term_offsets: Vec<(Term<&[u8]>, Addr, UnorderedTermId)> =
Vec::with_capacity(ctx.term_index.len());
term_offsets.extend(ctx.term_index.iter());
term_offsets.sort_unstable_by_key(|(k, _, _)| k.clone());
let mut unordered_term_mappings: HashMap<Field, FxHashMap<UnorderedTermId, TermOrdinal>> =
HashMap::new();
let field_offsets = make_field_partition(&term_offsets);
for (field, byte_offsets) in field_offsets {
let field_entry = schema.get_field_entry(field);
match *field_entry.field_type() {
FieldType::Str(_) | FieldType::Facet(_) => {
// Populate the (unordered term id) -> (ordered term ordinal)
// mapping for the field.
let unordered_term_ids = term_offsets[byte_offsets.clone()]
.iter()
.map(|&(_, _, bucket)| bucket);
let mapping: FxHashMap<UnorderedTermId, TermOrdinal> = unordered_term_ids
.enumerate()
.map(|(term_ord, unord_term_id)| (unord_term_id, term_ord as TermOrdinal))
.collect();
unordered_term_mappings.insert(field, mapping);
}
FieldType::U64(_)
| FieldType::I64(_)
| FieldType::F64(_)
| FieldType::Date(_)
| FieldType::Bool(_) => {}
FieldType::Bytes(_) => {}
FieldType::JsonObject(_) => {}
FieldType::IpAddr(_) => {}
}
let postings_writer = per_field_postings_writers.get_for_field(field);
let fieldnorm_reader = fieldnorm_readers.get_field(field)?;
let mut field_serializer =
serializer.new_field(field, postings_writer.total_num_tokens(), fieldnorm_reader)?;
postings_writer.serialize(
&term_offsets[byte_offsets],
doc_id_map,
&ctx,
&mut field_serializer,
)?;
field_serializer.close()?;
}
Ok(unordered_term_mappings)
}
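/// Tracks the token count and end position while the text values of a single
/// field of a document are being indexed.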
#[derive(Default)]
pub(crate) struct IndexingPosition {
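/// Number of tokens indexed so far.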
pub num_tokens: u32,
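/// Position immediately following the last indexed token.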
pub end_position: u32,
}
/// The `PostingsWriter` is in charge of receiving documents
/// and building a `Segment` in anonymous memory.
///
/// `PostingsWriter` writes in a `MemoryArena`.
pub(crate) trait PostingsWriter: Send + Sync {
/// Record that a document contains a term at a given position.
///
/// * doc - the document id
/// * pos - the term position (expressed in tokens)
/// * term - the term
/// * ctx - Contains a term hashmap and a memory arena to store all necessary posting list
/// information.
fn subscribe(
&mut self,
doc: DocId,
pos: u32,
term: &Term,
ctx: &mut IndexingContext,
) -> UnorderedTermId;
/// Serializes the postings to disk.
/// The actual serialization format is handled by the `PostingsSerializer`.
fn serialize(
&self,
term_addrs: &[(Term<&[u8]>, Addr, UnorderedTermId)],
doc_id_map: Option<&DocIdMapping>,
ctx: &IndexingContext,
serializer: &mut FieldSerializer,
) -> io::Result<()>;
/// Tokenizes the text and subscribes all of its tokens.
fn index_text(
&mut self,
doc_id: DocId,
token_stream: &mut dyn TokenStream,
term_buffer: &mut Term,
ctx: &mut IndexingContext,
indexing_position: &mut IndexingPosition,
mut term_id_fast_field_writer_opt: Option<&mut MultiValuedFastFieldWriter>,
) {
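// For JSON-like fields, the term buffer may already hold a path prefix;
// remember its length so we can truncate back to it after each token.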
let end_of_path_idx = term_buffer.len_bytes();
let mut num_tokens = 0;
let mut end_position = indexing_position.end_position;
token_stream.process(&mut |token: &Token| {
// We skip all tokens longer than `MAX_TOKEN_LEN` bytes: term lengths
// must fit in a `u16`.
if token.text.len() > MAX_TOKEN_LEN {
warn!(
"A token exceeding MAX_TOKEN_LEN ({}>{}) was dropped. Search for \
MAX_TOKEN_LEN in the documentation for more information.",
token.text.len(),
MAX_TOKEN_LEN
);
return;
}
term_buffer.truncate_value_bytes(end_of_path_idx);
term_buffer.append_bytes(token.text.as_bytes());
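// Positions accumulate across the successive values of a multi-valued
// field: `indexing_position.end_position` carries the offset of the
// current value.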
let start_position = indexing_position.end_position + token.position as u32;
end_position = end_position.max(start_position + token.position_length as u32);
let unordered_term_id = self.subscribe(doc_id, start_position, term_buffer, ctx);
if let Some(term_id_fast_field_writer) = term_id_fast_field_writer_opt.as_mut() {
term_id_fast_field_writer.add_val(unordered_term_id);
}
num_tokens += 1;
});
indexing_position.end_position = end_position + POSITION_GAP;
indexing_position.num_tokens += num_tokens;
term_buffer.truncate_value_bytes(end_of_path_idx);
}
fn total_num_tokens(&self) -> u64;
}
/// The `SpecializedPostingsWriter` exists to avoid dynamic
/// dispatch when accessing the recorder.
#[derive(Default)]
pub(crate) struct SpecializedPostingsWriter<Rec: Recorder> {
total_num_tokens: u64,
_recorder_type: PhantomData<Rec>,
}
impl<Rec: Recorder> From<SpecializedPostingsWriter<Rec>> for Box<dyn PostingsWriter> {
fn from(
specialized_postings_writer: SpecializedPostingsWriter<Rec>,
) -> Box<dyn PostingsWriter> {
Box::new(specialized_postings_writer)
}
}
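// A minimal usage sketch for the conversion above, assuming some concrete
// `Recorder` implementation `MyRecorder` (hypothetical name):
//
//     let writer: Box<dyn PostingsWriter> =
//         SpecializedPostingsWriter::<MyRecorder>::default().into();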
impl<Rec: Recorder> SpecializedPostingsWriter<Rec> {
#[inline]
pub(crate) fn serialize_one_term(
term: &Term<&[u8]>,
addr: Addr,
doc_id_map: Option<&DocIdMapping>,
buffer_lender: &mut BufferLender,
ctx: &IndexingContext,
serializer: &mut FieldSerializer,
) -> io::Result<()> {
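// Read the recorder associated with this term back from the memory arena,
// then stream its postings (and positions, if recorded) through the
// serializer.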
let recorder: Rec = ctx.term_index.read(addr);
let term_doc_freq = recorder.term_doc_freq().unwrap_or(0u32);
serializer.new_term(term.value_bytes(), term_doc_freq)?;
recorder.serialize(&ctx.arena, doc_id_map, serializer, buffer_lender);
serializer.close_term()?;
Ok(())
}
}
impl<Rec: Recorder> PostingsWriter for SpecializedPostingsWriter<Rec> {
fn subscribe(
&mut self,
doc: DocId,
position: u32,
term: &Term,
ctx: &mut IndexingContext,
) -> UnorderedTermId {
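// A serialized term always starts with the 4-byte field id.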
debug_assert!(term.as_slice().len() >= 4);
self.total_num_tokens += 1;
let (term_index, arena) = (&mut ctx.term_index, &mut ctx.arena);
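// `mutate_or_create` updates the recorder already associated with the
// term, or creates a fresh one for a new term. It returns the term's
// bucket id in the hash map, which serves as its unordered term id.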
term_index.mutate_or_create(term.as_slice(), |opt_recorder: Option<Rec>| {
if let Some(mut recorder) = opt_recorder {
let current_doc = recorder.current_doc();
if current_doc != doc {
recorder.close_doc(arena);
recorder.new_doc(doc, arena);
}
recorder.record_position(position, arena);
recorder
} else {
let mut recorder = Rec::default();
recorder.new_doc(doc, arena);
recorder.record_position(position, arena);
recorder
}
}) as UnorderedTermId
}
fn serialize(
&self,
term_addrs: &[(Term<&[u8]>, Addr, UnorderedTermId)],
doc_id_map: Option<&DocIdMapping>,
ctx: &IndexingContext,
serializer: &mut FieldSerializer,
) -> io::Result<()> {
let mut buffer_lender = BufferLender::default();
for (term, addr, _) in term_addrs {
Self::serialize_one_term(term, *addr, doc_id_map, &mut buffer_lender, ctx, serializer)?;
}
Ok(())
}
fn total_num_tokens(&self) -> u64 {
self.total_num_tokens
}
}